#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2024 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: xieguijin(xieguijin@china-icv.cn) @Data: 2024/12/23 @Last Modified: 2024/12/23 @Summary: Efficient metrics calculation """ from modules.lib.score import Score from modules.lib import log_manager class Efficient: STOP_SPEED_THRESHOLD = 0.05 # Speed threshold to consider as stop STOP_TIME_THRESHOLD = 0.5 # Time threshold to consider as stop (in seconds) FRAME_RANGE = 13 # Frame range to classify stop duration def __init__(self, data_processed): # self.logger = log.get_logger() # 使用时再初始化 self.data_processed = data_processed self.df = data_processed.object_df.copy() self.ego_df = data_processed.ego_data self.stop_count = 0 # Initialize stop count self.stop_duration = 0 # Initialize stop duration def _max_speed(self): """Return the maximum speed in the ego data.""" return self.ego_df['v'].max() def _deviation_speed(self): """Return the variance of the speed in the ego data.""" return self.ego_df['v'].var() def average_velocity(self): """ Calculate the average velocity of the ego data. Average velocity = total distance / total time """ self.average_v = self.ego_df['v'].mean() return self.average_v def stop_duration_and_count(self): """ Calculate stop duration and stop count based on the following: - Stops are detected when speed is <= STOP_SPEED_THRESHOLD - Stops should last more than STOP_TIME_THRESHOLD (in seconds) """ stop_time_list = self.ego_df[self.ego_df['v'] <= self.STOP_SPEED_THRESHOLD]['simTime'].values.tolist() stop_frame_list = self.ego_df[self.ego_df['v'] <= self.STOP_SPEED_THRESHOLD]['simFrame'].values.tolist() stop_frame_group = [] stop_time_group = [] sum_stop_time = 0 f1, t1 = stop_frame_list[0] if stop_frame_list else 0, stop_time_list[0] if stop_time_list else 0 for i in range(1, len(stop_frame_list)): if stop_frame_list[i] - stop_frame_list[i - 1] != 1: # Frame discontinuity f2, t2 = stop_frame_list[i - 1], stop_time_list[i - 1] # If stop is valid (frame gap >= FRAME_RANGE and duration > STOP_TIME_THRESHOLD) if f2 - f1 >= self.FRAME_RANGE: stop_frame_group.append((f1, f2)) stop_time_group.append((t1, t2)) sum_stop_time += (t2 - t1) self.stop_count += 1 # Update f1, t1 f1, t1 = stop_frame_list[i], stop_time_list[i] # Check last stop segment if len(stop_frame_list) > 0: f2, t2 = stop_frame_list[-1], stop_time_list[-1] if f2 - f1 >= self.FRAME_RANGE and f2 != self.ego_df['simFrame'].values[-1]: stop_frame_group.append((f1, f2)) stop_time_group.append((t1, t2)) sum_stop_time += (t2 - t1) self.stop_count += 1 # Calculate stop duration if stop count is not zero self.stop_duration = sum_stop_time / self.stop_count if self.stop_count != 0 else 0 return self.stop_duration def report_statistic(self): """Generate the statistics and report the results.""" efficient_result = { 'maxSpeed': self._max_speed(), 'deviationSpeed': self._deviation_speed(), 'averagedSpeed': self.average_velocity(), 'stopDuration': self.stop_duration_and_count() } # self.logger.info(f"Efficient metrics calculation completed. Results: {efficient_result}") evaluator = Score(self.data_processed.efficient_config) result = evaluator.evaluate(efficient_result) print("\n[高效性表现及评价结果]") return result