common.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. import json
  2. from typing import List, Dict, Tuple
  3. import numpy as np
  4. import pandas as pd
  5. def dict2json(data_dict: Dict, file_path: str) -> None:
  6. """
  7. 将字典转换为JSON格式并保存到文件中。
  8. 参数:
  9. data_dict (dict): 要转换的字典。
  10. file_path (str): 保存JSON文件的路径。
  11. """
  12. try:
  13. with open(file_path, "w", encoding="utf-8") as json_file:
  14. json.dump(data_dict, json_file, ensure_ascii=False, indent=4)
  15. print(f"JSON文件已保存到 {file_path}")
  16. except Exception as e:
  17. print(f"保存JSON文件时出错: {e}")
  18. def get_interpolation(x: float, point1: Tuple[float, float], point2: Tuple[float, float]) -> float:
  19. """
  20. 根据两个点确定一元一次方程,并在定义域内求解。
  21. 参数:
  22. x: 自变量的值。
  23. point1: 第一个点的坐标。
  24. point2: 第二个点的坐标。
  25. 返回:
  26. y: 因变量的值。
  27. """
  28. try:
  29. k = (point1[1] - point2[1]) / (point1[0] - point2[0])
  30. b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
  31. return x * k + b
  32. except Exception as e:
  33. return f"Error: {str(e)}"
  34. def get_frame_with_time(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
  35. """
  36. 将两个DataFrame按照时间列进行合并,并返回结果。
  37. 参数:
  38. df1: 包含start_time和end_time的DataFrame。
  39. df2: 包含simTime和simFrame的DataFrame。
  40. 返回:
  41. 合并后的DataFrame。
  42. """
  43. df1_start = df1.merge(df2[["simTime", "simFrame"]], left_on="start_time", right_on="simTime")
  44. df1_start = df1_start[["start_time", "simFrame"]].rename(columns={"simFrame": "start_frame"})
  45. df1_end = df1.merge(df2[["simTime", "simFrame"]], left_on="end_time", right_on="simTime")
  46. df1_end = df1_end[["end_time", "simFrame"]].rename(columns={"simFrame": "end_frame"})
  47. return pd.concat([df1_start, df1_end], axis=1)
  48. class PolynomialCurvatureFitting:
  49. def __init__(self, data_path: str, degree: int = 3):
  50. self.data_path = data_path
  51. self.degree = degree
  52. self.data = pd.read_csv(self.data_path)
  53. self.points = self.data[['centerLine_x', 'centerLine_y']].values
  54. self.x_data, self.y_data = self.points[:, 0], self.points[:, 1]
  55. def curvature(self, coefficients: np.ndarray, x: float) -> float:
  56. """
  57. 计算多项式在x处的曲率。
  58. 参数:
  59. coefficients: 多项式系数。
  60. x: 自变量的值。
  61. 返回:
  62. 曲率值。
  63. """
  64. first_derivative = np.polyder(coefficients)
  65. second_derivative = np.polyder(first_derivative)
  66. return np.abs(np.polyval(second_derivative, x)) / (1 + np.polyval(first_derivative, x) ** 2) ** (3 / 2)
  67. def polynomial_fit(self, x_window: np.ndarray, y_window: np.ndarray) -> Tuple[np.ndarray, np.poly1d]:
  68. """
  69. 对给定的窗口数据进行多项式拟合。
  70. 参数:
  71. x_window: 窗口内的x数据。
  72. y_window: 窗口内的y数据。
  73. 返回:
  74. 多项式系数和多项式对象。
  75. """
  76. coefficients = np.polyfit(x_window, y_window, self.degree)
  77. return coefficients, np.poly1d(coefficients)
  78. def find_best_window(self, point: Tuple[float, float], window_size: int) -> int:
  79. """
  80. 找到最佳窗口的起始索引。
  81. 参数:
  82. point: 目标点的坐标。
  83. window_size: 窗口大小。
  84. 返回:
  85. 最佳窗口的起始索引。
  86. """
  87. x1, y1 = point
  88. window_means = np.array([
  89. (np.mean(self.x_data[start:start + window_size]), np.mean(self.y_data[start:start + window_size]))
  90. for start in range(len(self.x_data) - window_size + 1)
  91. ])
  92. distances = np.sqrt((x1 - window_means[:, 0]) ** 2 + (y1 - window_means[:, 1]) ** 2)
  93. return np.argmin(distances)
  94. def find_projection(self, x_point: float, y_point: float, polynomial: np.poly1d, x_data_range: Tuple[float, float], search_step: float = 0.0001) -> Tuple[float, float, float]:
  95. """
  96. 找到目标点在多项式曲线上的投影点。
  97. 参数:
  98. x_point: 目标点的x坐标。
  99. y_point: 目标点的y坐标。
  100. polynomial: 多项式对象。
  101. x_data_range: x的取值范围。
  102. search_step: 搜索步长。
  103. 返回:
  104. 投影点的坐标和最小距离。
  105. """
  106. x_values = np.arange(x_data_range[0], x_data_range[1], search_step)
  107. y_values = polynomial(x_values)
  108. distances = np.sqrt((x_point - x_values) ** 2 + (y_point - y_values) ** 2)
  109. min_idx = np.argmin(distances)
  110. return x_values[min_idx], y_values[min_idx], distances[min_idx]
  111. def fit_and_project(self, points: List[Tuple[float, float]], window_size: int) -> List[Dict]:
  112. """
  113. 对每个点进行多项式拟合和投影计算。
  114. 参数:
  115. points: 目标点列表。
  116. window_size: 窗口大小。
  117. 返回:
  118. 包含投影点、曲率、曲率变化和最小距离的字典列表。
  119. """
  120. results = []
  121. for point in points:
  122. best_start = self.find_best_window(point, window_size)
  123. x_window = self.x_data[best_start:best_start + window_size]
  124. y_window = self.y_data[best_start:best_start + window_size]
  125. coefficients, polynomial = self.polynomial_fit(x_window, y_window)
  126. proj_x, proj_y, min_distance = self.find_projection(point[0], point[1], polynomial, (min(x_window), max(x_window)))
  127. curvature_value = self.curvature(coefficients, proj_x)
  128. second_derivative_coefficients = np.polyder(np.polyder(coefficients))
  129. curvature_change_value = np.polyval(second_derivative_coefficients, proj_x)
  130. results.append({
  131. 'projection': (proj_x, proj_y),
  132. 'curvHor': curvature_value,
  133. 'curvHorDot': curvature_change_value,
  134. 'coefficients': coefficients,
  135. 'laneOffset': min_distance
  136. })
  137. return results
  138. if __name__ == "__main__":
  139. data_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/LaneMap.csv"
  140. point_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/EgoMap.csv"
  141. points_data = pd.read_csv(point_path)
  142. points = points_data[['posX', 'posY']].values
  143. window_size = 4
  144. fitting_instance = PolynomialCurvatureFitting(data_path)
  145. projections = fitting_instance.fit_and_project(points, window_size)
  146. fitting_instance.plot_results(points, projections)