|
@@ -1030,7 +1030,12 @@ class FinalDataProcessor:
|
|
|
if not obj_state_path.exists():
|
|
|
print(f"Error: Required input file not found: {obj_state_path}")
|
|
|
return False
|
|
|
-
|
|
|
+ # 处理交通灯数据并保存
|
|
|
+ df_traffic = self._process_trafficlight_data()
|
|
|
+ if not df_traffic.empty:
|
|
|
+ traffic_csv_path = self.output_dir / "Traffic.csv"
|
|
|
+ df_traffic.to_csv(traffic_csv_path, index=False, float_format='%.6f')
|
|
|
+ print(f"Successfully created traffic light data file: {traffic_csv_path}")
|
|
|
# Load and process data
|
|
|
df_object = pd.read_csv(obj_state_path, dtype={"simTime": float}, low_memory=False)
|
|
|
|
|
@@ -1044,8 +1049,8 @@ class FinalDataProcessor:
|
|
|
print(f"Successfully created final merged file: {merged_csv_path}")
|
|
|
|
|
|
# Clean up intermediate files
|
|
|
- if obj_state_path.exists():
|
|
|
- obj_state_path.unlink()
|
|
|
+ # if obj_state_path.exists():
|
|
|
+ # obj_state_path.unlink()
|
|
|
|
|
|
print("--- Final Data Processing Finished ---")
|
|
|
return True
|
|
@@ -1056,9 +1061,198 @@ class FinalDataProcessor:
|
|
|
traceback.print_exc()
|
|
|
return False
|
|
|
|
|
|
+ # def _merge_optional_data(self, df_object: pd.DataFrame) -> pd.DataFrame:
|
|
|
+ # """加载和合并可选数据"""
|
|
|
+ # df_merged = df_object.copy()
|
|
|
+
|
|
|
+ # # 检查并删除重复列的函数
|
|
|
+ # def clean_duplicate_columns(df):
|
|
|
+ # # 查找带有 _x 或 _y 后缀的列
|
|
|
+ # duplicate_cols = []
|
|
|
+ # base_cols = {}
|
|
|
+
|
|
|
+ # for col in df.columns:
|
|
|
+ # if col.endswith('_x') or col.endswith('_y'):
|
|
|
+ # base_name = col[:-2] # 去掉后缀
|
|
|
+ # if base_name not in base_cols:
|
|
|
+ # base_cols[base_name] = []
|
|
|
+ # base_cols[base_name].append(col)
|
|
|
+
|
|
|
+ # # 对于每组重复列,检查数据是否相同,如果相同则只保留一个
|
|
|
+ # for base_name, cols in base_cols.items():
|
|
|
+ # if len(cols) > 1:
|
|
|
+ # # 检查这些列的数据是否相同
|
|
|
+ # is_identical = True
|
|
|
+ # first_col = cols[0]
|
|
|
+ # for col in cols[1:]:
|
|
|
+ # if not df[first_col].equals(df[col]):
|
|
|
+ # is_identical = False
|
|
|
+ # break
|
|
|
+
|
|
|
+ # if is_identical:
|
|
|
+ # # 数据相同,保留第一列并重命名为基本名称
|
|
|
+ # df = df.rename(columns={first_col: base_name})
|
|
|
+ # # 删除其他重复列
|
|
|
+ # for col in cols[1:]:
|
|
|
+ # duplicate_cols.append(col)
|
|
|
+ # print(f"列 {cols} 数据相同,保留为 {base_name}")
|
|
|
+ # else:
|
|
|
+ # print(f"列 {cols} 数据不同,保留所有列")
|
|
|
+
|
|
|
+ # # 删除重复列
|
|
|
+ # if duplicate_cols:
|
|
|
+ # df = df.drop(columns=duplicate_cols)
|
|
|
+ # print(f"删除了重复列: {duplicate_cols}")
|
|
|
+
|
|
|
+ # return df
|
|
|
+
|
|
|
+ # # --- 合并 EgoMap ---
|
|
|
+ # egomap_path = self.output_dir / OUTPUT_CSV_EGOMAP
|
|
|
+ # if egomap_path.exists() and egomap_path.stat().st_size > 0:
|
|
|
+ # try:
|
|
|
+ # df_ego = pd.read_csv(egomap_path, dtype={"simTime": float})
|
|
|
+ # # 删除 simFrame 列,因为使用主数据的 simFrame
|
|
|
+ # if 'simFrame' in df_ego.columns:
|
|
|
+ # df_ego = df_ego.drop(columns=['simFrame'])
|
|
|
+
|
|
|
+ # # 按时间和ID排序
|
|
|
+ # df_ego.sort_values(['simTime', 'playerId'], inplace=True)
|
|
|
+ # df_merged.sort_values(['simTime', 'playerId'], inplace=True)
|
|
|
+
|
|
|
+ # # 使用 merge_asof 进行就近合并,不包括 simFrame
|
|
|
+ # df_merged = pd.merge_asof(
|
|
|
+ # df_merged,
|
|
|
+ # df_ego,
|
|
|
+ # on='simTime',
|
|
|
+ # by='playerId',
|
|
|
+ # direction='nearest',
|
|
|
+ # tolerance=0.01 # 10ms tolerance
|
|
|
+ # )
|
|
|
+ # print("EgoMap data merged.")
|
|
|
+ # except Exception as e:
|
|
|
+ # print(f"Warning: Could not merge EgoMap data from {egomap_path}: {e}")
|
|
|
+
|
|
|
+ # # --- Merge Function ---
|
|
|
+ # function_path = self.output_dir / OUTPUT_CSV_FUNCTION
|
|
|
+ # if function_path.exists() and function_path.stat().st_size > 0:
|
|
|
+ # try:
|
|
|
+ # df_function = pd.read_csv(function_path, dtype={"timestamp": float}, low_memory=False).drop_duplicates()
|
|
|
+ # # 删除 simFrame 列
|
|
|
+ # if 'simFrame' in df_function.columns:
|
|
|
+ # df_function = df_function.drop(columns=['simFrame'])
|
|
|
+
|
|
|
+ # if 'simTime' in df_function.columns:
|
|
|
+ # df_function['simTime'] = df_function['simTime'].round(2)
|
|
|
+ # df_function['time'] = df_function['simTime'].round(2).astype(float)
|
|
|
+ # df_merged['time'] = df_merged['simTime'].round(2).astype(float)
|
|
|
+
|
|
|
+ # common_cols = list(set(df_merged.columns) & set(df_function.columns) - {'time'})
|
|
|
+ # df_function.drop(columns=common_cols, inplace=True, errors='ignore')
|
|
|
+
|
|
|
+ # df_merged = pd.merge(df_merged, df_function, on=["time"], how="left")
|
|
|
+ # df_merged.drop(columns=['time'], inplace=True)
|
|
|
+ # print("Function data merged.")
|
|
|
+ # else:
|
|
|
+ # print("Warning: 'simTime' column not found in Function.csv. Cannot merge.")
|
|
|
+ # except Exception as e:
|
|
|
+ # print(f"Warning: Could not merge Function data from {function_path}: {e}")
|
|
|
+ # else:
|
|
|
+ # print("Function data not found or empty, skipping merge.")
|
|
|
+
|
|
|
+ # # --- Merge OBU ---
|
|
|
+ # obu_path = self.output_dir / OUTPUT_CSV_OBU
|
|
|
+ # if obu_path.exists() and obu_path.stat().st_size > 0:
|
|
|
+ # try:
|
|
|
+ # df_obu = pd.read_csv(obu_path, dtype={"simTime": float}, low_memory=False).drop_duplicates()
|
|
|
+ # # 删除 simFrame 列
|
|
|
+ # if 'simFrame' in df_obu.columns:
|
|
|
+ # df_obu = df_obu.drop(columns=['simFrame'])
|
|
|
+
|
|
|
+ # df_obu['time'] = df_obu['simTime'].round(2).astype(float)
|
|
|
+ # df_merged['time'] = df_merged['simTime'].round(2).astype(float)
|
|
|
+
|
|
|
+ # common_cols = list(set(df_merged.columns) & set(df_obu.columns) - {'time'})
|
|
|
+ # df_obu.drop(columns=common_cols, inplace=True, errors='ignore')
|
|
|
+
|
|
|
+ # df_merged = pd.merge(df_merged, df_obu, on=["time"], how="left")
|
|
|
+ # df_merged.drop(columns=['time'], inplace=True)
|
|
|
+ # print("OBU data merged.")
|
|
|
+ # except Exception as e:
|
|
|
+ # print(f"Warning: Could not merge OBU data from {obu_path}: {e}")
|
|
|
+ # else:
|
|
|
+ # print("OBU data not found or empty, skipping merge.")
|
|
|
+
|
|
|
+ # # 在所有合并完成后,清理重复列
|
|
|
+ # df_merged = clean_duplicate_columns(df_merged)
|
|
|
+
|
|
|
+ # return df_merged
|
|
|
def _merge_optional_data(self, df_object: pd.DataFrame) -> pd.DataFrame:
|
|
|
"""加载和合并可选数据"""
|
|
|
df_merged = df_object.copy()
|
|
|
+
|
|
|
+ # 检查并删除重复列的函数
|
|
|
+ def clean_duplicate_columns(df):
|
|
|
+ # 查找带有 _x 或 _y 后缀的列
|
|
|
+ duplicate_cols = []
|
|
|
+ base_cols = {}
|
|
|
+
|
|
|
+ # 打印清理前的列名
|
|
|
+ print(f"清理重复列前的列名: {df.columns.tolist()}")
|
|
|
+
|
|
|
+ for col in df.columns:
|
|
|
+ if col.endswith('_x') or col.endswith('_y'):
|
|
|
+ base_name = col[:-2] # 去掉后缀
|
|
|
+ if base_name not in base_cols:
|
|
|
+ base_cols[base_name] = []
|
|
|
+ base_cols[base_name].append(col)
|
|
|
+
|
|
|
+ # 对于每组重复列,检查数据是否相同,如果相同则只保留一个
|
|
|
+ for base_name, cols in base_cols.items():
|
|
|
+ if len(cols) > 1:
|
|
|
+ # 检查这些列的数据是否相同
|
|
|
+ is_identical = True
|
|
|
+ first_col = cols[0]
|
|
|
+ for col in cols[1:]:
|
|
|
+ if not df[first_col].equals(df[col]):
|
|
|
+ is_identical = False
|
|
|
+ break
|
|
|
+
|
|
|
+ if is_identical:
|
|
|
+ # 数据相同,保留第一列并重命名为基本名称
|
|
|
+ df = df.rename(columns={first_col: base_name})
|
|
|
+ # 删除其他重复列
|
|
|
+ for col in cols[1:]:
|
|
|
+ duplicate_cols.append(col)
|
|
|
+ print(f"列 {cols} 数据相同,保留为 {base_name}")
|
|
|
+ else:
|
|
|
+ print(f"列 {cols} 数据不同,保留所有列")
|
|
|
+ # 如果是 simTime 相关列,确保保留一个
|
|
|
+ if base_name == 'simTime' and 'simTime' not in df.columns:
|
|
|
+ df = df.rename(columns={cols[0]: 'simTime'})
|
|
|
+ print(f"将 {cols[0]} 重命名为 simTime")
|
|
|
+ # 删除其他 simTime 相关列
|
|
|
+ for col in cols[1:]:
|
|
|
+ duplicate_cols.append(col)
|
|
|
+
|
|
|
+ # 删除重复列
|
|
|
+ if duplicate_cols:
|
|
|
+ # 确保不会删除 simTime 列
|
|
|
+ if 'simTime' not in df.columns and any(col.startswith('simTime_') for col in duplicate_cols):
|
|
|
+ # 找到一个 simTime 相关列保留
|
|
|
+ for col in duplicate_cols[:]:
|
|
|
+ if col.startswith('simTime_'):
|
|
|
+ df = df.rename(columns={col: 'simTime'})
|
|
|
+ duplicate_cols.remove(col)
|
|
|
+ print(f"将 {col} 重命名为 simTime")
|
|
|
+ break
|
|
|
+
|
|
|
+ df = df.drop(columns=duplicate_cols)
|
|
|
+ print(f"删除了重复列: {duplicate_cols}")
|
|
|
+
|
|
|
+ # 打印清理后的列名
|
|
|
+ print(f"清理重复列后的列名: {df.columns.tolist()}")
|
|
|
+
|
|
|
+ return df
|
|
|
|
|
|
# --- 合并 EgoMap ---
|
|
|
egomap_path = self.output_dir / OUTPUT_CSV_EGOMAP
|
|
@@ -1069,6 +1263,10 @@ class FinalDataProcessor:
|
|
|
if 'simFrame' in df_ego.columns:
|
|
|
df_ego = df_ego.drop(columns=['simFrame'])
|
|
|
|
|
|
+ # 打印合并前的列名
|
|
|
+ print(f"合并 EgoMap 前 df_merged 的列: {df_merged.columns.tolist()}")
|
|
|
+ print(f"df_ego 的列: {df_ego.columns.tolist()}")
|
|
|
+
|
|
|
# 按时间和ID排序
|
|
|
df_ego.sort_values(['simTime', 'playerId'], inplace=True)
|
|
|
df_merged.sort_values(['simTime', 'playerId'], inplace=True)
|
|
@@ -1082,62 +1280,312 @@ class FinalDataProcessor:
|
|
|
direction='nearest',
|
|
|
tolerance=0.01 # 10ms tolerance
|
|
|
)
|
|
|
+
|
|
|
+ # 打印合并后的列名
|
|
|
+ print(f"合并 EgoMap 后 df_merged 的列: {df_merged.columns.tolist()}")
|
|
|
+
|
|
|
+ # 确保 simTime 列存在
|
|
|
+ if 'simTime' not in df_merged.columns:
|
|
|
+ if 'simTime_x' in df_merged.columns:
|
|
|
+ df_merged.rename(columns={'simTime_x': 'simTime'}, inplace=True)
|
|
|
+ print("将 simTime_x 重命名为 simTime")
|
|
|
+ else:
|
|
|
+ print("警告: 合并 EgoMap 后找不到 simTime 列!")
|
|
|
+
|
|
|
print("EgoMap data merged.")
|
|
|
except Exception as e:
|
|
|
print(f"Warning: Could not merge EgoMap data from {egomap_path}: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+
|
|
|
+ # 先处理可能的列名重复问题
|
|
|
+ df_merged = clean_duplicate_columns(df_merged)
|
|
|
+
|
|
|
+ # --- 合并 Traffic ---
|
|
|
+ traffic_path = self.output_dir / "Traffic.csv"
|
|
|
+ if traffic_path.exists() and traffic_path.stat().st_size > 0:
|
|
|
+ try:
|
|
|
+ df_traffic = pd.read_csv(traffic_path, dtype={"simTime": float}, low_memory=False).drop_duplicates()
|
|
|
+ # 删除 simFrame 列
|
|
|
+ if 'simFrame' in df_traffic.columns:
|
|
|
+ df_traffic = df_traffic.drop(columns=['simFrame'])
|
|
|
+
|
|
|
+ # 根据车辆航向角确定行驶方向并筛选对应的红绿灯
|
|
|
+ def get_direction_from_heading(heading):
|
|
|
+ # 将角度归一化到 -180 到 180 度范围
|
|
|
+ heading = heading % 360
|
|
|
+ if heading > 180:
|
|
|
+ heading -= 360
|
|
|
+
|
|
|
+ # 确定方向:北(N)、东(E)、南(S)、西(W)
|
|
|
+ if -45 <= heading <= 45: # 北向
|
|
|
+ return 'N'
|
|
|
+ elif 45 < heading <= 135: # 东向
|
|
|
+ return 'E'
|
|
|
+ elif -135 <= heading < -45: # 西向
|
|
|
+ return 'W'
|
|
|
+ else: # 南向 (135 < heading <= 180 或 -180 <= heading < -135)
|
|
|
+ return 'S'
|
|
|
+
|
|
|
+ # 检查posH列是否存在,如果不存在但posH_x存在,则使用posH_x
|
|
|
+ heading_col = 'posH'
|
|
|
+ if heading_col not in df_merged.columns:
|
|
|
+ if 'posH_x' in df_merged.columns:
|
|
|
+ heading_col = 'posH_x'
|
|
|
+ print(f"使用 {heading_col} 替代 posH")
|
|
|
+ else:
|
|
|
+ print(f"警告: 找不到航向角列 posH 或 posH_x")
|
|
|
+ return df_merged
|
|
|
+
|
|
|
+ # 添加方向列
|
|
|
+ df_merged['vehicle_direction'] = df_merged[heading_col].apply(get_direction_from_heading)
|
|
|
+
|
|
|
+ # 创建 phaseId 到方向的映射
|
|
|
+ phase_to_direction = {
|
|
|
+ 1: 'S', # 南直行
|
|
|
+ 2: 'W', # 西直行
|
|
|
+ 3: 'N', # 北直行
|
|
|
+ 4: 'E', # 东直行
|
|
|
+ 5: 'S', # 南行人
|
|
|
+ 6: 'W', # 西行人
|
|
|
+ 7: 'S', # 南左转
|
|
|
+ 8: 'W', # 西左转
|
|
|
+ 9: 'N', # 北左转
|
|
|
+ 10: 'E', # 东左转
|
|
|
+ 11: 'N', # 北行人
|
|
|
+ 12: 'E', # 东行人
|
|
|
+ 13: 'S', # 南右转
|
|
|
+ 14: 'W', # 西右转
|
|
|
+ 15: 'N', # 北右转
|
|
|
+ 16: 'E' # 东右转
|
|
|
+ }
|
|
|
+
|
|
|
+ # 创建 trafficlight_id 到方向的映射
|
|
|
+ trafficlight_to_direction = {
|
|
|
+ # 南向北方向的红绿灯
|
|
|
+ 48100017: 'S',
|
|
|
+ 48100038: 'S',
|
|
|
+ 48100043: 'S',
|
|
|
+ 48100030: 'S',
|
|
|
+ # 西向东方向的红绿灯
|
|
|
+ 48100021: 'W',
|
|
|
+ 48100039: 'W',
|
|
|
+ # 东向西方向的红绿灯
|
|
|
+ 48100041: 'E',
|
|
|
+ 48100019: 'E',
|
|
|
+ # 北向南方向的红绿灯
|
|
|
+ 48100033: 'N',
|
|
|
+ 48100018: 'N',
|
|
|
+ 48100022: 'N'
|
|
|
+ }
|
|
|
+
|
|
|
+ # 添加时间列用于合并
|
|
|
+ df_traffic['time'] = df_traffic['simTime'].round(2).astype(float)
|
|
|
+
|
|
|
+ # 检查 df_merged 中是否有 simTime 列
|
|
|
+ if 'simTime' not in df_merged.columns:
|
|
|
+ print("警告: 合并 Traffic 前 df_merged 中找不到 simTime 列!")
|
|
|
+ # 尝试查找 simTime_x 或其他可能的列
|
|
|
+ if 'simTime_x' in df_merged.columns:
|
|
|
+ df_merged.rename(columns={'simTime_x': 'simTime'}, inplace=True)
|
|
|
+ print("将 simTime_x 重命名为 simTime")
|
|
|
+ else:
|
|
|
+ print("严重错误: 无法找到任何 simTime 相关列,无法继续合并!")
|
|
|
+ return df_merged
|
|
|
+
|
|
|
+ df_merged['time'] = df_merged['simTime'].round(2).astype(float)
|
|
|
+
|
|
|
+ # 合并 Traffic 数据
|
|
|
+ df_merged = pd.merge(df_merged, df_traffic, on=["time"], how="left")
|
|
|
+
|
|
|
+ # 再次处理可能的列名重复问题
|
|
|
+ df_merged = clean_duplicate_columns(df_merged)
|
|
|
+
|
|
|
+ # 检查trafficlight_id列是否存在
|
|
|
+ trafficlight_col = 'trafficlight_id'
|
|
|
+ if trafficlight_col not in df_merged.columns:
|
|
|
+ if 'trafficlight_id_x' in df_merged.columns:
|
|
|
+ trafficlight_col = 'trafficlight_id_x'
|
|
|
+ print(f"使用 {trafficlight_col} 替代 trafficlight_id")
|
|
|
+ else:
|
|
|
+ print(f"警告: 找不到红绿灯ID列 trafficlight_id 或 trafficlight_id_x")
|
|
|
+
|
|
|
+ # 筛选与车辆行驶方向相关的红绿灯
|
|
|
+ def filter_relevant_traffic_light(row):
|
|
|
+ if 'phaseId' not in row or pd.isna(row['phaseId']):
|
|
|
+ return np.nan
|
|
|
+
|
|
|
+ # 获取 phaseId 对应的方向
|
|
|
+ phase_id = int(row['phaseId']) if not pd.isna(row['phaseId']) else None
|
|
|
+ if phase_id is None:
|
|
|
+ return np.nan
|
|
|
+
|
|
|
+ phase_direction = phase_to_direction.get(phase_id, None)
|
|
|
+
|
|
|
+ # 如果 phaseId 方向与车辆方向匹配
|
|
|
+ if phase_direction == row['vehicle_direction']:
|
|
|
+ # 查找该方向的所有红绿灯 ID
|
|
|
+ relevant_ids = [tid for tid, direction in trafficlight_to_direction.items()
|
|
|
+ if direction == phase_direction]
|
|
|
+
|
|
|
+ # 如果 trafficlight_id 在 EgoMap 中且方向匹配
|
|
|
+ if trafficlight_col in row and not pd.isna(row[trafficlight_col]) and row[trafficlight_col] in relevant_ids:
|
|
|
+ return row[trafficlight_col]
|
|
|
+
|
|
|
+ return np.nan
|
|
|
+
|
|
|
+ # 应用筛选函数
|
|
|
+ df_merged['filtered_trafficlight_id'] = df_merged.apply(filter_relevant_traffic_light, axis=1)
|
|
|
+
|
|
|
+ # 清理临时列
|
|
|
+ print(f"删除 time 列前 df_merged 的列: {df_merged.columns.tolist()}")
|
|
|
+ df_merged.drop(columns=['time'], inplace=True)
|
|
|
+ print(f"删除 time 列后 df_merged 的列: {df_merged.columns.tolist()}")
|
|
|
+
|
|
|
+ # 确保 simTime 列存在
|
|
|
+ if 'simTime' not in df_merged.columns:
|
|
|
+ if 'simTime_x' in df_merged.columns:
|
|
|
+ df_merged.rename(columns={'simTime_x': 'simTime'}, inplace=True)
|
|
|
+ print("将 simTime_x 重命名为 simTime")
|
|
|
+ else:
|
|
|
+ print("警告: 处理 Traffic 数据后找不到 simTime 列!")
|
|
|
+
|
|
|
+ print("Traffic light data merged and filtered.")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Warning: Could not merge Traffic data from {traffic_path}: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ else:
|
|
|
+ print("Traffic data not found or empty, skipping merge.")
|
|
|
|
|
|
# --- Merge Function ---
|
|
|
function_path = self.output_dir / OUTPUT_CSV_FUNCTION
|
|
|
if function_path.exists() and function_path.stat().st_size > 0:
|
|
|
try:
|
|
|
- df_function = pd.read_csv(function_path, dtype={"timestamp": float}, low_memory=False).drop_duplicates()
|
|
|
+ # 添加调试信息
|
|
|
+ print(f"正在读取 Function 数据: {function_path}")
|
|
|
+ df_function = pd.read_csv(function_path, low_memory=False).drop_duplicates()
|
|
|
+ print(f"Function 数据列名: {df_function.columns.tolist()}")
|
|
|
+
|
|
|
# 删除 simFrame 列
|
|
|
if 'simFrame' in df_function.columns:
|
|
|
df_function = df_function.drop(columns=['simFrame'])
|
|
|
|
|
|
+ # 确保 simTime 列存在并且是浮点型
|
|
|
if 'simTime' in df_function.columns:
|
|
|
- df_function['simTime'] = df_function['simTime'].round(2)
|
|
|
- df_function['time'] = df_function['simTime'].round(1).astype(float)
|
|
|
- df_merged['time'] = df_merged['simTime'].round(1).astype(float)
|
|
|
-
|
|
|
- common_cols = list(set(df_merged.columns) & set(df_function.columns) - {'time'})
|
|
|
- df_function.drop(columns=common_cols, inplace=True, errors='ignore')
|
|
|
-
|
|
|
- df_merged = pd.merge(df_merged, df_function, on=["time"], how="left")
|
|
|
- df_merged.drop(columns=['time'], inplace=True)
|
|
|
- print("Function data merged.")
|
|
|
+ # 安全地将 simTime 转换为浮点型
|
|
|
+ try:
|
|
|
+ df_function['simTime'] = pd.to_numeric(df_function['simTime'], errors='coerce')
|
|
|
+ df_function = df_function.dropna(subset=['simTime']) # 删除无法转换的行
|
|
|
+ df_function['time'] = df_function['simTime'].round(2)
|
|
|
+
|
|
|
+ # 安全地处理 df_merged 的 simTime 列
|
|
|
+ if 'simTime' in df_merged.columns:
|
|
|
+ print(f"df_merged['simTime'] 的类型: {df_merged['simTime'].dtype}")
|
|
|
+ print(f"df_merged['simTime'] 的前5个值: {df_merged['simTime'].head().tolist()}")
|
|
|
+
|
|
|
+ df_merged['time'] = pd.to_numeric(df_merged['simTime'], errors='coerce').round(2)
|
|
|
+ # 删除 time 列中的 NaN 值
|
|
|
+ nan_count = df_merged['time'].isna().sum()
|
|
|
+ if nan_count > 0:
|
|
|
+ print(f"警告: 转换后有 {nan_count} 个 NaN 值,将删除这些行")
|
|
|
+ df_merged = df_merged.dropna(subset=['time'])
|
|
|
+
|
|
|
+ # 确保两个 DataFrame 的 time 列类型一致
|
|
|
+ df_function['time'] = df_function['time'].astype(float)
|
|
|
+ df_merged['time'] = df_merged['time'].astype(float)
|
|
|
+
|
|
|
+ common_cols = list(set(df_merged.columns) & set(df_function.columns) - {'time'})
|
|
|
+ df_function.drop(columns=common_cols, inplace=True, errors='ignore')
|
|
|
+
|
|
|
+ # 合并数据
|
|
|
+ df_merged = pd.merge(df_merged, df_function, on=["time"], how="left")
|
|
|
+ df_merged.drop(columns=['time'], inplace=True)
|
|
|
+ print("Function 数据合并成功。")
|
|
|
+ else:
|
|
|
+ print("警告: df_merged 中找不到 'simTime' 列,无法合并 Function 数据。")
|
|
|
+ # 打印所有列名以便调试
|
|
|
+ print(f"df_merged 的所有列: {df_merged.columns.tolist()}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"警告: 处理 Function.csv 中的 simTime 列时出错: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
else:
|
|
|
- print("Warning: 'simTime' column not found in Function.csv. Cannot merge.")
|
|
|
+ print(f"警告: Function.csv 中找不到 'simTime' 列。可用的列: {df_function.columns.tolist()}")
|
|
|
except Exception as e:
|
|
|
- print(f"Warning: Could not merge Function data from {function_path}: {e}")
|
|
|
+ print(f"警告: 无法合并 Function 数据: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
else:
|
|
|
- print("Function data not found or empty, skipping merge.")
|
|
|
+ print(f"Function 数据文件不存在或为空: {function_path}")
|
|
|
|
|
|
# --- Merge OBU ---
|
|
|
obu_path = self.output_dir / OUTPUT_CSV_OBU
|
|
|
if obu_path.exists() and obu_path.stat().st_size > 0:
|
|
|
try:
|
|
|
- df_obu = pd.read_csv(obu_path, dtype={"simTime": float}, low_memory=False).drop_duplicates()
|
|
|
+ # 添加调试信息
|
|
|
+ print(f"正在读取 OBU 数据: {obu_path}")
|
|
|
+ df_obu = pd.read_csv(obu_path, low_memory=False).drop_duplicates()
|
|
|
+ print(f"OBU 数据列名: {df_obu.columns.tolist()}")
|
|
|
+
|
|
|
# 删除 simFrame 列
|
|
|
if 'simFrame' in df_obu.columns:
|
|
|
df_obu = df_obu.drop(columns=['simFrame'])
|
|
|
|
|
|
- df_obu['time'] = df_obu['simTime'].round(1).astype(float)
|
|
|
- df_merged['time'] = df_merged['simTime'].round(1).astype(float)
|
|
|
-
|
|
|
- common_cols = list(set(df_merged.columns) & set(df_obu.columns) - {'time'})
|
|
|
- df_obu.drop(columns=common_cols, inplace=True, errors='ignore')
|
|
|
-
|
|
|
- df_merged = pd.merge(df_merged, df_obu, on=["time"], how="left")
|
|
|
- df_merged.drop(columns=['time'], inplace=True)
|
|
|
- print("OBU data merged.")
|
|
|
+ # 确保 simTime 列存在并且是浮点型
|
|
|
+ if 'simTime' in df_obu.columns:
|
|
|
+ # 安全地将 simTime 转换为浮点型
|
|
|
+ try:
|
|
|
+ df_obu['simTime'] = pd.to_numeric(df_obu['simTime'], errors='coerce')
|
|
|
+ df_obu = df_obu.dropna(subset=['simTime']) # 删除无法转换的行
|
|
|
+ df_obu['time'] = df_obu['simTime'].round(2)
|
|
|
+
|
|
|
+ # 安全地处理 df_merged 的 simTime 列
|
|
|
+ if 'simTime' in df_merged.columns:
|
|
|
+ print(f"合并 OBU 前 df_merged['simTime'] 的类型: {df_merged['simTime'].dtype}")
|
|
|
+ print(f"合并 OBU 前 df_merged['simTime'] 的前5个值: {df_merged['simTime'].head().tolist()}")
|
|
|
+
|
|
|
+ df_merged['time'] = pd.to_numeric(df_merged['simTime'], errors='coerce').round(2)
|
|
|
+ # 删除 time 列中的 NaN 值
|
|
|
+ nan_count = df_merged['time'].isna().sum()
|
|
|
+ if nan_count > 0:
|
|
|
+ print(f"警告: 转换后有 {nan_count} 个 NaN 值,将删除这些行")
|
|
|
+ df_merged = df_merged.dropna(subset=['time'])
|
|
|
+
|
|
|
+ # 确保两个 DataFrame 的 time 列类型一致
|
|
|
+ df_obu['time'] = df_obu['time'].astype(float)
|
|
|
+ df_merged['time'] = df_merged['time'].astype(float)
|
|
|
+
|
|
|
+ common_cols = list(set(df_merged.columns) & set(df_obu.columns) - {'time'})
|
|
|
+ df_obu.drop(columns=common_cols, inplace=True, errors='ignore')
|
|
|
+
|
|
|
+ # 合并数据
|
|
|
+ df_merged = pd.merge(df_merged, df_obu, on=["time"], how="left")
|
|
|
+ df_merged.drop(columns=['time'], inplace=True)
|
|
|
+ print("OBU 数据合并成功。")
|
|
|
+ else:
|
|
|
+ print("警告: df_merged 中找不到 'simTime' 列,无法合并 OBU 数据。")
|
|
|
+ # 打印所有列名以便调试
|
|
|
+ print(f"df_merged 的所有列: {df_merged.columns.tolist()}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"警告: 处理 OBUdata.csv 中的 simTime 列时出错: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ else:
|
|
|
+ print(f"警告: OBUdata.csv 中找不到 'simTime' 列。可用的列: {df_obu.columns.tolist()}")
|
|
|
except Exception as e:
|
|
|
- print(f"Warning: Could not merge OBU data from {obu_path}: {e}")
|
|
|
+ print(f"警告: 无法合并 OBU 数据: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
else:
|
|
|
- print("OBU data not found or empty, skipping merge.")
|
|
|
+ print(f"OBU 数据文件不存在或为空: {obu_path}")
|
|
|
|
|
|
+ # 在所有合并完成后,再次清理重复列
|
|
|
+ df_merged = clean_duplicate_columns(df_merged)
|
|
|
+
|
|
|
return df_merged
|
|
|
|
|
|
+
|
|
|
def _process_trafficlight_data(self) -> pd.DataFrame:
|
|
|
"""Processes traffic light JSON data if available."""
|
|
|
# Check if json_path is provided and exists
|
|
@@ -1215,6 +1663,14 @@ class FinalDataProcessor:
|
|
|
df_trafficlights.drop_duplicates(subset=['simTime', 'playerId', 'phaseId', 'stateMask'], keep='first',
|
|
|
inplace=True)
|
|
|
print(f"Processed {len(df_trafficlights)} unique traffic light state entries.")
|
|
|
+ # 按时间升序排序 - 修复倒序问题
|
|
|
+ df_trafficlights = df_trafficlights.sort_values('simTime', ascending=True)
|
|
|
+
|
|
|
+ # 添加调试信息
|
|
|
+ print(f"交通灯数据时间范围: {df_trafficlights['simTime'].min()} 到 {df_trafficlights['simTime'].max()}")
|
|
|
+ print(f"交通灯数据前5行时间: {df_trafficlights['simTime'].head().tolist()}")
|
|
|
+ print(f"交通灯数据后5行时间: {df_trafficlights['simTime'].tail().tolist()}")
|
|
|
+
|
|
|
return df_trafficlights
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
@@ -1224,7 +1680,6 @@ class FinalDataProcessor:
|
|
|
print(f"Unexpected error processing traffic light data: {e}")
|
|
|
return pd.DataFrame()
|
|
|
|
|
|
-
|
|
|
# --- Rosbag Processing ---
|
|
|
class RosbagProcessor:
|
|
|
"""Extracts data from Rosbag files within a ZIP archive."""
|