Coverage for openhcs/core/steps/abstract.py: 93.1%

27 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2Abstract Step Interface 

3 

4This module defines the AbstractStep interface, which is the base class for all steps 

5in the OpenHCS pipeline. It provides the core functionality for step execution, 

6validation, and state management. 

7 

8Doctrinal Clauses: 

9- Clause 3 — Declarative Primacy 

10- Clause 12 — Absolute Clean Execution 

11- Clause 21 — Context Immunity 

12- Clause 65 — No Fallback Logic 

13- Clause 66 — Immutability After Construction 

14- Clause 88 — No Inferred Capabilities 

15- Clause 92 — Structural Validation First 

16- Clause 106-A — Declared Memory Types 

17- Clause 244 — Rot Intolerance 

18- Clause 245 — Declarative Enforcement 

19- Clause 246 — Statelessness Mandate 

20- Clause 251 — Declarative Memory Conversion 

21- Clause 503 — Cognitive Load Transfer 

22""" 

23 

24import abc 

25import logging 

26from abc import abstractmethod 

27from pathlib import Path 

28from typing import TYPE_CHECKING, List, Optional, Union 

29 

30from openhcs.constants.constants import VariableComponents, GroupBy, get_default_variable_components, get_default_group_by 

31from openhcs.constants.input_source import InputSource 

32from openhcs.core.config import PathPlanningConfig 

33 

34# Import LazyStepMaterializationConfig for type hints 

35from openhcs.core.config import LazyStepMaterializationConfig 

36from openhcs.core.config import LazyStepWellFilterConfig 

37 

38# Import ContextProvider for automatic step context registration 

39from openhcs.config_framework.lazy_factory import ContextProvider 

40 

41# ProcessingContext is used in type hints 

42if TYPE_CHECKING: 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true

43 from openhcs.core.context.processing_context import ProcessingContext 

44# StepResult is no longer returned by process() 

45 

46 

47#def get_step_id(step: 'AbstractStep') -> str: 

48# """ 

49# Generate a stable step ID from a step object reference. 

50# 

51# This function provides a deterministic way to derive a step's ID 

52# from its object reference, enabling stateless execution where 

53# step objects don't need to store their own IDs as attributes. 

54# 

55# Args: 

56# step: The step object to generate an ID for 

57# 

58# Returns: 

59# A stable string ID based on the step object's identity 

60# 

61# Note: 

62# This uses the same algorithm as step.__init__() to ensure 

63# consistency between compilation and execution phases. 

64# """ 

65# return str(id(step)) 

66 

67 

