can_plugin_guide.md 4.1 KB

CAN数据处理插件开发指南

1. 概述

本指南将帮助您创建自定义的CAN数据处理插件,以处理特定格式的CAN数据并将其集成到数据处理流程中。

2. 快速开始

2.1 准备工作

  1. 将您的DBC文件放在 resources 目录下
  2. 复制 plugins/can_plugin_template.py 为新文件,例如 my_can_plugin.py

2.2 基本配置

在您的插件文件中修改以下内容:

class MyCanProcessor(CustomDataProcessorPlugin):  # 修改类名
    def __init__(self):
        self.dbc = None
        self.custom_dbc_path = Path("resources/my_custom.dbc")  # 修改为您的DBC文件路径
        self._load_dbc()

2.3 数据文件夹识别

修改 can_handle 方法以匹配您的数据文件夹命名规则:

def can_handle(self, zip_path: Path, folder_name: str) -> bool:
    return folder_name.startswith('my_can_data_')  # 修改为您的文件夹前缀

2.4 定义输出列

get_required_columns 方法中定义您需要的输出列:

def get_required_columns(self) -> Dict[str, Any]:
    return {
        'simTime': float,      # 必需列
        'playerId': int,       # 必需列
        'my_signal_1': float,  # 自定义列
        'my_signal_2': float,  # 自定义列
    }

3. 数据处理配置

3.1 DBC文件要求

  • DBC文件必须包含完整的CAN消息和信号定义
  • 确保信号有正确的比例因子和偏移量
  • 建议包含信号的单位信息

3.2 信号映射示例

_process_can_data 方法中映射CAN信号到输出列:

record = {
    'simTime': timestamp,
    'playerId': 1,
    'my_signal_1': decoded.get('CAN_Signal_Name_1', 0.0),
    'my_signal_2': decoded.get('CAN_Signal_Name_2', 0.0),
}

4. 输入数据格式要求

4.1 ZIP文件结构

your_data.zip/
    my_can_data_folder/      # 您的CAN数据文件夹
        can_data_1.csv       # CAN数据文件
        can_data_2.csv
        ...

4.2 CAN数据文件格式

CSV文件必须包含以下列:

  • timestamp: 时间戳 (秒)
  • can_id: CAN消息ID (十六进制或十进制)
  • data: CAN数据 (十六进制字符串)

示例:

timestamp,can_id,data
1.234,0x123,0123456789ABCDEF
1.235,0x456,FEDCBA9876543210

5. 调试技巧

5.1 验证DBC文件加载

运行插件时会自动打印可用的CAN消息和信号列表,确保:

  • 所有预期的消息ID都列出
  • 信号名称正确

5.2 数据处理验证

  1. 使用小数据集进行测试
  2. 检查输出CSV文件中的数据类型和值范围
  3. 确认时间戳的连续性和合理性

5.3 常见问题解决

  1. DBC文件未找到

    • 检查文件路径是否正确
    • 确认文件在resources目录下
  2. 信号解码失败

    • 验证CAN ID是否匹配
    • 检查数据长度是否符合DBC定义
  3. 数据类型错误

    • 确保所有输出列都有正确的数据类型转换
    • 检查是否处理了所有可能的空值情况

6. 示例

6.1 完整的插件示例

参考 plugins/can_plugin_template.py 中的示例实现。

6.2 自定义数据处理示例

# 添加自定义数据处理逻辑
def _process_can_data(self, can_file: Path) -> pd.DataFrame:
    df = pd.read_csv(can_file)
    processed_data = []
    
    for _, row in df.iterrows():
        decoded = self._decode_can_message(row['can_id'], bytes.fromhex(row['data']))
        if decoded:
            # 添加您的自定义处理逻辑
            speed = decoded.get('Vehicle_Speed', 0) * 3.6  # 转换为km/h
            record = {
                'simTime': row['timestamp'],
                'playerId': 1,
                'vehicle_speed': speed
            }
            processed_data.append(record)
            
    return pd.DataFrame(processed_data)

7. 注意事项

  1. 必需字段

    • 确保输出数据包含 simTimeplayerId
    • 这些字段用于数据合并和同步
  2. 数据质量

    • 处理异常值和无效数据
    • 确保时间戳的单调递增
    • 合理处理丢失的数据点
  3. 性能优化

    • 考虑批量处理大量数据
    • 使用适当的数据结构减少内存使用
    • 添加进度提示以监控长时间运行的处理