Coverage for openhcs/textual_tui/widgets/plate_manager.py: 0.0%

961 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2PlateManagerWidget for OpenHCS Textual TUI 

3 

4Plate management widget with complete button set and reactive state management. 

5Matches the functionality from the current prompt-toolkit TUI. 

6""" 

7 

8import asyncio 

9import copy 

10import dataclasses 

11import inspect 

12import logging 

13import numpy as np 

14import os 

15import re 

16import signal 

17import subprocess 

18import sys 

19import tempfile 

20import time 

21import traceback 

22from collections import defaultdict 

23from pathlib import Path 

24from typing import Dict, List, Optional, Callable, Any, Tuple 

25 

26from openhcs.core.config import PipelineConfig 

27from openhcs.core.log_utils import get_current_log_file_path 

28 

29from PIL import Image 

30from textual.reactive import reactive 

31from .button_list_widget import ButtonListWidget, ButtonConfig 

32from textual import work 

33 

34from openhcs.core.config import GlobalPipelineConfig, VFSConfig, MaterializationBackend 

35from openhcs.core.pipeline import Pipeline 

36from openhcs.io.filemanager import FileManager 

37from openhcs.io.zarr import ZarrStorageBackend 

38from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator 

39from openhcs.constants.constants import Backend, VariableComponents, OrchestratorState 

40from openhcs.textual_tui.services.file_browser_service import SelectionMode 

41from openhcs.textual_tui.services.window_service import WindowService 

42from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey, get_path_cache 

43from openhcs.introspection.signature_analyzer import SignatureAnalyzer 

44 

45logger = logging.getLogger(__name__) 

46 

47# Note: Using subprocess approach instead of multiprocessing to avoid TUI FD conflicts 

48 

49def get_orchestrator_status_symbol(orchestrator: PipelineOrchestrator) -> str: 

50 """Get UI symbol for orchestrator state - simple mapping without over-engineering.""" 

51 if orchestrator is None: 

52 return "?" # No orchestrator (newly added plate) 

53 

54 state_to_symbol = { 

55 OrchestratorState.CREATED: "?", # Created but not initialized 

56 OrchestratorState.READY: "-", # Initialized, ready for compilation 

57 OrchestratorState.COMPILED: "o", # Compiled, ready for execution 

58 OrchestratorState.EXECUTING: "!", # Execution in progress 

59 OrchestratorState.COMPLETED: "C", # Execution completed successfully 

60 OrchestratorState.INIT_FAILED: "I", # Initialization failed 

61 OrchestratorState.COMPILE_FAILED: "P", # Compilation failed (P for Pipeline) 

62 OrchestratorState.EXEC_FAILED: "X", # Execution failed 

63 } 

64 

65 return state_to_symbol.get(orchestrator.state, "?") 

66 

67 

68 

69 

70 

71 

72 

73class PlateManagerWidget(ButtonListWidget): 

74 """ 

75 Plate management widget using Textual reactive state. 

