Coverage for openhcs/core/pipeline/materialization_flag_planner.py: 93.4%
45 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2Materialization flag planner for OpenHCS.
4This module provides the MaterializationFlagPlanner class, which is responsible for
5determining materialization flags and backend selection for each step in a pipeline.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 65 — No Fallback Logic
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 245 — Path Declaration
14- Clause 273 — Backend Authorization Doctrine
15- Clause 276 — Positional Backend Enforcement
16- Clause 504 — Pipeline Preparation Modifications
17"""
19import logging
20from pathlib import Path
21from typing import Any, Dict, List
23from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend
24from openhcs.constants import Microscope
25from openhcs.core.context.processing_context import ProcessingContext
26from openhcs.core.steps.abstract import AbstractStep
27from openhcs.core.config import MaterializationBackend
29logger = logging.getLogger(__name__)
32class MaterializationFlagPlanner:
33 """Sets read/write backends for pipeline steps."""
35 @staticmethod
36 def prepare_pipeline_flags(
37 context: ProcessingContext,
38 pipeline_definition: List[AbstractStep],
39 plate_path: Path,
40 pipeline_config
41 ) -> None:
42 """Set read/write backends for pipeline steps."""
44 # === SETUP ===
45 # Access config directly from pipeline_config (lazy resolution via config_context)
46 vfs_config = pipeline_config.vfs_config
47 step_plans = context.step_plans
49 # === PROCESS EACH STEP ===
50 for i, step in enumerate(pipeline_definition):
51 step_plan = step_plans[i] # Use step index instead of step_id
53 # === READ BACKEND SELECTION ===
54 if i == 0: # First step - read from plate format
55 read_backend = MaterializationFlagPlanner._get_first_step_read_backend(context, vfs_config)
56 step_plan[READ_BACKEND] = read_backend
58 # Zarr conversion flag is already set by path planner if needed
59 else: # Other steps - read from memory (unless already set by chainbreaker logic)
60 if READ_BACKEND not in step_plan:
61 step_plan[READ_BACKEND] = Backend.MEMORY.value
63 # === WRITE BACKEND SELECTION ===
64 # Check if this step will use zarr (has zarr_config set by compiler)
65 will_use_zarr = step_plan.get("zarr_config") is not None
67 if will_use_zarr:
68 # Steps with zarr_config should write to materialization backend
69 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
70 step_plan[WRITE_BACKEND] = materialization_backend
71 elif i == len(pipeline_definition) - 1: # Last step without zarr - write to materialization backend
72 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
73 step_plan[WRITE_BACKEND] = materialization_backend
74 else: # Other steps - write to memory
75 step_plan[WRITE_BACKEND] = Backend.MEMORY.value
77 # === PER-STEP MATERIALIZATION BACKEND SELECTION ===
78 if "materialized_output_dir" in step_plan:
79 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
80 step_plan["materialized_backend"] = materialization_backend
82 @staticmethod
83 def _get_first_step_read_backend(context: ProcessingContext, vfs_config) -> str:
84 """Get read backend for first step based on VFS config and metadata-based auto-detection."""
86 # Check if user explicitly configured a read backend
87 if vfs_config.read_backend != Backend.AUTO: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true
88 return vfs_config.read_backend.value
90 # AUTO mode: Use unified backend detection
91 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=Backend.DISK.value)
93 @staticmethod
94 def _resolve_materialization_backend(context: ProcessingContext, vfs_config) -> str:
95 """Resolve materialization backend, handling AUTO option."""
96 # Check if user explicitly configured a materialization backend
97 if vfs_config.materialization_backend != MaterializationBackend.AUTO: 97 ↛ 101line 97 didn't jump to line 101 because the condition on line 97 was always true
98 return vfs_config.materialization_backend.value
100 # AUTO mode: Use unified backend detection
101 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=MaterializationBackend.DISK.value)
103 @staticmethod
104 def _detect_backend_for_context(context: ProcessingContext, fallback_backend: str) -> str:
105 """Unified backend detection logic for both read and materialization backends."""
106 # Use the microscope handler's get_primary_backend method
107 # This handles both OpenHCS (metadata-based) and other microscopes (compatibility-based)
108 return context.microscope_handler.get_primary_backend(context.input_dir)