Lucidrains 系列项目源码解析(二十)(lua源码分析)

Lucidrains 系列项目源码解析(二十).\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\train_configs.py
# 导入所需的库
import json
from torchvisio

.\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\train_configs.py

#导入需要的库

导入json

从torchvision 导入转换为T

从pydantic 导入BaseModel、验证器、model_validator

来自输入列表、可选、联合、元组、字典、任意、TypeVar

#导入自定义模块

从x_clip 导入CLIP 作为XCLIP

从open_clip 导入list_pretrained

从coca_pytorch 导入CoCa

#将类导入自定义模块

从dalle2_pytorch.dalle2_pytorch 导入(

可口可乐适配器,

打开AIClipAdapter,

开放式夹子适配器,

尤网,

解码器,

在传播之前,

扩散推进网络,

XClip 适配器

从dalle2_pytorch.trackers 导入Tracker、create_loader、create_logger、create_saver

# 辅助函数

# 检查值是否存在

默认存在(val):

返回值不为None

# 如果存在则返回值,否则返回默认值

默认默认(val,d):

如果存在则返回val(val),否则返回d

# 定义类型变量

InnerType=TypeVar(\’InnerType\’)

ListOrTuple=联合[列表[内部类型], 元组[内部类型]]

SingularOrIterable=Union[InnerType, ListOrTuple]

# 一般pydantic 类

# 训练集/验证集/测试集拆分比例配置类

类TrainSplitConfig(BaseModel):

train: 浮动=0.75

val: 浮点=0.15

test: 浮点数=0.1

# 检查所有参数的总和是否为1

@model_validator(模式=\’之后\’)

def validate_all(self, m):

实际总和=sum([*dict(self).values()])

如果实际总和!=1.

raise ValueError(f\'{dict(self).keys()} sum 必须为1.0。Found: {actual_sum}\’)

还给你自己

# 日志跟踪配置类

类TrackerLogConfig(BaseModel):

log_type: str=\’控制台\’

resume: bool=False # 对于保存在唯一位置的日志,恢复之前的执行

auto_resume: bool=False # 如果进程崩溃并重新启动,则从崩溃的执行中恢复

verbose: 布尔值=False

班级构成:

# 每个日志类型都有自己的参数,这些参数是通过配置传递的。

添加=\’允许\’

# 创建一个记录器

def create(self, data_path: str):

kwargs=self.dict()

返回create_logger(self.log_type, data_path, **kwargs)

#负载跟踪配置类

类TrackerLoadConfig(BaseModel):

load_from: 选项[str]=无

only_auto_resume: bool=False # 仅当记录器自动恢复时才尝试加载

班级构成:

添加=\’允许\’

#创建一个加载器

def create(self, data_path: str):

kwargs=self.dict()

第:章如果self.load_from是None

不返回任何内容

返回create_loader(self.load_from, data_path, **kwargs)

#保存跟踪设置类

类TrackerSaveConfig(BaseModel):

save_to: str=\’本地\’

save_all: 布尔值=False

save_latest: 布尔=True

save_best: 布尔值=True

班级构成:

添加=\’允许\’

#创建一个保存程序

def create(self, data_path: str):

kwargs=self.dict()

返回create_saver(self.save_to, data_path, **kwargs)

# 跟踪配置类

类TrackerConfig(BaseModel):

data_path: str=\’.tracker_data\’

overwrite_data_path: 布尔=False

log: TrackerLogConfig

load: 选项[TrackerLoadConfig]=None

save: 联盟[列表[TrackerSaveConfig], TrackerSaveConfig]

#创建跟踪器

def create(self, full_config: BaseModel, extra_config: dict, dummy_mode: bool=False) – Tracker:

tracker=Tracker(self.data_path, dummy_mode=dummy_mode, overwrite_data_path=self.overwrite_data_path)

# 添加记录器

tracker.add_logger(self.log.create(self.data_path))

#添加加载器

第:章如果self.load不是None

tracker.add_loader(self.load.create(self.data_path))

# 添加一个或多个保存器(saver)

if isinstance(self.save, 列表):

对于self.save: 中的save_config

tracker.add_saver(save_config.create(self.data_path))

: 其他

tracker.add_saver(self.save.create(self.data_path))

# 初始化所有组件并检查所有数据是否有效

tracker.init(完整配置,额外配置)

返回追踪器

# 扩散先验(diffusion prior)相关的 pydantic 类

# 适配器配置类

类适配器配置(BaseModel):

make: str=\’开放之夜\’

型号: str=\’ViT-L/14\’

base_model_kwargs: 选项[Dict[str, Any]]=None

# 创建适配器对象的方法

def 创建(自我):

# 如果适配器类型为\’openai\’,则返回OpenAIClipAdapter对象

如果self.make==\’openai\’:

返回OpenAIClipAdapter(self.model)

# 如果适配器类型为\’open_clip\’,则返回OpenClipAdapter对象

elif self.make==\’open_clip\’:

# 获取预训练模型列表并选择对应的模型检查点

预训练=dict(list_pretrained())

检查点=预训练[self.model]

返回OpenClipAdapter(name=self.model, pretrained=checkpoint)

# 如果适配器类型为“x-clip”,则返回XClipAdapter 对象

elif self.make==\’x-clip\’:

返回XClipAdapter(XCLIP(**self.base_model_kwargs))

# 如果适配器类型是\’coca\’,则返回一个CoCaAdapter对象

elif self.make==\’古柯\’:

返回CoCaAdapter(CoCa(**self.base_model_kwargs))

# 如果适配器类型与已知类型不匹配,则抛出属性错误异常

: 其他

raise AttributeError(\’具有该名称的适配器不可用。\’)

# 定义DiffusionPriorNetworkConfig 类,其中包含模型的各种配置参数。

类DiffusionPriorNetworkConfig(BaseModel):

dim: int # 模型尺寸

Depth: int # 模型深度

max_text_len: 可选[int]=None # 最大文本长度

num_timesteps: 可选[int]=None # 时间步数

num_time_embeds: int=1 # 嵌入时间数

num_image_embeds: int=1 # 图像嵌入数量

num_text_embeds: int=1 # 文本嵌入数量

dim_head: int=64 # 头部尺寸

head: int=8 # 头数

ff_mult: int=4 # 前馈层的倍数

Norm_in: bool=False # 输入层是否标准化

Norm_out: bool=True # 输出层是否标准化

attn_dropout: float=0. # Attention dropout概率机制

ff_dropout: float=0. # 前馈层dropout 概率

Final_proj: bool=True # 是否进行最终投影?

Normformer: bool=False # 是否使用Normformer?

rotary_emb: bool=True # 是否使用旋转嵌入?

班级构成:

添加=\’允许\’

#创建DiffusionPriorNetwork对象

def 创建(自我):

kwargs=self.dict()

返回DiffusionPriorNetwork(**kwargs)

# 定义包含模型配置参数的DiffusionPriorConfig 类。

类DiffusionPriorConfig(BaseModel):

Clip: 可选[AdapterConfig]=None # 适配器配置

net: DiffusionPriorNetworkConfig # DiffusionPriorNetworkConfig 对象

image_embed_dim: int # 图像嵌入大小

image_size: int # 图像大小

image_channels: int=3 # 图像通道数

timesteps: int=1000 # 时间步数

Sample_timesteps:Optional[int]=None # 采样时间步数

cond_drop_prob: float=0. # 条件丢弃概率

loss_type: str=\’l2\’ # 损失类型

detector_x_start: bool=True # 是否预测x的起始点

beta_schedule: str=\’cosine\’ # Beta 时间表

condition_on_text_encodings: bool=True # 是否以文本编码为条件?

班级构成:

添加=\’允许\’

# 创建 DiffusionPrior 对象

def 创建(自我):

kwargs=self.dict()

has_clip=存在(kwargs.pop(\’clip\’))

kwargs.pop(\’net\’)

剪辑=无

如果有_clip:

Clip=self.clip.create()

扩散先验网络=self.net.create()

返回DiffusionPrior(net=diffusion_prior_network, Clip=clip, **kwargs)

# 定义包含训练配置参数的DiffusionPriorTrainConfig 类。

类DiffusionPriorTrainConfig(BaseModel):

epochs: int=1 # 训练轮数

lr: float=1.1e-4 # 学习率

wd: float=6.02e-2 # 权重衰减

max_grad_norm: float=0.5 # 最大梯度范数

use_ema: bool=True # 是否使用指数移动平均线?

ema_beta: float=0.99 # 指数移动平均线的贝塔值

amp: bool=False # 是否使用混合精度训练?

Warmup_steps: 可选[int]=None # 预热步骤数

save_every_seconds: int=3600 # 保存模型的时间间隔

eval_timesteps: List[int]=[64] # 评估时间步数

best_validation_loss: float=1e9 # 最佳验证损失

current_epoch: int=0 # 当前轮数

num_samples_seen: int=0 # 当前样本数

random_seed: int=0 # 随机种子

# 定义包含数据配置参数的DiffusionPriorDataConfig类。

类DiffusionPriorDataConfig(BaseModel):

image_url: str # 嵌入文件夹路径

meta_url: str # 图像元数据(标题)路径

splits: TrainSplitConfig # 分割训练、验证和测试数据集

batch_size: int # 每个GPU 的批量大小

num_data_points: int=25e7 # 训练数据点总数

eval_every_seconds: int=3600 # 运行验证统计数据的频率

# 定义包含训练配置参数的TrainDiffusionPriorConfig类

类TrainDiffusionPriorConfig(BaseModel):

priority: DiffusionPriorConfig # DiffusionPriorConfig 对象

data: DiffusionPriorDataConfig # DiffusionPriorDataConfig 对象

train: DiffusionPriorTrainConfig # DiffusionPriorTrainConfig 对象

tracker: TrackerConfig # 跟踪器配置

# 从JSON 路径加载配置

@类方法

def from_json_path(cls, json_path):

使用open(json_path) 作为f:

设置=json.load(f)

返回cls(**配置)

#Decoder Pydantic 类

# 定义包含Unet模型配置参数的UnetConfig类。

类UnetConfig(BaseModel):

dim: int # 维度

dim_mults: ListOrTuple[int] # 维乘列表

image_embed_dim:Optional[int]=None # 图像嵌入尺寸

text_embed_dim:Optional[int]=None # 文本嵌入尺寸

cond_on_text_encodings:Optional[bool]=None # 是否使用文本编码作为条件

cond_dim: 可选[int]=None #条件维度

Channels: int=3 # 通道数

self_attn: SingularOrIterable[bool]=False #自注意力机制

attn_dim_head: int=32 # 注意头部尺寸

attn_heads: int=16 # 注意力头数量

init_cross_embed: bool=True # 是否初始化交叉嵌入

班级构成:

添加=\’允许\’

# 定义一个包含解码器配置参数的DecoderConfig类。

类DecoderConfig(BaseModel):

unets: ListOrTuple[UnetConfig] # UnetConfig 列表

image_size: 可选[int]=None # 图像大小

image_sizes: ListOrTuple[int]=None # 图像大小列表

Clip:Optional[AdapterConfig]=None # 适配器配置(如果没有提供嵌入,将使用剪辑模型)

Channels: int=3 # 通道数

timesteps: int=1000 # 时间步数

Sample_timesteps:Optional[SingularOrIterable[Optional[int]]]=None # 采样时间步数

loss_type: str=\’l2\’ # 损失类型

beta_schedule:Optional[ListOrTuple[str]]=None # Beta 时间表(None 表示所有余弦)

# 定义学习分布参数。默认为True

learn_variance: SingularOrIterable[bool]=True

# 定义图像条件下的丢弃概率。默认值为0.1。

image_cond_drop_prob: 浮点数=0.1

# 定义文本条件的丢弃概率。默认值为0.5。

def 创建(自我):

# 从参数中提取解码器参数

解码器_kwargs=self.dict()

# 从解码器参数中提取UNet 配置

unet_configs=detector_kwargs.pop(\’unets\’)

#根据UNet设置创建UNet对象列表

unets=[Unet(**config) 用于配置unet_configs]

# 检查裁剪参数是否存在

has_clip=存在(decoder_kwargs.pop(\’clip\’))

剪辑=无

# 如果剪辑参数存在则创建一个剪辑对象

如果有_clip:

Clip=self.clip.create()

# 传递一个UNet对象列表和一个clip对象并返回一个解码器对象。

返回解码器(unets, Clip=clip, **decoder_kwarg

s)
@validator('image_sizes')
def check_image_sizes(cls, image_sizes, values):
    """Require exactly one of ``image_size`` / ``image_sizes``.

    ``values`` is looked up for ``'image_size'`` (presumably the fields
    validated so far -- confirm against the pydantic version in use).

    Returns:
        ``image_sizes`` unchanged when exactly one of the two is set.

    Raises:
        ValueError: when both or neither are set.
    """
    has_single = exists(values.get('image_size'))
    has_multiple = exists(image_sizes)
    if has_single == has_multiple:
        raise ValueError('either image_size or image_sizes is required, but not both')
    return image_sizes
# Model configuration: extra (undeclared) arguments are allowed.
class Config:
    extra = 'allow'
class DecoderDataConfig(BaseModel):
    """Data-loading configuration used when training the decoder."""

    webdataset_base_url: str  # path to a webdataset with jpg images
    img_embeddings_url: Optional[str] = None  # path to .npy files with embeddings
    text_embeddings_url: Optional[str] = None  # path to .npy files with embeddings
    num_workers: int = 4  # number of worker processes
    batch_size: int = 64
    start_shard: int = 0
    end_shard: int = 9999999
    shard_width: int = 6
    index_width: int = 4
    splits: TrainSplitConfig  # train / val / test split configuration
    shuffle_train: bool = True  # whether to shuffle the training data
    resample_train: bool = False  # whether to resample the training data
    preprocessing: Dict[str, Any] = {'ToTensor': True}  # preprocessing steps

    @property
    def img_preproc(self):
        """Build a ``T.Compose`` from the ``preprocessing`` mapping.

        Each key names a transform. A dict value is passed to the transform
        as keyword arguments; any other value means "no arguments".

        Raises:
            ValueError: if a key is not one of the supported transform names
                (previously ``None`` was silently placed into the pipeline).
        """
        def _get_transformation(transformation_name, **kwargs):
            if transformation_name == "RandomResizedCrop":
                return T.RandomResizedCrop(**kwargs)
            if transformation_name == "RandomHorizontalFlip":
                return T.RandomHorizontalFlip()
            if transformation_name == "ToTensor":
                return T.ToTensor()
            raise ValueError(f'unknown preprocessing transformation: {transformation_name!r}')

        transforms = []
        for transform_name, transform_kwargs_or_bool in self.preprocessing.items():
            # Non-dict values (e.g. ``True``) carry no keyword arguments.
            if isinstance(transform_kwargs_or_bool, dict):
                transform_kwargs = transform_kwargs_or_bool
            else:
                transform_kwargs = {}
            transforms.append(_get_transformation(transform_name, **transform_kwargs))
        return T.Compose(transforms)
class DecoderTrainConfig(BaseModel):
    """Training hyper-parameters for the decoder."""

    epochs: int = 20  # number of training epochs
    lr: SingularOrIterable[float] = 1e-4  # learning rate
    wd: SingularOrIterable[float] = 0.01  # weight decay
    warmup_steps: Optional[SingularOrIterable[int]] = None  # warmup steps
    find_unused_parameters: bool = True  # whether to look for unused parameters
    static_graph: bool = True  # whether to use a static graph
    max_grad_norm: SingularOrIterable[float] = 0.5  # maximum gradient norm
    save_every_n_samples: int = 100000  # save the model every N samples
    n_sample_images: int = 6  # example images generated when sampling train/test data
    cond_scale: Union[float, List[float]] = 1.0  # conditioning scale
    device: str = 'cuda:0'
    epoch_samples: Optional[int] = None  # limit on samples per epoch
    validation_samples: Optional[int] = None  # limit on validation samples
    save_immediately: bool = False
    use_ema: bool = True  # whether to use an exponential moving average
    ema_beta: float = 0.999  # beta of the exponential moving average
    amp: bool = False  # whether to use mixed-precision training
    unet_training_mask: Optional[ListOrTuple[bool]] = None  # UNet training mask
class DecoderEvaluateConfig(BaseModel):
    """Evaluation settings for the decoder; each metric is optional."""

    n_evaluation_samples: int = 1000  # number of evaluation samples
    FID: Optional[Dict[str, Any]] = None  # FID evaluation settings
    IS: Optional[Dict[str, Any]] = None  # IS evaluation settings
    KID: Optional[Dict[str, Any]] = None  # KID evaluation settings
    LPIPS: Optional[Dict[str, Any]] = None  # LPIPS evaluation settings
class TrainDecoderConfig(BaseModel):
    """Complete configuration for a decoder training run."""

    decoder: DecoderConfig  # decoder configuration
    data: DecoderDataConfig  # data configuration
    train: DecoderTrainConfig  # training configuration
    evaluate: DecoderEvaluateConfig  # evaluation configuration
    tracker: TrackerConfig  # tracker configuration
    seed: int = 0  # random seed

    @classmethod
    def from_json_path(cls, json_path):
        """Read the JSON file at ``json_path``, print it, and build a config."""
        with open(json_path) as f:
            config = json.load(f)
            print(config)
        return cls(**config)

    @model_validator(mode='after')
    def check_has_embeddings(self, m):
        """Check that the config says where the training embeddings come from.

        The second parameter ``m`` is kept for signature compatibility and is
        not used. The validator now always returns ``self``; the original
        returned a plain dict on the early path and ``m`` at the end.

        Raises:
            AssertionError: when the clip / embedding-url combination is
                missing a source or is redundant (messages below).
        """
        data_config = getattr(self, 'data', None)
        decoder_config = getattr(self, 'decoder', None)

        if not exists(data_config) or not exists(decoder_config):
            # Something else already went wrong; nothing to check here.
            return self

        using_text_embeddings = any([unet.cond_on_text_encodings for unet in decoder_config.unets])
        using_clip = exists(decoder_config.clip)
        img_emb_url = data_config.img_embeddings_url
        text_emb_url = data_config.text_embeddings_url

        if using_text_embeddings:
            # Text conditioning needs some source of text embeddings.
            assert using_clip or exists(text_emb_url), 'If text conditioning, either clip or text_embeddings_url must be provided'

        if using_clip:
            if using_text_embeddings:
                assert not exists(text_emb_url) or not exists(img_emb_url), 'Loaded clip, but also provided text_embeddings_url and img_embeddings_url. This is redundant. Remove the clip model or the text embeddings'
            else:
                assert not exists(img_emb_url), 'Loaded clip, but also provided img_embeddings_url. This is redundant. Remove the clip model or the embeddings'

        if text_emb_url:
            assert using_text_embeddings, "Text embeddings are being loaded, but text embeddings are not being conditioned on. This will slow down the dataloader for no reason."

        return self

.\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\utils.py

# 导入时间模块
import time
# 导入 importlib 模块
import importlib
# 辅助函数
def exists(val):
    """Return True unless ``val`` is ``None``."""
    if val is None:
        return False
    return True
# 时间辅助函数
class Timer:
    """Stopwatch based on ``time.time()``."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Store the current time as the new starting point."""
        self.last_time = time.time()

    def elapsed(self):
        """Return the seconds passed since the last ``reset()``."""
        now = time.time()
        return now - self.last_time
# 打印辅助函数
def print_ribbon(s, symbol='=', repeat=40):
    """Return ``s`` flanked on both sides by ``symbol`` repeated ``repeat`` times.

    Despite the name, nothing is printed; the formatted string is returned.
    """
    bar = repeat * symbol
    return f'{bar} {s} {bar}'
# 导入辅助函数
def import_or_print_error(pkg_name, err_str=None):
    """Import and return module ``pkg_name``, or exit the program.

    Args:
        pkg_name: dotted module name passed to ``importlib.import_module``.
        err_str: optional message printed before exiting when the module
            cannot be found.

    Raises:
        SystemExit: when the module is not found.
    """
    # Local import: sys.exit is always available, unlike the ``exit`` helper
    # that the ``site`` module injects for interactive use.
    import sys

    try:
        return importlib.import_module(pkg_name)
    except ModuleNotFoundError:
        if err_str is not None:
            print(err_str)
        sys.exit()

.\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\version.py

# 定义变量 __version__,赋值为字符串 \’1.15.6\’
__version__ = \’1.15.6\’

.\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\vqgan_vae.py

# 导入必要的库
import copy
import math
from math import sqrt
from functools import partial, wraps
# 导入自定义模块
from vector_quantize_pytorch import VectorQuantize as VQ
# 导入 PyTorch 库
import torch
from torch import nn, einsum
import torch.nn.functional as F
from torch.autograd import grad as torch_grad
import torchvision
# 导入 einops 库
from einops import rearrange, reduce, repeat, pack, unpack
from einops.layers.torch import Rearrange
# 定义常量
MList = nn.ModuleList
# 辅助函数
def exists(val):
    """Return True unless ``val`` is ``None``."""
    if val is None:
        return False
    return True
def default(val, d):
    """Return ``val`` unless it is ``None``, in which case return ``d``."""
    if val is None:
        return d
    return val
# 装饰器
def eval_decorator(fn):
    """Run ``fn(model, ...)`` with ``model`` in eval mode, then restore its mode.

    The model's previous ``training`` flag is restored even if ``fn`` raises,
    and the wrapper keeps ``fn``'s name and docstring via ``functools.wraps``.
    """
    @wraps(fn)
    def inner(model, *args, **kwargs):
        was_training = model.training
        model.eval()
        try:
            return fn(model, *args, **kwargs)
        finally:
            model.train(was_training)
    return inner
def remove_vgg(fn):
    """Decorator: hide ``self.vgg`` while the wrapped method runs.

    If the instance has a ``vgg`` attribute it is removed before calling
    ``fn`` and put back afterwards. The attribute is restored even when
    ``fn`` raises; previously it was lost in that case.
    """
    @wraps(fn)
    def inner(self, *args, **kwargs):
        has_vgg = hasattr(self, 'vgg')
        if has_vgg:
            vgg = self.vgg
            delattr(self, 'vgg')
        try:
            return fn(self, *args, **kwargs)
        finally:
            if has_vgg:
                self.vgg = vgg
    return inner
# 关键字参数辅助函数
def pick_and_pop(keys, d):
    """Pop every key in ``keys`` from ``d`` and return them as a new dict.

    ``d`` is mutated. A missing key raises ``KeyError`` from ``dict.pop``.
    """
    popped = [d.pop(key) for key in keys]
    return dict(zip(keys, popped))
def group_dict_by_key(cond, d):
    """Split ``d`` into two dicts according to ``cond(key)``.

    Returns:
        A tuple ``(matching, non_matching)``; ``d`` is not modified.
    """
    matching, non_matching = {}, {}
    for key, value in d.items():
        bucket = matching if cond(key) else non_matching
        bucket[key] = value
    return matching, non_matching
def string_begins_with(prefix, string_input):
    """Return whether ``string_input`` starts with ``prefix``."""
    starts = string_input.startswith(prefix)
    return starts
def group_by_key_prefix(prefix, d):
    """Split ``d`` into (keys starting with ``prefix``, all other keys)."""
    def has_prefix(key):
        return string_begins_with(prefix, key)

    return group_dict_by_key(has_prefix, d)
def groupby_prefix_and_trim(prefix, d):
    """Split ``d`` by key prefix and strip the prefix from the matching keys.

    Returns:
        ``(kwargs_without_prefix, kwargs)``: the first dict holds the entries
        whose key started with ``prefix``, re-keyed without it; the second
        holds all remaining entries unchanged.
    """
    # Reuse the sibling helper rather than repeating the partial() call.
    kwargs_with_prefix, kwargs = group_by_key_prefix(prefix, d)
    kwargs_without_prefix = {
        key[len(prefix):]: value for key, value in kwargs_with_prefix.items()
    }
    return kwargs_without_prefix, kwargs
# 张量辅助函数
def log(t, eps = 1e-10):
    """Return ``torch.log`` of ``t`` after adding ``eps``."""
    shifted = t + eps
    return torch.log(shifted)
def gradient_penalty(images, output, weight = 10):
    """Return ``weight * mean((||d output / d images||_2 - 1) ** 2)``.

    The gradient of ``output`` w.r.t. ``images`` is taken with
    ``create_graph=True``, flattened per sample, and its L2 norm is
    penalised for deviating from 1.
    """
    batch_size = images.shape[0]
    ones = torch.ones(output.size(), device = images.device)
    (gradients,) = torch_grad(
        outputs = output,
        inputs = images,
        grad_outputs = ones,
        create_graph = True,
        retain_graph = True,
        only_inputs = True,
    )[:1]
    flat = rearrange(gradients, 'b ... -> b (...)')
    per_sample_norm = flat.norm(2, dim = 1)
    return weight * ((per_sample_norm - 1) ** 2).mean()
def l2norm(t):
    """L2-normalise ``t`` along its last dimension."""
    return F.normalize(t, p = 2.0, dim = -1)
def leaky_relu(p = 0.1):
    """Return an ``nn.LeakyReLU`` module with negative slope ``p``.

    The original ignored ``p`` and always used 0.1. The default is unchanged,
    so argument-less calls behave as before.
    """
    return nn.LeakyReLU(p)
def stable_softmax(t, dim = -1, alpha = 32 ** 2):
    """Softmax over ``dim`` computed on a rescaled, max-shifted copy of ``t``.

    ``t`` is divided by ``alpha``, its (detached) maximum along ``dim`` is
    subtracted, and the result is multiplied by ``alpha`` again before the
    softmax is applied.
    """
    scaled = t / alpha
    peak = scaled.amax(dim = dim, keepdim = True).detach()
    shifted = scaled - peak
    return (shifted * alpha).softmax(dim = dim)
def safe_div(numer, denom, eps = 1e-8):
    """Divide ``numer`` by ``denom + eps``."""
    padded_denom = denom + eps
    return numer / padded_denom
# GAN 损失函数
def hinge_discr_loss(fake, real):
    """Hinge discriminator loss: mean of ``relu(1 + fake) + relu(1 - real)``."""
    fake_term = F.relu(1 + fake)
    real_term = F.relu(1 - real)
    return (fake_term + real_term).mean()
def hinge_gen_loss(fake):
    """Hinge generator loss: the negated mean of ``fake``."""
    return fake.mean().neg()
def bce_discr_loss(fake, real):
    """Binary cross-entropy discriminator loss on raw logits.

    Computes ``mean(-log(1 - sigmoid(fake)) - log(sigmoid(real)))`` using the
    module's ``log`` helper.
    """
    fake_term = log(1 - torch.sigmoid(fake))
    real_term = log(torch.sigmoid(real))
    return (-fake_term - real_term).mean()
def bce_gen_loss(fake):
    """Binary cross-entropy generator loss: ``-mean(log(sigmoid(fake)))``."""
    log_prob = log(torch.sigmoid(fake))
    return -(log_prob.mean())
def grad_layer_wrt_loss(loss, layer):
    """Return the detached gradient of ``loss`` with respect to ``layer``.

    The graph is retained so that further gradients can be taken afterwards.
    """
    grads = torch_grad(
        outputs = loss,
        inputs = layer,
        grad_outputs = torch.ones_like(loss),
        retain_graph = True,
    )
    first = grads[0]
    return first.detach()
# VQGAN VAE
class LayerNormChan(nn.Module):
    """Normalisation over dimension 1 with a learned per-channel scale.

    ``gamma`` has shape ``(1, dim, 1, 1)``; there is no bias term.
    """

    def __init__(self, dim, eps = 1e-5):
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(1, dim, 1, 1))

    def forward(self, x):
        """Return ``(x - mean) / sqrt(var + eps) * gamma`` along dim 1."""
        mean = torch.mean(x, dim = 1, keepdim = True)
        var = torch.var(x, dim = 1, unbiased = False, keepdim = True)
        normed = (x - mean) / torch.sqrt(var + self.eps)
        return normed * self.gamma
class Discriminator(nn.Module):
    """Convolutional discriminator producing a map of logits.

    A stride-1 stem conv maps ``channels`` to ``dims[0]``; each consecutive
    pair in ``dims`` adds a stride-2 conv + GroupNorm + leaky ReLU stage;
    ``to_logits`` maps the final width to a single channel.
    """

    def __init__(
        self,
        dims,
        channels = 3,
        groups = 16,
        init_kernel_size = 5
    ):
        super().__init__()

        stem = nn.Sequential(
            nn.Conv2d(channels, dims[0], init_kernel_size, padding = init_kernel_size // 2),
            leaky_relu(),
        )
        self.layers = MList([stem])

        # One downsampling stage per consecutive (in, out) width pair.
        for dim_in, dim_out in zip(dims[:-1], dims[1:]):
            stage = nn.Sequential(
                nn.Conv2d(dim_in, dim_out, 4, stride = 2, padding = 1),
                nn.GroupNorm(groups, dim_out),
                leaky_relu(),
            )
            self.layers.append(stage)

        last_dim = dims[-1]
        # return 5 x 5, for PatchGAN-esque training
        self.to_logits = nn.Sequential(
            nn.Conv2d(last_dim, last_dim, 1),
            leaky_relu(),
            nn.Conv2d(last_dim, 1, 4),
        )

    def forward(self, x):
        """Run ``x`` through every stage, then through ``to_logits``."""
        hidden = x
        for stage in self.layers:
            hidden = stage(hidden)
        return self.to_logits(hidden)
# positional encoding
class ContinuousPositionBias(nn.Module):
    """ from https://arxiv.org/abs/2111.09883 """

    def __init__(self, *, dim, heads, layers = 2):
        super().__init__()
        # MLP mapping a 2-d relative offset to one bias value per head.
        self.net = MList([])
        self.net.append(nn.Sequential(nn.Linear(2, dim), leaky_relu()))
        for _ in range(layers - 1):
            self.net.append(nn.Sequential(nn.Linear(dim, dim), leaky_relu()))
        self.net.append(nn.Linear(dim, heads))
        # Lazily filled cache of relative positions; not saved in state_dict.
        self.register_buffer('rel_pos', None, persistent = False)

    def forward(self, x):
        """Add the learned relative-position bias to ``x``.

        The last dimension of ``x`` is taken as the number of positions and is
        treated as a square grid of side ``int(sqrt(n))``. The relative
        position table is cached and rebuilt whenever the grid size differs
        from the cached one (the original never rebuilt it).
        """
        n, device = x.shape[-1], x.device
        fmap_size = int(sqrt(n))

        if not exists(self.rel_pos) or self.rel_pos.shape[0] != fmap_size ** 2:
            pos = torch.arange(fmap_size, device = device)
            grid = torch.stack(torch.meshgrid(pos, pos, indexing = 'ij'))
            grid = rearrange(grid, 'c i j -> (i j) c')
            rel_pos = rearrange(grid, 'i c -> i 1 c') - rearrange(grid, 'j c -> 1 j c')
            # Signed log scaling of the offsets.
            rel_pos = torch.sign(rel_pos) * torch.log(rel_pos.abs() + 1)
            self.register_buffer('rel_pos', rel_pos, persistent = False)

        rel_pos = self.rel_pos.float()
        for layer in self.net:
            rel_pos = layer(rel_pos)

        bias = rearrange(rel_pos, 'i j h -> h i j')
        return x + bias
# resnet encoder / decoder
class ResnetEncDec(nn.Module):
    """Convolutional encoder/decoder pair built as mirrored module lists.

    Encoder modules are appended in order; the matching decoder modules are
    inserted at the front so the decoder runs in reverse order.
    """

    def __init__(
        self,
        dim,
        *,
        channels = 3,
        layers = 4,
        layer_mults = None,
        num_resnet_blocks = 1,
        resnet_groups = 16,
        first_conv_kernel_size = 5,
        use_attn = True,
        attn_dim_head = 64,
        attn_heads = 8,
        attn_dropout = 0.,
    ):
        super().__init__()
        assert dim % resnet_groups == 0, f'dimension {dim} must be divisible by {resnet_groups} (groups for the groupnorm)'

        self.layers = layers
        self.encoders = MList([])
        self.decoders = MList([])

        # Default width multipliers double per layer: 1, 2, 4, ...
        layer_mults = default(layer_mults, [2 ** t for t in range(layers)])
        assert len(layer_mults) == layers, 'layer multipliers must be equal to designated number of layers'

        layer_dims = [dim * mult for mult in layer_mults]
        dims = (dim, *layer_dims)
        self.encoded_dim = dims[-1]
        dim_pairs = zip(dims[:-1], dims[1:])

        # A scalar setting applies to the last layer only.
        if not isinstance(num_resnet_blocks, tuple):
            num_resnet_blocks = (0,) * (layers - 1) + (num_resnet_blocks,)
        if not isinstance(use_attn, tuple):
            use_attn = (False,) * (layers - 1) + (use_attn,)

        assert len(num_resnet_blocks) == layers, 'number of resnet blocks config must be equal to number of layers'
        assert len(use_attn) == layers

        per_layer = zip(range(layers), dim_pairs, num_resnet_blocks, use_attn)
        for _, (dim_in, dim_out), layer_num_resnet_blocks, layer_use_attn in per_layer:
            self.encoders.append(nn.Sequential(nn.Conv2d(dim_in, dim_out, 4, stride = 2, padding = 1), leaky_relu()))
            self.decoders.insert(0, nn.Sequential(nn.ConvTranspose2d(dim_out, dim_in, 4, 2, 1), leaky_relu()))

            if layer_use_attn:
                self.decoders.insert(0, VQGanAttention(dim = dim_out, heads = attn_heads, dim_head = attn_dim_head, dropout = attn_dropout))

            for _ in range(layer_num_resnet_blocks):
                self.encoders.append(ResBlock(dim_out, groups = resnet_groups))
                self.decoders.insert(0, GLUResBlock(dim_out, groups = resnet_groups))

            if layer_use_attn:
                self.encoders.append(VQGanAttention(dim = dim_out, heads = attn_heads, dim_head = attn_dim_head, dropout = attn_dropout))

        # Input conv goes first in the encoder, output conv last in the decoder.
        self.encoders.insert(0, nn.Conv2d(channels, dim, first_conv_kernel_size, padding = first_conv_kernel_size // 2))
        self.decoders.append(nn.Conv2d(dim, channels, 1))

    def get_encoded_fmap_size(self, image_size):
        """Return ``image_size // 2 ** layers``."""
        return image_size // (2 ** self.layers)

    @property
    def last_dec_layer(self):
        """Weight of the final decoder module."""
        return self.decoders[-1].weight

    def encode(self, x):
        """Apply every encoder module in order."""
        out = x
        for enc in self.encoders:
            out = enc(out)
        return out

    def decode(self, x):
        """Apply every decoder module in order."""
        out = x
        for dec in self.decoders:
            out = dec(out)
        return out
class GLUResBlock(nn.Module):
    """Residual block of two (3x3 conv -> GLU -> GroupNorm) stages and a 1x1 conv."""

    def __init__(self, chan, groups = 16):
        super().__init__()

        def gated_stage():
            # The conv doubles the channels; GLU over dim 1 halves them again.
            return [
                nn.Conv2d(chan, chan * 2, 3, padding = 1),
                nn.GLU(dim = 1),
                nn.GroupNorm(groups, chan),
            ]

        self.net = nn.Sequential(
            *gated_stage(),
            *gated_stage(),
            nn.Conv2d(chan, chan, 1),
        )

    def forward(self, x):
        """Return the block output plus the input (residual connection)."""
        return x + self.net(x) if False else self.net(x) + x
class ResBlock(nn.Module):
    """Residual block of two (3x3 conv -> GroupNorm -> leaky ReLU) stages and a 1x1 conv."""

    def __init__(self, chan, groups = 16):
        super().__init__()

        def conv_stage():
            return [
                nn.Conv2d(chan, chan, 3, padding = 1),
                nn.GroupNorm(groups, chan),
                leaky_relu(),
            ]

        self.net = nn.Sequential(
            *conv_stage(),
            *conv_stage(),
            nn.Conv2d(chan, chan, 1),
        )

    def forward(self, x):
        """Return the block output plus the input (residual connection)."""
        out = self.net(x)
        return out + x
class VQGanAttention(nn.Module):
    """Multi-head self-attention over the spatial positions of a feature map.

    Uses 1x1 convolutions for the projections, a channel layer norm before
    attention, a continuous position bias on the similarities, and a residual
    connection around the whole block.
    """

    def __init__(
        self,
        *,
        dim,
        dim_head = 64,
        heads = 8,
        dropout = 0.
    ):
        super().__init__()
        self.heads = heads
        self.scale = dim_head ** -0.5
        inner_dim = heads * dim_head

        self.dropout = nn.Dropout(dropout)
        self.pre_norm = LayerNormChan(dim)

        self.cpb = ContinuousPositionBias(dim = dim // 4, heads = heads)
        self.to_qkv = nn.Conv2d(dim, inner_dim * 3, 1, bias = False)
        self.to_out = nn.Conv2d(inner_dim, dim, 1, bias = False)

    def forward(self, x):
        """Attend over the flattened last two dims of ``x`` and add the input back."""
        heads = self.heads
        height, width = x.shape[-2:]
        residual = x.clone()

        normed = self.pre_norm(x)
        qkv = self.to_qkv(normed).chunk(3, dim = 1)
        q, k, v = (rearrange(t, 'b (h c) x y -> b h c (x y)', h = heads) for t in qkv)

        sim = einsum('b h c i, b h c j -> b h i j', q, k) * self.scale
        sim = self.cpb(sim)

        attn = self.dropout(stable_softmax(sim, dim = -1))

        out = einsum('b h i j, b h c j -> b h c i', attn, v)
        out = rearrange(out, 'b h c (x y) -> b (h c) x y', x = height, y = width)
        return self.to_out(out) + residual
class RearrangeImage(nn.Module):
    """Unfold dimension 1 of the input into a square ``(h, w)`` grid."""

    def forward(self, x):
        """Reshape ``(b, h*w, ...)`` to ``(b, h, w, ...)`` with ``h == w == int(sqrt(n))``."""
        side = int(sqrt(x.shape[1]))
        return rearrange(x, 'b (h w) ... -> b h w ...', h = side, w = side)
class Attention(nn.Module):
    """Pre-norm multi-head self-attention over a ``(b, n, dim)`` sequence layout
    (layout taken from the rearrange patterns below)."""

    def __init__(
        self,
        dim,
        *,
        heads = 8,
        dim_head = 32
    ):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.heads = heads
        self.scale = dim_head ** -0.5
        inner_dim = dim_head * heads

        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)
        self.to_out = nn.Linear(inner_dim, dim)

    def forward(self, x):
        """Return the attention output projected back to ``dim`` (no residual added here)."""
        heads = self.heads

        normed = self.norm(x)
        qkv = self.to_qkv(normed).chunk(3, dim = -1)
        q, k, v = (rearrange(t, 'b n (h d) -> b h n d', h = heads) for t in qkv)

        q = q * self.scale
        sim = einsum('b h i d, b h j d -> b h i j', q, k)

        # Subtract the row maximum before the softmax.
        sim = sim - sim.amax(dim = -1, keepdim = True).detach()
        attn = sim.softmax(dim = -1)

        out = einsum('b h i j, b h j d -> b h i d', attn, v)
        merged = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(merged)
def FeedForward(dim, mult = 4):
    """Return LayerNorm -> Linear(dim, dim*mult) -> GELU -> Linear(dim*mult, dim).

    Both linear layers are created without bias.
    """
    hidden_dim = dim * mult
    modules = [
        nn.LayerNorm(dim),
        nn.Linear(dim, hidden_dim, bias = False),
        nn.GELU(),
        nn.Linear(hidden_dim, dim, bias = False),
    ]
    return nn.Sequential(*modules)
class Transformer(nn.Module):
    """Stack of (attention, feed-forward) pairs with residuals and a final LayerNorm."""

    def __init__(
        self,
        dim,
        *,
        layers,
        dim_head = 32,
        heads = 8,
        ff_mult = 4
    ):
        super().__init__()
        # Each entry holds one attention module and one feed-forward module.
        self.layers = nn.ModuleList([
            nn.ModuleList([
                Attention(dim = dim, dim_head = dim_head, heads = heads),
                FeedForward(dim = dim, mult = ff_mult),
            ])
            for _ in range(layers)
        ])
        self.norm = nn.LayerNorm(dim)

    def forward(self, x):
        """Apply every layer with residual connections, then the final norm."""
        hidden = x
        for attn, ff in self.layers:
            hidden = attn(hidden) + hidden
            hidden = ff(hidden) + hidden
        return self.norm(hidden)
class ViTEncDec(nn.Module):
    """Patch-based transformer encoder/decoder pair.

    The encoder splits the image into ``patch_size`` x ``patch_size`` patches,
    projects them to ``dim``, runs a Transformer and reshapes back to a
    channel-first map. The decoder mirrors this and projects back to pixels.
    """

    def __init__(
        self,
        dim,
        channels = 3,
        layers = 4,
        patch_size = 8,
        dim_head = 32,
        heads = 8,
        ff_mult = 4
    ):
        super().__init__()
        self.encoded_dim = dim
        self.patch_size = patch_size

        # Flattened size of one patch.
        input_dim = channels * (patch_size ** 2)

        # Shared settings for the encoder and decoder transformers.
        transformer_kwargs = dict(
            dim = dim,
            dim_head = dim_head,
            heads = heads,
            ff_mult = ff_mult,
            layers = layers,
        )

        self.encoder = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = patch_size, p2 = patch_size),
            nn.Linear(input_dim, dim),
            Transformer(**transformer_kwargs),
            RearrangeImage(),
            Rearrange('b h w c -> b c h w')
        )

        self.decoder = nn.Sequential(
            Rearrange('b c h w -> b (h w) c'),
            Transformer(**transformer_kwargs),
            nn.Sequential(
                nn.Linear(dim, dim * 4, bias = False),
                nn.Tanh(),
                nn.Linear(dim * 4, input_dim, bias = False),
            ),
            RearrangeImage(),
            Rearrange('b h w (p1 p2 c) -> b c (h p1) (w p2)', p1 = patch_size, p2 = patch_size)
        )

    def get_encoded_fmap_size(self, image_size):
        """Return ``image_size // patch_size``."""
        return image_size // self.patch_size

    @property
    def last_dec_layer(self):
        """Weight of the last Linear in the decoder's pixel-projection block."""
        projection = self.decoder[-3]
        return projection[-1].weight

    def encode(self, x):
        """Run the encoder on ``x``."""
        return self.encoder(x)

    def decode(self, x):
        """Run the decoder on ``x``."""
        return self.decoder(x)
class NullVQGanVAE(nn.Module):
    """Pass-through stand-in for a VAE: encode and decode return their input."""

    def __init__(
        self,
        *,
        channels
    ):
        super().__init__()
        self.encoded_dim = channels
        self.layers = 0

    def get_encoded_fmap_size(self, size):
        """Return ``size`` unchanged."""
        return size

    def copy_for_eval(self):
        """Return this same instance (no copy is made)."""
        return self

    def encode(self, x):
        """Identity."""
        return x

    def decode(self, x):
        """Identity."""
        return x
class VQGanVAE(nn.Module):
    """VQ-GAN style autoencoder: encoder/decoder, vector quantiser and,
    optionally, a VGG perceptual loss plus a discriminator."""

    def __init__(
        self,
        *,
        dim,
        image_size,
        channels = 3,
        layers = 4,
        l2_recon_loss = False,
        use_hinge_loss = True,
        vgg = None,
        vq_codebook_dim = 256,
        vq_codebook_size = 512,
        vq_decay = 0.8,
        vq_commitment_weight = 1.,
        vq_kmeans_init = True,
        vq_use_cosine_sim = True,
        use_vgg_and_gan = True,
        vae_type = 'resnet',
        discr_layers = 4,
        **kwargs
    ):
        """Build the model.

        Extra keyword arguments prefixed ``vq_`` are forwarded to the vector
        quantiser and those prefixed ``encdec_`` to the encoder/decoder, with
        the prefix removed.

        Raises:
            ValueError: if ``vae_type`` is neither ``'resnet'`` nor ``'vit'``.
        """
        super().__init__()
        vq_kwargs, kwargs = groupby_prefix_and_trim('vq_', kwargs)
        encdec_kwargs, kwargs = groupby_prefix_and_trim('encdec_', kwargs)

        self.image_size = image_size
        self.channels = channels
        self.codebook_size = vq_codebook_size

        if vae_type == 'resnet':
            enc_dec_klass = ResnetEncDec
        elif vae_type == 'vit':
            enc_dec_klass = ViTEncDec
        else:
            raise ValueError(f'{vae_type} not valid')

        self.enc_dec = enc_dec_klass(
            dim = dim,
            channels = channels,
            layers = layers,
            **encdec_kwargs
        )

        self.vq = VQ(
            dim = self.enc_dec.encoded_dim,
            codebook_dim = vq_codebook_dim,
            codebook_size = vq_codebook_size,
            decay = vq_decay,
            commitment_weight = vq_commitment_weight,
            accept_image_fmap = True,
            kmeans_init = vq_kmeans_init,
            use_cosine_sim = vq_use_cosine_sim,
            **vq_kwargs
        )

        # reconstruction loss
        self.recon_loss_fn = F.mse_loss if l2_recon_loss else F.l1_loss

        # GAN and perceptual loss are skipped entirely when disabled
        self.vgg = None
        self.discr = None
        self.use_vgg_and_gan = use_vgg_and_gan

        if not use_vgg_and_gan:
            return

        # perceptual loss
        if exists(vgg):
            self.vgg = vgg
        else:
            self.vgg = torchvision.models.vgg16(pretrained = True)
            # Drop the last two classifier modules.
            self.vgg.classifier = nn.Sequential(*self.vgg.classifier[:-2])

        # GAN related losses
        layer_mults = list(map(lambda t: 2 ** t, range(discr_layers)))
        layer_dims = [dim * mult for mult in layer_mults]
        dims = (dim, *layer_dims)

        self.discr = Discriminator(dims = dims, channels = channels)

        self.discr_loss = hinge_discr_loss if use_hinge_loss else bce_discr_loss
        self.gen_loss = hinge_gen_loss if use_hinge_loss else bce_gen_loss

    @property
    def encoded_dim(self):
        """Channel dimension of the encoder output."""
        return self.enc_dec.encoded_dim

    def get_encoded_fmap_size(self, image_size):
        """Delegate to the encoder/decoder's ``get_encoded_fmap_size``."""
        return self.enc_dec.get_encoded_fmap_size(image_size)

    def copy_for_eval(self):
        """Return a deep copy in eval mode without the discriminator and VGG.

        The copy is made while the model is on the CPU and is then moved to
        the device the model was on. The original model is moved back to that
        device as well; previously it was left on the CPU.
        """
        device = next(self.parameters()).device
        vae_copy = copy.deepcopy(self.cpu())
        # nn.Module.cpu() moves the module in place, so undo that for self.
        self.to(device)

        if vae_copy.use_vgg_and_gan:
            del vae_copy.discr
            del vae_copy.vgg

        vae_copy.eval()
        return vae_copy.to(device)

    @remove_vgg
    def state_dict(self, *args, **kwargs):
        """State dict computed with ``vgg`` temporarily removed."""
        return super().state_dict(*args, **kwargs)

    @remove_vgg
    def load_state_dict(self, *args, **kwargs):
        """Load a state dict with ``vgg`` temporarily removed."""
        return super().load_state_dict(*args, **kwargs)

    @property
    def codebook(self):
        """The vector quantiser's codebook."""
        return self.vq.codebook

    def encode(self, fmap):
        """Run the encoder only (no quantisation)."""
        fmap = self.enc_dec.encode(fmap)
        return fmap

    def decode(self, fmap, return_indices_and_loss = False):
        """Quantise ``fmap`` and decode it.

        Returns the decoded output, or ``(output, indices, commit_loss)`` when
        ``return_indices_and_loss`` is true.
        """
        fmap, indices, commit_loss = self.vq(fmap)
        fmap = self.enc_dec.decode(fmap)

        if not return_indices_and_loss:
            return fmap

        return fmap, indices, commit_loss

    def forward(
        self,
        img,
        return_loss = False,
        return_discr_loss = False,
        return_recons = False,
        add_gradient_penalty = True
    ):
        """Reconstruct ``img`` and optionally return a training loss.

        With neither loss flag set the reconstruction is returned. With
        ``return_discr_loss`` the discriminator loss is returned (plus a
        gradient penalty when ``add_gradient_penalty``); with ``return_loss``
        the autoencoder loss is returned. ``return_recons`` additionally
        returns the reconstruction as a second element.

        Note: the discriminator branch detaches the reconstruction in place
        and calls ``img.requires_grad_()`` on the caller's tensor.
        """
        batch, channels, height, width, device = *img.shape, img.device
        # The message was missing its f-prefix, so the size was never shown.
        assert height == self.image_size and width == self.image_size, f'height and width of input image must be equal to {self.image_size}'
        assert channels == self.channels, 'number of channels on image or sketch is not equal to the channels set on this VQGanVAE'

        fmap = self.encode(img)
        fmap, indices, commit_loss = self.decode(fmap, return_indices_and_loss = True)

        if not return_loss and not return_discr_loss:
            return fmap

        assert return_loss ^ return_discr_loss, 'you should either return autoencoder loss or discriminator loss, but not both'

        # whether to return discriminator loss
        if return_discr_loss:
            assert exists(self.discr), 'discriminator must exist to train it'

            fmap.detach_()
            img.requires_grad_()

            fmap_discr_logits, img_discr_logits = map(self.discr, (fmap, img))

            # Start from the plain discriminator loss so that ``loss`` is
            # defined even when the gradient penalty is disabled.
            loss = self.discr_loss(fmap_discr_logits, img_discr_logits)

            if add_gradient_penalty:
                loss = loss + gradient_penalty(img, img_discr_logits)

            if return_recons:
                return loss, fmap

            return loss

        # reconstruction loss
        recon_loss = self.recon_loss_fn(fmap, img)

        # early return if training on grayscale / without vgg and gan
        if not self.use_vgg_and_gan:
            if return_recons:
                return recon_loss, fmap

            return recon_loss

        # perceptual loss
        img_vgg_input = img
        fmap_vgg_input = fmap

        if img.shape[1] == 1:
            # Repeat the single channel three times for the VGG input.
            img_vgg_input, fmap_vgg_input = map(lambda t: repeat(t, 'b 1 ... -> b c ...', c = 3), (img_vgg_input, fmap_vgg_input))

        img_vgg_feats = self.vgg(img_vgg_input)
        recon_vgg_feats = self.vgg(fmap_vgg_input)
        perceptual_loss = F.mse_loss(img_vgg_feats, recon_vgg_feats)

        # generator loss
        gen_loss = self.gen_loss(self.discr(fmap))

        # adaptive weight: ratio of gradient norms at the last decoder layer
        last_dec_layer = self.enc_dec.last_dec_layer

        norm_grad_wrt_gen_loss = grad_layer_wrt_loss(gen_loss, last_dec_layer).norm(p = 2)
        norm_grad_wrt_perceptual_loss = grad_layer_wrt_loss(perceptual_loss, last_dec_layer).norm(p = 2)

        adaptive_weight = safe_div(norm_grad_wrt_perceptual_loss, norm_grad_wrt_gen_loss)
        adaptive_weight.clamp_(max = 1e4)

        # combine losses
        loss = recon_loss + perceptual_loss + commit_loss + adaptive_weight * gen_loss

        if return_recons:
            return loss, fmap

        return loss

.\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\vqgan_vae_trainer.py

# 从 math 模块中导入 sqrt 函数
from math import sqrt
# 从 copy 模块中导入 copy 函数
import copy
# 从 random 模块中导入 choice 函数
from random import choice
# 从 pathlib 模块中导入 Path 类
from pathlib import Path
# 从 shutil 模块中导入 rmtree 函数
from shutil import rmtree
# 从 PIL 模块中导入 Image 类
from PIL import Image
# 导入 torch 库
import torch
# 从 torch 模块中导入 nn 模块
from torch import nn
# 从 torch.cuda.amp 模块中导入 autocast, GradScaler 函数
from torch.cuda.amp import autocast, GradScaler
# 从 torch.utils.data 模块中导入 Dataset, DataLoader, random_split 类
from torch.utils.data import Dataset, DataLoader, random_split
# 导入 torchvision.transforms 模块,并重命名为 T
import torchvision.transforms as T
# 从 torchvision.datasets 模块中导入 ImageFolder 类
from torchvision.datasets import ImageFolder
# 从 torchvision.utils 模块中导入 make_grid, save_image 函数
from torchvision.utils import make_grid, save_image
# 导入 einops 模块中的 rearrange 函数
from einops import rearrange
# 导入 dalle2_pytorch.vqgan_vae 模块中的 VQGanVAE 类
from dalle2_pytorch.vqgan_vae import VQGanVAE
# 导入 dalle2_pytorch.optimizer 模块中的 get_optimizer 函数
from dalle2_pytorch.optimizer import get_optimizer
# 导入 ema_pytorch 模块中的 EMA 类
from ema_pytorch import EMA
# helpers
# Check whether a value has been provided (i.e. is not None).
def exists(val):
    """Return ``True`` when ``val`` is anything other than ``None``.

    Falsy values such as ``0``, ``''`` or ``[]`` still count as existing.
    """
    return val is not None
# No-op callback: accepts anything, does nothing.
def noop(*args, **kwargs):
    """Accept any positional/keyword arguments and return ``None``.

    Used as the default logging callback so callers need not pass one.
    """
    return None
# Endlessly iterate over an iterable, restarting it whenever it is exhausted.
def cycle(dl):
    """Yield items from ``dl`` forever, re-iterating it after each full pass.

    Args:
        dl: a re-iterable object (e.g. a ``DataLoader`` or a list).

    Yields:
        The items of ``dl``, pass after pass.

    Raises:
        ValueError: if a full pass over ``dl`` produces no items. Without this
            guard the generator would spin forever without ever yielding.
    """
    while True:
        produced_any = False
        for data in dl:
            produced_any = True
            yield data
        if not produced_any:
            raise ValueError('cannot cycle over an iterable that yields no items')
# Wrap a single value in a 1-tuple; leave tuples and lists untouched.
def cast_tuple(t):
    """Return ``t`` unchanged if it is a tuple or list, else ``(t,)``.

    Note that a list input is returned as the same list object, not converted
    to a tuple.
    """
    if isinstance(t, (tuple, list)):
        return t
    return (t,)
# Ask the user a yes/no question on stdin.
def yes_or_no(question):
    """Prompt with ``question`` and return ``True`` for a "y"/"yes" answer.

    The answer is compared case-insensitively and surrounding whitespace is
    ignored, so inputs such as ``" Y "`` or ``"yes "`` are accepted. Any other
    answer returns ``False``.
    """
    answer = input(f'{question} (y/n) ')
    return answer.strip().lower() in ('yes', 'y')
# Accumulate numeric log values into a running log dict.
def accum_log(log, new_logs):
    """Add every value of ``new_logs`` onto the matching entry of ``log``.

    Keys missing from ``log`` start from ``0.``. ``log`` is mutated in place
    and also returned for convenience.
    """
    for key, new_value in new_logs.items():
        log[key] = log.get(key, 0.) + new_value
    return log
# classes
# Dataset of images found recursively under a folder.
class ImageDataset(Dataset):
    """Dataset yielding transformed image tensors from files under ``folder``.

    Args:
        folder: root directory searched recursively for images.
        image_size: size passed to ``T.Resize`` and ``T.CenterCrop``.
        exts: file extensions (without the dot) to include. Matching is done
            by ``Path.glob`` on the literal extension.
    """

    def __init__(
        self,
        folder,
        image_size,
        exts = ('jpg', 'jpeg', 'png')  # immutable default: never shared/mutated across instances
    ):
        super().__init__()
        self.folder = folder
        self.image_size = image_size

        # collect every file below `folder` with one of the requested extensions
        self.paths = [p for ext in exts for p in Path(f'{folder}').glob(f'**/*.{ext}')]
        print(f'{len(self.paths)} training samples found at {folder}')

        # force RGB, resize, random horizontal flip, center crop, convert to tensor
        self.transform = T.Compose([
            T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img),
            T.Resize(image_size),
            T.RandomHorizontalFlip(),
            T.CenterCrop(image_size),
            T.ToTensor()
        ])

    def __len__(self):
        """Return the number of image files found."""
        return len(self.paths)

    def __getitem__(self, index):
        """Load the image at ``index`` and return it after ``self.transform``."""
        path = self.paths[index]
        # the context manager closes the underlying file once the tensor is built
        with Image.open(path) as img:
            return self.transform(img)
# main trainer class
# Trainer that alternates generator (VAE) and discriminator updates for a VQGanVAE.
class VQGanVAETrainer(nn.Module):
    """Training loop for a ``VQGanVAE`` on a folder of images.

    Each call to ``train_step`` performs one generator update, one
    discriminator update (when the VAE has a discriminator), an EMA update,
    and periodically writes reconstructions and checkpoints to
    ``results_folder``.

    Args:
        vae: the ``VQGanVAE`` instance to train.
        num_train_steps: total number of ``train_step`` calls run by ``train``.
        lr: learning rate for both optimizers.
        batch_size: batch size of the train and validation data loaders.
        folder: image folder handed to ``ImageDataset``.
        grad_accum_every: number of batches accumulated per optimizer step.
        wd: weight decay for both optimizers.
        save_results_every: save reconstruction grids every this many steps.
        save_model_every: save checkpoints every this many steps.
        results_folder: output directory for images and checkpoints.
        valid_frac: fraction of the dataset split off as validation set;
            ``0`` (or less) shares the full dataset between both.
        random_split_seed: seed of the generator used for the split.
        ema_beta: decay passed to the ``EMA`` wrapper.
        ema_update_after_step: passed through to ``EMA``.
        ema_update_every: passed through to ``EMA``.
        apply_grad_penalty_every: request the gradient penalty on the
            discriminator loss every this many steps.
        amp: enable autocast / gradient scaling.

    NOTE(review): ``train`` below overrides ``nn.Module.train(mode)``, so
    calling ``.eval()`` / ``.train(False)`` on the trainer itself would start
    the training loop. Kept as-is to preserve the public interface.
    """

    def __init__(
        self,
        vae,
        *,
        num_train_steps,
        lr,
        batch_size,
        folder,
        grad_accum_every,
        wd = 0.,
        save_results_every = 100,
        save_model_every = 1000,
        results_folder = './results',
        valid_frac = 0.05,
        random_split_seed = 42,
        ema_beta = 0.995,
        ema_update_after_step = 500,
        ema_update_every = 10,
        apply_grad_penalty_every = 4,
        amp = False
    ):
        super().__init__()
        assert isinstance(vae, VQGanVAE), 'vae must be instance of VQGanVAE'

        image_size = vae.image_size

        # online model and its exponential moving average copy;
        # ema_beta is forwarded so the constructor argument actually takes effect
        self.vae = vae
        self.ema_vae = EMA(
            vae,
            beta = ema_beta,
            update_after_step = ema_update_after_step,
            update_every = ema_update_every
        )

        # step counter stored as a buffer so it is part of the module state
        self.register_buffer('steps', torch.Tensor([0]))

        self.num_train_steps = num_train_steps
        self.batch_size = batch_size
        self.grad_accum_every = grad_accum_every

        # split parameters into discriminator vs everything else;
        # the discriminator is optional, mirroring the guard in train_step
        has_discr = exists(vae.discr)
        all_parameters = set(vae.parameters())
        discr_parameters = set(vae.discr.parameters()) if has_discr else set()
        vae_parameters = all_parameters - discr_parameters

        self.optim = get_optimizer(vae_parameters, lr = lr, wd = wd)
        self.discr_optim = get_optimizer(discr_parameters, lr = lr, wd = wd) if has_discr else None

        # mixed precision: one gradient scaler per optimizer
        self.amp = amp
        self.scaler = GradScaler(enabled = amp)
        self.discr_scaler = GradScaler(enabled = amp)

        # dataset and optional train / validation split
        self.ds = ImageDataset(folder, image_size = image_size)

        if valid_frac > 0:
            train_size = int((1 - valid_frac) * len(self.ds))
            valid_size = len(self.ds) - train_size
            self.ds, self.valid_ds = random_split(self.ds, [train_size, valid_size], generator = torch.Generator().manual_seed(random_split_seed))
            print(f'training with dataset of {len(self.ds)} samples and validating with randomly splitted {len(self.valid_ds)} samples')
        else:
            self.valid_ds = self.ds
            print(f'training with shared training and valid dataset of {len(self.ds)} samples')

        # infinite data loaders
        self.dl = cycle(DataLoader(
            self.ds,
            batch_size = batch_size,
            shuffle = True
        ))

        # NOTE(review): valid_dl is built but not consumed anywhere in this class
        self.valid_dl = cycle(DataLoader(
            self.valid_ds,
            batch_size = batch_size,
            shuffle = True
        ))

        self.save_model_every = save_model_every
        self.save_results_every = save_results_every
        self.apply_grad_penalty_every = apply_grad_penalty_every

        self.results_folder = Path(results_folder)

        # offer to wipe a non-empty results folder before starting
        if any(self.results_folder.glob('**/*')) and yes_or_no('do you want to clear previous experiment checkpoints and results?'):
            rmtree(str(self.results_folder))

        self.results_folder.mkdir(parents = True, exist_ok = True)

    def train_step(self):
        """Run one generator update and one discriminator update.

        Returns:
            dict with the accumulated ``'loss'``, ``'discr_loss'`` (when a
            discriminator exists) and, on sampling steps,
            ``'reconstructions'`` (the image grid tensor).
        """
        device = next(self.vae.parameters()).device
        steps = int(self.steps.item())

        # True on every `apply_grad_penalty_every`-th step (including step 0)
        apply_grad_penalty = not (steps % self.apply_grad_penalty_every)

        self.vae.train()
        logs = {}

        # ---- update vae (generator) ----
        for _ in range(self.grad_accum_every):
            img = next(self.dl).to(device)

            with autocast(enabled = self.amp):
                loss = self.vae(img, return_loss = True)

            self.scaler.scale(loss / self.grad_accum_every).backward()
            accum_log(logs, {'loss': loss.item() / self.grad_accum_every})

        self.scaler.step(self.optim)
        self.scaler.update()
        self.optim.zero_grad()

        # ---- update discriminator ----
        if exists(self.vae.discr):
            # the generator loss back-propagates through the discriminator and
            # self.optim does not own those parameters, so clear the stale
            # gradients before accumulating the discriminator loss
            self.discr_optim.zero_grad()

            for _ in range(self.grad_accum_every):
                img = next(self.dl).to(device)

                with autocast(enabled = self.amp):
                    # the visible forward body reads `add_gradient_penalty`
                    # when computing the discriminator loss
                    # NOTE(review): confirm against VQGanVAE.forward's signature
                    loss = self.vae(
                        img,
                        return_discr_loss = True,
                        add_gradient_penalty = apply_grad_penalty
                    )

                self.discr_scaler.scale(loss / self.grad_accum_every).backward()
                accum_log(logs, {'discr_loss': loss.item() / self.grad_accum_every})

            self.discr_scaler.step(self.discr_optim)
            self.discr_scaler.update()
            self.discr_optim.zero_grad()

            print(f"{steps}: vae loss: {logs['loss']} - discr loss: {logs['discr_loss']}")
        else:
            print(f"{steps}: vae loss: {logs['loss']}")

        # ---- update exponential moving averaged generator ----
        self.ema_vae.update()

        # ---- periodically save reconstructions ----
        if not (steps % self.save_results_every):
            for model, filename in ((self.ema_vae.ema_model, f'{steps}.ema'), (self.vae, str(steps))):
                model.eval()

                imgs = next(self.dl).to(device)

                # inference only: no autograd graph needed for the samples
                with torch.no_grad():
                    recons = model(imgs)

                # interleave originals and reconstructions: (orig, recon, orig, recon, ...)
                imgs_and_recons = torch.stack((imgs, recons), dim = 0)
                imgs_and_recons = rearrange(imgs_and_recons, 'r b ... -> (b r) ...')
                imgs_and_recons = imgs_and_recons.detach().cpu().float().clamp(0., 1.)

                # two images per row: original next to its reconstruction
                grid = make_grid(imgs_and_recons, nrow = 2, normalize = True, value_range = (0, 1))

                logs['reconstructions'] = grid
                save_image(grid, str(self.results_folder / f'{filename}.png'))

            print(f'{steps}: saving to {str(self.results_folder)}')

        # ---- periodically save checkpoints ----
        if not (steps % self.save_model_every):
            state_dict = self.vae.state_dict()
            model_path = str(self.results_folder / f'vae.{steps}.pt')
            torch.save(state_dict, model_path)

            ema_state_dict = self.ema_vae.state_dict()
            model_path = str(self.results_folder / f'vae.{steps}.ema.pt')
            torch.save(ema_state_dict, model_path)

            print(f'{steps}: saving model to {str(self.results_folder)}')

        self.steps += 1
        return logs

    def train(self, log_fn = noop):
        """Call ``train_step`` until ``num_train_steps`` is reached.

        Args:
            log_fn: callback invoked with the log dict of every step.
        """
        while self.steps < self.num_train_steps:
            logs = self.train_step()
            log_fn(logs)

        print('training complete')

.\\lucidrains\\DALLE2-pytorch\\dalle2_pytorch\\__init__.py

# 从dalle2_pytorch版本模块中导入版本号
from dalle2_pytorch.version import __version__
# 从dalle2_pytorch模块中导入DALLE2类、DiffusionPriorNetwork类、DiffusionPrior类、Unet类和Decoder类
from dalle2_pytorch.dalle2_pytorch import DALLE2, DiffusionPriorNetwork, DiffusionPrior, Unet, Decoder
# 从dalle2_pytorch模块中导入OpenAIClipAdapter类和OpenClipAdapter类
from dalle2_pytorch.dalle2_pytorch import OpenAIClipAdapter, OpenClipAdapter
# 从dalle2_pytorch模块中导入DecoderTrainer类和DiffusionPriorTrainer类
from dalle2_pytorch.trainer import DecoderTrainer, DiffusionPriorTrainer
# 从dalle2_pytorch模块中导入VQGanVAE类
from dalle2_pytorch.vqgan_vae import VQGanVAE
# 从x_clip模块中导入CLIP类
from x_clip import CLIP

Diffusion Prior

This readme serves as an introduction to the diffusion prior.

Intro

A properly trained prior will allow you to translate between two embedding spaces. If you know a priori that two embeddings are connected in some way, then the ability to translate between them could be extremely helpful.

Motivation

Before we dive into the model, let’s look at a quick example of where the model may be helpful.

For demonstration purposes we will imagine that we wish to generate images from text using CLIP and a Decoder.

CLIP is a contrastive model that learns to maximize the cosine similarity between a given image and caption; however, there is no guarantee that these embeddings are in the same space. While the embeddings generated are close, the image and text embeddings occupy two disjoint sets.

# Load Models
clip_model = clip.load(\”ViT-L/14\”)
decoder = Decoder(checkpoint=\”best.pth\”) # A decoder trained on CLIP Image embeddings
# Retrieve prompt from user and encode with CLIP
prompt = \”A corgi wearing sunglasses\”
tokenized_text = tokenize(prompt)
text_embedding = clip_model.encode_text(tokenized_text)
# Now, pass the text embedding to the decoder
predicted_image = decoder.sample(text_embedding)

Question: Can you spot the issue here?

Answer: We’re trying to generate an image from a text embedding!

Unfortunately, we run into the issue previously mentioned–the image embeddings and the text embeddings are not interchangeable! Now let’s look at a better solution

# Load Models
prior= Prior(checkpoint=\”prior.pth\”) # A prior trained to go from: text -> clip text emb -> clip img emb
decoder = Decoder(checkpoint=\”decoder.pth\”) # A decoder trained on CLIP Image embeddings
# Retrieve prompt from user and encode with a prior
prompt = \”A corgi wearing sunglasses\”
tokenized_text = tokenize(prompt)
text_embedding = prior.sample(tokenized_text) # <-- now we get an embedding in the same space as images (despite the variable name)!
# Now, pass the predicted image embedding to the decoder
predicted_image = decoder.sample(text_embedding)

With the prior we are able to successfully generate embeddings within CLIP’s image space! For this reason, the decoder will perform much better as it receives input that is much closer to its training data.

You may be asking yourself the following question:

“Why don’t you just train the decoder on clip text embeddings instead of image embeddings?”

OpenAI covers this topic in their DALLE-2 paper. The TL;DR is “it doesn’t work as well as decoders trained on image embeddings”…also…it’s just an example 😄

Usage

To utilize a pre-trained prior, it’s quite simple.

Loading Checkpoints

import torch
from dalle2_pytorch import DiffusionPrior, DiffusionPriorNetwork, OpenAIClipAdapter
from dalle2_pytorch.trainer import DiffusionPriorTrainer
def load_diffusion_model(dprior_path, device = None):
    """Rebuild the diffusion prior with its training configuration and load a checkpoint.

    Args:
        dprior_path: path of the checkpoint handed to ``trainer.load``.
        device: device passed to ``DiffusionPriorTrainer``. When ``None``,
            CUDA is used if available, otherwise the CPU.

    Returns:
        The ``DiffusionPriorTrainer`` with the checkpoint loaded.
    """
    # the original example referenced `device` without ever defining it
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # architecture must match the configuration the checkpoint was trained with
    prior_network = DiffusionPriorNetwork(
        dim=768,
        depth=24,
        dim_head=64,
        heads=32,
        normformer=True,
        attn_dropout=5e-2,
        ff_dropout=5e-2,
        num_time_embeds=1,
        num_image_embeds=1,
        num_text_embeds=1,
        num_timesteps=1000,
        ff_mult=4
    )

    diffusion_prior = DiffusionPrior(
        net=prior_network,
        clip=OpenAIClipAdapter("ViT-L/14"),
        image_embed_dim=768,
        timesteps=1000,
        cond_drop_prob=0.1,
        loss_type="l2",
        condition_on_text_encodings=True,
    )

    trainer = DiffusionPriorTrainer(
        diffusion_prior=diffusion_prior,
        lr=1.1e-4,
        wd=6.02e-2,
        max_grad_norm=0.5,
        amp=False,
        group_wd_params=True,
        use_ema=True,
        device=device,
        accelerator=None,
    )

    trainer.load(dprior_path)
    return trainer

Here we instantiate a model that matches the configuration it was trained with, and then load the weights (just like any other PyTorch model!)

Sampling

Once we have a pre-trained model, generating embeddings is quite simple!

# tokenize the text
tokenized_text = clip.tokenize(\”<your amazing prompt>\”)
# predict an embedding
predicted_embedding = prior.sample(tokenized_text, n_samples_per_batch=2, cond_scale=1.0)

The resulting tensor returned from .sample() is of the same shape as your training data along the non-batch dimension(s). For example, a prior trained on ViT-L/14 embeddings will predict an embedding of shape (1, 768).

For CLIP priors, this is quite handy as it means that you can use prior.sample(tokenized_text) as a drop-in replacement for clip.encode_text().

Some things to note:

- It is possible to specify the number of embeddings to sample from (the default suggested by OpenAI is n=2). Put simply, the idea here is that you avoid getting unlucky with a bad embedding generation by creating two, and selecting the one with the higher cosine similarity with the prompt.
- You may specify a higher conditioning scale than the default (1.0). It is unclear whether OpenAI uses a higher value for the prior specifically, or only on the decoder. Local testing has shown poor results with anything higher than 1.0, but ymmv.

Training

Overview

Training the prior is a relatively straightforward process thanks to the Trainer base class. The major step that is required of you is preparing a dataset in the format that EmbeddingReader expects. Having pre-computed embeddings massively increases training efficiency and is generally recommended as you will likely benefit from having them on hand for other tasks as well. Once you have a dataset, you are ready to move onto configuration

Dataset

To train the prior, it is highly recommended to use precomputed embeddings for the images. To obtain these for a custom dataset, you can leverage img2dataset to pull images from a list of URLs and clip_retrieval for generating the actual embeddings that can be used in the prior’s dataloader.

Configuration

The configuration file allows for you to easily track and reproduce experiments. It is a simple JSON file that will specify the architecture, dataset, and training parameters. For more information and specifics please see the configuration README.

Distributed Training

If you would like to train in a distributed manner, we have opted to leverage Hugging Face’s new Accelerate library. HFA makes it extremely simple to distribute work across multiple GPUs and nodes. All that is required of you is to follow the simple CLI configuration tool (more information here).

Evaluation

There are a variety of metrics available to you when training the prior. You can read a brief description of each in the table below:

| Metric | Description | Comments |
| --- | --- | --- |
| Online Model Validation | The validation loss associated with your online model. | Ideally validation loss will be as low as possible. Using L2 loss, values as low as 0.1 and lower are possible after around 1 Billion samples seen. |
| EMA Validation | This metric measures the validation loss associated with your EMA model. | This will likely lag behind your “online” model’s validation loss, but should outperform in the long-term. |
| Baseline Similarity | Baseline similarity refers to the similarity between your dataset’s prompts and associated image embeddings. This will serve as a guide for your prior’s performance in cosine similarity. | Generally 0.3 is considered a good cosine similarity for caption similarity. |
| Similarity With Original Image | This metric will measure the cosine similarity between your prior’s predicted image embedding and the actual image that the caption was associated with. This is useful for determining whether your prior is generating images with the right contents. | Values around 0.75+ are obtainable. This metric should improve rapidly in the early stages of training and plateau with diminishing increases over time. If it takes hundreds of millions of samples to reach above 0.5/0.6 similarity, then you likely are suffering from some kind of training error or inefficiency (i.e. not using EMA). |
| Difference From Baseline Similarity | Sometimes it’s useful to visualize a metric in another light. This metric will show you how your prior’s predicted image embeddings match up with the baseline similarity measured in your dataset. | This value should float around 0.0 with some room for variation. After a billion samples seen, values are within 0.01+/- of 0.0. If this climbs too high (~>0.02), then this may be a sign that your model is overfitting somehow. |
| Similarity With Text | This metric is your bread and butter cosine similarity between the predicted image embedding and the original caption given to the prior. Monitoring this metric will be one of your main focuses and is probably the second most important behind your loss. | As mentioned, this value should be close to baseline similarity. We have observed early rapid increase with diminishing returns as the prior learns to generate valid image embeddings. If this value increases too far beyond the baseline similarity, it could be an indication that your model is overfitting. |
| Similarity With Unrelated Caption | This metric will attempt to expose an overfit prior by feeding it arbitrary prompts (from your dataset) and then measuring the similarity of this predicted embedding with some other image. | Early on we found that a poorly trained/modeled prior could effectively fool CLIP into believing that the cosine similarity between two images was high (when in fact the caption and image were completely unrelated). With this in mind, a low value is ideal; anything below 0.1 is probably safe. |

Launching the script

Now that you’ve done all the prep it’s time for the easy part! 🚀

To actually launch the script, you will either use accelerate launch train_diffusion_prior.py --config_path <path to your config> to launch with distributed training & huggingface accelerate, or python train_diffusion_prior.py if you would like to train on your gpu/cpu without huggingface accelerate.

Checkpointing

Checkpoints will be saved to the directory specified in your configuration file.

Additionally, a final checkpoint is saved before running the test split. This file will be saved to the same directory and titled “latest.pth”. This is to avoid problems where your save_every configuration does not overlap with the number of steps required to do a complete pass through the data.

Things To Keep In Mind

The prior has not been trained for tasks other than the traditional CLIP embedding translation…at least yet.

As we finalize the replication of unCLIP, there will almost assuredly be experiments attempting to apply the prior network to other tasks.

With that in mind, you are more or less a pioneer in embedding-translation if you are reading this and attempting something you don’t see documentation for!

DALL-E 2 – Pytorch

Implementation of DALL-E 2, OpenAI’s updated text-to-image synthesis neural network, in Pytorch.

Yannic Kilcher summary | AssemblyAI explainer

The main novelty seems to be an extra layer of indirection with the prior network (whether it is an autoregressive transformer or a diffusion network), which predicts an image embedding based on the text embedding from CLIP. Specifically, this repository will only build out the diffusion prior network, as it is the best performing variant (but which incidentally involves a causal transformer as the denoising network 😂)

This model is SOTA for text-to-image for now.

Please join if you are interested in helping out with the replication with the LAION community | Yannic Interview

As of 5/23/22, it is no longer SOTA. SOTA will be here. Jax versions as well as text-to-video project will be shifted towards the Imagen architecture, as it is way simpler.

Status

A research group has used the code in this repository to train a functional diffusion prior for their CLIP generations. Will share their work once they release their preprint. This, and Katherine’s own experiments, validate OpenAI’s finding that the extra prior increases variety of generations.
Decoder is now verified working for unconditional generation on my experimental setup for Oxford flowers. 2 researchers have also confirmed Decoder is working for them.

ongoing at 21k steps

Justin Pinkney successfully trained the diffusion prior in the repository for his CLIP to Stylegan2 text-to-image application
Romain has scaled up training to 800 GPUs with the available scripts without any issues

Pre-Trained Models

- LAION is training prior models. Checkpoints are available on 🤗huggingface and the training statistics are available on 🐝WANDB.
- Decoder – In-progress test run 🚧
- Decoder – Another test run with sparse attention
- DALL-E 2 🚧 – DALL-E 2 Laion repository

Appreciation

This library would not have gotten to this working state without the help of

- Zion for the distributed training code for the diffusion prior
- Aidan for the distributed training code for the decoder as well as the dataloaders
- Kumar for working on the initial diffusion training script
- Romain for the pull request reviews and project management
- He Cao and xiankgx for the Q&A and for identifying critical bugs
- Marunine for identifying issues with resizing of the low resolution conditioner, when training the upsampler, in addition to various other bug fixes
- MalumaDev for proposing the use of pixel shuffle upsampler for fixing checkerboard artifacts
- Katherine for her advice
- Stability AI for the generous sponsorship
- 🤗 Huggingface and in particular Sylvain for the Accelerate library
- Alex for einops, indispensable tool for tensor manipulation
… and many others. Thank you! 🙏

Install

$ pip install dalle2-pytorch

Usage

Training DALLE-2 is a 3-step process, with the training of CLIP being the most important.

To train CLIP, you can either use x-clip package, or join the LAION discord, where a lot of replication efforts are already underway.

This repository will demonstrate integration with x-clip for starters

import torch
from dalle2_pytorch import CLIP
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 1,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 1,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8,
use_all_token_embeds = True, # whether to use fine-grained contrastive learning (FILIP)
decoupled_contrastive_learning = True, # use decoupled contrastive learning (DCL) objective function, removing positive pairs from the denominator of the InfoNCE loss (CLOOB + DCL)
extra_latent_projection = True, # whether to use separate projections for text-to-image vs image-to-text comparisons (CLOOB)
use_visual_ssl = True, # whether to do self supervised learning on images
visual_ssl_type = \’simclr\’, # can be either \’simclr\’ or \’simsiam\’, depending on using DeCLIP or SLIP
use_mlm = False, # use masked language learning (MLM) on text (DeCLIP)
text_ssl_loss_weight = 0.05, # weight for text MLM loss
image_ssl_loss_weight = 0.05 # weight for image self-supervised learning loss
).cuda()
# mock data
text = torch.randint(0, 49408, (4, 256)).cuda()
images = torch.randn(4, 3, 256, 256).cuda()
# train
loss = clip(
text,
images,
return_loss = True # needs to be set to True to return contrastive loss
)
loss.backward()
# do the above with as many texts and images as possible in a loop

