Selaa lähdekoodia

Refactoring and TTC screen

XGJ_zhaoyuan 6 kuukautta sitten
vanhempi
commit
21ee24e1a2

+ 326 - 700
cicv_evaluate_master-single_eval_1117/compliance.py

@@ -20,114 +20,53 @@ sys.path.append('../results')
 import numpy as np
 import pandas as pd
 
-from data_info import DataInfoList
 from common import score_grade, string_concatenate, replace_key_with_value, score_over_100
 from scipy.spatial.distance import euclidean
 
-
-class Compliance(object):
+class traffic_rule(object):
     """
-        Class for achieving compliance metrics for autonomous driving.
+            Class for achieving compliance metrics for autonomous driving.
 
-    Attributes:
-        droadMark_df: Roadmark data, stored in dataframe format.
-
-    """
+        Attributes:
+            droadMark_df: Roadmark data, stored in dataframe format.
 
-    def __init__(self, data_processed, custom_data, scoreModel):
-        self.eval_data = pd.DataFrame()
+        """
+    def __init__(self, data_processed, scoreModel):
         self.scoreModel = scoreModel
 
         self.roadMark_df = data_processed.road_mark_df
         self.trafficLight_df = data_processed.traffic_light_df
         self.trafficSignal_df = data_processed.traffic_signal_df
         self.objState_df = data_processed.object_df
+        self.ego_df = self.objState_df[(self.objState_df.playerId == 1) & (self.objState_df.type == 1)]
 
         self.violation_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'violation'])
 
-        self.illegal_count = 0
-        self.penalty_points = 0
-        self.penalty_money = 0
-        self.warning_count = 0
-
-        self.config = data_processed.config
-        compliance_config = self.config.config['compliance']
-        self.compliance_config = compliance_config
-
-        # common data
-        self.bulitin_metric_list = self.config.builtinMetricList
-
-        # dimension data
-        self.weight_custom = compliance_config['weightCustom']
-        self.metric_list = compliance_config['metric']
-        self.type_list = compliance_config['type']
-        self.type_name_dict = compliance_config['typeName']
-        self.name_dict = compliance_config['name']
-        self.unit_dict = compliance_config['unit']
-        self.metric_dict = compliance_config['typeMetricDict']
-
-        # custom metric data
-        # self.customMetricParam = compliance_config['customMetricParam']
-        # self.custom_metric_list = list(self.customMetricParam.keys())
-        self.custom_data = custom_data
-        self.custom_param_dict = {}
+    def _filter_groups_by_frame_period(self, grouped_violations):
+        """
+        Filter groups by a minimum continuous frame period.
+        """
+        CONTINUOUS_FRAME_PERIOD = 13
+        return [g for g in grouped_violations if len(g[0]) >= CONTINUOUS_FRAME_PERIOD]
 
-        # score data
-        self.weight = compliance_config['weightDimension']
-        self.weight_dict = compliance_config['weight']
-        self.weight_list = compliance_config['weightList']
-        self.weight_type_dict = compliance_config['typeWeight']
-        self.weight_type_list = compliance_config['typeWeightList']
+    def _extract_violation_times(self, filtered_groups):
+        """
+        Create a dataframe with start and end times for each violation group.
+        """
+        return [[g[0][0], g[0][-1]] for g in filtered_groups]
 
-        # type dicts
-        self.type_illegal_count_dict = {}
-        self.type_penalty_points_dict = {}
-        self.type_penalty_money_dict = {}
-        self.type_warning_count_dict = {}
-        self.type_penalty_law_dict = {}
-
-        # metric dicts
-        self.metric_illegal_count_dict = {}
-        self.metric_penalty_points_dict = {}
-        self.metric_penalty_money_dict = {}
-        self.metric_warning_count_dict = {}
-        self.metric_penalty_law_dict = {}
-
-        # self.metric_illegal_list = []
-        self.metric_illegal_count_list = []
-        self.metric_penalty_points_list = []
-        self.metric_penalty_money_list = []
-        self.metric_warning_count_list = []
-        self.metric_penalty_law_list = []
-
-    def press_solid_line(self):
+    def _filter_solid_lines(self, Dimy):
         """