class AbstractStep(abc.ABC, ContextProvider):
    """
    Abstract base class for all steps in the OpenHCS pipeline.

    Inherits from ContextProvider to enable automatic context injection
    for lazy configuration resolution.

    This class defines the interface that all steps must implement.
    Steps are stateful during pipeline definition and compilation (holding
    attributes like name, input/output memory types, etc.). After compilation,
    these attributes are stripped by the StepAttributeStripper, and the step
    instances become stateless shells. During execution, steps operate solely
    based on the ProcessingContext (which is frozen) and their specific plan
    within context.step_plans.

    Input Source Control:

    The input_source parameter controls where a step reads its input data:

    - InputSource.PREVIOUS_STEP (default): Standard pipeline chaining where the
      step reads from the output directory of the previous step. This maintains
      normal sequential data flow.

    - InputSource.PIPELINE_START: The step reads from the original pipeline
      input directory, bypassing all previous step outputs. This replaces the
      @chain_breaker decorator functionality and is used for position
      generation and quality control.

    Usage Examples:

    Standard processing step (default):
    ```python
    step = FunctionStep(
        func=my_processing_function,
        name="process_images"
        # input_source defaults to InputSource.PREVIOUS_STEP
    )
    ```

    Position generation accessing original images:
    ```python
    step = FunctionStep(
        func=ashlar_compute_tile_positions_gpu,
        name="compute_positions",
        input_source=InputSource.PIPELINE_START
    )
    ```
    """

    _context_type = "step"  # Register as step context provider

    # Attributes like input_memory_type, output_memory_type, etc.,
    # are defined in concrete subclasses (e.g., FunctionStep) as needed.

    def __init__(
        self,
        *,  # Force keyword-only arguments
        name: Optional[str] = None,
        variable_components: List[VariableComponents] = get_default_variable_components(),
        group_by: Optional[GroupBy] = get_default_group_by(),
        input_source: InputSource = InputSource.PREVIOUS_STEP,
        step_well_filter_config: 'LazyStepWellFilterConfig' = LazyStepWellFilterConfig(),
        step_materialization_config: Optional['LazyStepMaterializationConfig'] = None,
        napari_streaming_config: Optional['LazyNapariStreamingConfig'] = None,
        fiji_streaming_config: Optional['LazyFijiStreamingConfig'] = None,
    ) -> None:
        """
        Initialize a step. These attributes are primarily used during the
        pipeline definition and compilation phase. After compilation, step
        instances are stripped of these attributes by StepAttributeStripper
        to enforce statelessness during execution.

        Note that the defaults for ``variable_components``, ``group_by`` and
        ``step_well_filter_config`` are evaluated once, when the class body is
        executed, not on every call.

        Args:
            name: Human-readable name for the step. Defaults to class name.
            variable_components: List of variable components for this step.
                A list argument is shallow-copied so that steps never alias
                the shared default list (or the caller's list).
            group_by: Optional grouping hint for step execution.
            input_source: Input source strategy for this step. Defaults to
                PREVIOUS_STEP for normal pipeline chaining. Use PIPELINE_START
                to access original input data (replaces @chain_breaker
                decorator).
            step_well_filter_config: Well filter configuration for this step,
                stored as given. Steps that do not pass their own value share
                the single default instance created at class definition.
            step_materialization_config: Optional LazyStepMaterializationConfig
                for per-step materialized output. When provided, enables
                saving materialized copy of step output to custom location in
                addition to normal memory backend processing. Use
                LazyStepMaterializationConfig() for safe defaults that prevent
                path collisions.
            napari_streaming_config: Optional LazyNapariStreamingConfig for
                napari streaming. When provided, enables real-time streaming
                to napari viewer.
            fiji_streaming_config: Optional LazyFijiStreamingConfig for Fiji
                streaming. When provided, enables real-time streaming to Fiji
                viewer.
        """
        self.name = name or self.__class__.__name__

        # The default list is created a single time at definition, so storing
        # it by reference would let an in-place edit on one step leak into
        # every other step. Non-list values are stored untouched.
        if isinstance(variable_components, list):
            variable_components = list(variable_components)
        self.variable_components = variable_components

        self.group_by = group_by
        self.input_source = input_source
        self.step_materialization_config = step_materialization_config
        self.napari_streaming_config = napari_streaming_config
        self.fiji_streaming_config = fiji_streaming_config
        self.step_well_filter_config = step_well_filter_config

        # Internal compiler hints - set by path planner during compilation
        self.__input_dir__ = None
        self.__output_dir__ = None

        # No step_id attribute is stored on the instance; process() receives
        # a step_index argument instead.

    @abc.abstractmethod
    def process(self, context: 'ProcessingContext', step_index: int) -> None:
        """
        Process the step with the given context and step index.

        This method must be implemented by all step subclasses.
        During execution, the step instance is stateless. All necessary
        configuration and paths are retrieved from context.step_plans.
        The context itself is frozen and must not be modified.
        Outputs are written to VFS via context.filemanager based on the
        step's plan. This method returns None.

        Args:
            context: The frozen ProcessingContext containing all required
                fields, including step_plans and filemanager.
            step_index: Index identifying this step (presumably its position
                in the pipeline and the key of its plan in
                context.step_plans - TODO confirm against callers).

        Raises:
            NotImplementedError: Always, when the base implementation is
                invoked directly (e.g. through super()).
        """
        # Clause 246 — Statelessness Mandate
        # Clause 21 — Context Immunity (Context is read-only for steps)
        raise NotImplementedError("AbstractStep.process() must be implemented by subclasses")