计算机视觉目标检测——DETR代码解读

摘要

本周主要对DETR(End-to-End Object Detection with Transformers)的代码实现进行了详细解读和学习。DETR是首个将目标检测任务转化为集合预测问题的端到端框架,通过Transformer的全局建模能力,显著简化了检测器的设计。在代码解读中,重点分析了DETR的整体架构,包括CNN特征提取、正余弦位置编码、Transformer的Encoder-Decoder结构和基于集合预测的损失函数。Transformer模块通过多头注意力机制构建全局上下文关系,而损失函数利用匈牙利算法实现预测框与真实框的一对一匹配,避免了传统方法中的NMS后处理。代码展示了DETR在架构上的高效性与简洁性。

Abstract

This week’s work focused on a comprehensive analysis of the code implementation for DETR (End-to-End Object Detection with Transformers). DETR represents a pioneering framework that reformulates object detection as a set prediction task, eliminating the need for post-processing steps like NMS. The study explored the overall architecture of DETR, including CNN-based feature extraction, sine positional encoding, the Transformer-based encoder-decoder structure, and the set prediction loss function. The Transformer module leverages multi-head attention mechanisms to establish global contextual relationships, while the loss function employs the Hungarian algorithm to achieve one-to-one matching between predicted and ground-truth boxes. The code demonstrates the efficiency and simplicity of DETR’s design.

一、DETR代码解读

1. 整体代码

1.1 整体架构

先回顾一下DETR的整体框架:

image-20241228203208744

DETR的网络结构主要由三部分组成:CNN骨干网、Transformer编码器和解码器,以及输出层。

  1. CNN骨干网:负责从输入图像中提取特征图。DETR通常使用ResNet等深度卷积神经网络作为骨干网,通过一系列的卷积和池化操作,将输入图像转换为高维特征图。
  2. Transformer编码器:用于对CNN骨干网提取的特征图进行编码。编码器由多个Transformer block组成,每个block包含自注意力层和前馈神经网络层。通过多层自注意力机制,编码器能够捕获特征图中不同位置之间的全局上下文关系,生成一组特征向量。
  3. Transformer解码器:解码器是DETR实现集合预测的关键部分。它接受编码器的输出和一组可学习的目标查询(object queries),通过自注意力机制和编码器-解码器注意力机制,将目标查询解码为一系列目标的边界框坐标和类别标签。
  4. 预测头:两个简单的前馈网络( feed forward network,FFN )来进行最终的检测预测。分别用于预测边界框的标准化中心坐标、高度和宽度,以及类别标签。
1.2 基于集合预测的损失函数

DETR在单次通过解码器时推断一个固定大小的有 N 个预测的集合,其中 N 被设置为显著大于图像中典型的物体数量,文中N为100。训练的主要困难之一是在 ground truth 方面对预测对象(类别、位置、大小)进行打分。我们的损失在预测对象和真实对象之间产生一个最佳的二分匹配,然后优化 object-specific ( bounding box ) 的损失。
y y y表示对象的ground truth集合, y ^ = { y ^ i } i = 1 N \hat y = \{ {\hat y_i}\} _{i = 1}^N y^={y^i}i=1N表示有 N N N个预测的集合。假设N远大于图像中物体的个数,我们考虑y也是一个大小为N的被 ( no object ) 填充的集合。为了在这两个集合之间找到一个二分匹配,我们用最低的代价搜索N个元素 σ ∈ ℘ N \sigma \in {\wp _N} σN的一个置换:

σ ^ = arg ⁡ min ⁡ σ ∈ ℘ N ∑ i N L m a t c h ( y i , y ^ σ ( i ) ) \hat \sigma = \mathop {\arg \min }\limits_{\sigma \in {\wp _N}} \sum\limits_i^N {{L_{match}}({y_i},{{\hat y}_{\sigma (i)}})} σ^=σNargminiNLmatch(yi,y^σ(i))

  • L m a t c h ( y i , y ^ σ ( i ) ) {{L_{match}}({y_i},{{\hat y}_{\sigma (i)}})} Lmatch(yi,y^σ(i))是真值 y i {{y_i}} yi和具有索引 σ i {\sigma _i} σi的一个预测之间的一个成对匹配代价(a pair-wise matching cost)。这个最优分配是通过匈牙利算法计算的。匹配代价同时考虑了类预测以及预测框和真实框之间的相似性。

  • 真实集合的每个元素 i i i都可以看成一个 y i = ( c i , b i ) {y_i} = ({c_i},{b_i}) yi=(ci,bi),其中 c i {c_i} ci是目标类标签(目标类标签特可能是 ϕ \phi ϕ)。 b i ∈ [ 0 , 1 ] 4 {b_i} \in {\left[ {0,1} \right]^4} bi[0,1]4是一个向量,它定义了真实框的中心坐标及其相对于图像的高度和宽度。

  • 对于索引 σ i {\sigma _i} σi的预测,我们定义类 c i {c _i} ci的概率为 p ^ σ ( i ) ( c i ) {{\hat p}_{\sigma (i)}}({c_i}) p^σ(i)(ci),预测框为 b ^ σ ( i ) {{\hat b}_{\sigma (i)}} b^σ(i)

利用上述的符号,可以将 L m a t c h ( y i , y ^ σ ( i ) ) {{L_{match}}({y_i},{{\hat y}_{\sigma (i)}})} Lmatch(yi,y^σ(i))定义为:

− 1 { c i ≠ ϕ } p ^ σ ( i ) ( c i ) + 1 { c i ≠ ϕ } L b o x ( b i , b ^ σ ( i ) ) - {1_{\{ {c_i} \ne \phi \} }}{{\hat p}_{\sigma (i)}}({c_i}) + {1_{\{ {c_i} \ne \phi \} }}{L_{box}}({b_i},{{\hat b}_{\sigma (i)}}) 1{ci=ϕ}p^σ(i)(ci)+1{ci=ϕ}Lbox(bi,b^σ(i))

其损失函数是计算上一步中匹配的所有配对的匈牙利损失。我们定义的损失类似于常见目标检测器的损失,即类别预测的负对数box 损失线性组合

image-20241222150525126

其中的 σ ^ {\hat \sigma } σ^是第一步中计算的最优分配。

1.3 论文中的示例代码
import torch
from torch import nn
from torchvision.models import resnet50

