Coverage for openhcs/textual_tui/widgets/plate_manager.py: 0.0%
961 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2PlateManagerWidget for OpenHCS Textual TUI
4Plate management widget with complete button set and reactive state management.
5Matches the functionality from the current prompt-toolkit TUI.
6"""
8import asyncio
9import copy
10import dataclasses
11import inspect
12import logging
13import numpy as np
14import os
15import re
16import signal
17import subprocess
18import sys
19import tempfile
20import time
21import traceback
22from collections import defaultdict
23from pathlib import Path
24from typing import Dict, List, Optional, Callable, Any, Tuple
26from openhcs.core.config import PipelineConfig
27from openhcs.core.log_utils import get_current_log_file_path
29from PIL import Image
30from textual.reactive import reactive
31from .button_list_widget import ButtonListWidget, ButtonConfig
32from textual import work
34from openhcs.core.config import GlobalPipelineConfig, VFSConfig, MaterializationBackend
35from openhcs.core.pipeline import Pipeline
36from openhcs.io.filemanager import FileManager
37from openhcs.io.zarr import ZarrStorageBackend
38from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
39from openhcs.constants.constants import Backend, VariableComponents, OrchestratorState
40from openhcs.textual_tui.services.file_browser_service import SelectionMode
41from openhcs.textual_tui.services.window_service import WindowService
42from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey, get_path_cache
43from openhcs.introspection.signature_analyzer import SignatureAnalyzer
45logger = logging.getLogger(__name__)
47# Note: Using subprocess approach instead of multiprocessing to avoid TUI FD conflicts
def get_orchestrator_status_symbol(orchestrator: PipelineOrchestrator) -> str:
    """Map an orchestrator's lifecycle state to a single-character UI symbol.

    Returns "?" both for a missing orchestrator (a newly added plate has none
    yet) and for any state without an explicit mapping.
    """
    if orchestrator is None:
        # Newly added plates have no orchestrator yet.
        return "?"

    symbol_by_state = {
        OrchestratorState.CREATED: "?",         # created, not initialized
        OrchestratorState.READY: "-",           # initialized, ready for compilation
        OrchestratorState.COMPILED: "o",        # compiled, ready for execution
        OrchestratorState.EXECUTING: "!",       # execution in progress
        OrchestratorState.COMPLETED: "C",       # execution completed successfully
        OrchestratorState.INIT_FAILED: "I",     # initialization failed
        OrchestratorState.COMPILE_FAILED: "P",  # compilation failed (P for Pipeline)
        OrchestratorState.EXEC_FAILED: "X",     # execution failed
    }
    return symbol_by_state.get(orchestrator.state, "?")
class PlateManagerWidget(ButtonListWidget):
    """
    Plate management widget using Textual reactive state.
    """

    # Semantic reactive properties (same pattern as PipelineEditor's pipeline_steps).
    selected_plate = reactive("")             # path of the currently highlighted plate
    orchestrators = reactive({})              # plate path -> PipelineOrchestrator
    plate_configs = reactive({})              # plate path -> per-plate configuration
    orchestrator_state_version = reactive(0)  # Increment to trigger UI refresh
84 def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
85 button_configs = [
86 ButtonConfig("Add", "add_plate"),
87 ButtonConfig("Del", "del_plate", disabled=True),
88 ButtonConfig("Edit", "edit_config", disabled=True), # Unified edit button for config editing
89 ButtonConfig("Init", "init_plate", disabled=True),
90 ButtonConfig("Compile", "compile_plate", disabled=True),
91 ButtonConfig("Run", "run_plate", disabled=True),
92 ButtonConfig("Code", "code_plate", disabled=True), # Generate Python code
93 ButtonConfig("Save", "save_python_script", disabled=True), # Save Python script
94 # ButtonConfig("Export", "export_ome_zarr", disabled=True), # Export to OME-ZARR - HIDDEN FROM UI
95 ]
96 super().__init__(
97 button_configs=button_configs,
98 list_id="plate_content",
99 container_id="plate_list",
100 on_button_pressed=self._handle_button_press,
101 on_selection_changed=self._handle_selection_change,
102 on_item_moved=self._handle_item_moved
103 )
104 self.filemanager = filemanager
105 self.global_config = global_config
106 self.plate_compiled_data = {}
107 self.on_plate_selected: Optional[Callable[[str], None]] = None
108 self.pipeline_editor: Optional['PipelineEditorWidget'] = None
110 # Initialize window service to avoid circular imports
111 self.window_service = None # Will be set in on_mount
113 # --- Subprocess Architecture ---
114 self.current_process: Optional[subprocess.Popen] = None
115 self.zmq_client = None # ZMQ execution client (when using ZMQ mode)
116 self.current_execution_id = None # Track current execution ID for cancellation
117 self.log_file_path: Optional[str] = None # Single source of truth
118 self.log_file_position: int = 0 # Track position in log file for incremental reading
119 # Async monitoring using Textual's interval system
120 self.monitoring_interval = None
121 self.monitoring_active = False
122 # ---
124 logger.debug("PlateManagerWidget initialized")
130 def on_unmount(self) -> None:
131 logger.debug("Unmounting PlateManagerWidget, ensuring worker process is terminated.")
132 # Schedule async stop execution since on_unmount is sync
133 import asyncio
134 if self.current_process and self.current_process.poll() is None:
135 # Create a task to stop execution asynchronously
136 asyncio.create_task(self.action_stop_execution())
137 self._stop_monitoring()
139 def format_item_for_display(self, plate: Dict) -> Tuple[str, str]:
140 # Get status from orchestrator instead of magic string
141 plate_path = plate.get('path', '')
142 orchestrator = self.orchestrators.get(plate_path)
143 status_symbol = get_orchestrator_status_symbol(orchestrator)
145 status_symbols = {
146 "?": "➕", # Created (not initialized)
147 "-": "✅", # Ready (initialized)
148 "o": "⚡", # Compiled
149 "!": "🔄", # Executing
150 "C": "🏁", # Completed
151 "I": "🚫", # Init failed
152 "P": "💥", # Compile failed (Pipeline)
153 "X": "❌" # Execution failed
154 }
155 status_icon = status_symbols.get(status_symbol, "❓")
156 plate_name = plate.get('name', 'Unknown')
157 display_text = f"{status_icon} {plate_name} - {plate_path}"
158 return display_text, plate_path
160 async def _handle_button_press(self, button_id: str) -> None:
161 action_map = {
162 "add_plate": self.action_add_plate,
163 "del_plate": self.action_delete_plate,
164 "edit_config": self.action_edit_config, # Unified edit button
165 "init_plate": self.action_init_plate,
166 "compile_plate": self.action_compile_plate,
167 "code_plate": self.action_code_plate, # Generate Python code
168 "save_python_script": self.action_save_python_script, # Save Python script
169 # "export_ome_zarr": self.action_export_ome_zarr, # HIDDEN
170 }
171 if button_id in action_map:
172 action = action_map[button_id]
173 if inspect.iscoroutinefunction(action):
174 await action()
175 else:
176 action()
177 elif button_id == "run_plate":
178 if self._is_any_plate_running():
179 await self.action_stop_execution()
180 else:
181 await self.action_run_plate()
183 def _handle_selection_change(self, selected_values: List[str]) -> None:
184 logger.debug(f"Checkmarks changed: {len(selected_values)} items selected")
186 def _handle_item_moved(self, from_index: int, to_index: int) -> None:
187 current_plates = list(self.items)
188 plate = current_plates.pop(from_index)
189 current_plates.insert(to_index, plate)
190 self.items = current_plates
191 plate_name = plate['name']
192 direction = "up" if to_index < from_index else "down"
193 self.app.current_status = f"Moved plate '{plate_name}' {direction}"
195 def on_mount(self) -> None:
196 # Initialize window service
197 self.window_service = WindowService(self.app)
199 self.call_later(self._delayed_update_display)
200 self.call_later(self._update_button_states)
202 def watch_items(self, items: List[Dict]) -> None:
203 """Automatically update UI when items changes (follows ButtonListWidget pattern)."""
204 # DEBUG: Log when items list changes to track the source of the reset
205 stack_trace = ''.join(traceback.format_stack()[-3:-1]) # Get last 2 stack frames
206 logger.debug(f"🔍 ITEMS CHANGED: {len(items)} plates. Call stack:\n{stack_trace}")
208 # CRITICAL: Call parent's watch_items to update the SelectionList
209 super().watch_items(items)
211 logger.debug(f"Plates updated: {len(items)} plates")
212 self._update_button_states()
214 def watch_highlighted_item(self, plate_path: str) -> None:
215 self.selected_plate = plate_path
216 logger.debug(f"Highlighted plate: {plate_path}")
218 def watch_selected_plate(self, plate_path: str) -> None:
219 self._update_button_states()
220 if self.on_plate_selected and plate_path:
221 self.on_plate_selected(plate_path)
222 logger.debug(f"Selected plate: {plate_path}")
224 def watch_orchestrator_state_version(self, version: int) -> None:
225 """Automatically refresh UI when orchestrator states change."""
226 # Only update UI if widget is properly mounted
227 if not self.is_mounted:
228 return
230 # Force SelectionList to update by calling _update_selection_list
231 # This re-calls format_item_for_display() for all items
232 self._update_selection_list()
234 # CRITICAL: Update main button states when orchestrator states change
235 self._update_button_states()
237 # Also notify PipelineEditor if connected
238 if self.pipeline_editor:
239 logger.debug(f"PlateManager: Notifying PipelineEditor of orchestrator state change (version {version})")
240 self.pipeline_editor._update_button_states()
242 def get_selection_state(self) -> tuple[List[Dict], str]:
243 # Check if widget is properly mounted first
244 if not self.is_mounted:
245 logger.debug("get_selection_state called on unmounted widget")
246 return [], "empty"
248 try:
249 selection_list = self.query_one(f"#{self.list_id}")
250 multi_selected_values = selection_list.selected
251 if multi_selected_values:
252 selected_items = [p for p in self.items if p.get('path') in multi_selected_values]
253 return selected_items, "checkbox"
254 elif self.selected_plate:
255 selected_items = [p for p in self.items if p.get('path') == self.selected_plate]
256 return selected_items, "cursor"
257 else:
258 return [], "empty"
259 except Exception as e:
260 # DOM CORRUPTION DETECTED - This is a critical error
261 stack_trace = ''.join(traceback.format_stack()[-3:-1])
262 logger.error(f"🚨 DOM CORRUPTION: Failed to get selection state: {e}")
263 logger.error(f"🚨 DOM CORRUPTION: Call stack:\n{stack_trace}")
264 logger.error(f"🚨 DOM CORRUPTION: Widget mounted: {self.is_mounted}")
265 logger.error(f"🚨 DOM CORRUPTION: Looking for: #{self.list_id}")
266 logger.error(f"🚨 DOM CORRUPTION: Plates count: {len(self.items)}")
268 # Try to diagnose what widgets actually exist
269 try:
270 all_widgets = list(self.query("*"))
271 widget_ids = [w.id for w in all_widgets if w.id]
272 logger.error(f"🚨 DOM CORRUPTION: Available widget IDs: {widget_ids}")
273 except Exception as diag_e:
274 logger.error(f"🚨 DOM CORRUPTION: Could not diagnose widgets: {diag_e}")
276 if self.selected_plate:
277 selected_items = [p for p in self.items if p.get('path') == self.selected_plate]
278 return selected_items, "cursor"
279 return [], "empty"
    def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
        # NOTE(review): this method is redefined later in this class; the later
        # definition shadows this one at class-creation time, so this body is
        # effectively dead code. Consider removing one of the two definitions.
        count = len(selected_items)
        if count == 0: return f"No items for {operation}"
        if count == 1: return f"{operation.title()} item: {selected_items[0].get('name', 'Unknown')}"
        return f"{operation.title()} {count} items"
287 def _delayed_update_display(self) -> None:
288 """Trigger UI update - no longer needed since reactive system handles this automatically."""
289 # The reactive system now handles updates automatically via watch_plates()
290 # This method is kept for compatibility but does nothing
291 pass
293 def _trigger_ui_refresh(self) -> None:
294 """Force UI refresh when orchestrator state changes without items list changing."""
295 # Increment reactive counter to trigger automatic UI refresh
296 self.orchestrator_state_version += 1
298 def _update_button_states(self) -> None:
299 try:
300 # Check if widget is mounted and buttons exist
301 if not self.is_mounted:
302 return
304 has_selection = bool(self.selected_plate)
305 is_running = self._is_any_plate_running()
307 # Check if there are any selected items (for delete button)
308 selected_items, _ = self.get_selection_state()
309 has_selected_items = bool(selected_items)
311 can_run = has_selection and any(p['path'] in self.plate_compiled_data for p in self.items if p.get('path') == self.selected_plate)
313 # Try to get run button - if it doesn't exist, widget is not fully mounted
314 try:
315 run_button = self.query_one("#run_plate")
316 if is_running:
317 run_button.label = "Stop"
318 run_button.disabled = False
319 else:
320 run_button.label = "Run"
321 run_button.disabled = not can_run
322 except:
323 # Buttons not mounted yet, skip update
324 return
326 self.query_one("#add_plate").disabled = is_running
327 self.query_one("#del_plate").disabled = not self.items or not has_selected_items or is_running
329 # Edit button (config editing) enabled when 1+ orchestrators selected and initialized
330 selected_items, _ = self.get_selection_state()
331 edit_enabled = (
332 len(selected_items) > 0 and
333 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
334 not is_running
335 )
336 self.query_one("#edit_config").disabled = not edit_enabled
338 # Init button - enabled when plates are selected, can be initialized, and not running
339 init_enabled = (
340 len(selected_items) > 0 and
341 any(self._can_orchestrator_be_initialized(item['path']) for item in selected_items) and
342 not is_running
343 )
344 self.query_one("#init_plate").disabled = not init_enabled
346 # Compile button - enabled when plates are selected, initialized, and not running
347 selected_items, _ = self.get_selection_state()
348 compile_enabled = (
349 len(selected_items) > 0 and
350 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
351 not is_running
352 )
353 self.query_one("#compile_plate").disabled = not compile_enabled
355 # Code button - enabled when plates are selected, initialized, and not running
356 code_enabled = (
357 len(selected_items) > 0 and
358 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
359 not is_running
360 )
361 self.query_one("#code_plate").disabled = not code_enabled
363 # Save Python script button - enabled when plates are selected, initialized, and not running
364 save_enabled = (
365 len(selected_items) > 0 and
366 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
367 not is_running
368 )
369 self.query_one("#save_python_script").disabled = not save_enabled
371 # Export button - enabled when plate is initialized and has workspace (HIDDEN FROM UI)
372 # export_enabled = (
373 # has_selection and
374 # self.selected_plate in self.orchestrators and
375 # not is_running
376 # )
377 # try:
378 # self.query_one("#export_ome_zarr").disabled = not export_enabled
379 # except:
380 # pass # Button is hidden from UI
382 # Debug button removed - no longer needed
384 except Exception as e:
385 # Only log if it's not a mounting/unmounting issue
386 if self.is_mounted:
387 logger.debug(f"Button state update skipped (widget not ready): {e}")
388 # Don't log errors during unmounting
390 def _is_any_plate_running(self) -> bool:
391 return self.current_process is not None and self.current_process.poll() is None
393 def _has_pipelines(self, plates: List[Dict]) -> bool:
394 """Check if all plates have pipeline definitions."""
395 if not self.pipeline_editor:
396 return False
398 for plate in plates:
399 pipeline = self.pipeline_editor.get_pipeline_for_plate(plate['path'])
400 if not pipeline:
401 return False
402 return True
404 def get_plate_status(self, plate_path: str) -> str:
405 """Get status for specific plate - now uses orchestrator state."""
406 orchestrator = self.orchestrators.get(plate_path)
407 return get_orchestrator_status_symbol(orchestrator)
409 def _is_orchestrator_initialized(self, plate_path: str) -> bool:
410 """Check if orchestrator exists and is in an initialized state."""
411 orchestrator = self.orchestrators.get(plate_path)
412 if orchestrator is None:
413 return False
414 return orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED,
415 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED,
416 OrchestratorState.EXEC_FAILED]
418 def _can_orchestrator_be_initialized(self, plate_path: str) -> bool:
419 """Check if orchestrator can be initialized (doesn't exist or is in a re-initializable state)."""
420 orchestrator = self.orchestrators.get(plate_path)
421 if orchestrator is None:
422 return True # No orchestrator exists, can be initialized
423 return orchestrator.state in [OrchestratorState.CREATED, OrchestratorState.INIT_FAILED]
425 def _notify_pipeline_editor_status_change(self, plate_path: str, new_status: str) -> None:
426 """Notify pipeline editor when plate status changes (enables Add button immediately)."""
427 if self.pipeline_editor and self.pipeline_editor.current_plate == plate_path:
428 # Update pipeline editor's status and trigger button state update
429 self.pipeline_editor.current_plate_status = new_status
431 def _get_current_pipeline_definition(self, plate_path: str = None) -> List:
432 """Get current pipeline definition from PipelineEditor (now returns FunctionStep objects directly)."""
433 if not self.pipeline_editor:
434 logger.warning("No pipeline editor reference - using empty pipeline")
435 return []
437 # Get pipeline for specific plate or current plate
438 target_plate = plate_path or self.pipeline_editor.current_plate
439 if not target_plate:
440 logger.warning("No plate specified - using empty pipeline")
441 return []
443 # Get pipeline from editor (now returns List[FunctionStep] directly)
444 pipeline_steps = self.pipeline_editor.get_pipeline_for_plate(target_plate)
446 # No conversion needed - pipeline_steps are already FunctionStep objects with memory type decorators
447 return pipeline_steps
449 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
450 """Generate human-readable description of what will be operated on."""
451 count = len(selected_items)
452 if selection_mode == "empty":
453 return f"No items available for {operation}"
454 elif selection_mode == "all":
455 return f"{operation.title()} ALL {count} items"
456 elif selection_mode == "checkbox":
457 if count == 1:
458 item_name = selected_items[0].get('name', 'Unknown')
459 return f"{operation.title()} selected item: {item_name}"
460 else:
461 return f"{operation.title()} {count} selected items"
462 else:
463 return f"{operation.title()} {count} items"
465 def _reset_execution_state(self, status_message: str, force_fail_executing: bool = True):
466 if self.current_process:
467 if self.current_process.poll() is None: # Still running
468 logger.warning("Forcefully terminating subprocess during reset.")
469 self.current_process.terminate()
470 try:
471 self.current_process.wait(timeout=1)
472 except subprocess.TimeoutExpired:
473 self.current_process.kill() # Force kill if terminate fails
474 self.current_process = None
476 # Clear log file reference (no temp files - log file is single source of truth)
477 self.log_file_path = None
478 self.log_file_position = 0
480 # Stop async monitoring
481 self._stop_monitoring()
483 # Only reset executing orchestrators to failed if this is a forced termination
484 # Natural completion should preserve the states set by the completion handler
485 if force_fail_executing:
486 for plate_path, orchestrator in self.orchestrators.items():
487 if orchestrator.state == OrchestratorState.EXECUTING:
488 orchestrator._state = OrchestratorState.EXEC_FAILED
490 # Trigger UI refresh after state changes - this is essential for button states
491 self._trigger_ui_refresh()
493 # Update button states - but only if widget is properly mounted
494 try:
495 if self.is_mounted and hasattr(self, 'query_one'):
496 self._update_button_states()
497 except Exception as e:
498 logger.error(f"Failed to update button states during reset: {e}")
500 self.app.current_status = status_message
502 async def action_run_plate(self) -> None:
503 # Clear logs from singleton toolong window before starting new run
504 try:
505 from openhcs.textual_tui.windows.toolong_window import clear_toolong_logs
506 logger.info("Clearing logs from singleton toolong window before new run")
507 clear_toolong_logs(self.app)
508 logger.info("Toolong logs cleared")
509 except Exception as e:
510 logger.error(f"Failed to clear toolong logs: {e}")
511 import traceback
512 logger.error(traceback.format_exc())
514 selected_items, _ = self.get_selection_state()
515 if not selected_items:
516 self.app.show_error("No plates selected to run.")
517 return
519 ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data]
520 if not ready_items:
521 self.app.show_error("Selected plates are not compiled. Please compile first.")
522 return
524 await self._run_plates_zmq(ready_items)
526 def _start_log_monitoring(self) -> None:
527 """Start reactive log monitoring for subprocess logs."""
528 if not self.log_file_path:
529 logger.warning("Cannot start log monitoring: no log file path")
530 return
532 try:
533 # Extract base path from log file path (remove .log extension)
534 log_path = Path(self.log_file_path)
535 base_log_path = str(log_path.with_suffix(''))
537 # Notify status bar to start log monitoring
538 if hasattr(self.app, 'status_bar') and self.app.status_bar:
539 self.app.status_bar.start_log_monitoring(base_log_path)
540 logger.debug(f"Started reactive log monitoring for: {base_log_path}")
541 else:
542 logger.warning("Status bar not available for log monitoring")
544 except Exception as e:
545 logger.error(f"Failed to start log monitoring: {e}")
547 def _stop_log_monitoring(self) -> None:
548 """Stop reactive log monitoring."""
549 try:
550 # Notify status bar to stop log monitoring
551 if hasattr(self.app, 'status_bar') and self.app.status_bar:
552 self.app.status_bar.stop_log_monitoring()
553 logger.debug("Stopped reactive log monitoring")
554 except Exception as e:
555 logger.error(f"Failed to stop log monitoring: {e}")
557 def _get_current_log_position(self) -> int:
558 """Get current position in log file."""
559 if not self.log_file_path or not Path(self.log_file_path).exists():
560 return 0
561 try:
562 return Path(self.log_file_path).stat().st_size
563 except Exception:
564 return 0
568 def _stop_file_watcher(self) -> None:
569 """Stop file system watcher without blocking."""
570 if not self.file_observer:
571 return
573 try:
574 # Just stop and abandon - don't wait for anything
575 self.file_observer.stop()
576 except Exception:
577 pass # Ignore errors
578 finally:
579 # Always clear references immediately
580 self.file_observer = None
581 self.file_watcher = None
585 async def _start_monitoring(self) -> None:
586 """Start async monitoring using Textual's interval system."""
587 # Stop any existing monitoring
588 self._stop_monitoring()
590 if self.monitoring_active:
591 return
593 self.monitoring_active = True
594 # Use Textual's set_interval for periodic async monitoring
595 self.monitoring_interval = self.set_interval(
596 10.0, # Check every 10 seconds
597 self._check_process_status_async,
598 pause=False
599 )
600 logger.debug("Started async process monitoring")
602 def _stop_monitoring(self) -> None:
603 """Stop async monitoring."""
604 if self.monitoring_interval:
605 self.monitoring_interval.stop()
606 self.monitoring_interval = None
607 self.monitoring_active = False
609 # Also stop log monitoring
610 self._stop_log_monitoring()
612 logger.debug("Stopped async process monitoring")
614 async def _check_process_status_async(self) -> None:
615 """Async process status check - replaces worker thread."""
616 if not self.monitoring_active:
617 return
619 try:
620 # Simple direct access - no threading, no locks needed
621 if not self._is_any_plate_running():
622 logger.debug("🔥 MONITOR: Subprocess finished")
624 # Stop monitoring first
625 self._stop_monitoring()
627 # Handle completion directly - no call_from_thread needed
628 await self._handle_process_completion()
630 except Exception as e:
631 logger.debug(f"Error in async process monitoring: {e}")
632 # Continue monitoring on error
634 async def _handle_process_completion(self) -> None:
635 """Handle subprocess completion - read from log file (single source of truth)."""
636 # Determine success/failure from log file content (single source of truth)
637 success = False
639 if self.log_file_path and Path(self.log_file_path).exists():
640 try:
641 # Read log file directly to check for success markers
642 with open(self.log_file_path, 'r') as f:
643 log_content = f.read()
644 # Look for success markers in the log
645 has_execution_success = "🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content
646 has_all_completed = "All plates completed successfully" in log_content
647 if has_execution_success and has_all_completed:
648 success = True
650 except Exception as e:
651 logger.error(f"Error reading subprocess log file: {e}")
652 success = False
654 # Clean up the subprocess
655 logger.info("🔥 MONITOR: Starting process cleanup...")
656 if self.current_process:
657 try:
658 self.current_process.wait() # Clean up the zombie process
659 logger.info("🔥 MONITOR: Process cleanup completed")
660 except Exception as e:
661 logger.warning(f"🔥 MONITOR: Error during process cleanup: {e}")
663 # Update orchestrator states based on log file analysis (single source of truth)
664 if success:
665 # Success - update orchestrators to completed
666 for plate_path, orchestrator in self.orchestrators.items():
667 if orchestrator.state == OrchestratorState.EXECUTING:
668 orchestrator._state = OrchestratorState.COMPLETED
670 # Reset execution state (this will trigger UI refresh internally)
671 self._reset_execution_state("Execution completed successfully.", force_fail_executing=False)
672 else:
673 # Failure - update orchestrators to failed
674 for plate_path, orchestrator in self.orchestrators.items():
675 if orchestrator.state == OrchestratorState.EXECUTING:
676 orchestrator._state = OrchestratorState.EXEC_FAILED
678 # Reset execution state (this will trigger UI refresh internally)
679 self._reset_execution_state("Execution failed.", force_fail_executing=False)
681 self._stop_monitoring() # Stop monitoring since process is done
683 async def _read_log_file_incremental(self) -> None:
684 """Read new content from the log file since last read."""
685 if not self.log_file_path:
686 self.app.current_status = "🔥 LOG READER: No log file"
687 return
689 try:
690 # Wrap all file I/O operations in executor to avoid blocking UI
691 def _read_log_content():
692 if not Path(self.log_file_path).exists():
693 return None, self.log_file_position
695 with open(self.log_file_path, 'r') as f:
696 # Seek to where we left off
697 f.seek(self.log_file_position)
698 new_content = f.read()
699 # Update position for next read
700 new_position = f.tell()
702 return new_content, new_position
704 new_content, new_position = await asyncio.get_event_loop().run_in_executor(None, _read_log_content)
705 self.log_file_position = new_position
707 if new_content is None:
708 self.app.current_status = "🔥 LOG READER: No log file"
709 return
711 if new_content and new_content.strip():
712 # Get the last non-empty line from new content
713 lines = new_content.strip().split('\n')
714 non_empty_lines = [line.strip() for line in lines if line.strip()]
716 if non_empty_lines:
717 # Show the last line, truncated if too long
718 last_line = non_empty_lines[-1]
719 if len(last_line) > 100:
720 last_line = last_line[:97] + "..."
722 self.app.current_status = last_line
723 else:
724 self.app.current_status = "🔥 LOG READER: No lines found"
725 else:
726 self.app.current_status = "🔥 LOG READER: No new content"
728 except Exception as e:
729 self.app.current_status = f"🔥 LOG READER ERROR: {e}"
733 async def _run_plates_zmq(self, ready_items) -> None:
734 """Run plates using ZMQ execution client (recommended)."""
735 try:
736 from openhcs.runtime.zmq_execution_client import ZMQExecutionClient
738 plate_paths_to_run = [item['path'] for item in ready_items]
739 logger.info(f"Starting ZMQ execution for {len(plate_paths_to_run)} plates")
741 # Create ZMQ client (non-persistent mode for UI-managed execution)
742 self.zmq_client = ZMQExecutionClient(
743 port=7777,
744 persistent=False, # UI manages lifecycle
745 progress_callback=self._on_zmq_progress
746 )
748 # Connect to server (will spawn if needed)
749 def _connect():
750 return self.zmq_client.connect(timeout=15)
752 import asyncio
753 loop = asyncio.get_event_loop()
754 connected = await loop.run_in_executor(None, _connect)
756 if not connected:
757 raise RuntimeError("Failed to connect to ZMQ execution server")
759 logger.info("Connected to ZMQ execution server")
761 # Update orchestrator states to show running state
762 for plate in ready_items:
763 plate_path = plate['path']
764 if plate_path in self.orchestrators:
765 self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING
767 self._trigger_ui_refresh()
768 self.app.current_status = f"Running {len(ready_items)} plate(s) via ZMQ..."
769 self._update_button_states()
771 # Execute each plate
772 for plate_path in plate_paths_to_run:
773 definition_pipeline = self._get_current_pipeline_definition(plate_path)
775 # Get effective config for this plate
776 effective_config = self.app.global_config
777 from openhcs.core.config import PipelineConfig
778 pipeline_config = PipelineConfig()
780 logger.info(f"Executing plate: {plate_path}")
782 # Execute via ZMQ (in executor to avoid blocking UI)
783 def _execute():
784 return self.zmq_client.execute_pipeline(
785 plate_id=str(plate_path),
786 pipeline_steps=definition_pipeline,
787 global_config=effective_config,
788 pipeline_config=pipeline_config
789 )
791 response = await loop.run_in_executor(None, _execute)
793 # Track execution ID for cancellation
794 if response.get('execution_id'):
795 self.current_execution_id = response['execution_id']
797 logger.info(f"Plate {plate_path} execution response: {response.get('status')}")
799 if response.get('status') != 'complete':
800 error_msg = response.get('message', 'Unknown error')
801 logger.error(f"Plate {plate_path} execution failed: {error_msg}")
802 self.app.show_error(f"Execution failed for {plate_path}: {error_msg}")
804 # Execution complete
805 self.current_execution_id = None
806 self.app.current_status = f"Completed {len(ready_items)} plate(s)"
808 # Update orchestrator states
809 for plate in ready_items:
810 plate_path = plate['path']
811 if plate_path in self.orchestrators:
812 self.orchestrators[plate_path]._state = OrchestratorState.EXEC_COMPLETE
814 self._trigger_ui_refresh()
815 self._update_button_states()
817 # Disconnect from server
818 def _disconnect():
819 self.zmq_client.disconnect()
821 await loop.run_in_executor(None, _disconnect)
822 self.zmq_client = None
824 except Exception as e:
825 logger.error(f"Failed to execute plates via ZMQ: {e}", exc_info=True)
826 self.app.show_error(f"Failed to execute: {e}")
827 self.current_execution_id = None
828 self._reset_execution_state("ZMQ execution failed")
830 # Cleanup ZMQ client
831 if hasattr(self, 'zmq_client') and self.zmq_client:
832 try:
833 self.zmq_client.disconnect()
834 except:
835 pass
836 self.zmq_client = None
def _on_zmq_progress(self, message):
    """Relay a progress message from the ZMQ execution server to the TUI status bar."""
    try:
        # Pull the displayed fields, tolerating missing keys in the message.
        well_id, step, status = (
            message.get(key, 'unknown') for key in ('well_id', 'step', 'status')
        )
        progress_text = f"[{well_id}] {step}: {status}"
        # Surface progress in the app status line and mirror it to the debug log.
        self.app.current_status = progress_text
        logger.debug(f"Progress: {progress_text}")
    except Exception as e:
        # Progress updates are best-effort; never let a malformed message crash the UI.
        logger.warning(f"Failed to handle progress update: {e}")
