|
- """
- @Authors: yangzihao(yangzihao@china-icv.cn)
- @Date: 2023/11/27
- @Last Modified: 2023/11/27
- @Summary: Csv data process functions
- """
- import os
- import numpy as np
- import pandas as pd
- from common import cal_velocity
- from data_info import CsvData
- import matplotlib.pyplot as plt
- import warnings
- warnings.filterwarnings("ignore", category=FutureWarning, module='pandas')
- pd.set_option('future.no_silent_downcasting', True)
class DataProcess:
    """
    Load the ego-vehicle CSV of one case and derive the raw evaluation data.

    Constructing an instance immediately runs the whole pipeline (see
    ``_process``): the CSV is read, a trajectory image is written next to it,
    per-object derived signals are computed and the report / driver-control
    summaries are filled in.

    Attributes:
        data_path: Directory that contains 'EgoState_pji.csv'.
        casePath: Same directory as ``data_path``; 'track.png' is saved here.
        config: The configuration object handed in by the caller.
        comfort_config: ``config.config['comfort']``.
        efficient_config: ``config.config['efficient']``.
        ego_df: Dataframe read from 'EgoState_pji.csv' plus a 'playerId' column.
        object_df: Renamed ego dataframe with the added 'v' and 'accel' columns.
        obj_data: Dict mapping playerId -> dataframe with derived columns.
        ego_data: The entry of ``obj_data`` for ``EGO_PLAYER_ID``.
        obj_id_list: List of the keys of ``obj_data``.
        car_info: Dict, initialised empty (not filled by the pipeline here).
        report_info: Dict with 'mileage' and 'duration'.
        driver_ctrl_data: Dict with 'time_list' and 'frame_list'.
    """

    # playerId written onto every ego row and used to look the ego data up.
    EGO_PLAYER_ID = 1

    def __init__(self, data_path, config):
        """
        Store the inputs, initialise the result containers and run the pipeline.

        Args:
            data_path: A str of the directory holding the case csv files.
            config: An object whose ``config`` mapping has the keys
                'comfort' and 'efficient'.
        """
        self.data_path = data_path
        self.casePath = data_path

        self.config = config
        self.comfort_config = config.config['comfort']
        self.efficient_config = config.config['efficient']

        self.ego_df = pd.DataFrame()
        self.object_df = pd.DataFrame()

        self.obj_data = {}
        self.ego_data = {}
        # Holds a list once _object_df_process has run, so start with a list.
        self.obj_id_list = []
        self.car_info = {}
        self.report_info = {}
        self.driver_ctrl_data = {}
        self._process()

    def _process(self):
        """
        Run the processing steps in order.

        Returns:
            No returns.
        """
        self._read_csv()
        self._draw_track()
        self._object_df_process()
        self.report_info = self._get_report_info(self.obj_data[self.EGO_PLAYER_ID])
        self.driver_ctrl_data = self._get_driver_ctrl_data(self.ego_df)

    def _read_csv(self):
        """
        Read 'EgoState_pji.csv' from ``self.data_path`` into ``self.ego_df``.

        Every row is tagged with ``playerId == EGO_PLAYER_ID``.

        Returns:
            No returns.
        """
        self.ego_df = pd.read_csv(os.path.join(self.data_path, 'EgoState_pji.csv'))
        self.ego_df['playerId'] = self.EGO_PLAYER_ID

    def _draw_track(self):
        """
        Save a scatter plot of the ego trajectory as 'track.png' in ``casePath``.

        Points are placed at (posX, posY) and coloured by simTime.

        Returns:
            No returns.
        """
        track = self.ego_df
        # Use a dedicated figure so nothing is drawn onto (or closes) a figure
        # that some other code left active, and always release it afterwards.
        fig, ax = plt.subplots()
        try:
            ax.scatter(track['posX'], track['posY'], c=track['simTime'], s=0.1)
            ax.axis('equal')
            ax.set_xlabel('posX')
            ax.set_ylabel('posY')
            ax.set_title('Trajectory')
            fig.savefig(os.path.join(self.casePath, "./track.png"))
        finally:
            plt.close(fig)

    def _signal_mapping(self):
        """Placeholder; currently does nothing."""
        pass

    def _get_car_info(self, df):
        """
        Read the vehicle dimensions from the first ego row of ``df``.

        Args:
            df: A dataframe with the columns 'playerId', 'dimX', 'dimY',
                'dimZ' and 'offX'.

        Returns:
            car_info: A dict with 'length' (dimX), 'width' (dimY),
                'height' (dimZ) and 'offset' (offX).

        Raises:
            IndexError: If ``df`` has no row with playerId == EGO_PLAYER_ID.
        """
        first_row = df[df['playerId'] == self.EGO_PLAYER_ID].iloc[0].to_dict()
        return {
            "length": first_row['dimX'],
            "width": first_row['dimY'],
            "height": first_row['dimZ'],
            "offset": first_row['offX'],
        }

    def _compact_data(self):
        """
        Reduce ``self.object_df`` to the columns listed in CsvData.OBJECT_INFO.

        Returns:
            No returns.
        """
        self.object_df = self.object_df[CsvData.OBJECT_INFO].copy()

    def _abnormal_detect(self):
        """
        Placeholder for abnormal-data detection; currently does nothing.

        Returns:
            No returns.
        """
        pass

    def _mileage_cal(self, df):
        """
        Calculate mileage of given df.

        Args:
            df: A dataframe of driving data with a 'traveledDist' column.

        Returns:
            mileage: Last minus first non-NaN 'traveledDist' value, or 0.0
                when the column holds no valid value.
        """
        travel_dist = df['traveledDist'].dropna().tolist()
        if not travel_dist:
            # Nothing to measure (empty frame or only NaN samples).
            return 0.0
        return travel_dist[-1] - travel_dist[0]

    def _duration_cal(self, df):
        """
        Calculate duration of given df.

        Args:
            df: A dataframe of driving data with a 'simTime' column.

        Returns:
            duration: Last minus first 'simTime' value, or 0.0 for an
                empty dataframe.
        """
        sim_time = df['simTime']
        if sim_time.empty:
            return 0.0
        return sim_time.iloc[-1] - sim_time.iloc[0]

    def _get_report_info(self, df):
        """
        Get report information from dataframe.

        Args:
            df: A dataframe of driving data.

        Returns:
            report_info: A dict with the keys 'mileage' and 'duration'.
        """
        return {
            "mileage": self._mileage_cal(df),
            "duration": self._duration_cal(df),
        }

    def _object_df_process(self):
        """
        Derive per-object signals from the ego dataframe, grouped by playerId.

        Fills ``self.object_df``, ``self.obj_data``, ``self.obj_id_list`` and
        ``self.ego_data``. Each per-object frame gains the columns
        'lat_acc_diff', 'lon_acc_diff', 'speedH_diff', 'time_diff',
        'lat_acc_roc', 'lon_acc_roc' and 'accelH'.

        Returns:
            No returns.
        """
        data = self.ego_df.copy()
        # NOTE(review): 'dimZ' is renamed to 'speedH' and then differentiated
        # below as if it were a speed signal - confirm this mapping is intended.
        data.rename(
            columns={"speedY": "lat_v", "speedX": "lon_v", "accelY": "lat_acc", "accelX": "lon_acc", "dimZ": "speedH"},
            inplace=True)

        # cal_velocity is a project helper; it is applied row by row to the
        # lateral / longitudinal component pair.
        data['v'] = data.apply(lambda row: cal_velocity(row['lat_v'], row['lon_v']), axis=1)
        data['accel'] = data.apply(lambda row: cal_velocity(row['lat_acc'], row['lon_acc']), axis=1)

        self.object_df = data.copy()

        for obj_id, group in data.groupby("playerId"):
            # Work on an explicit copy: writing new columns onto the frame
            # yielded by groupby is chained assignment on a derived frame.
            obj = group.copy()
            obj['lat_acc_diff'] = obj['lat_acc'].diff()
            obj['lon_acc_diff'] = obj['lon_acc'].diff()
            obj['speedH_diff'] = obj['speedH'].diff()
            obj['time_diff'] = obj['simTime'].diff()
            # Rates of change: sample-to-sample difference over elapsed time.
            obj['lat_acc_roc'] = obj['lat_acc_diff'] / obj['time_diff']
            obj['lon_acc_roc'] = obj['lon_acc_diff'] / obj['time_diff']
            obj['accelH'] = obj['speedH_diff'] / obj['time_diff']
            self.obj_data[obj_id] = obj

        self.obj_id_list = list(self.obj_data.keys())
        self.ego_data = self.obj_data[self.EGO_PLAYER_ID]

    def _get_driver_ctrl_data(self, df):
        """
        Collect the time base of the driver control data.

        Args:
            df: A dataframe with the columns 'simTime' and 'simFrame'.

        Returns:
            driver_ctrl_data: A dict with 'time_list' and 'frame_list'.
        """
        return {
            "time_list": df['simTime'].values.tolist(),
            "frame_list": df['simFrame'].values.tolist(),
        }
|