Coverage for openhcs/pyqt_gui/widgets/plate_manager.py: 0.0%
574 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Plate Manager Widget for PyQt6
4Manages plate selection, initialization, and execution with full feature parity
5to the Textual TUI version. Uses hybrid approach: extracted business logic + clean PyQt6 UI.
6"""
8import logging
9import asyncio
10import inspect
11import copy
12import sys
13import subprocess
14import tempfile
15from typing import List, Dict, Optional, Callable
16from pathlib import Path
18from PyQt6.QtWidgets import (
19 QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QListWidget,
20 QListWidgetItem, QLabel, QMessageBox, QFileDialog, QProgressBar,
21 QCheckBox, QFrame, QSplitter
22)
23from PyQt6.QtCore import Qt, pyqtSignal, QTimer, QThread
24from PyQt6.QtGui import QFont
26from openhcs.core.config import GlobalPipelineConfig, PipelineConfig
27from openhcs.io.filemanager import FileManager
28from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator, OrchestratorState
29from openhcs.core.pipeline import Pipeline
30from openhcs.constants.constants import VariableComponents, GroupBy
31from openhcs.pyqt_gui.widgets.mixins import (
32 preserve_selection_during_update,
33 handle_selection_change_with_prevention
34)
35from openhcs.pyqt_gui.shared.style_generator import StyleSheetGenerator
36from openhcs.pyqt_gui.shared.color_scheme import PyQt6ColorScheme
38logger = logging.getLogger(__name__)
41class PlateManagerWidget(QWidget):
42 """
43 PyQt6 Plate Manager Widget.
45 Manages plate selection, initialization, compilation, and execution.
46 Preserves all business logic from Textual version with clean PyQt6 UI.
47 """
49 # Signals
50 plate_selected = pyqtSignal(str) # plate_path
51 status_message = pyqtSignal(str) # status message
52 orchestrator_state_changed = pyqtSignal(str, str) # plate_path, state
54 # Log viewer integration signals
55 subprocess_log_started = pyqtSignal(str) # base_log_path
56 subprocess_log_stopped = pyqtSignal()
57 clear_subprocess_logs = pyqtSignal()
59 # Progress update signals (thread-safe UI updates)
60 progress_started = pyqtSignal(int) # max_value
61 progress_updated = pyqtSignal(int) # current_value
62 progress_finished = pyqtSignal()
64 # Error handling signals (thread-safe error reporting)
65 compilation_error = pyqtSignal(str, str) # plate_name, error_message
66 initialization_error = pyqtSignal(str, str) # plate_name, error_message
def __init__(self, file_manager: FileManager, service_adapter,
             color_scheme: Optional[PyQt6ColorScheme] = None, parent=None):
    """
    Construct the plate manager widget, its state, and its UI.

    Args:
        file_manager: FileManager instance for file operations
        service_adapter: PyQt service adapter; supplies the global config and,
            when no color scheme is passed in, the current color scheme
        color_scheme: Color scheme for styling (optional)
        parent: Parent widget
    """
    super().__init__(parent)

    # Collaborators handed in by the caller
    self.file_manager = file_manager
    self.service_adapter = service_adapter
    self.global_config = service_adapter.get_global_config()
    self.pipeline_editor = None  # Will be set by main window

    # Styling: an explicit (truthy) scheme wins, otherwise ask the adapter
    if color_scheme:
        self.color_scheme = color_scheme
    else:
        self.color_scheme = service_adapter.get_current_color_scheme()
    self.style_generator = StyleSheetGenerator(self.color_scheme)

    # Widget handles; filled in while setup_ui() builds the interface
    self.plate_list: Optional[QListWidget] = None
    self.buttons: Dict[str, QPushButton] = {}
    self.status_label: Optional[QLabel] = None
    self.progress_bar: Optional[QProgressBar] = None

    # Plate bookkeeping (business logic state carried over from the Textual version)
    self.plates: List[Dict] = []  # One dict per plate ('name', 'path')
    self.selected_plate_path: str = ""
    self.orchestrators: Dict[str, PipelineOrchestrator] = {}
    self.plate_configs: Dict[str, Dict] = {}
    self.plate_compiled_data: Dict[str, tuple] = {}  # plate path -> compiled pipeline data

    # Execution / log tracking
    self.current_process = None
    self.execution_state = "idle"
    self.log_file_path: Optional[str] = None
    self.log_file_position: int = 0

    # Build the interface, hook up signals, then sync button enablement
    self.setup_ui()
    self.setup_connections()
    self.update_button_states()

    logger.debug("Plate manager widget initialized")
115 # ========== UI Setup ==========
def setup_ui(self):
    """Build the widget layout: title, plate list + button panel splitter, status section."""
    root_layout = QVBoxLayout(self)
    root_layout.setContentsMargins(5, 5, 5, 5)
    root_layout.setSpacing(5)

    # Heading, tinted with the scheme's accent text color
    accent_hex = self.color_scheme.to_hex(self.color_scheme.text_accent)
    heading = QLabel("Plate Manager")
    heading.setFont(QFont("Arial", 12, QFont.Weight.Bold))
    heading.setStyleSheet(f"color: {accent_hex}; padding: 5px;")
    root_layout.addWidget(heading)

    # Vertical splitter holding the plate list (top) and the buttons (bottom)
    content_splitter = QSplitter(Qt.Orientation.Vertical)
    root_layout.addWidget(content_splitter)

    # Multi-select list of plates
    self.plate_list = QListWidget()
    self.plate_list.setSelectionMode(QListWidget.SelectionMode.ExtendedSelection)
    # The centralized stylesheet is applied to the whole widget
    self.setStyleSheet(self.style_generator.generate_plate_manager_style())
    content_splitter.addWidget(self.plate_list)

    # Action buttons
    content_splitter.addWidget(self.create_button_panel())

    # Initial split between list and buttons
    content_splitter.setSizes([300, 150])

    # Status label and progress bar below the splitter
    root_layout.addWidget(self.create_status_section())
def create_button_panel(self) -> QWidget:
    """
    Build the framed panel holding every plate management button.

    Each button is registered in ``self.buttons`` under its action id and
    routes its click to ``handle_button_action`` with that id.

    Returns:
        Widget containing action buttons
    """
    buttons_per_row = 4

    # (label, action id, tooltip) - extracted from the Textual version
    button_specs = [
        ("Add", "add_plate", "Add new plate directory"),
        ("Del", "del_plate", "Delete selected plates"),
        ("Edit", "edit_config", "Edit plate configuration"),
        ("Init", "init_plate", "Initialize selected plates"),
        ("Compile", "compile_plate", "Compile plate pipelines"),
        ("Run", "run_plate", "Run/Stop plate execution"),
        ("Code", "code_plate", "Generate Python code"),
        ("Save", "save_python_script", "Save Python script"),
    ]

    panel = QFrame()
    panel.setFrameStyle(QFrame.Shape.Box)
    # Frame and button styling come from the main widget stylesheet

    panel_layout = QVBoxLayout(panel)

    for start in range(0, len(button_specs), buttons_per_row):
        row = QHBoxLayout()
        row_specs = button_specs[start:start + buttons_per_row]

        for label, action_id, hint in row_specs:
            btn = QPushButton(label)
            btn.setToolTip(hint)
            btn.setMinimumHeight(30)

            # Bind the action id as a default so each lambda keeps its own value
            btn.clicked.connect(lambda checked, a=action_id: self.handle_button_action(a))

            self.buttons[action_id] = btn
            row.addWidget(btn)

        # Pad a short final row with one stretch per missing button
        for _ in range(buttons_per_row - len(row_specs)):
            row.addStretch()

        panel_layout.addLayout(row)

    return panel
def create_status_section(self) -> QWidget:
    """
    Build the framed status area: a status label above a hidden progress bar.

    Stores the created widgets on ``self.status_label`` and ``self.progress_bar``.

    Returns:
        Widget containing status information
    """
    success_hex = self.color_scheme.to_hex(self.color_scheme.status_success)

    # Status text starts out as "Ready" in the success color
    self.status_label = QLabel("Ready")
    self.status_label.setStyleSheet(f"color: {success_hex}; font-weight: bold;")

    # Progress bar starts hidden; its styling comes from the main widget stylesheet
    self.progress_bar = QProgressBar()
    self.progress_bar.setVisible(False)

    frame = QFrame()
    frame.setFrameStyle(QFrame.Shape.Box)
    # Frame styling is handled by the main widget stylesheet

    frame_layout = QVBoxLayout(frame)
    frame_layout.addWidget(self.status_label)
    frame_layout.addWidget(self.progress_bar)

    return frame
def setup_connections(self):
    """Connect every signal used by this widget to its handler."""
    wiring = (
        # Plate list interaction
        (self.plate_list.itemSelectionChanged, self.on_selection_changed),
        (self.plate_list.itemDoubleClicked, self.on_item_double_clicked),
        # Internal status / state signals
        (self.status_message, self.update_status),
        (self.orchestrator_state_changed, self.on_orchestrator_state_changed),
        # Progress signals for thread-safe UI updates
        (self.progress_started, self._on_progress_started),
        (self.progress_updated, self._on_progress_updated),
        (self.progress_finished, self._on_progress_finished),
        # Error signals for thread-safe error reporting
        (self.compilation_error, self._handle_compilation_error),
        (self.initialization_error, self._handle_initialization_error),
    )
    for signal, slot in wiring:
        signal.connect(slot)
def handle_button_action(self, action: str):
    """
    Dispatch a button's action id to the matching handler.

    Coroutine handlers are scheduled through ``run_async_action``; plain
    handlers are called directly. ``"run_plate"`` toggles between starting
    and stopping execution. Unknown ids are logged as a warning.

    Args:
        action: Action identifier
    """
    # Direct id -> handler table (preserved from Textual version)
    dispatch = {
        "add_plate": self.action_add_plate,
        "del_plate": self.action_delete_plate,
        "edit_config": self.action_edit_config,
        "init_plate": self.action_init_plate,
        "compile_plate": self.action_compile_plate,
        "code_plate": self.action_code_plate,
        "save_python_script": self.action_save_python_script,
    }

    handler = dispatch.get(action)
    if handler is not None:
        if inspect.iscoroutinefunction(handler):
            self.run_async_action(handler)
        else:
            handler()
        return

    if action != "run_plate":
        logger.warning(f"Unknown action: {action}")
        return

    # The Run button doubles as Stop while something is executing
    if self.is_any_plate_running():
        self.run_async_action(self.action_stop_execution)
    else:
        self.run_async_action(self.action_run_plate)
def run_async_action(self, async_func: Callable):
    """
    Hand an async action over to the service adapter for execution.

    Args:
        async_func: Async function to execute
    """
    adapter = self.service_adapter
    adapter.execute_async_operation(async_func)
289 # ========== Business Logic Methods (Extracted from Textual) ==========
def action_add_plate(self):
    """Handle Add Plate button: pick a directory and register it as a plate."""
    from openhcs.core.path_cache import PathCacheKey

    # Cached directory dialog (mirrors Textual TUI pattern)
    chosen_directory = self.service_adapter.show_cached_directory_dialog(
        cache_key=PathCacheKey.PLATE_IMPORT,
        title="Select Plate Directory",
        fallback_path=Path.home()
    )

    # A falsy result means nothing was chosen
    if not chosen_directory:
        return

    self.add_plate_callback([chosen_directory])
def add_plate_callback(self, selected_paths: List[Path]):
    """
    Register the chosen directories as plates, skipping paths already known.

    Emits a status message describing the outcome and refreshes the plate
    list when at least one plate was added.

    Args:
        selected_paths: List of selected directory paths
    """
    if not selected_paths:
        self.status_message.emit("Plate selection cancelled")
        return

    # Paths already registered, kept current so repeats inside this batch are skipped too
    known_paths = {plate['path'] for plate in self.plates}
    new_names = []

    for candidate in selected_paths:
        path_text = str(candidate)
        if path_text in known_paths:
            continue

        self.plates.append({
            'name': candidate.name,
            'path': path_text,
        })
        known_paths.add(path_text)
        new_names.append(candidate.name)

    if not new_names:
        self.status_message.emit("No new plates added (duplicates skipped)")
        return

    self.update_plate_list()
    self.status_message.emit(f"Added {len(new_names)} plate(s): {', '.join(new_names)}")
def action_delete_plate(self):
    """
    Handle Delete Plate button (extracted from Textual version).

    Removes the selected plates from ``self.plates`` and drops the per-plate
    state keyed by their paths: the orchestrator and any compiled pipeline
    data. Clearing the compiled data matters because ``action_run_plate``
    treats a path present in ``plate_compiled_data`` as ready to run; a stale
    entry would let a deleted-then-re-added plate run without being
    initialized or compiled again.
    """
    selected_items = self.get_selected_plates()
    if not selected_items:
        self.service_adapter.show_error_dialog("No plate selected to delete.")
        return

    paths_to_delete = {p['path'] for p in selected_items}
    self.plates = [p for p in self.plates if p['path'] not in paths_to_delete]

    # Clean up per-plate state for deleted plates
    for path in paths_to_delete:
        self.orchestrators.pop(path, None)
        self.plate_compiled_data.pop(path, None)

    if self.selected_plate_path in paths_to_delete:
        self.selected_plate_path = ""
        # Notify pipeline editor that no plate is selected (mirrors Textual TUI)
        self.plate_selected.emit("")

    self.update_plate_list()
    self.status_message.emit(f"Deleted {len(paths_to_delete)} plate(s)")
async def action_init_plate(self):
    """
    Handle Initialize Plate button (extracted from Textual version).

    Builds and initializes a PipelineOrchestrator for each selected plate in
    an executor, stores it under the plate path, and reports progress and
    errors through signals. Progress advances once per processed plate,
    whether it succeeded or failed, and the final status message reports how
    many plates were actually initialized.
    """
    selected_items = self.get_selected_plates()

    if not selected_items:
        self.service_adapter.show_error_dialog("No plates selected for initialization.")
        return

    total = len(selected_items)
    initialized_count = 0

    # Use signal for thread-safe progress start
    self.progress_started.emit(total)

    for i, plate in enumerate(selected_items):
        plate_path = plate['path']

        try:
            # Initialize orchestrator (heavy operation)
            def init_orchestrator():
                return PipelineOrchestrator(
                    plate_path=plate_path,
                    global_config=self.global_config,
                    storage_registry=self.file_manager.registry
                ).initialize()

            # Run in executor to avoid blocking UI (works in Qt thread)
            loop = asyncio.get_event_loop()
            orchestrator = await loop.run_in_executor(None, init_orchestrator)

            # Store orchestrator
            self.orchestrators[plate_path] = orchestrator
            self.orchestrator_state_changed.emit(plate_path, "READY")
            initialized_count += 1

            # Auto-select this plate if no plate is currently selected
            if not self.selected_plate_path:
                self.selected_plate_path = plate_path
                self.plate_selected.emit(plate_path)
                # Note: UI selection update removed - not safe from async thread
                # The UI will update automatically when orchestrator state changes

        except Exception as e:
            logger.error(f"Failed to initialize plate {plate['name']}: {e}")
            # Use signal for thread-safe error reporting
            self.initialization_error.emit(plate['name'], str(e))

        # Advance progress for every processed plate so a failure does not
        # leave the bar behind (matches the compile worker)
        self.progress_updated.emit(i + 1)

    # Use signal for thread-safe progress completion
    self.progress_finished.emit()

    failed_count = total - initialized_count
    if failed_count:
        self.status_message.emit(
            f"Initialized {initialized_count} of {total} plate(s); {failed_count} failed"
        )
    else:
        self.status_message.emit(f"Initialized {total} plate(s)")
# The remaining action handlers (edit_config, compile_plate, run_plate,
# code_plate, save_python_script) follow below.
def action_edit_config(self):
    """
    Handle Edit Config button - edit a PipelineConfig for the selected orchestrators.

    The editor is seeded from the first selected orchestrator's existing
    pipeline config when it has one, otherwise from a fresh editing config
    built from the current global config. Saving applies the new config to
    every selected orchestrator that has been initialized; the global
    configuration is not touched here.
    """
    selected_items = self.get_selected_plates()

    if not selected_items:
        self.service_adapter.show_error_dialog("No plates selected for configuration.")
        return

    # Only plates that already have an orchestrator can be configured
    target_orchestrators = []
    for item in selected_items:
        if item['path'] in self.orchestrators:
            target_orchestrators.append(self.orchestrators[item['path']])

    if not target_orchestrators:
        self.service_adapter.show_error_dialog("No initialized orchestrators selected.")
        return

    # The first selected orchestrator supplies the values shown in the editor
    first_orchestrator = target_orchestrators[0]

    if not first_orchestrator.pipeline_config:
        # Nothing saved yet: start from placeholders based on the current global config
        from openhcs.core.config import create_pipeline_config_for_editing
        current_plate_config = create_pipeline_config_for_editing(self.global_config)
    else:
        # Keep user-set values, but resolve placeholders against the current
        # global config rather than the orchestrator's older one
        from openhcs.core.config import create_editing_config_from_existing_lazy_config
        current_plate_config = create_editing_config_from_existing_lazy_config(
            first_orchestrator.pipeline_config,
            self.global_config
        )

    def handle_config_save(new_config: PipelineConfig) -> None:
        """Apply per-orchestrator configuration without global side effects."""
        for target in target_orchestrators:
            # Direct synchronous call - no async needed
            target.apply_pipeline_config(new_config)
        count = len(target_orchestrators)
        self.service_adapter.show_info_dialog(f"Per-orchestrator configuration applied to {count} orchestrator(s)")

    # Open the editor on PipelineConfig (not GlobalPipelineConfig)
    self._open_config_window(
        config_class=PipelineConfig,
        current_config=current_plate_config,
        on_save_callback=handle_config_save
    )
def _open_config_window(self, config_class, current_config, on_save_callback, is_global_config_editing=False):
    """
    Create a ConfigWindow for the given config and bring it to the front.

    Args:
        config_class: Configuration class type (PipelineConfig or GlobalPipelineConfig)
        current_config: Current configuration instance
        on_save_callback: Function to call when config is saved
        is_global_config_editing: Whether this is global config editing (affects placeholder behavior)
    """
    from openhcs.pyqt_gui.windows.config_window import ConfigWindow

    # Positional order: config class, current config, save callback,
    # color scheme, parent widget, global-editing flag
    window = ConfigWindow(
        config_class,
        current_config,
        on_save_callback,
        self.color_scheme,
        self,
        is_global_config_editing
    )

    # Non-modal: show it, then raise and focus it
    window.show()
    window.raise_()
    window.activateWindow()
def action_edit_global_config(self):
    """
    Handle global configuration editing - affects all orchestrators.

    Opens the config window on a concrete GlobalPipelineConfig. On save the
    new config is pushed to the service adapter, recorded as the current
    global config, cached on disk, stored on this widget, and handed to every
    orchestrator.
    """
    from openhcs.core.config import (
        get_default_global_config,
        set_current_global_config,
        GlobalPipelineConfig,
    )

    # Get current global config from service adapter or use default
    current_global_config = self.service_adapter.get_global_config() or get_default_global_config()

    def handle_global_config_save(new_config: GlobalPipelineConfig) -> None:
        """Apply global configuration to all orchestrators and save to cache."""
        self.service_adapter.set_global_config(new_config)  # Update app-level config

        # Keep this widget's cached copy current: it is what new
        # orchestrators, the subprocess payload and the per-plate config
        # editor read, and it was previously only set in __init__
        self.global_config = new_config

        # Update thread-local storage for MaterializationPathConfig defaults
        set_current_global_config(GlobalPipelineConfig, new_config)

        # Save to cache for persistence between sessions
        self._save_global_config_to_cache(new_config)

        for orchestrator in self.orchestrators.values():
            # NOTE(review): this passes the *result* of apply_new_global_config
            # rather than a callable, unlike other run_async_action call
            # sites - confirm what execute_async_operation accepts
            self.run_async_action(orchestrator.apply_new_global_config(new_config))
        self.service_adapter.show_info_dialog("Global configuration applied to all orchestrators")

    # Open configuration window using concrete GlobalPipelineConfig
    self._open_config_window(
        config_class=GlobalPipelineConfig,
        current_config=current_global_config,
        on_save_callback=handle_global_config_save,
        is_global_config_editing=True
    )
def _save_global_config_to_cache(self, config: GlobalPipelineConfig):
    """
    Write the global config to the cache file, logging the outcome.

    Failures are logged and swallowed; no dialog is shown because the cache
    is not critical for immediate functionality.
    """
    try:
        # Synchronous save so the write has completed before returning
        from openhcs.core.config_cache import _sync_save_config
        from openhcs.core.xdg_paths import get_config_file_path

        target_file = get_config_file_path("global_config.config")
        saved = _sync_save_config(config, target_file)

        if not saved:
            logger.error("Failed to save global config to cache - sync save returned False")
        else:
            logger.info("Global config saved to cache for session persistence")
    except Exception as e:
        logger.error(f"Failed to save global config to cache: {e}")
async def action_compile_plate(self):
    """
    Handle Compile Plate button - validate the selection, then compile it.

    Every selected plate must have an orchestrator in a compilable state and
    a pipeline definition; otherwise an error dialog is shown and nothing is
    compiled.
    """
    selected_items = self.get_selected_plates()

    if not selected_items:
        logger.warning("No plates available for compilation")
        return

    # States from which a plate may be compiled or recompiled
    compilable_states = [
        OrchestratorState.READY, OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED, OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED
    ]

    def is_blocked(item):
        """True when the plate has no orchestrator or it is in a non-compilable state."""
        candidate = self.orchestrators.get(item['path'])
        return candidate is None or candidate.state not in compilable_states

    not_ready = [item for item in selected_items if is_blocked(item)]

    if not_ready:
        blocked_names = ', '.join(item['name'] for item in not_ready)
        blocked_orchestrators = [self.orchestrators.get(item['path']) for item in not_ready]

        # Choose the message matching the actual reason
        if any(entry is None for entry in blocked_orchestrators):
            error_msg = f"Cannot compile plates that haven't been initialized: {blocked_names}"
        elif any(entry.state == OrchestratorState.EXECUTING for entry in blocked_orchestrators):
            error_msg = f"Cannot compile plates that are currently executing: {blocked_names}"
        else:
            error_msg = f"Cannot compile plates in current state: {blocked_names}"

        logger.warning(error_msg)
        self.service_adapter.show_error_dialog(error_msg)
        return

    # Every selected plate also needs a pipeline definition
    no_pipeline = [
        item for item in selected_items
        if not self._get_current_pipeline_definition(item['path'])
    ]

    if no_pipeline:
        missing_names = ', '.join(item['name'] for item in no_pipeline)
        error_msg = f"Cannot compile plates without pipelines: {missing_names}"
        self.status_message.emit(error_msg)
        self.service_adapter.show_error_dialog(error_msg)
        return

    # Start async compilation
    await self._compile_plates_worker(selected_items)
async def _compile_plates_worker(self, selected_items: List[Dict]) -> None:
    """
    Compile the pipeline of each given plate, one after another.

    For every plate: fetch or create an initialized orchestrator, deep-copy
    the pipeline definition, normalize the copied steps, compile against the
    plate's wells, and store ``(steps, compiled_contexts)`` in
    ``plate_compiled_data``. Failures are recorded on the plate dict and
    reported through signals; processing continues with the next plate.
    """
    # Use signals for thread-safe UI updates
    self.progress_started.emit(len(selected_items))

    for index, plate_data in enumerate(selected_items):
        plate_path = plate_data['path']

        # Pipeline definition for this plate; fall back to an empty one
        definition_pipeline = self._get_current_pipeline_definition(plate_path)
        if not definition_pipeline:
            logger.warning(f"No pipeline defined for {plate_data['name']}, using empty pipeline")
            definition_pipeline = []

        try:
            def get_or_create_orchestrator():
                """Return an initialized orchestrator for this plate, creating one if needed."""
                if plate_path not in self.orchestrators:
                    return PipelineOrchestrator(
                        plate_path=plate_path,
                        global_config=self.global_config,
                        storage_registry=self.file_manager.registry
                    ).initialize()
                existing = self.orchestrators[plate_path]
                if not existing.is_initialized():
                    existing.initialize()
                return existing

            # Run in executor (works in Qt thread)
            loop = asyncio.get_event_loop()
            orchestrator = await loop.run_in_executor(None, get_or_create_orchestrator)
            self.orchestrators[plate_path] = orchestrator

            # Compile a fresh copy so the definition itself is left untouched
            execution_pipeline = copy.deepcopy(definition_pipeline)

            for step in execution_pipeline:
                # Step IDs follow object identity, so refresh them after the copy
                step.step_id = str(id(step))
                # variable_components must be neither None nor empty - use FunctionStep default
                components = step.variable_components
                if components is None or not components:
                    kind = "None" if components is None else "empty"
                    logger.warning(f"Step '{step.name}' has {kind} variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]

            # Wrap in Pipeline object like test_main.py does
            pipeline_obj = Pipeline(steps=execution_pipeline)

            # Heavy operations go through the executor to keep the UI responsive
            loop = asyncio.get_event_loop()
            wells = await loop.run_in_executor(None, lambda: orchestrator.get_component_keys(GroupBy.WELL))
            compiled_contexts = await loop.run_in_executor(
                None, orchestrator.compile_pipelines, pipeline_obj.steps, wells
            )

            # Store compiled data
            self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts)
            logger.info(f"Successfully compiled {plate_path}")

            # Announce the new orchestrator state
            self.orchestrator_state_changed.emit(plate_path, "COMPILED")

        except Exception as e:
            logger.error(f"COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True)
            plate_data['error'] = str(e)
            # Nothing is stored in plate_compiled_data on failure
            self.orchestrator_state_changed.emit(plate_path, "COMPILE_FAILED")
            # Report through a signal instead of opening a dialog from here
            self.compilation_error.emit(plate_data['name'], str(e))

        # Use signal for thread-safe progress update
        self.progress_updated.emit(index + 1)

    # Use signal for thread-safe progress completion
    self.progress_finished.emit()
    self.status_message.emit(f"Compilation completed for {len(selected_items)} plate(s)")
    self.update_button_states()
async def action_run_plate(self):
    """
    Handle Run Plate button - execute compiled plates in a subprocess.

    Pickles the selected plates' pipeline definitions and the global config
    into a temporary file, launches the subprocess runner script with that
    file, marks the plates' orchestrators as executing, and starts
    monitoring. Any failure is logged, shown in an error dialog, and resets
    ``execution_state`` to "idle".

    The temporary file handle is closed as soon as its name is known, so a
    failure before or during pickling cannot leak an open handle, and the
    file is removed again if the subprocess was never started.
    """
    selected_items = self.get_selected_plates()
    if not selected_items:
        self.service_adapter.show_error_dialog("No plates selected to run.")
        return

    ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data]
    if not ready_items:
        self.service_adapter.show_error_dialog("Selected plates are not compiled. Please compile first.")
        return

    data_file_path = None      # Temp pickle handed to the subprocess
    process_started = False    # Whether the subprocess may already be using that file

    try:
        # Use subprocess approach like Textual TUI
        logger.debug("Using subprocess approach for clean isolation")

        plate_paths_to_run = [item['path'] for item in ready_items]

        # Pass definition pipeline steps - subprocess will make fresh copy and compile
        pipeline_data = {}
        for plate_path in plate_paths_to_run:
            pipeline_data[plate_path] = self._get_current_pipeline_definition(plate_path)

        logger.info(f"Starting subprocess for {len(plate_paths_to_run)} plates")

        # Clear subprocess logs before starting new execution
        self.clear_subprocess_logs.emit()

        # Reserve a data file for the subprocess; only the name is needed
        # here, so the handle is closed right away (delete=False keeps the file)
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl') as data_file:
            data_file_path = data_file.name

        # Generate unique ID for this subprocess
        import time
        subprocess_timestamp = int(time.time())
        plate_names = [Path(path).name for path in plate_paths_to_run]
        unique_id = f"plates_{'_'.join(plate_names[:2])}_{subprocess_timestamp}"

        # Build subprocess log name using log utilities
        from openhcs.core.log_utils import get_current_log_file_path
        try:
            tui_log_path = get_current_log_file_path()
            if tui_log_path.endswith('.log'):
                tui_base = tui_log_path[:-4]  # Remove .log extension
            else:
                tui_base = tui_log_path
            log_file_base = f"{tui_base}_subprocess_{subprocess_timestamp}"
        except RuntimeError:
            # Fallback if no main log found
            log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
            log_dir.mkdir(parents=True, exist_ok=True)
            log_file_base = str(log_dir / f"pyqt_gui_subprocess_{subprocess_timestamp}")

        # Pickle data for subprocess
        subprocess_data = {
            'plate_paths': plate_paths_to_run,
            'pipeline_data': pipeline_data,
            'global_config': self.global_config
        }

        def _write_pickle_data():
            import dill as pickle
            with open(data_file_path, 'wb') as f:
                pickle.dump(subprocess_data, f)

        # Write pickle data in executor (works in Qt thread)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, _write_pickle_data)

        logger.debug(f"Created data file: {data_file_path}")

        # Runner script lives in the textual_tui package three levels up from this file
        subprocess_script = Path(__file__).parent.parent.parent / "textual_tui" / "subprocess_runner.py"

        # Log file path built from the base and unique ID passed to the subprocess
        actual_log_file_path = f"{log_file_base}_{unique_id}.log"
        logger.debug(f"Log file base: {log_file_base}")
        logger.debug(f"Unique ID: {unique_id}")
        logger.debug(f"Actual log file: {actual_log_file_path}")

        # Store log file path for monitoring
        self.log_file_path = actual_log_file_path
        self.log_file_position = 0

        logger.debug(f"Subprocess command: {sys.executable} {subprocess_script} {data_file_path} {log_file_base} {unique_id}")

        def _create_subprocess():
            return subprocess.Popen([
                sys.executable, str(subprocess_script),
                data_file_path, log_file_base, unique_id
            ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                text=True,
            )

        # Create subprocess in executor (works in Qt thread)
        self.current_process = await loop.run_in_executor(None, _create_subprocess)
        process_started = True

        logger.info(f"Subprocess started with PID: {self.current_process.pid}")

        # Emit signal for log viewer to start monitoring
        self.subprocess_log_started.emit(log_file_base)

        # Update orchestrator states to show running state
        for plate in ready_items:
            plate_path = plate['path']
            if plate_path in self.orchestrators:
                self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING

        self.execution_state = "running"
        self.status_message.emit(f"Running {len(ready_items)} plate(s) in subprocess...")
        self.update_button_states()

        # Start monitoring
        await self._start_monitoring()

    except Exception as e:
        logger.error(f"Failed to start plate execution: {e}", exc_info=True)

        # Nobody will consume the data file if the subprocess never started
        if data_file_path is not None and not process_started:
            try:
                Path(data_file_path).unlink()
            except OSError as cleanup_error:
                logger.debug(f"Could not remove temporary data file {data_file_path}: {cleanup_error}")

        self.service_adapter.show_error_dialog(f"Failed to start execution: {e}")
        self.execution_state = "idle"
        self.update_button_states()
async def action_stop_execution(self):
    """
    Handle Stop Execution - terminate running subprocess (matches TUI implementation).

    Sends SIGTERM to the subprocess's process group, waits one second, then
    sends SIGKILL. If that fails for any reason, falls back to terminating
    only the main process. Either way the widget returns to the idle state:
    orchestrators marked EXECUTING are set back to COMPILED, buttons are
    refreshed, and the log viewer is told to stop. When nothing is running an
    info dialog is shown instead.
    """
    logger.info("🛑 Stop button pressed. Terminating subprocess.")
    self.status_message.emit("Terminating execution...")

    process = self.current_process
    if not (process and process.poll() is None):  # Nothing is running
        self.service_adapter.show_info_dialog("No execution is currently running.")
        return

    try:
        # Kill the entire process group, not just the parent process (matches TUI)
        logger.info(f"🛑 Killing process group for PID {process.pid}...")

        # Assumes the subprocess made itself a group leader (os.setpgrp()),
        # so its PID doubles as the process group ID
        process_group_id = process.pid

        import os
        import signal
        os.killpg(process_group_id, signal.SIGTERM)

        # Give processes time to exit gracefully
        await asyncio.sleep(1)

        # Force kill if still alive
        try:
            os.killpg(process_group_id, signal.SIGKILL)
            logger.info(f"🛑 Force killed process group {process_group_id}")
        except ProcessLookupError:
            logger.info(f"🛑 Process group {process_group_id} already terminated")

    except Exception as e:
        logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill")
        # Fallback to killing just the main process (original behavior)
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    # Shared reset for both paths, so the fallback no longer leaves
    # orchestrators stuck in EXECUTING
    self.execution_state = "idle"
    self.current_process = None

    for orchestrator in self.orchestrators.values():
        if orchestrator.state == OrchestratorState.EXECUTING:
            orchestrator._state = OrchestratorState.COMPILED

    self.status_message.emit("Execution terminated by user")
    self.update_button_states()

    # Emit signal for log viewer
    self.subprocess_log_stopped.emit()
def action_code_plate(self):
    """Handle the Code Generation button (placeholder: only shows an info dialog)."""
    notice = "Code generation not yet implemented in PyQt6 version."
    self.service_adapter.show_info_dialog(notice)
def action_save_python_script(self):
    """Handle the Save Python Script button (placeholder: only shows an info dialog)."""
    notice = "Script saving not yet implemented in PyQt6 version."
    self.service_adapter.show_info_dialog(notice)
887 # ========== UI Helper Methods ==========
def update_plate_list(self):
    """
    Rebuild the plate list widget from ``self.plates``.

    Each entry reads ``name (path)`` and is prefixed with a bracketed status
    badge when the plate has an orchestrator in one of the recognized states.
    The rebuild is handed to ``preserve_selection_during_update`` as a
    callback, and the button states are refreshed afterwards.
    """
    # Orchestrator state -> badge text, checked in this order; first match wins.
    state_badges = (
        (OrchestratorState.READY, "✓ Init"),
        (OrchestratorState.COMPILED, "✓ Compiled"),
        (OrchestratorState.EXECUTING, "🔄 Running"),
        (OrchestratorState.COMPLETED, "✅ Complete"),
        (OrchestratorState.COMPILE_FAILED, "❌ Compile Failed"),
        (OrchestratorState.EXEC_FAILED, "❌ Exec Failed"),
    )

    def plate_key(item_data):
        """Key handed to the selection helper: the plate path, else str(item_data)."""
        if isinstance(item_data, dict) and 'path' in item_data:
            return item_data['path']
        return str(item_data)

    def describe(plate):
        """Return ``(display_text, plate)`` for one plate dictionary."""
        text = f"{plate['name']} ({plate['path']})"

        # Collect the status badge, if the plate has an orchestrator
        badges = []
        if plate['path'] in self.orchestrators:
            current = self.orchestrators[plate['path']].state
            for candidate, badge in state_badges:
                if current == candidate:
                    badges.append(badge)
                    break

        if badges:
            text = f"[{', '.join(badges)}] {text}"

        return text, plate

    def rebuild():
        """Clear the widget and add one item per known plate."""
        self.plate_list.clear()

        for plate in self.plates:
            text, payload = describe(plate)
            entry = QListWidgetItem(text)
            entry.setData(Qt.ItemDataRole.UserRole, payload)

            # Tooltip exposes the raw orchestrator state value
            if plate['path'] in self.orchestrators:
                owner = self.orchestrators[plate['path']]
                entry.setToolTip(f"Status: {owner.state.value}")

            self.plate_list.addItem(entry)

        # Auto-select first plate if no selection and plates exist
        if self.plates and not self.selected_plate_path:
            self.plate_list.setCurrentRow(0)

    # Let the shared utility run the rebuild
    preserve_selection_during_update(
        self.plate_list,
        plate_key,
        lambda: bool(self.orchestrators),
        rebuild
    )
    self.update_button_states()
def get_selected_plates(self) -> List[Dict]:
    """
    Collect the plate dictionaries attached to the selected list items.

    Returns:
        List of the user-role data of every selected item, skipping items
        whose data is empty/falsy
    """
    payloads = (
        item.data(Qt.ItemDataRole.UserRole)
        for item in self.plate_list.selectedItems()
    )
    return [payload for payload in payloads if payload]
def update_button_states(self):
    """
    Enable/disable the action buttons to match the selection and run state.

    Every button except Run is disabled while execution is running. The Run
    button becomes an always-enabled "Stop" while running; otherwise it is
    "Run" and enabled only when a selected plate has compiled data.
    """
    chosen = self.get_selected_plates()
    has_selection = bool(chosen)
    has_initialized = any(plate['path'] in self.orchestrators for plate in chosen)
    has_compiled = any(plate['path'] in self.plate_compiled_data for plate in chosen)
    is_running = self.is_any_plate_running()
    idle = not is_running

    # Button key -> prerequisite (logic extracted from Textual version)
    prerequisites = (
        ("del_plate", has_selection),
        ("edit_config", has_initialized),
        ("init_plate", has_selection),
        ("compile_plate", has_initialized),
        ("code_plate", has_initialized),
        ("save_python_script", has_initialized),
    )
    for key, satisfied in prerequisites:
        self.buttons[key].setEnabled(satisfied and idle)

    # Run button doubles as the stop button while something is running
    run_button = self.buttons["run_plate"]
    run_button.setEnabled(True if is_running else has_compiled)
    run_button.setText("Stop" if is_running else "Run")
def is_any_plate_running(self) -> bool:
    """
    Report whether the widget is in the running execution state.

    Returns:
        True when ``self.execution_state`` equals "running", False otherwise
    """
    current_state = self.execution_state
    return current_state == "running"
def update_status(self, message: str):
    """
    Show a message in the status label.

    Args:
        message: Text to set on ``self.status_label``
    """
    label = self.status_label
    label.setText(message)
def on_selection_changed(self):
    """
    Handle plate list selection changes.

    The decision logic lives in ``handle_selection_change_with_prevention``;
    this method supplies the callbacks it needs and refreshes the button
    states afterwards.
    """
    def plate_key(item_data):
        """Key handed to the selection helper: the plate path, else str(item_data)."""
        if isinstance(item_data, dict) and 'path' in item_data:
            return item_data['path']
        return str(item_data)

    def remember_selection(selected_plates):
        """Record the first selected plate and announce it via plate_selected."""
        self.selected_plate_path = selected_plates[0]['path']
        self.plate_selected.emit(self.selected_plate_path)

    def forget_selection():
        """Reset the remembered plate path to the empty string."""
        self.selected_plate_path = ""

    # Delegate to the shared selection utility
    handle_selection_change_with_prevention(
        self.plate_list,
        self.get_selected_plates,
        plate_key,
        lambda: bool(self.orchestrators),
        lambda: self.selected_plate_path,
        remember_selection,
        forget_selection
    )

    self.update_button_states()
def on_item_double_clicked(self, item: QListWidgetItem):
    """
    Handle double-click on a plate item.

    Starts plate initialization when the clicked plate has no orchestrator
    yet; does nothing otherwise.

    Args:
        item: The list item that was double-clicked
    """
    plate_data = item.data(Qt.ItemDataRole.UserRole)
    if not plate_data:
        return

    # Already has an orchestrator: nothing to trigger
    if plate_data['path'] in self.orchestrators:
        return

    self.run_async_action(self.action_init_plate)
def on_orchestrator_state_changed(self, plate_path: str, state: str):
    """
    React to an orchestrator state change by refreshing the plate list.

    Args:
        plate_path: Path of the plate
        state: New orchestrator state
    """
    summary = f"Orchestrator state changed: {plate_path} -> {state}"
    self.update_plate_list()
    logger.debug(summary)
def on_config_changed(self, new_config: GlobalPipelineConfig):
    """
    Store a new global configuration and push it to every orchestrator.

    Args:
        new_config: New global configuration
    """
    self.global_config = new_config

    # Apply new global config to all existing orchestrators
    # NOTE(review): run_async_action receives the *result* of calling
    # apply_new_global_config here, whereas on_item_double_clicked passes a
    # callable - confirm which form run_async_action expects.
    for target in self.orchestrators.values():
        pending = target.apply_new_global_config(new_config)
        self.run_async_action(pending)

    orchestrator_count = len(self.orchestrators)
    logger.info(f"Applied new global config to {orchestrator_count} orchestrators")
1063 # ========== Helper Methods ==========
def _get_current_pipeline_definition(self, plate_path: str) -> List:
    """
    Look up the pipeline steps for a plate via the pipeline editor.

    Args:
        plate_path: Path to the plate; when empty, the editor's
            ``current_plate`` attribute (if any) is used instead

    Returns:
        The steps reported by the editor, or an empty list when there is no
        editor, no plate, no usable editor attribute, or no steps
    """
    editor = self.pipeline_editor
    if not editor:
        logger.warning("No pipeline editor reference - using empty pipeline")
        return []

    # Explicit plate wins; otherwise fall back to the editor's current plate
    target_plate = plate_path or getattr(editor, 'current_plate', None)
    if not target_plate:
        logger.warning("No plate specified - using empty pipeline")
        return []

    if hasattr(editor, 'get_pipeline_for_plate'):
        # Preferred: per-plate lookup (should return List[FunctionStep] directly)
        steps = editor.get_pipeline_for_plate(target_plate)
    elif hasattr(editor, 'pipeline_steps'):
        # Fallback to current pipeline steps if get_pipeline_for_plate not available
        steps = getattr(editor, 'pipeline_steps', [])
    else:
        logger.warning("Pipeline editor doesn't have expected methods - using empty pipeline")
        return []

    if steps:
        return steps
    return []
def set_pipeline_editor(self, pipeline_editor):
    """
    Remember the pipeline editor so pipeline lookups can use it.

    Args:
        pipeline_editor: Pipeline editor widget instance
    """
    note = "Pipeline editor reference set in plate manager"
    self.pipeline_editor = pipeline_editor
    logger.debug(note)
async def _start_monitoring(self, poll_interval: float = 1.0):
    """
    Monitor the current subprocess until it exits, then finalize the run.

    The process is captured once at start. Monitoring ends without touching
    any state as soon as ``self.current_process`` no longer refers to that
    process (for example after a stop action cleared it), so the loop cannot
    spin forever or adopt a process started later.

    Args:
        poll_interval: Seconds to sleep between polls of the subprocess
    """
    process = self.current_process
    if not process:
        return

    # Poll only while this monitor still owns the process and it is alive
    while self.current_process is process and process.poll() is None:
        await asyncio.sleep(poll_interval)

    if self.current_process is not process:
        # Cleared or replaced elsewhere; whoever did that owns the state reset
        return

    # Process has finished
    return_code = process.returncode
    logger.info(f"Subprocess finished with return code: {return_code}")

    # Reset execution state
    self.execution_state = "idle"
    self.current_process = None

    # Update orchestrator states based on return code
    succeeded = return_code == 0
    final_state = OrchestratorState.COMPLETED if succeeded else OrchestratorState.EXEC_FAILED
    for orchestrator in self.orchestrators.values():
        if orchestrator.state == OrchestratorState.EXECUTING:
            orchestrator._state = final_state

    if succeeded:
        self.status_message.emit("Execution completed successfully")
    else:
        self.status_message.emit(f"Execution failed with code {return_code}")

    self.update_button_states()

    # Emit signal for log viewer
    self.subprocess_log_stopped.emit()
def _on_progress_started(self, max_value: int):
    """Show the progress bar, set its maximum to max_value and reset it to 0 (slot)."""
    bar = self.progress_bar
    bar.setVisible(True)
    bar.setMaximum(max_value)
    bar.setValue(0)
def _on_progress_updated(self, value: int):
    """Move the progress bar to the given value (slot)."""
    bar = self.progress_bar
    bar.setValue(value)
def _on_progress_finished(self):
    """Hide the progress bar (slot)."""
    bar = self.progress_bar
    bar.setVisible(False)
def _handle_compilation_error(self, plate_name: str, error_message: str):
    """Report a compilation failure for a plate in an error dialog (slot)."""
    detail = f"Compilation failed for {plate_name}: {error_message}"
    self.service_adapter.show_error_dialog(detail)
def _handle_initialization_error(self, plate_name: str, error_message: str):
    """Report an initialization failure for a plate in an error dialog (slot)."""
    detail = f"Failed to initialize {plate_name}: {error_message}"
    self.service_adapter.show_error_dialog(detail)