comfort.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn), yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/06/25
  11. @Last Modified: 2025/04/25
  12. @Summary: Comfort metrics
  13. """
  14. import sys
  15. import math
  16. import pandas as pd
  17. import numpy as np
  18. import scipy.signal
  19. from pathlib import Path
  20. from typing import Dict, List, Any, Optional, Callable, Union, Tuple
  21. from modules.lib.score import Score
  22. from modules.lib.common import get_interpolation, get_frame_with_time
  23. from modules.lib import data_process
  24. from modules.lib.log_manager import LogManager
# Columns required from the preprocessed ego-vehicle dataframe.  Every
# comfort metric below operates on this subset (sliced by
# ComfortCalculator._initialize_data).  'lat_acc'/'lon_acc' are the
# vehicle-frame accelerations and 'speedH'/'accelH' the yaw rate /
# yaw acceleration; exact units come from the upstream data_process
# step — TODO confirm against data_process.py.
COMFORT_INFO = [
    "simTime",
    "simFrame",
    "speedX",
    "speedY",
    "accelX",
    "accelY",
    "curvHor",
    "lightMask",
    "v",
    "lat_acc",
    "lon_acc",
    "time_diff",
    "lon_acc_diff",
    "lon_acc_roc",
    "speedH",
    "accelH",
]
  43. # ----------------------
  44. # 独立指标计算函数
  45. # ----------------------
  46. def calculate_weaving(data_processed) -> dict:
  47. """计算蛇行指标"""
  48. comfort = ComfortCalculator(data_processed)
  49. zigzag_count = comfort.calculate_zigzag_count()
  50. return {"weaving": float(zigzag_count)}
  51. def calculate_shake(data_processed) -> dict:
  52. """计算晃动指标"""
  53. comfort = ComfortCalculator(data_processed)
  54. shake_count = comfort.calculate_shake_count()
  55. return {"shake": float(shake_count)}
  56. def calculate_cadence(data_processed) -> dict:
  57. """计算顿挫指标"""
  58. comfort = ComfortCalculator(data_processed)
  59. cadence_count = comfort.calculate_cadence_count()
  60. return {"cadence": float(cadence_count)}
  61. def calculate_slambrake(data_processed) -> dict:
  62. """计算急刹车指标"""
  63. comfort = ComfortCalculator(data_processed)
  64. slam_brake_count = comfort.calculate_slam_brake_count()
  65. return {"slamBrake": float(slam_brake_count)}
  66. def calculate_slamaccelerate(data_processed) -> dict:
  67. """计算急加速指标"""
  68. comfort = ComfortCalculator(data_processed)
  69. slam_accel_count = comfort.calculate_slam_accel_count()
  70. return {"slamAccelerate": float(slam_accel_count)}
# Decorator kept unchanged (behaviour preserved verbatim).
def peak_valley_decorator(method):
    """Drive *method* over consecutive yaw-rate peak/valley pairs.

    The wrapper locates all peaks and valleys of ``speedH`` in ``self.df``
    (via ``self._peak_valley_determination``) and invokes the wrapped
    *method* once for every pair that ``self._peak_valley_judgment``
    accepts, passing ``flag=True``.  When no peaks/valleys exist at all,
    *method* is invoked exactly once with dummy points and ``flag=False``
    so it can reset its state (e.g. ``_cal_zigzag_strength`` clears its
    strength list in that case).

    NOTE(review): the wrapper returns the undecorated ``method`` object
    rather than a computed value; all callers in this file ignore the
    return value, so this is harmless but surprising — confirm before
    relying on it.
    """
    def wrapper(self, *args, **kwargs):
        # Row indices of detected peaks/valleys, then their (time, speedH) pairs.
        peak_valley = self._peak_valley_determination(self.df)
        pv_list = self.df.loc[peak_valley, ['simTime', 'speedH']].values.tolist()
        if len(pv_list) != 0:
            flag = True
            p_last = pv_list[0]
            for i in range(1, len(pv_list)):
                p_curr = pv_list[i]
                if self._peak_valley_judgment(p_last, p_curr):
                    method(self, p_curr, p_last, flag, *args, **kwargs)
                else:
                    # p_last only advances when the pair does NOT qualify.
                    p_last = p_curr
            return method
        else:
            # No peaks/valleys: single call with flag=False and dummy points.
            flag = False
            p_curr = [0, 0]
            p_last = [0, 0]
            method(self, p_curr, p_last, flag, *args, **kwargs)
            return method
    return wrapper
  94. class ComfortRegistry:
  95. """舒适性指标注册器"""
  96. def __init__(self, data_processed):
  97. self.logger = LogManager().get_logger() # 获取全局日志实例
  98. self.data = data_processed
  99. self.comfort_config = data_processed.comfort_config["comfort"]
  100. self.metrics = self._extract_metrics(self.comfort_config)
  101. self._registry = self._build_registry()
  102. def _extract_metrics(self, config_node: dict) -> list:
  103. """DFS遍历提取指标"""
  104. metrics = []
  105. def _recurse(node):
  106. if isinstance(node, dict):
  107. if 'name' in node and not any(isinstance(v, dict) for v in node.values()):
  108. metrics.append(node['name'])
  109. for v in node.values():
  110. _recurse(v)
  111. _recurse(config_node)
  112. self.logger.info(f'评比的舒适性指标列表:{metrics}')
  113. return metrics
  114. def _build_registry(self) -> dict:
  115. """自动注册指标函数"""
  116. registry = {}
  117. for metric_name in self.metrics:
  118. func_name = f"calculate_{metric_name.lower()}"
  119. try:
  120. registry[metric_name] = globals()[func_name]
  121. except KeyError:
  122. self.logger.error(f"未实现指标函数: {func_name}")
  123. return registry
  124. def batch_execute(self) -> dict:
  125. """批量执行指标计算"""
  126. results = {}
  127. for name, func in self._registry.items():
  128. try:
  129. result = func(self.data)
  130. results.update(result)
  131. # 新增:将每个指标的结果写入日志
  132. self.logger.info(f'舒适性指标[{name}]计算结果: {result}')
  133. except Exception as e:
  134. self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
  135. results[name] = None
  136. self.logger.info(f'舒适性指标计算结果:{results}')
  137. return results
  138. class ComfortCalculator:
  139. """舒适性指标计算类 - 提供核心计算功能"""
  140. def __init__(self, data_processed):
  141. self.data_processed = data_processed
  142. self.logger = LogManager().get_logger()
  143. self.data = data_processed.ego_data
  144. self.ego_df = pd.DataFrame()
  145. self.discomfort_df = pd.DataFrame(columns=['start_time', 'end_time', 'start_frame', 'end_frame', 'type'])
  146. # 统计指标
  147. self.calculated_value = {
  148. 'weaving': 0,
  149. 'shake': 0,
  150. 'cadence': 0,
  151. 'slamBrake': 0,
  152. 'slamAccelerate': 0
  153. }
  154. self.time_list = self.data['simTime'].values.tolist()
  155. self.frame_list = self.data['simFrame'].values.tolist()
  156. self.zigzag_count = 0
  157. self.shake_count = 0
  158. self.cadence_count = 0
  159. self.slam_brake_count = 0
  160. self.slam_accel_count = 0
  161. self.zigzag_time_list = []
  162. self.zigzag_stre_list = []
  163. self._initialize_data()
  164. def _initialize_data(self):
  165. """初始化数据"""
  166. self.ego_df = self.data[COMFORT_INFO].copy()
  167. self.df = self.ego_df.reset_index(drop=True)
  168. self._prepare_comfort_parameters()
  169. def _prepare_comfort_parameters(self):
  170. """准备舒适性计算所需参数"""
  171. # 计算加减速阈值
  172. self.ego_df['ip_acc'] = self.ego_df['v'].apply(get_interpolation, point1=[18, 4], point2=[72, 2])
  173. self.ego_df['ip_dec'] = self.ego_df['v'].apply(get_interpolation, point1=[18, -5], point2=[72, -3.5])
  174. self.ego_df['slam_brake'] = (self.ego_df['lon_acc'] - self.ego_df['ip_dec']).apply(
  175. lambda x: 1 if x < 0 else 0)
  176. self.ego_df['slam_accel'] = (self.ego_df['lon_acc'] - self.ego_df['ip_acc']).apply(
  177. lambda x: 1 if x > 0 else 0)
  178. self.ego_df['cadence'] = self.ego_df.apply(
  179. lambda row: self._cadence_process_new(row['lon_acc'], row['ip_acc'], row['ip_dec']), axis=1)
  180. def _cal_cur_ego_path(self, row):
  181. """计算车辆轨迹曲率"""
  182. try:
  183. divide = (row['speedX'] ** 2 + row['speedY'] ** 2) ** (3 / 2)
  184. if not divide:
  185. res = None
  186. else:
  187. res = (row['speedX'] * row['accelY'] - row['speedY'] * row['accelX']) / divide
  188. except:
  189. res = None
  190. return res
  191. def _peak_valley_determination(self, df):
  192. """确定角速度的峰谷"""
  193. peaks, _ = scipy.signal.find_peaks(
  194. df['speedH'], height=2.3, distance=3,
  195. prominence=2.3, width=1)
  196. valleys, _ = scipy.signal.find_peaks(
  197. -df['speedH'], height=2.3, distance=3,
  198. prominence=2.3, width=1)
  199. return sorted(list(peaks) + list(valleys))
  200. def _peak_valley_judgment(self, p_last, p_curr, tw=100, avg=4.6):
  201. """判断峰谷是否满足蛇行条件"""
  202. t_diff = p_curr[0] - p_last[0]
  203. v_diff = abs(p_curr[1] - p_last[1])
  204. s = p_curr[1] * p_last[1]
  205. if t_diff < tw and v_diff > avg and s < 0:
  206. if [p_last[0], p_curr[0]] not in self.zigzag_time_list:
  207. self.zigzag_time_list.append([p_last[0], p_curr[0]])
  208. return True
  209. return False
  210. def _cadence_process_new(self, lon_acc, ip_acc, ip_dec):
  211. """处理顿挫数据"""
  212. if abs(lon_acc) < 1 or lon_acc > ip_acc or lon_acc < ip_dec:
  213. return np.nan
  214. elif abs(lon_acc) == 0:
  215. return 0
  216. elif lon_acc > 0 and lon_acc < ip_acc:
  217. return 1
  218. elif lon_acc < 0 and lon_acc > ip_dec:
  219. return -1
  220. else:
  221. return 0
  222. @peak_valley_decorator
  223. def _zigzag_count_func(self, p_curr, p_last, flag=True):
  224. """计算蛇行次数"""
  225. if flag:
  226. self.zigzag_count += 1
  227. else:
  228. self.zigzag_count += 0
  229. @peak_valley_decorator
  230. def _cal_zigzag_strength(self, p_curr, p_last, flag=True):
  231. """计算蛇行强度"""
  232. if flag:
  233. v_diff = abs(p_curr[1] - p_last[1])
  234. t_diff = p_curr[0] - p_last[0]
  235. if t_diff > 0:
  236. self.zigzag_stre_list.append(v_diff / t_diff) # 平均角加速度
  237. else:
  238. self.zigzag_stre_list = []
  239. def _get_zigzag_times(self):
  240. """获取所有蛇行时间点"""
  241. all_times = []
  242. for time_range in self.zigzag_time_list:
  243. start, end = time_range
  244. # 获取这个时间范围内的所有时间点
  245. times_in_range = self.ego_df[(self.ego_df['simTime'] >= start) &
  246. (self.ego_df['simTime'] <= end)]['simTime'].tolist()
  247. all_times.extend(times_in_range)
  248. return all_times
  249. def calculate_zigzag_count(self):
  250. """计算蛇行指标"""
  251. self._zigzag_count_func()
  252. return self.zigzag_count
  253. def calculate_shake_count(self):
  254. """计算晃动指标"""
  255. self._shake_detector()
  256. return self.shake_count
  257. def calculate_cadence_count(self):
  258. """计算顿挫指标"""
  259. self._cadence_detector()
  260. return self.cadence_count
  261. def calculate_slam_brake_count(self):
  262. """计算急刹车指标"""
  263. self._slam_brake_detector()
  264. return self.slam_brake_count
  265. def calculate_slam_accel_count(self):
  266. """计算急加速指标"""
  267. self._slam_accel_detector()
  268. return self.slam_accel_count
  269. def _shake_detector(self, T_diff=0.5):
  270. """检测晃动事件 - 改进版本(不使用车辆轨迹曲率)"""
  271. # lat_acc已经是车辆坐标系下的横向加速度,由data_process.py计算
  272. time_list = []
  273. frame_list = []
  274. # 复制数据以避免修改原始数据
  275. df = self.ego_df.copy()
  276. # 1. 计算横向加速度变化率
  277. df['lat_acc_rate'] = df['lat_acc'].diff() / df['simTime'].diff()
  278. # 2. 计算横摆角速度变化率
  279. df['speedH_rate'] = df['speedH'].diff() / df['simTime'].diff()
  280. # 3. 计算横摆角速度的短期变化特性
  281. window_size = 5 # 5帧窗口
  282. df['speedH_std'] = df['speedH'].rolling(window=window_size, min_periods=2).std()
  283. # 4. 基于车速的动态阈值
  284. v0 = 20 * 5/18 # ≈5.56 m/s
  285. # 递减系数
  286. k = 0.008 * 3.6 # =0.0288 per m/s
  287. df['lat_acc_threshold'] = df['v'].apply(
  288. lambda speed: max(
  289. 1.0, # 下限 1.0 m/s²
  290. min(
  291. 1.8, # 上限 1.8 m/s²
  292. 1.8 - k * (speed - v0) # 线性递减
  293. )
  294. )
  295. )
  296. df['speedH_threshold'] = df['v'].apply(
  297. lambda speed: max(1.5, min(3.0, 2.0 * (1 + (speed - 20) / 60)))
  298. )
  299. # 将计算好的阈值和中间变量保存到self.ego_df中,供其他函数使用
  300. self.ego_df['lat_acc_threshold'] = df['lat_acc_threshold']
  301. self.ego_df['speedH_threshold'] = df['speedH_threshold']
  302. self.ego_df['lat_acc_rate'] = df['lat_acc_rate']
  303. self.ego_df['speedH_rate'] = df['speedH_rate']
  304. self.ego_df['speedH_std'] = df['speedH_std']
  305. # 5. 综合判断晃动条件
  306. # 条件A: 横向加速度超过阈值
  307. condition_A = df['lat_acc'].abs() > df['lat_acc_threshold']
  308. # 条件B: 横向加速度变化率超过阈值
  309. lat_acc_rate_threshold = 0.5 # 横向加速度变化率阈值 (m/s³)
  310. condition_B = df['lat_acc_rate'].abs() > lat_acc_rate_threshold
  311. # 条件C: 横摆角速度有明显变化但不呈现周期性
  312. condition_C = (df['speedH_std'] > df['speedH_threshold']) & (~df['simTime'].isin(self._get_zigzag_times()))
  313. # 综合条件: 满足条件A,且满足条件B或条件C
  314. shake_condition = condition_A & (condition_B | condition_C)
  315. # 筛选满足条件的数据
  316. shake_df = df[shake_condition].copy()
  317. # 按照连续帧号分组,确保只有连续帧超过阈值的才被认为是晃动
  318. if not shake_df.empty:
  319. shake_df['frame_diff'] = shake_df['simFrame'].diff().fillna(0)
  320. shake_df['group'] = (shake_df['frame_diff'] > T_diff).cumsum()
  321. # 分组统计
  322. shake_groups = shake_df.groupby('group')
  323. for _, group in shake_groups:
  324. if len(group) >= 2: # 至少2帧才算一次晃动
  325. time_list.extend(group['simTime'].values)
  326. frame_list.extend(group['simFrame'].values)
  327. self.shake_count += 1
  328. # 分组处理
  329. TIME_RANGE = 1
  330. t_list = time_list
  331. f_list = frame_list
  332. group_time = []
  333. group_frame = []
  334. sub_group_time = []
  335. sub_group_frame = []
  336. if len(f_list) > 0:
  337. for i in range(len(f_list)):
  338. if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE:
  339. sub_group_time.append(t_list[i])
  340. sub_group_frame.append(f_list[i])
  341. else:
  342. group_time.append(sub_group_time)
  343. group_frame.append(sub_group_frame)
  344. sub_group_time = [t_list[i]]
  345. sub_group_frame = [f_list[i]]
  346. group_time.append(sub_group_time)
  347. group_frame.append(sub_group_frame)
  348. # 输出图表值
  349. shake_time = [[g[0], g[-1]] for g in group_time]
  350. shake_frame = [[g[0], g[-1]] for g in group_frame]
  351. self.shake_count = len(shake_time)
  352. if shake_time:
  353. time_df = pd.DataFrame(shake_time, columns=['start_time', 'end_time'])
  354. frame_df = pd.DataFrame(shake_frame, columns=['start_frame', 'end_frame'])
  355. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  356. discomfort_df['type'] = 'shake'
  357. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  358. return time_list
  359. def _cadence_detector(self):
  360. """顿挫检测器"""
  361. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'cadence']].copy()
  362. time_list = data['simTime'].values.tolist()
  363. data = data[data['cadence'] != np.nan]
  364. data['cadence_diff'] = data['cadence'].diff()
  365. data.dropna(subset='cadence_diff', inplace=True)
  366. data = data[data['cadence_diff'] != 0]
  367. t_list = data['simTime'].values.tolist()
  368. f_list = data['simFrame'].values.tolist()
  369. TIME_RANGE = 1
  370. group_time = []
  371. group_frame = []
  372. sub_group_time = []
  373. sub_group_frame = []
  374. for i in range(len(f_list)):
  375. if not sub_group_time or t_list[i] - t_list[i - 1] <= TIME_RANGE: # 特征点相邻一秒内的,算作同一组顿挫
  376. sub_group_time.append(t_list[i])
  377. sub_group_frame.append(f_list[i])
  378. else:
  379. group_time.append(sub_group_time)
  380. group_frame.append(sub_group_frame)
  381. sub_group_time = [t_list[i]]
  382. sub_group_frame = [f_list[i]]
  383. group_time.append(sub_group_time)
  384. group_frame.append(sub_group_frame)
  385. group_time = [g for g in group_time if len(g) >= 1] # 有一次特征点则算作一次顿挫
  386. group_frame = [g for g in group_frame if len(g) >= 1]
  387. # 输出图表值
  388. cadence_time = [[g[0], g[-1]] for g in group_time]
  389. cadence_frame = [[g[0], g[-1]] for g in group_frame]
  390. if cadence_time:
  391. time_df = pd.DataFrame(cadence_time, columns=['start_time', 'end_time'])
  392. frame_df = pd.DataFrame(cadence_frame, columns=['start_frame', 'end_frame'])
  393. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  394. discomfort_df['type'] = 'cadence'
  395. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  396. # 将顿挫组的起始时间为组重新统计时间
  397. cadence_time_list = [time for pair in cadence_time for time in time_list if pair[0] <= time <= pair[1]]
  398. stre_list = []
  399. freq_list = []
  400. for g in group_time:
  401. # calculate strength
  402. g_df = data[data['simTime'].isin(g)]
  403. strength = g_df['lon_acc'].abs().mean()
  404. stre_list.append(strength)
  405. # calculate frequency
  406. cnt = len(g)
  407. t_start = g_df['simTime'].iloc[0]
  408. t_end = g_df['simTime'].iloc[-1]
  409. t_delta = t_end - t_start
  410. frequency = cnt / t_delta
  411. freq_list.append(frequency)
  412. self.cadence_count = len(freq_list)
  413. cadence_stre = sum(stre_list) / len(stre_list) if stre_list else 0
  414. return cadence_time_list
  415. def _slam_brake_detector(self):
  416. """急刹车检测器"""
  417. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'lon_acc_roc', 'ip_dec', 'slam_brake']].copy()
  418. res_df = data[data['slam_brake'] == 1]
  419. t_list = res_df['simTime'].values
  420. f_list = res_df['simFrame'].values.tolist()
  421. TIME_RANGE = 1
  422. group_time = []
  423. group_frame = []
  424. sub_group_time = []
  425. sub_group_frame = []
  426. for i in range(len(f_list)):
  427. if not sub_group_time or f_list[i] - f_list[i - 1] <= TIME_RANGE: # 连续帧的算作同一组急刹
  428. sub_group_time.append(t_list[i])
  429. sub_group_frame.append(f_list[i])
  430. else:
  431. group_time.append(sub_group_time)
  432. group_frame.append(sub_group_frame)
  433. sub_group_time = [t_list[i]]
  434. sub_group_frame = [f_list[i]]
  435. group_time.append(sub_group_time)
  436. group_frame.append(sub_group_frame)
  437. group_time = [g for g in group_time if len(g) >= 2] # 达到两帧算作一次急刹
  438. group_frame = [g for g in group_frame if len(g) >= 2]
  439. # 输出图表值
  440. slam_brake_time = [[g[0], g[-1]] for g in group_time]
  441. slam_brake_frame = [[g[0], g[-1]] for g in group_frame]
  442. if slam_brake_time:
  443. time_df = pd.DataFrame(slam_brake_time, columns=['start_time', 'end_time'])
  444. frame_df = pd.DataFrame(slam_brake_frame, columns=['start_frame', 'end_frame'])
  445. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  446. discomfort_df['type'] = 'slam_brake'
  447. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  448. time_list = [element for sublist in group_time for element in sublist]
  449. self.slam_brake_count = len(group_time)
  450. return time_list
  451. def _slam_accel_detector(self):
  452. """急加速检测器"""
  453. data = self.ego_df[['simTime', 'simFrame', 'lon_acc', 'ip_acc', 'slam_accel']].copy()
  454. res_df = data.loc[data['slam_accel'] == 1]
  455. t_list = res_df['simTime'].values
  456. f_list = res_df['simFrame'].values.tolist()
  457. group_time = []
  458. group_frame = []
  459. sub_group_time = []
  460. sub_group_frame = []
  461. for i in range(len(f_list)):
  462. if not group_time or f_list[i] - f_list[i - 1] <= 1: # 连续帧的算作同一组急加速
  463. sub_group_time.append(t_list[i])
  464. sub_group_frame.append(f_list[i])
  465. else:
  466. group_time.append(sub_group_time)
  467. group_frame.append(sub_group_frame)
  468. sub_group_time = [t_list[i]]
  469. sub_group_frame = [f_list[i]]
  470. group_time.append(sub_group_time)
  471. group_frame.append(sub_group_frame)
  472. group_time = [g for g in group_time if len(g) >= 2]
  473. group_frame = [g for g in group_frame if len(g) >= 2]
  474. # 输出图表值
  475. slam_accel_time = [[g[0], g[-1]] for g in group_time]
  476. slam_accel_frame = [[g[0], g[-1]] for g in group_frame]
  477. if slam_accel_time:
  478. time_df = pd.DataFrame(slam_accel_time, columns=['start_time', 'end_time'])
  479. frame_df = pd.DataFrame(slam_accel_frame, columns=['start_frame', 'end_frame'])
  480. discomfort_df = pd.concat([time_df, frame_df], axis=1)
  481. discomfort_df['type'] = 'slam_accel'
  482. self.discomfort_df = pd.concat([self.discomfort_df, discomfort_df], ignore_index=True)
  483. time_list = [element for sublist in group_time for element in sublist]
  484. self.slam_accel_count = len(group_time)
  485. return time_list
  486. class ComfortManager:
  487. """舒适性指标计算主类"""
  488. def __init__(self, data_processed):
  489. self.data = data_processed
  490. self.logger = LogManager().get_logger()
  491. self.registry = ComfortRegistry(self.data)
  492. def report_statistic(self):
  493. """生成舒适性评分报告"""
  494. comfort_result = self.registry.batch_execute()
  495. return comfort_result
if __name__ == '__main__':
    # Manual smoke test: evaluate the comfort metrics for one sample case.
    case_name = 'ICA'
    mode_label = 'PGVIL'
    # DataPreprocessing loads and prepares the case data (project-specific).
    data = data_process.DataPreprocessing(case_name, mode_label)
    comfort_instance = ComfortManager(data)
    try:
        comfort_result = comfort_instance.report_statistic()
        result = {'comfort': comfort_result}
        print(result)
    except Exception as e:
        print(f"An error occurred in Comfort.report_statistic: {e}")