import os import numpy as np import torch import torch.nn as nn from timm.data import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD from timm.models.layers import DropPath, trunc_normal_ from timm.models.registry import register_model from timm.models.layers import to_2tuple import math from torch import Tensor from torch.nn import init from torch.nn.modules.utils import _pair from torchvision.ops.deform_conv import deform_conv2d as deform_conv2d_tv def _cfg(url='', **kwargs): return { 'url': url, 'num_classes': 1000, 'input_size': (3, 224, 224), 'pool_size': None, 'crop_pct': .96, 'interpolation': 'bicubic', 'mean': IMAGENET_DEFAULT_MEAN, 'std': IMAGENET_DEFAULT_STD, 'classifier': 'head', **kwargs } default_cfgs = { 'my_S': _cfg(crop_pct=0.9), 'my_M': _cfg(crop_pct=0.9), 'my_L': _cfg(crop_pct=0.875), } class Mlp(nn.Module): def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.): super().__init__() out_features = out_features or in_features hidden_features = hidden_features or in_features self.fc1 = nn.Linear(in_features, hidden_features) self.act = act_layer() self.fc2 = nn.Linear(hidden_features, out_features) self.drop = nn.Dropout(drop) def forward(self, x): x = self.fc1(x) x = self.act(x) x = self.drop(x) x = self.fc2(x) x = self.drop(x) return x class MyFC(nn.Module): """ """ def __init__( self, in_channels: int, out_channels: int, kernel_size, # re-defined kernel_size, represent the spatial area of staircase FC stride: int = 1, padding: int = 0, dilation: int = 1, groups: int = 1, conjugate: bool = False, bias: bool = True, ): super(MyFC, self).__init__() if in_channels % groups != 0: raise ValueError('in_channels must be divisible by groups') if out_channels % groups != 0: raise ValueError('out_channels must be divisible by groups') if stride != 1: raise ValueError('stride must be 1') if padding != 0: raise ValueError('padding must be 0') self.in_channels = in_channels self.out_channels = out_channels self.kernel_size = kernel_size self.stride = _pair(stride) self.padding = _pair(padding) self.dilation = _pair(dilation) self.groups = groups self.conjugate = conjugate self.weight = nn.Parameter(torch.empty(out_channels, in_channels // groups, 1, 1)) # kernel size == 1 if bias: self.bias = nn.Parameter(torch.empty(out_channels)) else: self.register_parameter('bias', None) self.register_buffer('offset', self.gen_offset()) self.reset_parameters() def reset_parameters(self) -> None: init.kaiming_uniform_(self.weight, a=math.sqrt(5)) if self.bias is not None: fan_in, _ = init._calculate_fan_in_and_fan_out(self.weight) bound = 1 / math.sqrt(fan_in) init.uniform_(self.bias, -bound, bound) def gen_offset(self): """ offset (Tensor[batch_size, 2 * offset_groups * kernel_height * kernel_width, out_height, out_width]): offsets to be applied for each position in the convolution kernel. """ # offset shape: (1, self.in_channels*2, 1, 1) k = self.kernel_size # Note: Unary - takes precedence over binary // if self.conjugate: return torch.Tensor([[i, j] for j in range(-(k//2), k//2+1) for i in range(-(k//2),k//2+1)][::-1] \ * math.ceil(self.in_channels / k**2)).view(1, -1, 1, 1)[:, :2 * self.in_channels] return torch.Tensor([[i, j] for j in range(-(k//2), k//2+1) for i in range(-(k//2),k//2+1)] \ * math.ceil(self.in_channels / k**2)).view(1, -1, 1, 1)[:, :2 * self.in_channels] def forward(self, input: Tensor) -> Tensor: """ Args: input (Tensor[batch_size, in_channels, in_height, in_width]): input tensor """ B, C, H, W = input.size() return deform_conv2d_tv(input, self.offset.expand(B, -1, H, W), self.weight, self.bias, stride=self.stride, padding=self.padding, dilation=self.dilation) def extra_repr(self) -> str: s = self.__class__.__name__ + '(' s += '{in_channels}' s += ', {out_channels}' s += ', kernel_size={kernel_size}' s += ', stride={stride}' s += ', padding={padding}' if self.padding != (0, 0) else '' s += ', dilation={dilation}' if self.dilation != (1, 1) else '' s += ', groups={groups}' if self.groups != 1 else '' s += ', bias=False' if self.bias is None else '' s += ')' return s.format(**self.__dict__) class MyMLP(nn.Module): def __init__(self, dim, qkv_bias=False, qk_scale=None, attn_drop=0., proj_drop=0.): super().__init__() self.mlp_c = nn.Linear(dim, dim, bias=qkv_bias) self.sfc1 = MyFC(dim, dim, 3, conjugate=True) self.sfc2 = MyFC(dim, dim, 3, conjugate=False) self.reweight = Mlp(dim, dim // 4, dim * 3) self.proj = nn.Linear(dim, dim) self.proj_drop = nn.Dropout(proj_drop) def forward(self, x): B, H, W, C = x.shape h = self.sfc1(x.permute(0, 3, 1, 2)).permute(0, 2, 3, 1) w = self.sfc2(x.permute(0, 3, 1, 2)).permute(0, 2, 3, 1) c = self.mlp_c(x) a = (h + w + c).permute(0, 3, 1, 2).flatten(2).mean(2) a = self.reweight(a).reshape(B, C, 3).permute(2, 0, 1).softmax(dim=0).unsqueeze(2).unsqueeze(2) x = h * a[0] + w * a[1] + c * a[2] x = self.proj(x) x = self.proj_drop(x) return x class MyBlock(nn.Module): def __init__(self, dim, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop=0., attn_drop=0., drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm, skip_lam=1.0, mlp_fn=MyMLP): super().__init__() self.norm1 = norm_layer(dim) self.attn = mlp_fn(dim, qkv_bias=qkv_bias, qk_scale=None, attn_drop=attn_drop) # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity() self.norm2 = norm_layer(dim) mlp_hidden_dim = int(dim * mlp_ratio) self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer) self.skip_lam = skip_lam def forward(self, x): x = x + self.drop_path(self.attn(self.norm1(x))) / self.skip_lam x = x + self.drop_path(self.mlp(self.norm2(x))) / self.skip_lam return x class PatchEmbedOverlapping(nn.Module): """ 2D Image to Patch Embedding with overlapping """ def __init__(self, patch_size=16, stride=16, padding=0, in_chans=3, embed_dim=768, norm_layer=None, groups=1): super().__init__() patch_size = to_2tuple(patch_size) stride = to_2tuple(stride) padding = to_2tuple(padding) self.patch_size = patch_size # remove image_size in model init to support dynamic image size self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=stride, padding=padding, groups=groups) self.norm = norm_layer(embed_dim) if norm_layer else nn.Identity() def forward(self, x): x = self.proj(x) return x class Downsample(nn.Module): """ Downsample transition stage """ def __init__(self, in_embed_dim, out_embed_dim, patch_size): super().__init__() assert patch_size == 2, patch_size self.proj = nn.Conv2d(in_embed_dim, out_embed_dim, kernel_size=(3, 3), stride=(2, 2), padding=1) def forward(self, x): x = x.permute(0, 3, 1, 2) x = self.proj(x) # B, C, H, W x = x.permute(0, 2, 3, 1) return x def basic_blocks(dim, index, layers, mlp_ratio=3., qkv_bias=False, qk_scale=None, attn_drop=0., drop_path_rate=0., skip_lam=1.0, mlp_fn=MyMLP, **kwargs): blocks = [] for block_idx in range(layers[index]): block_dpr = drop_path_rate * (block_idx + sum(layers[:index])) / (sum(layers) - 1) blocks.append(MyBlock(dim, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, drop_path=block_dpr, skip_lam=skip_lam, mlp_fn=mlp_fn)) blocks = nn.Sequential(*blocks) return blocks class MyNet(nn.Module): """ MyMLP Network """ def __init__(self, layers, img_size=224, patch_size=4, in_chans=3, num_classes=1000, embed_dims=None, transitions=None, segment_dim=None, mlp_ratios=None, skip_lam=1.0, qkv_bias=False, qk_scale=None, drop_rate=0., attn_drop_rate=0., drop_path_rate=0., norm_layer=nn.LayerNorm, mlp_fn=MyMLP, fork_feat=False, **kwargs): super().__init__() if not fork_feat: self.num_classes = num_classes self.fork_feat = fork_feat self.patch_embed = PatchEmbedOverlapping(patch_size=7, stride=4, padding=2, in_chans=3, embed_dim=embed_dims[0]) network = [] for i in range(len(layers)): stage = basic_blocks(embed_dims[i], i, layers, mlp_ratio=mlp_ratios[i], qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop_rate, drop_path_rate=drop_path_rate, norm_layer=norm_layer, skip_lam=skip_lam, mlp_fn=mlp_fn) network.append(stage) if i >= len(layers) - 1: break if transitions[i] or embed_dims[i] != embed_dims[i+1]: patch_size = 2 if transitions[i] else 1 network.append(Downsample(embed_dims[i], embed_dims[i+1], patch_size)) self.network = nn.ModuleList(network) if self.fork_feat: # add a norm layer for each output self.out_indices = [0, 2, 4, 6] for i_emb, i_layer in enumerate(self.out_indices): if i_emb == 0 and os.environ.get('FORK_LAST3', None): # TODO: more elegant way """For RetinaNet, `start_level=1`. The first norm layer will not used. cmd: `FORK_LAST3=1 python -m torch.distributed.launch ...` """ layer = nn.Identity() else: layer = norm_layer(embed_dims[i_emb]) layer_name = f'norm{i_layer}' self.add_module(layer_name, layer) else: # Classifier head self.norm = norm_layer(embed_dims[-1]) self.head = nn.Linear(embed_dims[-1], num_classes) if num_classes > 0 else nn.Identity() self.apply(self.cls_init_weights) def cls_init_weights(self, m): if isinstance(m, nn.Linear): trunc_normal_(m.weight, std=.02) if isinstance(m, nn.Linear) and m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.LayerNorm): nn.init.constant_(m.bias, 0) nn.init.constant_(m.weight, 1.0) elif isinstance(m, MyFC): trunc_normal_(m.weight, std=.02) nn.init.constant_(m.bias, 0) def get_classifier(self): return self.head def reset_classifier(self, num_classes, global_pool=''): self.num_classes = num_classes self.head = nn.Linear(self.embed_dim, num_classes) if num_classes > 0 else nn.Identity() def forward_embeddings(self, x): x = self.patch_embed(x) # B,C,H,W-> B,H,W,C x = x.permute(0, 2, 3, 1) return x def forward_tokens(self, x): outs = [] for idx, block in enumerate(self.network): x = block(x) if self.fork_feat and idx in self.out_indices: norm_layer = getattr(self, f'norm{idx}') x_out = norm_layer(x) outs.append(x_out.permute(0, 3, 1, 2).contiguous()) if self.fork_feat: return outs B, H, W, C = x.shape x = x.reshape(B, -1, C) return x def forward(self, x): x = self.forward_embeddings(x) # B, H, W, C -> B, N, C x = self.forward_tokens(x) if self.fork_feat: return x x = self.norm(x) cls_out = self.head(x.mean(1)) return cls_out @register_model def MyMLP_B1(pretrained=False, **kwargs): transitions = [True, True, True, True] layers = [2, 2, 4, 2] mlp_ratios = [4, 4, 4, 4] embed_dims = [64, 128, 320, 512] model = MyNet(layers, embed_dims=embed_dims, patch_size=7, transitions=transitions, mlp_ratios=mlp_ratios, mlp_fn=MyMLP, **kwargs) model.default_cfg = default_cfgs['my_S'] return model @register_model def MyMLP_B2(pretrained=False, **kwargs): transitions = [True, True, True, True] layers = [2, 3, 10, 3] mlp_ratios = [4, 4, 4, 4] embed_dims = [64, 128, 320, 512] model = MyNet(layers, embed_dims=embed_dims, patch_size=7, transitions=transitions, mlp_ratios=mlp_ratios, mlp_fn=MyMLP, **kwargs) model.default_cfg = default_cfgs['my_S'] return model @register_model def MyMLP_B3(pretrained=False, **kwargs): transitions = [True, True, True, True] layers = [3, 4, 18, 3] mlp_ratios = [8, 8, 4, 4] embed_dims = [64, 128, 320, 512] model = MyNet(layers, embed_dims=embed_dims, patch_size=7, transitions=transitions, mlp_ratios=mlp_ratios, mlp_fn=MyMLP, **kwargs) model.default_cfg = default_cfgs['my_M'] return model @register_model def MyMLP_B4(pretrained=False, **kwargs): transitions = [True, True, True, True] layers = [3, 8, 27, 3] mlp_ratios = [8, 8, 4, 4] embed_dims = [64, 128, 320, 512] model = MyNet(layers, embed_dims=embed_dims, patch_size=7, transitions=transitions, mlp_ratios=mlp_ratios, mlp_fn=MyMLP, **kwargs) model.default_cfg = default_cfgs['my_L'] return model @register_model def MyMLP_B5(pretrained=False, **kwargs): transitions = [True, True, True, True] layers = [3, 4, 24, 3] mlp_ratios = [4, 4, 4, 4] embed_dims = [96, 192, 384, 768] model = MyNet(layers, embed_dims=embed_dims, patch_size=7, transitions=transitions, mlp_ratios=mlp_ratios, mlp_fn=MyMLP, **kwargs) model.default_cfg = default_cfgs['my_L'] return model