common.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/07/24
  11. @Last Modified: 2023/07/24
  12. @Summary: Evaluateion functions
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. import os
  18. import time
  19. import json
  20. import math
  21. import pandas as pd
  22. import numpy as np
  23. import zipfile
  24. import importlib
  25. def custom_metric_param_parser(param_list):
  26. """
  27. param_dict = {
  28. "paramA" [
  29. {
  30. "kind": "-1",
  31. "optimal": "1",
  32. "multiple": ["0.5","5"],
  33. "spare1": null,
  34. "spare2": null
  35. }
  36. ]
  37. }
  38. """
  39. kind_list = []
  40. optimal_list = []
  41. multiple_list = []
  42. spare_list = []
  43. # spare1_list = []
  44. # spare2_list = []
  45. for i in range(len(param_list)):
  46. kind_list.append(int(param_list[i]['kind']))
  47. optimal_list.append(float(param_list[i]['optimal']))
  48. multiple_list.append([float(x) for x in param_list[i]['multiple']])
  49. spare_list.append([item["param"] for item in param_list[i]["spare"]])
  50. # spare1_list.append(param_list[i]['spare1'])
  51. # spare2_list.append(param_list[i]['spare2'])
  52. result = {
  53. "kind": kind_list,
  54. "optimal": optimal_list,
  55. "multiple": multiple_list,
  56. "spare": spare_list,
  57. # "spare1": spare1_list,
  58. # "spare2": spare2_list
  59. }
  60. return result
  61. def score_over_100(score):
  62. if score > 100:
  63. return 100
  64. return score
  65. def import_score_class(path, file):
  66. sys.path.append(path) # 将path添加到Python的搜索路径中
  67. # module_name = file[:-3] # 去掉文件名的".py"后缀
  68. module = __import__(file, fromlist=['ScoreModel'])
  69. class_instance = module.ScoreModel
  70. return class_instance
  71. def import_class_lib(path, file):
  72. sys.path.append(path) # 将path添加到Python的搜索路径中
  73. module = importlib.import_module(file)
  74. class_name = "ScoreModel"
  75. class_instance = getattr(module, class_name)
  76. return class_instance
  77. def cal_velocity(lat_v, lon_v):
  78. """
  79. The function can calculate the velocity with lateral velocity and longitudinal velocity.
  80. Args:
  81. lat_v: lateral velocity, m/s
  82. lon_v: longitudinal velocity, m/s
  83. Returns:
  84. v: the resultant velocity, km/h
  85. """
  86. v = (lat_v ** 2 + lon_v ** 2) ** 0.5
  87. return v
  88. def df2csv(df, filePath):
  89. df.to_csv(f'{filePath}', index=False)
  90. def dict2json(input_dict, jsonPath):
  91. with open(f'{jsonPath}', 'w', encoding='utf-8') as f:
  92. f.write(json.dumps(input_dict, ensure_ascii=False))
  93. def json2dict(json_file):
  94. with open(json_file, 'r', encoding='utf-8') as f:
  95. json_dict = json.load(f)
  96. return json_dict
  97. def get_subfolders_name(path):
  98. return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
  99. def zip_dir(dir_path, zip_file_path):
  100. """
  101. This function can zip the files in dir_path as a zipfile.
  102. Arguments:
  103. dir_path: A str of the path of the files to be ziped.
  104. zip_file_path: A str of the path of the zipfile to be stored.
  105. Returns:
  106. None
  107. """
  108. with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
  109. for root, dirs, files in os.walk(dir_path):
  110. for file in files:
  111. file_path = os.path.join(root, file)
  112. zipf.write(file_path)
  113. def score_grade(score):
  114. """
  115. Returns the corresponding grade based on the input score.
  116. Arguments:
  117. score: An integer or float representing the score.
  118. Returns:
  119. grade: A string representing the grade.
  120. """
  121. GRADE_EXCELLENT = 90
  122. GRADE_GOOD = 80
  123. GRADE_GENERAL = 60
  124. if score >= GRADE_EXCELLENT:
  125. grade = '优秀'
  126. elif score >= GRADE_GOOD:
  127. grade = '良好'
  128. elif score > GRADE_GENERAL:
  129. grade = '一般'
  130. else:
  131. grade = '较差'
  132. return grade
  133. def mileage_format(mileage):
  134. if mileage < 1000:
  135. return f"{mileage:.2f}米"
  136. else:
  137. mileage = mileage / 1000
  138. return f"{mileage:.2f}公里"
  139. def duration_format(duration):
  140. if duration < 60:
  141. return f"{duration:.2f}秒"
  142. elif duration < 3600:
  143. minute = int(duration / 60)
  144. second = int(duration % 60)
  145. return f"{minute}分{second}秒" if second != 0 else f"{minute}分"
  146. else:
  147. hour = int(duration / 3600)
  148. minute = int((duration % 3600) / 60)
  149. second = int(duration % 60)
  150. ans = f"{hour:d}时"
  151. ans = ans + f"{minute}分" if minute != 0 else ans
  152. ans = ans + f"{second}秒" if second != 0 else ans
  153. return ans
  154. def continuous_group(df):
  155. time_list = df['simTime'].values.tolist()
  156. frame_list = df['simFrame'].values.tolist()
  157. group_time = []
  158. group_frame = []
  159. sub_group_time = []
  160. sub_group_frame = []
  161. for i in range(len(frame_list)):
  162. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  163. sub_group_time.append(time_list[i])
  164. sub_group_frame.append(frame_list[i])
  165. else:
  166. group_time.append(sub_group_time)
  167. group_frame.append(sub_group_frame)
  168. sub_group_time = [time_list[i]]
  169. sub_group_frame = [frame_list[i]]
  170. group_time.append(sub_group_time)
  171. group_frame.append(sub_group_frame)
  172. group_time = [g for g in group_time if len(g) >= 2]
  173. group_frame = [g for g in group_frame if len(g) >= 2]
  174. # 输出图表值
  175. time = [[g[0], g[-1]] for g in group_time]
  176. frame = [[g[0], g[-1]] for g in group_frame]
  177. time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  178. frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  179. result_df = pd.concat([time_df, frame_df], axis=1)
  180. return result_df
  181. def continuous_group_old(df):
  182. time_list = df['simTime'].values.tolist()
  183. frame_list = df['simFrame'].values.tolist()
  184. group = []
  185. sub_group = []
  186. for i in range(len(frame_list)):
  187. if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
  188. sub_group.append(time_list[i])
  189. else:
  190. group.append(sub_group)
  191. sub_group = [time_list[i]]
  192. group.append(sub_group)
  193. group = [g for g in group if len(g) >= 2]
  194. # 输出图表值
  195. time = [[g[0], g[-1]] for g in group]
  196. unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  197. return unsafe_df
  198. def get_frame_with_time(df1, df2):
  199. # 将dataframe1按照start_time与simTime进行合并
  200. df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime')
  201. df1_start = df1_start[['start_time', 'simFrame']].copy()
  202. df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True)
  203. # 将dataframe1按照end_time与simTime进行合并
  204. df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime')
  205. df1_end = df1_end[['end_time', 'simFrame']].copy()
  206. df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True)
  207. # 将两个合并后的数据框按照行索引进行拼接
  208. result = pd.concat([df1_start, df1_end], axis=1)
  209. return result
  210. def string_concatenate(str_list):
  211. """
  212. This function concatenates the input string list to generate a new string.
  213. If str_list is empty, an empty string is returned.
  214. If str_list has only one element, return that element.
  215. If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'.
  216. Arguments:
  217. str_list: A list of strings.
  218. Returns:
  219. ans_str: A concatenated string.
  220. """
  221. if not str_list:
  222. return ""
  223. ans_str = '、'.join(str_list[:-1])
  224. ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1]
  225. return ans_str
  226. def zip_time_pairs(time_list, zip_list):
  227. zip_time_pairs = zip(time_list, zip_list)
  228. zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
  229. return zip_vs_time
  230. def replace_key_with_value(input_string, replacement_dict):
  231. """
  232. 替换字符串中的关键字为给定的字典中的值。
  233. :param input_string: 需要进行替换的字符串
  234. :param replacement_dict: 替换规则,格式为 {key: value}
  235. :return: 替换后的字符串
  236. """
  237. # 遍历字典中的键值对,并用值替换字符串中的键
  238. for key, value in replacement_dict.items():
  239. if key in input_string:
  240. input_string = input_string.replace(key, value)
  241. return input_string
  242. def continous_judge(frame_list):
  243. if not frame_list:
  244. return 0
  245. cnt = 1
  246. for i in range(1, len(frame_list)):
  247. if frame_list[i] - frame_list[i - 1] <= 3:
  248. continue
  249. cnt += 1
  250. return cnt
  251. def get_interpolation(x, point1, point2):
  252. """
  253. According to the two extreme value points, the equation of one variable is determined,
  254. and the solution of the equation is obtained in the domain of definition.
  255. Arguments:
  256. x: A float number of the independent variable.
  257. point1: A set of the coordinate extreme point.
  258. point2: A set of the other coordinate extreme point.
  259. Returns:
  260. y: A float number of the dependent variable.
  261. """
  262. try:
  263. k = (point1[1] - point2[1]) / (point1[0] - point2[0])
  264. b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
  265. y = x * k + b
  266. return y
  267. except Exception as e:
  268. return f"Error: {str(e)}"
  269. def statistic_analysis(data_list):
  270. sorted_data_list = sorted(data_list)
  271. maxx = sorted_data_list[-1]
  272. minn = sorted_data_list[0]
  273. meann = sum(sorted_data_list) / len(sorted_data_list)
  274. percentile_99 = np.percentile(sorted_data_list, 99)
  275. ans_list = [maxx, minn, meann, percentile_99]
  276. return ans_list