Coverage for openhcs/debug/pickle_to_python.py: 39.0% (562 statements)
#!/usr/bin/env python3
"""
Pickle to Python Converter - Convert OpenHCS debug pickle files to runnable Python scripts
"""

import sys
import dill as pickle
import inspect
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from enum import Enum
import dataclasses
from dataclasses import is_dataclass, fields

from openhcs.core.steps.function_step import FunctionStep


def collect_imports_from_data(data_obj):
    """Extract function, enum, and dataclass imports by traversing a data structure."""
    function_imports = defaultdict(set)
    enum_imports = defaultdict(set)
    decorated_functions = set()

    def register_imports(obj):
        if isinstance(obj, Enum):
            enum_imports[obj.__class__.__module__].add(obj.__class__.__name__)
        elif is_dataclass(obj):
            module = obj.__class__.__module__
            name = obj.__class__.__name__
            function_imports[module].add(name)
            for f in fields(obj):
                value = getattr(obj, f.name)
                if value is not None:
                    register_imports(value)
        elif callable(obj):
            # Skip bound methods (like step.process) - only import standalone functions
            if inspect.ismethod(obj):
                return
            if _is_external_registered_function(obj):
                # Use the actual module path, but under the openhcs namespace:
                # cucim.skimage.filters -> openhcs.cucim.skimage.filters
                original_module = obj.__module__
                virtual_module = f'openhcs.{original_module}'
                function_imports[virtual_module].add(obj.__name__)
                decorated_functions.add(obj.__name__)
            else:
                function_imports[obj.__module__].add(obj.__name__)
        elif isinstance(obj, (list, tuple)):
            for item in obj:
                register_imports(item)
        elif isinstance(obj, dict):
            for value in obj.values():
                register_imports(value)
        elif hasattr(obj, '__dict__') and obj.__dict__:
            for value in obj.__dict__.values():
                register_imports(value)

    register_imports(data_obj)
    return function_imports, enum_imports, decorated_functions
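
# Illustrative sketch (not part of the original module): traversing a small,
# hand-built pattern. The Channel enum below is hypothetical; its module is
# '__main__' when it is defined in the executing script.
#
#   >>> class Channel(Enum):
#   ...     DAPI = 1
#   >>> func_imports, enum_imports, decorated = collect_imports_from_data(
#   ...     {'channel': Channel.DAPI})
#   >>> dict(enum_imports)
#   {'__main__': {'Channel'}}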


def _is_external_registered_function(func):
    """Check whether a function is an external library function registered with OpenHCS."""
    # External functions have slice_by_slice but not the full OpenHCS decorations
    return (hasattr(func, 'slice_by_slice') and
            not hasattr(func, '__processing_contract__') and
            not func.__module__.startswith('openhcs.'))


def _get_function_library_name(func):
    """Get the library name for an external registered function."""
    from openhcs.processing.backends.lib_registry.registry_service import RegistryService

    # Find the function in the registry to get its library name
    all_functions = RegistryService.get_all_functions_with_metadata()
    for func_name, metadata in all_functions.items():
        if metadata.func is func:
            return metadata.registry.library_name

    return None


def _create_openhcs_library_modules():
    """
    Create virtual modules that mirror external library structure under the openhcs namespace.

    This enables namespace-based distinction between raw and OpenHCS-wrapped functions:

    - `from skimage.filters import gaussian` → raw function (NOT pipeline-compatible)
    - `from openhcs.skimage.filters import gaussian` → wrapped, tested, pipeline-ready

    The virtual modules contain only functions that have been:

    1. Runtime tested for OpenHCS compatibility
    2. Characterized for memory types and GPU support
    3. Wrapped with appropriate OpenHCS decorators

    This prevents accidental use of unwrapped functions in pipelines and makes
    import statements self-documenting about pipeline compatibility.
    """
    import types
    from openhcs.processing.backends.lib_registry.registry_service import RegistryService

    # Get all registered functions
    all_functions = RegistryService.get_all_functions_with_metadata()

    # Group functions by their full module path
    functions_by_module = {}
    for func_name, metadata in all_functions.items():
        if _is_external_registered_function(metadata.func):
            original_module = metadata.func.__module__
            virtual_module = f'openhcs.{original_module}'
            if virtual_module not in functions_by_module:
                functions_by_module[virtual_module] = {}
            functions_by_module[virtual_module][metadata.func.__name__] = metadata.func

    # Create a virtual module for each module path
    created_modules = []
    for virtual_module, functions in functions_by_module.items():
        if virtual_module not in sys.modules:
            module = types.ModuleType(virtual_module)
            module.__doc__ = f"Virtual module mirroring {virtual_module.replace('openhcs.', '')} with OpenHCS decorations"
            sys.modules[virtual_module] = module

            # Add all functions from this module
            for func_name, func in functions.items():
                setattr(module, func_name, func)

            created_modules.append(virtual_module)

    return created_modules
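
# Illustrative sketch (assumes scikit-image functions are registered; shown for
# shape only, not actual registry contents):
#
#   >>> _create_openhcs_library_modules()
#   ['openhcs.skimage.filters', ...]
#   >>> from openhcs.skimage.filters import gaussian   # OpenHCS-wrapped, pipeline-ready
#   >>> from skimage.filters import gaussian           # raw variant, NOT pipeline-compatible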


def format_imports_as_strings(function_imports, enum_imports):
    """Convert import dictionaries to a list of import strings with collision resolution."""
    # Merge imports (copy the name sets so the caller's dictionaries are not mutated)
    all_imports = {module: set(names) for module, names in function_imports.items()}
    for module, names in enum_imports.items():
        all_imports.setdefault(module, set()).update(names)

    # Build collision map
    name_to_modules = defaultdict(list)
    for module, names in all_imports.items():
        for name in names:
            name_to_modules[name].append(module)

    import_lines, name_mappings = [], {}
    for module, names in sorted(all_imports.items()):
        if not module or module == 'builtins' or not names:
            continue

        imports = []
        for name in sorted(names):
            if len(name_to_modules[name]) > 1:
                # Name exported by multiple modules: alias it with the module suffix
                qualified = f"{name}_{module.split('.')[-1]}"
                imports.append(f"{name} as {qualified}")
                name_mappings[(name, module)] = qualified
            else:
                imports.append(name)
                name_mappings[(name, module)] = name

        import_lines.append(f"from {module} import {', '.join(imports)}")

    return import_lines, name_mappings
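
# Illustrative sketch of collision resolution with hand-built inputs (module
# names are stand-ins):
#
#   >>> lines, mapping = format_imports_as_strings(
#   ...     {'openhcs.a': {'gaussian'}, 'openhcs.b': {'gaussian'}}, {})
#   >>> lines
#   ['from openhcs.a import gaussian as gaussian_a', 'from openhcs.b import gaussian as gaussian_b']
#   >>> mapping[('gaussian', 'openhcs.a')]
#   'gaussian_a'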


def generate_complete_function_pattern_code(func_obj, indent=0, clean_mode=False):
    """Generate complete Python code for a function pattern, including imports."""
    # Collect imports from this pattern first to get name mappings
    function_imports, enum_imports, decorated_functions = collect_imports_from_data(func_obj)

    # Create containers for additional imports discovered during repr generation
    additional_function_imports = defaultdict(set)
    additional_enum_imports = defaultdict(set)

    # Merge the initial imports into the containers
    for module, names in function_imports.items():
        additional_function_imports[module].update(names)
    for module, names in enum_imports.items():
        additional_enum_imports[module].update(names)

    # First pass: generate the pattern representation to collect all imports
    # (including those from expanded defaults), using temporary name mappings.
    temp_import_lines, temp_name_mappings = format_imports_as_strings(additional_function_imports, additional_enum_imports)
    pattern_repr = generate_readable_function_repr(
        func_obj, indent, clean_mode, temp_name_mappings,
        required_function_imports=additional_function_imports,
        required_enum_imports=additional_enum_imports
    )

    # Second pass: with ALL imports known (including those from expanded defaults),
    # regenerate the name mappings to handle any new collisions.
    import_lines, final_name_mappings = format_imports_as_strings(additional_function_imports, additional_enum_imports)

    # If the name mappings changed (new collisions detected), regenerate the
    # pattern representation with the correct aliases.
    if final_name_mappings != temp_name_mappings:
        pattern_repr = generate_readable_function_repr(
            func_obj, indent, clean_mode, final_name_mappings,
            required_function_imports=additional_function_imports,
            required_enum_imports=additional_enum_imports
        )

    # Build the complete code
    code_lines = ["# Edit this function pattern and save to apply changes", ""]
    if import_lines:
        code_lines.append("# Dynamic imports")
        code_lines.extend(import_lines)
        code_lines.append("")
    code_lines.append(f"pattern = {pattern_repr}")

    return "\n".join(code_lines)
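
# Illustrative sketch of the output shape for a single-function pattern in
# clean mode (the gaussian import path is a stand-in for a registered function):
#
#   # Edit this function pattern and save to apply changes
#
#   # Dynamic imports
#   from openhcs.skimage.filters import gaussian
#
#   pattern = gaussian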


def _value_to_repr(value, required_imports=None, name_mappings=None):
    """Convert a value to its Python representation string, tracking required imports."""
    if isinstance(value, Enum):
        enum_class_name = value.__class__.__name__
        enum_module = value.__class__.__module__

        # Collect the import for the enum class
        if required_imports is not None and enum_module and enum_class_name:
            required_imports[enum_module].add(enum_class_name)

        # Use the name mapping, if available, to handle collisions
        if name_mappings and (enum_class_name, enum_module) in name_mappings:
            mapped_name = name_mappings[(enum_class_name, enum_module)]
            return f"{mapped_name}.{value.name}"
        else:
            return f"{enum_class_name}.{value.name}"
    elif isinstance(value, str):
        # Use repr() for strings to properly escape newlines and special characters
        return repr(value)
    elif isinstance(value, Path):
        # Track that we need the Path import
        if required_imports is not None:
            required_imports['pathlib'].add('Path')

        # Use the name mapping, if available
        path_name = 'Path'
        if name_mappings and ('Path', 'pathlib') in name_mappings:
            path_name = name_mappings[('Path', 'pathlib')]

        return f'{path_name}({repr(str(value))})'
    return repr(value)
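
# Illustrative sketch of the repr forms produced (the Backend enum here is a
# local stand-in, not the openhcs constant):
#
#   >>> _value_to_repr('a\nb')
#   "'a\\nb'"
#   >>> imports = defaultdict(set)
#   >>> _value_to_repr(Path('/data/plate'), required_imports=imports)
#   "Path('/data/plate')"
#   >>> class Backend(Enum):
#   ...     DISK = 'disk'
#   >>> _value_to_repr(Backend.DISK, required_imports=imports)
#   'Backend.DISK'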


def generate_clean_dataclass_repr(instance, indent_level=0, clean_mode=False, required_imports=None, name_mappings=None):
    """
    Generate a clean, readable Python representation of a dataclass instance,
    omitting fields that are set to their default values if clean_mode is True.
    This function is recursive and handles nested dataclasses.
    """
    if not dataclasses.is_dataclass(instance):
        return _value_to_repr(instance, required_imports, name_mappings)

    lines = []
    indent_str = "    " * indent_level
    child_indent_str = "    " * (indent_level + 1)

    # Get a default instance of the same class for comparison.
    # CRITICAL FIX: for lazy dataclasses, create the instance with raw values to
    # preserve the None vs concrete distinction.
    if hasattr(instance, '_resolve_field_value'):
        # This is a lazy dataclass - create an empty instance without triggering resolution
        default_instance = object.__new__(instance.__class__)

        # Set all fields to None (their raw default state) using object.__setattr__
        for field in dataclasses.fields(instance):
            object.__setattr__(default_instance, field.name, None)

        # Initialize any required lazy dataclass attributes
        if hasattr(instance.__class__, '_is_lazy_dataclass'):
            object.__setattr__(default_instance, '_is_lazy_dataclass', True)
    else:
        # Regular dataclass - use the normal constructor
        default_instance = instance.__class__()

    for field in dataclasses.fields(instance):
        field_name = field.name

        # CRITICAL FIX: for lazy dataclasses, use the raw stored value to avoid
        # triggering resolution. This ensures tier 3 code generation only shows
        # explicitly set pipeline config fields.
        if hasattr(instance, '_resolve_field_value'):
            # Lazy dataclass - get the raw stored value without triggering lazy resolution
            current_value = object.__getattribute__(instance, field_name)
            default_value = object.__getattribute__(default_instance, field_name)
        else:
            # Regular dataclass - use normal getattr
            current_value = getattr(instance, field_name)
            default_value = getattr(default_instance, field_name)

        if clean_mode and current_value == default_value:
            continue

        if dataclasses.is_dataclass(current_value):
            # Recursively generate the representation for nested dataclasses
            nested_repr = generate_clean_dataclass_repr(current_value, indent_level + 1, clean_mode, required_imports, name_mappings)

            # Only include the nested dataclass if it has non-default content
            if nested_repr.strip():
                # Collect the import for the nested dataclass
                if required_imports is not None:
                    class_module = current_value.__class__.__module__
                    class_name = current_value.__class__.__name__
                    if class_module and class_name:
                        required_imports[class_module].add(class_name)

                lines.append(f"{child_indent_str}{field_name}={current_value.__class__.__name__}(\n{nested_repr}\n{child_indent_str})")
            elif not clean_mode:
                # In non-clean mode, still include empty nested dataclasses
                if required_imports is not None:
                    class_module = current_value.__class__.__module__
                    class_name = current_value.__class__.__name__
                    if class_module and class_name:
                        required_imports[class_module].add(class_name)

                lines.append(f"{child_indent_str}{field_name}={current_value.__class__.__name__}()")
        else:
            value_repr = _value_to_repr(current_value, required_imports, name_mappings)
            lines.append(f"{child_indent_str}{field_name}={value_repr}")

    if not lines:
        return ""  # Return an empty string if all fields were default in clean_mode

    return ",\n".join(lines)
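
# Illustrative sketch of clean_mode on a hand-built dataclass (assuming the
# four-space indent unit reconstructed above):
#
#   >>> @dataclasses.dataclass
#   ... class Cfg:
#   ...     workers: int = 4
#   ...     debug: bool = False
#   >>> generate_clean_dataclass_repr(Cfg(workers=8), clean_mode=True)
#   '    workers=8'
#   >>> generate_clean_dataclass_repr(Cfg(), clean_mode=True)
#   ''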


def convert_pickle_to_python(pickle_path, output_path=None, clean_mode=False):
    """Convert an OpenHCS debug pickle file to a runnable Python script."""

    pickle_file = Path(pickle_path)
    if not pickle_file.exists():
        print(f"Error: Pickle file not found: {pickle_path}")
        return

    if output_path is None:
        output_path = pickle_file.with_suffix('.py')

    print(f"Converting {pickle_file} to {output_path} (Clean Mode: {clean_mode})")

    try:
        with open(pickle_file, 'rb') as f:
            data = pickle.load(f)

        # Generate the Python script
        with open(output_path, 'w') as f:
            f.write('#!/usr/bin/env python3\n')
            f.write('"""\n')
            f.write(f'OpenHCS Pipeline Script - Generated from {pickle_file.name}\n')
            f.write(f'Generated: {datetime.now()}\n')
            f.write('"""\n\n')

            # Imports
            f.write('import sys\n')
            f.write('import os\n')
            f.write('from pathlib import Path\n\n')
            f.write('# Add OpenHCS to path\n')
            f.write('sys.path.insert(0, "/home/ts/code/projects/openhcs")\n\n')

            f.write('from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator\n')
            f.write('from openhcs.core.steps.function_step import FunctionStep\n')
            f.write('from openhcs.core.config import (GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig,\n'
                    '                                 MaterializationBackend, ZarrCompressor, ZarrChunkStrategy)\n')
            f.write('from openhcs.constants.constants import VariableComponents, Backend, Microscope\n\n')

            # Use the extracted function for orchestrator generation
            orchestrator_code = generate_complete_orchestrator_code(
                data["plate_paths"], data["pipeline_data"], data['global_config'], clean_mode
            )

            # Write the orchestrator code (it already includes dynamic imports)
            f.write(orchestrator_code)
            f.write('\n\n')

            # ... (rest of the file remains the same for now) ...
            f.write('def setup_signal_handlers():\n')
            f.write('    """Setup signal handlers to kill all child processes and threads on Ctrl+C."""\n')
            f.write('    import signal\n')
            f.write('    import os\n')
            f.write('    import sys\n\n')
            f.write('    def cleanup_and_exit(signum, frame):\n')
            f.write('        print(f"\\n🔥 Signal {signum} received! Cleaning up all processes and threads...")\n\n')
            f.write('        os._exit(1)\n\n')
            f.write('    signal.signal(signal.SIGINT, cleanup_and_exit)\n')
            f.write('    signal.signal(signal.SIGTERM, cleanup_and_exit)\n\n')

            f.write('def run_pipeline():\n')
            f.write('    os.environ["OPENHCS_SUBPROCESS_MODE"] = "1"\n')
            f.write('    plate_paths, pipeline_data, global_config = create_pipeline()\n')
            f.write('    from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry\n')
            f.write('    setup_global_gpu_registry(global_config=global_config)\n')
            f.write('    for plate_path in plate_paths:\n')
            f.write('        orchestrator = PipelineOrchestrator(plate_path)\n')
            f.write('        orchestrator.initialize()\n')
            f.write('        compiled_contexts = orchestrator.compile_pipelines(pipeline_data[plate_path])\n')
            f.write('        orchestrator.execute_compiled_plate(\n')
            f.write('            pipeline_definition=pipeline_data[plate_path],\n')
            f.write('            compiled_contexts=compiled_contexts,\n')
            f.write('            max_workers=global_config.num_workers\n')
            f.write('        )\n\n')

            f.write('if __name__ == "__main__":\n')
            f.write('    setup_signal_handlers()\n')
            f.write('    run_pipeline()\n')

        print(f"✅ Successfully converted to {output_path}")
        print(f"You can now run: python {output_path}")

    except Exception as e:
        print(f"Error converting pickle file: {e}")
        import traceback
        traceback.print_exc()


def generate_readable_function_repr(func_obj, indent=0, clean_mode=False, name_mappings=None,
                                    required_function_imports=None, required_enum_imports=None):
    """Generate a readable Python representation with collision-resolved function names."""
    indent_str = "    " * indent
    next_indent_str = "    " * (indent + 1)
    name_mappings = name_mappings or {}

    # Get the qualified function name for collisions (handles both original and virtual modules)
    def get_name(f):
        if not callable(f):
            return str(f)
        # Try the virtual module first (for external functions), then the original module
        virtual_module = f'openhcs.{f.__module__}'
        return (name_mappings.get((f.__name__, virtual_module), None) or
                name_mappings.get((f.__name__, f.__module__), f.__name__))

    if callable(func_obj):
        return get_name(func_obj)

    elif isinstance(func_obj, tuple) and len(func_obj) == 2 and callable(func_obj[0]):
        func, args = func_obj

        if not args and clean_mode:
            return get_name(func)

        # Get the function's signature defaults
        try:
            defaults = {k: v.default for k, v in inspect.signature(func).parameters.items()
                        if v.default is not inspect.Parameter.empty}
        except (ValueError, TypeError):
            defaults = {}

        if clean_mode:
            # Clean mode: only show non-default values
            final_args = {k: v for k, v in args.items()
                          if k not in defaults or v != defaults[k]}
        else:
            # Explicit mode: show ALL parameters (merge provided args with defaults).
            # Start with all defaults, then override with the provided args.
            final_args = {**defaults, **args}

            # Collect imports from default values that weren't in the original args
            if required_function_imports is not None or required_enum_imports is not None:
                for param_name, default_value in defaults.items():
                    if param_name not in args:  # Only collect for newly added defaults
                        # Collect imports from this default value
                        if isinstance(default_value, Enum):
                            if required_enum_imports is not None:
                                enum_module = default_value.__class__.__module__
                                enum_class = default_value.__class__.__name__
                                required_enum_imports[enum_module].add(enum_class)
                        elif is_dataclass(default_value):
                            if required_function_imports is not None:
                                dc_module = default_value.__class__.__module__
                                dc_class = default_value.__class__.__name__
                                required_function_imports[dc_module].add(dc_class)

        if not final_args:
            return get_name(func) if clean_mode else f"({get_name(func)}, {{}})"

        args_items = [f"{next_indent_str} '{k}': {generate_readable_function_repr(v, indent + 2, clean_mode, name_mappings, required_function_imports, required_enum_imports)}"
                      for k, v in final_args.items()]
        args_str = "{\n" + ",\n".join(args_items) + f"\n{next_indent_str}}}"
        return f"({get_name(func)}, {args_str})"

    elif isinstance(func_obj, list):
        if clean_mode and len(func_obj) == 1:
            return generate_readable_function_repr(func_obj[0], indent, clean_mode, name_mappings, required_function_imports, required_enum_imports)
        if not func_obj:
            return "[]"
        items = [generate_readable_function_repr(item, indent, clean_mode, name_mappings, required_function_imports, required_enum_imports) for item in func_obj]
        separator = f',\n{next_indent_str}'
        return f"[\n{next_indent_str}{separator.join(items)}\n{indent_str}]"

    elif isinstance(func_obj, dict):
        if not func_obj:
            return "{}"
        items = [f"{next_indent_str}'{k}': {generate_readable_function_repr(v, indent, clean_mode, name_mappings, required_function_imports, required_enum_imports)}"
                 for k, v in func_obj.items()]
        separator = ',\n'
        return f"{{{separator.join(items)}\n{indent_str}}}"

    else:
        return _value_to_repr(func_obj, required_imports=required_enum_imports, name_mappings=name_mappings)
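
# Illustrative sketch: a (func, kwargs) pattern in clean mode renders roughly
# as below (exact whitespace depends on the indent level; gaussian/sigma are
# stand-ins):
#
#   >>> def gaussian(image, sigma=1.0): ...
#   >>> print(generate_readable_function_repr((gaussian, {'sigma': 2.0}), clean_mode=True))
#   (gaussian, {
#        'sigma': 2.0
#       })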


def _format_parameter_value(param_name, value, name_mappings=None):
    """Format parameter values with lazy dataclass preservation."""
    if isinstance(value, Enum):
        enum_class_name = value.__class__.__name__
        enum_module = value.__class__.__module__

        # Use the name mapping, if available, to handle collisions
        if name_mappings and (enum_class_name, enum_module) in name_mappings:
            mapped_name = name_mappings[(enum_class_name, enum_module)]
            return f"{mapped_name}.{value.name}"
        else:
            return f"{enum_class_name}.{value.name}"
    elif isinstance(value, str):
        return f'"{value}"'
    elif isinstance(value, list) and value and isinstance(value[0], Enum):
        formatted_items = []
        for item in value:
            enum_class_name = item.__class__.__name__
            enum_module = item.__class__.__module__

            # Use the name mapping, if available, to handle collisions
            if name_mappings and (enum_class_name, enum_module) in name_mappings:
                mapped_name = name_mappings[(enum_class_name, enum_module)]
                formatted_items.append(f"{mapped_name}.{item.name}")
            else:
                formatted_items.append(f"{enum_class_name}.{item.name}")

        return f"[{', '.join(formatted_items)}]"
    elif is_dataclass(value) and 'Lazy' in value.__class__.__name__:
        # Preserve lazy behavior by only including explicitly set fields
        class_name = value.__class__.__name__
        explicit_args = [
            f"{f.name}={_format_parameter_value(f.name, object.__getattribute__(value, f.name), name_mappings)}"
            for f in fields(value)
            if object.__getattribute__(value, f.name) is not None
        ]
        return f"{class_name}({', '.join(explicit_args)})" if explicit_args else f"{class_name}()"
    else:
        return repr(value)


def _collect_dataclass_classes_from_object(obj, visited=None):
    """Recursively collect dataclass classes that will be referenced in generated code."""
    if visited is None:
        visited = set()

    if id(obj) in visited:
        return set(), set()
    visited.add(id(obj))

    dataclass_classes = set()
    enum_classes = set()

    if is_dataclass(obj):
        dataclass_classes.add(obj.__class__)
        for field in fields(obj):
            nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(getattr(obj, field.name), visited)
            dataclass_classes.update(nested_dataclasses)
            enum_classes.update(nested_enums)
    elif isinstance(obj, Enum):
        enum_classes.add(obj.__class__)
    elif isinstance(obj, (list, tuple)):
        for item in obj:
            nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(item, visited)
            dataclass_classes.update(nested_dataclasses)
            enum_classes.update(nested_enums)
    elif isinstance(obj, dict):
        for value in obj.values():
            nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(value, visited)
            dataclass_classes.update(nested_dataclasses)
            enum_classes.update(nested_enums)

    return dataclass_classes, enum_classes


def _collect_enum_classes_from_step(step):
    """Collect enum classes referenced in step parameters for import generation."""
    from openhcs.core.steps.function_step import FunctionStep
    import inspect
    from enum import Enum

    enum_classes = set()
    sig = inspect.signature(FunctionStep.__init__)

    for param_name, param in sig.parameters.items():
        # Skip constructor-specific parameters and **kwargs
        if param_name in ['self', 'func'] or param.kind == inspect.Parameter.VAR_KEYWORD:
            continue

        value = getattr(step, param_name, param.default)
        if isinstance(value, Enum):
            enum_classes.add(type(value))
        elif isinstance(value, (list, tuple)):
            # Check for lists/tuples of enums
            for item in value:
                if isinstance(item, Enum):
                    enum_classes.add(type(item))

    return enum_classes


def _generate_step_parameters(step, default_step, clean_mode=False, name_mappings=None,
                              required_function_imports=None, required_enum_imports=None):
    """Generate FunctionStep constructor parameters using functional introspection."""
    from openhcs.core.steps.abstract import AbstractStep

    signatures = [(name, param) for name, param in inspect.signature(FunctionStep.__init__).parameters.items()
                  if name != 'self' and param.kind != inspect.Parameter.VAR_KEYWORD] + \
                 [(name, param) for name, param in inspect.signature(AbstractStep.__init__).parameters.items()
                  if name != 'self']

    parameters = []
    for name, param in signatures:
        value = getattr(step, name, param.default)
        # In clean mode, skip parameters that match the default step's values
        if clean_mode and value == getattr(default_step, name, param.default):
            continue
        # The 'func' parameter gets the full pattern representation; everything
        # else is formatted as a plain value.
        if name == 'func':
            value_repr = generate_readable_function_repr(value, 1, clean_mode, name_mappings,
                                                         required_function_imports, required_enum_imports)
        else:
            value_repr = _format_parameter_value(name, value, name_mappings)
        parameters.append(f"{name}={value_repr}")
    return parameters


def generate_complete_pipeline_steps_code(pipeline_steps, clean_mode=False):
    """Generate complete Python code for pipeline steps, including imports."""
    # Build the code with imports and steps
    code_lines = ["# Edit this pipeline and save to apply changes", ""]

    # Collect imports from ALL data in the pipeline steps (functions AND parameters)
    all_function_imports = defaultdict(set)
    all_enum_imports = defaultdict(set)
    all_decorated_functions = set()

    for step in pipeline_steps:
        # Collect all imports from the step (functions, enums, dataclasses)
        func_imports, enum_imports, func_decorated = collect_imports_from_data(step.func)
        param_imports, param_enums, param_decorated = collect_imports_from_data(step)

        # Merge imports
        for module, names in func_imports.items():
            all_function_imports[module].update(names)
        for module, names in enum_imports.items():
            all_enum_imports[module].update(names)
        for module, names in param_imports.items():
            all_function_imports[module].update(names)
        for module, names in param_enums.items():
            all_enum_imports[module].update(names)
        all_decorated_functions.update(func_decorated)
        all_decorated_functions.update(param_decorated)

    # Add the FunctionStep import (always needed for generated code)
    all_function_imports['openhcs.core.steps.function_step'].add('FunctionStep')

    # Virtual modules are created automatically during OpenHCS import;
    # no runtime virtual-module creation code is needed here.

    # First pass: generate the step code to collect all imports (including from
    # expanded defaults), using temporary name mappings.
    temp_import_lines, temp_name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports)

    step_code_lines = []
    step_code_lines.append("# Pipeline steps")
    step_code_lines.append("pipeline_steps = []")
    step_code_lines.append("")

    default_step = FunctionStep(func=lambda: None)
    for i, step in enumerate(pipeline_steps):
        step_code_lines.append(f"# Step {i+1}: {step.name}")

        # Generate all FunctionStep parameters automatically using introspection.
        # Pass the import containers to collect additional imports from expanded defaults.
        step_args = _generate_step_parameters(step, default_step, clean_mode, temp_name_mappings,
                                              all_function_imports, all_enum_imports)

        args_str = ",\n    ".join(step_args)
        step_code_lines.append(f"step_{i+1} = FunctionStep(\n    {args_str}\n)")
        step_code_lines.append(f"pipeline_steps.append(step_{i+1})")
        step_code_lines.append("")

    # Second pass: with ALL imports known (including from expanded defaults),
    # regenerate the name mappings to handle any new collisions.
    import_lines, final_name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports)

    # If the name mappings changed (new collisions detected), regenerate the step
    # code with the correct aliases.
    if final_name_mappings != temp_name_mappings:
        step_code_lines = []
        step_code_lines.append("# Pipeline steps")
        step_code_lines.append("pipeline_steps = []")
        step_code_lines.append("")

        for i, step in enumerate(pipeline_steps):
            step_code_lines.append(f"# Step {i+1}: {step.name}")
            step_args = _generate_step_parameters(step, default_step, clean_mode, final_name_mappings,
                                                  all_function_imports, all_enum_imports)
            args_str = ",\n    ".join(step_args)
            step_code_lines.append(f"step_{i+1} = FunctionStep(\n    {args_str}\n)")
            step_code_lines.append(f"pipeline_steps.append(step_{i+1})")
            step_code_lines.append("")

    # Add the imports to the output
    if import_lines:
        code_lines.append("# Automatically collected imports")
        code_lines.extend(import_lines)
        code_lines.append("")

    # Add the step code
    code_lines.extend(step_code_lines)

    return "\n".join(code_lines)
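
# Illustrative sketch of the generated script shape (step and function names
# are stand-ins):
#
#   # Edit this pipeline and save to apply changes
#
#   # Automatically collected imports
#   from openhcs.core.steps.function_step import FunctionStep
#   from openhcs.skimage.filters import gaussian
#
#   # Pipeline steps
#   pipeline_steps = []
#
#   # Step 1: blur
#   step_1 = FunctionStep(
#       func=gaussian,
#       name="blur"
#   )
#   pipeline_steps.append(step_1)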


def generate_complete_orchestrator_code(plate_paths, pipeline_data, global_config, clean_mode=False, pipeline_config=None, per_plate_configs=None):
    """
    Generate complete Python code for orchestrator config with imports.

    Args:
        plate_paths: List of plate paths
        pipeline_data: Dict mapping plate_path to list of steps
        global_config: GlobalPipelineConfig instance
        clean_mode: If True, only show non-default values
        pipeline_config: Single PipelineConfig to apply to all plates (legacy, deprecated)
        per_plate_configs: Dict mapping plate_path to PipelineConfig (preferred)
    """
    # Build the complete code
    code_lines = ["# Edit this orchestrator configuration and save to apply changes", ""]

    # Collect imports from ALL data in the orchestrator (functions, parameters, config)
    all_function_imports = defaultdict(set)
    all_enum_imports = defaultdict(set)
    all_decorated_functions = set()

    # Collect from pipeline steps
    for plate_path, steps in pipeline_data.items():
        for step in steps:
            # Imports from function patterns
            func_imports, enum_imports, func_decorated = collect_imports_from_data(step.func)
            # Imports from step parameters
            param_imports, param_enums, param_decorated = collect_imports_from_data(step)

            # Merge all imports
            for module, names in func_imports.items():
                all_function_imports[module].update(names)
            for module, names in enum_imports.items():
                all_enum_imports[module].update(names)
            for module, names in param_imports.items():
                all_function_imports[module].update(names)
            for module, names in param_enums.items():
                all_enum_imports[module].update(names)
            all_decorated_functions.update(func_decorated)
            all_decorated_functions.update(param_decorated)

    # Don't collect imports from the entire global config upfront - only collect
    # what's actually used. This prevents importing unused classes and keeps the
    # generated code clean.

    # First pass: collect the imports needed for the config representation
    # (e.g., Path) BEFORE formatting the import lines
    config_repr_imports = defaultdict(set)
    temp_config_repr = generate_clean_dataclass_repr(global_config, indent_level=0, clean_mode=clean_mode, required_imports=config_repr_imports)

    # Merge the config representation imports into the main imports
    for module, names in config_repr_imports.items():
        all_function_imports[module].update(names)

    # Don't collect imports from the entire pipeline config upfront - let
    # representation generation handle it, so only actually used imports are collected.

    # Add always-needed imports for the generated code structure
    all_function_imports['openhcs.core.steps.function_step'].add('FunctionStep')
    all_function_imports['openhcs.core.config'].add('PipelineConfig')
    all_function_imports['openhcs.core.orchestrator.orchestrator'].add('PipelineOrchestrator')
    all_function_imports['openhcs.core.config'].add('GlobalPipelineConfig')  # Always needed for the global_config constructor

    # Virtual modules are created automatically during OpenHCS import;
    # no runtime virtual-module creation code is needed here.

    # First pass: generate name mappings for collision resolution (imports are not final yet)
    import_lines, name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports)

    # Generate the config representation and collect only the imports it actually needs
    config_repr_imports = defaultdict(set)
    config_repr = generate_clean_dataclass_repr(global_config, indent_level=0, clean_mode=clean_mode, required_imports=config_repr_imports, name_mappings=name_mappings)

    # Add only the imports that are actually used in the config representation
    for module, names in config_repr_imports.items():
        all_function_imports[module].update(names)

    # Generate readable plate path variables
    plate_path_vars = {}
    for i, plate_path in enumerate(plate_paths, 1):
        # Extract a readable name from the path
        path_str = str(plate_path)
        plate_name = path_str.split('/')[-1] if '/' in path_str else path_str
        # Replace invalid Python identifier characters with underscores
        var_name = f"plate_{i}_{plate_name.replace('-', '_').replace('.', '_').replace(' ', '_')}"
        plate_path_vars[plate_path] = var_name

    code_lines.extend([
        "# Plate paths",
        ""
    ])

    # Generate individual plate path variables for readability
    for plate_path, var_name in plate_path_vars.items():
        code_lines.append(f'{var_name} = "{plate_path}"')

    code_lines.extend([
        "",
        "# Collect all plate paths",
        f"plate_paths = [{', '.join(plate_path_vars.values())}]",
        "",
        "# Global configuration",
    ])

    code_lines.append(f"global_config = GlobalPipelineConfig(\n{config_repr}\n)")
    code_lines.append("")

    # Handle per-plate configs (preferred) or a single pipeline_config (legacy)
    if per_plate_configs:
        # NEW APPROACH: group each plate's config and steps together
        code_lines.extend([
            "# Per-plate configurations and pipeline steps",
            "per_plate_configs = {}",
            "pipeline_data = {}",
            ""
        ])

        default_step = FunctionStep(func=lambda: None)

        # Iterate through the plates in order, generating config + steps for each
        for plate_path in plate_paths:
            plate_name = str(plate_path).split('/')[-1] if '/' in str(plate_path) else str(plate_path)
            var_name = plate_path_vars[plate_path]

            code_lines.append(f"# ========== Plate: {plate_name} ==========")
            code_lines.append("")

            # Generate the config for this plate
            if plate_path in per_plate_configs:
                config = per_plate_configs[plate_path]

                # Collect the imports needed for this pipeline config
                pipeline_config_imports = defaultdict(set)
                pipeline_config_repr = generate_clean_dataclass_repr(
                    config,
                    indent_level=0,
                    clean_mode=clean_mode,
                    required_imports=pipeline_config_imports,
                    name_mappings=name_mappings
                )

                # Add the collected imports to the main import collection
                for module, names in pipeline_config_imports.items():
                    all_function_imports[module].update(names)

                code_lines.append(f'# Pipeline config for {plate_name}')
                code_lines.append(f'per_plate_configs[{var_name}] = PipelineConfig(\n{pipeline_config_repr}\n)')
                code_lines.append("")

            # Generate the steps for this plate
            if plate_path in pipeline_data:
                steps = pipeline_data[plate_path]

                code_lines.append(f'# Pipeline steps for {plate_name}')
                code_lines.append("steps = []")
                code_lines.append("")

                for i, step in enumerate(steps):
                    code_lines.append(f"# Step {i+1}: {step.name}")

                    # Generate all FunctionStep parameters automatically using
                    # introspection with name mappings
                    step_args = _generate_step_parameters(step, default_step, clean_mode, name_mappings)

                    args_str = ",\n    ".join(step_args)
                    code_lines.append(f"step_{i+1} = FunctionStep(\n    {args_str}\n)")
                    code_lines.append(f"steps.append(step_{i+1})")
                    code_lines.append("")

                code_lines.append(f'pipeline_data[{var_name}] = steps')
                code_lines.append("")
    elif pipeline_config is not None:
        # Legacy: a single pipeline_config for all plates.
        # Collect the imports needed for the pipeline config representation.
        pipeline_config_imports = defaultdict(set)
        pipeline_config_repr = generate_clean_dataclass_repr(
            pipeline_config,
            indent_level=0,
            clean_mode=clean_mode,
            required_imports=pipeline_config_imports,
            name_mappings=name_mappings
        )

        # Add the collected imports to the main import collection
        for module, names in pipeline_config_imports.items():
            all_function_imports[module].update(names)

        # Regenerate the import lines with the new imports
        import_lines, name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports)

        code_lines.extend([
            "# Pipeline configuration (lazy GlobalPipelineConfig)",
            f"pipeline_config = PipelineConfig(\n{pipeline_config_repr}\n)",
            ""
        ])

        # Generate the pipeline data
        code_lines.extend(["# Pipeline steps", "pipeline_data = {}", ""])

        default_step = FunctionStep(func=lambda: None)
        for plate_path, steps in pipeline_data.items():
            # Extract the plate name without using Path in the generated code
            plate_name = str(plate_path).split('/')[-1] if '/' in str(plate_path) else str(plate_path)
            var_name = plate_path_vars[plate_path]

            code_lines.append(f'# Steps for plate: {plate_name}')
            code_lines.append("steps = []")
            code_lines.append("")

            for i, step in enumerate(steps):
                code_lines.append(f"# Step {i+1}: {step.name}")

                # Generate all FunctionStep parameters automatically using
                # introspection with name mappings
                step_args = _generate_step_parameters(step, default_step, clean_mode, name_mappings)

                args_str = ",\n    ".join(step_args)
                code_lines.append(f"step_{i+1} = FunctionStep(\n    {args_str}\n)")
                code_lines.append(f"steps.append(step_{i+1})")
                code_lines.append("")

            # Use the variable name instead of the full path string
            code_lines.append(f'pipeline_data[{var_name}] = steps')
            code_lines.append("")
    else:
        # No pipeline config overrides
        code_lines.extend([
            "# Pipeline configuration (lazy GlobalPipelineConfig)",
            "pipeline_config = PipelineConfig()",
            ""
        ])

        # Generate the pipeline data
        code_lines.extend(["# Pipeline steps", "pipeline_data = {}", ""])

        default_step = FunctionStep(func=lambda: None)
        for plate_path, steps in pipeline_data.items():
            # Extract the plate name without using Path in the generated code
            plate_name = str(plate_path).split('/')[-1] if '/' in str(plate_path) else str(plate_path)
            var_name = plate_path_vars[plate_path]

            code_lines.append(f'# Steps for plate: {plate_name}')
            code_lines.append("steps = []")
            code_lines.append("")

            for i, step in enumerate(steps):
                code_lines.append(f"# Step {i+1}: {step.name}")

                # Generate all FunctionStep parameters automatically using
                # introspection with name mappings
                step_args = _generate_step_parameters(step, default_step, clean_mode, name_mappings)

                args_str = ",\n    ".join(step_args)
                code_lines.append(f"step_{i+1} = FunctionStep(\n    {args_str}\n)")
                code_lines.append(f"steps.append(step_{i+1})")
                code_lines.append("")

            # Use the variable name instead of the full path string
            code_lines.append(f'pipeline_data[{var_name}] = steps')
            code_lines.append("")

    # Add an orchestrator creation example matching the config style
    if per_plate_configs:
        code_lines.extend([
            "# Example: Create orchestrators with per-plate PipelineConfigs",
            "# orchestrators = {}",
            "# for plate_path in plate_paths:",
            "#     config = per_plate_configs.get(plate_path, PipelineConfig())",
            "#     orchestrator = PipelineOrchestrator(",
            "#         plate_path=plate_path,",
            "#         pipeline_config=config",
            "#     )",
            "#     orchestrators[plate_path] = orchestrator",
            ""
        ])
    else:
        code_lines.extend([
            "# Example: Create orchestrators with PipelineConfig",
            "# orchestrators = {}",
            "# for plate_path in plate_paths:",
            "#     orchestrator = PipelineOrchestrator(",
            "#         plate_path=plate_path,",
            "#         pipeline_config=pipeline_config",
            "#     )",
            "#     orchestrators[plate_path] = orchestrator",
            ""
        ])

    # Final pass: generate all imports and prepend them to the code
    final_import_lines, final_name_mappings = format_imports_as_strings(all_function_imports, all_enum_imports)
    if final_import_lines:
        # Prepend the imports at the beginning of the code
        final_code_lines = ["# Edit this orchestrator configuration and save to apply changes", ""]
        final_code_lines.append("# Automatically collected imports")
        final_code_lines.extend(final_import_lines)
        final_code_lines.append("")

        # Add the rest of the code (skip the first two lines, which are the header)
        final_code_lines.extend(code_lines[2:])
        return "\n".join(final_code_lines)
    else:
        return "\n".join(code_lines)


def generate_config_code(config, config_class, clean_mode=True):
    """
    Generate a Python code representation of a config object.

    Args:
        config: Config instance (PipelineConfig, GlobalPipelineConfig, etc.)
        config_class: The class of the config
        clean_mode: If True, only show non-default values

    Returns:
        str: Complete Python code with imports
    """
    # Collect the imports needed for the config representation
    required_imports = defaultdict(set)
    config_repr = generate_clean_dataclass_repr(
        config,
        indent_level=0,
        clean_mode=clean_mode,
        required_imports=required_imports
    )

    # Add the config class itself to the imports
    required_imports[config_class.__module__].add(config_class.__name__)

    # Build the complete code with imports
    code_lines = ["# Configuration Code", ""]

    # Add imports
    for module, names in sorted(required_imports.items()):
        names_str = ", ".join(sorted(names))
        code_lines.append(f"from {module} import {names_str}")

    code_lines.extend(["", f"config = {config_class.__name__}(", config_repr, ")"])

    return "\n".join(code_lines)
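
# Illustrative usage with an all-default config in clean mode (assumes
# GlobalPipelineConfig is constructible with no arguments, as the default-
# comparison logic above already requires; the field block is empty because
# every value matches its default):
#
#   >>> from openhcs.core.config import GlobalPipelineConfig
#   >>> print(generate_config_code(GlobalPipelineConfig(), GlobalPipelineConfig))
#   # Configuration Code
#
#   from openhcs.core.config import GlobalPipelineConfig
#
#   config = GlobalPipelineConfig(
#
#   )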


def main():
    import argparse
    parser = argparse.ArgumentParser(description="Convert OpenHCS debug pickle files to runnable Python scripts.")
    parser.add_argument("pickle_file", help="Path to the input pickle file.")
    parser.add_argument("output_file", nargs='?', default=None, help="Path to the output Python script file (optional).")
    parser.add_argument("--clean", action="store_true", help="Generate a clean script with only non-default parameters.")

    args = parser.parse_args()

    convert_pickle_to_python(args.pickle_file, args.output_file, clean_mode=args.clean)


if __name__ == "__main__":
    main()