NI USB-4431数据采集卡的数据采集与保存方法

在这里插入图片描述

代码如下

"""
NI USB-4431数据采集卡 - 采集第一个通道数据并保存到CSV文件
作者: Assistant
日期: 2025-10-21
"""

import nidaqmx
import nidaqmx.system
from nidaqmx.constants import AcquisitionType, Edge, READ_ALL_AVAILABLE
import csv
import datetime
import numpy as np
import time
import os
import wave

def list_devices():
    """列出系统中所有可用的NI设备"""
    print("检测到的NI设备:")
    system = nidaqmx.system.System.local()
    for device in system.devices:
        print(f"  设备名称: {device.name}")
        print(f"  产品类型: {device.product_type}")
        print(f"  序列号: {device.dev_serial_num}")
        print(f"  AI物理通道数: {device.ai_physical_chans}")
        print()

def acquire_and_save_to_csv_and_wav(device_name="Dev1", channel="ai0", 
                                   sample_rate=48000, duration=5.0, 
                                   output_file=None, 
                                   voltage_range=1.0):
    """
    使用NI USB-4431采集卡采集指定通道的数据并保存到CSV和WAV文件
    
    参数:
        device_name (str): 设备名称,默认为"Dev1"
        channel (str): 要采集的通道,默认为"ai0"(第一个通道)
        sample_rate (float): 采样率,单位Hz,默认为48000Hz
        duration (float): 采集时长,单位秒,默认为5秒
        output_file (str): 输出CSV文件路径,如果为None则自动生成
        voltage_range (float): 电压范围,单位V,默认为±1V
    """
    
    # 如果没有指定输出文件名,则自动生成
    if output_file is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        output_file = f"usb4431_ch{channel[-1]}_data_{timestamp}.csv"
    
    # 生成WAV文件名
    wav_file = output_file.replace('.csv', '.wav')
    
    # 计算需要采集的样本数
    samples_per_channel = int(sample_rate * duration)
    
    print(f"开始采集数据...")
    print(f"设备: {device_name}")
    print(f"通道: {channel}")
    print(f"采样率: {sample_rate} Hz")
    print(f"采集时长: {duration} 秒")
    print(f"电压范围: ±{voltage_range} V")
    print(f"总样本数: {samples_per_channel}")
    print(f"输出CSV文件: {output_file}")
    print(f"输出WAV文件: {wav_file}")
    print("-" * 50)
    
    try:
        # 创建任务
        with nidaqmx.Task() as task:
            # 添加模拟输入电压通道
            # USB-4431支持AC/DC耦合,这里使用默认设置
            task.ai_channels.add_ai_voltage_chan(
                f"{device_name}/{channel}",
                min_val=-voltage_range,
                max_val=voltage_range
            )
            
            # 配置采样时钟
            task.timing.cfg_samp_clk_timing(
                rate=sample_rate,
                sample_mode=AcquisitionType.FINITE,
                samps_per_chan=samples_per_channel
            )
            
            # 开始采集
            start_time = time.time()
            data = task.read(number_of_samples_per_channel=samples_per_channel)
            end_time = time.time()
            
            print(f"数据采集完成,耗时: {end_time - start_time:.2f} 秒")
            print(f"采集到 {len(data)} 个数据点")
            print(f"原始数据范围: [{np.min(data):.6f}, {np.max(data):.6f}] V")
            
            # 保存到CSV文件
            save_to_csv(data, sample_rate, output_file, voltage_range)
            
            # 转换并保存到WAV文件
            save_to_wav(data, sample_rate, wav_file)
            
            print(f"数据已保存到: {output_file}")
            print(f"WAV已保存到: {wav_file}")
            return output_file, wav_file
            
    except nidaqmx.DaqError as e:
        print(f"NI-DAQmx错误: {e}")
        return None, None
    except Exception as e:
        print(f"发生错误: {e}")
        return None, None

