Coverage for openhcs/debug/pickle_to_python.py: 0.0%

369 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1#!/usr/bin/env python3 

2""" 

3Pickle to Python Converter - Convert OpenHCS debug pickle files to runnable Python scripts 

4""" 

5 

6import sys 

7import dill as pickle 

8import inspect 

9import dataclasses 

10from pathlib import Path 

11from datetime import datetime 

12from collections import defaultdict 

13from enum import Enum 

14 

15# It's better to have these imports at the top level 

16from openhcs.core.config import GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig 

17from openhcs.core.steps.function_step import FunctionStep 

18 

def collect_imports_from_data(data_obj):
    """Extract function and enum imports by traversing data structure.

    Walks the object graph (lists, tuples and dict values) and records,
    per module, the names of enum classes and callables it encounters.

    Returns:
        tuple: ``(function_imports, enum_imports)``, each a
        ``defaultdict(set)`` mapping module name -> set of names.
    """
    function_imports = defaultdict(set)
    enum_imports = defaultdict(set)

    def walk(node):
        # Enum check must precede callable: enum members are not callable,
        # but enum *classes* are, and we classify by instance here.
        if isinstance(node, Enum):
            cls = node.__class__
            # Builtins never need an import line.
            if cls.__module__ and cls.__name__ and cls.__module__ != 'builtins':
                enum_imports[cls.__module__].add(cls.__name__)
            return
        if callable(node):
            module = getattr(node, '__module__', None)
            name = getattr(node, '__name__', None)
            # Only skip built-in modules that don't need imports.
            if module and name and module != 'builtins':
                function_imports[module].add(name)
            return
        if isinstance(node, (list, tuple)):
            for element in node:
                walk(element)
            return
        if isinstance(node, dict):
            for element in node.values():
                walk(element)

    walk(data_obj)
    return function_imports, enum_imports

46 

def format_imports_as_strings(function_imports, enum_imports):
    """Convert import dictionaries to a sorted list of import strings.

    Args:
        function_imports: mapping of module name -> set of callable names.
        enum_imports: mapping of module name -> set of enum class names.

    Returns:
        list[str]: one ``from <module> import <names>`` line per module,
        with modules and names sorted alphabetically.

    Note:
        The inputs are left untouched.  The previous implementation did a
        shallow ``.copy()`` and then ``update()``-ed the shared set objects,
        mutating the caller's ``function_imports`` as a side effect, and it
        raised KeyError for enum-only modules when given a plain dict.
    """
    merged = {}
    for source in (function_imports, enum_imports):
        for module, names in source.items():
            # setdefault + fresh set: caller-owned sets are never mutated.
            merged.setdefault(module, set()).update(names)

    return [
        f"from {module} import {', '.join(sorted(names))}"
        for module, names in sorted(merged.items())
    ]

58 

def generate_complete_function_pattern_code(func_obj, indent=0, clean_mode=False):
    """Generate complete Python code for function pattern with imports.

    Combines the readable pattern representation with the import lines it
    needs, producing a self-contained, editable snippet that assigns the
    pattern to a ``pattern`` variable.
    """
    # Readable source text for the pattern itself.
    pattern_repr = generate_readable_function_repr(func_obj, indent, clean_mode)

    # Imports required by functions/enums referenced inside the pattern.
    import_lines = format_imports_as_strings(*collect_imports_from_data(func_obj))

    lines = ["# Edit this function pattern and save to apply changes", ""]
    if import_lines:
        lines.append("# Dynamic imports")
        lines.extend(import_lines)
        lines.append("")
    lines.append(f"pattern = {pattern_repr}")

    return "\n".join(lines)

77 

78def _value_to_repr(value): 

79 """Converts a value to its Python representation string.""" 

80 if isinstance(value, Enum): 

81 return f"{value.__class__.__name__}.{value.name}" 

82 elif isinstance(value, str): 

83 # Use repr() for strings to properly escape newlines and special characters 

84 return repr(value) 

85 elif isinstance(value, Path): 

86 return f'Path({repr(str(value))})' 

87 return repr(value) 

88 

