#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2024 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: zhanghaiwen @Data: 2024/12/23 @Last Modified: 2024/12/23 @Summary: Efficient metrics calculation """ from modules.lib.score import Score from modules.lib.log_manager import LogManager import numpy as np from typing import Dict, Tuple, Optional, Callable, Any import pandas as pd class Efficient: """高效性指标计算类""" def __init__(self, data_processed): """初始化高效性指标计算类 Args: data_processed: 预处理后的数据对象 """ self.logger = LogManager().get_logger() self.data_processed = data_processed self.df = data_processed.object_df.copy() # 浅拷贝 self.ego_df = data_processed.ego_data.copy() # 浅拷贝 # 配置参数 self.STOP_SPEED_THRESHOLD = 0.05 # 停车速度阈值 (m/s) self.STOP_TIME_THRESHOLD = 0.5 # 停车时间阈值 (秒) self.FRAME_RANGE = 13 # 停车帧数阈值 # 初始化结果变量 self.stop_count = 0 # 停车次数 self.stop_duration = 0 # 平均停车时长 self.average_v = 0 # 平均速度 # 统计指标结果字典 self.calculated_value = { 'maxSpeed': 0, 'deviationSpeed': 0, 'averagedSpeed': 0, 'stopDuration': 0, 'speedUtilizationRatio': 0, 'accelerationSmoothness': 0 # 添加新指标的默认值 } def _max_speed(self): """计算最大速度 Returns: float: 最大速度 (m/s) """ max_speed = self.ego_df['v'].max() * 3.6 # 转换为 km/h self.calculated_value['maxSpeed'] = max_speed return max_speed def _deviation_speed(self): """计算速度方差 Returns: float: 速度方差 """ deviation = self.ego_df['v'].var() * 3.6 # 转换为 km/h self.calculated_value['deviationSpeed'] = deviation return deviation def average_velocity(self): """计算平均速度 Returns: float: 平均速度 (km/h) """ self.average_v = self.ego_df['v'].mean() * 3.6 # 转换为 km/h self.calculated_value['averagedSpeed'] = self.average_v return self.average_v def stop_duration_and_count(self): """计算停车次数和平均停车时长 Returns: float: 平均停车时长 (秒) """ # 获取速度低于阈值的时间和帧号 stop_mask = self.ego_df['v'] <= self.STOP_SPEED_THRESHOLD if not any(stop_mask): self.calculated_value['stopDuration'] = 0 return 0 # 如果没有停车,直接返回0 stop_time_list = self.ego_df.loc[stop_mask, 'simTime'].values.tolist() stop_frame_list = self.ego_df.loc[stop_mask, 'simFrame'].values.tolist() if not stop_frame_list: return 0 # 防止空列表导致的索引错误 stop_frame_group = [] stop_time_group = [] sum_stop_time = 0 f1, t1 = stop_frame_list[0], stop_time_list[0] # 检测停车段 for i in range(1, len(stop_frame_list)): if stop_frame_list[i] - stop_frame_list[i - 1] != 1: # 帧不连续 f2, t2 = stop_frame_list[i - 1], stop_time_list[i - 1] # 如果停车有效(帧数差 >= FRAME_RANGE) if f2 - f1 >= self.FRAME_RANGE: stop_frame_group.append((f1, f2)) stop_time_group.append((t1, t2)) sum_stop_time += (t2 - t1) self.stop_count += 1 # 更新起始点 f1, t1 = stop_frame_list[i], stop_time_list[i] # 检查最后一段停车 if len(stop_frame_list) > 0: f2, t2 = stop_frame_list[-1], stop_time_list[-1] last_frame = self.ego_df['simFrame'].values[-1] # 确保不是因为数据结束导致的停车 if f2 - f1 >= self.FRAME_RANGE and f2 != last_frame: stop_frame_group.append((f1, f2)) stop_time_group.append((t1, t2)) sum_stop_time += (t2 - t1) self.stop_count += 1 # 计算平均停车时长 self.stop_duration = sum_stop_time / self.stop_count if self.stop_count > 0 else 0 self.calculated_value['stopDuration'] = self.stop_duration self.logger.info(f"检测到停车次数: {self.stop_count}, 平均停车时长: {self.stop_duration:.2f}秒") return self.stop_duration def speed_utilization_ratio(self, default_speed_limit=60.0): """计算速度利用率 速度利用率度量车辆实际速度与道路限速之间的比率, 反映车辆对道路速度资源的利用程度。 计算公式: R_v = v_actual / v_limit Args: default_speed_limit: 默认道路限速 (km/h),当无法获取实际限速时使用 Returns: float: 速度利用率 (0-1之间的比率) """ # 获取车辆速度数据 (m/s) speeds = self.ego_df['v'].values # 尝试从数据中获取道路限速信息 # 首先检查road_speed_max列,其次检查speedLimit列,最后使用默认值 if 'road_speed_max' in self.ego_df.columns: speed_limits = self.ego_df['road_speed_max'].values self.logger.info("使用road_speed_max列作为道路限速信息") elif 'speedLimit' in self.ego_df.columns: speed_limits = self.ego_df['speedLimit'].values self.logger.info("使用speedLimit列作为道路限速信息") else: # 默认限速转换为 m/s default_limit_ms = default_speed_limit / 3.6 speed_limits = np.full_like(speeds, default_limit_ms) self.logger.info(f"未找到道路限速信息,使用默认限速: {default_speed_limit} km/h") # 确保限速值为m/s单位,如果数据是km/h需要转换 # 假设如果限速值大于30,则认为是km/h单位,需要转换为m/s if np.mean(speed_limits) > 30: speed_limits = speed_limits / 3.6 self.logger.info("将限速单位从km/h转换为m/s") # 计算每一帧的速度利用率 ratios = np.divide(speeds, speed_limits, out=np.zeros_like(speeds), where=speed_limits!=0) # 限制比率不超过1(超速按1计算) ratios = np.minimum(ratios, 1.0) # 计算平均速度利用率 avg_ratio = np.mean(ratios) self.calculated_value['speedUtilizationRatio'] = avg_ratio self.logger.info(f"速度利用率(Speed Utilization Ratio): {avg_ratio:.4f}") return avg_ratio # ---------------------- # 基础指标计算函数 # ---------------------- def maxSpeed(data_processed) -> dict: """计算最大速度""" efficient = Efficient(data_processed) max_speed = efficient._max_speed() return {"maxSpeed": float(max_speed)} def deviationSpeed(data_processed) -> dict: """计算速度方差""" efficient = Efficient(data_processed) deviation = efficient._deviation_speed() return {"deviationSpeed": float(deviation)} def averagedSpeed(data_processed) -> dict: """计算平均速度""" efficient = Efficient(data_processed) avg_speed = efficient.average_velocity() return {"averagedSpeed": float(avg_speed)} def stopDuration(data_processed) -> dict: """计算停车持续时间和次数""" efficient = Efficient(data_processed) stop_duration = efficient.stop_duration_and_count() return {"stopDuration": float(stop_duration)} def speedUtilizationRatio(data_processed) -> dict: """计算速度利用率""" efficient = Efficient(data_processed) ratio = efficient.speed_utilization_ratio() return {"speedUtilizationRatio": float(ratio)} def acceleration_smoothness(data_processed) -> dict: """计算加速度平稳度""" efficient = Efficient(data_processed) smoothness = efficient.acceleration_smoothness() return {"accelerationSmoothness": float(smoothness)} class EfficientRegistry: """高效性指标注册器""" def __init__(self, data_processed): self.logger = LogManager().get_logger() # 获取全局日志实例 self.data = data_processed self.eff_config = data_processed.efficient_config["efficient"] self.metrics = self._extract_metrics(self.eff_config) self._registry = self._build_registry() def _extract_metrics(self, config_node: dict) -> list: """DFS遍历提取指标""" metrics = [] def _recurse(node): if isinstance(node, dict): if 'name' in node and not any(isinstance(v, dict) for v in node.values()): metrics.append(node['name']) for v in node.values(): _recurse(v) _recurse(config_node) self.logger.info(f'评比的高效性指标列表:{metrics}') return metrics def _build_registry(self) -> dict: """自动注册指标函数""" registry = {} for metric_name in self.metrics: try: registry[metric_name] = globals()[metric_name] except KeyError: self.logger.error(f"未实现指标函数: {metric_name}") return registry def batch_execute(self) -> dict: """批量执行指标计算""" results = {} for name, func in self._registry.items(): try: result = func(self.data) results.update(result) # 新增:将每个指标的结果写入日志 self.logger.info(f'高效性指标[{name}]计算结果: {result}') except Exception as e: self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True) results[name] = None self.logger.info(f'高效性指标计算结果:{results}') return results class EfficientManager: """高效性指标管理类""" def __init__(self, data_processed): self.data = data_processed self.efficient = EfficientRegistry(self.data) def report_statistic(self): """Generate the statistics and report the results.""" # 使用注册表批量执行指标计算 efficient_result = self.efficient.batch_execute() return efficient_result def acceleration_smoothness(self): """计算加速度平稳度 加速度平稳度用以衡量车辆加减速过程的平滑程度, 通过计算加速度序列的波动程度(标准差)来评估。 平稳度指标定义为 1-σ_a/a_max(归一化后靠近1代表加速度更稳定)。 Returns: float: 加速度平稳度 (0-1之间的比率,越接近1表示越平稳) """ # 获取加速度数据 # 优先使用车辆坐标系下的加速度数据 if 'lon_acc_vehicle' in self.ego_df.columns and 'lat_acc_vehicle' in self.ego_df.columns: # 使用车辆坐标系下的加速度计算合成加速度 lon_acc = self.ego_df['lon_acc_vehicle'].values lat_acc = self.ego_df['lat_acc_vehicle'].values accel_magnitude = np.sqrt(lon_acc**2 + lat_acc**2) self.logger.info("使用车辆坐标系下的加速度计算合成加速度") elif 'accelX' in self.ego_df.columns and 'accelY' in self.ego_df.columns: # 计算合成加速度(考虑X和Y方向) accel_x = self.ego_df['accelX'].values accel_y = self.ego_df['accelY'].values accel_magnitude = np.sqrt(accel_x**2 + accel_y**2) self.logger.info("使用accelX和accelY计算合成加速度") else: # 从速度差分计算加速度 velocity = self.ego_df['v'].values time_diff = self.ego_df['simTime'].diff().fillna(0).values # 避免除以零 time_diff[time_diff == 0] = 1e-6 accel_magnitude = np.abs(np.diff(velocity, prepend=velocity[0]) / time_diff) self.logger.info("从速度差分计算加速度") # 过滤掉异常值(可选) # 使用3倍标准差作为阈值 mean_accel = np.mean(accel_magnitude) std_accel = np.std(accel_magnitude) threshold = mean_accel + 3 * std_accel filtered_accel = accel_magnitude[accel_magnitude <= threshold] # 如果过滤后数据太少,则使用原始数据 if len(filtered_accel) < len(accel_magnitude) * 0.8: filtered_accel = accel_magnitude self.logger.info("过滤后数据太少,使用原始加速度数据") else: self.logger.info(f"过滤掉 {len(accel_magnitude) - len(filtered_accel)} 个异常加速度值") # 计算加速度标准差 accel_std = np.std(filtered_accel) # 计算最大加速度(使用95百分位数以避免极端值影响) accel_max = np.percentile(filtered_accel, 95) # 防止除以零 if accel_max < 0.001: accel_max = 0.001 # 计算平稳度指标: 1 - σ_a/a_max smoothness = 1.0 - (accel_std / accel_max) # 限制在0-1范围内 smoothness = np.clip(smoothness, 0.0, 1.0) self.calculated_value['accelerationSmoothness'] = smoothness self.logger.info(f"加速度标准差: {accel_std:.4f} m/s²") self.logger.info(f"加速度最大值(95百分位): {accel_max:.4f} m/s²") self.logger.info(f"加速度平稳度(Acceleration Smoothness): {smoothness:.4f}") return smoothness