-        #define RDB_ROADMARK_TYPE_NONE           0
-        #define RDB_ROADMARK_TYPE_SOLID          1
-        #define RDB_ROADMARK_TYPE_BROKEN         2
-        #define RDB_ROADMARK_TYPE_CURB           3
-        #define RDB_ROADMARK_TYPE_GRASS          4
-        #define RDB_ROADMARK_TYPE_BOTDOT         5
-        #define RDB_ROADMARK_TYPE_OTHER          6
-        #define RDB_ROADMARK_TYPE_SOLID_SOLID    7
-        #define RDB_ROADMARK_TYPE_BROKEN_SOLID   8
-        #define RDB_ROADMARK_TYPE_SOLID_BROKEN   9
-        #define RDB_ROADMARK_TYPE_LANE_CENTER   10
-        #define RDB_ROADMARK_COLOR_NONE          0
-        #define RDB_ROADMARK_COLOR_WHITE         1
-        #define RDB_ROADMARK_COLOR_RED           2
-        #define RDB_ROADMARK_COLOR_YELLOW        3
-        #define RDB_ROADMARK_COLOR_OTHER         4
-        #define RDB_ROADMARK_COLOR_BLUE          5
-        #define RDB_ROADMARK_COLOR_GREEN         6
+        Filter solid lines within the player's lateral distance.
         """
-        # Dimy = self.objState_df[self.objState_df["id"] == 1]["dimY"][0] / 2
-        Dimy = self.objState_df[self.objState_df["playerId"] == 1]["dimY"][0] / 2
         dist_line = self.roadMark_df[self.roadMark_df["type"] == 1]
         dist_line = dist_line.reset_index()
-        dist_press = dist_line[abs(dist_line["lateralDist"].values) <= Dimy]
+        return dist_line[abs(dist_line["lateralDist"].values) <= Dimy]
 
-        # 违规行为详情表统计
+    def _group_violations(self, dist_press):
+        """
+        Group violations by continuous frames.
+        """
         t_list = dist_press['simTime'].values.tolist()
         f_list = dist_press['simFrame'].values.tolist()
         group_time = []
@@ -143,32 +82,29 @@ class Compliance(object):
                 group_frame.append(sub_group_frame)
                 sub_group_time = [t_list[i]]
                 sub_group_frame = [f_list[i]]
-
-        CONTINUOUS_FRAME_PERIOD = 13
         group_time.append(sub_group_time)
         group_frame.append(sub_group_frame)
-        group_time = [g for g in group_time if len(g) >= CONTINUOUS_FRAME_PERIOD]
-        group_frame = [g for g in group_frame if len(g) >= CONTINUOUS_FRAME_PERIOD]
-        # group_time = [g for g in group_time if g[-1]-g[0] >= 1]
-        # group_frame = [g for g in group_frame if g[-1]-g[0] >= 13]
-
-        press_line_count = len(group_time)
-
-        # 输出图表值
-        press_line_time = [[g[0], g[-1]] for g in group_time]
-        press_line_frame = [[g[0], g[-1]] for g in group_frame]
+        return list(zip(group_time, group_frame))
 
+    def get_solid_line_violations(self):
+        """
+        Detect violations of pressing solid lines and return a dictionary with violation details.
+        """
+        # Extract relevant data
+        Dimy = self.objState_df[self.objState_df["playerId"] == 1]["dimY"][0] / 2
+        dist_press = self._filter_solid_lines(Dimy)
+        grouped_violations = self._group_violations(dist_press)# Group violations by continuous frames
+        filtered_groups = self._filter_groups_by_frame_period(grouped_violations)# Filter groups by minimum frame period
+        # Calculate violation count and create violation dataframe
+        press_line_count = len(filtered_groups)
+        press_line_time = self._extract_violation_times(filtered_groups)
         if press_line_time:
             time_df = pd.DataFrame(press_line_time, columns=['start_time', 'end_time'])
-            # frame_df = pd.DataFrame(press_line_frame, columns=['start_frame', 'end_frame'])
             time_df['violation'] = '压实线'
-            # frame_df['violation'] = '压实线'
-            # self.violation_df = pd.concat([self.violation_df, time_df, frame_df], ignore_index=True)
-            self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
-            # self.violation_df = pd.concat([self.violation_df, frame_df], ignore_index=True)
-
+            # Update violation dataframe
+            self.violation_df = pd.concat([self.violation_df, press_line_time], ignore_index=True)
+        # Create and return violation dictionary
         warning_count = 0
-
         press_line_dict = {
             'metric': 'pressSolidLine',
             'weight': 3,
@@ -178,118 +114,83 @@ class Compliance(object):
             'warning_count': warning_count,
             'penalty_law': '《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。'
         }
-
         return press_line_dict
 
-    def normalization_processing(self, x):
-        difference = x["traffic_light_h_diff"]
-        while (difference >= 360):
+    def normalize_angle(self, angle):
+        """Normalize angle to the range [0, 360)."""
+        difference = angle
+        while difference >= 360:
             difference -= 360
         return difference
 
-    def flag_red_traffic_light(self, x, cycleTime, duration_start, duration_end):
-        divisor = x['simTime'] / cycleTime
+    def is_red_light(self, simTime, cycleTime, duration_start, duration_end):
+        """Check if the current time corresponds to a red light phase."""
+        divisor = simTime / cycleTime
         decimal_part = divisor - int(divisor)
-        # decimal_part = divisor % 1
-        if duration_start <= decimal_part < duration_end:
-            return 1
-        else:
-            return x["flag_red_traffic_light"]
-
-    def run_red_light(self):
-        """
+        return duration_start <= decimal_part < duration_end
+
+    def process_traffic_light(self, trafficLight_id):
+        """Process a single traffic light and detect run red light events."""
+        trafficLight_position = self.trafficSignal_df[self.trafficSignal_df["playerId"] == trafficLight_id].iloc[:1, :]
+        if trafficLight_position.empty:
+            return
+        trafficLight_position_x = trafficLight_position['posX'].values[0]
+        trafficLight_position_y = trafficLight_position['posY'].values[0]
+        trafficLight_position_heading = trafficLight_position['posH'].values[0]
+        trafficLight_character = self.trafficLight_df[self.trafficLight_df.id == trafficLight_id]
+        cycleTime = trafficLight_character["cycleTime"].values[0]
+        noPhases = trafficLight_character["noPhases"].values[0]
+        # Calculate distances and headings
+        self.ego_df["traffic_light_distance_absolute"] = self.ego_df[['posX', 'posY']].apply(
+            lambda x: euclidean((trafficLight_position_x, trafficLight_position_y), (x['posX'], x['posY'])), axis=1)
+        self.ego_df["traffic_light_h_diff"] = self.ego_df.apply(
+            lambda x: abs(x['posH'] - trafficLight_position_heading) * 57.3, axis=1)
+        self.ego_df["traffic_light_h_diff"] = self.ego_df["traffic_light_h_diff"].apply(self.normalize_angle)
+        # Filter ego vehicles near the traffic light with the correct heading
+        mask_traffic_light = ((self.ego_df['traffic_light_h_diff'] <= 210) & (
+                self.ego_df['traffic_light_h_diff'] >= 150)) | (self.ego_df['traffic_light_h_diff'] <= 30) | (
+                                     self.ego_df['traffic_light_h_diff'] >= 330)
+        ego_near_light = self.ego_df[(self.ego_df.traffic_light_distance_absolute <= 10) & mask_traffic_light]
+        if ego_near_light.empty:
+            return
+        # Check for red light violations
+        ego_near_light["flag_red_traffic_light"] = 0
+        type_list = trafficLight_character['violation'][:noPhases]
+        duration = trafficLight_character['duration'][:noPhases]
+        duration_correct = [0] * noPhases
+        for number in range(noPhases):
+            duration_correct[number] = sum(duration[:number + 1])
+            type_current = type_list.values[number]
+            if type_current == 1:  # Red light phase
+                if number == 0:
+                    duration_start = 0
+                else:
+                    duration_start = duration_correct[number - 1]
+                duration_end = duration_correct[number]
+                ego_near_light["flag_red_traffic_light"] = ego_near_light.apply(
+                    lambda x: self.is_red_light(x['simTime'], cycleTime, duration_start, duration_end), axis=1)
+        # Collect run red light events
+        run_red_light_df = ego_near_light[ego_near_light['flag_red_traffic_light'] == 1]
+        self.collect_run_red_light_events(run_red_light_df)
+
+    def collect_run_red_light_events(self, run_red_light_df):
+        grouped_events = self._group_violations(run_red_light_df)
+        filtered_events = self._filter_groups_by_frame_period(grouped_events)
+        violation_times = self._extract_violation_times(filtered_events)
+        if violation_times:
+            time_df = pd.DataFrame(violation_times, columns=['start_time', 'end_time'])
+            time_df['violation'] = '闯红灯'
+            self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
 
