Coverage for openhcs/textual_tui/widgets/plate_manager.py: 0.0%

940 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2PlateManagerWidget for OpenHCS Textual TUI 

3 

4Plate management widget with complete button set and reactive state management. 

5Matches the functionality from the current prompt-toolkit TUI. 

6""" 

7 

8import asyncio 

9import copy 

10import dataclasses 

11import inspect 

12import json 

13import logging 

14import numpy as np 

15import os 

16import pickle 

17import re 

18import signal 

19import stat 

20import subprocess 

21import sys 

22import tempfile 

23import time 

24import traceback 

25from collections import defaultdict 

26from datetime import datetime 

27from pathlib import Path 

28from typing import Dict, List, Optional, Callable, Any, Tuple 

29 

30from openhcs.core.config import PipelineConfig 

31 

32from PIL import Image 

33from textual.app import ComposeResult 

34from textual.containers import Horizontal, ScrollableContainer 

35from textual.reactive import reactive 

36from textual.widgets import Button, Static, SelectionList 

37from textual.widget import Widget 

38from textual.css.query import NoMatches 

39from .button_list_widget import ButtonListWidget, ButtonConfig 

40from textual import work, on 

41 

42from openhcs.core.config import GlobalPipelineConfig, VFSConfig, MaterializationBackend 

43from openhcs.core.pipeline import Pipeline 

44from openhcs.io.filemanager import FileManager 

45from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks 

46from openhcs.io.base import storage_registry 

47from openhcs.io.memory import MemoryStorageBackend 

48from openhcs.io.zarr import ZarrStorageBackend 

49from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator 

50from openhcs.constants.constants import GroupBy, Backend, VariableComponents, OrchestratorState 

51from openhcs.textual_tui.services.file_browser_service import SelectionMode 

52from openhcs.textual_tui.services.window_service import WindowService 

53from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey, get_path_cache 

54from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer 

55 

56logger = logging.getLogger(__name__) 

57 

58# Note: Using subprocess approach instead of multiprocessing to avoid TUI FD conflicts 

59 

def get_orchestrator_status_symbol(orchestrator: PipelineOrchestrator) -> str:
    """Return the one-character status code for an orchestrator.

    ``None`` (a plate that has no orchestrator yet) and any state without an
    explicit entry in the table below both yield ``"?"``.
    """
    fallback = "?"
    if orchestrator is None:
        # Newly added plate: no orchestrator has been created for it.
        return fallback

    symbol_pairs = (
        (OrchestratorState.CREATED, "?"),         # Created but not initialized
        (OrchestratorState.READY, "-"),           # Initialized, ready for compilation
        (OrchestratorState.COMPILED, "o"),        # Compiled, ready for execution
        (OrchestratorState.EXECUTING, "!"),       # Execution in progress
        (OrchestratorState.COMPLETED, "C"),       # Execution completed successfully
        (OrchestratorState.INIT_FAILED, "I"),     # Initialization failed
        (OrchestratorState.COMPILE_FAILED, "P"),  # Compilation failed (P for Pipeline)
        (OrchestratorState.EXEC_FAILED, "X"),     # Execution failed
    )
    return dict(symbol_pairs).get(orchestrator.state, fallback)

77 

def get_current_log_file_path() -> str:
    """Get the current log file path from the logging system.

    The root logger is searched first, then the ``"openhcs"`` logger; the
    ``baseFilename`` of the first ``logging.FileHandler`` found is returned.
    If neither logger has a file handler, a timestamped path under
    ``~/.local/share/openhcs/logs`` is returned (the directory is created if
    needed).

    Returns:
        A log file path. If anything above raises, the failure is logged and
        ``openhcs_subprocess.log`` inside the system temp directory is
        returned instead.
    """
    try:
        # ``None`` selects the root logger; "openhcs" is the fallback logger.
        for logger_name in (None, "openhcs"):
            for handler in logging.getLogger(logger_name).handlers:
                if isinstance(handler, logging.FileHandler):
                    return handler.baseFilename

        # Last resort: create a default path
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        return str(log_dir / f"openhcs_subprocess_{int(time.time())}.log")

    except Exception as e:
        logger.warning(f"Could not determine log file path: {e}")
        # Use the platform temp directory rather than a hard-coded /tmp.
        return str(Path(tempfile.gettempdir()) / "openhcs_subprocess.log")

101 

102 

103 

104 

105 

106 

107 

108class PlateManagerWidget(ButtonListWidget): 

109 """ 

110 Plate management widget using Textual reactive state. 

