#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2023/07/25 @Last Modified: 2023/07/26 @Summary: safe metrics """ import math import numpy as np import pandas as pd from modules.lib.score import Score from modules.config import config from collections import defaultdict import scipy.integrate as spi class Safe(object): def __init__(self, data_processed): # self.logger = log.get_logger() # 使用时再初始化 self.data_processed = data_processed self.df = data_processed.object_df.copy() self.ego_df = data_processed.ego_data self.obj_id_list = data_processed.obj_id_list self.metric_list = [ "TTC", "MTTC", "THW", "LatSD", "BTN", "collisionRisk", "collisionSeverity", ] # score data self.calculated_value = { "TTC": 1.0, "MTTC": 1.0, "THW": 1.0, # 'LonSD': 0.0, "LatSD": 0.0, # 'DRAC': 0.0, "BTN": 1.0, # 'STN': 1.0, "collisionRisk": 0.0, "collisionSeverity": 0.0, } # lists of drving control info self.time_list = self.ego_df["simTime"].values.tolist() self.frame_list = self.ego_df["simFrame"].values.tolist() self.collisionRisk = 0 self.empty_flag = True # no car following scene if len(self.obj_id_list) > 1: self.unsafe_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_time_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_dist_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_acce_drac_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_acce_xtn_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.unsafe_prob_df = pd.DataFrame( columns=["start_time", "end_time", "start_frame", "end_frame", "type"] ) self.most_dangerous = {} self.pass_percent = {} self._safe_param_cal_new() def _safe_param_cal_new(self): Tc = 0.3 # 安全距离 rho = Tc # 驾驶员制动反应时间(单位:秒) ego_accel_max = self.data_processed.vehicle_config[ "EGO_ACCEL_MAX" ] # 自车油门最大加速度 obj_decel_max = self.data_processed.vehicle_config[ "OBJ_DECEL_MAX" ] # 前车刹车最大减速度 ego_decel_min = self.data_processed.vehicle_config[ "EGO_DECEL_MIN" ] # 自车刹车最小减速度 ug ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"] ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"] # 构建双层字典数据结构 obj_dict = defaultdict(dict) # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对 obj_data_dict = self.df.to_dict("records") for item in obj_data_dict: obj_dict[item["simFrame"]][item["playerId"]] = item df_list = [] EGO_PLAYER_ID = 1 # self.empty_flag = True for frame_num in self.frame_list: ego_data = obj_dict[frame_num][EGO_PLAYER_ID] v1 = ego_data["v"] # km/h to m/s x1 = ego_data["posX"] y1 = ego_data["posY"] h1 = ego_data["posH"] # len1 = ego_data['dimX'] # width1 = ego_data['dimY'] # o_x1 = ego_data['offX'] v_x1 = ego_data["speedX"] # km/h to m/s v_y1 = ego_data["speedY"] # km/h to m/s a_x1 = ego_data["accelX"] a_y1 = ego_data["accelY"] # a1 = ego_data['accel'] for playerId in self.obj_id_list: if playerId == EGO_PLAYER_ID: continue try: obj_data = obj_dict[frame_num][playerId] except KeyError: continue x2 = obj_data["posX"] y2 = obj_data["posY"] dist = self.dist(x1, y1, x2, y2) obj_data["dist"] = dist v_x2 = obj_data["speedX"] # km/h to m/s v_y2 = obj_data["speedY"] # km/h to m/s v2 = obj_data["v"] # km/h to m/s # h2 = obj_data['posH'] # len2 = obj_data['dimX'] # width2 = obj_data['dimY'] # o_x2 = obj_data['offX'] a_x2 = obj_data["accelX"] a_y2 = obj_data["accelY"] # a2 = obj_data['accel'] dx, dy = x2 - x1, y2 - y1 # 定义矢量A和x轴正向向量x A = np.array([dx, dy]) x = np.array([1, 0]) # 计算点积和向量长度 dot_product = np.dot(A, x) vector_length_A = np.linalg.norm(A) vector_length_x = np.linalg.norm(x) # 计算夹角的余弦值 cos_theta = dot_product / (vector_length_A * vector_length_x) # 将余弦值转换为角度值(弧度制) beta = np.arccos(cos_theta) # 如何通过theta正负确定方向 lon_d = dist * math.cos(beta - h1) lat_d = abs( dist * math.sin(beta - h1) ) # 需要增加左正右负的判断,但beta取值为[0,pi) obj_dict[frame_num][playerId]["lon_d"] = lon_d obj_dict[frame_num][playerId]["lat_d"] = lat_d # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离 # print(f"lon_d: {lon_d}, lat_d: {lat_d}") if lon_d > 100 or lon_d < -5 or lat_d > 4: continue self.empty_flag = False vx, vy = v_x1 - v_x2, v_y1 - v_y2 ax, ay = a_x2 - a_x1, a_y2 - a_y1 v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1) v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2) vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy) arel_projection_in_dist = self._cal_a_projection( dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2 ) obj_dict[frame_num][playerId][ "vrel_projection_in_dist" ] = vrel_projection_in_dist obj_dict[frame_num][playerId][ "arel_projection_in_dist" ] = arel_projection_in_dist obj_dict[frame_num][playerId]["v_ego_projection_in_dist"] = v_ego_p obj_dict[frame_num][playerId]["v_obj_projection_in_dist"] = v_obj_p obj_type = obj_data["type"] TTC = self._cal_TTC(dist, vrel_projection_in_dist) MTTC = self._cal_MTTC( TTC, vrel_projection_in_dist, arel_projection_in_dist ) THW = self._cal_THW(dist, v_ego_p) # 单车道时可用 # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, # obj_decel_max) lat_dist = 0.5 v_right = v1 v_left = v2 a_right_lat_brake_min = 1 a_left_lat_brake_min = 1 a_lat_max = 5 LatSD = self._cal_lateral_safe_dist( lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max, ) # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2) lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1) lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1) lon_a = abs(lon_a1 - lon_a2) lon_d = dist * abs(math.cos(beta - h1)) lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1) BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max) lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1) lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1) lat_a = abs(lat_a1 - lat_a2) lat_d = dist * abs(math.sin(beta - h1)) lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1) # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2) obj_dict[frame_num][playerId]["lat_v_rel"] = v_x1 - v_x2 obj_dict[frame_num][playerId]["lon_v_rel"] = v_y1 - v_y2 TTC = None if (not TTC or TTC < 0) else TTC MTTC = None if (not MTTC or MTTC < 0) else MTTC THW = None if (not THW or THW < 0) else THW # 原DRAC计算已被注释,需移除相关代码引用 # DRAC = 10 if DRAC >= 10 else DRAC # 删除这行 # 修改后的指标赋值 obj_dict[frame_num][playerId]["TTC"] = TTC obj_dict[frame_num][playerId]["MTTC"] = MTTC obj_dict[frame_num][playerId]["THW"] = THW # obj_dict[frame_num][playerId]['LonSD'] = LonSD obj_dict[frame_num][playerId]["LatSD"] = LatSD # obj_dict[frame_num][playerId]['DRAC'] = DRAC # 删除这行 obj_dict[frame_num][playerId]["BTN"] = abs(BTN) # obj_dict[frame_num][playerId]['STN'] = abs(STN) # 删除这行 # 确保以下变量已定义 collisionSeverity = ( collisionSeverity if "collisionSeverity" in locals() else 0 ) pr_death = pr_death if "pr_death" in locals() else 0 collisionRisk = collisionRisk if "collisionRisk" in locals() else 0 obj_dict[frame_num][playerId]["collisionSeverity"] = ( collisionSeverity * 100 ) obj_dict[frame_num][playerId]["pr_death"] = pr_death * 100 obj_dict[frame_num][playerId]["collisionRisk"] = collisionRisk * 100 # print( # f"{playerId} {frame_num} {TTC} {MTTC} {THW} {LatSD} {BTN} {collisionSeverity} {pr_death} {collisionRisk}" # ) df_fnum = pd.DataFrame(obj_dict[frame_num].values()) df_list.append(df_fnum) df_safe = pd.concat(df_list) col_list = [ "simTime", "simFrame", "playerId", "TTC", "MTTC", "THW", "LatSD", "BTN", # 移除STN "collisionSeverity", "pr_death", "collisionRisk", ] if not self.empty_flag: self.df = df_safe[col_list].reset_index(drop=True) # self.df_safe是有问题的,因为后边统计的函数里是没有self.df_safe这个变量的,只有self.df, # 需要增加如果是空的情况下,self.df_safe的值为默认值。 def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx**2 + dy**2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 在 AB 连线上的速度 V1_on_AB V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy return V1_on_AB def _cal_v_projection(self, dx, dy, vx, vy): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx**2 + dy**2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的速度 V_relative # vx = vx1 - vx2 # vy = vy1 - vy2 # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB V_on_AB = vx * U_ABx + vy * U_ABy return V_on_AB def _cal_a_projection( self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2 ): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 θ V_mod = math.sqrt(vx**2 + vy**2) AB_mod = math.sqrt(dx**2 + dy**2) if V_mod == 0 or AB_mod == 0: return 0 cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod) theta = math.acos(cos_theta) # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx**2 + dy**2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的加速度 a_relative # ax = ax1 - ax2 # ay = ay1 - ay2 # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB a_on_AB = ax * U_ABx + ay * U_ABy VA = np.array([v_x1, v_y1]) VB = np.array([v_x2, v_y2]) D_A = np.array([x1, y1]) D_B = np.array([x2, y2]) V_r = VA - VB V = np.linalg.norm(V_r) w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB) a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta) return a_on_AB_back # 计算相对加速度 def _calculate_derivative(self, a, w, V, theta): # 计算(V×cos(θ))'的值 # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta derivative = a - w * V * math.sin(theta) return derivative def _cal_relative_angular_v(self, theta, A, B, VA, VB): dx = A[0] - B[0] dy = A[1] - B[1] dvx = VA[0] - VB[0] dvy = VA[1] - VB[1] # (dx * dvy - dy * dvx) angular_velocity = ( math.sqrt(dvx**2 + dvy**2) * math.sin(theta) / math.sqrt(dx**2 + dy**2) ) return angular_velocity def _death_pr(self, obj_type, v_relative): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) return p_death def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity return collisionRisk # 求两车之间当前距离 def dist(self, x1, y1, x2, y2): dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) return dist # TTC (time to collision) def _cal_TTC(self, dist, vrel_projection_in_dist): if vrel_projection_in_dist == 0: return math.inf TTC = dist / vrel_projection_in_dist return TTC def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist): MTTC = math.nan if arel_projection_in_dist != 0: tmp = vrel_projection_in_dist**2 + 2 * arel_projection_in_dist * dist if tmp < 0: return math.nan t1 = ( -1 * vrel_projection_in_dist - math.sqrt(tmp) ) / arel_projection_in_dist t2 = ( -1 * vrel_projection_in_dist + math.sqrt(tmp) ) / arel_projection_in_dist if t1 > 0 and t2 > 0: if t1 >= t2: MTTC = t2 elif t1 < t2: MTTC = t1 elif t1 > 0 and t2 <= 0: MTTC = t1 elif t1 <= 0 and t2 > 0: MTTC = t2 if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0: MTTC = dist / vrel_projection_in_dist return MTTC # THW (time headway) def _cal_THW(self, dist, v_ego_projection_in_dist): if not v_ego_projection_in_dist: THW = None else: THW = dist / v_ego_projection_in_dist return THW def velocity(self, v_x, v_y): v = math.sqrt(v_x**2 + v_y**2) * 3.6 return v def _cal_longitudinal_safe_dist( self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max ): lon_dist_min = ( v_ego_p * rho + ego_accel_max * (rho**2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (2 * ego_decel_min) - v_obj_p**2 / (2 * ego_decel_max) ) return lon_dist_min def _cal_lateral_safe_dist( self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max, ): v_right_rho = v_right + rho * a_lat_max v_left_rho = v_left + rho * a_lat_max dist_min = lat_dist + ( (v_right + v_right_rho) * rho / 2 + v_right_rho**2 / a_right_lat_brake_min / 2 + ((v_left + v_right_rho) * rho / 2) + v_left_rho**2 / a_left_lat_brake_min / 2 ) return dist_min # DRAC (decelerate required avoid collision) def _cal_DRAC( self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2 ): dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671 if dist_length < 0: dist_width = dist - (width2 / 2 + width1 / 2) if dist_width < 0: return math.inf else: d = dist_width else: d = dist_length DRAC = vrel_projection_in_dist**2 / (2 * d) return DRAC # BTN (brake threat number) def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max): BTN = ( lon_a1 + lon_a - lon_v**2 / (2 * lon_d) ) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def _cal_STN_new( self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2 ): STN = ( lat_a1 + lat_a + 2 / ttc**2 * ( lat_d + abs(ego_decel_lat_max * lat_v) * (width1 + width2) / 2 + abs(lat_v * ttc) ) ) / ego_decel_lat_max return STN # BTN (brake threat number) def cal_BTN(self, a_y1, ay, dy, vy, max_ay): BTN = ( a_y1 + ay - vy**2 / (2 * dy) ) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2): STN = ( a_x1 + ax + 2 / ttc**2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc) ) / max_ax return STN # 追尾碰撞风险 def _normal_distribution(self, x): mean = 1.32 std_dev = 0.26 return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp( -0.5 * (x - mean) ** 2 / std_dev ) def continuous_group(self, df): time_list = df["simTime"].values.tolist() frame_list = df["simFrame"].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] unfunc_time_df = pd.DataFrame(time, columns=["start_time", "end_time"]) unfunc_frame_df = pd.DataFrame(frame, columns=["start_frame", "end_frame"]) unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1) return unfunc_df # 统计最危险的指标 def _safe_statistic_most_dangerous(self): min_list = ["TTC", "MTTC", "THW", "LatSD"] max_list = ["BTN", "collisionRisk", "collisionSeverity"] # 移除STN for metric in min_list: if metric in self.metric_list: if metric in self.df.columns: self.calculated_value[metric] = self.df[metric].min() else: self.calculated_value[metric] = 10.0 for metric in max_list: if metric in self.metric_list: if metric in self.df.columns: self.calculated_value[metric] = self.df[metric].max() else: self.calculated_value[metric] = 10.0 return self.calculated_value def _safe_no_obj_statistic(self): # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"] return self.calculated_value def report_statistic( self, ): safety_result = {} if len(self.obj_id_list) == 1: safety_result = self._safe_no_obj_statistic() else: safety_result = self._safe_statistic_most_dangerous() evaluator = Score(self.data_processed.safety_config) result = evaluator.evaluate(safety_result) # self.logger.info(f"安全性表现及得分情况:[{result}]") print("\n[安全性表现及得分情况]") return result