class DETR(nn.Module):
    def __init__(self, num_classes, hidden_dim, nheads, num_encoder_layers, num_decoder_layers):
        super().__init__()
        # backbone = resnet50 除掉average pool和fc层  只保留conv1 - conv5_x
        self.backbone = nn.Sequential(*list(resnet50(pretrained=True).children())[:-2])
        # 1x1卷积降维 2048->256
        self.conv = nn.Conv2d(2048, hidden_dim, 1)
        # 6层encoder + 6层decoder    hidden_dim=256  nheads多头注意力机制 8头   num_encoder_layers=num_decoder_layers=6
        self.transformer = nn.Transformer(hidden_dim, nheads, num_encoder_layers, num_decoder_layers)
        # 分类头
        self.linear_class = nn.Linear(hidden_dim, num_classes + 1)
        # 回归头
        self.linear_bbox = nn.Linear(hidden_dim, 4)
        # 位置编码  encoder输入
        self.row_embed = nn.Parameter(torch.rand(50, hidden_dim // 2))
        self.col_embed = nn.Parameter(torch.rand(50, hidden_dim // 2))
        # query pos编码  decoder输入
        self.query_pos = nn.Parameter(torch.rand(100, hidden_dim))


    def forward(self, inputs):
        x = self.backbone(inputs)    # [1,3,800,1066] -> [1,2048,25,34]
        h = self.conv(x)             # [1,2048,25,34] -> [1,256,25,34]
        H, W = h.shape[-2:]          # H=25  W=34
        # pos = [850,1,256]  self.col_embed = [50,128]  self.row_embed[:H]=[50,128]
        pos = torch.cat([self.col_embed[:W].unsqueeze(0).repeat(H, 1, 1),
                        self.row_embed[:H].unsqueeze(1).repeat(1, W, 1),
                        ], dim=-1).flatten(0, 1).unsqueeze(1)
        # encoder输入  decoder输入
        h = self.transformer(pos + h.flatten(2).permute(2, 0, 1), self.query_pos.unsqueeze(1))
        return self.linear_class(h), self.linear_bbox(h).sigmoid()


detr = DETR(num_classes=91, hidden_dim=256, nheads=8, num_encoder_layers=6, num_decoder_layers=6)
detr.eval()
inputs = torch.randn(1, 3, 800, 1066)
logits, bboxes = detr(inputs)
print(logits.shape)   # torch.Size([100, 1, 92])
print(bboxes.shape)   # torch.Size([100, 1, 4])

论文原文给出的代码,mAP只有40,虽然比真实的源码低了2个点,但是代码很简单,只有40多行,方便我们了解整个DETR的网络结构,可以与上述的DETR框架一一对应起来。

2. Backbone代码

整个Backbone主要包括CNN特征提取和位置编码两个部分。

首先是调用models/Backbone.py中的build_backbone函数创建Backbone:

def build_backbone(args):
    # 搭建backbone
    # 位置编码  PositionEmbeddingSine()
    position_embedding = build_position_encoding(args)
    train_backbone = args.lr_backbone > 0   # 是否需要训练backbone  True
    return_interm_layers = args.masks       # 是否需要返回中间层结果 目标检测False  分割True
    # 生成backbone  resnet50
    backbone = Backbone(args.backbone, train_backbone, return_interm_layers, args.dilation)
    # 将backbone输出与位置编码相加   0: backbone   1: PositionEmbeddingSine()
    model = Joiner(backbone, position_embedding)
    model.num_channels = backbone.num_channels   # 512
    return model

这里首先调用build_position_encoding函数生成正余弦位置编码position_embedding:[bs,256,H/32, W/32],其中256前128是y方向位置编码,后128是x方向位置编码;再调用Backbone类生成ResNet50对输入数据进行特征提取得到特征图[bs,2048,H/32, W/32]。最后Joiner将两者合并存储起来,方便后续使用。

2.1 CNN特征提取部分代码

创建ResNet50,先调用Backbone类:

class Backbone(BackboneBase):
    """ResNet backbone with frozen BatchNorm."""
    def __init__(self, name: str,
                 train_backbone: bool,
                 return_interm_layers: bool,
                 dilation: bool):
        # 直接调包 调用torchvision.models中的backbone
        backbone = getattr(torchvision.models, name)(
            replace_stride_with_dilation=[False, False, dilation],
            pretrained=is_main_process(), norm_layer=FrozenBatchNorm2d)
        # resnet50  2048
        num_channels = 512 if name in ('resnet18', 'resnet34') else 2048
        super().__init__(backbone, train_backbone, num_channels, return_interm_layers)

这个类是继承自BackboneBase类的,而且CNN直接调用的就是torchvision.models中的模型,所以直接看BackboneBase类:

class BackboneBase(nn.Module):

    def __init__(self, backbone: nn.Module, train_backbone: bool, num_channels: int, return_interm_layers: bool):
        super().__init__()
        for name, parameter in backbone.named_parameters():
            # layer0 layer1不需要训练 因为前面层提取的信息其实很有限 都是差不多的 不需要训练
            if not train_backbone or 'layer2' not in name and 'layer3' not in name and 'layer4' not in name:
                parameter.requires_grad_(False)
        # False 检测任务不需要返回中间层
        if return_interm_layers:
            return_layers = {"layer1": "0", "layer2": "1", "layer3": "2", "layer4": "3"}
        else:
            return_layers = {'layer4': "0"}
        # 检测任务直接返回layer4即可  执行torchvision.models._utils.IntermediateLayerGetter这个函数可以直接返回对应层的输出结果
        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.num_channels = num_channels

    def forward(self, tensor_list: NestedTensor):
        """
        tensor_list: pad预处理之后的图像信息
        tensor_list.tensors: [bs, 3, 608, 810]预处理后的图片数据 对于小图片而言多余部分用0填充
        tensor_list.mask: [bs, 608, 810] 用于记录矩阵中哪些地方是填充的(原图部分值为False,填充部分值为True)
        """
        # 取出预处理后的图片数据 [bs, 3, 608, 810] 输入模型中  输出layer4的输出结果 dict '0'=[bs, 2048, 19, 26]
        xs = self.body(tensor_list.tensors)
        # 保存输出数据
        out: Dict[str, NestedTensor] = {}
        for name, x in xs.items():
            m = tensor_list.mask  # 取出图片的mask [bs, 608, 810] 知道图片哪些区域是有效的 哪些位置是pad之后的无效的
            assert m is not None
            # 通过插值函数知道卷积后的特征的mask  知道卷积后的特征哪些是有效的  哪些是无效的
            # 因为之前图片输入网络是整个图片都卷积计算的 生成的新特征其中有很多区域都是无效的
            mask = F.interpolate(m[None].float(), size=x.shape[-2:]).to(torch.bool)[0]
            # out['0'] = NestedTensor: tensors[bs, 2048, 19, 26] + mask[bs, 19, 26]
            out[name] = NestedTensor(x, mask)
        # out['0'] = NestedTensor: tensors[bs, 2048, 19, 26] + mask[bs, 19, 26]
        return out

这个类还是在调用torchvision.models中的模型,然后再把预处理后的图片数据[bs, 3, 608, 810]和mask数据[bs, 608, 810]输入到模型中(这个图片数据是经过pad填充的数据,而mask数据就是记录这些图片哪些像素位置是pad的,为True,没用pad的真实有效数据就为False)。经过前向传播,再调用IntermediateLayerGetter函数把对应层特征图提取出来,得到原图32倍下采样的特征图[bs, 2048, 19, 26],以及这张特征图对应的mask[bs, 19, 26]。

2.2 位置编码部分代码

这里主要是调用models/position_encoding.py中的build_position_encoding函数创建位置编码:

def build_position_encoding(args):
    """
    创建位置编码
    args: 一系列参数  args.hidden_dim: transformer中隐藏层的维度   args.position_embedding: 位置编码类型 正余弦sine or 可学习learned
    """
    # N_steps = 128 = 256 // 2  backbone输出[bs,256,25,34]  256维度的特征
    # 而传统的位置编码应该也是256维度的, 但是detr用的是一个x方向和y方向的位置编码concat的位置编码方式  这里和ViT有所不同
    # 二维位置编码   前128维代表x方向位置编码  后128维代表y方向位置编码
    N_steps = args.hidden_dim // 2
    if args.position_embedding in ('v2', 'sine'):
        # TODO find a better way of exposing other arguments
        # [bs,256,19,26]  dim=1时  前128个是y方向位置编码  后128个是x方向位置编码
        position_embedding = PositionEmbeddingSine(N_steps, normalize=True)
    elif args.position_embedding in ('v3', 'learned'):
        position_embedding = PositionEmbeddingLearned(N_steps)
    else:
        raise ValueError(f"not supported {args.position_embedding}")

    return position_embedding

可以看到,源码是实现了两种位置编码,一种是正余弦绝对位置编码,不需要额外的参数学习,另一种是可学习绝对位置编码。原论文用的是正余弦绝对位置编码,而且代码也是默认使用这个的,所以这里主要介绍PositionEmbeddingSine类:

class PositionEmbeddingSine(nn.Module):
    """
    Absolute pos embedding, Sine.  没用可学习参数  不可学习  定义好了就固定了
    This is a more standard version of the position embedding, very similar to the one
    used by the Attention is all you need paper, generalized to work on images.
    """
    def __init__(self, num_pos_feats=64, temperature=10000, normalize=False, scale=None):
        super().__init__()
        self.num_pos_feats = num_pos_feats    # 128维度  x/y  = d_model/2
        self.temperature = temperature        # 常数 正余弦位置编码公式里面的10000
        self.normalize = normalize            # 是否对向量进行max规范化   True
        if scale is not None and normalize is False:
            raise ValueError("normalize should be True if scale is passed")
        if scale is None:
            # 这里之所以规范化到2*pi  因为位置编码函数的周期是[2pi, 20000pi]
            scale = 2 * math.pi  # 规范化参数 2*pi
        self.scale = scale

    def forward(self, tensor_list: NestedTensor):
        x = tensor_list.tensors   # [bs, 2048, 19, 26]  预处理后的 经过backbone 32倍下采样之后的数据  对于小图片而言多余部分用0填充
        mask = tensor_list.mask   # [bs, 19, 26]  用于记录矩阵中哪些地方是填充的(原图部分值为False,填充部分值为True)
        assert mask is not None
        not_mask = ~mask   # True的位置才是真实有效的位置

        # 考虑到图像本身是2维的 所以这里使用的是2维的正余弦位置编码
        # 这样各行/列都映射到不同的值 当然有效位置是正常值 无效位置会有重复值 但是后续计算注意力权重会忽略这部分的
        # 而且最后一个数字就是有效位置的总和,方便max规范化
        # 计算此时y方向上的坐标  [bs, 19, 26]
        y_embed = not_mask.cumsum(1, dtype=torch.float32)
        # 计算此时x方向的坐标    [bs, 19, 26]
        x_embed = not_mask.cumsum(2, dtype=torch.float32)

        # 最大值规范化 除以最大值 再乘以2*pi 最终把坐标规范化到0-2pi之间
        if self.normalize:
            eps = 1e-6
            y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale
            x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale

        dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)   # 0 1 2 .. 127
        # 2i/2i+1: 2 * (dim_t // 2)  self.temperature=10000   self.num_pos_feats = d/2
        dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)   # 分母

        pos_x = x_embed[:, :, :, None] / dim_t   # 正余弦括号里面的公式
        pos_y = y_embed[:, :, :, None] / dim_t   # 正余弦括号里面的公式
        # x方向位置编码: [bs,19,26,64][bs,19,26,64] -> [bs,19,26,64,2] -> [bs,19,26,128]
        pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)
        # y方向位置编码: [bs,19,26,64][bs,19,26,64] -> [bs,19,26,64,2] -> [bs,19,26,128]
        pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)
        # concat: [bs,19,26,128][bs,19,26,128] -> [bs,19,26,256] -> [bs,256,19,26]
        pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)

        # [bs,256,19,26]  dim=1时  前128个是y方向位置编码  后128个是x方向位置编码
        return pos

