Coverage for openhcs/core/pipeline/materialization_flag_planner.py: 79.3%
40 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Materialization flag planner for OpenHCS.
4This module provides the MaterializationFlagPlanner class, which is responsible for
5determining materialization flags and backend selection for each step in a pipeline.
7Doctrinal Clauses:
8- Clause 12 — Absolute Clean Execution
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 65 — No Fallback Logic
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 245 — Path Declaration
14- Clause 273 — Backend Authorization Doctrine
15- Clause 276 — Positional Backend Enforcement
16- Clause 504 — Pipeline Preparation Modifications
17"""
19import logging
20from pathlib import Path
21from typing import Any, Dict, List
23from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend
24from openhcs.core.context.processing_context import ProcessingContext
25from openhcs.core.steps.abstract import AbstractStep
26from openhcs.core.config import MaterializationBackend
28logger = logging.getLogger(__name__)
class MaterializationFlagPlanner:
    """Assigns the read and write backends stored in each step's plan."""

    @staticmethod
    def prepare_pipeline_flags(
        context: ProcessingContext,
        pipeline_definition: List[AbstractStep],
        plate_path: Path
    ) -> None:
        """Fill in backend entries of ``context.step_plans`` for every step.

        Each step's plan is looked up by ``step.step_id`` and mutated in
        place; nothing is returned.

        Args:
            context: Supplies the VFS config (``get_vfs_config()``), the
                ``step_plans`` mapping, and the microscope handler used to
                pick the first step's read backend.
            pipeline_definition: Steps in execution order. The position in
                this list decides which backend rule applies.
            plate_path: Accepted for interface compatibility; not read by
                this method.

        Raises:
            RuntimeError: Propagated from ``_get_first_step_read_backend``
                when no compatible backend is available for the first step.
        """
        vfs_config = context.get_vfs_config()
        plans = context.step_plans

        for position, step in enumerate(pipeline_definition):
            plan = plans[step.step_id]

            # --- Read side ---
            # The first step reads from whatever backend the plate offers.
            # Later steps default to memory, but a READ_BACKEND entry that is
            # already present is kept (the original note attributes such
            # entries to chainbreaker logic).
            if position == 0:
                plan[READ_BACKEND] = MaterializationFlagPlanner._get_first_step_read_backend(context)
            elif READ_BACKEND not in plan:
                plan[READ_BACKEND] = Backend.MEMORY.value

            # --- Write side ---
            # A step carrying a non-None "zarr_config" entry, or the last
            # step of the pipeline, writes to the materialization backend;
            # every other step writes to memory.
            has_zarr_config = plan.get("zarr_config") is not None
            if has_zarr_config or position == len(pipeline_definition) - 1:
                plan[WRITE_BACKEND] = vfs_config.materialization_backend.value
            else:
                plan[WRITE_BACKEND] = Backend.MEMORY.value

            # --- Per-step materialization ---
            # Plans that declare a materialized output directory also record
            # which backend that materialization uses.
            if "materialized_output_dir" in plan:
                plan["materialized_backend"] = vfs_config.materialization_backend.value

    @staticmethod
    def _get_first_step_read_backend(context: ProcessingContext) -> str:
        """Pick the read backend for the first pipeline step.

        ``context.microscope_handler.compatible_backends`` is treated as a
        priority-ordered sequence. A single entry is returned directly;
        otherwise the first entry reported as available for
        ``context.input_dir`` wins.

        Returns:
            The ``.value`` of the selected backend enum member.

        Raises:
            RuntimeError: If none of the compatible backends is reported as
                available.
        """
        compatible_backends = context.microscope_handler.compatible_backends

        # Exactly one candidate: use it without probing availability.
        if len(compatible_backends) == 1:
            return compatible_backends[0].value

        # Several (or zero) candidates: ask the metadata handler what exists.
        available_backends = context.microscope_handler.metadata_handler.get_available_backends(context.input_dir)

        # Walk candidates in priority order and take the first available one.
        for candidate in compatible_backends:
            candidate_name = candidate.value
            if available_backends.get(candidate_name, False):
                return candidate_name

        # Nothing usable - fail loudly rather than guessing a backend.
        compatible_names = [b.value for b in compatible_backends]
        raise RuntimeError(f"No compatible backends are actually available. Compatible: {compatible_names}, Available: {available_backends}")