Coverage for openhcs/core/streaming_config_factory.py: 63.7%

63 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Factory functions for creating streaming configuration classes. 

3 

4This module contains OpenHCS-specific utilities for generating streaming configs 

5with minimal boilerplate. Keeps openhcs/core/config.py purely declarative. 

6""" 

7 

8from typing import Optional, List, Type, Union, TYPE_CHECKING 

9from pathlib import Path 

10from dataclasses import dataclass 

11from abc import ABC 

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true

14 from openhcs.core.config import GlobalPipelineConfig, PipelineConfig 

15 

16 

def create_streaming_config(
    viewer_name: str,
    port: int,
    backend,  # Backend enum
    display_config_class,
    visualizer_module: str,
    visualizer_class_name: str,
    extra_fields: Optional[dict] = None
):
    """
    Factory to create streaming config classes with minimal boilerplate.

    Eliminates duplication between streaming configs by auto-generating classes
    from declarative specifications. Adding a new streaming backend requires only
    5-10 lines instead of ~50 lines of boilerplate.

    The generated class is named ``f"{viewer_name.title()}StreamingConfig"``,
    inherits from ``(StreamingConfig, display_config_class)``, is wrapped with
    ``dataclass(frozen=True)`` and then with the ``global_pipeline_config``
    decorator looked up on ``openhcs.core.config``. Its ``__module__`` is set to
    ``'openhcs.core.config'`` so it appears as if defined there.

    Args:
        viewer_name: Viewer identifier ('napari', 'fiji', etc.)
        port: Default port number
        backend: Backend enum value
        display_config_class: Display config class to inherit from
        visualizer_module: Module path for visualizer class
        visualizer_class_name: Name of visualizer class
        extra_fields: Optional dict of {field_name: (type, default_value)}

    Returns:
        Dynamically created streaming config class

    Raises:
        RuntimeError: If ``openhcs.core.config`` does not expose a
            ``global_pipeline_config`` attribute.

    Example:
        >>> NapariStreamingConfig = create_streaming_config(
        ...     viewer_name='napari',
        ...     port=5555,
        ...     backend=Backend.NAPARI_STREAM,
        ...     display_config_class=NapariDisplayConfig,
        ...     visualizer_module='openhcs.runtime.napari_stream_visualizer',
        ...     visualizer_class_name='NapariStreamVisualizer'
        ... )
    """
    import importlib

    # Import here to avoid circular dependencies
    from openhcs.core.config import StreamingConfig

    # Get the global_pipeline_config decorator from config module
    # It's created by @auto_create_decorator on GlobalPipelineConfig
    import openhcs.core.config as config_module
    global_pipeline_config = getattr(config_module, 'global_pipeline_config', None)
    if global_pipeline_config is None:
        raise RuntimeError("global_pipeline_config decorator not found. Import openhcs.core.config first.")

    # Freeze the extra field names now: the generated class's fields are fixed
    # at creation time, so later mutation of the caller's dict must not change
    # which attributes get_streaming_kwargs reads.
    extra_field_names = tuple(extra_fields) if extra_fields else ()

    def _get_streaming_kwargs(self, context):
        """Collect the keyword arguments describing this streaming endpoint."""
        # host / transport_mode are not defined here; presumably they come from
        # the StreamingConfig / display config base classes - verify there.
        kwargs = {
            "port": self.port,
            "host": self.host,
            "transport_mode": self.transport_mode,
            "display_config": self
        }
        # Add extra fields to kwargs
        for field_name in extra_field_names:
            kwargs[field_name] = getattr(self, field_name)
        # microscope_handler is only included when a truthy context is given
        if context:
            kwargs["microscope_handler"] = context.microscope_handler
        return kwargs

    def _create_visualizer(self, filemanager, visualizer_config):
        """Instantiate the configured visualizer class for this config."""
        # Lazy import to avoid circular dependencies. import_module returns the
        # named (sub)module directly, replacing the __import__/fromlist idiom.
        module = importlib.import_module(visualizer_module)
        visualizer_class = getattr(module, visualizer_class_name)
        return visualizer_class(
            filemanager,
            visualizer_config,
            viewer_title=f"OpenHCS {viewer_name.title()} Visualization",
            persistent=self.persistent,
            port=self.port,
            display_config=self,
            transport_mode=self.transport_mode
        )

    # Build class dict with properties using lambdas. 'port' is paired with an
    # entry in __annotations__ so the dataclass decorator treats it as a field
    # whose default is the port passed to this factory.
    class_dict = {
        'port': port,
        'backend': property(lambda self: backend),
        'viewer_type': property(lambda self: viewer_name),
        'step_plan_output_key': property(lambda self: f"{viewer_name}_streaming_paths"),
        'get_streaming_kwargs': _get_streaming_kwargs,
        'create_visualizer': _create_visualizer,
        '__annotations__': {'port': int},
        '__module__': 'openhcs.core.config',  # Make it appear as if defined in config.py
    }

    # Add extra fields: each becomes an annotated class attribute with a default
    if extra_fields:
        for field_name, (field_type, default_val) in extra_fields.items():
            class_dict[field_name] = default_val
            class_dict['__annotations__'][field_name] = field_type

    # Create class dynamically
    cls_name = f"{viewer_name.title()}StreamingConfig"
    new_class = type(cls_name, (StreamingConfig, display_config_class), class_dict)

    # Apply decorators (dataclass first, then the config-registration decorator)
    new_class = dataclass(frozen=True)(new_class)
    new_class = global_pipeline_config(new_class)

    return new_class

122 

123 

def build_component_order():
    """
    Build the canonical component order from VirtualComponents + AllComponents.

    VirtualComponents stays the single source of truth: adding or removing a
    virtual component changes the returned order automatically.

    Returns:
        List of component names in canonical order for layer/window naming.
        Virtual component values come first, followed by the AllComponents
        values; a value that appears more than once is kept only at its first
        position.
    """
    from openhcs.constants import AllComponents, VirtualComponents

    # Virtual components lead (for step/source grouping), then the filename
    # components in their enum order.
    ordered_names = [member.value for member in VirtualComponents]
    ordered_names += [member.value for member in AllComponents]

    # dict keys keep first-insertion order, so this drops later duplicates
    # while preserving the sequence built above.
    return list(dict.fromkeys(ordered_names))

151 

152 

def get_all_streaming_ports(
    config: 'Union[GlobalPipelineConfig, PipelineConfig]' = None,
    num_ports_per_type: int = 10
) -> List[int]:
    """Collect the execution server port plus a port range per streaming config.

    Reads the actually configured ports from the given config object
    (GlobalPipelineConfig or PipelineConfig), so that viewers launched with
    custom ports from the orchestrator are found by the scanner.

    Args:
        config: GlobalPipelineConfig or PipelineConfig to extract ports from.
                If None, the current GlobalPipelineConfig is looked up via
                get_current_global_config.
        num_ports_per_type: Number of consecutive ports to list for each
                streaming config, starting at its configured port (default: 10)

    Returns:
        List starting with DEFAULT_EXECUTION_SERVER_PORT, followed by
        ``num_ports_per_type`` consecutive ports for every dataclass field of
        the config whose value is a StreamingConfig instance. If no config is
        available, only the execution server port is returned.

    Raises:
        ValueError: If a StreamingConfig field value has ``port`` set to None.
    """
    import dataclasses

    from openhcs.constants.constants import DEFAULT_EXECUTION_SERVER_PORT
    from openhcs.core.config import StreamingConfig, GlobalPipelineConfig
    from openhcs.config_framework.global_config import get_current_global_config

    # The execution server port is always part of the result
    all_ports = [DEFAULT_EXECUTION_SERVER_PORT]

    # Fall back to the current global config when the caller gave none
    resolved = config
    if resolved is None:
        resolved = get_current_global_config(GlobalPipelineConfig)
    if resolved is None:
        # No config available - return just execution server port
        return all_ports

    # Walk every dataclass field; works for both GlobalPipelineConfig and
    # PipelineConfig since only the field values are inspected.
    for cfg_field in dataclasses.fields(resolved):
        candidate = getattr(resolved, cfg_field.name)

        # Skip anything that is unset or not a streaming config
        if candidate is None or not isinstance(candidate, StreamingConfig):
            continue

        base_port = candidate.port

        # Fail-loud if concrete config has None port (configuration error)
        if base_port is None:
            raise ValueError(
                f"Streaming config {cfg_field.name} has None port. "
                f"All StreamingConfig instances must have a port."
            )

        # Consecutive port range for this streaming type
        all_ports.extend(base_port + offset for offset in range(num_ports_per_type))

    return all_ports

206