center_distance_standard_deviation.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2024/02/21
  11. @Last Modified: 2024/02/21
  12. @Summary: The template of custom indicator.
  13. """
  14. import pandas as pd
  15. import numpy as np
  16. import scipy.signal as sg
  17. from common import zip_time_pairs, continuous_group, continous_judge
  18. from log import logger
  19. """import functions"""
  20. class CustomMetric(object):
  21. def __init__(self, all_data, case_name):
  22. self.data = all_data
  23. self.case_name = case_name
  24. self.markline_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  25. self.result = {
  26. "name": "ica车道中心线横向距离分布标准差",
  27. "value": [],
  28. # "weight": [],
  29. "tableData": {
  30. "avg": "", # 平均值,或指标值
  31. "max": "",
  32. "min": ""
  33. },
  34. "reportData": {
  35. "name": "车辆中心线横向距离(m)",
  36. # "legend": [], # 如果有多个data,则需要增加data对应的说明,如:["横向加速度", "纵向加速度"]
  37. "data": [],
  38. "markLine": [],
  39. "range": [],
  40. },
  41. "statusFlag": {}
  42. }
  43. self.ego_df = pd.DataFrame()
  44. self.df_lka = pd.DataFrame()
  45. self.center_dist_with_nan = list()
  46. self.center_time_list = list()
  47. self.center_frame_list = list()
  48. self.center_dist = list()
  49. self.run()
  50. def data_extract(self):
  51. self.ego_df = self.data.ego_data
  52. self.df_lka = self.ego_df[self.ego_df['ICA_status'].isin(
  53. ["Only_Longitudinal_Control", "LLC_Follow_Line", "LLC_Follow_Vehicle"])].copy()
  54. if self.df_lka.empty:
  55. self.result['statusFlag']['functionICA'] = False
  56. else:
  57. self.result['statusFlag']['functionICA'] = True
  58. # self.df_lka = self.ego_df[self.ego_df['LKA_status'] == "Active"].copy()
  59. # self.config = self.data.config
  60. # self.function_config = self.config.config['function']
  61. # self.optimal_dict = self.function_config['optimal']
  62. def data_analyze(self):
  63. df_lka = self.df_lka
  64. self.center_dist_with_nan = df_lka['laneOffset'].to_list()
  65. self.center_time_list = df_lka['simTime'].to_list()
  66. self.center_frame_list = df_lka['simFrame'].to_list()
  67. self.center_dist = [x for x in self.center_dist_with_nan if not np.isnan(x)]
  68. if not self.center_dist:
  69. self.result['value'].append(0)
  70. else:
  71. center_dist = [abs(x) for x in self.center_dist]
  72. # 车道中心线横向距离分布期望与标准差
  73. D_lane_center_dist = np.std(center_dist)
  74. self.result['value'].append(round(D_lane_center_dist, 2))
  75. def markline_statistic(self):
  76. unfunc_df = pd.DataFrame(
  77. {'simTime': self.center_time_list, 'simFrame': self.center_frame_list,
  78. 'center_dist': self.center_dist_with_nan})
  79. unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
  80. unfunc_df = unfunc_df.dropna(subset=['center_dist'])
  81. lane_df = unfunc_df[abs(unfunc_df['center_dist']) > 1.5]
  82. lane_df = lane_df[['simTime', 'simFrame', 'center_dist']]
  83. dist_lane_df = continuous_group(lane_df)
  84. dist_lane_df['type'] = "ICA"
  85. self.markline_df = pd.concat([self.markline_df, dist_lane_df], ignore_index=True)
  86. def report_data_statistic(self):
  87. time_list = self.df_lka['simTime'].values.tolist()
  88. line_dist_list = self.center_dist_with_nan
  89. self.result['tableData']['avg'] = self.result['value'][0] if not self.df_lka.empty else '-'
  90. self.result['tableData']['max'] = '-'
  91. self.result['tableData']['min'] = '-'
  92. zip_vs_time = zip_time_pairs(time_list, line_dist_list)
  93. self.result['reportData']['data'] = zip_vs_time
  94. self.markline_statistic()
  95. markline_slices = self.markline_df.to_dict('records')
  96. self.result['reportData']['markLine'] = markline_slices
  97. self.result['reportData']['range'] = f"[0, 0.5]"
  98. def run(self):
  99. # logger.info(f"Custom metric run:[{self.result['name']}].")
  100. logger.info(
  101. f"[case:{self.case_name}] Custom metric:[center_distance_standard_deviation:{self.result['name']}] evaluate.")
  102. try:
  103. self.data_extract()
  104. except Exception as e:
  105. logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} data extract ERROR!", e)
  106. try:
  107. self.data_analyze()
  108. except Exception as e:
  109. logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} data analyze ERROR!", e)
  110. try:
  111. self.report_data_statistic()
  112. except Exception as e:
  113. logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} report data statistic ERROR!", e)
  114. # if __name__ == "__main__":
  115. # pass