Coverage for openhcs/io/backend_registry.py: 64.3%
102 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2Storage backend metaclass registration system.
4Eliminates hardcoded backend registration by using metaclass auto-registration
5following OpenHCS generic solution principles. Backends are automatically
6discovered and registered when their classes are defined.
7"""
9import logging
10from abc import ABCMeta
11from typing import Dict, Type, List, Optional
12from openhcs.io.base import DataSink, StorageBackend
13from openhcs.io.streaming import StreamingBackend
15logger = logging.getLogger(__name__)
# Global registry of storage backends, keyed by backend type string
# (e.g. 'disk', 'memory'). Populated automatically by StorageBackendMeta
# when concrete backend classes are defined.
STORAGE_BACKENDS: Dict[str, Type[DataSink]] = {}

# Cache of lazily-created backend instances, keyed by backend type.
# Filled by get_backend_instance(); cleared by cleanup_all_backends().
_backend_instances: Dict[str, DataSink] = {}
class StorageBackendMeta(ABCMeta):
    """
    Metaclass that auto-registers concrete storage backend classes.

    Any non-abstract class created with this metaclass that declares an
    explicit ``_backend_type`` attribute is added to the module-level
    STORAGE_BACKENDS mapping, eliminating hardcoded registration in
    factory functions.
    """

    def __new__(cls, name, bases, attrs):
        new_class = super().__new__(cls, name, bases, attrs)

        # Abstract classes (still carrying unimplemented abstract methods)
        # are never registered.
        if getattr(new_class, '__abstractmethods__', None):
            return new_class

        # Registration requires an explicit backend type on the class.
        backend_type = getattr(new_class, '_backend_type', None)
        if backend_type is None:
            logger.debug(f"Skipping registration for {name} - no explicit _backend_type attribute")
            return new_class

        # Register and pin the backend type directly on this class.
        STORAGE_BACKENDS[backend_type] = new_class
        new_class._backend_type = backend_type
        logger.debug(f"Auto-registered {name} as '{backend_type}' backend")
        return new_class
def get_backend_instance(backend_type: str) -> DataSink:
    """
    Return the (lazily created, cached) backend instance for a type.

    Args:
        backend_type: Backend type identifier (e.g. 'disk', 'memory');
            matched case-insensitively.

    Returns:
        The cached or newly created backend instance.

    Raises:
        KeyError: If the backend type is not registered.
        RuntimeError: If instantiating the backend class fails.
    """
    key = backend_type.lower()

    # Fast path: an instance was already created for this type.
    try:
        return _backend_instances[key]
    except KeyError:
        pass

    backend_class = STORAGE_BACKENDS.get(key)
    if backend_class is None:
        raise KeyError(f"Backend type '{key}' not registered. "
                       f"Available backends: {list(STORAGE_BACKENDS.keys())}")

    try:
        # Instantiate once and cache for subsequent calls.
        instance = backend_class()
        _backend_instances[key] = instance
        logger.debug(f"Created instance for backend '{key}'")
        return instance
    except Exception as e:
        raise RuntimeError(f"Failed to instantiate backend '{key}': {e}") from e
def create_storage_registry() -> Dict[str, DataSink]:
    """
    Build a registry mapping every registered backend type to an instance.

    Backends that fail to instantiate are logged and skipped rather than
    aborting registry construction.

    Returns:
        Dictionary mapping backend type strings to backend instances.
    """
    # Make sure backend modules have been imported (metaclass registration).
    discover_all_backends()

    registry: Dict[str, DataSink] = {}
    for name in STORAGE_BACKENDS:
        try:
            registry[name] = get_backend_instance(name)
        except Exception as e:
            # Best-effort: a broken backend must not block the others.
            logger.warning(f"Failed to create instance for backend '{name}': {e}")

    logger.info(f"Created storage registry with {len(registry)} backends: {list(registry.keys())}")
    return registry
def cleanup_backend_connections() -> None:
    """
    Clean up backend connections without affecting persistent resources.

    For the napari streaming backend, this tears down transport connections
    (e.g. ZeroMQ sockets) but leaves the napari window open for future use.
    Backends exposing ``cleanup_connections()`` get that targeted cleanup;
    otherwise a full ``cleanup()`` is used, except for 'napari_stream'
    whose full cleanup would close the window.

    When running under pytest, viewer subprocesses are additionally stopped
    so the test process can exit cleanly.
    """
    # Fix: use a proper import instead of the inline __import__('sys') hack.
    import os
    import sys

    # Heuristic pytest detection: env vars set by pytest, or 'pytest' on argv.
    is_test_mode = (
        'pytest' in os.environ.get('_', '') or
        'PYTEST_CURRENT_TEST' in os.environ or
        any('pytest' in arg for arg in sys.argv)
    )

    for backend_type, instance in _backend_instances.items():
        # Prefer targeted connection cleanup (preserves the napari window).
        if hasattr(instance, 'cleanup_connections'):
            try:
                instance.cleanup_connections()
                logger.debug(f"Cleaned up connections for backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup connections for backend '{backend_type}': {e}")
        elif hasattr(instance, 'cleanup') and backend_type != 'napari_stream':
            try:
                instance.cleanup()
                logger.debug(f"Cleaned up backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")

    # In test mode, also stop viewer processes to allow pytest to exit.
    if is_test_mode:
        try:
            from openhcs.runtime.napari_stream_visualizer import _cleanup_global_viewer
            _cleanup_global_viewer()
            logger.debug("Cleaned up napari viewer for test mode")
        except ImportError:
            pass  # napari not available
        except Exception as e:
            logger.warning(f"Failed to cleanup napari viewer: {e}")

        try:
            from openhcs.runtime.fiji_stream_visualizer import _cleanup_global_fiji_viewer
            _cleanup_global_fiji_viewer()
            logger.debug("Cleaned up Fiji viewer for test mode")
        except ImportError:
            pass  # fiji visualizer not available
        except Exception as e:
            logger.warning(f"Failed to cleanup Fiji viewer: {e}")

    logger.info(f"Backend connections cleaned up ({'test mode' if is_test_mode else 'napari window preserved'})")
def cleanup_all_backends() -> None:
    """
    Fully tear down every cached backend instance.

    Calls each instance's full ``cleanup()`` (when it has one), then empties
    the instance cache. Intended for complete shutdown; use
    cleanup_backend_connections() for test cleanup to preserve the napari
    window.
    """
    for name, backend in _backend_instances.items():
        cleanup = getattr(backend, 'cleanup', None)
        if cleanup is None:
            continue
        try:
            cleanup()
            logger.debug(f"Cleaned up backend '{name}'")
        except Exception as e:
            # Best-effort: one failing backend must not block the rest.
            logger.warning(f"Failed to cleanup backend '{name}': {e}")

    _backend_instances.clear()
    logger.info("All backend instances cleaned up")
def discover_all_backends() -> None:
    """
    Discover all storage backends by importing their modules.

    Importing a backend module defines its classes, which triggers metaclass
    registration into STORAGE_BACKENDS. Direct imports (rather than package
    scanning) avoid circular-dependency issues.

    When OPENHCS_SUBPROCESS_NO_GPU=1 (subprocess runner mode), only the
    essential disk/memory backends are imported, skipping GPU-heavy ones.
    """
    import os

    if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
        # Subprocess runner mode: essential backends only.
        try:
            from openhcs.io import disk, memory
            logger.debug(f"Subprocess runner mode - discovered {len(STORAGE_BACKENDS)} essential backends: {list(STORAGE_BACKENDS.keys())}")
        except ImportError as e:
            logger.warning(f"Could not import essential backend modules: {e}")
        return

    # Normal mode: import every backend module to trigger registration.
    try:
        from openhcs.io import disk, memory, zarr, napari_stream, fiji_stream
        logger.debug(f"Discovered {len(STORAGE_BACKENDS)} storage backends: {list(STORAGE_BACKENDS.keys())}")
    except ImportError as e:
        logger.warning(f"Could not import some backend modules: {e}")