Coverage for openhcs/textual_tui/widgets/plate_manager.py: 0.0%
938 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2PlateManagerWidget for OpenHCS Textual TUI
4Plate management widget with complete button set and reactive state management.
5Matches the functionality from the current prompt-toolkit TUI.
6"""
8import asyncio
9import copy
10import dataclasses
11import inspect
12import json
13import logging
14import numpy as np
15import os
16import pickle
17import re
18import signal
19import stat
20import subprocess
21import sys
22import tempfile
23import time
24import traceback
25from collections import defaultdict
26from datetime import datetime
27from pathlib import Path
28from typing import Dict, List, Optional, Callable, Any, Tuple
30from openhcs.core.config import PipelineConfig
32from PIL import Image
33from textual.app import ComposeResult
34from textual.containers import Horizontal, ScrollableContainer
35from textual.reactive import reactive
36from textual.widgets import Button, Static, SelectionList
37from textual.widget import Widget
38from textual.css.query import NoMatches
39from .button_list_widget import ButtonListWidget, ButtonConfig
40from textual import work, on
42from openhcs.core.config import GlobalPipelineConfig, VFSConfig, MaterializationBackend
43from openhcs.core.pipeline import Pipeline
44from openhcs.io.filemanager import FileManager
45from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
46from openhcs.io.base import storage_registry
47from openhcs.io.memory import MemoryStorageBackend
48from openhcs.io.zarr import ZarrStorageBackend
49from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator
50from openhcs.constants.constants import GroupBy, Backend, VariableComponents, OrchestratorState
51from openhcs.textual_tui.services.file_browser_service import SelectionMode
52from openhcs.textual_tui.services.window_service import WindowService
53from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey, get_path_cache
54from openhcs.textual_tui.widgets.shared.signature_analyzer import SignatureAnalyzer
56logger = logging.getLogger(__name__)
58# Note: Using subprocess approach instead of multiprocessing to avoid TUI FD conflicts
def get_orchestrator_status_symbol(orchestrator: PipelineOrchestrator) -> str:
    """Translate an orchestrator's lifecycle state into a one-character status code.

    Args:
        orchestrator: The orchestrator to inspect, or ``None`` for a plate that
            has no orchestrator yet.

    Returns:
        The single-character code registered for ``orchestrator.state``.
        ``"?"`` is returned when there is no orchestrator or when the state has
        no entry in the table below.
    """
    fallback = "?"
    if orchestrator is None:
        # Newly added plate: nothing has been created for it yet.
        return fallback

    symbol_table = dict((
        (OrchestratorState.CREATED, "?"),          # Created but not initialized
        (OrchestratorState.READY, "-"),            # Initialized, ready for compilation
        (OrchestratorState.COMPILED, "o"),         # Compiled, ready for execution
        (OrchestratorState.EXECUTING, "!"),        # Execution in progress
        (OrchestratorState.COMPLETED, "C"),        # Execution completed successfully
        (OrchestratorState.INIT_FAILED, "I"),      # Initialization failed
        (OrchestratorState.COMPILE_FAILED, "P"),   # Compilation failed (P for Pipeline)
        (OrchestratorState.EXEC_FAILED, "X"),      # Execution failed
    ))
    return symbol_table.get(orchestrator.state, fallback)
def get_current_log_file_path() -> str:
    """Return the path of the log file the logging system is writing to.

    The root logger is searched first, then the ``"openhcs"`` logger; the
    ``baseFilename`` of the first ``logging.FileHandler`` found is returned.
    When neither logger has a file handler, a timestamped file name under
    ``~/.local/share/openhcs/logs`` is returned and that directory is created
    if necessary.

    Returns:
        A log file path.  If anything above raises, the problem is logged as a
        warning and a fixed file name inside the platform's temporary
        directory is returned instead.
    """
    try:
        # Root logger takes precedence; the package logger is the fallback.
        for source in (logging.getLogger(), logging.getLogger("openhcs")):
            for handler in source.handlers:
                if isinstance(handler, logging.FileHandler):
                    return handler.baseFilename

        # Last resort: build a default path.
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        return str(log_dir / f"openhcs_subprocess_{int(time.time())}.log")

    except Exception as e:
        logger.warning(f"Could not determine log file path: {e}")
        # tempfile.gettempdir() instead of a literal "/tmp" so the fallback
        # also resolves on platforms without that directory.
        return str(Path(tempfile.gettempdir()) / "openhcs_subprocess.log")
class PlateManagerWidget(ButtonListWidget):
    """
    Plate management widget using Textual reactive state.
    """

    # Semantic reactive property (like PipelineEditor's pipeline_steps).
    # Holds the path of the currently highlighted plate ("" when there is none).
    selected_plate = reactive("")
    # Looked up by plate path to obtain that plate's orchestrator.
    # A callable default (``dict``) is used instead of a ``{}`` literal so that
    # every widget instance starts with its own dictionary rather than all
    # instances sharing one mutable object.
    orchestrators = reactive(dict)
    # NOTE(review): presumably per-plate configuration keyed by plate path — confirm.
    plate_configs = reactive(dict)
    orchestrator_state_version = reactive(0)  # Increment to trigger UI refresh
119 def __init__(self, filemanager: FileManager, global_config: GlobalPipelineConfig):
120 button_configs = [
121 ButtonConfig("Add", "add_plate"),
122 ButtonConfig("Del", "del_plate", disabled=True),
123 ButtonConfig("Edit", "edit_config", disabled=True), # Unified edit button for config editing
124 ButtonConfig("Init", "init_plate", disabled=True),
125 ButtonConfig("Compile", "compile_plate", disabled=True),
126 ButtonConfig("Run", "run_plate", disabled=True),
127 ButtonConfig("Code", "code_plate", disabled=True), # Generate Python code
128 ButtonConfig("Save", "save_python_script", disabled=True), # Save Python script
129 # ButtonConfig("Export", "export_ome_zarr", disabled=True), # Export to OME-ZARR - HIDDEN FROM UI
130 ]
131 super().__init__(
132 button_configs=button_configs,
133 list_id="plate_content",
134 container_id="plate_list",
135 on_button_pressed=self._handle_button_press,
136 on_selection_changed=self._handle_selection_change,
137 on_item_moved=self._handle_item_moved
138 )
139 self.filemanager = filemanager
140 self.global_config = global_config
141 self.plate_compiled_data = {}
142 self.on_plate_selected: Optional[Callable[[str], None]] = None
143 self.pipeline_editor: Optional['PipelineEditorWidget'] = None
145 # Initialize window service to avoid circular imports
146 self.window_service = None # Will be set in on_mount
148 # --- Subprocess Architecture ---
149 self.current_process: Optional[subprocess.Popen] = None
150 self.log_file_path: Optional[str] = None # Single source of truth
151 self.log_file_position: int = 0 # Track position in log file for incremental reading
152 # Async monitoring using Textual's interval system
153 self.monitoring_interval = None
154 self.monitoring_active = False
155 # ---
157 logger.debug("PlateManagerWidget initialized")
163 def on_unmount(self) -> None:
164 logger.debug("Unmounting PlateManagerWidget, ensuring worker process is terminated.")
165 # Schedule async stop execution since on_unmount is sync
166 import asyncio
167 if self.current_process and self.current_process.poll() is None:
168 # Create a task to stop execution asynchronously
169 asyncio.create_task(self.action_stop_execution())
170 self._stop_monitoring()
172 def format_item_for_display(self, plate: Dict) -> Tuple[str, str]:
173 # Get status from orchestrator instead of magic string
174 plate_path = plate.get('path', '')
175 orchestrator = self.orchestrators.get(plate_path)
176 status_symbol = get_orchestrator_status_symbol(orchestrator)
178 status_symbols = {
179 "?": "➕", # Created (not initialized)
180 "-": "✅", # Ready (initialized)
181 "o": "⚡", # Compiled
182 "!": "🔄", # Executing
183 "C": "🏁", # Completed
184 "I": "🚫", # Init failed
185 "P": "💥", # Compile failed (Pipeline)
186 "X": "❌" # Execution failed
187 }
188 status_icon = status_symbols.get(status_symbol, "❓")
189 plate_name = plate.get('name', 'Unknown')
190 display_text = f"{status_icon} {plate_name} - {plate_path}"
191 return display_text, plate_path
193 async def _handle_button_press(self, button_id: str) -> None:
194 action_map = {
195 "add_plate": self.action_add_plate,
196 "del_plate": self.action_delete_plate,
197 "edit_config": self.action_edit_config, # Unified edit button
198 "init_plate": self.action_init_plate,
199 "compile_plate": self.action_compile_plate,
200 "code_plate": self.action_code_plate, # Generate Python code
201 "save_python_script": self.action_save_python_script, # Save Python script
202 # "export_ome_zarr": self.action_export_ome_zarr, # HIDDEN
203 }
204 if button_id in action_map:
205 action = action_map[button_id]
206 if inspect.iscoroutinefunction(action):
207 await action()
208 else:
209 action()
210 elif button_id == "run_plate":
211 if self._is_any_plate_running():
212 await self.action_stop_execution()
213 else:
214 await self.action_run_plate()
216 def _handle_selection_change(self, selected_values: List[str]) -> None:
217 logger.debug(f"Checkmarks changed: {len(selected_values)} items selected")
219 def _handle_item_moved(self, from_index: int, to_index: int) -> None:
220 current_plates = list(self.items)
221 plate = current_plates.pop(from_index)
222 current_plates.insert(to_index, plate)
223 self.items = current_plates
224 plate_name = plate['name']
225 direction = "up" if to_index < from_index else "down"
226 self.app.current_status = f"Moved plate '{plate_name}' {direction}"
228 def on_mount(self) -> None:
229 # Initialize window service
230 self.window_service = WindowService(self.app)
232 self.call_later(self._delayed_update_display)
233 self.call_later(self._update_button_states)
235 def watch_items(self, items: List[Dict]) -> None:
236 """Automatically update UI when items changes (follows ButtonListWidget pattern)."""
237 # DEBUG: Log when items list changes to track the source of the reset
238 stack_trace = ''.join(traceback.format_stack()[-3:-1]) # Get last 2 stack frames
239 logger.debug(f"🔍 ITEMS CHANGED: {len(items)} plates. Call stack:\n{stack_trace}")
241 # CRITICAL: Call parent's watch_items to update the SelectionList
242 super().watch_items(items)
244 logger.debug(f"Plates updated: {len(items)} plates")
245 self._update_button_states()
247 def watch_highlighted_item(self, plate_path: str) -> None:
248 self.selected_plate = plate_path
249 logger.debug(f"Highlighted plate: {plate_path}")
251 def watch_selected_plate(self, plate_path: str) -> None:
252 self._update_button_states()
253 if self.on_plate_selected and plate_path:
254 self.on_plate_selected(plate_path)
255 logger.debug(f"Selected plate: {plate_path}")
257 def watch_orchestrator_state_version(self, version: int) -> None:
258 """Automatically refresh UI when orchestrator states change."""
259 # Only update UI if widget is properly mounted
260 if not self.is_mounted:
261 return
263 # Force SelectionList to update by calling _update_selection_list
264 # This re-calls format_item_for_display() for all items
265 self._update_selection_list()
267 # CRITICAL: Update main button states when orchestrator states change
268 self._update_button_states()
270 # Also notify PipelineEditor if connected
271 if self.pipeline_editor:
272 logger.debug(f"PlateManager: Notifying PipelineEditor of orchestrator state change (version {version})")
273 self.pipeline_editor._update_button_states()
275 def get_selection_state(self) -> tuple[List[Dict], str]:
276 # Check if widget is properly mounted first
277 if not self.is_mounted:
278 logger.debug("get_selection_state called on unmounted widget")
279 return [], "empty"
281 try:
282 selection_list = self.query_one(f"#{self.list_id}")
283 multi_selected_values = selection_list.selected
284 if multi_selected_values:
285 selected_items = [p for p in self.items if p.get('path') in multi_selected_values]
286 return selected_items, "checkbox"
287 elif self.selected_plate:
288 selected_items = [p for p in self.items if p.get('path') == self.selected_plate]
289 return selected_items, "cursor"
290 else:
291 return [], "empty"
292 except Exception as e:
293 # DOM CORRUPTION DETECTED - This is a critical error
294 stack_trace = ''.join(traceback.format_stack()[-3:-1])
295 logger.error(f"🚨 DOM CORRUPTION: Failed to get selection state: {e}")
296 logger.error(f"🚨 DOM CORRUPTION: Call stack:\n{stack_trace}")
297 logger.error(f"🚨 DOM CORRUPTION: Widget mounted: {self.is_mounted}")
298 logger.error(f"🚨 DOM CORRUPTION: Looking for: #{self.list_id}")
299 logger.error(f"🚨 DOM CORRUPTION: Plates count: {len(self.items)}")
301 # Try to diagnose what widgets actually exist
302 try:
303 all_widgets = list(self.query("*"))
304 widget_ids = [w.id for w in all_widgets if w.id]
305 logger.error(f"🚨 DOM CORRUPTION: Available widget IDs: {widget_ids}")
306 except Exception as diag_e:
307 logger.error(f"🚨 DOM CORRUPTION: Could not diagnose widgets: {diag_e}")
309 if self.selected_plate:
310 selected_items = [p for p in self.items if p.get('path') == self.selected_plate]
311 return selected_items, "cursor"
312 return [], "empty"
314 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
315 count = len(selected_items)
316 if count == 0: return f"No items for {operation}"
317 if count == 1: return f"{operation.title()} item: {selected_items[0].get('name', 'Unknown')}"
318 return f"{operation.title()} {count} items"
320 def _delayed_update_display(self) -> None:
321 """Trigger UI update - no longer needed since reactive system handles this automatically."""
322 # The reactive system now handles updates automatically via watch_plates()
323 # This method is kept for compatibility but does nothing
324 pass
326 def _trigger_ui_refresh(self) -> None:
327 """Force UI refresh when orchestrator state changes without items list changing."""
328 # Increment reactive counter to trigger automatic UI refresh
329 self.orchestrator_state_version += 1
331 def _update_button_states(self) -> None:
332 try:
333 # Check if widget is mounted and buttons exist
334 if not self.is_mounted:
335 return
337 has_selection = bool(self.selected_plate)
338 is_running = self._is_any_plate_running()
340 # Check if there are any selected items (for delete button)
341 selected_items, _ = self.get_selection_state()
342 has_selected_items = bool(selected_items)
344 can_run = has_selection and any(p['path'] in self.plate_compiled_data for p in self.items if p.get('path') == self.selected_plate)
346 # Try to get run button - if it doesn't exist, widget is not fully mounted
347 try:
348 run_button = self.query_one("#run_plate")
349 if is_running:
350 run_button.label = "Stop"
351 run_button.disabled = False
352 else:
353 run_button.label = "Run"
354 run_button.disabled = not can_run
355 except:
356 # Buttons not mounted yet, skip update
357 return
359 self.query_one("#add_plate").disabled = is_running
360 self.query_one("#del_plate").disabled = not self.items or not has_selected_items or is_running
362 # Edit button (config editing) enabled when 1+ orchestrators selected and initialized
363 selected_items, _ = self.get_selection_state()
364 edit_enabled = (
365 len(selected_items) > 0 and
366 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
367 not is_running
368 )
369 self.query_one("#edit_config").disabled = not edit_enabled
371 # Init button - enabled when plates are selected, can be initialized, and not running
372 init_enabled = (
373 len(selected_items) > 0 and
374 any(self._can_orchestrator_be_initialized(item['path']) for item in selected_items) and
375 not is_running
376 )
377 self.query_one("#init_plate").disabled = not init_enabled
379 # Compile button - enabled when plates are selected, initialized, and not running
380 selected_items, _ = self.get_selection_state()
381 compile_enabled = (
382 len(selected_items) > 0 and
383 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
384 not is_running
385 )
386 self.query_one("#compile_plate").disabled = not compile_enabled
388 # Code button - enabled when plates are selected, initialized, and not running
389 code_enabled = (
390 len(selected_items) > 0 and
391 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
392 not is_running
393 )
394 self.query_one("#code_plate").disabled = not code_enabled
396 # Save Python script button - enabled when plates are selected, initialized, and not running
397 save_enabled = (
398 len(selected_items) > 0 and
399 all(self._is_orchestrator_initialized(item['path']) for item in selected_items) and
400 not is_running
401 )
402 self.query_one("#save_python_script").disabled = not save_enabled
404 # Export button - enabled when plate is initialized and has workspace (HIDDEN FROM UI)
405 # export_enabled = (
406 # has_selection and
407 # self.selected_plate in self.orchestrators and
408 # not is_running
409 # )
410 # try:
411 # self.query_one("#export_ome_zarr").disabled = not export_enabled
412 # except:
413 # pass # Button is hidden from UI
415 # Debug button removed - no longer needed
417 except Exception as e:
418 # Only log if it's not a mounting/unmounting issue
419 if self.is_mounted:
420 logger.debug(f"Button state update skipped (widget not ready): {e}")
421 # Don't log errors during unmounting
423 def _is_any_plate_running(self) -> bool:
424 return self.current_process is not None and self.current_process.poll() is None
426 def _has_pipelines(self, plates: List[Dict]) -> bool:
427 """Check if all plates have pipeline definitions."""
428 if not self.pipeline_editor:
429 return False
431 for plate in plates:
432 pipeline = self.pipeline_editor.get_pipeline_for_plate(plate['path'])
433 if not pipeline:
434 return False
435 return True
437 def get_plate_status(self, plate_path: str) -> str:
438 """Get status for specific plate - now uses orchestrator state."""
439 orchestrator = self.orchestrators.get(plate_path)
440 return get_orchestrator_status_symbol(orchestrator)
442 def _is_orchestrator_initialized(self, plate_path: str) -> bool:
443 """Check if orchestrator exists and is in an initialized state."""
444 orchestrator = self.orchestrators.get(plate_path)
445 if orchestrator is None:
446 return False
447 return orchestrator.state in [OrchestratorState.READY, OrchestratorState.COMPILED,
448 OrchestratorState.COMPLETED, OrchestratorState.COMPILE_FAILED,
449 OrchestratorState.EXEC_FAILED]
451 def _can_orchestrator_be_initialized(self, plate_path: str) -> bool:
452 """Check if orchestrator can be initialized (doesn't exist or is in a re-initializable state)."""
453 orchestrator = self.orchestrators.get(plate_path)
454 if orchestrator is None:
455 return True # No orchestrator exists, can be initialized
456 return orchestrator.state in [OrchestratorState.CREATED, OrchestratorState.INIT_FAILED]
458 def _notify_pipeline_editor_status_change(self, plate_path: str, new_status: str) -> None:
459 """Notify pipeline editor when plate status changes (enables Add button immediately)."""
460 if self.pipeline_editor and self.pipeline_editor.current_plate == plate_path:
461 # Update pipeline editor's status and trigger button state update
462 self.pipeline_editor.current_plate_status = new_status
464 def _get_current_pipeline_definition(self, plate_path: str = None) -> List:
465 """Get current pipeline definition from PipelineEditor (now returns FunctionStep objects directly)."""
466 if not self.pipeline_editor:
467 logger.warning("No pipeline editor reference - using empty pipeline")
468 return []
470 # Get pipeline for specific plate or current plate
471 target_plate = plate_path or self.pipeline_editor.current_plate
472 if not target_plate:
473 logger.warning("No plate specified - using empty pipeline")
474 return []
476 # Get pipeline from editor (now returns List[FunctionStep] directly)
477 pipeline_steps = self.pipeline_editor.get_pipeline_for_plate(target_plate)
479 # No conversion needed - pipeline_steps are already FunctionStep objects with memory type decorators
480 return pipeline_steps
482 def get_operation_description(self, selected_items: List[Dict], selection_mode: str, operation: str) -> str:
483 """Generate human-readable description of what will be operated on."""
484 count = len(selected_items)
485 if selection_mode == "empty":
486 return f"No items available for {operation}"
487 elif selection_mode == "all":
488 return f"{operation.title()} ALL {count} items"
489 elif selection_mode == "checkbox":
490 if count == 1:
491 item_name = selected_items[0].get('name', 'Unknown')
492 return f"{operation.title()} selected item: {item_name}"
493 else:
494 return f"{operation.title()} {count} selected items"
495 else:
496 return f"{operation.title()} {count} items"
498 def _reset_execution_state(self, status_message: str, force_fail_executing: bool = True):
499 if self.current_process:
500 if self.current_process.poll() is None: # Still running
501 logger.warning("Forcefully terminating subprocess during reset.")
502 self.current_process.terminate()
503 try:
504 self.current_process.wait(timeout=1)
505 except subprocess.TimeoutExpired:
506 self.current_process.kill() # Force kill if terminate fails
507 self.current_process = None
509 # Clear log file reference (no temp files - log file is single source of truth)
510 self.log_file_path = None
511 self.log_file_position = 0
513 # Stop async monitoring
514 self._stop_monitoring()
516 # Only reset executing orchestrators to failed if this is a forced termination
517 # Natural completion should preserve the states set by the completion handler
518 if force_fail_executing:
519 for plate_path, orchestrator in self.orchestrators.items():
520 if orchestrator.state == OrchestratorState.EXECUTING:
521 orchestrator._state = OrchestratorState.EXEC_FAILED
523 # Trigger UI refresh after state changes - this is essential for button states
524 self._trigger_ui_refresh()
526 # Update button states - but only if widget is properly mounted
527 try:
528 if self.is_mounted and hasattr(self, 'query_one'):
529 self._update_button_states()
530 except Exception as e:
531 logger.error(f"Failed to update button states during reset: {e}")
533 self.app.current_status = status_message
535 async def action_run_plate(self) -> None:
536 # Clear logs from singleton toolong window before starting new run
537 try:
538 from openhcs.textual_tui.windows.toolong_window import clear_toolong_logs
539 logger.info("Clearing logs from singleton toolong window before new run")
540 clear_toolong_logs(self.app)
541 logger.info("Toolong logs cleared")
542 except Exception as e:
543 logger.error(f"Failed to clear toolong logs: {e}")
544 import traceback
545 logger.error(traceback.format_exc())
547 selected_items, _ = self.get_selection_state()
548 if not selected_items:
549 self.app.show_error("No plates selected to run.")
550 return
552 ready_items = [item for item in selected_items if item.get('path') in self.plate_compiled_data]
553 if not ready_items:
554 self.app.show_error("Selected plates are not compiled. Please compile first.")
555 return
557 try:
558 # Use subprocess approach like integration tests
559 logger.debug("🔥 Using subprocess approach for clean isolation")
561 plate_paths_to_run = [item['path'] for item in ready_items]
563 # Pass definition pipeline steps - subprocess will make fresh copy and compile
564 pipeline_data = {}
565 for plate_path in plate_paths_to_run:
566 definition_pipeline = self._get_current_pipeline_definition(plate_path)
567 pipeline_data[plate_path] = definition_pipeline
569 logger.info(f"🔥 Starting subprocess for {len(plate_paths_to_run)} plates")
571 # Create data file for subprocess (only file needed besides log)
572 data_file = tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pkl')
574 # Generate unique ID for this subprocess
575 import time
576 subprocess_timestamp = int(time.time())
577 plate_names = [Path(path).name for path in plate_paths_to_run]
578 unique_id = f"plates_{'_'.join(plate_names[:2])}_{subprocess_timestamp}" # Limit to first 2 plates for filename length
580 # Build subprocess log name from TUI log base using log utilities
581 from openhcs.core.log_utils import get_current_log_file_path
582 try:
583 tui_log_path = get_current_log_file_path()
584 if tui_log_path.endswith('.log'):
585 tui_base = tui_log_path[:-4] # Remove .log extension
586 else:
587 tui_base = tui_log_path
588 log_file_base = f"{tui_base}_subprocess_{subprocess_timestamp}"
589 except RuntimeError:
590 # Fallback if no main log found
591 log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
592 log_dir.mkdir(parents=True, exist_ok=True)
593 log_file_base = str(log_dir / f"tui_subprocess_{subprocess_timestamp}")
595 # Pickle data for subprocess
596 subprocess_data = {
597 'plate_paths': plate_paths_to_run,
598 'pipeline_data': pipeline_data,
599 'global_config': self.app.global_config # Pickle config object directly
600 }
602 # Wrap pickle operation in executor to avoid blocking UI
603 def _write_pickle_data():
604 import dill as pickle
605 with open(data_file.name, 'wb') as f:
606 pickle.dump(subprocess_data, f)
607 data_file.close()
609 await asyncio.get_event_loop().run_in_executor(None, _write_pickle_data)
611 logger.debug(f"🔥 Created data file: {data_file.name}")
613 # Create subprocess (like integration tests)
614 subprocess_script = Path(__file__).parent.parent / "subprocess_runner.py"
616 # Generate actual log file path that subprocess will create
617 actual_log_file_path = f"{log_file_base}_{unique_id}.log"
618 logger.debug(f"🔥 Log file base: {log_file_base}")
619 logger.debug(f"🔥 Unique ID: {unique_id}")
620 logger.debug(f"🔥 Actual log file: {actual_log_file_path}")
622 # Store log file path for monitoring (subprocess logger writes to this)
623 self.log_file_path = actual_log_file_path
624 self.log_file_position = self._get_current_log_position() # Start from current end
626 logger.debug(f"🔥 Subprocess command: {sys.executable} {subprocess_script} {data_file.name} {log_file_base} {unique_id}")
627 logger.debug(f"🔥 Subprocess logger will write to: {self.log_file_path}")
628 logger.debug(f"🔥 Subprocess stdout will be silenced (logger handles output)")
630 # SIMPLE SUBPROCESS: Let subprocess log to its own file (single source of truth)
631 # Wrap subprocess creation in executor to avoid blocking UI
632 def _create_subprocess():
633 return subprocess.Popen([
634 sys.executable, str(subprocess_script),
635 data_file.name, log_file_base, unique_id # Only data file and log - no temp files
636 ],
637 stdout=subprocess.DEVNULL, # Subprocess logs to its own file
638 stderr=subprocess.DEVNULL, # Subprocess logs to its own file
639 text=True, # Text mode for easier handling
640 )
642 self.current_process = await asyncio.get_event_loop().run_in_executor(None, _create_subprocess)
644 logger.info(f"🔥 Subprocess started with PID: {self.current_process.pid}")
646 # Subprocess logs to its own dedicated file - no output monitoring needed
648 # Update orchestrator states to show running state
649 for plate in ready_items:
650 plate_path = plate['path']
651 if plate_path in self.orchestrators:
652 self.orchestrators[plate_path]._state = OrchestratorState.EXECUTING
654 # Trigger UI refresh after state changes
655 self._trigger_ui_refresh()
657 self.app.current_status = f"Running {len(ready_items)} plate(s) in subprocess..."
658 self._update_button_states()
660 # Start reactive log monitoring
661 self._start_log_monitoring()
663 # Start async monitoring
664 await self._start_monitoring()
666 except Exception as e:
667 logger.critical(f"Failed to start subprocess: {e}", exc_info=True)
668 self.app.show_error("Failed to start subprocess", str(e))
669 self._reset_execution_state("Subprocess failed to start")
671 def _start_log_monitoring(self) -> None:
672 """Start reactive log monitoring for subprocess logs."""
673 if not self.log_file_path:
674 logger.warning("Cannot start log monitoring: no log file path")
675 return
677 try:
678 # Extract base path from log file path (remove .log extension)
679 log_path = Path(self.log_file_path)
680 base_log_path = str(log_path.with_suffix(''))
682 # Notify status bar to start log monitoring
683 if hasattr(self.app, 'status_bar') and self.app.status_bar:
684 self.app.status_bar.start_log_monitoring(base_log_path)
685 logger.debug(f"Started reactive log monitoring for: {base_log_path}")
686 else:
687 logger.warning("Status bar not available for log monitoring")
689 except Exception as e:
690 logger.error(f"Failed to start log monitoring: {e}")
692 def _stop_log_monitoring(self) -> None:
693 """Stop reactive log monitoring."""
694 try:
695 # Notify status bar to stop log monitoring
696 if hasattr(self.app, 'status_bar') and self.app.status_bar:
697 self.app.status_bar.stop_log_monitoring()
698 logger.debug("Stopped reactive log monitoring")
699 except Exception as e:
700 logger.error(f"Failed to stop log monitoring: {e}")
702 def _get_current_log_position(self) -> int:
703 """Get current position in log file."""
704 if not self.log_file_path or not Path(self.log_file_path).exists():
705 return 0
706 try:
707 return Path(self.log_file_path).stat().st_size
708 except Exception:
709 return 0
713 def _stop_file_watcher(self) -> None:
714 """Stop file system watcher without blocking."""
715 if not self.file_observer:
716 return
718 try:
719 # Just stop and abandon - don't wait for anything
720 self.file_observer.stop()
721 except Exception:
722 pass # Ignore errors
723 finally:
724 # Always clear references immediately
725 self.file_observer = None
726 self.file_watcher = None
730 async def _start_monitoring(self) -> None:
731 """Start async monitoring using Textual's interval system."""
732 # Stop any existing monitoring
733 self._stop_monitoring()
735 if self.monitoring_active:
736 return
738 self.monitoring_active = True
739 # Use Textual's set_interval for periodic async monitoring
740 self.monitoring_interval = self.set_interval(
741 10.0, # Check every 10 seconds
742 self._check_process_status_async,
743 pause=False
744 )
745 logger.debug("Started async process monitoring")
747 def _stop_monitoring(self) -> None:
748 """Stop async monitoring."""
749 if self.monitoring_interval:
750 self.monitoring_interval.stop()
751 self.monitoring_interval = None
752 self.monitoring_active = False
754 # Also stop log monitoring
755 self._stop_log_monitoring()
757 logger.debug("Stopped async process monitoring")
759 async def _check_process_status_async(self) -> None:
760 """Async process status check - replaces worker thread."""
761 if not self.monitoring_active:
762 return
764 try:
765 # Simple direct access - no threading, no locks needed
766 if not self._is_any_plate_running():
767 logger.debug("🔥 MONITOR: Subprocess finished")
769 # Stop monitoring first
770 self._stop_monitoring()
772 # Handle completion directly - no call_from_thread needed
773 await self._handle_process_completion()
775 except Exception as e:
776 logger.debug(f"Error in async process monitoring: {e}")
777 # Continue monitoring on error
async def _handle_process_completion(self) -> None:
    """Finalize a finished subprocess run, using the log file as the verdict.

    The run counts as successful only when the log file exists and contains
    both success markers. Afterwards the subprocess is waited on, every
    orchestrator still in EXECUTING is moved to COMPLETED or EXEC_FAILED,
    the execution state is reset and monitoring is stopped.
    """
    success = False

    if self.log_file_path and Path(self.log_file_path).exists():
        try:
            # errors="replace": a single undecodable byte (the log is full of
            # emoji and may end on a partially written character) must not
            # raise here and turn a successful run into a reported failure.
            with open(self.log_file_path, 'r', errors='replace') as f:
                log_content = f.read()
            # Both markers are required for the run to count as successful.
            success = (
                "🔥 SUBPROCESS: EXECUTION SUCCESS:" in log_content
                and "All plates completed successfully" in log_content
            )
        except Exception as e:
            logger.error(f"Error reading subprocess log file: {e}")
            success = False

    # Reap the subprocess so it does not linger as a zombie.
    logger.info("🔥 MONITOR: Starting process cleanup...")
    if self.current_process:
        try:
            self.current_process.wait()
            logger.info("🔥 MONITOR: Process cleanup completed")
        except Exception as e:
            logger.warning(f"🔥 MONITOR: Error during process cleanup: {e}")

    # One code path for both outcomes: only the target state and message differ.
    if success:
        final_state = OrchestratorState.COMPLETED
        message = "Execution completed successfully."
    else:
        final_state = OrchestratorState.EXEC_FAILED
        message = "Execution failed."

    for orchestrator in self.orchestrators.values():
        if orchestrator.state == OrchestratorState.EXECUTING:
            orchestrator._state = final_state

    # Orchestrator states were already set above, hence force_fail_executing=False.
    self._reset_execution_state(message, force_fail_executing=False)

    self._stop_monitoring()  # Process is done; nothing left to monitor
async def _read_log_file_incremental(self) -> None:
    """Show the newest log line in the app status bar.

    Reads whatever was appended to the log file since ``log_file_position``,
    advances that position, and publishes the last non-empty line
    (truncated to 100 characters) as ``app.current_status``. Diagnostic
    messages are published instead when there is no file or no new text.
    """
    if not self.log_file_path:
        self.app.current_status = "🔥 LOG READER: No log file"
        return

    try:
        def _fetch_tail():
            # Executed on a worker thread so file I/O never blocks the UI loop.
            if not Path(self.log_file_path).exists():
                return None, self.log_file_position

            with open(self.log_file_path, 'r') as handle:
                handle.seek(self.log_file_position)  # resume after the previous read
                text = handle.read()
                return text, handle.tell()

        loop = asyncio.get_event_loop()
        chunk, offset = await loop.run_in_executor(None, _fetch_tail)
        self.log_file_position = offset

        if chunk is None:
            status = "🔥 LOG READER: No log file"
        elif not (chunk and chunk.strip()):
            status = "🔥 LOG READER: No new content"
        else:
            candidates = [ln.strip() for ln in chunk.strip().split('\n') if ln.strip()]
            if candidates:
                tail = candidates[-1]
                # Keep the status bar readable: cap at 100 characters.
                status = tail if len(tail) <= 100 else tail[:97] + "..."
            else:
                status = "🔥 LOG READER: No lines found"

        self.app.current_status = status

    except Exception as e:
        self.app.current_status = f"🔥 LOG READER ERROR: {e}"
async def action_stop_execution(self) -> None:
    """Handle the Stop button: terminate the running subprocess and reset state.

    Monitoring is stopped first. If the subprocess is still alive, its
    process group is sent SIGTERM, given one second, then sent SIGKILL.
    If signalling the group fails, only the main process is killed.
    """
    logger.info("🛑 Stop button pressed. Terminating subprocess.")
    self.app.current_status = "Terminating execution..."

    # Monitoring must stop before the process disappears underneath it.
    self._stop_monitoring()

    if self.current_process and self.current_process.poll() is None:
        try:
            logger.info(f"🛑 Killing process group for PID {self.current_process.pid}...")

            # The group id is taken to equal the PID (the subprocess is
            # expected to have called os.setpgrp()).
            pgid = self.current_process.pid

            # Polite request first, addressed to the whole group.
            os.killpg(pgid, signal.SIGTERM)

            # Grace period for a clean shutdown.
            await asyncio.sleep(1)

            # Then the hard kill; a vanished group means everything already exited.
            try:
                os.killpg(pgid, signal.SIGKILL)
            except ProcessLookupError:
                logger.info(f"🛑 Process group {pgid} already terminated")
            else:
                logger.info(f"🛑 Force killed process group {pgid}")

        except Exception as e:
            logger.warning(f"🛑 Error killing process group: {e}, falling back to single process kill")
            # Last resort: kill just the main process.
            self.current_process.kill()

    self._reset_execution_state("Execution terminated by user.")
async def action_add_plate(self) -> None:
    """Add Plate button handler: delegates to the plate directory browser."""
    browser_request = self._open_plate_directory_browser()
    await browser_request
async def action_export_ome_zarr(self) -> None:
    """Ask for a target directory and export the selected plate to OME-ZARR.

    Shows an error and returns when no plate is selected or the selected
    plate has no orchestrator. Otherwise opens a directory browser whose
    result is forwarded to ``_start_ome_zarr_export``.
    """
    if not self.selected_plate:
        self.app.show_error("No Selection", "Please select a plate to export.")
        return

    # Export needs the orchestrator that was created when the plate was initialized.
    orchestrator = self.orchestrators.get(self.selected_plate)
    if not orchestrator:
        self.app.show_error("Not Initialized", "Please initialize the plate before exporting.")
        return

    def handle_export_result(selected_paths):
        # An empty/None result means the browser was dismissed without a choice.
        if not selected_paths:
            return
        if isinstance(selected_paths, list):
            target_dir = Path(selected_paths[0])
        else:
            target_dir = Path(selected_paths)
        self._start_ome_zarr_export(orchestrator, target_dir)

    await self.window_service.open_file_browser(
        file_manager=self.filemanager,
        initial_path=get_cached_browser_path(PathCacheKey.GENERAL),
        backend=Backend.DISK,
        title="Select OME-ZARR Export Directory",
        mode="save",
        selection_mode=SelectionMode.DIRECTORIES_ONLY,
        cache_key=PathCacheKey.GENERAL,
        on_result_callback=handle_export_result,
        caller_id="plate_manager_export"
    )
def _start_ome_zarr_export(self, orchestrator, export_path: Path):
    """Start a background task that exports a plate's TIFF images to OME-ZARR.

    The task collects every ``*.tif`` below the orchestrator's workspace
    (or its input directory when there is no workspace), groups the files by
    the well id found in the file name, and writes each well through a
    ``ZarrStorageBackend`` into ``<export_path>/plate.zarr``.

    Args:
        orchestrator: Orchestrator of the plate being exported; its
            ``workspace_path`` / ``input_dir`` locate the source images.
        export_path: Directory that will contain ``plate.zarr``.
    """
    # Matches ImageXpress-style names such as A01_s001_w1_z001.tif.
    well_pattern = re.compile(r'([A-Z]\d{2})_')

    async def run_export():
        try:
            self.app.current_status = f"Exporting to OME-ZARR: {export_path}"

            # Zarr backend with OME-ZARR metadata enabled.
            zarr_backend = ZarrStorageBackend(ome_zarr_metadata=True)

            # workspace_path may be unset, in which case the plate directory is used.
            source_path = orchestrator.workspace_path or orchestrator.input_dir
            if source_path and source_path.exists():
                processed_images = list(source_path.rglob("*.tif"))

                if processed_images:
                    # Group image files by well; files without a well id are ignored.
                    wells_data = defaultdict(list)
                    for img_path in processed_images:
                        match = well_pattern.search(img_path.name)
                        if match:
                            wells_data[match.group(1)].append(img_path)

                    export_store_path = export_path / "plate.zarr"

                    for well_id, well_images in wells_data.items():
                        images = []
                        for img_path in well_images:
                            # Context manager closes the file handle once the
                            # pixels have been copied into the array.
                            with Image.open(img_path) as img:
                                images.append(np.array(img))

                        output_paths = [export_store_path / f"{well_id}_{i:03d}.tif"
                                        for i in range(len(images))]

                        zarr_backend.save_batch(images, output_paths, chunk_name=well_id)

                    self.app.current_status = f"✅ OME-ZARR export completed: {export_store_path}"
                else:
                    self.app.show_error("No Data", "No processed images found in workspace.")
            else:
                self.app.show_error("No Workspace", "Plate workspace not found. Run pipeline first.")

        except Exception as e:
            logger.error(f"OME-ZARR export failed: {e}", exc_info=True)
            self.app.show_error("Export Failed", f"OME-ZARR export failed: {str(e)}")

    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task can be garbage-collected mid-execution.
    self._ome_zarr_export_task = asyncio.create_task(run_export())
1019 # Debug functionality removed - no longer needed
async def _open_plate_directory_browser(self):
    """Open the file browser window for choosing plate directories.

    The browser starts in the directory cached under
    ``PathCacheKey.PLATE_IMPORT`` (home directory as fallback), allows
    multiple directories to be selected, and reports the result to
    ``_add_plate_callback``.
    """
    # Start where the user last imported from.
    start_dir = get_path_cache().get_initial_path(PathCacheKey.PLATE_IMPORT, Path.home())

    await self.window_service.open_file_browser(
        file_manager=self.filemanager,
        initial_path=start_dir,
        backend=Backend.DISK,
        title="Select Plate Directory",
        mode="load",
        selection_mode=SelectionMode.DIRECTORIES_ONLY,
        cache_key=PathCacheKey.PLATE_IMPORT,
        on_result_callback=self._add_plate_callback,
        caller_id="plate_manager",
        enable_multi_selection=True
    )
def _add_plate_callback(self, selected_paths) -> None:
    """Receive the file browser result and append the new plates to ``items``.

    Args:
        selected_paths: ``None``/``False`` when the browser was cancelled,
            otherwise a single path or a list of paths (``str`` or ``Path``).

    Paths already present in the plate list are skipped. The parent of the
    first selected path is stored in the path cache for the next import.
    """
    logger.debug(f"_add_plate_callback called with: {selected_paths} (type: {type(selected_paths)})")

    if selected_paths is None or selected_paths is False:
        self.app.current_status = "Plate selection cancelled"
        return

    # Normalize a single selection to a one-element list.
    if not isinstance(selected_paths, list):
        selected_paths = [selected_paths]

    plates = list(self.items)
    new_names = []

    for candidate in selected_paths:
        # Work with Path objects regardless of what the browser handed over.
        if not isinstance(candidate, Path):
            candidate = Path(candidate if isinstance(candidate, str) else str(candidate))

        path_text = str(candidate)
        if any(plate['path'] == path_text for plate in plates):
            continue  # already listed

        # Entries carry no status field; state comes from the orchestrator.
        plates.append({
            'name': candidate.name,
            'path': path_text,
        })
        new_names.append(candidate.name)

    # Remember the parent of the first selection to save navigation next time.
    if selected_paths:
        head = selected_paths[0]
        if not isinstance(head, Path):
            head = Path(head)
        get_path_cache().set_cached_path(PathCacheKey.PLATE_IMPORT, head.parent)

    # Assigning the reactive property triggers the UI update.
    self.items = plates

    if not new_names:
        self.app.current_status = "No new plates added (duplicates skipped)"
    elif len(new_names) == 1:
        self.app.current_status = f"Added plate: {new_names[0]}"
    else:
        self.app.current_status = f"Added {len(new_names)} plates: {', '.join(new_names)}"
def action_delete_plate(self) -> None:
    """Remove the selected plates from the list together with their orchestrators.

    Shows an error when nothing is selected. Clears ``selected_plate`` when
    the currently selected plate is among the removed ones.
    """
    selected_items, _ = self.get_selection_state()
    if not selected_items:
        # show_error takes (title, message) everywhere else in this widget;
        # the former single-argument call did not match that usage.
        self.app.show_error("No Selection", "No plate selected to delete.")
        return

    paths_to_delete = {p['path'] for p in selected_items}
    self.items = [p for p in self.items if p['path'] not in paths_to_delete]

    # Drop orchestrators that belonged to the deleted plates.
    for path in paths_to_delete:
        self.orchestrators.pop(path, None)

    if self.selected_plate in paths_to_delete:
        self.selected_plate = ""

    self.app.current_status = f"Deleted {len(paths_to_delete)} plate(s)"
async def action_edit_config(self) -> None:
    """
    Edit button handler: edit a PipelineConfig for the selected orchestrators.

    The config window is seeded from the first selected orchestrator (its
    existing pipeline config if it has one, otherwise a fresh editing config
    built from the current global config). On save, the resulting config is
    applied to every selected orchestrator; the global configuration is not
    modified here.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        self.app.current_status = "No orchestrators selected for configuration"
        return

    # Only plates that already have an orchestrator can be configured.
    targets = []
    for item in selected_items:
        key = item['path']
        if key in self.orchestrators:
            targets.append(self.orchestrators[key])

    if not targets:
        self.app.current_status = "No initialized orchestrators selected"
        return

    # The first selected orchestrator provides the values shown in the editor.
    representative = targets[0]

    if representative.pipeline_config:
        # Keep user-set values, but resolve placeholders against the
        # current global config rather than the orchestrator's older one.
        from openhcs.core.config import create_editing_config_from_existing_lazy_config
        editing_config = create_editing_config_from_existing_lazy_config(
            representative.pipeline_config,
            self.global_config
        )
    else:
        # Nothing stored yet: start from placeholders of the current global config.
        from openhcs.core.config import create_pipeline_config_for_editing
        editing_config = create_pipeline_config_for_editing(self.global_config)

    def handle_config_save(new_config: PipelineConfig) -> None:
        """Apply the saved config to each selected orchestrator."""
        for target in targets:
            # Plain synchronous call.
            target.apply_pipeline_config(new_config)
        count = len(targets)
        self.app.current_status = f"Per-orchestrator configuration applied to {count} orchestrator(s)"

    # The window edits a PipelineConfig, not a GlobalPipelineConfig.
    await self.window_service.open_config_window(
        PipelineConfig,
        editing_config,
        on_save_callback=handle_config_save
    )
async def action_edit_global_config(self) -> None:
    """
    Edit the global configuration and push the result to every orchestrator.

    Opens a config window seeded with a lazy PipelineConfig built from the
    app's global config (or the default global config when the app has
    none). On save, the edited config is converted back with
    ``to_base_config()``, stored on the app, registered through
    ``set_current_global_config`` and handed to each orchestrator.
    """
    from openhcs.core.config import get_default_global_config
    from openhcs.core.lazy_config import create_pipeline_config_for_editing, PipelineConfig

    # Fall back to the default when the app has no global config yet.
    base_config = self.app.global_config or get_default_global_config()

    # Lazy config for the editor, keeping the current values.
    editable_config = create_pipeline_config_for_editing(base_config, preserve_values=True)

    def handle_global_config_save(new_config: PipelineConfig) -> None:
        """Store the edited config on the app and apply it to all orchestrators."""
        # Lazy PipelineConfig -> concrete global config.
        resolved = new_config.to_base_config()

        self.app.global_config = resolved

        # Register the new config as the current one for GlobalPipelineConfig.
        from openhcs.core.config import set_current_global_config, GlobalPipelineConfig
        set_current_global_config(GlobalPipelineConfig, resolved)

        for target in self.orchestrators.values():
            asyncio.create_task(target.apply_new_global_config(resolved))
        self.app.current_status = "Global configuration applied to all orchestrators"

    await self.window_service.open_config_window(
        PipelineConfig,
        editable_config,
        on_save_callback=handle_global_config_save
    )
def _analyze_orchestrator_configs(self, orchestrators: List['PipelineOrchestrator']) -> Dict[str, Dict[str, Any]]:
    """Compare global-config field values across several orchestrators.

    Args:
        orchestrators: Orchestrators whose ``global_config`` is inspected.

    Returns:
        A dict keyed by GlobalPipelineConfig field name. Each entry is either
        ``{"type": "same", "value": ..., "default": ...}`` when all
        orchestrators agree, or
        ``{"type": "different", "values": [...], "default": ...}`` otherwise.
        Empty when no orchestrators are given.
    """
    if not orchestrators:
        return {}

    # Source of the per-field default values.
    defaults_by_name = SignatureAnalyzer.analyze(GlobalPipelineConfig)

    report = {}
    for spec in dataclasses.fields(GlobalPipelineConfig):
        name = spec.name

        # Collect this field's value from every orchestrator that has it.
        observed = []
        for orch in orchestrators:
            try:
                observed.append(getattr(orch.global_config, name))
            except AttributeError:
                # This config lacks the field; leave it out of the comparison.
                continue

        if not observed:
            continue

        details = defaults_by_name.get(name)
        default_value = details.default_value if details else None

        reference = observed[0]
        uniform = all(self._values_equal(v, reference) for v in observed)
        if uniform:
            report[name] = {
                "type": "same",
                "value": reference,
                "default": default_value
            }
        else:
            report[name] = {
                "type": "different",
                "values": observed,
                "default": default_value
            }

    return report
def _values_equal(self, val1: Any, val2: Any) -> bool:
    """Return whether two config values are equal.

    Two dataclass *instances* are compared field by field via
    ``dataclasses.asdict``. Everything else, including dataclass classes
    themselves, is compared with ``==``.
    """
    def _is_dataclass_instance(obj: Any) -> bool:
        # is_dataclass() is also True for the class object, on which
        # asdict() raises TypeError, so classes are excluded here.
        return dataclasses.is_dataclass(obj) and not isinstance(obj, type)

    if _is_dataclass_instance(val1) and _is_dataclass_instance(val2):
        return dataclasses.asdict(val1) == dataclasses.asdict(val2)

    return val1 == val2
def action_init_plate(self) -> None:
    """Init Plate button handler: validate the selection and start initialization.

    Does nothing (beyond a log warning) when the selection is empty or when
    any selected plate is currently executing. Plates in every other state,
    including failed ones, may be (re-)initialized.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        logger.warning("No plates available for initialization")
        return

    def _is_executing(item) -> bool:
        # A running plate is the only thing that blocks (re-)initialization.
        orch = self.orchestrators.get(item['path'])
        return orch is not None and orch.state == OrchestratorState.EXECUTING

    busy = [item for item in selected_items if _is_executing(item)]
    if busy:
        names = [item['name'] for item in busy]
        logger.warning(f"Cannot initialize plates that are currently executing: {', '.join(names)}")
        return

    self._start_async_init(selected_items, selection_mode)
