Coverage for openhcs/core/pipeline/materialization_flag_planner.py: 92.3%

47 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Materialization flag planner for OpenHCS. 

3 

4This module provides the MaterializationFlagPlanner class, which is responsible for 

5determining materialization flags and backend selection for each step in a pipeline. 

6 

7Doctrinal Clauses: 

8- Clause 12 — Absolute Clean Execution 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 65 — No Fallback Logic 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 245 — Path Declaration 

14- Clause 273 — Backend Authorization Doctrine 

15- Clause 276 — Positional Backend Enforcement 

16- Clause 504 — Pipeline Preparation Modifications 

17""" 

18 

19import logging 

20from pathlib import Path 

21from typing import List 

22 

23from openhcs.constants.constants import READ_BACKEND, WRITE_BACKEND, Backend 

24from openhcs.core.context.processing_context import ProcessingContext 

25from openhcs.core.steps.abstract import AbstractStep 

26from openhcs.core.config import MaterializationBackend 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31class MaterializationFlagPlanner: 

32 """Sets read/write backends for pipeline steps.""" 

33 

34 @staticmethod 

35 def prepare_pipeline_flags( 

36 context: ProcessingContext, 

37 pipeline_definition: List[AbstractStep], 

38 plate_path: Path, 

39 pipeline_config 

40 ) -> None: 

41 """Set read/write backends for pipeline steps.""" 

42 

43 # === SETUP === 

44 # Access config directly from pipeline_config (lazy resolution via config_context) 

45 vfs_config = pipeline_config.vfs_config 

46 step_plans = context.step_plans 

47 

48 # === PROCESS EACH STEP === 

49 for i, step in enumerate(pipeline_definition): 

50 step_plan = step_plans[i] # Use step index instead of step_id 

51 

52 # === READ BACKEND SELECTION === 

53 if i == 0: # First step - read from plate format 

54 read_backend = MaterializationFlagPlanner._get_first_step_read_backend(context, vfs_config) 

55 step_plan[READ_BACKEND] = read_backend 

56 

57 # Zarr conversion flag is already set by path planner if needed 

58 else: # Other steps - read from memory (unless already set by chainbreaker logic) 

59 if READ_BACKEND not in step_plan: 59 ↛ 70line 59 didn't jump to line 70 because the condition on line 59 was always true

60 # Check if this step reads from PIPELINE_START (original input) 

61 from openhcs.core.steps.abstract import InputSource 

62 if step.input_source == InputSource.PIPELINE_START: 

63 # Use the same backend as the first step 

64 step_plan[READ_BACKEND] = step_plans[0][READ_BACKEND] 

65 else: 

66 step_plan[READ_BACKEND] = Backend.MEMORY.value 

67 

68 # === WRITE BACKEND SELECTION === 

69 # Check if this step will use zarr (has zarr_config set by compiler) 

70 will_use_zarr = step_plan.get("zarr_config") is not None 

71 

72 if will_use_zarr: 

73 # Steps with zarr_config should write to materialization backend 

74 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

75 step_plan[WRITE_BACKEND] = materialization_backend 

76 elif i == len(pipeline_definition) - 1: # Last step without zarr - write to materialization backend 

77 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

78 step_plan[WRITE_BACKEND] = materialization_backend 

79 else: # Other steps - write to memory 

80 step_plan[WRITE_BACKEND] = Backend.MEMORY.value 

81 

82 # === PER-STEP MATERIALIZATION BACKEND SELECTION === 

83 if "materialized_output_dir" in step_plan: 

84 materialization_backend = MaterializationFlagPlanner._resolve_materialization_backend(context, vfs_config) 

85 step_plan["materialized_backend"] = materialization_backend 

86 

87 @staticmethod 

88 def _get_first_step_read_backend(context: ProcessingContext, vfs_config) -> str: 

89 """Get read backend for first step based on VFS config and metadata-based auto-detection.""" 

90 

91 # Check if user explicitly configured a read backend 

92 if vfs_config.read_backend != Backend.AUTO: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 return vfs_config.read_backend.value 

94 

95 # AUTO mode: Use unified backend detection 

96 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=Backend.DISK.value) 

97 

98 @staticmethod 

99 def _resolve_materialization_backend(context: ProcessingContext, vfs_config) -> str: 

100 """Resolve materialization backend, handling AUTO option.""" 

101 # Check if user explicitly configured a materialization backend 

102 if vfs_config.materialization_backend != MaterializationBackend.AUTO: 102 ↛ 106line 102 didn't jump to line 106 because the condition on line 102 was always true

103 return vfs_config.materialization_backend.value 

104 

105 # AUTO mode: Use unified backend detection 

106 return MaterializationFlagPlanner._detect_backend_for_context(context, fallback_backend=MaterializationBackend.DISK.value) 

107 

108 @staticmethod 

109 def _detect_backend_for_context(context: ProcessingContext, fallback_backend: str) -> str: 

110 """Unified backend detection logic for both read and materialization backends.""" 

111 # Use the microscope handler's get_primary_backend method 

112 # This handles both OpenHCS (metadata-based) and other microscopes (compatibility-based) 

113 return context.microscope_handler.get_primary_backend(context.input_dir, context.filemanager) 

114 

115 

116 

117 

118 

119