common.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Data: 2023/07/24
  11. @Last Modified: 2023/07/24
  12. @Summary: Evaluateion functions
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. import os
  18. import time
  19. import json
  20. import math
  21. import pandas as pd
  22. import numpy as np
  23. import zipfile
  24. import importlib
  25. def get_status_active_data(active_time_ranges, df):
  26. # 给定的双层列表
  27. # active_time_ranges = status_trigger_dict['LKA']['LKA_active_time']
  28. # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
  29. # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
  30. filtered_df = pd.DataFrame(columns=df.columns)
  31. for start, end in active_time_ranges:
  32. mask = (df['simTime'] >= start) & (df['simTime'] <= end)
  33. if filtered_df is None:
  34. filtered_df = df[mask]
  35. else:
  36. filtered_df = pd.concat([filtered_df, df[mask]])
  37. # 上述方式可能引入重复行,使用drop_duplicates去重
  38. df_result = filtered_df.drop_duplicates()
  39. return df_result
  40. def custom_metric_param_parser(param_list):
  41. """
  42. param_dict = {
  43. "paramA" [
  44. {
  45. "kind": "-1",
  46. "optimal": "1",
  47. "multiple": ["0.5","5"],
  48. "spare1": null,
  49. "spare2": null
  50. }
  51. ]
  52. }
  53. """
  54. kind_list = []
  55. optimal_list = []
  56. multiple_list = []
  57. spare_list = []
  58. # spare1_list = []
  59. # spare2_list = []
  60. for i in range(len(param_list)):
  61. kind_list.append(int(param_list[i]['kind']))
  62. optimal_list.append(float(param_list[i]['optimal']))
  63. multiple_list.append([float(x) for x in param_list[i]['multiple']])
  64. spare_list.append([item["param"] for item in param_list[i]["spare"]])
  65. # spare1_list.append(param_list[i]['spare1'])
  66. # spare2_list.append(param_list[i]['spare2'])
  67. result = {
  68. "kind": kind_list,
  69. "optimal": optimal_list,
  70. "multiple": multiple_list,
  71. "spare": spare_list,
  72. # "spare1": spare1_list,
  73. # "spare2": spare2_list
  74. }
  75. return result
  76. def score_over_100(score):
  77. if score > 100:
  78. return 100
  79. return score
  80. def import_score_class(path, file):
  81. sys.path.append(path) # 将path添加到Python的搜索路径中
  82. # module_name = file[:-3] # 去掉文件名的".py"后缀
  83. module = __import__(file, fromlist=['ScoreModel'])
  84. class_instance = module.ScoreModel
  85. return class_instance
  86. def import_class_lib(path, file):
  87. sys.path.append(path) # 将path添加到Python的搜索路径中
  88. module = importlib.import_module(file)
  89. class_name = "ScoreModel"
  90. class_instance = getattr(module, class_name)
  91. return class_instance
  92. def cal_velocity(lat_v, lon_v):
  93. """
  94. The function can calculate the velocity with lateral velocity and longitudinal velocity.
  95. Args:
  96. lat_v: lateral velocity, m/s
  97. lon_v: longitudinal velocity, m/s
  98. Returns:
  99. v: the resultant velocity, km/h
  100. """
  101. v = (lat_v ** 2 + lon_v ** 2) ** 0.5
  102. return v
  103. def df2csv(df, filePath):
  104. df.to_csv(f'{filePath}', index=False)
  105. def dict2json(input_dict, jsonPath):
  106. with open(f'{jsonPath}', 'w', encoding='utf-8') as f:
  107. f.write(json.dumps(input_dict, ensure_ascii=False))
  108. def json2dict(json_file):
  109. with open(json_file, 'r', encoding='utf-8') as f:
  110. json_dict = json.load(f)
  111. return json_dict
  112. def get_subfolders_name(path):
  113. return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
  114. def zip_dir(dir_path, zip_file_path):
  115. """
  116. This function can zip the files in dir_path as a zipfile.
  117. Arguments:
  118. dir_path: A str of the path of the files to be ziped.
  119. zip_file_path: A str of the path of the zipfile to be stored.
  120. Returns:
  121. None
  122. """
  123. with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
  124. for root, dirs, files in os.walk(dir_path):
  125. for file in files:
  126. file_path = os.path.join(root, file)
  127. zipf.write(file_path)
  128. def score_grade(score):
  129. """
  130. Returns the corresponding grade based on the input score.
  131. Arguments:
  132. score: An integer or float representing the score.
  133. Returns:
  134. grade: A string representing the grade.
  135. """
  136. GRADE_EXCELLENT = 90
  137. GRADE_GOOD = 80
  138. GRADE_GENERAL = 60
  139. if score >= GRADE_EXCELLENT:
  140. grade = '优秀'
  141. elif score >= GRADE_GOOD:
  142. grade = '良好'
  143. elif score > GRADE_GENERAL:
  144. grade = '一般'
  145. else:
  146. grade = '较差'
  147. return grade
  148. def mileage_format(mileage):
  149. if mileage < 1000:
  150. return f"{mileage:.2f}米"
  151. else:
  152. mileage = mileage / 1000
  153. return f"{mileage:.2f}公里"
  154. def duration_format(duration):
  155. if duration < 60:
  156. return f"{duration:.2f}秒"
  157. elif duration < 3600:
  158. minute = int(duration / 60)
  159. second = int(duration % 60)
  160. return f"{minute}分{second}秒" if second != 0 else f"{minute}分"
  161. else:
  162. hour = int(duration / 3600)
  163. minute = int((duration % 3600) / 60)
  164. second = int(duration % 60)
  165. ans = f"{hour:d}时"
  166. ans = ans + f"{minute}分" if minute != 0 else ans
  167. ans = ans + f"{second}秒" if second != 0 else ans
  168. return ans
  169. def continuous_group(df):
  170. time_list = df['simTime'].values.tolist()
  171. frame_list = df['simFrame'].values.tolist()
  172. group_time = []
  173. group_frame = []
  174. sub_group_time = []
  175. sub_group_frame = []
  176. for i in range(len(frame_list)):
  177. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  178. sub_group_time.append(time_list[i])
  179. sub_group_frame.append(frame_list[i])
  180. else:
  181. group_time.append(sub_group_time)
  182. group_frame.append(sub_group_frame)
  183. sub_group_time = [time_list[i]]
  184. sub_group_frame = [frame_list[i]]
  185. group_time.append(sub_group_time)
  186. group_frame.append(sub_group_frame)
  187. group_time = [g for g in group_time if len(g) >= 2]
  188. group_frame = [g for g in group_frame if len(g) >= 2]
  189. # 输出图表值
  190. time = [[g[0], g[-1]] for g in group_time]
  191. frame = [[g[0], g[-1]] for g in group_frame]
  192. time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  193. frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  194. result_df = pd.concat([time_df, frame_df], axis=1)
  195. return result_df
  196. def continuous_group_old(df):
  197. time_list = df['simTime'].values.tolist()
  198. frame_list = df['simFrame'].values.tolist()
  199. group = []
  200. sub_group = []
  201. for i in range(len(frame_list)):
  202. if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
  203. sub_group.append(time_list[i])
  204. else:
  205. group.append(sub_group)
  206. sub_group = [time_list[i]]
  207. group.append(sub_group)
  208. group = [g for g in group if len(g) >= 2]
  209. # 输出图表值
  210. time = [[g[0], g[-1]] for g in group]
  211. unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  212. return unsafe_df
  213. def get_frame_with_time(df1, df2):
  214. # 将dataframe1按照start_time与simTime进行合并
  215. df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime')
  216. df1_start = df1_start[['start_time', 'simFrame']].copy()
  217. df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True)
  218. # 将dataframe1按照end_time与simTime进行合并
  219. df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime')
  220. df1_end = df1_end[['end_time', 'simFrame']].copy()
  221. df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True)
  222. # 将两个合并后的数据框按照行索引进行拼接
  223. result = pd.concat([df1_start, df1_end], axis=1)
  224. return result
  225. def string_concatenate(str_list):
  226. """
  227. This function concatenates the input string list to generate a new string.
  228. If str_list is empty, an empty string is returned.
  229. If str_list has only one element, return that element.
  230. If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'.
  231. Arguments:
  232. str_list: A list of strings.
  233. Returns:
  234. ans_str: A concatenated string.
  235. """
  236. if not str_list:
  237. return ""
  238. ans_str = '、'.join(str_list[:-1])
  239. ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1]
  240. return ans_str
  241. def zip_time_pairs(time_list, zip_list):
  242. zip_time_pairs = zip(time_list, zip_list)
  243. zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
  244. return zip_vs_time
  245. def replace_key_with_value(input_string, replacement_dict):
  246. """
  247. 替换字符串中的关键字为给定的字典中的值。
  248. :param input_string: 需要进行替换的字符串
  249. :param replacement_dict: 替换规则,格式为 {key: value}
  250. :return: 替换后的字符串
  251. """
  252. # 遍历字典中的键值对,并用值替换字符串中的键
  253. for key, value in replacement_dict.items():
  254. if key in input_string:
  255. input_string = input_string.replace(key, value)
  256. return input_string
  257. def continous_judge(frame_list):
  258. if not frame_list:
  259. return 0
  260. cnt = 1
  261. for i in range(1, len(frame_list)):
  262. if frame_list[i] - frame_list[i - 1] <= 3:
  263. continue
  264. cnt += 1
  265. return cnt
  266. def get_interpolation(x, point1, point2):
  267. """
  268. According to the two extreme value points, the equation of one variable is determined,
  269. and the solution of the equation is obtained in the domain of definition.
  270. Arguments:
  271. x: A float number of the independent variable.
  272. point1: A set of the coordinate extreme point.
  273. point2: A set of the other coordinate extreme point.
  274. Returns:
  275. y: A float number of the dependent variable.
  276. """
  277. try:
  278. k = (point1[1] - point2[1]) / (point1[0] - point2[0])
  279. b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
  280. y = x * k + b
  281. return y
  282. except Exception as e:
  283. return f"Error: {str(e)}"
  284. def statistic_analysis(data_list):
  285. sorted_data_list = sorted(data_list)
  286. maxx = sorted_data_list[-1]
  287. minn = sorted_data_list[0]
  288. meann = sum(sorted_data_list) / len(sorted_data_list)
  289. percentile_99 = np.percentile(sorted_data_list, 99)
  290. ans_list = [maxx, minn, meann, percentile_99]
  291. return ans_list