Coverage for openhcs/core/pipeline/__init__.py: 68.6%

33 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1from typing import List, Dict 

2""" 

3Pipeline module for the OpenHCS pipeline architecture. 

4 

5This module provides components for building and executing pipelines, 

6including compilation, execution, and result handling. 

7 

8Doctrinal Clauses: 

9- Clause 17 — VFS Exclusivity (FileManager is the only component that uses VirtualPath) 

10- Clause 17-B — Path Format Discipline 

11- Clause 66 — Immutability After Construction 

12- Clause 88 — No Inferred Capabilities 

13- Clause 106-A — Declared Memory Types 

14- Clause 251 — Declarative Memory Conversion 

15- Clause 262 — Pre-Runtime Conversion 

16- Clause 281 — Context-Bound Identifiers 

17- Clause 297 — Immutable Result Enforcement 

18- Clause 504 — Pipeline Preparation Modifications 

19- Clause 524 — Step = Declaration = ID = Runtime Authority 

20""" 

21 

22# Import from constants 

23from openhcs.constants.constants import (DEFAULT_BACKEND, 

24 FORCE_DISK_WRITE, READ_BACKEND, 

25 REQUIRES_DISK_READ, 

26 REQUIRES_DISK_WRITE, 

27 VALID_GPU_MEMORY_TYPES, 

28 VALID_MEMORY_TYPES, WRITE_BACKEND, 

29 Backend, MemoryType) 

30from openhcs.core.pipeline.funcstep_contract_validator import \ 

31 FuncStepContractValidator 

32from openhcs.core.pipeline.materialization_flag_planner import \ 

33 MaterializationFlagPlanner 

34# Import from existing modules 

35from openhcs.core.pipeline.path_planner import PipelinePathPlanner 

36# Import directly from modules to avoid circular dependency 

37from openhcs.core.pipeline.compiler import PipelineCompiler 

38# Removed import of GPUMemoryTypeValidator to break circular dependency 

39from openhcs.core.pipeline.step_attribute_stripper import \ 

40 StepAttributeStripper 

41 

42 

43# Define Pipeline class 

44class Pipeline(list): 

45 """ 

46 A Pipeline that behaves like List[AbstractStep] but carries metadata. 

47 

48 This class inherits from list, making it fully compatible with any code 

49 expecting List[AbstractStep], while providing additional pipeline-specific 

50 functionality like naming, metadata, and serialization. 

51 

52 Key Benefits: 

53 - Drop-in replacement for List[AbstractStep] 

54 - Backward compatible with existing .steps access 

55 - Rich metadata support for debugging and UI 

56 - Method chaining for fluent pipeline construction 

57 

58 Doctrinal Clauses: 

59 - Clause 66 — Immutability After Construction (metadata can be set but steps are managed via list methods) 

60 - Clause 88 — No Inferred Capabilities (explicit type declarations) 

61 """ 

62 

63 def __init__(self, steps=None, *, name=None, metadata=None, description=None): 

64 """ 

65 Initialize a pipeline that behaves like a list of steps. 

66 

67 Args: 

68 steps: Initial list of AbstractStep objects 

69 name: Human-readable name for the pipeline 

70 metadata: Additional metadata dictionary 

71 description: Optional description of what this pipeline does 

72 """ 

73 # Initialize the list part with steps 

74 super().__init__(steps or []) 

75 

76 # Pipeline metadata 

77 self.name = name or f"Pipeline_{id(self)}" 

78 self.description = description 

79 self.metadata = metadata or {} 

80 

81 # Add creation timestamp for debugging 

82 from datetime import datetime 

83 self.metadata.setdefault('created_at', datetime.now().isoformat()) 

84 

85 @property 

86 def steps(self): 

87 """ 

88 Backward compatibility property. 

89 

90 Returns self since Pipeline IS a list of steps. 

91 This ensures existing code using pipeline.steps continues to work. 

92 """ 

93 return self 

94 

95 def add_step(self, step): 

96 """ 

97 Add a step to the pipeline and return self for method chaining. 

98 

99 Args: 

100 step: AbstractStep to add to the pipeline 

101 

102 Returns: 

103 self for fluent method chaining 

104 """ 

105 self.append(step) 

106 return self 

107 

108 def clone(self, *, name=None, metadata=None): 

109 """ 

110 Create a copy of this pipeline with optional new metadata. 

111 

112 Args: 

113 name: New name for the cloned pipeline 

114 metadata: New metadata (merged with existing) 

115 

116 Returns: 

117 New Pipeline instance with copied steps 

118 """ 

119 new_metadata = self.metadata.copy() 

120 if metadata: 

121 new_metadata.update(metadata) 

122 

123 return Pipeline( 

124 steps=self.copy(), # Shallow copy of the step list 

125 name=name or f"{self.name}_copy", 

126 metadata=new_metadata, 

127 description=self.description 

128 ) 

129 

130 def to_dict(self): 

131 """ 

132 Convert the pipeline to a dictionary for serialization. 

133 

134 Returns: 

135 Dictionary representation of the pipeline 

136 """ 

137 return { 

138 "name": self.name, 

139 "description": self.description, 

140 "steps": list(self), # Convert to plain list for serialization 

141 "metadata": self.metadata, 

142 "step_count": len(self) 

143 } 

144 

145 def __repr__(self): 

146 """Enhanced string representation for debugging.""" 

147 return f"Pipeline(name='{self.name}', steps={len(self)})" 

148 

149 def __str__(self): 

150 """Human-readable string representation.""" 

151 step_summary = f"{len(self)} step{'s' if len(self) != 1 else ''}" 

152 return f"{self.name} ({step_summary})" 

153 

154__all__ = [ 

155 # Constants from backends 

156 'Backend', 

157 'DEFAULT_BACKEND', 

158 'REQUIRES_DISK_READ', 

159 'REQUIRES_DISK_WRITE', 

160 'FORCE_DISK_WRITE', 

161 'READ_BACKEND', 

162 'WRITE_BACKEND', 

163 

164 # Constants from memory 

165 'MemoryType', 

166 'VALID_MEMORY_TYPES', 

167 'VALID_GPU_MEMORY_TYPES', 

168 

169 # Core components 

170 'Pipeline', 

171 'PipelineCompiler', 

172 

173 # Planner components 

174 'PipelinePathPlanner', 

175 'MaterializationFlagPlanner', 

176 'FuncStepContractValidator', 

177 # Removed GPUMemoryTypeValidator to break circular dependency 

178 'StepAttributeStripper' 

179]