可以对照公式:

P E ( p o s , 2 i ) = sin ⁡ ( p o s / 1000 0 2 i / d   m o d   e l ) P{E_{(pos,2i)}} = \sin (pos/{10000^{2i/{d_{\bmod el}}}}) PE(pos,2i)=sin(pos/100002i/dmodel)

P E ( p o s , 2 i + 1 ) = cos ⁡ ( p o s / 1000 0 2 i / d   m o d   e l ) P{E_{(pos,2i + 1)}} = \cos (pos/{10000^{2i/{d_{\bmod el}}}}) PE(pos,2i+1)=cos(pos/100002i/dmodel)

3. Transfomer代码

先看下调用接口:

def build_transformer(args):
    return Transformer(
        d_model=args.hidden_dim,
        dropout=args.dropout,
        nhead=args.nheads,
        dim_feedforward=args.dim_feedforward,
        num_encoder_layers=args.enc_layers,
        num_decoder_layers=args.dec_layers,
        normalize_before=args.pre_norm,
        return_intermediate_dec=True,
    )

直接调用Transformer类:

class Transformer(nn.Module):

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False,
                 return_intermediate_dec=False):
        super().__init__()
        """
        d_model: 编码器里面mlp(前馈神经网络  2个linear层)的hidden dim 512
        nhead: 多头注意力头数 8
        num_encoder_layers: encoder的层数 6
        num_decoder_layers: decoder的层数 6
        dim_feedforward: 前馈神经网络的维度 2048
        dropout: 0.1
        activation: 激活函数类型 relu
        normalize_before: 是否使用前置LN
        return_intermediate_dec: 是否返回decoder中间层结果  False
        """
        # 初始化一个小encoder
        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        encoder_norm = nn.LayerNorm(d_model) if normalize_before else None
        # 创建整个Encoder层  6个encoder层堆叠
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)

        # 初始化一个小decoder
        decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        decoder_norm = nn.LayerNorm(d_model)
        # 创建整个Decoder层  6个decoder层堆叠
        self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
                                          return_intermediate=return_intermediate_dec)

        # 参数初始化
        self._reset_parameters()

        self.d_model = d_model  # 编码器里面mlp的hidden dim 512
        self.nhead = nhead      # 多头注意力头数 8

    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, mask, query_embed, pos_embed):
        """
        src: [bs,256,19,26] 图片输入backbone+1x1conv之后的特征图
        mask: [bs, 19, 26]  用于记录特征图中哪些地方是填充的(原图部分值为False,填充部分值为True)
        query_embed: [100, 256]  类似于传统目标检测里面的anchor  这里设置了100个   需要预测的目标
        pos_embed: [bs, 256, 19, 26]  位置编码
        """
        # bs  c=256  h=19  w=26
        bs, c, h, w = src.shape
        # src: [bs,256,19,26]=[bs,C,H,W] -> [494,bs,256]=[HW,bs,C]
        src = src.flatten(2).permute(2, 0, 1)
        # pos_embed: [bs, 256, 19, 26]=[bs,C,H,W] -> [494,bs,256]=[HW,bs,C]
        pos_embed = pos_embed.flatten(2).permute(2, 0, 1)
        # query_embed: [100, 256]=[num,C] -> [100,bs,256]=[num,bs,256]
        query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)
        # mask: [bs, 19, 26]=[bs,H,W] -> [bs,494]=[bs,HW]
        mask = mask.flatten(1)

        # tgt: [100, bs, 256] 需要预测的目标query embedding 和 query_embed形状相同  且全设置为0
        #                     在每层decoder层中不断的被refine,相当于一次次的被coarse-to-fine的过程
        tgt = torch.zeros_like(query_embed)
        # memory: [494, bs, 256]=[HW, bs, 256]  Encoder输出  具有全局相关性(增强后)的特征表示
        memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed)
        # [6, 100, bs, 256]
        # tgt:需要预测的目标 query embeding
        # memory: encoder的输出
        # pos: memory的位置编码
        # query_pos: tgt的位置编码
        hs = self.decoder(tgt, memory, memory_key_padding_mask=mask,
                          pos=pos_embed, query_pos=query_embed)
        # decoder输出 [6, 100, bs, 256] -> [6, bs, 100, 256]
        # encoder输出 [bs, 256, H, W]
        return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)

