@@ -16,12 +16,13 @@ import zipfile
 import argparse
 from genpy import Message
 import shutil
-import tempfile
+import tempfile
 import pandas as pd
 import subprocess
 import pandas as pd
 import numpy as np
 
+
 @dataclass
 class Config:
     """Configuration class for the PGVIL processor"""
@@ -39,6 +40,7 @@ class Config:
         self.output_dir = self.output_path
         self.output_dir.mkdir(parents=True, exist_ok=True)
 
+
 def run_pgvil_engine(config: Config):
     """Runs the external C++ preprocessing engine."""
     if not config.engine_path or not config.map_path:
@@ -52,6 +54,7 @@ def run_pgvil_engine(config: Config):
         str(config.x_offset),
         str(config.y_offset)
     ]
+    print(f"run_pgvil_engine: x={config.x_offset}, y={config.y_offset}")
 
     print(f"--- Running C++ Preprocessing Engine ---")
     print(f"Command: {' '.join(engine_cmd)}")
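The added print only logs the offsets; the command's actual execution lies below this hunk and is not shown. As a rough sketch of the usual pattern (run_engine_command and its error handling are illustrative assumptions, not code from this patch):

import subprocess

def run_engine_command(engine_cmd):
    # Hypothetical helper: run the external engine, capturing output so a
    # failure can be reported with its stderr rather than a bare exit code.
    result = subprocess.run(engine_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"engine failed ({result.returncode}): {result.stderr}")
    return result.stdout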
@@ -100,7 +103,7 @@ def remove_conflicting_columns(df_object, df_csv_info):
 def align_simtime_by_simframe(df):
     # Build a mapping from each simFrame to its representative simTime value
     sim_frame_to_time_map = df.groupby('simFrame')['simTime'].first().to_dict()
-
+
     # Use the mapping to update the simTime values in the DataFrame
     df['simTime'] = df['simFrame'].map(sim_frame_to_time_map)
     # Check whether the simFrame column is empty or contains non-integer data
@@ -115,28 +118,29 @@ def align_simtime_by_simframe(df):
     for missing_frame in missing_frames:
         prev_frame = df[df['simFrame'] < missing_frame]['simFrame'].max()
         next_frame = df[df['simFrame'] > missing_frame]['simFrame'].min()
-
+
         if prev_frame is not None and next_frame is not None:
             prev_row = df[df['simFrame'] == prev_frame].iloc[0]
             next_row = df[df['simFrame'] == next_frame].iloc[0]
-
+
             # Compute column-wise averages and create a new row
             new_row = prev_row.copy()
             new_row['simFrame'] = missing_frame
             for col in df.columns:
                 if col not in ['simTime', 'simFrame']:
                     new_row[col] = (prev_row[col] + next_row[col]) / 2
-
+
             # Update the simTime value
             new_row['simTime'] = sim_frame_to_time_map.get(missing_frame, np.nan)
-
+
             # Append the new row to the DataFrame
             new_rows.append(new_row)
     if new_rows:
         df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)
     return df.sort_values(by='simFrame').reset_index(drop=True)
 
-def mergecopy_by_simtime(merged_df, external_df,ignore_cols,prefix=None,):
+
+def mergecopy_by_simtime(merged_df, external_df, ignore_cols, prefix=None, ):
     """
     Merge every field of external_df, matched on the nearest simTime, into all rows of merged_df that share that simTime
     """
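For reference, a toy run of align_simtime_by_simframe as patched above (the input values are invented, and the missing-frame detection happens earlier in the function, outside this hunk):

import pandas as pd

df = pd.DataFrame({
    'simFrame': [1, 3],      # frame 2 is missing
    'simTime': [0.01, 0.03],
    'posX': [0.0, 2.0],
})
aligned = align_simtime_by_simframe(df)
# Frame 2 is synthesized by averaging frames 1 and 3 column-wise (posX -> 1.0);
# its simTime falls back to NaN because frame 2 never had a representative time.
print(aligned)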
@@ -147,6 +151,7 @@ def mergecopy_by_simtime(merged_df, external_df,ignore_cols,prefix=None,):
         merged_df[col_name] = merged_df['simTime'].map(mapping)
     return merged_df
 
+
 def read_csv_with_filled_columns(file_path):
     try:
         # Make sure file_path is a valid string
@@ -154,10 +159,10 @@ def read_csv_with_filled_columns(file_path):
             raise ValueError("Invalid file path provided")
         if not os.path.exists(file_path):
             raise FileNotFoundError(f"File {file_path} does not exist")
-
+
         # Use on_bad_lines='skip' to skip malformed lines
         df = pd.read_csv(file_path, on_bad_lines='skip')  # skip malformed lines
-
+
         # Force-fill missing columns with NaN so the column count stays consistent
         if not df.empty:  # make sure df is not empty
             df.fillna(np.nan, inplace=True)  # fill all empty values with NaN
@@ -166,17 +171,18 @@ def read_csv_with_filled_columns(file_path):
         print(f"Error while reading CSV file {file_path}: {str(e)}")
         return pd.DataFrame()  # return an empty DataFrame so processing can continue
 
+
 def convert_heading(posH_rad):
     # Convert radians to degrees
     angle_deg = np.degrees(posH_rad)
     # Counterclockwise with east = 0  =>  clockwise with north = 0, i.e. new_angle = (90 - angle_deg) % 360
     heading_deg = (90 - angle_deg) % 360
-    return round(heading_deg,3)
+    return round(heading_deg, 3)
 
 
 class PGVILProcessor:
     """PGVIL data processor implementing PGVIL-specific processing logic"""
-
+
     def __init__(self, config: Config):
         self.config = config
 
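Spot checks for convert_heading, which follow directly from (90 - degrees(x)) % 360 (the sample angles are illustrative):

import numpy as np

print(convert_heading(0.0))         # east  -> 90.0
print(convert_heading(np.pi / 2))   # north -> 0.0
print(convert_heading(np.pi))       # west  -> 270.0
print(convert_heading(-np.pi / 2))  # south -> 180.0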
@@ -187,7 +193,7 @@ class PGVILProcessor:
         """
         print(f"Processing ZIP: {self.config.zip_path}")
         zip_path = self.config.zip_path
-        output_dir = Path(self.config.output_dir)#convert the directory path into a Path object
+        output_dir = Path(self.config.output_dir)  # convert the directory path into a Path object
 
         # Create an extraction directory named after the ZIP
         zip_name = Path(zip_path).stem
@@ -208,7 +214,6 @@ class PGVILProcessor:
         self.config.output_dir = output_dir
         return output_dir
 
-
     def merge_csv_files(self):
         x_offset = self.config.x_offset
         y_offset = self.config.y_offset
@@ -216,7 +221,7 @@ class PGVILProcessor:
         # X_OFFSET = 258109.4239876
         # Y_OFFSET = 4149969.964821
 
-        # Define the CSV file paths
+        # Define the CSV file paths
         try:
             obj_state_path = os.path.join(data_path, "ObjState.csv")
             ego_map_path = os.path.join(data_path, "EgoMap.csv")
@@ -228,15 +233,17 @@ class PGVILProcessor:
             function_path = os.path.join(data_path, "Function.csv")
         except FileNotFoundError:
             raise Exception("File not found")
-
-        df_object = read_csv_with_filled_columns(obj_state_path)
-        df_map_info = read_csv_with_filled_columns(ego_map_path)
-        df_lane_map = read_csv_with_filled_columns(lane_map_path)
-        df_laneINfo = read_csv_with_filled_columns(laneINfo_path)
-        df_roadPos = read_csv_with_filled_columns(roadPos_path)
+
+        df_object = read_csv_with_filled_columns(obj_state_path)
+        df_map_info = read_csv_with_filled_columns(ego_map_path)
+        df_lane_map = read_csv_with_filled_columns(lane_map_path)
+        df_laneINfo = read_csv_with_filled_columns(laneINfo_path)
+        df_roadPos = read_csv_with_filled_columns(roadPos_path)
         df_vehicleystems = read_csv_with_filled_columns(vehicleystems_path)
         df_trafficlight = read_csv_with_filled_columns(trafficlight_path)
-        df_function = read_csv_with_filled_columns(function_path)
+        df_function = None
+        if os.path.exists(function_path):
+            df_function = read_csv_with_filled_columns(function_path)
 
         # Check and convert the numeric columns
         def convert_numeric_columns(df):
@@ -251,8 +258,8 @@ class PGVILProcessor:
         df_roadPos = convert_numeric_columns(df_roadPos)
         df_vehicleystems = convert_numeric_columns(df_vehicleystems)
         df_trafficlight = convert_numeric_columns(df_trafficlight)
-        df_function = convert_numeric_columns(df_function)
-
+        if df_function is not None:
+            df_function = convert_numeric_columns(df_function)
 
         # Apply the offsets to posX and posY in df_object
         df_object['posX'] += x_offset
@@ -266,65 +273,72 @@ class PGVILProcessor:
         df_roadPos = align_simtime_by_simframe(df_roadPos)
         df_vehicleystems = align_simtime_by_simframe(df_vehicleystems)
         df_trafficlight = align_simtime_by_simframe(df_trafficlight)
-
-        del_ego_map = remove_conflicting_columns(df_object, df_map_info)#drop the duplicated columns
+
+        del_ego_map = remove_conflicting_columns(df_object, df_map_info)  # drop the duplicated columns
         # Merge the data
         merged_df = pd.merge(df_object, del_ego_map, on=["simTime", "simFrame", "playerId"], how="left")
 
-
         # Merge ObjState with LaneMap/laneINfo/roadPos/trafficlight on simTime, simFrame, playerId
         del_lane_map = remove_conflicting_columns(merged_df, df_lane_map)
-        merged_df = pd.merge(merged_df, del_lane_map, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
+        merged_df = pd.merge(merged_df, del_lane_map, on=["simTime", "simFrame", "playerId"],
+                             how="left").drop_duplicates()
         del_laneINfo = remove_conflicting_columns(merged_df, df_laneINfo)
-        merged_df = pd.merge(merged_df, del_laneINfo, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
+        merged_df = pd.merge(merged_df, del_laneINfo, on=["simTime", "simFrame", "playerId"],
+                             how="left").drop_duplicates()
         del_roadPos = remove_conflicting_columns(merged_df, df_roadPos)
-        merged_df = pd.merge(merged_df, del_roadPos, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
+        merged_df = pd.merge(merged_df, del_roadPos, on=["simTime", "simFrame", "playerId"],
+                             how="left").drop_duplicates()
         del_trafficlight = remove_conflicting_columns(merged_df, df_trafficlight)
         merged_df = pd.merge(merged_df, del_trafficlight, on=["simTime", "simFrame"], how="left").drop_duplicates()
         del_vehicleystems = remove_conflicting_columns(merged_df, df_vehicleystems)
-        merged_df = pd.merge(merged_df, del_vehicleystems, on=["simTime", "simFrame", "playerId"], how="left").drop_duplicates()
-
+        merged_df = pd.merge(merged_df, del_vehicleystems, on=["simTime", "simFrame", "playerId"],
+                             how="left").drop_duplicates()
+
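The reason each frame goes through remove_conflicting_columns before pd.merge (its implementation sits above this hunk): a merge on the three keys would otherwise duplicate any shared non-key column with _x/_y suffixes. A minimal sketch with invented toy frames, using a plain drop in place of the real helper:

import pandas as pd

left = pd.DataFrame({'simTime': [0.1], 'simFrame': [1], 'playerId': [1], 'speed': [10.0]})
right = pd.DataFrame({'simTime': [0.1], 'simFrame': [1], 'playerId': [1], 'speed': [9.9], 'laneId': [3]})

# Without pruning, the shared 'speed' column comes back twice:
print(pd.merge(left, right, on=['simTime', 'simFrame', 'playerId']).columns.tolist())
# -> ['simTime', 'simFrame', 'playerId', 'speed_x', 'speed_y', 'laneId']

# Dropping the overlap from the right-hand frame first keeps a single 'speed':
print(pd.merge(left, right.drop(columns=['speed']), on=['simTime', 'simFrame', 'playerId']).columns.tolist())
# -> ['simTime', 'simFrame', 'playerId', 'speed', 'laneId']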
         tolerance = 0.01
+
         def find_closest_time(sim_time, sim_time_to_index, tolerance=0.01):
             # Find the closest time point whose difference is below tolerance
-            closest_time = min(sim_time_to_index.keys(), key=lambda y: abs(y - sim_time) if abs(y - sim_time) < tolerance else float('inf'))
+            closest_time = min(sim_time_to_index.keys(),
+                               key=lambda y: abs(y - sim_time) if abs(y - sim_time) < tolerance else float('inf'))
             return closest_time
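One behavior of find_closest_time worth keeping in mind (toy values; it follows from how min treats an all-inf key): when no candidate lies within tolerance, every key scores float('inf') and min simply returns the first key in iteration order instead of signalling a miss.

sim_time_to_index = {0.10: 0, 0.20: 1}
print(find_closest_time(0.101, sim_time_to_index))  # -> 0.1 (within tolerance)
print(find_closest_time(5.0, sim_time_to_index))    # -> 0.1 (no real match; arbitrary first key)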
 
-        #Build a mapping from each simTime value in df_object to its row index
+        # Build a mapping from each simTime value in df_object to its row index
         sim_time_to_index = {row['simTime']: idx for idx, row in merged_df.iterrows()}
-
-        df_function = df_function.sort_values(by='simTime').reset_index(drop=True)#sort by the simTime column
-        #For each simTime in function.csv, find the nearest time point in df_object
-        df_function['nearest_simTime'] = df_function['simTime'].apply(lambda x: find_closest_time(x, sim_time_to_index, tolerance))
-        df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)
-
-        #Make sure nearest_index in df_function is integer typed, and drop NaN values
-        df_function_renamed = df_function.rename(columns={'simTime': 'function_simTime'})#rename the simTime column of df_function
-        df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
-        df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
-
-        ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
-        merged_df = mergecopy_by_simtime(merged_df, df_function_valid,ignore_cols)
+        if df_function is not None:
+            df_function = df_function.sort_values(by='simTime').reset_index(drop=True)  # sort by the simTime column
+            # For each simTime in function.csv, find the nearest time point in df_object
+            df_function['nearest_simTime'] = df_function['simTime'].apply(
+                lambda x: find_closest_time(x, sim_time_to_index, tolerance))
+            df_function['nearest_index'] = df_function['nearest_simTime'].map(sim_time_to_index)
+
+            # Make sure nearest_index in df_function is integer typed, and drop NaN values
+            df_function_renamed = df_function.rename(
+                columns={'simTime': 'function_simTime'})  # rename the simTime column of df_function
+            df_function_valid = df_function_renamed.dropna(subset=['nearest_index']).copy()
+            df_function_valid['nearest_index'] = df_function_valid['nearest_index'].astype(int)
+
+            ignore_cols = ['function_simTime', 'nearest_simTime', 'nearest_index']
+            merged_df = mergecopy_by_simtime(merged_df, df_function_valid, ignore_cols)
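In miniature, the broadcast that mergecopy_by_simtime performs on df_function_valid (only the mapping line visible in its hunk above is taken from the code; the loop and the ignore_cols handling here are assumed):

import pandas as pd

merged_df = pd.DataFrame({'simTime': [0.1, 0.1, 0.2]})
external = pd.DataFrame({'nearest_simTime': [0.1], 'speed_target': [7.5]})

for col in external.columns:
    if col == 'nearest_simTime':  # stand-in for the ignore_cols filter
        continue
    mapping = dict(zip(external['nearest_simTime'], external[col]))
    merged_df[col] = merged_df['simTime'].map(mapping)

print(merged_df)  # both simTime==0.1 rows receive speed_target 7.5; 0.2 gets NaN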
"""
|
|
|
def check_matching(df_function, sim_time_to_index, tolerance=0.01):
|
|
|
#检查 function.csv 中的所有行是否都成功匹配
|
|
|
-
|
|
|
+
|
|
|
# 计算每个 simTime 对应的 nearest_simTime
|
|
|
df_function['nearest_simTime'] = df_function['simTime'].apply(lambda x: find_closest_time(x, sim_time_to_index, tolerance))
|
|
|
-
|
|
|
+
|
|
|
# 检查是否有没有匹配到的行
|
|
|
unmatched_rows = df_function[df_function['nearest_simTime'].isna()]
|
|
|
if not unmatched_rows.empty:
|
|
|
print(f"没有匹配上的行: {unmatched_rows}")
|
|
|
else:
|
|
|
print("所有行都成功匹配!")
|
|
|
-
|
|
|
+
|
|
|
|
|
|
# 统计匹配上了的行数和没有匹配上的行数
|
|
|
total_rows = len(df_function)
|
|
|
matched_rows = len(df_function) - len(unmatched_rows)
|
|
|
print(f"总行数: {total_rows}, 匹配上的行数: {matched_rows}, 没有匹配上的行数: {len(unmatched_rows)}")
|
|
|
-
|
|
|
+
|
|
|
return unmatched_rows
|
|
|
# 调用检查函数
|
|
|
unmatched_rows = check_matching(df_function, sim_time_to_index, tolerance=0.01)
|
|
@@ -336,12 +350,18 @@ class PGVILProcessor:
             print(f"nearest_simTime of the last row: {last_row_nearest_simtime}")
         """
         # Convert radians to degrees
-        merged_df['posH'] = merged_df['posH'].apply(convert_heading)
+        columns_to_convert = ['posH', 'speedH', 'accelH']
+        for col in columns_to_convert:
+            if col in merged_df.columns:
+                merged_df[col] = merged_df[col].apply(convert_heading)
+
         if 'posP' in merged_df.columns:
             merged_df.rename(columns={'posP': 'pitch_rate'}, inplace=True)
+            merged_df['pitch_rate'] = merged_df['pitch_rate'].apply(convert_heading)
         if 'posR' in merged_df.columns:
             merged_df.rename(columns={'posR': 'roll_rate'}, inplace=True)
-
+            merged_df['roll_rate'] = merged_df['roll_rate'].apply(convert_heading)
+
         # Run infer_objects first to keep the dtypes consistent
         merged_df = merged_df.infer_objects()
         merged_df.fillna(np.nan, inplace=True)  # make sure empty values are filled as NaN
@@ -349,26 +369,24 @@ class PGVILProcessor:
 
         # merged_df.to_csv(merged_csv_path, index=False,na_rep="NaN")
         merged_df.to_csv(merged_csv_path, index=False)
-
-        return merged_csv_path
 
+        return merged_csv_path
 
-
     # @ErrorHandler.measure_performance
     # def process_built_in_data(self) -> Dict[str, Path]:
     #     """PGVIL-specific built-in data processing logic
-
+
     #     Processing order:
     #     1. Process CAN data
     #     2. Process sensor data
     #     3. Process other PGVIL-specific data
     #     4. Merge the built-in data
-
+
    #     Returns:
     #         Dictionary of result file paths
     #     """
     #     result_files = {}
-
+
     #     # 1. Process CAN data
     #     print("1. Processing CAN data...")
     #     can_results = self._process_can_data()
@@ -376,7 +394,7 @@ class PGVILProcessor:
     #         result_files.update(can_results)
     #     else:
     #         print("Warning: CAN data processing failed or produced no data")
-
+
     #     # 2. Process sensor data
     #     print("\n2. Processing sensor data...")
     #     sensor_results = self._process_sensor_data()
@@ -384,41 +402,41 @@ class PGVILProcessor:
     #         result_files.update(sensor_results)
     #     else:
     #         print("Warning: sensor data processing failed or produced no data")
-
+
     #     # 3. Process other PGVIL-specific data
     #     print("\n3. Processing other PGVIL data...")
     #     other_results = self._process_other_data()
     #     if other_results:
     #         result_files.update(other_results)
-
+
     #     # 4. Merge the built-in data
     #     print("\n4. Merging built-in data...")
     #     if not self._merge_built_in_data(result_files):
     #         print("Warning: built-in data merge failed")
-
+
     #     return result_files
-
+
     # def _process_can_data(self) -> Dict[str, Path]:
     #     """Process CAN data"""
     #     # TODO: implement the CAN data processing logic
     #     return {}
-
+
     # def _process_sensor_data(self) -> Dict[str, Path]:
     #     """Process sensor data"""
     #     # TODO: implement the sensor data processing logic
     #     return {}
-
+
     # def _process_other_data(self) -> Dict[str, Path]:
     #     """Process other PGVIL-specific data"""
     #     # TODO: implement the other data processing logic
     #     return {}
-
+
     # def _merge_built_in_data(self, result_files: Dict[str, Path]) -> bool:
     #     """Merge the PGVIL built-in data
-
+
     #     Args:
     #         result_files: dictionary of result file paths
-
+
     #     Returns:
     #         Whether the merge succeeded
     #     """