|
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
- @Data: 2023/07/25
- @Last Modified: 2023/07/26
- @Summary: safe metrics
- """
- import math
- import numpy as np
- import pandas as pd
- from modules.lib.score import Score
- from modules.config import config
- from collections import defaultdict
- import scipy.integrate as spi
- class Safe(object):
- def __init__(self, data_processed):
- # self.logger = log.get_logger() # 使用时再初始化
- self.data_processed = data_processed
- self.df = data_processed.object_df.copy()
- self.ego_df = data_processed.ego_data.copy()
- self.obj_id_list = data_processed.obj_id_list
- self.metric_list = [
- "TTC",
- "MTTC",
- "THW",
- "LatSD",
- "BTN",
- "collisionRisk",
- "collisionSeverity",
- ]
- # score data
- self.calculated_value = {
- "TTC": 10.0,
- "MTTC": 10.0,
- "THW": 10.0,
- # 'LonSD': 0.0,
- "LatSD": 3.0,
- # 'DRAC': 0.0,
- "BTN": 1.0,
- # 'STN': 1.0,
- "collisionRisk": 0.0,
- "collisionSeverity": 0.0,
- }
- # lists of drving control info
- self.time_list = self.ego_df["simTime"].values.tolist()
- self.frame_list = self.ego_df["simFrame"].values.tolist()
- self.collisionRisk = 0
- self.empty_flag = True
- # no car following scene
- if len(self.obj_id_list) > 1:
- self.unsafe_df = pd.DataFrame(
- columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
- )
- self.unsafe_time_df = pd.DataFrame(
- columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
- )
- self.unsafe_dist_df = pd.DataFrame(
- columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
- )
- # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_drac_df = pd.DataFrame(
- columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
- )
- self.unsafe_acce_xtn_df = pd.DataFrame(
- columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
- )
- self.unsafe_prob_df = pd.DataFrame(
- columns=["start_time", "end_time", "start_frame", "end_frame", "type"]
- )
- self.most_dangerous = {}
- self.pass_percent = {}
- self._safe_param_cal_new()
- def _safe_param_cal_new(self):
- Tc = 0.3 # 安全距离
- rho = Tc # 驾驶员制动反应时间(单位:秒)
- ego_accel_max = self.data_processed.vehicle_config[
- "EGO_ACCEL_MAX"
- ] # 自车油门最大加速度
- obj_decel_max = self.data_processed.vehicle_config[
- "OBJ_DECEL_MAX"
- ] # 前车刹车最大减速度
- ego_decel_min = self.data_processed.vehicle_config[
- "EGO_DECEL_MIN"
- ] # 自车刹车最小减速度 ug
- ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
- ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
- # 构建双层字典数据结构
- obj_dict = defaultdict(dict)
- # 构建字典列表,每个元素是一个字典,包含'start_time', 'end_time', 'start_frame', 'end_frame', 'type'五个键值对
- obj_data_dict = self.df.to_dict("records")
- for item in obj_data_dict:
- obj_dict[item["simFrame"]][item["playerId"]] = item
- df_list = []
- EGO_PLAYER_ID = 1
- # self.empty_flag = True
- for frame_num in self.frame_list:
- ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
- v1 = ego_data["v"]
- x1 = ego_data["posX"]
- y1 = ego_data["posY"]
- h1 = ego_data["posH"]
- # 将角度转换为弧度
-
- # len1 = ego_data['dimX']
- # width1 = ego_data['dimY']
- # o_x1 = ego_data['offX']
- v_x1 = ego_data["speedX"]
- v_y1 = ego_data["speedY"]
- a_x1 = ego_data["accelX"]
- a_y1 = ego_data["accelY"]
- # a1 = ego_data['accel']
- for playerId in self.obj_id_list:
- if playerId == EGO_PLAYER_ID:
- continue
- try:
- obj_data = obj_dict[frame_num][playerId]
- except KeyError:
- continue
- x2 = obj_data["posX"]
- y2 = obj_data["posY"]
- dist = self.dist(x1, y1, x2, y2)
- obj_data["dist"] = dist
- v_x2 = obj_data["speedX"] # m/s
- v_y2 = obj_data["speedY"] # m/s
- v2 = obj_data["v"] # m/s
- # h2 = obj_data['posH']
- # len2 = obj_data['dimX']
- # width2 = obj_data['dimY']
- # o_x2 = obj_data['offX']
- a_x2 = obj_data["accelX"]
- a_y2 = obj_data["accelY"]
- # a2 = obj_data['accel']
- dx, dy = x2 - x1, y2 - y1
- # 计算目标车相对于自车的方位角
- beta = math.atan2(dy, dx) # 添加beta的计算
-
- # 将全局坐标系下的相对位置向量转换到自车坐标系
- # 自车坐标系:车头方向为x轴正方向,车辆左侧为y轴正方向
- h1_rad = math.radians(90 - h1) # 转换为与x轴的夹角
-
- # 坐标变换
- lon_d = dx * math.cos(h1_rad) + dy * math.sin(h1_rad) # 纵向距离(前为正,后为负)
- lat_d = -dx * math.sin(h1_rad) + dy * math.cos(h1_rad) # 横向距离(左为正,右为负)
- # 添加调试信息
- # if frame_num == self.frame_list[-1]: # 判断是否是最后一帧
- # print(f"最后一帧数据 frame_num={frame_num}:")
- # print(f"目标车ID: {playerId}")
- # print(f"自车位置: ({x1:.2f}, {y1:.2f}), 朝向角: {h1:.2f}度")
- # print(f"目标车位置: ({x2:.2f}, {y2:.2f})")
- # print(f"相对距离: dx={dx:.2f}, dy={dy:.2f}")
- # print(f"纵向距离: lon_d={lon_d:.2f}")
- # print(f"横向距离: lat_d={lat_d:.2f}")
- # print("------------------------")
- obj_dict[frame_num][playerId]["lon_d"] = lon_d
- obj_dict[frame_num][playerId]["lat_d"] = lat_d
- # 代码注释,这里是筛选出车前100米,车后5米,车右边4米的目标车,用于计算安全距离
- # print(f"lon_d: {lon_d}, lat_d: {lat_d}")
- if lon_d > 100 or lon_d < -5 or abs(lat_d) > 4:
- continue
- self.empty_flag = False
- vx, vy = v_x1 - v_x2, v_y1 - v_y2
- ax, ay = a_x2 - a_x1, a_y2 - a_y1
- v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
- v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
- vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
- arel_projection_in_dist = self._cal_a_projection(
- dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
- )
- obj_dict[frame_num][playerId][
- "vrel_projection_in_dist"
- ] = vrel_projection_in_dist
- obj_dict[frame_num][playerId][
- "arel_projection_in_dist"
- ] = arel_projection_in_dist
- obj_dict[frame_num][playerId]["v_ego_projection_in_dist"] = v_ego_p
- obj_dict[frame_num][playerId]["v_obj_projection_in_dist"] = v_obj_p
- obj_type = obj_data["type"]
- TTC = self._cal_TTC(dist, vrel_projection_in_dist)
- MTTC = self._cal_MTTC(
- TTC, vrel_projection_in_dist, arel_projection_in_dist
- )
- THW = self._cal_THW(dist, v_ego_p)
- # 单车道时可用
- # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
- # obj_decel_max)
- lat_dist = 0.5
- v_right = v1
- v_left = v2
- a_right_lat_brake_min = 1
- a_left_lat_brake_min = 1
- a_lat_max = 5
- LatSD = self._cal_lateral_safe_dist(
- lat_dist,
- v_right,
- v_left,
- rho,
- a_right_lat_brake_min,
- a_left_lat_brake_min,
- a_lat_max,
- )
- # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
- # 将加速度和速度投影到车辆坐标系
- h1_rad_for_projection = math.radians(90 - h1) # 与L175保持一致
-
- # 计算纵向加速度
- lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection)
- lon_a2 = a_x2 * math.cos(h1_rad_for_projection) + a_y2 * math.sin(h1_rad_for_projection)
- lon_a = abs(lon_a1 - lon_a2)
-
- # 使用已计算的纵向距离或重新计算
- # 选项1: 使用已计算的lon_d (推荐,保持一致性)
- # lon_d = obj_dict[frame_num][playerId]["lon_d"]
-
- # 选项2: 重新计算,确保与L178一致
- # lon_d = dx * math.cos(h1_rad_for_projection) + dy * math.sin(h1_rad_for_projection)
-
- # 选项3: 使用简化公式,但确保角度单位一致
- lon_d = max(dist * abs(math.cos(beta - h1_rad_for_projection)), 0.1) # 设置最小值避免除零
-
- # 计算纵向速度
- lon_v = v_x1 * math.cos(h1_rad_for_projection) + v_y1 * math.sin(h1_rad_for_projection)
-
- # 计算BTN
- BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
- # 这部分代码可以简化,直接使用已计算的lon_a1和lat_a1
- # 计算纵向加速度
- # lon_a1 = a_x1 * math.cos(h1_rad_for_projection) + a_y1 * math.sin(h1_rad_for_projection)
- lon_a1 = ego_data["lon_acc"] # 直接使用已计算的纵向加速度
-
- # 计算横向加速度
- # lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
- lat_a1 = ego_data["lat_acc"] # 直接使用已计算的横向加速度
- lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
- # 修改:考虑相对加速度的方向,不再取绝对值
- lat_a = lat_a2 - lat_a1 # 目标车减去自车的加速度,保留方向信息
- lat_d = dist * abs(math.sin(beta - h1_rad))
- lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
- # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
- obj_dict[frame_num][playerId]["lat_v_rel"] = v_x1 - v_x2
- obj_dict[frame_num][playerId]["lon_v_rel"] = v_y1 - v_y2
- TTC = None if (not TTC or TTC < 0) else TTC
- MTTC = None if (not MTTC or MTTC < 0) else MTTC
- THW = None if (not THW or THW < 0) else THW
- # print(f'TTC: {TTC if TTC is None else f"{TTC:.2f}"}, '
- # f'MTTC: {MTTC if MTTC is None else f"{MTTC:.2f}"}, '
- # f'THW: {THW if THW is None else f"{THW:.2f}"}')
- # 原DRAC计算已被注释,需移除相关代码引用
- # DRAC = 10 if DRAC >= 10 else DRAC # 删除这行
- # 修改后的指标赋值
- obj_dict[frame_num][playerId]["TTC"] = TTC
- obj_dict[frame_num][playerId]["MTTC"] = MTTC
- obj_dict[frame_num][playerId]["THW"] = THW
- # obj_dict[frame_num][playerId]['LonSD'] = LonSD
- obj_dict[frame_num][playerId]["LatSD"] = LatSD
- # obj_dict[frame_num][playerId]['DRAC'] = DRAC # 删除这行
- obj_dict[frame_num][playerId]["BTN"] = abs(BTN)
- # obj_dict[frame_num][playerId]['STN'] = abs(STN) # 删除这行
- # 确保以下变量已定义
- collisionSeverity = (
- collisionSeverity if "collisionSeverity" in locals() else 0
- )
- pr_death = pr_death if "pr_death" in locals() else 0
- collisionRisk = collisionRisk if "collisionRisk" in locals() else 0
- obj_dict[frame_num][playerId]["collisionSeverity"] = (
- collisionSeverity * 100
- )
- obj_dict[frame_num][playerId]["pr_death"] = pr_death * 100
- obj_dict[frame_num][playerId]["collisionRisk"] = collisionRisk * 100
- # print(
- # f"{playerId} {frame_num} {TTC} {MTTC} {THW} {LatSD} {BTN} {collisionSeverity} {pr_death} {collisionRisk}"
- # )
- df_fnum = pd.DataFrame(obj_dict[frame_num].values())
- df_list.append(df_fnum)
- df_safe = pd.concat(df_list)
- col_list = [
- "simTime",
- "simFrame",
- "playerId",
- "TTC",
- "MTTC",
- "THW",
- "LatSD",
- "BTN", # 移除STN
- "collisionSeverity",
- "pr_death",
- "collisionRisk",
- ]
- if not self.empty_flag:
- self.df = df_safe[col_list].reset_index(drop=True)
- # self.df_safe是有问题的,因为后边统计的函数里是没有self.df_safe这个变量的,只有self.df,
- # 需要增加如果是空的情况下,self.df_safe的值为默认值。
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx**2 + dy**2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 在 AB 连线上的速度 V1_on_AB
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
- return V1_on_AB
- def _cal_v_projection(self, dx, dy, vx, vy):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx**2 + dy**2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的速度 V_relative
- # vx = vx1 - vx2
- # vy = vy1 - vy2
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
- V_on_AB = vx * U_ABx + vy * U_ABy
- # print(f'_cal_v_projection:{V_on_AB}')
- return V_on_AB
- def _cal_a_projection(
- self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2
- ):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 θ
- V_mod = math.sqrt(vx**2 + vy**2)
- AB_mod = math.sqrt(dx**2 + dy**2)
- if V_mod == 0 or AB_mod == 0:
- return 0
- cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
- theta = math.acos(cos_theta)
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx**2 + dy**2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的加速度 a_relative
- # ax = ax1 - ax2
- # ay = ay1 - ay2
- # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
- a_on_AB = ax * U_ABx + ay * U_ABy
- VA = np.array([v_x1, v_y1])
- VB = np.array([v_x2, v_y2])
- D_A = np.array([x1, y1])
- D_B = np.array([x2, y2])
- V_r = VA - VB
- V = np.linalg.norm(V_r)
- w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
- a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
- return a_on_AB_back
- # 计算相对加速度
- def _calculate_derivative(self, a, w, V, theta):
- # 计算(V×cos(θ))'的值
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
- derivative = a - w * V * math.sin(theta)
- return derivative
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
- dx = A[0] - B[0]
- dy = A[1] - B[1]
- dvx = VA[0] - VB[0]
- dvy = VA[1] - VB[1]
- # (dx * dvy - dy * dvx)
- angular_velocity = (
- math.sqrt(dvx**2 + dvy**2) * math.sin(theta) / math.sqrt(dx**2 + dy**2)
- )
- return angular_velocity
- def _death_pr(self, obj_type, v_relative):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- return p_death
- def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
- return collisionRisk
- # 求两车之间当前距离
- def dist(self, x1, y1, x2, y2):
- dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
- return dist
- # TTC (time to collision)
- def _cal_TTC(self, dist, vrel_projection_in_dist):
- # 如果相对速度接近0,视为相对静止或无法计算TTC
- if abs(vrel_projection_in_dist) < 0.02: # 使用之前讨论过的阈值判断
- return self.calculated_value["TTC"] # 返回默认值10.0
- TTC = dist / vrel_projection_in_dist
- # 如果计算结果为负,表示正在远离,也视为安全
- if TTC < 0:
- return self.calculated_value["TTC"] # 返回默认值10.0
- return TTC
- def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
- if arel_projection_in_dist != 0:
- tmp = vrel_projection_in_dist**2 + 2 * arel_projection_in_dist * dist
- if tmp < 0:
- return self.calculated_value["MTTC"] # 返回默认值10.0
- t1 = (
- -1 * vrel_projection_in_dist - math.sqrt(tmp)
- ) / arel_projection_in_dist
- t2 = (
- -1 * vrel_projection_in_dist + math.sqrt(tmp)
- ) / arel_projection_in_dist
- if t1 > 0 and t2 > 0:
- if t1 >= t2:
- MTTC = t2
- elif t1 < t2:
- MTTC = t1
- elif t1 > 0 and t2 <= 0:
- MTTC = t1
- elif t1 <= 0 and t2 > 0:
- MTTC = t2
- else:
- return self.calculated_value["MTTC"] # 返回默认值10.0
- elif arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
- MTTC = dist / vrel_projection_in_dist
- else:
- return self.calculated_value["MTTC"] # 返回默认值10.0
- return MTTC
- # THW (time headway)
- def _cal_THW(self, dist, v_ego_projection_in_dist):
- if not v_ego_projection_in_dist or abs(v_ego_projection_in_dist) < 0.02:
- return self.calculated_value["THW"] # 返回默认值10.0
- THW = dist / v_ego_projection_in_dist
- if THW < 0:
- return self.calculated_value["THW"] # 返回默认值10.0
- return THW
- def velocity(self, v_x, v_y):
- v = math.sqrt(v_x**2 + v_y**2) * 3.6
- return v
- def _cal_longitudinal_safe_dist(
- self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max
- ):
- lon_dist_min = (
- v_ego_p * rho
- + ego_accel_max * (rho**2) / 2
- + (v_ego_p + rho * ego_accel_max) ** 2 / (2 * ego_decel_min)
- - v_obj_p**2 / (2 * ego_decel_max)
- )
- return lon_dist_min
- def _cal_lateral_safe_dist(
- self,
- lat_dist,
- v_right,
- v_left,
- rho,
- a_right_lat_brake_min,
- a_left_lat_brake_min,
- a_lat_max,
- ):
- # 检查除数是否为零
- if a_right_lat_brake_min == 0 or a_left_lat_brake_min == 0:
- return self.calculated_value["LatSD"] # 返回默认值3.0
-
- v_right_rho = v_right + rho * a_lat_max
- v_left_rho = v_left + rho * a_lat_max
- dist_min = lat_dist + (
- (v_right + v_right_rho) * rho / 2
- + v_right_rho**2 / a_right_lat_brake_min / 2
- + ((v_left + v_right_rho) * rho / 2)
- + v_left_rho**2 / a_left_lat_brake_min / 2
- )
- return dist_min
- # DRAC (decelerate required avoid collision)
- def _cal_DRAC(
- self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2
- ):
- dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
- if dist_length < 0:
- dist_width = dist - (width2 / 2 + width1 / 2)
- if dist_width < 0:
- return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值
- else:
- d = dist_width
- else:
- d = dist_length
-
- # 避免除零
- if d <= 0:
- return self.calculated_value["DRAC"] if "DRAC" in self.calculated_value else 0.0 # 使用默认值
-
- DRAC = vrel_projection_in_dist**2 / (2 * d)
- return DRAC
- # BTN (brake threat number)
- def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
- # 安全检查,避免除零错误
- if lon_d <= 0 or ego_decel_lon_max == 0:
- return self.calculated_value["BTN"] # 返回默认值1.0
-
- # 注意:lon_a现在是带方向的相对加速度(lon_a2 - lon_a1)
- # 根据BTN公式:BTN = (lon_a1 + lon_a - lon_v**2 / (2 * lon_d)) / ego_decel_lon_max
- BTN = (
- lon_a1 + lon_a - lon_v**2 / (2 * lon_d)
- ) / ego_decel_lon_max
-
- return BTN
- # STN (steer threat number)
- def _cal_STN_new(
- self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2
- ):
- STN = (
- lat_a1
- + lat_a
- + 2
- / ttc**2
- * (
- lat_d
- + abs(ego_decel_lat_max * lat_v) * (width1 + width2) / 2
- + abs(lat_v * ttc)
- )
- ) / ego_decel_lat_max
- return STN
- # BTN (brake threat number)
- def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
- BTN = (
- a_y1 + ay - vy**2 / (2 * dy)
- ) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
- STN = (
- a_x1
- + ax
- + 2
- / ttc**2
- * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)
- ) / max_ax
- return STN
- # 追尾碰撞风险
- def _normal_distribution(self, x):
- mean = 1.32
- std_dev = 0.26
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(
- -0.5 * (x - mean) ** 2 / std_dev
- )
- def continuous_group(self, df):
- time_list = df["simTime"].values.tolist()
- frame_list = df["simFrame"].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- unfunc_time_df = pd.DataFrame(time, columns=["start_time", "end_time"])
- unfunc_frame_df = pd.DataFrame(frame, columns=["start_frame", "end_frame"])
- unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
- return unfunc_df
- # 统计最危险的指标
- def _safe_statistic_most_dangerous(self):
- min_list = ["TTC", "MTTC", "THW", "LatSD"]
- max_list = ["BTN", "collisionRisk", "collisionSeverity"] # 移除STN
- for metric in min_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- # 筛选出非NaN的值
- valid_values = self.df[metric].dropna()
- if not valid_values.empty:
- # 如果有非NaN值,计算最小值
- self.calculated_value[metric] = valid_values.min()
- print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}')
- else:
- # 如果全是NaN或列为空,赋默认值
- self.calculated_value[metric] = self.calculated_value[metric]
- else:
- # 列不存在,赋默认值
- self.calculated_value[metric] = self.calculated_value[metric]
- for metric in max_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- # 筛选出非NaN的值
- valid_values = self.df[metric].dropna()
- if not valid_values.empty:
- # 如果有非NaN值,计算最大值
- self.calculated_value[metric] = valid_values.max()
- print(f'self.calculated_value[{metric}]:{self.calculated_value[metric]}')
- else:
- # 如果全是NaN或列为空,赋默认值
- self.calculated_value[metric] = self.calculated_value[metric]
- else:
- # 列不存在,赋默认值
- self.calculated_value[metric] = self.calculated_value[metric]
- return self.calculated_value
- def _safe_no_obj_statistic(self):
- # list_metric = ["TTC","MTTC","THW","LonSD","LatSD","DRAC","BTN","STN","collisionSeverity", "collisionRisk"]
- return self.calculated_value
- def report_statistic(
- self,
- ):
- safety_result = {}
- if len(self.obj_id_list) == 1:
- safety_result = self._safe_no_obj_statistic()
- else:
- safety_result = self._safe_statistic_most_dangerous()
- evaluator = Score(self.data_processed.safety_config)
- result = evaluator.evaluate(safety_result)
- # self.logger.info(f"安全性表现及得分情况:[{result}]")
- print("\n[安全性表现及得分情况]")
- return result
|