Then, you will need to train the decoder, which learns to generate images based on the image embedding coming from the trained CLIP above

import torch
from dalle2_pytorch import Unet, Decoder, CLIP
# trained clip from step 1
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 1,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 1,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
).cuda()
# unet for the decoder
unet = Unet(
dim = 128,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults=(1, 2, 4, 8)
).cuda()
# decoder, which contains the unet and clip
decoder = Decoder(
unet = unet,
clip = clip,
timesteps = 100,
image_cond_drop_prob = 0.1,
text_cond_drop_prob = 0.5
).cuda()
# mock images (get a lot of this)
images = torch.randn(4, 3, 256, 256).cuda()
# feed images into decoder
loss = decoder(images)
loss.backward()
# do the above for many many many many steps
# then it will learn to generate images based on the CLIP image embeddings

Finally, the main contribution of the paper. The repository offers the diffusion prior network. It takes the CLIP text embeddings and tries to generate the CLIP image embeddings. Again, you will need the trained CLIP from the first step

import torch
from dalle2_pytorch import DiffusionPriorNetwork, DiffusionPrior, CLIP
# get trained CLIP from step one
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8,
).cuda()
# setup prior network, which contains an autoregressive transformer
prior_network = DiffusionPriorNetwork(
dim = 512,
depth = 6,
dim_head = 64,
heads = 8
).cuda()
# diffusion prior network, which contains the CLIP and network (with transformer) above
diffusion_prior = DiffusionPrior(
net = prior_network,
clip = clip,
timesteps = 100,
cond_drop_prob = 0.2
).cuda()
# mock data
text = torch.randint(0, 49408, (4, 256)).cuda()
images = torch.randn(4, 3, 256, 256).cuda()
# feed text and images into diffusion prior network
loss = diffusion_prior(text, images)
loss.backward()
# do the above for many many many steps
# now the diffusion prior can generate image embeddings from the text embeddings

