Coverage for openhcs/pyqt_gui/widgets/plate_manager.py: 0.0%

574 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Plate Manager Widget for PyQt6 

3 

4Manages plate selection, initialization, and execution with full feature parity 

5to the Textual TUI version. Uses hybrid approach: extracted business logic + clean PyQt6 UI. 

6""" 

7 

8import logging 

9import asyncio 

10import inspect 

11import copy 

12import sys 

13import subprocess 

14import tempfile 

15from typing import List, Dict, Optional, Callable 

16from pathlib import Path 

17 

18from PyQt6.QtWidgets import ( 

19 QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QListWidget, 

20 QListWidgetItem, QLabel, QMessageBox, QFileDialog, QProgressBar, 

21 QCheckBox, QFrame, QSplitter 

22) 

23from PyQt6.QtCore import Qt, pyqtSignal, QTimer, QThread 

24from PyQt6.QtGui import QFont 

25 

26from openhcs.core.config import GlobalPipelineConfig, PipelineConfig 

27from openhcs.io.filemanager import FileManager 

28from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator, OrchestratorState 

29from openhcs.core.pipeline import Pipeline 

30from openhcs.constants.constants import VariableComponents, GroupBy 

31from openhcs.pyqt_gui.widgets.mixins import ( 

32 preserve_selection_during_update, 

33 handle_selection_change_with_prevention 

34) 

35from openhcs.pyqt_gui.shared.style_generator import StyleSheetGenerator 

36from openhcs.pyqt_gui.shared.color_scheme import PyQt6ColorScheme 

37 

38logger = logging.getLogger(__name__) 

39 

40 

41class PlateManagerWidget(QWidget): 

42 """ 

43 PyQt6 Plate Manager Widget. 

44  

45 Manages plate selection, initialization, compilation, and execution. 

46 Preserves all business logic from Textual version with clean PyQt6 UI. 

47 """ 

48 

49 # Signals 

50 plate_selected = pyqtSignal(str) # plate_path 

51 status_message = pyqtSignal(str) # status message 

52 orchestrator_state_changed = pyqtSignal(str, str) # plate_path, state 

53 

54 # Log viewer integration signals 

55 subprocess_log_started = pyqtSignal(str) # base_log_path 

56 subprocess_log_stopped = pyqtSignal() 

57 clear_subprocess_logs = pyqtSignal() 

58 

59 # Progress update signals (thread-safe UI updates) 

60 progress_started = pyqtSignal(int) # max_value 

61 progress_updated = pyqtSignal(int) # current_value 

62 progress_finished = pyqtSignal() 

63 

64 # Error handling signals (thread-safe error reporting) 

65 compilation_error = pyqtSignal(str, str) # plate_name, error_message 

66 initialization_error = pyqtSignal(str, str) # plate_name, error_message 

67 

def __init__(self, file_manager: FileManager, service_adapter,
             color_scheme: Optional[PyQt6ColorScheme] = None, parent=None):
    """
    Build the plate manager widget and its initial state.

    Args:
        file_manager: FileManager instance used for file operations
        service_adapter: PyQt service adapter; supplies the global config,
            the fallback color scheme, dialogs and async execution
        color_scheme: Color scheme for styling; when falsy the service
            adapter's current color scheme is used instead
        parent: Parent widget
    """
    super().__init__(parent)

    # Collaborators supplied by the caller
    self.file_manager = file_manager
    self.service_adapter = service_adapter
    self.global_config = service_adapter.get_global_config()
    self.pipeline_editor = None  # Will be set by main window

    # Styling: an explicit scheme wins, otherwise ask the service adapter
    if color_scheme:
        self.color_scheme = color_scheme
    else:
        self.color_scheme = service_adapter.get_current_color_scheme()
    self.style_generator = StyleSheetGenerator(self.color_scheme)

    # Plate bookkeeping (business logic state carried over from the Textual version)
    self.plates: List[Dict] = []  # one dict per plate with 'name' and 'path'
    self.selected_plate_path: str = ""
    self.orchestrators: Dict[str, PipelineOrchestrator] = {}  # keyed by plate path
    self.plate_configs: Dict[str, Dict] = {}
    self.plate_compiled_data: Dict[str, tuple] = {}  # compiled pipeline data per plate path

    # Execution / subprocess log tracking
    self.current_process = None
    self.execution_state = "idle"
    self.log_file_path: Optional[str] = None
    self.log_file_position: int = 0

    # Widget handles, populated by setup_ui()
    self.plate_list: Optional[QListWidget] = None
    self.buttons: Dict[str, QPushButton] = {}
    self.status_label: Optional[QLabel] = None
    self.progress_bar: Optional[QProgressBar] = None

    # Build the UI, wire signals, then sync button enablement
    self.setup_ui()
    self.setup_connections()
    self.update_button_states()

    logger.debug("Plate manager widget initialized")

114 

115 # ========== UI Setup ========== 

116 

def setup_ui(self):
    """Assemble the widget: title, splitter (plate list + buttons), status area."""
    root_layout = QVBoxLayout(self)
    root_layout.setContentsMargins(5, 5, 5, 5)
    root_layout.setSpacing(5)

    # Heading rendered in the scheme's accent color
    accent_hex = self.color_scheme.to_hex(self.color_scheme.text_accent)
    heading = QLabel("Plate Manager")
    heading.setFont(QFont("Arial", 12, QFont.Weight.Bold))
    heading.setStyleSheet(f"color: {accent_hex}; padding: 5px;")
    root_layout.addWidget(heading)

    # Vertical splitter holding the plate list above the button panel
    content_splitter = QSplitter(Qt.Orientation.Vertical)
    root_layout.addWidget(content_splitter)

    # Plate list with extended (multi-row) selection
    self.plate_list = QListWidget()
    self.plate_list.setSelectionMode(QListWidget.SelectionMode.ExtendedSelection)
    # The generated stylesheet is applied to this whole widget
    self.setStyleSheet(self.style_generator.generate_plate_manager_style())
    content_splitter.addWidget(self.plate_list)

    # Action buttons
    content_splitter.addWidget(self.create_button_panel())

    # Status label + progress bar sit below the splitter
    root_layout.addWidget(self.create_status_section())

    # Initial sizes for the list and the button panel
    content_splitter.setSizes([300, 150])

150 

def create_button_panel(self) -> QWidget:
    """
    Build the framed panel holding every plate management button.

    Buttons are laid out four per row; each one is registered in
    ``self.buttons`` under its action identifier and routed to
    ``handle_button_action``.

    Returns:
        Widget containing action buttons
    """
    panel_frame = QFrame()
    panel_frame.setFrameStyle(QFrame.Shape.Box)
    # Frame styling is handled by the main widget stylesheet

    panel_layout = QVBoxLayout(panel_frame)

    # (label, action identifier, tooltip) - extracted from Textual version
    button_specs = [
        ("Add", "add_plate", "Add new plate directory"),
        ("Del", "del_plate", "Delete selected plates"),
        ("Edit", "edit_config", "Edit plate configuration"),
        ("Init", "init_plate", "Initialize selected plates"),
        ("Compile", "compile_plate", "Compile plate pipelines"),
        ("Run", "run_plate", "Run/Stop plate execution"),
        ("Code", "code_plate", "Generate Python code"),
        ("Save", "save_python_script", "Save Python script"),
    ]
    buttons_per_row = 4

    for row_start in range(0, len(button_specs), buttons_per_row):
        row_specs = button_specs[row_start:row_start + buttons_per_row]
        row = QHBoxLayout()

        for label, action_id, hint in row_specs:
            btn = QPushButton(label)
            btn.setToolTip(hint)
            btn.setMinimumHeight(30)
            # Button styling is handled by the main widget stylesheet

            # Bind the action id as a default so each button keeps its own value
            btn.clicked.connect(lambda checked, a=action_id: self.handle_button_action(a))

            self.buttons[action_id] = btn
            row.addWidget(btn)

        # A short final row gets one stretch per missing button
        for _ in range(buttons_per_row - len(row_specs)):
            row.addStretch()

        panel_layout.addLayout(row)

    return panel_frame

200 

def create_status_section(self) -> QWidget:
    """
    Build the framed status area: a status label above a hidden progress bar.

    Stores the created widgets on ``self.status_label`` and
    ``self.progress_bar``.

    Returns:
        Widget containing status information
    """
    status_frame = QFrame()
    status_frame.setFrameStyle(QFrame.Shape.Box)
    # Frame styling is handled by the main widget stylesheet

    status_layout = QVBoxLayout(status_frame)

    # Status text, initially "Ready" in the scheme's success color
    success_hex = self.color_scheme.to_hex(self.color_scheme.status_success)
    label = QLabel("Ready")
    label.setStyleSheet(f"color: {success_hex}; font-weight: bold;")
    self.status_label = label
    status_layout.addWidget(label)

    # Progress bar starts hidden
    # Progress bar styling is handled by the main widget stylesheet
    bar = QProgressBar()
    bar.setVisible(False)
    self.progress_bar = bar
    status_layout.addWidget(bar)

    return status_frame

226 

def setup_connections(self):
    """Connect list-widget signals and this widget's own signals to their slots."""
    wiring = (
        # Plate list selection
        (self.plate_list.itemSelectionChanged, self.on_selection_changed),
        (self.plate_list.itemDoubleClicked, self.on_item_double_clicked),
        # Internal signals
        (self.status_message, self.update_status),
        (self.orchestrator_state_changed, self.on_orchestrator_state_changed),
        # Progress signals
        (self.progress_started, self._on_progress_started),
        (self.progress_updated, self._on_progress_updated),
        (self.progress_finished, self._on_progress_finished),
        # Error reporting signals
        (self.compilation_error, self._handle_compilation_error),
        (self.initialization_error, self._handle_initialization_error),
    )
    for signal, slot in wiring:
        signal.connect(slot)

