#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: yangzihao(yangzihao@china-icv.cn) @Data: 2023/11/27 @Last Modified: 2023/11/27 @Summary: Csv data process functions """ import os import numpy as np import pandas as pd from common import cal_velocity from data_info import CsvData import matplotlib.pyplot as plt class DataProcess(object): """ The data process class. It is a template to get evaluation raw data and process the raw data. Attributes: """ def __init__(self, data_path, config): self.data_path = data_path self.casePath = data_path # config info self.config = config # self.safe_config = config.config['safe'] # self.function_config = config.config['function'] # self.compliance_config = config.config['compliance'] self.comfort_config = config.config['comfort'] self.efficient_config = config.config['efficient'] # data process self.ego_df = pd.DataFrame() self.object_df = pd.DataFrame() # self.driver_ctrl_df = pd.DataFrame() # self.road_mark_df = pd.DataFrame() # self.road_pos_df = pd.DataFrame() # self.traffic_light_df = pd.DataFrame() # self.traffic_signal_df = pd.DataFrame() # self.status_df = pd.DataFrame() self.obj_data = {} self.ego_data = {} self.obj_id_list = {} self.car_info = {} self.report_info = {} self.driver_ctrl_data = {} self._process() def _process(self): # self._merge_csv() self._read_csv() self._draw_track() # self._signal_mapping() # self.car_info = self._get_car_info(self.object_df) # self._compact_data() # self._abnormal_detect() # self._status_map(self.object_df) self._object_df_process() self.report_info = self._get_report_info(self.obj_data[1]) self.driver_ctrl_data = self._get_driver_ctrl_data(self.ego_df) def _read_csv(self): """ Read csv files to dataframe. Args: data_path: A str of the path of csv files Returns: No returns. """ # self.object_df = pd.read_csv(os.path.join(self.data_path, 'merged_ObjState.csv')) # self.object_df = pd.read_csv(os.path.join(self.data_path, 'ObjState.csv')) self.ego_df = pd.read_csv(os.path.join(self.data_path, 'EgoState_pji.csv')) self.ego_df['playerId'] = 1 # self.driver_ctrl_df = pd.read_csv(os.path.join(self.data_path, 'DriverCtrl.csv')) # self.road_mark_df = pd.read_csv(os.path.join(self.data_path, 'RoadMark.csv')) # self.road_pos_df = pd.read_csv(os.path.join(self.data_path, 'RoadPos.csv')) # self.traffic_light_df = pd.read_csv(os.path.join(self.data_path, 'TrafficLight.csv')) # self.traffic_signal_df = pd.read_csv(os.path.join(self.data_path, 'TrafficSign.csv')) # self.lane_info_df = pd.read_csv(os.path.join(self.data_path, 'LaneInfo.csv')).drop_duplicates() # self.status_df = pd.read_csv(os.path.join(self.data_path, 'VehicleState.csv')) # self.vehicle_sys_df = pd.read_csv( # os.path.join(self.data_path, 'VehicleSystems.csv')).drop_duplicates() # 车灯信息 def _draw_track(self): """ """ df = self.ego_df.copy() plt.scatter(df['posX'], df['posY'], c=df['simTime'], s=0.1) plt.axis('equal') # 添加坐标轴标签和标题 plt.xlabel('posX') plt.ylabel('posY') plt.title('Trajectory') # 显示图形 plt.savefig(os.path.join(self.casePath, "./track.png")) plt.close() def _signal_mapping(self): pass # singal mapping # signal_json = r'./signal.json' # signal_dict = json2dict(signal_json) # df_objectstate = signal_name_map(df_objectstate, signal_dict, 'objectState') # df_roadmark = signal_name_map(df_roadmark, signal_dict, 'roadMark') # df_roadpos = signal_name_map(df_roadpos, signal_dict, 'roadPos') # df_trafficlight = signal_name_map(df_trafficlight, signal_dict, 'trafficLight') # df_trafficsignal = signal_name_map(df_trafficsignal, signal_dict, 'trafficSignal') # df_drivectrl = signal_name_map(df_drivectrl, signal_dict, 'driverCtrl') # df_laneinfo = signal_name_map(df_laneinfo, signal_dict, 'laneInfo') # df_status = signal_name_map(df_status, signal_dict, 'statusMachine') # df_vehiclesys = signal_name_map(df_vehiclesys, signal_dict, 'vehicleSys') def _get_car_info(self, df): """ Args: df: Returns: """ first_row = df[df['playerId'] == 1].iloc[0].to_dict() length = first_row['dimX'] width = first_row['dimY'] height = first_row['dimZ'] offset = first_row['offX'] car_info = { "length": length, "width": width, "height": height, "offset": offset } return car_info def _compact_data(self): """ Extra necessary data from dataframes. Returns: """ self.object_df = self.object_df[CsvData.OBJECT_INFO].copy() def _abnormal_detect(self): # head and tail detect """ Detect the head of the csv whether begin with 0 or not. Returns: A dataframe, which 'time' column begin with 0. """ pass def _mileage_cal(self, df): """ Calculate mileage of given df. Args: df: A dataframe of driving data. Returns: mileage: A float of the mileage(meter) of the driving data. """ travelDist = df['traveledDist'].values.tolist() travelDist = [x for x in travelDist if not np.isnan(x)] # mile_start = df['travelDist'].iloc[0] mile_start = travelDist[0] mile_end = travelDist[-1] mileage = mile_end - mile_start return mileage def _duration_cal(self, df): """ Calculate duration of given df. Args: df: A dataframe of driving data. Returns: duration: A float of the duration(second) of the driving data. """ time_start = df['simTime'].iloc[0] time_end = df['simTime'].iloc[-1] duration = time_end - time_start return duration def _get_report_info(self, df): """ Get report infomation from dataframe. Args: df: A dataframe of driving data. Returns: report_info: A dict of report infomation. """ mileage = self._mileage_cal(df) duration = self._duration_cal(df) report_info = { "mileage": mileage, "duration": duration } return report_info # def _status_mapping(self, df): # df['ACC_status'] = df['ACC_status'].apply(lambda x: acc_status_mapping(x)) # df['LKA_status'] = df['LKA_status'].apply(lambda x: lka_status_mapping(x)) # df['LDW_status'] = df['LDW_status'].apply(lambda x: ldw_status_mapping(x)) def _object_df_process(self): """ Process the data of object dataframe. Save the data groupby object_ID. Returns: No returns. """ data = self.ego_df.copy() data.rename( columns={"speedY": "lat_v", "speedX": "lon_v", "accelY": "lat_acc", "accelX": "lon_acc", "dimZ": "speedH"}, inplace=True) # calculate common parameters # data['lat_v'] = data['speedX'] * np.sin(data['posH']) * -1 + data['speedY'] * np.cos(data['posH']) # data['lon_v'] = data['speedX'] * np.cos(data['posH']) + data['speedY'] * np.sin(data['posH']) data['v'] = data.apply(lambda row: cal_velocity(row['lat_v'], row['lon_v']), axis=1) data['time_diff'] = data['simTime'].diff() data['avg_speed'] = (data['v'] + data['v'].shift()) / 2 # 计算每个时间间隔的平均速度 data['distance_increment'] = data['avg_speed'] * data['time_diff'] # 计算每个时间间隔的距离增量 # 计算当前里程 data['traveledDist'] = data['distance_increment'].cumsum() data['traveledDist'] = data['traveledDist'].fillna(0) # calculate acceleraton components # data['lat_acc'] = data['accelX'] * np.sin(data['posH']) * -1 + data['accelY'] * np.cos(data['posH']) # data['lon_acc'] = data['accelX'] * np.cos(data['posH']) + data['accelY'] * np.sin(data['posH']) data['accel'] = data.apply(lambda row: cal_velocity(row['lat_acc'], row['lon_acc']), axis=1) # data.rename(columns={"yawrate_roc": "accelH"}, inplace=True) self.object_df = data.copy() # calculate respective parameters for obj_id, obj_data in data.groupby("playerId"): self.obj_data[obj_id] = obj_data self.obj_data[obj_id]['lat_acc_diff'] = self.obj_data[obj_id]['lat_acc'].diff() self.obj_data[obj_id]['lon_acc_diff'] = self.obj_data[obj_id]['lon_acc'].diff() self.obj_data[obj_id]['speedH_diff'] = self.obj_data[obj_id]['speedH'].diff() self.obj_data[obj_id]['time_diff'] = self.obj_data[obj_id]['simTime'].diff() # self.obj_data['avg_speed'] = (self.obj_data['v'] + self.obj_data['v'].shift()) / 2 # 计算每个时间间隔的平均速度 # self.obj_data['distance_increment'] = self.obj_data['avg_speed'] * self.obj_data['time_diff'] / 3.6 # 计算每个时间间隔的距离增量 # # # 计算当前里程 # self.obj_data['travelDist'] = self.obj_data['distance_increment'].cumsum() # self.obj_data['travelDist'] = self.obj_data['travelDist'].fillna(0) self.obj_data[obj_id]['lat_acc_roc'] = self.obj_data[obj_id]['lat_acc_diff'] / self.obj_data[obj_id][ 'time_diff'] self.obj_data[obj_id]['lon_acc_roc'] = self.obj_data[obj_id]['lon_acc_diff'] / self.obj_data[obj_id][ 'time_diff'] self.obj_data[obj_id]['accelH'] = self.obj_data[obj_id]['speedH_diff'] / self.obj_data[obj_id][ 'time_diff'] # get object id list self.obj_id_list = list(self.obj_data.keys()) self.ego_data = self.obj_data[1] def _get_driver_ctrl_data(self, df): """ Process and get drive ctrl information. Such as brake pedal, throttle pedal and steering wheel. Args: df: A dataframe of driver ctrl data. Returns: driver_ctrl_data: A dict of driver ctrl info. """ time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() # df['brakePedal'] = df['brakePedal'] * 100 # brakePedal_list = df['brakePedal'].values.tolist() # df['throttlePedal'] = df['throttlePedal'] * 100 # throttlePedal_list = df['throttlePedal'].values.tolist() # steeringWheel_list = df['steeringWheel'].values.tolist() driver_ctrl_data = { "time_list": time_list, "frame_list": frame_list, # "brakePedal_list": brakePedal_list, # "throttlePedal_list": throttlePedal_list, # "steeringWheel_list": steeringWheel_list } return driver_ctrl_data