import json
from typing import Dict, List, Tuple, Union

import numpy as np
import pandas as pd


def dict2json(data_dict: Dict, file_path: str) -> None:
    """
    Serialize a dictionary to JSON and write it to a file.

    The save is best-effort: any exception raised while opening the file or
    dumping the data is caught and reported on stdout instead of propagating.

    Args:
        data_dict: Dictionary to serialize.
        file_path: Destination path of the JSON file (written as UTF-8).
    """
    try:
        with open(file_path, "w", encoding="utf-8") as json_file:
            # ensure_ascii=False keeps non-ASCII text readable in the output.
            json.dump(data_dict, json_file, ensure_ascii=False, indent=4)
        print(f"JSON文件已保存到 {file_path}")
    except Exception as e:
        print(f"保存JSON文件时出错: {e}")


def get_interpolation(
    x: float,
    point1: Tuple[float, float],
    point2: Tuple[float, float],
) -> Union[float, str]:
    """
    Evaluate at ``x`` the straight line passing through two points.

    Args:
        x: Value of the independent variable.
        point1: First ``(x, y)`` point on the line.
        point2: Second ``(x, y)`` point on the line.

    Returns:
        The value ``k * x + b`` of the line at ``x``. If the computation raises
        (for example a ``ZeroDivisionError`` when both points share the same
        x coordinate), the string ``"Error: <message>"`` is returned instead
        of raising.
    """
    # NOTE(review): returning an error string from a numeric function is easy
    # to misuse; kept as-is because existing callers may check for it.
    try:
        k = (point1[1] - point2[1]) / (point1[0] - point2[0])
        b = (point1[0] * point2[1] - point1[1] * point2[0]) / (point1[0] - point2[0])
        return x * k + b
    except Exception as e:
        return f"Error: {str(e)}"


