.\\lucidrains\\chroma-pytorch\\chroma_pytorch\\semantic_conditioner.py
# import the necessary libraries
import torch
import os
import logging
from transformers import AutoTokenizer, AutoModelForMaskedLM, logging
from tf_bind_transformer.cache_utils import cache_fn, run_once
# set the logging level to error
logging.set_verbosity_error()
# check whether a value exists
def exists(val):
    return val is not None
# apply a function to the values of a dictionary
def map_values(fn, dictionary):
    return {k: fn(v) for k, v in dictionary.items()}
# check whether the environment variable forcing context embeddings onto the CPU is set
CONTEXT_EMBED_USE_CPU = os.getenv('CONTEXT_EMBED_USE_CPU', None) is not None
# if the CPU is configured for context embeddings, print a notice
if CONTEXT_EMBED_USE_CPU:
    print('calculating context embed only on cpu')
# predefined model dimensions and paths
MODELS = dict(
    pubmed = dict(
        dim = 768,
        path = 'microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract',
    )
)
# global variables for storing the model and tokenizer
GLOBAL_VARIABLES = dict(model = None, tokenizer = None)
# get the contextual dimension of the specified model
def get_contextual_dim(model_name):
    assert model_name in MODELS
    return MODELS[model_name]['dim']
# initialize the model and tokenizer, runs only once
@run_once('init_transformer')
def init_transformer(model_name):
    path = MODELS[model_name]['path']
    GLOBAL_VARIABLES['tokenizer'] = AutoTokenizer.from_pretrained(path)
    model = AutoModelForMaskedLM.from_pretrained(path)
    # move the model to the GPU unless CPU was requested for context embeddings
    if not CONTEXT_EMBED_USE_CPU:
        model = model.cuda()
    GLOBAL_VARIABLES['model'] = model
# tokenize and encode text
@torch.no_grad()
def tokenize_text(
    text,
    max_length = 256,
    model_name = 'pubmed',
    hidden_state_index = -1,
    return_cls_token = True
):
    init_transformer(model_name)
    model = GLOBAL_VARIABLES['model']
    tokenizer = GLOBAL_VARIABLES['tokenizer']
    encoding = tokenizer.batch_encode_plus(
        [text],
        add_special_tokens = True,
        padding = True,
        truncation = True,
        max_length = max_length,
        return_attention_mask = True,
        return_tensors = 'pt'
    )
    # move the encoding to the GPU unless CPU was requested for context embeddings
    if not CONTEXT_EMBED_USE_CPU:
        encoding = map_values(lambda t: t.cuda(), encoding)
    model.eval()
    with torch.no_grad():
        outputs = model(**encoding, output_hidden_states = True)
    hidden_state = outputs.hidden_states[hidden_state_index][0]
    if return_cls_token:
        return hidden_state[0]
    return hidden_state.mean(dim = 0)
# get text representations
def get_text_repr(
    texts,
    *,
    device,
    max_length = 256,
    model_name = 'pubmed',
    hidden_state_index = -1,
    return_cls_token = True,
):
    assert model_name in MODELS, f'{model_name} not found in available text transformers'
    # if the input is a single string, wrap it in a list
    if isinstance(texts, str):
        texts = [texts]
    # cache the text representation function
    get_context_repr_fn = cache_fn(tokenize_text, path = f'contexts/{model_name}')
    # get the representation of each text
    representations = [get_context_repr_fn(text, max_length = max_length, model_name = model_name, hidden_state_index = hidden_state_index, return_cls_token = return_cls_token) for text in texts]
    return torch.stack(representations).to(device)
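To make the flow concrete, here is a minimal usage sketch of get_text_repr (a sketch, not from the repo: it assumes a CUDA device or the CONTEXT_EMBED_USE_CPU environment variable, and that tf_bind_transformer is installed so cache_fn can cache results under contexts/pubmed; the input strings are hypothetical):
# minimal usage sketch (uses the definitions above; texts are hypothetical)
reprs = get_text_repr(
    ['p53 binds DNA', 'zinc finger motif'],
    device = torch.device('cuda'),
    model_name = 'pubmed',
    return_cls_token = True
)
print(reprs.shape)  # (2, 768) - one CLS embedding per input string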
.\\lucidrains\\chroma-pytorch\\chroma_pytorch\\__init__.py
# import the Chroma class from the chroma_pytorch package
from chroma_pytorch.chroma_pytorch import Chroma
Paper Figure 1
Generating proteins that bind the coronavirus spike protein - concurrent RFdiffusion work out of the Baker lab
Chroma - Pytorch (wip)
Implementation of Chroma, a generative model for proteins using DDPM and GNNs, in Pytorch. The paper seems to suggest a slight lift over RFdiffusion can be had by applying denoising diffusion probabilistic models to protein design.
Commentary by Stephen Hale
If you are interested in open sourcing works like these, please consider joining OpenBioML.
Todo
use Galactica
Citations
@misc{
    title  = {Illuminating protein space with a programmable generative model},
    author = {John Ingraham, Max Baranov, Zach Costello, Vincent Frappier, Ahmed Ismail, Shan Tie, Wujie Wang, Vincent Xue, Fritz Obermeyer, Andrew Beam, Gevorg Grigoryan},
    year   = {2022},
    url    = {https://cdn.generatebiomedicines.com/assets/ingraham2022.pdf}
}
.\\lucidrains\\chroma-pytorch\\setup.py
# import setup utilities and package discovery
from setuptools import setup, find_packages
# set up the package metadata
setup(
    name = 'chroma-pytorch',  # package name
    packages = find_packages(exclude=[]),  # find all packages
    version = '0.0.1',  # version number
    license = 'MIT',  # license
    description = 'Chroma - Pytorch',  # description
    author = 'Phil Wang',  # author
    author_email = 'lucidrains@gmail.com',  # author's email address
    long_description_content_type = 'text/markdown',  # long description content type
    url = 'https://github.com/lucidrains/chroma-pytorch',  # project url
    keywords = [  # list of keywords
        'artificial intelligence',
        'deep learning',
        'denoising diffusion',
        'protein design'
    ],
    install_requires=[  # list of dependencies
        'einops>=0.6',
        'invariant-point-attention',
        'torch>=1.6',
    ],
    classifiers=[  # list of classifiers
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'Topic :: Scientific/Engineering :: Artificial Intelligence',
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python :: 3.6',
    ],
)
.\\lucidrains\\classifier-free-guidance-pytorch\\classifier_free_guidance_pytorch\\attend.py
# import the necessary libraries
from collections import namedtuple
from functools import wraps
from packaging import version
import torch
from torch import nn, einsum
import torch.nn.functional as F
from einops import rearrange, repeat
# define a named tuple EfficientAttentionConfig holding three boolean flags
EfficientAttentionConfig = namedtuple('EfficientAttentionConfig', ['enable_flash', 'enable_math', 'enable_mem_efficient'])
# define a helper function that checks whether a value exists
def exists(val):
    return val is not None
# define a `once` decorator, ensuring the function is only called once
def once(fn):
    called = False
    @wraps(fn)
    def inner(x):
        nonlocal called
        if called:
            return
        called = True
        return fn(x)
    return inner
# decorate print with `once` so it only prints a single time
print_once = once(print)
# main attention class
class Attend(nn.Module):
    def __init__(
        self,
        dropout = 0.,
        causal = False,
        flash = False
    ):
        super().__init__()
        self.dropout = dropout
        self.attn_dropout = nn.Dropout(dropout)
        self.causal = causal
        self.register_buffer('mask', None, persistent = False)
        self.flash = flash
        assert not (flash and version.parse(torch.__version__) < version.parse('2.0.0')), 'in order to use flash attention, you must be using pytorch 2.0 or above'
        # determine efficient attention configs for cuda and cpu
        self.cpu_config = EfficientAttentionConfig(True, True, True)
        self.cuda_config = None
        if not torch.cuda.is_available() or not flash:
            return
        device_properties = torch.cuda.get_device_properties(torch.device('cuda'))
        if device_properties.major == 8 and device_properties.minor == 0:
            print_once('A100 GPU detected, using flash attention if input tensor is on cuda')
            self.cuda_config = EfficientAttentionConfig(True, False, False)
        else:
            print_once('Non-A100 GPU detected, using math or mem efficient attention if input tensor is on cuda')
            self.cuda_config = EfficientAttentionConfig(False, True, True)
    # get the causal mask
    def get_mask(self, n, device):
        if exists(self.mask) and self.mask.shape[-1] >= n:
            return self.mask[:n, :n]
        mask = torch.ones((n, n), device = device, dtype = torch.bool).triu(1)
        self.register_buffer('mask', mask, persistent = False)
        return mask
    # flash attention function
    def flash_attn(self, q, k, v, mask = None):
        _, heads, q_len, _, k_len, is_cuda = *q.shape, k.shape[-2], q.is_cuda
        # expand single-headed key / values, as recommended by Tri Dao for multi-query single key-value attention
        if k.ndim == 3:
            k = repeat(k, 'b ... -> b h ...', h = heads)
        if v.ndim == 3:
            v = repeat(v, 'b ... -> b h ...', h = heads)
        # check whether a mask exists and expand it to a compatible shape
        if exists(mask):
            if mask.ndim == 2:
                mask = rearrange(mask, 'b j -> b 1 1 j')
            mask = mask.expand(-1, heads, q_len, -1)
        # check which device config supports flash attention
        config = self.cuda_config if is_cuda else self.cpu_config
        # invoke pytorch 2.0 flash attention under torch.backends.cuda.sdp_kernel(**config._asdict())
        with torch.backends.cuda.sdp_kernel(**config._asdict()):
            out = F.scaled_dot_product_attention(
                q, k, v,
                attn_mask = mask,
                dropout_p = self.dropout if self.training else 0.,
                is_causal = self.causal
            )
        return out
    # define the forward pass, accepting queries (q), keys (k), values (v) and a mask as inputs
    def forward(self, q, k, v, mask = None):
        """
        einstein notation
        b - batch
        h - heads
        n, i, j - sequence length (base sequence length, source, target)
        d - feature dimension
        """
        # get the sequence length n and the device
        n, device = q.shape[-2], q.device
        # scaling factor based on the inverse square root of the feature dimension
        scale = q.shape[-1] ** -0.5
        # if flash attention is enabled, call the flash_attn function
        if self.flash:
            return self.flash_attn(q, k, v, mask = mask)
        # determine the key / value einsum equation based on the key's number of dimensions
        kv_einsum_eq = 'b j d' if k.ndim == 3 else 'b h j d'
        # compute the similarity
        sim = einsum(f'b h i d, {kv_einsum_eq} -> b h i j', q, k) * scale
        # key padding mask
        if exists(mask):
            if mask.ndim == 2:
                mask = rearrange(mask, 'b j -> b 1 1 j')
            sim = sim.masked_fill(~mask, -torch.finfo(sim.dtype).max)
        # causal mask
        if self.causal:
            causal_mask = self.get_mask(n, device)
            sim = sim.masked_fill(causal_mask, -torch.finfo(sim.dtype).max)
        # attention weights
        attn = sim.softmax(dim = -1)
        attn = self.attn_dropout(attn)
        # aggregate values
        out = einsum(f'b h i j, {kv_einsum_eq} -> b h i d', attn, v)
        return out
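Taken together, Attend is a drop-in attention kernel that dispatches between flash and plain einsum attention. A minimal sketch of exercising it on random tensors (shapes are illustrative, not from the repo):
# minimal usage sketch for Attend (uses the imports above; shapes illustrative)
attend = Attend(dropout = 0.1, causal = True, flash = False)
q = torch.randn(2, 8, 1024, 64)  # (batch, heads, seq, dim_head)
k = torch.randn(2, 8, 1024, 64)
v = torch.randn(2, 8, 1024, 64)
out = attend(q, k, v)            # (2, 8, 1024, 64), causally masked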
.\\lucidrains\\classifier-free-guidance-pytorch\\classifier_free_guidance_pytorch\\bge.py
# import the required modules and functions
from typing import List
from beartype import beartype
import torch
import transformers
from transformers import AutoTokenizer, AutoModel, AutoConfig
transformers.logging.set_verbosity_error()
# create the BGEAdapter class
class BGEAdapter():
    def __init__(
        self,
        name
    ):
        # set the model name (hardcoded, overriding the passed-in name)
        name = 'BAAI/bge-base-en-v1.5'
        # load the tokenizer corresponding to the model name
        tokenizer = AutoTokenizer.from_pretrained(name)
        # load the model corresponding to the model name
        model = AutoModel.from_pretrained(name)
        # load the config corresponding to the model name
        self.Config = AutoConfig.from_pretrained(name)
        # if a CUDA device is available, move the model onto it
        if torch.cuda.is_available():
            model = model.to("cuda")
        # store the name, model and tokenizer on the object
        self.name = name
        self.model = model
        self.tokenizer = tokenizer
    # dim_latent property, returns the hidden size
    @property
    def dim_latent(self):
        return self.Config.hidden_size
    # max_text_len property, returns the maximum text length
    @property
    def max_text_len(self):
        return 512
    # embed_text method, used for text embedding
    @torch.no_grad()
    @beartype
    def embed_text(
        self,
        texts: List[str],
        return_text_encodings = False,
        output_device = None
    ):
        # encode the texts with the tokenizer (note this moves the batch to "cuda" unconditionally, so a GPU is assumed)
        encoded_input = self.tokenizer(texts, padding=True, truncation=True, return_tensors='pt').to("cuda")
        # put the model in evaluation mode
        self.model.eval()
        # run the encoded input through the model
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        # if text encodings are not requested, return the normalized CLS embeddings
        if not return_text_encodings:
            sentence_embeddings = model_output[0][:, 0]
            sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
            return sentence_embeddings  # return the normalized CLS embeddings
        # otherwise return the last hidden state, moved to the requested output device
        return model_output.last_hidden_state.to(output_device)
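As a quick sanity check, one might embed a couple of strings and inspect the shapes (a sketch, assuming a CUDA device, since embed_text above moves the batch to "cuda" unconditionally, and that the BAAI weights can be downloaded):
# usage sketch for BGEAdapter (assumes CUDA, per the adapter above)
adapter = BGEAdapter(name = None)  # name is overridden internally to 'BAAI/bge-base-en-v1.5'
embeds = adapter.embed_text(['a photo of a dog', 'a photo of a cat'])
print(embeds.shape)        # (2, 768) - L2-normalized CLS embeddings
print(adapter.dim_latent)  # 768, taken from the model config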
.\\lucidrains\\classifier-free-guidance-pytorch\\classifier_free_guidance_pytorch\\classifier_free_guidance_pytorch.py
# import the necessary modules
from collections import namedtuple
from functools import wraps, partial, cache
import torch
import torch.nn.functional as F
from torch import nn, einsum, Tensor
from einops import rearrange, repeat, pack, unpack
from beartype import beartype
from beartype.door import is_bearable
from beartype.typing import Callable, Tuple, Optional, List, Literal, Union, Dict, Any
from inspect import signature
from classifier_free_guidance_pytorch.t5 import T5Adapter
from classifier_free_guidance_pytorch.open_clip import OpenClipAdapter
from classifier_free_guidance_pytorch.attend import Attend
from classifier_free_guidance_pytorch.bge import BGEAdapter
# constants
COND_DROP_KEY_NAME = 'cond_drop_prob'
TEXTS_KEY_NAME = 'texts'
TEXT_EMBEDS_KEY_NAME = 'text_embeds'
TEXT_CONDITIONER_NAME = 'text_conditioner'
CONDITION_FUNCTION_KEY_NAME = 'cond_fns'
# named tuple definition
TextCondReturn = namedtuple('TextCondReturn', [
    'embed',
    'mask'
])
# helper functions
# check whether a value exists
def exists(val):
    return val is not None
# check whether a list is empty
def is_empty(l):
    return len(l) == 0
# return the first value that exists
def default(*values):
    for value in values:
        if exists(value):
            return value
    return None
# cast a value to a tuple
def cast_tuple(val, length = 1):
    return val if isinstance(val, tuple) else ((val,) * length)
# pack a single value
def pack_one(x, pattern):
    return pack([x], pattern)
# unpack a single value
def unpack_one(x, ps, pattern):
    return unpack(x, ps, pattern)[0]
# tensor helpers
# generate a boolean mask tensor according to a probability
def prob_mask_like(shape, prob, device):
    if prob == 1:
        return torch.ones(shape, device = device, dtype = torch.bool)
    elif prob == 0:
        return torch.zeros(shape, device = device, dtype = torch.bool)
    else:
        return torch.zeros(shape, device = device).float().uniform_(0, 1) < prob
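prob_mask_like is the coin flip behind conditional dropout: each batch element independently keeps its conditioning with probability prob. A small illustrative example on CPU:
# illustrative example of prob_mask_like
keep_mask = prob_mask_like((4, 1), 0.8, device = torch.device('cpu'))
print(keep_mask)  # e.g. tensor([[True], [True], [False], [True]]) - each row kept with probability 0.8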
# classifier free guidance with automatic text conditioning
# decorator function that handles the wrapped function's arguments and automatic text conditioning
@beartype
def classifier_free_guidance(
    fn: Callable,
    cond_drop_prob_keyname = COND_DROP_KEY_NAME,
    texts_key_name = TEXTS_KEY_NAME,
    text_embeds_key_name = TEXT_EMBEDS_KEY_NAME,
    cond_fns_keyname = CONDITION_FUNCTION_KEY_NAME,
    text_conditioner_name = TEXT_CONDITIONER_NAME
):
    # get the function's parameter signature
    fn_params = signature(fn).parameters
    # determine whether text conditioning should be handled automatically
    auto_handle_text_condition = texts_key_name not in fn_params and text_embeds_key_name not in fn_params
    # inner function that actually carries out classifier free guidance
    @wraps(fn)
    def inner(
        self,
        *args,
        cond_scale: float = 1.,
        rescale_phi: float = 0.,
        cfg_routed_kwargs: Dict[str, Tuple[Any, Any]] = dict(), # dict of kwargs routed to the forward and null forward calls (for handling caching when using CFG on transformer decoding)
        **kwargs
    ):
        # wrapper around the original function for the case where it may receive text
        @wraps(fn)
        def fn_maybe_with_text(self, *args, **kwargs):
            if auto_handle_text_condition:
                texts = kwargs.pop('texts', None)
                text_embeds = kwargs.pop('text_embeds', None)
                # texts and text_embeds must not both be given
                assert not (exists(texts) and exists(text_embeds))
                raw_text_cond = cond_fns = None
                # fetch the text conditioner object
                text_conditioner = getattr(self, text_conditioner_name, None)
                cond_drop_prob = kwargs.pop(cond_drop_prob_keyname, None)
                # cond_drop_prob, if given, must lie between 0 and 1
                assert not exists(cond_drop_prob) or 0. <= cond_drop_prob <= 1.
                # auto convert texts -> conditioning functions
                if exists(texts) ^ exists(text_embeds):
                    # texts must be a list of strings
                    assert is_bearable(texts, Optional[List[str]]), f'keyword `{texts_key_name}` must be a list of strings'
                    # a Conditioner must be set on the network
                    assert exists(text_conditioner) and is_bearable(text_conditioner, Conditioner), 'text_conditioner must be set on your network with the correct hidden dimensions to be conditioned on'
                    text_condition_input = dict(texts = texts) if exists(texts) else dict(text_embeds = text_embeds)
                    # invoke the text conditioner to produce the conditioning functions and the raw text conditioning
                    cond_fns, raw_text_cond = text_conditioner(**text_condition_input, cond_drop_prob = cond_drop_prob)
                elif isinstance(text_conditioner, NullConditioner):
                    # a null conditioner has nothing to drop out
                    assert cond_drop_prob == 0., 'null conditioner has nothing to dropout'
                    # invoke the null conditioner
                    cond_fns, raw_text_cond = text_conditioner()
                if 'cond_fns' in fn_params:
                    kwargs.update(cond_fns = cond_fns)
                if 'raw_text_cond' in fn_params:
                    kwargs.update(raw_text_cond = raw_text_cond)
            # return the result of the original function
            return fn(self, *args, **kwargs)
        # main classifier free guidance logic
        if self.training:
            # condition scaling is not allowed in training mode
            assert cond_scale == 1, 'you cannot do condition scaling when in training mode'
            return fn_maybe_with_text(self, *args, **kwargs)
        # the conditioning scale must be greater than or equal to 1
        assert cond_scale >= 1, 'invalid conditioning scale, must be greater or equal to 1'
        # build kwargs dicts without and with conditional dropout
        kwargs_without_cond_dropout = {**kwargs, cond_drop_prob_keyname: 0.}
        kwargs_with_cond_dropout = {**kwargs, cond_drop_prob_keyname: 1.}
        # handle kwargs routed to the forward and null forward calls, to support caching of both calls
        fn_kwargs = {k: v[0] for k, v in cfg_routed_kwargs.items()}
        null_fn_kwargs = {k: v[1] for k, v in cfg_routed_kwargs.items()}
        # non-null forward
        outputs = fn_maybe_with_text(self, *args, **fn_kwargs, **kwargs_without_cond_dropout)
        # with a conditioning scale of 1, return the conditioned output directly
        if cond_scale == 1:
            return outputs
        # split the output into logits and the rest
        logits, *rest = cast_tuple(outputs)
        # null forward
        null_outputs = fn_maybe_with_text(self, *args, **null_fn_kwargs, **kwargs_with_cond_dropout)
        # split the null forward output into null_logits and the rest
        null_logits, *null_rest = cast_tuple(null_outputs)
        # zip the remaining outputs of the two calls together
        zipped_rest = tuple(zip(rest, null_rest))
        # compute the scaled logits
        scaled_logits = null_logits + (logits - null_logits) * cond_scale
        if rescale_phi <= 0:
            logit_output = scaled_logits
        else:
            # proposed rescaling to prevent over-saturation with classifier free guidance
            # unlike the solution from imagen, works in both pixel and latent space
            dims = tuple(range(1, logits.ndim - 1))
            rescaled_logits = scaled_logits * (logits.std(dim = dims, keepdim = True) / scaled_logits.std(dim = dims, keepdim = True))
            logit_output = rescaled_logits * rescale_phi + scaled_logits * (1. - rescale_phi)
        # if there are no extra outputs, return logit_output directly
        if is_empty(zipped_rest):
            return logit_output
        # return the final result
        return (logit_output, *zipped_rest)
    return inner
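The heart of the decorator is scaled_logits = null_logits + (logits - null_logits) * cond_scale: extrapolate from the unconditional prediction toward, and past, the conditional one. A self-contained numeric sketch of just that combination step (standalone toy code, not the repo's API):
# standalone sketch of the CFG combination step
def cfg_combine(cond_logits, null_logits, cond_scale):
    # cond_scale = 1 recovers the conditional output; > 1 extrapolates past it
    return null_logits + (cond_logits - null_logits) * cond_scale
cond = torch.tensor([2.0, 0.0])
null = torch.tensor([1.0, 0.0])
print(cfg_combine(cond, null, 3.))  # tensor([4., 0.]) - the conditional signal is amplified 3x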
# class decorator
# decorator that adds classifier free guidance to a class
@beartype
def classifier_free_guidance_class_decorator(
    orig_class,
    cond_drop_prob_keyname = COND_DROP_KEY_NAME,
    texts_key_name = TEXTS_KEY_NAME,
    text_embeds_key_name = TEXT_EMBEDS_KEY_NAME,
    cond_fns_keyname = CONDITION_FUNCTION_KEY_NAME,
    text_conditioner_name = TEXT_CONDITIONER_NAME
):
    assert issubclass(orig_class, nn.Module)
    # decorate init
    # save the original class's __init__
    orig_init = orig_class.__init__
    # decorate the original class's __init__
    @wraps(orig_init)
    @beartype
    def __init__(
        self,
        *args,
        text_condition_type: Union[
            Literal['film'],
            Literal['attention'],
            Literal['null'],
            Literal['raw'],
        ] = 'film',
        text_condition_model_types: Tuple[str, ...] = ('t5',),
        text_condition_hidden_dims: Tuple[int, ...],
        text_condition_cond_drop_prob: float,
        **kwargs
    ):
        # call the original class's __init__
        orig_init(self, *args, **kwargs)
        # choose the conditioner class according to the text condition type
        if text_condition_type == 'film':
            condition_klass = TextConditioner
        elif text_condition_type == 'attention':
            condition_klass = AttentionTextConditioner
        elif text_condition_type == 'raw':
            condition_klass = TextEmbeddingReturner
        else:
            condition_klass = NullConditioner
        # instantiate the text conditioner
        self.text_conditioner = condition_klass(
            model_types = text_condition_model_types,
            hidden_dims = text_condition_hidden_dims,
            cond_drop_prob = text_condition_cond_drop_prob
        )
    orig_class.__init__ = __init__
    # decorate forward
    # decorate the original class's forward method
    decorated_forward = classifier_free_guidance(
        orig_class.forward,
        cond_drop_prob_keyname = cond_drop_prob_keyname,
        texts_key_name = texts_key_name,
        text_embeds_key_name = text_embeds_key_name,
        cond_fns_keyname = cond_fns_keyname,
        text_conditioner_name = text_conditioner_name
    )
    orig_class.forward = decorated_forward
    # forward `embed_texts` to the `text_conditioner.embed_texts`
    # define an embed_texts method that forwards to the text conditioner's embed_texts
    @beartype
    def embed_texts(self, texts: List[str]):
        return self.text_conditioner.embed_texts(texts)
    # define a property that caches the maximum conditioning text length
    @property
    @cache
    def max_cond_text_len(self):
        total_cond_text_len = sum([text_model.max_text_len for text_model in self.text_conditioner.text_models])
        return total_cond_text_len
    # add max_cond_text_len if the original class does not already have it
    if not hasattr(orig_class, 'max_cond_text_len'):
        orig_class.max_cond_text_len = max_cond_text_len
    # add embed_texts if the original class does not already have it
    if not hasattr(orig_class, 'embed_texts'):
        orig_class.embed_texts = embed_texts
    # mark the class as decorated
    orig_class.__decorated_with_cfg = True
    return orig_class
# attention
# attention module class
class Attention(nn.Module):
    def __init__(
        self,
        dim,
        dim_head = 64,
        heads = 8,
        dim_context = None,
        norm_context = False,
        num_null_kv = 0,
        flash = False
    ):
        super().__init__()
        self.heads = heads
        self.scale = dim_head ** -0.5
        inner_dim = dim_head * heads
        dim_context = default(dim_context, dim)
        self.norm = nn.LayerNorm(dim)
        self.context_norm = nn.LayerNorm(dim_context) if norm_context else nn.Identity()
        self.attend = Attend(flash = flash)
        self.num_null_kv = num_null_kv
        self.null_kv = nn.Parameter(torch.randn(2, num_null_kv, dim_head))
        self.to_q = nn.Linear(dim, inner_dim, bias = False)
        self.to_kv = nn.Linear(dim_context, dim_head * 2, bias = False)
        self.to_out = nn.Linear(inner_dim, dim, bias = False)
    def forward(
        self,
        x,
        context = None,
        mask = None
    ):
        # get the batch size from the input tensor x
        b = x.shape[0]
        # if a context is given, normalize it
        if exists(context):
            context = self.context_norm(context)
        # if no context is given, default to x itself as the key / value input
        kv_input = default(context, x)
        # normalize the input tensor x
        x = self.norm(x)
        # project x to queries q, and the key / value input to keys k and values v
        q, k, v = self.to_q(x), *self.to_kv(kv_input).chunk(2, dim = -1)
        # if the number of null key / values is greater than 0
        if self.num_null_kv > 0:
            # repeat the null key / values to match the batch size
            null_k, null_v = repeat(self.null_kv, 'kv n d -> kv b n d', b = b).unbind(dim = 0)
            # concatenate the null key / values with the original keys and values
            k = torch.cat((null_k, k), dim = -2)
            v = torch.cat((null_v, v), dim = -2)
        # if a mask is given
        if exists(mask):
            # pad the mask to account for the null key / values
            mask = F.pad(mask, (self.num_null_kv, 0), value = True)
            # rearrange the mask dimensions
            mask = rearrange(mask, 'b j -> b 1 1 j')
        # rearrange the queries to split out the heads
        q = rearrange(q, 'b n (h d) -> b h n d', h = self.heads)
        # carry out the attention computation
        out = self.attend(q, k, v, mask = mask)
        # rearrange the output to merge the heads
        out = rearrange(out, 'b h n d -> b n (h d)')
        # return the final output
        return self.to_out(out)
# dimension adapters
# function decorator that rearranges the channel to the last dimension
def rearrange_channel_last(fn):
@wraps(fn)
def inner(hiddens):
hiddens, ps = pack_one(hiddens, \’b * d\’)
conditioned = fn(hiddens)
return unpack_one(conditioned, ps, \’b * d\’)
return inner
# function decorator that rearranges the channel to the first dimension
def rearrange_channel_first(fn):
\”\”\” will adapt shape of (batch, feature, …) for conditioning \”\”\”
@wraps(fn)
def inner(hiddens):
hiddens, ps = pack_one(hiddens, \’b d *\’)
hiddens = rearrange(hiddens, \’b d n -> b n d\’)
conditioned = fn(hiddens)
conditioned = rearrange(conditioned, \’b n d -> b d n\’)
return unpack_one(conditioned, ps, \’b d *\’)
return inner
# conditioning modules
# FiLM module
class FiLM(nn.Module):
def __init__(
self,
dim,
hidden_dim
):
super().__init__()
self.net = nn.Sequential(
nn.Linear(dim, hidden_dim * 4),
nn.SiLU(),
nn.Linear(hidden_dim * 4, hidden_dim * 2)
)
nn.init.zeros_(self.net[-1].weight)
nn.init.zeros_(self.net[-1].bias)
def forward(self, conditions, hiddens):
scale, shift = self.net(conditions).chunk(2, dim = -1)
assert scale.shape[-1] == hiddens.shape[-1], f'unexpected hidden dimension {hiddens.shape[-1]} used for conditioning'
scale, shift = map(lambda t: rearrange(t, \’b d -> b 1 d\’), (scale, shift))
return hiddens * (scale + 1) + shift
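FiLM turns a conditioning vector into a per-channel scale and shift; because the last linear layer is zero-initialized, conditioning starts out as the identity. A quick shape-level sketch using the class above:
# shape-level sketch of FiLM conditioning
film = FiLM(dim = 512, hidden_dim = 256)
conditions = torch.randn(2, 512)   # one conditioning vector per batch element
hiddens = torch.randn(2, 10, 256)  # hidden states to be conditioned
out = film(conditions, hiddens)
print(out.shape)  # (2, 10, 256); at init out == hiddens, since scale and shift start at zero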
# cross attention module
class CrossAttention(nn.Module):
def __init__(
self,
dim,
hidden_dim,
heads = 8,
dim_head = 64,
flash = False
):
super().__init__()
self.attn = Attention(
dim = hidden_dim,
dim_context = dim,
norm_context = True,
num_null_kv = 1,
dim_head = dim_head,
heads = heads,
flash = flash
)
def forward(
self,
condition,
hiddens,
mask = None
):
return self.attn(hiddens, condition, mask = mask) + hiddens
# film text conditioning
# condition config dictionary
CONDITION_CONFIG = dict(
    t5 = T5Adapter,
    clip = OpenClipAdapter,
    bge = BGEAdapter
)
# list of model types
MODEL_TYPES = CONDITION_CONFIG.keys()
# conditioner base class
class Conditioner(nn.Module):
    pass
# identity module (used by the null conditioner)
class Identity(nn.Module):
    def forward(self, t, *args, **kwargs):
        return t
# null conditioner class, inheriting from Conditioner
@beartype
class NullConditioner(Conditioner):
def __init__(
self,
*,
hidden_dims: Tuple[int, …],
**kwargs
):
super().__init__()
num_null_conditioners = len(hidden_dims)
self.cond_fns = tuple(Identity() for _ in range(num_null_conditioners))
self.register_buffer(\’_device_param\’, torch.tensor(0), persistent = False)
@property
def device(self):
return next(self.buffers()).device
def embed_texts(self, texts: List[str]):
assert False, \’null conditioner cannot embed text\’
def forward(self, *args, **kwargs):
return self.cond_fns, None
# text conditioner with FiLM
@beartype
class TextConditioner(Conditioner):
    def __init__(
        self,
        *,
        hidden_dims: Tuple[int, ...],
        model_types = 't5',
        model_names = None,
        cond_drop_prob = 0.,
        hiddens_channel_first = True,
        text_embed_stem_dim_mult = 2
    ):
        # call the parent constructor
        super().__init__()
        # cast model_types to a tuple
        model_types = cast_tuple(model_types)
        # cast model_names to a tuple of the same length as model_types
        model_names = cast_tuple(model_names, length = len(model_types))
        # assert that model_types and model_names have the same length
        assert len(model_types) == len(model_names)
        # assert that every entry of model_types is in MODEL_TYPES
        assert all([model_type in MODEL_TYPES for model_type in model_types])
        # initialize an empty list of text models
        text_models = []
        # iterate over model_types and model_names, instantiating the model for each type
        for model_type, model_name in zip(model_types, model_names):
            klass = CONDITION_CONFIG.get(model_type)
            model = klass(model_name)
            text_models.append(model)
        # store the text models
        self.text_models = text_models
        # collect the latent dimension of each model
        self.latent_dims = [model.dim_latent for model in text_models]
        # initialize an empty nn.ModuleList to hold the conditioners
        self.conditioners = nn.ModuleList([])
        # store hidden_dims, num_condition_fns, hiddens_channel_first and cond_drop_prob
        self.hidden_dims = hidden_dims
        self.num_condition_fns = len(hidden_dims)
        self.hiddens_channel_first = cast_tuple(hiddens_channel_first, self.num_condition_fns) # whether the hiddens to be conditioned are channel-first
        # assert that hiddens_channel_first has the same length as num_condition_fns
        assert len(self.hiddens_channel_first) == self.num_condition_fns
        # store the conditional dropout probability
        self.cond_drop_prob = cond_drop_prob
        # compute the total latent dimension
        total_latent_dim = sum(self.latent_dims)
        # compute the output dimension of the MLP stem
        mlp_stem_output_dim = total_latent_dim * text_embed_stem_dim_mult
        # define the MLP stem for the text embeddings
        self.text_embed_stem_mlp = nn.Sequential(
            nn.Linear(total_latent_dim, mlp_stem_output_dim),
            nn.SiLU()
        )
        # create a FiLM conditioner for every hidden dimension
        for hidden_dim in hidden_dims:
            self.conditioners.append(FiLM(mlp_stem_output_dim, hidden_dim))
        # initialize a random null text embedding parameter
        self.null_text_embed = nn.Parameter(torch.randn(total_latent_dim))
        # register a buffer _device_param
        self.register_buffer('_device_param', torch.tensor(0.), persistent = False)
    @property
    def device(self):
        # return the device of the first buffer
        return next(self.buffers()).device
    def embed_texts(self, texts: List[str]):
        # get the device
        device = self.device
        # initialize an empty list to hold the text embeddings
        text_embeds = []
        # embed the texts with every text model and collect the results
        for text_model in self.text_models:
            text_embed = text_model.embed_text(texts)
            text_embeds.append(text_embed.to(device))
        # concatenate the text embeddings along the last dimension
        return torch.cat(text_embeds, dim = -1)
    def forward(
        self,
        texts: Optional[List[str]] = None,
        text_embeds: Optional[Tensor] = None,
        cond_drop_prob = None,
        repeat_batch = 1, # for robotic transformer edge case
    ) -> Tuple[
        Tuple[Callable, ...],
        TextCondReturn
    ]:
        # assert that exactly one of texts and text_embeds is given
        assert exists(texts) ^ exists(text_embeds)
        # during training, fall back to the default cond_drop_prob; otherwise it must be set explicitly
        if self.training:
            cond_drop_prob = default(cond_drop_prob, self.cond_drop_prob)
        else:
            assert exists(cond_drop_prob), 'when not training, cond_drop_prob must be explicitly set'
        # determine the batch size from texts or text_embeds
        if exists(texts):
            batch = len(texts)
        elif exists(text_embeds):
            batch = text_embeds.shape[0]
        # if text_embeds does not exist, generate it with embed_texts
        if not exists(text_embeds):
            text_embeds = self.embed_texts(texts)
        # if cond_drop_prob is greater than 0, generate a mask for dropping out text embeddings
        if cond_drop_prob > 0.:
            prob_keep_mask = prob_mask_like((batch, 1), 1. - cond_drop_prob, device = self.device)
            null_text_embeds = rearrange(self.null_text_embed, 'd -> 1 d')
            text_embeds = torch.where(
                prob_keep_mask,
                text_embeds,
                null_text_embeds
            )
        # run the text embeddings through the MLP stem
        text_embeds = self.text_embed_stem_mlp(text_embeds)
        # prepare the conditioning functions
        repeat_batch = cast_tuple(repeat_batch, self.num_condition_fns)
        cond_fns = []
        # iterate over the conditioners, building one conditioning function per hidden dimension
        for cond, cond_hiddens_channel_first, cond_repeat_batch in zip(self.conditioners, self.hiddens_channel_first, repeat_batch):
            cond_text_embeds = repeat(text_embeds, 'b ... -> (b r) ...', r = cond_repeat_batch)
            cond_fn = partial(cond, cond_text_embeds)
            wrapper_fn = rearrange_channel_first if cond_hiddens_channel_first else rearrange_channel_last
            cond_fns.append(wrapper_fn(cond_fn))
        # return the conditioning functions and the text conditioning return value
        return tuple(cond_fns), TextCondReturn(text_embeds, None)
# AttentionTextConditioner class, inheriting from Conditioner
@beartype
class AttentionTextConditioner(Conditioner):
    # constructor
    def __init__(
        self,
        *,
        hidden_dims: Tuple[int, ...], # tuple of hidden dimensions
        model_types = 't5', # model types, defaults to 't5'
        model_names = None, # model names, defaults to None
        cond_drop_prob = 0., # conditional dropout probability, defaults to 0
        hiddens_channel_first = True, # whether the hiddens are channel-first, defaults to True
        dim_latent = None, # latent dimension, defaults to None
        attn_dim_head = 64, # attention head dimension, defaults to 64
        attn_heads = 8, # number of attention heads, defaults to 8
        flash = True # whether to use flash attention, defaults to True
    ):
        super().__init__() # call the parent constructor
        model_types = cast_tuple(model_types) # cast model types to a tuple
        model_names = cast_tuple(model_names, length = len(model_types)) # cast model names to a tuple of the same length
        assert len(model_types) == len(model_names) # assert equal lengths
        assert all([model_type in MODEL_TYPES for model_type in model_types]) # assert all model types are in MODEL_TYPES
        text_models = [] # initialize the list of text models
        # iterate over model types and names, instantiating each text model
        for model_type, model_name in zip(model_types, model_names):
            klass = CONDITION_CONFIG.get(model_type)
            model = klass(model_name)
            text_models.append(model)
        self.text_models = text_models # store the text models
        self.to_latent_dims = nn.ModuleList([]) # initialize the list of linear projections
        dim_latent = default(dim_latent, max([model.dim_latent for model in text_models])) # determine the latent dimension
        self.dim_latent = dim_latent # store the latent dimension
        # add a linear projection for every text model
        for model in text_models:
            self.to_latent_dims.append(nn.Linear(model.dim_latent, dim_latent))
        self.conditioners = nn.ModuleList([]) # initialize the list of conditioners
        self.hidden_dims = hidden_dims # store the hidden dimensions
        self.num_condition_fns = len(hidden_dims) # store the number of conditioning functions
        self.hiddens_channel_first = cast_tuple(hiddens_channel_first, self.num_condition_fns) # store the channel-first flags
        assert len(self.hiddens_channel_first) == self.num_condition_fns # assert matching lengths
        self.cond_drop_prob = cond_drop_prob # store the conditional dropout probability
        # add a cross attention module for every hidden dimension
        for hidden_dim in hidden_dims:
            self.conditioners.append(CrossAttention(dim_latent, hidden_dim, flash = flash))
        self.register_buffer('_device_param', torch.tensor(0), persistent = False) # register a buffer
    @property
    def device(self):
        return next(self.buffers()).device # return the device
    # embed a list of texts, returning the text embeddings
    def embed_texts(self, texts: List[str]):
        device = self.device # get the device
        text_embeds = [] # initialize the list of text embeddings
        # iterate over the text models and linear projections
        for text_model, to_latent in zip(self.text_models, self.to_latent_dims):
            text_embed = text_model.embed_text(texts, return_text_encodings = True) # embed the texts, returning per-token encodings
            text_embed = text_embed.to(device) # move the text embedding to the device
            mask = (text_embed != 0).any(dim = -1) # build a mask of non-zero positions
            text_embed = to_latent(text_embed) # project the text embedding with the linear layer
            text_embed = text_embed.masked_fill(~mask[..., None], 0.) # zero the text embedding according to the mask
            text_embeds.append(text_embed) # collect the processed text embedding
        return torch.cat(text_embeds, dim = -2) # concatenate the text embeddings along the sequence dimension
    # forward pass, accepting texts and / or text embeddings, returning a tuple
    def forward(
        self,
        texts: Optional[List[str]] = None,
        text_embeds: Optional[Tensor] = None,
        cond_drop_prob = None,
        repeat_batch = 1, # for robotic transformer edge case
    ) -> Tuple[
        Tuple[Callable, ...],
        TextCondReturn
    ]:
        # check that texts or text embeddings exist
        assert exists(texts) or exists(text_embeds)
        # if both are given, the text embeddings take priority
        if exists(text_embeds) and exists(texts):
            texts = None
        # during training, fall back to the default cond_drop_prob; otherwise it must be set explicitly
        if self.training:
            cond_drop_prob = default(cond_drop_prob, self.cond_drop_prob)
        else:
            assert exists(cond_drop_prob), 'when not training, cond_drop_prob must be explicitly set'
        # determine the batch size from texts or text embeddings
        if exists(texts):
            batch = len(texts)
        elif exists(text_embeds):
            batch = text_embeds.shape[0]
        # if no text embeddings are given, generate them with embed_texts
        if not exists(text_embeds):
            text_embeds = self.embed_texts(texts)
        # build a mask marking positions with non-zero entries
        mask = (text_embeds != 0).any(dim = -1)
        # if the conditional dropout probability is greater than 0, generate a keep mask
        if cond_drop_prob > 0.:
            prob_keep_mask = prob_mask_like((batch, 1), 1. - cond_drop_prob, device = self.device)
            mask = mask & prob_keep_mask
        # prepare the conditioning functions
        repeat_batch = cast_tuple(repeat_batch, self.num_condition_fns)
        cond_fns = []
        # iterate over the conditioners, building the list of conditioning functions
        for cond, cond_hiddens_channel_first, cond_repeat_batch in zip(self.conditioners, self.hiddens_channel_first, repeat_batch):
            cond_text_embeds = repeat(text_embeds, 'b ... -> (b r) ...', r = cond_repeat_batch)
            cond_mask = repeat(mask, 'b ... -> (b r) ...', r = cond_repeat_batch) if exists(mask) else None
            cond_fn = partial(cond, cond_text_embeds, mask = cond_mask)
            wrapper_fn = rearrange_channel_first if cond_hiddens_channel_first else rearrange_channel_last
            cond_fns.append(wrapper_fn(cond_fn))
        # return the conditioning functions and the text conditioning return value
        return tuple(cond_fns), TextCondReturn(text_embeds, mask)
# return raw text embeddings
# text embedding returner class, inheriting from Conditioner
@beartype
class TextEmbeddingReturner(Conditioner):
    # constructor
    def __init__(
        self,
        *,
        dim_latent = None, # latent dimension, defaults to None
        hidden_dims: Tuple[int, ...] = tuple(), # hidden dimensions, defaults to an empty tuple
        model_types = 't5', # model types, defaults to 't5'
        model_names = None, # model names, defaults to None
        cond_drop_prob = 0., # conditional dropout probability, defaults to 0.
    ):
        super().__init__() # call the parent constructor
        model_types = cast_tuple(model_types) # cast model types to a tuple
        model_names = cast_tuple(model_names, length = len(model_types)) # cast model names to a tuple of the same length
        assert len(model_types) == len(model_names) # assert equal lengths
        assert all([model_type in MODEL_TYPES for model_type in model_types]) # assert all model types are in MODEL_TYPES
        text_models = [] # initialize the list of text models
        # iterate over model types and names, instantiating each text model
        for model_type, model_name in zip(model_types, model_names):
            klass = CONDITION_CONFIG.get(model_type)
            model = klass(model_name)
            text_models.append(model)
        self.text_models = text_models # store the text models
        self.to_latent_dims = nn.ModuleList([]) # initialize the list of latent projections
        dim_latent = default(dim_latent, max([model.dim_latent for model in text_models])) # default to the largest model latent dimension
        self.dim_latent = dim_latent # store the latent dimension
        # add a linear projection for every text model
        for model in text_models:
            self.to_latent_dims.append(nn.Linear(model.dim_latent, dim_latent))
        self.conditioners = nn.ModuleList([]) # initialize the list of conditioners
        self.cond_drop_prob = cond_drop_prob # store the conditional dropout probability
        # add an identity mapping for every hidden dimension
        for hidden_dim in hidden_dims:
            self.conditioners.append(nn.Identity())
        self.register_buffer('_device_param', torch.tensor(0), persistent = False) # register a buffer
    @property
    def device(self):
        return next(self.buffers()).device # return the buffer's device
    # text embedding function
    def embed_texts(self, texts: List[str]):
        device = self.device # get the device
        text_embeds = [] # initialize the list of text embeddings
        # iterate over the text models and latent projections, embedding the texts with each model
        for text_model, to_latent in zip(self.text_models, self.to_latent_dims):
            text_embed = text_model.embed_text(texts, return_text_encodings = True) # embed the texts, returning per-token encodings
            text_embed = text_embed.to(device) # move the text embedding onto the device
            mask = (text_embed != 0).any(dim = -1) # build a mask marking non-zero entries
            text_embed = to_latent(text_embed) # project into the latent dimension
            text_embed = text_embed.masked_fill(~mask[..., None], 0.) # zero out the masked positions
            text_embeds.append(text_embed) # collect the processed text embedding
        return torch.cat(text_embeds, dim = -2) # concatenate along the sequence dimension
    # forward pass
    def forward(
        self,
        texts: Optional[List[str]] = None, # list of texts, defaults to None
        text_embeds: Optional[Tensor] = None, # text embedding tensor, defaults to None
        cond_drop_prob = None # conditional dropout probability, defaults to None
    ) -> Tuple[
        Tuple[Callable, ...], # returns the tuple of conditioners
        TextCondReturn # and the text conditioning return value
    ]:
        assert exists(texts) ^ exists(text_embeds) # exactly one of texts and text_embeds must be given
        if self.training:
            cond_drop_prob = default(cond_drop_prob, self.cond_drop_prob) # during training, fall back to the default conditional dropout probability
        else:
            assert exists(cond_drop_prob), 'when not training, cond_drop_prob must be explicitly set' # outside of training it must be set explicitly
        if exists(texts):
            batch = len(texts) # batch size from the number of texts
        elif exists(text_embeds):
            batch = text_embeds.shape[0] # batch size from the text embedding tensor
        if not exists(text_embeds):
            text_embeds = self.embed_texts(texts) # generate the text embeddings if not given
        mask = (text_embeds != 0).any(dim = -1) # build a mask marking non-zero entries
        if cond_drop_prob > 0.:
            prob_keep_mask = prob_mask_like((batch, 1), 1. - cond_drop_prob, device = self.device) # build a probability keep mask
            mask = mask & prob_keep_mask # update the mask
        return tuple(self.conditioners), TextCondReturn(text_embeds, mask) # return the conditioners and the text conditioning return value
.\\lucidrains\\classifier-free-guidance-pytorch\\classifier_free_guidance_pytorch\\open_clip.py
# import the necessary libraries and modules
from beartype import beartype
from typing import List
import torch
from torch import nn, einsum
import torch.nn.functional as F
import open_clip
from classifier_free_guidance_pytorch.tokenizer import tokenizer
# constants
DEFAULT_CLIP_NAME = 'ViT-B-32'
DEFAULT_PRETRAINED_CLIP = 'laion400m_e32'
# helper functions
# check whether a value exists
def exists(val):
    return val is not None
# return a default value
def default(val, d):
    return val if exists(val) else d
# l2 normalize a tensor
def l2norm(t):
    return F.normalize(t, dim = -1)
# adapter class
class OpenClipAdapter():
    def __init__(
        self,
        name = DEFAULT_CLIP_NAME,
        pretrained = DEFAULT_PRETRAINED_CLIP
    ):
        # fall back to the defaults
        name = default(name, DEFAULT_CLIP_NAME)
        pretrained = default(pretrained, DEFAULT_PRETRAINED_CLIP)
        # create the OpenCLIP model and preprocessing functions
        clip, _, preprocess = open_clip.create_model_and_transforms(name, pretrained = pretrained)
        self.clip = clip
        clip.eval()
        self.tokenizer = tokenizer
        self.eos_id = 49407
        # grab the final layer norm of the text attention stack
        text_attention_final = self.find_layer('ln_final')
        self._dim_latent = text_attention_final.weight.shape[0]
        # register a forward hook
        self.handle = text_attention_final.register_forward_hook(self._hook)
        self.clip_normalize = preprocess.transforms[-1]
        self.cleared = False
    # find the specified layer
    def find_layer(self, layer):
        modules = dict([*self.clip.named_modules()])
        return modules.get(layer, None)
    # clear the forward hook
    def clear(self):
        if self.cleared:
            return
        self.handle.remove()
    # forward hook function
    def _hook(self, _, inputs, outputs):
        self.text_encodings = outputs
    @property
    def dim_latent(self):
        return self._dim_latent
    @property
    def max_text_len(self):
        return 77
    # embed texts
    @torch.no_grad()
    @beartype
    def embed_text(
        self,
        texts: List[str],
        return_text_encodings = False,
        output_device = None
    ):
        # tokenize the texts
        texts, max_length = self.tokenizer.tokenize(texts)
        texts = texts[..., :self.max_text_len]
        # encode the texts
        text_embeds = self.clip.encode_text(texts)
        texts = texts[..., :max_length]
        if not return_text_encodings:
            return l2norm(text_embeds).to(output_device)
        # process the per-token text encodings captured by the hook
        is_eos_id = (texts == self.eos_id)
        text_mask_excluding_eos = is_eos_id.cumsum(dim = -1) == 0
        text_mask = F.pad(text_mask_excluding_eos, (1, -1), value = True)
        text_mask = text_mask & (texts != 0)
        assert not self.cleared
        text_encodings = self.text_encodings[:, :max_length]
        text_encodings = text_encodings.masked_fill(~text_mask[..., None], 0.)
        del self.text_encodings
        return text_encodings.float().to(output_device)
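A short usage sketch of the adapter above (a sketch: pretrained weights are downloaded on first use, and 512 is the text latent width of the default ViT-B-32):
# usage sketch for OpenClipAdapter
adapter = OpenClipAdapter()
pooled = adapter.embed_text(['a dog', 'a cat'])                                    # (2, 512) l2-normalized embeddings
encodings = adapter.embed_text(['a dog', 'a cat'], return_text_encodings = True)  # (2, seq, 512) per-token encodings, zeroed past EOS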
.\\lucidrains\\classifier-free-guidance-pytorch\\classifier_free_guidance_pytorch\\t5.py
# import the required modules
from typing import List
from beartype import beartype
import torch
import transformers
from transformers import T5Tokenizer, T5EncoderModel, T5Config
# set the transformers logging level to error
transformers.logging.set_verbosity_error()
# helper functions
# check whether a value exists
def exists(val):
    return val is not None
# return a default value
def default(val, d):
    return val if exists(val) else d
# config
# maximum length
MAX_LENGTH = 256
# default T5 model name
DEFAULT_T5_NAME = 'google/t5-v1_1-base'
# dictionary storing T5 model configs
T5_CONFIGS = {}
# global singletons
# get the tokenizer
def get_tokenizer(name):
    tokenizer = T5Tokenizer.from_pretrained(name)
    return tokenizer
# get the model
def get_model(name):
    model = T5EncoderModel.from_pretrained(name)
    return model
# get the model and tokenizer
def get_model_and_tokenizer(name):
    global T5_CONFIGS
    if name not in T5_CONFIGS:
        T5_CONFIGS[name] = dict()
    if "model" not in T5_CONFIGS[name]:
        T5_CONFIGS[name]["model"] = get_model(name)
    if "tokenizer" not in T5_CONFIGS[name]:
        T5_CONFIGS[name]["tokenizer"] = get_tokenizer(name)
    return T5_CONFIGS[name]['model'], T5_CONFIGS[name]['tokenizer']
# get the encoding dimension
def get_encoded_dim(name):
    if name not in T5_CONFIGS:
        # avoid loading the model, just get the dimension from the config
        config = T5Config.from_pretrained(name)
        T5_CONFIGS[name] = dict(config = config)
    elif "config" in T5_CONFIGS[name]:
        config = T5_CONFIGS[name]["config"]
    elif "model" in T5_CONFIGS[name]:
        config = T5_CONFIGS[name]["model"].config
    else:
        assert False
    return config.d_model
# encode text
def t5_encode_text(texts, name = DEFAULT_T5_NAME, output_device = None):
    t5, tokenizer = get_model_and_tokenizer(name)
    if torch.cuda.is_available():
        t5 = t5.cuda()
    device = next(t5.parameters()).device
    encoded = tokenizer.batch_encode_plus(
        texts,
        return_tensors = "pt",
        padding = 'longest',
        max_length = MAX_LENGTH,
        truncation = True
    )
    input_ids = encoded.input_ids.to(device)
    attn_mask = encoded.attention_mask.to(device)
    t5.eval()
    with torch.no_grad():
        output = t5(input_ids = input_ids, attention_mask = attn_mask)
        encoded_text = output.last_hidden_state.detach()
    attn_mask = attn_mask.bool()
    if not exists(output_device):
        return encoded_text, attn_mask
    # move the results onto the requested output device (Tensor.to returns a new tensor, so reassignment is needed)
    encoded_text = encoded_text.to(output_device)
    attn_mask = attn_mask.to(output_device)
    return encoded_text, attn_mask
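A quick sketch of calling this functional API directly (the model is downloaded on first use; d_model is 768 for t5-v1_1-base):
# usage sketch for t5_encode_text
encoded, mask = t5_encode_text(['a small red cube'], name = DEFAULT_T5_NAME)
print(encoded.shape, mask.shape)  # (1, seq, 768) encodings and a (1, seq) boolean mask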
# T5 adapter class
class T5Adapter():
    def __init__(
        self,
        name
    ):
        name = default(name, DEFAULT_T5_NAME)
        t5, tokenizer = get_model_and_tokenizer(name)
        if torch.cuda.is_available():
            t5 = t5.cuda()
        self.name = name
        self.t5 = t5
        self.tokenizer = tokenizer
    @property
    def dim_latent(self):
        return get_encoded_dim(self.name)
    @property
    def max_text_len(self):
        return MAX_LENGTH
    @torch.no_grad()
    @beartype
    def embed_text(
        self,
        texts: List[str],
        return_text_encodings = False,
        output_device = None
    ):
        device = next(self.t5.parameters()).device
        encoded = self.tokenizer.batch_encode_plus(
            texts,
            return_tensors = "pt",
            padding = 'longest',
            max_length = MAX_LENGTH,
            truncation = True
        )
        input_ids = encoded.input_ids.to(device)
        attn_mask = encoded.attention_mask.to(device)
        self.t5.eval()
        with torch.no_grad():
            output = self.t5(input_ids = input_ids, attention_mask = attn_mask)
            encoded_text = output.last_hidden_state.detach()
        attn_mask = attn_mask.bool()
        # zero out the encodings at padded positions
        encoded_text.masked_fill_(~attn_mask[..., None], 0.)
        if not return_text_encodings:
            # mean pool the encodings over the non-padded positions
            numer = encoded_text.sum(dim = -2)
            denom = attn_mask.sum(dim = -1)[..., None]
            numer.masked_fill_(denom == 0, 0.)
            mean_encodings = numer / denom.clamp(min = 1e-3)
            return mean_encodings
        return encoded_text.to(output_device)
.\\lucidrains\\classifier-free-guidance-pytorch\\classifier_free_guidance_pytorch\\__init__.py
# import the NullConditioner, TextConditioner, AttentionTextConditioner and TextEmbeddingReturner classes from the classifier_free_guidance_pytorch package
from classifier_free_guidance_pytorch.classifier_free_guidance_pytorch import (
    NullConditioner,
    TextConditioner,
    AttentionTextConditioner,
    TextEmbeddingReturner
)
# import the classifier_free_guidance and classifier_free_guidance_class_decorator functions
from classifier_free_guidance_pytorch.classifier_free_guidance_pytorch import (
    classifier_free_guidance,
    classifier_free_guidance_class_decorator
)
# import the OpenClipAdapter class
from classifier_free_guidance_pytorch.open_clip import OpenClipAdapter
# import the T5Adapter class
from classifier_free_guidance_pytorch.t5 import T5Adapter
# import the BGEAdapter class
from classifier_free_guidance_pytorch.bge import BGEAdapter
Classifier Free Guidance – Pytorch
Implementation of Classifier Free Guidance in Pytorch, with emphasis on text conditioning, and flexibility to include multiple text embedding models, as done in eDiff-I
It is clear now that text guidance is the ultimate interface to models. This repository will leverage some python decorator magic to make it easy to incorporate SOTA text conditioning to any model.
Appreciation
StabilityAI for the generous sponsorship, as well as my other sponsors out there
🤗 Huggingface for their amazing transformers library. The text conditioning module will use T5 embeddings, as the latest research recommends
OpenCLIP for providing SOTA open sourced CLIP models. The eDiff model sees immense improvements by combining the T5 embeddings with CLIP text embeddings
Install
$ pip install classifier-free-guidance-pytorch
Usage
import torch
from classifier_free_guidance_pytorch import TextConditioner
text_conditioner = TextConditioner(
model_types = \’t5\’,
hidden_dims = (256, 512),
hiddens_channel_first = False,
cond_drop_prob = 0.2 # conditional dropout 20% of the time, must be greater than 0. to unlock classifier free guidance
).cuda()
# pass in your text as a List[str], and get back a List[callable]
# each callable function receives the hiddens in the dimensions listed at init (hidden_dims)
first_condition_fn, second_condition_fn = text_conditioner([\’a dog chasing after a ball\’])
# these hiddens will be in the direct flow of your model, say in a unet
first_hidden = torch.randn(1, 16, 256).cuda()
second_hidden = torch.randn(1, 32, 512).cuda()
# conditioned features
first_conditioned = first_condition_fn(first_hidden)
second_conditioned = second_condition_fn(second_hidden)
If you wish to use cross attention based conditioning (each hidden feature in your network can attend to individual subword tokens), just import the AttentionTextConditioner instead. Rest is the same
from classifier_free_guidance_pytorch import AttentionTextConditioner
text_conditioner = AttentionTextConditioner(
model_types = (\’t5\’, \’clip\’), # something like in eDiff paper, where they used both T5 and Clip for even better results (Balaji et al.)
hidden_dims = (256, 512),
cond_drop_prob = 0.2
)
Magic Class Decorator
This is a work in progress to make it as easy as possible to text condition your network.
First, let’s say you have a simple two layer network
import torch
from torch import nn
class MLP(nn.Module):
def __init__(
self,
dim
):
super().__init__()
self.proj_in = nn.Sequential(nn.Linear(dim, dim * 2), nn.ReLU())
self.proj_mid = nn.Sequential(nn.Linear(dim * 2, dim), nn.ReLU())
self.proj_out = nn.Linear(dim, 1)
def forward(
self,
data
):
hiddens1 = self.proj_in(data)
hiddens2 = self.proj_mid(hiddens1)
return self.proj_out(hiddens2)
# instantiate model and pass in some data, get (in this case) a binary prediction
model = MLP(dim = 256)
data = torch.randn(2, 256)
pred = model(data)
You would like to condition the hidden layers (hiddens1 and hiddens2) with text. Each batch element here would get its own free text conditioning
This has been whittled down to ~3 steps using this repository.
import torch
from torch import nn
from classifier_free_guidance_pytorch import classifier_free_guidance_class_decorator
@classifier_free_guidance_class_decorator
class MLP(nn.Module):
def __init__(self, dim):
super().__init__()
self.proj_in = nn.Sequential(nn.Linear(dim, dim * 2), nn.ReLU())
self.proj_mid = nn.Sequential(nn.Linear(dim * 2, dim), nn.ReLU())
self.proj_out = nn.Linear(dim, 1)
def forward(
self,
inp,
cond_fns # List[Callable] – (1) your forward function now receives a list of conditioning functions, which you invoke on your hidden tensors
):
cond_hidden1, cond_hidden2 = cond_fns # conditioning functions are given back in the order of the `hidden_dims` set on the text conditioner
hiddens1 = self.proj_in(inp)
hiddens1 = cond_hidden1(hiddens1) # (2) condition the first hidden layer with FiLM
hiddens2 = self.proj_mid(hiddens1)
hiddens2 = cond_hidden2(hiddens2) # condition the second hidden layer with FiLM
return self.proj_out(hiddens2)
# instantiate your model – extra keyword arguments will need to be defined, prepended by `text_condition_`
model = MLP(
dim = 256,
text_condition_type = \’film\’, # can be film, attention, or null (none)
text_condition_model_types = (\’t5\’, \’clip\’), # in this example, conditioning on both T5 and OpenCLIP
text_condition_hidden_dims = (512, 256), # and pass in the hidden dimensions you would like to condition on. in this case there are two hidden dimensions (dim * 2 and dim, after the first and second projections)
text_condition_cond_drop_prob = 0.25 # conditional dropout probability for classifier free guidance. can be set to 0. if you do not need it and just want the text conditioning
)
# now you have your input data as well as corresponding free text as List[str]
data = torch.randn(2, 256)
texts = [\’a description\’, \’another description\’]
# (3) train your model, passing in your list of strings as \’texts\’
pred = model(data, texts = texts)
# after much training, you can now do classifier free guidance by passing in a condition scale of > 1. !
model.eval()
guided_pred = model(data, texts = texts, cond_scale = 3.) # cond_scale stands for conditioning scale from classifier free guidance paper
Todo
complete film conditioning, without classifier free guidance (used here)
add classifier free guidance for film conditioning
complete cross attention conditioning
stress test for spacetime unet in make-a-video
Citations
@article{Ho2022ClassifierFreeDG,
title = {Classifier-Free Diffusion Guidance},
author = {Jonathan Ho},
journal = {ArXiv},
year = {2022},
volume = {abs/2207.12598}
}
@article{Balaji2022eDiffITD,
title = {eDiff-I: Text-to-Image Diffusion Models with an Ensemble of Expert Denoisers},
author = {Yogesh Balaji and Seungjun Nah and Xun Huang and Arash Vahdat and Jiaming Song and Karsten Kreis and Miika Aittala and Timo Aila and Samuli Laine and Bryan Catanzaro and Tero Karras and Ming-Yu Liu},
journal = {ArXiv},
year = {2022},
volume = {abs/2211.01324}
}
@inproceedings{dao2022flashattention,
title = {Flash{A}ttention: Fast and Memory-Efficient Exact Attention with {IO}-Awareness},
author = {Dao, Tri and Fu, Daniel Y. and Ermon, Stefano and Rudra, Atri and R{\\\’e}, Christopher},
booktitle = {Advances in Neural Information Processing Systems},
year = {2022}
}
@inproceedings{Lin2023CommonDN,
title = {Common Diffusion Noise Schedules and Sample Steps are Flawed},
author = {Shanchuan Lin and Bingchen Liu and Jiashi Li and Xiao Yang},
year = {2023}
}
.\\lucidrains\\classifier-free-guidance-pytorch\\setup.py
# import setup utilities and package discovery
from setuptools import setup, find_packages
# set up the package metadata
setup(
    name = 'classifier-free-guidance-pytorch', # package name
    packages = find_packages(exclude=[]), # find packages
    include_package_data = True, # include data files
    version = '0.5.3', # version number
    license='MIT', # license
    description = 'Classifier Free Guidance - Pytorch', # description
    author = 'Phil Wang', # author
    author_email = 'lucidrains@gmail.com', # author email
    long_description_content_type = 'text/markdown', # long description content type
    url = 'https://github.com/lucidrains/classifier-free-guidance-pytorch', # URL
    keywords = [ # keywords
        'artificial intelligence',
        'deep learning',
        'classifier free guidance',
        'text conditioning and guidance'
    ],
    install_requires=[ # dependencies
        'beartype',
        'einops>=0.7',
        'ftfy',
        'open-clip-torch>=2.8.0',
        'torch>=2.0',
        'transformers[torch]'
    ],
    classifiers=[ # classifiers
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'Topic :: Scientific/Engineering :: Artificial Intelligence',
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python :: 3.6',
    ],
)
.\\lucidrains\\CoCa-pytorch\\coca_pytorch\\coca_pytorch.py
# import the torch library
import torch
# import the einsum and nn modules from torch
from torch import einsum, nn
# import torch.nn.functional as F
import torch.nn.functional as F
# import Function from torch.autograd
from torch.autograd import Function
# import torch.distributed as dist
import torch.distributed as dist
# import the rearrange and repeat functions from einops
from einops import rearrange, repeat
# helper functions
# exists function, checks whether a value exists
def exists(val):
    return val is not None
# default function, returns the value if it exists, otherwise the default
def default(val, d):
    return val if exists(val) else d
# distributed
# pad_dim_to function, pads a tensor along the given dimension to the given length
def pad_dim_to(t, length, dim = 0):
    pad_length = length - t.shape[dim]
    zero_pairs = (-dim - 1) if dim < 0 else (t.ndim - dim - 1)
    return F.pad(t, (*((0, 0) * zero_pairs), 0, pad_length))
# all_gather_variable_batch function, gathers batches of varying sizes from all ranks in a distributed setting
def all_gather_variable_batch(t):
    device, rank, world_size = t.device, dist.get_rank(), dist.get_world_size()
    size = torch.tensor(t.shape[0], device = device, dtype = torch.long)
    sizes = [torch.empty_like(size, device = device, dtype = torch.long) for i in range(world_size)]
    dist.all_gather(sizes, size)
    sizes = torch.stack(sizes)
    max_size = sizes.amax().item()
    padded_t = pad_dim_to(t, max_size, dim = 0)
    gathered_tensors = [torch.empty_like(padded_t, device = device, dtype = padded_t.dtype) for i in range(world_size)]
    dist.all_gather(gathered_tensors, padded_t)
    gathered_tensor = torch.cat(gathered_tensors)
    seq = torch.arange(max_size, device = device)
    mask = rearrange(seq, 'j -> 1 j') < rearrange(sizes, 'i -> i 1')
    mask = rearrange(mask, 'i j -> (i j)')
    gathered_tensor = gathered_tensor[mask]
    sizes = sizes.tolist()
    return gathered_tensor, sizes
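pad_dim_to is what lets ranks with different batch sizes join the same all_gather: each rank pads to the global maximum, gathers, then strips the padding via the mask. A single-process illustration of the padding step:
# single-process illustration of pad_dim_to
t = torch.randn(3, 8)
padded = pad_dim_to(t, 5, dim = 0)
print(padded.shape)  # (5, 8) - two zero rows appended at the end of dim 0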
# AllGather class, gathers tensors from all ranks in a distributed setting
class AllGather(Function):
    @staticmethod
    def forward(ctx, x):
        assert dist.is_initialized() and dist.get_world_size() > 1
        x, batch_sizes = all_gather_variable_batch(x)
        ctx.batch_sizes = batch_sizes
        return x
    @staticmethod
    def backward(ctx, grads):
        batch_sizes, rank = ctx.batch_sizes, dist.get_rank()
        grads_by_rank = grads.split(batch_sizes, dim = 0)
        return grads_by_rank[rank]
# apply the AllGather function to tensors
all_gather = AllGather.apply
# normalization
# they use layernorm without bias, something that pytorch does not offer
# LayerNorm class, implements layer normalization with a learned gain but a fixed zero bias
class LayerNorm(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(dim))
        self.register_buffer("beta", torch.zeros(dim))
    def forward(self, x):
        return F.layer_norm(x, x.shape[-1:], self.gamma, self.beta)
# residual
# Residual class, implements a residual connection
class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn
    def forward(self, x, *args, **kwargs):
        return self.fn(x, *args, **kwargs) + x
# to latents
# EmbedToLatents class, projects the input into the latent space
class EmbedToLatents(nn.Module):
    def __init__(self, dim, dim_latents):
        super().__init__()
        self.to_latents = nn.Linear(dim, dim_latents, bias=False)
    def forward(self, x):
        latents = self.to_latents(x)
        return F.normalize(latents, dim=-1)
# rotary positional embedding
# https://arxiv.org/abs/2104.09864
# RotaryEmbedding class, implements rotary positional embeddings
class RotaryEmbedding(nn.Module):
    def __init__(self, dim):
        super().__init__()
        inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq)
    def forward(self, max_seq_len, *, device):
        seq = torch.arange(max_seq_len, device=device, dtype=self.inv_freq.dtype)
        freqs = einsum("i , j -> i j", seq, self.inv_freq)
        return torch.cat((freqs, freqs), dim=-1)
# rotate_half function, rotates half of the tensor's features
def rotate_half(x):
    x = rearrange(x, "... (j d) -> ... j d", j=2)
    x1, x2 = x.unbind(dim=-2)
    return torch.cat((-x2, x1), dim=-1)
# apply_rotary_pos_emb function, applies the rotary positional embedding to a tensor
def apply_rotary_pos_emb(pos, t):
    return (t * pos.cos()) + (rotate_half(t) * pos.sin())
# classic Noam Shazeer paper, except here they use SwiGLU instead of the more popular GEGLU for gating the feedforward
# https://arxiv.org/abs/2002.05202
# SwiGLU class, implements the SwiGLU activation
class SwiGLU(nn.Module):
    def forward(self, x):
        x, gate = x.chunk(2, dim=-1)
        return F.silu(gate) * x
# parallel attention and feedforward with residual
# discovered by Wang et al + EleutherAI from GPT-J fame
# ParallelTransformerBlock class, implements a parallel attention and feedforward block
class ParallelTransformerBlock(nn.Module):
    # constructor, sets the model parameters
    def __init__(self, dim, dim_head=64, heads=8, ff_mult=4):
        # call the parent constructor
        super().__init__()
        # normalize the input
        self.norm = LayerNorm(dim)
        # compute the inner dimensions of the attention and feedforward branches
        attn_inner_dim = dim_head * heads
        ff_inner_dim = dim * ff_mult
        self.fused_dims = (attn_inner_dim, dim_head, dim_head, (ff_inner_dim * 2))
        # set the number of heads and the scaling factor
        self.heads = heads
        self.scale = dim_head**-0.5
        # initialize the rotary embedding
        self.rotary_emb = RotaryEmbedding(dim_head)
        # fused projection for the attention and feedforward branches
        self.fused_attn_ff_proj = nn.Linear(dim, sum(self.fused_dims), bias=False)
        self.attn_out = nn.Linear(attn_inner_dim, dim, bias=False)
        # feedforward output layer
        self.ff_out = nn.Sequential(
            SwiGLU(),
            nn.Linear(ff_inner_dim, dim, bias=False)
        )
        # for caching the causal mask and rotary embeddings
        self.mask = None
        self.pos_emb = None
    # get the causal mask
    def get_mask(self, n, device):
        if self.mask is not None and self.mask.shape[-1] >= n:
            return self.mask[:n, :n].to(device)
        mask = torch.ones((n, n), device=device, dtype=torch.bool).triu(1)
        self.mask = mask
        return mask
    # get the rotary embedding
    def get_rotary_embedding(self, n, device):
        if self.pos_emb is not None and self.pos_emb.shape[-2] >= n:
            return self.pos_emb[:n].to(device)
        pos_emb = self.rotary_emb(n, device=device)
        self.pos_emb = pos_emb
        return pos_emb
    # forward pass
    def forward(self, x, attn_mask=None):
        """
        einstein notation
        b - batch
        h - heads
        n, i, j - sequence length (base sequence length, source, target)
        d - feature dimension
        """
        n, device, h = x.shape[1], x.device, self.heads
        # pre layernorm
        x = self.norm(x)
        # get the queries, keys, values for attention and the feedforward inner representation
        q, k, v, ff = self.fused_attn_ff_proj(x).split(self.fused_dims, dim=-1)
        # split out the heads
        q = rearrange(q, "b n (h d) -> b h n d", h=h)
        # rotary embeddings
        positions = self.get_rotary_embedding(n, device)
        q, k = map(lambda t: apply_rotary_pos_emb(positions, t), (q, k))
        # scale
        q = q * self.scale
        # similarity
        sim = einsum("b h i d, b j d -> b h i j", q, k)
        # causal mask
        causal_mask = self.get_mask(n, device)
        sim = sim.masked_fill(causal_mask, -torch.finfo(sim.dtype).max)
        # extra attention mask
        if exists(attn_mask):
            attn_mask = rearrange(attn_mask, 'b i j -> b 1 i j')
            sim = sim.masked_fill(~attn_mask, -torch.finfo(sim.dtype).max)
        # attention
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)
        # aggregate values
        out = einsum("b h i j, b j d -> b h i d", attn, v)
        # merge the heads
        out = rearrange(out, "b h n d -> b n (h d)")
        return self.attn_out(out) + self.ff_out(ff)
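Note the multi-query trick in the block above: only the queries are split into heads, while the single-head keys and values ("b j d" in the einsums) are shared across heads. A quick sketch of running the block:
# quick sketch of ParallelTransformerBlock
block = ParallelTransformerBlock(dim = 512, dim_head = 64, heads = 8)
x = torch.randn(2, 32, 512)
out = block(x)  # (2, 32, 512), causal attention plus feedforward computed in parallel
print(out.shape)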
# cross attention module, using multi-query + single-head key / values as in PaLM, with optional parallel feedforward
class CrossAttention(nn.Module):
    def __init__(
        self,
        dim,
        *,
        context_dim=None,
        dim_head=64,
        heads=8,
        parallel_ff=False,
        ff_mult=4,
        norm_context=False
    ):
        super().__init__()
        self.heads = heads
        self.scale = dim_head ** -0.5
        inner_dim = heads * dim_head
        context_dim = default(context_dim, dim)
        self.norm = LayerNorm(dim)
        self.context_norm = LayerNorm(context_dim) if norm_context else nn.Identity()
        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(context_dim, dim_head * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)
        # whether to use a parallel feedforward
        ff_inner_dim = ff_mult * dim
        self.ff = nn.Sequential(
            nn.Linear(dim, ff_inner_dim * 2, bias=False),
            SwiGLU(),
            nn.Linear(ff_inner_dim, dim, bias=False)
        ) if parallel_ff else None
    def forward(self, x, context):
        """
        einstein notation
        b - batch
        h - heads
        n, i, j - sequence length (base sequence length, source, target)
        d - feature dimension
        """
        # pre layernorm for the queries and the context
        x = self.norm(x)
        context = self.context_norm(context)
        # get the queries
        q = self.to_q(x)
        q = rearrange(q, 'b n (h d) -> b h n d', h = self.heads)
        # scale
        q = q * self.scale
        # get the keys / values
        k, v = self.to_kv(context).chunk(2, dim=-1)
        # query / key similarity
        sim = einsum('b h i d, b j d -> b h i j', q, k)
        # attention
        sim = sim - sim.amax(dim=-1, keepdim=True)
        attn = sim.softmax(dim=-1)
        # aggregate
        out = einsum('b h i j, b j d -> b h i d', attn, v)
        # merge and combine the heads
        out = rearrange(out, 'b h n d -> b n (h d)')
        out = self.to_out(out)
        # add the parallel feedforward (for the multimodal layers)
        if exists(self.ff):
            out = out + self.ff(x)
        return out
# transformer
class CoCa(nn.Module):
    # constructor, sets the model parameters
    def __init__(
        self,
        *,
        dim,
        num_tokens,
        unimodal_depth,
        multimodal_depth,
        dim_latents = None,
        image_dim = None,
        num_img_queries=256,
        dim_head=64,
        heads=8,
        ff_mult=4,
        img_encoder=None,
        caption_loss_weight=1.,
        contrastive_loss_weight=1.,
        pad_id=0
    ):
        # call the parent constructor
        super().__init__()
        # set the model dimension
        self.dim = dim
        # set the pad id and the loss weights
        self.pad_id = pad_id
        self.caption_loss_weight = caption_loss_weight
        self.contrastive_loss_weight = contrastive_loss_weight
        # token embeddings
        # create the token embedding layer
        self.token_emb = nn.Embedding(num_tokens, dim)
        # create the text CLS token
        self.text_cls_token = nn.Parameter(torch.randn(dim))
        # image encoder
        # set the image encoder
        self.img_encoder = img_encoder
        # attention pooling for image tokens
        # create the image query parameters
        self.img_queries = nn.Parameter(torch.randn(num_img_queries + 1, dim)) # num image queries for multimodal, but 1 extra CLS for contrastive learning
        # create the image attention pooling layer
        self.img_attn_pool = CrossAttention(dim=dim, context_dim=image_dim, dim_head=dim_head, heads=heads, norm_context=True)
        # normalization for the image attention pooling
        self.img_attn_pool_norm = LayerNorm(dim)
        # normalization for the text CLS token
        self.text_cls_norm = LayerNorm(dim)
        # to latents
        # set the latent dimension
        dim_latents = default(dim_latents, dim)
        # image to latents
        self.img_to_latents = EmbedToLatents(dim, dim_latents)
        # text to latents
        self.text_to_latents = EmbedToLatents(dim, dim_latents)
        # temperature parameter for the contrastive loss
        self.temperature = nn.Parameter(torch.Tensor([1.]))
        # unimodal layers
        # create the unimodal layers
        self.unimodal_layers = nn.ModuleList([])
        for ind in range(unimodal_depth):
            self.unimodal_layers.append(
                Residual(ParallelTransformerBlock(dim=dim, dim_head=dim_head, heads=heads, ff_mult=ff_mult)),
            )
        # multimodal layers
        # create the multimodal layers
        self.multimodal_layers = nn.ModuleList([])
        for ind in range(multimodal_depth):
            self.multimodal_layers.append(nn.ModuleList([
                Residual(ParallelTransformerBlock(dim=dim, dim_head=dim_head, heads=heads, ff_mult=ff_mult)),
                Residual(CrossAttention(dim=dim, dim_head=dim_head, heads=heads, parallel_ff=True, ff_mult=ff_mult))
            ]))
        # to logits
        # create the output head
        self.to_logits = nn.Sequential(
            LayerNorm(dim),
            nn.Linear(dim, num_tokens, bias=False)
        )
        # tie the embedding weights to the projection weights
        self.to_logits[-1].weight = self.token_emb.weight
        # initialize the embedding weights
        nn.init.normal_(self.token_emb.weight, std=0.02)
        # whether we are in a data parallel setting
        self.is_distributed = dist.is_initialized() and dist.get_world_size() > 1
    # embed text
    def embed_text(self, text):
        # get the batch size and device
        batch, device = text.shape[0], text.device
        # get the sequence length
        seq = text.shape[1]
        # get the token embeddings of the text
        text_tokens = self.token_emb(text)
        # append the text CLS token
        text_cls_tokens = repeat(self.text_cls_token, 'd -> b 1 d', b=batch)
        text_tokens = torch.cat((text_tokens, text_cls_tokens), dim=-2)
        # create a specific mask for the text CLS token, preventing it from attending to padding
        cls_mask = rearrange(text!=self.pad_id, 'b j -> b 1 j')
        attn_mask = F.pad(cls_mask, (0, 1, seq, 0), value=True)
        # go through the unimodal layers
        for attn_ff in self.unimodal_layers:
            text_tokens = attn_ff(text_tokens, attn_mask=attn_mask)
        # get the text CLS token
        text_tokens, text_cls_tokens = text_tokens[:, :-1], text_tokens[:, -1]
        text_embeds = self.text_cls_norm(text_cls_tokens)
        return text_embeds, text_tokens
    # embed images into embedding vectors
    def embed_image(self, images=None, image_tokens=None):
        # encode images into embeddings
        # with the img_encoder passed in at init
        # it can also accept precomputed image tokens
        # make sure images and image tokens are not both given
        assert not (exists(images) and exists(image_tokens))
        if exists(images):
            # make sure self.img_encoder exists, for automatic image encoding
            assert exists(self.img_encoder), 'img_encoder must be passed in for automatic image encoding'
            image_tokens = self.img_encoder(images)
        # attention pool the image tokens
        img_queries = repeat(self.img_queries, 'n d -> b n d', b=image_tokens.shape[0])
        img_queries = self.img_attn_pool(img_queries, image_tokens)
        img_queries = self.img_attn_pool_norm(img_queries)
        return img_queries[:, 0], img_queries[:, 1:]
    def forward(
        self,
        text,
        images=None,
        image_tokens=None,
        labels=None,
        return_loss=False,
        return_embeddings=False
    ):
        batch, device = text.shape[0], text.device
        if return_loss and not exists(labels):
            text, labels = text[:, :-1], text[:, 1:]
        text_embeds, text_tokens = self.embed_text(text)
        image_embeds, image_tokens = self.embed_image(images=images, image_tokens=image_tokens)
        # return the embeddings if the researcher asks for them
        if return_embeddings:
            return text_embeds, image_embeds
        # go through the multimodal layers
        for attn_ff, cross_attn in self.multimodal_layers:
            text_tokens = attn_ff(text_tokens)
            text_tokens = cross_attn(text_tokens, image_tokens)
        logits = self.to_logits(text_tokens)
        if not return_loss:
            return logits
        # shorthand
        ce = F.cross_entropy
        # calculate the caption loss (cross entropy loss)
        logits = rearrange(logits, 'b n c -> b c n')
        caption_loss = ce(logits, labels, ignore_index=self.pad_id)
        caption_loss = caption_loss * self.caption_loss_weight
        # embed to latents
        text_latents = self.text_to_latents(text_embeds)
        image_latents = self.img_to_latents(image_embeds)
        # maybe distributed all gather
        if self.is_distributed:
            latents = torch.stack((text_latents, image_latents), dim=1)
            latents = all_gather(latents)
            text_latents, image_latents = latents.unbind(dim=1)
        # calculate the contrastive loss
        sim = einsum('i d, j d -> i j', text_latents, image_latents)
        sim = sim * self.temperature.exp()
        contrastive_labels = torch.arange(batch, device=device)
        contrastive_loss = (ce(sim, contrastive_labels) + ce(sim.t(), contrastive_labels)) * 0.5
        contrastive_loss = contrastive_loss * self.contrastive_loss_weight
        return caption_loss + contrastive_loss