#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: xieguijin(xieguijin@china-icv.cn) @Data: 2023/07/23 @Last Modified: 2023/07/23 @Summary: Efficient metrics calculation """ from modules.lib.score import Score from modules.lib.log_manager import LogManager class Efficient: """高效性指标计算类""" def __init__(self, data_processed): """初始化高效性指标计算类 Args: data_processed: 预处理后的数据对象 """ self.logger = LogManager().get_logger() self.data_processed = data_processed self.df = data_processed.object_df.copy() self.ego_df = data_processed.ego_data.copy() # 配置参数 self.STOP_SPEED_THRESHOLD = 0.05 # 停车速度阈值 (m/s) self.STOP_TIME_THRESHOLD = 0.5 # 停车时间阈值 (秒) self.FRAME_RANGE = 13 # 停车帧数阈值 # 初始化结果变量 self.stop_count = 0 # 停车次数 self.stop_duration = 0 # 平均停车时长 self.average_v = 0 # 平均速度 def _max_speed(self): """计算最大速度 Returns: float: 最大速度 (m/s) """ return self.ego_df['v'].max() def _deviation_speed(self): """计算速度方差 Returns: float: 速度方差 """ return self.ego_df['v'].var() def average_velocity(self): """计算平均速度 Returns: float: 平均速度 (m/s) """ self.average_v = self.ego_df['v'].mean() return self.average_v def stop_duration_and_count(self): """计算停车次数和平均停车时长 Returns: float: 平均停车时长 (秒) """ # 获取速度低于阈值的时间和帧号 stop_mask = self.ego_df['v'] <= self.STOP_SPEED_THRESHOLD if not any(stop_mask): return 0 # 如果没有停车,直接返回0 stop_time_list = self.ego_df.loc[stop_mask, 'simTime'].values.tolist() stop_frame_list = self.ego_df.loc[stop_mask, 'simFrame'].values.tolist() if not stop_frame_list: return 0 # 防止空列表导致的索引错误 stop_frame_group = [] stop_time_group = [] sum_stop_time = 0 f1, t1 = stop_frame_list[0], stop_time_list[0] # 检测停车段 for i in range(1, len(stop_frame_list)): if stop_frame_list[i] - stop_frame_list[i - 1] != 1: # 帧不连续 f2, t2 = stop_frame_list[i - 1], stop_time_list[i - 1] # 如果停车有效(帧数差 >= FRAME_RANGE) if f2 - f1 >= self.FRAME_RANGE: stop_frame_group.append((f1, f2)) stop_time_group.append((t1, t2)) sum_stop_time += (t2 - t1) self.stop_count += 1 # 更新起始点 f1, t1 = stop_frame_list[i], stop_time_list[i] # 检查最后一段停车 if len(stop_frame_list) > 0: f2, t2 = stop_frame_list[-1], stop_time_list[-1] last_frame = self.ego_df['simFrame'].values[-1] # 确保不是因为数据结束导致的停车 if f2 - f1 >= self.FRAME_RANGE and f2 != last_frame: stop_frame_group.append((f1, f2)) stop_time_group.append((t1, t2)) sum_stop_time += (t2 - t1) self.stop_count += 1 # 计算平均停车时长 self.stop_duration = sum_stop_time / self.stop_count if self.stop_count > 0 else 0 self.logger.info(f"检测到停车次数: {self.stop_count}, 平均停车时长: {self.stop_duration:.2f}秒") return self.stop_duration def report_statistic(self): """生成统计报告 Returns: dict: 高效性评估结果 """ # 计算各项指标 max_speed_ms = self._max_speed() deviation_speed_ms = self._deviation_speed() average_speed_ms = self.average_velocity() # 将 m/s 转换为 km/h 用于评分 max_speed_kmh = max_speed_ms * 3.6 deviation_speed_kmh = deviation_speed_ms * 3.6 average_speed_kmh = average_speed_ms * 3.6 efficient_result = { 'maxSpeed': max_speed_kmh, # 转换为 km/h 'deviationSpeed': deviation_speed_kmh, # 转换为 km/h 'averagedSpeed': average_speed_kmh, # 转换为 km/h 'stopDuration': self.stop_duration_and_count() } self.logger.info(f"高效性指标计算完成,结果: {efficient_result}") # 评分 evaluator = Score(self.data_processed.efficient_config) result = evaluator.evaluate(efficient_result) print("\n[高效性表现及评价结果]") return result