Modify the data preprocessing logic for the LST dual-vehicle scenario

XGJ_zhaoyuan 1 month ago
parent commit 3e3c18b48d
3 changed files with 307 additions and 283 deletions
  1. +28 -30    core/optimized_processor.py
  2. +243 -218  core/processors/built_in/lst.py
  3. +36 -35    run.py

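For context, the pipeline entry point whose signature is re-indented in the first diff below is invoked from run.py roughly as follows. This is a minimal sketch, assuming the module path core.optimized_processor (inferred from the file list above); both paths are placeholders.

    from pathlib import Path

    from core.optimized_processor import process_lst_data  # module path assumed from the file list

    # Hypothetical invocation mirroring run.py's defaults; paths are placeholders.
    final_csv = process_lst_data(
        zip_data_path=Path("V2I_CSAE53-2020_HazardousLocationW_LST_02-01.zip"),
        output_base_dir=Path("output/V2I_CSAE53-2020_HazardousLocationW_LST_02-01"),
        trafficlight_json_path=None,  # optional; the function warns and continues when absent
        utm_zone=51,
        x_offset=0.0,
        y_offset=0.0,
    )
    if final_csv is None:
        raise SystemExit("LST built-in data processing failed")
    print(f"Merged object states written to: {final_csv}")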
+ 28 - 30
core/optimized_processor.py

@@ -7,15 +7,14 @@ from .processors.built_in.lst import ZipCSVProcessor, RosbagProcessor, Config
 from core.processors.built_in.lst import data_precheck, run_cpp_engine, FinalDataProcessor
 
 
-
 def process_lst_data(
-    zip_data_path: Path,
-    output_base_dir: Path,
-    trafficlight_json_path: Optional[Path] = None,
-    utm_zone: int = 51,
-    x_offset: float = 0.0,
-    y_offset: float = 0.0,
-    continue_to_iterate: bool = False,
+        zip_data_path: Path,
+        output_base_dir: Path,
+        trafficlight_json_path: Optional[Path] = None,
+        utm_zone: int = 51,
+        x_offset: float = 0.0,
+        y_offset: float = 0.0,
+        continue_to_iterate: bool = False,
 ) -> Optional[Path]:
     """
     Processes LST data using an optimized pipeline.
@@ -33,12 +32,12 @@ def process_lst_data(
         Path to the final merged_ObjState.csv file if successful, None otherwise
     """
     print(f"Starting LST data processing for: {zip_data_path.name}")
-    
+
     # Validate input paths
     if not zip_data_path.exists():
         print(f"Error: Input ZIP file not found: {zip_data_path}")
         return None
-        
+
     if not trafficlight_json_path:
         print(f"Warning: Traffic light JSON file not found: {trafficlight_json_path}")
         trafficlight_json_path = None
@@ -77,13 +76,12 @@ def process_lst_data(
         # Final processing of built-in data
         print("Processing and merging built-in data...")
         final_processor = FinalDataProcessor(config)
-        
+
         if not final_processor.process():
             raise RuntimeError("Final data processing failed")
 
         final_csv_path = config.output_dir / "merged_ObjState.csv"
 
-        
         return final_csv_path
 
     except Exception as e:
@@ -93,45 +91,45 @@ def process_lst_data(
 
 
 def process_pgvil_data(
-    zip_data_path: Path,
-    output_base_dir: Path,
-    utm_zone: int = 51,
-    x_offset: float = 0.0,
-    y_offset: float = 0.0
+        zip_data_path: Path,
+        output_base_dir: Path,
+        utm_zone: int = 51,
+        x_offset: float = 0.0,
+        y_offset: float = 0.0
 ) -> Optional[Path]:
     """处理PGVIL数据
-    
+
     Args:
         zip_data_path: ZIP数据文件路径
         output_base_dir: 输出基础目录
         utm_zone: UTM坐标系区域
         x_offset: X坐标偏移量
         y_offset: Y坐标偏移量
-        
+
     Returns:
         Optional[Path]: 处理后的CSV文件路径,处理失败则返回None
     """
     try:
         # Ensure the output directory exists
         output_base_dir.mkdir(parents=True, exist_ok=True)
-        
+
         # Extract the ZIP file
         if not extract_zip_file(zip_data_path, output_base_dir):
             return None
-            
+
         # Find all PGVIL data files
         pgvil_files = []
         for root, _, files in os.walk(output_base_dir):
             for file in files:
                 if file.lower().endswith(('.csv', '.json')):
                     pgvil_files.append(Path(root) / file)
-                    
+
         if not pgvil_files:
             print(f"在 {output_base_dir} 中未找到PGVIL数据文件")
             return None
-            
+
         print(f"找到 {len(pgvil_files)} 个PGVIL数据文件")
-        
+
         # Process all PGVIL files
         all_data = []
         for pgvil_file in pgvil_files:
@@ -143,31 +141,31 @@ def process_pgvil_data(
                     with open(pgvil_file, 'r') as f:
                         data = json.load(f)
                     df = pd.DataFrame(data)
-                    
+
                 # Ensure required columns exist
                 required_cols = ['simTime', 'simFrame', 'playerId']
                 for col in required_cols:
                     if col not in df.columns:
                         df[col] = 0  # add a default value
-                        
+
                 all_data.append(df)
                 print(f"成功处理文件: {pgvil_file}")
             except Exception as e:
                 print(f"处理文件 {pgvil_file} 时出错: {e}")
-        
+
         if not all_data:
             print("没有成功处理任何PGVIL文件")
             return None
-            
+
         # Merge all data
         combined_df = pd.concat(all_data, ignore_index=True)
-        
+
         # Save the processed data
         output_path = output_base_dir / "processed_pgvil_data.csv"
         combined_df.to_csv(output_path, index=False)
         print(f"Successfully processed all PGVIL data; results saved to: {output_path}")
         return output_path
-        
+
     except Exception as e:
         print(f"处理PGVIL数据时出错: {e}")
         import traceback

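The load-and-merge step of process_pgvil_data shown above can be summarized in isolation. A minimal sketch, assuming pandas is available; the search directory and output file name are placeholders standing in for the unzipped output_base_dir.

    import json
    from pathlib import Path
    from typing import Optional

    import pandas as pd

    REQUIRED_COLS = ["simTime", "simFrame", "playerId"]  # columns the merge step expects

    def load_pgvil_frame(path: Path) -> Optional[pd.DataFrame]:
        """Load one PGVIL file (CSV or JSON) and backfill required columns with 0."""
        try:
            if path.suffix.lower() == ".csv":
                df = pd.read_csv(path)
            else:
                with open(path, "r") as f:
                    df = pd.DataFrame(json.load(f))
            for col in REQUIRED_COLS:
                if col not in df.columns:
                    df[col] = 0  # same default the pipeline applies
            return df
        except Exception as e:
            print(f"Error processing file {path}: {e}")
            return None

    # Placeholder directory; the real code walks the unzipped output_base_dir.
    files = [p for p in Path("output").rglob("*") if p.suffix.lower() in (".csv", ".json")]
    frames = [df for df in map(load_pgvil_frame, files) if df is not None]
    if frames:
        pd.concat(frames, ignore_index=True).to_csv("processed_pgvil_data.csv", index=False)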
File diff suppressed because it is too large
+ 243 - 218
core/processors/built_in/lst.py


+ 36 - 35
run.py

@@ -30,12 +30,13 @@ def parse_arguments():
         default='lst',
         help='Data type to process (lst or pgvil)'
     )
-    
+
     # Define arguments
     parser.add_argument(
         '--zip-path',
         type=Path,
-        default=Path('V2I_CSAE53-2020_HazardousLocationW_LST_02-01.zip'),
+        # default=Path('/home/server/桌面/XGJ/dataprocess/V2V_CSAE53-2020_ForwardCollision_LST_02-01.zip'),
+        default=Path('/home/server/桌面/XGJ/dataprocess/V2I_CSAE53-2020_HazardousLocationW_LST_02-01.zip'),
         help='Path to the input ZIP data file'
     )
 
@@ -49,7 +50,7 @@ def parse_arguments():
     parser.add_argument(
         '--output-dir',
         type=Path,
-        default=Path('output'),
+        default=Path('./data_zhaoyuan3/'),
         help='Base path for the output directory'
     )
 
@@ -66,14 +67,14 @@ def parse_arguments():
         default=0.0,
         help='X coordinate offset'
     )
-                       
+
     parser.add_argument(
         '--y-offset',
         type=float,
         default=0.0,
         help='Y coordinate offset'
     )
-                       
+
     parser.add_argument(
         '--config',
         type=Path,
@@ -87,7 +88,7 @@ def parse_arguments():
         default=Path('plugins'),
         help='Path to the plugins directory'
     )
-                       
+
     parser.add_argument(
         '--resources-dir',
         type=Path,
@@ -100,27 +101,27 @@ def parse_arguments():
         action='store_true',
         help='Enable parallel processing'
     )
-                       
+
     parser.add_argument(
         '--no-parallel',
         action='store_true',
         help='Disable parallel processing'
     )
-                       
+
     parser.add_argument(
         '--max-workers',
         type=int,
         default=None,
         help='Maximum number of worker threads for parallel processing'
     )
-                       
+
     parser.add_argument(
         '--batch-size',
         type=int,
         default=10000,
         help='Batch size when processing large datasets'
     )
-                       
+
     parser.add_argument(
         '--log-level',
         type=str,
@@ -128,14 +129,14 @@ def parse_arguments():
         default='INFO',
         help='Log level'
     )
-                       
+
     parser.add_argument(
         '--log-dir',
         type=Path,
         default=Path('logs'),
         help='Directory for log files'
     )
-                       
+
     parser.add_argument(
         '--no-log-file',
         action='store_true',
@@ -151,17 +152,17 @@ def setup_config(args):
     zip_name = args.zip_path.stem
     output_dir = args.output_dir / zip_name
     output_dir.mkdir(parents=True, exist_ok=True)
-    
+
     print(f"输出目录: {output_dir}")
 
     # Load configuration
     config = load_config(args.config)
-    
+
     # Update the output directory in the config to a subdirectory named after the zip
     config['paths']['output_dir'] = str(output_dir)
     config['paths']['data_dir'] = str(output_dir)  # ensure data is also written to the same directory
     config['paths']['temp_dir'] = str(output_dir)  # ensure temp files live in the same directory
-    
+
     # Override config-file settings with command-line arguments
     if args.use_parallel and args.no_parallel:
         print("Warning: both --use-parallel and --no-parallel were specified; --use-parallel will be used")
@@ -170,30 +171,30 @@ def setup_config(args):
         config['processing']['use_parallel'] = True
     elif args.no_parallel:
         config['processing']['use_parallel'] = False
-        
+
     if args.max_workers is not None:
         config['processing']['max_workers'] = args.max_workers
-        
+
     if args.batch_size != 10000:  # differs from the default
         config['processing']['batch_size'] = args.batch_size
-        
+
     # Update logging configuration
     config['logging']['level'] = args.log_level
     config['logging']['log_dir'] = str(args.log_dir)
     config['logging']['log_to_file'] = not args.no_log_file
-    
+
     # Update coordinate system configuration
     config['coordinates']['utm_zone'] = args.utm_zone
     config['coordinates']['x_offset'] = args.x_offset
     config['coordinates']['y_offset'] = args.y_offset
-    
+
     # Update path configuration
     config['paths']['plugins_dir'] = str(args.plugins_dir)
     config['paths']['resources_dir'] = str(args.resources_dir)
-    
+
     # Apply the updated configuration
     update_config(config)
-    
+
     return output_dir
 
 
@@ -206,7 +207,7 @@ def process_plugins(args, output_dir, final_csv_path):
     # Process custom data
     print("Processing and merging custom data...")
     folders = resource_manager.list_zip_folders(args.zip_path)
-    
+
     for folder in folders:
         plugin = plugin_manager.get_plugin_for_data(args.zip_path, folder)
         if not plugin:
@@ -220,22 +221,22 @@ def process_plugins(args, output_dir, final_csv_path):
             folder,
             output_dir
         )
-        
+
         if plugin_output is not None and not plugin_output.empty:
             output_file = output_dir / f"{folder}_processed.csv"
             print(f'Plugin output file: {output_file}')
             plugin_output.to_csv(output_file, index=False)
-            
+
             if not resource_manager.validate_plugin_output(output_file):
                 print(f"警告: 插件输出验证失败: {folder}")
                 continue
-                
+
             # Merge custom data with the main data file
             print(f"Merging {folder} data...")
             if resource_manager.merge_plugin_data(
-                final_csv_path,
-                output_file,
-                final_csv_path
+                    final_csv_path,
+                    output_file,
+                    final_csv_path
             ):
                 print(f"成功合并 {folder} 数据")
             else:
@@ -251,16 +252,16 @@ def main():
     try:
         # Set up configuration
         output_dir = setup_config(args)
-        
+
         print("开始数据处理流程")
         print(f"从以下位置加载配置: {args.config}")
-        
+
         # Choose the processing pipeline based on data type
         if args.data_type == 'lst':
             final_csv_path = process_lst_data(
                 zip_data_path=args.zip_path,
                 output_base_dir=output_dir,
-                trafficlight_json_path=args.trafficlight_json, 
+                trafficlight_json_path=args.trafficlight_json,
                 utm_zone=args.utm_zone,
                 x_offset=args.x_offset,
                 y_offset=args.y_offset
@@ -276,13 +277,13 @@ def main():
         else:
             print(f"不支持的数据类型: {args.data_type}")
             sys.exit(1)
-            
+
         if not final_csv_path:
             print(f"{args.data_type}内置数据处理失败")
             sys.exit(1)
-            
+
         print(f"\n{args.data_type}内置处理流程成功完成!")
-        
+
         # Process plugin data
         process_plugins(args, output_dir, final_csv_path)
 

Some files were not shown because too many files changed in this diff