Coverage for openhcs/core/pipeline/__init__.py: 68.6%
33 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1from typing import List, Dict
2"""
3Pipeline module for the OpenHCS pipeline architecture.
5This module provides components for building and executing pipelines,
6including compilation, execution, and result handling.
8Doctrinal Clauses:
9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath)
10- Clause 17-B — Path Format Discipline
11- Clause 66 — Immutability After Construction
12- Clause 88 — No Inferred Capabilities
13- Clause 106-A — Declared Memory Types
14- Clause 251 — Declarative Memory Conversion
15- Clause 262 — Pre-Runtime Conversion
16- Clause 281 — Context-Bound Identifiers
17- Clause 297 — Immutable Result Enforcement
18- Clause 504 — Pipeline Preparation Modifications
19- Clause 524 — Step = Declaration = ID = Runtime Authority
20"""
22# Import from constants
23from openhcs.constants.constants import (DEFAULT_BACKEND,
24 FORCE_DISK_WRITE, READ_BACKEND,
25 REQUIRES_DISK_READ,
26 REQUIRES_DISK_WRITE,
27 VALID_GPU_MEMORY_TYPES,
28 VALID_MEMORY_TYPES, WRITE_BACKEND,
29 Backend, MemoryType)
30from openhcs.core.pipeline.funcstep_contract_validator import \
31 FuncStepContractValidator
32from openhcs.core.pipeline.materialization_flag_planner import \
33 MaterializationFlagPlanner
34# Import from existing modules
35from openhcs.core.pipeline.path_planner import PipelinePathPlanner
36# Import directly from modules to avoid circular dependency
37from openhcs.core.pipeline.compiler import PipelineCompiler
38# Removed import of GPUMemoryTypeValidator to break circular dependency
39from openhcs.core.pipeline.step_attribute_stripper import \
40 StepAttributeStripper
43# Define Pipeline class
44class Pipeline(list):
45 """
46 A Pipeline that behaves like List[AbstractStep] but carries metadata.
48 This class inherits from list, making it fully compatible with any code
49 expecting List[AbstractStep], while providing additional pipeline-specific
50 functionality like naming, metadata, and serialization.
52 Key Benefits:
53 - Drop-in replacement for List[AbstractStep]
54 - Backward compatible with existing .steps access
55 - Rich metadata support for debugging and UI
56 - Method chaining for fluent pipeline construction
58 Doctrinal Clauses:
59 - Clause 66 — Immutability After Construction (metadata can be set but steps are managed via list methods)
60 - Clause 88 — No Inferred Capabilities (explicit type declarations)
61 """
63 def __init__(self, steps=None, *, name=None, metadata=None, description=None):
64 """
65 Initialize a pipeline that behaves like a list of steps.
67 Args:
68 steps: Initial list of AbstractStep objects
69 name: Human-readable name for the pipeline
70 metadata: Additional metadata dictionary
71 description: Optional description of what this pipeline does
72 """
73 # Initialize the list part with steps
74 super().__init__(steps or [])
76 # Pipeline metadata
77 self.name = name or f"Pipeline_{id(self)}"
78 self.description = description
79 self.metadata = metadata or {}
81 # Add creation timestamp for debugging
82 from datetime import datetime
83 self.metadata.setdefault('created_at', datetime.now().isoformat())
85 @property
86 def steps(self):
87 """
88 Backward compatibility property.
90 Returns self since Pipeline IS a list of steps.
91 This ensures existing code using pipeline.steps continues to work.
92 """
93 return self
95 def add_step(self, step):
96 """
97 Add a step to the pipeline and return self for method chaining.
99 Args:
100 step: AbstractStep to add to the pipeline
102 Returns:
103 self for fluent method chaining
104 """
105 self.append(step)
106 return self
108 def clone(self, *, name=None, metadata=None):
109 """
110 Create a copy of this pipeline with optional new metadata.
112 Args:
113 name: New name for the cloned pipeline
114 metadata: New metadata (merged with existing)
116 Returns:
117 New Pipeline instance with copied steps
118 """
119 new_metadata = self.metadata.copy()
120 if metadata:
121 new_metadata.update(metadata)
123 return Pipeline(
124 steps=self.copy(), # Shallow copy of the step list
125 name=name or f"{self.name}_copy",
126 metadata=new_metadata,
127 description=self.description
128 )
130 def to_dict(self):
131 """
132 Convert the pipeline to a dictionary for serialization.
134 Returns:
135 Dictionary representation of the pipeline
136 """
137 return {
138 "name": self.name,
139 "description": self.description,
140 "steps": list(self), # Convert to plain list for serialization
141 "metadata": self.metadata,
142 "step_count": len(self)
143 }
145 def __repr__(self):
146 """Enhanced string representation for debugging."""
147 return f"Pipeline(name='{self.name}', steps={len(self)})"
149 def __str__(self):
150 """Human-readable string representation."""
151 step_summary = f"{len(self)} step{'s' if len(self) != 1 else ''}"
152 return f"{self.name} ({step_summary})"
154__all__ = [
155 # Constants from backends
156 'Backend',
157 'DEFAULT_BACKEND',
158 'REQUIRES_DISK_READ',
159 'REQUIRES_DISK_WRITE',
160 'FORCE_DISK_WRITE',
161 'READ_BACKEND',
162 'WRITE_BACKEND',
164 # Constants from memory
165 'MemoryType',
166 'VALID_MEMORY_TYPES',
167 'VALID_GPU_MEMORY_TYPES',
169 # Core components
170 'Pipeline',
171 'PipelineCompiler',
173 # Planner components
174 'PipelinePathPlanner',
175 'MaterializationFlagPlanner',
176 'FuncStepContractValidator',
177 # Removed GPUMemoryTypeValidator to break circular dependency
178 'StepAttributeStripper'
179]