Coverage for openhcs/processing/backends/analysis/cache_utils.py: 9.7%

126 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Unified caching utilities for external library function registries. 

3 

4Provides common caching patterns extracted from scikit-image registry 

5for use by pyclesperanto, CuPy, and other external library registries. 

6""" 

7 

8import json 

9import logging 

10import time 

11from pathlib import Path 

12from typing import Dict, Optional, Any, Callable 

13 

14from openhcs.core.xdg_paths import get_cache_file_path 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def get_library_cache_path(library_name: str) -> Path:
    """
    Resolve the on-disk location of a library's function-metadata cache.

    The file name is built as ``<library_name>_function_metadata.json`` and
    its location is delegated to ``get_cache_file_path``.

    Args:
        library_name: Name of the library (e.g., 'pyclesperanto', 'cupy')

    Returns:
        Path to the cache file
    """
    return get_cache_file_path(f"{library_name}_function_metadata.json")

31 

32 

def save_library_metadata(
    library_name: str,
    registry: Dict[str, Any],
    get_version_func: Callable[[], str],
    extract_cache_data_func: Callable[[Any], Dict[str, Any]]
) -> None:
    """
    Save library function metadata to cache.

    The payload is first serialized to a temporary sibling file and only
    then moved over the real cache file, so a failed or interrupted write
    cannot leave a truncated cache behind or destroy a previously valid one.
    Saving is best-effort: every failure is logged as a warning and nothing
    is raised to the caller.

    Args:
        library_name: Name of the library
        registry: Registry dictionary mapping function names to metadata objects
        get_version_func: Function that returns the library version string;
            if it raises, the version is recorded as "unknown"
        extract_cache_data_func: Function that extracts cacheable data from
            a metadata object; entries for which it raises are skipped with
            a warning
    """
    cache_path = get_library_cache_path(library_name)

    # Get library version
    try:
        library_version = get_version_func()
    except Exception:
        library_version = "unknown"

    # Build cache data structure
    cache_data = {
        'cache_version': '1.0',
        'library_version': library_version,
        'timestamp': time.time(),
        'functions': {}
    }

    # Extract function metadata; one bad entry must not abort the whole save
    for full_name, func_meta in registry.items():
        try:
            cache_data['functions'][full_name] = extract_cache_data_func(func_meta)
        except Exception as e:
            logger.warning(f"Failed to extract cache data for {full_name}: {e}")

    # Save to disk via a temporary file next to the target. json.dump writes
    # incrementally, so dumping straight into the cache file would leave a
    # partial document behind if serialization failed midway.
    tmp_path = Path(f"{cache_path}.tmp")
    try:
        # json.dump escapes non-ASCII by default, so the output is plain ASCII
        # and readable regardless of the platform's default text encoding.
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(cache_data, f, indent=2)
        tmp_path.replace(cache_path)
        logger.info(f"Saved {library_name} metadata cache: {len(cache_data['functions'])} functions")
    except Exception as e:
        logger.warning(f"Failed to save {library_name} metadata cache: {e}")
        # Best-effort cleanup of the partial temporary file; the save has
        # already been reported as failed, so a cleanup error is not re-raised.
        try:
            tmp_path.unlink()
        except OSError:
            pass

78 

79 

def load_library_metadata(
    library_name: str,
    get_version_func: Callable[[], str],
    max_age_days: int = 7
) -> Optional[Dict[str, Dict[str, Any]]]:
    """
    Load library function metadata from cache with validation.

    The cache is rejected (``None`` is returned) when the file is missing,
    lacks the 'functions' key, was written for a different library version,
    is older than ``max_age_days``, or cannot be read or parsed. None of
    these conditions raise; each is reported through the module logger.

    Args:
        library_name: Name of the library
        get_version_func: Function that returns the current library version;
            if it raises, the current version is treated as "unknown"
        max_age_days: Maximum age in days before cache is considered stale

    Returns:
        Dictionary of cached function metadata, or None if cache invalid
    """
    cache_path = get_library_cache_path(library_name)

    if not cache_path.exists():
        logger.debug(f"No {library_name} cache found at {cache_path}")
        return None

    try:
        with open(cache_path, 'r') as f:
            cache_data = json.load(f)

        # Handle old cache format (direct metadata dict)
        if 'functions' not in cache_data:
            logger.info(f"Found old {library_name} cache format - will rebuild")
            return None

        # Validate library version
        try:
            current_version = get_version_func()
        except Exception:
            current_version = "unknown"

        cached_version = cache_data.get('library_version', 'unknown')
        if cached_version != current_version:
            # Separate the two versions so the message does not read as one
            # run-together version string.
            logger.info(
                f"{library_name} version changed "
                f"({cached_version} -> {current_version}) - will rebuild cache"
            )
            return None

        # Check cache age; a missing timestamp defaults to 0 (the epoch),
        # which makes the cache look maximally old.
        cache_timestamp = cache_data.get('timestamp', 0)
        cache_age_days = (time.time() - cache_timestamp) / (24 * 3600)
        if cache_age_days > max_age_days:
            logger.info(f"{library_name} cache is {cache_age_days:.1f} days old - will rebuild")
            return None

        functions = cache_data['functions']
        logger.info(f"Loaded valid {library_name} metadata cache: {len(functions)} functions")
        return functions

    except Exception as e:
        # Unreadable or malformed cache: report it and let the caller rebuild.
        logger.warning(f"Failed to load {library_name} metadata cache: {e}")
        return None

136 

137 

def clear_library_cache(library_name: str) -> None:
    """
    Delete a library's metadata cache file, if one exists.

    The outcome (cleared, nothing to clear, or failure) is reported through
    the module logger; errors are logged as warnings rather than raised.

    Args:
        library_name: Name of the library
    """
    cache_file = get_library_cache_path(library_name)
    try:
        if not cache_file.exists():
            logger.info(f"No {library_name} metadata cache to clear")
        else:
            cache_file.unlink()
            logger.info(f"{library_name} metadata cache cleared")
    except Exception as err:
        logger.warning(f"Failed to clear {library_name} metadata cache: {err}")

154 

155 

def register_functions_from_cache(
    library_name: str,
    cached_metadata: Dict[str, Dict[str, Any]],
    get_function_func: Callable[[str, str], Any],
    register_function_func: Callable[[Any, str, str], None],
    memory_type: str
) -> tuple[int, int]:
    """
    Walk cached metadata entries and register each eligible function.

    An entry is skipped when its contract is 'unknown' or 'dim_change',
    when ``get_function_func`` returns None for it, or when any step raises
    (including a missing 'name', 'module' or 'contract' key). Failures are
    logged and counted as skipped; they never abort the loop.

    Args:
        library_name: Name of the library (used in log messages)
        cached_metadata: Dictionary of cached function metadata
        get_function_func: Resolver called as (module_path, func_name) that
            returns the function object, or None if it cannot be found
        register_function_func: Registrar called as
            (func, func_name, memory_type)
        memory_type: Memory type passed through to the registrar

    Returns:
        Tuple of (decorated_count, skipped_count)
    """
    logger.info(f"Registering {library_name} functions from metadata cache")

    non_registrable = ('unknown', 'dim_change')
    registered = 0
    skipped = 0

    for qualified_name, entry in cached_metadata.items():
        try:
            # Keys are read in this order so the first missing one is reported.
            name, module, contract = entry['name'], entry['module'], entry['contract']

            target = None
            if contract not in non_registrable:
                target = get_function_func(module, name)
                if target is None:
                    logger.warning(f"Could not find function {name} in {module}")

            # Either the contract is not registrable or the lookup came up empty.
            if target is None:
                skipped += 1
                continue

            register_function_func(target, name, memory_type)
            registered += 1

        except Exception as err:
            logger.error(f"Failed to register {qualified_name} from cache: {err}")
            skipped += 1

    logger.info(f"Registered {registered} {library_name} functions from cache")
    logger.info(f"Skipped {skipped} functions (unknown/dim_change contracts or errors)")

    return registered, skipped

211 

212 

def should_use_cache_for_library(library_name: str) -> bool:
    """
    Report whether cached metadata should be used for a library.

    As written, the answer is always True; the OPENHCS_SUBPROCESS_MODE
    environment variable only selects which informational message is logged.

    Args:
        library_name: Name of the library

    Returns:
        True if cache should be used, False if full discovery should run
    """
    import os

    if os.environ.get('OPENHCS_SUBPROCESS_MODE'):
        # Subprocess mode always relies on the cache
        message = f"SUBPROCESS: Using cached metadata for {library_name} function registration"
    else:
        # The cache is also used to speed up TUI startup
        message = f"Checking for cached metadata to speed up {library_name} startup..."

    logger.info(message)
    return True

233 

234 

def get_cache_status(library_name: str) -> Dict[str, Any]:
    """
    Collect diagnostic information about a library's cache file.

    The result always contains the keys 'library', 'cache_file', 'exists',
    'size', 'modified', 'function_count', 'library_version' and
    'cache_age_days'. Fields that could not be determined stay None; read or
    parse errors are logged at debug level and not raised.

    Args:
        library_name: Name of the library

    Returns:
        Dictionary with cache status information
    """
    cache_file = get_library_cache_path(library_name)

    report = {
        'library': library_name,
        'cache_file': str(cache_file),
        'exists': cache_file.exists(),
        'size': None,
        'modified': None,
        'function_count': None,
        'library_version': None,
        'cache_age_days': None
    }

    # Nothing more to inspect when there is no cache file.
    if not report['exists']:
        return report

    try:
        file_stat = cache_file.stat()
        report['size'] = file_stat.st_size
        report['modified'] = file_stat.st_mtime

        # Try to read cache data
        with open(cache_file, 'r') as handle:
            payload = json.load(handle)

        if 'functions' in payload:
            report['function_count'] = len(payload['functions'])
            report['library_version'] = payload.get('library_version')

            # A missing or zero timestamp leaves the age unset.
            written_at = payload.get('timestamp', 0)
            if written_at:
                report['cache_age_days'] = (time.time() - written_at) / (24 * 3600)

    except Exception as err:
        logger.debug(f"Could not read cache status for {library_name}: {err}")

    return report

280 

281 

def run_cached_registration(library_name: str, register_from_cache_fn) -> bool:
    """
    Attempt the cache-based registration fast path for a library.

    ``register_from_cache_fn`` is called with no arguments when
    ``should_use_cache_for_library`` approves; its result is coerced to bool.
    Any exception is logged as a warning and treated as "cache not used".

    Returns True if registration was handled via cache (and caller should stop),
    otherwise False to indicate the caller should proceed with full discovery.
    """
    try:
        if not should_use_cache_for_library(library_name):
            return False
        return bool(register_from_cache_fn())
    except Exception as err:
        logger.warning(f"{library_name}: cache fast path failed with error; falling back to discovery: {err}")
        return False

296