In the paper, they actually used a recently discovered technique, from Jonathan Ho himself (original author of DDPMs, the core technique used in DALL-E v2) for high resolution image synthesis.

This can easily be used within this framework as so

import torch
from dalle2_pytorch import Unet, Decoder, CLIP
# trained clip from step 1
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
).cuda()
# 2 unets for the decoder (a la cascading DDPM)
unet1 = Unet(
dim = 32,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 2, 4, 8)
).cuda()
unet2 = Unet(
dim = 32,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 2, 4, 8, 16)
).cuda()
# decoder, which contains the unet(s) and clip
decoder = Decoder(
clip = clip,
unet = (unet1, unet2), # insert both unets in order of low resolution to highest resolution (you can have as many stages as you want here)
image_sizes = (256, 512), # resolutions, 256 for first unet, 512 for second. these must be unique and in ascending order (matches with the unets passed in)
timesteps = 1000,
image_cond_drop_prob = 0.1,
text_cond_drop_prob = 0.5
).cuda()
# mock images (get a lot of this)
images = torch.randn(4, 3, 512, 512).cuda()
# feed images into decoder, specifying which unet you want to train
# each unet can be trained separately, which is one of the benefits of the cascading DDPM scheme
loss = decoder(images, unet_number = 1)
loss.backward()
loss = decoder(images, unet_number = 2)
loss.backward()
# do the above for many steps for both unets