245 

def handle_button_action(self, action: str):
    """
    Dispatch a button's action identifier to the matching handler.

    Coroutine-function handlers are handed to ``run_async_action``; plain
    handlers are called directly. ``"run_plate"`` toggles between stopping
    and starting execution depending on ``is_any_plate_running()``.
    Unrecognised identifiers are logged as a warning.

    Args:
        action: Action identifier
    """
    # Action mapping (preserved from Textual version)
    handlers = {
        "add_plate": self.action_add_plate,
        "del_plate": self.action_delete_plate,
        "edit_config": self.action_edit_config,
        "init_plate": self.action_init_plate,
        "compile_plate": self.action_compile_plate,
        "code_plate": self.action_code_plate,
        "save_python_script": self.action_save_python_script,
    }

    if action in handlers:
        handler = handlers[action]
        if not inspect.iscoroutinefunction(handler):
            handler()
            return
        self.run_async_action(handler)
        return

    if action != "run_plate":
        logger.warning(f"Unknown action: {action}")
        return

    # The Run button doubles as Stop while something is executing
    if self.is_any_plate_running():
        toggle = self.action_stop_execution
    else:
        toggle = self.action_run_plate
    self.run_async_action(toggle)

279 

def run_async_action(self, async_func: Callable):
    """
    Hand an async function to the service adapter for execution.

    Args:
        async_func: Async function to execute
    """
    adapter = self.service_adapter
    adapter.execute_async_operation(async_func)

288 

289 # ========== Business Logic Methods (Extracted from Textual) ========== 

290 

def action_add_plate(self):
    """Handle the Add button: ask for a plate directory and register it."""
    from openhcs.core.path_cache import PathCacheKey

    # Use cached directory dialog (mirrors Textual TUI pattern)
    chosen_dir = self.service_adapter.show_cached_directory_dialog(
        cache_key=PathCacheKey.PLATE_IMPORT,
        title="Select Plate Directory",
        fallback_path=Path.home(),
    )

    # A falsy result (e.g. nothing chosen) adds nothing
    if not chosen_dir:
        return
    self.add_plate_callback([chosen_dir])

304 

