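Vision-and-Language Navigation from Beginner to Expert (Part 3): Core Model Architectures Explained
This is the third article in the "Vision-and-Language Navigation from Beginner to Expert" series. It walks through the core model architectures and key techniques of VLN.
Table of Contents
- 1. Overall VLN Model Framework
- 2. Encoder Modules
- 3. Cross-Modal Fusion
- 4. Action Decoding and Decision Making
- 5. Classic Models in Detail
- 6. Spatial Representations and Map-Based Methods
- 7. Pretraining Strategies
- 8. Multi-Task Learning
- 9. PyTorch Implementation
- 10. Mathematical Foundations in Depth
- 11. Training Tips and Practical Experience
- 12. Ablation Studies and Analysis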
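1. Overall VLN Model Framework
1.1 Basic Architecture
VLN Agent architecture
| Module | Role |
|---|---|
| Language Encoder | Encodes the natural-language instruction |
| Visual Encoder | Encodes the visual observation |
| Cross-Modal Fusion | Fuses language and visual features |
| Action Decoder | Decodes the next navigation action (forward / turn left / turn right / stop) |
1.2 Navigation Loop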
# Basic VLN navigation loop
MAX_STEPS = 30  # assumed step budget; tune it for your dataset

def navigate(agent, instruction, env):
    """Main VLN navigation loop."""
    # 1. Encode the instruction (only once per episode)
    lang_features = agent.encode_language(instruction)
    # 2. Initialize the recurrent state
    hidden_state = agent.init_state()
    done = False
    trajectory = []
    while not done:
        # 3. Get the current visual observation
        observation = env.get_observation()
        # 4. Encode the visual features
        visual_features = agent.encode_visual(observation)
        # 5. Cross-modal fusion
        fused_features = agent.fuse(lang_features, visual_features, hidden_state)
        # 6. Predict the action
        action, hidden_state = agent.decode_action(fused_features)
        # 7. Execute the action
        env.step(action)
        trajectory.append(action)
        # 8. Finish on STOP or when the step budget is used up
        if action == 'STOP' or len(trajectory) >= MAX_STEPS:
            done = True
    return trajectory
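To watch the loop above run end to end without a simulator, here is a minimal sketch with two hypothetical stand-ins, `CorridorEnv` and `ScriptedAgent`. Neither class comes from the article or from any VLN library; they only mimic the method names used in the loop. One difference is that `decode_action` here receives the hidden state explicitly.

```python
MAX_STEPS = 10  # assumed step budget for this toy example


class CorridorEnv:
    """Hypothetical 1-D corridor: the agent starts at cell 0, the goal is at `goal`."""

    def __init__(self, goal=3):
        self.position = 0
        self.goal = goal

    def get_observation(self):
        # The "observation" is simply the remaining distance to the goal.
        return self.goal - self.position

    def step(self, action):
        if action == 'FORWARD':
            self.position += 1


class ScriptedAgent:
    """Hypothetical agent exposing the interface used by the navigation loop."""

    def encode_language(self, instruction):
        return instruction.lower().split()

    def init_state(self):
        return 0  # number of decoding steps taken so far

    def encode_visual(self, observation):
        return observation

    def fuse(self, lang_features, visual_features, hidden_state):
        return visual_features

    def decode_action(self, fused_features, hidden_state):
        action = 'STOP' if fused_features <= 0 else 'FORWARD'
        return action, hidden_state + 1


def navigate(agent, instruction, env):
    lang = agent.encode_language(instruction)   # encoded once
    hidden = agent.init_state()
    trajectory = []
    while True:
        obs = env.get_observation()
        fused = agent.fuse(lang, agent.encode_visual(obs), hidden)
        action, hidden = agent.decode_action(fused, hidden)
        env.step(action)
        trajectory.append(action)
        if action == 'STOP' or len(trajectory) >= MAX_STEPS:
            return trajectory


trajectory = navigate(ScriptedAgent(), "Walk forward to the end", CorridorEnv(goal=3))
print(trajectory)
```

A real agent replaces each scripted method with a learned module, but the control flow stays the same.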
2. Encoder Modules
2.1 Language Encoder
LSTM encoder (classic approach)
import torch
import torch.nn as nn
class LSTMLanguageEncoder(nn.Module):
"""基于LSTM的语言编码器"""
def __init__(self, vocab_size, embed_dim=256, hidden_dim=512,
num_layers=2, dropout=0.5):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
self.lstm = nn.LSTM(
embed_dim,
hidden_dim // 2, # 双向LSTM
num_layers=num_layers,
batch_first=True,
bidirectional=True,
dropout=dropout
)
self.dropout = nn.Dropout(dropout)
def forward(self, input_ids, lengths):
"""
Args:
input_ids: [batch, seq_len]
lengths: [batch] 每个序列的实际长度
Returns:
outputs: [batch, seq_len, hidden_dim] 每个token的表示
final: [batch, hidden_dim] 句子级别表示
"""
# 词嵌入
embeds = self.dropout(self.embedding(input_ids))
# Pack序列
packed = nn.utils.rnn.pack_padded_sequence(
embeds, lengths.cpu(), batch_first=True, enforce_sorted=False
)
# LSTM编码
outputs, (h_n, c_n) = self.lstm(packed)
# Unpack
outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
# 拼接双向最后隐藏状态
final = torch.cat([h_n[-2], h_n[-1]], dim=-1)
return outputs, final
BERT encoder (modern approach)
from transformers import BertModel, BertTokenizer
class BERTLanguageEncoder(nn.Module):
"""基于BERT的语言编码器"""
def __init__(self, bert_model='bert-base-uncased', finetune=True):
super().__init__()
self.bert = BertModel.from_pretrained(bert_model)
self.hidden_dim = self.bert.config.hidden_size # 768
if not finetune:
for param in self.bert.parameters():
param.requires_grad = False
def forward(self, input_ids, attention_mask):
"""
Args:
input_ids: [batch, seq_len]
attention_mask: [batch, seq_len]
Returns:
token_features: [batch, seq_len, 768]
sentence_feature: [batch, 768]
"""
outputs = self.bert(
input_ids=input_ids,
attention_mask=attention_mask,
return_dict=True
)
token_features = outputs.last_hidden_state # [batch, seq_len, 768]
sentence_feature = outputs.pooler_output # [batch, 768]
return token_features, sentence_feature
2.2 Visual Encoder
ResNet feature extraction
import torchvision.models as models
from torchvision.models import ResNet152_Weights
class ResNetVisualEncoder(nn.Module):
"""基于ResNet的视觉编码器"""
def __init__(self, output_dim=512, pretrained=True):
super().__init__()
# 加载预训练ResNet(使用新版API)
weights = ResNet152_Weights.IMAGENET1K_V2 if pretrained else None
resnet = models.resnet152(weights=weights)
# 移除最后的全连接层
self.backbone = nn.Sequential(*list(resnet.children())[:-1])
# 投影层
self.proj = nn.Linear(2048, output_dim)
def forward(self, images):
"""
Args:
images: [batch, num_views, 3, H, W] 全景图像
Returns:
features: [batch, num_views, output_dim]
"""
batch_size, num_views = images.shape[:2]
# 合并batch和views维度
images = images.view(-1, *images.shape[2:])
# 提取特征
features = self.backbone(images) # [batch*views, 2048, 1, 1]
features = features.squeeze(-1).squeeze(-1) # [batch*views, 2048]
# 投影
features = self.proj(features) # [batch*views, output_dim]
# 恢复维度
features = features.view(batch_size, num_views, -1)
return features
ViT visual encoder
from transformers import ViTModel
class ViTVisualEncoder(nn.Module):
"""基于Vision Transformer的视觉编码器"""
def __init__(self, model_name='google/vit-base-patch16-224'):
super().__init__()
self.vit = ViTModel.from_pretrained(model_name)
self.hidden_dim = self.vit.config.hidden_size
def forward(self, images):
"""
Args:
images: [batch, num_views, 3, 224, 224]
Returns:
features: [batch, num_views, hidden_dim]
"""
batch_size, num_views = images.shape[:2]
images = images.view(-1, *images.shape[2:])
outputs = self.vit(pixel_values=images)
# 使用CLS token作为图像表示
features = outputs.last_hidden_state[:, 0] # [batch*views, hidden_dim]
features = features.view(batch_size, num_views, -1)
return features
Panorama Representation
graph TB
subgraph panorama["🌐 Panorama view layout (36 views)"]
direction TB
P["📷 Panorama"] --> E1["+30° elevation (looking up)"]
P --> E2["0° elevation (horizontal)"]
P --> E3["-30° elevation (looking down)"]
E1 --> V1["v1 ~ v12"]
E2 --> V2["v13 ~ v24"]
E3 --> V3["v25 ~ v36"]
end
subgraph heading["🧭 Heading distribution (one every 30°)"]
direction LR
H0["0°"] --- H30["30°"] --- H60["60°"] --- H90["90°"] --- H120["..."] --- H330["330°"]
end
V1 -.->|"12 headings"| heading
V2 -.->|"12 headings"| heading
V3 -.->|"12 headings"| heading
View count: 3 elevations × 12 headings = 36 discrete views
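The 3 × 12 layout can be turned into a small index-to-angle helper. The sketch below assumes the row order shown in the diagram (looking up first, then horizontal, then looking down) and 0-based indices; both are assumptions, and simulators or precomputed feature files may order the elevation rows differently, so check your own feature-extraction code. The 4-d `[sin, cos, sin, cos]` orientation feature matches the angle input used by the VLNBERT code later in this article.

```python
import math

ELEVATIONS_DEG = (30, 0, -30)  # row order taken from the diagram (assumption)
HEADINGS_PER_ROW = 12          # one view every 30 degrees


def view_angles(view_index):
    """Map a 0-based view index (0..35) to (heading_deg, elevation_deg)."""
    if not 0 <= view_index < len(ELEVATIONS_DEG) * HEADINGS_PER_ROW:
        raise ValueError("view_index must be in [0, 35]")
    row, col = divmod(view_index, HEADINGS_PER_ROW)
    return col * 360 // HEADINGS_PER_ROW, ELEVATIONS_DEG[row]


def angle_feature(heading_deg, elevation_deg):
    """4-d orientation feature [sin(h), cos(h), sin(e), cos(e)]."""
    h, e = math.radians(heading_deg), math.radians(elevation_deg)
    return [math.sin(h), math.cos(h), math.sin(e), math.cos(e)]


angles = [view_angles(i) for i in range(36)]
print(angles[0], angles[13], angles[35])
```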
3. Cross-Modal Fusion
3.1 Attention Mechanisms
Soft Attention
class SoftAttention(nn.Module):
"""软注意力机制"""
def __init__(self, query_dim, key_dim, hidden_dim=256):
super().__init__()
self.query_proj = nn.Linear(query_dim, hidden_dim)
self.key_proj = nn.Linear(key_dim, hidden_dim)
self.score = nn.Linear(hidden_dim, 1)
def forward(self, query, keys, mask=None):
"""
Args:
query: [batch, query_dim] 查询向量
keys: [batch, num_keys, key_dim] 键值对
mask: [batch, num_keys] 可选的mask
Returns:
context: [batch, key_dim] 加权上下文
weights: [batch, num_keys] 注意力权重
"""
# 投影
q = self.query_proj(query).unsqueeze(1) # [batch, 1, hidden]
k = self.key_proj(keys) # [batch, num_keys, hidden]
# 计算注意力分数
scores = self.score(torch.tanh(q + k)).squeeze(-1) # [batch, num_keys]
# 应用mask
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
# Softmax归一化
weights = torch.softmax(scores, dim=-1) # [batch, num_keys]
# 加权求和
context = torch.bmm(weights.unsqueeze(1), keys).squeeze(1)
return context, weights
Cross-Modal Attention
class CrossModalAttention(nn.Module):
"""跨模态注意力"""
def __init__(self, visual_dim, lang_dim, hidden_dim=512, num_heads=8):
super().__init__()
# 视觉 -> 语言 注意力
self.v2l_attention = nn.MultiheadAttention(
embed_dim=hidden_dim,
num_heads=num_heads,
batch_first=True
)
# 语言 -> 视觉 注意力
self.l2v_attention = nn.MultiheadAttention(
embed_dim=hidden_dim,
num_heads=num_heads,
batch_first=True
)
# 投影层
self.visual_proj = nn.Linear(visual_dim, hidden_dim)
self.lang_proj = nn.Linear(lang_dim, hidden_dim)
# Layer Norm
self.norm1 = nn.LayerNorm(hidden_dim)
self.norm2 = nn.LayerNorm(hidden_dim)
def forward(self, visual_feats, lang_feats, lang_mask=None):
"""
Args:
visual_feats: [batch, num_views, visual_dim]
lang_feats: [batch, seq_len, lang_dim]
lang_mask: [batch, seq_len]
Returns:
fused_visual: [batch, num_views, hidden_dim]
fused_lang: [batch, seq_len, hidden_dim]
"""
# 投影到相同维度
v = self.visual_proj(visual_feats)
l = self.lang_proj(lang_feats)
# 视觉特征关注语言
v_attended, _ = self.v2l_attention(
query=v, key=l, value=l,
            # True marks padding; comparing with 0 also works for integer masks from a tokenizer
            key_padding_mask=(lang_mask == 0) if lang_mask is not None else None
)
fused_visual = self.norm1(v + v_attended)
# 语言特征关注视觉
l_attended, _ = self.l2v_attention(
query=l, key=v, value=v
)
fused_lang = self.norm2(l + l_attended)
return fused_visual, fused_lang
3.2 Co-Grounding Mechanism
class CoGrounding(nn.Module):
    """
    Co-Grounding: grounds the instruction in the views and the views in the instruction at the same time.
    Reference: Self-Monitoring Navigation Agent (ICLR 2019)
    """
def __init__(self, hidden_dim=512):
super().__init__()
# 文本到视觉的定位
self.text_to_visual = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
# 视觉到文本的定位
self.visual_to_text = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, visual_feats, text_feats, text_mask=None):
"""
Args:
visual_feats: [batch, num_views, hidden_dim]
text_feats: [batch, seq_len, hidden_dim]
Returns:
visual_weights: [batch, num_views] 视觉注意力
text_weights: [batch, seq_len] 文本注意力
visual_context: [batch, hidden_dim]
text_context: [batch, hidden_dim]
"""
batch_size = visual_feats.size(0)
num_views = visual_feats.size(1)
seq_len = text_feats.size(1)
# 计算所有视觉-文本对的相似度
# [batch, num_views, seq_len, hidden_dim*2]
v_expanded = visual_feats.unsqueeze(2).expand(-1, -1, seq_len, -1)
t_expanded = text_feats.unsqueeze(1).expand(-1, num_views, -1, -1)
combined = torch.cat([v_expanded, t_expanded], dim=-1)
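        # Text-to-visual weights: average over real tokens only, so padding does not dilute the scores
        t2v_scores = self.text_to_visual(combined).squeeze(-1)  # [batch, num_views, seq_len]
        if text_mask is not None:
            token_valid = text_mask.bool().unsqueeze(1).float()  # [batch, 1, seq_len]
            t2v_scores = (t2v_scores * token_valid).sum(dim=-1) / token_valid.sum(dim=-1).clamp(min=1.0)
        else:
            t2v_scores = t2v_scores.mean(dim=-1)  # [batch, num_views]
        visual_weights = torch.softmax(t2v_scores, dim=-1)
        # Visual-to-text weights
        v2t_scores = self.visual_to_text(combined).squeeze(-1)
        v2t_scores = v2t_scores.mean(dim=1)  # [batch, seq_len]
        if text_mask is not None:
            v2t_scores = v2t_scores.masked_fill(~text_mask.bool(), float('-inf'))
        text_weights = torch.softmax(v2t_scores, dim=-1)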
# 加权得到上下文
visual_context = (visual_feats * visual_weights.unsqueeze(-1)).sum(dim=1)
text_context = (text_feats * text_weights.unsqueeze(-1)).sum(dim=1)
return visual_weights, text_weights, visual_context, text_context
4. Action Decoding and Decision Making
4.1 Action Space
# R2R 离散动作空间
class R2RActionSpace:
"""R2R数据集的动作空间"""
# 高层动作
ACTIONS = {
'STOP': 0, # 停止导航
'FORWARD': 1, # 选择一个viewpoint前进
}
# 实际执行时,FORWARD需要选择具体的viewpoint
# viewpoint选择范围: 当前位置可达的相邻节点
@staticmethod
def get_navigable_viewpoints(state):
"""获取当前可导航的viewpoint列表"""
return state.navigableLocations
# 连续导航动作空间 (Habitat)
class ContinuousActionSpace:
"""连续导航的动作空间"""
ACTIONS = {
'STOP': 0,
'MOVE_FORWARD': 1, # 前进0.25米
'TURN_LEFT': 2, # 左转15度
'TURN_RIGHT': 3, # 右转15度
}
4.2 LSTM Decoder
class LSTMDecoder(nn.Module):
    """LSTM-based action decoder (one recurrent step per call)."""
    def __init__(self, input_dim, hidden_dim=512, dropout=0.5):
        super().__init__()
        self.lstm = nn.LSTMCell(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, prev_hidden, prev_cell):
        """
        Args:
            x: [batch, input_dim] current input
            prev_hidden: [batch, hidden_dim] hidden state from the previous step
            prev_cell: [batch, hidden_dim] cell state from the previous step
        Returns:
            hidden: [batch, hidden_dim] state after dropout; ActionPredictor scores candidates against it
            cell: [batch, hidden_dim]
        """
        hidden, cell = self.lstm(x, (prev_hidden, prev_cell))
        hidden = self.dropout(hidden)
        return hidden, cell
class ActionPredictor(nn.Module):
"""动作预测器 - 选择下一个viewpoint"""
def __init__(self, hidden_dim, visual_dim):
super().__init__()
self.proj = nn.Linear(hidden_dim + visual_dim, hidden_dim)
self.score = nn.Linear(hidden_dim, 1)
def forward(self, hidden, candidate_features, candidate_mask=None):
"""
Args:
hidden: [batch, hidden_dim] 解码器隐藏状态
candidate_features: [batch, num_candidates, visual_dim] 候选viewpoint特征
candidate_mask: [batch, num_candidates] 有效候选mask
Returns:
action_probs: [batch, num_candidates] 动作概率分布
"""
batch_size, num_candidates, _ = candidate_features.shape
# 扩展hidden
hidden_expanded = hidden.unsqueeze(1).expand(-1, num_candidates, -1)
# 拼接并计算分数
combined = torch.cat([hidden_expanded, candidate_features], dim=-1)
scores = self.score(torch.tanh(self.proj(combined))).squeeze(-1)
# 应用mask
if candidate_mask is not None:
scores = scores.masked_fill(~candidate_mask, float('-inf'))
action_probs = torch.softmax(scores, dim=-1)
return action_probs, scores
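The masking step in `ActionPredictor` is what keeps padded candidate slots from ever being chosen. The plain-Python sketch below (a hypothetical helper, not part of the article's code) shows the same idea on a single list of scores: invalid entries get probability zero and the rest are renormalized. It also makes one edge case explicit: if every candidate is masked there is nothing to normalize, which in the tensor version would yield NaN.

```python
import math


def masked_softmax(scores, mask):
    """Softmax over `scores`, assigning zero probability where `mask` is False."""
    valid = [s for s, m in zip(scores, mask) if m]
    if not valid:
        raise ValueError("at least one candidate must be valid")
    top = max(valid)  # subtract the max for numerical stability
    exps = [math.exp(s - top) if m else 0.0 for s, m in zip(scores, mask)]
    total = sum(exps)
    return [e / total for e in exps]


# The third candidate has the highest raw score but is a padded slot.
probs = masked_softmax([2.0, 1.0, 5.0], [True, True, False])
print(probs)
```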
4.3 Transformer Decoder
class TransformerDecoder(nn.Module):
"""基于Transformer的动作解码器"""
def __init__(self, hidden_dim=768, num_layers=4, num_heads=12, dropout=0.1):
super().__init__()
decoder_layer = nn.TransformerDecoderLayer(
d_model=hidden_dim,
nhead=num_heads,
dim_feedforward=hidden_dim * 4,
dropout=dropout,
batch_first=True
)
self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
# 位置编码
self.pos_encoder = PositionalEncoding(hidden_dim, dropout)
# 动作embedding
self.action_embed = nn.Embedding(10, hidden_dim) # 假设最多10种动作
def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
"""
Args:
tgt: [batch, tgt_len, hidden_dim] 目标序列(历史动作)
memory: [batch, src_len, hidden_dim] 编码器输出
Returns:
output: [batch, tgt_len, hidden_dim]
"""
        tgt = self.pos_encoder(tgt)
        # Forward memory_mask as well; the original call silently dropped it
        output = self.decoder(tgt, memory, tgt_mask=tgt_mask, memory_mask=memory_mask)
return output
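`TransformerDecoder` above relies on a `PositionalEncoding` module that the article does not define. A minimal sketch follows, assuming the standard sinusoidal formulation, batch-first inputs, an even `hidden_dim`, and a hypothetical `max_len` cap of 512. A small `causal_mask` helper is included because a decoder over the action history normally needs one as `tgt_mask`.

```python
import math

import torch
import torch.nn as nn


class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding for inputs shaped [batch, seq_len, hidden_dim]."""

    def __init__(self, hidden_dim, dropout=0.1, max_len=512):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        position = torch.arange(max_len).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(
            torch.arange(0, hidden_dim, 2) * (-math.log(10000.0) / hidden_dim)
        )  # [hidden_dim / 2]
        pe = torch.zeros(1, max_len, hidden_dim)
        pe[0, :, 0::2] = torch.sin(position * div_term)
        pe[0, :, 1::2] = torch.cos(position * div_term)
        self.register_buffer("pe", pe)  # moves with the module, not trained

    def forward(self, x):
        # Add the first seq_len position vectors, then apply dropout.
        return self.dropout(x + self.pe[:, : x.size(1)])


def causal_mask(tgt_len):
    """Boolean mask where True blocks attention to future positions."""
    return torch.triu(torch.ones(tgt_len, tgt_len, dtype=torch.bool), diagonal=1)


encoder = PositionalEncoding(hidden_dim=16, dropout=0.0)
out = encoder(torch.zeros(2, 5, 16))
mask = causal_mask(3)
print(out.shape, mask)
```

The boolean mask can be passed directly as `tgt_mask` to `nn.TransformerDecoder`, where `True` means "do not attend".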
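5. Classic Models in Detail
5.1 Seq2Seq Baseline Model
Seq2Seq VLN model structure
Data flow:
- Language encoding: the instruction is encoded by a Bi-LSTM into per-token context vectors
- Visual encoding: the observed images are passed through ResNet to extract features
- Attention fusion: language and visual features are combined through attention
- Action decoding: an LSTM decoder produces the navigation action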
class Seq2SeqVLN(nn.Module):
    """Basic Seq2Seq VLN model."""
    def __init__(self, vocab_size, embed_dim=256, hidden_dim=512):
        super().__init__()
        # Language encoder
        self.lang_encoder = LSTMLanguageEncoder(vocab_size, embed_dim, hidden_dim)
        # Visual encoder
        self.visual_encoder = ResNetVisualEncoder(output_dim=hidden_dim)
        # Separate attention modules so the two modalities do not share weights
        self.lang_attention = SoftAttention(hidden_dim, hidden_dim)
        self.visual_attention = SoftAttention(hidden_dim, hidden_dim)
        # Decoder
        self.decoder = nn.LSTMCell(hidden_dim * 2, hidden_dim)
        # Action prediction (candidate features are expected to have hidden_dim channels)
        self.action_predictor = ActionPredictor(hidden_dim, hidden_dim)

    def forward(self, instructions, lengths, visual_obs, candidates,
                prev_hidden, prev_cell):
        """Single-step forward pass."""
        # Encode language (in a full rollout, compute this once per episode and cache it)
        lang_features, lang_ctx = self.lang_encoder(instructions, lengths)
        # Encode vision
        visual_features = self.visual_encoder(visual_obs)
        # Attend over the instruction tokens
        attended_lang, lang_weights = self.lang_attention(
            prev_hidden, lang_features
        )
        # Attend over the panoramic views
        attended_visual, visual_weights = self.visual_attention(
            prev_hidden, visual_features
        )
        # Decode
        decoder_input = torch.cat([attended_lang, attended_visual], dim=-1)
        hidden, cell = self.decoder(decoder_input, (prev_hidden, prev_cell))
        # Predict the action
        action_probs, action_logits = self.action_predictor(
            hidden, candidates
        )
        return action_probs, action_logits, hidden, cell
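5.2 Speaker-Follower Model
Speaker-Follower data augmentation framework
The three training stages:
| Stage | Input | Model | Output |
|---|---|---|---|
| 1. Train the Speaker | Path | Speaker | Synthetic instruction |
| 2. Data augmentation | Randomly sampled paths | Speaker | Synthetic instructions |
| 3. Train the Follower | Original data + augmented data | Follower | Navigation policy |
Core idea: use the Speaker model to generate instructions from paths, which enlarges the training set.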
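class Speaker(nn.Module):
    """Speaker model: generates an instruction from a path."""
    def __init__(self, vocab_size, visual_dim=2048, hidden_dim=512, embed_dim=256):
        super().__init__()
        # Visual encoding
        self.visual_encoder = nn.Linear(visual_dim, hidden_dim)
        # Word embedding
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # LSTM decoder that generates the instruction
        self.decoder = nn.LSTM(hidden_dim + embed_dim, hidden_dim, batch_first=True)
        # Output layer
        self.output = nn.Linear(hidden_dim, vocab_size)

    def forward(self, visual_sequence, target_instructions=None):
        """
        Teacher forcing during training, autoregressive generation at inference.
        visual_sequence: [batch, path_len, visual_dim]
        target_instructions: [batch, seq_len] token ids starting with <BOS>
        """
        # Pool the path into one context vector. Path length and instruction
        # length differ, so per-step features cannot be concatenated with the
        # word embeddings directly. Mean pooling is a simplification here;
        # attention over the path is the usual alternative.
        path_ctx = self.visual_encoder(visual_sequence).mean(dim=1, keepdim=True)  # [batch, 1, hidden]
        if target_instructions is not None:
            # Teacher-forced training
            embeds = self.embedding(target_instructions[:, :-1])
            inputs = torch.cat([path_ctx.expand(-1, embeds.size(1), -1), embeds], dim=-1)
            outputs, _ = self.decoder(inputs)
            return self.output(outputs)  # [batch, seq_len - 1, vocab_size]
        # Autoregressive generation
        return self.generate(path_ctx)

    def generate(self, path_ctx, max_len=80, bos_id=0, eos_id=1):
        """Greedy autoregressive decoding; bos_id and eos_id are assumed vocabulary ids."""
        batch_size = path_ctx.size(0)
        device = path_ctx.device
        generated = torch.full((batch_size, 1), bos_id, dtype=torch.long, device=device)
        finished = torch.zeros(batch_size, dtype=torch.bool, device=device)
        hidden = None
        for _ in range(max_len):
            embeds = self.embedding(generated[:, -1:])
            inputs = torch.cat([path_ctx, embeds], dim=-1)
            outputs, hidden = self.decoder(inputs, hidden)
            # Greedy choice of the next word, [batch, 1]
            next_token = self.output(outputs).argmax(dim=-1)
            generated = torch.cat([generated, next_token], dim=1)
            # Stop once every sequence has produced <EOS> at some step
            finished |= next_token.squeeze(1) == eos_id
            if finished.all():
                break
        return generated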
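5.3 VLNBERT / RecBERT
VLNBERT architecture
Input sequence format: [CLS] w1 w2 ... wn [SEP] v1 v2 ... vm [SEP] h1 h2 ... hk
| Token type | Description |
|---|---|
| [CLS] | Special classification token; its output is used for action prediction |
| w1...wn | Language tokens (the instruction) |
| v1...vm | Visual tokens (the current observation) |
| h1...hk | History tokens (the navigation history) |
Pipeline: input sequence → BERT encoder (multi-layer Transformer) → [CLS] output → action prediction. In the code below, candidate views are scored from the visual-token outputs and the [CLS] state is returned alongside them.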
from transformers import BertModel, BertConfig
class VLNBERT(nn.Module):
"""VLN-BERT模型"""
def __init__(self, config_path=None):
super().__init__()
# 加载BERT配置
if config_path:
config = BertConfig.from_json_file(config_path)
else:
config = BertConfig(
hidden_size=768,
num_attention_heads=12,
num_hidden_layers=9,
intermediate_size=3072
)
self.bert = BertModel(config)
self.hidden_dim = config.hidden_size
# 视觉投影
self.visual_proj = nn.Linear(2048, self.hidden_dim)
# 动作角度编码
self.angle_encoder = nn.Linear(4, self.hidden_dim) # [sin, cos, sin, cos]
# Token类型embedding
self.token_type_embeddings = nn.Embedding(3, self.hidden_dim)
# 0: 语言, 1: 视觉, 2: 历史
# 动作预测头
self.action_head = nn.Linear(self.hidden_dim, 1)
def forward(self, input_ids, attention_mask, visual_features,
angle_features, history_features=None):
"""
Args:
input_ids: [batch, lang_len] 语言token ids
attention_mask: [batch, lang_len]
visual_features: [batch, num_views, 2048]
angle_features: [batch, num_views, 4]
history_features: [batch, hist_len, hidden_dim] 可选
"""
batch_size = input_ids.size(0)
# 1. 语言embedding
lang_embeds = self.bert.embeddings.word_embeddings(input_ids)
lang_type = self.token_type_embeddings(
torch.zeros_like(input_ids)
)
lang_embeds = lang_embeds + lang_type
# 2. 视觉embedding
visual_embeds = self.visual_proj(visual_features)
angle_embeds = self.angle_encoder(angle_features)
visual_embeds = visual_embeds + angle_embeds
visual_type = self.token_type_embeddings(
torch.ones(batch_size, visual_embeds.size(1)).long().to(input_ids.device)
)
visual_embeds = visual_embeds + visual_type
# 3. 拼接所有embedding
if history_features is not None:
history_type = self.token_type_embeddings(
torch.full((batch_size, history_features.size(1)), 2).long().to(input_ids.device)
)
history_embeds = history_features + history_type
all_embeds = torch.cat([lang_embeds, visual_embeds, history_embeds], dim=1)
else:
all_embeds = torch.cat([lang_embeds, visual_embeds], dim=1)
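        # 4. Run BERT. Visual and history tokens are always valid, so extend
        #    the language mask with ones for them.
        extra_mask = attention_mask.new_ones(
            batch_size, all_embeds.size(1) - attention_mask.size(1)
        )
        outputs = self.bert(
            inputs_embeds=all_embeds,
            attention_mask=torch.cat([attention_mask, extra_mask], dim=1)
        )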
# 5. 提取视觉token的表示用于动作预测
lang_len = input_ids.size(1)
visual_len = visual_features.size(1)
visual_outputs = outputs.last_hidden_state[:, lang_len:lang_len+visual_len]
# 6. 动作分数
action_scores = self.action_head(visual_outputs).squeeze(-1)
return action_scores, outputs.last_hidden_state[:, 0] # scores, CLS
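5.4 HAMT (History Aware Multimodal Transformer)
HAMT architecture: explicit modeling of the navigation history
Core components:
| Module | Role |
|---|---|
| History Encoder | Encodes the temporal history obs₁→h₁→obs₂→h₂→…→obsₜ→hₜ |
| Cross-Modal Transformer | Fuses Language and History through bidirectional attention |
| Action Prediction | Predicts the action from the fused features |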
History: obs₁ → obs₂ → obs₃ → ... → obsₜ
↓ ↓ ↓ ↓
[h₁] → [h₂] → [h₃] → ... → [hₜ]
↓
Language ──────────────────────> Cross-Modal Transformer ──> Action
class HAMT(nn.Module):
"""History Aware Multimodal Transformer"""
def __init__(self, hidden_dim=768, num_layers=4, num_heads=12):
super().__init__()
# 语言编码器
self.lang_encoder = BERTLanguageEncoder()
# 视觉编码器
self.visual_encoder = ViTVisualEncoder()
# 历史编码器
self.history_encoder = nn.TransformerEncoder(
nn.TransformerEncoderLayer(
d_model=hidden_dim,
nhead=num_heads,
batch_first=True
),
num_layers=2
)
# 观察编码
self.observation_encoder = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim), # visual + action
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim)
)
# 跨模态Transformer
self.cross_modal_encoder = nn.TransformerEncoder(
nn.TransformerEncoderLayer(
d_model=hidden_dim,
nhead=num_heads,
batch_first=True
),
num_layers=num_layers
)
# 动作预测
self.action_predictor = nn.Linear(hidden_dim, 1)
# 位置编码
self.pos_encoding = LearnedPositionalEncoding(hidden_dim)
def encode_history(self, observations, actions):
"""
编码导航历史
Args:
observations: list of [batch, hidden_dim] 历史观察
actions: list of [batch, hidden_dim] 历史动作
Returns:
history: [batch, hist_len, hidden_dim]
"""
history_embeds = []
for obs, act in zip(observations, actions):
combined = torch.cat([obs, act], dim=-1)
encoded = self.observation_encoder(combined)
history_embeds.append(encoded)
history = torch.stack(history_embeds, dim=1)
history = history + self.pos_encoding(history)
history = self.history_encoder(history)
return history
def forward(self, input_ids, attention_mask, current_visual,
history_observations, history_actions, candidates):
"""
Args:
input_ids: [batch, seq_len]
current_visual: [batch, num_views, visual_dim]
history_*: 历史信息
candidates: [batch, num_candidates, visual_dim]
"""
# 1. 编码语言
lang_features, _ = self.lang_encoder(input_ids, attention_mask)
# 2. 编码当前视觉
visual_features = self.visual_encoder(current_visual)
# 3. 编码历史
history_features = self.encode_history(
history_observations, history_actions
)
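        # 4. Cross-modal fusion; padded language tokens are masked out
        combined = torch.cat([
            lang_features,
            visual_features,
            history_features
        ], dim=1)
        pad_mask = torch.cat([
            attention_mask == 0,
            torch.zeros(combined.size(0), combined.size(1) - attention_mask.size(1),
                        dtype=torch.bool, device=combined.device)
        ], dim=1)
        fused = self.cross_modal_encoder(combined, src_key_padding_mask=pad_mask)
        # 5. Global representation: mean over valid tokens only
        valid = (~pad_mask).unsqueeze(-1).float()
        global_repr = (fused * valid).sum(dim=1) / valid.sum(dim=1).clamp(min=1.0)
        # 6. Action prediction (candidates must already be projected to hidden_dim)
        candidate_scores = torch.bmm(
            candidates,
            global_repr.unsqueeze(-1)
        ).squeeze(-1)
        return candidate_scores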
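6. Spatial Representations and Map-Based Methods
6.1 Topological Map Representation
A topological map is a compact spatial representation that models the environment as a graph of nodes (visited or observed viewpoints) and edges (traversable connections):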
import torch
import torch.nn as nn
import torch_geometric
from torch_geometric.nn import GATConv, GCNConv
class TopologicalMap(nn.Module):
"""拓扑地图表示"""
def __init__(self, node_dim=768, edge_dim=128, num_layers=3):
super().__init__()
self.node_dim = node_dim
self.edge_dim = edge_dim
# 节点特征编码
self.node_encoder = nn.Sequential(
nn.Linear(2048 + 128, node_dim), # visual + positional
nn.ReLU(),
nn.Dropout(0.1)
)
# 边特征编码
self.edge_encoder = nn.Sequential(
nn.Linear(7, edge_dim), # distance, heading, elevation, etc.
nn.ReLU()
)
# 图神经网络层
self.gnn_layers = nn.ModuleList([
GATConv(node_dim, node_dim // 8, heads=8, dropout=0.1)
for _ in range(num_layers)
])
# 层归一化
self.layer_norms = nn.ModuleList([
nn.LayerNorm(node_dim) for _ in range(num_layers)
])
    def forward(self, node_features, edge_index, edge_attr=None):
        """
        Args:
            node_features: [num_nodes, visual_dim + pos_dim]
            edge_index: [2, num_edges]
            edge_attr: [num_edges, 7] raw edge attributes (accepted, but the GAT layers below do not use them)
        Returns:
            node_embeddings: [num_nodes, node_dim]
        """
# 编码节点特征
x = self.node_encoder(node_features)
# 图神经网络传播
for i, gnn_layer in enumerate(self.gnn_layers):
x_new = gnn_layer(x, edge_index)
x = self.layer_norms[i](x + x_new) # 残差连接
return x
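    def update_map(self, current_map, vp_id, node_features, edge_features):
        """
        Incrementally update a DynamicGraph after moving to a viewpoint.
        Args:
            current_map: DynamicGraph holding the current map state
            vp_id: id of the viewpoint the agent has just reached
            node_features: feature tensor for that viewpoint
            edge_features: feature tensor for the edge that was just traversed
        """
        previous = current_map.current_node
        # Add the node (add_node is a no-op if the viewpoint is already known)
        current_map.add_node(vp_id, node_features)
        # Connect it to the viewpoint the agent came from
        if previous is not None and previous != vp_id:
            current_map.add_edge(previous, vp_id, edge_features)
        # Update the agent's position in the map
        current_map.current_node = vp_id
        current_map.visited.add(vp_id)
        return current_map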
class DynamicGraph:
"""动态构建的导航图"""
def __init__(self):
self.nodes = {} # viewpoint_id -> features
self.edges = [] # (src, dst, edge_features)
self.current_node = None
self.visited = set()
def add_node(self, vp_id: str, features: torch.Tensor):
"""添加节点"""
if vp_id not in self.nodes:
self.nodes[vp_id] = features
def add_edge(self, src: str, dst: str, edge_features: torch.Tensor):
"""添加边"""
self.edges.append((src, dst, edge_features))
def get_graph_tensors(self):
"""获取图的张量表示"""
node_list = list(self.nodes.keys())
node_to_idx = {vp: i for i, vp in enumerate(node_list)}
# 节点特征
node_features = torch.stack([self.nodes[vp] for vp in node_list])
# 边索引
edge_index = []
edge_attr = []
for src, dst, feat in self.edges:
if src in node_to_idx and dst in node_to_idx:
edge_index.append([node_to_idx[src], node_to_idx[dst]])
edge_attr.append(feat)
edge_index = torch.tensor(edge_index).T if edge_index else torch.zeros(2, 0).long()
edge_attr = torch.stack(edge_attr) if edge_attr else torch.zeros(0, 7)
return node_features, edge_index, edge_attr
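One detail of `get_graph_tensors` is easy to miss: `edge_index` lists directed edges, and PyTorch Geometric layers pass messages only along the direction given. Since `add_edge` stores each traversal once, a navigation graph that should be undirected needs the reverse edges added. The plain-Python sketch below uses hypothetical helper names and viewpoint ids to show the index mapping and the symmetrization on lists, with no tensors involved.

```python
def build_edge_index(node_ids, edges):
    """Return [sources, targets] index lists for edges whose endpoints are known nodes."""
    node_to_idx = {vp: i for i, vp in enumerate(node_ids)}
    sources, targets = [], []
    for src, dst in edges:
        # Skip edges that point at viewpoints not yet in the map.
        if src in node_to_idx and dst in node_to_idx:
            sources.append(node_to_idx[src])
            targets.append(node_to_idx[dst])
    return [sources, targets]


def make_undirected(edge_index):
    """Append the reverse of every edge so information flows both ways."""
    sources, targets = edge_index
    return [sources + targets, targets + sources]


node_ids = ["vp_a", "vp_b", "vp_c"]
edges = [("vp_a", "vp_b"), ("vp_b", "vp_c"), ("vp_c", "vp_x")]  # vp_x is not in the map yet
edge_index = build_edge_index(node_ids, edges)
both_ways = make_undirected(edge_index)
print(edge_index, both_ways)
```

If edge features are used, they have to be duplicated in the same order as the reversed edges.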
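6.2 BEV (Bird's-Eye View) Representation
A BEV representation projects the 3D environment onto a 2D plane, which gives the agent a global view of the space around it: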
class BEVEncoder(nn.Module):
"""BEV鸟瞰图编码器"""
def __init__(self, hidden_dim=256, grid_size=32, resolution=0.5):
super().__init__()
self.grid_size = grid_size
self.resolution = resolution # 每个网格的实际尺寸(米)
self.hidden_dim = hidden_dim
# 将3D特征投影到BEV网格
self.point_to_grid = nn.Sequential(
nn.Linear(2048, hidden_dim),
nn.ReLU()
)
# BEV特征CNN
self.bev_cnn = nn.Sequential(
nn.Conv2d(hidden_dim, hidden_dim, 3, padding=1),
nn.BatchNorm2d(hidden_dim),
nn.ReLU(),
nn.Conv2d(hidden_dim, hidden_dim, 3, padding=1),
nn.BatchNorm2d(hidden_dim),
nn.ReLU(),
nn.Conv2d(hidden_dim, hidden_dim, 3, padding=1),
nn.BatchNorm2d(hidden_dim),
nn.ReLU()
)
# 位置编码
self.pos_embed = nn.Parameter(
torch.zeros(1, hidden_dim, grid_size, grid_size)
)
def forward(self, point_features, point_positions):
"""
Args:
point_features: [batch, num_points, feat_dim]
point_positions: [batch, num_points, 3] (x, y, z)
Returns:
bev_features: [batch, hidden_dim, grid_size, grid_size]
"""
batch_size = point_features.size(0)
# 初始化BEV网格
bev_grid = torch.zeros(
batch_size, self.hidden_dim,
self.grid_size, self.grid_size,
device=point_features.device
)
# 投影特征到网格
projected_features = self.point_to_grid(point_features)
        # Assign each point to its grid cell
        for b in range(batch_size):
            for i in range(point_features.size(1)):
                x, y = point_positions[b, i, 0], point_positions[b, i, 1]
                # Grid coordinates; floor (rather than int() truncation) keeps
                # negative offsets in the correct cell
                grid_x = int(torch.floor(x / self.resolution)) + self.grid_size // 2
                grid_y = int(torch.floor(y / self.resolution)) + self.grid_size // 2
                # Bounds check
                if 0 <= grid_x < self.grid_size and 0 <= grid_y < self.grid_size:
                    bev_grid[b, :, grid_y, grid_x] += projected_features[b, i]
# 添加位置编码
bev_grid = bev_grid + self.pos_embed
# CNN处理
bev_features = self.bev_cnn(bev_grid)
return bev_features
class BEVBert(nn.Module):
"""BEVBert: 基于BEV的VLN模型"""
def __init__(self, hidden_dim=768):
super().__init__()
# 语言编码器
from transformers import BertModel
self.lang_encoder = BertModel.from_pretrained('bert-base-uncased')
# BEV编码器
self.bev_encoder = BEVEncoder(hidden_dim=hidden_dim)
# 跨模态融合
self.cross_attention = nn.MultiheadAttention(
hidden_dim, num_heads=12, batch_first=True
)
# 动作预测
self.action_predictor = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1)
)
def forward(self, input_ids, attention_mask, bev_features, candidates):
"""前向传播"""
# 语言编码
lang_output = self.lang_encoder(
input_ids=input_ids,
attention_mask=attention_mask
)
lang_features = lang_output.last_hidden_state
# BEV特征展平
bev_flat = bev_features.flatten(2).permute(0, 2, 1) # [B, H*W, C]
# 跨模态注意力
fused, _ = self.cross_attention(
query=bev_flat,
key=lang_features,
value=lang_features,
key_padding_mask=~attention_mask.bool()
)
# 全局特征
global_feat = fused.mean(dim=1)
# 动作预测
candidate_scores = []
for i in range(candidates.size(1)):
combined = torch.cat([global_feat, candidates[:, i]], dim=-1)
score = self.action_predictor(combined)
candidate_scores.append(score)
return torch.cat(candidate_scores, dim=-1)
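The world-to-grid mapping inside `BEVEncoder` is worth checking by hand, because the rounding rule matters for points behind or to the left of the agent. The sketch below is a standalone, hypothetical helper using the same defaults (`grid_size=32`, `resolution=0.5` m) and assuming agent-centred coordinates. With `math.floor`, a point at x = -0.2 m lands in the cell left of centre; truncating with `int()` would put it in the centre cell, making that cell twice as wide as its neighbours.

```python
import math


def world_to_grid(x, y, grid_size=32, resolution=0.5):
    """Return (grid_x, grid_y) for a point in agent-centred metres, or None if off the grid."""
    grid_x = math.floor(x / resolution) + grid_size // 2
    grid_y = math.floor(y / resolution) + grid_size // 2
    if 0 <= grid_x < grid_size and 0 <= grid_y < grid_size:
        return grid_x, grid_y
    return None


center = world_to_grid(0.0, 0.0)
left_of_center = world_to_grid(-0.2, 0.2)
far_edge = world_to_grid(-8.0, 0.0)
outside = world_to_grid(8.0, 0.0)
print(center, left_of_center, far_edge, outside)
```

With these defaults the grid covers 16 m × 16 m, from -8 m (inclusive) to +8 m (exclusive) on each axis.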
6.3 Semantic Map Construction
class SemanticMapper(nn.Module):
"""语义地图构建器"""
def __init__(self, num_classes=40, map_size=128, resolution=0.1):
super().__init__()
self.num_classes = num_classes
self.map_size = map_size
self.resolution = resolution
# 语义分割模型
self.semantic_segmentor = SemanticSegmentationHead(num_classes)
# 深度估计
self.depth_estimator = DepthEstimationHead()
# 地图融合模块
self.map_fuser = nn.Sequential(
nn.Conv2d(num_classes + 1, 64, 3, padding=1),
nn.ReLU(),
nn.Conv2d(64, 32, 3, padding=1),
nn.ReLU()
)
def build_local_map(self, rgb_image, depth_map, camera_pose):
"""
从当前观察构建局部语义地图
Args:
rgb_image: [B, 3, H, W]
depth_map: [B, 1, H, W]
camera_pose: [B, 4, 4] 相机位姿矩阵
"""
# 语义分割
semantic_logits = self.semantic_segmentor(rgb_image)
semantic_map = semantic_logits.argmax(dim=1)
# 反投影到3D空间
point_cloud = self.depth_to_pointcloud(depth_map, camera_pose)
# 投影到2D地图
local_map = self.project_to_map(point_cloud, semantic_map)
return local_map
def update_global_map(self, global_map, local_map, agent_pose):
"""
更新全局地图
Args:
global_map: [B, C, H, W] 当前全局地图
local_map: [B, C, h, w] 新的局部地图
agent_pose: [B, 3] (x, y, theta)
"""
# 变换到全局坐标系
transformed_local = self.transform_to_global(local_map, agent_pose)
# 融合
updated_map = self.fuse_maps(global_map, transformed_local)
return updated_map
    def fuse_maps(self, global_map, local_map):
        """Fuse the global and local maps (both [B, C, H, W] and already aligned)."""
        # Per-cell confidence: the strongest class response in that cell
        confidence_global = global_map.max(dim=1, keepdim=True)[0]
        confidence_local = local_map.max(dim=1, keepdim=True)[0]
        # Update rule: where the new observation is more confident, it overwrites the old value
        mask = (confidence_local > confidence_global).float()
        updated = global_map * (1 - mask) + local_map * mask
        return updated
7. Pretraining Strategies
7.1 VLN Pretraining Tasks
import torch.nn.functional as F

class VLNPretrainer(nn.Module):
    """VLN pretraining model."""
def __init__(self, config):
super().__init__()
self.config = config
# 基础编码器
self.visual_encoder = VisualEncoder(config.visual_dim)
self.lang_encoder = LanguageEncoder(config.lang_dim)
self.cross_encoder = CrossModalEncoder(config.hidden_dim)
# 预训练任务头
self.mlm_head = nn.Linear(config.hidden_dim, config.vocab_size) # MLM
self.itm_head = nn.Linear(config.hidden_dim, 2) # Image-Text Matching
self.sap_head = nn.Linear(config.hidden_dim, config.num_actions) # Single Action Prediction
self.sprel_head = nn.Linear(config.hidden_dim * 2, 3) # Spatial Relationship
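    def forward_mlm(self, input_ids, visual_features, mlm_labels):
        """
        Masked Language Modeling.
        About 15% of the instruction tokens are masked and the model predicts them.
        Assumes the data pipeline has already masked input_ids and set
        mlm_labels to -100 at positions that are not predicted.
        """
        # Encode
        lang_features = self.lang_encoder(input_ids)
        fused = self.cross_encoder(lang_features, visual_features)
        # Predict on the language positions only (assumes the cross encoder
        # returns the language tokens first, one output per input token)
        mlm_logits = self.mlm_head(fused[:, :input_ids.size(1)])
        # Loss
        mlm_loss = F.cross_entropy(
            mlm_logits.reshape(-1, self.config.vocab_size),
            mlm_labels.reshape(-1),
            ignore_index=-100
        )
        return mlm_loss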
def forward_itm(self, input_ids, visual_features, is_matched):
"""
Image-Text Matching
判断视觉观察和语言指令是否匹配
"""
lang_features = self.lang_encoder(input_ids)
fused = self.cross_encoder(lang_features, visual_features)
# 使用[CLS] token进行分类
cls_output = fused[:, 0]
itm_logits = self.itm_head(cls_output)
itm_loss = F.cross_entropy(itm_logits, is_matched.long())
return itm_loss
def forward_sap(self, input_ids, visual_features, action_labels):
"""
Single Action Prediction
预测当前步应该执行的动作
"""
lang_features = self.lang_encoder(input_ids)
fused = self.cross_encoder(lang_features, visual_features)
global_feat = fused.mean(dim=1)
action_logits = self.sap_head(global_feat)
sap_loss = F.cross_entropy(action_logits, action_labels)
return sap_loss
def forward_sprel(self, visual_feat1, visual_feat2, spatial_labels):
"""
Spatial Relationship Prediction
预测两个viewpoint之间的空间关系
"""
combined = torch.cat([visual_feat1, visual_feat2], dim=-1)
sprel_logits = self.sprel_head(combined)
sprel_loss = F.cross_entropy(sprel_logits, spatial_labels)
return sprel_loss
def pretrain_step(self, batch):
"""预训练步骤"""
total_loss = 0
# MLM
if 'mlm' in self.config.pretrain_tasks:
mlm_loss = self.forward_mlm(
batch['input_ids'],
batch['visual_features'],
batch['mlm_labels']
)
total_loss += self.config.mlm_weight * mlm_loss
# ITM
if 'itm' in self.config.pretrain_tasks:
itm_loss = self.forward_itm(
batch['input_ids'],
batch['visual_features'],
batch['is_matched']
)
total_loss += self.config.itm_weight * itm_loss
# SAP
if 'sap' in self.config.pretrain_tasks:
sap_loss = self.forward_sap(
batch['input_ids'],
batch['visual_features'],
batch['action_labels']
)
total_loss += self.config.sap_weight * sap_loss
return total_loss
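The MLM task needs masked inputs and labels, but the article does not show how they are built. The sketch below is one possible implementation under stated assumptions: the 15% rate comes from the text above, while the 80% `[MASK]` / 10% random token / 10% unchanged split is the standard BERT recipe rather than something this article specifies. `mask_token_id`, `special_ids`, and the function name are hypothetical parameters.

```python
import torch


def create_mlm_masks(input_ids, mask_token_id, vocab_size, special_ids=(0,),
                     mlm_prob=0.15, generator=None):
    """BERT-style masking; returns (masked_input_ids, labels) with -100 where nothing is predicted."""
    device = input_ids.device
    labels = input_ids.clone()
    prob = torch.full(input_ids.shape, mlm_prob, device=device)
    for sid in special_ids:
        prob[input_ids == sid] = 0.0  # never select padding or special tokens
    selected = torch.bernoulli(prob, generator=generator).bool()
    labels[~selected] = -100  # ignored by cross_entropy(ignore_index=-100)

    masked = input_ids.clone()
    roll = torch.rand(input_ids.shape, generator=generator, device=device)
    masked[selected & (roll < 0.8)] = mask_token_id        # 80%: replace with [MASK]
    random_pos = selected & (roll >= 0.8) & (roll < 0.9)   # 10%: replace with a random token
    random_ids = torch.randint(vocab_size, input_ids.shape,
                               generator=generator, device=device)
    masked[random_pos] = random_ids[random_pos]
    return masked, labels                                  # remaining 10%: left unchanged


g = torch.Generator().manual_seed(0)
ids = torch.randint(5, 100, (4, 20), generator=g)
ids[:, -3:] = 0  # pretend the last three positions are padding (id 0)
masked_ids, mlm_labels = create_mlm_masks(ids, mask_token_id=103, vocab_size=200, generator=g)
print((mlm_labels != -100).sum().item(), "tokens selected for prediction")
```

The returned pair can be fed to `forward_mlm` as `input_ids` and `mlm_labels`.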
7.2 Contrastive Pretraining
class ContrastiveVLNPretrainer(nn.Module):
"""基于对比学习的VLN预训练"""
def __init__(self, config):
super().__init__()
self.visual_encoder = VisualEncoder(config.visual_dim)
self.lang_encoder = LanguageEncoder(config.lang_dim)
# 投影头
self.visual_projector = nn.Sequential(
nn.Linear(config.visual_dim, config.hidden_dim),
nn.ReLU(),
nn.Linear(config.hidden_dim, config.embed_dim)
)
self.lang_projector = nn.Sequential(
nn.Linear(config.lang_dim, config.hidden_dim),
nn.ReLU(),
nn.Linear(config.hidden_dim, config.embed_dim)
)
self.temperature = nn.Parameter(torch.ones([]) * config.init_temp)
def forward(self, visual_inputs, lang_inputs):
"""
计算对比学习损失
正样本:同一episode的视觉-语言对
负样本:不同episode的视觉-语言对
"""
# 编码
visual_features = self.visual_encoder(visual_inputs)
lang_features = self.lang_encoder(lang_inputs)
# 投影到共享空间
visual_embeds = F.normalize(
self.visual_projector(visual_features.mean(dim=1)), dim=-1
)
lang_embeds = F.normalize(
self.lang_projector(lang_features.mean(dim=1)), dim=-1
)
# 计算相似度
logits = torch.matmul(visual_embeds, lang_embeds.T) / self.temperature
# InfoNCE损失
batch_size = logits.size(0)
labels = torch.arange(batch_size, device=logits.device)
loss_v2l = F.cross_entropy(logits, labels)
loss_l2v = F.cross_entropy(logits.T, labels)
return (loss_v2l + loss_l2v) / 2
class MomentumContrastVLN(nn.Module):
"""MoCo风格的VLN预训练"""
def __init__(self, config):
super().__init__()
# Query编码器
self.encoder_q = VLNEncoder(config)
# Momentum编码器(不更新梯度)
self.encoder_k = VLNEncoder(config)
for param_k in self.encoder_k.parameters():
param_k.requires_grad = False
# 队列
self.register_buffer("queue", torch.randn(config.embed_dim, config.queue_size))
self.queue = F.normalize(self.queue, dim=0)
self.register_buffer("queue_ptr", torch.zeros(1, dtype=torch.long))
self.momentum = config.momentum
self.temperature = config.temperature
@torch.no_grad()
def _momentum_update_key_encoder(self):
"""动量更新Key编码器"""
for param_q, param_k in zip(
self.encoder_q.parameters(),
self.encoder_k.parameters()
):
param_k.data = param_k.data * self.momentum + param_q.data * (1 - self.momentum)
@torch.no_grad()
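def _dequeue_and_enqueue(self, keys):
    """Write the newest keys into the queue, overwriting the oldest entries."""
    batch_size = keys.size(0)
    queue_size = self.queue.size(1)
    ptr = int(self.queue_ptr)
    # Index modulo the queue size so that a batch crossing the end of the
    # buffer wraps to the front (assumes batch_size <= queue_size)
    idx = (ptr + torch.arange(batch_size, device=keys.device)) % queue_size
    self.queue[:, idx] = keys.T
    # Advance the write pointer
    self.queue_ptr[0] = (ptr + batch_size) % queue_size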
def forward(self, visual_q, lang_q, visual_k, lang_k):
"""MoCo前向传播"""
# Query
q = self.encoder_q(visual_q, lang_q)
q = F.normalize(q, dim=-1)
# Key
with torch.no_grad():
self._momentum_update_key_encoder()
k = self.encoder_k(visual_k, lang_k)
k = F.normalize(k, dim=-1)
# 正样本相似度
l_pos = torch.einsum('nc,nc->n', [q, k]).unsqueeze(-1)
# 负样本相似度
l_neg = torch.einsum('nc,ck->nk', [q, self.queue.clone().detach()])
# 计算损失
logits = torch.cat([l_pos, l_neg], dim=1) / self.temperature
labels = torch.zeros(logits.size(0), dtype=torch.long, device=logits.device)
loss = F.cross_entropy(logits, labels)
# 更新队列
self._dequeue_and_enqueue(k)
return loss
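The negative-sample queue in MoCo behaves like a ring buffer: new keys overwrite the oldest ones, and the write pointer wraps back to the start when it reaches the end. The sketch below illustrates only that bookkeeping in plain Python, including a batch that crosses the end of the buffer. `RingQueue` and its method names are hypothetical helpers for illustration, not part of the model code.

```python
class RingQueue:
    """Fixed-size FIFO of keys (hypothetical helper for illustration)."""

    def __init__(self, size):
        self.slots = [None] * size  # storage for the keys
        self.ptr = 0                # next write position

    def enqueue(self, keys):
        """Write keys one by one, wrapping to slot 0 at the end of the buffer."""
        for key in keys:
            self.slots[self.ptr] = key
            self.ptr = (self.ptr + 1) % len(self.slots)


queue = RingQueue(size=5)
queue.enqueue(["k0", "k1", "k2"])  # fills slots 0..2
queue.enqueue(["k3", "k4", "k5"])  # fills slots 3..4, then wraps and overwrites slot 0
print(queue.slots, queue.ptr)      # ['k5', 'k1', 'k2', 'k3', 'k4'] 1
```

When the queue size is a multiple of the batch size, a batch never crosses the end and a single contiguous slice assignment is enough.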
7.3 Building the Pretraining Data
class VLNPretrainDataset(Dataset):
"""VLN预训练数据集"""
def __init__(self, data_path, tokenizer, feature_store):
self.data = self.load_data(data_path)
self.tokenizer = tokenizer
self.feature_store = feature_store
# 负样本池
self.instruction_pool = [item['instruction'] for item in self.data]
self.visual_pool = [item['scan'] + '_' + item['path'][0]
for item in self.data]
def __getitem__(self, idx):
    item = self.data[idx]
    # Positive sample
    instruction = item['instruction']
    visual_features = self.get_visual_features(item['scan'], item['path'])
    # ITM: with 50% probability, swap in an instruction from another item.
    # The swap happens before tokenization so that input_ids reflect it.
    is_matched = 1
    if len(self.instruction_pool) > 1 and random.random() < 0.5:
        neg_idx = random.randint(0, len(self.instruction_pool) - 1)
        while neg_idx == idx:
            neg_idx = random.randint(0, len(self.instruction_pool) - 1)
        instruction = self.instruction_pool[neg_idx]
        is_matched = 0
    # MLM: tokenize and mask whichever instruction was selected
    input_ids, mlm_labels = self.create_mlm_example(instruction)
    return {
        'input_ids': input_ids,
        'visual_features': visual_features,
        'mlm_labels': mlm_labels,
        'is_matched': is_matched,
        'action_labels': self.get_action_label(item)
    }
def create_mlm_example(self, text):
"""创建MLM训练样本"""
tokens = self.tokenizer.tokenize(text)
input_ids = self.tokenizer.convert_tokens_to_ids(tokens)
mlm_labels = [-100] * len(input_ids)
# 15%概率遮挡
for i in range(len(input_ids)):
if random.random() < 0.15:
mlm_labels[i] = input_ids[i]
rand = random.random()
if rand < 0.8:
input_ids[i] = self.tokenizer.mask_token_id
elif rand < 0.9:
input_ids[i] = random.randint(0, len(self.tokenizer) - 1)
return torch.tensor(input_ids), torch.tensor(mlm_labels)
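The 80/10/10 masking rule used by `create_mlm_example` can be checked in isolation without a tokenizer. The following is a minimal stdlib sketch: `MASK_ID` and `VOCAB_SIZE` are assumed values (they match BERT-base uncased, but any tokenizer's values would do), and `mask_tokens` is a hypothetical stand-in, not the dataset method itself.

```python
import random

MASK_ID = 103        # assumed [MASK] id
VOCAB_SIZE = 30522   # assumed vocabulary size
IGNORE_INDEX = -100  # label value that cross-entropy skips


def mask_tokens(input_ids, mask_prob=0.15, rng=random):
    """Return (masked_ids, labels) following the 80/10/10 rule."""
    masked = list(input_ids)
    labels = [IGNORE_INDEX] * len(input_ids)
    for i, token in enumerate(input_ids):
        if rng.random() >= mask_prob:
            continue                               # position not selected
        labels[i] = token                          # model must predict the original token
        roll = rng.random()
        if roll < 0.8:
            masked[i] = MASK_ID                    # 80%: replace with [MASK]
        elif roll < 0.9:
            masked[i] = rng.randrange(VOCAB_SIZE)  # 10%: replace with a random token
        # remaining 10%: keep the original token
    return masked, labels


rng = random.Random(0)
ids = list(range(1000, 1200))                      # 200 dummy token ids
masked, labels = mask_tokens(ids, rng=rng)
selected = [i for i, lab in enumerate(labels) if lab != IGNORE_INDEX]
print(f"selected {len(selected)} of {len(ids)} positions")
```

Unselected positions keep both their token and the `-100` label, so they contribute nothing to the MLM loss.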
8. Multi-Task Learning
8.1 A Multi-Task Framework for VLN
class MultiTaskVLN(nn.Module):
"""多任务VLN模型"""
def __init__(self, config):
super().__init__()
# 共享编码器
self.visual_encoder = SharedVisualEncoder(config)
self.lang_encoder = SharedLanguageEncoder(config)
self.cross_encoder = SharedCrossEncoder(config)
# 任务特定头
self.task_heads = nn.ModuleDict({
'r2r': R2RHead(config), # Room-to-Room导航
'rxr': RxRHead(config), # 多语言导航
'reverie': REVERIEHead(config), # 远程物体导航
'soon': SOONHead(config), # 物体定位
})
# 任务路由器
self.task_router = TaskRouter(config)
def forward(self, batch, task_name):
"""
多任务前向传播
Args:
batch: 输入数据
task_name: 任务名称
"""
# 共享编码
visual_features = self.visual_encoder(batch['visual'])
lang_features = self.lang_encoder(batch['input_ids'], batch['attention_mask'])
fused_features = self.cross_encoder(visual_features, lang_features)
# 任务特定处理
task_head = self.task_heads[task_name]
output = task_head(fused_features, batch)
return output
def multi_task_train_step(self, task_batches):
"""
多任务训练步骤
Args:
task_batches: {'r2r': batch1, 'rxr': batch2, ...}
"""
total_loss = 0
task_losses = {}
for task_name, batch in task_batches.items():
output = self.forward(batch, task_name)
task_loss = self.compute_task_loss(output, batch, task_name)
# 任务权重
weight = self.get_task_weight(task_name)
total_loss += weight * task_loss
task_losses[task_name] = task_loss.item()
return total_loss, task_losses
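class R2RHead(nn.Module):
    """Task-specific head for R2R."""
    def __init__(self, config):
        super().__init__()
        self.action_predictor = nn.Linear(config.hidden_dim, 1)
    def forward(self, fused_features, batch):
        # Score each candidate viewpoint.
        # Assumed shapes: fused_features [batch, hidden_dim] (pooled state),
        # candidate_features [batch, num_candidates, hidden_dim]
        candidates = batch['candidate_features']
        scores = self.score_candidates(fused_features, candidates)
        return {'action_scores': scores}
    def score_candidates(self, fused_features, candidates):
        # One simple choice: gate each candidate by the state, then project to a scalar
        interaction = fused_features.unsqueeze(1) * candidates
        return self.action_predictor(interaction).squeeze(-1)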
class REVERIEHead(nn.Module):
"""REVERIE任务特定头:导航 + 物体定位"""
def __init__(self, config):
super().__init__()
# 导航头
self.nav_head = nn.Linear(config.hidden_dim, 1)
# 物体定位头
self.obj_head = nn.Sequential(
nn.Linear(config.hidden_dim, config.hidden_dim),
nn.ReLU(),
nn.Linear(config.hidden_dim, 4) # bbox坐标
)
def forward(self, fused_features, batch):
nav_scores = self.nav_head(fused_features)
obj_bbox = self.obj_head(fused_features)
return {
'action_scores': nav_scores,
'object_bbox': obj_bbox
}
8.2 Designing Auxiliary Tasks
class VLNWithAuxiliaryTasks(nn.Module):
"""带辅助任务的VLN模型"""
def __init__(self, config):
super().__init__()
self.main_model = VLNAgent(config)
# 辅助任务
self.progress_estimator = ProgressEstimator(config)
self.instruction_decoder = InstructionDecoder(config)
self.viewpoint_classifier = ViewpointClassifier(config)
def forward(self, batch):
# 主任务
main_output = self.main_model(batch)
# 辅助任务
aux_outputs = {}
# 1. 进度估计:预测完成了多少比例的路径
progress = self.progress_estimator(main_output['state'])
aux_outputs['progress'] = progress
# 2. 指令重构:从状态重构输入指令
reconstructed = self.instruction_decoder(main_output['state'])
aux_outputs['reconstructed_instruction'] = reconstructed
# 3. Viewpoint分类:预测当前位置类型(走廊、房间等)
vp_type = self.viewpoint_classifier(main_output['state'])
aux_outputs['viewpoint_type'] = vp_type
return main_output, aux_outputs
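def compute_total_loss(self, main_output, aux_outputs, batch):
    """Compute the total loss."""
    # Main task loss
    main_loss = F.cross_entropy(
        main_output['action_scores'],
        batch['target_action']
    )
    # Auxiliary losses
    # Progress: drop the trailing size-1 dim so prediction and label shapes match
    progress_loss = F.mse_loss(
        aux_outputs['progress'].squeeze(-1),
        batch['progress_label'].float()
    )
    # Reconstruction: read the vocabulary size from the logits and compare only
    # the positions both tensors cover (the decoder emits a fixed max_len)
    recon_logits = aux_outputs['reconstructed_instruction']
    seq_len = min(recon_logits.size(1), batch['input_ids'].size(1))
    reconstruction_loss = F.cross_entropy(
        recon_logits[:, :seq_len].reshape(-1, recon_logits.size(-1)),
        batch['input_ids'][:, :seq_len].reshape(-1)
    )
    vp_loss = F.cross_entropy(
        aux_outputs['viewpoint_type'],
        batch['viewpoint_label']
    )
    # Weighted sum
    total_loss = (
        main_loss +
        0.1 * progress_loss +
        0.1 * reconstruction_loss +
        0.05 * vp_loss
    )
    return total_loss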
class ProgressEstimator(nn.Module):
"""进度估计模块"""
def __init__(self, config):
super().__init__()
self.estimator = nn.Sequential(
nn.Linear(config.hidden_dim, config.hidden_dim // 2),
nn.ReLU(),
nn.Linear(config.hidden_dim // 2, 1),
nn.Sigmoid() # 输出0-1之间的进度值
)
def forward(self, state):
return self.estimator(state)
class InstructionDecoder(nn.Module):
"""指令重构解码器"""
def __init__(self, config):
super().__init__()
self.decoder = nn.LSTM(
config.hidden_dim,
config.hidden_dim,
batch_first=True
)
self.output_proj = nn.Linear(config.hidden_dim, config.vocab_size)
def forward(self, state, max_len=80):
"""从状态重构指令"""
batch_size = state.size(0)
# 初始化
hidden = (state.unsqueeze(0), torch.zeros_like(state.unsqueeze(0)))
input_feat = state.unsqueeze(1)
outputs = []
for _ in range(max_len):
output, hidden = self.decoder(input_feat, hidden)
logits = self.output_proj(output)
outputs.append(logits)
input_feat = output
return torch.cat(outputs, dim=1)
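The progress estimator is trained against `batch['progress_label']`, but the text does not say how that label is built. One plausible construction, shown here purely as an assumption, is the fraction of ground-truth path edges already traversed (a distance-based ratio would be another option). `progress_labels` is a hypothetical helper.

```python
def progress_labels(path):
    """Return one progress value in [0, 1] per viewpoint on a ground-truth path.

    Assumption: progress is the fraction of path edges already traversed.
    """
    num_edges = len(path) - 1
    if num_edges <= 0:
        return [1.0] * len(path)  # a single-node path is already complete
    return [step / num_edges for step in range(len(path))]


labels = progress_labels(["vp_a", "vp_b", "vp_c", "vp_d", "vp_e"])
print(labels)  # [0.0, 0.25, 0.5, 0.75, 1.0]
```

Because the targets lie in [0, 1], they match the range of the sigmoid output of `ProgressEstimator`.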
9. PyTorch Implementation
9.1 A Complete VLN Agent
class VLNAgent(nn.Module):
"""完整的VLN Agent实现"""
def __init__(self, config):
super().__init__()
self.config = config
# 编码器
self.lang_encoder = BERTLanguageEncoder(
bert_model=config.bert_model,
finetune=config.finetune_bert
)
self.visual_encoder = nn.Sequential(
nn.Linear(config.visual_dim, config.hidden_dim),
nn.ReLU(),
nn.Dropout(config.dropout)
)
# 角度编码
self.angle_encoder = nn.Linear(128, config.hidden_dim)
# 跨模态注意力
self.cross_attention = CrossModalAttention(
visual_dim=config.hidden_dim,
lang_dim=config.hidden_dim,
hidden_dim=config.hidden_dim
)
# 历史编码(可选)
if config.use_history:
self.history_encoder = nn.GRU(
config.hidden_dim,
config.hidden_dim,
batch_first=True
)
# 动作预测
self.action_predictor = nn.Sequential(
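nn.Linear(config.hidden_dim * 3, config.hidden_dim),  # input = state (2 x hidden_dim) + candidate (hidden_dim); assumes lang_global has hidden_dim channels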
nn.ReLU(),
nn.Dropout(config.dropout),
nn.Linear(config.hidden_dim, 1)
)
# 停止预测
self.stop_predictor = nn.Linear(config.hidden_dim, 2)
def forward(self, batch, mode='train'):
"""
Args:
batch: 包含以下字段的字典
- input_ids: [batch, seq_len]
- attention_mask: [batch, seq_len]
- visual_features: [batch, num_views, visual_dim]
- angle_features: [batch, num_views, 128]
- candidate_features: [batch, num_candidates, visual_dim]
- candidate_angles: [batch, num_candidates, 128]
- candidate_mask: [batch, num_candidates]
"""
# 1. 语言编码
lang_features, lang_global = self.lang_encoder(
batch['input_ids'],
batch['attention_mask']
)
# 2. 视觉编码
visual_features = self.visual_encoder(batch['visual_features'])
angle_features = self.angle_encoder(batch['angle_features'])
visual_features = visual_features + angle_features
# 3. 跨模态融合
fused_visual, fused_lang = self.cross_attention(
visual_features,
lang_features,
batch['attention_mask'].bool()
)
# 4. 全局表示
visual_global = fused_visual.mean(dim=1)
# 5. 候选viewpoint编码
candidate_features = self.visual_encoder(batch['candidate_features'])
candidate_angles = self.angle_encoder(batch['candidate_angles'])
candidate_features = candidate_features + candidate_angles
# 6. 动作分数计算
state = torch.cat([visual_global, lang_global], dim=-1)
state_expanded = state.unsqueeze(1).expand(-1, candidate_features.size(1), -1)
combined = torch.cat([state_expanded, candidate_features], dim=-1)
action_scores = self.action_predictor(combined).squeeze(-1)
# 应用mask
if 'candidate_mask' in batch:
action_scores = action_scores.masked_fill(
~batch['candidate_mask'],
float('-inf')
)
# 7. 停止预测
stop_logits = self.stop_predictor(visual_global)
return {
'action_scores': action_scores,
'stop_logits': stop_logits,
'state': state
}
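Filling padded candidate slots with `-inf` before the softmax is what guarantees that they receive exactly zero probability. The sketch below reproduces that effect on three scores in plain Python; `masked_softmax` is a hypothetical helper, and it assumes at least one candidate is valid (an all-masked row would produce NaN).

```python
import math


def masked_softmax(scores, mask):
    """Softmax over candidates where mask[i] is True; padded slots get probability 0."""
    masked = [s if keep else float("-inf") for s, keep in zip(scores, mask)]
    top = max(masked)                           # subtract the max for numerical stability
    exps = [math.exp(s - top) for s in masked]  # exp(-inf) evaluates to 0.0
    total = sum(exps)
    return [e / total for e in exps]


probs = masked_softmax([2.0, 1.0, 5.0], [True, True, False])
print([round(p, 4) for p in probs])  # [0.7311, 0.2689, 0.0]
```

The third candidate has the highest raw score, yet it can never be selected once it is masked.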
9.2 Training Loop
class VLNTrainer:
"""VLN训练器"""
def __init__(self, agent, optimizer, config):
self.agent = agent
self.optimizer = optimizer
self.config = config
self.action_criterion = nn.CrossEntropyLoss(ignore_index=-1)
self.stop_criterion = nn.CrossEntropyLoss()
def train_epoch(self, dataloader, env):
"""训练一个epoch"""
self.agent.train()
total_loss = 0
for batch_idx, batch in enumerate(dataloader):
# 移动到GPU
batch = {k: v.cuda() if torch.is_tensor(v) else v
for k, v in batch.items()}
# 初始化环境
env.reset(batch)
episode_loss = 0
done = False
step = 0
while not done and step < self.config.max_steps:
# 获取当前观察
obs = env.get_observation()
batch.update(obs)
# 前向传播
outputs = self.agent(batch, mode='train')
# 计算损失
# Teacher forcing: 使用真实动作
target_action = batch['target_action']
action_loss = self.action_criterion(
outputs['action_scores'],
target_action
)
target_stop = batch['target_stop']
stop_loss = self.stop_criterion(
outputs['stop_logits'],
target_stop
)
step_loss = action_loss + self.config.stop_weight * stop_loss
episode_loss += step_loss
# 执行动作(teacher forcing)
env.step(target_action)
# 检查是否结束
done = env.is_done()
step += 1
# 反向传播
self.optimizer.zero_grad()
episode_loss.backward()
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(
self.agent.parameters(),
self.config.max_grad_norm
)
self.optimizer.step()
total_loss += episode_loss.item()
if batch_idx % self.config.log_interval == 0:
print(f"Batch {batch_idx}, Loss: {episode_loss.item():.4f}")
return total_loss / len(dataloader)
def evaluate(self, dataloader, env):
"""评估"""
self.agent.eval()
all_results = []
with torch.no_grad():
for batch in dataloader:
batch = {k: v.cuda() if torch.is_tensor(v) else v
for k, v in batch.items()}
env.reset(batch)
trajectory = []
done = False
step = 0
while not done and step < self.config.max_steps:
obs = env.get_observation()
batch.update(obs)
outputs = self.agent(batch, mode='eval')
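# Greedy action selection
action = outputs['action_scores'].argmax(dim=-1)
# Override with STOP (index 0) wherever the stop head fires; torch.where keeps
# the result on the same device as the scores
stop_pred = outputs['stop_logits'].argmax(dim=-1)
action = torch.where(stop_pred == 1, torch.zeros_like(action), action)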
env.step(action)
trajectory.append(action.item())
done = env.is_done()
step += 1
# 计算指标
result = self.compute_metrics(
trajectory,
batch['path'],
env.get_final_position()
)
all_results.append(result)
# 聚合结果
return self.aggregate_metrics(all_results)
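`compute_metrics` and `aggregate_metrics` are called above but not defined. A minimal sketch of the three standard per-episode metrics (NE, SR, SPL) might look as follows. It assumes distances are already available in metres and uses the 3 m success radius that is customary for R2R; the function and argument names are hypothetical.

```python
def navigation_metrics(final_dist, path_len, shortest_len, success_radius=3.0):
    """Compute per-episode NE, SR and SPL.

    final_dist:   distance (m) from the stopping position to the goal
    path_len:     length (m) of the path the agent actually walked
    shortest_len: shortest-path length (m) from start to goal, assumed > 0
    """
    success = 1.0 if final_dist <= success_radius else 0.0
    # SPL weights success by path efficiency: S * L / max(P, L)
    spl = success * shortest_len / max(path_len, shortest_len)
    return {"NE": final_dist, "SR": success, "SPL": spl}


episodes = [
    navigation_metrics(final_dist=1.2, path_len=12.0, shortest_len=10.0),  # success, detour
    navigation_metrics(final_dist=4.5, path_len=9.0, shortest_len=10.0),   # failure
    navigation_metrics(final_dist=0.0, path_len=10.0, shortest_len=10.0),  # optimal success
]
mean = {k: sum(e[k] for e in episodes) / len(episodes) for k in episodes[0]}
print(mean)
```

The first episode succeeds but walks 12 m where 10 m would suffice, so its SPL is 10/12 rather than 1.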
10. Mathematical Foundations
10.1 Mathematical Formulation of Attention
Scaled Dot-Product Attention
$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$
where:
- $Q \in \mathbb{R}^{n \times d_k}$: query matrix
- $K \in \mathbb{R}^{m \times d_k}$: key matrix
- $V \in \mathbb{R}^{m \times d_v}$: value matrix
- $d_k$: key dimension; dividing by $\sqrt{d_k}$ keeps the logits from growing with $d_k$, which would saturate the softmax and make its gradients vanish
import torch
import torch.nn.functional as F
import math
def scaled_dot_product_attention(query, key, value, mask=None):
"""
数学公式的PyTorch实现
Args:
query: [batch, num_heads, seq_len_q, d_k]
key: [batch, num_heads, seq_len_k, d_k]
value: [batch, num_heads, seq_len_k, d_v]
mask: [batch, 1, 1, seq_len_k] or [batch, 1, seq_len_q, seq_len_k]
"""
d_k = query.size(-1)
# QK^T / sqrt(d_k)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
# 应用mask
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
# softmax归一化
attention_weights = F.softmax(scores, dim=-1)
# 加权求和
output = torch.matmul(attention_weights, value)
return output, attention_weights
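The reason for the $\sqrt{d_k}$ factor can be checked numerically: for query and key vectors with independent zero-mean, unit-variance entries, the dot product has variance close to $d_k$, and the scaled score has variance close to 1. The following stdlib sketch estimates both by sampling; `dot_variance` is a hypothetical helper and the Gaussian inputs are an assumption made for the illustration.

```python
import math
import random


def dot_variance(d_k, scale, trials=2000, seed=0):
    """Sample variance of scale * (q . k) for q, k with i.i.d. N(0, 1) entries."""
    rng = random.Random(seed)
    samples = []
    for _ in range(trials):
        q = [rng.gauss(0.0, 1.0) for _ in range(d_k)]
        k = [rng.gauss(0.0, 1.0) for _ in range(d_k)]
        samples.append(scale * sum(a * b for a, b in zip(q, k)))
    mean = sum(samples) / trials
    return sum((s - mean) ** 2 for s in samples) / trials


d_k = 64
raw = dot_variance(d_k, scale=1.0)
scaled = dot_variance(d_k, scale=1.0 / math.sqrt(d_k))
print(f"unscaled variance ~ {raw:.1f}, scaled variance ~ {scaled:.2f}")
```

Without the scaling, logits with a standard deviation of about 8 would push the softmax towards a near one-hot output for $d_k = 64$.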
10.2 Loss Functions for Cross-Modal Alignment
Contrastive Loss
It pulls matched vision-language pairs together and pushes mismatched pairs apart:
$$\mathcal{L}_{contrast} = -\log \frac{\exp(\text{sim}(v_i, l_i) / \tau)}{\sum_{j=1}^{N} \exp(\text{sim}(v_i, l_j) / \tau)}$$
where:
- $v_i$: visual feature
- $l_i$: language feature
- $\tau$: temperature
- $\text{sim}(\cdot, \cdot)$: similarity function (e.g. cosine similarity)
class ContrastiveLoss(nn.Module):
"""跨模态对比学习损失"""
def __init__(self, temperature=0.07):
super().__init__()
self.temperature = temperature
def forward(self, visual_features, lang_features):
"""
Args:
visual_features: [batch, hidden_dim]
lang_features: [batch, hidden_dim]
"""
# L2归一化
visual_features = F.normalize(visual_features, dim=-1)
lang_features = F.normalize(lang_features, dim=-1)
# 计算相似度矩阵
logits = torch.matmul(visual_features, lang_features.T) / self.temperature
# 对角线是正样本
labels = torch.arange(logits.size(0), device=logits.device)
# 双向对比损失
loss_v2l = F.cross_entropy(logits, labels)
loss_l2v = F.cross_entropy(logits.T, labels)
return (loss_v2l + loss_l2v) / 2
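To see how the symmetric loss behaves on concrete numbers, the sketch below evaluates it on hand-written 2×2 similarity matrices in plain Python. `info_nce` is a hypothetical helper that takes the similarity matrix directly (rows are visual features, columns are language features), so the normalization step is assumed to have happened already.

```python
import math


def info_nce(sim, temperature=0.07):
    """Symmetric InfoNCE over a square similarity matrix (positives on the diagonal)."""
    n = len(sim)

    def row_loss(rows):
        total = 0.0
        for i, row in enumerate(rows):
            logits = [s / temperature for s in row]
            top = max(logits)
            log_sum = top + math.log(sum(math.exp(x - top) for x in logits))
            total += log_sum - logits[i]  # -log softmax at the positive index
        return total / n

    transposed = [list(col) for col in zip(*sim)]
    return 0.5 * (row_loss(sim) + row_loss(transposed))


aligned = [[0.9, 0.1], [0.2, 0.8]]   # matched pairs score highest
confused = [[0.1, 0.9], [0.8, 0.2]]  # mismatched pairs score highest
uniform = [[0.5, 0.5], [0.5, 0.5]]   # no preference: loss equals log(2)
print(info_nce(aligned), info_nce(confused), info_nce(uniform))
```

A matrix with no preference at all gives exactly $\log N$, which is a convenient sanity value when monitoring training.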
10.3 Reinforcement Learning Objective
Policy Gradient
Action selection in VLN can be modeled as a sequential decision problem:
$$\nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot R(\tau) \right]$$
REINFORCE with a baseline
$$\nabla_\theta J(\theta) = \mathbb{E} \left[ \sum_{t=0}^{T} \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot (R(\tau) - b) \right]$$
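class REINFORCELoss:
    """REINFORCE policy-gradient loss."""
    def __init__(self, baseline_type='average'):
        self.baseline_type = baseline_type
        self.baseline = 0.0
        self.alpha = 0.9  # exponential moving average (EMA) coefficient
    def compute_loss(self, log_probs, rewards):
        """
        Args:
            log_probs: list of [batch] tensors, log-probability of the action taken at each step
            rewards: [batch] episode returns
        """
        batch_mean = rewards.mean().item()
        if self.baseline_type == 'average':
            # Baseline = mean return of the current batch
            baseline = batch_mean
        else:
            # Baseline = moving average of the means of earlier batches
            baseline = self.baseline
        # Keep the moving average up to date
        self.baseline = self.alpha * self.baseline + (1 - self.alpha) * batch_mean
        # The advantage is a constant weight: no gradient flows through the rewards
        advantages = (rewards - baseline).detach()
        # Policy-gradient loss, averaged over steps
        policy_loss = 0
        for log_prob in log_probs:
            policy_loss = policy_loss - (log_prob * advantages).mean()
        return policy_loss / len(log_probs)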
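A short derivation may help explain why subtracting the baseline $b$ does not bias the gradient estimate. It assumes only that $b$ does not depend on the sampled action $a_t$:

$$
\mathbb{E}_{a_t \sim \pi_\theta(\cdot \mid s_t)}\left[\nabla_\theta \log \pi_\theta(a_t \mid s_t)\, b\right]
= b \sum_{a} \pi_\theta(a \mid s_t)\, \frac{\nabla_\theta \pi_\theta(a \mid s_t)}{\pi_\theta(a \mid s_t)}
= b\, \nabla_\theta \sum_{a} \pi_\theta(a \mid s_t)
= b\, \nabla_\theta 1 = 0
$$

The baseline term therefore vanishes in expectation, so it changes only the variance of the estimator. A baseline close to the expected return keeps the weights $R(\tau) - b$ small, which is why a batch mean or a moving average is a common practical choice.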
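11. Training Techniques and Practical Experience
11.1 Data Augmentation Strategies
Speaker Data Augmentation
A Speaker model generates synthetic instructions for sampled paths, which enlarges the training set: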
class SpeakerAugmentation:
"""基于Speaker的数据增强"""
def __init__(self, speaker_model, env, num_augment=20):
self.speaker = speaker_model
self.env = env
self.num_augment = num_augment
def generate_augmented_data(self, original_data):
augmented = []
for _ in range(self.num_augment):
# 1. 随机采样路径
path = self.env.sample_random_path()
# 2. 提取路径视觉特征
visual_features = self.extract_path_features(path)
# 3. Speaker生成指令
instruction = self.speaker.generate(visual_features)
augmented.append({
'path': path,
'instruction': instruction,
'is_synthetic': True
})
return original_data + augmented
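Environment Dropout (EnvDrop)
EnvDrop randomly masks visual features during training to improve generalization to unseen environments. The module below is a simplified variant that zeroes entire views: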
class EnvironmentDropout(nn.Module):
"""环境Dropout正则化"""
def __init__(self, drop_prob=0.5, feature_drop_prob=0.4):
super().__init__()
self.drop_prob = drop_prob
self.feature_drop_prob = feature_drop_prob
def forward(self, visual_features, training=True):
"""
Args:
visual_features: [batch, num_views, feat_dim]
"""
if not training:
return visual_features
batch_size, num_views, feat_dim = visual_features.shape
# 随机决定是否应用EnvDrop
if torch.rand(1).item() > self.drop_prob:
return visual_features
# 随机遮挡部分视角
view_mask = torch.rand(batch_size, num_views, 1, device=visual_features.device)
view_mask = (view_mask > self.feature_drop_prob).float()
return visual_features * view_mask
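The module above masks whole views. The original EnvDrop (reference [7]) instead drops feature channels, with one mask shared by all views of an environment, so that the environment looks consistently different rather than noisy from view to view. A minimal stdlib sketch of that channel-wise variant is given below; `env_dropout` is a hypothetical helper operating on nested lists, and the rescaling of kept channels follows ordinary inverted dropout.

```python
import random


def env_dropout(views, drop_prob=0.4, rng=random):
    """Zero the same feature channels in every view of one environment.

    views: list of per-view feature lists, all of length feat_dim.
    Kept channels are rescaled by 1 / (1 - drop_prob), as in inverted dropout.
    """
    feat_dim = len(views[0])
    keep = [rng.random() >= drop_prob for _ in range(feat_dim)]  # one mask per environment
    scale = 1.0 / (1.0 - drop_prob)
    return [[x * scale if k else 0.0 for x, k in zip(view, keep)] for view in views]


rng = random.Random(0)
views = [[1.0] * 8, [2.0] * 8, [3.0] * 8]  # 3 views, 8 feature channels
dropped = env_dropout(views, drop_prob=0.5, rng=rng)
zero_channels = [{i for i, x in enumerate(v) if x == 0.0} for v in dropped]
print(zero_channels[0] == zero_channels[1] == zero_channels[2])  # True
```

Drawing a fresh mask per environment (rather than per step) is what distinguishes this from ordinary feature dropout.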
11.2 Learning-Rate Scheduling
def get_vln_scheduler(optimizer, num_training_steps, warmup_ratio=0.1):
"""
VLN常用的学习率调度:
- Warmup阶段线性增长
- 之后余弦衰减
"""
num_warmup_steps = int(num_training_steps * warmup_ratio)
def lr_lambda(current_step):
if current_step < num_warmup_steps:
# 线性warmup
return float(current_step) / float(max(1, num_warmup_steps))
else:
# 余弦衰减
progress = float(current_step - num_warmup_steps) / \
float(max(1, num_training_steps - num_warmup_steps))
return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))
return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
# 使用示例
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
scheduler = get_vln_scheduler(optimizer, num_training_steps=10000, warmup_ratio=0.1)
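To make the shape of the schedule concrete, the sketch below re-implements the multiplier from `lr_lambda` as a standalone function and evaluates it at a few steps. `lr_multiplier` is a hypothetical name; the arithmetic mirrors the closure above.

```python
import math


def lr_multiplier(step, total_steps, warmup_ratio=0.1):
    """Multiplier applied to the base learning rate at a given step."""
    warmup_steps = int(total_steps * warmup_ratio)
    if step < warmup_steps:
        return step / max(1, warmup_steps)                       # linear warmup
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return max(0.0, 0.5 * (1.0 + math.cos(math.pi * progress)))  # cosine decay


for step in (0, 500, 1000, 5500, 10000):
    print(step, round(lr_multiplier(step, total_steps=10000), 4))
# 0 0.0
# 500 0.5
# 1000 1.0
# 5500 0.5
# 10000 0.0
```

With a base learning rate of 1e-4, the effective rate therefore peaks at 1e-4 after 1,000 steps and returns to zero at step 10,000.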
11.3 Mixed Training Strategy
Teacher Forcing + Student Forcing
class MixedTraining:
"""混合训练策略"""
def __init__(self, teacher_forcing_ratio=0.5):
self.tf_ratio = teacher_forcing_ratio
def train_step(self, agent, env, batch, use_sample=False):
"""
Args:
use_sample: True时使用采样动作(Student Forcing)
False时使用真实动作(Teacher Forcing)
"""
total_loss = 0
env.reset(batch)
hidden = agent.init_hidden(batch['batch_size'])
for t in range(batch['max_steps']):
obs = env.get_observation()
# 前向传播
action_logits, hidden = agent(obs, hidden, batch['instructions'])
# 计算损失
loss = F.cross_entropy(action_logits, batch['target_actions'][:, t])
total_loss += loss
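# Choose the action that is actually executed
if use_sample or torch.rand(1).item() > self.tf_ratio:
    # Student forcing: sample from the model's own action distribution
    action = torch.distributions.Categorical(logits=action_logits).sample()
else:
    # Teacher forcing: follow the ground-truth action
    action = batch['target_actions'][:, t]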
# 执行动作
env.step(action)
if env.all_done():
break
return total_loss
# 训练循环中的使用
trainer = MixedTraining(teacher_forcing_ratio=0.5)
for epoch in range(num_epochs):
for batch in dataloader:
# 交替使用两种策略
if epoch % 2 == 0:
loss = trainer.train_step(agent, env, batch, use_sample=False)
else:
loss = trainer.train_step(agent, env, batch, use_sample=True)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(agent.parameters(), max_norm=5.0)
optimizer.step()
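11.4 Experimental Comparison: Effect of the Training Techniques
Ablation results on R2R val_unseen (SR and SPL in %; the gain in each "+" row is relative to the row above):
| Configuration | SR | SPL | Notes |
|---|---|---|---|
| Baseline | 45.2 | 40.1 | No techniques |
| + EnvDrop | 48.7 | 43.2 | +3.5 SR points |
| + Speaker augmentation | 52.1 | 45.8 | +3.4 SR points |
| + Mixed training | 53.4 | 47.1 | +1.3 SR points |
| + LR scheduling | 54.8 | 48.3 | +1.4 SR points |
| All Combined | 58.2 | 51.6 | +13.0 SR points over the baseline |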
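12. Ablation Studies and Analysis
12.1 Model Component Ablation
HAMT component ablation:
| Configuration | SR | SPL | Description |
|---|---|---|---|
| Full HAMT | 66.0 | 61.2 | Complete model |
| - History encoder | 62.3 | 57.1 | History encoding removed |
| - Cross-modal attention | 58.7 | 53.4 | Replaced by simple concatenation |
| - Position encoding | 64.1 | 59.3 | Position encoding removed |
| - Pretraining | 55.2 | 49.8 | Random initialization |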
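def run_ablation_study(model_config, dataset, ablations):
    """Run an ablation study."""
    import copy
    results = {}
    # Full model (training is omitted here; train or load weights before evaluating)
    base_model = VLNAgent(model_config)
    results['full'] = evaluate(base_model, dataset)
    # Individual ablations
    for ablation_name, ablation_config in ablations.items():
        # Copy the config and override the ablated attributes
        # (assumes an attribute-style config, as used by VLNAgent)
        ablated_config = copy.deepcopy(model_config)
        for key, value in ablation_config.items():
            setattr(ablated_config, key, value)
        # Build and evaluate the ablated model (train it first in a real study)
        ablated_model = VLNAgent(ablated_config)
        results[ablation_name] = evaluate(ablated_model, dataset)
    return results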
# 消融配置
ablations = {
'no_history': {'use_history': False},
'no_cross_attention': {'cross_attention_type': 'none'},
'no_position_encoding': {'use_position_encoding': False},
'no_pretrain': {'load_pretrained': False}
}
results = run_ablation_study(config, val_unseen_dataset, ablations)
12.2 Effect of Pretraining
| Pretraining strategy | R2R SR | R2R SPL | RxR SR | Training time |
|---|---|---|---|---|
| No pretraining | 48.3 | 42.1 | 35.2 | 8h |
| ImageNet-pretrained vision | 54.7 | 48.3 | 40.1 | 8h |
| BERT-pretrained language | 58.2 | 51.8 | 44.3 | 10h |
| VLN-specific pretraining | 63.5 | 57.2 | 51.8 | 24h + 10h |
| Large-scale pretraining | 68.1 | 62.3 | 58.2 | 72h + 10h |
Contribution of each pretraining task:
# Contribution of the individual pretraining tasks
pretrain_tasks_ablation = {
'full': {'mlm': True, 'itm': True, 'sap': True},
'no_mlm': {'mlm': False, 'itm': True, 'sap': True},
'no_itm': {'mlm': True, 'itm': False, 'sap': True},
'no_sap': {'mlm': True, 'itm': True, 'sap': False},
'mlm_only': {'mlm': True, 'itm': False, 'sap': False}
}
# Results
"""
| Pretraining tasks | SR | SPL | Analysis |
|-----------|-----|-----|------|
| Full (MLM+ITM+SAP) | 63.5 | 57.2 | Baseline |
| - MLM | 60.2 | 54.1 | Removing MLM costs about 3.3 SR points |
| - ITM | 61.8 | 55.6 | Removing ITM costs about 1.7 SR points |
| - SAP | 58.7 | 52.3 | Removing SAP costs about 4.8 SR points |
| MLM only | 56.4 | 50.2 | A single task is not enough |
"""
12.3 Comparison of Attention Mechanisms
class AttentionComparison:
"""不同注意力机制的对比实验"""
attention_types = {
'soft_attention': SoftAttention,
'multi_head': nn.MultiheadAttention,
'cross_attention': CrossModalAttention,
'co_attention': CoAttention,
'sparse_attention': SparseAttention
}
@staticmethod
def compare_attention_mechanisms():
"""对比不同注意力机制"""
results = {}
for attn_name, attn_class in AttentionComparison.attention_types.items():
model = VLNAgent(attention_type=attn_name)
metrics = evaluate(model)
results[attn_name] = metrics
return results
# Experimental results
"""
| Attention mechanism | SR | SPL | Parameters | Inference speed |
|-----------|-----|-----|-------|---------|
| Soft Attention | 52.3 | 46.1 | 2.1M | 100fps |
| Multi-Head (8h) | 58.7 | 52.3 | 8.4M | 85fps |
| Cross Attention | 61.2 | 55.1 | 12.6M | 72fps |
| Co-Attention | 59.8 | 53.7 | 10.2M | 68fps |
| Sparse Attention | 60.5 | 54.2 | 8.4M | 92fps |
"""
12.4 Comparison of Visual Features
| Feature extractor | SR | SPL | Feature dimension | Extraction speed |
|---|---|---|---|---|
| ResNet-50 | 54.2 | 47.8 | 2048 | 150fps |
| ResNet-152 | 58.7 | 52.3 | 2048 | 80fps |
| ViT-B/16 | 61.3 | 55.1 | 768 | 65fps |
| ViT-L/14 | 64.2 | 58.3 | 1024 | 35fps |
| CLIP ViT-B | 62.8 | 56.7 | 512 | 55fps |
| EVA-CLIP | 66.5 | 60.2 | 768 | 40fps |
12.5 Effect of Path Length
def analyze_path_length_effect(model, dataset):
"""分析路径长度对性能的影响"""
length_bins = [(0, 5), (5, 10), (10, 15), (15, 20), (20, float('inf'))]
results = {}
for min_len, max_len in length_bins:
# 过滤数据
filtered = [
item for item in dataset
if min_len <= len(item['path']) < max_len
]
if not filtered:
continue
# 评估
metrics = evaluate_subset(model, filtered)
results[f'{min_len}-{max_len}'] = metrics
return results
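# Example results
"""
| Path length | Share of data | SR | SPL | NE |
|---------|---------|-----|-----|-----|
| 0-5 steps | 12% | 78.3 | 72.1 | 1.82 |
| 5-10 steps | 48% | 65.2 | 58.7 | 3.45 |
| 10-15 steps | 28% | 52.1 | 45.3 | 5.12 |
| 15-20 steps | 9% | 41.3 | 34.8 | 6.78 |
| 20+ steps | 3% | 28.7 | 22.1 | 8.92 |
Observations:
1. Short paths (<5 steps) perform best, with SR close to 80%
2. Performance drops markedly as path length grows
3. Long paths (>15 steps) remain the main challenge
"""
12.6 Instruction Complexity Analysis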
def analyze_instruction_complexity(model, dataset):
"""分析指令复杂度对性能的影响"""
def compute_complexity(instruction):
"""计算指令复杂度得分"""
words = instruction.split()
# 词数
word_count = len(words)
# 方向词数量
direction_words = ['left', 'right', 'straight', 'turn', 'go']
direction_count = sum(1 for w in words if w.lower().strip('.,;:!?') in direction_words)  # strip punctuation so "left," still counts
# 地标词数量
landmark_count = count_landmarks(instruction)
return {
'word_count': word_count,
'direction_count': direction_count,
'landmark_count': landmark_count,
'complexity_score': word_count * 0.3 + direction_count * 0.4 + landmark_count * 0.3
}
# 按复杂度分组
low_complexity = []
medium_complexity = []
high_complexity = []
for item in dataset:
complexity = compute_complexity(item['instruction'])
score = complexity['complexity_score']
if score < 10:
low_complexity.append(item)
elif score < 20:
medium_complexity.append(item)
else:
high_complexity.append(item)
return {
'low': evaluate_subset(model, low_complexity),
'medium': evaluate_subset(model, medium_complexity),
'high': evaluate_subset(model, high_complexity)
}
# Results
"""
| Complexity | SR | SPL | Most common failure |
|-------|-----|-----|-------------|
| Low | 72.1 | 66.3 | Stopping too early |
| Medium | 61.5 | 55.2 | Wrong direction |
| High | 48.3 | 41.7 | Landmark recognition failure |
"""
12.7 Error Analysis
class ErrorAnalyzer:
"""导航错误分析器"""
ERROR_TYPES = {
'early_stop': '过早停止',
'late_stop': '过晚停止',
'wrong_direction': '方向错误',
'landmark_miss': '地标遗漏',
'backtrack': '回溯失败',
'stuck': '卡住不动'
}
def analyze_errors(self, predictions, ground_truths):
"""分析错误类型分布"""
error_counts = {k: 0 for k in self.ERROR_TYPES}
total_errors = 0
for pred, gt in zip(predictions, ground_truths):
if not self.is_success(pred, gt):
total_errors += 1
error_type = self.classify_error(pred, gt)
error_counts[error_type] += 1
# 计算比例
error_distribution = {
k: v / total_errors if total_errors > 0 else 0
for k, v in error_counts.items()
}
return error_distribution
def classify_error(self, pred, gt):
"""分类错误类型"""
pred_path = pred['trajectory']
gt_path = gt['path']
# 检查是否过早停止
if len(pred_path) < len(gt_path) * 0.5:
return 'early_stop'
# 检查是否过晚停止
if len(pred_path) > len(gt_path) * 1.5:
return 'late_stop'
# 检查方向错误
if self.has_wrong_direction(pred_path, gt_path):
return 'wrong_direction'
# 检查是否卡住
if self.is_stuck(pred_path):
return 'stuck'
return 'landmark_miss' # 默认为地标识别问题
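# Typical error distribution
"""
| Error type | VLNBERT | HAMT | DUET |
|---------|---------|------|------|
| Stopping too early | 28% | 22% | 18% |
| Stopping too late | 15% | 18% | 20% |
| Wrong direction | 32% | 28% | 25% |
| Missed landmark | 18% | 24% | 28% |
| Failed backtracking | 5% | 6% | 7% |
| Stuck in place | 2% | 2% | 2% |
"""
Summary
This article covered the core model architecture of VLN in detail:
Key components
- Encoders: LSTM/BERT for language, ResNet/ViT for vision
- Cross-modal fusion: attention mechanisms and cross-modal Transformers
- Action decoding: LSTM/Transformer decoders that score candidate viewpoints
Evolution of classic models
- Seq2Seq → Speaker-Follower → VLNBERT → HAMT
- From simple attention mechanisms to full Transformer architectures
- From single-step decisions to history-aware sequence modeling
References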
[1] Anderson P, et al. “Vision-and-Language Navigation: Interpreting visually-grounded navigation instructions in real environments.” CVPR 2018.
[2] Fried D, et al. “Speaker-Follower Models for Vision-and-Language Navigation.” NeurIPS 2018.
[3] Hong Y, et al. “VLN BERT: A Recurrent Vision-and-Language BERT for Navigation.” CVPR 2021.
[4] Chen S, et al. “History Aware Multimodal Transformer for Vision-and-Language Navigation.” NeurIPS 2021.
[5] Chen S, et al. “Think Global, Act Local: Dual-scale Graph Transformer for Vision-and-Language Navigation.” CVPR 2022.
[6] An D, et al. “BEVBert: Multimodal Map Pre-training for Language-guided Navigation.” ICCV 2023.
[7] Tan H, et al. “Learning to Navigate Unseen Environments: Back Translation with Environmental Dropout.” NAACL 2019.
[8] Hao W, et al. “Towards Learning a Generic Agent for Vision-and-Language Navigation via Pre-training.” CVPR 2020.
[9] Majumdar A, et al. “Improving Vision-and-Language Navigation with Image-Text Pairs from the Web.” ECCV 2020.
[10] Guhur P L, et al. “Airbert: In-domain Pretraining for Vision-and-Language Navigation.” ICCV 2021.
[11] Vaswani A, et al. “Attention is All You Need.” NeurIPS 2017.
[12] Devlin J, et al. “BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding.” NAACL 2019.
[13] He K, et al. “Deep Residual Learning for Image Recognition.” CVPR 2016.
[14] Dosovitskiy A, et al. “An Image is Worth 16x16 Words: Transformers for Image Recognition at Scale.” ICLR 2021.