#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn) @Data: 2023/08/21 @Last Modified: 2023/08/21 @Summary: Compliance metrics """ import sys sys.path.append('../common') sys.path.append('../modules') sys.path.append('../results') import numpy as np import pandas as pd from common import score_grade, string_concatenate, replace_key_with_value, score_over_100 from scipy.spatial.distance import euclidean class traffic_rule(object): """ Class for achieving compliance metrics for autonomous driving. Attributes: droadMark_df: Roadmark data, stored in dataframe format. """ def __init__(self, data_processed, scoreModel): self.scoreModel = scoreModel self.roadMark_df = data_processed.road_mark_df self.trafficLight_df = data_processed.traffic_light_df self.trafficSignal_df = data_processed.traffic_signal_df self.objState_df = data_processed.object_df self.ego_df = self.objState_df[(self.objState_df.playerId == 1) & (self.objState_df.type == 1)] self.violation_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'violation']) def _filter_groups_by_frame_period(self, grouped_violations): """ Filter groups by a minimum continuous frame period. """ CONTINUOUS_FRAME_PERIOD = 13 return [g for g in grouped_violations if len(g[0]) >= CONTINUOUS_FRAME_PERIOD] def _extract_violation_times(self, filtered_groups): """ Create a dataframe with start and end times for each violation group. """ return [[g[0][0], g[0][-1]] for g in filtered_groups] def _filter_solid_lines(self, Dimy): """ Filter solid lines within the player's lateral distance. """ dist_line = self.roadMark_df[self.roadMark_df["type"] == 1] dist_line = dist_line.reset_index() return dist_line[abs(dist_line["lateralDist"].values) <= Dimy] def _group_violations(self, dist_press): """ Group violations by continuous frames. """ t_list = dist_press['simTime'].values.tolist() f_list = dist_press['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(f_list)): if not sub_group_time or t_list[i] - t_list[i - 1] <= 1: sub_group_time.append(t_list[i]) sub_group_frame.append(f_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [t_list[i]] sub_group_frame = [f_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) return list(zip(group_time, group_frame)) def get_solid_line_violations(self): """ Detect violations of pressing solid lines and return a dictionary with violation details. """ # Extract relevant data Dimy = self.objState_df[self.objState_df["playerId"] == 1]["dimY"][0] / 2 dist_press = self._filter_solid_lines(Dimy) grouped_violations = self._group_violations(dist_press)# Group violations by continuous frames filtered_groups = self._filter_groups_by_frame_period(grouped_violations)# Filter groups by minimum frame period # Calculate violation count and create violation dataframe press_line_count = len(filtered_groups) press_line_time = self._extract_violation_times(filtered_groups) if press_line_time: time_df = pd.DataFrame(press_line_time, columns=['start_time', 'end_time']) time_df['violation'] = '压实线' # Update violation dataframe self.violation_df = pd.concat([self.violation_df, press_line_time], ignore_index=True) # Create and return violation dictionary warning_count = 0 press_line_dict = { 'metric': 'pressSolidLine', 'weight': 3, 'illegal_count': press_line_count, 'penalty_points': press_line_count * 3, 'penalty_money': press_line_count * 200, 'warning_count': warning_count, 'penalty_law': '《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。' } return press_line_dict def normalize_angle(self, angle): """Normalize angle to the range [0, 360).""" difference = angle while difference >= 360: difference -= 360 return difference def is_red_light(self, simTime, cycleTime, duration_start, duration_end): """Check if the current time corresponds to a red light phase.""" divisor = simTime / cycleTime decimal_part = divisor - int(divisor) return duration_start <= decimal_part < duration_end def process_traffic_light(self, trafficLight_id): """Process a single traffic light and detect run red light events.""" trafficLight_position = self.trafficSignal_df[self.trafficSignal_df["playerId"] == trafficLight_id].iloc[:1, :] if trafficLight_position.empty: return trafficLight_position_x = trafficLight_position['posX'].values[0] trafficLight_position_y = trafficLight_position['posY'].values[0] trafficLight_position_heading = trafficLight_position['posH'].values[0] trafficLight_character = self.trafficLight_df[self.trafficLight_df.id == trafficLight_id] cycleTime = trafficLight_character["cycleTime"].values[0] noPhases = trafficLight_character["noPhases"].values[0] # Calculate distances and headings self.ego_df["traffic_light_distance_absolute"] = self.ego_df[['posX', 'posY']].apply( lambda x: euclidean((trafficLight_position_x, trafficLight_position_y), (x['posX'], x['posY'])), axis=1) self.ego_df["traffic_light_h_diff"] = self.ego_df.apply( lambda x: abs(x['posH'] - trafficLight_position_heading) * 57.3, axis=1) self.ego_df["traffic_light_h_diff"] = self.ego_df["traffic_light_h_diff"].apply(self.normalize_angle) # Filter ego vehicles near the traffic light with the correct heading mask_traffic_light = ((self.ego_df['traffic_light_h_diff'] <= 210) & ( self.ego_df['traffic_light_h_diff'] >= 150)) | (self.ego_df['traffic_light_h_diff'] <= 30) | ( self.ego_df['traffic_light_h_diff'] >= 330) ego_near_light = self.ego_df[(self.ego_df.traffic_light_distance_absolute <= 10) & mask_traffic_light] if ego_near_light.empty: return # Check for red light violations ego_near_light["flag_red_traffic_light"] = 0 type_list = trafficLight_character['violation'][:noPhases] duration = trafficLight_character['duration'][:noPhases] duration_correct = [0] * noPhases for number in range(noPhases): duration_correct[number] = sum(duration[:number + 1]) type_current = type_list.values[number] if type_current == 1: # Red light phase if number == 0: duration_start = 0 else: duration_start = duration_correct[number - 1] duration_end = duration_correct[number] ego_near_light["flag_red_traffic_light"] = ego_near_light.apply( lambda x: self.is_red_light(x['simTime'], cycleTime, duration_start, duration_end), axis=1) # Collect run red light events run_red_light_df = ego_near_light[ego_near_light['flag_red_traffic_light'] == 1] self.collect_run_red_light_events(run_red_light_df) def collect_run_red_light_events(self, run_red_light_df): grouped_events = self._group_violations(run_red_light_df) filtered_events = self._filter_groups_by_frame_period(grouped_events) violation_times = self._extract_violation_times(filtered_events) if violation_times: time_df = pd.DataFrame(violation_times, columns=['start_time', 'end_time']) time_df['violation'] = '闯红灯' self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True) def run_red_light_detection(self): """Main function to detect run red light events.""" trafficLight_id_list = set(self.trafficLight_df["id"].tolist()) run_red_light_count = 0 for trafficLight_id in trafficLight_id_list: self.process_traffic_light(trafficLight_id) # 闯红灯次数统计(这里可以根据需要修改统计逻辑) if 'flag_red_traffic_light' in self.ego_df.columns and self.ego_df['flag_red_traffic_light'].any() == 1: run_red_light_count += 1 run_red_light_dict = { 'metric': 'runRedLight', 'weight': 6, 'illegal_count': run_red_light_count, 'penalty_points': run_red_light_count * 6, 'penalty_money': run_red_light_count * 200, 'warning_count': 0, 'penalty_law': '《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。' } return run_red_light_dict def _find_speed_violations(self): DimX = self.objState_df[self.objState_df["playerId"] == 1]["dimY"][0] / 2 data_ego = self.objState_df[self.objState_df["playerId"] == 1] speed_limit_sign = self.trafficSignal_df[self.trafficSignal_df["type"] == 274] same_df_rate = pd.merge(speed_limit_sign, data_ego, on=['simTime', 'simFrame'], how='inner').reset_index() speed_df = same_df_rate[(abs(same_df_rate["posX_x"] - same_df_rate["posX_y"]) <= 7) & ( abs(same_df_rate["posY_x"] - same_df_rate["posY_y"]) <= DimX)] speed_df["speed"] = np.sqrt(speed_df["speedX"] ** 2 + speed_df["speedY"] ** 2) * 3.6 list_sign = speed_df[speed_df["speed"] > speed_df["value"]] return list_sign, speed_df def _calculate_overspeed_statistics(self, speed_df, list_sign): index_sign = list_sign.index.to_list() speed_df["flag_press"] = speed_df["simFrame"].apply(lambda x: 1 if x in list_sign["simFrame"] else 0) speed_df["diff_press"] = speed_df["flag_press"].diff() index_list = [] subindex_list = [] for i in range(len(index_sign)): if not subindex_list or index_sign[i] - index_sign[i - 1] == 1: subindex_list.append(index_sign[i]) else: index_list.append(subindex_list) subindex_list = [index_sign[i]] index_list.append(subindex_list) overspeed_count_0_to_10 = 0 overspeed_count_10_to_20 = 0 overspeed_count_20_to_50 = 0 overspeed_count_50_to_ = 0 if index_list[0]: for i in range(len(index_list)): left = index_list[i][0] right = index_list[i][-1] df_tmp = speed_df.loc[left:right + 1] max_ratio = ((df_tmp["speed"] - df_tmp["value"]) / df_tmp["value"]).max() if 0 <= max_ratio < 0.1: overspeed_count_0_to_10 += 1 elif 0.1 <= max_ratio < 0.2: overspeed_count_10_to_20 += 1 elif 0.2 <= max_ratio < 0.5: overspeed_count_20_to_50 += 1 elif max_ratio >= 0.5: overspeed_count_50_to_ += 1 return ( self._create_overspeed_dict(overspeed_count_0_to_10, 'overspeed10', 0, 0), self._create_overspeed_dict(overspeed_count_10_to_20, 'overspeed10_20', 0, 200), self._create_overspeed_dict(overspeed_count_20_to_50, 'overspeed20_50', 6, 200), self._create_overspeed_dict(overspeed_count_50_to_, 'overspeed50', 12, 2000) ) def _create_overspeed_dict(self, count, metric, penalty_points, penalty_money): return { 'metric': metric, 'weight': None, 'illegal_count': count, 'penalty_points': count * penalty_points, 'penalty_money': count * penalty_money, 'warning_count': count if penalty_points == 0 else 0, 'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。' } def overspeed(self): list_sign, speed_df = self._find_speed_violations() grouped_events = self._group_violations(list_sign) filtered_events = self._filter_groups_by_frame_period(grouped_events) violation_times = self._extract_violation_times(filtered_events) if violation_times: time_df = pd.DataFrame([d[0] for d in violation_times], columns=['start_time', 'end_time']) time_df['violation'] = '超速' self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True) return self._calculate_overspeed_statistics(speed_df, list_sign) class Compliance(object): def __init__(self, data_processed, custom_data, scoreModel): self.eval_data = pd.DataFrame() self.penalty_points = 0 self.config = data_processed.config compliance_config = self.config.config['compliance'] self.compliance_config = compliance_config self.weight_dict = compliance_config['weight'] self.metric_list = compliance_config['metric'] self.type_list = compliance_config['type'] print("self.type_list is", self.type_list) self.weight_custom = compliance_config['weightCustom'] self.name_dict = compliance_config['name'] self.metric_dict = compliance_config['typeMetricDict'] self.type_name_dict = compliance_config['typeName'] self.weight = compliance_config['weightDimension'] self.weight_type_dict = compliance_config['typeWeight'] self.weight_type_list = compliance_config['typeWeightList'] self.type_illegal_count_dict = {} self.traffic_rule = traffic_rule(data_processed, scoreModel) self.violation_df = self.traffic_rule.violation_df def score_cal_penalty_points(self, penalty_points): if penalty_points == 0: score = 100 elif penalty_points >= 12: score = 0 else: score = (12 - penalty_points) / 12 * 60 return score def time_splice(self, start_time, end_time): str_time = f"[{start_time}s, {end_time}s]" return str_time def weight_type_cal(self): # penalty_list = [1, 3, 6, 9, 12] penalty_list = [1, 3, 6, 12] sum_penalty = sum(penalty_list) weight_type_list = [round(x / sum_penalty, 2) for x in penalty_list] return weight_type_list def compliance_statistic(self): # metric analysis press_line_dict = self.traffic_rule.get_solid_line_violations() run_red_light_dict = self.traffic_rule.run_red_light_detection() overspeed_0_to_10_dict, overspeed_10_to_20_dict, overspeed_20_to_50_dict, overspeed_50_dict = self.traffic_rule.overspeed() df_list = [] if "overspeed10" in self.metric_list: df_list.append(overspeed_0_to_10_dict) if "overspeed10_20" in self.metric_list: df_list.append(overspeed_10_to_20_dict) if "pressSolidLine" in self.metric_list: df_list.append(press_line_dict) if "runRedLight" in self.metric_list: df_list.append(run_red_light_dict) if "overspeed20_50" in self.metric_list: df_list.append(overspeed_20_to_50_dict) if "overspeed50" in self.metric_list: df_list.append(overspeed_50_dict) # generate dataframe and dicts compliance_df = pd.DataFrame(df_list) return compliance_df def prepare_data(self): self.compliance_df = self.compliance_statistic() self.illegal_count = int(self.compliance_df['illegal_count'].sum()) self.metric_penalty_points_dict = self.compliance_df.set_index('metric').to_dict()['penalty_points'] self.metric_illegal_count_dict = self.compliance_df.set_index('metric').to_dict()['illegal_count'] self.metric_penalty_money_dict = self.compliance_df.set_index('metric').to_dict()['penalty_money'] self.metric_warning_count_dict = self.compliance_df.set_index('metric').to_dict()['warning_count'] self.metric_penalty_law_dict = self.compliance_df.set_index('metric').to_dict()['penalty_law'] # 初始化数据字典 self.illegal_count = int(self.compliance_df['illegal_count'].sum()) self.metric_penalty_points_dict = self.compliance_df.set_index('metric').to_dict()['penalty_points'] self.metric_illegal_count_dict = self.compliance_df.set_index('metric').to_dict()['illegal_count'] self.metric_penalty_money_dict = self.compliance_df.set_index('metric').to_dict()['penalty_money'] self.metric_warning_count_dict = self.compliance_df.set_index('metric').to_dict()['warning_count'] self.metric_penalty_law_dict = self.compliance_df.set_index('metric').to_dict()['penalty_law'] def calculate_deduct_scores(self): score_type_dict = {} deduct_functions = { "deduct1": self.calculate_deduct(1), "deduct3": self.calculate_deduct(3), "deduct6": self.calculate_deduct(6), "deduct9": self.calculate_deduct(9), "deduct12": self.calculate_deduct(12), } for deduct_type in self.type_list: print("deduct_type is", deduct_type) if deduct_type in deduct_functions: penalty_points, illegal_count = deduct_functions[deduct_type] score_type_dict[deduct_type] = self.score_cal_penalty_points(penalty_points) self.type_illegal_count_dict[deduct_type] = illegal_count return score_type_dict def calculate_deduct(self, num): deduct_df = self.compliance_df[(self.compliance_df['weight'].isna()) | (self.compliance_df['weight'] == num)] return deduct_df['penalty_points'].sum(), deduct_df['illegal_count'].sum() def calculate_weights(self): weight_dict = { "overspeed10": 0.5, "overspeed10_20": 0.5, "pressSolidLine": 1.0, "runRedLight": 0.5, "overspeed20_50": 0.5, "overspeed50": 1.0 } self.weight_type_list = [weight_dict.get(metric, 0.5) for metric in self.type_list] # 假设 type_list 中的每个元素都在 weight_dict 的键中,或者默认为 0.5 self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)} self.weight_dict = weight_dict def calculate_compliance_score(self, score_type_dict): if not self.weight_custom: # 客观赋权 self.calculate_weights() penalty_points_threshold = 12 # 假设的扣分阈值 if hasattr(self, 'penalty_points') and self.penalty_points >= penalty_points_threshold: score_compliance = 0 elif sum(score_type_dict.values()) / len(score_type_dict) == 100: score_compliance = 100 else: score_type_tmp = [80 if x == 100 else x for key, x in score_type_dict.items()] score_compliance = np.dot(self.weight_type_list, score_type_tmp) return round(score_compliance, 2) def output_results(self, score_compliance, score_type_dict): print("\n[合规性表现及得分情况]") print(f"合规性得分为:{score_compliance:.2f}分。") print(f"合规性各分组得分为:{score_type_dict}。") print(f"合规性各分组权重为:{self.weight_type_list}。") def compliance_score(self): self.prepare_data() score_type_dict = self.calculate_deduct_scores() score_compliance = self.calculate_compliance_score(score_type_dict) self.output_results(score_compliance, score_type_dict) return score_compliance, score_type_dict def _get_weight_distribution(self): # get weight distribution weight_distribution = {} weight_distribution["name"] = self.config.dimension_name["compliance"] for type in self.type_list: type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in self.weight_dict.items() if key in self.metric_dict[type]} weight_distribution_type = { "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)", "indexes": type_weight_indexes_dict } weight_distribution[type] = weight_distribution_type return weight_distribution def build_weight_index(self, metric_prefix, weight_key): if metric_prefix in self.metric_list: return { f"{metric_prefix}Weight": f"{metric_prefix.replace('_', ' ').capitalize()}({self.weight_dict[weight_key] * 100:.2f}%)"} return {} def compliance_weight_distribution(self): weight_distribution = {"name": "合规性"} deduct_types = { "deduct1": {"metrics": ["overspeed10", "overspeed10_20"], "weight_key": "deduct1"}, "deduct3": {"metrics": ["pressSolidLine"], "weight_key": "deduct3"}, "deduct6": {"metrics": ["runRedLight", "overspeed20_50"], "weight_key": "deduct6"}, "deduct9": {"metrics": [], "weight_key": "deduct9"}, # Assuming no specific metrics for deduct9 "deduct12": {"metrics": ["overspeed50"], "weight_key": "deduct12"}, } for deduct_type, details in deduct_types.items(): if deduct_type in self.type_list: indexes_dict = {} for metric in details["metrics"]: indexes_dict.update(self.build_weight_index(metric, details["weight_key"])) weight_distribution[deduct_type] = { f"{deduct_type.replace('deduct', '')}Weight": f"{details['weight_key'].replace('deduct', '').replace('_', ' ').capitalize()}" f"违规({int(details['weight_key'].replace('deduct', ''))}分)({self.weight_type_dict[deduct_type] * 100:.2f} % )", "indexes": indexes_dict } if deduct_type == "deduct9" and "deduct9" in self.type_list: weight_distribution[deduct_type] = { "deduct9Weight": f"严重违规(9分)({self.weight_type_dict[deduct_type] * 100:.2f}%)", "indexes": {} } return weight_distribution def report_statistic(self): score_compliance, score_type_dict = self.compliance_score() grade_compliance = score_grade(score_compliance) score_compliance = int(score_compliance) if int(score_compliance) == score_compliance else score_compliance score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()] # 获取合规性描述 comp_description1 = self.get_compliance_description(grade_compliance, self.illegal_count) comp_description2 = self.get_violation_details(self.type_list, self.type_illegal_count_dict, self.type_name_dict) weight_distribution = self._get_weight_distribution() # 获取违规数据表 violations_slices = self.get_violations_table() # 获取扣分详情 deductPoint_dict = self.get_deduct_points_dict(score_type_dict) # 返回结果(这里可以根据需要返回具体的数据结构) return { "name": "合规性", "weight": f"{self.weight * 100:.2f}%", "weightDistribution": weight_distribution, "score": score_compliance, "level": grade_compliance, 'score_type': score_type, # 'score_metric': score_metric, 'illegalCount': self.illegal_count, "description1": comp_description1, "description2": comp_description2, "details": deductPoint_dict, "violations": violations_slices } def get_compliance_description(self, grade_compliance, illegal_count): # 获取合规性描述 if grade_compliance == '优秀': return '车辆在本轮测试中无违反交通法规行为;' else: return f'车辆在本轮测试中共发生{illegal_count}次违反交通法规行为;' \ f'如果等级为一般或较差,需要提高算法在合规性上的表现。' def get_violation_details(self, type_list, type_illegal_count_dict, type_name_dict): # 获取违规详情描述 if self.illegal_count == 0: return "车辆在该用例中无违反交通法规行为,算法表现良好。" else: str_illegel_type = ", ".join( [f"{type_name_dict[type]}行为{count}次" for type, count in type_illegal_count_dict.items() if count > 0]) return f"车辆在该用例共违反交通法规{self.illegal_count}次。其中{str_illegel_type}。违规行为详情见附录C。" def get_violations_table(self): # 获取违规数据表 if not self.violation_df.empty: self.violation_df['time'] = self.violation_df.apply( lambda row: self.time_splice(row['start_time'], row['end_time']), axis=1) df_violations = self.violation_df[['time', 'violation']] return df_violations.to_dict('records') else: return [] def get_deduct_points_dict(self, score_type_dict): # 获取扣分详情字典 deductPoint_dict = {} for type in self.type_list: type_dict = { "name": self.type_name_dict[type], "score": score_type_dict[type], "indexes": {} } for metric in self.metric_list: type_dict["indexes"][metric] = { "name": self.name_dict[metric], "times": self.metric_illegal_count_dict[metric], "deductPoints": self.metric_penalty_points_dict[metric], "fine": self.metric_penalty_money_dict[metric], "basis": self.metric_penalty_law_dict[metric] } deductPoint_dict[type] = type_dict for deduct_type, metrics in [ ("deduct1", ["overspeed10", "overspeed10_20"]), ("deduct3", ["pressSolidLine"]), ("deduct6", ["runRedLight", "overspeed20_50"]), ("deduct9", ["xx"]), # 注意:这里xx是示例,实际应替换为具体的违规类型 ("deduct12", ["overspeed50"]) ]: if deduct_type in self.type_list: deduct_indexes = {} for metric in metrics: if metric in self.metric_list: deduct_indexes[metric] = { "name": self.name_dict[metric], "times": self.metric_illegal_count_dict[metric], "deductPoints": self.metric_penalty_points_dict[metric], "fine": self.metric_penalty_money_dict[metric], "basis": self.metric_penalty_law_dict[metric] } # 对于deduct9,特殊处理其score值 score = score_type_dict.get(deduct_type, 100) if deduct_type != "deduct9" else 100 deductPoint_dict[deduct_type] = { "name": f"{deduct_type.replace('deduct', '').strip()}违规({int(deduct_type.replace('deduct', ''))}分)", "score": score, "indexes": deduct_indexes } return deductPoint_dict def get_eval_data(self): df = self.eval_data return df