Coverage for openhcs/core/pipeline/compiler.py: 72.7%
395 statements
coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2Pipeline module for OpenHCS.
4This module provides the core pipeline compilation components for OpenHCS.
5The PipelineCompiler is responsible for preparing step_plans within a ProcessingContext.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 17-B — Path Format Discipline
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 101 — Memory Type Declaration
14- Clause 245 — Path Declaration
15- Clause 273 — Backend Authorization Doctrine
16- Clause 281 — Context-Bound Identifiers
17- Clause 293 — GPU Pre-Declaration Enforcement
18- Clause 295 — GPU Scheduling Affinity
19- Clause 504 — Pipeline Preparation Modifications
20- Clause 524 — Step = Declaration = ID = Runtime Authority
21"""
23import inspect
24import logging
25import json
26import dataclasses
27from pathlib import Path
28from typing import Any, Callable, Dict, List, Optional, Union
29from collections import OrderedDict # For special_outputs and special_inputs order (used by PathPlanner)
31from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES, READ_BACKEND, WRITE_BACKEND, Backend
32from openhcs.core.context.processing_context import ProcessingContext
33from openhcs.core.config import MaterializationBackend, PathPlanningConfig, WellFilterMode
34from openhcs.core.pipeline.funcstep_contract_validator import \
35 FuncStepContractValidator
36from openhcs.core.pipeline.materialization_flag_planner import \
37 MaterializationFlagPlanner
38from openhcs.core.pipeline.path_planner import PipelinePathPlanner
39from openhcs.core.pipeline.gpu_memory_validator import \
40 GPUMemoryTypeValidator
41from openhcs.core.steps.abstract import AbstractStep
42from openhcs.core.steps.function_step import FunctionStep # Used for isinstance check
43from dataclasses import dataclass
44from typing import Callable
45logger = logging.getLogger(__name__)
48@dataclass(frozen=True)
49class FunctionReference:
50 """
51 A picklable reference to a function in the registry.
53 This replaces raw function objects in compiled step definitions to ensure
54 picklability while allowing workers to resolve functions from their registry.
55 """
56 function_name: str
57 registry_name: str
58 memory_type: str # The memory type for get_function_by_name() (e.g., "numpy", "pyclesperanto")
59 composite_key: str # The full registry key (e.g., "pyclesperanto:gaussian_blur")
61 def resolve(self) -> Callable:
62 """Resolve this reference to the actual decorated function from registry."""
63 if self.registry_name == "openhcs":  # 63 ↛ 73: line 63 didn't jump to line 73 because the condition on line 63 was always true
64 # For OpenHCS functions, use RegistryService directly with composite key
65 from openhcs.processing.backends.lib_registry.registry_service import RegistryService
66 all_functions = RegistryService.get_all_functions_with_metadata()
67 if self.composite_key in all_functions:  # 67 ↛ 70: line 67 didn't jump to line 70 because the condition on line 67 was always true
68 return all_functions[self.composite_key].func
69 else:
70 raise RuntimeError(f"OpenHCS function {self.composite_key} not found in registry")
71 else:
72 # For external library functions, use the memory type for lookup
73 from openhcs.processing.func_registry import get_function_by_name
74 return get_function_by_name(self.function_name, self.memory_type)
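# Illustrative sketch (hypothetical values, mirroring the docstring example above): a compiled
# step plan might carry FunctionReference(function_name="gaussian_blur",
#     registry_name="pyclesperanto", memory_type="pyclesperanto",
#     composite_key="pyclesperanto:gaussian_blur");
# a worker later calls .resolve() to recover the decorated callable from its own registry
# rather than unpickling the original function object.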
77def _refresh_function_objects_in_steps(pipeline_definition: List[AbstractStep]) -> None:
78 """
79 Refresh all function objects in pipeline steps to ensure they're picklable.
81 This replaces function objects in pipeline steps with picklable FunctionReference objects
82 that workers resolve from their own registries, which avoids unpicklable closures from registry wrapping.
83 """
84 for step in pipeline_definition:
85 if hasattr(step, 'func') and step.func is not None:  # 85 ↛ 84: line 85 didn't jump to line 84 because the condition on line 85 was always true
86 step.func = _refresh_function_object(step.func)
89def _refresh_function_object(func_value):
90 """Convert function objects to picklable FunctionReference objects."""
91 try:
92 if callable(func_value) and hasattr(func_value, '__module__'):
93 # Single function → FunctionReference
94 return _get_function_reference(func_value)
96 elif isinstance(func_value, tuple) and len(func_value) == 2:
97 # Function with parameters tuple → (FunctionReference, params)
98 func, params = func_value
99 if callable(func):
100 func_ref = _refresh_function_object(func)
101 return (func_ref, params)
103 elif isinstance(func_value, list):
104 # List of functions → List of FunctionReferences
105 return [_refresh_function_object(item) for item in func_value]
107 elif isinstance(func_value, dict):
108 # Dict of functions → Dict of FunctionReferences
109 return {key: _refresh_function_object(value) for key, value in func_value.items()}
111 except Exception as e:
112 import logging
113 logger = logging.getLogger(__name__)
114 logger.warning(f"Failed to create function reference for {func_value}: {e}")
115 # If we can't create a reference, return original (may fail later)
116 return func_value
118 return func_value
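# Shape summary for the branches above: a bare callable becomes a FunctionReference;
# a (callable, params) tuple becomes (FunctionReference, params); lists and dicts of
# functions are converted element-wise and value-wise, preserving their structure.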
121def _get_function_reference(func):
122 """Convert a function to a picklable FunctionReference."""
123 try:
124 from openhcs.processing.backends.lib_registry.registry_service import RegistryService
126 # Get all function metadata to find this function
127 all_functions = RegistryService.get_all_functions_with_metadata()
129 # Find the metadata for this function by matching name and module
130 for composite_key, metadata in all_functions.items():  # 130 ↛ 148: line 130 didn't jump to line 148 because the loop on line 130 didn't complete
131 if (metadata.func.__name__ == func.__name__ and
132 metadata.func.__module__ == func.__module__):
133 # Create a picklable reference instead of the function object
134 return FunctionReference(
135 function_name=func.__name__,
136 registry_name=metadata.registry.library_name,
137 memory_type=metadata.registry.MEMORY_TYPE,
138 composite_key=composite_key
139 )
141 except Exception as e:
142 import logging
143 logger = logging.getLogger(__name__)
144 logger.warning(f"Failed to create function reference for {func.__name__}: {e}")
146 # If we can't create a reference, this function isn't in the registry
147 # This should not happen for properly registered functions
148 raise RuntimeError(f"Function {func.__name__} not found in registry - cannot create reference")
151def _normalize_step_attributes(pipeline_definition: List[AbstractStep]) -> None:
152 """Backwards compatibility: Set missing step attributes to constructor defaults."""
153 sig = inspect.signature(AbstractStep.__init__)
154 # Include ALL parameters with defaults, even None values
155 defaults = {name: param.default for name, param in sig.parameters.items()
156 if name != 'self' and param.default is not inspect.Parameter.empty}
158 # Add attributes that are set manually in AbstractStep.__init__ but not constructor parameters
159 manual_attributes = {
160 '__input_dir__': None,
161 '__output_dir__': None,
162 }
164 for i, step in enumerate(pipeline_definition):
165 # Set missing constructor parameters
166 for attr_name, default_value in defaults.items():
167 if not hasattr(step, attr_name):  # 167 ↛ 168: line 167 didn't jump to line 168 because the condition on line 167 was never true
168 setattr(step, attr_name, default_value)
170 # Set missing manual attributes (for backwards compatibility with older serialized steps)
171 for attr_name, default_value in manual_attributes.items():
172 if not hasattr(step, attr_name):  # 172 ↛ 173: line 172 didn't jump to line 173 because the condition on line 172 was never true
173 setattr(step, attr_name, default_value)
176class PipelineCompiler:
177 """
178 Compiles a pipeline by populating step plans within a ProcessingContext.
180 This class provides static methods that are called sequentially by the
181 PipelineOrchestrator for each well's ProcessingContext. Each method
182 is responsible for a specific part of the compilation process, such as
183 path planning, special I/O resolution, materialization flag setting,
184 memory contract validation, and GPU resource assignment.
185 """
187 @staticmethod
188 def initialize_step_plans_for_context(
189 context: ProcessingContext,
190 steps_definition: List[AbstractStep],
191 orchestrator,
192 metadata_writer: bool = False,
193 plate_path: Optional[Path] = None
194 # base_input_dir and axis_id parameters removed; these values are taken from the context
195 ) -> None:
196 """
197 Initializes step_plans by calling PipelinePathPlanner.prepare_pipeline_paths,
198 which handles primary paths, special I/O path planning and linking, and chainbreaker status.
199 Then, this method supplements the plans with non-I/O FunctionStep-specific attributes.
201 Args:
202 context: ProcessingContext to initialize step plans for
203 steps_definition: List of AbstractStep objects defining the pipeline
204 orchestrator: Orchestrator instance for well filter resolution
205 metadata_writer: If True, this well is responsible for creating OpenHCS metadata files
206 plate_path: Path to plate root for zarr conversion detection
207 """
208 # NOTE: This method is called within config_context() wrapper in compile_pipelines()
209 if context.is_frozen():  # 209 ↛ 210: line 209 didn't jump to line 210 because the condition on line 209 was never true
210 raise AttributeError("Cannot initialize step plans in a frozen ProcessingContext.")
212 if not hasattr(context, 'step_plans') or context.step_plans is None:  # 212 ↛ 213: line 212 didn't jump to line 213 because the condition on line 212 was never true
213 context.step_plans = {} # Ensure step_plans dict exists
215 # === VISUALIZER CONFIG EXTRACTION ===
216 # Extract visualizer config from orchestrator.pipeline_config
217 # The caller has already set up config_context(orchestrator.pipeline_config)
218 # so we can just access the field directly - lazy resolution happens automatically
219 context.visualizer_config = orchestrator.pipeline_config.visualizer_config
221 # === BACKWARDS COMPATIBILITY PREPROCESSING ===
222 # Ensure all steps have complete attribute sets based on AbstractStep constructor
223 # This must happen before any other compilation logic to eliminate defensive programming
224 logger.debug("🔧 BACKWARDS COMPATIBILITY: Normalizing step attributes...")
225 _normalize_step_attributes(steps_definition)
229 # Pre-initialize step_plans with basic entries for each step
230 # Use step index as key instead of step_id for multiprocessing compatibility
231 for step_index, step in enumerate(steps_definition):
232 if step_index not in context.step_plans:  # 232 ↛ 231: line 232 didn't jump to line 231 because the condition on line 232 was always true
233 context.step_plans[step_index] = {
234 "step_name": step.name,
235 "step_type": step.__class__.__name__,
236 "axis_id": context.axis_id,
237 }
239 # === INPUT CONVERSION DETECTION ===
240 # Check if first step needs zarr conversion
241 if steps_definition and plate_path:  # 241 ↛ 269: line 241 didn't jump to line 269 because the condition on line 241 was always true
242 first_step = steps_definition[0]
243 # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context)
244 vfs_config = orchestrator.pipeline_config.vfs_config
246 # Only convert if default materialization backend is ZARR
247 wants_zarr_conversion = (
248 vfs_config.materialization_backend == MaterializationBackend.ZARR
249 )
251 if wants_zarr_conversion:
252 # Check if input plate is already zarr format
253 available_backends = context.microscope_handler.get_available_backends(plate_path)
254 already_zarr = Backend.ZARR in available_backends
256 if not already_zarr:  # 256 ↛ 269: line 256 didn't jump to line 269 because the condition on line 256 was always true
257 # Inject input conversion config using existing PathPlanningConfig pattern
258 path_config = orchestrator.pipeline_config.path_planning_config
259 conversion_config = PathPlanningConfig(
260 output_dir_suffix="", # No suffix - write to plate root
261 global_output_folder=plate_path.parent, # Parent of plate
262 sub_dir=path_config.sub_dir # Use same sub_dir (e.g., "images")
263 )
264 context.step_plans[0]["input_conversion_config"] = conversion_config
265 logger.debug(f"Input conversion to zarr enabled for first step: {first_step.name}")
267 # The axis_id and base_input_dir are available from the context object.
268 # Path planning now gets config directly from orchestrator.pipeline_config parameter
269 PipelinePathPlanner.prepare_pipeline_paths(
270 context,
271 steps_definition,
272 orchestrator.pipeline_config
273 )
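# After this call the path planner is expected to have populated each step plan with its
# primary input/output paths plus special_inputs/special_outputs and chainbreaker status;
# the setdefault() calls further below only backstop those keys.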
275 # === FUNCTION OBJECT REFRESH ===
276 # CRITICAL FIX: Refresh all function objects to ensure they're picklable
277 # This prevents multiprocessing pickling errors by ensuring clean function objects
278 logger.debug("🔧 FUNCTION REFRESH: Refreshing all function objects for picklability...")
279 _refresh_function_objects_in_steps(steps_definition)
281 # === LAZY CONFIG RESOLUTION ===
282 # Resolve each step's lazy configs with proper nested context
283 # This ensures step-level configs inherit from pipeline-level configs
284 # Architecture: GlobalPipelineConfig -> PipelineConfig -> Step (same as UI)
285 logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs with nested step contexts...")
286 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
287 from openhcs.config_framework.context_manager import config_context
289 # Resolve each step individually with nested context (pipeline -> step)
290 # NOTE: The caller has already set up config_context(orchestrator.pipeline_config)
291 # We add step-level context on top for each step
292 resolved_steps = []
293 for step in steps_definition:
294 with config_context(step): # Step-level context on top of pipeline context
295 resolved_step = resolve_lazy_configurations_for_serialization(step)
296 resolved_steps.append(resolved_step)
297 steps_definition = resolved_steps
299 # Loop to supplement step_plans with non-I/O, non-path attributes
300 # after PipelinePathPlanner has fully populated them with I/O info.
301 for step_index, step in enumerate(steps_definition):
302 if step_index not in context.step_plans:  # 302 ↛ 303: line 302 didn't jump to line 303 because the condition on line 302 was never true
303 logger.error(
304 f"Critical error: Step {step.name} (index: {step_index}) "
305 f"not found in step_plans after path planning phase. Clause 504."
306 )
307 # Create a minimal error plan
308 context.step_plans[step_index] = {
309 "step_name": step.name,
310 "step_type": step.__class__.__name__,
311 "axis_id": context.axis_id, # Use context.axis_id
312 "error": "Missing from path planning phase by PipelinePathPlanner",
313 "create_openhcs_metadata": metadata_writer # Set metadata writer responsibility flag
314 }
315 continue
317 current_plan = context.step_plans[step_index]
319 # Ensure basic metadata (PathPlanner should set most of this)
320 current_plan["step_name"] = step.name
321 current_plan["step_type"] = step.__class__.__name__
322 current_plan["axis_id"] = context.axis_id # Use context.axis_id; PathPlanner should also use context.axis_id
323 current_plan.setdefault("visualize", False) # Ensure visualize key exists
324 current_plan["create_openhcs_metadata"] = metadata_writer # Set metadata writer responsibility flag
326 # The special_outputs and special_inputs are now fully handled by PipelinePathPlanner.
327 # The block for planning special_outputs (lines 134-148 in original) is removed.
328 # Ensure these keys exist as OrderedDicts if PathPlanner doesn't guarantee it
329 # (PathPlanner currently creates them as dicts, OrderedDict might not be strictly needed here anymore)
330 current_plan.setdefault("special_inputs", OrderedDict())
331 current_plan.setdefault("special_outputs", OrderedDict())
332 current_plan.setdefault("chainbreaker", False) # PathPlanner now sets this.
334 # Add step-specific attributes (non-I/O, non-path related)
335 current_plan["variable_components"] = step.variable_components
336 current_plan["group_by"] = step.group_by
337 # Lazy configs were already resolved at the beginning of compilation
338 resolved_step = step
340 # DEBUG: Check what the resolved napari config actually has
341 if hasattr(resolved_step, 'napari_streaming_config') and resolved_step.napari_streaming_config:  # 341 ↛ 342: line 341 didn't jump to line 342 because the condition on line 341 was never true
342 logger.debug(f"resolved_step.napari_streaming_config.well_filter = {resolved_step.napari_streaming_config.well_filter}")
343 if hasattr(resolved_step, 'step_well_filter_config') and resolved_step.step_well_filter_config:  # 343 ↛ 345: line 343 didn't jump to line 345 because the condition on line 343 was always true
344 logger.debug(f"resolved_step.step_well_filter_config.well_filter = {resolved_step.step_well_filter_config.well_filter}")
345 if hasattr(resolved_step, 'step_materialization_config') and resolved_step.step_materialization_config:
346 logger.debug(f"resolved_step.step_materialization_config.sub_dir = '{resolved_step.step_materialization_config.sub_dir}' (type: {type(resolved_step.step_materialization_config).__name__})")
348 # Store WellFilterConfig instances only if they match the current axis
349 from openhcs.core.config import WellFilterConfig, StreamingConfig, WellFilterMode
350 has_streaming = False
351 required_visualizers = getattr(context, 'required_visualizers', [])
353 # CRITICAL FIX: Ensure required_visualizers is always set on context
354 # This prevents AttributeError during execution phase
355 if not hasattr(context, 'required_visualizers'):
356 context.required_visualizers = []
358 # Get step axis filters for this step
359 step_axis_filters = getattr(context, 'step_axis_filters', {}).get(step_index, {})
361 logger.debug(f"Processing step '{step.name}' with attributes: {[attr for attr in dir(resolved_step) if not attr.startswith('_') and 'config' in attr]}")
362 if step.name == "Image Enhancement Processing":
363 logger.debug(f"All attributes for {step.name}: {[attr for attr in dir(resolved_step) if not attr.startswith('_')]}")
365 for attr_name in dir(resolved_step):
366 if not attr_name.startswith('_'):
367 config = getattr(resolved_step, attr_name, None)
368 # Configs are already resolved to base configs at line 277
369 # No need to call to_base_config() again - that's legacy code
371 # Unified handling: compute inclusion for any WellFilterConfig (StreamingConfig subclasses it)
372 is_streaming = config is not None and isinstance(config, StreamingConfig)
373 is_wellfilter = config is not None and isinstance(config, WellFilterConfig)
375 include_config = False
376 if is_wellfilter:
377 # Check if this config has a well filter and if current axis matches
378 should_include_config = True
379 if config.well_filter is not None:  # 379 ↛ 388: line 379 didn't jump to line 388 because the condition on line 379 was always true
380 config_filter = step_axis_filters.get(attr_name)
381 if config_filter:  # 381 ↛ 388: line 381 didn't jump to line 388 because the condition on line 381 was always true
382 # Apply axis filter logic
383 axis_in_filter = context.axis_id in config_filter['resolved_axis_values']
384 should_include_config = (
385 axis_in_filter if config_filter['filter_mode'] == WellFilterMode.INCLUDE
386 else not axis_in_filter
387 )
388 if should_include_config:
389 include_config = True
391 # Only add the config to the plan if it's included (or not a WellFilterConfig)
392 if include_config or (not is_wellfilter and config is not None):
393 current_plan[attr_name] = config
395 # Streaming extras only apply if the streaming config is included
396 if is_streaming and include_config:  # 396 ↛ 398: line 396 didn't jump to line 398 because the condition on line 396 was never true
397 # Validate that the visualizer can actually be created
398 try:
399 # Only validate configs that actually have a backend (real streaming configs)
400 if not hasattr(config, 'backend'):
401 continue
403 # Test visualizer creation without actually creating it
404 if hasattr(config, 'create_visualizer'):
405 # For napari, check if napari is available and environment supports GUI
406 if config.backend.name == 'NAPARI_STREAM':
407 from openhcs.utils.import_utils import optional_import
408 import os
410 # Check if running in headless/CI environment
411 is_headless = (
412 os.getenv('OPENHCS_CPU_ONLY', 'false').lower() == 'true' or
413 os.getenv('CI', 'false').lower() == 'true' or
414 os.getenv('DISPLAY') is None
415 )
417 if is_headless:
418 logger.info(f"Napari streaming disabled for step '{step.name}': running in headless environment (CI/CPU_ONLY mode)")
419 continue # Skip this streaming config
421 napari = optional_import("napari")
422 if napari is None:
423 logger.warning(f"Napari streaming disabled for step '{step.name}': napari not installed. Install with: pip install 'openhcs[viz]' or pip install napari")
424 continue # Skip this streaming config
426 has_streaming = True
427 # Collect visualizer info
428 visualizer_info = {
429 'backend': config.backend.name,
430 'config': config
431 }
432 if visualizer_info not in required_visualizers:
433 required_visualizers.append(visualizer_info)
434 except Exception as e:
435 logger.warning(f"Streaming disabled for step '{step.name}': {e}")
436 continue # Skip this streaming config
438 # Set visualize flag for orchestrator if any streaming is enabled
439 current_plan["visualize"] = has_streaming
440 context.required_visualizers = required_visualizers
442 # Add FunctionStep specific attributes
443 if isinstance(step, FunctionStep):  # 443 ↛ exit: line 443 didn't return from function 'initialize_step_plans_for_context' because the condition on line 443 was always true
445 # 🎯 SEMANTIC COHERENCE FIX: Prevent group_by/variable_components conflict
446 # When variable_components contains the same value as group_by,
447 # set group_by to None to avoid EZStitcher heritage rule violation
448 if (step.variable_components and step.group_by and  # 448 ↛ 450: line 448 didn't jump to line 450 because the condition on line 448 was never true
449 step.group_by in step.variable_components):
450 logger.debug(f"Step {step.name}: Detected group_by='{step.group_by}' in variable_components={step.variable_components}. "
451 f"Setting group_by=None to maintain semantic coherence.")
452 current_plan["group_by"] = None
454 # func attribute is guaranteed in FunctionStep.__init__
455 current_plan["func_name"] = getattr(step.func, '__name__', str(step.func))
457 # Memory type hints from step instance (set in FunctionStep.__init__ if provided)
458 # These are initial hints; FuncStepContractValidator will set final types.
459 if hasattr(step, 'input_memory_type_hint'):  # From FunctionStep.__init__; 459 ↛ 460: line 459 didn't jump to line 460 because the condition on line 459 was never true
460 current_plan['input_memory_type_hint'] = step.input_memory_type_hint
461 if hasattr(step, 'output_memory_type_hint'):  # From FunctionStep.__init__; 461 ↛ 462: line 461 didn't jump to line 462 because the condition on line 461 was never true
462 current_plan['output_memory_type_hint'] = step.output_memory_type_hint
464 # The resolve_special_input_paths_for_context static method is DELETED (lines 181-238 of original)
465 # as this functionality is now handled by PipelinePathPlanner.prepare_pipeline_paths.
467 # _prepare_materialization_flags is removed as MaterializationFlagPlanner.prepare_pipeline_flags
468 # now modifies context.step_plans in-place and takes context directly.
470 @staticmethod
471 def declare_zarr_stores_for_context(
472 context: ProcessingContext,
473 steps_definition: List[AbstractStep],
474 orchestrator
475 ) -> None:
476 """
477 Declare zarr store creation functions for runtime execution.
479 This method runs after path planning but before materialization flag planning
480 to declare which steps need zarr stores and provide the metadata needed
481 for runtime store creation.
483 Args:
484 context: ProcessingContext for current well
485 steps_definition: List of AbstractStep objects
486 orchestrator: Orchestrator instance for accessing all wells
487 """
488 from openhcs.constants.constants import Backend
489 from openhcs.constants import MULTIPROCESSING_AXIS
491 all_wells = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
493 # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context)
494 vfs_config = orchestrator.pipeline_config.vfs_config
496 for step_index, step in enumerate(steps_definition):
497 step_plan = context.step_plans[step_index]
499 will_use_zarr = (
500 vfs_config.materialization_backend == MaterializationBackend.ZARR and
501 step_index == len(steps_definition) - 1
502 )
504 if will_use_zarr:
505 step_plan["zarr_config"] = {
506 "all_wells": all_wells,
507 "needs_initialization": True
508 }
509 logger.debug(f"Step '{step.name}' will use zarr backend for axis {context.axis_id}")
510 else:
511 step_plan["zarr_config"] = None
513 @staticmethod
514 def plan_materialization_flags_for_context(
515 context: ProcessingContext,
516 steps_definition: List[AbstractStep],
517 orchestrator
518 ) -> None:
519 """
520 Plans and injects materialization flags into context.step_plans
521 by calling MaterializationFlagPlanner.
522 """
523 if context.is_frozen():  # 523 ↛ 524: line 523 didn't jump to line 524 because the condition on line 523 was never true
524 raise AttributeError("Cannot plan materialization flags in a frozen ProcessingContext.")
525 if not context.step_plans:  # 525 ↛ 526: line 525 didn't jump to line 526 because the condition on line 525 was never true
526 logger.warning("step_plans is empty in context for materialization planning. This may be valid if pipeline is empty.")
527 return
529 # MaterializationFlagPlanner.prepare_pipeline_flags now takes context and pipeline_definition
530 # and modifies context.step_plans in-place.
531 MaterializationFlagPlanner.prepare_pipeline_flags(
532 context,
533 steps_definition,
534 orchestrator.plate_path,
535 orchestrator.pipeline_config
536 )
538 # Post-check (optional, but good for ensuring contracts are met by the planner)
539 for step_index, step in enumerate(steps_definition):
540 if step_index not in context.step_plans:  # 540 ↛ 542: line 540 didn't jump to line 542 because the condition on line 540 was never true
541 # This should not happen if prepare_pipeline_flags guarantees plans for all steps
542 logger.error(f"Step {step.name} (index: {step_index}) missing from step_plans after materialization planning.")
543 continue
545 plan = context.step_plans[step_index]
546 # Check for keys that FunctionStep actually uses during execution
547 required_keys = [READ_BACKEND, WRITE_BACKEND]
548 if not all(k in plan for k in required_keys):  # 548 ↛ 549: line 548 didn't jump to line 549 because the condition on line 548 was never true
549 missing_keys = [k for k in required_keys if k not in plan]
550 logger.error(
551 f"Materialization flag planning incomplete for step {step.name} (index: {step_index}). "
552 f"Missing required keys: {missing_keys} (Clause 273)."
553 )
556 @staticmethod
557 def validate_memory_contracts_for_context(
558 context: ProcessingContext,
559 steps_definition: List[AbstractStep],
560 orchestrator=None
561 ) -> None:
562 """
563 Validates FunctionStep memory contracts, dict patterns, and adds memory type info to context.step_plans.
565 Args:
566 context: ProcessingContext to validate
567 steps_definition: List of AbstractStep objects
568 orchestrator: Optional orchestrator for dict pattern key validation
569 """
570 if context.is_frozen():  # 570 ↛ 571: line 570 didn't jump to line 571 because the condition on line 570 was never true
571 raise AttributeError("Cannot validate memory contracts in a frozen ProcessingContext.")
573 # FuncStepContractValidator might need access to input/output_memory_type_hint from plan
574 step_memory_types = FuncStepContractValidator.validate_pipeline(
575 steps=steps_definition,
576 pipeline_context=context, # Pass context so validator can access step plans for memory type overrides
577 orchestrator=orchestrator # Pass orchestrator for dict pattern key validation
578 )
580 for step_index, memory_types in step_memory_types.items():
581 if "input_memory_type" not in memory_types or "output_memory_type" not in memory_types: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true
582 step_name = context.step_plans[step_index]["step_name"]
583 raise AssertionError(
584 f"Memory type validation must set input/output_memory_type for FunctionStep {step_name} (index: {step_index}) (Clause 101)."
585 )
586 if step_index in context.step_plans:  # 586 ↛ 589: line 586 didn't jump to line 589 because the condition on line 586 was always true
587 context.step_plans[step_index].update(memory_types)
588 else:
589 logger.warning(f"Step index {step_index} found in memory_types but not in context.step_plans. Skipping.")
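# memory_types is a per-step dict of the form {"input_memory_type": ..., "output_memory_type": ...}
# produced by FuncStepContractValidator; the assertion above guarantees both keys exist
# before they are merged into the step plan.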
591 # Apply memory type override: Any step with disk output must use numpy for disk writing
592 for step_index, step in enumerate(steps_definition):
593 if isinstance(step, FunctionStep):  # 593 ↛ 592: line 593 didn't jump to line 592 because the condition on line 593 was always true
594 if step_index in context.step_plans:  # 594 ↛ 592: line 594 didn't jump to line 592 because the condition on line 594 was always true
595 step_plan = context.step_plans[step_index]
596 is_last_step = (step_index == len(steps_definition) - 1)
597 write_backend = step_plan['write_backend']
599 if write_backend == 'disk':
600 logger.debug(f"Step {step.name} has disk output, overriding output_memory_type to numpy")
601 step_plan['output_memory_type'] = 'numpy'
605 @staticmethod
606 def assign_gpu_resources_for_context(
607 context: ProcessingContext
608 ) -> None:
609 """
610 Validates GPU memory types from context.step_plans and assigns GPU device IDs.
611 (Unchanged from previous version)
612 """
613 if context.is_frozen():  # 613 ↛ 614: line 613 didn't jump to line 614 because the condition on line 613 was never true
614 raise AttributeError("Cannot assign GPU resources in a frozen ProcessingContext.")
616 gpu_assignments = GPUMemoryTypeValidator.validate_step_plans(context.step_plans)
618 for step_index, step_plan_val in context.step_plans.items(): # Renamed step_plan to step_plan_val to avoid conflict
619 is_gpu_step = False
620 input_type = step_plan_val["input_memory_type"]
621 if input_type in VALID_GPU_MEMORY_TYPES:  # 621 ↛ 622: line 621 didn't jump to line 622 because the condition on line 621 was never true
622 is_gpu_step = True
624 output_type = step_plan_val["output_memory_type"]
625 if output_type in VALID_GPU_MEMORY_TYPES:  # 625 ↛ 626: line 625 didn't jump to line 626 because the condition on line 625 was never true
626 is_gpu_step = True
628 if is_gpu_step:  # 628 ↛ 631: line 628 didn't jump to line 631 because the condition on line 628 was never true
629 # Ensure gpu_assignments has an entry for this step_index if it's a GPU step
630 # And that entry contains a 'gpu_id'
631 step_gpu_assignment = gpu_assignments[step_index]
632 if "gpu_id" not in step_gpu_assignment:
633 step_name = step_plan_val["step_name"]
634 raise AssertionError(
635 f"GPU validation must assign gpu_id for step {step_name} (index: {step_index}) "
636 f"with GPU memory types (Clause 295)."
637 )
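# Merge the validator's assignments into the plans; for GPU steps each assignment is
# expected to carry a "gpu_id" entry (enforced above per Clause 295).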
639 for step_index, gpu_assignment in gpu_assignments.items():  # 639 ↛ 640: line 639 didn't jump to line 640 because the loop on line 639 never started
640 if step_index in context.step_plans:
641 context.step_plans[step_index].update(gpu_assignment)
642 else:
643 logger.warning(f"Step index {step_index} found in gpu_assignments but not in context.step_plans. Skipping.")
645 @staticmethod
646 def apply_global_visualizer_override_for_context(
647 context: ProcessingContext,
648 global_enable_visualizer: bool
649 ) -> None:
650 """
651 Applies global visualizer override to all step_plans in the context.
652 (Unchanged from previous version)
653 """
654 if context.is_frozen():
655 raise AttributeError("Cannot apply visualizer override in a frozen ProcessingContext.")
657 if global_enable_visualizer:
658 if not context.step_plans: return # Guard against empty step_plans
659 for step_index, plan in context.step_plans.items():
660 plan["visualize"] = True
661 logger.info(f"Global visualizer override: Step '{plan['step_name']}' marked for visualization.")
663 @staticmethod
664 def resolve_lazy_dataclasses_for_context(context: ProcessingContext, orchestrator) -> None:
665 """
666 Resolve all lazy dataclass instances in step plans to their base configurations.
668 This method should be called after all compilation phases but before context
669 freezing to ensure step plans are safe for pickling in multiprocessing contexts.
671 NOTE: The caller MUST have already set up config_context(orchestrator.pipeline_config)
672 before calling this method. We rely on that context for lazy resolution.
674 Args:
675 context: ProcessingContext to process
676 orchestrator: PipelineOrchestrator (unused - kept for API compatibility)
677 """
678 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
680 # Resolve the entire context recursively to catch all lazy dataclass instances
681 # The caller has already set up config_context(), so lazy resolution happens automatically
682 resolved_context_dict = resolve_lazy_configurations_for_serialization(vars(context))
684 # Update context attributes with resolved values
685 for attr_name, resolved_value in resolved_context_dict.items():
686 if not attr_name.startswith('_'): # Skip private attributes
687 setattr(context, attr_name, resolved_value)
689 @staticmethod
690 def compile_pipelines(
691 orchestrator,
692 pipeline_definition: List[AbstractStep],
693 axis_filter: Optional[List[str]] = None,
694 enable_visualizer_override: bool = False
695 ) -> Dict[str, ProcessingContext]:
696 """
697 Compile-all phase: Prepares frozen ProcessingContexts for each axis value.
699 This method iterates through the specified axis values, creates a ProcessingContext
700 for each, and invokes the various phases of the PipelineCompiler to populate
701 the context's step_plans. After all compilation phases for an axis value are complete,
702 its context is frozen. Finally, attributes are stripped from the pipeline_definition,
703 making the step objects stateless for the execution phase.
705 Args:
706 orchestrator: The PipelineOrchestrator instance to use for compilation
707 pipeline_definition: The list of AbstractStep objects defining the pipeline.
708 axis_filter: Optional list of axis values to process. If None, processes all found axis values.
709 enable_visualizer_override: If True, all steps in all compiled contexts
710 will have their 'visualize' flag set to True.
712 Returns:
713 A dictionary mapping axis values to their compiled and frozen ProcessingContexts.
714 The input `pipeline_definition` list (of step objects) is modified in-place
715 to become stateless.
716 """
717 from openhcs.constants.constants import VariableComponents, OrchestratorState
718 from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper
720 if not orchestrator.is_initialized():  # 720 ↛ 721: line 720 didn't jump to line 721 because the condition on line 720 was never true
721 raise RuntimeError("PipelineOrchestrator must be explicitly initialized before calling compile_pipelines().")
723 if not pipeline_definition:  # 723 ↛ 724: line 723 didn't jump to line 724 because the condition on line 723 was never true
724 raise ValueError("A valid pipeline definition (List[AbstractStep]) must be provided.")
726 try:
727 compiled_contexts: Dict[str, ProcessingContext] = {}
728 # Get multiprocessing axis values dynamically from configuration
729 from openhcs.constants import MULTIPROCESSING_AXIS
730 axis_values_to_process = orchestrator.get_component_keys(MULTIPROCESSING_AXIS, axis_filter)
732 if not axis_values_to_process:  # 732 ↛ 733: line 732 didn't jump to line 733 because the condition on line 732 was never true
733 logger.warning("No axis values found to process based on filter.")
734 return {
735 'pipeline_definition': pipeline_definition,
736 'compiled_contexts': {}
737 }
739 logger.info(f"Starting compilation for axis values: {', '.join(axis_values_to_process)}")
741 # === GLOBAL AXIS FILTER RESOLUTION ===
742 # Resolve axis filters once for all axis values to ensure step-level inheritance works
743 logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs for axis filter resolution...")
744 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
745 from openhcs.config_framework.context_manager import config_context
747 # Resolve each step with nested context (same as initialize_step_plans_for_context)
748 # This ensures step-level configs inherit from pipeline-level configs
749 resolved_steps_for_filters = []
750 with config_context(orchestrator.pipeline_config):
751 for step in pipeline_definition:
752 with config_context(step): # Step-level context on top of pipeline context
753 resolved_step = resolve_lazy_configurations_for_serialization(step)
754 resolved_steps_for_filters.append(resolved_step)
756 logger.debug("🎯 AXIS FILTER RESOLUTION: Resolving step axis filters...")
757 # Create a temporary context to store the global axis filters
758 temp_context = orchestrator.create_context("temp")
760 # Use orchestrator context during axis filter resolution
761 # This ensures that lazy config resolution uses the orchestrator context
762 from openhcs.config_framework.context_manager import config_context
763 with config_context(orchestrator.pipeline_config):
764 _resolve_step_axis_filters(resolved_steps_for_filters, temp_context, orchestrator)
765 global_step_axis_filters = getattr(temp_context, 'step_axis_filters', {})
767 # Determine responsible axis value for metadata creation (lexicographically first)
768 responsible_axis_value = sorted(axis_values_to_process)[0] if axis_values_to_process else None
769 logger.debug(f"Designated responsible axis value for metadata creation: {responsible_axis_value}")
771 for axis_id in axis_values_to_process:
772 logger.debug(f"Compiling for axis value: {axis_id}")
773 context = orchestrator.create_context(axis_id)
775 # Copy global axis filters to this context
776 context.step_axis_filters = global_step_axis_filters
778 # Determine if this axis value is responsible for metadata creation
779 is_responsible = (axis_id == responsible_axis_value)
780 logger.debug(f"Axis {axis_id} metadata responsibility: {is_responsible}")
782 # CRITICAL: Wrap all compilation steps in config_context() for lazy resolution
783 from openhcs.config_framework.context_manager import config_context
784 with config_context(orchestrator.pipeline_config):
785 PipelineCompiler.initialize_step_plans_for_context(context, pipeline_definition, orchestrator, metadata_writer=is_responsible, plate_path=orchestrator.plate_path)
786 PipelineCompiler.declare_zarr_stores_for_context(context, pipeline_definition, orchestrator)
787 PipelineCompiler.plan_materialization_flags_for_context(context, pipeline_definition, orchestrator)
788 PipelineCompiler.validate_memory_contracts_for_context(context, pipeline_definition, orchestrator)
789 PipelineCompiler.assign_gpu_resources_for_context(context)
791 if enable_visualizer_override:  # 791 ↛ 792: line 791 didn't jump to line 792 because the condition on line 791 was never true
792 PipelineCompiler.apply_global_visualizer_override_for_context(context, True)
794 # Resolve all lazy dataclasses before freezing to ensure multiprocessing compatibility
795 PipelineCompiler.resolve_lazy_dataclasses_for_context(context, orchestrator)
801 context.freeze()
802 compiled_contexts[axis_id] = context
803 logger.debug(f"Compilation finished for axis value: {axis_id}")
805 # Log path planning summary once per plate
806 if compiled_contexts:  # 806 ↛ 823: line 806 didn't jump to line 823 because the condition on line 806 was always true
807 first_context = next(iter(compiled_contexts.values()))
808 logger.info(f"📁 PATH PLANNING SUMMARY:")
809 logger.info(f" Main pipeline output: {first_context.output_plate_root}")
811 # Check for materialization steps in first context
812 materialization_steps = []
813 for step_id, plan in first_context.step_plans.items():
814 if 'materialized_output_dir' in plan:
815 step_name = plan.get('step_name', f'step_{step_id}')
816 mat_path = plan['materialized_output_dir']
817 materialization_steps.append((step_name, mat_path))
819 for step_name, mat_path in materialization_steps:
820 logger.info(f" Materialization {step_name}: {mat_path}")
822 # After processing all wells, strip attributes and finalize
823 logger.info("Stripping attributes from pipeline definition steps.")
824 StepAttributeStripper.strip_step_attributes(pipeline_definition, {})
826 orchestrator._state = OrchestratorState.COMPILED
828 # Log worker configuration for execution planning
829 effective_config = orchestrator.get_effective_config()
830 logger.info(f"⚙️ EXECUTION CONFIG: {effective_config.num_workers} workers configured for pipeline execution")
832 logger.info(f"🏁 COMPILATION COMPLETE: {len(compiled_contexts)} wells compiled successfully")
834 # Return expected structure with both pipeline_definition and compiled_contexts
835 return {
836 'pipeline_definition': pipeline_definition,
837 'compiled_contexts': compiled_contexts
838 }
839 except Exception as e:
840 orchestrator._state = OrchestratorState.COMPILE_FAILED
841 logger.error(f"Failed to compile pipelines: {e}")
842 raise
846# The monolithic compile() method is removed.
847# Orchestrator will call the static methods above in sequence.
848# _strip_step_attributes is also removed as StepAttributeStripper is called by Orchestrator.
851def _resolve_step_axis_filters(resolved_steps: List[AbstractStep], context, orchestrator):
852 """
853 Resolve axis filters for steps with any WellFilterConfig instances.
855 This function handles step-level axis filtering by resolving patterns like
856 "row:A", ["A01", "B02"], or max counts against the available axis values for the plate.
857 It processes ALL WellFilterConfig instances (materialization, streaming, etc.) uniformly.
859 Args:
860 resolved_steps: List of pipeline steps with lazy configs already resolved
861 context: Processing context for the current axis value
862 orchestrator: Orchestrator instance with access to available axis values
863 """
864 from openhcs.core.utils import WellFilterProcessor
865 from openhcs.core.config import WellFilterConfig
867 # Get available axis values from orchestrator using multiprocessing axis
868 from openhcs.constants import MULTIPROCESSING_AXIS
869 available_axis_values = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
870 if not available_axis_values:  # 870 ↛ 871: line 870 didn't jump to line 871 because the condition on line 870 was never true
871 logger.warning("No available axis values found for axis filter resolution")
872 return
874 # Initialize step_axis_filters in context if not present
875 if not hasattr(context, 'step_axis_filters'):  # 875 ↛ 879: line 875 didn't jump to line 879 because the condition on line 875 was always true
876 context.step_axis_filters = {}
878 # Process each step for ALL WellFilterConfig instances using the already resolved steps
879 for step_index, resolved_step in enumerate(resolved_steps):
880 step_filters = {}
882 # Check all attributes for WellFilterConfig instances on the RESOLVED step
883 for attr_name in dir(resolved_step):
884 if not attr_name.startswith('_'):
885 config = getattr(resolved_step, attr_name, None)
886 if config is not None and isinstance(config, WellFilterConfig) and config.well_filter is not None:
887 try:
888 # Resolve the axis filter pattern to concrete axis values
889 resolved_axis_values = WellFilterProcessor.resolve_compilation_filter(
890 config.well_filter,
891 available_axis_values
892 )
894 # Store resolved axis values for this config
895 step_filters[attr_name] = {
896 'resolved_axis_values': sorted(resolved_axis_values),
897 'filter_mode': config.well_filter_mode,
898 'original_filter': config.well_filter
899 }
901 logger.debug(f"Step '{resolved_step.name}' {attr_name} filter '{config.well_filter}' "
902 f"resolved to {len(resolved_axis_values)} axis values: {sorted(resolved_axis_values)}")
906 except Exception as e:
907 logger.error(f"Failed to resolve axis filter for step '{resolved_step.name}' {attr_name}: {e}")
908 raise ValueError(f"Invalid axis filter '{config.well_filter}' "
909 f"for step '{resolved_step.name}' {attr_name}: {e}")
911 # Store step filters if any were found
912 if step_filters:  # 912 ↛ 879: line 912 didn't jump to line 879 because the condition on line 912 was always true
913 context.step_axis_filters[step_index] = step_filters
915 total_filters = sum(len(filters) for filters in context.step_axis_filters.values())
916 logger.debug(f"Axis filter resolution complete. {len(context.step_axis_filters)} steps have axis filters, {total_filters} total filters.")
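# Resulting structure (sketch): context.step_axis_filters maps
#   {step_index: {config_attr_name: {"resolved_axis_values": [...],
#                                    "filter_mode": WellFilterMode.INCLUDE or EXCLUDE,
#                                    "original_filter": <original pattern>}}}
# which initialize_step_plans_for_context() consults per step and per config attribute.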
919def _should_process_for_well(axis_id, well_filter_config):
920 """Unified well filtering logic for all WellFilterConfig systems."""
921 if well_filter_config.well_filter is None:
922 return True
924 well_in_filter = axis_id in well_filter_config.well_filter
925 return well_in_filter if well_filter_config.well_filter_mode == WellFilterMode.INCLUDE else not well_in_filter
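# Example (hypothetical config): with well_filter=["A01", "B02"] and
# well_filter_mode=WellFilterMode.INCLUDE, _should_process_for_well("A01", cfg) returns True
# and _should_process_for_well("C03", cfg) returns False; with EXCLUDE mode the results flip.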