1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
- @Data: 2023/06/25
- @Last Modified: 2025/05/20
- @Summary: Chart generation utilities for metrics visualization
- """
- """
- 主要功能模块:
- 函数指标绘图(generate_function_chart_data)
- 舒适性指标绘图(generate_comfort_chart_data)
- 安全性指标绘图(generate_safety_chart_data)
- 交通指标绘图(generate_traffic_chart_data,待实现)
- 新增的急加速指标绘图:
- 在generate_comfort_chart_data中增加了slamaccelerate指标的支持
- 实现了完整的generate_slam_accelerate_chart函数,用于绘制急加速事件
- 该函数包含:
- 数据获取与预处理
- CSV数据保存与读取
- 纵向加速度与速度的双子图绘制
- 急加速事件的橙色背景标记
- 阈值线绘制
- 高质量图表输出(300dpi PNG)
- 其他关键特性:
- 统一的日志记录系统
- 阈值获取函数get_metric_thresholds
- 错误处理和异常捕获
- 时间戳管理
- 数据验证机制
- 详细的日志输出
- 辅助函数:
- calculate_distance 和 calculate_relative_speed(简化实现)
- scenario_sign_dict 场景签名字典(简化实现)
- 此代码实现了完整的指标可视化工具,特别针对急加速指标slamAccelerate提供了详细的绘图功能,能够清晰展示急加速事件的发生时间和相关数据变化。
- """
- import matplotlib
- matplotlib.use('Agg') # 使用非图形界面的后端
- import matplotlib.pyplot as plt
- import os
- import numpy as np
- import pandas as pd
- from typing import Optional, Dict, List, Any, Union
- from pathlib import Path
- from modules.lib.log_manager import LogManager
- def generate_function_chart_data(function_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
- str]:
- """
- Generate chart data for function metrics
- Args:
- function_calculator: FunctionCalculator instance
- metric_name: Metric name
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 确保输出目录存在
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- else:
- output_dir = os.path.join(os.getcwd(), 'data')
- # 根据指标名称选择不同的图表生成方法
- if metric_name.lower() == 'latestwarningdistance_ttc_lst':
- return generate_latest_warning_ttc_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'earliestwarningdistance_ttc_lst':
- return generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'earliestwarningdistance_lst':
- return generate_earliest_warning_distance_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'latestwarningdistance_lst':
- return generate_latest_warning_distance_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'latestwarningdistance_ttc_pgvil':
- return generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'earliestwarningdistance_ttc_pgvil':
- return generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'earliestwarningdistance_pgvil':
- return generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'latestwarningdistance_pgvil':
- return generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'limitspeed_lst':
- return generate_limit_speed_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'limitspeedpastlimitsign_lst':
- return generate_limit_speed_past_sign_chart(function_calculator, output_dir)
- elif metric_name.lower() == 'maxlongitudedist_lst':
- return generate_max_longitude_dist_chart(function_calculator, output_dir)
- else:
- logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
- return None
- except Exception as e:
- logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
- return None
- def generate_earliest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate warning distance chart with data visualization.
- This function creates charts for earliestWarningDistance_LST and latestWarningDistance_LST metrics.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_LST')
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get calculated warning distance and speed
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- if warning_dist.empty:
- logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
- return None
- # Calculate metric value
- metric_value = float(warning_dist.iloc[0]) if len(warning_dist) >= 0.0 else max_threshold
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
- 'warning_distance': warning_dist,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold,
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"earliestWarningDistance_LST data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create single chart for warning distance
- plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
- # Plot warning distance
- plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- # Add threshold lines
- plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- # Mark metric value
- if len(df) > 0:
- label_text = 'Earliest Warning Distance'
- plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
- color='red', s=100, zorder=5,
- label=f'{label_text}: {metric_value:.2f}m')
- # Set y-axis range
- plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title(f'earliestWarningDistance_LST - Warning Distance Over Time')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"earliestWarningDistance_LST_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"earliestWarningDistance_LST chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate earliestWarningDistance_LST chart: {str(e)}", exc_info=True)
- return None
- def generate_earliest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate warning distance chart with data visualization.
- This function creates charts for earliestWarningDistance_PGVIL and latestWarningDistance_PGVIL metrics.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, 'earliestWarningDistance_PGVIL')
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get calculated warning distance and speed
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- warning_time = getattr(function_calculator, 'warning_time', None)
- if len(warning_dist) == 0:
- logger.warning(f"Cannot generate {"earliestWarningDistance_LST"} chart: empty data")
- return None
- # Calculate metric value
- metric_value = float(warning_dist[0]) if len(warning_dist) >= 0.0 else max_threshold
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': warning_time,
- 'warning_distance': warning_dist,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"earliestWarningDistance_PGVIL data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create single chart for warning distance
- plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
- # Plot warning distance
- plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- # Add threshold lines
- plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- # Mark metric value
- if len(df) > 0:
- label_text = 'Earliest Warning Distance'
- plt.scatter(df['simTime'].iloc[0], df['warning_distance'].iloc[0],
- color='red', s=100, zorder=5,
- label=f'{label_text}: {metric_value:.2f}m')
- # Set y-axis range
- plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title(f'earliestWarningDistance_PGVIL - Warning Distance Over Time')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"earliestWarningDistance_PGVIL_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"earliestWarningDistance_PGVIL chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate earliestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
- return None
- # # 使用function.py中已实现的find_nested_name函数
- # from modules.metric.function import find_nested_name
- def generate_latest_warning_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC warning chart with data visualization.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- ego_df = function_calculator.ego_data.copy()
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # 获取配置的阈值
- thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_PGVIL')
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- warning_speed = getattr(function_calculator, 'warning_speed', None)
- warning_time = getattr(function_calculator, 'warning_time', None)
- ttc = getattr(function_calculator, 'ttc', None)
- if len(warning_dist) == 0:
- logger.warning("Cannot generate TTC warning chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据
- csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': warning_time,
- 'warning_distance': warning_dist,
- 'warning_speed': warning_speed,
- 'ttc': ttc,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold,
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"latestwarningdistance_ttc_pgvil data saved to: {csv_filename}")
- # 从 CSV 读取数据
- df = pd.read_csv(csv_filename)
- # 创建图表
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:预警距离
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Distance (m)')
- ax1.set_title('Warning Distance Over Time')
- ax1.grid(True)
- ax1.legend()
- # 图 2:相对速度
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Speed (m/s)')
- ax2.set_title('Relative Speed Over Time')
- ax2.grid(True)
- ax2.legend()
- # 图 3:TTC
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
- # Add threshold lines
- ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
- ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- # Calculate metric value (latest TTC)
- metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
- # Mark latest TTC value
- if len(df) > 0:
- ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
- color='red', s=100, zorder=5,
- label=f'Latest TTC: {metric_value:.2f}s')
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('TTC (s)')
- ax3.set_title('Time To Collision (TTC) Over Time')
- ax3.grid(True)
- ax3.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_pgvil_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"latestwarningdistance_ttc_pgvil chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate latestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
- return None
- def generate_latest_warning_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC warning chart with data visualization.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- ego_df = function_calculator.ego_data.copy()
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # 获取配置的阈值
- thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_TTC_LST')
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- warning_speed = getattr(function_calculator, 'warning_speed', None)
- ttc = getattr(function_calculator, 'ttc', None)
- if warning_dist.empty:
- logger.warning("Cannot generate TTC warning chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据
- csv_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
- 'warning_distance': warning_dist,
- 'warning_speed': warning_speed,
- 'ttc': ttc,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold,
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"latestwarningdistance_ttc_lst data saved to: {csv_filename}")
- # 从 CSV 读取数据
- df = pd.read_csv(csv_filename)
- # 创建图表
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:预警距离
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Distance (m)')
- ax1.set_title('Warning Distance Over Time')
- ax1.grid(True)
- ax1.legend()
- # 图 2:相对速度
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Speed (m/s)')
- ax2.set_title('Relative Speed Over Time')
- ax2.grid(True)
- ax2.legend()
- # 图 3:TTC
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
- # Add threshold lines
- ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
- ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- # Calculate metric value (latest TTC)
- metric_value = float(ttc[-1]) if len(ttc) > 0 else max_threshold
- # Mark latest TTC value
- if len(df) > 0:
- ax3.scatter(df['simTime'].iloc[-1], df['ttc'].iloc[-1],
- color='red', s=100, zorder=5,
- label=f'Latest TTC: {metric_value:.2f}s')
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('TTC (s)')
- ax3.set_title('Time To Collision (TTC) Over Time')
- ax3.grid(True)
- ax3.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"latestwarningdistance_ttc_lst_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"latestwarningdistance_ttc_lst chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate latestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
- return None
- def generate_latest_warning_distance_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate warning distance chart with data visualization.
- This function creates charts for latestWarningDistance_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- metric_name: Metric name (latestWarningDistance_LST)
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_LST')
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get calculated warning distance and speed
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- if warning_dist.empty:
- logger.warning(f"Cannot generate latestWarningDistance_LST chart: empty data")
- return None
- # Calculate metric value
- metric_value = float(warning_dist.iloc[-1]) if len(warning_dist) > 0 else max_threshold
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"latestWarningDistance_LST_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
- 'warning_distance': warning_dist,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"latestWarningDistance_LST data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create single chart for warning distance
- plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
- # Plot warning distance
- plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- # Add threshold lines
- plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- # Mark metric value
- if len(df) > 0:
- label_text = 'Latest Warning Distance'
- plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
- color='red', s=100, zorder=5,
- label=f'{label_text}: {metric_value:.2f}m')
- # Set y-axis range
- plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title(f'latestWarningDistance_LST - Warning Distance Over Time')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"latestWarningDistance_LST_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"latestWarningDistance_LST chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate latestWarningDistance_LST chart: {str(e)}", exc_info=True)
- return None
- def generate_latest_warning_distance_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate warning distance chart with data visualization.
- This function creates charts for latestWarningDistance_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- metric_name: Metric name (latestWarningDistance_LST)
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, 'latestWarningDistance_PGVIL')
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get calculated warning distance and speed
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- warning_time = getattr(function_calculator, 'warning_time', None)
- if len(warning_dist) == 0:
- logger.warning(f"Cannot generate latestWarningDistance_PGVIL chart: empty data")
- return None
- # Calculate metric value
- metric_value = float(warning_dist[-1]) if len(warning_dist) > 0 else max_threshold
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': warning_time,
- 'warning_distance': warning_dist,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"latestWarningDistance_PGVIL data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create single chart for warning distance
- plt.figure(figsize=(12, 6), constrained_layout=True) # Adjusted height for single chart
- # Plot warning distance
- plt.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- # Add threshold lines
- plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- # Mark metric value
- if len(df) > 0:
- label_text = 'Latest Warning Distance'
- plt.scatter(df['simTime'].iloc[-1], df['warning_distance'].iloc[-1],
- color='red', s=100, zorder=5,
- label=f'{label_text}: {metric_value:.2f}m')
- # Set y-axis range
- plt.ylim(bottom=-1, top=max(max_threshold * 1.1, df['warning_distance'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title(f'latestWarningDistance_PGVIL - Warning Distance Over Time')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"latestWarningDistance_PGVIL_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"latestWarningDistance_PGVIL chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate latestWarningDistance_PGVIL chart: {str(e)}", exc_info=True)
- return None
- def generate_earliest_warning_distance_ttc_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- metric_name = 'earliestWarningDistance_TTC_LST'
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, metric_name)
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get calculated warning distance and speed
- warning_dist = getattr(function_calculator, 'correctwarning', None)
- warning_speed = getattr(function_calculator, 'warning_speed', None)
- ttc = getattr(function_calculator, 'ttc', None)
- # Calculate metric value
- metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['simTime'],
- 'warning_distance': warning_dist,
- 'warning_speed': warning_speed,
- 'ttc': ttc,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"{metric_name} data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create chart
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:预警距离
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Distance (m)')
- ax1.set_title('Warning Distance Over Time')
- ax1.grid(True)
- ax1.legend()
- # 图 2:相对速度
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Speed (m/s)')
- ax2.set_title('Relative Speed Over Time')
- ax2.grid(True)
- ax2.legend()
- # 图 3:TTC
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
- # Add threshold lines
- ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
- ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- # Mark earliest TTC value
- if len(df) > 0:
- ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
- color='red', s=100, zorder=5,
- label=f'Earliest TTC: {metric_value:.2f}s')
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('TTC (s)')
- ax3.set_title('Time To Collision (TTC) Over Time')
- ax3.grid(True)
- ax3.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_lst_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"{metric_name} chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate earliestwarningdistance_ttc_lst chart: {str(e)}", exc_info=True)
- return None
- def generate_earliest_warning_distance_ttc_pgvil_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC warning chart with data visualization for earliestWarningDistance_TTC_PGVIL metric.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- metric_name = 'earliestWarningDistance_TTC_PGVIL'
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, metric_name)
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get calculated warning distance and speed
- warning_dist = getattr(function_calculator, 'warning_dist', None)
- warning_speed = getattr(function_calculator, 'warning_speed', None)
- ttc = getattr(function_calculator, 'ttc', None)
- warning_time = getattr(function_calculator, 'warning_time', None)
- # Calculate metric value
- metric_value = float(ttc[0]) if len(ttc) > 0 else max_threshold
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': warning_time,
- 'warning_distance': warning_dist,
- 'warning_speed': warning_speed,
- 'ttc': ttc,
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"{metric_name} data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create chart
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:预警距离
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['warning_distance'], 'b-', label='Warning Distance')
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Distance (m)')
- ax1.set_title('Warning Distance Over Time')
- ax1.grid(True)
- ax1.legend()
- # 图 2:相对速度
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['warning_speed'], 'g-', label='Relative Speed')
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Speed (m/s)')
- ax2.set_title('Relative Speed Over Time')
- ax2.grid(True)
- ax2.legend()
- # 图 3:TTC
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['ttc'], 'r-', label='TTC')
- # Add threshold lines
- ax3.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
- ax3.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- # Mark earliest TTC value
- if len(df) > 0:
- ax3.scatter(df['simTime'].iloc[0], df['ttc'].iloc[0],
- color='red', s=100, zorder=5,
- label=f'Earliest TTC: {metric_value:.2f}s')
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('TTC (s)')
- ax3.set_title('Time To Collision (TTC) Over Time')
- ax3.grid(True)
- ax3.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"earliestwarningdistance_ttc_pgvil_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"{metric_name} chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate earliestwarningdistance_ttc_pgvil chart: {str(e)}", exc_info=True)
- return None
- def generate_limit_speed_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate limit speed chart with data visualization for limitSpeed_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- metric_name = 'limitSpeed_LST'
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, metric_name)
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- if ego_df.empty:
- logger.warning(f"Cannot generate {metric_name} chart: empty data")
- return None
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df['simTime'],
- 'speed': ego_df['v'],
- 'speed_limit': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df)))
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"{metric_name} data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create chart
- plt.figure(figsize=(12, 6), constrained_layout=True)
- # Plot speed
- plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
- plt.plot(df['simTime'], df['speed_limit'], 'r--', label='Speed Limit')
- # Set y-axis range
- plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Speed (m/s)')
- plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"{metric_name} chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
- return None
- def generate_limit_speed_past_sign_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate limit speed past sign chart with data visualization for limitSpeedPastLimitSign_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- metric_name = 'limitSpeedPastLimitSign_LST'
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, metric_name)
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- if ego_df.empty:
- logger.warning(f"Cannot generate {metric_name} chart: empty data")
- return None
- # Get sign passing time if available
- sign_time = getattr(function_calculator, 'sign_pass_time', None)
- if sign_time is None:
- # Try to estimate sign passing time (middle of the simulation)
- sign_time = ego_df['simTime'].iloc[len(ego_df) // 2]
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df['simTime'],
- 'speed': ego_df['v'],
- 'speed_limit': ego_df.get('speed_limit', pd.Series([max_threshold] * len(ego_df))),
- 'sign_pass_time': sign_time
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"{metric_name} data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create chart
- plt.figure(figsize=(12, 6), constrained_layout=True)
- # Plot speed
- plt.plot(df['simTime'], df['speed'], 'b-', label='Vehicle Speed')
- plt.plot(df['simTime'], df['speed_limit'], 'r--', label='Speed Limit')
- # Mark sign passing time
- plt.axvline(x=sign_time, color='g', linestyle='--', label='Speed Limit Sign')
- # Set y-axis range
- plt.ylim(bottom=0, top=max(max_threshold * 1.1, df['speed'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Speed (m/s)')
- plt.title(f'{metric_name} - Vehicle Speed vs Speed Limit')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"{metric_name} chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
- return None
- def generate_max_longitude_dist_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate maximum longitudinal distance chart with data visualization for maxLongitudeDist_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- metric_name = 'maxLongitudeDist_LST'
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, metric_name)
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Get longitudinal distance data
- longitude_dist = ego_df['longitude_dist'] if 'longitude_dist' in ego_df.columns else None
- stop_time = ego_df['stop_time'] if 'stop_time' in ego_df.columns else None
- if longitude_dist is None or longitude_dist.empty:
- logger.warning(f"Cannot generate {metric_name} chart: missing longitudinal distance data")
- return None
- # Calculate metric value
- metric_value = longitude_dist.max()
- max_distance_time = ego_df.loc[longitude_dist.idxmax(), 'simTime']
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': ego_df['simTime'],
- 'x_relative_dist': ego_df['x_relative_dist'],
- 'stop_time': stop_time,
- 'longitude_dist': longitude_dist
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"{metric_name} data saved to: {csv_filename}")
- # Read data from CSV
- df = pd.read_csv(csv_filename)
- # Create chart
- plt.figure(figsize=(12, 6), constrained_layout=True)
- # Plot longitudinal distance
- plt.plot(df['simTime'], df['x_relative_dist'], 'b-', label='Longitudinal Distance')
- # Add threshold lines
- plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- # Mark maximum longitudinal distance
- plt.scatter(max_distance_time, metric_value,
- color='red', s=100, zorder=5,
- label=f'Maximum Longitudinal Distance: {metric_value:.2f}m')
- # Set y-axis range
- plt.ylim(bottom=min(0, min_threshold * 0.9), top=max(max_threshold * 1.1, df['longitude_dist'].max() * 1.1))
- plt.xlabel('Time (s)')
- plt.ylabel('Longitudinal Distance (m)')
- plt.title(f'{metric_name} - Longitudinal Distance Over Time')
- plt.grid(True)
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"{metric_name} chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
- return None
- def generate_warning_delay_time_chart(function_calculator, output_dir: str) -> Optional[str]:
- """
- Generate warning delay time chart with data visualization for warningDelayTime_LST metric.
- Args:
- function_calculator: FunctionCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- metric_name = 'warningDelayTime_LST'
- try:
- # Get data
- ego_df = function_calculator.ego_data.copy()
- # Get configured thresholds
- thresholds = get_metric_thresholds(function_calculator, metric_name)
- max_threshold = thresholds["max"]
- min_threshold = thresholds["min"]
- # Check if correctwarning is already calculated
- correctwarning = getattr(function_calculator, 'correctwarning', None)
- if correctwarning is None:
- logger.warning(f"Cannot generate {metric_name} chart: missing correctwarning value")
- return None
- # Get HMI warning time and rosbag warning time
- HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
- simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
- rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
- 'simTime'].tolist()
- simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
- if (simTime_HMI is None) or (simTime_rosbag is None):
- logger.warning(f"Cannot generate {metric_name} chart: missing warning time data")
- return None
- # Calculate delay time
- delay_time = abs(simTime_HMI - simTime_rosbag)
- # Save CSV data
- csv_filename = os.path.join(output_dir, f"{metric_name.lower()}_data.csv")
- df_csv = pd.DataFrame({
- 'HMI_warning_time': [simTime_HMI],
- 'rosbag_warning_time': [simTime_rosbag],
- 'delay_time': [delay_time],
- 'min_threshold': [min_threshold],
- 'max_threshold': [max_threshold]
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"{metric_name} data saved to: {csv_filename}")
- # Create chart - bar chart for delay time
- plt.figure(figsize=(10, 6), constrained_layout=True)
- # Plot delay time as bar
- plt.bar(['Warning Delay Time'], [delay_time], color='blue', width=0.4)
- # Add threshold lines
- plt.axhline(y=max_threshold, color='r', linestyle='--', label=f'Max Threshold ({max_threshold}s)')
- plt.axhline(y=min_threshold, color='g', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- # Add value label
- plt.text(0, delay_time + 0.05, f'{delay_time:.3f}s', ha='center', va='bottom', fontweight='bold')
- # Set y-axis range
- plt.ylim(bottom=0, top=max(max_threshold * 1.2, delay_time * 1.2))
- plt.ylabel('Delay Time (s)')
- plt.title(f'{metric_name} - Warning Delay Time')
- plt.grid(True, axis='y')
- plt.legend()
- # Save image
- chart_filename = os.path.join(output_dir, f"{metric_name.lower()}_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"{metric_name} chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate {metric_name} chart: {str(e)}", exc_info=True)
- return None
- def generate_comfort_chart_data(comfort_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
- str]:
- """
- Generate chart data for comfort metrics
- Args:
- comfort_calculator: ComfortCalculator instance
- metric_name: Metric name
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 确保输出目录存在
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- else:
- output_dir = os.getcwd()
- # 根据指标名称选择不同的图表生成方法
- if metric_name.lower() == 'shake':
- return generate_shake_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'zigzag':
- return generate_zigzag_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'cadence':
- return generate_cadence_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'slambrake':
- return generate_slam_brake_chart(comfort_calculator, output_dir)
- elif metric_name.lower() == 'slamaccelerate':
- return generate_slam_accelerate_chart(comfort_calculator, output_dir)
- else:
- logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
- return None
- except Exception as e:
- logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
- return None
- def generate_shake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate shake metric chart with orange background for shake events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- shake_events = comfort_calculator.shake_events
- if df.empty:
- logger.warning("Cannot generate shake chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"shake_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lat_acc': df['lat_acc'],
- 'lat_acc_rate': df['lat_acc_rate'],
- 'speedH_std': df['speedH_std'],
- 'lat_acc_threshold': df.get('lat_acc_threshold', pd.Series([None] * len(df))),
- 'lat_acc_rate_threshold': 0.5,
- 'speedH_std_threshold': df.get('speedH_threshold', pd.Series([None] * len(df))),
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Shake data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- import matplotlib.pyplot as plt
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:横向加速度
- ax1 = plt.subplot(3, 1, 1)
- ax1.plot(df['simTime'], df['lat_acc'], 'b-', label='Lateral Acceleration')
- if 'lat_acc_threshold' in df.columns:
- ax1.plot(df['simTime'], df['lat_acc_threshold'], 'r--', label='lat_acc_threshold')
- for idx, event in enumerate(shake_events):
- label = 'Shake Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Lateral Acceleration (m/s²)')
- ax1.set_title('Shake Event Detection - Lateral Acceleration')
- ax1.grid(True)
- ax1.legend()
- # 图 2:lat_acc_rate
- ax2 = plt.subplot(3, 1, 2)
- ax2.plot(df['simTime'], df['lat_acc_rate'], 'g-', label='lat_acc_rate')
- ax2.axhline(
- y=0.5, color='orange', linestyle='--', linewidth=1.2, label='lat_acc_rate_threshold'
- )
- for idx, event in enumerate(shake_events):
- label = 'Shake Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Angular Velocity (m/s³)')
- ax2.set_title('Shake Event Detection - lat_acc_rate')
- ax2.grid(True)
- ax2.legend()
- # 图 3:speedH_std
- ax3 = plt.subplot(3, 1, 3)
- ax3.plot(df['simTime'], df['speedH_std'], 'b-', label='speedH_std')
- if 'speedH_std_threshold' in df.columns:
- ax3.plot(df['simTime'], df['speedH_std_threshold'], 'r--', label='speedH_threshold')
- for idx, event in enumerate(shake_events):
- label = 'Shake Event' if idx == 0 else None
- ax3.axvspan(event['start_time'], event['end_time'], alpha=0.3, color='orange', label=label)
- ax3.set_xlabel('Time (s)')
- ax3.set_ylabel('Angular Velocity (deg/s)')
- ax3.set_title('Shake Event Detection - speedH_std')
- ax3.grid(True)
- ax3.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"shake_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Shake chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate shake chart: {str(e)}", exc_info=True)
- return None
- def generate_zigzag_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate zigzag metric chart with orange background for zigzag events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- zigzag_events = comfort_calculator.discomfort_df[
- comfort_calculator.discomfort_df['type'] == 'zigzag'
- ].copy()
- if df.empty:
- logger.warning("Cannot generate zigzag chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"zigzag_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'speedH': df['speedH'],
- 'posH': df['posH'],
- 'min_speedH_threshold': -2.3, # 可替换为动态阈值
- 'max_speedH_threshold': 2.3
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Zigzag data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- import matplotlib.pyplot as plt
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # ===== 子图1:Yaw Rate =====
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['speedH'], 'g-', label='Yaw Rate')
- # 添加 speedH 上下限阈值线
- ax1.axhline(y=2.3, color='m', linestyle='--', linewidth=1.2, label='Max Threshold (+2.3)')
- ax1.axhline(y=-2.3, color='r', linestyle='--', linewidth=1.2, label='Min Threshold (-2.3)')
- # 添加橙色背景:Zigzag Events
- for idx, event in zigzag_events.iterrows():
- label = 'Zigzag Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Angular Velocity (deg/s)')
- ax1.set_title('Zigzag Event Detection - Yaw Rate')
- ax1.grid(True)
- ax1.legend(loc='upper left')
- # ===== 子图2:Yaw Angle =====
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['posH'], 'b-', label='Yaw')
- # 添加橙色背景:Zigzag Events
- for idx, event in zigzag_events.iterrows():
- label = 'Zigzag Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Yaw (deg)')
- ax2.set_title('Zigzag Event Detection - Yaw Angle')
- ax2.grid(True)
- ax2.legend(loc='upper left')
- # 保存图像
- chart_filename = os.path.join(output_dir, f"zigzag_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Zigzag chart saved to: {chart_filename}")
- return csv_filename
- except Exception as e:
- logger.error(f"Failed to generate zigzag chart: {str(e)}", exc_info=True)
- return None
- def generate_cadence_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate cadence metric chart with orange background for cadence events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- cadence_events = comfort_calculator.discomfort_df[comfort_calculator.discomfort_df['type'] == 'cadence'].copy()
- if df.empty:
- logger.warning("Cannot generate cadence chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"cadence_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lon_acc': df['lon_acc'],
- 'v': df['v'],
- 'ip_acc': df.get('ip_acc', pd.Series([None] * len(df))),
- 'ip_dec': df.get('ip_dec', pd.Series([None] * len(df)))
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Cadence data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- import matplotlib.pyplot as plt
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:纵向加速度
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
- if 'ip_acc' in df.columns and 'ip_dec' in df.columns:
- ax1.plot(df['simTime'], df['ip_acc'], 'r--', label='Acceleration Threshold')
- ax1.plot(df['simTime'], df['ip_dec'], 'g--', label='Deceleration Threshold')
- # 添加橙色背景标识顿挫事件
- for idx, event in cadence_events.iterrows():
- label = 'Cadence Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
- ax1.set_title('Cadence Event Detection - Longitudinal Acceleration')
- ax1.grid(True)
- ax1.legend()
- # 图 2:速度
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
- # 添加橙色背景标识顿挫事件
- for idx, event in cadence_events.iterrows():
- label = 'Cadence Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Velocity (m/s)')
- ax2.set_title('Cadence Event Detection - Vehicle Speed')
- ax2.grid(True)
- ax2.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"cadence_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Cadence chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate cadence chart: {str(e)}", exc_info=True)
- return None
- def generate_slam_brake_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate slam brake metric chart with orange background for slam brake events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- slam_brake_events = comfort_calculator.discomfort_df[
- comfort_calculator.discomfort_df['type'] == 'slam_brake'].copy()
- if df.empty:
- logger.warning("Cannot generate slam brake chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"slam_brake_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lon_acc': df['lon_acc'],
- 'v': df['v'],
- 'min_threshold': df.get('ip_dec', pd.Series([None] * len(df))),
- 'max_threshold': 0.0
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Slam brake data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:纵向加速度
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
- if 'min_threshold' in df.columns:
- ax1.plot(df['simTime'], df['min_threshold'], 'r--', label='Deceleration Threshold')
- # 添加橙色背景标识急刹车事件
- for idx, event in slam_brake_events.iterrows():
- label = 'Slam Brake Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Longitudinal Acceleration (m/s²)')
- ax1.set_title('Slam Brake Event Detection - Longitudinal Acceleration')
- ax1.grid(True)
- ax1.legend()
- # 图 2:速度
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
- # 添加橙色背景标识急刹车事件
- for idx, event in slam_brake_events.iterrows():
- label = 'Slam Brake Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Velocity (m/s)')
- ax2.set_title('Slam Brake Event Detection - Vehicle Speed')
- ax2.grid(True)
- ax2.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"slam_brake_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Slam brake chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate slam brake chart: {str(e)}", exc_info=True)
- return None
- def generate_slam_accelerate_chart(comfort_calculator, output_dir: str) -> Optional[str]:
- """
- Generate slam accelerate metric chart with orange background for slam accelerate events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- comfort_calculator: ComfortCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- df = comfort_calculator.ego_df.copy()
- slam_accel_events = comfort_calculator.discomfort_df[
- (comfort_calculator.discomfort_df['type'] == 'slam_accel')
- ].copy()
- if df.empty:
- logger.warning("Cannot generate slam accelerate chart: empty data")
- return None
- # 生成时间戳
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"slam_accel_data.csv")
- # 获取加速度阈值(如果存在)
- accel_threshold = df.get('ip_acc', pd.Series([None] * len(df)))
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'lon_acc': df['lon_acc'],
- 'v': df['v'],
- 'min_threshold': 0.0, # 加速度最小阈值设为0
- 'max_threshold': accel_threshold # 急加速阈值
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Slam accelerate data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8), constrained_layout=True)
- # 图 1:纵向加速度
- ax1 = plt.subplot(2, 1, 1)
- ax1.plot(df['simTime'], df['lon_acc'], 'b-', label='Longitudinal Acceleration')
- # 添加加速度阈值线
- if 'max_threshold' in df.columns and not df['max_threshold'].isnull().all():
- ax1.plot(df['simTime'], df['max_threshold'], 'r--', label='Acceleration Threshold')
- # 添加橙色背景标识急加速事件
- for idx, event in slam_accel_events.iterrows():
- label = 'Slam Accelerate Event' if idx == 0 else None
- ax1.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax1.set_xlabel('Time (s)')
- ax1.set_ylabel('Acceleration (m/s²)')
- ax1.set_title('Slam Accelerate Event Detection - Longitudinal Acceleration')
- ax1.grid(True)
- ax1.legend()
- # 图 2:速度
- ax2 = plt.subplot(2, 1, 2)
- ax2.plot(df['simTime'], df['v'], 'g-', label='Velocity')
- # 添加橙色背景标识急加速事件
- for idx, event in slam_accel_events.iterrows():
- label = 'Slam Accelerate Event' if idx == 0 else None
- ax2.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- ax2.set_xlabel('Time (s)')
- ax2.set_ylabel('Velocity (m/s)')
- ax2.set_title('Slam Accelerate Event Detection - Vehicle Speed')
- ax2.grid(True)
- ax2.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"slam_accel_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- logger.info(f"Slam accelerate chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate slam accelerate chart: {str(e)}", exc_info=True)
- return None
- def get_metric_thresholds(calculator, metric_name: str) -> dict:
- """
- 从配置文件中获取指标的阈值
- Args:
- calculator: Calculator instance (FunctionCalculator, SafetyCalculator, ComfortCalculator, EfficientCalculator, TrafficCalculator)
- metric_name: 指标名称
- Returns:
- dict: 包含min和max阈值的字典
- """
- logger = LogManager().get_logger()
- thresholds = {"min": None, "max": None}
- try:
- # 根据计算器类型获取配置
- if hasattr(calculator, 'data_processed'):
- # 检查安全性指标配置
- if hasattr(calculator.data_processed,
- 'safety_config') and 'safety' in calculator.data_processed.safety_config:
- config = calculator.data_processed.safety_config['safety']
- metric_type = 'safety'
- # 检查舒适性指标配置
- elif hasattr(calculator.data_processed,
- 'comfort_config') and 'comfort' in calculator.data_processed.comfort_config:
- config = calculator.data_processed.comfort_config['comfort']
- metric_type = 'comfort'
- # 检查功能性指标配置
- elif hasattr(calculator.data_processed,
- 'function_config') and 'function' in calculator.data_processed.function_config:
- config = calculator.data_processed.function_config['function']
- metric_type = 'function'
- # 检查高效性指标配置
- elif hasattr(calculator.data_processed,
- 'efficient_config') and 'efficient' in calculator.data_processed.efficient_config:
- config = calculator.data_processed.efficient_config['efficient']
- metric_type = 'efficient'
- # 检查交通性指标配置
- elif hasattr(calculator.data_processed,
- 'traffic_config') and 'traffic' in calculator.data_processed.traffic_config:
- config = calculator.data_processed.traffic_config['traffic']
- metric_type = 'traffic'
- else:
- # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
- if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
- config = calculator.function_config['function']
- metric_type = 'function'
- else:
- logger.warning(f"无法找到{metric_name}的配置信息")
- return thresholds
- else:
- # 直接检查calculator是否有function_config属性(针对FunctionCalculator)
- if hasattr(calculator, 'function_config') and 'function' in calculator.function_config:
- config = calculator.function_config['function']
- metric_type = 'function'
- else:
- logger.warning(f"计算器没有data_processed属性或function_config属性")
- return thresholds
- # 递归查找指标配置
- def find_metric_config(node, target_name):
- if isinstance(node, dict):
- if 'name' in node and node['name'].lower() == target_name.lower() and 'min' in node and 'max' in node:
- return node
- for key, value in node.items():
- result = find_metric_config(value, target_name)
- if result:
- return result
- return None
- # 查找指标配置
- metric_config = find_metric_config(config, metric_name)
- if metric_config:
- thresholds["min"] = metric_config.get("min")
- thresholds["max"] = metric_config.get("max")
- logger.info(f"找到{metric_name}的阈值: min={thresholds['min']}, max={thresholds['max']}")
- else:
- logger.warning(f"在{metric_type}配置中未找到{metric_name}的阈值信息")
- except Exception as e:
- logger.error(f"获取{metric_name}阈值时出错: {str(e)}", exc_info=True)
- return thresholds
- def generate_safety_chart_data(safety_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[str]:
- """
- Generate chart data for safety metrics
- Args:
- safety_calculator: SafetyCalculator instance
- metric_name: Metric name
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 确保输出目录存在
- if output_dir:
- os.makedirs(output_dir, exist_ok=True)
- else:
- output_dir = os.getcwd()
- # 根据指标名称选择不同的图表生成方法
- if metric_name.lower() == 'ttc':
- return generate_ttc_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'mttc':
- return generate_mttc_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'thw':
- return generate_thw_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'lonsd':
- return generate_lonsd_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'latsd':
- return generate_latsd_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'btn':
- return generate_btn_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'collisionrisk':
- return generate_collision_risk_chart(safety_calculator, output_dir)
- elif metric_name.lower() == 'collisionseverity':
- return generate_collision_severity_chart(safety_calculator, output_dir)
- else:
- logger.warning(f"Chart generation not implemented for metric [{metric_name}]")
- return None
- except Exception as e:
- logger.error(f"Failed to generate chart data: {str(e)}", exc_info=True)
- return None
- def generate_ttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate TTC metric chart with orange background for unsafe events.
- This version first saves data to CSV, then uses the CSV to generate the chart.
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- ttc_data = safety_calculator.ttc_data
- if not ttc_data:
- logger.warning("Cannot generate TTC chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(ttc_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'TTC')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 生成时间戳
- # 保存 CSV 数据(第一步)
- csv_filename = os.path.join(output_dir, f"ttc_data.csv")
- df_csv = pd.DataFrame({
- 'simTime': df['simTime'],
- 'simFrame': df['simFrame'],
- 'TTC': df['TTC'],
- 'min_threshold': min_threshold,
- 'max_threshold': max_threshold
- })
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"TTC data saved to: {csv_filename}")
- # 第二步:从 CSV 读取(可验证保存数据无误)
- df = pd.read_csv(csv_filename)
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于TTC,小于最小阈值视为不安全
- unsafe_condition = df['TTC'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_ttc': group['TTC'].min()
- })
- # 创建图表(第三步)
- plt.figure(figsize=(12, 8))
- plt.plot(df['simTime'], df['TTC'], 'b-', label='TTC')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe TTC Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- plt.xlabel('Time (s)')
- plt.ylabel('TTC (s)')
- plt.title('Time To Collision (TTC) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图像
- chart_filename = os.path.join(output_dir, f"ttc_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个TTC不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(
- f"TTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小TTC={event['min_ttc']:.2f}s")
- logger.info(f"TTC chart saved to: {chart_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate TTC chart: {str(e)}", exc_info=True)
- return None
- def generate_mttc_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate MTTC metric chart with orange background for unsafe events
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- mttc_data = safety_calculator.mttc_data
- if not mttc_data:
- logger.warning("Cannot generate MTTC chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(mttc_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'MTTC')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于MTTC,小于最小阈值视为不安全
- unsafe_condition = df['MTTC'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_mttc': group['MTTC'].min()
- })
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['MTTC'], 'g-', label='MTTC')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe MTTC Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- plt.xlabel('Time (s)')
- plt.ylabel('MTTC (s)')
- plt.title('Modified Time To Collision (MTTC) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"mttc_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"mttc_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个MTTC不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(
- f"MTTC不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小MTTC={event['min_mttc']:.2f}s")
- logger.info(f"MTTC chart saved to: {chart_filename}")
- logger.info(f"MTTC data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate MTTC chart: {str(e)}", exc_info=True)
- return None
- def generate_thw_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate THW metric chart with orange background for unsafe events
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- thw_data = safety_calculator.thw_data
- if not thw_data:
- logger.warning("Cannot generate THW chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(thw_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'THW')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于THW,小于最小阈值视为不安全
- unsafe_condition = df['THW'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_thw': group['THW'].min()
- })
- # 创建图表
- plt.figure(figsize=(12, 10))
- plt.plot(df['simTime'], df['THW'], 'c-', label='THW')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}s)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe THW Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- plt.xlabel('Time (s)')
- plt.ylabel('THW (s)')
- plt.title('Time Headway (THW) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"thw_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"thw_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个THW不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(
- f"THW不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小THW={event['min_thw']:.2f}s")
- logger.info(f"THW chart saved to: {chart_filename}")
- logger.info(f"THW data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate THW chart: {str(e)}", exc_info=True)
- return None
- def generate_lonsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Longitudinal Safe Distance metric chart
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- lonsd_data = safety_calculator.lonsd_data
- if not lonsd_data:
- logger.warning("Cannot generate Longitudinal Safe Distance chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(lonsd_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'LonSD')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['LonSD'], 'm-', label='Longitudinal Safe Distance')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title('Longitudinal Safe Distance (LonSD) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"lonsd_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"lonsd_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Longitudinal Safe Distance chart saved to: {chart_filename}")
- logger.info(f"Longitudinal Safe Distance data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate Longitudinal Safe Distance chart: {str(e)}", exc_info=True)
- return None
- def generate_latsd_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Lateral Safe Distance metric chart with orange background for unsafe events
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- latsd_data = safety_calculator.latsd_data
- if not latsd_data:
- logger.warning("Cannot generate Lateral Safe Distance chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(latsd_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'LatSD')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 检测超阈值事件
- unsafe_events = []
- if min_threshold is not None:
- # 对于LatSD,小于最小阈值视为不安全
- unsafe_condition = df['LatSD'] < min_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'min_latsd': group['LatSD'].min()
- })
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['LatSD'], 'y-', label='Lateral Safe Distance')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}m)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}m)')
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe LatSD Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- plt.xlabel('Time (s)')
- plt.ylabel('Distance (m)')
- plt.title('Lateral Safe Distance (LatSD) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"latsd_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"latsd_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个LatSD不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(
- f"LatSD不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最小LatSD={event['min_latsd']:.2f}m")
- logger.info(f"Lateral Safe Distance chart saved to: {chart_filename}")
- logger.info(f"Lateral Safe Distance data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate Lateral Safe Distance chart: {str(e)}", exc_info=True)
- return None
- def generate_btn_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Brake Threat Number metric chart with orange background for unsafe events
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- btn_data = safety_calculator.btn_data
- if not btn_data:
- logger.warning("Cannot generate Brake Threat Number chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(btn_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'BTN')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 检测超阈值事件
- unsafe_events = []
- if max_threshold is not None:
- # 对于BTN,大于最大阈值视为不安全
- unsafe_condition = df['BTN'] > max_threshold
- event_groups = (unsafe_condition != unsafe_condition.shift()).cumsum()
- for _, group in df[unsafe_condition].groupby(event_groups):
- if len(group) >= 2: # 至少2帧才算一次事件
- start_time = group['simTime'].iloc[0]
- end_time = group['simTime'].iloc[-1]
- duration = end_time - start_time
- if duration >= 0.1: # 只记录持续时间超过0.1秒的事件
- unsafe_events.append({
- 'start_time': start_time,
- 'end_time': end_time,
- 'start_frame': group['simFrame'].iloc[0],
- 'end_frame': group['simFrame'].iloc[-1],
- 'duration': duration,
- 'max_btn': group['BTN'].max()
- })
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['BTN'], 'r-', label='Brake Threat Number')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold})')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold})')
- # 添加橙色背景标识不安全事件
- for idx, event in enumerate(unsafe_events):
- label = 'Unsafe BTN Event' if idx == 0 else None
- plt.axvspan(event['start_time'], event['end_time'],
- alpha=0.3, color='orange', label=label)
- plt.xlabel('Time (s)')
- plt.ylabel('BTN')
- plt.title('Brake Threat Number (BTN) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"btn_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"btn_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- # 记录不安全事件信息
- if unsafe_events:
- logger.info(f"检测到 {len(unsafe_events)} 个BTN不安全事件")
- for i, event in enumerate(unsafe_events):
- logger.info(
- f"BTN不安全事件 #{i + 1}: 开始时间={event['start_time']:.2f}s, 结束时间={event['end_time']:.2f}s, 持续时间={event['duration']:.2f}s, 最大BTN={event['max_btn']:.2f}")
- logger.info(f"Brake Threat Number chart saved to: {chart_filename}")
- logger.info(f"Brake Threat Number data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate Brake Threat Number chart: {str(e)}", exc_info=True)
- return None
- def generate_collision_risk_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Collision Risk metric chart
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- risk_data = safety_calculator.collision_risk_data
- if not risk_data:
- logger.warning("Cannot generate Collision Risk chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(risk_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'collisionRisk')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['collisionRisk'], 'r-', label='Collision Risk')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
- plt.xlabel('Time (s)')
- plt.ylabel('Risk Value (%)')
- plt.title('Collision Risk (collisionRisk) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"collision_risk_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"collisionrisk_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Collision Risk chart saved to: {chart_filename}")
- logger.info(f"Collision Risk data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate Collision Risk chart: {str(e)}", exc_info=True)
- return None
- def generate_collision_severity_chart(safety_calculator, output_dir: str) -> Optional[str]:
- """
- Generate Collision Severity metric chart
- Args:
- safety_calculator: SafetyCalculator instance
- output_dir: Output directory
- Returns:
- str: Chart file path, or None if generation fails
- """
- logger = LogManager().get_logger()
- try:
- # 获取数据
- severity_data = safety_calculator.collision_severity_data
- if not severity_data:
- logger.warning("Cannot generate Collision Severity chart: empty data")
- return None
- # 创建DataFrame
- df = pd.DataFrame(severity_data)
- # 获取阈值
- thresholds = get_metric_thresholds(safety_calculator, 'collisionSeverity')
- min_threshold = thresholds.get('min')
- max_threshold = thresholds.get('max')
- # 创建图表
- plt.figure(figsize=(12, 6))
- plt.plot(df['simTime'], df['collisionSeverity'], 'r-', label='Collision Severity')
- # 添加阈值线
- if min_threshold is not None:
- plt.axhline(y=min_threshold, color='r', linestyle='--', label=f'Min Threshold ({min_threshold}%)')
- if max_threshold is not None:
- plt.axhline(y=max_threshold, color='g', linestyle='--', label=f'Max Threshold ({max_threshold}%)')
- plt.xlabel('Time (s)')
- plt.ylabel('Severity (%)')
- plt.title('Collision Severity (collisionSeverity) Trend')
- plt.grid(True)
- plt.legend()
- # 保存图表
- chart_filename = os.path.join(output_dir, f"collision_severity_chart.png")
- plt.savefig(chart_filename, dpi=300)
- plt.close()
- # 保存CSV数据,包含阈值信息
- csv_filename = os.path.join(output_dir, f"collisionseverity_data.csv")
- df_csv = df.copy()
- df_csv['min_threshold'] = min_threshold
- df_csv['max_threshold'] = max_threshold
- df_csv.to_csv(csv_filename, index=False)
- logger.info(f"Collision Severity chart saved to: {chart_filename}")
- logger.info(f"Collision Severity data saved to: {csv_filename}")
- return chart_filename
- except Exception as e:
- logger.error(f"Failed to generate Collision Severity chart: {str(e)}", exc_info=True)
- return None
- def generate_traffic_chart_data(traffic_calculator, metric_name: str, output_dir: Optional[str] = None) -> Optional[
- str]:
- """Generate chart data for traffic metrics"""
- # 待实现
- return None
- def calculate_distance(ego_df, correctwarning):
- """计算预警距离"""
- dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
- return dist
- def calculate_relative_speed(ego_df, correctwarning):
- """计算相对速度"""
- return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
- # 使用function.py中已实现的scenario_sign_dict
- from modules.metric.function import scenario_sign_dict
- if __name__ == "__main__":
- # 测试代码
- print("Metrics visualization utilities loaded.")
|