Coverage for openhcs/debug/pickle_to_python.py: 39.0%

562 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1 #!/usr/bin/env python3

2 """

3 Pickle to Python Converter - Convert OpenHCS debug pickle files to runnable Python scripts

4 """

5

6 import sys

7 import dill as pickle

8 import inspect

9 from pathlib import Path

10 from datetime import datetime

11 from collections import defaultdict

12 from enum import Enum

13 import dataclasses

14 from dataclasses import is_dataclass, fields

15

16 from openhcs.core.steps.function_step import FunctionStep

17 

18 def collect_imports_from_data(data_obj): 

19 """Extract function, enum, and dataclass imports by traversing data structure.""" 

20 function_imports = defaultdict(set) 

21 enum_imports = defaultdict(set) 

22 decorated_functions = set() 

23 

24 def register_imports(obj): 

25 if isinstance(obj, Enum): 

26 enum_imports[obj.__class__.__module__].add(obj.__class__.__name__) 

27 elif is_dataclass(obj): 

28 module = obj.__class__.__module__ 

29 name = obj.__class__.__name__ 

30 function_imports[module].add(name) 

31 [register_imports(getattr(obj, f.name)) for f in fields(obj) if getattr(obj, f.name) is not None] 

32 elif callable(obj): 

33 # Skip bound methods (like step.process) - only import standalone functions 

34 if inspect.ismethod(obj):  # coverage: 34 ↛ 35 (condition never true)

35 return 

36 if _is_external_registered_function(obj):  # coverage: 36 ↛ 38 (condition never true)

37 # Use the actual module path but under openhcs namespace 

38 original_module = obj.__module__ 

39 # Convert original module to openhcs namespace: cucim.skimage.filters -> openhcs.cucim.skimage.filters 

40 virtual_module = f'openhcs.{original_module}' 

41 function_imports[virtual_module].add(obj.__name__) 

42 decorated_functions.add(obj.__name__) 

43 else: 

44 function_imports[obj.__module__].add(obj.__name__) 

45 elif isinstance(obj, (list, tuple)): 

46 [register_imports(item) for item in obj] 

47 elif isinstance(obj, dict): 

48 [register_imports(value) for value in obj.values()] 

49 elif hasattr(obj, '__dict__') and obj.__dict__: 

50 [register_imports(value) for value in obj.__dict__.values()] 

51 

52 register_imports(data_obj) 

53 return function_imports, enum_imports, decorated_functions 

54 

55 
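A minimal usage sketch, assuming the module's functions are in scope; the enum and function below are toy stand-ins, not OpenHCS objects:

from enum import Enum

class Backend(Enum):              # hypothetical enum for illustration
    DISK = "disk"

def my_filter(img):               # stands in for a registered processing function
    return img

fn_imports, enum_imports, decorated = collect_imports_from_data(
    [(my_filter, {"backend": Backend.DISK})]
)
# fn_imports   -> defaultdict(set, {'__main__': {'my_filter'}})
# enum_imports -> defaultdict(set, {'__main__': {'Backend'}})
# decorated    -> set()           (my_filter carries no slice_by_slice marker)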

56 def _is_external_registered_function(func): 

57 """Check if function is an external library function registered with OpenHCS.""" 

58 # External functions have slice_by_slice but not full OpenHCS decorations 

59 return (hasattr(func, 'slice_by_slice') and 

60 not hasattr(func, '__processing_contract__') and 

61 not func.__module__.startswith('openhcs.')) 

62 

63 
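The check is pure duck typing, so it can be exercised without any external library installed; a sketch with a fabricated function:

def raw_gaussian(img):                             # stand-in for an external filter
    return img

raw_gaussian.__module__ = "cucim.skimage.filters"  # simulate its external origin
raw_gaussian.slice_by_slice = True                 # marker the registry attaches

assert _is_external_registered_function(raw_gaussian)   # external and registered
assert not _is_external_registered_function(print)      # builtin: lacks the marker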

64 def _get_function_library_name(func): 

65 """Get the library name for an external registered function.""" 

66 from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

67 

68 # Find the function in the registry to get its library name 

69 all_functions = RegistryService.get_all_functions_with_metadata() 

70 for func_name, metadata in all_functions.items(): 

71 if metadata.func is func: 

72 return metadata.registry.library_name 

73 

74 return None 

75 

76 

77 def _create_openhcs_library_modules(): 

78 """ 

79 Create virtual modules that mirror external library structure under openhcs namespace. 

80 

81 This enables namespace-based distinction between raw and OpenHCS-wrapped functions: 

82 

83 - `from skimage.filters import gaussian` → Raw function (NOT pipeline-compatible) 

84 - `from openhcs.skimage.filters import gaussian` → Wrapped, tested, pipeline-ready 

85 

86 The virtual modules contain only functions that have been: 

87 1. Runtime tested for OpenHCS compatibility 

88 2. Characterized for memory types and GPU support 

89 3. Wrapped with appropriate OpenHCS decorators 

90 

91 This prevents accidental use of unwrapped functions in pipelines and makes 

92 import statements self-documenting about pipeline compatibility. 

93 """ 

94 import types 

95 from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

96 

97 # Get all registered functions 

98 all_functions = RegistryService.get_all_functions_with_metadata() 

99 

100 # Group functions by their full module path 

101 functions_by_module = {} 

102 for func_name, metadata in all_functions.items(): 

103 if _is_external_registered_function(metadata.func): 

104 original_module = metadata.func.__module__ 

105 virtual_module = f'openhcs.{original_module}' 

106 if virtual_module not in functions_by_module: 

107 functions_by_module[virtual_module] = {} 

108 functions_by_module[virtual_module][metadata.func.__name__] = metadata.func 

109 

110 # Create virtual modules for each module path 

111 created_modules = [] 

112 for virtual_module, functions in functions_by_module.items(): 

113 if virtual_module not in sys.modules: 

114 module = types.ModuleType(virtual_module) 

115 module.__doc__ = f"Virtual module mirroring {virtual_module.replace('openhcs.', '')} with OpenHCS decorations" 

116 sys.modules[virtual_module] = module 

117 

118 # Add all functions from this module 

119 for func_name, func in functions.items(): 

120 setattr(module, func_name, func) 

121 

122 created_modules.append(virtual_module) 

123 

124 return created_modules 

125 
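The underlying mechanism is plain sys.modules injection: once the full dotted name is registered, a from-import resolves without real parent packages. A self-contained sketch (module path hypothetical):

import sys, types

def gaussian(img, sigma=1.0):     # pretend this is the wrapped external function
    return img

mod = types.ModuleType("openhcs.skimage.filters")
mod.gaussian = gaussian
sys.modules["openhcs.skimage.filters"] = mod

# The import system finds the full dotted name in sys.modules first.
from openhcs.skimage.filters import gaussian as wrapped
assert wrapped is gaussian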

126 def format_imports_as_strings(function_imports, enum_imports): 

127 """Convert import dictionaries to list of import strings with collision resolution.""" 

128 # Merge imports 

129 all_imports = function_imports.copy() 

130 for module, names in enum_imports.items(): 

131 all_imports.setdefault(module, set()).update(names) 

132 

133 # Build collision map 

134 name_to_modules = defaultdict(list) 

135 for module, names in all_imports.items(): 

136 for name in names: 

137 name_to_modules[name].append(module) 

138 

139 import_lines, name_mappings = [], {} 

140 for module, names in sorted(all_imports.items()): 

141 if not module or module == 'builtins' or not names:  # coverage: 141 ↛ 142 (condition never true)

142 continue 

143 

144 imports = [] 

145 for name in sorted(names): 

146 if len(name_to_modules[name]) > 1:  # coverage: 146 ↛ 147 (condition never true)

147 qualified = f"{name}_{module.split('.')[-1]}" 

148 imports.append(f"{name} as {qualified}") 

149 name_mappings[(name, module)] = qualified 

150 else: 

151 imports.append(name) 

152 name_mappings[(name, module)] = name 

153 

154 import_lines.append(f"from {module} import {', '.join(imports)}") 

155 

156 return import_lines, name_mappings 

157 
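For example, two modules exporting the same name get aliases suffixed with the final module component (module paths illustrative):

fn_imports = {
    "skimage.filters": {"gaussian"},
    "cupyx.scipy.ndimage": {"gaussian"},   # hypothetical second source of 'gaussian'
}
lines, mappings = format_imports_as_strings(fn_imports, {})
# lines:
#   from cupyx.scipy.ndimage import gaussian as gaussian_ndimage
#   from skimage.filters import gaussian as gaussian_filters
# mappings[("gaussian", "skimage.filters")] == "gaussian_filters"

Note the alias uses only the last module component, so two colliding modules that share that component would still produce identical aliases.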

158 def generate_complete_function_pattern_code(func_obj, indent=0, clean_mode=False): 

159 """Generate complete Python code for function pattern with imports.""" 

160 # Collect imports from this pattern first to get name mappings 

161 function_imports, enum_imports, decorated_functions = collect_imports_from_data(func_obj) 

162 

163 # Create containers for additional imports discovered during repr generation 

164 additional_function_imports = defaultdict(set) 

165 additional_enum_imports = defaultdict(set) 

166 

167 # Merge initial imports with containers for additional imports 

168 for module, names in function_imports.items(): 

169 additional_function_imports[module].update(names) 

170 for module, names in enum_imports.items(): 

171 additional_enum_imports[module].update(names) 

172 

173 # First pass: Generate pattern representation to collect all imports (including from expanded defaults) 

174 # Use temporary name mappings for this pass 

175 temp_import_lines, temp_name_mappings = format_imports_as_strings(additional_function_imports, additional_enum_imports) 

176 pattern_repr = generate_readable_function_repr( 

177 func_obj, indent, clean_mode, temp_name_mappings, 

178 required_function_imports=additional_function_imports, 

179 required_enum_imports=additional_enum_imports 

180 ) 

181 

182 # Second pass: Now that we have ALL imports (including from expanded defaults), 

183 # regenerate name mappings to handle any new collisions, then regenerate pattern repr 

184 import_lines, final_name_mappings = format_imports_as_strings(additional_function_imports, additional_enum_imports) 

185 

186 # If name mappings changed (new collisions detected), regenerate pattern repr with correct aliases 

187 if final_name_mappings != temp_name_mappings: 

188 pattern_repr = generate_readable_function_repr( 

189 func_obj, indent, clean_mode, final_name_mappings, 

190 required_function_imports=additional_function_imports, 

191 required_enum_imports=additional_enum_imports 

192 ) 

193 

194 # Build complete code 

195 code_lines = ["# Edit this function pattern and save to apply changes", ""] 

196 if import_lines: 

197 code_lines.append("# Dynamic imports") 

198 code_lines.extend(import_lines) 

199 code_lines.append("") 

200 code_lines.append(f"pattern = {pattern_repr}") 

201 

202 return "\n".join(code_lines) 

203 

204 def _value_to_repr(value, required_imports=None, name_mappings=None): 

205 """Converts a value to its Python representation string and tracks required imports.""" 

206 if isinstance(value, Enum): 

207 enum_class_name = value.__class__.__name__ 

208 enum_module = value.__class__.__module__ 

209 

210 # Collect import for the enum class 

211 if required_imports is not None and enum_module and enum_class_name:  # coverage: 211 ↛ 215 (condition always true)

212 required_imports[enum_module].add(enum_class_name) 

213 

214 # Use name mapping if available to handle collisions 

215 if name_mappings and (enum_class_name, enum_module) in name_mappings: 

216 mapped_name = name_mappings[(enum_class_name, enum_module)] 

217 return f"{mapped_name}.{value.name}" 

218 else: 

219 return f"{enum_class_name}.{value.name}" 

220 elif isinstance(value, str): 

221 # Use repr() for strings to properly escape newlines and special characters 

222 return repr(value) 

223 elif isinstance(value, Path):  # coverage: 223 ↛ 225 (condition never true)

224 # Track that we need Path import 

225 if required_imports is not None: 

226 required_imports['pathlib'].add('Path') 

227 

228 # Use name mapping if available 

229 path_name = 'Path' 

230 if name_mappings and ('Path', 'pathlib') in name_mappings: 

231 path_name = name_mappings[('Path', 'pathlib')] 

232 

233 return f'{path_name}({repr(str(value))})' 

234 return repr(value) 

235 
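Representative conversions with a toy enum (Path and defaultdict are already imported at the top of this module):

from enum import Enum

class Backend(Enum):      # toy enum for illustration
    DISK = "disk"

imports = defaultdict(set)
_value_to_repr(Backend.DISK, imports)           # -> "Backend.DISK"
_value_to_repr("a\nb", imports)                 # -> "'a\\nb'" (repr escapes the newline)
_value_to_repr(Path("/data/plate"), imports)    # -> "Path('/data/plate')"
# imports now holds {'__main__': {'Backend'}, 'pathlib': {'Path'}}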

236 def generate_clean_dataclass_repr(instance, indent_level=0, clean_mode=False, required_imports=None, name_mappings=None): 

237 """ 

238 Generates a clean, readable Python representation of a dataclass instance, 

239 omitting fields that are set to their default values if clean_mode is True. 

240 This function is recursive and handles nested dataclasses. 

241 """ 

242 if not dataclasses.is_dataclass(instance):  # coverage: 242 ↛ 243 (condition never true)

243 return _value_to_repr(instance, required_imports, name_mappings) 

244 

245 lines = [] 

246 indent_str = " " * indent_level 

247 child_indent_str = " " * (indent_level + 1) 

248 

249 # Get a default instance of the same class for comparison 

250 # CRITICAL FIX: For lazy dataclasses, create instance with raw values to preserve None vs concrete distinction 

251 if hasattr(instance, '_resolve_field_value'): 

252 # This is a lazy dataclass - create empty instance without triggering resolution 

253 default_instance = object.__new__(instance.__class__) 

254 

255 # Set all fields to None (their raw default state) using object.__setattr__ 

256 for field in dataclasses.fields(instance): 

257 object.__setattr__(default_instance, field.name, None) 

258 

259 # Initialize any required lazy dataclass attributes 

260 if hasattr(instance.__class__, '_is_lazy_dataclass'):  # coverage: 260 ↛ 261 (condition never true)

261 object.__setattr__(default_instance, '_is_lazy_dataclass', True) 

262 else: 

263 # Regular dataclass - use normal constructor 

264 default_instance = instance.__class__() 

265 

266 for field in dataclasses.fields(instance): 

267 field_name = field.name 

268 

269 # CRITICAL FIX: For lazy dataclasses, use raw stored value to avoid triggering resolution 

270 # This ensures tier 3 code generation only shows explicitly set pipeline config fields 

271 if hasattr(instance, '_resolve_field_value'): 

272 # This is a lazy dataclass - get raw stored value without triggering lazy resolution 

273 current_value = object.__getattribute__(instance, field_name) 

274 default_value = object.__getattribute__(default_instance, field_name) 

275 else: 

276 # Regular dataclass - use normal getattr 

277 current_value = getattr(instance, field_name) 

278 default_value = getattr(default_instance, field_name) 

279 

280 if clean_mode and current_value == default_value: 

281 continue 

282 

283 if dataclasses.is_dataclass(current_value): 

284 # Recursively generate representation for nested dataclasses 

285 nested_repr = generate_clean_dataclass_repr(current_value, indent_level + 1, clean_mode, required_imports, name_mappings) 

286 

287 # Only include nested dataclass if it has non-default content 

288 if nested_repr.strip(): # Has actual content 

289 # Collect import for the nested dataclass 

290 if required_imports is not None:  # coverage: 290 ↛ 296 (condition always true)

291 class_module = current_value.__class__.__module__ 

292 class_name = current_value.__class__.__name__ 

293 if class_module and class_name:  # coverage: 293 ↛ 296 (condition always true)

294 required_imports[class_module].add(class_name) 

295 

296 lines.append(f"{child_indent_str}{field_name}={current_value.__class__.__name__}(\n{nested_repr}\n{child_indent_str})") 

297 elif not clean_mode:  # coverage: 297 ↛ 299 (condition never true)

298 # In non-clean mode, still include empty nested dataclasses 

299 if required_imports is not None: 

300 class_module = current_value.__class__.__module__ 

301 class_name = current_value.__class__.__name__ 

302 if class_module and class_name: 

303 required_imports[class_module].add(class_name) 

304 

305 lines.append(f"{child_indent_str}{field_name}={current_value.__class__.__name__}()") 

306 else: 

307 value_repr = _value_to_repr(current_value, required_imports, name_mappings) 

308 lines.append(f"{child_indent_str}{field_name}={value_repr}") 

309 

310 if not lines: 

311 return "" # Return empty string if all fields were default in clean_mode 

312 

313 return ",\n".join(lines) 

314 

315 
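Clean-mode behavior on a toy (non-lazy) dataclass, as a sketch:

from dataclasses import dataclass

@dataclass
class VFS:                        # toy config class, not the real VFSConfig
    backend: str = "disk"
    compress: bool = False

generate_clean_dataclass_repr(VFS(compress=True), clean_mode=True)
# -> " compress=True"             (one-space indent per nesting level)
generate_clean_dataclass_repr(VFS(), clean_mode=True)
# -> ""                           (every field at its default is omitted)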

316 def convert_pickle_to_python(pickle_path, output_path=None, clean_mode=False): 

317 """Convert an OpenHCS debug pickle file to a runnable Python script.""" 

318 

319 pickle_file = Path(pickle_path) 

320 if not pickle_file.exists(): 

321 print(f"Error: Pickle file not found: {pickle_path}") 

322 return 

323 

324 if output_path is None: 

325 output_path = pickle_file.with_suffix('.py') 

326 

327 print(f"Converting {pickle_file} to {output_path} (Clean Mode: {clean_mode})") 

328 

329 try: 

330 with open(pickle_file, 'rb') as f: 

331 data = pickle.load(f) 

332 

333 # Generate Python script 

334 with open(output_path, 'w') as f: 

335 f.write('#!/usr/bin/env python3\n') 

336 f.write('"""\n') 

337 f.write(f'OpenHCS Pipeline Script - Generated from {pickle_file.name}\n') 

338 f.write(f'Generated: {datetime.now()}\n') 

339 f.write('"""\n\n') 

340 

341 # Imports 

342 f.write('import sys\n') 

343 f.write('import os\n') 

344 f.write('from pathlib import Path\n\n') 

345 f.write('# Add OpenHCS to path\n') 

346 f.write('sys.path.insert(0, "/home/ts/code/projects/openhcs")\n\n') 

347 

348 f.write('from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator\n') 

349 f.write('from openhcs.core.steps.function_step import FunctionStep\n') 

350 f.write('from openhcs.core.config import (GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig, \n' 

351 ' MaterializationBackend, ZarrCompressor, ZarrChunkStrategy)\n') 

352 f.write('from openhcs.constants.constants import VariableComponents, Backend, Microscope\n\n') 

353 

354 # Use extracted function for orchestrator generation 

355 orchestrator_code = generate_complete_orchestrator_code( 

356 data["plate_paths"], data["pipeline_data"], data['global_config'], clean_mode 

357 ) 

358 

359 # Write orchestrator code (already includes dynamic imports) 

360 f.write(orchestrator_code) 

361 f.write('\n\n') 

362 

363 # ... (rest of the file remains the same for now) ... 

364 f.write('def setup_signal_handlers():\n') 

365 f.write(' """Setup signal handlers to kill all child processes and threads on Ctrl+C."""\n') 

366 f.write(' import signal\n') 

367 f.write(' import os\n') 

368 f.write(' import sys\n\n') 

369 f.write(' def cleanup_and_exit(signum, frame):\n') 

370 f.write(' print(f"\\n🔥 Signal {signum} received! Cleaning up all processes and threads...")\n\n') 

371 f.write(' os._exit(1)\n\n') 

372 f.write(' signal.signal(signal.SIGINT, cleanup_and_exit)\n') 

373 f.write(' signal.signal(signal.SIGTERM, cleanup_and_exit)\n\n') 

374 

375 f.write('def run_pipeline():\n') 

376 f.write(' os.environ["OPENHCS_SUBPROCESS_MODE"] = "1"\n') 

377 f.write(' plate_paths, pipeline_data, global_config = create_pipeline()\n') 

378 f.write(' from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry\n') 

379 f.write(' setup_global_gpu_registry(global_config=global_config)\n') 

380 f.write(' for plate_path in plate_paths:\n') 

381 f.write(' orchestrator = PipelineOrchestrator(plate_path)\n') 

382 f.write(' orchestrator.initialize()\n') 

383 f.write(' compiled_contexts = orchestrator.compile_pipelines(pipeline_data[plate_path])\n') 

384 f.write(' orchestrator.execute_compiled_plate(\n') 

385 f.write(' pipeline_definition=pipeline_data[plate_path],\n') 

386 f.write(' compiled_contexts=compiled_contexts,\n') 

387 f.write(' max_workers=global_config.num_workers\n') 

388 f.write(' )\n\n') 

389 

390 f.write('if __name__ == "__main__":\n') 

391 f.write(' setup_signal_handlers()\n') 

392 f.write(' run_pipeline()\n') 

393 

394 

395 print(f"✅ Successfully converted to {output_path}") 

396 print(f"You can now run: python {output_path}") 

397 

398 except Exception as e: 

399 print(f"Error converting pickle file: {e}") 

400 import traceback 

401 traceback.print_exc() 

402 

403 
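Typical invocations, with a hypothetical pickle path:

convert_pickle_to_python("debug_dump.pkl", clean_mode=True)
# writes debug_dump.py next to the pickle

# or via the CLI entry point defined at the bottom of this file:
#   python pickle_to_python.py debug_dump.pkl --clean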

404 def generate_readable_function_repr(func_obj, indent=0, clean_mode=False, name_mappings=None, 

405 required_function_imports=None, required_enum_imports=None): 

406 """Generate readable Python representation with collision-resolved function names.""" 

407 indent_str = " " * indent 

408 next_indent_str = " " * (indent + 1) 

409 name_mappings = name_mappings or {} 

410 

411 # Get qualified function name for collisions (handle both original and virtual modules) 

412 def get_name(f): 

413 if not callable(f):  # coverage: 413 ↛ 414 (condition never true)

414 return str(f) 

415 # Try virtual module first (for external functions), then original module 

416 virtual_module = f'openhcs.{f.__module__}' 

417 return (name_mappings.get((f.__name__, virtual_module), None) or 

418 name_mappings.get((f.__name__, f.__module__), f.__name__)) 

419 

420 if callable(func_obj): 

421 return get_name(func_obj) 

422 

423 elif isinstance(func_obj, tuple) and len(func_obj) == 2 and callable(func_obj[0]): 

424 func, args = func_obj 

425 

426 if not args and clean_mode:  # coverage: 426 ↛ 427 (condition never true)

427 return get_name(func) 

428 

429 # Get function signature defaults 

430 try: 

431 defaults = {k: v.default for k, v in inspect.signature(func).parameters.items() 

432 if v.default is not inspect.Parameter.empty} 

433 except (ValueError, TypeError): 

434 defaults = {} 

435 

436 if clean_mode:  # coverage: 436 ↛ 443 (condition always true)

437 # Clean mode: only show non-default values 

438 final_args = {k: v for k, v in args.items() 

439 if k not in defaults or v != defaults[k]} 

440 else: 

441 # Explicit mode: show ALL parameters (merge provided args with defaults) 

442 # Start with all defaults, then override with provided args 

443 final_args = {**defaults, **args} 

444 

445 # Collect imports from default values that weren't in original args 

446 if required_function_imports is not None or required_enum_imports is not None: 

447 for param_name, default_value in defaults.items(): 

448 if param_name not in args: # Only collect for newly added defaults 

449 # Collect imports from this default value 

450 if isinstance(default_value, Enum): 

451 if required_enum_imports is not None: 

452 enum_module = default_value.__class__.__module__ 

453 enum_class = default_value.__class__.__name__ 

454 required_enum_imports[enum_module].add(enum_class) 

455 elif is_dataclass(default_value): 

456 if required_function_imports is not None: 

457 dc_module = default_value.__class__.__module__ 

458 dc_class = default_value.__class__.__name__ 

459 required_function_imports[dc_module].add(dc_class) 

460 

461 if not final_args: 

462 return get_name(func) if clean_mode else f"({get_name(func)}, {{}})" 

463 

464 args_items = [f"{next_indent_str} '{k}': {generate_readable_function_repr(v, indent + 2, clean_mode, name_mappings, required_function_imports, required_enum_imports)}" 

465 for k, v in final_args.items()] 

466 args_str = "{\n" + ",\n".join(args_items) + f"\n{next_indent_str}}}" 

467 return f"({get_name(func)}, {args_str})" 

468 

469 elif isinstance(func_obj, list): 

470 if clean_mode and len(func_obj) == 1:  # coverage: 470 ↛ 472 (condition always true)

471 return generate_readable_function_repr(func_obj[0], indent, clean_mode, name_mappings, required_function_imports, required_enum_imports) 

472 if not func_obj: 

473 return "[]" 

474 items = [generate_readable_function_repr(item, indent, clean_mode, name_mappings, required_function_imports, required_enum_imports) for item in func_obj] 

475 separator = f',\n{next_indent_str}' 

476 return f"[\n{next_indent_str}{separator.join(items)}\n{indent_str}]" 

477 

478 elif isinstance(func_obj, dict): 

479 if not func_obj:  # coverage: 479 ↛ 480 (condition never true)

480 return "{}" 

481 items = [f"{next_indent_str}'{k}': {generate_readable_function_repr(v, indent, clean_mode, name_mappings, required_function_imports, required_enum_imports)}" 

482 for k, v in func_obj.items()] 

483 separator = ',\n' 

484 return f"{{{separator.join(items)}\n{indent_str}}}" 

485 

486 else: 

487 return _value_to_repr(func_obj, required_imports=required_enum_imports, name_mappings=name_mappings) 

488 

489 
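In clean mode only non-default arguments survive; a sketch with a stand-in function:

def gaussian(img, sigma=1.0, mode="reflect"):   # stand-in, not a library function
    return img

generate_readable_function_repr((gaussian, {"sigma": 2.0}), clean_mode=True)
# -> "(gaussian, {\n  'sigma': 2.0\n })"        ('mode' matches its default, so it is dropped)
generate_readable_function_repr((gaussian, {"sigma": 1.0}), clean_mode=True)
# -> "gaussian"                                 (all args at defaults collapse to the bare name)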

490 def _format_parameter_value(param_name, value, name_mappings=None): 

491 """Format parameter values with lazy dataclass preservation.""" 

492 if isinstance(value, Enum): 

493 enum_class_name = value.__class__.__name__ 

494 enum_module = value.__class__.__module__ 

495 

496 # Use name mapping if available to handle collisions 

497 if name_mappings and (enum_class_name, enum_module) in name_mappings:  # coverage: 497 ↛ 501 (condition always true)

498 mapped_name = name_mappings[(enum_class_name, enum_module)] 

499 return f"{mapped_name}.{value.name}" 

500 else: 

501 return f"{enum_class_name}.{value.name}" 

502 elif isinstance(value, str): 

503 return f'"{value}"' 

504 elif isinstance(value, list) and value and isinstance(value[0], Enum): 

505 formatted_items = [] 

506 for item in value: 

507 enum_class_name = item.__class__.__name__ 

508 enum_module = item.__class__.__module__ 

509 

510 # Use name mapping if available to handle collisions 

511 if name_mappings and (enum_class_name, enum_module) in name_mappings:  # coverage: 511 ↛ 515 (condition always true)

512 mapped_name = name_mappings[(enum_class_name, enum_module)] 

513 formatted_items.append(f"{mapped_name}.{item.name}") 

514 else: 

515 formatted_items.append(f"{enum_class_name}.{item.name}") 

516 

517 return f"[{', '.join(formatted_items)}]" 

518 elif is_dataclass(value) and 'Lazy' in value.__class__.__name__: 

519 # Preserve lazy behavior by only including explicitly set fields 

520 class_name = value.__class__.__name__ 

521 explicit_args = [ 

522 f"{f.name}={_format_parameter_value(f.name, object.__getattribute__(value, f.name), name_mappings)}" 

523 for f in fields(value) 

524 if object.__getattribute__(value, f.name) is not None 

525 ] 

526 return f"{class_name}({', '.join(explicit_args)})" if explicit_args else f"{class_name}()" 

527 else: 

528 return repr(value) 

529 
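Only explicitly set (non-None) raw fields of a lazy config are emitted, which preserves lazy resolution when the generated code is re-executed. A toy sketch; the class name must contain 'Lazy' to trigger that branch:

from dataclasses import dataclass

@dataclass
class LazyVFSConfig:              # hypothetical lazy-style config
    backend: str = None
    compress: bool = None

_format_parameter_value("vfs", LazyVFSConfig(backend="disk"))
# -> 'LazyVFSConfig(backend="disk")'   (compress stays omitted)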

530 

531 

532 

533 

534 def _collect_dataclass_classes_from_object(obj, visited=None): 

535 """Recursively collect dataclass classes that will be referenced in generated code.""" 

536 if visited is None: 

537 visited = set() 

538 

539 if id(obj) in visited: 

540 return set(), set() 

541 visited.add(id(obj)) 

542 

543 dataclass_classes = set() 

544 enum_classes = set() 

545 

546 if is_dataclass(obj): 

547 dataclass_classes.add(obj.__class__) 

548 for field in fields(obj): 

549 nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(getattr(obj, field.name), visited) 

550 dataclass_classes.update(nested_dataclasses) 

551 enum_classes.update(nested_enums) 

552 elif isinstance(obj, Enum): 

553 enum_classes.add(obj.__class__) 

554 elif isinstance(obj, (list, tuple)): 

555 for item in obj: 

556 nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(item, visited) 

557 dataclass_classes.update(nested_dataclasses) 

558 enum_classes.update(nested_enums) 

559 elif isinstance(obj, dict): 

560 for value in obj.values(): 

561 nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(value, visited) 

562 dataclass_classes.update(nested_dataclasses) 

563 enum_classes.update(nested_enums) 

564 

565 return dataclass_classes, enum_classes 

566 

567 

568 def _collect_enum_classes_from_step(step): 

569 """Collect enum classes referenced in step parameters for import generation.""" 

570 from openhcs.core.steps.function_step import FunctionStep 

571 import inspect 

572 from enum import Enum 

573 

574 enum_classes = set() 

575 sig = inspect.signature(FunctionStep.__init__) 

576 

577 for param_name, param in sig.parameters.items(): 

578 # Skip constructor-specific parameters and **kwargs 

579 if param_name in ['self', 'func'] or param.kind == inspect.Parameter.VAR_KEYWORD: 

580 continue 

581 

582 value = getattr(step, param_name, param.default) 

583 if isinstance(value, Enum): 

584 enum_classes.add(type(value)) 

585 elif isinstance(value, (list, tuple)): 

586 # Check for lists/tuples of enums 

587 for item in value: 

588 if isinstance(item, Enum): 

589 enum_classes.add(type(item)) 

590 

591 return enum_classes 

592 

593 

594 def _generate_step_parameters(step, default_step, clean_mode=False, name_mappings=None, 

595 required_function_imports=None, required_enum_imports=None): 

596 """Generate FunctionStep constructor parameters using functional introspection.""" 

597 from openhcs.core.steps.abstract import AbstractStep 

598 

599 signatures = [(name, param) for name, param in inspect.signature(FunctionStep.__init__).parameters.items() 

600 if name != 'self' and param.kind != inspect.Parameter.VAR_KEYWORD] + \ 

601 [(name, param) for name, param in inspect.signature(AbstractStep.__init__).parameters.items() 

602 if name != 'self'] 

603 

604 return [f"{name}={generate_readable_function_repr(getattr(step, name, param.default), 1, clean_mode, name_mappings, required_function_imports, required_enum_imports) if name == 'func' else _format_parameter_value(name, getattr(step, name, param.default), name_mappings)}" 

605 for name, param in signatures 

606 if not clean_mode or getattr(step, name, param.default) != getattr(default_step, name, param.default)] 

607 

608 
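The same signature-driven emission in isolation, using a toy step class instead of FunctionStep/AbstractStep (inspect is imported at module top):

class ToyStep:                                   # hypothetical stand-in for FunctionStep
    def __init__(self, func, name="step", enabled=True):
        self.func, self.name, self.enabled = func, name, enabled

step, default = ToyStep(print, name="blur"), ToyStep(print)
params = [(n, p) for n, p in inspect.signature(ToyStep.__init__).parameters.items()
          if n != "self"]
non_default = [f"{n}={getattr(step, n, p.default)!r}"
               for n, p in params
               if getattr(step, n, p.default) != getattr(default, n, p.default)]
# non_default == ["name='blur'"]                 (clean mode keeps only changed parameters)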

609 def generate_complete_pipeline_steps_code(pipeline_steps, clean_mode=False): 

610 """Generate complete Python code for pipeline steps with imports.""" 

611 # Build code with imports and steps 

612 code_lines = ["# Edit this pipeline and save to apply changes", ""] 

613 

614 # Collect imports from ALL data in pipeline steps (functions AND parameters) 

615 all_function_imports = defaultdict(set) 

616 all_enum_imports = defaultdict(set) 

617 all_decorated_functions = set() 

618 

619 for step in pipeline_steps: 

620 # Collect all imports from step (functions, enums, dataclasses) 

621 func_imports, enum_imports, func_decorated = collect_imports_from_data(step.func) 

622 param_imports, param_enums, param_decorated = collect_imports_from_data(step) 

623 

624 # Merge imports 

625 for module, names in func_imports.items(): 

626 all_function_imports[module].update(names) 

627 for module, names in enum_imports.items(): 

628 all_enum_imports[module].update(names) 

629 for module, names in param_imports.items(): 

630 all_function_imports[module].update(names) 

631 for module, names in param_enums.items(): 

632 all_enum_imports[module].update(names) 

633 all_decorated_functions.update(func_decorated) 

634 all_decorated_functions.update(param_decorated) 

635 

636 # Add FunctionStep import (always needed for generated code) 

637 all_function_imports['openhcs.core.steps.function_step'].add('FunctionStep') 

638 

639 # Virtual modules are now automatically created during OpenHCS import 

640 # No need to generate runtime virtual module creation code 

641 

642 # First pass: Generate step code to collect all imports (including from expanded defaults) 

643 # Use temporary name mappings for this pass 

644 temp_import_lines, temp_name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports) 

645 

646 step_code_lines = [] 

647 step_code_lines.append("# Pipeline steps") 

648 step_code_lines.append("pipeline_steps = []") 

649 step_code_lines.append("") 

650 

651 default_step = FunctionStep(func=lambda: None) 

652 for i, step in enumerate(pipeline_steps): 

653 step_code_lines.append(f"# Step {i+1}: {step.name}") 

654 

655 # Generate all FunctionStep parameters automatically using introspection 

656 # Pass import containers to collect additional imports from expanded defaults 

657 step_args = _generate_step_parameters(step, default_step, clean_mode, temp_name_mappings, 

658 all_function_imports, all_enum_imports) 

659 

660 args_str = ",\n ".join(step_args) 

661 step_code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)") 

662 step_code_lines.append(f"pipeline_steps.append(step_{i+1})") 

663 step_code_lines.append("") 

664 

665 # Second pass: Now that we have ALL imports (including from expanded defaults), 

666 # regenerate name mappings to handle any new collisions, then regenerate step code if needed 

667 import_lines, final_name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports) 

668 

669 # If name mappings changed (new collisions detected), regenerate step code with correct aliases 

670 if final_name_mappings != temp_name_mappings:  # coverage: 670 ↛ 671 (condition never true)

671 step_code_lines = [] 

672 step_code_lines.append("# Pipeline steps") 

673 step_code_lines.append("pipeline_steps = []") 

674 step_code_lines.append("") 

675 

676 for i, step in enumerate(pipeline_steps): 

677 step_code_lines.append(f"# Step {i+1}: {step.name}") 

678 step_args = _generate_step_parameters(step, default_step, clean_mode, final_name_mappings, 

679 all_function_imports, all_enum_imports) 

680 args_str = ",\n ".join(step_args) 

681 step_code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)") 

682 step_code_lines.append(f"pipeline_steps.append(step_{i+1})") 

683 step_code_lines.append("") 

684 

685 # Add imports to output 

686 if import_lines:  # coverage: 686 ↛ 692 (condition always true)

687 code_lines.append("# Automatically collected imports") 

688 code_lines.extend(import_lines) 

689 code_lines.append("") 

690 

691 # Add step code 

692 code_lines.extend(step_code_lines) 

693 

694 return "\n".join(code_lines) 

695 

696 

697 def generate_complete_orchestrator_code(plate_paths, pipeline_data, global_config, clean_mode=False, pipeline_config=None, per_plate_configs=None): 

698 """ 

699 Generate complete Python code for orchestrator config with imports. 

700 

701 Args: 

702 plate_paths: List of plate paths 

703 pipeline_data: Dict mapping plate_path to list of steps 

704 global_config: GlobalPipelineConfig instance 

705 clean_mode: If True, only show non-default values 

706 pipeline_config: Single PipelineConfig to apply to all plates (legacy, deprecated) 

707 per_plate_configs: Dict mapping plate_path to PipelineConfig (preferred) 

708 """ 

709 # Build the complete code listing 

710 code_lines = ["# Edit this orchestrator configuration and save to apply changes", ""] 

711 

712 # Collect imports from ALL data in orchestrator (functions, parameters, config) 

713 all_function_imports = defaultdict(set) 

714 all_enum_imports = defaultdict(set) 

715 all_decorated_functions = set() 

716 

717 # Collect from pipeline steps 

718 for plate_path, steps in pipeline_data.items(): 

719 for step in steps: 

720 # Get imports from function patterns 

721 func_imports, enum_imports, func_decorated = collect_imports_from_data(step.func) 

722 # Get imports from step parameters 

723 param_imports, param_enums, param_decorated = collect_imports_from_data(step) 

724 

725 # Merge all imports 

726 for module, names in func_imports.items(): 

727 all_function_imports[module].update(names) 

728 for module, names in enum_imports.items(): 

729 all_enum_imports[module].update(names) 

730 for module, names in param_imports.items(): 

731 all_function_imports[module].update(names) 

732 for module, names in param_enums.items(): 

733 all_enum_imports[module].update(names) 

734 all_decorated_functions.update(func_decorated) 

735 all_decorated_functions.update(param_decorated) 

736 

737 # Don't collect imports from entire global config upfront - only collect what's actually used 

738 # This prevents importing unused classes and keeps the generated code clean 

739 

740 # First pass: Collect imports needed for config representation (e.g., Path) BEFORE formatting imports 

741 config_repr_imports = defaultdict(set) 

742 temp_config_repr = generate_clean_dataclass_repr(global_config, indent_level=0, clean_mode=clean_mode, required_imports=config_repr_imports) 

743 

744 # Merge config representation imports with main imports 

745 for module, names in config_repr_imports.items(): 

746 all_function_imports[module].update(names) 

747 

748 # Don't collect imports from entire pipeline config upfront - let representation generation handle it 

749 # This ensures only actually used imports are collected 

750 

751 # Add always-needed imports for generated code structure 

752 all_function_imports['openhcs.core.steps.function_step'].add('FunctionStep') 

753 all_function_imports['openhcs.core.config'].add('PipelineConfig') 

754 all_function_imports['openhcs.core.orchestrator.orchestrator'].add('PipelineOrchestrator') 

755 all_function_imports['openhcs.core.config'].add('GlobalPipelineConfig') # Always needed for global_config constructor 

756 

757 # Virtual modules are now automatically created during OpenHCS import 

758 # No need for runtime virtual module creation 

759 

760 # First pass: Generate name mappings for collision resolution (don't add imports yet) 

761 import_lines, name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports) 

762 

763 # Generate config representation and collect only the imports it actually needs 

764 config_repr_imports = defaultdict(set) 

765 config_repr = generate_clean_dataclass_repr(global_config, indent_level=0, clean_mode=clean_mode, required_imports=config_repr_imports, name_mappings=name_mappings) 

766 

767 # Add only the imports that are actually used in the config representation 

768 for module, names in config_repr_imports.items(): 

769 all_function_imports[module].update(names) 

770 

771 # Generate readable plate path variables 

772 plate_path_vars = {} 

773 for i, plate_path in enumerate(plate_paths, 1): 

774 # Extract a readable name from the path 

775 path_str = str(plate_path) 

776 plate_name = path_str.split('/')[-1] if '/' in path_str else path_str 

777 # Replace common invalid identifier characters ('-', '.', ' ') with underscores 

778 var_name = f"plate_{i}_{plate_name.replace('-', '_').replace('.', '_').replace(' ', '_')}" 

779 plate_path_vars[plate_path] = var_name 

780 

781 code_lines.extend([ 

782 "# Plate paths", 

783 "" 

784 ]) 

785 

786 # Generate individual plate path variables for readability 

787 for plate_path, var_name in plate_path_vars.items(): 

788 code_lines.append(f'{var_name} = "{plate_path}"') 

789 

790 code_lines.extend([ 

791 "", 

792 "# Collect all plate paths", 

793 f"plate_paths = [{', '.join(plate_path_vars.values())}]", 

794 "", 

795 "# Global configuration", 

796 ]) 

797 

798 code_lines.append(f"global_config = GlobalPipelineConfig(\n{config_repr}\n)") 

799 code_lines.append("") 

800 

801 # Handle per-plate configs (preferred) or single pipeline_config (legacy) 

802 if per_plate_configs: 

803 # NEW APPROACH: Group each plate's config and steps together 

804 code_lines.extend([ 

805 "# Per-plate configurations and pipeline steps", 

806 "per_plate_configs = {}", 

807 "pipeline_data = {}", 

808 "" 

809 ]) 

810 

811 default_step = FunctionStep(func=lambda: None) 

812 

813 # Iterate through plates in order, generating config + steps for each 

814 for plate_path in plate_paths: 

815 plate_name = str(plate_path).split('/')[-1] if '/' in str(plate_path) else str(plate_path) 

816 var_name = plate_path_vars[plate_path] 

817 

818 code_lines.append(f"# ========== Plate: {plate_name} ==========") 

819 code_lines.append("") 

820 

821 # Generate config for this plate 

822 if plate_path in per_plate_configs: 

823 config = per_plate_configs[plate_path] 

824 

825 # Collect imports needed for this pipeline config 

826 pipeline_config_imports = defaultdict(set) 

827 pipeline_config_repr = generate_clean_dataclass_repr( 

828 config, 

829 indent_level=0, 

830 clean_mode=clean_mode, 

831 required_imports=pipeline_config_imports, 

832 name_mappings=name_mappings 

833 ) 

834 

835 # Add the collected imports to the main import collection 

836 for module, names in pipeline_config_imports.items(): 

837 all_function_imports[module].update(names) 

838 

839 code_lines.append(f'# Pipeline config for {plate_name}') 

840 code_lines.append(f'per_plate_configs[{var_name}] = PipelineConfig(\n{pipeline_config_repr}\n)') 

841 code_lines.append("") 

842 

843 # Generate steps for this plate 

844 if plate_path in pipeline_data: 

845 steps = pipeline_data[plate_path] 

846 

847 code_lines.append(f'# Pipeline steps for {plate_name}') 

848 code_lines.append("steps = []") 

849 code_lines.append("") 

850 

851 for i, step in enumerate(steps): 

852 code_lines.append(f"# Step {i+1}: {step.name}") 

853 

854 # Generate all FunctionStep parameters automatically using introspection with name mappings 

855 step_args = _generate_step_parameters(step, default_step, clean_mode, name_mappings) 

856 

857 args_str = ",\n ".join(step_args) 

858 code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)") 

859 code_lines.append(f"steps.append(step_{i+1})") 

860 code_lines.append("") 

861 

862 code_lines.append(f'pipeline_data[{var_name}] = steps') 

863 code_lines.append("") 

864 elif pipeline_config is not None: 

865 # Legacy single pipeline_config for all plates 

866 # Collect imports needed for pipeline config representation 

867 pipeline_config_imports = defaultdict(set) 

868 pipeline_config_repr = generate_clean_dataclass_repr( 

869 pipeline_config, 

870 indent_level=0, 

871 clean_mode=clean_mode, 

872 required_imports=pipeline_config_imports, 

873 name_mappings=name_mappings 

874 ) 

875 

876 # Add the collected imports to the main import collection 

877 for module, names in pipeline_config_imports.items(): 

878 all_function_imports[module].update(names) 

879 

880 # Regenerate import lines with the new imports 

881 import_lines, name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports) 

882 

883 code_lines.extend([ 

884 "# Pipeline configuration (lazy GlobalPipelineConfig)", 

885 f"pipeline_config = PipelineConfig(\n{pipeline_config_repr}\n)", 

886 "" 

887 ]) 

888 

889 # Generate pipeline data 

890 code_lines.extend(["# Pipeline steps", "pipeline_data = {}", ""]) 

891 

892 default_step = FunctionStep(func=lambda: None) 

893 for plate_path, steps in pipeline_data.items(): 

894 # Extract plate name without using Path in generated code 

895 plate_name = str(plate_path).split('/')[-1] if '/' in str(plate_path) else str(plate_path) 

896 var_name = plate_path_vars[plate_path] 

897 

898 code_lines.append(f'# Steps for plate: {plate_name}') 

899 code_lines.append("steps = []") 

900 code_lines.append("") 

901 

902 for i, step in enumerate(steps): 

903 code_lines.append(f"# Step {i+1}: {step.name}") 

904 

905 # Generate all FunctionStep parameters automatically using introspection with name mappings 

906 step_args = _generate_step_parameters(step, default_step, clean_mode, name_mappings) 

907 

908 args_str = ",\n ".join(step_args) 

909 code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)") 

910 code_lines.append(f"steps.append(step_{i+1})") 

911 code_lines.append("") 

912 

913 # Use variable name instead of full path string 

914 code_lines.append(f'pipeline_data[{var_name}] = steps') 

915 code_lines.append("") 

916 else: 

917 # No pipeline config overrides 

918 code_lines.extend([ 

919 "# Pipeline configuration (lazy GlobalPipelineConfig)", 

920 "pipeline_config = PipelineConfig()", 

921 "" 

922 ]) 

923 

924 # Generate pipeline data 

925 code_lines.extend(["# Pipeline steps", "pipeline_data = {}", ""]) 

926 

927 default_step = FunctionStep(func=lambda: None) 

928 for plate_path, steps in pipeline_data.items(): 

929 # Extract plate name without using Path in generated code 

930 plate_name = str(plate_path).split('/')[-1] if '/' in str(plate_path) else str(plate_path) 

931 var_name = plate_path_vars[plate_path] 

932 

933 code_lines.append(f'# Steps for plate: {plate_name}') 

934 code_lines.append("steps = []") 

935 code_lines.append("") 

936 

937 for i, step in enumerate(steps): 

938 code_lines.append(f"# Step {i+1}: {step.name}") 

939 

940 # Generate all FunctionStep parameters automatically using introspection with name mappings 

941 step_args = _generate_step_parameters(step, default_step, clean_mode, name_mappings) 

942 

943 args_str = ",\n ".join(step_args) 

944 code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)") 

945 code_lines.append(f"steps.append(step_{i+1})") 

946 code_lines.append("") 

947 

948 # Use variable name instead of full path string 

949 code_lines.append(f'pipeline_data[{var_name}] = steps') 

950 code_lines.append("") 

951 

952 # Add orchestrator creation example with per-plate configs 

953 if per_plate_configs: 

954 code_lines.extend([ 

955 "# Example: Create orchestrators with per-plate PipelineConfigs", 

956 "# orchestrators = {}", 

957 "# for plate_path in plate_paths:", 

958 "# config = per_plate_configs.get(plate_path, PipelineConfig())", 

959 "# orchestrator = PipelineOrchestrator(", 

960 "# plate_path=plate_path,", 

961 "# pipeline_config=config", 

962 "# )", 

963 "# orchestrators[plate_path] = orchestrator", 

964 "" 

965 ]) 

966 else: 

967 code_lines.extend([ 

968 "# Example: Create orchestrators with PipelineConfig", 

969 "# orchestrators = {}", 

970 "# for plate_path in plate_paths:", 

971 "# orchestrator = PipelineOrchestrator(", 

972 "# plate_path=plate_path,", 

973 "# pipeline_config=pipeline_config", 

974 "# )", 

975 "# orchestrators[plate_path] = orchestrator", 

976 "" 

977 ]) 

978 

979 # Final pass: Generate all imports and prepend to code 

980 final_import_lines, final_name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports) 

981 if final_import_lines: 

982 # Prepend imports to the beginning of the code 

983 final_code_lines = ["# Edit this orchestrator configuration and save to apply changes", ""] 

984 final_code_lines.append("# Automatically collected imports") 

985 final_code_lines.extend(final_import_lines) 

986 final_code_lines.append("") 

987 

988 

989 

990 # Add the rest of the code (skip the first two lines which are the header) 

991 final_code_lines.extend(code_lines[2:]) 

992 return "\n".join(final_code_lines) 

993 else: 

994 return "\n".join(code_lines) 

995 

996 

997 def generate_config_code(config, config_class, clean_mode=True): 

998 """ 

999 Generate Python code representation of a config object. 

1000 

1001 Args: 

1002 config: Config instance (PipelineConfig, GlobalPipelineConfig, etc.) 

1003 config_class: The class of the config 

1004 clean_mode: If True, only show non-default values 

1005 

1006 Returns: 

1007 str: Complete Python code with imports 

1008 """ 

1009 # Collect imports needed for config representation 

1010 required_imports = defaultdict(set) 

1011 config_repr = generate_clean_dataclass_repr( 

1012 config, 

1013 indent_level=0, 

1014 clean_mode=clean_mode, 

1015 required_imports=required_imports 

1016 ) 

1017 

1018 # Add the config class itself to imports 

1019 required_imports[config_class.__module__].add(config_class.__name__) 

1020 

1021 # Build complete code with imports 

1022 code_lines = ["# Configuration Code", ""] 

1023 

1024 # Add imports 

1025 for module, names in sorted(required_imports.items()): 

1026 names_str = ", ".join(sorted(names)) 

1027 code_lines.append(f"from {module} import {names_str}") 

1028 

1029 code_lines.extend(["", f"config = {config_class.__name__}(", config_repr, ")"]) 

1030 

1031 return "\n".join(code_lines) 

1032 

1033 
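Usage sketch; it assumes GlobalPipelineConfig accepts num_workers (the generated run_pipeline above reads global_config.num_workers) and that 4 differs from its default:

from openhcs.core.config import GlobalPipelineConfig

print(generate_config_code(GlobalPipelineConfig(num_workers=4),
                           GlobalPipelineConfig, clean_mode=True))
# # Configuration Code
#
# from openhcs.core.config import GlobalPipelineConfig
#
# config = GlobalPipelineConfig(
#  num_workers=4
# )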

1034 def main(): 

1035 import argparse 

1036 parser = argparse.ArgumentParser(description="Convert OpenHCS debug pickle files to runnable Python scripts.") 

1037 parser.add_argument("pickle_file", help="Path to the input pickle file.") 

1038 parser.add_argument("output_file", nargs='?', default=None, help="Path to the output Python script file (optional).") 

1039 parser.add_argument("--clean", action="store_true", help="Generate a clean script with only non-default parameters.") 

1040 

1041 args = parser.parse_args() 

1042 

1043 convert_pickle_to_python(args.pickle_file, args.output_file, clean_mode=args.clean) 

1044 

1045 if __name__ == "__main__": 

1046 main()