123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2024 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
@Date: 2024/10/17
@Last Modified: 2024/10/17
@Summary: Evaluation functions
"""
- import os
- import sys
- import yaml
- import traceback
- import numpy as np
- import pandas as pd
- from collections import Counter
- from pathlib import Path
- root_path = Path(__file__).resolve().parent.parent
- sys.path.append(str(root_path))
- from models.common import log
- from config import config
log_path = config.LOG_PATH  # TODO: pass this path in via the Python package instead
- logger = log.get_logger(log_path)
class DataQuality(object):
    """Frame-loss quality statistics over a recording's ``simFrame`` column.

    Typical use::

        dq = DataQuality(df)
        dq.quality_detect()
        print(dq.result)
    """

    def __init__(self, df=None):
        # Raw recording; expected to contain a "simFrame" column.
        self.df = df
        self.frame_list = []            # sorted unique frame numbers
        self.frame_diff_list = []       # missing-frame count per consecutive pair
        self.frame_diff_counter = {}    # {gap size: occurrences}, gap 0 excluded
        self.total_frame_count = 0      # number of frame transitions (len - 1)
        self.max_frame_number = 0       # highest frame number seen
        self.frame_loss_count = 0       # total number of missing frames
        self.frame_loss_rate = 0        # missing / (transitions + 1), rounded
        self.frame_loss_max = 0         # widest single gap
        self.frame_loss_max_start = []  # [frame before, frame after] widest gap
        self.result = ""                # human-readable summary

    def quality_detect(self):
        """Run the full statistics pipeline and populate ``self.result``."""
        self.frame_extract()
        if self.frame_list:
            self.cal_total_frame_count()
            self.cal_max_frame_number()
            self.cal_frame_diff()
            self.cal_frame_loss_count()
            self.cal_frame_loss_rate()
            self.cal_frame_loss_max()
            self.cal_frame_diff_counter()
            self.result_print()
        else:
            self.result = "No data in this file."
            print("No data in this file.")

    def frame_extract(self):
        """Drop rows without a frame id and collect the sorted unique frames."""
        self.df = self.df.dropna(subset=["simFrame"])
        self.frame_list = sorted(self.df["simFrame"].unique())

    def cal_frame_diff(self):
        """Compute the missing-frame count for each consecutive frame pair.

        Also tracks the widest gap and the two frames bracketing it.
        """
        widest = -1
        for i in range(1, len(self.frame_list)):
            diff = self.frame_list[i] - self.frame_list[i - 1] - 1
            if diff > widest and diff != 0:
                widest = diff
                self.frame_loss_max_start = [self.frame_list[i - 1], self.frame_list[i]]
            self.frame_diff_list.append(int(diff))

    def cal_total_frame_count(self):
        # Counts transitions, not frames; result_print adds 1 back for display.
        self.total_frame_count = len(self.frame_list) - 1 if self.frame_list else 0

    def cal_max_frame_number(self):
        self.max_frame_number = self.frame_list[-1] if self.frame_list else 0

    def cal_frame_loss_count(self):
        self.frame_loss_count = sum(self.frame_diff_list)

    def cal_frame_loss_rate(self):
        if self.total_frame_count + 1 > 0:
            self.frame_loss_rate = round(
                self.frame_loss_count / (self.total_frame_count + 1), 4
            )
        else:
            self.frame_loss_rate = 0

    def cal_frame_loss_max(self):
        self.frame_loss_max = max(self.frame_diff_list) if self.frame_diff_list else 0

    def cal_frame_diff_counter(self):
        """Build a histogram of gap sizes, excluding the zero gap (no loss).

        BUG FIX: the previous implementation sorted the histogram by frequency
        and dropped the first (most frequent) bucket, assuming gap 0 is always
        the most common.  When lost frames outnumber clean transitions, that
        removed a real loss bucket and kept the zero bucket.  The zero-gap
        bucket is now removed explicitly.
        """
        counter = Counter(self.frame_diff_list)
        counter.pop(0, None)  # gap 0 == no frame lost; not a loss statistic
        # Keep the frequency-descending presentation order of the original.
        self.frame_diff_counter = dict(
            sorted(counter.items(), key=lambda x: x[1], reverse=True)
        )

    def result_print(self):
        """Assemble the summary string and echo the statistics to stdout."""
        self.result += f"丢帧率: {self.frame_loss_rate * 100:.2f}%, "
        self.result += f"总帧数: {self.total_frame_count + 1}, "
        self.result += f"丢帧数量: {self.frame_loss_count}, "
        self.result += f"最长丢帧数量: {self.frame_loss_max}, "
        self.result += f"最长丢帧时起始帧: {self.frame_loss_max_start}, "
        self.result += f"丢帧数及次数统计: {self.frame_diff_counter}."
        print("此文件总帧数:", self.total_frame_count + 1)
        print("此文件最大帧数:", self.max_frame_number)
        print("此文件丢帧数量:", self.frame_loss_count)
        print("此文件丢帧率:", f"{self.frame_loss_rate * 100:.2f}%")
        print("此文件最长丢帧数量:", self.frame_loss_max)
        print("此文件最长丢帧时起始帧:", self.frame_loss_max_start)
        print("此文件丢帧数及次数统计:", self.frame_diff_counter)
def get_all_files(path):
    """Recursively collect the paths of all ``.csv`` files under *path*."""
    csv_paths = []
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            if name.endswith(".csv"):
                csv_paths.append(os.path.join(dirpath, name))
    return csv_paths
def frame_loss_statistic(path):
    """Run frame-loss detection on every CSV file under *path*.

    Returns:
        dict keyed by file name, each value holding the human-readable
        ``result`` string and the numeric ``frame_loss_rate``.
    """
    logger.info(f"Start Check Frame loss statistic: {path}")
    statistics = {}
    for csv_path in get_all_files(path):
        name = os.path.basename(csv_path)
        print(f"\n[{name}]")
        checker = DataQuality(pd.read_csv(csv_path, index_col=False))
        checker.quality_detect()
        statistics[name] = {
            "result": checker.result,
            "frame_loss_rate": checker.frame_loss_rate,
        }
    return statistics
def data_precheck(file_path, case_name):
    """Validate recorded data before evaluation.

    Checks that *file_path* exists and that the frame-loss rate of every CSV
    beneath it stays within ``config.DATA_QUALITY_LEVEL_1``.

    Args:
        file_path: Directory holding the case's CSV recordings.
        case_name: Case identifier used for log tagging.

    Returns:
        True when all files pass the threshold, False when any file's loss
        rate exceeds it.  Terminates the process on a missing path or an
        unexpected statistics error (preserved legacy behavior).
    """
    logger.info(f"[case:{case_name}] Check if the data is valid: Start.")
    if not os.path.exists(file_path):
        # BUG FIX: this log line previously tagged the message with file_path,
        # unlike every other [case:...] log entry in this function.
        logger.error(f"[case:{case_name}] SINGLE_CASE_EVAL: Invalid dataPath!")
        # sys.exit instead of the interactive-only exit() builtin.
        sys.exit(1)
    try:
        frame_loss_dict = frame_loss_statistic(file_path)
    except Exception as e:
        logger.error(
            f"[case:{case_name}] SINGLE_CASE_EVAL: frame loss statistic ERROR: {repr(e)}",
            exc_info=True,
        )
        sys.exit(-1)
    # Reject the case if any file exceeds the level-1 loss-rate threshold.
    for key, value in frame_loss_dict.items():
        if value["frame_loss_rate"] > config.DATA_QUALITY_LEVEL_1:
            logger.error(
                f"[case:{case_name}] SINGLE_CASE_EVAL: [{key}] frame loss rate > {config.DATA_QUALITY_LEVEL_1}%: {value['result']}"
            )
            return False
    # Check that key information is complete (adapt to actual requirements).
    logger.info(f"[case:{case_name}] Check if the data is valid: End.")
    return True
class DataPreprocessing:
    """Load, merge, and derive kinematic signals from a case's CSV recordings.

    The constructor runs the whole pipeline: merge raw CSVs into
    ``merged_ObjState.csv``, re-read all per-topic CSVs, derive per-object
    velocity/acceleration signals, and compute mileage/duration report info.
    """

    def __init__(self, case_name, mode_label):
        # Base info
        self.data_path = os.path.join(config.PROCESSED_DATA_PATH, case_name)
        self.case_name = case_name
        # Initialize data containers
        self.object_df = pd.DataFrame()
        self.driver_ctrl_df = pd.DataFrame()
        self.vehicle_sys_df = pd.DataFrame()
        self.ego_data_df = pd.DataFrame()
        self.config = config
        self.mode_label = mode_label
        self.obj_data = {}      # {playerId: per-object DataFrame with derived columns}
        self.ego_data = {}      # convenience alias for obj_data[1] (ego vehicle)
        self.obj_id_list = []   # all playerIds present in the recording
        # Data quality level
        self.data_quality_level = config.DATA_QUALITY_LEVEL_1
        # Load and process data.  Order matters: _merge_csv writes
        # merged_ObjState.csv, which _read_csv then loads.
        self._merge_csv()
        self._read_csv()
        self._process_object_df()
        # playerId 1 is assumed to be the ego vehicle; fall back to an empty
        # frame if it is absent (report info will then be degenerate).
        self.report_info = self._get_report_info(self.obj_data.get(1, pd.DataFrame()))
        self._process_mode()

    def _process_mode(self):
        """Dispatch mode-specific post-processing based on mode_label."""
        if self.mode_label == "real_car":
            self._process_real_car()
        elif self.mode_label == "PGVIL":
            self._process_PGVIL()

    def _process_real_car(self):
        # Process real car data (implementation needed)
        pass

    def _process_PGVIL(self):
        """Process PGVIL data"""
        self.driver_ctrl_data = self._get_driver_ctrl_data(self.driver_ctrl_df)

    @staticmethod
    def cal_velocity(lat_v, lon_v):
        """
        Calculate the resultant magnitude from lateral and longitudinal components.

        Args:
            lat_v: Lateral component.
            lon_v: Longitudinal component.
        Returns:
            sqrt(lat_v**2 + lon_v**2), in the same units as the inputs.
            NOTE(review): the original docstring claimed the result is km/h,
            but no unit conversion is performed here — verify against callers.
        """
        return np.sqrt(lat_v**2 + lon_v**2)  # Using numpy for vectorized operations

    def _process_object_df(self):
        """Derive velocity/acceleration signals per object and split by playerId."""
        EGO_PLAYER_ID = 1
        data = self.object_df.copy()
        # calculate common parameters
        # NOTE(review): the "* 1" factors look like unit-conversion
        # placeholders — confirm speedX/speedY units before changing.
        data["lat_v"] = data["speedY"] * 1
        data["lon_v"] = data["speedX"] * 1
        data["v"] = data.apply(
            lambda row: self.cal_velocity(row["lat_v"], row["lon_v"]), axis=1
        )
        # NOTE(review): no-op assignment; presumably a km/h conversion was
        # intended here — confirm.
        data["v"] = data["v"]  # km/h
        # calculate acceleration components
        data["lat_acc"] = data["accelY"] * 1
        data["lon_acc"] = data["accelX"] * 1
        data["accel"] = data.apply(
            lambda row: self.cal_velocity(row["lat_acc"], row["lon_acc"]), axis=1
        )
        # Rows without an object type are unusable downstream.
        data = data.dropna(subset=["type"])
        data.reset_index(drop=True, inplace=True)
        self.object_df = data.copy()
        # calculate respective parameters for each tracked object
        for obj_id, obj_data in data.groupby("playerId"):
            self.obj_data[obj_id] = obj_data
            # Frame-to-frame deltas used as denominators/numerators below.
            self.obj_data[obj_id]["time_diff"] = self.obj_data[obj_id]["simTime"].diff()
            self.obj_data[obj_id]["lat_acc_diff"] = self.obj_data[obj_id][
                "lat_acc"
            ].diff()
            self.obj_data[obj_id]["lon_acc_diff"] = self.obj_data[obj_id][
                "lon_acc"
            ].diff()
            self.obj_data[obj_id]["yawrate_diff"] = self.obj_data[obj_id][
                "speedH"
            ].diff()
            # Rates of change (jerk-like signals); division by a zero
            # time_diff yields inf, clamped below.
            self.obj_data[obj_id]["lat_acc_roc"] = (
                self.obj_data[obj_id]["lat_acc_diff"]
                / self.obj_data[obj_id]["time_diff"]
            )
            self.obj_data[obj_id]["lon_acc_roc"] = (
                self.obj_data[obj_id]["lon_acc_diff"]
                / self.obj_data[obj_id]["time_diff"]
            )
            self.obj_data[obj_id]["accelH"] = (
                self.obj_data[obj_id]["yawrate_diff"]
                / self.obj_data[obj_id]["time_diff"]
            )
            # Clamp +/-inf (zero time step) to sentinel values +/-9999.
            self.obj_data[obj_id]["lat_acc_roc"] = self.obj_data[obj_id][
                "lat_acc_roc"
            ].replace([np.inf, -np.inf], [9999, -9999])
            self.obj_data[obj_id]["lon_acc_roc"] = self.obj_data[obj_id][
                "lon_acc_roc"
            ].replace([np.inf, -np.inf], [9999, -9999])
            self.obj_data[obj_id]["accelH"] = self.obj_data[obj_id]["accelH"].replace(
                [np.inf, -np.inf], [9999, -9999]
            )
        # get object id list
        self.obj_id_list = list(self.obj_data.keys())
        # NOTE(review): raises KeyError if the ego (playerId 1) is missing —
        # confirm that is the desired failure mode.
        self.ego_data = self.obj_data[EGO_PLAYER_ID]

    def _get_driver_ctrl_data(self, df):
        """
        Process and get driver control information.

        Args:
            df: A DataFrame containing driver control data.
        Returns:
            A dictionary of driver control info.  Pedal signals recorded as
            fractions (max < 1) are rescaled to percent.
        """
        driver_ctrl_data = {
            "time_list": df["simTime"].round(2).tolist(),
            "frame_list": df["simFrame"].tolist(),
            "brakePedal_list": (
                (df["brakePedal"] * 100).tolist()
                if df["brakePedal"].max() < 1
                else df["brakePedal"].tolist()
            ),
            "throttlePedal_list": (
                (df["throttlePedal"] * 100).tolist()
                if df["throttlePedal"].max() < 1
                else df["throttlePedal"].tolist()
            ),
            "steeringWheel_list": df["steeringWheel"].tolist(),
        }
        return driver_ctrl_data

    def _read_csv(self):
        """Read CSV files into DataFrames."""
        self.driver_ctrl_df = pd.read_csv(
            os.path.join(self.data_path, "DriverCtrl.csv")
        ).drop_duplicates()
        # merged_ObjState.csv is produced by _merge_csv earlier in __init__.
        self.object_df = pd.read_csv(
            os.path.join(self.data_path, "merged_ObjState.csv"),
            dtype={"simTime": float},
        ).drop_duplicates(subset=["simTime", "simFrame", "playerId"])
        self.road_mark_df = pd.read_csv(
            os.path.join(self.data_path, "RoadMark.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.road_pos_df = pd.read_csv(
            os.path.join(self.data_path, "RoadPos.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.traffic_light_df = pd.read_csv(
            os.path.join(self.data_path, "TrafficLight.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.traffic_signal_df = pd.read_csv(
            os.path.join(self.data_path, "TrafficSign.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.lane_info_new_df = pd.read_csv(
            os.path.join(self.data_path, "LaneInfo_new.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.road_info_df = pd.read_csv(
            os.path.join(self.data_path, "RoadInfo.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.inter_info_df = pd.read_csv(
            os.path.join(self.data_path, "InterInfo.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        self.cross_walk_df = pd.read_csv(
            os.path.join(self.data_path, "CrosswalkInfo.csv"), dtype={"simTime": float}
        ).drop_duplicates()

    def _get_report_info(self, df):
        """Extract report information (mileage, duration) from the DataFrame."""
        mileage = self._mileage_cal(df)
        duration = self._duration_cal(df)
        return {"mileage": mileage, "duration": duration}

    def _mileage_cal(self, df):
        """Calculate mileage based on the driving data.

        When travelDist is constant (sensor did not report it), the distance
        is reconstructed by trapezoidal integration of speed over time.
        The /3.6 factor implies "v" is in km/h — see the NOTE in
        _process_object_df about the unconverted velocity.
        """
        if df["travelDist"].nunique() == 1:
            df["time_diff"] = df["simTime"].diff().fillna(0)
            df["avg_speed"] = (df["v"] + df["v"].shift()).fillna(0) / 2
            df["distance_increment"] = df["avg_speed"] * df["time_diff"] / 3.6
            df["travelDist"] = df["distance_increment"].cumsum().fillna(0)
            mileage = round(df["travelDist"].iloc[-1] - df["travelDist"].iloc[0], 2)
            return mileage
        return 0.0  # Return 0 if travelDist is not valid

    def _duration_cal(self, df):
        """Calculate duration of the driving data (last - first simTime)."""
        return df["simTime"].iloc[-1] - df["simTime"].iloc[0]

    def _merge_csv(self):
        """Merge CSV files into one consolidated merged_ObjState.csv on disk."""
        df_object = pd.read_csv(
            os.path.join(self.data_path, "ObjState.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        df_laneinfo = pd.read_csv(
            os.path.join(self.data_path, "LaneInfo.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        df_roadPos = pd.read_csv(
            os.path.join(self.data_path, "RoadPos.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        df_vehicleSys = pd.read_csv(
            os.path.join(self.data_path, "VehicleSystems.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        ego_map_df = pd.read_csv(
            os.path.join(self.data_path, "EgoMap.csv"), dtype={"simTime": float}
        ).drop_duplicates()
        # Rename columns for clarity
        # NOTE(review): "curvHor" -> "curvHor" is a self-rename (no effect);
        # only "id" -> "laneId" actually changes anything — confirm intent.
        df_laneinfo = df_laneinfo.rename(columns={"curvHor": "curvHor", "id": "laneId"})
        df_laneinfo["curvHor"] = df_laneinfo["curvHor"].round(3)
        # Merge data: pair each road position with its lane's curvature info.
        combined_df = pd.merge(
            df_roadPos,
            df_laneinfo,
            on=["simTime", "simFrame", "playerId", "laneId"],
            how="inner",
        )
        df_laneinfo_new = combined_df[
            ["simTime", "simFrame", "playerId", "curvHor", "curvHorDot"]
        ].drop_duplicates()
        df_roadPos = df_roadPos[
            ["simTime", "simFrame", "playerId", "laneOffset", "rollRel", "pitchRel"]
        ].copy()
        df_vehicleSys = df_vehicleSys[
            ["simTime", "simFrame", "playerId", "lightMask", "steering"]
        ].copy()
        # Final merge to create complete DataFrame (left joins keep every
        # object-state row even when auxiliary topics have no match).
        merged_df = pd.merge(
            df_object, df_vehicleSys, on=["simTime", "simFrame", "playerId"], how="left"
        )
        merged_df = pd.merge(
            merged_df,
            df_laneinfo_new,
            on=["simTime", "simFrame", "playerId"],
            how="left",
        )
        merged_df = pd.merge(
            merged_df, df_roadPos, on=["simTime", "simFrame", "playerId"], how="left"
        )
        # Columns to copy from ego map
        columns_to_copy = [
            "simTime",
            "simFrame",
            "playerId",
            "road_link_id",
            "road_fc",
            "road_type",
            "road_speed_max",
            "road_speed_min",
        ]
        # Merge EGO data with map data
        merged_df = pd.merge(
            merged_df,
            ego_map_df[columns_to_copy],
            on=["simTime", "simFrame", "playerId"],
            how="left",
        )
        # Clean up and save; non-positive simFrame rows are pre-start samples.
        merged_df.drop_duplicates(inplace=True)
        merged_df = merged_df[merged_df.simFrame > 0].copy()
        merged_df.to_csv(
            os.path.join(self.data_path, "merged_ObjState.csv"), index=False
        )
        logger.info("merged_ObjState.csv has been saved.")
|