123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 安全指标计算模块
- """
- import numpy as np
- import pandas as pd
- import math
- from collections import defaultdict
- from typing import Dict, Any, List, Optional
- from modules.lib.score import Score
- from modules.lib.log_manager import LogManager
- # 安全指标相关常量
- SAFETY_INFO = [
- "simTime",
- "simFrame",
- "playerId",
- "posX",
- "posY",
- "posH",
- "speedX",
- "speedY",
- "accelX",
- "accelY",
- "v",
- "type"
- ]
- # ----------------------
- # SafetyCalculator单例管理
- # ----------------------
- # class SafetyCalculatorManager:
- # """SafetyCalculator单例管理器"""
- # _instance = None
-
- # @classmethod
- # def get_instance(cls, data_processed):
- # """获取SafetyCalculator实例,如果不存在则创建"""
- # if cls._instance is None or cls._instance.data_processed != data_processed:
- # cls._instance = SafetyCalculator(data_processed)
- # return cls._instance
- # ----------------------
- # 独立指标计算函数
- # ----------------------
- def calculate_ttc(data_processed) -> dict:
- """计算TTC (Time To Collision)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"TTC": None}
- try:
- safety = SafetyCalculator(data_processed)
- ttc_value = safety.get_ttc_value()
- return {"TTC": ttc_value}
- except Exception as e:
- LogManager().get_logger().error(f"TTC计算异常: {str(e)}", exc_info=True)
- return {"TTC": None}
- def calculate_mttc(data_processed) -> dict:
- """计算MTTC (Modified Time To Collision)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"MTTC": None}
- try:
- safety = SafetyCalculator(data_processed)
- mttc_value = safety.get_mttc_value()
- return {"MTTC": mttc_value}
- except Exception as e:
- LogManager().get_logger().error(f"MTTC计算异常: {str(e)}", exc_info=True)
- return {"MTTC": None}
- def calculate_thw(data_processed) -> dict:
- """计算THW (Time Headway)"""
- if data_processed is None or not hasattr(data_processed, 'object_df'):
- return {"THW": None}
- try:
- safety = SafetyCalculator(data_processed)
- thw_value = safety.get_thw_value()
- return {"THW": thw_value}
- except Exception as e:
- LogManager().get_logger().error(f"THW计算异常: {str(e)}", exc_info=True)
- return {"THW": None}
- def calculate_collisionrisk(data_processed) -> dict:
- """计算碰撞风险"""
- safety = SafetyCalculator(data_processed)
- collision_risk_value = safety.get_collision_risk_value()
- return {"collisionRisk": collision_risk_value}
- def calculate_lonsd(data_processed) -> dict:
- """计算纵向安全距离"""
- safety = SafetyCalculator(data_processed)
- lonsd_value = safety.get_lonsd_value()
- return {"LonSD": lonsd_value}
- def calculate_latsd(data_processed) -> dict:
- """计算横向安全距离"""
- safety = SafetyCalculator(data_processed)
- latsd_value = safety.get_latsd_value()
- return {"LatSD": latsd_value}
- def calculate_btn(data_processed) -> dict:
- """计算制动威胁数"""
- safety = SafetyCalculator(data_processed)
- btn_value = safety.get_btn_value()
- return {"BTN": btn_value}
- def calculate_collisionseverity(data_processed) -> dict:
- """计算碰撞严重性"""
- safety = SafetyCalculator(data_processed)
- collision_severity_value = safety.get_collision_severity_value()
- return {"collisionSeverity": collision_severity_value}
- class SafetyRegistry:
- """安全指标注册器"""
-
- def __init__(self, data_processed):
- self.logger = LogManager().get_logger()
- self.data = data_processed
- self.safety_config = data_processed.safety_config["safety"]
- self.metrics = self._extract_metrics(self.safety_config)
- self._registry = self._build_registry()
-
- def _extract_metrics(self, config_node: dict) -> list:
- """从配置中提取指标名称"""
- metrics = []
- def _recurse(node):
- if isinstance(node, dict):
- if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
- metrics.append(node['name'])
- for v in node.values():
- _recurse(v)
- _recurse(config_node)
- self.logger.info(f'评比的安全指标列表:{metrics}')
- return metrics
-
- def _build_registry(self) -> dict:
- """构建指标函数注册表"""
- registry = {}
- for metric_name in self.metrics:
- func_name = f"calculate_{metric_name.lower()}"
- if func_name in globals():
- registry[metric_name] = globals()[func_name]
- else:
- self.logger.warning(f"未实现安全指标函数: {func_name}")
- return registry
-
- def batch_execute(self) -> dict:
- """批量执行指标计算"""
- results = {}
- for name, func in self._registry.items():
- try:
- result = func(self.data)
- results.update(result)
- except Exception as e:
- self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
- results[name] = None
- self.logger.info(f'安全指标计算结果:{results}')
- return results
- class SafeManager:
- """安全指标管理类"""
-
- def __init__(self, data_processed):
- self.data = data_processed
- self.registry = SafetyRegistry(self.data)
-
- def report_statistic(self):
- """计算并报告安全指标结果"""
- safety_result = self.registry.batch_execute()
-
- return safety_result
-
- class SafetyCalculator:
- """安全指标计算类 - 兼容Safe类风格"""
- def __init__(self, data_processed):
- self.logger = LogManager().get_logger()
- self.data_processed = data_processed
- self.df = data_processed.object_df.copy()
- self.ego_df = data_processed.ego_data
- self.obj_id_list = data_processed.obj_id_list
- self.metric_list = [
- 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'BTN', 'collisionRisk', 'collisionSeverity'
- ]
- self.time_list = self.ego_df['simTime'].values.tolist()
- self.frame_list = self.ego_df['simFrame'].values.tolist()
- self.collisionRisk = 0
- self.empty_flag = True
- self.logger.info("SafetyCalculator初始化完成,obj_id_list长度: %d", len(self.obj_id_list))
- if len(self.obj_id_list) > 1:
- self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_drac_df = pd.DataFrame(
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_acce_xtn_df = pd.DataFrame(
- columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- self.most_dangerous = {}
- self.pass_percent = {}
- self.logger.info("开始执行安全参数计算 _safe_param_cal_new")
- self._safe_param_cal_new()
- self.logger.info("安全参数计算完成")
- def _safe_param_cal_new(self):
- self.logger.debug("进入 _safe_param_cal_new 方法")
- # 直接复用Safe类的实现
- Tc = 0.3 # 安全距离
- rho = self.data_processed.vehicle_config["RHO"]
- ego_accel_max = self.data_processed.vehicle_config["EGO_ACCEL_MAX"]
- obj_decel_max = self.data_processed.vehicle_config["OBJ_DECEL_MAX"]
- ego_decel_min = self.data_processed.vehicle_config["EGO_DECEL_MIN"]
- ego_decel_lon_max = self.data_processed.vehicle_config["EGO_DECEL_LON_MAX"]
- ego_decel_lat_max = self.data_processed.vehicle_config["EGO_DECEL_LAT_MAX"]
- obj_dict = defaultdict(dict)
- obj_data_dict = self.df.to_dict('records')
- for item in obj_data_dict:
- obj_dict[item['simFrame']][item['playerId']] = item
- df_list = []
- EGO_PLAYER_ID = 1
- for frame_num in self.frame_list:
- ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
- v1 = ego_data['v']
- x1 = ego_data['posX']
- y1 = ego_data['posY']
- h1 = ego_data['posH']
- v_x1 = ego_data['speedX']
- v_y1 = ego_data['speedY']
- a_x1 = ego_data['accelX']
- a_y1 = ego_data['accelY']
- for playerId in self.obj_id_list:
- if playerId == EGO_PLAYER_ID:
- continue
- try:
- obj_data = obj_dict[frame_num][playerId]
- except KeyError:
- continue
- x2 = obj_data['posX']
- y2 = obj_data['posY']
- dist = self.dist(x1, y1, x2, y2)
- obj_data['dist'] = dist
- v_x2 = obj_data['speedX']
- v_y2 = obj_data['speedY']
- v2 = obj_data['v']
- a_x2 = obj_data['accelX']
- a_y2 = obj_data['accelY']
- dx, dy = x2 - x1, y2 - y1
- A = np.array([dx, dy])
- x = np.array([1, 0])
- dot_product = np.dot(A, x)
- vector_length_A = np.linalg.norm(A)
- vector_length_x = np.linalg.norm(x)
- cos_theta = dot_product / (vector_length_A * vector_length_x)
- beta = np.arccos(cos_theta)
- lon_d = dist * math.cos(beta - h1)
- lat_d = abs(dist * math.sin(beta - h1))
- obj_dict[frame_num][playerId]['lon_d'] = lon_d
- obj_dict[frame_num][playerId]['lat_d'] = lat_d
- if lon_d > 100 or lon_d < -5 or lat_d > 4:
- continue
- self.empty_flag = False
- vx, vy = v_x1 - v_x2, v_y1 - v_y2
- ax, ay = a_x2 - a_x1, a_y2 - a_y1
- v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
- v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
- vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
- arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
- v_x2, v_y2)
- obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
- obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
- obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
- obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
- obj_type = obj_data['type']
- TTC = self._cal_TTC(dist, vrel_projection_in_dist) if abs(vrel_projection_in_dist) > 1e-6 else None
- MTTC = self._cal_MTTC(dist, vrel_projection_in_dist, arel_projection_in_dist)
- THW = self._cal_THW(dist, v_ego_p) if abs(v_ego_p) > 1e-6 else None
- LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, obj_decel_max)
- lat_dist = 0.5
- v_right = v1
- v_left = v2
- a_right_lat_brake_min = 1
- a_left_lat_brake_min = 1
- a_lat_max = 5
- LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
- a_left_lat_brake_min, a_lat_max)
- lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
- lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
- lon_a = abs(lon_a1 - lon_a2)
- lon_d = max(0.1, dist * abs(math.cos(beta - h1)))
- lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
- BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
- lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
- lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
- lat_a = abs(lat_a1 - lat_a2)
- lat_d = dist * abs(math.sin(beta - h1))
- lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
- obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
- obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
- TTC = None if (TTC is None or TTC < 0) else TTC
- MTTC = None if (MTTC is None or MTTC < 0) else MTTC
- THW = None if (THW is None or THW < 0) else THW
- obj_dict[frame_num][playerId]['TTC'] = TTC
- obj_dict[frame_num][playerId]['MTTC'] = MTTC
- obj_dict[frame_num][playerId]['THW'] = THW
- obj_dict[frame_num][playerId]['LonSD'] = LonSD
- obj_dict[frame_num][playerId]['LatSD'] = LatSD
- obj_dict[frame_num][playerId]['BTN'] = abs(BTN)
- collisionSeverity = 0
- pr_death = 0
- collisionRisk = 0
- obj_dict[frame_num][playerId]['collisionSeverity'] = collisionSeverity * 100
- obj_dict[frame_num][playerId]['pr_death'] = pr_death * 100
- obj_dict[frame_num][playerId]['collisionRisk'] = collisionRisk * 100
- df_fnum = pd.DataFrame(obj_dict[frame_num].values())
- df_list.append(df_fnum)
- df_safe = pd.concat(df_list)
- col_list = ['simTime', 'simFrame', 'playerId',
- 'TTC', 'MTTC', 'THW', 'LonSD', 'LatSD', 'BTN',
- 'collisionSeverity', 'pr_death', 'collisionRisk']
- self.df_safe = df_safe[col_list].reset_index(drop=True)
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 在 AB 连线上的速度 V1_on_AB
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
- return V1_on_AB
- def _cal_v_projection(self, dx, dy, vx, vy):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的速度 V_relative
- # vx = vx1 - vx2
- # vy = vy1 - vy2
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
- V_on_AB = vx * U_ABx + vy * U_ABy
- return V_on_AB
- def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 θ
- V_mod = math.sqrt(vx ** 2 + vy ** 2)
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- if V_mod == 0 or AB_mod == 0:
- return 0
- cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
- theta = math.acos(cos_theta)
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的加速度 a_relative
- # ax = ax1 - ax2
- # ay = ay1 - ay2
- # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
- a_on_AB = ax * U_ABx + ay * U_ABy
- VA = np.array([v_x1, v_y1])
- VB = np.array([v_x2, v_y2])
- D_A = np.array([x1, y1])
- D_B = np.array([x2, y2])
- V_r = VA - VB
- V = np.linalg.norm(V_r)
- w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
- a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
- return a_on_AB_back
- # 计算相对加速度
- def _calculate_derivative(self, a, w, V, theta):
- # 计算(V×cos(θ))'的值
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
- derivative = a - w * V * math.sin(theta)
- return derivative
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
- dx = A[0] - B[0]
- dy = A[1] - B[1]
- dvx = VA[0] - VB[0]
- dvy = VA[1] - VB[1]
- # (dx * dvy - dy * dvx)
- angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
- return angular_velocity
- def _death_pr(self, obj_type, v_relative):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- return p_death
- def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
- if obj_type == 5:
- p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
- return collisionRisk
- # 求两车之间当前距离
- def dist(self, x1, y1, x2, y2):
- dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
- return dist
- # TTC (time to collision)
- def _cal_TTC(self, dist, vrel_projection_in_dist):
- if vrel_projection_in_dist == 0:
- return math.inf
- TTC = dist / vrel_projection_in_dist
- return TTC
- def _cal_MTTC(self, dist, vrel_projection_in_dist, arel_projection_in_dist):
- MTTC = math.nan
- if arel_projection_in_dist != 0:
- tmp = vrel_projection_in_dist ** 2 + 2 * arel_projection_in_dist * dist
- if tmp < 0:
- return math.nan
- t1 = (-1 * vrel_projection_in_dist - math.sqrt(tmp)) / arel_projection_in_dist
- t2 = (-1 * vrel_projection_in_dist + math.sqrt(tmp)) / arel_projection_in_dist
- if t1 > 0 and t2 > 0:
- if t1 >= t2:
- MTTC = t2
- elif t1 < t2:
- MTTC = t1
- elif t1 > 0 and t2 <= 0:
- MTTC = t1
- elif t1 <= 0 and t2 > 0:
- MTTC = t2
- if arel_projection_in_dist == 0 and vrel_projection_in_dist > 0:
- MTTC = dist / vrel_projection_in_dist
- return MTTC
- # THW (time headway)
- def _cal_THW(self, dist, v_ego_projection_in_dist):
- if not v_ego_projection_in_dist:
- THW = None
- else:
- THW = dist / v_ego_projection_in_dist
- return THW
- def velocity(self, v_x, v_y):
- v = math.sqrt(v_x ** 2 + v_y ** 2) * 3.6
- return v
- def _cal_longitudinal_safe_dist(self, v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min, ego_decel_max):
- lon_dist_min = v_ego_p * rho + ego_accel_max * (rho ** 2) / 2 + (v_ego_p + rho * ego_accel_max) ** 2 / (
- 2 * ego_decel_min) - v_obj_p ** 2 / (2 * ego_decel_max)
- return lon_dist_min
- def _cal_lateral_safe_dist(self, lat_dist, v_right, v_left, rho, a_right_lat_brake_min, a_left_lat_brake_min,
- a_lat_max):
- v_right_rho = v_right + rho * a_lat_max
- v_left_rho = v_left + rho * a_lat_max
- dist_min = lat_dist + ((v_right + v_right_rho) * rho / 2 + v_right_rho ** 2 / a_right_lat_brake_min / 2 + (
- (v_left + v_right_rho) * rho / 2) + v_left_rho ** 2 / a_left_lat_brake_min / 2)
- return dist_min
- # DRAC (decelerate required avoid collision)
- def _cal_DRAC(self, dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2):
- dist_length = dist - (len2 / 2 - o_x2 + len1 / 2 + o_x1) # 4.671
- if dist_length < 0:
- dist_width = dist - (width2 / 2 + width1 / 2)
- if dist_width < 0:
- return math.inf
- else:
- d = dist_width
- else:
- d = dist_length
- DRAC = vrel_projection_in_dist ** 2 / (2 * d)
- return DRAC
- # BTN (brake threat number)
- def _cal_BTN_new(self, lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max):
- BTN = (lon_a1 + lon_a - lon_v ** 2 / (2 * lon_d)) / ego_decel_lon_max # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def _cal_STN_new(self, ttc, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2):
- STN = (lat_a1 + lat_a + 2 / ttc ** 2 * (lat_d + abs(ego_decel_lat_max * lat_v) * (
- width1 + width2) / 2 + abs(lat_v * ttc))) / ego_decel_lat_max
- return STN
- # BTN (brake threat number)
- def cal_BTN(self, a_y1, ay, dy, vy, max_ay):
- BTN = (a_y1 + ay - vy ** 2 / (2 * dy)) / max_ay # max_ay为此车可实现的最大纵向加速度,目前为本次实例里的最大值
- return BTN
- # STN (steer threat number)
- def cal_STN(self, ttc, a_x1, ax, dx, vx, max_ax, width1, width2):
- STN = (a_x1 + ax + 2 / ttc ** 2 * (dx + np.sign(max_ax * vx) * (width1 + width2) / 2 + vx * ttc)) / max_ax
- return STN
- # 追尾碰撞风险
- def _normal_distribution(self, x):
- mean = 1.32
- std_dev = 0.26
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
- def continuous_group(self, df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- unfunc_time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- unfunc_frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
- unfunc_df = pd.concat([unfunc_time_df, unfunc_frame_df], axis=1)
- return unfunc_df
- # 统计最危险的指标
-
- def _safe_statistic_most_dangerous(self):
- min_list = ['TTC', 'MTTC', 'THW', 'LonSD', 'LatSD']
- max_list = ['BTN', 'collisionRisk', 'collisionSeverity']
- result = {}
- for metric in min_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- val = self.df[metric].min()
- result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
- else:
- result[metric] = self._default_value(metric)
- for metric in max_list:
- if metric in self.metric_list:
- if metric in self.df.columns:
- val = self.df[metric].max()
- result[metric] = float(val) if pd.notnull(val) else self._default_value(metric)
- else:
- result[metric] = self._default_value(metric)
- return result
- def _safe_no_obj_statistic(self):
- # 仅有自车时的默认值
- result = {metric: self._default_value(metric) for metric in self.metric_list}
- return result
- def _default_value(self, metric):
- # 统一默认值
- defaults = {
- 'TTC': 10.0,
- 'MTTC': 4.2,
- 'THW': 2.1,
- 'LonSD': 10.0,
- 'LatSD': 2.0,
- 'BTN': 1.0,
- 'collisionRisk': 0.0,
- 'collisionSeverity': 0.0
- }
- return defaults.get(metric, None)
- def report_statistic(self):
- if len(self.obj_id_list) == 1:
- safety_result = self._safe_no_obj_statistic()
- else:
- safety_result = self._safe_statistic_most_dangerous()
- evaluator = Score(self.data_processed.safety_config)
- result = evaluator.evaluate(safety_result)
- print("\n[安全性表现及得分情况]")
- return result
- def get_ttc_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('TTC')
- ttc_values = self.df_safe['TTC'].dropna()
- return float(ttc_values.min()) if not ttc_values.empty else self._default_value('TTC')
- def get_mttc_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('MTTC')
- mttc_values = self.df_safe['MTTC'].dropna()
- return float(mttc_values.min()) if not mttc_values.empty else self._default_value('MTTC')
- def get_thw_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('THW')
- thw_values = self.df_safe['THW'].dropna()
- return float(thw_values.min()) if not thw_values.empty else self._default_value('THW')
- def get_lonsd_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('LonSD')
- lonsd_values = self.df_safe['LonSD'].dropna()
- return float(lonsd_values.mean()) if not lonsd_values.empty else self._default_value('LonSD')
- def get_latsd_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('LatSD')
- latsd_values = self.df_safe['LatSD'].dropna()
- return float(latsd_values.mean()) if not latsd_values.empty else self._default_value('LatSD')
- def get_btn_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('BTN')
- btn_values = self.df_safe['BTN'].dropna()
- return float(btn_values.max()) if not btn_values.empty else self._default_value('BTN')
- def get_collision_risk_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('collisionRisk')
- risk_values = self.df_safe['collisionRisk'].dropna()
- return float(risk_values.max()) if not risk_values.empty else self._default_value('collisionRisk')
- def get_collision_severity_value(self) -> float:
- if self.empty_flag or self.df_safe is None:
- return self._default_value('collisionSeverity')
- severity_values = self.df_safe['collisionSeverity'].dropna()
- return float(severity_values.max()) if not severity_values.empty else self._default_value('collisionSeverity')
|