async def action_stop_execution(self) -> None:
    """Stop the currently running execution.

    Two paths, depending on how the execution was launched:
      1. ZMQ server (``self.zmq_client`` set): request graceful cancellation,
         poll the server until the execution disappears or a 5s timeout
         elapses, then disconnect and reset local execution state.
      2. Local subprocess: SIGTERM the subprocess's whole process group,
         wait 1s, then SIGKILL any survivors.
    """
    logger.info("🛑 Stop button pressed.")
    self.app.current_status = "Terminating execution..."

    # Stop async monitoring first
    self._stop_monitoring()

    # Check if using ZMQ execution
    if self.zmq_client:
        try:
            logger.info("🛑 Requesting graceful cancellation via ZMQ...")

            import asyncio
            loop = asyncio.get_event_loop()

            # Cancel specific execution if we have an ID
            if self.current_execution_id:
                logger.info(f"🛑 Cancelling execution {self.current_execution_id}")

                # ZMQ client calls are blocking; run them in the default
                # executor so the TUI event loop stays responsive.
                def _cancel():
                    return self.zmq_client.cancel_execution(self.current_execution_id)

                response = await loop.run_in_executor(None, _cancel)

                if response.get('status') == 'ok':
                    logger.info("🛑 Cancellation request accepted, waiting for graceful shutdown...")
                    self.app.current_status = "Cancellation requested, waiting..."

                    # Wait for graceful cancellation with timeout
                    timeout = 5  # seconds
                    start_time = asyncio.get_event_loop().time()

                    while (asyncio.get_event_loop().time() - start_time) < timeout:
                        # Check if execution is still running
                        def _check_status():
                            return self.zmq_client.get_status(self.current_execution_id)

                        status_response = await loop.run_in_executor(None, _check_status)

                        if status_response.get('status') == 'error':
                            # Execution no longer exists (completed or cancelled)
                            logger.info("🛑 Execution completed/cancelled gracefully")
                            break

                        await asyncio.sleep(0.5)
                    else:
                        # while/else: loop exhausted without break -> timeout reached,
                        # execution may still be running on the server.
                        logger.warning("🛑 Graceful cancellation timeout - execution may still be running")
                        self.app.current_status = "Cancellation timeout - execution may still be running"
                else:
                    logger.warning(f"🛑 Cancellation failed: {response.get('message')}")
                    self.app.current_status = f"Cancellation failed: {response.get('message')}"

            # Disconnect client (blocking, so again via executor)
            def _disconnect():
                self.zmq_client.disconnect()

            await loop.run_in_executor(None, _disconnect)

            self.zmq_client = None
            self.current_execution_id = None
            self._reset_execution_state("Execution cancelled by user")

        except Exception as e:
            logger.error(f"🛑 Error cancelling ZMQ execution: {e}")
            self.app.show_error(f"Failed to cancel execution: {e}")

    elif self.current_process and self.current_process.poll() is None:  # Still running subprocess
        try:
            # Kill the entire process group, not just the parent process
            # The subprocess creates its own process group, so we need to kill that group
            logger.info(f"🛑 Killing process group for PID {self.current_process.pid}...")

            # Get the process group ID (should be same as PID since subprocess calls os.setpgrp())
            process_group_id = self.current_process.pid

            # Kill entire process group (negative PID kills process group)
            os.killpg(process_group_id, signal.SIGTERM)

            # Give processes time to exit gracefully
            await asyncio.sleep(1)

            # Force kill if still alive
            try:
                os.killpg(process_group_id, signal.SIGKILL)
                logger.info(f"🛑 Force killed process group {process_group_id}")
            except ProcessLookupError:
                logger.info(f"🛑 Process group {process_group_id} already terminated")

        except Exception as e:
            logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill")
            # Fallback to killing just the main process
            self.current_process.kill()

        self._reset_execution_state("Execution terminated by user.")