def generate_clean_dataclass_repr(instance, indent_level=0, clean_mode=False):
    """
    Generates a clean, readable Python representation of a dataclass instance,
    omitting fields that are set to their default values if clean_mode is True.
    This function is recursive and handles nested dataclasses.

    Args:
        instance: dataclass instance; non-dataclasses fall back to
            ``_value_to_repr``.
        indent_level: nesting depth; each level adds one space of indent.
        clean_mode: when True, fields equal to their defaults are omitted.

    Returns:
        str: comma/newline-joined ``name=value`` field lines, or "" when
        every field was default in clean_mode (callers use "" as a signal).
    """
    if not dataclasses.is_dataclass(instance):
        return _value_to_repr(instance)

    lines = []
    child_indent_str = " " * (indent_level + 1)

    # Default instance of the same class for comparison.  Dataclasses with
    # required (no-default) fields cannot be default-constructed; in that
    # case fall back to emitting every field instead of raising TypeError.
    try:
        default_instance = instance.__class__()
    except TypeError:
        default_instance = None

    for field in dataclasses.fields(instance):
        field_name = field.name
        current_value = getattr(instance, field_name)

        if clean_mode and default_instance is not None:
            if current_value == getattr(default_instance, field_name):
                continue

        if dataclasses.is_dataclass(current_value):
            # Recursively generate representation for nested dataclasses.
            nested_repr = generate_clean_dataclass_repr(current_value, indent_level + 1, clean_mode)
            cls_name = current_value.__class__.__name__
            if nested_repr:
                lines.append(f"{child_indent_str}{field_name}={cls_name}(\n{nested_repr}\n{child_indent_str})")
            else:
                # All nested fields were defaults (or the class has none):
                # emit a compact constructor call instead of "Cls(\n\n)".
                lines.append(f"{child_indent_str}{field_name}={cls_name}()")
        else:
            lines.append(f"{child_indent_str}{field_name}={_value_to_repr(current_value)}")

    # "".join of an empty list is "" — same "all defaults" signal as before.
    return ",\n".join(lines)

125 

126 

def convert_pickle_to_python(pickle_path, output_path=None, clean_mode=False):
    """Convert an OpenHCS debug pickle file to a runnable Python script.

    Args:
        pickle_path: path to the debug pickle (expects keys "plate_paths",
            "pipeline_data" and "global_config" in the unpickled dict).
        output_path: destination .py file; defaults to the pickle path with
            a .py suffix.
        clean_mode: when True, only non-default parameters are emitted.

    Side effects: reads the pickle, writes the generated script, prints
    progress; on any failure it prints the traceback and returns None.
    """

    pickle_file = Path(pickle_path)
    if not pickle_file.exists():
        print(f"Error: Pickle file not found: {pickle_path}")
        return

    if output_path is None:
        output_path = pickle_file.with_suffix('.py')

    print(f"Converting {pickle_file} to {output_path} (Clean Mode: {clean_mode})")

    try:
        # NOTE(review): dill.load on an untrusted pickle executes arbitrary
        # code — acceptable only because these are locally produced debug
        # dumps; confirm no untrusted files reach this path.
        with open(pickle_file, 'rb') as f:
            data = pickle.load(f)

        # Generate Python script
        with open(output_path, 'w') as f:
            # Script header with provenance info.
            f.write('#!/usr/bin/env python3\n')
            f.write('"""\n')
            f.write(f'OpenHCS Pipeline Script - Generated from {pickle_file.name}\n')
            f.write(f'Generated: {datetime.now()}\n')
            f.write('"""\n\n')

            # Imports
            f.write('import sys\n')
            f.write('import os\n')
            f.write('from pathlib import Path\n\n')
            f.write('# Add OpenHCS to path\n')
            # NOTE(review): hard-coded developer path baked into every
            # generated script — confirm this is intentional.
            f.write('sys.path.insert(0, "/home/ts/code/projects/openhcs")\n\n')

            f.write('from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator\n')
            f.write('from openhcs.core.steps.function_step import FunctionStep\n')
            f.write('from openhcs.core.config import (GlobalPipelineConfig, PathPlanningConfig, VFSConfig, ZarrConfig, \n'
                    '                                 MaterializationBackend, ZarrCompressor, ZarrChunkStrategy)\n')
            f.write('from openhcs.constants.constants import VariableComponents, Backend, Microscope\n\n')

            # Use extracted function for orchestrator generation
            orchestrator_code = generate_complete_orchestrator_code(
                data["plate_paths"], data["pipeline_data"], data['global_config'], clean_mode
            )

            # Write orchestrator code (already includes dynamic imports)
            f.write(orchestrator_code)
            f.write('\n\n')

            # ... (rest of the file remains the same for now) ...
            # Emit a SIGINT/SIGTERM handler so Ctrl+C hard-kills the run.
            f.write('def setup_signal_handlers():\n')
            f.write('    """Setup signal handlers to kill all child processes and threads on Ctrl+C."""\n')
            f.write('    import signal\n')
            f.write('    import os\n')
            f.write('    import sys\n\n')
            f.write('    def cleanup_and_exit(signum, frame):\n')
            f.write('        print(f"\\n🔥 Signal {signum} received! Cleaning up all processes and threads...")\n\n')
            f.write('        os._exit(1)\n\n')
            f.write('    signal.signal(signal.SIGINT, cleanup_and_exit)\n')
            f.write('    signal.signal(signal.SIGTERM, cleanup_and_exit)\n\n')

            # Emit the runner that compiles and executes each plate.
            f.write('def run_pipeline():\n')
            f.write('    os.environ["OPENHCS_SUBPROCESS_MODE"] = "1"\n')
            # NOTE(review): the generated script calls create_pipeline(),
            # but generate_complete_orchestrator_code emits top-level
            # variables (plate_paths/global_config/pipeline_data), not a
            # create_pipeline() definition — confirm the generated script
            # actually runs.
            f.write('    plate_paths, pipeline_data, global_config = create_pipeline()\n')
            f.write('    from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry\n')
            f.write('    setup_global_gpu_registry(global_config=global_config)\n')
            f.write('    for plate_path in plate_paths:\n')
            f.write('        orchestrator = PipelineOrchestrator(plate_path, global_config=global_config)\n')
            f.write('        orchestrator.initialize()\n')
            f.write('        compiled_contexts = orchestrator.compile_pipelines(pipeline_data[plate_path])\n')
            f.write('        orchestrator.execute_compiled_plate(\n')
            f.write('            pipeline_definition=pipeline_data[plate_path],\n')
            f.write('            compiled_contexts=compiled_contexts,\n')
            f.write('            max_workers=global_config.num_workers\n')
            f.write('        )\n\n')

            f.write('if __name__ == "__main__":\n')
            f.write('    setup_signal_handlers()\n')
            f.write('    run_pipeline()\n')

        print(f"✅ Successfully converted to {output_path}")
        print(f"You can now run: python {output_path}")

    except Exception as e:
        # Broad catch is deliberate: this is a best-effort debug CLI tool,
        # so report and continue rather than crash.
        print(f"Error converting pickle file: {e}")
        import traceback
        traceback.print_exc()