def add_plate_callback(self, selected_paths: List[Path]):
    """
    Register the chosen plate directories (extracted from Textual version).

    Each new directory is appended to ``self.plates`` as a dict with
    ``'name'`` (last path component) and ``'path'`` (string form). Paths
    already present, or repeated within ``selected_paths``, are skipped.
    A status message summarises the outcome.

    Args:
        selected_paths: Selected directory paths. Entries may be ``Path``
            objects or plain strings; each one is coerced to ``Path`` so a
            string entry does not fail on ``.name``.
    """
    if not selected_paths:
        self.status_message.emit("Plate selection cancelled")
        return

    # Set of known paths: constant-time duplicate checks instead of a scan per entry
    known_paths = {plate['path'] for plate in self.plates}
    added_plates = []

    for selected_path in selected_paths:
        selected_path = Path(selected_path)
        plate_path = str(selected_path)

        # Skip plates that already exist (including repeats in this batch)
        if plate_path in known_paths:
            continue
        known_paths.add(plate_path)

        plate_name = selected_path.name
        self.plates.append({
            'name': plate_name,
            'path': plate_path,
        })
        added_plates.append(plate_name)

    if added_plates:
        self.update_plate_list()
        self.status_message.emit(f"Added {len(added_plates)} plate(s): {', '.join(added_plates)}")
    else:
        self.status_message.emit("No new plates added (duplicates skipped)")

339 

def action_delete_plate(self):
    """
    Handle the Del button: remove the selected plates and their cached state.

    Besides dropping the plate entries, this discards each deleted plate's
    orchestrator and its entry in ``plate_compiled_data``. Without the
    latter, re-adding the same directory would make ``action_run_plate``
    treat the plate as already compiled. If the currently selected plate is
    deleted, the selection is cleared and ``plate_selected`` is emitted
    with an empty path.
    """
    selected_items = self.get_selected_plates()
    if not selected_items:
        self.service_adapter.show_error_dialog("No plate selected to delete.")
        return

    paths_to_delete = {p['path'] for p in selected_items}
    self.plates = [p for p in self.plates if p['path'] not in paths_to_delete]

    # Drop per-plate state keyed by path so nothing stale survives the plate
    for path in paths_to_delete:
        self.orchestrators.pop(path, None)
        self.plate_compiled_data.pop(path, None)

    if self.selected_plate_path in paths_to_delete:
        self.selected_plate_path = ""
        # Notify pipeline editor that no plate is selected (mirrors Textual TUI)
        self.plate_selected.emit("")

    self.update_plate_list()
    self.status_message.emit(f"Deleted {len(paths_to_delete)} plate(s)")

362 

async def action_init_plate(self):
    """
    Handle the Init button: create and initialize an orchestrator per selected plate.

    Each orchestrator is built in the event loop's default executor so the
    heavy initialization does not run inline in this coroutine. Results,
    progress and errors are reported through signals. Progress advances
    once per plate whether it succeeded or failed (matching
    ``_compile_plates_worker``), and the final status message counts only
    the plates that actually initialized.
    """
    selected_items = self.get_selected_plates()

    if not selected_items:
        self.service_adapter.show_error_dialog("No plates selected for initialization.")
        return

    # Progress is reported via signals rather than touching widgets directly
    self.progress_started.emit(len(selected_items))

    loop = asyncio.get_event_loop()
    initialized_count = 0

    for i, plate in enumerate(selected_items):
        plate_path = plate['path']

        try:
            # Initialize orchestrator (heavy operation); path bound as a default
            # so the closure does not depend on the loop variable
            def init_orchestrator(path=plate_path):
                return PipelineOrchestrator(
                    plate_path=path,
                    global_config=self.global_config,
                    storage_registry=self.file_manager.registry
                ).initialize()

            orchestrator = await loop.run_in_executor(None, init_orchestrator)

            # Store orchestrator and announce its state
            self.orchestrators[plate_path] = orchestrator
            self.orchestrator_state_changed.emit(plate_path, "READY")

            # Auto-select this plate if no plate is currently selected
            if not self.selected_plate_path:
                self.selected_plate_path = plate_path
                self.plate_selected.emit(plate_path)

            initialized_count += 1

        except Exception as e:
            logger.error(f"Failed to initialize plate {plate['name']}: {e}")
            self.initialization_error.emit(plate['name'], str(e))

        # Advance progress for every processed plate, including failures
        self.progress_updated.emit(i + 1)

    self.progress_finished.emit()
    self.status_message.emit(f"Initialized {initialized_count} plate(s)")

413 

414 # Additional action methods would be implemented here following the same pattern... 

415 # (compile_plate, run_plate, code_plate, save_python_script, edit_config) 

416 

def action_edit_config(self):
    """
    Handle the Edit button: edit a PipelineConfig for the selected orchestrators.

    The editing config is seeded from the first selected orchestrator's
    ``pipeline_config`` when it has one, otherwise created fresh from
    ``self.global_config``. Saving applies the new config to every selected
    orchestrator that has been initialized; the global configuration is
    not modified here.
    """
    plates = self.get_selected_plates()
    if not plates:
        self.service_adapter.show_error_dialog("No plates selected for configuration.")
        return

    # Only plates that already have an orchestrator can be configured
    targets = []
    for plate in plates:
        path = plate['path']
        if path in self.orchestrators:
            targets.append(self.orchestrators[path])

    if not targets:
        self.service_adapter.show_error_dialog("No initialized orchestrators selected.")
        return

    # The first orchestrator stands in for the whole selection
    reference = targets[0]

    if not reference.pipeline_config:
        # No existing config: start from placeholders based on the current global config
        from openhcs.core.config import create_pipeline_config_for_editing
        editing_config = create_pipeline_config_for_editing(self.global_config)
    else:
        # Existing config: keep it as the base, paired with the widget's current
        # global config (not the orchestrator's own)
        from openhcs.core.config import create_editing_config_from_existing_lazy_config
        editing_config = create_editing_config_from_existing_lazy_config(
            reference.pipeline_config,
            self.global_config,
        )

    def handle_config_save(new_config: PipelineConfig) -> None:
        """Apply the saved config to each selected orchestrator, then confirm."""
        for target in targets:
            # Direct synchronous call - no async needed
            target.apply_pipeline_config(new_config)
        self.service_adapter.show_info_dialog(
            f"Per-orchestrator configuration applied to {len(targets)} orchestrator(s)"
        )

    # Open configuration window using PipelineConfig (not GlobalPipelineConfig)
    self._open_config_window(
        config_class=PipelineConfig,
        current_config=editing_config,
        on_save_callback=handle_config_save,
    )