Finally, to generate the DALL-E2 images from text. Insert the trained DiffusionPrior as well as the Decoder (which wraps CLIP, the causal transformer, and unet(s))

from dalle2_pytorch import DALLE2
dalle2 = DALLE2(
prior = diffusion_prior,
decoder = decoder
)
# send the text as a string if you want to use the simple tokenizer from DALLE v1
# or you can do it as token ids, if you have your own tokenizer
texts = [\’glistening morning dew on a flower petal\’]
images = dalle2(texts) # (1, 3, 256, 256)

That’s it!

Let’s see the whole script below

import torch
from dalle2_pytorch import DALLE2, DiffusionPriorNetwork, DiffusionPrior, Unet, Decoder, CLIP
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
).cuda()
# mock data
text = torch.randint(0, 49408, (4, 256)).cuda()
images = torch.randn(4, 3, 256, 256).cuda()
# train
loss = clip(
text,
images,
return_loss = True
)
loss.backward()
# do above for many steps …
# prior networks (with transformer)
prior_network = DiffusionPriorNetwork(
dim = 512,
depth = 6,
dim_head = 64,
heads = 8
).cuda()
diffusion_prior = DiffusionPrior(
net = prior_network,
clip = clip,
timesteps = 1000,
sample_timesteps = 64,
cond_drop_prob = 0.2
).cuda()
loss = diffusion_prior(text, images)
loss.backward()
# do above for many steps …
# decoder (with unet)
unet1 = Unet(
dim = 128,
image_embed_dim = 512,
text_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults=(1, 2, 4, 8),
cond_on_text_encodings = True # set to True for any unets that need to be conditioned on text encodings
).cuda()
unet2 = Unet(
dim = 16,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 2, 4, 8, 16)
).cuda()
decoder = Decoder(
unet = (unet1, unet2),
image_sizes = (128, 256),
clip = clip,
timesteps = 100,
image_cond_drop_prob = 0.1,
text_cond_drop_prob = 0.5
).cuda()
for unet_number in (1, 2):
loss = decoder(images, text = text, unet_number = unet_number) # this can optionally be decoder(images, text) if you wish to condition on the text encodings as well, though it was hinted in the paper it didn\’t do much
loss.backward()
# do above for many steps
dalle2 = DALLE2(
prior = diffusion_prior,
decoder = decoder
)
images = dalle2(
[\’cute puppy chasing after a squirrel\’],
cond_scale = 2. # classifier free guidance strength (> 1 would strengthen the condition)
)
# save your image (in this example, of size 256×256)