111 """ 

112 

113 # Semantic reactive property (like PipelineEditor's pipeline_steps) 

114 selected_plate = reactive("") 

115 orchestrators = reactive({}) 

116 plate_configs = reactive({}) 

117 orchestrator_state_version = reactive(0) # Increment to trigger UI refresh 

118 

def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
    """Create the plate manager with its button row and execution bookkeeping.

    Args:
        filemanager: Kept on the widget as ``self.filemanager``.
        global_config: Kept on the widget as ``self.global_config``.
    """
    # Every button except "Add" starts disabled.
    # The OME-ZARR "Export" button is intentionally hidden from the UI.
    super().__init__(
        button_configs=[
            ButtonConfig("Add", "add_plate"),
            ButtonConfig("Del", "del_plate", disabled=True),
            ButtonConfig("Edit", "edit_config", disabled=True),  # Unified config editing
            ButtonConfig("Init", "init_plate", disabled=True),
            ButtonConfig("Compile", "compile_plate", disabled=True),
            ButtonConfig("Run", "run_plate", disabled=True),
            ButtonConfig("Code", "code_plate", disabled=True),  # Generate Python code
            ButtonConfig("Save", "save_python_script", disabled=True),  # Save Python script
        ],
        list_id="plate_content",
        container_id="plate_list",
        on_button_pressed=self._handle_button_press,
        on_selection_changed=self._handle_selection_change,
        on_item_moved=self._handle_item_moved
    )

    # Collaborators and per-plate data
    self.filemanager = filemanager
    self.global_config = global_config
    self.plate_compiled_data = {}
    self.on_plate_selected: Optional[Callable[[str], None]] = None
    self.pipeline_editor: Optional['PipelineEditorWidget'] = None

    # Created in on_mount (avoids circular imports)
    self.window_service = None

    # Subprocess execution state
    self.current_process: Optional[subprocess.Popen] = None
    self.log_file_path: Optional[str] = None  # Single source of truth
    self.log_file_position: int = 0  # Offset for incremental log reading

    # Periodic monitoring via Textual's interval system
    self.monitoring_interval = None
    self.monitoring_active = False

    logger.debug("PlateManagerWidget initialized")

158 

159 

160 

161 

162 

def on_unmount(self) -> None:
    """Stop a still-running worker subprocess and cancel monitoring on unmount."""
    logger.debug("Unmounting PlateManagerWidget, ensuring worker process is terminated.")
    if self.current_process and self.current_process.poll() is None:
        # on_unmount is synchronous, so the async stop is scheduled as a task.
        # Keep a reference to it: the event loop holds only weak references
        # to tasks, so an unreferenced task can be garbage-collected before
        # it finishes.
        self._stop_execution_task = asyncio.create_task(self.action_stop_execution())
    self._stop_monitoring()

171 

def format_item_for_display(self, plate: Dict) -> Tuple[str, str]:
    """Return ``(display_text, plate_path)`` for one plate entry.

    The display text is ``"<icon> <name> - <path>"``, where the icon is picked
    from the status code of the plate's orchestrator (looked up by path).
    """
    path = plate.get('path', '')
    # Status comes from the orchestrator (None when the plate has none yet).
    code = get_orchestrator_status_symbol(self.orchestrators.get(path))

    icon_by_code = {
        "?": "➕",  # Created (not initialized)
        "-": "✅",  # Ready (initialized)
        "o": "⚡",  # Compiled
        "!": "🔄",  # Executing
        "C": "🏁",  # Completed
        "I": "🚫",  # Init failed
        "P": "💥",  # Compile failed (Pipeline)
        "X": "❌"   # Execution failed
    }
    icon = icon_by_code.get(code, "❓")
    name = plate.get('name', 'Unknown')
    return f"{icon} {name} - {path}", path

192 

193 async def _handle_button_press(self, button_id: str) -> None: 

194 action_map = { 

195 "add_plate": self.action_add_plate, 

196 "del_plate": self.action_delete_plate, 

197 "edit_config": self.action_edit_config, # Unified edit button 

198 "init_plate": self.action_init_plate, 

199 "compile_plate": self.action_compile_plate, 

200 "code_plate": self.action_code_plate, # Generate Python code 

201 "save_python_script": self.action_save_python_script, # Save Python script 

202 # "export_ome_zarr": self.action_export_ome_zarr, # HIDDEN 

203 } 

204 if button_id in action_map: 

205 action = action_map[button_id] 

206 if inspect.iscoroutinefunction(action): 

207 await action() 

208 else: 

209 action() 

210 elif button_id == "run_plate": 

211 if self._is_any_plate_running(): 

212 await self.action_stop_execution() 

213 else: 

214 await self.action_run_plate() 

215 

def _handle_selection_change(self, selected_values: List[str]) -> None:
    """Selection (checkmark) callback; only logs how many items are checked."""
    logger.debug(
        f"Checkmarks changed: {len(selected_values)} items selected"
    )

218 

219 def _handle_item_moved(self, from_index: int, to_index: int) -> None: 

220 current_plates = list(self.items) 

221 plate = current_plates.pop(from_index) 

222 current_plates.insert(to_index, plate) 

223 self.items = current_plates 

224 plate_name = plate['name'] 

225 direction = "up" if to_index < from_index else "down" 

226 self.app.current_status = f"Moved plate '{plate_name}' {direction}" 

227 

def on_mount(self) -> None:
    """Create the window service and schedule the initial UI refresh calls."""
    self.window_service = WindowService(self.app)

    # Deferred so they run after mounting has finished.
    for deferred in (self._delayed_update_display, self._update_button_states):
        self.call_later(deferred)

234 

def watch_items(self, items: List[Dict]) -> None:
    """React to a change of ``items``: refresh the list, then the buttons."""
    # DEBUG aid: record the two frames above this one to trace who changed items.
    recent_frames = traceback.format_stack()[-3:-1]
    stack_trace = ''.join(recent_frames)
    logger.debug(f"🔍 ITEMS CHANGED: {len(items)} plates. Call stack:\n{stack_trace}")

    # CRITICAL: the parent's watcher is what updates the SelectionList.
    super().watch_items(items)

    logger.debug(f"Plates updated: {len(items)} plates")
    self._update_button_states()

246 

def watch_highlighted_item(self, plate_path: str) -> None:
    """Mirror the highlighted list entry into ``selected_plate``."""
    self.selected_plate = plate_path
    logger.debug(
        f"Highlighted plate: {plate_path}"
    )

250 

def watch_selected_plate(self, plate_path: str) -> None:
    """Refresh buttons and invoke ``on_plate_selected`` for a non-empty path."""
    self._update_button_states()

    callback = self.on_plate_selected
    if callback and plate_path:
        callback(plate_path)

    logger.debug(f"Selected plate: {plate_path}")

256 

def watch_orchestrator_state_version(self, version: int) -> None:
    """Refresh list rows and buttons after the state-version counter changes.

    Does nothing while the widget is not mounted. When a pipeline editor is
    attached, its button states are refreshed as well.
    """
    if not self.is_mounted:
        return

    # Rebuilding the list re-runs format_item_for_display() for every item.
    self._update_selection_list()

    # CRITICAL: main buttons depend on orchestrator states too.
    self._update_button_states()

    editor = self.pipeline_editor
    if not editor:
        return
    logger.debug(f"PlateManager: Notifying PipelineEditor of orchestrator state change (version {version})")
    editor._update_button_states()

274 

def get_selection_state(self) -> tuple[List[Dict], str]:
    """Return ``(selected_items, mode)`` describing what actions apply to.

    ``mode`` is one of:
      * ``"checkbox"`` - items whose path is checked in the selection list
      * ``"cursor"``   - the single item matching ``self.selected_plate``
      * ``"empty"``    - nothing selected, or the widget is not mounted

    If reading the selection list raises, diagnostics are logged and the
    cursor selection (when there is one) is returned instead.
    """
    if not self.is_mounted:
        logger.debug("get_selection_state called on unmounted widget")
        return [], "empty"

    def cursor_selection():
        # Item under the cursor, if any plate is currently highlighted.
        if self.selected_plate:
            return [p for p in self.items if p.get('path') == self.selected_plate], "cursor"
        return [], "empty"

    try:
        checked_paths = self.query_one(f"#{self.list_id}").selected
        if checked_paths:
            return [p for p in self.items if p.get('path') in checked_paths], "checkbox"
        return cursor_selection()
    except Exception as e:
        # DOM CORRUPTION DETECTED - This is a critical error
        stack_trace = ''.join(traceback.format_stack()[-3:-1])
        logger.error(f"🚨 DOM CORRUPTION: Failed to get selection state: {e}")
        logger.error(f"🚨 DOM CORRUPTION: Call stack:\n{stack_trace}")
        logger.error(f"🚨 DOM CORRUPTION: Widget mounted: {self.is_mounted}")
        logger.error(f"🚨 DOM CORRUPTION: Looking for: #{self.list_id}")
        logger.error(f"🚨 DOM CORRUPTION: Plates count: {len(self.items)}")

        # Try to list which widget ids actually exist, to aid diagnosis.
        try:
            widget_ids = [w.id for w in list(self.query("*")) if w.id]
            logger.error(f"🚨 DOM CORRUPTION: Available widget IDs: {widget_ids}")
        except Exception as diag_e:
            logger.error(f"🚨 DOM CORRUPTION: Could not diagnose widgets: {diag_e}")

    return cursor_selection()

313 

def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
    """Describe an operation by item count only (``selection_mode`` is unused).

    NOTE: this class defines ``get_operation_description`` a second time
    further down; that later definition replaces this one when the class body
    is executed.
    """
    count = len(selected_items)
    if count == 0:
        description = f"No items for {operation}"
    elif count == 1:
        description = f"{operation.title()} item: {selected_items[0].get('name', 'Unknown')}"
    else:
        description = f"{operation.title()} {count} items"
    return description

319 

320 def _delayed_update_display(self) -> None: 

321 """Trigger UI update - no longer needed since reactive system handles this automatically.""" 

322 # The reactive system now handles updates automatically via watch_plates() 

323 # This method is kept for compatibility but does nothing 

324 pass 

325 

326 def _trigger_ui_refresh(self) -> None: 

327 """Force UI refresh when orchestrator state changes without items list changing.""" 

328 # Increment reactive counter to trigger automatic UI refresh 

329 self.orchestrator_state_version += 1 

330 

331 def _update_button_states(self) -> None: 

332 try: 

333 # Check if widget is mounted and buttons exist 

334 if not self.is_mounted: 

335 return 

336 

337 has_selection = bool(self.selected_plate) 

338 is_running = self._is_any_plate_running() 

339 

340 # Check if there are any selected items (for delete button) 

341 selected_items, _ = self.get_selection_state() 

342 has_selected_items = bool(selected_items) 

343 

344 can_run = has_selection and any(p['path'] in self.plate_compiled_data for p in self.items if p.get('path') == self.selected_plate) 

345 

346 # Try to get run button - if it doesn't exist, widget is not fully mounted 

347 try: 

348 run_button = self.query_one("#run_plate") 

349 if is_running: 

350 run_button.label = "Stop" 

351 run_button.disabled = False 

352 else: 

353 run_button.label = "Run" 

354 run_button.disabled = not can_run 

355 except: 

356 # Buttons not mounted yet, skip update 

357 return 

358 

359 self.query_one("#add_plate").disabled = is_running 

360 self.query_one("#del_plate").disabled = not self.items or not has_selected_items or is_running 

361 

362 # Edit button (config editing) enabled when 1+ orchestrators selected and initialized 

363 selected_items, _ = self.get_selection_state() 

364 edit_enabled = ( 

365 len(selected_items) > 0 and 

366 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

367 not is_running 

368 ) 

369 self.query_one("#edit_config").disabled = not edit_enabled 

370 

371 # Init button - enabled when plates are selected, can be initialized, and not running 

372 init_enabled = ( 

373 len(selected_items) > 0 and 

374 any(self._can_orchestrator_be_initialized(item['path']) for item in selected_items) and 

375 not is_running 

376 ) 

377 self.query_one("#init_plate").disabled = not init_enabled 

378 

379 # Compile button - enabled when plates are selected, initialized, and not running 

380 selected_items, _ = self.get_selection_state() 

381 compile_enabled = ( 

382 len(selected_items) > 0 and 

383 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

384 not is_running 

385 ) 

386 self.query_one("#compile_plate").disabled = not compile_enabled 

387 

388 # Code button - enabled when plates are selected, initialized, and not running 

389 code_enabled = ( 

390 len(selected_items) > 0 and 

391 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

392 not is_running 

393 ) 

394 self.query_one("#code_plate").disabled = not code_enabled 

395 

396 # Save Python script button - enabled when plates are selected, initialized, and not running 

397 save_enabled = ( 

398 len(selected_items) > 0 and 

399 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and 

400 not is_running 

401 ) 

402 self.query_one("#save_python_script").disabled = not save_enabled 

403 

404 # Export button - enabled when plate is initialized and has workspace (HIDDEN FROM UI) 

405 # export_enabled = ( 

406 # has_selection and 

407 # self.selected_plate in self.orchestrators and 

408 # not is_running 

409 # ) 

410 # try: 

411 # self.query_one("#export_ome_zarr").disabled = not export_enabled 

412 # except: 

413 # pass # Button is hidden from UI 

414 

415 # Debug button removed - no longer needed 

416 

417 except Exception as e: 

418 # Only log if it's not a mounting/unmounting issue 

419 if self.is_mounted: 

420 logger.debug(f"Button state update skipped (widget not ready): {e}") 

421 # Don't log errors during unmounting 

422 

423 def _is_any_plate_running(self) -> bool: 

424 return self.current_process is not None and self.current_process.poll() is None 

425 

426 def _has_pipelines(self, plates: List[Dict]) -> bool: 

427 """Check if all plates have pipeline definitions.""" 

428 if not self.pipeline_editor: 

429 return False 

430 

431 for plate in plates: 

432 pipeline = self.pipeline_editor.get_pipeline_for_plate(plate['path']) 

433 if not pipeline: 

434 return False 

435 return True 

436 

def get_plate_status(self, plate_path: str) -> str:
    """Return the status symbol for ``plate_path`` from its orchestrator's state.

    A plate without an orchestrator is passed on as ``None``.
    """
    return get_orchestrator_status_symbol(self.orchestrators.get(plate_path))

441 

442 def _is_orchestrator_initialized(self, plate_path: str) -> bool: 

443 """Check if orchestrator exists and is in an initialized state.""" 

444 orchestrator = self.orchestrators.get(plate_path) 

445 if orchestrator is None: 

446 return False 

447 return orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED, 

448 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED, 

449 OrchestratorState.EXEC_FAILED] 

450 

451 def _can_orchestrator_be_initialized(self, plate_path: str) -> bool: 

452 """Check if orchestrator can be initialized (doesn't exist or is in a re-initializable state).""" 

453 orchestrator = self.orchestrators.get(plate_path) 

454 if orchestrator is None: 

455 return True # No orchestrator exists, can be initialized 

456 return orchestrator.state in [OrchestratorState.CREATED, OrchestratorState.INIT_FAILED] 

457 

458 def _notify_pipeline_editor_status_change(self, plate_path: str, new_status: str) -> None: 

459 """Notify pipeline editor when plate status changes (enables Add button immediately).""" 

460 if self.pipeline_editor and self.pipeline_editor.current_plate == plate_path: 

461 # Update pipeline editor's status and trigger button state update 

462 self.pipeline_editor.current_plate_status = new_status 

463 

464 def _get_current_pipeline_definition(self, plate_path: str = None) -> List: 

465 """Get current pipeline definition from PipelineEditor (now returns FunctionStep objects directly).""" 

466 if not self.pipeline_editor: 

467 logger.warning("No pipeline editor reference - using empty pipeline") 

468 return [] 

469 

470 # Get pipeline for specific plate or current plate 

471 target_plate = plate_path or self.pipeline_editor.current_plate 

472 if not target_plate: 

473 logger.warning("No plate specified - using empty pipeline") 

474 return [] 

475 

476 # Get pipeline from editor (now returns List[FunctionStep] directly) 

477 pipeline_steps = self.pipeline_editor.get_pipeline_for_plate(target_plate) 

478 

479 # No conversion needed - pipeline_steps are already FunctionStep objects with memory type decorators 

480 return pipeline_steps 

481 

def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
    """Build the human-readable summary of what an operation will act on.

    The wording depends on ``selection_mode`` (``"empty"``, ``"all"``,
    ``"checkbox"`` or anything else) and on the number of selected items.
    """
    count = len(selected_items)

    if selection_mode == "empty":
        return f"No items available for {operation}"
    if selection_mode == "all":
        return f"{operation.title()} ALL {count} items"
    if selection_mode != "checkbox":
        return f"{operation.title()} {count} items"

    # Checkbox mode: name the item when exactly one is checked.
    if count != 1:
        return f"{operation.title()} {count} selected items"
    item_name = selected_items[0].get('name', 'Unknown')
    return f"{operation.title()} selected item: {item_name}"

497 

498 def _reset_execution_state(self, status_message: str, force_fail_executing: bool = True): 

499 if self.current_process: 

500 if self.current_process.poll() is None: # Still running 

501 logger.warning("Forcefully terminating subprocess during reset.") 

502 self.current_process.terminate() 

503 try: 

504 self.current_process.wait(timeout=1) 

505 except subprocess.TimeoutExpired: 

506 self.current_process.kill() # Force kill if terminate fails 

507 self.current_process = None 

508 

509 # Clear log file reference (no temp files - log file is single source of truth) 

510 self.log_file_path = None 

511 self.log_file_position = 0 

512 

513 # Stop async monitoring 

514 self._stop_monitoring() 

515 

516 # Only reset executing orchestrators to failed if this is a forced termination 

517 # Natural completion should preserve the states set by the completion handler 

518 if force_fail_executing: 

519 for plate_path, orchestrator in self.orchestrators.items(): 

520 if orchestrator.state == OrchestratorState.EXECUTING: 

521 orchestrator._state = OrchestratorState.EXEC_FAILED 

522 

523 # Trigger UI refresh after state changes - this is essential for button states 

524 self._trigger_ui_refresh() 

525 

526 # Update button states - but only if widget is properly mounted 

527 try: 

528 if self.is_mounted and hasattr(self, 'query_one'): 

529 self._update_button_states() 

530 except Exception as e: 

531 logger.error(f"Failed to update button states during reset: {e}") 

532 

533 self.app.current_status = status_message 

534 

async def action_run_plate(self) -> None:
    """Launch the selected, compiled plates in a separate Python subprocess.

    Steps, as implemented below:
      1. Clear the toolong log window (best effort; failures are only logged).
      2. Take the current selection and keep the plates that have an entry in
         ``self.plate_compiled_data``; show an error and return if none remain.
      3. Serialize plate paths, per-plate pipeline definitions and the app's
         global config to a temporary ``.pkl`` file using ``dill``.
      4. Start ``subprocess_runner.py`` with the data file, log base and id.
      5. Mark the plates' orchestrators as EXECUTING and start monitoring.

    Any exception after step 2 is reported via ``self.app.show_error``, the
    execution state is reset, and the temporary data file is removed.
    """
    # Clear logs from singleton toolong window before starting new run
    try:
        from openhcs.textual_tui.windows.toolong_window import clear_toolong_logs
        logger.info("Clearing logs from singleton toolong window before new run")
        clear_toolong_logs(self.app)
        logger.info("Toolong logs cleared")
    except Exception as e:
        logger.error(f"Failed to clear toolong logs: {e}")
        logger.error(traceback.format_exc())

    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.show_error("No plates selected to run.")
        return

    ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data]
    if not ready_items:
        self.app.show_error("Selected plates are not compiled. Please compile first.")
        return

    data_file_path = None
    try:
        # Use subprocess approach like integration tests
        logger.debug("🔥 Using subprocess approach for clean isolation")

        plate_paths_to_run = [item['path'] for item in ready_items]

        # Pass definition pipeline steps - subprocess will make fresh copy and compile
        pipeline_data = {
            plate_path: self._get_current_pipeline_definition(plate_path)
            for plate_path in plate_paths_to_run
        }

        logger.info(f"🔥 Starting subprocess for {len(plate_paths_to_run)} plates")

        # Reserve a data file path for the subprocess. The handle is closed
        # right away so the file is not held open twice while it is written.
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as data_file:
            data_file_path = data_file.name

        # Generate unique ID for this subprocess
        subprocess_timestamp = int(time.time())
        plate_names = [Path(path).name for path in plate_paths_to_run]
        # Limit to first 2 plates for filename length
        unique_id = f"plates_{'_'.join(plate_names[:2])}_{subprocess_timestamp}"

        # Build subprocess log name from TUI log base using log utilities
        from openhcs.core.log_utils import get_current_log_file_path
        try:
            tui_log_path = get_current_log_file_path()
            if tui_log_path.endswith('.log'):
                tui_base = tui_log_path[:-4]  # Remove .log extension
            else:
                tui_base = tui_log_path
            log_file_base = f"{tui_base}_subprocess_{subprocess_timestamp}"
        except RuntimeError:
            # Fallback if no main log found
            log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)
            log_file_base = str(log_dir / f"tui_subprocess_{subprocess_timestamp}")

        # Data handed to the subprocess
        subprocess_data = {
            'plate_paths': plate_paths_to_run,
            'pipeline_data': pipeline_data,
            'global_config': self.app.global_config  # Pickle config object directly
        }

        # Resolve all lazy configurations to concrete values before pickling
        from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
        resolved_subprocess_data = resolve_lazy_configurations_for_serialization(subprocess_data)

        # Blocking work runs in the default executor to keep the UI responsive.
        loop = asyncio.get_running_loop()

        def _write_pickle_data():
            import dill as pickle
            with open(data_file_path, 'wb') as f:
                pickle.dump(resolved_subprocess_data, f)

        await loop.run_in_executor(None, _write_pickle_data)

        logger.debug(f"🔥 Created data file: {data_file_path}")

        # Create subprocess (like integration tests)
        subprocess_script = Path(__file__).parent.parent / "subprocess_runner.py"

        # Generate actual log file path that subprocess will create
        actual_log_file_path = f"{log_file_base}_{unique_id}.log"
        logger.debug(f"🔥 Log file base: {log_file_base}")
        logger.debug(f"🔥 Unique ID: {unique_id}")
        logger.debug(f"🔥 Actual log file: {actual_log_file_path}")

        # Store log file path for monitoring (subprocess logger writes to this)
        self.log_file_path = actual_log_file_path
        self.log_file_position = self._get_current_log_position()  # Start from current end

        logger.debug(f"🔥 Subprocess command: {sys.executable} {subprocess_script} {data_file_path} {log_file_base} {unique_id}")
        logger.debug(f"🔥 Subprocess logger will write to: {self.log_file_path}")
        logger.debug("🔥 Subprocess stdout will be silenced (logger handles output)")

        # SIMPLE SUBPROCESS: Let subprocess log to its own file (single source of truth)
        def _create_subprocess():
            return subprocess.Popen([
                sys.executable, str(subprocess_script),
                data_file_path, log_file_base, unique_id  # Only data file and log - no temp files
            ],
                stdout=subprocess.DEVNULL,  # Subprocess logs to its own file
                stderr=subprocess.DEVNULL,  # Subprocess logs to its own file
                text=True,  # Text mode for easier handling
            )

        self.current_process = await loop.run_in_executor(None, _create_subprocess)

        logger.info(f"🔥 Subprocess started with PID: {self.current_process.pid}")

        # Update orchestrator states to show running state
        for plate in ready_items:
            plate_path = plate['path']
            if plate_path in self.orchestrators:
                self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING

        # Trigger UI refresh after state changes
        self._trigger_ui_refresh()

        self.app.current_status = f"Running {len(ready_items)} plate(s) in subprocess..."
        self._update_button_states()

        # Start reactive log monitoring
        self._start_log_monitoring()

        # Start async monitoring
        await self._start_monitoring()

    except Exception as e:
        logger.critical(f"Failed to start subprocess: {e}", exc_info=True)
        self.app.show_error("Failed to start subprocess", str(e))
        self._reset_execution_state("Subprocess failed to start")
        # The run did not start (any spawned process was terminated by the
        # reset above), so the data file is no longer needed.
        if data_file_path:
            try:
                os.unlink(data_file_path)
            except OSError:
                pass  # Already removed or never written; nothing to clean up

