Coverage for openhcs/core/pipeline/materialization_flag_planner.py: 92.3%
47 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Materialization flag planner for OpenHCS.
4This module provides the MaterializationFlagPlanner class, which is responsible for
5determining materialization flags and backend selection for each step in a pipeline.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 65 — No Fallback Logic
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 245 — Path Declaration
14- Clause 273 — Backend Authorization Doctrine
15- Clause 276 — Positional Backend Enforcement
16- Clause 504 — Pipeline Preparation Modifications
17"""
19import logging
20from pathlib import Path
21from typing import List
23from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend
24from openhcs.core.context.processing_context import ProcessingContext
25from openhcs.core.steps.abstract import AbstractStep
26from openhcs.core.config import MaterializationBackend
28logger = logging.getLogger(__name__)
31class MaterializationFlagPlanner:
32 """Sets read/write backends for pipeline steps."""
34 @staticmethod
35 def prepare_pipeline_flags(
36 context: ProcessingContext,
37 pipeline_definition: List[AbstractStep],
38 plate_path: Path,
39 pipeline_config
40 ) -> None:
41 """Set read/write backends for pipeline steps."""
43 # === SETUP ===
44 # Access config directly from pipeline_config (lazy resolution via config_context)
45 vfs_config = pipeline_config.vfs_config
46 step_plans = context.step_plans
48 # === PROCESS EACH STEP ===
49 for i, step in enumerate(pipeline_definition):
50 step_plan = step_plans[i] # Use step index instead of step_id
52 # === READ BACKEND SELECTION ===
53 if i == 0: # First step - read from plate format
54 read_backend = MaterializationFlagPlanner._get_first_step_read_backend(context, vfs_config)
55 step_plan[READ_BACKEND] = read_backend
57 # Zarr conversion flag is already set by path planner if needed
58 else: # Other steps - read from memory (unless already set by chainbreaker logic)
59 if READ_BACKEND not in step_plan: 59 ↛ 70line 59 didn't jump to line 70 because the condition on line 59 was always true
60 # Check if this step reads from PIPELINE_START (original input)
61 from openhcs.core.steps.abstract import InputSource
62 if step.input_source == InputSource.PIPELINE_START:
63 # Use the same backend as the first step
64 step_plan[READ_BACKEND] = step_plans[0][READ_BACKEND]
65 else:
66 step_plan[READ_BACKEND] = Backend.MEMORY.value
68 # === WRITE BACKEND SELECTION ===
69 # Check if this step will use zarr (has zarr_config set by compiler)
70 will_use_zarr = step_plan.get("zarr_config") is not None
72 if will_use_zarr:
73 # Steps with zarr_config should write to materialization backend
74 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
75 step_plan[WRITE_BACKEND] = materialization_backend
76 elif i == len(pipeline_definition) - 1: # Last step without zarr - write to materialization backend
77 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
78 step_plan[WRITE_BACKEND] = materialization_backend
79 else: # Other steps - write to memory
80 step_plan[WRITE_BACKEND] = Backend.MEMORY.value
82 # === PER-STEP MATERIALIZATION BACKEND SELECTION ===
83 if "materialized_output_dir" in step_plan:
84 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config)
85 step_plan["materialized_backend"] = materialization_backend
87 @staticmethod
88 def _get_first_step_read_backend(context: ProcessingContext, vfs_config) -> str:
89 """Get read backend for first step based on VFS config and metadata-based auto-detection."""
91 # Check if user explicitly configured a read backend
92 if vfs_config.read_backend != Backend.AUTO: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 return vfs_config.read_backend.value
95 # AUTO mode: Use unified backend detection
96 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=Backend.DISK.value)
98 @staticmethod
99 def _resolve_materialization_backend(context: ProcessingContext, vfs_config) -> str:
100 """Resolve materialization backend, handling AUTO option."""
101 # Check if user explicitly configured a materialization backend
102 if vfs_config.materialization_backend != MaterializationBackend.AUTO: 102 ↛ 106line 102 didn't jump to line 106 because the condition on line 102 was always true
103 return vfs_config.materialization_backend.value
105 # AUTO mode: Use unified backend detection
106 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=MaterializationBackend.DISK.value)
108 @staticmethod
109 def _detect_backend_for_context(context: ProcessingContext, fallback_backend: str) -> str:
110 """Unified backend detection logic for both read and materialization backends."""
111 # Use the microscope handler's get_primary_backend method
112 # This handles both OpenHCS (metadata-based) and other microscopes (compatibility-based)
113 return context.microscope_handler.get_primary_backend(context.input_dir, context.filemanager)