仔细分析这个类会发现,我们虽然暂时不了解模型的细节部分,但是模型的主体框架已经定义出来了。整个Transformer其实就是输入经过Backbone输出的特征图src(降维到256)、src_key_padding_mask(记录特征图每个位置是否是被pad的,pad的就不需要计算注意力)和位置编码pos到TransformerEncoder中,而TransformerEncoder其实是由TransformerEncoderLayer组成的;然后再输入encoder的输出、mask、位置编码和query编码到TransformerEncoder中,而TransformerEncoder是由TransformerDecoderLayer组成的。

所以,下面分为TransformerEncoder和TransformerDecoder两个模块来了解Transformer具体细节组成。

3.1 TransformerEncoder代码

这个部分就是调用_get_clones函数,复制6份TransformerEncoderLayer类,然后前向传播依次输入这6个TransformerEncoderLayer类,不断的计算特征图的自注意力,并不断的增强特征图,最终得到最强的(信息最多的)特征图output:[h*w, bs, 256]。值得注意的是,整个TransformerEncoder过程特征图的shape是不变的。

class TransformerEncoder(nn.Module):

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        # 复制num_layers=6份encoder_layer=TransformerEncoderLayer
        self.layers = _get_clones(encoder_layer, num_layers)
        # 6层TransformerEncoderLayer
        self.num_layers = num_layers
        self.norm = norm  # layer norm

    def forward(self, src,
                mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        """
        src: [h*w, bs, 256]  经过Backbone输出的特征图(降维到256)
        mask: None
        src_key_padding_mask: [h*w, bs]  记录每个特征图的每个位置是否是被pad的(True无效   False有效)
        pos: [h*w, bs, 256] 每个特征图的位置编码
        """
        output = src

        # 遍历这6层TransformerEncoderLayer
        for layer in self.layers:
            output = layer(output, src_mask=mask,
                           src_key_padding_mask=src_key_padding_mask, pos=pos)

        if self.norm is not None:
            output = self.norm(output)

        # 得到最终ENCODER的输出 [h*w, bs, 256]
        return output

def _get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

我们来看一下TransformerEncoderLayer

下图是TransformerEncoderLayer的结构:Encoder Layer = multi-head Attention + add&Norm + feed forward + add&Norm,重点在于multi-head Attention。

image-20241228205228716

class TransformerEncoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        """
        小encoder层  结构:multi-head Attention + add&Norm + feed forward + add&Norm
        d_model: mlp 前馈神经网络的dim
        nhead: 8头注意力机制
        dim_feedforward: 前馈神经网络的维度 2048
        dropout: 0.1
        activation: 激活函数类型
        normalize_before: 是否使用先LN  False
        """
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        # 这个操作是把词向量和位置编码相加操作
        return tensor if pos is None else tensor + pos

    def forward_post(self,
                     src,
                     src_mask: Optional[Tensor] = None,
                     src_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None):
        """
        src: [494, bs, 256]  backbone输入下采样32倍后 再 压缩维度到256的特征图
        src_mask: None
        src_key_padding_mask: [bs, 494]  记录哪些位置有pad True 没意义 不需要计算attention
        pos: [494, bs, 256]  位置编码
        """
        # 数据 + 位置编码  [494, bs, 256]
        # 这也是和原版encoder不同的地方,这里每个encoder的q和k都会加上位置编码  再用q和k计算相似度  再和v加权得到更具有全局相关性(增强后)的特征表示
        # 每用一层都加上位置编码  信息不断加强  最终得到的特征全局相关性最强  原版的transformer只在输入加上位置编码  作者发现这样更好
        q = k = self.with_pos_embed(src, pos)
        # multi-head attention   [494, bs, 256]
        # q 和 k = backbone输出特征图 + 位置编码
        # v = backbone输出特征图
        # 这里对query和key增加位置编码 是因为需要在图像特征中各个位置之间计算相似度/相关性 而value作为原图像的特征 和 相关性矩阵加权,
        # 从而得到各个位置结合了全局相关性(增强后)的特征表示,所以q 和 k这种计算需要+位置编码  而v代表原图像不需要加位置编码
        # nn.MultiheadAttention: 返回两个值  第一个是自注意力层的输出  第二个是自注意力权重  这里取0
        # key_padding_mask: 记录backbone生成的特征图中哪些是原始图像pad的部分 这部分是没有意义的
        #                   计算注意力会被填充为-inf,这样最终生成注意力经过softmax时输出就趋向于0,相当于忽略不计
        # attn_mask: 是在Transformer中用来“防作弊”的,即遮住当前预测位置之后的位置,忽略这些位置,不计算与其相关的注意力权重
        #            而在encoder中通常为None 不适用  decoder中才使用
        src2 = self.self_attn(q, k, value=src, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        # add + norm + feed forward + add + norm
        src = src + self.dropout1(src2)
        src = self.norm1(src)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src

    def forward_pre(self, src,
                    src_mask: Optional[Tensor] = None,
                    src_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None):
        src2 = self.norm1(src)
        q = k = self.with_pos_embed(src2, pos)
        src2 = self.self_attn(q, k, value=src2, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src2 = self.norm2(src)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src2))))
        src = src + self.dropout2(src2)
        return src

    def forward(self, src,
                src_mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        if self.normalize_before:  # False
            return self.forward_pre(src, src_mask, src_key_padding_mask, pos)
        return self.forward_post(src, src_mask, src_key_padding_mask, pos)  # 默认执行

有几个很关键的点(和原始transformer encoder不同的地方):

  1. 为什么每个encoder的q和k都是+位置编码的?

如果学过transformer的知道,通常都是在transformer的输入加上位置编码,而每个encoder的qkv都是相等的,都是不加位置编码的。而这里先将q和k都会加上位置编码,再用q和k计算相似度,最后和v加权得到更具有全局相关性(增强后)的特征表示。每一层都加上位置编码,每一层全局信息不断加强,最终可以得到最强的全局特征。

  1. 为什么q和k+位置编码,而v不需要加上位置编码?

因为q和k是用来计算图像特征中各个位置之间计算相似度/相关性的,加上位置编码后计算出来的全局特征相关性更强,而v代表原图像,所以并不需要加位置编码。

3.2 TransformerDecoder代码

Decoder结构和Encoder的结构类似,也是用_get_clones复制6份TransformerDecoderLayer类,然后前向传播依次输入这6个TransformerDecoderLayer类,不过不同的,Decoder需要输入这6个TransformerDecoderLayer的输出,后面这6个层的输出会一起参与损失计算。
class TransformerDecoder(nn.Module):

def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False):
    super().__init__()
    # 复制num_layers=decoder_layer=TransformerDecoderLayer
    self.layers = _get_clones(decoder_layer, num_layers)
    self.num_layers = num_layers   # 6
    self.norm = norm               # LN
    # 是否返回中间层 默认True  因为DETR默认6个Decoder都会返回结果,一起加入损失计算的
    # 每一层Decoder都是逐层解析,逐层加强的,所以前面层的解析效果对后面层的解析是有意义的,所以作者把前面5层的输出也加入损失计算
    self.return_intermediate = return_intermediate

