123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- from pathlib import Path
- from typing import Dict, Any, Optional
- import pandas as pd
- import numpy as np
- from pyproj import Proj
- from dataclasses import dataclass, field
- from typing import Dict, Optional
- from pathlib import Path
- import pandas as pd
- # from core.error_handler import ErrorHandler
- # from core.config_manager import get_config
- import sys
- import csv
- import os
- import zipfile
- import argparse
- from genpy import Message
- import shutil
- import tempfile
- import pandas as pd
- import subprocess
- import pandas as pd
- import numpy as np
@dataclass
class Config:
    """Configuration for the PGVIL processor.

    Path-like fields may be given as ``str`` or ``pathlib.Path``; they are
    normalised to ``Path`` in ``__post_init__``. ``engine_path`` and
    ``map_path`` are optional: when either is missing (``None`` or empty),
    the external C++ engine step is skipped by ``run_pgvil_engine``.
    """

    zip_path: Path
    output_path: Path
    engine_path: Optional[Path] = None
    map_path: Optional[Path] = None
    utm_zone: int = 51  # Example UTM zone
    x_offset: float = 0.0
    y_offset: float = 0.0

    def __post_init__(self):
        """Normalise the path fields and create the output directory."""
        self.zip_path = Path(self.zip_path)
        self.output_path = Path(self.output_path)
        # Only convert values that are actually set: Path("") would become
        # "." (truthy) and defeat the "not configured" check downstream.
        if self.engine_path:
            self.engine_path = Path(self.engine_path)
        if self.map_path:
            self.map_path = Path(self.map_path)
        # Use output_path directly as output_dir to avoid nested directories.
        self.output_dir = self.output_path
        self.output_dir.mkdir(parents=True, exist_ok=True)
def run_pgvil_engine(config: Config):
    """Run the external C++ preprocessing engine as a subprocess.

    The engine is invoked as
    ``<engine_path> <map_path> <output_dir> <x_offset> <y_offset>``.

    Returns:
        True when the engine exits with status 0, or when the engine/map
        path is not configured (the step is then skipped). False when the
        executable is missing, exits non-zero, or any other error occurs.
    """
    engine_configured = bool(config.engine_path) and bool(config.map_path)
    if not engine_configured:
        print("C++ engine path or map path not configured. Skipping C++ engine execution.")
        # Treated as success: the engine step is optional.
        return True

    cli_parts = (
        config.engine_path,
        config.map_path,
        config.output_dir,
        config.x_offset,
        config.y_offset,
    )
    engine_cmd = [str(part) for part in cli_parts]

    print(f"run_pgvil_engine: x={config.x_offset}, y={config.y_offset}")
    print("--- Running C++ Preprocessing Engine ---")
    print(f"Command: {' '.join(engine_cmd)}")

    try:
        # check=True turns a non-zero exit status into CalledProcessError;
        # stdout/stderr are captured and decoded as text.
        completed = subprocess.run(
            engine_cmd,
            check=True,
            capture_output=True,
            text=True,
        )
        print("C++ Engine Output:")
        print(completed.stdout)
        if completed.stderr:
            print("C++ Engine Error Output:")
            print(completed.stderr)
        print("--- C++ Engine Finished Successfully ---")
        return True
    except FileNotFoundError:
        print(f"Error: C++ engine executable not found at {config.engine_path}.")
        return False
    except subprocess.CalledProcessError as failure:
        print(f"Error: C++ engine failed with exit code {failure.returncode}.")
        print("C++ Engine Output (stdout):")
        print(failure.stdout)
        print("C++ Engine Output (stderr):")
        print(failure.stderr)
        return False
    except Exception as unexpected:
        print(f"An unexpected error occurred while running the C++ engine: {unexpected}")
        return False
def remove_conflicting_columns(df_object, df_csv_info):
    """Drop from ``df_csv_info`` every column it shares with ``df_object``.

    The merge keys ``simFrame`` and ``playerId`` are kept. Note that
    ``simTime`` is NOT protected and is removed when present in both frames.

    ``df_csv_info`` is modified in place and also returned.
    """
    merge_keys = ("simFrame", "playerId")
    shared = set(df_object.columns) & set(df_csv_info.columns)
    for column in shared:
        if column in merge_keys:
            continue
        del df_csv_info[column]
    return df_csv_info
