common.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. ##################################################################
  4. #
  5. # Copyright (c) 2023 CICV, Inc. All Rights Reserved
  6. #
  7. ##################################################################
  8. """
  9. @Authors: yangzihao(yangzihao@china-icv.cn)
  10. @Date: 2023/07/24
  11. @Last Modified: 2023/07/24
  12. @Summary: Evaluation functions
  13. """
  14. import sys
  15. sys.path.append('../common')
  16. sys.path.append('../modules')
  17. import os
  18. import time
  19. import json
  20. import math
  21. import pandas as pd
  22. import numpy as np
  23. import zipfile
  24. import importlib
  25. def get_status_active_data(active_time_ranges, df):
  26. # 给定的双层列表
  27. # active_time_ranges = status_trigger_dict['LKA']['LKA_active_time']
  28. # 使用Pandas的between函数(注意between是闭区间)结合列表推导来过滤
  29. # 这里需要遍历所有时间范围,然后使用`|`(逻辑或)将它们的结果合并
  30. filtered_df = pd.DataFrame(columns=df.columns)
  31. for start, end in active_time_ranges:
  32. mask = (df['simTime'] >= start) & (df['simTime'] <= end)
  33. if filtered_df is None:
  34. filtered_df = df[mask]
  35. else:
  36. filtered_df = pd.concat([filtered_df, df[mask]])
  37. # 上述方式可能引入重复行,使用drop_duplicates去重
  38. df_result = filtered_df.drop_duplicates()
  39. # ego_x = df_result[df_result['playerId'] == 1]['posX'].reset_index(drop = True)
  40. # ego_y = df_result[df_result['playerId'] == 1]['posY'].reset_index(drop = True)
  41. # obj_x = df_result[df_result['playerId'] == 2]['posX'].reset_index(drop = True)
  42. # obj_y = df_result[df_result['playerId'] == 2]['posY'].reset_index(drop = True)
  43. # print("ego_x is", ego_x)
  44. # ego_speedx = self.df[self.df['playerId'] == 1]['speedX'].reset_index(drop = True)
  45. # ego_speedy = self.df[self.df['playerId'] == 1]['speedY'].reset_index(drop = True)
  46. # obj_speedx = self.df[self.df['playerId'] == 2]['speedX'].reset_index(drop = True)
  47. # obj_speedy = self.df[self.df['playerId'] == 2]['speedY'].reset_index(drop = True)
  48. # dx = obj_x - ego_x
  49. # dy = obj_y - ego_y
  50. # vx = obj_speedx - ego_speedx
  51. # vy = obj_speedy - ego_speedy
  52. # dist = np.sqrt(dx**2 + dy**2)
  53. # ego_v_projection_in_dist = _cal_v_ego_projection(dx, dy, ego_speedx, ego_speedy)
  54. # thw1 = _cal_THW(dist, ego_v_projection_in_dist)
  55. # thw = thw1.tolist()
  56. # THW = [[x]*2 for x in thw]
  57. # df_result['THW'] = THW
  58. return df_result
  59. def _cal_v_ego_projection(dx, dy, v_x1, v_y1):
  60. AB_mod = np.sqrt(dx**2 + dy**2)
  61. U_ABx = dx/AB_mod
  62. U_ABy = dy/AB_mod
  63. V1_on_AB = v_x1 * U_ABx + v_y1 *U_ABy
  64. return V1_on_AB
  65. def _cal_THW(dist, ego_v_projection_in_dist):
  66. # if ego_v_projection_in_dist == 0:
  67. # return math.inf
  68. ego_v_projection_in_dist = ego_v_projection_in_dist.replace(0, 0.001)
  69. THW = dist / ego_v_projection_in_dist
  70. return THW
  71. def custom_metric_param_parser(param_list):
  72. """
  73. param_dict = {
  74. "paramA" [
  75. {
  76. "kind": "-1",
  77. "optimal": "1",
  78. "multiple": ["0.5","5"],
  79. "spare1": null,
  80. "spare2": null
  81. }
  82. ]
  83. }
  84. """
  85. kind_list = []
  86. optimal_list = []
  87. multiple_list = []
  88. spare_list = []
  89. # spare1_list = []
  90. # spare2_list = []
  91. for i in range(len(param_list)):
  92. kind_list.append(int(param_list[i]['kind']))
  93. optimal_list.append(float(param_list[i]['optimal']))
  94. multiple_list.append([float(x) for x in param_list[i]['multiple']])
  95. spare_list.append([item["param"] for item in param_list[i]["spare"]])
  96. # spare1_list.append(param_list[i]['spare1'])
  97. # spare2_list.append(param_list[i]['spare2'])
  98. result = {
  99. "kind": kind_list,
  100. "optimal": optimal_list,
  101. "multiple": multiple_list,
  102. "spare": spare_list,
  103. # "spare1": spare1_list,
  104. # "spare2": spare2_list
  105. }
  106. return result
  107. def score_over_100(score):
  108. if score > 100:
  109. return 100
  110. return score
  111. def import_score_class(path, file):
  112. sys.path.append(path) # 将path添加到Python的搜索路径中
  113. # module_name = file[:-3] # 去掉文件名的".py"后缀
  114. module = __import__(file, fromlist=['ScoreModel'])
  115. class_instance = module.ScoreModel
  116. return class_instance
  117. def import_class_lib(path, file):
  118. sys.path.append(path) # 将path添加到Python的搜索路径中
  119. module = importlib.import_module(file)
  120. class_name = "ScoreModel"
  121. class_instance = getattr(module, class_name)
  122. return class_instance
  123. def cal_velocity(lat_v, lon_v):
  124. """
  125. The function can calculate the velocity with lateral velocity and longitudinal velocity.
  126. Args:
  127. lat_v: lateral velocity, m/s
  128. lon_v: longitudinal velocity, m/s
  129. Returns:
  130. v: the resultant velocity, km/h
  131. """
  132. v = (lat_v ** 2 + lon_v ** 2) ** 0.5
  133. return v
  134. def df2csv(df, filePath):
  135. df.to_csv(f'{filePath}', index=False)
  136. def dict2json(input_dict, jsonPath):
  137. with open(f'{jsonPath}', 'w', encoding='utf-8') as f:
  138. f.write(json.dumps(input_dict, ensure_ascii=False))
  139. def json2dict(json_file):
  140. with open(json_file, 'r', encoding='utf-8') as f:
  141. json_dict = json.load(f)
  142. return json_dict
  143. def get_subfolders_name(path):
  144. return [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
  145. def zip_dir(dir_path, zip_file_path):
  146. """
  147. This function can zip the files in dir_path as a zipfile.
  148. Arguments:
  149. dir_path: A str of the path of the files to be ziped.
  150. zip_file_path: A str of the path of the zipfile to be stored.
  151. Returns:
  152. None
  153. """
  154. with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
  155. for root, dirs, files in os.walk(dir_path):
  156. for file in files:
  157. file_path = os.path.join(root, file)
  158. zipf.write(file_path)
  159. def score_grade(score):
  160. """
  161. Returns the corresponding grade based on the input score.
  162. Arguments:
  163. score: An integer or float representing the score.
  164. Returns:
  165. grade: A string representing the grade.
  166. """
  167. GRADE_EXCELLENT = 90
  168. GRADE_GOOD = 80
  169. GRADE_GENERAL = 60
  170. if score >= GRADE_EXCELLENT:
  171. grade = '优秀'
  172. elif score >= GRADE_GOOD:
  173. grade = '良好'
  174. elif score > GRADE_GENERAL:
  175. grade = '一般'
  176. else:
  177. grade = '较差'
  178. return grade
  179. def mileage_format(mileage):
  180. if mileage < 1000:
  181. return f"{mileage:.2f}米"
  182. else:
  183. mileage = mileage / 1000
  184. return f"{mileage:.2f}公里"
  185. def duration_format(duration):
  186. if duration < 60:
  187. return f"{duration:.2f}秒"
  188. elif duration < 3600:
  189. minute = int(duration / 60)
  190. second = int(duration % 60)
  191. return f"{minute}分{second}秒" if second != 0 else f"{minute}分"
  192. else:
  193. hour = int(duration / 3600)
  194. minute = int((duration % 3600) / 60)
  195. second = int(duration % 60)
  196. ans = f"{hour:d}时"
  197. ans = ans + f"{minute}分" if minute != 0 else ans
  198. ans = ans + f"{second}秒" if second != 0 else ans
  199. return ans
  200. def continuous_group(df):
  201. time_list = df['simTime'].values.tolist()
  202. frame_list = df['simFrame'].values.tolist()
  203. group_time = []
  204. group_frame = []
  205. sub_group_time = []
  206. sub_group_frame = []
  207. for i in range(len(frame_list)):
  208. if not sub_group_time or frame_list[i] - frame_list[i - 1] <= 1:
  209. sub_group_time.append(time_list[i])
  210. sub_group_frame.append(frame_list[i])
  211. else:
  212. group_time.append(sub_group_time)
  213. group_frame.append(sub_group_frame)
  214. sub_group_time = [time_list[i]]
  215. sub_group_frame = [frame_list[i]]
  216. group_time.append(sub_group_time)
  217. group_frame.append(sub_group_frame)
  218. group_time = [g for g in group_time if len(g) >= 2]
  219. group_frame = [g for g in group_frame if len(g) >= 2]
  220. # 输出图表值
  221. time = [[g[0], g[-1]] for g in group_time]
  222. frame = [[g[0], g[-1]] for g in group_frame]
  223. time_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  224. frame_df = pd.DataFrame(frame, columns=['start_frame', 'end_frame'])
  225. result_df = pd.concat([time_df, frame_df], axis=1)
  226. return result_df
  227. def continuous_group_old(df):
  228. time_list = df['simTime'].values.tolist()
  229. frame_list = df['simFrame'].values.tolist()
  230. group = []
  231. sub_group = []
  232. for i in range(len(frame_list)):
  233. if not sub_group or frame_list[i] - frame_list[i - 1] <= 1:
  234. sub_group.append(time_list[i])
  235. else:
  236. group.append(sub_group)
  237. sub_group = [time_list[i]]
  238. group.append(sub_group)
  239. group = [g for g in group if len(g) >= 2]
  240. # 输出图表值
  241. time = [[g[0], g[-1]] for g in group]
  242. unsafe_df = pd.DataFrame(time, columns=['start_time', 'end_time'])
  243. return unsafe_df
  244. def get_frame_with_time(df1, df2):
  245. # 将dataframe1按照start_time与simTime进行合并
  246. df1_start = df1.merge(df2[['simTime', 'simFrame']], left_on='start_time', right_on='simTime')
  247. df1_start = df1_start[['start_time', 'simFrame']].copy()
  248. df1_start.rename(columns={'simFrame': 'start_frame'}, inplace=True)
  249. # 将dataframe1按照end_time与simTime进行合并
  250. df1_end = df1.merge(df2[['simTime', 'simFrame']], left_on='end_time', right_on='simTime')
  251. df1_end = df1_end[['end_time', 'simFrame']].copy()
  252. df1_end.rename(columns={'simFrame': 'end_frame'}, inplace=True)
  253. # 将两个合并后的数据框按照行索引进行拼接
  254. result = pd.concat([df1_start, df1_end], axis=1)
  255. return result
  256. def string_concatenate(str_list):
  257. """
  258. This function concatenates the input string list to generate a new string.
  259. If str_list is empty, an empty string is returned.
  260. If str_list has only one element, return that element.
  261. If str_list has multiple elements, concatenate all the elements except the last element with ', ', and then concatenate the resulting string with the last element with 'and'.
  262. Arguments:
  263. str_list: A list of strings.
  264. Returns:
  265. ans_str: A concatenated string.
  266. """
  267. if not str_list:
  268. return ""
  269. ans_str = '、'.join(str_list[:-1])
  270. ans_str = ans_str + "和" + str_list[-1] if len(str_list) > 1 else ans_str + str_list[-1]
  271. return ans_str
  272. def zip_time_pairs(time_list, zip_list):
  273. zip_time_pairs = zip(time_list, zip_list)
  274. zip_vs_time = [[x, "" if math.isnan(y) else y] for x, y in zip_time_pairs]
  275. return zip_vs_time
  276. def replace_key_with_value(input_string, replacement_dict):
  277. """
  278. 替换字符串中的关键字为给定的字典中的值。
  279. :param input_string: 需要进行替换的字符串
  280. :param replacement_dict: 替换规则,格式为 {key: value}
  281. :return: 替换后的字符串
  282. """
  283. # 遍历字典中的键值对,并用值替换字符串中的键
  284. for key, value in replacement_dict.items():
  285. if key in input_string:
  286. input_string = input_string.replace(key, value)
  287. return input_string
  288. def continous_judge(frame_list):
  289. if not frame_list:
  290. return 0
  291. cnt = 1
  292. for i in range(1, len(frame_list)):
  293. if frame_list[i] - frame_list[i - 1] <= 3:
  294. continue
  295. cnt += 1
  296. return cnt
  297. def get_interpolation(x, point1, point2):
  298. """
  299. According to the two extreme value points, the equation of one variable is determined,
  300. and the solution of the equation is obtained in the domain of definition.
  301. Arguments:
  302. x: A float number of the independent variable.
  303. point1: A set of the coordinate extreme point.
  304. point2: A set of the other coordinate extreme point.
  305. Returns:
  306. y: A float number of the dependent variable.
  307. """
  308. try:
  309. k = (point1[1] - point2[1]) / (point1[0] - point2[0])
  310. b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
  311. y = x * k + b
  312. return y
  313. except Exception as e:
  314. return f"Error: {str(e)}"
  315. def statistic_analysis(data_list):
  316. sorted_data_list = sorted(data_list)
  317. maxx = sorted_data_list[-1]
  318. minn = sorted_data_list[0]
  319. meann = sum(sorted_data_list) / len(sorted_data_list)
  320. percentile_99 = np.percentile(sorted_data_list, 99)
  321. ans_list = [maxx, minn, meann, percentile_99]
  322. return ans_list