@@ -171,12 +171,18 @@ def create_scenario_description(
171171 return info_dict
172172
173173def create_digitaltwin_info_central (video_scene : VideoScene , args = None ):
174- return_dict = {}
174+ """Process one video scene and save results to individual pickle files."""
175+ processed_scenarios = []
175176 nuplan_root_path = args .nuplan_root_path
176177 nuplan_db_path = args .nuplan_db_path
177178 nuplan_map_version = args .nuplan_map_version
178179 nuplan_map_root = args .nuplan_map_root
179180 openscene_dataroot = args .openscene_dataroot
181+ out_dir = args .out_dir
182+
183+ # Create chunks directory for individual scenario files
184+ chunks_dir = os .path .join (out_dir , "chunks" )
185+ os .makedirs (chunks_dir , exist_ok = True )
180186
181187 video_scene_dict = video_scene .video_scene_dict
182188 digitaltwin_config = video_scene .config
@@ -212,12 +218,17 @@ def create_digitaltwin_info_central(video_scene: VideoScene, args=None):
212218 map_location = map_location ,
213219 video_name = current_video_name ,
214220 )
215- return_dict [current_video_name ] = info_dict
221+ # Save individual scenario to avoid large pipe transfers
222+ chunk_file = os .path .join (chunks_dir , f"{ current_video_name } .pkl" )
223+ with open (chunk_file , "wb" ) as f :
224+ pickle .dump ({current_video_name : info_dict }, f , protocol = pickle .HIGHEST_PROTOCOL )
225+ processed_scenarios .append (current_video_name )
216226 except KeyError as e :
217227 print (f"{ current_video_name } failed due to mismatch OpenScene info" )
218228 continue
219229
220- return return_dict
230+ # Return only scenario names, not the full data
231+ return processed_scenarios
221232
222233def parse_args ():
223234 parser = argparse .ArgumentParser (description = "Create world engine pkl from Digital Twin config." )
@@ -302,21 +313,42 @@ def parse_args():
302313 # DEBUG: single process
303314 # all_scenarios = {}
304315 # for video_scene in filtered_video_scenes:
305- # return_dict = create_digitaltwin_info_central(video_scene, args)
306- # all_scenarios.update(return_dict )
316+ # scenario_names = create_digitaltwin_info_central(video_scene, args)
317+ # print(f"Processed {len(scenario_names)} scenarios" )
307318
308- all_scenarios = {}
319+ # Create chunks directory
320+ chunks_dir = os .path .join (out_dir , "chunks" )
321+ os .makedirs (chunks_dir , exist_ok = True )
322+
323+ # Process with multiprocessing - workers save directly to disk
324+ all_scenario_names = []
309325 with Pool (processes = args .num_processes ) as pool :
310326 # Use tqdm to show progress
311- for return_dict in tqdm (
327+ for scenario_names in tqdm (
312328 pool .imap_unordered (partial (create_digitaltwin_info_central , args = args ), filtered_video_scenes ),
313329 total = len (filtered_video_scenes ),
314330 desc = "Processing Video Scenes"
315331 ):
316- all_scenarios .update (return_dict )
332+ all_scenario_names .extend (scenario_names )
333+
334+ print (f"\n Total scenarios processed: { len (all_scenario_names )} " )
335+ print (f"Merging { len (all_scenario_names )} chunk files from { chunks_dir } " )
336+
337+ # Merge all chunk files into final pickle
338+ all_scenarios = {}
339+ for scenario_name in tqdm (all_scenario_names , desc = "Merging chunks" ):
340+ chunk_file = os .path .join (chunks_dir , f"{ scenario_name } .pkl" )
341+ if os .path .exists (chunk_file ):
342+ with open (chunk_file , "rb" ) as f :
343+ chunk_data = pickle .load (f )
344+ all_scenarios .update (chunk_data )
345+ else :
346+ print (f"Warning: chunk file not found: { chunk_file } " )
317347
318348 pkl_file_path = f"{ args .out_dir } /all_scenarios.pkl"
319- print (f"Saving to { pkl_file_path } " )
349+ print (f"Saving final result to { pkl_file_path } " )
320350 os .makedirs (args .out_dir , exist_ok = True )
321351 with open (pkl_file_path , "wb" ) as f :
322352 pickle .dump (dict (all_scenarios ), f , protocol = pickle .HIGHEST_PROTOCOL )
353+
354+ print (f"Done! Final pkl contains { len (all_scenarios )} scenarios" )
0 commit comments