lst.py

  1. import zipfile
  2. import sqlite3
  3. import csv
  4. import tempfile
  5. from pathlib import Path
  6. from typing import List, Dict, Tuple, Optional, Any, NamedTuple
  7. import cantools
  8. import os
  9. import subprocess
  10. import numpy as np
  11. import pandas as pd
  12. from collections import Counter
  13. from datetime import datetime
  14. import argparse
  15. import sys
  16. from pyproj import Proj
  17. from bagpy.bagreader import bagreader
  18. import shutil
  19. import json
  20. from dataclasses import dataclass, field
  21. # --- Constants ---
  22. PLAYER_ID_EGO = 1
  23. PLAYER_ID_OBJ = 2
  24. DEFAULT_TYPE = 1
  25. OUTPUT_CSV_OBJSTATE = "ObjState.csv"
  26. OUTPUT_CSV_TEMP_OBJSTATE = "ObjState_temp_intermediate.csv" # Should be eliminated
  27. OUTPUT_CSV_EGOSTATE = "EgoState.csv" # Not used in final merge? Check logic if needed.
  28. OUTPUT_CSV_MERGED = "merged_ObjState.csv"
  29. OUTPUT_CSV_OBU = "OBUdata.csv"
  30. OUTPUT_CSV_LANEMAP = "LaneMap.csv"
  31. OUTPUT_CSV_EGOMAP = "EgoMap.csv"
  32. OUTPUT_CSV_FUNCTION = "Function.csv"
  33. ROADMARK_CSV = "RoadMark.csv"
  34. # --- Configuration Class ---
  35. @dataclass
  36. class Config:
  37. """Holds configuration paths and settings."""
  38. zip_path: Path
  39. output_path: Path
  40. json_path: Optional[Path] # Make json_path optional
  41. dbc_path: Optional[Path] = None
  42. engine_path: Optional[Path] = None
  43. map_path: Optional[Path] = None
  44. utm_zone: int = 51 # Example UTM zone
  45. x_offset: float = 0.0
  46. y_offset: float = 0.0
  47. # Derived paths
  48. output_dir: Path = field(init=False)
  49. def __post_init__(self):
  50. # Use output_path directly as output_dir to avoid nested directories
  51. self.output_dir = self.output_path
  52. self.output_dir.mkdir(parents=True, exist_ok=True)
  53. # --- Zip/CSV Processing ---
  54. class ZipCSVProcessor:
  55. """Processes DB files within a ZIP archive to generate CSV data."""
  56. # Define column mappings more clearly
  57. EGO_COLS_NEW = [
  58. "simTime", "simFrame", "playerId", "v", "speedX", "speedY",
  59. "posH", "speedH", "posX", "posY", "accelX", "accelY",
  60. "travelDist", "composite_v", "relative_dist", "type" # Added type
  61. ]
  62. OBJ_COLS_OLD_SUFFIXED = [
  63. "v_obj", "speedX_obj", "speedY_obj", "posH_obj", "speedH_obj",
  64. "posX_obj", "posY_obj", "accelX_obj", "accelY_obj", "travelDist_obj"
  65. ]
  66. OBJ_COLS_MAPPING = {old: new for old, new in
  67. zip(OBJ_COLS_OLD_SUFFIXED, EGO_COLS_NEW[3:13])} # Map suffixed cols to standard names
  68. def __init__(self, config: Config):
  69. self.config = config
  70. self.dbc = self._load_dbc(config.dbc_path)
  71. self.projection = Proj(proj='utm', zone=config.utm_zone, ellps='WGS84', preserve_units='m')
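# Note: calling self.projection(lon, lat) returns UTM easting/northing in metres for the configured zone,
# so the posX/posY values written below are projected coordinates rather than raw latitude/longitude.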
  72. self._init_table_config()
  73. self._init_keyword_mapping()
  74. def _load_dbc(self, dbc_path: Optional[Path]) -> Optional[cantools.db.Database]:
  75. if not dbc_path or not dbc_path.exists():
  76. print("DBC path not provided or file not found.")
  77. return None
  78. try:
  79. return cantools.db.load_file(dbc_path)
  80. except Exception as e:
  81. print(f"DBC loading failed: {e}")
  82. return None
  83. def _init_table_config(self):
  84. """Initializes configurations for different table types."""
  85. self.table_config = {
  86. "gnss_table": self._get_gnss_config(),
  87. "can_table": self._get_can_config()
  88. }
  89. def _get_gnss_config(self):
  90. # Keep relevant columns, adjust mapping as needed
  91. return {
  92. "output_columns": self.EGO_COLS_NEW, # Use the standard ego columns + type
  93. "mapping": { # Map output columns to source DB columns/signals
  94. "simTime": ("second", "usecond"),
  95. "simFrame": "ID",
  96. "v": "speed",
  97. "speedY": "y_speed",
  98. "speedX": "x_speed",
  99. "posH": "yaw",
  100. "speedH": "yaw_rate",
  101. "posX": "latitude_dd", # Source before projection
  102. "posY": "longitude_dd", # Source before projection
  103. "accelX": "x_acceleration",
  104. "accelY": "y_acceleration",
  105. "travelDist": "total_distance",
  106. # composite_v/relative_dist might not be direct fields in GNSS, handle later if needed
  107. "composite_v": "speed", # Placeholder, adjust if needed
  108. "relative_dist": None, # Placeholder, likely not in GNSS data
  109. "type": None # Will be set later
  110. },
  111. "db_columns": ["ID", "second", "usecond", "speed", "y_speed", "x_speed",
  112. "yaw", "yaw_rate", "latitude_dd", "longitude_dd",
  113. "x_acceleration", "y_acceleration", "total_distance"] # Actual cols to SELECT
  114. }
  115. def _get_can_config(self):
  116. # Define columns needed from DB/CAN signals for both EGO and OBJ
  117. return {
  118. "mapping": { # Map unified output columns to CAN signals or direct fields
  119. # EGO mappings (VUT = Vehicle Under Test)
  120. "v": "VUT_Speed_mps",
  121. "speedX": "VUT_Speed_x_mps",
  122. "speedY": "VUT_Speed_y_mps",
  123. "speedH": "VUT_Yaw_Rate",
  124. "posX": "VUT_GPS_Latitude", # Source before projection
  125. "posY": "VUT_GPS_Longitude", # Source before projection
  126. "posH": "VUT_Heading",
  127. "accelX": "VUT_Acc_X",
  128. "accelY": "VUT_Acc_Y",
  129. # OBJ mappings (UFO = Unidentified Flying Object / Other Vehicle)
  130. "v_obj": "Speed_mps",
  131. "speedX_obj": "UFO_Speed_x_mps",
  132. "speedY_obj": "UFO_Speed_y_mps",
  133. "speedH_obj": "Yaw_Rate",
  134. "posX_obj": "GPS_Latitude", # Source before projection
  135. "posY_obj": "GPS_Longitude", # Source before projection
  136. "posH_obj": "Heading",
  137. "accelX_obj": "Acc_X",
  138. "accelY_obj": "Acc_Y",
  139. # Relative Mappings
  140. "composite_v": "VUT_Rel_speed_long_mps",
  141. "relative_dist": "VUT_Dist_MRP_Abs",
  142. # travelDist often calculated, not direct CAN signal
  143. "travelDist": None, # Placeholder
  144. "travelDist_obj": None # Placeholder
  145. },
  146. "db_columns": ["ID", "second", "usecond", "timestamp", "canid", "len", "frame"] # Core DB columns
  147. }
  148. def _init_keyword_mapping(self):
  149. """Maps keywords in filenames to table configurations and output CSV names."""
  150. self.keyword_mapping = {
  151. "gnss": ("gnss_table", OUTPUT_CSV_OBJSTATE),
  152. # GNSS likely represents ego, writing to ObjState first? Revisit logic if needed.
  153. "can2": ("can_table", OUTPUT_CSV_OBJSTATE), # Process CAN data into the combined ObjState file
  154. }
  155. def process_zip(self) -> None:
  156. """Extracts and processes DB files from the configured ZIP path."""
  157. print(f"Processing ZIP: {self.config.zip_path}")
  158. output_dir = self.config.output_dir # Already created in Config
  159. try:
  160. with zipfile.ZipFile(self.config.zip_path, "r") as zip_ref:
  161. db_files_to_process = []
  162. for file_info in zip_ref.infolist():
  163. # Check if it's a DB file in the CANdata directory
  164. if 'CANdata/' in file_info.filename and file_info.filename.endswith('.db'):
  165. # Check if the filename contains any of the keywords
  166. match = self._match_keyword(file_info.filename)
  167. if match:
  168. table_type, csv_name = match
  169. db_files_to_process.append((file_info, table_type, csv_name))
  170. if not db_files_to_process:
  171. print("No relevant DB files found in CANdata/ matching keywords.")
  172. return
  173. # Process matched DB files
  174. with tempfile.TemporaryDirectory() as tmp_dir_str:
  175. tmp_dir = Path(tmp_dir_str)
  176. for file_info, table_type, csv_name in db_files_to_process:
  177. print(f"Processing DB: {file_info.filename} for table type {table_type}")
  178. extracted_path = tmp_dir / Path(file_info.filename).name
  179. try:
  180. # Extract the specific DB file
  181. with zip_ref.open(file_info.filename) as source, open(extracted_path, "wb") as target:
  182. shutil.copyfileobj(source, target)
  183. # Process the extracted DB file
  184. self._process_db_file(extracted_path, output_dir, table_type, csv_name)
  185. except (sqlite3.Error, pd.errors.EmptyDataError, FileNotFoundError, KeyError) as e:
  186. print(f"Error processing DB file {file_info.filename}: {e}")
  187. except Exception as e:
  188. print(f"Unexpected error processing DB file {file_info.filename}: {e}")
  189. finally:
  190. if extracted_path.exists():
  191. extracted_path.unlink() # Clean up extracted file
  192. except zipfile.BadZipFile:
  193. print(f"Error: Bad ZIP file: {self.config.zip_path}")
  194. except FileNotFoundError:
  195. print(f"Error: ZIP file not found: {self.config.zip_path}")
  196. except Exception as e:
  197. print(f"An error occurred during ZIP processing: {e}")
  198. def _match_keyword(self, filename: str) -> Optional[Tuple[str, str]]:
  199. """Finds the first matching keyword configuration for a filename."""
  200. for keyword, (table_type, csv_name) in self.keyword_mapping.items():
  201. if keyword in filename:
  202. return table_type, csv_name
  203. return None
  204. def _process_db_file(
  205. self, db_path: Path, output_dir: Path, table_type: str, csv_name: str
  206. ) -> None:
  207. """Connects to SQLite DB and processes the specified table type."""
  208. output_csv_path = output_dir / csv_name
  209. try:
  210. # Use URI for read-only connection
  211. conn_str = f"file:{db_path}?mode=ro"
  212. with sqlite3.connect(conn_str, uri=True) as conn:
  213. cursor = conn.cursor()
  214. if not self._check_table_exists(cursor, table_type):
  215. print(f"Table '{table_type}' does not exist in {db_path.name}. Skipping.")
  216. return
  217. if self._check_table_empty(cursor, table_type):
  218. print(f"Table '{table_type}' in {db_path.name} is empty. Skipping.")
  219. return
  220. print(f"Exporting data from table '{table_type}' to {output_csv_path}")
  221. if table_type == "can_table":
  222. self._process_can_table_optimized(cursor, output_csv_path)
  223. elif table_type == "gnss_table":
  224. # Pass output_path directly, avoid intermediate steps
  225. self._process_gnss_table(cursor, output_csv_path)
  226. else:
  227. print(f"Warning: No specific processor for table type '{table_type}'. Skipping.")
  228. except sqlite3.OperationalError as e:
  229. print(f"Database operational error for {db_path.name}: {e}. Check file integrity/permissions.")
  230. except sqlite3.DatabaseError as e:
  231. print(f"Database error connecting to {db_path.name}: {e}")
  232. except Exception as e:
  233. print(f"Unexpected error processing DB {db_path.name}: {e}")
  234. def _check_table_exists(self, cursor, table_name: str) -> bool:
  235. """Checks if a table exists in the database."""
  236. try:
  237. cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table_name,))
  238. return cursor.fetchone() is not None
  239. except sqlite3.Error as e:
  240. print(f"Error checking existence of table {table_name}: {e}")
  241. return False # Assume not exists on error
  242. def _check_table_empty(self, cursor, table_name: str) -> bool:
  243. """Checks if a table is empty."""
  244. try:
  245. cursor.execute(f"SELECT COUNT(*) FROM {table_name}") # Use COUNT(*) for efficiency
  246. count = cursor.fetchone()[0]
  247. return count == 0
  248. except sqlite3.Error as e:
  249. # If error occurs (e.g., table doesn't exist after check - race condition?), treat as problematic/empty
  250. print(f"Error checking if table {table_name} is empty: {e}")
  251. return True
  252. def _process_gnss_table(self, cursor, output_path: Path) -> None:
  253. """Processes gnss_table data and writes directly to CSV."""
  254. config = self.table_config["gnss_table"]
  255. db_columns = config["db_columns"]
  256. output_columns = config["output_columns"]
  257. mapping = config["mapping"]
  258. try:
  259. cursor.execute(f"SELECT {', '.join(db_columns)} FROM gnss_table")
  260. rows = cursor.fetchall()
  261. if not rows:
  262. print("No data found in gnss_table.")
  263. return
  264. processed_data = []
  265. for row in rows:
  266. row_dict = dict(zip(db_columns, row))
  267. record = {}
  268. # Calculate simTime
  269. record["simTime"] = round(row_dict.get("second", 0) + row_dict.get("usecond", 0) / 1e6, 2)
  270. # Map other columns
  271. for out_col in output_columns:
  272. if out_col == "simTime": continue # Already handled
  273. if out_col == "playerId":
  274. record[out_col] = PLAYER_ID_EGO # Assuming GNSS is ego
  275. continue
  276. if out_col == "type":
  277. record[out_col] = DEFAULT_TYPE
  278. continue
  279. source_info = mapping.get(out_col)
  280. if source_info is None:
  281. record[out_col] = 0.0 # Or np.nan if preferred
  282. elif isinstance(source_info, tuple):
  283. # This case was only for simTime, handled above
  284. record[out_col] = 0.0
  285. else: # Direct mapping from db_columns
  286. raw_value = row_dict.get(source_info)
  287. if raw_value is not None:
  288. # Handle projection for position columns
  289. if out_col == "posX":
  290. # Assuming source_info = "latitude_dd"
  291. lat = row_dict.get(mapping["posX"])
  292. lon = row_dict.get(mapping["posY"])
  293. if lat is not None and lon is not None:
  294. proj_x, _ = self.projection(lon, lat)
  295. record[out_col] = round(proj_x, 6)
  296. else:
  297. record[out_col] = 0.0
  298. elif out_col == "posY":
  299. # Assuming source_info = "longitude_dd"
  300. lat = row_dict.get(mapping["posX"])
  301. lon = row_dict.get(mapping["posY"])
  302. if lat is not None and lon is not None:
  303. _, proj_y = self.projection(lon, lat)
  304. record[out_col] = round(proj_y, 6)
  305. else:
  306. record[out_col] = 0.0
  307. elif out_col in ["composite_v", "relative_dist"]:
  308. # Handle these based on source if available, else default
  309. record[out_col] = round(float(raw_value), 3) if source_info else 0.0
  310. else:
  311. # General case: round numeric values
  312. try:
  313. record[out_col] = round(float(raw_value), 3)
  314. except (ValueError, TypeError):
  315. record[out_col] = raw_value # Keep as is if not numeric
  316. else:
  317. record[out_col] = 0.0 # Default for missing source data
  318. processed_data.append(record)
  319. if processed_data:
  320. df_final = pd.DataFrame(processed_data)[output_columns].iloc[::4].reset_index(drop=True) # Enforce column order and keep every 4th row (downsample)
  321. df_final['simFrame'] = np.arange(1, len(df_final) + 1)
  322. df_final.to_csv(output_path, index=False, encoding="utf-8")
  323. print(f"Successfully wrote GNSS data to {output_path}")
  324. else:
  325. print("No processable records found in gnss_table.")
  326. except sqlite3.Error as e:
  327. print(f"SQL error during GNSS processing: {e}")
  328. except Exception as e:
  329. print(f"Unexpected error during GNSS processing: {e}")
  330. def _process_can_table_optimized(self, cursor, output_path: Path) -> None:
  331. """Processes CAN data directly into the final merged DataFrame format."""
  332. config = self.table_config["can_table"]
  333. db_columns = config["db_columns"]
  334. mapping = config["mapping"]
  335. try:
  336. cursor.execute(f"SELECT {', '.join(db_columns)} FROM can_table")
  337. rows = cursor.fetchall()
  338. if not rows:
  339. print("No data found in can_table.")
  340. return
  341. all_records = []
  342. for row in rows:
  343. row_dict = dict(zip(db_columns, row))
  344. # Decode CAN frame if DBC is available
  345. decoded_signals = self._decode_can_frame(row_dict)
  346. # Create a unified record combining DB fields and decoded signals
  347. record = self._create_unified_can_record(row_dict, decoded_signals, mapping)
  348. if record: # Only add if parsing was successful
  349. all_records.append(record)
  350. if not all_records:
  351. print("No CAN records could be successfully processed.")
  352. return
  353. # Convert raw records to DataFrame for easier manipulation
  354. df_raw = pd.DataFrame(all_records)
  355. # Separate EGO and OBJ data based on available columns
  356. df_ego = self._extract_vehicle_data(df_raw, PLAYER_ID_EGO)
  357. df_obj = self._extract_vehicle_data(df_raw, PLAYER_ID_OBJ)
  358. # Project coordinates
  359. df_ego = self._project_coordinates(df_ego, 'posX', 'posY')
  360. df_obj = self._project_coordinates(df_obj, 'posX', 'posY') # Use same column names after extraction
  361. # Add calculated/default columns
  362. df_ego['type'] = DEFAULT_TYPE
  363. df_obj['type'] = DEFAULT_TYPE
  364. # Note: travelDist is often calculated later or not available directly
  365. # Ensure both have the same columns before merging
  366. final_columns = self.EGO_COLS_NEW # Target columns
  367. df_ego = df_ego.reindex(columns=final_columns).iloc[::4]
  368. df_obj = df_obj.reindex(columns=final_columns).iloc[::4]
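# Assumption: iloc[::4] keeps every 4th sample, presumably downsampling the CAN stream (e.g. 100 Hz -> 25 Hz).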
  369. # Reindex simFrame of ego and obj
  370. df_ego['simFrame'] = np.arange(1, len(df_ego)+1)
  371. df_obj['simFrame'] = np.arange(1, len(df_obj)+1)
  372. # Merge EGO and OBJ dataframes
  373. df_merged = pd.concat([df_ego, df_obj], ignore_index=True)
  374. # Sort and clean up
  375. df_merged.sort_values(by=["simTime", "simFrame", "playerId"], inplace=True)
  376. df_merged.reset_index(drop=True, inplace=True)
  377. # Fill potential NaNs introduced by reindexing or missing data
  378. # Choose appropriate fill strategy (e.g., 0, forward fill, or leave as NaN)
  379. # df_merged.fillna(0.0, inplace=True) # Example: fill with 0.0
  380. # Save the final merged DataFrame
  381. df_merged.to_csv(output_path, index=False, encoding="utf-8")
  382. print(f"Successfully processed CAN data and wrote merged output to {output_path}")
  383. except sqlite3.Error as e:
  384. print(f"SQL error during CAN processing: {e}")
  385. except KeyError as e:
  386. print(f"Key error during CAN processing - mapping issue? Missing key: {e}")
  387. except Exception as e:
  388. print(f"Unexpected error during CAN processing: {e}")
  389. import traceback
  390. traceback.print_exc() # Print detailed traceback for debugging
  391. def _decode_can_frame(self, row_dict: Dict) -> Dict[str, Any]:
  392. """Decodes CAN frame using DBC file if available."""
  393. decoded_signals = {}
  394. if self.dbc and 'canid' in row_dict and 'frame' in row_dict and 'len' in row_dict:
  395. can_id = row_dict['canid']
  396. frame_bytes = bytes(row_dict['frame'][:row_dict['len']]) # Ensure correct length
  397. try:
  398. message_def = self.dbc.get_message_by_frame_id(can_id)
  399. decoded_signals = message_def.decode(frame_bytes, decode_choices=False,
  400. allow_truncated=True) # Allow truncated
  401. except KeyError:
  402. # Optional: print(f"Warning: CAN ID 0x{can_id:X} not found in DBC.")
  403. pass # Ignore unknown IDs silently
  404. except ValueError as e:
  405. print(
  406. f"Warning: Decoding ValueError for CAN ID 0x{can_id:X} (length {row_dict['len']}, data: {frame_bytes.hex()}): {e}")
  407. except Exception as e:
  408. print(f"Warning: Error decoding CAN ID 0x{can_id:X}: {e}")
  409. return decoded_signals
  410. def _create_unified_can_record(self, row_dict: Dict, decoded_signals: Dict, mapping: Dict) -> Optional[
  411. Dict[str, Any]]:
  412. """Creates a single record combining DB fields and decoded signals based on mapping."""
  413. record = {}
  414. try:
  415. # Handle time and frame ID first
  416. record["simTime"] = round(row_dict.get("second", 0) + row_dict.get("usecond", 0) / 1e6, 2)
  417. record["simFrame"] = row_dict.get("ID")
  418. record["canid"] = f"0x{row_dict.get('canid'):X}" # Store CAN ID if needed
  419. # Populate record using the mapping config
  420. for target_col, source_info in mapping.items():
  421. if target_col in ["simTime", "simFrame", "canid"]: continue # Already handled
  422. if isinstance(source_info, tuple): continue # Should only be time
  423. # source_info is now the signal name (or None)
  424. signal_name = source_info
  425. if signal_name and signal_name in decoded_signals:
  426. # Value from decoded CAN signal
  427. raw_value = decoded_signals[signal_name]
  428. try:
  429. # Apply scaling/offset if needed (cantools handles this)
  430. # Round appropriately, especially for floats
  431. if isinstance(raw_value, (int, float)):
  432. # Be cautious with lat/lon precision before projection
  433. if "Latitude" in target_col or "Longitude" in target_col:
  434. record[target_col] = float(raw_value) # Keep precision for projection
  435. else:
  436. record[target_col] = round(float(raw_value), 6)
  437. else:
  438. record[target_col] = raw_value # Keep non-numeric as is (e.g., enums)
  439. except (ValueError, TypeError):
  440. record[target_col] = raw_value # Assign raw value if conversion fails
  441. # If signal not found or source_info is None, leave it empty for now
  442. # Will be filled later or during DataFrame processing
  443. return record
  444. except Exception as e:
  445. print(f"Error creating unified record for row {row_dict.get('ID')}: {e}")
  446. return None
  447. def _extract_vehicle_data(self, df_raw: pd.DataFrame, player_id: int) -> pd.DataFrame:
  448. """Extracts and renames columns for a specific vehicle (EGO or OBJ)."""
  449. df_vehicle = pd.DataFrame()
  450. # df_vehicle["simTime"] = df_raw["simTime"].drop_duplicates().sort_values().reset_index(drop=True)
  451. # df_vehicle["simFrame"] = np.arange(1, len(df_vehicle) + 1)
  452. # df_vehicle["playerId"] = int(player_id)
  453. df_vehicle_temps_ego = pd.DataFrame()
  454. df_vehicle_temps_obj = pd.DataFrame()
  455. if player_id == PLAYER_ID_EGO:
  456. # Select EGO columns (not ending in _obj) + relative columns
  457. ego_cols = {target: source for target, source in self.table_config['can_table']['mapping'].items()
  458. if source and not isinstance(source, tuple) and not target.endswith('_obj')}
  459. rename_map = {}
  460. select_cols_raw = []
  461. for target_col, source_info in ego_cols.items():
  462. if source_info: # Mapped signal/field name in df_raw
  463. select_cols_raw.append(target_col) # Column names in df_raw are already target names
  464. rename_map[target_col] = target_col # No rename needed here
  465. # Include relative speed and distance for ego frame
  466. relative_cols = ["composite_v", "relative_dist"]
  467. select_cols_raw.extend(relative_cols)
  468. for col in relative_cols:
  469. rename_map[col] = col
  470. # Select and rename
  471. df_vehicle_temp = df_raw[list(set(select_cols_raw) & set(df_raw.columns))] # Select available columns
  472. for col in df_vehicle_temp.columns:
  473. df_vehicle_temps_ego[col] = df_vehicle_temp[col].dropna().reset_index(drop=True)
  474. df_vehicle = pd.concat([df_vehicle, df_vehicle_temps_ego], axis=1)
  475. elif player_id == PLAYER_ID_OBJ:
  476. # Select OBJ columns (ending in _obj)
  477. obj_cols = {target: source for target, source in self.table_config['can_table']['mapping'].items()
  478. if source and not isinstance(source, tuple) and target.endswith('_obj')}
  479. rename_map = {}
  480. select_cols_raw = []
  481. for target_col, source_info in obj_cols.items():
  482. if source_info:
  483. select_cols_raw.append(target_col) # Original _obj column name
  484. # Map from VUT_XXX_obj -> VUT_XXX
  485. rename_map[target_col] = self.OBJ_COLS_MAPPING.get(target_col,
  486. target_col) # Rename to standard name
  487. # Select and rename
  488. df_vehicle_temp = df_raw[list(set(select_cols_raw) & set(df_raw.columns))] # Select available columns
  489. df_vehicle_temp.rename(columns=rename_map, inplace=True)
  490. for col in df_vehicle_temp.columns:
  491. df_vehicle_temps_obj[col] = df_vehicle_temp[col].dropna().reset_index(drop=True)
  492. df_vehicle = pd.concat([df_vehicle, df_vehicle_temps_obj], axis=1)
  493. # Copy relative speed/distance from ego calculation (assuming it's relative *to* ego)
  494. if "composite_v" in df_raw.columns:
  495. df_vehicle["composite_v"] = df_raw["composite_v"].dropna().reset_index(drop=True)
  496. if "relative_dist" in df_raw.columns:
  497. df_vehicle["relative_dist"] = df_raw["relative_dist"].dropna().reset_index(drop=True)
  498. # Drop rows where essential position data might be missing after selection/renaming
  499. # Adjust required columns as necessary
  500. # required_pos = ['posX', 'posY', 'posH']
  501. # df_vehicle.dropna(subset=[col for col in required_pos if col in df_vehicle.columns], inplace=True)
  502. try:
  503. # x = np.arange(df_raw["simTime"].tolist()[0], df_raw["simTime"].tolist()[0] + 0.01 * (len(df_vehicle)),0.01)
  504. # y = f"{x: .02f}"
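# Note: 28800 s = 8 h, presumably shifting timestamps from UTC to UTC+8; the 0.01 s step assumes 100 Hz data.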
  505. df_vehicle["simTime"] = np.round(np.linspace(df_raw["simTime"].tolist()[0]+28800, df_raw["simTime"].tolist()[0]+28800 + 0.01*(len(df_vehicle)), len(df_vehicle)), 2)
  506. df_vehicle["simFrame"] = np.arange(1, len(df_vehicle) + 1)
  507. df_vehicle["playerId"] = int(player_id)
  508. df_vehicle['playerId'] = pd.to_numeric(df_vehicle['playerId']).astype(int)
  509. except ValueError as ve:
  510. print(f"{ve}")
  511. except TypeError as te:
  512. print(f"{te}")
  513. except Exception as Ee:
  514. print(f"{Ee}")
  515. return df_vehicle
  516. def _project_coordinates(self, df: pd.DataFrame, lat_col: str, lon_col: str) -> pd.DataFrame:
  517. """Applies UTM projection to latitude and longitude columns."""
  518. if lat_col in df.columns and lon_col in df.columns:
  519. # Ensure data is numeric and handle potential errors/missing values
  520. lat = pd.to_numeric(df[lat_col], errors='coerce')
  521. lon = pd.to_numeric(df[lon_col], errors='coerce')
  522. valid_coords = lat.notna() & lon.notna()
  523. if valid_coords.any():
  524. x, y = self.projection(lon[valid_coords].values, lat[valid_coords].values)
  525. # Update DataFrame, assign NaN where original coords were invalid
  526. df.loc[valid_coords, lat_col] = np.round(x, 6) # Overwrite latitude col with X
  527. df.loc[valid_coords, lon_col] = np.round(y, 6) # Overwrite longitude col with Y
  528. df.loc[~valid_coords, [lat_col, lon_col]] = np.nan # Set invalid coords to NaN
  529. else:
  530. # No valid coordinates found, set columns to NaN or handle as needed
  531. df[lat_col] = np.nan
  532. df[lon_col] = np.nan
  533. # Rename columns AFTER projection for clarity
  534. df.rename(columns={lat_col: 'posX', lon_col: 'posY'}, inplace=True)
  535. else:
  536. # Ensure columns exist even if projection didn't happen
  537. if 'posX' not in df.columns: df['posX'] = np.nan
  538. if 'posY' not in df.columns: df['posY'] = np.nan
  539. print(f"Warning: Latitude ('{lat_col}') or Longitude ('{lon_col}') columns not found for projection.")
  540. return df
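# Minimal usage sketch for this class (paths are hypothetical, for illustration only):
#   cfg = Config(zip_path=Path("run.zip"), output_path=Path("out"), json_path=None, dbc_path=Path("vehicle.dbc"))
#   ZipCSVProcessor(cfg).process_zip()  # writes ObjState.csv into cfg.output_dir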
  541. # --- Polynomial Fitting (Largely unchanged, minor cleanup) ---
  542. class PolynomialCurvatureFitting:
  543. """Calculates curvature and its derivative using polynomial fitting."""
  544. def __init__(self, lane_map_path: Path, degree: int = 3):
  545. self.lane_map_path = lane_map_path
  546. self.degree = degree
  547. self.data = self._load_data()
  548. if self.data is not None:
  549. self.points = self.data[["centerLine_x", "centerLine_y"]].values
  550. self.x_data, self.y_data = self.points[:, 0], self.points[:, 1]
  551. else:
  552. self.points = np.empty((0, 2))
  553. self.x_data, self.y_data = np.array([]), np.array([])
  554. def _load_data(self) -> Optional[pd.DataFrame]:
  555. """Loads lane map data safely."""
  556. if not self.lane_map_path.exists() or self.lane_map_path.stat().st_size == 0:
  557. print(f"Warning: LaneMap file not found or empty: {self.lane_map_path}")
  558. return None
  559. try:
  560. return pd.read_csv(self.lane_map_path)
  561. except pd.errors.EmptyDataError:
  562. print(f"Warning: LaneMap file is empty: {self.lane_map_path}")
  563. return None
  564. except Exception as e:
  565. print(f"Error reading LaneMap file {self.lane_map_path}: {e}")
  566. return None
  567. def curvature(self, coefficients: np.ndarray, x: float) -> float:
  568. """Computes curvature of the polynomial at x."""
  569. if len(coefficients) < 3: # Need at least degree 2 for curvature
  570. return 0.0
  571. first_deriv_coeffs = np.polyder(coefficients)
  572. second_deriv_coeffs = np.polyder(first_deriv_coeffs)
  573. dy_dx = np.polyval(first_deriv_coeffs, x)
  574. d2y_dx2 = np.polyval(second_deriv_coeffs, x)
  575. denominator = (1 + dy_dx ** 2) ** 1.5
  576. return np.abs(d2y_dx2) / denominator if denominator != 0 else np.inf
  577. def curvature_derivative(self, coefficients: np.ndarray, x: float) -> float:
  578. """Computes the derivative of curvature with respect to x."""
  579. if len(coefficients) < 4: # Need at least degree 3 for derivative of curvature
  580. return 0.0
  581. first_deriv_coeffs = np.polyder(coefficients)
  582. second_deriv_coeffs = np.polyder(first_deriv_coeffs)
  583. third_deriv_coeffs = np.polyder(second_deriv_coeffs)
  584. dy_dx = np.polyval(first_deriv_coeffs, x)
  585. d2y_dx2 = np.polyval(second_deriv_coeffs, x)
  586. d3y_dx3 = np.polyval(third_deriv_coeffs, x)
  587. denominator = (1 + dy_dx ** 2) ** 2.5 # Note the power is 2.5 or 5/2
  588. if denominator == 0:
  589. return np.inf
  590. # For k = |y''| / (1 + y'^2)^(3/2), differentiating with respect to x gives:
  591. #   dk/dx = (y'''*(1 + y'^2) - 3*y'*y''^2) / (1 + y'^2)^(5/2), up to the sign of y''.
  592. # The form below multiplies numerator and denominator by (1 + y'^2)^(1/2),
  593. # giving a (1 + y'^2)^3 denominator.
  594. # Compute dk/dx:
  595. term1 = d3y_dx3 * (1 + dy_dx ** 2) ** (3 / 2)
  596. term2 = d2y_dx2 * (3 / 2) * (1 + dy_dx ** 2) ** (1 / 2) * (2 * dy_dx * d2y_dx2) # Chain rule
  597. numerator_dk_dx = term1 - term2
  598. denominator_dk_dx = (1 + dy_dx ** 2) ** 3
  599. if denominator_dk_dx == 0:
  600. return np.inf
  601. # The sign is kept (no abs()), matching the original behaviour.
  602. return numerator_dk_dx / denominator_dk_dx
  607. def polynomial_fit(
  608. self, x_window: np.ndarray, y_window: np.ndarray
  609. ) -> Tuple[Optional[np.ndarray], Optional[np.poly1d]]:
  610. """Performs polynomial fitting, handling potential rank warnings."""
  611. if len(x_window) <= self.degree:
  612. print(f"Warning: Window size {len(x_window)} is <= degree {self.degree}. Cannot fit.")
  613. return None, None
  614. try:
  615. # Use warnings context manager if needed, but RankWarning often indicates insufficient data variability
  616. # with warnings.catch_warnings():
  617. # warnings.filterwarnings('error', category=np.RankWarning) # Or ignore
  618. coefficients = np.polyfit(x_window, y_window, self.degree)
  619. return coefficients, np.poly1d(coefficients)
  620. except np.RankWarning:
  621. print(f"Warning: Rank deficient fitting for window. Check data variability.")
  622. # Attempt lower degree fit? Or return None? For now, return None.
  623. # try:
  624. # coefficients = np.polyfit(x_window, y_window, len(x_window) - 1)
  625. # return coefficients, np.poly1d(coefficients)
  626. # except:
  627. return None, None
  628. except Exception as e:
  629. print(f"Error during polynomial fit: {e}")
  630. return None, None
  631. def find_best_window(self, point: Tuple[float, float], window_size: int) -> Optional[int]:
  632. """Finds the start index of the window whose center is closest to the point."""
  633. if len(self.x_data) < window_size:
  634. print("Warning: Not enough data points for the specified window size.")
  635. return None
  636. x_point, y_point = point
  637. min_dist_sq = np.inf
  638. best_start_index = -1
  639. # Calculate window centers more efficiently
  640. # Use rolling mean if window_size is large, otherwise simple loop is fine
  641. num_windows = len(self.x_data) - window_size + 1
  642. if num_windows <= 0: return None
  643. for start in range(num_windows):
  644. x_center = np.mean(self.x_data[start: start + window_size])
  645. y_center = np.mean(self.y_data[start: start + window_size])
  646. dist_sq = (x_point - x_center) ** 2 + (y_point - y_center) ** 2
  647. if dist_sq < min_dist_sq:
  648. min_dist_sq = dist_sq
  649. best_start_index = start
  650. return best_start_index if best_start_index != -1 else None
  651. def find_projection(
  652. self,
  653. x_target: float,
  654. y_target: float,
  655. polynomial: np.poly1d,
  656. x_range: Tuple[float, float],
  657. search_points: int = 100, # Number of points instead of step size
  658. ) -> Optional[Tuple[float, float, float]]:
  659. """Finds the approximate closest point on the polynomial within the x_range."""
  660. if x_range[1] <= x_range[0]: return None # Invalid range
  661. x_values = np.linspace(x_range[0], x_range[1], search_points)
  662. y_values = polynomial(x_values)
  663. distances_sq = (x_target - x_values) ** 2 + (y_target - y_values) ** 2
  664. if len(distances_sq) == 0: return None
  665. min_idx = np.argmin(distances_sq)
  666. min_distance = np.sqrt(distances_sq[min_idx])
  667. return x_values[min_idx], y_values[min_idx], min_distance
  668. def fit_and_project(
  669. self, points: np.ndarray, window_size: int
  670. ) -> List[Dict[str, Any]]:
  671. """Fits polynomial and calculates curvature for each point in the input array."""
  672. if self.data is None or len(self.x_data) < window_size:
  673. print("Insufficient LaneMap data for fitting.")
  674. # Return default values for all points
  675. return [
  676. {
  677. "projection": (np.nan, np.nan),
  678. "curvHor": np.nan,
  679. "curvHorDot": np.nan,
  680. "laneOffset": np.nan,
  681. }
  682. ] * len(points)
  683. results = []
  684. if points.ndim != 2 or points.shape[1] != 2:
  685. raise ValueError("Input points must be a 2D numpy array with shape (n, 2).")
  686. for x_target, y_target in points:
  687. result = { # Default result
  688. "projection": (np.nan, np.nan),
  689. "curvHor": np.nan,
  690. "curvHorDot": np.nan,
  691. "laneOffset": np.nan,
  692. }
  693. best_start = self.find_best_window((x_target, y_target), window_size)
  694. if best_start is None:
  695. results.append(result)
  696. continue
  697. x_window = self.x_data[best_start: best_start + window_size]
  698. y_window = self.y_data[best_start: best_start + window_size]
  699. coefficients, polynomial = self.polynomial_fit(x_window, y_window)
  700. if coefficients is None or polynomial is None:
  701. results.append(result)
  702. continue
  703. x_min, x_max = np.min(x_window), np.max(x_window)
  704. projection_result = self.find_projection(
  705. x_target, y_target, polynomial, (x_min, x_max)
  706. )
  707. if projection_result is None:
  708. results.append(result)
  709. continue
  710. proj_x, proj_y, min_distance = projection_result
  711. curv_hor = self.curvature(coefficients, proj_x)
  712. curv_hor_dot = self.curvature_derivative(coefficients, proj_x)
  713. result = {
  714. "projection": (round(proj_x, 6), round(proj_y, 6)),
  715. "curvHor": round(curv_hor, 6),
  716. "curvHorDot": round(curv_hor_dot, 6),
  717. "laneOffset": round(min_distance, 6),
  718. }
  719. results.append(result)
  720. return results
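# Minimal usage sketch (file path, query points and window size are hypothetical):
#   fitter = PolynomialCurvatureFitting(Path("out/LaneMap.csv"), degree=3)
#   results = fitter.fit_and_project(np.array([[x1, y1], [x2, y2]]), window_size=20)
#   # each result dict holds "projection", "curvHor", "curvHorDot" and "laneOffset"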
  721. # --- Data Quality Analyzer (Optimized) ---
  722. class DataQualityAnalyzer:
  723. """Analyzes data quality metrics, focusing on frame loss."""
  724. def __init__(self, df: Optional[pd.DataFrame] = None):
  725. self.df = df if df is not None and not df.empty else pd.DataFrame() # Ensure df is DataFrame
  726. def analyze_frame_loss(self) -> Dict[str, Any]:
  727. """Analyzes frame loss characteristics."""
  728. metrics = {
  729. "total_frames_data": 0,
  730. "unique_frames_count": 0,
  731. "min_frame": np.nan,
  732. "max_frame": np.nan,
  733. "expected_frames": 0,
  734. "dropped_frames_count": 0,
  735. "loss_rate": np.nan,
  736. "max_consecutive_loss": 0,
  737. "max_loss_start_frame": np.nan,
  738. "max_loss_end_frame": np.nan,
  739. "loss_intervals_distribution": {},
  740. "valid": False, # Indicate if analysis was possible
  741. "message": ""
  742. }
  743. if self.df.empty or 'simFrame' not in self.df.columns:
  744. metrics["message"] = "DataFrame is empty or 'simFrame' column is missing."
  745. return metrics
  746. # Drop rows with NaN simFrame and ensure integer type
  747. frames_series = self.df['simFrame'].dropna().astype(int)
  748. metrics["total_frames_data"] = len(frames_series)
  749. if frames_series.empty:
  750. metrics["message"] = "No valid 'simFrame' data found after dropping NaN."
  751. return metrics
  752. unique_frames = sorted(frames_series.unique())
  753. metrics["unique_frames_count"] = len(unique_frames)
  754. if metrics["unique_frames_count"] < 2:
  755. metrics["message"] = "Less than two unique frames; cannot analyze loss."
  756. metrics["valid"] = True # Data exists, just not enough to analyze loss
  757. if metrics["unique_frames_count"] == 1:
  758. metrics["min_frame"] = unique_frames[0]
  759. metrics["max_frame"] = unique_frames[0]
  760. metrics["expected_frames"] = 1
  761. return metrics
  762. metrics["min_frame"] = unique_frames[0]
  763. metrics["max_frame"] = unique_frames[-1]
  764. metrics["expected_frames"] = metrics["max_frame"] - metrics["min_frame"] + 1
  765. # Calculate differences between consecutive unique frames
  766. frame_diffs = np.diff(unique_frames)
  767. # Gaps are where diff > 1. The number of lost frames in a gap is diff - 1.
  768. gaps = frame_diffs[frame_diffs > 1]
  769. lost_frames_in_gaps = gaps - 1
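# Example: unique frames [1, 2, 5, 6] -> diffs [1, 3, 1] -> one gap of 3 -> 2 dropped frames (3 and 4).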
  770. metrics["dropped_frames_count"] = int(lost_frames_in_gaps.sum())
  771. if metrics["expected_frames"] > 0:
  772. metrics["loss_rate"] = round(metrics["dropped_frames_count"] / metrics["expected_frames"], 4)
  773. else:
  774. metrics["loss_rate"] = 0.0 # Avoid division by zero if min_frame == max_frame (already handled)
  775. if len(lost_frames_in_gaps) > 0:
  776. metrics["max_consecutive_loss"] = int(lost_frames_in_gaps.max())
  777. # Find where the max loss occurred
  778. max_loss_indices = np.where(frame_diffs == metrics["max_consecutive_loss"] + 1)[0]
  779. # Get the first occurrence start/end frames
  780. max_loss_idx = max_loss_indices[0]
  781. metrics["max_loss_start_frame"] = unique_frames[max_loss_idx]
  782. metrics["max_loss_end_frame"] = unique_frames[max_loss_idx + 1]
  783. # Count distribution of loss interval lengths
  784. loss_counts = Counter(lost_frames_in_gaps)
  785. metrics["loss_intervals_distribution"] = {int(k): int(v) for k, v in loss_counts.items()}
  786. else:
  787. metrics["max_consecutive_loss"] = 0
  788. metrics["valid"] = True
  789. metrics["message"] = "Frame loss analysis complete."
  790. return metrics
  791. def get_all_csv_files(path: Path) -> List[Path]:
  792. """Gets all CSV files in path, excluding specific ones."""
  793. excluded_files = {OUTPUT_CSV_LANEMAP, ROADMARK_CSV}
  794. return [
  795. file_path
  796. for file_path in path.rglob("*.csv") # Recursive search
  797. if file_path.is_file() and file_path.name not in excluded_files
  798. ]
  799. def run_frame_loss_analysis_on_folder(path: Path) -> Dict[str, Dict[str, Any]]:
  800. """Runs frame loss analysis on all relevant CSV files in a folder."""
  801. analysis_results = {}
  802. csv_files = get_all_csv_files(path)
  803. if not csv_files:
  804. print(f"No relevant CSV files found in {path}")
  805. return analysis_results
  806. for file_path in csv_files:
  807. file_name = file_path.name
  808. if file_name in {OUTPUT_CSV_FUNCTION, OUTPUT_CSV_OBU}: # Skip specific files if needed
  809. print(f"Skipping frame analysis for: {file_name}")
  810. continue
  811. print(f"Analyzing frame loss for: {file_name}")
  812. if file_path.stat().st_size == 0:
  813. print(f"File {file_name} is empty. Skipping analysis.")
  814. analysis_results[file_name] = {"valid": False, "message": "File is empty."}
  815. continue
  816. try:
  817. # Read only necessary column if possible, handle errors
  818. df = pd.read_csv(file_path, usecols=['simFrame'], index_col=False,
  819. on_bad_lines='warn') # 'warn' or 'skip'
  820. analyzer = DataQualityAnalyzer(df)
  821. metrics = analyzer.analyze_frame_loss()
  822. analysis_results[file_name] = metrics
  823. # Optionally print a summary here
  824. if metrics["valid"]:
  825. print(f" Loss Rate: {metrics.get('loss_rate', np.nan) * 100:.2f}%, "
  826. f"Dropped: {metrics.get('dropped_frames_count', 'N/A')}, "
  827. f"Max Gap: {metrics.get('max_consecutive_loss', 'N/A')}")
  828. else:
  829. print(f" Analysis failed: {metrics.get('message')}")
  830. except pd.errors.EmptyDataError:
  831. print(f"File {file_name} contains no data after reading.")
  832. analysis_results[file_name] = {"valid": False, "message": "Empty data after read."}
  833. except ValueError as ve: # Handle case where simFrame might not be present
  834. print(f"ValueError processing file {file_name}: {ve}. Is 'simFrame' column present?")
  835. analysis_results[file_name] = {"valid": False, "message": f"ValueError: {ve}"}
  836. except Exception as e:
  837. print(f"Unexpected error processing file {file_name}: {e}")
  838. analysis_results[file_name] = {"valid": False, "message": f"Unexpected error: {e}"}
  839. return analysis_results
  840. def data_precheck(output_dir: Path, max_allowed_loss_rate: float = 0.20) -> bool:
  841. """Checks data quality, focusing on frame loss rate."""
  842. print(f"--- Running Data Quality Precheck on: {output_dir} ---")
  843. if not output_dir.exists() or not output_dir.is_dir():
  844. print(f"Error: Output directory does not exist: {output_dir}")
  845. return False
  846. try:
  847. frame_loss_results = run_frame_loss_analysis_on_folder(output_dir)
  848. except Exception as e:
  849. print(f"Critical error during frame loss analysis: {e}")
  850. return False # Treat critical error as failure
  851. if not frame_loss_results:
  852. print("Warning: No files were analyzed for frame loss.")
  853. # Decide if this is a failure or just a warning. Let's treat it as OK for now.
  854. return True
  855. all_checks_passed = True
  856. for file_name, metrics in frame_loss_results.items():
  857. if metrics.get("valid", False):
  858. loss_rate = metrics.get("loss_rate", np.nan)
  859. if pd.isna(loss_rate):
  860. print(f" {file_name}: Loss rate could not be calculated.")
  861. # Decide if NaN loss rate is acceptable.
  862. elif loss_rate > max_allowed_loss_rate:
  863. print(
  864. f" FAIL: {file_name} - Frame loss rate ({loss_rate * 100:.2f}%) exceeds threshold ({max_allowed_loss_rate * 100:.1f}%).")
  865. all_checks_passed = False
  866. else:
  867. print(f" PASS: {file_name} - Frame loss rate ({loss_rate * 100:.2f}%) is acceptable.")
  868. else:
  869. print(
  870. f" WARN: {file_name} - Frame loss analysis could not be completed ({metrics.get('message', 'Unknown reason')}).")
  871. # Decide if inability to analyze is a failure. Let's allow it for now.
  872. print(f"--- Data Quality Precheck {'PASSED' if all_checks_passed else 'FAILED'} ---")
  873. return all_checks_passed
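# Usage sketch (directory is hypothetical): ok = data_precheck(Path("out"), max_allowed_loss_rate=0.20)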
  874. # --- Final Preprocessing Step ---
  875. class FinalDataProcessor:
  876. """Merges processed CSVs, adds curvature, and handles traffic lights."""
  877. def __init__(self, config: Config):
  878. self.config = config
  879. self.output_dir = config.output_dir
  880. def find_closet_idx(self, time, df) -> int:
  881. return (df['simTime'] - time).abs().idxmin()
  884. def process(self) -> bool:
  885. """执行最终数据合并和处理步骤。"""
  886. print("--- Starting Final Data Processing ---")
  887. try:
  888. # 1. Load main object state data
  889. obj_state_path = self.output_dir / OUTPUT_CSV_OBJSTATE
  890. lane_map_path = self.output_dir / OUTPUT_CSV_LANEMAP
  891. if not obj_state_path.exists():
  892. print(f"Error: Required input file not found: {obj_state_path}")
  893. return False
  894. # Load and process data
  895. df_object = pd.read_csv(obj_state_path, dtype={"simTime": float}, low_memory=False)
  896. # Process and merge data
  897. df_merged = self._merge_optional_data(df_object)
  898. # Save final merged file directly to output directory
  899. merged_csv_path = self.output_dir / OUTPUT_CSV_MERGED
  900. print(f'merged_csv_path:{merged_csv_path}')
  901. df_merged.to_csv(merged_csv_path, index=False, float_format='%.6f')
  902. print(f"Successfully created final merged file: {merged_csv_path}")
  903. # Clean up intermediate files
  904. if obj_state_path.exists():
  905. obj_state_path.unlink()
  906. print("--- Final Data Processing Finished ---")
  907. return True
  908. except Exception as e:
  909. print(f"An unexpected error occurred during final data processing: {e}")
  910. import traceback
  911. traceback.print_exc()
  912. return False
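# Usage sketch: FinalDataProcessor(cfg).process() merges ObjState.csv with the optional EgoMap/Function/OBU CSVs
# (see _merge_optional_data below) and writes merged_ObjState.csv into the output directory.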
  913. def _merge_optional_data(self, df_object: pd.DataFrame) -> pd.DataFrame:
  914. """加载和合并可选数据"""
  915. df_merged = df_object.copy()
  916. # --- Merge EgoMap ---
  917. egomap_path = self.output_dir / OUTPUT_CSV_EGOMAP
  918. if egomap_path.exists() and egomap_path.stat().st_size > 0:
  919. try:
  920. df_ego = pd.read_csv(egomap_path, dtype={"simTime": float})
  921. # Drop the simFrame column; the main data's simFrame is used instead
  922. if 'simFrame' in df_ego.columns:
  923. df_ego = df_ego.drop(columns=['simFrame'])
  924. # Sort by time and player ID
  925. df_ego.sort_values(['simTime', 'playerId'], inplace=True)
  926. df_merged.sort_values(['simTime', 'playerId'], inplace=True)
  927. # Nearest-time merge using merge_asof (excluding simFrame)
  928. # df_merged = pd.merge_asof(
  929. # df_merged,
  930. # df_ego,
  931. # on='simTime',
  932. # by='playerId',
  933. # direction='nearest',
  934. # tolerance=0.01 # 10ms tolerance
  935. # )
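# Design note: the exact-key merge below requires simTime/playerId values to match exactly;
# the merge_asof variant above would tolerate small time offsets (10 ms tolerance).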
  936. df_merged = pd.merge(
  937. df_merged,
  938. df_ego,
  939. on=["simTime", "playerId"],
  940. how="left",
  941. suffixes=("", "_map"),
  942. )
  943. if {"posX_map", "posY_map", "posH_map"}.issubset(df_merged.columns):
  944. df_merged.drop(columns=["posX_map", "posY_map", "posH_map"], inplace=True)
  945. print("EgoMap data merged.")
  946. except Exception as e:
  947. print(f"Warning: Could not merge EgoMap data from {egomap_path}: {e}")
  948. # --- Merge Function ---
  949. function_path = self.output_dir / OUTPUT_CSV_FUNCTION
  950. if function_path.exists() and function_path.stat().st_size > 0:
  951. try:
  952. df_function = pd.read_csv(function_path, dtype={"timestamp": float}, low_memory=False).drop_duplicates()
  953. # Drop the simFrame column
  954. if 'simFrame' in df_function.columns:
  955. df_function = df_function.drop(columns=['simFrame'])
  956. if 'simTime' in df_function.columns:
  957. df_function['simTime'] = df_function['simTime'].round(2)
  958. df_function['time1'] = df_function['simTime'].apply(lambda x: self.find_closet_idx(x, df_merged))
  959. common_cols = list(set(df_merged.columns) & set(df_function.columns))
  960. df_function.drop(columns=common_cols, inplace=True, errors='ignore')
  961. df_merged = df_merged.merge(df_function, right_on='time1', left_index=True, how='left')
  962. df_merged.drop(columns=['time1'], inplace=True)
  963. df_merged.reset_index(drop=True, inplace=True)
  964. print("Function data merged.")
  965. else:
  966. print("Warning: 'simTime' column not found in Function.csv. Cannot merge.")
  967. except Exception as e:
  968. print(f"Warning: Could not merge Function data from {function_path}: {e}")
  969. else:
  970. print("Function data not found or empty, skipping merge.")
  971. # --- Merge OBU ---
  972. obu_path = self.output_dir / OUTPUT_CSV_OBU
  973. if obu_path.exists() and obu_path.stat().st_size > 0:
  974. try:
  975. df_obu = pd.read_csv(obu_path, dtype={"simTime": float}, low_memory=False).drop_duplicates()
  976. # Drop the simFrame column
  977. if 'simFrame' in df_obu.columns:
  978. df_obu = df_obu.drop(columns=['simFrame'])
  979. df_obu['time2'] = df_obu['simTime'].apply(lambda x: self.find_closet_idx(x, df_merged))
  980. common_cols = list(set(df_merged.columns) & set(df_obu.columns))
  981. df_obu.drop(columns=common_cols, inplace=True, errors='ignore')
  982. df_merged = df_merged.merge(df_obu, right_on='time2', left_index=True, how='left')
  983. df_merged.drop(columns=['time2'], inplace=True)
  984. print("OBU data merged.")
  985. except Exception as e:
  986. print(f"Warning: Could not merge OBU data from {obu_path}: {e}")
  987. else:
  988. print("OBU data not found or empty, skipping merge.")
  989. df_merged[['speedH', 'accelX', 'accelY']] = -df_merged[['speedH', 'accelX', 'accelY']]
  990. return df_merged
    def _process_trafficlight_data(self) -> pd.DataFrame:
        """Processes traffic light JSON data if available."""
        # Check if json_path is provided and exists
        if not self.config.json_path:
            print("No traffic light JSON file provided. Skipping traffic light processing.")
            return pd.DataFrame()
        if not self.config.json_path.exists():
            print("Traffic light JSON file not found. Skipping traffic light processing.")
            return pd.DataFrame()

        print(f"Processing traffic light data from: {self.config.json_path}")
        valid_trafficlights = []
        try:
            with open(self.config.json_path, 'r', encoding='utf-8') as f:
                # Read the whole file, assuming either a JSON array or one JSON object per line
                try:
                    # Attempt to read as a single JSON document
                    raw_data = json.load(f)
                    if not isinstance(raw_data, list):
                        raw_data = [raw_data]  # Handle the case of a single JSON object
                except json.JSONDecodeError:
                    # If that fails, assume one JSON object per line (JSON Lines)
                    f.seek(0)  # Reset file pointer
                    raw_data = [json.loads(line) for line in f if line.strip()]

            for entry in raw_data:
                # Normalize entry if it is a string containing JSON
                if isinstance(entry, str):
                    try:
                        entry = json.loads(entry)
                    except json.JSONDecodeError:
                        print(f"Warning: Skipping invalid JSON string in traffic light data: {entry[:100]}...")
                        continue
                # Safely extract data using .get()
                intersections = entry.get('intersections', [])
                if not isinstance(intersections, list):
                    continue  # Skip if not a list
                for intersection in intersections:
                    if not isinstance(intersection, dict):
                        continue
                    timestamp_ms = intersection.get('intersectionTimestamp', 0)
                    sim_time = round(int(timestamp_ms) / 1000, 2)  # Convert ms to s and round
                    phases = intersection.get('phases', [])
                    if not isinstance(phases, list):
                        continue
                    for phase in phases:
                        if not isinstance(phase, dict):
                            continue
                        phase_id = phase.get('phaseId', 0)
                        phase_states = phase.get('phaseStates', [])
                        if not isinstance(phase_states, list):
                            continue
                        for phase_state in phase_states:
                            if not isinstance(phase_state, dict):
                                continue
                            # Keep only states with startTime == 0, as per the original logic
                            if phase_state.get('startTime') == 0:
                                light_state = phase_state.get('light', 0)  # Extract light state
                                data = {
                                    'simTime': sim_time,
                                    'phaseId': phase_id,
                                    'stateMask': light_state,
                                    # Add playerId for merging - assume it applies to the ego vehicle
                                    'playerId': PLAYER_ID_EGO
                                }
                                valid_trafficlights.append(data)

            if not valid_trafficlights:
                print("No valid traffic light states (with startTime=0) found in JSON.")
                return pd.DataFrame()

            df_trafficlights = pd.DataFrame(valid_trafficlights)
            # Drop duplicates based on the relevant fields
            df_trafficlights.drop_duplicates(subset=['simTime', 'playerId', 'phaseId', 'stateMask'],
                                             keep='first', inplace=True)
            print(f"Processed {len(df_trafficlights)} unique traffic light state entries.")
            return df_trafficlights
        except json.JSONDecodeError as e:
            print(f"Error decoding traffic light JSON file {self.config.json_path}: {e}")
            return pd.DataFrame()
        except Exception as e:
            print(f"Unexpected error processing traffic light data: {e}")
            return pd.DataFrame()
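
    # Illustrative example of the traffic light JSON shape the parser above handles
    # (assumption: field values below are made up); only entries whose phaseStates
    # contain startTime == 0 are kept:
    #
    #   [
    #     {"intersections": [
    #        {"intersectionTimestamp": 1689843075000,
    #         "phases": [
    #            {"phaseId": 2,
    #             "phaseStates": [{"startTime": 0, "light": 3},
    #                             {"startTime": 15, "light": 6}]}
    #         ]}
    #     ]}
    #   ]
    #
    # The single record kept from this example would be
    #   {'simTime': 1689843075.0, 'phaseId': 2, 'stateMask': 3, 'playerId': PLAYER_ID_EGO}.
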
# --- Rosbag Processing ---
class RosbagProcessor:
    """Extracts data from Rosbag files within a ZIP archive."""

    # Mapping from filename parts to rostopics
    ROSTOPIC_MAP = {
        ('V2I', 'HazardousLocationW'): "/HazardousLocationWarning",
        ('V2C', 'OtherVehicleRedLightViolationW'): "/c2v/GoThroughRadLight",
        ('V2I', 'LeftTurnAssist'): "/LeftTurnAssistant",
        ('V2V', 'LeftTurnAssist'): "/V2VLeftTurnAssistant",
        ('V2I', 'RedLightViolationW'): "/SignalViolationWarning",
        ('V2C', 'AbnormalVehicleW'): "/c2v/AbnormalVehicleWarnning",
        ('V2C', 'SignalLightReminder'): "/c2v/TrafficLightInfo",
        ('V2C', 'VulnerableRoadUserCollisionW'): "/c2v/VulnerableObject",
        ('V2C', 'EmergencyVehiclesPriority'): "/c2v/EmergencyVehiclesPriority",
        ('V2C', 'LitterW'): "/c2v/RoadSpillageWarning",
        ('V2V', 'ForwardCollision'): "/V2VForwardCollisionWarning",
        ('V2C', 'VisibilityW'): "/c2v/VisibilityWarinning",
        ('V2V', 'EmergencyBrakeW'): "/V2VEmergencyBrakeWarning",
        ('V2I', 'GreenLightOptimalSpeedAdvisory'): "/GreenLightOptimalSpeedAdvisory",  # Check exact topic name
        ('V2C', 'DynamicSpeedLimitingInformation'): "/c2v/DynamicSpeedLimit",
        ('V2C', 'TrafficJamW'): "/c2v/TrafficJam",
        ('V2C', 'DrivingLaneRecommendation'): "/c2v/LaneGuidance",
        ('V2C', 'RampMerge'): "/c2v/RampMerging",
        ('V2I', 'CooperativeIntersectionPassing'): "/CooperativeIntersectionPassing",
        ('V2I', 'IntersectionCollisionW'): "/IntersectionCollisionWarning",
        ('V2V', 'IntersectionCollisionW'): "/V2VIntersectionCollisionWarning",
        ('V2V', 'BlindSpotW'): "/V2VBlindSpotWarning",
        ('V2I', 'SpeedLimitW'): "/SpeedLimit",
        ('V2I', 'VulnerableRoadUserCollisionW'): "/VulnerableRoadUserCollisionWarning",
        ('V2I', 'CooperativeLaneChange'): "/CooperativeLaneChange",
        ('V2V', 'CooperativeLaneChange'): "/V2VCooperativeLaneChange",
        ('V2I', 'CooperativeVehicleMerge'): "/CooperativeVehicleMerge",
        ('V2V', 'AbnormalVehicleW'): "/V2VAbnormalVehicleWarning",
        ('V2V', 'ControlLossW'): "/V2VVehicleLossControlWarning",
        ('V2V', 'EmergencyVehicleW'): "/V2VEmergencyVehicleWarning",
        ('V2I', 'InVehicleSignage'): "/InVehicleSign",
        ('V2V', 'DoNotPassW'): "/V2VDoNotPassWarning",
        ('V2I', 'TrafficJamW'): "/TrafficJamWarning",
        # Add more mappings as needed
    }
    def __init__(self, config: Config):
        self.config = config
        self.output_dir = config.output_dir

    def _get_target_rostopic(self, zip_filename: str) -> Optional[str]:
        """Determines the target rostopic based on keywords in the filename."""
        for (kw1, kw2), topic in self.ROSTOPIC_MAP.items():
            if kw1 in zip_filename and kw2 in zip_filename:
                print(f"Identified target topic '{topic}' for {zip_filename}")
                return topic
        print(f"Warning: No specific rostopic mapping found for {zip_filename}.")
        return None
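
    # Example (hypothetical filename): a ZIP stem such as "Case07_V2I_RedLightViolationW"
    # contains both 'V2I' and 'RedLightViolationW', so _get_target_rostopic() would return
    # "/SignalViolationWarning". Matching is a plain substring check, so the filename must
    # contain both parts of exactly one key in ROSTOPIC_MAP.
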
    def process_zip_for_rosbags(self) -> None:
        """Finds, extracts, and processes rosbags from the ZIP file."""
        print(f"--- Processing Rosbags in {self.config.zip_path} ---")
        target_rostopic = self._get_target_rostopic(self.config.zip_path.stem)
        if not target_rostopic:
            print("No target rostopic identified; bag conversion will be skipped, but HMI data will still be extracted.")

        with tempfile.TemporaryDirectory() as tmp_dir_str:
            tmp_dir = Path(tmp_dir_str)
            bag_files_extracted = []
            try:
                with zipfile.ZipFile(self.config.zip_path, 'r') as zip_ref:
                    for member in zip_ref.infolist():
                        # Extract Rosbag files into the temporary directory
                        if 'Rosbag/' in member.filename and member.filename.endswith('.bag'):
                            try:
                                extracted_path = Path(zip_ref.extract(member, path=tmp_dir))
                                bag_files_extracted.append(extracted_path)
                                print(f"Extracted Rosbag: {extracted_path.name}")
                            except Exception as e:
                                print(f"Error extracting Rosbag {member.filename}: {e}")
                        # Extract HMIdata CSV files directly to the output directory
                        elif 'HMIdata/' in member.filename and member.filename.endswith('.csv'):
                            try:
                                target_path = self.output_dir / Path(member.filename).name
                                with zip_ref.open(member) as source, open(target_path, "wb") as target:
                                    shutil.copyfileobj(source, target)
                                print(f"Extracted HMI data: {target_path.name}")
                            except Exception as e:
                                print(f"Error extracting HMI data {member.filename}: {e}")
            except zipfile.BadZipFile:
                print(f"Error: Bad ZIP file provided: {self.config.zip_path}")
                return
            except FileNotFoundError:
                print(f"Error: ZIP file not found: {self.config.zip_path}")
                return

            if not bag_files_extracted:
                print("No Rosbag files found in the archive.")
                # HMI/RDB data (if any) was already extracted above.
                return

            # Convert the extracted bag files only if a target topic was identified
            if target_rostopic:
                for bag_path in bag_files_extracted:
                    print(f"Processing bag file: {bag_path.name}")
                    self._convert_bag_topic_to_csv(bag_path, target_rostopic)

        print("--- Rosbag Processing Finished ---")
    def _convert_bag_topic_to_csv(self, bag_file_path: Path, target_topic: str) -> None:
        """Converts a specific topic from a single bag file to CSV."""
        output_csv_path = self.output_dir / OUTPUT_CSV_OBU  # Standard name for OBU data
        try:
            # bagpy expects a string path, so convert the Path object
            bag_reader = bagreader(str(bag_file_path), verbose=False)

            # Check whether the requested topic exists in this bag
            available_topics = (bag_reader.topic_table['Topics'].tolist()
                                if hasattr(bag_reader, 'topic_table') and bag_reader.topic_table is not None
                                else [])
            if target_topic not in available_topics:
                print(f"Target topic '{target_topic}' not found in {bag_file_path.name}. Available: {available_topics}")
                # Write an empty placeholder frame so downstream steps still find the expected columns
                df = pd.DataFrame(columns=['simTime', 'event_Type'])
                # Clean up temporary bagpy-generated files if possible
                if hasattr(bag_reader, 'data_folder') and Path(bag_reader.data_folder).exists():
                    shutil.rmtree(bag_reader.data_folder, ignore_errors=True)
            else:
                # Extract message data to a temporary CSV created by bagpy
                temp_csv_path_str = bag_reader.message_by_topic(target_topic)
                temp_csv_path = Path(temp_csv_path_str)
                if not temp_csv_path.exists() or temp_csv_path.stat().st_size == 0:
                    print(f"Warning: Bagpy generated an empty or non-existent CSV for topic '{target_topic}' from {bag_file_path.name}.")
                    return  # Skip if empty

                # Read the temporary CSV, process, and save to the final location
                df = pd.read_csv(temp_csv_path)
                if df.empty:
                    print(f"Warning: Bagpy CSV for topic '{target_topic}' is empty after reading.")
                    return

                # Clean columns: drop 'Time', rename '*.timestamp' -> 'simTime', '*event_type' -> 'event_Type'
                if 'Time' in df.columns:
                    df.drop(columns=['Time'], inplace=True)
                rename_dict = {}
                for col in df.columns:
                    if col.endswith('.timestamp'):  # More specific match
                        rename_dict[col] = 'simTime'
                    elif col.endswith('event_type'):  # As per original code
                        rename_dict[col] = 'event_Type'
                    # Add other renames if necessary
                df.rename(columns=rename_dict, inplace=True)

                # Ensure simTime is numeric and rounded (needed for time-based merging)
                if 'simTime' in df.columns:
                    df['simTime'] = pd.to_numeric(df['simTime'], errors='coerce').round(2)

            # Save processed data (or the empty placeholder) to the standard OBU CSV
            df.to_csv(output_csv_path, index=False, float_format='%.6f')
            print(f"Saved processed OBU data to: {output_csv_path}")
        except ValueError as ve:
            # Bagpy may raise ValueError internally if the topic contains no messages
            print(f"ValueError processing bag {bag_file_path.name} (Topic: {target_topic}): {ve}. Topic might be empty.")
        except ImportError as ie:
            print(f"ImportError during bag processing: {ie}. Ensure all ROS dependencies are installed if needed by bagpy.")
        except Exception as e:
            print(f"Error processing bag file {bag_file_path.name} (Topic: {target_topic}): {e}")
            import traceback
            traceback.print_exc()  # More details on unexpected errors
        finally:
            # Clean up temporary files/folders created by bagpy
            if 'temp_csv_path' in locals() and temp_csv_path.exists():
                try:
                    temp_csv_path.unlink()  # Delete the per-topic CSV bagpy wrote
                except OSError as ose:
                    print(f"Warning: Could not delete bagpy temp csv {temp_csv_path}: {ose}")
            if 'bag_reader' in locals() and hasattr(bag_reader, 'data_folder'):
                bagpy_folder = Path(bag_reader.data_folder)
                if bagpy_folder.exists() and bagpy_folder.is_dir():
                    try:
                        shutil.rmtree(bagpy_folder, ignore_errors=True)  # Delete the folder bagpy created
                    except OSError as ose:
                        print(f"Warning: Could not delete bagpy temp folder {bagpy_folder}: {ose}")
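
# Note (assumption about the downstream consumer): the CSV written to OUTPUT_CSV_OBU is
# expected to expose at least 'simTime' and 'event_Type' (see the rename map and the empty
# placeholder frame above), which is presumably what the OBU merge step earlier in this
# file reads back in.
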
# --- Utility Functions ---
def get_base_path() -> Path:
    """Gets the base path of the script or executable."""
    if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
        # Running in a PyInstaller bundle
        return Path(sys._MEIPASS)
    else:
        # Running as a normal script
        return Path(__file__).parent.resolve()
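
# Example (hypothetical path): resources bundled alongside the script or the PyInstaller
# executable can be resolved relative to this base path, e.g.
#   default_map = get_base_path() / "maps" / "default.xodr"
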
def run_cpp_engine(config: Config):
    """Runs the external C++ preprocessing engine."""
    if not config.engine_path or not config.map_path:
        print("C++ engine path or map path not configured. Skipping C++ engine execution.")
        return True  # Return True assuming it's optional or handled elsewhere

    engine_cmd = [
        str(config.engine_path),
        str(config.map_path),
        str(config.output_dir),
        str(config.x_offset),
        str(config.y_offset)
    ]
    print("--- Running C++ Preprocessing Engine ---")
    print(f"Command: {' '.join(engine_cmd)}")
    try:
        result = subprocess.run(
            engine_cmd,
            check=True,           # Raise an exception on a non-zero exit code
            capture_output=True,  # Capture stdout/stderr
            text=True,            # Decode output as text
            cwd=config.engine_path.parent  # Run from the engine's directory? Or script's? Adjust if needed.
        )
        print("C++ Engine Output:")
        print(result.stdout)
        if result.stderr:
            print("C++ Engine Error Output:")
            print(result.stderr)
        print("--- C++ Engine Finished Successfully ---")
        return True
    except FileNotFoundError:
        print(f"Error: C++ engine executable not found at {config.engine_path}.")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Error: C++ engine failed with exit code {e.returncode}.")
        print("C++ Engine Output (stdout):")
        print(e.stdout)
        print("C++ Engine Output (stderr):")
        print(e.stderr)
        return False
    except Exception as e:
        print(f"An unexpected error occurred while running the C++ engine: {e}")
        return False
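
# The resulting command line has the form (illustrative values only):
#   /path/to/engine /path/to/map.xodr /path/to/output_dir 0.0 0.0
# i.e. engine_path, map_path, output_dir, x_offset and y_offset passed as positional arguments.
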
if __name__ == "__main__":
    pass
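
# Illustrative wiring of the pieces above (assumption: Config construction and the data
# processing classes are defined earlier in this file; the arguments shown are placeholders):
#
#   config = Config(...)  # zip_path, json_path, output_dir, engine_path, map_path, offsets
#   if run_cpp_engine(config):
#       RosbagProcessor(config).process_zip_for_rosbags()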