# LKA_Trigger.py (2.0 KB)
  1. import numpy as np
  2. import pandas as pd
  3. STANDBY = 0
  4. RIGHT_WARNING = 2
  5. class LKA_Trigger():
  6. def __init__(self, df_vehState):
  7. self.df_vehState = df_vehState
  8. def lane_keep_warning_status(self, left_or_right):
  9. '''
  10. :return:
  11. start_vehstate: 状态机开始的时间,以列表形式存储
  12. close_vehstate:状态机结束的时间,以列表形式存储
  13. '''
  14. groups = (self.df_vehState['LKA_status'] != self.df_vehState['LKA_status'].shift()).cumsum()
  15. LKA_left_groups = self.df_vehState[self.df_vehState['LKA_status'] == left_or_right].groupby(groups)
  16. # 初始化结果列表
  17. result_times = []
  18. # 使用enumerate来跟踪分组的顺序(模拟全局索引的奇偶性)
  19. for idx, (name, group) in enumerate(LKA_left_groups, start=1):
  20. result_time = []
  21. # 获取片段的simTime值
  22. sim_times = group['simTime'].tolist()
  23. # 根据分组的奇偶性(即这里的idx)选择simTime
  24. # if idx % 2 == 0:
  25. # 偶数组,选择第一个
  26. if sim_times: # 确保列表不为空
  27. result_time.append(sim_times[-1])
  28. result_time.append(sim_times[-1])
  29. result_times.append(result_time)
  30. return result_times
  31. def LKA_warning_active_time_statistics(self):
  32. warning_status_time_dict = {}
  33. left_warning = 1
  34. right_warning = 2
  35. LKA_left_vehstate = self.lane_keep_warning_status(left_warning)
  36. LKA_right_vehstate = self.lane_keep_warning_status(right_warning)
  37. warning_status_time_dict['LKA_left_active_time'] = LKA_left_vehstate
  38. warning_status_time_dict['LKA_right_active_time'] = LKA_right_vehstate
  39. return warning_status_time_dict
  40. if __name__ == '__main__':
  41. df_vehState = pd.read_csv(r"D:\Cicv\Lantu\数据\LKA_data2\data\VehState.csv")
  42. LKA = LKA_Trigger(df_vehState)
  43. vehstate = LKA.LKA_warning_active_time_statistics()
  44. print(vehstate)