Coverage for openhcs/core/registry_cache.py: 77.0%

109 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Generic caching system for plugin registries. 

3 

4Provides unified caching for both function registries (Pattern B) and 

5metaclass registries (Pattern A), eliminating code duplication and 

6ensuring consistent cache behavior across the codebase. 

7 

8Architecture: 

9- RegistryCacheManager: Generic cache manager for any registry type 

10- Supports version validation, age-based invalidation, mtime checking 

11- JSON-based serialization with custom serializers/deserializers 

12- XDG-compliant cache locations 

13 

14Usage: 

15 # For function registries 

16 cache_mgr = RegistryCacheManager( 

17 cache_name="scikit_image_functions", 

18 version_getter=lambda: skimage.__version__, 

19 serializer=serialize_function_metadata, 

20 deserializer=deserialize_function_metadata 

21 ) 

22  

23 # For metaclass registries 

24 cache_mgr = RegistryCacheManager( 

25 cache_name="microscope_handlers", 

26 version_getter=lambda: openhcs.__version__, 

27 serializer=serialize_plugin_class, 

28 deserializer=deserialize_plugin_class 

29 ) 

30""" 

31 

32import json 

33import logging 

34import time 

35from pathlib import Path 

36from typing import Dict, Any, Optional, Callable, TypeVar, Generic 

37from dataclasses import dataclass 

38 

39from openhcs.core.xdg_paths import get_cache_file_path 

40 

41logger = logging.getLogger(__name__) 

42 

43T = TypeVar('T') # Generic type for cached items 

44 

45 

@dataclass
class CacheConfig:
    """Tunable knobs controlling registry cache validation behavior."""
    # Caches older than this many days are discarded and rebuilt.
    max_age_days: int = 7
    # When True, source-file modification times are compared on load.
    check_mtimes: bool = False
    # Bump when the on-disk cache format changes incompatibly.
    cache_version: str = "1.0"

52 

53 

class RegistryCacheManager(Generic[T]):
    """
    Generic cache manager for plugin registries.

    Handles caching, validation, and reconstruction of registry data
    with support for version checking, age-based invalidation, and
    custom serialization.

    Type Parameters:
        T: Type of items being cached (e.g., FunctionMetadata, Type[Plugin])
    """

    def __init__(
        self,
        cache_name: str,
        version_getter: Callable[[], str],
        serializer: Callable[[T], Dict[str, Any]],
        deserializer: Callable[[Dict[str, Any]], T],
        config: Optional[CacheConfig] = None
    ):
        """
        Initialize cache manager.

        Args:
            cache_name: Name for the cache file (e.g., "microscope_handlers")
            version_getter: Function that returns current version string
            serializer: Function to serialize item to JSON-compatible dict
            deserializer: Function to deserialize dict back to item
            config: Optional cache configuration (defaults to CacheConfig())
        """
        self.cache_name = cache_name
        self.version_getter = version_getter
        self.serializer = serializer
        self.deserializer = deserializer
        self.config = config or CacheConfig()
        # XDG-compliant location, e.g. <cache_dir>/<cache_name>.json
        self._cache_path = get_cache_file_path(f"{cache_name}.json")

    def load_cache(self) -> Optional[Dict[str, T]]:
        """
        Load cached items with validation.

        Validation order: readable JSON -> cache format version ->
        library/package version -> cache age -> (optionally) file mtimes ->
        per-item deserialization. Any failure invalidates the whole cache.

        Returns:
            Dictionary of cached items, or None if cache is invalid
        """
        if not self._cache_path.exists():
            logger.debug(f"No cache found for {self.cache_name}")
            return None

        try:
            with open(self._cache_path, 'r') as f:
                cache_data = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError (and bad-encoding
            # decode errors); OSError covers unreadable files. Either way
            # the cache is unusable, so drop it and rebuild.
            logger.warning(f"Corrupt cache file {self._cache_path}, rebuilding")
            self._cache_path.unlink(missing_ok=True)
            return None

        # Validate cache format version
        if cache_data.get('cache_version') != self.config.cache_version:
            logger.debug(f"Cache version mismatch for {self.cache_name}")
            return None

        # Validate library/package version
        cached_version = cache_data.get('version', 'unknown')
        current_version = self.version_getter()
        if cached_version != current_version:
            logger.info(
                f"{self.cache_name} version changed "
                f"({cached_version} -> {current_version}) - cache invalid"
            )
            return None

        # Validate cache age (missing timestamp reads as epoch 0 -> stale)
        cache_timestamp = cache_data.get('timestamp', 0)
        cache_age_days = (time.time() - cache_timestamp) / (24 * 3600)
        if cache_age_days > self.config.max_age_days:
            logger.debug(
                f"Cache for {self.cache_name} is {cache_age_days:.1f} days old - rebuilding"
            )
            return None

        # Validate file mtimes if configured
        if self.config.check_mtimes and 'file_mtimes' in cache_data:
            if not self._validate_mtimes(cache_data['file_mtimes']):
                logger.debug(f"File modifications detected for {self.cache_name}")
                return None

        # Deserialize items
        items = {}
        for key, item_data in cache_data.get('items', {}).items():
            try:
                items[key] = self.deserializer(item_data)
            except Exception as e:
                logger.warning(f"Failed to deserialize {key} from cache: {e}")
                return None  # Invalidate entire cache on any deserialization error

        logger.info(f"✅ Loaded {len(items)} items from {self.cache_name} cache")
        return items

    def save_cache(
        self,
        items: Dict[str, T],
        file_mtimes: Optional[Dict[str, float]] = None
    ) -> None:
        """
        Save items to cache.

        Best-effort: items that fail to serialize are skipped (with a
        warning), and disk-write failures are logged rather than raised.

        Args:
            items: Dictionary of items to cache
            file_mtimes: Optional dict of file paths to modification times
        """
        cache_data = {
            'cache_version': self.config.cache_version,
            'version': self.version_getter(),
            'timestamp': time.time(),
            'items': {}
        }

        # Add file mtimes if provided
        if file_mtimes:
            cache_data['file_mtimes'] = file_mtimes

        # Serialize items; a failed item is skipped so the rest still cache
        for key, item in items.items():
            try:
                cache_data['items'][key] = self.serializer(item)
            except Exception as e:
                logger.warning(f"Failed to serialize {key} for cache: {e}")

        # Save to disk
        try:
            self._cache_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self._cache_path, 'w') as f:
                json.dump(cache_data, f, indent=2)
            logger.info(f"💾 Saved {len(items)} items to {self.cache_name} cache")
        except Exception as e:
            # Caching is an optimization; failing to persist must not crash callers.
            logger.warning(f"Failed to save {self.cache_name} cache: {e}")

    def clear_cache(self) -> None:
        """Clear the cache file (no-op if it does not exist)."""
        if self._cache_path.exists():
            self._cache_path.unlink()
            logger.info(f"🧹 Cleared {self.cache_name} cache")

    def _validate_mtimes(self, cached_mtimes: Dict[str, float]) -> bool:
        """
        Validate that file modification times haven't changed.

        Args:
            cached_mtimes: Dictionary of file paths to cached mtimes

        Returns:
            True if all mtimes match, False if any file changed
        """
        for file_path, cached_mtime in cached_mtimes.items():
            path = Path(file_path)
            if not path.exists():
                return False  # File was deleted

            current_mtime = path.stat().st_mtime
            # 1-second tolerance absorbs filesystem timestamp granularity
            if abs(current_mtime - cached_mtime) > 1.0:
                return False  # File was modified

        return True

217 

218 

219# Serializers for metaclass registries (Pattern A) 

220 

def serialize_plugin_class(plugin_class: type) -> Dict[str, Any]:
    """
    Convert a plugin class into a JSON-serializable location record.

    Args:
        plugin_class: The class to serialize.

    Returns:
        Dict holding the module path, class name, and qualified name —
        enough information to re-import the class later.
    """
    keys = ('module', 'class_name', 'qualname')
    attrs = ('__module__', '__name__', '__qualname__')
    return {key: getattr(plugin_class, attr) for key, attr in zip(keys, attrs)}

236 

237 

def deserialize_plugin_class(data: Dict[str, Any]) -> type:
    """
    Re-import a plugin class from its serialized location record.

    Args:
        data: Dict with at least 'module' and 'class_name' keys.

    Returns:
        The class object looked up on the freshly imported module.

    Raises:
        ImportError: If the recorded module cannot be imported.
        AttributeError: If the class is not found in the module.
    """
    from importlib import import_module

    return getattr(import_module(data['module']), data['class_name'])

257 

258 

def get_package_file_mtimes(package_path: str) -> Dict[str, float]:
    """
    Get modification times for all Python files in a package.

    Args:
        package_path: Dotted package path (e.g., "openhcs.microscopes")

    Returns:
        Dictionary mapping file paths to modification times, or an empty
        dict if the package cannot be imported or scanned.
    """
    import importlib
    from pathlib import Path

    try:
        pkg = importlib.import_module(package_path)
        pkg_dir = Path(pkg.__file__).parent

        mtimes = {}
        for py_file in pkg_dir.rglob("*.py"):
            # Defensive: skip anything under __pycache__ (rglob("*.py")
            # normally never matches there, since it holds only .pyc files).
            # Do NOT skip underscore-prefixed files — __init__.py and
            # _private.py modules must also invalidate the cache when edited.
            if "__pycache__" in py_file.parts:
                continue
            mtimes[str(py_file)] = py_file.stat().st_mtime

        return mtimes
    except Exception as e:
        logger.warning(f"Failed to get mtimes for {package_path}: {e}")
        return {}

285