Coverage for openhcs/core/pipeline/materialization_flag_planner.py: 79.3%

40 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Materialization flag planner for OpenHCS. 

3 

4This module provides the MaterializationFlagPlanner class, which is responsible for 

5determining materialization flags and backend selection for each step in a pipeline. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 65 — No Fallback Logic 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 245 — Path Declaration 

14- Clause 273 — Backend Authorization Doctrine 

15- Clause 276 — Positional Backend Enforcement 

16- Clause 504 — Pipeline Preparation Modifications 

17""" 

18 

19import logging 

20from pathlib import Path 

21from typing import Any, Dict, List 

22 

23from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend 

24from openhcs.core.context.processing_context import ProcessingContext 

25from openhcs.core.steps.abstract import AbstractStep 

26from openhcs.core.config import MaterializationBackend 

27 

28logger = logging.getLogger(__name__) 

29 

30 

class MaterializationFlagPlanner:
    """Assign read and write backends to the step plans of a pipeline."""

    @staticmethod
    def prepare_pipeline_flags(
        context: ProcessingContext,
        pipeline_definition: List[AbstractStep],
        plate_path: Path
    ) -> None:
        """Fill in the backend entries of every step plan, in pipeline order.

        For each step the plan stored under ``context.step_plans[step.step_id]``
        is updated in place:

        * ``READ_BACKEND`` -- for the first step, the value returned by
          ``_get_first_step_read_backend``; for any later step, the memory
          backend unless the plan already carries a read backend.
        * ``WRITE_BACKEND`` -- the configured materialization backend when the
          plan has a non-None ``"zarr_config"`` or the step is the last one in
          the pipeline; the memory backend otherwise.
        * ``"materialized_backend"`` -- the configured materialization backend,
          set only when the plan contains ``"materialized_output_dir"``.

        Args:
            context: Processing context supplying the VFS config, the step
                plans and (for the first step) the microscope handler.
            pipeline_definition: Steps in execution order; each one is looked
                up in the step plans by its ``step_id``.
            plate_path: Accepted for interface compatibility; this method does
                not read it.

        Raises:
            RuntimeError: Propagated from ``_get_first_step_read_backend`` when
                no compatible backend is available for the first step.
        """
        vfs_config = context.get_vfs_config()
        plans_by_step_id = context.step_plans

        for position, step in enumerate(pipeline_definition):
            plan = plans_by_step_id[step.step_id]

            # --- Read side ---
            if position == 0:
                # The first step reads the plate in its on-disk format.
                # (Any zarr conversion flag is already set by the path planner.)
                plan[READ_BACKEND] = MaterializationFlagPlanner._get_first_step_read_backend(context)
            elif READ_BACKEND not in plan:
                # Later steps read from memory unless a read backend was
                # already chosen for them (e.g. by chainbreaker logic).
                plan[READ_BACKEND] = Backend.MEMORY.value

            # --- Write side ---
            # A zarr_config on the plan (set by the compiler) marks a step that
            # writes zarr; such steps and the final step both materialize.
            has_zarr_config = plan.get("zarr_config") is not None
            if has_zarr_config or position == len(pipeline_definition) - 1:
                plan[WRITE_BACKEND] = vfs_config.materialization_backend.value
            else:
                plan[WRITE_BACKEND] = Backend.MEMORY.value

            # --- Per-step materialization ---
            if "materialized_output_dir" in plan:
                plan["materialized_backend"] = vfs_config.materialization_backend.value

    @staticmethod
    def _get_first_step_read_backend(context: ProcessingContext) -> str:
        """Choose the read backend for the first pipeline step.

        The microscope handler's ``compatible_backends`` are treated as being
        in priority order. When there is exactly one, its ``value`` is returned
        without checking availability. Otherwise the metadata handler is asked
        which backends are available for ``context.input_dir`` and the first
        compatible backend reported as available wins.

        Args:
            context: Processing context providing ``microscope_handler`` and
                ``input_dir``.

        Returns:
            The ``value`` of the selected backend.

        Raises:
            RuntimeError: If none of the compatible backends is reported as
                available.
        """
        candidates = context.microscope_handler.compatible_backends

        # A sole candidate is used as-is; availability is not probed.
        if len(candidates) == 1:
            return candidates[0].value

        availability = context.microscope_handler.metadata_handler.get_available_backends(context.input_dir)

        # Walk candidates from highest priority down; a backend missing from
        # the availability mapping counts as unavailable.
        for candidate in candidates:
            name = candidate.value
            if availability.get(name, False):
                return name

        # Nothing usable: fail loudly rather than fall back.
        candidate_names = [candidate.value for candidate in candidates]
        raise RuntimeError(f"No compatible backends are actually available. Compatible: {candidate_names}, Available: {availability}")

97 

98 

99 

100 

101 

102