#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           zhangyu
@Date:              2024/02/21
@Last Modified:     2024/02/21
@Summary:           The template of custom indicator.
"""

# Design idea: maximum heading deviation angle.

import math

import numpy as np
import pandas as pd

from common import zip_time_pairs, continuous_group, get_status_active_data
from log import logger


# custom metric codes
class CustomMetric(object):
    """Custom metric reporting the maximum absolute heading deviation for LKA.

    The constructor stores the inputs, builds the ``result`` dictionary and
    immediately calls :meth:`run`, so the finished metric is available in
    ``self.result`` as soon as the instance has been created.
    """

    def __init__(self, all_data, case_name):
        """Store inputs, initialise the result skeleton and run the metric.

        Args:
            all_data: Data container. This class reads its ``config``,
                ``status_trigger_dict``, ``object_df`` and ``road_mark_df``
                attributes.
            case_name: Case identifier, used only in log messages.
        """
        self.data = all_data
        self.optimal_dict = self.data.config
        self.status_trigger_dict = self.data.status_trigger_dict
        self.case_name = case_name
        self.markline_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])

        self.df = pd.DataFrame()
        self.ego_df = pd.DataFrame()       # all rows with playerId == 1
        self.df_ego = pd.DataFrame()       # the above, restricted to active-LKA time ranges
        self.roadMark_df = pd.DataFrame()  # road marks restricted to active-LKA time ranges

        self.time_list_follow = list()
        self.frame_list_follow = list()
        self.dist_list = list()
        self.dist_deviation_list = list()
        self.dist_deviation_list_full_time = list()

        self.result = {
            "name": "最大航向角偏差",
            "value": [],
            "tableData": {
                "avg": "",  # average, or the metric value
                "max": "",
                "min": ""
            },
            "reportData": {
                "name": "最大航向角偏差(rad)",
                # A "legend" list can be added here when there are several data
                # series, e.g. one entry per series.
                "data": [],
                "markLine": [],
                "range": [],
            },
            "statusFlag": {}
        }

        self.run()
        print(f"指标03: LKA最大航向角偏差: {self.result['value']}")

    def data_extract(self):
        """Select the ego rows and road marks that fall inside active-LKA ranges.

        Also records in ``result['statusFlag']['function_LKA']`` whether any
        ego row remained after the selection.
        """
        self.df = self.data.object_df
        # playerId == 1 is presumably the ego vehicle -- confirm against the data spec.
        self.ego_df = self.data.object_df[self.data.object_df.playerId == 1]

        active_time_ranges = self.status_trigger_dict['LKA']['LKA_active_time']
        self.df_ego = get_status_active_data(active_time_ranges, self.ego_df)
        self.roadMark_df = get_status_active_data(active_time_ranges, self.data.road_mark_df)

        self.result['statusFlag']['function_LKA'] = not self.df_ego.empty

    def dist(self, x1, y1, x2, y2):
        """Return the Euclidean distance between (x1, y1) and (x2, y2)."""
        return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

    def Compute_nearby_distance_to_lane_boundary(self, x, width_ego):
        """Return the smaller lateral distance of ``x`` minus half of ``width_ego``.

        Args:
            x: Object exposing ``lateralDist`` and ``right_lateral_distance``.
            width_ego: Width whose half is subtracted from the chosen distance.
        """
        right_dist = abs(x.right_lateral_distance)
        nearest = x.lateralDist if x.lateralDist < right_dist else right_dist
        return nearest - width_ego / 2

    def data_analyze(self):
        """Compute the per-frame absolute heading deviation and its maximum.

        The ``curvHor`` values of road marks with ``id == 0`` and ``id == 2``
        are averaged row by row, and the absolute difference between the ego
        ``posH`` and that average is taken as the deviation.
        """
        ego_df = self.df_ego.reset_index(drop=True)
        if ego_df.empty:
            # No active-LKA ego rows: keep result['value'] == [] instead of
            # failing later on missing columns.
            logger.info(f"[case:{self.case_name}] Custom metric:{self.result['name']} "
                        f"no active LKA data, analysis skipped.")
            return

        road_mark_df = self.roadMark_df
        # ids 0 and 2 are treated as the left and right lane lines
        # (per the original variable names -- confirm against the data spec).
        left_curv = road_mark_df[road_mark_df.id == 0].reset_index(drop=True)['curvHor']
        right_curv = road_mark_df[road_mark_df.id == 2].reset_index(drop=True)['curvHor']

        # Rows are matched by position after the index reset; positions missing
        # on either side yield NaN.
        # NOTE(review): this subtracts a curvature from a heading angle, as the
        # original design comment describes -- confirm the intended quantity.
        ego_df['curvHor_middle'] = (left_curv + right_curv) / 2
        ego_df['heading_deviation_abs'] = (ego_df['posH'] - ego_df['curvHor_middle']).abs()

        # Series.max() skips NaN; the builtin max() is order-dependent when NaN
        # values are present.
        self.result['value'] = [ego_df['heading_deviation_abs'].max()]

        self.time_list_follow = ego_df['simTime'].values.tolist()
        self.frame_list_follow = ego_df['simFrame'].values.tolist()
        self.dist_deviation_list = ego_df['heading_deviation_abs'].values.tolist()

    def markline_statistic(self):
        """Group the analysed frames and append them to ``markline_df`` as type "LKA"."""
        unfunc_df = pd.DataFrame({'simTime': self.time_list_follow,
                                  'simFrame': self.frame_list_follow,
                                  'dist_deviation': self.dist_deviation_list})
        unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
        # No threshold filter is applied: every remaining frame is passed on.
        v_df = unfunc_df[['simTime', 'simFrame', 'dist_deviation']]
        v_follow_df = continuous_group(v_df)
        v_follow_df['type'] = "LKA"
        self.markline_df = pd.concat([self.markline_df, v_follow_df], ignore_index=True)

    def report_data_statistic(self):
        """Fill ``tableData`` and ``reportData`` from the computed deviations."""
        graph_list = [x for x in self.dist_deviation_list if not np.isnan(x)]
        self.result['tableData']['avg'] = f'{np.mean(graph_list):.3f}' if graph_list else 0
        self.result['tableData']['max'] = f'{max(graph_list):.3f}' if graph_list else 0
        self.result['tableData']['min'] = f'{min(graph_list):.3f}' if graph_list else 0

        # Use the time stamps of the very rows the deviations were computed
        # from; the unfiltered ego time axis can be longer and would misalign
        # the series. Assumes zip_time_pairs pairs its arguments positionally.
        zip_vs_time = zip_time_pairs(self.time_list_follow, self.dist_deviation_list)
        self.result['reportData']['data'] = zip_vs_time

        self.markline_statistic()
        # NOTE(review): markline_df is updated above, but reportData['markLine']
        # is left empty exactly as before -- confirm whether it should be filled.
        # NOTE(review): confirm this range is appropriate for a value in rad.
        self.result['reportData']['range'] = "[-1.875, 1.875]"

    def run(self):
        """Run extraction, analysis and reporting in order.

        Each stage is attempted even if an earlier one failed; a failure is
        logged together with the exception and does not propagate.
        """
        logger.info(f"[case:{self.case_name}] Custom metric:"
                    f"[cicv_LKA_03_heading_deviation_max:{self.result['name']}] evaluate.")

        stages = (
            (self.data_extract, "data extract"),
            (self.data_analyze, "data analyze"),
            (self.report_data_statistic, "report data statistic"),
        )
        for stage, label in stages:
            try:
                stage()
            except Exception as e:
                # The exception goes into the message itself: passing it as an
                # extra positional argument without a placeholder loses it.
                logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} "
                             f"{label} ERROR! {e!r}")

# if __name__ == "__main__":
#     pass