Coverage for openhcs/processing/backends/lib_registry/openhcs_registry.py: 66.5%

159 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2OpenHCS native function registry. 

3 

4This registry processes OpenHCS functions that have been decorated with 

5explicit contract declarations, allowing them to skip runtime testing 

6while producing the same FunctionMetadata format as external libraries. 

7""" 

8 

9import logging 

10import numpy as np 

11from typing import Dict, List, Tuple, Any 

12import importlib 

13 

14from openhcs.processing.backends.lib_registry.unified_registry import LibraryRegistryBase, FunctionMetadata 

15 

16logger = logging.getLogger(__name__) 

17 

18 

class OpenHCSRegistry(LibraryRegistryBase):
    """
    Registry for OpenHCS native functions with explicit contract support.

    This registry processes OpenHCS functions that have been decorated with
    explicit contract declarations, allowing them to skip runtime testing
    while producing the same FunctionMetadata format as external libraries.
    """

    # Registry name for auto-registration
    _registry_name = 'openhcs'

    # Required abstract class attributes
    # Placeholder only: __init__ rebinds MODULES_TO_SCAN on the instance with
    # the module names returned by _get_openhcs_modules().
    MODULES_TO_SCAN = []
    # No single memory type for the registry as a whole; OpenHCS functions
    # carry their own memory type attributes.
    MEMORY_TYPE = None
    FLOAT_DTYPE = np.float32

35 

def __init__(self):
    """Initialise the registry as "openhcs" and resolve its module list.

    The base-class initialiser runs first; afterwards ``MODULES_TO_SCAN`` is
    rebound on the instance to whatever ``_get_openhcs_modules()`` returns.
    """
    super().__init__("openhcs")

    # Shadow the class-level placeholder with the discovered module names.
    discovered_modules = self._get_openhcs_modules()
    self.MODULES_TO_SCAN = discovered_modules

40 

def _get_openhcs_modules(self) -> List[str]:
    """Collect the dotted names of importable modules under the backends package.

    The backends directory is located relative to this file and walked
    recursively with ``pkgutil.walk_packages`` using the
    ``"openhcs.processing.backends."`` prefix. Each candidate is imported
    once as a validity check; candidates that raise ``ImportError`` are
    logged at debug level and left out.

    Returns:
        Module names, in walk order, that imported successfully.
    """
    import os
    import pkgutil

    # This file sits in the lib_registry directory; its parent is backends.
    registry_dir = os.path.dirname(__file__)
    backends_dir = os.path.dirname(registry_dir)

    # Name fragments that disqualify a candidate: lib_registry modules are
    # skipped to avoid circular imports, __pycache__ is not a real module.
    excluded_fragments = ("lib_registry", "__pycache__")

    importable = []
    candidates = pkgutil.walk_packages(
        [backends_dir],
        "openhcs.processing.backends."
    )
    for _finder, module_name, _is_package in candidates:
        if any(fragment in module_name for fragment in excluded_fragments):
            continue

        try:
            # Import purely to verify that the module is usable.
            importlib.import_module(module_name)
        except ImportError as e:
            logger.debug(f"Skipping module {module_name}: {e}")
        else:
            importable.append(module_name)

    return importable

75 

def get_modules_to_scan(self) -> List[Tuple[str, Any]]:
    """Import every name in ``MODULES_TO_SCAN`` and pair it with its module.

    Returns:
        ``(module_name, module)`` tuples in the order of ``MODULES_TO_SCAN``.
        Names that raise ``ImportError`` are logged as warnings and omitted.
    """
    loaded = []
    for module_name in self.MODULES_TO_SCAN:
        try:
            imported = importlib.import_module(module_name)
        except ImportError as e:
            logger.warning(f"Could not import OpenHCS module {module_name}: {e}")
        else:
            loaded.append((module_name, imported))
    return loaded

86 

87 

88 

89 # ===== ESSENTIAL ABC METHODS ===== 

def get_library_version(self) -> str:
    """Return the OpenHCS version string, or ``'unknown'`` if it cannot be read.

    The version is taken from ``openhcs.__version__``. A missing attribute,
    or any ordinary exception raised while importing the package, yields
    ``'unknown'`` rather than an error.

    Returns:
        The value of ``openhcs.__version__`` or ``'unknown'``.
    """
    try:
        import openhcs
        return getattr(openhcs, '__version__', 'unknown')
    except Exception:
        # Best-effort lookup. Catch Exception rather than using a bare
        # ``except`` so KeyboardInterrupt and SystemExit still propagate.
        return 'unknown'

97 

def is_library_available(self) -> bool:
    """Report whether OpenHCS can be used; unconditionally ``True``."""
    return True

101 

def get_library_object(self):
    """Import and return the ``openhcs.processing`` package object."""
    import openhcs.processing as processing_pkg
    return processing_pkg

106 

def get_memory_type(self) -> str:
    """Return this registry's ``MEMORY_TYPE`` attribute unchanged.

    NOTE(review): the class declares ``MEMORY_TYPE = None``, so despite the
    ``str`` annotation callers may receive ``None`` - confirm this is intended.
    """
    memory_type = self.MEMORY_TYPE
    return memory_type

110 

def get_display_name(self) -> str:
    """Return the fixed display label for this registry, ``"OpenHCS"``."""
    return "OpenHCS"

114 

def get_module_patterns(self) -> List[str]:
    """Return the module patterns for OpenHCS: a fresh one-element list."""
    patterns = ["openhcs"]
    return patterns

118 

119 

120 

def discover_functions(self) -> Dict[str, FunctionMetadata]:
    """Scan the configured modules and register every eligible OpenHCS function.

    A function is eligible when it carries both ``input_memory_type`` and
    ``output_memory_type`` attributes, both values are recognised memory
    types, and ``_is_function_backend_available`` accepts it. Each eligible
    function is stamped with ``__processing_contract__`` set to
    ``ProcessingContract.FLEXIBLE``, passed through
    ``apply_contract_wrapper``, and recorded under a module-derived name.

    Returns:
        Mapping from the generated unique name to its ``FunctionMetadata``.
    """
    from openhcs.processing.backends.lib_registry.unified_registry import ProcessingContract
    import inspect

    # Loop invariant: the memory types accepted on either side of a function.
    known_memory_types = {'numpy', 'cupy', 'torch', 'tensorflow', 'jax', 'pyclesperanto'}

    functions = {}
    modules = self.get_modules_to_scan()

    logger.info(f"🔍 OpenHCS Registry: Scanning {len(modules)} modules for functions with memory type decorators")

    for module_name, module in modules:
        module_function_count = 0

        for name, func in inspect.getmembers(module, inspect.isfunction):
            # Only functions carrying both memory type attributes
            # (added by @numpy, @cupy, etc.) are candidates.
            if not (hasattr(func, 'input_memory_type') and hasattr(func, 'output_memory_type')):
                continue

            input_type = func.input_memory_type
            output_type = func.output_memory_type

            if not (input_type in known_memory_types and output_type in known_memory_types):
                logger.debug(f"Skipping {name} - invalid memory types: {input_type} -> {output_type}")
                continue

            if not self._is_function_backend_available(func):
                logger.debug(f"Skipping {name} - backend not available")
                continue

            # Default contract for OpenHCS functions; the attribute is set on
            # the original function so other parts of the system can find it.
            contract = ProcessingContract.FLEXIBLE
            func.__processing_contract__ = contract

            wrapped = self.apply_contract_wrapper(func, contract)
            registered_name = self._generate_function_name(name, module_name)
            description = self._extract_function_docstring(func)

            functions[registered_name] = FunctionMetadata(
                name=registered_name,
                func=wrapped,
                contract=contract,
                registry=self,
                module=func.__module__ or "",
                doc=description,
                tags=["openhcs"],
                original_name=name
            )
            module_function_count += 1

        logger.debug(f" 📦 {module_name}: Found {module_function_count} OpenHCS functions")

    logger.info(f"✅ OpenHCS Registry: Discovered {len(functions)} total functions")
    return functions

185 

186 

187 

188 def _generate_function_name(self, original_name: str, module_name: str) -> str: 

189 """Generate unique function name for OpenHCS functions.""" 

190 # Extract meaningful part from module name 

191 if isinstance(module_name, str): 191 ↛ 203line 191 didn't jump to line 203 because the condition on line 191 was always true

192 module_parts = module_name.split('.') 

193 # Find meaningful part after 'backends' 

194 try: 

195 backends_idx = module_parts.index('backends') 

196 meaningful_parts = module_parts[backends_idx+1:] 

197 if meaningful_parts: 197 ↛ 203line 197 didn't jump to line 203 because the condition on line 197 was always true

198 prefix = '_'.join(meaningful_parts) 

199 return f"{prefix}_{original_name}" 

200 except ValueError: 

201 pass 

202 

203 return original_name 

204 

205 def _generate_tags(self, module_name: str) -> List[str]: 

206 """Generate tags for OpenHCS functions.""" 

207 tags = ['openhcs'] 

208 

209 # Add module-specific tags 

210 if isinstance(module_name, str): 

211 module_parts = module_name.split('.') 

212 if 'analysis' in module_parts: 

213 tags.append('analysis') 

214 if 'preprocessing' in module_parts: 

215 tags.append('preprocessing') 

216 if 'segmentation' in module_parts: 

217 tags.append('segmentation') 

218 

219 return tags 

220 

221 def _is_function_backend_available(self, func) -> bool: 

222 """ 