471 

def _open_config_window(self, config_class, current_config, on_save_callback, is_global_config_editing=False):
    """
    Create a ConfigWindow for the given config and bring it to the front.

    Args:
        config_class: Configuration class type (PipelineConfig or GlobalPipelineConfig)
        current_config: Current configuration instance
        on_save_callback: Function to call when config is saved
        is_global_config_editing: Whether this is global config editing; passed
            through to ConfigWindow unchanged
    """
    from openhcs.pyqt_gui.windows.config_window import ConfigWindow

    # Arguments are positional: config class, config instance, save callback,
    # color scheme, parent widget, global-editing flag
    window = ConfigWindow(
        config_class,
        current_config,
        on_save_callback,
        self.color_scheme,
        self,
        is_global_config_editing,
    )

    # Shown with show() rather than exec(), then raised and activated
    window.show()
    window.raise_()
    window.activateWindow()

496 

def action_edit_global_config(self):
    """
    Handle global configuration editing - affects all orchestrators.

    Opens a ConfigWindow on a concrete GlobalPipelineConfig. On save, the
    new config is pushed to the service adapter, set as the current global
    config, cached to disk, stored on this widget, and applied to every
    known orchestrator.
    """
    from openhcs.core.config import get_default_global_config, GlobalPipelineConfig

    # Get current global config from service adapter or use default
    current_global_config = self.service_adapter.get_global_config() or get_default_global_config()

    def handle_global_config_save(new_config: GlobalPipelineConfig) -> None:
        """Apply global configuration to all orchestrators and save to cache."""
        self.service_adapter.set_global_config(new_config)  # Update app-level config

        # Keep this widget's copy current: init, compile, run and per-plate
        # config editing all read self.global_config
        self.global_config = new_config

        # Register the new config as the current global config
        from openhcs.core.config import set_current_global_config
        set_current_global_config(GlobalPipelineConfig, new_config)

        # Save to cache for persistence between sessions
        self._save_global_config_to_cache(new_config)

        for orchestrator in self.orchestrators.values():
            pending = orchestrator.apply_new_global_config(new_config)
            # NOTE(review): whether apply_new_global_config is async is not
            # visible here. run_async_action expects a function, not a call
            # result, so an awaitable result is wrapped in a coroutine
            # function; a plain (already applied) result needs nothing more.
            if inspect.isawaitable(pending):
                async def _await_pending(awaitable=pending):
                    await awaitable

                self.run_async_action(_await_pending)

        self.service_adapter.show_info_dialog("Global configuration applied to all orchestrators")

    # Open configuration window using concrete GlobalPipelineConfig
    self._open_config_window(
        config_class=GlobalPipelineConfig,
        current_config=current_global_config,
        on_save_callback=handle_global_config_save,
        is_global_config_editing=True
    )

530 

def _save_global_config_to_cache(self, config: GlobalPipelineConfig):
    """
    Write the global config to the on-disk cache; failures are only logged.

    Any exception is caught and logged, so a failed save never propagates
    to the caller and no dialog is shown.
    """
    try:
        # Use synchronous saving to ensure it completes
        from openhcs.core.config_cache import _sync_save_config
        from openhcs.core.xdg_paths import get_config_file_path

        target_file = get_config_file_path("global_config.config")
        saved = _sync_save_config(config, target_file)

        if not saved:
            logger.error("Failed to save global config to cache - sync save returned False")
        else:
            logger.info("Global config saved to cache for session persistence")
    except Exception as e:
        # Don't show error dialog as this is not critical for immediate functionality
        logger.error(f"Failed to save global config to cache: {e}")

548 

async def action_compile_plate(self):
    """
    Handle the Compile button: validate the selection, then compile it.

    Compilation is refused (with an error dialog) when any selected plate
    has no orchestrator, has an orchestrator outside the compilable states,
    or has no pipeline definition. Otherwise the selection is handed to
    ``_compile_plates_worker``.
    """
    selected_items = self.get_selected_plates()

    if not selected_items:
        logger.warning("No plates available for compilation")
        return

    # READY, COMPILE_FAILED, EXEC_FAILED, COMPILED and COMPLETED may be (re)compiled
    compilable_states = (
        OrchestratorState.READY,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
    )

    def _blocked(plate) -> bool:
        """True when the plate has no orchestrator or it is in a non-compilable state."""
        orch = self.orchestrators.get(plate['path'])
        return orch is None or orch.state not in compilable_states

    not_ready = [plate for plate in selected_items if _blocked(plate)]

    if not_ready:
        joined_names = ', '.join([plate['name'] for plate in not_ready])
        # Pick the message that best matches why the plates were rejected
        if any(self.orchestrators.get(plate['path']) is None for plate in not_ready):
            error_msg = f"Cannot compile plates that haven't been initialized: {joined_names}"
        elif any(self.orchestrators.get(plate['path']).state == OrchestratorState.EXECUTING for plate in not_ready):
            error_msg = f"Cannot compile plates that are currently executing: {joined_names}"
        else:
            error_msg = f"Cannot compile plates in current state: {joined_names}"

        logger.warning(error_msg)
        self.service_adapter.show_error_dialog(error_msg)
        return

    # Every selected plate must have a pipeline definition
    no_pipeline = [
        plate for plate in selected_items
        if not self._get_current_pipeline_definition(plate['path'])
    ]

    if no_pipeline:
        joined_names = ', '.join([plate['name'] for plate in no_pipeline])
        error_msg = f"Cannot compile plates without pipelines: {joined_names}"
        self.status_message.emit(error_msg)
        self.service_adapter.show_error_dialog(error_msg)
        return

    # Start async compilation
    await self._compile_plates_worker(selected_items)

