#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           yangzihao(yangzihao@china-icv.cn)
@Data:              2023/11/27
@Last Modified:     2023/11/27
@Summary:           Csv data process functions
"""

import os
import sys

import numpy as np
import pandas as pd

# The star imports below are expected to provide acc/aeb/ica/lka/ldw_status_mapping
# and ACCTrigger/LKATrigger/ICATrigger, which this module uses.
from status_mapping import *
from status_trigger import *
from data_quality import DataQuality, get_all_files, frame_loss_statistic
from common import cal_velocity
from data_info import CsvData
import log


class DataProcess(object):
    """
    Load the simulation CSV exports of one case and derive evaluation data.

    Construction runs the whole pipeline (see ``_process``): the raw CSV files
    in ``data_path`` are merged into ``merged_ObjState.csv`` (written back into
    ``data_path``), re-read, and split into per-object dataframes plus summary
    dictionaries.

    Attributes:
        data_path: Directory containing the case CSV files.
        case_name: Case identifier, used in log messages.
        config: Evaluation config; stored but not read by this class.
        obj_data: Dict mapping playerId -> dataframe of that player's rows.
        ego_data: Dataframe of the ego vehicle (playerId ``EGO_PLAYER_ID``).
        obj_id_list: List of the playerIds present in ``obj_data``.
        car_info: Ego dimensions (currently hard-coded placeholders).
        report_info: Dict with the ego ``mileage`` and ``duration``.
        driver_ctrl_data: Dict of lists extracted from DriverCtrl.csv.
        status_trigger_dict: ACC / LKA / ICA trigger statistics.
    """

    # playerId of the ego vehicle; EgoState rows are tagged with it when merged.
    EGO_PLAYER_ID = 1

    def __init__(self, data_path, config, case_name):
        # base info
        self.data_path = data_path
        self.case_name = case_name
        self.config = config

        # drive data
        self.ego_df = pd.DataFrame()
        self.object_df = pd.DataFrame()
        self.driver_ctrl_df = pd.DataFrame()
        self.vehicle_sys_df = pd.DataFrame()
        self.status_df = pd.DataFrame()

        # environment data
        self.lane_info_df = pd.DataFrame()
        self.road_mark_df = pd.DataFrame()
        self.road_pos_df = pd.DataFrame()
        self.traffic_light_df = pd.DataFrame()
        self.traffic_signal_df = pd.DataFrame()

        # derived data, filled by _process()
        self.frame_rate = 0.0  # only set by _cal_frame_rate(), which is currently disabled
        self.obj_data = {}
        self.ego_data = {}
        self.obj_id_list = []
        self.car_info = {}
        self.report_info = {}
        self.driver_ctrl_data = {}
        self.status_trigger_dict = {}

        self._process()

    def _process(self):
        """Run the processing pipeline; disabled steps are kept as comments."""
        # self._signal_mapping()
        self._merge_csv()
        self._read_csv()
        self._invalid_detect()
        self._fill_missing_columns()
        # self._cal_frame_rate()
        # self._time_alignment()
        self.car_info = self._get_car_info(self.object_df)
        # self._compact_data()
        # self._abnormal_detect()
        # self._status_map(self.object_df)
        self._object_accel_get_from_egostate()
        self._object_df_process()

        # self.status_df was status-mapped in place by _merge_csv().
        self.status_trigger_dict = self._get_status_trigger_dict(self.status_df)
        self.report_info = self._get_report_info(self.obj_data[self.EGO_PLAYER_ID])
        self.driver_ctrl_data = self._get_driver_ctrl_data(self.driver_ctrl_df)

    def _invalid_column_detect(self, df, csv_name):
        """
        Log a warning for every column of ``df`` that carries no information.

        A column is reported when it holds at most one distinct non-null value,
        i.e. it is constant or entirely empty (NaN). Empty dataframes are
        skipped so that CSV files without rows do not flood the log.

        Args:
            df: Dataframe loaded from ``csv_name``.
            csv_name: File name used in the warning message.
        """
        if df.empty:
            return
        logger = log.get_logger()
        for column in df.columns:
            # nunique() ignores NaN, so an all-NaN column yields 0.
            if df[column].nunique() <= 1:
                logger.warning(
                    f"[case:{self.case_name}] SINGLE_CASE_EVAL: [{csv_name}] data '{column}' invalid WARNING!")

    def _csv_interpolate_by_frame(self, df):
        """
        Fill missing simFrame rows and linearly interpolate the other columns.

        Args:
            df: Dataframe with a 'simFrame' column. It is not modified.

        Returns:
            A new dataframe with one row per frame between the minimum and
            maximum valid simFrame (rows whose simFrame is not numeric are
            dropped), sorted by simFrame, with simFrame as int.
        """
        df = df.copy()
        # Convert simFrame to numbers; unparsable values become NaN and are dropped.
        df['simFrame'] = pd.to_numeric(df['simFrame'], errors='coerce')
        df = df.dropna(subset=['simFrame']).sort_values(by='simFrame')
        if df.empty:
            return df

        # to_numeric may yield floats; range() needs ints.
        df['simFrame'] = df['simFrame'].astype(int)
        full_frames = pd.Series(range(df['simFrame'].min(), df['simFrame'].max() + 1), name='simFrame')

        # Right-merge onto the complete frame sequence to create the missing rows,
        # then interpolate every other column linearly.
        df = df.merge(full_frames.to_frame(), on='simFrame', how='right')
        df = df.interpolate(method='linear')
        df['simFrame'] = df['simFrame'].astype(int)
        return df

    def _cal_frame_rate(self):
        """
        Set ``self.frame_rate`` to the number of ego rows with 0 < simTime <= 1.

        Assumes simTime is in seconds and the ego is sampled uniformly — TODO confirm.
        """
        object_df = self.object_df
        ego_df = object_df[object_df['playerId'] == self.EGO_PLAYER_ID]
        df_filtered = ego_df[(ego_df['simTime'] > 0) & (ego_df['simTime'] <= 1)]
        self.frame_rate = df_filtered.shape[0]

    def _data_time_align(self, base_time, df):
        """
        Snap ``df`` onto a uniform time grid derived from ``base_time``.

        The i-th value of ``base_time`` is mapped to the i-th point of a grid
        starting at 0 with step ``1 / self.frame_rate`` (rounded to 2 decimals).
        Rows of ``df`` whose simTime is not in ``base_time`` or whose
        simFrame <= 0 are dropped, and simFrame is recomputed from the new simTime.

        Requires ``self.frame_rate > 0`` (the default 0.0 raises
        ZeroDivisionError); set it with ``_cal_frame_rate`` first.

        Args:
            base_time: Array of reference simTime values (the ego simTime
                uniques in ``_time_alignment``).
            df: Dataframe with 'simTime' and 'simFrame' columns.

        Returns:
            The filtered, re-timed dataframe (``df`` itself if it is empty).
        """
        # Nothing to align for an empty dataframe.
        if df.empty:
            return df

        time_diff = 1.0 / self.frame_rate
        # Uniform, increasing simTime grid starting at 0 with step time_diff.
        new_sim_time_values = np.arange(0, base_time.max() + time_diff, time_diff)
        # Map each original simTime to the grid value at the same position.
        # NOTE(review): raises IndexError if base_time has more values than the grid.
        original_to_new_sim_time = {original: round(new_sim_time_values[i], 2)
                                    for i, original in enumerate(base_time)}

        # Keep only rows on the base timeline that have a positive simFrame.
        filtered_df = df[df['simTime'].isin(base_time)]
        filtered_df = filtered_df[filtered_df['simFrame'] > 0].reset_index(drop=True)

        filtered_df['simTime'] = filtered_df['simTime'].map(original_to_new_sim_time)
        # Keep simFrame consistent with the new simTime (simTime 0 -> frame 1).
        filtered_df['simFrame'] = (filtered_df['simTime'] * self.frame_rate + 1).round().astype(int)
        return filtered_df

    @staticmethod
    def _speed_mps2kmph(df):
        """Convert speedX/speedY/speedZ from m/s to km/h in place and return ``df``."""
        df['speedX'] = df['speedX'] * 3.6  # m/s to km/h
        df['speedY'] = df['speedY'] * 3.6  # m/s to km/h
        df['speedZ'] = df['speedZ'] * 3.6  # m/s to km/h
        return df

    def _merge_csv(self):
        """
        Merge the raw CSV files into ``merged_ObjState.csv`` inside ``data_path``.

        EgoState rows (tagged with ``EGO_PLAYER_ID``) and ObjState rows are
        combined. Then lane curvature (LaneInfo joined with RoadPos), road
        position, vehicle systems (lights, steering) and the nearest-in-time
        status-machine values (VehState) are joined on.

        Side effects:
            * Sets ``self.lane_info_df``, ``self.vehicle_sys_df`` and
              ``self.status_df``.
            * ``self.lane_info_df`` gets its 'curvHor' rounded to 3 decimals
              and 'id' renamed to 'laneId'.
            * ``self.status_df`` has its status columns mapped in place by
              ``_status_mapping``; ``_process`` later passes this mapped frame
              to ``_get_status_trigger_dict``.
            * Writes ``merged_ObjState.csv`` and prints a message.
        """
        # read csv files
        df_ego = pd.read_csv(os.path.join(self.data_path, 'EgoState.csv')).drop_duplicates().dropna(how='all')
        # vehicle driving info
        df_object = pd.read_csv(os.path.join(self.data_path, 'ObjState.csv')).drop_duplicates()
        # curvature and curvature-rate info
        df_laneinfo = pd.read_csv(os.path.join(self.data_path, 'LaneInfo.csv')).drop_duplicates()
        df_roadPos = pd.read_csv(os.path.join(self.data_path, 'RoadPos.csv')).drop_duplicates()
        # status machine
        df_status = pd.read_csv(os.path.join(self.data_path, 'VehState.csv'), index_col=False).drop_duplicates()
        # lamp info
        df_vehicleSys = pd.read_csv(os.path.join(self.data_path, 'VehicleSystems.csv')).drop_duplicates()

        self.lane_info_df = df_laneinfo
        self.vehicle_sys_df = df_vehicleSys
        self.status_df = df_status

        # invalid detect on the raw inputs, before they are modified below
        self._invalid_detect_before_merge()

        # EGO speeds are in km/h; OBJ speeds are in m/s and need conversion.
        df_object = self._speed_mps2kmph(df_object)

        # NOTE: aligning every input to the ego simTime (_data_time_align) is disabled.

        # Combine object and ego rows. drop_duplicates keeps the first occurrence,
        # so an ObjState row wins over an EgoState row with the same key.
        df_ego['playerId'] = self.EGO_PLAYER_ID
        combined_df = pd.concat([df_object, df_ego]).drop_duplicates(subset=['simTime', 'simFrame', 'playerId'])
        df_object = combined_df.sort_values(by=['simTime', 'simFrame', 'playerId']).copy()

        # Lane curvature per (simTime, simFrame, playerId), joined through laneId
        # (presumably the lane each player occupies — verify against RoadPos schema).
        df_laneinfo['curvHor'] = df_laneinfo['curvHor'].round(3)
        df_laneinfo.rename(columns={"id": 'laneId'}, inplace=True)
        result = pd.merge(df_roadPos, df_laneinfo, how='inner', on=["simTime", "simFrame", "playerId", "laneId"])
        df_laneinfo_new = result[["simTime", "simFrame", "playerId", "curvHor", "curvHorDot"]].copy().drop_duplicates()

        # status mapping (in place, so self.status_df is mapped as well)
        df_status = self._status_mapping(df_status)
        df_status = df_status[['simTime', 'ACC_status', 'Aeb_status', 'LKA_status', 'ICA_status', 'LDW_status',
                               'set_cruise_speed', 'set_headway_time']].copy()
        # merge_asof requires the right frame to be sorted on the key; a stable
        # sort leaves already-sorted input unchanged.
        df_status = df_status.sort_values(by='simTime', kind='mergesort')
        df_roadPos = df_roadPos[["simTime", "simFrame", "playerId", "laneOffset", "rollRel", "pitchRel"]].copy()

        # df merge; df_object is sorted by simTime, and left merges keep that order.
        df_vehicleSys = df_vehicleSys[['simTime', 'simFrame', 'lightMask', 'steering']].copy()
        merged_df = pd.merge(df_object, df_vehicleSys, on=["simTime", "simFrame"], how="left")
        merged_df1 = pd.merge(merged_df, df_laneinfo_new, on=["simTime", "simFrame", "playerId"], how="left")
        merged_df1 = pd.merge(merged_df1, df_roadPos, on=["simTime", "simFrame", "playerId"], how="left")
        merged_df2 = pd.merge_asof(merged_df1, df_status, on="simTime", direction='nearest')

        mg_df = merged_df2.drop_duplicates()
        mg_df = mg_df[mg_df.simFrame > 0].copy()
        mg_df.to_csv(os.path.join(self.data_path, 'merged_ObjState.csv'), index=False)
        print('The files are merged.')

    def _read_csv(self):
        """
        Read the CSV files of ``self.data_path`` into the dataframe attributes.

        ``self.object_df`` is read from ``merged_ObjState.csv`` (produced by
        ``_merge_csv``) and deduplicated on (simTime, simFrame, playerId); the
        others are read from their raw files with exact duplicates removed.

        Returns:
            No returns.
        """
        self.driver_ctrl_df = pd.read_csv(os.path.join(self.data_path, 'DriverCtrl.csv')).drop_duplicates()
        self.ego_df = pd.read_csv(os.path.join(self.data_path, 'EgoState.csv')).drop_duplicates()
        self.object_df = pd.read_csv(os.path.join(self.data_path, 'merged_ObjState.csv')).drop_duplicates(
            subset=['simTime', 'simFrame', 'playerId'])
        self.road_mark_df = pd.read_csv(os.path.join(self.data_path, 'RoadMark.csv')).drop_duplicates()
        self.road_pos_df = pd.read_csv(os.path.join(self.data_path, 'RoadPos.csv')).drop_duplicates()
        self.traffic_light_df = pd.read_csv(os.path.join(self.data_path, 'TrafficLight.csv')).drop_duplicates()
        self.traffic_signal_df = pd.read_csv(os.path.join(self.data_path, 'TrafficSign.csv')).drop_duplicates()

    def _invalid_detect_before_merge(self):
        """Check the raw inputs stored by ``_merge_csv`` for invalid columns."""
        self._invalid_column_detect(self.lane_info_df, 'LaneInfo.csv')
        self._invalid_column_detect(self.vehicle_sys_df, 'VehicleSystems.csv')
        self._invalid_column_detect(self.status_df, 'VehState.csv')

    def _invalid_detect(self):
        """Check the dataframes loaded by ``_read_csv`` for invalid columns."""
        self._invalid_column_detect(self.ego_df, 'EgoState.csv')
        self._invalid_column_detect(self.object_df, 'ObjState.csv')
        self._invalid_column_detect(self.driver_ctrl_df, 'DriverCtrl.csv')
        self._invalid_column_detect(self.road_mark_df, 'RoadMark.csv')
        self._invalid_column_detect(self.road_pos_df, 'RoadPos.csv')
        self._invalid_column_detect(self.traffic_light_df, 'TrafficLight.csv')
        self._invalid_column_detect(self.traffic_signal_df, 'TrafficSign.csv')

    def _fill_missing_columns(self):
        """Placeholder: no missing-column handling is implemented yet."""
        pass

    def _time_alignment(self):
        """
        Align every loaded dataframe to the ego simTime grid (currently unused).

        Requires ``self.frame_rate`` to be set (see ``_data_time_align``).
        """
        base_time = self.ego_df['simTime'].unique()
        self.driver_ctrl_df = self._data_time_align(base_time, self.driver_ctrl_df)
        self.ego_df = self._data_time_align(base_time, self.ego_df)
        self.object_df = self._data_time_align(base_time, self.object_df)
        self.road_mark_df = self._data_time_align(base_time, self.road_mark_df)
        self.road_pos_df = self._data_time_align(base_time, self.road_pos_df)
        self.traffic_light_df = self._data_time_align(base_time, self.traffic_light_df)
        self.traffic_signal_df = self._data_time_align(base_time, self.traffic_signal_df)
        print("The data is aligned.")
        # NOTE: per-frame interpolation (_csv_interpolate_by_frame) of these
        # dataframes is disabled.

    def _signal_mapping(self):
        """
        Placeholder for signal-name mapping (currently a no-op).

        The disabled implementation renamed the columns of each input dataframe
        with ``signal_name_map`` using a mapping loaded from ``./signal.json``.
        """
        pass

    def _get_car_info(self, df):
        """
        Return the ego vehicle dimensions.

        Args:
            df: Object dataframe. Currently unused: the dimensions are
                hard-coded placeholders rather than the ego's dimX/dimY/dimZ/offX.

        Returns:
            Dict with 'length', 'width', 'height' and 'offset'.
        """
        car_info = {
            "length": 4,
            "width": 2,
            "height": 2,
            "offset": 1
        }
        return car_info

    def _compact_data(self):
        """Keep only the ``CsvData.OBJECT_INFO`` columns of ``self.object_df``."""
        self.object_df = self.object_df[CsvData.OBJECT_INFO].copy()

    def _abnormal_detect(self):  # head and tail detect
        """
        Placeholder for head/tail abnormality detection (not implemented).

        Intended to check whether the 'time' column of the csv begins with 0.
        """
        pass

    def _unit_unified(self):
        """Placeholder for unit unification (not implemented)."""
        pass

    def _object_accel_get_from_egostate(self):
        """
        Overwrite accelX / accelY of ``self.object_df`` with the EgoState values.

        Rows are matched on (simTime, playerId). Rows of ``self.object_df``
        without a match in ``self.ego_df`` get NaN, as in a left join. If
        EgoState has several rows for one key, the first one is used.
        """
        key_cols = ['simTime', 'playerId']
        # One row per key, so the left merge returns exactly one row per object row.
        ego_accel = (self.ego_df[key_cols + ['accelX', 'accelY']]
                     .drop_duplicates(subset=key_cols)
                     .rename(columns={'accelX': 'accelX_ego', 'accelY': 'accelY_ego'}))
        merged = pd.merge(self.object_df[key_cols], ego_accel, on=key_cols, how='left')

        # A left merge keeps the left row order but returns a fresh RangeIndex;
        # assign positionally so the values do not depend on object_df's index labels.
        self.object_df['accelX'] = merged['accelX_ego'].to_numpy()
        self.object_df['accelY'] = merged['accelY_ego'].to_numpy()

    def _object_df_process(self):
        """
        Derive speed/acceleration columns and split the object data per player.

        Adds lat_v/lon_v/v (km/h), lat_acc/lon_acc/accel to ``self.object_df``
        (rows without 'type' are dropped). Then it stores a copy of each
        player's rows, with rate-of-change columns, in ``self.obj_data``.

        Returns:
            No returns.
        """
        data = self.object_df.copy()

        # calculate common parameters
        data['lat_v'] = data['speedY'] * 1
        data['lon_v'] = data['speedX'] * 1
        data['v'] = data.apply(lambda row: cal_velocity(row['lat_v'], row['lon_v']), axis=1)  # km/h

        # calculate acceleration components
        data['lat_acc'] = data['accelY'] * 1
        data['lon_acc'] = data['accelX'] * 1
        data['accel'] = data.apply(lambda row: cal_velocity(row['lat_acc'], row['lon_acc']), axis=1)

        data = data.dropna(subset=['type'])
        data.reset_index(drop=True, inplace=True)
        self.object_df = data.copy()

        # calculate respective parameters; copy each group so that adding columns
        # does not write into a slice of `data`.
        for obj_id, obj_data in data.groupby("playerId"):
            self.obj_data[obj_id] = self._add_rate_of_change_columns(obj_data.copy())

        # get object id list
        self.obj_id_list = list(self.obj_data.keys())
        self.ego_data = self.obj_data[self.EGO_PLAYER_ID]

    @staticmethod
    def _add_rate_of_change_columns(obj_data):
        """
        Add finite-difference rate-of-change columns to one player's rows.

        Adds time_diff, lat_acc_diff, lon_acc_diff, yawrate_diff, lat_acc_roc,
        lon_acc_roc and accelH (diff of speedH over time_diff). Infinite
        results (division by a zero time step) are clipped to +/-9999.
        ``obj_data`` is modified in place and returned.
        """
        obj_data['time_diff'] = obj_data['simTime'].diff()
        obj_data['lat_acc_diff'] = obj_data['lat_acc'].diff()
        obj_data['lon_acc_diff'] = obj_data['lon_acc'].diff()
        obj_data['yawrate_diff'] = obj_data['speedH'].diff()

        obj_data['lat_acc_roc'] = obj_data['lat_acc_diff'] / obj_data['time_diff']
        obj_data['lon_acc_roc'] = obj_data['lon_acc_diff'] / obj_data['time_diff']
        obj_data['accelH'] = obj_data['yawrate_diff'] / obj_data['time_diff']

        for column in ('lat_acc_roc', 'lon_acc_roc', 'accelH'):
            obj_data[column] = obj_data[column].replace([np.inf, -np.inf], [9999, -9999])
        return obj_data

    def _get_status_trigger_dict(self, status_df):
        """
        Compute the active-time statistics of ACC, LKA and ICA.

        Args:
            status_df: Status-machine dataframe.

        Returns:
            Dict with keys "ACC", "LKA", "ICA" holding each trigger's statistics.
        """
        # calculate ACC trigger time
        acc_trigger = ACCTrigger(status_df)
        acc_status_trigger = acc_trigger.ACC_active_time_statistics()

        # calculate LKA trigger time
        lka_trigger = LKATrigger(status_df)
        lka_status_trigger = lka_trigger.LKA_active_time_statistics()

        # calculate ICA trigger time
        ica_trigger = ICATrigger(status_df)
        ica_status_trigger = ica_trigger.ICA_active_time_statistics()

        return {
            "ACC": acc_status_trigger,
            "LKA": lka_status_trigger,
            "ICA": ica_status_trigger
        }

    def _mileage_cal(self, df1):
        """
        Calculate mileage of given df.

        If 'travelDist' carries no information (a single distinct value or all
        NaN), it is rebuilt by integrating 'v' (km/h) over 'simTime' with the
        trapezoidal rule.

        Args:
            df1: A dataframe of driving data. It is not modified.

        Returns:
            mileage: A float of the mileage (meter) of the driving data,
            rounded to 2 decimals.
        """
        df = df1.copy()

        if df['travelDist'].nunique() <= 1:
            df['time_diff'] = df['simTime'].diff()  # time step
            df['avg_speed'] = (df['v'] + df['v'].shift()) / 2  # mean speed per step
            # v is in km/h, so divide by 3.6 to get metres per step.
            df['distance_increment'] = df['avg_speed'] * df['time_diff'] / 3.6
            # accumulated distance
            df['travelDist'] = df['distance_increment'].cumsum()
            df['travelDist'] = df['travelDist'].fillna(0)

        mile_start = df['travelDist'].iloc[0]
        mile_end = df['travelDist'].iloc[-1]
        mileage = round(mile_end - mile_start, 2)
        return mileage

    def _duration_cal(self, df):
        """
        Calculate duration of given df.

        Args:
            df: A dataframe of driving data.

        Returns:
            duration: Last simTime minus first simTime (second).
        """
        time_start = df['simTime'].iloc[0]
        time_end = df['simTime'].iloc[-1]
        duration = time_end - time_start
        return duration

    def _get_report_info(self, df):
        """
        Get report information from dataframe.

        Args:
            df: A dataframe of driving data.

        Returns:
            report_info: A dict with "mileage" and "duration".
        """
        mileage = self._mileage_cal(df)
        duration = self._duration_cal(df)

        report_info = {
            "mileage": mileage,
            "duration": duration
        }
        return report_info

    def _status_mapping(self, df):
        """
        Map the raw ACC/Aeb/ICA/LKA/LDW status codes in place and return ``df``.

        Other status columns (Abpb, Awb, DOW, Eba, ELK, ESA, Fcw, ISLC, JA, NOA,
        RCW, TLC, FVSR, BSD, RCTA, FCTA, ISA, TSR, AVM, PDC, APA, MEB, RDA) are
        not mapped yet.
        """
        status_mappers = {
            'ACC_status': acc_status_mapping,
            'Aeb_status': aeb_status_mapping,
            'ICA_status': ica_status_mapping,
            'LKA_status': lka_status_mapping,
            'LDW_status': ldw_status_mapping,
        }
        for column, mapper in status_mappers.items():
            # Element-wise application of the mapping function.
            df[column] = df[column].apply(lambda x, f=mapper: f(x))
        return df

    def _get_driver_ctrl_data(self, df):
        """
        Process and get driver ctrl information: brake pedal, throttle pedal
        and steering wheel.

        Pedal columns whose maximum is below 1 are treated as fractions and
        rescaled to percent. The rescaling is written back into ``df``.

        Args:
            df: A dataframe of driver ctrl data.

        Returns:
            driver_ctrl_data: A dict of lists: time_list (simTime rounded to 2
            decimals), frame_list, brakePedal_list, throttlePedal_list and
            steeringWheel_list.
        """
        time_list = df['simTime'].round(2).values.tolist()
        frame_list = df['simFrame'].values.tolist()

        # NOTE(review): a fraction-scaled pedal that reaches exactly 1.0 is not rescaled.
        if df['brakePedal'].max() < 1:
            df['brakePedal'] = df['brakePedal'] * 100
        brakePedal_list = df['brakePedal'].values.tolist()

        if df['throttlePedal'].max() < 1:
            df['throttlePedal'] = df['throttlePedal'] * 100
        throttlePedal_list = df['throttlePedal'].values.tolist()

        steeringWheel_list = df['steeringWheel'].values.tolist()

        driver_ctrl_data = {
            "time_list": time_list,
            "frame_list": frame_list,
            "brakePedal_list": brakePedal_list,
            "throttlePedal_list": throttlePedal_list,
            "steeringWheel_list": steeringWheel_list
        }
        return driver_ctrl_data


class StatusTime(object):
    """
    Collect, per status-machine column, the simTime values where it is non-zero.

    ``status_time_dict`` maps every column except 'simTime'/'simFrame' to the
    list of simTime values where that column != 0 (NaN counts as non-zero).
    Its key 'no_status' holds the simTime values where all those columns == 0.

    Usage: select the rows of a dataframe whose simTime is in one of the lists:
        filtered_df = df[df['simTime'].isin(self.status_time_dict['ACC_status'])]
    """

    def __init__(self, status_df):
        self.status_df = status_df
        self.status_list = []  # status column names
        self.no_status_time_list = []  # simTime values where every status == 0
        self.status_time_dict = {}
        self._run()

    def _get_status_list(self):
        """Store every column name except 'simTime' and 'simFrame'."""
        status_columns_list = self.status_df.columns.tolist()
        self.status_list = [x for x in status_columns_list if x not in ['simTime', 'simFrame']]

    def _get_no_status_time_list(self):
        """Store the simTime values of the rows where all status columns equal 0."""
        filtered_rows = self.status_df[self.status_df[self.status_list].eq(0).all(axis=1)]
        self.no_status_time_list = filtered_rows['simTime'].tolist()

    def _get_status_time_dict(self):
        """Fill ``status_time_dict`` per status column plus the 'no_status' entry."""
        for status in self.status_list:
            status_time = self.status_df[self.status_df[status] != 0]['simTime'].values.tolist()
            self.status_time_dict[status] = status_time
        self.status_time_dict['no_status'] = self.no_status_time_list

    def _run(self):
        self._get_status_list()
        self._get_no_status_time_list()
        self._get_status_time_dict()