123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- import json
- from typing import List, Dict, Tuple
- import numpy as np
- import pandas as pd
-
def dict2json(data_dict: Dict, file_path: str) -> None:
    """Serialize a dictionary to JSON and save it to a file.

    The output is UTF-8 encoded, indented by four spaces, and keeps non-ASCII
    characters as-is (``ensure_ascii=False``).

    This is a best-effort helper: any failure is reported on stdout and the
    function returns normally instead of raising.

    Args:
        data_dict: The dictionary to serialize.
        file_path: Destination path of the JSON file.
    """
    try:
        # Serialize *before* opening the destination. Opening with "w"
        # truncates the file immediately, so serializing afterwards would
        # destroy an existing file (and leave a partial JSON fragment behind)
        # whenever data_dict contains a value json cannot encode.
        payload = json.dumps(data_dict, ensure_ascii=False, indent=4)
        with open(file_path, "w", encoding="utf-8") as json_file:
            json_file.write(payload)
        print(f"JSON文件已保存到 {file_path}")
    except Exception as e:  # deliberate catch-all: callers expect no exception
        print(f"保存JSON文件时出错: {e}")
def get_interpolation(x: float, point1: Tuple[float, float], point2: Tuple[float, float]) -> float:
    """Evaluate the straight line through two points at ``x``.

    The line ``y = slope * x + intercept`` is derived from ``point1`` and
    ``point2`` and then evaluated at ``x``.

    Args:
        x: Value of the independent variable.
        point1: ``(x, y)`` coordinates of the first point.
        point2: ``(x, y)`` coordinates of the second point.

    Returns:
        The value of the line at ``x``. If anything raises during the
        computation (for example a division by zero when both points share
        the same x coordinate), the string ``"Error: <message>"`` is returned
        instead of a number.
    """
    try:
        y1, y2 = point1[1], point2[1]
        x1, x2 = point1[0], point2[0]
        run = x1 - x2
        slope = (y1 - y2) / run
        intercept = (x1 * y2 - y1 * x2) / run
        return x * slope + intercept
    except Exception as exc:
        return "Error: " + str(exc)
