common.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import json
  2. from typing import List, Dict, Tuple
  3. import numpy as np
  4. import pandas as pd
  5. def dict2json(data_dict: Dict, file_path: str) -> None:
  6. """
  7. 将字典转换为JSON格式并保存到文件中。
  8. 参数:
  9. data_dict (dict): 要转换的字典。
  10. file_path (str): 保存JSON文件的路径。
  11. """
  12. try:
  13. with open(file_path, "w", encoding="utf-8") as json_file:
  14. json.dump(data_dict, json_file, ensure_ascii=False, indent=4)
  15. print(f"JSON文件已保存到 {file_path}")
  16. except Exception as e:
  17. print(f"保存JSON文件时出错: {e}")
  18. def get_interpolation(x: float, point1: Tuple[float, float], point2: Tuple[float, float]) -> float:
  19. """
  20. 根据两个点确定一元一次方程,并在定义域内求解。
  21. 参数:
  22. x: 自变量的值。
  23. point1: 第一个点的坐标。
  24. point2: 第二个点的坐标。
  25. 返回:
  26. y: 因变量的值。
  27. """
  28. try:
  29. k = (point1[1] - point2[1]) / (point1[0] - point2[0])
  30. b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
  31. return x * k + b
  32. except Exception as e:
  33. return f"Error: {str(e)}"
  34. def get_frame_with_time_fast(time_to_frame, target_time):
  35. """
  36. 使用字典快速查找最接近的帧号(假设时间戳是精确匹配的)
  37. """
  38. # 如果目标时间存在精确匹配
  39. if target_time in time_to_frame:
  40. return time_to_frame[target_time]
  41. # 否则找最接近的时间(如果无法保证精确匹配)
  42. closest_time = min(time_to_frame.keys(), key=lambda t: abs(t - target_time))
  43. return time_to_frame[closest_time]
  44. # 辅助函数
  45. def get_frame_with_time(time_list, frame_list, target_time):
  46. time_array = np.array(time_list)
  47. idx = np.argmin(np.abs(time_array - target_time))
  48. return frame_list[idx]
  49. class PolynomialCurvatureFitting:
  50. def __init__(self, data_path: str, degree: int = 3):
  51. self.data_path = data_path
  52. self.degree = degree
  53. self.data = pd.read_csv(self.data_path)
  54. self.points = self.data[['centerLine_x', 'centerLine_y']].values
  55. self.x_data, self.y_data = self.points[:, 0], self.points[:, 1]
  56. def curvature(self, coefficients: np.ndarray, x: float) -> float:
  57. """
  58. 计算多项式在x处的曲率。
  59. 参数:
  60. coefficients: 多项式系数。
  61. x: 自变量的值。
  62. 返回:
  63. 曲率值。
  64. """
  65. first_derivative = np.polyder(coefficients)
  66. second_derivative = np.polyder(first_derivative)
  67. return np.abs(np.polyval(second_derivative, x)) / (1 + np.polyval(first_derivative, x) ** 2) ** (3 / 2)
  68. def polynomial_fit(self, x_window: np.ndarray, y_window: np.ndarray) -> Tuple[np.ndarray, np.poly1d]:
  69. """
  70. 对给定的窗口数据进行多项式拟合。
  71. 参数:
  72. x_window: 窗口内的x数据。
  73. y_window: 窗口内的y数据。
  74. 返回:
  75. 多项式系数和多项式对象。
  76. """
  77. coefficients = np.polyfit(x_window, y_window, self.degree)
  78. return coefficients, np.poly1d(coefficients)
  79. def find_best_window(self, point: Tuple[float, float], window_size: int) -> int:
  80. """
  81. 找到最佳窗口的起始索引。
  82. 参数:
  83. point: 目标点的坐标。
  84. window_size: 窗口大小。
  85. 返回:
  86. 最佳窗口的起始索引。
  87. """
  88. x1, y1 = point
  89. window_means = np.array([
  90. (np.mean(self.x_data[start:start + window_size]), np.mean(self.y_data[start:start + window_size]))
  91. for start in range(len(self.x_data) - window_size + 1)
  92. ])
  93. distances = np.sqrt((x1 - window_means[:, 0]) ** 2 + (y1 - window_means[:, 1]) ** 2)
  94. return np.argmin(distances)
  95. def find_projection(self, x_point: float, y_point: float, polynomial: np.poly1d, x_data_range: Tuple[float, float],
  96. search_step: float = 0.0001) -> Tuple[float, float, float]:
  97. """
  98. 找到目标点在多项式曲线上的投影点。
  99. 参数:
  100. x_point: 目标点的x坐标。
  101. y_point: 目标点的y坐标。
  102. polynomial: 多项式对象。
  103. x_data_range: x的取值范围。
  104. search_step: 搜索步长。
  105. 返回:
  106. 投影点的坐标和最小距离。
  107. """
  108. x_values = np.arange(x_data_range[0], x_data_range[1], search_step)
  109. y_values = polynomial(x_values)
  110. distances = np.sqrt((x_point - x_values) ** 2 + (y_point - y_values) ** 2)
  111. min_idx = np.argmin(distances)
  112. return x_values[min_idx], y_values[min_idx], distances[min_idx]
  113. def fit_and_project(self, points: List[Tuple[float, float]], window_size: int) -> List[Dict]:
  114. """
  115. 对每个点进行多项式拟合和投影计算。
  116. 参数:
  117. points: 目标点列表。
  118. window_size: 窗口大小。
  119. 返回:
  120. 包含投影点、曲率、曲率变化和最小距离的字典列表。
  121. """
  122. results = []
  123. for point in points:
  124. best_start = self.find_best_window(point, window_size)
  125. x_window = self.x_data[best_start:best_start + window_size]
  126. y_window = self.y_data[best_start:best_start + window_size]
  127. coefficients, polynomial = self.polynomial_fit(x_window, y_window)
  128. proj_x, proj_y, min_distance = self.find_projection(point[0], point[1], polynomial,
  129. (min(x_window), max(x_window)))
  130. curvature_value = self.curvature(coefficients, proj_x)
  131. second_derivative_coefficients = np.polyder(np.polyder(coefficients))
  132. curvature_change_value = np.polyval(second_derivative_coefficients, proj_x)
  133. results.append({
  134. 'projection': (proj_x, proj_y),
  135. 'curvHor': curvature_value,
  136. 'curvHorDot': curvature_change_value,
  137. 'coefficients': coefficients,
  138. 'laneOffset': min_distance
  139. })
  140. return results
  141. if __name__ == "__main__":
  142. data_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/LaneMap.csv"
  143. point_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/EgoMap.csv"
  144. points_data = pd.read_csv(point_path)
  145. points = points_data[['posX', 'posY']].values
  146. window_size = 4
  147. fitting_instance = PolynomialCurvatureFitting(data_path)
  148. projections = fitting_instance.fit_and_project(points, window_size)
  149. fitting_instance.plot_results(points, projections)