async def action_add_plate(self) -> None:
    """Add one or more plates by letting the user pick directories in the file browser."""
    await self._open_plate_directory_browser()
async def action_export_ome_zarr(self) -> None:
    """Export the currently selected plate to OME-ZARR format."""
    # Guard: a plate must be selected...
    if not self.selected_plate:
        self.app.show_error("No Selection", "Please select a plate to export.")
        return

    # ...and it must already have an initialized orchestrator.
    orchestrator = self.orchestrators.get(self.selected_plate)
    if not orchestrator:
        self.app.show_error("Not Initialized", "Please initialize the plate before exporting.")
        return

    def handle_export_result(selected_paths):
        # The browser may return a single path or a list; normalize to one Path.
        if not selected_paths:
            return
        chosen = selected_paths[0] if isinstance(selected_paths, list) else selected_paths
        self._start_ome_zarr_export(orchestrator, Path(chosen))

    # Let the user pick a destination directory, then kick off the export.
    await self.window_service.open_file_browser(
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.GENERAL),
        backend=Backend.DISK,
        title="Select OME-ZARR Export Directory",
        mode="save",
        selection_mode=SelectionMode.DIRECTORIES_ONLY,
        cache_key=PathCacheKey.GENERAL,
        on_result_callback=handle_export_result,
        caller_id="plate_manager_export"
    )