674 

def _start_log_monitoring(self) -> None:
    """Ask the app's status bar to start following the subprocess log.

    The status bar receives the log path with its suffix removed. Without a
    log file path or a status bar only a warning is logged; any exception is
    logged and swallowed.
    """
    if not self.log_file_path:
        logger.warning("Cannot start log monitoring: no log file path")
        return

    try:
        # Strip the ".log" suffix to get the base path.
        base_log_path = str(Path(self.log_file_path).with_suffix(''))

        status_bar_available = hasattr(self.app, 'status_bar') and self.app.status_bar
        if not status_bar_available:
            logger.warning("Status bar not available for log monitoring")
        else:
            self.app.status_bar.start_log_monitoring(base_log_path)
            logger.debug(f"Started reactive log monitoring for: {base_log_path}")

    except Exception as e:
        logger.error(f"Failed to start log monitoring: {e}")

695 

def _stop_log_monitoring(self) -> None:
    """Ask the app's status bar (when present) to stop following the log.

    Exceptions are logged and swallowed.
    """
    try:
        status_bar_available = hasattr(self.app, 'status_bar') and self.app.status_bar
        if status_bar_available:
            self.app.status_bar.stop_log_monitoring()
            logger.debug("Stopped reactive log monitoring")
    except Exception as e:
        logger.error(f"Failed to stop log monitoring: {e}")

