# pgvil.py

import os
import shutil
import subprocess
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional

import numpy as np
import pandas as pd

from core.error_handler import ErrorHandler
from core.config_manager import get_config

@dataclass
class Config:
    """Configuration for the PGVIL processor."""
    zip_path: Path
    output_path: Path
    engine_path: Optional[Path] = None
    map_path: Optional[Path] = None
    utm_zone: int = 51  # example UTM zone
    x_offset: float = 0.0
    y_offset: float = 0.0

    def __post_init__(self):
        # Use output_path directly as output_dir to avoid nested directories.
        self.output_dir = self.output_path
        self.output_dir.mkdir(parents=True, exist_ok=True)
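
# Example construction (paths and offsets are hypothetical, for illustration only):
#   config = Config(
#       zip_path=Path("data/run_001.zip"),
#       output_path=Path("output/run_001"),
#       x_offset=258109.42,
#       y_offset=4149969.96,
#   )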

def run_pgvil_engine(config: Config):
    """Run the external C++ preprocessing engine."""
    if not config.engine_path or not config.map_path:
        print("C++ engine path or map path not configured. Skipping C++ engine execution.")
        return True  # Treat the engine as optional when it is not configured.
    engine_cmd = [
        str(config.engine_path),
        str(config.map_path),
        str(config.output_dir),
        str(config.x_offset),
        str(config.y_offset),
    ]
    print("--- Running C++ Preprocessing Engine ---")
    print(f"Command: {' '.join(engine_cmd)}")
    try:
        result = subprocess.run(
            engine_cmd,
            check=True,           # raise CalledProcessError on a non-zero exit code
            capture_output=True,  # capture stdout/stderr
            text=True,            # decode output as text
            cwd=config.engine_path.parent,  # run from the engine's own directory
        )
        print("C++ Engine Output:")
        print(result.stdout)
        if result.stderr:
            print("C++ Engine Error Output:")
            print(result.stderr)
        print("--- C++ Engine Finished Successfully ---")
        return True
    except FileNotFoundError:
        print(f"Error: C++ engine executable not found at {config.engine_path}.")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Error: C++ engine failed with exit code {e.returncode}.")
        print("C++ Engine Output (stdout):")
        print(e.stdout)
        print("C++ Engine Output (stderr):")
        print(e.stderr)
        return False
    except Exception as e:
        print(f"An unexpected error occurred while running the C++ engine: {e}")
        return False
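
# Usage sketch (assumes engine_path and map_path were set on the Config above;
# the function returns False on any failure):
#   if not run_pgvil_engine(config):
#       raise RuntimeError("C++ preprocessing engine failed")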

def remove_conflicting_columns(df_object, df_csv_info):
    """Drop columns present in both frames, except the merge keys
    simTime, simFrame, and playerId."""
    conflicting_columns = set(df_object.columns) & set(df_csv_info.columns)
    for col in conflicting_columns:
        if col not in ["simTime", "simFrame", "playerId"]:
            del df_csv_info[col]
    return df_csv_info
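
# Worked example: if df_object has columns [simTime, simFrame, playerId, posX]
# and df_csv_info has [simTime, simFrame, playerId, posX, laneId], the shared
# non-key column posX is dropped from df_csv_info, leaving the merge keys plus
# laneId. This keeps the later pd.merge calls free of _x/_y suffix collisions.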

def align_simtime_by_simframe(df):
    # Map each simFrame to a representative simTime (the first one observed).
    sim_frame_to_time_map = df.groupby('simFrame')['simTime'].first().to_dict()
    # Rewrite simTime from the mapping so every row of a frame shares one time.
    df['simTime'] = df['simFrame'].map(sim_frame_to_time_map)
    # Bail out if simFrame is empty or contains non-integer values.
    if df['simFrame'].empty or not df['simFrame'].apply(lambda x: isinstance(x, (int, np.integer))).all():
        return df
    # Identify missing simFrame values.
    all_frames = np.arange(df['simFrame'].min(), df['simFrame'].max() + 1)
    missing_frames = set(all_frames) - set(df['simFrame'])
    new_rows = []
    # Fill each missing simFrame by interpolating between its neighbours.
    for missing_frame in missing_frames:
        prev_frame = df[df['simFrame'] < missing_frame]['simFrame'].max()
        next_frame = df[df['simFrame'] > missing_frame]['simFrame'].min()
        # Note: max()/min() on an empty selection yield NaN, not None.
        if pd.notna(prev_frame) and pd.notna(next_frame):
            prev_row = df[df['simFrame'] == prev_frame].iloc[0]
            next_row = df[df['simFrame'] == next_frame].iloc[0]
            # Build the new row as the element-wise mean of the neighbours.
            new_row = prev_row.copy()
            new_row['simFrame'] = missing_frame
            for col in df.columns:
                if col not in ['simTime', 'simFrame']:
                    new_row[col] = (prev_row[col] + next_row[col]) / 2
            # Look up the simTime for the filled frame (NaN if unknown).
            new_row['simTime'] = sim_frame_to_time_map.get(missing_frame, np.nan)
            new_rows.append(new_row)
    if new_rows:
        df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
    return df.sort_values(by='simFrame').reset_index(drop=True)
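
# Example: with simFrame values [1, 2, 4], frame 3 is missing; a new row is
# synthesised whose numeric columns are the mean of the frame-2 and frame-4
# rows, and whose simTime comes from sim_frame_to_time_map (NaN here, since
# frame 3 never appeared in the input).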

def mergecopy_by_simtime(merged_df, external_df, ignore_cols, prefix=None):
    """Copy every useful column of external_df into merged_df, keyed on the
    precomputed nearest_simTime, so all merged_df rows sharing that simTime
    receive the value."""
    useful_cols = [col for col in external_df.columns if col not in ignore_cols]
    for col in useful_cols:
        col_name = f"{prefix}_{col}" if prefix else col
        mapping = external_df.set_index('nearest_simTime')[col].to_dict()
        merged_df[col_name] = merged_df['simTime'].map(mapping)
    return merged_df
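
# Example: with prefix="func", a column "speed" from external_df lands in
# merged_df as "func_speed"; with prefix=None it keeps its original name.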

def read_csv_with_filled_columns(file_path):
    try:
        # Validate that file_path is a usable string path.
        if not isinstance(file_path, str):
            raise ValueError("The provided file path is invalid")
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} does not exist")
        # Skip malformed lines instead of failing the whole read.
        df = pd.read_csv(file_path, on_bad_lines='skip')
        # Normalise missing values to NaN so the column count stays consistent.
        if not df.empty:
            df.fillna(np.nan, inplace=True)
        return df
    except Exception as e:
        print(f"Error reading CSV file {file_path}: {str(e)}")
        return pd.DataFrame()  # return an empty DataFrame so processing can continue

def convert_heading(posH_rad):
    # Convert radians to degrees.
    angle_deg = np.degrees(posH_rad)
    # ENU yaw (counter-clockwise, East = 0) -> compass heading (clockwise, North = 0).
    heading_deg = (90 - angle_deg) % 360
    return round(heading_deg, 3)
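
# Example: posH_rad = 0 (East in the ENU convention) maps to a compass heading
# of 90.0 degrees, and posH_rad = pi / 2 (North) maps to 0.0.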

class PGVILProcessor:
    """PGVIL data processor implementing the PGVIL-specific pipeline."""

    def __init__(self, config: Config):
        self.config = config

    def process_zip(self) -> Path:
        """Extract the input ZIP (config.zip_path) and return the output
        directory path (config.output_dir)."""
        print(f"Processing ZIP: {self.config.zip_path}")
        zip_path = self.config.zip_path
        output_dir = Path(self.config.output_dir)  # ensure we have a Path object
        output_dir.mkdir(parents=True, exist_ok=True)
        # Extract only the CSV files under HMIdata/ and RDBdata/, flattened
        # directly into output_dir (no per-ZIP subdirectory is created).
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for name in zip_ref.namelist():
                if ('HMIdata/' in name or 'RDBdata/' in name) and name.endswith('.csv'):
                    # The last component of the archive path is the file name.
                    filename = os.path.basename(name)
                    src = zip_ref.open(name)
                    dst_path = output_dir / filename
                    with open(dst_path, 'wb') as dst_file:
                        shutil.copyfileobj(src, dst_file)
        # Record the extraction directory back on the config.
        self.config.output_dir = output_dir
        return output_dir
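
    # After extraction, output_dir is expected to hold the flat CSV set
    # consumed by merge_csv_files below: ObjState.csv, EgoMap.csv, LaneMap.csv,
    # LaneInfo.csv, RoadPos.csv, VehicleSystems.csv, TrafficLight.csv and
    # Function.csv.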

    def merge_csv_files(self):
        x_offset = self.config.x_offset
        y_offset = self.config.y_offset
        data_path = self.config.output_dir
        # Historical reference offsets:
        # X_OFFSET = 258109.4239876
        # Y_OFFSET = 4149969.964821
        # Build the expected CSV paths. Missing files are handled by
        # read_csv_with_filled_columns, which returns an empty DataFrame.
        obj_state_path = os.path.join(data_path, "ObjState.csv")
        ego_map_path = os.path.join(data_path, "EgoMap.csv")
        lane_map_path = os.path.join(data_path, "LaneMap.csv")
        laneINfo_path = os.path.join(data_path, "LaneInfo.csv")
        roadPos_path = os.path.join(data_path, "RoadPos.csv")
        vehicleystems_path = os.path.join(data_path, "VehicleSystems.csv")
        trafficlight_path = os.path.join(data_path, "TrafficLight.csv")
        function_path = os.path.join(data_path, "Function.csv")
        df_object = read_csv_with_filled_columns(obj_state_path)
        df_map_info = read_csv_with_filled_columns(ego_map_path)
        df_lane_map = read_csv_with_filled_columns(lane_map_path)
        df_laneINfo = read_csv_with_filled_columns(laneINfo_path)
        df_roadPos = read_csv_with_filled_columns(roadPos_path)
        df_vehicleystems = read_csv_with_filled_columns(vehicleystems_path)
        df_trafficlight = read_csv_with_filled_columns(trafficlight_path)
        df_function = read_csv_with_filled_columns(function_path)

        def convert_numeric_columns(df):
            # Cast every numeric column to float for consistent arithmetic.
            numeric_cols = df.select_dtypes(include=['number']).columns
            df[numeric_cols] = df[numeric_cols].astype(float)
            return df

        df_object = convert_numeric_columns(df_object)
        df_map_info = convert_numeric_columns(df_map_info)
        df_lane_map = convert_numeric_columns(df_lane_map)
        df_laneINfo = convert_numeric_columns(df_laneINfo)
        df_roadPos = convert_numeric_columns(df_roadPos)
        df_vehicleystems = convert_numeric_columns(df_vehicleystems)
        df_trafficlight = convert_numeric_columns(df_trafficlight)
        df_function = convert_numeric_columns(df_function)
        # Apply the configured offsets to the object positions.
        df_object['posX'] += x_offset
        df_object['posY'] += y_offset
        # Align simTime with simFrame and fill in missing frames.
        df_object = align_simtime_by_simframe(df_object)
        df_map_info = align_simtime_by_simframe(df_map_info)
        df_lane_map = align_simtime_by_simframe(df_lane_map)
        df_laneINfo = align_simtime_by_simframe(df_laneINfo)
        df_roadPos = align_simtime_by_simframe(df_roadPos)
        df_vehicleystems = align_simtime_by_simframe(df_vehicleystems)
        df_trafficlight = align_simtime_by_simframe(df_trafficlight)

        # Merge ObjState with EgoMap, LaneMap, LaneInfo, RoadPos and
        # VehicleSystems on (simTime, simFrame, playerId). TrafficLight is
        # global per frame, so it joins on (simTime, simFrame) only.
        del_ego_map = remove_conflicting_columns(df_object, df_map_info)
        merged_df = pd.merge(df_object, del_ego_map, on=["simTime", "simFrame", "playerId"], how="left")
        del_lane_map = remove_conflicting_columns(merged_df, df_lane_map)
        merged_df = pd.merge(merged_df, del_lane_map, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        del_laneINfo = remove_conflicting_columns(merged_df, df_laneINfo)
        merged_df = pd.merge(merged_df, del_laneINfo, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        del_roadPos = remove_conflicting_columns(merged_df, df_roadPos)
        merged_df = pd.merge(merged_df, del_roadPos, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        del_trafficlight = remove_conflicting_columns(merged_df, df_trafficlight)
        merged_df = pd.merge(merged_df, del_trafficlight, on=["simTime", "simFrame"], how="left").drop_duplicates()
        del_vehicleystems = remove_conflicting_columns(merged_df, df_vehicleystems)
        merged_df = pd.merge(merged_df, del_vehicleystems, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
        tolerance = 0.01

        def find_closest_time(sim_time, sim_time_to_index, tolerance=0.01):
            # Return the recorded simTime closest to sim_time, or NaN if even
            # the nearest one is not within tolerance.
            closest_time = min(sim_time_to_index.keys(), key=lambda y: abs(y - sim_time))
            return closest_time if abs(closest_time - sim_time) < tolerance else np.nan

        # Map each simTime in merged_df to a row index.
        sim_time_to_index = {row['simTime']: idx for idx, row in merged_df.iterrows()}
        df_function = df_function.sort_values(by='simTime').reset_index(drop=True)
        # For each Function.csv row, find the nearest simTime in merged_df.
        df_function['nearest_simTime'] = df_function['simTime'].apply(lambda x: find_closest_time(x, sim_time_to_index, tolerance))
        df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)
        # Rename simTime to keep it distinct, drop unmatched rows, and make
        # nearest_index an integer before merging.
        df_function_renamed = df_function.rename(columns={'simTime': 'function_simTime'})
        df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
        df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
        ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
        merged_df = mergecopy_by_simtime(merged_df, df_function_valid, ignore_cols)
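        # Example: a Function.csv row at simTime 12.503 matches a merged_df
        # row at 12.50 (|diff| = 0.003 < 0.01); a row at 12.52 with no
        # recorded time within 0.01 s gets nearest_simTime = NaN and is
        # dropped above.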
        """
        def check_matching(df_function, sim_time_to_index, tolerance=0.01):
            # Check whether every Function.csv row found a match.
            df_function['nearest_simTime'] = df_function['simTime'].apply(
                lambda x: find_closest_time(x, sim_time_to_index, tolerance))
            unmatched_rows = df_function[df_function['nearest_simTime'].isna()]
            if not unmatched_rows.empty:
                print(f"Unmatched rows: {unmatched_rows}")
            else:
                print("All rows matched successfully!")
            # Report matched vs. unmatched counts.
            total_rows = len(df_function)
            matched_rows = total_rows - len(unmatched_rows)
            print(f"Total rows: {total_rows}, matched: {matched_rows}, unmatched: {len(unmatched_rows)}")
            return unmatched_rows

        unmatched_rows = check_matching(df_function, sim_time_to_index, tolerance=0.01)
        # Inspect the last row's simTime and its nearest match.
        last_row_simtime = df_function.iloc[-1]['simTime']
        print(f"Last row simTime: {last_row_simtime}")
        last_row_nearest_simtime = df_function.iloc[-1]['nearest_simTime']
        print(f"Last row nearest_simTime: {last_row_nearest_simtime}")
        """
        # Convert heading from ENU radians to compass degrees.
        merged_df['posH'] = merged_df['posH'].apply(convert_heading)
        if 'posP' in merged_df.columns:
            merged_df.rename(columns={'posP': 'pitch_rate'}, inplace=True)
        if 'posR' in merged_df.columns:
            merged_df.rename(columns={'posR': 'roll_rate'}, inplace=True)
        # Let pandas infer consistent dtypes, then normalise empty values to NaN.
        merged_df = merged_df.infer_objects()
        merged_df.fillna(np.nan, inplace=True)
        merged_csv_path = Path(data_path) / "merged_ObjState.csv"
        merged_df.to_csv(merged_csv_path, index=False)
        return merged_csv_path

    # @ErrorHandler.measure_performance
    # def process_built_in_data(self) -> Dict[str, Path]:
    #     """PGVIL-specific processing of built-in data.
    #     Processing order:
    #     1. Process CAN data
    #     2. Process sensor data
    #     3. Process other PGVIL-specific data
    #     4. Merge the built-in data
    #     Returns:
    #         Dict mapping result names to file paths.
    #     """
    #     result_files = {}
    #     # 1. Process CAN data
    #     print("1. Processing CAN data...")
    #     can_results = self._process_can_data()
    #     if can_results:
    #         result_files.update(can_results)
    #     else:
    #         print("Warning: CAN data processing failed or produced no data")
    #     # 2. Process sensor data
    #     print("\n2. Processing sensor data...")
    #     sensor_results = self._process_sensor_data()
    #     if sensor_results:
    #         result_files.update(sensor_results)
    #     else:
    #         print("Warning: sensor data processing failed or produced no data")
    #     # 3. Process other PGVIL-specific data
    #     print("\n3. Processing other PGVIL data...")
    #     other_results = self._process_other_data()
    #     if other_results:
    #         result_files.update(other_results)
    #     # 4. Merge the built-in data
    #     print("\n4. Merging built-in data...")
    #     if not self._merge_built_in_data(result_files):
    #         print("Warning: built-in data merge failed")
    #     return result_files

    # def _process_can_data(self) -> Dict[str, Path]:
    #     """Process CAN data."""
    #     # TODO: implement CAN data processing
    #     return {}

    # def _process_sensor_data(self) -> Dict[str, Path]:
    #     """Process sensor data."""
    #     # TODO: implement sensor data processing
    #     return {}

    # def _process_other_data(self) -> Dict[str, Path]:
    #     """Process other PGVIL-specific data."""
    #     # TODO: implement other data processing
    #     return {}

    # def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
    #     """Merge PGVIL built-in data.
    #     Args:
    #         result_files: dict of processed result file paths.
    #     Returns:
    #         Whether the merge succeeded.
    #     """
    #     try:
    #         # Implement PGVIL-specific merge logic here.
    #         return True
    #     except Exception as e:
    #         print(f"Built-in data merge failed: {e}")
    #         return False
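
# Minimal end-to-end usage sketch (hypothetical paths; the optional C++ engine
# step is skipped automatically when engine_path/map_path are left unset):
#   config = Config(zip_path=Path("data/run_001.zip"),
#                   output_path=Path("output/run_001"))
#   processor = PGVILProcessor(config)
#   processor.process_zip()
#   merged_csv = processor.merge_csv_files()
#   print(f"Merged data written to {merged_csv}")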