def forward(self, tgt, memory,
            tgt_mask: Optional[Tensor] = None,
            memory_mask: Optional[Tensor] = None,
            tgt_key_padding_mask: Optional[Tensor] = None,
            memory_key_padding_mask: Optional[Tensor] = None,
            pos: Optional[Tensor] = None,
            query_pos: Optional[Tensor] = None):
    """
    tgt: [100, bs, 256] 需要预测的目标query embedding 和 query_embed形状相同  且全设置为0
                        在每层decoder层中不断的被refine,相当于一次次的被coarse-to-fine的过程
    memory: [h*w, bs, 256]  Encoder输出  具有全局相关性(增强后)的特征表示
    tgt_mask: None
    tgt_key_padding_mask: None
    memory_key_padding_mask: [bs, h*w]  记录Encoder输出特征图的每个位置是否是被pad的(True无效   False有效)
    pos: [h*w, bs, 256]                 特征图的位置编码
    query_pos: [100, bs, 256]    query embedding的位置编码  随机初始化的
    """
    output = tgt   # 初始化query embedding  全是0

    intermediate = []  # 用于存放6层decoder的输出结果

    # 遍历6层decoder
    for layer in self.layers:
        output = layer(output, memory, tgt_mask=tgt_mask,
                       memory_mask=memory_mask,
                       tgt_key_padding_mask=tgt_key_padding_mask,
                       memory_key_padding_mask=memory_key_padding_mask,
                       pos=pos, query_pos=query_pos)
        # 6层结果全部加入intermediate
        if self.return_intermediate:
            intermediate.append(self.norm(output))

    if self.norm is not None:
        output = self.norm(output)
        if self.return_intermediate:
            intermediate.pop()
            intermediate.append(output)
    # 默认执行这里
    # 最后把  6x[100,bs,256] -> [6(6层decoder输出),100,bs,256]
    if self.return_intermediate:
        return torch.stack(intermediate)

    return output.unsqueeze(0)   # 不执行

我们来看一下TransformerDecoderLayer

下图是TransformerDecoderLayer的结构:decoder layer = Masked Multi-Head Attention + Add&Norm + Multi-Head Attention + add&Norm + feed forward + add&Norm。关键点在于两个Attention层,搞懂这两层的原理、区别是理解Decoder的关键。
image-20241228205702554

class TransformerDecoderLayer(nn.Module):

def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
             activation="relu", normalize_before=False):
    super().__init__()
    self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
    self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
    # Implementation of Feedforward model
    self.linear1 = nn.Linear(d_model, dim_feedforward)
    self.dropout = nn.Dropout(dropout)
    self.linear2 = nn.Linear(dim_feedforward, d_model)

    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.norm3 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.dropout3 = nn.Dropout(dropout)

    self.activation = _get_activation_fn(activation)
    self.normalize_before = normalize_before

def with_pos_embed(self, tensor, pos: Optional[Tensor]):
    return tensor if pos is None else tensor + pos

def forward_post(self, tgt, memory,
                 tgt_mask: Optional[Tensor] = None,
                 memory_mask: Optional[Tensor] = None,
                 tgt_key_padding_mask: Optional[Tensor] = None,
                 memory_key_padding_mask: Optional[Tensor] = None,
                 pos: Optional[Tensor] = None,
                 query_pos: Optional[Tensor] = None):
    """
    tgt: 需要预测的目标 query embedding  负责预测物体  用于建模图像当中的物体信息  在每层decoder层中不断的被refine
         [100, bs, 256]  和 query_embed形状相同  且全设置为0
    memory: [h*w, bs, 256]  Encoder输出  具有全局相关性(增强后)的特征表示
    tgt_mask: None
    memory_mask: None
    tgt_key_padding_mask: None
    memory_key_padding_mask: [bs, h*w]  记录Encoder输出特征图的每个位置是否是被pad的(True无效   False有效)
    pos: [h*w, bs, 256]  encoder输出特征图的位置编码
    query_pos: [100, bs, 256]  query embedding/tgt的位置编码  负责建模物体与物体之间的位置关系  随机初始化的
    tgt_mask、memory_mask、tgt_key_padding_mask是防止作弊的 这里都没有使用
    """
    # 第一个self-attention的目的:找到图像中物体的信息 -> tgt
    # 第一个多头自注意力层:输入qkv都和Encoder无关  都来自于tgt/query embedding
    # 通过第一个self-attention  可以不断建模物体与物体之间的关系  可以知道图像当中哪些位置会存在物体  物体信息->tgt
    # query embedding  +  query_pos
    q = k = self.with_pos_embed(tgt, query_pos)
    # masked multi-head self-attention  计算query embedding的自注意力
    tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask,
                          key_padding_mask=tgt_key_padding_mask)[0]

    # add + norm
    tgt = tgt + self.dropout1(tgt2)
    tgt = self.norm1(tgt)

    # 第二个self-attention的目的:不断增强encoder的输出特征,将物体的信息不断加入encoder的输出特征中去,更好地表征了图像中的各个物体
    # 第二个多头注意力层,也叫Encoder-Decoder self attention:key和value来自Encoder层输出   Query来自Decoder层输入
    # 第二个self-attention 可以建模图像 与 物体之间的关系
    # 根据上一步得到的tgt作为query 不断的去encoder输出的特征图中去问(q和k计算相似度)  问图像当中的物体在哪里呢?
    # 问完之后再将物体的位置信息融合encoder输出的特征图中(和v做运算)  这样我得到的v的特征就有 encoder增强后特征信息 + 物体的位置信息
    # query = query embedding  +  query_pos
    # key = encoder输出特征 + 特征位置编码
    # value = encoder输出特征
    tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos),
                               key=self.with_pos_embed(memory, pos),
                               value=memory, attn_mask=memory_mask,
                               key_padding_mask=memory_key_padding_mask)[0]
    # ada + norm + Feed Forward + add + norm
    tgt = tgt + self.dropout2(tgt2)
    tgt = self.norm2(tgt)
    tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
    tgt = tgt + self.dropout3(tgt2)
    tgt = self.norm3(tgt)

    # [100, bs, 256]
    # decoder的输出是第一个self-attention输出特征 + 第二个self-attention输出特征
    # 最终的特征:知道图像中物体与物体之间的关系 + encoder增强后的图像特征 + 图像与物体之间的关系
    return tgt

