123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
- @Data: 2023/06/25
- @Last Modified: 2025/05/20
- @Summary: Chart generation utilities for metrics visualization
- """
- import os
- import numpy as np
- import pandas as pd
- import matplotlib
- matplotlib.use('Agg') # 使用非图形界面的后端
- import matplotlib.pyplot as plt
- from typing import Optional, Dict, List, Any, Union
- from pathlib import Path
- from modules.lib.log_manager import LogManager
- def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
- """
- Generate chart data for function metrics
-
- Args:
- function_calculator: FunctionCalculator instance
- metric_name: Metric name
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 确保输出目录存在
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- else:
- output_dir = os.getcwd()
-
- # 根据指标名称选择不同的图表生成方法
- if metric_name.lower() == 'latestwarningdistance_ttc_lst':
- return generate_warning_ttc_chart(function_calculator, output_dir)
- else:
- logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
- return None
-
- except Exception as e:
- logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
- return None
- def generate_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC warning chart with data visualization.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- ego_df = function_calculator.ego_data.copy()
- scenario_name = function_calculator.data.function_config["function"]["scenario"]["name"]
- correctwarning = scenario_sign_dict[scenario_name]
- warning_dist = calculate_distance(ego_df, correctwarning)
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- if warning_dist.empty:
- logger.warning("Cannot generate TTC warning chart: empty data")
- return None
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- # 保存 CSV 数据
- csv_filename = os.path.join(output_dir, f"warning_ttc_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df['simTime'],
- 'warning_distance': warning_dist,
- 'warning_speed': warning_speed,
- 'ttc': warning_dist / warning_speed
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Warning TTC data saved to: {csv_filename}")
- # 从 CSV 读取数据
- df = pd.read_csv(csv_filename)
- # 创建图表
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:预警距离
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Distance (m)')
- ax1.set_title('Warning Distance Over Time')
- ax1.grid(True)
- ax1.legend()
- # 图 2:相对速度
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Speed (m/s)')
- ax2.set_title('Relative Speed Over Time')
- ax2.grid(True)
- ax2.legend()
- # 图 3:TTC
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('TTC (s)')
- ax3.set_title('Time To Collision (TTC) Over Time')
- ax3.grid(True)
- ax3.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"warning_ttc_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Warning TTC chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate warning TTC chart: {str(e)}", exc_info=True)
- return None
- def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
- """
- Generate chart data for comfort metrics
-
- Args:
- comfort_calculator: ComfortCalculator instance
- metric_name: Metric name
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 确保输出目录存在
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- else:
- output_dir = os.getcwd()
-
- # 根据指标名称选择不同的图表生成方法
- if metric_name.lower() == 'shake':
- return generate_shake_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'zigzag':
- return generate_zigzag_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'cadence':
- return generate_cadence_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'slambrake':
- return generate_slam_brake_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'slamaccelerate':
- return generate_slam_accelerate_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'vdv':
- return generate_vdv_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'ava_vav':
- return generate_ava_vav_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'msdv':
- return generate_msdv_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'motionsickness':
- return generate_motion_sickness_chart(comfort_calculator, output_dir)
- else:
- logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
- return None
-
- except Exception as e:
- logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
- return None
- def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate shake metric chart with orange background for shake events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- shake_events = comfort_calculator.shake_events
- if df.empty:
- logger.warning("Cannot generate shake chart: empty data")
- return None
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"shake_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lat_acc': df['lat_acc'],
- 'lat_acc_rate': df['lat_acc_rate'],
- 'speedH_std': df['speedH_std'],
- 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None]*len(df))),
- 'lat_acc_rate_threshold': 0.5,
- 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None]*len(df))),
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Shake data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- import matplotlib.pyplot as plt
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:横向加速度
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
- if 'lat_acc_threshold' in df.columns:
- ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold')
- for idx, event in enumerate(shake_events):
- label = 'Shake Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Lateral Acceleration (m/s²)')
- ax1.set_title('Shake Event Detection - Lateral Acceleration')
- ax1.grid(True)
- ax1.legend()
- # 图 2:lat_acc_rate
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate')
- ax2.axhline(
- y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold'
- )
- for idx, event in enumerate(shake_events):
- label = 'Shake Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Angular Velocity (m/s³)')
- ax2.set_title('Shake Event Detection - lat_acc_rate')
- ax2.grid(True)
- ax2.legend()
- # 图 3:speedH_std
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std')
- if 'speedH_std_threshold' in df.columns:
- ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold')
- for idx, event in enumerate(shake_events):
- label = 'Shake Event' if idx == 0 else None
- ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('Angular Velocity (deg/s)')
- ax3.set_title('Shake Event Detection - speedH_std')
- ax3.grid(True)
- ax3.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"shake_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Shake chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True)
- return None
- def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate zigzag metric chart with orange background for zigzag events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- zigzag_events = comfort_calculator.discomfort_df[
- comfort_calculator.discomfort_df['type'] == 'zigzag'
- ].copy()
- if df.empty:
- logger.warning("Cannot generate zigzag chart: empty data")
- return None
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"zigzag_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'speedH': df['speedH'],
- 'posH': df['posH'],
- 'min_speedH_threshold': -2.3, # 可替换为动态阈值
- 'max_speedH_threshold': 2.3
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Zigzag data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- import matplotlib.pyplot as plt
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # ===== 子图1:Yaw Rate =====
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
- # 添加 speedH 上下限阈值线
- ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)')
- ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)')
- # 添加橙色背景:Zigzag Events
- for idx, event in zigzag_events.iterrows():
- label = 'Zigzag Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Angular Velocity (deg/s)')
- ax1.set_title('Zigzag Event Detection - Yaw Rate')
- ax1.grid(True)
- ax1.legend(loc='upper left')
- # ===== 子图2:Yaw Angle =====
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw')
- # 添加橙色背景:Zigzag Events
- for idx, event in zigzag_events.iterrows():
- label = 'Zigzag Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Yaw (deg)')
- ax2.set_title('Zigzag Event Detection - Yaw Angle')
- ax2.grid(True)
- ax2.legend(loc='upper left')
- # 保存图像
- chart_filename = os.path.join(output_dir, f"zigzag_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Zigzag chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True)
- return None
-
- def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate cadence metric chart with orange background for cadence events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy()
-
- if df.empty:
- logger.warning("Cannot generate cadence chart: empty data")
- return None
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"cadence_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lon_acc': df['lon_acc'],
- 'v': df['v'],
- 'ip_acc': df.get('ip_acc', pd.Series([None]*len(df))),
- 'ip_dec': df.get('ip_dec', pd.Series([None]*len(df)))
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Cadence data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 创建图表(第三步)
- import matplotlib.pyplot as plt
- plt.figure(figsize=(12, 8), constrained_layout=True)
-
- # 图 1:纵向加速度
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
- if 'ip_acc' in df.columns and 'ip_dec' in df.columns:
- ax1.plot(df['simTime'], df['ip_acc'], 'r--', label='Acceleration Threshold')
- ax1.plot(df['simTime'], df['ip_dec'], 'g--', label='Deceleration Threshold')
-
- # 添加橙色背景标识顿挫事件
- for idx, event in cadence_events.iterrows():
- label = 'Cadence Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
- ax1.set_title('Cadence Event Detection - Longitudinal Acceleration')
- ax1.grid(True)
- ax1.legend()
-
- # 图 2:速度
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
-
- # 添加橙色背景标识顿挫事件
- for idx, event in cadence_events.iterrows():
- label = 'Cadence Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Velocity (m/s)')
- ax2.set_title('Cadence Event Detection - Vehicle Speed')
- ax2.grid(True)
- ax2.legend()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"cadence_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- logger.info(f"Cadence chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True)
- return None
- def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate slam brake metric chart with orange background for slam brake events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- slam_brake_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy()
-
- if df.empty:
- logger.warning("Cannot generate slam brake chart: empty data")
- return None
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"slam_brake_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lon_acc': df['lon_acc'],
- 'v': df['v'],
- 'min_threshold': df.get('ip_dec', pd.Series([None]*len(df))),
- 'max_threshold': 0.0
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Slam brake data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8), constrained_layout=True)
-
- # 图 1:纵向加速度
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
- if 'min_threshold' in df.columns:
- ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Deceleration Threshold')
-
- # 添加橙色背景标识急刹车事件
- for idx, event in slam_brake_events.iterrows():
- label = 'Slam Brake Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
- ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration')
- ax1.grid(True)
- ax1.legend()
-
- # 图 2:速度
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
-
- # 添加橙色背景标识急刹车事件
- for idx, event in slam_brake_events.iterrows():
- label = 'Slam Brake Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Velocity (m/s)')
- ax2.set_title('Slam Brake Event Detection - Vehicle Speed')
- ax2.grid(True)
- ax2.legend()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"slam_brake_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- logger.info(f"Slam brake chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True)
- return None
- def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate slam accelerate metric chart with orange background for slam accelerate events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- slam_accel_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'slam_accel'].copy()
-
- if df.empty:
- logger.warning("Cannot generate slam accelerate chart: empty data")
- return None
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"slam_accel_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lon_acc': df['lon_acc'],
- 'v': df['v'],
- 'min_threshold': 0.0,
- 'max_threshold': df.get('ip_acc', pd.Series([None]*len(df)))
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Slam accelerate data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8), constrained_layout=True)
-
- # 图 1:纵向加速度
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
- if 'max_threshold' in df.columns:
- ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Acceleration Threshold')
-
- # 添加橙色背景标识急加速事件
- for idx, event in slam_accel_events.iterrows():
- label = 'Slam Accelerate Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
- ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration')
- ax1.grid(True)
- ax1.legend()
-
- # 图 2:速度
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
-
- # 添加橙色背景标识急加速事件
- for idx, event in slam_accel_events.iterrows():
- label = 'Slam Accelerate Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Velocity (m/s)')
- ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed')
- ax2.grid(True)
- ax2.legend()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"slam_accel_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- logger.info(f"Slam accelerate chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True)
- return None
- def get_metric_thresholds(calculator, metric_name: str) -> dict:
- """
- 从配置文件中获取指标的阈值
-
- Args:
- calculator: Calculator instance (SafetyCalculator or ComfortCalculator)
- metric_name: 指标名称
-
- Returns:
- dict: 包含min和max阈值的字典
- """
- logger = LogManager().get_logger()
- thresholds = {"min": None, "max": None}
-
- try:
- # 根据计算器类型获取配置
- if hasattr(calculator, 'data_processed'):
- if hasattr(calculator.data_processed, 'safety_config') and 'safety' in calculator.data_processed.safety_config:
- config = calculator.data_processed.safety_config['safety']
- metric_type = 'safety'
- elif hasattr(calculator.data_processed, 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config:
- config = calculator.data_processed.comfort_config['comfort']
- metric_type = 'comfort'
- else:
- logger.warning(f"无法找到{metric_name}的配置信息")
- return thresholds
- else:
- logger.warning(f"计算器没有data_processed属性")
- return thresholds
-
- # 递归查找指标配置
- def find_metric_config(node, target_name):
- if isinstance(node, dict):
- if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node:
- return node
- for key, value in node.items():
- result = find_metric_config(value, target_name)
- if result:
- return result
- return None
-
- # 查找指标配置
- metric_config = find_metric_config(config, metric_name)
- if metric_config:
- thresholds["min"] = metric_config.get("min")
- thresholds["max"] = metric_config.get("max")
- logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}")
- else:
- logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息")
-
- except Exception as e:
- logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True)
-
- return thresholds
- def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
- """
- Generate chart data for safety metrics
-
- Args:
- safety_calculator: SafetyCalculator instance
- metric_name: Metric name
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 确保输出目录存在
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- else:
- output_dir = os.getcwd()
-
- # 根据指标名称选择不同的图表生成方法
- if metric_name.lower() == 'ttc':
- return generate_ttc_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'mttc':
- return generate_mttc_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'thw':
- return generate_thw_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'lonsd':
- return generate_lonsd_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'latsd':
- return generate_latsd_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'btn':
- return generate_btn_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'collisionrisk':
- return generate_collision_risk_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'collisionseverity':
- return generate_collision_severity_chart(safety_calculator, output_dir)
- else:
- logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
- return None
-
- except Exception as e:
- logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
- return None
- def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC metric chart with orange background for unsafe events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- ttc_data = safety_calculator.ttc_data
-
- if not ttc_data:
- logger.warning("Cannot generate TTC chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(ttc_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'TTC')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"ttc_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'simFrame': df['simFrame'],
- 'TTC': df['TTC'],
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"TTC data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于TTC,小于最小阈值视为不安全
- unsafe_condition = df['TTC'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
-
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
-
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_ttc': group['TTC'].min()
- })
-
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8))
- plt.plot(df['simTime'], df['TTC'], 'b-', label='TTC')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
-
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe TTC Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- plt.xlabel('Time (s)')
- plt.ylabel('TTC (s)')
- plt.title('Time To Collision (TTC) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"ttc_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(f"TTC不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s")
-
- logger.info(f"TTC chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True)
- return None
- def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate MTTC metric chart with orange background for unsafe events
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- mttc_data = safety_calculator.mttc_data
-
- if not mttc_data:
- logger.warning("Cannot generate MTTC chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(mttc_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'MTTC')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于MTTC,小于最小阈值视为不安全
- unsafe_condition = df['MTTC'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
-
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
-
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_mttc': group['MTTC'].min()
- })
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['MTTC'], 'g-', label='MTTC')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
-
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe MTTC Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- plt.xlabel('Time (s)')
- plt.ylabel('MTTC (s)')
- plt.title('Modified Time To Collision (MTTC) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"mttc_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"mttc_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(f"MTTC不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s")
-
- logger.info(f"MTTC chart saved to: {chart_filename}")
- logger.info(f"MTTC data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True)
- return None
- def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate THW metric chart with orange background for unsafe events
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- thw_data = safety_calculator.thw_data
-
- if not thw_data:
- logger.warning("Cannot generate THW chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(thw_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'THW')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于THW,小于最小阈值视为不安全
- unsafe_condition = df['THW'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
-
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
-
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_thw': group['THW'].min()
- })
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['THW'], 'c-', label='THW')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
-
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe THW Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- plt.xlabel('Time (s)')
- plt.ylabel('THW (s)')
- plt.title('Time Headway (THW) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"thw_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"thw_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(f"THW不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s")
-
- logger.info(f"THW chart saved to: {chart_filename}")
- logger.info(f"THW data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True)
- return None
- def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Longitudinal Safe Distance metric chart
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- lonsd_data = safety_calculator.lonsd_data
-
- if not lonsd_data:
- logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(lonsd_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'LonSD')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
-
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title('Longitudinal Safe Distance (LonSD) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"lonsd_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"lonsd_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}")
- logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True)
- return None
- def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Lateral Safe Distance metric chart with orange background for unsafe events
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- latsd_data = safety_calculator.latsd_data
-
- if not latsd_data:
- logger.warning("Cannot generate Lateral Safe Distance chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(latsd_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'LatSD')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于LatSD,小于最小阈值视为不安全
- unsafe_condition = df['LatSD'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
-
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
-
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_latsd': group['LatSD'].min()
- })
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
-
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe LatSD Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title('Lateral Safe Distance (LatSD) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"latsd_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"latsd_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(f"LatSD不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m")
-
- logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}")
- logger.info(f"Lateral Safe Distance data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True)
- return None
- def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Brake Threat Number metric chart with orange background for unsafe events
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- btn_data = safety_calculator.btn_data
-
- if not btn_data:
- logger.warning("Cannot generate Brake Threat Number chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(btn_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'BTN')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 检测超阈值事件
- unsafe_events = []
- if max_threshold is not None:
- # 对于BTN,大于最大阈值视为不安全
- unsafe_condition = df['BTN'] > max_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
-
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
-
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'max_btn': group['BTN'].max()
- })
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
-
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe BTN Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
-
- plt.xlabel('Time (s)')
- plt.ylabel('BTN')
- plt.title('Brake Threat Number (BTN) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"btn_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"btn_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(f"BTN不安全事件 #{i+1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}")
-
- logger.info(f"Brake Threat Number chart saved to: {chart_filename}")
- logger.info(f"Brake Threat Number data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True)
- return None
- def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Collision Risk metric chart
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- risk_data = safety_calculator.collision_risk_data
-
- if not risk_data:
- logger.warning("Cannot generate Collision Risk chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(risk_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['collisionRisk'], 'r-', label='Collision Risk')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
-
- plt.xlabel('Time (s)')
- plt.ylabel('Risk Value (%)')
- plt.title('Collision Risk (collisionRisk) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"collision_risk_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"collisionrisk_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- logger.info(f"Collision Risk chart saved to: {chart_filename}")
- logger.info(f"Collision Risk data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True)
- return None
- def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Collision Severity metric chart
-
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- severity_data = safety_calculator.collision_severity_data
-
- if not severity_data:
- logger.warning("Cannot generate Collision Severity chart: empty data")
- return None
-
- # 创建DataFrame
- df = pd.DataFrame(severity_data)
-
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['collisionSeverity'], 'r-', label='Collision Severity')
-
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
-
- plt.xlabel('Time (s)')
- plt.ylabel('Severity (%)')
- plt.title('Collision Severity (collisionSeverity) Trend')
- plt.grid(True)
- plt.legend()
-
- # 保存图表
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- chart_filename = os.path.join(output_dir, f"collision_severity_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"collisionseverity_data_{timestamp}.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
-
- logger.info(f"Collision Severity chart saved to: {chart_filename}")
- logger.info(f"Collision Severity data saved to: {csv_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True)
- return None
- def generate_vdv_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate VDV (Vibration Dose Value) metric chart with data saved to CSV first.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- vdv_value = comfort_calculator.calculated_value.get('vdv', 0)
-
- if df.empty:
- logger.warning("Cannot generate VDV chart: empty data")
- return None
-
- # 确保有必要的列
- if 'accelX' not in df.columns or 'accelY' not in df.columns:
- logger.warning("Missing required columns for VDV chart")
- return None
-
- # 获取阈值
- thresholds = get_metric_thresholds(comfort_calculator, 'vdv')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 将东北天坐标系下的加速度转换为车身坐标系下的加速度
- if 'posH' not in df.columns:
- logger.warning("Missing heading angle data for coordinate transformation")
- return None
-
- # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶
- df['posH_rad'] = np.radians(df['posH'])
-
- # 转换加速度到车身坐标系
- df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
- df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
- df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"vdv_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'a_x_body': df['a_x_body'],
- 'a_y_body': df['a_y_body'],
- 'a_z_body': df['a_z_body'],
- 'v': df['v'],
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold,
- 'vdv_value': vdv_value
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"VDV data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8))
-
- # 绘制三轴加速度
- plt.subplot(3, 1, 1)
- plt.plot(df['simTime'], df['a_x_body'], 'r-', label='X-axis Acceleration')
-
- # 添加阈值线
- if 'min_threshold' in df.columns and df['min_threshold'].iloc[0] is not None:
- min_threshold = df['min_threshold'].iloc[0]
- plt.axhline(y=min_threshold, color='r', linestyle=':', label=f'Min Threshold ({min_threshold})')
- if 'max_threshold' in df.columns and df['max_threshold'].iloc[0] is not None:
- max_threshold = df['max_threshold'].iloc[0]
- plt.axhline(y=max_threshold, color='g', linestyle=':', label=f'Max Threshold ({max_threshold})')
-
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body X-axis Acceleration (Longitudinal)')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 1, 2)
- plt.plot(df['simTime'], df['a_y_body'], 'g-', label='Y-axis Acceleration')
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body Y-axis Acceleration (Lateral)')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 1, 3)
- plt.plot(df['simTime'], df['a_z_body'], 'b-', label='Z-axis Acceleration')
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- vdv_value = df['vdv_value'].iloc[0] if 'vdv_value' in df.columns else 0
- plt.title(f'Body Z-axis Acceleration (Vertical) - VDV value: {vdv_value:.4f}')
- plt.grid(True)
- plt.legend()
-
- plt.tight_layout()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"vdv_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- logger.info(f"VDV chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate VDV chart: {str(e)}", exc_info=True)
- return None
- def generate_ava_vav_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate AVA_VAV (Average Vibration Acceleration Value) metric chart with data saved to CSV first.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- ava_vav_value = comfort_calculator.calculated_value.get('ava_vav', 0)
-
- if df.empty:
- logger.warning("Cannot generate AVA_VAV chart: empty data")
- return None
-
- # 确保有必要的列
- if 'accelX' not in df.columns or 'accelY' not in df.columns:
- logger.warning("Missing required columns for AVA_VAV chart")
- return None
-
- # 获取阈值
- thresholds = get_metric_thresholds(comfort_calculator, 'ava_vav')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 将东北天坐标系下的加速度转换为车身坐标系下的加速度
- if 'posH' not in df.columns:
- logger.warning("Missing heading angle data for coordinate transformation")
- return None
-
- # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶
- df['posH_rad'] = np.radians(df['posH'])
-
- # 转换加速度到车身坐标系
- df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
- df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
- df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
-
- # 角速度数据
- df['omega_roll'] = df['rollRate'] if 'rollRate' in df.columns else pd.Series(np.zeros(len(df)))
- df['omega_pitch'] = df['pitchRate'] if 'pitchRate' in df.columns else pd.Series(np.zeros(len(df)))
- df['omega_yaw'] = df['speedH'] # 使用航向角速度作为偏航角速度
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"ava_vav_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'a_x_body': df['a_x_body'],
- 'a_y_body': df['a_y_body'],
- 'a_z_body': df['a_z_body'],
- 'omega_roll': df['omega_roll'],
- 'omega_pitch': df['omega_pitch'],
- 'omega_yaw': df['omega_yaw'],
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold,
- 'ava_vav_value': ava_vav_value
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"AVA_VAV data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 创建图表(第三步)
- plt.figure(figsize=(12, 10))
-
- # 绘制三轴加速度
- plt.subplot(3, 2, 1)
- plt.plot(df['simTime'], df['a_x_body'], 'r-', label='X-axis Acceleration')
-
- # 添加阈值线
- if 'min_threshold' in df.columns and df['min_threshold'].iloc[0] is not None:
- min_threshold = df['min_threshold'].iloc[0]
- plt.axhline(y=min_threshold, color='r', linestyle=':', label=f'Min Threshold ({min_threshold})')
- if 'max_threshold' in df.columns and df['max_threshold'].iloc[0] is not None:
- max_threshold = df['max_threshold'].iloc[0]
- plt.axhline(y=max_threshold, color='g', linestyle=':', label=f'Max Threshold ({max_threshold})')
-
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body X-axis Acceleration (Longitudinal)')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 2, 3)
- plt.plot(df['simTime'], df['a_y_body'], 'g-', label='Y-axis Acceleration')
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body Y-axis Acceleration (Lateral)')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 2, 5)
- plt.plot(df['simTime'], df['a_z_body'], 'b-', label='Z-axis Acceleration')
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body Z-axis Acceleration (Vertical)')
- plt.grid(True)
- plt.legend()
-
- # 绘制三轴角速度
- plt.subplot(3, 2, 2)
- plt.plot(df['simTime'], df['omega_roll'], 'r-', label='Roll Rate')
- plt.xlabel('Time (s)')
- plt.ylabel('Angular Velocity (deg/s)')
- plt.title('Roll Rate')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 2, 4)
- plt.plot(df['simTime'], df['omega_pitch'], 'g-', label='Pitch Rate')
- plt.xlabel('Time (s)')
- plt.ylabel('Angular Velocity (deg/s)')
- plt.title('Pitch Rate')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 2, 6)
- plt.plot(df['simTime'], df['omega_yaw'], 'b-', label='Yaw Rate')
- plt.xlabel('Time (s)')
- plt.ylabel('Angular Velocity (deg/s)')
- ava_vav_value = df['ava_vav_value'].iloc[0] if 'ava_vav_value' in df.columns else 0
- plt.title(f'Yaw Rate - AVA_VAV value: {ava_vav_value:.4f}')
- plt.grid(True)
- plt.legend()
-
- plt.tight_layout()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"ava_vav_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- logger.info(f"AVA_VAV chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate AVA_VAV chart: {str(e)}", exc_info=True)
- return None
- def generate_msdv_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate MSDV (Motion Sickness Dose Value) metric chart with data saved to CSV first.
- This version first saves data to CSV, then uses the CSV to generate the chart.
-
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
-
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
-
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- msdv_value = comfort_calculator.calculated_value.get('msdv', 0)
- motion_sickness_prob = comfort_calculator.calculated_value.get('motionSickness', 0)
-
- if df.empty:
- logger.warning("Cannot generate MSDV chart: empty data")
- return None
-
- # 确保有必要的列
- if 'accelX' not in df.columns or 'accelY' not in df.columns:
- logger.warning("Missing required columns for MSDV chart")
- return None
-
- # 获取阈值
- thresholds = get_metric_thresholds(comfort_calculator, 'msdv')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
-
- # 将东北天坐标系下的加速度转换为车身坐标系下的加速度
- if 'posH' not in df.columns:
- logger.warning("Missing heading angle data for coordinate transformation")
- return None
-
- # 车身坐标系:X轴指向车头,Y轴指向车辆左侧,Z轴指向车顶
- df['posH_rad'] = np.radians(df['posH'])
-
- # 转换加速度到车身坐标系
- df['a_x_body'] = df['accelX'] * np.sin(df['posH_rad']) + df['accelY'] * np.cos(df['posH_rad'])
- df['a_y_body'] = df['accelX'] * np.cos(df['posH_rad']) - df['accelY'] * np.sin(df['posH_rad'])
- df['a_z_body'] = df['accelZ'] if 'accelZ' in df.columns else pd.Series(np.zeros(len(df)))
-
- # 生成时间戳
- import datetime
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"msdv_data_{timestamp}.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'a_x_body': df['a_x_body'],
- 'a_y_body': df['a_y_body'],
- 'a_z_body': df['a_z_body'],
- 'v': df['v'],
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold,
- 'msdv_value': msdv_value,
- 'motion_sickness_prob': motion_sickness_prob
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"MSDV data saved to: {csv_filename}")
-
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
-
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8))
-
- # 绘制三轴加速度
- plt.subplot(3, 1, 1)
- plt.plot(df['simTime'], df['a_x_body'], 'r-', label='X-axis Acceleration')
-
- # 添加阈值线
- if 'min_threshold' in df.columns and df['min_threshold'].iloc[0] is not None:
- min_threshold = df['min_threshold'].iloc[0]
- plt.axhline(y=min_threshold, color='r', linestyle=':', label=f'Min Threshold ({min_threshold})')
- if 'max_threshold' in df.columns and df['max_threshold'].iloc[0] is not None:
- max_threshold = df['max_threshold'].iloc[0]
- plt.axhline(y=max_threshold, color='g', linestyle=':', label=f'Max Threshold ({max_threshold})')
-
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body X-axis Acceleration (Longitudinal)')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 1, 2)
- plt.plot(df['simTime'], df['a_y_body'], 'g-', label='Y-axis Acceleration')
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- plt.title('Body Y-axis Acceleration (Lateral)')
- plt.grid(True)
- plt.legend()
-
- plt.subplot(3, 1, 3)
- plt.plot(df['simTime'], df['a_z_body'], 'b-', label='Z-axis Acceleration')
- plt.xlabel('Time (s)')
- plt.ylabel('Acceleration (m/s²)')
- msdv_value = df['msdv_value'].iloc[0] if 'msdv_value' in df.columns else 0
- motion_sickness_prob = df['motion_sickness_prob'].iloc[0] if 'motion_sickness_prob' in df.columns else 0
- plt.title(f'Body Z-axis Acceleration (Vertical) - MSDV: {msdv_value:.4f}, Motion Sickness Probability: {motion_sickness_prob:.2f}%')
- plt.grid(True)
- plt.legend()
-
- plt.tight_layout()
-
- # 保存图像
- chart_filename = os.path.join(output_dir, f"msdv_chart_{timestamp}.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
-
- logger.info(f"MSDV chart saved to: {chart_filename}")
- return chart_filename
-
- except Exception as e:
- logger.error(f"Failed to generate MSDV chart: {str(e)}", exc_info=True)
- return None
- def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
- """Generate chart data for traffic metrics"""
- # 待实现
- return None
- def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
- """Generate chart data for function metrics"""
- # 待实现
- return None
|