本指南将帮助您创建自定义的CAN数据处理插件,以处理特定格式的CAN数据并将其集成到数据处理流程中。
resources
目录下plugins/can_plugin_template.py
为新文件,例如 my_can_plugin.py
在您的插件文件中修改以下内容:
class MyCanProcessor(CustomDataProcessorPlugin): # 修改类名
def __init__(self):
self.dbc = None
self.custom_dbc_path = Path("resources/my_custom.dbc") # 修改为您的DBC文件路径
self._load_dbc()
修改 can_handle
方法以匹配您的数据文件夹命名规则:
def can_handle(self, zip_path: Path, folder_name: str) -> bool:
return folder_name.startswith('my_can_data_') # 修改为您的文件夹前缀
在 get_required_columns
方法中定义您需要的输出列:
def get_required_columns(self) -> Dict[str, Any]:
return {
'simTime': float, # 必需列
'playerId': int, # 必需列
'my_signal_1': float, # 自定义列
'my_signal_2': float, # 自定义列
}
在 _process_can_data
方法中映射CAN信号到输出列:
record = {
'simTime': timestamp,
'playerId': 1,
'my_signal_1': decoded.get('CAN_Signal_Name_1', 0.0),
'my_signal_2': decoded.get('CAN_Signal_Name_2', 0.0),
}
your_data.zip/
my_can_data_folder/ # 您的CAN数据文件夹
can_data_1.csv # CAN数据文件
can_data_2.csv
...
CSV文件必须包含以下列:
timestamp
: 时间戳 (秒)can_id
: CAN消息ID (十六进制或十进制)data
: CAN数据 (十六进制字符串)示例:
timestamp,can_id,data
1.234,0x123,0123456789ABCDEF
1.235,0x456,FEDCBA9876543210
运行插件时会自动打印可用的CAN消息和信号列表,确保:
DBC文件未找到
信号解码失败
数据类型错误
参考 plugins/can_plugin_template.py
中的示例实现。
# 添加自定义数据处理逻辑
def _process_can_data(self, can_file: Path) -> pd.DataFrame:
df = pd.read_csv(can_file)
processed_data = []
for _, row in df.iterrows():
decoded = self._decode_can_message(row['can_id'], bytes.fromhex(row['data']))
if decoded:
# 添加您的自定义处理逻辑
speed = decoded.get('Vehicle_Speed', 0) * 3.6 # 转换为km/h
record = {
'simTime': row['timestamp'],
'playerId': 1,
'vehicle_speed': speed
}
processed_data.append(record)
return pd.DataFrame(processed_data)
必需字段
simTime
和 playerId
数据质量
性能优化