def forward_pre(self, tgt, memory,
                tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None,
                query_pos: Optional[Tensor] = None):
    tgt2 = self.norm1(tgt)
    q = k = self.with_pos_embed(tgt2, query_pos)
    tgt2 = self.self_attn(q, k, value=tgt2, attn_mask=tgt_mask,
                          key_padding_mask=tgt_key_padding_mask)[0]
    tgt = tgt + self.dropout1(tgt2)
    tgt2 = self.norm2(tgt)
    tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt2, query_pos),
                               key=self.with_pos_embed(memory, pos),
                               value=memory, attn_mask=memory_mask,
                               key_padding_mask=memory_key_padding_mask)[0]
    tgt = tgt + self.dropout2(tgt2)
    tgt2 = self.norm3(tgt)
    tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt2))))
    tgt = tgt + self.dropout3(tgt2)
    return tgt

def forward(self, tgt, memory,
            tgt_mask: Optional[Tensor] = None,
            memory_mask: Optional[Tensor] = None,
            tgt_key_padding_mask: Optional[Tensor] = None,
            memory_key_padding_mask: Optional[Tensor] = None,
            pos: Optional[Tensor] = None,
            query_pos: Optional[Tensor] = None):
    if self.normalize_before:
        return self.forward_pre(tgt, memory, tgt_mask, memory_mask,
                                tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)
    return self.forward_post(tgt, memory, tgt_mask, memory_mask,
                             tgt_key_padding_mask, memory_key_padding_mask, pos, query_pos)

Decoder的主要作用:

  1. 从Encoder的最终输出,我们得到了增强版的图像特征memory,以及特征的位置信息pos。
  2. 自定义了图像当中的物体信息tgt,初始化为全0,以及图像中的物体位置信息query_pos,随机初始化。
  3. 第一个self-attention:qk=tgt+query_pos,v=tgt,计算图像中物体与物体的相关性,负责建模图像中的物体信息,最终得到的tgt1,是增强版的物体信息,这些位置信息包含了物体与物体之间的位置关系。
  4. 第二个self-attention:q=tgt+qyery_pos,k=memory+pos,v=memory,以物体的信息tgt作为query,去图像特征memory中去问(计算他们的相关性),问图像中物体在哪里呢?问完之后再将物体的位置信息融入到图像特征中去(v),整个过程是负责建模图像特征与物体特征之间的关系,最后得到的是更强的图像特征tgt2,包括encoder输出的增强版的图像特征+物体的位置特征。
  5. 最后把tgt1 + tgt2 = Encoder输出的增强版图像特征 + 物体信息 + 物体位置信息,作为decoder的输出。

4. 损失计算代码

首先在detr.py中会先定义好损失函数:

criterion = SetCriterion(num_classes, matcher=matcher, weight_dict=weight_dict,eos_coef=args.eos_coef, losses=losses)
criterion.to(device)

然后在engine.py的train_one_epoch中前向推理结束后调用criterion函数,计算损失:

# 前向传播
outputs = model(samples)
# 计算损失  loss_dict: 'loss_ce' + 'loss_bbox' + 'loss_giou'    用于log日志: 'class_error' + 'cardinality_error'
loss_dict = criterion(outputs, targets)
# 权重系数 {'loss_ce': 1, 'loss_bbox': 5, 'loss_giou': 2}
weight_dict = criterion.weight_dict   
# 总损失 = 回归损失:loss_bbox(L1)+loss_bbox  +   分类损失:loss_ce
losses = sum(loss_dict[k] * weight_dict[k] for k in loss_dict.keys() if k in weight_dict)

SetCriterion类的内容为:

