Coverage for src/polystore/backend_registry.py: 52%
82 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 06:58 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-03 06:58 +0000
1"""
2Storage backend metaclass registration system.
4Backends are automatically discovered and registered when their classes are defined.
5"""
7import logging
8from typing import Dict
9from .base import BackendBase, DataSink
11logger = logging.getLogger(__name__)
13_backend_instances: Dict[str, DataSink] = {}
16def _get_storage_backends() -> Dict:
17 """Get the storage backends registry, ensuring it's initialized."""
18 # Import backends to trigger registration
19 from . import memory, disk
20 try:
21 from . import zarr
22 except ImportError:
23 pass # Zarr not available
25 # Registry auto-created by AutoRegisterMeta on BackendBase
26 return BackendBase.__registry__
29# Lazy access to registry
30STORAGE_BACKENDS = None
33def get_backend_instance(backend_type: str) -> DataSink:
34 """
35 Get backend instance by type with lazy instantiation.
37 Args:
38 backend_type: Backend type identifier (e.g., 'disk', 'memory')
40 Returns:
41 Backend instance
43 Raises:
44 KeyError: If backend type not registered
45 RuntimeError: If backend instantiation fails
46 """
47 backend_type = backend_type.lower()
49 # Return cached instance if available
50 if backend_type in _backend_instances:
51 return _backend_instances[backend_type]
53 # Get backend class from registry
54 storage_backends = _get_storage_backends()
55 if backend_type not in storage_backends:
56 raise KeyError(f"Backend type '{backend_type}' not registered. "
57 f"Available backends: {list(storage_backends.keys())}")
59 backend_class = storage_backends[backend_type]
61 try:
62 # Create and cache instance
63 instance = backend_class()
64 _backend_instances[backend_type] = instance
65 logger.debug(f"Created instance for backend '{backend_type}'")
66 return instance
67 except Exception as e:
68 raise RuntimeError(f"Failed to instantiate backend '{backend_type}': {e}") from e
71def create_storage_registry() -> Dict[str, DataSink]:
72 """
73 Create storage registry with all registered backends.
75 Returns:
76 Dictionary mapping backend types to instances
77 """
78 # Get backends registry (triggers import and registration)
79 storage_backends = _get_storage_backends()
81 # Backends that require context-specific initialization (e.g., plate_root)
82 # These are registered lazily when needed, not at startup
83 SKIP_BACKENDS = {'virtual_workspace'}
85 registry = {}
86 for backend_type in storage_backends.keys():
87 # Skip backends that need context-specific initialization
88 if backend_type in SKIP_BACKENDS:
89 logger.debug(f"Skipping backend '{backend_type}' - requires context-specific initialization")
90 continue
92 try:
93 registry[backend_type] = get_backend_instance(backend_type)
94 except Exception as e:
95 logger.warning(f"Failed to create instance for backend '{backend_type}': {e}")
96 continue
98 logger.info(f"Created storage registry with {len(registry)} backends: {list(registry.keys())}")
99 return registry
102def cleanup_backend_connections() -> None:
103 """
104 Clean up backend connections without affecting persistent resources.
106 For napari streaming backend, this cleans up ZeroMQ connections but
107 leaves the napari window open for future use.
108 """
109 import os
111 # Check if we're running in test mode
112 is_test_mode = (
113 'pytest' in os.environ.get('_', '') or
114 'PYTEST_CURRENT_TEST' in os.environ or
115 any('pytest' in arg for arg in __import__('sys').argv)
116 )
118 for backend_type, instance in _backend_instances.items():
119 # Use targeted cleanup for napari streaming to preserve window
120 if hasattr(instance, 'cleanup_connections'):
121 try:
122 instance.cleanup_connections()
123 logger.debug(f"Cleaned up connections for backend '{backend_type}'")
124 except Exception as e:
125 logger.warning(f"Failed to cleanup connections for backend '{backend_type}': {e}")
126 elif hasattr(instance, 'cleanup') and backend_type != 'napari_stream':
127 try:
128 instance.cleanup()
129 logger.debug(f"Cleaned up backend '{backend_type}'")
130 except Exception as e:
131 logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")
133 # In test mode, also stop viewer processes to allow pytest to exit
134 if is_test_mode:
135 try:
136 from openhcs.runtime.napari_stream_visualizer import _cleanup_global_viewer
137 _cleanup_global_viewer()
138 logger.debug("Cleaned up napari viewer for test mode")
139 except ImportError:
140 pass # napari not available
141 except Exception as e:
142 logger.warning(f"Failed to cleanup napari viewer: {e}")
145class BackendRegistry(dict):
146 """
147 Registry for storage backends.
149 This is a dictionary that automatically populates with available backends
150 when first accessed.
151 """
153 def __init__(self):
154 """Initialize the backend registry."""
155 super().__init__()
156 # Populate with available backends
157 self.update(create_storage_registry())
160def cleanup_all_backends() -> None:
161 """
162 Clean up all cached backend instances completely.
164 This is for full shutdown - clears instance cache and calls full cleanup.
165 Use cleanup_backend_connections() for test cleanup to preserve napari window.
166 """
167 for backend_type, instance in _backend_instances.items():
168 if hasattr(instance, 'cleanup'):
169 try:
170 instance.cleanup()
171 logger.debug(f"Cleaned up backend '{backend_type}'")
172 except Exception as e:
173 logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")
175 _backend_instances.clear()
176 logger.info("All backend instances cleaned up")