def save_to_csv(data, sample_rate, filename, voltage_range):
    """
    将采集到的数据保存到CSV文件
    
    参数:
        data (list): 采集到的数据点列表
        sample_rate (float): 采样率
        filename (str): 输出文件名
        voltage_range (float): 电压范围
    """
    
    # 计算时间戳
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        
        # 写入元数据
        writer.writerow(['Data Acquisition Information'])
        writer.writerow(['Device', 'NI USB-4431'])
        writer.writerow(['Timestamp', timestamp])
        writer.writerow(['Sampling', str(int(sample_rate))])
        writer.writerow(['Voltage Range', f"±{voltage_range}V"])
        writer.writerow(['Channel', 'CH1'])
        writer.writerow([])  # 空行
        
        # 写入列标题
        writer.writerow(['Time(s)', 'CH1(V)'])
        
        # 写入数据
        dt = 1.0 / sample_rate  # 时间间隔
        for i, value in enumerate(data):
            time_point = i * dt
            writer.writerow([f"{time_point:.6f}", f"{value:.6f}"])

def save_to_wav(data, sample_rate, filename, bit_depth=24):
    """
    将采集到的数据转换并保存为WAV文件
    
    参数:
        data (list): 采集到的数据点列表
        sample_rate (float): 采样率
        filename (str): 输出WAV文件名
        bit_depth (int): 位深度,默认为24位
    """
    
    # 将数据转换为numpy数组
    audio_data = np.array(data, dtype=np.float32)
    
    # 确保数据在±1范围内(已经是±1V范围,对应WAV的±1振幅)
    # 数据已经是正确的范围,不需要额外缩放
    
    print(f"使用采样率: {sample_rate} Hz")
    print(f"输出位深度: {bit_depth}位")
    print(f"WAV数据范围: [{np.min(audio_data):.6f}, {np.max(audio_data):.6f}]")
    
    if bit_depth == 16:
        # 转换为16位整数格式 (-32768 到 32767)
        audio_data_int = (audio_data * 32767).astype(np.int16)
        sampwidth = 2  # 2字节 = 16位
    elif bit_depth == 24:
        # 转换为24位整数格式 (-8388608 到 8388607)
        audio_data_int = (audio_data * 8388607).astype(np.int32)
        sampwidth = 3  # 3字节 = 24位
    elif bit_depth == 32:
        # 转换为32位浮点格式 (-1.0 到 1.0)
        audio_data_int = audio_data.astype(np.float32)
        sampwidth = 4  # 4字节 = 32位浮点
    else:
        raise ValueError("只支持16位、24位或32位深度")
    
    # 创建WAV文件
    with wave.open(filename, 'w') as wav_file:
        wav_file.setnchannels(1)  # 单声道
        wav_file.setsampwidth(sampwidth)
        wav_file.setframerate(sample_rate)
        
        if bit_depth == 16:
            wav_file.writeframes(audio_data_int.tobytes())
        elif bit_depth == 24:
            frames = audio_data_int.tobytes()
            # 只取每个32位整数的低24位(3个字节)
            frames_24bit = bytearray()
            for i in range(0, len(frames), 4):
                # 取低3个字节(24位)
                frames_24bit.extend(frames[i:i+3])
            wav_file.writeframes(bytes(frames_24bit))
        elif bit_depth == 32:
            wav_file.writeframes(audio_data_int.tobytes())
    
    print(f"成功转换WAV: {filename}")
    print(f"采样率: {sample_rate} Hz")
    print(f"数据点数: {len(audio_data)}")
    print(f"时长: {len(audio_data) / sample_rate:.2f} 秒")

def continuous_acquisition(device_name="Dev1", channel="ai0", 
                          sample_rate=48000, duration=5.0, 
                          num_cycles=3, voltage_range=1.0):
    """
    连续多次采集数据并同时生成CSV和WAV文件
    
    参数:
        device_name (str): 设备名称
        channel (str): 通道名称
        sample_rate (float): 采样率
        duration (float): 每次采集时长
        num_cycles (int): 采集次数
        voltage_range (float): 电压范围
    """
    
    print(f"开始连续采集 {num_cycles} 次数据...")
    print("=" * 60)
    
    output_files = []
    
    for cycle in range(num_cycles):
        print(f"\n第 {cycle + 1}/{num_cycles} 次采集:")
        print("-" * 30)
        
        # 生成文件名
        timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        output_file = f"usb4431_ch{channel[-1]}_data_{timestamp}_cycle{cycle+1}.csv"
        
        # 采集数据并生成CSV和WAV文件
        csv_file, wav_file = acquire_and_save_to_csv_and_wav(
            device_name=device_name,
            channel=channel,
            sample_rate=sample_rate,
            duration=duration,
            output_file=output_file,
            voltage_range=voltage_range
        )
        
        if csv_file and wav_file:
            output_files.append((csv_file, wav_file))
            print(f"第 {cycle + 1} 次采集完成")
        else:
            print(f"第 {cycle + 1} 次采集失败")
        
        # 在采集之间稍作延迟(除了最后一次)
        if cycle < num_cycles - 1:
            time.sleep(1)
    
    print("\n" + "=" * 60)
    print(f"连续采集完成,共生成 {len(output_files)} 对文件:")
    for csv_file, wav_file in output_files:
        print(f"  - {csv_file}")
        print(f"  - {wav_file}")