223 Check if the function's backend is available. 

224 

225 For OpenHCS functions with mixed backends, we need to check each function 

226 individually based on its memory type attributes. 

227 

228 Args: 

229 func: Function to check 

230 

231 Returns: 

232 True if the function's backend is available, False otherwise 

233 """ 

234 # Get the function's memory type 

235 memory_type = None 

236 if hasattr(func, 'input_memory_type'): 236 ↛ 238line 236 didn't jump to line 238 because the condition on line 236 was always true

237 memory_type = func.input_memory_type 

238 elif hasattr(func, 'output_memory_type'): 

239 memory_type = func.output_memory_type 

240 elif hasattr(func, 'backend'): 

241 memory_type = func.backend 

242 

243 if not memory_type: 243 ↛ 245line 243 didn't jump to line 245 because the condition on line 243 was never true

244 # If no memory type specified, assume numpy (always available) 

245 return True 

246 

247 # Check backend availability based on memory type 

248 return self._check_backend_availability(memory_type) 

249 

250 def _check_backend_availability(self, memory_type: str) -> bool: 

251 """ 

252 Check if a specific backend/memory type is available using the registry system. 

253 

254 This uses the existing registry system as the source of truth for backend availability, 

255 avoiding hardcoded checks and ensuring consistency across the system. 

