Coverage for openhcs/core/pipeline/materialization_flag_planner.py: 93.4%

45 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Materialization flag planner for OpenHCS. 

3 

4This module provides the MaterializationFlagPlanner class, which is responsible for 

5determining materialization flags and backend selection for each step in a pipeline. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 65 — No Fallback Logic 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 245 — Path Declaration 

14- Clause 273 — Backend Authorization Doctrine 

15- Clause 276 — Positional Backend Enforcement 

16- Clause 504 — Pipeline Preparation Modifications 

17""" 

18 

19import logging 

20from pathlib import Path 

21from typing import Any, Dict, List 

22 

23from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend 

24from openhcs.constants import Microscope 

25from openhcs.core.context.processing_context import ProcessingContext 

26from openhcs.core.steps.abstract import AbstractStep 

27from openhcs.core.config import MaterializationBackend 

28 

29logger = logging.getLogger(__name__) 

30 

31 

32class MaterializationFlagPlanner: 

33 """Sets read/write backends for pipeline steps.""" 

34 

35 @staticmethod 

36 def prepare_pipeline_flags( 

37 context: ProcessingContext, 

38 pipeline_definition: List[AbstractStep], 

39 plate_path: Path, 

40 pipeline_config 

41 ) -> None: 

42 """Set read/write backends for pipeline steps.""" 

43 

44 # === SETUP === 

45 # Access config directly from pipeline_config (lazy resolution via config_context) 

46 vfs_config = pipeline_config.vfs_config 

47 step_plans = context.step_plans 

48 

49 # === PROCESS EACH STEP === 

50 for i, step in enumerate(pipeline_definition): 

51 step_plan = step_plans[i] # Use step index instead of step_id 

52 

53 # === READ BACKEND SELECTION === 

54 if i == 0: # First step - read from plate format 

55 read_backend = MaterializationFlagPlanner._get_first_step_read_backend(context, vfs_config) 

56 step_plan[READ_BACKEND] = read_backend 

57 

58 # Zarr conversion flag is already set by path planner if needed 

59 else: # Other steps - read from memory (unless already set by chainbreaker logic) 

60 if READ_BACKEND not in step_plan: 

61 step_plan[READ_BACKEND] = Backend.MEMORY.value 

62 

63 # === WRITE BACKEND SELECTION === 

64 # Check if this step will use zarr (has zarr_config set by compiler) 

65 will_use_zarr = step_plan.get("zarr_config") is not None 

66 

67 if will_use_zarr: 

68 # Steps with zarr_config should write to materialization backend 

69 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

70 step_plan[WRITE_BACKEND] = materialization_backend 

71 elif i == len(pipeline_definition) - 1: # Last step without zarr - write to materialization backend 

72 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

73 step_plan[WRITE_BACKEND] = materialization_backend 

74 else: # Other steps - write to memory 

75 step_plan[WRITE_BACKEND] = Backend.MEMORY.value 

76 

77 # === PER-STEP MATERIALIZATION BACKEND SELECTION === 

78 if "materialized_output_dir" in step_plan: 

79 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

80 step_plan["materialized_backend"] = materialization_backend 

81 

82 @staticmethod 

83 def _get_first_step_read_backend(context: ProcessingContext, vfs_config) -> str: 

84 """Get read backend for first step based on VFS config and metadata-based auto-detection.""" 

85 

86 # Check if user explicitly configured a read backend 

87 if vfs_config.read_backend != Backend.AUTO: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 return vfs_config.read_backend.value 

89 

90 # AUTO mode: Use unified backend detection 

91 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=Backend.DISK.value) 

92 

93 @staticmethod 

94 def _resolve_materialization_backend(context: ProcessingContext, vfs_config) -> str: 

95 """Resolve materialization backend, handling AUTO option.""" 

96 # Check if user explicitly configured a materialization backend 

97 if vfs_config.materialization_backend != MaterializationBackend.AUTO: 97 ↛ 101line 97 didn't jump to line 101 because the condition on line 97 was always true

98 return vfs_config.materialization_backend.value 

99 

100 # AUTO mode: Use unified backend detection 

101 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=MaterializationBackend.DISK.value) 

102 

103 @staticmethod 

104 def _detect_backend_for_context(context: ProcessingContext, fallback_backend: str) -> str: 

105 """Unified backend detection logic for both read and materialization backends.""" 

106 # Use the microscope handler's get_primary_backend method 

107 # This handles both OpenHCS (metadata-based) and other microscopes (compatibility-based) 

108 return context.microscope_handler.get_primary_backend(context.input_dir) 

109 

110 

111 

112 

113 

114