Lucidrains Series: Source Code Analysis (100)

.\lucidrains\toolformer-pytorch\toolformer_pytorch\__init__.py

# import the PaLM class from the toolformer_pytorch.palm module
from toolformer_pytorch.palm import PaLM

# import the following functions and classes from the toolformer_pytorch.toolformer_pytorch module
from toolformer_pytorch.toolformer_pytorch import (
    Toolformer,                      # the Toolformer class
    filter_tokens_with_api_response, # the filter_tokens_with_api_response function
    sample,                          # the sample function
    sample_with_api_call,            # the sample_with_api_call function
    has_api_calls,                   # the has_api_calls function
    invoke_tools,                    # the invoke_tools function
    replace_all_but_first            # the replace_all_but_first function
)

TPDNE (wip)

This Person Does Not Exist, so now it is being rebuilt, this time trying to make everything open source and modular enough that anyone can deploy the GAN (or, soon, a two-step DDPM) of their dreams.

It may take a while yet before I can "instill" my dog into the machine, doing what I hope she dreams about forever.

Explained

The site is hosted on a GPU server at Hetzner for $100/month, and the images are generated live, so visitors will never exhaust the number of faces the site can show them. Many of these neural networks can, without exaggeration, be described to the public as "the endless dreams of an AI."

How does this scale so effortlessly? The site actually relies on a magic trick: at any point in time it shows everyone the same image, swapped out in less time than a human can react. When a user studies a face and refreshes, the next face is already there, but it is the same face that everyone else in the world experiences at that same moment.

The model itself is a StyleGAN 2, trained by Tero Karras.
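
Concretely, the trick boils down to rendering each frame into a temporary file and atomically repointing a symlink that nginx serves. A minimal sketch of the idea (hypothetical paths and a stand-in sampling function, mirroring the `ln -nfs` calls in tpdne.py below):

import os, time
import numpy as np
from PIL import Image

def render_next_face():
    # stand-in for the live GAN sampling call (hypothetical)
    return Image.fromarray(np.random.randint(0, 255, (1024, 1024, 3), dtype = np.uint8))

while True:
    frame = render_next_face()
    frame.save('/tmp/next.jpeg')                                    # write the new frame to a tmp file
    os.system('ln -nfs /tmp/next.jpeg /var/www/tpdne/sampled.jpeg') # atomically repoint the symlink nginx serves
    time.sleep(0.25)                                                # swap faster than human reaction time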

Install

$ pip install TPDNE-utils

Usage

from TPDNE_utils import sample_image_and_save_repeatedly

# a function that returns a sampled image in the form of a 3-d ndarray

def generate_image():
    import numpy as np
    return np.random.randn(1024, 1024, 3)

# saves a new sampled image every 250ms as out/sampled.jpeg

sample_image_and_save_repeatedly(generate_image, './out/sampled')

# serve out/sampled.jpeg with nginx
# optionally put it behind cloudflare

Todo

nginx template
auto-handle different types of tensor output; auto-detect the channel dimension; denormalize image tensors to 0-255 uint8
handle ssl within nginx

Citations

@inproceedings{Karras2020ada,
title = {Training Generative Adversarial Networks with Limited Data},
author = {Tero Karras and Miika Aittala and Janne Hellsten and Samuli Laine and Jaakko Lehtinen and Timo Aila},
booktitle = {NeurIPS},
year = {2020}
}

.\lucidrains\TPDNE\setup.py

# import setup tools and the ability to find packages
from setuptools import setup, find_packages

# set up the package information
setup(
  name = 'TPDNE-utils',                             # package name
  packages = find_packages(exclude=[]),             # find all packages
  version = '0.0.11',                               # version number
  license = 'MIT',                                  # license
  description = 'TPDNE',                            # description
  include_package_data = True,                      # include all data files
  author = 'Phil Wang',                             # author
  author_email = 'lucidrains@gmail.com',            # author email
  long_description_content_type = 'text/markdown',  # long description content type
  url = 'https://github.com/lucidrains/TPDNE',      # project link
  keywords = [
    'this person does not exist'                    # keywords
  ],
  install_requires = [                              # dependencies
    'beartype',
    'einops>=0.6',
    'jinja2',
    'numpy',
    'pillow'
  ],
  classifiers = [                                   # classifiers
    'Development Status :: 4 - Beta',
    'Intended Audience :: Developers',
    'Topic :: Scientific/Engineering :: Artificial Intelligence',
    'License :: OSI Approved :: MIT License',
    'Programming Language :: Python :: 3.6',
  ],
)

.\lucidrains\TPDNE\TPDNE_utils\tpdne.py

# import the required libraries
import os
import sys
import numpy as np
from time import time, sleep
from pathlib import Path
from functools import wraps
from PIL import Image

# import third-party libraries
from beartype import beartype
from beartype.typing import Callable, Optional

from einops import rearrange, repeat

from jinja2 import Environment, FileSystemLoader

# get the current script path and its parent directory
script_path = Path(__file__)
current_dir = script_path.parents[0]

# set up the template environment
environment = Environment(loader = FileSystemLoader(str(current_dir)))

# fetch the template files
nginx_template = environment.get_template('nginx.conf.tmpl')
systemd_service_template = environment.get_template('tpdne.service.tmpl')

# helper function
def exists(val):
    return val is not None

# function for handling the image tensor
def auto_handle_image_tensor(t):
    if t.ndim == 4:
        t = t[0] # assume batch is the first dimension and take the first sample

    if t.ndim == 2:
        t = rearrange(t, 'h w -> h w 1') # assume it is a grayscale image

    if t.shape[0] <= 3:
        t = rearrange(t, 'c h w -> h w c') # channels were first, move them last

    assert t.shape[-1] <= 3, 'image tensor must be returned in the shape (height, width, channels), where channels is 3 or 1'

    if t.shape[-1] == 1:
        t = repeat(t, 'h w 1 -> h w c', c = 3) # handle single-channel images

    # handle scaling
    if np.issubdtype(t.dtype, np.floating): # float images are rescaled to 0-255
        has_negatives = np.any(t < 0)

        if has_negatives:
            t = t * 127.5 + 128
        else:
            t = t * 255

        t = t.astype(np.uint8)

    return t.clip(0, 255)

# main function

@beartype
def sample_image_and_save_repeatedly(
    fn: Callable[..., np.ndarray],           # function that returns a sampled image as a 3d ndarray
    output_path: str = './out/random',       # output image path without extension (the format suffix is added automatically)
    *,
    call_every_ms: int = 250,                # sampling frequency
    tmp_dir: str = '/tmp',                   # directory for saving temporary images
    num_rotated_tmp_images: int = 10,
    image_format: str = 'jpeg',
    verbose: bool = True,
    quality = 99,
    resize_image_to: Optional[int] = None,
    generate_favicon: bool = True,
    favicon_size: int = 32,
    generate_nginx_conf: bool = True,
    symbolic_link_nginx_conf: bool = True,
    nginx_sites_available_path: str = '/etc/nginx/sites-available',
    nginx_conf_filename = 'default',
    generate_systemd_service_conf: bool = False,
    systemd_service_path: str = '/etc/systemd/system',
    systemd_service_name = 'tpdne',
    domain_name = '_'
):
    assert 0 < quality <= 100
    assert favicon_size in {16, 32}
    assert image_format in {'jpeg', 'png', 'webp'}

    tmp_dir = Path(tmp_dir)
    output_path = Path(output_path)
    assert output_path.suffix == '', 'output path suffix is automatically determined by the `image_format` keyword arg'
    output_path = output_path.with_suffix(f'.{image_format}')

    call_every_seconds = call_every_ms / 1000

    assert tmp_dir.is_dir()
    root = output_path.parents[0]
    root.mkdir(parents = True, exist_ok = True)

    tmp_image_index = 0

    # link nginx
    if generate_nginx_conf:
        nginx_sites_path = Path(nginx_sites_available_path)
        nginx_sites_conf_path = nginx_sites_path / nginx_conf_filename

        assert nginx_sites_path.is_dir()

        nginx_conf_text = nginx_template.render(
            root = str(root.resolve()),
            index = output_path.name,
            server_name = domain_name
        )

        tmp_conf_path = Path(tmp_dir / 'nginx.server.conf')
        tmp_conf_path.write_text(nginx_conf_text)

        print(f'nginx server conf generated at {str(tmp_conf_path)}')

        if symbolic_link_nginx_conf:
            os.system(f'ln -nfs {str(tmp_conf_path)} {nginx_sites_conf_path}')
            print(f'nginx conf linked to {nginx_sites_conf_path}\nrun `systemctl reload nginx` for it to take effect')

    # generate the systemd service conf, if requested and not currently launched from systemd
    if generate_systemd_service_conf and not exists(os.getenv('LAUNCHED_FROM_SYSTEMD', None)):
        # path to the systemd service directory
        systemd_service_path = Path(systemd_service_path)
        # path to the systemd service conf file
        systemd_service_conf_path = systemd_service_path / f'{systemd_service_name}.service'

        # assert the systemd service path is a directory
        assert systemd_service_path.is_dir()

        # render the systemd conf text with the systemd service template
        systemd_conf_text = systemd_service_template.render(
            working_directory = str(current_dir.resolve()),
            python_executable = sys.executable,
            script_path = str(script_path.resolve())
        )

        # write the systemd conf text to a temporary service path
        tmp_service_path = Path(tmp_dir / 'tpdne.services')
        tmp_service_path.write_text(systemd_conf_text)

        # symbolically link the temporary service path to the systemd service conf path
        os.system(f'ln -nfs {str(tmp_service_path)} {str(systemd_service_conf_path)}')

        # print the hints
        print(f'service {systemd_service_name}.service created at {str(systemd_service_conf_path)}')
        print(f'run `systemctl enable {systemd_service_name}.service` to start this script')
        print(f'then run `systemctl status {systemd_service_name}.service` to check the status')

        # exit the program
        exit()

    # invoke the function `fn` in an endless loop
    while True:
        start = time()

        # call `fn` to get the image tensor
        image_tensor = fn()

        # handle the image tensor
        image_tensor = auto_handle_image_tensor(image_tensor)

        # compute the temporary image index
        tmp_image_index = (tmp_image_index + 1) % num_rotated_tmp_images
        tmp_path = str(tmp_dir / f'{tmp_image_index}.{image_format}')

        # create a PIL image object
        pil_image = Image.fromarray(image_tensor, 'RGB')

        # resize the image if `resize_image_to` was given
        if exists(resize_image_to):
            pil_image = pil_image.resize((resize_image_to, resize_image_to))

        # set format-specific save parameters
        image_save_kwargs = dict()

        if image_format == 'jpeg':
            image_save_kwargs = dict(optimize = True, progressive = True)
        elif image_format == 'webp':
            image_save_kwargs = dict(format = 'webp')

        # save the image to the temporary path
        pil_image.save(tmp_path, quality = quality, **image_save_kwargs)

        # symbolically link the temporary image path to the output path
        os.system(f'ln -nfs {tmp_path} {output_path}')
        # generate the favicon if needed
        if generate_favicon:
            tmp_favicon_path = str(tmp_dir / f'favicon_{tmp_image_index}.png')
            output_favicon_path = output_path.parents[0] / 'favicon.png'
            # shrink the image down to favicon size
            small_pil_image = pil_image.resize((favicon_size, favicon_size))
            small_pil_image.save(tmp_favicon_path)
            os.system(f'ln -nfs {tmp_favicon_path} {output_favicon_path}')
        # measure elapsed time
        elapsed = time() - start
        # if verbose, print the elapsed time and path information
        if verbose:
            print(f'{elapsed:.3f}s - tmp image at {tmp_path}, output image at {output_path}')
        # make sure an image is generated at most every `call_every_seconds`
        if elapsed >= call_every_seconds:
            continue
        # sleep until the next sampling time
        sleep(call_every_seconds - elapsed)

.\lucidrains\TPDNE\TPDNE_utils\__init__.py

# import the sample_image_and_save_repeatedly function from the TPDNE_utils.tpdne module
from TPDNE_utils.tpdne import sample_image_and_save_repeatedly

trRosetta – Pytorch

Implementation of trRosetta and trDesign for Pytorch, made into a convenient package, for protein structure prediction and design. The concept of trDesign will also be abstracted into a wrapper in this repository, so that it can be applied to Alphafold2 once it is replicated. Please join the efforts there if you would like to see this happen!

The original repository can be found here

Update – Xander has released trDesign for Pytorch!

Install

$ pip install tr-rosetta-pytorch

Usage

As a command-line tool, to run a structure prediction

$ tr_rosetta <input-file.a3m>

Code

import torch
from tr_rosetta_pytorch import trRosettaNetwork

model = trRosettaNetwork(
    filters = 64,
    kernel = 3,
    num_layers = 61
).cuda()

x = torch.randn(1, 526, 140, 140).cuda()
theta, phi, distance, omega = model(x)
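
A note on the 526 input channels (inferred from utils.py further below, not stated in the README): they decompose into pairwise DCA features plus two tiled copies of the per-residue features.

f1d_dim = 20 + 21 + 1        # query one-hot + PSSM + positional entropy
f2d_dim = 21 * 21 + 1        # inverse-covariance couplings + APC-corrected contacts
assert f2d_dim + 2 * f1d_dim == 526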

Citations

@article {Yang1496,
author = {Yang, Jianyi and Anishchenko, Ivan and Park, Hahnbeom and Peng, Zhenling and Ovchinnikov, Sergey and Baker, David},
title = {Improved protein structure prediction using predicted interresidue orientations},
URL = {https://www.pnas.org/content/117/3/1496},
eprint = {https://www.pnas.org/content/117/3/1496.full.pdf},
journal = {Proceedings of the National Academy of Sciences}
}

@article {Anishchenko2020.07.22.211482,
author = {Anishchenko, Ivan and Chidyausiku, Tamuka M. and Ovchinnikov, Sergey and Pellock, Samuel J. and Baker, David},
title = {De novo protein design by deep network hallucination},
URL = {https://www.biorxiv.org/content/early/2020/07/23/2020.07.22.211482},
eprint = {https://www.biorxiv.org/content/early/2020/07/23/2020.07.22.211482.full.pdf},
journal = {bioRxiv}
}

.\lucidrains\tr-rosetta-pytorch\setup.py

# import setup tools and the package finder
from setuptools import setup, find_packages

# set up the package information
setup(
  name = 'tr-rosetta-pytorch',                      # package name
  packages = find_packages(),                       # find all packages
  include_package_data = True,                      # include all data files
  entry_points = {                                  # entry points
    'console_scripts': [                            # console scripts
      'tr_rosetta = tr_rosetta_pytorch.cli:predict', # script name and the function it runs
    ],
  },
  version = '0.0.3',                                # version number
  license = 'MIT',                                  # license
  description = 'trRosetta - Pytorch',              # description
  author = 'Phil Wang',                             # author
  author_email = 'lucidrains@gmail.com',            # author email
  url = 'https://github.com/lucidrains/tr-rosetta-pytorch', # project link
  keywords = [                                      # keywords
    'artificial intelligence',
    'protein folding',
    'protein design'
  ],
  install_requires = [                              # dependencies
    'einops>=0.3',
    'fire',
    'numpy',
    'torch>=1.6'
  ],
  classifiers = [                                   # classifiers
    'Development Status :: 4 - Beta',
    'Intended Audience :: Developers',
    'Topic :: Scientific/Engineering :: Artificial Intelligence',
    'License :: OSI Approved :: MIT License',
    'Programming Language :: Python :: 3.6',
  ],
)

.\lucidrains\tr-rosetta-pytorch\tr_rosetta_pytorch\cli.py

# import the necessary libraries
import fire
import torch
import tarfile
import numpy as np
from pathlib import Path

# import the custom modules
from tr_rosetta_pytorch.tr_rosetta_pytorch import trRosettaNetwork
from tr_rosetta_pytorch.utils import preprocess, d

# path constants
CURRENT_PATH = Path(__file__).parent
DEFAULT_MODEL_PATH = CURRENT_PATH / 'models'
MODEL_PATH = DEFAULT_MODEL_PATH / 'models.tar.gz'
MODEL_FILES = [*Path(DEFAULT_MODEL_PATH).glob('*.pt')]

# untar the model files if that has not been done yet
if len(MODEL_FILES) == 0:
    tar = tarfile.open(str(MODEL_PATH))
    tar.extractall(DEFAULT_MODEL_PATH)
    tar.close()

# prediction function
@torch.no_grad()
def get_ensembled_predictions(input_file, output_file=None, model_dir=DEFAULT_MODEL_PATH):
    # instantiate the trRosettaNetwork
    net = trRosettaNetwork()
    # preprocess the input file
    i = preprocess(input_file)

    # if no output file is given, derive a default name from the input file
    if output_file is None:
        input_path = Path(input_file)
        output_file = f'{input_path.parents[0] / input_path.stem}.npz'

    outputs = []
    model_files = [*Path(model_dir).glob('*.pt')]

    # raise if no model files can be found (raising a bare string is invalid Python)
    if len(model_files) == 0:
        raise FileNotFoundError('No model files can be found')

    # load each model checkpoint and run a prediction
    for model_file in model_files:
        net.load_state_dict(torch.load(model_file, map_location=torch.device(d())))
        net.to(d()).eval()
        output = net(i)
        outputs.append(output)

    # average the outputs across the ensemble
    averaged_outputs = [torch.stack(model_output).mean(dim=0).cpu().numpy().squeeze(0).transpose(1,2,0) for model_output in zip(*outputs)]

    # build a dict of the predictions
    output_dict = dict(zip(['theta', 'phi', 'dist', 'omega'], averaged_outputs))

    # save the predictions to the output file
    np.savez_compressed(output_file, **output_dict)
    print(f'predictions for {input_file} saved to {output_file}')

# command-line interface
def predict():
    fire.Fire(get_ensembled_predictions)
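
A small sketch (not part of the repository) of consuming the saved predictions; the key names and the (length, length, bins) layout follow from `output_dict` and the `transpose(1, 2, 0)` above:

import numpy as np

preds = np.load('protein.npz')     # hypothetical output from `tr_rosetta protein.a3m`
dist = preds['dist']               # (L, L, 37) inter-residue distance distribution
theta, phi, omega = preds['theta'], preds['phi'], preds['omega']  # (L, L, 25), (L, L, 13), (L, L, 25)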

.\lucidrains\tr-rosetta-pytorch\tr_rosetta_pytorch\tr_rosetta_pytorch.py

# import the torch library
import torch
# import the nn module and the einsum function from torch
from torch import nn, einsum
# import torch.nn.functional as F
import torch.nn.functional as F

# ELU activation
def elu():
    return nn.ELU(inplace=True)

# instance normalization layer
def instance_norm(filters, eps=1e-6, **kwargs):
    return nn.InstanceNorm2d(filters, affine=True, eps=eps, **kwargs)

# convolution layer
def conv2d(in_chan, out_chan, kernel_size, dilation=1, **kwargs):
    # compute the padding size
    padding = dilation * (kernel_size - 1) // 2
    return nn.Conv2d(in_chan, out_chan, kernel_size, padding=padding, dilation=dilation, **kwargs)

# the trRosettaNetwork class, subclassing nn.Module
class trRosettaNetwork(nn.Module):
    def __init__(self, filters=64, kernel=3, num_layers=61):
        super().__init__()
        self.filters = filters
        self.kernel = kernel
        self.num_layers = num_layers

        # first block
        self.first_block = nn.Sequential(
            conv2d(442 + 2 * 42, filters, 1),
            instance_norm(filters),
            elu()
        )

        # stack of residual blocks with cycling dilation rates
        cycle_dilations = [1, 2, 4, 8, 16]
        dilations = [cycle_dilations[i % len(cycle_dilations)] for i in range(num_layers)]

        self.layers = nn.ModuleList([nn.Sequential(
            conv2d(filters, filters, kernel, dilation=dilation),
            instance_norm(filters),
            elu(),
            nn.Dropout(p=0.15),
            conv2d(filters, filters, kernel, dilation=dilation),
            instance_norm(filters)
        ) for dilation in dilations])

        self.activate = elu()

        # convert to angle maps and a distance map
        self.to_prob_theta = nn.Sequential(conv2d(filters, 25, 1), nn.Softmax(dim=1))
        self.to_prob_phi = nn.Sequential(conv2d(filters, 13, 1), nn.Softmax(dim=1))
        self.to_distance = nn.Sequential(conv2d(filters, 37, 1), nn.Softmax(dim=1))
        self.to_prob_bb = nn.Sequential(conv2d(filters, 3, 1), nn.Softmax(dim=1))
        self.to_prob_omega = nn.Sequential(conv2d(filters, 25, 1), nn.Softmax(dim=1))

    # forward pass
    def forward(self, x):
        x = self.first_block(x)

        for layer in self.layers:
            x = self.activate(x + layer(x))

        prob_theta = self.to_prob_theta(x)     # theta angle map
        prob_phi = self.to_prob_phi(x)         # phi angle map

        x = 0.5 * (x + x.permute((0,1,3,2)))   # symmetrize

        prob_distance = self.to_distance(x)    # distance map
        # prob_bb = self.to_prob_bb(x)         # beta-strand pairing (unused)
        prob_omega = self.to_prob_omega(x)     # omega angle map

        return prob_theta, prob_phi, prob_distance, prob_omega
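
As a quick sanity check of the heads above (a sketch with toy sizes, not from the repository), the four outputs are per-pair probability distributions over 25, 13, 37 and 25 bins respectively:

import torch

net = trRosettaNetwork(filters = 8, kernel = 3, num_layers = 2)
x = torch.randn(1, 526, 16, 16)    # (batch, 442 + 2 * 42 channels, length, length)
theta, phi, distance, omega = net(x)
assert theta.shape == (1, 25, 16, 16) and distance.shape == (1, 37, 16, 16)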

.\lucidrains\tr-rosetta-pytorch\tr_rosetta_pytorch\utils.py

# import the required libraries
import string
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn

# d: determine the device a tensor lives on (cpu or cuda)
def d(tensor=None):
    if tensor is None:
        return 'cuda' if torch.cuda.is_available() else 'cpu'
    return 'cuda' if tensor.is_cuda else 'cpu'

# parse an A3M file and convert letters into integers in [0, 20]
def parse_a3m(filename):
    # build a translation table that deletes lowercase letters (insertions)
    table = str.maketrans(dict.fromkeys(string.ascii_lowercase))
    # read the sequences from the A3M file and translate them
    seqs = [line.strip().translate(table) for line in open(filename, 'r') if line[0] != '>']
    # build the amino-acid alphabet and the MSA matrix
    alphabet = np.array(list("ARNDCQEGHILKMFPSTWYV-"), dtype='|S1').view(np.uint8)
    msa = np.array([list(s) for s in seqs], dtype='|S1').view(np.uint8)
    # convert letters into numbers
    for i in range(alphabet.shape[0]):
        msa[msa == alphabet[i]] = i
    # treat all unknown characters as gaps
    msa[msa > 20] = 20
    return msa

# turn a 1-hot MSA into a PSSM
def msa2pssm(msa1hot, w):
    beff = w.sum()
    f_i = (w[:, None, None] * msa1hot).sum(dim=0) / beff + 1e-9
    h_i = (-f_i * torch.log(f_i)).sum(dim=1)
    return torch.cat((f_i, h_i[:, None]), dim=1)

# reweight the MSA based on a sequence-identity cutoff
def reweight(msa1hot, cutoff):
    id_min = msa1hot.shape[1] * cutoff
    id_mtx = torch.einsum('ikl,jkl->ij', msa1hot, msa1hot)
    id_mask = id_mtx > id_min
    w = 1. / id_mask.float().sum(dim=-1)
    return w

# fast DCA (direct coupling analysis) via shrunk covariance inversion
def fast_dca(msa1hot, weights, penalty = 4.5):
    device = msa1hot.device
    nr, nc, ns = msa1hot.shape
    x = msa1hot.view(nr, -1)
    num_points = weights.sum() - torch.sqrt(weights.mean())

    mean = (x * weights[:, None]).sum(dim=0, keepdims=True) / num_points
    x = (x - mean) * torch.sqrt(weights[:, None])
    cov = (x.t() @ x) / num_points

    cov_reg = cov + torch.eye(nc * ns).to(device) * penalty / torch.sqrt(weights.sum())
    inv_cov = torch.inverse(cov_reg)

    x1 = inv_cov.view(nc, ns, nc, ns)
    x2 = x1.transpose(1, 2).contiguous()
    features = x2.reshape(nc, nc, ns * ns)

    x3 = torch.sqrt((x1[:, :-1, :, :-1] ** 2).sum(dim=(1, 3))) * (1 - torch.eye(nc).to(device))
    apc = x3.sum(dim=0, keepdims=True) * x3.sum(dim=1, keepdims=True) / x3.sum()
    contacts = (x3 - apc) * (1 - torch.eye(nc).to(device))
    return torch.cat((features, contacts[:, :, None]), dim=2)

# preprocessing: turn an MSA file into the network input
def preprocess(msa_file, wmin=0.8, ns=21):
    a3m = torch.from_numpy(parse_a3m(msa_file)).long()
    nrow, ncol = a3m.shape

    msa1hot = F.one_hot(a3m, ns).float().to(d())
    w = reweight(msa1hot, wmin).float().to(d())

    # 1d sequence features
    f1d_seq = msa1hot[0, :, :20].float()
    f1d_pssm = msa2pssm(msa1hot, w)
    f1d = torch.cat((f1d_seq, f1d_pssm), dim=1)
    f1d = f1d[None, :, :].reshape((1, ncol, 42))

    # 2d pairwise features
    f2d_dca = fast_dca(msa1hot, w) if nrow > 1 else torch.zeros((ncol, ncol, 442)).float().to(d())
    f2d_dca = f2d_dca[None, :, :, :]

    f2d = torch.cat((
        f1d[:, :, None, :].repeat(1, 1, ncol, 1),
        f1d[:, None, :, :].repeat(1, ncol, 1, 1),
        f2d_dca
    ), dim=-1)
    f2d = f2d.view(1, ncol, ncol, 442 + 2*42)
    return f2d.permute((0, 3, 2, 1))
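
Tracing the shapes above for a hypothetical alignment of length L (the file name is assumed):

feats = preprocess('protein.a3m')   # -> (1, 526, L, L)
# 526 = 442 pairwise DCA channels + 2 * 42 tiled per-residue channels,
# matching the first conv2d(442 + 2 * 42, filters, 1) of trRosettaNetwork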

.\lucidrains\tr-rosetta-pytorch\tr_rosetta_pytorch\__init__.py

# import the trRosettaNetwork class from the tr_rosetta_pytorch module
from tr_rosetta_pytorch.tr_rosetta_pytorch import trRosettaNetwork

Tranception – Pytorch (wip)

Implementation of Tranception, an attention network paired with retrieval that is SOTA for protein fitness prediction. The Transformer architecture is inspired by Primer and uses ALiBi relative positional encoding.

Install

$ pip install tranception-pytorch

Usage

import torch
from tranception_pytorch import Tranception

model = Tranception(
    dim = 512,
    depth = 6,
    heads = 8,
    dim_head = 64
)

amino_acids = torch.randint(0, 21, (1, 512))
logits = model(amino_acids) # (1, 512, 21)

Todo

grouped heads with customizable depthwise convs (for variable k-mers), as well as grouped alibi pos bias
figure out attention to retrieved (looks like axial attention?)
play around with protein gym, and start betting on huggingface's accelerate

Citations

@article{Notin2022TranceptionPF,
title = {Tranception: protein fitness prediction with autoregressive transformers and inference-time retrieval},
author = {Pascal Notin and Mafalda Dias and Jonathan Frazer and Javier Marchena-Hurtado and Aidan N. Gomez and Debora S. Marks and Yarin Gal},
journal = {ArXiv},
year = {2022},
volume = {abs/2205.13760}
}

.\lucidrains\tranception-pytorch\setup.py

# import setup tools and the package finder
from setuptools import setup, find_packages

# set up the package information
setup(
  # package name
  name = 'tranception-pytorch',
  # find all packages, excluding none
  packages = find_packages(exclude=[]),
  # version number
  version = '0.0.8',
  # license
  license = 'MIT',
  # description
  description = 'Tranception - Pytorch',
  # author
  author = 'Phil Wang',
  # author email
  author_email = 'lucidrains@gmail.com',
  # long description content type
  long_description_content_type = 'text/markdown',
  # project link
  url = 'https://github.com/lucidrains/tranception-pytorch',
  # keywords
  keywords = [
    'artificial intelligence',
    'deep learning',
    'transformers',
    'attention mechanism',
    'protein fitness'
  ],
  # dependencies
  install_requires = [
    'einops>=0.4',
    'einops-exts',
    'torch>=1.6',
  ],
  # classifiers
  classifiers = [
    'Development Status :: 4 - Beta',
    'Intended Audience :: Developers',
    'Topic :: Scientific/Engineering :: Artificial Intelligence',
    'License :: OSI Approved :: MIT License',
    'Programming Language :: Python :: 3.6',
  ],
)

.\lucidrains\tranception-pytorch\tranception_pytorch\tranception_pytorch.py

# imports
import math
import torch
import torch.nn.functional as F
from torch import nn, einsum

from einops import rearrange
from einops_exts import rearrange_many
from einops.layers.torch import Rearrange

# helper functions

# check whether a value exists
def exists(val):
    return val is not None

# return the value if it exists, otherwise the default
def default(val, d):
    return val if exists(val) else d

# relative positional bias

# learned ALiBi positional bias
class LearnedAlibiPosBias(nn.Module):
    def __init__(self, heads):
        super().__init__()
        self.heads = heads

        # compute the slopes and make them learnable
        slopes = torch.Tensor(self._get_slopes(heads))
        slopes = rearrange(slopes, 'h -> h 1 1')
        self.slopes = nn.Parameter(slopes)

        # register a non-persistent buffer for the cached bias
        self.register_buffer('bias', None, persistent = False)

    # build the relative position bias
    def get_bias(self, i, j, device):
        i_arange = torch.arange(i, device = device)
        j_arange = torch.arange(j, device = device)
        bias = -torch.abs(rearrange(j_arange, 'j -> 1 1 j') - rearrange(i_arange, 'i -> 1 i 1'))
        return bias

    # static method for deriving the ALiBi slopes
    @staticmethod
    def _get_slopes(heads):
        def get_slopes_power_of_2(n):
            start = (2**(-2**-(math.log2(n)-3)))
            ratio = start
            return [start*ratio**i for i in range(n)]

        if math.log2(heads).is_integer():
            return get_slopes_power_of_2(heads)

        closest_power_of_2 = 2 ** math.floor(math.log2(heads))
        return get_slopes_power_of_2(closest_power_of_2) + get_slopes_power_of_2(2 * closest_power_of_2)[0::2][:heads-closest_power_of_2]

    # forward pass
    def forward(self, qk_sim):
        h, i, j, device = *qk_sim.shape[-3:], qk_sim.device

        # return the cached bias if it is already large enough
        if exists(self.bias) and self.bias.shape[-1] >= j:
            return self.bias[..., :i, :j]

        bias = self.get_bias(i, j, device)
        bias = bias * self.slopes

        # pad out any heads that were not given an alibi bias
        num_heads_unalibied = h - bias.shape[0]
        bias = F.pad(bias, (0, 0, 0, 0, 0, num_heads_unalibied))

        self.register_buffer('bias', bias, persistent = False)
        return bias
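
A minimal sketch (not from the repository) of exercising the module above: the returned bias is a per-head linear distance penalty that broadcasts over the batch dimension of the attention logits.

import torch

alibi = LearnedAlibiPosBias(heads = 8)
sim = torch.randn(2, 8, 16, 16)   # (batch, heads, queries, keys) attention logits
sim = sim + alibi(sim)            # alibi(sim) has shape (8, 16, 16)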
# helper classes

class ReluSquared(nn.Module):
    """ found with neural architecture search in Primer paper """
    def forward(self, x):
        return F.relu(x) ** 2

# feedforward
def FeedForward(dim, mult = 4):
    hidden_dim = int(dim * mult)
    return nn.Sequential(
        nn.LayerNorm(dim),
        nn.Linear(dim, hidden_dim),
        ReluSquared(),
        nn.Linear(hidden_dim, dim)
    )

class DepthwiseConv1d(nn.Module):
    def __init__(self, dim, kernel_size, causal = True):
        super().__init__()
        assert (kernel_size % 2) == 1

        # causal padding pads only on the left
        self.padding = (kernel_size - 1, 0) if causal else (kernel_size // 2, kernel_size // 2)
        self.conv = nn.Conv1d(dim, dim, kernel_size = kernel_size, groups = dim)

    def forward(self, x):
        x = F.pad(x, self.padding)
        return self.conv(x)

class Attention(nn.Module):
    def __init__(
        self,
        *,
        dim,
        heads = 8,
        dim_head = 64,
        causal = False,
        ds_conv_kernel_sizes = (0, 3, 5, 7) # heads were grouped into 4 groups and given a depthwise conv after the queries / keys / values projection
    ):
        super().__init__()
        # one group of heads per conv kernel size; heads must be >= groups and divisible by groups
        self.groups = len(ds_conv_kernel_sizes)
        assert heads >= self.groups and (heads % self.groups) == 0, f'heads must be greater than {self.groups} and divisible by {self.groups}'

        # scale is the inverse square root of the head dimension
        self.scale = dim_head ** -0.5
        # whether to use causal attention
        self.causal = causal

        self.heads = heads
        self.heads_per_group = heads // self.groups

        inner_dim = heads * dim_head

        # layernorm on the input
        self.norm = nn.LayerNorm(dim)

        # 1x1 convolution projecting the input to queries, keys, values
        self.to_qkv = nn.Conv1d(dim, inner_dim * 3, 1, bias = False)

        # depthwise convs of different kernel sizes for the 4 groups of heads
        self.qkv_ds_convs = nn.ModuleList([])

        for _ in range(3): # for queries, keys, values
            ds_convs = nn.ModuleList([])

            for kernel_size in ds_conv_kernel_sizes:
                if kernel_size == 0:
                    ds_convs.append(nn.Identity())
                    continue

                ds_convs.append(DepthwiseConv1d(dim_head * self.heads_per_group, kernel_size, causal = causal))

            self.qkv_ds_convs.append(ds_convs)

        # learned positional bias for each of the 4 groups of heads
        self.learned_alibi_pos_biases = nn.ModuleList([LearnedAlibiPosBias(heads = self.heads_per_group) for _ in range(self.groups)])

        # output projection
        self.to_out = nn.Linear(inner_dim, dim, bias = False)

    def forward(self, x):
        device, heads_per_group = x.device, self.heads_per_group

        # layernorm the input and move the feature dimension in front of the sequence dimension
        x = self.norm(x)
        x = rearrange(x, 'b n d -> b d n')

        # project the input to queries, keys, values
        q, k, v = self.to_qkv(x).chunk(3, dim = 1)

        # split out the heads
        q, k, v = rearrange_many((q, k, v), 'b (h d) n -> b h d n', h = self.heads)

        # apply the causal depthwise convs to the grouped heads
        def apply_causal_ds_conv_to_grouped_heads(args):
            projs, ds_convs = args
            batch = projs.shape[0]

            projs = rearrange_many(projs.split(heads_per_group, dim = 1), 'b h d n -> b (h d) n')
            conv_out = [fn(t) for fn, t in zip(ds_convs, projs)]
            conv_out = map(lambda t: rearrange(t, 'b (h d) n -> b h d n', h = heads_per_group), conv_out)
            conv_out = torch.cat(tuple(conv_out), dim = 1)
            return rearrange(conv_out, 'b h d n -> b h n d')

        q, k, v = map(apply_causal_ds_conv_to_grouped_heads, zip((q, k, v), self.qkv_ds_convs))

        # scale and compute similarity
        q = q * self.scale
        sim = einsum('b h i d, b h j d -> b h i j', q, k)

        # apply the learned positional bias to the 4 groups of heads
        grouped_sims = sim.split(self.heads // self.groups, dim = 1)
        grouped_sims = [(alibi(sim_group) + sim_group) for alibi, sim_group in zip(self.learned_alibi_pos_biases, grouped_sims)]

        sim = torch.cat(grouped_sims, dim = 1)

        # causal mask
        if self.causal:
            i, j = sim.shape[-2:]
            causal_mask = torch.ones((i, j), dtype = torch.bool, device = device).triu(j - i + 1)
            sim = sim.masked_fill(causal_mask, -torch.finfo(sim.dtype).max)

        # attention
        attn = sim.softmax(dim = -1)
        out = einsum('b h i j, b h j d -> b h i d', attn, v)

        # merge the heads
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)

# the Tranception class
class Tranception(nn.Module):
    def __init__(
        self,
        *,
        dim,                                 # feature dimension
        depth,                               # model depth
        num_tokens = 21,                     # number of tokens, default 21
        heads = 8,                           # number of attention heads, default 8
        dim_head = 64,                       # dimension per head, default 64
        ff_mult = 4,                         # feedforward expansion factor, default 4
        ds_conv_kernel_sizes = (0, 3, 5, 7), # depthwise conv kernel sizes, default (0, 3, 5, 7)
        causal = True                        # whether to use causal attention, default True
    ):
        super().__init__()
        self.token_emb = nn.Embedding(num_tokens, dim) # token embedding

        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                Attention(dim = dim, heads = heads, dim_head = dim_head, ds_conv_kernel_sizes = ds_conv_kernel_sizes, causal = causal), # attention layer
                FeedForward(dim, mult = ff_mult) # feedforward layer
            ]))

        self.to_logits = nn.Sequential(
            nn.LayerNorm(dim),          # layernorm
            nn.Linear(dim, num_tokens)  # linear projection to logits
        )

    # forward pass, taking the input x and an (unused) mask, default None
    def forward(
        self,
        x,
        mask = None
    ):
        x = self.token_emb(x) # embed the tokens

        for attn, ff in self.layers:
            x = attn(x) + x # attention with residual
            x = ff(x) + x   # feedforward with residual

        return self.to_logits(x) # project to logits

.\lucidrains\tranception-pytorch\tranception_pytorch\__init__.py

# import the Tranception class from the tranception_pytorch.tranception_pytorch module
from tranception_pytorch.tranception_pytorch import Tranception

Transformer in Transformer

Implementation of Transformer in Transformer, pixel level attention paired with patch level attention for image classification, in Pytorch.

AI Coffee Break with Letitia

Install

$ pip install transformer-in-transformer

Usage

import torch
from transformer_in_transformer import TNT

tnt = TNT(
    image_size = 256,    # size of image
    patch_dim = 512,     # dimension of patch token
    pixel_dim = 24,      # dimension of pixel token
    patch_size = 16,     # patch size
    pixel_size = 4,      # pixel size
    depth = 6,           # depth
    num_classes = 1000,  # output number of classes
    attn_dropout = 0.1,  # attention dropout
    ff_dropout = 0.1     # feedforward dropout
)

img = torch.randn(2, 3, 256, 256)
logits = tnt(img) # (2, 1000)

Citations

@misc{han2021transformer,
title = {Transformer in Transformer},
author = {Kai Han and An Xiao and Enhua Wu and Jianyuan Guo and Chunjing Xu and Yunhe Wang},
year = {2021},
eprint = {2103.00112},
archivePrefix = {arXiv},
primaryClass = {cs.CV}
}

.\lucidrains\transformer-in-transformer\setup.py

# import setup tools and the package finder
from setuptools import setup, find_packages

# set up the package metadata
setup(
  name = 'transformer-in-transformer',                  # package name
  packages = find_packages(),                           # find all packages
  version = '0.1.2',                                    # version number
  license = 'MIT',                                      # license
  description = 'Transformer in Transformer - Pytorch', # description
  author = 'Phil Wang',                                 # author
  author_email = 'lucidrains@gmail.com',                # author email
  url = 'https://github.com/lucidrains/transformer-in-transformer', # project link
  keywords = [                                          # keywords
    'artificial intelligence',
    'deep learning',
    'transformer',
    'image classification'
  ],
  install_requires = [                                  # dependencies
    'einops>=0.3',
    'torch>=1.6'
  ],
  classifiers = [                                       # classifiers
    'Development Status :: 4 - Beta',
    'Intended Audience :: Developers',
    'Topic :: Scientific/Engineering :: Artificial Intelligence',
    'License :: OSI Approved :: MIT License',
    'Programming Language :: Python :: 3.6',
  ],
)

.\lucidrains\transformer-in-transformer\transformer_in_transformer\tnt.py

# import the torch library
import torch
# import the torch functional library
import torch.nn.functional as F
# import the nn module and the einsum function from torch
from torch import nn, einsum
# import rearrange and repeat from einops
from einops import rearrange, repeat
# import the Rearrange class from einops.layers.torch
from einops.layers.torch import Rearrange

# helper functions

# check whether a value exists
def exists(val):
    return val is not None

# return the value if it exists, otherwise the default
def default(val, d):
    return val if exists(val) else d

# check whether a value is divisible by a divisor
def divisible_by(val, divisor):
    return (val % divisor) == 0

# compute the output size of an unfold
def unfold_output_size(image_size, kernel_size, stride, padding):
    return int(((image_size - kernel_size + (2 * padding)) / stride) + 1)

# classes

# pre-normalization layer
class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        # layernorm the input before passing it to the wrapped function
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)

# feedforward network: linear -> GELU -> dropout -> linear
class FeedForward(nn.Module):
    def __init__(self, dim, mult = 4, dropout = 0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, dim * mult),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(dim * mult, dim)
        )

    def forward(self, x):
        return self.net(x)

# attention
class Attention(nn.Module):
    def __init__(
        self,
        *,
        dim,
        heads = 8,
        dim_head = 64,
        dropout = 0.
    ):
        super().__init__()
        inner_dim = heads * dim_head
        self.heads = heads
        self.scale = dim_head ** -0.5

        # project the input to queries, keys, values
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        # output: linear -> dropout
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        b, n, d, h = *x.shape, self.heads
        q, k, v = self.to_qkv(x).chunk(3, dim = -1)
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> (b h) n d', h = h), (q, k, v))

        # attention scores
        sim = einsum('b i d, b j d -> b i j', q, k) * self.scale
        attn = sim.softmax(dim = -1)

        # aggregate the values
        out = einsum('b i j, b j d -> b i d', attn, v)
        out = rearrange(out, '(b h) n d -> b n (h d)', h = h)
        return self.to_out(out)

# main class
class TNT(nn.Module):
    def __init__(
        self,
        *,
        image_size,
        patch_dim,
        pixel_dim,
        patch_size,
        pixel_size,
        depth,
        num_classes,
        channels = 3,
        heads = 8,
        dim_head = 64,
        ff_dropout = 0.,
        attn_dropout = 0.,
        unfold_args = None
    ):
        super().__init__()
        # the image must be divisible by the patch size
        assert divisible_by(image_size, patch_size), 'image size must be divisible by patch size'
        # the patch must be divisible by the pixel size
        assert divisible_by(patch_size, pixel_size), 'patch size must be divisible by pixel size for now'

        # number of patch tokens
        num_patch_tokens = (image_size // patch_size) ** 2

        self.image_size = image_size
        self.patch_size = patch_size
        self.patch_tokens = nn.Parameter(torch.randn(num_patch_tokens + 1, patch_dim))

        # default unfold arguments
        unfold_args = default(unfold_args, (pixel_size, pixel_size, 0))
        unfold_args = (*unfold_args, 0) if len(unfold_args) == 2 else unfold_args
        kernel_size, stride, padding = unfold_args

        # pixel width and number of pixels per patch
        pixel_width = unfold_output_size(patch_size, kernel_size, stride, padding)
        num_pixels = pixel_width ** 2

        # module that turns pixels into tokens
        self.to_pixel_tokens = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> (b h w) c p1 p2', p1 = patch_size, p2 = patch_size),
            nn.Unfold(kernel_size = kernel_size, stride = stride, padding = padding),
            Rearrange('... c n -> ... n c'),
            nn.Linear(channels * kernel_size ** 2, pixel_dim)
        )

        # patch and pixel positional embeddings
        self.patch_pos_emb = nn.Parameter(torch.randn(num_patch_tokens + 1, patch_dim))
        self.pixel_pos_emb = nn.Parameter(torch.randn(num_pixels, pixel_dim))

        # model layers
        layers = nn.ModuleList([])
        for _ in range(depth):

            # module that folds the pixel tokens back into a patch residual
            pixel_to_patch = nn.Sequential(
                nn.LayerNorm(pixel_dim),
                Rearrange('... n d -> ... (n d)'),
                nn.Linear(pixel_dim * num_pixels, patch_dim),
            )

            layers.append(nn.ModuleList([
                PreNorm(pixel_dim, Attention(dim = pixel_dim, heads = heads, dim_head = dim_head, dropout = attn_dropout)),
                PreNorm(pixel_dim, FeedForward(dim = pixel_dim, dropout = ff_dropout)),
                pixel_to_patch,
                PreNorm(patch_dim, Attention(dim = patch_dim, heads = heads, dim_head = dim_head, dropout = attn_dropout)),
                PreNorm(patch_dim, FeedForward(dim = patch_dim, dropout = ff_dropout)),
            ]))

        self.layers = layers

        # classification head
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(patch_dim),
            nn.Linear(patch_dim, num_classes)
        )

    # forward pass
    def forward(self, x):
        # read off the input shape and the model parameters
        b, _, h, w, patch_size, image_size = *x.shape, self.patch_size, self.image_size
        # the input height and width must be divisible by the patch size
        assert divisible_by(h, patch_size) and divisible_by(w, patch_size), f'height {h} and width {w} of input must be divisible by the patch size'

        # number of patches
        num_patches_h = h // patch_size
        num_patches_w = w // patch_size
        n = num_patches_w * num_patches_h

        # turn the input into pixel tokens and patch tokens
        pixels = self.to_pixel_tokens(x)
        patches = repeat(self.patch_tokens[:(n + 1)], 'n d -> b n d', b = b)

        # add the patch and pixel positional embeddings
        patches += rearrange(self.patch_pos_emb[:(n + 1)], 'n d -> () n d')
        pixels += rearrange(self.pixel_pos_emb, 'n d -> () n d')

        # run the layers: pixel attention, pixel feedforward, pixel -> patch residual, patch attention, patch feedforward
        for pixel_attn, pixel_ff, pixel_to_patch_residual, patch_attn, patch_ff in self.layers:

            pixels = pixel_attn(pixels) + pixels
            pixels = pixel_ff(pixels) + pixels

            patches_residual = pixel_to_patch_residual(pixels)

            patches_residual = rearrange(patches_residual, '(b h w) d -> b (h w) d', h = num_patches_h, w = num_patches_w)
            patches_residual = F.pad(patches_residual, (0, 0, 1, 0), value = 0) # cls token gets residual of 0

            patches = patches + patches_residual
            patches = patch_attn(patches) + patches
            patches = patch_ff(patches) + patches

        # take the class token and classify through the MLP head
        cls_token = patches[:, 0]
        return self.mlp_head(cls_token)
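
A worked example of the `unfold_output_size` helper above, using the same hyperparameters as the README usage (the default unfold arguments follow from `default(unfold_args, (pixel_size, pixel_size, 0))`):

# patch_size = 16, pixel_size = 4 -> kernel 4, stride 4, padding 0
assert unfold_output_size(16, 4, 4, 0) == 4   # pixel_width = ((16 - 4 + 0) / 4) + 1
# each patch therefore carries 4 * 4 = 16 pixel tokens,
# and a 256x256 image yields (256 / 16) ** 2 = 256 patches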

.\lucidrains\transformer-in-transformer\transformer_in_transformer\__init__.py

# import the TNT class from the transformer_in_transformer package
from transformer_in_transformer.tnt import TNT

Transframer – Pytorch (wip)

Implementation of Transframer, Deepmind’s U-net + Transformer architecture for up to 30 seconds video generation, in Pytorch

The gist of the paper is the usage of a Unet as a multi-frame encoder, along with a regular transformer decoder cross attending and predicting the rest of the frames. The author builds upon his prior work where images are encoded as sparse discrete cosine transform (DCT) sequences.

I will deviate from the implementation in this paper, using a hierarchical autoregressive transformer, and just a regular resnet block in place of the NF-net block (this design choice is just Deepmind reusing their own code, as NF-net was developed at Deepmind by Brock et al).

Update: On further meditation, there is nothing new in this paper except for generative modeling on DCT representations

Appreciation

This work would not be possible without the generous sponsorship from Stability AI, as well as my other sponsors

Todo

figure out if dct can be directly extracted from images in jpeg format

Citations

@article{Nash2022TransframerAF,
title = {Transframer: Arbitrary Frame Prediction with Generative Models},
author = {Charlie Nash and Jo{\\~a}o Carreira and Jacob Walker and Iain Barr and Andrew Jaegle and Mateusz Malinowski and Peter W. Battaglia},
journal = {ArXiv},
year = {2022},
volume = {abs/2203.09494}
}

.\lucidrains\transframer-pytorch\setup.py

# import setup tools and the package finder
from setuptools import setup, find_packages

# set up the package metadata
setup(
  name = 'transframer-pytorch',                     # package name
  packages = find_packages(exclude=[]),             # find all packages
  version = '0.0.1',                                # version number
  license = 'MIT',                                  # license
  description = 'Transframer - Pytorch',            # description
  author = 'Phil Wang',                             # author
  author_email = 'lucidrains@gmail.com',            # author email
  long_description_content_type = 'text/markdown',  # long description content type
  url = 'https://github.com/lucidrains/transframer-pytorch', # project link
  keywords = [                                      # keywords
    'artificial intelligence',
    'deep learning',
    'transformers',
    'attention mechanism',
    'unets',
    'video generation'
  ],
  install_requires = [                              # dependencies
    'einops>=0.4',
    'kornia',
    'torch>=1.6',
  ],
  classifiers = [                                   # classifiers
    'Development Status :: 4 - Beta',
    'Intended Audience :: Developers',
    'Topic :: Scientific/Engineering :: Artificial Intelligence',
    'License :: OSI Approved :: MIT License',
    'Programming Language :: Python :: 3.6',
  ],
)

.\lucidrains\transframer-pytorch\transframer_pytorch\transframer_pytorch.py

# import sqrt and pi from the math module
from math import sqrt, pi
# import partial from the functools module
from functools import partial

import torch
# import torch.nn.functional as F
import torch.nn.functional as F
# import fft and irfft from the torch.fft module
from torch.fft import fft, irfft
# import nn and einsum from torch
from torch import nn, einsum
# import rearrange and repeat from einops
from einops import rearrange, repeat
# import rgb_to_ycbcr and ycbcr_to_rgb from kornia.color.ycbcr
from kornia.color.ycbcr import rgb_to_ycbcr, ycbcr_to_rgb

# helpers

# check whether a value exists
def exists(val):
    return val is not None

# return the value if it exists, otherwise the default
def default(val, d):
    return val if exists(val) else d

# tensor helpers

# l2-normalize a tensor
def l2norm(t):
    return F.normalize(t, dim = -1)

# dct related encoding / decoding functions

# discrete cosine transform
# function from https://github.com/zh217/torch-dct/blob/master/torch_dct/_dct.py
# fixed for most torch versions > 1.9, using the latest fft and irfft
def dct(x, norm = None):
    shape, dtype, device = x.shape, x.dtype, x.device
    N = shape[-1]

    x = rearrange(x.contiguous(), '... n -> (...) n')

    v = torch.cat([x[:, ::2], x[:, 1::2].flip((1,))], dim = 1)

    vc = torch.view_as_real(fft(v, dim=1))

    k = -torch.arange(N, dtype = dtype, device = device) * pi / (2 * N)
    k = rearrange(k, 'n -> 1 n')

    v = vc[:, :, 0] * k.cos() - vc[:, :, 1] * k.sin()

    if norm == 'ortho':
        v[:, 0] /= sqrt(N) * 2
        v[:, 1:] /= sqrt(N / 2) * 2

    v *= 2
    return v.view(*shape)

# inverse discrete cosine transform
def idct(x, norm = None):
    shape, dtype, device = x.shape, x.dtype, x.device
    N = shape[-1]

    x_v = rearrange(x.contiguous(), '... n -> (...) n') / 2

    if norm == 'ortho':
        x_v[:, 0] *= sqrt(N) * 2
        x_v[:, 1:] *= sqrt(N / 2) * 2

    k = torch.arange(N, dtype = dtype, device = device) * pi / (2 * N)
    k = rearrange(k, 'n -> 1 n')
    w_r = torch.cos(k)
    w_i = torch.sin(k)

    v_t_r = x_v
    v_t_i = torch.cat([x_v[:, :1] * 0, -x_v.flip((1,))[:, :-1]], dim = 1)

    v_r = v_t_r * w_r - v_t_i * w_i
    v_i = v_t_r * w_i + v_t_i * w_r

    v = torch.stack((v_r, v_i), dim = -1)

    v = irfft(torch.view_as_complex(v), n = N, dim = 1)
    x = torch.zeros_like(v)
    x[:, ::2] += v[:, :N - (N // 2)]
    x[:, 1::2] += v.flip((1,))[:, :N // 2]

    return x.view(*shape)

# 2d discrete cosine transform
def dct_2d(x, norm = None):
    dct_ = partial(dct, norm = norm)
    x1 = dct_(x)
    x2 = dct_(rearrange(x1, '... h w -> ... w h'))
    return rearrange(x2, '... h w -> ... w h')

# 2d inverse discrete cosine transform
def idct_2d(x, norm = None):
    idct_ = partial(idct, norm = norm)
    x1 = idct_(x)
    x2 = idct_(rearrange(x1, '... h w -> ... w h'))
    return rearrange(x2, '... h w -> ... w h')
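
A quick check (not in the original file) that the orthonormal transform pair above round-trips:

import torch

x = torch.randn(4, 8, 8)
recon = idct_2d(dct_2d(x, norm = 'ortho'), norm = 'ortho')
assert torch.allclose(recon, x, atol = 1e-4)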
# blockify: split a tensor into blocks
def blockify(x, block_size = 8):
    assert block_size in {8, 16}
    return rearrange(x, 'b c (h bs1) (w bs2) -> (b h w) c bs1 bs2', bs1 = block_size, bs2 = block_size)

# deblockify: restore a blockified tensor to its original shape
def deblockify(x, h, w, block_size = 8):
    assert block_size in {8, 16}
    return rearrange(x, '(b h w) c bs1 bs2 -> b c (h bs1) (w bs2)', h = h, w = w)

# final functions from rgb -> dct and back

# convert images to their DCT representation (not yet implemented)
def images_to_dct(images):
    raise NotImplementedError

# convert a DCT representation back to images (not yet implemented)
def dct_to_images(images):
    raise NotImplementedError

# feedforward

def FeedForward(
    dim,
    *,
    mult = 4.
):
    inner_dim = int(dim * mult)
    return nn.Sequential(
        nn.LayerNorm(dim),
        nn.Linear(dim, inner_dim, bias = False),
        nn.GELU(),
        nn.LayerNorm(inner_dim), # from normformer paper
        nn.Linear(inner_dim, dim, bias = False)
    )

# attention, what else?
# here we will use one headed key / values (as described in the paper, from Noam Shazeer) - along with cosine sim attention

class Attention(nn.Module):
    def __init__(
        self,
        dim,
        *,
        dim_head = 64,
        heads = 8,
        scale = 10,
        causal = False,
        norm_context = False
    ):
        super().__init__()
        self.heads = heads
        self.scale = scale
        self.causal = causal

        self.norm = nn.LayerNorm(dim)
        self.norm_context = nn.LayerNorm(dim) if norm_context else nn.Identity()

        self.to_q = nn.Linear(dim, dim_head * heads, bias = False)
        self.to_kv = nn.Linear(dim, dim_head * 2, bias = False)
        self.to_out = nn.Linear(dim_head * heads, dim, bias = False)

    # forward pass, taking the input x, an optional context and context mask
    def forward(
        self,
        x,
        context = None,
        context_mask = None
    ):
        h, scale, causal, device = self.heads, self.scale, self.causal, x.device

        # normalize the input
        x = self.norm(x)

        # use the context if it was given, otherwise attend to the input itself
        context = default(context, x)

        # project the input to queries and split out the heads
        q = self.to_q(x)
        q = rearrange(q, 'b n (h d) -> b h n d', h = h)

        # normalize the context if it was given
        if exists(context):
            context = self.norm_context(context)

        # project the context to (single-headed) keys and values
        k, v = self.to_kv(context).chunk(2, dim = -1)

        # l2-normalize queries and keys (cosine sim attention)
        q, k = map(l2norm, (q, k))

        # similarity matrix between queries and keys
        sim = einsum('b h i d, b j d -> b h i j', q, k) * self.scale

        # mask value used to fill the similarity matrix
        mask_value = -torch.finfo(sim.dtype).max

        # mask out context positions where the mask is False, if a context mask was given
        if exists(context_mask):
            context_mask = rearrange(context_mask, 'b j -> b 1 1 j')
            sim = sim.masked_fill(~context_mask, mask_value)

        # causal masking
        if causal:
            i, j = sim.shape[-2:]
            causal_mask = torch.ones((i, j), dtype = torch.bool, device = device).triu(j - i + 1)
            sim = sim.masked_fill(causal_mask, mask_value)

        # attention weights
        attn = sim.softmax(dim = -1)

        # aggregate the values with the attention weights
        out = einsum('b h i j, b j d -> b h i d', attn, v)

        # merge the heads
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)

# conv block: conv -> groupnorm -> silu
class Block(nn.Module):
    def __init__(
        self,
        dim,
        dim_out,
        groups = 8
    ):
        super().__init__()
        self.proj = nn.Conv2d(dim, dim_out, 3, padding = 1)
        self.norm = nn.GroupNorm(groups, dim_out)
        self.act = nn.SiLU()

    def forward(self, x):
        x = self.proj(x)
        x = self.norm(x)
        return self.act(x)

# resnet block built from two conv blocks, with a residual projection
class ResnetBlock(nn.Module):
    def __init__(
        self,
        dim,
        dim_out,
        groups = 8
    ):
        super().__init__()
        self.block1 = Block(dim, dim_out, groups = groups)
        self.block2 = Block(dim_out, dim_out, groups = groups)
        # project the residual if the dimensions differ, otherwise identity
        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x):
        h = self.block1(x)
        h = self.block2(h)
        return h + self.res_conv(x)

# transformer block used inside the unet
class UnetTransformerBlock(nn.Module):
    def __init__(
        self,
        dim,
        *,
        dim_head = 32,
        heads = 8
    ):
        super().__init__()
        self.attn = Attention(dim = dim, dim_head = dim_head, heads = heads)
        self.ff = FeedForward(dim = dim)

    def forward(self, x):
        # remember the original shape, flatten the spatial dimensions into a sequence
        orig_shape = x.shape
        x = rearrange(x, 'b c ... -> b (...) c')

        # attention and feedforward, each with a residual
        x = self.attn(x) + x
        x = self.ff(x) + x

        # restore the original shape
        x = rearrange(x, 'b n c -> b c n')
        return x.reshape(*orig_shape)

# unet
class Unet(nn.Module):
    def __init__(
        self,
        dim,
        *,
        dim_mults = (1, 2, 3, 4),
        dim_out,
        **attn_kwargs
    ):
        super().__init__()
        self.to_out = nn.Conv2d(dim, dim_out, 1)

        # dimensions at each resolution
        dims = [dim, *map(lambda t: t * dim, dim_mults)]
        dim_pairs = tuple(zip(dims[:-1], dims[1:]))
        # the middle dimension is the last element of dims
        mid_dim = dims[-1]

        self.downs = nn.ModuleList([])
        self.ups = nn.ModuleList([])

        # middle resnet block
        self.mid = ResnetBlock(mid_dim, mid_dim)

        for dim_in, dim_out in dim_pairs:
            # downsampling stage
            self.downs.append(nn.ModuleList([
                ResnetBlock(dim_in, dim_in),
                UnetTransformerBlock(dim_in, **attn_kwargs),
                nn.Conv2d(dim_in, dim_out, 3, 2, 1)
            ]))

            # matching upsampling stage, inserted in reverse order
            self.ups.insert(0, nn.ModuleList([
                ResnetBlock(dim_out * 2, dim_out),
                UnetTransformerBlock(dim_out, **attn_kwargs),
                nn.ConvTranspose2d(dim_out, dim_in, 4, 2, 1)
            ]))

    def forward(self, x):
        # keep the hidden states of each downsampling stage for the skip connections
        hiddens = []

        for block, attn_block, downsample in self.downs:
            x = block(x)
            x = attn_block(x)
            x = downsample(x)
            hiddens.append(x)

        x = self.mid(x)

        for block, attn_block, upsample in self.ups:
            x = torch.cat((x, hiddens.pop()), dim = 1)
            x = block(x)
            x = attn_block(x)
            x = upsample(x)

        # project to the output channels and flatten the spatial dimensions
        out = self.to_out(x)
        return rearrange(out, 'b c h w -> b (h w) c')

# main class
class Transframer(nn.Module):
    def __init__(
        self,
        *,
        unet: Unet,
        dim,
        depth,
        max_channels,
        max_positions,
        max_values,
        image_size,
        block_size = 8,
        dim_head = 32,
        heads = 8,
        ff_mult = 4.,
        ignore_index = -100
    ):
        super().__init__()
        # the unet encoder
        self.unet = unet

        # start token
        self.start_token = nn.Parameter(torch.randn(dim))

        # block positional embedding
        self.block_pos_emb = nn.Parameter(torch.randn(2, (image_size // block_size), dim))

        # embeddings for channel, position and value tokens
        self.channels = nn.Embedding(max_channels, dim)
        self.positions = nn.Embedding(max_positions, dim)
        self.values = nn.Embedding(max_values, dim)

        # post-embedding norm, done in Bloom and YaLM for stability
        self.postemb_norm = nn.LayerNorm(dim)

        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                Attention(dim, dim_head = dim_head, heads = heads, causal = True),
                Attention(dim, dim_head = dim_head, heads = heads, norm_context = True),
                FeedForward(dim, mult = ff_mult)
            ]))

        self.final_norm = nn.LayerNorm(dim)

        # give the channels and positions separate embeddings for the final prediction
        self.axial_channels = nn.Embedding(max_channels, dim)
        self.axial_positions = nn.Embedding(max_positions, dim)

        self.axial_attn = Attention(dim, dim_head = dim_head, heads = heads, causal = True)
        self.axial_ff = FeedForward(dim, mult = ff_mult)

        self.axial_final_norm = nn.LayerNorm(dim)

        # projections to logits
        self.to_channel_logits = nn.Linear(dim, max_channels)
        self.to_position_logits = nn.Linear(dim, max_positions)
        self.to_value_logits = nn.Linear(dim, max_values)

        self.ignore_index = ignore_index

    # fetch the block positional embedding
    def get_block_pos_emb(self):
        block_pos_emb_h, block_pos_emb_w = self.block_pos_emb.unbind(dim = 0)
        block_pos_emb = rearrange(block_pos_emb_h, 'h d -> h 1 d') + rearrange(block_pos_emb_w, 'w d -> 1 w d')
        return rearrange(block_pos_emb, '... d -> (...) d')

    # forward pass
    def forward(
        self,
        x,
        context_frames,
        return_loss = False
    ):
        # the input holds (channel, position, value) triples
        assert x.shape[-1] == 3

        # encode the context frames with the unet
        encoded = self.unet(context_frames)

        batch = x.shape[0]

        # split the input into channels, positions and values
        channels, positions, values = x.unbind(dim = -1)

        # embed each and sum
        channel_emb = self.channels(channels)
        position_emb = self.positions(positions)
        value_emb = self.values(values)

        embed = channel_emb + position_emb + value_emb

        # prepend the start token
        start_token = repeat(self.start_token, 'd -> b 1 d', b = batch)
        embed = torch.cat((start_token, embed), dim = 1)

        # when computing the loss, drop the last position
        if return_loss:
            embed = embed[:, :-1]

        embed = self.postemb_norm(embed)

        # attention layers + cross attention to the encoded context
        for attn, cross_attn, ff in self.layers:
            embed = attn(embed) + embed
            embed = cross_attn(embed, encoded) + embed
            embed = ff(embed) + embed

        embed = self.final_norm(embed)

        # axial attention, from the summed channel + position + value embedding
        # to the next channel -> the next position
        axial_channels_emb = self.axial_channels(channels)
        axial_positions_emb = self.axial_positions(positions)

        # stack the embedding with the axial embeddings
        embed = torch.stack((embed, axial_channels_emb, axial_positions_emb), dim = -2)
        embed = rearrange(embed, 'b m n d -> (b m) n d')

        embed = self.axial_attn(embed) + embed
        embed = self.axial_ff(embed) + embed

        embed = self.axial_final_norm(embed)

        embed = rearrange(embed, '(b m) n d -> b m n d', b = batch)

        # separate the predicted channel, position and value embeddings
        pred_channel_embed, pred_position_embed, pred_value_embed = embed.unbind(dim = -2)

        # project to logits
        channel_logits = self.to_channel_logits(pred_channel_embed)
        position_logits = self.to_position_logits(pred_position_embed)
        value_logits = self.to_value_logits(pred_value_embed)

        # return the channel, position and value logits if no loss is needed
        if not return_loss:
            return channel_logits, position_logits, value_logits

        # move the class dimension in front of the sequence dimension for cross entropy
        channel_logits, position_logits, value_logits = map(lambda t: rearrange(t, 'b n c -> b c n'), (channel_logits, position_logits, value_logits))

        # cross entropy loss
        ce = partial(F.cross_entropy, ignore_index = self.ignore_index)

        # compute the channel, position and value losses
        channel_loss = ce(channel_logits, channels)
        position_loss = ce(position_logits, positions)
        value_loss = ce(value_logits, values)

        # return the average of the three losses
        return (channel_loss + position_loss + value_loss) / 3
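
A hedged end-to-end sketch (toy hyperparameters, not from the repository) wiring the Unet encoder into the Transframer decoder; note the Unet's `dim_out` must match the decoder's `dim`, and `context_frames` carries `dim` channels:

import torch

unet = Unet(dim = 16, dim_out = 128, dim_mults = (1, 2, 3, 4))

model = Transframer(
    unet = unet,
    dim = 128,
    depth = 2,
    max_channels = 3,
    max_positions = 1024,
    max_values = 256,
    image_size = 64
)

context_frames = torch.randn(1, 16, 64, 64)   # encoded by the Unet into the cross-attention context

channels  = torch.randint(0, 3,    (1, 256))  # (channel, position, value) triples
positions = torch.randint(0, 1024, (1, 256))
values    = torch.randint(0, 256,  (1, 256))
x = torch.stack((channels, positions, values), dim = -1)

loss = model(x, context_frames, return_loss = True)
loss.backward()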
