#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           yangzihao(yangzihao@china-icv.cn)
@Data:              2024/02/21
@Last Modified:     2024/02/21
@Summary:           The template of custom indicator.
"""

import pandas as pd
import numpy as np
import scipy.signal as sg

from common import zip_time_pairs, continuous_group, continous_judge
from log import logger

# import functions


class CustomMetric(object):
    """Custom indicator: maximum absolute lane-center offset while ICA is active.

    The whole pipeline (extract -> analyze -> report) is executed from
    ``__init__`` via :meth:`run`; afterwards the outcome is available in
    ``self.result`` and the out-of-range intervals in ``self.markline_df``.
    """

    # Absolute lane offset above which a sample is collected for the mark
    # lines; also the upper bound rendered in reportData['range'].
    CENTER_DIST_LIMIT = 1.5

    # ICA_status values for which a row of ego data is kept for evaluation.
    ICA_ACTIVE_STATUS = (
        "Only_Longitudinal_Control",
        "LLC_Follow_Line",
        "LLC_Follow_Vehicle",
    )

    def __init__(self, all_data, case_name):
        """Store the inputs, initialise the result skeleton and run the metric.

        Args:
            all_data: Object exposing an ``ego_data`` DataFrame (read in
                :meth:`data_extract`).
            case_name: Case identifier; only used in log messages.
        """
        self.data = all_data
        self.case_name = case_name
        self.markline_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
        self.result = {
            "name": "ica车道中心线横向距离极大值",
            "value": [],
            # "weight": [],
            "tableData": {
                "avg": "",  # average value, or the indicator value
                "max": "",
                "min": ""
            },
            "reportData": {
                "name": "车辆中心线横向距离(m)",
                # "legend": [],  # with several data series, add one label per
                #                # series, e.g. ["横向加速度", "纵向加速度"]
                "data": [],
                "markLine": [],
                "range": [],
            },
            "statusFlag": {}
        }

        self.ego_df = pd.DataFrame()
        self.df_lka = pd.DataFrame()

        self.center_dist_with_nan = list()
        self.center_time_list = list()
        self.center_frame_list = list()
        self.center_dist = list()

        self.run()

    def data_extract(self):
        """Select the ego rows whose ``ICA_status`` is an active ICA state.

        Also records in ``result['statusFlag']['functionICA']`` whether any
        such row exists.
        """
        self.ego_df = self.data.ego_data
        active_mask = self.ego_df['ICA_status'].isin(list(self.ICA_ACTIVE_STATUS))
        self.df_lka = self.ego_df[active_mask].copy()
        self.result['statusFlag']['functionICA'] = not self.df_lka.empty

        # self.df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"].copy()
        # self.config = self.data.config
        # self.function_config = self.config.config['function']
        # self.optimal_dict = self.function_config['optimal']

    def data_analyze(self):
        """Compute the maximum absolute ``laneOffset`` and append it to ``result['value']``.

        Appends ``0`` when there is no non-missing offset sample.
        """
        df_lka = self.df_lka
        self.center_dist_with_nan = df_lka['laneOffset'].to_list()
        self.center_time_list = df_lka['simTime'].to_list()
        self.center_frame_list = df_lka['simFrame'].to_list()

        # pd.isna also copes with None/object values, where np.isnan would raise.
        self.center_dist = [x for x in self.center_dist_with_nan if not pd.isna(x)]

        if not self.center_dist:
            self.result['value'].append(0)
            return

        # Extreme value of the lateral distance to the lane center line.
        extreme_max_value = np.abs(np.array(self.center_dist)).max()
        self.result['value'].append(round(extreme_max_value, 2))

    def markline_statistic(self):
        """Append the intervals whose absolute offset exceeds the limit to ``markline_df``.

        Samples are grouped by the project helper ``continuous_group`` and
        tagged with type ``"ICA"``.
        """
        samples = pd.DataFrame({
            'simTime': self.center_time_list,
            'simFrame': self.center_frame_list,
            'center_dist': self.center_dist_with_nan,
        })
        # NOTE(review): frames with simFrame <= 1 are discarded — presumably to
        # skip the initial frame(s); confirm the intent.
        samples = samples[samples['simFrame'] > 1]
        samples = samples.dropna(subset=['center_dist'])

        lane_df = samples[samples['center_dist'].abs() > self.CENTER_DIST_LIMIT]
        lane_df = lane_df[['simTime', 'simFrame', 'center_dist']]

        # NOTE(review): continuous_group presumably merges consecutive frames
        # into start/end intervals — verify against common.continuous_group.
        dist_lane_df = continuous_group(lane_df)
        dist_lane_df['type'] = "ICA"
        self.markline_df = pd.concat([self.markline_df, dist_lane_df], ignore_index=True)

    def report_data_statistic(self):
        """Fill ``result['tableData']`` and ``result['reportData']``."""
        time_list = self.df_lka['simTime'].values.tolist()
        line_dist_list = self.center_dist_with_nan

        table = self.result['tableData']
        table['avg'] = '-'
        # Guard against an empty value list (data_analyze may have failed) so
        # that the real error is not masked by an IndexError here.
        has_value = bool(self.result['value']) and not self.df_lka.empty
        table['max'] = self.result['value'][0] if has_value else '-'
        table['min'] = '-'

        report = self.result['reportData']
        report['data'] = zip_time_pairs(time_list, line_dist_list)

        self.markline_statistic()
        report['markLine'] = self.markline_df.to_dict('records')
        report['range'] = f"[0, {self.CENTER_DIST_LIMIT}]"

    def run(self):
        """Run extract, analyze and report in order.

        Each stage is best-effort: an exception is logged together with its
        message and the following stages are still attempted.
        """
        # logger.info(f"Custom metric run:[{self.result['name']}].")
        logger.info(f"[case:{self.case_name}] Custom metric:[center_distance_max:{self.result['name']}] evaluate.")

        stages = (
            ("data extract", self.data_extract),
            ("data analyze", self.data_analyze),
            ("report data statistic", self.report_data_statistic),
        )
        for stage_name, stage in stages:
            try:
                stage()
            except Exception as e:
                # The exception is embedded in the message itself: passing it
                # as an extra positional argument without a placeholder makes
                # the logger either fail to format the record or drop it.
                logger.error(
                    f"[case:{self.case_name}] Custom metric:{self.result['name']} {stage_name} ERROR! {e}")


# if __name__ == "__main__":
#     pass