76 """ 

77 

78 # Semantic reactive property (like PipelineEditor's pipeline_steps) 

79 selected_plate = reactive("") 

80 orchestrators = reactive({}) 

81 plate_configs = reactive({}) 

82 orchestrator_state_version = reactive(0) # Increment to trigger UI refresh 

83 

84 def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig): 

85 button_configs = [ 

86 ButtonConfig("Add", "add_plate"), 

87 ButtonConfig("Del", "del_plate", disabled=True), 

88 ButtonConfig("Edit", "edit_config", disabled=True), # Unified edit button for config editing 

89 ButtonConfig("Init", "init_plate", disabled=True), 

90 ButtonConfig("Compile", "compile_plate", disabled=True), 

91 ButtonConfig("Run", "run_plate", disabled=True), 

92 ButtonConfig("Code", "code_plate", disabled=True), # Generate Python code 

93 ButtonConfig("Save", "save_python_script", disabled=True), # Save Python script 

94 # ButtonConfig("Export", "export_ome_zarr", disabled=True), # Export to OME-ZARR - HIDDEN FROM UI 

95 ] 

96 super().__init__( 

97 button_configs=button_configs, 

98 list_id="plate_content", 

99 container_id="plate_list", 

100 on_button_pressed=self._handle_button_press, 

101 on_selection_changed=self._handle_selection_change, 

102 on_item_moved=self._handle_item_moved 

103 ) 

104 self.filemanager = filemanager 

105 self.global_config = global_config 

106 self.plate_compiled_data = {} 

107 self.on_plate_selected: Optional[Callable[[str], None]] = None 

108 self.pipeline_editor: Optional['PipelineEditorWidget'] = None 

109 

110 # Initialize window service to avoid circular imports 

111 self.window_service = None # Will be set in on_mount 

112 

113 # --- Subprocess Architecture --- 

114 self.current_process: Optional[subprocess.Popen] = None 

115 self.zmq_client = None # ZMQ execution client (when using ZMQ mode) 

116 self.current_execution_id = None # Track current execution ID for cancellation 

117 self.log_file_path: Optional[str] = None # Single source of truth 

118 self.log_file_position: int = 0 # Track position in log file for incremental reading 

119 # Async monitoring using Textual's interval system 

120 self.monitoring_interval = None 

121 self.monitoring_active = False 

122 # --- 

123 

124 logger.debug("PlateManagerWidget initialized") 

125 

126 

127 

128 

129 

130 def on_unmount(self) -> None: 

131 logger.debug("Unmounting PlateManagerWidget, ensuring worker process is terminated.") 

132 # Schedule async stop execution since on_unmount is sync 

133 import asyncio 

134 if self.current_process and self.current_process.poll() is None: 

135 # Create a task to stop execution asynchronously 

136 asyncio.create_task(self.action_stop_execution()) 

137 self._stop_monitoring() 

138 

139 def format_item_for_display(self, plate: Dict) -> Tuple[str, str]: 

140 # Get status from orchestrator instead of magic string 

141 plate_path = plate.get('path', '') 

142 orchestrator = self.orchestrators.get(plate_path) 

143 status_symbol = get_orchestrator_status_symbol(orchestrator) 

144 

145 status_symbols = { 

146 "?": "➕", # Created (not initialized) 

147 "-": "✅", # Ready (initialized) 

148 "o": "⚡", # Compiled 

149 "!": "🔄", # Executing 

150 "C": "🏁", # Completed 

151 "I": "🚫", # Init failed 

152 "P": "💥", # Compile failed (Pipeline) 

153 "X": "❌" # Execution failed 

154 } 

155 status_icon = status_symbols.get(status_symbol, "❓") 

156 plate_name = plate.get('name', 'Unknown') 

157 display_text = f"{status_icon} {plate_name} - {plate_path}" 

158 return display_text, plate_path 

159 

160 async def _handle_button_press(self, button_id: str) -> None: 

161 action_map = { 

162 "add_plate": self.action_add_plate, 

163 "del_plate": self.action_delete_plate, 

164 "edit_config": self.action_edit_config, # Unified edit button 

165 "init_plate": self.action_init_plate, 

166 "compile_plate": self.action_compile_plate, 

167 "code_plate": self.action_code_plate, # Generate Python code 

168 "save_python_script": self.action_save_python_script, # Save Python script 

169 # "export_ome_zarr": self.action_export_ome_zarr, # HIDDEN 

170 } 

171 if button_id in action_map: 

172 action = action_map[button_id] 

173 if inspect.iscoroutinefunction(action): 

174 await action() 

175 else: 

176 action() 

177 elif button_id == "run_plate": 

178 if self._is_any_plate_running(): 

179 await self.action_stop_execution() 

180 else: 

181 await self.action_run_plate() 

182 

183 def _handle_selection_change(self, selected_values: List[str]) -> None: 

184 logger.debug(f"Checkmarks changed: {len(selected_values)} items selected") 

185 

186 def _handle_item_moved(self, from_index: int, to_index: int) -> None: 

187 current_plates = list(self.items) 

188 plate = current_plates.pop(from_index) 

189 current_plates.insert(to_index, plate) 

190 self.items = current_plates 

191 plate_name = plate['name'] 

192 direction = "up" if to_index < from_index else "down" 

193 self.app.current_status = f"Moved plate '{plate_name}' {direction}" 

194 

195 def on_mount(self) -> None: 

196 # Initialize window service 

197 self.window_service = WindowService(self.app) 

198 

199 self.call_later(self._delayed_update_display) 

200 self.call_later(self._update_button_states) 

201 

202 def watch_items(self, items: List[Dict]) -> None: 

203 """Automatically update UI when items changes (follows ButtonListWidget pattern).""" 

204 # DEBUG: Log when items list changes to track the source of the reset 

205 stack_trace = ''.join(traceback.format_stack()[-3:-1]) # Get last 2 stack frames 

206 logger.debug(f"🔍 ITEMS CHANGED: {len(items)} plates. Call stack:\n{stack_trace}") 

207 

208 # CRITICAL: Call parent's watch_items to update the SelectionList 

209 super().watch_items(items) 

210 

211 logger.debug(f"Plates updated: {len(items)} plates") 

212 self._update_button_states() 

213 

214 def watch_highlighted_item(self, plate_path: str) -> None: 

215 self.selected_plate = plate_path 

216 logger.debug(f"Highlighted plate: {plate_path}") 

217 

218 def watch_selected_plate(self, plate_path: str) -> None: 

219 self._update_button_states() 

220 if self.on_plate_selected and plate_path: 

221 self.on_plate_selected(plate_path) 

222 logger.debug(f"Selected plate: {plate_path}") 

223 

224 def watch_orchestrator_state_version(self, version: int) -> None: 

225 """Automatically refresh UI when orchestrator states change.""" 

226 # Only update UI if widget is properly mounted 

227 if not self.is_mounted: 

228 return 

229 

230 # Force SelectionList to update by calling _update_selection_list 

231 # This re-calls format_item_for_display() for all items 

232 self._update_selection_list() 

233 

234 # CRITICAL: Update main button states when orchestrator states change 

235 self._update_button_states() 

236 

237 # Also notify PipelineEditor if connected 

238 if self.pipeline_editor: 

239 logger.debug(f"PlateManager: Notifying PipelineEditor of orchestrator state change (version {version})") 

240 self.pipeline_editor._update_button_states() 

241 

242 def get_selection_state(self) -> tuple[List[Dict], str]: 

243 # Check if widget is properly mounted first 

244 if not self.is_mounted: 

245 logger.debug("get_selection_state called on unmounted widget") 

246 return [], "empty" 

247 

248 try: 

249 selection_list = self.query_one(f"#{self.list_id}") 

250 multi_selected_values = selection_list.selected 

251 if multi_selected_values: 

252 selected_items = [p for p in self.items if p.get('path') in multi_selected_values] 

253 return selected_items, "checkbox" 

254 elif self.selected_plate: 

255 selected_items = [p for p in self.items if p.get('path') == self.selected_plate] 

256 return selected_items, "cursor" 

257 else: 

258 return [], "empty" 

259 except Exception as e: 

260 # DOM CORRUPTION DETECTED - This is a critical error 

261 stack_trace = ''.join(traceback.format_stack()[-3:-1]) 

262 logger.error(f"🚨 DOM CORRUPTION: Failed to get selection state: {e}") 

263 logger.error(f"🚨 DOM CORRUPTION: Call stack:\n{stack_trace}") 

264 logger.error(f"🚨 DOM CORRUPTION: Widget mounted: {self.is_mounted}") 

265 logger.error(f"🚨 DOM CORRUPTION: Looking for: #{self.list_id}") 

266 logger.error(f"🚨 DOM CORRUPTION: Plates count: {len(self.items)}") 

267 

268 # Try to diagnose what widgets actually exist 

269 try: 

270 all_widgets = list(self.query("*")) 

271 widget_ids = [w.id for w in all_widgets if w.id] 

272 logger.error(f"🚨 DOM CORRUPTION: Available widget IDs: {widget_ids}") 

273 except Exception as diag_e: 

274 logger.error(f"🚨 DOM CORRUPTION: Could not diagnose widgets: {diag_e}") 

275 

276 if self.selected_plate: 

277 selected_items = [p for p in self.items if p.get('path') == self.selected_plate] 

278 return selected_items, "cursor" 

279 return [], "empty" 

280 

281 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str: 

282 count = len(selected_items) 

283 if count == 0: return f"No items for {operation}" 

284 if count == 1: return f"{operation.title()} item: {selected_items[0].get('name', 'Unknown')}" 

285 return f"{operation.title()} {count} items" 

286 

287 def _delayed_update_display(self) -> None: 

288 """Trigger UI update - no longer needed since reactive system handles this automatically.""" 

289 # The reactive system now handles updates automatically via watch_plates() 

290 # This method is kept for compatibility but does nothing 

291 pass 

292 

293 def _trigger_ui_refresh(self) -> None: 

294 """Force UI refresh when orchestrator state changes without items list changing.""" 

295 # Increment reactive counter to trigger automatic UI refresh 

296 self.orchestrator_state_version += 1 

297 

298 def _update_button_states(self) -> None: 

299 try: 

300 # Check if widget is mounted and buttons exist 

301 if not self.is_mounted: 

302 return 

303 

304 has_selection = bool(self.selected_plate) 

305 is_running = self._is_any_plate_running() 

306 

307 # Check if there are any selected items (for delete button) 

308 selected_items, _ = self.get_selection_state() 

309 has_selected_items = bool(selected_items) 

310 

311 can_run = has_selection and any(p['path'] in self.plate_compiled_data for p in self.items if p.get('path') == self.selected_plate) 

312 

313 # Try to get run button - if it doesn't exist, widget is not fully mounted 

314 try: 

315 run_button = self.query_one("#run_plate") 

316 if is_running: 

317 run_button.label = "Stop" 

318 run_button.disabled = False 

319 else: 

320 run_button.label = "Run" 

321 run_button.disabled = not can_run 

322 except: 

323 # Buttons not mounted yet, skip update 

324 return 

325 

326 self.query_one("#add_plate").disabled = is_running 

327 self.query_one("#del_plate").disabled = not self.items or not has_selected_items or is_running 

328 

329 # Edit button (config editing) enabled when 1+ orchestrators selected and initialized 

330 selected_items, _ = self.get_selection_state() 

331 edit_enabled = ( 

332 len(selected_items) > 0 and 

333 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

334 not is_running 

335 ) 

336 self.query_one("#edit_config").disabled = not edit_enabled 

337 

338 # Init button - enabled when plates are selected, can be initialized, and not running 

339 init_enabled = ( 

340 len(selected_items) > 0 and 

341 any(self._can_orchestrator_be_initialized(item['path']) for item in selected_items) and 

342 not is_running 

343 ) 

344 self.query_one("#init_plate").disabled = not init_enabled 

345 

346 # Compile button - enabled when plates are selected, initialized, and not running 

347 selected_items, _ = self.get_selection_state() 

348 compile_enabled = ( 

349 len(selected_items) > 0 and 

350 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

351 not is_running 

352 ) 

353 self.query_one("#compile_plate").disabled = not compile_enabled 

354 

355 # Code button - enabled when plates are selected, initialized, and not running 

356 code_enabled = ( 

357 len(selected_items) > 0 and 

358 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

359 not is_running 

360 ) 

361 self.query_one("#code_plate").disabled = not code_enabled 

362 

363 # Save Python script button - enabled when plates are selected, initialized, and not running 

364 save_enabled = ( 

365 len(selected_items) > 0 and 

366 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

367 not is_running 

368 ) 

369 self.query_one("#save_python_script").disabled = not save_enabled 

370 

371 # Export button - enabled when plate is initialized and has workspace (HIDDEN FROM UI) 

372 # export_enabled = ( 

373 # has_selection and 

374 # self.selected_plate in self.orchestrators and 

375 # not is_running 

376 # ) 

377 # try: 

378 # self.query_one("#export_ome_zarr").disabled = not export_enabled 

379 # except: 

380 # pass # Button is hidden from UI 

381 

382 # Debug button removed - no longer needed 

383 

384 except Exception as e: 

385 # Only log if it's not a mounting/unmounting issue 

386 if self.is_mounted: 

387 logger.debug(f"Button state update skipped (widget not ready): {e}") 

388 # Don't log errors during unmounting 

389 

390 def _is_any_plate_running(self) -> bool: 

391 return self.current_process is not None and self.current_process.poll() is None 

392 

393 def _has_pipelines(self, plates: List[Dict]) -> bool: 

394 """Check if all plates have pipeline definitions.""" 

395 if not self.pipeline_editor: 

396 return False 

397 

398 for plate in plates: 

399 pipeline = self.pipeline_editor.get_pipeline_for_plate(plate['path']) 

400 if not pipeline: 

401 return False 

402 return True 

403 

404 def get_plate_status(self, plate_path: str) -> str: 

405 """Get status for specific plate - now uses orchestrator state.""" 

406 orchestrator = self.orchestrators.get(plate_path) 

407 return get_orchestrator_status_symbol(orchestrator) 

408 

409 def _is_orchestrator_initialized(self, plate_path: str) -> bool: 

410 """Check if orchestrator exists and is in an initialized state.""" 

411 orchestrator = self.orchestrators.get(plate_path) 

412 if orchestrator is None: 

413 return False 

414 return orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED, 

415 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED, 

416 OrchestratorState.EXEC_FAILED] 

417 

418 def _can_orchestrator_be_initialized(self, plate_path: str) -> bool: 

419 """Check if orchestrator can be initialized (doesn't exist or is in a re-initializable state).""" 

420 orchestrator = self.orchestrators.get(plate_path) 

421 if orchestrator is None: 

422 return True # No orchestrator exists, can be initialized 

423 return orchestrator.state in [OrchestratorState.CREATED, OrchestratorState.INIT_FAILED] 

424 

425 def _notify_pipeline_editor_status_change(self, plate_path: str, new_status: str) -> None: 

426 """Notify pipeline editor when plate status changes (enables Add button immediately).""" 

427 if self.pipeline_editor and self.pipeline_editor.current_plate == plate_path: 

428 # Update pipeline editor's status and trigger button state update 

429 self.pipeline_editor.current_plate_status = new_status 

430 

431 def _get_current_pipeline_definition(self, plate_path: str = None) -> List: 

432 """Get current pipeline definition from PipelineEditor (now returns FunctionStep objects directly).""" 

433 if not self.pipeline_editor: 

434 logger.warning("No pipeline editor reference - using empty pipeline") 

435 return [] 

436 

437 # Get pipeline for specific plate or current plate 

438 target_plate = plate_path or self.pipeline_editor.current_plate 

439 if not target_plate: 

440 logger.warning("No plate specified - using empty pipeline") 

441 return [] 

442 

443 # Get pipeline from editor (now returns List[FunctionStep] directly) 

444 pipeline_steps = self.pipeline_editor.get_pipeline_for_plate(target_plate) 

445 

446 # No conversion needed - pipeline_steps are already FunctionStep objects with memory type decorators 

447 return pipeline_steps 

448 

449 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str: 

450 """Generate human-readable description of what will be operated on.""" 

451 count = len(selected_items) 

452 if selection_mode == "empty": 

453 return f"No items available for {operation}" 

454 elif selection_mode == "all": 

455 return f"{operation.title()} ALL {count} items" 

456 elif selection_mode == "checkbox": 

457 if count == 1: 

458 item_name = selected_items[0].get('name', 'Unknown') 

459 return f"{operation.title()} selected item: {item_name}" 

460 else: 

461 return f"{operation.title()} {count} selected items" 

462 else: 

463 return f"{operation.title()} {count} items" 

464 

465 def _reset_execution_state(self, status_message: str, force_fail_executing: bool = True): 

466 if self.current_process: 

467 if self.current_process.poll() is None: # Still running 

468 logger.warning("Forcefully terminating subprocess during reset.") 

469 self.current_process.terminate() 

470 try: 

471 self.current_process.wait(timeout=1) 

472 except subprocess.TimeoutExpired: 

473 self.current_process.kill() # Force kill if terminate fails 

474 self.current_process = None 

475 

476 # Clear log file reference (no temp files - log file is single source of truth) 

477 self.log_file_path = None 

478 self.log_file_position = 0 

479 

480 # Stop async monitoring 

481 self._stop_monitoring() 

482 

483 # Only reset executing orchestrators to failed if this is a forced termination 

484 # Natural completion should preserve the states set by the completion handler 

485 if force_fail_executing: 

486 for plate_path, orchestrator in self.orchestrators.items(): 

487 if orchestrator.state == OrchestratorState.EXECUTING: 

488 orchestrator._state = OrchestratorState.EXEC_FAILED 

489 

490 # Trigger UI refresh after state changes - this is essential for button states 

491 self._trigger_ui_refresh() 

492 

493 # Update button states - but only if widget is properly mounted 

494 try: 

495 if self.is_mounted and hasattr(self, 'query_one'): 

496 self._update_button_states() 

497 except Exception as e: 

498 logger.error(f"Failed to update button states during reset: {e}") 

499 

500 self.app.current_status = status_message 

501 

502 async def action_run_plate(self) -> None: 

503 # Clear logs from singleton toolong window before starting new run 

504 try: 

505 from openhcs.textual_tui.windows.toolong_window import clear_toolong_logs 

506 logger.info("Clearing logs from singleton toolong window before new run") 

507 clear_toolong_logs(self.app) 

508 logger.info("Toolong logs cleared") 

509 except Exception as e: 

510 logger.error(f"Failed to clear toolong logs: {e}") 

511 import traceback 

512 logger.error(traceback.format_exc()) 

513 

514 selected_items, _ = self.get_selection_state() 

515 if not selected_items: 

516 self.app.show_error("No plates selected to run.") 

517 return 

518 

519 ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data] 

