|
@@ -1629,84 +1629,24 @@ class FinalDataProcessor:
|
|
|
|
|
|
# --- Rosbag Processing ---
|
|
|
class RosbagProcessor:
|
|
|
- """Extracts data from Rosbag files within a ZIP archive."""
|
|
|
-
|
|
|
- # Mapping from filename parts to rostopics
|
|
|
- ROSTOPIC_MAP = {
|
|
|
- ('V2I', 'HazardousLocationW'): "/HazardousLocationWarning",
|
|
|
- ('V2C', 'OtherVehicleRedLightViolationW'): "/c2v/GoThroughRadLight",
|
|
|
- ('V2I', 'LeftTurnAssist'): "/LeftTurnAssistant",
|
|
|
- ('V2V', 'LeftTurnAssist'): "/V2VLeftTurnAssistant",
|
|
|
- ('V2I', 'RedLightViolationW'): "/SignalViolationWarning",
|
|
|
- ('V2C', 'AbnormalVehicleW'): "/c2v/AbnormalVehicleWarnning",
|
|
|
- ('V2C', 'SignalLightReminder'): "/c2v/TrafficLightInfo",
|
|
|
- ('V2C', 'VulnerableRoadUserCollisionW'): "/c2v/VulnerableObject",
|
|
|
- ('V2C', 'EmergencyVehiclesPriority'): "/c2v/EmergencyVehiclesPriority",
|
|
|
- ('V2C', 'LitterW'): "/c2v/RoadSpillageWarning",
|
|
|
- ('V2V', 'ForwardCollision'): "/V2VForwardCollisionWarning",
|
|
|
- ('V2C', 'VisibilityW'): "/c2v/VisibilityWarinning",
|
|
|
- ('V2V', 'EmergencyBrakeW'): "/V2VEmergencyBrakeWarning",
|
|
|
- ('V2I', 'GreenLightOptimalSpeedAdvisory'): "/GreenLightOptimalSpeedAdvisory", # Check exact topic name
|
|
|
- ('V2C', 'DynamicSpeedLimitingInformation'): "/c2v/DynamicSpeedLimit",
|
|
|
- ('V2C', 'TrafficJamW'): "/c2v/TrafficJam",
|
|
|
- ('V2C', 'DrivingLaneRecommendation'): "/c2v/LaneGuidance",
|
|
|
- ('V2C', 'RampMerge'): "/c2v/RampMerging",
|
|
|
- ('V2I', 'CooperativeIntersectionPassing'): "/CooperativeIntersectionPassing",
|
|
|
- ('V2I', 'IntersectionCollisionW'): "/IntersectionCollisionWarning",
|
|
|
- ('V2V', 'IntersectionCollisionW'): "/V2VIntersectionCollisionWarning",
|
|
|
- ('V2V', 'BlindSpotW'): "/V2VBlindSpotWarning",
|
|
|
- ('V2I', 'SpeedLimitW'): "/SpeedLimit",
|
|
|
- ('V2I', 'VulnerableRoadUserCollisionW'): "/VulnerableRoadUserCollisionWarning",
|
|
|
- ('V2I', 'CooperativeLaneChange'): "/CooperativeLaneChange",
|
|
|
- ('V2V', 'CooperativeLaneChange'): "/V2VCooperativeLaneChange",
|
|
|
- ('V2I', 'CooperativeVehicleMerge'): "/CooperativeVehicleMerge",
|
|
|
- ('V2V', 'AbnormalVehicleW'): "/V2VAbnormalVehicleWarning",
|
|
|
- ('V2V', 'ControlLossW'): "/V2VVehicleLossControlWarning",
|
|
|
- ('V2V', 'EmergencyVehicleW'): '/V2VEmergencyVehicleWarning',
|
|
|
- ('V2I', 'InVehicleSignage'): "/InVehicleSign",
|
|
|
- ('V2V', 'DoNotPassW'): "/V2VDoNotPassWarning",
|
|
|
- ('V2I', 'TrafficJamW'): "/TrafficJamWarning",
|
|
|
- # Add more mappings as needed
|
|
|
- }
|
|
|
+ """Extracts data from HMIdata files within a ZIP archive."""
|
|
|
|
|
|
def __init__(self, config: Config):
|
|
|
self.config = config
|
|
|
self.output_dir = config.output_dir
|
|
|
|
|
|
- def _get_target_rostopic(self, zip_filename: str) -> Optional[str]:
|
|
|
- """Determines the target rostopic based on keywords in the filename."""
|
|
|
- for (kw1, kw2), topic in self.ROSTOPIC_MAP.items():
|
|
|
- if kw1 in zip_filename and kw2 in zip_filename:
|
|
|
- print(f"Identified target topic '{topic}' for {zip_filename}")
|
|
|
- return topic
|
|
|
- print(f"Warning: No specific rostopic mapping found for {zip_filename}.")
|
|
|
- return None
|
|
|
-
|
|
|
def process_zip_for_rosbags(self) -> None:
|
|
|
"""Finds, extracts, and processes rosbags from the ZIP file."""
|
|
|
- print(f"--- Processing Rosbags in {self.config.zip_path} ---")
|
|
|
- target_rostopic = self._get_target_rostopic(self.config.zip_path.stem)
|
|
|
- if not target_rostopic:
|
|
|
- print("Skipping Rosbag processing as no target topic was identified.")
|
|
|
+ print(f"--- Processing HMIdata in {self.config.zip_path} ---")
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir_str:
|
|
|
- tmp_dir = Path(tmp_dir_str)
|
|
|
- bag_files_extracted = []
|
|
|
try:
|
|
|
with zipfile.ZipFile(self.config.zip_path, 'r') as zip_ref:
|
|
|
for member in zip_ref.infolist():
|
|
|
- # Extract Rosbag files
|
|
|
- if 'Rosbag/' in member.filename and member.filename.endswith('.bag'):
|
|
|
- try:
|
|
|
- extracted_path = Path(zip_ref.extract(member, path=tmp_dir))
|
|
|
- bag_files_extracted.append(extracted_path)
|
|
|
- print(f"Extracted Rosbag: {extracted_path.name}")
|
|
|
- except Exception as e:
|
|
|
- print(f"Error extracting Rosbag {member.filename}: {e}")
|
|
|
|
|
|
# Extract HMIdata CSV files directly to output
|
|
|
- elif 'HMIdata/' in member.filename and member.filename.endswith('.csv'):
|
|
|
+ if 'HMIdata/' in member.filename and member.filename.endswith('.csv'):
|
|
|
try:
|
|
|
target_path = self.output_dir / Path(member.filename).name
|
|
|
with zip_ref.open(member) as source, open(target_path, "wb") as target:
|
|
@@ -1722,101 +1662,7 @@ class RosbagProcessor:
|
|
|
print(f"Error: ZIP file not found: {self.config.zip_path}")
|
|
|
return
|
|
|
|
|
|
- if not bag_files_extracted:
|
|
|
- print("No Rosbag files found in the archive.")
|
|
|
- # Attempt extraction of HMI/RDB anyway if needed (already done above)
|
|
|
- return
|
|
|
-
|
|
|
- # Process extracted bag files
|
|
|
- for bag_path in bag_files_extracted:
|
|
|
- print(f"Processing bag file: {bag_path.name}")
|
|
|
- self._convert_bag_topic_to_csv(bag_path, target_rostopic)
|
|
|
-
|
|
|
- print("--- Rosbag Processing Finished ---")
|
|
|
-
|
|
|
- def _convert_bag_topic_to_csv(self, bag_file_path: Path, target_topic: str) -> None:
|
|
|
- """Converts a specific topic from a single bag file to CSV."""
|
|
|
- output_csv_path = self.output_dir / OUTPUT_CSV_OBU # Standard name for OBU data
|
|
|
-
|
|
|
- try:
|
|
|
- # Check if bagpy can handle Path object, else convert to str
|
|
|
- bag_reader = bagreader(str(bag_file_path), verbose=False)
|
|
|
-
|
|
|
- # Check if topic exists
|
|
|
- available_topics = bag_reader.topic_table['Topics'].tolist() if hasattr(bag_reader,
|
|
|
- 'topic_table') and bag_reader.topic_table is not None else []
|
|
|
- if target_topic not in available_topics:
|
|
|
- print(f"Target topic '{target_topic}' not found in {bag_file_path.name}. Available: {available_topics}")
|
|
|
- # Clean up temporary bagpy-generated files if possible
|
|
|
- df = pd.DataFrame(columns=['simTime', 'event_Type'])
|
|
|
- if hasattr(bag_reader, 'data_folder') and Path(bag_reader.data_folder).exists():
|
|
|
- shutil.rmtree(bag_reader.data_folder, ignore_errors=True)
|
|
|
- else:
|
|
|
-
|
|
|
- # Extract message data to a temporary CSV created by bagpy
|
|
|
- temp_csv_path_str = bag_reader.message_by_topic(target_topic)
|
|
|
- temp_csv_path = Path(temp_csv_path_str)
|
|
|
-
|
|
|
- if not temp_csv_path.exists() or temp_csv_path.stat().st_size == 0:
|
|
|
- print(
|
|
|
- f"Warning: Bagpy generated an empty or non-existent CSV for topic '{target_topic}' from {bag_file_path.name}.")
|
|
|
- return # Skip if empty
|
|
|
-
|
|
|
- # Read the temporary CSV, process, and save to final location
|
|
|
- df = pd.read_csv(temp_csv_path)
|
|
|
-
|
|
|
- if df.empty:
|
|
|
- print(f"Warning: Bagpy CSV for topic '{target_topic}' is empty after reading.")
|
|
|
- return
|
|
|
-
|
|
|
- # Clean columns: Drop 'Time', rename '*timestamp' -> 'simTime'
|
|
|
- if 'Time' in df.columns:
|
|
|
- df.drop(columns=['Time'], inplace=True)
|
|
|
-
|
|
|
- rename_dict = {}
|
|
|
- for col in df.columns:
|
|
|
- if col.endswith('.timestamp'): # More specific match
|
|
|
- rename_dict[col] = 'simTime'
|
|
|
- elif col.endswith('event_type'): # As per original code
|
|
|
- rename_dict[col] = 'event_Type'
|
|
|
- # Add other renames if necessary
|
|
|
-
|
|
|
- df.rename(columns=rename_dict, inplace=True)
|
|
|
-
|
|
|
- # Ensure simTime is float and rounded (optional, do if needed for merging)
|
|
|
- if 'simTime' in df.columns:
|
|
|
- df['simTime'] = pd.to_numeric(df['simTime'], errors='coerce').round(2) # Example rounding
|
|
|
-
|
|
|
- # Save processed data
|
|
|
- df.to_csv(output_csv_path, index=False, float_format='%.6f')
|
|
|
- print(f"Saved processed OBU data to: {output_csv_path}")
|
|
|
-
|
|
|
- except ValueError as ve:
|
|
|
- # Catch potential Bagpy internal errors if topic doesn't contain messages
|
|
|
- print(
|
|
|
- f"ValueError processing bag {bag_file_path.name} (Topic: {target_topic}): {ve}. Topic might be empty.")
|
|
|
- except ImportError as ie:
|
|
|
- print(
|
|
|
- f"ImportError during bag processing: {ie}. Ensure all ROS dependencies are installed if needed by bagpy.")
|
|
|
- except Exception as e:
|
|
|
- print(f"Error processing bag file {bag_file_path.name} (Topic: {target_topic}): {e}")
|
|
|
- import traceback
|
|
|
- traceback.print_exc() # More details on unexpected errors
|
|
|
- finally:
|
|
|
- # Clean up temporary files/folders created by bagpy
|
|
|
- if 'temp_csv_path' in locals() and temp_csv_path.exists():
|
|
|
- try:
|
|
|
- temp_csv_path.unlink() # Delete the specific CSV
|
|
|
- except OSError as ose:
|
|
|
- print(f"Warning: Could not delete bagpy temp csv {temp_csv_path}: {ose}")
|
|
|
-
|
|
|
- if 'bag_reader' in locals() and hasattr(bag_reader, 'data_folder'):
|
|
|
- bagpy_folder = Path(bag_reader.data_folder)
|
|
|
- if bagpy_folder.exists() and bagpy_folder.is_dir():
|
|
|
- try:
|
|
|
- shutil.rmtree(bagpy_folder, ignore_errors=True) # Delete the folder bagpy made
|
|
|
- except OSError as ose:
|
|
|
- print(f"Warning: Could not delete bagpy temp folder {bagpy_folder}: {ose}")
|
|
|
+ print("--- HMIdata Processing Finished ---")
|
|
|
|
|
|
|
|
|
# --- Utility Functions ---
|