123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- """
- 基于XML配置的雷达数据处理器示例
- 使用XML配置文件来定义数据格式和处理参数
- 示例数据格式(CSV):
- 1234567.89,1,15.6,12.3,30.5,-15.2
- 1234567.90,2,25.1,8.7,-15.3,-12.8
- """
- from pathlib import Path
- import pandas as pd
- from typing import Dict, Any, Optional
- import tempfile
- import zipfile
- import xml.etree.ElementTree as ET
- import numpy as np
- from core.plugin_interface import CustomDataProcessorPlugin
- class RadarDataProcessor(CustomDataProcessorPlugin):
- """处理雷达数据的插件,使用XML配置文件"""
-
- def __init__(self):
- """初始化插件,加载XML配置文件"""
- self.config = None
- self.config_path = Path("resources/radar_config.xml")
- self.field_map = {}
- self.delimiter = ','
- self.processing_params = {}
- self.columns = ['simTime', 'target_id', 'distance', 'velocity', 'angle', 'rcs'] # 移除 frame
- self._load_config()
-
- def _load_config(self):
- """加载并解析XML配置文件"""
- try:
- if not self.config_path.exists():
- print(f"配置文件未找到: {self.config_path}")
- return
-
- tree = ET.parse(self.config_path)
- root = tree.getroot()
-
- # 解析字段配置
- fields = root.find('./DataFormat/Fields')
- if fields is not None:
- for field in fields.findall('Field'):
- name = field.get('name')
- column = int(field.get('column'))
- dtype = field.get('type')
- self.field_map[column] = {
- 'name': name,
- 'type': dtype,
- 'unit': field.get('unit', '')
- }
-
- # 解析分隔符
- delimiter = root.find('./DataFormat/Delimiter')
- if delimiter is not None:
- self.delimiter = delimiter.text
-
- # 解析处理参数
- processing = root.find('./Processing')
- if processing is not None:
- # 解析时间格式
- time_format = processing.find('TimeFormat')
- if time_format is not None:
- self.processing_params['time_format'] = time_format.text
-
- # 解析角度范围
- angle_range = processing.find('AngleRange')
- if angle_range is not None:
- self.processing_params['angle_min'] = float(angle_range.find('Min').text)
- self.processing_params['angle_max'] = float(angle_range.find('Max').text)
-
- # 解析距离范围
- distance_range = processing.find('DistanceRange')
- if distance_range is not None:
- self.processing_params['distance_min'] = float(distance_range.find('Min').text)
- self.processing_params['distance_max'] = float(distance_range.find('Max').text)
-
- # 解析过滤器参数
- filters = processing.find('Filters')
- if filters is not None:
- min_rcs = filters.find('MinRCS')
- if min_rcs is not None:
- self.processing_params['min_rcs'] = float(min_rcs.text)
-
- print(f"成功加载雷达配置文件: {self.config_path}")
-
- except Exception as e:
- print(f"加载配置文件失败: {e}")
- import traceback
- traceback.print_exc()
-
- def can_handle(self, zip_path: Path, folder_name: str) -> bool:
- """检查是否是雷达数据文件夹"""
- # 修改判断逻辑,使其更灵活地匹配雷达数据文件夹
- return 'radar' in folder_name.lower()
-
- def get_required_columns(self) -> Dict[str, Any]:
- """定义输出数据的列和类型"""
- return {
- 'simTime': float, # 必需列
- 'playerId': int, # 必需列
- 'simFrame': int, # 必需列
- 'radar_distance': float, # 雷达距离
- 'radar_velocity': float, # 雷达速度
- 'radar_angle': float, # 雷达角度
- 'radar_rcs': float, # 雷达RCS
- }
-
- def _convert_timestamp(self, timestamp: float) -> float:
- """根据配置转换时间戳格式"""
- time_format = self.processing_params.get('time_format', 'unix_timestamp')
- if time_format == 'unix_timestamp':
- return timestamp
- # 添加其他时间格式的转换if需要
- return timestamp
-
- def _filter_data(self, df: pd.DataFrame) -> pd.DataFrame:
- """根据配置的参数过滤数据"""
- if df.empty:
- return df
-
- # 角度范围过滤
- angle_min = self.processing_params.get('angle_min', -float('inf'))
- angle_max = self.processing_params.get('angle_max', float('inf'))
- df = df[df['angle'].between(angle_min, angle_max)]
-
- # 距离范围过滤
- distance_min = self.processing_params.get('distance_min', 0)
- distance_max = self.processing_params.get('distance_max', float('inf'))
- df = df[df['distance'].between(distance_min, distance_max)]
-
- # RCS过滤
- min_rcs = self.processing_params.get('min_rcs', -float('inf'))
- df = df[df['rcs'] >= min_rcs]
-
- return df
- def _process_extracted_files(self, file_paths: list) -> Optional[pd.DataFrame]:
- """处理提取的雷达数据文件,直接存储到输出目录"""
- all_data = []
- frame_counter = 1
-
- for file_path in file_paths:
- try:
- if file_path.suffix.lower() == '.csv':
- # Read and process data
- df = pd.read_csv(file_path, delimiter=self.delimiter, header=None)
- if df.empty:
- continue
-
- df = df.iloc[:, :6]
- df.columns = ['simTime', 'target_id', 'distance', 'velocity', 'angle', 'rcs']
-
- # Add frame numbers and process data
- df['simFrame'] = range(frame_counter, frame_counter + len(df))
- frame_counter += len(df)
-
- # Round values for consistency
- for col in ['simTime', 'distance', 'velocity', 'angle', 'rcs']:
- df[col] = df[col].round(3)
- df['playerId'] = 1
- # Filter and rename columns
- filtered_df = self._filter_data(df)
- if not filtered_df.empty:
- filtered_df = filtered_df.rename(columns={
- 'distance': 'radar_distance',
- 'velocity': 'radar_velocity',
- 'angle': 'radar_angle',
- 'rcs': 'radar_rcs'
- })
- all_data.append(filtered_df)
-
- except Exception as e:
- print(f"处理文件失败: {file_path}: {e}")
- continue
-
- if not all_data:
- return None
-
- # Merge all data and sort by time
- final_df = pd.concat(all_data, ignore_index=True)
- final_df.sort_values('simTime', inplace=True)
- final_df['simFrame'] = range(1, len(final_df) + 1)
-
- # Ensure correct data types
- for col, dtype in self.get_required_columns().items():
- if col in final_df.columns:
- final_df[col] = final_df[col].astype(dtype)
-
- final_df.reset_index(drop=True, inplace=True)
- return final_df
|