123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- ##################################################################
- #
- # Copyright (c) 2023 CICV, Inc. All Rights Reserved
- #
- ##################################################################
- """
- @Authors: yangzihao(yangzihao@china-icv.cn)
- @Data: 2023/07/24
- @Last Modified: 2023/07/24
- @Summary: Evaluateion functions
- """
- import sys
- sys.path.append('../common')
- sys.path.append('../modules')
- import os
- import time
- import json
- import math
- import pandas as pd
- import numpy as np
- import zipfile
- import importlib
- import matplotlib.pyplot as plt
- def _cal_max_min_avg(num_list):
- maxx = round(max(num_list), 3) if num_list else "-"
- minn = round(min(num_list), 3) if num_list else "-"
- avg = round(sum(num_list) / len(num_list), 3) if num_list else "-"
- result = {
- "max": maxx,
- "min": minn,
- "avg": avg
- }
- return result
- def save_line_chart(list1, list2, save_path):
- plt.plot(list1, label='列表1')
- plt.plot(list2, label='列表2')
- plt.xlabel('索引')
- plt.ylabel('值')
- plt.title('折线图')
- plt.legend()
- plt.savefig(save_path)
- plt.close()
- def import_class(path, file):
- sys.path.append(path) # 将path添加到Python的搜索路径中
- # module_name = file[:-3] # 去掉文件名的".py"后缀
- module = __import__(file, fromlist=['ScoreModel'])
- class_instance = module.ScoreModel
- return class_instance
- def import_class_lib(path, file):
- sys.path.append(path) # 将path添加到Python的搜索路径中
- module = importlib.import_module(file)
- class_name = "ScoreModel"
- class_instance = getattr(module, class_name)
- return class_instance
- def score_over_100(score):
- if score > 100:
- return 100
- return score
- def normalize_dict_values(dictionary):
- # 计算字典中键的数量
- n = len(dictionary)
- # 初始化总和为0
- total_sum = 0
- # 遍历字典,对每个值进行赋值,并累加总和
- for key, value in dictionary.items():
- # 计算当前值,除了最后一个值外都四舍五入到两位小数
- if key != list(dictionary.keys())[-1]:
- new_value = round(1 / n, 2)
- else:
- # 最后一个值的计算:1减去之前所有值的和
- new_value = round(1 - total_sum, 2)
- # 更新字典的值
- dictionary[key] = new_value
- # 累加当前值到总和
- total_sum += new_value
- return dictionary
- def cal_velocity(lat_v, lon_v):
- """
- The function can calculate the velocity with lateral velocity and longitudinal velocity.
- Args:
- lat_v: lateral velocity, m/s
- lon_v: longitudinal velocity, m/s
- Returns:
- v: the resultant velocity, km/h
- """
- v = (lat_v ** 2 + lon_v ** 2) ** 0.5
- return v
- def json2dict(json_file):
- with open(json_file, 'r', encoding='utf-8') as f:
- json_dict = json.load(f)
- return json_dict
- def get_subfolders_name(path):
- return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
- def zip_dir(dir_path, zip_file_path):
- """
- This function can zip the files in dir_path as a zipfile.
- Arguments:
- dir_path: A str of the path of the files to be ziped.
- zip_file_path: A str of the path of the zipfile to be stored.
- Returns:
- None
- """
- with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for root, dirs, files in os.walk(dir_path):
- for file in files:
- file_path = os.path.join(root, file)
- zipf.write(file_path)
- def score_grade(score):
- """
- Returns the corresponding grade based on the input score.
- Arguments:
- score: An integer or float representing the score.
- Returns:
- grade: A string representing the grade.
- """
- if score >= 90:
- grade = '优秀'
- elif score >= 80:
- grade = '良好'
- elif score > 60:
- grade = '一般'
- else:
- grade = '较差'
- return grade
- def mileage_format(mileage):
- if mileage < 1000:
- return f"{mileage:.2f}米"
- else:
- mileage = mileage / 1000
- return f"{mileage:.2f}公里"
- def duration_format(duration):
- if duration < 60:
- return f"{duration:.2f}秒"
- elif duration < 3600:
- minute = int(duration / 60)
- second = int(duration % 60)
- return f"{minute}分{second}秒" if second != 0 else f"{minute}分"
- else:
- hour = int(duration / 3600)
- minute = int((duration % 3600) / 60)
- second = int(duration % 60)
- ans = f"{hour:d}时"
- ans = ans + f"{minute}分" if minute != 0 else ans
- ans = ans + f"{second}秒" if second != 0 else ans
- return ans
- def continuous_group(df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group_time = []
- group_frame = []
- sub_group_time = []
- sub_group_frame = []
- for i in range(len(frame_list)):
- if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group_time.append(time_list[i])
- sub_group_frame.append(frame_list[i])
- else:
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- sub_group_time = [time_list[i]]
- sub_group_frame = [frame_list[i]]
- group_time.append(sub_group_time)
- group_frame.append(sub_group_frame)
- group_time = [g for g in group_time if len(g) >= 2]
- group_frame = [g for g in group_frame if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group_time]
- frame = [[g[0], g[-1]] for g in group_frame]
- time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
- result_df = pd.concat([time_df, frame_df], axis=1)
- return result_df
- def continuous_group_old(df):
- time_list = df['simTime'].values.tolist()
- frame_list = df['simFrame'].values.tolist()
- group = []
- sub_group = []
- for i in range(len(frame_list)):
- if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
- sub_group.append(time_list[i])
- else:
- group.append(sub_group)
- sub_group = [time_list[i]]
- group.append(sub_group)
- group = [g for g in group if len(g) >= 2]
- # 输出图表值
- time = [[g[0], g[-1]] for g in group]
- unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
- return unsafe_df
- def get_frame_with_time(df1, df2):
- # 将dataframe1按照start_time与simTime进行合并
- df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime')
- df1_start = df1_start[['start_time', 'simFrame']].copy()
- df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True)
- # 将dataframe1按照end_time与simTime进行合并
- df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime')
- df1_end = df1_end[['end_time', 'simFrame']].copy()
- df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True)
- # 将两个合并后的数据框按照行索引进行拼接
- result = pd.concat([df1_start, df1_end], axis=1)
- return result
- def string_concatenate(str_list):
- """
- This function concatenates the input string list to generate a new string.
- If str_list is empty, an empty string is returned.
- If str_list has only one element, return that element.
- If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'.
- Arguments:
- str_list: A list of strings.
- Returns:
- ans_str: A concatenated string.
- """
- if not str_list:
- return ""
- ans_str = '、'.join(str_list[:-1])
- ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1]
- return ans_str
- def zip_time_pairs(time_list, zip_list, upper_limit=9999):
- zip_time_pairs = zip(time_list, zip_list)
- zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
- return zip_vs_time
- def replace_key_with_value(input_string, replacement_dict):
- """
- 替换字符串中的关键字为给定的字典中的值。
- :param input_string: 需要进行替换的字符串
- :param replacement_dict: 替换规则,格式为 {key: value}
- :return: 替换后的字符串
- """
- # 遍历字典中的键值对,并用值替换字符串中的键
- for key, value in replacement_dict.items():
- if key in input_string:
- input_string = input_string.replace(key, value)
- return input_string
- def continous_judge(frame_list):
- if not frame_list:
- return 0
- cnt = 1
- for i in range(1, len(frame_list)):
- if frame_list[i] - frame_list[i - 1] <= 3:
- continue
- cnt += 1
- return cnt
- def get_interpolation(x, point1, point2):
- """
- According to the two extreme value points, the equation of one variable is determined,
- and the solution of the equation is obtained in the domain of definition.
- Arguments:
- x: A float number of the independent variable.
- point1: A set of the coordinate extreme point.
- point2: A set of the other coordinate extreme point.
- Returns:
- y: A float number of the dependent variable.
- """
- try:
- k = (point1[1] - point2[1]) / (point1[0] - point2[0])
- b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
- y = x * k + b
- return y
- except Exception as e:
- return f"Error: {str(e)}"
- def statistic_analysis(data_list):
- sorted_data_list = sorted(data_list)
- maxx = sorted_data_list[-1]
- minn = sorted_data_list[0]
- meann = sum(sorted_data_list) / len(sorted_data_list)
- percentile_99 = np.percentile(sorted_data_list, 99)
- ans_list = [maxx, minn, meann, percentile_99]
- return ans_list
|