def _start_ome_zarr_export(self, orchestrator, export_path: Path):
    """Start OME-ZARR export process.

    Schedules a background task that finds processed ``*.tif`` images in the
    plate's workspace (or the plate directory itself for OpenHCS format),
    groups them by well, and writes them to an OME-ZARR store at
    ``export_path / "plate.zarr"``.

    Args:
        orchestrator: Initialized PipelineOrchestrator for the selected plate.
        export_path: Destination directory chosen by the user.
    """
    async def run_export():
        try:
            self.app.current_status = f"Exporting to OME-ZARR: {export_path}"

            # Create export-specific config with ZARR materialization
            from openhcs.core.config import GlobalPipelineConfig
            from openhcs.config_framework.global_config import get_current_global_config
            export_config = get_current_global_config(GlobalPipelineConfig)
            export_vfs_config = VFSConfig(
                intermediate_backend=export_config.vfs_config.intermediate_backend,
                materialization_backend=MaterializationBackend.ZARR
            )

            # BUG FIX: the dataclass field is named ``vfs_config`` (it is read
            # as ``export_config.vfs_config`` above); ``replace(..., vfs=...)``
            # raised TypeError for an unexpected field name.
            # NOTE(review): export_global_config is currently not applied to
            # the orchestrator - confirm whether it should be.
            export_global_config = dataclasses.replace(export_config, vfs_config=export_vfs_config)

            # Create zarr backend with OME-ZARR enabled
            zarr_backend = ZarrStorageBackend(ome_zarr_metadata=True)

            # Copy processed data from current workspace/plate to OME-ZARR format
            # For OpenHCS format, workspace_path is None, so use input_dir (plate path)
            source_path = orchestrator.workspace_path or orchestrator.input_dir
            if source_path and source_path.exists():
                # Find processed images anywhere under the workspace/plate
                processed_images = list(source_path.rglob("*.tif"))

                if processed_images:
                    # Group by well for batch operations
                    wells_data = defaultdict(list)

                    for img_path in processed_images:
                        # Extract the well id from ImageXpress-style names,
                        # e.g. A01_s001_w1_z001.tif -> "A01".
                        match = re.search(r'([A-Z]\d{2})_', img_path.name)
                        if match:
                            wells_data[match.group(1)].append(img_path)

                    # Export each well to OME-ZARR
                    export_store_path = export_path / "plate.zarr"

                    for well_id, well_images in wells_data.items():
                        # Load images; use the context manager so PIL closes
                        # each file handle promptly instead of leaking it.
                        images = []
                        for img_path in well_images:
                            with Image.open(img_path) as img:
                                images.append(np.array(img))

                        # Create output paths for OME-ZARR structure
                        output_paths = [export_store_path / f"{well_id}_{i:03d}.tif"
                                        for i in range(len(images))]

                        # Save to OME-ZARR format
                        zarr_backend.save_batch(images, output_paths, chunk_name=well_id)

                    self.app.current_status = f"✅ OME-ZARR export completed: {export_store_path}"
                else:
                    self.app.show_error("No Data", "No processed images found in workspace.")
            else:
                self.app.show_error("No Workspace", "Plate workspace not found. Run pipeline first.")

        except Exception as e:
            logger.error(f"OME-ZARR export failed: {e}", exc_info=True)
            self.app.show_error("Export Failed", f"OME-ZARR export failed: {str(e)}")

    # Run export in background so the UI stays responsive
    asyncio.create_task(run_export())
