Coverage for openhcs/core/orchestrator/orchestrator.py: 57.8%
475 statements
coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Consolidated orchestrator module for OpenHCS.
4This module provides a unified PipelineOrchestrator class that implements
5a two-phase (compile-all-then-execute-all) pipeline execution model.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 66 — Immutability After Construction
10- Clause 88 — No Inferred Capabilities
11- Clause 293 — GPU Pre-Declaration Enforcement
12- Clause 295 — GPU Scheduling Affinity
13"""
15import logging
16import concurrent.futures
17import multiprocessing
18from pathlib import Path
19from typing import Any, Callable, Dict, List, Optional, Union, Set
21from openhcs.constants.constants import Backend, DEFAULT_WORKSPACE_DIR_SUFFIX, DEFAULT_IMAGE_EXTENSIONS, GroupBy, OrchestratorState
22from openhcs.constants import Microscope
23from openhcs.core.config import GlobalPipelineConfig, get_default_global_config, PipelineConfig
24from openhcs.core.context.processing_context import ProcessingContext
25from openhcs.core.pipeline.compiler import PipelineCompiler
26from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper
27from openhcs.core.steps.abstract import AbstractStep, get_step_id
28from openhcs.io.exceptions import StorageWriteError
29from openhcs.io.filemanager import FileManager
30from openhcs.io.base import storage_registry
31from openhcs.microscopes import create_microscope_handler
32from openhcs.microscopes.microscope_base import MicroscopeHandler
34# Optional napari import for visualization
35try:
36 from openhcs.runtime.napari_stream_visualizer import NapariStreamVisualizer
37 NapariVisualizerType = NapariStreamVisualizer
38except ImportError:
39 # Create a placeholder type for type hints when napari is not available
40 NapariStreamVisualizer = None
41 NapariVisualizerType = Any # Use Any for type hints when napari is not available
44logger = logging.getLogger(__name__)
47def _configure_worker_logging(log_file_base: str):
48 """
49 Configure logging and import hook for worker process.
51 This function is called once per worker process when it starts.
52 Each worker will get its own log file with a unique identifier.
54 Args:
55 log_file_base: Base path for worker log files
56 """
57 import os
58 import logging
59 import time
61 # CRITICAL: Skip function registry initialization for fast worker startup
62 # The environment variable is inherited from the subprocess runner
63 # Note: We don't log this yet because logging isn't configured
65 # CRITICAL: Install import hook for auto-discovered functions
66 # Worker processes are fresh Python processes that need the import hook
67 try:
68 from openhcs.processing.func_registry import _install_import_hook
69 _install_import_hook()
70 # Note: We don't log this yet because logging isn't configured
71 except Exception:
72 # Can't log yet, but this is critical - the worker will fail later
73 pass
75 # Create unique worker identifier using PID and timestamp
76 worker_pid = os.getpid()
77 worker_timestamp = int(time.time() * 1000000) # Microsecond precision for uniqueness
78 worker_id = f"{worker_pid}_{worker_timestamp}"
79 worker_log_file = f"{log_file_base}_worker_{worker_id}.log"
81 # Configure root logger to capture ALL logs from worker process
82 root_logger = logging.getLogger()
83 root_logger.handlers.clear() # Clear any inherited handlers
85 # Create file handler for worker logs
86 file_handler = logging.FileHandler(worker_log_file)
87 file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
88 root_logger.addHandler(file_handler)
89 root_logger.setLevel(logging.INFO)
91 # Ensure all OpenHCS module logs are captured
92 logging.getLogger("openhcs").setLevel(logging.INFO)
94 # Get worker logger
95 worker_logger = logging.getLogger("openhcs.worker")
96 worker_logger.info(f"🔥 WORKER: Process {worker_pid} (ID: {worker_id}) logging configured")
97 worker_logger.info(f"🔥 WORKER: All logs writing to: {worker_log_file}")
99 # Log import hook installation status
100 worker_logger.info(f"🔥 WORKER: Import hook installed for auto-discovered functions")
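# Illustrative sketch (hypothetical values, not part of the module): what the naming
# scheme above produces for a given base path.
#
#   _configure_worker_logging("/tmp/run1/openhcs")
#   # -> root logger writes to e.g. /tmp/run1/openhcs_worker_12345_1723612345678901.log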
103# Global variable to store log file base for worker processes
104_worker_log_file_base = None
107def _ensure_step_ids_for_multiprocessing(
108 frozen_context: ProcessingContext,
109 pipeline_definition: List[AbstractStep],
110 well_id: str
111) -> None:
112 """
113 Helper function to update step IDs after multiprocessing pickle/unpickle.
115 When contexts are pickled/unpickled for multiprocessing, step objects get
116 new memory addresses, changing their IDs. This remaps the step_plans.
117 """
118 from openhcs.core.pipeline.compiler import PipelineCompiler
119 try:
120 logger.debug(f"🔥 MULTIPROCESSING: Updating step IDs for well {well_id}")
121 PipelineCompiler.update_step_ids_for_multiprocessing(frozen_context, pipeline_definition)
122 logger.debug(f"🔥 MULTIPROCESSING: Step IDs updated successfully for well {well_id}")
123 except Exception as remap_error:
124 error_msg = f"🔥 MULTIPROCESSING ERROR: Failed to remap step IDs for well {well_id}: {remap_error}"
125 logger.error(error_msg, exc_info=True)
126 raise RuntimeError(error_msg) from remap_error
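# Illustrative sketch of why this remapping is needed (assumes step IDs are derived
# from object identity; `MyStep` is a hypothetical picklable step class):
#
#   import pickle
#   step = MyStep()
#   restored = pickle.loads(pickle.dumps(step))   # what a worker process receives
#   assert id(step) != id(restored)               # new object identity after unpickling
#
# step_plans keyed by IDs computed in the parent process therefore have to be
# re-keyed against the step objects that actually exist in the worker.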
129class PipelineOrchestrator:
130 """
131 Orchestrator supporting both global and per-orchestrator configuration.
133 Global configuration: updates apply to all orchestrators.
134 Per-orchestrator configuration: affects only this orchestrator instance.
136 The orchestrator first compiles the pipeline for all specified wells,
137 creating frozen, immutable ProcessingContexts using `compile_pipelines()`.
138 Then, it executes the (now stateless) pipeline definition against these contexts,
139 potentially in parallel, using `execute_compiled_plate()`.
140 """
142 def __init__(
143 self,
144 plate_path: Union[str, Path],
145 workspace_path: Optional[Union[str, Path]] = None,
146 *,
147 global_config: Optional[GlobalPipelineConfig] = None,
148 pipeline_config: Optional[PipelineConfig] = None,
149 storage_registry: Optional[Any] = None, # Optional StorageRegistry instance
150 ):
151 # Lock removed - was orphaned code never used
153 if global_config is None: 153 ↛ 154 (condition never true)
154 self.global_config = get_default_global_config()
155 logger.info("PipelineOrchestrator using default global configuration.")
156 else:
157 self.global_config = global_config
159 # Initialize per-orchestrator configuration
160 self.pipeline_config = pipeline_config # Per-orchestrator overrides
164 # Set current pipeline config for MaterializationPathConfig defaults
165 from openhcs.core.config import set_current_pipeline_config
166 set_current_pipeline_config(self.global_config)
168 if plate_path is None: 168 ↛ 172 (condition never true)
169 # This case should ideally be prevented by TUI logic if plate_path is mandatory
170 # for an orchestrator instance tied to a specific plate.
171 # If workspace_path is also None, this will be caught later.
172 pass
173 elif isinstance(plate_path, str): 173 ↛ 174 (condition never true)
174 plate_path = Path(plate_path)
175 elif not isinstance(plate_path, Path): 175 ↛ 176 (condition never true)
176 raise ValueError(f"Invalid plate_path type: {type(plate_path)}. Must be str or Path.")
178 if plate_path: 178 ↛ 187 (condition always true)
179 if not plate_path.is_absolute(): 179 ↛ 180 (condition never true)
180 raise ValueError(f"Plate path must be absolute: {plate_path}")
181 if not plate_path.exists(): 181 ↛ 182 (condition never true)
182 raise FileNotFoundError(f"Plate path does not exist: {plate_path}")
183 if not plate_path.is_dir(): 183 ↛ 184 (condition never true)
184 raise NotADirectoryError(f"Plate path is not a directory: {plate_path}")
186 # Initialize _plate_path_frozen first to allow plate_path to be set during initialization
187 object.__setattr__(self, '_plate_path_frozen', False)
189 self.plate_path = plate_path
190 self.workspace_path = workspace_path
192 if self.plate_path is None and self.workspace_path is None: 192 ↛ 193 (condition never true)
193 raise ValueError("Either plate_path or workspace_path must be provided for PipelineOrchestrator.")
195 # Freeze plate_path immediately after setting it to prove immutability
196 object.__setattr__(self, '_plate_path_frozen', True)
197 logger.info(f"🔒 PLATE_PATH FROZEN: {self.plate_path} is now immutable")
199 if storage_registry: 199 ↛ 200 (condition never true)
200 self.registry = storage_registry
201 logger.info("PipelineOrchestrator using provided StorageRegistry instance.")
202 else:
203 from openhcs.io.base import storage_registry as global_registry
204 # Create a copy of the global registry to avoid modifying shared state
205 self.registry = global_registry.copy()
206 logger.info("PipelineOrchestrator created its own StorageRegistry instance (copy of global).")
208 # Override zarr backend with orchestrator's config
209 from openhcs.io.zarr import ZarrStorageBackend
210 from openhcs.constants.constants import Backend
212 zarr_backend_with_config = ZarrStorageBackend(self.global_config.zarr)
213 self.registry[Backend.ZARR.value] = zarr_backend_with_config
214 logger.info(f"Orchestrator zarr backend configured with {self.global_config.zarr.compressor.value} compression")
216 # Orchestrator always creates its own FileManager, using the determined registry
217 self.filemanager = FileManager(self.registry)
218 self.input_dir: Optional[Path] = None
219 self.microscope_handler: Optional[MicroscopeHandler] = None
220 self.default_pipeline_definition: Optional[List[AbstractStep]] = None
221 self._initialized: bool = False
222 self._state: OrchestratorState = OrchestratorState.CREATED
224 # Component keys cache for fast access
225 self._component_keys_cache: Dict[GroupBy, List[str]] = {}
227 # Metadata cache for component key→name mappings
228 self._metadata_cache: Dict[GroupBy, Dict[str, Optional[str]]] = {}
230 def __setattr__(self, name: str, value: Any) -> None:
231 """
232 Set an attribute, preventing modification of plate_path after it's frozen.
234 This proves that plate_path is truly immutable after initialization.
235 """
236 if name == 'plate_path' and getattr(self, '_plate_path_frozen', False): 236 ↛ 237 (condition never true)
237 import traceback
238 stack_trace = ''.join(traceback.format_stack())
239 error_msg = (
240 f"🚫 IMMUTABLE PLATE_PATH VIOLATION: Cannot modify plate_path after freezing!\n"
241 f"Current value: {getattr(self, 'plate_path', 'UNSET')}\n"
242 f"Attempted new value: {value}\n"
243 f"Stack trace:\n{stack_trace}"
244 )
245 logger.error(error_msg)
246 raise AttributeError(error_msg)
247 super().__setattr__(name, value)
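# Illustrative sketch of the guard above (hypothetical paths):
#
#   orch = PipelineOrchestrator(plate_path="/data/plate_001")
#   orch.plate_path = "/data/other_plate"   # raises AttributeError once frozen
#
# The value is assigned exactly once in __init__ while _plate_path_frozen is still
# False; the flag is then flipped, and any later assignment hits the branch above.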
249 @property
250 def state(self) -> OrchestratorState:
251 """Get the current orchestrator state."""
252 return self._state
254 def initialize_microscope_handler(self):
255 """Initializes the microscope handler."""
256 if self.microscope_handler is not None: 256 ↛ 257 (condition never true)
257 logger.debug("Microscope handler already initialized.")
258 return
259# if self.input_dir is None:
260# raise RuntimeError("Workspace (and input_dir) must be initialized before microscope handler.")
262 logger.info(f"Initializing microscope handler using input directory: {self.input_dir}...")
263 try:
264 # Use configured microscope type or auto-detect
265 microscope_type = self.global_config.microscope.value if self.global_config.microscope != Microscope.AUTO else 'auto'
266 self.microscope_handler = create_microscope_handler(
267 plate_folder=str(self.plate_path),
268 filemanager=self.filemanager,
269 microscope_type=microscope_type,
270 )
271 logger.info(f"Initialized microscope handler: {type(self.microscope_handler).__name__}")
272 except Exception as e:
273 error_msg = f"Failed to create microscope handler: {e}"
274 logger.error(error_msg)
275 raise RuntimeError(error_msg) from e
277 def initialize(self, workspace_path: Optional[Union[str, Path]] = None) -> 'PipelineOrchestrator':
278 """
279 Initializes all required components for the orchestrator.
280 Must be called before other processing methods.
281 Returns self for chaining.
282 """
283 if self._initialized: 283 ↛ 284 (condition never true)
284 logger.info("Orchestrator already initialized.")
285 return self
287 try:
288 self.initialize_microscope_handler()
290 # Delegate workspace initialization to microscope handler
291 logger.info("Initializing workspace with microscope handler...")
292 actual_image_dir = self.microscope_handler.initialize_workspace(
293 self.plate_path, workspace_path, self.filemanager
294 )
296 # Use the actual image directory returned by the microscope handler
297 self.input_dir = Path(actual_image_dir)
298 logger.info(f"Set input directory to: {self.input_dir}")
300 # Set workspace_path based on what the handler returned
301 if actual_image_dir != self.plate_path: 301 ↛ 306 (condition always true)
302 # Handler created a workspace
303 self.workspace_path = Path(actual_image_dir).parent if Path(actual_image_dir).name != "workspace" else Path(actual_image_dir)
304 else:
305 # Handler used plate directly (like OpenHCS)
306 self.workspace_path = None
308 # Mark as initialized BEFORE caching to avoid chicken-and-egg problem
309 self._initialized = True
310 self._state = OrchestratorState.READY
312 # Auto-cache component keys and metadata for instant access
313 logger.info("Caching component keys and metadata...")
314 self.cache_component_keys()
315 self.cache_metadata()
317 logger.info("PipelineOrchestrator fully initialized with cached component keys and metadata.")
318 return self
319 except Exception as e:
320 self._state = OrchestratorState.INIT_FAILED
321 logger.error(f"Failed to initialize orchestrator: {e}")
322 raise
324 def is_initialized(self) -> bool:
325 return self._initialized
327 def create_context(self, well_id: str) -> ProcessingContext:
328 """Creates a ProcessingContext for a given well."""
329 if not self.is_initialized(): 329 ↛ 330 (condition never true)
330 raise RuntimeError("Orchestrator must be initialized before calling create_context().")
331 if not well_id: 331 ↛ 332 (condition never true)
332 raise ValueError("Well identifier must be provided.")
333 if self.input_dir is None: 333 ↛ 334 (condition never true)
334 raise RuntimeError("Orchestrator input_dir is not set; initialize orchestrator first.")
336 context = ProcessingContext(
337 global_config=self.global_config,
338 well_id=well_id,
339 filemanager=self.filemanager
340 )
341 # Orchestrator reference removed - was orphaned and unpickleable
342 context.microscope_handler = self.microscope_handler
343 context.input_dir = self.input_dir
344 context.workspace_path = self.workspace_path
345 context.plate_path = self.plate_path # Add plate_path for path planner
346 # Pass metadata cache for OpenHCS metadata creation
347 context.metadata_cache = dict(self._metadata_cache) # Copy to avoid pickling issues
348 return context
350 def compile_pipelines(
351 self,
352 pipeline_definition: List[AbstractStep],
353 well_filter: Optional[List[str]] = None,
354 enable_visualizer_override: bool = False
355 ) -> Dict[str, ProcessingContext]:
356 """
357 Compile-all phase: Prepares frozen ProcessingContexts for each well.
359 This method delegates to PipelineCompiler.compile_pipelines() to handle
360 the actual compilation logic while providing orchestrator context.
362 Args:
363 pipeline_definition: The list of AbstractStep objects defining the pipeline.
364 well_filter: Optional list of well IDs to process. If None, processes all found wells.
365 enable_visualizer_override: If True, all steps in all compiled contexts
366 will have their 'visualize' flag set to True.
368 Returns:
369 A dictionary mapping well IDs to their compiled and frozen ProcessingContexts.
370 The input `pipeline_definition` list (of step objects) is modified in-place
371 to become stateless.
372 """
373 return PipelineCompiler.compile_pipelines(
374 orchestrator=self,
375 pipeline_definition=pipeline_definition,
376 well_filter=well_filter,
377 enable_visualizer_override=enable_visualizer_override
378 )
380 def _execute_single_well(
381 self,
382 pipeline_definition: List[AbstractStep],
383 frozen_context: ProcessingContext,
384 visualizer: Optional[NapariVisualizerType]
385 ) -> Dict[str, Any]:
386 """Executes the pipeline for a single well using its frozen context."""
387 well_id = frozen_context.well_id
388 logger.info(f"🔥 SINGLE_WELL: Starting execution for well {well_id}")
390 # NUCLEAR VALIDATION
391 if not frozen_context.is_frozen(): 391 ↛ 392 (condition never true)
392 error_msg = f"🔥 SINGLE_WELL ERROR: Context for well {well_id} is not frozen before execution"
393 logger.error(error_msg)
394 raise RuntimeError(error_msg)
396 if not pipeline_definition: 396 ↛ 397 (condition never true)
397 error_msg = f"🔥 SINGLE_WELL ERROR: Empty pipeline_definition for well {well_id}"
398 logger.error(error_msg)
399 raise RuntimeError(error_msg)
401 # MULTIPROCESSING FIX: Update step IDs after pickle/unpickle
402 _ensure_step_ids_for_multiprocessing(frozen_context, pipeline_definition, well_id)
404 logger.info(f"🔥 SINGLE_WELL: Processing {len(pipeline_definition)} steps for well {well_id}")
406 for step_index, step in enumerate(pipeline_definition):
407 # Generate step_id from object reference (elegant stateless approach)
408 step_id = get_step_id(step)
409 step_name = getattr(step, 'name', 'N/A')
411 logger.info(f"🔥 SINGLE_WELL: Executing step {step_index+1}/{len(pipeline_definition)} - {step_id} ({step_name}) for well {well_id}")
413 if not hasattr(step, 'process'): 413 ↛ 414 (condition never true)
414 error_msg = f"🔥 SINGLE_WELL ERROR: Step {step_id} missing process method for well {well_id}"
415 logger.error(error_msg)
416 raise RuntimeError(error_msg)
418 step.process(frozen_context)
419 logger.info(f"🔥 SINGLE_WELL: Step {step_index+1}/{len(pipeline_definition)} - {step_id} completed for well {well_id}")
421 # except Exception as step_error:
422 # import traceback
423 # full_traceback = traceback.format_exc()
424 # error_msg = f"🔥 SINGLE_WELL ERROR: Step {step_index+1} ({step_id}) failed for well {well_id}: {step_error}"
425 # logger.error(error_msg, exc_info=True)
426 # logger.error(f"🔥 SINGLE_WELL TRACEBACK for well {well_id}, step {step_index+1} ({step_id}):\n{full_traceback}")
427 # raise RuntimeError(error_msg) from step_error
429 if visualizer: 429 ↛ 430 (condition never true)
430 step_plan = frozen_context.step_plans[step_id]
431 if step_plan['visualize']:
432 output_dir = step_plan['output_dir']
433 write_backend = step_plan['write_backend']
434 if output_dir:
435 logger.debug(f"Visualizing output for step {step_id} from path {output_dir} (backend: {write_backend}) for well {well_id}")
436 visualizer.visualize_path(
437 step_id=step_id,
438 path=str(output_dir),
439 backend=write_backend,
440 well_id=well_id
441 )
442 else:
443 logger.warning(f"Step {step_id} in well {well_id} flagged for visualization but 'output_dir' is missing in its plan.")
445 logger.info(f"🔥 SINGLE_WELL: Pipeline execution completed successfully for well {well_id}")
446 return {"status": "success", "well_id": well_id}
448 def execute_compiled_plate(
449 self,
450 pipeline_definition: List[AbstractStep],
451 compiled_contexts: Dict[str, ProcessingContext],
452 max_workers: Optional[int] = None,
453 visualizer: Optional[NapariVisualizerType] = None,
454 log_file_base: Optional[str] = None
455 ) -> Dict[str, Dict[str, Any]]:
456 """
457 Execute-all phase: Runs the stateless pipeline against compiled contexts.
459 Args:
460 pipeline_definition: The stateless list of AbstractStep objects.
461 compiled_contexts: Dict of well_id to its compiled, frozen ProcessingContext.
462 Obtained from `compile_pipelines()`.
463 max_workers: Maximum number of parallel workers (threads or processes, depending on `use_threading`).
464 visualizer: Optional instance of NapariStreamVisualizer for real-time visualization
465 (requires napari to be installed; must be initialized with orchestrator's filemanager by the caller).
466 log_file_base: Base path for worker process log files (without extension).
467 Each worker will create its own log file: {log_file_base}_worker_{pid}_{timestamp}.log
469 Returns:
470 A dictionary mapping well IDs to their execution status (success/error and details).
471 """
472 if not self.is_initialized(): 472 ↛ 473 (condition never true)
473 raise RuntimeError("Orchestrator must be initialized before executing.")
474 if not pipeline_definition: 474 ↛ 475 (condition never true)
475 raise ValueError("A valid (stateless) pipeline definition must be provided.")
476 if not compiled_contexts: 476 ↛ 477 (condition never true)
477 logger.warning("No compiled contexts provided for execution.")
478 return {}
480 actual_max_workers = max_workers if max_workers is not None else self.global_config.num_workers
481 if actual_max_workers <= 0: # Ensure positive number of workers 481 ↛ 482 (condition never true)
482 actual_max_workers = 1
484 self._state = OrchestratorState.EXECUTING
485 logger.info(f"Starting execution for {len(compiled_contexts)} wells with max_workers={actual_max_workers}.")
487 # 🔍 VRAM TRACKING: Log initial memory state
488 try:
489 from openhcs.core.memory.gpu_cleanup import log_gpu_memory_usage
490 log_gpu_memory_usage("plate execution start")
491 except Exception:
492 pass
494 try:
495 execution_results: Dict[str, Dict[str, Any]] = {}
497 # CUDA COMPATIBILITY: Set spawn method for multiprocessing to support CUDA
498 try:
499 # Check if spawn method is available and set it if not already set
500 current_method = multiprocessing.get_start_method(allow_none=True)
501 if current_method != 'spawn':
502 logger.info(f"🔥 CUDA: Setting multiprocessing start method from '{current_method}' to 'spawn' for CUDA compatibility")
503 multiprocessing.set_start_method('spawn', force=True)
504 else:
505 logger.debug("🔥 CUDA: Multiprocessing start method already set to 'spawn'")
506 except RuntimeError as e:
507 # Start method may already be set, which is fine
508 logger.debug(f"🔥 CUDA: Start method already configured: {e}")
510 # Choose executor type based on global config for debugging support
511 executor_type = "ThreadPoolExecutor" if self.global_config.use_threading else "ProcessPoolExecutor"
512 logger.info(f"🔥 ORCHESTRATOR: Creating {executor_type} with {actual_max_workers} workers")
514 # DEATH DETECTION: Mark executor creation
515 logger.info(f"🔥 DEATH_MARKER: BEFORE_{executor_type.upper()}_CREATION")
517 # Choose appropriate executor class and configure worker logging
518 if self.global_config.use_threading: 518 ↛ 519 (condition never true)
519 logger.info("🔥 DEBUG MODE: Using ThreadPoolExecutor for easier debugging")
520 executor = concurrent.futures.ThreadPoolExecutor(max_workers=actual_max_workers)
521 else:
522 logger.info("🔥 PRODUCTION MODE: Using ProcessPoolExecutor for true parallelism")
523 if log_file_base: 523 ↛ 524 (condition never true)
524 logger.info(f"🔥 WORKER LOGGING: Configuring worker processes with log base: {log_file_base}")
525 executor = concurrent.futures.ProcessPoolExecutor(
526 max_workers=actual_max_workers,
527 initializer=_configure_worker_logging,
528 initargs=(log_file_base,)
529 )
530 else:
531 logger.info("🔥 WORKER LOGGING: No log base provided, workers will inherit logging")
532 executor = concurrent.futures.ProcessPoolExecutor(max_workers=actual_max_workers)
534 logger.info(f"🔥 DEATH_MARKER: ENTERING_{executor_type.upper()}_CONTEXT")
535 with executor:
536 logger.info(f"🔥 DEATH_MARKER: {executor_type.upper()}_CREATED_SUCCESSFULLY")
537 logger.info(f"🔥 ORCHESTRATOR: {executor_type} created, submitting {len(compiled_contexts)} tasks")
539 # NUCLEAR ERROR TRACING: Create snapshot of compiled_contexts to prevent iteration issues
540 contexts_snapshot = dict(compiled_contexts.items())
541 logger.info(f"🔥 ORCHESTRATOR: Created contexts snapshot with {len(contexts_snapshot)} items")
543 logger.info("🔥 DEATH_MARKER: BEFORE_TASK_SUBMISSION_LOOP")
544 future_to_well_id = {}
545 for well_id, context in contexts_snapshot.items():
546 try:
547 logger.info(f"🔥 DEATH_MARKER: SUBMITTING_TASK_FOR_WELL_{well_id}")
548 logger.info(f"🔥 ORCHESTRATOR: Submitting task for well {well_id}")
549 future = executor.submit(self._execute_single_well, pipeline_definition, context, visualizer)
550 future_to_well_id[future] = well_id
551 logger.info(f"🔥 ORCHESTRATOR: Task submitted for well {well_id}")
552 logger.info(f"🔥 DEATH_MARKER: TASK_SUBMITTED_FOR_WELL_{well_id}")
553 except Exception as submit_error:
554 error_msg = f"🔥 ORCHESTRATOR ERROR: Failed to submit task for well {well_id}: {submit_error}"
555 logger.error(error_msg, exc_info=True)
556 # FAIL-FAST: Re-raise task submission errors immediately
557 raise
559 logger.info("🔥 DEATH_MARKER: TASK_SUBMISSION_LOOP_COMPLETED")
561 logger.info(f"🔥 ORCHESTRATOR: All {len(future_to_well_id)} tasks submitted, waiting for completion")
562 logger.info("🔥 DEATH_MARKER: BEFORE_COMPLETION_LOOP")
564 completed_count = 0
565 logger.info("🔥 DEATH_MARKER: ENTERING_AS_COMPLETED_LOOP")
566 for future in concurrent.futures.as_completed(future_to_well_id):
567 well_id = future_to_well_id[future]
568 completed_count += 1
569 logger.info(f"🔥 DEATH_MARKER: PROCESSING_COMPLETED_TASK_{completed_count}_WELL_{well_id}")
570 logger.info(f"🔥 ORCHESTRATOR: Task {completed_count}/{len(future_to_well_id)} completed for well {well_id}")
572 try:
573 logger.info(f"🔥 DEATH_MARKER: CALLING_FUTURE_RESULT_FOR_WELL_{well_id}")
574 result = future.result()
575 logger.info(f"🔥 DEATH_MARKER: FUTURE_RESULT_SUCCESS_FOR_WELL_{well_id}")
576 logger.info(f"🔥 ORCHESTRATOR: Well {well_id} result: {result}")
577 execution_results[well_id] = result
578 logger.info(f"🔥 DEATH_MARKER: RESULT_STORED_FOR_WELL_{well_id}")
579 except Exception as exc:
580 import traceback
581 full_traceback = traceback.format_exc()
582 error_msg = f"Well {well_id} generated an exception during execution: {exc}"
583 logger.error(f"🔥 ORCHESTRATOR ERROR: {error_msg}", exc_info=True)
584 logger.error(f"🔥 ORCHESTRATOR FULL TRACEBACK for well {well_id}:\n{full_traceback}")
585 # FAIL-FAST: Re-raise immediately instead of storing error
586 raise
588 logger.info("🔥 DEATH_MARKER: COMPLETION_LOOP_FINISHED")
590 logger.info(f"🔥 ORCHESTRATOR: All tasks completed, {len(execution_results)} results collected")
593 # 🔥 GPU CLEANUP: Clear GPU memory after plate execution
594 try:
595 from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
596 cleanup_all_gpu_frameworks()
597 logger.debug("🔥 GPU CLEANUP: Cleared all GPU frameworks after plate execution")
598 except Exception as cleanup_error:
599 logger.warning(f"Failed to cleanup GPU memory after plate execution: {cleanup_error}")
603 logger.info(f"🔥 ORCHESTRATOR: Plate execution completed, checking for analysis consolidation")
604 # Run automatic analysis consolidation if enabled
605 if self.global_config.analysis_consolidation.enabled: 605 ↛ 648 (condition always true)
606 try:
607 from openhcs.processing.backends.analysis.consolidate_analysis_results import consolidate_analysis_results
609 # Get results directory from compiled contexts (Option 2: use existing paths)
610 results_dir = None
611 for well_id, context in compiled_contexts.items():
612 # Look for any step that has an output_dir - this is where materialization happens
613 for step_id, step_plan in context.step_plans.items():
614 if 'output_dir' in step_plan: 614 ↛ 613 (condition always true)
615 # Found an output directory, check if it has a results subdirectory
616 potential_results_dir = Path(step_plan['output_dir']) / self.global_config.materialization_results_path
617 if potential_results_dir.exists(): 617 ↛ 618 (condition never true)
618 results_dir = potential_results_dir
619 logger.info(f"🔍 CONSOLIDATION: Found results directory from step {step_id}: {results_dir}")
620 break
621 if results_dir: 621 ↛ 622 (condition never true)
622 break
624 if results_dir and results_dir.exists(): 624 ↛ 626 (condition never true)
625 # Check if there are actually CSV files (materialized results)
626 csv_files = list(results_dir.glob("*.csv"))
627 if csv_files:
628 logger.info(f"🔄 CONSOLIDATION: Found {len(csv_files)} CSV files, running consolidation")
629 # Get well IDs from compiled contexts
630 well_ids = list(compiled_contexts.keys())
631 logger.info(f"🔄 CONSOLIDATION: Using well IDs: {well_ids}")
633 consolidate_analysis_results(
634 results_directory=str(results_dir),
635 well_ids=well_ids,
636 consolidation_config=self.global_config.analysis_consolidation,
637 plate_metadata_config=self.global_config.plate_metadata
638 )
639 logger.info("✅ CONSOLIDATION: Completed successfully")
640 else:
641 logger.info(f"⏭️ CONSOLIDATION: No CSV files found in {results_dir}, skipping")
642 else:
643 logger.info(f"⏭️ CONSOLIDATION: No results directory found in compiled contexts")
644 except Exception as e:
645 logger.error(f"❌ CONSOLIDATION: Failed: {e}")
647 # Update state based on execution results
648 if all(result.get("status") == "success" for result in execution_results.values()): 648 ↛ 651 (condition always true)
649 self._state = OrchestratorState.COMPLETED
650 else:
651 self._state = OrchestratorState.EXEC_FAILED
653 logger.info(f"🔥 ORCHESTRATOR: Plate execution finished. Results: {execution_results}")
655 return execution_results
656 except Exception as e:
657 self._state = OrchestratorState.EXEC_FAILED
658 logger.error(f"Failed to execute compiled plate: {e}")
659 raise
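# Illustrative sketch of per-worker logging (hypothetical paths; assumes the
# ProcessPoolExecutor branch, i.e. use_threading is False):
#
#   results = orchestrator.execute_compiled_plate(
#       pipeline_definition=my_steps,
#       compiled_contexts=contexts,
#       max_workers=4,
#       log_file_base="/tmp/run1/openhcs",
#   )
#   # each worker process then writes to /tmp/run1/openhcs_worker_<pid>_<timestamp>.log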
661 def get_component_keys(self, group_by: GroupBy, component_filter: Optional[List[Union[str, int]]] = None) -> List[str]:
662 """
663 Generic method to get component keys for any group_by type.
665 This method works with any GroupBy enum value (CHANNEL, SITE, Z_INDEX, WELL)
666 and returns the discovered component values as strings to match the pattern
667 detection system format.
669 Tries metadata cache first, falls back to filename parsing cache if metadata is empty.
671 Args:
672 group_by: GroupBy enum specifying which component to extract
673 component_filter: Optional list of component values to filter by
675 Returns:
676 List of component values as strings, sorted
678 Raises:
679 RuntimeError: If orchestrator is not initialized
680 """
681 if not self.is_initialized(): 681 ↛ 682 (condition never true)
682 raise RuntimeError("Orchestrator must be initialized before getting component keys.")
684 # Try metadata cache first (preferred source)
685 if group_by in self._metadata_cache and self._metadata_cache[group_by]:
686 all_components = list(self._metadata_cache[group_by].keys())
687 logger.debug(f"Using metadata cache for {group_by.value}: {len(all_components)} components")
688 else:
689 # Fall back to filename parsing cache
690 if group_by not in self._component_keys_cache: 690 ↛ 691 (condition never true)
691 raise RuntimeError(f"Component keys cache is empty for {group_by.value}. "
692 f"Ensure cache_component_keys() was called during initialization.")
694 all_components = self._component_keys_cache[group_by]
696 if not all_components: 696 ↛ 697 (condition never true)
697 component_name = group_by.value
698 logger.warning(f"No {component_name} values found in input directory: {self.input_dir}")
699 return []
701 logger.debug(f"Using filename parsing cache for {group_by.value}: {len(all_components)} components")
703 if component_filter:
704 str_component_filter = {str(c) for c in component_filter}
705 selected_components = [comp for comp in all_components if comp in str_component_filter]
706 if not selected_components: 706 ↛ 707 (condition never true)
707 component_name = group_by.value
708 logger.warning(f"No {component_name} values from {all_components} match the filter: {component_filter}")
709 return selected_components
710 else:
711 return all_components
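# Illustrative sketch (hypothetical values; actual keys depend on the plate):
#
#   channels = orchestrator.get_component_keys(GroupBy.CHANNEL)          # e.g. ["1", "2", "3"]
#   subset = orchestrator.get_component_keys(GroupBy.CHANNEL, [1, 3])    # filter accepts str or int
#   wells = orchestrator.get_component_keys(GroupBy.WELL)                # e.g. ["A01", "A02"]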
713 def cache_component_keys(self, components: Optional[List[GroupBy]] = None) -> None:
714 """
715 Pre-compute and cache component keys for fast access using single-pass parsing.
717 This method performs expensive file listing and parsing operations once,
718 extracting all component types in a single pass for maximum efficiency.
720 Args:
721 components: Optional list of GroupBy components to cache.
722 If None, caches all components in the GroupBy enum.
723 """
724 if not self.is_initialized(): 724 ↛ 725 (condition never true)
725 raise RuntimeError("Orchestrator must be initialized before caching component keys.")
727 if components is None: 727 ↛ 730 (condition always true)
728 components = list(GroupBy) # Cache all enum values
730 logger.info(f"Caching component keys for: {[comp.value for comp in components]}")
732 # Initialize component sets for all requested components
733 component_sets: Dict[GroupBy, Set[Union[str, int]]] = {}
734 for group_by in components:
735 if group_by != GroupBy.NONE: # Skip the empty enum
736 component_sets[group_by] = set()
738 # Single pass through all filenames - extract all components at once
739 try:
740 filenames = self.filemanager.list_files(str(self.input_dir), Backend.DISK.value, extensions=DEFAULT_IMAGE_EXTENSIONS)
741 logger.debug(f"Parsing {len(filenames)} filenames in single pass...")
743 for filename in filenames:
744 parsed_info = self.microscope_handler.parser.parse_filename(str(filename))
745 if parsed_info: 745 ↛ 752 (condition always true)
746 # Extract all requested components from this filename
747 for group_by in component_sets:
748 component_name = group_by.value
749 if component_name in parsed_info and parsed_info[component_name] is not None: 749 ↛ 747 (condition always true)
750 component_sets[group_by].add(parsed_info[component_name])
751 else:
752 logger.warning(f"Could not parse filename: {filename}")
754 except Exception as e:
755 logger.error(f"Error listing files or parsing filenames from {self.input_dir}: {e}", exc_info=True)
756 # Initialize empty sets for failed parsing
757 for group_by in component_sets:
758 component_sets[group_by] = set()
760 # Convert sets to sorted lists and store in cache
761 for group_by, component_set in component_sets.items():
762 sorted_components = [str(comp) for comp in sorted(list(component_set))]
763 self._component_keys_cache[group_by] = sorted_components
764 logger.debug(f"Cached {len(sorted_components)} {group_by.value} keys")
766 if not sorted_components: 766 ↛ 767 (condition never true)
767 logger.warning(f"No {group_by.value} values found in input directory: {self.input_dir}")
769 logger.info(f"Component key caching complete. Cached {len(component_sets)} component types in single pass.")
771 def clear_component_cache(self, components: Optional[List[GroupBy]] = None) -> None:
772 """
773 Clear cached component keys to force recomputation.
775 Use this when the input directory contents have changed and you need
776 to refresh the component key cache.
778 Args:
779 components: Optional list of GroupBy components to clear from cache.
780 If None, clears entire cache.
781 """
782 if components is None:
783 self._component_keys_cache.clear()
784 logger.info("Cleared entire component keys cache")
785 else:
786 for group_by in components:
787 if group_by in self._component_keys_cache:
788 del self._component_keys_cache[group_by]
789 logger.debug(f"Cleared cache for {group_by.value}")
790 logger.info(f"Cleared cache for {len(components)} component types")
792 def cache_metadata(self) -> None:
793 """
794 Cache all metadata from metadata handler for fast access.
796 This method calls the metadata handler's parse_metadata() method once
797 and stores the results for instant access to component key→name mappings.
798 Call this after orchestrator initialization to enable metadata-based
799 component names.
800 """
801 if not self.is_initialized() or self.input_dir is None or self.microscope_handler is None: 801 ↛ 802 (condition never true)
802 raise RuntimeError("Orchestrator must be initialized before caching metadata.")
804 try:
805 # Parse all metadata once using enum→method mapping
806 # Use plate_path for metadata loading since metadata files are in plate root
807 metadata = self.microscope_handler.metadata_handler.parse_metadata(self.plate_path)
809 # Initialize all GroupBy components with component keys mapped to None
810 for group_by in [GroupBy.CHANNEL, GroupBy.WELL, GroupBy.SITE, GroupBy.Z_INDEX]:
811 # Get all component keys for this GroupBy from filename parsing
812 component_keys = self.get_component_keys(group_by)
813 # Create dict mapping each key to None (no metadata available)
814 self._metadata_cache[group_by] = {key: None for key in component_keys}
816 # Update with actual metadata from metadata handler where available
817 for component_name, mapping in metadata.items():
818 try:
819 group_by = GroupBy(component_name) # Convert string to enum
820 # For OpenHCS plates, metadata keys might be the only source of component keys
821 # Merge metadata keys with any existing component keys from filename parsing
822 if group_by in self._metadata_cache: 822 ↛ 833 (condition always true)
823 # Start with existing component keys (from filename parsing)
824 combined_cache = self._metadata_cache[group_by].copy()
825 # Add any metadata keys that weren't found in filename parsing
826 for metadata_key in mapping.keys():
827 if metadata_key not in combined_cache: 827 ↛ 828 (condition never true)
828 combined_cache[metadata_key] = None
829 # Update with actual metadata values
830 combined_cache.update(mapping)
831 self._metadata_cache[group_by] = combined_cache
832 else:
833 self._metadata_cache[group_by] = mapping
834 logger.debug(f"Updated metadata for {group_by.value}: {len(mapping)} entries with real data")
835 except ValueError:
836 logger.warning(f"Unknown component type in metadata: {component_name}")
838 # Log what we have for each component
839 for group_by in [GroupBy.CHANNEL, GroupBy.WELL, GroupBy.SITE, GroupBy.Z_INDEX]:
840 mapping = self._metadata_cache[group_by]
841 real_metadata_count = sum(1 for v in mapping.values() if v is not None)
842 total_keys = len(mapping)
843 logger.debug(f"Cached {group_by.value}: {total_keys} keys, {real_metadata_count} with metadata")
845 logger.info(f"Metadata caching complete. All {len(self._metadata_cache)} component types populated.")
847 except Exception as e:
848 logger.warning(f"Could not cache metadata: {e}")
849 # Don't fail - metadata is optional enhancement
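# Illustrative sketch of the metadata cache shape after caching (the channel name is
# taken from the example in get_component_metadata below; other values are hypothetical):
#
#   orchestrator.cache_metadata()
#   # orchestrator._metadata_cache[GroupBy.CHANNEL] == {"1": "HOECHST 33342", "2": None}
#   # keys come from filename parsing and/or metadata; None means no display name found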
851 def get_component_metadata(self, group_by: GroupBy, key: str) -> Optional[str]:
852 """
853 Get metadata display name for a specific component key.
855 Args:
856 group_by: GroupBy enum specifying component type
857 key: Component key (e.g., "1", "2", "A01")
859 Returns:
860 Display name from metadata, or None if not available
861 Example: get_component_metadata(GroupBy.CHANNEL, "1") → "HOECHST 33342"
862 """
863 if group_by in self._metadata_cache:
864 return self._metadata_cache[group_by].get(key)
865 return None
867 def clear_metadata_cache(self) -> None:
868 """
869 Clear cached metadata to force recomputation.
871 Use this when the input directory contents have changed and you need
872 to refresh the metadata cache.
873 """
874 self._metadata_cache.clear()
875 logger.info("Cleared metadata cache")
877 async def apply_new_global_config(self, new_config: GlobalPipelineConfig):
878 """
879 Apply global configuration and rebuild orchestrator-specific config if needed.
881 This method:
882 1. Updates the global config reference
883 2. Rebuilds any existing orchestrator-specific config to reference the new global config
884 3. Preserves all user-set field values while updating lazy resolution defaults
885 4. Re-initializes components that depend on config (if already initialized)
886 """
887 from openhcs.core.config import GlobalPipelineConfig as GlobalPipelineConfigType
888 if not isinstance(new_config, GlobalPipelineConfigType):
889 raise TypeError(f"Expected GlobalPipelineConfig, got {type(new_config)}")
891 old_global_config = self.global_config
892 self.global_config = new_config
894 # Rebuild orchestrator-specific config if it exists
895 if self.pipeline_config is not None:
896 from openhcs.core.lazy_config import rebuild_lazy_config_with_new_global_reference
897 self.pipeline_config = rebuild_lazy_config_with_new_global_reference(
898 self.pipeline_config,
899 new_config,
900 GlobalPipelineConfigType
901 )
902 logger.info(f"Rebuilt orchestrator-specific config for plate: {self.plate_path}")
904 # Update thread-local storage to reflect the new effective configuration
905 from openhcs.core.config import set_current_global_config
906 effective_config = self.get_effective_config()
907 set_current_global_config(GlobalPipelineConfigType, effective_config)
909 # Re-initialize components that depend on config if orchestrator was already initialized
910 if self.is_initialized():
911 logger.info(f"Re-initializing orchestrator components for plate: {self.plate_path}")
912 try:
913 # Reset initialization state to allow re-initialization
914 self._initialized = False
915 self._state = OrchestratorState.CREATED
917 # Re-initialize with new config
918 self.initialize()
919 logger.info(f"Successfully re-initialized orchestrator for plate: {self.plate_path}")
920 except Exception as e:
921 logger.error(f"Failed to re-initialize orchestrator for plate {self.plate_path}: {e}")
922 self._state = OrchestratorState.INIT_FAILED
923 raise
925 def apply_pipeline_config(self, pipeline_config: PipelineConfig) -> None:
926 """
927 Apply per-orchestrator configuration - affects only this orchestrator.
928 Does not modify global configuration or affect other orchestrators.
929 """
930 if not isinstance(pipeline_config, PipelineConfig):
931 raise TypeError(f"Expected PipelineConfig, got {type(pipeline_config)}")
932 self.pipeline_config = pipeline_config
936 # Update thread-local storage to reflect the new effective configuration
937 # This ensures MaterializationPathConfig uses the updated defaults
938 from openhcs.core.config import set_current_global_config, GlobalPipelineConfig
939 effective_config = self.get_effective_config()
940 set_current_global_config(GlobalPipelineConfig, effective_config)
942 def get_effective_config(self) -> GlobalPipelineConfig:
943 """Get effective configuration for this orchestrator."""
944 if self.pipeline_config:
945 return self.pipeline_config.to_base_config()
946 return self.global_config
948 def clear_pipeline_config(self) -> None:
949 """Clear per-orchestrator configuration."""
950 self.pipeline_config = None
951 logger.info(f"Cleared per-orchestrator config for plate: {self.plate_path}")
953 # Update thread-local storage to reflect global config
954 from openhcs.core.config import set_current_global_config, GlobalPipelineConfig
955 set_current_global_config(GlobalPipelineConfig, self.global_config)
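# Illustrative sketch of the per-orchestrator override precedence (the
# `my_pipeline_config` instance is hypothetical):
#
#   orchestrator.apply_pipeline_config(my_pipeline_config)
#   cfg = orchestrator.get_effective_config()   # resolved from the per-orchestrator config
#   orchestrator.clear_pipeline_config()
#   cfg = orchestrator.get_effective_config()   # falls back to the shared global config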