CANOE——can日志数据处理
【代码】can日志数据处理。
·
1.常见can日志文件
sample.blf
sampl.asc
2.asc文件转化成blf


3.将blf文件中的信号提取到excle
#python
import os
import pandas as pd
import cantools
from can import BLFReader
#功能:批量处理blf文件,已知DBC和信号所在的message(ID),提取信号并存储到excle
def batch_extract_to_excel(blf_folder, dbc_path, output_excel):
"""
批量处理BLF文件并合并到单个Excel
:param blf_folder: BLF文件所在文件夹路径
:param dbc_path: DBC文件路径
:param output_excel: 输出Excel文件路径
"""
# 检查文件路径是否存在
if not os.path.exists(blf_folder):
print(f"错误: BLF文件夹路径不存在: {blf_folder}")
return
if not os.path.exists(dbc_path):
print(f"错误: DBC文件路径不存在: {dbc_path}")
return
# 加载DBC数据库——信号ID获取——改
try:
db = cantools.database.load_file(dbc_path)
# 获取HAP消息的ID(包含Distance信号)
hap_fd1_id = db.get_message_by_name('HAP').frame_id
# 获取ABM2消息的ID(包含VehLgtAccel和VehLatAccel信号)
abm2_id = db.get_message_by_name('ABM').frame_id
print(f"HAP消息ID: 0x{hap_id:x}")
print(f"ABM消息ID: 0x{abm_id:x}")
except Exception as e:
print(f"加载DBC文件失败: {str(e)}")
return
all_data = []
processed_files = 0
# 获取所有BLF文件
blf_files = [f for f in os.listdir(blf_folder) if f.lower().endswith('.blf')]
if not blf_files:
print("未找到BLF文件")
return
print(f"找到 {len(blf_files)} 个BLF文件")
# 遍历文件夹中的所有BLF文件
for filename in blf_files:
blf_path = os.path.join(blf_folder, filename)
print(f"正在处理文件: {filename}")
try:
with BLFReader(blf_path) as reader:
# 用于存储最近收到的ABM信号值——改
latest_abm2_data = {
'VehLgtAccel': 0.0, # 纵向加速度
'VehLatAccel': 0.0, # 横向加速度
'abm2_timestamp': None
}
file_data_count = 0
hap_fd1_count = 0
abm2_count = 0
for msg in reader:
# 处理ABM消息(包含VehLgtAccel和VehLatAccel)——改
if msg.arbitration_id == abm_id: #打印消息ABM里的信号
try:
decoded = db.decode_message(msg.arbitration_id, msg.data)
latest_abm2_data['VehLgtAccel'] = float(decoded.get('VehLgtAccel', 0.0))
latest_abm2_data['VehLatAccel'] = float(decoded.get('VehLatAccel', 0.0))
latest_abm2_data['abm_timestamp'] = msg.timestamp
abm_count += 1
except Exception as e:
print(f" 解码ABM消息时出错: {str(e)}")
continue
# 处理HAP_FD1消息(包含BrkDistance)
elif msg.arbitration_id == hap_id: #打印消息HAP里的信号
try:
decoded = db.decode_message(msg.arbitration_id, msg.data)
# 只有当有ABM2数据时才记录(确保所有信号都有值)
if latest_abm2_data['abm_timestamp'] is not None:
all_data.append({
'SourceFile': filename,
'Timestamp': msg.timestamp, # 使用HAP的时间戳
'VehLgtAccel': latest_abm2_data['VehLgtAccel'], # 来自ABM2 - 纵向加速度
'VehLatAccel': latest_abm2_data['VehLatAccel'], # 来自ABM2 - 横向加速度
'APS_ESP_BrkDistance': int(decoded.get('BrkDistance', 0)),
# 来自HAP - 制动距离
'ABM2_Timestamp': latest_abm2_data['abm_timestamp'] # 记录ABM的时间戳用于参考
})
file_data_count += 1
hap_fd1_count += 1
except Exception as e:
print(f" 解码HAP消息时出错: {str(e)}")
continue
print(f" 从文件 {filename} 提取了 {file_data_count} 条数据")
print(f" HAP消息数: {hap_count}, ABM消息数: {abm_count}")
processed_files += 1
except Exception as e:
print(f"处理文件 {filename} 时出错: {str(e)}")
continue
# 保存到单个Excel文件
if all_data:
df = pd.DataFrame(all_data)
# 按文件名和时间戳排序
df = df.sort_values(['SourceFile', 'Timestamp']).reset_index(drop=True)
# 添加数据统计信息
print(f"\n=== 数据统计 ===")
print(f"处理文件数: {processed_files}/{len(blf_files)}")
print(f"总数据条数: {len(df)}")
print(f"\n信号数据类型:")
print(f"VehLgtAccel: {df['VehLgtAccel'].dtype}")
print(f"VehLatAccel: {df['VehLatAccel'].dtype}")
print(f"APS_ESP_BrkDistance: {df['APS_ESP_BrkDistance'].dtype}")
print(f"\n信号数值范围:")
print(f"VehLgtAccel: {df['VehLgtAccel'].min():.6f} - {df['VehLgtAccel'].max():.6f}")
print(f"VehLatAccel: {df['VehLatAccel'].min():.6f} - {df['VehLatAccel'].max():.6f}")
print(f"APS_ESP_BrkDistance: {df['BrkDistance'].min()} - {df['BrkDistance'].max()}")
print(f"\n前5条数据样例:")
print(df.head())
try:
# 创建输出目录(如果不存在)
output_dir = os.path.dirname(output_excel)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 保存Excel
with pd.ExcelWriter(output_excel, engine='openpyxl') as writer:
# 主工作表包含所有数据
df.to_excel(writer, sheet_name='All_Data', index=False)
# 为每个文件创建单独的工作表
for filename in df['SourceFile'].unique():
file_df = df[df['SourceFile'] == filename]
sheet_name = os.path.splitext(filename)[0][:30] # 限制工作表名称长度
file_df.to_excel(writer, sheet_name=sheet_name, index=False)
# 添加统计信息工作表
stats_data = {
'统计项': [
'处理文件数', '总数据条数',
'VehLgtAccel最小值', 'VehLgtAccel最大值',
'VehLatAccel最小值', 'VehLatAccel最大值',
'BrkDistance最小值', 'BrkDistance最大值'
],
'数值': [
processed_files, len(df),
df['VehLgtAccel'].min(), df['VehLgtAccel'].max(),
df['VehLatAccel'].min(), df['VehLatAccel'].max(),
df['BrkDistance'].min(), df['APS_ESP_BrkDistance'].max()
]
}
stats_df = pd.DataFrame(stats_data)
stats_df.to_excel(writer, sheet_name='Statistics', index=False)
print(f"\n所有数据已保存到 {output_excel}")
except PermissionError:
print(f"错误: 无法写入文件 {output_excel},文件可能被其他程序占用")
# 尝试保存为CSV格式作为备选
csv_file = output_excel.replace('.xlsx', '.csv')
df.to_csv(csv_file, index=False)
print(f"数据已保存为CSV格式: {csv_file}")
except Exception as e:
print(f"保存Excel文件时出错: {str(e)}")
else:
print("没有提取到有效数据")
# 调试函数:检查DBC文件中的信号定义
def check_dbc_signals(dbc_path):
"""检查DBC文件中的信号定义"""
try:
db = cantools.database.load_file(dbc_path)
print("=== DBC文件信号检查 ===")
# 检查HAP_FD1消息
print("\nHAP消息信号:")
hap_fd1_msg = db.get_message_by_name('HAP')
for signal in hap_fd1_msg.signals:
if signal.name == 'BrkDistance':
print(f" *** {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}' ***")
else:
print(f" {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}'")
# 检查ABM2消息
print("\nABM2消息信号:")
abm2_msg = db.get_message_by_name('ABM')
for signal in abm2_msg.signals:
if signal.name in ['VehLgtAccel', 'VehLatAccel']:
print(f" *** {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}' ***")
else:
print(f" {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}'")
# 检查消息发送周期(如果DBC中有定义)
print(f"\n消息周期信息:")
print(f"HAP_FD1周期: {getattr(hap_msg, 'cycle_time', '未定义')} ms")
print(f"ABM2周期: {getattr(abm_msg, 'cycle_time', '未定义')} ms")
except Exception as e:
print(f"检查DBC信号时出错: {str(e)}")
if __name__ == "__main__":
# 配置路径
blf_folder = r"C:\sample_blf"# 替换为BLF文件夹路径
dbc_file = r"C:\sample.dbc"# 替换为DBC文件路径
output_file = r"C:\acc3.xlsx" # 输出Excel路径
# 首先检查DBC文件信号定义
check_dbc_signals(dbc_file)
print("\n" + "=" * 50)
print("开始批量提取数据...")
print("=" * 50)
# 执行批量提取
batch_extract_to_excel(blf_folder, dbc_file, output_file)
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)