1056 # Debug functionality removed - no longer needed
async def _open_plate_directory_browser(self):
    """Open the textual-window file browser for picking plate directories."""
    # Start from the last directory the user imported from (falls back to home).
    initial_path = get_path_cache().get_initial_path(PathCacheKey.PLATE_IMPORT, Path.home())

    # Directory-only, multi-select browser; results arrive via the callback.
    await self.window_service.open_file_browser(
        file_manager=self.filemanager,
        initial_path=initial_path,
        backend=Backend.DISK,
        title="Select Plate Directory",
        mode="load",
        selection_mode=SelectionMode.DIRECTORIES_ONLY,
        cache_key=PathCacheKey.PLATE_IMPORT,
        on_result_callback=self._add_plate_callback,
        caller_id="plate_manager",
        enable_multi_selection=True
    )
def _add_plate_callback(self, selected_paths) -> None:
    """Handle directory selection from the file browser: append new plates to the list."""
    logger.debug(f"_add_plate_callback called with: {selected_paths} (type: {type(selected_paths)})")

    # None/False means the browser was dismissed without a choice.
    if selected_paths is None or selected_paths is False:
        self.app.current_status = "Plate selection cancelled"
        return

    # Normalize to a list so single selections share the same code path.
    if not isinstance(selected_paths, list):
        selected_paths = [selected_paths]

    current_plates = list(self.items)
    known_paths = {plate['path'] for plate in current_plates}
    added_plates = []

    for raw in selected_paths:
        # Coerce whatever the browser returned into a Path.
        path_obj = raw if isinstance(raw, Path) else Path(str(raw))
        path_str = str(path_obj)

        # Skip plates that are already listed (including within this selection).
        if path_str in known_paths:
            continue

        current_plates.append({
            'name': path_obj.name,
            'path': path_str,
            # No status field - state comes from orchestrator
        })
        known_paths.add(path_str)
        added_plates.append(path_obj.name)

    # Remember the parent of the first selection so the next browse starts there.
    if selected_paths:
        first = selected_paths[0] if isinstance(selected_paths[0], Path) else Path(selected_paths[0])
        get_path_cache().set_cached_path(PathCacheKey.PLATE_IMPORT, first.parent)

    # Reassigning the reactive property triggers the automatic UI update.
    self.items = current_plates

    if not added_plates:
        self.app.current_status = "No new plates added (duplicates skipped)"
    elif len(added_plates) == 1:
        self.app.current_status = f"Added plate: {added_plates[0]}"
    else:
        self.app.current_status = f"Added {len(added_plates)} plates: {', '.join(added_plates)}"