520 if not ready_items: 

521 self.app.show_error("Selected plates are not compiled. Please compile first.") 

522 return 

523 

524 await self._run_plates_zmq(ready_items) 

525 

526 def _start_log_monitoring(self) -> None: 

527 """Start reactive log monitoring for subprocess logs.""" 

528 if not self.log_file_path: 

529 logger.warning("Cannot start log monitoring: no log file path") 

530 return 

531 

532 try: 

533 # Extract base path from log file path (remove .log extension) 

534 log_path = Path(self.log_file_path) 

535 base_log_path = str(log_path.with_suffix('')) 

536 

537 # Notify status bar to start log monitoring 

538 if hasattr(self.app, 'status_bar') and self.app.status_bar: 

539 self.app.status_bar.start_log_monitoring(base_log_path) 

540 logger.debug(f"Started reactive log monitoring for: {base_log_path}") 

541 else: 

542 logger.warning("Status bar not available for log monitoring") 

543 

544 except Exception as e: 

545 logger.error(f"Failed to start log monitoring: {e}") 

546 

547 def _stop_log_monitoring(self) -> None: 

548 """Stop reactive log monitoring.""" 

549 try: 

550 # Notify status bar to stop log monitoring 

551 if hasattr(self.app, 'status_bar') and self.app.status_bar: 

552 self.app.status_bar.stop_log_monitoring() 

553 logger.debug("Stopped reactive log monitoring") 

554 except Exception as e: 

555 logger.error(f"Failed to stop log monitoring: {e}") 

556 

557 def _get_current_log_position(self) -> int: 

558 """Get current position in log file.""" 

559 if not self.log_file_path or not Path(self.log_file_path).exists(): 

560 return 0 

561 try: 

562 return Path(self.log_file_path).stat().st_size 

563 except Exception: 

564 return 0 

565 

566 

567 

568 def _stop_file_watcher(self) -> None: 

569 """Stop file system watcher without blocking.""" 

570 if not self.file_observer: 

571 return 

572 

573 try: 

574 # Just stop and abandon - don't wait for anything 

575 self.file_observer.stop() 

576 except Exception: 

577 pass # Ignore errors 

578 finally: 

579 # Always clear references immediately 

580 self.file_observer = None 

581 self.file_watcher = None 

582 

583 

584 

585 async def _start_monitoring(self) -> None: 

586 """Start async monitoring using Textual's interval system.""" 

587 # Stop any existing monitoring 

588 self._stop_monitoring() 

589 

590 if self.monitoring_active: 

591 return 

592 

593 self.monitoring_active = True 

594 # Use Textual's set_interval for periodic async monitoring 

595 self.monitoring_interval = self.set_interval( 

596 10.0, # Check every 10 seconds 

597 self._check_process_status_async, 

598 pause=False 

599 ) 

600 logger.debug("Started async process monitoring") 

601 

602 def _stop_monitoring(self) -> None: 

603 """Stop async monitoring.""" 

604 if self.monitoring_interval: 

605 self.monitoring_interval.stop() 

606 self.monitoring_interval = None 

607 self.monitoring_active = False 

608 

609 # Also stop log monitoring 

610 self._stop_log_monitoring() 

611 

612 logger.debug("Stopped async process monitoring") 

613 

614 async def _check_process_status_async(self) -> None: 

615 """Async process status check - replaces worker thread.""" 

616 if not self.monitoring_active: 

617 return 

618 

619 try: 

620 # Simple direct access - no threading, no locks needed 

621 if not self._is_any_plate_running(): 

622 logger.debug("🔥 MONITOR: Subprocess finished") 

623 

624 # Stop monitoring first 

625 self._stop_monitoring() 

626 

627 # Handle completion directly - no call_from_thread needed 

628 await self._handle_process_completion() 

629 

630 except Exception as e: 

631 logger.debug(f"Error in async process monitoring: {e}") 

632 # Continue monitoring on error 

633 

634 async def _handle_process_completion(self) -> None: 

635 """Handle subprocess completion - read from log file (single source of truth).""" 

636 # Determine success/failure from log file content (single source of truth) 

637 success = False 

638 

639 if self.log_file_path and Path(self.log_file_path).exists(): 

640 try: 

641 # Read log file directly to check for success markers 

642 with open(self.log_file_path, 'r') as f: 

643 log_content = f.read() 

644 # Look for success markers in the log 

645 has_execution_success = "🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content 

646 has_all_completed = "All plates completed successfully" in log_content 

647 if has_execution_success and has_all_completed: 

648 success = True 

649 

650 except Exception as e: 

651 logger.error(f"Error reading subprocess log file: {e}") 

652 success = False 

653 

654 # Clean up the subprocess 

655 logger.info("🔥 MONITOR: Starting process cleanup...") 

656 if self.current_process: 

657 try: 

658 self.current_process.wait() # Clean up the zombie process 

659 logger.info("🔥 MONITOR: Process cleanup completed") 

660 except Exception as e: 

661 logger.warning(f"🔥 MONITOR: Error during process cleanup: {e}") 

662 

663 # Update orchestrator states based on log file analysis (single source of truth) 

664 if success: 

665 # Success - update orchestrators to completed 

666 for plate_path, orchestrator in self.orchestrators.items(): 

667 if orchestrator.state == OrchestratorState.EXECUTING: 

668 orchestrator._state = OrchestratorState.COMPLETED 

669 

670 # Reset execution state (this will trigger UI refresh internally) 

671 self._reset_execution_state("Execution completed successfully.", force_fail_executing=False) 

672 else: 

673 # Failure - update orchestrators to failed 

674 for plate_path, orchestrator in self.orchestrators.items(): 

675 if orchestrator.state == OrchestratorState.EXECUTING: 

676 orchestrator._state = OrchestratorState.EXEC_FAILED 

677 

678 # Reset execution state (this will trigger UI refresh internally) 

679 self._reset_execution_state("Execution failed.", force_fail_executing=False) 

680 

681 self._stop_monitoring() # Stop monitoring since process is done 

682 

