1.常见can日志文件

sample.blf
sampl.asc

2.asc文件转化成blf

在这里插入图片描述

在这里插入图片描述

3.将blf文件中的信号提取到excle

#python

import os
import pandas as pd
import cantools
from can import BLFReader

#功能:批量处理blf文件,已知DBC和信号所在的message(ID),提取信号并存储到excle

def batch_extract_to_excel(blf_folder, dbc_path, output_excel):
    """
    批量处理BLF文件并合并到单个Excel
    :param blf_folder: BLF文件所在文件夹路径
    :param dbc_path: DBC文件路径
    :param output_excel: 输出Excel文件路径
    """
    # 检查文件路径是否存在
    if not os.path.exists(blf_folder):
        print(f"错误: BLF文件夹路径不存在: {blf_folder}")
        return

    if not os.path.exists(dbc_path):
        print(f"错误: DBC文件路径不存在: {dbc_path}")
        return

    # 加载DBC数据库——信号ID获取——改
    try:
        db = cantools.database.load_file(dbc_path)

        # 获取HAP消息的ID(包含Distance信号)
        hap_fd1_id = db.get_message_by_name('HAP').frame_id

        # 获取ABM2消息的ID(包含VehLgtAccel和VehLatAccel信号)
        abm2_id = db.get_message_by_name('ABM').frame_id

        print(f"HAP消息ID: 0x{hap_id:x}")
        print(f"ABM消息ID: 0x{abm_id:x}")

    except Exception as e:
        print(f"加载DBC文件失败: {str(e)}")
        return

    all_data = []
    processed_files = 0

    # 获取所有BLF文件
    blf_files = [f for f in os.listdir(blf_folder) if f.lower().endswith('.blf')]

    if not blf_files:
        print("未找到BLF文件")
        return

    print(f"找到 {len(blf_files)} 个BLF文件")

    # 遍历文件夹中的所有BLF文件
    for filename in blf_files:
        blf_path = os.path.join(blf_folder, filename)
        print(f"正在处理文件: {filename}")

        try:
            with BLFReader(blf_path) as reader:
                # 用于存储最近收到的ABM信号值——改
                latest_abm2_data = {
                    'VehLgtAccel': 0.0,  # 纵向加速度
                    'VehLatAccel': 0.0,  # 横向加速度
                    'abm2_timestamp': None
                }

                file_data_count = 0
                hap_fd1_count = 0
                abm2_count = 0

                for msg in reader:
                    # 处理ABM消息(包含VehLgtAccel和VehLatAccel)——改
                    if msg.arbitration_id == abm_id:    #打印消息ABM里的信号
                        try:
                            decoded = db.decode_message(msg.arbitration_id, msg.data)
                            latest_abm2_data['VehLgtAccel'] = float(decoded.get('VehLgtAccel', 0.0))
                            latest_abm2_data['VehLatAccel'] = float(decoded.get('VehLatAccel', 0.0))
                            latest_abm2_data['abm_timestamp'] = msg.timestamp
                            abm_count += 1
                        except Exception as e:
                            print(f"  解码ABM消息时出错: {str(e)}")
                            continue

                    # 处理HAP_FD1消息(包含BrkDistance)
                    elif msg.arbitration_id == hap_id:   #打印消息HAP里的信号
                        try:
                            decoded = db.decode_message(msg.arbitration_id, msg.data)

                            # 只有当有ABM2数据时才记录(确保所有信号都有值)
                            if latest_abm2_data['abm_timestamp'] is not None:
                                all_data.append({
                                    'SourceFile': filename,
                                    'Timestamp': msg.timestamp,  # 使用HAP的时间戳
                                    'VehLgtAccel': latest_abm2_data['VehLgtAccel'],  # 来自ABM2 - 纵向加速度
                                    'VehLatAccel': latest_abm2_data['VehLatAccel'],  # 来自ABM2 - 横向加速度
                                    'APS_ESP_BrkDistance': int(decoded.get('BrkDistance', 0)),
                                    # 来自HAP - 制动距离
                                    'ABM2_Timestamp': latest_abm2_data['abm_timestamp']  # 记录ABM的时间戳用于参考
                                })
                                file_data_count += 1

                            hap_fd1_count += 1

                        except Exception as e:
                            print(f"  解码HAP消息时出错: {str(e)}")
                            continue

                print(f"  从文件 {filename} 提取了 {file_data_count} 条数据")
                print(f"  HAP消息数: {hap_count}, ABM消息数: {abm_count}")
                processed_files += 1

        except Exception as e:
            print(f"处理文件 {filename} 时出错: {str(e)}")
            continue

    # 保存到单个Excel文件
    if all_data:
        df = pd.DataFrame(all_data)

        # 按文件名和时间戳排序
        df = df.sort_values(['SourceFile', 'Timestamp']).reset_index(drop=True)

        # 添加数据统计信息
        print(f"\n=== 数据统计 ===")
        print(f"处理文件数: {processed_files}/{len(blf_files)}")
        print(f"总数据条数: {len(df)}")
        print(f"\n信号数据类型:")
        print(f"VehLgtAccel: {df['VehLgtAccel'].dtype}")
        print(f"VehLatAccel: {df['VehLatAccel'].dtype}")
        print(f"APS_ESP_BrkDistance: {df['APS_ESP_BrkDistance'].dtype}")

        print(f"\n信号数值范围:")
        print(f"VehLgtAccel: {df['VehLgtAccel'].min():.6f} - {df['VehLgtAccel'].max():.6f}")
        print(f"VehLatAccel: {df['VehLatAccel'].min():.6f} - {df['VehLatAccel'].max():.6f}")
        print(f"APS_ESP_BrkDistance: {df['BrkDistance'].min()} - {df['BrkDistance'].max()}")

        print(f"\n前5条数据样例:")
        print(df.head())

        try:
            # 创建输出目录(如果不存在)
            output_dir = os.path.dirname(output_excel)
            if output_dir and not os.path.exists(output_dir):
                os.makedirs(output_dir)

            # 保存Excel
            with pd.ExcelWriter(output_excel, engine='openpyxl') as writer:
                # 主工作表包含所有数据
                df.to_excel(writer, sheet_name='All_Data', index=False)

                # 为每个文件创建单独的工作表
                for filename in df['SourceFile'].unique():
                    file_df = df[df['SourceFile'] == filename]
                    sheet_name = os.path.splitext(filename)[0][:30]  # 限制工作表名称长度
                    file_df.to_excel(writer, sheet_name=sheet_name, index=False)

                # 添加统计信息工作表
                stats_data = {
                    '统计项': [
                        '处理文件数', '总数据条数',
                        'VehLgtAccel最小值', 'VehLgtAccel最大值',
                        'VehLatAccel最小值', 'VehLatAccel最大值',
                        'BrkDistance最小值', 'BrkDistance最大值'
                    ],
                    '数值': [
                        processed_files, len(df),
                        df['VehLgtAccel'].min(), df['VehLgtAccel'].max(),
                        df['VehLatAccel'].min(), df['VehLatAccel'].max(),
                        df['BrkDistance'].min(), df['APS_ESP_BrkDistance'].max()
                    ]
                }
                stats_df = pd.DataFrame(stats_data)
                stats_df.to_excel(writer, sheet_name='Statistics', index=False)

            print(f"\n所有数据已保存到 {output_excel}")

        except PermissionError:
            print(f"错误: 无法写入文件 {output_excel},文件可能被其他程序占用")
            # 尝试保存为CSV格式作为备选
            csv_file = output_excel.replace('.xlsx', '.csv')
            df.to_csv(csv_file, index=False)
            print(f"数据已保存为CSV格式: {csv_file}")

        except Exception as e:
            print(f"保存Excel文件时出错: {str(e)}")

    else:
        print("没有提取到有效数据")