600 

601 async def _compile_plates_worker(self, selected_items: List[Dict]) -> None: 

602 """Background worker for plate compilation.""" 

603 # Use signals for thread-safe UI updates 

604 self.progress_started.emit(len(selected_items)) 

605 

606 for i, plate_data in enumerate(selected_items): 

607 plate_path = plate_data['path'] 

608 

609 # Get definition pipeline and make fresh copy 

610 definition_pipeline = self._get_current_pipeline_definition(plate_path) 

611 if not definition_pipeline: 

612 logger.warning(f"No pipeline defined for {plate_data['name']}, using empty pipeline") 

613 definition_pipeline = [] 

614 

615 try: 

616 # Get or create orchestrator for compilation (run in executor to avoid blocking) 

617 def get_or_create_orchestrator(): 

618 if plate_path in self.orchestrators: 

619 orchestrator = self.orchestrators[plate_path] 

620 if not orchestrator.is_initialized(): 

621 orchestrator.initialize() 

622 return orchestrator 

623 else: 

624 return PipelineOrchestrator( 

625 plate_path=plate_path, 

626 global_config=self.global_config, 

627 storage_registry=self.file_manager.registry 

628 ).initialize() 

629 

630 # Run in executor (works in Qt thread) 

631 import asyncio 

632 loop = asyncio.get_event_loop() 

633 orchestrator = await loop.run_in_executor(None, get_or_create_orchestrator) 

634 self.orchestrators[plate_path] = orchestrator 

635 

636 # Make fresh copy for compilation 

637 execution_pipeline = copy.deepcopy(definition_pipeline) 

638 

639 # Fix step IDs after deep copy to match new object IDs 

640 for step in execution_pipeline: 

641 step.step_id = str(id(step)) 

642 # Ensure variable_components is never None - use FunctionStep default 

643 if step.variable_components is None: 

644 logger.warning(f"Step '{step.name}' has None variable_components, setting FunctionStep default") 

645 step.variable_components = [VariableComponents.SITE] 

646 # Also ensure it's not an empty list 

647 elif not step.variable_components: 

648 logger.warning(f"Step '{step.name}' has empty variable_components, setting FunctionStep default") 

649 step.variable_components = [VariableComponents.SITE] 

650 

651 # Get wells and compile (async - run in executor to avoid blocking UI) 

652 # Wrap in Pipeline object like test_main.py does 

653 pipeline_obj = Pipeline(steps=execution_pipeline) 

654 

655 # Run heavy operations in executor to avoid blocking UI (works in Qt thread) 

656 import asyncio 

657 loop = asyncio.get_event_loop() 

658 wells = await loop.run_in_executor(None, lambda: orchestrator.get_component_keys(GroupBy.WELL)) 

659 compiled_contexts = await loop.run_in_executor( 

660 None, orchestrator.compile_pipelines, pipeline_obj.steps, wells 

661 ) 

662 

663 # Store compiled data 

664 self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts) 

665 logger.info(f"Successfully compiled {plate_path}") 

666 

667 # Update orchestrator state change signal 

668 self.orchestrator_state_changed.emit(plate_path, "COMPILED") 

669 

670 except Exception as e: 