213 

214 

def generate_readable_function_repr(func_obj, indent=0, clean_mode=False):
    """
    Generate a readable and optionally clean Python representation of a function pattern.

    - Strips default kwargs from function tuples (clean mode).
    - Simplifies ``(func, {})`` to ``func`` (clean mode).
    - Simplifies ``[func]`` to ``func`` (clean mode).

    Args:
        func_obj: callable, ``(callable, kwargs)`` tuple, list, dict, or a
            leaf value handled by ``_value_to_repr``.
        indent: nesting depth; one space of indentation per level.
        clean_mode: when True, omit kwargs equal to the function's defaults.

    Returns:
        str: Python source text for the pattern.

    Fix: the previous list/dict branches nested an f-string containing a
    backslash inside another f-string's replacement field, which is a
    SyntaxError on Python < 3.12 and prevented this module from importing.
    The joins are now precomputed.  The dict branch also opens its brace on
    its own line, matching the list branch.
    """
    indent_str = " " * indent
    next_indent_str = " " * (indent + 1)

    if callable(func_obj):
        return f"{func_obj.__name__}"

    elif isinstance(func_obj, tuple) and len(func_obj) == 2 and callable(func_obj[0]):
        func, args = func_obj

        if not args and clean_mode:
            return f"{func.__name__}"

        # Get function signature to find default values
        try:
            sig = inspect.signature(func)
            default_params = {
                k: v.default for k, v in sig.parameters.items()
                if v.default is not inspect.Parameter.empty
            }
        except (ValueError, TypeError):  # Handle built-ins or other un-inspectables
            default_params = {}

        # Filter out default values in clean_mode
        final_args = {}
        for k, v in args.items():
            if not clean_mode or k not in default_params or v != default_params[k]:
                final_args[k] = v

        if not final_args:
            return f"{func.__name__}" if clean_mode else f"({func.__name__}, {{}})"

        args_items = []
        for k, v in final_args.items():
            v_repr = generate_readable_function_repr(v, indent + 2, clean_mode)
            args_items.append(f"{next_indent_str} '{k}': {v_repr}")
        args_str = "{\n" + ",\n".join(args_items) + f"\n{next_indent_str}}}"
        return f"({func.__name__}, {args_str})"

    elif isinstance(func_obj, list):
        if clean_mode and len(func_obj) == 1:
            return generate_readable_function_repr(func_obj[0], indent, clean_mode)
        if not func_obj:
            return "[]"
        items = [generate_readable_function_repr(item, indent, clean_mode) for item in func_obj]
        # Join first, then interpolate (backslash-in-f-string fix).
        body = f",\n{next_indent_str}".join(items)
        return f"[\n{next_indent_str}{body}\n{indent_str}]"

    elif isinstance(func_obj, dict):
        if not func_obj:
            return "{}"
        items = []
        for key, value in func_obj.items():
            value_repr = generate_readable_function_repr(value, indent, clean_mode)
            items.append(f"{next_indent_str}'{key}': {value_repr}")
        # Open the brace on its own line; the old code glued the first
        # entry onto the '{' (and had the same backslash SyntaxError).
        body = ",\n".join(items)
        return "{\n" + body + f"\n{indent_str}}}"

    else:
        return _value_to_repr(func_obj)