705 

706 def _get_current_log_position(self) -> int: 

707 """Get current position in log file.""" 

708 if not self.log_file_path or not Path(self.log_file_path).exists(): 

709 return 0 

710 try: 

711 return Path(self.log_file_path).stat().st_size 

712 except Exception: 

713 return 0 

714 

715 

716 

717 def _stop_file_watcher(self) -> None: 

718 """Stop file system watcher without blocking.""" 

719 if not self.file_observer: 

720 return 

721 

722 try: 

723 # Just stop and abandon - don't wait for anything 

724 self.file_observer.stop() 

725 except Exception: 

726 pass # Ignore errors 

727 finally: 

728 # Always clear references immediately 

729 self.file_observer = None 

730 self.file_watcher = None 

731 

732 

733 

async def _start_monitoring(self) -> None:
    """Start periodic process monitoring using Textual's interval system.

    Any monitoring already in progress is stopped first, so at most one
    interval timer is active. ``_check_process_status_async`` is then
    scheduled every 10 seconds.
    """
    # _stop_monitoring() always leaves monitoring_active False, so no
    # "already active" guard is needed after it.
    self._stop_monitoring()

    self.monitoring_active = True
    self.monitoring_interval = self.set_interval(
        10.0,  # Check every 10 seconds
        self._check_process_status_async,
        pause=False
    )
    logger.debug("Started async process monitoring")

750 

def _stop_monitoring(self) -> None:
    """Cancel the interval timer (if any), clear the active flag, stop log monitoring."""
    timer = self.monitoring_interval
    if timer:
        timer.stop()
        self.monitoring_interval = None
    self.monitoring_active = False

    # Log monitoring is tied to process monitoring, so stop it too.
    self._stop_log_monitoring()

    logger.debug("Stopped async process monitoring")

762 

async def _check_process_status_async(self) -> None:
    """Interval callback: detect that no plate is running any more.

    Does nothing while monitoring is inactive or while
    ``_is_any_plate_running()`` still reports a running plate.  Otherwise
    monitoring is stopped and ``_handle_process_completion()`` is awaited.
    Exceptions are logged at debug level and swallowed, so a later tick can
    try again.
    """
    if not self.monitoring_active:
        return

    try:
        if self._is_any_plate_running():
            return  # Still executing; look again on the next tick.

        logger.debug("🔥 MONITOR: Subprocess finished")

        # Stop the interval before handling completion.
        self._stop_monitoring()
        await self._handle_process_completion()
    except Exception as exc:
        logger.debug(f"Error in async process monitoring: {exc}")

782 

async def _handle_process_completion(self) -> None:
    """Finalize a finished run, using the log file as the success verdict.

    The run counts as successful only when the log file exists and contains
    both the ``EXECUTION SUCCESS`` marker and the ``All plates completed
    successfully`` marker.  A missing or unreadable log counts as failure.

    Afterwards the subprocess is reaped with ``wait()``, every orchestrator
    still in ``EXECUTING`` state is moved to ``COMPLETED`` or ``EXEC_FAILED``,
    the execution state is reset with a matching message, and monitoring is
    stopped.
    """
    success = False

    if self.log_file_path and Path(self.log_file_path).exists():
        try:
            # The success marker contains a non-ASCII character, so decode
            # explicitly instead of relying on the locale default encoding.
            # NOTE(review): assumes the subprocess writes its log as UTF-8 -
            # confirm against the log setup.  Undecodable bytes are replaced
            # rather than failing the whole check.
            with open(self.log_file_path, 'r', encoding='utf-8', errors='replace') as f:
                log_content = f.read()
            success = (
                "🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content
                and "All plates completed successfully" in log_content
            )
        except Exception as e:
            logger.error(f"Error reading subprocess log file: {e}")
            success = False

    # Reap the subprocess so it does not linger as a zombie.
    logger.info("🔥 MONITOR: Starting process cleanup...")
    if self.current_process:
        try:
            self.current_process.wait()
            logger.info("🔥 MONITOR: Process cleanup completed")
        except Exception as e:
            logger.warning(f"🔥 MONITOR: Error during process cleanup: {e}")

    # Only orchestrators that were still executing get a new state.
    if success:
        final_state = OrchestratorState.COMPLETED
        status_message = "Execution completed successfully."
    else:
        final_state = OrchestratorState.EXEC_FAILED
        status_message = "Execution failed."

    for orchestrator in self.orchestrators.values():
        if orchestrator.state == OrchestratorState.EXECUTING:
            orchestrator._state = final_state

    # States were set explicitly above, so do not force-fail them here.
    self._reset_execution_state(status_message, force_fail_executing=False)

    self._stop_monitoring()  # Process is done; nothing left to monitor

831 

async def _read_log_file_incremental(self) -> None:
    """Show the newest log line written since the previous read.

    Reads everything after ``self.log_file_position`` from the log file in a
    worker thread, advances ``self.log_file_position`` to the new end, and
    stores the last non-empty line in ``self.app.current_status``, truncated
    to 100 characters.  Placeholder status messages are used when there is
    no log file or no new content.  Any error is reported through the status
    text instead of being raised.

    The file is read in binary mode so that ``log_file_position`` is a real
    byte offset.  That keeps it compatible with offsets derived from the
    file size, such as the value ``_get_current_log_position`` returns;
    text-mode ``seek()`` only supports offsets that ``tell()`` produced.
    The bytes are decoded as UTF-8 with undecodable sequences replaced.
    """
    if not self.log_file_path:
        self.app.current_status = "🔥 LOG READER: No log file"
        return

    try:
        # File I/O runs in an executor so the UI event loop is not blocked.
        def _read_log_content():
            if not Path(self.log_file_path).exists():
                return None, self.log_file_position

            with open(self.log_file_path, 'rb') as f:
                f.seek(self.log_file_position)
                raw = f.read()
                new_position = f.tell()

            # NOTE(review): assumes the log is UTF-8 encoded - confirm.  A
            # multi-byte character split across two reads shows up as a
            # replacement character instead of raising.
            return raw.decode('utf-8', errors='replace'), new_position

        new_content, new_position = await asyncio.get_event_loop().run_in_executor(None, _read_log_content)
        self.log_file_position = new_position

        if new_content is None:
            self.app.current_status = "🔥 LOG READER: No log file"
            return

        non_empty_lines = [line.strip() for line in new_content.split('\n') if line.strip()]
        if not non_empty_lines:
            self.app.current_status = "🔥 LOG READER: No new content"
            return

        # Show the last line, truncated if too long for the status bar.
        last_line = non_empty_lines[-1]
        if len(last_line) > 100:
            last_line = last_line[:97] + "..."
        self.app.current_status = last_line

    except Exception as e:
        self.app.current_status = f"🔥 LOG READER ERROR: {e}"