def align_simtime_by_simframe(df):
    """Rebuild ``simTime`` on a regular grid and fill missing ``simFrame`` rows.

    Steps:
      1. Take the first ``simTime`` seen for each ``simFrame``.
      2. When more than two frames exist, estimate the time step per frame
         as the most common (rounded to 3 decimals) step among the first
         100 frames and rebuild ``simTime`` as
         ``t0 + step * (frame - first_frame)``. For non-integer frame ids
         the position in the sorted frame list is used instead of the
         frame number. With two frames or fewer the original first-seen
         times are kept.
      3. When every ``simFrame`` value is an integer, insert one row for
         every frame number missing between the minimum and maximum frame.
         Numeric columns are the mean of the first row of the previous and
         next present frame; other columns carry the previous row's value;
         ``simTime`` is interpolated from the neighbouring frames.

    Args:
        df: DataFrame with at least ``simFrame`` and ``simTime`` columns.
            Its ``simTime`` column is overwritten in place.

    Returns:
        ``df`` itself when it is empty or its frames are not integers;
        otherwise a new DataFrame sorted by ``simFrame`` with a fresh index.
    """
    # Map every simFrame to a representative (first seen) simTime.
    sim_frame_to_time_map = df.groupby('simFrame')['simTime'].first().to_dict()
    frames_sorted = sorted(sim_frame_to_time_map.keys())
    times_sorted = [sim_frame_to_time_map[f] for f in frames_sorted]

    # Gap filling and frame-number based timing only make sense for integer ids.
    frames_are_int = all(isinstance(f, (int, np.integer)) for f in df['simFrame'])

    times_head = times_sorted[:100]
    if len(times_head) > 2:
        steps = np.diff(times_head)
        if frames_are_int:
            # Normalise by the frame distance so a gap in the recording does
            # not masquerade as a larger time step.
            steps = steps / np.diff(np.asarray(frames_sorted[:100], dtype=float))
        values, counts = np.unique(np.round(steps, 3), return_counts=True)
        mode_dt = values[np.argmax(counts)]
        first_frame, first_time = frames_sorted[0], times_sorted[0]
        if frames_are_int:
            new_frame_to_time_map = {
                frame: round(first_time + mode_dt * (frame - first_frame), 3)
                for frame in frames_sorted
            }
        else:
            new_frame_to_time_map = {
                frame: round(first_time + mode_dt * i, 3)
                for i, frame in enumerate(frames_sorted)
            }
    else:
        new_frame_to_time_map = sim_frame_to_time_map

    # Overwrite simTime with the aligned values.
    df['simTime'] = df['simFrame'].map(new_frame_to_time_map)

    if df['simFrame'].empty or not frames_are_int:
        return df

    present = np.asarray(frames_sorted)
    missing = np.setdiff1d(np.arange(present[0], present[-1] + 1), present)
    if missing.size:
        present_times = np.asarray(
            [new_frame_to_time_map[f] for f in frames_sorted], dtype=float
        )
        # A missing frame is by definition absent from the map, so its time
        # is interpolated from the neighbouring present frames.
        fill_times = np.round(np.interp(missing, present, present_times), 3)
        # First row of every present frame, looked up by frame number.
        # NOTE(review): with several rows per frame (e.g. several players)
        # only the first row of each neighbour is averaged, as before.
        first_rows = df.drop_duplicates(subset='simFrame').set_index('simFrame')
        next_positions = np.searchsorted(present, missing)

        is_number = pd.api.types.is_number
        new_rows = []
        for frame, fill_time, pos in zip(missing, fill_times, next_positions):
            prev_row = first_rows.loc[present[pos - 1]]
            next_row = first_rows.loc[present[pos]]
            new_row = {}
            for col in df.columns:
                if col == 'simFrame':
                    new_row[col] = frame
                elif col == 'simTime':
                    new_row[col] = fill_time
                else:
                    before, after = prev_row[col], next_row[col]
                    if is_number(before) and is_number(after):
                        new_row[col] = (before + after) / 2
                    else:
                        # Non-numeric values cannot be averaged; carry forward.
                        new_row[col] = before
            new_rows.append(new_row)

        df = pd.concat(
            [df, pd.DataFrame(new_rows, columns=df.columns)], ignore_index=True
        )

    # Stable sort keeps the original row order inside each frame.
    return df.sort_values(by='simFrame', kind='stable').reset_index(drop=True)