671 logger.error(f"COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True) 

672 plate_data['error'] = str(e) 

673 # Don't store anything in plate_compiled_data on failure 

674 self.orchestrator_state_changed.emit(plate_path, "COMPILE_FAILED") 

675 # Use signal for thread-safe error reporting instead of direct dialog call 

676 self.compilation_error.emit(plate_data['name'], str(e)) 

677 

678 # Use signal for thread-safe progress update 

679 self.progress_updated.emit(i + 1) 

680 

681 # Use signal for thread-safe progress completion 

682 self.progress_finished.emit() 

683 self.status_message.emit(f"Compilation completed for {len(selected_items)} plate(s)") 

684 self.update_button_states() 

685 

async def action_run_plate(self):
    """
    Handle the Run button: execute the compiled plates in a subprocess.

    Only selected plates with an entry in ``self.plate_compiled_data`` are
    run. Their pipeline definitions and the global config are pickled
    (with ``dill``) to a temporary file, and ``subprocess_runner.py`` is
    launched with that file, a log file base and a unique id. Any failure
    is logged, shown in an error dialog, and resets ``execution_state`` to
    ``"idle"``; if the subprocess was never launched, the temporary data
    file is removed as well.
    """
    selected_items = self.get_selected_plates()
    if not selected_items:
        self.service_adapter.show_error_dialog("No plates selected to run.")
        return

    ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data]
    if not ready_items:
        self.service_adapter.show_error_dialog("Selected plates are not compiled. Please compile first.")
        return

    import os

    def _remove_quietly(path: str) -> None:
        """Best-effort removal of a temporary file."""
        try:
            os.unlink(path)
        except OSError:
            pass

    data_file_path = None  # temp pickle handed to the subprocess
    process_started = False

    try:
        # Use subprocess approach like Textual TUI
        logger.debug("Using subprocess approach for clean isolation")

        plate_paths_to_run = [item['path'] for item in ready_items]

        # Pass definition pipeline steps - subprocess will make fresh copy and compile
        pipeline_data = {
            plate_path: self._get_current_pipeline_definition(plate_path)
            for plate_path in plate_paths_to_run
        }

        logger.info(f"Starting subprocess for {len(plate_paths_to_run)} plates")

        # Clear subprocess logs before starting new execution
        self.clear_subprocess_logs.emit()

        # Generate unique ID for this subprocess
        import time
        subprocess_timestamp = int(time.time())
        plate_names = [Path(path).name for path in plate_paths_to_run]
        unique_id = f"plates_{'_'.join(plate_names[:2])}_{subprocess_timestamp}"

        # Build subprocess log name using log utilities
        from openhcs.core.log_utils import get_current_log_file_path
        try:
            tui_log_path = get_current_log_file_path()
            if tui_log_path.endswith('.log'):
                tui_base = tui_log_path[:-4]  # Remove .log extension
            else:
                tui_base = tui_log_path
            log_file_base = f"{tui_base}_subprocess_{subprocess_timestamp}"
        except RuntimeError:
            # Fallback if no main log found
            log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)
            log_file_base = str(log_dir / f"pyqt_gui_subprocess_{subprocess_timestamp}")

        # Data handed to the subprocess
        subprocess_data = {
            'plate_paths': plate_paths_to_run,
            'pipeline_data': pipeline_data,
            'global_config': self.global_config
        }

        def _write_pickle_data() -> str:
            """Pickle subprocess_data into a new temp file and return its path."""
            import dill as pickle
            handle = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl')
            try:
                # Write through the one open handle; it is closed on exit
                with handle:
                    pickle.dump(subprocess_data, handle)
            except BaseException:
                # Do not leave a partial file behind when pickling fails
                _remove_quietly(handle.name)
                raise
            return handle.name

        loop = asyncio.get_event_loop()

        # Write pickle data in executor
        data_file_path = await loop.run_in_executor(None, _write_pickle_data)

        logger.debug(f"Created data file: {data_file_path}")

        subprocess_script = Path(__file__).parent.parent.parent / "textual_tui" / "subprocess_runner.py"

        # Generate actual log file path that subprocess will create
        actual_log_file_path = f"{log_file_base}_{unique_id}.log"
        logger.debug(f"Log file base: {log_file_base}")
        logger.debug(f"Unique ID: {unique_id}")
        logger.debug(f"Actual log file: {actual_log_file_path}")

        # Store log file path for monitoring
        self.log_file_path = actual_log_file_path
        self.log_file_position = 0

        logger.debug(f"Subprocess command: {sys.executable} {subprocess_script} {data_file_path} {log_file_base} {unique_id}")

        def _create_subprocess():
            """Launch the runner script; its stdout/stderr are discarded."""
            return subprocess.Popen(
                [
                    sys.executable, str(subprocess_script),
                    data_file_path, log_file_base, unique_id
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                text=True,
            )

        # Create subprocess in executor
        self.current_process = await loop.run_in_executor(None, _create_subprocess)
        process_started = True  # from here on the subprocess owns the data file

        logger.info(f"Subprocess started with PID: {self.current_process.pid}")

        # Emit signal for log viewer to start monitoring
        self.subprocess_log_started.emit(log_file_base)

        # Update orchestrator states to show running state
        for plate in ready_items:
            plate_path = plate['path']
            if plate_path in self.orchestrators:
                self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING

        self.execution_state = "running"
        self.status_message.emit(f"Running {len(ready_items)} plate(s) in subprocess...")
        self.update_button_states()

        # Start monitoring
        await self._start_monitoring()

    except Exception as e:
        logger.error(f"Failed to start plate execution: {e}", exc_info=True)
        # Nothing will consume the data file if the subprocess never started
        if data_file_path is not None and not process_started:
            _remove_quietly(data_file_path)
        self.service_adapter.show_error_dialog(f"Failed to start execution: {e}")
        self.execution_state = "idle"
        self.update_button_states()

814 

async def action_stop_execution(self):
    """
    Handle Stop Execution - terminate the running subprocess (matches TUI implementation).

    Sends SIGTERM to the process group whose ID equals the subprocess PID,
    waits one second, then sends SIGKILL to the same group. If signalling the
    group raises, only the parent process is terminated (and killed if it has
    not exited within 5 seconds).

    After either path the widget is reset to the idle state, orchestrators
    still marked EXECUTING are set back to COMPILED, the buttons are
    refreshed and ``subprocess_log_stopped`` is emitted.

    When no subprocess is running, only an info dialog is shown.
    """
    logger.info("🛑 Stop button pressed. Terminating subprocess.")
    self.status_message.emit("Terminating execution...")

    # Keep a local reference: this coroutine suspends below, and other code
    # may reset self.current_process to None in the meantime. The fallback
    # path must still have a process object to act on.
    process = self.current_process
    if not process or process.poll() is not None:
        self.service_adapter.show_info_dialog("No execution is currently running.")
        return

    import os
    import signal

    try:
        # Kill the entire process group, not just the parent process (matches TUI).
        # The group ID is expected to equal the PID because the subprocess
        # is said to call os.setpgrp() itself.
        process_group_id = process.pid
        logger.info(f"🛑 Killing process group for PID {process.pid}...")
        os.killpg(process_group_id, signal.SIGTERM)

        # Give processes time to exit gracefully
        await asyncio.sleep(1)

        # Unconditional SIGKILL for anything left in the group; a missing
        # group simply means everything already exited.
        try:
            os.killpg(process_group_id, signal.SIGKILL)
            logger.info(f"🛑 Force killed process group {process_group_id}")
        except ProcessLookupError:
            logger.info(f"🛑 Process group {process_group_id} already terminated")
    except Exception as e:
        logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill")
        # Fallback to killing just the main process (original behavior).
        # NOTE: these waits block the calling thread, as in the original code.
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    # Shared cleanup for both paths, so the fallback no longer leaves
    # orchestrators stuck in the EXECUTING state.
    self.execution_state = "idle"
    self.current_process = None

    for orchestrator in self.orchestrators.values():
        if orchestrator.state == OrchestratorState.EXECUTING:
            orchestrator._state = OrchestratorState.COMPILED

    self.status_message.emit("Execution terminated by user")
    self.update_button_states()

    # Emit signal for log viewer
    self.subprocess_log_stopped.emit()

878 

def action_code_plate(self):
    """
    Code Generation button handler (placeholder).

    Only shows an info dialog saying the feature is not implemented yet.
    """
    notice = "Code generation not yet implemented in PyQt6 version."
    self.service_adapter.show_info_dialog(notice)

882 

def action_save_python_script(self):
    """
    Save Python Script button handler (placeholder).

    Only shows an info dialog saying the feature is not implemented yet.
    """
    notice = "Script saving not yet implemented in PyQt6 version."
    self.service_adapter.show_info_dialog(notice)

886 

887 # ========== UI Helper Methods ========== 

888 

def update_plate_list(self):
    """
    Rebuild the plate list widget via the selection-preservation helper.

    Each plate in ``self.plates`` becomes one list item whose text is
    ``"<name> (<path>)"``, prefixed with a bracketed status label when the
    plate has an orchestrator in one of the known states. The plate dict is
    stored on the item under ``Qt.ItemDataRole.UserRole``. Button states are
    refreshed afterwards.
    """
    # (state, label) pairs, tested in order with == like an if/elif chain.
    state_labels = (
        (OrchestratorState.READY, "✓ Init"),
        (OrchestratorState.COMPILED, "✓ Compiled"),
        (OrchestratorState.EXECUTING, "🔄 Running"),
        (OrchestratorState.COMPLETED, "✅ Complete"),
        (OrchestratorState.COMPILE_FAILED, "❌ Compile Failed"),
        (OrchestratorState.EXEC_FAILED, "❌ Exec Failed"),
    )

    def describe(plate):
        """Build the display text for one plate, including its status label."""
        text = f"{plate['name']} ({plate['path']})"

        badges = []
        if plate['path'] in self.orchestrators:
            current = self.orchestrators[plate['path']].state
            for state, label in state_labels:
                if current == state:
                    badges.append(label)
                    break

        if badges:
            text = f"[{', '.join(badges)}] {text}"
        return text

    def rebuild():
        """Clear the widget and add one item per known plate."""
        self.plate_list.clear()

        for plate in self.plates:
            entry = QListWidgetItem(describe(plate))
            entry.setData(Qt.ItemDataRole.UserRole, plate)

            # Tooltip shows the raw orchestrator state value, if any
            if plate['path'] in self.orchestrators:
                owner = self.orchestrators[plate['path']]
                entry.setToolTip(f"Status: {owner.state.value}")

            self.plate_list.addItem(entry)

        # Auto-select first plate if no selection and plates exist
        if self.plates and not self.selected_plate_path:
            self.plate_list.setCurrentRow(0)

    def plate_key(item_data):
        """Return the plate path for dict payloads, else the payload as a string."""
        if isinstance(item_data, dict) and 'path' in item_data:
            return item_data['path']
        return str(item_data)

    def has_orchestrators():
        return bool(self.orchestrators)

    # Use utility to preserve selection during update
    preserve_selection_during_update(
        self.plate_list,
        plate_key,
        has_orchestrators,
        rebuild
    )
    self.update_button_states()

945 

def get_selected_plates(self) -> List[Dict]:
    """
    Collect the data attached to the currently selected list items.

    Returns:
        The ``UserRole`` payload of every selected item in ``self.plate_list``,
        skipping items whose payload is falsy.
    """
    role = Qt.ItemDataRole.UserRole
    payloads = (entry.data(role) for entry in self.plate_list.selectedItems())
    return [payload for payload in payloads if payload]

959 

def update_button_states(self):
    """
    Enable or disable the action buttons for the current selection.

    While an execution is running every button except "run_plate" is
    disabled, and "run_plate" is enabled and labelled "Stop". Otherwise
    "run_plate" is labelled "Run" and enabled only if a selected plate has
    compiled data.
    """
    chosen = self.get_selected_plates()
    any_chosen = len(chosen) > 0
    any_initialized = any(plate['path'] in self.orchestrators for plate in chosen)
    any_compiled = any(plate['path'] in self.plate_compiled_data for plate in chosen)
    running = self.is_any_plate_running()
    idle = not running

    # Buttons that only need a selection
    needs_selection = any_chosen and idle
    for key in ("del_plate", "init_plate"):
        self.buttons[key].setEnabled(needs_selection)

    # Buttons that need at least one initialized plate in the selection
    needs_initialized = any_initialized and idle
    for key in ("edit_config", "compile_plate", "code_plate", "save_python_script"):
        self.buttons[key].setEnabled(needs_initialized)

    # The run button doubles as the stop button while running
    run_button = self.buttons["run_plate"]
    run_button.setEnabled(True if running else any_compiled)
    run_button.setText("Stop" if running else "Run")

983 

def is_any_plate_running(self) -> bool:
    """
    Report whether an execution is in progress.

    Returns:
        True when ``self.execution_state`` equals "running", False otherwise
    """
    current_state = self.execution_state
    return current_state == "running"

992 

def update_status(self, message: str):
    """
    Show a message in the status label.

    Args:
        message: Text to set on ``self.status_label``
    """
    label = self.status_label
    label.setText(message)

1001 

def on_selection_changed(self):
    """
    React to a selection change in the plate list.

    Delegates to ``handle_selection_change_with_prevention`` with callbacks
    that record the first selected plate's path (and emit ``plate_selected``)
    or clear the stored path. Button states are refreshed afterwards.
    """
    def plate_key(item_data):
        """Return the plate path for dict payloads, else the payload as a string."""
        if isinstance(item_data, dict) and 'path' in item_data:
            return item_data['path']
        return str(item_data)

    def has_orchestrators():
        return bool(self.orchestrators)

    def current_path():
        return self.selected_plate_path

    def remember_selection(selected_plates):
        # Only the first selected plate becomes the "current" plate
        self.selected_plate_path = selected_plates[0]['path']
        self.plate_selected.emit(self.selected_plate_path)

    def forget_selection():
        self.selected_plate_path = ""

    # Use utility to handle selection with prevention
    handle_selection_change_with_prevention(
        self.plate_list,
        self.get_selected_plates,
        plate_key,
        has_orchestrators,
        current_path,
        remember_selection,
        forget_selection
    )

    self.update_button_states()

1023 

1024 

1025 

1026 

1027 

def on_item_double_clicked(self, item: QListWidgetItem):
    """
    Handle a double-click on a plate item.

    Starts the init-plate action when the clicked plate has no orchestrator
    yet; does nothing for items without data or already-initialized plates.
    """
    plate_data = item.data(Qt.ItemDataRole.UserRole)
    if not plate_data:
        return

    # Already initialized: nothing to trigger
    if plate_data['path'] in self.orchestrators:
        return

    # NOTE(review): the clicked plate is not passed along; presumably
    # action_init_plate works from the current selection - confirm.
    self.run_async_action(self.action_init_plate)

1035 

def on_orchestrator_state_changed(self, plate_path: str, state: str):
    """
    Refresh the plate list after an orchestrator changed state.

    Args:
        plate_path: Path of the plate whose orchestrator changed
        state: The new orchestrator state (only used for logging here)
    """
    # The whole list is rebuilt; the arguments only feed the debug log
    self.update_plate_list()
    logger.debug(f"Orchestrator state changed: {plate_path} -> {state}")

1046 

def on_config_changed(self, new_config: GlobalPipelineConfig):
    """
    Store a new global configuration and push it to every orchestrator.

    Args:
        new_config: New global configuration
    """
    self.global_config = new_config

    # Hand each orchestrator's apply_new_global_config(new_config) result to
    # run_async_action.
    # NOTE(review): other call sites pass an uncalled method to
    # run_async_action, while this passes the call's result - confirm that
    # run_async_action accepts both forms.
    existing = self.orchestrators.values()
    for orchestrator in existing:
        pending = orchestrator.apply_new_global_config(new_config)
        self.run_async_action(pending)

    logger.info(f"Applied new global config to {len(self.orchestrators)} orchestrators")

1062 

1063 # ========== Helper Methods ========== 

1064 

def _get_current_pipeline_definition(self, plate_path: str) -> List:
    """
    Look up the pipeline steps for a plate through the pipeline editor.

    Args:
        plate_path: Path to the plate; when falsy, the editor's
            ``current_plate`` attribute is used instead

    Returns:
        The steps reported by the editor, or an empty list when there is no
        editor, no target plate, no usable editor attribute, or the editor
        returns a falsy value
    """
    editor = self.pipeline_editor
    if not editor:
        logger.warning("No pipeline editor reference - using empty pipeline")
        return []

    # Explicit plate path wins; otherwise fall back to the editor's current plate
    target_plate = plate_path or getattr(editor, 'current_plate', None)
    if not target_plate:
        logger.warning("No plate specified - using empty pipeline")
        return []

    supports_per_plate = hasattr(editor, 'get_pipeline_for_plate')
    if not supports_per_plate and not hasattr(editor, 'pipeline_steps'):
        logger.warning("Pipeline editor doesn't have expected methods - using empty pipeline")
        return []

    if supports_per_plate:
        steps = editor.get_pipeline_for_plate(target_plate)
    else:
        # Fallback to current pipeline steps if get_pipeline_for_plate not available
        steps = getattr(editor, 'pipeline_steps', [])

    return steps or []

1096 

def set_pipeline_editor(self, pipeline_editor):
    """
    Remember the pipeline editor this widget should query.

    Args:
        pipeline_editor: Pipeline editor widget instance
    """
    # Stored as-is; no validation is performed here
    self.pipeline_editor = pipeline_editor
    logger.debug("Pipeline editor reference set in plate manager")

1106 

async def _start_monitoring(self):
    """
    Poll the current subprocess once per second until it exits.

    Returns immediately when there is no current process. When the process
    exits, the widget is reset to idle, every orchestrator still marked
    EXECUTING becomes COMPLETED (return code 0) or EXEC_FAILED (any other
    code), a status message is emitted, buttons are refreshed and
    ``subprocess_log_stopped`` is emitted.

    Monitoring also ends, without touching any state, as soon as
    ``self.current_process`` no longer refers to the process this call
    started with. Previously a cleared ``current_process`` made the loop
    spin forever and pick up any subprocess started later.
    """
    process = self.current_process
    if not process:
        return

    # Simple monitoring - check if process is still running
    while True:
        return_code = process.poll()
        if self.current_process is not process:
            # Cleared or replaced elsewhere; that code path handles cleanup.
            return
        if return_code is not None:
            break
        await asyncio.sleep(1)  # Check every second

    logger.info(f"Subprocess finished with return code: {return_code}")

    # Reset execution state
    self.execution_state = "idle"
    self.current_process = None

    # Update orchestrator states based on return code
    succeeded = return_code == 0
    final_state = OrchestratorState.COMPLETED if succeeded else OrchestratorState.EXEC_FAILED
    for orchestrator in self.orchestrators.values():
        if orchestrator.state == OrchestratorState.EXECUTING:
            orchestrator._state = final_state

    if succeeded:
        self.status_message.emit("Execution completed successfully")
    else:
        self.status_message.emit(f"Execution failed with code {return_code}")

    self.update_button_states()

    # Emit signal for log viewer
    self.subprocess_log_stopped.emit()

1147 

def _on_progress_started(self, max_value: int):
    """
    Slot for the progress-started signal (main thread).

    Shows the progress bar, sets its maximum to ``max_value`` and resets
    its value to 0.
    """
    bar = self.progress_bar
    bar.setVisible(True)
    bar.setMaximum(max_value)
    bar.setValue(0)

1153 

def _on_progress_updated(self, value: int):
    """
    Slot for the progress-updated signal (main thread).

    Sets the progress bar to ``value``.
    """
    bar = self.progress_bar
    bar.setValue(value)

1157 

def _on_progress_finished(self):
    """
    Slot for the progress-finished signal (main thread).

    Hides the progress bar.
    """
    bar = self.progress_bar
    bar.setVisible(False)

1161 

def _handle_compilation_error(self, plate_name: str, error_message: str):
    """
    Slot that reports a compilation error (main thread).

    Shows an error dialog naming the plate and the error message.
    """
    details = f"Compilation failed for {plate_name}: {error_message}"
    self.service_adapter.show_error_dialog(details)

1165 

def _handle_initialization_error(self, plate_name: str, error_message: str):
    """
    Slot that reports an initialization error (main thread).

    Shows an error dialog naming the plate and the error message.
    """
    details = f"Failed to initialize {plate_name}: {error_message}"
    self.service_adapter.show_error_dialog(details)