123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/08/21
- @Last Modified: 2023/08/21
- @Summary: Compliance metrics
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import numpy as np
- import pandas as pd
- from data_info import DataInfoList
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
- from scipy.spatial.distance import euclidean
- class Compliance(object):
- """
- Class for achieving compliance metrics for autonomous driving.
- Attributes:
- droadMark_df: Roadmark data, stored in dataframe format.
- """
- def __init__(self, data_processed, custom_data, scoreModel):
- self.eval_data = pd.DataFrame()
- self.scoreModel = scoreModel
- self.roadMark_df = data_processed.road_mark_df
- self.trafficLight_df = data_processed.traffic_light_df
- self.trafficSignal_df = data_processed.traffic_signal_df
- self.objState_df = data_processed.object_df
- self.violation_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'violation'])
- self.illegal_count = 0
- self.penalty_points = 0
- self.penalty_money = 0
- self.warning_count = 0
- self.config = data_processed.config
- compliance_config = self.config.config['compliance']
- self.compliance_config = compliance_config
- # common data
- self.bulitin_metric_list = self.config.builtinMetricList
- # dimension data
- self.weight_custom = compliance_config['weightCustom']
- self.metric_list = compliance_config['metric']
- self.type_list = compliance_config['type']
- self.type_name_dict = compliance_config['typeName']
- self.name_dict = compliance_config['name']
- self.unit_dict = compliance_config['unit']
- self.metric_dict = compliance_config['typeMetricDict']
- # custom metric data
- # self.customMetricParam = compliance_config['customMetricParam']
- # self.custom_metric_list = list(self.customMetricParam.keys())
- self.custom_data = custom_data
- self.custom_param_dict = {}
- # score data
- self.weight = compliance_config['weightDimension']
- self.weight_dict = compliance_config['weight']
- self.weight_list = compliance_config['weightList']
- self.weight_type_dict = compliance_config['typeWeight']
- self.weight_type_list = compliance_config['typeWeightList']
- # type dicts
- self.type_illegal_count_dict = {}
- self.type_penalty_points_dict = {}
- self.type_penalty_money_dict = {}
- self.type_warning_count_dict = {}
- self.type_penalty_law_dict = {}
- # metric dicts
- self.metric_illegal_count_dict = {}
- self.metric_penalty_points_dict = {}
- self.metric_penalty_money_dict = {}
- self.metric_warning_count_dict = {}
- self.metric_penalty_law_dict = {}
- # self.metric_illegal_list = []
- self.metric_illegal_count_list = []
- self.metric_penalty_points_list = []
- self.metric_penalty_money_list = []
- self.metric_warning_count_list = []
- self.metric_penalty_law_list = []
- def press_solid_line(self):
- """
- #define RDB_ROADMARK_TYPE_NONE 0
- #define RDB_ROADMARK_TYPE_SOLID 1
- #define RDB_ROADMARK_TYPE_BROKEN 2
- #define RDB_ROADMARK_TYPE_CURB 3
- #define RDB_ROADMARK_TYPE_GRASS 4
- #define RDB_ROADMARK_TYPE_BOTDOT 5
- #define RDB_ROADMARK_TYPE_OTHER 6
- #define RDB_ROADMARK_TYPE_SOLID_SOLID 7
- #define RDB_ROADMARK_TYPE_BROKEN_SOLID 8
- #define RDB_ROADMARK_TYPE_SOLID_BROKEN 9
- #define RDB_ROADMARK_TYPE_LANE_CENTER 10
- #define RDB_ROADMARK_COLOR_NONE 0
- #define RDB_ROADMARK_COLOR_WHITE 1
- #define RDB_ROADMARK_COLOR_RED 2
- #define RDB_ROADMARK_COLOR_YELLOW 3
- #define RDB_ROADMARK_COLOR_OTHER 4
- #define RDB_ROADMARK_COLOR_BLUE 5
- #define RDB_ROADMARK_COLOR_GREEN 6
- """
- # Dimy = self.objState_df[self.objState_df["id"] == 1]["dimY"][0] / 2
- Dimy = self.objState_df[self.objState_df["playerId"] == 1]["dimY"][0] / 2
- dist_line = self.roadMark_df[self.roadMark_df["type"] == 1]
- dist_line = dist_line.reset_index()
- dist_press = dist_line[abs(dist_line["lateralDist"].values) <= Dimy]
- # 违规行为详情表统计
- t_list = dist_press['simTime'].values.tolist()
- f_list = dist_press['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(f_list)):
- if not sub_group_time or t_list[i] - t_list[i - 1] <= 1:
- sub_group_time.append(t_list[i])
- sub_group_frame.append(f_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [t_list[i]]
- sub_group_frame = [f_list[i]]
- CONTINUOUS_FRAME_PERIOD = 13
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= CONTINUOUS_FRAME_PERIOD]
- group_frame = [g for g in group_frame if len(g) >= CONTINUOUS_FRAME_PERIOD]
- # group_time = [g for g in group_time if g[-1]-g[0] >= 1]
- # group_frame = [g for g in group_frame if g[-1]-g[0] >= 13]
- press_line_count = len(group_time)
- # 输出图表值
- press_line_time = [[g[0], g[-1]] for g in group_time]
- press_line_frame = [[g[0], g[-1]] for g in group_frame]
- if press_line_time:
- time_df = pd.DataFrame(press_line_time, columns=['start_time', 'end_time'])
- # frame_df = pd.DataFrame(press_line_frame, columns=['start_frame', 'end_frame'])
- time_df['violation'] = '压实线'
- # frame_df['violation'] = '压实线'
- # self.violation_df = pd.concat([self.violation_df, time_df, frame_df], ignore_index=True)
- self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
- # self.violation_df = pd.concat([self.violation_df, frame_df], ignore_index=True)
- warning_count = 0
- press_line_dict = {
- 'metric': 'pressSolidLine',
- 'weight': 3,
- 'illegal_count': press_line_count,
- 'penalty_points': press_line_count * 3,
- 'penalty_money': press_line_count * 200,
- 'warning_count': warning_count,
- 'penalty_law': '《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。'
- }
- return press_line_dict
- def normalization_processing(self, x):
- difference = x["traffic_light_h_diff"]
- while (difference >= 360):
- difference -= 360
- return difference
- def flag_red_traffic_light(self, x, cycleTime, duration_start, duration_end):
- divisor = x['simTime'] / cycleTime
- decimal_part = divisor - int(divisor)
- # decimal_part = divisor % 1
- if duration_start <= decimal_part < duration_end:
- return 1
- else:
- return x["flag_red_traffic_light"]
- def run_red_light(self):
- """
- """
- ego_df = self.objState_df[(self.objState_df.playerId == 1) & (self.objState_df.type == 1)]
- trafficLight_id_list = set(self.trafficLight_df["id"].tolist())
- l_stipulate = 10
- run_red_light_count = 0
- for trafficLight_id in trafficLight_id_list:
- trafficLight_position = self.trafficSignal_df[self.trafficSignal_df["playerId"] == trafficLight_id]
- trafficLight_character = self.trafficLight_df[self.trafficLight_df.id == trafficLight_id]
- if trafficLight_position.empty: # trafficSign中没有记录
- continue
- trafficLight_position = trafficLight_position.iloc[:1, :]
- trafficLight_position_x = trafficLight_position['posX'].values[0]
- trafficLight_position_y = trafficLight_position['posY'].values[0]
- trafficLight_position_heading = trafficLight_position['posH'].values[0]
- cycleTime = trafficLight_character["cycleTime"].values[0]
- noPhases = trafficLight_character["noPhases"].values[0]
- ego_df["traffic_light_distance_absolute"] = ego_df[['posX', 'posY']].apply( \
- lambda x: euclidean((trafficLight_position_x, trafficLight_position_y), (x['posX'], x['posY'])), axis=1)
- # ego_df["traffic_light_distance_absolute"] = ego_df.apply(lambda x: euclidean((trafficLight_position_x, trafficLight_position_y), (x['posX'], x['posY'])), axis=1)
- ego_df["traffic_light_h_diff"] = ego_df.apply(
- lambda x: abs(x['posH'] - trafficLight_position_heading) * 57.3, axis=1)
- ego_df["traffic_light_h_diff"] = ego_df.apply(
- lambda x: self.normalization_processing(x), axis=1).copy() # 归一化到[0,360)之间
- mask_trafftic_light = ((ego_df['traffic_light_h_diff'] <= 210) & (
- ego_df['traffic_light_h_diff'] >= 150)) | (ego_df['traffic_light_h_diff'] <= 30) | (
- ego_df['traffic_light_h_diff'] >= 330)
- ego_near_light = ego_df[(ego_df.traffic_light_distance_absolute <= l_stipulate) & mask_trafftic_light]
- if ego_near_light.empty:
- continue
- """ 当前是否为红灯 """
- ego_near_light["flag_red_traffic_light"] = 0 # 不是红灯
- type_list = trafficLight_character['violation'][:noPhases]
- duration = trafficLight_character['duration'][:noPhases]
- duration_correct = [0] * noPhases
- for number in range(noPhases):
- duration_correct[number] = sum(duration[:number + 1])
- type_current = type_list.values[number]
- if type_current == 1: # 当前duration是红灯
- if number == 0:
- duration_start = 0
- else:
- duration_start = duration_correct[number - 1]
- duration_end = duration_correct[number]
- ego_near_light["flag_red_traffic_light"] = ego_near_light[
- ['simTime', 'flag_red_traffic_light']].apply(
- lambda x: self.flag_red_traffic_light(x, cycleTime, duration_start, duration_end), axis=1)
- # 挑出闯红灯数据
- run_red_light_df = ego_near_light[ego_near_light['flag_red_traffic_light'] == 1]
- # 违规行为详情表统计
- t_list = run_red_light_df['simTime'].values.tolist()
- f_list = run_red_light_df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(f_list)):
- if not sub_group_time or t_list[i] - t_list[i - 1] <= 1:
- sub_group_time.append(t_list[i])
- sub_group_frame.append(f_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [t_list[i]]
- sub_group_frame = [f_list[i]]
- CONTINUOUS_FRAME_PERIOD = 13
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= CONTINUOUS_FRAME_PERIOD]
- group_frame = [g for g in group_frame if len(g) >= CONTINUOUS_FRAME_PERIOD]
- run_red_light_time = [[g[0], g[-1]] for g in group_time]
- run_red_light_frame = [[g[0], g[-1]] for g in group_frame]
- if run_red_light_time:
- time_df = pd.DataFrame(run_red_light_time, columns=['start_time', 'end_time'])
- # frame_df = pd.DataFrame(run_red_light_frame, columns=['start_frame', 'end_frame'])
- time_df['violation'] = '闯红灯'
- # frame_df['violation'] = '闯红灯'
- self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
- # self.violation_df = pd.concat([self.violation_df, frame_df], ignore_index=True)
- # 闯红灯次数统计
- if ego_near_light["flag_red_traffic_light"].any() == 1:
- run_red_light_count = run_red_light_count + 1
- run_red_light_dict = {
- 'metric': 'runRedLight',
- 'weight': 6,
- 'illegal_count': run_red_light_count,
- 'penalty_points': run_red_light_count * 6,
- 'penalty_money': run_red_light_count * 200,
- 'warning_count': 0,
- 'penalty_law': '《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。'
- }
- return run_red_light_dict
- def overspeed(self):
- """
- 《中华人民共和国道路交通安全法实施条例》第四十五条:机动车在道路上行驶不得超过限速标志、标线标明的速度;
- 1、高速、城市快速路超速超过规定时速10%以内,处以警告,不扣分;
- 2、普通私家车高速超过规定时速10%以上未达20%的,处以200元罚款,记3分,普通私家车在除高速公路、城市快速路外的道路,超速20%以上未达到50%,会一次被记3分;
- 3、超过规定时速20%以上未达50%的,处以200元罚款,记6分,或在普通道路上超速50%以上,都会一次被记6分。;
- 4、超过规定时速50%以上的,可吊销驾驶证并罚款2000元,计12分。
- [0, 10) 0,0,1
- [10, 20) 0,200,1
- 高速、城市快速路:[20, 50) 6,200
- 高速、城市快速路:[50,)12,2000
- 高速、城市以外:[20, 50)3,200
- 高速、城市以外:[50,)6,1000-2000
- """
- Dimx = self.objState_df[self.objState_df["playerId"] == 1]["dimX"][0] / 2
- data_ego = self.objState_df[self.objState_df["playerId"] == 1]
- speed_limit_sign = self.trafficSignal_df[self.trafficSignal_df["type"] == 274]
- same_df_rate = pd.merge(speed_limit_sign, data_ego, on=['simTime', 'simFrame'], how='inner')
- same_df_rate = same_df_rate.reset_index()
- speed_df = same_df_rate[(abs(same_df_rate["posX_x"] - same_df_rate["posX_y"]) <= 7) & (
- abs(same_df_rate["posY_x"] - same_df_rate["posY_y"]) <= Dimx)]
- speed_df["speed"] = np.sqrt(speed_df["speedX"] ** 2 + speed_df["speedY"] ** 2) * 3.6
- # speed_df["value"] = 10
- # same_df_rate["value"] = 10
- list_sign = speed_df[speed_df["speed"] > speed_df["value"]]
- # 违规行为详情表统计
- t_list = list_sign['simTime'].values.tolist()
- f_list = list_sign['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(f_list)):
- if not sub_group_time or t_list[i] - t_list[i - 1] <= 2:
- sub_group_time.append(t_list[i])
- sub_group_frame.append(f_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [t_list[i]]
- sub_group_frame = [f_list[i]]
- CONTINUOUS_FRAME_PERIOD = 13
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= CONTINUOUS_FRAME_PERIOD]
- group_frame = [g for g in group_frame if len(g) >= CONTINUOUS_FRAME_PERIOD]
- # 输出图表值
- overspeed_time = [[g[0], g[-1]] for g in group_time]
- overspeed_frame = [[g[0], g[-1]] for g in group_frame]
- if overspeed_time:
- time_df = pd.DataFrame(overspeed_time, columns=['start_time', 'end_time'])
- # frame_df = pd.DataFrame(overspeed_frame, columns=['start_frame', 'end_frame'])
- time_df['violation'] = '超速'
- # frame_df['violation'] = '超速'
- self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
- # self.violation_df = pd.concat([self.violation_df, frame_df], ignore_index=True)
- # list_sign = list_sign.reset_index()
- index_sign = list_sign.index.to_list()
- speed_df["flag_press"] = speed_df["simFrame"].apply(lambda x: 1 if x in list_sign["simFrame"] else 0)
- speed_df["diff_press"] = speed_df["flag_press"].diff()
- # overrate_count = speed_df[speed_df["diff_press"] == -1]["diff_press"].count()
- index_list = []
- subindex_list = []
- for i in range(len(index_sign)):
- if not subindex_list or index_sign[i] - index_sign[i - 1] == 1:
- subindex_list.append(index_sign[i])
- else:
- index_list.append(subindex_list)
- subindex_list.append(index_sign[i])
- index_list.append(subindex_list)
- overspeed_count_0_to_10 = 0
- overspeed_count_10_to_20 = 0
- overspeed_count_20_to_50 = 0
- overspeed_count_50_to_ = 0
- if index_list[0]:
- for i in range(len(index_list)):
- left = index_list[i][0]
- right = index_list[i][-1]
- df_tmp = speed_df.loc[left:right + 1]
- max_ratio = ((df_tmp["speed"] - df_tmp["value"]) / df_tmp["value"]).max()
- if max_ratio >= 0 and max_ratio < 0.1:
- overspeed_count_0_to_10 += 1
- elif max_ratio >= 0.1 and max_ratio < 0.2:
- overspeed_count_10_to_20 += 1
- elif max_ratio >= 0.2 and max_ratio < 0.5:
- overspeed_count_20_to_50 += 1
- elif max_ratio >= 0.5:
- overspeed_count_50_to_ += 1
- """
- [0, 10) 0,0,1
- [10, 20) 0,200,1
- 高速、城市快速路:[20, 50) 6,200
- 高速、城市快速路:[50,)12,2000
- """
- overspeed_0_to_10 = {
- # 'metric': 'overspeed_0_to_10',
- 'metric': 'overspeed10',
- 'weight': None,
- 'illegal_count': overspeed_count_0_to_10,
- 'penalty_points': 0,
- 'penalty_money': 0,
- 'warning_count': overspeed_count_0_to_10,
- 'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
- }
- overspeed_10_to_20 = {
- # 'metric': 'overspeed_10_to_20',
- 'metric': 'overspeed10_20',
- 'weight': None,
- 'illegal_count': overspeed_count_10_to_20,
- 'penalty_points': 0,
- 'penalty_money': overspeed_count_10_to_20 * 200,
- 'warning_count': overspeed_count_10_to_20,
- 'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
- }
- overspeed_20_to_50 = {
- # 'metric': 'overspeed_20_to_50',
- 'metric': 'overspeed20_50',
- 'weight': 6,
- 'illegal_count': overspeed_count_20_to_50,
- 'penalty_points': overspeed_count_20_to_50 * 6,
- 'penalty_money': overspeed_count_20_to_50 * 200,
- 'warning_count': 0,
- 'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
- }
- overspeed_50_to_ = {
- # 'metric': 'overspeed_50_to_',
- 'metric': 'overspeed50',
- 'weight': 12,
- 'illegal_count': overspeed_count_50_to_,
- 'penalty_points': overspeed_count_50_to_ * 12,
- 'penalty_money': overspeed_count_50_to_ * 2000,
- 'warning_count': 0,
- 'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
- }
- return overspeed_0_to_10, overspeed_10_to_20, overspeed_20_to_50, overspeed_50_to_
- def score_cal_penalty_points(self, penalty_points):
- """
- # 1_results: 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 100
- # 3_results: 0, 15, 30, 45, 100
- # 6_results: 0, 30, 100
- # 9_results: 0, 15, 100
- """
- if penalty_points == 0:
- score = 100
- elif penalty_points >= 12:
- score = 0
- else:
- score = (12 - penalty_points) / 12 * 60
- return score
- def time_splice(self, start_time, end_time):
- str_time = f"[{start_time}s, {end_time}s]"
- return str_time
- def time_frame_splice(self, start_time, end_time, start_frame, end_frame):
- str_time = f"[{start_time}s, {end_time}s], {start_frame}-{end_frame}"
- return str_time
- def weight_type_cal(self):
- # penalty_list = [1, 3, 6, 9, 12]
- penalty_list = [1, 3, 6, 12]
- sum_penalty = sum(penalty_list)
- weight_type_list = [round(x / sum_penalty, 2) for x in penalty_list]
- return weight_type_list
- def compliance_statistic(self):
- # metric analysis
- press_line_dict = self.press_solid_line()
- run_red_light_dict = self.run_red_light()
- overspeed_0_to_10_dict, overspeed_10_to_20_dict, overspeed_20_to_50_dict, overspeed_50_dict = self.overspeed()
- df_list = []
- if "overspeed10" in self.metric_list:
- df_list.append(overspeed_0_to_10_dict)
- if "overspeed10_20" in self.metric_list:
- df_list.append(overspeed_10_to_20_dict)
- if "pressSolidLine" in self.metric_list:
- df_list.append(press_line_dict)
- if "runRedLight" in self.metric_list:
- df_list.append(run_red_light_dict)
- if "overspeed20_50" in self.metric_list:
- df_list.append(overspeed_20_to_50_dict)
- if "overspeed50" in self.metric_list:
- df_list.append(overspeed_50_dict)
- # generate dataframe and dicts
- compliance_df = pd.DataFrame(df_list)
- return compliance_df
- def compliance_score(self):
- """
- 待优化
- 可以重新构建数据结构,每个判断函数的返回类型为dict
- 在多指标统计扣分、罚款时,可以将所有字典合并为一个dataframe,然后求和
- """
- # initialization
- score_type_dict = {}
- compliance_df = self.compliance_statistic()
- self.illegal_count = int(compliance_df['illegal_count'].sum())
- # self.penalty_points = compliance_df['penalty_points'].sum()
- # self.penalty_money = compliance_df['penalty_money'].sum()
- # self.warning_count = compliance_df['warning_count'].sum()
- # self.metric_illegal_list = compliance_df[compliance_df['illegal_count'] > 0]['metric'].tolist()
- # self.metric_illegal_count_list = compliance_df['illegal_count'].values.tolist()
- #
- # self.penalty_points_list = compliance_df['penalty_points'].values.tolist()
- # self.penalty_money_list = compliance_df['penalty_money'].values.tolist()
- # self.warning_count_list = compliance_df['warning_count'].values.tolist()
- self.metric_penalty_points_dict = compliance_df.set_index('metric').to_dict()['penalty_points']
- self.metric_illegal_count_dict = compliance_df.set_index('metric').to_dict()['illegal_count']
- self.metric_penalty_money_dict = compliance_df.set_index('metric').to_dict()['penalty_money']
- self.metric_warning_count_dict = compliance_df.set_index('metric').to_dict()['warning_count']
- self.metric_penalty_law_dict = compliance_df.set_index('metric').to_dict()['penalty_law']
- # deduct1
- if "deduct1" in self.type_list:
- deduct1_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 1)]
- penalty_points_1 = deduct1_df['penalty_points'].sum()
- illegal_count_1 = deduct1_df['illegal_count'].sum()
- score_type_dict["deduct1"] = self.score_cal_penalty_points(penalty_points_1)
- self.type_illegal_count_dict["deduct1"] = illegal_count_1
- # deduct3
- if "deduct3" in self.type_list:
- deduct3_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 3)]
- penalty_points_3 = deduct3_df['penalty_points'].sum()
- illegal_count_3 = deduct3_df['illegal_count'].sum()
- score_type_dict["deduct3"] = self.score_cal_penalty_points(penalty_points_3)
- self.type_illegal_count_dict["deduct3"] = illegal_count_3
- # deduct6
- if "deduct6" in self.type_list:
- deduct6_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 6)]
- penalty_points_6 = deduct6_df['penalty_points'].sum()
- illegal_count_6 = deduct6_df['illegal_count'].sum()
- score_type_dict["deduct6"] = self.score_cal_penalty_points(penalty_points_6)
- self.type_illegal_count_dict["deduct6"] = illegal_count_6
- # deduct9
- if "deduct9" in self.type_list:
- deduct9_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 9)]
- penalty_points_9 = deduct9_df['penalty_points'].sum()
- illegal_count_9 = deduct9_df['illegal_count'].sum()
- score_type_dict["deduct9"] = self.score_cal_penalty_points(penalty_points_9)
- self.type_illegal_count_dict["deduct9"] = illegal_count_9
- # deduct12
- if "deduct12" in self.type_list:
- deduct12_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 12)]
- penalty_points_12 = deduct12_df['penalty_points'].sum()
- illegal_count_12 = deduct12_df['illegal_count'].sum()
- score_type_dict["deduct12"] = self.score_cal_penalty_points(penalty_points_12)
- self.type_illegal_count_dict["deduct12"] = illegal_count_12
- weight_type_list = self.weight_type_cal()
- weight_dict = {
- "overspeed10": 0.5,
- "overspeed10_20": 0.5,
- "pressSolidLine": 1.0,
- "runRedLight": 0.5,
- "overspeed20_50": 0.5,
- "overspeed50": 1.0
- }
- type_list = ["deduct1", "deduct3", "deduct6", "deduct12"]
- if not self.weight_custom: # 客观赋权
- self.weight_type_list = weight_type_list
- self.weight_type_dict = {key: value for key, value in zip(type_list, weight_type_list)}
- self.weight_dict = weight_dict
- if self.penalty_points >= 12:
- score_compliance = 0
- elif sum(score_type_dict.values()) / len(score_type_dict) == 100:
- score_compliance = 100
- else:
- score_type_tmp = [80 if x == 100 else x for key, x in score_type_dict.items()]
- score_compliance = np.dot(self.weight_type_list, score_type_tmp)
- score_compliance = round(score_compliance, 2)
- print("\n[合规性表现及得分情况]")
- print(f"合规性得分为:{score_compliance:.2f}分。")
- print(f"合规性各分组得分为:{score_type_dict}。")
- print(f"合规性各分组权重为:{self.weight_type_list}。")
- # print(f"合规性各指标违规次数为:\n{count_metric}。")
- # print(f"违法总次数为:{self.illegal_count}。")
- # print(f"违法扣分总数为:{self.penalty_points}。")
- # print(f"罚款总金额为:{self.penalty_money}。")
- # print(f"警告总次数为:{self.warning_count}。")
- # print(compliance_df)
- return score_compliance, score_type_dict
- def _get_weight_distribution(self):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = self.config.dimension_name["compliance"]
- for type in self.type_list:
- type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
- self.weight_dict.items() if
- key in self.metric_dict[type]}
- weight_distribution_type = {
- "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
- "indexes": type_weight_indexes_dict
- }
- weight_distribution[type] = weight_distribution_type
- return weight_distribution
- def compliance_weight_distribution(self):
- # get weight distribution
- weight_distribution = {}
- weight_distribution["name"] = "合规性"
- if "deduct1" in self.type_list:
- deduct1_weight_indexes_dict = {}
- if "overspeed10" in self.metric_list:
- deduct1_weight_indexes_dict[
- "overspeed10Weight"] = f"超速,但未超过10%({self.weight_dict['overspeed10'] * 100:.2f}%)"
- if "overspeed10_20" in self.metric_list:
- deduct1_weight_indexes_dict[
- "overspeed1020Weight"] = f"超速10%-20%({self.weight_dict['overspeed10_20'] * 100:.2f}%)"
- weight_distribution['deduct1'] = {
- "deduct1Weight": f"轻微违规(1分)({self.weight_type_dict['deduct1'] * 100:.2f}%)",
- "indexes": deduct1_weight_indexes_dict
- }
- if "deduct3" in self.type_list:
- deduct3_weight_indexes_dict = {}
- if "pressSolidLine" in self.metric_list:
- deduct3_weight_indexes_dict[
- "pressSolidLineWeight"] = f"压实线({self.weight_dict['pressSolidLine'] * 100:.2f}%)"
- weight_distribution['deduct3'] = {
- "deduct3Weight": f"中等违规(3分)({self.weight_type_dict['deduct3'] * 100:.2f}%)",
- "indexes": deduct3_weight_indexes_dict
- }
- if "deduct6" in self.type_list:
- deduct6_weight_indexes_dict = {}
- if "runRedLight" in self.metric_list:
- deduct6_weight_indexes_dict["runRedLightWeight"] = f"闯红灯({self.weight_dict['runRedLight'] * 100:.2f}%)"
- if "overspeed20_50" in self.metric_list:
- deduct6_weight_indexes_dict[
- "overspeed2050Weight"] = f"超速20%-50%({self.weight_dict['overspeed20_50'] * 100:.2f}%)"
- weight_distribution['deduct6'] = {
- "deduct6Weight": f"危险违规(6分)({self.weight_type_dict['deduct6'] * 100:.2f}%)",
- "indexes": deduct6_weight_indexes_dict
- }
- if "deduct9" in self.type_list:
- deduct9_weight_indexes_dict = {}
- weight_distribution['deduct9'] = {
- "deduct9Weight": f"严重违规(9分)({self.weight_type_dict['deduct6'] * 100:.2f}%)",
- "indexes": deduct9_weight_indexes_dict
- }
- if "deduct12" in self.type_list:
- deduct12_weight_indexes_dict = {}
- if "overspeed50" in self.metric_list:
- deduct12_weight_indexes_dict[
- "overspeed50Weight"] = f"超速50%以上({self.weight_dict['overspeed50'] * 100:.2f}%)"
- weight_distribution['deduct12'] = {
- "deduct12Weight": f"重大违规(12分)({self.weight_type_dict['deduct12'] * 100:.2f}%)",
- "indexes": deduct12_weight_indexes_dict
- }
- return weight_distribution
- def report_statistic(self):
- score_compliance, score_type_dict = self.compliance_score()
- grade_compliance = score_grade(score_compliance)
- score_compliance = int(score_compliance) if int(score_compliance) == score_compliance else score_compliance
- score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()]
- # get weight distribution
- # weight_distribution = self.compliance_weight_distribution()
- weight_distribution = self._get_weight_distribution()
- # if self.metric_illegal_list:
- # str_illegel_metric = string_concatenate(self.metric_illegal_list)
- cnt1 = self.illegal_count
- if grade_compliance == '优秀':
- comp_description1 = '车辆在本轮测试中无违反交通法规行为;'
- elif grade_compliance == '良好':
- comp_description1 = f'车辆在本轮测试中共发生{cnt1}次违反交通法规⾏为;'
- elif grade_compliance == '一般':
- comp_description1 = f'车辆在本轮测试中共有{cnt1}次违反交通法规⾏为,需要提高算法在合规性上的表现;'
- elif grade_compliance == '较差':
- comp_description1 = f'车辆在本轮测试中共有{cnt1}次违反交通法规⾏为,需要提高算法在合规性上的表现;'
- # xx指标得分超过基准线且无违规行为,算法表现良好
- # 违规次数 self.illegal_count
- # 1分xx次,3分xx次
- if cnt1 == 0:
- comp_description2 = "车辆在该用例中无违反交通法规行为,算法表现良好。"
- else:
- str_illegel_type = ""
- for type in self.type_list:
- if self.type_illegal_count_dict[type] > 0:
- str_illegel_type += f'{self.type_name_dict[type]}行为{self.type_illegal_count_dict[type]}次,'
- # if "deduct1" in self.type_list:
- # if self.type_illegal_count_dict["deduct1"] > 0:
- # str_illegel_type += f'轻微违规(1分)行为{self.type_illegal_count_dict["deduct1"]}次,'
- #
- # if "deduct3" in self.type_list:
- # if self.type_illegal_count_dict["deduct3"] > 0:
- # str_illegel_type += f'中等违规(3分)行为{self.type_illegal_count_dict["deduct3"]}次,'
- #
- # if "deduct6" in self.type_list:
- # if self.type_illegal_count_dict["deduct6"] > 0:
- # str_illegel_type += f'危险违规(6分)行为{self.type_illegal_count_dict["deduct6"]}次,'
- #
- # if "deduct9" in self.type_list:
- # if self.type_illegal_count_dict["deduct9"] > 0:
- # str_illegel_type += f'严重违规(9分)行为{self.type_illegal_count_dict["deduct9"]}次,'
- #
- # if "deduct12" in self.type_list:
- # if self.type_illegal_count_dict["deduct12"] > 0:
- # str_illegel_type += f'重大违规(12分)行为{self.type_illegal_count_dict["deduct12"]}次,'
- str_illegel_type = str_illegel_type[:-1]
- comp_description2 = f"车辆在该用例共违反交通法规{cnt1}次。其中{str_illegel_type}。违规行为详情见附录C。"
- # data for violations table
- if not self.violation_df.empty:
- # self.violation_df['time'] = self.violation_df.apply(
- # lambda row: self.time_frame_splice(row['start_time'], row['end_time'], row['start_frame'],
- # row['end_frame']), axis=1)
- self.violation_df['time'] = self.violation_df.apply(
- lambda row: self.time_splice(row['start_time'], row['end_time']), axis=1)
- df_violations = self.violation_df[['time', 'violation']]
- violations_slices = df_violations.to_dict('records')
- else:
- violations_slices = []
- # violations_slices = [{'time': "[1s, 2s]", 'violation': "压实线"},
- # {'time': "[10s, 20s]", 'violation': "闯红灯"},
- # {'time': "[100s, 200s]", 'violation': "超速"}]
- deductPoint_dict = {}
- for type in self.type_list:
- type_dict = {
- "name": self.type_name_dict[type],
- "score": score_type_dict[type],
- }
- type_indexes = {}
- for metric in self.metric_list:
- type_indexes[metric] = {
- # "name": self.name_dict[metric],
- # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
- "name": f"{self.name_dict[metric]}",
- "times": self.metric_illegal_count_dict[metric],
- "deductPoints": self.metric_penalty_points_dict[metric],
- "fine": self.metric_penalty_money_dict[metric],
- "basis": self.metric_penalty_law_dict[metric]
- }
- type_dict["indexes"] = type_indexes
- deductPoint_dict[type] = type_dict
- # deduct1
- if "deduct1" in self.type_list:
- deduct1_indexes = {}
- if "overspeed10" in self.metric_list:
- tmp_dict = {
- "name": "超速,但未超过10%",
- "times": self.metric_illegal_count_dict["overspeed10"],
- "deductPoints": self.metric_penalty_points_dict["overspeed10"],
- "fine": self.metric_penalty_money_dict["overspeed10"],
- "basis": self.metric_penalty_law_dict["overspeed10"]
- # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
- }
- deduct1_indexes['overspeed10'] = tmp_dict
- if "overspeed10_20" in self.metric_list:
- tmp_dict = {
- "name": "超速10%-20%",
- "times": self.metric_illegal_count_dict["overspeed10_20"],
- "deductPoints": self.metric_penalty_points_dict["overspeed10_20"],
- "fine": self.metric_penalty_money_dict["overspeed10_20"],
- "basis": self.metric_penalty_law_dict["overspeed10_20"]
- # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
- }
- deduct1_indexes['overspeed10_20'] = tmp_dict
- deduct1_dict = {
- "name": "轻微违规(1分)",
- "score": score_type_dict["deduct1"],
- "indexes": deduct1_indexes
- }
- deductPoint_dict["deduct1"] = deduct1_dict
- # deduct3
- if "deduct3" in self.type_list:
- deduct3_indexes = {}
- if "pressSolidLine" in self.metric_list:
- tmp_dict = {
- "name": "压实线",
- "times": self.metric_illegal_count_dict["pressSolidLine"],
- "deductPoints": self.metric_penalty_points_dict["pressSolidLine"],
- "fine": self.metric_penalty_money_dict["pressSolidLine"],
- "basis": self.metric_penalty_law_dict["pressSolidLine"]
- # "basis": "《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。"
- }
- deduct3_indexes['pressSolidLine'] = tmp_dict
- deduct3_dict = {
- "name": "中等违规(3分)",
- "score": score_type_dict["deduct3"],
- "indexes": deduct3_indexes
- }
- deductPoint_dict["deduct3"] = deduct3_dict
- # deduct6
- if "deduct6" in self.type_list:
- deduct6_indexes = {}
- if "runRedLight" in self.metric_list:
- tmp_dict = {
- "name": "闯红灯",
- "times": self.metric_illegal_count_dict["runRedLight"],
- "deductPoints": self.metric_penalty_points_dict["runRedLight"],
- "fine": self.metric_penalty_money_dict["runRedLight"],
- "basis": self.metric_penalty_law_dict["runRedLight"]
- # "basis": "《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。"
- }
- deduct6_indexes['runRedLight'] = tmp_dict
- if "overspeed20_50" in self.metric_list:
- tmp_dict = {
- "name": "超速20%-50%",
- "times": self.metric_illegal_count_dict["overspeed20_50"],
- "deductPoints": self.metric_penalty_points_dict["overspeed20_50"],
- "fine": self.metric_penalty_money_dict["overspeed20_50"],
- "basis": self.metric_penalty_law_dict["overspeed20_50"]
- # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
- }
- deduct6_indexes['overspeed20_50'] = tmp_dict
- deduct6_dict = {
- "name": "危险违规(6分)",
- "score": score_type_dict["deduct6"],
- "indexes": deduct6_indexes
- }
- deductPoint_dict["deduct6"] = deduct6_dict
- # deduct9
- if "deduct9" in self.type_list:
- deduct9_indexes = {}
- if "xx" in self.metric_list:
- tmp_dict = {
- "name": "-",
- "times": "0",
- "deductPoints": "0",
- "fine": "0",
- "basis": "-"
- # "basis": "-"
- }
- deduct9_indexes['xx'] = tmp_dict
- deduct9_dict = {
- "name": "严重违规(9分)",
- "score": 100, # score_type_dict["deduct9"],
- "indexes": deduct9_indexes
- }
- deductPoint_dict["deduct9"] = deduct9_dict
- # deduct12
- if "deduct12" in self.type_list:
- deduct12_indexes = {}
- if "overspeed50" in self.metric_list:
- tmp_dict = {
- "name": "超速50%以上",
- "times": self.metric_illegal_count_dict["overspeed50"],
- "deductPoints": self.metric_penalty_points_dict["overspeed50"],
- "fine": self.metric_penalty_money_dict["overspeed50"],
- "basis": self.metric_penalty_law_dict["overspeed50"]
- # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
- }
- deduct12_indexes['overspeed50'] = tmp_dict
- deduct12_dict = {
- "name": "重大违规(12分)",
- "score": score_type_dict["deduct12"],
- "indexes": deduct12_indexes
- }
- deductPoint_dict["deduct12"] = deduct12_dict
- report_dict = {
- "name": "合规性",
- "weight": f"{self.weight * 100:.2f}%",
- "weightDistribution": weight_distribution,
- "score": score_compliance,
- "level": grade_compliance,
- 'score_type': score_type,
- # 'score_metric': score_metric,
- 'illegalCount': self.illegal_count,
- "description1": comp_description1,
- "description2": comp_description2,
- "details": deductPoint_dict,
- "violations": violations_slices
- }
- return report_dict
- def get_eval_data(self):
- df = self.eval_data
- return df
|