def mergecopy_by_simtime(merged_df, external_df, ignore_cols, prefix=None):
    """Copy columns of ``external_df`` onto ``merged_df`` rows by ``simTime``.

    For every column of ``external_df`` not listed in ``ignore_cols``, a
    lookup ``nearest_simTime -> value`` is built and applied to
    ``merged_df['simTime']``; all rows sharing that ``simTime`` receive the
    value, the rest get NaN. The new column is named ``{prefix}_{col}``
    when ``prefix`` is truthy, otherwise ``col``.

    ``merged_df`` is modified in place and also returned.
    """
    for source_col in external_df.columns:
        if source_col in ignore_cols:
            continue
        target_col = source_col if not prefix else f"{prefix}_{source_col}"
        by_time = external_df.set_index('nearest_simTime')[source_col]
        merged_df[target_col] = merged_df['simTime'].map(by_time.to_dict())
    return merged_df
def read_csv_with_filled_columns(file_path):
    """Best-effort CSV reader that never raises.

    Malformed rows are skipped (``on_bad_lines='skip'``) and missing cells
    are left as NaN, which is what ``pandas.read_csv`` produces by default.

    Args:
        file_path: Path to the CSV file, as ``str`` or any ``os.PathLike``
            (e.g. ``pathlib.Path``).

    Returns:
        The parsed DataFrame, or an empty DataFrame when the path is
        invalid, the file does not exist, or reading fails. The reason is
        printed so processing can continue with the remaining files.
    """
    try:
        if not isinstance(file_path, (str, os.PathLike)):
            raise ValueError("提供的文件路径无效")
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件 {file_path} 不存在")
        return pd.read_csv(file_path, on_bad_lines='skip')
    except Exception as e:
        # Deliberate catch-all: a single unreadable file must not abort
        # the whole merge, callers treat an empty frame as "no data".
        print(f"读取 CSV 文件 {file_path} 时发生错误: {str(e)}")
        return pd.DataFrame()
def convert_heading(posH_rad):
    """Convert a heading in radians to degrees, clockwise from north.

    The input is measured counter-clockwise with east as 0; the result is
    ``(90 - degrees) % 360``, rounded to 3 decimals.
    """
    clockwise_from_north = (90 - np.degrees(posH_rad)) % 360
    return round(clockwise_from_north, 3)
- # def find_closest_time(sim_time, sim_time_to_index, tolerance=0.01):
- # # 找到最接近的时间点,并且该时间点的差异小于 tolerance
- # closest_time = min(sim_time_to_index.keys(), key=lambda y: abs(y - sim_time) if abs(y - sim_time) < tolerance else float('inf'))
- # return closest_time
def find_closest_time(sim_time, sim_time_to_index, tolerance=0.04):
    """Return the key of ``sim_time_to_index`` closest to ``sim_time``.

    Keys within ``tolerance`` of ``sim_time`` are preferred; when none
    qualifies, the overall nearest key is returned anyway. Ties go to the
    key that comes first in the mapping's iteration order.
    """
    distance = {
        candidate: abs(candidate - sim_time)
        for candidate in sim_time_to_index.keys()
    }
    # Prefer candidates inside the tolerance window, else fall back to all.
    close_enough = {
        candidate: gap for candidate, gap in distance.items() if gap <= tolerance
    }
    pool = close_enough if close_enough else distance
    return min(pool, key=pool.get)
def convert_numeric_columns(df):
    """Normalise numeric column dtypes: known id/flag columns to int, rest to float.

    Columns named in ``int_columns`` are cast to ``int`` when all of their
    values are finite. A column containing NaN or inf cannot be represented
    as a plain integer, so it is left as float instead of raising. All other
    numeric columns are cast to float; non-numeric columns are untouched.

    ``df`` is modified in place and also returned.
    """
    # Columns that must stay integer; everything else numeric becomes float.
    int_columns = {"simFrame", "playerId", "type", "stateMask", "ctrlId", "ifwarning"}
    for col in df.select_dtypes(include=['number']).columns:
        as_float = df[col].astype(float)
        if col in int_columns and np.isfinite(as_float).all():
            df[col] = df[col].astype(int)
        else:
            df[col] = as_float
    return df
def safe_convert_numeric(df, name):
    """Apply ``convert_numeric_columns`` unless ``df`` is None or empty.

    ``name`` is accepted for the caller's labelling and is not used here.
    A None or empty ``df`` is returned unchanged.
    """
    has_rows = df is not None and not df.empty
    return convert_numeric_columns(df) if has_rows else df
def safe_align_simtime(df, name):
    """Apply ``align_simtime_by_simframe`` unless ``df`` is None or empty.

    ``name`` is accepted for the caller's labelling and is not used here.
    A None or empty ``df`` is returned unchanged.
    """
    has_rows = df is not None and not df.empty
    return align_simtime_by_simframe(df) if has_rows else df
