# openhcs/textual_tui/widgets/plate_manager.py (coverage: 0.0% of 940 statements)
1"""
2PlateManagerWidget for OpenHCS Textual TUI
4Plate management widget with complete button set and reactive state management.
5Matches the functionality from the current prompt-toolkit TUI.
6"""
8import asyncio
9import copy
10import dataclasses
11import inspect
12import json
13import logging
14import numpy as np
15import os
16import pickle
17import re
18import signal
19import stat
20import subprocess
21import sys
22import tempfile
23import time
24import traceback
25from collections import defaultdict
26from datetime import datetime
27from pathlib import Path
28from typing import Dict, List, Optional, Callable, Any, Tuple
30from openhcs.core.config import PipelineConfig
32from PIL import Image
33from textual.app import ComposeResult
34from textual.containers import Horizontal, ScrollableContainer
35from textual.reactive import reactive
36from textual.widgets import Button, Static, SelectionList
37from textual.widget import Widget
38from textual.css.query import NoMatches
39from .button_list_widget import ButtonListWidget, ButtonConfig
40from textual import work, on
42from openhcs.core.config import GlobalPipelineConfig, VFSConfig, MaterializationBackend
43from openhcs.core.pipeline import Pipeline
44from openhcs.io.filemanager import FileManager
45from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
46from openhcs.io.base import storage_registry
47from openhcs.io.memory import MemoryStorageBackend
48from openhcs.io.zarr import ZarrStorageBackend
49from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
50from openhcs.constants.constants import GroupBy, Backend, VariableComponents, OrchestratorState
51from openhcs.textual_tui.services.file_browser_service import SelectionMode
52from openhcs.textual_tui.services.window_service import WindowService
53from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey, get_path_cache
54from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer
56logger = logging.getLogger(__name__)

# Note: Using subprocess approach instead of multiprocessing to avoid TUI FD conflicts
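# The worker is launched as:
#     python subprocess_runner.py <data_file.pkl> <log_file_base> <unique_id>
# (see action_run_plate below). It writes to "<log_file_base>_<unique_id>.log",
# which this widget treats as the single source of truth for progress and results.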


def get_orchestrator_status_symbol(orchestrator: PipelineOrchestrator) -> str:
    """Get UI symbol for orchestrator state - simple mapping without over-engineering."""
    if orchestrator is None:
        return "?"  # No orchestrator (newly added plate)

    state_to_symbol = {
        OrchestratorState.CREATED: "?",         # Created but not initialized
        OrchestratorState.READY: "-",           # Initialized, ready for compilation
        OrchestratorState.COMPILED: "o",        # Compiled, ready for execution
        OrchestratorState.EXECUTING: "!",       # Execution in progress
        OrchestratorState.COMPLETED: "C",       # Execution completed successfully
        OrchestratorState.INIT_FAILED: "I",     # Initialization failed
        OrchestratorState.COMPILE_FAILED: "P",  # Compilation failed (P for Pipeline)
        OrchestratorState.EXEC_FAILED: "X",     # Execution failed
    }

    return state_to_symbol.get(orchestrator.state, "?")
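
# Typical state progression driven by this widget:
#   CREATED -> READY (Init) -> COMPILED (Compile) -> EXECUTING (Run) -> COMPLETED,
# with INIT_FAILED / COMPILE_FAILED / EXEC_FAILED as the failure branches of the
# corresponding transitions.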


def get_current_log_file_path() -> str:
    """Get the current log file path from the logging system."""
    try:
        # Get the root logger and find the FileHandler
        root_logger = logging.getLogger()
        for handler in root_logger.handlers:
            if isinstance(handler, logging.FileHandler):
                return handler.baseFilename

        # Fallback: try to get it from the openhcs logger
        openhcs_logger = logging.getLogger("openhcs")
        for handler in openhcs_logger.handlers:
            if isinstance(handler, logging.FileHandler):
                return handler.baseFilename

        # Last resort: create a default path
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        return str(log_dir / f"openhcs_subprocess_{int(time.time())}.log")

    except Exception as e:
        logger.warning(f"Could not determine log file path: {e}")
        return "/tmp/openhcs_subprocess.log"


class PlateManagerWidget(ButtonListWidget):
    """
    Plate management widget using Textual reactive state.
    """

    # Semantic reactive properties (like PipelineEditor's pipeline_steps)
    selected_plate = reactive("")
    orchestrators = reactive({})
    plate_configs = reactive({})
    orchestrator_state_version = reactive(0)  # Increment to trigger UI refresh

    def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
        button_configs = [
            ButtonConfig("Add", "add_plate"),
            ButtonConfig("Del", "del_plate", disabled=True),
            ButtonConfig("Edit", "edit_config", disabled=True),  # Unified edit button for config editing
            ButtonConfig("Init", "init_plate", disabled=True),
            ButtonConfig("Compile", "compile_plate", disabled=True),
            ButtonConfig("Run", "run_plate", disabled=True),
            ButtonConfig("Code", "code_plate", disabled=True),  # Generate Python code
            ButtonConfig("Save", "save_python_script", disabled=True),  # Save Python script
            # ButtonConfig("Export", "export_ome_zarr", disabled=True),  # Export to OME-ZARR - HIDDEN FROM UI
        ]
        super().__init__(
            button_configs=button_configs,
            list_id="plate_content",
            container_id="plate_list",
            on_button_pressed=self._handle_button_press,
            on_selection_changed=self._handle_selection_change,
            on_item_moved=self._handle_item_moved
        )
        self.filemanager = filemanager
        self.global_config = global_config
        self.plate_compiled_data = {}
        self.on_plate_selected: Optional[Callable[[str], None]] = None
        self.pipeline_editor: Optional['PipelineEditorWidget'] = None

        # Window service is created in on_mount to avoid circular imports
        self.window_service = None

        # --- Subprocess architecture ---
        self.current_process: Optional[subprocess.Popen] = None
        self.log_file_path: Optional[str] = None  # Single source of truth
        self.log_file_position: int = 0  # Track position in the log file for incremental reading
        # Async monitoring using Textual's interval system
        self.monitoring_interval = None
        self.monitoring_active = False
        # ---

        logger.debug("PlateManagerWidget initialized")

    def on_unmount(self) -> None:
        logger.debug("Unmounting PlateManagerWidget, ensuring worker process is terminated.")
        # on_unmount is sync, so schedule the async stop as a task
        if self.current_process and self.current_process.poll() is None:
            asyncio.create_task(self.action_stop_execution())
        self._stop_monitoring()

    def format_item_for_display(self, plate: Dict) -> Tuple[str, str]:
        # Get status from the orchestrator instead of a magic string
        plate_path = plate.get('path', '')
        orchestrator = self.orchestrators.get(plate_path)
        status_symbol = get_orchestrator_status_symbol(orchestrator)

        status_symbols = {
            "?": "➕",  # Created (not initialized)
            "-": "✅",  # Ready (initialized)
            "o": "⚡",  # Compiled
            "!": "🔄",  # Executing
            "C": "🏁",  # Completed
            "I": "🚫",  # Init failed
            "P": "💥",  # Compile failed (Pipeline)
            "X": "❌",  # Execution failed
        }
        status_icon = status_symbols.get(status_symbol, "❓")
        plate_name = plate.get('name', 'Unknown')
        display_text = f"{status_icon} {plate_name} - {plate_path}"
        return display_text, plate_path
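
    # format_item_for_display returns, e.g.:
    #   ("✅ MyPlate - /data/plates/MyPlate", "/data/plates/MyPlate")
    # where the second element is the SelectionList value later consulted by
    # get_selection_state().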

    async def _handle_button_press(self, button_id: str) -> None:
        action_map = {
            "add_plate": self.action_add_plate,
            "del_plate": self.action_delete_plate,
            "edit_config": self.action_edit_config,  # Unified edit button
            "init_plate": self.action_init_plate,
            "compile_plate": self.action_compile_plate,
            "code_plate": self.action_code_plate,  # Generate Python code
            "save_python_script": self.action_save_python_script,  # Save Python script
            # "export_ome_zarr": self.action_export_ome_zarr,  # HIDDEN
        }
        if button_id in action_map:
            action = action_map[button_id]
            if inspect.iscoroutinefunction(action):
                await action()
            else:
                action()
        elif button_id == "run_plate":
            if self._is_any_plate_running():
                await self.action_stop_execution()
            else:
                await self.action_run_plate()

    def _handle_selection_change(self, selected_values: List[str]) -> None:
        logger.debug(f"Checkmarks changed: {len(selected_values)} items selected")

    def _handle_item_moved(self, from_index: int, to_index: int) -> None:
        current_plates = list(self.items)
        plate = current_plates.pop(from_index)
        current_plates.insert(to_index, plate)
        self.items = current_plates
        plate_name = plate['name']
        direction = "up" if to_index < from_index else "down"
        self.app.current_status = f"Moved plate '{plate_name}' {direction}"

    def on_mount(self) -> None:
        # Initialize the window service
        self.window_service = WindowService(self.app)

        self.call_later(self._delayed_update_display)
        self.call_later(self._update_button_states)

    def watch_items(self, items: List[Dict]) -> None:
        """Automatically update UI when items changes (follows ButtonListWidget pattern)."""
        # DEBUG: Log when the items list changes to track the source of any reset
        stack_trace = ''.join(traceback.format_stack()[-3:-1])  # Last 2 stack frames
        logger.debug(f"🔍 ITEMS CHANGED: {len(items)} plates. Call stack:\n{stack_trace}")

        # CRITICAL: Call parent's watch_items to update the SelectionList
        super().watch_items(items)

        logger.debug(f"Plates updated: {len(items)} plates")
        self._update_button_states()

    def watch_highlighted_item(self, plate_path: str) -> None:
        self.selected_plate = plate_path
        logger.debug(f"Highlighted plate: {plate_path}")

    def watch_selected_plate(self, plate_path: str) -> None:
        self._update_button_states()
        if self.on_plate_selected and plate_path:
            self.on_plate_selected(plate_path)
        logger.debug(f"Selected plate: {plate_path}")

    def watch_orchestrator_state_version(self, version: int) -> None:
        """Automatically refresh UI when orchestrator states change."""
        # Only update UI if the widget is properly mounted
        if not self.is_mounted:
            return

        # Force the SelectionList to update by calling _update_selection_list,
        # which re-calls format_item_for_display() for all items
        self._update_selection_list()

        # CRITICAL: Update main button states when orchestrator states change
        self._update_button_states()

        # Also notify the PipelineEditor if connected
        if self.pipeline_editor:
            logger.debug(f"PlateManager: Notifying PipelineEditor of orchestrator state change (version {version})")
            self.pipeline_editor._update_button_states()

    def get_selection_state(self) -> tuple[List[Dict], str]:
        # Check if the widget is properly mounted first
        if not self.is_mounted:
            logger.debug("get_selection_state called on unmounted widget")
            return [], "empty"

        try:
            selection_list = self.query_one(f"#{self.list_id}")
            multi_selected_values = selection_list.selected
            if multi_selected_values:
                selected_items = [p for p in self.items if p.get('path') in multi_selected_values]
                return selected_items, "checkbox"
            elif self.selected_plate:
                selected_items = [p for p in self.items if p.get('path') == self.selected_plate]
                return selected_items, "cursor"
            else:
                return [], "empty"
        except Exception as e:
            # DOM CORRUPTION DETECTED - this is a critical error
            stack_trace = ''.join(traceback.format_stack()[-3:-1])
            logger.error(f"🚨 DOM CORRUPTION: Failed to get selection state: {e}")
            logger.error(f"🚨 DOM CORRUPTION: Call stack:\n{stack_trace}")
            logger.error(f"🚨 DOM CORRUPTION: Widget mounted: {self.is_mounted}")
            logger.error(f"🚨 DOM CORRUPTION: Looking for: #{self.list_id}")
            logger.error(f"🚨 DOM CORRUPTION: Plates count: {len(self.items)}")

            # Try to diagnose which widgets actually exist
            try:
                all_widgets = list(self.query("*"))
                widget_ids = [w.id for w in all_widgets if w.id]
                logger.error(f"🚨 DOM CORRUPTION: Available widget IDs: {widget_ids}")
            except Exception as diag_e:
                logger.error(f"🚨 DOM CORRUPTION: Could not diagnose widgets: {diag_e}")

            if self.selected_plate:
                selected_items = [p for p in self.items if p.get('path') == self.selected_plate]
                return selected_items, "cursor"
            return [], "empty"
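
    # Selection precedence: checkbox selections (SelectionList.selected) win over
    # the cursor highlight (self.selected_plate), which wins over nothing; the
    # returned mode string ("checkbox" | "cursor" | "empty") tells callers which
    # rule applied.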

    def _delayed_update_display(self) -> None:
        """No-op: the reactive system now handles display updates automatically."""
        # Updates happen via watch_items(); this method is kept for compatibility.
        pass

    def _trigger_ui_refresh(self) -> None:
        """Force UI refresh when orchestrator state changes without the items list changing."""
        # Incrementing the reactive counter triggers watch_orchestrator_state_version()
        self.orchestrator_state_version += 1

    def _update_button_states(self) -> None:
        try:
            # Check if the widget is mounted and buttons exist
            if not self.is_mounted:
                return

            has_selection = bool(self.selected_plate)
            is_running = self._is_any_plate_running()

            # Check if there are any selected items (for the delete button)
            selected_items, _ = self.get_selection_state()
            has_selected_items = bool(selected_items)

            can_run = has_selection and any(
                p['path'] in self.plate_compiled_data
                for p in self.items if p.get('path') == self.selected_plate
            )

            # Try to get the run button - if it doesn't exist, the widget is not fully mounted
            try:
                run_button = self.query_one("#run_plate")
                if is_running:
                    run_button.label = "Stop"
                    run_button.disabled = False
                else:
                    run_button.label = "Run"
                    run_button.disabled = not can_run
            except NoMatches:
                # Buttons not mounted yet, skip the update
                return

            self.query_one("#add_plate").disabled = is_running
            self.query_one("#del_plate").disabled = not self.items or not has_selected_items or is_running

            # Edit (config editing), Compile, Code, and Save share the same precondition:
            # 1+ plates selected, all of them initialized, and nothing currently running
            all_initialized = (
                len(selected_items) > 0 and
                all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
                not is_running
            )
            self.query_one("#edit_config").disabled = not all_initialized
            self.query_one("#compile_plate").disabled = not all_initialized
            self.query_one("#code_plate").disabled = not all_initialized
            self.query_one("#save_python_script").disabled = not all_initialized

            # Init button - enabled when plates are selected, can be initialized, and not running
            init_enabled = (
                len(selected_items) > 0 and
                any(self._can_orchestrator_be_initialized(item['path']) for item in selected_items) and
                not is_running
            )
            self.query_one("#init_plate").disabled = not init_enabled

            # Export button - enabled when plate is initialized and has workspace (HIDDEN FROM UI)
            # export_enabled = (
            #     has_selection and
            #     self.selected_plate in self.orchestrators and
            #     not is_running
            # )
            # try:
            #     self.query_one("#export_ome_zarr").disabled = not export_enabled
            # except NoMatches:
            #     pass  # Button is hidden from UI

        except Exception as e:
            # Only log if it's not a mounting/unmounting issue
            if self.is_mounted:
                logger.debug(f"Button state update skipped (widget not ready): {e}")
            # Don't log errors during unmounting

    def _is_any_plate_running(self) -> bool:
        return self.current_process is not None and self.current_process.poll() is None
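
    # Popen.poll() returns None while the child is still alive, so this reads as
    # "a worker subprocess exists and has not yet exited".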

    def _has_pipelines(self, plates: List[Dict]) -> bool:
        """Check if all plates have pipeline definitions."""
        if not self.pipeline_editor:
            return False

        for plate in plates:
            pipeline = self.pipeline_editor.get_pipeline_for_plate(plate['path'])
            if not pipeline:
                return False
        return True

    def get_plate_status(self, plate_path: str) -> str:
        """Get status for a specific plate - now uses orchestrator state."""
        orchestrator = self.orchestrators.get(plate_path)
        return get_orchestrator_status_symbol(orchestrator)

    def _is_orchestrator_initialized(self, plate_path: str) -> bool:
        """Check if an orchestrator exists and is in an initialized state."""
        orchestrator = self.orchestrators.get(plate_path)
        if orchestrator is None:
            return False
        return orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED,
                                      OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED,
                                      OrchestratorState.EXEC_FAILED]
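
    # COMPILE_FAILED and EXEC_FAILED deliberately count as "initialized": plates in
    # those states can still be edited and recompiled (see action_compile_plate)
    # without being re-initialized first.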

    def _can_orchestrator_be_initialized(self, plate_path: str) -> bool:
        """Check if an orchestrator can be initialized (doesn't exist or is in a re-initializable state)."""
        orchestrator = self.orchestrators.get(plate_path)
        if orchestrator is None:
            return True  # No orchestrator exists yet, so it can be initialized
        return orchestrator.state in [OrchestratorState.CREATED, OrchestratorState.INIT_FAILED]

    def _notify_pipeline_editor_status_change(self, plate_path: str, new_status: str) -> None:
        """Notify the pipeline editor when a plate's status changes (enables its Add button immediately)."""
        if self.pipeline_editor and self.pipeline_editor.current_plate == plate_path:
            # Update the pipeline editor's status and trigger its button state update
            self.pipeline_editor.current_plate_status = new_status

    def _get_current_pipeline_definition(self, plate_path: Optional[str] = None) -> List:
        """Get the current pipeline definition from the PipelineEditor (returns FunctionStep objects directly)."""
        if not self.pipeline_editor:
            logger.warning("No pipeline editor reference - using empty pipeline")
            return []

        # Get the pipeline for the specified plate, falling back to the current plate
        target_plate = plate_path or self.pipeline_editor.current_plate
        if not target_plate:
            logger.warning("No plate specified - using empty pipeline")
            return []

        # Get the pipeline from the editor (returns List[FunctionStep] directly)
        pipeline_steps = self.pipeline_editor.get_pipeline_for_plate(target_plate)

        # No conversion needed - pipeline_steps are already FunctionStep objects with memory type decorators
        return pipeline_steps

    def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
        """Generate a human-readable description of what will be operated on."""
        count = len(selected_items)
        if selection_mode == "empty":
            return f"No items available for {operation}"
        elif selection_mode == "all":
            return f"{operation.title()} ALL {count} items"
        elif selection_mode == "checkbox":
            if count == 1:
                item_name = selected_items[0].get('name', 'Unknown')
                return f"{operation.title()} selected item: {item_name}"
            else:
                return f"{operation.title()} {count} selected items"
        else:
            return f"{operation.title()} {count} items"

    def _reset_execution_state(self, status_message: str, force_fail_executing: bool = True):
        if self.current_process:
            if self.current_process.poll() is None:  # Still running
                logger.warning("Forcefully terminating subprocess during reset.")
                self.current_process.terminate()
                try:
                    self.current_process.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    self.current_process.kill()  # Force kill if terminate fails
            self.current_process = None

        # Clear the log file reference (no temp files - the log file is the single source of truth)
        self.log_file_path = None
        self.log_file_position = 0

        # Stop async monitoring
        self._stop_monitoring()

        # Only reset executing orchestrators to failed if this is a forced termination.
        # Natural completion should preserve the states set by the completion handler.
        if force_fail_executing:
            for plate_path, orchestrator in self.orchestrators.items():
                if orchestrator.state == OrchestratorState.EXECUTING:
                    orchestrator._state = OrchestratorState.EXEC_FAILED

        # Trigger UI refresh after state changes - this is essential for button states
        self._trigger_ui_refresh()

        # Update button states - but only if the widget is properly mounted
        try:
            if self.is_mounted and hasattr(self, 'query_one'):
                self._update_button_states()
        except Exception as e:
            logger.error(f"Failed to update button states during reset: {e}")

        self.app.current_status = status_message

    async def action_run_plate(self) -> None:
        # Clear logs from the singleton toolong window before starting a new run
        try:
            from openhcs.textual_tui.windows.toolong_window import clear_toolong_logs
            logger.info("Clearing logs from singleton toolong window before new run")
            clear_toolong_logs(self.app)
            logger.info("Toolong logs cleared")
        except Exception as e:
            logger.error(f"Failed to clear toolong logs: {e}")
            logger.error(traceback.format_exc())

        selected_items, _ = self.get_selection_state()
        if not selected_items:
            self.app.show_error("No plates selected to run.")
            return

        ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data]
        if not ready_items:
            self.app.show_error("Selected plates are not compiled. Please compile first.")
            return

        try:
            # Use the subprocess approach (like the integration tests) for clean isolation
            logger.debug("🔥 Using subprocess approach for clean isolation")

            plate_paths_to_run = [item['path'] for item in ready_items]

            # Pass definition pipeline steps - the subprocess makes a fresh copy and compiles
            pipeline_data = {}
            for plate_path in plate_paths_to_run:
                pipeline_data[plate_path] = self._get_current_pipeline_definition(plate_path)

            logger.info(f"🔥 Starting subprocess for {len(plate_paths_to_run)} plates")

            # Create the data file for the subprocess (the only file needed besides the log)
            data_file = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl')

            # Generate a unique ID for this subprocess
            subprocess_timestamp = int(time.time())
            plate_names = [Path(path).name for path in plate_paths_to_run]
            # Limit to the first 2 plate names to keep the filename short
            unique_id = f"plates_{'_'.join(plate_names[:2])}_{subprocess_timestamp}"

            # Build the subprocess log name from the TUI log base using the log utilities
            from openhcs.core.log_utils import get_current_log_file_path
            try:
                tui_log_path = get_current_log_file_path()
                if tui_log_path.endswith('.log'):
                    tui_base = tui_log_path[:-4]  # Remove the .log extension
                else:
                    tui_base = tui_log_path
                log_file_base = f"{tui_base}_subprocess_{subprocess_timestamp}"
            except RuntimeError:
                # Fallback if no main log was found
                log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
                log_dir.mkdir(parents=True, exist_ok=True)
                log_file_base = str(log_dir / f"tui_subprocess_{subprocess_timestamp}")

            # Pickle data for the subprocess
            subprocess_data = {
                'plate_paths': plate_paths_to_run,
                'pipeline_data': pipeline_data,
                'global_config': self.app.global_config  # Pickle the config object directly
            }

            # Resolve all lazy configurations to concrete values before pickling
            from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
            resolved_subprocess_data = resolve_lazy_configurations_for_serialization(subprocess_data)

            # Wrap the pickle operation in an executor to avoid blocking the UI
            def _write_pickle_data():
                import dill as pickle
                with open(data_file.name, 'wb') as f:
                    pickle.dump(resolved_subprocess_data, f)
                data_file.close()

            await asyncio.get_event_loop().run_in_executor(None, _write_pickle_data)
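
            # dill (aliased to pickle above) is used instead of the stdlib pickle,
            # presumably because pipeline steps can carry decorated functions and
            # closures that plain pickle cannot serialize; the local alias shadows
            # the module-level `import pickle` only inside _write_pickle_data.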

            logger.debug(f"🔥 Created data file: {data_file.name}")

            # Create the subprocess (like the integration tests)
            subprocess_script = Path(__file__).parent.parent / "subprocess_runner.py"

            # Generate the actual log file path that the subprocess will create
            actual_log_file_path = f"{log_file_base}_{unique_id}.log"
            logger.debug(f"🔥 Log file base: {log_file_base}")
            logger.debug(f"🔥 Unique ID: {unique_id}")
            logger.debug(f"🔥 Actual log file: {actual_log_file_path}")

            # Store the log file path for monitoring (the subprocess logger writes to this)
            self.log_file_path = actual_log_file_path
            self.log_file_position = self._get_current_log_position()  # Start from the current end

            logger.debug(f"🔥 Subprocess command: {sys.executable} {subprocess_script} {data_file.name} {log_file_base} {unique_id}")
            logger.debug(f"🔥 Subprocess logger will write to: {self.log_file_path}")
            logger.debug("🔥 Subprocess stdout will be silenced (logger handles output)")

            # SIMPLE SUBPROCESS: let the subprocess log to its own file (single source of truth).
            # Wrap subprocess creation in an executor to avoid blocking the UI.
            def _create_subprocess():
                return subprocess.Popen(
                    [
                        sys.executable, str(subprocess_script),
                        data_file.name, log_file_base, unique_id  # Only the data file and log - no temp files
                    ],
                    stdout=subprocess.DEVNULL,  # Subprocess logs to its own file
                    stderr=subprocess.DEVNULL,  # Subprocess logs to its own file
                    text=True,  # Text mode for easier handling
                )

            self.current_process = await asyncio.get_event_loop().run_in_executor(None, _create_subprocess)

            logger.info(f"🔥 Subprocess started with PID: {self.current_process.pid}")

            # The subprocess logs to its own dedicated file - no stdout/stderr monitoring needed

            # Update orchestrator states to show the running state
            for plate in ready_items:
                plate_path = plate['path']
                if plate_path in self.orchestrators:
                    self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING

            # Trigger UI refresh after state changes
            self._trigger_ui_refresh()

            self.app.current_status = f"Running {len(ready_items)} plate(s) in subprocess..."
            self._update_button_states()

            # Start reactive log monitoring
            self._start_log_monitoring()

            # Start async monitoring
            await self._start_monitoring()

        except Exception as e:
            logger.critical(f"Failed to start subprocess: {e}", exc_info=True)
            self.app.show_error("Failed to start subprocess", str(e))
            self._reset_execution_state("Subprocess failed to start")

    def _start_log_monitoring(self) -> None:
        """Start reactive log monitoring for subprocess logs."""
        if not self.log_file_path:
            logger.warning("Cannot start log monitoring: no log file path")
            return

        try:
            # Extract the base path from the log file path (remove the .log extension)
            log_path = Path(self.log_file_path)
            base_log_path = str(log_path.with_suffix(''))

            # Notify the status bar to start log monitoring
            if hasattr(self.app, 'status_bar') and self.app.status_bar:
                self.app.status_bar.start_log_monitoring(base_log_path)
                logger.debug(f"Started reactive log monitoring for: {base_log_path}")
            else:
                logger.warning("Status bar not available for log monitoring")

        except Exception as e:
            logger.error(f"Failed to start log monitoring: {e}")

    def _stop_log_monitoring(self) -> None:
        """Stop reactive log monitoring."""
        try:
            # Notify the status bar to stop log monitoring
            if hasattr(self.app, 'status_bar') and self.app.status_bar:
                self.app.status_bar.stop_log_monitoring()
                logger.debug("Stopped reactive log monitoring")
        except Exception as e:
            logger.error(f"Failed to stop log monitoring: {e}")

    def _get_current_log_position(self) -> int:
        """Get the current position (size) of the log file."""
        if not self.log_file_path or not Path(self.log_file_path).exists():
            return 0
        try:
            return Path(self.log_file_path).stat().st_size
        except Exception:
            return 0

    def _stop_file_watcher(self) -> None:
        """Stop the file system watcher without blocking."""
        if not self.file_observer:
            return

        try:
            # Just stop and abandon - don't wait for anything
            self.file_observer.stop()
        except Exception:
            pass  # Ignore errors
        finally:
            # Always clear references immediately
            self.file_observer = None
            self.file_watcher = None
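
    # NOTE: self.file_observer / self.file_watcher are never assigned in __init__,
    # so this method relies on some other code path (e.g. a watchdog-style observer
    # setup) having set them; if nothing ever did, the self.file_observer access
    # above raises AttributeError.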

    async def _start_monitoring(self) -> None:
        """Start async monitoring using Textual's interval system."""
        # Stop any existing monitoring
        self._stop_monitoring()

        if self.monitoring_active:
            return

        self.monitoring_active = True
        # Use Textual's set_interval for periodic async monitoring
        self.monitoring_interval = self.set_interval(
            10.0,  # Check every 10 seconds
            self._check_process_status_async,
            pause=False
        )
        logger.debug("Started async process monitoring")

    def _stop_monitoring(self) -> None:
        """Stop async monitoring."""
        if self.monitoring_interval:
            self.monitoring_interval.stop()
            self.monitoring_interval = None
        self.monitoring_active = False

        # Also stop log monitoring
        self._stop_log_monitoring()

        logger.debug("Stopped async process monitoring")

    async def _check_process_status_async(self) -> None:
        """Async process status check - replaces the old worker thread."""
        if not self.monitoring_active:
            return

        try:
            # Simple direct access - no threading, no locks needed
            if not self._is_any_plate_running():
                logger.debug("🔥 MONITOR: Subprocess finished")

                # Stop monitoring first
                self._stop_monitoring()

                # Handle completion directly - no call_from_thread needed
                await self._handle_process_completion()

        except Exception as e:
            logger.debug(f"Error in async process monitoring: {e}")
            # Continue monitoring on error

    async def _handle_process_completion(self) -> None:
        """Handle subprocess completion - read from the log file (single source of truth)."""
        # Determine success/failure from the log file content
        success = False

        if self.log_file_path and Path(self.log_file_path).exists():
            try:
                # Read the log file directly to check for success markers
                with open(self.log_file_path, 'r') as f:
                    log_content = f.read()
                # Look for success markers in the log
                has_execution_success = "🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content
                has_all_completed = "All plates completed successfully" in log_content
                if has_execution_success and has_all_completed:
                    success = True

            except Exception as e:
                logger.error(f"Error reading subprocess log file: {e}")
                success = False

        # Clean up the subprocess
        logger.info("🔥 MONITOR: Starting process cleanup...")
        if self.current_process:
            try:
                self.current_process.wait()  # Clean up the zombie process
                logger.info("🔥 MONITOR: Process cleanup completed")
            except Exception as e:
                logger.warning(f"🔥 MONITOR: Error during process cleanup: {e}")

        # Update orchestrator states based on the log file analysis
        if success:
            # Success - mark executing orchestrators as completed
            for plate_path, orchestrator in self.orchestrators.items():
                if orchestrator.state == OrchestratorState.EXECUTING:
                    orchestrator._state = OrchestratorState.COMPLETED

            # Reset execution state (this triggers a UI refresh internally)
            self._reset_execution_state("Execution completed successfully.", force_fail_executing=False)
        else:
            # Failure - mark executing orchestrators as failed
            for plate_path, orchestrator in self.orchestrators.items():
                if orchestrator.state == OrchestratorState.EXECUTING:
                    orchestrator._state = OrchestratorState.EXEC_FAILED

            # Reset execution state (this triggers a UI refresh internally)
            self._reset_execution_state("Execution failed.", force_fail_executing=False)

        self._stop_monitoring()  # Stop monitoring since the process is done

    async def _read_log_file_incremental(self) -> None:
        """Read new content from the log file since the last read."""
        if not self.log_file_path:
            self.app.current_status = "🔥 LOG READER: No log file"
            return

        try:
            # Wrap all file I/O in an executor to avoid blocking the UI
            def _read_log_content():
                if not Path(self.log_file_path).exists():
                    return None, self.log_file_position

                with open(self.log_file_path, 'r') as f:
                    # Seek to where we left off
                    f.seek(self.log_file_position)
                    new_content = f.read()
                    # Update the position for the next read
                    new_position = f.tell()

                return new_content, new_position

            new_content, new_position = await asyncio.get_event_loop().run_in_executor(None, _read_log_content)
            self.log_file_position = new_position

            if new_content is None:
                self.app.current_status = "🔥 LOG READER: No log file"
                return

            if new_content and new_content.strip():
                # Get the last non-empty line from the new content
                lines = new_content.strip().split('\n')
                non_empty_lines = [line.strip() for line in lines if line.strip()]

                if non_empty_lines:
                    # Show the last line, truncated if too long
                    last_line = non_empty_lines[-1]
                    if len(last_line) > 100:
                        last_line = last_line[:97] + "..."

                    self.app.current_status = last_line
                else:
                    self.app.current_status = "🔥 LOG READER: No lines found"
            else:
                self.app.current_status = "🔥 LOG READER: No new content"

        except Exception as e:
            self.app.current_status = f"🔥 LOG READER ERROR: {e}"

    async def action_stop_execution(self) -> None:
        logger.info("🛑 Stop button pressed. Terminating subprocess.")
        self.app.current_status = "Terminating execution..."

        # Stop async monitoring first
        self._stop_monitoring()

        if self.current_process and self.current_process.poll() is None:  # Still running
            try:
                # Kill the entire process group, not just the parent process.
                # The subprocess creates its own process group, so we kill that group.
                logger.info(f"🛑 Killing process group for PID {self.current_process.pid}...")

                # The process group ID equals the PID because the subprocess calls os.setpgrp()
                process_group_id = self.current_process.pid

                # os.killpg() takes the process group ID directly and signals every member
                os.killpg(process_group_id, signal.SIGTERM)

                # Give the processes time to exit gracefully
                await asyncio.sleep(1)

                # Force kill if still alive
                try:
                    os.killpg(process_group_id, signal.SIGKILL)
                    logger.info(f"🛑 Force killed process group {process_group_id}")
                except ProcessLookupError:
                    logger.info(f"🛑 Process group {process_group_id} already terminated")

            except Exception as e:
                logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill")
                # Fall back to killing just the main process
                self.current_process.kill()

        self._reset_execution_state("Execution terminated by user.")

    async def action_add_plate(self) -> None:
        """Handle the Add Plate button."""
        await self._open_plate_directory_browser()

    async def action_export_ome_zarr(self) -> None:
        """Export the selected plate to OME-ZARR format."""
        if not self.selected_plate:
            self.app.show_error("No Selection", "Please select a plate to export.")
            return

        # Get the orchestrator for the selected plate
        orchestrator = self.orchestrators.get(self.selected_plate)
        if not orchestrator:
            self.app.show_error("Not Initialized", "Please initialize the plate before exporting.")
            return

        # Open a file browser for the export location
        def handle_export_result(selected_paths):
            if selected_paths:
                export_path = Path(selected_paths[0]) if isinstance(selected_paths, list) else Path(selected_paths)
                self._start_ome_zarr_export(orchestrator, export_path)

        await self.window_service.open_file_browser(
            file_manager=self.filemanager,
            initial_path=get_cached_browser_path(PathCacheKey.GENERAL),
            backend=Backend.DISK,
            title="Select OME-ZARR Export Directory",
            mode="save",
            selection_mode=SelectionMode.DIRECTORIES_ONLY,
            cache_key=PathCacheKey.GENERAL,
            on_result_callback=handle_export_result,
            caller_id="plate_manager_export"
        )

    def _start_ome_zarr_export(self, orchestrator, export_path: Path):
        """Start the OME-ZARR export process."""
        async def run_export():
            try:
                self.app.current_status = f"Exporting to OME-ZARR: {export_path}"

                # Create an export-specific config with ZARR materialization
                from openhcs.config_framework.global_config import get_current_global_config
                export_config = get_current_global_config(GlobalPipelineConfig)
                export_vfs_config = VFSConfig(
                    intermediate_backend=export_config.vfs_config.intermediate_backend,
                    materialization_backend=MaterializationBackend.ZARR
                )

                # Update the orchestrator config for export
                export_global_config = dataclasses.replace(export_config, vfs_config=export_vfs_config)

                # Create a zarr backend with OME-ZARR metadata enabled
                zarr_backend = ZarrStorageBackend(ome_zarr_metadata=True)

                # Copy processed data from the current workspace/plate into OME-ZARR format.
                # For the OpenHCS format, workspace_path is None, so fall back to input_dir (the plate path).
                source_path = orchestrator.workspace_path or orchestrator.input_dir
                if source_path and source_path.exists():
                    # Find processed images in the workspace/plate
                    processed_images = list(source_path.rglob("*.tif"))

                    if processed_images:
                        # Group by well for batch operations
                        wells_data = defaultdict(list)

                        for img_path in processed_images:
                            # Extract the well from the filename.
                            # Try the ImageXpress pattern: A01_s001_w1_z001.tif
                            match = re.search(r'([A-Z]\d{2})_', img_path.name)
                            if match:
                                well_id = match.group(1)
                                wells_data[well_id].append(img_path)

                        # Export each well to OME-ZARR
                        export_store_path = export_path / "plate.zarr"

                        for well_id, well_images in wells_data.items():
                            # Load the images
                            images = []
                            for img_path in well_images:
                                img = Image.open(img_path)
                                images.append(np.array(img))

                            # Create output paths for the OME-ZARR structure
                            output_paths = [export_store_path / f"{well_id}_{i:03d}.tif"
                                            for i in range(len(images))]

                            # Save to OME-ZARR format
                            zarr_backend.save_batch(images, output_paths, chunk_name=well_id)

                        self.app.current_status = f"✅ OME-ZARR export completed: {export_store_path}"
                    else:
                        self.app.show_error("No Data", "No processed images found in workspace.")
                else:
                    self.app.show_error("No Workspace", "Plate workspace not found. Run pipeline first.")

            except Exception as e:
                logger.error(f"OME-ZARR export failed: {e}", exc_info=True)
                self.app.show_error("Export Failed", f"OME-ZARR export failed: {str(e)}")

        # Run the export in the background
        asyncio.create_task(run_export())
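
    # The export task above is fire-and-forget: the asyncio.Task reference is not
    # retained, so failures surface only through the except block inside run_export()
    # (logged and shown via self.app.show_error) rather than through the caller.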

    # Debug functionality removed - no longer needed

    async def _open_plate_directory_browser(self):
        """Open the textual-window file browser for plate directory selection."""
        # Use the cached path for better UX - remembers the last used directory
        path_cache = get_path_cache()
        initial_path = path_cache.get_initial_path(PathCacheKey.PLATE_IMPORT, Path.home())

        # Open the textual-window file browser for directory selection
        await self.window_service.open_file_browser(
            file_manager=self.filemanager,
            initial_path=initial_path,
            backend=Backend.DISK,
            title="Select Plate Directory",
            mode="load",
            selection_mode=SelectionMode.DIRECTORIES_ONLY,
            cache_key=PathCacheKey.PLATE_IMPORT,
            on_result_callback=self._add_plate_callback,
            caller_id="plate_manager",
            enable_multi_selection=True
        )

    def _add_plate_callback(self, selected_paths) -> None:
        """Handle directory selection from the file browser."""
        logger.debug(f"_add_plate_callback called with: {selected_paths} (type: {type(selected_paths)})")

        if selected_paths is None or selected_paths is False:
            self.app.current_status = "Plate selection cancelled"
            return

        # Handle both a single path and a list of paths
        if not isinstance(selected_paths, list):
            selected_paths = [selected_paths]

        added_plates = []
        current_plates = list(self.items)

        for selected_path in selected_paths:
            # Ensure selected_path is a Path object
            if isinstance(selected_path, str):
                selected_path = Path(selected_path)
            elif not isinstance(selected_path, Path):
                selected_path = Path(str(selected_path))

            # Skip plates that already exist
            if any(plate['path'] == str(selected_path) for plate in current_plates):
                continue

            # Add the plate to the list
            plate_name = selected_path.name
            plate_path = str(selected_path)
            plate_entry = {
                'name': plate_name,
                'path': plate_path,
                # No status field - state comes from the orchestrator
            }

            current_plates.append(plate_entry)
            added_plates.append(plate_name)

        # Cache the parent directory for next time (saves the user navigation time)
        if selected_paths:
            # Use the parent of the first selected path as the cached directory
            first_path = selected_paths[0] if isinstance(selected_paths[0], Path) else Path(selected_paths[0])
            parent_dir = first_path.parent
            get_path_cache().set_cached_path(PathCacheKey.PLATE_IMPORT, parent_dir)

        # Update the items list via the reactive property (triggers an automatic UI update)
        self.items = current_plates

        if added_plates:
            if len(added_plates) == 1:
                self.app.current_status = f"Added plate: {added_plates[0]}"
            else:
                self.app.current_status = f"Added {len(added_plates)} plates: {', '.join(added_plates)}"
        else:
            self.app.current_status = "No new plates added (duplicates skipped)"

    def action_delete_plate(self) -> None:
        selected_items, _ = self.get_selection_state()
        if not selected_items:
            self.app.show_error("No plate selected to delete.")
            return

        paths_to_delete = {p['path'] for p in selected_items}
        self.items = [p for p in self.items if p['path'] not in paths_to_delete]

        # Clean up orchestrators for the deleted plates
        for path in paths_to_delete:
            if path in self.orchestrators:
                del self.orchestrators[path]

        if self.selected_plate in paths_to_delete:
            self.selected_plate = ""

        self.app.current_status = f"Deleted {len(paths_to_delete)} plate(s)"

    async def action_edit_config(self) -> None:
        """
        Handle the Edit button - create per-orchestrator PipelineConfig instances.

        This enables per-orchestrator configuration without affecting the global
        configuration. Shows resolved defaults from GlobalPipelineConfig with
        'Pipeline default: {value}' placeholders.
        """
        from openhcs.config_framework.lazy_factory import create_dataclass_for_editing

        selected_items, selection_mode = self.get_selection_state()

        if selection_mode == "empty":
            self.app.current_status = "No orchestrators selected for configuration"
            return

        selected_orchestrators = [
            self.orchestrators[item['path']] for item in selected_items
            if item['path'] in self.orchestrators
        ]

        if not selected_orchestrators:
            self.app.current_status = "No initialized orchestrators selected"
            return

        # Load the existing config, or create a new one, for editing
        representative_orchestrator = selected_orchestrators[0]

        # Use the orchestrator's existing config if it exists, otherwise use the global config as the source
        source_config = representative_orchestrator.pipeline_config or self.global_config

        current_plate_config = create_dataclass_for_editing(PipelineConfig, source_config)

        def handle_config_save(new_config: PipelineConfig) -> None:
            """Apply per-orchestrator configuration without global side effects."""
            for orchestrator in selected_orchestrators:
                # Direct synchronous call - no async needed
                orchestrator.apply_pipeline_config(new_config)
            count = len(selected_orchestrators)
            self.app.current_status = f"Per-orchestrator configuration applied to {count} orchestrator(s)"

        # Open the configuration window using PipelineConfig (not GlobalPipelineConfig)
        await self.window_service.open_config_window(
            PipelineConfig,
            current_plate_config,
            on_save_callback=handle_config_save
        )

    async def action_edit_global_config(self) -> None:
        """
        Handle global configuration editing - affects all orchestrators.

        This maintains the existing global configuration workflow but uses lazy loading.
        """
        from openhcs.config_framework.lazy_factory import create_dataclass_for_editing

        # Get the current global config from the app, or use the default
        current_global_config = self.app.global_config or GlobalPipelineConfig()

        # Create a lazy PipelineConfig for editing with the proper thread-local context
        current_lazy_config = create_dataclass_for_editing(PipelineConfig, current_global_config, preserve_values=True)

        def handle_global_config_save(new_config: PipelineConfig) -> None:
            """Apply the global configuration to all orchestrators."""
            # Convert the lazy PipelineConfig back to a GlobalPipelineConfig
            global_config = new_config.to_base_config()

            self.app.global_config = global_config  # Update the app-level config

            # Thread-local modification removed - the dual-axis resolver handles context automatically
            for orchestrator in self.orchestrators.values():
                asyncio.create_task(orchestrator.apply_new_global_config(global_config))
            self.app.current_status = "Global configuration applied to all orchestrators"

        await self.window_service.open_config_window(
            PipelineConfig,
            current_lazy_config,
            on_save_callback=handle_global_config_save
        )

    def _analyze_orchestrator_configs(self, orchestrators: List['PipelineOrchestrator']) -> Dict[str, Dict[str, Any]]:
        """Analyze configs across multiple orchestrators to detect same/different values.

        Args:
            orchestrators: List of PipelineOrchestrator instances

        Returns:
            Dict mapping field names to analysis results:
            - {"type": "same", "value": actual_value, "default": default_value}
            - {"type": "different", "values": [val1, val2, ...], "default": default_value}
        """
        if not orchestrators:
            return {}

        # Get parameter info for defaults
        param_info = SignatureAnalyzer.analyze(GlobalPipelineConfig)

        config_analysis = {}

        # Analyze each field in GlobalPipelineConfig
        for field in dataclasses.fields(GlobalPipelineConfig):
            field_name = field.name

            # Get the values from all orchestrators
            values = []
            for orch in orchestrators:
                try:
                    value = getattr(orch.global_config, field_name)
                    values.append(value)
                except AttributeError:
                    # Field doesn't exist in this config, skip it
                    continue

            if not values:
                continue

            # Get the default value from the parameter info
            param_details = param_info.get(field_name)
            default_value = param_details.default_value if param_details else None

            # Check whether all values are the same
            if all(self._values_equal(v, values[0]) for v in values):
                config_analysis[field_name] = {
                    "type": "same",
                    "value": values[0],
                    "default": default_value
                }
            else:
                config_analysis[field_name] = {
                    "type": "different",
                    "values": values,
                    "default": default_value
                }

        return config_analysis

    def _values_equal(self, val1: Any, val2: Any) -> bool:
        """Check if two values are equal, handling dataclasses and complex types."""
        # Handle dataclass comparison
        if dataclasses.is_dataclass(val1) and dataclasses.is_dataclass(val2):
            return dataclasses.asdict(val1) == dataclasses.asdict(val2)

        # Handle regular comparison
        return val1 == val2
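
    # e.g. two separately constructed VFSConfig instances with identical field
    # values compare equal here via their asdict() dictionaries, independent of
    # object identity.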

    def action_init_plate(self) -> None:
        """Handle the Init Plate button - initialize the selected plates."""
        # Get the current selection state
        selected_items, selection_mode = self.get_selection_state()

        if selection_mode == "empty":
            logger.warning("No plates available for initialization")
            return

        # Validate that all selected plates can be initialized (all failed plates may be re-initialized)
        invalid_plates = []
        for item in selected_items:
            plate_path = item['path']
            orchestrator = self.orchestrators.get(plate_path)
            # Only block plates that are currently executing - all other states can be re-initialized
            if orchestrator is not None and orchestrator.state == OrchestratorState.EXECUTING:
                invalid_plates.append(item)

        if invalid_plates:
            names = [item['name'] for item in invalid_plates]
            logger.warning(f"Cannot initialize plates that are currently executing: {', '.join(names)}")
            return

        # Start async initialization
        self._start_async_init(selected_items, selection_mode)

    def _start_async_init(self, selected_items: List[Dict], selection_mode: str) -> None:
        """Start async initialization of the selected plates."""
        # Generate an operation description
        desc = self.get_operation_description(selected_items, selection_mode, "initialize")
        logger.info(f"Initializing: {desc}")

        # Start the background worker
        self._init_plates_worker(selected_items)

    @work(exclusive=True)
    async def _init_plates_worker(self, selected_items: List[Dict]) -> None:
        """Background worker for plate initialization."""
        for plate_data in selected_items:
            plate_path = plate_data['path']

            # Find the actual plate in self.items (not the copy from get_selection_state)
            actual_plate = None
            for plate in self.items:
                if plate['path'] == plate_path:
                    actual_plate = plate
                    break

            if not actual_plate:
                logger.error(f"Plate not found in plates list: {plate_path}")
                continue

            try:
                # Run the heavy initialization in an executor to avoid blocking the UI
                def init_orchestrator():
                    return PipelineOrchestrator(
                        plate_path=plate_path,
                        global_config=self.global_config,
                        storage_registry=self.filemanager.registry
                    ).initialize()

                orchestrator = await asyncio.get_event_loop().run_in_executor(None, init_orchestrator)

                # Store the orchestrator for later use (channel selection, etc.)
                self.orchestrators[plate_path] = orchestrator
                # Orchestrator state is already set to READY by the initialize() method
                logger.info(f"Plate {actual_plate['name']} initialized successfully")

            except Exception as e:
                logger.error(f"Failed to initialize plate {plate_path}: {e}", exc_info=True)
                # Create a failed orchestrator to track the error state
                failed_orchestrator = PipelineOrchestrator(
                    plate_path=plate_path,
                    global_config=self.global_config,
                    storage_registry=self.filemanager.registry
                )
                failed_orchestrator._state = OrchestratorState.INIT_FAILED
                self.orchestrators[plate_path] = failed_orchestrator
                actual_plate['error'] = str(e)

            # Trigger a UI refresh after orchestrator state changes
            self._trigger_ui_refresh()
            # Update button states immediately (the reactive system handles UI updates automatically)
            self._update_button_states()
            # Notify the pipeline editor of the status change
            status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
            self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
            logger.debug(f"Updated plate {actual_plate['name']} status")

        # Final UI update (the reactive system handles this automatically when self.items changes)
        self._update_button_states()

        # Update the status
        success_count = len([p for p in selected_items
                             if self.orchestrators.get(p['path'])
                             and self.orchestrators[p['path']].state == OrchestratorState.READY])
        error_count = len([p for p in selected_items
                           if self.orchestrators.get(p['path'])
                           and self.orchestrators[p['path']].state == OrchestratorState.INIT_FAILED])

        if error_count == 0:
            logger.info(f"Successfully initialized {success_count} plates")
        else:
            logger.warning(f"Initialized {success_count} plates, {error_count} errors")

    def action_compile_plate(self) -> None:
        """Handle the Compile Plate button - compile pipelines for the selected plates."""
        # Get the current selection state
        selected_items, selection_mode = self.get_selection_state()

        if selection_mode == "empty":
            logger.warning("No plates available for compilation")
            return

        # Validate that all selected plates are ready for compilation (failed plates may be re-compiled)
        not_ready = []
        for item in selected_items:
            plate_path = item['path']
            orchestrator = self.orchestrators.get(plate_path)
            # Allow READY, COMPILE_FAILED, EXEC_FAILED, COMPILED, and COMPLETED states to be (re)compiled
            if orchestrator is None or orchestrator.state not in [
                OrchestratorState.READY, OrchestratorState.COMPILE_FAILED,
                OrchestratorState.EXEC_FAILED, OrchestratorState.COMPILED,
                OrchestratorState.COMPLETED
            ]:
                not_ready.append(item)

        if not_ready:
            names = [item['name'] for item in not_ready]
            # Give a more accurate error message based on the actual state
            if any(self.orchestrators.get(item['path']) is None for item in not_ready):
                logger.warning(f"Cannot compile plates that haven't been initialized: {', '.join(names)}")
            elif any(self.orchestrators.get(item['path']).state == OrchestratorState.EXECUTING for item in not_ready):
                logger.warning(f"Cannot compile plates that are currently executing: {', '.join(names)}")
            else:
                logger.warning(f"Cannot compile plates in current state: {', '.join(names)}")
            return

        # Validate that all selected plates have pipelines
        no_pipeline = []
        for item in selected_items:
            pipeline = self._get_current_pipeline_definition(item['path'])
            if not pipeline:
                no_pipeline.append(item)

        if no_pipeline:
            names = [item['name'] for item in no_pipeline]
            self.app.current_status = f"Cannot compile plates without pipelines: {', '.join(names)}"
            return

        # Start async compilation
        self._start_async_compile(selected_items, selection_mode)

    def _start_async_compile(self, selected_items: List[Dict], selection_mode: str) -> None:
        """Start async compilation of the selected plates."""
        # Generate an operation description
        desc = self.get_operation_description(selected_items, selection_mode, "compile")
        logger.info(f"Compiling: {desc}")

        # Start the background worker
        self._compile_plates_worker(selected_items)

    @work(exclusive=True)
    async def _compile_plates_worker(self, selected_items: List[Dict]) -> None:
        """Background worker for plate compilation."""
        for plate_data in selected_items:
            plate_path = plate_data['path']

            # Find the actual plate in self.items (not the copy from get_selection_state)
            actual_plate = None
            for plate in self.items:
                if plate['path'] == plate_path:
                    actual_plate = plate
                    break

            if not actual_plate:
                logger.error(f"Plate not found in plates list: {plate_path}")
                continue

            # Get the definition pipeline and make a fresh copy
            definition_pipeline = self._get_current_pipeline_definition(plate_path)
            if not definition_pipeline:
                logger.warning(f"No pipeline defined for {actual_plate['name']}, using empty pipeline")
                definition_pipeline = []

            try:
                # Get or create an orchestrator for compilation (run in an executor to avoid blocking)
                def get_or_create_orchestrator():
                    if plate_path in self.orchestrators:
                        orchestrator = self.orchestrators[plate_path]
                        if not orchestrator.is_initialized():
                            orchestrator.initialize()
                        return orchestrator
                    else:
                        return PipelineOrchestrator(
                            plate_path=plate_path,
                            global_config=self.global_config,
                            storage_registry=self.filemanager.registry
                        ).initialize()

                orchestrator = await asyncio.get_event_loop().run_in_executor(None, get_or_create_orchestrator)
                self.orchestrators[plate_path] = orchestrator

                # Make a fresh copy for compilation
                execution_pipeline = copy.deepcopy(definition_pipeline)

                # Fix step IDs after the deep copy to match the new object IDs
                for step in execution_pipeline:
                    step.step_id = str(id(step))
                    # Ensure variable_components is never None - use the FunctionStep default
                    if step.variable_components is None:
                        logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting FunctionStep default")
                        step.variable_components = [VariableComponents.SITE]
                    # Also ensure it's not an empty list
                    elif not step.variable_components:
                        logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting FunctionStep default")
                        step.variable_components = [VariableComponents.SITE]
1482 # Get wells and compile (async - run in executor to avoid blocking UI)
1483 # Wrap in Pipeline object like test_main.py does
1484 pipeline_obj = Pipeline(steps=execution_pipeline)
1486 # Run heavy operations in executor to avoid blocking UI
1487 # Get wells using multiprocessing axis (WELL in default config)
1488 from openhcs.constants import MULTIPROCESSING_AXIS
1489 wells = await asyncio.get_event_loop().run_in_executor(None, lambda: orchestrator.get_component_keys(MULTIPROCESSING_AXIS))
1490 compiled_contexts = await asyncio.get_event_loop().run_in_executor(
1491 None, orchestrator.compile_pipelines, pipeline_obj.steps, wells
1492 )
1494 # Store state simply - no reactive property issues
1495 step_ids_in_pipeline = [id(step) for step in execution_pipeline]
1496 # Get step IDs from contexts (ProcessingContext objects)
1497 first_well_key = list(compiled_contexts.keys())[0] if compiled_contexts else None
1498 step_ids_in_contexts = list(compiled_contexts[first_well_key].step_plans.keys()) if first_well_key and hasattr(compiled_contexts[first_well_key], 'step_plans') else []
1499 logger.info(f"🔥 Storing compiled data for {plate_path}: pipeline={type(execution_pipeline)}, contexts={type(compiled_contexts)}")
1500 logger.info(f"🔥 Step IDs in pipeline: {step_ids_in_pipeline}")
1501 logger.info(f"🔥 Step IDs in contexts: {step_ids_in_contexts}")
1502 self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts)
1503 logger.info(f"🔥 Stored! Available compiled plates: {list(self.plate_compiled_data.keys())}")
1505 # Orchestrator state is already set to COMPILED by compile_pipelines() method
1506 logger.info(f"🔥 Successfully compiled {plate_path}")
1508 except Exception as e:
1509 logger.error(f"🔥 COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True)
1510 # Orchestrator state is already set to FAILED by compile_pipelines() method
1511 actual_plate['error'] = str(e)
1512 # Don't store anything in plate_compiled_data on failure
1514 # Trigger UI refresh after orchestrator state changes
1515 self._trigger_ui_refresh()
1516 # Update button states immediately (reactive system handles UI updates automatically)
1517 self._update_button_states()
1518 # Notify pipeline editor of status change
1519 status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
1520 self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
1522 # Final UI update (reactive system handles this automatically when self.items is modified)
1523 self._update_button_states()
1525 # Update status
1526 success_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILED])
1527 error_count = len([p for p in selected_items if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == OrchestratorState.COMPILE_FAILED])
1529 if error_count == 0:
1530 logger.info(f"Successfully compiled {success_count} plates")
1531 else:
1532 logger.warning(f"Compiled {success_count} plates, {error_count} errors")
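
    # A standalone sketch (illustrative _Step class, not FunctionStep) of why
    # the worker above re-keys step_id after copy.deepcopy: the copy creates new
    # objects, so any key derived from id() no longer matches the new identities
    # and lookups into step_plans keyed by the copies would miss.
    @staticmethod
    def _sketch_deepcopy_rekey() -> None:
        class _Step:
            def __init__(self, name: str):
                self.name = name
                self.step_id = str(id(self))  # identity-derived key

        original = [_Step("flatfield"), _Step("stitch")]
        clone = copy.deepcopy(original)

        # deepcopy copied the step_id strings verbatim, but the clones have new ids
        assert all(s.step_id != str(id(s)) for s in clone)

        # Refresh the keys, exactly as _compile_plates_worker does
        for s in clone:
            s.step_id = str(id(s))
        assert all(s.step_id == str(id(s)) for s in clone)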
    async def action_code_plate(self) -> None:
        """Generate Python code for selected plates and their pipelines."""
        logger.debug("Code button pressed - generating Python code for plates")

        selected_items, _ = self.get_selection_state()
        if not selected_items:
            self.app.current_status = "No plates selected for code generation"
            return

        try:
            # Collect pipeline steps for each selected plate
            plate_paths = [item['path'] for item in selected_items]
            pipeline_data = {}
            for plate_path in plate_paths:
                if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
                    # Use the pipeline editor's steps if it has this plate
                    pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines.get(plate_path, [])
                else:
                    pipeline_data[plate_path] = []

            from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher
            from openhcs.debug.pickle_to_python import generate_complete_orchestrator_code

            # Generate just the orchestrator configuration (no execution wrapper)
            python_code = generate_complete_orchestrator_code(
                plate_paths=plate_paths,
                pipeline_data=pipeline_data,
                global_config=self.app.global_config,
                clean_mode=True  # Default to clean mode - only show non-default values
            )

            # Callback invoked with the edited source when the user saves
            def handle_edited_code(edited_code: str):
                logger.debug("Orchestrator code edited, processing changes...")
                try:
                    # Execute the code (it carries all necessary imports)
                    namespace = {}
                    exec(edited_code, namespace)

                    # Update pipeline data if present (composition: the orchestrator contains pipelines)
                    if 'pipeline_data' in namespace:
                        new_pipeline_data = namespace['pipeline_data']
                        # Update the pipeline editor through the reactive system (like the pipeline code button does)
                        if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
                            # Update the plate pipelines storage
                            current_pipelines = dict(self.pipeline_editor.plate_pipelines)
                            current_pipelines.update(new_pipeline_data)
                            self.pipeline_editor.plate_pipelines = current_pipelines

                            # If the current plate is in the edited data, update the current view too
                            current_plate = self.pipeline_editor.current_plate
                            if current_plate and current_plate in new_pipeline_data:
                                self.pipeline_editor.pipeline_steps = new_pipeline_data[current_plate]

                        self.app.current_status = f"Pipeline data updated for {len(new_pipeline_data)} plates"

                    # Update the global config if present
                    elif 'global_config' in namespace:
                        new_global_config = namespace['global_config']
                        for plate_path in plate_paths:
                            if plate_path in self.orchestrators:
                                orchestrator = self.orchestrators[plate_path]
                                asyncio.create_task(orchestrator.apply_new_global_config(new_global_config))
                        self.app.current_status = f"Global config updated for {len(plate_paths)} plates"

                    # Update the orchestrators list if present
                    elif 'orchestrators' in namespace:
                        new_orchestrators = namespace['orchestrators']
                        self.app.current_status = f"Orchestrator list updated with {len(new_orchestrators)} orchestrators"

                    else:
                        self.app.show_error("Parse Error", "No valid assignments found in edited code")

                except SyntaxError as e:
                    self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
                except Exception as e:
                    full_traceback = traceback.format_exc()
                    logger.error(f"Failed to parse edited orchestrator code: {e}\nFull traceback:\n{full_traceback}")
                    self.app.show_error("Edit Error", f"Failed to parse orchestrator code: {e}\n\nFull traceback:\n{full_traceback}")

            # Launch the terminal editor
            launcher = TerminalLauncher(self.app)
            await launcher.launch_editor_for_file(
                file_content=python_code,
                file_extension='.py',
                on_save_callback=handle_edited_code
            )

        except Exception as e:
            logger.error(f"Failed to generate plate code: {e}")
            self.app.current_status = f"Failed to generate code: {e}"
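
    # A minimal sketch (inline source string; not the TerminalLauncher flow) of
    # the exec-namespace round-trip handle_edited_code uses: run the edited
    # source in a fresh dict, then pull out only the names you recognize.
    @staticmethod
    def _sketch_exec_roundtrip() -> None:
        edited_source = 'pipeline_data = {"/plates/A": ["step1", "step2"]}'

        namespace: dict = {}
        exec(edited_source, namespace)  # the edited script carries its own imports

        if 'pipeline_data' in namespace:
            assert list(namespace['pipeline_data']) == ["/plates/A"]
        elif 'global_config' in namespace:
            pass  # would be applied to each orchestrator
        else:
            raise ValueError("No valid assignments found in edited code")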
    async def action_save_python_script(self) -> None:
        """Save a Python script for the selected plates (like special_io_pipeline.py)."""
        logger.debug("Save button pressed - saving Python script for plates")

        selected_items, _ = self.get_selection_state()
        if not selected_items:
            self.app.current_status = "No plates selected for script generation"
            return

        try:
            # Collect pipeline steps for each selected plate
            plate_paths = [item['path'] for item in selected_items]
            pipeline_data = {}
            for plate_path in plate_paths:
                if hasattr(self, 'pipeline_editor') and self.pipeline_editor:
                    # Use the pipeline editor's steps if it has this plate
                    pipeline_data[plate_path] = self.pipeline_editor.plate_pipelines.get(plate_path, [])
                else:
                    pipeline_data[plate_path] = []

            # Create the data structure that pickle_to_python expects
            data = {
                'plate_paths': plate_paths,
                'pipeline_data': pipeline_data,
                'global_config': self.app.global_config
            }

            # Generate a complete executable Python script using the pickle_to_python logic
            python_code = self._generate_executable_script(data)

            # Launch a file browser to save the script
            from openhcs.textual_tui.windows.file_browser_window import open_file_browser_window, BrowserMode

            def handle_save_result(result):
                # The browser may return a single Path or a list of Paths
                if isinstance(result, Path):
                    save_path = result
                elif isinstance(result, list) and result:
                    save_path = result[0]  # Take the first path
                else:
                    return

                try:
                    # Write the Python script to the selected file
                    with open(save_path, 'w') as f:
                        f.write(python_code)
                    logger.info(f"Python script saved to: {save_path}")
                    self.app.current_status = f"Python script saved to: {save_path}"
                except Exception as e:
                    logger.error(f"Failed to save Python script: {e}")
                    self.app.current_status = f"Failed to save script: {e}"

            # Generate a default filename based on the first plate
            first_plate_name = Path(plate_paths[0]).name if plate_paths else "pipeline"
            default_filename = f"{first_plate_name}_pipeline.py"

            await open_file_browser_window(
                app=self.app,
                file_manager=self.app.filemanager,
                initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
                backend=Backend.DISK,
                title="Save Python Pipeline Script",
                mode=BrowserMode.SAVE,
                selection_mode=SelectionMode.FILES_ONLY,
                filter_extensions=['.py'],
                default_filename=default_filename,
                cache_key=PathCacheKey.PIPELINE_FILES,
                on_result_callback=handle_save_result,
                caller_id="plate_manager_save_script"
            )

        except Exception as e:
            logger.error(f"Failed to save Python script: {e}")
            self.app.current_status = f"Failed to save script: {e}"
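
    # A small sketch (hypothetical result shapes) of the normalization inside
    # handle_save_result above: the browser callback may deliver a single Path
    # or a list of Paths, and SAVE mode only cares about the first selection.
    @staticmethod
    def _sketch_first_path() -> None:
        from typing import Union

        def first_path(result: Union[Path, List[Path], None]) -> Optional[Path]:
            if isinstance(result, Path):
                return result
            if isinstance(result, list) and result:
                return result[0]
            return None

        assert first_path(Path("/tmp/x.py")) == Path("/tmp/x.py")
        assert first_path([Path("/tmp/a.py"), Path("/tmp/b.py")]) == Path("/tmp/a.py")
        assert first_path([]) is None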
    def _generate_executable_script(self, data: Dict) -> str:
        """Generate a fully executable Python script by round-tripping through a temp pickle and the existing convert_pickle_to_python."""
        import dill as pickle
        from openhcs.debug.pickle_to_python import convert_pickle_to_python

        # Create a temporary pickle file
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as temp_pickle:
            pickle.dump(data, temp_pickle)
            temp_pickle_path = temp_pickle.name

        # Create a temporary output file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_output:
            temp_output_path = temp_output.name

        try:
            # Use the existing convert_pickle_to_python function
            convert_pickle_to_python(temp_pickle_path, temp_output_path)

            # Read the generated script back
            with open(temp_output_path, 'r') as f:
                return f.read()
        finally:
            # Clean up the temp files, ignoring only filesystem errors
            try:
                os.unlink(temp_pickle_path)
                os.unlink(temp_output_path)
            except OSError:
                pass
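
    # A standard-library-only sketch (hypothetical `convert` callable standing
    # in for convert_pickle_to_python) of the temp-file round-trip above. Unlike
    # the sequential unlinks in _generate_executable_script, looping per file
    # means one failed unlink cannot skip the other, and narrowing the handler
    # to OSError keeps cleanup from swallowing real bugs the way a bare except
    # would.
    @staticmethod
    def _sketch_tempfile_roundtrip(data: Any, convert: Callable[[str, str], None]) -> str:
        with tempfile.NamedTemporaryFile(suffix='.pkl', delete=False) as src:
            pickle.dump(data, src)
            src_path = src.name
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as dst:
            dst_path = dst.name
        try:
            convert(src_path, dst_path)
            with open(dst_path, 'r') as f:
                return f.read()
        finally:
            for path in (src_path, dst_path):
                try:
                    os.unlink(path)
                except OSError:
                    pass  # the file may already be gone; nothing else to do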