#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2023 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Data:              2023/08/21
@Last Modified:     2023/08/21
@Summary:           Compliance metrics
"""

import sys

sys.path.append('../common')
sys.path.append('../modules')
sys.path.append('../results')

import numpy as np
import pandas as pd

from data_info import DataInfoList
from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
from scipy.spatial.distance import euclidean

# Minimum number of samples a continuous run must contain before it is
# reported as a violation interval.
CONTINUOUS_FRAME_PERIOD = 13

_OVERSPEED_LAW = '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'

# Built-in deduction classes: (type key, penalty points of the class).
_TYPE_POINTS = (
    ("deduct1", 1),
    ("deduct3", 3),
    ("deduct6", 6),
    ("deduct9", 9),
    ("deduct12", 12),
)

# Built-in report layout:
# (type key, display name, ((metric key, weight-index key, display name), ...)).
_TYPE_LAYOUT = (
    ("deduct1", "轻微违规(1分)", (
        ("overspeed10", "overspeed10Weight", "超速,但未超过10%"),
        ("overspeed10_20", "overspeed1020Weight", "超速10%-20%"),
    )),
    ("deduct3", "中等违规(3分)", (
        ("pressSolidLine", "pressSolidLineWeight", "压实线"),
    )),
    ("deduct6", "危险违规(6分)", (
        ("runRedLight", "runRedLightWeight", "闯红灯"),
        ("overspeed20_50", "overspeed2050Weight", "超速20%-50%"),
    )),
    ("deduct9", "严重违规(9分)", ()),
    ("deduct12", "重大违规(12分)", (
        ("overspeed50", "overspeed50Weight", "超速50%以上"),
    )),
)


class Compliance(object):
    """
    Compliance (traffic-law) metrics for an autonomous-driving test run.

    Attributes:
        roadMark_df: Road-mark data, stored as a DataFrame.
        trafficLight_df: Traffic-light phase data, stored as a DataFrame.
        trafficSignal_df: Traffic-sign / signal data, stored as a DataFrame.
        objState_df: Object state data (ego is ``playerId == 1``).
        violation_df: Accumulated violation intervals found by the metrics.
    """

    def __init__(self, data_processed, custom_data, scoreModel):
        """Cache the processed data frames and the 'compliance' configuration."""
        self.eval_data = pd.DataFrame()
        self.scoreModel = scoreModel

        self.roadMark_df = data_processed.road_mark_df
        self.trafficLight_df = data_processed.traffic_light_df
        self.trafficSignal_df = data_processed.traffic_signal_df
        self.objState_df = data_processed.object_df

        self.violation_df = pd.DataFrame(
            columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'violation'])

        self.illegal_count = 0
        self.penalty_points = 0
        self.penalty_money = 0
        self.warning_count = 0

        self.config = data_processed.config
        compliance_config = self.config.config['compliance']
        self.compliance_config = compliance_config

        # common data
        self.bulitin_metric_list = self.config.builtinMetricList

        # dimension data
        self.weight_custom = compliance_config['weightCustom']
        self.metric_list = compliance_config['metric']
        self.type_list = compliance_config['type']
        self.type_name_dict = compliance_config['typeName']
        self.name_dict = compliance_config['name']
        self.unit_dict = compliance_config['unit']
        self.metric_dict = compliance_config['typeMetricDict']

        # custom metric data
        self.custom_data = custom_data
        self.custom_param_dict = {}

        # score data
        self.weight = compliance_config['weightDimension']
        self.weight_dict = compliance_config['weight']
        self.weight_list = compliance_config['weightList']
        self.weight_type_dict = compliance_config['typeWeight']
        self.weight_type_list = compliance_config['typeWeightList']

        # type dicts
        self.type_illegal_count_dict = {}
        self.type_penalty_points_dict = {}
        self.type_penalty_money_dict = {}
        self.type_warning_count_dict = {}
        self.type_penalty_law_dict = {}

        # metric dicts
        self.metric_illegal_count_dict = {}
        self.metric_penalty_points_dict = {}
        self.metric_penalty_money_dict = {}
        self.metric_warning_count_dict = {}
        self.metric_penalty_law_dict = {}

        # metric lists
        self.metric_illegal_count_list = []
        self.metric_penalty_points_list = []
        self.metric_penalty_money_list = []
        self.metric_warning_count_list = []
        self.metric_penalty_law_list = []

    def _record_violation_spans(self, times, max_gap, label):
        """
        Split ``times`` into continuous runs and log the long ones as violations.

        A new run starts whenever the gap to the previous sample is not
        ``<= max_gap``. Runs with fewer than CONTINUOUS_FRAME_PERIOD samples are
        discarded. Every remaining run is appended to ``self.violation_df`` as
        one row (start_time, end_time, violation=label).

        Args:
            times: List of simulation times of the offending samples.
            max_gap: Largest time gap that still counts as the same run.
            label: Text stored in the 'violation' column.

        Returns:
            List of ``[start_time, end_time]`` pairs, one per reported run.
        """
        runs = []
        current = []
        for i, stamp in enumerate(times):
            if current and not (stamp - times[i - 1] <= max_gap):
                runs.append(current)
                current = []
            current.append(stamp)
        runs.append(current)

        spans = [[run[0], run[-1]] for run in runs if len(run) >= CONTINUOUS_FRAME_PERIOD]
        if spans:
            span_df = pd.DataFrame(spans, columns=['start_time', 'end_time'])
            span_df['violation'] = label
            self.violation_df = pd.concat([self.violation_df, span_df], ignore_index=True)
        return spans

    def press_solid_line(self):
        """
        Count the intervals in which the ego presses a solid lane line.

        Road-mark constants used by the data source:

        #define RDB_ROADMARK_TYPE_NONE           0
        #define RDB_ROADMARK_TYPE_SOLID          1
        #define RDB_ROADMARK_TYPE_BROKEN         2
        #define RDB_ROADMARK_TYPE_CURB           3
        #define RDB_ROADMARK_TYPE_GRASS          4
        #define RDB_ROADMARK_TYPE_BOTDOT         5
        #define RDB_ROADMARK_TYPE_OTHER          6
        #define RDB_ROADMARK_TYPE_SOLID_SOLID    7
        #define RDB_ROADMARK_TYPE_BROKEN_SOLID   8
        #define RDB_ROADMARK_TYPE_SOLID_BROKEN   9
        #define RDB_ROADMARK_TYPE_LANE_CENTER    10

        #define RDB_ROADMARK_COLOR_NONE          0
        #define RDB_ROADMARK_COLOR_WHITE         1
        #define RDB_ROADMARK_COLOR_RED           2
        #define RDB_ROADMARK_COLOR_YELLOW        3
        #define RDB_ROADMARK_COLOR_OTHER         4
        #define RDB_ROADMARK_COLOR_BLUE          5
        #define RDB_ROADMARK_COLOR_GREEN         6

        Returns:
            dict describing the 'pressSolidLine' metric (count, points, fine, law).
        """
        ego_rows = self.objState_df[self.objState_df["playerId"] == 1]
        # Positional access: the first ego row need not carry index label 0.
        half_width = ego_rows["dimY"].iloc[0] / 2

        solid_marks = self.roadMark_df[self.roadMark_df["type"] == 1].reset_index()
        pressed = solid_marks[abs(solid_marks["lateralDist"].values) <= half_width]

        # violation detail table
        spans = self._record_violation_spans(pressed['simTime'].values.tolist(), 1, '压实线')
        press_line_count = len(spans)

        press_line_dict = {
            'metric': 'pressSolidLine',
            'weight': 3,
            'illegal_count': press_line_count,
            'penalty_points': press_line_count * 3,
            'penalty_money': press_line_count * 200,
            'warning_count': 0,
            'penalty_law': '《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。'
        }
        return press_line_dict

    def normalization_processing(self, x):
        """
        Reduce ``x["traffic_light_h_diff"]`` by whole turns until it is below 360.

        Row-wise helper; ``run_red_light`` performs the same reduction
        vectorised. Kept for callers that use it directly.
        """
        difference = x["traffic_light_h_diff"]
        while difference >= 360:
            difference -= 360
        return difference

    def flag_red_traffic_light(self, x, cycleTime, duration_start, duration_end):
        """
        Return 1 if the row's time falls into [duration_start, duration_end).

        The position inside the cycle is the fractional part of
        ``simTime / cycleTime``; otherwise the row's existing
        ``flag_red_traffic_light`` value is returned unchanged.

        Row-wise helper; ``run_red_light`` performs the same test vectorised.
        Kept for callers that use it directly.
        """
        divisor = x['simTime'] / cycleTime
        decimal_part = divisor - int(divisor)
        if duration_start <= decimal_part < duration_end:
            return 1
        return x["flag_red_traffic_light"]

    def run_red_light(self):
        """
        Count the traffic lights the ego passes while they are red.

        For every traffic light the ego samples within 10 (distance units of
        posX/posY) and roughly aligned with / opposite to the light heading are
        selected; a sample is flagged when its position in the light cycle
        falls into a phase of type 1 (red).

        Returns:
            dict describing the 'runRedLight' metric (count, points, fine, law).
        """
        # .copy(): helper columns are added below; avoid writing into a slice.
        ego_df = self.objState_df[
            (self.objState_df.playerId == 1) & (self.objState_df.type == 1)].copy()
        trafficLight_id_list = set(self.trafficLight_df["id"].tolist())
        l_stipulate = 10
        run_red_light_count = 0

        for trafficLight_id in trafficLight_id_list:
            light_position = self.trafficSignal_df[
                self.trafficSignal_df["playerId"] == trafficLight_id]
            light_character = self.trafficLight_df[self.trafficLight_df.id == trafficLight_id]
            if light_position.empty:  # no record of this light in the traffic signals
                continue

            light_x = light_position['posX'].values[0]
            light_y = light_position['posY'].values[0]
            light_heading = light_position['posH'].values[0]
            cycleTime = light_character["cycleTime"].values[0]
            noPhases = int(light_character["noPhases"].values[0])

            ego_df["traffic_light_distance_absolute"] = np.sqrt(
                (ego_df['posX'] - light_x) ** 2 + (ego_df['posY'] - light_y) ** 2)
            # Heading difference in degrees, normalised to [0, 360).
            ego_df["traffic_light_h_diff"] = np.mod(
                (ego_df['posH'] - light_heading).abs() * 57.3, 360)

            h_diff = ego_df['traffic_light_h_diff']
            heading_mask = (((h_diff <= 210) & (h_diff >= 150))
                            | (h_diff <= 30) | (h_diff >= 330))
            ego_near_light = ego_df[
                (ego_df.traffic_light_distance_absolute <= l_stipulate) & heading_mask].copy()
            if ego_near_light.empty:
                continue

            # Is the light currently red?
            ego_near_light["flag_red_traffic_light"] = 0  # not red
            phase_types = light_character['violation'][:noPhases].values
            phase_ends = np.cumsum(light_character['duration'][:noPhases].values)

            # NOTE(review): the phase bounds are compared against the fraction
            # of the cycle, so 'duration' is presumably normalised to the
            # cycle length -- confirm against the data producer.
            cycle_pos = ego_near_light['simTime'] / cycleTime
            cycle_pos = cycle_pos - np.trunc(cycle_pos)
            for number in range(noPhases):
                if phase_types[number] != 1:  # only red phases matter
                    continue
                duration_start = 0 if number == 0 else phase_ends[number - 1]
                duration_end = phase_ends[number]
                in_red = (cycle_pos >= duration_start) & (cycle_pos < duration_end)
                ego_near_light.loc[in_red, "flag_red_traffic_light"] = 1

            # pick the red-light samples
            run_red_light_df = ego_near_light[ego_near_light['flag_red_traffic_light'] == 1]

            # violation detail table
            self._record_violation_spans(run_red_light_df['simTime'].values.tolist(), 1, '闯红灯')

            # red-light count
            if ego_near_light["flag_red_traffic_light"].any():
                run_red_light_count = run_red_light_count + 1

        run_red_light_dict = {
            'metric': 'runRedLight',
            'weight': 6,
            'illegal_count': run_red_light_count,
            'penalty_points': run_red_light_count * 6,
            'penalty_money': run_red_light_count * 200,
            'warning_count': 0,
            'penalty_law': '《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。'
        }
        return run_red_light_dict

    def overspeed(self):
        """
        Count overspeed events near speed-limit signs, bucketed by severity.

        Legal background (Regulation on the Implementation of the Road Traffic
        Safety Law, article 45): a vehicle must not exceed the posted limit.
            [0%, 10%)   warning only, no points
            [10%, 20%)  fine 200, warning, no points
            [20%, 50%)  6 points, fine 200   (expressway / urban expressway)
            [50%, ...)  12 points, fine 2000 (expressway / urban expressway)

        An event is a run of consecutive overspeed samples; it is classified by
        the largest relative excess ``(speed - limit) / limit`` within the run.

        Returns:
            Tuple of four metric dicts: (overspeed10, overspeed10_20,
            overspeed20_50, overspeed50).
        """
        data_ego = self.objState_df[self.objState_df["playerId"] == 1]
        # Positional access: the first ego row need not carry index label 0.
        half_length = data_ego["dimX"].iloc[0] / 2

        speed_limit_sign = self.trafficSignal_df[self.trafficSignal_df["type"] == 274]
        merged = pd.merge(speed_limit_sign, data_ego,
                          on=['simTime', 'simFrame'], how='inner').reset_index()
        # .copy(): a 'speed' column is added below; avoid writing into a slice.
        speed_df = merged[(abs(merged["posX_x"] - merged["posX_y"]) <= 7)
                          & (abs(merged["posY_x"] - merged["posY_y"]) <= half_length)].copy()
        speed_df["speed"] = np.sqrt(speed_df["speedX"] ** 2 + speed_df["speedY"] ** 2) * 3.6

        list_sign = speed_df[speed_df["speed"] > speed_df["value"]]

        # violation detail table
        self._record_violation_spans(list_sign['simTime'].values.tolist(), 2, '超速')

        # Split the overspeed rows into runs of consecutive index labels.
        # Each run must be its own list; reusing one list would merge all events.
        index_groups = []
        current = []
        for idx in list_sign.index.to_list():
            if current and idx - current[-1] != 1:
                index_groups.append(current)
                current = []
            current.append(idx)
        if current:
            index_groups.append(current)

        overspeed_count_0_to_10 = 0
        overspeed_count_10_to_20 = 0
        overspeed_count_20_to_50 = 0
        overspeed_count_50_to_ = 0

        for group in index_groups:
            df_tmp = speed_df.loc[group[0]:group[-1]]
            max_ratio = ((df_tmp["speed"] - df_tmp["value"]) / df_tmp["value"]).max()
            if 0 <= max_ratio < 0.1:
                overspeed_count_0_to_10 += 1
            elif 0.1 <= max_ratio < 0.2:
                overspeed_count_10_to_20 += 1
            elif 0.2 <= max_ratio < 0.5:
                overspeed_count_20_to_50 += 1
            elif max_ratio >= 0.5:
                overspeed_count_50_to_ += 1

        overspeed_0_to_10 = {
            'metric': 'overspeed10',
            'weight': None,
            'illegal_count': overspeed_count_0_to_10,
            'penalty_points': 0,
            'penalty_money': 0,
            'warning_count': overspeed_count_0_to_10,
            'penalty_law': _OVERSPEED_LAW
        }
        overspeed_10_to_20 = {
            'metric': 'overspeed10_20',
            'weight': None,
            'illegal_count': overspeed_count_10_to_20,
            'penalty_points': 0,
            'penalty_money': overspeed_count_10_to_20 * 200,
            'warning_count': overspeed_count_10_to_20,
            'penalty_law': _OVERSPEED_LAW
        }
        overspeed_20_to_50 = {
            'metric': 'overspeed20_50',
            'weight': 6,
            'illegal_count': overspeed_count_20_to_50,
            'penalty_points': overspeed_count_20_to_50 * 6,
            'penalty_money': overspeed_count_20_to_50 * 200,
            'warning_count': 0,
            'penalty_law': _OVERSPEED_LAW
        }
        overspeed_50_to_ = {
            'metric': 'overspeed50',
            'weight': 12,
            'illegal_count': overspeed_count_50_to_,
            'penalty_points': overspeed_count_50_to_ * 12,
            'penalty_money': overspeed_count_50_to_ * 2000,
            'warning_count': 0,
            'penalty_law': _OVERSPEED_LAW
        }
        return overspeed_0_to_10, overspeed_10_to_20, overspeed_20_to_50, overspeed_50_to_

    def score_cal_penalty_points(self, penalty_points):
        """
        Map accumulated penalty points to a 0-100 score.

        0 points -> 100; 12 or more points -> 0; otherwise a linear scale from
        60 (just above 0 points) down to 0 (at 12 points).
        """
        if penalty_points == 0:
            return 100
        if penalty_points >= 12:
            return 0
        return (12 - penalty_points) / 12 * 60

    def time_splice(self, start_time, end_time):
        """Format a time interval as ``[<start>s, <end>s]``."""
        return f"[{start_time}s, {end_time}s]"

    def time_frame_splice(self, start_time, end_time, start_frame, end_frame):
        """Format a time interval followed by its frame range."""
        return f"[{start_time}s, {end_time}s], {start_frame}-{end_frame}"

    def weight_type_cal(self):
        """Return the objective type weights: penalty points normalised to sum ~1."""
        penalty_list = [1, 3, 6, 12]
        sum_penalty = sum(penalty_list)
        return [round(x / sum_penalty, 2) for x in penalty_list]

    def compliance_statistic(self):
        """
        Run all metric analyses and collect the configured ones in a DataFrame.

        Returns:
            DataFrame with one row per metric present in ``self.metric_list``.
        """
        press_line_dict = self.press_solid_line()
        run_red_light_dict = self.run_red_light()
        (overspeed_0_to_10_dict, overspeed_10_to_20_dict,
         overspeed_20_to_50_dict, overspeed_50_dict) = self.overspeed()

        # The row order below is relied upon by downstream reporting.
        ordered_results = (
            ("overspeed10", overspeed_0_to_10_dict),
            ("overspeed10_20", overspeed_10_to_20_dict),
            ("pressSolidLine", press_line_dict),
            ("runRedLight", run_red_light_dict),
            ("overspeed20_50", overspeed_20_to_50_dict),
            ("overspeed50", overspeed_50_dict),
        )
        df_list = [result for metric, result in ordered_results if metric in self.metric_list]
        return pd.DataFrame(df_list)

    def compliance_score(self):
        """
        Compute the overall compliance score and the per-type scores.

        Side effects: fills the ``metric_*_dict`` and ``type_illegal_count_dict``
        attributes, sets ``illegal_count`` and, with objective weighting,
        overwrites ``weight_type_list`` / ``weight_type_dict`` / ``weight_dict``.

        Returns:
            (score_compliance, score_type_dict)
        """
        score_type_dict = {}

        compliance_df = self.compliance_statistic()
        self.illegal_count = int(compliance_df['illegal_count'].sum())

        by_metric = compliance_df.set_index('metric').to_dict()
        self.metric_penalty_points_dict = by_metric['penalty_points']
        self.metric_illegal_count_dict = by_metric['illegal_count']
        self.metric_penalty_money_dict = by_metric['penalty_money']
        self.metric_warning_count_dict = by_metric['warning_count']
        self.metric_penalty_law_dict = by_metric['penalty_law']

        for type_key, points in _TYPE_POINTS:
            if type_key not in self.type_list:
                continue
            mask = compliance_df['weight'] == points
            if type_key == "deduct1":
                # Warning-only metrics carry no weight; they belong to the
                # lightest class only, otherwise they are counted in every class.
                mask = mask | compliance_df['weight'].isna()
            type_df = compliance_df[mask]
            score_type_dict[type_key] = self.score_cal_penalty_points(
                type_df['penalty_points'].sum())
            self.type_illegal_count_dict[type_key] = type_df['illegal_count'].sum()

        weight_type_list = self.weight_type_cal()
        weight_dict = {
            "overspeed10": 0.5,
            "overspeed10_20": 0.5,
            "pressSolidLine": 1.0,
            "runRedLight": 0.5,
            "overspeed20_50": 0.5,
            "overspeed50": 1.0
        }
        type_list = ["deduct1", "deduct3", "deduct6", "deduct12"]

        if not self.weight_custom:  # objective weighting
            self.weight_type_list = weight_type_list
            self.weight_type_dict = dict(zip(type_list, weight_type_list))
            self.weight_dict = weight_dict

        # NOTE(review): self.penalty_points is never updated in this class, so
        # the first branch cannot trigger as the code stands.
        if self.penalty_points >= 12:
            score_compliance = 0
        elif sum(score_type_dict.values()) / len(score_type_dict) == 100:
            score_compliance = 100
        else:
            # A clean class contributes 80, not 100, once any class has a violation.
            score_type_tmp = [80 if x == 100 else x for x in score_type_dict.values()]
            score_compliance = np.dot(self.weight_type_list, score_type_tmp)
            score_compliance = round(score_compliance, 2)

        print("\n[合规性表现及得分情况]")
        print(f"合规性得分为:{score_compliance:.2f}分。")
        print(f"合规性各分组得分为:{score_type_dict}。")
        print(f"合规性各分组权重为:{self.weight_type_list}。")
        return score_compliance, score_type_dict

    def _get_weight_distribution(self):
        """Build the weight-distribution section of the report from the config."""
        weight_distribution = {}
        weight_distribution["name"] = self.config.dimension_name["compliance"]

        for type_key in self.type_list:
            type_weight_indexes_dict = {
                key: f"{self.name_dict[key]}({value * 100:.2f}%)"
                for key, value in self.weight_dict.items()
                if key in self.metric_dict[type_key]
            }
            weight_distribution[type_key] = {
                "weight": f"{self.type_name_dict[type_key]}({self.weight_type_dict[type_key] * 100:.2f}%)",
                "indexes": type_weight_indexes_dict
            }
        return weight_distribution

    def compliance_weight_distribution(self):
        """Build the weight-distribution section using the built-in layout."""
        weight_distribution = {}
        weight_distribution["name"] = "合规性"

        for type_key, type_label, metrics in _TYPE_LAYOUT:
            if type_key not in self.type_list:
                continue
            indexes = {
                index_key: f"{label}({self.weight_dict[metric] * 100:.2f}%)"
                for metric, index_key, label in metrics
                if metric in self.metric_list
            }
            # NOTE(review): deduct9 reads the deduct6 weight (kept as in the
            # original; objective weighting defines no 'deduct9' weight).
            weight_key = "deduct6" if type_key == "deduct9" else type_key
            weight_distribution[type_key] = {
                f"{type_key}Weight": f"{type_label}({self.weight_type_dict[weight_key] * 100:.2f}%)",
                "indexes": indexes
            }
        return weight_distribution

    def _metric_detail(self, display_name, metric):
        """Return the report entry (count, points, fine, legal basis) of one metric."""
        return {
            "name": display_name,
            "times": self.metric_illegal_count_dict[metric],
            "deductPoints": self.metric_penalty_points_dict[metric],
            "fine": self.metric_penalty_money_dict[metric],
            "basis": self.metric_penalty_law_dict[metric]
        }

    def report_statistic(self):
        """
        Assemble the compliance section of the evaluation report.

        Returns:
            dict with score, grade, descriptions, per-type details and the
            list of violation intervals.
        """
        score_compliance, score_type_dict = self.compliance_score()
        grade_compliance = score_grade(score_compliance)

        score_compliance = int(score_compliance) if int(score_compliance) == score_compliance else score_compliance
        score_type = [int(n) if int(n) == n else n for n in score_type_dict.values()]

        weight_distribution = self._get_weight_distribution()

        cnt1 = self.illegal_count

        if grade_compliance == '优秀':
            comp_description1 = '车辆在本轮测试中无违反交通法规行为;'
        elif grade_compliance == '良好':
            comp_description1 = f'车辆在本轮测试中共发生{cnt1}次违反交通法规⾏为;'
        elif grade_compliance == '一般':
            comp_description1 = f'车辆在本轮测试中共有{cnt1}次违反交通法规⾏为,需要提高算法在合规性上的表现;'
        elif grade_compliance == '较差':
            comp_description1 = f'车辆在本轮测试中共有{cnt1}次违反交通法规⾏为,需要提高算法在合规性上的表现;'
        else:
            # Unknown grade label: keep the report buildable instead of
            # failing on an unbound variable.
            comp_description1 = ''

        if cnt1 == 0:
            comp_description2 = "车辆在该用例中无违反交通法规行为,算法表现良好。"
        else:
            str_illegel_type = ','.join(
                f'{self.type_name_dict[type_key]}行为{self.type_illegal_count_dict[type_key]}次'
                for type_key in self.type_list
                if self.type_illegal_count_dict[type_key] > 0
            )
            comp_description2 = f"车辆在该用例共违反交通法规{cnt1}次。其中{str_illegel_type}。违规行为详情见附录C。"

        # data for violations table
        if not self.violation_df.empty:
            self.violation_df['time'] = self.violation_df.apply(
                lambda row: self.time_splice(row['start_time'], row['end_time']), axis=1)
            violations_slices = self.violation_df[['time', 'violation']].to_dict('records')
        else:
            violations_slices = []

        # Generic per-type details from the configuration ...
        deductPoint_dict = {}
        for type_key in self.type_list:
            deductPoint_dict[type_key] = {
                "name": self.type_name_dict[type_key],
                "score": score_type_dict[type_key],
                "indexes": {
                    metric: self._metric_detail(f"{self.name_dict[metric]}", metric)
                    for metric in self.metric_list
                }
            }

        # ... overridden by the built-in layout for the built-in types.
        for type_key, type_label, metrics in _TYPE_LAYOUT:
            if type_key not in self.type_list:
                continue
            if type_key == "deduct9":
                # No 9-point metric exists yet: placeholder entry, fixed score.
                indexes = {}
                if "xx" in self.metric_list:
                    indexes['xx'] = {
                        "name": "-",
                        "times": "0",
                        "deductPoints": "0",
                        "fine": "0",
                        "basis": "-"
                    }
                type_score = 100
            else:
                indexes = {
                    metric: self._metric_detail(label, metric)
                    for metric, _, label in metrics
                    if metric in self.metric_list
                }
                type_score = score_type_dict[type_key]
            deductPoint_dict[type_key] = {
                "name": type_label,
                "score": type_score,
                "indexes": indexes
            }

        report_dict = {
            "name": "合规性",
            "weight": f"{self.weight * 100:.2f}%",
            "weightDistribution": weight_distribution,
            "score": score_compliance,
            "level": grade_compliance,
            'score_type': score_type,
            'illegalCount': self.illegal_count,
            "description1": comp_description1,
            "description2": comp_description2,
            "details": deductPoint_dict,
            "violations": violations_slices
        }
        return report_dict

    def get_eval_data(self):
        """Return the evaluation data frame (empty unless filled elsewhere)."""
        df = self.eval_data
        return df