683 async def _read_log_file_incremental(self) -> None: 

684 """Read new content from the log file since last read.""" 

685 if not self.log_file_path: 

686 self.app.current_status = "🔥 LOG READER: No log file" 

687 return 

688 

689 try: 

690 # Wrap all file I/O operations in executor to avoid blocking UI 

691 def _read_log_content(): 

692 if not Path(self.log_file_path).exists(): 

693 return None, self.log_file_position 

694 

695 with open(self.log_file_path, 'r') as f: 

696 # Seek to where we left off 

697 f.seek(self.log_file_position) 

698 new_content = f.read() 

699 # Update position for next read 

700 new_position = f.tell() 

701 

702 return new_content, new_position 

703 

704 new_content, new_position = await asyncio.get_event_loop().run_in_executor(None, _read_log_content) 

705 self.log_file_position = new_position 

706 

707 if new_content is None: 

708 self.app.current_status = "🔥 LOG READER: No log file" 

709 return 

710 

711 if new_content and new_content.strip(): 

712 # Get the last non-empty line from new content 

713 lines = new_content.strip().split('\n') 

714 non_empty_lines = [line.strip() for line in lines if line.strip()] 

715 

716 if non_empty_lines: 

717 # Show the last line, truncated if too long 

718 last_line = non_empty_lines[-1] 

719 if len(last_line) > 100: 

720 last_line = last_line[:97] + "..." 

721 

722 self.app.current_status = last_line 

723 else: 

724 self.app.current_status = "🔥 LOG READER: No lines found" 

725 else: 

726 self.app.current_status = "🔥 LOG READER: No new content" 

727 

728 except Exception as e: 

729 self.app.current_status = f"🔥 LOG READER ERROR: {e}" 

730 

731 

732 

733 async def _run_plates_zmq(self, ready_items) -> None: 

734 """Run plates using ZMQ execution client (recommended).""" 

735 try: 

736 from openhcs.runtime.zmq_execution_client import ZMQExecutionClient 

737 

738 plate_paths_to_run = [item['path'] for item in ready_items] 

739 logger.info(f"Starting ZMQ execution for {len(plate_paths_to_run)} plates") 

740 

741 # Create ZMQ client (non-persistent mode for UI-managed execution) 

742 self.zmq_client = ZMQExecutionClient( 

743 port=7777, 

744 persistent=False, # UI manages lifecycle 

745 progress_callback=self._on_zmq_progress 

746 ) 

747 

748 # Connect to server (will spawn if needed) 

749 def _connect(): 

750 return self.zmq_client.connect(timeout=15) 

751 

752 import asyncio 

753 loop = asyncio.get_event_loop() 

754 connected = await loop.run_in_executor(None, _connect) 

755 

756 if not connected: 

757 raise RuntimeError("Failed to connect to ZMQ execution server") 

758 

759 logger.info("Connected to ZMQ execution server") 

760 

761 # Update orchestrator states to show running state 

762 for plate in ready_items: 

763 plate_path = plate['path'] 

764 if plate_path in self.orchestrators: 

765 self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING 

766 

767 self._trigger_ui_refresh() 

768 self.app.current_status = f"Running {len(ready_items)} plate(s) via ZMQ..." 

769 self._update_button_states() 

770 

771 # Execute each plate 

772 for plate_path in plate_paths_to_run: 

773 definition_pipeline = self._get_current_pipeline_definition(plate_path) 

774 

775 # Get effective config for this plate 

776 effective_config = self.app.global_config 

777 from openhcs.core.config import PipelineConfig 

778 pipeline_config = PipelineConfig() 

779 

780 logger.info(f"Executing plate: {plate_path}") 

781 

782 # Execute via ZMQ (in executor to avoid blocking UI) 

783 def _execute(): 

784 return self.zmq_client.execute_pipeline( 

785 plate_id=str(plate_path), 

786 pipeline_steps=definition_pipeline, 

787 global_config=effective_config, 

788 pipeline_config=pipeline_config 

789 ) 

790 

791 response = await loop.run_in_executor(None, _execute) 

792 

793 # Track execution ID for cancellation 

794 if response.get('execution_id'): 

795 self.current_execution_id = response['execution_id'] 

796 

797 logger.info(f"Plate {plate_path} execution response: {response.get('status')}") 

798 

799 if response.get('status') != 'complete': 

800 error_msg = response.get('message', 'Unknown error') 

801 logger.error(f"Plate {plate_path} execution failed: {error_msg}") 

802 self.app.show_error(f"Execution failed for {plate_path}: {error_msg}") 

803 

804 # Execution complete 

805 self.current_execution_id = None 

806 self.app.current_status = f"Completed {len(ready_items)} plate(s)" 

807 

808 # Update orchestrator states 

809 for plate in ready_items: 

810 plate_path = plate['path'] 

811 if plate_path in self.orchestrators: 

812 self.orchestrators[plate_path]._state = OrchestratorState.EXEC_COMPLETE 

813 

814 self._trigger_ui_refresh() 

815 self._update_button_states() 

816 

817 # Disconnect from server 

818 def _disconnect(): 

819 self.zmq_client.disconnect() 

820 

821 await loop.run_in_executor(None, _disconnect) 

822 self.zmq_client = None 

823 

824 except Exception as e: 

825 logger.error(f"Failed to execute plates via ZMQ: {e}", exc_info=True) 

826 self.app.show_error(f"Failed to execute: {e}") 

827 self.current_execution_id = None 

828 self._reset_execution_state("ZMQ execution failed") 

829 

830 # Cleanup ZMQ client 

831 if hasattr(self, 'zmq_client') and self.zmq_client: 

832 try: 

833 self.zmq_client.disconnect() 

834 except: 

835 pass 

836 self.zmq_client = None 

837 

838 def _on_zmq_progress(self, message): 

839 """Handle progress updates from ZMQ execution server.""" 

840 try: 

841 well_id = message.get('well_id', 'unknown') 

842 step = message.get('step', 'unknown') 

843 status = message.get('status', 'unknown') 

844 

845 # Update status in TUI 

846 progress_text = f"[{well_id}] {step}: {status}" 

847 self.app.current_status = progress_text 

848 logger.debug(f"Progress: {progress_text}") 

849 

850 except Exception as e: 

851 logger.warning(f"Failed to handle progress update: {e}") 

852 

853 async def action_stop_execution(self) -> None: 

854 logger.info("🛑 Stop button pressed.") 

855 self.app.current_status = "Terminating execution..." 

856 

857 # Stop async monitoring first 

858 self._stop_monitoring() 

859 

860 # Check if using ZMQ execution 

861 if self.zmq_client: 

862 try: 

863 logger.info("🛑 Requesting graceful cancellation via ZMQ...") 

864 

865 import asyncio 

866 loop = asyncio.get_event_loop() 

867 

868 # Cancel specific execution if we have an ID 

869 if self.current_execution_id: 

870 logger.info(f"🛑 Cancelling execution {self.current_execution_id}") 

871 

872 def _cancel(): 

873 return self.zmq_client.cancel_execution(self.current_execution_id) 

874 

875 response = await loop.run_in_executor(None, _cancel) 

876 

877 if response.get('status') == 'ok': 

878 logger.info("🛑 Cancellation request accepted, waiting for graceful shutdown...") 

879 self.app.current_status = "Cancellation requested, waiting..." 

880 

881 # Wait for graceful cancellation with timeout 

882 timeout = 5 # seconds 

883 start_time = asyncio.get_event_loop().time() 

884 

885 while (asyncio.get_event_loop().time() - start_time) < timeout: 

886 # Check if execution is still running 

887 def _check_status(): 

888 return self.zmq_client.get_status(self.current_execution_id) 

889 

890 status_response = await loop.run_in_executor(None, _check_status) 

891 

892 if status_response.get('status') == 'error': 

893 # Execution no longer exists (completed or cancelled) 

894 logger.info("🛑 Execution completed/cancelled gracefully") 

895 break 

896 

897 await asyncio.sleep(0.5) 

898 else: 

899 # Timeout reached - execution still running 

900 logger.warning("🛑 Graceful cancellation timeout - execution may still be running") 

901 self.app.current_status = "Cancellation timeout - execution may still be running" 

902 else: 

903 logger.warning(f"🛑 Cancellation failed: {response.get('message')}") 

904 self.app.current_status = f"Cancellation failed: {response.get('message')}" 

905 

906 # Disconnect client 

907 def _disconnect(): 

908 self.zmq_client.disconnect() 

909 

910 await loop.run_in_executor(None, _disconnect) 

911 

912 self.zmq_client = None 

913 self.current_execution_id = None 

914 self._reset_execution_state("Execution cancelled by user") 

915 

916 except Exception as e: 

917 logger.error(f"🛑 Error cancelling ZMQ execution: {e}") 

918 self.app.show_error(f"Failed to cancel execution: {e}") 

919 

920 elif self.current_process and self.current_process.poll() is None: # Still running subprocess 

921 try: 

922 # Kill the entire process group, not just the parent process 

923 # The subprocess creates its own process group, so we need to kill that group 

924 logger.info(f"🛑 Killing process group for PID {self.current_process.pid}...") 

925 

926 # Get the process group ID (should be same as PID since subprocess calls os.setpgrp()) 

927 process_group_id = self.current_process.pid 

928 