256 

257 Args: 

258 memory_type: Memory type to check (e.g., "cupy", "torch", "numpy", "pyclesperanto") 

259 

260 Returns: 

261 True if backend is available, False otherwise 

262 """ 

263 # Import registry service to get registry instances 

264 from openhcs.processing.backends.lib_registry.registry_service import RegistryService 

265 

266 # Special case: numpy is always available (no dedicated registry) 

267 if memory_type == "numpy": 

268 return True 

269 

270 # Get all available registries 

271 try: 

272 registry_classes = RegistryService._discover_registries() 

273 

274 # Find the registry that matches this memory type 

275 for registry_class in registry_classes: 

276 try: 

277 registry_instance = registry_class() 

278 

279 # Check if this registry handles the memory type 

280 if hasattr(registry_instance, 'MEMORY_TYPE') and registry_instance.MEMORY_TYPE == memory_type: 

281 # Use the registry's own availability check as source of truth 

282 return registry_instance.is_library_available() 

283 

284 except Exception as e: 

285 logger.debug(f"Failed to check registry {registry_class.__name__}: {e}") 

286 continue 

287 

288 # If no registry found for this memory type, it's not available 

289 logger.debug(f"No registry found for memory type: {memory_type}") 

290 return False 

291 

292 except Exception as e: 

293 logger.warning(f"Failed to check backend availability for {memory_type}: {e}") 

294 return False 

295 

296 def _extract_function_docstring(self, func) -> str: 

297 """ 

298 Extract the full docstring from a function, with proper formatting. 

299 

300 Args: 

301 func: Function to extract docstring from 

302 

303 Returns: 

304 Formatted docstring or empty string if none 

305 """ 

306 if not func.__doc__: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true

307 return "" 

308 

309 # Get the full docstring 

310 docstring = func.__doc__.strip() 

311 

312 # For UI display, we want a concise but informative description 

313 # Take the first paragraph (up to first double newline) or first 200 chars 

314 lines = docstring.split('\n') 

315 

316 # Find the first non-empty line (summary) 

317 summary_lines = [] 

318 for line in lines: 318 ↛ 326line 318 didn't jump to line 326 because the loop on line 318 didn't complete

319 line = line.strip() 

320 if not line and summary_lines: 

321 # Empty line after content - end of summary 

322 break 

323 if line: 323 ↛ 318line 323 didn't jump to line 318 because the condition on line 323 was always true

324 summary_lines.append(line) 

325 

326 if summary_lines: 326 ↛ 333line 326 didn't jump to line 333 because the condition on line 326 was always true

327 summary = ' '.join(summary_lines) 

328 # Limit length for UI display 

329 if len(summary) > 200: 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 summary = summary[:197] + "..." 

331 return summary 

332 

333 return ""