Coverage for openhcs/core/pipeline/compiler.py: 75.5%
468 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Pipeline module for OpenHCS.
4This module provides the core pipeline compilation components for OpenHCS.
5The PipelineCompiler is responsible for preparing step_plans within a ProcessingContext.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 17-B — Path Format Discipline
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 101 — Memory Type Declaration
14- Clause 245 — Path Declaration
15- Clause 273 — Backend Authorization Doctrine
16- Clause 281 — Context-Bound Identifiers
17- Clause 293 — GPU Pre-Declaration Enforcement
18- Clause 295 — GPU Scheduling Affinity
19- Clause 504 — Pipeline Preparation Modifications
20- Clause 524 — Step = Declaration = ID = Runtime Authority
21"""
23import inspect
24import logging
25import dataclasses
26from pathlib import Path
27from typing import Callable, Dict, List, Optional
28from collections import OrderedDict # For special_outputs and special_inputs order (used by PathPlanner)
30from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES, READ_BACKEND, WRITE_BACKEND, Backend
31from openhcs.core.context.processing_context import ProcessingContext
32from openhcs.core.config import MaterializationBackend, PathPlanningConfig, WellFilterMode
33from openhcs.core.pipeline.funcstep_contract_validator import \
34 FuncStepContractValidator
35from openhcs.core.pipeline.materialization_flag_planner import \
36 MaterializationFlagPlanner
37from openhcs.core.pipeline.path_planner import PipelinePathPlanner
38from openhcs.core.pipeline.gpu_memory_validator import \
39 GPUMemoryTypeValidator
40from openhcs.core.steps.abstract import AbstractStep
41from openhcs.core.steps.function_step import FunctionStep # Used for isinstance check
42from dataclasses import dataclass
43logger = logging.getLogger(__name__)
46@dataclass(frozen=True)
47class FunctionReference:
48 """
49 A picklable reference to a function in the registry.
51 This replaces raw function objects in compiled step definitions to ensure
52 picklability while allowing workers to resolve functions from their registry.
53 """
54 function_name: str
55 registry_name: str
56 memory_type: str # The memory type for get_function_by_name() (e.g., "numpy", "pyclesperanto")
57 composite_key: str # The full registry key (e.g., "pyclesperanto:gaussian_blur")
59 def resolve(self) -> Callable:
60 """Resolve this reference to the actual decorated function from registry."""
61 if self.registry_name == "openhcs": 61 ↛ 71line 61 didn't jump to line 71 because the condition on line 61 was always true
62 # For OpenHCS functions, use RegistryService directly with composite key
63 from openhcs.processing.backends.lib_registry.registry_service import RegistryService
64 all_functions = RegistryService.get_all_functions_with_metadata()
65 if self.composite_key in all_functions: 65 ↛ 68line 65 didn't jump to line 68 because the condition on line 65 was always true
66 return all_functions[self.composite_key].func
67 else:
68 raise RuntimeError(f"OpenHCS function {self.composite_key} not found in registry")
69 else:
70 # For external library functions, use the memory type for lookup
71 from openhcs.processing.func_registry import get_function_by_name
72 return get_function_by_name(self.function_name, self.memory_type)
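# Example (sketch): how a worker might rebuild a callable from a pickled
# FunctionReference. The function and key names below are hypothetical and
# assume the corresponding entry exists in the registry.
#
#     ref = FunctionReference(
#         function_name="gaussian_blur",
#         registry_name="pyclesperanto",
#         memory_type="pyclesperanto",
#         composite_key="pyclesperanto:gaussian_blur",
#     )
#     blur = ref.resolve()  # non-"openhcs" registries resolve via get_function_by_name(name, memory_type)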
75def _refresh_function_objects_in_steps(pipeline_definition: List[AbstractStep]) -> None:
76 """
77 Refresh all function objects in pipeline steps to ensure they're picklable.
79 This converts the function objects in each step's func pattern into picklable
80 FunctionReference objects, which workers resolve back to registry functions; this avoids unpicklable closures from registry wrapping.
81 """
82 for step in pipeline_definition:
83 if hasattr(step, 'func') and step.func is not None: 83 ↛ 82line 83 didn't jump to line 82 because the condition on line 83 was always true
84 step.func = _refresh_function_object(step.func)
87def _refresh_function_object(func_value):
88 """Convert function objects to picklable FunctionReference objects.
90 Also filters out functions with enabled=False at compile time.
91 """
92 try:
93 if callable(func_value) and hasattr(func_value, '__module__'):
94 # Single function → FunctionReference
95 return _get_function_reference(func_value)
97 elif isinstance(func_value, tuple) and len(func_value) == 2:
98 # Function with parameters tuple → (FunctionReference, params)
99 func, params = func_value
101 # Check if function is disabled via enabled parameter
102 if isinstance(params, dict) and params.get('enabled', True) is False: 102 ↛ 103line 102 didn't jump to line 103 because the condition on line 102 was never true
103 import logging
104 logger = logging.getLogger(__name__)
105 func_name = getattr(func, '__name__', str(func))
106 logger.info(f"🔧 COMPILE-TIME FILTER: Removing disabled function '{func_name}' from pipeline")
107 return None # Mark for removal
109 if callable(func):
110 func_ref = _refresh_function_object(func)
111 # Remove 'enabled' from params since it's not a real function parameter
112 if isinstance(params, dict) and 'enabled' in params: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true
113 params = {k: v for k, v in params.items() if k != 'enabled'}
114 return (func_ref, params)
116 elif isinstance(func_value, list):
117 # List of functions → List of FunctionReferences (filter out None)
118 refreshed = [_refresh_function_object(item) for item in func_value]
119 return [item for item in refreshed if item is not None]
121 elif isinstance(func_value, dict):
122 # Dict of functions → Dict of FunctionReferences (filter out None values)
123 refreshed = {key: _refresh_function_object(value) for key, value in func_value.items()}
124 return {key: value for key, value in refreshed.items() if value is not None}
126 except Exception as e:
127 import logging
128 logger = logging.getLogger(__name__)
129 logger.warning(f"Failed to create function reference for {func_value}: {e}")
130 # If we can't create a reference, return original (may fail later)
131 return func_value
133 return func_value
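# Example (sketch): how nested func patterns are rewritten, assuming `blur` and
# `sharpen` are registered callables (hypothetical names).
#
#     pattern = [
#         (blur,    {"sigma": 2.0, "enabled": True}),
#         (sharpen, {"enabled": False}),
#     ]
#     _refresh_function_object(pattern)
#     # -> [(FunctionReference(...), {"sigma": 2.0})]
#     # The disabled `sharpen` entry is returned as None and filtered out of the
#     # list, and the non-parameter 'enabled' key is stripped from params.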
136def _get_function_reference(func):
137 """Convert a function to a picklable FunctionReference."""
138 try:
139 from openhcs.processing.backends.lib_registry.registry_service import RegistryService
141 # Get all function metadata to find this function
142 all_functions = RegistryService.get_all_functions_with_metadata()
144 # Find the metadata for this function by matching name and module
145 for composite_key, metadata in all_functions.items(): 145 ↛ 163line 145 didn't jump to line 163 because the loop on line 145 didn't complete
146 if (metadata.func.__name__ == func.__name__ and
147 metadata.func.__module__ == func.__module__):
148 # Create a picklable reference instead of the function object
149 return FunctionReference(
150 function_name=func.__name__,
151 registry_name=metadata.registry.library_name,
152 memory_type=metadata.registry.MEMORY_TYPE,
153 composite_key=composite_key
154 )
156 except Exception as e:
157 import logging
158 logger = logging.getLogger(__name__)
159 logger.warning(f"Failed to create function reference for {func.__name__}: {e}")
161 # If we can't create a reference, this function isn't in the registry
162 # This should not happen for properly registered functions
163 raise RuntimeError(f"Function {func.__name__} not found in registry - cannot create reference")
166def _normalize_step_attributes(pipeline_definition: List[AbstractStep]) -> None:
167 """Backwards compatibility: Set missing step attributes to constructor defaults."""
168 sig = inspect.signature(AbstractStep.__init__)
169 # Include ALL parameters with defaults, even None values
170 defaults = {name: param.default for name, param in sig.parameters.items()
171 if name != 'self' and param.default is not inspect.Parameter.empty}
173 # Add attributes that are set manually in AbstractStep.__init__ but not constructor parameters
174 manual_attributes = {
175 '__input_dir__': None,
176 '__output_dir__': None,
177 }
179 for i, step in enumerate(pipeline_definition):
180 # Set missing constructor parameters
181 for attr_name, default_value in defaults.items():
182 if not hasattr(step, attr_name): 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 setattr(step, attr_name, default_value)
185 # Set missing manual attributes (for backwards compatibility with older serialized steps)
186 for attr_name, default_value in manual_attributes.items():
187 if not hasattr(step, attr_name): 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true
188 setattr(step, attr_name, default_value)
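# Sketch of the defaults-from-signature pattern used above, with a toy class
# standing in for AbstractStep (class and parameter names are hypothetical):
#
#     import inspect
#
#     class Toy:
#         def __init__(self, name, enabled=True, group_by=None):
#             pass
#
#     sig = inspect.signature(Toy.__init__)
#     defaults = {n: p.default for n, p in sig.parameters.items()
#                 if n != 'self' and p.default is not inspect.Parameter.empty}
#     # defaults == {'enabled': True, 'group_by': None}; 'name' has no default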
191class PipelineCompiler:
192 """
193 Compiles a pipeline by populating step plans within a ProcessingContext.
195 This class provides static methods that are called sequentially by the
196 PipelineOrchestrator for each well's ProcessingContext. Each method
197 is responsible for a specific part of the compilation process, such as
198 path planning, special I/O resolution, materialization flag setting,
199 memory contract validation, and GPU resource assignment.
200 """
202 @staticmethod
203 def initialize_step_plans_for_context(
204 context: ProcessingContext,
205 steps_definition: List[AbstractStep],
206 orchestrator,
207 metadata_writer: bool = False,
208 plate_path: Optional[Path] = None
209 # base_input_dir and axis_id parameters removed, will use from context
210 ) -> None:
211 """
212 Initializes step_plans by calling PipelinePathPlanner.prepare_pipeline_paths,
213 which handles primary paths, special I/O path planning and linking, and chainbreaker status.
214 Then, this method supplements the plans with non-I/O FunctionStep-specific attributes.
216 Args:
217 context: ProcessingContext to initialize step plans for
218 steps_definition: List of AbstractStep objects defining the pipeline
219 orchestrator: Orchestrator instance for well filter resolution
220 metadata_writer: If True, this well is responsible for creating OpenHCS metadata files
221 plate_path: Path to plate root for zarr conversion detection
222 """
223 # NOTE: This method is called within config_context() wrapper in compile_pipelines()
224 if context.is_frozen(): 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 raise AttributeError("Cannot initialize step plans in a frozen ProcessingContext.")
227 if not hasattr(context, 'step_plans') or context.step_plans is None: 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true
228 context.step_plans = {} # Ensure step_plans dict exists
230 # === VISUALIZER CONFIG EXTRACTION ===
231 # visualizer_config is a legacy parameter that's passed to visualizers but never used
232 # The actual display configuration comes from the display_config parameter
233 # Set to None for backward compatibility with orchestrator code
234 context.visualizer_config = None
236 # Note: _normalize_step_attributes is now called in compile_pipelines() before filtering
237 # to ensure old pickled steps have the 'enabled' attribute before we check it
239 # Pre-initialize step_plans with basic entries for each step
240 # Use step index as key instead of step_id for multiprocessing compatibility
241 for step_index, step in enumerate(steps_definition):
242 if step_index not in context.step_plans: 242 ↛ 241line 242 didn't jump to line 241 because the condition on line 242 was always true
243 context.step_plans[step_index] = {
244 "step_name": step.name,
245 "step_type": step.__class__.__name__,
246 "axis_id": context.axis_id,
247 }
249 # === INPUT CONVERSION DETECTION ===
250 # Check if first step needs zarr conversion
251 if steps_definition and plate_path: 251 ↛ 290line 251 didn't jump to line 290 because the condition on line 251 was always true
252 first_step = steps_definition[0]
253 # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context)
254 vfs_config = orchestrator.pipeline_config.vfs_config
256 # Only convert if default materialization backend is ZARR
257 wants_zarr_conversion = (
258 vfs_config.materialization_backend == MaterializationBackend.ZARR
259 )
261 if wants_zarr_conversion:
262 # Check if input plate is already zarr format
263 available_backends = context.microscope_handler.get_available_backends(plate_path)
264 already_zarr = Backend.ZARR in available_backends
266 if not already_zarr:
267 # Determine if input uses virtual workspace
268 from openhcs.microscopes.openhcs import OpenHCSMetadataHandler
269 from openhcs.io.metadata_writer import get_subdirectory_name
271 openhcs_metadata_handler = OpenHCSMetadataHandler(context.filemanager)
272 metadata = openhcs_metadata_handler._load_metadata_dict(plate_path)
273 subdirs = metadata["subdirectories"]
275 # Get actual subdirectory from input_dir
276 original_subdir = get_subdirectory_name(context.input_dir, plate_path)
277 uses_virtual_workspace = Backend.VIRTUAL_WORKSPACE.value in subdirs[original_subdir]["available_backends"]
279 zarr_subdir = "zarr" if uses_virtual_workspace else original_subdir
280 conversion_dir = plate_path / zarr_subdir
282 context.step_plans[0]["input_conversion_dir"] = str(conversion_dir)
283 context.step_plans[0]["input_conversion_backend"] = MaterializationBackend.ZARR.value
284 context.step_plans[0]["input_conversion_uses_virtual_workspace"] = uses_virtual_workspace
285 context.step_plans[0]["input_conversion_original_subdir"] = original_subdir
286 logger.debug(f"Input conversion to zarr enabled for first step: {first_step.name}")
288 # The axis_id and base_input_dir are available from the context object.
289 # Path planning now gets config directly from orchestrator.pipeline_config parameter
290 PipelinePathPlanner.prepare_pipeline_paths(
291 context,
292 steps_definition,
293 orchestrator.pipeline_config
294 )
296 # === FUNCTION OBJECT REFRESH ===
297 # CRITICAL FIX: Refresh all function objects to ensure they're picklable
298 # This prevents multiprocessing pickling errors by ensuring clean function objects
299 logger.debug("🔧 FUNCTION REFRESH: Refreshing all function objects for picklability...")
300 _refresh_function_objects_in_steps(steps_definition)
302 # === LAZY CONFIG RESOLUTION ===
303 # Resolve each step's lazy configs with proper nested context
304 # This ensures step-level configs inherit from pipeline-level configs
305 # Architecture: GlobalPipelineConfig -> PipelineConfig -> Step (same as UI)
306 logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs with nested step contexts...")
307 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
308 from openhcs.config_framework.context_manager import config_context
310 # Resolve each step individually with nested context (pipeline -> step)
311 # NOTE: The caller has already set up config_context(orchestrator.pipeline_config)
312 # We add step-level context on top for each step
313 resolved_steps = []
314 for step in steps_definition:
315 with config_context(step): # Step-level context on top of pipeline context
316 resolved_step = resolve_lazy_configurations_for_serialization(step)
317 resolved_steps.append(resolved_step)
318 steps_definition = resolved_steps
320 # Loop to supplement step_plans with non-I/O, non-path attributes
321 # after PipelinePathPlanner has fully populated them with I/O info.
322 for step_index, step in enumerate(steps_definition):
323 if step_index not in context.step_plans: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true
324 logger.error(
325 f"Critical error: Step {step.name} (index: {step_index}) "
326 f"not found in step_plans after path planning phase. Clause 504."
327 )
328 # Create a minimal error plan
329 context.step_plans[step_index] = {
330 "step_name": step.name,
331 "step_type": step.__class__.__name__,
332 "axis_id": context.axis_id, # Use context.axis_id
333 "error": "Missing from path planning phase by PipelinePathPlanner",
334 "create_openhcs_metadata": metadata_writer # Set metadata writer responsibility flag
335 }
336 continue
338 current_plan = context.step_plans[step_index]
340 # Ensure basic metadata (PathPlanner should set most of this)
341 current_plan["step_name"] = step.name
342 current_plan["step_type"] = step.__class__.__name__
343 current_plan["axis_id"] = context.axis_id # Use context.axis_id; PathPlanner should also use context.axis_id
344 current_plan.setdefault("visualize", False) # Ensure visualize key exists
345 current_plan["create_openhcs_metadata"] = metadata_writer # Set metadata writer responsibility flag
347 # The special_outputs and special_inputs are now fully handled by PipelinePathPlanner.
348 # The block for planning special_outputs (lines 134-148 in original) is removed.
349 # Ensure these keys exist as OrderedDicts if PathPlanner doesn't guarantee it
350 # (PathPlanner currently creates them as dicts, OrderedDict might not be strictly needed here anymore)
351 current_plan.setdefault("special_inputs", OrderedDict())
352 current_plan.setdefault("special_outputs", OrderedDict())
353 current_plan.setdefault("chainbreaker", False) # PathPlanner now sets this.
355 # Add step-specific attributes (non-I/O, non-path related)
356 current_plan["variable_components"] = step.variable_components
357 current_plan["group_by"] = step.group_by
358 # Lazy configs were already resolved at the beginning of compilation
359 resolved_step = step
361 # DEBUG: Check what the resolved napari config actually has
362 if hasattr(resolved_step, 'napari_streaming_config') and resolved_step.napari_streaming_config:
363 logger.debug(f"resolved_step.napari_streaming_config.well_filter = {resolved_step.napari_streaming_config.well_filter}")
364 if hasattr(resolved_step, 'step_well_filter_config') and resolved_step.step_well_filter_config: 364 ↛ 366line 364 didn't jump to line 366 because the condition on line 364 was always true
365 logger.debug(f"resolved_step.step_well_filter_config.well_filter = {resolved_step.step_well_filter_config.well_filter}")
366 if hasattr(resolved_step, 'step_materialization_config') and resolved_step.step_materialization_config: 366 ↛ 370line 366 didn't jump to line 370 because the condition on line 366 was always true
367 logger.debug(f"resolved_step.step_materialization_config.sub_dir = '{resolved_step.step_materialization_config.sub_dir}' (type: {type(resolved_step.step_materialization_config).__name__})")
369 # Store WellFilterConfig instances only if they match the current axis
370 from openhcs.core.config import WellFilterConfig, StreamingConfig, WellFilterMode
371 has_streaming = False
372 required_visualizers = getattr(context, 'required_visualizers', [])
374 # CRITICAL FIX: Ensure required_visualizers is always set on context
375 # This prevents AttributeError during execution phase
376 if not hasattr(context, 'required_visualizers'):
377 context.required_visualizers = []
379 # Get step axis filters for this step
380 step_axis_filters = getattr(context, 'step_axis_filters', {}).get(step_index, {})
382 logger.debug(f"Processing step '{step.name}' with attributes: {[attr for attr in dir(resolved_step) if not attr.startswith('_') and 'config' in attr]}")
383 if step.name == "Image Enhancement Processing":
384 logger.debug(f"All attributes for {step.name}: {[attr for attr in dir(resolved_step) if not attr.startswith('_')]}")
386 for attr_name in dir(resolved_step):
387 if not attr_name.startswith('_'):
388 config = getattr(resolved_step, attr_name, None)
389 # Configs are already resolved to base configs at line 277
390 # No need to call to_base_config() again - that's legacy code
392 # Skip None configs
393 if config is None:
394 continue
396 # CRITICAL: Check enabled field first (fail-fast for disabled configs)
397 if hasattr(config, 'enabled') and not config.enabled:
398 continue
400 # Check well filter matching (only for WellFilterConfig instances)
401 include_config = True
402 if isinstance(config, WellFilterConfig) and config.well_filter is not None:
403 config_filter = step_axis_filters.get(attr_name)
404 if config_filter: 404 ↛ 413line 404 didn't jump to line 413 because the condition on line 404 was always true
405 # Apply axis filter logic
406 axis_in_filter = context.axis_id in config_filter['resolved_axis_values']
407 include_config = (
408 axis_in_filter if config_filter['filter_mode'] == WellFilterMode.INCLUDE
409 else not axis_in_filter
410 )
412 # Add config to plan if it passed all checks
413 if include_config:
414 current_plan[attr_name] = config
416 # Add streaming extras if this is a streaming config
417 if isinstance(config, StreamingConfig):
418 # Validate that the visualizer can actually be created
419 try:
420 # Only validate configs that actually have a backend (real streaming configs)
421 if not hasattr(config, 'backend'): 421 ↛ 422line 421 didn't jump to line 422 because the condition on line 421 was never true
422 continue
424 # Test visualizer creation without actually creating it
425 if hasattr(config, 'create_visualizer'): 425 ↛ 448line 425 didn't jump to line 448 because the condition on line 425 was always true
426 # For napari, check if napari is available and environment supports GUI
427 if config.backend.name == 'NAPARI_STREAM':
428 from openhcs.utils.import_utils import optional_import
429 import os
431 # Check if running in headless/CI environment
432 # CPU-only mode does NOT imply headless - you can run CPU mode with napari
433 is_headless = (
434 os.getenv('CI', 'false').lower() == 'true' or
435 os.getenv('OPENHCS_HEADLESS', 'false').lower() == 'true' or
436 os.getenv('DISPLAY') is None
437 )
439 if is_headless: 439 ↛ 443line 439 didn't jump to line 443 because the condition on line 439 was always true
440 logger.info(f"Napari streaming disabled for step '{step.name}': running in headless environment (CI or no DISPLAY)")
441 continue # Skip this streaming config
443 napari = optional_import("napari")
444 if napari is None:
445 logger.warning(f"Napari streaming disabled for step '{step.name}': napari not installed. Install with: pip install 'openhcs[viz]' or pip install napari")
446 continue # Skip this streaming config
448 has_streaming = True
449 # Collect visualizer info
450 visualizer_info = {
451 'backend': config.backend.name,
452 'config': config
453 }
454 if visualizer_info not in required_visualizers: 454 ↛ 386line 454 didn't jump to line 386 because the condition on line 454 was always true
455 required_visualizers.append(visualizer_info)
456 except Exception as e:
457 logger.warning(f"Streaming disabled for step '{step.name}': {e}")
458 continue # Skip this streaming config
460 # Set visualize flag for orchestrator if any streaming is enabled
461 current_plan["visualize"] = has_streaming
462 context.required_visualizers = required_visualizers
464 # Add FunctionStep specific attributes
465 if isinstance(step, FunctionStep): 465 ↛ exitline 465 didn't return from function 'initialize_step_plans_for_context' because the condition on line 465 was always true
467 # 🎯 SEMANTIC COHERENCE FIX: Prevent group_by/variable_components conflict
468 # When variable_components contains the same value as group_by,
469 # set group_by to None to avoid EZStitcher heritage rule violation
470 if (step.variable_components and step.group_by and 470 ↛ 472line 470 didn't jump to line 472 because the condition on line 470 was never true
471 step.group_by in step.variable_components):
472 logger.debug(f"Step {step.name}: Detected group_by='{step.group_by}' in variable_components={step.variable_components}. "
473 f"Setting group_by=None to maintain semantic coherence.")
474 current_plan["group_by"] = None
476 # func attribute is guaranteed in FunctionStep.__init__
477 current_plan["func_name"] = getattr(step.func, '__name__', str(step.func))
479 # Memory type hints from step instance (set in FunctionStep.__init__ if provided)
480 # These are initial hints; FuncStepContractValidator will set final types.
481 if hasattr(step, 'input_memory_type_hint'): # From FunctionStep.__init__ 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 current_plan['input_memory_type_hint'] = step.input_memory_type_hint
483 if hasattr(step, 'output_memory_type_hint'): # From FunctionStep.__init__ 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 current_plan['output_memory_type_hint'] = step.output_memory_type_hint
486 # The resolve_special_input_paths_for_context static method is DELETED (lines 181-238 of original)
487 # as this functionality is now handled by PipelinePathPlanner.prepare_pipeline_paths.
489 # _prepare_materialization_flags is removed as MaterializationFlagPlanner.prepare_pipeline_flags
490 # now modifies context.step_plans in-place and takes context directly.
492 @staticmethod
493 def declare_zarr_stores_for_context(
494 context: ProcessingContext,
495 steps_definition: List[AbstractStep],
496 orchestrator
497 ) -> None:
498 """
499 Declare zarr store creation functions for runtime execution.
501 This method runs after path planning but before materialization flag planning
502 to declare which steps need zarr stores and provide the metadata needed
503 for runtime store creation.
505 Args:
506 context: ProcessingContext for current well
507 steps_definition: List of AbstractStep objects
508 orchestrator: Orchestrator instance for accessing all wells
509 """
510 from openhcs.constants import MULTIPROCESSING_AXIS
512 all_wells = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
514 # Access config directly from orchestrator.pipeline_config (lazy resolution via config_context)
515 vfs_config = orchestrator.pipeline_config.vfs_config
517 for step_index, step in enumerate(steps_definition):
518 step_plan = context.step_plans[step_index]
520 will_use_zarr = (
521 vfs_config.materialization_backend == MaterializationBackend.ZARR and
522 step_index == len(steps_definition) - 1
523 )
525 if will_use_zarr:
526 step_plan["zarr_config"] = {
527 "all_wells": all_wells,
528 "needs_initialization": True
529 }
530 logger.debug(f"Step '{step.name}' will use zarr backend for axis {context.axis_id}")
531 else:
532 step_plan["zarr_config"] = None
534 @staticmethod
535 def plan_materialization_flags_for_context(
536 context: ProcessingContext,
537 steps_definition: List[AbstractStep],
538 orchestrator
539 ) -> None:
540 """
541 Plans and injects materialization flags into context.step_plans
542 by calling MaterializationFlagPlanner.
543 """
544 if context.is_frozen(): 544 ↛ 545line 544 didn't jump to line 545 because the condition on line 544 was never true
545 raise AttributeError("Cannot plan materialization flags in a frozen ProcessingContext.")
546 if not context.step_plans: 546 ↛ 547line 546 didn't jump to line 547 because the condition on line 546 was never true
547 logger.warning("step_plans is empty in context for materialization planning. This may be valid if pipeline is empty.")
548 return
550 # MaterializationFlagPlanner.prepare_pipeline_flags now takes context and pipeline_definition
551 # and modifies context.step_plans in-place.
552 MaterializationFlagPlanner.prepare_pipeline_flags(
553 context,
554 steps_definition,
555 orchestrator.plate_path,
556 orchestrator.pipeline_config
557 )
559 # Post-check (optional, but good for ensuring contracts are met by the planner)
560 for step_index, step in enumerate(steps_definition):
561 if step_index not in context.step_plans: 561 ↛ 563line 561 didn't jump to line 563 because the condition on line 561 was never true
562 # This should not happen if prepare_pipeline_flags guarantees plans for all steps
563 logger.error(f"Step {step.name} (index: {step_index}) missing from step_plans after materialization planning.")
564 continue
566 plan = context.step_plans[step_index]
567 # Check for keys that FunctionStep actually uses during execution
568 required_keys = [READ_BACKEND, WRITE_BACKEND]
569 if not all(k in plan for k in required_keys): 569 ↛ 570line 569 didn't jump to line 570 because the condition on line 569 was never true
570 missing_keys = [k for k in required_keys if k not in plan]
571 logger.error(
572 f"Materialization flag planning incomplete for step {step.name} (index: {step_index}). "
573 f"Missing required keys: {missing_keys} (Clause 273)."
574 )
577 @staticmethod
578 def validate_memory_contracts_for_context(
579 context: ProcessingContext,
580 steps_definition: List[AbstractStep],
581 orchestrator=None
582 ) -> None:
583 """
584 Validates FunctionStep memory contracts, dict patterns, and adds memory type info to context.step_plans.
586 Args:
587 context: ProcessingContext to validate
588 steps_definition: List of AbstractStep objects
589 orchestrator: Optional orchestrator for dict pattern key validation
590 """
591 if context.is_frozen(): 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true
592 raise AttributeError("Cannot validate memory contracts in a frozen ProcessingContext.")
594 # FuncStepContractValidator might need access to input/output_memory_type_hint from plan
595 step_memory_types = FuncStepContractValidator.validate_pipeline(
596 steps=steps_definition,
597 pipeline_context=context, # Pass context so validator can access step plans for memory type overrides
598 orchestrator=orchestrator # Pass orchestrator for dict pattern key validation
599 )
601 for step_index, memory_types in step_memory_types.items():
602 if "input_memory_type" not in memory_types or "output_memory_type" not in memory_types: 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true
603 step_name = context.step_plans[step_index]["step_name"]
604 raise AssertionError(
605 f"Memory type validation must set input/output_memory_type for FunctionStep {step_name} (index: {step_index}) (Clause 101)."
606 )
607 if step_index in context.step_plans: 607 ↛ 610line 607 didn't jump to line 610 because the condition on line 607 was always true
608 context.step_plans[step_index].update(memory_types)
609 else:
610 logger.warning(f"Step index {step_index} found in memory_types but not in context.step_plans. Skipping.")
612 # Apply memory type override: Any step with disk output must use numpy for disk writing
613 for step_index, step in enumerate(steps_definition):
614 if isinstance(step, FunctionStep): 614 ↛ 613line 614 didn't jump to line 613 because the condition on line 614 was always true
615 if step_index in context.step_plans: 615 ↛ 613line 615 didn't jump to line 613 because the condition on line 615 was always true
616 step_plan = context.step_plans[step_index]
617 is_last_step = (step_index == len(steps_definition) - 1)
618 write_backend = step_plan['write_backend']
620 if write_backend == 'disk':
621 logger.debug(f"Step {step.name} has disk output, overriding output_memory_type to numpy")
622 step_plan['output_memory_type'] = 'numpy'
626 @staticmethod
627 def assign_gpu_resources_for_context(
628 context: ProcessingContext
629 ) -> None:
630 """
631 Validates GPU memory types from context.step_plans and assigns GPU device IDs.
632 (Unchanged from previous version)
633 """
634 if context.is_frozen(): 634 ↛ 635line 634 didn't jump to line 635 because the condition on line 634 was never true
635 raise AttributeError("Cannot assign GPU resources in a frozen ProcessingContext.")
637 gpu_assignments = GPUMemoryTypeValidator.validate_step_plans(context.step_plans)
639 for step_index, step_plan_val in context.step_plans.items(): # Renamed step_plan to step_plan_val to avoid conflict
640 is_gpu_step = False
641 input_type = step_plan_val["input_memory_type"]
642 if input_type in VALID_GPU_MEMORY_TYPES: 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true
643 is_gpu_step = True
645 output_type = step_plan_val["output_memory_type"]
646 if output_type in VALID_GPU_MEMORY_TYPES: 646 ↛ 647line 646 didn't jump to line 647 because the condition on line 646 was never true
647 is_gpu_step = True
649 if is_gpu_step: 649 ↛ 652line 649 didn't jump to line 652 because the condition on line 649 was never true
650 # Ensure gpu_assignments has an entry for this step_index if it's a GPU step
651 # And that entry contains a 'gpu_id'
652 step_gpu_assignment = gpu_assignments[step_index]
653 if "gpu_id" not in step_gpu_assignment:
654 step_name = step_plan_val["step_name"]
655 raise AssertionError(
656 f"GPU validation must assign gpu_id for step {step_name} (index: {step_index}) "
657 f"with GPU memory types (Clause 295)."
658 )
660 for step_index, gpu_assignment in gpu_assignments.items(): 660 ↛ 661line 660 didn't jump to line 661 because the loop on line 660 never started
661 if step_index in context.step_plans:
662 context.step_plans[step_index].update(gpu_assignment)
663 else:
664 logger.warning(f"Step index {step_index} found in gpu_assignments but not in context.step_plans. Skipping.")
666 @staticmethod
667 def apply_global_visualizer_override_for_context(
668 context: ProcessingContext,
669 global_enable_visualizer: bool
670 ) -> None:
671 """
672 Applies global visualizer override to all step_plans in the context.
673 (Unchanged from previous version)
674 """
675 if context.is_frozen():
676 raise AttributeError("Cannot apply visualizer override in a frozen ProcessingContext.")
678 if global_enable_visualizer:
679 if not context.step_plans: return # Guard against empty step_plans
680 for step_index, plan in context.step_plans.items():
681 plan["visualize"] = True
682 logger.info(f"Global visualizer override: Step '{plan['step_name']}' marked for visualization.")
684 @staticmethod
685 def resolve_lazy_dataclasses_for_context(context: ProcessingContext, orchestrator) -> None:
686 """
687 Resolve all lazy dataclass instances in step plans to their base configurations.
689 This method should be called after all compilation phases but before context
690 freezing to ensure step plans are safe for pickling in multiprocessing contexts.
692 NOTE: The caller MUST have already set up config_context(orchestrator.pipeline_config)
693 before calling this method. We rely on that context for lazy resolution.
695 Args:
696 context: ProcessingContext to process
697 orchestrator: PipelineOrchestrator (unused - kept for API compatibility)
698 """
699 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
701 # Resolve the entire context recursively to catch all lazy dataclass instances
702 # The caller has already set up config_context(), so lazy resolution happens automatically
703 resolved_context_dict = resolve_lazy_configurations_for_serialization(vars(context))
705 # Update context attributes with resolved values
706 for attr_name, resolved_value in resolved_context_dict.items():
707 if not attr_name.startswith('_'): # Skip private attributes
708 setattr(context, attr_name, resolved_value)
710 @staticmethod
711 def validate_backend_compatibility(orchestrator) -> None:
712 """
713 Validate and auto-correct materialization backend for microscopes with single compatible backend.
715 For microscopes with only one compatible backend (e.g., OMERO → OMERO_LOCAL),
716 automatically corrects the backend if misconfigured. For microscopes with multiple
717 compatible backends, the configured backend must be explicitly compatible.
719 Args:
720 orchestrator: PipelineOrchestrator instance with initialized microscope_handler
721 """
722 from openhcs.core.config import VFSConfig
723 from dataclasses import replace
725 microscope_handler = orchestrator.microscope_handler
726 required_backend = microscope_handler.get_required_backend()
728 if required_backend:
729 # Microscope has single compatible backend - auto-correct if needed
730 vfs_config = orchestrator.pipeline_config.vfs_config or VFSConfig()
732 if vfs_config.materialization_backend != required_backend: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true
733 logger.warning(
734 f"{microscope_handler.microscope_type} requires {required_backend.value} backend. "
735 f"Auto-correcting from {vfs_config.materialization_backend.value}."
736 )
737 new_vfs_config = replace(vfs_config, materialization_backend=required_backend)
738 orchestrator.pipeline_config = replace(
739 orchestrator.pipeline_config,
740 vfs_config=new_vfs_config
741 )
743 @staticmethod
744 def ensure_analysis_materialization(pipeline_definition: List[AbstractStep]) -> None:
745 """
746 Ensure intermediate steps with analysis outputs have step_materialization_config.
748 Analysis results (special outputs) must be saved alongside the images they were
749 created from to maintain metadata coherence. For intermediate steps (not final),
750 this requires materializing the images so analysis has matching image metadata.
752 Final steps don't need auto-creation because their images and analysis both
753 go to main output directory (no metadata mismatch).
755 Called once before per-well compilation loop.
757 Args:
758 pipeline_definition: List of pipeline steps to check
759 """
760 from openhcs.core.config import StepMaterializationConfig
762 for step_index, step in enumerate(pipeline_definition):
763 # Only process FunctionSteps
764 if not isinstance(step, FunctionStep): 764 ↛ 765line 764 didn't jump to line 765 because the condition on line 764 was never true
765 continue
767 # Check if step has special outputs (analysis results)
768 has_special_outputs = hasattr(step.func, '__special_outputs__') and step.func.__special_outputs__
770 # Only auto-create for intermediate steps (not final step)
771 is_intermediate_step = step_index < len(pipeline_definition) - 1
773 # Normalize: no config = disabled config (eliminates dual code path)
774 if not step.step_materialization_config:
775 from openhcs.config_framework.lazy_factory import LazyStepMaterializationConfig
776 step.step_materialization_config = LazyStepMaterializationConfig(enabled=False)
778 # Single code path: just check enabled
779 if has_special_outputs and not step.step_materialization_config.enabled and is_intermediate_step:
780 # Auto-enable materialization to preserve metadata coherence
781 from openhcs.config_framework.lazy_factory import LazyStepMaterializationConfig
782 step.step_materialization_config = LazyStepMaterializationConfig()
784 logger.warning(
785 f"⚠️ Step '{step.name}' (index {step_index}) has analysis outputs but lacks "
786 f"enabled materialization config. Auto-creating with defaults to preserve "
787 f"metadata coherence (intermediate step analysis must be saved with matching images)."
788 )
789 logger.info(
790 f" → Images and analysis will be saved to: "
791 f"{{plate_root}}/{step.step_materialization_config.sub_dir}/"
792 )
794 @staticmethod
795 def compile_pipelines(
796 orchestrator,
797 pipeline_definition: List[AbstractStep],
798 axis_filter: Optional[List[str]] = None,
799 enable_visualizer_override: bool = False
800 ) -> Dict[str, ProcessingContext]:
801 """
802 Compile-all phase: Prepares frozen ProcessingContexts for each axis value.
804 This method iterates through the specified axis values, creates a ProcessingContext
805 for each, and invokes the various phases of the PipelineCompiler to populate
806 the context's step_plans. After all compilation phases for an axis value are complete,
807 its context is frozen. Finally, attributes are stripped from the pipeline_definition,
808 making the step objects stateless for the execution phase.
810 Args:
811 orchestrator: The PipelineOrchestrator instance to use for compilation
812 pipeline_definition: The list of AbstractStep objects defining the pipeline.
813 axis_filter: Optional list of axis values to process. If None, processes all found axis values.
814 enable_visualizer_override: If True, all steps in all compiled contexts
815 will have their 'visualize' flag set to True.
817 Returns:
818 A dictionary mapping axis values to their compiled and frozen ProcessingContexts.
819 The input `pipeline_definition` list (of step objects) is modified in-place
820 to become stateless.
821 """
822 from openhcs.constants.constants import OrchestratorState
823 from openhcs.core.pipeline.step_attribute_stripper import StepAttributeStripper
825 if not orchestrator.is_initialized(): 825 ↛ 826line 825 didn't jump to line 826 because the condition on line 825 was never true
826 raise RuntimeError("PipelineOrchestrator must be explicitly initialized before calling compile_pipelines().")
828 if not pipeline_definition: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true
829 raise ValueError("A valid pipeline definition (List[AbstractStep]) must be provided.")
831 # === BACKWARDS COMPATIBILITY PREPROCESSING ===
832 # Normalize step attributes BEFORE filtering to ensure old pickled steps have 'enabled' attribute
833 logger.debug("🔧 BACKWARDS COMPATIBILITY: Normalizing step attributes before filtering...")
834 _normalize_step_attributes(pipeline_definition)
836 # Filter out disabled steps at compile time (before any compilation phases)
837 original_count = len(pipeline_definition)
838 enabled_steps = []
839 for step in pipeline_definition:
840 if step.enabled: 840 ↛ 843line 840 didn't jump to line 843 because the condition on line 840 was always true
841 enabled_steps.append(step)
842 else:
843 logger.info(f"🔧 COMPILE-TIME FILTER: Removing disabled step '{step.name}' from pipeline")
845 # Update pipeline_definition in-place to contain only enabled steps
846 pipeline_definition.clear()
847 pipeline_definition.extend(enabled_steps)
849 if original_count != len(pipeline_definition): 849 ↛ 850line 849 didn't jump to line 850 because the condition on line 849 was never true
850 logger.info(f"🔧 COMPILE-TIME FILTER: Filtered {original_count - len(pipeline_definition)} disabled step(s), {len(pipeline_definition)} step(s) remaining")
852 if not pipeline_definition: 852 ↛ 853line 852 didn't jump to line 853 because the condition on line 852 was never true
853 logger.warning("All steps were disabled. Pipeline is empty after filtering.")
854 return {
855 'pipeline_definition': pipeline_definition,
856 'compiled_contexts': {}
857 }
859 try:
860 compiled_contexts: Dict[str, ProcessingContext] = {}
861 # Get multiprocessing axis values dynamically from configuration
862 from openhcs.constants import MULTIPROCESSING_AXIS
864 # CRITICAL: Resolve well_filter_config from pipeline_config if present
865 # This allows global-level well filtering to work (e.g., well_filter_config.well_filter = 1)
866 resolved_axis_filter = axis_filter
867 if orchestrator.pipeline_config and hasattr(orchestrator.pipeline_config, 'well_filter_config'): 867 ↛ 885line 867 didn't jump to line 885 because the condition on line 867 was always true
868 well_filter_config = orchestrator.pipeline_config.well_filter_config
869 if well_filter_config and hasattr(well_filter_config, 'well_filter') and well_filter_config.well_filter is not None: 869 ↛ 870line 869 didn't jump to line 870 because the condition on line 869 was never true
870 from openhcs.core.utils import WellFilterProcessor
871 available_wells = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
872 resolved_wells = list(WellFilterProcessor.resolve_compilation_filter(
873 well_filter_config.well_filter,
874 available_wells
875 ))
876 logger.info(f"Resolved well_filter_config.well_filter={well_filter_config.well_filter} to {len(resolved_wells)} wells: {resolved_wells}")
878 # If axis_filter was also provided, intersect them
879 if axis_filter:
880 resolved_axis_filter = [w for w in resolved_wells if w in axis_filter]
881 logger.info(f"Intersected with axis_filter: {len(resolved_axis_filter)} wells remain")
882 else:
883 resolved_axis_filter = resolved_wells
885 axis_values_to_process = orchestrator.get_component_keys(MULTIPROCESSING_AXIS, resolved_axis_filter)
887 if not axis_values_to_process: 887 ↛ 888line 887 didn't jump to line 888 because the condition on line 887 was never true
888 logger.warning("No axis values found to process based on filter.")
889 return {
890 'pipeline_definition': pipeline_definition,
891 'compiled_contexts': {}
892 }
894 logger.info(f"Starting compilation for axis values: {', '.join(axis_values_to_process)}")
896 # === ANALYSIS MATERIALIZATION AUTO-INSTANTIATION ===
897 # Ensure intermediate steps with analysis outputs have step_materialization_config
898 # This preserves metadata coherence (ROIs must match image structure they were created from)
899 # CRITICAL: Must be inside config_context() for lazy resolution of .enabled field
900 from openhcs.config_framework.context_manager import config_context
901 with config_context(orchestrator.pipeline_config):
902 PipelineCompiler.ensure_analysis_materialization(pipeline_definition)
904 # === BACKEND COMPATIBILITY VALIDATION ===
905 # Validate that configured backend is compatible with microscope
906 # For microscopes with only one compatible backend (e.g., OMERO), auto-set it
907 logger.debug("🔧 BACKEND VALIDATION: Validating backend compatibility with microscope...")
908 PipelineCompiler.validate_backend_compatibility(orchestrator)
910 # === GLOBAL AXIS FILTER RESOLUTION ===
911 # Resolve axis filters once for all axis values to ensure step-level inheritance works
912 logger.debug("🔧 LAZY CONFIG RESOLUTION: Resolving lazy configs for axis filter resolution...")
913 from openhcs.config_framework.lazy_factory import resolve_lazy_configurations_for_serialization
914 from openhcs.config_framework.context_manager import config_context
916 # Resolve each step with nested context (same as initialize_step_plans_for_context)
917 # This ensures step-level configs inherit from pipeline-level configs
918 resolved_steps_for_filters = []
919 with config_context(orchestrator.pipeline_config):
920 for step in pipeline_definition:
921 with config_context(step): # Step-level context on top of pipeline context
922 resolved_step = resolve_lazy_configurations_for_serialization(step)
923 resolved_steps_for_filters.append(resolved_step)
925 logger.debug("🎯 AXIS FILTER RESOLUTION: Resolving step axis filters...")
926 # Create a temporary context to store the global axis filters
927 temp_context = orchestrator.create_context("temp")
929 # Use orchestrator context during axis filter resolution
930 # This ensures that lazy config resolution uses the orchestrator context
931 from openhcs.config_framework.context_manager import config_context
932 with config_context(orchestrator.pipeline_config):
933 _resolve_step_axis_filters(resolved_steps_for_filters, temp_context, orchestrator)
934 global_step_axis_filters = getattr(temp_context, 'step_axis_filters', {})
936 # Determine responsible axis value for metadata creation (lexicographically first)
937 responsible_axis_value = sorted(axis_values_to_process)[0] if axis_values_to_process else None
938 logger.debug(f"Designated responsible axis value for metadata creation: {responsible_axis_value}")
940 for axis_id in axis_values_to_process:
941 logger.debug(f"Compiling for axis value: {axis_id}")
942 context = orchestrator.create_context(axis_id)
944 # Copy global axis filters to this context
945 context.step_axis_filters = global_step_axis_filters
947 # Determine if this axis value is responsible for metadata creation
948 is_responsible = (axis_id == responsible_axis_value)
949 logger.debug(f"Axis {axis_id} metadata responsibility: {is_responsible}")
951 # CRITICAL: Wrap all compilation steps in config_context() for lazy resolution
952 from openhcs.config_framework.context_manager import config_context
953 with config_context(orchestrator.pipeline_config):
954 PipelineCompiler.initialize_step_plans_for_context(context, pipeline_definition, orchestrator, metadata_writer=is_responsible, plate_path=orchestrator.plate_path)
955 PipelineCompiler.declare_zarr_stores_for_context(context, pipeline_definition, orchestrator)
956 PipelineCompiler.plan_materialization_flags_for_context(context, pipeline_definition, orchestrator)
957 PipelineCompiler.validate_memory_contracts_for_context(context, pipeline_definition, orchestrator)
958 PipelineCompiler.assign_gpu_resources_for_context(context)
960 if enable_visualizer_override: 960 ↛ 961line 960 didn't jump to line 961 because the condition on line 960 was never true
961 PipelineCompiler.apply_global_visualizer_override_for_context(context, True)
963 # Resolve all lazy dataclasses before freezing to ensure multiprocessing compatibility
964 PipelineCompiler.resolve_lazy_dataclasses_for_context(context, orchestrator)
970 context.freeze()
971 compiled_contexts[axis_id] = context
972 logger.debug(f"Compilation finished for axis value: {axis_id}")
974 # Log path planning summary once per plate
975 if compiled_contexts: 975 ↛ 992line 975 didn't jump to line 992 because the condition on line 975 was always true
976 first_context = next(iter(compiled_contexts.values()))
977 logger.info("📁 PATH PLANNING SUMMARY:")
978 logger.info(f" Main pipeline output: {first_context.output_plate_root}")
980 # Check for materialization steps in first context
981 materialization_steps = []
982 for step_id, plan in first_context.step_plans.items():
983 if 'materialized_output_dir' in plan:
984 step_name = plan.get('step_name', f'step_{step_id}')
985 mat_path = plan['materialized_output_dir']
986 materialization_steps.append((step_name, mat_path))
988 for step_name, mat_path in materialization_steps:
989 logger.info(f" Materialization {step_name}: {mat_path}")
991 # After processing all wells, strip attributes and finalize
992 logger.info("Stripping attributes from pipeline definition steps.")
993 StepAttributeStripper.strip_step_attributes(pipeline_definition, {})
995 orchestrator._state = OrchestratorState.COMPILED
997 # Log worker configuration for execution planning
998 effective_config = orchestrator.get_effective_config()
999 logger.info(f"⚙️ EXECUTION CONFIG: {effective_config.num_workers} workers configured for pipeline execution")
1001 logger.info(f"🏁 COMPILATION COMPLETE: {len(compiled_contexts)} wells compiled successfully")
1003 # Return expected structure with both pipeline_definition and compiled_contexts
1004 return {
1005 'pipeline_definition': pipeline_definition,
1006 'compiled_contexts': compiled_contexts
1007 }
1008 except Exception as e:
1009 orchestrator._state = OrchestratorState.COMPILE_FAILED
1010 logger.error(f"Failed to compile pipelines: {e}")
1011 raise
1015# The monolithic compile() method is removed.
1016# Orchestrator will call the static methods above in sequence.
1017# _strip_step_attributes is also removed as StepAttributeStripper is called by Orchestrator.
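# Example (sketch): driving the compile-all phase for an initialized orchestrator;
# `orchestrator` and `steps` are assumed to exist and be valid for the plate.
#
#     result = PipelineCompiler.compile_pipelines(orchestrator, steps)
#     contexts = result["compiled_contexts"]           # {axis_id: frozen ProcessingContext}
#     stateless_steps = result["pipeline_definition"]  # same list, attributes stripped in-place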
1020def _resolve_step_axis_filters(resolved_steps: List[AbstractStep], context, orchestrator):
1021 """
1022 Resolve axis filters for steps with any WellFilterConfig instances.
1024 This function handles step-level axis filtering by resolving patterns like
1025 "row:A", ["A01", "B02"], or max counts against the available axis values for the plate.
1026 It processes ALL WellFilterConfig instances (materialization, streaming, etc.) uniformly.
1028 Args:
1029 resolved_steps: List of pipeline steps with lazy configs already resolved
1030 context: Processing context for the current axis value
1031 orchestrator: Orchestrator instance with access to available axis values
1032 """
1033 from openhcs.core.utils import WellFilterProcessor
1034 from openhcs.core.config import WellFilterConfig
1036 # Get available axis values from orchestrator using multiprocessing axis
1037 from openhcs.constants import MULTIPROCESSING_AXIS
1038 available_axis_values = orchestrator.get_component_keys(MULTIPROCESSING_AXIS)
1039 if not available_axis_values: 1039 ↛ 1040line 1039 didn't jump to line 1040 because the condition on line 1039 was never true
1040 logger.warning("No available axis values found for axis filter resolution")
1041 return
1043 # Initialize step_axis_filters in context if not present
1044 if not hasattr(context, 'step_axis_filters'): 1044 ↛ 1048line 1044 didn't jump to line 1048 because the condition on line 1044 was always true
1045 context.step_axis_filters = {}
1047 # Process each step for ALL WellFilterConfig instances using the already resolved steps
1048 for step_index, resolved_step in enumerate(resolved_steps):
1049 step_filters = {}
1051 # Check all attributes for WellFilterConfig instances on the RESOLVED step
1052 for attr_name in dir(resolved_step):
1053 if not attr_name.startswith('_'):
1054 config = getattr(resolved_step, attr_name, None)
1055 if config is not None and isinstance(config, WellFilterConfig) and config.well_filter is not None:
1056 try:
1057 # Resolve the axis filter pattern to concrete axis values
1058 resolved_axis_values = WellFilterProcessor.resolve_compilation_filter(
1059 config.well_filter,
1060 available_axis_values
1061 )
1063 # Store resolved axis values for this config
1064 step_filters[attr_name] = {
1065 'resolved_axis_values': sorted(resolved_axis_values),
1066 'filter_mode': config.well_filter_mode,
1067 'original_filter': config.well_filter
1068 }
1070 logger.debug(f"Step '{resolved_step.name}' {attr_name} filter '{config.well_filter}' "
1071 f"resolved to {len(resolved_axis_values)} axis values: {sorted(resolved_axis_values)}")
1075 except Exception as e:
1076 logger.error(f"Failed to resolve axis filter for step '{resolved_step.name}' {attr_name}: {e}")
1077 raise ValueError(f"Invalid axis filter '{config.well_filter}' "
1078 f"for step '{resolved_step.name}' {attr_name}: {e}")
1080 # Store step filters if any were found
1081 if step_filters: 1081 ↛ 1048line 1081 didn't jump to line 1048 because the condition on line 1081 was always true
1082 context.step_axis_filters[step_index] = step_filters
1084 total_filters = sum(len(filters) for filters in context.step_axis_filters.values())
1085 logger.debug(f"Axis filter resolution complete. {len(context.step_axis_filters)} steps have axis filters, {total_filters} total filters.")
1088def _should_process_for_well(axis_id, well_filter_config):
1089 """Unified well filtering logic for all WellFilterConfig systems."""
1090 if well_filter_config.well_filter is None:
1091 return True
1093 well_in_filter = axis_id in well_filter_config.well_filter
1094 return well_in_filter if well_filter_config.well_filter_mode == WellFilterMode.INCLUDE else not well_in_filter
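# Example (sketch): with a config whose well_filter is ["A01", "A02"] (toy values),
#
#     _should_process_for_well("A01", cfg)  # True when cfg.well_filter_mode is WellFilterMode.INCLUDE
#     _should_process_for_well("B05", cfg)  # False under INCLUDE; any other mode inverts the result
#     # A config with well_filter=None always returns True.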