929 # Kill entire process group (negative PID kills process group) 

930 os.killpg(process_group_id, signal.SIGTERM) 

931 

932 # Give processes time to exit gracefully 

933 await asyncio.sleep(1) 

934 

935 # Force kill if still alive 

936 try: 

937 os.killpg(process_group_id, signal.SIGKILL) 

938 logger.info(f"🛑 Force killed process group {process_group_id}") 

939 except ProcessLookupError: 

940 logger.info(f"🛑 Process group {process_group_id} already terminated") 

941 

942 except Exception as e: 

943 logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill") 

944 # Fallback to killing just the main process 

945 self.current_process.kill() 

946 

947 self._reset_execution_state("Execution terminated by user.") 

948 

949 

950 

951 async def action_add_plate(self) -> None: 

952 """Handle Add Plate button.""" 

953 await self._open_plate_directory_browser() 

954 

955 async def action_export_ome_zarr(self) -> None: 

956 """Export selected plate to OME-ZARR format.""" 

957 if not self.selected_plate: 

958 self.app.show_error("No Selection", "Please select a plate to export.") 

959 return 

960 

961 # Get the orchestrator for the selected plate 

962 orchestrator = self.orchestrators.get(self.selected_plate) 

963 if not orchestrator: 

964 self.app.show_error("Not Initialized", "Please initialize the plate before exporting.") 

965 return 

966 

967 # Open file browser for export location 

968 def handle_export_result(selected_paths): 

969 if selected_paths: 

970 export_path = Path(selected_paths[0]) if isinstance(selected_paths, list) else Path(selected_paths) 

971 self._start_ome_zarr_export(orchestrator, export_path) 

972 

973 await self.window_service.open_file_browser( 

974 file_manager=self.filemanager, 

975 initial_path=get_cached_browser_path(PathCacheKey.GENERAL), 

976 backend=Backend.DISK, 

977 title="Select OME-ZARR Export Directory", 

978 mode="save", 

979 selection_mode=SelectionMode.DIRECTORIES_ONLY, 

980 cache_key=PathCacheKey.GENERAL, 

981 on_result_callback=handle_export_result, 

982 caller_id="plate_manager_export" 

983 ) 

984 

985 def _start_ome_zarr_export(self, orchestrator, export_path: Path): 

986 """Start OME-ZARR export process.""" 

987 async def run_export(): 

988 try: 

989 self.app.current_status = f"Exporting to OME-ZARR: {export_path}" 

990 

991 # Create export-specific config with ZARR materialization 

992 from openhcs.core.config import GlobalPipelineConfig 

993 from openhcs.config_framework.global_config import get_current_global_config 

994 export_config = get_current_global_config(GlobalPipelineConfig) 

995 export_vfs_config = VFSConfig( 

996 intermediate_backend=export_config.vfs_config.intermediate_backend, 

997 materialization_backend=MaterializationBackend.ZARR 

998 ) 

999 

1000 # Update orchestrator config for export 

1001 export_global_config = dataclasses.replace(export_config, vfs=export_vfs_config) 

1002 

1003 # Create zarr backend with OME-ZARR enabled 

1004 zarr_backend = ZarrStorageBackend(ome_zarr_metadata=True) 

1005 

1006 # Copy processed data from current workspace/plate to OME-ZARR format 

1007 # For OpenHCS format, workspace_path is None, so use input_dir (plate path) 

1008 source_path = orchestrator.workspace_path or orchestrator.input_dir 

1009 if source_path and source_path.exists(): 

1010 # Find processed images in workspace/plate 

1011 processed_images = list(source_path.rglob("*.tif")) 

1012 

1013 if processed_images: 

1014 # Group by well for batch operations 

1015 wells_data = defaultdict(list) 

1016 

1017 for img_path in processed_images: 

1018 # Extract well from filename 

1019 well_match = None 

1020 # Try ImageXpress pattern: A01_s001_w1_z001.tif 

1021 match = re.search(r'([A-Z]\d{2})_', img_path.name) 

1022 if match: 

1023 well_id = match.group(1) 

1024 wells_data[well_id].append(img_path) 

1025 

1026 # Export each well to OME-ZARR 

1027 export_store_path = export_path / "plate.zarr" 

1028 

1029 for well_id, well_images in wells_data.items(): 

1030 # Load images 

1031 images = [] 

1032 for img_path in well_images: 

1033 img = Image.open(img_path) 

1034 images.append(np.array(img)) 

1035 

1036 # Create output paths for OME-ZARR structure 

1037 output_paths = [export_store_path / f"{well_id}_{i:03d}.tif" 

1038 for i in range(len(images))] 

1039 

1040 # Save to OME-ZARR format 

1041 zarr_backend.save_batch(images, output_paths, chunk_name=well_id) 

1042 

1043 self.app.current_status = f"✅ OME-ZARR export completed: {export_store_path}" 

1044 else: 

1045 self.app.show_error("No Data", "No processed images found in workspace.") 

1046 else: 

1047 self.app.show_error("No Workspace", "Plate workspace not found. Run pipeline first.") 

1048 

1049 except Exception as e: 

1050 logger.error(f"OME-ZARR export failed: {e}", exc_info=True) 

1051 self.app.show_error("Export Failed", f"OME-ZARR export failed: {str(e)}") 

1052 

1053 # Run export in background 

1054 asyncio.create_task(run_export()) 

1055 

1056 # Debug functionality removed - no longer needed 

1057 

1058 async def _open_plate_directory_browser(self): 

1059 """Open textual-window file browser for plate directory selection.""" 

1060 # Get cached path for better UX - remembers last used directory 

1061 path_cache = get_path_cache() 

1062 initial_path = path_cache.get_initial_path(PathCacheKey.PLATE_IMPORT, Path.home()) 

1063 

1064 # Open textual-window file browser for directory selection 

1065 await self.window_service.open_file_browser( 

1066 file_manager=self.filemanager, 

1067 initial_path=initial_path, 

1068 backend=Backend.DISK, 

1069 title="Select Plate Directory", 

1070 mode="load", 

1071 selection_mode=SelectionMode.DIRECTORIES_ONLY, 

1072 cache_key=PathCacheKey.PLATE_IMPORT, 

1073 on_result_callback=self._add_plate_callback, 

1074 caller_id="plate_manager", 

1075 enable_multi_selection=True 

1076 ) 

1077 

1078 def _add_plate_callback(self, selected_paths) -> None: 

1079 """Handle directory selection from file browser.""" 

1080 logger.debug(f"_add_plate_callback called with: {selected_paths} (type: {type(selected_paths)})") 

1081 

1082 if selected_paths is None or selected_paths is False: 

1083 self.app.current_status = "Plate selection cancelled" 

1084 return 

1085 

1086 # Handle both single path and list of paths 

1087 if not isinstance(selected_paths, list): 

1088 selected_paths = [selected_paths] 

1089 

1090 added_plates = [] 

1091 current_plates = list(self.items) 

1092 

1093 for selected_path in selected_paths: 

1094 # Ensure selected_path is a Path object 

1095 if isinstance(selected_path, str): 

1096 selected_path = Path(selected_path) 

1097 elif not isinstance(selected_path, Path): 

1098 selected_path = Path(str(selected_path)) 

1099 

1100 # Check if plate already exists 

1101 if any(plate['path'] == str(selected_path) for plate in current_plates): 

1102 continue 

1103 

1104 # Add the plate to the list 

1105 plate_name = selected_path.name 

1106 plate_path = str(selected_path) 

1107 plate_entry = { 

1108 'name': plate_name, 

1109 'path': plate_path, 

1110 # No status field - state comes from orchestrator 

1111 } 

1112 

1113 current_plates.append(plate_entry) 

1114 added_plates.append(plate_name) 

1115 

1116 # Cache the parent directory for next time (save user navigation time) 

1117 if selected_paths: 

1118 # Use parent of first selected path as the cached directory 

1119 first_path = selected_paths[0] if isinstance(selected_paths[0], Path) else Path(selected_paths[0]) 

1120 parent_dir = first_path.parent 

1121 get_path_cache().set_cached_path(PathCacheKey.PLATE_IMPORT, parent_dir) 

1122 

1123 # Update items list using reactive property (triggers automatic UI update) 

1124 self.items = current_plates 

1125 

1126 if added_plates: 

1127 if len(added_plates) == 1: 

1128 self.app.current_status = f"Added plate: {added_plates[0]}" 

1129 else: 

1130 self.app.current_status = f"Added {len(added_plates)} plates: {', '.join(added_plates)}" 

1131 else: 

1132 self.app.current_status = "No new plates added (duplicates skipped)" 

1133 

1134 def action_delete_plate(self) -> None: 

1135 selected_items, _ = self.get_selection_state() 

1136 if not selected_items: 

1137 self.app.show_error("No plate selected to delete.") 

1138 return 

1139 

1140 paths_to_delete = {p['path'] for p in selected_items} 

1141 self.items = [p for p in self.items if p['path'] not in paths_to_delete] 

1142 

1143 # Clean up orchestrators for deleted plates 

1144 for path in paths_to_delete: 

1145 if path in self.orchestrators: 

1146 del self.orchestrators[path] 

1147 

1148 if self.selected_plate in paths_to_delete: 

1149 self.selected_plate = "" 

1150 