def _start_async_init(self, selected_items: List[Dict], selection_mode: str) -> None:
    """Log what is about to be initialized and launch the init worker."""
    description = self.get_operation_description(selected_items, selection_mode, "initialize")
    logger.info(f"Initializing: {description}")

    # Hand the actual work to the background worker.
    self._init_plates_worker(selected_items)
@work(exclusive=True)
async def _init_plates_worker(self, selected_items: List[Dict]) -> None:
    """Worker that creates and initializes an orchestrator for each selected plate.

    For every plate the orchestrator is built and initialized in an executor
    and stored in ``self.orchestrators``. On failure an orchestrator marked
    INIT_FAILED is stored instead and the error text is saved on the plate
    entry. The UI and the pipeline editor are notified after each plate.
    """
    for plate_data in selected_items:
        plate_path = plate_data['path']

        # Use the live entry from self.items, not the copy in the selection.
        actual_plate = next((p for p in self.items if p['path'] == plate_path), None)
        if not actual_plate:
            logger.error(f"Plate not found in plates list: {plate_path}")
            continue

        try:
            def init_orchestrator():
                # Heavy work: executed off the UI loop.
                return PipelineOrchestrator(
                    plate_path=plate_path,
                    global_config=self.global_config,
                    storage_registry=self.filemanager.registry
                ).initialize()

            orchestrator = await asyncio.get_event_loop().run_in_executor(None, init_orchestrator)

            # Kept for later operations on this plate.
            self.orchestrators[plate_path] = orchestrator
            logger.info(f"Plate {actual_plate['name']} initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize plate {plate_path}: {e}", exc_info=True)
            # Store a placeholder orchestrator so the failure is visible as a state.
            failed_orchestrator = PipelineOrchestrator(
                plate_path=plate_path,
                global_config=self.global_config,
                storage_registry=self.filemanager.registry
            )
            failed_orchestrator._state = OrchestratorState.INIT_FAILED
            self.orchestrators[plate_path] = failed_orchestrator
            actual_plate['error'] = str(e)

        # Reflect the new orchestrator state in the UI and in the pipeline editor.
        self._trigger_ui_refresh()
        self._update_button_states()
        status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
        self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)
        logger.debug(f"Updated plate {actual_plate['name']} status")

    # One more button refresh once all plates have been processed.
    self._update_button_states()

    def _count_in_state(state) -> int:
        # Number of selected plates whose orchestrator ended up in `state`.
        return sum(
            1 for p in selected_items
            if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == state
        )

    success_count = _count_in_state(OrchestratorState.READY)
    error_count = _count_in_state(OrchestratorState.INIT_FAILED)

    if error_count == 0:
        logger.info(f"Successfully initialized {success_count} plates")
    else:
        logger.warning(f"Initialized {success_count} plates, {error_count} errors")
