#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           zhangyu
@Date:              2024/02/21
@Last Modified:     2024/02/21
@Summary:           The template of custom indicator.
"""

# Design notes:
#   Lane width: 3.75 m; vehicle width: 1.8 m.
#   Having one side of the vehicle 0.975 m away from the lane boundary line
#   is the optimum; the smaller the value, the lower the score.

import math

import numpy as np
import pandas as pd

from common import zip_time_pairs, continuous_group, get_status_active_data
from log import logger


# custom metric codes
class CustomMetric(object):
    """Custom indicator: minimum distance to the nearer lane line while LKA is active.

    The whole pipeline (extract -> analyze -> report) runs from the
    constructor; the outcome is stored in ``self.result``.
    """

    def __init__(self, all_data, case_name):
        """Store the inputs, initialise the result skeleton and run the metric.

        Args:
            all_data: Project data container. This class reads its ``config``,
                ``status_trigger_dict``, ``object_df`` and ``road_mark_df``
                attributes.
            case_name: Case identifier, used only in log messages.
        """
        self.data = all_data
        self.optimal_dict = self.data.config
        self.status_trigger_dict = self.data.status_trigger_dict
        self.case_name = case_name

        self.markline_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])

        self.df = pd.DataFrame()
        self.ego_df = pd.DataFrame()
        self.df_ego = pd.DataFrame()
        self.roadMark_df = pd.DataFrame()

        self.time_list_follow = list()
        self.frame_list_follow = list()

        self.dist_list = list()
        self.dist_deviation_list = list()
        self.dist_deviation_list_full_time = list()

        self.result = {
            "name": "离近侧车道线最小距离",
            "value": [],
            # "weight": [],
            "tableData": {
                "avg": "",  # mean value, or the indicator value
                "max": "",
                "min": ""
            },
            "reportData": {
                "name": "离近侧车道线最小距离(m)",
                # "legend": [],  # with several data series, add one label per
                #                # series, e.g. ["lateral acceleration", "longitudinal acceleration"]
                "data": [],
                "markLine": [],
                "range": [],
            },
            "statusFlag": {}
        }

        self.run()
        print(f"指标01: 离近侧车道线最小距离: {self.result['value']}")

    def data_extract(self):
        """Select the ego rows and lane-marking rows that fall in LKA-active time ranges.

        Sets ``self.df``, ``self.ego_df``, ``self.df_ego``, ``self.roadMark_df``
        and ``self.result['statusFlag']['function_LKA']`` (True when at least
        one ego row lies inside an active range).
        """
        self.df = self.data.object_df
        # Rows with playerId == 1 are treated as the ego vehicle.
        self.ego_df = self.df[self.df.playerId == 1]

        active_time_ranges = self.status_trigger_dict['LKA']['LKA_active_time']
        self.df_ego = get_status_active_data(active_time_ranges, self.ego_df)
        self.roadMark_df = get_status_active_data(active_time_ranges, self.data.road_mark_df)

        self.result['statusFlag']['function_LKA'] = not self.df_ego.empty

    def dist(self, x1, y1, x2, y2):
        """Return the Euclidean distance between points (x1, y1) and (x2, y2)."""
        dis = math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
        return dis

    def Compute_nearby_distance_to_lane_boundary(self, x, width_ego):
        """Return the clearance between the ego side and the nearer lane line.

        Args:
            x: Row exposing ``lateralDist`` (distance to the left line) and
                ``right_lateral_distance`` (distance to the right line; its
                absolute value is used).
            width_ego: Ego vehicle width; half of it is subtracted from the
                smaller of the two distances.
        """
        right_dist = abs(x.right_lateral_distance)
        if x.lateralDist < right_dist:
            return x.lateralDist - width_ego / 2
        return right_dist - width_ego / 2

    def data_analyze(self):
        """Compute the per-frame clearance to the nearer lane line and its minimum.

        Fills ``self.time_list_follow``, ``self.frame_list_follow`` and
        ``self.dist_deviation_list`` (one entry per left-line row) and stores
        the rounded minimum in ``self.result['value']``.

        Raises:
            ValueError: If there is no LKA-active ego/lane-marking data, or no
                valid (non-NaN) clearance sample.
        """
        lane_df = self.roadMark_df
        ego_df = self.df_ego
        if ego_df.empty or lane_df.empty:
            raise ValueError("no LKA-active ego or lane-marking data to analyze")

        # Ego vehicle width.
        width_ego = ego_df['dimY'].values.tolist()[0]

        # Distances to the left (id == 0) and right (id == 2) lane lines. The
        # two frames are aligned purely by position after the index reset.
        left_df = lane_df[lane_df.id == 0].reset_index(drop=True)
        right_df = lane_df[lane_df.id == 2].reset_index(drop=True)
        if left_df.empty:
            raise ValueError("no left lane-line rows in the LKA-active data")
        left_df['right_lateral_distance'] = right_df['lateralDist']

        # Clearance to the nearer lane boundary for every row.
        left_df['nearby_distance_to_lane_boundary'] = left_df.apply(
            lambda row: self.Compute_nearby_distance_to_lane_boundary(row, width_ego), axis=1)
        clearance = left_df['nearby_distance_to_lane_boundary']

        self.time_list_follow = left_df['simTime'].values.tolist()
        self.frame_list_follow = left_df['simFrame'].values.tolist()
        self.dist_deviation_list = clearance.values.tolist()

        # Rows without a matching right-line sample yield NaN; the builtin
        # min() is order-dependent in the presence of NaN, so drop them first.
        valid_clearance = clearance.dropna()
        if valid_clearance.empty:
            raise ValueError("no valid lane-boundary clearance samples")
        self.result['value'] = [round(float(valid_clearance.min()), 3)]

    def markline_statistic(self):
        """Group frames into continuous intervals and append them to ``self.markline_df``.

        NOTE(review): intervals are built from frames whose clearance is > 0
        (and simFrame > 1) — confirm this is the intended marking condition.
        """
        samples_df = pd.DataFrame({
            'simTime': self.time_list_follow,
            'simFrame': self.frame_list_follow,
            'dist_deviation': self.dist_deviation_list,
        })
        samples_df = samples_df[samples_df['simFrame'] > 1]

        marked_df = samples_df[samples_df['dist_deviation'] > 0]
        marked_df = marked_df[['simTime', 'simFrame', 'dist_deviation']]

        # presumably continuous_group returns start/end time and frame columns
        # matching self.markline_df — verify against the helper.
        intervals_df = continuous_group(marked_df)
        intervals_df['type'] = "LKA"
        self.markline_df = pd.concat([self.markline_df, intervals_df], ignore_index=True)

    def report_data_statistic(self):
        """Fill ``tableData`` and ``reportData`` in ``self.result``."""
        valid_values = [v for v in self.dist_deviation_list if not np.isnan(v)]
        table = self.result['tableData']
        if valid_values:
            table['avg'] = f'{np.mean(valid_values):.3f}'
            table['max'] = f'{max(valid_values):.3f}'
            table['min'] = f'{min(valid_values):.3f}'
        else:
            table['avg'] = table['max'] = table['min'] = 0

        # Use the timestamps of the very rows the clearances were computed
        # from; the unfiltered ego time axis can differ in length and would
        # pair values with the wrong times.
        report = self.result['reportData']
        report['data'] = zip_time_pairs(self.time_list_follow, self.dist_deviation_list)

        self.markline_statistic()
        report['markLine'] = self.markline_df.to_dict('records')
        report['range'] = "[0, 0.975]"

    def run(self):
        """Run extract, analyze and report in order.

        Each stage is best-effort: a failure is logged and the following
        stages still run, so ``self.result`` may be only partially filled.
        """
        # NOTE(review): the "ica_distance_deviation" label looks copied from
        # another metric — confirm before changing the log text.
        logger.info(f"[case:{self.case_name}] Custom metric:[ica_distance_deviation:{self.result['name']}] evaluate.")

        stages = (
            ("data extract", self.data_extract),
            ("data analyze", self.data_analyze),
            ("report data statistic", self.report_data_statistic),
        )
        for stage_label, stage in stages:
            try:
                stage()
            except Exception as e:
                # The exception is embedded in the message: passing it as an
                # extra positional argument without a placeholder makes
                # %-style loggers fail while formatting the record.
                logger.error(
                    f"[case:{self.case_name}] Custom metric:{self.result['name']} "
                    f"{stage_label} ERROR! {e!r}")


# if __name__ == "__main__":
#     pass