1151 self.app.current_status = f"Deleted {len(paths_to_delete)} plate(s)" 

1152 

1153 

1154 

1155 async def action_edit_config(self) -> None: 

1156 """ 

1157 Handle Edit button - create per-orchestrator PipelineConfig instances. 

1158 

1159 This enables per-orchestrator configuration without affecting global configuration. 

1160 Shows resolved defaults from GlobalPipelineConfig with "Pipeline default: {value}" placeholders. 

1161 """ 

1162 selected_items, selection_mode = self.get_selection_state() 

1163 

1164 if selection_mode == "empty": 

1165 self.app.current_status = "No orchestrators selected for configuration" 

1166 return 

1167 

1168 selected_orchestrators = [ 

1169 self.orchestrators[item['path']] for item in selected_items 

1170 if item['path'] in self.orchestrators 

1171 ] 

1172 

1173 if not selected_orchestrators: 

1174 self.app.current_status = "No initialized orchestrators selected" 

1175 return 

1176 

1177 # Load existing config or create new one for editing 

1178 representative_orchestrator = selected_orchestrators[0] 

1179 

1180 # Use orchestrator's existing config if it exists, otherwise use global config as source 

1181 source_config = representative_orchestrator.pipeline_config or self.global_config 

1182 

1183 current_plate_config = create_dataclass_for_editing(PipelineConfig, source_config) 

1184 

1185 def handle_config_save(new_config: PipelineConfig) -> None: 

1186 """Apply per-orchestrator configuration without global side effects.""" 

1187 for orchestrator in selected_orchestrators: 

1188 # Direct synchronous call - no async needed 

1189 orchestrator.apply_pipeline_config(new_config) 

1190 count = len(selected_orchestrators) 

1191 self.app.current_status = f"Per-orchestrator configuration applied to {count} orchestrator(s)" 

1192 

1193 # Open configuration window using PipelineConfig (not GlobalPipelineConfig) 

1194 await self.window_service.open_config_window( 

1195 PipelineConfig, 

1196 current_plate_config, 

1197 on_save_callback=handle_config_save 

1198 ) 

1199 

1200 async def action_edit_global_config(self) -> None: 

1201 """ 

1202 Handle global configuration editing - affects all orchestrators. 

1203 

1204 This maintains the existing global configuration workflow but uses lazy loading. 

1205 """ 

1206 

1207 from openhcs.core.config import PipelineConfig 

1208 from openhcs.config_framework.lazy_factory import create_dataclass_for_editing 

1209 

1210 # Get current global config from app or use default 

1211 current_global_config = self.app.global_config or GlobalPipelineConfig() 

1212 

1213 # Create lazy PipelineConfig for editing with proper thread-local context 

1214 current_lazy_config = create_dataclass_for_editing(PipelineConfig, current_global_config, preserve_values=True) 

1215 

1216 def handle_global_config_save(new_config: PipelineConfig) -> None: 

1217 """Apply global configuration to all orchestrators.""" 

1218 # Convert lazy PipelineConfig back to GlobalPipelineConfig 

1219 global_config = new_config.to_base_config() 

1220 

1221 self.app.global_config = global_config # Update app-level config 

1222 

1223 # REMOVED: Thread-local modification - dual-axis resolver handles context automatically 

1224 

1225 for orchestrator in self.orchestrators.values(): 

1226 asyncio.create_task(orchestrator.apply_new_global_config(global_config)) 

1227 self.app.current_status = "Global configuration applied to all orchestrators" 

1228 

1229 # PipelineConfig already imported from openhcs.core.config 

1230 await self.window_service.open_config_window( 

1231 PipelineConfig, 

1232 current_lazy_config, 

1233 on_save_callback=handle_global_config_save 

1234 ) 

1235 

1236 

1237 

1238 def _analyze_orchestrator_configs(self, orchestrators: List['PipelineOrchestrator']) -> Dict[str, Dict[str, Any]]: 

1239 """Analyze configs across multiple orchestrators to detect same/different values. 

1240 

1241 Args: 

1242 orchestrators: List of PipelineOrchestrator instances 

1243 

1244 Returns: 

1245 Dict mapping field names to analysis results: 

1246 - {"type": "same", "value": actual_value, "default": default_value} 

1247 - {"type": "different", "values": [val1, val2, ...], "default": default_value} 

1248 """ 

1249 if not orchestrators: 

1250 return {} 

1251 

1252 # Get parameter info for defaults 

1253 param_info = SignatureAnalyzer.analyze(GlobalPipelineConfig) 

1254 

1255 config_analysis = {} 

1256 

1257 # Analyze each field in GlobalPipelineConfig 

1258 for field in dataclasses.fields(GlobalPipelineConfig): 

1259 field_name = field.name 

1260 

1261 # Get values from all orchestrators 

1262 values = [] 

1263 for orch in orchestrators: 

1264 try: 

1265 value = getattr(orch.global_config, field_name) 

1266 values.append(value) 

1267 except AttributeError: 

1268 # Field doesn't exist in this config, skip 

1269 continue 

1270 

1271 if not values: 

1272 continue 

1273 

1274 # Get default value from parameter info 

1275 param_details = param_info.get(field_name) 

1276 default_value = param_details.default_value if param_details else None 

1277 

1278 # Check if all values are the same 

1279 if all(self._values_equal(v, values[0]) for v in values): 

1280 config_analysis[field_name] = { 

1281 "type": "same", 

1282 "value": values[0], 

1283 "default": default_value 

1284 } 

1285 else: 

1286 config_analysis[field_name] = { 

1287 "type": "different", 

1288 "values": values, 

1289 "default": default_value 

1290 } 

1291 

1292 return config_analysis 

1293 

1294 def _values_equal(self, val1: Any, val2: Any) -> bool: 

1295 """Check if two values are equal, handling dataclasses and complex types.""" 

1296 # Handle dataclass comparison 

1297 if dataclasses.is_dataclass(val1) and dataclasses.is_dataclass(val2): 

1298 return dataclasses.asdict(val1) == dataclasses.asdict(val2) 

1299 

1300 # Handle regular comparison 

1301 return val1 == val2 

1302 

1303 def action_init_plate(self) -> None: 

1304 """Handle Init Plate button - initialize selected plates.""" 

1305 # Get current selection state 

1306 selected_items, selection_mode = self.get_selection_state() 

1307 

1308 if selection_mode == "empty": 

1309 logger.warning("No plates available for initialization") 

1310 return 

1311 

1312 # Validate all selected plates can be initialized (allow ALL failed plates to be re-initialized) 

1313 invalid_plates = [] 

1314 for item in selected_items: 

1315 plate_path = item['path'] 

1316 orchestrator = self.orchestrators.get(plate_path) 

1317 # Only block plates that are currently executing - all other states can be re-initialized 

1318 if orchestrator is not None and orchestrator.state == OrchestratorState.EXECUTING: 

1319 invalid_plates.append(item) 

1320 

1321 if invalid_plates: 

1322 names = [item['name'] for item in invalid_plates] 

1323 logger.warning(f"Cannot initialize plates that are currently executing: {', '.join(names)}") 

1324 return 

1325 

1326 # Start async initialization 

1327 self._start_async_init(selected_items, selection_mode) 

1328 

1329 def _start_async_init(self, selected_items: List[Dict], selection_mode: str) -> None: 

1330 """Start async initialization of selected plates.""" 

1331 # Generate operation description 

1332 desc = self.get_operation_description(selected_items, selection_mode, "initialize") 

1333 logger.info(f"Initializing: {desc}") 

1334 

1335 # Start background worker 

1336 self._init_plates_worker(selected_items) 

1337 

1338 @work(exclusive=True) 

1339 async def _init_plates_worker(self, selected_items: List[Dict]) -> None: 

1340 """Background worker for plate initialization.""" 

1341 for plate_data in selected_items: 

1342 plate_path = plate_data['path'] 

1343 

1344 # Find the actual plate in self.items (not the copy from get_selection_state) 

1345 actual_plate = None 

1346 for plate in self.items: 

1347 if plate['path'] == plate_path: 

1348 actual_plate = plate 

1349 break 

1350 

1351 if not actual_plate: 

1352 logger.error(f"Plate not found in plates list: {plate_path}") 

1353 continue 

1354 

1355 try: 

1356 # Run heavy initialization in executor to avoid blocking UI 

1357 def init_orchestrator(): 

1358 return PipelineOrchestrator( 

1359 plate_path=plate_path, 

1360 global_config=self.global_config, 

1361 storage_registry=self.filemanager.registry 

1362 ).initialize() 

1363 

1364 orchestrator = await asyncio.get_event_loop().run_in_executor(None, init_orchestrator) 

1365 

1366 # Store orchestrator for later use (channel selection, etc.) 

1367 self.orchestrators[plate_path] = orchestrator 

1368 # Orchestrator state is already set to READY by initialize() method 

1369 logger.info(f"Plate {actual_plate['name']} initialized successfully") 

1370 

1371 except Exception as e: 

1372 logger.error(f"Failed to initialize plate {plate_path}: {e}", exc_info=True) 

1373 # Create a failed orchestrator to track the error state 

1374 failed_orchestrator = PipelineOrchestrator( 

1375 plate_path=plate_path, 

1376 global_config=self.global_config, 

1377 storage_registry=self.filemanager.registry 

1378 ) 

