#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn) @Data: 2023/07/25 @Last Modified: 2023/07/26 @Summary: safe metrics """ import sys import os import math import numpy as np import pandas as pd from pathlib import Path root_path = Path(__file__).resolve().parent.parent sys.path.append(str(root_path)) from models.common.score import Score from config import config from collections import defaultdict import scipy.integrate as spi from models.common import log # 确保这个路径是正确的,或者调整它 log_path = config.LOG_PATH logger = log.get_logger(log_path) class Safe(object): def __init__(self, data_processed): self.data_processed = data_processed self.df = data_processed.object_df.copy() self.ego_df = data_processed.ego_data self.obj_id_list = data_processed.obj_id_list self.metric_list = config.SAFETY_METRIC_LIST # score data self.calculated_value = { 'TTC': 1.0, 'MTTC': 1.0, 'THW': 1.0, 'LonSD': 0.0, 'LatSD': 0.0, 'DRAC': 0.0, 'BTN': 1.0, 'STN': 1.0, 'collisionRisk': 0.0, 'collisionSeverity': 0.0, } # lists of drving control info self.time_list = data_processed.driver_ctrl_data['time_list'] self.frame_list = self.ego_df['simFrame'].values.tolist() self.collisionRisk = 0 self.empty_flag = True # no car following scene if len(self.obj_id_list) > 1: self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_acce_drac_df = pd.DataFrame( columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_acce_xtn_df = pd.DataFrame( columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type']) self.most_dangerous = {} self.pass_percent = {} self._safe_param_cal_new() def _safe_param_cal_new(self): Tc = config.TC # 安全距离 rho = config.RHO # 驾驶员制动反应时间 ego_accel_max = config.EGO_ACCEL_MAX # 自车油门最大加速度 obj_decel_max = config.OBJ_DECEL_MAX # 前车刹车最大减速度 ego_decel_min = config.EGO_DECEL_MIN # 自车刹车最小减速度 ug ego_decel_lon_max = config.EGO_DECEL_LON_MAX ego_decel_lat_max = config.EGO_DECEL_LAT_MAX # 构建双层字典数据结构 obj_dict = defaultdict(dict) # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对 obj_data_dict = self.df.to_dict('records') for item in obj_data_dict: obj_dict[item['simFrame']][item['playerId']] = item df_list = [] EGO_PLAYER_ID = 1 # self.empty_flag = True for frame_num in self.frame_list: ego_data = obj_dict[frame_num][EGO_PLAYER_ID] v1 = ego_data['v'] # km/h to m/s x1 = ego_data['posX'] y1 = ego_data['posY'] h1 = ego_data['posH'] len1 = ego_data['dimX'] width1 = ego_data['dimY'] o_x1 = ego_data['offX'] v_x1 = ego_data['speedX'] # km/h to m/s v_y1 = ego_data['speedY'] # km/h to m/s a_x1 = ego_data['accelX'] a_y1 = ego_data['accelY'] # a1 = ego_data['accel'] for playerId in self.obj_id_list: if playerId == EGO_PLAYER_ID: continue try: obj_data = obj_dict[frame_num][playerId] except KeyError: continue x2 = obj_data['posX'] y2 = obj_data['posY'] dist = self.dist(x1, y1, x2, y2) obj_data['dist'] = dist v_x2 = obj_data['speedX'] # km/h to m/s v_y2 = obj_data['speedY'] # km/h to m/s v2 = obj_data['v'] # km/h to m/s # h2 = obj_data['posH'] len2 = obj_data['dimX'] width2 = obj_data['dimY'] o_x2 = obj_data['offX'] a_x2 = obj_data['accelX'] a_y2 = obj_data['accelY'] # a2 = obj_data['accel'] dx, dy = x2 - x1, y2 - y1 # 定义矢量A和x轴正向向量x A = np.array([dx, dy]) x = np.array([1, 0]) # 计算点积和向量长度 dot_product = np.dot(A, x) vector_length_A = np.linalg.norm(A) vector_length_x = np.linalg.norm(x) # 计算夹角的余弦值 cos_theta = dot_product / (vector_length_A * vector_length_x) # 将余弦值转换为角度值(弧度制) beta = np.arccos(cos_theta) # 如何通过theta正负确定方向 lon_d = dist * math.cos(beta - h1) lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi) obj_dict[frame_num][playerId]['lon_d'] = lon_d obj_dict[frame_num][playerId]['lat_d'] = lat_d # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离 if lon_d > 100 or lon_d < -5 or lat_d > 4: continue self.empty_flag = False vx, vy = v_x1 - v_x2, v_y1 - v_y2 ax, ay = a_x2 - a_x1, a_y2 - a_y1 v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1) v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2) vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy) arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2) obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p obj_type = obj_data['type'] TTC = self._cal_TTC(dist, vrel_projection_in_dist) MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist) THW = self._cal_THW(dist, v_ego_p) # 单车道时可用 LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max) lat_dist = 0.5 v_right = v1 v_left = v2 a_right_lat_brake_min = 1 a_left_lat_brake_min = 1 a_lat_max = 5 LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max) DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2) lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1) lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1) lon_a = abs(lon_a1 - lon_a2) lon_d = dist * abs(math.cos(beta - h1)) lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1) BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max) lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1) lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1) lat_a = abs(lat_a1 - lat_a2) lat_d = dist * abs(math.sin(beta - h1)) lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1) STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2) obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2 obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2 # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay) # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2) TTC = None if (not TTC or TTC < 0) else TTC MTTC = None if (not MTTC or MTTC < 0) else MTTC THW = None if (not THW or THW < 0) else THW DRAC = 10 if DRAC >= 10 else DRAC # TTC要进行筛选,否则会出现nan或者TTC过大的情况 if not TTC or TTC > 4000: # threshold = 4258.41 collisionSeverity = 0 pr_death = 0 collisionRisk = 0 else: result, error = spi.quad(self._normal_distribution, 0, TTC - Tc) collisionSeverity = 1 - result pr_death = self._death_pr(obj_type, vrel_projection_in_dist) collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity obj_dict[frame_num][playerId]['TTC'] = TTC obj_dict[frame_num][playerId]['MTTC'] = MTTC obj_dict[frame_num][playerId]['THW'] = THW obj_dict[frame_num][playerId]['LonSD'] = LonSD obj_dict[frame_num][playerId]['LatSD'] = LatSD obj_dict[frame_num][playerId]['DRAC'] = DRAC obj_dict[frame_num][playerId]['BTN'] = abs(BTN) obj_dict[frame_num][playerId]['STN'] = abs(STN) obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100 obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100 obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100 df_fnum = pd.DataFrame(obj_dict[frame_num].values()) df_list.append(df_fnum) df_safe = pd.concat(df_list) col_list = ['simTime', 'simFrame', 'playerId', 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'DRAC', 'BTN', 'STN', 'collisionSeverity', 'pr_death', 'collisionRisk'] self.df_safe = df_safe[col_list].reset_index(drop=True) def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx ** 2 + dy ** 2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 在 AB 连线上的速度 V1_on_AB V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy return V1_on_AB def _cal_v_projection(self, dx, dy, vx, vy): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx ** 2 + dy ** 2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的速度 V_relative # vx = vx1 - vx2 # vy = vy1 - vy2 # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB V_on_AB = vx * U_ABx + vy * U_ABy return V_on_AB def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2): # 计算 AB 连线的向量 AB # dx = x2 - x1 # dy = y2 - y1 # 计算 θ V_mod = math.sqrt(vx ** 2 + vy ** 2) AB_mod = math.sqrt(dx ** 2 + dy ** 2) if V_mod == 0 or AB_mod == 0: return 0 cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod) theta = math.acos(cos_theta) # 计算 AB 连线的模长 |AB| AB_mod = math.sqrt(dx ** 2 + dy ** 2) # 计算 AB 连线的单位向量 U_AB U_ABx = dx / AB_mod U_ABy = dy / AB_mod # 计算 A 相对于 B 的加速度 a_relative # ax = ax1 - ax2 # ay = ay1 - ay2 # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB a_on_AB = ax * U_ABx + ay * U_ABy VA = np.array([v_x1, v_y1]) VB = np.array([v_x2, v_y2]) D_A = np.array([x1, y1]) D_B = np.array([x2, y2]) V_r = VA - VB V = np.linalg.norm(V_r) w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB) a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta) return a_on_AB_back # 计算相对加速度 def _calculate_derivative(self, a, w, V, theta): # 计算(V×cos(θ))'的值 # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta derivative = a - w * V * math.sin(theta) return derivative def _cal_relative_angular_v(self, theta, A, B, VA, VB): dx = A[0] - B[0] dy = A[1] - B[1] dvx = VA[0] - VB[0] dvy = VA[1] - VB[1] # (dx * dvy - dy * dvx) angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2) return angular_velocity def _death_pr(self, obj_type, v_relative): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) return p_death def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity): if obj_type == 5: p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative)) else: p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative)) collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity return collisionRisk # 求两车之间当前距离 def dist(self, x1, y1, x2, y2): dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2) return dist # TTC (time to collision) def _cal_TTC(self, dist, vrel_projection_in_dist): if vrel_projection_in_dist == 0: return math.inf TTC = dist / vrel_projection_in_dist return TTC def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist): MTTC = math.nan if arel_projection_in_dist != 0: tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist if tmp < 0: return math.nan t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist if t1 > 0 and t2 > 0: if t1 >= t2: MTTC = t2 elif t1 < t2: MTTC = t1 elif t1 > 0 and t2 <= 0: MTTC = t1 elif t1 <= 0 and t2 > 0: MTTC = t2 if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0: MTTC = dist / vrel_projection_in_dist return MTTC # THW (time headway) def _cal_THW(self, dist, v_ego_projection_in_dist): if not v_ego_projection_in_dist: THW = None else: THW = dist / v_ego_projection_in_dist return THW def velocity(self, v_x, v_y): v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6 return v def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max): lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / ( 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max) return lon_dist_min def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min, a_lat_max): v_right_rho = v_right + rho * a_lat_max v_left_rho = v_left + rho * a_lat_max dist_min = lat_dist + ((v_right + v_right_rho) * rho / 2 + v_right_rho ** 2 / a_right_lat_brake_min / 2 + ( (v_left + v_right_rho) * rho / 2) + v_left_rho ** 2 / a_left_lat_brake_min / 2) return dist_min # DRAC (decelerate required avoid collision) def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2): dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671 if dist_length < 0: dist_width = dist - (width2 / 2 + width1 / 2) if dist_width < 0: return math.inf else: d = dist_width else: d = dist_length DRAC = vrel_projection_in_dist ** 2 / (2 * d) return DRAC # BTN (brake threat number) def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max): BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2): STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * ( width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max return STN # BTN (brake threat number) def cal_BTN(self, a_y1, ay, dy, vy, max_ay): BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值 return BTN # STN (steer threat number) def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2): STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax return STN # 追尾碰撞风险 def _normal_distribution(self, x): mean = 1.32 std_dev = 0.26 return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev) def continuous_group(self, df): time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time']) unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame']) unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1) return unfunc_df # 统计最危险的指标 def _safe_statistic_most_dangerous(self): min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD'] max_list = ['DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity'] for metric in min_list: if metric in self.metric_list: if metric in self.df.columns: self.calculated_value[metric] = self.df[metric].min() else: self.calculated_value[metric] = 10.0 for metric in max_list: if metric in self.metric_list: if metric in self.df.columns: self.calculated_value[metric] = self.df[metric].max() else: self.calculated_value[metric] = 10.0 return self.calculated_value def _safe_no_obj_statistic(self): # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"] return self.calculated_value def report_statistic(self, ): safety_result = {} if len(self.obj_id_list) == 1: safety_result = self._safe_no_obj_statistic() else: safety_result = self._safe_statistic_most_dangerous() evaluator = Score(config.SAFETY_CONFIG_PATH) result = evaluator.evaluate(safety_result) print("\n[安全性表现及得分情况]") return result