status_trigger.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2024 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: xieguijin(xieguijin@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2024/09/20
  11. @Last Modified: 2024/09/20
  12. @Summary: Evaluation functions
  13. """
  14. class ACCTrigger(object):
  15. def __init__(self, df_vehState):
  16. self.df_vehState = df_vehState
  17. def find_start_end_time(self, change_speed_time_list):
  18. start_end_time_list = []
  19. # 初始化一个变量来跟踪当前序列的开始值
  20. # 和一个变量来跟踪当前序列的长度
  21. start_of_sequence = None
  22. sequence_length = 0
  23. # 遍历sim_times列表(从第二个元素开始比较)
  24. for i in range(1, len(change_speed_time_list)):
  25. # 检查当前元素与上一个元素的差是否接近0.04(考虑浮点数精度)
  26. if abs(change_speed_time_list[i] - change_speed_time_list[i - 1] - 0.01) < 0.04:
  27. # 如果这是序列的开始,则记录开始值
  28. if start_of_sequence is None:
  29. start_of_sequence = change_speed_time_list[i - 1]
  30. # 增加序列长度
  31. sequence_length += 1
  32. else:
  33. # 如果序列中断且长度足够(例如,至少为2),则添加序列到结果列表中
  34. if start_of_sequence is not None and sequence_length >= 0:
  35. start_end_time_list.append([start_of_sequence, change_speed_time_list[i - 1]])
  36. # 重置开始值和序列长度
  37. start_of_sequence = None
  38. sequence_length = 0
  39. # 处理最后一个序列(如果存在且长度足够)
  40. if start_of_sequence is not None and sequence_length >= 1:
  41. start_end_time_list.append([start_of_sequence, change_speed_time_list[-1]])
  42. return start_end_time_list
  43. def ACC_active_time_statistics(self):
  44. start_end_time_dict = {}
  45. ACC_STATUS = "Active"
  46. ACC_time = self.df_vehState[self.df_vehState['ACC_status'] == ACC_STATUS]['simTime'].tolist()
  47. ACC_start_end_time_list = self.find_start_end_time(ACC_time)
  48. start_end_time_dict['ACC_active_time'] = ACC_start_end_time_list
  49. return start_end_time_dict
  50. class LKATrigger(object):
  51. def __init__(self, df_vehState):
  52. self.df_vehState = df_vehState
  53. def lane_keep_warning_status(self, left_or_right):
  54. '''
  55. :return:
  56. start_vehstate: 状态机开始的时间,以列表形式存储
  57. close_vehstate:状态机结束的时间,以列表形式存储
  58. '''
  59. groups = (self.df_vehState['LKA_status'] != self.df_vehState['LKA_status'].shift()).cumsum()
  60. LKA_left_groups = self.df_vehState[self.df_vehState['LKA_status'] == left_or_right].groupby(groups)
  61. # 初始化结果列表
  62. result_times = []
  63. # 使用enumerate来跟踪分组的顺序(模拟全局索引的奇偶性)
  64. for idx, (name, group) in enumerate(LKA_left_groups, start=1):
  65. result_time = []
  66. # 获取片段的simTime值
  67. sim_times = group['simTime'].tolist()
  68. # 根据分组的奇偶性(即这里的idx)选择simTime
  69. # if idx % 2 == 0:
  70. # 偶数组,选择第一个
  71. if sim_times: # 确保列表不为空
  72. result_time.append(sim_times[-1])
  73. result_time.append(sim_times[-1])
  74. result_times.append(result_time)
  75. return result_times
  76. def LKA_active_time_statistics(self):
  77. warning_status_time_dict = {}
  78. lka_warning = "Active"
  79. # left_warning = 1
  80. # right_warning = 2
  81. LKA_vehstate = self.lane_keep_warning_status(lka_warning)
  82. # LKA_left_vehstate = self.lane_keep_warning_status(left_warning)
  83. # LKA_right_vehstate = self.lane_keep_warning_status(right_warning)
  84. warning_status_time_dict['LKA_active_time'] = LKA_vehstate
  85. # warning_status_time_dict['LKA_left_active_time'] = LKA_left_vehstate
  86. # warning_status_time_dict['LKA_right_active_time'] = LKA_right_vehstate
  87. return warning_status_time_dict
  88. class ICATrigger():
  89. def __init__(self, df_vehState):
  90. self.df_vehState = df_vehState
  91. def ICA_active_time_statistics(self):
  92. ICA_start_end_time_dict = {}
  93. # 寻找ICA相应状态机下的simTime和set_cruise_speed数据
  94. ICA_CONSTANT_SPEED = "LLC_Follow_Line"
  95. ICA_FOLLOW_CAR = "LLC_Follow_Vehicle"
  96. ICA_LONGITUDINAL = "Only_Longitudinal_Control"
  97. ICA_LATERAL = ["LLC_Follow_Line", "LLC_Follow_Vehicle"]
  98. ICA_ACTIVE = ["LLC_Follow_Line", "LLC_Follow_Vehicle", "Only_Longitudinal_Control"]
  99. constant_speed_time = self.df_vehState[self.df_vehState['ICA_status'] == ICA_CONSTANT_SPEED]['simTime'].tolist()
  100. follow_car_time = self.df_vehState[self.df_vehState['ICA_status'] == ICA_FOLLOW_CAR]['simTime'].tolist()
  101. longitudinal_time_list = self.df_vehState[self.df_vehState['ICA_status'] == ICA_LONGITUDINAL][
  102. 'simTime'].tolist()
  103. lateral_time_list = self.df_vehState[self.df_vehState['ICA_status'].isin(ICA_LATERAL)]['simTime'].tolist()
  104. active_time_list = self.df_vehState[self.df_vehState['ICA_status'].isin(ICA_ACTIVE)]['simTime'].tolist()
  105. ACC = ACCTrigger(self.df_vehState)
  106. ICA_speed_start_end_time_list = ACC.find_start_end_time(constant_speed_time)
  107. ICA_dist_start_end_time_list = ACC.find_start_end_time(follow_car_time)
  108. ICA_longitudinal_start_end_time_list = ACC.find_start_end_time(longitudinal_time_list)
  109. ICA_lateral_start_end_time_list = ACC.find_start_end_time(lateral_time_list)
  110. ICA_active_start_end_time_list = ACC.find_start_end_time(active_time_list)
  111. ICA_start_end_time_dict['ICA_cruise_time'] = ICA_speed_start_end_time_list
  112. ICA_start_end_time_dict['ICA_follow_time'] = ICA_dist_start_end_time_list
  113. ICA_start_end_time_dict['ICA_Longitudinal_time'] = ICA_longitudinal_start_end_time_list
  114. ICA_start_end_time_dict['ICA_Lateral_time'] = ICA_lateral_start_end_time_list
  115. ICA_start_end_time_dict['ICA_active_time'] = ICA_active_start_end_time_list
  116. return ICA_start_end_time_dict