class PGVILProcessor:
    """PGVIL data processor: extracts the recorded CSVs and merges them.

    Typical use is ``process_zip()`` followed by ``merge_csv_files()``,
    which writes ``merged_ObjState.csv`` into ``config.output_dir``.
    """

    # CSV files (without extension) that carry simFrame/simTime and go
    # through time alignment; the order is the order in which they are read.
    _TIMELINE_TABLES = (
        "ObjState",
        "EgoMap",
        "LaneMap",
        "LaneInfo",
        "RoadPos",
        "VehicleSystems",
        "TrafficLight",
    )
    _MERGE_KEYS = ["simFrame", "playerId"]

    def __init__(self, config: Config):
        self.config = config

    def process_zip(self) -> Path:
        """Extract the CSV files of the input ZIP into the output directory.

        Every ``.csv`` member whose archive path contains ``HMIdata/`` or
        ``RDBdata/`` is written flat (file name only) into
        ``config.output_dir``; members with the same file name overwrite
        each other.

        Returns:
            The output directory, which is also stored back on the config.
        """
        print(f"Processing ZIP: {self.config.zip_path}")
        zip_path = self.config.zip_path
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for name in zip_ref.namelist():
                if ('HMIdata/' in name or 'RDBdata/' in name) and name.endswith('.csv'):
                    # Only the last path component is kept as file name.
                    dst_path = output_dir / os.path.basename(name)
                    # Both streams are closed even if the copy fails.
                    with zip_ref.open(name) as src, open(dst_path, 'wb') as dst_file:
                        shutil.copyfileobj(src, dst_file)

        self.config.output_dir = output_dir
        return output_dir

    def merge_csv_files(self):
        """Merge the extracted CSV files into ``merged_ObjState.csv``.

        ObjState (with the configured x/y offsets applied to posX/posY) is
        inner-joined with RoadPos on simFrame/playerId, then enriched with
        traffic-light state (ego rows only), LaneInfo, EgoMap, LaneMap,
        VehicleSystems and, when present, Function.csv matched by nearest
        simTime. Angle columns are converted before the result is written.

        Returns:
            Path of the written ``merged_ObjState.csv``.

        Raises:
            ValueError: when ObjState.csv or RoadPos.csv yields no data, so
                there is nothing to merge.
        """
        x_offset = self.config.x_offset
        y_offset = self.config.y_offset
        data_path = self.config.output_dir

        tables = {
            stem: read_csv_with_filled_columns(os.path.join(data_path, f"{stem}.csv"))
            for stem in self._TIMELINE_TABLES
        }
        # Function.csv is optional and is not time-aligned by frame.
        df_function = None
        function_path = os.path.join(data_path, "Function.csv")
        if os.path.exists(function_path):
            df_function = read_csv_with_filled_columns(function_path)

        # Apply the configured offset to the object positions.
        obj_state = tables["ObjState"]
        if obj_state is not None and not obj_state.empty:
            obj_state['posX'] += x_offset
            obj_state['posY'] += y_offset

        # Align simTime with simFrame, then normalise numeric dtypes.
        tables = {stem: safe_align_simtime(t, stem) for stem, t in tables.items()}
        tables = {stem: safe_convert_numeric(t, stem) for stem, t in tables.items()}
        if df_function is not None:
            df_function = safe_convert_numeric(df_function, "df_function")

        df_object = tables["ObjState"]
        df_road_pos = tables["RoadPos"]
        if (df_object is None or df_object.empty
                or df_road_pos is None or df_road_pos.empty):
            raise ValueError(
                f"ObjState.csv and RoadPos.csv must both contain data in "
                f"{data_path}; cannot build merged_ObjState.csv"
            )

        # Base table: ObjState joined with RoadPos on simFrame/playerId.
        df_road_pos = remove_conflicting_columns(df_object, df_road_pos)
        merged_df = df_object.merge(df_road_pos, on=self._MERGE_KEYS, how="inner")

        # simTime -> row index of the base table (last row wins per time);
        # used to match Function.csv rows to the nearest simTime.
        sim_time_to_index = dict(zip(merged_df['simTime'], merged_df.index))

        merged_df = self._attach_traffic_light(merged_df, tables["TrafficLight"])

        merged_df = self._merge_by_frame_and_player(merged_df, tables["LaneInfo"])
        # EgoMap is the one merge that was never de-duplicated; kept as is.
        merged_df = self._merge_by_frame_and_player(
            merged_df, tables["EgoMap"], drop_duplicates=False
        )
        merged_df = self._merge_by_frame_and_player(merged_df, tables["LaneMap"])
        merged_df = self._merge_by_frame_and_player(merged_df, tables["VehicleSystems"])

        if df_function is not None and not df_function.empty:
            merged_df = self._attach_function_data(
                merged_df, df_function, sim_time_to_index
            )

        merged_df = self._convert_angles(merged_df)

        # infer_objects first so column dtypes are consistent.
        merged_df = merged_df.infer_objects()
        merged_df = merged_df.fillna(np.nan)  # make sure empty values are NaN
        merged_df = merged_df.sort_values(
            by=["simTime", "simFrame", "playerId"]
        ).reset_index(drop=True)

        merged_csv_path = Path(data_path) / "merged_ObjState.csv"
        merged_df.to_csv(merged_csv_path, index=False)
        return merged_csv_path

    @staticmethod
    def _attach_traffic_light(merged_df, df_trafficlight):
        """Add stateMask/ctrlId of traffic light ctrlId == 3 to the ego rows.

        Rows with playerId == 1 are treated as ego. They are left-joined
        with the traffic-light rows on simTime/simFrame, or get NaN columns
        when no such traffic-light data exists. Ego rows come first in the
        returned frame, followed by all other players.
        """
        ego_df = merged_df[merged_df["playerId"] == 1].copy()
        other_df = merged_df[merged_df["playerId"] != 1].copy()

        lights = None
        if df_trafficlight is not None and not df_trafficlight.empty:
            lights = df_trafficlight[df_trafficlight["ctrlId"] == 3][
                ["simTime", "simFrame", "stateMask", "ctrlId"]
            ].copy()
            lights = lights.drop_duplicates(
                subset=["simTime", "simFrame", "ctrlId"]
            ).reset_index(drop=True)

        if lights is None or lights.empty:
            ego_df["stateMask"] = np.nan
            ego_df["ctrlId"] = np.nan
        else:
            ego_df = pd.merge(ego_df, lights, on=["simTime", "simFrame"], how="left")

        return pd.concat([ego_df, other_df], ignore_index=True)

    @classmethod
    def _merge_by_frame_and_player(cls, merged_df, extra_df, drop_duplicates=True):
        """Left-join ``extra_df`` onto ``merged_df`` by simFrame/playerId.

        Columns already present in ``merged_df`` are removed from
        ``extra_df`` first. A None or empty ``extra_df`` is a no-op.
        """
        if extra_df is None or extra_df.empty:
            return merged_df
        extra_df = remove_conflicting_columns(merged_df, extra_df)
        result = pd.merge(merged_df, extra_df, on=cls._MERGE_KEYS, how="left")
        return result.drop_duplicates() if drop_duplicates else result

    @staticmethod
    def _attach_function_data(merged_df, df_function, sim_time_to_index):
        """Copy Function.csv columns onto the rows with the nearest simTime."""
        tolerance = 0.01
        df_function = df_function.sort_values(by='simTime').reset_index(drop=True)
        # Nearest base-table simTime for every Function.csv row.
        df_function['nearest_simTime'] = df_function['simTime'].apply(
            lambda x: find_closest_time(x, sim_time_to_index, tolerance))
        df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)
        # Keep the function's own time under a separate name and drop rows
        # that could not be matched to a base-table row.
        df_function_renamed = df_function.rename(columns={'simTime': 'function_simTime'})
        df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
        df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
        ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
        return mergecopy_by_simtime(merged_df, df_function_valid, ignore_cols)

    @staticmethod
    def _convert_angles(merged_df):
        """Convert heading and angular columns from radians to degrees.

        posH goes through ``convert_heading``; speedH/accelH are scaled
        from rad/s to deg/s; posP and posR are renamed to pitch_rate and
        roll_rate and also passed through ``convert_heading``.
        """
        if 'posH' in merged_df.columns:
            merged_df['posH'] = merged_df['posH'].apply(convert_heading)

        rad_to_deg = 180 / np.pi
        for col in ['speedH', 'accelH']:
            if col in merged_df.columns:
                merged_df[col] = merged_df[col] * rad_to_deg

        # NOTE(review): posP/posR are renamed to *_rate and run through the
        # heading conversion exactly as before; confirm this is intended.
        for source, target in (('posP', 'pitch_rate'), ('posR', 'roll_rate')):
            if source in merged_df.columns:
                merged_df.rename(columns={source: target}, inplace=True)
                merged_df[target] = merged_df[target].apply(convert_heading)
        return merged_df
|