879 

880 

881 

async def action_stop_execution(self) -> None:
    """Stop button handler: terminate the running subprocess and reset state.

    Monitoring is stopped first.  If ``self.current_process`` is still
    running, its process group receives SIGTERM, then SIGKILL one second
    later; if anything in that sequence fails, only the main process is
    killed.  The execution state is reset afterwards in every case.
    """
    logger.info("🛑 Stop button pressed. Terminating subprocess.")
    self.app.current_status = "Terminating execution..."

    # Halt the status polling before touching the process.
    self._stop_monitoring()

    still_running = self.current_process and self.current_process.poll() is None
    if still_running:
        try:
            logger.info(f"🛑 Killing process group for PID {self.current_process.pid}...")

            # NOTE(review): the PID is used as the process-group ID, which
            # assumes the subprocess made itself a group leader (e.g. via
            # os.setpgrp()) - confirm in the subprocess entry point.
            pgid = self.current_process.pid

            # Polite request first, so children can exit gracefully.
            os.killpg(pgid, signal.SIGTERM)
            await asyncio.sleep(1)

            # Then force-kill whatever is left of the group.
            try:
                os.killpg(pgid, signal.SIGKILL)
                logger.info(f"🛑 Force killed process group {pgid}")
            except ProcessLookupError:
                logger.info(f"🛑 Process group {pgid} already terminated")

        except Exception as exc:
            logger.warning(f"🛑 Error killing process group: {exc}, falling back to single process kill")
            # Last resort: kill only the main process.
            self.current_process.kill()

    self._reset_execution_state("Execution terminated by user.")

917 

918 

919 

async def action_add_plate(self) -> None:
    """Add Plate button handler: open the plate directory browser."""
    browser_call = self._open_plate_directory_browser()
    await browser_call

923 

async def action_export_ome_zarr(self) -> None:
    """Ask for a target directory and export the selected plate to OME-ZARR.

    Shows an error and returns when no plate is selected or when the selected
    plate has no orchestrator yet.  Otherwise a directory browser is opened
    in save mode; once a path is chosen the export is started through
    ``_start_ome_zarr_export``.
    """
    plate = self.selected_plate
    if not plate:
        self.app.show_error("No Selection", "Please select a plate to export.")
        return

    orchestrator = self.orchestrators.get(plate)
    if not orchestrator:
        self.app.show_error("Not Initialized", "Please initialize the plate before exporting.")
        return

    def handle_export_result(selected_paths):
        """Start the export when the browser returned a non-empty selection."""
        if not selected_paths:
            return
        # The browser may hand back either a list of paths or a single path.
        chosen = selected_paths[0] if isinstance(selected_paths, list) else selected_paths
        self._start_ome_zarr_export(orchestrator, Path(chosen))

    browser_options = dict(
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.GENERAL),
        backend=Backend.DISK,
        title="Select OME-ZARR Export Directory",
        mode="save",
        selection_mode=SelectionMode.DIRECTORIES_ONLY,
        cache_key=PathCacheKey.GENERAL,
        on_result_callback=handle_export_result,
        caller_id="plate_manager_export",
    )
    await self.window_service.open_file_browser(**browser_options)

953 

def _start_ome_zarr_export(self, orchestrator, export_path: Path):
    """Start a background task that exports a plate's TIFF images to Zarr.

    The task searches the orchestrator's ``workspace_path`` (or ``input_dir``
    when that is falsy) recursively for ``*.tif`` files, groups them by the
    well ID parsed from the file name, and saves each well as one batch
    through a ``ZarrStorageBackend`` into ``<export_path>/plate.zarr``.
    Progress and failures are reported through the app's status text and
    error dialog; nothing is raised to the caller.

    Args:
        orchestrator: Source of ``workspace_path`` / ``input_dir``.
        export_path: Directory that will contain ``plate.zarr``.
    """
    async def run_export():
        try:
            self.app.current_status = f"Exporting to OME-ZARR: {export_path}"

            # Build an export-specific config with ZARR materialization.
            from openhcs.core.config import GlobalPipelineConfig
            from openhcs.config_framework.global_config import get_current_global_config
            export_config = get_current_global_config(GlobalPipelineConfig)
            export_vfs_config = VFSConfig(
                intermediate_backend=export_config.vfs_config.intermediate_backend,
                materialization_backend=MaterializationBackend.ZARR
            )

            # NOTE(review): the config is read via ``.vfs_config`` above but
            # replaced via the keyword ``vfs`` here, and the result is never
            # used below - confirm the field name and whether this config is
            # meant to be applied to the orchestrator.
            export_global_config = dataclasses.replace(export_config, vfs=export_vfs_config)

            zarr_backend = ZarrStorageBackend(ome_zarr_metadata=True)

            # workspace_path may be unset, in which case the plate's input
            # directory is used as the image source.
            source_path = orchestrator.workspace_path or orchestrator.input_dir
            if not (source_path and source_path.exists()):
                self.app.show_error("No Workspace", "Plate workspace not found. Run pipeline first.")
                return

            processed_images = list(source_path.rglob("*.tif"))
            if not processed_images:
                self.app.show_error("No Data", "No processed images found in workspace.")
                return

            # Group images by well; files whose name does not contain a well
            # token such as ``A01_`` are skipped.
            wells_data = defaultdict(list)
            for img_path in processed_images:
                match = re.search(r'([A-Z]\d{2})_', img_path.name)
                if match:
                    wells_data[match.group(1)].append(img_path)

            export_store_path = export_path / "plate.zarr"

            for well_id, well_images in wells_data.items():
                images = []
                for img_path in well_images:
                    # Close each file handle as soon as its pixels are copied
                    # into an array, instead of leaking one handle per image.
                    with Image.open(img_path) as img:
                        images.append(np.array(img))

                output_paths = [export_store_path / f"{well_id}_{i:03d}.tif"
                                for i in range(len(images))]

                zarr_backend.save_batch(images, output_paths, chunk_name=well_id)

            self.app.current_status = f"✅ OME-ZARR export completed: {export_store_path}"

        except Exception as e:
            logger.error(f"OME-ZARR export failed: {e}", exc_info=True)
            self.app.show_error("Export Failed", f"OME-ZARR export failed: {str(e)}")

    # Run export in background
    asyncio.create_task(run_export())

1024 

1025 # Debug functionality removed - no longer needed 

1026 

async def _open_plate_directory_browser(self):
    """Open the file browser for choosing one or more plate directories.

    The browser starts at the cached plate-import path, falling back to the
    user's home directory, and reports its result to
    ``_add_plate_callback``.
    """
    start_dir = get_path_cache().get_initial_path(PathCacheKey.PLATE_IMPORT, Path.home())

    await self.window_service.open_file_browser(
        file_manager=self.filemanager,
        initial_path=start_dir,
        backend=Backend.DISK,
        title="Select Plate Directory",
        mode="load",
        selection_mode=SelectionMode.DIRECTORIES_ONLY,
        cache_key=PathCacheKey.PLATE_IMPORT,
        on_result_callback=self._add_plate_callback,
        caller_id="plate_manager",
        enable_multi_selection=True,
    )

1046 

def _add_plate_callback(self, selected_paths) -> None:
    """Add the directories chosen in the file browser to the plate list.

    Args:
        selected_paths: ``None`` / ``False`` when the dialog was cancelled,
            otherwise a single path or a list of paths (``str``, ``Path`` or
            anything convertible through ``str()``).

    Paths already present in ``self.items`` are skipped.  The parent of the
    first selected path is stored in the path cache, ``self.items`` is
    reassigned, and the app status is updated with a summary.
    """
    logger.debug(f"_add_plate_callback called with: {selected_paths} (type: {type(selected_paths)})")

    if selected_paths is None or selected_paths is False:
        self.app.current_status = "Plate selection cancelled"
        return

    # Normalize a single selection to a one-element list.
    if not isinstance(selected_paths, list):
        selected_paths = [selected_paths]

    plates = list(self.items)
    new_names = []

    for raw_path in selected_paths:
        if isinstance(raw_path, Path):
            chosen = raw_path
        elif isinstance(raw_path, str):
            chosen = Path(raw_path)
        else:
            chosen = Path(str(raw_path))

        path_text = str(chosen)

        # Skip plates that are already listed (including ones added above).
        if path_text in (plate['path'] for plate in plates):
            continue

        # No status field - state comes from the orchestrator.
        plates.append({'name': chosen.name, 'path': path_text})
        new_names.append(chosen.name)

    # Remember the parent of the first selection for the next browse.
    if selected_paths:
        first = selected_paths[0]
        if not isinstance(first, Path):
            first = Path(first)
        get_path_cache().set_cached_path(PathCacheKey.PLATE_IMPORT, first.parent)

    # Reassigning the reactive attribute triggers the UI update.
    self.items = plates

    if not new_names:
        self.app.current_status = "No new plates added (duplicates skipped)"
    elif len(new_names) == 1:
        self.app.current_status = f"Added plate: {new_names[0]}"
    else:
        self.app.current_status = f"Added {len(new_names)} plates: {', '.join(new_names)}"

