import os
import shutil
import subprocess
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd


@dataclass
class Config:
    """PGVIL processor configuration."""
    zip_path: Path
    output_path: Path
    engine_path: Optional[Path] = None
    map_path: Optional[Path] = None
    utm_zone: int = 51  # example UTM zone
    x_offset: float = 0.0
    y_offset: float = 0.0

    def __post_init__(self):
        # Use output_path directly as output_dir to avoid nested directories.
        self.output_dir = self.output_path
        self.output_dir.mkdir(parents=True, exist_ok=True)


def run_pgvil_engine(config: Config):
    """Runs the external C++ preprocessing engine."""
    if not config.engine_path or not config.map_path:
        print("C++ engine path or map path not configured. Skipping C++ engine execution.")
        return True  # Return True assuming it's optional or handled elsewhere

    engine_cmd = [
        str(config.engine_path),
        str(config.map_path),
        str(config.output_dir),
        str(config.x_offset),
        str(config.y_offset),
    ]

    print("--- Running C++ Preprocessing Engine ---")
    print(f"Command: {' '.join(engine_cmd)}")

    try:
        result = subprocess.run(
            engine_cmd,
            check=True,           # raise CalledProcessError on non-zero exit code
            capture_output=True,  # capture stdout/stderr
            text=True,            # decode output as text
            cwd=config.engine_path.parent,  # run from the engine's directory? Or script's? Adjust if needed.
        )
        print("C++ Engine Output:")
        print(result.stdout)
        if result.stderr:
            print("C++ Engine Error Output:")
            print(result.stderr)
        print("--- C++ Engine Finished Successfully ---")
        return True
    except FileNotFoundError:
        print(f"Error: C++ engine executable not found at {config.engine_path}.")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Error: C++ engine failed with exit code {e.returncode}.")
        print("C++ Engine Output (stdout):")
        print(e.stdout)
        print("C++ Engine Output (stderr):")
        print(e.stderr)
        return False
    except Exception as e:
        print(f"An unexpected error occurred while running the C++ engine: {e}")
        return False


def remove_conflicting_columns(df_object, df_csv_info):
    """Drop columns present in both dataframes, except simTime, simFrame and playerId."""
    conflicting_columns = set(df_object.columns) & set(df_csv_info.columns)
    for col in conflicting_columns:
        if col not in ["simTime", "simFrame", "playerId"]:
            del df_csv_info[col]
    return df_csv_info


def align_simtime_by_simframe(df):
    # Map each simFrame to a representative simTime (the first one seen).
    sim_frame_to_time_map = df.groupby('simFrame')['simTime'].first().to_dict()
    # Use the map to make simTime consistent across rows with the same simFrame.
    df['simTime'] = df['simFrame'].map(sim_frame_to_time_map)

    # Bail out if simFrame is empty or contains non-integer values.
    if df['simFrame'].empty or not df['simFrame'].apply(lambda x: isinstance(x, (int, np.integer))).all():
        return df

    # Identify missing simFrame values.
    all_frames = np.arange(df['simFrame'].min(), df['simFrame'].max() + 1)
    missing_frames = set(all_frames) - set(df['simFrame'])

    # Fill each missing simFrame by averaging its neighbouring frames
    # (assumes the non-key columns are numeric).
    new_rows = []
    for missing_frame in missing_frames:
        prev_frame = df[df['simFrame'] < missing_frame]['simFrame'].max()
        next_frame = df[df['simFrame'] > missing_frame]['simFrame'].min()
        # .max()/.min() on an empty selection yield NaN, never None, so test with pd.notna.
        if pd.notna(prev_frame) and pd.notna(next_frame):
            prev_row = df[df['simFrame'] == prev_frame].iloc[0]
            next_row = df[df['simFrame'] == next_frame].iloc[0]

            # Average the neighbours to create the new row.
            new_row = prev_row.copy()
            new_row['simFrame'] = missing_frame
            for col in df.columns:
                if col not in ['simTime', 'simFrame']:
                    new_row[col] = (prev_row[col] + next_row[col]) / 2
            # Update the simTime value (NaN if the frame never appeared in the log).
            new_row['simTime'] = sim_frame_to_time_map.get(missing_frame, np.nan)
            new_rows.append(new_row)

    if new_rows:
        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)

    return df.sort_values(by='simFrame').reset_index(drop=True)
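
# A minimal sketch (hypothetical data) of what align_simtime_by_simframe does:
#
#     df = pd.DataFrame({
#         "simTime":  [0.00, 0.05, 0.15],
#         "simFrame": [1, 2, 4],  # frame 3 is missing
#         "posX":     [10.0, 11.0, 13.0],
#     })
#     aligned = align_simtime_by_simframe(df)
#
# A row for simFrame == 3 is inserted with posX == 12.0 (the mean of frames 2
# and 4); its simTime is NaN because frame 3 never appeared in the input.
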
def mergecopy_by_simtime(merged_df, external_df, ignore_cols, prefix=None):
    """Copy every column of external_df onto all rows of merged_df whose simTime
    equals that row's nearest_simTime (batch merge keyed on nearest simTime)."""
    useful_cols = [col for col in external_df.columns if col not in ignore_cols]

    for col in useful_cols:
        col_name = f"{prefix}_{col}" if prefix else col
        mapping = external_df.set_index('nearest_simTime')[col].to_dict()
        merged_df[col_name] = merged_df['simTime'].map(mapping)

    return merged_df


def read_csv_with_filled_columns(file_path):
    try:
        # Make sure file_path is a valid string path.
        if not isinstance(file_path, str):
            raise ValueError("The given file path is invalid")
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} does not exist")

        # on_bad_lines='skip' drops malformed rows instead of raising.
        df = pd.read_csv(file_path, on_bad_lines='skip')
        # Force missing values to NaN so the column count stays consistent.
        if not df.empty:
            df.fillna(np.nan, inplace=True)
        return df
    except Exception as e:
        print(f"Error while reading CSV file {file_path}: {str(e)}")
        return pd.DataFrame()  # return an empty DataFrame so processing can continue


def convert_heading(posH_rad):
    # Convert radians to degrees.
    angle_deg = np.degrees(posH_rad)
    # Counter-clockwise with east = 0  =>  clockwise with north = 0,
    # i.e. new_angle = (90 - angle_deg) % 360.
    heading_deg = (90 - angle_deg) % 360
    return round(heading_deg, 3)
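
# Worked examples of the heading conversion (values follow from the formula):
#
#     convert_heading(0.0)        ->  90.0   (east  -> 90 deg from north)
#     convert_heading(np.pi / 2)  ->   0.0   (north ->  0 deg)
#     convert_heading(np.pi)      -> 270.0   (west  -> 270 deg)
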
class PGVILProcessor:
    """PGVIL data processor implementing the PGVIL-specific pipeline."""

    def __init__(self, config: Config):
        self.config = config

    def process_zip(self) -> Path:
        """Extract the input ZIP file and return the output directory path."""
        print(f"Processing ZIP: {self.config.zip_path}")
        zip_path = self.config.zip_path
        output_dir = Path(self.config.output_dir)  # make sure we have a Path object
        output_dir.mkdir(parents=True, exist_ok=True)

        # Extract the CSV files under HMIdata/ and RDBdata/.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for name in zip_ref.namelist():
                if ('HMIdata/' in name or 'RDBdata/' in name) and name.endswith('.csv'):
                    # The last component of the archive path is the file name.
                    filename = os.path.basename(name)
                    dst_path = output_dir / filename
                    # print(f"Extracting {name} to {dst_path}")
                    with zip_ref.open(name) as src, open(dst_path, 'wb') as dst_file:
                        shutil.copyfileobj(src, dst_file)

        # Record the output directory on the config.
        self.config.output_dir = output_dir
        return output_dir

    def merge_csv_files(self):
        x_offset = self.config.x_offset
        y_offset = self.config.y_offset
        data_path = self.config.output_dir
        # X_OFFSET = 258109.4239876
        # Y_OFFSET = 4149969.964821

        # CSV file paths (missing files are handled by read_csv_with_filled_columns).
        obj_state_path = os.path.join(data_path, "ObjState.csv")
        ego_map_path = os.path.join(data_path, "EgoMap.csv")
        lane_map_path = os.path.join(data_path, "LaneMap.csv")
        lane_info_path = os.path.join(data_path, "LaneInfo.csv")
        road_pos_path = os.path.join(data_path, "RoadPos.csv")
        vehicle_systems_path = os.path.join(data_path, "VehicleSystems.csv")
        traffic_light_path = os.path.join(data_path, "TrafficLight.csv")
        function_path = os.path.join(data_path, "Function.csv")

        df_object = read_csv_with_filled_columns(obj_state_path)
        df_map_info = read_csv_with_filled_columns(ego_map_path)
        df_lane_map = read_csv_with_filled_columns(lane_map_path)
        df_lane_info = read_csv_with_filled_columns(lane_info_path)
        df_road_pos = read_csv_with_filled_columns(road_pos_path)
        df_vehicle_systems = read_csv_with_filled_columns(vehicle_systems_path)
        df_traffic_light = read_csv_with_filled_columns(traffic_light_path)
        df_function = read_csv_with_filled_columns(function_path)

        # Cast all numeric columns to float.
        def convert_numeric_columns(df):
            numeric_cols = df.select_dtypes(include=['number']).columns
            df[numeric_cols] = df[numeric_cols].astype(float)
            return df

        df_object = convert_numeric_columns(df_object)
        df_map_info = convert_numeric_columns(df_map_info)
        df_lane_map = convert_numeric_columns(df_lane_map)
        df_lane_info = convert_numeric_columns(df_lane_info)
        df_road_pos = convert_numeric_columns(df_road_pos)
        df_vehicle_systems = convert_numeric_columns(df_vehicle_systems)
        df_traffic_light = convert_numeric_columns(df_traffic_light)
        df_function = convert_numeric_columns(df_function)

        # Apply the coordinate offsets to posX/posY in df_object.
        df_object['posX'] += x_offset
        df_object['posY'] += y_offset

        # Align simTime with simFrame in every dataframe.
        df_object = align_simtime_by_simframe(df_object)
        df_map_info = align_simtime_by_simframe(df_map_info)
        df_lane_map = align_simtime_by_simframe(df_lane_map)
        df_lane_info = align_simtime_by_simframe(df_lane_info)
        df_road_pos = align_simtime_by_simframe(df_road_pos)
        df_vehicle_systems = align_simtime_by_simframe(df_vehicle_systems)
        df_traffic_light = align_simtime_by_simframe(df_traffic_light)

        # Drop duplicated columns, then merge ObjState with EgoMap.
        del_ego_map = remove_conflicting_columns(df_object, df_map_info)
        merged_df = pd.merge(df_object, del_ego_map, on=["simTime", "simFrame", "playerId"], how="left")

        # Merge LaneMap, LaneInfo, RoadPos and VehicleSystems on
        # simTime/simFrame/playerId; TrafficLight is merged on simTime/simFrame only.
        del_lane_map = remove_conflicting_columns(merged_df, df_lane_map)
        merged_df = pd.merge(merged_df, del_lane_map, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()

        del_lane_info = remove_conflicting_columns(merged_df, df_lane_info)
        merged_df = pd.merge(merged_df, del_lane_info, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()

        del_road_pos = remove_conflicting_columns(merged_df, df_road_pos)
        merged_df = pd.merge(merged_df, del_road_pos, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()

        del_traffic_light = remove_conflicting_columns(merged_df, df_traffic_light)
        merged_df = pd.merge(merged_df, del_traffic_light, on=["simTime", "simFrame"], how="left").drop_duplicates()

        del_vehicle_systems = remove_conflicting_columns(merged_df, df_vehicle_systems)
        merged_df = pd.merge(merged_df, del_vehicle_systems, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()

        tolerance = 0.01

        def find_closest_time(sim_time, sim_time_to_index, tolerance=0.01):
            # Find the closest time; return NaN when the difference exceeds tolerance.
            # (The original min(..., key=...) expression returned an arbitrary key
            # when nothing was within tolerance, so unmatched rows were never NaN.)
            closest_time = min(sim_time_to_index.keys(), key=lambda y: abs(y - sim_time))
            if abs(closest_time - sim_time) < tolerance:
                return closest_time
            return np.nan

        # Map each simTime value in merged_df to (one of) its row indices.
        sim_time_to_index = {row['simTime']: idx for idx, row in merged_df.iterrows()}

        # Sort Function.csv by simTime, then find the nearest merged simTime for each row.
        df_function = df_function.sort_values(by='simTime').reset_index(drop=True)
        df_function['nearest_simTime'] = df_function['simTime'].apply(
            lambda x: find_closest_time(x, sim_time_to_index, tolerance))
        df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)

        # Rename df_function's simTime column, drop unmatched rows and cast
        # nearest_index to int.
        df_function_renamed = df_function.rename(columns={'simTime': 'function_simTime'})
        df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
        df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
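        # Worked example of the tolerance matching (hypothetical values): with
        # sim_time_to_index = {0.00: 0, 0.10: 1} and tolerance = 0.01,
        # find_closest_time(0.104, ...) -> 0.10, while find_closest_time(0.30, ...)
        # -> NaN, so that Function row is dropped by the dropna above.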
        ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
        merged_df = mergecopy_by_simtime(merged_df, df_function_valid, ignore_cols)

        """
        def check_matching(df_function, sim_time_to_index, tolerance=0.01):
            # Check whether every row of function.csv was matched successfully.
            # Compute the nearest_simTime for each simTime.
            df_function['nearest_simTime'] = df_function['simTime'].apply(
                lambda x: find_closest_time(x, sim_time_to_index, tolerance))

            # Look for rows that found no match.
            unmatched_rows = df_function[df_function['nearest_simTime'].isna()]
            if not unmatched_rows.empty:
                print(f"Unmatched rows: {unmatched_rows}")
            else:
                print("All rows matched successfully!")

            # Report how many rows matched and how many did not.
            total_rows = len(df_function)
            matched_rows = len(df_function) - len(unmatched_rows)
            print(f"Total rows: {total_rows}, matched: {matched_rows}, unmatched: {len(unmatched_rows)}")
            return unmatched_rows

        # Run the check.
        unmatched_rows = check_matching(df_function, sim_time_to_index, tolerance=0.01)

        # simTime of the last row.
        last_row_simtime = df_function.iloc[-1]['simTime']
        print(f"simTime of the last row: {last_row_simtime}")

        # nearest_simTime of the last row.
        last_row_nearest_simtime = df_function.iloc[-1]['nearest_simTime']
        print(f"nearest_simTime of the last row: {last_row_nearest_simtime}")
        """

        # Convert heading from radians (east = 0, CCW) to degrees (north = 0, CW).
        merged_df['posH'] = merged_df['posH'].apply(convert_heading)

        if 'posP' in merged_df.columns:
            merged_df.rename(columns={'posP': 'pitch_rate'}, inplace=True)
        if 'posR' in merged_df.columns:
            merged_df.rename(columns={'posR': 'roll_rate'}, inplace=True)

        # Let pandas infer consistent dtypes first, then make sure empty values are NaN.
        merged_df = merged_df.infer_objects()
        merged_df.fillna(np.nan, inplace=True)

        merged_csv_path = Path(data_path) / "merged_ObjState.csv"
        # merged_df.to_csv(merged_csv_path, index=False, na_rep="NaN")
        merged_df.to_csv(merged_csv_path, index=False)

        return merged_csv_path

    # @ErrorHandler.measure_performance
    # def process_built_in_data(self) -> Dict[str, Path]:
    #     """PGVIL-specific built-in data processing.
    #     Processing order:
    #         1. Process CAN data
    #         2. Process sensor data
    #         3. Process other PGVIL-specific data
    #         4. Merge the built-in data
    #     Returns:
    #         Dict of result file paths.
    #     """
    #     result_files = {}
    #
    #     # 1. Process CAN data
    #     print("1. Processing CAN data...")
    #     can_results = self._process_can_data()
    #     if can_results:
    #         result_files.update(can_results)
    #     else:
    #         print("Warning: CAN data processing failed or produced no data")
    #
    #     # 2. Process sensor data
    #     print("\n2. Processing sensor data...")
    #     sensor_results = self._process_sensor_data()
    #     if sensor_results:
    #         result_files.update(sensor_results)
    #     else:
    #         print("Warning: sensor data processing failed or produced no data")
    #
    #     # 3. Process other PGVIL-specific data
    #     print("\n3. Processing other PGVIL data...")
    #     other_results = self._process_other_data()
    #     if other_results:
    #         result_files.update(other_results)
    #
    #     # 4. Merge the built-in data
    #     print("\n4. Merging built-in data...")
    #     if not self._merge_built_in_data(result_files):
    #         print("Warning: built-in data merge failed")
    #
    #     return result_files

    # def _process_can_data(self) -> Dict[str, Path]:
    #     """Process CAN data."""
    #     # TODO: implement CAN data processing
    #     return {}

    # def _process_sensor_data(self) -> Dict[str, Path]:
    #     """Process sensor data."""
    #     # TODO: implement sensor data processing
    #     return {}

    # def _process_other_data(self) -> Dict[str, Path]:
    #     """Process other PGVIL-specific data."""
    #     # TODO: implement other data processing
    #     return {}

    # def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
    #     """Merge PGVIL built-in data.
    #     Args:
    #         result_files: dict of result file paths.
    #     Returns:
    #         Whether the merge succeeded.
    #     """
    #     try:
    #         # PGVIL-specific merge logic goes here.
    #         return True
    #     except Exception as e:
    #         print(f"Built-in data merge failed: {e}")
    #         return False
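
if __name__ == "__main__":
    # Minimal usage sketch; the paths below are placeholders, not real data.
    config = Config(
        zip_path=Path("data/example_run.zip"),   # hypothetical input archive
        output_path=Path("output/example_run"),  # extraction / output directory
        x_offset=0.0,
        y_offset=0.0,
    )
    processor = PGVILProcessor(config)
    processor.process_zip()
    # run_pgvil_engine(config) could be called here when engine_path/map_path are set.
    merged_csv = processor.merge_csv_files()
    print(f"Merged CSV written to: {merged_csv}")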