123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Date: 2023/08/21
- @Last Modified: 2023/08/21
- @Summary: Compliance metrics
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- sys.path.append('../results')
- import numpy as np
- import pandas as pd
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
- from scipy.spatial.distance import euclidean
class traffic_rule(object):
    """
    Detect traffic-rule violations of the ego vehicle for the compliance metrics.

    Three checks are implemented: pressing a solid lane line, running a red
    light and exceeding a posted speed limit.  Every confirmed event is
    appended to ``violation_df``; each public check additionally returns a
    summary dictionary (count, penalty points, fine, legal basis).

    Attributes:
        scoreModel: Scoring model passed in by the caller (stored only).
        roadMark_df: Road-mark data, stored in dataframe format.
        trafficLight_df: Traffic-light phase data (one row per phase).
        trafficSignal_df: Traffic signal / sign data (position, type, value).
        objState_df: Object state data for all players.
        ego_df: Private copy of the ego rows (playerId == 1 and type == 1).
        violation_df: One row per recorded violation event.
    """

    # A violation must last at least this many consecutive samples to count.
    CONTINUOUS_FRAME_PERIOD = 13
    # Samples further apart than this (in simTime units) start a new event.
    MAX_TIME_GAP = 1
    VIOLATION_COLUMNS = ['start_time', 'end_time', 'start_frame', 'end_frame', 'violation']

    def __init__(self, data_processed, scoreModel):
        self.scoreModel = scoreModel
        self.roadMark_df = data_processed.road_mark_df
        self.trafficLight_df = data_processed.traffic_light_df
        self.trafficSignal_df = data_processed.traffic_signal_df
        self.objState_df = data_processed.object_df
        # Copy so that helper columns added later never write into a view of
        # objState_df (avoids pandas' SettingWithCopy pitfalls).
        self.ego_df = self.objState_df[
            (self.objState_df.playerId == 1) & (self.objState_df.type == 1)
        ].copy()
        self.violation_df = pd.DataFrame(columns=self.VIOLATION_COLUMNS)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _filter_groups_by_frame_period(self, grouped_violations):
        """
        Keep only the groups that contain at least CONTINUOUS_FRAME_PERIOD samples.

        Args:
            grouped_violations: List of ``(time_list, frame_list)`` tuples.

        Returns:
            The sub-list of groups that are long enough.
        """
        return [g for g in grouped_violations if len(g[0]) >= self.CONTINUOUS_FRAME_PERIOD]

    def _extract_violation_times(self, filtered_groups):
        """
        Return ``[start_time, end_time]`` for every group.

        Args:
            filtered_groups: List of ``(time_list, frame_list)`` tuples with
                non-empty time lists.

        Returns:
            A list of two-element lists (not a dataframe).
        """
        return [[g[0][0], g[0][-1]] for g in filtered_groups]

    def _record_violations(self, filtered_groups, violation_name):
        """
        Append one row per group to ``violation_df``.

        Start/end time and start/end frame are taken from the first and last
        sample of each group.  Nothing happens when there are no groups.
        """
        if not filtered_groups:
            return
        rows = [
            {
                'start_time': times[0],
                'end_time': times[-1],
                'start_frame': frames[0],
                'end_frame': frames[-1],
                'violation': violation_name,
            }
            for times, frames in filtered_groups
        ]
        new_df = pd.DataFrame(rows, columns=self.VIOLATION_COLUMNS)
        if self.violation_df.empty:
            # Avoid concatenating with an all-empty frame (deprecated dtype
            # inference in recent pandas).
            self.violation_df = new_df
        else:
            self.violation_df = pd.concat([self.violation_df, new_df], ignore_index=True)

    def _filter_solid_lines(self, Dimy):
        """
        Select solid-line road marks within a lateral distance.

        Args:
            Dimy: Lateral distance threshold (half of the ego ``dimY`` at the
                call site).

        Returns:
            Rows of ``roadMark_df`` with ``type == 1`` whose absolute
            ``lateralDist`` does not exceed ``Dimy`` (index reset first).
        """
        solid_lines = self.roadMark_df[self.roadMark_df["type"] == 1].reset_index()
        return solid_lines[abs(solid_lines["lateralDist"].values) <= Dimy]

    def _group_violations(self, dist_press):
        """
        Split samples into runs that are continuous in time.

        A sample joins the current run when its ``simTime`` is at most
        MAX_TIME_GAP later than the previous sample; otherwise a new run starts.

        Args:
            dist_press: Dataframe with ``simTime`` and ``simFrame`` columns.

        Returns:
            List of ``(time_list, frame_list)`` tuples, empty for empty input.
        """
        times = dist_press['simTime'].values.tolist()
        frames = dist_press['simFrame'].values.tolist()
        groups = []
        for sim_time, sim_frame in zip(times, frames):
            if groups and sim_time - groups[-1][0][-1] <= self.MAX_TIME_GAP:
                groups[-1][0].append(sim_time)
                groups[-1][1].append(sim_frame)
            else:
                groups.append(([sim_time], [sim_frame]))
        return groups

    # ------------------------------------------------------------------
    # Solid line
    # ------------------------------------------------------------------
    def get_solid_line_violations(self):
        """
        Detect pressing of solid lines.

        Returns:
            Dictionary with the metric name, weight, number of events and the
            resulting penalty points / fine / legal basis.  Confirmed events
            are also appended to ``violation_df``.
        """
        ego_rows = self.objState_df[self.objState_df["playerId"] == 1]
        # Positional access: the first ego row does not necessarily carry
        # index label 0.
        half_width = ego_rows["dimY"].iloc[0] / 2
        dist_press = self._filter_solid_lines(half_width)
        grouped_violations = self._group_violations(dist_press)
        filtered_groups = self._filter_groups_by_frame_period(grouped_violations)

        press_line_count = len(filtered_groups)
        self._record_violations(filtered_groups, '压实线')

        return {
            'metric': 'pressSolidLine',
            'weight': 3,
            'illegal_count': press_line_count,
            'penalty_points': press_line_count * 3,
            'penalty_money': press_line_count * 200,
            'warning_count': 0,
            'penalty_law': '《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。'
        }

    # ------------------------------------------------------------------
    # Red light
    # ------------------------------------------------------------------
    def normalize_angle(self, angle):
        """Normalize an angle in degrees to the range [0, 360)."""
        return angle % 360

    def is_red_light(self, simTime, cycleTime, duration_start, duration_end):
        """
        Check whether ``simTime`` falls into the given phase window.

        The position inside the cycle is the fractional part of
        ``simTime / cycleTime``; it is compared against
        ``[duration_start, duration_end)``.
        NOTE(review): this implies phase durations are expressed as fractions
        of the cycle - confirm against the data producer.
        """
        cycles = simTime / cycleTime
        cycle_fraction = cycles - int(cycles)
        return duration_start <= cycle_fraction < duration_end

    def process_traffic_light(self, trafficLight_id):
        """
        Detect run-red-light events for a single traffic light.

        Ego samples within 10 units of the light whose heading is roughly
        parallel or anti-parallel to the light's heading (within 30 degrees)
        are checked against all red phases of the light.

        Args:
            trafficLight_id: Id of the light in ``trafficLight_df`` /
                ``playerId`` in ``trafficSignal_df``.

        Returns:
            Number of events recorded for this light (0 if the light has no
            position/phase data or the ego never comes close).
        """
        signal_rows = self.trafficSignal_df[self.trafficSignal_df["playerId"] == trafficLight_id]
        if signal_rows.empty:
            return 0
        light_pose = signal_rows.iloc[0]
        light_x = light_pose['posX']
        light_y = light_pose['posY']
        light_heading = light_pose['posH']

        light_phases = self.trafficLight_df[self.trafficLight_df.id == trafficLight_id]
        if light_phases.empty:
            return 0
        cycle_time = light_phases["cycleTime"].values[0]
        phase_count = int(light_phases["noPhases"].values[0])

        # Distance and heading difference, vectorised over all ego samples.
        ego = self.ego_df
        ego["traffic_light_distance_absolute"] = np.hypot(ego['posX'] - light_x, ego['posY'] - light_y)
        # Assumes posH is in radians (the original code scaled by 57.3).
        ego["traffic_light_h_diff"] = np.degrees((ego['posH'] - light_heading).abs()) % 360

        h_diff = ego['traffic_light_h_diff']
        heading_aligned = h_diff.between(150, 210) | (h_diff <= 30) | (h_diff >= 330)
        ego_near_light = ego[(ego['traffic_light_distance_absolute'] <= 10) & heading_aligned].copy()
        if ego_near_light.empty:
            return 0

        # NOTE(review): the phase type is read from a column named
        # 'violation' - verify this is the intended column of trafficLight_df.
        phase_types = light_phases['violation'].iloc[:phase_count].to_numpy()
        phase_ends = np.cumsum(light_phases['duration'].iloc[:phase_count].to_numpy(dtype=float))
        phase_starts = np.concatenate(([0.0], phase_ends[:-1]))

        cycles = ego_near_light['simTime'] / cycle_time
        cycle_fraction = (cycles - np.trunc(cycles)).to_numpy()

        # A sample is "red" if it falls into ANY red phase (type 1); the
        # phases are OR-ed so a later phase cannot erase an earlier one.
        is_red = np.zeros(len(ego_near_light), dtype=bool)
        for phase_type, start, end in zip(phase_types, phase_starts, phase_ends):
            if phase_type == 1:
                is_red |= (cycle_fraction >= start) & (cycle_fraction < end)
        ego_near_light["flag_red_traffic_light"] = is_red.astype(int)

        run_red_light_df = ego_near_light[ego_near_light['flag_red_traffic_light'] == 1]
        return self.collect_run_red_light_events(run_red_light_df)

    def collect_run_red_light_events(self, run_red_light_df):
        """
        Group red-light samples into events and record the long-enough ones.

        Args:
            run_red_light_df: Ego samples flagged as "near the light while red"
                (needs ``simTime`` and ``simFrame`` columns).

        Returns:
            Number of events appended to ``violation_df``.
        """
        grouped_events = self._group_violations(run_red_light_df)
        filtered_events = self._filter_groups_by_frame_period(grouped_events)
        self._record_violations(filtered_events, '闯红灯')
        return len(filtered_events)

    def run_red_light_detection(self):
        """
        Detect run-red-light events over all traffic lights.

        Returns:
            Summary dictionary; ``illegal_count`` is the total number of
            events recorded across all lights.
        """
        run_red_light_count = 0
        # unique() keeps first-seen order, so events are recorded deterministically.
        for trafficLight_id in self.trafficLight_df["id"].unique():
            run_red_light_count += self.process_traffic_light(trafficLight_id) or 0
        return {
            'metric': 'runRedLight',
            'weight': 6,
            'illegal_count': run_red_light_count,
            'penalty_points': run_red_light_count * 6,
            'penalty_money': run_red_light_count * 200,
            'warning_count': 0,
            'penalty_law': '《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。'
        }

    # ------------------------------------------------------------------
    # Overspeed
    # ------------------------------------------------------------------
    def _find_speed_violations(self):
        """
        Find ego samples that exceed a nearby speed-limit sign.

        Speed-limit signs (``type == 274``) are joined with the ego samples of
        the same frame; a sign applies when it is within 7 units in X and half
        the ego ``dimY`` in Y.  Speed is converted from m/s to km/h (x 3.6).

        Returns:
            ``(list_sign, speed_df)``: the overspeeding rows and all rows with
            an applicable sign (both carry ``speed`` and ``value`` columns).
        """
        data_ego = self.objState_df[self.objState_df["playerId"] == 1]
        half_width = data_ego["dimY"].iloc[0] / 2
        speed_limit_sign = self.trafficSignal_df[self.trafficSignal_df["type"] == 274]
        # After the merge, *_x columns come from the sign, *_y from the ego.
        merged = pd.merge(speed_limit_sign, data_ego, on=['simTime', 'simFrame'], how='inner').reset_index()
        sign_applies = ((merged["posX_x"] - merged["posX_y"]).abs() <= 7) & (
            (merged["posY_x"] - merged["posY_y"]).abs() <= half_width)
        speed_df = merged[sign_applies].copy()
        speed_df["speed"] = np.hypot(speed_df["speedX"], speed_df["speedY"]) * 3.6
        list_sign = speed_df[speed_df["speed"] > speed_df["value"]]
        return list_sign, speed_df

    def _calculate_overspeed_statistics(self, speed_df, list_sign):
        """
        Count overspeed events per severity tier.

        Overspeeding rows with consecutive index labels form one event; the
        event is classified by its maximum relative excess over the limit
        (<10 %, 10-20 %, 20-50 %, >=50 %).

        Args:
            speed_df: All rows with an applicable sign; gets the helper
                columns ``flag_press`` and ``diff_press`` added in place.
            list_sign: The overspeeding subset of ``speed_df``.

        Returns:
            Tuple of four summary dictionaries, ordered by severity.
        """
        # Compare frame VALUES (``x in series`` would test the index labels).
        speed_df["flag_press"] = speed_df["simFrame"].isin(list_sign["simFrame"]).astype(int)
        speed_df["diff_press"] = speed_df["flag_press"].diff()

        runs = []
        for idx in list_sign.index.to_list():
            if runs and idx - runs[-1][-1] == 1:
                runs[-1].append(idx)
            else:
                runs.append([idx])

        overspeed_count_0_to_10 = 0
        overspeed_count_10_to_20 = 0
        overspeed_count_20_to_50 = 0
        overspeed_count_50_to_ = 0
        for run in runs:
            # Exactly the rows of this event (label slicing with right + 1
            # would include one extra row).
            event_rows = speed_df.loc[run]
            max_ratio = ((event_rows["speed"] - event_rows["value"]) / event_rows["value"]).max()
            if 0 <= max_ratio < 0.1:
                overspeed_count_0_to_10 += 1
            elif 0.1 <= max_ratio < 0.2:
                overspeed_count_10_to_20 += 1
            elif 0.2 <= max_ratio < 0.5:
                overspeed_count_20_to_50 += 1
            elif max_ratio >= 0.5:
                overspeed_count_50_to_ += 1
        return (
            self._create_overspeed_dict(overspeed_count_0_to_10, 'overspeed10', 0, 0),
            self._create_overspeed_dict(overspeed_count_10_to_20, 'overspeed10_20', 0, 200),
            self._create_overspeed_dict(overspeed_count_20_to_50, 'overspeed20_50', 6, 200),
            self._create_overspeed_dict(overspeed_count_50_to_, 'overspeed50', 12, 2000)
        )

    def _create_overspeed_dict(self, count, metric, penalty_points, penalty_money):
        """
        Build the summary dictionary of one overspeed tier.

        ``weight`` follows the convention of the other metrics in this class
        (per-violation penalty points); tiers without penalty points keep
        ``None`` and are reported as warnings.
        """
        return {
            'metric': metric,
            'weight': penalty_points if penalty_points else None,
            'illegal_count': count,
            'penalty_points': count * penalty_points,
            'penalty_money': count * penalty_money,
            'warning_count': count if penalty_points == 0 else 0,
            'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
        }

    def overspeed(self):
        """
        Detect overspeed events.

        Events that are continuous in time and long enough are appended to
        ``violation_df``.
        NOTE(review): the per-tier counts returned here are computed from
        index-consecutive runs WITHOUT the minimum-duration filter, so they
        can differ from the number of rows recorded - confirm this is wanted.

        Returns:
            Tuple of four summary dictionaries (see
            ``_calculate_overspeed_statistics``).
        """
        list_sign, speed_df = self._find_speed_violations()
        grouped_events = self._group_violations(list_sign)
        filtered_events = self._filter_groups_by_frame_period(grouped_events)
        self._record_violations(filtered_events, '超速')
        return self._calculate_overspeed_statistics(speed_df, list_sign)
