- import zipfile
- import sqlite3
- import csv
- import tempfile
- from pathlib import Path
- from typing import List, Dict, Tuple, Optional, Any, NamedTuple
- import cantools
- import os
- import subprocess
- import numpy as np
- import pandas as pd
- from collections import Counter
- from datetime import datetime
- import argparse
- import sys
- from pyproj import Proj
- from bagpy.bagreader import bagreader
- import shutil
- import json
- import warnings
- from dataclasses import dataclass, field
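- # Player IDs used throughout the output CSVs: 1 = ego vehicle, 2 = target/object vehicle.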
- PLAYER_ID_EGO = int(1)
- PLAYER_ID_OBJ = int(2)
- DEFAULT_TYPE = int(1)
- OUTPUT_CSV_OBJSTATE = "ObjState.csv"
- OUTPUT_CSV_TEMP_OBJSTATE = "ObjState_temp_intermediate.csv"
- OUTPUT_CSV_EGOSTATE = "EgoState.csv"
- OUTPUT_CSV_MERGED = "merged_ObjState.csv"
- OUTPUT_CSV_OBU = "OBUdata.csv"
- OUTPUT_CSV_LANEMAP = "LaneMap.csv"
- OUTPUT_CSV_EGOMAP = "EgoMap.csv"
- OUTPUT_CSV_FUNCTION = "Function.csv"
- ROADMARK_CSV = "RoadMark.csv"
- HD_LANE_CSV = "hd_lane.csv"
- HD_LINK_CSV = "hd_link.csv"
- @dataclass
- class Config:
- """Holds configuration paths and settings."""
- zip_path: Path
- output_path: Path
- json_path: Optional[Path]
- dbc_path: Optional[Path] = None
- engine_path: Optional[Path] = None
- map_path: Optional[Path] = None
- utm_zone: int = 51
- x_offset: float = 0.0
- y_offset: float = 0.0
-
- output_dir: Path = field(init=False)
- def __post_init__(self):
-
- self.output_dir = self.output_path
- self.output_dir.mkdir(parents=True, exist_ok=True)
- class ZipCSVProcessor:
- """Processes DB files within a ZIP archive to generate CSV data."""
-
- EGO_COLS_NEW = [
- "simTime", "simFrame", "playerId", "v", "speedX", "speedY",
- "posH", "pitch", "roll", "roll_rate", "pitch_rate", "speedH", "posX", "posY", "accelX", "accelY", "accelZ",
- "travelDist", "composite_v", "relative_dist", "x_relative_dist", "y_relative_dist", "type"
- ]
- OBJ_COLS_OLD_SUFFIXED = [
- "v_obj", "speedX_obj", "speedY_obj", "posH_obj", "pitch_obj", "roll_obj", "roll_rate_obj", "pitch_rate_obj", "speedH_obj",
- "posX_obj", "posY_obj", "accelX_obj", "accelY_obj", "accelZ_obj", "travelDist_obj"
- ]
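- # Maps the "_obj"-suffixed CAN columns onto the unified ego-style column names (EGO_COLS_NEW[3:18]).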
- OBJ_COLS_MAPPING = {old: new for old, new in
- zip(OBJ_COLS_OLD_SUFFIXED, EGO_COLS_NEW[3:18])}
- def __init__(self, config: Config):
- self.config = config
- self.dbc = self._load_dbc(config.dbc_path)
- self.projection = Proj(proj='utm', zone=config.utm_zone, ellps='WGS84', preserve_units='m')
- self._init_table_config()
- self._init_keyword_mapping()
- def _load_dbc(self, dbc_path: Optional[Path]) -> Optional[cantools.db.Database]:
- if not dbc_path or not dbc_path.exists():
- print("DBC path not provided or file not found.")
- return None
- try:
- return cantools.db.load_file(dbc_path)
- except Exception as e:
- print(f"DBC loading failed: {e}")
- return None
- def _init_table_config(self):
- """Initializes configurations for different table types."""
- self.table_config = {
- "gnss_table": self._get_gnss_config(),
- "can_table": self._get_can_config()
- }
- def _get_gnss_config(self):
-
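- # Column mapping for gnss_table; posX/posY are sourced from latitude_dd/longitude_dd and converted to UTM in _process_gnss_table.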
- return {
- "output_columns": self.EGO_COLS_NEW,
- "mapping": {
- "simTime": ("second", "usecond"),
- "simFrame": "ID",
- "v": "speed",
- "speedY": "y_speed",
- "speedX": "x_speed",
- "posH": "yaw",
- "pitch": "tilt",
- "roll": "roll",
- "roll_rate": "roll_rate",
- "pitch_rate": "tilt_rate",
- "speedH": "yaw_rate",
- "posX": "latitude_dd",
- "posY": "longitude_dd",
- "accelX": "x_acceleration",
- "accelY": "y_acceleration",
- "accelZ": "z_acceleration",
- "travelDist": "total_distance",
-
- "composite_v": "speed",
- "relative_dist": "distance",
- "x_relative_dist": "x_distance",
- "y_relative_dist": "y_distance",
- "type": None
- },
- "db_columns": ["ID", "second", "usecond", "speed", "y_speed", "x_speed", "pitch_rate", "z_acceleration",
- "yaw", "tilt", "roll", "yaw_rate", "latitude_dd", "longitude_dd", "roll_rate", "total_distance",
- "x_acceleration", "y_acceleration", "distance", "x_distance", "y_distance"]
- }
- def _get_can_config(self):
-
- return {
- "mapping": {
-
- "v": "VUT_Speed_mps",
- "speedX": "VUT_Speed_x_mps",
- "speedY": "VUT_Speed_y_mps",
- "speedH": "VUT_Yaw_Rate",
- "posX": "VUT_GPS_Latitude",
- "posY": "VUT_GPS_Longitude",
- "posH": "VUT_Heading",
- "pitch": "VUT_Pitch",
- "roll": "VUT_Roll",
- "pitch_rate": None,
- "roll_rate": None,
- "accelX": "VUT_Acc_X",
- "accelY": "VUT_Acc_Y",
- "accelZ": "VUT_Acc_Z",
-
- "v_obj": "Speed_mps",
- "speedX_obj": "UFO_Speed_x_mps",
- "speedY_obj": "UFO_Speed_y_mps",
- "speedH_obj": "Yaw_Rate",
- "posX_obj": "GPS_Latitude",
- "posY_obj": "GPS_Longitude",
- "posH_obj": "Heading",
- "pitch_obj": None,
- "roll_obj": None,
- "pitch_rate_obj": None,
- "roll_rate_obj": None,
- "accelX_obj": "Acc_X",
- "accelY_obj": "Acc_Y",
- "accelZ_obj": "Acc_Z",
-
- "composite_v": "VUT_Rel_speed_long_mps",
- "relative_dist": "VUT_Dist_MRP_Abs",
- "x_relative_dist": "VUT_Dist_MRP_X",
- "y_relative_dist": "VUT_Dist_MRP_Y",
-
- "travelDist": None,
- "travelDist_obj": None
- },
- "db_columns": ["ID", "second", "usecond", "timestamp", "canid", "len", "frame"]
- }
- def _init_keyword_mapping(self):
- """Maps keywords in filenames to table configurations and output CSV names."""
- self.keyword_mapping = {
- "gnss": ("gnss_table", OUTPUT_CSV_OBJSTATE),
-
- "can2": ("can_table", OUTPUT_CSV_OBJSTATE),
- }
- def process_zip(self) -> None:
- """Extracts and processes DB files from the configured ZIP path."""
- print(f"Processing ZIP: {self.config.zip_path}")
- output_dir = self.config.output_dir
- try:
- with zipfile.ZipFile(self.config.zip_path, "r") as zip_ref:
- db_files_to_process = []
- for file_info in zip_ref.infolist():
-
- if 'CANdata/' in file_info.filename and file_info.filename.endswith('.db'):
-
- match = self._match_keyword(file_info.filename)
- if match:
- table_type, csv_name = match
- db_files_to_process.append((file_info, table_type, csv_name))
- if not db_files_to_process:
- print("No relevant DB files found in CANdata/ matching keywords.")
- return
-
- with tempfile.TemporaryDirectory() as tmp_dir_str:
- tmp_dir = Path(tmp_dir_str)
- for file_info, table_type, csv_name in db_files_to_process:
- print(f"Processing DB: {file_info.filename} for table type {table_type}")
- extracted_path = tmp_dir / Path(file_info.filename).name
- try:
-
- with zip_ref.open(file_info.filename) as source, open(extracted_path, "wb") as target:
- shutil.copyfileobj(source, target)
-
- self._process_db_file(extracted_path, output_dir, table_type, csv_name)
- except (sqlite3.Error, pd.errors.EmptyDataError, FileNotFoundError, KeyError) as e:
- print(f"Error processing DB file {file_info.filename}: {e}")
- except Exception as e:
- print(f"Unexpected error processing DB file {file_info.filename}: {e}")
- finally:
- if extracted_path.exists():
- extracted_path.unlink()
- except zipfile.BadZipFile:
- print(f"Error: Bad ZIP file: {self.config.zip_path}")
- except FileNotFoundError:
- print(f"Error: ZIP file not found: {self.config.zip_path}")
- except Exception as e:
- print(f"An error occurred during ZIP processing: {e}")
- def _match_keyword(self, filename: str) -> Optional[Tuple[str, str]]:
- """Finds the first matching keyword configuration for a filename."""
- for keyword, (table_type, csv_name) in self.keyword_mapping.items():
- if keyword in filename:
- return table_type, csv_name
- return None
- def _process_db_file(
- self, db_path: Path, output_dir: Path, table_type: str, csv_name: str
- ) -> None:
- """Connects to SQLite DB and processes the specified table type."""
- output_csv_path = output_dir / csv_name
- try:
-
- conn_str = f"file:{db_path}?mode=ro"
- with sqlite3.connect(conn_str, uri=True) as conn:
- cursor = conn.cursor()
- if not self._check_table_exists(cursor, table_type):
- print(f"Table '{table_type}' does not exist in {db_path.name}. Skipping.")
- return
- if self._check_table_empty(cursor, table_type):
- print(f"Table '{table_type}' in {db_path.name} is empty. Skipping.")
- return
- print(f"Exporting data from table '{table_type}' to {output_csv_path}")
- if table_type == "can_table":
- self._process_can_table_optimized(cursor, output_csv_path)
- elif table_type == "gnss_table":
-
- self._process_gnss_table(cursor, output_csv_path)
- else:
- print(f"Warning: No specific processor for table type '{table_type}'. Skipping.")
- except sqlite3.OperationalError as e:
- print(f"Database operational error for {db_path.name}: {e}. Check file integrity/permissions.")
- except sqlite3.DatabaseError as e:
- print(f"Database error connecting to {db_path.name}: {e}")
- except Exception as e:
- print(f"Unexpected error processing DB {db_path.name}: {e}")
- def _check_table_exists(self, cursor, table_name: str) -> bool:
- """Checks if a table exists in the database."""
- try:
- cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?;", (table_name,))
- return cursor.fetchone() is not None
- except sqlite3.Error as e:
- print(f"Error checking existence of table {table_name}: {e}")
- return False
- def _check_table_empty(self, cursor, table_name: str) -> bool:
- """Checks if a table is empty."""
- try:
- cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
- count = cursor.fetchone()[0]
- return count == 0
- except sqlite3.Error as e:
-
- print(f"Error checking if table {table_name} is empty: {e}")
- return True
- def _process_gnss_table(self, cursor, output_path: Path) -> None:
- """Processes gnss_table data and writes directly to CSV."""
- config = self.table_config["gnss_table"]
- db_columns = config["db_columns"]
- output_columns = config["output_columns"]
- mapping = config["mapping"]
- try:
- cursor.execute(f"SELECT {', '.join(db_columns)} FROM gnss_table")
- rows = cursor.fetchall()
- if not rows:
- print("No data found in gnss_table.")
- return
- processed_data = []
- for row in rows:
- row_dict = dict(zip(db_columns, row))
- record = {}
-
- record["simTime"] = round(row_dict.get("second", 0) + row_dict.get("usecond", 0) / 1e6, 2)
-
- for out_col in output_columns:
- if out_col == "simTime": continue
- if out_col == "playerId":
- record[out_col] = PLAYER_ID_EGO
- continue
- if out_col == "type":
- record[out_col] = DEFAULT_TYPE
- continue
- source_info = mapping.get(out_col)
- if source_info is None:
- record[out_col] = 0.0
- elif isinstance(source_info, tuple):
-
- record[out_col] = 0.0
- else:
- raw_value = row_dict.get(source_info)
- if raw_value is not None:
-
- if out_col == "posX":
-
- lat = row_dict.get(mapping["posX"])
- lon = row_dict.get(mapping["posY"])
- if lat is not None and lon is not None:
- proj_x, _ = self.projection(lon, lat)
- record[out_col] = round(proj_x, 6)
- else:
- record[out_col] = 0.0
- elif out_col == "posY":
-
- lat = row_dict.get(mapping["posX"])
- lon = row_dict.get(mapping["posY"])
- if lat is not None and lon is not None:
- _, proj_y = self.projection(lon, lat)
- record[out_col] = round(proj_y, 6)
- else:
- record[out_col] = 0.0
- elif out_col in ["composite_v", "relative_dist"]:
-
- record[out_col] = round(float(raw_value), 3) if source_info else 0.0
- else:
-
- try:
- record[out_col] = round(float(raw_value), 3)
- except (ValueError, TypeError):
- record[out_col] = raw_value
- else:
- record[out_col] = 0.0
- processed_data.append(record)
- if processed_data:
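- # Downsample by keeping every 4th row, then renumber simFrame sequentially.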
- df_final = pd.DataFrame(processed_data)[output_columns].iloc[::4].reset_index(drop=True)
- df_final['simFrame'] = np.arange(1, len(df_final) + 1)
- df_final.to_csv(output_path, index=False, encoding="utf-8")
- print(f"Successfully wrote GNSS data to {output_path}")
- else:
- print("No processable records found in gnss_table.")
- except sqlite3.Error as e:
- print(f"SQL error during GNSS processing: {e}")
- except Exception as e:
- print(f"Unexpected error during GNSS processing: {e}")
- def _process_can_table_optimized(self, cursor, output_path: Path) -> None:
- """Processes CAN data directly into the final merged DataFrame format."""
- config = self.table_config["can_table"]
- db_columns = config["db_columns"]
- mapping = config["mapping"]
- try:
- cursor.execute(f"SELECT {', '.join(db_columns)} FROM can_table")
- rows = cursor.fetchall()
- if not rows:
- print("No data found in can_table.")
- return
- all_records = []
- for row in rows:
- row_dict = dict(zip(db_columns, row))
-
- decoded_signals = self._decode_can_frame(row_dict)
-
- record = self._create_unified_can_record(row_dict, decoded_signals, mapping)
- if record:
- all_records.append(record)
- if not all_records:
- print("No CAN records could be successfully processed.")
- return
-
- df_raw = pd.DataFrame(all_records)
-
- df_ego = self._extract_vehicle_data(df_raw, PLAYER_ID_EGO)
- df_obj = self._extract_vehicle_data(df_raw, PLAYER_ID_OBJ)
-
- df_ego = self._project_coordinates(df_ego, 'posX', 'posY')
- df_obj = self._project_coordinates(df_obj, 'posX', 'posY')
-
- df_ego['type'] = DEFAULT_TYPE
- df_obj['type'] = DEFAULT_TYPE
-
-
- final_columns = self.EGO_COLS_NEW
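- # Reindex to the unified column order and keep every 4th row, mirroring the GNSS downsampling.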
- df_ego = df_ego.reindex(columns=final_columns).iloc[::4]
- df_obj = df_obj.reindex(columns=final_columns).iloc[::4]
-
- df_ego['simFrame'] = np.arange(1, len(df_ego)+1)
- df_obj['simFrame'] = np.arange(1, len(df_obj)+1)
-
- df_merged = pd.concat([df_ego, df_obj], ignore_index=True)
-
- df_merged.sort_values(by=["simTime", "simFrame", "playerId"], inplace=True)
- df_merged.fillna(0, inplace = True)
- df_merged.reset_index(drop=True, inplace=True)
-
-
-
-
- df_merged.to_csv(output_path, index=False, encoding="utf-8")
- print(f"Successfully processed CAN data and wrote merged output to {output_path}")
- except sqlite3.Error as e:
- print(f"SQL error during CAN processing: {e}")
- except KeyError as e:
- print(f"Key error during CAN processing - mapping issue? Missing key: {e}")
- except Exception as e:
- print(f"Unexpected error during CAN processing: {e}")
- import traceback
- traceback.print_exc()
- def _decode_can_frame(self, row_dict: Dict) -> Dict[str, Any]:
- """Decodes CAN frame using DBC file if available."""
- decoded_signals = {}
- if self.dbc and 'canid' in row_dict and 'frame' in row_dict and 'len' in row_dict:
- can_id = row_dict['canid']
- frame_bytes = bytes(row_dict['frame'][:row_dict['len']])
- try:
- message_def = self.dbc.get_message_by_frame_id(can_id)
- decoded_signals = message_def.decode(frame_bytes, decode_choices=False,
- allow_truncated=True)
- except KeyError:
-
- pass
- except ValueError as e:
- print(
- f"Warning: Decoding ValueError for CAN ID 0x{can_id:X} (length {row_dict['len']}, data: {frame_bytes.hex()}): {e}")
- except Exception as e:
- print(f"Warning: Error decoding CAN ID 0x{can_id:X}: {e}")
- return decoded_signals
- def _create_unified_can_record(self, row_dict: Dict, decoded_signals: Dict, mapping: Dict) -> Optional[
- Dict[str, Any]]:
- """Creates a single record combining DB fields and decoded signals based on mapping."""
- record = {}
- try:
-
- record["simTime"] = round(row_dict.get("second", 0) + row_dict.get("usecond", 0) / 1e6, 2)
- record["simFrame"] = row_dict.get("ID")
- record["canid"] = f"0x{row_dict.get('canid'):X}"
-
- for target_col, source_info in mapping.items():
- if target_col in ["simTime", "simFrame", "canid"]: continue
- if isinstance(source_info, tuple): continue
-
- signal_name = source_info
- if signal_name and signal_name in decoded_signals:
-
- raw_value = decoded_signals[signal_name]
- try:
-
-
- if isinstance(raw_value, (int, float)):
-
- if "Latitude" in target_col or "Longitude" in target_col:
- record[target_col] = float(raw_value)
- else:
- record[target_col] = round(float(raw_value), 6)
- else:
- record[target_col] = raw_value
- except (ValueError, TypeError):
- record[target_col] = raw_value
-
-
- return record
- except Exception as e:
- print(f"Error creating unified record for row {row_dict.get('ID')}: {e}")
- return None
- def _extract_vehicle_data(self, df_raw: pd.DataFrame, player_id: int) -> pd.DataFrame:
- """Extracts and renames columns for a specific vehicle (EGO or OBJ)."""
- df_vehicle = pd.DataFrame()
-
-
-
- df_vehicle_temps_ego = pd.DataFrame()
- df_vehicle_temps_obj = pd.DataFrame()
- if player_id == PLAYER_ID_EGO:
-
- ego_cols = {target: source for target, source in self.table_config['can_table']['mapping'].items()
- if source and not isinstance(source, tuple) and not target.endswith('_obj')}
- rename_map = {}
- select_cols_raw = []
- for target_col, source_info in ego_cols.items():
- if source_info:
- select_cols_raw.append(target_col)
- rename_map[target_col] = target_col
-
- relative_cols = ["composite_v", "relative_dist"]
- select_cols_raw.extend(relative_cols)
- for col in relative_cols:
- rename_map[col] = col
-
- df_vehicle_temp = df_raw[list(set(select_cols_raw) & set(df_raw.columns))]
- for col in df_vehicle_temp.columns:
- df_vehicle_temps_ego[col] = df_vehicle_temp[col].dropna().reset_index(drop=True)
- df_vehicle = pd.concat([df_vehicle, df_vehicle_temps_ego], axis=1)
- elif player_id == PLAYER_ID_OBJ:
-
- obj_cols = {target: source for target, source in self.table_config['can_table']['mapping'].items()
- if source and not isinstance(source, tuple) and target.endswith('_obj')}
- rename_map = {}
- select_cols_raw = []
- for target_col, source_info in obj_cols.items():
- if source_info:
- select_cols_raw.append(target_col)
-
- rename_map[target_col] = self.OBJ_COLS_MAPPING.get(target_col,
- target_col)
-
- df_vehicle_temp = df_raw[list(set(select_cols_raw) & set(df_raw.columns))]
- df_vehicle_temp.rename(columns=rename_map, inplace=True)
- for col in df_vehicle_temp.columns:
- df_vehicle_temps_obj[col] = df_vehicle_temp[col].dropna().reset_index(drop=True)
- df_vehicle = pd.concat([df_vehicle, df_vehicle_temps_obj], axis=1)
-
- if "composite_v" in df_raw.columns:
- df_vehicle["composite_v"] = df_raw["composite_v"].dropna().reset_index(drop=True)
- if "relative_dist" in df_raw.columns:
- df_vehicle["relative_dist"] = df_raw["relative_dist"].dropna().reset_index(drop=True)
-
-
-
-
- try:
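- # NOTE: the +28800 s offset appears to be a fixed UTC+8 shift and the 0.01 s spacing assumes a ~100 Hz sample rate; both are assumptions inferred from the data, not documented requirements.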
- df_vehicle["simTime"] = np.round(np.linspace(df_raw["simTime"].tolist()[0]+28800, df_raw["simTime"].tolist()[0]+28800 + 0.01*(len(df_vehicle)), len(df_vehicle)), 2)
- df_vehicle["simFrame"] = np.arange(1, len(df_vehicle) + 1)
- df_vehicle["playerId"] = int(player_id)
- df_vehicle['playerId'] = pd.to_numeric(df_vehicle['playerId']).astype(int)
- df_vehicle["pitch_rate"] = df_vehicle["pitch"].diff() / df_vehicle["simTime"].diff()
- df_vehicle["roll_rate"] = df_vehicle["roll"].diff() / df_vehicle["simTime"].diff()
- except (ValueError, TypeError) as e:
- print(f"Error aligning vehicle data for playerId {player_id}: {e}")
- except Exception as e:
- print(f"Unexpected error aligning vehicle data for playerId {player_id}: {e}")
- return df_vehicle
- def _project_coordinates(self, df: pd.DataFrame, lat_col: str, lon_col: str) -> pd.DataFrame:
- """Applies UTM projection to latitude and longitude columns."""
- if lat_col in df.columns and lon_col in df.columns:
-
- lat = pd.to_numeric(df[lat_col], errors='coerce')
- lon = pd.to_numeric(df[lon_col], errors='coerce')
- valid_coords = lat.notna() & lon.notna()
- if valid_coords.any():
- x, y = self.projection(lon[valid_coords].values, lat[valid_coords].values)
-
- df.loc[valid_coords, lat_col] = np.round(x, 6)
- df.loc[valid_coords, lon_col] = np.round(y, 6)
- df.loc[~valid_coords, [lat_col, lon_col]] = np.nan
- else:
-
- df[lat_col] = np.nan
- df[lon_col] = np.nan
-
- df.rename(columns={lat_col: 'posX', lon_col: 'posY'}, inplace=True)
- else:
-
- if 'posX' not in df.columns: df['posX'] = np.nan
- if 'posY' not in df.columns: df['posY'] = np.nan
- print(f"Warning: Latitude ('{lat_col}') or Longitude ('{lon_col}') columns not found for projection.")
- return df
- class PolynomialCurvatureFitting:
- """Calculates curvature and its derivative using polynomial fitting."""
- def __init__(self, lane_map_path: Path, degree: int = 3):
- self.lane_map_path = lane_map_path
- self.degree = degree
- self.data = self._load_data()
- if self.data is not None:
- self.points = self.data[["centerLine_x", "centerLine_y"]].values
- self.x_data, self.y_data = self.points[:, 0], self.points[:, 1]
- else:
- self.points = np.empty((0, 2))
- self.x_data, self.y_data = np.array([]), np.array([])
- def _load_data(self) -> Optional[pd.DataFrame]:
- """Loads lane map data safely."""
- if not self.lane_map_path.exists() or self.lane_map_path.stat().st_size == 0:
- print(f"Warning: LaneMap file not found or empty: {self.lane_map_path}")
- return None
- try:
- return pd.read_csv(self.lane_map_path)
- except pd.errors.EmptyDataError:
- print(f"Warning: LaneMap file is empty: {self.lane_map_path}")
- return None
- except Exception as e:
- print(f"Error reading LaneMap file {self.lane_map_path}: {e}")
- return None
- def curvature(self, coefficients: np.ndarray, x: float) -> float:
- """Computes curvature of the polynomial at x."""
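- # kappa(x) = |y''(x)| / (1 + y'(x)**2) ** 1.5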
- if len(coefficients) < 3:
- return 0.0
- first_deriv_coeffs = np.polyder(coefficients)
- second_deriv_coeffs = np.polyder(first_deriv_coeffs)
- dy_dx = np.polyval(first_deriv_coeffs, x)
- d2y_dx2 = np.polyval(second_deriv_coeffs, x)
- denominator = (1 + dy_dx ** 2) ** 1.5
- return np.abs(d2y_dx2) / denominator if denominator != 0 else np.inf
- def curvature_derivative(self, coefficients: np.ndarray, x: float) -> float:
- """Computes the derivative of curvature with respect to x."""
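- # d(kappa)/dx = (y'''*(1 + y'**2) - 3*y'*y''**2) / (1 + y'**2)**2.5; the sign of y'' is not tracked, so this matches curvature() only where y'' >= 0.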
- if len(coefficients) < 4:
- return 0.0
- first_deriv_coeffs = np.polyder(coefficients)
- second_deriv_coeffs = np.polyder(first_deriv_coeffs)
- third_deriv_coeffs = np.polyder(second_deriv_coeffs)
- dy_dx = np.polyval(first_deriv_coeffs, x)
- d2y_dx2 = np.polyval(second_deriv_coeffs, x)
- d3y_dx3 = np.polyval(third_deriv_coeffs, x)
- denominator = (1 + dy_dx ** 2) ** 2.5
- if denominator == 0:
- return np.inf
- term1 = d3y_dx3 * (1 + dy_dx ** 2) ** (3 / 2)
- term2 = d2y_dx2 * (3 / 2) * (1 + dy_dx ** 2) ** (1 / 2) * (2 * dy_dx * d2y_dx2)
- numerator_dk_dx = term1 - term2
- denominator_dk_dx = (1 + dy_dx ** 2) ** 3
- if denominator_dk_dx == 0:
- return np.inf
-
- return numerator_dk_dx / denominator_dk_dx
-
-
-
-
- def polynomial_fit(
- self, x_window: np.ndarray, y_window: np.ndarray
- ) -> Tuple[Optional[np.ndarray], Optional[np.poly1d]]:
- """Performs polynomial fitting, handling potential rank warnings."""
- if len(x_window) <= self.degree:
- print(f"Warning: Window size {len(x_window)} is <= degree {self.degree}. Cannot fit.")
- return None, None
- try:
- # Escalate RankWarning to an error so a rank-deficient fit is actually caught below;
- # by default np.polyfit only emits a warning and returns possibly unstable coefficients.
- with warnings.catch_warnings():
- warnings.simplefilter("error", np.RankWarning)
- coefficients = np.polyfit(x_window, y_window, self.degree)
- return coefficients, np.poly1d(coefficients)
- except np.RankWarning:
- print("Warning: Rank-deficient fit for window. Check data variability.")
-
-
-
-
-
- return None, None
- except Exception as e:
- print(f"Error during polynomial fit: {e}")
- return None, None
- def find_best_window(self, point: Tuple[float, float], window_size: int) -> Optional[int]:
- """Finds the start index of the window whose center is closest to the point."""
- if len(self.x_data) < window_size:
- print("Warning: Not enough data points for the specified window size.")
- return None
- x_point, y_point = point
- min_dist_sq = np.inf
- best_start_index = -1
-
-
- num_windows = len(self.x_data) - window_size + 1
- if num_windows <= 0: return None
- for start in range(num_windows):
- x_center = np.mean(self.x_data[start: start + window_size])
- y_center = np.mean(self.y_data[start: start + window_size])
- dist_sq = (x_point - x_center) ** 2 + (y_point - y_center) ** 2
- if dist_sq < min_dist_sq:
- min_dist_sq = dist_sq
- best_start_index = start
- return best_start_index if best_start_index != -1 else None
- def find_projection(
- self,
- x_target: float,
- y_target: float,
- polynomial: np.poly1d,
- x_range: Tuple[float, float],
- search_points: int = 100,
- ) -> Optional[Tuple[float, float, float]]:
- """Finds the approximate closest point on the polynomial within the x_range."""
- if x_range[1] <= x_range[0]: return None
- x_values = np.linspace(x_range[0], x_range[1], search_points)
- y_values = polynomial(x_values)
- distances_sq = (x_target - x_values) ** 2 + (y_target - y_values) ** 2
- if len(distances_sq) == 0: return None
- min_idx = np.argmin(distances_sq)
- min_distance = np.sqrt(distances_sq[min_idx])
- return x_values[min_idx], y_values[min_idx], min_distance
- def fit_and_project(
- self, points: np.ndarray, window_size: int
- ) -> List[Dict[str, Any]]:
- """Fits polynomial and calculates curvature for each point in the input array."""
- if self.data is None or len(self.x_data) < window_size:
- print("Insufficient LaneMap data for fitting.")
-
- return [
- {
- "projection": (np.nan, np.nan),
- "lightMask": 0,
- "curvHor": np.nan,
- "curvHorDot": np.nan,
- "laneOffset": np.nan,
- }
- ] * len(points)
- results = []
- if points.ndim != 2 or points.shape[1] != 2:
- raise ValueError("Input points must be a 2D numpy array with shape (n, 2).")
- for x_target, y_target in points:
- result = {
- "projection": (np.nan, np.nan),
- "lightMask": 0,
- "curvHor": np.nan,
- "curvHorDot": np.nan,
- "laneOffset": np.nan,
- }
- best_start = self.find_best_window((x_target, y_target), window_size)
- if best_start is None:
- results.append(result)
- continue
- x_window = self.x_data[best_start: best_start + window_size]
- y_window = self.y_data[best_start: best_start + window_size]
- coefficients, polynomial = self.polynomial_fit(x_window, y_window)
- if coefficients is None or polynomial is None:
- results.append(result)
- continue
- x_min, x_max = np.min(x_window), np.max(x_window)
- projection_result = self.find_projection(
- x_target, y_target, polynomial, (x_min, x_max)
- )
- if projection_result is None:
- results.append(result)
- continue
- proj_x, proj_y, min_distance = projection_result
- curv_hor = self.curvature(coefficients, proj_x)
- curv_hor_dot = self.curvature_derivative(coefficients, proj_x)
- result = {
- "projection": (round(proj_x, 6), round(proj_y, 6)),
- "lightMask": 0,
- "curvHor": round(curv_hor, 6),
- "curvHorDot": round(curv_hor_dot, 6),
- "laneOffset": round(min_distance, 6),
- }
- results.append(result)
- return results
- class DataQualityAnalyzer:
- """Analyzes data quality metrics, focusing on frame loss."""
- def __init__(self, df: Optional[pd.DataFrame] = None):
- self.df = df if df is not None and not df.empty else pd.DataFrame()
- def analyze_frame_loss(self) -> Dict[str, Any]:
- """Analyzes frame loss characteristics."""
- metrics = {
- "total_frames_data": 0,
- "unique_frames_count": 0,
- "min_frame": np.nan,
- "max_frame": np.nan,
- "expected_frames": 0,
- "dropped_frames_count": 0,
- "loss_rate": np.nan,
- "max_consecutive_loss": 0,
- "max_loss_start_frame": np.nan,
- "max_loss_end_frame": np.nan,
- "loss_intervals_distribution": {},
- "valid": False,
- "message": ""
- }
- if self.df.empty or 'simFrame' not in self.df.columns:
- metrics["message"] = "DataFrame is empty or 'simFrame' column is missing."
- return metrics
-
- frames_series = self.df['simFrame'].dropna().astype(int)
- metrics["total_frames_data"] = len(frames_series)
- if frames_series.empty:
- metrics["message"] = "No valid 'simFrame' data found after dropping NaN."
- return metrics
- unique_frames = sorted(frames_series.unique())
- metrics["unique_frames_count"] = len(unique_frames)
- if metrics["unique_frames_count"] < 2:
- metrics["message"] = "Less than two unique frames; cannot analyze loss."
- metrics["valid"] = True
- if metrics["unique_frames_count"] == 1:
- metrics["min_frame"] = unique_frames[0]
- metrics["max_frame"] = unique_frames[0]
- metrics["expected_frames"] = 1
- return metrics
- metrics["min_frame"] = unique_frames[0]
- metrics["max_frame"] = unique_frames[-1]
- metrics["expected_frames"] = metrics["max_frame"] - metrics["min_frame"] + 1
-
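- # A gap of d between consecutive unique frame numbers means d - 1 frames were dropped.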
- frame_diffs = np.diff(unique_frames)
-
- gaps = frame_diffs[frame_diffs > 1]
- lost_frames_in_gaps = gaps - 1
- metrics["dropped_frames_count"] = int(lost_frames_in_gaps.sum())
- if metrics["expected_frames"] > 0:
- metrics["loss_rate"] = round(metrics["dropped_frames_count"] / metrics["expected_frames"], 4)
- else:
- metrics["loss_rate"] = 0.0
- if len(lost_frames_in_gaps) > 0:
- metrics["max_consecutive_loss"] = int(lost_frames_in_gaps.max())
-
- max_loss_indices = np.where(frame_diffs == metrics["max_consecutive_loss"] + 1)[0]
-
- max_loss_idx = max_loss_indices[0]
- metrics["max_loss_start_frame"] = unique_frames[max_loss_idx]
- metrics["max_loss_end_frame"] = unique_frames[max_loss_idx + 1]
-
- loss_counts = Counter(lost_frames_in_gaps)
- metrics["loss_intervals_distribution"] = {int(k): int(v) for k, v in loss_counts.items()}
- else:
- metrics["max_consecutive_loss"] = 0
- metrics["valid"] = True
- metrics["message"] = "Frame loss analysis complete."
- return metrics
- def get_all_csv_files(path: Path) -> List[Path]:
- """Gets all CSV files in path, excluding specific ones."""
- excluded_files = {OUTPUT_CSV_LANEMAP, ROADMARK_CSV}
- return [
- file_path
- for file_path in path.rglob("*.csv")
- if file_path.is_file() and file_path.name not in excluded_files
- ]
- def run_frame_loss_analysis_on_folder(path: Path) -> Dict[str, Dict[str, Any]]:
- """Runs frame loss analysis on all relevant CSV files in a folder."""
- analysis_results = {}
- csv_files = get_all_csv_files(path)
- if not csv_files:
- print(f"No relevant CSV files found in {path}")
- return analysis_results
- for file_path in csv_files:
- file_name = file_path.name
- if file_name in {OUTPUT_CSV_FUNCTION, OUTPUT_CSV_OBU}:
- print(f"Skipping frame analysis for: {file_name}")
- continue
- print(f"Analyzing frame loss for: {file_name}")
- if file_path.stat().st_size == 0:
- print(f"File {file_name} is empty. Skipping analysis.")
- analysis_results[file_name] = {"valid": False, "message": "File is empty."}
- continue
- try:
-
- df = pd.read_csv(file_path, usecols=['simFrame'], index_col=False,
- on_bad_lines='warn')
- analyzer = DataQualityAnalyzer(df)
- metrics = analyzer.analyze_frame_loss()
- analysis_results[file_name] = metrics
-
- if metrics["valid"]:
- print(f" Loss Rate: {metrics.get('loss_rate', np.nan) * 100:.2f}%, "
- f"Dropped: {metrics.get('dropped_frames_count', 'N/A')}, "
- f"Max Gap: {metrics.get('max_consecutive_loss', 'N/A')}")
- else:
- print(f" Analysis failed: {metrics.get('message')}")
- except pd.errors.EmptyDataError:
- print(f"File {file_name} contains no data after reading.")
- analysis_results[file_name] = {"valid": False, "message": "Empty data after read."}
- except ValueError as ve:
- print(f"ValueError processing file {file_name}: {ve}. Is 'simFrame' column present?")
- analysis_results[file_name] = {"valid": False, "message": f"ValueError: {ve}"}
- except Exception as e:
- print(f"Unexpected error processing file {file_name}: {e}")
- analysis_results[file_name] = {"valid": False, "message": f"Unexpected error: {e}"}
- return analysis_results
- def data_precheck(output_dir: Path, max_allowed_loss_rate: float = 0.20) -> bool:
- """Checks data quality, focusing on frame loss rate."""
- print(f"--- Running Data Quality Precheck on: {output_dir} ---")
- if not output_dir.exists() or not output_dir.is_dir():
- print(f"Error: Output directory does not exist: {output_dir}")
- return False
- try:
- frame_loss_results = run_frame_loss_analysis_on_folder(output_dir)
- except Exception as e:
- print(f"Critical error during frame loss analysis: {e}")
- return False
- if not frame_loss_results:
- print("Warning: No files were analyzed for frame loss.")
-
- return True
- all_checks_passed = True
- for file_name, metrics in frame_loss_results.items():
- if metrics.get("valid", False):
- loss_rate = metrics.get("loss_rate", np.nan)
- if pd.isna(loss_rate):
- print(f" {file_name}: Loss rate could not be calculated.")
-
- elif loss_rate > max_allowed_loss_rate:
- print(
- f" FAIL: {file_name} - Frame loss rate ({loss_rate * 100:.2f}%) exceeds threshold ({max_allowed_loss_rate * 100:.1f}%).")
- all_checks_passed = False
- else:
- print(f" PASS: {file_name} - Frame loss rate ({loss_rate * 100:.2f}%) is acceptable.")
- else:
- print(
- f" WARN: {file_name} - Frame loss analysis could not be completed ({metrics.get('message', 'Unknown reason')}).")
-
- print(f"--- Data Quality Precheck {'PASSED' if all_checks_passed else 'FAILED'} ---")
- return all_checks_passed
- class FinalDataProcessor:
- """Merges processed CSVs, adds curvature, and handles traffic lights."""
- def __init__(self, config: Config):
- self.config = config
- self.output_dir = config.output_dir
- def process(self) -> bool:
- """Performs the final data merging and processing steps."""
- print("--- Starting Final Data Processing ---")
- try:
-
- obj_state_path = self.output_dir / OUTPUT_CSV_OBJSTATE
- lane_map_path = self.output_dir / OUTPUT_CSV_LANEMAP
- if not obj_state_path.exists():
- print(f"Error: Required input file not found: {obj_state_path}")
- return False
-
- df_traffic = self._process_trafficlight_data()
- if not df_traffic.empty:
- traffic_csv_path = self.output_dir / "Traffic.csv"
- df_traffic.to_csv(traffic_csv_path, index=False, float_format='%.6f')
- print(f"Successfully created traffic light data file: {traffic_csv_path}")
-
- df_object = pd.read_csv(obj_state_path, dtype={"simTime": float}, low_memory=False)
- df_ego = df_object[df_object["playerId"] == 1]
- points = df_ego[["posX", "posY"]].values
- window_size = 4
- fitting_instance = PolynomialCurvatureFitting(lane_map_path)
- result_list = fitting_instance.fit_and_project(points, window_size)
- curvHor_values = [result["curvHor"] for result in result_list]
- curvature_change_value = [result["curvHorDot"] for result in result_list]
- min_distance = [result["laneOffset"] for result in result_list]
- indices = df_object[df_object["playerId"] == 1].index
- if len(indices) == len(curvHor_values):
- df_object.loc[indices, "lightMask"] = 0
- df_object.loc[indices, "curvHor"] = curvHor_values
- df_object.loc[indices, "curvHorDot"] = curvature_change_value
- df_object.loc[indices, "laneOffset"] = min_distance
- else:
- print("Length of the computed values does not match the number of rows with playerId == 1!")
-
- df_merged = self._merge_optional_data(df_object)
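- # Flip the sign of yaw rate and accelerations; assumed to align the data with the output coordinate convention.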
- df_merged[['speedH', 'accelX', 'accelY']] = -df_merged[['speedH', 'accelX', 'accelY']]
-
- merged_csv_path = self.output_dir / OUTPUT_CSV_MERGED
- print(f'merged_csv_path:{merged_csv_path}')
- df_merged.to_csv(merged_csv_path, index=False, float_format='%.6f')
- print(f"Successfully created final merged file: {merged_csv_path}")
-
-
-
- print("--- Final Data Processing Finished ---")
- return True
- except Exception as e:
- print(f"An unexpected error occurred during final data processing: {e}")
- import traceback
- traceback.print_exc()
- return False
- def _merge_optional_data(self, df_object: pd.DataFrame) -> pd.DataFrame:
- """Loads and merges optional data (EgoMap, lane/link maps, Traffic, Function, OBU)."""
- df_merged = df_object.copy()
-
- def clean_duplicate_columns(df):
-
- duplicate_cols = []
- base_cols = {}
-
- print(f"Columns before duplicate-column cleanup: {df.columns.tolist()}")
- for col in df.columns:
- if col.endswith('_x') or col.endswith('_y'):
- base_name = col[:-2]
- if base_name not in base_cols:
- base_cols[base_name] = []
- base_cols[base_name].append(col)
-
- for base_name, cols in base_cols.items():
- if len(cols) > 1:
-
- is_identical = True
- first_col = cols[0]
- for col in cols[1:]:
- if not df[first_col].equals(df[col]):
- is_identical = False
- break
- if is_identical:
-
- df = df.rename(columns={first_col: base_name})
-
- for col in cols[1:]:
- duplicate_cols.append(col)
- print(f"Columns {cols} contain identical data; kept as {base_name}")
- else:
- print(f"Columns {cols} contain different data; keeping all of them")
-
- if base_name == 'simTime' and 'simTime' not in df.columns:
- df = df.rename(columns={cols[0]: 'simTime'})
- print(f"Renamed {cols[0]} to simTime")
-
- for col in cols[1:]:
- duplicate_cols.append(col)
-
- if duplicate_cols:
-
- if 'simTime' not in df.columns and any(col.startswith('simTime_') for col in duplicate_cols):
-
- for col in duplicate_cols[:]:
- if col.startswith('simTime_'):
- df = df.rename(columns={col: 'simTime'})
- duplicate_cols.remove(col)
- print(f"Renamed {col} to simTime")
- break
- df = df.drop(columns=duplicate_cols)
- print(f"Dropped duplicate columns: {duplicate_cols}")
-
- print(f"Columns after duplicate-column cleanup: {df.columns.tolist()}")
- return df
-
- egomap_path = self.output_dir / OUTPUT_CSV_EGOMAP
- if egomap_path.exists() and egomap_path.stat().st_size > 0:
- try:
- df_ego = pd.read_csv(egomap_path, dtype={"simTime": float})
- ego_column = ['posX', 'posY', 'posH']
- ego_new_column = ['posX_map', 'posY_map', 'posH_map']
- df_ego = df_ego.rename(columns = dict(zip(ego_column, ego_new_column)))
-
- if 'simFrame' in df_ego.columns:
- df_ego = df_ego.drop(columns=['simFrame'])
-
- print(f"df_merged columns before merging EgoMap: {df_merged.columns.tolist()}")
- print(f"df_ego columns: {df_ego.columns.tolist()}")
-
- df_ego.sort_values(['simTime', 'playerId'], inplace=True)
- df_merged.sort_values(['simTime', 'playerId'], inplace=True)
-
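- # Nearest-timestamp join per playerId, tolerating up to 0.01 s of clock mismatch between the two sources.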
- df_merged = pd.merge_asof(
- df_merged,
- df_ego,
- on='simTime',
- by='playerId',
- direction='nearest',
- tolerance=0.01
- )
-
- print(f"df_merged columns after merging EgoMap: {df_merged.columns.tolist()}")
-
- if 'simTime' not in df_merged.columns:
- if 'simTime_x' in df_merged.columns:
- df_merged.rename(columns={'simTime_x': 'simTime'}, inplace=True)
- print("Renamed simTime_x to simTime")
- else:
- print("Warning: simTime column not found after merging EgoMap!")
- df_merged = df_merged.drop(columns = ['posX_map', 'posY_map', 'posH_map'])
- print("EgoMap data merged.")
- except Exception as e:
- print(f"Warning: Could not merge EgoMap data from {egomap_path}: {e}")
- import traceback
- traceback.print_exc()
-
- df_merged = clean_duplicate_columns(df_merged)
-
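- # Resolve the packaged _internal/data_map directory four levels above this file (directory layout assumed for the deployed build).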
- current_file_path = os.path.abspath(__file__)
- root_lane_csv_path1 = os.path.dirname(current_file_path)
- root_lane_csv_path2 = os.path.dirname(root_lane_csv_path1)
- root_lane_csv_path3 = os.path.dirname(root_lane_csv_path2)
- root_lane_csv_path4 = os.path.dirname(root_lane_csv_path3)
- lane_path = os.path.join(root_lane_csv_path4, "_internal")
- data_path = os.path.join(lane_path, "data_map")
- lane_csv_path = os.path.join(data_path, "hd_lane.csv")
- road_csv_path = os.path.join(data_path, "hd_link.csv")
- df_lane = pd.read_csv(lane_csv_path)
- column_to_read = ['link_type', 'link_coords']
- df_road = pd.read_csv(road_csv_path, usecols = column_to_read)
- df_road["simFrame"] = np.arange(1, len(df_road) + 1, 1)
-
- df_merged = pd.merge(
- df_merged,
- df_lane,
- on='lane_id',
- how = 'left'
- )
- df_merged = pd.merge(
- df_merged,
- df_road,
- on='simFrame',
- how = 'left'
- )
-
- traffic_path = self.output_dir / "Traffic.csv"
- if traffic_path.exists() and traffic_path.stat().st_size > 0:
- try:
- df_traffic = pd.read_csv(traffic_path, dtype={"simTime": float}, low_memory=False).drop_duplicates()
-
- if 'simFrame' in df_traffic.columns:
- df_traffic = df_traffic.drop(columns=['simFrame'])
-
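- # Map a heading angle in degrees (assumed 0 = north, increasing clockwise) to one of four compass directions.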
- def get_direction_from_heading(heading):
-
- heading = heading % 360
- if heading > 180:
- heading -= 360
-
- if -45 <= heading <= 45:
- return 'N'
- elif 45 < heading <= 135:
- return 'E'
- elif -135 <= heading < -45:
- return 'W'
- else:
- return 'S'
-
- heading_col = 'posH'
- if heading_col not in df_merged.columns:
- if 'posH_x' in df_merged.columns:
- heading_col = 'posH_x'
- print(f"Using {heading_col} instead of posH")
- else:
- print(f"Warning: heading column posH or posH_x not found")
- return df_merged
-
- df_merged['vehicle_direction'] = df_merged[heading_col].apply(get_direction_from_heading)
-
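- # Site-specific mapping from SPaT phase ID to the approach direction it controls.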
- phase_to_direction = {
- 1: 'S',
- 2: 'W',
- 3: 'N',
- 4: 'E',
- 5: 'S',
- 6: 'W',
- 7: 'S',
- 8: 'W',
- 9: 'N',
- 10: 'E',
- 11: 'N',
- 12: 'E',
- 13: 'S',
- 14: 'W',
- 15: 'N',
- 16: 'E'
- }
-
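- # Site-specific mapping from traffic light device ID to the approach direction it faces.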
- trafficlight_to_direction = {
-
- 48100017: 'S',
- 48100038: 'S',
- 48100043: 'S',
- 48100030: 'S',
-
- 48100021: 'W',
- 48100039: 'W',
-
- 48100041: 'E',
- 48100019: 'E',
-
- 48100033: 'N',
- 48100018: 'N',
- 48100022: 'N'
- }
-
- df_traffic['time'] = df_traffic['simTime'].round(2).astype(float)
-
- if 'simTime' not in df_merged.columns:
- print("Warning: simTime column not found in df_merged before merging Traffic data!")
-
- if 'simTime_x' in df_merged.columns:
- df_merged.rename(columns={'simTime_x': 'simTime'}, inplace=True)
- print("Renamed simTime_x to simTime")
- else:
- print("Critical error: no simTime-related column could be found; cannot continue the merge!")
- return df_merged
- df_merged['time'] = df_merged['simTime'].round(2).astype(float)
-
- df_merged = pd.merge(df_merged, df_traffic, on=["time"], how="left")
-
- df_merged = clean_duplicate_columns(df_merged)
-
- trafficlight_col = 'trafficlight_id'
- if trafficlight_col not in df_merged.columns:
- if 'trafficlight_id_x' in df_merged.columns:
- trafficlight_col = 'trafficlight_id_x'
- print(f"Using {trafficlight_col} instead of trafficlight_id")
- else:
- print(f"Warning: traffic light ID column trafficlight_id or trafficlight_id_x not found")
-
- def filter_relevant_traffic_light(row):
- if 'phaseId' not in row or pd.isna(row['phaseId']):
- return np.nan
-
- phase_id = int(row['phaseId']) if not pd.isna(row['phaseId']) else None
- if phase_id is None:
- return np.nan
- phase_direction = phase_to_direction.get(phase_id, None)
-
- if phase_direction == row['vehicle_direction']:
-
- relevant_ids = [tid for tid, direction in trafficlight_to_direction.items()
- if direction == phase_direction]
-
- if trafficlight_col in row and not pd.isna(row[trafficlight_col]) and row[trafficlight_col] in relevant_ids:
- return row[trafficlight_col]
- return np.nan
-
- df_merged['filtered_trafficlight_id'] = df_merged.apply(filter_relevant_traffic_light, axis=1)
-
- print(f"df_merged columns before dropping the time column: {df_merged.columns.tolist()}")
- df_merged.drop(columns=['time'], inplace=True)
- print(f"df_merged columns after dropping the time column: {df_merged.columns.tolist()}")
-
- if 'simTime' not in df_merged.columns:
- if 'simTime_x' in df_merged.columns:
- df_merged.rename(columns={'simTime_x': 'simTime'}, inplace=True)
- print("Renamed simTime_x to simTime")
- else:
- print("Warning: simTime column not found after processing Traffic data!")
- print("Traffic light data merged and filtered.")
- except Exception as e:
- print(f"Warning: Could not merge Traffic data from {traffic_path}: {e}")
- import traceback
- traceback.print_exc()
- else:
- print("Traffic data not found or empty, skipping merge.")
-
- function_path = self.output_dir / OUTPUT_CSV_FUNCTION
- if function_path.exists() and function_path.stat().st_size > 0:
- try:
-
- print(f"Reading Function data: {function_path}")
- df_function = pd.read_csv(function_path, low_memory=False).drop_duplicates()
- print(f"Function data columns: {df_function.columns.tolist()}")
-
- if 'simFrame' in df_function.columns:
- df_function = df_function.drop(columns=['simFrame'])
-
- if 'simTime' in df_function.columns:
-
- try:
- df_function['simTime'] = pd.to_numeric(df_function['simTime'], errors='coerce')
- df_function = df_function.dropna(subset=['simTime'])
- df_function['time'] = df_function['simTime'].round(2)
-
- if 'simTime' in df_merged.columns:
- print(f"dtype of df_merged['simTime']: {df_merged['simTime'].dtype}")
- print(f"First 5 values of df_merged['simTime']: {df_merged['simTime'].head().tolist()}")
- df_merged['time'] = pd.to_numeric(df_merged['simTime'], errors='coerce').round(2)
-
- nan_count = df_merged['time'].isna().sum()
- if nan_count > 0:
- print(f"Warning: {nan_count} NaN values after conversion; dropping those rows")
- df_merged = df_merged.dropna(subset=['time'])
-
- df_function['time'] = df_function['time'].astype(float)
- df_merged['time'] = df_merged['time'].astype(float)
- common_cols = list(set(df_merged.columns) & set(df_function.columns) - {'time'})
- df_function.drop(columns=common_cols, inplace=True, errors='ignore')
-
- df_merged = pd.merge(df_merged, df_function, on=["time"], how="left")
- df_merged.drop(columns=['time'], inplace=True)
- print("Function data merged successfully.")
- else:
- print("Warning: 'simTime' column not found in df_merged; cannot merge Function data.")
-
- print(f"All df_merged columns: {df_merged.columns.tolist()}")
- except Exception as e:
- print(f"Warning: error while processing the simTime column of Function.csv: {e}")
- import traceback
- traceback.print_exc()
- else:
- print(f"Warning: 'simTime' column not found in Function.csv. Available columns: {df_function.columns.tolist()}")
- except Exception as e:
- print(f"Warning: could not merge Function data: {e}")
- import traceback
- traceback.print_exc()
- else:
- print(f"Function data file does not exist or is empty: {function_path}")
-
- obu_path = self.output_dir / OUTPUT_CSV_OBU
- if obu_path.exists() and obu_path.stat().st_size > 0:
- try:
-
- print(f"Reading OBU data: {obu_path}")
- df_obu = pd.read_csv(obu_path, low_memory=False).drop_duplicates()
- print(f"OBU data columns: {df_obu.columns.tolist()}")
-
- if 'simFrame' in df_obu.columns:
- df_obu = df_obu.drop(columns=['simFrame'])
-
- if 'simTime' in df_obu.columns:
-
- try:
- df_obu['simTime'] = pd.to_numeric(df_obu['simTime'], errors='coerce')
- df_obu = df_obu.dropna(subset=['simTime'])
- df_obu['time'] = df_obu['simTime'].round(2)
-
- if 'simTime' in df_merged.columns:
- print(f"dtype of df_merged['simTime'] before the OBU merge: {df_merged['simTime'].dtype}")
- print(f"First 5 values of df_merged['simTime'] before the OBU merge: {df_merged['simTime'].head().tolist()}")
- df_merged['time'] = pd.to_numeric(df_merged['simTime'], errors='coerce').round(2)
-
- nan_count = df_merged['time'].isna().sum()
- if nan_count > 0:
- print(f"Warning: {nan_count} NaN values after conversion; dropping those rows")
- df_merged = df_merged.dropna(subset=['time'])
-
- df_obu['time'] = df_obu['time'].astype(float)
- df_merged['time'] = df_merged['time'].astype(float)
- common_cols = list(set(df_merged.columns) & set(df_obu.columns) - {'time'})
- df_obu.drop(columns=common_cols, inplace=True, errors='ignore')
-
- df_merged = pd.merge(df_merged, df_obu, on=["time"], how="left")
- df_merged.drop(columns=['time'], inplace=True)
- print("OBU data merged successfully.")
- else:
- print("Warning: 'simTime' column not found in df_merged; cannot merge OBU data.")
-
- print(f"All df_merged columns: {df_merged.columns.tolist()}")
- except Exception as e:
- print(f"Warning: error while processing the simTime column of OBUdata.csv: {e}")
- import traceback
- traceback.print_exc()
- else:
- print(f"Warning: 'simTime' column not found in OBUdata.csv. Available columns: {df_obu.columns.tolist()}")
- except Exception as e:
- print(f"Warning: could not merge OBU data: {e}")
- import traceback
- traceback.print_exc()
- else:
- print(f"OBU data file does not exist or is empty: {obu_path}")
-
- df_merged = clean_duplicate_columns(df_merged)
- return df_merged
- def _process_trafficlight_data(self) -> pd.DataFrame:
- """Processes traffic light JSON data if available."""
-
- if not self.config.json_path:
- print("No traffic light JSON file provided. Skipping traffic light processing.")
- return pd.DataFrame()
- if not self.config.json_path.exists():
- print("Traffic light JSON file not found. Skipping traffic light processing.")
- return pd.DataFrame()
- print(f"Processing traffic light data from: {self.config.json_path}")
- valid_trafficlights = []
- try:
- with open(self.config.json_path, 'r', encoding='utf-8') as f:
-
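- # Parse the file as a single JSON document first; fall back to JSON Lines (one object per line) if that fails.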
- try:
-
- raw_data = json.load(f)
- if not isinstance(raw_data, list):
- raw_data = [raw_data]
- except json.JSONDecodeError:
-
- f.seek(0)
- raw_data = [json.loads(line) for line in f if line.strip()]
- for entry in raw_data:
-
- if isinstance(entry, str):
- try:
- entry = json.loads(entry)
- except json.JSONDecodeError:
- print(f"Warning: Skipping invalid JSON string in traffic light data: {entry[:100]}...")
- continue
-
- intersections = entry.get('intersections', [])
- if not isinstance(intersections, list): continue
- for intersection in intersections:
- if not isinstance(intersection, dict): continue
- timestamp_ms = intersection.get('intersectionTimestamp', 0)
- sim_time = round(int(timestamp_ms) / 1000, 2)
- phases = intersection.get('phases', [])
- if not isinstance(phases, list): continue
- for phase in phases:
- if not isinstance(phase, dict): continue
- phase_id = phase.get('phaseId', 0)
- phase_states = phase.get('phaseStates', [])
- if not isinstance(phase_states, list): continue
- for phase_state in phase_states:
- if not isinstance(phase_state, dict): continue
-
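- # Keep only the phase state that is currently active (startTime == 0).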
- if phase_state.get('startTime') == 0:
- light_state = phase_state.get('light', 0)
- data = {
- 'simTime': sim_time,
- 'phaseId': phase_id,
- 'stateMask': light_state,
-
- 'playerId': PLAYER_ID_EGO
- }
- valid_trafficlights.append(data)
- if not valid_trafficlights:
- print("No valid traffic light states (with startTime=0) found in JSON.")
- return pd.DataFrame()
- df_trafficlights = pd.DataFrame(valid_trafficlights)
-
- df_trafficlights.drop_duplicates(subset=['simTime', 'playerId', 'phaseId', 'stateMask'], keep='first',
- inplace=True)
- print(f"Processed {len(df_trafficlights)} unique traffic light state entries.")
-
- df_trafficlights = df_trafficlights.sort_values('simTime', ascending=True)
-
- print(f"Traffic light data time range: {df_trafficlights['simTime'].min()} to {df_trafficlights['simTime'].max()}")
- print(f"First 5 traffic light timestamps: {df_trafficlights['simTime'].head().tolist()}")
- print(f"Last 5 traffic light timestamps: {df_trafficlights['simTime'].tail().tolist()}")
- return df_trafficlights
- except json.JSONDecodeError as e:
- print(f"Error decoding traffic light JSON file {self.config.json_path}: {e}")
- return pd.DataFrame()
- except Exception as e:
- print(f"Unexpected error processing traffic light data: {e}")
- return pd.DataFrame()
- class RosbagProcessor:
- """Extracts data from HMIdata files within a ZIP archive."""
- def __init__(self, config: Config):
- self.config = config
- self.output_dir = config.output_dir
- def process_zip_for_rosbags(self) -> None:
- """Finds, extracts, and processes rosbags from the ZIP file."""
- print(f"--- Processing HMIdata in {self.config.zip_path} ---")
- with tempfile.TemporaryDirectory() as tmp_dir_str:
- try:
- with zipfile.ZipFile(self.config.zip_path, 'r') as zip_ref:
- for member in zip_ref.infolist():
-
- if 'HMIdata/' in member.filename and member.filename.endswith('.csv'):
- try:
- target_path = self.output_dir / Path(member.filename).name
- with zip_ref.open(member) as source, open(target_path, "wb") as target:
- shutil.copyfileobj(source, target)
- print(f"Extracted HMI data: {target_path.name}")
- except Exception as e:
- print(f"Error extracting HMI data {member.filename}: {e}")
- except zipfile.BadZipFile:
- print(f"Error: Bad ZIP file provided: {self.config.zip_path}")
- return
- except FileNotFoundError:
- print(f"Error: ZIP file not found: {self.config.zip_path}")
- return
- print("--- HMIdata Processing Finished ---")
- def get_base_path() -> Path:
- """Gets the base path of the script or executable."""
- if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
-
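- # Running from a PyInstaller bundle: bundled resources are unpacked under sys._MEIPASS.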
- return Path(sys._MEIPASS)
- else:
-
- return Path(__file__).parent.resolve()
- def run_cpp_engine(config: Config):
- """Runs the external C++ preprocessing engine."""
- if not config.engine_path or not config.map_path:
- print("C++ engine path or map path not configured. Skipping C++ engine execution.")
- return True
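- # The engine is invoked as: <engine_path> <map_path> <output_dir> <x_offset> <y_offset>.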
- engine_cmd = [
- str(config.engine_path),
- str(config.map_path),
- str(config.output_dir),
- str(config.x_offset),
- str(config.y_offset)
- ]
- print(f"--- Running C++ Preprocessing Engine ---")
- print(f"Command: {' '.join(engine_cmd)}")
- try:
- result = subprocess.run(
- engine_cmd,
- check=True,
- capture_output=True,
- text=True,
- cwd=config.engine_path.parent
- )
- print("C++ Engine Output:")
- print(result.stdout)
- if result.stderr:
- print("C++ Engine Error Output:")
- print(result.stderr)
- print("--- C++ Engine Finished Successfully ---")
- return True
- except FileNotFoundError:
- print(f"Error: C++ engine executable not found at {config.engine_path}.")
- return False
- except subprocess.CalledProcessError as e:
- print(f"Error: C++ engine failed with exit code {e.returncode}.")
- print("C++ Engine Output (stdout):")
- print(e.stdout)
- print("C++ Engine Output (stderr):")
- print(e.stderr)
- return False
- except Exception as e:
- print(f"An unexpected error occurred while running the C++ engine: {e}")
- return False
- if __name__ == "__main__":
- pass