【pytorch】torchvisionのCNNモデルの内部構造から仕組みを知る

VGG
ソースコード: https://github.com/pytorch/vision/blob/main/torchvision/models/vgg.py
VGGは3x3フィルタを重ねている単純なモデル。
とりわけVGG16 はプーリング層を除いた、畳み込み層(13)+全結合層(3) の計16層で構成されている。
VGGの全結合層は4096ユニットもあり、これが大量のメモリを消費している。
また、パラメーター数は1億3000万を超えており、caltech-101程度のデータ量(1クラス50件)では過学習を起こす。
期待をしている画像サイズは224x224。畳み込みを終えた時点で7x7に調整し、全結合を行う。
class VGG(nn.Module):
def __init__(
self, features: nn.Module, num_classes: int = 1000, init_weights: bool = True, dropout: float = 0.5
) -> None:
super().__init__()
_log_api_usage_once(self)
# make_layer で生成したCNNを使う。VGGのバージョンごとに異なる。
self.features = features
# 7x7の平均プーリング層 (ここでどんなサイズが届いても7x7に再調整する。224/32=7x7であるが、例えば256が来たときなどで7x7でなくなり落ちてしまうのを防ぐため)
self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
# 全結合層(分類を行う)
self.classifier = nn.Sequential(
# 512次元 * 畳み込み+プーリングされた7x7画像を、4096のクラスに分類
nn.Linear(512 * 7 * 7, 4096),
# ReLU活性化関数
nn.ReLU(True),
# ドロップアウト
nn.Dropout(p=dropout),
# 次元を並び替え
nn.Linear(4096, 4096),
# ReLU活性化関数
nn.ReLU(True),
# ドロップアウト
nn.Dropout(p=dropout),
# 指定したクラスで分類
nn.Linear(4096, num_classes),
)
if init_weights:
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.constant_(m.bias, 0)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
def make_layers(cfg: list[Union[str, int]], batch_norm: bool = False) -> nn.Sequential:
layers: list[nn.Module] = []
in_channels = 3
for v in cfg:
if v == "M":
# Mの場合はマックスプーリング
layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
else:
# 数字の場合は 3x3の畳み込み。サイズはそのままにチャンネル数を増やす。
v = cast(int, v)
conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
#
if batch_norm:
layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
else:
layers += [conv2d, nn.ReLU(inplace=True)]
# 増やしたチャンネル数を与えて次の層へ
in_channels = v
return nn.Sequential(*layers)
# VGGの各バージョンのリスト
"""
A: VGG11
B: VGG13
D: VGG16
E: VGG19
"""
# 数字 : 出力チャンネル数を指定した畳み込み層
# M : マックスプーリングで画像サイズを半分にする層。ABDEそれぞれに5個用意されている。 224/32 = 7x7 になる。
cfgs: dict[str, list[Union[str, int]]] = {
"A": [64, "M", 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"],
"B": [64, 64, "M", 128, 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"],
"D": [64, 64, "M", 128, 128, "M", 256, 256, 256, "M", 512, 512, 512, "M", 512, 512, 512, "M"],
"E": [64, 64, "M", 128, 128, "M", 256, 256, 256, 256, "M", 512, 512, 512, 512, "M", 512, 512, 512, 512, "M"],
}
def _vgg(cfg: str, batch_norm: bool, weights: Optional[WeightsEnum], progress: bool, **kwargs: Any) -> VGG:
if weights is not None:
kwargs["init_weights"] = False
if weights.meta["categories"] is not None:
_ovewrite_named_param(kwargs, "num_classes", len(weights.meta["categories"]))
# make_layers で畳み込み層を作る
model = VGG(make_layers(cfgs[cfg], batch_norm=batch_norm), **kwargs)
if weights is not None:
model.load_state_dict(weights.get_state_dict(progress=progress, check_hash=True))
return model
_COMMON_META = {
"min_size": (32, 32),
"categories": _IMAGENET_CATEGORIES,
"recipe": "https://github.com/pytorch/vision/tree/main/references/classification#alexnet-and-vgg",
"_docs": """These weights were trained from scratch by using a simplified training recipe.""",
}
https://github.com/pytorch/vision/blob/main/torchvision/models/vgg.py
GoogLeNet
VGGと違い、並列で畳み込みを行うことで、パラメーター数を大幅に削減したCNNモデル。
PytorchのGoogLeNetでは、1x1,3x3,5x5 ではなく、1x1,3x3,3x3 の並列畳み込みが行われている。1x1畳み込みはチャンネル方向の削減を行っている。
更に、VGGで使用されていた巨大な全結合層が廃止され、GoogLeNetでは、GlobalAveragePoolingを採用。
これにより、VGGは1億3800万のパラメーターに対して、GoogLeNetは700万まで削減された。
class GoogLeNet(nn.Module):
__constants__ = ["aux_logits", "transform_input"]
def __init__(
self,
num_classes: int = 1000,
aux_logits: bool = True,
transform_input: bool = False,
init_weights: Optional[bool] = None,
blocks: Optional[list[Callable[..., nn.Module]]] = None,
dropout: float = 0.2,
dropout_aux: float = 0.7,
) -> None:
super().__init__()
_log_api_usage_once(self)
if blocks is None:
blocks = [BasicConv2d, Inception, InceptionAux]
if init_weights is None:
warnings.warn(
"The default weight initialization of GoogleNet will be changed in future releases of "
"torchvision. If you wish to keep the old behavior (which leads to long initialization times"
" due to scipy/scipy#11299), please set init_weights=True.",
FutureWarning,
)
init_weights = True
if len(blocks) != 3:
raise ValueError(f"blocks length should be 3 instead of {len(blocks)}")
conv_block = blocks[0]
inception_block = blocks[1]
inception_aux_block = blocks[2]
self.aux_logits = aux_logits
self.transform_input = transform_input
# 前処理
self.conv1 = conv_block(3, 64, kernel_size=7, stride=2, padding=3)
self.maxpool1 = nn.MaxPool2d(3, stride=2, ceil_mode=True)
self.conv2 = conv_block(64, 64, kernel_size=1)
self.conv3 = conv_block(64, 192, kernel_size=3, padding=1)
self.maxpool2 = nn.MaxPool2d(3, stride=2, ceil_mode=True)
# Inceptionブロック群
"""
引数: [入力値],[1x1の出力],[1x1の出力],[3x3の出力],[1x1の出力],[3x3の出力],[1x1の出力]
1x1 conv: 192 → 64
1x1 conv: 192 → 96
3x3 conv: 96 → 128
1x1 conv: 192 → 16
3x3 conv: 16 → 32
maxpool
1x1 conv: 192 → 32
"""
self.inception3a = inception_block(192, 64, 96, 128, 16, 32, 32)
self.inception3b = inception_block(256, 128, 128, 192, 32, 96, 64)
self.maxpool3 = nn.MaxPool2d(3, stride=2, ceil_mode=True)
# Inceptionブロック群
self.inception4a = inception_block(480, 192, 96, 208, 16, 48, 64)
self.inception4b = inception_block(512, 160, 112, 224, 24, 64, 64)
self.inception4c = inception_block(512, 128, 128, 256, 24, 64, 64)
self.inception4d = inception_block(512, 112, 144, 288, 32, 64, 64)
self.inception4e = inception_block(528, 256, 160, 320, 32, 128, 128)
self.maxpool4 = nn.MaxPool2d(2, stride=2, ceil_mode=True)
# Inceptionブロック群
self.inception5a = inception_block(832, 256, 160, 320, 32, 128, 128)
self.inception5b = inception_block(832, 384, 192, 384, 48, 128, 128)
if aux_logits:
self.aux1 = inception_aux_block(512, num_classes, dropout=dropout_aux)
self.aux2 = inception_aux_block(528, num_classes, dropout=dropout_aux)
else:
self.aux1 = None # type: ignore[assignment]
self.aux2 = None # type: ignore[assignment]
# ここでVGGと違い、巨大な全結合をせず、一旦Global Average Poolingを行っている。
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.dropout = nn.Dropout(p=dropout)
# GAP を使ったあとに全結合をしている。1024次元を必要クラス分
self.fc = nn.Linear(1024, num_classes)
if init_weights:
for m in self.modules():
if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
torch.nn.init.trunc_normal_(m.weight, mean=0.0, std=0.01, a=-2, b=2)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
def _transform_input(self, x: Tensor) -> Tensor:
if self.transform_input:
x_ch0 = torch.unsqueeze(x[:, 0], 1) * (0.229 / 0.5) + (0.485 - 0.5) / 0.5
x_ch1 = torch.unsqueeze(x[:, 1], 1) * (0.224 / 0.5) + (0.456 - 0.5) / 0.5
x_ch2 = torch.unsqueeze(x[:, 2], 1) * (0.225 / 0.5) + (0.406 - 0.5) / 0.5
x = torch.cat((x_ch0, x_ch1, x_ch2), 1)
return x
def _forward(self, x: Tensor) -> tuple[Tensor, Optional[Tensor], Optional[Tensor]]:
# N x 3 x 224 x 224
x = self.conv1(x)
# N x 64 x 112 x 112
x = self.maxpool1(x)
# N x 64 x 56 x 56
x = self.conv2(x)
# N x 64 x 56 x 56
x = self.conv3(x)
# N x 192 x 56 x 56
x = self.maxpool2(x)
# N x 192 x 28 x 28
x = self.inception3a(x)
# N x 256 x 28 x 28
x = self.inception3b(x)
# N x 480 x 28 x 28
x = self.maxpool3(x)
# N x 480 x 14 x 14
x = self.inception4a(x)
# N x 512 x 14 x 14
aux1: Optional[Tensor] = None
if self.aux1 is not None:
if self.training:
aux1 = self.aux1(x)
x = self.inception4b(x)
# N x 512 x 14 x 14
x = self.inception4c(x)
# N x 512 x 14 x 14
x = self.inception4d(x)
# N x 528 x 14 x 14
aux2: Optional[Tensor] = None
if self.aux2 is not None:
if self.training:
aux2 = self.aux2(x)
x = self.inception4e(x)
# N x 832 x 14 x 14
x = self.maxpool4(x)
# N x 832 x 7 x 7
x = self.inception5a(x)
# N x 832 x 7 x 7
x = self.inception5b(x)
# N x 1024 x 7 x 7
x = self.avgpool(x)
# N x 1024 x 1 x 1
x = torch.flatten(x, 1)
# N x 1024
x = self.dropout(x)
x = self.fc(x)
# N x 1000 (num_classes)
return x, aux2, aux1
@torch.jit.unused
def eager_outputs(self, x: Tensor, aux2: Tensor, aux1: Optional[Tensor]) -> GoogLeNetOutputs:
if self.training and self.aux_logits:
return _GoogLeNetOutputs(x, aux2, aux1)
else:
return x # type: ignore[return-value]
def forward(self, x: Tensor) -> GoogLeNetOutputs:
x = self._transform_input(x)
x, aux2, aux1 = self._forward(x)
aux_defined = self.training and self.aux_logits
if torch.jit.is_scripting():
if not aux_defined:
warnings.warn("Scripted GoogleNet always returns GoogleNetOutputs Tuple")
return GoogLeNetOutputs(x, aux2, aux1)
else:
return self.eager_outputs(x, aux2, aux1)
class Inception(nn.Module):
def __init__(
self,
in_channels: int,
ch1x1: int,
ch3x3red: int,
ch3x3: int,
ch5x5red: int,
ch5x5: int,
pool_proj: int,
conv_block: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
if conv_block is None:
conv_block = BasicConv2d
# 1x1 の層(チャンネル数削減、非線形変換、計算量削減目的)
self.branch1 = conv_block(in_channels, ch1x1, kernel_size=1)
# 3x3 の層(1x1を行った上で3x3を行う。)
self.branch2 = nn.Sequential(
conv_block(in_channels, ch3x3red, kernel_size=1), conv_block(ch3x3red, ch3x3, kernel_size=3, padding=1)
)
# 5x5の層(※ 実際には3x3として計算をしている)
# (1x1を行った上で3x3を行う。)
self.branch3 = nn.Sequential(
conv_block(in_channels, ch5x5red, kernel_size=1),
# Here, kernel_size=3 instead of kernel_size=5 is a known bug.
# Please see https://github.com/pytorch/vision/issues/906 for details.
conv_block(ch5x5red, ch5x5, kernel_size=3, padding=1),
)
# マックスプーリング
self.branch4 = nn.Sequential(
nn.MaxPool2d(kernel_size=3, stride=1, padding=1, ceil_mode=True),
conv_block(in_channels, pool_proj, kernel_size=1),
)
def _forward(self, x: Tensor) -> list[Tensor]:
branch1 = self.branch1(x)
branch2 = self.branch2(x)
branch3 = self.branch3(x)
branch4 = self.branch4(x)
outputs = [branch1, branch2, branch3, branch4]
return outputs
def forward(self, x: Tensor) -> Tensor:
# 各種の並列処理をチャンネル方向に結合している。
outputs = self._forward(x)
return torch.cat(outputs, 1)
https://github.com/pytorch/vision/blob/main/torchvision/models/googlenet.py
ResNet
PytorchのResNetは、層を深くしていった場合に発生する勾配消失に対応するため、掛け算ではなく足し算を使用している。
BasicBlockとBottleneckという2つのクラスがある。
BasicBlockは3x3畳み込みを2層重ねている。主に、浅いResNetで利用されている。
Bottleneckは1x1,3x3,1x1 の順でチャンネル圧縮、畳み込みで空間方向の特徴抽出、チャンネル数を元に戻すという手法で畳み込みのコストを減らしている。
class BasicBlock(nn.Module):
expansion: int = 1
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if groups != 1 or base_width != 64:
raise ValueError("BasicBlock only supports groups=1 and base_width=64")
if dilation > 1:
raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
# Both self.conv1 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = norm_layer(planes)
self.relu = nn.ReLU(inplace=True)
self.conv2 = conv3x3(planes, planes)
self.bn2 = norm_layer(planes)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class Bottleneck(nn.Module):
# Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
# while original implementation places the stride at the first 1x1 convolution(self.conv1)
# according to "Deep residual learning for image recognition" https://arxiv.org/abs/1512.03385.
# This variant is also known as ResNet V1.5 and improves accuracy according to
# https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.
expansion: int = 4
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
width = int(planes * (base_width / 64.0)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv1x1(inplanes, width)
self.bn1 = norm_layer(width)
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.bn2 = norm_layer(width)
self.conv3 = conv1x1(width, planes * self.expansion)
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(
self,
block: type[Union[BasicBlock, Bottleneck]],
layers: list[int],
num_classes: int = 1000,
zero_init_residual: bool = False,
groups: int = 1,
width_per_group: int = 64,
replace_stride_with_dilation: Optional[list[bool]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
_log_api_usage_once(self)
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
self.inplanes = 64
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
replace_stride_with_dilation = [False, False, False]
if len(replace_stride_with_dilation) != 3:
raise ValueError(
"replace_stride_with_dilation should be None "
f"or a 3-element tuple, got {replace_stride_with_dilation}"
)
self.groups = groups
self.base_width = width_per_group
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = norm_layer(self.inplanes)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, layers[0])
self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck) and m.bn3.weight is not None:
nn.init.constant_(m.bn3.weight, 0) # type: ignore[arg-type]
elif isinstance(m, BasicBlock) and m.bn2.weight is not None:
nn.init.constant_(m.bn2.weight, 0) # type: ignore[arg-type]
def _make_layer(
self,
block: type[Union[BasicBlock, Bottleneck]],
planes: int,
blocks: int,
stride: int = 1,
dilate: bool = False,
) -> nn.Sequential:
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if dilate:
self.dilation *= stride
stride = 1
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * block.expansion, stride),
norm_layer(planes * block.expansion),
)
layers = []
layers.append(
block(
self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
)
)
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(
block(
self.inplanes,
planes,
groups=self.groups,
base_width=self.base_width,
dilation=self.dilation,
norm_layer=norm_layer,
)
)
return nn.Sequential(*layers)
def _forward_impl(self, x: Tensor) -> Tensor:
# See note [TorchScript super()]
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
def _resnet(
block: type[Union[BasicBlock, Bottleneck]],
layers: list[int],
weights: Optional[WeightsEnum],
progress: bool,
**kwargs: Any,
) -> ResNet:
if weights is not None:
_ovewrite_named_param(kwargs, "num_classes", len(weights.meta["categories"]))
model = ResNet(block, layers, **kwargs)
if weights is not None:
model.load_state_dict(weights.get_state_dict(progress=progress, check_hash=True))
return model
_COMMON_META = {
"min_size": (1, 1),
"categories": _IMAGENET_CATEGORIES,
}
https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py
DenseNet
前方の各層の出力がすべての後方の層の入力に使われるようになっている。
class _DenseLayer(nn.Module):
def __init__(
self, num_input_features: int, growth_rate: int, bn_size: int, drop_rate: float, memory_efficient: bool = False
) -> None:
super().__init__()
self.norm1 = nn.BatchNorm2d(num_input_features)
self.relu1 = nn.ReLU(inplace=True)
self.conv1 = nn.Conv2d(num_input_features, bn_size * growth_rate, kernel_size=1, stride=1, bias=False)
self.norm2 = nn.BatchNorm2d(bn_size * growth_rate)
self.relu2 = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(bn_size * growth_rate, growth_rate, kernel_size=3, stride=1, padding=1, bias=False)
self.drop_rate = float(drop_rate)
self.memory_efficient = memory_efficient
def bn_function(self, inputs: list[Tensor]) -> Tensor:
concated_features = torch.cat(inputs, 1)
bottleneck_output = self.conv1(self.relu1(self.norm1(concated_features))) # noqa: T484
return bottleneck_output
# todo: rewrite when torchscript supports any
def any_requires_grad(self, input: list[Tensor]) -> bool:
for tensor in input:
if tensor.requires_grad:
return True
return False
@torch.jit.unused # noqa: T484
def call_checkpoint_bottleneck(self, input: list[Tensor]) -> Tensor:
def closure(*inputs):
return self.bn_function(inputs)
return cp.checkpoint(closure, *input, use_reentrant=False)
@torch.jit._overload_method # noqa: F811
def forward(self, input: list[Tensor]) -> Tensor: # noqa: F811
pass
@torch.jit._overload_method # noqa: F811
def forward(self, input: Tensor) -> Tensor: # noqa: F811
pass
# torchscript does not yet support *args, so we overload method
# allowing it to take either a List[Tensor] or single Tensor
def forward(self, input: Tensor) -> Tensor: # noqa: F811
if isinstance(input, Tensor):
prev_features = [input]
else:
prev_features = input
if self.memory_efficient and self.any_requires_grad(prev_features):
if torch.jit.is_scripting():
raise Exception("Memory Efficient not supported in JIT")
bottleneck_output = self.call_checkpoint_bottleneck(prev_features)
else:
bottleneck_output = self.bn_function(prev_features)
new_features = self.conv2(self.relu2(self.norm2(bottleneck_output)))
if self.drop_rate > 0:
new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
return new_features
# 入力値を結合しながら、下位層に渡し続ける。
class _DenseBlock(nn.ModuleDict):
_version = 2
def __init__(
self,
num_layers: int,
num_input_features: int,
bn_size: int,
growth_rate: int,
drop_rate: float,
memory_efficient: bool = False,
) -> None:
super().__init__()
for i in range(num_layers):
layer = _DenseLayer(
# growth_rate をかけて勾配爆発を抑える。
num_input_features + i * growth_rate,
growth_rate=growth_rate,
bn_size=bn_size,
drop_rate=drop_rate,
# Trueの場合、中間結果を削減する(メモリ消費削減)
memory_efficient=memory_efficient,
)
self.add_module("denselayer%d" % (i + 1), layer)
def forward(self, init_features: Tensor) -> Tensor:
# ここ入力値をそのまま結合をして引き渡している。
features = [init_features]
for name, layer in self.items():
new_features = layer(features)
features.append(new_features)
# このfeatures リストには入力値がそのまま残っている。
return torch.cat(features, 1)
# チャンネル数を削減しつつ、平均プーリングで圧縮。
class _Transition(nn.Sequential):
def __init__(self, num_input_features: int, num_output_features: int) -> None:
super().__init__()
self.norm = nn.BatchNorm2d(num_input_features)
self.relu = nn.ReLU(inplace=True)
self.conv = nn.Conv2d(num_input_features, num_output_features, kernel_size=1, stride=1, bias=False)
self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
class DenseNet(nn.Module):
r"""Densenet-BC model class, based on
`"Densely Connected Convolutional Networks" <https://arxiv.org/pdf/1608.06993.pdf>`_.
Args:
growth_rate (int) - how many filters to add each layer (`k` in paper)
block_config (list of 4 ints) - how many layers in each pooling block
num_init_features (int) - the number of filters to learn in the first convolution layer
bn_size (int) - multiplicative factor for number of bottle neck layers
(i.e. bn_size * k features in the bottleneck layer)
drop_rate (float) - dropout rate after each dense layer
num_classes (int) - number of classification classes
memory_efficient (bool) - If True, uses checkpointing. Much more memory efficient,
but slower. Default: *False*. See `"paper" <https://arxiv.org/pdf/1707.06990.pdf>`_.
"""
def __init__(
self,
growth_rate: int = 32,
block_config: tuple[int, int, int, int] = (6, 12, 24, 16),
num_init_features: int = 64,
bn_size: int = 4,
drop_rate: float = 0,
num_classes: int = 1000,
memory_efficient: bool = False,
) -> None:
super().__init__()
_log_api_usage_once(self)
# First convolution
self.features = nn.Sequential(
OrderedDict(
[
("conv0", nn.Conv2d(3, num_init_features, kernel_size=7, stride=2, padding=3, bias=False)),
("norm0", nn.BatchNorm2d(num_init_features)),
("relu0", nn.ReLU(inplace=True)),
("pool0", nn.MaxPool2d(kernel_size=3, stride=2, padding=1)),
]
)
)
# Each denseblock
num_features = num_init_features
for i, num_layers in enumerate(block_config):
block = _DenseBlock(
num_layers=num_layers,
num_input_features=num_features,
bn_size=bn_size,
growth_rate=growth_rate,
drop_rate=drop_rate,
memory_efficient=memory_efficient,
)
self.features.add_module("denseblock%d" % (i + 1), block)
num_features = num_features + num_layers * growth_rate
if i != len(block_config) - 1:
trans = _Transition(num_input_features=num_features, num_output_features=num_features // 2)
self.features.add_module("transition%d" % (i + 1), trans)
num_features = num_features // 2
# Final batch norm
self.features.add_module("norm5", nn.BatchNorm2d(num_features))
# Linear layer
self.classifier = nn.Linear(num_features, num_classes)
# Official init from torch repo.
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.Linear):
nn.init.constant_(m.bias, 0)
def forward(self, x: Tensor) -> Tensor:
features = self.features(x)
out = F.relu(features, inplace=True)
out = F.adaptive_avg_pool2d(out, (1, 1))
out = torch.flatten(out, 1)
out = self.classifier(out)
return out
def _load_state_dict(model: nn.Module, weights: WeightsEnum, progress: bool) -> None:
# '.'s are no longer allowed in module names, but previous _DenseLayer
# has keys 'norm.1', 'relu.1', 'conv.1', 'norm.2', 'relu.2', 'conv.2'.
# They are also in the checkpoints in model_urls. This pattern is used
# to find such keys.
pattern = re.compile(
r"^(.*denselayer\d+\.(?:norm|relu|conv))\.((?:[12])\.(?:weight|bias|running_mean|running_var))$"
)
state_dict = weights.get_state_dict(progress=progress, check_hash=True)
for key in list(state_dict.keys()):
res = pattern.match(key)
if res:
new_key = res.group(1) + res.group(2)
state_dict[new_key] = state_dict[key]
del state_dict[key]
model.load_state_dict(state_dict)
def _densenet(
growth_rate: int,
block_config: tuple[int, int, int, int],
num_init_features: int,
weights: Optional[WeightsEnum],
progress: bool,
**kwargs: Any,
) -> DenseNet:
if weights is not None:
_ovewrite_named_param(kwargs, "num_classes", len(weights.meta["categories"]))
model = DenseNet(growth_rate, block_config, num_init_features, **kwargs)
if weights is not None:
_load_state_dict(model=model, weights=weights, progress=progress)
return model
_COMMON_META = {
"min_size": (29, 29),
"categories": _IMAGENET_CATEGORIES,
"recipe": "https://github.com/pytorch/vision/pull/116",
"_docs": """These weights are ported from LuaTorch.""",
}
DenseNetはResNetと違って足し算ではなく結合によって入力値を下位層に伝播する。
ResNetの足し算を使用した方法は、
y = F(x) + x
のように、その値は完全に残る。故に勾配消失は絶対に起こり得ない。
が、入力値をそのまま利用できるか?という点では問題がある。
DenseNetでは
[x0, x1, x2, ...] → 分離されたまま保持
このように配列として残すことで、どのような方法でも再利用が可能になる。
ただ、このように値をそのまま残して保存をする方法はチャンネル数が線形的に増加していく。
そこで growth_rate で制御をしている。勾配爆発を防ぐ。
num_input_features + i * growth_rate
BottleneckではGoogLeNetと同様に 1x1畳み込みを使って チャンネル数削減。
Transitionでも1x1畳み込み+平均プーリングを使ってチャンネル数削減+空間圧縮をしている。
また、入力値をそのまま引き継ぐという点でメモリ消費量が増えてしまうため、memory_efficient で計算の中間結果を残さないようにしている。
DenseNetではこれほどの工夫を施したが、ResNetのほうが実運用上のコストが低く、扱いやすかったため主流にはならなかった。
現代GPUにおいて、計算は早いがメモリ帯域は相対的に遅い。メモリアクセスが支配的なDenseNetが流行しないのはハードウェア上の制限によるものでもある。
更に、DenseNetは層が深くなればなるほど、メモリに残しておくべき値が増大していく。中間活性を保存しないmemory_efficientもあるが、逆伝播での計算は増えるため結果的に学習時間はのびてしまう。
まとめると ResNet VS DenseNet は以下のようになる。
- ResNetの構造はシンプルで扱いやすい
- DenseNetはGPU処理性能よりもメモリ使用が多いため、ハードウェアの制限上の問題から扱いづらい
- 入力値をそのまま残すというコンセプトは良いが、結果的にハードウェアコストが高くなってしまう
- 精度を上げることに成功したが、結果的に支払うコストのほうが高いため、ResNetよりも普及はしなかった。
最小コストである程度の深さと賢さを手に入れるResNet、入力値をそのまま残して最大限のコストを支払って賢くするDenseNet
どちらが、普及しやすいかは明らかである。
この限界から、次のEfficientNetでは単純に層を深くすれば良い、幅(チャンネル数)を広げれば良い、解像度を上げれば良いという問題ではなく、効率化が推進された。
MobileNet
MobileNet は
- GoogLeNet(計算削減)
- ResNet(勾配安定)
- DenseNet(特徴再利用)
この3点を踏まえた上で、モバイル端末で動作するレベルまで計算コストを最小化している。
# このInvertedResidual が後のEfficientNetのMBConvの原型である。
# necessary for backwards compatibility
class InvertedResidual(nn.Module):
def __init__(
self, inp: int, oup: int, stride: int, expand_ratio: int, norm_layer: Optional[Callable[..., nn.Module]] = None
) -> None:
super().__init__()
self.stride = stride
if stride not in [1, 2]:
raise ValueError(f"stride should be 1 or 2 instead of {stride}")
if norm_layer is None:
norm_layer = nn.BatchNorm2d
#
hidden_dim = int(round(inp * expand_ratio))
self.use_res_connect = self.stride == 1 and inp == oup
layers: list[nn.Module] = []
if expand_ratio != 1:
# pw
layers.append(
Conv2dNormActivation(inp, hidden_dim, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.ReLU6)
)
layers.extend(
[
# dw
Conv2dNormActivation(
hidden_dim,
hidden_dim,
stride=stride,
groups=hidden_dim,
norm_layer=norm_layer,
activation_layer=nn.ReLU6,
),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
norm_layer(oup),
]
)
self.conv = nn.Sequential(*layers)
self.out_channels = oup
self._is_cn = stride > 1
def forward(self, x: Tensor) -> Tensor:
if self.use_res_connect:
return x + self.conv(x)
else:
return self.conv(x)
# MobileNet 本体。
class MobileNetV2(nn.Module):
def __init__(
self,
num_classes: int = 1000,
width_mult: float = 1.0,
inverted_residual_setting: Optional[list[list[int]]] = None,
round_nearest: int = 8,
block: Optional[Callable[..., nn.Module]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
dropout: float = 0.2,
) -> None:
"""
MobileNet V2 main class
Args:
num_classes (int): Number of classes
width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
inverted_residual_setting: Network structure
round_nearest (int): Round the number of channels in each layer to be a multiple of this number
Set to 1 to turn off rounding
block: Module specifying inverted residual building block for mobilenet
norm_layer: Module specifying the normalization layer to use
dropout (float): The droupout probability
"""
super().__init__()
_log_api_usage_once(self)
if block is None:
block = InvertedResidual
if norm_layer is None:
norm_layer = nn.BatchNorm2d
input_channel = 32
last_channel = 1280
if inverted_residual_setting is None:
inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
]
# only check the first element, assuming user knows t,c,n,s are required
if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
raise ValueError(
f"inverted_residual_setting should be non-empty or a 4-element list, got {inverted_residual_setting}"
)
# building first layer
input_channel = _make_divisible(input_channel * width_mult, round_nearest)
self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
features: list[nn.Module] = [
Conv2dNormActivation(3, input_channel, stride=2, norm_layer=norm_layer, activation_layer=nn.ReLU6)
]
# building inverted residual blocks
for t, c, n, s in inverted_residual_setting:
output_channel = _make_divisible(c * width_mult, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t, norm_layer=norm_layer))
input_channel = output_channel
# building last several layers
features.append(
Conv2dNormActivation(
input_channel, self.last_channel, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.ReLU6
)
)
# make it nn.Sequential
self.features = nn.Sequential(*features)
# building classifier
self.classifier = nn.Sequential(
nn.Dropout(p=dropout),
nn.Linear(self.last_channel, num_classes),
)
# weight initialization
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.zeros_(m.bias)
def _forward_impl(self, x: Tensor) -> Tensor:
# This exists since TorchScript doesn't support inheritance, so the superclass method
# (this one) needs to have a name other than `forward` that can be accessed in a subclass
x = self.features(x)
# Cannot use "squeeze" as batch-size can be 1
x = nn.functional.adaptive_avg_pool2d(x, (1, 1))
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
_COMMON_META = {
"num_params": 3504872,
"min_size": (1, 1),
"categories": _IMAGENET_CATEGORIES,
}
InvertedResidual は以下の構造になっている。
(1) 1x1 expand(広げる)
(2) depthwise conv
(3) 1x1 project(戻す)
(+ skip connection)
GoogLeNet では、 1x1でチャンネル数を削減してから畳み込みをして、チャンネル数をもとに戻していた。
が、このInvertedResidual は 1x1 でチャンネル数を増やしてから、depthwise Conv (畳み込み)を行い、1x1でもとに戻している。
Depthwise Conv とはチャンネルごとに独立して畳み込みを行う手法。従来の畳み込みは、チャンネル間をまたいだ畳み込みを行っている。このDepthwise Conv はチャンネルごとに畳み込みを行っているため、従来型の畳込みより遥かに計算コストは低い。
つまり
- 1x1 チャンネル方向に増やす
- depthwise conv 空間方向に畳み込み(※チャンネルごとに独立して行う)
- 1x1 チャンネル方向に元に戻す
という工程を経ている。
一見、GoogLeNetの思想とは異なるように見えるが、トータルで計算コストを下げるため1x1畳み込みを利用してチャンネル数を調整するという考え方は同じである。
この計算工程が、後のEfficientNetのMBConv の源流となっている。あえてチャンネル方向に広げ、depthwise で空間にのみ畳み込みを行い、そして1x1で元に戻す。
InvertedResidual のスキップ接続は、入力と出力のサイズ(チャンネル数、解像度)が完全に一致しているときだけスキップ接続を行う。
EfficientNet
# 分離型の設定
class MBConvConfig(_MBConvConfig):
# Stores information listed at Table 1 of the EfficientNet paper & Table 4 of the EfficientNetV2 paper
def __init__(
self,
expand_ratio: float,
kernel: int,
stride: int,
input_channels: int,
out_channels: int,
num_layers: int,
width_mult: float = 1.0,
depth_mult: float = 1.0,
block: Optional[Callable[..., nn.Module]] = None,
) -> None:
# Compound Scaling(複合スケーリング)
# width_mult: チャンネル数を何倍にするか。
# depth_mult: ブロックの繰り返し回数を何倍にするか。
input_channels = self.adjust_channels(input_channels, width_mult)
out_channels = self.adjust_channels(out_channels, width_mult)
num_layers = self.adjust_depth(num_layers, depth_mult)
if block is None:
block = MBConv
super().__init__(expand_ratio, kernel, stride, input_channels, out_channels, num_layers, block)
@staticmethod
def adjust_depth(num_layers: int, depth_mult: float):
return int(math.ceil(num_layers * depth_mult))
# 統合型の設定
class FusedMBConvConfig(_MBConvConfig):
# Stores information listed at Table 4 of the EfficientNetV2 paper
def __init__(
self,
expand_ratio: float,
kernel: int,
stride: int,
input_channels: int,
out_channels: int,
num_layers: int,
block: Optional[Callable[..., nn.Module]] = None,
) -> None:
if block is None:
block = FusedMBConv
super().__init__(expand_ratio, kernel, stride, input_channels, out_channels, num_layers, block)
"""
1×1 conv(拡張)
depthwise conv(空間処理)
1×1 conv(圧縮)
非線形(SiLU)
SE(場合による)
"""
# 分離型
class MBConv(nn.Module):
def __init__(
self,
cnf: MBConvConfig,
stochastic_depth_prob: float,
norm_layer: Callable[..., nn.Module],
se_layer: Callable[..., nn.Module] = SqueezeExcitation,
) -> None:
super().__init__()
if not (1 <= cnf.stride <= 2):
raise ValueError("illegal stride value")
self.use_res_connect = cnf.stride == 1 and cnf.input_channels == cnf.out_channels
layers: list[nn.Module] = []
# X x SigmoidのSiLU関数を使っている。小さい負の値は弱く残すことで勾配がなめらかに伝わる。ReLUでは負の値がすべて死ぬ。
activation_layer = nn.SiLU
# 1x1 畳み込みでチャンネル数を増やす。(後続の depthwise のため)
# expand
expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
if expanded_channels != cnf.input_channels:
layers.append(
Conv2dNormActivation(
cnf.input_channels,
expanded_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# depthwiseはチャンネル間の混合をせず、計算コストは低いが表現力が弱い。そこで活性化関数にSiLUを使用している。
# depthwise
layers.append(
Conv2dNormActivation(
expanded_channels,
expanded_channels,
kernel_size=cnf.kernel,
stride=cnf.stride,
groups=expanded_channels,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# ここが、MobileNet のInvertedResidual との違い。
# depthwise ではチャンネル間の混合、チャンネル数を増やすことはしないため
# 全チャンネルを俯瞰して、チャンネルごとの重要度を学習する。
# squeeze and excitation
squeeze_channels = max(1, cnf.input_channels // 4)
layers.append(se_layer(expanded_channels, squeeze_channels, activation=partial(nn.SiLU, inplace=True)))
# 1x1 畳み込みでチャンネル数を元に戻す。
# project
layers.append(
Conv2dNormActivation(
expanded_channels, cnf.out_channels, kernel_size=1, norm_layer=norm_layer, activation_layer=None
)
)
self.block = nn.Sequential(*layers)
self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
self.out_channels = cnf.out_channels
def forward(self, input: Tensor) -> Tensor:
result = self.block(input)
if self.use_res_connect:
result = self.stochastic_depth(result)
result += input
return result
# 統合型
# 通常のMBConvでdepthwiseを行い、結果を統合して1つの3x3畳み込みに置き換える。
class FusedMBConv(nn.Module):
def __init__(
self,
cnf: FusedMBConvConfig,
stochastic_depth_prob: float,
norm_layer: Callable[..., nn.Module],
) -> None:
super().__init__()
if not (1 <= cnf.stride <= 2):
raise ValueError("illegal stride value")
self.use_res_connect = cnf.stride == 1 and cnf.input_channels == cnf.out_channels
layers: list[nn.Module] = []
# X x SigmoidのSiLU関数を使っている。小さい負の値は弱く残すことで勾配がなめらかに伝わる。ReLUでは負の値がすべて死ぬ。
activation_layer = nn.SiLU
expanded_channels = cnf.adjust_channels(cnf.input_channels, cnf.expand_ratio)
if expanded_channels != cnf.input_channels:
# EfficientNet の高速化手法
# Depthwise Conv は計算量が少ないもののGPUでのメモリ読み出し効率が悪い。
# そこで、ネットワークの初期そうではあえて通常の畳み込みに統合している。
# fused expand
layers.append(
Conv2dNormActivation(
cnf.input_channels,
expanded_channels,
kernel_size=cnf.kernel,
stride=cnf.stride,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
# project
layers.append(
Conv2dNormActivation(
expanded_channels, cnf.out_channels, kernel_size=1, norm_layer=norm_layer, activation_layer=None
)
)
else:
layers.append(
Conv2dNormActivation(
cnf.input_channels,
cnf.out_channels,
kernel_size=cnf.kernel,
stride=cnf.stride,
norm_layer=norm_layer,
activation_layer=activation_layer,
)
)
self.block = nn.Sequential(*layers)
self.stochastic_depth = StochasticDepth(stochastic_depth_prob, "row")
self.out_channels = cnf.out_channels
def forward(self, input: Tensor) -> Tensor:
result = self.block(input)
if self.use_res_connect:
result = self.stochastic_depth(result)
result += input
return result
class EfficientNet(nn.Module):
def __init__(
self,
inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]],
dropout: float,
stochastic_depth_prob: float = 0.2,
num_classes: int = 1000,
norm_layer: Optional[Callable[..., nn.Module]] = None,
last_channel: Optional[int] = None,
) -> None:
"""
EfficientNet V1 and V2 main class
Args:
inverted_residual_setting (Sequence[Union[MBConvConfig, FusedMBConvConfig]]): Network structure
dropout (float): The droupout probability
stochastic_depth_prob (float): The stochastic depth probability
num_classes (int): Number of classes
norm_layer (Optional[Callable[..., nn.Module]]): Module specifying the normalization layer to use
last_channel (int): The number of channels on the penultimate layer
"""
super().__init__()
_log_api_usage_once(self)
if not inverted_residual_setting:
raise ValueError("The inverted_residual_setting should not be empty")
elif not (
isinstance(inverted_residual_setting, Sequence)
and all([isinstance(s, _MBConvConfig) for s in inverted_residual_setting])
):
raise TypeError("The inverted_residual_setting should be List[MBConvConfig]")
if norm_layer is None:
norm_layer = nn.BatchNorm2d
layers: list[nn.Module] = []
# building first layer
firstconv_output_channels = inverted_residual_setting[0].input_channels
layers.append(
Conv2dNormActivation(
3, firstconv_output_channels, kernel_size=3, stride=2, norm_layer=norm_layer, activation_layer=nn.SiLU
)
)
# building inverted residual blocks
total_stage_blocks = sum(cnf.num_layers for cnf in inverted_residual_setting)
stage_block_id = 0
for cnf in inverted_residual_setting:
stage: list[nn.Module] = []
for _ in range(cnf.num_layers):
# copy to avoid modifications. shallow copy is enough
block_cnf = copy.copy(cnf)
# overwrite info if not the first conv in the stage
if stage:
block_cnf.input_channels = block_cnf.out_channels
block_cnf.stride = 1
# adjust stochastic depth probability based on the depth of the stage block
sd_prob = stochastic_depth_prob * float(stage_block_id) / total_stage_blocks
stage.append(block_cnf.block(block_cnf, sd_prob, norm_layer))
stage_block_id += 1
layers.append(nn.Sequential(*stage))
# building last several layers
lastconv_input_channels = inverted_residual_setting[-1].out_channels
lastconv_output_channels = last_channel if last_channel is not None else 4 * lastconv_input_channels
layers.append(
Conv2dNormActivation(
lastconv_input_channels,
lastconv_output_channels,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.SiLU,
)
)
self.features = nn.Sequential(*layers)
self.avgpool = nn.AdaptiveAvgPool2d(1)
self.classifier = nn.Sequential(
nn.Dropout(p=dropout, inplace=True),
nn.Linear(lastconv_output_channels, num_classes),
)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
init_range = 1.0 / math.sqrt(m.out_features)
nn.init.uniform_(m.weight, -init_range, init_range)
nn.init.zeros_(m.bias)
def _forward_impl(self, x: Tensor) -> Tensor:
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
def _efficientnet(
inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]],
dropout: float,
last_channel: Optional[int],
weights: Optional[WeightsEnum],
progress: bool,
**kwargs: Any,
) -> EfficientNet:
if weights is not None:
_ovewrite_named_param(kwargs, "num_classes", len(weights.meta["categories"]))
model = EfficientNet(inverted_residual_setting, dropout, last_channel=last_channel, **kwargs)
if weights is not None:
model.load_state_dict(weights.get_state_dict(progress=progress, check_hash=True))
return model
def _efficientnet_conf(
arch: str,
**kwargs: Any,
) -> tuple[Sequence[Union[MBConvConfig, FusedMBConvConfig]], Optional[int]]:
inverted_residual_setting: Sequence[Union[MBConvConfig, FusedMBConvConfig]]
if arch.startswith("efficientnet_b"):
bneck_conf = partial(MBConvConfig, width_mult=kwargs.pop("width_mult"), depth_mult=kwargs.pop("depth_mult"))
inverted_residual_setting = [
bneck_conf(1, 3, 1, 32, 16, 1),
bneck_conf(6, 3, 2, 16, 24, 2),
bneck_conf(6, 5, 2, 24, 40, 2),
bneck_conf(6, 3, 2, 40, 80, 3),
bneck_conf(6, 5, 1, 80, 112, 3),
bneck_conf(6, 5, 2, 112, 192, 4),
bneck_conf(6, 3, 1, 192, 320, 1),
]
last_channel = None
elif arch.startswith("efficientnet_v2_s"):
inverted_residual_setting = [
FusedMBConvConfig(1, 3, 1, 24, 24, 2),
FusedMBConvConfig(4, 3, 2, 24, 48, 4),
FusedMBConvConfig(4, 3, 2, 48, 64, 4),
MBConvConfig(4, 3, 2, 64, 128, 6),
MBConvConfig(6, 3, 1, 128, 160, 9),
MBConvConfig(6, 3, 2, 160, 256, 15),
]
last_channel = 1280
elif arch.startswith("efficientnet_v2_m"):
inverted_residual_setting = [
FusedMBConvConfig(1, 3, 1, 24, 24, 3),
FusedMBConvConfig(4, 3, 2, 24, 48, 5),
FusedMBConvConfig(4, 3, 2, 48, 80, 5),
MBConvConfig(4, 3, 2, 80, 160, 7),
MBConvConfig(6, 3, 1, 160, 176, 14),
MBConvConfig(6, 3, 2, 176, 304, 18),
MBConvConfig(6, 3, 1, 304, 512, 5),
]
last_channel = 1280
elif arch.startswith("efficientnet_v2_l"):
inverted_residual_setting = [
FusedMBConvConfig(1, 3, 1, 32, 32, 4),
FusedMBConvConfig(4, 3, 2, 32, 64, 7),
FusedMBConvConfig(4, 3, 2, 64, 96, 7),
MBConvConfig(4, 3, 2, 96, 192, 10),
MBConvConfig(6, 3, 1, 192, 224, 19),
MBConvConfig(6, 3, 2, 224, 384, 25),
MBConvConfig(6, 3, 1, 384, 640, 7),
]
last_channel = 1280
else:
raise ValueError(f"Unsupported model type {arch}")
return inverted_residual_setting, last_channel
_COMMON_META: dict[str, Any] = {
"categories": _IMAGENET_CATEGORIES,
}
_COMMON_META_V1 = {
**_COMMON_META,
"min_size": (1, 1),
"recipe": "https://github.com/pytorch/vision/tree/main/references/classification#efficientnet-v1",
}
_COMMON_META_V2 = {
**_COMMON_META,
"min_size": (33, 33),
"recipe": "https://github.com/pytorch/vision/tree/main/references/classification#efficientnet-v2",
}
MBConvは何をしているのか?
端的に言うと、
- 1x1畳み込みで 一旦チャンネル数(特徴)を増やし、
- depthwise で空間方向にのみ畳み込みを行い
- squeeze and excitation で全チャンネル内で重要度を振り分け
- 1x1 畳み込みでチャンネル数を元に戻す(重要でないチャンネルの排除)
もっと端的に言うと、考えられる特徴という特徴を挙げ、不必要な特徴は無視して学習をし続ける仕組み。
しかもdepthwise は通常の畳み込みとは違いチャンネル方向の混合や増減はさせない。空間方向にのみ計算を行うのでコストは非常に少なくて済む。
このように、チャンネル方向の計算と空間方向の計算を分離して行うので、MBConvは分離型とされている。
FusedMBConv は何をしているのか?
通常の3x3畳み込みを使い、チャンネルと空間をまとめて畳み込みしている。そのため統合型とされている。
そもそもMBConvは計算コストは低いが、層が細かいためデータをロード→計算→保存というサイクルを繰り返す必要がある。この点においてIOバウンドの待ち時間がかかる。
一方で統合型は1回の処理で計算を行うためGPUコアをフル活用できる。計算量自体は増えるが、計算を1回で終わらせることができるため、バラバラに計算をしていくよりも早く終わる。
流れをまとめると
- MBConv : 重要とされる特徴を見つける工程
- FusedMBConv : 見つけた重要な特徴を一気に計算する工程。
IOバウンドが増える MBConv はスマホなどのGPUパワーの小さいエッジデバイスにおいては有効。しかし、IOバウンドがあまりにも多いとパワーのあるGPUの場合は持て余してしまう。
- 統合型でパラメータを増やしてGPUをフルに使い、メモリ効率を優先するか
- 分離型でパラメータをあまり増やさずメモリも適度に使いつつ、GPUの負荷を軽減するか
この二者択一である。
またEfficientNetV2には他にも工夫があり、最初の数ステージはIOバウンドが支配的なため統合型を利用して、後半は通常のMBConvを使うという方法を取っている。