#!/usr/bin/env python
# -*- coding: utf-8 -*-
##################################################################
#
# Copyright (c) 2024 CICV, Inc. All Rights Reserved
#
##################################################################
"""
@Authors:           xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
@Data:              2024/09/20
@Last Modified:     2024/09/20
@Summary:           Evaluation functions
"""


class ACCTrigger(object):
    """Extract the time intervals during which ``ACC_status`` is "Active".

    ``df_vehState`` is used as a pandas-style DataFrame and must provide the
    ``ACC_status`` and ``simTime`` columns.
    """

    def __init__(self, df_vehState):
        self.df_vehState = df_vehState

    def find_start_end_time(self, change_speed_time_list,
                            expected_step=0.01, tolerance=0.04):
        """Group timestamps into runs of consecutive samples.

        Two neighbouring timestamps belong to the same run when
        ``abs(current - previous - expected_step) < tolerance``.

        NOTE(review): the original comment described the step as "close to
        0.04", while the code has always tested ``|dt - 0.01| < 0.04``. The
        defaults keep that historical behaviour; confirm the intended
        sampling period with the data owner.

        :param change_speed_time_list: sequence of timestamps (supports
            slicing and ``[-1]`` indexing, e.g. a list).
        :param expected_step: nominal gap between consecutive samples.
        :param tolerance: allowed deviation from ``expected_step``.
        :return: list of ``[start, end]`` pairs, one per run. A timestamp
            with no close neighbour on either side produces no pair.
        """
        times = change_speed_time_list
        start_end_time_list = []
        run_start = None  # first timestamp of the run in progress, if any

        for previous, current in zip(times, times[1:]):
            if abs(current - previous - expected_step) < tolerance:
                # Still inside a run; remember where it began.
                if run_start is None:
                    run_start = previous
            else:
                # Gap found: close the run that ended at ``previous``.
                if run_start is not None:
                    start_end_time_list.append([run_start, previous])
                run_start = None

        # A run that reaches the end of the data ends at the last timestamp.
        if run_start is not None:
            start_end_time_list.append([run_start, times[-1]])

        return start_end_time_list

    def ACC_active_time_statistics(self):
        """Return ``{'ACC_active_time': [[start, end], ...]}``.

        The intervals are built from the ``simTime`` values of the rows whose
        ``ACC_status`` equals ``"Active"``.
        """
        ACC_STATUS = "Active"
        is_active = self.df_vehState['ACC_status'] == ACC_STATUS
        active_times = self.df_vehState[is_active]['simTime'].tolist()
        return {'ACC_active_time': self.find_start_end_time(active_times)}


class LKATrigger(object):
    """Extract the time intervals during which ``LKA_status`` has a given value.

    ``df_vehState`` is used as a pandas-style DataFrame and must provide the
    ``LKA_status`` and ``simTime`` columns.
    """

    def __init__(self, df_vehState):
        self.df_vehState = df_vehState

    def lane_keep_warning_status(self, left_or_right):
        """Find every run of consecutive rows whose status is ``left_or_right``.

        :param left_or_right: the ``LKA_status`` value to look for.
        :return: list of ``[start_time, end_time]`` pairs, where the times are
            the ``simTime`` of the first and last row of each run. A run of a
            single row yields ``[t, t]``.
        """
        status = self.df_vehState['LKA_status']
        # The id increases every time the status differs from the previous
        # row, so rows of one uninterrupted run share the same id.
        run_ids = (status != status.shift()).cumsum()
        matching_rows = self.df_vehState[status == left_or_right]

        result_times = []
        for _, run in matching_rows.groupby(run_ids):
            sim_times = run['simTime'].tolist()
            if sim_times:  # guard against an empty run
                # Start is the first sample of the run, end is the last one.
                # (The start used to be read from sim_times[-1] as well,
                # which collapsed every interval to [end, end].)
                result_times.append([sim_times[0], sim_times[-1]])
        return result_times

    def LKA_active_time_statistics(self):
        """Return ``{'LKA_active_time': [[start, end], ...]}``.

        The intervals are the runs of rows whose ``LKA_status`` equals
        ``"Active"``.
        """
        lka_warning = "Active"
        return {'LKA_active_time': self.lane_keep_warning_status(lka_warning)}


class ICATrigger(object):
    """Extract the time intervals of the individual ``ICA_status`` modes.

    ``df_vehState`` is used as a pandas-style DataFrame and must provide the
    ``ICA_status`` and ``simTime`` columns.
    """

    def __init__(self, df_vehState):
        self.df_vehState = df_vehState

    def ICA_active_time_statistics(self):
        """Return the ``[start, end]`` intervals for each ICA mode grouping.

        :return: dict with the keys ``ICA_cruise_time``, ``ICA_follow_time``,
            ``ICA_Longitudinal_time``, ``ICA_Lateral_time`` and
            ``ICA_active_time`` (in that order). Each value is the list
            produced by ``ACCTrigger.find_start_end_time`` for the ``simTime``
            values of the rows whose ``ICA_status`` is one of the statuses
            listed for that key.
        """
        ICA_CONSTANT_SPEED = "LLC_Follow_Line"
        ICA_FOLLOW_CAR = "LLC_Follow_Vehicle"
        ICA_LONGITUDINAL = "Only_Longitudinal_Control"

        # Result key -> ICA_status values that count towards that key.
        statuses_by_key = (
            ('ICA_cruise_time', [ICA_CONSTANT_SPEED]),
            ('ICA_follow_time', [ICA_FOLLOW_CAR]),
            ('ICA_Longitudinal_time', [ICA_LONGITUDINAL]),
            ('ICA_Lateral_time', [ICA_CONSTANT_SPEED, ICA_FOLLOW_CAR]),
            ('ICA_active_time',
             [ICA_CONSTANT_SPEED, ICA_FOLLOW_CAR, ICA_LONGITUDINAL]),
        )

        # Only the run-detection helper of ACCTrigger is needed here.
        segmenter = ACCTrigger(self.df_vehState)
        ica_status = self.df_vehState['ICA_status']

        ICA_start_end_time_dict = {}
        for key, statuses in statuses_by_key:
            selected = self.df_vehState[ica_status.isin(statuses)]
            mode_times = selected['simTime'].tolist()
            ICA_start_end_time_dict[key] = segmenter.find_start_end_time(mode_times)
        return ICA_start_end_time_dict