123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2024 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
- @Data: 2024/09/20
- @Last Modified: 2024/09/20
- @Summary: Evaluation functions
- """
- class ACCTrigger(object):
- def __init__(self, df_vehState):
- self.df_vehState = df_vehState
- def find_start_end_time(self, change_speed_time_list):
- start_end_time_list = []
- # 初始化一个变量来跟踪当前序列的开始值
- # 和一个变量来跟踪当前序列的长度
- start_of_sequence = None
- sequence_length = 0
- # 遍历sim_times列表(从第二个元素开始比较)
- for i in range(1, len(change_speed_time_list)):
- # 检查当前元素与上一个元素的差是否接近0.04(考虑浮点数精度)
- if abs(change_speed_time_list[i] - change_speed_time_list[i - 1] - 0.01) < 0.04:
- # 如果这是序列的开始,则记录开始值
- if start_of_sequence is None:
- start_of_sequence = change_speed_time_list[i - 1]
- # 增加序列长度
- sequence_length += 1
- else:
- # 如果序列中断且长度足够(例如,至少为2),则添加序列到结果列表中
- if start_of_sequence is not None and sequence_length >= 0:
- start_end_time_list.append([start_of_sequence, change_speed_time_list[i - 1]])
- # 重置开始值和序列长度
- start_of_sequence = None
- sequence_length = 0
- # 处理最后一个序列(如果存在且长度足够)
- if start_of_sequence is not None and sequence_length >= 2:
- start_end_time_list.append([start_of_sequence, change_speed_time_list[-1]])
- return start_end_time_list
- def ACC_active_time_statistics(self):
- start_end_time_dict = {}
- ACC_STATUS = "Active"
- ACC_time = self.df_vehState[self.df_vehState['ACC_status'] == ACC_STATUS]['simTime'].tolist()
- ACC_start_end_time_list = self.find_start_end_time(ACC_time)
- start_end_time_dict['ACC_active_time'] = ACC_start_end_time_list
- return start_end_time_dict
- class LKATrigger(object):
- def __init__(self, df_vehState):
- self.df_vehState = df_vehState
- def lane_keep_warning_status(self, left_or_right):
- '''
- :return:
- start_vehstate: 状态机开始的时间,以列表形式存储
- close_vehstate:状态机结束的时间,以列表形式存储
- '''
- groups = (self.df_vehState['LKA_status'] != self.df_vehState['LKA_status'].shift()).cumsum()
- LKA_left_groups = self.df_vehState[self.df_vehState['LKA_status'] == left_or_right].groupby(groups)
- # 初始化结果列表
- result_times = []
- # 使用enumerate来跟踪分组的顺序(模拟全局索引的奇偶性)
- for idx, (name, group) in enumerate(LKA_left_groups, start=1):
- result_time = []
- # 获取片段的simTime值
- sim_times = group['simTime'].tolist()
- # 根据分组的奇偶性(即这里的idx)选择simTime
- # if idx % 2 == 0:
- # 偶数组,选择第一个
- if sim_times: # 确保列表不为空
- result_time.append(sim_times[-1])
- result_time.append(sim_times[-1])
- result_times.append(result_time)
- return result_times
- def LKA_warning_active_time_statistics(self):
- warning_status_time_dict = {}
- lka_warning = "Active"
- # left_warning = 1
- # right_warning = 2
- LKA_vehstate = self.lane_keep_warning_status(lka_warning)
- # LKA_left_vehstate = self.lane_keep_warning_status(left_warning)
- # LKA_right_vehstate = self.lane_keep_warning_status(right_warning)
- warning_status_time_dict['LKA_active_time'] = LKA_vehstate
- # warning_status_time_dict['LKA_left_active_time'] = LKA_left_vehstate
- # warning_status_time_dict['LKA_right_active_time'] = LKA_right_vehstate
- return warning_status_time_dict
|