279 

280 

281def _format_parameter_value(param_name, value): 

282 """Generic parameter formatting with type-based rules.""" 

283 from enum import Enum 

284 

285 # Handle different value types generically 

286 if isinstance(value, Enum): 

287 # For any enum, use ClassName.VALUE_NAME format 

288 return f"{value.__class__.__name__}.{value.name}" 

289 elif isinstance(value, str): 

290 # String values need quotes 

291 return f'"{value}"' 

292 elif isinstance(value, list): 

293 # Handle lists of enums or other objects 

294 if value and isinstance(value[0], Enum): 

295 enum_reprs = [f"{item.__class__.__name__}.{item.name}" for item in value] 

296 return f"[{', '.join(enum_reprs)}]" 

297 else: 

298 return repr(value) 

299 else: 

300 # Use standard repr for everything else (bool, int, float, None, etc.) 

301 return repr(value) 

302 

303 

def _collect_enum_classes_from_step(step):
    """Collect enum classes referenced in step parameters for import generation."""
    from enum import Enum
    import inspect

    def _enum_types(value):
        # Yield enum classes found directly or one level deep inside a list.
        if isinstance(value, Enum):
            yield value.__class__
        elif isinstance(value, list):
            for element in value:
                if isinstance(element, Enum):
                    yield element.__class__

    enum_classes = set()
    signature = inspect.signature(FunctionStep.__init__)

    for name, parameter in signature.parameters.items():
        # 'self' and 'func' are constructor-specific, not serializable params.
        if name in ('self', 'func'):
            continue
        enum_classes.update(_enum_types(getattr(step, name, parameter.default)))

    return enum_classes

327 

328 

329def _collect_dataclass_classes_from_object(obj, visited=None): 

330 """Recursively collect dataclass classes that will be referenced in generated code.""" 

331 import dataclasses 

332 from enum import Enum 

333 

334 if visited is None: 

335 visited = set() 

336 

337 # Avoid infinite recursion 

338 if id(obj) in visited: 

339 return set(), set() 

340 visited.add(id(obj)) 

341 

342 dataclass_classes = set() 

343 enum_classes = set() 

344 

345 if dataclasses.is_dataclass(obj): 

346 # Add the dataclass class itself 

347 dataclass_classes.add(obj.__class__) 

348 

349 # Recursively check all fields 

350 for field in dataclasses.fields(obj): 

351 field_value = getattr(obj, field.name) 

352 nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(field_value, visited) 

353 dataclass_classes.update(nested_dataclasses) 

354 enum_classes.update(nested_enums) 

355 

356 elif isinstance(obj, Enum): 

357 # Collect enum classes from enum values 

358 enum_classes.add(obj.__class__) 

359 

360 elif isinstance(obj, (list, tuple)): 

361 for item in obj: 

362 nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(item, visited) 

363 dataclass_classes.update(nested_dataclasses) 

364 enum_classes.update(nested_enums) 

365 

366 elif isinstance(obj, dict): 

367 for value in obj.values(): 

368 nested_dataclasses, nested_enums = _collect_dataclass_classes_from_object(value, visited) 

369 dataclass_classes.update(nested_dataclasses) 

370 enum_classes.update(nested_enums) 

371 

372 return dataclass_classes, enum_classes 

373 

374 