class SetCriterion(nn.Module):
    """ This class computes the loss for DETR.
    The process happens in two steps:
        1) we compute hungarian assignment between ground truth boxes and the outputs of the model
        2) we supervise each pair of matched ground-truth / prediction (supervise class and box)
    """
    def __init__(self, num_classes, matcher, weight_dict, eos_coef, losses):
        """ Create the criterion.
        Parameters:
            num_classes: number of object categories, omitting the special no-object category
            matcher: module able to compute a matching between targets and proposals
            weight_dict: dict containing as key the names of the losses and as values their relative weight.
            eos_coef: relative classification weight applied to the no-object category
            losses: list of all the losses to be applied. See get_loss for list of available losses.
        """
        super().__init__()
        self.num_classes = num_classes     # 数据集类别数
        self.matcher = matcher             # HungarianMatcher()  匈牙利算法 二分图匹配
        self.weight_dict = weight_dict     # dict: 18  3x6  6个decoder的损失权重   6*(loss_ce+loss_giou+loss_bbox)
        self.eos_coef = eos_coef           # 0.1
        self.losses = losses               # list: 3  ['labels', 'boxes', 'cardinality']
        empty_weight = torch.ones(self.num_classes + 1)
        empty_weight[-1] = self.eos_coef   # tensro: 92   前91=1  92=eos_coef=0.1
        self.register_buffer('empty_weight', empty_weight)
        
    def forward(self, outputs, targets):
        """ This performs the loss computation.
        Parameters:
             outputs: dict of tensors, see the output specification of the model for the format
                      dict: 'pred_logits'=Tensor[bs, 100, 92个class]  'pred_boxes'=Tensor[bs, 100, 4]  最后一个decoder层输出
                             'aux_output'={list:5}  0-4  每个都是dict:2 pred_logits+pred_boxes 表示5个decoder前面层的输出
             targets: list of dicts, such that len(targets) == batch_size.   list: bs
                      每张图片包含以下信息:'boxes'、'labels'、'image_id'、'area'、'iscrowd'、'orig_size'、'size'
                      The expected keys in each dict depends on the losses applied, see each loss' doc
        """
        # dict: 2   最后一个decoder层输出  pred_logits[bs, 100, 92个class] + pred_boxes[bs, 100, 4]
        outputs_without_aux = {k: v for k, v in outputs.items() if k != 'aux_outputs'}

        # 匈牙利算法  解决二分图匹配问题  从100个预测框中找到和N个gt框一一对应的预测框  其他的100-N个都变为背景
        # Retrieve the matching between the outputs of the last layer and the targets  list:1
        # tuple: 2    0=Tensor3=Tensor[5, 35, 63]  匹配到的3个预测框  其他的97个预测框都是背景
        #             1=Tensor3=Tensor[1, 0, 2]    对应的三个gt框
        indices = self.matcher(outputs_without_aux, targets)

        # Compute the average number of target boxes accross all nodes, for normalization purposes
        num_boxes = sum(len(t["labels"]) for t in targets)   # int 统计这整个batch的所有图片的gt总个数  3
        num_boxes = torch.as_tensor([num_boxes], dtype=torch.float, device=next(iter(outputs.values())).device)
        if is_dist_avail_and_initialized():
            torch.distributed.all_reduce(num_boxes)
        num_boxes = torch.clamp(num_boxes / get_world_size(), min=1).item()   # 3.0

        # 计算最后层decoder损失  Compute all the requested losses
        losses = {}
        for loss in self.losses:
            losses.update(self.get_loss(loss, outputs, targets, indices, num_boxes))

        # 计算前面5层decoder损失  累加到一起  得到最终的losses
        # In case of auxiliary losses, we repeat this process with the output of each intermediate layer.
        if 'aux_outputs' in outputs:
            for i, aux_outputs in enumerate(outputs['aux_outputs']):
                indices = self.matcher(aux_outputs, targets)   # 同样匈牙利算法匹配
                for loss in self.losses:   # 计算各个loss
                    if loss == 'masks':
                        # Intermediate masks losses are too costly to compute, we ignore them.
                        continue
                    kwargs = {}
                    if loss == 'labels':
                        # Logging is enabled only for the last layer
                        kwargs = {'log': False}
                    l_dict = self.get_loss(loss, aux_outputs, targets, indices, num_boxes, **kwargs)
                    l_dict = {k + f'_{i}': v for k, v in l_dict.items()}
                    losses.update(l_dict)
        # 参加权重更新的损失:losses: 'loss_ce' + 'loss_bbox' + 'loss_giou'    用于log日志: 'class_error' + 'cardinality_error'
        return losses

整个函数主要在做两件事:

  1. 调用self.matcher函数,从100个预测框中匹配出N个(gt个数)真正的预测框,匹配出每个gt框对应的预测框。
  2. 调用self.get_loss函数,计算各个损失。
4.1 self.matcher函数

self.matcher函数就是二分图匹配的过程(匈牙利算法),DETR是在models/matcher.py中的HungarianMatcher类实现了匈牙利匹配算法:

class HungarianMatcher(nn.Module):
    """This class computes an assignment between the targets and the predictions of the network

    For efficiency reasons, the targets don't include the no_object. Because of this, in general,
    there are more predictions than targets. In this case, we do a 1-to-1 matching of the best predictions,
    while the others are un-matched (and thus treated as non-objects).
    """

    def __init__(self, cost_class: float = 1, cost_bbox: float = 1, cost_giou: float = 1):
        """Creates the matcher

        Params:
            cost_class: This is the relative weight of the classification error in the matching cost
            cost_bbox: This is the relative weight of the L1 error of the bounding box coordinates in the matching cost
            cost_giou: This is the relative weight of the giou loss of the bounding box in the matching cost
        """
        super().__init__()
        self.cost_class = cost_class
        self.cost_bbox = cost_bbox
        self.cost_giou = cost_giou
        assert cost_class != 0 or cost_bbox != 0 or cost_giou != 0, "all costs cant be 0"
    
    # 不需要更新梯度  只是一种匹配方式
    @torch.no_grad()
    def forward(self, outputs, targets):
        """ Performs the matching

        Params:
            outputs: This is a dict that contains at least these entries:
                 "pred_logits": Tensor of dim [batch_size, num_queries, num_classes]=[bs,100,92] with the classification logits
                 "pred_boxes": Tensor of dim [batch_size, num_queries, 4]=[bs,100,4] with the predicted box coordinates

            targets: list:bs This is a list of targets (len(targets) = batch_size), where each target is a dict containing:
                 "labels": Tensor of dim [num_target_boxes]=[3] (where num_target_boxes is the number of ground-truth
                           objects in the target) containing the class labels
                 "boxes": Tensor of dim [num_target_boxes, 4] containing the target box coordinates

        Returns:
            A list of size batch_size, containing tuples of (index_i, index_j) where:
                - index_i is the indices of the selected predictions (in order)
                - index_j is the indices of the corresponding selected targets (in order)
            For each batch element, it holds:
                len(index_i) = len(index_j) = min(num_queries, num_target_boxes)
        """
        # batch_size  100
        bs, num_queries = outputs["pred_logits"].shape[:2]

        # We flatten to compute the cost matrices in a batch
        # [2,100,92] -> [200, 92] -> [200, 92]概率
        out_prob = outputs["pred_logits"].flatten(0, 1).softmax(-1)  # [batch_size * num_queries, num_classes]
        # [2,100,4] -> [200, 4]
        out_bbox = outputs["pred_boxes"].flatten(0, 1)  # [batch_size * num_queries, 4]

        # Also concat the target labels and boxes
        # [3]  idx = 32, 1, 85  concat all labels
        tgt_ids = torch.cat([v["labels"] for v in targets])
        # [3, 4]  concat all box
        tgt_bbox = torch.cat([v["boxes"] for v in targets])

        # 计算损失   分类 + L1 box + GIOU box
        # Compute the classification cost. Contrary to the loss, we don't use the NLL,
        # but approximate it in 1 - proba[target class].
        # The 1 is a constant that doesn't change the matching, it can be ommitted.
        cost_class = -out_prob[:, tgt_ids]

        # Compute the L1 cost between boxes
        cost_bbox = torch.cdist(out_bbox, tgt_bbox, p=1)

        # Compute the giou cost betwen boxes
        cost_giou = -generalized_box_iou(box_cxcywh_to_xyxy(out_bbox), box_cxcywh_to_xyxy(tgt_bbox))

        # Final cost matrix   [100, 3]  bs*100个预测框分别和3个gt框的损失矩阵
        C = self.cost_bbox * cost_bbox + self.cost_class * cost_class + self.cost_giou * cost_giou
        C = C.view(bs, num_queries, -1).cpu()  # [bs, 100, 3]

        sizes = [len(v["boxes"]) for v in targets]   # gt个数 3

        # 匈牙利算法进行二分图匹配  从100个预测框中挑选出最终的3个预测框 分别和gt计算损失  这个组合的总损失是最小的
        # 0: [3]  5, 35, 63   匹配到的gt个预测框idx
        # 1: [3]  1, 0, 2     对应的gt idx
        indices = [linear_sum_assignment(c[i]) for i, c in enumerate(C.split(sizes, -1))]
        
        # list: bs  返回bs张图片的匹配结果
        # 每张图片都是一个tuple:2
        # 0 = Tensor[gt_num,]  匹配到的正样本idx       1 = Tensor[gt_num,]  gt的idx
        return [(torch.as_tensor(i, dtype=torch.int64), torch.as_tensor(j, dtype=torch.int64)) for i, j in indices]

