Coverage for openhcs/io/backend_registry.py: 58.6%
79 statements
1"""
2Storage backend metaclass registration system.
4Eliminates hardcoded backend registration by using metaclass auto-registration
5following OpenHCS generic solution principles. Backends are automatically
6discovered and registered when their classes are defined.
7"""
9import logging
10from typing import Dict
11from openhcs.io.base import BackendBase, DataSink
12from openhcs.core.auto_register_meta import AutoRegisterMeta
14logger = logging.getLogger(__name__)
16_backend_instances: Dict[str, DataSink] = {}
18# Registry auto-created by AutoRegisterMeta on BackendBase
19# Includes both StorageBackend (read-write) and ReadOnlyBackend (read-only) subclasses
20STORAGE_BACKENDS = BackendBase.__registry__
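
# Illustrative sketch (not part of this module): how a backend ends up in
# STORAGE_BACKENDS. Assuming AutoRegisterMeta keys the registry by a class-level
# identifier such as `_backend_type` (the attribute name here is an assumption),
# defining a BackendBase subclass is enough to register it:
#
#     class MemoryStorageBackend(BackendBase):
#         _backend_type = "memory"            # hypothetical registration key
#
#         def save(self, data, path): ...
#         def load(self, path): ...
#
#     assert "memory" in STORAGE_BACKENDS     # registered at class-definition time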


def get_backend_instance(backend_type: str) -> DataSink:
    """
    Get backend instance by type with lazy instantiation.

    Args:
        backend_type: Backend type identifier (e.g., 'disk', 'memory')

    Returns:
        Backend instance

    Raises:
        KeyError: If backend type not registered
        RuntimeError: If backend instantiation fails
    """
    backend_type = backend_type.lower()

    # Return cached instance if available
    if backend_type in _backend_instances:
        return _backend_instances[backend_type]

    # Get backend class from registry
    if backend_type not in STORAGE_BACKENDS:  # coverage: branch never taken in tests
        raise KeyError(f"Backend type '{backend_type}' not registered. "
                       f"Available backends: {list(STORAGE_BACKENDS.keys())}")

    backend_class = STORAGE_BACKENDS[backend_type]

    try:
        # Create and cache instance
        instance = backend_class()
        _backend_instances[backend_type] = instance
        logger.debug(f"Created instance for backend '{backend_type}'")
        return instance
    except Exception as e:
        raise RuntimeError(f"Failed to instantiate backend '{backend_type}': {e}") from e
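
# Usage sketch (illustrative, assuming a 'memory' backend is registered):
#
#     backend = get_backend_instance("memory")
#     assert backend is get_backend_instance("MEMORY")   # lookup is lower-cased and cached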


def create_storage_registry() -> Dict[str, DataSink]:
    """
    Create storage registry with all registered backends.

    Returns:
        Dictionary mapping backend types to instances
    """
    # Backends auto-discovered on first access to STORAGE_BACKENDS

    # Backends that require context-specific initialization (e.g., plate_root).
    # These are registered lazily when needed, not at startup.
    SKIP_BACKENDS = {'virtual_workspace'}

    registry = {}
    for backend_type in STORAGE_BACKENDS.keys():  # Auto-discovers here
        # Skip backends that need context-specific initialization
        if backend_type in SKIP_BACKENDS:
            logger.debug(f"Skipping backend '{backend_type}' - requires context-specific initialization")
            continue

        try:
            registry[backend_type] = get_backend_instance(backend_type)
        except Exception as e:
            logger.warning(f"Failed to create instance for backend '{backend_type}': {e}")
            continue

    logger.info(f"Created storage registry with {len(registry)} backends: {list(registry.keys())}")
    return registry
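
# Usage sketch (illustrative): build the registry once at startup and hand it to
# whatever component routes reads/writes by backend type (the consumer is not
# shown here and is an assumption):
#
#     registry = create_storage_registry()
#     disk_backend = registry.get("disk")   # None if 'disk' was skipped or failed to instantiate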


def cleanup_backend_connections() -> None:
    """
    Clean up backend connections without affecting persistent resources.

    For the napari streaming backend, this cleans up ZeroMQ connections but
    leaves the napari window open for future use.
    """
    import os
    import sys

    # Check if we're running in test mode
    is_test_mode = (
        'pytest' in os.environ.get('_', '') or
        'PYTEST_CURRENT_TEST' in os.environ or
        any('pytest' in arg for arg in sys.argv)
    )

    for backend_type, instance in _backend_instances.items():
        # Use targeted cleanup for napari streaming to preserve the window
        if hasattr(instance, 'cleanup_connections'):  # coverage: branch never taken in tests
            try:
                instance.cleanup_connections()
                logger.debug(f"Cleaned up connections for backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup connections for backend '{backend_type}': {e}")
        elif hasattr(instance, 'cleanup') and backend_type != 'napari_stream':  # coverage: branch never taken in tests
            try:
                instance.cleanup()
                logger.debug(f"Cleaned up backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")

    # In test mode, also stop viewer processes to allow pytest to exit
    if is_test_mode:  # coverage: condition always true in tests
        try:
            from openhcs.runtime.napari_stream_visualizer import _cleanup_global_viewer
            _cleanup_global_viewer()
            logger.debug("Cleaned up napari viewer for test mode")
        except ImportError:
            pass  # napari not available
        except Exception as e:
            logger.warning(f"Failed to cleanup napari viewer: {e}")

        try:
            from openhcs.runtime.fiji_stream_visualizer import _cleanup_global_fiji_viewer
            _cleanup_global_fiji_viewer()
            logger.debug("Cleaned up Fiji viewer for test mode")
        except ImportError:
            pass  # Fiji visualizer not available
        except Exception as e:
            logger.warning(f"Failed to cleanup Fiji viewer: {e}")

    logger.info(f"Backend connections cleaned up ({'test mode' if is_test_mode else 'napari window preserved'})")
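
# Illustrative sketch (not part of this module): the cleanup above is duck-typed,
# so a streaming backend only needs to expose a `cleanup_connections()` method to
# get the targeted, window-preserving path. The class and attribute names below
# are assumptions, not the real napari backend:
#
#     class NapariStreamBackend(BackendBase):
#         _backend_type = "napari_stream"       # hypothetical registration key
#
#         def cleanup_connections(self):
#             # close ZeroMQ sockets/context only; leave the viewer window alive
#             self._socket.close()
#             self._zmq_context.term()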


def cleanup_all_backends() -> None:
    """
    Clean up all cached backend instances completely.

    This is for full shutdown - clears instance cache and calls full cleanup.
    Use cleanup_backend_connections() for test cleanup to preserve the napari window.
    """
    for backend_type, instance in _backend_instances.items():
        if hasattr(instance, 'cleanup'):
            try:
                instance.cleanup()
                logger.debug(f"Cleaned up backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")

    _backend_instances.clear()
    logger.info("All backend instances cleaned up")
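
# Usage sketch (illustrative): typical lifecycle pairing, assuming a process-level
# entry point that owns the registry:
#
#     registry = create_storage_registry()    # at startup
#     ...                                     # run pipelines
#     cleanup_backend_connections()           # between test runs; keeps the napari window open
#     cleanup_all_backends()                  # at final shutdown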