def _generate_step_parameters(step, default_step, clean_mode=False):
    """Automatically generate all FunctionStep parameters using introspection.

    Compares each FunctionStep constructor parameter on ``step`` against
    ``default_step`` and returns ``"name=value"`` strings — all of them in
    full mode, only the non-default ones in clean mode.
    """
    import inspect

    signature = inspect.signature(FunctionStep.__init__)
    rendered = []

    for name, parameter in signature.parameters.items():
        # Skip constructor-specific parameters ('func' is handled separately).
        if name in ('self', 'func'):
            continue

        current = getattr(step, name, parameter.default)
        baseline = getattr(default_step, name)

        # Clean mode drops values that match the default step's value.
        if clean_mode and current == baseline:
            continue
        rendered.append(f"{name}={_format_parameter_value(name, current)}")

    return rendered

396 

397 

def generate_complete_pipeline_steps_code(pipeline_steps, clean_mode=False):
    """Generate complete Python code for pipeline steps with imports.

    Args:
        pipeline_steps: iterable of FunctionStep instances to serialize.
        clean_mode: when True, omit parameters/kwargs equal to their defaults.

    Returns:
        str: editable Python source that rebuilds ``pipeline_steps`` as a
        list of FunctionStep objects, preceded by the import lines it needs.
    """
    # Build code with imports and steps
    code_lines = ["# Edit this pipeline and save to apply changes", ""]

    # Collect imports from ALL data in pipeline steps (functions AND parameters)
    all_function_imports = defaultdict(set)
    all_enum_imports = defaultdict(set)

    for step in pipeline_steps:
        # Get imports from function patterns
        func_imports, enum_imports = collect_imports_from_data(step.func)
        # Get imports from step parameters (variable_components, group_by, etc.)
        param_imports, param_enums = collect_imports_from_data(step)

        # Get enum classes referenced in generated code (VariableComponents, GroupBy, etc.)
        enum_classes = _collect_enum_classes_from_step(step)
        for enum_class in enum_classes:
            module = enum_class.__module__
            name = enum_class.__name__
            if module and name:
                all_enum_imports[module].add(name)

        # Merge all imports
        for module, names in func_imports.items():
            all_function_imports[module].update(names)
        for module, names in enum_imports.items():
            all_enum_imports[module].update(names)
        for module, names in param_imports.items():
            all_function_imports[module].update(names)
        for module, names in param_enums.items():
            all_enum_imports[module].update(names)

    # Add FunctionStep import (always needed for generated code)
    all_function_imports['openhcs.core.steps.function_step'].add('FunctionStep')

    # Format and add all collected imports
    import_lines = format_imports_as_strings(all_function_imports, all_enum_imports)
    if import_lines:
        code_lines.append("# Automatically collected imports")
        code_lines.extend(import_lines)
        code_lines.append("")

    # Generate pipeline steps (extract exact logic from lines 164-198)
    code_lines.append("# Pipeline steps")
    code_lines.append("pipeline_steps = []")
    code_lines.append("")

    # Throwaway baseline step used only to detect non-default parameters.
    default_step = FunctionStep(func=lambda: None)
    for i, step in enumerate(pipeline_steps):
        code_lines.append(f"# Step {i+1}: {step.name}")
        func_repr = generate_readable_function_repr(step.func, indent=1, clean_mode=clean_mode)

        # Generate all FunctionStep parameters automatically
        step_args = [f"func={func_repr}"]
        step_args.extend(_generate_step_parameters(step, default_step, clean_mode))

        args_str = ",\n ".join(step_args)
        code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)")
        code_lines.append(f"pipeline_steps.append(step_{i+1})")
        code_lines.append("")

    return "\n".join(code_lines)

461 

462 

