Coverage for openhcs/textual_tui/widgets/plate_manager.py: 0.0%

938 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2PlateManagerWidget for OpenHCS Textual TUI 

3 

4Plate management widget with complete button set and reactive state management. 

5Matches the functionality from the current prompt-toolkit TUI. 

6""" 

7 

8import asyncio 

9import copy 

10import dataclasses 

11import inspect 

12import json 

13import logging 

14import numpy as np 

15import os 

16import pickle 

17import re 

18import signal 

19import stat 

20import subprocess 

21import sys 

22import tempfile 

23import time 

24import traceback 

25from collections import defaultdict 

26from datetime import datetime 

27from pathlib import Path 

28from typing import Dict, List, Optional, Callable, Any, Tuple 

29 

30from openhcs.core.config import PipelineConfig 

31 

32from PIL import Image 

33from textual.app import ComposeResult 

34from textual.containers import Horizontal, ScrollableContainer 

35from textual.reactive import reactive 

36from textual.widgets import Button, Static, SelectionList 

37from textual.widget import Widget 

38from textual.css.query import NoMatches 

39from .button_list_widget import ButtonListWidget, ButtonConfig 

40from textual import work, on 

41 

42from openhcs.core.config import GlobalPipelineConfig, VFSConfig, MaterializationBackend 

43from openhcs.core.pipeline import Pipeline 

44from openhcs.io.filemanager import FileManager 

45from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks 

46from openhcs.io.base import storage_registry 

47from openhcs.io.memory import MemoryStorageBackend 

48from openhcs.io.zarr import ZarrStorageBackend 

49from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator 

50from openhcs.constants.constants import GroupBy, Backend, VariableComponents, OrchestratorState 

51from openhcs.textual_tui.services.file_browser_service import SelectionMode 

52from openhcs.textual_tui.services.window_service import WindowService 

53from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey, get_path_cache 

54from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer 

55 

56logger = logging.getLogger(__name__) 

57 

58# Note: Using subprocess approach instead of multiprocessing to avoid TUI FD conflicts 

59 

60def get_orchestrator_status_symbol(orchestrator: PipelineOrchestrator) -> str: 

61 """Get UI symbol for orchestrator state - simple mapping without over-engineering.""" 

62 if orchestrator is None: 

63 return "?" # No orchestrator (newly added plate) 

64 

65 state_to_symbol = { 

66 OrchestratorState.CREATED: "?", # Created but not initialized 

67 OrchestratorState.READY: "-", # Initialized, ready for compilation 

68 OrchestratorState.COMPILED: "o", # Compiled, ready for execution 

69 OrchestratorState.EXECUTING: "!", # Execution in progress 

70 OrchestratorState.COMPLETED: "C", # Execution completed successfully 

71 OrchestratorState.INIT_FAILED: "I", # Initialization failed 

72 OrchestratorState.COMPILE_FAILED: "P", # Compilation failed (P for Pipeline) 

73 OrchestratorState.EXEC_FAILED: "X", # Execution failed 

74 } 

75 

76 return state_to_symbol.get(orchestrator.state, "?") 

77 
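A minimal sketch of how the mapping above resolves. `SimpleNamespace` is a duck-typed stand-in for a real `PipelineOrchestrator`; anything exposing a `state` attribute works for the lookup:

```python
from types import SimpleNamespace

from openhcs.constants.constants import OrchestratorState

# Stand-in for a PipelineOrchestrator (illustration only).
fake = SimpleNamespace(state=OrchestratorState.READY)
assert get_orchestrator_status_symbol(fake) == "-"  # initialized, ready to compile
assert get_orchestrator_status_symbol(None) == "?"  # newly added plate, no orchestrator yet
```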

78def get_current_log_file_path() -> str: 

79 """Get the current log file path from the logging system.""" 

80 try: 

81 # Get the root logger and find the FileHandler 

82 root_logger = logging.getLogger() 

83 for handler in root_logger.handlers: 

84 if isinstance(handler, logging.FileHandler): 

85 return handler.baseFilename 

86 

87 # Fallback: try to get from openhcs logger 

88 openhcs_logger = logging.getLogger("openhcs") 

89 for handler in openhcs_logger.handlers: 

90 if isinstance(handler, logging.FileHandler): 

91 return handler.baseFilename 

92 

93 # Last resort: create a default path 

94 log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs" 

95 log_dir.mkdir(parents=True, exist_ok=True) 

96 return str(log_dir / f"openhcs_subprocess_{int(time.time())}.log") 

97 

98 except Exception as e: 

99 logger.warning(f"Could not determine log file path: {e}") 

100 return "/tmp/openhcs_subprocess.log" 

101 

102 

103 

104 

105 

106 

107 

108class PlateManagerWidget(ButtonListWidget): 

109 """ 

110 Plate management widget using Textual reactive state. 

111 """ 

112 

113 # Semantic reactive property (like PipelineEditor's pipeline_steps) 

114 selected_plate = reactive("") 

115 orchestrators = reactive({}) 

116 plate_configs = reactive({}) 

117 orchestrator_state_version = reactive(0) # Increment to trigger UI refresh 

118 

119 def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig): 

120 button_configs = [ 

121 ButtonConfig("Add", "add_plate"), 

122 ButtonConfig("Del", "del_plate", disabled=True), 

123 ButtonConfig("Edit", "edit_config", disabled=True), # Unified edit button for config editing 

124 ButtonConfig("Init", "init_plate", disabled=True), 

125 ButtonConfig("Compile", "compile_plate", disabled=True), 

126 ButtonConfig("Run", "run_plate", disabled=True), 

127 ButtonConfig("Code", "code_plate", disabled=True), # Generate Python code 

128 ButtonConfig("Save", "save_python_script", disabled=True), # Save Python script 

129 # ButtonConfig("Export", "export_ome_zarr", disabled=True), # Export to OME-ZARR - HIDDEN FROM UI 

130 ] 

131 super().__init__( 

132 button_configs=button_configs, 

133 list_id="plate_content", 

134 container_id="plate_list", 

135 on_button_pressed=self._handle_button_press, 

136 on_selection_changed=self._handle_selection_change, 

137 on_item_moved=self._handle_item_moved 

138 ) 

139 self.filemanager = filemanager 

140 self.global_config = global_config 

141 self.plate_compiled_data = {} 

142 self.on_plate_selected: Optional[Callable[[str], None]] = None 

143 self.pipeline_editor: Optional['PipelineEditorWidget'] = None 

144 

145 # Initialize window service to avoid circular imports 

146 self.window_service = None # Will be set in on_mount 

147 

148 # --- Subprocess Architecture --- 

149 self.current_process: Optional[subprocess.Popen] = None 

150 self.log_file_path: Optional[str] = None # Single source of truth 

151 self.log_file_position: int = 0 # Track position in log file for incremental reading 

152 # Async monitoring using Textual's interval system 

153 self.monitoring_interval = None 

154 self.monitoring_active = False 

155 # --- 

156 

157 logger.debug("PlateManagerWidget initialized") 

158 

159 

160 

161 

162 

163 def on_unmount(self) -> None: 

164 logger.debug("Unmounting PlateManagerWidget, ensuring worker process is terminated.") 

165 # Schedule async stop execution since on_unmount is sync 


167 if self.current_process and self.current_process.poll() is None: 

168 # Create a task to stop execution asynchronously 

169 asyncio.create_task(self.action_stop_execution()) 

170 self._stop_monitoring() 

171 

172 def format_item_for_display(self, plate: Dict) -> Tuple[str, str]: 

173 # Get status from orchestrator instead of magic string 

174 plate_path = plate.get('path', '') 

175 orchestrator = self.orchestrators.get(plate_path) 

176 status_symbol = get_orchestrator_status_symbol(orchestrator) 

177 

178 status_symbols = { 

179 "?": "➕", # Created (not initialized) 

180 "-": "✅", # Ready (initialized) 

181 "o": "⚡", # Compiled 

182 "!": "🔄", # Executing 

183 "C": "🏁", # Completed 

184 "I": "🚫", # Init failed 

185 "P": "💥", # Compile failed (Pipeline) 

186 "X": "❌" # Execution failed 

187 } 

188 status_icon = status_symbols.get(status_symbol, "❓") 

189 plate_name = plate.get('name', 'Unknown') 

190 display_text = f"{status_icon} {plate_name} - {plate_path}" 

191 return display_text, plate_path 

192 
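Assuming a mounted widget `pm` whose orchestrator for the plate is in READY state, the returned tuple pairs a decorated label with the plate path used as the selection value, roughly:

```python
# Hypothetical call against a mounted PlateManagerWidget `pm`.
plate = {'name': 'plate_001', 'path': '/data/plate_001'}
display_text, value = pm.format_item_for_display(plate)
assert display_text == "✅ plate_001 - /data/plate_001"  # READY -> "-" -> ✅
assert value == "/data/plate_001"
```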

193 async def _handle_button_press(self, button_id: str) -> None: 

194 action_map = { 

195 "add_plate": self.action_add_plate, 

196 "del_plate": self.action_delete_plate, 

197 "edit_config": self.action_edit_config, # Unified edit button 

198 "init_plate": self.action_init_plate, 

199 "compile_plate": self.action_compile_plate, 

200 "code_plate": self.action_code_plate, # Generate Python code 

201 "save_python_script": self.action_save_python_script, # Save Python script 

202 # "export_ome_zarr": self.action_export_ome_zarr, # HIDDEN 

203 } 

204 if button_id in action_map: 

205 action = action_map[button_id] 

206 if inspect.iscoroutinefunction(action): 

207 await action() 

208 else: 

209 action() 

210 elif button_id == "run_plate": 

211 if self._is_any_plate_running(): 

212 await self.action_stop_execution() 

213 else: 

214 await self.action_run_plate() 

215 

216 def _handle_selection_change(self, selected_values: List[str]) -> None: 

217 logger.debug(f"Checkmarks changed: {len(selected_values)} items selected") 

218 

219 def _handle_item_moved(self, from_index: int, to_index: int) -> None: 

220 current_plates = list(self.items) 

221 plate = current_plates.pop(from_index) 

222 current_plates.insert(to_index, plate) 

223 self.items = current_plates 

224 plate_name = plate['name'] 

225 direction = "up" if to_index < from_index else "down" 

226 self.app.current_status = f"Moved plate '{plate_name}' {direction}" 

227 

228 def on_mount(self) -> None: 

229 # Initialize window service 

230 self.window_service = WindowService(self.app) 

231 

232 self.call_later(self._delayed_update_display) 

233 self.call_later(self._update_button_states) 

234 

235 def watch_items(self, items: List[Dict]) -> None: 

236 """Automatically update UI when items changes (follows ButtonListWidget pattern).""" 

237 # DEBUG: Log when items list changes to track the source of the reset 

238 stack_trace = ''.join(traceback.format_stack()[-3:-1]) # Get last 2 stack frames 

239 logger.debug(f"🔍 ITEMS CHANGED: {len(items)} plates. Call stack:\n{stack_trace}") 

240 

241 # CRITICAL: Call parent's watch_items to update the SelectionList 

242 super().watch_items(items) 

243 

244 logger.debug(f"Plates updated: {len(items)} plates") 

245 self._update_button_states() 

246 

247 def watch_highlighted_item(self, plate_path: str) -> None: 

248 self.selected_plate = plate_path 

249 logger.debug(f"Highlighted plate: {plate_path}") 

250 

251 def watch_selected_plate(self, plate_path: str) -> None: 

252 self._update_button_states() 

253 if self.on_plate_selected and plate_path: 

254 self.on_plate_selected(plate_path) 

255 logger.debug(f"Selected plate: {plate_path}") 

256 

257 def watch_orchestrator_state_version(self, version: int) -> None: 

258 """Automatically refresh UI when orchestrator states change.""" 

259 # Only update UI if widget is properly mounted 

260 if not self.is_mounted: 

261 return 

262 

263 # Force SelectionList to update by calling _update_selection_list 

264 # This re-calls format_item_for_display() for all items 

265 self._update_selection_list() 

266 

267 # CRITICAL: Update main button states when orchestrator states change 

268 self._update_button_states() 

269 

270 # Also notify PipelineEditor if connected 

271 if self.pipeline_editor: 

272 logger.debug(f"PlateManager: Notifying PipelineEditor of orchestrator state change (version {version})") 

273 self.pipeline_editor._update_button_states() 

274 

275 def get_selection_state(self) -> tuple[List[Dict], str]: 

276 # Check if widget is properly mounted first 

277 if not self.is_mounted: 

278 logger.debug("get_selection_state called on unmounted widget") 

279 return [], "empty" 

280 

281 try: 

282 selection_list = self.query_one(f"#{self.list_id}") 

283 multi_selected_values = selection_list.selected 

284 if multi_selected_values: 

285 selected_items = [p for p in self.items if p.get('path') in multi_selected_values] 

286 return selected_items, "checkbox" 

287 elif self.selected_plate: 

288 selected_items = [p for p in self.items if p.get('path') == self.selected_plate] 

289 return selected_items, "cursor" 

290 else: 

291 return [], "empty" 

292 except Exception as e: 

293 # DOM CORRUPTION DETECTED - This is a critical error 

294 stack_trace = ''.join(traceback.format_stack()[-3:-1]) 

295 logger.error(f"🚨 DOM CORRUPTION: Failed to get selection state: {e}") 

296 logger.error(f"🚨 DOM CORRUPTION: Call stack:\n{stack_trace}") 

297 logger.error(f"🚨 DOM CORRUPTION: Widget mounted: {self.is_mounted}") 

298 logger.error(f"🚨 DOM CORRUPTION: Looking for: #{self.list_id}") 

299 logger.error(f"🚨 DOM CORRUPTION: Plates count: {len(self.items)}") 

300 

301 # Try to diagnose what widgets actually exist 

302 try: 

303 all_widgets = list(self.query("*")) 

304 widget_ids = [w.id for w in all_widgets if w.id] 

305 logger.error(f"🚨 DOM CORRUPTION: Available widget IDs: {widget_ids}") 

306 except Exception as diag_e: 

307 logger.error(f"🚨 DOM CORRUPTION: Could not diagnose widgets: {diag_e}") 

308 

309 if self.selected_plate: 

310 selected_items = [p for p in self.items if p.get('path') == self.selected_plate] 

311 return selected_items, "cursor" 

312 return [], "empty" 

313 
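The precedence baked into `get_selection_state` (checkbox ticks beat the cursor highlight, which beats nothing) can be consumed as in this sketch, where `pm` is a hypothetical mounted widget:

```python
def describe_selection(pm: "PlateManagerWidget") -> str:
    """Sketch: checkbox selections take precedence over the highlight."""
    items, mode = pm.get_selection_state()
    if mode == "checkbox":
        return f"{len(items)} plate(s) ticked in the SelectionList"
    if mode == "cursor":
        return f"highlighted plate only: {items[0]['path']}"
    return "no usable selection"  # mode == "empty"
```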

314 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:  # NOTE: dead code, shadowed by the fuller redefinition at line 482 

315 count = len(selected_items) 

316 if count == 0: return f"No items for {operation}" 

317 if count == 1: return f"{operation.title()} item: {selected_items[0].get('name', 'Unknown')}" 

318 return f"{operation.title()} {count} items" 

319 

320 def _delayed_update_display(self) -> None: 

321 """Trigger UI update - no longer needed since reactive system handles this automatically.""" 

322 # The reactive system now handles updates automatically via watch_plates() 

323 # This method is kept for compatibility but does nothing 

324 pass 

325 

326 def _trigger_ui_refresh(self) -> None: 

327 """Force UI refresh when orchestrator state changes without items list changing.""" 

328 # Increment reactive counter to trigger automatic UI refresh 

329 self.orchestrator_state_version += 1 

330 
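The counter trick generalizes to any Textual widget: bump an integer `reactive` and let its `watch_` method redraw. A standalone sketch of the pattern (not OpenHCS code):

```python
from textual.reactive import reactive
from textual.widget import Widget

class RefreshableList(Widget):
    """Sketch of the version-counter refresh pattern."""
    state_version = reactive(0)

    def watch_state_version(self, version: int) -> None:
        # Runs automatically on every increment; re-render derived UI here.
        self.refresh()

    def external_state_changed(self) -> None:
        self.state_version += 1  # cheap trigger; no payload needed
```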

331 def _update_button_states(self) -> None: 

332 try: 

333 # Check if widget is mounted and buttons exist 

334 if not self.is_mounted: 

335 return 

336 

337 has_selection = bool(self.selected_plate) 

338 is_running = self._is_any_plate_running() 

339 

340 # Check if there are any selected items (for delete button) 

341 selected_items, _ = self.get_selection_state() 

342 has_selected_items = bool(selected_items) 

343 

344 can_run = has_selection and any(p['path'] in self.plate_compiled_data for p in self.items if p.get('path') == self.selected_plate) 

345 

346 # Try to get run button - if it doesn't exist, widget is not fully mounted 

347 try: 

348 run_button = self.query_one("#run_plate") 

349 if is_running: 

350 run_button.label = "Stop" 

351 run_button.disabled = False 

352 else: 

353 run_button.label = "Run" 

354 run_button.disabled = not can_run 

355 except Exception: 

356 # Buttons not mounted yet, skip update 

357 return 

358 

359 self.query_one("#add_plate").disabled = is_running 

360 self.query_one("#del_plate").disabled = not self.items or not has_selected_items or is_running 

361 

362 # Edit button (config editing) enabled when 1+ orchestrators selected and initialized 


364 edit_enabled = ( 

365 len(selected_items) > 0 and 

366 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

367 not is_running 

368 ) 

369 self.query_one("#edit_config").disabled = not edit_enabled 

370 

371 # Init button - enabled when plates are selected, can be initialized, and not running 

372 init_enabled = ( 

373 len(selected_items) > 0 and 

374 any(self._can_orchestrator_be_initialized(item['path']) for item in selected_items) and 

375 not is_running 

376 ) 

377 self.query_one("#init_plate").disabled = not init_enabled 

378 

379 # Compile button - enabled when plates are selected, initialized, and not running 


381 compile_enabled = ( 

382 len(selected_items) > 0 and 

383 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

384 not is_running 

385 ) 

386 self.query_one("#compile_plate").disabled = not compile_enabled 

387 

388 # Code button - enabled when plates are selected, initialized, and not running 

389 code_enabled = ( 

390 len(selected_items) > 0 and 

391 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

392 not is_running 

393 ) 

394 self.query_one("#code_plate").disabled = not code_enabled 

395 

396 # Save Python script button - enabled when plates are selected, initialized, and not running 

397 save_enabled = ( 

398 len(selected_items) > 0 and 

399 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

400 not is_running 

401 ) 

402 self.query_one("#save_python_script").disabled = not save_enabled 

403 

404 # Export button - enabled when plate is initialized and has workspace (HIDDEN FROM UI) 

405 # export_enabled = ( 

406 # has_selection and 

407 # self.selected_plate in self.orchestrators and 

408 # not is_running 

409 # ) 

410 # try: 

411 # self.query_one("#export_ome_zarr").disabled = not export_enabled 

412 # except: 

413 # pass # Button is hidden from UI 

414 

415 # Debug button removed - no longer needed 

416 

417 except Exception as e: 

418 # Only log if it's not a mounting/unmounting issue 

419 if self.is_mounted: 

420 logger.debug(f"Button state update skipped (widget not ready): {e}") 

421 # Don't log errors during unmounting 

422 

423 def _is_any_plate_running(self) -> bool: 

424 return self.current_process is not None and self.current_process.poll() is None 

425 

426 def _has_pipelines(self, plates: List[Dict]) -> bool: 

427 """Check if all plates have pipeline definitions.""" 

428 if not self.pipeline_editor: 

429 return False 

430 

431 for plate in plates: 

432 pipeline = self.pipeline_editor.get_pipeline_for_plate(plate['path']) 

433 if not pipeline: 

434 return False 

435 return True 

436 

437 def get_plate_status(self, plate_path: str) -> str: 

438 """Get status for specific plate - now uses orchestrator state.""" 

439 orchestrator = self.orchestrators.get(plate_path) 

440 return get_orchestrator_status_symbol(orchestrator) 

441 

442 def _is_orchestrator_initialized(self, plate_path: str) -> bool: 

443 """Check if orchestrator exists and is in an initialized state.""" 

444 orchestrator = self.orchestrators.get(plate_path) 

445 if orchestrator is None: 

446 return False 

447 return orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED, 

448 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED, 

449 OrchestratorState.EXEC_FAILED] 

450 

451 def _can_orchestrator_be_initialized(self, plate_path: str) -> bool: 

452 """Check if orchestrator can be initialized (doesn't exist or is in a re-initializable state).""" 

453 orchestrator = self.orchestrators.get(plate_path) 

454 if orchestrator is None: 

455 return True # No orchestrator exists, can be initialized 

456 return orchestrator.state in [OrchestratorState.CREATED, OrchestratorState.INIT_FAILED] 

457 

458 def _notify_pipeline_editor_status_change(self, plate_path: str, new_status: str) -> None: 

459 """Notify pipeline editor when plate status changes (enables Add button immediately).""" 

460 if self.pipeline_editor and self.pipeline_editor.current_plate == plate_path: 

461 # Update pipeline editor's status and trigger button state update 

462 self.pipeline_editor.current_plate_status = new_status 

463 

464 def _get_current_pipeline_definition(self, plate_path: str = None) -> List: 

465 """Get current pipeline definition from PipelineEditor (now returns FunctionStep objects directly).""" 

466 if not self.pipeline_editor: 

467 logger.warning("No pipeline editor reference - using empty pipeline") 

468 return [] 

469 

470 # Get pipeline for specific plate or current plate 

471 target_plate = plate_path or self.pipeline_editor.current_plate 

472 if not target_plate: 

473 logger.warning("No plate specified - using empty pipeline") 

474 return [] 

475 

476 # Get pipeline from editor (now returns List[FunctionStep] directly) 

477 pipeline_steps = self.pipeline_editor.get_pipeline_for_plate(target_plate) 

478 

479 # No conversion needed - pipeline_steps are already FunctionStep objects with memory type decorators 

480 return pipeline_steps 

481 

482 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str: 

483 """Generate human-readable description of what will be operated on.""" 

484 count = len(selected_items) 

485 if selection_mode == "empty": 

486 return f"No items available for {operation}" 

487 elif selection_mode == "all": 

488 return f"{operation.title()} ALL {count} items" 

489 elif selection_mode == "checkbox": 

490 if count == 1: 

491 item_name = selected_items[0].get('name', 'Unknown') 

492 return f"{operation.title()} selected item: {item_name}" 

493 else: 

494 return f"{operation.title()} {count} selected items" 

495 else: 

496 return f"{operation.title()} {count} items" 

497 

498 def _reset_execution_state(self, status_message: str, force_fail_executing: bool = True): 

499 if self.current_process: 

500 if self.current_process.poll() is None: # Still running 

501 logger.warning("Forcefully terminating subprocess during reset.") 

502 self.current_process.terminate() 

503 try: 

504 self.current_process.wait(timeout=1) 

505 except subprocess.TimeoutExpired: 

506 self.current_process.kill() # Force kill if terminate fails 

507 self.current_process = None 

508 

509 # Clear log file reference (no temp files - log file is single source of truth) 

510 self.log_file_path = None 

511 self.log_file_position = 0 

512 

513 # Stop async monitoring 

514 self._stop_monitoring() 

515 

516 # Only reset executing orchestrators to failed if this is a forced termination 

517 # Natural completion should preserve the states set by the completion handler 

518 if force_fail_executing: 

519 for plate_path, orchestrator in self.orchestrators.items(): 

520 if orchestrator.state == OrchestratorState.EXECUTING: 

521 orchestrator._state = OrchestratorState.EXEC_FAILED 

522 

523 # Trigger UI refresh after state changes - this is essential for button states 

524 self._trigger_ui_refresh() 

525 

526 # Update button states - but only if widget is properly mounted 

527 try: 

528 if self.is_mounted and hasattr(self, 'query_one'): 

529 self._update_button_states() 

530 except Exception as e: 

531 logger.error(f"Failed to update button states during reset: {e}") 

532 

533 self.app.current_status = status_message 

534 

535 async def action_run_plate(self) -> None: 

536 # Clear logs from singleton toolong window before starting new run 

537 try: 

538 from openhcs.textual_tui.windows.toolong_window import clear_toolong_logs 

539 logger.info("Clearing logs from singleton toolong window before new run") 

540 clear_toolong_logs(self.app) 

541 logger.info("Toolong logs cleared") 

542 except Exception as e: 

543 logger.error(f"Failed to clear toolong logs: {e}") 


545 logger.error(traceback.format_exc()) 

546 

547 selected_items, _ = self.get_selection_state() 

548 if not selected_items: 

549 self.app.show_error("No plates selected to run.") 

550 return 

551 

552 ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data] 

553 if not ready_items: 

554 self.app.show_error("Selected plates are not compiled. Please compile first.") 

555 return 

556 

557 try: 

558 # Use subprocess approach like integration tests 

559 logger.debug("🔥 Using subprocess approach for clean isolation") 

560 

561 plate_paths_to_run = [item['path'] for item in ready_items] 

562 

563 # Pass definition pipeline steps - subprocess will make fresh copy and compile 

564 pipeline_data = {} 

565 for plate_path in plate_paths_to_run: 

566 definition_pipeline = self._get_current_pipeline_definition(plate_path) 

567 pipeline_data[plate_path] = definition_pipeline 

568 

569 logger.info(f"🔥 Starting subprocess for {len(plate_paths_to_run)} plates") 

570 

571 # Create data file for subprocess (only file needed besides log) 

572 data_file = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') 

573 

574 # Generate unique ID for this subprocess 


576 subprocess_timestamp = int(time.time()) 

577 plate_names = [Path(path).name for path in plate_paths_to_run] 

578 unique_id = f"plates_{'_'.join(plate_names[:2])}_{subprocess_timestamp}" # Limit to first 2 plates for filename length 

579 

580 # Build subprocess log name from TUI log base using log utilities 

581 from openhcs.core.log_utils import get_current_log_file_path  # shadows the module-level fallback helper defined above 

582 try: 

583 tui_log_path = get_current_log_file_path() 

584 if tui_log_path.endswith('.log'): 

585 tui_base = tui_log_path[:-4] # Remove .log extension 

586 else: 

587 tui_base = tui_log_path 

588 log_file_base = f"{tui_base}_subprocess_{subprocess_timestamp}" 

589 except RuntimeError: 

590 # Fallback if no main log found 

591 log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs" 

592 log_dir.mkdir(parents=True, exist_ok=True) 

593 log_file_base = str(log_dir / f"tui_subprocess_{subprocess_timestamp}") 

594 

595 # Pickle data for subprocess 

596 subprocess_data = { 

597 'plate_paths': plate_paths_to_run, 

598 'pipeline_data': pipeline_data, 

599 'global_config': self.app.global_config # Pickle config object directly 

600 } 

601 

602 # Wrap pickle operation in executor to avoid blocking UI 

603 def _write_pickle_data(): 

604 import dill as pickle 

605 with open(data_file.name, 'wb') as f: 

606 pickle.dump(subprocess_data, f) 

607 data_file.close() 

608 

609 await asyncio.get_event_loop().run_in_executor(None, _write_pickle_data) 

610 

611 logger.debug(f"🔥 Created data file: {data_file.name}") 

612 

613 # Create subprocess (like integration tests) 

614 subprocess_script = Path(__file__).parent.parent / "subprocess_runner.py" 

615 

616 # Generate actual log file path that subprocess will create 

617 actual_log_file_path = f"{log_file_base}_{unique_id}.log" 

618 logger.debug(f"🔥 Log file base: {log_file_base}") 

619 logger.debug(f"🔥 Unique ID: {unique_id}") 

620 logger.debug(f"🔥 Actual log file: {actual_log_file_path}") 

621 

622 # Store log file path for monitoring (subprocess logger writes to this) 

623 self.log_file_path = actual_log_file_path 

624 self.log_file_position = self._get_current_log_position() # Start from current end 

625 

626 logger.debug(f"🔥 Subprocess command: {sys.executable} {subprocess_script} {data_file.name} {log_file_base} {unique_id}") 

627 logger.debug(f"🔥 Subprocess logger will write to: {self.log_file_path}") 

628 logger.debug(f"🔥 Subprocess stdout will be silenced (logger handles output)") 

629 

630 # SIMPLE SUBPROCESS: Let subprocess log to its own file (single source of truth) 

631 # Wrap subprocess creation in executor to avoid blocking UI 

632 def _create_subprocess(): 

633 return subprocess.Popen([ 

634 sys.executable, str(subprocess_script), 

635 data_file.name, log_file_base, unique_id # Only data file and log - no temp files 

636 ], 

637 stdout=subprocess.DEVNULL, # Subprocess logs to its own file 

638 stderr=subprocess.DEVNULL, # Subprocess logs to its own file 

639 text=True, # Text mode for easier handling 

640 ) 

641 

642 self.current_process = await asyncio.get_event_loop().run_in_executor(None, _create_subprocess) 

643 

644 logger.info(f"🔥 Subprocess started with PID: {self.current_process.pid}") 

645 

646 # Subprocess logs to its own dedicated file - no output monitoring needed 

647 

648 # Update orchestrator states to show running state 

649 for plate in ready_items: 

650 plate_path = plate['path'] 

651 if plate_path in self.orchestrators: 

652 self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING 

653 

654 # Trigger UI refresh after state changes 

655 self._trigger_ui_refresh() 

656 

657 self.app.current_status = f"Running {len(ready_items)} plate(s) in subprocess..." 

658 self._update_button_states() 

659 

660 # Start reactive log monitoring 

661 self._start_log_monitoring() 

662 

663 # Start async monitoring 

664 await self._start_monitoring() 

665 

666 except Exception as e: 

667 logger.critical(f"Failed to start subprocess: {e}", exc_info=True) 

668 self.app.show_error("Failed to start subprocess", str(e)) 

669 self._reset_execution_state("Subprocess failed to start") 

670 
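The handoff above serializes live config and step objects with dill, presumably because standard pickle cannot handle the decorated step functions. A round-trip sketch with placeholder values (not real plates or configs):

```python
import dill

# Shapes mirror the subprocess_data dict built in action_run_plate.
payload = {
    'plate_paths': ['/data/plate_001'],
    'pipeline_data': {'/data/plate_001': []},  # List[FunctionStep] in practice
    'global_config': None,                     # a GlobalPipelineConfig in practice
}
blob = dill.dumps(payload)
assert dill.loads(blob)['plate_paths'] == ['/data/plate_001']
```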

671 def _start_log_monitoring(self) -> None: 

672 """Start reactive log monitoring for subprocess logs.""" 

673 if not self.log_file_path: 

674 logger.warning("Cannot start log monitoring: no log file path") 

675 return 

676 

677 try: 

678 # Extract base path from log file path (remove .log extension) 

679 log_path = Path(self.log_file_path) 

680 base_log_path = str(log_path.with_suffix('')) 

681 

682 # Notify status bar to start log monitoring 

683 if hasattr(self.app, 'status_bar') and self.app.status_bar: 

684 self.app.status_bar.start_log_monitoring(base_log_path) 

685 logger.debug(f"Started reactive log monitoring for: {base_log_path}") 

686 else: 

687 logger.warning("Status bar not available for log monitoring") 

688 

689 except Exception as e: 

690 logger.error(f"Failed to start log monitoring: {e}") 

691 

692 def _stop_log_monitoring(self) -> None: 

693 """Stop reactive log monitoring.""" 

694 try: 

695 # Notify status bar to stop log monitoring 

696 if hasattr(self.app, 'status_bar') and self.app.status_bar: 

697 self.app.status_bar.stop_log_monitoring() 

698 logger.debug("Stopped reactive log monitoring") 

699 except Exception as e: 

700 logger.error(f"Failed to stop log monitoring: {e}") 

701 

702 def _get_current_log_position(self) -> int: 

703 """Get current position in log file.""" 

704 if not self.log_file_path or not Path(self.log_file_path).exists(): 

705 return 0 

706 try: 

707 return Path(self.log_file_path).stat().st_size 

708 except Exception: 

709 return 0 

710 

711 

712 

713 def _stop_file_watcher(self) -> None: 

714 """Stop file system watcher without blocking.""" 

715 if not getattr(self, 'file_observer', None):  # attribute is never set in __init__; guard makes this legacy helper a safe no-op 

716 return 

717 

718 try: 

719 # Just stop and abandon - don't wait for anything 

720 self.file_observer.stop() 

721 except Exception: 

722 pass # Ignore errors 

723 finally: 

724 # Always clear references immediately 

725 self.file_observer = None 

726 self.file_watcher = None 

727 

728 

729 

730 async def _start_monitoring(self) -> None: 

731 """Start async monitoring using Textual's interval system.""" 

732 # Stop any existing monitoring 

733 self._stop_monitoring() 

734 

735 if self.monitoring_active: 

736 return 

737 

738 self.monitoring_active = True 

739 # Use Textual's set_interval for periodic async monitoring 

740 self.monitoring_interval = self.set_interval( 

741 10.0, # Check every 10 seconds 

742 self._check_process_status_async, 

743 pause=False 

744 ) 

745 logger.debug("Started async process monitoring") 

746 
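`set_interval` returns a `Timer` handle, which is exactly what `_stop_monitoring` cancels. The polling pattern in isolation (sketch, not OpenHCS code):

```python
from textual.widget import Widget

class Poller(Widget):
    """Sketch: periodic async polling via Textual's timer system."""

    def on_mount(self) -> None:
        # Keep the Timer so it can be stopped later (cf. monitoring_interval).
        self._timer = self.set_interval(10.0, self._poll, pause=False)

    async def _poll(self) -> None:
        ...  # check subprocess status, bump reactives, etc.

    def on_unmount(self) -> None:
        self._timer.stop()
```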

747 def _stop_monitoring(self) -> None: 

748 """Stop async monitoring.""" 

749 if self.monitoring_interval: 

750 self.monitoring_interval.stop() 

751 self.monitoring_interval = None 

752 self.monitoring_active = False 

753 

754 # Also stop log monitoring 

755 self._stop_log_monitoring() 

756 

757 logger.debug("Stopped async process monitoring") 

758 

759 async def _check_process_status_async(self) -> None: 

760 """Async process status check - replaces worker thread.""" 

761 if not self.monitoring_active: 

762 return 

763 

764 try: 

765 # Simple direct access - no threading, no locks needed 

766 if not self._is_any_plate_running(): 

767 logger.debug("🔥 MONITOR: Subprocess finished") 

768 

769 # Stop monitoring first 

770 self._stop_monitoring() 

771 

772 # Handle completion directly - no call_from_thread needed 

773 await self._handle_process_completion() 

774 

775 except Exception as e: 

776 logger.debug(f"Error in async process monitoring: {e}") 

777 # Continue monitoring on error 

778 

779 async def _handle_process_completion(self) -> None: 

780 """Handle subprocess completion - read from log file (single source of truth).""" 

781 # Determine success/failure from log file content (single source of truth) 

782 success = False 

783 

784 if self.log_file_path and Path(self.log_file_path).exists(): 

785 try: 

786 # Read log file directly to check for success markers 

787 with open(self.log_file_path, 'r') as f: 

788 log_content = f.read() 

789 # Look for success markers in the log 

790 has_execution_success = "🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content 

791 has_all_completed = "All plates completed successfully" in log_content 

792 if has_execution_success and has_all_completed: 

793 success = True 

794 

795 except Exception as e: 

796 logger.error(f"Error reading subprocess log file: {e}") 

797 success = False 

798 

799 # Clean up the subprocess 

800 logger.info("🔥 MONITOR: Starting process cleanup...") 

801 if self.current_process: 

802 try: 

803 self.current_process.wait() # Clean up the zombie process 

804 logger.info("🔥 MONITOR: Process cleanup completed") 

805 except Exception as e: 

806 logger.warning(f"🔥 MONITOR: Error during process cleanup: {e}") 

807 

808 # Update orchestrator states based on log file analysis (single source of truth) 

809 if success: 

810 # Success - update orchestrators to completed 

811 for plate_path, orchestrator in self.orchestrators.items(): 

812 if orchestrator.state == OrchestratorState.EXECUTING: 

813 orchestrator._state = OrchestratorState.COMPLETED 

814 

815 # Reset execution state (this will trigger UI refresh internally) 

816 self._reset_execution_state("Execution completed successfully.", force_fail_executing=False) 

817 else: 

818 # Failure - update orchestrators to failed 

819 for plate_path, orchestrator in self.orchestrators.items(): 

820 if orchestrator.state == OrchestratorState.EXECUTING: 

821 orchestrator._state = OrchestratorState.EXEC_FAILED 

822 

823 # Reset execution state (this will trigger UI refresh internally) 

824 self._reset_execution_state("Execution failed.", force_fail_executing=False) 

825 

826 self._stop_monitoring() # Stop monitoring since process is done 

827 
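The success test reduces to a conjunction over two log markers; a tiny predicate makes the rule explicit (marker strings copied from the checks above):

```python
def subprocess_succeeded(log_content: str) -> bool:
    """Both markers must appear; either alone is not enough."""
    return ("🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content
            and "All plates completed successfully" in log_content)

assert not subprocess_succeeded("🔥 SUBPROCESS: EXECUTION SUCCESS: plate 1 of 2")
```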

828 async def _read_log_file_incremental(self) -> None: 

829 """Read new content from the log file since last read.""" 

830 if not self.log_file_path: 

831 self.app.current_status = "🔥 LOG READER: No log file" 

832 return 

833 

834 try: 

835 # Wrap all file I/O operations in executor to avoid blocking UI 

836 def _read_log_content(): 

837 if not Path(self.log_file_path).exists(): 

838 return None, self.log_file_position 

839 

840 with open(self.log_file_path, 'r') as f: 

841 # Seek to where we left off 

842 f.seek(self.log_file_position) 

843 new_content = f.read() 

844 # Update position for next read 

845 new_position = f.tell() 

846 

847 return new_content, new_position 

848 

849 new_content, new_position = await asyncio.get_event_loop().run_in_executor(None, _read_log_content) 

850 self.log_file_position = new_position 

851 

852 if new_content is None: 

853 self.app.current_status = "🔥 LOG READER: No log file" 

854 return 

855 

856 if new_content and new_content.strip(): 

857 # Get the last non-empty line from new content 

858 lines = new_content.strip().split('\n') 

859 non_empty_lines = [line.strip() for line in lines if line.strip()] 

860 

861 if non_empty_lines: 

862 # Show the last line, truncated if too long 

863 last_line = non_empty_lines[-1] 

864 if len(last_line) > 100: 

865 last_line = last_line[:97] + "..." 

866 

867 self.app.current_status = last_line 

868 else: 

869 self.app.current_status = "🔥 LOG READER: No lines found" 

870 else: 

871 self.app.current_status = "🔥 LOG READER: No new content" 

872 

873 except Exception as e: 

874 self.app.current_status = f"🔥 LOG READER ERROR: {e}" 

875 
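The seek/tell bookkeeping above, reduced to a standalone helper: remember the byte offset between reads so each call returns only content appended since the last one:

```python
from pathlib import Path

def read_new(path: Path, position: int) -> tuple[str, int]:
    """Return (new_content, new_position); a sketch of incremental tailing."""
    if not path.exists():
        return "", position
    with open(path, 'r') as f:
        f.seek(position)        # resume where the previous read stopped
        chunk = f.read()
        return chunk, f.tell()  # hand back the offset for the next call
```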

876 

877 

878 async def action_stop_execution(self) -> None: 

879 logger.info("🛑 Stop button pressed. Terminating subprocess.") 

880 self.app.current_status = "Terminating execution..." 

881 

882 # Stop async monitoring first 

883 self._stop_monitoring() 

884 

885 if self.current_process and self.current_process.poll() is None: # Still running 

886 try: 

887 # Kill the entire process group, not just the parent process 

888 # The subprocess creates its own process group, so we need to kill that group 

889 logger.info(f"🛑 Killing process group for PID {self.current_process.pid}...") 

890 

891 # Get the process group ID (should be same as PID since subprocess calls os.setpgrp()) 

892 process_group_id = self.current_process.pid 

893 

894 # Kill entire process group (negative PID kills process group) 

895 os.killpg(process_group_id, signal.SIGTERM) 

896 

897 # Give processes time to exit gracefully 

898 await asyncio.sleep(1) 

899 

900 # Force kill if still alive 

901 try: 

902 os.killpg(process_group_id, signal.SIGKILL) 

903 logger.info(f"🛑 Force killed process group {process_group_id}") 

904 except ProcessLookupError: 

905 logger.info(f"🛑 Process group {process_group_id} already terminated") 

906 

907 except Exception as e: 

908 logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill") 

909 # Fallback to killing just the main process 

910 self.current_process.kill() 

911 

912 self._reset_execution_state("Execution terminated by user.") 

913 
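The termination dance above as a standalone POSIX sketch: SIGTERM the whole group, give it a grace period, then SIGKILL survivors. It assumes the child called `os.setpgrp()`, so its PID doubles as the process-group ID:

```python
import os
import signal
import time

def kill_process_group(pgid: int, grace: float = 1.0) -> None:
    """Sketch of graceful-then-forceful group termination (POSIX only)."""
    os.killpg(pgid, signal.SIGTERM)      # polite request to every member
    time.sleep(grace)                    # let handlers and cleanup run
    try:
        os.killpg(pgid, signal.SIGKILL)  # force-kill anything still alive
    except ProcessLookupError:
        pass                             # group already exited; nothing to do
```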

914 

915 

916 async def action_add_plate(self) -> None: 

917 """Handle Add Plate button.""" 

918 await self._open_plate_directory_browser() 

919 

920 async def action_export_ome_zarr(self) -> None: 

921 """Export selected plate to OME-ZARR format.""" 

922 if not self.selected_plate: 

923 self.app.show_error("No Selection", "Please select a plate to export.") 

924 return 

925 

926 # Get the orchestrator for the selected plate 

927 orchestrator = self.orchestrators.get(self.selected_plate) 

928 if not orchestrator: 

929 self.app.show_error("Not Initialized", "Please initialize the plate before exporting.") 

930 return 

931 

932 # Open file browser for export location 

933 def handle_export_result(selected_paths): 

934 if selected_paths: 

935 export_path = Path(selected_paths[0]) if isinstance(selected_paths, list) else Path(selected_paths) 

936 self._start_ome_zarr_export(orchestrator, export_path) 

937 

938 await self.window_service.open_file_browser( 

939 file_manager=self.filemanager, 

940 initial_path=get_cached_browser_path(PathCacheKey.GENERAL), 

941 backend=Backend.DISK, 

942 title="Select OME-ZARR Export Directory", 

943 mode="save", 

944 selection_mode=SelectionMode.DIRECTORIES_ONLY, 

945 cache_key=PathCacheKey.GENERAL, 

946 on_result_callback=handle_export_result, 

947 caller_id="plate_manager_export" 

948 ) 

949 

950 def _start_ome_zarr_export(self, orchestrator, export_path: Path): 

951 """Start OME-ZARR export process.""" 

952 async def run_export(): 

953 try: 

954 self.app.current_status = f"Exporting to OME-ZARR: {export_path}" 

955 

956 # Create export-specific config with ZARR materialization 

957 export_config = orchestrator.global_config 

958 export_vfs_config = VFSConfig( 

959 intermediate_backend=export_config.vfs.intermediate_backend, 

960 materialization_backend=MaterializationBackend.ZARR 

961 ) 

962 

963 # Update orchestrator config for export 

964 export_global_config = dataclasses.replace(export_config, vfs=export_vfs_config) 

965 

966 # Create zarr backend with OME-ZARR enabled 

967 zarr_backend = ZarrStorageBackend(ome_zarr_metadata=True) 

968 

969 # Copy processed data from current workspace/plate to OME-ZARR format 

970 # For OpenHCS format, workspace_path is None, so use input_dir (plate path) 

971 source_path = orchestrator.workspace_path or orchestrator.input_dir 

972 if source_path and source_path.exists(): 

973 # Find processed images in workspace/plate 

974 processed_images = list(source_path.rglob("*.tif")) 

975 

976 if processed_images: 

977 # Group by well for batch operations 

978 wells_data = defaultdict(list) 

979 

980 for img_path in processed_images: 

981 # Extract well from filename 


983 # Try ImageXpress pattern: A01_s001_w1_z001.tif 

984 match = re.search(r'([A-Z]\d{2})_', img_path.name) 

985 if match: 

986 well_id = match.group(1) 

987 wells_data[well_id].append(img_path) 

988 

989 # Export each well to OME-ZARR 

990 export_store_path = export_path / "plate.zarr" 

991 

992 for well_id, well_images in wells_data.items(): 

993 # Load images 

994 images = [] 

995 for img_path in well_images: 

996 img = Image.open(img_path) 

997 images.append(np.array(img)) 

998 

999 # Create output paths for OME-ZARR structure 

1000 output_paths = [export_store_path / f"{well_id}_{i:03d}.tif" 

1001 for i in range(len(images))] 

1002 

1003 # Save to OME-ZARR format 

1004 zarr_backend.save_batch(images, output_paths, chunk_name=well_id) 

1005 

1006 self.app.current_status = f"✅ OME-ZARR export completed: {export_store_path}" 

1007 else: 

1008 self.app.show_error("No Data", "No processed images found in workspace.") 

1009 else: 

1010 self.app.show_error("No Workspace", "Plate workspace not found. Run pipeline first.") 

1011 

1012 except Exception as e: 

1013 logger.error(f"OME-ZARR export failed: {e}", exc_info=True) 

1014 self.app.show_error("Export Failed", f"OME-ZARR export failed: {str(e)}") 

1015 

1016 # Run export in background 

1017 asyncio.create_task(run_export()) 

1018 
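The well-extraction regex used above, exercised on hypothetical ImageXpress-style filenames:

```python
import re

for name in ("A01_s001_w1_z001.tif", "B12_s002_w2_z003.tif", "notawell.tif"):
    match = re.search(r'([A-Z]\d{2})_', name)
    print(name, "->", match.group(1) if match else None)
# A01_s001_w1_z001.tif -> A01
# B12_s002_w2_z003.tif -> B12
# notawell.tif -> None
```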

1019 # Debug functionality removed - no longer needed 

1020 

1021 async def _open_plate_directory_browser(self): 

1022 """Open textual-window file browser for plate directory selection.""" 

1023 # Get cached path for better UX - remembers last used directory 

1024 path_cache = get_path_cache() 

1025 initial_path = path_cache.get_initial_path(PathCacheKey.PLATE_IMPORT, Path.home()) 

1026 

1027 # Open textual-window file browser for directory selection 

1028 await self.window_service.open_file_browser( 

1029 file_manager=self.filemanager, 

1030 initial_path=initial_path, 

1031 backend=Backend.DISK, 

1032 title="Select Plate Directory", 

1033 mode="load", 

1034 selection_mode=SelectionMode.DIRECTORIES_ONLY, 

1035 cache_key=PathCacheKey.PLATE_IMPORT, 

1036 on_result_callback=self._add_plate_callback, 

1037 caller_id="plate_manager", 

1038 enable_multi_selection=True 

1039 ) 

1040 

1041 def _add_plate_callback(self, selected_paths) -> None: 

1042 """Handle directory selection from file browser.""" 

1043 logger.debug(f"_add_plate_callback called with: {selected_paths} (type: {type(selected_paths)})") 

1044 

1045 if selected_paths is None or selected_paths is False: 

1046 self.app.current_status = "Plate selection cancelled" 

1047 return 

1048 

1049 # Handle both single path and list of paths 

1050 if not isinstance(selected_paths, list): 

1051 selected_paths = [selected_paths] 

1052 

1053 added_plates = [] 

1054 current_plates = list(self.items) 

1055 

1056 for selected_path in selected_paths: 

1057 # Ensure selected_path is a Path object 

1058 if isinstance(selected_path, str): 

1059 selected_path = Path(selected_path) 

1060 elif not isinstance(selected_path, Path): 

1061 selected_path = Path(str(selected_path)) 

1062 

1063 # Check if plate already exists 

1064 if any(plate['path'] == str(selected_path) for plate in current_plates): 

1065 continue 

1066 

1067 # Add the plate to the list 

1068 plate_name = selected_path.name 

1069 plate_path = str(selected_path) 

1070 plate_entry = { 

1071 'name': plate_name, 

1072 'path': plate_path, 

1073 # No status field - state comes from orchestrator 

1074 } 

1075 

1076 current_plates.append(plate_entry) 

1077 added_plates.append(plate_name) 

1078 

1079 # Cache the parent directory for next time (save user navigation time) 

1080 if selected_paths: 

1081 # Use parent of first selected path as the cached directory 

1082 first_path = selected_paths[0] if isinstance(selected_paths[0], Path) else Path(selected_paths[0]) 

1083 parent_dir = first_path.parent 

1084 get_path_cache().set_cached_path(PathCacheKey.PLATE_IMPORT, parent_dir) 

1085 

1086 # Update items list using reactive property (triggers automatic UI update) 

1087 self.items = current_plates 

1088 

1089 if added_plates: 

1090 if len(added_plates) == 1: 

1091 self.app.current_status = f"Added plate: {added_plates[0]}" 

1092 else: 

1093 self.app.current_status = f"Added {len(added_plates)} plates: {', '.join(added_plates)}" 

1094 else: 

1095 self.app.current_status = "No new plates added (duplicates skipped)" 

1096 

1097 def action_delete_plate(self) -> None: 

1098 selected_items, _ = self.get_selection_state() 

1099 if not selected_items: 

1100 self.app.show_error("No plate selected to delete.") 

1101 return 

1102 

1103 paths_to_delete = {p['path'] for p in selected_items} 

1104 self.items = [p for p in self.items if p['path'] not in paths_to_delete] 

1105 

1106 # Clean up orchestrators for deleted plates 

1107 for path in paths_to_delete: 

1108 if path in self.orchestrators: 

1109 del self.orchestrators[path] 

1110 

1111 if self.selected_plate in paths_to_delete: 

1112 self.selected_plate = "" 

1113 

1114 self.app.current_status = f"Deleted {len(paths_to_delete)} plate(s)" 

1115 

1116 

1117 

1118 async def action_edit_config(self) -> None: 

1119 """ 

1120 Handle Edit button - create per-orchestrator PipelineConfig instances. 

1121 

1122 This enables per-orchestrator configuration without affecting global configuration. 

1123 Shows resolved defaults from GlobalPipelineConfig with "Pipeline default: {value}" placeholders. 

1124 """ 

1125 selected_items, selection_mode = self.get_selection_state() 

1126 

1127 if selection_mode == "empty": 

1128 self.app.current_status = "No orchestrators selected for configuration" 

1129 return 

1130 

1131 selected_orchestrators = [ 

1132 self.orchestrators[item['path']] for item in selected_items 

1133 if item['path'] in self.orchestrators 

1134 ] 

1135 

1136 if not selected_orchestrators: 

1137 self.app.current_status = "No initialized orchestrators selected" 

1138 return 

1139 

1140 # Load existing config or create new one for editing 

1141 representative_orchestrator = selected_orchestrators[0] 

1142 

1143 if representative_orchestrator.pipeline_config: 

1144 # Create editing config from existing orchestrator config with user-set values preserved 

1145 # Use current global config (not orchestrator's old global config) for updated placeholders 

1146 from openhcs.core.config import create_editing_config_from_existing_lazy_config 

1147 current_plate_config = create_editing_config_from_existing_lazy_config( 

1148 representative_orchestrator.pipeline_config, 

1149 self.global_config # Use current global config for updated placeholders 

1150 ) 

1151 else: 

1152 # Create new config with placeholders using current global config 

1153 from openhcs.core.config import create_pipeline_config_for_editing 

1154 current_plate_config = create_pipeline_config_for_editing(self.global_config) 

1155 

1156 def handle_config_save(new_config: PipelineConfig) -> None: 

1157 """Apply per-orchestrator configuration without global side effects.""" 

1158 for orchestrator in selected_orchestrators: 

1159 # Direct synchronous call - no async needed 

1160 orchestrator.apply_pipeline_config(new_config) 

1161 count = len(selected_orchestrators) 

1162 self.app.current_status = f"Per-orchestrator configuration applied to {count} orchestrator(s)" 

1163 

1164 # Open configuration window using PipelineConfig (not GlobalPipelineConfig) 

1165 await self.window_service.open_config_window( 

1166 PipelineConfig, 

1167 current_plate_config, 

1168 on_save_callback=handle_config_save 

1169 ) 

1170 

1171 async def action_edit_global_config(self) -> None: 

1172 """ 

1173 Handle global configuration editing - affects all orchestrators. 

1174 

1175 This maintains the existing global configuration workflow but uses lazy loading. 

1176 """ 

1177 from openhcs.core.config import get_default_global_config 

1178 from openhcs.core.lazy_config import create_pipeline_config_for_editing, PipelineConfig 

1179 

1180 # Get current global config from app or use default 

1181 current_global_config = self.app.global_config or get_default_global_config() 

1182 

1183 # Create lazy PipelineConfig for editing with proper thread-local context 

1184 current_lazy_config = create_pipeline_config_for_editing(current_global_config, preserve_values=True) 

1185 

1186 def handle_global_config_save(new_config: PipelineConfig) -> None: 

1187 """Apply global configuration to all orchestrators.""" 

1188 # Convert lazy PipelineConfig back to GlobalPipelineConfig 

1189 global_config = new_config.to_base_config() 

1190 

1191 self.app.global_config = global_config # Update app-level config 

1192 

1193 # Update thread-local storage for MaterializationPathConfig defaults 

1194 from openhcs.core.config import set_current_global_config, GlobalPipelineConfig 

1195 set_current_global_config(GlobalPipelineConfig, global_config) 

1196 

1197 for orchestrator in self.orchestrators.values(): 

1198 asyncio.create_task(orchestrator.apply_new_global_config(global_config)) 

1199 self.app.current_status = "Global configuration applied to all orchestrators" 

1200 

1201 # PipelineConfig already imported from openhcs.core.config 

1202 await self.window_service.open_config_window( 

1203 PipelineConfig, 

1204 current_lazy_config, 

1205 on_save_callback=handle_global_config_save 

1206 ) 

1207 

1208 

1209 

1210 def _analyze_orchestrator_configs(self, orchestrators: List['PipelineOrchestrator']) -> Dict[str, Dict[str, Any]]: 

1211 """Analyze configs across multiple orchestrators to detect same/different values. 

1212 

1213 Args: 

1214 orchestrators: List of PipelineOrchestrator instances 

1215 

1216 Returns: 

1217 Dict mapping field names to analysis results: 

1218 - {"type": "same", "value": actual_value, "default": default_value} 

1219 - {"type": "different", "values": [val1, val2, ...], "default": default_value} 

1220 """ 

1221 if not orchestrators: 

1222 return {} 

1223 

1224 # Get parameter info for defaults 

1225 param_info = SignatureAnalyzer.analyze(GlobalPipelineConfig) 

1226 

1227 config_analysis = {} 

1228 

1229 # Analyze each field in GlobalPipelineConfig 

1230 for field in dataclasses.fields(GlobalPipelineConfig): 

1231 field_name = field.name 

1232 

1233 # Get values from all orchestrators 

1234 values = [] 

1235 for orch in orchestrators: 

1236 try: 

1237 value = getattr(orch.global_config, field_name) 

1238 values.append(value) 

1239 except AttributeError: 

1240 # Field doesn't exist in this config, skip 

1241 continue 

1242 

1243 if not values: 

1244 continue 

1245 

1246 # Get default value from parameter info 

1247 param_details = param_info.get(field_name) 

1248 default_value = param_details.default_value if param_details else None 

1249 

1250 # Check if all values are the same 

1251 if all(self._values_equal(v, values[0]) for v in values): 

1252 config_analysis[field_name] = { 

1253 "type": "same", 

1254 "value": values[0], 

1255 "default": default_value 

1256 } 

1257 else: 

1258 config_analysis[field_name] = { 

1259 "type": "different", 

1260 "values": values, 

1261 "default": default_value 

1262 } 

1263 

1264 return config_analysis 

1265 

1266 def _values_equal(self, val1: Any, val2: Any) -> bool: 

1267 """Check if two values are equal, handling dataclasses and complex types.""" 

1268 # Handle dataclass comparison 

1269 if dataclasses.is_dataclass(val1) and dataclasses.is_dataclass(val2): 

1270 return dataclasses.asdict(val1) == dataclasses.asdict(val2) 

1271 

1272 # Handle regular comparison 

1273 return val1 == val2 

1274 
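A property worth noting about the `asdict` comparison above (whether or not it is the motivation here): dataclass `==` is type-sensitive, while `asdict` compares field-wise. Toy types for illustration:

```python
import dataclasses

@dataclasses.dataclass
class A:
    x: int = 1

@dataclasses.dataclass
class B:
    x: int = 1

assert A() != B()                                           # == checks the class, too
assert dataclasses.asdict(A()) == dataclasses.asdict(B())   # field-wise: equal
```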

1275 def action_init_plate(self) -> None: 

1276 """Handle Init Plate button - initialize selected plates.""" 

1277 # Get current selection state 

1278 selected_items, selection_mode = self.get_selection_state() 

1279 

1280 if selection_mode == "empty": 

1281 logger.warning("No plates available for initialization") 

1282 return 

1283 

1284 # Validate all selected plates can be initialized (allow ALL failed plates to be re-initialized) 

1285 invalid_plates = [] 

1286 for item in selected_items: 

1287 plate_path = item['path'] 

1288 orchestrator = self.orchestrators.get(plate_path) 

1289 # Only block plates that are currently executing - all other states can be re-initialized 

1290 if orchestrator is not None and orchestrator.state == OrchestratorState.EXECUTING: 

1291 invalid_plates.append(item) 

1292 

1293 if invalid_plates: 

1294 names = [item['name'] for item in invalid_plates] 

1295 logger.warning(f"Cannot initialize plates that are currently executing: {', '.join(names)}") 

1296 return 

1297 

1298 # Start async initialization 

1299 self._start_async_init(selected_items, selection_mode) 

1300 

1301 def _start_async_init(self, selected_items: List[Dict], selection_mode: str) -> None: 

1302 """Start async initialization of selected plates.""" 

1303 # Generate operation description 

1304 desc = self.get_operation_description(selected_items, selection_mode, "initialize") 

1305 logger.info(f"Initializing: {desc}") 

1306 

1307 # Start background worker 

1308 self._init_plates_worker(selected_items) 

1309 

1310    @work(exclusive=True)
1311    async def _init_plates_worker(self, selected_items: List[Dict]) -> None:
1312        """Background worker for plate initialization."""
1313        for plate_data in selected_items:
1314            plate_path = plate_data['path']
1315
1316            # Find the actual plate in self.items (not the copy from get_selection_state)
1317            actual_plate = None
1318            for plate in self.items:
1319                if plate['path'] == plate_path:
1320                    actual_plate = plate
1321                    break
1322
1323            if not actual_plate:
1324                logger.error(f"Plate not found in plates list: {plate_path}")
1325                continue
1326
1327            try:
1328                # Run heavy initialization in an executor thread to avoid blocking the UI
1329                def init_orchestrator():
1330                    return PipelineOrchestrator(
1331                        plate_path=plate_path,
1332                        global_config=self.global_config,
1333                        storage_registry=self.filemanager.registry
1334                    ).initialize()
1335
1336                orchestrator = await asyncio.get_running_loop().run_in_executor(None, init_orchestrator)
1337
1338                # Store orchestrator for later use (channel selection, etc.)
1339                self.orchestrators[plate_path] = orchestrator
1340                # Orchestrator state is already set to READY by the initialize() method
1341                logger.info(f"Plate {actual_plate['name']} initialized successfully")
1342
1343            except Exception as e:
1344                logger.error(f"Failed to initialize plate {plate_path}: {e}", exc_info=True)
1345                # Create a failed orchestrator to track the error state
1346                failed_orchestrator = PipelineOrchestrator(
1347                    plate_path=plate_path,
1348                    global_config=self.global_config,
1349                    storage_registry=self.filemanager.registry
1350                )
1351                failed_orchestrator._state = OrchestratorState.INIT_FAILED
1352                self.orchestrators[plate_path] = failed_orchestrator
1353                actual_plate['error'] = str(e)
1354
1355            # Trigger UI refresh after orchestrator state changes
1356            self._trigger_ui_refresh()
1357            # Update button states immediately (the reactive system handles UI updates automatically)
1358            self._update_button_states()
1359            # Notify pipeline editor of the status change
1360            status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
1361            self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
1362            logger.debug(f"Updated plate {actual_plate['name']} status")
1363
1364        # Final UI update (the reactive system handles this automatically when self.items changes)
1365        self._update_button_states()
1366
1367        # Update status
1368        success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.READY])
1369        error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.INIT_FAILED])
1370
1371        if error_count == 0:
1372            logger.info(f"Successfully initialized {success_count} plates")
1373        else:
1374            logger.warning(f"Initialized {success_count} plates, {error_count} errors")
1375
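The worker above is the standard Textual pattern for long-running work: a @work coroutine drives the loop, and each blocking call is pushed onto a thread via run_in_executor so the event loop keeps rendering. A minimal standalone sketch of that pattern (ExampleApp, process_items, and slow_init are hypothetical names, not OpenHCS code):

    import asyncio
    from textual import work
    from textual.app import App

    class ExampleApp(App):
        @work(exclusive=True)  # exclusive=True cancels any still-running previous worker
        async def process_items(self, items: list) -> None:
            loop = asyncio.get_running_loop()
            for item in items:
                # Blocking work runs on a worker thread; the UI stays responsive
                result = await loop.run_in_executor(None, self.slow_init, item)
                self.log(f"initialized {result}")

        def slow_init(self, item):
            # Stand-in for a heavy call such as PipelineOrchestrator(...).initialize()
            return item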

1376    def action_compile_plate(self) -> None:
1377        """Handle Compile Plate button - compile pipelines for selected plates."""
1378        # Get current selection state
1379        selected_items, selection_mode = self.get_selection_state()
1380
1381        if selection_mode == "empty":
1382            logger.warning("No plates available for compilation")
1383            return
1384
1385        # Validate all selected plates are ready for compilation (allow failed plates to be re-compiled)
1386        not_ready = []
1387        for item in selected_items:
1388            plate_path = item['path']
1389            orchestrator = self.orchestrators.get(plate_path)
1390            # Allow READY, COMPILE_FAILED, EXEC_FAILED, COMPILED, and COMPLETED states to be compiled/recompiled
1391            if orchestrator is None or orchestrator.state not in [OrchestratorState.READY, OrchestratorState.COMPILE_FAILED, OrchestratorState.EXEC_FAILED, OrchestratorState.COMPILED, OrchestratorState.COMPLETED]:
1392                not_ready.append(item)
1393
1394        if not_ready:
1395            names = [item['name'] for item in not_ready]
1396            # More accurate error message based on actual state
1397            if any(self.orchestrators.get(item['path']) is None for item in not_ready):
1398                logger.warning(f"Cannot compile plates that haven't been initialized: {', '.join(names)}")
1399            elif any(self.orchestrators.get(item['path']).state == OrchestratorState.EXECUTING for item in not_ready):
1400                logger.warning(f"Cannot compile plates that are currently executing: {', '.join(names)}")
1401            else:
1402                logger.warning(f"Cannot compile plates in current state: {', '.join(names)}")
1403            return
1404
1405        # Validate all selected plates have pipelines
1406        no_pipeline = []
1407        for item in selected_items:
1408            pipeline = self._get_current_pipeline_definition(item['path'])
1409            if not pipeline:
1410                no_pipeline.append(item)
1411
1412        if no_pipeline:
1413            names = [item['name'] for item in no_pipeline]
1414            self.app.current_status = f"Cannot compile plates without pipelines: {', '.join(names)}"
1415            return
1416
1417        # Start async compilation
1418        self._start_async_compile(selected_items, selection_mode)
1419
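The state gate above amounts to a whitelist of orchestrator states from which (re)compilation is allowed. A minimal sketch of the same check using a frozenset (the enum mirrors the states used in this listing; can_compile is an illustrative helper, not OpenHCS API):

    from enum import Enum, auto
    from typing import Optional

    class OrchestratorState(Enum):
        CREATED = auto()
        READY = auto()
        COMPILED = auto()
        EXECUTING = auto()
        COMPLETED = auto()
        INIT_FAILED = auto()
        COMPILE_FAILED = auto()
        EXEC_FAILED = auto()

    # States from which compile/recompile is permitted, matching the check above
    COMPILABLE = frozenset({
        OrchestratorState.READY,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
    })

    def can_compile(state: Optional[OrchestratorState]) -> bool:
        return state is not None and state in COMPILABLE

    assert can_compile(OrchestratorState.READY)
    assert not can_compile(OrchestratorState.EXECUTING)
    assert not can_compile(None)  # plate that was never initialized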

1420    def _start_async_compile(self, selected_items: List[Dict], selection_mode: str) -> None:
1421        """Start async compilation of selected plates."""
1422        # Generate operation description
1423        desc = self.get_operation_description(selected_items, selection_mode, "compile")
1424        logger.info(f"Compiling: {desc}")
1425
1426        # Start background worker
1427        self._compile_plates_worker(selected_items)
1428
1429    @work(exclusive=True)
1430    async def _compile_plates_worker(self, selected_items: List[Dict]) -> None:
1431        """Background worker for plate compilation."""
1432        for plate_data in selected_items:
1433            plate_path = plate_data['path']
1434
1435            # Find the actual plate in self.items (not the copy from get_selection_state)
1436            actual_plate = None
1437            for plate in self.items:
1438                if plate['path'] == plate_path:
1439                    actual_plate = plate
1440                    break
1441
1442            if not actual_plate:
1443                logger.error(f"Plate not found in plates list: {plate_path}")
1444                continue
1445
1446            # Get the definition pipeline and make a fresh copy
1447            definition_pipeline = self._get_current_pipeline_definition(plate_path)
1448            if not definition_pipeline:
1449                logger.warning(f"No pipeline defined for {actual_plate['name']}, using empty pipeline")
1450                definition_pipeline = []
1451
1452            try:
1453                # Get or create the orchestrator for compilation (run in executor to avoid blocking)
1454                def get_or_create_orchestrator():
1455                    if plate_path in self.orchestrators:
1456                        orchestrator = self.orchestrators[plate_path]
1457                        if not orchestrator.is_initialized():
1458                            orchestrator.initialize()
1459                        return orchestrator
1460                    else:
1461                        return PipelineOrchestrator(
1462                            plate_path=plate_path,
1463                            global_config=self.global_config,
1464                            storage_registry=self.filemanager.registry
1465                        ).initialize()
1466
1467                orchestrator = await asyncio.get_running_loop().run_in_executor(None, get_or_create_orchestrator)
1468                self.orchestrators[plate_path] = orchestrator
1469
1470                # Make a fresh copy for compilation
1471                execution_pipeline = copy.deepcopy(definition_pipeline)
1472
1473                # Fix step IDs after the deep copy so they match the new object identities
1474                for step in execution_pipeline:
1475                    step.step_id = str(id(step))
1476                    # Ensure variable_components is never None - fall back to the FunctionStep default
1477                    if step.variable_components is None:
1478                        logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting FunctionStep default")
1479                        step.variable_components = [VariableComponents.SITE]
1480                    # Also ensure it is not an empty list
1481                    elif not step.variable_components:
1482                        logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting FunctionStep default")
1483                        step.variable_components = [VariableComponents.SITE]
1484
1485                # Get wells and compile (run in executor to avoid blocking the UI)
1486                # Wrap in a Pipeline object, as test_main.py does
1487                pipeline_obj = Pipeline(steps=execution_pipeline)
1488
1489                # Run heavy operations in the executor so the UI keeps responding
1490                wells = await asyncio.get_running_loop().run_in_executor(None, lambda: orchestrator.get_component_keys(GroupBy.WELL))
1491                compiled_contexts = await asyncio.get_running_loop().run_in_executor(
1492                    None, orchestrator.compile_pipelines, pipeline_obj.steps, wells
1493                )
1494
1495                # Store state directly - no reactive property involved
1496                step_ids_in_pipeline = [id(step) for step in execution_pipeline]
1497                # Get step IDs from the compiled contexts (ProcessingContext objects)
1498                first_well_key = list(compiled_contexts.keys())[0] if compiled_contexts else None
1499                step_ids_in_contexts = list(compiled_contexts[first_well_key].step_plans.keys()) if first_well_key and hasattr(compiled_contexts[first_well_key], 'step_plans') else []
1500                logger.info(f"🔥 Storing compiled data for {plate_path}: pipeline={type(execution_pipeline)}, contexts={type(compiled_contexts)}")
1501                logger.info(f"🔥 Step IDs in pipeline: {step_ids_in_pipeline}")
1502                logger.info(f"🔥 Step IDs in contexts: {step_ids_in_contexts}")
1503                self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts)
1504                logger.info(f"🔥 Stored! Available compiled plates: {list(self.plate_compiled_data.keys())}")
1505
1506                # Orchestrator state is already set to COMPILED by the compile_pipelines() method
1507                logger.info(f"🔥 Successfully compiled {plate_path}")
1508
1509            except Exception as e:
1510                logger.error(f"🔥 COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True)
1511                # Orchestrator state is already set to COMPILE_FAILED by the compile_pipelines() method
1512                actual_plate['error'] = str(e)
1513                # Don't store anything in plate_compiled_data on failure
1514
1515            # Trigger UI refresh after orchestrator state changes
1516            self._trigger_ui_refresh()
1517            # Update button states immediately (the reactive system handles UI updates automatically)
1518            self._update_button_states()
1519            # Notify pipeline editor of the status change
1520            status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
1521            self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
1522
1523        # Final UI update (the reactive system handles this automatically when self.items changes)
1524        self._update_button_states()
1525
1526        # Update status
1527        success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILED])
1528        error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILE_FAILED])
1529
1530        if error_count == 0:
1531            logger.info(f"Successfully compiled {success_count} plates")
1532        else:
1533            logger.warning(f"Compiled {success_count} plates, {error_count} errors")
1534
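The step_id fix-up after copy.deepcopy matters because the IDs are derived from object identity: deepcopy creates new objects, so IDs captured from the originals go stale. A standalone sketch with a hypothetical Step class (not the OpenHCS FunctionStep):

    import copy

    class Step:
        def __init__(self, name):
            self.name = name
            self.step_id = str(id(self))  # identity-derived ID, valid only for this object

    original = [Step("flatfield"), Step("stitch")]
    cloned = copy.deepcopy(original)

    # The clones are new objects, so their stored IDs no longer match id(...)
    assert cloned[0].step_id != str(id(cloned[0]))

    # Regenerate them, exactly as the worker above does
    for step in cloned:
        step.step_id = str(id(step))
    assert all(s.step_id == str(id(s)) for s in cloned)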

1535    async def action_code_plate(self) -> None:
1536        """Generate Python code for selected plates and their pipelines."""
1537        logger.debug("Code button pressed - generating Python code for plates")
1538
1539        selected_items, _ = self.get_selection_state()
1540        if not selected_items:
1541            self.app.current_status = "No plates selected for code generation"
1542            return
1543
1544        try:
1545            # Get pipeline data for the selected plates
1546            plate_paths = [item['path'] for item in selected_items]
1547            pipeline_data = {}
1548
1549            # Collect pipeline steps for each plate
1550            for plate_path in plate_paths:
1551                if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
1552                    # Get pipeline steps from the pipeline editor if available
1553                    if plate_path in self.pipeline_editor.plate_pipelines:
1554                        pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines[plate_path]
1555                    else:
1556                        pipeline_data[plate_path] = []
1557                else:
1558                    pipeline_data[plate_path] = []
1559
1560            # Reuse the existing pickle_to_python logic to generate the complete script
1561            from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher
1562
1563            # Build the data structure pickle_to_python expects
1564            data = {
1565                'plate_paths': plate_paths,
1566                'pipeline_data': pipeline_data,
1567                'global_config': self.app.global_config
1568            }
1569
1570            # Round-trip through the dict to mirror the pickle format (keeps names in sync with it)
1571            plate_paths = data['plate_paths']
1572            pipeline_data = data['pipeline_data']
1573
1574            # Generate just the orchestrator configuration (no execution wrapper)
1575            from openhcs.debug.pickle_to_python import generate_complete_orchestrator_code
1576
1577            python_code = generate_complete_orchestrator_code(
1578                plate_paths=plate_paths,
1579                pipeline_data=pipeline_data,
1580                global_config=self.app.global_config,
1581                clean_mode=False
1582            )
1583
1584            # Callback that applies the edited code
1585            def handle_edited_code(edited_code: str):
1586                logger.debug("Orchestrator code edited, processing changes...")
1587                try:
1588                    # Execute the code (it carries all necessary imports)
1589                    namespace = {}
1590                    exec(edited_code, namespace)
1591
1592                    # Update pipeline data if present (composition: the orchestrator contains pipelines)
1593                    if 'pipeline_data' in namespace:
1594                        new_pipeline_data = namespace['pipeline_data']
1595                        # Update the pipeline editor via the reactive system (like the pipeline code button does)
1596                        if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
1597                            # Update the plate pipelines storage
1598                            current_pipelines = dict(self.pipeline_editor.plate_pipelines)
1599                            current_pipelines.update(new_pipeline_data)
1600                            self.pipeline_editor.plate_pipelines = current_pipelines
1601
1602                            # If the current plate is in the edited data, update the current view too
1603                            current_plate = self.pipeline_editor.current_plate
1604                            if current_plate and current_plate in new_pipeline_data:
1605                                self.pipeline_editor.pipeline_steps = new_pipeline_data[current_plate]
1606
1607                        self.app.current_status = f"Pipeline data updated for {len(new_pipeline_data)} plates"
1608
1609                    # Otherwise update the global config if present
1610                    elif 'global_config' in namespace:
1611                        new_global_config = namespace['global_config']
1612                        # asyncio is already imported at module scope
1613                        for plate_path in plate_paths:
1614                            if plate_path in self.orchestrators:
1615                                orchestrator = self.orchestrators[plate_path]
1616                                asyncio.create_task(orchestrator.apply_new_global_config(new_global_config))
1617                        self.app.current_status = f"Global config updated for {len(plate_paths)} plates"
1618
1619                    # Otherwise update the orchestrators list if present
1620                    elif 'orchestrators' in namespace:
1621                        new_orchestrators = namespace['orchestrators']
1622                        self.app.current_status = f"Orchestrator list updated with {len(new_orchestrators)} orchestrators"
1623
1624                    else:
1625                        self.app.show_error("Parse Error", "No valid assignments found in edited code")
1626
1627                except SyntaxError as e:
1628                    self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
1629                except Exception as e:
1630                    logger.error(f"Failed to parse edited orchestrator code: {e}")
1631                    self.app.show_error("Edit Error", f"Failed to parse orchestrator code: {str(e)}")
1632
1633            # Launch the terminal editor
1634            launcher = TerminalLauncher(self.app)
1635            await launcher.launch_editor_for_file(
1636                file_content=python_code,
1637                file_extension='.py',
1638                on_save_callback=handle_edited_code
1639            )
1640
1641        except Exception as e:
1642            logger.error(f"Failed to generate plate code: {e}")
1643            self.app.current_status = f"Failed to generate code: {e}"
1644
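handle_edited_code recovers objects from the edited script by exec-ing it into a fresh namespace dict and reading top-level names back out. A minimal standalone sketch of that round-trip (illustrative data only):

    import textwrap

    edited_code = textwrap.dedent("""
        pipeline_data = {"/plates/plate1": ["step_a", "step_b"]}
    """)

    namespace = {}
    exec(edited_code, namespace)  # run the edited script in an isolated namespace

    # Every top-level assignment in the script is now a key in the namespace
    if 'pipeline_data' in namespace:
        print(namespace['pipeline_data'])  # {'/plates/plate1': ['step_a', 'step_b']}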

1645    async def action_save_python_script(self) -> None:
1646        """Save Python script for selected plates (like special_io_pipeline.py)."""
1647        logger.debug("Save button pressed - saving Python script for plates")
1648
1649        selected_items, _ = self.get_selection_state()
1650        if not selected_items:
1651            self.app.current_status = "No plates selected for script generation"
1652            return
1653
1654        try:
1655            # Get pipeline data for the selected plates
1656            plate_paths = [item['path'] for item in selected_items]
1657            pipeline_data = {}
1658
1659            # Collect pipeline steps for each plate
1660            for plate_path in plate_paths:
1661                if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
1662                    # Get pipeline steps from the pipeline editor if available
1663                    if plate_path in self.pipeline_editor.plate_pipelines:
1664                        pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines[plate_path]
1665                    else:
1666                        pipeline_data[plate_path] = []
1667                else:
1668                    pipeline_data[plate_path] = []
1669
1670            # Build the data structure pickle_to_python expects
1671            data = {
1672                'plate_paths': plate_paths,
1673                'pipeline_data': pipeline_data,
1674                'global_config': self.app.global_config
1675            }
1676
1677            # Generate a complete executable Python script using the pickle_to_python logic
1678            python_code = self._generate_executable_script(data)
1679
1680            # Launch a file browser to save the script
1681            from openhcs.textual_tui.windows.file_browser_window import open_file_browser_window, BrowserMode
1682            from openhcs.textual_tui.services.file_browser_service import SelectionMode
1683            from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
1684            from openhcs.constants.constants import Backend
1685
1686            def handle_save_result(result):
1687                if result:
1688                    # Handle both a single Path and a list of Paths
1689                    save_path = None
1690                    if isinstance(result, Path):
1691                        save_path = result
1692                    elif isinstance(result, list) and len(result) > 0:
1693                        save_path = result[0]  # Take the first path
1694
1695                    if save_path:
1696                        try:
1697                            # Write the Python script to the selected file
1698                            with open(save_path, 'w') as f:
1699                                f.write(python_code)
1700
1701                            logger.info(f"Python script saved to: {save_path}")
1702                            self.app.current_status = f"Python script saved to: {save_path}"
1703                        except Exception as e:
1704                            logger.error(f"Failed to save Python script: {e}")
1705                            self.app.current_status = f"Failed to save script: {e}"
1706
1707            # Generate a default filename based on the first plate
1708            first_plate_name = Path(plate_paths[0]).name if plate_paths else "pipeline"
1709            default_filename = f"{first_plate_name}_pipeline.py"
1710
1711            await open_file_browser_window(
1712                app=self.app,
1713                file_manager=self.app.filemanager,
1714                initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
1715                backend=Backend.DISK,
1716                title="Save Python Pipeline Script",
1717                mode=BrowserMode.SAVE,
1718                selection_mode=SelectionMode.FILES_ONLY,
1719                filter_extensions=['.py'],
1720                default_filename=default_filename,
1721                cache_key=PathCacheKey.PIPELINE_FILES,
1722                on_result_callback=handle_save_result,
1723                caller_id="plate_manager_save_script"
1724            )
1725
1726        except Exception as e:
1727            logger.error(f"Failed to save Python script: {e}")
1728            self.app.current_status = f"Failed to save script: {e}"
1729

1730    def _generate_executable_script(self, data: Dict) -> str:
1731        """Generate a fully executable Python script by writing a temp pickle and reusing convert_pickle_to_python."""
1732        import tempfile
1733        import dill as pickle  # dill serializes closures and lambdas that plain pickle cannot
1734        from openhcs.debug.pickle_to_python import convert_pickle_to_python
1735
1736        # Create a temporary pickle file
1737        with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as temp_pickle:
1738            pickle.dump(data, temp_pickle)
1739            temp_pickle_path = temp_pickle.name
1740
1741        # Create a temporary output file
1742        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_output:
1743            temp_output_path = temp_output.name
1744
1745        try:
1746            # Reuse the existing convert_pickle_to_python function
1747            convert_pickle_to_python(temp_pickle_path, temp_output_path)
1748
1749            # Read the generated script back
1750            with open(temp_output_path, 'r') as f:
1751                script_content = f.read()
1752
1753            return script_content
1754
1755        finally:
1756            # Clean up the temp files
1757            import os
1758            try:
1759                os.unlink(temp_pickle_path)
1760                os.unlink(temp_output_path)
1761            except OSError:
1762                pass
1763
1764
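_generate_executable_script leans on a common temp-file round-trip: delete=False keeps each NamedTemporaryFile alive past its with-block so another function can open it by path, and the finally block removes the files afterwards. A standalone sketch of the same pattern (plain pickle and an identity "converter" stand in for dill and convert_pickle_to_python):

    import os
    import pickle
    import tempfile

    def roundtrip(data: dict) -> str:
        # delete=False lets the file outlive the with-block
        with tempfile.NamedTemporaryFile(suffix='.pkl', delete=False) as tmp:
            pickle.dump(data, tmp)
            tmp_path = tmp.name
        try:
            with open(tmp_path, 'rb') as f:
                return repr(pickle.load(f))  # stand-in for the pickle -> Python conversion
        finally:
            try:
                os.unlink(tmp_path)  # always remove the temp file
            except OSError:
                pass

    print(roundtrip({"plate_paths": ["/plates/plate1"]}))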