#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           zhangyu
@Data:              2024/02/21
@Last Modified:     2024/02/21
@Summary:           The template of custom indicator.
"""

# Design idea: maximum lateral offset (zy_center_distance_expectation).

import math
import pandas as pd
import numpy as np

from common import zip_time_pairs, continuous_group, get_status_active_data
from log import logger


# custom metric codes
class CustomMetric(object):
    """Custom metric reporting the extreme lane offset of the ego vehicle.

    The metric looks at the ``laneOffset`` column of the road-position data
    restricted to the time ranges stored under
    ``status_trigger_dict['ICA']['ICA_Lateral_time']`` and reports the signed
    ``laneOffset`` whose absolute value is the largest.

    Constructing an instance immediately runs the whole pipeline (see
    :meth:`run`); the outcome is available in ``self.result``.
    """

    def __init__(self, all_data, case_name):
        """Store the inputs, initialise the result skeleton and run the metric.

        Args:
            all_data: Data container. This class reads its ``config``,
                ``status_trigger_dict``, ``object_df`` and ``road_pos_df``
                attributes.
            case_name: Name of the evaluated case; only used in log messages.
        """
        self.data = all_data
        self.optimal_dict = self.data.config
        self.status_trigger_dict = self.data.status_trigger_dict
        self.case_name = case_name
        self.markline_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])

        self.df = pd.DataFrame()
        self.ego_df = pd.DataFrame()
        self.df_follow = pd.DataFrame()
        self.roadMark_df = pd.DataFrame()
        self.roadPos_df = pd.DataFrame()

        self.time_list_follow = list()
        self.frame_list_follow = list()

        self.dist_list = list()
        self.dist_deviation_list = list()
        self.dist_deviation_list_full_time = list()

        # The display names below are runtime strings consumed by the report
        # ("maximum lateral distance"); they must not be altered.
        self.result = {
            "name": "横向距离极大值",
            "value": [],
            # "weight": [],
            "tableData": {
                "avg": "",  # average, or the metric value
                "max": "",
                "min": ""
            },
            "reportData": {
                "name": "横向距离极大值(m)",
                # A "legend" list would be needed here if "data" ever held
                # more than one series.
                "data": [],
                "markLine": [],
                "range": [],
            },
            "statusFlag": {}
        }
        self.run()
        print(f"指标07: 横向距离极大值: {self.result['value']}")

    def data_extract(self):
        """Load the object data and the road-position rows of the active ranges.

        Sets ``result['statusFlag']['function_ICA']`` to ``True`` when at
        least one road-position row falls inside the active time ranges and
        to ``False`` otherwise.
        """
        self.df = self.data.object_df
        self.ego_df = self.data.object_df[self.data.object_df.playerId == 1]

        active_time_ranges = self.status_trigger_dict['ICA']['ICA_Lateral_time']
        self.roadPos_df = get_status_active_data(active_time_ranges, self.data.road_pos_df)

        self.result['statusFlag']['function_ICA'] = not self.roadPos_df.empty

    def dist(self, x1, y1, x2, y2):
        """Return the Euclidean distance between ``(x1, y1)`` and ``(x2, y2)``."""
        dis = math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
        return dis

    def Compute_nearby_distance_to_lane_boundary(self, x, width_ego):
        """Return the smaller of the two lateral distances minus half the width.

        Args:
            x: Row-like object exposing ``lateralDist`` and
                ``right_lateral_distance`` attributes.
            width_ego: Width to subtract (half of it is removed).

        Returns:
            ``x.lateralDist - width_ego / 2`` when ``x.lateralDist`` is smaller
            than ``abs(x.right_lateral_distance)``, otherwise
            ``abs(x.right_lateral_distance) - width_ego / 2``.
        """
        if x.lateralDist < abs(x.right_lateral_distance):
            return x.lateralDist - width_ego/2
        else:
            return abs(x.right_lateral_distance) - width_ego/2

    def func_laneOffset_abs(self, x):
        """Return the absolute value of ``x.laneOffset``."""
        return abs(x.laneOffset)

    def data_analyze(self):
        """Compute the metric value and the per-frame series used for plotting.

        ``result['value']`` becomes a one-element list holding the signed
        ``laneOffset`` with the largest magnitude among the ego rows
        (``playerId == 1``). It is ``[0.0]`` when there is no active data, no
        ego row, or no non-NaN ``laneOffset``.
        """
        if self.roadPos_df.empty:
            self.result['value'] = [0.0]
            return

        ego_pos_df = self.roadPos_df[self.roadPos_df.playerId == 1].reset_index(drop=True)

        self.time_list_follow = ego_pos_df['simTime'].values.tolist()
        self.frame_list_follow = ego_pos_df['simFrame'].values.tolist()
        self.dist_deviation_list = ego_pos_df['laneOffset'].values.tolist()

        # idxmax() fails on an empty / all-NaN series, so guard explicitly
        # instead of relying on the caller's blanket exception handler.
        valid_offsets = ego_pos_df['laneOffset'].dropna()
        if valid_offsets.empty:
            self.result['value'] = [0.0]
            return

        peak_label = valid_offsets.abs().idxmax()
        # Report the signed offset, not its absolute value.
        self.result['value'] = [float(valid_offsets.loc[peak_label])]

    def markline_statistic(self):
        """Append the continuous active segments to ``self.markline_df``.

        Does nothing when there is no active road-position data. Rows with
        ``simFrame <= 1`` are excluded before grouping; every resulting
        segment is tagged with type ``"ICA"``.
        """
        if self.roadPos_df.empty:
            return

        active_df = pd.DataFrame({
            'simTime': self.time_list_follow,
            'simFrame': self.frame_list_follow,
            'dist_deviation': self.dist_deviation_list,
        })
        active_df = active_df[active_df['simFrame'] > 1]

        segments_df = continuous_group(active_df)
        segments_df['type'] = "ICA"
        self.markline_df = pd.concat([self.markline_df, segments_df], ignore_index=True)

    def report_data_statistic(self):
        """Fill ``tableData`` and ``reportData`` of ``self.result``.

        With active data, avg/max/min are two-decimal strings computed over
        the non-NaN ``laneOffset`` values (``0`` when none exist) and the
        chart data pairs each value with its own timestamp. Without active
        data, the table values are ``0`` and the chart data is empty.
        """
        table = self.result['tableData']
        report = self.result['reportData']

        if self.roadPos_df.empty:
            table['avg'] = 0
            table['max'] = 0
            table['min'] = 0
            report['data'] = []
        else:
            finite_values = [x for x in self.dist_deviation_list if not np.isnan(x)]
            table['avg'] = f'{np.mean(finite_values):.2f}' if finite_values else 0
            table['max'] = f'{max(finite_values):.2f}' if finite_values else 0
            table['min'] = f'{min(finite_values):.2f}' if finite_values else 0

            # Use the timestamps taken from the same rows as the values.
            # The full-run ego timeline would shift every point whenever the
            # active window does not start at the first frame.
            report['data'] = zip_time_pairs(self.time_list_follow, self.dist_deviation_list)

        self.markline_statistic()
        report['markLine'] = self.markline_df.to_dict('records')
        # NOTE(review): presumably +/- half of a 3.75 m lane width - confirm.
        report['range'] = "[-1.875, 1.875]"

    def run(self):
        """Run extract, analyze and report in order, logging any stage failure.

        Each stage is isolated: an exception in one stage is logged and the
        following stages still execute.
        """
        logger.info(f"[case:{self.case_name}] Custom metric:[ica_distance_deviation:{self.result['name']}] evaluate.")

        stages = (
            ("data extract", self.data_extract),
            ("data analyze", self.data_analyze),
            ("report data statistic", self.report_data_statistic),
        )
        for stage_label, stage in stages:
            try:
                stage()
            except Exception as e:
                # Embed the exception in the message: passing it as an extra
                # positional argument without a placeholder loses it.
                logger.error(
                    f"[case:{self.case_name}] Custom metric:{self.result['name']} "
                    f"{stage_label} ERROR! {e}")