# 调试函数:检查DBC文件中的信号定义
def check_dbc_signals(dbc_path):
    """检查DBC文件中的信号定义"""
    try:
        db = cantools.database.load_file(dbc_path)

        print("=== DBC文件信号检查 ===")

        # 检查HAP_FD1消息
        print("\nHAP消息信号:")
        hap_fd1_msg = db.get_message_by_name('HAP')
        for signal in hap_fd1_msg.signals:
            if signal.name == 'BrkDistance':
                print(f"  *** {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}' ***")
            else:
                print(f"  {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}'")

        # 检查ABM2消息
        print("\nABM2消息信号:")
        abm2_msg = db.get_message_by_name('ABM')
        for signal in abm2_msg.signals:
            if signal.name in ['VehLgtAccel', 'VehLatAccel']:
                print(f"  *** {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}' ***")
            else:
                print(f"  {signal.name}: scale={signal.scale}, offset={signal.offset}, unit='{signal.unit}'")

        # 检查消息发送周期(如果DBC中有定义)
        print(f"\n消息周期信息:")
        print(f"HAP_FD1周期: {getattr(hap_msg, 'cycle_time', '未定义')} ms")
        print(f"ABM2周期: {getattr(abm_msg, 'cycle_time', '未定义')} ms")

    except Exception as e:
        print(f"检查DBC信号时出错: {str(e)}")


if __name__ == "__main__":
    # 配置路径
    blf_folder = r"C:\sample_blf"# 替换为BLF文件夹路径
    dbc_file = r"C:\sample.dbc"# 替换为DBC文件路径
    output_file = r"C:\acc3.xlsx" # 输出Excel路径

    # 首先检查DBC文件信号定义
    check_dbc_signals(dbc_file)

    print("\n" + "=" * 50)
    print("开始批量提取数据...")
    print("=" * 50)

    # 执行批量提取
    batch_extract_to_excel(blf_folder, dbc_file, output_file)
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