#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################## # # Copyright (c) 2023 CICV, Inc. All Rights Reserved # ################################################################## """ @Authors: yangzihao(yangzihao@china-icv.cn) @Data: 2023/07/24 @Last Modified: 2023/07/24 @Summary: Evaluateion functions """ import sys sys.path.append('../common') sys.path.append('../modules') import os import time import json import math import pandas as pd import numpy as np import zipfile import importlib import matplotlib.pyplot as plt def _cal_max_min_avg(num_list): maxx = round(max(num_list), 3) if len(num_list)>0 else "-" minn = round(min(num_list), 3) if len(num_list)>0 else "-" avg = round(sum(num_list) / len(num_list), 3) if len(num_list)>0 else "-" result = { "max": maxx, "min": minn, "avg": avg } return result def save_line_chart(list1, list2, save_path): plt.plot(list1, label='列表1') plt.plot(list2, label='列表2') plt.xlabel('索引') plt.ylabel('值') plt.title('折线图') plt.legend() plt.savefig(save_path) plt.close() def import_class(path, file): sys.path.append(path) # 将path添加到Python的搜索路径中 # module_name = file[:-3] # 去掉文件名的".py"后缀 module = __import__(file, fromlist=['ScoreModel']) class_instance = module.ScoreModel return class_instance def import_class_lib(path, file): sys.path.append(path) # 将path添加到Python的搜索路径中 module = importlib.import_module(file) class_name = "ScoreModel" class_instance = getattr(module, class_name) return class_instance def score_over_100(score): if score > 100: return 100 return score def normalize_dict_values(dictionary): # 计算字典中键的数量 n = len(dictionary) # 初始化总和为0 total_sum = 0 # 遍历字典,对每个值进行赋值,并累加总和 for key, value in dictionary.items(): # 计算当前值,除了最后一个值外都四舍五入到两位小数 if key != list(dictionary.keys())[-1]: new_value = round(1 / n, 2) else: # 最后一个值的计算:1减去之前所有值的和 new_value = round(1 - total_sum, 2) # 更新字典的值 dictionary[key] = new_value # 累加当前值到总和 total_sum += new_value return dictionary def cal_velocity(lat_v, lon_v): """ The function can calculate the velocity with lateral velocity and longitudinal velocity. Args: lat_v: lateral velocity, m/s lon_v: longitudinal velocity, m/s Returns: v: the resultant velocity, km/h """ v = (lat_v ** 2 + lon_v ** 2) ** 0.5 return v def json2dict(json_file): with open(json_file, 'r', encoding='utf-8') as f: json_dict = json.load(f) return json_dict def get_subfolders_name(path): return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))] def zip_dir(dir_path, zip_file_path): """ This function can zip the files in dir_path as a zipfile. Arguments: dir_path: A str of the path of the files to be ziped. zip_file_path: A str of the path of the zipfile to be stored. Returns: None """ with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(dir_path): for file in files: file_path = os.path.join(root, file) zipf.write(file_path) def score_grade(score): """ Returns the corresponding grade based on the input score. Arguments: score: An integer or float representing the score. Returns: grade: A string representing the grade. """ if score >= 90: grade = '优秀' elif score >= 80: grade = '良好' elif score > 60: grade = '一般' else: grade = '较差' return grade def mileage_format(mileage): if mileage < 1000: return f"{mileage:.2f}米" else: mileage = mileage / 1000 return f"{mileage:.2f}公里" def duration_format(duration): if duration < 60: return f"{duration:.2f}秒" elif duration < 3600: minute = int(duration / 60) second = int(duration % 60) return f"{minute}分{second}秒" if second != 0 else f"{minute}分" else: hour = int(duration / 3600) minute = int((duration % 3600) / 60) second = int(duration % 60) ans = f"{hour:d}时" ans = ans + f"{minute}分" if minute != 0 else ans ans = ans + f"{second}秒" if second != 0 else ans return ans def continuous_group(df): time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() group_time = [] group_frame = [] sub_group_time = [] sub_group_frame = [] for i in range(len(frame_list)): if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1: sub_group_time.append(time_list[i]) sub_group_frame.append(frame_list[i]) else: group_time.append(sub_group_time) group_frame.append(sub_group_frame) sub_group_time = [time_list[i]] sub_group_frame = [frame_list[i]] group_time.append(sub_group_time) group_frame.append(sub_group_frame) group_time = [g for g in group_time if len(g) >= 2] group_frame = [g for g in group_frame if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group_time] frame = [[g[0], g[-1]] for g in group_frame] time_df = pd.DataFrame(time, columns=['start_time', 'end_time']) frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame']) result_df = pd.concat([time_df, frame_df], axis=1) return result_df def continuous_group_old(df): time_list = df['simTime'].values.tolist() frame_list = df['simFrame'].values.tolist() group = [] sub_group = [] for i in range(len(frame_list)): if not sub_group or frame_list[i] - frame_list[i - 1] <= 1: sub_group.append(time_list[i]) else: group.append(sub_group) sub_group = [time_list[i]] group.append(sub_group) group = [g for g in group if len(g) >= 2] # 输出图表值 time = [[g[0], g[-1]] for g in group] unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time']) return unsafe_df def get_frame_with_time(df1, df2): # 将dataframe1按照start_time与simTime进行合并 df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime') df1_start = df1_start[['start_time', 'simFrame']].copy() df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True) # 将dataframe1按照end_time与simTime进行合并 df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime') df1_end = df1_end[['end_time', 'simFrame']].copy() df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True) # 将两个合并后的数据框按照行索引进行拼接 result = pd.concat([df1_start, df1_end], axis=1) return result def string_concatenate(str_list): """ This function concatenates the input string list to generate a new string. If str_list is empty, an empty string is returned. If str_list has only one element, return that element. If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'. Arguments: str_list: A list of strings. Returns: ans_str: A concatenated string. """ if not str_list: return "" ans_str = '、'.join(str_list[:-1]) ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1] return ans_str def zip_time_pairs(time_list, zip_list, upper_limit=9999): zip_time_pairs = zip(time_list, zip_list) zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)] return zip_vs_time def replace_key_with_value(input_string, replacement_dict): """ 替换字符串中的关键字为给定的字典中的值。 :param input_string: 需要进行替换的字符串 :param replacement_dict: 替换规则,格式为 {key: value} :return: 替换后的字符串 """ # 遍历字典中的键值对,并用值替换字符串中的键 for key, value in replacement_dict.items(): if key in input_string: input_string = input_string.replace(key, value) return input_string def continous_judge(frame_list): if not frame_list: return 0 cnt = 1 for i in range(1, len(frame_list)): if frame_list[i] - frame_list[i - 1] <= 3: continue cnt += 1 return cnt def get_interpolation(x, point1, point2): """ According to the two extreme value points, the equation of one variable is determined, and the solution of the equation is obtained in the domain of definition. Arguments: x: A float number of the independent variable. point1: A set of the coordinate extreme point. point2: A set of the other coordinate extreme point. Returns: y: A float number of the dependent variable. """ try: k = (point1[1] - point2[1]) / (point1[0] - point2[0]) b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0]) y = x * k + b return y except Exception as e: return f"Error: {str(e)}" def statistic_analysis(data_list): sorted_data_list = sorted(data_list) maxx = sorted_data_list[-1] minn = sorted_data_list[0] meann = sum(sorted_data_list) / len(sorted_data_list) percentile_99 = np.percentile(sorted_data_list, 99) ans_list = [maxx, minn, meann, percentile_99] return ans_list