def action_delete_plate(self) -> None:
    """Remove the selected plates from the list and drop their orchestrators."""
    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.show_error("No plate selected to delete.")
        return

    doomed = {entry['path'] for entry in selected_items}

    # Keep only the survivors; reassigning the reactive property refreshes the UI.
    self.items = [entry for entry in self.items if entry['path'] not in doomed]

    # Discard orchestrators that belonged to the deleted plates.
    for path in doomed:
        self.orchestrators.pop(path, None)

    # Clear the selection if it pointed at a deleted plate.
    if self.selected_plate in doomed:
        self.selected_plate = ""

    self.app.current_status = f"Deleted {len(doomed)} plate(s)"
async def action_edit_config(self) -> None:
    """
    Handle Edit button - create per-orchestrator PipelineConfig instances.

    This enables per-orchestrator configuration without affecting global configuration.
    Shows resolved defaults from GlobalPipelineConfig with "Pipeline default: {value}" placeholders.
    """
    # BUG FIX: import the editing factory locally, mirroring
    # action_edit_global_config; the name was otherwise unbound here
    # (module-level imports do not include it), raising NameError.
    from openhcs.config_framework.lazy_factory import create_dataclass_for_editing

    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        self.app.current_status = "No orchestrators selected for configuration"
        return

    # Only plates that already have orchestrators can be configured.
    selected_orchestrators = [
        self.orchestrators[item['path']] for item in selected_items
        if item['path'] in self.orchestrators
    ]

    if not selected_orchestrators:
        self.app.current_status = "No initialized orchestrators selected"
        return

    # Load existing config or create new one for editing; the first
    # orchestrator's config seeds the editor for the whole selection.
    representative_orchestrator = selected_orchestrators[0]

    # Use orchestrator's existing config if it exists, otherwise use global config as source
    source_config = representative_orchestrator.pipeline_config or self.global_config

    current_plate_config = create_dataclass_for_editing(PipelineConfig, source_config)

    def handle_config_save(new_config: PipelineConfig) -> None:
        """Apply per-orchestrator configuration without global side effects."""
        for orchestrator in selected_orchestrators:
            # Direct synchronous call - no async needed
            orchestrator.apply_pipeline_config(new_config)
        count = len(selected_orchestrators)
        self.app.current_status = f"Per-orchestrator configuration applied to {count} orchestrator(s)"

    # Open configuration window using PipelineConfig (not GlobalPipelineConfig)
    await self.window_service.open_config_window(
        PipelineConfig,
        current_plate_config,
        on_save_callback=handle_config_save
    )
async def action_edit_global_config(self) -> None:
    """
    Handle global configuration editing - affects all orchestrators.

    This maintains the existing global configuration workflow but uses lazy loading.
    """
    from openhcs.core.config import PipelineConfig
    from openhcs.config_framework.lazy_factory import create_dataclass_for_editing

    # Fall back to a default config when the app does not have one yet.
    current_global_config = self.app.global_config or GlobalPipelineConfig()

    # The lazy wrapper preserves current values while keeping the proper
    # thread-local editing context.
    current_lazy_config = create_dataclass_for_editing(
        PipelineConfig, current_global_config, preserve_values=True
    )

    def handle_global_config_save(new_config: PipelineConfig) -> None:
        """Push the edited configuration to the app and every orchestrator."""
        # Convert lazy PipelineConfig back to GlobalPipelineConfig
        global_config = new_config.to_base_config()
        self.app.global_config = global_config

        # The dual-axis resolver handles context automatically; fan the
        # update out to each orchestrator as a background task.
        for orchestrator in self.orchestrators.values():
            asyncio.create_task(orchestrator.apply_new_global_config(global_config))
        self.app.current_status = "Global configuration applied to all orchestrators"

    await self.window_service.open_config_window(
        PipelineConfig,
        current_lazy_config,
        on_save_callback=handle_global_config_save
    )
def _analyze_orchestrator_configs(self, orchestrators: List['PipelineOrchestrator']) -> Dict[str, Dict[str, Any]]:
    """Analyze configs across multiple orchestrators to detect same/different values.

    Args:
        orchestrators: PipelineOrchestrator instances whose global configs are compared.

    Returns:
        Mapping of field name to either
        ``{"type": "same", "value": v, "default": d}`` when every orchestrator
        agrees, or ``{"type": "different", "values": [...], "default": d}``
        when they diverge.
    """
    if not orchestrators:
        return {}

    # Default values come from signature introspection of the config class.
    param_info = SignatureAnalyzer.analyze(GlobalPipelineConfig)

    analysis: Dict[str, Dict[str, Any]] = {}

    for field in dataclasses.fields(GlobalPipelineConfig):
        name = field.name

        # Collect this field's value from every orchestrator that has it.
        values = []
        for orch in orchestrators:
            try:
                values.append(getattr(orch.global_config, name))
            except AttributeError:
                # Field missing on this particular config; ignore it.
                continue

        if not values:
            continue

        details = param_info.get(name)
        default_value = details.default_value if details else None

        # All values equal to the first one means the field is uniform.
        reference = values[0]
        if all(self._values_equal(v, reference) for v in values):
            analysis[name] = {"type": "same", "value": reference, "default": default_value}
        else:
            analysis[name] = {"type": "different", "values": values, "default": default_value}

    return analysis
1294 def _values_equal(self, val1: Any, val2: Any) -> bool:
1295 """Check if two values are equal, handling dataclasses and complex types."""
1296 # Handle dataclass comparison
1297 if dataclasses.is_dataclass(val1) and dataclasses.is_dataclass(val2):
1298 return dataclasses.asdict(val1) == dataclasses.asdict(val2)
1300 # Handle regular comparison
1301 return val1 == val2
def action_init_plate(self) -> None:
    """Handle Init Plate button - initialize selected plates."""
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        logger.warning("No plates available for initialization")
        return

    # Only plates that are actively executing are blocked; every other state
    # (including all failure states) may be re-initialized.
    busy = [
        item for item in selected_items
        if self.orchestrators.get(item['path']) is not None
        and self.orchestrators[item['path']].state == OrchestratorState.EXECUTING
    ]

    if busy:
        names = [item['name'] for item in busy]
        logger.warning(f"Cannot initialize plates that are currently executing: {', '.join(names)}")
        return

    # Hand off to the background worker.
    self._start_async_init(selected_items, selection_mode)
def _start_async_init(self, selected_items: List[Dict], selection_mode: str) -> None:
    """Kick off the background initialization worker for the given plates."""
    # Log a human-readable description of what is about to happen.
    description = self.get_operation_description(selected_items, selection_mode, "initialize")
    logger.info(f"Initializing: {description}")

    self._init_plates_worker(selected_items)
