# CAN数据处理插件开发指南 ## 1. 概述 本指南将帮助您创建自定义的CAN数据处理插件,以处理特定格式的CAN数据并将其集成到数据处理流程中。 ## 2. 快速开始 ### 2.1 准备工作 1. 将您的DBC文件放在 `resources` 目录下 2. 复制 `plugins/can_plugin_template.py` 为新文件,例如 `my_can_plugin.py` ### 2.2 基本配置 在您的插件文件中修改以下内容: ```python class MyCanProcessor(CustomDataProcessorPlugin): # 修改类名 def __init__(self): self.dbc = None self.custom_dbc_path = Path("resources/my_custom.dbc") # 修改为您的DBC文件路径 self._load_dbc() ``` ### 2.3 数据文件夹识别 修改 `can_handle` 方法以匹配您的数据文件夹命名规则: ```python def can_handle(self, zip_path: Path, folder_name: str) -> bool: return folder_name.startswith('my_can_data_') # 修改为您的文件夹前缀 ``` ### 2.4 定义输出列 在 `get_required_columns` 方法中定义您需要的输出列: ```python def get_required_columns(self) -> Dict[str, Any]: return { 'simTime': float, # 必需列 'playerId': int, # 必需列 'my_signal_1': float, # 自定义列 'my_signal_2': float, # 自定义列 } ``` ## 3. 数据处理配置 ### 3.1 DBC文件要求 - DBC文件必须包含完整的CAN消息和信号定义 - 确保信号有正确的比例因子和偏移量 - 建议包含信号的单位信息 ### 3.2 信号映射示例 在 `_process_can_data` 方法中映射CAN信号到输出列: ```python record = { 'simTime': timestamp, 'playerId': 1, 'my_signal_1': decoded.get('CAN_Signal_Name_1', 0.0), 'my_signal_2': decoded.get('CAN_Signal_Name_2', 0.0), } ``` ## 4. 输入数据格式要求 ### 4.1 ZIP文件结构 ``` your_data.zip/ my_can_data_folder/ # 您的CAN数据文件夹 can_data_1.csv # CAN数据文件 can_data_2.csv ... ``` ### 4.2 CAN数据文件格式 CSV文件必须包含以下列: - `timestamp`: 时间戳 (秒) - `can_id`: CAN消息ID (十六进制或十进制) - `data`: CAN数据 (十六进制字符串) 示例: ```csv timestamp,can_id,data 1.234,0x123,0123456789ABCDEF 1.235,0x456,FEDCBA9876543210 ``` ## 5. 调试技巧 ### 5.1 验证DBC文件加载 运行插件时会自动打印可用的CAN消息和信号列表,确保: - 所有预期的消息ID都列出 - 信号名称正确 ### 5.2 数据处理验证 1. 使用小数据集进行测试 2. 检查输出CSV文件中的数据类型和值范围 3. 确认时间戳的连续性和合理性 ### 5.3 常见问题解决 1. DBC文件未找到 - 检查文件路径是否正确 - 确认文件在resources目录下 2. 信号解码失败 - 验证CAN ID是否匹配 - 检查数据长度是否符合DBC定义 3. 数据类型错误 - 确保所有输出列都有正确的数据类型转换 - 检查是否处理了所有可能的空值情况 ## 6. 示例 ### 6.1 完整的插件示例 参考 `plugins/can_plugin_template.py` 中的示例实现。 ### 6.2 自定义数据处理示例 ```python # 添加自定义数据处理逻辑 def _process_can_data(self, can_file: Path) -> pd.DataFrame: df = pd.read_csv(can_file) processed_data = [] for _, row in df.iterrows(): decoded = self._decode_can_message(row['can_id'], bytes.fromhex(row['data'])) if decoded: # 添加您的自定义处理逻辑 speed = decoded.get('Vehicle_Speed', 0) * 3.6 # 转换为km/h record = { 'simTime': row['timestamp'], 'playerId': 1, 'vehicle_speed': speed } processed_data.append(record) return pd.DataFrame(processed_data) ``` ## 7. 注意事项 1. 必需字段 - 确保输出数据包含 `simTime` 和 `playerId` - 这些字段用于数据合并和同步 2. 数据质量 - 处理异常值和无效数据 - 确保时间戳的单调递增 - 合理处理丢失的数据点 3. 性能优化 - 考虑批量处理大量数据 - 使用适当的数据结构减少内存使用 - 添加进度提示以监控长时间运行的处理