class Compliance(object):
    """
    Compute the compliance score and build the compliance report section.

    The violations detected by ``traffic_rule`` are grouped into "deduct
    types" (deduct1, deduct3, ...: violations costing 1, 3, ... penalty
    points each).  Each type gets a score from its accumulated penalty
    points; the type scores are combined into one weighted compliance score.

    Attributes read from ``data_processed.config.config['compliance']``:
        weight_dict ('weight'), metric_list ('metric'), type_list ('type'),
        weight_custom ('weightCustom'), name_dict ('name'),
        metric_dict ('typeMetricDict'), type_name_dict ('typeName'),
        weight ('weightDimension'), weight_type_dict ('typeWeight'),
        weight_type_list ('typeWeightList').
    """

    # deduct type -> (penalty points per violation, metrics of that type).
    # deduct9 has no metric assigned yet.
    _DEDUCT_TYPES = {
        "deduct1": (1, ("overspeed10", "overspeed10_20")),
        "deduct3": (3, ("pressSolidLine",)),
        "deduct6": (6, ("runRedLight", "overspeed20_50")),
        "deduct9": (9, ()),
        "deduct12": (12, ("overspeed50",)),
    }
    # Reaching this many penalty points zeroes the score.
    PENALTY_POINTS_LIMIT = 12

    def __init__(self, data_processed, custom_data, scoreModel):
        self.eval_data = pd.DataFrame()
        self.penalty_points = 0
        self.config = data_processed.config
        compliance_config = self.config.config['compliance']
        self.compliance_config = compliance_config
        self.weight_dict = compliance_config['weight']
        self.metric_list = compliance_config['metric']
        self.type_list = compliance_config['type']
        self.weight_custom = compliance_config['weightCustom']
        self.name_dict = compliance_config['name']
        self.metric_dict = compliance_config['typeMetricDict']
        self.type_name_dict = compliance_config['typeName']
        self.weight = compliance_config['weightDimension']
        self.weight_type_dict = compliance_config['typeWeight']
        self.weight_type_list = compliance_config['typeWeightList']
        self.type_illegal_count_dict = {}
        self.traffic_rule = traffic_rule(data_processed, scoreModel)
        # Only a snapshot: traffic_rule rebinds its violation_df whenever it
        # records an event, so this is re-synced after the analysis has run.
        self.violation_df = self.traffic_rule.violation_df

    def score_cal_penalty_points(self, penalty_points):
        """
        Map accumulated penalty points to a score.

        Returns 100 for no points, 0 for 12 or more points, and a linear
        value below 60 in between.
        """
        if penalty_points == 0:
            return 100
        if penalty_points >= 12:
            return 0
        return (12 - penalty_points) / 12 * 60

    def time_splice(self, start_time, end_time):
        """Format a time span as ``[<start>s, <end>s]``."""
        return f"[{start_time}s, {end_time}s]"

    def weight_type_cal(self):
        """Return weights proportional to the penalty points 1, 3, 6 and 12 (rounded to 2 digits)."""
        penalty_list = [1, 3, 6, 12]
        sum_penalty = sum(penalty_list)
        return [round(x / sum_penalty, 2) for x in penalty_list]

    def compliance_statistic(self):
        """
        Run all violation checks and collect the configured metrics.

        Returns:
            Dataframe with one row per metric in ``metric_list`` (columns as
            in the summary dictionaries produced by ``traffic_rule``).
        """
        press_line_dict = self.traffic_rule.get_solid_line_violations()
        run_red_light_dict = self.traffic_rule.run_red_light_detection()
        (overspeed_0_to_10_dict, overspeed_10_to_20_dict,
         overspeed_20_to_50_dict, overspeed_50_dict) = self.traffic_rule.overspeed()

        # Row order of the report is fixed by this sequence.
        candidates = [
            ("overspeed10", overspeed_0_to_10_dict),
            ("overspeed10_20", overspeed_10_to_20_dict),
            ("pressSolidLine", press_line_dict),
            ("runRedLight", run_red_light_dict),
            ("overspeed20_50", overspeed_20_to_50_dict),
            ("overspeed50", overspeed_50_dict),
        ]
        df_list = [result for metric, result in candidates if metric in self.metric_list]
        return pd.DataFrame(df_list)

    def prepare_data(self):
        """
        Run the analysis and cache totals and per-metric lookup dictionaries.

        Also refreshes ``violation_df`` and ``penalty_points`` so that later
        steps see the results of this run.
        """
        self.compliance_df = self.compliance_statistic()
        self.violation_df = self.traffic_rule.violation_df

        self.illegal_count = int(self.compliance_df['illegal_count'].sum())
        self.penalty_points = int(self.compliance_df['penalty_points'].sum())

        per_metric = self.compliance_df.set_index('metric')
        self.metric_penalty_points_dict = per_metric['penalty_points'].to_dict()
        self.metric_illegal_count_dict = per_metric['illegal_count'].to_dict()
        self.metric_penalty_money_dict = per_metric['penalty_money'].to_dict()
        self.metric_warning_count_dict = per_metric['warning_count'].to_dict()
        self.metric_penalty_law_dict = per_metric['penalty_law'].to_dict()

    def calculate_deduct_scores(self):
        """
        Score every configured deduct type.

        Types in ``type_list`` that are not one of the supported deduct types
        are skipped.  ``type_illegal_count_dict`` is filled as a side effect.

        Returns:
            Dictionary ``{deduct_type: score}``.
        """
        score_type_dict = {}
        for deduct_type in self.type_list:
            if deduct_type not in self._DEDUCT_TYPES:
                continue
            points = self._DEDUCT_TYPES[deduct_type][0]
            penalty_points, illegal_count = self.calculate_deduct(points)
            score_type_dict[deduct_type] = self.score_cal_penalty_points(penalty_points)
            self.type_illegal_count_dict[deduct_type] = illegal_count
        return score_type_dict

    def calculate_deduct(self, num):
        """
        Sum penalty points and violation counts of one deduct type.

        Rows are selected by the metrics that belong to ``deduct<num>``; a
        row without a weight is therefore counted in its own type only and
        not in every type.  For an unsupported ``num`` the rows whose
        ``weight`` equals ``num`` are used.

        Returns:
            ``(penalty_points, illegal_count)`` as ints.
        """
        entry = self._DEDUCT_TYPES.get(f"deduct{num}")
        if entry is not None:
            selected = self.compliance_df['metric'].isin(entry[1])
        else:
            selected = self.compliance_df['weight'] == num
        deduct_df = self.compliance_df[selected]
        return int(deduct_df['penalty_points'].sum()), int(deduct_df['illegal_count'].sum())

    def calculate_weights(self):
        """
        Set the objective (non-custom) weights.

        Metric weights inside a type are fixed.  Type weights are
        proportional to the penalty points of the type and sum to 1, so the
        weighted score cannot exceed 100; if ``type_list`` contains a type
        that is not a supported deduct type, all types are weighted equally.
        NOTE(review): proportional-to-points mirrors ``weight_type_cal``;
        confirm this is the intended objective weighting.
        """
        weight_dict = {
            "overspeed10": 0.5,
            "overspeed10_20": 0.5,
            "pressSolidLine": 1.0,
            "runRedLight": 0.5,
            "overspeed20_50": 0.5,
            "overspeed50": 1.0
        }
        type_count = len(self.type_list)
        if all(name in self._DEDUCT_TYPES for name in self.type_list):
            points = [self._DEDUCT_TYPES[name][0] for name in self.type_list]
            total_points = sum(points)
            self.weight_type_list = [p / total_points for p in points]
        else:
            self.weight_type_list = [1 / type_count] * type_count
        self.weight_type_dict = dict(zip(self.type_list, self.weight_type_list))
        self.weight_dict = weight_dict

    def calculate_compliance_score(self, score_type_dict):
        """
        Combine the type scores into the compliance score.

        * 0 when the total penalty points reach PENALTY_POINTS_LIMIT,
        * 100 when every type scored 100,
        * otherwise the weighted sum of the type scores, where a clean type
          contributes 80 instead of 100.

        Raises:
            ValueError: if no type score is available.
        """
        if not self.weight_custom:  # objective weighting
            self.calculate_weights()

        if getattr(self, 'penalty_points', 0) >= self.PENALTY_POINTS_LIMIT:
            score_compliance = 0
        elif not score_type_dict:
            raise ValueError("no compliance type scores available; check the configured types")
        elif all(score == 100 for score in score_type_dict.values()):
            score_compliance = 100
        else:
            adjusted_scores = [80 if score == 100 else score for score in score_type_dict.values()]
            score_compliance = float(np.dot(self.weight_type_list, adjusted_scores))
        return round(score_compliance, 2)

    def output_results(self, score_compliance, score_type_dict):
        """Print the compliance score, the type scores and the type weights."""
        print("\n[合规性表现及得分情况]")
        print(f"合规性得分为:{score_compliance:.2f}分。")
        print(f"合规性各分组得分为:{score_type_dict}。")
        print(f"合规性各分组权重为:{self.weight_type_list}。")

    def compliance_score(self):
        """
        Run the whole scoring pipeline.

        Returns:
            ``(score_compliance, score_type_dict)``.
        """
        self.prepare_data()
        score_type_dict = self.calculate_deduct_scores()
        score_compliance = self.calculate_compliance_score(score_type_dict)
        self.output_results(score_compliance, score_type_dict)
        return score_compliance, score_type_dict

    def _get_weight_distribution(self):
        """
        Build the weight distribution of the report from the configuration.

        Returns:
            Dictionary with the dimension name and, per configured type, the
            formatted type weight and the formatted weights of its metrics.
        """
        weight_distribution = {"name": self.config.dimension_name["compliance"]}
        for type_key in self.type_list:
            type_metrics = self.metric_dict[type_key]
            indexes = {
                key: f"{self.name_dict[key]}({value * 100:.2f}%)"
                for key, value in self.weight_dict.items()
                if key in type_metrics
            }
            weight_distribution[type_key] = {
                "weight": f"{self.type_name_dict[type_key]}({self.weight_type_dict[type_key] * 100:.2f}%)",
                "indexes": indexes
            }
        return weight_distribution

    def build_weight_index(self, metric_prefix, weight_key):
        """
        Format the weight entry of one metric.

        Args:
            metric_prefix: Metric name; an empty dict is returned if it is
                not in ``metric_list``.
            weight_key: Key to look up in ``weight_dict``.  If it is missing,
                the metric's own entry is used instead.

        Returns:
            ``{"<metric>Weight": "<Metric>(<weight>%)"}`` or ``{}``.
        """
        if metric_prefix not in self.metric_list:
            return {}
        lookup_key = weight_key if weight_key in self.weight_dict else metric_prefix
        label = metric_prefix.replace('_', ' ').capitalize()
        return {f"{metric_prefix}Weight": f"{label}({self.weight_dict[lookup_key] * 100:.2f}%)"}

    def compliance_weight_distribution(self):
        """
        Build the weight distribution from the built-in deduct-type table.

        Returns:
            Dictionary with the name and one entry per supported deduct type
            that is present in ``type_list``.
        """
        weight_distribution = {"name": "合规性"}
        for deduct_type, (points, metrics) in self._DEDUCT_TYPES.items():
            if deduct_type not in self.type_list:
                continue
            type_weight = self.weight_type_dict[deduct_type]
            if deduct_type == "deduct9":
                # deduct9 has its own label and no metrics.
                weight_distribution[deduct_type] = {
                    "deduct9Weight": f"严重违规(9分)({type_weight * 100:.2f}%)",
                    "indexes": {}
                }
                continue
            indexes_dict = {}
            for metric in metrics:
                indexes_dict.update(self.build_weight_index(metric, deduct_type))
            weight_distribution[deduct_type] = {
                f"{points}Weight": f"{points}违规({points}分)({type_weight * 100:.2f} % )",
                "indexes": indexes_dict
            }
        return weight_distribution

    def report_statistic(self):
        """
        Compute the score and assemble the compliance section of the report.

        Returns:
            Dictionary with score, grade, descriptions, per-type details and
            the list of violations.
        """
        score_compliance, score_type_dict = self.compliance_score()
        grade_compliance = score_grade(score_compliance)
        # Show whole numbers without a trailing ".0".
        score_compliance = int(score_compliance) if int(score_compliance) == score_compliance else score_compliance
        score_type = [int(n) if int(n) == n else n for n in score_type_dict.values()]

        comp_description1 = self.get_compliance_description(grade_compliance, self.illegal_count)
        comp_description2 = self.get_violation_details(self.type_list, self.type_illegal_count_dict,
                                                       self.type_name_dict)
        weight_distribution = self._get_weight_distribution()
        violations_slices = self.get_violations_table()
        deductPoint_dict = self.get_deduct_points_dict(score_type_dict)

        return {
            "name": "合规性",
            "weight": f"{self.weight * 100:.2f}%",
            "weightDistribution": weight_distribution,
            "score": score_compliance,
            "level": grade_compliance,
            'score_type': score_type,
            'illegalCount': self.illegal_count,
            "description1": comp_description1,
            "description2": comp_description2,
            "details": deductPoint_dict,
            "violations": violations_slices
        }

    def get_compliance_description(self, grade_compliance, illegal_count):
        """Return the summary sentence for the given grade and violation count."""
        if grade_compliance == '优秀':
            return '车辆在本轮测试中无违反交通法规行为;'
        return f'车辆在本轮测试中共发生{illegal_count}次违反交通法规行为;' \
               f'如果等级为一般或较差,需要提高算法在合规性上的表现。'

    def get_violation_details(self, type_list, type_illegal_count_dict, type_name_dict):
        """
        Return the sentence listing the violation counts per type.

        Args:
            type_list: Unused; kept for interface compatibility.
            type_illegal_count_dict: ``{type: count}``; zero counts are omitted.
            type_name_dict: ``{type: display name}``.
        """
        if self.illegal_count == 0:
            return "车辆在该用例中无违反交通法规行为,算法表现良好。"
        str_illegel_type = ", ".join(
            f"{type_name_dict[type_key]}行为{count}次"
            for type_key, count in type_illegal_count_dict.items() if count > 0)
        return f"车辆在该用例共违反交通法规{self.illegal_count}次。其中{str_illegel_type}。违规行为详情见附录C。"

    def get_violations_table(self):
        """
        Return the recorded violations as a list of records.

        The table is read from the rule checker at call time, because the
        checker replaces its ``violation_df`` whenever it records an event.

        Returns:
            List of ``{"time": "[<start>s, <end>s]", "violation": <name>}``
            dictionaries (empty if nothing was recorded).
        """
        rule_checker = getattr(self, 'traffic_rule', None)
        if rule_checker is not None:
            self.violation_df = rule_checker.violation_df
        violations = self.violation_df
        if violations.empty:
            return []
        return [
            {'time': self.time_splice(start, end), 'violation': name}
            for start, end, name in zip(violations['start_time'], violations['end_time'],
                                        violations['violation'])
        ]

    def _metric_detail(self, metric):
        """Return the report entry (name, count, points, fine, legal basis) of one metric."""
        return {
            "name": self.name_dict[metric],
            "times": self.metric_illegal_count_dict[metric],
            "deductPoints": self.metric_penalty_points_dict[metric],
            "fine": self.metric_penalty_money_dict[metric],
            "basis": self.metric_penalty_law_dict[metric]
        }

    def get_deduct_points_dict(self, score_type_dict):
        """
        Build the per-type deduction details of the report.

        Supported deduct types list only their own configured metrics and are
        named "<points>违规(<points>分)"; deduct9 always reports a score of 100
        because no metric is assigned to it.  Any other type in ``type_list``
        gets a generic entry (configured display name, all configured metrics).

        Args:
            score_type_dict: ``{type: score}`` as returned by
                ``calculate_deduct_scores``.

        Returns:
            Dictionary ``{type: {"name", "score", "indexes"}}`` in
            ``type_list`` order.
        """
        deductPoint_dict = {}
        for deduct_type in self.type_list:
            entry = self._DEDUCT_TYPES.get(deduct_type)
            if entry is None:
                deductPoint_dict[deduct_type] = {
                    "name": self.type_name_dict[deduct_type],
                    "score": score_type_dict[deduct_type],
                    "indexes": {metric: self._metric_detail(metric) for metric in self.metric_list}
                }
                continue
            points, metrics = entry
            score = 100 if deduct_type == "deduct9" else score_type_dict.get(deduct_type, 100)
            deductPoint_dict[deduct_type] = {
                "name": f"{points}违规({points}分)",
                "score": score,
                "indexes": {metric: self._metric_detail(metric)
                            for metric in metrics if metric in self.metric_list}
            }
        return deductPoint_dict

    def get_eval_data(self):
        """Return the evaluation dataframe (``eval_data``)."""
        return self.eval_data
|