Coverage for src/polystore/backend_registry.py: 52%

82 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-03 06:58 +0000

1""" 

2Storage backend metaclass registration system. 

3 

4Backends are automatically discovered and registered when their classes are defined. 

5""" 

6 

7import logging 

8from typing import Dict 

9from .base import BackendBase, DataSink 

10 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cache of instantiated backends keyed by lower-cased backend type. Filled by
# get_backend_instance() and emptied by cleanup_all_backends().
_backend_instances: Dict[str, DataSink] = {}

14 

15 

def _get_storage_backends() -> Dict:
    """
    Return the backend class registry, importing the backend modules first.

    Backend classes register themselves when they are defined, so the backend
    modules are imported here purely for that side effect. The imports run on
    every call; repeated imports of an already loaded module are cheap.

    Returns:
        The ``BackendBase.__registry__`` mapping.
    """
    # Imported only to trigger registration of the backend classes.
    from . import memory, disk  # noqa: F401

    try:
        from . import zarr  # noqa: F401
    except ImportError as e:
        # Zarr support is optional, so a failed import is not an error. The
        # reason is still logged so that a broken installation can be told
        # apart from a missing one when 'zarr' later shows up as unregistered.
        logger.debug(f"Zarr backend not available: {e}")

    # Registry auto-created by AutoRegisterMeta on BackendBase
    return BackendBase.__registry__

27 

28 

# Lazy access to registry
# NOTE(review): nothing in the visible part of this module ever assigns this
# name, so it stays None here; _get_storage_backends() is what returns the
# populated registry. Confirm whether other modules rebind it.
STORAGE_BACKENDS = None

31 

32 

def get_backend_instance(backend_type: str) -> DataSink:
    """
    Return the shared instance for a backend type, building it on first use.

    The type name is lower-cased before any lookup. An instance that was
    already created is returned from the module-level cache; otherwise the
    backend class is looked up in the registry, instantiated without
    arguments, cached and returned.

    Args:
        backend_type: Backend type identifier (e.g., 'disk', 'memory')

    Returns:
        Backend instance

    Raises:
        KeyError: If backend type not registered
        RuntimeError: If backend instantiation fails
    """
    backend_type = backend_type.lower()

    # Fast path: reuse the instance built by an earlier call.
    if backend_type in _backend_instances:
        return _backend_instances[backend_type]

    # Slow path: resolve the class through the registry.
    registry = _get_storage_backends()
    if backend_type not in registry:
        raise KeyError(f"Backend type '{backend_type}' not registered. "
                       f"Available backends: {list(registry.keys())}")

    backend_cls = registry[backend_type]

    try:
        # Build the instance and remember it for later calls.
        created = backend_cls()
        _backend_instances[backend_type] = created
        logger.debug(f"Created instance for backend '{backend_type}'")
        return created
    except Exception as exc:
        # Keep the original exception attached as the cause.
        raise RuntimeError(f"Failed to instantiate backend '{backend_type}': {exc}") from exc

69 

70 

def create_storage_registry() -> Dict[str, DataSink]:
    """
    Build a mapping of backend type to backend instance.

    Every type found in the backend registry is instantiated through
    get_backend_instance(), except the ones that require context-specific
    initialization. A backend that fails to instantiate is reported with a
    warning and left out of the result.

    Returns:
        Dictionary mapping backend types to instances
    """
    # Fetching the registry also imports the backend modules, which is what
    # registers them.
    storage_backends = _get_storage_backends()

    # These backends need context (e.g., plate_root) that is not available
    # here; they are registered lazily when needed, not at startup.
    deferred_backends = frozenset({'virtual_workspace'})

    registry = {}
    for backend_type in storage_backends.keys():
        if backend_type not in deferred_backends:
            try:
                registry[backend_type] = get_backend_instance(backend_type)
            except Exception as exc:
                logger.warning(f"Failed to create instance for backend '{backend_type}': {exc}")
        else:
            logger.debug(f"Skipping backend '{backend_type}' - requires context-specific initialization")

    logger.info(f"Created storage registry with {len(registry)} backends: {list(registry.keys())}")
    return registry

100 

101 

def cleanup_backend_connections() -> None:
    """
    Clean up backend connections without affecting persistent resources.

    For every cached backend instance, ``cleanup_connections()`` is used when
    the instance provides it. Otherwise ``cleanup()`` is called, except for
    the ``'napari_stream'`` backend, which is skipped so that the napari
    window stays open for future use. An exception raised by a cleanup hook
    is logged as a warning and does not stop the remaining backends. The
    instance cache itself is left populated.

    When the process looks like a pytest run, the global napari viewer is
    additionally shut down so that pytest can exit.
    """
    import os
    import sys

    # Check if we're running in test mode
    is_test_mode = (
        'pytest' in os.environ.get('_', '')
        or 'PYTEST_CURRENT_TEST' in os.environ
        or any('pytest' in arg for arg in sys.argv)
    )

    # Iterate over a snapshot: a cleanup hook that adds or removes cached
    # instances must not abort the loop with "dictionary changed size during
    # iteration".
    for backend_type, instance in list(_backend_instances.items()):
        # Use targeted cleanup for napari streaming to preserve window
        if hasattr(instance, 'cleanup_connections'):
            try:
                instance.cleanup_connections()
                logger.debug(f"Cleaned up connections for backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup connections for backend '{backend_type}': {e}")
        elif hasattr(instance, 'cleanup') and backend_type != 'napari_stream':
            try:
                instance.cleanup()
                logger.debug(f"Cleaned up backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")

    # In test mode, also stop viewer processes to allow pytest to exit
    if is_test_mode:
        try:
            from openhcs.runtime.napari_stream_visualizer import _cleanup_global_viewer
            _cleanup_global_viewer()
            logger.debug("Cleaned up napari viewer for test mode")
        except ImportError:
            pass  # napari not available
        except Exception as e:
            logger.warning(f"Failed to cleanup napari viewer: {e}")

143 

144 

class BackendRegistry(dict):
    """
    Dictionary of storage backend instances keyed by backend type.

    The mapping is filled once, when the registry object is constructed, with
    whatever create_storage_registry() returns at that moment.
    """

    def __init__(self) -> None:
        """Create the registry and fill it with the available backends."""
        super().__init__()
        available = create_storage_registry()
        self.update(available)

158 

159 

def cleanup_all_backends() -> None:
    """
    Clean up all cached backend instances completely.

    This is for full shutdown: ``cleanup()`` is called on every cached
    instance that provides it, and the instance cache is then emptied. An
    exception raised by a cleanup hook is logged as a warning and does not
    stop the remaining backends.

    Use cleanup_backend_connections() for test cleanup to preserve napari window.
    """
    # Iterate over a snapshot: a cleanup hook that adds or removes cached
    # instances must not abort the loop (and thereby skip the final clear)
    # with "dictionary changed size during iteration".
    for backend_type, instance in list(_backend_instances.items()):
        if hasattr(instance, 'cleanup'):
            try:
                instance.cleanup()
                logger.debug(f"Cleaned up backend '{backend_type}'")
            except Exception as e:
                logger.warning(f"Failed to cleanup backend '{backend_type}': {e}")

    _backend_instances.clear()
    logger.info("All backend instances cleaned up")

177 

178 

179