123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/07/24
- @Last Modified: 2023/07/24
- @Summary: Evaluateion functions
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- import os
- import time
- import json
- import math
- import pandas as pd
- import numpy as np
- import zipfile
- import importlib
- def get_status_active_data(active_time_ranges, df):
- # 给定的双层列表
- # active_time_ranges = status_trigger_dict['LKA']['LKA_active_time']
- # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
- # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
- filtered_df = pd.DataFrame(columns=df.columns)
- for start, end in active_time_ranges:
- mask = (df['simTime'] >= start) & (df['simTime'] <= end)
- if filtered_df is None:
- filtered_df = df[mask]
- else:
- filtered_df = pd.concat([filtered_df, df[mask]])
- # 上述方式可能引入重复行,使用drop_duplicates去重
- df_result = filtered_df.drop_duplicates()
- # ego_x = df_result[df_result['playerId'] == 1]['posX'].reset_index(drop = True)
- # ego_y = df_result[df_result['playerId'] == 1]['posY'].reset_index(drop = True)
- # obj_x = df_result[df_result['playerId'] == 2]['posX'].reset_index(drop = True)
- # obj_y = df_result[df_result['playerId'] == 2]['posY'].reset_index(drop = True)
- # print("ego_x is", ego_x)
- # ego_speedx = self.df[self.df['playerId'] == 1]['speedX'].reset_index(drop = True)
- # ego_speedy = self.df[self.df['playerId'] == 1]['speedY'].reset_index(drop = True)
- # obj_speedx = self.df[self.df['playerId'] == 2]['speedX'].reset_index(drop = True)
- # obj_speedy = self.df[self.df['playerId'] == 2]['speedY'].reset_index(drop = True)
- # dx = obj_x - ego_x
- # dy = obj_y - ego_y
- # vx = obj_speedx - ego_speedx
- # vy = obj_speedy - ego_speedy
- # dist = np.sqrt(dx**2 + dy**2)
- # ego_v_projection_in_dist = _cal_v_ego_projection(dx, dy, ego_speedx, ego_speedy)
- # thw1 = _cal_THW(dist, ego_v_projection_in_dist)
- # thw = thw1.tolist()
- # THW = [[x]*2 for x in thw]
- # df_result['THW'] = THW
- return df_result
- def _cal_v_ego_projection(dx, dy, v_x1, v_y1):
- AB_mod = np.sqrt(dx**2 + dy**2)
- U_ABx = dx/AB_mod
- U_ABy = dy/AB_mod
- V1_on_AB = v_x1 * U_ABx + v_y1 *U_ABy
- return V1_on_AB
- def _cal_THW(dist, ego_v_projection_in_dist):
- # if ego_v_projection_in_dist == 0:
- # return math.inf
- ego_v_projection_in_dist = ego_v_projection_in_dist.replace(0, 0.001)
- THW = dist / ego_v_projection_in_dist
- return THW
- def custom_metric_param_parser(param_list):
- """
- param_dict = {
- "paramA" [
- {
- "kind": "-1",
- "optimal": "1",
- "multiple": ["0.5","5"],
- "spare1": null,
- "spare2": null
- }
- ]
- }
- """
- kind_list = []
- optimal_list = []
- multiple_list = []
- spare_list = []
- # spare1_list = []
- # spare2_list = []
- for i in range(len(param_list)):
- kind_list.append(int(param_list[i]['kind']))
- optimal_list.append(float(param_list[i]['optimal']))
- multiple_list.append([float(x) for x in param_list[i]['multiple']])
- spare_list.append([item["param"] for item in param_list[i]["spare"]])
- # spare1_list.append(param_list[i]['spare1'])
- # spare2_list.append(param_list[i]['spare2'])
- result = {
- "kind": kind_list,
- "optimal": optimal_list,
- "multiple": multiple_list,
- "spare": spare_list,
- # "spare1": spare1_list,
- # "spare2": spare2_list
- }
- return result
- def score_over_100(score):
- if score > 100:
- return 100
- return score
- def import_score_class(path, file):
- sys.path.append(path) # 将path添加到Python的搜索路径中
- # module_name = file[:-3] # 去掉文件名的".py"后缀
- module = __import__(file, fromlist=['ScoreModel'])
- class_instance = module.ScoreModel
- return class_instance
- def import_class_lib(path, file):
- sys.path.append(path) # 将path添加到Python的搜索路径中
- module = importlib.import_module(file)
- class_name = "ScoreModel"
- class_instance = getattr(module, class_name)
- return class_instance
- def cal_velocity(lat_v, lon_v):
- """
- The function can calculate the velocity with lateral velocity and longitudinal velocity.
- Args:
- lat_v: lateral velocity, m/s
- lon_v: longitudinal velocity, m/s
- Returns:
- v: the resultant velocity, km/h
- """
- v = (lat_v ** 2 + lon_v ** 2) ** 0.5
- return v
- def df2csv(df, filePath):
- df.to_csv(f'{filePath}', index=False)
- def dict2json(input_dict, jsonPath):
- with open(f'{jsonPath}', 'w', encoding='utf-8') as f:
- f.write(json.dumps(input_dict, ensure_ascii=False))
- def json2dict(json_file):
- with open(json_file, 'r', encoding='utf-8') as f:
- json_dict = json.load(f)
- return json_dict
- def get_subfolders_name(path):
- return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
- def zip_dir(dir_path, zip_file_path):
- """
- This function can zip the files in dir_path as a zipfile.
- Arguments:
- dir_path: A str of the path of the files to be ziped.
- zip_file_path: A str of the path of the zipfile to be stored.
- Returns:
- None
- """
- with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for root, dirs, files in os.walk(dir_path):
- for file in files:
- file_path = os.path.join(root, file)
- zipf.write(file_path)
- def score_grade(score):
- """
- Returns the corresponding grade based on the input score.
- Arguments:
- score: An integer or float representing the score.
- Returns:
- grade: A string representing the grade.
- """
- GRADE_EXCELLENT = 90
- GRADE_GOOD = 80
- GRADE_GENERAL = 60
- if score >= GRADE_EXCELLENT:
- grade = '优秀'
- elif score >= GRADE_GOOD:
- grade = '良好'
- elif score > GRADE_GENERAL:
- grade = '一般'
- else:
- grade = '较差'
- return grade
- def mileage_format(mileage):
- if mileage < 1000:
- return f"{mileage:.2f}米"
- else:
- mileage = mileage / 1000
- return f"{mileage:.2f}公里"
- def duration_format(duration):
- if duration < 60:
- return f"{duration:.2f}秒"
- elif duration < 3600:
- minute = int(duration / 60)
- second = int(duration % 60)
- return f"{minute}分{second}秒" if second != 0 else f"{minute}分"
- else:
- hour = int(duration / 3600)
- minute = int((duration % 3600) / 60)
- second = int(duration % 60)
- ans = f"{hour:d}时"
- ans = ans + f"{minute}分" if minute != 0 else ans
- ans = ans + f"{second}秒" if second != 0 else ans
- return ans
- def continuous_group(df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
- result_df = pd.concat([time_df, frame_df], axis=1)
- return result_df
- def continuous_group_old(df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group = []
- sub_group = []
- for i in range(len(frame_list)):
- if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group.append(time_list[i])
- else:
- group.append(sub_group)
- sub_group = [time_list[i]]
- group.append(sub_group)
- group = [g for g in group if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group]
- unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- return unsafe_df
- def get_frame_with_time(df1, df2):
- # 将dataframe1按照start_time与simTime进行合并
- df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime')
- df1_start = df1_start[['start_time', 'simFrame']].copy()
- df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True)
- # 将dataframe1按照end_time与simTime进行合并
- df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime')
- df1_end = df1_end[['end_time', 'simFrame']].copy()
- df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True)
- # 将两个合并后的数据框按照行索引进行拼接
- result = pd.concat([df1_start, df1_end], axis=1)
- return result
- def string_concatenate(str_list):
- """
- This function concatenates the input string list to generate a new string.
- If str_list is empty, an empty string is returned.
- If str_list has only one element, return that element.
- If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'.
- Arguments:
- str_list: A list of strings.
- Returns:
- ans_str: A concatenated string.
- """
- if not str_list:
- return ""
- ans_str = '、'.join(str_list[:-1])
- ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1]
- return ans_str
- def zip_time_pairs(time_list, zip_list):
- zip_time_pairs = zip(time_list, zip_list)
- zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
- return zip_vs_time
- def replace_key_with_value(input_string, replacement_dict):
- """
- 替换字符串中的关键字为给定的字典中的值。
- :param input_string: 需要进行替换的字符串
- :param replacement_dict: 替换规则,格式为 {key: value}
- :return: 替换后的字符串
- """
- # 遍历字典中的键值对,并用值替换字符串中的键
- for key, value in replacement_dict.items():
- if key in input_string:
- input_string = input_string.replace(key, value)
- return input_string
- def continous_judge(frame_list):
- if not frame_list:
- return 0
- cnt = 1
- for i in range(1, len(frame_list)):
- if frame_list[i] - frame_list[i - 1] <= 3:
- continue
- cnt += 1
- return cnt
- def get_interpolation(x, point1, point2):
- """
- According to the two extreme value points, the equation of one variable is determined,
- and the solution of the equation is obtained in the domain of definition.
- Arguments:
- x: A float number of the independent variable.
- point1: A set of the coordinate extreme point.
- point2: A set of the other coordinate extreme point.
- Returns:
- y: A float number of the dependent variable.
- """
- try:
- k = (point1[1] - point2[1]) / (point1[0] - point2[0])
- b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
- y = x * k + b
- return y
- except Exception as e:
- return f"Error: {str(e)}"
- def statistic_analysis(data_list):
- sorted_data_list = sorted(data_list)
- maxx = sorted_data_list[-1]
- minn = sorted_data_list[0]
- meann = sum(sorted_data_list) / len(sorted_data_list)
- percentile_99 = np.percentile(sorted_data_list, 99)
- ans_list = [maxx, minn, meann, percentile_99]
- return ans_list
|