Coverage for openhcs/core/orchestrator/orchestrator.py: 60.3%
703 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Consolidated orchestrator module for OpenHCS.
4This module provides a unified PipelineOrchestrator class that implements
5a two-phase (compile-all-then-execute-all) pipeline execution model.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 66 — Immutability After Construction
10- Clause 88 — No Inferred Capabilities
11- Clause 293 — GPU Pre-Declaration Enforcement
12- Clause 295 — GPU Scheduling Affinity
13"""
15import logging
16import concurrent.futures
17import multiprocessing
18from dataclasses import fields
19from pathlib import Path
20from typing import Any, Callable, Dict, List, Optional, Union, Set
22from openhcs.constants.constants import Backend, DEFAULT_IMAGE_EXTENSIONS, GroupBy, OrchestratorState, get_openhcs_config, AllComponents, VariableComponents
23from openhcs.constants import Microscope
24from openhcs.core.config import GlobalPipelineConfig
25from openhcs.config_framework.global_config import get_current_global_config
26from openhcs.config_framework.lazy_factory import ContextProvider
29from openhcs.core.metadata_cache import get_metadata_cache, MetadataCache
30from openhcs.core.context.processing_context import ProcessingContext
31from openhcs.core.pipeline.compiler import PipelineCompiler
32from openhcs.core.steps.abstract import AbstractStep
33from openhcs.core.components.validation import convert_enum_by_value
34from openhcs.io.filemanager import FileManager
35# Zarr backend is CPU-only; always import it (even in subprocess/no-GPU mode)
36import os
37from openhcs.io.zarr import ZarrStorageBackend
38# PipelineConfig now imported directly above
39from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
40from openhcs.microscopes import create_microscope_handler
41from openhcs.microscopes.microscope_base import MicroscopeHandler
43# Lazy import of consolidate_analysis_results to avoid blocking GUI startup
44# This function imports GPU libraries, so we defer it until first use
45def _get_consolidate_analysis_results():
46 """Lazy import of consolidate_analysis_results function."""
47 if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
48 # Subprocess runner mode - create placeholder
49 def consolidate_analysis_results(*args, **kwargs):
50 """Placeholder for subprocess runner mode."""
51 raise RuntimeError("Analysis consolidation not available in subprocess runner mode")
52 return consolidate_analysis_results
53 else:
54 from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results
55 return consolidate_analysis_results
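# Usage sketch (illustrative, not part of this module): callers resolve the function lazily at
# the point of consolidation so that GPU-importing modules are only loaded when actually needed.
# The keyword arguments below mirror the call made later in execute_compiled_plate(); the
# results_directory value is hypothetical.
#
#     consolidate_fn = _get_consolidate_analysis_results()
#     consolidate_fn(
#         results_directory="/data/plate_001_results",
#         well_ids=["A01", "A02"],
#         consolidation_config=shared_context.analysis_consolidation_config,
#         plate_metadata_config=shared_context.plate_metadata_config,
#     )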
57# Import generic component system - required for orchestrator functionality
59# Optional napari import for visualization
60try:
61 from openhcs.runtime.napari_stream_visualizer import NapariStreamVisualizer
62 NapariVisualizerType = NapariStreamVisualizer
63except ImportError:
64 # Create a placeholder type for type hints when napari is not available
65 NapariStreamVisualizer = None
66 NapariVisualizerType = Any # Use Any for type hints when napari is not available
68# Optional GPU memory management imports
69try:
70 from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
71except ImportError:
72 cleanup_all_gpu_frameworks = None
75logger = logging.getLogger(__name__)
78def _create_merged_config(pipeline_config: 'PipelineConfig', global_config: GlobalPipelineConfig) -> GlobalPipelineConfig:
79 """
80 Pure function for creating merged config that preserves None values for sibling inheritance.
82 Follows OpenHCS stateless architecture principles - no side effects, explicit dependencies.
83 Extracted from apply_pipeline_config to eliminate code duplication.
84 """
85 logger.debug(f"Starting merge with pipeline_config={type(pipeline_config)} and global_config={type(global_config)}")
87 # DEBUG: Check what the global_config looks like
88 if hasattr(global_config, 'step_well_filter_config'): 88 ↛ 94line 88 didn't jump to line 94 because the condition on line 88 was always true
89 step_config = getattr(global_config, 'step_well_filter_config')
90 if hasattr(step_config, 'well_filter'): 90 ↛ 94line 90 didn't jump to line 94 because the condition on line 90 was always true
91 well_filter_value = getattr(step_config, 'well_filter')
92 logger.debug(f"global_config has step_well_filter_config.well_filter = {well_filter_value}")
94 merged_config_values = {}
95 for field in fields(GlobalPipelineConfig):
96 # Fail-loud: Let AttributeError bubble up naturally (no getattr fallbacks)
97 pipeline_value = getattr(pipeline_config, field.name)
99 if field.name == 'step_well_filter_config':
100 logger.debug(f"Processing step_well_filter_config: pipeline_value = {pipeline_value}")
102 if pipeline_value is not None:
103 # CRITICAL FIX: Convert lazy configs to base configs with resolved values
104 # This ensures that user-set values from lazy configs are preserved in the thread-local context
105 # instead of being replaced with static defaults when GlobalPipelineConfig is instantiated
106 if hasattr(pipeline_value, 'to_base_config'):
107 # This is a lazy config - convert to base config with resolved values
108 converted_value = pipeline_value.to_base_config()
109 merged_config_values[field.name] = converted_value
110 if field.name == 'step_well_filter_config':
111 logger.debug(f"Converted lazy config to base: {converted_value}")
112 else:
113 # Regular value - use as-is
114 merged_config_values[field.name] = pipeline_value
115 if field.name == 'step_well_filter_config': 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true
116 logger.debug(f"Using pipeline value as-is: {pipeline_value}")
117 else:
118 global_value = getattr(global_config, field.name)
119 merged_config_values[field.name] = global_value
121 result = GlobalPipelineConfig(**merged_config_values)
122 return result
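# Behavioural sketch of the merge above, assuming a PipelineConfig that only sets num_workers
# (field name taken from GlobalPipelineConfig usage elsewhere in this module; the rest is
# illustrative, not a definitive API):
#
#     pipeline_cfg = PipelineConfig(num_workers=4)           # every other field left as None
#     global_cfg = get_current_global_config(GlobalPipelineConfig)
#     merged = _create_merged_config(pipeline_cfg, global_cfg)
#
#     # merged.num_workers == 4                         explicit pipeline value wins
#     # merged.zarr_config is global_cfg.zarr_config    None falls back to the global value
#     # lazy sub-configs exposing to_base_config() are resolved before being stored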
125def _execute_single_axis_static(
126 pipeline_definition: List[AbstractStep],
127 frozen_context: 'ProcessingContext',
128 visualizer: Optional['NapariVisualizerType']
129) -> Dict[str, Any]:
130 """
131 Static version of _execute_single_axis for multiprocessing compatibility.
133 This function is identical to PipelineOrchestrator._execute_single_axis but doesn't
134 require an orchestrator instance, making it safe for pickling in ProcessPoolExecutor.
136 Args:
137 pipeline_definition: List of pipeline steps to execute
138 frozen_context: Frozen processing context for this axis
139 visualizer: Optional Napari visualizer (not used in multiprocessing)
140 """
141 axis_id = frozen_context.axis_id
143 # NUCLEAR VALIDATION
144 if not frozen_context.is_frozen(): 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true
145 error_msg = f"Context for axis {axis_id} is not frozen before execution"
146 logger.error(error_msg)
147 raise RuntimeError(error_msg)
149 if not pipeline_definition: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true
150 error_msg = f"Empty pipeline_definition for axis {axis_id}"
151 logger.error(error_msg)
152 raise RuntimeError(error_msg)
154 # Execute each step in the pipeline
155 for step_index, step in enumerate(pipeline_definition):
156 step_name = frozen_context.step_plans[step_index]["step_name"]
158 # Verify step has process method (should always be true for AbstractStep subclasses)
159 if not hasattr(step, 'process'): 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 error_msg = f"Step {step_index+1} missing process method for axis {axis_id}"
161 logger.error(error_msg)
162 raise RuntimeError(error_msg)
164 # Call process method on step instance
165 step.process(frozen_context, step_index)
167 # Handle visualization if requested
168 if visualizer: 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 step_plan = frozen_context.step_plans[step_index]
170 if step_plan['visualize']:
171 output_dir = step_plan['output_dir']
172 write_backend = step_plan['write_backend']
173 if output_dir:
174 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}")
175 visualizer.visualize_path(
176 step_id=f"step_{step_index}",
177 path=str(output_dir),
178 backend=write_backend,
179 axis_id=axis_id
180 )
181 else:
182 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.")
184 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}")
185 result = {"status": "success", "axis_id": axis_id}
186 logger.info(f"🔥 SINGLE_AXIS: Returning result: {result}")
187 logger.info(f"🔥 SINGLE_AXIS: Result type check - status: {type(result['status'])}, axis_id: {type(result['axis_id'])}")
188 return result
191def _configure_worker_logging(log_file_base: str):
192 """
193 Configure logging and import hook for worker process.
195 This function is called once per worker process when it starts.
196 Each worker will get its own log file with a unique identifier.
198 Args:
199 log_file_base: Base path for worker log files
200 """
201 import os
202 import logging
203 import time
205 # CRITICAL: Skip function registry initialization for fast worker startup
206 # The environment variable is inherited from the subprocess runner
207 # Note: We don't log this yet because logging isn't configured
209 # Note: Import hook system was removed - using existing comprehensive registries
211 # Create unique worker identifier using PID and timestamp
212 worker_pid = os.getpid()
213 worker_timestamp = int(time.time() * 1000000) # Microsecond precision for uniqueness
214 worker_id = f"{worker_pid}_{worker_timestamp}"
215 worker_log_file = f"{log_file_base}_worker_{worker_id}.log"
217 # Configure root logger to capture ALL logs from worker process
218 root_logger = logging.getLogger()
219 root_logger.handlers.clear() # Clear any inherited handlers
221 # Create file handler for worker logs
222 file_handler = logging.FileHandler(worker_log_file)
223 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
224 root_logger.addHandler(file_handler)
225 root_logger.setLevel(logging.INFO)
227 # Ensure all OpenHCS module logs are captured
228 logging.getLogger("openhcs").setLevel(logging.INFO)
230 # Get worker logger
231 worker_logger = logging.getLogger("openhcs.worker")
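# Example of the resulting per-worker log path, assuming a hypothetical log_file_base of
# "/tmp/openhcs_run" and worker PID 12345:
#
#     /tmp/openhcs_run_worker_12345_1730700000000000.log
#
# The trailing integer is the microsecond-precision timestamp used to keep names unique.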
234def _configure_worker_with_gpu(log_file_base: str, global_config_dict: dict):
235 """
236 Configure logging, function registry, and GPU registry for worker process.
238 This function is called once per worker process when it starts.
239 It sets up logging, function registry, and GPU registry initialization.
241 Args:
242 log_file_base: Base path for worker log files (empty string if no logging)
243 global_config_dict: Serialized global configuration for GPU registry setup
244 """
245 import logging
246 import os
248 # Workers should be allowed to import GPU libs if available.
249 # The parent subprocess runner may set OPENHCS_SUBPROCESS_NO_GPU=1 to stay lean,
250 # but that flag must not leak into worker processes.
251 os.environ.pop('OPENHCS_SUBPROCESS_NO_GPU', None)
253 # Configure logging only if log_file_base is provided
254 if log_file_base: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 _configure_worker_logging(log_file_base)
256 worker_logger = logging.getLogger("openhcs.worker")
257 else:
258 # Set up basic logging for worker messages
259 logging.basicConfig(level=logging.INFO)
260 worker_logger = logging.getLogger("openhcs.worker")
262 # Initialize function registry for this worker process
263 try:
264 # Import and initialize function registry (will auto-discover all libraries)
265 import openhcs.processing.func_registry as func_registry_module
267 # Force initialization if not already done (workers need full registry)
268 with func_registry_module._registry_lock:
269 if not func_registry_module._registry_initialized: 269 ↛ 277line 269 didn't jump to line 277
270 func_registry_module._auto_initialize_registry()
272 except Exception as e:
273 # Don't raise - let worker continue, registry will auto-init on first function call
274 pass
276 # Initialize GPU registry for this worker process
277 try:
278 # Reconstruct global config from dict
279 from openhcs.core.config import GlobalPipelineConfig
280 global_config = GlobalPipelineConfig(**global_config_dict)
282 # Initialize GPU registry for this worker
283 from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
284 setup_global_gpu_registry(global_config)
286 except Exception as e:
287 # Don't raise - let worker continue without GPU if needed
288 pass
291# Global variable to store log file base for worker processes
292_worker_log_file_base = None
298class PipelineOrchestrator(ContextProvider):
299 """
300 Orchestrator supporting both global and per-orchestrator configuration.
302 Global configuration: updates all orchestrators (existing behavior).
303 Per-orchestrator configuration: affects only this orchestrator instance.
305 The orchestrator first compiles the pipeline for all specified axis values,
306 creating frozen, immutable ProcessingContexts using `compile_pipelines()`.
307 Then it executes the (now stateless) pipeline definition against these contexts,
308 potentially in parallel, using `execute_compiled_plate()`.
309 """
310 _context_type = "orchestrator" # Register as orchestrator context provider
312 def __init__(
313 self,
314 plate_path: Union[str, Path],
315 workspace_path: Optional[Union[str, Path]] = None,
316 *,
317 pipeline_config: Optional['PipelineConfig'] = None,
318 storage_registry: Optional[Any] = None,
319 progress_callback: Optional[Callable[[str, str, str, Dict[str, Any]], None]] = None,
320 ):
321 # Lock removed - was orphaned code never used
323 # Validate shared global context exists
324 if get_current_global_config(GlobalPipelineConfig) is None: 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true
325 raise RuntimeError(
326 "No global configuration context found. "
327 "Ensure application startup has called ensure_global_config_context()."
328 )
330 # Track executor for cancellation support
331 self._executor = None
333 # Initialize auto-sync control for pipeline config
334 self._pipeline_config = None
335 self._auto_sync_enabled = True
337 # Context management now handled by contextvars-based system
339 # Initialize per-orchestrator configuration
340 # DUAL-AXIS FIX: Always create a PipelineConfig instance to make orchestrator detectable as context provider
341 # This ensures the orchestrator has a dataclass attribute for stack introspection
342 # PipelineConfig is already the lazy version of GlobalPipelineConfig
343 from openhcs.core.config import PipelineConfig
344 if pipeline_config is None: 344 ↛ 347line 344 didn't jump to line 347 because the condition on line 344 was never true
345 # CRITICAL FIX: Create pipeline config that inherits from global config
346 # This ensures the orchestrator's pipeline_config has the global values for resolution
347 pipeline_config = PipelineConfig()
349 # CRITICAL FIX: Do NOT apply global config inheritance during initialization
350 # PipelineConfig should always have None values that resolve through lazy resolution
351 # Copying concrete values breaks the placeholder system and makes all fields appear "explicitly set"
353 self.pipeline_config = pipeline_config
355 # CRITICAL FIX: Expose pipeline config as public attribute for dual-axis resolver discovery
356 # The resolver's _is_context_provider method only finds public attributes (skips _private)
357 # This allows the resolver to discover the orchestrator's pipeline config during context resolution
358 self.pipeline_config = pipeline_config
359 logger.info("PipelineOrchestrator initialized with PipelineConfig for context discovery.")
361 # REMOVED: Unnecessary thread-local modification
362 # The orchestrator should not modify thread-local storage during initialization
363 # Global config is already available through the dual-axis resolver fallback
365 # Convert to Path and validate
366 if plate_path: 366 ↛ 379line 366 didn't jump to line 379 because the condition on line 366 was always true
367 plate_path = Path(plate_path)
369 # Validate filesystem paths (skip for OMERO virtual paths)
370 if not str(plate_path).startswith("/omero/"): 370 ↛ 379line 370 didn't jump to line 379 because the condition on line 370 was always true
371 if not plate_path.is_absolute(): 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true
372 raise ValueError(f"Plate path must be absolute: {plate_path}")
373 if not plate_path.exists(): 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 raise FileNotFoundError(f"Plate path does not exist: {plate_path}")
375 if not plate_path.is_dir(): 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true
376 raise NotADirectoryError(f"Plate path is not a directory: {plate_path}")
378 # Initialize _plate_path_frozen first to allow plate_path to be set during initialization
379 object.__setattr__(self, '_plate_path_frozen', False)
381 self.plate_path = plate_path
382 self.workspace_path = workspace_path
384 if self.plate_path is None and self.workspace_path is None: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true
385 raise ValueError("Either plate_path or workspace_path must be provided for PipelineOrchestrator.")
387 # Freeze plate_path immediately after setting it to prove immutability
388 object.__setattr__(self, '_plate_path_frozen', True)
389 logger.info(f"🔒 PLATE_PATH FROZEN: {self.plate_path} is now immutable")
391 if storage_registry: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true
392 self.registry = storage_registry
393 logger.info("PipelineOrchestrator using provided StorageRegistry instance.")
394 else:
395 # Create a copy of the global registry to avoid modifying shared state
396 from openhcs.io.base import storage_registry as global_storage_registry, ensure_storage_registry
397 # Ensure registry is initialized before copying
398 ensure_storage_registry()
399 self.registry = global_storage_registry.copy()
400 logger.info("PipelineOrchestrator created its own StorageRegistry instance (copy of global).")
402 # Override zarr backend with orchestrator's config
403 shared_context = get_current_global_config(GlobalPipelineConfig)
404 zarr_backend_with_config = ZarrStorageBackend(shared_context.zarr_config)
405 self.registry[Backend.ZARR.value] = zarr_backend_with_config
406 logger.info(f"Orchestrator zarr backend configured with {shared_context.zarr_config.compressor.value} compression")
408 # Orchestrator always creates its own FileManager, using the determined registry
409 self.filemanager = FileManager(self.registry)
410 self.input_dir: Optional[Path] = None
411 self.microscope_handler: Optional[MicroscopeHandler] = None
412 self.default_pipeline_definition: Optional[List[AbstractStep]] = None
413 self._initialized: bool = False
414 self._state: OrchestratorState = OrchestratorState.CREATED
416 # Progress callback for real-time execution updates
417 self.progress_callback = progress_callback
418 if progress_callback: 418 ↛ 419line 418 didn't jump to line 419 because the condition on line 418 was never true
419 logger.info("PipelineOrchestrator initialized with progress callback")
421 # Component keys cache for fast access - uses AllComponents (includes multiprocessing axis)
422 self._component_keys_cache: Dict['AllComponents', List[str]] = {}
424 # Metadata cache service
425 self._metadata_cache_service = get_metadata_cache()
427 # Viewer management - shared between pipeline execution and image browser
428 self._visualizers = {} # Dict[(backend_name, port)] -> visualizer instance
431 def __setattr__(self, name: str, value: Any) -> None:
432 """
433 Set an attribute, preventing modification of plate_path after it's frozen.
435 This proves that plate_path is truly immutable after initialization.
436 """
437 if name == 'plate_path' and getattr(self, '_plate_path_frozen', False): 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true
438 import traceback
439 stack_trace = ''.join(traceback.format_stack())
440 error_msg = (
441 f"🚫 IMMUTABLE PLATE_PATH VIOLATION: Cannot modify plate_path after freezing!\n"
442 f"Current value: {getattr(self, 'plate_path', 'UNSET')}\n"
443 f"Attempted new value: {value}\n"
444 f"Stack trace:\n{stack_trace}"
445 )
446 logger.error(error_msg)
447 raise AttributeError(error_msg)
448 super().__setattr__(name, value)
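# Illustration of the guard above: once _plate_path_frozen has been set to True in __init__,
# any reassignment fails loudly (instance name is hypothetical).
#
#     orchestrator.plate_path = Path("/data/other_plate")   # raises AttributeError with a stack trace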
450 @property
451 def state(self) -> OrchestratorState:
452 """Get the current orchestrator state."""
453 return self._state
455 def get_or_create_visualizer(self, config, vis_config=None):
456 """
457 Get existing visualizer or create a new one for the given config.
459 This method is shared between pipeline execution and image browser to avoid
460 duplicating viewer instances. Viewers are tracked by (backend_name, port) key.
462 Args:
463 config: Streaming config (any StreamingConfig subclass)
464 vis_config: Optional visualizer config (can be None for image browser)
466 Returns:
467 Visualizer instance
468 """
469 from openhcs.core.config import StreamingConfig
471 # Generic streaming config handling using polymorphic attributes
472 if isinstance(config, StreamingConfig): 472 ↛ 485line 472 didn't jump to line 485 because the condition on line 472 was always true
473 # Start global ack listener (must be before viewers connect)
474 from openhcs.runtime.zmq_base import start_global_ack_listener
475 start_global_ack_listener(config.transport_mode)
477 # Pre-create queue tracker using polymorphic attributes
478 from openhcs.runtime.queue_tracker import GlobalQueueTrackerRegistry
479 registry = GlobalQueueTrackerRegistry()
480 registry.get_or_create_tracker(config.port, config.viewer_type)
481 logger.info(f"🔬 ORCHESTRATOR: Pre-created queue tracker for {config.viewer_type} on port {config.port}")
483 key = (config.viewer_type, config.port)
484 else:
485 backend_name = config.backend.name if hasattr(config, 'backend') else 'unknown'
486 key = (backend_name,)
488 # Check if we already have a visualizer for this key
489 if key in self._visualizers: 489 ↛ 490line 489 didn't jump to line 490 because the condition on line 489 was never true
490 vis = self._visualizers[key]
491 if vis.is_running:
492 return vis
493 else:
494 del self._visualizers[key]
496 # Create new visualizer using polymorphic create_visualizer method
497 vis = config.create_visualizer(self.filemanager, vis_config)
499 # Start viewer asynchronously for streaming configs
500 if isinstance(config, StreamingConfig): 500 ↛ 514line 500 didn't jump to line 514 because the condition on line 500 was always true
501 vis.start_viewer(async_mode=True)
503 # Ping server to set ready state (background thread to avoid blocking)
504 import threading
505 def ping_server():
506 import time
507 time.sleep(1.0) # Give server time to start
508 if hasattr(vis, '_wait_for_server_ready'):
509 vis._wait_for_server_ready(timeout=10.0)
511 thread = threading.Thread(target=ping_server, daemon=True)
512 thread.start()
513 else:
514 vis.start_viewer()
516 # Store in cache
517 self._visualizers[key] = vis
519 return vis
521 def initialize_microscope_handler(self):
522 """Initializes the microscope handler."""
523 if self.microscope_handler is not None: 523 ↛ 524line 523 didn't jump to line 524 because the condition on line 523 was never true
524 logger.debug("Microscope handler already initialized.")
525 return
526# if self.input_dir is None:
527# raise RuntimeError("Workspace (and input_dir) must be initialized before microscope handler.")
529 logger.info(f"Initializing microscope handler using input directory: {self.input_dir}...")
530 try:
531 # Use configured microscope type or auto-detect
532 shared_context = get_current_global_config(GlobalPipelineConfig)
533 microscope_type = shared_context.microscope.value if shared_context.microscope != Microscope.AUTO else 'auto'
534 self.microscope_handler = create_microscope_handler(
535 plate_folder=str(self.plate_path),
536 filemanager=self.filemanager,
537 microscope_type=microscope_type,
538 )
539 logger.info(f"Initialized microscope handler: {type(self.microscope_handler).__name__}")
540 except Exception as e:
541 error_msg = f"Failed to create microscope handler: {e}"
542 logger.error(error_msg)
543 raise RuntimeError(error_msg) from e
545 def initialize(self, workspace_path: Optional[Union[str, Path]] = None) -> 'PipelineOrchestrator':
546 """
547 Initializes all required components for the orchestrator.
548 Must be called before other processing methods.
549 Returns self for chaining.
550 """
551 logger.info(f"🔥 INIT: initialize() called for plate: {self.plate_path}")
552 if self._initialized: 552 ↛ 553line 552 didn't jump to line 553 because the condition on line 552 was never true
553 logger.info("Orchestrator already initialized.")
554 return self
556 try:
557 logger.info(f"🔥 INIT: About to call initialize_microscope_handler()")
558 self.initialize_microscope_handler()
560 # Delegate workspace initialization to microscope handler
561 logger.info("Initializing workspace with microscope handler...")
562 actual_image_dir = self.microscope_handler.initialize_workspace(
563 self.plate_path, self.filemanager
564 )
566 # Use the actual image directory returned by the microscope handler
567 # All handlers now return Path (including OMERO with virtual paths)
568 self.input_dir = Path(actual_image_dir)
569 logger.info(f"Set input directory to: {self.input_dir}")
571 # Set workspace_path based on what the handler returned
572 if actual_image_dir != self.plate_path:
573 # Handler created a workspace (or virtual path for OMERO)
574 self.workspace_path = Path(actual_image_dir).parent if Path(actual_image_dir).name != "workspace" else Path(actual_image_dir)
575 else:
576 # Handler used plate directly (like OpenHCS)
577 self.workspace_path = None
579 # Mark as initialized BEFORE caching to avoid chicken-and-egg problem
580 self._initialized = True
581 self._state = OrchestratorState.READY
583 # Auto-cache component keys and metadata for instant access
584 logger.info("Caching component keys and metadata...")
585 self.cache_component_keys()
586 self._metadata_cache_service.cache_metadata(
587 self.microscope_handler,
588 self.plate_path,
589 self._component_keys_cache
590 )
592 # Ensure complete OpenHCS metadata exists
593 self._ensure_openhcs_metadata()
595 logger.info("PipelineOrchestrator fully initialized with cached component keys and metadata.")
596 return self
597 except Exception as e:
598 self._state = OrchestratorState.INIT_FAILED
599 logger.error(f"Failed to initialize orchestrator: {e}")
600 raise
602 def is_initialized(self) -> bool:
603 return self._initialized
605 def _ensure_openhcs_metadata(self) -> None:
606 """Ensure complete OpenHCS metadata exists for the plate.
608 Uses the same context creation logic as pipeline execution to get full metadata
609 with channel names from metadata files (HTD, Index.xml, etc).
611 Skips OMERO and other non-disk-based microscope handlers since they don't have
612 real disk directories.
613 """
614 from openhcs.microscopes.openhcs import OpenHCSMetadataGenerator
616 # Skip metadata creation for OMERO and other non-disk-based handlers
617 # OMERO uses virtual paths like /omero/plate_1 which are not real directories
618 if self.microscope_handler.microscope_type == 'omero': 618 ↛ 619line 618 didn't jump to line 619 because the condition on line 618 was never true
619 logger.debug("Skipping metadata creation for OMERO plate (uses virtual paths)")
620 return
622 # For plates with virtual workspace, metadata is already created by _build_virtual_mapping()
623 # We just need to add the component metadata to the existing "." subdirectory
624 from openhcs.io.metadata_writer import get_subdirectory_name
625 subdir_name = get_subdirectory_name(self.input_dir, self.plate_path)
627 # Create context using SAME logic as create_context() to get full metadata
628 context = self.create_context(axis_id="metadata_init")
630 # Create metadata (will skip if already complete)
631 generator = OpenHCSMetadataGenerator(self.filemanager)
632 generator.create_metadata(
633 context,
634 str(self.input_dir),
635 "disk",
636 is_main=True,
637 plate_root=str(self.plate_path),
638 sub_dir=subdir_name,
639 skip_if_complete=True
640 )
642 def get_results_path(self) -> Path:
643 """Get the results directory path for this orchestrator's plate.
645 Uses the same logic as PathPlanner._get_results_path() to ensure consistency.
646 This is the single source of truth for where results are stored.
648 Returns:
649 Path to results directory (absolute or relative to output plate root)
650 """
651 from openhcs.core.pipeline.path_planner import PipelinePathPlanner
653 # Get materialization_results_path from global config
654 materialization_path = self.global_config.materialization_results_path
656 # If absolute, use as-is
657 if Path(materialization_path).is_absolute():
658 return Path(materialization_path)
660 # If relative, resolve relative to output plate root
661 # Use path_planning_config from global config
662 path_config = self.global_config.path_planning_config
663 output_plate_root = PipelinePathPlanner.build_output_plate_root(
664 self.plate_path,
665 path_config,
666 is_per_step_materialization=False
667 )
669 return output_plate_root / materialization_path
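# Resolution sketch (values are hypothetical): with materialization_results_path = "results"
# (a relative path), this method returns <output_plate_root>/results, where the output plate
# root is derived from plate_path and path_planning_config by
# PipelinePathPlanner.build_output_plate_root(). An absolute setting such as
# "/mnt/shared/results" is returned unchanged.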
671 def create_context(self, axis_id: str) -> ProcessingContext:
672 """Creates a ProcessingContext for a given multiprocessing axis value."""
673 if not self.is_initialized(): 673 ↛ 674line 673 didn't jump to line 674 because the condition on line 673 was never true
674 raise RuntimeError("Orchestrator must be initialized before calling create_context().")
675 if not axis_id: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true
676 raise ValueError("Axis identifier must be provided.")
677 if self.input_dir is None: 677 ↛ 678line 677 didn't jump to line 678 because the condition on line 677 was never true
678 raise RuntimeError("Orchestrator input_dir is not set; initialize orchestrator first.")
680 context = ProcessingContext(
681 global_config=self.get_effective_config(),
682 axis_id=axis_id,
683 filemanager=self.filemanager
684 )
685 # Orchestrator reference removed - was orphaned and unpickleable
686 context.microscope_handler = self.microscope_handler
687 context.input_dir = self.input_dir
688 context.workspace_path = self.workspace_path
689 context.plate_path = self.plate_path # Add plate_path for path planner
691 # CRITICAL: Pass metadata cache for OpenHCS metadata creation
692 # Extract cached metadata from service and convert to dict format expected by OpenHCSMetadataGenerator
693 metadata_dict = {}
694 for component in AllComponents:
695 cached_metadata = self._metadata_cache_service.get_cached_metadata(component)
696 if cached_metadata:
697 metadata_dict[component] = cached_metadata
698 context.metadata_cache = metadata_dict
700 return context
702 def compile_pipelines(
703 self,
704 pipeline_definition: List[AbstractStep],
705 well_filter: Optional[List[str]] = None,
706 enable_visualizer_override: bool = False
707 ) -> Dict[str, ProcessingContext]:
708 """Compile pipelines for axis values (well_filter name preserved for UI compatibility)."""
709 return PipelineCompiler.compile_pipelines(
710 orchestrator=self,
711 pipeline_definition=pipeline_definition,
712 axis_filter=well_filter, # Translate well_filter to axis_filter for generic backend
713 enable_visualizer_override=enable_visualizer_override
714 )
716 def _execute_single_axis(
717 self,
718 pipeline_definition: List[AbstractStep],
719 frozen_context: ProcessingContext,
720 visualizer: Optional[NapariVisualizerType]
721 ) -> Dict[str, Any]:
722 """Executes the pipeline for a single well using its frozen context."""
723 axis_id = frozen_context.axis_id
724 logger.info(f"🔥 SINGLE_AXIS: Starting execution for axis {axis_id}")
726 # Send progress: axis started
727 if self.progress_callback:
728 try:
729 self.progress_callback(axis_id, 'pipeline', 'started', {
730 'total_steps': len(pipeline_definition)
731 })
732 except Exception as e:
733 logger.warning(f"Progress callback failed for axis {axis_id} start: {e}")
735 # NUCLEAR VALIDATION
736 if not frozen_context.is_frozen():
737 error_msg = f"🔥 SINGLE_AXIS ERROR: Context for axis {axis_id} is not frozen before execution"
738 logger.error(error_msg)
739 raise RuntimeError(error_msg)
741 if not pipeline_definition:
742 error_msg = f"🔥 SINGLE_AXIS ERROR: Empty pipeline_definition for axis {axis_id}"
743 logger.error(error_msg)
744 raise RuntimeError(error_msg)
746 # Step IDs are consistent since pipeline_definition comes from UI (no remapping needed)
748 logger.info(f"🔥 SINGLE_AXIS: Processing {len(pipeline_definition)} steps for axis {axis_id}")
750 for step_index, step in enumerate(pipeline_definition):
751 step_name = frozen_context.step_plans[step_index]["step_name"]
753 logger.info(f"🔥 SINGLE_AXIS: Executing step {step_index+1}/{len(pipeline_definition)} - {step_name} for axis {axis_id}")
755 # Send progress: step started
756 if self.progress_callback:
757 try:
758 self.progress_callback(axis_id, step_name, 'started', {
759 'step_index': step_index,
760 'total_steps': len(pipeline_definition)
761 })
762 except Exception as e:
763 logger.warning(f"Progress callback failed for axis {axis_id} step {step_name} start: {e}")
765 # Verify step has process method (should always be true for AbstractStep subclasses)
766 if not hasattr(step, 'process'):
767 error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} missing process method for axis {axis_id}"
768 logger.error(error_msg)
769 raise RuntimeError(error_msg)
771 # Call process method on step instance
772 step.process(frozen_context, step_index)
773 logger.info(f"🔥 SINGLE_AXIS: Step {step_index+1}/{len(pipeline_definition)} - {step_name} completed for axis {axis_id}")
775 # Send progress: step completed
776 if self.progress_callback:
777 try:
778 self.progress_callback(axis_id, step_name, 'completed', {
779 'step_index': step_index,
780 'total_steps': len(pipeline_definition)
781 })
782 except Exception as e:
783 logger.warning(f"Progress callback failed for axis {axis_id} step {step_name} completion: {e}")
785 # except Exception as step_error:
786 # import traceback
787 # full_traceback = traceback.format_exc()
788 # error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} ({step_id}) failed for axis {axis_id}: {step_error}"
789 # logger.error(error_msg, exc_info=True)
790 # logger.error(f"🔥 SINGLE_AXIS TRACEBACK for axis {axis_id}, step {step_index+1} ({step_id}):\n{full_traceback}")
791 # raise RuntimeError(error_msg) from step_error
793 if visualizer:
794 step_plan = frozen_context.step_plans[step_index]
795 if step_plan['visualize']:
796 output_dir = step_plan['output_dir']
797 write_backend = step_plan['write_backend']
798 if output_dir:
799 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}")
800 visualizer.visualize_path(
801 step_id=f"step_{step_index}",
802 path=str(output_dir),
803 backend=write_backend,
804 axis_id=axis_id
805 )
806 else:
807 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.")
809 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}")
811 # Send progress: axis completed
812 if self.progress_callback:
813 try:
814 self.progress_callback(axis_id, 'pipeline', 'completed', {
815 'total_steps': len(pipeline_definition)
816 })
817 except Exception as e:
818 logger.warning(f"Progress callback failed for axis {axis_id} completion: {e}")
820 return {"status": "success", "axis_id": axis_id}
822 def cancel_execution(self):
823 """
824 Cancel ongoing execution by shutting down the executor.
826 This gracefully cancels pending futures and shuts down worker processes
827 without killing all child processes (preserving Napari viewers, etc.).
828 """
829 if self._executor:
830 try:
831 logger.info("🔥 ORCHESTRATOR: Cancelling execution - shutting down executor")
832 self._executor.shutdown(wait=False, cancel_futures=True)
833 logger.info("🔥 ORCHESTRATOR: Executor shutdown initiated")
834 except Exception as e:
835 logger.warning(f"🔥 ORCHESTRATOR: Failed to cancel executor: {e}")
837 def execute_compiled_plate(
838 self,
839 pipeline_definition: List[AbstractStep],
840 compiled_contexts: Dict[str, ProcessingContext],
841 max_workers: Optional[int] = None,
842 visualizer: Optional[NapariVisualizerType] = None,
843 log_file_base: Optional[str] = None
844 ) -> Dict[str, Dict[str, Any]]:
845 """
846 Execute-all phase: Runs the stateless pipeline against compiled contexts.
848 Args:
849 pipeline_definition: The stateless list of AbstractStep objects.
850 compiled_contexts: Dict of axis_id to its compiled, frozen ProcessingContext.
851 Obtained from `compile_pipelines()`.
852 max_workers: Maximum number of worker threads for parallel execution.
853 visualizer: Optional instance of NapariStreamVisualizer for real-time visualization
854 (requires napari to be installed; must be initialized with orchestrator's filemanager by the caller).
855 log_file_base: Base path for worker process log files (without extension).
856 Each worker will create its own log file: {log_file_base}_worker_{pid}.log
858 Returns:
859 A dictionary mapping axis IDs (e.g. well IDs) to their execution status (success or error, with details).
860 """
862 # CRITICAL FIX: Use resolved pipeline definition from compilation if available
863 # For subprocess runner, use the parameter directly since it receives pre-compiled contexts
864 resolved_pipeline = getattr(self, '_resolved_pipeline_definition', None)
865 if resolved_pipeline is not None: 865 ↛ 866line 865 didn't jump to line 866 because the condition on line 865 was never true
866 logger.info(f"🔥 EXECUTION: Using resolved pipeline definition with {len(resolved_pipeline)} steps (from compilation)")
867 pipeline_definition = resolved_pipeline
868 else:
869 logger.info(f"🔥 EXECUTION: Using parameter pipeline definition with {len(pipeline_definition)} steps (subprocess mode)")
870 # In subprocess mode, the pipeline_definition parameter should already be resolved
871 if not self.is_initialized(): 871 ↛ 872line 871 didn't jump to line 872 because the condition on line 871 was never true
872 raise RuntimeError("Orchestrator must be initialized before executing.")
873 if not pipeline_definition: 873 ↛ 874line 873 didn't jump to line 874 because the condition on line 873 was never true
874 raise ValueError("A valid (stateless) pipeline definition must be provided.")
875 if not compiled_contexts: 875 ↛ 876line 875 didn't jump to line 876 because the condition on line 875 was never true
876 logger.warning("No compiled contexts provided for execution.")
877 return {}
879 # Use effective config (includes pipeline config) instead of global config directly
880 actual_max_workers = max_workers if max_workers is not None else self.get_effective_config().num_workers
881 if actual_max_workers <= 0: # Ensure positive number of workers 881 ↛ 882line 881 didn't jump to line 882 because the condition on line 881 was never true
882 actual_max_workers = 1
884 # 🔬 AUTOMATIC VISUALIZER CREATION: Create visualizers if compiler detected streaming
885 visualizers = []
886 if visualizer is None: 886 ↛ 934line 886 didn't jump to line 934 because the condition on line 886 was always true
887 from openhcs.core.config import StreamingConfig
889 # Collect unique configs (deduplicate by viewer_type + port)
890 unique_configs = {}
891 for ctx in compiled_contexts.values():
892 for visualizer_info in ctx.required_visualizers:
893 config = visualizer_info['config']
894 key = (config.viewer_type, config.port) if isinstance(config, StreamingConfig) else (config.backend.name,)
895 if key not in unique_configs:
896 unique_configs[key] = (config, ctx.visualizer_config)
898 # Create visualizers
899 for config, vis_config in unique_configs.values():
900 visualizers.append(self.get_or_create_visualizer(config, vis_config))
902 # Wait for all streaming viewers to be ready before starting pipeline
903 # This ensures viewers are available to receive images
904 if visualizers:
905 logger.info(f"🔬 ORCHESTRATOR: Waiting for {len(visualizers)} streaming viewer(s) to be ready...")
906 import time
907 max_wait = 30.0 # Maximum wait time in seconds
908 start_time = time.time()
910 while time.time() - start_time < max_wait:
911 all_ready = all(v.is_running for v in visualizers)
912 if all_ready: 912 ↛ 913line 912 didn't jump to line 913 because the condition on line 912 was never true
913 logger.info("🔬 ORCHESTRATOR: All streaming viewers are ready!")
914 break
915 time.sleep(0.2) # Check every 200ms
916 else:
917 # Timeout - log which viewers aren't ready (use generic port attribute)
918 not_ready = [v.port for v in visualizers if not v.is_running]
919 logger.warning(f"🔬 ORCHESTRATOR: Timeout waiting for streaming viewers. Not ready: {not_ready}")
921 # Clear viewer state for new pipeline run to prevent accumulation
922 logger.info("🔬 ORCHESTRATOR: Clearing streaming viewer state for new pipeline run...")
923 for vis in visualizers:
924 if hasattr(vis, 'clear_viewer_state'): 924 ↛ 923line 924 didn't jump to line 923 because the condition on line 924 was always true
925 success = vis.clear_viewer_state()
926 if success: 926 ↛ 927line 926 didn't jump to line 927 because the condition on line 926 was never true
927 logger.info(f"🔬 ORCHESTRATOR: Cleared state for viewer on port {vis.port}")
928 else:
929 logger.warning(f"🔬 ORCHESTRATOR: Failed to clear state for viewer on port {vis.port}")
931 # For backwards compatibility, set visualizer to the first one
932 visualizer = visualizers[0] if visualizers else None
934 self._state = OrchestratorState.EXECUTING
935 logger.info(f"Starting execution for {len(compiled_contexts)} axis values with max_workers={actual_max_workers}.")
937 try:
938 execution_results: Dict[str, Dict[str, Any]] = {}
940 # CUDA COMPATIBILITY: Set spawn method for multiprocessing to support CUDA
941 try:
942 # Check if spawn method is available and set it if not already set
943 current_method = multiprocessing.get_start_method(allow_none=True)
944 if current_method != 'spawn':
945 logger.info(f"🔥 CUDA: Setting multiprocessing start method from '{current_method}' to 'spawn' for CUDA compatibility")
946 multiprocessing.set_start_method('spawn', force=True)
947 else:
948 logger.debug("🔥 CUDA: Multiprocessing start method already set to 'spawn'")
949 except RuntimeError as e:
950 # Start method may already be set, which is fine
951 logger.debug(f"🔥 CUDA: Start method already configured: {e}")
953 # Choose executor type based on effective config for debugging support
954 effective_config = self.get_effective_config()
955 executor_type = "ThreadPoolExecutor" if effective_config.use_threading else "ProcessPoolExecutor"
956 logger.info(f"🔥 ORCHESTRATOR: Creating {executor_type} with {actual_max_workers} workers")
958 # DEATH DETECTION: Mark executor creation
959 logger.info(f"🔥 DEATH_MARKER: BEFORE_{executor_type.upper()}_CREATION")
961 # Choose appropriate executor class and configure worker logging
962 if effective_config.use_threading: 962 ↛ 963line 962 didn't jump to line 963 because the condition on line 962 was never true
963 logger.info("🔥 DEBUG MODE: Using ThreadPoolExecutor for easier debugging")
964 executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers)
965 else:
966 logger.info("🔥 PRODUCTION MODE: Using ProcessPoolExecutor for true parallelism")
967 # CRITICAL FIX: Use _configure_worker_with_gpu to ensure workers have function registry
968 # Workers need the function registry to access decorated functions with memory types
969 global_config = get_current_global_config(GlobalPipelineConfig)
970 global_config_dict = global_config.__dict__ if global_config else {}
972 if log_file_base: 972 ↛ 973line 972 didn't jump to line 973 because the condition on line 972 was never true
973 logger.info("🔥 WORKER SETUP: Configuring worker processes with function registry and logging")
974 executor = concurrent.futures.ProcessPoolExecutor(
975 max_workers=actual_max_workers,
976 initializer=_configure_worker_with_gpu,
977 initargs=(log_file_base, global_config_dict)
978 )
979 else:
980 logger.info("🔥 WORKER SETUP: Configuring worker processes with function registry (no logging)")
981 executor = concurrent.futures.ProcessPoolExecutor(
982 max_workers=actual_max_workers,
983 initializer=_configure_worker_with_gpu,
984 initargs=("", global_config_dict) # Empty string for no logging
985 )
987 logger.info(f"🔥 DEATH_MARKER: ENTERING_{executor_type.upper()}_CONTEXT")
988 # Store executor for cancellation support
989 self._executor = executor
990 with executor:
991 logger.info(f"🔥 DEATH_MARKER: {executor_type.upper()}_CREATED_SUCCESSFULLY")
992 logger.info(f"🔥 ORCHESTRATOR: {executor_type} created, submitting {len(compiled_contexts)} tasks")
994 # NUCLEAR ERROR TRACING: Create snapshot of compiled_contexts to prevent iteration issues
995 contexts_snapshot = dict(compiled_contexts.items())
996 logger.info(f"🔥 ORCHESTRATOR: Created contexts snapshot with {len(contexts_snapshot)} items")
998 # CRITICAL FIX: Resolve all lazy dataclass instances before multiprocessing
999 # This ensures that the contexts are safe for pickling in ProcessPoolExecutor
1000 # Note: Don't resolve pipeline_definition as it may overwrite collision-resolved configs
1001 logger.info("🔥 ORCHESTRATOR: Resolving lazy dataclasses for multiprocessing compatibility")
1002 contexts_snapshot = resolve_lazy_configurations_for_serialization(contexts_snapshot)
1003 logger.info("🔥 ORCHESTRATOR: Lazy dataclass resolution completed")
1005 logger.info("🔥 DEATH_MARKER: BEFORE_TASK_SUBMISSION_LOOP")
1006 future_to_axis_id = {}
1007 config = get_openhcs_config()
1008 if not config: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true
1009 raise RuntimeError("Component configuration is required for orchestrator execution")
1010 axis_name = config.multiprocessing_axis.value
1011 for axis_id, context in contexts_snapshot.items():
1012 try:
1013 logger.info(f"🔥 DEATH_MARKER: SUBMITTING_TASK_FOR_{axis_name.upper()}_{axis_id}")
1014 logger.info(f"🔥 ORCHESTRATOR: Submitting task for {axis_name} {axis_id}")
1015 # Resolve all arguments before passing to ProcessPoolExecutor
1016 resolved_context = resolve_lazy_configurations_for_serialization(context)
1018 # Use static function to avoid pickling the orchestrator instance
1019 # Note: Use original pipeline_definition to preserve collision-resolved configs
1020 # Don't pass visualizer to worker processes - they communicate via ZeroMQ
1021 future = executor.submit(
1022 _execute_single_axis_static,
1023 pipeline_definition,
1024 resolved_context,
1025 None # visualizer
1026 )
1027 future_to_axis_id[future] = axis_id
1028 logger.info(f"🔥 ORCHESTRATOR: Task submitted for {axis_name} {axis_id}")
1029 logger.info(f"🔥 DEATH_MARKER: TASK_SUBMITTED_FOR_{axis_name.upper()}_{axis_id}")
1030 except Exception as submit_error:
1031 error_msg = f"🔥 ORCHESTRATOR ERROR: Failed to submit task for {axis_name} {axis_id}: {submit_error}"
1032 logger.error(error_msg, exc_info=True)
1033 # FAIL-FAST: Re-raise task submission errors immediately
1034 raise
1036 logger.info("🔥 DEATH_MARKER: TASK_SUBMISSION_LOOP_COMPLETED")
1038 logger.info(f"🔥 ORCHESTRATOR: All {len(future_to_axis_id)} tasks submitted, waiting for completion")
1039 logger.info("🔥 DEATH_MARKER: BEFORE_COMPLETION_LOOP")
1041 completed_count = 0
1042 logger.info("🔥 DEATH_MARKER: ENTERING_AS_COMPLETED_LOOP")
1043 for future in concurrent.futures.as_completed(future_to_axis_id):
1044 axis_id = future_to_axis_id[future]
1045 completed_count += 1
1046 logger.info(f"🔥 DEATH_MARKER: PROCESSING_COMPLETED_TASK_{completed_count}_{axis_name.upper()}_{axis_id}")
1047 logger.info(f"🔥 ORCHESTRATOR: Task {completed_count}/{len(future_to_axis_id)} completed for {axis_name} {axis_id}")
1049 try:
1050 logger.info(f"🔥 DEATH_MARKER: CALLING_FUTURE_RESULT_FOR_{axis_name.upper()}_{axis_id}")
1051 result = future.result()
1052 logger.info(f"🔥 DEATH_MARKER: FUTURE_RESULT_SUCCESS_FOR_{axis_name.upper()}_{axis_id}")
1053 logger.info(f"🔥 ORCHESTRATOR: {axis_name.title()} {axis_id} result: {result}")
1054 execution_results[axis_id] = result
1055 logger.info(f"🔥 DEATH_MARKER: RESULT_STORED_FOR_{axis_name.upper()}_{axis_id}")
1056 except Exception as exc:
1057 import traceback
1058 full_traceback = traceback.format_exc()
1059 error_msg = f"{axis_name.title()} {axis_id} generated an exception during execution: {exc}"
1060 logger.error(f"🔥 ORCHESTRATOR ERROR: {error_msg}", exc_info=True)
1061 logger.error(f"🔥 ORCHESTRATOR FULL TRACEBACK for {axis_name} {axis_id}:\n{full_traceback}")
1062 # FAIL-FAST: Re-raise immediately instead of storing error
1063 raise
1065 logger.info("🔥 DEATH_MARKER: COMPLETION_LOOP_FINISHED")
1067 logger.info(f"🔥 ORCHESTRATOR: All tasks completed, {len(execution_results)} results collected")
1069 # Explicitly shutdown executor INSIDE the with block to avoid hang on context exit
1070 logger.info("🔥 ORCHESTRATOR: Explicitly shutting down executor")
1071 executor.shutdown(wait=True, cancel_futures=False)
1072 logger.info("🔥 ORCHESTRATOR: Executor shutdown complete")
1074 # Determine if we're using multiprocessing (ProcessPoolExecutor) or threading
1075 effective_config = self.get_effective_config()
1076 use_multiprocessing = not effective_config.use_threading
1077 logger.info(f"🔥 ORCHESTRATOR: About to start GPU cleanup (use_multiprocessing={use_multiprocessing})")
1079 # 🔥 GPU CLEANUP: Skip in multiprocessing mode - workers handle their own cleanup
1080 # In multiprocessing mode, GPU cleanup in the main process can hang because
1081 # GPU contexts are owned by worker processes, not the orchestrator process
1082 try:
1083 if cleanup_all_gpu_frameworks and not use_multiprocessing: 1083 ↛ 1084line 1083 didn't jump to line 1084 because the condition on line 1083 was never true
1084 logger.info("🔥 ORCHESTRATOR: Running GPU cleanup...")
1085 cleanup_all_gpu_frameworks()
1086 logger.info("🔥 GPU CLEANUP: Cleared all GPU frameworks after plate execution")
1087 elif use_multiprocessing: 1087 ↛ 1092line 1087 didn't jump to line 1092 because the condition on line 1087 was always true
1088 logger.info("🔥 GPU CLEANUP: Skipped in multiprocessing mode (workers handle their own cleanup)")
1089 except Exception as cleanup_error:
1090 logger.warning(f"Failed to cleanup GPU memory after plate execution: {cleanup_error}")
1092 logger.info("🔥 ORCHESTRATOR: GPU cleanup section finished")
1094 logger.info("🔥 ORCHESTRATOR: Plate execution completed, checking for analysis consolidation")
1095 # Run automatic analysis consolidation if enabled
1096 shared_context = get_current_global_config(GlobalPipelineConfig)
1097 logger.info(f"🔥 ORCHESTRATOR: Analysis consolidation enabled={shared_context.analysis_consolidation_config.enabled}")
1098 if shared_context.analysis_consolidation_config.enabled: 1098 ↛ 1146line 1098 didn't jump to line 1146 because the condition on line 1098 was always true
1099 try:
1100 logger.info("🔥 ORCHESTRATOR: Starting consolidation - finding results directory")
1101 # Get results directory using same logic as path planner (single source of truth)
1102 results_dir = None
1103 for axis_id, context in compiled_contexts.items():
1104 # Use the same logic as PathPlanner._get_results_path()
1105 plate_path = Path(context.plate_path)
1106 materialization_path = shared_context.materialization_results_path
1108 if Path(materialization_path).is_absolute(): 1108 ↛ 1109line 1108 didn't jump to line 1109 because the condition on line 1108 was never true
1109 potential_results_dir = Path(materialization_path)
1110 else:
1111 potential_results_dir = plate_path / materialization_path
1113 if potential_results_dir.exists(): 1113 ↛ 1114line 1113 didn't jump to line 1114 because the condition on line 1113 was never true
1114 results_dir = potential_results_dir
1115 logger.info(f"🔍 CONSOLIDATION: Found results directory: {results_dir}")
1116 break
1118 if results_dir and results_dir.exists(): 1118 ↛ 1119line 1118 didn't jump to line 1119 because the condition on line 1118 was never true
1119 logger.info(f"🔥 ORCHESTRATOR: Results directory exists: {results_dir}")
1120 # Check if there are actually CSV files (materialized results)
1121 logger.info("🔥 ORCHESTRATOR: Checking for CSV files...")
1122 csv_files = list(results_dir.glob("*.csv"))
1123 logger.info(f"🔥 ORCHESTRATOR: Found {len(csv_files)} CSV files")
1124 if csv_files:
1125 logger.info(f"🔄 CONSOLIDATION: Found {len(csv_files)} CSV files, running consolidation")
1126 # Get well IDs from compiled contexts
1127 axis_ids = list(compiled_contexts.keys())
1128 logger.info(f"🔄 CONSOLIDATION: Using well IDs: {axis_ids}")
1130 logger.info("🔥 ORCHESTRATOR: Calling consolidate_analysis_results()...")
1131 consolidate_fn = _get_consolidate_analysis_results()
1132 consolidate_fn(
1133 results_directory=str(results_dir),
1134 well_ids=axis_ids,
1135 consolidation_config=shared_context.analysis_consolidation_config,
1136 plate_metadata_config=shared_context.plate_metadata_config
1137 )
1138 logger.info("✅ CONSOLIDATION: Completed successfully")
1139 else:
1140 logger.info(f"⏭️ CONSOLIDATION: No CSV files found in {results_dir}, skipping")
1141 else:
1142 logger.info("⏭️ CONSOLIDATION: No results directory found in compiled contexts")
1143 except Exception as e:
1144 logger.error(f"❌ CONSOLIDATION: Failed: {e}")
1145 else:
1146 logger.info("🔥 ORCHESTRATOR: Analysis consolidation disabled, skipping")
1148 # Update state based on execution results
1149 logger.info("🔥 ORCHESTRATOR: Updating orchestrator state based on execution results")
1150 if all(result.get("status") == "success" for result in execution_results.values()): 1150 ↛ 1153line 1150 didn't jump to line 1153 because the condition on line 1150 was always true
1151 self._state = OrchestratorState.COMPLETED
1152 else:
1153 self._state = OrchestratorState.EXEC_FAILED
1154 logger.info(f"🔥 ORCHESTRATOR: State updated to {self._state}")
1156 # 🔬 VISUALIZER CLEANUP: Stop all visualizers if they were auto-created and not persistent
1157 logger.info(f"🔬 ORCHESTRATOR: Starting visualizer cleanup for {len(visualizers)} visualizers")
1158 for idx, vis in enumerate(visualizers): 1158 ↛ 1159line 1158 didn't jump to line 1159 because the loop on line 1158 never started
1159 try:
1160 logger.info(f"🔬 ORCHESTRATOR: Processing visualizer {idx+1}/{len(visualizers)}, persistent={vis.persistent}")
1161 if not vis.persistent:
1162 logger.info(f"🔬 ORCHESTRATOR: Calling stop_viewer() for non-persistent visualizer {idx+1}")
1163 vis.stop_viewer()
1164 logger.info(f"🔬 ORCHESTRATOR: Stopped non-persistent visualizer {idx+1}")
1165 else:
1166 logger.info("🔬 ORCHESTRATOR: Keeping persistent visualizer alive (no cleanup needed)")
1167 # Persistent visualizers stay alive across executions - no cleanup needed
1168 # The ZMQ connection will be reused for the next execution
1169 except Exception as e:
1170 logger.warning(f"🔬 ORCHESTRATOR: Failed to cleanup visualizer {idx+1}: {e}")
1171 logger.info("🔬 ORCHESTRATOR: Visualizer cleanup complete")
1173 logger.info(f"🔥 ORCHESTRATOR: Plate execution finished. Results: {execution_results}")
1175 return execution_results
1176 except Exception as e:
1177 self._state = OrchestratorState.EXEC_FAILED
1178 logger.error(f"Failed to execute compiled plate: {e}")
1179 raise
1181 def get_component_keys(self, component: Union['AllComponents', 'VariableComponents'], component_filter: Optional[List[Union[str, int]]] = None) -> List[str]:
1182 """
1183 Generic method to get component keys using VariableComponents directly.
1185 Returns the discovered component values as strings to match the pattern
1186 detection system format.
1188 Tries metadata cache first, falls back to filename parsing cache if metadata is empty.
1190 Args:
1191 component: AllComponents or VariableComponents enum specifying which component to extract
1192 (also accepts GroupBy enum which will be converted to AllComponents)
1193 component_filter: Optional list of component values to filter by
1195 Returns:
1196 List of component values as strings, sorted
1198 Raises:
1199 RuntimeError: If orchestrator is not initialized
1200 """
1201 if not self.is_initialized(): 1201 ↛ 1202 (line 1201 didn't jump to line 1202 because the condition on line 1201 was never true)
1202 raise RuntimeError("Orchestrator must be initialized before getting component keys.")
1204 # Reject GroupBy.NONE explicitly - there is no component axis to extract from it
1205 if isinstance(component, GroupBy) and component.value is None: 1205 ↛ 1206 (line 1205 didn't jump to line 1206 because the condition on line 1205 was never true)
1206 raise ValueError("Cannot get component keys for GroupBy.NONE")
1208 # Convert to AllComponents for cache lookup (includes multiprocessing axis)
1209 component = convert_enum_by_value(component, AllComponents) or component
1211 # Use component directly - let natural errors occur for wrong types
1212 component_name = component.value
1214 # Try metadata cache first (preferred source)
1215 cached_metadata = self._metadata_cache_service.get_cached_metadata(component)
1216 if cached_metadata:
1217 all_components = list(cached_metadata.keys())
1218 logger.debug(f"Using metadata cache for {component_name}: {len(all_components)} components")
1219 else:
1220 # Fall back to filename parsing cache
1221 all_components = self._component_keys_cache[component] # Let KeyError bubble up naturally
1223 if not all_components: 1223 ↛ 1224 (line 1223 didn't jump to line 1224 because the condition on line 1223 was never true)
1224 logger.warning(f"No {component_name} values found in input directory: {self.input_dir}")
1225 return []
1227 logger.debug(f"Using filename parsing cache for {component.value}: {len(all_components)} components")
1229 if component_filter:
1230 str_component_filter = {str(c) for c in component_filter}
1231 selected_components = [comp for comp in all_components if comp in str_component_filter]
1232 if not selected_components: 1232 ↛ 1233 (line 1232 didn't jump to line 1233 because the condition on line 1232 was never true)
1233 component_name = component.value  # fixed: was group_by.value, an undefined name in this scope
1234 logger.warning(f"No {component_name} values from {all_components} match the filter: {component_filter}")
1235 return selected_components
1236 else:
1237 return all_components
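# --- Illustrative sketch (editor's addition, not part of orchestrator.py) ---
# Standalone version of the lookup order implemented in get_component_keys above:
# prefer the metadata cache, fall back to the filename-parse cache, then apply the
# optional filter after normalising filter values to strings. Generic names; none of
# these are OpenHCS APIs.
from typing import Dict, List, Optional, Union

def resolve_keys(metadata_cache: Dict[str, dict],
                 parse_cache: List[str],
                 component_filter: Optional[List[Union[str, int]]] = None) -> List[str]:
    keys = list(metadata_cache.keys()) if metadata_cache else parse_cache
    if component_filter:
        wanted = {str(c) for c in component_filter}
        keys = [k for k in keys if k in wanted]
    return keys

# resolve_keys({}, ["1", "2", "3"], component_filter=[2, 3]) == ["2", "3"]
# --- end sketch ---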
1239 def cache_component_keys(self, components: Optional[List['AllComponents']] = None) -> None:
1240 """
1241 Pre-compute and cache component keys for fast access using single-pass parsing.
1243 This method performs expensive file listing and parsing operations once,
1244 extracting all component types in a single pass for maximum efficiency.
1246 Args:
1247 components: Optional list of AllComponents to cache.
1248 If None, caches all components in the AllComponents enum.
1249 """
1250 if not self.is_initialized(): 1250 ↛ 1251 (line 1250 didn't jump to line 1251 because the condition on line 1250 was never true)
1251 raise RuntimeError("Orchestrator must be initialized before caching component keys.")
1253 if components is None: 1253 ↛ 1256 (line 1253 didn't jump to line 1256 because the condition on line 1253 was always true)
1254 components = list(AllComponents) # Cache all enum values including multiprocessing axis
1256 logger.info(f"Caching component keys for: {[comp.value for comp in components]}")
1258 # Initialize component sets for all requested components
1259 component_sets: Dict['AllComponents', Set[Union[str, int]]] = {}
1260 for component in components:
1261 component_sets[component] = set()
1263 # Single pass through all filenames - extract all components at once
1264 try:
1265 # Use primary backend from microscope handler
1266 backend_to_use = self.microscope_handler.get_primary_backend(self.input_dir, self.filemanager)
1267 logger.debug(f"Using backend '{backend_to_use}' for file listing based on available backends")
1269 filenames = self.filemanager.list_files(str(self.input_dir), backend_to_use, extensions=DEFAULT_IMAGE_EXTENSIONS)
1270 logger.debug(f"Parsing {len(filenames)} filenames in single pass...")
1272 for filename in filenames:
1273 parsed_info = self.microscope_handler.parser.parse_filename(str(filename))
1274 if parsed_info: 1274 ↛ 1281 (line 1274 didn't jump to line 1281 because the condition on line 1274 was always true)
1275 # Extract all requested components from this filename
1276 for component in component_sets:
1277 component_name = component.value
1278 if component_name in parsed_info and parsed_info[component_name] is not None: 1278 ↛ 1276 (line 1278 didn't jump to line 1276 because the condition on line 1278 was always true)
1279 component_sets[component].add(parsed_info[component_name])
1280 else:
1281 logger.warning(f"Could not parse filename: {filename}")
1283 except Exception as e:
1284 logger.error(f"Error listing files or parsing filenames from {self.input_dir}: {e}", exc_info=True)
1285 # Initialize empty sets for failed parsing
1286 for component in component_sets:
1287 component_sets[component] = set()
1289 # Convert sets to sorted lists and store in cache
1290 for component, component_set in component_sets.items():
1291 sorted_components = [str(comp) for comp in sorted(list(component_set))]
1292 self._component_keys_cache[component] = sorted_components
1293 logger.debug(f"Cached {len(sorted_components)} {component.value} keys")
1295 if not sorted_components: 1295 ↛ 1296 (line 1295 didn't jump to line 1296 because the condition on line 1295 was never true)
1296 logger.warning(f"No {component.value} values found in input directory: {self.input_dir}")
1298 logger.info(f"Component key caching complete. Cached {len(component_sets)} component types in single pass.")
1300 def clear_component_cache(self, components: Optional[List['AllComponents']] = None) -> None:
1301 """
1302 Clear cached component keys to force recomputation.
1304 Use this when the input directory contents have changed and you need
1305 to refresh the component key cache.
1307 Args:
1308 components: Optional list of AllComponents to clear from cache.
1309 If None, clears entire cache.
1310 """
1311 if components is None:
1312 self._component_keys_cache.clear()
1313 logger.info("Cleared entire component keys cache")
1314 else:
1315 for component in components:
1316 if component in self._component_keys_cache:
1317 del self._component_keys_cache[component]
1318 logger.debug(f"Cleared cache for {component.value}")
1319 logger.info(f"Cleared cache for {len(components)} component types")
1321 @property
1322 def metadata_cache(self) -> MetadataCache:
1323 """Access to metadata cache service."""
1324 return self._metadata_cache_service
1328 # Global config management removed - handled by UI layer
1330 @property
1331 def pipeline_config(self) -> Optional['PipelineConfig']:
1332 """Get current pipeline configuration."""
1333 return self._pipeline_config
1335 @pipeline_config.setter
1336 def pipeline_config(self, value: Optional['PipelineConfig']) -> None:
1337 """Set pipeline configuration with auto-sync to thread-local context."""
1338 self._pipeline_config = value
1339 # CRITICAL FIX: Also update public attribute for dual-axis resolver discovery
1340 # This ensures the resolver can always find the current pipeline config
1341 if hasattr(self, '__dict__'):  # Avoid issues during __init__ 1341 ↛ 1343 (line 1341 didn't jump to line 1343 because the condition on line 1341 was always true)
1342 self.__dict__['pipeline_config'] = value
1343 if self._auto_sync_enabled and value is not None:
1344 self._sync_to_thread_local()
1346 def _sync_to_thread_local(self) -> None:
1347 """Internal method to sync current pipeline_config to thread-local context."""
1348 if self._pipeline_config and hasattr(self, 'plate_path'): 1348 ↛ 1349 (line 1348 didn't jump to line 1349 because the condition on line 1348 was never true)
1349 self.apply_pipeline_config(self._pipeline_config)
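# --- Illustrative sketch (editor's addition, not part of orchestrator.py) ---
# Generic version of the mirroring trick in the setter above: the value is kept on a
# private attribute and also written into the instance __dict__ under the public name,
# so code that scans vars(obj) can discover it. Because a data property on the class
# takes precedence over the instance __dict__, normal attribute access still goes
# through the property. Names below are generic, not OpenHCS types.
class Holder:
    def __init__(self):
        self._cfg = None

    @property
    def cfg(self):
        return self._cfg

    @cfg.setter
    def cfg(self, value):
        self._cfg = value
        self.__dict__['cfg'] = value  # visible to vars(self); attribute lookup still uses the property

h = Holder()
h.cfg = {"well_filter": ["A01"]}
assert h.cfg == vars(h)['cfg']
# --- end sketch ---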
1351 def apply_pipeline_config(self, pipeline_config: 'PipelineConfig') -> None:
1352 """
1353 Apply per-orchestrator configuration using thread-local storage.
1355 This method sets the orchestrator's effective config in thread-local storage
1356 for step-level lazy configurations to resolve against.
1357 """
1358 # Import PipelineConfig at runtime for isinstance check
1359 from openhcs.core.config import PipelineConfig
1360 if not isinstance(pipeline_config, PipelineConfig):
1361 raise TypeError(f"Expected PipelineConfig, got {type(pipeline_config)}")
1363 # Temporarily disable auto-sync to prevent recursion
1364 self._auto_sync_enabled = False
1365 try:
1366 self._pipeline_config = pipeline_config
1367 finally:
1368 self._auto_sync_enabled = True
1370 # CRITICAL FIX: Do NOT contaminate thread-local context during PipelineConfig editing
1371 # The orchestrator should maintain its own internal context without modifying
1372 # the global thread-local context. This prevents reset operations from showing
1373 # orchestrator's saved values instead of original thread-local defaults.
1374 #
1375 # The merged config is computed internally and used by get_effective_config()
1376 # but should NOT be set as the global thread-local context.
1378 logger.info(f"Applied orchestrator config for plate: {self.plate_path}")
1380 def get_effective_config(self, *, for_serialization: bool = False) -> GlobalPipelineConfig:
1381 """
1382 Get effective configuration for this orchestrator.
1384 Args:
1385 for_serialization: If True, resolves all values for pickling/storage.
1386 If False, preserves None values for sibling inheritance.
1387 """
1389 if for_serialization: 1389 ↛ 1390 (line 1389 didn't jump to line 1390 because the condition on line 1389 was never true)
1390 result = self.pipeline_config.to_base_config()
1392 # DEBUG: Check what the serialization result looks like
1393 if hasattr(result, 'step_well_filter_config'):
1394 step_config = getattr(result, 'step_well_filter_config')
1395 if hasattr(step_config, 'well_filter'):
1396 well_filter_value = getattr(step_config, 'well_filter')
1397 logger.debug(f"Serialization result has step_well_filter_config.well_filter = {well_filter_value}")
1399 return result
1400 else:
1401 # Reuse existing merged config logic from apply_pipeline_config
1402 shared_context = get_current_global_config(GlobalPipelineConfig)
1403 if not shared_context: 1403 ↛ 1404 (line 1403 didn't jump to line 1404 because the condition on line 1403 was never true)
1404 raise RuntimeError("No global configuration context available for merging")
1406 # DEBUG: Check what the shared context looks like before merging
1407 if hasattr(shared_context, 'step_well_filter_config'): 1407 ↛ 1413 (line 1407 didn't jump to line 1413 because the condition on line 1407 was always true)
1408 step_config = getattr(shared_context, 'step_well_filter_config')
1409 if hasattr(step_config, 'well_filter'): 1409 ↛ 1413 (line 1409 didn't jump to line 1413 because the condition on line 1409 was always true)
1410 well_filter_value = getattr(step_config, 'well_filter')
1411 logger.debug(f"Shared context before merge has step_well_filter_config.well_filter = {well_filter_value}")
1413 result = _create_merged_config(self.pipeline_config, shared_context)
1415 # DEBUG: Check what the merged result looks like
1416 if hasattr(result, 'step_well_filter_config'): 1416 ↛ 1422 (line 1416 didn't jump to line 1422 because the condition on line 1416 was always true)
1417 step_config = getattr(result, 'step_well_filter_config')
1418 if hasattr(step_config, 'well_filter'): 1418 ↛ 1422 (line 1418 didn't jump to line 1422 because the condition on line 1418 was always true)
1419 well_filter_value = getattr(step_config, 'well_filter')
1420 logger.debug(f"Merged result has step_well_filter_config.well_filter = {well_filter_value}")
1422 return result
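# --- Illustrative usage sketch (editor's addition, not part of orchestrator.py) ---
# 'orchestrator' is assumed to be an initialized PipelineOrchestrator with a
# pipeline_config applied; construction is outside this excerpt. Both keyword forms
# match the signature of get_effective_config defined above.
def snapshot_for_worker(orchestrator):
    """Fully resolved config, intended for pickling into worker processes."""
    return orchestrator.get_effective_config(for_serialization=True)

def config_for_inheritance(orchestrator):
    """Merged config that keeps None values so siblings can still inherit."""
    return orchestrator.get_effective_config()
# --- end sketch ---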
1426 def clear_pipeline_config(self) -> None:
1427 """Clear per-orchestrator configuration."""
1428 # REMOVED: Thread-local modification - dual-axis resolver handles context automatically
1429 # No need to modify thread-local storage when clearing orchestrator config
1430 self.pipeline_config = None
1431 logger.info(f"Cleared per-orchestrator config for plate: {self.plate_path}")
1433 def cleanup_pipeline_config(self) -> None:
1434 """Clean up orchestrator context when done (for backward compatibility)."""
1435 self.clear_pipeline_config()
1437 def __del__(self):
1438 """Ensure config cleanup on orchestrator destruction."""
1439 try:
1440 # Clear any stored configuration references
1441 self.clear_pipeline_config()
1442 except Exception:
1443 # Ignore errors during cleanup in destructor to prevent cascading failures
1444 pass