1102 

def action_delete_plate(self) -> None:
    """Remove the selected plates from the list and drop their orchestrators.

    Shows an error dialog when nothing is selected.  Otherwise the selected
    plates are removed from ``self.items``, their entries in
    ``self.orchestrators`` are discarded, ``self.selected_plate`` is cleared
    if it pointed at a deleted plate, and the app status reports how many
    plates were deleted.
    """
    selected_items, _ = self.get_selection_state()
    if not selected_items:
        # show_error is called as (title, message) everywhere else in this
        # widget; pass both so this call matches that usage.
        self.app.show_error("No Selection", "No plate selected to delete.")
        return

    paths_to_delete = {p['path'] for p in selected_items}
    self.items = [p for p in self.items if p['path'] not in paths_to_delete]

    # Drop orchestrators of deleted plates; plates never initialized have none.
    for path in paths_to_delete:
        self.orchestrators.pop(path, None)

    if self.selected_plate in paths_to_delete:
        self.selected_plate = ""

    self.app.current_status = f"Deleted {len(paths_to_delete)} plate(s)"

1121 

1122 

1123 

async def action_edit_config(self) -> None:
    """Edit button handler: edit a ``PipelineConfig`` for the selected plates.

    Only selected plates that already have an orchestrator are affected.
    The editor is seeded from the first such orchestrator's
    ``pipeline_config``, or from ``self.global_config`` when that is falsy.
    On save, the new config is applied to every selected orchestrator
    through ``apply_pipeline_config``.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        self.app.current_status = "No orchestrators selected for configuration"
        return

    selected_orchestrators = []
    for item in selected_items:
        plate_path = item['path']
        if plate_path in self.orchestrators:
            selected_orchestrators.append(self.orchestrators[plate_path])

    if not selected_orchestrators:
        self.app.current_status = "No initialized orchestrators selected"
        return

    # The first selected orchestrator supplies the values shown in the editor.
    representative = selected_orchestrators[0]
    source_config = representative.pipeline_config or self.global_config
    current_plate_config = create_dataclass_for_editing(PipelineConfig, source_config)

    def handle_config_save(new_config: PipelineConfig) -> None:
        """Apply the saved config to each selected orchestrator."""
        for target in selected_orchestrators:
            target.apply_pipeline_config(new_config)
        count = len(selected_orchestrators)
        self.app.current_status = f"Per-orchestrator configuration applied to {count} orchestrator(s)"

    # The window edits PipelineConfig, not GlobalPipelineConfig.
    await self.window_service.open_config_window(
        PipelineConfig,
        current_plate_config,
        on_save_callback=handle_config_save,
    )

1168 

async def action_edit_global_config(self) -> None:
    """Open the config editor for the application-wide configuration.

    The editor is seeded from ``self.app.global_config``, or a default
    ``GlobalPipelineConfig`` when that is falsy.  On save, the edited config
    is converted with ``to_base_config()``, stored on the app, and a task is
    scheduled per orchestrator to apply it.
    """
    from openhcs.core.config import PipelineConfig
    from openhcs.config_framework.lazy_factory import create_dataclass_for_editing

    base_config = self.app.global_config or GlobalPipelineConfig()

    # Editable PipelineConfig that keeps the current global values.
    editable_config = create_dataclass_for_editing(PipelineConfig, base_config, preserve_values=True)

    def handle_global_config_save(new_config: PipelineConfig) -> None:
        """Store the saved config on the app and push it to all orchestrators."""
        updated_global = new_config.to_base_config()
        self.app.global_config = updated_global

        # Each orchestrator applies the config in its own scheduled task.
        for orchestrator in self.orchestrators.values():
            asyncio.create_task(orchestrator.apply_new_global_config(updated_global))
        self.app.current_status = "Global configuration applied to all orchestrators"

    await self.window_service.open_config_window(
        PipelineConfig,
        editable_config,
        on_save_callback=handle_global_config_save,
    )

1204 

1205 

1206 

def _analyze_orchestrator_configs(self, orchestrators: List['PipelineOrchestrator']) -> Dict[str, Dict[str, Any]]:
    """Report, per config field, whether all orchestrators share one value.

    Args:
        orchestrators: List of PipelineOrchestrator instances.

    Returns:
        Dict mapping each ``GlobalPipelineConfig`` field name to either
        ``{"type": "same", "value": ..., "default": ...}`` or
        ``{"type": "different", "values": [...], "default": ...}``.
        Fields that no orchestrator's ``global_config`` provides are left
        out.  An empty input yields ``{}``.
    """
    if not orchestrators:
        return {}

    # Defaults are taken from the signature analysis of the config class.
    param_info = SignatureAnalyzer.analyze(GlobalPipelineConfig)
    analysis = {}

    for field in dataclasses.fields(GlobalPipelineConfig):
        name = field.name

        values = []
        for orch in orchestrators:
            try:
                values.append(getattr(orch.global_config, name))
            except AttributeError:
                # This config does not have the field; ignore it.
                continue

        if not values:
            continue

        details = param_info.get(name)
        default_value = details.default_value if details else None

        first = values[0]
        if all(self._values_equal(value, first) for value in values):
            analysis[name] = {"type": "same", "value": first, "default": default_value}
        else:
            analysis[name] = {"type": "different", "values": values, "default": default_value}

    return analysis

1262 

def _values_equal(self, val1: Any, val2: Any) -> bool:
    """Return whether two config values are equal.

    Two dataclass *instances* are compared field by field through
    ``dataclasses.asdict``.  Everything else, including dataclass classes
    themselves, is compared with ``==``.

    Args:
        val1: First value.
        val2: Second value.
    """
    def _is_dataclass_instance(value: Any) -> bool:
        # is_dataclass() is also true for dataclass classes, but asdict()
        # accepts instances only and raises TypeError for a class.
        return dataclasses.is_dataclass(value) and not isinstance(value, type)

    if _is_dataclass_instance(val1) and _is_dataclass_instance(val2):
        return dataclasses.asdict(val1) == dataclasses.asdict(val2)

    return val1 == val2

1271 

def action_init_plate(self) -> None:
    """Init Plate button handler: initialize the selected plates.

    Nothing is started when the selection is empty or when any selected
    plate is currently executing; both cases only log a warning.  Plates in
    every other state, including failed ones, may be (re-)initialized.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        logger.warning("No plates available for initialization")
        return

    def _is_executing(item) -> bool:
        """Tell whether the plate has an orchestrator that is executing."""
        orchestrator = self.orchestrators.get(item['path'])
        return orchestrator is not None and orchestrator.state == OrchestratorState.EXECUTING

    blocked = [item for item in selected_items if _is_executing(item)]
    if blocked:
        names = [item['name'] for item in blocked]
        logger.warning(f"Cannot initialize plates that are currently executing: {', '.join(names)}")
        return

    self._start_async_init(selected_items, selection_mode)

1297 

def _start_async_init(self, selected_items: List[Dict], selection_mode: str) -> None:
    """Log what is about to be initialized and launch the init worker.

    Args:
        selected_items: Plate entries to initialize.
        selection_mode: Selection mode passed on to the description helper.
    """
    description = self.get_operation_description(selected_items, selection_mode, "initialize")
    logger.info(f"Initializing: {description}")

    # Hand the actual work to the background worker.
    self._init_plates_worker(selected_items)

1306 

