123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2024 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2024/09/20
- @Last Modified: 2024/09/20
- @Summary: Evaluation functions
- """
- class ACCTrigger(object):
- def __init__(self, df_vehState):
- self.df_vehState = df_vehState
- def find_start_end_time(self, change_speed_time_list):
- start_end_time_list = []
- # 初始化一个变量来跟踪当前序列的开始值
- # 和一个变量来跟踪当前序列的长度
- start_of_sequence = None
- sequence_length = 0
- # 遍历sim_times列表(从第二个元素开始比较)
- for i in range(1, len(change_speed_time_list)):
- # 检查当前元素与上一个元素的差是否接近0.04(考虑浮点数精度)
- if abs(change_speed_time_list[i] - change_speed_time_list[i - 1] - 0.01) < 0.04:
- # 如果这是序列的开始,则记录开始值
- if start_of_sequence is None:
- start_of_sequence = change_speed_time_list[i - 1]
- # 增加序列长度
- sequence_length += 1
- else:
- # 如果序列中断且长度足够(例如,至少为2),则添加序列到结果列表中
- if start_of_sequence is not None and sequence_length >= 0:
- start_end_time_list.append([start_of_sequence, change_speed_time_list[i - 1]])
- # 重置开始值和序列长度
- start_of_sequence = None
- sequence_length = 0
- # 处理最后一个序列(如果存在且长度足够)
- if start_of_sequence is not None and sequence_length >= 1:
- start_end_time_list.append([start_of_sequence, change_speed_time_list[-1]])
- return start_end_time_list
- def ACC_active_time_statistics(self):
- start_end_time_dict = {}
- ACC_STATUS = "Active"
- ACC_time = self.df_vehState[self.df_vehState['ACC_status'] == ACC_STATUS]['simTime'].tolist()
- ACC_start_end_time_list = self.find_start_end_time(ACC_time)
- start_end_time_dict['ACC_active_time'] = ACC_start_end_time_list
- return start_end_time_dict
- class LKATrigger(object):
- def __init__(self, df_vehState):
- self.df_vehState = df_vehState
- def lane_keep_warning_status(self, left_or_right):
- '''
- :return:
- start_vehstate: 状态机开始的时间,以列表形式存储
- close_vehstate:状态机结束的时间,以列表形式存储
- '''
- groups = (self.df_vehState['LKA_status'] != self.df_vehState['LKA_status'].shift()).cumsum()
- LKA_left_groups = self.df_vehState[self.df_vehState['LKA_status'] == left_or_right].groupby(groups)
- # 初始化结果列表
- result_times = []
- # 使用enumerate来跟踪分组的顺序(模拟全局索引的奇偶性)
- for idx, (name, group) in enumerate(LKA_left_groups, start=1):
- result_time = []
- # 获取片段的simTime值
- sim_times = group['simTime'].tolist()
- # 根据分组的奇偶性(即这里的idx)选择simTime
- # if idx % 2 == 0:
- # 偶数组,选择第一个
- if sim_times: # 确保列表不为空
- result_time.append(sim_times[-1])
- result_time.append(sim_times[-1])
- result_times.append(result_time)
- return result_times
- def LKA_active_time_statistics(self):
- warning_status_time_dict = {}
- lka_warning = "Active"
- # left_warning = 1
- # right_warning = 2
- LKA_vehstate = self.lane_keep_warning_status(lka_warning)
- # LKA_left_vehstate = self.lane_keep_warning_status(left_warning)
- # LKA_right_vehstate = self.lane_keep_warning_status(right_warning)
- warning_status_time_dict['LKA_active_time'] = LKA_vehstate
- # warning_status_time_dict['LKA_left_active_time'] = LKA_left_vehstate
- # warning_status_time_dict['LKA_right_active_time'] = LKA_right_vehstate
- return warning_status_time_dict
- class ICATrigger():
- def __init__(self, df_vehState):
- self.df_vehState = df_vehState
- def ICA_active_time_statistics(self):
- ICA_start_end_time_dict = {}
- # 寻找ICA相应状态机下的simTime和set_cruise_speed数据
- ICA_CONSTANT_SPEED = "LLC_Follow_Line"
- ICA_FOLLOW_CAR = "LLC_Follow_Vehicle"
- ICA_LONGITUDINAL = "Only_Longitudinal_Control"
- ICA_LATERAL = ["LLC_Follow_Line", "LLC_Follow_Vehicle"]
- ICA_ACTIVE = ["LLC_Follow_Line", "LLC_Follow_Vehicle", "Only_Longitudinal_Control"]
- constant_speed_time = self.df_vehState[self.df_vehState['ICA_status'] == ICA_CONSTANT_SPEED]['simTime'].tolist()
- follow_car_time = self.df_vehState[self.df_vehState['ICA_status'] == ICA_FOLLOW_CAR]['simTime'].tolist()
- longitudinal_time_list = self.df_vehState[self.df_vehState['ICA_status'] == ICA_LONGITUDINAL][
- 'simTime'].tolist()
- lateral_time_list = self.df_vehState[self.df_vehState['ICA_status'].isin(ICA_LATERAL)]['simTime'].tolist()
- active_time_list = self.df_vehState[self.df_vehState['ICA_status'].isin(ICA_ACTIVE)]['simTime'].tolist()
- ACC = ACCTrigger(self.df_vehState)
- ICA_speed_start_end_time_list = ACC.find_start_end_time(constant_speed_time)
- ICA_dist_start_end_time_list = ACC.find_start_end_time(follow_car_time)
- ICA_longitudinal_start_end_time_list = ACC.find_start_end_time(longitudinal_time_list)
- ICA_lateral_start_end_time_list = ACC.find_start_end_time(lateral_time_list)
- ICA_active_start_end_time_list = ACC.find_start_end_time(active_time_list)
- ICA_start_end_time_dict['ICA_cruise_time'] = ICA_speed_start_end_time_list
- ICA_start_end_time_dict['ICA_follow_time'] = ICA_dist_start_end_time_list
- ICA_start_end_time_dict['ICA_Longitudinal_time'] = ICA_longitudinal_start_end_time_list
- ICA_start_end_time_dict['ICA_Lateral_time'] = ICA_lateral_start_end_time_list
- ICA_start_end_time_dict['ICA_active_time'] = ICA_active_start_end_time_list
- return ICA_start_end_time_dict
|