Coverage for openhcs/core/steps/abstract.py: 92.9%

26 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Abstract Step Interface 

3 

4This module defines the AbstractStep interface, which is the base class for all steps 

5in the OpenHCS pipeline. It provides the core functionality for step execution, 

6validation, and state management. 

7 

8Doctrinal Clauses: 

9- Clause 3 — Declarative Primacy 

10- Clause 12 — Absolute Clean Execution 

11- Clause 21 — Context Immunity 

12- Clause 65 — No Fallback Logic 

13- Clause 66 — Immutability After Construction 

14- Clause 88 — No Inferred Capabilities 

15- Clause 92 — Structural Validation First 

16- Clause 106-A — Declared Memory Types 

17- Clause 244 — Rot Intolerance 

18- Clause 245 — Declarative Enforcement 

19- Clause 246 — Statelessness Mandate 

20- Clause 251 — Declarative Memory Conversion 

21- Clause 503 — Cognitive Load Transfer 

22""" 

23 

24import abc 

25import logging 

26from typing import TYPE_CHECKING, List, Optional 

27 

28from openhcs.constants.constants import VariableComponents, GroupBy, get_default_variable_components, get_default_group_by 

29from openhcs.constants.input_source import InputSource 

30 

31# Import LazyStepMaterializationConfig for type hints 

32from openhcs.core.config import LazyStepMaterializationConfig 

33from openhcs.core.config import LazyStepWellFilterConfig 

34 

35# Import ContextProvider for automatic step context registration 

36from openhcs.config_framework.lazy_factory import ContextProvider 

37 

38# ProcessingContext is used in type hints 

39if TYPE_CHECKING: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true

40 from openhcs.core.context.processing_context import ProcessingContext 

41# StepResult is no longer returned by process() 

42 

43 

44#def get_step_id(step: 'AbstractStep') -> str: 

45# """ 

46# Generate a stable step ID from a step object reference. 

47# 

48# This function provides a deterministic way to derive a step's ID 

49# from its object reference, enabling stateless execution where 

50# step objects don't need to store their own IDs as attributes. 

51# 

52# Args: 

53# step: The step object to generate an ID for 

54# 

55# Returns: 

56# A stable string ID based on the step object's identity 

57# 

58# Note: 

59# This uses the same algorithm as step.__init__() to ensure 

60# consistency between compilation and execution phases. 

61# """ 

62# return str(id(step)) 

63 

64 

65class AbstractStep(abc.ABC, ContextProvider): 

66 """ 

67 Abstract base class for all steps in the OpenHCS pipeline. 

68 

69 Inherits from ContextProvider to enable automatic context injection 

70 for lazy configuration resolution. 

71 

72 This class defines the interface that all steps must implement. 

73 Steps are stateful during pipeline definition and compilation (holding attributes 

74 like name, input/output memory types, etc.). After compilation, these attributes 

75 are stripped by the StepAttributeStripper, and the step instances become 

76 stateless shells. During execution, steps operate solely based on the 

77 ProcessingContext (which is frozen) and their specific plan within 

78 context.step_plans. 

79 

80 Input Source Control: 

81 

82 The input_source parameter controls where a step reads its input data: 

83 

84 - InputSource.PREVIOUS_STEP (default): Standard pipeline chaining where the step 

85 reads from the output directory of the previous step. This maintains normal 

86 sequential data flow. 

87 

88 - InputSource.PIPELINE_START: The step reads from the original pipeline input 

89 directory, bypassing all previous step outputs. This replaces the @chain_breaker 

90 decorator functionality and is used for position generation and quality control. 

91 

92 Usage Examples: 

93 

94 Standard processing step (default): 

95 ```python 

96 step = FunctionStep( 

97 func=my_processing_function, 

98 name="process_images" 

99 # input_source defaults to InputSource.PREVIOUS_STEP 

100 ) 

101 ``` 

102 

103 Position generation accessing original images: 

104 ```python 

105 step = FunctionStep( 

106 func=ashlar_compute_tile_positions_gpu, 

107 name="compute_positions", 

108 input_source=InputSource.PIPELINE_START 

109 ) 

110 ``` 

111 

112 """ 