def get_frame_with_time(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Look up the simulation frame for each interval's start and end time.

    Each ``start_time`` and ``end_time`` in ``df1`` is matched against
    ``simTime`` in ``df2`` and the corresponding ``simFrame`` is attached.

    Args:
        df1: DataFrame with ``start_time`` and ``end_time`` columns.
        df2: DataFrame with ``simTime`` and ``simFrame`` columns.

    Returns:
        A DataFrame with the columns ``start_time``, ``start_frame``,
        ``end_time`` and ``end_frame``. A time with no matching ``simTime``
        gets a missing (NaN) frame value.

    Note:
        Duplicate ``simTime`` values in ``df2`` produce one output row per
        match, as with any pandas merge.
    """
    frame_lookup = df2[["simTime", "simFrame"]]

    # how="left" keeps one row per df1 row, in df1's order, for BOTH lookups.
    # With the default inner join an unmatched start or end time silently
    # drops a row from only one half, and the positional concat below would
    # then pair start and end frames that belong to different intervals.
    start_part = df1.merge(
        frame_lookup, how="left", left_on="start_time", right_on="simTime"
    )
    start_part = start_part[["start_time", "simFrame"]].rename(
        columns={"simFrame": "start_frame"}
    )

    end_part = df1.merge(
        frame_lookup, how="left", left_on="end_time", right_on="simTime"
    )
    end_part = end_part[["end_time", "simFrame"]].rename(
        columns={"simFrame": "end_frame"}
    )

    return pd.concat([start_part, end_part], axis=1)
class PolynomialCurvatureFitting:
    """Fit local polynomials to a centre line and project points onto them.

    The centre line is read from a CSV file that must contain the columns
    ``centerLine_x`` and ``centerLine_y``. For each query point, the window of
    consecutive centre-line samples whose centroid is closest to the point is
    fitted with a polynomial ``y = p(x)``; the point is then projected onto
    that polynomial by a brute-force search over x.
    """

    def __init__(self, data_path: str, degree: int = 3):
        """Load the centre-line samples.

        Args:
            data_path: Path of the CSV file holding the centre line.
            degree: Degree of the polynomial used for every local fit.
        """
        self.data_path = data_path
        self.degree = degree
        self.data = pd.read_csv(self.data_path)
        self.points = self.data[['centerLine_x', 'centerLine_y']].values
        self.x_data, self.y_data = self.points[:, 0], self.points[:, 1]

    def curvature(self, coefficients: np.ndarray, x: float) -> float:
        """Return the curvature of a polynomial at ``x``.

        Uses ``|y''| / (1 + y'^2) ** (3/2)``.

        Args:
            coefficients: Polynomial coefficients, highest power first.
            x: Position at which the curvature is evaluated.

        Returns:
            The (non-negative) curvature value.
        """
        first_derivative = np.polyder(coefficients)
        second_derivative = np.polyder(first_derivative)
        slope = np.polyval(first_derivative, x)
        return np.abs(np.polyval(second_derivative, x)) / (1 + slope ** 2) ** (3 / 2)

    def polynomial_fit(self, x_window: np.ndarray, y_window: np.ndarray) -> Tuple[np.ndarray, np.poly1d]:
        """Least-squares fit a polynomial of degree ``self.degree``.

        Args:
            x_window: x samples of the window.
            y_window: y samples of the window.

        Returns:
            The coefficient array and the matching ``np.poly1d`` object.
        """
        coefficients = np.polyfit(x_window, y_window, self.degree)
        return coefficients, np.poly1d(coefficients)

    def _window_centroids(self, window_size: int) -> np.ndarray:
        """Return the (mean x, mean y) of every sliding window, one row each."""
        window_count = len(self.x_data) - window_size + 1
        if window_size < 1 or window_count < 1:
            raise ValueError(
                f"window_size must be between 1 and {len(self.x_data)}, "
                f"got {window_size}"
            )
        return np.array([
            (np.mean(self.x_data[start:start + window_size]),
             np.mean(self.y_data[start:start + window_size]))
            for start in range(window_count)
        ])

    @staticmethod
    def _nearest_centroid(point: Tuple[float, float], centroids: np.ndarray) -> int:
        """Return the row index of the centroid closest to ``point``."""
        x1, y1 = point
        distances = np.sqrt((x1 - centroids[:, 0]) ** 2 + (y1 - centroids[:, 1]) ** 2)
        # Plain int so the result matches the annotation and is JSON friendly.
        return int(np.argmin(distances))

    def find_best_window(self, point: Tuple[float, float], window_size: int) -> int:
        """Find the window whose centroid is closest to ``point``.

        Args:
            point: ``(x, y)`` coordinates of the query point.
            window_size: Number of consecutive samples per window.

        Returns:
            Start index of the best window.

        Raises:
            ValueError: If ``window_size`` is smaller than 1 or larger than
                the number of centre-line samples.
        """
        return self._nearest_centroid(point, self._window_centroids(window_size))

    def find_projection(self, x_point: float, y_point: float, polynomial: np.poly1d, x_data_range: Tuple[float, float], search_step: float = 0.0001) -> Tuple[float, float, float]:
        """Project a point onto a polynomial curve by sampling x.

        The polynomial is evaluated on a regular grid covering
        ``x_data_range`` (both ends included) and the sample closest to the
        point is returned.

        Args:
            x_point: x coordinate of the query point.
            y_point: y coordinate of the query point.
            polynomial: Polynomial describing the curve ``y = p(x)``.
            x_data_range: Lower and upper bound of the x search interval.
            search_step: Spacing of the sampling grid; must be positive.

        Returns:
            ``(x, y, distance)`` of the closest sampled curve point.

        Raises:
            ValueError: If ``search_step`` is not positive.
        """
        if search_step <= 0:
            raise ValueError(f"search_step must be positive, got {search_step}")
        lower, upper = min(x_data_range), max(x_data_range)
        # np.arange leaves out the upper bound and is empty when the interval
        # has zero width; appending the bound makes the end of the window a
        # candidate and guarantees at least one sample for argmin.
        x_values = np.append(np.arange(lower, upper, search_step), upper)
        y_values = polynomial(x_values)
        distances = np.sqrt((x_point - x_values) ** 2 + (y_point - y_values) ** 2)
        min_idx = np.argmin(distances)
        return x_values[min_idx], y_values[min_idx], distances[min_idx]

    def fit_and_project(self, points: List[Tuple[float, float]], window_size: int) -> List[Dict]:
        """Fit a local polynomial for every point and project the point onto it.

        Args:
            points: Query points as ``(x, y)`` pairs.
            window_size: Number of consecutive centre-line samples per fit.

        Returns:
            One dictionary per point with the keys ``projection`` (x, y of the
            projected point), ``curvHor`` (curvature at the projection),
            ``curvHorDot``, ``coefficients`` (fitted polynomial coefficients)
            and ``laneOffset`` (distance from the point to its projection).

        Raises:
            ValueError: If ``window_size`` is smaller than 1 or larger than
                the number of centre-line samples.
        """
        # The window centroids depend only on window_size, so compute them
        # once here instead of once per query point.
        centroids = self._window_centroids(window_size)

        results = []
        for point in points:
            best_start = self._nearest_centroid(point, centroids)
            x_window = self.x_data[best_start:best_start + window_size]
            y_window = self.y_data[best_start:best_start + window_size]
            coefficients, polynomial = self.polynomial_fit(x_window, y_window)
            proj_x, proj_y, min_distance = self.find_projection(
                point[0], point[1], polynomial, (min(x_window), max(x_window))
            )
            curvature_value = self.curvature(coefficients, proj_x)
            # NOTE(review): this is the polynomial's second derivative y''(x)
            # at the projection, not d(curvature)/dx - confirm that this is
            # what consumers of 'curvHorDot' expect.
            second_derivative_coefficients = np.polyder(np.polyder(coefficients))
            curvature_change_value = np.polyval(second_derivative_coefficients, proj_x)
            results.append({
                'projection': (proj_x, proj_y),
                'curvHor': curvature_value,
                'curvHorDot': curvature_change_value,
                'coefficients': coefficients,
                'laneOffset': min_distance,
            })
        return results
-
if __name__ == "__main__":
    # Example run: project every ego position onto the lane centre line.
    data_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/LaneMap.csv"
    point_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/EgoMap.csv"
    points_data = pd.read_csv(point_path)
    points = points_data[['posX', 'posY']].values
    window_size = 4
    fitting_instance = PolynomialCurvatureFitting(data_path)
    projections = fitting_instance.fit_and_project(points, window_size)
    # PolynomialCurvatureFitting as defined in this file has no plot_results
    # method, so an unconditional call raises AttributeError after all the
    # work is done. Plot only when the method is actually available.
    plot_results = getattr(fitting_instance, "plot_results", None)
    if callable(plot_results):
        plot_results(points, projections)
|