def generate_complete_orchestrator_code(plate_paths, pipeline_data, global_config, clean_mode=False):
    """Generate complete Python code for orchestrator config with imports.

    Args:
        plate_paths: list of plate paths (written out verbatim via repr()).
        pipeline_data: mapping of plate path -> list of FunctionStep objects.
        global_config: GlobalPipelineConfig instance, rendered as a
            constructor call via generate_clean_dataclass_repr.
        clean_mode: when True, omit values equal to their defaults.

    Returns:
        str: Python source defining ``plate_paths``, ``global_config`` and
        ``pipeline_data``, preceded by all required import lines.
    """
    # Build complete code (extract exact logic from lines 150-200)
    code_lines = ["# Edit this orchestrator configuration and save to apply changes", ""]

    # Collect imports from ALL data in orchestrator (functions, parameters, config)
    all_function_imports = defaultdict(set)
    all_enum_imports = defaultdict(set)

    # Collect from pipeline steps
    for plate_path, steps in pipeline_data.items():
        for step in steps:
            # Get imports from function patterns
            func_imports, enum_imports = collect_imports_from_data(step.func)
            # Get imports from step parameters
            param_imports, param_enums = collect_imports_from_data(step)

            # Get enum classes referenced in generated code
            enum_classes = _collect_enum_classes_from_step(step)
            for enum_class in enum_classes:
                module = enum_class.__module__
                name = enum_class.__name__
                if module and name:
                    all_enum_imports[module].add(name)

            # Merge all imports
            for module, names in func_imports.items():
                all_function_imports[module].update(names)
            for module, names in enum_imports.items():
                all_enum_imports[module].update(names)
            for module, names in param_imports.items():
                all_function_imports[module].update(names)
            for module, names in param_enums.items():
                all_enum_imports[module].update(names)

    # Collect from global config
    config_imports, config_enums = collect_imports_from_data(global_config)
    for module, names in config_imports.items():
        all_function_imports[module].update(names)
    for module, names in config_enums.items():
        all_enum_imports[module].update(names)

    # Collect dataclass and enum classes referenced in generated code (PathPlanningConfig, VFSConfig, Backend, etc.)
    dataclass_classes, config_enum_classes = _collect_dataclass_classes_from_object(global_config)
    for dataclass_class in dataclass_classes:
        module = dataclass_class.__module__
        name = dataclass_class.__name__
        if module and name:
            all_function_imports[module].add(name)

    for enum_class in config_enum_classes:
        module = enum_class.__module__
        name = enum_class.__name__
        if module and name:
            all_enum_imports[module].add(name)

    # Add always-needed imports for generated code structure
    all_function_imports['openhcs.core.steps.function_step'].add('FunctionStep')

    # Format and add all collected imports
    import_lines = format_imports_as_strings(all_function_imports, all_enum_imports)
    if import_lines:
        code_lines.append("# Automatically collected imports")
        code_lines.extend(import_lines)
        code_lines.append("")

    code_lines.extend([
        "# Plate paths",
        f"plate_paths = {repr(plate_paths)}",
        "",
        "# Global configuration",
    ])

    config_repr = generate_clean_dataclass_repr(global_config, indent_level=0, clean_mode=clean_mode)
    code_lines.append(f"global_config = GlobalPipelineConfig(\n{config_repr}\n)")
    code_lines.append("")

    # Generate pipeline data (exact logic from lines 164-198)
    code_lines.extend(["# Pipeline steps", "pipeline_data = {}", ""])

    # Throwaway baseline step used only to detect non-default parameters.
    default_step = FunctionStep(func=lambda: None)
    for plate_path, steps in pipeline_data.items():
        code_lines.append(f'# Steps for plate: {Path(plate_path).name}')
        code_lines.append("steps = []")
        code_lines.append("")

        for i, step in enumerate(steps):
            code_lines.append(f"# Step {i+1}: {step.name}")
            func_repr = generate_readable_function_repr(step.func, indent=1, clean_mode=clean_mode)

            # Generate all FunctionStep parameters automatically
            step_args = [f"func={func_repr}"]
            step_args.extend(_generate_step_parameters(step, default_step, clean_mode))

            args_str = ",\n ".join(step_args)
            code_lines.append(f"step_{i+1} = FunctionStep(\n {args_str}\n)")
            code_lines.append(f"steps.append(step_{i+1})")
            code_lines.append("")

        # NOTE(review): plate_path is interpolated between plain double
        # quotes; a path containing '"' or backslashes would produce invalid
        # generated code — confirm expected inputs are POSIX-style paths.
        code_lines.append(f'pipeline_data["{plate_path}"] = steps')
        code_lines.append("")

    return "\n".join(code_lines)

566 

567 

def main():
    """Command-line entry point for the pickle-to-Python converter."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Convert OpenHCS debug pickle files to runnable Python scripts."
    )
    # Positional order matters: pickle file first, optional output second.
    parser.add_argument("pickle_file", help="Path to the input pickle file.")
    parser.add_argument(
        "output_file",
        nargs='?',
        default=None,
        help="Path to the output Python script file (optional).",
    )
    parser.add_argument(
        "--clean",
        action="store_true",
        help="Generate a clean script with only non-default parameters.",
    )
    options = parser.parse_args()

    convert_pickle_to_python(options.pickle_file, options.output_file, clean_mode=options.clean)

578 

# Script entry point: parse CLI args and run the conversion.
if __name__ == "__main__":
    main()