Coverage for openhcs/core/steps/abstract.py: 93.1%
27 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2Abstract Step Interface
4This module defines the AbstractStep interface, which is the base class for all steps
5in the OpenHCS pipeline. It provides the core functionality for step execution,
6validation, and state management.
8Doctrinal Clauses:
9- Clause 3 — Declarative Primacy
10- Clause 12 — Absolute Clean Execution
11- Clause 21 — Context Immunity
12- Clause 65 — No Fallback Logic
13- Clause 66 — Immutability After Construction
14- Clause 88 — No Inferred Capabilities
15- Clause 92 — Structural Validation First
16- Clause 106-A — Declared Memory Types
17- Clause 244 — Rot Intolerance
18- Clause 245 — Declarative Enforcement
19- Clause 246 — Statelessness Mandate
20- Clause 251 — Declarative Memory Conversion
21- Clause 503 — Cognitive Load Transfer
22"""
24import abc
25import logging
26from abc import abstractmethod
27from pathlib import Path
28from typing import TYPE_CHECKING, List, Optional, Union
30from openhcs.constants.constants import VariableComponents, GroupBy, get_default_variable_components, get_default_group_by
31from openhcs.constants.input_source import InputSource
32from openhcs.core.config import PathPlanningConfig
34# Import LazyStepMaterializationConfig for type hints
35from openhcs.core.config import LazyStepMaterializationConfig
36from openhcs.core.config import LazyStepWellFilterConfig
38# Import ContextProvider for automatic step context registration
39from openhcs.config_framework.lazy_factory import ContextProvider
41# ProcessingContext is used in type hints
42if TYPE_CHECKING: 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true
43 from openhcs.core.context.processing_context import ProcessingContext
44# StepResult is no longer returned by process()
47#def get_step_id(step: 'AbstractStep') -> str:
48# """
49# Generate a stable step ID from a step object reference.
50#
51# This function provides a deterministic way to derive a step's ID
52# from its object reference, enabling stateless execution where
53# step objects don't need to store their own IDs as attributes.
54#
55# Args:
56# step: The step object to generate an ID for
57#
58# Returns:
59# A stable string ID based on the step object's identity
60#
61# Note:
62# This uses the same algorithm as step.__init__() to ensure
63# consistency between compilation and execution phases.
64# """
65# return str(id(step))
68class AbstractStep(abc.ABC, ContextProvider):
69 """
70 Abstract base class for all steps in the OpenHCS pipeline.
72 Inherits from ContextProvider to enable automatic context injection
73 for lazy configuration resolution.
75 This class defines the interface that all steps must implement.
76 Steps are stateful during pipeline definition and compilation (holding attributes
77 like name, input/output memory types, etc.). After compilation, these attributes
78 are stripped by the StepAttributeStripper, and the step instances become
79 stateless shells. During execution, steps operate solely based on the
80 ProcessingContext (which is frozen) and their specific plan within
81 context.step_plans.
83 Input Source Control:
85 The input_source parameter controls where a step reads its input data:
87 - InputSource.PREVIOUS_STEP (default): Standard pipeline chaining where the step
88 reads from the output directory of the previous step. This maintains normal
89 sequential data flow.
91 - InputSource.PIPELINE_START: The step reads from the original pipeline input
92 directory, bypassing all previous step outputs. This replaces the @chain_breaker
93 decorator functionality and is used for position generation and quality control.
95 Usage Examples:
97 Standard processing step (default):
98 ```python
99 step = FunctionStep(
100 func=my_processing_function,
101 name="process_images"
102 # input_source defaults to InputSource.PREVIOUS_STEP
103 )
104 ```
106 Position generation accessing original images:
107 ```python
108 step = FunctionStep(
109 func=ashlar_compute_tile_positions_gpu,
110 name="compute_positions",
111 input_source=InputSource.PIPELINE_START
112 )
113 ```
115 """
116 _context_type = "step" # Register as step context provider
118 # Attributes like input_memory_type, output_memory_type, etc.,
119 # are defined in concrete subclasses (e.g., FunctionStep) as needed.
121 def __init__(
122 self,
123 *, # Force keyword-only arguments
124 name: str = None,
125 variable_components: List[VariableComponents] = get_default_variable_components(),
126 group_by: Optional[GroupBy] = get_default_group_by(),
127 input_source: InputSource = InputSource.PREVIOUS_STEP,
128 step_well_filter_config: 'LazyStepWellFilterConfig' = LazyStepWellFilterConfig(),
129 step_materialization_config: Optional['LazyStepMaterializationConfig'] = None,
130 napari_streaming_config: Optional['LazyNapariStreamingConfig'] = None,
131 fiji_streaming_config: Optional['LazyFijiStreamingConfig'] = None,
132 ) -> None:
133 """
134 Initialize a step. These attributes are primarily used during the
135 pipeline definition and compilation phase. After compilation, step
136 instances are stripped of these attributes by StepAttributeStripper
137 to enforce statelessness during execution.
139 Args:
140 name: Human-readable name for the step. Defaults to class name.
141 variable_components: List of variable components for this step.
142 group_by: Optional grouping hint for step execution.
143 input_source: Input source strategy for this step. Defaults to PREVIOUS_STEP
144 for normal pipeline chaining. Use PIPELINE_START to access
145 original input data (replaces @chain_breaker decorator).
146 step_materialization_config: Optional LazyStepMaterializationConfig for per-step materialized output.
147 When provided, enables saving materialized copy of step output
148 to custom location in addition to normal memory backend processing.
149 Use LazyStepMaterializationConfig() for safe defaults that prevent path collisions.
150 napari_streaming_config: Optional LazyNapariStreamingConfig for napari streaming.
151 When provided, enables real-time streaming to napari viewer.
152 fiji_streaming_config: Optional LazyFijiStreamingConfig for Fiji streaming.
153 When provided, enables real-time streaming to Fiji viewer.
154 """
155 self.name = name or self.__class__.__name__
156 self.variable_components = variable_components
157 self.group_by = group_by
158 self.input_source = input_source
159 self.step_materialization_config = step_materialization_config
160 self.napari_streaming_config = napari_streaming_config
161 self.fiji_streaming_config = fiji_streaming_config
162 self.step_well_filter_config = step_well_filter_config
164 # Internal compiler hints - set by path planner during compilation
165 self.__input_dir__ = None
166 self.__output_dir__ = None
168 # Generate a stable step_id based on object id at instantiation.
169 # This ID is used to link the step object to its plan in the context.
170# self.step_id = str(id(self))
172 logger_instance = logging.getLogger(__name__)
173 #logger_instance.debug(f"Created step '{self.name}' (type: {self.__class__.__name__}) with ID {self.step_id}")
175 @abc.abstractmethod
176 def process(self, context: 'ProcessingContext', step_index: int) -> None:
177 """
178 Process the step with the given context and step index.
180 This method must be implemented by all step subclasses.
181 During execution, the step instance is stateless. All necessary
182 configuration and paths are retrieved from context.step_plans[self.step_id].
183 The context itself is frozen and must not be modified.
184 Outputs are written to VFS via context.filemanager based on the steps plan.
185 This method returns None.
187 Args:
188 context: The frozen ProcessingContext containing all required fields,
189 including step_plans and filemanager.
190 """
191 # Clause 246 — Statelessness Mandate
192 # Clause 21 — Context Immunity (Context is read-only for steps)
193 raise NotImplementedError("AbstractStep.process() must be implemented by subclasses")