Everything in this readme should run without error

You can also train the decoder on images of greater than the size (say 512×512) at which CLIP was trained (256×256). The images will be resized to CLIP image resolution for the image embeddings

For the layperson, no worries, training will all be automated into a CLI tool, at least for small scale training.

Training on Preprocessed CLIP Embeddings

It is likely, when scaling up, that you would first preprocess your images and text into corresponding embeddings before training the prior network. You can do so easily by simply passing in image_embed, text_embed, and optionally text_encodings

Working example below

import torch
from dalle2_pytorch import DiffusionPriorNetwork, DiffusionPrior, CLIP
# get trained CLIP from step one
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8,
).cuda()
# setup prior network, which contains an autoregressive transformer
prior_network = DiffusionPriorNetwork(
dim = 512,
depth = 6,
dim_head = 64,
heads = 8
).cuda()
# diffusion prior network, which contains the CLIP and network (with transformer) above
diffusion_prior = DiffusionPrior(
net = prior_network,
clip = clip,
timesteps = 100,
cond_drop_prob = 0.2,
condition_on_text_encodings = False # this probably should be true, but just to get Laion started
).cuda()
# mock data
text = torch.randint(0, 49408, (4, 256)).cuda()
images = torch.randn(4, 3, 256, 256).cuda()
# precompute the text and image embeddings
# here using the diffusion prior class, but could be done with CLIP alone
clip_image_embeds = diffusion_prior.clip.embed_image(images).image_embed
clip_text_embeds = diffusion_prior.clip.embed_text(text).text_embed
# feed text and images into diffusion prior network
loss = diffusion_prior(
text_embed = clip_text_embeds,
image_embed = clip_image_embeds
)
loss.backward()
# do the above for many many many steps
# now the diffusion prior can generate image embeddings from the text embeddings