@work(exclusive=True)
async def _init_plates_worker(self, selected_items: List[Dict]) -> None:
    """Background worker for plate initialization.

    For each selected plate: constructs and initializes a
    PipelineOrchestrator in an executor (so the UI thread stays responsive),
    stores it in ``self.orchestrators``, and refreshes the UI. On failure a
    placeholder orchestrator in INIT_FAILED state is stored so the error is
    visible in the plate list.
    """
    for plate_data in selected_items:
        plate_path = plate_data['path']

        # Find the actual plate in self.items (not the copy from get_selection_state)
        actual_plate = None
        for plate in self.items:
            if plate['path'] == plate_path:
                actual_plate = plate
                break

        if not actual_plate:
            logger.error(f"Plate not found in plates list: {plate_path}")
            continue

        try:
            # Run heavy initialization in executor to avoid blocking UI
            def init_orchestrator():
                return PipelineOrchestrator(
                    plate_path=plate_path,
                    global_config=self.global_config,
                    storage_registry=self.filemanager.registry
                ).initialize()

            orchestrator = await asyncio.get_event_loop().run_in_executor(None, init_orchestrator)

            # Store orchestrator for later use (channel selection, etc.)
            self.orchestrators[plate_path] = orchestrator
            # Orchestrator state is already set to READY by initialize() method
            logger.info(f"Plate {actual_plate['name']} initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize plate {plate_path}: {e}", exc_info=True)
            # Create a failed orchestrator to track the error state
            failed_orchestrator = PipelineOrchestrator(
                plate_path=plate_path,
                global_config=self.global_config,
                storage_registry=self.filemanager.registry
            )
            failed_orchestrator._state = OrchestratorState.INIT_FAILED
            self.orchestrators[plate_path] = failed_orchestrator
            actual_plate['error'] = str(e)

        # Trigger UI refresh after orchestrator state changes
        self._trigger_ui_refresh()
        # Update button states immediately (reactive system handles UI updates automatically)
        self._update_button_states()
        # Notify pipeline editor of status change
        status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
        self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
        logger.debug(f"Updated plate {actual_plate['name']} status")

    # Final UI update (reactive system handles this automatically when self.items is modified)
    self._update_button_states()

    # Summarize outcomes by counting orchestrators in READY vs INIT_FAILED state
    success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.READY])
    error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.INIT_FAILED])

    if error_count == 0:
        logger.info(f"Successfully initialized {success_count} plates")
    else:
        logger.warning(f"Initialized {success_count} plates, {error_count} errors")
def action_compile_plate(self) -> None:
    """Handle Compile Plate button - compile pipelines for selected plates."""
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        logger.warning("No plates available for compilation")
        return

    # States from which (re)compilation is permitted: ready, previously
    # failed (compile or exec), already compiled, or completed.
    compilable_states = [
        OrchestratorState.READY,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
    ]

    not_ready = [
        item for item in selected_items
        if self.orchestrators.get(item['path']) is None
        or self.orchestrators[item['path']].state not in compilable_states
    ]

    if not_ready:
        names = [item['name'] for item in not_ready]
        # Choose the most informative message for the blocking condition.
        if any(self.orchestrators.get(item['path']) is None for item in not_ready):
            logger.warning(f"Cannot compile plates that haven't been initialized: {', '.join(names)}")
        elif any(self.orchestrators.get(item['path']).state == OrchestratorState.EXECUTING for item in not_ready):
            logger.warning(f"Cannot compile plates that are currently executing: {', '.join(names)}")
        else:
            logger.warning(f"Cannot compile plates in current state: {', '.join(names)}")
        return

    # Every selected plate also needs a pipeline definition.
    no_pipeline = [
        item for item in selected_items
        if not self._get_current_pipeline_definition(item['path'])
    ]

    if no_pipeline:
        names = [item['name'] for item in no_pipeline]
        self.app.current_status = f"Cannot compile plates without pipelines: {', '.join(names)}"
        return

    # Hand off to the background worker.
    self._start_async_compile(selected_items, selection_mode)
def _start_async_compile(self, selected_items: List[Dict], selection_mode: str) -> None:
    """Kick off the background compilation worker for the given plates."""
    # Log a human-readable description of what is about to happen.
    description = self.get_operation_description(selected_items, selection_mode, "compile")
    logger.info(f"Compiling: {description}")

    self._compile_plates_worker(selected_items)
@work(exclusive=True)
async def _compile_plates_worker(self, selected_items: List[Dict]) -> None:
    """Background worker for plate compilation.

    For each selected plate: ensures an initialized orchestrator exists,
    deep-copies the plate's definition pipeline into an execution pipeline
    (re-stamping step ids and repairing ``variable_components``), compiles
    it against the plate's wells in an executor, and stores the result in
    ``self.plate_compiled_data`` keyed by plate path.
    """
    for plate_data in selected_items:
        plate_path = plate_data['path']

        # Find the actual plate in self.items (not the copy from get_selection_state)
        actual_plate = None
        for plate in self.items:
            if plate['path'] == plate_path:
                actual_plate = plate
                break

        if not actual_plate:
            logger.error(f"Plate not found in plates list: {plate_path}")
            continue

        # Get definition pipeline and make fresh copy
        definition_pipeline = self._get_current_pipeline_definition(plate_path)
        if not definition_pipeline:
            logger.warning(f"No pipeline defined for {actual_plate['name']}, using empty pipeline")
            definition_pipeline = []

        try:
            # Get or create orchestrator for compilation (run in executor to avoid blocking)
            def get_or_create_orchestrator():
                if plate_path in self.orchestrators:
                    orchestrator = self.orchestrators[plate_path]
                    if not orchestrator.is_initialized():
                        orchestrator.initialize()
                    return orchestrator
                else:
                    return PipelineOrchestrator(
                        plate_path=plate_path,
                        global_config=self.global_config,
                        storage_registry=self.filemanager.registry
                    ).initialize()

            orchestrator = await asyncio.get_event_loop().run_in_executor(None, get_or_create_orchestrator)
            self.orchestrators[plate_path] = orchestrator

            # Make fresh copy for compilation so the definition pipeline
            # stays untouched for future recompiles
            execution_pipeline = copy.deepcopy(definition_pipeline)

            # Fix step IDs after deep copy to match new object IDs
            for step in execution_pipeline:
                step.step_id = str(id(step))
                # Ensure variable_components is never None - use FunctionStep default
                if step.variable_components is None:
                    logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]
                # Also ensure it's not an empty list
                elif not step.variable_components:
                    logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]

            # Get wells and compile (async - run in executor to avoid blocking UI)
            # Wrap in Pipeline object like test_main.py does
            pipeline_obj = Pipeline(steps=execution_pipeline)

            # Run heavy operations in executor to avoid blocking UI
            # Get wells using multiprocessing axis (WELL in default config)
            from openhcs.constants import MULTIPROCESSING_AXIS
            wells = await asyncio.get_event_loop().run_in_executor(None, lambda: orchestrator.get_component_keys(MULTIPROCESSING_AXIS))
            compiled_contexts = await asyncio.get_event_loop().run_in_executor(
                None, orchestrator.compile_pipelines, pipeline_obj.steps, wells
            )

            # Store state simply - no reactive property issues
            step_ids_in_pipeline = [id(step) for step in execution_pipeline]
            # Get step IDs from contexts (ProcessingContext objects) for the debug log below
            first_well_key = list(compiled_contexts.keys())[0] if compiled_contexts else None
            step_ids_in_contexts = list(compiled_contexts[first_well_key].step_plans.keys()) if first_well_key and hasattr(compiled_contexts[first_well_key], 'step_plans') else []
            logger.info(f"🔥 Storing compiled data for {plate_path}: pipeline={type(execution_pipeline)}, contexts={type(compiled_contexts)}")
            logger.info(f"🔥 Step IDs in pipeline: {step_ids_in_pipeline}")
            logger.info(f"🔥 Step IDs in contexts: {step_ids_in_contexts}")
            self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts)
            logger.info(f"🔥 Stored! Available compiled plates: {list(self.plate_compiled_data.keys())}")

            # Orchestrator state is already set to COMPILED by compile_pipelines() method
            logger.info(f"🔥 Successfully compiled {plate_path}")

        except Exception as e:
            logger.error(f"🔥 COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True)
            # Orchestrator state is already set to FAILED by compile_pipelines() method
            actual_plate['error'] = str(e)
            # Don't store anything in plate_compiled_data on failure

        # Trigger UI refresh after orchestrator state changes
        self._trigger_ui_refresh()
        # Update button states immediately (reactive system handles UI updates automatically)
        self._update_button_states()
        # Notify pipeline editor of status change
        status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
        self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)

    # Final UI update (reactive system handles this automatically when self.items is modified)
    self._update_button_states()

    # Summarize outcomes by counting orchestrators in COMPILED vs COMPILE_FAILED state
    success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILED])
    error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILE_FAILED])

    if error_count == 0:
        logger.info(f"Successfully compiled {success_count} plates")
    else:
        logger.warning(f"Compiled {success_count} plates, {error_count} errors")
