LKA_0702(1).py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. import pandas as pd
  2. import numpy as np
  3. import os
  4. import math
  5. class LKA:
  6. def __init__(self, df, obj, roadPos_df, roadMark_df, laneInfo_df):
  7. self.df = df
  8. self.segmented_data = []
  9. """
  10. 注释:zhangyu 0626 新增
  11. """
  12. self.roadPos_df = roadPos_df
  13. self.roadMark_df = roadMark_df
  14. self.laneInfo_df = laneInfo_df
  15. """
  16. 横向指标
  17. """
  18. self.lateral_control01_minimum_distance_nearby_lane = None
  19. self.lateral_control02_maximum_lateral_offset = None
  20. self.lateral_control03_maximum_heading_deviation = None
  21. self.lateral_control04_relative_position_oscillation_frequency = None
  22. self.lateral_control05_relative_position_oscillation_difference = None
  23. self.lateral_control06_absolute_lateral_offset_distribution_expectation = None
  24. self.lateral_control07_absolute_lateral_offset_distribution_standard_deviation = None
  25. self.lateral_control08_maximum_lateral_deviation = None
  26. self.lateral_control09_minimum_lateral_deviation = None
  27. self.lateral_control10_fixed_driving_direction_TLC = None
  28. # self.lateral_control10_absolute_position_oscillation_difference = None
  29. # self.lateral_control11_maximum_heading_deviation = None
  30. # self.lateral_control12_relative_position_oscillation_frequency = None
  31. # self.lateral_control13_relative_position_oscillation_difference = None
  32. self._lateral_control01_minimum_distance_nearby_lane()
  33. self._lateral_control02_maximum_lateral_offset()
  34. self._lateral_control03_maximum_heading_deviation()
  35. self._lateral_control04_relative_position_oscillation_frequency()
  36. self._lateral_control05_relative_position_oscillation_difference()
  37. self._lateral_control06_absolute_lateral_offset_distribution_expectation()
  38. self._lateral_control07_absolute_lateral_offset_distribution_standard_deviation()
  39. self._lateral_control08_maximum_lateral_deviation()
  40. self._lateral_control09_minimum_lateral_deviation()
  41. self._lateral_control10_fixed_driving_direction_TLC()
  42. def Compute_nearby_distance_to_lane_boundary(self, x, width_ego):
  43. """
  44. 计算自车到车道边界线的距离,需要减去自车宽度
  45. Args:
  46. x.lateralDist :到边界线距离
  47. width_ego:车宽
  48. Returns:
  49. 自车到车道边界线的距离
  50. """
  51. if x.lateralDist < abs(x.right_lateral_distance):
  52. return x.lateralDist - width_ego/2
  53. else:
  54. return abs(x.right_lateral_distance) - width_ego/2
  55. def func_laneOffset_abs(self, x):
  56. """
  57. 计算自车横向偏移量的绝对值
  58. Args:
  59. x.laneOffset :横向偏移量
  60. Returns:
  61. 横向偏移量的绝对值
  62. """
  63. return abs(x.laneOffset)
  64. def func_center_line_cycle_optical(self, l):
  65. """
  66. 计算偏离车道中心线的振荡周期和极差,极大值要大于0,极小值要小于0
  67. Args:
  68. l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2]
  69. Returns:
  70. cycle:振荡周期数
  71. maximum_range:最大极差
  72. """
  73. # print("\n穿过y=0的周期数: ")
  74. length = len(l)
  75. number_peak = 0
  76. number_trough = 0
  77. # 生成新的、只含有波峰波谷的列表
  78. new_list = []
  79. plus = []
  80. minus = []
  81. for number, value in enumerate(l):
  82. if number == 0:
  83. if value > l[number + 1] and value > 0:
  84. number_peak += 1
  85. plus.append(value)
  86. if value < l[number + 1] and value < 0:
  87. number_trough += 1
  88. minus.append(value)
  89. continue
  90. if number == length - 1:
  91. if value > l[number - 1] and value > 0:
  92. number_peak += 1
  93. plus.append(value)
  94. if len(minus) != 0:
  95. new_list.append(min(minus))
  96. minus = []
  97. if value < l[number - 1] and value < 0:
  98. number_trough += 1
  99. minus.append(value)
  100. if len(plus) != 0:
  101. new_list.append(max(plus))
  102. plus = []
  103. if len(minus) != 0:
  104. new_list.append(min(minus))
  105. minus = []
  106. if len(plus) != 0:
  107. new_list.append(max(plus))
  108. plus = []
  109. continue
  110. if value >= l[number - 1] and value > l[number + 1] and value > 0:
  111. number_peak += 1
  112. plus.append(value)
  113. if len(minus) != 0:
  114. new_list.append(min(minus))
  115. minus = []
  116. if value < l[number - 1] and value <= l[number + 1] and value < 0:
  117. number_trough += 1
  118. minus.append(value)
  119. if len(plus) != 0:
  120. new_list.append(max(plus))
  121. plus = []
  122. cycle = min(number_peak, number_trough) - 1
  123. difference_list = []
  124. for i in range(len(new_list) - 1):
  125. difference = abs(new_list[i] - new_list[i + 1])
  126. difference_list.append(difference)
  127. if len(difference_list) == 0:
  128. maximum_range = -1
  129. # print(f"极差: {maximum_range}")
  130. else:
  131. maximum_range = max(difference_list)
  132. # print(f"极差: {maximum_range}")
  133. return cycle, maximum_range
  134. def func_normal_cycle_optical(self, l):
  135. """
  136. 计算振荡周期和极差,两个相邻的极大值(或极小值)算一次振荡
  137. Args:
  138. l :波的数值列表,如 l = [1, 2, 1, 2, 1, 0, -1, 0, 1, 2, 1, 2, 0, -1, 2]
  139. Returns:
  140. cycle:振荡周期数
  141. maximum_range:最大极差
  142. """
  143. # print("正常的周期数: ")
  144. length = len(l)
  145. number_peak = 0
  146. number_trough = 0
  147. # 生成新的、只含有波峰波谷的列表
  148. new_list = []
  149. for number, value in enumerate(l):
  150. if number == 0:
  151. if value > l[number + 1]:
  152. number_peak += 1
  153. new_list.append(value)
  154. if value < l[number + 1]:
  155. number_trough += 1
  156. new_list.append(value)
  157. continue
  158. if number == length - 1:
  159. if value > l[number - 1]:
  160. number_peak += 1
  161. new_list.append(value)
  162. if value < l[number - 1]:
  163. number_trough += 1
  164. new_list.append(value)
  165. continue
  166. if value >= l[number - 1] and value > l[number + 1]:
  167. number_peak += 1
  168. new_list.append(value)
  169. if value < l[number - 1] and value <= l[number + 1]:
  170. number_trough += 1
  171. new_list.append(value)
  172. if abs(number_peak - number_trough) > 1:
  173. pass
  174. else:
  175. cycle = max(number_peak, number_trough) - 1
  176. difference_list = []
  177. for i in range(len(new_list) - 1):
  178. difference = abs(new_list[i] - new_list[i + 1])
  179. difference_list.append(difference)
  180. if len(difference_list) == 0:
  181. maximum_range = -1
  182. # print(f"极差: {maximum_range}")
  183. else:
  184. maximum_range = max(difference_list)
  185. # print(f"极差: {maximum_range}")
  186. return cycle, maximum_range
  187. def fixed_driving_direction_TLC(self, x):
  188. TLC = (x['laneWidth'] / 2 - x['laneOffset_abs'] - x['dimY'] / 2 * math.cos(x['heading_deviation_abs'])) / (
  189. x['velocity_resultant'] * math.sin(x['heading_deviation_abs']))
  190. return TLC
  191. def _lateral_control01_minimum_distance_nearby_lane(self):
  192. """
  193. 横向控制01指标:计算离近侧车道线最小距离(m)
  194. Args:
  195. Returns:
  196. None
  197. """
  198. roadMark_df = self.roadMark_df
  199. player_df = self.df
  200. ego_df = player_df[player_df.playerId == 0]
  201. width_ego = ego_df['dimY'].values.tolist()[0]
  202. # 提取距离左车道线和右车道线距离
  203. roadMark_left_df = roadMark_df[roadMark_df.id == 0].reset_index(drop=True)
  204. roadMark_right_df = roadMark_df[roadMark_df.id == 2].reset_index(drop=True)
  205. roadMark_left_df['right_lateral_distance'] = roadMark_right_df['lateralDist']
  206. # 计算到车道边界线距离
  207. roadMark_left_df['nearby_distance_to_lane_boundary'] = roadMark_left_df.apply(
  208. lambda x: self.Compute_nearby_distance_to_lane_boundary(x, width_ego), axis=1)
  209. nearby_distance_to_lane_boundary = min(roadMark_left_df['nearby_distance_to_lane_boundary'])
  210. self.lateral_control01_minimum_distance_nearby_lane = round(nearby_distance_to_lane_boundary, 3)
  211. def _lateral_control02_maximum_lateral_offset(self):
  212. """
  213. 横向控制02指标:计算最大横向偏移量(m)
  214. Args:
  215. Returns:
  216. None
  217. """
  218. roadPos_df = self.roadPos_df
  219. # 提取自车的roadPos数据
  220. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  221. # 计算横向偏移量的绝对值
  222. roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
  223. max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax()
  224. row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
  225. self.lateral_control02_maximum_lateral_offset = round(row_with_max_value, 3)
  226. def _lateral_control03_maximum_heading_deviation(self):
  227. """
  228. 横向控制3指标:最大偏航角
  229. Args:
  230. Returns:
  231. None
  232. """
  233. player_df = self.df
  234. road_mark_df = self.roadMark_df
  235. ego_df = player_df[player_df.playerId == 0].reset_index(drop=True)
  236. # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减
  237. road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True)
  238. road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True)
  239. road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor']
  240. road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor']
  241. road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \
  242. lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1)
  243. ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle']
  244. ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply( \
  245. lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1)
  246. row_with_max_value = max(ego_df['heading_deviation_abs'])
  247. self.lateral_control03_maximum_heading_deviation = row_with_max_value
  248. def _lateral_control04_relative_position_oscillation_frequency(self):
  249. """
  250. 横向控制12指标:横向相对位置振荡频率(Hz)
  251. Args:
  252. Returns:
  253. None
  254. """
  255. roadPos_df = self.roadPos_df
  256. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  257. roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
  258. cycle_number, _ = self.func_normal_cycle_optical(roadPos_ego_list)
  259. self.lateral_control04_relative_position_oscillation_frequency = round(cycle_number, 3)
  260. def _lateral_control05_relative_position_oscillation_difference(self):
  261. """
  262. 横向控制04指标:横向相对位置振荡极差(m)
  263. Args:
  264. Returns:
  265. None
  266. """
  267. roadPos_df = self.roadPos_df
  268. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  269. roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
  270. _, maximum_range = self.func_normal_cycle_optical(roadPos_ego_list)
  271. self.lateral_control05_relative_position_oscillation_difference = round(maximum_range, 3)
  272. def _lateral_control06_absolute_lateral_offset_distribution_expectation(self):
  273. """
  274. 横向控制06指标:计算绝对横向偏移量分布期望(m)
  275. Args:
  276. Returns:
  277. None
  278. """
  279. roadPos_df = self.roadPos_df
  280. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  281. roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \
  282. lambda x: self.func_laneOffset_abs(x), axis=1)
  283. mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].mean()
  284. self.lateral_control06_absolute_lateral_offset_distribution_expectation = round(mean_laneOffset_index, 3)
  285. def _lateral_control07_absolute_lateral_offset_distribution_standard_deviation(self):
  286. """
  287. 横向控制07指标:计算绝对横向偏移量分布标准差(m)
  288. Args:
  289. Returns:
  290. None
  291. """
  292. roadPos_df = self.roadPos_df
  293. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  294. roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply( \
  295. lambda x: self.func_laneOffset_abs(x), axis=1)
  296. mean_laneOffset_index = roadPos_ego_df['laneOffset_abs'].std()
  297. self.lateral_control07_absolute_lateral_offset_distribution_standard_deviation = round(mean_laneOffset_index, 3)
  298. def _lateral_control08_maximum_lateral_deviation(self):
  299. """
  300. 横向控制08指标:横向距离极大值(m)
  301. Args:
  302. Returns:
  303. None
  304. """
  305. roadPos_df = self.roadPos_df
  306. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  307. roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
  308. max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmax()
  309. row_with_max_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
  310. self.lateral_control08_maximum_lateral_deviation = row_with_max_value
  311. def _lateral_control09_minimum_lateral_deviation(self):
  312. """
  313. 横向控制09指标:横向距离极小值(m)
  314. Args:
  315. Returns:
  316. None
  317. """
  318. roadPos_df = self.roadPos_df
  319. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  320. roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1)
  321. max_laneOffset_abs_index = roadPos_ego_df['laneOffset_abs'].idxmin()
  322. row_with_min_value = roadPos_ego_df.iloc[max_laneOffset_abs_index].laneOffset
  323. self.lateral_control09_minimum_lateral_deviation = row_with_min_value
  324. def _lateral_control10_fixed_driving_direction_TLC(self):
  325. """
  326. 横向控制10指标:固定行驶方向车辆跨道时间
  327. Args:
  328. Returns:
  329. None
  330. """
  331. player_df = self.df
  332. ego_df = player_df[player_df.playerId == 0].reset_index(drop=True)
  333. width_ego = ego_df['dimY'] # 自车宽度e
  334. road_mark_df = self.roadMark_df
  335. # 左车道线曲率,右车道线曲率,求二者平均值,计算车道线曲率,再与自车朝向相减
  336. road_mark_left_df = road_mark_df[road_mark_df.id == 0].reset_index(drop=True)
  337. road_mark_right_df = road_mark_df[road_mark_df.id == 2].reset_index(drop=True)
  338. road_mark_left_df['curvHor_left'] = road_mark_left_df['curvHor']
  339. road_mark_left_df['curvHor_right'] = road_mark_right_df['curvHor']
  340. road_mark_left_df['curvHor_middle'] = road_mark_left_df[['curvHor_left', 'curvHor_right']].apply( \
  341. lambda x: (x['curvHor_left'] + x['curvHor_right']) / 2, axis=1)
  342. ego_df['curvHor_middle'] = road_mark_left_df['curvHor_middle']
  343. ego_df['heading_deviation_abs'] = ego_df[['curvHor_middle', 'posH']].apply(\
  344. lambda x: abs(x['posH'] - x['curvHor_middle']), axis=1) # 偏航角θ
  345. laneInfo_df = self.laneInfo_df
  346. laneInfo_df = laneInfo_df[laneInfo_df.id == -1].reset_index(drop=True) # laneInfo_df['width'] 车道宽度
  347. ego_df['velocity_resultant'] = ego_df[['speedX', 'speedY']].apply( \
  348. lambda x: math.sqrt(x['speedX'] ** 2 - x['speedY'] ** 2) / 3.6, axis=1) # 汽车行驶速度v
  349. roadPos_df = self.roadPos_df
  350. roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
  351. roadPos_ego_df['laneOffset_abs'] = roadPos_ego_df.apply(lambda x: self.func_laneOffset_abs(x), axis=1) # 横向偏移量y0
  352. merged_df = pd.merge(roadPos_ego_df, ego_df, on='simFrame', how='inner')
  353. merged_df["laneWidth"] = 3.5
  354. merged_df['TLC'] = merged_df.apply(lambda x: self.fixed_driving_direction_TLC(x), axis=1)
  355. row_with_min_value = min(merged_df['TLC'])
  356. self.lateral_control10_fixed_driving_direction_TLC = row_with_min_value
  357. def _lateral_control11_fixed_driving_direction_TLC(self):
  358. """
  359. 横向控制11指标:固定方向盘转角车辆跨道时间
  360. Args:
  361. Returns:
  362. None
  363. """
  364. pass
  365. if __name__ == "__main__":
  366. objState_csv_data = "/guazai/single_evaluate/SINGLE_EVALUATE/task_0515/case0530-ica-post/data/EgoState.csv"
  367. directory = os.path.dirname(objState_csv_data)
  368. print("目录名:", directory)
  369. # 使用 os.path.join() 组合目录和文件名
  370. objState_sensor_csv_data = os.path.join(directory, "ObjState.csv")
  371. roadMark_csv_data = os.path.join(directory, "RoadMark.csv")
  372. roadPos_csv_data = os.path.join(directory, "RoadPos.csv")
  373. laneInfo_csv_data = os.path.join(directory, "LaneInfo.csv")
  374. objState_csv_data = pd.read_csv(objState_csv_data)
  375. objState_sensor_csv_data = pd.read_csv(objState_sensor_csv_data)
  376. roadMark_csv_data = pd.read_csv(roadMark_csv_data)
  377. roadPos_csv_data = pd.read_csv(roadPos_csv_data)
  378. laneInfo_csv_data = pd.read_csv(laneInfo_csv_data)
  379. ICA_object = LKA(objState_csv_data, objState_sensor_csv_data, roadPos_csv_data, roadMark_csv_data, laneInfo_csv_data)
  380. print(ICA_object.lateral_control01_minimum_distance_nearby_lane)
  381. print(ICA_object.lateral_control02_maximum_lateral_offset)
  382. print(ICA_object.lateral_control03_maximum_heading_deviation)
  383. print(ICA_object.lateral_control04_relative_position_oscillation_frequency)
  384. print(ICA_object.lateral_control05_relative_position_oscillation_difference)
  385. print(ICA_object.lateral_control06_absolute_lateral_offset_distribution_expectation)
  386. print(ICA_object.lateral_control07_absolute_lateral_offset_distribution_standard_deviation)
  387. print(ICA_object.lateral_control08_maximum_lateral_deviation)
  388. print(ICA_object.lateral_control09_minimum_lateral_deviation)
  389. print(ICA_object._lateral_control10_minimum_lateral_deviation)