You can also completely go CLIP-less, in which case you will need to pass in the image_embed_dim into the DiffusionPrior on initialization

import torch
from dalle2_pytorch import DiffusionPriorNetwork, DiffusionPrior
# setup prior network, which contains an autoregressive transformer
prior_network = DiffusionPriorNetwork(
dim = 512,
depth = 6,
dim_head = 64,
heads = 8
).cuda()
# diffusion prior network, which contains the CLIP and network (with transformer) above
diffusion_prior = DiffusionPrior(
net = prior_network,
image_embed_dim = 512, # this needs to be set
timesteps = 100,
cond_drop_prob = 0.2,
condition_on_text_encodings = False # this probably should be true, but just to get Laion started
).cuda()
# mock data
text = torch.randint(0, 49408, (4, 256)).cuda()
images = torch.randn(4, 3, 256, 256).cuda()
# precompute the text and image embeddings
# here using the diffusion prior class, but could be done with CLIP alone
clip_image_embeds = torch.randn(4, 512).cuda()
clip_text_embeds = torch.randn(4, 512).cuda()
# feed text and images into diffusion prior network
loss = diffusion_prior(
text_embed = clip_text_embeds,
image_embed = clip_image_embeds
)
loss.backward()
# do the above for many many many steps
# now the diffusion prior can generate image embeddings from the text embeddings

