|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
- @Data: 2023/07/25
- @Last Modified: 2023/07/26
- @Summary: safe metrics
- """
- import sys
- import os
- import math
- import numpy as np
- import pandas as pd
- from pathlib import Path
- root_path = Path(__file__).resolve().parent.parent
- sys.path.append(str(root_path))
- from models.common.score import Score
- from config import config
- from collections import defaultdict
- import scipy.integrate as spi
- from models.common import log # 确保这个路径是正确的,或者调整它
- log_path = config.LOG_PATH
- logger = log.get_logger(log_path)
- class Safe(object):
-
- def __init__(self, data_processed):
-
- self.data_processed = data_processed
- self.df = data_processed.object_df.copy()
- self.ego_df = data_processed.ego_data
- self.obj_id_list = data_processed.obj_id_list
- self.metric_list = config.SAFETY_METRIC_LIST
-
- # score data
- self.calculated_value = {
- 'TTC': 1.0,
- 'MTTC': 1.0,
- 'THW': 1.0,
- 'LonSD': 0.0,
- 'LatSD': 0.0,
- 'DRAC': 0.0,
- 'BTN': 1.0,
- 'STN': 1.0,
- 'collisionRisk': 0.0,
- 'collisionSeverity': 0.0,
- }
- # lists of drving control info
-
- self.time_list = data_processed.driver_ctrl_data['time_list']
-
- self.frame_list = self.ego_df['simFrame'].values.tolist()
-
- self.collisionRisk = 0
- self.empty_flag = True
- # no car following scene
- if len(self.obj_id_list) > 1:
- self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_drac_df = pd.DataFrame(
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_xtn_df = pd.DataFrame(
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.most_dangerous = {}
- self.pass_percent = {}
- self._safe_param_cal_new()
-
- def _safe_param_cal_new(self):
-
- Tc = config.TC # 安全距离
- rho = config.RHO # 驾驶员制动反应时间
- ego_accel_max = config.EGO_ACCEL_MAX # 自车油门最大加速度
- obj_decel_max = config.OBJ_DECEL_MAX # 前车刹车最大减速度
- ego_decel_min = config.EGO_DECEL_MIN # 自车刹车最小减速度 ug
- ego_decel_lon_max = config.EGO_DECEL_LON_MAX
- ego_decel_lat_max = config.EGO_DECEL_LAT_MAX
- # 构建双层字典数据结构
- obj_dict = defaultdict(dict)
- # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对
- obj_data_dict = self.df.to_dict('records')
- for item in obj_data_dict:
- obj_dict[item['simFrame']][item['playerId']] = item
- df_list = []
- EGO_PLAYER_ID = 1
- # self.empty_flag = True
- for frame_num in self.frame_list:
- ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
- v1 = ego_data['v'] # km/h to m/s
- x1 = ego_data['posX']
- y1 = ego_data['posY']
- h1 = ego_data['posH']
- len1 = ego_data['dimX']
- width1 = ego_data['dimY']
- o_x1 = ego_data['offX']
- v_x1 = ego_data['speedX'] # km/h to m/s
- v_y1 = ego_data['speedY'] # km/h to m/s
- a_x1 = ego_data['accelX']
- a_y1 = ego_data['accelY']
- # a1 = ego_data['accel']
- for playerId in self.obj_id_list:
- if playerId == EGO_PLAYER_ID:
- continue
- try:
- obj_data = obj_dict[frame_num][playerId]
- except KeyError:
- continue
- x2 = obj_data['posX']
- y2 = obj_data['posY']
- dist = self.dist(x1, y1, x2, y2)
- obj_data['dist'] = dist
- v_x2 = obj_data['speedX'] # km/h to m/s
- v_y2 = obj_data['speedY'] # km/h to m/s
- v2 = obj_data['v'] # km/h to m/s
- # h2 = obj_data['posH']
- len2 = obj_data['dimX']
- width2 = obj_data['dimY']
- o_x2 = obj_data['offX']
- a_x2 = obj_data['accelX']
- a_y2 = obj_data['accelY']
- # a2 = obj_data['accel']
- dx, dy = x2 - x1, y2 - y1
- # 定义矢量A和x轴正向向量x
- A = np.array([dx, dy])
- x = np.array([1, 0])
- # 计算点积和向量长度
- dot_product = np.dot(A, x)
- vector_length_A = np.linalg.norm(A)
- vector_length_x = np.linalg.norm(x)
- # 计算夹角的余弦值
- cos_theta = dot_product / (vector_length_A * vector_length_x)
- # 将余弦值转换为角度值(弧度制)
- beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
- lon_d = dist * math.cos(beta - h1)
- lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi)
- obj_dict[frame_num][playerId]['lon_d'] = lon_d
- obj_dict[frame_num][playerId]['lat_d'] = lat_d
- # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离
- if lon_d > 100 or lon_d < -5 or lat_d > 4:
- continue
- self.empty_flag = False
- vx, vy = v_x1 - v_x2, v_y1 - v_y2
- ax, ay = a_x2 - a_x1, a_y2 - a_y1
- v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
- v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
- vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
- arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
- v_x2, v_y2)
- obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
- obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
- obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
- obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
- obj_type = obj_data['type']
- TTC = self._cal_TTC(dist, vrel_projection_in_dist)
- MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
- THW = self._cal_THW(dist, v_ego_p)
- # 单车道时可用
- LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
- obj_decel_max)
- lat_dist = 0.5
- v_right = v1
- v_left = v2
- a_right_lat_brake_min = 1
- a_left_lat_brake_min = 1
- a_lat_max = 5
- LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
- a_left_lat_brake_min,
- a_lat_max)
- DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
- lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
- lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
- lon_a = abs(lon_a1 - lon_a2)
- lon_d = dist * abs(math.cos(beta - h1))
- lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
- BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
- lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
- lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
- lat_a = abs(lat_a1 - lat_a2)
- lat_d = dist * abs(math.sin(beta - h1))
- lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
- STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
- obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
- obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
- # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
- # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
- TTC = None if (not TTC or TTC < 0) else TTC
- MTTC = None if (not MTTC or MTTC < 0) else MTTC
- THW = None if (not THW or THW < 0) else THW
- DRAC = 10 if DRAC >= 10 else DRAC
- # TTC要进行筛选,否则会出现nan或者TTC过大的情况
- if not TTC or TTC > 4000: # threshold = 4258.41
- collisionSeverity = 0
- pr_death = 0
- collisionRisk = 0
- else:
- result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
- collisionSeverity = 1 - result
- pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
- collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
- obj_dict[frame_num][playerId]['TTC'] = TTC
- obj_dict[frame_num][playerId]['MTTC'] = MTTC
- obj_dict[frame_num][playerId]['THW'] = THW
- obj_dict[frame_num][playerId]['LonSD'] = LonSD
- obj_dict[frame_num][playerId]['LatSD'] = LatSD
- obj_dict[frame_num][playerId]['DRAC'] = DRAC
- obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
- obj_dict[frame_num][playerId]['STN'] = abs(STN)
- obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
- obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
- obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
- df_fnum = pd.DataFrame(obj_dict[frame_num].values())
- df_list.append(df_fnum)
- df_safe = pd.concat(df_list)
- col_list = ['simTime', 'simFrame', 'playerId',
- 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'DRAC', 'BTN', 'STN', 'collisionSeverity', 'pr_death',
- 'collisionRisk']
- self.df_safe = df_safe[col_list].reset_index(drop=True)
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 在 AB 连线上的速度 V1_on_AB
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
- return V1_on_AB
- def _cal_v_projection(self, dx, dy, vx, vy):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的速度 V_relative
- # vx = vx1 - vx2
- # vy = vy1 - vy2
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
- V_on_AB = vx * U_ABx + vy * U_ABy
- return V_on_AB
- def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 θ
- V_mod = math.sqrt(vx ** 2 + vy ** 2)
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- if V_mod == 0 or AB_mod == 0:
- return 0
- cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
- theta = math.acos(cos_theta)
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的加速度 a_relative
- # ax = ax1 - ax2
- # ay = ay1 - ay2
- # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
- a_on_AB = ax * U_ABx + ay * U_ABy
- VA = np.array([v_x1, v_y1])
- VB = np.array([v_x2, v_y2])
- D_A = np.array([x1, y1])
- D_B = np.array([x2, y2])
- V_r = VA - VB
- V = np.linalg.norm(V_r)
- w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
- a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
- return a_on_AB_back
- # 计算相对加速度
- def _calculate_derivative(self, a, w, V, theta):
- # 计算(V×cos(θ))'的值
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
- derivative = a - w * V * math.sin(theta)
- return derivative
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
- dx = A[0] - B[0]
- dy = A[1] - B[1]
- dvx = VA[0] - VB[0]
- dvy = VA[1] - VB[1]
- # (dx * dvy - dy * dvx)
- angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
- return angular_velocity
- def _death_pr(self, obj_type, v_relative):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- return p_death
- def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
- return collisionRisk
- # 求两车之间当前距离
- def dist(self, x1, y1, x2, y2):
- dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
- return dist
- # TTC (time to collision)
- def _cal_TTC(self, dist, vrel_projection_in_dist):
- if vrel_projection_in_dist == 0:
- return math.inf
- TTC = dist / vrel_projection_in_dist
- return TTC
- def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
- MTTC = math.nan
- if arel_projection_in_dist != 0:
- tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
- if tmp < 0:
- return math.nan
- t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
- t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
- if t1 > 0 and t2 > 0:
- if t1 >= t2:
- MTTC = t2
- elif t1 < t2:
- MTTC = t1
- elif t1 > 0 and t2 <= 0:
- MTTC = t1
- elif t1 <= 0 and t2 > 0:
- MTTC = t2
- if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
- MTTC = dist / vrel_projection_in_dist
- return MTTC
- # THW (time headway)
- def _cal_THW(self, dist, v_ego_projection_in_dist):
- if not v_ego_projection_in_dist:
- THW = None
- else:
- THW = dist / v_ego_projection_in_dist
- return THW
- def velocity(self, v_x, v_y):
- v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
- return v
- def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
- lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
- 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
- return lon_dist_min
- def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min,
- a_lat_max):
- v_right_rho = v_right + rho * a_lat_max
- v_left_rho = v_left + rho * a_lat_max
- dist_min = lat_dist + ((v_right + v_right_rho) * rho / 2 + v_right_rho ** 2 / a_right_lat_brake_min / 2 + (
- (v_left + v_right_rho) * rho / 2) + v_left_rho ** 2 / a_left_lat_brake_min / 2)
- return dist_min
- # DRAC (decelerate required avoid collision)
- def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
- dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
- if dist_length < 0:
- dist_width = dist - (width2 / 2 + width1 / 2)
- if dist_width < 0:
- return math.inf
- else:
- d = dist_width
- else:
- d = dist_length
- DRAC = vrel_projection_in_dist ** 2 / (2 * d)
- return DRAC
- # BTN (brake threat number)
- def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
- BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
- STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
- width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
- return STN
- # BTN (brake threat number)
- def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
- BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
- STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax
- return STN
- # 追尾碰撞风险
- def _normal_distribution(self, x):
- mean = 1.32
- std_dev = 0.26
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
- def continuous_group(self, df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
- unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
- return unfunc_df
- # 统计最危险的指标
-
- def _safe_statistic_most_dangerous(self):
- min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD']
- max_list = ['DRAC', 'BTN', 'STN', 'collisionRisk', 'collisionSeverity']
-
- for metric in min_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- self.calculated_value[metric] = self.df[metric].min()
- else:
- self.calculated_value[metric] = 10.0
- for metric in max_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- self.calculated_value[metric] = self.df[metric].max()
- else:
- self.calculated_value[metric] = 10.0
- return self.calculated_value
- def _safe_no_obj_statistic(self):
- # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
-
- return self.calculated_value
-
- def report_statistic(self, ):
-
- safety_result = {}
- if len(self.obj_id_list) == 1:
- safety_result = self._safe_no_obj_statistic()
- else:
- safety_result = self._safe_statistic_most_dangerous()
-
- evaluator = Score(config.SAFETY_CONFIG_PATH)
- result = evaluator.evaluate(safety_result)
-
- print("\n[安全性表现及得分情况]")
-
- return result
-
|