|
@@ -0,0 +1,236 @@
|
|
|
|
+#!/usr/bin/env python
|
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
|
+##################################################################
|
|
|
|
+#
|
|
|
|
+# Copyright (c) 2023 CICV, Inc. All Rights Reserved
|
|
|
|
+#
|
|
|
|
+##################################################################
|
|
|
|
+"""
|
|
|
|
+@Authors: zhangyu
|
|
|
|
+@Data: 2024/02/21
|
|
|
|
+@Last Modified: 2024/02/21
|
|
|
|
+@Summary: The template of custom indicator.
|
|
|
|
+"""
|
|
|
|
+
|
|
|
|
+"""
|
|
|
|
+设计思路:
|
|
|
|
+最大横向偏移量
|
|
|
|
+zy_center_distance_expectation
|
|
|
|
+"""
|
|
|
|
+import math
|
|
|
|
+import pandas as pd
|
|
|
|
+import numpy as np
|
|
|
|
+from common import zip_time_pairs, continuous_group
|
|
|
|
+from log import logger
|
|
|
|
+
|
|
|
|
+"""import functions"""
|
|
|
|
+
|
|
|
|
+# custom metric codes
|
|
|
|
+class CustomMetric(object):
|
|
|
|
+ def __init__(self, all_data, case_name):
|
|
|
|
+ self.data = all_data
|
|
|
|
+ self.optimal_dict = self.data.config
|
|
|
|
+ self.case_name = case_name
|
|
|
|
+ self.markline_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
|
|
|
|
+
|
|
|
|
+ self.df = pd.DataFrame()
|
|
|
|
+ self.ego_df = pd.DataFrame()
|
|
|
|
+ self.df_follow = pd.DataFrame()
|
|
|
|
+ self.roadMark_df = pd.DataFrame()
|
|
|
|
+ self.roadPos_df = pd.DataFrame()
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ self.time_list_follow = list()
|
|
|
|
+ self.frame_list_follow = list()
|
|
|
|
+ self.dist_list = list()
|
|
|
|
+ self.dist_deviation_list = list()
|
|
|
|
+ self.dist_deviation_list_full_time = list()
|
|
|
|
+
|
|
|
|
+ self.result = {
|
|
|
|
+ "name": "横向相对位置振荡频率",
|
|
|
|
+ "value": [],
|
|
|
|
+ # "weight": [],
|
|
|
|
+ "tableData": {
|
|
|
|
+ "avg": "", # 平均值,或指标值
|
|
|
|
+ "max": "",
|
|
|
|
+ "min": ""
|
|
|
|
+ },
|
|
|
|
+ "reportData": {
|
|
|
|
+ "name": "横向相对位置振荡频率(Hz)",
|
|
|
|
+ # "legend": [], # 如果有多个data,则需要增加data对应的说明,如:["横向加速度", "纵向加速度"]
|
|
|
|
+ "data": [],
|
|
|
|
+ "markLine": [],
|
|
|
|
+ "range": [],
|
|
|
|
+ },
|
|
|
|
+ "statusFlag": {}
|
|
|
|
+ }
|
|
|
|
+ self.run()
|
|
|
|
+ print(f"指标12: 横向相对位置振荡频率: {self.result['value']}")
|
|
|
|
+
|
|
|
|
+ def data_extract(self):
|
|
|
|
+ self.df = self.data.object_df
|
|
|
|
+ self.ego_df = self.data.object_df[self.data.object_df.playerId == 1]
|
|
|
|
+ self.df_follow = self.df[self.df['ACC_status'] == "Shut_off"].copy() # 数字3对应ICA的Active
|
|
|
|
+ # self.df_follow = self.df[self.df['ACC_status'] == "Active"].copy() # 数字3对应ICA的Active
|
|
|
|
+ self.roadMark_df = self.data.road_mark_df
|
|
|
|
+ self.roadPos_df = self.data.road_pos_df
|
|
|
|
+
|
|
|
|
+ if self.df_follow.empty:
|
|
|
|
+ self.result['statusFlag']['function_ICA'] = False
|
|
|
|
+ else:
|
|
|
|
+ self.result['statusFlag']['function_ICA'] = True
|
|
|
|
+
|
|
|
|
+ def func_center_line_cycle(self, l):
|
|
|
|
+ # print("\n穿过y=0的周期数: ")
|
|
|
|
+ length = len(l)
|
|
|
|
+ number_peak = 0
|
|
|
|
+ number_trough = 0
|
|
|
|
+ for number, value in enumerate(l):
|
|
|
|
+ if number == 0:
|
|
|
|
+ if value > l[number + 1] and value > 0:
|
|
|
|
+ number_peak += 1
|
|
|
|
+ if value < l[number + 1] and value < 0:
|
|
|
|
+ number_trough += 1
|
|
|
|
+ continue
|
|
|
|
+ if number == length - 1:
|
|
|
|
+ if value > l[number - 1] and value > 0:
|
|
|
|
+ number_peak += 1
|
|
|
|
+ if value < l[number - 1] and value < 0:
|
|
|
|
+ number_trough += 1
|
|
|
|
+ continue
|
|
|
|
+ if value >= l[number - 1] and value > l[number + 1] and value > 0:
|
|
|
|
+ number_peak += 1
|
|
|
|
+ if value < l[number - 1] and value <= l[number + 1] and value < 0:
|
|
|
|
+ number_trough += 1
|
|
|
|
+ # print("number_peak: ", number_peak)
|
|
|
|
+ # print("number_trough: ", number_trough)
|
|
|
|
+ cycle = min(number_peak, number_trough) - 1
|
|
|
|
+ # print("cycle: ", cycle)
|
|
|
|
+ if cycle == -1:
|
|
|
|
+ return 0
|
|
|
|
+ return cycle
|
|
|
|
+
|
|
|
|
+ def func_normal_cycle_optical(self, l):
|
|
|
|
+ # print("正常的周期数: ")
|
|
|
|
+ length = len(l)
|
|
|
|
+ number_peak = 0
|
|
|
|
+ number_trough = 0
|
|
|
|
+ # value_peak = []
|
|
|
|
+ # value_trough = []
|
|
|
|
+ # 生成新的、只含有波峰波谷的列表
|
|
|
|
+ new_list = []
|
|
|
|
+ for number, value in enumerate(l):
|
|
|
|
+ if number == 0:
|
|
|
|
+ if value > l[number + 1]:
|
|
|
|
+ number_peak += 1
|
|
|
|
+ new_list.append(value)
|
|
|
|
+ if value < l[number + 1]:
|
|
|
|
+ number_trough += 1
|
|
|
|
+ new_list.append(value)
|
|
|
|
+ continue
|
|
|
|
+ if number == length - 1:
|
|
|
|
+ if value > l[number - 1]:
|
|
|
|
+ number_peak += 1
|
|
|
|
+ new_list.append(value)
|
|
|
|
+ if value < l[number - 1]:
|
|
|
|
+ number_trough += 1
|
|
|
|
+ new_list.append(value)
|
|
|
|
+ continue
|
|
|
|
+ if value >= l[number - 1] and value > l[number + 1]:
|
|
|
|
+ number_peak += 1
|
|
|
|
+ new_list.append(value)
|
|
|
|
+ if value < l[number - 1] and value <= l[number + 1]:
|
|
|
|
+ number_trough += 1
|
|
|
|
+ new_list.append(value)
|
|
|
|
+ if abs(number_peak - number_trough) > 1:
|
|
|
|
+ # print("计算波峰波谷有误")
|
|
|
|
+ pass
|
|
|
|
+ else:
|
|
|
|
+ # print("number_peak: ", number_peak)
|
|
|
|
+ # print("number_trough: ", number_trough)
|
|
|
|
+ cycle = max(number_peak, number_trough) - 1
|
|
|
|
+ # print("cycle: ", cycle)
|
|
|
|
+ # print(f"value_peak: {value_peak}")
|
|
|
|
+ # print(f"value_trough: {value_trough}")
|
|
|
|
+
|
|
|
|
+ # print(f"new_list: {new_list}")
|
|
|
|
+ difference_list = []
|
|
|
|
+ for i in range(len(new_list) - 1):
|
|
|
|
+ difference = abs(new_list[i] - new_list[i + 1])
|
|
|
|
+ difference_list.append(difference)
|
|
|
|
+ if len(difference_list) == 0:
|
|
|
|
+ maximum_range = -1
|
|
|
|
+ # print(f"极差: {maximum_range}")
|
|
|
|
+ else:
|
|
|
|
+ maximum_range = max(difference_list)
|
|
|
|
+ # print(f"极差: {maximum_range}")
|
|
|
|
+ return cycle
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ def data_analyze(self):
|
|
|
|
+ # 提取自车宽度
|
|
|
|
+ roadPos_df = self.roadPos_df
|
|
|
|
+ player_df = self.df
|
|
|
|
+ ego_df = player_df[player_df.playerId == 1]
|
|
|
|
+ width_ego = ego_df['dimY'].values.tolist()[0]
|
|
|
|
+
|
|
|
|
+ # 提取距离左车道线和右车道线距离
|
|
|
|
+ roadPos_ego_df = roadPos_df[roadPos_df.playerId == 1].reset_index(drop=True)
|
|
|
|
+ roadPos_ego_list = roadPos_ego_df['laneOffset'].values.tolist()
|
|
|
|
+ cycle_number = self.func_normal_cycle_optical(roadPos_ego_list)
|
|
|
|
+ if not roadPos_ego_df['simTime'].empty:
|
|
|
|
+ frenquency = cycle_number / roadPos_ego_df['simTime'].values[-1]
|
|
|
|
+ else:
|
|
|
|
+ frenquency = cycle_number
|
|
|
|
+ self.result['value'] = [round(frenquency, 3)]
|
|
|
|
+ self.time_list_follow = roadPos_ego_df['simTime'].values.tolist()
|
|
|
|
+ self.frame_list_follow = roadPos_ego_df['simFrame'].values.tolist()
|
|
|
|
+ self.dist_deviation_list = roadPos_ego_df['laneOffset'].values.tolist()
|
|
|
|
+
|
|
|
|
+ def markline_statistic(self):
|
|
|
|
+ unfunc_df = pd.DataFrame({'simTime': self.time_list_follow, 'simFrame': self.frame_list_follow,
|
|
|
|
+ 'dist_deviation': self.dist_deviation_list})
|
|
|
|
+ unfunc_df = unfunc_df[unfunc_df['simFrame'] > 1]
|
|
|
|
+ # v_df = unfunc_df[unfunc_df['dist_deviation'] > 0]
|
|
|
|
+ v_df = unfunc_df
|
|
|
|
+ v_df = v_df[['simTime', 'simFrame', 'dist_deviation']]
|
|
|
|
+ v_follow_df = continuous_group(v_df)
|
|
|
|
+ v_follow_df['type'] = "ICA"
|
|
|
|
+ self.markline_df = pd.concat([self.markline_df, v_follow_df], ignore_index=True)
|
|
|
|
+
|
|
|
|
+ def report_data_statistic(self):
|
|
|
|
+ time_list = self.ego_df['simTime'].values.tolist()
|
|
|
|
+ graph_list = [x for x in self.dist_deviation_list if not np.isnan(x)]
|
|
|
|
+ self.result['tableData']['avg'] = f'{np.mean(graph_list):.2f}' if graph_list else 0
|
|
|
|
+ self.result['tableData']['max'] = f'{max(graph_list):.2f}' if graph_list else 0
|
|
|
|
+ self.result['tableData']['min'] = f'{min(graph_list):.2f}' if graph_list else 0
|
|
|
|
+
|
|
|
|
+ zip_vs_time = zip_time_pairs(time_list, self.dist_deviation_list)
|
|
|
|
+ self.result['reportData']['data'] = zip_vs_time
|
|
|
|
+
|
|
|
|
+ self.markline_statistic()
|
|
|
|
+ markline_slices = self.markline_df.to_dict('records')
|
|
|
|
+ self.result['reportData']['markLine'] = markline_slices
|
|
|
|
+ self.result['reportData']['range'] = f"[-1.875, 1.875]"
|
|
|
|
+
|
|
|
|
+ def run(self):
|
|
|
|
+ # logger.info(f"Custom metric run:[{self.result['name']}].")
|
|
|
|
+ logger.info(f"[case:{self.case_name}] Custom metric:[ica_distance_deviation:{self.result['name']}] evaluate.")
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ self.data_extract()
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} data extract ERROR!", e)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ self.data_analyze()
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} data analyze ERROR!", e)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ self.report_data_statistic()
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(f"[case:{self.case_name}] Custom metric:{self.result['name']} report data statistic ERROR!", e)
|
|
|
|
+
|
|
|
|
+# if __name__ == "__main__":
|
|
|
|
+# pass
|