@work(exclusive=True)
async def _init_plates_worker(self, selected_items: List[Dict]) -> None:
    """Background worker that initializes an orchestrator for each plate.

    For every selected plate a ``PipelineOrchestrator`` is created and
    initialized in an executor, then stored in ``self.orchestrators``.  When
    that fails, a fresh orchestrator flagged ``INIT_FAILED`` is stored
    instead and the error text is recorded on the plate entry.  The UI is
    refreshed and the pipeline editor notified after each plate; a summary
    is logged at the end.

    Args:
        selected_items: Plate entries (dicts with at least ``'path'``).
    """
    for plate_data in selected_items:
        plate_path = plate_data['path']

        # Work on the live entry in self.items, not the selection copy.
        actual_plate = next(
            (plate for plate in self.items if plate['path'] == plate_path),
            None,
        )
        if not actual_plate:
            logger.error(f"Plate not found in plates list: {plate_path}")
            continue

        try:
            def build_and_initialize():
                """Create the orchestrator and return the initialize() result."""
                new_orchestrator = PipelineOrchestrator(
                    plate_path=plate_path,
                    global_config=self.global_config,
                    storage_registry=self.filemanager.registry,
                )
                return new_orchestrator.initialize()

            # Initialization is heavy, so keep it off the UI event loop.
            loop = asyncio.get_event_loop()
            orchestrator = await loop.run_in_executor(None, build_and_initialize)

            self.orchestrators[plate_path] = orchestrator
            logger.info(f"Plate {actual_plate['name']} initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize plate {plate_path}: {e}", exc_info=True)
            # Keep an orchestrator around so the failed state can be shown.
            failed_orchestrator = PipelineOrchestrator(
                plate_path=plate_path,
                global_config=self.global_config,
                storage_registry=self.filemanager.registry,
            )
            failed_orchestrator._state = OrchestratorState.INIT_FAILED
            self.orchestrators[plate_path] = failed_orchestrator
            actual_plate['error'] = str(e)

        # Reflect the new orchestrator state in the UI and the editor.
        self._trigger_ui_refresh()
        self._update_button_states()
        status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
        self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
        logger.debug(f"Updated plate {actual_plate['name']} status")

    self._update_button_states()

    def _count_in_state(state) -> int:
        """Count selected plates whose orchestrator is in ``state``."""
        return sum(
            1 for p in selected_items
            if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == state
        )

    success_count = _count_in_state(OrchestratorState.READY)
    error_count = _count_in_state(OrchestratorState.INIT_FAILED)

    if error_count == 0:
        logger.info(f"Successfully initialized {success_count} plates")
    else:
        logger.warning(f"Initialized {success_count} plates, {error_count} errors")

1372 

