Coverage for openhcs/core/steps/abstract.py: 92.9%
26 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Abstract Step Interface
4This module defines the AbstractStep interface, which is the base class for all steps
5in the OpenHCS pipeline. It provides the core functionality for step execution,
6validation, and state management.
8Doctrinal Clauses:
9- Clause 3 — Declarative Primacy
10- Clause 12 — Absolute Clean Execution
11- Clause 21 — Context Immunity
12- Clause 65 — No Fallback Logic
13- Clause 66 — Immutability After Construction
14- Clause 88 — No Inferred Capabilities
15- Clause 92 — Structural Validation First
16- Clause 106-A — Declared Memory Types
17- Clause 244 — Rot Intolerance
18- Clause 245 — Declarative Enforcement
19- Clause 246 — Statelessness Mandate
20- Clause 251 — Declarative Memory Conversion
21- Clause 503 — Cognitive Load Transfer
22"""
24import abc
25import logging
26from typing import TYPE_CHECKING, List, Optional
28from openhcs.constants.constants import VariableComponents, GroupBy, get_default_variable_components, get_default_group_by
29from openhcs.constants.input_source import InputSource
31# Import LazyStepMaterializationConfig for type hints
32from openhcs.core.config import LazyStepMaterializationConfig
33from openhcs.core.config import LazyStepWellFilterConfig
35# Import ContextProvider for automatic step context registration
36from openhcs.config_framework.lazy_factory import ContextProvider
38# ProcessingContext is used in type hints
39if TYPE_CHECKING: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true
40 from openhcs.core.context.processing_context import ProcessingContext
41# StepResult is no longer returned by process()
44#def get_step_id(step: 'AbstractStep') -> str:
45# """
46# Generate a stable step ID from a step object reference.
47#
48# This function provides a deterministic way to derive a step's ID
49# from its object reference, enabling stateless execution where
50# step objects don't need to store their own IDs as attributes.
51#
52# Args:
53# step: The step object to generate an ID for
54#
55# Returns:
56# A stable string ID based on the step object's identity
57#
58# Note:
59# This uses the same algorithm as step.__init__() to ensure
60# consistency between compilation and execution phases.
61# """
62# return str(id(step))
class AbstractStep(abc.ABC, ContextProvider):
    """
    Abstract base class for all steps in the OpenHCS pipeline.

    Inherits from ContextProvider to enable automatic context injection
    for lazy configuration resolution.

    This class defines the interface that all steps must implement.
    Steps are stateful during pipeline definition and compilation (holding attributes
    like name, input/output memory types, etc.). After compilation, these attributes
    are stripped by the StepAttributeStripper, and the step instances become
    stateless shells. During execution, steps operate solely based on the
    ProcessingContext (which is frozen) and their specific plan within
    context.step_plans.

    Input Source Control:

    The input_source parameter controls where a step reads its input data:

    - InputSource.PREVIOUS_STEP (default): Standard pipeline chaining where the step
      reads from the output directory of the previous step. This maintains normal
      sequential data flow.

    - InputSource.PIPELINE_START: The step reads from the original pipeline input
      directory, bypassing all previous step outputs. This replaces the @chain_breaker
      decorator functionality and is used for position generation and quality control.

    Usage Examples:

    Standard processing step (default):
    ```python
    step = FunctionStep(
        func=my_processing_function,
        name="process_images"
        # input_source defaults to InputSource.PREVIOUS_STEP
    )
    ```

    Position generation accessing original images:
    ```python
    step = FunctionStep(
        func=ashlar_compute_tile_positions_gpu,
        name="compute_positions",
        input_source=InputSource.PIPELINE_START
    )
    ```

    """

    _context_type = "step"  # Register as step context provider

    # Attributes like input_memory_type, output_memory_type, etc.,
    # are defined in concrete subclasses (e.g., FunctionStep) as needed.

    def __init__(
        self,
        *,  # Force keyword-only arguments
        name: str = None,
        description: str = None,
        enabled: bool = True,
        variable_components: List[VariableComponents] = get_default_variable_components(),
        group_by: Optional[GroupBy] = get_default_group_by(),
        input_source: InputSource = InputSource.PREVIOUS_STEP,
        step_well_filter_config: 'LazyStepWellFilterConfig' = LazyStepWellFilterConfig(),
        step_materialization_config: Optional['LazyStepMaterializationConfig'] = None,
        napari_streaming_config: Optional['LazyNapariStreamingConfig'] = None,
        fiji_streaming_config: Optional['LazyFijiStreamingConfig'] = None,
    ) -> None:
        """
        Initialize a step. These attributes are primarily used during the
        pipeline definition and compilation phase. After compilation, step
        instances are stripped of these attributes by StepAttributeStripper
        to enforce statelessness during execution.

        All arguments are keyword-only.

        Args:
            name: Human-readable name for the step. Defaults to class name
                (also used when a falsy value such as "" is passed).
            description: Optional description of what this step does.
            enabled: Whether this step is enabled. Disabled steps are filtered out
                during pipeline compilation. Defaults to True.
            variable_components: List of variable components for this step.
                A list value is shallow-copied before being stored, so the step
                never aliases the caller's list or the shared signature default.
            group_by: Optional grouping hint for step execution.
            input_source: Input source strategy for this step. Defaults to PREVIOUS_STEP
                for normal pipeline chaining. Use PIPELINE_START to access
                original input data (replaces @chain_breaker decorator).
            step_well_filter_config: LazyStepWellFilterConfig stored on the step.
                Presumably selects which wells the step applies to — confirm
                against LazyStepWellFilterConfig.
            step_materialization_config: Optional LazyStepMaterializationConfig for per-step materialized output.
                When provided, enables saving materialized copy of step output
                to custom location in addition to normal memory backend processing.
                Use LazyStepMaterializationConfig() for safe defaults that prevent path collisions.
            napari_streaming_config: Optional LazyNapariStreamingConfig for napari streaming.
                When provided, enables real-time streaming to napari viewer.
            fiji_streaming_config: Optional LazyFijiStreamingConfig for Fiji streaming.
                When provided, enables real-time streaming to Fiji viewer.
        """
        self.name = name or self.__class__.__name__
        self.description = description
        self.enabled = enabled

        # Default argument values are evaluated once, when the `def` statement runs,
        # so every step created without an explicit variable_components would
        # otherwise hold the very same list object. Store a shallow copy so that
        # mutating one step's list cannot leak into other steps or into the default.
        # Non-list values (e.g. None or a tuple) are stored unchanged.
        if isinstance(variable_components, list):
            variable_components = list(variable_components)
        self.variable_components = variable_components

        self.group_by = group_by
        self.input_source = input_source
        self.step_materialization_config = step_materialization_config
        self.napari_streaming_config = napari_streaming_config
        self.fiji_streaming_config = fiji_streaming_config
        # NOTE(review): the LazyStepWellFilterConfig() default is likewise a single
        # instance shared by all steps that omit this argument; that is only safe
        # if the type is immutable — confirm.
        self.step_well_filter_config = step_well_filter_config

        # Internal compiler hints - set by path planner during compilation
        self.__input_dir__ = None
        self.__output_dir__ = None

        # No step_id is stored on the instance (the former `str(id(self))`
        # assignment is disabled); process() is handed a step_index instead.

    @abc.abstractmethod
    def process(self, context: 'ProcessingContext', step_index: int) -> None:
        """
        Process the step with the given context and step index.

        This method must be implemented by all step subclasses.
        During execution, the step instance is stateless. All necessary
        configuration and paths are retrieved from the step's plan in
        context.step_plans (presumably looked up via step_index, since the
        instance carries no step_id — confirm against the compiler).
        The context itself is frozen and must not be modified.
        Outputs are written to VFS via context.filemanager based on the step's plan.
        This method returns None.

        Args:
            context: The frozen ProcessingContext containing all required fields,
                including step_plans and filemanager.
            step_index: Integer index passed in alongside the context to
                identify this step.

        Raises:
            NotImplementedError: If this base implementation is invoked directly
                (e.g. via super().process(...)).
        """
        # Clause 246 — Statelessness Mandate
        # Clause 21 — Context Immunity (Context is read-only for steps)
        raise NotImplementedError("AbstractStep.process() must be implemented by subclasses")