OpenAI CLIP

Although there is the possibility they are using an unreleased, more powerful CLIP, you can use one of the released ones, if you do not wish to train your own CLIP from scratch. This will also allow the community to more quickly validate the conclusions of the paper.

To use a pretrained OpenAI CLIP, simply import OpenAIClipAdapter and pass it into the DiffusionPrior or Decoder like so

import torch
from dalle2_pytorch import DALLE2, DiffusionPriorNetwork, DiffusionPrior, Unet, Decoder, OpenAIClipAdapter
# openai pretrained clip – defaults to ViT-B/32
clip = OpenAIClipAdapter()
# mock data
text = torch.randint(0, 49408, (4, 256)).cuda()
images = torch.randn(4, 3, 256, 256).cuda()
# prior networks (with transformer)
prior_network = DiffusionPriorNetwork(
dim = 512,
depth = 6,
dim_head = 64,
heads = 8
).cuda()
diffusion_prior = DiffusionPrior(
net = prior_network,
clip = clip,
timesteps = 100,
cond_drop_prob = 0.2
).cuda()
loss = diffusion_prior(text, images)
loss.backward()
# do above for many steps …
# decoder (with unet)
unet1 = Unet(
dim = 128,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults=(1, 2, 4, 8),
text_embed_dim = 512,
cond_on_text_encodings = True # set to True for any unets that need to be conditioned on text encodings (ex. first unet in cascade)
).cuda()
unet2 = Unet(
dim = 16,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 2, 4, 8, 16)
).cuda()
decoder = Decoder(
unet = (unet1, unet2),
image_sizes = (128, 256),
clip = clip,
timesteps = 1000,
sample_timesteps = (250, 27),
image_cond_drop_prob = 0.1,
text_cond_drop_prob = 0.5
).cuda()
for unet_number in (1, 2):
loss = decoder(images, text = text, unet_number = unet_number) # this can optionally be decoder(images, text) if you wish to condition on the text encodings as well, though it was hinted in the paper it didn't do much
loss.backward()
# do above for many steps
dalle2 = DALLE2(
prior = diffusion_prior,
decoder = decoder
)
images = dalle2(
['a butterfly trying to escape a tornado'],
cond_scale = 2. # classifier free guidance strength (> 1 would strengthen the condition)
)
# save your image (in this example, of size 256×256)

Alternatively, you can also use Open Clip

$ pip install open-clip-torch

Ex. using the SOTA Open Clip model trained by Romain

from dalle2_pytorch import OpenClipAdapter
clip = OpenClipAdapter('ViT-H/14')

Now you’ll just have to worry about training the Prior and the Decoder!

Inpainting

Inpainting is also built into the Decoder. You simply have to pass in the inpaint_image and inpaint_mask (boolean tensor where True indicates which regions of the inpaint image to keep)

This repository uses the formulation put forth by Lugmayr et al. in Repaint

import torch
from dalle2_pytorch import Unet, Decoder, CLIP
# trained clip from step 1
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
).cuda()
# 2 unets for the decoder (a la cascading DDPM)
unet = Unet(
dim = 16,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 1, 1, 1)
).cuda()
# decoder, which contains the unet(s) and clip
decoder = Decoder(
clip = clip,
unet = (unet,), # insert both unets in order of low resolution to highest resolution (you can have as many stages as you want here)
image_sizes = (256,), # resolutions, 256 for first unet, 512 for second. these must be unique and in ascending order (matches with the unets passed in)
timesteps = 1000,
image_cond_drop_prob = 0.1,
text_cond_drop_prob = 0.5
).cuda()
# mock images (get a lot of this)
images = torch.randn(4, 3, 256, 256).cuda()
# feed images into decoder, specifying which unet you want to train
# each unet can be trained separately, which is one of the benefits of the cascading DDPM scheme
loss = decoder(images, unet_number = 1)
loss.backward()
# do the above for many steps for both unets
mock_image_embed = torch.randn(1, 512).cuda()
# then to do inpainting
inpaint_image = torch.randn(1, 3, 256, 256).cuda() # (batch, channels, height, width)
inpaint_mask = torch.ones(1, 256, 256).bool().cuda() # (batch, height, width)
inpainted_images = decoder.sample(
image_embed = mock_image_embed,
inpaint_image = inpaint_image, # just pass in the inpaint image
inpaint_mask = inpaint_mask # and the mask
)
inpainted_images.shape # (1, 3, 256, 256)

Experimental

DALL-E2 with Latent Diffusion

This repository decides to take the next step and offer DALL-E v2 combined with latent diffusion, from Rombach et al.

You can use it as follows. Latent diffusion can be limited to just the first U-Net in the cascade, or to any number you wish.

The repository also comes equipped with all the necessary settings to recreate ViT-VQGan from the Improved VQGans paper. Furthermore, the vector quantization library also comes equipped to do residual or multi-headed quantization, which I believe will give an even further boost in performance to the autoencoder.

import torch
from dalle2_pytorch import Unet, Decoder, CLIP, VQGanVAE
# trained clip from step 1
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 1,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 1,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
)
# 3 unets for the decoder (a la cascading DDPM)
# first two unets are doing latent diffusion
# vqgan-vae must be trained beforehand
vae1 = VQGanVAE(
dim = 32,
image_size = 256,
layers = 3,
layer_mults = (1, 2, 4)
)
vae2 = VQGanVAE(
dim = 32,
image_size = 512,
layers = 3,
layer_mults = (1, 2, 4)
)
unet1 = Unet(
dim = 32,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
sparse_attn = True,
sparse_attn_window = 2,
dim_mults = (1, 2, 4, 8)
)
unet2 = Unet(
dim = 32,
image_embed_dim = 512,
channels = 3,
dim_mults = (1, 2, 4, 8, 16),
cond_on_image_embeds = True,
cond_on_text_encodings = False
)
unet3 = Unet(
dim = 32,
image_embed_dim = 512,
channels = 3,
dim_mults = (1, 2, 4, 8, 16),
cond_on_image_embeds = True,
cond_on_text_encodings = False,
attend_at_middle = False
)
# decoder, which contains the unet(s) and clip
decoder = Decoder(
clip = clip,
vae = (vae1, vae2), # latent diffusion for unet1 (vae1) and unet2 (vae2), but not for the last unet3
unet = (unet1, unet2, unet3), # insert unets in order of low resolution to highest resolution (you can have as many stages as you want here)
image_sizes = (256, 512, 1024), # resolutions, 256 for first unet, 512 for second, 1024 for third
timesteps = 100,
image_cond_drop_prob = 0.1,
text_cond_drop_prob = 0.5
).cuda()
# mock images (get a lot of this)
images = torch.randn(1, 3, 1024, 1024).cuda()
# feed images into decoder, specifying which unet you want to train
# each unet can be trained separately, which is one of the benefits of the cascading DDPM scheme
with decoder.one_unet_in_gpu(1):
loss = decoder(images, unet_number = 1)
loss.backward()
with decoder.one_unet_in_gpu(2):
loss = decoder(images, unet_number = 2)
loss.backward()
with decoder.one_unet_in_gpu(3):
loss = decoder(images, unet_number = 3)
loss.backward()
# do the above for many steps for both unets
# then it will learn to generate images based on the CLIP image embeddings
# chaining the unets from lowest resolution to highest resolution (thus cascading)
mock_image_embed = torch.randn(1, 512).cuda()
images = decoder.sample(mock_image_embed) # (1, 3, 1024, 1024)

Training wrapper

Decoder Training

Training the Decoder may be confusing, as one needs to keep track of an optimizer for each of the Unet(s) separately. Each Unet will also need its own corresponding exponential moving average. The DecoderTrainer hopes to make this simple, as shown below

import torch
from dalle2_pytorch import DALLE2, Unet, Decoder, CLIP, DecoderTrainer
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
).cuda()
# mock data
text = torch.randint(0, 49408, (32, 256)).cuda()
images = torch.randn(32, 3, 256, 256).cuda()
# decoder (with unet)
unet1 = Unet(
dim = 128,
image_embed_dim = 512,
text_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults=(1, 2, 4, 8),
cond_on_text_encodings = True,
).cuda()
unet2 = Unet(
dim = 16,
image_embed_dim = 512,
cond_dim = 128,
channels = 3,
dim_mults = (1, 2, 4, 8, 16),
).cuda()
decoder = Decoder(
unet = (unet1, unet2),
image_sizes = (128, 256),
clip = clip,
timesteps = 1000
).cuda()
decoder_trainer = DecoderTrainer(
decoder,
lr = 3e-4,
wd = 1e-2,
ema_beta = 0.99,
ema_update_after_step = 1000,
ema_update_every = 10,
)
for unet_number in (1, 2):
loss = decoder_trainer(
images,
text = text,
unet_number = unet_number, # which unet to train on
max_batch_size = 4 # gradient accumulation – this sets the maximum batch size in which to do forward and backwards pass – for this example 32 / 4 == 8 times
)
decoder_trainer.update(unet_number) # update the specific unet as well as its exponential moving average
# after much training
# you can sample from the exponentially moving averaged unets as so
mock_image_embed = torch.randn(32, 512).cuda()
images = decoder_trainer.sample(image_embed = mock_image_embed, text = text) # (32, 3, 256, 256)

Diffusion Prior Training

Similarly, one can use the DiffusionPriorTrainer to automatically instantiate and keep track of an exponential moving averaged prior.

import torch
from dalle2_pytorch import DALLE2, DiffusionPriorNetwork, DiffusionPrior, DiffusionPriorTrainer, Unet, Decoder, CLIP
clip = CLIP(
dim_text = 512,
dim_image = 512,
dim_latent = 512,
num_text_tokens = 49408,
text_enc_depth = 6,
text_seq_len = 256,
text_heads = 8,
visual_enc_depth = 6,
visual_image_size = 256,
visual_patch_size = 32,
visual_heads = 8
).cuda()
# mock data
text = torch.randint(0, 49408, (512, 256)).cuda()
images = torch.randn(512, 3, 256, 256).cuda()
# prior networks (with transformer)
prior_network = DiffusionPriorNetwork(
dim = 512,
depth = 6,
dim_head = 64,
heads = 8
).cuda()
diffusion_prior = DiffusionPrior(
net = prior_network,
clip = clip,
timesteps = 100,
cond_drop_prob = 0.2
).cuda()
diffusion_prior_trainer = DiffusionPriorTrainer(
diffusion_prior,
lr = 3e-4,
wd = 1e-2,
ema_beta = 0.99,
ema_update_after_step = 1000,
ema_update_every = 10,
)
loss = diffusion_prior_trainer(text, images, max_batch_size = 4)
diffusion_prior_trainer.update() # this will update the optimizer as well as the exponential moving averaged diffusion prior
# after much of the above three lines in a loop
# you can sample from the exponential moving average of the diffusion prior identically to how you do so for DiffusionPrior
image_embeds = diffusion_prior_trainer.sample(text, max_batch_size = 4) # (512, 512) – exponential moving averaged image embeddings