async def action_code_plate(self) -> None:
    """Generate Python code for selected plates and their pipelines.

    Builds an editable orchestrator script from the selected plates'
    pipeline definitions, opens it in a terminal editor, and when the user
    saves, ``exec``s the edited script and applies whichever of
    ``pipeline_data`` / ``global_config`` / ``orchestrators`` it defines
    back to the TUI state.
    """
    logger.debug("Code button pressed - generating Python code for plates")

    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.current_status = "No plates selected for code generation"
        return

    try:
        # Get pipeline data for selected plates
        plate_paths = [item['path'] for item in selected_items]
        pipeline_data = {}

        # Collect pipeline steps for each plate (empty list when the
        # pipeline editor is absent or has no pipeline for the plate)
        for plate_path in plate_paths:
            if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
                # Get pipeline steps from pipeline editor if available
                if plate_path in self.pipeline_editor.plate_pipelines:
                    pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines[plate_path]
                else:
                    pipeline_data[plate_path] = []
            else:
                pipeline_data[plate_path] = []

        # Use existing pickle_to_python logic to generate complete script
        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Create data structure like pickle_to_python expects
        data = {
            'plate_paths': plate_paths,
            'pipeline_data': pipeline_data,
            'global_config': self.app.global_config
        }

        # Extract variables from data dict
        plate_paths = data['plate_paths']
        pipeline_data = data['pipeline_data']

        # Generate just the orchestrator configuration (no execution wrapper)
        from openhcs.debug.pickle_to_python import generate_complete_orchestrator_code

        python_code = generate_complete_orchestrator_code(
            plate_paths=plate_paths,
            pipeline_data=pipeline_data,
            global_config=self.app.global_config,
            clean_mode=True  # Default to clean mode - only show non-default values
        )

        # Create callback to handle edited code
        def handle_edited_code(edited_code: str):
            """Exec the user-edited script and merge its assignments into TUI state."""
            logger.debug("Orchestrator code edited, processing changes...")
            try:
                # Execute the code (it has all necessary imports).
                # NOTE: exec of user-edited code is intentional (the user just
                # authored it in the editor), but it runs with full privileges.
                namespace = {}
                exec(edited_code, namespace)

                # Update pipeline data if present (composition: orchestrator contains pipelines)
                if 'pipeline_data' in namespace:
                    new_pipeline_data = namespace['pipeline_data']
                    # Update pipeline editor using reactive system (like pipeline code button does)
                    if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
                        # Update plate pipelines storage
                        current_pipelines = dict(self.pipeline_editor.plate_pipelines)
                        current_pipelines.update(new_pipeline_data)
                        self.pipeline_editor.plate_pipelines = current_pipelines

                        # If current plate is in the edited data, update the current view too
                        current_plate = self.pipeline_editor.current_plate
                        if current_plate and current_plate in new_pipeline_data:
                            self.pipeline_editor.pipeline_steps = new_pipeline_data[current_plate]

                    self.app.current_status = f"Pipeline data updated for {len(new_pipeline_data)} plates"

                # Update global config if present
                elif 'global_config' in namespace:
                    new_global_config = namespace['global_config']
                    import asyncio
                    # Fan the new config out to the selected plates' orchestrators
                    for plate_path in plate_paths:
                        if plate_path in self.orchestrators:
                            orchestrator = self.orchestrators[plate_path]
                            asyncio.create_task(orchestrator.apply_new_global_config(new_global_config))
                    self.app.current_status = f"Global config updated for {len(plate_paths)} plates"

                # Update orchestrators list if present
                elif 'orchestrators' in namespace:
                    new_orchestrators = namespace['orchestrators']
                    self.app.current_status = f"Orchestrator list updated with {len(new_orchestrators)} orchestrators"

                else:
                    self.app.show_error("Parse Error", "No valid assignments found in edited code")

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                import traceback
                full_traceback = traceback.format_exc()
                logger.error(f"Failed to parse edited orchestrator code: {e}\nFull traceback:\n{full_traceback}")
                self.app.show_error("Edit Error", f"Failed to parse orchestrator code: {str(e)}\n\nFull traceback:\n{full_traceback}")

        # Launch terminal editor with the generated script
        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code
        )

    except Exception as e:
        logger.error(f"Failed to generate plate code: {e}")
        self.app.current_status = f"Failed to generate code: {e}"
1677 async def action_save_python_script(self) -> None:
1678 """Save Python script for selected plates (like special_io_pipeline.py)."""
1679 logger.debug("Save button pressed - saving Python script for plates")
1681 selected_items, _ = self.get_selection_state()
1682 if not selected_items:
1683 self.app.current_status = "No plates selected for script generation"
1684 return
1686 try:
1687 # Get pipeline data for selected plates
1688 plate_paths = [item['path'] for item in selected_items]
1689 pipeline_data = {}
1691 # Collect pipeline steps for each plate
1692 for plate_path in plate_paths:
1693 if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
1694 # Get pipeline steps from pipeline editor if available
1695 if plate_path in self.pipeline_editor.plate_pipelines:
1696 pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines[plate_path]
1697 else:
1698 pipeline_data[plate_path] = []
1699 else:
1700 pipeline_data[plate_path] = []
1702 # Create data structure like pickle_to_python expects
1703 data = {
1704 'plate_paths': plate_paths,
1705 'pipeline_data': pipeline_data,
1706 'global_config': self.app.global_config
1707 }
1709 # Generate complete executable Python script using pickle_to_python logic
1710 python_code = self._generate_executable_script(data)
1712 # Launch file browser to save the script
1713 from openhcs.textual_tui.windows.file_browser_window import open_file_browser_window, BrowserMode
1714 from openhcs.textual_tui.services.file_browser_service import SelectionMode
1715 from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
1716 from openhcs.constants.constants import Backend
1718 def handle_save_result(result):
1719 if result:
1720 # Handle both single Path and list of Paths
1721 save_path = None
1722 if isinstance(result, Path):
1723 save_path = result
1724 elif isinstance(result, list) and len(result) > 0:
1725 save_path = result[0] # Take first path
1727 if save_path:
1728 try:
1729 # Write the Python script to the selected file
1730 with open(save_path, 'w') as f:
1731 f.write(python_code)
1733 logger.info(f"Python script saved to: {save_path}")
1734 self.app.current_status = f"Python script saved to: {save_path}"
1735 except Exception as e:
1736 logger.error(f"Failed to save Python script: {e}")
1737 self.app.current_status = f"Failed to save script: {e}"
1739 # Generate default filename based on first plate
1740 first_plate_name = Path(plate_paths[0]).name if plate_paths else "pipeline"
1741 default_filename = f"{first_plate_name}_pipeline.py"
1743 await open_file_browser_window(
1744 app=self.app,
1745 file_manager=self.app.filemanager,
1746 initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
1747 backend=Backend.DISK,
1748 title="Save Python Pipeline Script",
1749 mode=BrowserMode.SAVE,
1750 selection_mode=SelectionMode.FILES_ONLY,
1751 filter_extensions=['.py'],
1752 default_filename=default_filename,
1753 cache_key=PathCacheKey.PIPELINE_FILES,
1754 on_result_callback=handle_save_result,
1755 caller_id="plate_manager_save_script"
1756 )
1758 except Exception as e:
1759 logger.error(f"Failed to save Python script: {e}")
1760 self.app.current_status = f"Failed to save script: {e}"
1762 def _generate_executable_script(self, data: Dict) -> str:
1763 """Generate fully executable Python script by creating a temp pickle and using existing convert_pickle_to_python."""
1764 import tempfile
1765 import dill as pickle
1766 from openhcs.debug.pickle_to_python import convert_pickle_to_python
1768 # Create temporary pickle file
1769 with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as temp_pickle:
1770 pickle.dump(data, temp_pickle)
1771 temp_pickle_path = temp_pickle.name
1773 # Create temporary output file
1774 with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_output:
1775 temp_output_path = temp_output.name
1777 try:
1778 # Use existing convert_pickle_to_python function
1779 convert_pickle_to_python(temp_pickle_path, temp_output_path)
1781 # Read the generated script
1782 with open(temp_output_path, 'r') as f:
1783 script_content = f.read()
1785 return script_content
1787 finally:
1788 # Clean up temp files
1789 import os
1790 try:
1791 os.unlink(temp_pickle_path)
1792 os.unlink(temp_output_path)
1793 except:
1794 pass