123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2025 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: zhanghaiwen(zhanghaiwen@china-icv.cn)
- @Data: 2025/01/5
- @Last Modified: 2025/01/5
- @Summary: Function Metrics Calculation
- """
- import sys
- from pathlib import Path
- # 添加项目根目录到系统路径
- root_path = Path(__file__).resolve().parent.parent
- sys.path.append(str(root_path))
- print(root_path)
- from modules.lib.score import Score
- from modules.lib.log_manager import LogManager
- import numpy as np
- from typing import Dict, Tuple, Optional, Callable, Any
- import pandas as pd
- import yaml
- from modules.lib.chart_generator import generate_function_chart_data
- from shapely.geometry import Point, Polygon
- from modules.lib.common import get_interpolation
- import pandas as pd
- import yaml
- import math
- # ----------------------
- # 基础工具函数 (Pure functions)
- # ----------------------
- scenario_sign_dict = {
- "LeftTurnAssist": 206,
- "HazardousLocationW": 207,
- "RedLightViolationW": 208,
- "AbnormalVehicleW": 209,
- "NsightVulnerableRoadUserCollisionW": 210,
- "LitterW": 211,
- "ForwardCollision": 212,
- "VisibilityW": 213,
- "EmergencyBrakeW": 214,
- "IntersectionCollisionW": 215,
- "BlindSpotW": 216,
- "DoNotPassW": 217,
- "ControlLossW": 218,
- "FrontTrafficJamW": 219,
- "EmergencyVehicleW": 220,
- "CooperativeVehicleMerge": 221,
- "CooperativeLaneChange": 223,
- "VulnerableRoadUserCollisionW": 224,
- "CooperativeIntersectionPassing": 225,
- "RampMerge": 226,
- "DrivingLaneRecommendation": 227,
- "TrafficJamW": 228,
- "DynamicSpeedLimitingInformation": 229,
- "EmergencyVehiclesPriority": 230,
- "RemoteSupervision": 231,
- "SignalLightReminder": 232,
- "OtherVehicleRedLightViolationW": 233,
- "GreenLightOptimalSpeedAdvisory": 234,
- }
- def _is_pedestrian_in_crosswalk(polygon, test_point) -> bool:
- polygon = Polygon(polygon)
- point = Point(test_point)
- return polygon.contains(point)
- def _is_segment_by_interval(time_list, expected_step) -> list:
- """
- 根据时间戳之间的间隔进行分段。
- 参数:
- time_list (list): 时间戳列表。
- expected_step (float): 预期的固定步长。
- 返回:
- list: 分段后的时间戳列表,每个元素是一个子列表。
- """
- if not time_list:
- return []
- segments = []
- current_segment = [time_list[0]]
- for i in range(1, len(time_list)):
- actual_step = time_list[i] - time_list[i - 1]
- if actual_step != expected_step:
- # 如果间隔不符合预期,则开始一个新的段
- segments.append(current_segment)
- current_segment = [time_list[i]]
- else:
- # 否则,将当前时间戳添加到当前段中
- current_segment.append(time_list[i])
- # 添加最后一个段
- if current_segment:
- segments.append(current_segment)
- return segments
- # 寻找二级指标的名称
- def find_nested_name(data):
- """
- 查找字典中嵌套的name结构。
- :param data: 要搜索的字典
- :return: 找到的第一个嵌套name结构的值,如果没有找到则返回None
- """
- if isinstance(data, dict):
- for key, value in data.items():
- if isinstance(value, dict) and 'name' in value:
- return value['name']
- # 递归查找嵌套字典
- result = find_nested_name(value)
- if result is not None:
- return result
- elif isinstance(data, list):
- for item in data:
- result = find_nested_name(item)
- if result is not None:
- return result
- return None
- def calculate_distance_PGVIL(ego_pos: np.ndarray, obj_pos: np.ndarray) -> np.ndarray:
- """向量化距离计算"""
- return np.linalg.norm(ego_pos - obj_pos, axis=1)
- def calculate_relative_speed_PGVIL(
- ego_speed: np.ndarray, obj_speed: np.ndarray
- ) -> np.ndarray:
- """向量化相对速度计算"""
- return np.linalg.norm(ego_speed - obj_speed, axis=1)
- def calculate_distance(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
- """向量化距离计算"""
- dist = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['relative_dist']
- return dist
- def calculate_relative_speed(ego_df: pd.DataFrame, correctwarning: int) -> np.ndarray:
- """向量化相对速度计算"""
- return ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]['composite_v']
- def extract_ego_obj(data: pd.DataFrame) -> Tuple[pd.Series, pd.DataFrame]:
- """数据提取函数"""
- ego = data[data["playerId"] == 1].iloc[0]
- obj = data[data["playerId"] != 1]
- return ego, obj
- def get_first_warning(data_processed) -> Optional[pd.DataFrame]:
- """带缓存的预警数据获取"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- scenario_name = find_nested_name(data_processed.function_config["function"])
- correctwarning = scenario_sign_dict.get(scenario_name)
- if correctwarning is None:
- print("无法获取正确的预警信号标志位!")
- return None
- warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
- warning_times = warning_rows['simTime']
- if warning_times.empty:
- print("没有找到预警数据!")
- return None
- first_time = warning_times.iloc[0]
- return obj_df[obj_df['simTime'] == first_time]
- def getAxis(heading):
- AxisL = [0, 0]
- AxisW = [0, 0]
- AxisL[0] = math.cos(heading)
- AxisL[1] = math.sin(heading)
- AxisW[0] = -math.sin(heading)
- AxisW[1] = math.cos(heading)
- return AxisL, AxisW
- def getDotMultiply(firstAxis, secondAxis):
- dotMultiply = abs(firstAxis[0] * secondAxis[0] + firstAxis[1] * secondAxis[1])
- return dotMultiply
- def getProjectionRadius(AxisL, AxisW, baseAxis, halfLength, halfWidth):
- projAxisL = getDotMultiply(AxisL, baseAxis)
- projAxisW = getDotMultiply(AxisW, baseAxis)
- projectionRadius = halfLength * projAxisL + halfWidth * projAxisW
- return projectionRadius
- def isCollision(
- firstAxisL,
- firstAxisW,
- firstHalfLength,
- firstHalfWidth,
- secondAxisL,
- secondAxisW,
- secondHalfLength,
- secondHalfWidth,
- disVector,
- ):
- isCollision = True
- axes = [firstAxisL, firstAxisW, secondAxisL, secondAxisW]
- addProjectionRadius = 0
- projectionDisVecotr = 0
- for axis in axes:
- addProjectionRadius = getProjectionRadius(
- firstAxisL, firstAxisW, axis, firstHalfLength, firstHalfWidth
- ) + getProjectionRadius(
- secondAxisL, secondAxisW, axis, secondHalfLength, secondHalfWidth
- )
- projectionDisVecotr = getDotMultiply(disVector, axis)
- if projectionDisVecotr > addProjectionRadius:
- isCollision = False
- break
- return isCollision
- def funcIsCollision(
- firstDimX,
- firstDimY,
- firstOffX,
- firstOffY,
- firstX,
- firstY,
- firstHeading,
- secondDimX,
- secondDimY,
- secondOffX,
- secondOffY,
- secondX,
- secondY,
- secondHeading,
- ):
- firstAxisL = getAxis(firstHeading)[0]
- firstAxisW = getAxis(firstHeading)[1]
- secondAxisL = getAxis(secondHeading)[0]
- secondAxisW = getAxis(secondHeading)[1]
- firstHalfLength = 0.5 * firstDimX
- firstHalfWidth = 0.5 * firstDimY
- secondHalfLength = 0.5 * secondDimX
- secondHalfWidth = 0.5 * secondDimY
- firstCenter = [0, 0]
- secondCenter = [0, 0]
- disVector = [0, 0]
- firstCenter[0] = firstX + firstOffX * math.cos(firstHeading)
- firstCenter[1] = firstY + firstOffX * math.sin(firstHeading)
- secondCenter[0] = secondX + secondOffX * math.cos(secondHeading)
- secondCenter[1] = secondY + secondOffX * math.sin(secondHeading)
- disVector[0] = secondCenter[0] - firstCenter[0]
- disVector[1] = secondCenter[1] - firstCenter[1]
- varIsCollision = isCollision(
- firstAxisL,
- firstAxisW,
- firstHalfLength,
- firstHalfWidth,
- secondAxisL,
- secondAxisW,
- secondHalfLength,
- secondHalfWidth,
- disVector,
- )
- return varIsCollision
- # ----------------------
- # 核心计算功能函数
- # ----------------------
- def latestWarningDistance_LST(data, plot_path) -> dict:
- """预警距离计算流水线"""
- scenario_name = find_nested_name(data.function_config["function"])
- value = data.function_config["function"][scenario_name]["latestWarningDistance_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- # 将计算结果保存到data对象中,供图表生成使用
- data.warning_dist = warning_dist
- data.warning_speed = warning_speed
- data.correctwarning = correctwarning
- if len(warning_dist.tolist()) == 0:
- return {"latestWarningDistance_LST": 0.0}
- # 生成图表数据
- generate_function_chart_data(data, 'latestWarningDistance_LST', plot_path)
- return {"latestWarningDistance_LST": warning_dist.tolist()[-1] if len(warning_dist) > 0 else value}
- def earliestWarningDistance_LST(data, plot_path) -> dict:
- """预警距离计算流水线"""
- scenario_name = find_nested_name(data.function_config["function"])
- value = data.function_config["function"][scenario_name]["earliestWarningDistance_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- # 将计算结果保存到data对象中,供图表生成使用
- data.warning_dist = warning_dist
- data.warning_speed = warning_speed
- data.correctwarning = correctwarning
- if len(warning_dist.tolist()) == 0:
- return {"earliestWarningDistance_LST": 0.0}
- # 生成图表数据
- generate_function_chart_data(data, 'earliestWarningDistance_LST', plot_path)
- return {"earliestWarningDistance_LST": float(warning_dist.tolist()[0]) if len(warning_dist) > 0 else value}
- def latestWarningDistance_TTC_LST(data, plot_path) -> dict:
- """TTC计算流水线"""
- scenario_name = find_nested_name(data.function_config["function"])
- value = data.function_config["function"][scenario_name]["latestWarningDistance_TTC_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- if len(warning_dist.tolist()) == 0:
- return {"latestWarningDistance_TTC_LST": 0.0}
- # 将correctwarning保存到data对象中,供图表生成使用
- data.correctwarning = correctwarning
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- with np.errstate(divide='ignore', invalid='ignore'):
- ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
- # 处理无效的TTC值
- for i in range(len(ttc)):
- ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
- data.warning_dist = warning_dist
- data.warning_speed = warning_speed
- data.ttc = ttc
- # 生成图表数据
- # from modules.lib.chart_generator import generate_function_chart_data
- generate_function_chart_data(data, 'latestWarningDistance_TTC_LST', plot_path)
- return {"latestWarningDistance_TTC_LST": float(ttc[-1]) if len(ttc) > 0 else value}
- def earliestWarningDistance_TTC_LST(data, plot_path) -> dict:
- """TTC计算流水线"""
- scenario_name = find_nested_name(data.function_config["function"])
- value = data.function_config["function"][scenario_name]["earliestWarningDistance_TTC_LST"]["max"]
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- warning_dist = calculate_distance(ego_df, correctwarning)
- if len(warning_dist.tolist()) == 0:
- return {"earliestWarningDistance_TTC_LST": 0.0}
- # 将correctwarning保存到data对象中,供图表生成使用
- data.correctwarning = correctwarning
- warning_speed = calculate_relative_speed(ego_df, correctwarning)
- with np.errstate(divide='ignore', invalid='ignore'):
- ttc = np.where(warning_speed != 0, warning_dist / warning_speed, np.inf)
- # 处理无效的TTC值
- for i in range(len(ttc)):
- ttc[i] = float(value) if (not ttc[i] or ttc[i] < 0) else ttc[i]
- # 将计算结果保存到data对象中,供图表生成使用
- data.warning_dist = warning_dist
- data.warning_speed = warning_speed
- data.ttc = ttc
- data.correctwarning = correctwarning
- # 生成图表数据
- generate_function_chart_data(data, 'earliestWarningDistance_TTC_LST', plot_path)
- return {"earliestWarningDistance_TTC_LST": float(ttc[0]) if len(ttc) > 0 else value}
- def warningDelayTime_LST(data, plot_path):
- scenario_name = find_nested_name(data.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- # 将correctwarning保存到data对象中,供图表生成使用
- data.correctwarning = correctwarning
- ego_df = data.ego_data
- HMI_warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning)]['simTime'].tolist()
- simTime_HMI = HMI_warning_rows[0] if len(HMI_warning_rows) > 0 else None
- rosbag_warning_rows = ego_df[(ego_df['event_Type'].notna()) & ((ego_df['event_Type'] != np.nan))][
- 'simTime'].tolist()
- simTime_rosbag = rosbag_warning_rows[0] if len(rosbag_warning_rows) > 0 else None
- if (simTime_HMI is None) or (simTime_rosbag is None):
- print("预警出错!")
- delay_time = 100.0
- else:
- delay_time = abs(simTime_HMI - simTime_rosbag)
- return {"warningDelayTime_LST": delay_time}
- def warningDelayTimeofReachDecel_LST(data, plot_path):
- scenario_name = find_nested_name(data.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- # 将correctwarning保存到data对象中,供图表生成使用
- data.correctwarning = correctwarning
- ego_df = data.ego_data
- obj_df = data.obj_data[2]
- # 加速度在速度方向上的投影
- speed_mag = np.sqrt(obj_df["speedX"] ** 2 + obj_df["speedY"] ** 2) + 1e-6
- obj_df["ux"] = obj_df["speedX"] / speed_mag
- obj_df["uy"] = obj_df["speedY"] / speed_mag
- obj_df["merged_accel"] = obj_df["accelX"] * obj_df["ux"] + obj_df["accelY"] * obj_df["uy"]
- obj_speed_simtime = obj_df[obj_df['merged_accel'] <= -4]['simTime'].tolist() # 单位m/s^2
- warning_simTime = ego_df[ego_df['ifwarning'] == correctwarning]['simTime'].tolist()
- if (len(warning_simTime) == 0) and (len(obj_speed_simtime) == 0):
- return {"warningDelayTimeofReachDecel_LST": 0}
- elif (len(warning_simTime) == 0) and (len(obj_speed_simtime) > 0):
- return {"warningDelayTimeofReachDecel_LST": obj_speed_simtime[0]}
- elif (len(warning_simTime) > 0) and (len(obj_speed_simtime) == 0):
- return {"warningDelayTimeofReachDecel_LST": -1}
- else:
- return {"warningDelayTimeofReachDecel_LST": warning_simTime[0] - obj_speed_simtime[0]}
- def rightWarningSignal_LST(data, plot_path):
- scenario_name = find_nested_name(data.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- # 将correctwarning保存到data对象中,供图表生成使用
- data.correctwarning = correctwarning
- ego_df = data.ego_data
- if ego_df['ifwarning'].empty:
- print("无法获取正确预警信号标志位!")
- return
- warning_rows = ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['ifwarning'].notna())]
- if warning_rows.empty:
- return {"rightWarningSignal_LST": -1}
- else:
- return {"rightWarningSignal_LST": 1}
- def noWarning_LST(data, plot_path):
- scenario_name = find_nested_name(data.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data.ego_data
- if ego_df['ifwarning'].empty:
- print("无法获取正确预警信号标志位!")
- return
- warning_rows = (ego_df['ifwarning'].dropna() == -1).all()
- if warning_rows:
- return {"noWarning_LST": 1}
- else:
- return {"noWarning_LST": -1}
- # def ifCrossingRedLight_LST(data, plot_path):
- # scenario_name = find_nested_name(data.function_config["function"])
- # correctwarning = scenario_sign_dict[scenario_name]
- # # 将correctwarning保存到data对象中,供图表生成使用
- # data.correctwarning = correctwarning
- # ego_df = data.ego_data
- # redlight_simtime = ego_df[
- # (ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 1) & (ego_df['relative_dist'] <= 0.5) & (
- # ego_df['v'] != 0)]['simTime']
- # if redlight_simtime.empty:
- # return {"ifCrossingRedLight_LST": -1}
- # else:
- # return {"ifCrossingRedLight_LST": 1}
- def ifStopgreenWaveSpeedGuidance_LST(data, plot_path):
- scenario_name = find_nested_name(data.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- # 将correctwarning保存到data对象中,供图表生成使用
- data.correctwarning = correctwarning
- ego_df = data.ego_data
- greenlight_simtime = \
- ego_df[(ego_df['ifwarning'] == correctwarning) & (ego_df['stateMask'] == 0) & (ego_df['v'] <= 0.1)]['simTime']
- if greenlight_simtime.empty:
- return {"ifStopgreenWaveSpeedGuidance_LST": -1}
- else:
- return {"ifStopgreenWaveSpeedGuidance_LST": 1}
- # ------ 单车智能指标 ------
- def limitSpeed_LST(data, plot_path):
- ego_df = data.ego_data
- scenario_name = find_nested_name(data.function_config["function"])
- limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
- speed_limit = ego_df[abs(ego_df['x_relative_dist']) <= 0.2]['v'].tolist()
- if len(speed_limit) == 0:
- return {"limitSpeed_LST": -1}
- max_speed = max(speed_limit)*3.6
- data.speedLimit = limit_speed
- generate_function_chart_data(data, 'limitspeed_LST', plot_path)
- return {"limitSpeed_LST": max_speed}
- def limitSpeedPastLimitSign_LST(data, plot_path):
- ego_df = data.ego_data
- scenario_name = find_nested_name(data.function_config["function"])
- limit_speed = data.function_config["function"][scenario_name]["limitSpeed_LST"]["max"]
- car_length = data.function_config["vehicle"]['CAR_LENGTH']
- ego_speed = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['v'].tolist()
- ego_time = ego_df[ego_df['x_relative_dist'] <= -100 - car_length]['simTime'].tolist()
- data.speedLimit = limit_speed
- data.speedPastLimitSign_LST = ego_time[0] if len(ego_time) > 0 else None
- generate_function_chart_data(data, 'limitSpeedPastLimitSign_LST', plot_path)
- if len(ego_speed) == 0:
- return {"speedPastLimitSign_LST": -1}
- return {"speedPastLimitSign_LST": ego_speed[0]}
- def leastDistance_LST(data, plot_path):
- ego_df = data.ego_data
- dist_row = ego_df[ego_df['v'] <= 0.1]['relative_dist'].tolist()
- if len(dist_row) == 0:
- return {"leastDistance_LST": -1}
- else:
- min_dist = min(dist_row)
- return {"leastDistance_LST": min_dist}
- def noCrossRedLight_LST(data, plot_path):
- ego_df = data.ego_data
- cross_redlight_dist = ego_df[(ego_df['stateMask'] == 3) & (ego_df['filtered_trafficlight_id'] is not None) & abs(ego_df['v'] > 0.1)]['x_relative_dist']
- has_positive = any(x > 0 for x in cross_redlight_dist)
- has_negative = any(x < 0 for x in cross_redlight_dist)
- has_both = has_positive and has_negative
- if has_both:
- return {'noCrossRedLight_LST': -1}
- else:
- return {'noCrossRedLight_LST': 1}
- def launchTimeinStopLine_LST(data, plot_path):
- ego_df = data.ego_data
- simtime_row = ego_df[ego_df['v'] <= 0.1]['simTime'].tolist()
- if len(simtime_row) == 0:
- return {"launchTimeinStopLine_LST": -1}
- else:
- delta_t = simtime_row[-1] - simtime_row[0]
- return {"launchTimeinStopLine_LST": delta_t}
- def launchTimewhenFollowingCar_LST(data, plot_path):
- ego_df = data.ego_data
- t_interval = ego_df['simTime'].tolist()[1] - ego_df['simTime'].tolist()[0]
- simtime_row = ego_df[ego_df['v'] <= 0.1]['simTime'].tolist()
- if len(simtime_row) == 0:
- return {"launchTimewhenFollowingCar_LST": 0}
- else:
- time_interval = _is_segment_by_interval(simtime_row, t_interval)
- delta_t = []
- for t in time_interval:
- delta_t.append(t[-1] - t[0])
- return {"launchTimewhenFollowingCar_LST": max(delta_t)}
- def noStop_LST(data, plot_path):
- ego_df_ini = data.ego_data
- min_time = ego_df_ini['simTime'].min() + 0
- max_time = ego_df_ini['simTime'].max() - 0
- ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
- speed = ego_df['v'].tolist()
- if (any(speed) <= 0.5):
- return {"noStop_LST": -1}
- else:
- return {"noStop_LST": 1}
- def launchTimeinTrafficLight_LST(data, plot_path):
- '''
- 待修改:
- 红灯的状态值:1
- 绿灯的状态值:0
- '''
- ego_df = data.ego_data
- simtime_of_redlight = ego_df[(ego_df['stateMask'] == 3) & (ego_df['filtered_trafficlight_id'] is not None)]['simTime']
- simtime_of_stop = ego_df[ego_df['v'] <= 0.1]['simTime']
- if len(simtime_of_stop) or len(simtime_of_redlight):
- return {"launchTimeinTrafficLight_LST": -1}
- simtime_of_launch = simtime_of_redlight[simtime_of_redlight.isin(simtime_of_stop)].tolist()
- if len(simtime_of_launch) == 0:
- return {"launchTimeinTrafficLight_LST": -1}
- return {"launchTimeinTrafficLight_LST": simtime_of_launch[-1] - simtime_of_launch[0]}
- def crossJunctionToTargetLane_LST(data, plot_path):
- ego_df = data.ego_data
- lane_in_leftturn = set(ego_df['lane_id'].tolist())
- scenario_name = find_nested_name(data.function_config["function"])
- if (30225902 in lane_in_leftturn) or (30226005 in lane_in_leftturn):
- return {"crossJunctionToTargetLane_LST": 1}
- else:
- return {"crossJunctionToTargetLane_LST": -1}
- def keepInLane_LST(data, plot_path):
- # ego_df = data.ego_data[data.ego_data['road_type'] == 15]
- ego_df = data.ego_data[data.ego_data['lane_id'].isin([30225904, 30225978])].copy()
- notkeepinlane = ego_df[ego_df['laneOffset'] > ego_df['lane_width'] / 2]
- if len(notkeepinlane) == 0:
- return {"keepInLane_LST": -1}
- return {"keepInLane_LST": 1}
- def leastLateralDistance_LST(data, plot_path):
- ego_df = data.ego_data
- lane_width = ego_df[abs(ego_df['x_relative_dist']) <= 0.5]['lane_width'].tolist()[0]
- if len(lane_width) == 0:
- return {"leastLateralDistance_LST": -1}
- else:
- y_relative_dist = ego_df[abs(ego_df['x_relative_dist']) <= 0.05]['y_relative_dist']
- if (abs(y_relative_dist) <= lane_width / 2).all():
- return {"leastLateralDistance_LST": 1}
- else:
- return {"leastLateralDistance_LST": -1}
- def waitTimeAtCrosswalkwithPedestrian_LST(data, plot_path):
- ego_df = data.ego_data
- object_df = data.obj_data
- data['in_crosswalk'] = []
- position_data = data.drop_duplicates(subset=['cross_id', 'cross_coords'], inplace=True)
- for cross_id in position_data['cross_id'].tolist():
- for posX, posY in object_df['posX', 'posY']:
- pedestrian_pos = (posX, posY)
- plogan_pos = position_data[position_data['cross_id'] == cross_id]['cross_coords'].tolist()[0]
- if _is_pedestrian_in_crosswalk(plogan_pos, pedestrian_pos):
- data[data['playerId'] == 2]['in_crosswalk'] = 1
- else:
- data[data['playerId'] == 2]['in_crosswalk'] = 0
- car_stop_time = ego_df[ego_df['v'] <= 0.1]['simTime']
- pedestrian_in_crosswalk_time = data[(data['in_crosswalk'] == 1)]['simTime']
- car_wait_pedestrian = car_stop_time.isin(pedestrian_in_crosswalk_time).tolist()
- return {'waitTimeAtCrosswalkwithPedestrian_LST': car_wait_pedestrian[-1] - car_wait_pedestrian[0] if len(
- car_wait_pedestrian) > 0 else 0}
- def launchTimewhenPedestrianLeave_LST(data, plot_path):
- ego_df = data.ego_data
- car_stop_time = ego_df[ego_df['v'] <= 0.1]["simTime"]
- if len(car_stop_time) == 0:
- return {"launchTimewhenPedestrianLeave_LST": 0}
- else:
- lane_width = ego_df[ego_df['v'] <= 0.1]['lane_width'].tolist()[0]
- car_to_pedestrain = ego_df[abs(ego_df['y_relative_dist']) <= lane_width / 2]["simTime"]
- legal_stop_time = car_stop_time.isin(car_to_pedestrain).tolist()
- return {"launchTimewhenPedestrianLeave_LST": legal_stop_time[-1] - legal_stop_time[0]}
- def noCollision_LST(data, plot_path):
- ego_df = data.ego_data
- is_dist_min = False
- is_sign_change = False
- for i in range(1, len(ego_df) - 1):
- relative_dist_decreasing = ego_df['relative_dist'][i-1] > ego_df['relative_dist'][i]
- relative_dist_increasing = ego_df['relative_dist'][i+1] > ego_df['relative_dist'][i]
- is_dist_min = relative_dist_decreasing and relative_dist_increasing and (ego_df['relative_dist'][i] < 0.5)
- x_relative = (ego_df['x_relative_dist'][i-1]*ego_df['x_relative_dist'][i]) < 0
- y_relative = (ego_df['y_relative_dist'][i-1]*ego_df['y_relative_dist'][i]) < 0
- is_sign_change = x_relative or y_relative
- if is_dist_min and is_sign_change:
- return {"noCollision_LST": -1}
- return {"noCollision_LST": 1}
- def noReverse_LST(data, plot_path):
- ego_df = data.ego_data
- if (ego_df["lon_v_vehicle"] * ego_df["posH"]).any() < 0:
- return {"noReverse_LST": -1}
- else:
- return {"noReverse_LST": 1}
- def turnAround_LST(data, plot_path):
- ego_df = data.ego_data
- if (ego_df["lon_v_vehicle"].tolist()[0] * ego_df["lon_v_vehicle"].tolist()[-1] < 0) and (
- ego_df["lon_v_vehicle"] * ego_df["posH"].all() > 0):
- return {"turnAround_LST": 1}
- else:
- return {"turnAround_LST": -1}
- def laneOffset_LST(data, plot_path):
- car_width = data.vehicle_config['CAR_WIDTH']
- ego_df_ini = data.ego_data
- min_time = ego_df_ini['simTime'].min() + 0
- max_time = ego_df_ini['simTime'].max() - 0
- ego_df = ego_df_ini[(ego_df_ini['simTime'] >= min_time) & (ego_df_ini['simTime'] <= max_time)]
- laneoffset = abs(ego_df['y_relative_dist']) - car_width / 2
- return {"laneOffset_LST": max(laneoffset)}
- def maxLongitudeDist_LST(data, plot_path):
- ego_df = data.ego_data
- longitude_dist = abs(ego_df[ego_df['v'] <= 0.2]['x_relative_dist'])
- data.longitude_dist = min(abs(ego_df[ego_df['v'] <= 0.2]['x_relative_dist']), default=0)
- stop_time = ego_df[abs(ego_df['x_relative_dist']) == min(longitude_dist)]['simTime'].tolist()
- data.stop_time = min(stop_time)
- if len(longitude_dist) == 0:
- return {"maxLongitudeDist_LST": -1}
- generate_function_chart_data(data, 'maxLongitudeDist_LST', plot_path)
- return {"maxLongDist_LST": min(longitude_dist)}
- def noEmergencyBraking_LST(data, plot_path):
- ego_df = data.ego_data
- ego_df['ip_dec'] = ego_df['v'].apply(
- get_interpolation, point1=[18, -5], point2=[72, -3.5])
- ego_df['slam_brake'] = (ego_df['accleX'] - ego_df['ip_dec']).apply(
- lambda x: 1 if x < 0 else 0)
- if sum(ego_df['slam_brake']) == 0:
- return {"noEmergencyBraking_LST": 1}
- else:
- return {"noEmergencyBraking_LST": -1}
- def rightWarningSignal_PGVIL(data_processed, plot_path) -> dict:
- """判断是否发出正确预警信号"""
- ego_df = data_processed.ego_data
- scenario_name = find_nested_name(data_processed.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- if correctwarning is None:
- print("无法获取正确的预警信号标志位!")
- return None
- # 找出本行 correctwarning 和 ifwarning 相等,且 correctwarning 不是 NaN 的行
- warning_rows = ego_df[
- (ego_df["ifwarning"] == correctwarning) & (ego_df["ifwarning"].notna())
- ]
- if warning_rows.empty:
- return {"rightWarningSignal_PGVIL": -1}
- else:
- return {"rightWarningSignal_PGVIL": 1}
- def latestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
- """预警距离计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"latestWarningDistance_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- # dis_pos2head = 5.929 / 2 + 1.982
- ego, obj = extract_ego_obj(warning_data)
- distances = calculate_distance_PGVIL(
- np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
- ) - dis_pos2head
- # 将计算结果保存到data对象中,供图表生成使用
- data_processed.warning_dist = distances
- data_processed.warning_time = ego['simTime'].tolist()
- if distances.size == 0:
- print("没有找到数据!")
- return {"latestWarningDistance_PGVIL": -1} # 或返回其他默认值,如0.0
- generate_function_chart_data(data_processed, 'latestWarningDistance_PGVIL', plot_path)
- return {"latestWarningDistance_PGVIL": float(np.min(distances))}
- def latestWarningDistancetoConflictPoint_PGVIL(data_processed, plot_path) -> dict:
- """预警距离计算流水线"""
- ego_df = data_processed.ego_data
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"latestWarningDistancetoConflictPoint_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- ego, obj = extract_ego_obj(warning_data)
- ConflictPoint = obj[obj["type"] == 25].copy()
- distances = calculate_distance_PGVIL(
- np.array([[ego["posX"], ego["posY"]]]), ConflictPoint[["posX", "posY"]].values
- ) - dis_pos2head
- # 将计算结果保存到data对象中,供图表生成使用
- data_processed.warning_dist = distances
- data_processed.warning_time = ego['simTime'].tolist()
- if distances.size == 0:
- print("没有找到数据!")
- return {"latestWarningDistancetoConflictPoint_PGVIL": -1} # 或返回其他默认值,如0.0
- generate_function_chart_data(data_processed, 'latestWarningDistancetoConflictPoint_PGVIL', plot_path)
- return {"latestWarningDistancetoConflictPoint_PGVIL": float(np.min(distances))}
- def latestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
- """TTC计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- # dis_pos2head = 5.929 / 2 + 1.982
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"latestWarningDistance_TTC_PGVIL": 0.0}
- ego, obj = extract_ego_obj(warning_data)
- # 向量化计算
- ego_pos = np.array([[ego["posX"], ego["posY"]]])
- ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
- obj_pos = obj[["posX", "posY"]].values
- obj_speed = obj[["speedX", "speedY"]].values
- distances = calculate_distance_PGVIL(ego_pos, obj_pos) - dis_pos2head
- rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
- data_processed.warning_dist = distances
- data_processed.warning_speed = rel_speeds
- data_processed.warning_time = ego['simTime'].tolist()
- with np.errstate(divide="ignore", invalid="ignore"):
- ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
- if ttc.size == 0:
- print("没有找到数据!")
- return {"latestWarningDistance_TTC_PGVIL": -2} # 或返回其他默认值,如0.0
- data_processed.ttc = ttc
- generate_function_chart_data(data_processed, 'latestWarningDistance_TTC_PGVIL', plot_path)
- return {"latestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
- def latestWarningDistancetoConflictPoint_TTC_PGVIL(data_processed, plot_path) -> dict:
- """TTC计算流水线"""
- ego_df = data_processed.ego_data
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- ego, obj = extract_ego_obj(warning_data)
- # 向量化计算
- ego_pos = np.array([[ego["posX"], ego["posY"]]])
- ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
- ConflictPoint = obj[obj["type"] == 25].copy()
- if ConflictPoint.empty:
- print("未找到 type == 25 的目标物")
- return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -22}
- ConflictPoint_pos = ConflictPoint[["posX", "posY"]].values
- ConflictPoint_speed = obj[["speedX", "speedY"]].values
- distances = calculate_distance_PGVIL(ego_pos, ConflictPoint_pos) - dis_pos2head
- rel_speeds = calculate_relative_speed_PGVIL(ego_speed, ConflictPoint_speed)
- data_processed.warning_dist = distances
- data_processed.warning_speed = rel_speeds
- data_processed.warning_time = ego['simTime'].tolist()
- with np.errstate(divide="ignore", invalid="ignore"):
- ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
- if ttc.size == 0:
- print("没有找到数据!")
- return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -33} # 或返回其他默认值,如0.0
- data_processed.ttc = ttc
- generate_function_chart_data(data_processed, 'latestWarningDistancetoConflictPoint_TTC_PGVIL', plot_path)
- return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": float(np.nanmin(ttc))}
- def earliestWarningDistance_PGVIL(data_processed, plot_path) -> dict:
- """预警距离计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"earliestWarningDistance_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- # dis_pos2head = 5.929 / 2 + 1.982
- ego, obj = extract_ego_obj(warning_data)
- distances = calculate_distance_PGVIL(
- np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
- ) - dis_pos2head
- # 将计算结果保存到data对象中,供图表生成使用
- data_processed.warning_dist = distances
- data_processed.warning_time = ego['simTime'].tolist()
- if distances.size == 0:
- print("没有找到数据!")
- return {"earliestWarningDistance_PGVIL": -22} # 或返回其他默认值,如0.0
- generate_function_chart_data(data_processed, 'earliestWarningDistance_PGVIL', plot_path)
- return {"earliestWarningDistance_PGVIL": float(np.min(distances))}
- def earliestWarningDistancetoConflictPoint_PGVIL(data_processed, plot_path) -> dict:
- """预警距离计算流水线"""
- ego_df = data_processed.ego_data
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"earliestWarningDistancetoConflictPoint_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- ego, obj = extract_ego_obj(warning_data)
- distances = calculate_distance_PGVIL(
- np.array([[ego["posX"], ego["posY"]]]), obj[["posX", "posY"]].values
- ) - dis_pos2head
- # 将计算结果保存到data对象中,供图表生成使用
- data_processed.warning_dist = distances
- data_processed.warning_time = ego['simTime'].tolist()
- if distances.size == 0:
- print("没有找到数据!")
- return {"earliestWarningDistancetoConflictPoint_PGVIL": -22} # 或返回其他默认值,如0.0
- generate_function_chart_data(data_processed, 'earliestWarningDistancetoConflictPoint_PGVIL', plot_path)
- return {"earliestWarningDistancetoConflictPoint_PGVIL": float(np.min(distances))}
- def earliestWarningDistance_TTC_PGVIL(data_processed, plot_path) -> dict:
- """TTC计算流水线"""
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"earliestWarningDistance_TTC_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- # dis_pos2head = 5.929 / 2 + 1.982
- ego, obj = extract_ego_obj(warning_data)
- # 向量化计算
- ego_pos = np.array([[ego["posX"], ego["posY"]]])
- ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
- obj_pos = obj[["posX", "posY"]].values
- obj_speed = obj[["speedX", "speedY"]].values
- distances = calculate_distance_PGVIL(ego_pos, obj_pos) - dis_pos2head
- rel_speeds = calculate_relative_speed_PGVIL(ego_speed, obj_speed)
- data_processed.warning_dist = distances
- data_processed.warning_speed = rel_speeds
- data_processed.warning_time = ego['simTime'].tolist()
- with np.errstate(divide="ignore", invalid="ignore"):
- ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
- if ttc.size == 0:
- print("没有找到数据!")
- return {"earliestWarningDistance_TTC_PGVIL": -22} # 或返回其他默认值,如0.0
- data_processed.ttc = ttc
- generate_function_chart_data(data_processed, 'earliestWarningDistance_TTC_PGVIL', plot_path)
- return {"earliestWarningDistance_TTC_PGVIL": float(np.nanmin(ttc))}
- def earliestWarningDistancetoConflictPoint_TTC_PGVIL(data_processed, plot_path) -> dict:
- """TTC计算流水线"""
- ego_df = data_processed.ego_data
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": -11}
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- ego, obj = extract_ego_obj(warning_data)
- ConflictPoint = obj[obj["type"] == 25].copy()
- if ConflictPoint.empty:
- print("未找到 type == 25 的目标物")
- return {"latestWarningDistancetoConflictPoint_TTC_PGVIL": -22}
- # 向量化计算
- ego_pos = np.array([[ego["posX"], ego["posY"]]])
- ego_speed = np.array([[ego["speedX"], ego["speedY"]]])
- ConflictPoint_pos = ConflictPoint[["posX", "posY"]].values
- ConflictPoint_speed = ConflictPoint[["speedX", "speedY"]].values
- distances = calculate_distance_PGVIL(ego_pos, ConflictPoint_pos) - dis_pos2head
- rel_speeds = calculate_relative_speed_PGVIL(ego_speed, ConflictPoint_speed)
- data_processed.warning_dist = distances
- data_processed.warning_speed = rel_speeds
- data_processed.warning_time = ego['simTime'].tolist()
- with np.errstate(divide="ignore", invalid="ignore"):
- ttc = np.where(rel_speeds != 0, distances / rel_speeds, np.inf)
- if ttc.size == 0:
- print("没有找到数据!")
- return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": -22} # 或返回其他默认值,如0.0
- data_processed.ttc = ttc
- generate_function_chart_data(data_processed, 'earliestWarningDistancetoConflictPoint_TTC_PGVIL', plot_path)
- return {"earliestWarningDistancetoConflictPoint_TTC_PGVIL": float(np.nanmin(ttc))}
- def warningDelayTimeofReachDecel_PGVIL(data_processed, plot_path) -> dict:
- # 预警时机相对背景车辆减速度达到-4m/s2后的时延
- ego_df = data_processed.ego_data
- obj_df = data_processed.object_df
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"warningDelayTimeofReachDecel_PGVIL": -1}
- try:
- ego, _ = extract_ego_obj(warning_data)
- warning_time = ego["simTime"] if isinstance(ego, pd.Series) else ego["simTime"].iloc[0]
- # 回溯:只取在预警前的背景车轨迹
- obj_pre = obj_df[(obj_df["playerId"] == 2) & (obj_df["simTime"] < warning_time)].copy()
- if len(obj_pre) < 2:
- print("预警前目标车辆轨迹不足2帧,无法计算减速度")
- return {"warningDelayTimeofReachDecel_PGVIL": -2}
- obj_pre = obj_pre.sort_values(by="simTime").reset_index(drop=True)
- obj_speed = obj_pre[["speedX", "speedY"]].values
- simtime_gap = obj_pre["simTime"].iloc[1] - obj_pre["simTime"].iloc[0]
- obj_speed_magnitude = np.linalg.norm(obj_speed, axis=1)
- obj_deceleration = np.diff(obj_speed_magnitude) / simtime_gap
- if np.min(obj_deceleration) < -4:
- max_index = np.argmin(obj_deceleration)
- decel_time = obj_pre["simTime"].iloc[max_index + 1] # 注意 +1 是因为 diff 少1行
- delay_time = warning_time - decel_time
- return {"warningDelayTimeofReachDecel_PGVIL": round(float(delay_time), 3)}
- else:
- print("未发现背景车减速度低于 -4 m/s² 的时刻")
- return {"warningDelayTimeofReachDecel_PGVIL": -3}
- except Exception as e:
- print("异常:", e)
- return {"warningDelayTimeofReachDecel_PGVIL": -99}
- def warningDelayTime_PGVIL(data_processed, plot_path) -> dict:
- """车端接收到预警到HMI发出预警的时延"""
- ego_df = data_processed.ego_data
- # #打印ego_df的列名
- # print(ego_df.columns.tolist())
- warning_data = get_first_warning(data_processed)
- if warning_data is None:
- return {"warningDelayTime_PGVIL": -1}
- try:
- ego, obj = extract_ego_obj(warning_data)
- # 找到event_Type不为空,且playerID为1的行
- rosbag_warning_rows = ego_df[(ego_df["event_Type"].notna())]
- first_time = rosbag_warning_rows["simTime"].iloc[0]
- warning_time = warning_data[warning_data["playerId"] == 1]["simTime"].iloc[0]
- delay_time = warning_time - first_time
- return {"warningDelayTime_PGVIL": float(delay_time)}
- except Exception as e:
- print(f"计算预警时延时发生错误: {e}")
- return {"warningDelayTime_PGVIL": -1}
- def noWarning_PGVIL(data_processed, plot_path) -> dict:
- scenario_name = find_nested_name(data_processed.function_config["function"])
- correctwarning = scenario_sign_dict[scenario_name]
- ego_df = data_processed.ego_data
- if ego_df['ifwarning'].empty:
- print("无法获取正确预警信号标志位!")
- return
- warning_rows = (ego_df['ifwarning'].dropna() == -1).all()
- if warning_rows:
- return {"noWarning_PGVIL": 1}
- else:
- return {"noWarning_PGVIL": -1}
- def get_car_to_stop_line_distance(ego, car_point, stop_line_points):
- """
- 计算主车后轴中心点到停止线的距离
- :return 距离
- """
- distance_carpoint_carhead = ego["dimX"] / 2 + ego["offX"]
- # 计算停止线的方向向量
- line_vector = np.array(
- [
- stop_line_points[1][0] - stop_line_points[0][0],
- stop_line_points[1][1] - stop_line_points[0][1],
- ]
- )
- direction_vector_norm = np.linalg.norm(line_vector)
- direction_vector_unit = (
- line_vector / direction_vector_norm
- if direction_vector_norm != 0
- else np.array([0, 0])
- )
- # 计算主车后轴中心点到停止线投影的坐标(垂足)
- projection_length = np.dot(car_point - stop_line_points[0], direction_vector_unit)
- perpendicular_foot = stop_line_points[0] + projection_length * direction_vector_unit
- # 计算主车后轴中心点到垂足的距离
- distance_to_foot = np.linalg.norm(car_point - perpendicular_foot)
- carhead_distance_to_foot = distance_to_foot - distance_carpoint_carhead
- return carhead_distance_to_foot
- def ifCrossingRedLight_PGVIL(data_processed, plot_path) -> dict:
- GREEN = 0x100000 # 1048576
- YELLOW = 0x1000000 # 16777216
- RED = 0x10000000 # 268435456
- ego_df = data_processed.ego_data.copy()
- stopline_df = data_processed.object_df.copy()
- # 过滤出停止线对象(type==25),只保留两端点
- stopline_df = stopline_df.loc[
- stopline_df["type"] == 25, ["simTime", "playerId", "posX", "posY"]
- ]
- # 去除无效stateMask帧
- ego_df = ego_df[ego_df["stateMask"].notna()].copy()
- ego_df.sort_values(by="simTime", inplace=True)
- dis_pos2head = ego_df["dimX"].iloc[0] / 2 + ego_df["offX"].iloc[0]
- red_light_violation = False
- prev_distance = float("inf")
- for index, ego in ego_df.iterrows():
- simTime = ego["simTime"]
- car_point = (ego["posX"], ego["posY"])
- stateMask = int(ego["stateMask"])
- # 获取该时刻的两个停止线点(playerId 2 和 3)
- p1_row = stopline_df[(stopline_df["playerId"] == 2) & (stopline_df["simTime"] == simTime)]
- p2_row = stopline_df[(stopline_df["playerId"] == 3) & (stopline_df["simTime"] == simTime)]
- # 如果缺失,跳过该帧
- if p1_row.empty or p2_row.empty:
- continue
- p1 = (p1_row["posX"].iloc[0], p1_row["posY"].iloc[0])
- p2 = (p2_row["posX"].iloc[0], p2_row["posY"].iloc[0])
- stop_line_points = np.array([p1, p2], dtype=float)
- distance_to_stopline = get_car_to_stop_line_distance(
- ego, car_point, stop_line_points
- ) - dis_pos2head
- # 如果车头刚刚越线
- if prev_distance > 0 and distance_to_stopline <= 0:
- # 如果不是绿灯,就视为闯红灯
- if stateMask != GREEN:
- red_light_violation = True
- print(f"[Red Light Violation] simTime={simTime}, stateMask={stateMask}")
- break
- prev_distance = distance_to_stopline
- return {"ifCrossingRedLight_PGVIL": -1 if red_light_violation else 1}
- def limitSpeed_PGVIL(data, plot_path):
- ego_df = data.ego_data
- max_speed = max(ego_df["v"])
- if len(ego_df["v"]) == 0:
- return {"speedLimit_PGVIL": -1}
- else:
- return {"speedLimit_PGVIL": round(max_speed * 3.6, 3)}
- def leastDistance_PGVIL(data, plot_path):
- exclude_seconds = 2.0
- ego_df = data.ego_data
- stopline_df = data.object_df
- stopline_df = stopline_df.loc[
- stopline_df["type"] == 25, ["simTime", "playerId", "posX", "posY"]
- ]
- max_sim_time = ego_df["simTime"].max()
- threshold_time = max_sim_time - exclude_seconds
- for index, ego in ego_df.iterrows():
- if ego["simTime"] >= threshold_time:
- continue
- if "v" in ego and ego["v"] == 0:
- current_time = ego["simTime"]
- car_point = (ego["posX"], ego["posY"])
- # p1 = (ego["stopline_x1"], ego["stopline_y1"])
- # p2 = (ego["stopline_x2"], ego["stopline_y2"])
- p1_row = stopline_df[(stopline_df["playerId"] == 2) & (stopline_df["simTime"] == current_time)]
- p2_row = stopline_df[(stopline_df["playerId"] == 3) & (stopline_df["simTime"] == current_time)]
- if p1_row.empty or p2_row.empty:
- continue # 如果找不到,就跳过,继续找下一个停车帧
- p1 = (p1_row["posX"].iloc[0], p1_row["posY"].iloc[0])
- p2 = (p2_row["posX"].iloc[0], p2_row["posY"].iloc[0])
- stop_line_points = np.array([p1, p2], dtype=float)
- distance_to_stopline = get_car_to_stop_line_distance(
- ego, car_point, stop_line_points
- )
- # print(f"distance_to_stopline:{distance_to_stopline}")
- return {"leastDistance_PGVIL": round(distance_to_stopline, 3)}
- return {"leastDistance_PGVIL": -1}
- def launchTimeinStopLine_PGVIL(data, plot_path):
- ego_df = data.ego_data
- # === 先计算有效时间段 ===
- max_sim_time = ego_df["simTime"].max()
- min_sim_time = ego_df["simTime"].min()
- threshold_start = min_sim_time + 3.0
- threshold_end = max_sim_time - 3.0
- # === 只保留有效时间段 ===
- ego_df = ego_df[(ego_df["simTime"] >= threshold_start) & (ego_df["simTime"] <= threshold_end)]
- in_stop = False
- start_time = None
- last_low_time = None
- max_duration = 0.0
- v_thresh = 0.01
- for _, row in ego_df.iterrows():
- simt = row["simTime"]
- v = row.get("v", None)
- if v is None or np.isnan(v) or abs(v) > v_thresh:
- if in_stop:
- duration = last_low_time - start_time
- if duration > max_duration:
- max_duration = duration
- in_stop = False
- else:
- if not in_stop:
- start_time = simt
- in_stop = True
- last_low_time = simt
- if in_stop:
- duration = last_low_time - start_time
- if duration > max_duration:
- max_duration = duration
- if max_duration <= 0:
- return {"launchTimeinStopLine_PGVIL": -1}
- else:
- return {"launchTimeinStopLine_PGVIL": round(float(max_duration), 3)}
- def launchTimeinTrafficLight_PGVIL(data, plot_path):
- GREEN = 1048576 # 0x100000
- RED = 268435456 # 0x10000000
- ego_df = data.ego_data
- mask = (ego_df["stateMask"]).notna()
- ego_df_light = ego_df[mask].copy()
- ego_df_light = ego_df_light.drop_duplicates(subset=["simTime"]).sort_values(by="simTime")
- # 找到第一次红灯 to 绿灯的切换时刻
- is_transition = (
- (ego_df_light["stateMask"] == GREEN) &
- (ego_df_light["stateMask"].shift(1) == RED)
- )
- transition_times = ego_df_light.loc[is_transition, "simTime"]
- if transition_times.empty:
- # print("没有红绿灯切换")
- return {"launchTimeinTrafficLight_PGVIL": -88}
- time_red2green = transition_times.iloc[0]
- car_speed_at_transition = ego_df.loc[ego_df["simTime"] == time_red2green, "v"]
- if not car_speed_at_transition.empty and car_speed_at_transition.iloc[0] > 0.5:
- return {"launchTimeinTrafficLight_PGVIL": -66}
- car_move_times = ego_df.loc[
- (ego_df["simTime"] >= time_red2green) & (ego_df["v"] > 0), "simTime"
- ]
- if car_move_times.empty:
- # print("没有车移动")
- return {"launchTimeinTrafficLight_PGVIL": -99}
- time_move = car_move_times.iloc[0]
- return {"launchTimeinTrafficLight_PGVIL": round(time_move - time_red2green, 3)}
- def crossJunctionToTargetLane_PGVIL(data, plot_path):
- ego_df = data.ego_data
- road_ids = set(ego_df["road_id"].dropna())
- try:
- scenario_name = find_nested_name(data.function_config["function"])
- target_lane_id = data.function_config["function"][scenario_name][
- "crossJunctionToTargetLane_PGVIL"
- ]["max"]
- except KeyError:
- raise ValueError("请在配置文件中指定目标车道ID!")
- result = target_lane_id if target_lane_id in road_ids else -1
- return {"crossJunctionToTargetLane_PGVIL": result}
- def crossJunctionToTargetLane_PGVIL(data, plot_path):
- ego_df = data.ego_data
- road_ids = set(ego_df["road_id"].dropna())
- try:
- scenario_name = find_nested_name(data.function_config["function"])
- target_lane_id = data.function_config["function"][scenario_name][
- "crossJunctionToTargetLane_PGVIL"
- ]["max"]
- except KeyError:
- raise ValueError("请在配置文件中指定目标车道ID!")
- result = target_lane_id if target_lane_id in road_ids else -1
- return {"crossJunctionToTargetLane_PGVIL": result}
- def noStop_PGVIL(data, plot_path):
- exclude_end_seconds = 5.0
- exclude_start_seconds = 5.0
- ego_df = data.ego_data
- min_sim_time = ego_df["simTime"].min()
- max_sim_time = ego_df["simTime"].max()
- start_threshold = min_sim_time + exclude_start_seconds
- end_threshold = max_sim_time - exclude_end_seconds
- filtered_df = ego_df[
- (ego_df["simTime"] >= start_threshold) & (ego_df["simTime"] <= end_threshold)
- ]
- if (filtered_df["v"] == 0).any():
- return {"noStop_PGVIL": -1}
- else:
- return {"noStop_PGVIL": 1}
- def noEmergencyBraking_PGVIL(data, plot_path):
- ego_df = data.ego_data
- ego_df["ip_dec"] = ego_df["v"].apply(
- get_interpolation, point1=[18, -5], point2=[72, -3.5]
- )
- ego_df["slam_brake"] = (ego_df["accelX"] - ego_df["ip_dec"]).apply(
- lambda x: 1 if x < 0 else 0
- )
- if sum(ego_df["slam_brake"]) == 0:
- return {"noEmergencyBraking_PGVIL": 1}
- else:
- return {"noEmergencyBraking_PGVIL": -1}
- def noReverse_PGVIL(data, plot_path):
- ego_df = data.ego_data.copy()
- ego_df["posH"] = (90 - ego_df["posH"]) % 360
- yaw_rad = np.deg2rad(ego_df["posH"])
- # 车头朝向向量
- heading_x = np.cos(yaw_rad)
- heading_y = np.sin(yaw_rad)
- # 速度向量
- speed_x = ego_df["speedX"]
- speed_y = ego_df["speedY"]
- # 点积:判断速度是否与车头方向一致
- dot_product = speed_x * heading_x + speed_y * heading_y
- moving = np.hypot(speed_x, speed_y) > 0.5 # 排除近乎静止的帧(避免误判)
- reverse_flag = (dot_product < 0) & moving # 倒车帧:点积为负 且 在移动
- if reverse_flag.any():
- return {"noReverse_PGVIL": -1} # 存在倒车
- else:
- return {"noReverse_PGVIL": 1} # 全程未倒车
- def laneOffset_PGVIL(data, plot_path):
- ego_df = data.ego_data
- car_width = ego_df["dimY"].loc[0]
- print(f"car_width:{car_width}")
- is_zero = ego_df["v"] == 0
- if not is_zero.any():
- # 全程未停车
- return {"laneOffset_PGVIL": -1}
- groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
- last_zero_group = groups[is_zero].iloc[-1]
- last_stop = ego_df[groups == last_zero_group]
- # 距离右侧车道线
- edge_dist = (last_stop["width"] / 2 + last_stop["laneOffset"]) - (
- car_width / 2
- )
- return {"laneOffset_PGVIL": round(edge_dist.max(), 3)}
- def maxLongitudelDistance_PGVIL(data, plot_path):
- ego_df = data.ego_data
- object_df = data.object_df
- is_zero = ego_df["v"] == 0
- if not is_zero.any():
- # 全程未停车
- return {"maxLongitudelDistance_PGVIL": -1}
- groups = (is_zero != is_zero.shift(fill_value=False)).cumsum()
- last_zero_group = groups[is_zero].iloc[-1]
- last_stop = ego_df[groups == last_zero_group]
- stop_point = object_df[object_df["type"] == 25]
- stop_x = stop_point["posX"].iloc[0]
- stop_y = stop_point["posY"].iloc[0]
- dx = last_stop["posX"] - stop_x
- dy = last_stop["posY"] - stop_y
- last_stop["posH"] = (90 - last_stop["posH"]) % 360
- heading_rad = last_stop["posH"] # 弧度制
- longitudinal_offset = np.cos(heading_rad) * dx + np.sin(heading_rad) * dy
- max_longitudinal_offset = np.abs(longitudinal_offset).max()
- return {"maxLongitudelDistance_PGVIL": round(max_longitudinal_offset, 3)}
- def keepInLane_PGVIL(data, plot_path):
- ego_df = data.ego_data.copy()
- ego_df = ego_df.dropna(subset=["laneOffset", "lane_width"])
- if ego_df.empty:
- return {"keepInLane_PGVIL": -1}
- threshold = ego_df["lane_width"] / 2
- if (ego_df["laneOffset"].abs() > threshold).any():
- return {"keepInLane_PGVIL": -1}
- else:
- return {"keepInLane_PGVIL": 1}
- # def launchTimewhenPedestrianLeave_PGVIL(data, plot_path):
- # ego_df = data.ego_data
- # object_df = data.object_df
- # # == 1. 提取行人轨迹 ==
- # ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]]
- # merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
- # if merged.empty:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -1}
- # # == 2. 判断主车是否在行人完全离开前抢行 ==
- # ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "offX"]].copy()
- # ped = ped_df.rename(columns={"posX": "posX_ped", "posY": "posY_ped"})
- # ego_ped = pd.merge(ego, ped, on="simTime")
- # ego_ped = ego_ped.sort_values("simTime").reset_index(drop=True)
- # front_offset = ego_ped["dimX"] / 2 + ego_ped["offX"]
- # yaw = np.deg2rad(ego_ped["posH"])
- # ego_ped["front_x"] = ego_ped["posX"] + np.cos(yaw) * front_offset
- # ego_ped["front_y"] = ego_ped["posY"] + np.sin(yaw) * front_offset
- # dx = ego_ped["front_x"] - ego_ped["posX_ped"]
- # dy = ego_ped["front_y"] - ego_ped["posY_ped"]
- # ego_ped["proj"] = dx * np.cos(yaw) + dy * np.sin(yaw)
- # ego_ped["lateral_dist"] = np.abs(-np.sin(yaw) * dx + np.cos(yaw) * dy)
- # cross_idx = ego_ped[ego_ped["proj"] > 0].index
- # if not cross_idx.empty and cross_idx[-1] <= len(ego_ped) - 3:
- # i = cross_idx[0]
- # d0 = ego_ped.loc[i, "lateral_dist"]
- # d1 = ego_ped.loc[i + 1, "lateral_dist"]
- # d2 = ego_ped.loc[i + 2, "lateral_dist"]
- # if d1 < d0 and d2 < d1:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -66} # 主车未避让行人
- # # == 3. 判断行人进入与离开车道 ==
- # yaw_rad = np.deg2rad(merged["posH"])
- # dx = merged["posX_ped"] - merged["posX"]
- # dy = merged["posY_ped"] - merged["posY"]
- # merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
- # merged["threshold"] = merged["lane_width"] / 2
- # entry = merged[merged["lateral_dist"] <= merged["threshold"]]
- # if entry.empty:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -11}
- # t_entry = entry["simTime"].iloc[0]
- # after_e = merged[merged["simTime"] > t_entry]
- # leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
- # if leave.empty:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -22}
- # t_leave = leave["simTime"].iloc[0]
- # ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
- # # == 4. 查找 t_leave 之后主车是否在前方合理范围内停车 ==
- # stops = ego_df[(ego_df["simTime"] >= t_leave) & (ego_df["v"] == 0)].copy()
- # if stops.empty:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -33}
- # front_overPos = stops["dimX"] / 2 + stops["offX"]
- # yaw_stop = np.deg2rad(stops["posH"])
- # stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
- # stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
- # dx_s = stops["front_x"] - ped_x
- # dy_s = stops["front_y"] - ped_y
- # proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
- # valid = stops[
- # (proj >= 0) &
- # (proj <= 20) &
- # ((stops["simTime"] - t_leave) <= 5.0)
- # ]
- # if valid.empty:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -88}
- # # == 5. 查找 t_stop 后首次启动 ==
- # t_stop = valid["simTime"].iloc[0]
- # launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0)]["simTime"]
- # if launches.empty:
- # return {"launchTimewhenPedestrianLeave_PGVIL": -99}
- # t_launch = launches.iloc[0]
- # return {"launchTimewhenPedestrianLeave_PGVIL": round(t_launch - t_stop, 3)}
- def launchTimewhenPedestrianLeave_PGVIL(data, plot_path):
- ego_df = data.ego_data
- object_df = data.object_df
- ego_df["posH"] = (90 - ego_df["posH"]) % 360
- object_df["posH"] = (90 - object_df["posH"]) % 360
- ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]]
- merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
- if merged.empty:
- return {"launchTimewhenPedestrianLeave_PGVIL": -1}
- yaw_rad = np.deg2rad(merged["posH"]) # 度→弧度
- dx = merged["posX_ped"] - merged["posX"]
- dy = merged["posY_ped"] - merged["posY"]
- merged["lateral_dist"] = np.abs(-np.sin(yaw_rad) * dx + np.cos(yaw_rad) * dy)
- merged["threshold"] = merged["lane_width"] / 2
- entry = merged[merged["lateral_dist"] <= merged["threshold"]]
- if entry.empty:
- return {"launchTimewhenPedestrianLeave_PGVIL": -11}
- t_entry = entry["simTime"].iloc[0]
- after_e = merged[merged["simTime"] > t_entry]
- leave = after_e[after_e["lateral_dist"] > after_e["threshold"]]
- if leave.empty:
- return {"launchTimewhenPedestrianLeave_PGVIL": -22}
- t_leave = leave["simTime"].iloc[0]
- ped_x, ped_y = leave[["posX_ped", "posY_ped"]].iloc[0]
- ego_after_leave = ego_df[ego_df["simTime"] >= t_leave].copy()
- min_V = ego_after_leave["v"].min()
- if ego_after_leave["v"].min() == 0:
- stops = ego_after_leave[ego_after_leave["v"] == 0].copy()
- elif min_V <= 1:
- stops = ego_after_leave[ego_after_leave["v"] == min_V].copy()
- else:
- stops = pd.DataFrame()
- # stops = ego_df[(ego_df["simTime"] >= t_leave) & (ego_df["v"] <= 0.01)].copy()
- if stops.empty: # 没有停车
- return {"launchTimewhenPedestrianLeave_PGVIL": -33}
- front_overPos = stops["dimX"] / 2 + stops["offX"]
- yaw_stop = np.deg2rad(stops["posH"])
- stops["front_x"] = stops["posX"] + np.cos(yaw_stop) * front_overPos
- stops["front_y"] = stops["posY"] + np.sin(yaw_stop) * front_overPos
- dx_s = stops["front_x"] - ped_x
- dy_s = stops["front_y"] - ped_y
- proj = dx_s * np.cos(yaw_stop) + dy_s * np.sin(yaw_stop)
- valid = stops[
- (proj >= 0) &
- (proj <= 20) &
- ((stops["simTime"] - t_leave) <= 5.0)
- ]
- if valid.empty: # 车辆未在行人离开后停止
- return {"launchTimewhenPedestrianLeave_PGVIL": -88}
- t_stop = valid["simTime"].iloc[0]
- if ego_after_leave["v"].min() == 0:
- launches_data = ego_after_leave[ego_after_leave["v"] > 0].copy()
- else:
- launches_data = ego_after_leave[ego_after_leave["v"] > 1].copy()
- launches = launches_data[launches_data["simTime"] > t_stop]
- # launches = ego_df[(ego_df["simTime"] > t_stop) & (ego_df["v"] > 0.5)]["simTime"]
- if launches.empty: # 车辆停止后未启动
- return {"launchTimewhenPedestrianLeave_PGVIL": -99}
- # t_launch = launches.iloc[0]
- t_launch = launches["simTime"].iloc[0]
- return {"launchTimewhenPedestrianLeave_PGVIL": round(t_launch - t_stop, 3)}
- def waitTimeAtCrosswalkwithPedestrian_PGVIL(data, plot_path):
- ego_df = data.ego_data
- object_df = data.object_df
- # ego_df["posH"] = (90 - ego_df["posH"]) % 360
- # object_df["posH"] = (90 - object_df["posH"]) % 360
- ped_df = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY"]]
- first_launch = ego_df.loc[ego_df["v"] > 0, "simTime"]
- if first_launch.empty:
- t0_launch = 0.0
- else:
- t0_launch = first_launch.iloc[0]
- merged = ego_df.merge(ped_df, on="simTime", how="inner", suffixes=("", "_ped"))
- merged["front_offset"] = merged["dimX"] / 2 + merged["offX"]
- yaw = np.deg2rad(merged["posH"])
- merged["front_x"] = merged["posX"] + np.cos(yaw) * merged["front_offset"]
- merged["front_y"] = merged["posY"] + np.sin(yaw) * merged["front_offset"]
- dx = merged["posX_ped"] - merged["front_x"]
- dy = merged["posY_ped"] - merged["front_y"]
- valid_times = (dx * np.cos(yaw) + dy * np.sin(yaw)) <= 0.1
- stops_df = merged.loc[
- (merged["simTime"] >= t0_launch) & (merged["v"] == 0) & valid_times
- ].sort_values("simTime")
- if stops_df.empty:
- print("no stopping time found!")
- return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": -1}
- wait_time = stops_df["simTime"].iloc[-1] - stops_df["simTime"].iloc[0]
- return {"waitTimeAtCrosswalkwithPedestrian_PGVIL": round(wait_time, 3)}
- def noCollision_PGVIL(data, plot_path):
- ego_df = data.ego_data
- object_df = data.object_df
- ego_df['posH'] = (90 - ego_df['posH']) % 360
- object_df['posH'] = (90 - object_df['posH']) % 360
- ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]]
- tar = object_df.loc[object_df["playerId"] == 2, ["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]]
- df = ego.merge(tar, on="simTime", suffixes=("", "_tar"))
- df["posH_tar_rad"] = np.deg2rad(df["posH_tar"])
- df["posH_rad"] = np.deg2rad(df["posH"])
- df["collision"] = df.apply(lambda row: funcIsCollision(
- row["dimX"], row["dimY"], row["offX"], 0, row["posX"], row["posY"],
- row["posH_rad"], row["dimX_tar"], row["dimY_tar"], row["offX_tar"], 0,
- row["posX_tar"], row["posY_tar"], row["posH_tar_rad"], ), axis=1, )
- if df["collision"].any():
- return {"noCollision_PGVIL": -1}
- else:
- return {"noCollision_PGVIL": 1}
- def leastLateralDistance_PGVIL(data, plot_path):
- ego_df = data.ego_data
- object_df = data.object_df
- ego_df['posH'] = (90 - ego_df['posH']) % 360
- object_df['posH'] = (90 - object_df['posH']) % 360
- cones = object_df[object_df["type"] == 25].copy()
- ego = ego_df[["simTime", "posX", "posY", "posH", "dimX", "dimY", "offX"]].copy()
- ego["posH_rad"] = np.deg2rad(ego["posH"])
- has_collision = False
- for idx, cone in cones.iterrows():
- # 找到与ego同一时刻的数据
- ego_at_time = ego[ego["simTime"] == cone["simTime"]]
- if ego_at_time.empty:
- continue
- posX_tar = cone["posX"]
- posY_tar = cone["posY"]
- dimX_tar = cone["dimX"]
- dimY_tar = cone["dimY"]
- offX_tar = cone.get("offX", 0)
- posH_tar_rad = np.deg2rad(cone["posH"]) if not np.isnan(cone["posH"]) else 0
- for _, row in ego_at_time.iterrows():
- result = funcIsCollision(
- row["dimX"], row["dimY"], row["offX"], 0,
- row["posX"], row["posY"], row["posH_rad"],
- dimX_tar, dimY_tar, offX_tar, 0,
- posX_tar, posY_tar, posH_tar_rad,
- )
- if result:
- has_collision = True
- break
- if has_collision:
- break
- return {"leastLateralDistance_PGVIL": -1 if has_collision else 1}
- class FunctionRegistry:
- """动态函数注册器(支持参数验证)"""
- def __init__(self, data_processed, plot_path):
- self.logger = LogManager().get_logger() # 获取全局日志实例
- self.data = data_processed
- self.plot_path = plot_path
- # 检查function_config是否为空
- if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
- self.logger.warning("功能配置为空,跳过功能指标计算")
- self.fun_config = {}
- self.level_3_merics = []
- self._registry = {}
- return
- self.fun_config = data_processed.function_config.get("function", {})
- self.level_3_merics = self._extract_level_3_metrics(self.fun_config)
- self._registry: Dict[str, Callable] = {}
- self._registry = self._build_registry()
- def _extract_level_3_metrics(self, config_node: dict) -> list:
- """DFS遍历提取第三层指标(时间复杂度O(n))[4](@ref)"""
- metrics = []
- def _recurse(node):
- if isinstance(node, dict):
- if "name" in node and not any(
- isinstance(v, dict) for v in node.values()
- ):
- metrics.append(node["name"])
- for v in node.values():
- _recurse(v)
- _recurse(config_node)
- self.logger.info(f"评比的功能指标列表:{metrics}")
- return metrics
- def _build_registry(self) -> dict:
- """自动注册指标函数(防御性编程)"""
- registry = {}
- for func_name in self.level_3_merics:
- try:
- registry[func_name] = globals()[func_name]
- except KeyError:
- print(f"未实现指标函数: {func_name}")
- self.logger.error(f"未实现指标函数: {func_name}")
- return registry
- def batch_execute(self) -> dict:
- """批量执行指标计算(带熔断机制)"""
- results = {}
- # 如果配置为空或没有注册的指标,直接返回空结果
- if not hasattr(self, 'fun_config') or not self.fun_config or not self._registry:
- self.logger.info("功能配置为空或无注册指标,返回空结果")
- return results
- for name, func in self._registry.items():
- try:
- result = func(self.data, self.plot_path) # 统一传递数据上下文
- results.update(result)
- except Exception as e:
- print(f"{name} 执行失败: {str(e)}")
- self.logger.error(f"{name} 执行失败: {str(e)}", exc_info=True)
- results[name] = None
- self.logger.info(f"功能指标计算结果:{results}")
- return results
- class FunctionManager:
- """管理功能指标计算的类"""
- def __init__(self, data_processed, plot_path):
- self.data = data_processed
- self.logger = LogManager().get_logger()
- self.plot_path = plot_path
- # 检查function_config是否为空
- if not hasattr(data_processed, 'function_config') or not data_processed.function_config:
- self.logger.warning("功能配置为空,跳过功能指标计算初始化")
- self.function = None
- else:
- self.function = FunctionRegistry(self.data, self.plot_path)
- def report_statistic(self):
- """
- 计算并报告功能指标结果。
- :return: 评估结果
- """
- # 如果function为None,直接返回空字典
- if self.function is None:
- self.logger.info("功能指标管理器未初始化,返回空结果")
- return {}
- function_result = self.function.batch_execute()
- print("\n[功能性表现及评价结果]")
- return function_result
- # self.logger.info(f'Function Result: {function_result}')
- # 使用示例
- if __name__ == "__main__":
- pass
- # print("\n[功能类表现及得分情况]")
|