-        """
-        ego_df = self.objState_df[(self.objState_df.playerId == 1) & (self.objState_df.type == 1)]
+    def run_red_light_detection(self):
+        """Main function to detect run red light events."""
         trafficLight_id_list = set(self.trafficLight_df["id"].tolist())
-        l_stipulate = 10
         run_red_light_count = 0
         for trafficLight_id in trafficLight_id_list:
-            trafficLight_position = self.trafficSignal_df[self.trafficSignal_df["playerId"] == trafficLight_id]
-            trafficLight_character = self.trafficLight_df[self.trafficLight_df.id == trafficLight_id]
-            if trafficLight_position.empty:  # trafficSign中没有记录
-                continue
-            trafficLight_position = trafficLight_position.iloc[:1, :]
-            trafficLight_position_x = trafficLight_position['posX'].values[0]
-            trafficLight_position_y = trafficLight_position['posY'].values[0]
-            trafficLight_position_heading = trafficLight_position['posH'].values[0]
-            cycleTime = trafficLight_character["cycleTime"].values[0]
-            noPhases = trafficLight_character["noPhases"].values[0]
-
-            ego_df["traffic_light_distance_absolute"] = ego_df[['posX', 'posY']].apply( \
-                lambda x: euclidean((trafficLight_position_x, trafficLight_position_y), (x['posX'], x['posY'])), axis=1)
-
-            # ego_df["traffic_light_distance_absolute"] = ego_df.apply(lambda x: euclidean((trafficLight_position_x, trafficLight_position_y), (x['posX'], x['posY'])), axis=1)
-
-            ego_df["traffic_light_h_diff"] = ego_df.apply(
-                lambda x: abs(x['posH'] - trafficLight_position_heading) * 57.3, axis=1)
-            ego_df["traffic_light_h_diff"] = ego_df.apply(
-                lambda x: self.normalization_processing(x), axis=1).copy()  # 归一化到[0,360)之间
-            mask_trafftic_light = ((ego_df['traffic_light_h_diff'] <= 210) & (
-                    ego_df['traffic_light_h_diff'] >= 150)) | (ego_df['traffic_light_h_diff'] <= 30) | (
-                                          ego_df['traffic_light_h_diff'] >= 330)
-            ego_near_light = ego_df[(ego_df.traffic_light_distance_absolute <= l_stipulate) & mask_trafftic_light]
-            if ego_near_light.empty:
-                continue
-            """ 当前是否为红灯 """
-            ego_near_light["flag_red_traffic_light"] = 0  # 不是红灯
-            type_list = trafficLight_character['violation'][:noPhases]
-            duration = trafficLight_character['duration'][:noPhases]
-            duration_correct = [0] * noPhases
-            for number in range(noPhases):
-                duration_correct[number] = sum(duration[:number + 1])
-                type_current = type_list.values[number]
-                if type_current == 1:  # 当前duration是红灯
-                    if number == 0:
-                        duration_start = 0
-                    else:
-                        duration_start = duration_correct[number - 1]
-                    duration_end = duration_correct[number]
-                    ego_near_light["flag_red_traffic_light"] = ego_near_light[
-                        ['simTime', 'flag_red_traffic_light']].apply(
-                        lambda x: self.flag_red_traffic_light(x, cycleTime, duration_start, duration_end), axis=1)
-
-            # 挑出闯红灯数据
-            run_red_light_df = ego_near_light[ego_near_light['flag_red_traffic_light'] == 1]
-
-            # 违规行为详情表统计
-            t_list = run_red_light_df['simTime'].values.tolist()
-            f_list = run_red_light_df['simFrame'].values.tolist()
-
-            group_time = []
-            group_frame = []
-            sub_group_time = []
-            sub_group_frame = []
-            for i in range(len(f_list)):
-                if not sub_group_time or t_list[i] - t_list[i - 1] <= 1:
-                    sub_group_time.append(t_list[i])
-                    sub_group_frame.append(f_list[i])
-                else:
-                    group_time.append(sub_group_time)
-                    group_frame.append(sub_group_frame)
-                    sub_group_time = [t_list[i]]
-                    sub_group_frame = [f_list[i]]
-
-            CONTINUOUS_FRAME_PERIOD = 13
-            group_time.append(sub_group_time)
-            group_frame.append(sub_group_frame)
-            group_time = [g for g in group_time if len(g) >= CONTINUOUS_FRAME_PERIOD]
-            group_frame = [g for g in group_frame if len(g) >= CONTINUOUS_FRAME_PERIOD]
-
-            run_red_light_time = [[g[0], g[-1]] for g in group_time]
-            run_red_light_frame = [[g[0], g[-1]] for g in group_frame]
-
-            if run_red_light_time:
-                time_df = pd.DataFrame(run_red_light_time, columns=['start_time', 'end_time'])
-                # frame_df = pd.DataFrame(run_red_light_frame, columns=['start_frame', 'end_frame'])
-                time_df['violation'] = '闯红灯'
-                # frame_df['violation'] = '闯红灯'
-                self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
-                # self.violation_df = pd.concat([self.violation_df, frame_df], ignore_index=True)
-
-            # 闯红灯次数统计
-            if ego_near_light["flag_red_traffic_light"].any() == 1:
-                run_red_light_count = run_red_light_count + 1
+            self.process_traffic_light(trafficLight_id)
+            # 闯红灯次数统计(这里可以根据需要修改统计逻辑)
+            if 'flag_red_traffic_light' in self.ego_df.columns and self.ego_df['flag_red_traffic_light'].any() == 1:
+                run_red_light_count += 1
 
         run_red_light_dict = {
             'metric': 'runRedLight',
@@ -300,81 +201,23 @@ class Compliance(object):
             'warning_count': 0,
             'penalty_law': '《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。'
         }
-
         return run_red_light_dict
 
-    def overspeed(self):
-        """
-        《中华人民共和国道路交通安全法实施条例》第四十五条:机动车在道路上行驶不得超过限速标志、标线标明的速度;
-        1、高速、城市快速路超速超过规定时速10%以内,处以警告,不扣分;
-        2、普通私家车高速超过规定时速10%以上未达20%的,处以200元罚款,记3分,普通私家车在除高速公路、城市快速路外的道路,超速20%以上未达到50%,会一次被记3分;
-        3、超过规定时速20%以上未达50%的,处以200元罚款,记6分,或在普通道路上超速50%以上,都会一次被记6分。;
-        4、超过规定时速50%以上的,可吊销驾驶证并罚款2000元,计12分。
-
-        [0, 10) 0,0,1
-        [10, 20) 0,200,1
-        高速、城市快速路:[20, 50) 6,200
-        高速、城市快速路:[50,)12,2000
-
-        高速、城市以外:[20, 50)3,200
-        高速、城市以外:[50,)6,1000-2000
-        """
-        Dimx = self.objState_df[self.objState_df["playerId"] == 1]["dimX"][0] / 2
+    def _find_speed_violations(self):
+        DimX = self.objState_df[self.objState_df["playerId"] == 1]["dimY"][0] / 2
         data_ego = self.objState_df[self.objState_df["playerId"] == 1]
         speed_limit_sign = self.trafficSignal_df[self.trafficSignal_df["type"] == 274]
-        same_df_rate = pd.merge(speed_limit_sign, data_ego, on=['simTime', 'simFrame'], how='inner')
-        same_df_rate = same_df_rate.reset_index()
-
+        same_df_rate = pd.merge(speed_limit_sign, data_ego, on=['simTime', 'simFrame'], how='inner').reset_index()
         speed_df = same_df_rate[(abs(same_df_rate["posX_x"] - same_df_rate["posX_y"]) <= 7) & (
-                abs(same_df_rate["posY_x"] - same_df_rate["posY_y"]) <= Dimx)]
+                abs(same_df_rate["posY_x"] - same_df_rate["posY_y"]) <= DimX)]
         speed_df["speed"] = np.sqrt(speed_df["speedX"] ** 2 + speed_df["speedY"] ** 2) * 3.6
-
-        # speed_df["value"] = 10
-        # same_df_rate["value"] = 10
-
         list_sign = speed_df[speed_df["speed"] > speed_df["value"]]
+        return list_sign, speed_df
 
-        # 违规行为详情表统计
-        t_list = list_sign['simTime'].values.tolist()
-        f_list = list_sign['simFrame'].values.tolist()
-        group_time = []
-        group_frame = []
-        sub_group_time = []
-        sub_group_frame = []
-        for i in range(len(f_list)):
-            if not sub_group_time or t_list[i] - t_list[i - 1] <= 2:
-                sub_group_time.append(t_list[i])
-                sub_group_frame.append(f_list[i])
-            else:
-                group_time.append(sub_group_time)
-                group_frame.append(sub_group_frame)
-                sub_group_time = [t_list[i]]
-                sub_group_frame = [f_list[i]]
-
-        CONTINUOUS_FRAME_PERIOD = 13
-        group_time.append(sub_group_time)
-        group_frame.append(sub_group_frame)
-        group_time = [g for g in group_time if len(g) >= CONTINUOUS_FRAME_PERIOD]
-        group_frame = [g for g in group_frame if len(g) >= CONTINUOUS_FRAME_PERIOD]
-
-        # 输出图表值
-        overspeed_time = [[g[0], g[-1]] for g in group_time]
-        overspeed_frame = [[g[0], g[-1]] for g in group_frame]
-
-        if overspeed_time:
-            time_df = pd.DataFrame(overspeed_time, columns=['start_time', 'end_time'])
-            # frame_df = pd.DataFrame(overspeed_frame, columns=['start_frame', 'end_frame'])
-            time_df['violation'] = '超速'
-            # frame_df['violation'] = '超速'
-            self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
-            # self.violation_df = pd.concat([self.violation_df, frame_df], ignore_index=True)
-
-        # list_sign = list_sign.reset_index()
+    def _calculate_overspeed_statistics(self, speed_df, list_sign):
         index_sign = list_sign.index.to_list()
         speed_df["flag_press"] = speed_df["simFrame"].apply(lambda x: 1 if x in list_sign["simFrame"] else 0)
         speed_df["diff_press"] = speed_df["flag_press"].diff()
-        # overrate_count = speed_df[speed_df["diff_press"] == -1]["diff_press"].count()
-
         index_list = []
         subindex_list = []
         for i in range(len(index_sign)):
@@ -382,89 +225,81 @@ class Compliance(object):
                 subindex_list.append(index_sign[i])
             else:
                 index_list.append(subindex_list)
-                subindex_list.append(index_sign[i])
+                subindex_list = [index_sign[i]]
         index_list.append(subindex_list)
-
         overspeed_count_0_to_10 = 0
         overspeed_count_10_to_20 = 0
         overspeed_count_20_to_50 = 0
         overspeed_count_50_to_ = 0
-
         if index_list[0]:
             for i in range(len(index_list)):
                 left = index_list[i][0]
                 right = index_list[i][-1]
                 df_tmp = speed_df.loc[left:right + 1]
                 max_ratio = ((df_tmp["speed"] - df_tmp["value"]) / df_tmp["value"]).max()
-                if max_ratio >= 0 and max_ratio < 0.1:
+                if 0 <= max_ratio < 0.1:
                     overspeed_count_0_to_10 += 1
-                elif max_ratio >= 0.1 and max_ratio < 0.2:
+                elif 0.1 <= max_ratio < 0.2:
                     overspeed_count_10_to_20 += 1
-                elif max_ratio >= 0.2 and max_ratio < 0.5:
+                elif 0.2 <= max_ratio < 0.5:
                     overspeed_count_20_to_50 += 1
                 elif max_ratio >= 0.5:
                     overspeed_count_50_to_ += 1
-
-        """
-        [0, 10) 0,0,1
-        [10, 20) 0,200,1
-        高速、城市快速路:[20, 50) 6,200
-        高速、城市快速路:[50,)12,2000
-        """
-
-        overspeed_0_to_10 = {
-            # 'metric': 'overspeed_0_to_10',
-            'metric': 'overspeed10',
+        return (
+            self._create_overspeed_dict(overspeed_count_0_to_10, 'overspeed10', 0, 0),
+            self._create_overspeed_dict(overspeed_count_10_to_20, 'overspeed10_20', 0, 200),
+            self._create_overspeed_dict(overspeed_count_20_to_50, 'overspeed20_50', 6, 200),
+            self._create_overspeed_dict(overspeed_count_50_to_, 'overspeed50', 12, 2000)
+        )
+
+    def _create_overspeed_dict(self, count, metric, penalty_points, penalty_money):
+        return {
+            'metric': metric,
             'weight': None,
-            'illegal_count': overspeed_count_0_to_10,
-            'penalty_points': 0,
-            'penalty_money': 0,
-            'warning_count': overspeed_count_0_to_10,
+            'illegal_count': count,
+            'penalty_points': count * penalty_points,
+            'penalty_money': count * penalty_money,
+            'warning_count': count if penalty_points == 0 else 0,
             'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
         }
 
-        overspeed_10_to_20 = {
-            # 'metric': 'overspeed_10_to_20',
-            'metric': 'overspeed10_20',
-            'weight': None,
-            'illegal_count': overspeed_count_10_to_20,
-            'penalty_points': 0,
-            'penalty_money': overspeed_count_10_to_20 * 200,
-            'warning_count': overspeed_count_10_to_20,
-            'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
-        }
+    def overspeed(self):
+        list_sign, speed_df = self._find_speed_violations()
+        grouped_events = self._group_violations(list_sign)
+        filtered_events = self._filter_groups_by_frame_period(grouped_events)
+        violation_times = self._extract_violation_times(filtered_events)
+        if violation_times:
+            time_df = pd.DataFrame([d[0] for d in violation_times], columns=['start_time', 'end_time'])
+            time_df['violation'] = '超速'
+            self.violation_df = pd.concat([self.violation_df, time_df], ignore_index=True)
+        return self._calculate_overspeed_statistics(speed_df, list_sign)
 
-        overspeed_20_to_50 = {
-            # 'metric': 'overspeed_20_to_50',
-            'metric': 'overspeed20_50',
-            'weight': 6,
-            'illegal_count': overspeed_count_20_to_50,
-            'penalty_points': overspeed_count_20_to_50 * 6,
-            'penalty_money': overspeed_count_20_to_50 * 200,
-            'warning_count': 0,
-            'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
-        }
+class Compliance(object):
+    def __init__(self, data_processed, custom_data, scoreModel):
+        self.eval_data = pd.DataFrame()
+        self.penalty_points = 0
 
-        overspeed_50_to_ = {
-            # 'metric': 'overspeed_50_to_',
-            'metric': 'overspeed50',
-            'weight': 12,
-            'illegal_count': overspeed_count_50_to_,
-            'penalty_points': overspeed_count_50_to_ * 12,
-            'penalty_money': overspeed_count_50_to_ * 2000,
-            'warning_count': 0,
-            'penalty_law': '《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。'
-        }
+        self.config = data_processed.config
+        compliance_config = self.config.config['compliance']
+        self.compliance_config = compliance_config
+        self.weight_dict = compliance_config['weight']
+        self.metric_list = compliance_config['metric']
+        self.type_list = compliance_config['type']
+        print("self.type_list is", self.type_list)
+        self.weight_custom = compliance_config['weightCustom']
+        self.name_dict = compliance_config['name']
+        self.metric_dict = compliance_config['typeMetricDict']
+        self.type_name_dict = compliance_config['typeName']
+        self.weight = compliance_config['weightDimension']
+        self.weight_type_dict = compliance_config['typeWeight']
+        self.weight_type_list = compliance_config['typeWeightList']
 
-        return overspeed_0_to_10, overspeed_10_to_20, overspeed_20_to_50, overspeed_50_to_
+        self.type_illegal_count_dict = {}
+
+        self.traffic_rule = traffic_rule(data_processed, scoreModel)
+        self.violation_df = self.traffic_rule.violation_df
 
     def score_cal_penalty_points(self, penalty_points):
-        """
-        # 1_results: 0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 100
-        # 3_results: 0, 15, 30, 45, 100
-        # 6_results: 0, 30, 100
-        # 9_results: 0, 15, 100
-        """
         if penalty_points == 0:
             score = 100
         elif penalty_points >= 12:
@@ -477,10 +312,6 @@ class Compliance(object):
         str_time = f"[{start_time}s, {end_time}s]"
         return str_time
 
-    def time_frame_splice(self, start_time, end_time, start_frame, end_frame):
-        str_time = f"[{start_time}s, {end_time}s], {start_frame}-{end_frame}"
-        return str_time
-
     def weight_type_cal(self):
         # penalty_list = [1, 3, 6, 9, 12]
         penalty_list = [1, 3, 6, 12]
@@ -489,11 +320,10 @@ class Compliance(object):
         return weight_type_list
 
     def compliance_statistic(self):
-
         # metric analysis
-        press_line_dict = self.press_solid_line()
-        run_red_light_dict = self.run_red_light()
-        overspeed_0_to_10_dict, overspeed_10_to_20_dict, overspeed_20_to_50_dict, overspeed_50_dict = self.overspeed()
+        press_line_dict = self.traffic_rule.get_solid_line_violations()
+        run_red_light_dict = self.traffic_rule.run_red_light_detection()
+        overspeed_0_to_10_dict, overspeed_10_to_20_dict, overspeed_20_to_50_dict, overspeed_50_dict = self.traffic_rule.overspeed()
 
         df_list = []
         if "overspeed10" in self.metric_list:
@@ -513,80 +343,49 @@ class Compliance(object):
 
         if "overspeed50" in self.metric_list:
             df_list.append(overspeed_50_dict)
-
         # generate dataframe and dicts
         compliance_df = pd.DataFrame(df_list)
-
         return compliance_df
 
-    def compliance_score(self):
-        """
-        待优化
-        可以重新构建数据结构,每个判断函数的返回类型为dict
-        在多指标统计扣分、罚款时,可以将所有字典合并为一个dataframe,然后求和
-        """
-        # initialization
+    def prepare_data(self):
+        self.compliance_df = self.compliance_statistic()
+        self.illegal_count = int(self.compliance_df['illegal_count'].sum())
+        self.metric_penalty_points_dict = self.compliance_df.set_index('metric').to_dict()['penalty_points']
+        self.metric_illegal_count_dict = self.compliance_df.set_index('metric').to_dict()['illegal_count']
+        self.metric_penalty_money_dict = self.compliance_df.set_index('metric').to_dict()['penalty_money']
+        self.metric_warning_count_dict = self.compliance_df.set_index('metric').to_dict()['warning_count']
+        self.metric_penalty_law_dict = self.compliance_df.set_index('metric').to_dict()['penalty_law']
+
+        # 初始化数据字典
+        self.illegal_count = int(self.compliance_df['illegal_count'].sum())
+        self.metric_penalty_points_dict = self.compliance_df.set_index('metric').to_dict()['penalty_points']
+        self.metric_illegal_count_dict = self.compliance_df.set_index('metric').to_dict()['illegal_count']
+        self.metric_penalty_money_dict = self.compliance_df.set_index('metric').to_dict()['penalty_money']
+        self.metric_warning_count_dict = self.compliance_df.set_index('metric').to_dict()['warning_count']
+        self.metric_penalty_law_dict = self.compliance_df.set_index('metric').to_dict()['penalty_law']
+
+    def calculate_deduct_scores(self):
         score_type_dict = {}
-        compliance_df = self.compliance_statistic()
-        self.illegal_count = int(compliance_df['illegal_count'].sum())
-        # self.penalty_points = compliance_df['penalty_points'].sum()
-        # self.penalty_money = compliance_df['penalty_money'].sum()
-        # self.warning_count = compliance_df['warning_count'].sum()
-
-        # self.metric_illegal_list = compliance_df[compliance_df['illegal_count'] > 0]['metric'].tolist()
-        # self.metric_illegal_count_list = compliance_df['illegal_count'].values.tolist()
-        #
-        # self.penalty_points_list = compliance_df['penalty_points'].values.tolist()
-        # self.penalty_money_list = compliance_df['penalty_money'].values.tolist()
-        # self.warning_count_list = compliance_df['warning_count'].values.tolist()
-
-        self.metric_penalty_points_dict = compliance_df.set_index('metric').to_dict()['penalty_points']
-        self.metric_illegal_count_dict = compliance_df.set_index('metric').to_dict()['illegal_count']
-        self.metric_penalty_money_dict = compliance_df.set_index('metric').to_dict()['penalty_money']
-        self.metric_warning_count_dict = compliance_df.set_index('metric').to_dict()['warning_count']
-        self.metric_penalty_law_dict = compliance_df.set_index('metric').to_dict()['penalty_law']
-
-        # deduct1
-        if "deduct1" in self.type_list:
-            deduct1_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 1)]
-            penalty_points_1 = deduct1_df['penalty_points'].sum()
-            illegal_count_1 = deduct1_df['illegal_count'].sum()
-            score_type_dict["deduct1"] = self.score_cal_penalty_points(penalty_points_1)
-            self.type_illegal_count_dict["deduct1"] = illegal_count_1
-
-        # deduct3
-        if "deduct3" in self.type_list:
-            deduct3_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 3)]
-            penalty_points_3 = deduct3_df['penalty_points'].sum()
-            illegal_count_3 = deduct3_df['illegal_count'].sum()
-            score_type_dict["deduct3"] = self.score_cal_penalty_points(penalty_points_3)
-            self.type_illegal_count_dict["deduct3"] = illegal_count_3
-
-        # deduct6
-        if "deduct6" in self.type_list:
-            deduct6_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 6)]
-            penalty_points_6 = deduct6_df['penalty_points'].sum()
-            illegal_count_6 = deduct6_df['illegal_count'].sum()
-            score_type_dict["deduct6"] = self.score_cal_penalty_points(penalty_points_6)
-            self.type_illegal_count_dict["deduct6"] = illegal_count_6
-
-        # deduct9
-        if "deduct9" in self.type_list:
-            deduct9_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 9)]
-            penalty_points_9 = deduct9_df['penalty_points'].sum()
-            illegal_count_9 = deduct9_df['illegal_count'].sum()
-            score_type_dict["deduct9"] = self.score_cal_penalty_points(penalty_points_9)
-            self.type_illegal_count_dict["deduct9"] = illegal_count_9
-
-        # deduct12
-        if "deduct12" in self.type_list:
-            deduct12_df = compliance_df[(compliance_df['weight'].isna()) | (compliance_df['weight'] == 12)]
-            penalty_points_12 = deduct12_df['penalty_points'].sum()
-            illegal_count_12 = deduct12_df['illegal_count'].sum()
-            score_type_dict["deduct12"] = self.score_cal_penalty_points(penalty_points_12)
-            self.type_illegal_count_dict["deduct12"] = illegal_count_12
-
-        weight_type_list = self.weight_type_cal()
+        deduct_functions = {
+            "deduct1": self.calculate_deduct(1),
+            "deduct3": self.calculate_deduct(3),
+            "deduct6": self.calculate_deduct(6),
+            "deduct9": self.calculate_deduct(9),
+            "deduct12": self.calculate_deduct(12),
+        }
+        for deduct_type in self.type_list:
+            print("deduct_type is", deduct_type)
+            if deduct_type in deduct_functions:
+                penalty_points, illegal_count = deduct_functions[deduct_type]
+                score_type_dict[deduct_type] = self.score_cal_penalty_points(penalty_points)
+                self.type_illegal_count_dict[deduct_type] = illegal_count
+        return score_type_dict
+
+    def calculate_deduct(self, num):
+        deduct_df = self.compliance_df[(self.compliance_df['weight'].isna()) | (self.compliance_df['weight'] == num)]
+        return deduct_df['penalty_points'].sum(), deduct_df['illegal_count'].sum()
+
+    def calculate_weights(self):
         weight_dict = {
             "overspeed10": 0.5,
             "overspeed10_20": 0.5,
@@ -595,14 +394,16 @@ class Compliance(object):
             "overspeed20_50": 0.5,
             "overspeed50": 1.0
         }
+        self.weight_type_list = [weight_dict.get(metric, 0.5) for metric in
+                                 self.type_list]  # 假设 type_list 中的每个元素都在 weight_dict 的键中,或者默认为 0.5
+        self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
+        self.weight_dict = weight_dict
 
-        type_list = ["deduct1", "deduct3", "deduct6", "deduct12"]
+    def calculate_compliance_score(self, score_type_dict):
         if not self.weight_custom:  # 客观赋权
-            self.weight_type_list = weight_type_list
-            self.weight_type_dict = {key: value for key, value in zip(type_list, weight_type_list)}
-            self.weight_dict = weight_dict
-
-        if self.penalty_points >= 12:
+            self.calculate_weights()
+        penalty_points_threshold = 12  # 假设的扣分阈值
+        if hasattr(self, 'penalty_points') and self.penalty_points >= penalty_points_threshold:
             score_compliance = 0
         elif sum(score_type_dict.values()) / len(score_type_dict) == 100:
             score_compliance = 100
@@ -610,350 +411,175 @@ class Compliance(object):
             score_type_tmp = [80 if x == 100 else x for key, x in score_type_dict.items()]
             score_compliance = np.dot(self.weight_type_list, score_type_tmp)
 
-        score_compliance = round(score_compliance, 2)
+        return round(score_compliance, 2)
 
+    def output_results(self, score_compliance, score_type_dict):
         print("\n[合规性表现及得分情况]")
         print(f"合规性得分为:{score_compliance:.2f}分。")
         print(f"合规性各分组得分为:{score_type_dict}。")
         print(f"合规性各分组权重为:{self.weight_type_list}。")
-        # print(f"合规性各指标违规次数为:\n{count_metric}。")
-        # print(f"违法总次数为:{self.illegal_count}。")
-        # print(f"违法扣分总数为:{self.penalty_points}。")
-        # print(f"罚款总金额为:{self.penalty_money}。")
-        # print(f"警告总次数为:{self.warning_count}。")
-        # print(compliance_df)
+
+    def compliance_score(self):
+        self.prepare_data()
+        score_type_dict = self.calculate_deduct_scores()
+        score_compliance = self.calculate_compliance_score(score_type_dict)
+        self.output_results(score_compliance, score_type_dict)
         return score_compliance, score_type_dict
 
     def _get_weight_distribution(self):
         # get weight distribution
         weight_distribution = {}
         weight_distribution["name"] = self.config.dimension_name["compliance"]
-
         for type in self.type_list:
             type_weight_indexes_dict = {key: f"{self.name_dict[key]}({value * 100:.2f}%)" for key, value in
                                         self.weight_dict.items() if
                                         key in self.metric_dict[type]}
-
             weight_distribution_type = {
                 "weight": f"{self.type_name_dict[type]}({self.weight_type_dict[type] * 100:.2f}%)",
                 "indexes": type_weight_indexes_dict
             }
             weight_distribution[type] = weight_distribution_type
-
         return weight_distribution
 
-    def compliance_weight_distribution(self):
-        # get weight distribution
-        weight_distribution = {}
-        weight_distribution["name"] = "合规性"
-
-        if "deduct1" in self.type_list:
-            deduct1_weight_indexes_dict = {}
-            if "overspeed10" in self.metric_list:
-                deduct1_weight_indexes_dict[
-                    "overspeed10Weight"] = f"超速,但未超过10%({self.weight_dict['overspeed10'] * 100:.2f}%)"
-
-            if "overspeed10_20" in self.metric_list:
-                deduct1_weight_indexes_dict[
-                    "overspeed1020Weight"] = f"超速10%-20%({self.weight_dict['overspeed10_20'] * 100:.2f}%)"
-
-            weight_distribution['deduct1'] = {
-                "deduct1Weight": f"轻微违规(1分)({self.weight_type_dict['deduct1'] * 100:.2f}%)",
-                "indexes": deduct1_weight_indexes_dict
-            }
-
-        if "deduct3" in self.type_list:
-            deduct3_weight_indexes_dict = {}
-
-            if "pressSolidLine" in self.metric_list:
-                deduct3_weight_indexes_dict[
-                    "pressSolidLineWeight"] = f"压实线({self.weight_dict['pressSolidLine'] * 100:.2f}%)"
-
-            weight_distribution['deduct3'] = {
-                "deduct3Weight": f"中等违规(3分)({self.weight_type_dict['deduct3'] * 100:.2f}%)",
-                "indexes": deduct3_weight_indexes_dict
-            }
-
-        if "deduct6" in self.type_list:
-            deduct6_weight_indexes_dict = {}
-
-            if "runRedLight" in self.metric_list:
-                deduct6_weight_indexes_dict["runRedLightWeight"] = f"闯红灯({self.weight_dict['runRedLight'] * 100:.2f}%)"
-
-            if "overspeed20_50" in self.metric_list:
-                deduct6_weight_indexes_dict[
-                    "overspeed2050Weight"] = f"超速20%-50%({self.weight_dict['overspeed20_50'] * 100:.2f}%)"
-
-            weight_distribution['deduct6'] = {
-                "deduct6Weight": f"危险违规(6分)({self.weight_type_dict['deduct6'] * 100:.2f}%)",
-                "indexes": deduct6_weight_indexes_dict
-            }
-
-        if "deduct9" in self.type_list:
-            deduct9_weight_indexes_dict = {}
-
-            weight_distribution['deduct9'] = {
-                "deduct9Weight": f"严重违规(9分)({self.weight_type_dict['deduct6'] * 100:.2f}%)",
-                "indexes": deduct9_weight_indexes_dict
-            }
-
-        if "deduct12" in self.type_list:
-            deduct12_weight_indexes_dict = {}
-
-            if "overspeed50" in self.metric_list:
-                deduct12_weight_indexes_dict[
-                    "overspeed50Weight"] = f"超速50%以上({self.weight_dict['overspeed50'] * 100:.2f}%)"
-
-            weight_distribution['deduct12'] = {
-                "deduct12Weight": f"重大违规(12分)({self.weight_type_dict['deduct12'] * 100:.2f}%)",
-                "indexes": deduct12_weight_indexes_dict
-            }
+    def build_weight_index(self, metric_prefix, weight_key):
+        if metric_prefix in self.metric_list:
+            return {
+                f"{metric_prefix}Weight": f"{metric_prefix.replace('_', ' ').capitalize()}({self.weight_dict[weight_key] * 100:.2f}%)"}
+        return {}
 
+    def compliance_weight_distribution(self):
+        weight_distribution = {"name": "合规性"}
+        deduct_types = {
+            "deduct1": {"metrics": ["overspeed10", "overspeed10_20"], "weight_key": "deduct1"},
+            "deduct3": {"metrics": ["pressSolidLine"], "weight_key": "deduct3"},
+            "deduct6": {"metrics": ["runRedLight", "overspeed20_50"], "weight_key": "deduct6"},
+            "deduct9": {"metrics": [], "weight_key": "deduct9"},  # Assuming no specific metrics for deduct9
+            "deduct12": {"metrics": ["overspeed50"], "weight_key": "deduct12"},
+        }
+        for deduct_type, details in deduct_types.items():
+            if deduct_type in self.type_list:
+                indexes_dict = {}
+                for metric in details["metrics"]:
+                    indexes_dict.update(self.build_weight_index(metric, details["weight_key"]))
+                weight_distribution[deduct_type] = {
+                    f"{deduct_type.replace('deduct', '')}Weight":
+                    f"{details['weight_key'].replace('deduct', '').replace('_', ' ').capitalize()}"
+                    f"违规({int(details['weight_key'].replace('deduct', ''))}分)({self.weight_type_dict[deduct_type] * 100:.2f} % )",
+                    "indexes": indexes_dict
+                }
+            if deduct_type == "deduct9" and "deduct9" in self.type_list:
+                weight_distribution[deduct_type] = {
+                    "deduct9Weight": f"严重违规(9分)({self.weight_type_dict[deduct_type] * 100:.2f}%)",
+                    "indexes": {}
+                }
         return weight_distribution
 
     def report_statistic(self):
         score_compliance, score_type_dict = self.compliance_score()
         grade_compliance = score_grade(score_compliance)
-
         score_compliance = int(score_compliance) if int(score_compliance) == score_compliance else score_compliance
         score_type = [int(n) if int(n) == n else n for key, n in score_type_dict.items()]
-
-        # get weight distribution
-        # weight_distribution = self.compliance_weight_distribution()
+        # 获取合规性描述
+        comp_description1 = self.get_compliance_description(grade_compliance, self.illegal_count)
+        comp_description2 = self.get_violation_details(self.type_list, self.type_illegal_count_dict,
+                                            self.type_name_dict)
         weight_distribution = self._get_weight_distribution()
 
-        # if self.metric_illegal_list:
-        #     str_illegel_metric = string_concatenate(self.metric_illegal_list)
+        # 获取违规数据表
+        violations_slices = self.get_violations_table()
+        # 获取扣分详情
+        deductPoint_dict = self.get_deduct_points_dict(score_type_dict)
+
+        # 返回结果(这里可以根据需要返回具体的数据结构)
+        return {
+            "name": "合规性",
+            "weight": f"{self.weight * 100:.2f}%",
+            "weightDistribution": weight_distribution,
+            "score": score_compliance,
+            "level": grade_compliance,
+            'score_type': score_type,
+            # 'score_metric': score_metric,
+            'illegalCount': self.illegal_count,
+
+            "description1": comp_description1,
+            "description2": comp_description2,
+            "details": deductPoint_dict,
+            "violations": violations_slices
+        }
 
-        cnt1 = self.illegal_count
+    def get_compliance_description(self, grade_compliance, illegal_count):
+        # 获取合规性描述
         if grade_compliance == '优秀':
-            comp_description1 = '车辆在本轮测试中无违反交通法规行为;'
-        elif grade_compliance == '良好':
-            comp_description1 = f'车辆在本轮测试中共发生{cnt1}次违反交通法规⾏为;'
-        elif grade_compliance == '一般':
-            comp_description1 = f'车辆在本轮测试中共有{cnt1}次违反交通法规⾏为,需要提高算法在合规性上的表现;'
-        elif grade_compliance == '较差':
-            comp_description1 = f'车辆在本轮测试中共有{cnt1}次违反交通法规⾏为,需要提高算法在合规性上的表现;'
-
-        # xx指标得分超过基准线且无违规行为,算法表现良好
-        # 违规次数 self.illegal_count
-        # 1分xx次,3分xx次
-
-        if cnt1 == 0:
-            comp_description2 = "车辆在该用例中无违反交通法规行为,算法表现良好。"
+            return '车辆在本轮测试中无违反交通法规行为;'
+        else:
+            return f'车辆在本轮测试中共发生{illegal_count}次违反交通法规行为;' \
+                   f'如果等级为一般或较差,需要提高算法在合规性上的表现。'
+
+    def get_violation_details(self, type_list, type_illegal_count_dict, type_name_dict):
+        # 获取违规详情描述
+        if self.illegal_count == 0:
+            return "车辆在该用例中无违反交通法规行为,算法表现良好。"
         else:
-            str_illegel_type = ""
-
-            for type in self.type_list:
-                if self.type_illegal_count_dict[type] > 0:
-                    str_illegel_type += f'{self.type_name_dict[type]}行为{self.type_illegal_count_dict[type]}次,'
-
-            # if "deduct1" in self.type_list:
-            #     if self.type_illegal_count_dict["deduct1"] > 0:
-            #         str_illegel_type += f'轻微违规(1分)行为{self.type_illegal_count_dict["deduct1"]}次,'
-            #
-            # if "deduct3" in self.type_list:
-            #     if self.type_illegal_count_dict["deduct3"] > 0:
-            #         str_illegel_type += f'中等违规(3分)行为{self.type_illegal_count_dict["deduct3"]}次,'
-            #
-            # if "deduct6" in self.type_list:
-            #     if self.type_illegal_count_dict["deduct6"] > 0:
-            #         str_illegel_type += f'危险违规(6分)行为{self.type_illegal_count_dict["deduct6"]}次,'
-            #
-            # if "deduct9" in self.type_list:
-            #     if self.type_illegal_count_dict["deduct9"] > 0:
-            #         str_illegel_type += f'严重违规(9分)行为{self.type_illegal_count_dict["deduct9"]}次,'
-            #
-            # if "deduct12" in self.type_list:
-            #     if self.type_illegal_count_dict["deduct12"] > 0:
-            #         str_illegel_type += f'重大违规(12分)行为{self.type_illegal_count_dict["deduct12"]}次,'
-
-            str_illegel_type = str_illegel_type[:-1]
-            comp_description2 = f"车辆在该用例共违反交通法规{cnt1}次。其中{str_illegel_type}。违规行为详情见附录C。"
-
-        # data for violations table
+            str_illegel_type = ", ".join(
+                [f"{type_name_dict[type]}行为{count}次" for type, count in type_illegal_count_dict.items() if
+                 count > 0])
+            return f"车辆在该用例共违反交通法规{self.illegal_count}次。其中{str_illegel_type}。违规行为详情见附录C。"
+
+    def get_violations_table(self):
+        # 获取违规数据表
         if not self.violation_df.empty:
-            # self.violation_df['time'] = self.violation_df.apply(
-            #     lambda row: self.time_frame_splice(row['start_time'], row['end_time'], row['start_frame'],
-            #                                        row['end_frame']), axis=1)
             self.violation_df['time'] = self.violation_df.apply(
                 lambda row: self.time_splice(row['start_time'], row['end_time']), axis=1)
             df_violations = self.violation_df[['time', 'violation']]
-            violations_slices = df_violations.to_dict('records')
+            return df_violations.to_dict('records')
         else:
-            violations_slices = []
-
-        # violations_slices = [{'time': "[1s, 2s]", 'violation': "压实线"},
-        #                      {'time': "[10s, 20s]", 'violation': "闯红灯"},
-        #                      {'time': "[100s, 200s]", 'violation': "超速"}]
+            return []
 
+    def get_deduct_points_dict(self, score_type_dict):
+        # 获取扣分详情字典
         deductPoint_dict = {}
-
         for type in self.type_list:
             type_dict = {
                 "name": self.type_name_dict[type],
                 "score": score_type_dict[type],
+                "indexes": {}
             }
-
-            type_indexes = {}
             for metric in self.metric_list:
-                type_indexes[metric] = {
-                    # "name": self.name_dict[metric],
-                    # "name": f"{self.name_dict[metric]}({self.unit_dict[metric]})",
-                    "name": f"{self.name_dict[metric]}",
+                type_dict["indexes"][metric] = {
+                    "name": self.name_dict[metric],
                     "times": self.metric_illegal_count_dict[metric],
                     "deductPoints": self.metric_penalty_points_dict[metric],
                     "fine": self.metric_penalty_money_dict[metric],
                     "basis": self.metric_penalty_law_dict[metric]
                 }
-            type_dict["indexes"] = type_indexes
             deductPoint_dict[type] = type_dict
 
-        # deduct1
-        if "deduct1" in self.type_list:
-            deduct1_indexes = {}
-            if "overspeed10" in self.metric_list:
-                tmp_dict = {
-                    "name": "超速,但未超过10%",
-                    "times": self.metric_illegal_count_dict["overspeed10"],
-                    "deductPoints": self.metric_penalty_points_dict["overspeed10"],
-                    "fine": self.metric_penalty_money_dict["overspeed10"],
-                    "basis": self.metric_penalty_law_dict["overspeed10"]
-                    # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
-                }
-                deduct1_indexes['overspeed10'] = tmp_dict
-
-            if "overspeed10_20" in self.metric_list:
-                tmp_dict = {
-                    "name": "超速10%-20%",
-                    "times": self.metric_illegal_count_dict["overspeed10_20"],
-                    "deductPoints": self.metric_penalty_points_dict["overspeed10_20"],
-                    "fine": self.metric_penalty_money_dict["overspeed10_20"],
-                    "basis": self.metric_penalty_law_dict["overspeed10_20"]
-                    # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
+        for deduct_type, metrics in [
+            ("deduct1", ["overspeed10", "overspeed10_20"]),
+            ("deduct3", ["pressSolidLine"]),
+            ("deduct6", ["runRedLight", "overspeed20_50"]),
+            ("deduct9", ["xx"]),  # 注意:这里xx是示例,实际应替换为具体的违规类型
+            ("deduct12", ["overspeed50"])
+        ]:
+            if deduct_type in self.type_list:
+                deduct_indexes = {}
+                for metric in metrics:
+                    if metric in self.metric_list:
+                        deduct_indexes[metric] = {
+                            "name": self.name_dict[metric],
+                            "times": self.metric_illegal_count_dict[metric],
+                            "deductPoints": self.metric_penalty_points_dict[metric],
+                            "fine": self.metric_penalty_money_dict[metric],
+                            "basis": self.metric_penalty_law_dict[metric]
+                        }
+                # 对于deduct9,特殊处理其score值
+                score = score_type_dict.get(deduct_type, 100) if deduct_type != "deduct9" else 100
+                deductPoint_dict[deduct_type] = {
+                    "name": f"{deduct_type.replace('deduct', '').strip()}违规({int(deduct_type.replace('deduct', ''))}分)",
+                    "score": score,
+                    "indexes": deduct_indexes
                 }
-
-                deduct1_indexes['overspeed10_20'] = tmp_dict
-
-            deduct1_dict = {
-                "name": "轻微违规(1分)",
-                "score": score_type_dict["deduct1"],
-                "indexes": deduct1_indexes
-            }
-            deductPoint_dict["deduct1"] = deduct1_dict
-
-        # deduct3
-        if "deduct3" in self.type_list:
-            deduct3_indexes = {}
-            if "pressSolidLine" in self.metric_list:
-                tmp_dict = {
-                    "name": "压实线",
-                    "times": self.metric_illegal_count_dict["pressSolidLine"],
-                    "deductPoints": self.metric_penalty_points_dict["pressSolidLine"],
-                    "fine": self.metric_penalty_money_dict["pressSolidLine"],
-                    "basis": self.metric_penalty_law_dict["pressSolidLine"]
-                    # "basis": "《中华人民共和国道路交通安全法》第八十二条:机动车在高速公路上行驶,不得有下列行为:(三)骑、轧车行道分界线或者在路肩上行驶。"
-                }
-                deduct3_indexes['pressSolidLine'] = tmp_dict
-
-            deduct3_dict = {
-                "name": "中等违规(3分)",
-                "score": score_type_dict["deduct3"],
-                "indexes": deduct3_indexes
-            }
-            deductPoint_dict["deduct3"] = deduct3_dict
-
-        # deduct6
-        if "deduct6" in self.type_list:
-            deduct6_indexes = {}
-            if "runRedLight" in self.metric_list:
-                tmp_dict = {
-                    "name": "闯红灯",
-                    "times": self.metric_illegal_count_dict["runRedLight"],
-                    "deductPoints": self.metric_penalty_points_dict["runRedLight"],
-                    "fine": self.metric_penalty_money_dict["runRedLight"],
-                    "basis": self.metric_penalty_law_dict["runRedLight"]
-                    # "basis": "《中华人民共和国道路交通安全法实施条例》第四十条:(二)红色叉形灯或者箭头灯亮时,禁止本车道车辆通行。"
-                }
-                deduct6_indexes['runRedLight'] = tmp_dict
-
-            if "overspeed20_50" in self.metric_list:
-                tmp_dict = {
-                    "name": "超速20%-50%",
-                    "times": self.metric_illegal_count_dict["overspeed20_50"],
-                    "deductPoints": self.metric_penalty_points_dict["overspeed20_50"],
-                    "fine": self.metric_penalty_money_dict["overspeed20_50"],
-                    "basis": self.metric_penalty_law_dict["overspeed20_50"]
-                    # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
-                }
-                deduct6_indexes['overspeed20_50'] = tmp_dict
-
-            deduct6_dict = {
-                "name": "危险违规(6分)",
-                "score": score_type_dict["deduct6"],
-                "indexes": deduct6_indexes
-            }
-            deductPoint_dict["deduct6"] = deduct6_dict
-
-        # deduct9
-        if "deduct9" in self.type_list:
-            deduct9_indexes = {}
-            if "xx" in self.metric_list:
-                tmp_dict = {
-                    "name": "-",
-                    "times": "0",
-                    "deductPoints": "0",
-                    "fine": "0",
-                    "basis": "-"
-                    # "basis": "-"
-                }
-                deduct9_indexes['xx'] = tmp_dict
-
-            deduct9_dict = {
-                "name": "严重违规(9分)",
-                "score": 100,  # score_type_dict["deduct9"],
-                "indexes": deduct9_indexes
-            }
-            deductPoint_dict["deduct9"] = deduct9_dict
-
-        # deduct12
-        if "deduct12" in self.type_list:
-            deduct12_indexes = {}
-            if "overspeed50" in self.metric_list:
-                tmp_dict = {
-                    "name": "超速50%以上",
-                    "times": self.metric_illegal_count_dict["overspeed50"],
-                    "deductPoints": self.metric_penalty_points_dict["overspeed50"],
-                    "fine": self.metric_penalty_money_dict["overspeed50"],
-                    "basis": self.metric_penalty_law_dict["overspeed50"]
-                    # "basis": "《中华人民共和国道路交通安全法》第四十二条:机动车上道路行驶,不得超过限速标志标明的最高时速。"
-                }
-                deduct12_indexes['overspeed50'] = tmp_dict
-
-            deduct12_dict = {
-                "name": "重大违规(12分)",
-                "score": score_type_dict["deduct12"],
-                "indexes": deduct12_indexes
-            }
-            deductPoint_dict["deduct12"] = deduct12_dict
-
-        report_dict = {
-            "name": "合规性",
-            "weight": f"{self.weight * 100:.2f}%",
-            "weightDistribution": weight_distribution,
-            "score": score_compliance,
-            "level": grade_compliance,
-            'score_type': score_type,
-            # 'score_metric': score_metric,
-            'illegalCount': self.illegal_count,
-
-            "description1": comp_description1,
-            "description2": comp_description2,
-            "details": deductPoint_dict,
-            "violations": violations_slices
-        }
-
-        return report_dict
+        return deductPoint_dict
 
     def get_eval_data(self):
         df = self.eval_data

+ 7 - 1
cicv_evaluate_master-single_eval_1117/safe.py

@@ -56,6 +56,7 @@ class Safe(object):
 
         self.df = data_processed.object_df.copy()
         self.ego_df = data_processed.ego_data
+        self.laneinfo_df = data_processed.lane_info_df
         self.obj_id_list = data_processed.obj_id_list
 
         # config infos for calculating score
@@ -171,6 +172,7 @@ class Safe(object):
 
                 try:
                     obj_data = obj_dict[frame_num][playerId]
+                    lane_width = self.laneinfo_df[self.laneinfo_df['simFrame'].isin([frame_num])]['width'].values/2
                 except KeyError:
                     continue
 
@@ -236,7 +238,10 @@ class Safe(object):
 
                 obj_type = obj_data['type']
 
-                TTC = self._cal_TTC(dist, vrel_projection_in_dist)
+                if abs(dist*cos_theta) <= lane_width[0]:
+                    TTC = self._cal_TTC(dist, vrel_projection_in_dist)
+                else:
+                    TTC = 10000
                 MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
                 THW = self._cal_THW(dist, v_ego_p)
 
@@ -318,6 +323,7 @@ class Safe(object):
 
         if not self.empty_flag:
             df_safe = df_safe[col_list].reset_index(drop=True)
+
         # df_safe.reset_index(drop=True)
 
         self.eval_data = df_safe.copy()

+ 1 - 1
cicv_evaluate_master-single_eval_1117/single_run_test.py

@@ -186,7 +186,7 @@ if __name__ == "__main__":
 
     configPath = rf"./close_beta_test/config_test/config.json"
     
-    dataPath = r"./close_beta_test/case_test/data_lantu"
+    dataPath = r"./close_beta_test/case_test/data_PGVIL_1121"
     # dataPath = r'/home/kevin/kevin/pingjia/cicv_evaluate_master-single_eval/close_beta_test/case_test/ACC/ACC_following_car_good'