Coverage for openhcs/core/orchestrator/orchestrator.py: 37.7%
596 statements
coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2Consolidated orchestrator module for OpenHCS.
4This module provides a unified PipelineOrchestrator class that implements
5a two-phase (compile-all-then-execute-all) pipeline execution model.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 66 — Immutability After Construction
10- Clause 88 — No Inferred Capabilities
11- Clause 293 — GPU Pre-Declaration Enforcement
12- Clause 295 — GPU Scheduling Affinity
13"""
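# Editorial sketch (not part of the measured source): a minimal example of the
# two-phase compile-all-then-execute-all model described above. The plate path
# and the `my_steps` pipeline are hypothetical placeholders, and the sketch
# assumes the application has already established the global config context
# required by __init__ (e.g. via ensure_global_config_context()).
from openhcs.core.orchestrator.orchestrator import PipelineOrchestrator

def run_plate_sketch(my_steps):
    # Phase 1: compile frozen, immutable contexts for every axis value (e.g. well).
    orchestrator = PipelineOrchestrator("/abs/path/to/plate").initialize()
    compiled = orchestrator.compile_pipelines(my_steps)
    # Phase 2: execute the stateless pipeline definition against the compiled contexts.
    results = orchestrator.execute_compiled_plate(my_steps, compiled)
    return results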
15import logging
16import contextlib
17import concurrent.futures
18import multiprocessing
19from dataclasses import fields
20from pathlib import Path
21from typing import Any, Callable, Dict, List, Optional, Union, Set
23from openhcs.constants.constants import Backend, DEFAULT_WORKSPACE_DIR_SUFFIX, DEFAULT_IMAGE_EXTENSIONS, GroupBy, OrchestratorState, get_openhcs_config, AllComponents, VariableComponents
24from openhcs.constants import Microscope
25from openhcs.core.config import GlobalPipelineConfig
26from openhcs.config_framework.global_config import set_current_global_config, get_current_global_config
27from openhcs.config_framework.lazy_factory import ContextProvider
30from openhcs.core.metadata_cache import get_metadata_cache, MetadataCache
31from openhcs.core.context.processing_context import ProcessingContext
32from openhcs.core.pipeline.compiler import PipelineCompiler
33from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper
34from openhcs.core.steps.abstract import AbstractStep
35from openhcs.core.components.validation import convert_enum_by_value
36from openhcs.io.filemanager import FileManager
37# Zarr backend is CPU-only; always import it (even in subprocess/no-GPU mode)
38import os
39from openhcs.io.zarr import ZarrStorageBackend
40# PipelineConfig is imported at runtime inside the methods that need it (see __init__ and apply_pipeline_config)
41from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
42from openhcs.io.exceptions import StorageWriteError
43from openhcs.io.base import storage_registry
44from openhcs.microscopes import create_microscope_handler
45from openhcs.microscopes.microscope_base import MicroscopeHandler
47# Conditional analysis import - skip in subprocess runner mode
48if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1': 48 ↛ 50: line 48 didn't jump to line 50 because the condition on line 48 was never true
49 # Subprocess runner mode - create placeholder
50 def consolidate_analysis_results(*args, **kwargs):
51 """Placeholder for subprocess runner mode."""
52 raise RuntimeError("Analysis consolidation not available in subprocess runner mode")
53else:
54 from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results
56# Import generic component system - required for orchestrator functionality
57from openhcs.core.components.multiprocessing import MultiprocessingCoordinator
59# Optional napari import for visualization
60try:
61 from openhcs.runtime.napari_stream_visualizer import NapariStreamVisualizer
62 NapariVisualizerType = NapariStreamVisualizer
63except ImportError:
64 # Create a placeholder type for type hints when napari is not available
65 NapariStreamVisualizer = None
66 NapariVisualizerType = Any # Use Any for type hints when napari is not available
68# Optional GPU memory management imports
69try:
70 from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage, cleanup_all_gpu_frameworks
71except ImportError:
72 log_gpu_memory_usage = None
73 cleanup_all_gpu_frameworks = None
76logger = logging.getLogger(__name__)
79def _create_merged_config(pipeline_config: 'PipelineConfig', global_config: GlobalPipelineConfig) -> GlobalPipelineConfig:
80 """
81 Pure function for creating merged config that preserves None values for sibling inheritance.
83 Follows OpenHCS stateless architecture principles - no side effects, explicit dependencies.
84 Extracted from apply_pipeline_config to eliminate code duplication.
85 """
86 logger.debug(f"Starting merge with pipeline_config={type(pipeline_config)} and global_config={type(global_config)}")
88 # DEBUG: Check what the global_config looks like
89 if hasattr(global_config, 'step_well_filter_config'): 89 ↛ 95: line 89 didn't jump to line 95 because the condition on line 89 was always true
90 step_config = getattr(global_config, 'step_well_filter_config')
91 if hasattr(step_config, 'well_filter'): 91 ↛ 95: line 91 didn't jump to line 95 because the condition on line 91 was always true
92 well_filter_value = getattr(step_config, 'well_filter')
93 logger.debug(f"global_config has step_well_filter_config.well_filter = {well_filter_value}")
95 merged_config_values = {}
96 for field in fields(GlobalPipelineConfig):
97 # Fail-loud: Let AttributeError bubble up naturally (no getattr fallbacks)
98 pipeline_value = getattr(pipeline_config, field.name)
100 if field.name == 'step_well_filter_config':
101 logger.debug(f"Processing step_well_filter_config: pipeline_value = {pipeline_value}")
103 if pipeline_value is not None:
104 # CRITICAL FIX: Convert lazy configs to base configs with resolved values
105 # This ensures that user-set values from lazy configs are preserved in the thread-local context
106 # instead of being replaced with static defaults when GlobalPipelineConfig is instantiated
107 if hasattr(pipeline_value, 'to_base_config'):
108 # This is a lazy config - convert to base config with resolved values
109 converted_value = pipeline_value.to_base_config()
110 merged_config_values[field.name] = converted_value
111 if field.name == 'step_well_filter_config':
112 logger.debug(f"Converted lazy config to base: {converted_value}")
113 else:
114 # Regular value - use as-is
115 merged_config_values[field.name] = pipeline_value
116 if field.name == 'step_well_filter_config': 116 ↛ 117: line 116 didn't jump to line 117 because the condition on line 116 was never true
117 logger.debug(f"Using pipeline value as-is: {pipeline_value}")
118 else:
119 global_value = getattr(global_config, field.name)
120 merged_config_values[field.name] = global_value
121 if field.name == 'step_well_filter_config': 121 ↛ 122: line 121 didn't jump to line 122 because the condition on line 121 was never true
122 logger.debug(f"Using global_config value: {global_value}")
124 result = GlobalPipelineConfig(**merged_config_values)
126 # DEBUG: Check what the result looks like
127 if hasattr(result, 'step_well_filter_config'): 127 ↛ 133: line 127 didn't jump to line 133 because the condition on line 127 was always true
128 step_config = getattr(result, 'step_well_filter_config')
129 if hasattr(step_config, 'well_filter'): 129 ↛ 133: line 129 didn't jump to line 133 because the condition on line 129 was always true
130 well_filter_value = getattr(step_config, 'well_filter')
131 logger.debug(f"Final result has step_well_filter_config.well_filter = {well_filter_value}")
133 return result
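# Editorial sketch (not part of the measured source): the per-field merge rule
# implemented by _create_merged_config above, shown in isolation. The field and
# config names below are hypothetical stand-ins for GlobalPipelineConfig fields.
from dataclasses import dataclass, fields as dc_fields
from typing import Optional

@dataclass
class _GlobalSketch:
    num_workers: int = 4
    output_dir: str = "results"

@dataclass
class _PipelineSketch:  # per-orchestrator overlay: None means "inherit from global"
    num_workers: Optional[int] = None
    output_dir: Optional[str] = "custom_results"

def _merge_sketch(pipeline: _PipelineSketch, global_cfg: _GlobalSketch) -> _GlobalSketch:
    merged = {}
    for f in dc_fields(_GlobalSketch):
        value = getattr(pipeline, f.name)
        # Non-None pipeline values override; None falls back to the global value.
        merged[f.name] = value if value is not None else getattr(global_cfg, f.name)
    return _GlobalSketch(**merged)

# _merge_sketch(_PipelineSketch(), _GlobalSketch())
# -> _GlobalSketch(num_workers=4, output_dir='custom_results')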
136def _execute_single_axis_static(
137 pipeline_definition: List[AbstractStep],
138 frozen_context: 'ProcessingContext',
139 visualizer: Optional['NapariVisualizerType']
140) -> Dict[str, Any]:
141 """
142 Static version of _execute_single_axis for multiprocessing compatibility.
144 This function is identical to PipelineOrchestrator._execute_single_axis but doesn't
145 require an orchestrator instance, making it safe for pickling in ProcessPoolExecutor.
146 """
147 axis_id = frozen_context.axis_id
148 logger.info(f"🔥 SINGLE_AXIS: Starting execution for axis {axis_id}")
150 # NUCLEAR VALIDATION
151 if not frozen_context.is_frozen():
152 error_msg = f"🔥 SINGLE_AXIS ERROR: Context for axis {axis_id} is not frozen before execution"
153 logger.error(error_msg)
154 raise RuntimeError(error_msg)
156 if not pipeline_definition:
157 error_msg = f"🔥 SINGLE_AXIS ERROR: Empty pipeline_definition for axis {axis_id}"
158 logger.error(error_msg)
159 raise RuntimeError(error_msg)
161 # Execute each step in the pipeline
162 for step_index, step in enumerate(pipeline_definition):
163 step_name = frozen_context.step_plans[step_index]["step_name"]
165 logger.info(f"🔥 SINGLE_AXIS: Executing step {step_index+1}/{len(pipeline_definition)} - {step_name} for axis {axis_id}")
167 if not hasattr(step, 'process'):
168 error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} missing process method for axis {axis_id}"
169 logger.error(error_msg)
170 raise RuntimeError(error_msg)
172 # CRITICAL: Wrap step processing in config_context(step) for lazy config resolution
173 from openhcs.config_framework.context_manager import config_context
174 with config_context(step):
175 step.process(frozen_context, step_index)
176 logger.info(f"🔥 SINGLE_AXIS: Step {step_index+1}/{len(pipeline_definition)} - {step_name} completed for axis {axis_id}")
178 # Handle visualization if requested
179 if visualizer:
180 step_plan = frozen_context.step_plans[step_index]
181 if step_plan['visualize']:
182 output_dir = step_plan['output_dir']
183 write_backend = step_plan['write_backend']
184 if output_dir:
185 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}")
186 visualizer.visualize_path(
187 step_id=f"step_{step_index}",
188 path=str(output_dir),
189 backend=write_backend,
190 axis_id=axis_id
191 )
192 else:
193 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.")
195 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}")
196 return {"status": "success", "axis_id": axis_id}
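# Editorial note (sketch, not part of the measured source): ProcessPoolExecutor
# pickles the callable and its arguments, which is why the orchestrator submits
# this module-level function instead of a bound method (a bound method would
# drag the unpicklable orchestrator instance into the worker). Minimal illustration:
import concurrent.futures

def _double(x):  # module-level function: picklable by the default pickler
    return 2 * x

def _sketch_submit():
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as pool:
        return pool.submit(_double, 21).result()  # -> 42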
199def _configure_worker_logging(log_file_base: str):
200 """
201 Configure logging and import hook for worker process.
203 This function is called once per worker process when it starts.
204 Each worker will get its own log file with a unique identifier.
206 Args:
207 log_file_base: Base path for worker log files
208 """
209 import os
210 import logging
211 import time
213 # CRITICAL: Skip function registry initialization for fast worker startup
214 # The environment variable is inherited from the subprocess runner
215 # Note: We don't log this yet because logging isn't configured
217 # Note: Import hook system was removed - using existing comprehensive registries
219 # Create unique worker identifier using PID and timestamp
220 worker_pid = os.getpid()
221 worker_timestamp = int(time.time() * 1000000) # Microsecond precision for uniqueness
222 worker_id = f"{worker_pid}_{worker_timestamp}"
223 worker_log_file = f"{log_file_base}_worker_{worker_id}.log"
225 # Configure root logger to capture ALL logs from worker process
226 root_logger = logging.getLogger()
227 root_logger.handlers.clear() # Clear any inherited handlers
229 # Create file handler for worker logs
230 file_handler = logging.FileHandler(worker_log_file)
231 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
232 root_logger.addHandler(file_handler)
233 root_logger.setLevel(logging.INFO)
235 # Ensure all OpenHCS module logs are captured
236 logging.getLogger("openhcs").setLevel(logging.INFO)
238 # Get worker logger
239 worker_logger = logging.getLogger("openhcs.worker")
240 worker_logger.info(f"🔥 WORKER: Process {worker_pid} (ID: {worker_id}) logging configured")
241 worker_logger.info(f"🔥 WORKER: All logs writing to: {worker_log_file}")
243 # Note: the import hook system was removed; log that worker setup is complete
244 worker_logger.info("🔥 WORKER: Worker logging configured (import hook system removed; using existing registries)")
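# Editorial sketch (not part of the measured source): the per-worker log file
# naming scheme produced above, i.e. "<base>_worker_<pid>_<timestamp>.log".
import os
import time

def _worker_log_path_sketch(log_file_base: str) -> str:
    worker_id = f"{os.getpid()}_{int(time.time() * 1_000_000)}"  # microsecond suffix for uniqueness
    return f"{log_file_base}_worker_{worker_id}.log"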
247def _configure_worker_with_gpu(log_file_base: str, global_config_dict: dict):
248 """
249 Configure logging, function registry, and GPU registry for worker process.
251 This function is called once per worker process when it starts.
252 It sets up logging, function registry, and GPU registry initialization.
254 Args:
255 log_file_base: Base path for worker log files (empty string if no logging)
256 global_config_dict: Serialized global configuration for GPU registry setup
257 """
258 import logging
259 import os
261 # Workers should be allowed to import GPU libs if available.
262 # The parent subprocess runner may set OPENHCS_SUBPROCESS_NO_GPU=1 to stay lean,
263 # but that flag must not leak into worker processes.
264 os.environ.pop('OPENHCS_SUBPROCESS_NO_GPU', None)
266 # Configure logging only if log_file_base is provided
267 if log_file_base:
268 _configure_worker_logging(log_file_base)
269 worker_logger = logging.getLogger("openhcs.worker")
270 else:
271 # Set up basic logging for worker messages
272 logging.basicConfig(level=logging.INFO)
273 worker_logger = logging.getLogger("openhcs.worker")
274 worker_logger.info("🔥 WORKER: No log file base provided, using basic logging")
276 # Initialize function registry for this worker process
277 try:
278 worker_logger.info("🔥 WORKER: Initializing function registry for worker process")
280 # Import and initialize function registry (will auto-discover all libraries)
281 import openhcs.processing.func_registry as func_registry_module
283 # Force initialization if not already done (workers need full registry)
284 with func_registry_module._registry_lock:
285 if not func_registry_module._registry_initialized:
286 func_registry_module._auto_initialize_registry()
288 worker_logger.info("🔥 WORKER: Function registry initialized successfully")
290 except Exception as e:
291 worker_logger.error(f"🔥 WORKER: Failed to initialize function registry: {e}")
292 # Don't raise - let worker continue, registry will auto-init on first function call
293 worker_logger.warning("🔥 WORKER: Function registry will auto-initialize on first function call")
295 # Initialize GPU registry for this worker process
296 try:
297 worker_logger.info("🔥 WORKER: Initializing GPU registry for worker process")
299 # Reconstruct global config from dict
300 from openhcs.core.config import GlobalPipelineConfig
301 global_config = GlobalPipelineConfig(**global_config_dict)
303 # Initialize GPU registry for this worker
304 from openhcs.core.orchestrator.gpu_scheduler import setup_global_gpu_registry
305 setup_global_gpu_registry(global_config)
307 worker_logger.info("🔥 WORKER: GPU registry initialized successfully")
309 except Exception as e:
310 worker_logger.error(f"🔥 WORKER: Failed to initialize GPU registry: {e}")
311 # Don't raise - let worker continue without GPU if needed
312 worker_logger.warning("🔥 WORKER: Continuing without GPU registry - GPU functions may fail")
315# Global variable to store log file base for worker processes
316_worker_log_file_base = None
322class PipelineOrchestrator(ContextProvider):
323 """
324 Updated orchestrator supporting both global and per-orchestrator configuration.
326 Global configuration: Updates all orchestrators (existing behavior)
327 Per-orchestrator configuration: Affects only this orchestrator instance
329 The orchestrator first compiles the pipeline for all specified axis values,
330 creating frozen, immutable ProcessingContexts using `compile_plate_for_processing()`.
331 Then, it executes the (now stateless) pipeline definition against these contexts,
332 potentially in parallel, using `execute_compiled_plate()`.
333 """
334 _context_type = "orchestrator" # Register as orchestrator context provider
336 def __init__(
337 self,
338 plate_path: Union[str, Path],
339 workspace_path: Optional[Union[str, Path]] = None,
340 *,
341 pipeline_config: Optional['PipelineConfig'] = None,
342 storage_registry: Optional[Any] = None,
343 ):
344 # Lock removed - was orphaned code never used
346 # Validate shared global context exists
347 if get_current_global_config(GlobalPipelineConfig) is None: 347 ↛ 348: line 347 didn't jump to line 348 because the condition on line 347 was never true
348 raise RuntimeError(
349 "No global configuration context found. "
350 "Ensure application startup has called ensure_global_config_context()."
351 )
353 # Initialize auto-sync control for pipeline config
354 self._pipeline_config = None
355 self._auto_sync_enabled = True
357 # Context management now handled by contextvars-based system
359 # Initialize per-orchestrator configuration
360 # DUAL-AXIS FIX: Always create a PipelineConfig instance to make orchestrator detectable as context provider
361 # This ensures the orchestrator has a dataclass attribute for stack introspection
362 # PipelineConfig is already the lazy version of GlobalPipelineConfig
363 from openhcs.core.config import PipelineConfig
364 if pipeline_config is None: 364 ↛ 367: line 364 didn't jump to line 367 because the condition on line 364 was never true
365 # CRITICAL FIX: Create pipeline config that inherits from global config
366 # This ensures the orchestrator's pipeline_config has the global values for resolution
367 pipeline_config = PipelineConfig()
369 # CRITICAL FIX: Do NOT apply global config inheritance during initialization
370 # PipelineConfig should always have None values that resolve through lazy resolution
371 # Copying concrete values breaks the placeholder system and makes all fields appear "explicitly set"
373 self.pipeline_config = pipeline_config
375 # CRITICAL FIX: Expose pipeline config as public attribute for dual-axis resolver discovery
376 # The resolver's _is_context_provider method only finds public attributes (skips _private)
377 # This allows the resolver to discover the orchestrator's pipeline config during context resolution
378 self.pipeline_config = pipeline_config
379 logger.info("PipelineOrchestrator initialized with PipelineConfig for context discovery.")
381 # REMOVED: Unnecessary thread-local modification
382 # The orchestrator should not modify thread-local storage during initialization
383 # Global config is already available through the dual-axis resolver fallback
385 # Validate plate_path if provided
386 if plate_path: 386 ↛ 389: line 386 didn't jump to line 389 because the condition on line 386 was always true
387 plate_path = Path(plate_path)
389 if plate_path: 389 ↛ 398: line 389 didn't jump to line 398 because the condition on line 389 was always true
390 if not plate_path.is_absolute(): 390 ↛ 391: line 390 didn't jump to line 391 because the condition on line 390 was never true
391 raise ValueError(f"Plate path must be absolute: {plate_path}")
392 if not plate_path.exists(): 392 ↛ 393: line 392 didn't jump to line 393 because the condition on line 392 was never true
393 raise FileNotFoundError(f"Plate path does not exist: {plate_path}")
394 if not plate_path.is_dir(): 394 ↛ 395: line 394 didn't jump to line 395 because the condition on line 394 was never true
395 raise NotADirectoryError(f"Plate path is not a directory: {plate_path}")
397 # Initialize _plate_path_frozen first to allow plate_path to be set during initialization
398 object.__setattr__(self, '_plate_path_frozen', False)
400 self.plate_path = plate_path
401 self.workspace_path = workspace_path
403 if self.plate_path is None and self.workspace_path is None: 403 ↛ 404: line 403 didn't jump to line 404 because the condition on line 403 was never true
404 raise ValueError("Either plate_path or workspace_path must be provided for PipelineOrchestrator.")
406 # Freeze plate_path immediately after setting it to prove immutability
407 object.__setattr__(self, '_plate_path_frozen', True)
408 logger.info(f"🔒 PLATE_PATH FROZEN: {self.plate_path} is now immutable")
410 if storage_registry: 410 ↛ 411: line 410 didn't jump to line 411 because the condition on line 410 was never true
411 self.registry = storage_registry
412 logger.info("PipelineOrchestrator using provided StorageRegistry instance.")
413 else:
414 # Create a copy of the global registry to avoid modifying shared state
415 from openhcs.io.base import storage_registry as global_storage_registry
416 self.registry = global_storage_registry.copy()
417 logger.info("PipelineOrchestrator created its own StorageRegistry instance (copy of global).")
419 # Override zarr backend with orchestrator's config
420 shared_context = get_current_global_config(GlobalPipelineConfig)
421 zarr_backend_with_config = ZarrStorageBackend(shared_context.zarr_config)
422 self.registry[Backend.ZARR.value] = zarr_backend_with_config
423 logger.info(f"Orchestrator zarr backend configured with {shared_context.zarr_config.compressor.value} compression")
425 # Orchestrator always creates its own FileManager, using the determined registry
426 self.filemanager = FileManager(self.registry)
427 self.input_dir: Optional[Path] = None
428 self.microscope_handler: Optional[MicroscopeHandler] = None
429 self.default_pipeline_definition: Optional[List[AbstractStep]] = None
430 self._initialized: bool = False
431 self._state: OrchestratorState = OrchestratorState.CREATED
433 # Component keys cache for fast access - uses AllComponents (includes multiprocessing axis)
434 self._component_keys_cache: Dict['AllComponents', List[str]] = {}
436 # Metadata cache service
437 self._metadata_cache_service = get_metadata_cache()
443 def __setattr__(self, name: str, value: Any) -> None:
444 """
445 Set an attribute, preventing modification of plate_path after it's frozen.
447 This proves that plate_path is truly immutable after initialization.
448 """
449 if name == 'plate_path' and getattr(self, '_plate_path_frozen', False): 449 ↛ 450: line 449 didn't jump to line 450 because the condition on line 449 was never true
450 import traceback
451 stack_trace = ''.join(traceback.format_stack())
452 error_msg = (
453 f"🚫 IMMUTABLE PLATE_PATH VIOLATION: Cannot modify plate_path after freezing!\n"
454 f"Current value: {getattr(self, 'plate_path', 'UNSET')}\n"
455 f"Attempted new value: {value}\n"
456 f"Stack trace:\n{stack_trace}"
457 )
458 logger.error(error_msg)
459 raise AttributeError(error_msg)
460 super().__setattr__(name, value)
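# Editorial sketch (not part of the measured source): the freeze pattern used by
# __setattr__ above, reduced to its essentials. Assigning the guarded attribute
# after the flag is set raises, while object.__setattr__ flips the flag itself.
class _FrozenPathSketch:
    def __init__(self, path: str):
        object.__setattr__(self, "_frozen", False)
        self.path = path
        object.__setattr__(self, "_frozen", True)

    def __setattr__(self, name, value):
        if name == "path" and getattr(self, "_frozen", False):
            raise AttributeError("path is immutable after initialization")
        super().__setattr__(name, value)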
462 @property
463 def state(self) -> OrchestratorState:
464 """Get the current orchestrator state."""
465 return self._state
467 def initialize_microscope_handler(self):
468 """Initializes the microscope handler."""
469 if self.microscope_handler is not None: 469 ↛ 470: line 469 didn't jump to line 470 because the condition on line 469 was never true
470 logger.debug("Microscope handler already initialized.")
471 return
472# if self.input_dir is None:
473# raise RuntimeError("Workspace (and input_dir) must be initialized before microscope handler.")
475 logger.info(f"Initializing microscope handler using input directory: {self.input_dir}...")
476 try:
477 # Use configured microscope type or auto-detect
478 shared_context = get_current_global_config(GlobalPipelineConfig)
479 microscope_type = shared_context.microscope.value if shared_context.microscope != Microscope.AUTO else 'auto'
480 self.microscope_handler = create_microscope_handler(
481 plate_folder=str(self.plate_path),
482 filemanager=self.filemanager,
483 microscope_type=microscope_type,
484 )
485 logger.info(f"Initialized microscope handler: {type(self.microscope_handler).__name__}")
486 except Exception as e:
487 error_msg = f"Failed to create microscope handler: {e}"
488 logger.error(error_msg)
489 raise RuntimeError(error_msg) from e
491 def initialize(self, workspace_path: Optional[Union[str, Path]] = None) -> 'PipelineOrchestrator':
492 """
493 Initializes all required components for the orchestrator.
494 Must be called before other processing methods.
495 Returns self for chaining.
496 """
497 if self._initialized: 497 ↛ 498: line 497 didn't jump to line 498 because the condition on line 497 was never true
498 logger.info("Orchestrator already initialized.")
499 return self
501 try:
502 self.initialize_microscope_handler()
504 # Delegate workspace initialization to microscope handler
505 logger.info("Initializing workspace with microscope handler...")
506 actual_image_dir = self.microscope_handler.initialize_workspace(
507 self.plate_path, workspace_path, self.filemanager
508 )
510 # Use the actual image directory returned by the microscope handler
511 self.input_dir = Path(actual_image_dir)
512 logger.info(f"Set input directory to: {self.input_dir}")
514 # Set workspace_path based on what the handler returned
515 if actual_image_dir != self.plate_path: 515 ↛ 520: line 515 didn't jump to line 520 because the condition on line 515 was always true
516 # Handler created a workspace
517 self.workspace_path = Path(actual_image_dir).parent if Path(actual_image_dir).name != "workspace" else Path(actual_image_dir)
518 else:
519 # Handler used plate directly (like OpenHCS)
520 self.workspace_path = None
522 # Mark as initialized BEFORE caching to avoid chicken-and-egg problem
523 self._initialized = True
524 self._state = OrchestratorState.READY
526 # Auto-cache component keys and metadata for instant access
527 logger.info("Caching component keys and metadata...")
528 self.cache_component_keys()
529 self._metadata_cache_service.cache_metadata(
530 self.microscope_handler,
531 self.plate_path,
532 self._component_keys_cache
533 )
535 logger.info("PipelineOrchestrator fully initialized with cached component keys and metadata.")
536 return self
537 except Exception as e:
538 self._state = OrchestratorState.INIT_FAILED
539 logger.error(f"Failed to initialize orchestrator: {e}")
540 raise
542 def is_initialized(self) -> bool:
543 return self._initialized
545 def create_context(self, axis_id: str) -> ProcessingContext:
546 """Creates a ProcessingContext for a given multiprocessing axis value."""
547 if not self.is_initialized(): 547 ↛ 548: line 547 didn't jump to line 548 because the condition on line 547 was never true
548 raise RuntimeError("Orchestrator must be initialized before calling create_context().")
549 if not axis_id: 549 ↛ 550: line 549 didn't jump to line 550 because the condition on line 549 was never true
550 raise ValueError("Axis identifier must be provided.")
551 if self.input_dir is None: 551 ↛ 552: line 551 didn't jump to line 552 because the condition on line 551 was never true
552 raise RuntimeError("Orchestrator input_dir is not set; initialize orchestrator first.")
554 context = ProcessingContext(
555 global_config=self.get_effective_config(),
556 axis_id=axis_id,
557 filemanager=self.filemanager
558 )
559 # Orchestrator reference removed - was orphaned and unpickleable
560 context.microscope_handler = self.microscope_handler
561 context.input_dir = self.input_dir
562 context.workspace_path = self.workspace_path
563 context.plate_path = self.plate_path # Add plate_path for path planner
564 # Pass metadata cache for OpenHCS metadata creation
565 context.metadata_cache = {} # Initialize empty - metadata cache is not pickled
566 return context
568 def compile_pipelines(
569 self,
570 pipeline_definition: List[AbstractStep],
571 well_filter: Optional[List[str]] = None,
572 enable_visualizer_override: bool = False
573 ) -> Dict[str, ProcessingContext]:
574 """Compile pipelines for axis values (well_filter name preserved for UI compatibility)."""
575 return PipelineCompiler.compile_pipelines(
576 orchestrator=self,
577 pipeline_definition=pipeline_definition,
578 axis_filter=well_filter, # Translate well_filter to axis_filter for generic backend
579 enable_visualizer_override=enable_visualizer_override
580 )
582 def _execute_single_axis(
583 self,
584 pipeline_definition: List[AbstractStep],
585 frozen_context: ProcessingContext,
586 visualizer: Optional[NapariVisualizerType]
587 ) -> Dict[str, Any]:
588 """Executes the pipeline for a single well using its frozen context."""
589 axis_id = frozen_context.axis_id
590 logger.info(f"🔥 SINGLE_AXIS: Starting execution for axis {axis_id}")
592 # NUCLEAR VALIDATION
593 if not frozen_context.is_frozen():
594 error_msg = f"🔥 SINGLE_AXIS ERROR: Context for axis {axis_id} is not frozen before execution"
595 logger.error(error_msg)
596 raise RuntimeError(error_msg)
598 if not pipeline_definition:
599 error_msg = f"🔥 SINGLE_AXIS ERROR: Empty pipeline_definition for axis {axis_id}"
600 logger.error(error_msg)
601 raise RuntimeError(error_msg)
603 # Step IDs are consistent since pipeline_definition comes from UI (no remapping needed)
605 logger.info(f"🔥 SINGLE_AXIS: Processing {len(pipeline_definition)} steps for axis {axis_id}")
607 for step_index, step in enumerate(pipeline_definition):
608 step_name = frozen_context.step_plans[step_index]["step_name"]
610 logger.info(f"🔥 SINGLE_AXIS: Executing step {step_index+1}/{len(pipeline_definition)} - {step_name} for axis {axis_id}")
612 if not hasattr(step, 'process'):
613 error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} missing process method for axis {axis_id}"
614 logger.error(error_msg)
615 raise RuntimeError(error_msg)
617 # CRITICAL: Wrap step processing in config_context(step) for lazy config resolution
618 from openhcs.config_framework.context_manager import config_context
619 with config_context(step):
620 step.process(frozen_context, step_index)
621 logger.info(f"🔥 SINGLE_AXIS: Step {step_index+1}/{len(pipeline_definition)} - {step_name} completed for axis {axis_id}")
623 # except Exception as step_error:
624 # import traceback
625 # full_traceback = traceback.format_exc()
626 # error_msg = f"🔥 SINGLE_AXIS ERROR: Step {step_index+1} ({step_id}) failed for axis {axis_id}: {step_error}"
627 # logger.error(error_msg, exc_info=True)
628 # logger.error(f"🔥 SINGLE_AXIS TRACEBACK for axis {axis_id}, step {step_index+1} ({step_id}):\n{full_traceback}")
629 # raise RuntimeError(error_msg) from step_error
631 if visualizer:
632 step_plan = frozen_context.step_plans[step_index]
633 if step_plan['visualize']:
634 output_dir = step_plan['output_dir']
635 write_backend = step_plan['write_backend']
636 if output_dir:
637 logger.debug(f"Visualizing output for step {step_index} from path {output_dir} (backend: {write_backend}) for axis {axis_id}")
638 visualizer.visualize_path(
639 step_id=f"step_{step_index}",
640 path=str(output_dir),
641 backend=write_backend,
642 axis_id=axis_id
643 )
644 else:
645 logger.warning(f"Step {step_index} in axis {axis_id} flagged for visualization but 'output_dir' is missing in its plan.")
647 logger.info(f"🔥 SINGLE_AXIS: Pipeline execution completed successfully for axis {axis_id}")
648 return {"status": "success", "axis_id": axis_id}
650 def execute_compiled_plate(
651 self,
652 pipeline_definition: List[AbstractStep],
653 compiled_contexts: Dict[str, ProcessingContext],
654 max_workers: Optional[int] = None,
655 visualizer: Optional[NapariVisualizerType] = None,
656 log_file_base: Optional[str] = None
657 ) -> Dict[str, Dict[str, Any]]:
658 """
659 Execute-all phase: Runs the stateless pipeline against compiled contexts.
661 Args:
662 pipeline_definition: The stateless list of AbstractStep objects.
663 compiled_contexts: Dict of axis_id to its compiled, frozen ProcessingContext.
664 Obtained from `compile_plate_for_processing`.
665 max_workers: Maximum number of worker processes (or threads when use_threading is enabled) for parallel execution.
666 visualizer: Optional instance of NapariStreamVisualizer for real-time visualization
667 (requires napari to be installed; must be initialized with orchestrator's filemanager by the caller).
668 log_file_base: Base path for worker process log files (without extension).
669 Each worker creates its own log file: {log_file_base}_worker_{pid}_{timestamp}.log
671 Returns:
672 A dictionary mapping axis IDs (e.g. well IDs) to their execution status (success/error and details).
673 """
675 # CRITICAL FIX: Use resolved pipeline definition from compilation if available
676 # For subprocess runner, use the parameter directly since it receives pre-compiled contexts
677 resolved_pipeline = getattr(self, '_resolved_pipeline_definition', None)
678 if resolved_pipeline is not None:
679 logger.info(f"🔥 EXECUTION: Using resolved pipeline definition with {len(resolved_pipeline)} steps (from compilation)")
680 pipeline_definition = resolved_pipeline
681 else:
682 logger.info(f"🔥 EXECUTION: Using parameter pipeline definition with {len(pipeline_definition)} steps (subprocess mode)")
683 # In subprocess mode, the pipeline_definition parameter should already be resolved
684 if not self.is_initialized():
685 raise RuntimeError("Orchestrator must be initialized before executing.")
686 if not pipeline_definition:
687 raise ValueError("A valid (stateless) pipeline definition must be provided.")
688 if not compiled_contexts:
689 logger.warning("No compiled contexts provided for execution.")
690 return {}
692 # Use effective config (includes pipeline config) instead of global config directly
693 actual_max_workers = max_workers if max_workers is not None else self.get_effective_config().num_workers
694 if actual_max_workers <= 0: # Ensure positive number of workers
695 actual_max_workers = 1
697 # 🔬 AUTOMATIC VISUALIZER CREATION: Create visualizers if compiler detected streaming
698 if visualizer is None:
699 context = next(iter(compiled_contexts.values()))
700 logger.info(f"🔬 ORCHESTRATOR: Checking for required visualizers: {len(context.required_visualizers)} found")
701 for visualizer_info in context.required_visualizers:
702 config = visualizer_info['config']
703 logger.info(f"🔬 ORCHESTRATOR: Creating visualizer with config: {config}")
704 visualizer = config.create_visualizer(self.filemanager, context.visualizer_config)
705 logger.info(f"🔬 ORCHESTRATOR: Starting visualizer: {visualizer}")
706 visualizer.start_viewer()
707 logger.info(f"🔬 ORCHESTRATOR: Visualizer started, is_running: {visualizer.is_running}")
708 break
710 self._state = OrchestratorState.EXECUTING
711 logger.info(f"Starting execution for {len(compiled_contexts)} axis values with max_workers={actual_max_workers}.")
713 # 🔍 VRAM TRACKING: Log initial memory state
714 try:
715 if log_gpu_memory_usage:
716 log_gpu_memory_usage("plate execution start")
717 except Exception:
718 pass
720 try:
721 execution_results: Dict[str, Dict[str, Any]] = {}
723 # CUDA COMPATIBILITY: Set spawn method for multiprocessing to support CUDA
724 try:
725 # Check if spawn method is available and set it if not already set
726 current_method = multiprocessing.get_start_method(allow_none=True)
727 if current_method != 'spawn':
728 logger.info(f"🔥 CUDA: Setting multiprocessing start method from '{current_method}' to 'spawn' for CUDA compatibility")
729 multiprocessing.set_start_method('spawn', force=True)
730 else:
731 logger.debug("🔥 CUDA: Multiprocessing start method already set to 'spawn'")
732 except RuntimeError as e:
733 # Start method may already be set, which is fine
734 logger.debug(f"🔥 CUDA: Start method already configured: {e}")
736 # Choose executor type based on effective config for debugging support
737 effective_config = self.get_effective_config()
738 executor_type = "ThreadPoolExecutor" if effective_config.use_threading else "ProcessPoolExecutor"
739 logger.info(f"🔥 ORCHESTRATOR: Creating {executor_type} with {actual_max_workers} workers")
741 # DEATH DETECTION: Mark executor creation
742 logger.info(f"🔥 DEATH_MARKER: BEFORE_{executor_type.upper()}_CREATION")
744 # Choose appropriate executor class and configure worker logging
745 if effective_config.use_threading:
746 logger.info("🔥 DEBUG MODE: Using ThreadPoolExecutor for easier debugging")
747 executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers)
748 else:
749 logger.info("🔥 PRODUCTION MODE: Using ProcessPoolExecutor for true parallelism")
750 # CRITICAL FIX: Use _configure_worker_with_gpu to ensure workers have function registry
751 # Workers need the function registry to access decorated functions with memory types
752 from openhcs.config_framework.global_config import get_current_global_config
753 global_config = get_current_global_config(GlobalPipelineConfig)
754 global_config_dict = global_config.__dict__ if global_config else {}
756 if log_file_base:
757 logger.info(f"🔥 WORKER SETUP: Configuring worker processes with function registry and logging")
758 executor = concurrent.futures.ProcessPoolExecutor(
759 max_workers=actual_max_workers,
760 initializer=_configure_worker_with_gpu,
761 initargs=(log_file_base, global_config_dict)
762 )
763 else:
764 logger.info("🔥 WORKER SETUP: Configuring worker processes with function registry (no logging)")
765 executor = concurrent.futures.ProcessPoolExecutor(
766 max_workers=actual_max_workers,
767 initializer=_configure_worker_with_gpu,
768 initargs=("", global_config_dict) # Empty string for no logging
769 )
771 logger.info(f"🔥 DEATH_MARKER: ENTERING_{executor_type.upper()}_CONTEXT")
772 with executor:
773 logger.info(f"🔥 DEATH_MARKER: {executor_type.upper()}_CREATED_SUCCESSFULLY")
774 logger.info(f"🔥 ORCHESTRATOR: {executor_type} created, submitting {len(compiled_contexts)} tasks")
776 # NUCLEAR ERROR TRACING: Create snapshot of compiled_contexts to prevent iteration issues
777 contexts_snapshot = dict(compiled_contexts.items())
778 logger.info(f"🔥 ORCHESTRATOR: Created contexts snapshot with {len(contexts_snapshot)} items")
780 # CRITICAL FIX: Resolve all lazy dataclass instances before multiprocessing
781 # This ensures that the contexts are safe for pickling in ProcessPoolExecutor
782 # Note: Don't resolve pipeline_definition as it may overwrite collision-resolved configs
783 logger.info("🔥 ORCHESTRATOR: Resolving lazy dataclasses for multiprocessing compatibility")
784 contexts_snapshot = resolve_lazy_configurations_for_serialization(contexts_snapshot)
785 logger.info("🔥 ORCHESTRATOR: Lazy dataclass resolution completed")
787 logger.info("🔥 DEATH_MARKER: BEFORE_TASK_SUBMISSION_LOOP")
788 future_to_axis_id = {}
789 config = get_openhcs_config()
790 if not config:
791 raise RuntimeError("Component configuration is required for orchestrator execution")
792 axis_name = config.multiprocessing_axis.value
793 for axis_id, context in contexts_snapshot.items():
794 try:
795 logger.info(f"🔥 DEATH_MARKER: SUBMITTING_TASK_FOR_{axis_name.upper()}_{axis_id}")
796 logger.info(f"🔥 ORCHESTRATOR: Submitting task for {axis_name} {axis_id}")
797 # Resolve all arguments before passing to ProcessPoolExecutor
798 resolved_context = resolve_lazy_configurations_for_serialization(context)
800 # Use static function to avoid pickling the orchestrator instance
801 # Note: Use original pipeline_definition to preserve collision-resolved configs
802 # Don't pass visualizer to worker processes - they communicate via ZeroMQ
803 future = executor.submit(_execute_single_axis_static, pipeline_definition, resolved_context, None)
804 future_to_axis_id[future] = axis_id
805 logger.info(f"🔥 ORCHESTRATOR: Task submitted for {axis_name} {axis_id}")
806 logger.info(f"🔥 DEATH_MARKER: TASK_SUBMITTED_FOR_{axis_name.upper()}_{axis_id}")
807 except Exception as submit_error:
808 error_msg = f"🔥 ORCHESTRATOR ERROR: Failed to submit task for {axis_name} {axis_id}: {submit_error}"
809 logger.error(error_msg, exc_info=True)
810 # FAIL-FAST: Re-raise task submission errors immediately
811 raise
813 logger.info("🔥 DEATH_MARKER: TASK_SUBMISSION_LOOP_COMPLETED")
815 logger.info(f"🔥 ORCHESTRATOR: All {len(future_to_axis_id)} tasks submitted, waiting for completion")
816 logger.info("🔥 DEATH_MARKER: BEFORE_COMPLETION_LOOP")
818 completed_count = 0
819 logger.info("🔥 DEATH_MARKER: ENTERING_AS_COMPLETED_LOOP")
820 for future in concurrent.futures.as_completed(future_to_axis_id):
821 axis_id = future_to_axis_id[future]
822 completed_count += 1
823 logger.info(f"🔥 DEATH_MARKER: PROCESSING_COMPLETED_TASK_{completed_count}_{axis_name.upper()}_{axis_id}")
824 logger.info(f"🔥 ORCHESTRATOR: Task {completed_count}/{len(future_to_axis_id)} completed for {axis_name} {axis_id}")
826 try:
827 logger.info(f"🔥 DEATH_MARKER: CALLING_FUTURE_RESULT_FOR_{axis_name.upper()}_{axis_id}")
828 result = future.result()
829 logger.info(f"🔥 DEATH_MARKER: FUTURE_RESULT_SUCCESS_FOR_{axis_name.upper()}_{axis_id}")
830 logger.info(f"🔥 ORCHESTRATOR: {axis_name.title()} {axis_id} result: {result}")
831 execution_results[axis_id] = result
832 logger.info(f"🔥 DEATH_MARKER: RESULT_STORED_FOR_{axis_name.upper()}_{axis_id}")
833 except Exception as exc:
834 import traceback
835 full_traceback = traceback.format_exc()
836 error_msg = f"{axis_name.title()} {axis_id} generated an exception during execution: {exc}"
837 logger.error(f"🔥 ORCHESTRATOR ERROR: {error_msg}", exc_info=True)
838 logger.error(f"🔥 ORCHESTRATOR FULL TRACEBACK for {axis_name} {axis_id}:\n{full_traceback}")
839 # FAIL-FAST: Re-raise immediately instead of storing error
840 raise
842 logger.info("🔥 DEATH_MARKER: COMPLETION_LOOP_FINISHED")
844 logger.info(f"🔥 ORCHESTRATOR: All tasks completed, {len(execution_results)} results collected")
847 # 🔥 GPU CLEANUP: Clear GPU memory after plate execution
848 try:
849 if cleanup_all_gpu_frameworks:
850 cleanup_all_gpu_frameworks()
851 logger.debug("🔥 GPU CLEANUP: Cleared all GPU frameworks after plate execution")
852 except Exception as cleanup_error:
853 logger.warning(f"Failed to cleanup GPU memory after plate execution: {cleanup_error}")
857 logger.info(f"🔥 ORCHESTRATOR: Plate execution completed, checking for analysis consolidation")
858 # Run automatic analysis consolidation if enabled
859 shared_context = get_current_global_config(GlobalPipelineConfig)
860 if shared_context.analysis_consolidation_config.enabled:
861 try:
862 # Get results directory using same logic as path planner (single source of truth)
863 results_dir = None
864 for axis_id, context in compiled_contexts.items():
865 # Use the same logic as PathPlanner._get_results_path()
866 plate_path = Path(context.plate_path)
867 materialization_path = shared_context.materialization_results_path
869 if Path(materialization_path).is_absolute():
870 potential_results_dir = Path(materialization_path)
871 else:
872 potential_results_dir = plate_path / materialization_path
874 if potential_results_dir.exists():
875 results_dir = potential_results_dir
876 logger.info(f"🔍 CONSOLIDATION: Found results directory: {results_dir}")
877 break
879 if results_dir and results_dir.exists():
880 # Check if there are actually CSV files (materialized results)
881 csv_files = list(results_dir.glob("*.csv"))
882 if csv_files:
883 logger.info(f"🔄 CONSOLIDATION: Found {len(csv_files)} CSV files, running consolidation")
884 # Get well IDs from compiled contexts
885 axis_ids = list(compiled_contexts.keys())
886 logger.info(f"🔄 CONSOLIDATION: Using well IDs: {axis_ids}")
888 consolidate_analysis_results(
889 results_directory=str(results_dir),
890 well_ids=axis_ids,
891 consolidation_config=shared_context.analysis_consolidation_config,
892 plate_metadata_config=shared_context.plate_metadata_config
893 )
894 logger.info("✅ CONSOLIDATION: Completed successfully")
895 else:
896 logger.info(f"⏭️ CONSOLIDATION: No CSV files found in {results_dir}, skipping")
897 else:
898 logger.info(f"⏭️ CONSOLIDATION: No results directory found in compiled contexts")
899 except Exception as e:
900 logger.error(f"❌ CONSOLIDATION: Failed: {e}")
902 # Update state based on execution results
903 if all(result.get("status") == "success" for result in execution_results.values()):
904 self._state = OrchestratorState.COMPLETED
905 else:
906 self._state = OrchestratorState.EXEC_FAILED
908 # 🔬 VISUALIZER CLEANUP: Stop napari visualizer if it was auto-created and not persistent
909 if visualizer is not None:
910 try:
911 if not visualizer.persistent:
912 visualizer.stop_viewer()
913 logger.info("🔬 ORCHESTRATOR: Stopped non-persistent napari visualizer")
914 else:
915 logger.info("🔬 ORCHESTRATOR: Keeping persistent napari visualizer alive")
916 # Just cleanup ZMQ connection, leave process running
917 visualizer._cleanup_zmq()
918 except Exception as e:
919 logger.warning(f"🔬 ORCHESTRATOR: Failed to cleanup napari visualizer: {e}")
921 logger.info(f"🔥 ORCHESTRATOR: Plate execution finished. Results: {execution_results}")
923 return execution_results
924 except Exception as e:
925 self._state = OrchestratorState.EXEC_FAILED
926 logger.error(f"Failed to execute compiled plate: {e}")
927 raise
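# Editorial sketch (not part of the measured source): the executor selection
# performed in execute_compiled_plate above. use_threading picks a
# ThreadPoolExecutor for in-process debugging; otherwise a ProcessPoolExecutor
# is created with an initializer so each spawned worker configures logging and
# registries before receiving tasks. Names below are illustrative only.
import concurrent.futures

def _make_executor_sketch(use_threading: bool, max_workers: int, initializer, initargs):
    if use_threading:
        return concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    return concurrent.futures.ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=initializer,   # e.g. _configure_worker_with_gpu
        initargs=initargs,         # e.g. (log_file_base, global_config_dict)
    )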
929 def get_component_keys(self, component: Union['AllComponents', 'VariableComponents'], component_filter: Optional[List[Union[str, int]]] = None) -> List[str]:
930 """
931 Generic method to get component keys using VariableComponents directly.
933 Returns the discovered component values as strings to match the pattern
934 detection system format.
936 Tries metadata cache first, falls back to filename parsing cache if metadata is empty.
938 Args:
939 component: AllComponents or VariableComponents enum specifying which component to extract
940 (also accepts GroupBy enum which will be converted to AllComponents)
941 component_filter: Optional list of component values to filter by
943 Returns:
944 List of component values as strings, sorted
946 Raises:
947 RuntimeError: If orchestrator is not initialized
948 """
949 if not self.is_initialized(): 949 ↛ 950: line 949 didn't jump to line 950 because the condition on line 949 was never true
950 raise RuntimeError("Orchestrator must be initialized before getting component keys.")
952 # Convert GroupBy to AllComponents using OpenHCS generic utility
953 if isinstance(component, GroupBy) and component.value is None: 953 ↛ 954: line 953 didn't jump to line 954 because the condition on line 953 was never true
954 raise ValueError("Cannot get component keys for GroupBy.NONE")
956 # Convert to AllComponents for cache lookup (includes multiprocessing axis)
957 component = convert_enum_by_value(component, AllComponents) or component
959 # Use component directly - let natural errors occur for wrong types
960 component_name = component.value
962 # Try metadata cache first (preferred source)
963 cached_metadata = self._metadata_cache_service.get_cached_metadata(component)
964 if cached_metadata: 964 ↛ 969: line 964 didn't jump to line 969 because the condition on line 964 was always true
965 all_components = list(cached_metadata.keys())
966 logger.debug(f"Using metadata cache for {component_name}: {len(all_components)} components")
967 else:
968 # Fall back to filename parsing cache
969 all_components = self._component_keys_cache[component] # Let KeyError bubble up naturally
971 if not all_components:
972 logger.warning(f"No {component_name} values found in input directory: {self.input_dir}")
973 return []
975 logger.debug(f"Using filename parsing cache for {component.value}: {len(all_components)} components")
977 if component_filter:
978 str_component_filter = {str(c) for c in component_filter}
979 selected_components = [comp for comp in all_components if comp in str_component_filter]
980 if not selected_components: 980 ↛ 981: line 980 didn't jump to line 981 because the condition on line 980 was never true
981 component_name = component.value
982 logger.warning(f"No {component_name} values from {all_components} match the filter: {component_filter}")
983 return selected_components
984 else:
985 return all_components
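# Editorial sketch (not part of the measured source): typical use of
# get_component_keys on an initialized orchestrator. Any AllComponents member
# works the same way; the optional subset filters the discovered values.
from openhcs.constants.constants import AllComponents

def _list_axis_values_sketch(orchestrator, component: AllComponents, subset=None):
    # Returns sorted string keys from the metadata cache, falling back to the
    # filename-parsing cache; optionally filtered to `subset` (strings or ints).
    return orchestrator.get_component_keys(component, component_filter=subset)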
987 def cache_component_keys(self, components: Optional[List['AllComponents']] = None) -> None:
988 """
989 Pre-compute and cache component keys for fast access using single-pass parsing.
991 This method performs expensive file listing and parsing operations once,
992 extracting all component types in a single pass for maximum efficiency.
994 Args:
995 components: Optional list of AllComponents to cache.
996 If None, caches all components in the AllComponents enum.
997 """
998 if not self.is_initialized(): 998 ↛ 999: line 998 didn't jump to line 999 because the condition on line 998 was never true
999 raise RuntimeError("Orchestrator must be initialized before caching component keys.")
1001 if components is None: 1001 ↛ 1004: line 1001 didn't jump to line 1004 because the condition on line 1001 was always true
1002 components = list(AllComponents) # Cache all enum values including multiprocessing axis
1004 logger.info(f"Caching component keys for: {[comp.value for comp in components]}")
1006 # Initialize component sets for all requested components
1007 component_sets: Dict['AllComponents', Set[Union[str, int]]] = {}
1008 for component in components:
1009 component_sets[component] = set()
1011 # Single pass through all filenames - extract all components at once
1012 try:
1013 # Use primary backend from microscope handler
1014 backend_to_use = self.microscope_handler.get_primary_backend(self.input_dir)
1015 logger.debug(f"Using backend '{backend_to_use}' for file listing based on available backends")
1017 filenames = self.filemanager.list_files(str(self.input_dir), backend_to_use, extensions=DEFAULT_IMAGE_EXTENSIONS)
1018 logger.debug(f"Parsing {len(filenames)} filenames in single pass...")
1020 for filename in filenames:
1021 parsed_info = self.microscope_handler.parser.parse_filename(str(filename))
1022 if parsed_info: 1022 ↛ 1029: line 1022 didn't jump to line 1029 because the condition on line 1022 was always true
1023 # Extract all requested components from this filename
1024 for component in component_sets:
1025 component_name = component.value
1026 if component_name in parsed_info and parsed_info[component_name] is not None: 1026 ↛ 1024: line 1026 didn't jump to line 1024 because the condition on line 1026 was always true
1027 component_sets[component].add(parsed_info[component_name])
1028 else:
1029 logger.warning(f"Could not parse filename: {filename}")
1031 except Exception as e:
1032 logger.error(f"Error listing files or parsing filenames from {self.input_dir}: {e}", exc_info=True)
1033 # Initialize empty sets for failed parsing
1034 for component in component_sets:
1035 component_sets[component] = set()
1037 # Convert sets to sorted lists and store in cache
1038 for component, component_set in component_sets.items():
1039 sorted_components = [str(comp) for comp in sorted(list(component_set))]
1040 self._component_keys_cache[component] = sorted_components
1041 logger.debug(f"Cached {len(sorted_components)} {component.value} keys")
1043 if not sorted_components: 1043 ↛ 1044: line 1043 didn't jump to line 1044 because the condition on line 1043 was never true
1044 logger.warning(f"No {component.value} values found in input directory: {self.input_dir}")
1046 logger.info(f"Component key caching complete. Cached {len(component_sets)} component types in single pass.")
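# Editorial sketch (not part of the measured source): the single-pass idea used
# by cache_component_keys above; every filename is parsed once and each parsed
# field feeds the set for its component. Parser and field names are illustrative.
from typing import Dict, Iterable, List, Set

def _single_pass_components_sketch(filenames: Iterable[str], parse, wanted: List[str]) -> Dict[str, List[str]]:
    sets: Dict[str, Set[str]] = {name: set() for name in wanted}
    for filename in filenames:
        parsed = parse(filename)  # e.g. {"well": "A01", "channel": 1, ...} or None
        if not parsed:
            continue
        for name in wanted:
            if parsed.get(name) is not None:
                sets[name].add(str(parsed[name]))
    return {name: sorted(values) for name, values in sets.items()}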
1048 def clear_component_cache(self, components: Optional[List['AllComponents']] = None) -> None:
1049 """
1050 Clear cached component keys to force recomputation.
1052 Use this when the input directory contents have changed and you need
1053 to refresh the component key cache.
1055 Args:
1056 components: Optional list of AllComponents to clear from cache.
1057 If None, clears entire cache.
1058 """
1059 if components is None:
1060 self._component_keys_cache.clear()
1061 logger.info("Cleared entire component keys cache")
1062 else:
1063 for component in components:
1064 if component in self._component_keys_cache:
1065 del self._component_keys_cache[component]
1066 logger.debug(f"Cleared cache for {component.value}")
1067 logger.info(f"Cleared cache for {len(components)} component types")
1069 @property
1070 def metadata_cache(self) -> MetadataCache:
1071 """Access to metadata cache service."""
1072 return self._metadata_cache_service
1076 # Global config management removed - handled by UI layer
1078 @property
1079 def pipeline_config(self) -> Optional['PipelineConfig']:
1080 """Get current pipeline configuration."""
1081 return self._pipeline_config
1083 @pipeline_config.setter
1084 def pipeline_config(self, value: Optional['PipelineConfig']) -> None:
1085 """Set pipeline configuration with auto-sync to thread-local context."""
1086 self._pipeline_config = value
1087 # CRITICAL FIX: Also update public attribute for dual-axis resolver discovery
1088 # This ensures the resolver can always find the current pipeline config
1089 if hasattr(self, '__dict__'): # Avoid issues during __init__ 1089 ↛ 1091: line 1089 didn't jump to line 1091 because the condition on line 1089 was always true
1090 self.__dict__['pipeline_config'] = value
1091 if self._auto_sync_enabled and value is not None:
1092 self._sync_to_thread_local()
1094 def _sync_to_thread_local(self) -> None:
1095 """Internal method to sync current pipeline_config to thread-local context."""
1096 if self._pipeline_config and hasattr(self, 'plate_path'): 1096 ↛ 1097: line 1096 didn't jump to line 1097 because the condition on line 1096 was never true
1097 self.apply_pipeline_config(self._pipeline_config)
1099 def apply_pipeline_config(self, pipeline_config: 'PipelineConfig') -> None:
1100 """
1101 Apply per-orchestrator configuration using thread-local storage.
1103 This method sets the orchestrator's effective config in thread-local storage
1104 for step-level lazy configurations to resolve against.
1105 """
1106 # Import PipelineConfig at runtime for isinstance check
1107 from openhcs.core.config import PipelineConfig
1108 if not isinstance(pipeline_config, PipelineConfig):
1109 raise TypeError(f"Expected PipelineConfig, got {type(pipeline_config)}")
1111 # Temporarily disable auto-sync to prevent recursion
1112 self._auto_sync_enabled = False
1113 try:
1114 self._pipeline_config = pipeline_config
1115 finally:
1116 self._auto_sync_enabled = True
1118 # CRITICAL FIX: Do NOT contaminate thread-local context during PipelineConfig editing
1119 # The orchestrator should maintain its own internal context without modifying
1120 # the global thread-local context. This prevents reset operations from showing
1121 # orchestrator's saved values instead of original thread-local defaults.
1122 #
1123 # The merged config is computed internally and used by get_effective_config()
1124 # but should NOT be set as the global thread-local context.
1126 logger.info(f"Applied orchestrator config for plate: {self.plate_path}")
1128 def get_effective_config(self, *, for_serialization: bool = False) -> GlobalPipelineConfig:
1129 """
1130 Get effective configuration for this orchestrator.
1132 Args:
1133 for_serialization: If True, resolves all values for pickling/storage.
1134 If False, preserves None values for sibling inheritance.
1135 """
1137 if for_serialization: 1137 ↛ 1138: line 1137 didn't jump to line 1138 because the condition on line 1137 was never true
1138 result = self.pipeline_config.to_base_config()
1140 # DEBUG: Check what the serialization result looks like
1141 if hasattr(result, 'step_well_filter_config'):
1142 step_config = getattr(result, 'step_well_filter_config')
1143 if hasattr(step_config, 'well_filter'):
1144 well_filter_value = getattr(step_config, 'well_filter')
1145 logger.debug(f"Serialization result has step_well_filter_config.well_filter = {well_filter_value}")
1147 return result
1148 else:
1149 # Reuse existing merged config logic from apply_pipeline_config
1150 shared_context = get_current_global_config(GlobalPipelineConfig)
1151 if not shared_context: 1151 ↛ 1152: line 1151 didn't jump to line 1152 because the condition on line 1151 was never true
1152 raise RuntimeError("No global configuration context available for merging")
1154 # DEBUG: Check what the shared context looks like before merging
1155 if hasattr(shared_context, 'step_well_filter_config'): 1155 ↛ 1161: line 1155 didn't jump to line 1161 because the condition on line 1155 was always true
1156 step_config = getattr(shared_context, 'step_well_filter_config')
1157 if hasattr(step_config, 'well_filter'): 1157 ↛ 1161: line 1157 didn't jump to line 1161 because the condition on line 1157 was always true
1158 well_filter_value = getattr(step_config, 'well_filter')
1159 logger.debug(f"Shared context before merge has step_well_filter_config.well_filter = {well_filter_value}")
1161 result = _create_merged_config(self.pipeline_config, shared_context)
1163 # DEBUG: Check what the merged result looks like
1164 if hasattr(result, 'step_well_filter_config'): 1164 ↛ 1170: line 1164 didn't jump to line 1170 because the condition on line 1164 was always true
1165 step_config = getattr(result, 'step_well_filter_config')
1166 if hasattr(step_config, 'well_filter'): 1166 ↛ 1170: line 1166 didn't jump to line 1170 because the condition on line 1166 was always true
1167 well_filter_value = getattr(step_config, 'well_filter')
1168 logger.debug(f"Merged result has step_well_filter_config.well_filter = {well_filter_value}")
1170 return result
1174 def clear_pipeline_config(self) -> None:
1175 """Clear per-orchestrator configuration."""
1176 # REMOVED: Thread-local modification - dual-axis resolver handles context automatically
1177 # No need to modify thread-local storage when clearing orchestrator config
1178 self.pipeline_config = None
1179 logger.info(f"Cleared per-orchestrator config for plate: {self.plate_path}")
1181 def cleanup_pipeline_config(self) -> None:
1182 """Clean up orchestrator context when done (for backward compatibility)."""
1183 self.clear_pipeline_config()
1185 def __del__(self):
1186 """Ensure config cleanup on orchestrator destruction."""
1187 try:
1188 # Clear any stored configuration references
1189 self.clear_pipeline_config()
1190 except Exception:
1191 # Ignore errors during cleanup in destructor to prevent cascading failures
1192 pass