1379 failed_orchestrator._state = OrchestratorState.INIT_FAILED 

1380 self.orchestrators[plate_path] = failed_orchestrator 

1381 actual_plate['error'] = str(e) 

1382 

1383 # Trigger UI refresh after orchestrator state changes 

1384 self._trigger_ui_refresh() 

1385 # Update button states immediately (reactive system handles UI updates automatically) 

1386 self._update_button_states() 

1387 # Notify pipeline editor of status change 

1388 status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path'])) 

1389 self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol) 

1390 logger.debug(f"Updated plate {actual_plate['name']} status") 

1391 

1392 # Final UI update (reactive system handles this automatically when self.items is modified) 

1393 self._update_button_states() 

1394 

1395 # Update status 

1396 success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.READY]) 

1397 error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.INIT_FAILED]) 

1398 

1399 if error_count == 0: 

1400 logger.info(f"Successfully initialized {success_count} plates") 

1401 else: 

1402 logger.warning(f"Initialized {success_count} plates, {error_count} errors") 

1403 

1404 def action_compile_plate(self) -> None: 

1405 """Handle Compile Plate button - compile pipelines for selected plates.""" 

1406 # Get current selection state 

1407 selected_items, selection_mode = self.get_selection_state() 

1408 

1409 if selection_mode == "empty": 

1410 logger.warning("No plates available for compilation") 

1411 return 

1412 

1413 # Validate all selected plates are ready for compilation (allow failed plates to be re-compiled) 

1414 not_ready = [] 

1415 for item in selected_items: 

1416 plate_path = item['path'] 

1417 orchestrator = self.orchestrators.get(plate_path) 

1418 # Allow READY, COMPILE_FAILED, EXEC_FAILED, COMPILED, and COMPLETED states to be compiled/recompiled 

1419 if orchestrator is None or orchestrator.state not in [OrchestratorState.READY, OrchestratorState.COMPILE_FAILED, OrchestratorState.EXEC_FAILED, OrchestratorState.COMPILED, OrchestratorState.COMPLETED]: 

1420 not_ready.append(item) 

1421 

1422 if not_ready: 

1423 names = [item['name'] for item in not_ready] 

1424 # More accurate error message based on actual state 

1425 if any(self.orchestrators.get(item['path']) is None for item in not_ready): 

1426 logger.warning(f"Cannot compile plates that haven't been initialized: {', '.join(names)}") 

1427 elif any(self.orchestrators.get(item['path']).state == OrchestratorState.EXECUTING for item in not_ready): 

1428 logger.warning(f"Cannot compile plates that are currently executing: {', '.join(names)}") 

1429 else: 

1430 logger.warning(f"Cannot compile plates in current state: {', '.join(names)}") 

1431 return 

1432 

1433 # Validate all selected plates have pipelines 

1434 no_pipeline = [] 

1435 for item in selected_items: 

1436 pipeline = self._get_current_pipeline_definition(item['path']) 

1437 if not pipeline: 

1438 no_pipeline.append(item) 

1439 

1440 if no_pipeline: 

1441 names = [item['name'] for item in no_pipeline] 

1442 self.app.current_status = f"Cannot compile plates without pipelines: {', '.join(names)}" 

1443 return 

1444 

1445 # Start async compilation 

1446 self._start_async_compile(selected_items, selection_mode) 

1447 

1448 def _start_async_compile(self, selected_items: List[Dict], selection_mode: str) -> None: 

1449 """Start async compilation of selected plates.""" 

1450 # Generate operation description 

1451 desc = self.get_operation_description(selected_items, selection_mode, "compile") 

1452 logger.info(f"Compiling: {desc}") 

1453 

1454 # Start background worker 

1455 self._compile_plates_worker(selected_items) 

1456 

1457 @work(exclusive=True) 

1458 async def _compile_plates_worker(self, selected_items: List[Dict]) -> None: 

1459 """Background worker for plate compilation.""" 

1460 for plate_data in selected_items: 

1461 plate_path = plate_data['path'] 

1462 

1463 # Find the actual plate in self.items (not the copy from get_selection_state) 

1464 actual_plate = None 

1465 for plate in self.items: 

1466 if plate['path'] == plate_path: 

1467 actual_plate = plate 

1468 break 

1469 

1470 if not actual_plate: 

1471 logger.error(f"Plate not found in plates list: {plate_path}") 

1472 continue 

1473 

1474 # Get definition pipeline and make fresh copy 

1475 definition_pipeline = self._get_current_pipeline_definition(plate_path) 

1476 if not definition_pipeline: 

1477 logger.warning(f"No pipeline defined for {actual_plate['name']}, using empty pipeline") 

1478 definition_pipeline = [] 

1479 

1480 try: 

1481 # Get or create orchestrator for compilation (run in executor to avoid blocking) 

1482 def get_or_create_orchestrator(): 

1483 if plate_path in self.orchestrators: 

1484 orchestrator = self.orchestrators[plate_path] 

1485 if not orchestrator.is_initialized(): 

1486 orchestrator.initialize() 

1487 return orchestrator 

1488 else: 

1489 return PipelineOrchestrator( 

1490 plate_path=plate_path, 

1491 global_config=self.global_config, 

1492 storage_registry=self.filemanager.registry 

1493 ).initialize() 

1494 

1495 orchestrator = await asyncio.get_event_loop().run_in_executor(None, get_or_create_orchestrator) 

1496 self.orchestrators[plate_path] = orchestrator 

1497 

1498 # Make fresh copy for compilation 

1499 execution_pipeline = copy.deepcopy(definition_pipeline) 

1500 

1501 # Fix step IDs after deep copy to match new object IDs 

1502 for step in execution_pipeline: 

1503 step.step_id = str(id(step)) 

1504 # Ensure variable_components is never None - use FunctionStep default 

1505 if step.variable_components is None: 

1506 logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting FunctionStep default") 

1507 step.variable_components = [VariableComponents.SITE] 

1508 # Also ensure it's not an empty list 

1509 elif not step.variable_components: 

1510 logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting FunctionStep default") 

1511 step.variable_components = [VariableComponents.SITE] 

1512 

1513 # Get wells and compile (async - run in executor to avoid blocking UI) 

1514 # Wrap in Pipeline object like test_main.py does 

1515 pipeline_obj = Pipeline(steps=execution_pipeline) 

1516 

1517 # Run heavy operations in executor to avoid blocking UI 

1518 # Get wells using multiprocessing axis (WELL in default config) 

1519 from openhcs.constants import MULTIPROCESSING_AXIS 

1520 wells = await asyncio.get_event_loop().run_in_executor(None, lambda: orchestrator.get_component_keys(MULTIPROCESSING_AXIS)) 

1521 compiled_contexts = await asyncio.get_event_loop().run_in_executor( 

1522 None, orchestrator.compile_pipelines, pipeline_obj.steps, wells 

1523 ) 

1524 

1525 # Store state simply - no reactive property issues 

1526 step_ids_in_pipeline = [id(step) for step in execution_pipeline] 

1527 # Get step IDs from contexts (ProcessingContext objects) 

1528 first_well_key = list(compiled_contexts.keys())[0] if compiled_contexts else None 

1529 step_ids_in_contexts = list(compiled_contexts[first_well_key].step_plans.keys()) if first_well_key and hasattr(compiled_contexts[first_well_key], 'step_plans') else [] 

1530 logger.info(f"🔥 Storing compiled data for {plate_path}: pipeline={type(execution_pipeline)}, contexts={type(compiled_contexts)}") 

1531 logger.info(f"🔥 Step IDs in pipeline: {step_ids_in_pipeline}") 

1532 logger.info(f"🔥 Step IDs in contexts: {step_ids_in_contexts}") 

1533 self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts) 

1534 logger.info(f"🔥 Stored! Available compiled plates: {list(self.plate_compiled_data.keys())}") 

1535 

1536 # Orchestrator state is already set to COMPILED by compile_pipelines() method 

1537 logger.info(f"🔥 Successfully compiled {plate_path}") 

1538 

1539 except Exception as e: 

1540 logger.error(f"🔥 COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True) 

1541 # Orchestrator state is already set to FAILED by compile_pipelines() method 

1542 actual_plate['error'] = str(e) 

1543 # Don't store anything in plate_compiled_data on failure 

1544 

1545 # Trigger UI refresh after orchestrator state changes 

1546 self._trigger_ui_refresh() 

1547 # Update button states immediately (reactive system handles UI updates automatically) 

1548 self._update_button_states() 

1549 # Notify pipeline editor of status change 

1550 status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path'])) 

1551 self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol) 

1552 

1553 # Final UI update (reactive system handles this automatically when self.items is modified) 

1554 self._update_button_states() 

1555 

1556 # Update status 

1557 success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILED]) 

1558 error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILE_FAILED]) 

1559 

1560 if error_count == 0: 

1561 logger.info(f"Successfully compiled {success_count} plates") 

1562 else: 

1563 logger.warning(f"Compiled {success_count} plates, {error_count} errors") 

1564 

1565 async def action_code_plate(self) -> None: 

1566 """Generate Python code for selected plates and their pipelines.""" 

1567 logger.debug("Code button pressed - generating Python code for plates") 

1568 

1569 selected_items, _ = self.get_selection_state() 

1570 if not selected_items: 

1571 self.app.current_status = "No plates selected for code generation" 

