This document provides a comprehensive overview of ScenarioMax's architecture, design patterns, and implementation details.
- Overview
- Two-Stage Pipeline Architecture
- Core Components
- Data Flow
- Parallel Processing
- Memory Management
- Error Handling
- Extension Points
## Overview

ScenarioMax is built around a two-stage pipeline architecture that provides flexibility, maintainability, and performance for processing autonomous driving datasets. The architecture separates concerns between dataset-specific parsing and target format generation, enabling easy extensibility and reuse.

### Key Design Principles
- Separation of Concerns: Clear boundaries between dataset parsing, format conversion, and processing orchestration
- Extensibility: Plugin-based architecture for datasets and output formats
- Performance: Parallel processing with memory optimization
- Reliability: Comprehensive error handling and validation
- Maintainability: Clean abstractions and consistent patterns
## Two-Stage Pipeline Architecture

```
┌─────────────┐      ┌──────────────────┐      ┌─────────────────┐
│  Raw Data   │─────▶│  Unified Format  │─────▶│  Target Format  │
│             │      │                  │      │                 │
│ • Waymo     │      │ • Standardized   │      │ • TFRecord      │
│ • nuScenes  │      │ • Validated      │      │ • GPUDrive JSON │
│ • nuPlan    │      │ • Enhanced       │      │ • Custom        │
│ • OpenScenes│      │ • Python Native  │      │                 │
└─────────────┘      └──────────────────┘      └─────────────────┘
       │                      │                        │
       ▼                      ▼                        ▼
[Raw-to-Unified]        [Enhancement]         [Unified-to-Target]
   Extractors             (Optional)              Converters
```
### Stage 1: Raw-to-Unified

Purpose: Convert dataset-specific formats to a standardized representation

Components:
- Dataset-specific extractors (`raw_to_unified/datasets/`)
- Data validation and sanitization
- Coordinate system normalization
- Type mapping and standardization

Output: Python dictionaries following the UnifiedScenario schema
### Stage 2: Unified-to-Target

Purpose: Convert unified format to training/simulation-ready formats

Components:
- Target format converters (`unified_to_tfexample/`, `unified_to_gpudrive/`)
- Format-specific optimizations
- Serialization and compression
- Output file management

Output: Format-specific files (TFRecord, JSON, etc.)
### Enhancement Stage (Optional)

Purpose: Apply transformations, filtering, or augmentation to scenarios
Features:
- Scenario filtering based on criteria
- Data augmentation (planned)
- Custom transformations
- Quality validation
## Core Components

### Pipeline Orchestrator

The central coordinator that manages the entire conversion process.
```python
def convert_dataset(
    source_type: str,           # "raw" or "pickle"
    source_paths: dict,         # Dataset paths
    target_format: str,         # Output format
    output_path: str,           # Destination
    enhancement: bool = False,  # Enable enhancement
    stream_mode: bool = False,  # Memory-efficient processing
    num_workers: int = 8,       # Parallel workers
    **kwargs,
) -> dict:
    ...
```

Key Responsibilities:
- Coordinate multi-dataset processing
- Manage processing modes (batch vs. streaming)
- Handle temporary file cleanup
- Aggregate processing statistics
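Statistics aggregation across datasets isn't specified in detail here, so the sketch below is hypothetical: it merges per-dataset counters into a single pipeline report. The key names `processed`/`skipped`/`failed` are assumptions, not ScenarioMax's actual schema.

```python
def aggregate_stats(per_dataset: dict) -> dict:
    """Merge per-dataset processing statistics into one pipeline report.

    `per_dataset` maps a dataset name to counters such as
    {"processed": int, "skipped": int, "failed": int}; these keys are
    illustrative, not ScenarioMax's actual schema.
    """
    totals = {"processed": 0, "skipped": 0, "failed": 0}
    for stats in per_dataset.values():
        for key in totals:
            # Missing counters default to 0 so partial stats still merge.
            totals[key] += stats.get(key, 0)
    return {"datasets": sorted(per_dataset), "totals": totals}


report = aggregate_stats({
    "waymo": {"processed": 950, "skipped": 40, "failed": 10},
    "nuscenes": {"processed": 800, "skipped": 15, "failed": 5},
})
```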
### Dataset Registry

Dynamic configuration system for supported datasets.
```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class DatasetConfig:
    name: str                  # Dataset identifier
    version: str               # Dataset version
    load_func: Callable        # Scenario loading function
    convert_func: Callable     # Conversion function
    preprocess_func: Callable  # Optional preprocessing
    additional_args: dict      # Extra configuration
```

Pattern: Registry pattern with lazy loading to avoid circular imports
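The lazy-loading aspect of this pattern can be sketched as follows, assuming the registry maps dataset names to zero-argument factories whose bodies do the heavy imports. `register_dataset` and `get_dataset_config` are illustrative names, not ScenarioMax's actual API.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class DatasetConfig:
    name: str
    version: str
    load_func: Callable
    convert_func: Callable
    additional_args: dict = field(default_factory=dict)


# Store zero-argument factories rather than configs: any imports inside
# a factory run only on first lookup, which is how a registry like this
# avoids circular imports between pipeline modules.
_FACTORIES: dict = {}


def register_dataset(name: str, factory: Callable) -> None:
    _FACTORIES[name] = factory


def get_dataset_config(name: str) -> DatasetConfig:
    if name not in _FACTORIES:
        raise KeyError(f"Unsupported dataset: {name}")
    return _FACTORIES[name]()  # heavy imports would happen here, not at startup


# Usage: a real factory body would do `from ...datasets import waymo`.
register_dataset(
    "waymo",
    lambda: DatasetConfig("waymo", "v1.2", load_func=list, convert_func=dict),
)
config = get_dataset_config("waymo")
```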
### Parallel Processing Engine

Handles parallel scenario processing with memory management.

Architecture:

```
Main Process
│
├── Worker 0 ── [Scenarios 0-99]      ── Output Dir 0/
├── Worker 1 ── [Scenarios 100-199]   ── Output Dir 1/
├── Worker 2 ── [Scenarios 200-299]   ── Output Dir 2/
└── Worker N ── [Scenarios N*100-...] ── Output Dir N/
```

Post-Processing: Merge worker outputs into final format
Features:
- Joblib-based parallel processing
- Per-worker memory monitoring
- Automatic workload distribution
- Worker isolation and error handling
### TFExample Converter

Converts unified scenarios to TensorFlow TFRecord format.

Structure:

```
unified_to_tfexample/
├── convert_to_tfexample.py  # Main conversion logic
├── converter/
│   ├── state.py             # Agent state conversion
│   ├── roadgraph.py         # Map element conversion
│   ├── traffic_light.py     # Traffic light conversion
│   └── utils.py             # Conversion utilities
├── postprocess.py           # Worker coordination
├── shard_tfexample.py       # File sharding
└── exceptions.py            # Format-specific exceptions
```
### GPUDrive Converter

Converts unified scenarios to GPUDrive JSON format.

Structure:

```
unified_to_gpudrive/
├── convert_to_json.py   # Main conversion logic
├── converter/
│   ├── state.py         # State conversion
│   ├── roadgraph.py     # Roadgraph conversion
│   └── traffic_light.py # Traffic light conversion
├── postprocess.py       # Output management
└── utils.py             # Helper functions
```
## Data Flow

```mermaid
graph TD
    A[Raw Dataset Files] --> B[Dataset Loader]
    B --> C[Raw Scenarios]
    C --> D[Preprocessing]
    D --> E[Parallel Workers]
    E --> F[Scenario Conversion]
    F --> G[Validation]
    G --> H{Enhancement?}
    H -->|Yes| I[Enhancement Engine]
    H -->|No| J[Format Converter]
    I --> J
    J --> K[Output Serialization]
    K --> L[Worker Output Files]
    L --> M[Post-Processing]
    M --> N[Final Output Files]
```
### Dataset-Specific Extraction

Each dataset has specific extraction logic:

Waymo:

```python
def convert_waymo_scenario(scenario, version):
    return {
        ScenarioDescription.ID: extract_scenario_id(scenario),
        ScenarioDescription.METADATA: extract_metadata(scenario),
        ScenarioDescription.DYNAMIC_AGENTS: extract_agents(scenario),
        ScenarioDescription.MAP_ELEMENTS: extract_map(scenario),
        ScenarioDescription.TIMESTEPS: extract_timesteps(scenario),
        # ... other fields
    }
```

nuScenes:

```python
def convert_nuscenes_scenario(scenario, version):
    # Different extraction logic but same output schema
    return unified_scenario_dict
```

### Unified Format Schema

The unified format follows a consistent structure:
```python
{
    "id": "scenario_unique_identifier",
    "version": "dataset_version",
    "metadata": {
        "dataset_name": "waymo|nuscenes|nuplan",
        "dataset_version": "version_string",
        "source_file": "original_file_path"
    },
    "timesteps": [0.0, 0.1, 0.2, ...],        # Time array
    "dynamic_agents": {
        "agent_id": {
            "type": "vehicle|pedestrian|cyclist",
            "position": [[x, y], ...],        # Per timestep
            "heading": [theta, ...],          # Per timestep
            "velocity": [[vx, vy], ...],      # Per timestep
            "valid": [True, False, ...],      # Per timestep
            "size": [length, width, height]   # Static
        }
    },
    "map_elements": {
        "element_id": {
            "type": "lane|crosswalk|stop_sign|...",
            "geometry": [[x, y], ...],        # Polyline/polygon
            "properties": {...}               # Element-specific data
        }
    },
    "dynamic_map_elements": {
        "traffic_light_id": {
            "type": "traffic_light",
            "states": ["red", "green", ...],  # Per timestep
            "position": [x, y],
            "controlled_lanes": ["lane_id1", ...]
        }
    }
}
```

### Target Format Conversion

TFExample: Converts to TensorFlow features

```python
features = {
    "state/x": tf.train.Feature(float_list=...),
    "state/y": tf.train.Feature(float_list=...),
    "roadgraph_samples/xyz": tf.train.Feature(float_list=...),
    # ... many more features
}
```

GPUDrive JSON: Converts to simulation format
```json
{
    "scenario_id": "...",
    "agents": [{
        "position": [...],
        "heading": [...],
        "type": "vehicle"
    }],
    "map": {
        "lanes": [...],
        "intersections": [...]
    }
}
```

## Parallel Processing

ScenarioMax uses a process-based parallel processing model with joblib:

```python
# Main process distributes work
worker_args = distribute_scenarios(scenarios, num_workers)

# Parallel execution
results = Parallel(n_jobs=num_workers, backend="multiprocessing")(
    delayed(process_worker)(args) for args in worker_args
)
```

### Worker Isolation

Each worker operates independently:
- Separate output directories: `output/worker_0/`, `output/worker_1/`, etc.
- Memory tracking: Per-worker memory monitoring
- Error isolation: Worker failures don't crash other workers
- Progress tracking: Individual progress bars per worker
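Error isolation of this kind is typically achieved by catching exceptions inside the worker function, so a failure is recorded rather than propagated to the main process. Below is a simplified, sequential sketch of the idea; the body of `process_worker` is illustrative, not ScenarioMax's implementation.

```python
import os
import tempfile


def process_worker(worker_index: int, scenarios: list, output_root: str) -> dict:
    """Process one worker's share; never let a scenario error escape."""
    out_dir = os.path.join(output_root, f"worker_{worker_index}")
    os.makedirs(out_dir, exist_ok=True)

    stats = {"ok": 0, "failed": 0, "errors": []}
    for scenario in scenarios:
        try:
            if scenario.get("broken"):  # stand-in for a real conversion error
                raise ValueError(f"bad scenario {scenario['id']}")
            path = os.path.join(out_dir, f"{scenario['id']}.out")
            with open(path, "w") as f:
                f.write(str(scenario))
            stats["ok"] += 1
        except Exception as exc:
            # Record and continue: one bad scenario must not kill the worker,
            # and a failed worker must not take down its siblings.
            stats["failed"] += 1
            stats["errors"].append(str(exc))
    return stats


with tempfile.TemporaryDirectory() as root:
    stats = process_worker(0, [{"id": "a"}, {"id": "b", "broken": True}], root)
```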
### Workload Distribution

Scenarios are distributed across workers using several strategies:
- Round-robin: Simple distribution for uniform scenarios
- Size-based: Distribute based on scenario complexity (planned)
- Dynamic: Reassign work from failed workers (planned)
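Round-robin is the simplest of these strategies. A minimal sketch of what `distribute_scenarios` might do under it (the implementation below is an assumption, only the function name appears in the pipeline code above):

```python
def distribute_scenarios(scenarios: list, num_workers: int) -> list:
    """Round-robin split: scenario i goes to worker i % num_workers."""
    chunks = [[] for _ in range(num_workers)]
    for i, scenario in enumerate(scenarios):
        chunks[i % num_workers].append(scenario)
    return chunks


# 10 scenarios over 3 workers: loads differ by at most one scenario.
chunks = distribute_scenarios(list(range(10)), num_workers=3)
```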
## Memory Management

### Memory Monitoring

```python
import os

import psutil


def process_memory():
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    return {
        "rss": mem_info.rss / 1024 / 1024,    # MB
        "vms": mem_info.vms / 1024 / 1024,    # MB
        "percent": process.memory_percent(),  # %
    }
```

### Optimization Strategies

- Generator-based processing: Stream data instead of loading everything into memory
- Preprocessing functions: Dataset-specific memory optimization
- Configurable worker limits: Adjust based on available RAM
- Garbage collection: Explicit memory cleanup between scenarios
```python
def preprocess_waymo_scenarios(files, worker_index):
    """Generator-based processing for memory efficiency."""
    logger.info(f"Worker {worker_index} processing {len(files)} files")

    for file in files:
        # Process one file at a time to minimize memory usage
        scenarios = load_waymo_file(file)
        for scenario in scenarios:
            yield scenario
        # File data is garbage collected here
```

## Error Handling

ScenarioMax implements a comprehensive exception hierarchy:
```
ScenarioMaxError (base)
├── DatasetError
│   ├── UnsupportedDatasetError
│   ├── DatasetLoadError
│   └── EmptyDatasetError
├── ConversionError
│   ├── UnsupportedFormatError
│   ├── ScenarioConversionError
│   └── WorkerProcessingError
├── ValidationError
│   ├── InvalidScenarioDataError
│   ├── OverpassException
│   └── NotEnoughValidObjectsException
└── ConfigurationError
    ├── MissingEnvironmentVariableError
    └── InvalidConfigurationError
```

### Error Recovery Strategies

- Scenario-level: Skip invalid scenarios, continue processing
- Worker-level: Restart failed workers with remaining work
- Dataset-level: Continue with other datasets if one fails
- Pipeline-level: Clean up resources and provide detailed error reports
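The scenario-level strategy amounts to a conversion loop that demotes per-scenario exceptions to skip counts instead of aborting the batch. A sketch of the pattern; `convert_all` and `fake_convert` are illustrative names, and only the exception class names come from the hierarchy above:

```python
class ScenarioMaxError(Exception):
    """Base class, mirroring the hierarchy above."""


class ScenarioConversionError(ScenarioMaxError):
    pass


def convert_all(scenarios, convert_func):
    """Convert what we can; collect failures instead of aborting the batch."""
    converted, skipped = [], []
    for scenario in scenarios:
        try:
            converted.append(convert_func(scenario))
        except ScenarioMaxError as exc:
            # Scenario-level recovery: record the failure and keep going.
            skipped.append((scenario.get("id"), str(exc)))
    return converted, skipped


def fake_convert(scenario):
    if "agents" not in scenario:
        raise ScenarioConversionError("missing agents")
    return scenario["id"]


converted, skipped = convert_all(
    [{"id": "s1", "agents": []}, {"id": "s2"}], fake_convert
)
```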
### Validation

```python
def validate_scenario(scenario: dict) -> None:
    """Multi-level validation."""
    # Schema validation
    validate_required_fields(scenario)

    # Data integrity validation
    validate_timestep_consistency(scenario)
    validate_agent_trajectories(scenario)

    # Dataset-specific validation
    if scenario["metadata"]["dataset_name"] == "waymo":
        validate_waymo_specific_fields(scenario)
```

## Extension Points

### Adding a New Dataset

1. Create dataset module:
```
raw_to_unified/datasets/new_dataset/
├── __init__.py
├── load.py       # get_new_dataset_scenarios()
├── extractor.py  # convert_new_dataset_scenario()
├── types.py      # Dataset-specific type mappings
└── utils.py      # Helper functions
```

2. Register in dataset registry:
```python
if dataset_name == "new_dataset":
    from scenariomax.raw_to_unified.datasets import new_dataset

    return DatasetConfig(
        name="new_dataset",
        version="1.0",
        load_func=new_dataset.get_new_dataset_scenarios,
        convert_func=new_dataset.convert_new_dataset_scenario,
    )
```

3. Follow the interface:

```python
def get_new_dataset_scenarios(data_path: str, **kwargs) -> list:
    """Load scenarios from dataset."""
    pass


def convert_new_dataset_scenario(scenario: Any, version: str) -> dict:
    """Convert to unified format."""
    pass
```
### Adding a New Output Format

1. Create converter module:

```
unified_to_new_format/
├── __init__.py
├── convert_to_new_format.py  # Main conversion
├── postprocess.py            # Worker coordination
├── converter/                # Component converters
└── utils.py                  # Helper functions
```

2. Implement conversion function:

```python
def convert_to_new_format(scenario: dict) -> Any:
    """Convert unified scenario to new format."""
    pass
```

3. Add to pipeline registry:

```python
def _get_target_postprocess_func(target_format: str):
    if target_format == "new_format":
        from scenariomax.unified_to_new_format import postprocess

        return postprocess.postprocess_new_format
```
### Custom Enhancement

```python
import random


def custom_enhancement(unified_scenario: dict) -> dict:
    """Apply custom transformations to scenarios."""
    # Add noise to trajectories
    if random.random() < 0.1:
        add_trajectory_noise(unified_scenario)

    # Filter scenarios by criteria
    if not meets_criteria(unified_scenario):
        raise SkipScenarioException("Does not meet criteria")

    return unified_scenario


# Register enhancement
from scenariomax.enhancement import register_enhancement

register_enhancement("custom", custom_enhancement)
```

## Performance Considerations
1. I/O Bound Operations:
   - Use SSD storage for better random access
   - Implement read-ahead caching for sequential access
   - Compress intermediate files to reduce disk usage

2. CPU Bound Operations:
   - Optimize the number of workers based on CPU cores
   - Use vectorized operations (NumPy) where possible
   - Profile conversion functions for optimization opportunities

3. Memory Bound Operations:
   - Use generator-based processing for large datasets
   - Implement smart caching with LRU eviction
   - Monitor and limit per-worker memory usage
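One way to combine the CPU and memory guidance above is to cap the worker count by both core count and available RAM. The sketch below uses an assumed per-worker footprint; `pick_num_workers` is a hypothetical helper, not a ScenarioMax API.

```python
import os


def pick_num_workers(mem_available_mb: float, per_worker_mb: float = 4096) -> int:
    """Choose a worker count bounded by CPU cores and by RAM.

    `per_worker_mb` is an assumed peak per-worker footprint; measure it
    with per-worker memory monitoring before relying on a value.
    """
    by_cpu = os.cpu_count() or 1
    by_mem = max(1, int(mem_available_mb // per_worker_mb))
    return max(1, min(by_cpu, by_mem))


# e.g. with 32 GB free and 4 GB per worker, RAM allows at most 8 workers,
# further capped by the machine's core count.
n = pick_num_workers(mem_available_mb=32 * 1024)
```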
### Scaling Strategies

- Horizontal Scaling: Run multiple instances across machines
- Vertical Scaling: Optimize single-machine performance
- Cloud Scaling: Use containerized workers in cloud environments
This architecture provides a solid foundation for processing autonomous driving datasets at scale while maintaining flexibility for future extensions and optimizations.