123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/08/03
- @Last Modified: 2023/08/03
- @Summary: Functionality metrics
- """
- import os
- import math
- import numpy as np
- import pandas as pd
- from collections import defaultdict
- import scipy.integrate as spi
- from config import DANGEROUS_DIST, LONGITUDINAL_DISTANCE_FRONT, LONGITUDINAL_DISTANCE_REAR, LATERAL_DISTANCE_ABS, \
- OVERSPEED_THRESHOLD
- from common import score_grade, string_concatenate, replace_key_with_value, score_over_100, _cal_max_min_avg
- from score_weight import cal_score_with_priority, cal_weight_from_80
- class Safe(object):
- """
- Class for achieving safe metrics for autonomous driving.
- Attributes:
- df: Vehicle driving data, stored in dataframe format.
- df_drivectrl: Vehcile driving control data, including brakepedal, throttle pedal and steerignwheel.
- Methods:
- _safe_param_cal_new: Calculate parameters for evaluateion.
- _cal_v_ego_projection: Calculate ego car velocity project over distance.
- _cal_v_projection: Calculate relative velocity project over distance.
- _cal_a_projection: Calculate relative acceleration project over distance.
- _calculate_derivative: Calculate derivative.
- _cal_relative_angular_v: Calculate relative angular velocity.
- _death_pr: Calculate death probability.
- _cal_collisionRisk_level: Calculate collisionRisk level.
- dist: Calculate distance of two cars.
- """
- def __init__(self, data_processed, scoreModel, resultPath):
- # self.eval_data = pd.DataFrame()
- # self.data_processed = data_processed
- self.scoreModel = scoreModel
- self.resultPath = resultPath
- self.df = data_processed.object_df.copy()
- self.ego_df = data_processed.ego_df
- # print("self.ego_df is", self.ego_df)
- self.obj_id_list = data_processed.obj_id_list
- # print("self.obj_id_list is", self.obj_id_list)
- #
- # # config infos for calculating score
- self.config = data_processed.config
- # print("self.config is", self.config)
- safe_config = self.config.config['safe']
- self.safe_config = safe_config
- # # common data
- self.bulitin_metric_list = self.config.builtinMetricList
- #
- # # dimension data
- self.weight_custom = safe_config['weightCustom']
- self.metric_list = safe_config['metric']
- self.type_list = safe_config['type']
- self.type_name_dict = safe_config['typeName']
- self.name_dict = safe_config['name']
- self.unit_dict = safe_config['unit']
- # # custom metric data
- # # self.customMetricParam = safe_config['customMetricParam']
- # # self.custom_metric_list = list(self.customMetricParam.keys())
- # # self.custom_data = {}
- # # self.custom_param_dict = {}
- #
- # # score data
- self.weight = safe_config['weightDimension']
- self.weight_type_dict = safe_config['typeWeight']
- self.weight_type_list = safe_config['typeWeightList']
- self.weight_dict = safe_config['weight']
- self.weight_list = safe_config['weightList']
- self.priority_dict = safe_config['priority']
- self.priority_list = safe_config['priorityList']
- self.kind1_dict = safe_config['kind']
- # self.kind1_dict = self.kind_dict[0]
- self.optimal1_dict = safe_config['optimal']
- # self.optimal1_dict = self.optimal_dict[0]
- self.multiple_dict = safe_config['multiple']
- self.kind_list = safe_config['kindList']
- self.optimal_list = safe_config['optimalList']
- self.multiple_list = safe_config['multipleList']
- self.metric_dict = safe_config['typeMetricDict']
- # # self.time_metric_list = self.metric_dict['safeTime']
- # # self.distance_metric_list = self.metric_dict['safeDistance']
- #
- # # lists of drving control info
- # self.time_list = data_processed.driver_ctrl_data['time_list']
- # self.frame_list = self.df['simFrame'].values.tolist()
- if len(self.df) > 0:
- if self.df['simFrame'].values.tolist()[-1] <= self.ego_df['simFrame'].values.tolist()[-1]:
- self.frame_list = self.df['simFrame'].values.tolist()
- else:
- self.frame_list = self.ego_df['simFrame'].values.tolist()
- else:
- self.frame_list = []
- self.collision_dist_list = []
- self.collisionRisk_list = []
- self.collisionSeverity_list = []
- self.overSpeed_list = []
- self.simFrame_list = []
- self.simtime_risk_max = []
- self.simtime_risk_min = []
- self.simFrame_severity_list = []
- self.simtime_severity_max = []
- self.simtime_severity_min = []
- self.simtime_collisioncount = []
- self.collision_risk_dict = dict()
- self.collision_severity_dict = dict()
- self.over_speed_dict = dict()
- self.collisionCount = 0
- self.collisionRisk = 0
- self.collisionSeverity = 0
- self.overSpeed = 0
- self.simFrame_collisionCount_list = []
- self.empty_flag = True
- if len(self.obj_id_list) > 1:
- # self.most_dangerous = {}
- # self.pass_percent = {}
- self._safe_metric_cal()
- self._overspeed_cal()
- # # no car following scene
- # if len(self.obj_id_list) > 1:
- # self.unsafe_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.unsafe_time_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.unsafe_dist_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # # self.unsafe_acce_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.unsafe_acce_drac_df = pd.DataFrame(
- # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.unsafe_acce_xtn_df = pd.DataFrame(
- # columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.unsafe_prob_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
- # self.most_dangerous = {}
- # self.pass_percent = {}
- # self._safe_param_cal_new()
- def _overspeed_cal(self):
- """
- :return:
- """
- self.overSpeed_list = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]['v'].values.tolist()
- self.overSpeed_time_list = self.ego_df[self.ego_df['v'] > OVERSPEED_THRESHOLD]['simTime'].values.tolist()
- if self.overSpeed_list:
- max_speed = max(self.overSpeed_list)
- overspeed = (max_speed - OVERSPEED_THRESHOLD) / OVERSPEED_THRESHOLD * 100
- self.overSpeed = round(overspeed, 2)
- else:
- self.overSpeed = 0
- def _safe_metric_cal(self):
- """
- """
- self.collisionRisk_list = []
- self.collision_dist_list = []
- self.collisionSeverity_list = []
- Tc = 0.3
- rho = 0.3 # 驾驶员制动反应时间
- ego_accel_max = 6 # 自车油门最大加速度
- obj_decel_max = 8 # 前车刹车最大减速度
- ego_decel_min = 1 # 自车刹车最小减速度 ug
- ego_decel_lon_max = 8
- ego_decel_lat_max = 1
- # 构建双层字典数据结构
- obj_dict = defaultdict(dict)
- obj_data_dict = self.df.to_dict('records')
- for item in obj_data_dict:
- obj_dict[item['simFrame']][item['playerId']] = item
- # print("obj_dict is", obj_dict)
- df_list = []
- EGO_PLAYER_ID = 1
- self.empty_flag = True
- # print("obj_dict[1] is", self.frame_list)
- for frame_num in self.frame_list:
- # ego_data = obj_dict[frame_num][EGO_PLAYER_ID]
- # ego_data = self.ego_df.iloc[frame_num]
- ego_data = self.ego_df[self.ego_df['simFrame'] == frame_num]
- # if frame_num == 1:
- v1 = ego_data['v'] # km/h to m/s
- # print("ego_data['posX'].values is", ego_data['posX'].values[0])
- x1 = ego_data['posX'].values[0]
- # print("x1 is", x1)
- y1 = ego_data['posY'].values[0]
- # print("x1 and y1 are", x1, y1)
- h1 = ego_data['yaw']
- # len1 = 2.3 # 随手录入的数值
- # width1 = ego_data['dimY'] #
- # o_x1 = ego_data['offX']
- v_x1 = ego_data['lon_v'].values[0] # km/h to m/s
- # v_y1 = ego_data['speedY'] / 3.6 # km/h to m/s
- v_y1 = 0 # 随手录入的数值
- a_x1 = ego_data['speedH'].values[0]
- a_y1 = 0 # 随手录入的数值
- # a_y1 = ego_data['accelY']
- # a1 = ego_data['accel']
- # print("calculate is beginning...")
- # print("self.obj_id_list is", self.obj_id_list)
- for playerId in self.obj_id_list:
- dist_list = []
- # print("playerId is", playerId)
- if playerId == EGO_PLAYER_ID:
- continue
- try:
- obj_data = obj_dict[frame_num][playerId]
- except KeyError:
- continue
- x2 = obj_data['posX']
- y2 = obj_data['posY']
- # print(" obj_data is", obj_data)
- # print("x2 is", y2)
- dist = self.dist(x1, y1, x2, y2)
- # print("dist is", dist)
- # DANGEROUS_DIST = 0.10 # 与障碍物距离小于此值即为危险
- if dist <= DANGEROUS_DIST:
- dist_list.append(0)
- else:
- dist_list.append(1)
- obj_data['dist'] = dist
- # print("obj_data['dist'] is", obj_data['dist'])
- v_x2 = obj_data['lon_v'] # km/h to m/s
- v_y2 = obj_data['lat_v'] # km/h to m/s
- v2 = math.sqrt(v_x2 ** 2 + v_y2 ** 2)
- # v2 = obj_data['v'] / 3.6 # km/h to m/s
- # h2 = obj_data['posH']
- # len2 = obj_data['dimX']
- # width2 = obj_data['dimY']
- # o_x2 = obj_data['offX']
- # a_x2 = obj_data['accelX']
- # a_y2 = obj_data['accelY']
- a_x2 = obj_data['lon_acc']
- a_y2 = obj_data['lat_acc']
- # a2 = obj_data['accel']
- dx, dy = x2 - x1, y2 - y1
- # 定义矢量A和x轴正向向量x
- A = np.array([dx, dy])
- # print("A is", A)
- x = np.array([1, 0])
- # 计算点积和向量长度
- dot_product = np.dot(A, x)
- vector_length_A = np.linalg.norm(A)
- vector_length_x = np.linalg.norm(x)
- # 计算夹角的余弦值
- cos_theta = dot_product / (vector_length_A * vector_length_x)
- # 将余弦值转换为角度值(弧度制)
- beta = np.arccos(cos_theta) # 如何通过theta正负确定方向
- lon_d = dist * math.cos(beta - h1)
- lat_d = abs(dist * math.sin(beta - h1)) # 需要增加左正右负的判断,但beta取值为[0,pi)
- obj_dict[frame_num][playerId]['lon_d'] = lon_d
- obj_dict[frame_num][playerId]['lat_d'] = lat_d
- if lon_d > 5 or lon_d < -2 or lat_d > 1:
- continue
- self.empty_flag = False
- vx, vy = v_x1 - v_x2, v_y1 - v_y2
- ax, ay = a_x2 - a_x1, a_y2 - a_y1
- v_ego_p = self._cal_v_ego_projection(dx, dy, v_x1, v_y1)
- v_obj_p = self._cal_v_ego_projection(dx, dy, v_x2, v_y2)
- vrel_projection_in_dist = self._cal_v_projection(dx, dy, vx, vy)
- arel_projection_in_dist = self._cal_a_projection(dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1,
- v_x2, v_y2)
- obj_dict[frame_num][playerId]['vrel_projection_in_dist'] = vrel_projection_in_dist
- obj_dict[frame_num][playerId]['arel_projection_in_dist'] = arel_projection_in_dist
- obj_dict[frame_num][playerId]['v_ego_projection_in_dist'] = v_ego_p
- obj_dict[frame_num][playerId]['v_obj_projection_in_dist'] = v_obj_p
- # obj_type = obj_data['type']
- TTC = self._cal_TTC(dist, vrel_projection_in_dist)
- # MTTC = self._cal_MTTC(TTC, vrel_projection_in_dist, arel_projection_in_dist)
- # THW = self._cal_THW(dist, v_ego_p)
- # 单车道时可用
- # LonSD = self._cal_longitudinal_safe_dist(v_ego_p, v_obj_p, rho, ego_accel_max, ego_decel_min,
- # obj_decel_max)
- # lat_dist = 0.5
- # v_right = v1
- # v_left = v2
- # a_right_lat_brake_min = 1
- # a_left_lat_brake_min = 1
- # a_lat_max = 5
- # LatSD = self._cal_lateral_safe_dist(lat_dist, v_right, v_left, rho, a_right_lat_brake_min,
- # a_left_lat_brake_min,
- # a_lat_max)
- # DRAC = self._cal_DRAC(dist, vrel_projection_in_dist, len1, len2, width1, width2, o_x1, o_x2)
- lon_a1 = a_x1 * math.cos(h1) + a_y1 * math.sin(h1)
- lon_a2 = a_x2 * math.cos(h1) + a_y2 * math.sin(h1)
- # lon_a = abs(lon_a1 - lon_a2)
- # lon_d = dist * abs(math.cos(beta - h1))
- # lon_v = v_x1 * math.cos(h1) + v_y1 * math.sin(h1)
- # BTN = self._cal_BTN_new(lon_a1, lon_a, lon_d, lon_v, ego_decel_lon_max)
- lat_a1 = a_x1 * math.sin(h1) * -1 + a_y1 * math.cos(h1)
- lat_a2 = a_x2 * math.sin(h1) * -1 + a_y2 * math.cos(h1)
- lat_a = abs(lat_a1 - lat_a2)
- lat_d = dist * abs(math.sin(beta - h1))
- lat_v = v_x1 * math.sin(h1) * -1 + v_y1 * math.cos(h1)
- # STN = self._cal_STN_new(TTC, lat_a1, lat_a, lat_d, lat_v, ego_decel_lat_max, width1, width2)
- obj_dict[frame_num][playerId]['lat_v_rel'] = v_x1 - v_x2
- obj_dict[frame_num][playerId]['lon_v_rel'] = v_y1 - v_y2
- # BTN = self.cal_BTN(a_y1, ay, dy, vy, max_ay)
- # STN = self.cal_STN(TTC, a_x1, ax, dx, vx, max_ax, len1, len2)
- TTC = None if (not TTC or TTC < 0) else TTC
- # MTTC = None if (not MTTC or MTTC < 0) else MTTC
- # THW = None if (not THW or THW < 0) else THW
- #
- # DRAC = 10 if DRAC >= 10 else DRAC
- # print("calculate is beginning...")
- if not TTC or TTC > 4000: # threshold = 4258.41
- collisionSeverity = 0
- pr_death = 0
- collisionRisk = 0
- else:
- result, error = spi.quad(self._normal_distribution, 0, TTC - Tc)
- collisionSeverity = 1 - result
- # print("collisionSeverity is", collisionSeverity)
- # pr_death = self._death_pr(obj_type, vrel_projection_in_dist)
- pr_death = self._death_pr(vrel_projection_in_dist)
- collisionRisk = 0.4 * pr_death + 0.6 * collisionSeverity
- self.collisionRisk_list.append(collisionRisk)
- self.simFrame_list.append(frame_num)
- self.collisionSeverity_list.append(collisionSeverity)
- self.collision_dist_list.append(max(dist_list))
- if len(self.collision_dist_list) > 0:
- prev_value = self.collision_dist_list[0] if len(self.collision_dist_list) > 0 else 0 # 初始化前一个元素为列表的第一个元素
- # 遍历列表(从第二个元素开始)
- # print("len(self.collision_dist_list) is", len(self.collision_dist_list))
- for i in range(1, len(self.collision_dist_list)):
- current_value = self.collision_dist_list[i] # 当前元素
- # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
- if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
- self.collisionCount += 1 # 增加0的段的计数
- self.simFrame_collisionCount_list.append(self.simFrame_list[i])
- self.collisionRisk = max(self.collisionRisk_list)
- self.collisionSeverity = max(self.collisionSeverity_list)
- else:
- self.collisionRisk = 0
- self.collisionSeverity = 0
- print("collision count is", self.collisionCount)
- print("collisionRisk is", self.collisionRisk)
- print("collisionSeverity is", self.collisionSeverity)
- def _cal_v_ego_projection(self, dx, dy, v_x1, v_y1):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 在 AB 连线上的速度 V1_on_AB
- V1_on_AB = v_x1 * U_ABx + v_y1 * U_ABy
- return V1_on_AB
- def _cal_v_projection(self, dx, dy, vx, vy):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的速度 V_relative
- # vx = vx1 - vx2
- # vy = vy1 - vy2
- # 计算 A 相对于 B 在 AB 连线上的速度 V_on_AB
- V_on_AB = vx * U_ABx + vy * U_ABy
- return V_on_AB
- def _cal_a_projection(self, dx, dy, vx, vy, ax, ay, x1, y1, x2, y2, v_x1, v_y1, v_x2, v_y2):
- # 计算 AB 连线的向量 AB
- # dx = x2 - x1
- # dy = y2 - y1
- # 计算 θ
- V_mod = math.sqrt(vx ** 2 + vy ** 2)
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- if V_mod == 0 or AB_mod == 0:
- return 0
- cos_theta = (vx * dx + vy * dy) / (V_mod * AB_mod)
- theta = math.acos(cos_theta)
- # 计算 AB 连线的模长 |AB|
- AB_mod = math.sqrt(dx ** 2 + dy ** 2)
- # 计算 AB 连线的单位向量 U_AB
- U_ABx = dx / AB_mod
- U_ABy = dy / AB_mod
- # 计算 A 相对于 B 的加速度 a_relative
- # ax = ax1 - ax2
- # ay = ay1 - ay2
- # 计算 A 相对于 B 在 AB 连线上的加速度 a_on_AB
- a_on_AB = ax * U_ABx + ay * U_ABy
- VA = np.array([v_x1, v_y1])
- VB = np.array([v_x2, v_y2])
- D_A = np.array([x1, y1])
- D_B = np.array([x2, y2])
- V_r = VA - VB
- V = np.linalg.norm(V_r)
- w = self._cal_relative_angular_v(theta, D_A, D_B, VA, VB)
- a_on_AB_back = self._calculate_derivative(a_on_AB, w, V, theta)
- return a_on_AB_back
- # 计算相对加速度
- def _calculate_derivative(self, a, w, V, theta):
- # 计算(V×cos(θ))'的值
- # derivative = a * math.cos(theta) - w * V * math.sin(theta)theta
- derivative = a - w * V * math.sin(theta)
- return derivative
- def _cal_relative_angular_v(self, theta, A, B, VA, VB):
- dx = A[0] - B[0]
- dy = A[1] - B[1]
- dvx = VA[0] - VB[0]
- dvy = VA[1] - VB[1]
- # (dx * dvy - dy * dvx)
- angular_velocity = math.sqrt(dvx ** 2 + dvy ** 2) * math.sin(theta) / math.sqrt(dx ** 2 + dy ** 2)
- return angular_velocity
- # def _death_pr(self, obj_type, v_relative):
- def _death_pr(self, v_relative):
- # if obj_type == 5:
- # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- # else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- return p_death
- # def _cal_collisionRisk_level(self, obj_type, v_relative, collisionSeverity):
- def _cal_collisionRisk_level(self, v_relative, collisionSeverity):
- # if obj_type == 5:
- # p_death = 1 / (1 + np.exp(7.723 - 0.15 * v_relative))
- # else:
- p_death = 1 / (1 + np.exp(8.192 - 0.12 * v_relative))
- collisionRisk = 0.4 * p_death + 0.6 * collisionSeverity
- return collisionRisk
- # 求两车之间当前距离
- def dist(self, x1, y1, x2, y2):
- dist = np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
- return dist
- def _cal_count_(self, dist):
- count = 0
- # TTC (time to collision)
- def _cal_TTC(self, dist, vrel_projection_in_dist):
- if vrel_projection_in_dist == 0:
- return math.inf
- TTC = dist / vrel_projection_in_dist
- return TTC
- def _normal_distribution(self, x):
- mean = 1.32
- std_dev = 0.26
- return (1 / (math.sqrt(std_dev * 2 * math.pi))) * math.exp(-0.5 * (x - mean) ** 2 / std_dev)
- def _safe_statistic(self):
- """
- """
- self.collision_risk_dict = _cal_max_min_avg(self.collisionRisk_list)
- self.collision_severity_dict = _cal_max_min_avg(self.collisionSeverity_list)
- self.over_speed_dict = _cal_max_min_avg(self.overSpeed_list)
- arr_safe = [[self.collisionCount, self.collisionRisk, self.collisionSeverity, self.overSpeed]]
- return arr_safe
- def safe_score_new(self):
- """
- """
- score_metric_dict = {}
- score_type_dict = {}
- arr_safe = self._safe_statistic()
- print("\n[安全性表现及得分情况]")
- print("安全性各指标值:", [[round(num, 2) for num in row] for row in arr_safe])
- if arr_safe:
- arr_safe = np.array(arr_safe)
- score_model = self.scoreModel(self.kind_list, self.optimal_list, self.multiple_list, arr_safe)
- score_sub = score_model.cal_score()
- metric_list = [x for x in self.metric_list if x in self.config.builtinMetricList]
- # metric_num = len(metric_list)
- if len(self.obj_id_list) > 1 and not self.empty_flag:
- # score_sub[-metric_num:] = [num * 1.25 for num in score_sub[-metric_num:]]
- pass
- else:
- score_sub = [100] * len(score_sub)
- score_sub = list(map(lambda x: 100 if np.isnan(x) else x, score_sub)) # 对None值做特判
- score_metric = score_sub
- score_metric_dict = {key: value for key, value in zip(metric_list, score_metric)}
- score_metric_dict = {key: score_metric_dict[key] for key in self.metric_list}
- if self.collisionCount > 0:
- score_metric_dict["collisionCount"] = 0
- else:
- score_metric_dict["collisionCount"] = 100
- # print("score_metric_dict is", score_metric_dict)
- score_metric = list(score_metric_dict.values())
- if self.weight_custom: # 用户自定义权重
- score_metric_with_weight_dict = {key: score_metric_dict[key] * self.weight_dict[key] for key in
- self.weight_dict}
- for type in self.type_list:
- type_score = sum(
- value for key, value in score_metric_with_weight_dict.items() if key in self.metric_dict[type])
- print("safe type_score is", type_score)
- score_type_dict[type] = round(type_score, 2)
- score_type_with_weight_dict = {key: score_type_dict[key] * self.weight_type_dict[key] for key in
- score_type_dict}
- score_safe = sum(score_type_with_weight_dict.values())
- else: # 动态客观赋权
- self.weight_list = cal_weight_from_80(score_metric)
- self.weight_dict = {key: value for key, value in zip(self.metric_list, self.weight_list)}
- score_safe = cal_score_with_priority(score_metric, self.weight_list, self.priority_list)
- for type in self.type_list:
- type_weight = sum(value for key, value in self.weight_dict.items() if key in self.metric_dict[type])
- for key, value in self.weight_dict.items():
- if key in self.metric_dict[type]:
- # self.weight_dict[key] = round(value / type_weight, 4)
- self.weight_dict[key] = value / type_weight
- type_score_metric = [value for key, value in score_metric_dict.items() if key in self.metric_dict[type]]
- type_weight_list = [value for key, value in self.weight_dict.items() if key in self.metric_dict[type]]
- type_priority_list = [value for key, value in self.priority_dict.items() if
- key in self.metric_dict[type]]
- type_score = cal_score_with_priority(type_score_metric, type_weight_list, type_priority_list)
- score_type_dict[type] = round(type_score, 2) if type_score < 100 else 100
- for key in self.weight_dict:
- self.weight_dict[key] = round(self.weight_dict[key], 4)
- score_type = list(score_type_dict.values())
- self.weight_type_list = cal_weight_from_80(score_type)
- self.weight_type_dict = {key: value for key, value in zip(self.type_list, self.weight_type_list)}
- score_safe = round(score_safe, 2)
- # score_type = [round(x, 2) for key, x in score_type_dict.items()]
- # score_metric = [round(x, 2) for key, x in score_metric_dict.items()]
- print("安全性各指标基准值:", self.optimal_list)
- print(f"安全性得分为:{score_safe:.2f}分。")
- print(f"安全性各类型得分为:{score_type_dict}。")
- print(f"安全性各指标得分为:{score_metric_dict}。")
- # return score_safe, score_type, score_metric
- return score_safe, score_type_dict, score_metric_dict
- def report_statistic(self):
- simTime_time_list_max = []
- simTime_time_list_min = []
- report_dict = {
- "name": "安全性",
- "weight": f"{self.weight * 100:.2f}%",
- # 'collisionRisk': self.collisionRisk,
- # "noObjectCar": True,
- }
- # No objects
- if self.obj_id_list == 0:
- score_safe = 80
- grade_safe = "良好"
- score_type_dict = {
- "safeCollision": 80,
- "safeSpeed": 80
- }
- score_metric_dict = {
- "collisionCount": 80,
- "collisionRisk": 80,
- "collisionSeverity": 80,
- "overSpeed": 80
- }
- description = "安全性在无目标车情况下表现良好。"
- description1 = "次数:0次"
- description2 = f"最大值0.00%;" \
- f"最小值0.00%;" \
- f"平均值0.00%"
- description3 = f"最大值0.00%;" \
- f"最小值0.00%;" \
- f"平均值0.00%"
- description4 = f"最大值{100 * self.over_speed_dict['max']}%;" \
- f"最小值{100 * self.over_speed_dict['min']}%;" \
- f"平均值{100 * self.over_speed_dict['avg']}%"
- # with objects
- else:
- score_safe, score_type_dict, score_metric_dict = self.safe_score_new()
- score_safe = int(score_safe) if int(score_safe) == score_safe else round(score_safe, 2)
- grade_safe = score_grade(score_safe)
- description = f"· 在安全性方面,得分{score_safe:.2f}分,表现{grade_safe}。"
- is_good = True
- is_semicolon = False
- if self.collisionCount > 0:
- is_good = False
- is_semicolon = True
- description += f"出现{self.collisionCount}次碰撞,需重点优化" if score_safe < 80 else f"出现{self.collisionCount}次碰撞,需注意"
- if self.over_speed_dict['max'] != '-':
- if self.over_speed_dict['max'] > 0:
- is_good = False
- if is_semicolon:
- description += ";"
- is_semicolon = False
- description += f"超速比例最大值{self.over_speed_dict['max']:.2f}%,需重点优化。" if score_safe < 80 else f"超速比例最大值{self.over_speed_dict['max']:.2f}%,需注意"
- else:
- if is_semicolon:
- description += f"。"
- if is_good:
- description += "速度控制良好。"
- if self.collisionCount == 0:
- description1 = f"次数:{self.collisionCount}次\n没有发生碰撞"
- else:
- self.simtime_collisioncount = \
- self.ego_df.loc[self.ego_df['simFrame'].isin(list(set(self.simFrame_collisionCount_list)))][
- 'simTime'].tolist()
- self.simtime_collisioncount = [str("{:.02f}".format(s)) for s in self.simtime_collisioncount]
- str_temp_collisioncount = "、".join(self.simtime_collisioncount)
- description1 = f"次数:{self.collisionCount}次\n在{str_temp_collisioncount}时刻发生碰撞"
- if all(x == 0 for x in self.collisionRisk_list):
- pass
- else:
- index_risk_max = [i for i, x in enumerate(self.collisionRisk_list) if x == max(self.collisionRisk_list)]
- index_risk_min = [i for i, x in enumerate(self.collisionRisk_list) if x == min(self.collisionRisk_list)]
- print("self.simFrame_list[index_risk_max] is", self.simFrame_list)
- simframe_risk_max = list(set([self.simFrame_list[x] for x in index_risk_max]))
- simframe_risk_min = list(set([self.simFrame_list[y] for y in index_risk_min]))
- print("simframe_risk_max is", simframe_risk_max)
- self.simtime_risk_max = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_risk_max)][
- 'simTime'].tolist()
- self.simtime_risk_min = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_risk_min)][
- 'simTime'].tolist()
- self.simtime_risk_max = [str("{:.02f}".format(s)) + "s" for s in self.simtime_risk_max]
- self.simtime_risk_min = [str("{:.02f}".format(ss)) + "s" for ss in self.simtime_risk_min[:2]]
- str_temp_risk_max = "、".join(self.simtime_risk_max)
- str_temp_risk_min = "、".join(self.simtime_risk_min)
- if len(str_temp_risk_max) > 0:
- description2 = f"在{str_temp_risk_max}时刻达到"
- else:
- description2 = ""
- description2 += f"最大值{self.collision_risk_dict['max']:.2f}%\n" if (
- self.collision_risk_dict['max'] != '-') else "最大值0.00%\n"
- if self.collision_risk_dict['min'] != '-' and self.collision_risk_dict['min'] > 0.1:
- if len(str_temp_risk_min) > 0:
- description2 += f"在{str_temp_risk_min}时刻达到"
- description2 += f"最小值{self.collision_risk_dict['min']:.2f}%\n"
- description2 += f"平均值{self.collision_risk_dict['avg']:.2f}%" if (
- self.collision_risk_dict['avg'] != '-') else "平均值0.00%"
- if all(x == 0 for x in self.collisionSeverity_list):
- pass
- else:
- index_severity_max = [i for i, x in enumerate(self.collisionSeverity_list) if
- x == max(self.collisionSeverity_list)]
- index_severity_min = [i for i, x in enumerate(self.collisionSeverity_list) if
- x == min(self.collisionSeverity_list)]
- simframe_severity_max = list(set([self.simFrame_list[x] for x in index_severity_max]))
- simframe_severity_min = list(set([self.simFrame_list[y] for y in index_severity_min]))
- self.simtime_severity_max = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_severity_max)][
- 'simTime'].tolist()
- self.simtime_severity_min = self.ego_df.loc[self.ego_df['simFrame'].isin(simframe_severity_min)][
- 'simTime'].tolist()
- if len(self.simtime_severity_max) > 2:
- self.simtime_severity_max = [str("{:.02f}".format(s)) + "s" for s in self.simtime_severity_max[:2]]
- else:
- self.simtime_severity_max = [str("{:.02f}".format(s)) + "s" for s in self.simtime_severity_max]
- self.simtime_severity_min = [str("{:.02f}".format(ss)) + "s" for ss in self.simtime_severity_min[:2]]
- str_temp_severity_max = "、".join(self.simtime_severity_max)
- str_temp_severity_min = "、".join(self.simtime_severity_min)
- if len(str_temp_severity_max) > 0:
- description3 = f"在{str_temp_severity_max}时刻达到"
- else:
- description3 = ""
- description3 += f"最大值{self.collision_severity_dict['max']:.2f}%\n" if (
- self.collision_severity_dict['max'] != '-') else "最大值0.00%\n"
- if self.collision_severity_dict['min'] != '-' and self.collision_severity_dict['min'] > 0.1:
- if len(str_temp_severity_min) > 0:
- description3 += f"在{str_temp_severity_min}时刻达到"
- description3 += f"最小值{self.collision_severity_dict['min']:.2f}%\n"
- description3 += f"平均值{self.collision_severity_dict['avg']:.2f}%" if (
- self.collision_severity_dict['avg'] != '-') else "平均值0.00%"
- index_speed_max = [i for i, x in enumerate(self.overSpeed_list) if x == max(self.overSpeed_list)]
- index_speed_min = [i for i, x in enumerate(self.overSpeed_list) if x == min(self.overSpeed_list)]
- if len(index_speed_max) > 2:
- for ii in index_speed_max[:2]:
- simTime_time_list_max.append(str("{:.02f}".format(self.overSpeed_time_list[ii])) + "s")
- else:
- for ii in index_speed_max:
- simTime_time_list_max.append(str("{:.02f}".format(self.overSpeed_time_list[ii])) + "s")
- if len(index_speed_min) > 2:
- for xx in index_speed_min[:2]:
- simTime_time_list_min.append(str("{:.02f}".format(self.overSpeed_time_list[xx])) + "s")
- else:
- for xx in index_speed_min:
- simTime_time_list_min.append(str("{:.02f}".format(self.overSpeed_time_list[xx])) + "s")
- str_temp_overspeed_max = "、".join(simTime_time_list_max)
- str_temp_overspeed_min = "、".join(simTime_time_list_min)
- if len(simTime_time_list_max) > 0:
- description4 = f"在{str_temp_overspeed_max}时刻达到"
- else:
- description4 = ""
- description4 += f"最大值{self.over_speed_dict['max']:.2f}%\n" if (
- self.over_speed_dict['max'] != '-') else "最大值0.00%\n"
- if self.over_speed_dict['min'] != '-':
- if self.over_speed_dict['min'] > 0:
- if len(simTime_time_list_min) > 0:
- description4 += f"在{str_temp_overspeed_min}时刻达到"
- description4 += f"最小值{self.over_speed_dict['min']:.2f}%\n"
- else:
- description4 += f"最小值0.00%\n"
- description4 += f"平均值{self.over_speed_dict['avg']:.2f}%" if (
- self.over_speed_dict['avg'] != '-') else "平均值0.00%"
- report_dict["score"] = score_safe
- report_dict["level"] = grade_safe
- report_dict["description"] = replace_key_with_value(description, self.name_dict)
- collisionCount_index = {
- "weight": self.weight_dict['collisionCount'],
- "score": score_metric_dict['collisionCount'],
- "description": description1
- }
- collisionRisk_index = {
- "weight": self.weight_dict['collisionRisk'],
- "score": score_metric_dict['collisionRisk'],
- "description": description2
- }
- collisionSeverity_index = {
- "weight": self.weight_dict['collisionSeverity'],
- "score": score_metric_dict['collisionSeverity'],
- "description": description3
- }
- overSpeed_index = {
- "weight": self.weight_dict['overSpeed'],
- "score": score_metric_dict['overSpeed'],
- "description": description4
- }
- indexes_dict = {
- "collisionCount": collisionCount_index,
- "collisionRisk": collisionRisk_index,
- "collisionSeverity": collisionSeverity_index,
- "overSpeed": overSpeed_index
- }
- report_dict["indexes"] = indexes_dict
- print(report_dict)
- return report_dict
- # if __name__ == "__main__":
- # pass
- #
- # print("---started---")
- # count = 0
- # dataPath = './task/case0613' # 设置路径
- # configPath = r"./config_safe.json"
- # config = ConfigParse(configPath)
- # data_processed = DataProcess(dataPath, config) # 数据处理,包括objectstate这些
- # print("data_processed is", data_processed.object_df.playerId) # dir(data_processed)可以查看data_processed的数据属性
- # safe_indicator = Safe(data_processed)
- # collisionRisk_list, collisionSeverity_list, collision_dist_count = safe_indicator._safe_param_cal_new()
- #
- # prev_value = collision_dist_count[0] # 初始化前一个元素为列表的第一个元素
- #
- # # 遍历列表(从第二个元素开始)
- # for i in range(1, len(collision_dist_count)):
- # current_value = collision_dist_count[i] # 当前元素
- #
- # # 如果当前元素是0且与前一个元素不同,或者这是第一个元素且为0
- # if (current_value == 0 and prev_value != 0) or (i == 0 and current_value == 0):
- # count += 1 # 增加0的段的计数
- # print("collisionRisk is", max(collisionRisk_list))
- # print("collisionSeverity is", max(collisionSeverity_list))
- # print("collision count is", count)
- # print("---end---")
|