"""
Example radar data processor driven by an XML configuration file.

The XML configuration file defines the data format and the processing
parameters.

Sample input data (CSV):
1234567.89,1,15.6,12.3,30.5,-15.2
1234567.90,2,25.1,8.7,-15.3,-12.8
"""

from pathlib import Path
import pandas as pd
from typing import Dict, Any, Optional
import tempfile
import zipfile
import xml.etree.ElementTree as ET
import numpy as np

from core.plugin_interface import CustomDataProcessorPlugin


class RadarDataProcessor(CustomDataProcessorPlugin):
    """Plugin that processes radar data, configured through an XML file."""

    def __init__(self):
        """Initialise defaults, then try to load the XML configuration."""
        self.config = None
        self.config_path = Path("resources/radar_config.xml")
        self.field_map = {}
        self.delimiter = ','
        self.processing_params = {}
        # Raw input columns, in file order (there is no frame column).
        self.columns = ['simTime', 'target_id', 'distance', 'velocity', 'angle', 'rcs']
        self._load_config()

    @staticmethod
    def _read_float(parent: ET.Element, tag: str) -> Optional[float]:
        """Return child ``tag`` of ``parent`` as a float, or None if absent.

        A missing child or a child without text yields None so optional
        settings can be skipped. Text that is present but not numeric still
        raises ``ValueError`` so the caller can report a broken config.
        """
        node = parent.find(tag)
        if node is None or node.text is None:
            return None
        return float(node.text)

    def _load_config(self):
        """Load and parse the XML configuration file.

        Populates ``field_map``, ``delimiter`` and ``processing_params``.
        If the file does not exist, a message is printed and the defaults
        set in ``__init__`` are kept. Any exception raised while parsing is
        caught, printed together with its traceback, and not re-raised.
        """
        try:
            if not self.config_path.exists():
                print(f"配置文件未找到: {self.config_path}")
                return

            tree = ET.parse(self.config_path)
            root = tree.getroot()

            # Field definitions, keyed by column number.
            fields = root.find('./DataFormat/Fields')
            if fields is not None:
                for field in fields.findall('Field'):
                    name = field.get('name')
                    column = int(field.get('column'))
                    dtype = field.get('type')
                    self.field_map[column] = {
                        'name': name,
                        'type': dtype,
                        'unit': field.get('unit', '')
                    }

            # Delimiter: an empty element has no text, so keep the default
            # rather than overwriting it with None.
            delimiter = root.find('./DataFormat/Delimiter')
            if delimiter is not None and delimiter.text:
                self.delimiter = delimiter.text

            # Processing parameters.
            processing = root.find('./Processing')
            if processing is not None:
                # Time format.
                time_format = processing.find('TimeFormat')
                if time_format is not None and time_format.text:
                    self.processing_params['time_format'] = time_format.text

                # Angle and distance ranges. Each bound is optional and is
                # stored independently, so a missing Min does not discard
                # Max (or the filters parsed afterwards).
                for section, prefix in (('AngleRange', 'angle'),
                                        ('DistanceRange', 'distance')):
                    range_node = processing.find(section)
                    if range_node is None:
                        continue
                    for bound in ('Min', 'Max'):
                        value = self._read_float(range_node, bound)
                        if value is not None:
                            self.processing_params[f'{prefix}_{bound.lower()}'] = value

                # Filter parameters.
                filters = processing.find('Filters')
                if filters is not None:
                    min_rcs = self._read_float(filters, 'MinRCS')
                    if min_rcs is not None:
                        self.processing_params['min_rcs'] = min_rcs

            print(f"成功加载雷达配置文件: {self.config_path}")

        except Exception as e:
            print(f"加载配置文件失败: {e}")
            import traceback
            traceback.print_exc()

    def can_handle(self, zip_path: Path, folder_name: str) -> bool:
        """Return True if ``folder_name`` contains 'radar' (case-insensitive)."""
        return 'radar' in folder_name.lower()

    def get_required_columns(self) -> Dict[str, Any]:
        """Return the output column names mapped to their Python types."""
        return {
            'simTime': float,          # required column
            'playerId': int,           # required column
            'simFrame': int,           # required column
            'radar_distance': float,   # radar distance
            'radar_velocity': float,   # radar velocity
            'radar_angle': float,      # radar angle
            'radar_rcs': float,        # radar RCS
        }

    def _convert_timestamp(self, timestamp: float) -> float:
        """Convert a timestamp according to the configured time format.

        Every format, including the default 'unix_timestamp', currently
        returns the value unchanged.
        """
        time_format = self.processing_params.get('time_format', 'unix_timestamp')
        if time_format == 'unix_timestamp':
            return timestamp
        # Add conversions for other time formats here if needed.
        return timestamp

    def _filter_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter rows by the configured angle, distance and RCS limits.

        Expects the columns 'angle', 'distance' and 'rcs'. Range bounds are
        inclusive; a limit that is not configured falls back to a default
        that does not restrict the data (distance minimum defaults to 0).
        An empty frame is returned unchanged.
        """
        if df.empty:
            return df

        # Angle range.
        angle_min = self.processing_params.get('angle_min', -float('inf'))
        angle_max = self.processing_params.get('angle_max', float('inf'))
        df = df[df['angle'].between(angle_min, angle_max)]

        # Distance range.
        distance_min = self.processing_params.get('distance_min', 0)
        distance_max = self.processing_params.get('distance_max', float('inf'))
        df = df[df['distance'].between(distance_min, distance_max)]

        # Minimum RCS.
        min_rcs = self.processing_params.get('min_rcs', -float('inf'))
        df = df[df['rcs'] >= min_rcs]

        return df

    def _process_extracted_files(self, file_paths: list) -> Optional[pd.DataFrame]:
        """Read, filter and merge the extracted radar CSV files.

        Args:
            file_paths: Paths of the extracted files (``Path`` or ``str``).
                Entries whose suffix is not ``.csv`` are skipped.

        Returns:
            One DataFrame sorted by ``simTime`` with ``simFrame`` numbered
            from 1 and the radar columns renamed with a ``radar_`` prefix,
            or None when no file produced any rows after filtering.

        A file that fails to load or process is reported with ``print`` and
        skipped; it does not abort the remaining files.
        """
        all_data = []
        frame_counter = 1

        for file_path in file_paths:
            try:
                path = Path(file_path)  # accept plain strings as well
                if path.suffix.lower() != '.csv':
                    continue

                df = pd.read_csv(path, delimiter=self.delimiter, header=None)
                if df.empty:
                    continue

                # Copy the slice so the column assignments below modify an
                # independent frame rather than a view of the parsed data.
                df = df.iloc[:, :len(self.columns)].copy()
                df.columns = list(self.columns)

                # Provisional frame numbers; they are renumbered after the
                # global sort but fix the position of the simFrame column.
                df['simFrame'] = range(frame_counter, frame_counter + len(df))
                frame_counter += len(df)

                # Round values for consistency.
                for col in ['simTime', 'distance', 'velocity', 'angle', 'rcs']:
                    df[col] = df[col].round(3)

                df['playerId'] = 1

                # Filter, then rename to the output column names.
                filtered_df = self._filter_data(df)
                if not filtered_df.empty:
                    filtered_df = filtered_df.rename(columns={
                        'distance': 'radar_distance',
                        'velocity': 'radar_velocity',
                        'angle': 'radar_angle',
                        'rcs': 'radar_rcs'
                    })
                    all_data.append(filtered_df)

            except Exception as e:
                print(f"处理文件失败: {file_path}: {e}")
                continue

        if not all_data:
            return None

        # Merge everything and sort by time. A stable sort keeps rows with
        # equal timestamps in input order, so frame numbering is repeatable.
        final_df = pd.concat(all_data, ignore_index=True)
        final_df.sort_values('simTime', inplace=True, kind='mergesort')
        final_df['simFrame'] = range(1, len(final_df) + 1)

        # Ensure correct data types.
        for col, dtype in self.get_required_columns().items():
            if col in final_df.columns:
                final_df[col] = final_df[col].astype(dtype)

        final_df.reset_index(drop=True, inplace=True)
        return final_df