def main():
    """主函数"""
    print("NI USB-4431 数据采集与CSV/WAV保存工具")
    print("=" * 50)
    
    # 首先列出系统中的设备
    try:
        list_devices()
    except Exception as e:
        print(f"无法列出设备: {e}")
        print("请确保已正确安装NI-DAQmx驱动和nidaqmx Python包")
        print("安装命令: pip install nidaqmx")
        return
    
    # 示例1: 单次采集
    print("\n示例1: 单次采集5秒数据")
    print("-" * 30)
    acquire_and_save_to_csv_and_wav(
        device_name="Dev1",      # 根据实际情况修改设备名
        channel="ai0",           # 第一个通道
        sample_rate=48000,       # 48 kHz采样率
        duration=10.0,            # 采集5秒
        voltage_range=1.0        # ±1V范围,对应WAV的±1振幅范围
    )
    
    # # 示例2: 连续采集
    # print("\n\n示例2: 连续采集3次数据")
    # print("-" * 30)
    # continuous_acquisition(
    #     device_name="Dev1",      # 根据实际情况修改设备名
    #     channel="ai0",           # 第一个通道
    #     sample_rate=48000,       # 48 kHz采样率
    #     duration=2.0,            # 每次采集2秒
    #     num_cycles=3,            # 采集3次
    #     voltage_range=1.0        # ±1V范围,对应WAV的±1振幅范围
    # )

if __name__ == "__main__":
    main()

设备检测与列表
list_devices()函数可以列出系统中所有可用的NI设备。该函数会显示设备名称、产品类型、序列号和模拟输入通道数,方便用户确认设备连接状态。

数据采集配置
acquire_and_save_to_csv_and_wav()函数是核心功能,支持配置多个参数:设备名称、采集通道、采样率、采集时长、输出电压范围和自定义输出文件名。默认采样率设为48kHz,适合大多数音频采集场景。

数据采集实现
函数内部创建DAQmx任务,配置模拟输入电压通道和采样时钟。使用read()方法执行同步采集,返回采集到的电压数据数组。采集过程中会实时显示进度和参数信息。

数据保存格式
采集数据支持两种格式保存:CSV和WAV。CSV文件包含元数据信息和时间-电压数据对,便于后续分析处理。WAV文件则直接保存为音频格式,方便播放和音频处理软件使用。

CSV文件格式
save_to_csv()函数生成的CSV文件包含完整元数据:设备信息、时间戳、采样率、电压范围和通道信息。数据部分以时间戳和对应电压值的形式存储,时间精度达到微秒级。

WAV文件生成
save_to_wav()函数将电压数据转换为32位浮点格式,符合WAV文件标准。默认使用24位深度保存,确保音频质量。数据范围自动归一化到±1.0范围内,符合WAV格式规范。

错误处理
代码包含完整的异常处理机制,能捕获并显示NI-DAQmx错误和其他异常情况,确保程序稳定运行。

实际应用场景

该代码适合需要高精度音频采集的场合,如:声学测量、振动分析、工业噪声监测等。USB-4431的24位分辨率能准确捕捉微弱信号,48kHz采样率满足大多数音频频段需求。

CSV格式便于导入MATLAB、Excel等工具进行后续分析,而WAV格式可直接用于音频编辑软件。两种格式的结合提供了灵活的数据处理选择。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