其实就是先计算每个预测框(100个)和每个gt框的总损失,形成损失矩阵C,然后调用scipy.optimize.linear_sum_assignment写好的匈牙利算法,匹配的原则就是 ”loss总和“ 最小(这里的loss并不是真正的loss,这里只是一种度量方式,和loss的计算方式是不一样的),得到每个gt对应的唯一负责的预测框,其他的预测框会自动归为背景。

linear_sum_assignment,输入一个二分图的度量矩阵(cost 矩阵),计算这个二分图的度量矩阵的最小权重分配方式,返回的是匹配方案对应的矩阵行索引(预测框idx)和列索引(gt框idx)。

4.2 self.get_loss函数

self.get_loss是SetCriterion类定义的一个函数:

    def get_loss(self, loss, outputs, targets, indices, num_boxes, **kwargs):
        loss_map = {
            'labels': self.loss_labels,
            'cardinality': self.loss_cardinality,
            'boxes': self.loss_boxes,
            'masks': self.loss_masks
        }
        assert loss in loss_map, f'do you really want to compute {loss} loss?'
        return loss_map[loss](outputs, targets, indices, num_boxes, **kwargs)

同时会调用分类损失(self.loss_labels)、回归损失(self.boxes)和cardinality损失。不过cardinality损失只用于log,并不参与梯度更新,所以这里不展开叙述,这里重点介绍分类损失和回归损失。

分类损失self.loss_labels:

    def loss_labels(self, outputs, targets, indices, num_boxes, log=True):
        """Classification loss (NLL)
        targets dicts must contain the key "labels" containing a tensor of dim [nb_target_boxes]
        outputs:'pred_logits'=[bs, 100, 92] 'pred_boxes'=[bs, 100, 4] 'aux_outputs'=5*([bs, 100, 92]+[bs, 100, 4])
        targets:'boxes'=[3,4] labels=[3] ...
        indices: [3] 如:5,35,63  匹配好的3个预测框idx
        num_boxes:当前batch的所有gt个数
        """
        assert 'pred_logits' in outputs
        src_logits = outputs['pred_logits']  # 分类:[bs, 100, 92类别]

        # idx tuple:2  0=[num_all_gt] 记录每个gt属于哪张图片  1=[num_all_gt] 记录每个匹配到的预测框的index
        idx = self._get_src_permutation_idx(indices)
        target_classes_o = torch.cat([t["labels"][J] for t, (_, J) in zip(targets, indices)])
        target_classes = torch.full(src_logits.shape[:2], self.num_classes,
                                    dtype=torch.int64, device=src_logits.device)
        # 正样本+负样本  上面匹配到的预测框作为正样本 正常的idx  而100个中没有匹配到的预测框作为负样本(idx=91 背景类)
        target_classes[idx] = target_classes_o

        # 分类损失 = 正样本 + 负样本
        loss_ce = F.cross_entropy(src_logits.transpose(1, 2), target_classes, self.empty_weight)
        losses = {'loss_ce': loss_ce}

        # 日志 记录Top-1精度
        if log:
            # TODO this should probably be a separate loss, not hacked in this one here
            losses['class_error'] = 100 - accuracy(src_logits[idx], target_classes_o)[0]

        # losses: 'loss_ce': 分类损失
        #         'class_error':Top-1精度 即预测概率最大的那个类别与对应被分配的GT类别是否一致  这部分仅用于日志显示 并不参与模型训练
        return losses

    def _get_src_permutation_idx(self, indices):
        # permute predictions following indices
        # [num_all_gt]  记录每个gt都是来自哪张图片的 idx
        batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)])
        # 记录匹配到的预测框的idx
        src_idx = torch.cat([src for (src, _) in indices])
        return batch_idx, src_idx

上述代码总结为:

  1. 分类损失 = 交叉熵损失。
  2. 正样本+负样本=100,正样本个数=GT个数,负样本个数=100-GT个数。
  3. 92个类别,idx=91表示背景类别。
  4. 注意这里有一个_get_src_permutation_id函数,主要是讲预测框拉平,原本是有batch这个维度的,现在拉平到一维,方便后续计算损失。
  5. 这里还会计算一个class_error:Top-1精度,用于日志显示。

回归损失self.boxes:

    def loss_boxes(self, outputs, targets, indices, num_boxes):
        """Compute the losses related to the bounding boxes, the L1 regression loss and the GIoU loss
           targets dicts must contain the key "boxes" containing a tensor of dim [nb_target_boxes, 4]
           The target boxes are expected in format (center_x, center_y, w, h), normalized by the image size.
        outputs:'pred_logits'=[bs, 100, 92] 'pred_boxes'=[bs, 100, 4] 'aux_outputs'=5*([bs, 100, 92]+[bs, 100, 4])
        targets:'boxes'=[3,4] labels=[3] ...
        indices: [3] 如:5,35,63  匹配好的3个预测框idx
        num_boxes:当前batch的所有gt个数
        """
        assert 'pred_boxes' in outputs
        # idx tuple:2  0=[num_all_gt] 记录每个gt属于哪张图片  1=[num_all_gt] 记录每个匹配到的预测框的index
        idx = self._get_src_permutation_idx(indices)

        # [all_gt_num, 4]  这个batch的所有正样本的预测框坐标
        src_boxes = outputs['pred_boxes'][idx]
        # [all_gt_num, 4]  这个batch的所有gt框坐标
        target_boxes = torch.cat([t['boxes'][i] for t, (_, i) in zip(targets, indices)], dim=0)

        # 计算L1损失
        loss_bbox = F.l1_loss(src_boxes, target_boxes, reduction='none')

        losses = {}
        losses['loss_bbox'] = loss_bbox.sum() / num_boxes

        # 计算GIOU损失
        loss_giou = 1 - torch.diag(box_ops.generalized_box_iou(
            box_ops.box_cxcywh_to_xyxy(src_boxes),
            box_ops.box_cxcywh_to_xyxy(target_boxes)))
        losses['loss_giou'] = loss_giou.sum() / num_boxes

        # 'loss_bbox': L1回归损失   'loss_giou': giou回归损失  
        return losses

上述代码可总结为:

  1. 回归损失:只计算所有正样本的回归损失。
  2. 回归损失 = L1 Loss + GIOU Loss。

至此介绍完了DETR代码中的大部分内容了。

总结

通过对DETR代码的解读,进一步理解了其创新性和实现细节,代码详细展示了三部分,第一步部分是backbone,包含CNN特征和位置编码Positional Encoding的实现;第二部分是Transformer的实现,主要学习DETR中TransformerEncoderLayer和TransformerDecoderLayer的实现,第三部分是损失函数的实现,学习率匈牙利算法和二分图匹配(self.matcher)的内容。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