def get_frame_with_time(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """
    Look up the simulation frame for each start and end time.

    Args:
        df1: DataFrame with ``start_time`` and ``end_time`` columns.
        df2: DataFrame with ``simTime`` and ``simFrame`` columns.

    Returns:
        DataFrame with columns ``start_time``, ``start_frame``, ``end_time``
        and ``end_frame``, built by concatenating the two lookups column-wise.
    """
    # NOTE(review): both merges are inner joins on exact time equality, so rows
    # without a matching simTime are dropped independently on each side. If the
    # two results differ in length, the column-wise concat pairs rows by index
    # and pads with NaN - confirm this is the intended behaviour.
    time_to_frame = df2[["simTime", "simFrame"]]

    df1_start = df1.merge(time_to_frame, left_on="start_time", right_on="simTime")
    df1_start = df1_start[["start_time", "simFrame"]].rename(columns={"simFrame": "start_frame"})

    df1_end = df1.merge(time_to_frame, left_on="end_time", right_on="simTime")
    df1_end = df1_end[["end_time", "simFrame"]].rename(columns={"simFrame": "end_frame"})

    return pd.concat([df1_start, df1_end], axis=1)


class PolynomialCurvatureFitting:
    """
    Fit local polynomials to a centre line and project points onto them.

    The centre line is read from a CSV file with ``centerLine_x`` and
    ``centerLine_y`` columns. For each query point, a window of consecutive
    centre-line samples is chosen, a polynomial ``y = p(x)`` is fitted to it,
    and the point is projected onto that polynomial by a brute-force search.
    """

    def __init__(self, data_path: str, degree: int = 3):
        """
        Load the centre-line samples.

        Args:
            data_path: Path of the CSV file holding the centre line.
            degree: Degree of the polynomial fitted to each window.
        """
        self.data_path = data_path
        self.degree = degree
        self.data = pd.read_csv(self.data_path)
        self.points = self.data[['centerLine_x', 'centerLine_y']].values
        self.x_data, self.y_data = self.points[:, 0], self.points[:, 1]

    def curvature(self, coefficients: np.ndarray, x: float) -> float:
        """
        Compute the curvature of a polynomial at ``x``.

        Uses ``|y''| / (1 + y'^2)^(3/2)`` for the curve ``y = p(x)``.

        Args:
            coefficients: Polynomial coefficients, highest power first.
            x: Value of the independent variable.

        Returns:
            The (non-negative) curvature at ``x``.
        """
        first_derivative = np.polyder(coefficients)
        second_derivative = np.polyder(first_derivative)
        slope = np.polyval(first_derivative, x)
        return np.abs(np.polyval(second_derivative, x)) / (1 + slope ** 2) ** (3 / 2)

    def polynomial_fit(self, x_window: np.ndarray, y_window: np.ndarray) -> Tuple[np.ndarray, np.poly1d]:
        """
        Fit a polynomial of degree ``self.degree`` to the window samples.

        Args:
            x_window: x values inside the window.
            y_window: y values inside the window.

        Returns:
            The fitted coefficients (highest power first) and the matching
            ``np.poly1d`` object.
        """
        coefficients = np.polyfit(x_window, y_window, self.degree)
        return coefficients, np.poly1d(coefficients)

    def find_best_window(self, point: Tuple[float, float], window_size: int) -> int:
        """
        Find the start index of the window whose centroid is closest to a point.

        Args:
            point: ``(x, y)`` coordinates of the target point.
            window_size: Number of consecutive samples per window.

        Returns:
            Start index of the best window.

        Raises:
            ValueError: If ``window_size`` is smaller than 1 or larger than the
                number of centre-line samples, so no window can be formed.
        """
        n_samples = len(self.x_data)
        if not 1 <= window_size <= n_samples:
            raise ValueError(
                f"window_size must be between 1 and {n_samples}, got {window_size}"
            )

        x1, y1 = point
        window_means = np.array([
            (
                np.mean(self.x_data[start:start + window_size]),
                np.mean(self.y_data[start:start + window_size]),
            )
            for start in range(n_samples - window_size + 1)
        ])
        distances = np.sqrt((x1 - window_means[:, 0]) ** 2 + (y1 - window_means[:, 1]) ** 2)
        # Cast so the result is a plain int usable for slicing and serialization.
        return int(np.argmin(distances))

    def find_projection(
        self,
        x_point: float,
        y_point: float,
        polynomial: np.poly1d,
        x_data_range: Tuple[float, float],
        search_step: float = 0.0001,
    ) -> Tuple[float, float, float]:
        """
        Find the point on the polynomial curve closest to a target point.

        The curve is sampled every ``search_step`` over
        ``[x_data_range[0], x_data_range[1])`` and the closest sample is
        returned.

        Args:
            x_point: x coordinate of the target point.
            y_point: y coordinate of the target point.
            polynomial: Polynomial describing the curve ``y = p(x)``.
            x_data_range: ``(x_min, x_max)`` range to search.
            search_step: Spacing between sampled x values.

        Returns:
            ``(proj_x, proj_y, min_distance)``: the closest sampled curve point
            and its distance to the target point.
        """
        x_values = np.arange(x_data_range[0], x_data_range[1], search_step)
        if x_values.size == 0:
            # Degenerate range (e.g. every x in the window is identical):
            # evaluate the single available x instead of failing in argmin.
            x_values = np.array([x_data_range[0]], dtype=float)

        y_values = polynomial(x_values)
        distances = np.sqrt((x_point - x_values) ** 2 + (y_point - y_values) ** 2)
        min_idx = np.argmin(distances)
        return x_values[min_idx], y_values[min_idx], distances[min_idx]

    def fit_and_project(self, points: List[Tuple[float, float]], window_size: int) -> List[Dict]:
        """
        Fit a local polynomial for each point and project the point onto it.

        Args:
            points: Target points as ``(x, y)`` pairs.
            window_size: Number of centre-line samples used for each fit.

        Returns:
            One dictionary per point with the keys ``projection``, ``curvHor``,
            ``curvHorDot``, ``coefficients`` and ``laneOffset``.
        """
        results = []
        for point in points:
            best_start = self.find_best_window(point, window_size)
            x_window = self.x_data[best_start:best_start + window_size]
            y_window = self.y_data[best_start:best_start + window_size]

            coefficients, polynomial = self.polynomial_fit(x_window, y_window)
            proj_x, proj_y, min_distance = self.find_projection(
                point[0], point[1], polynomial, (min(x_window), max(x_window))
            )

            curvature_value = self.curvature(coefficients, proj_x)
            # NOTE(review): this is the polynomial's second derivative at the
            # projection, not a derivative of the curvature - confirm that this
            # is what 'curvHorDot' is meant to hold.
            second_derivative_coefficients = np.polyder(np.polyder(coefficients))
            curvature_change_value = np.polyval(second_derivative_coefficients, proj_x)

            results.append({
                'projection': (proj_x, proj_y),
                'curvHor': curvature_value,
                'curvHorDot': curvature_change_value,
                'coefficients': coefficients,
                'laneOffset': min_distance
            })
        return results


def main() -> None:
    """Project the ego positions onto the lane centre line."""
    data_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/LaneMap.csv"
    point_path = "/home/kevin/kevin/zhaoyuan/zhaoyuan/data/raw/data/EgoMap.csv"

    points_data = pd.read_csv(point_path)
    points = points_data[['posX', 'posY']].values
    window_size = 4

    fitting_instance = PolynomialCurvatureFitting(data_path)
    projections = fitting_instance.fit_and_project(points, window_size)

    # PolynomialCurvatureFitting defines no plot_results method in this file,
    # so an unconditional call raised AttributeError after all the work was
    # done. Only plot when something (e.g. a subclass) provides the method.
    plot_results = getattr(fitting_instance, "plot_results", None)
    if callable(plot_results):
        plot_results(points, projections)
    else:
        print(f"Computed {len(projections)} projections; plot_results is not available, skipping plot.")


if __name__ == "__main__":
    main()