center_distance_max_0531.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2024/02/21
  11. @Last Modified: 2024/02/21
  12. @Summary: Custom indicator - maximum lateral distance to the lane center line under ICA.
  13. """
  14. import pandas as pd
  15. import numpy as np
  16. import scipy.signal as sg
  17. from common import zip_time_pairs, continuous_group, continous_judge
  18. from log import logger
  19. """import functions"""
  20. class CustomMetric(object):
  21. def __init__(self, all_data, case_name):
  22. self.data = all_data
  23. self.case_name = case_name
  24. self.markline_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  25. self.result = {
  26. "name": "ica车道中心线横向距离极大值",
  27. "value": [],
  28. # "weight": [],
  29. "tableData": {
  30. "avg": "", # 平均值,或指标值
  31. "max": "",
  32. "min": ""
  33. },
  34. "reportData": {
  35. "name": "车辆中心线横向距离(m)",
  36. # "legend": [], # 如果有多个data,则需要增加data对应的说明,如:["横向加速度", "纵向加速度"]
  37. "data": [],
  38. "markLine": [],
  39. "range": [],
  40. }
  41. }
  42. self.ego_df = pd.DataFrame()
  43. self.df_lka = pd.DataFrame()
  44. self.center_dist_with_nan = list()
  45. self.center_time_list = list()
  46. self.center_frame_list = list()
  47. self.center_dist = list()
  48. self.run()
  49. def data_extract(self):
  50. self.ego_df = self.data.ego_data
  51. self.df_lka = self.ego_df[self.ego_df['ICA_status'].isin(
  52. ["Only_Longitudinal_Control", "LLC_Follow_Line", "LLC_Follow_Vehicle"])].copy()
  53. # self.df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"].copy()
  54. # self.config = self.data.config
  55. # self.function_config = self.config.config['function']
  56. # self.optimal_dict = self.function_config['optimal']
  57. def data_analyze(self):
  58. df_lka = self.df_lka
  59. self.center_dist_with_nan = df_lka['laneOffset'].to_list()
  60. self.center_time_list = df_lka['simTime'].to_list()
  61. self.center_frame_list = df_lka['simFrame'].to_list()
  62. self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)]
  63. if not self.center_dist:
  64. self.result['value'].append(0)
  65. else:
  66. center_dist = [abs(x) for x in self.center_dist]
  67. # 车道中心线横向距离分布极值
  68. center_dist = np.array(center_dist)
  69. extreme_max_value = center_dist.max()
  70. self.result['value'].append(round(extreme_max_value, 2))
  71. def markline_statistic(self):
  72. unfunc_df = pd.DataFrame(
  73. {'simTime': self.center_time_list, 'simFrame': self.center_frame_list,
  74. 'center_dist': self.center_dist_with_nan})
  75. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  76. unfunc_df = unfunc_df.dropna(subset=['center_dist'])
  77. lane_df = unfunc_df[abs(unfunc_df['center_dist']) > 1.5]
  78. lane_df = lane_df[['simTime', 'simFrame', 'center_dist']]
  79. dist_lane_df = continuous_group(lane_df)
  80. dist_lane_df['type'] = "ICA"
  81. self.markline_df = pd.concat([self.markline_df, dist_lane_df], ignore_index=True)
  82. def report_data_statistic(self):
  83. time_list = self.df_lka['simTime'].values.tolist()
  84. line_dist_list = self.center_dist_with_nan
  85. self.result['tableData']['avg'] = '-'
  86. self.result['tableData']['max'] = self.result['value'][0]
  87. self.result['tableData']['min'] = '-'
  88. zip_vs_time = zip_time_pairs(time_list, line_dist_list)
  89. self.result['reportData']['data'] = zip_vs_time
  90. self.markline_statistic()
  91. markline_slices = self.markline_df.to_dict('records')
  92. self.result['reportData']['markLine'] = markline_slices
  93. self.result['reportData']['range'] = f"[0, 1.5]"
  94. def run(self):
  95. # logger.info(f"Custom metric run:[{self.result['name']}].")
  96. logger.info(f"[case:{self.case_name}] Custom metric:[center_distance_max:{self.result['name']}] evaluate.")
  97. try:
  98. self.data_extract()
  99. except Exception as e:
  100. logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} data extract ERROR!", e)
  101. try:
  102. self.data_analyze()
  103. except Exception as e:
  104. logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} data analyze ERROR!", e)
  105. try:
  106. self.report_data_statistic()
  107. except Exception as e:
  108. logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} report data statistic ERROR!", e)
  109. # if __name__ == "__main__":
  110. # pass