Coverage for openhcs/processing/backends/analysis/cache_utils.py: 9.7%
126 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
"""
Unified caching utilities for external library function registries.

Provides common caching patterns extracted from scikit-image registry
for use by pyclesperanto, CuPy, and other external library registries.
"""
8import json
9import logging
10import time
11from pathlib import Path
12from typing import Dict, Optional, Any, Callable
14from openhcs.core.xdg_paths import get_cache_file_path
16logger = logging.getLogger(__name__)
def get_library_cache_path(library_name: str) -> Path:
    """
    Resolve the on-disk location of a library's function-metadata cache.

    Args:
        library_name: Name of the library (e.g., 'pyclesperanto', 'cupy')

    Returns:
        Whatever ``get_cache_file_path`` returns for the file name
        ``<library_name>_function_metadata.json``
    """
    return get_cache_file_path("{}_function_metadata.json".format(library_name))
def save_library_metadata(
    library_name: str,
    registry: Dict[str, Any],
    get_version_func: Callable[[], str],
    extract_cache_data_func: Callable[[Any], Dict[str, Any]]
) -> None:
    """
    Save library function metadata to cache.

    The cache is a JSON document with the keys ``cache_version``,
    ``library_version``, ``timestamp`` (seconds since the epoch) and
    ``functions`` (one entry per registry item).

    Saving is best-effort: every failure is logged as a warning and never
    propagated to the caller.

    Args:
        library_name: Name of the library
        registry: Registry dictionary mapping function names to metadata objects
        get_version_func: Function that returns the library version string;
            if it raises, the version is recorded as "unknown"
        extract_cache_data_func: Function that extracts cacheable data from
            a metadata object; entries for which it raises are skipped
    """
    cache_path = get_library_cache_path(library_name)

    # Get library version
    try:
        library_version = get_version_func()
    except Exception:
        library_version = "unknown"

    # Build cache data structure
    cache_data = {
        'cache_version': '1.0',
        'library_version': library_version,
        'timestamp': time.time(),
        'functions': {}
    }

    # Extract function metadata; one bad entry must not abort the whole save
    for full_name, func_meta in registry.items():
        try:
            cache_data['functions'][full_name] = extract_cache_data_func(func_meta)
        except Exception as e:
            logger.warning(f"Failed to extract cache data for {full_name}: {e}")

    # Save to disk
    try:
        # Serialize BEFORE opening the file: opening with 'w' truncates it, so
        # a serialization error (e.g. a non-JSON-serializable value) raised
        # while streaming would otherwise destroy a previously valid cache.
        payload = json.dumps(cache_data, indent=2)
        with open(cache_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        logger.info(f"Saved {library_name} metadata cache: {len(cache_data['functions'])} functions")
    except Exception as e:
        logger.warning(f"Failed to save {library_name} metadata cache: {e}")
def load_library_metadata(
    library_name: str,
    get_version_func: Callable[[], str],
    max_age_days: int = 7
) -> Optional[Dict[str, Dict[str, Any]]]:
    """
    Load library function metadata from cache with validation.

    The cache is rejected (``None`` is returned) when any of these hold:
    the file does not exist, it lacks a ``functions`` key (old format),
    the recorded library version differs from the current one, the cache
    is older than ``max_age_days``, the ``functions`` entry is not a
    dictionary, or reading/parsing the file raises.

    Args:
        library_name: Name of the library
        get_version_func: Function that returns the current library version;
            if it raises, the current version is treated as "unknown"
        max_age_days: Maximum age in days before cache is considered stale

    Returns:
        Dictionary of cached function metadata, or None if cache invalid
    """
    cache_path = get_library_cache_path(library_name)

    if not cache_path.exists():
        logger.debug(f"No {library_name} cache found at {cache_path}")
        return None

    try:
        with open(cache_path, 'r', encoding='utf-8') as f:
            cache_data = json.load(f)

        # Handle old cache format (direct metadata dict)
        if 'functions' not in cache_data:
            logger.info(f"Found old {library_name} cache format - will rebuild")
            return None

        # Validate library version
        try:
            current_version = get_version_func()
        except Exception:
            current_version = "unknown"

        cached_version = cache_data.get('library_version', 'unknown')
        if cached_version != current_version:
            logger.info(f"{library_name} version changed ({cached_version} → {current_version}) - will rebuild cache")
            return None

        # Check cache age; a missing timestamp defaults to 0 and therefore
        # counts as stale
        cache_timestamp = cache_data.get('timestamp', 0)
        cache_age_days = (time.time() - cache_timestamp) / (24 * 3600)
        if cache_age_days > max_age_days:
            logger.info(f"{library_name} cache is {cache_age_days:.1f} days old - will rebuild")
            return None

        functions = cache_data['functions']
        # A corrupt cache may hold a non-dict here; returning it would break
        # the declared return type and fail later when callers iterate items.
        if not isinstance(functions, dict):
            logger.warning(
                f"Failed to load {library_name} metadata cache: "
                f"'functions' is {type(functions).__name__}, expected dict"
            )
            return None

        logger.info(f"Loaded valid {library_name} metadata cache: {len(functions)} functions")
        return functions

    except Exception as e:
        # Unreadable file, malformed JSON, or unexpected structure
        logger.warning(f"Failed to load {library_name} metadata cache: {e}")
        return None
def clear_library_cache(library_name: str) -> None:
    """
    Delete a library's metadata cache file, if there is one.

    Failures while checking for or removing the file are logged as
    warnings and not raised.

    Args:
        library_name: Name of the library
    """
    target = get_library_cache_path(library_name)
    try:
        if not target.exists():
            logger.info(f"No {library_name} metadata cache to clear")
            return
        target.unlink()
        logger.info(f"{library_name} metadata cache cleared")
    except Exception as err:
        logger.warning(f"Failed to clear {library_name} metadata cache: {err}")
def register_functions_from_cache(
    library_name: str,
    cached_metadata: Dict[str, Dict[str, Any]],
    get_function_func: Callable[[str, str], Any],
    register_function_func: Callable[[Any, str, str], None],
    memory_type: str
) -> tuple[int, int]:
    """
    Walk the cached metadata and register every usable function.

    Each cache entry must provide the keys 'name', 'module' and 'contract'.
    An entry is counted as skipped when its contract is 'unknown' or
    'dim_change', when ``get_function_func`` returns None for it, or when
    anything raises while it is being processed.

    Args:
        library_name: Name of the library
        cached_metadata: Dictionary of cached function metadata
        get_function_func: Callable (module_path, func_name) -> function object
        register_function_func: Callable (func, func_name, memory_type) -> None
        memory_type: Memory type for registration

    Returns:
        Tuple of (decorated_count, skipped_count)
    """
    logger.info(f"Registering {library_name} functions from metadata cache")

    registered = 0
    skipped = 0
    excluded_contracts = ('unknown', 'dim_change')

    for full_name, entry in cached_metadata.items():
        try:
            func_name = entry['name']
            module_path = entry['module']
            contract = entry['contract']

            if contract in excluded_contracts:
                # Unknown or dimension-changing contracts are never registered
                skipped += 1
            else:
                resolved = get_function_func(module_path, func_name)
                if resolved is None:
                    logger.warning(f"Could not find function {func_name} in {module_path}")
                    skipped += 1
                else:
                    register_function_func(resolved, func_name, memory_type)
                    registered += 1

        except Exception as err:
            # Any failure for a single entry only costs that entry
            logger.error(f"Failed to register {full_name} from cache: {err}")
            skipped += 1

    logger.info(f"Registered {registered} {library_name} functions from cache")
    logger.info(f"Skipped {skipped} functions (unknown/dim_change contracts or errors)")

    return registered, skipped
def should_use_cache_for_library(library_name: str) -> bool:
    """
    Report whether the metadata cache should be consulted for a library.

    The answer is currently always True; the OPENHCS_SUBPROCESS_MODE
    environment variable only selects which message is logged.

    Args:
        library_name: Name of the library

    Returns:
        True if cache should be used, False if full discovery should run
    """
    import os

    if os.environ.get('OPENHCS_SUBPROCESS_MODE'):
        # Subprocess mode: always use the cache
        message = f"SUBPROCESS: Using cached metadata for {library_name} function registration"
    else:
        # Cache is used for TUI startup speedup as well
        message = f"Checking for cached metadata to speed up {library_name} startup..."

    logger.info(message)
    return True
def get_cache_status(library_name: str) -> Dict[str, Any]:
    """
    Collect diagnostic information about a library's cache file.

    Args:
        library_name: Name of the library

    Returns:
        Dictionary with the keys 'library', 'cache_file', 'exists', 'size',
        'modified', 'function_count', 'library_version' and 'cache_age_days'.
        Fields that could not be determined are left as None.
    """
    cache_path = get_library_cache_path(library_name)

    report: Dict[str, Any] = {
        'library': library_name,
        'cache_file': str(cache_path),
        'exists': cache_path.exists(),
    }
    # Remaining fields start out as None and are filled in on a best-effort basis
    report.update(dict.fromkeys(
        ('size', 'modified', 'function_count', 'library_version', 'cache_age_days')
    ))

    if not report['exists']:
        return report

    try:
        file_info = cache_path.stat()
        report['size'] = file_info.st_size
        report['modified'] = file_info.st_mtime

        # Try to read cache data
        with open(cache_path, 'r') as handle:
            payload = json.load(handle)

        if 'functions' in payload:
            report['function_count'] = len(payload['functions'])
            report['library_version'] = payload.get('library_version')

            written_at = payload.get('timestamp', 0)
            if written_at:
                report['cache_age_days'] = (time.time() - written_at) / (24 * 3600)

    except Exception as err:
        # Status reporting never raises; partial results are still returned
        logger.debug(f"Could not read cache status for {library_name}: {err}")

    return report
def run_cached_registration(library_name: str, register_from_cache_fn) -> bool:
    """
    Attempt the cache-based registration fast path for a library.

    ``register_from_cache_fn`` is called with no arguments when
    ``should_use_cache_for_library`` approves; its result is coerced to bool.
    Any exception is logged as a warning and reported as False.

    Returns True if registration was handled via cache (and caller should stop),
    otherwise False to indicate the caller should proceed with full discovery.
    """
    try:
        if should_use_cache_for_library(library_name):
            return bool(register_from_cache_fn())
    except Exception as err:
        logger.warning(f"{library_name}: cache fast path failed with error; falling back to discovery: {err}")
    return False