Coverage for openhcs/core/streaming_config_factory.py: 63.7%
63 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Factory functions for creating streaming configuration classes.
4This module contains OpenHCS-specific utilities for generating streaming configs
5with minimal boilerplate. Keeps openhcs/core/config.py purely declarative.
6"""
8from typing import Optional, List, Type, Union, TYPE_CHECKING
9from pathlib import Path
10from dataclasses import dataclass
11from abc import ABC
13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true
14 from openhcs.core.config import GlobalPipelineConfig, PipelineConfig
def create_streaming_config(
    viewer_name: str,
    port: int,
    backend,  # Backend enum
    display_config_class,
    visualizer_module: str,
    visualizer_class_name: str,
    extra_fields: Optional[dict] = None,
):
    """
    Factory to create streaming config classes with minimal boilerplate.

    Eliminates duplication between streaming configs by auto-generating classes
    from declarative specifications. Adding a new streaming backend requires only
    5-10 lines instead of ~50 lines of boilerplate.

    The generated class is named ``f"{viewer_name.title()}StreamingConfig"``,
    inherits from ``StreamingConfig`` and ``display_config_class``, is wrapped
    with ``dataclass(frozen=True)`` and then with the ``global_pipeline_config``
    decorator looked up on ``openhcs.core.config``.

    Args:
        viewer_name: Viewer identifier ('napari', 'fiji', etc.)
        port: Default port number
        backend: Backend enum value
        display_config_class: Display config class to inherit from
        visualizer_module: Module path for visualizer class
        visualizer_class_name: Name of visualizer class
        extra_fields: Optional dict of {field_name: (type, default_value)}.
            The mapping is copied when the class is created, so mutating it
            afterwards does not affect the generated class.

    Returns:
        Dynamically created streaming config class

    Raises:
        RuntimeError: If ``openhcs.core.config`` does not expose a
            ``global_pipeline_config`` attribute.

    Example:
        >>> NapariStreamingConfig = create_streaming_config(
        ...     viewer_name='napari',
        ...     port=5555,
        ...     backend=Backend.NAPARI_STREAM,
        ...     display_config_class=NapariDisplayConfig,
        ...     visualizer_module='openhcs.runtime.napari_stream_visualizer',
        ...     visualizer_class_name='NapariStreamVisualizer'
        ... )
    """
    # Import here to avoid circular dependencies
    from openhcs.core.config import StreamingConfig

    # Get the global_pipeline_config decorator from config module
    # It's created by @auto_create_decorator on GlobalPipelineConfig
    import openhcs.core.config as config_module
    global_pipeline_config = getattr(config_module, 'global_pipeline_config', None)
    if global_pipeline_config is None:
        raise RuntimeError("global_pipeline_config decorator not found. Import openhcs.core.config first.")

    # Snapshot the extra field specs once. The generated class's fields are
    # fixed at creation time, so the closures below must not observe later
    # mutations of the caller's dict (which would make getattr() fail).
    extra_field_specs = dict(extra_fields) if extra_fields else {}
    extra_field_names = tuple(extra_field_specs)

    # Build class namespace with methods
    def _get_streaming_kwargs(self, context):
        """Collect the keyword arguments describing this streaming config."""
        kwargs = {
            "port": self.port,
            "host": self.host,
            "transport_mode": self.transport_mode,
            "display_config": self
        }
        # Add extra fields to kwargs
        for field_name in extra_field_names:
            kwargs[field_name] = getattr(self, field_name)
        if context:
            kwargs["microscope_handler"] = context.microscope_handler
        return kwargs

    def _create_visualizer(self, filemanager, visualizer_config):
        """Instantiate the visualizer class configured for this viewer."""
        # Lazy import to avoid circular dependencies; importlib.import_module
        # returns the leaf module directly, unlike a bare __import__ call.
        import importlib

        module = importlib.import_module(visualizer_module)
        visualizer_class = getattr(module, visualizer_class_name)
        return visualizer_class(
            filemanager,
            visualizer_config,
            viewer_title=f"OpenHCS {viewer_name.title()} Visualization",
            persistent=self.persistent,
            port=self.port,
            display_config=self,
            transport_mode=self.transport_mode
        )

    # Build class dict with properties using lambdas
    class_dict = {
        'port': port,
        'backend': property(lambda self: backend),
        'viewer_type': property(lambda self: viewer_name),
        'step_plan_output_key': property(lambda self: f"{viewer_name}_streaming_paths"),
        'get_streaming_kwargs': _get_streaming_kwargs,
        'create_visualizer': _create_visualizer,
        '__annotations__': {'port': int},
        '__module__': 'openhcs.core.config',  # Make it appear as if defined in config.py
    }

    # Add extra fields (default value as class attribute, type as annotation)
    for field_name, (field_type, default_val) in extra_field_specs.items():
        class_dict[field_name] = default_val
        class_dict['__annotations__'][field_name] = field_type

    # Create class dynamically
    cls_name = f"{viewer_name.title()}StreamingConfig"
    new_class = type(cls_name, (StreamingConfig, display_config_class), class_dict)

    # Apply decorators
    new_class = dataclass(frozen=True)(new_class)
    new_class = global_pipeline_config(new_class)

    return new_class
def build_component_order():
    """
    Return the canonical component order for layer/window naming.

    The order is derived from the values of ``VirtualComponents`` followed by
    the values of ``AllComponents``, so adding or removing a virtual component
    is reflected here automatically. A name that appears more than once keeps
    only its first position.

    Returns:
        List of component names in canonical order
    """
    from openhcs.constants import AllComponents, VirtualComponents

    # Virtual components lead (for step/source grouping) ...
    virtual_first = [member.value for member in VirtualComponents]
    # ... followed by the filename components in their declared order.
    filename_after = [member.value for member in AllComponents]

    # dict keys keep insertion order and drop repeats, so this yields each
    # name once, at the position of its first occurrence.
    return list(dict.fromkeys(virtual_first + filename_after))
def get_all_streaming_ports(
    config: 'Union[GlobalPipelineConfig, PipelineConfig]' = None,
    num_ports_per_type: int = 10
) -> List[int]:
    """Collect the ports used by every streaming config found on ``config``.

    The result always starts with ``DEFAULT_EXECUTION_SERVER_PORT``. For each
    dataclass field of ``config`` whose value is a ``StreamingConfig``
    instance, ``num_ports_per_type`` consecutive ports beginning at that
    instance's ``port`` are appended.

    Args:
        config: GlobalPipelineConfig or PipelineConfig to read ports from.
            When None, the current GlobalPipelineConfig is looked up via
            ``get_current_global_config``.
        num_ports_per_type: How many consecutive ports to list per streaming
            config (default: 10)

    Returns:
        List of ports: the execution server port followed by the port range
        of each streaming config. If no config can be resolved, only the
        execution server port is returned.

    Raises:
        ValueError: If a StreamingConfig instance has ``port`` set to None.
    """
    import dataclasses

    from openhcs.constants.constants import DEFAULT_EXECUTION_SERVER_PORT
    from openhcs.core.config import StreamingConfig, GlobalPipelineConfig
    from openhcs.config_framework.global_config import get_current_global_config

    # The execution server port is always part of the result.
    ports = [DEFAULT_EXECUTION_SERVER_PORT]

    # Fall back to the current global config when none was supplied.
    if config is None:
        config = get_current_global_config(GlobalPipelineConfig)
    if config is None:
        # Nothing to inspect - only the execution server port is known.
        return ports

    # Walk every dataclass field; works for both config flavours.
    for spec in dataclasses.fields(config):
        candidate = getattr(config, spec.name)

        # isinstance() is False for None, so unset fields are skipped too.
        if not isinstance(candidate, StreamingConfig):
            continue

        base_port = candidate.port

        # Fail loudly: a concrete streaming config without a port is a
        # configuration error.
        if base_port is None:
            raise ValueError(
                f"Streaming config {spec.name} has None port. "
                f"All StreamingConfig instances must have a port."
            )

        # Append the consecutive port range for this streaming type.
        ports.extend([base_port + offset for offset in range(num_ports_per_type)])

    return ports