common.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/07/24
  11. @Last Modified: 2023/07/24
  12. @Summary: Evaluation functions
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. import os
  18. import time
  19. import json
  20. import math
  21. import pandas as pd
  22. import numpy as np
  23. import zipfile
  24. import importlib
  25. def import_class(path, file):
  26. sys.path.append(path) # 将path添加到Python的搜索路径中
  27. # module_name = file[:-3] # 去掉文件名的".py"后缀
  28. module = __import__(file, fromlist=['ScoreModel'])
  29. class_instance = module.ScoreModel
  30. return class_instance
  31. def import_class_lib(path, file):
  32. sys.path.append(path) # 将path添加到Python的搜索路径中
  33. module = importlib.import_module(file)
  34. class_name = "ScoreModel"
  35. class_instance = getattr(module, class_name)
  36. return class_instance
  37. def cal_velocity(lat_v, lon_v):
  38. """
  39. The function can calculate the velocity with lateral velocity and longitudinal velocity.
  40. Args:
  41. lat_v: lateral velocity, m/s
  42. lon_v: longitudinal velocity, m/s
  43. Returns:
  44. v: the resultant velocity, km/h
  45. """
  46. v = (lat_v ** 2 + lon_v ** 2) ** 0.5 * 3.6
  47. return v
  48. def json2dict(json_file):
  49. with open(json_file, 'r', encoding='utf-8') as f:
  50. json_dict = json.load(f)
  51. return json_dict
  52. def get_subfolders_name(path):
  53. return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
  54. def zip_dir(dir_path, zip_file_path):
  55. """
  56. This function can zip the files in dir_path as a zipfile.
  57. Arguments:
  58. dir_path: A str of the path of the files to be ziped.
  59. zip_file_path: A str of the path of the zipfile to be stored.
  60. Returns:
  61. None
  62. """
  63. with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
  64. for root, dirs, files in os.walk(dir_path):
  65. for file in files:
  66. file_path = os.path.join(root, file)
  67. zipf.write(file_path)
  68. def score_grade(score):
  69. """
  70. Returns the corresponding grade based on the input score.
  71. Arguments:
  72. score: An integer or float representing the score.
  73. Returns:
  74. grade: A string representing the grade.
  75. """
  76. if score >= 90:
  77. grade = '优秀'
  78. elif score >= 80:
  79. grade = '良好'
  80. elif score > 60:
  81. grade = '一般'
  82. else:
  83. grade = '较差'
  84. return grade
  85. def mileage_format(mileage):
  86. if mileage < 1000:
  87. return f"{mileage:.2f}米"
  88. else:
  89. mileage = mileage / 1000
  90. return f"{mileage:.2f}公里"
  91. def duration_format(duration):
  92. if duration < 60:
  93. return f"{duration:.2f}秒"
  94. elif duration < 3600:
  95. minute = int(duration / 60)
  96. second = int(duration % 60)
  97. return f"{minute}分{second}秒" if second != 0 else f"{minute}分"
  98. else:
  99. hour = int(duration / 3600)
  100. minute = int((duration % 3600) / 60)
  101. second = int(duration % 60)
  102. ans = f"{hour:d}时"
  103. ans = ans + f"{minute}分" if minute != 0 else ans
  104. ans = ans + f"{second}秒" if second != 0 else ans
  105. return ans
  106. def continuous_group(df):
  107. time_list = df['simTime'].values.tolist()
  108. frame_list = df['simFrame'].values.tolist()
  109. group_time = []
  110. group_frame = []
  111. sub_group_time = []
  112. sub_group_frame = []
  113. for i in range(len(frame_list)):
  114. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  115. sub_group_time.append(time_list[i])
  116. sub_group_frame.append(frame_list[i])
  117. else:
  118. group_time.append(sub_group_time)
  119. group_frame.append(sub_group_frame)
  120. sub_group_time = [time_list[i]]
  121. sub_group_frame = [frame_list[i]]
  122. group_time.append(sub_group_time)
  123. group_frame.append(sub_group_frame)
  124. group_time = [g for g in group_time if len(g) >= 2]
  125. group_frame = [g for g in group_frame if len(g) >= 2]
  126. # 输出图表值
  127. time = [[g[0], g[-1]] for g in group_time]
  128. frame = [[g[0], g[-1]] for g in group_frame]
  129. time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  130. frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  131. result_df = pd.concat([time_df, frame_df], axis=1)
  132. return result_df
  133. def continuous_group_old(df):
  134. time_list = df['simTime'].values.tolist()
  135. frame_list = df['simFrame'].values.tolist()
  136. group = []
  137. sub_group = []
  138. for i in range(len(frame_list)):
  139. if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
  140. sub_group.append(time_list[i])
  141. else:
  142. group.append(sub_group)
  143. sub_group = [time_list[i]]
  144. group.append(sub_group)
  145. group = [g for g in group if len(g) >= 2]
  146. # 输出图表值
  147. time = [[g[0], g[-1]] for g in group]
  148. unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  149. return unsafe_df
  150. def get_frame_with_time(df1, df2):
  151. # 将dataframe1按照start_time与simTime进行合并
  152. df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime')
  153. df1_start = df1_start[['start_time', 'simFrame']].copy()
  154. df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True)
  155. # 将dataframe1按照end_time与simTime进行合并
  156. df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime')
  157. df1_end = df1_end[['end_time', 'simFrame']].copy()
  158. df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True)
  159. # 将两个合并后的数据框按照行索引进行拼接
  160. result = pd.concat([df1_start, df1_end], axis=1)
  161. return result
  162. def string_concatenate(str_list):
  163. """
  164. This function concatenates the input string list to generate a new string.
  165. If str_list is empty, an empty string is returned.
  166. If str_list has only one element, return that element.
  167. If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'.
  168. Arguments:
  169. str_list: A list of strings.
  170. Returns:
  171. ans_str: A concatenated string.
  172. """
  173. if not str_list:
  174. return ""
  175. ans_str = '、'.join(str_list[:-1])
  176. ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1]
  177. return ans_str
  178. def zip_time_pairs(time_list, zip_list, upper_limit=9999):
  179. zip_time_pairs = zip(time_list, zip_list)
  180. zip_vs_time = [[x, upper_limit if y > upper_limit else y] for x, y in zip_time_pairs if not math.isnan(y)]
  181. return zip_vs_time
  182. def replace_key_with_value(input_string, replacement_dict):
  183. """
  184. 替换字符串中的关键字为给定的字典中的值。
  185. :param input_string: 需要进行替换的字符串
  186. :param replacement_dict: 替换规则,格式为 {key: value}
  187. :return: 替换后的字符串
  188. """
  189. # 遍历字典中的键值对,并用值替换字符串中的键
  190. for key, value in replacement_dict.items():
  191. if key in input_string:
  192. input_string = input_string.replace(key, value)
  193. return input_string
  194. def continous_judge(frame_list):
  195. if not frame_list:
  196. return 0
  197. cnt = 1
  198. for i in range(1, len(frame_list)):
  199. if frame_list[i] - frame_list[i - 1] <= 3:
  200. continue
  201. cnt += 1
  202. return cnt
  203. def get_interpolation(x, point1, point2):
  204. """
  205. According to the two extreme value points, the equation of one variable is determined,
  206. and the solution of the equation is obtained in the domain of definition.
  207. Arguments:
  208. x: A float number of the independent variable.
  209. point1: A set of the coordinate extreme point.
  210. point2: A set of the other coordinate extreme point.
  211. Returns:
  212. y: A float number of the dependent variable.
  213. """
  214. try:
  215. k = (point1[1] - point2[1]) / (point1[0] - point2[0])
  216. b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
  217. y = x * k + b
  218. return y
  219. except Exception as e:
  220. return f"Error: {str(e)}"
  221. def statistic_analysis(data_list):
  222. sorted_data_list = sorted(data_list)
  223. maxx = sorted_data_list[-1]
  224. minn = sorted_data_list[0]
  225. meann = sum(sorted_data_list) / len(sorted_data_list)
  226. percentile_99 = np.percentile(sorted_data_list, 99)
  227. ans_list = [maxx, minn, meann, percentile_99]
  228. return ans_list