113 _context_type = "step" # Register as step context provider 

114 

115 # Attributes like input_memory_type, output_memory_type, etc., 

116 # are defined in concrete subclasses (e.g., FunctionStep) as needed. 

117 

118 def __init__( 

119 self, 

120 *, # Force keyword-only arguments 

121 name: str = None, 

122 description: str = None, 

123 enabled: bool = True, 

124 variable_components: List[VariableComponents] = get_default_variable_components(), 

125 group_by: Optional[GroupBy] = get_default_group_by(), 

126 input_source: InputSource = InputSource.PREVIOUS_STEP, 

127 step_well_filter_config: 'LazyStepWellFilterConfig' = LazyStepWellFilterConfig(), 

128 step_materialization_config: Optional['LazyStepMaterializationConfig'] = None, 

129 napari_streaming_config: Optional['LazyNapariStreamingConfig'] = None, 

130 fiji_streaming_config: Optional['LazyFijiStreamingConfig'] = None, 

131 ) -> None: 

132 """ 

133 Initialize a step. These attributes are primarily used during the 

134 pipeline definition and compilation phase. After compilation, step 

135 instances are stripped of these attributes by StepAttributeStripper 

136 to enforce statelessness during execution. 

137 

138 Args: 

139 name: Human-readable name for the step. Defaults to class name. 

140 description: Optional description of what this step does. 

141 enabled: Whether this step is enabled. Disabled steps are filtered out 

142 during pipeline compilation. Defaults to True. 

143 variable_components: List of variable components for this step. 

144 group_by: Optional grouping hint for step execution. 

145 input_source: Input source strategy for this step. Defaults to PREVIOUS_STEP 

146 for normal pipeline chaining. Use PIPELINE_START to access 

147 original input data (replaces @chain_breaker decorator). 

148 step_materialization_config: Optional LazyStepMaterializationConfig for per-step materialized output. 

149 When provided, enables saving materialized copy of step output 

150 to custom location in addition to normal memory backend processing. 

151 Use LazyStepMaterializationConfig() for safe defaults that prevent path collisions. 

152 napari_streaming_config: Optional LazyNapariStreamingConfig for napari streaming. 

153 When provided, enables real-time streaming to napari viewer. 

154 fiji_streaming_config: Optional LazyFijiStreamingConfig for Fiji streaming. 

155 When provided, enables real-time streaming to Fiji viewer. 

156 """ 

157 self.name = name or self.__class__.__name__ 

158 self.description = description 

159 self.enabled = enabled 

160 self.variable_components = variable_components 

161 self.group_by = group_by 

162 self.input_source = input_source 

163 self.step_materialization_config = step_materialization_config 

164 self.napari_streaming_config = napari_streaming_config 

165 self.fiji_streaming_config = fiji_streaming_config 

166 self.step_well_filter_config = step_well_filter_config 

167 

168 # Internal compiler hints - set by path planner during compilation 

169 self.__input_dir__ = None 

170 self.__output_dir__ = None 

171 

172 # Generate a stable step_id based on object id at instantiation. 

173 # This ID is used to link the step object to its plan in the context. 

174# self.step_id = str(id(self)) 

175 

176 logger_instance = logging.getLogger(__name__) 

177 #logger_instance.debug(f"Created step '{self.name}' (type: {self.__class__.__name__}) with ID {self.step_id}") 

178 

179 @abc.abstractmethod 

180 def process(self, context: 'ProcessingContext', step_index: int) -> None: 

181 """ 

182 Process the step with the given context and step index. 

183 

184 This method must be implemented by all step subclasses. 

185 During execution, the step instance is stateless. All necessary 

186 configuration and paths are retrieved from context.step_plans[self.step_id]. 

187 The context itself is frozen and must not be modified. 

188 Outputs are written to VFS via context.filemanager based on the steps plan. 

189 This method returns None. 

190 

191 Args: 

192 context: The frozen ProcessingContext containing all required fields, 

193 including step_plans and filemanager. 

194 """ 

195 # Clause 246 — Statelessness Mandate 

196 # Clause 21 — Context Immunity (Context is read-only for steps) 

197 raise NotImplementedError("AbstractStep.process() must be implemented by subclasses")