def action_compile_plate(self) -> None:
    """Compile Plate button handler: validate the selection and start compilation.

    Compilation is started only when every selected plate has an
    orchestrator in a compilable state (READY, COMPILE_FAILED, EXEC_FAILED,
    COMPILED or COMPLETED) and a pipeline definition. Otherwise a warning is
    logged or the status line is updated, and nothing is started.
    """
    selected_items, selection_mode = self.get_selection_state()

    if selection_mode == "empty":
        logger.warning("No plates available for compilation")
        return

    # States from which a plate may be compiled or recompiled.
    compilable_states = [
        OrchestratorState.READY,
        OrchestratorState.COMPILE_FAILED,
        OrchestratorState.EXEC_FAILED,
        OrchestratorState.COMPILED,
        OrchestratorState.COMPLETED,
    ]

    def _blocked(item) -> bool:
        orch = self.orchestrators.get(item['path'])
        return orch is None or orch.state not in compilable_states

    not_ready = [item for item in selected_items if _blocked(item)]

    if not_ready:
        names = [item['name'] for item in not_ready]
        # Choose the message that matches why the plates are blocked.
        if any(self.orchestrators.get(item['path']) is None for item in not_ready):
            logger.warning(f"Cannot compile plates that haven't been initialized: {', '.join(names)}")
        elif any(self.orchestrators.get(item['path']).state == OrchestratorState.EXECUTING for item in not_ready):
            logger.warning(f"Cannot compile plates that are currently executing: {', '.join(names)}")
        else:
            logger.warning(f"Cannot compile plates in current state: {', '.join(names)}")
        return

    # Every selected plate also needs a pipeline definition.
    no_pipeline = [
        item for item in selected_items
        if not self._get_current_pipeline_definition(item['path'])
    ]

    if no_pipeline:
        names = [item['name'] for item in no_pipeline]
        self.app.current_status = f"Cannot compile plates without pipelines: {', '.join(names)}"
        return

    self._start_async_compile(selected_items, selection_mode)