1572 return 

1573 

1574 try: 

1575 # Get pipeline data for selected plates 

1576 plate_paths = [item['path'] for item in selected_items] 

1577 pipeline_data = {} 

1578 

1579 # Collect pipeline steps for each plate 

1580 for plate_path in plate_paths: 

1581 if hasattr(self, 'pipeline_editor') and self.pipeline_editor: 

1582 # Get pipeline steps from pipeline editor if available 

1583 if plate_path in self.pipeline_editor.plate_pipelines: 

1584 pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines[plate_path] 

1585 else: 

1586 pipeline_data[plate_path] = [] 

1587 else: 

1588 pipeline_data[plate_path] = [] 

1589 

1590 # Use existing pickle_to_python logic to generate complete script 

1591 from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher 

1592 

1593 # Create data structure like pickle_to_python expects 

1594 data = { 

1595 'plate_paths': plate_paths, 

1596 'pipeline_data': pipeline_data, 

1597 'global_config': self.app.global_config 

1598 } 

1599 

1600 # Extract variables from data dict 

1601 plate_paths = data['plate_paths'] 

1602 pipeline_data = data['pipeline_data'] 

1603 

1604 # Generate just the orchestrator configuration (no execution wrapper) 

1605 from openhcs.debug.pickle_to_python import generate_complete_orchestrator_code 

1606 

1607 python_code = generate_complete_orchestrator_code( 

1608 plate_paths=plate_paths, 

1609 pipeline_data=pipeline_data, 

1610 global_config=self.app.global_config, 

1611 clean_mode=True # Default to clean mode - only show non-default values 

1612 ) 

1613 

1614 # Create callback to handle edited code 

1615 def handle_edited_code(edited_code: str): 

1616 logger.debug("Orchestrator code edited, processing changes...") 

1617 try: 

1618 # Execute the code (it has all necessary imports) 

1619 namespace = {} 

1620 exec(edited_code, namespace) 

1621 

1622 # Update pipeline data if present (composition: orchestrator contains pipelines) 

1623 if 'pipeline_data' in namespace: 

1624 new_pipeline_data = namespace['pipeline_data'] 

1625 # Update pipeline editor using reactive system (like pipeline code button does) 

1626 if hasattr(self, 'pipeline_editor') and self.pipeline_editor: 

1627 # Update plate pipelines storage 

1628 current_pipelines = dict(self.pipeline_editor.plate_pipelines) 

1629 current_pipelines.update(new_pipeline_data) 

1630 self.pipeline_editor.plate_pipelines = current_pipelines 

1631 

1632 # If current plate is in the edited data, update the current view too 

1633 current_plate = self.pipeline_editor.current_plate 

1634 if current_plate and current_plate in new_pipeline_data: 

1635 self.pipeline_editor.pipeline_steps = new_pipeline_data[current_plate] 

1636 

1637 self.app.current_status = f"Pipeline data updated for {len(new_pipeline_data)} plates" 

1638 

1639 # Update global config if present 

1640 elif 'global_config' in namespace: 

1641 new_global_config = namespace['global_config'] 

1642 import asyncio 

1643 for plate_path in plate_paths: 

1644 if plate_path in self.orchestrators: 

1645 orchestrator = self.orchestrators[plate_path] 

1646 asyncio.create_task(orchestrator.apply_new_global_config(new_global_config)) 

1647 self.app.current_status = f"Global config updated for {len(plate_paths)} plates" 

1648 

1649 # Update orchestrators list if present 

1650 elif 'orchestrators' in namespace: 

1651 new_orchestrators = namespace['orchestrators'] 

1652 self.app.current_status = f"Orchestrator list updated with {len(new_orchestrators)} orchestrators" 

1653 

1654 else: 

1655 self.app.show_error("Parse Error", "No valid assignments found in edited code") 

1656 

1657 except SyntaxError as e: 

1658 self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}") 

1659 except Exception as e: 

1660 import traceback 

1661 full_traceback = traceback.format_exc() 

1662 logger.error(f"Failed to parse edited orchestrator code: {e}\nFull traceback:\n{full_traceback}") 

1663 self.app.show_error("Edit Error", f"Failed to parse orchestrator code: {str(e)}\n\nFull traceback:\n{full_traceback}") 

1664 

1665 # Launch terminal editor 

1666 launcher = TerminalLauncher(self.app) 

1667 await launcher.launch_editor_for_file( 

1668 file_content=python_code, 

1669 file_extension='.py', 

1670 on_save_callback=handle_edited_code 

1671 ) 

1672 

1673 except Exception as e: 

1674 logger.error(f"Failed to generate plate code: {e}") 

1675 self.app.current_status = f"Failed to generate code: {e}" 

1676 

1677 async def action_save_python_script(self) -> None: 

1678 """Save Python script for selected plates (like special_io_pipeline.py).""" 

1679 logger.debug("Save button pressed - saving Python script for plates") 

1680 

1681 selected_items, _ = self.get_selection_state() 

1682 if not selected_items: 

1683 self.app.current_status = "No plates selected for script generation" 

1684 return 

1685 

1686 try: 

1687 # Get pipeline data for selected plates 

1688 plate_paths = [item['path'] for item in selected_items] 

1689 pipeline_data = {} 

1690 

1691 # Collect pipeline steps for each plate 

1692 for plate_path in plate_paths: 

1693 if hasattr(self, 'pipeline_editor') and self.pipeline_editor: 

1694 # Get pipeline steps from pipeline editor if available 

1695 if plate_path in self.pipeline_editor.plate_pipelines: 

1696 pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines[plate_path] 

1697 else: 

1698 pipeline_data[plate_path] = [] 

1699 else: 

1700 pipeline_data[plate_path] = [] 

1701 

1702 # Create data structure like pickle_to_python expects 

1703 data = { 

1704 'plate_paths': plate_paths, 

1705 'pipeline_data': pipeline_data, 

1706 'global_config': self.app.global_config 

1707 } 

1708 

1709 # Generate complete executable Python script using pickle_to_python logic 

1710 python_code = self._generate_executable_script(data) 

1711 

1712 # Launch file browser to save the script 

1713 from openhcs.textual_tui.windows.file_browser_window import open_file_browser_window, BrowserMode 

1714 from openhcs.textual_tui.services.file_browser_service import SelectionMode 

1715 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey 

1716 from openhcs.constants.constants import Backend 

1717 

1718 def handle_save_result(result): 

1719 if result: 

1720 # Handle both single Path and list of Paths 

1721 save_path = None 

1722 if isinstance(result, Path): 

1723 save_path = result 

1724 elif isinstance(result, list) and len(result) > 0: 

1725 save_path = result[0] # Take first path 

1726 

1727 if save_path: 

1728 try: 

1729 # Write the Python script to the selected file 

1730 with open(save_path, 'w') as f: 

1731 f.write(python_code) 

1732 

1733 logger.info(f"Python script saved to: {save_path}") 

1734 self.app.current_status = f"Python script saved to: {save_path}" 

1735 except Exception as e: 

1736 logger.error(f"Failed to save Python script: {e}") 

1737 self.app.current_status = f"Failed to save script: {e}" 

1738 

1739 # Generate default filename based on first plate 

1740 first_plate_name = Path(plate_paths[0]).name if plate_paths else "pipeline" 

1741 default_filename = f"{first_plate_name}_pipeline.py" 

1742 

1743 await open_file_browser_window( 

1744 app=self.app, 

1745 file_manager=self.app.filemanager, 

1746 initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES), 

1747 backend=Backend.DISK, 

1748 title="Save Python Pipeline Script", 

1749 mode=BrowserMode.SAVE, 

1750 selection_mode=SelectionMode.FILES_ONLY, 

1751 filter_extensions=['.py'], 

1752 default_filename=default_filename, 

1753 cache_key=PathCacheKey.PIPELINE_FILES, 

1754 on_result_callback=handle_save_result, 

1755 caller_id="plate_manager_save_script" 

1756 ) 

1757 

1758 except Exception as e: 

1759 logger.error(f"Failed to save Python script: {e}") 

1760 self.app.current_status = f"Failed to save script: {e}" 

1761 

1762 def _generate_executable_script(self, data: Dict) -> str: 

1763 """Generate fully executable Python script by creating a temp pickle and using existing convert_pickle_to_python.""" 

1764 import tempfile 

1765 import dill as pickle 

1766 from openhcs.debug.pickle_to_python import convert_pickle_to_python 

1767 

1768 # Create temporary pickle file 

1769 with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as temp_pickle: 

1770 pickle.dump(data, temp_pickle) 

1771 temp_pickle_path = temp_pickle.name 

1772 

1773 # Create temporary output file 

1774 with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_output: 

1775 temp_output_path = temp_output.name 

1776 

1777 try: 

1778 # Use existing convert_pickle_to_python function 

1779 convert_pickle_to_python(temp_pickle_path, temp_output_path) 

1780 

1781 # Read the generated script 

1782 with open(temp_output_path, 'r') as f: 

1783 script_content = f.read() 

1784 

1785 return script_content 

1786 

1787 finally: 

1788 # Clean up temp files 

1789 import os 

1790 try: 

1791 os.unlink(temp_pickle_path) 

1792 os.unlink(temp_output_path) 

1793 except: 

1794 pass 

1795 

1796