#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2024 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2024/10/17 @Last Modified: 2024/10/17 @Summary: Evaluation functions """ import os import sys import yaml import traceback import numpy as np import pandas as pd from collections import Counter from pathlib import Path root_path = Path(__file__).resolve().parent.parent sys.path.append(str(root_path)) from models.common import log from config import config log_path = config.LOG_PATH # 后边改成python包传递路径 logger = log.get_logger(log_path) class DataQuality(object): def __init__(self, df=None): self.df = df self.frame_list = [] self.frame_diff_list = [] self.frame_diff_counter = {} self.total_frame_count = 0 self.max_frame_number = 0 self.frame_loss_count = 0 self.frame_loss_rate = 0 self.frame_loss_max = 0 self.frame_loss_max_start = [] self.result = "" def quality_detect(self): self.frame_extract() if self.frame_list: self.cal_total_frame_count() self.cal_max_frame_number() self.cal_frame_diff() self.cal_frame_loss_count() self.cal_frame_loss_rate() self.cal_frame_loss_max() self.cal_frame_diff_counter() self.result_print() else: self.result = "No data in this file." print("No data in this file.") def frame_extract(self): self.df = self.df.dropna(subset=["simFrame"]) self.frame_list = sorted(self.df["simFrame"].unique()) def cal_frame_diff(self): maxx = -1 for i in range(1, len(self.frame_list)): diff = self.frame_list[i] - self.frame_list[i - 1] - 1 if diff > maxx and diff != 0: maxx = diff self.frame_loss_max_start = [self.frame_list[i - 1], self.frame_list[i]] self.frame_diff_list.append(int(diff)) def cal_total_frame_count(self): self.total_frame_count = len(self.frame_list) - 1 if self.frame_list else 0 def cal_max_frame_number(self): self.max_frame_number = self.frame_list[-1] if self.frame_list else 0 def cal_frame_loss_count(self): self.frame_loss_count = sum(self.frame_diff_list) def cal_frame_loss_rate(self): if self.total_frame_count + 1 > 0: self.frame_loss_rate = round( self.frame_loss_count / (self.total_frame_count + 1), 4 ) else: self.frame_loss_rate = 0 def cal_frame_loss_max(self): self.frame_loss_max = max(self.frame_diff_list) if self.frame_diff_list else 0 def cal_frame_diff_counter(self): self.frame_diff_counter = dict(Counter(self.frame_diff_list)) sorted_items = sorted( self.frame_diff_counter.items(), key=lambda x: x[1], reverse=True ) self.frame_diff_counter = dict(sorted_items[1:]) # 跳过不丢帧次数统计 def result_print(self): self.result += f"丢帧率: {self.frame_loss_rate * 100:.2f}%, " self.result += f"总帧数: {self.total_frame_count + 1}, " self.result += f"丢帧数量: {self.frame_loss_count}, " self.result += f"最长丢帧数量: {self.frame_loss_max}, " self.result += f"最长丢帧时起始帧: {self.frame_loss_max_start}, " self.result += f"丢帧数及次数统计: {self.frame_diff_counter}." print("此文件总帧数:", self.total_frame_count + 1) print("此文件最大帧数:", self.max_frame_number) print("此文件丢帧数量:", self.frame_loss_count) print("此文件丢帧率:", f"{self.frame_loss_rate * 100:.2f}%") print("此文件最长丢帧数量:", self.frame_loss_max) print("此文件最长丢帧时起始帧:", self.frame_loss_max_start) print("此文件丢帧数及次数统计:", self.frame_diff_counter) def get_all_files(path): return [ os.path.join(root, file) for root, _, files in os.walk(path) for file in files if file.endswith(".csv") ] def frame_loss_statistic(path): logger.info(f"Start Check Frame loss statistic: {path}") file_names = get_all_files(path) frame_loss_dict = {} for file in file_names: file_name = os.path.basename(file) print(f"\n[{file_name}]") df = pd.read_csv(file, index_col=False) d_q = DataQuality(df) d_q.quality_detect() frame_loss_dict[file_name] = { "result": d_q.result, "frame_loss_rate": d_q.frame_loss_rate, } return frame_loss_dict def data_precheck(file_path, case_name): """检查数据是否是有效数据,丢帧率是否在阈值内,数据中检查关键信息是否齐全""" logger.info(f"[case:{case_name}] Check if the data is valid: Start.") if not os.path.exists(file_path): logger.error(f"[case:{file_path}] SINGLE_CASE_EVAL: Invalid dataPath!") exit(1) frame_loss_dict = {} try: frame_loss_dict = frame_loss_statistic(file_path) except Exception as e: logger.error( f"[case:{case_name}] SINGLE_CASE_EVAL: frame loss statistic ERROR: {repr(e)}", exc_info=True, ) exit(-1) # 检查帧丢失率 for key, value in frame_loss_dict.items(): if value["frame_loss_rate"] > config.DATA_QUALITY_LEVEL_1: logger.error( f"[case:{case_name}] SINGLE_CASE_EVAL: [{key}] frame loss rate > {config.DATA_QUALITY_LEVEL_1}%: {value['result']}" ) return False # 检查关键信息是否齐全(根据实际情况修改) logger.info(f"[case:{case_name}] Check if the data is valid: End.") return True class DataPreprocessing: def __init__(self, case_name, mode_label): # Base info self.data_path = os.path.join(config.PROCESSED_DATA_PATH, case_name) self.case_name = case_name # Initialize data containers self.object_df = pd.DataFrame() self.driver_ctrl_df = pd.DataFrame() self.vehicle_sys_df = pd.DataFrame() self.ego_data_df = pd.DataFrame() self.config = config self.mode_label = mode_label self.obj_data = {} self.ego_data = {} self.obj_id_list = [] # Data quality level self.data_quality_level = config.DATA_QUALITY_LEVEL_1 # Load and process data self._merge_csv() self._read_csv() self._process_object_df() self.report_info = self._get_report_info(self.obj_data.get(1, pd.DataFrame())) self._process_mode() def _process_mode(self): if self.mode_label == "real_car": self._process_real_car() elif self.mode_label == "PGVIL": self._process_PGVIL() def _process_real_car(self): # Process real car data (implementation needed) pass def _process_PGVIL(self): """Process PGVIL data""" self.driver_ctrl_data = self._get_driver_ctrl_data(self.driver_ctrl_df) @staticmethod def cal_velocity(lat_v, lon_v): """ Calculate the resultant velocity from lateral and longitudinal components. Args: lat_v: Lateral velocity in m/s lon_v: Longitudinal velocity in m/s Returns: Resultant velocity in km/h """ return np.sqrt(lat_v**2 + lon_v**2) # Using numpy for vectorized operations def _process_object_df(self): EGO_PLAYER_ID = 1 data = self.object_df.copy() # calculate common parameters data["lat_v"] = data["speedY"] * 1 data["lon_v"] = data["speedX"] * 1 data["v"] = data.apply( lambda row: self.cal_velocity(row["lat_v"], row["lon_v"]), axis=1 ) data["v"] = data["v"] # km/h # calculate acceleraton components data["lat_acc"] = data["accelY"] * 1 data["lon_acc"] = data["accelX"] * 1 data["accel"] = data.apply( lambda row: self.cal_velocity(row["lat_acc"], row["lon_acc"]), axis=1 ) data = data.dropna(subset=["type"]) data.reset_index(drop=True, inplace=True) self.object_df = data.copy() # calculate respective parameters for obj_id, obj_data in data.groupby("playerId"): self.obj_data[obj_id] = obj_data self.obj_data[obj_id]["time_diff"] = self.obj_data[obj_id]["simTime"].diff() self.obj_data[obj_id]["lat_acc_diff"] = self.obj_data[obj_id][ "lat_acc" ].diff() self.obj_data[obj_id]["lon_acc_diff"] = self.obj_data[obj_id][ "lon_acc" ].diff() self.obj_data[obj_id]["yawrate_diff"] = self.obj_data[obj_id][ "speedH" ].diff() self.obj_data[obj_id]["lat_acc_roc"] = ( self.obj_data[obj_id]["lat_acc_diff"] / self.obj_data[obj_id]["time_diff"] ) self.obj_data[obj_id]["lon_acc_roc"] = ( self.obj_data[obj_id]["lon_acc_diff"] / self.obj_data[obj_id]["time_diff"] ) self.obj_data[obj_id]["accelH"] = ( self.obj_data[obj_id]["yawrate_diff"] / self.obj_data[obj_id]["time_diff"] ) self.obj_data[obj_id]["lat_acc_roc"] = self.obj_data[obj_id][ "lat_acc_roc" ].replace([np.inf, -np.inf], [9999, -9999]) self.obj_data[obj_id]["lon_acc_roc"] = self.obj_data[obj_id][ "lon_acc_roc" ].replace([np.inf, -np.inf], [9999, -9999]) self.obj_data[obj_id]["accelH"] = self.obj_data[obj_id]["accelH"].replace( [np.inf, -np.inf], [9999, -9999] ) # get object id list self.obj_id_list = list(self.obj_data.keys()) self.ego_data = self.obj_data[EGO_PLAYER_ID] def _get_driver_ctrl_data(self, df): """ Process and get driver control information. Args: df: A DataFrame containing driver control data. Returns: A dictionary of driver control info. """ driver_ctrl_data = { "time_list": df["simTime"].round(2).tolist(), "frame_list": df["simFrame"].tolist(), "brakePedal_list": ( (df["brakePedal"] * 100).tolist() if df["brakePedal"].max() < 1 else df["brakePedal"].tolist() ), "throttlePedal_list": ( (df["throttlePedal"] * 100).tolist() if df["throttlePedal"].max() < 1 else df["throttlePedal"].tolist() ), "steeringWheel_list": df["steeringWheel"].tolist(), } return driver_ctrl_data def _read_csv(self): """Read CSV files into DataFrames.""" self.driver_ctrl_df = pd.read_csv( os.path.join(self.data_path, "DriverCtrl.csv") ).drop_duplicates() self.object_df = pd.read_csv( os.path.join(self.data_path, "merged_ObjState.csv"), dtype={"simTime": float}, ).drop_duplicates(subset=["simTime", "simFrame", "playerId"]) self.road_mark_df = pd.read_csv( os.path.join(self.data_path, "RoadMark.csv"), dtype={"simTime": float} ).drop_duplicates() self.road_pos_df = pd.read_csv( os.path.join(self.data_path, "RoadPos.csv"), dtype={"simTime": float} ).drop_duplicates() self.traffic_light_df = pd.read_csv( os.path.join(self.data_path, "TrafficLight.csv"), dtype={"simTime": float} ).drop_duplicates() self.traffic_signal_df = pd.read_csv( os.path.join(self.data_path, "TrafficSign.csv"), dtype={"simTime": float} ).drop_duplicates() self.lane_info_new_df = pd.read_csv( os.path.join(self.data_path, "LaneInfo_new.csv"), dtype={"simTime": float} ).drop_duplicates() self.road_info_df = pd.read_csv( os.path.join(self.data_path, "RoadInfo.csv"), dtype={"simTime": float} ).drop_duplicates() self.inter_info_df = pd.read_csv( os.path.join(self.data_path, "InterInfo.csv"), dtype={"simTime": float} ).drop_duplicates() self.cross_walk_df = pd.read_csv( os.path.join(self.data_path, "CrosswalkInfo.csv"), dtype={"simTime": float} ).drop_duplicates() def _get_report_info(self, df): """Extract report information from the DataFrame.""" mileage = self._mileage_cal(df) duration = self._duration_cal(df) return {"mileage": mileage, "duration": duration} def _mileage_cal(self, df): """Calculate mileage based on the driving data.""" if df["travelDist"].nunique() == 1: df["time_diff"] = df["simTime"].diff().fillna(0) df["avg_speed"] = (df["v"] + df["v"].shift()).fillna(0) / 2 df["distance_increment"] = df["avg_speed"] * df["time_diff"] / 3.6 df["travelDist"] = df["distance_increment"].cumsum().fillna(0) mileage = round(df["travelDist"].iloc[-1] - df["travelDist"].iloc[0], 2) return mileage return 0.0 # Return 0 if travelDist is not valid def _duration_cal(self, df): """Calculate duration of the driving data.""" return df["simTime"].iloc[-1] - df["simTime"].iloc[0] def _merge_csv(self): """Merge CSV files into one consolidated DataFrame.""" df_object = pd.read_csv( os.path.join(self.data_path, "ObjState.csv"), dtype={"simTime": float} ).drop_duplicates() df_laneinfo = pd.read_csv( os.path.join(self.data_path, "LaneInfo.csv"), dtype={"simTime": float} ).drop_duplicates() df_roadPos = pd.read_csv( os.path.join(self.data_path, "RoadPos.csv"), dtype={"simTime": float} ).drop_duplicates() df_vehicleSys = pd.read_csv( os.path.join(self.data_path, "VehicleSystems.csv"), dtype={"simTime": float} ).drop_duplicates() ego_map_df = pd.read_csv( os.path.join(self.data_path, "EgoMap.csv"), dtype={"simTime": float} ).drop_duplicates() # Rename columns for clarity df_laneinfo = df_laneinfo.rename(columns={"curvHor": "curvHor", "id": "laneId"}) df_laneinfo["curvHor"] = df_laneinfo["curvHor"].round(3) # Merge data combined_df = pd.merge( df_roadPos, df_laneinfo, on=["simTime", "simFrame", "playerId", "laneId"], how="inner", ) df_laneinfo_new = combined_df[ ["simTime", "simFrame", "playerId", "curvHor", "curvHorDot"] ].drop_duplicates() df_roadPos = df_roadPos[ ["simTime", "simFrame", "playerId", "laneOffset", "rollRel", "pitchRel"] ].copy() df_vehicleSys = df_vehicleSys[ ["simTime", "simFrame", "playerId", "lightMask", "steering"] ].copy() # Final merge to create complete DataFrame merged_df = pd.merge( df_object, df_vehicleSys, on=["simTime", "simFrame", "playerId"], how="left" ) merged_df = pd.merge( merged_df, df_laneinfo_new, on=["simTime", "simFrame", "playerId"], how="left", ) merged_df = pd.merge( merged_df, df_roadPos, on=["simTime", "simFrame", "playerId"], how="left" ) # Columns to copy from ego map columns_to_copy = [ "simTime", "simFrame", "playerId", "road_link_id", "road_fc", "road_type", "road_speed_max", "road_speed_min", ] # Merge EGO data with map data merged_df = pd.merge( merged_df, ego_map_df[columns_to_copy], on=["simTime", "simFrame", "playerId"], how="left", ) # Clean up and save merged_df.drop_duplicates(inplace=True) merged_df = merged_df[merged_df.simFrame > 0].copy() merged_df.to_csv( os.path.join(self.data_path, "merged_ObjState.csv"), index=False ) logger.info("merged_ObjState.csv has been saved.")