def action_compile_plate(self) -> None:
    """Compile Plate button handler: compile pipelines for selected plates.

    Compilation starts only when every selected plate has an orchestrator in
    one of the READY, COMPILE_FAILED, EXEC_FAILED, COMPILED or COMPLETED
    states and has a non-empty pipeline definition.  Otherwise the reason is
    logged (or shown in the status for missing pipelines) and nothing is
    started.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        logger.warning("No plates available for compilation")
        return

    # States from which a plate may be compiled or re-compiled.
    compilable_states = [
        OrchestratorState.READY,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
    ]

    not_ready = []
    for item in selected_items:
        orchestrator = self.orchestrators.get(item['path'])
        if orchestrator is None or orchestrator.state not in compilable_states:
            not_ready.append(item)

    if not_ready:
        names = [item['name'] for item in not_ready]
        # Pick the message matching the most specific cause found.
        if any(self.orchestrators.get(item['path']) is None for item in not_ready):
            logger.warning(f"Cannot compile plates that haven't been initialized: {', '.join(names)}")
        elif any(self.orchestrators.get(item['path']).state == OrchestratorState.EXECUTING for item in not_ready):
            logger.warning(f"Cannot compile plates that are currently executing: {', '.join(names)}")
        else:
            logger.warning(f"Cannot compile plates in current state: {', '.join(names)}")
        return

    # Every plate needs a pipeline definition to compile.
    no_pipeline = [
        item for item in selected_items
        if not self._get_current_pipeline_definition(item['path'])
    ]
    if no_pipeline:
        names = [item['name'] for item in no_pipeline]
        self.app.current_status = f"Cannot compile plates without pipelines: {', '.join(names)}"
        return

    self._start_async_compile(selected_items, selection_mode)

1416 

def _start_async_compile(self, selected_items: List[Dict], selection_mode: str) -> None:
    """Log what is about to be compiled and launch the compile worker.

    Args:
        selected_items: Plate entries to compile.
        selection_mode: Selection mode passed on to the description helper.
    """
    description = self.get_operation_description(selected_items, selection_mode, "compile")
    logger.info(f"Compiling: {description}")

    # Hand the actual work to the background worker.
    self._compile_plates_worker(selected_items)

1425 

@work(exclusive=True)
async def _compile_plates_worker(self, selected_items: List[Dict]) -> None:
    """Background worker that compiles the pipeline of each selected plate.

    Per plate: the current pipeline definition is deep-copied, step IDs are
    reassigned to match the copies, the orchestrator is fetched (or created)
    and initialized in an executor, and the steps are compiled for all
    component keys of the multiprocessing axis.  On success the pair
    ``(execution_pipeline, compiled_contexts)`` is stored in
    ``self.plate_compiled_data``; on failure the error text is recorded on
    the plate entry and nothing is stored.  The UI is refreshed after each
    plate and a summary is logged at the end.

    Args:
        selected_items: Plate entries (dicts with at least ``'path'``).
    """
    for plate_data in selected_items:
        plate_path = plate_data['path']

        # Work on the live entry in self.items, not the selection copy.
        actual_plate = next(
            (plate for plate in self.items if plate['path'] == plate_path),
            None,
        )
        if not actual_plate:
            logger.error(f"Plate not found in plates list: {plate_path}")
            continue

        definition_pipeline = self._get_current_pipeline_definition(plate_path)
        if not definition_pipeline:
            logger.warning(f"No pipeline defined for {actual_plate['name']}, using empty pipeline")
            definition_pipeline = []

        try:
            def get_or_create_orchestrator():
                """Return an initialized orchestrator for this plate."""
                if plate_path not in self.orchestrators:
                    return PipelineOrchestrator(
                        plate_path=plate_path,
                        global_config=self.global_config,
                        storage_registry=self.filemanager.registry,
                    ).initialize()

                existing = self.orchestrators[plate_path]
                if not existing.is_initialized():
                    existing.initialize()
                return existing

            # Heavy calls go through the executor to keep the UI responsive.
            loop = asyncio.get_event_loop()
            orchestrator = await loop.run_in_executor(None, get_or_create_orchestrator)
            self.orchestrators[plate_path] = orchestrator

            # Compile a private copy so the definition stays untouched.
            execution_pipeline = copy.deepcopy(definition_pipeline)

            for step in execution_pipeline:
                # The copies are new objects, so their IDs must be refreshed.
                step.step_id = str(id(step))
                # variable_components must never be None or empty.
                if step.variable_components is None:
                    logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]
                elif not step.variable_components:
                    logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]

            pipeline_obj = Pipeline(steps=execution_pipeline)

            from openhcs.constants import MULTIPROCESSING_AXIS
            wells = await loop.run_in_executor(None, lambda: orchestrator.get_component_keys(MULTIPROCESSING_AXIS))
            compiled_contexts = await loop.run_in_executor(
                None, orchestrator.compile_pipelines, pipeline_obj.steps, wells
            )

            # Gather step IDs from both sides for the diagnostic log lines.
            step_ids_in_pipeline = [id(step) for step in execution_pipeline]
            first_well_key = next(iter(compiled_contexts.keys())) if compiled_contexts else None
            step_ids_in_contexts = []
            if first_well_key and hasattr(compiled_contexts[first_well_key], 'step_plans'):
                step_ids_in_contexts = list(compiled_contexts[first_well_key].step_plans.keys())

            logger.info(f"🔥 Storing compiled data for {plate_path}: pipeline={type(execution_pipeline)}, contexts={type(compiled_contexts)}")
            logger.info(f"🔥 Step IDs in pipeline: {step_ids_in_pipeline}")
            logger.info(f"🔥 Step IDs in contexts: {step_ids_in_contexts}")
            self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts)
            logger.info(f"🔥 Stored! Available compiled plates: {list(self.plate_compiled_data.keys())}")

            logger.info(f"🔥 Successfully compiled {plate_path}")

        except Exception as e:
            logger.error(f"🔥 COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True)
            # Nothing is stored in plate_compiled_data for a failed plate.
            actual_plate['error'] = str(e)

        # Reflect the new orchestrator state in the UI and the editor.
        self._trigger_ui_refresh()
        self._update_button_states()
        status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
        self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)

    self._update_button_states()

    def _count_in_state(state) -> int:
        """Count selected plates whose orchestrator is in ``state``."""
        return sum(
            1 for p in selected_items
            if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == state
        )

    success_count = _count_in_state(OrchestratorState.COMPILED)
    error_count = _count_in_state(OrchestratorState.COMPILE_FAILED)

    if error_count == 0:
        logger.info(f"Successfully compiled {success_count} plates")
    else:
        logger.warning(f"Compiled {success_count} plates, {error_count} errors")

1533 

async def action_code_plate(self) -> None:
    """Generate Python code for the selected plates and open it in an editor.

    For every selected plate the pipeline steps stored in the pipeline editor
    (or an empty list when none are stored) are collected and rendered,
    together with ``self.app.global_config``, by
    ``generate_complete_orchestrator_code``. The resulting text is handed to
    ``TerminalLauncher.launch_editor_for_file``; when the user saves, the
    edited code is executed and the first of ``pipeline_data``,
    ``global_config`` or ``orchestrators`` found in its namespace is applied.

    Returns early with a status message when nothing is selected. Errors
    raised while generating the code or launching the editor are logged and
    reported through ``self.app.current_status`` instead of being raised.
    """
    logger.debug("Code button pressed - generating Python code for plates")

    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.current_status = "No plates selected for code generation"
        return

    try:
        plate_paths = [item['path'] for item in selected_items]

        # Resolve the pipeline editor once; plates it does not know about
        # (or all plates, when no editor is attached) get an empty pipeline.
        editor = getattr(self, 'pipeline_editor', None)
        stored_pipelines = editor.plate_pipelines if editor else {}
        pipeline_data = {
            plate_path: stored_pipelines[plate_path] if plate_path in stored_pipelines else []
            for plate_path in plate_paths
        }

        # Imported lazily, as in the rest of this widget.
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher
        from openhcs.debug.pickle_to_python import generate_complete_orchestrator_code

        # Generate just the orchestrator configuration (no execution wrapper).
        python_code = generate_complete_orchestrator_code(
            plate_paths=plate_paths,
            pipeline_data=pipeline_data,
            global_config=self.app.global_config,
            clean_mode=True,  # Default to clean mode - only show non-default values
        )

        def handle_edited_code(edited_code: str) -> None:
            """Execute the saved code and apply the assignments it defines."""
            logger.debug("Orchestrator code edited, processing changes...")
            try:
                # SECURITY: exec() runs the edited text with the full privileges
                # of this process. That is only acceptable because the text
                # comes from the local user's own editor session; never feed
                # this callback content from another source.
                namespace = {}
                exec(edited_code, namespace)

                # NOTE(review): only the first matching name is applied. Code
                # that defines both pipeline_data and global_config updates the
                # pipelines only - confirm this is intended.
                if 'pipeline_data' in namespace:
                    new_pipeline_data = namespace['pipeline_data']
                    if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
                        # Assign a fresh dict rather than mutating in place so
                        # the editor sees a new plate_pipelines value.
                        current_pipelines = dict(self.pipeline_editor.plate_pipelines)
                        current_pipelines.update(new_pipeline_data)
                        self.pipeline_editor.plate_pipelines = current_pipelines

                        # If the plate currently shown was edited, refresh its view too.
                        current_plate = self.pipeline_editor.current_plate
                        if current_plate and current_plate in new_pipeline_data:
                            self.pipeline_editor.pipeline_steps = new_pipeline_data[current_plate]

                    self.app.current_status = f"Pipeline data updated for {len(new_pipeline_data)} plates"

                elif 'global_config' in namespace:
                    new_global_config = namespace['global_config']

                    # The event loop keeps only weak references to tasks, so a
                    # fire-and-forget task can be garbage-collected before it
                    # finishes. Hold a strong reference on the widget until done.
                    pending_tasks = getattr(self, '_config_update_tasks', None)
                    if pending_tasks is None:
                        pending_tasks = self._config_update_tasks = set()

                    for plate_path in plate_paths:
                        if plate_path in self.orchestrators:
                            orchestrator = self.orchestrators[plate_path]
                            task = asyncio.create_task(
                                orchestrator.apply_new_global_config(new_global_config)
                            )
                            pending_tasks.add(task)
                            task.add_done_callback(pending_tasks.discard)
                    self.app.current_status = f"Global config updated for {len(plate_paths)} plates"

                elif 'orchestrators' in namespace:
                    # Only the count is reported; the list itself is not applied here.
                    new_orchestrators = namespace['orchestrators']
                    self.app.current_status = f"Orchestrator list updated with {len(new_orchestrators)} orchestrators"

                else:
                    self.app.show_error("Parse Error", "No valid assignments found in edited code")

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                full_traceback = traceback.format_exc()
                logger.error(f"Failed to parse edited orchestrator code: {e}\nFull traceback:\n{full_traceback}")
                self.app.show_error("Edit Error", f"Failed to parse orchestrator code: {str(e)}\n\nFull traceback:\n{full_traceback}")

        # Launch terminal editor
        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code,
        )

    except Exception as e:
        # Top-level UI boundary: report the failure and keep the traceback in the log.
        logger.error(f"Failed to generate plate code: {e}", exc_info=True)
        self.app.current_status = f"Failed to generate code: {e}"

1645 

async def action_save_python_script(self) -> None:
    """Save an executable Python script for the selected plates.

    Collects the pipeline steps stored in the pipeline editor for each
    selected plate (an empty list when none are stored), renders them with
    ``self.app.global_config`` through ``_generate_executable_script`` and
    opens a file browser in save mode. The script is written to the path the
    browser reports back.

    Returns early with a status message when nothing is selected. Errors are
    logged and reported through ``self.app.current_status`` instead of being
    raised.
    """
    logger.debug("Save button pressed - saving Python script for plates")

    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.current_status = "No plates selected for script generation"
        return

    try:
        plate_paths = [item['path'] for item in selected_items]

        # Resolve the pipeline editor once; plates it does not know about
        # (or all plates, when no editor is attached) get an empty pipeline.
        editor = getattr(self, 'pipeline_editor', None)
        stored_pipelines = editor.plate_pipelines if editor else {}
        pipeline_data = {
            plate_path: stored_pipelines[plate_path] if plate_path in stored_pipelines else []
            for plate_path in plate_paths
        }

        # Same dictionary layout that _generate_executable_script pickles.
        python_code = self._generate_executable_script({
            'plate_paths': plate_paths,
            'pipeline_data': pipeline_data,
            'global_config': self.app.global_config,
        })

        # Imported lazily, as in the rest of this widget.
        from openhcs.textual_tui.windows.file_browser_window import open_file_browser_window, BrowserMode
        from openhcs.textual_tui.services.file_browser_service import SelectionMode
        from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
        from openhcs.constants.constants import Backend

        def handle_save_result(result) -> None:
            """Write the generated script to the path chosen in the browser."""
            if not result:
                return

            # The browser may report a single Path or a list of paths.
            if isinstance(result, Path):
                save_path = result
            elif isinstance(result, list):
                save_path = result[0]  # Non-empty here; take the first path
            else:
                save_path = None

            if not save_path:
                return

            try:
                # Python source is UTF-8 by default, so write UTF-8 explicitly
                # instead of relying on the platform's locale encoding.
                with open(save_path, 'w', encoding='utf-8') as f:
                    f.write(python_code)

                logger.info(f"Python script saved to: {save_path}")
                self.app.current_status = f"Python script saved to: {save_path}"
            except Exception as e:
                logger.error(f"Failed to save Python script: {e}", exc_info=True)
                self.app.current_status = f"Failed to save script: {e}"

        # Default filename is derived from the first selected plate.
        first_plate_name = Path(plate_paths[0]).name if plate_paths else "pipeline"
        default_filename = f"{first_plate_name}_pipeline.py"

        await open_file_browser_window(
            app=self.app,
            file_manager=self.app.filemanager,
            initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
            backend=Backend.DISK,
            title="Save Python Pipeline Script",
            mode=BrowserMode.SAVE,
            selection_mode=SelectionMode.FILES_ONLY,
            filter_extensions=['.py'],
            default_filename=default_filename,
            cache_key=PathCacheKey.PIPELINE_FILES,
            on_result_callback=handle_save_result,
            caller_id="plate_manager_save_script",
        )

    except Exception as e:
        # Top-level UI boundary: report the failure and keep the traceback in the log.
        logger.error(f"Failed to save Python script: {e}", exc_info=True)
        self.app.current_status = f"Failed to save script: {e}"

1730 

def _generate_executable_script(self, data: Dict) -> str:
    """Render ``data`` as an executable Python script and return its text.

    ``data`` is serialized with ``dill`` into a temporary pickle file, which
    ``convert_pickle_to_python`` turns into a temporary ``.py`` file. The
    content of that file is returned. Both temporary files are removed
    afterwards, whether or not the conversion succeeded.

    Args:
        data: Dictionary to pickle. The caller in this file passes the keys
            ``plate_paths``, ``pipeline_data`` and ``global_config``.

    Returns:
        The generated script source.

    Raises:
        Any exception raised while pickling, converting or reading the
        result propagates to the caller; only cleanup failures are suppressed.
    """
    import os
    import tempfile
    import dill as pickle
    from openhcs.debug.pickle_to_python import convert_pickle_to_python

    # Every temp file is registered as soon as it exists so the finally block
    # removes it even when a later step (dump, second file, conversion) fails.
    temp_paths = []
    try:
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as temp_pickle:
            temp_pickle_path = temp_pickle.name
            temp_paths.append(temp_pickle_path)
            pickle.dump(data, temp_pickle)

        # Only the name is needed; the converter writes the file itself.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_output:
            temp_output_path = temp_output.name
            temp_paths.append(temp_output_path)

        convert_pickle_to_python(temp_pickle_path, temp_output_path)

        # Read with the default encoding, presumably matching how the
        # converter wrote the file - TODO confirm.
        with open(temp_output_path, 'r') as f:
            return f.read()

    finally:
        # Best-effort cleanup: each file is removed independently so one
        # failure does not leave the other behind.
        for temp_path in temp_paths:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.debug(f"Could not remove temporary file {temp_path}: {cleanup_error}")

1764 

1765