#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: yangzihao(yangzihao@china-icv.cn) @Data: 2023/11/27 @Last Modified: 2023/11/27 @Summary: Csv data process functions """ import os import numpy as np import pandas as pd from common import cal_velocity from data_info import CsvData import matplotlib.pyplot as plt class DataProcess(object): """ The data process class. It is a template to get evaluation raw data and process the raw data. Attributes: """ def __init__(self, data_path, config): self.data_path = data_path self.casePath = data_path # config info self.config = config self.safe_config = config.config['safe'] self.comfort_config = config.config['comfort'] self.accurate_config = config.config['accurate'] # data process self.ego_df = pd.DataFrame() self.object_df = pd.DataFrame() self.trajectory_df = pd.DataFrame() # self.obj_data = {} # self.ego_data = {} self.obj_id_list = {} self.car_info = {} self.report_info = {} self.driver_ctrl_data = {} self._process() def _process(self): # self._merge_csv() self._read_csv() self._columns_process() self._draw_track() # self._signal_mapping() # self.car_info = self._get_car_info(self.object_df) # self._compact_data() # self._abnormal_detect() # self._status_map(self.object_df) self._ego_df_process() self._object_df_process() self.report_info = self._get_report_info(self.ego_df) self.driver_ctrl_data = self._get_driver_ctrl_data(self.ego_df) def _read_csv(self): """ Read csv files to dataframe. """ # self.object_df = pd.read_csv(os.path.join(self.data_path, 'merged_ObjState.csv')) self.ego_df = pd.read_csv(os.path.join(self.data_path, 'EgoState_pji.csv')) self.object_df = pd.read_csv(os.path.join(self.data_path, 'objects.csv')) self.trajectory_df = pd.read_csv(os.path.join(self.data_path, 'trajectory_pji.csv')) # self.driver_ctrl_df = pd.read_csv(os.path.join(self.data_path, 'DriverCtrl.csv')) # self.road_mark_df = pd.read_csv(os.path.join(self.data_path, 'RoadMark.csv')) # self.road_pos_df = pd.read_csv(os.path.join(self.data_path, 'RoadPos.csv')) # self.traffic_light_df = pd.read_csv(os.path.join(self.data_path, 'TrafficLight.csv')) # self.traffic_signal_df = pd.read_csv(os.path.join(self.data_path, 'TrafficSign.csv')) # self.lane_info_df = pd.read_csv(os.path.join(self.data_path, 'LaneInfo.csv')).drop_duplicates() # self.status_df = pd.read_csv(os.path.join(self.data_path, 'VehicleState.csv')) # self.vehicle_sys_df = pd.read_csv( # os.path.join(self.data_path, 'VehicleSystems.csv')).drop_duplicates() # 车灯信息 def _columns_process(self): # ego data process self.ego_df.rename(columns={"speed_linear": "lon_v", "speed_angular": "speedH", "cmd_speed_linear": "cmd_lon_v", "cmd_speed_angular": "cmd_speedH", }, inplace=True) self.ego_df['playerId'] = 1 self.ego_df['lat_v'] = 0 self.ego_df['lat_acc'] = 0 # self.ego_df['lon_acc'] = 0 # object data process self.object_df.rename(columns={"simtime": "simTime", "FrameID": "simFrame", "id": "playerId", "poseX": "posX", "poseY": "posY", "poseZ": "posZ", "velocity_linear_x": "lon_v", "velocity_linear_y": "lat_v", "velocity_angular_x": "speedH", "acceleration_linear_x": "lon_acc", "acceleration_linear_y": "lat_acc", "acceleration_angular_x": "accelH"}, inplace=True) # Time, simtime, FrameID, id, HeadingAngle, poseX, poseY, poseZ, label, dimX, dimY, dimZ, # velocity_linear_x, velocity_linear_y, velocity_linear_z, velocity_angular_x, velocity_angular_y, velocity_angular_z, # acceleration_linear_x, acceleration_linear_y, acceleration_linear_z, acceleration_angular_x, acceleration_angular_y, acceleration_angular_z def _draw_track(self): """ """ df = self.ego_df.copy() plt.scatter(df['posX'], df['posY'], c=df['simTime'], s=0.1) plt.axis('equal') # 添加坐标轴标签和标题 plt.xlabel('posX') plt.ylabel('posY') plt.title('Trajectory') # 显示图形 plt.savefig(os.path.join(self.casePath, "./track.png")) plt.close() def _get_car_info(self, df): """ Args: df: Returns: """ first_row = df[df['playerId'] == 1].iloc[0].to_dict() length = first_row['dimX'] width = first_row['dimY'] height = first_row['dimZ'] offset = first_row['offX'] car_info = { "length": length, "width": width, "height": height, "offset": offset } return car_info def _mileage_cal(self, df): """ Calculate mileage of given df. Args: df: A dataframe of driving data. Returns: mileage: A float of the mileage(meter) of the driving data. """ travelDist = df['traveledDist'].values.tolist() travelDist = [x for x in travelDist if not np.isnan(x)] # mile_start = df['travelDist'].iloc[0] mile_start = travelDist[0] mile_end = travelDist[-1] mileage = mile_end - mile_start return mileage def _duration_cal(self, df): """ Calculate duration of given df. Args: df: A dataframe of driving data. Returns: duration: A float of the duration(second) of the driving data. """ time_start = df['simTime'].iloc[0] time_end = df['simTime'].iloc[-1] duration = time_end - time_start return duration def _get_report_info(self, df): """ Get report infomation from dataframe. Args: df: A dataframe of driving data. Returns: report_info: A dict of report infomation. """ mileage = self._mileage_cal(df) duration = self._duration_cal(df) report_info = { "mileage": mileage, "duration": duration } return report_info def _ego_df_process(self): # ego_df process self.ego_df['time_diff'] = self.ego_df['simTime'].diff() self.ego_df['lon_v_diff'] = self.ego_df['lon_v'].diff() self.ego_df['lon_acc'] = self.ego_df['lon_v_diff'] / self.ego_df['time_diff'] data = self.ego_df.copy() data.rename( columns={"speedY": "lat_v", "speedX": "lon_v", "accelY": "lat_acc", "accelX": "lon_acc", "dimZ": "speedH"}, inplace=True) # calculate common parameters # data['lat_v'] = data['speedX'] * np.sin(data['posH']) * -1 + data['speedY'] * np.cos(data['posH']) # data['lon_v'] = data['speedX'] * np.cos(data['posH']) + data['speedY'] * np.sin(data['posH']) data['v'] = data.apply(lambda row: cal_velocity(row['lat_v'], row['lon_v']), axis=1) data['time_diff'] = data['simTime'].diff() data['avg_speed'] = (data['v'] + data['v'].shift()) / 2 # 计算每个时间间隔的平均速度 data['distance_increment'] = data['avg_speed'] * data['time_diff'] # 计算每个时间间隔的距离增量 # 计算当前里程 data['traveledDist'] = data['distance_increment'].cumsum() # .cumsum()返回累计和列表 data['traveledDist'] = data['traveledDist'].fillna(0) # calculate acceleraton components # data['lat_acc'] = data['accelX'] * np.sin(data['posH']) * -1 + data['accelY'] * np.cos(data['posH']) # data['lon_acc'] = data['accelX'] * np.cos(data['posH']) + data['accelY'] * np.sin(data['posH']) # data['accel'] = data.apply(lambda row: cal_velocity(row['lat_acc'], row['lon_acc']), axis=1) data['accel'] = data['lon_acc'] data['lat_acc_diff'] = data['lat_acc'].diff() data['lon_acc_diff'] = data['lon_acc'].diff() data['speedH_diff'] = data['speedH'].diff() data['time_diff'] = data['simTime'].diff() # self.obj_data['avg_speed'] = (self.obj_data['v'] + self.obj_data['v'].shift()) / 2 # 计算每个时间间隔的平均速度 # self.obj_data['distance_increment'] = self.obj_data['avg_speed'] * self.obj_data['time_diff'] / 3.6 # 计算每个时间间隔的距离增量 # # # 计算当前里程 # self.obj_data['travelDist'] = self.obj_data['distance_increment'].cumsum() # self.obj_data['travelDist'] = self.obj_data['travelDist'].fillna(0) data['lat_acc_roc'] = data['lat_acc_diff'] / data['time_diff'] data['lon_acc_roc'] = data['lon_acc_diff'] / data['time_diff'] data['accelH'] = data['speedH_diff'] / data['time_diff'] self.ego_df = data.copy() def _object_df_process(self): """ Process the data of object dataframe. Save the data groupby object_ID. Returns: No returns. """ self.obj_id_list = list(set(self.object_df.playerId)) # self.object_df = pd.merge(self.object_df, self.ego_df, how='left') # calculate respective parameters # for obj_id, obj_data in self.object_df.groupby("playerId"): # self.obj_data[obj_id] = obj_data # self.obj_data[obj_id]['lat_acc_diff'] = self.obj_data[obj_id]['lat_acc'].diff() # self.obj_data[obj_id]['lon_acc_diff'] = self.obj_data[obj_id]['lon_acc'].diff() # self.obj_data[obj_id]['speedH_diff'] = self.obj_data[obj_id]['speedH'].diff() # # self.obj_data[obj_id]['time_diff'] = self.obj_data[obj_id]['simTime'].diff() # # self.obj_data['avg_speed'] = (self.obj_data['v'] + self.obj_data['v'].shift()) / 2 # 计算每个时间间隔的平均速度 # # self.obj_data['distance_increment'] = self.obj_data['avg_speed'] * self.obj_data['time_diff'] / 3.6 # 计算每个时间间隔的距离增量 # # # # # 计算当前里程 # # self.obj_data['travelDist'] = self.obj_data['distance_increment'].cumsum() # # self.obj_data['travelDist'] = self.obj_data['travelDist'].fillna(0) # # self.obj_data[obj_id]['lat_acc_roc'] = self.obj_data[obj_id]['lat_acc_diff'] / self.obj_data[obj_id][ # 'time_diff'] # self.obj_data[obj_id]['lon_acc_roc'] = self.obj_data[obj_id]['lon_acc_diff'] / self.obj_data[obj_id][ # 'time_diff'] # self.obj_data[obj_id]['accelH'] = self.obj_data[obj_id]['speedH_diff'] / self.obj_data[obj_id][ # 'time_diff'] # get object id list # self.obj_id_list = list(self.obj_data.keys()) # self.ego_data = self.obj_data[1] def _get_driver_ctrl_data(self, df): """ Process and get drive ctrl information. Such as brake pedal, throttle pedal and steering wheel. Args: df: A dataframe of driver ctrl data. Returns: driver_ctrl_data: A dict of driver ctrl info. """ time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() # df['brakePedal'] = df['brakePedal'] * 100 # brakePedal_list = df['brakePedal'].values.tolist() # df['throttlePedal'] = df['throttlePedal'] * 100 # throttlePedal_list = df['throttlePedal'].values.tolist() # steeringWheel_list = df['steeringWheel'].values.tolist() driver_ctrl_data = { "time_list": time_list, "frame_list": frame_list, # "brakePedal_list": brakePedal_list, # "throttlePedal_list": throttlePedal_list, # "steeringWheel_list": steeringWheel_list } return driver_ctrl_data