|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import math |
|
from typing import Optional |
|
|
|
import torch |
|
from torch import Tensor, nn as nn |
|
from torch.nn import functional as F |
|
from torch.nn.modules import transformer |
|
|
|
from timm.models.vision_transformer import PatchEmbed, VisionTransformer |
|
|
|
|
|
class DecoderLayer(nn.Module):
    """Pre-LN Transformer decoder layer with two-stream (XLNet-style) attention.

    Unlike PyTorch's default post-LN decoder layer, LayerNorm is applied to the
    inputs of each sub-block (self-attention, cross-attention, feed-forward) and
    the sub-block output is added back to the un-normalized residual.

    Two streams share the same weights: a *query* stream and a *content* stream.
    Both attend over the normalized content as keys/values and over ``memory``.
    All attention modules are built with ``batch_first=True``.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation='gelu', layer_norm_eps=1e-5):
        super().__init__()

        def make_attention():
            return nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)

        def make_norm():
            return nn.LayerNorm(d_model, eps=layer_norm_eps)

        # Submodules are registered in a fixed order so that parameter
        # initialization and state_dict key order stay stable.

        # Attention blocks: self-attention over the target, cross-attention over memory.
        self.self_attn = make_attention()
        self.cross_attn = make_attention()

        # Position-wise feed-forward network.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # norm1 / norm2 precede cross-attention / feed-forward;
        # norm_q / norm_c normalize the query / content streams before self-attention.
        self.norm1 = make_norm()
        self.norm2 = make_norm()
        self.norm_q = make_norm()
        self.norm_c = make_norm()

        # One dropout per residual branch.
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        self.activation = transformer._get_activation_fn(activation)

    def __setstate__(self, state):
        # States saved without an 'activation' entry fall back to GELU.
        state.setdefault('activation', F.gelu)
        super().__setstate__(state)

    def forward_stream(
        self,
        tgt: Tensor,
        tgt_norm: Tensor,
        tgt_kv: Tensor,
        memory: Tensor,
        tgt_mask: Optional[Tensor],
        tgt_key_padding_mask: Optional[Tensor],
    ):
        """Run one stream (query or content) through the layer.

        Args:
            tgt: un-normalized stream input, used as the residual.
            tgt_norm: LayerNorm'd ``tgt``; passed in separately so the caller
                can reuse an already computed normalization.
            tgt_kv: keys/values for self-attention; expected to be LayerNorm'd.
            memory: keys/values for cross-attention; expected to be LayerNorm'd
                already (by the ViT encoder).
            tgt_mask: attention mask forwarded to self-attention.
            tgt_key_padding_mask: key padding mask forwarded to self-attention.

        Returns:
            Tuple ``(tgt, sa_weights, ca_weights)``: the updated stream plus the
            self-attention and cross-attention weights.
        """
        # Self-attention: normalized stream as query, tgt_kv as key and value.
        self_attended, sa_weights = self.self_attn(
            tgt_norm,
            tgt_kv,
            tgt_kv,
            attn_mask=tgt_mask,
            key_padding_mask=tgt_key_padding_mask,
        )
        tgt = tgt + self.dropout1(self_attended)

        # Cross-attention over memory (pre-LN on the query side only).
        cross_query = self.norm1(tgt)
        cross_attended, ca_weights = self.cross_attn(cross_query, memory, memory)
        tgt = tgt + self.dropout2(cross_attended)

        # Feed-forward block.
        hidden = self.activation(self.linear1(self.norm2(tgt)))
        projected = self.linear2(self.dropout(hidden))
        tgt = tgt + self.dropout3(projected)

        return tgt, sa_weights, ca_weights

    def forward(
        self,
        query,
        content,
        memory,
        query_mask: Optional[Tensor] = None,
        content_mask: Optional[Tensor] = None,
        content_key_padding_mask: Optional[Tensor] = None,
        update_content: bool = True,
    ):
        """Update the query stream and, optionally, the content stream.

        Both streams use the normalized *content* as self-attention keys/values
        and share ``content_key_padding_mask``; they differ in their own input
        and attention mask. When ``update_content`` is False the content tensor
        is returned untouched.

        Returns:
            Tuple ``(query, content)``.
        """
        query_norm = self.norm_q(query)
        content_norm = self.norm_c(content)

        # The query stream is always evaluated first.
        query, _, _ = self.forward_stream(
            query, query_norm, content_norm, memory, query_mask, content_key_padding_mask
        )
        if not update_content:
            return query, content

        content, _, _ = self.forward_stream(
            content, content_norm, content_norm, memory, content_mask, content_key_padding_mask
        )
        return query, content
|
|
|
|
|
class Decoder(nn.Module):
    """Stack of cloned two-stream decoder layers followed by a final norm.

    ``decoder_layer`` is cloned ``num_layers`` times. Every layer updates the
    query stream; the content stream is updated by all layers except the last
    one, since only the query stream is returned.
    """

    __constants__ = ['norm']

    def __init__(self, decoder_layer, num_layers, norm):
        super().__init__()
        self.layers = transformer._get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(
        self,
        query,
        content,
        memory,
        query_mask: Optional[Tensor] = None,
        content_mask: Optional[Tensor] = None,
        content_key_padding_mask: Optional[Tensor] = None,
    ):
        """Run both streams through every layer and return the normalized query stream."""
        final_index = len(self.layers) - 1
        for index, layer in enumerate(self.layers):
            # The last layer skips the content update: its result would be discarded.
            needs_content = index != final_index
            query, content = layer(
                query,
                content,
                memory,
                query_mask,
                content_mask,
                content_key_padding_mask,
                update_content=needs_content,
            )
        return self.norm(query)
|
|
|
|
|
class Encoder(VisionTransformer):
    """Image encoder built on timm's ``VisionTransformer``.

    The parent is always constructed with ``num_classes=0``, ``global_pool=''``
    and ``class_token=False``; the remaining options are forwarded unchanged.
    """

    def __init__(
        self,
        img_size=224,
        patch_size=16,
        in_chans=3,
        embed_dim=768,
        depth=12,
        num_heads=12,
        mlp_ratio=4.0,
        qkv_bias=True,
        drop_rate=0.0,
        attn_drop_rate=0.0,
        drop_path_rate=0.0,
        embed_layer=PatchEmbed,
    ):
        # Options forwarded as-is from the caller.
        vit_options = dict(
            embed_dim=embed_dim,
            depth=depth,
            num_heads=num_heads,
            mlp_ratio=mlp_ratio,
            qkv_bias=qkv_bias,
            drop_rate=drop_rate,
            attn_drop_rate=attn_drop_rate,
            drop_path_rate=drop_path_rate,
            embed_layer=embed_layer,
        )
        # Fixed settings: no classifier head, no pooling, no class token.
        fixed_options = dict(
            num_classes=0,
            global_pool='',
            class_token=False,
        )
        super().__init__(img_size, patch_size, in_chans, **vit_options, **fixed_options)

    def forward(self, x):
        """Return ``forward_features(x)`` directly, bypassing the parent's head.

        NOTE(review): presumably this yields the full token sequence rather than
        a pooled vector — confirm against the installed timm version.
        """
        features = self.forward_features(x)
        return features
|
|
|
|
|
class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is scaled by ``sqrt(embed_dim)``."""

    def __init__(self, charset_size: int, embed_dim: int):
        super().__init__()
        self.embedding = nn.Embedding(charset_size, embed_dim)
        self.embed_dim = embed_dim

    def forward(self, tokens: torch.Tensor):
        """Look up ``tokens`` and multiply the embeddings by ``sqrt(embed_dim)``."""
        # The scale is read from embed_dim on every call rather than cached.
        scale = math.sqrt(self.embed_dim)
        embedded = self.embedding(tokens)
        return scale * embedded
|
|