import os
import shutil
import subprocess
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd

# from core.error_handler import ErrorHandler  # only needed by the commented-out built-in data methods below

@dataclass
class Config:
    """Configuration for the PGVIL processor."""
    zip_path: Path
    output_path: Path
    engine_path: Optional[Path] = None
    map_path: Optional[Path] = None
    utm_zone: int = 51  # example UTM zone
    x_offset: float = 0.0
    y_offset: float = 0.0

    def __post_init__(self):
        # Use output_path directly as output_dir to avoid nested directories.
        self.output_dir = self.output_path
        self.output_dir.mkdir(parents=True, exist_ok=True)
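
# Usage sketch (hypothetical paths; the offsets mirror the example values noted
# in merge_csv_files below):
#     config = Config(
#         zip_path=Path("data/run_001.zip"),
#         output_path=Path("output/run_001"),
#         x_offset=258109.4239876,
#         y_offset=4149969.964821,
#     )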

def run_pgvil_engine(config: Config) -> bool:
    """Run the external C++ preprocessing engine."""
    if not config.engine_path or not config.map_path:
        print("C++ engine path or map path not configured; skipping C++ engine execution.")
        return True  # the engine is treated as optional or handled elsewhere

    engine_cmd = [
        str(config.engine_path),
        str(config.map_path),
        str(config.output_dir),
        str(config.x_offset),
        str(config.y_offset),
    ]
    print("--- Running C++ Preprocessing Engine ---")
    print(f"Command: {' '.join(engine_cmd)}")
    try:
        result = subprocess.run(
            engine_cmd,
            check=True,           # raise CalledProcessError on a non-zero exit code
            capture_output=True,  # capture stdout/stderr
            text=True,            # decode output as text
            cwd=config.engine_path.parent,  # run from the engine's directory; adjust if needed
        )
        print("C++ Engine Output:")
        print(result.stdout)
        if result.stderr:
            print("C++ Engine Error Output:")
            print(result.stderr)
        print("--- C++ Engine Finished Successfully ---")
        return True
    except FileNotFoundError:
        print(f"Error: C++ engine executable not found at {config.engine_path}.")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Error: C++ engine failed with exit code {e.returncode}.")
        print("C++ Engine Output (stdout):")
        print(e.stdout)
        print("C++ Engine Output (stderr):")
        print(e.stderr)
        return False
    except Exception as e:
        print(f"An unexpected error occurred while running the C++ engine: {e}")
        return False

def remove_conflicting_columns(df_object, df_csv_info):
    """Drop columns present in both dataframes, except the join keys
    simTime, simFrame, and playerId."""
    conflicting_columns = set(df_object.columns) & set(df_csv_info.columns)
    for col in conflicting_columns:
        if col not in ["simTime", "simFrame", "playerId"]:
            del df_csv_info[col]
    return df_csv_info

def align_simtime_by_simframe(df):
    """Align simTime values by simFrame and interpolate rows for missing frames."""
    # Map each simFrame to a representative simTime (the first one observed).
    sim_frame_to_time_map = df.groupby('simFrame')['simTime'].first().to_dict()

    # Rewrite simTime from the mapping so every row of a frame shares one time.
    df['simTime'] = df['simFrame'].map(sim_frame_to_time_map)

    # Bail out if simFrame is empty or contains non-integer data.
    if df['simFrame'].empty or not df['simFrame'].apply(lambda x: isinstance(x, (int, np.integer))).all():
        return df

    # Identify the missing simFrame values.
    all_frames = np.arange(df['simFrame'].min(), df['simFrame'].max() + 1)
    missing_frames = set(all_frames) - set(df['simFrame'])

    new_rows = []
    # Fill each missing simFrame by averaging its neighbouring frames.
    for missing_frame in missing_frames:
        prev_frame = df[df['simFrame'] < missing_frame]['simFrame'].max()
        next_frame = df[df['simFrame'] > missing_frame]['simFrame'].min()

        # max()/min() on an empty selection yield NaN, not None, so test with pd.notna.
        if pd.notna(prev_frame) and pd.notna(next_frame):
            prev_row = df[df['simFrame'] == prev_frame].iloc[0]
            next_row = df[df['simFrame'] == next_frame].iloc[0]

            # Average the two neighbours to create the new row.
            new_row = prev_row.copy()
            new_row['simFrame'] = missing_frame
            for col in df.columns:
                if col not in ['simTime', 'simFrame']:
                    new_row[col] = (prev_row[col] + next_row[col]) / 2

            # Take simTime from the frame-to-time map (NaN if the frame never appeared).
            new_row['simTime'] = sim_frame_to_time_map.get(missing_frame, np.nan)

            new_rows.append(new_row)

    if new_rows:
        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
    return df.sort_values(by='simFrame').reset_index(drop=True)
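
# Illustration on synthetic data: frame 2 is absent and gets interpolated from
# frames 1 and 3 (its simTime stays NaN because frame 2 never appeared):
#     df = pd.DataFrame({'simFrame': [1, 3], 'simTime': [0.1, 0.3], 'posX': [0.0, 2.0]})
#     align_simtime_by_simframe(df)
#     # -> frames 1, 2, 3 with posX 0.0, 1.0, 2.0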

def mergecopy_by_simtime(merged_df, external_df, ignore_cols, prefix=None):
    """Broadcast every non-ignored column of external_df onto all rows of
    merged_df whose simTime matches the row's nearest_simTime."""
    useful_cols = [col for col in external_df.columns if col not in ignore_cols]
    for col in useful_cols:
        col_name = f"{prefix}_{col}" if prefix else col
        mapping = external_df.set_index('nearest_simTime')[col].to_dict()
        merged_df[col_name] = merged_df['simTime'].map(mapping)
    return merged_df
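
# Illustration on synthetic data: the single external row tagged with
# nearest_simTime 0.2 is copied onto both merged rows at simTime 0.2:
#     ext = pd.DataFrame({'nearest_simTime': [0.2], 'speed_target': [12.5]})
#     merged = pd.DataFrame({'simTime': [0.1, 0.2, 0.2]})
#     mergecopy_by_simtime(merged, ext, ignore_cols=['nearest_simTime'])
#     # -> speed_target column: [NaN, 12.5, 12.5]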

def read_csv_with_filled_columns(file_path):
    """Read a CSV file, skipping malformed lines; return an empty DataFrame on failure."""
    try:
        # Make sure file_path is a valid string path.
        if not isinstance(file_path, str):
            raise ValueError("The provided file path is invalid")
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} does not exist")

        # on_bad_lines='skip' drops malformed rows instead of aborting the read;
        # pandas already fills missing fields with NaN, keeping column counts consistent.
        df = pd.read_csv(file_path, on_bad_lines='skip')
        return df
    except Exception as e:
        print(f"Error while reading CSV file {file_path}: {e}")
        return pd.DataFrame()  # return an empty DataFrame so processing can continue

def convert_heading(posH_rad):
    """Convert a heading in radians (counter-clockwise, east = 0)
    to degrees (clockwise, north = 0)."""
    angle_deg = np.degrees(posH_rad)
    # counter-clockwise-from-east -> clockwise-from-north: new_angle = (90 - angle_deg) % 360
    heading_deg = (90 - angle_deg) % 360
    return round(heading_deg, 3)
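
# Quick sanity check of the convention change (illustrative values):
#     convert_heading(0.0)        # east  -> 90.0 (degrees clockwise from north)
#     convert_heading(np.pi / 2)  # north -> 0.0
#     convert_heading(np.pi)      # west  -> 270.0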

class PGVILProcessor:
    """PGVIL data processor implementing the PGVIL-specific pipeline."""

    def __init__(self, config: Config):
        self.config = config

    def process_zip(self) -> Path:
        """Extract the CSV files from the input ZIP and return the output directory."""
        print(f"Processing ZIP: {self.config.zip_path}")
        zip_path = self.config.zip_path
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Extract the CSV files under HMIdata/ and RDBdata/.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for name in zip_ref.namelist():
                if ('HMIdata/' in name or 'RDBdata/' in name) and name.endswith('.csv'):
                    # The last component of the archive path is the file name.
                    filename = os.path.basename(name)
                    dst_path = output_dir / filename
                    with zip_ref.open(name) as src, open(dst_path, 'wb') as dst_file:
                        shutil.copyfileobj(src, dst_file)

        # Keep config.output_dir pointing at the extraction directory.
        self.config.output_dir = output_dir
        return output_dir
    def merge_csv_files(self):
        """Merge the extracted CSV files into a single merged_ObjState.csv."""
        x_offset = self.config.x_offset
        y_offset = self.config.y_offset
        data_path = self.config.output_dir
        # Example offsets: X_OFFSET = 258109.4239876, Y_OFFSET = 4149969.964821

        # Define the CSV file paths; missing files are handled by read_csv_with_filled_columns.
        obj_state_path = os.path.join(data_path, "ObjState.csv")
        ego_map_path = os.path.join(data_path, "EgoMap.csv")
        lane_map_path = os.path.join(data_path, "LaneMap.csv")
        lane_info_path = os.path.join(data_path, "LaneInfo.csv")
        road_pos_path = os.path.join(data_path, "RoadPos.csv")
        vehicle_systems_path = os.path.join(data_path, "VehicleSystems.csv")
        traffic_light_path = os.path.join(data_path, "TrafficLight.csv")
        function_path = os.path.join(data_path, "Function.csv")

        df_object = read_csv_with_filled_columns(obj_state_path)
        df_map_info = read_csv_with_filled_columns(ego_map_path)
        df_lane_map = read_csv_with_filled_columns(lane_map_path)
        df_lane_info = read_csv_with_filled_columns(lane_info_path)
        df_road_pos = read_csv_with_filled_columns(road_pos_path)
        df_vehicle_systems = read_csv_with_filled_columns(vehicle_systems_path)
        df_traffic_light = read_csv_with_filled_columns(traffic_light_path)
        df_function = read_csv_with_filled_columns(function_path)

        def convert_numeric_columns(df):
            """Cast all numeric columns to float for consistent arithmetic."""
            numeric_cols = df.select_dtypes(include=['number']).columns
            df[numeric_cols] = df[numeric_cols].astype(float)
            return df

        df_object = convert_numeric_columns(df_object)
        df_map_info = convert_numeric_columns(df_map_info)
        df_lane_map = convert_numeric_columns(df_lane_map)
        df_lane_info = convert_numeric_columns(df_lane_info)
        df_road_pos = convert_numeric_columns(df_road_pos)
        df_vehicle_systems = convert_numeric_columns(df_vehicle_systems)
        df_traffic_light = convert_numeric_columns(df_traffic_light)
        df_function = convert_numeric_columns(df_function)

        # Apply the configured offsets to posX/posY in ObjState.
        df_object['posX'] += x_offset
        df_object['posY'] += y_offset

        # Align simTime with simFrame in every table.
        df_object = align_simtime_by_simframe(df_object)
        df_map_info = align_simtime_by_simframe(df_map_info)
        df_lane_map = align_simtime_by_simframe(df_lane_map)
        df_lane_info = align_simtime_by_simframe(df_lane_info)
        df_road_pos = align_simtime_by_simframe(df_road_pos)
        df_vehicle_systems = align_simtime_by_simframe(df_vehicle_systems)
        df_traffic_light = align_simtime_by_simframe(df_traffic_light)

        # Drop duplicated columns, then merge ObjState with EgoMap.
        del_ego_map = remove_conflicting_columns(df_object, df_map_info)
        merged_df = pd.merge(df_object, del_ego_map, on=["simTime", "simFrame", "playerId"], how="left")

        # Merge LaneMap, LaneInfo, RoadPos, TrafficLight, and VehicleSystems on
        # simTime/simFrame/playerId (TrafficLight is joined on simTime/simFrame only).
        del_lane_map = remove_conflicting_columns(merged_df, df_lane_map)
        merged_df = pd.merge(merged_df, del_lane_map, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        del_lane_info = remove_conflicting_columns(merged_df, df_lane_info)
        merged_df = pd.merge(merged_df, del_lane_info, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        del_road_pos = remove_conflicting_columns(merged_df, df_road_pos)
        merged_df = pd.merge(merged_df, del_road_pos, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        del_traffic_light = remove_conflicting_columns(merged_df, df_traffic_light)
        merged_df = pd.merge(merged_df, del_traffic_light, on=["simTime", "simFrame"], how="left").drop_duplicates()
        del_vehicle_systems = remove_conflicting_columns(merged_df, df_vehicle_systems)
        merged_df = pd.merge(merged_df, del_vehicle_systems, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()

        tolerance = 0.01

        def find_closest_time(sim_time, sim_time_to_index, tolerance=0.01):
            """Return the simTime closest to sim_time, or NaN if none is within tolerance."""
            closest_time = min(sim_time_to_index.keys(), key=lambda y: abs(y - sim_time))
            return closest_time if abs(closest_time - sim_time) < tolerance else np.nan

        # Map each simTime in merged_df to a row index (later duplicates overwrite earlier ones).
        sim_time_to_index = {row['simTime']: idx for idx, row in merged_df.iterrows()}
        df_function = df_function.sort_values(by='simTime').reset_index(drop=True)

        # For every simTime in Function.csv, find the nearest simTime in merged_df.
        df_function['nearest_simTime'] = df_function['simTime'].apply(
            lambda x: find_closest_time(x, sim_time_to_index, tolerance))
        df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)

        # Rename the Function simTime, drop unmatched rows, and cast nearest_index to int.
        df_function_renamed = df_function.rename(columns={'simTime': 'function_simTime'})
        df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
        df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
        ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
        merged_df = mergecopy_by_simtime(merged_df, df_function_valid, ignore_cols)
- """
- def check_matching(df_function, sim_time_to_index, tolerance=0.01):
- #检查 function.csv 中的所有行是否都成功匹配
-
- # 计算每个 simTime 对应的 nearest_simTime
- df_function['nearest_simTime'] = df_function['simTime'].apply(lambda x: find_closest_time(x, sim_time_to_index, tolerance))
-
- # 检查是否有没有匹配到的行
- unmatched_rows = df_function[df_function['nearest_simTime'].isna()]
- if not unmatched_rows.empty:
- print(f"没有匹配上的行: {unmatched_rows}")
- else:
- print("所有行都成功匹配!")
-
- # 统计匹配上了的行数和没有匹配上的行数
- total_rows = len(df_function)
- matched_rows = len(df_function) - len(unmatched_rows)
- print(f"总行数: {total_rows}, 匹配上的行数: {matched_rows}, 没有匹配上的行数: {len(unmatched_rows)}")
-
- return unmatched_rows
- # 调用检查函数
- unmatched_rows = check_matching(df_function, sim_time_to_index, tolerance=0.01)
- # 获取最后一行的 simTime
- last_row_simtime = df_function.iloc[-1]['simTime']
- print(f"最后一行的 simTime: {last_row_simtime}")
- # 获取最后一行的 nearest_simTime
- last_row_nearest_simtime = df_function.iloc[-1]['nearest_simTime']
- print(f"最后一行的 nearest_simTime: {last_row_nearest_simtime}")
- """
        # Convert the heading from radians to degrees (clockwise, north = 0).
        merged_df['posH'] = merged_df['posH'].apply(convert_heading)
        if 'posP' in merged_df.columns:
            merged_df.rename(columns={'posP': 'pitch_rate'}, inplace=True)
        if 'posR' in merged_df.columns:
            merged_df.rename(columns={'posR': 'roll_rate'}, inplace=True)

        # Let pandas infer consistent dtypes; missing values are already NaN.
        merged_df = merged_df.infer_objects()
        merged_csv_path = Path(data_path) / "merged_ObjState.csv"
        # merged_df.to_csv(merged_csv_path, index=False, na_rep="NaN")
        merged_df.to_csv(merged_csv_path, index=False)

        return merged_csv_path

    # @ErrorHandler.measure_performance
    # def process_built_in_data(self) -> Dict[str, Path]:
    #     """PGVIL-specific built-in data processing.
    #
    #     Processing order:
    #     1. Process CAN data
    #     2. Process sensor data
    #     3. Process other PGVIL-specific data
    #     4. Merge built-in data
    #
    #     Returns:
    #         Dict mapping result names to file paths.
    #     """
    #     result_files = {}
    #
    #     # 1. Process CAN data
    #     print("1. Processing CAN data...")
    #     can_results = self._process_can_data()
    #     if can_results:
    #         result_files.update(can_results)
    #     else:
    #         print("Warning: CAN data processing failed or produced no data")
    #
    #     # 2. Process sensor data
    #     print("\n2. Processing sensor data...")
    #     sensor_results = self._process_sensor_data()
    #     if sensor_results:
    #         result_files.update(sensor_results)
    #     else:
    #         print("Warning: sensor data processing failed or produced no data")
    #
    #     # 3. Process other PGVIL-specific data
    #     print("\n3. Processing other PGVIL data...")
    #     other_results = self._process_other_data()
    #     if other_results:
    #         result_files.update(other_results)
    #
    #     # 4. Merge built-in data
    #     print("\n4. Merging built-in data...")
    #     if not self._merge_built_in_data(result_files):
    #         print("Warning: built-in data merge failed")
    #
    #     return result_files
    #
    # def _process_can_data(self) -> Dict[str, Path]:
    #     """Process CAN data."""
    #     # TODO: implement CAN data processing
    #     return {}
    #
    # def _process_sensor_data(self) -> Dict[str, Path]:
    #     """Process sensor data."""
    #     # TODO: implement sensor data processing
    #     return {}
    #
    # def _process_other_data(self) -> Dict[str, Path]:
    #     """Process other PGVIL-specific data."""
    #     # TODO: implement other data processing
    #     return {}
    #
    # def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
    #     """Merge PGVIL built-in data.
    #
    #     Args:
    #         result_files: Dict mapping result names to file paths.
    #
    #     Returns:
    #         Whether the merge succeeded.
    #     """
    #     try:
    #         # Implement the PGVIL-specific merge logic here.
    #         return True
    #     except Exception as e:
    #         print(f"Built-in data merge failed: {e}")
    #         return False
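
if __name__ == "__main__":
    # Minimal end-to-end sketch; the paths below are placeholders, not shipped data.
    config = Config(
        zip_path=Path("data/pgvil_run.zip"),
        output_path=Path("output"),
    )
    processor = PGVILProcessor(config)
    run_pgvil_engine(config)  # no-op unless engine_path and map_path are configured
    processor.process_zip()
    merged_csv = processor.merge_csv_files()
    print(f"Merged CSV written to {merged_csv}")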