def _start_async_compile(self, selected_items: List[Dict], selection_mode: str) -> None:
    """Log what is about to be compiled and launch the compile worker."""
    description = self.get_operation_description(selected_items, selection_mode, "compile")
    logger.info(f"Compiling: {description}")

    # Hand the actual work to the background worker.
    self._compile_plates_worker(selected_items)
@work(exclusive=True)
async def _compile_plates_worker(self, selected_items: List[Dict]) -> None:
    """Worker that compiles the pipeline of each selected plate.

    For every plate: obtain (or create and initialize) its orchestrator,
    deep-copy the pipeline definition, give each copied step a fresh
    ``step_id`` and a non-empty ``variable_components``, compile for all
    wells, and store ``(execution_pipeline, compiled_contexts)`` in
    ``self.plate_compiled_data``. On failure the error text is saved on the
    plate entry and nothing is stored. The UI and the pipeline editor are
    notified after each plate.
    """
    for plate_data in selected_items:
        plate_path = plate_data['path']

        # Use the live entry from self.items, not the copy in the selection.
        actual_plate = next((p for p in self.items if p['path'] == plate_path), None)
        if not actual_plate:
            logger.error(f"Plate not found in plates list: {plate_path}")
            continue

        definition_pipeline = self._get_current_pipeline_definition(plate_path)
        if not definition_pipeline:
            logger.warning(f"No pipeline defined for {actual_plate['name']}, using empty pipeline")
            definition_pipeline = []

        try:
            def get_or_create_orchestrator():
                # Executed in the executor: initialization may be slow.
                if plate_path not in self.orchestrators:
                    return PipelineOrchestrator(
                        plate_path=plate_path,
                        global_config=self.global_config,
                        storage_registry=self.filemanager.registry
                    ).initialize()
                existing = self.orchestrators[plate_path]
                if not existing.is_initialized():
                    existing.initialize()
                return existing

            loop = asyncio.get_event_loop()
            orchestrator = await loop.run_in_executor(None, get_or_create_orchestrator)
            self.orchestrators[plate_path] = orchestrator

            # Compile a private copy so the definition itself stays untouched.
            execution_pipeline = copy.deepcopy(definition_pipeline)

            for step in execution_pipeline:
                # The copies are new objects, so their ids must be regenerated.
                step.step_id = str(id(step))
                # variable_components must be neither None nor empty.
                if step.variable_components is None:
                    logger.warning(f"🔥 Step '{step.name}' has None variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]
                elif not step.variable_components:
                    logger.warning(f"🔥 Step '{step.name}' has empty variable_components, setting FunctionStep default")
                    step.variable_components = [VariableComponents.SITE]

            pipeline_obj = Pipeline(steps=execution_pipeline)

            # Both calls are heavy; keep them off the UI loop.
            wells = await loop.run_in_executor(None, lambda: orchestrator.get_component_keys(GroupBy.WELL))
            compiled_contexts = await loop.run_in_executor(
                None, orchestrator.compile_pipelines, pipeline_obj.steps, wells
            )

            # Diagnostics: step ids on the pipeline side vs. the first compiled context.
            step_ids_in_pipeline = [id(step) for step in execution_pipeline]
            first_well_key = next(iter(compiled_contexts.keys())) if compiled_contexts else None
            step_ids_in_contexts = []
            if first_well_key and hasattr(compiled_contexts[first_well_key], 'step_plans'):
                step_ids_in_contexts = list(compiled_contexts[first_well_key].step_plans.keys())
            logger.info(f"🔥 Storing compiled data for {plate_path}: pipeline={type(execution_pipeline)}, contexts={type(compiled_contexts)}")
            logger.info(f"🔥 Step IDs in pipeline: {step_ids_in_pipeline}")
            logger.info(f"🔥 Step IDs in contexts: {step_ids_in_contexts}")
            self.plate_compiled_data[plate_path] = (execution_pipeline, compiled_contexts)
            logger.info(f"🔥 Stored! Available compiled plates: {list(self.plate_compiled_data.keys())}")

            logger.info(f"🔥 Successfully compiled {plate_path}")

        except Exception as e:
            logger.error(f"🔥 COMPILATION ERROR: Pipeline compilation failed for {plate_path}: {e}", exc_info=True)
            # Nothing is stored in plate_compiled_data for a failed plate.
            actual_plate['error'] = str(e)

        # Reflect the new orchestrator state in the UI and in the pipeline editor.
        self._trigger_ui_refresh()
        self._update_button_states()
        status_symbol = get_orchestrator_status_symbol(self.orchestrators.get(actual_plate['path']))
        self._notify_pipeline_editor_status_change(actual_plate['path'], status_symbol)

    # One more button refresh once all plates have been processed.
    self._update_button_states()

    def _count_in_state(state) -> int:
        # Number of selected plates whose orchestrator ended up in `state`.
        return sum(
            1 for p in selected_items
            if self.orchestrators.get(p['path']) and self.orchestrators[p['path']].state == state
        )

    success_count = _count_in_state(OrchestratorState.COMPILED)
    error_count = _count_in_state(OrchestratorState.COMPILE_FAILED)

    if error_count == 0:
        logger.info(f"Successfully compiled {success_count} plates")
    else:
        logger.warning(f"Compiled {success_count} plates, {error_count} errors")
async def action_code_plate(self) -> None:
    """Generate editable Python code for the selected plates and apply the edits.

    The pipelines of the selected plates (taken from the pipeline editor,
    empty when unavailable) and the app's global config are rendered to a
    Python script, which is opened in the terminal editor. When the user
    saves, the script is executed and the resulting ``pipeline_data``,
    ``global_config`` or ``orchestrators`` variable is applied.
    """
    logger.debug("Code button pressed - generating Python code for plates")

    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.current_status = "No plates selected for code generation"
        return

    try:
        plate_paths = [item['path'] for item in selected_items]

        # Pipeline steps per plate; plates unknown to the editor get an empty pipeline.
        pipeline_data = {}
        for plate_path in plate_paths:
            editor = getattr(self, 'pipeline_editor', None)
            if editor and plate_path in editor.plate_pipelines:
                pipeline_data[plate_path] = editor.plate_pipelines[plate_path]
            else:
                pipeline_data[plate_path] = []

        from openhcs.textual_tui.services.terminal_launcher import TerminalLauncher

        # Only the orchestrator configuration is generated (no execution wrapper).
        from openhcs.debug.pickle_to_python import generate_complete_orchestrator_code

        python_code = generate_complete_orchestrator_code(
            plate_paths=plate_paths,
            pipeline_data=pipeline_data,
            global_config=self.app.global_config,
            clean_mode=False
        )

        def handle_edited_code(edited_code: str):
            """Execute the saved script and apply the variables it defines."""
            logger.debug("Orchestrator code edited, processing changes...")
            try:
                # SECURITY: this executes arbitrary Python. The text comes from
                # the user's own editor session; never feed it untrusted input.
                namespace = {}
                exec(edited_code, namespace)

                # NOTE(review): only the first matching variable is handled; a
                # script that defines pipeline_data never reaches the
                # global_config / orchestrators branches - confirm this is intended.
                if 'pipeline_data' in namespace:
                    new_pipeline_data = namespace['pipeline_data']
                    editor = getattr(self, 'pipeline_editor', None)
                    if editor:
                        # Reassign (rather than mutate) the stored pipelines.
                        current_pipelines = dict(editor.plate_pipelines)
                        current_pipelines.update(new_pipeline_data)
                        editor.plate_pipelines = current_pipelines

                        # Keep the currently displayed plate in sync as well.
                        current_plate = editor.current_plate
                        if current_plate and current_plate in new_pipeline_data:
                            editor.pipeline_steps = new_pipeline_data[current_plate]

                    self.app.current_status = f"Pipeline data updated for {len(new_pipeline_data)} plates"

                elif 'global_config' in namespace:
                    new_global_config = namespace['global_config']
                    for plate_path in plate_paths:
                        if plate_path in self.orchestrators:
                            orchestrator = self.orchestrators[plate_path]
                            asyncio.create_task(orchestrator.apply_new_global_config(new_global_config))
                    self.app.current_status = f"Global config updated for {len(plate_paths)} plates"

                elif 'orchestrators' in namespace:
                    new_orchestrators = namespace['orchestrators']
                    self.app.current_status = f"Orchestrator list updated with {len(new_orchestrators)} orchestrators"

                else:
                    self.app.show_error("Parse Error", "No valid assignments found in edited code")

            except SyntaxError as e:
                self.app.show_error("Syntax Error", f"Invalid Python syntax: {e}")
            except Exception as e:
                # exc_info keeps the traceback of the failing user script in the log.
                logger.error(f"Failed to parse edited orchestrator code: {e}", exc_info=True)
                self.app.show_error("Edit Error", f"Failed to parse orchestrator code: {str(e)}")

        launcher = TerminalLauncher(self.app)
        await launcher.launch_editor_for_file(
            file_content=python_code,
            file_extension='.py',
            on_save_callback=handle_edited_code
        )

    except Exception as e:
        logger.error(f"Failed to generate plate code: {e}", exc_info=True)
        self.app.current_status = f"Failed to generate code: {e}"
async def action_save_python_script(self) -> None:
    """Save an executable Python script for the selected plates.

    Gathers the pipeline steps stored for every selected plate, renders them
    into a script with ``self._generate_executable_script`` and opens a
    SAVE-mode file browser. The script is written to disk by the browser's
    result callback once a destination has been chosen.

    Errors are not raised to the caller: they are logged and reported through
    ``self.app.current_status``.
    """
    logger.debug("Save button pressed - saving Python script for plates")

    selected_items, _ = self.get_selection_state()
    if not selected_items:
        self.app.current_status = "No plates selected for script generation"
        return

    try:
        plate_paths = [item['path'] for item in selected_items]

        # Resolve the pipeline editor once instead of re-checking it for every
        # plate; plates without stored steps get an empty pipeline.
        editor = getattr(self, 'pipeline_editor', None)
        pipeline_data = {}
        for plate_path in plate_paths:
            if editor and plate_path in editor.plate_pipelines:
                pipeline_data[plate_path] = editor.plate_pipelines[plate_path]
            else:
                pipeline_data[plate_path] = []

        # Same dict layout that _generate_executable_script pickles for the
        # pickle_to_python converter.
        data = {
            'plate_paths': plate_paths,
            'pipeline_data': pipeline_data,
            'global_config': self.app.global_config
        }

        python_code = self._generate_executable_script(data)

        # Imported lazily, as in the rest of this widget's save/load actions.
        from openhcs.textual_tui.windows.file_browser_window import open_file_browser_window, BrowserMode
        from openhcs.textual_tui.services.file_browser_service import SelectionMode
        from openhcs.core.path_cache import get_cached_browser_path, PathCacheKey
        from openhcs.constants.constants import Backend

        def handle_save_result(result):
            """Write the generated script to the path chosen in the browser."""
            # The browser may return a single Path or a list of paths; only
            # the first entry of a list is used.
            if isinstance(result, Path):
                save_path = result
            elif isinstance(result, list) and result:
                save_path = result[0]
            else:
                save_path = None

            if not save_path:
                return

            try:
                # Python source files are UTF-8 by default, so write the
                # script as UTF-8 rather than in the locale's encoding.
                with open(save_path, 'w', encoding='utf-8') as f:
                    f.write(python_code)
            except Exception as e:
                logger.error(f"Failed to save Python script: {e}")
                self.app.current_status = f"Failed to save script: {e}"
            else:
                logger.info(f"Python script saved to: {save_path}")
                self.app.current_status = f"Python script saved to: {save_path}"

        # Default filename is derived from the first selected plate.
        first_plate_name = Path(plate_paths[0]).name if plate_paths else "pipeline"
        default_filename = f"{first_plate_name}_pipeline.py"

        await open_file_browser_window(
            app=self.app,
            file_manager=self.app.filemanager,
            initial_path=get_cached_browser_path(PathCacheKey.PIPELINE_FILES),
            backend=Backend.DISK,
            title="Save Python Pipeline Script",
            mode=BrowserMode.SAVE,
            selection_mode=SelectionMode.FILES_ONLY,
            filter_extensions=['.py'],
            default_filename=default_filename,
            cache_key=PathCacheKey.PIPELINE_FILES,
            on_result_callback=handle_save_result,
            caller_id="plate_manager_save_script"
        )

    except Exception as e:
        logger.error(f"Failed to save Python script: {e}")
        self.app.current_status = f"Failed to save script: {e}"
def _generate_executable_script(self, data: Dict) -> str:
    """Generate a fully executable Python script from orchestrator data.

    The data is pickled to a temporary file and passed through the existing
    ``convert_pickle_to_python`` converter, whose output file is read back and
    returned.

    Args:
        data: Dict with the ``'plate_paths'``, ``'pipeline_data'`` and
            ``'global_config'`` entries built by the caller.

    Returns:
        The text of the generated script.

    Raises:
        Exception: Whatever pickling, the converter or reading the output
            raises is propagated; the temporary files are removed first.
    """
    import contextlib
    import os
    import tempfile
    import dill as pickle
    from openhcs.debug.pickle_to_python import convert_pickle_to_python

    # Each path is recorded as soon as its file exists, so the finally clause
    # can remove it even when a later step (dump, second temp file,
    # conversion) fails.
    temp_paths = []
    try:
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as temp_pickle:
            temp_paths.append(temp_pickle.name)
            pickle.dump(data, temp_pickle)

        # Only the name is needed; the converter writes the file itself.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as temp_output:
            temp_paths.append(temp_output.name)

        temp_pickle_path, temp_output_path = temp_paths
        convert_pickle_to_python(temp_pickle_path, temp_output_path)

        with open(temp_output_path, 'r') as f:
            return f.read()
    finally:
        # Best-effort cleanup: unlink each file on its own so one failure
        # does not skip the other, and suppress only filesystem errors.
        for temp_path in temp_paths:
            with contextlib.suppress(OSError):
                os.unlink(temp_path)