Bonus

Unconditional Training

The repository also contains the means to train an unconditional DDPM model, or even cascading DDPMs. You simply have to set unconditional = True in the Decoder

ex.

import torch
from dalle2_pytorch import Unet, Decoder, DecoderTrainer
# unet for the cascading ddpm
unet1 = Unet(
dim = 128,
dim_mults=(1, 2, 4, 8)
).cuda()
unet2 = Unet(
dim = 32,
dim_mults = (1, 2, 4, 8, 16)
).cuda()
# decoder, which contains the unets
decoder = Decoder(
unet = (unet1, unet2),
image_sizes = (256, 512), # first unet up to 256px, then second to 512px
timesteps = 1000,
unconditional = True
).cuda()
# decoder trainer
decoder_trainer = DecoderTrainer(decoder)
# images (get a lot of this)
images = torch.randn(1, 3, 512, 512).cuda()
# feed images into decoder
for i in (1, 2):
loss = decoder_trainer(images, unet_number = i)
decoder_trainer.update(unet_number = i)
# do the above for many many many many images
# then it will learn to generate images
images = decoder_trainer.sample(batch_size = 36, max_batch_size = 4) # (36, 3, 512, 512)

Dataloaders

Decoder Dataloaders

In order to make loading data simple and efficient, we include some general dataloaders that can be used to train portions of the network.

Decoder: Image Embedding Dataset

When training the decoder (and up samplers if training together) in isolation, you will need to load images and corresponding image embeddings. This dataset can read two similar types of datasets. First, it can read a webdataset that contains .jpg and .npy files in the .tars that contain the images and associated image embeddings respectively. Alternatively, you can also specify a source for the embeddings outside of the webdataset. In this case, the path to the embeddings should contain .npy files with the same shard numbers as the webdataset and there should be a correspondence between the filename of the .jpg and the index of the embedding in the .npy. So, for example, 0001.tar from the webdataset with image 00010509.jpg (the first 4 digits are the shard number and the last 4 are the index) in it should be paralleled by a img_emb_0001.npy which contains a NumPy array with the embedding at index 509.

Generating a dataset of this type:

1. Use img2dataset to generate a webdataset.
2. Use clip-retrieval to convert the images to embeddings.
3. Use embedding-dataset-reordering to reorder the embeddings into the expected format.
Usage:

from dalle2_pytorch.dataloaders import ImageEmbeddingDataset, create_image_embedding_dataloader
# Create a dataloader directly.
dataloader = create_image_embedding_dataloader(
tar_url="/path/or/url/to/webdataset/{0000..9999}.tar", # Uses bracket expanding notation. This specifies to read all tars from 0000.tar to 9999.tar
embeddings_url="path/or/url/to/embeddings/folder", # Included if .npy files are not in webdataset. Left out or set to None otherwise
num_workers=4,
batch_size=32,
shard_width=4, # If a file in the webdataset shard 3 is named 0003039.jpg, we know the shard width is 4 and the last three digits are the index
shuffle_num=200, # Does a shuffle of the data with a buffer size of 200
shuffle_shards=True, # Shuffle the order the shards are read in
resample_shards=False, # Sample shards with replacement. If true, an epoch will be infinite unless stopped manually
)
for img, emb in dataloader:
print(img.shape) # torch.Size([32, 3, 256, 256])
print(emb["img"].shape) # torch.Size([32, 512])
# Train decoder only as shown above
# Or create a dataset without a loader so you can configure it manually
dataset = ImageEmbeddingDataset(
urls="/path/or/url/to/webdataset/{0000..9999}.tar",
embedding_folder_url="path/or/url/to/embeddings/folder",
shard_width=4,
shuffle_shards=True,
resample=False
)

Scripts

train_diffusion_prior.py

For detailed information on training the diffusion prior, please refer to the dedicated readme

Todo

- finish off gaussian diffusion class for latent embedding – allow for prediction of epsilon
- add what was proposed in the paper, where DDPM objective for image latent embedding predicts x0 directly (reread vq-diffusion paper and get caught up on that line of work)
- make sure it works end to end to produce an output tensor, taking a single gradient step
- augment unet so that it can also be conditioned on text encodings (although in paper they hinted this didn’t make much a difference)
- figure out all the current bag of tricks needed to make DDPMs great (starting with the blur trick mentioned in paper)
- build the cascading ddpm by having Decoder class manage multiple unets at different resolutions
- add efficient attention in unet
- be able to finely customize what to condition on (text, image embed) for specific unet in the cascade (super resolution ddpms near the end may not need too much conditioning)
- offload unets not being trained on to CPU for memory efficiency (for training each resolution unets separately)
- build out latent diffusion architecture, with the vq-reg variant (vqgan-vae), make it completely optional and compatible with cascading ddpms
- for decoder, allow ability to customize objective (predict epsilon vs x0), in case latent diffusion does better with prediction of x0
- use attention-based upsampling https://arxiv.org/abs/2112.11435
- use inheritance just this once for sharing logic between decoder and prior network ddpms
- bring in vit-vqgan https://arxiv.org/abs/2110.04627 for the latent diffusion
- abstract interface for CLIP adapter class, so other CLIPs can be brought in
- take care of mixed precision as well as gradient accumulation within decoder trainer
- just take care of the training for the decoder in a wrapper class, as each unet in the cascade will need its own optimizer
- bring in tools to train vqgan-vae
- add convnext backbone for vqgan-vae (in addition to vit [vit-vqgan] + resnet)
- make sure DDPMs can be run with traditional resnet blocks (but leave convnext as an option for experimentation)
- make sure for the latter unets in the cascade, one can train on crops for learning super resolution (constrain the unet to be only convolutions in that case, or allow conv-like attention with rel pos bias)
- offer setting in diffusion prior to split time and image embeddings into multiple tokens, configurable, for more surface area during attention
- make sure resnet hyperparameters can be configurable across unet depth (groups and expansion factor)
- pull logic for training diffusion prior into a class DiffusionPriorTrainer, for eventual script based + CLI based training
- make sure the cascading ddpm in the repository can be trained unconditionally, offer a one-line CLI tool for training on a folder of images
- bring in cross-scale embedding from iclr paper https://github.com/lucidrains/vit-pytorch/blob/main/vit_pytorch/crossformer.py#L14
- cross embed layers for downsampling, as an option
- use an experimental tracker agnostic setup, as done here
- use pydantic for config drive training
- for both diffusion prior and decoder, all exponential moving averaged models needs to be saved and restored as well (as well as the step number)
- offer save / load methods on the trainer classes to automatically take care of state dicts for scalers / optimizers / saving versions and checking for breaking changes
- allow for creation of diffusion prior model off pydantic config classes – consider the same for tracker configs
- bring in skip-layer excitations (from lightweight gan paper) to see if it helps for either decoder of unet or vqgan-vae training (doesnt work well)
- test out grid attention in cascading ddpm locally, decide whether to keep or remove https://arxiv.org/abs/2204.01697 (keeping, seems to be fine)
- allow for unet to be able to condition non-cross attention style as well
- speed up inference, read up on papers (ddim)
- add inpainting ability using resampler from repaint paper https://arxiv.org/abs/2201.09865
- add the final combination of upsample feature maps, used in unet squared, seems to have an effect in local experiments
- consider elucidated dalle2 https://arxiv.org/abs/2206.00364
- add simple outpainting, text-guided 2x size the image for starters
- interface out the vqgan-vae so a pretrained one can be pulled off the shelf to validate latent diffusion + DALL-E2

Citations

@misc{ramesh2022,
title = {Hierarchical Text-Conditional Image Generation with CLIP Latents},
author = {Aditya Ramesh et al},
year = {2022}
}

@misc{crowson2022,
author = {Katherine Crowson},
url = {https://twitter.com/rivershavewings}
}

@misc{rombach2021highresolution,
title = {High-Resolution Image Synthesis with Latent Diffusion Models},
author = {Robin Rombach and Andreas Blattmann and Dominik Lorenz and Patrick Esser and Björn Ommer},
year = {2021},
eprint = {2112.10752},
archivePrefix = {arXiv},
primaryClass = {cs.CV}
}

@article{shen2019efficient,
author = {Zhuoran Shen and Mingyuan Zhang and Haiyu Zhao and Shuai Yi and Hongsheng Li},
title = {Efficient Attention: Attention with Linear Complexities},
journal = {CoRR},
year = {2018},
url = {http://arxiv.org/abs/1812.01243},
}

@article{Yu2021VectorquantizedIM,
title = {Vector-quantized Image Modeling with Improved VQGAN},
author = {Jiahui Yu and Xin Li and Jing Yu Koh and Han Zhang and Ruoming Pang and James Qin and Alexander Ku and Yuanzhong Xu and Jason Baldridge and Yonghui Wu},
journal = {ArXiv},
year = {2021},
volume = {abs/2110.04627}
}

@article{Shleifer2021NormFormerIT,
title = {NormFormer: Improved Transformer Pretraining with Extra Normalization},
author = {Sam Shleifer and Jason Weston and Myle Ott},
journal = {ArXiv},
year = {2021},
volume = {abs/2110.09456}
}

@article{Yu2022CoCaCC,
title = {CoCa: Contrastive Captioners are Image-Text Foundation Models},
author = {Jiahui Yu and Zirui Wang and Vijay Vasudevan and Legg Yeung and Mojtaba Seyedhosseini and Yonghui Wu},
journal = {ArXiv},
year = {2022},
volume = {abs/2205.01917}
}

@misc{wang2021crossformer,
title = {CrossFormer: A Versatile Vision Transformer Hinging on Cross-scale Attention},
author = {Wenxiao Wang and Lu Yao and Long Chen and Binbin Lin and Deng Cai and Xiaofei He and Wei Liu},
year = {2021},
eprint = {2108.00154},
archivePrefix = {arXiv},
primaryClass = {cs.CV}
}

@article{ho2021cascaded,
title = {Cascaded Diffusion Models for High Fidelity Image Generation},
author = {Ho, Jonathan and Saharia, Chitwan and Chan, William and Fleet, David J and Norouzi, Mohammad and Salimans, Tim},
journal = {arXiv preprint arXiv:2106.15282},
year = {2021}
}

@misc{Saharia2022,
title = {Imagen: unprecedented photorealism × deep level of language understanding},
author = {Chitwan Saharia*, William Chan*, Saurabh Saxena†, Lala Li†, Jay Whang†, Emily Denton, Seyed Kamyar Seyed Ghasemipour, Burcu Karagol Ayan, S. Sara Mahdavi, Rapha Gontijo Lopes, Tim Salimans, Jonathan Ho†, David Fleet†, Mohammad Norouzi*},
year = {2022}
}

@article{Choi2022PerceptionPT,
title = {Perception Prioritized Training of Diffusion Models},
author = {Jooyoung Choi and Jungbeom Lee and Chaehun Shin and Sungwon Kim and Hyunwoo J. Kim and Sung-Hoon Yoon},
journal = {ArXiv},
year = {2022},
volume = {abs/2204.00227}
}

@article{Saharia2021PaletteID,
title = {Palette: Image-to-Image Diffusion Models},
author = {Chitwan Saharia and William Chan and Huiwen Chang and Chris A. Lee and Jonathan Ho and Tim Salimans and David J. Fleet and Mohammad Norouzi},
journal = {ArXiv},
year = {2021},
volume = {abs/2111.05826}
}

@article{Lugmayr2022RePaintIU,
title = {RePaint: Inpainting using Denoising Diffusion Probabilistic Models},
author = {Andreas Lugmayr and Martin Danelljan and Andr{\'e}s Romero and Fisher Yu and Radu Timofte and Luc Van Gool},
journal = {ArXiv},
year = {2022},
volume = {abs/2201.09865}
}

@misc{chen2022analog,
title = {Analog Bits: Generating Discrete Data using Diffusion Models with Self-Conditioning},
author = {Ting Chen and Ruixiang Zhang and Geoffrey Hinton},
year = {2022},
eprint = {2208.04202},
archivePrefix = {arXiv},
primaryClass = {cs.CV}
}

@article{Qiao2019WeightS,
title = {Weight Standardization},
author = {Siyuan Qiao and Huiyu Wang and Chenxi Liu and Wei Shen and Alan Loddon Yuille},
journal = {ArXiv},
year = {2019},
volume = {abs/1903.10520}
}

@inproceedings{rogozhnikov2022einops,
title = {Einops: Clear and Reliable Tensor Manipulations with Einstein-like Notation},
author = {Alex Rogozhnikov},
booktitle = {International Conference on Learning Representations},
year = {2022},
url = {https://openreview.net/forum?id=oapKSVM2bcj}
}

@article{Sunkara2022NoMS,
title = {No More Strided Convolutions or Pooling: A New CNN Building Block for Low-Resolution Images and Small Objects},
author = {Raja Sunkara and Tie Luo},
journal = {ArXiv},
year = {2022},
volume = {abs/2208.03641}
}

@article{Salimans2022ProgressiveDF,
title = {Progressive Distillation for Fast Sampling of Diffusion Models},
author = {Tim Salimans and Jonathan Ho},
journal = {ArXiv},
year = {2022},
volume = {abs/2202.00512}
}

Creating noise from data is easy; creating data from noise is generative modeling. – Yang Song’s paper

#以上关于Lucidrains 系列项目源码解析(二十)的相关内容来源网络仅供参考,相关信息请以官方公告为准!

原创文章,作者:CSDN,如若转载,请注明出处:https://www.sudun.com/ask/93045.html

(0)
CSDN's avatarCSDN
上一篇 2024年7月5日 上午6:28
下一篇 2024年7月5日 上午6:46

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注