Coverage for openhcs/processing/func_registry.py: 43.5%

196 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Function registry for processing backends. 

3 

4This module provides a registry for functions that can be executed by different 

5processing backends (numpy, cupy, torch, etc.). It automatically scans the 

6processing directory to register functions with matching input and output 

7memory types. 

8 

9The function registry is a global singleton that is initialized during application 

10startup and shared across all components. 

11 

12Valid memory types: 

13- numpy 

14- cupy 

15- torch 

16- tensorflow 

17- jax 
- pyclesperanto

18 

19Thread Safety: 

20 All functions in this module are thread-safe and use a lock to ensure 

21 consistent access to the global registry. 

22""" 

23from __future__ import annotations 

24 

25import importlib 

26import inspect 

27import logging 

28import os 

29import pkgutil 

30import sys 

31import threading 

32from typing import Any, Callable, Dict, List, Optional, Set, Tuple 

33 

logger = logging.getLogger(__name__)

# Re-entrant lock guarding registry access.
# A re-entrant lock is used because initialize_registry() imports processing
# modules while holding the lock; if any of those modules calls back into
# register_function() or get_functions_by_memory_type() on the same thread,
# a plain (non-reentrant) Lock would deadlock.
_registry_lock = threading.RLock()

# Legacy import-hook bookkeeping (no import hook is installed by the code
# visible in this module; the names are kept for backward compatibility).
# ``__import__`` is resolved as an ordinary builtin name instead of
# subscripting ``__builtins__``: ``__builtins__`` is a CPython implementation
# detail that is a dict in imported modules but a module object in
# ``__main__``, where ``__builtins__['__import__']`` raises TypeError.
_original_import = __import__
_decoration_applied = set()
_import_hook_installed = False

# Global registry of functions by backend type
# Structure: {backend_name: [function1, function2, ...]}
FUNC_REGISTRY: Dict[str, List[Callable]] = {}

# Valid memory types
VALID_MEMORY_TYPES = {"numpy", "cupy", "torch", "tensorflow", "jax", "pyclesperanto"}

# CPU-only memory types (for CI/testing without GPU)
CPU_ONLY_MEMORY_TYPES = {"numpy"}

# CPU-only mode is enabled when OPENHCS_CPU_ONLY equals "true" (case-insensitive)
CPU_ONLY_MODE = os.getenv('OPENHCS_CPU_ONLY', 'false').lower() == 'true'

# Flag to track if the registry has been initialized
_registry_initialized = False

# Flag to track if we're currently in the initialization process (prevent recursion)
_registry_initializing = False

62 

63 

64# Import hook system removed - using existing comprehensive registries with clean decoration 

65 

66 

67# Import hook decoration functions removed - using existing registries 

68 

69 

def _auto_initialize_registry() -> None:
    """
    Auto-initialize the function registry on module import.

    Creates one empty bucket per active memory type (only the CPU-only types
    when CPU_ONLY_MODE is set), then runs the native scan (phase 1) and the
    external-library registration (phase 2). If either phase raises, the
    failure is logged with its traceback and the registry falls back to empty
    buckets; in both cases the registry is marked as initialized.

    The call is a no-op when the registry is already initialized or when an
    initialization is already in progress. The latter matters because
    register_function() and get_functions_by_memory_type() call this function
    whenever the registry is not yet marked initialized, which is the case
    while the scan is still importing modules.

    The original note states this follows the same pattern as
    storage_registry in openhcs.io.base.
    """
    global _registry_initialized, _registry_initializing

    if _registry_initialized or _registry_initializing:
        return

    # Use CPU-only memory types if CPU_ONLY_MODE is enabled
    memory_types_to_use = CPU_ONLY_MEMORY_TYPES if CPU_ONLY_MODE else VALID_MEMORY_TYPES

    _registry_initializing = True
    try:
        try:
            # Clear and initialize the registry with the active memory types
            FUNC_REGISTRY.clear()
            for memory_type in memory_types_to_use:
                FUNC_REGISTRY[memory_type] = []

            if CPU_ONLY_MODE:
                logger.info("CPU-only mode enabled - only registering numpy functions")

            # Phase 1: Scan processing directory and register native OpenHCS functions
            _scan_and_register_functions()

            # Phase 2: Register external library functions
            _register_external_libraries()

            total_functions = sum(len(funcs) for funcs in FUNC_REGISTRY.values())
            logger.info(
                "Function registry auto-initialized with %d functions across %d backends",
                total_functions,
                # Count the buckets that actually exist; in CPU-only mode this
                # is smaller than len(VALID_MEMORY_TYPES).
                len(FUNC_REGISTRY),
            )

            # Mark registry as initialized
            _registry_initialized = True

        except Exception as e:
            # logger.exception keeps the traceback, which the bare message lost.
            logger.exception("Failed to auto-initialize function registry: %s", e)
            # Initialize empty registry as fallback
            FUNC_REGISTRY.clear()
            for memory_type in memory_types_to_use:
                FUNC_REGISTRY[memory_type] = []
            _registry_initialized = True
    finally:
        _registry_initializing = False

118 

119 

def initialize_registry() -> None:
    """
    Initialize the function registry and scan for functions to register.

    Calling this is optional: the module auto-initializes the registry on
    import unless OPENHCS_SUBPROCESS_MODE is set. If the registry is already
    marked as initialized, this function logs a message and returns without
    rescanning; it does not raise and it does not force a re-initialization.

    Thread-safe: Uses a lock to ensure consistent access to the global registry.

    Note:
        Exceptions raised by the scan or by external-library registration
        propagate to the caller, and the registry is then left unmarked
        (not initialized).
    """
    global _registry_initialized

    with _registry_lock:
        # Check if registry is already initialized
        if _registry_initialized:
            logger.info("Function registry already initialized, skipping manual initialization")
            return

        # Clear and initialize the registry with the active memory types
        FUNC_REGISTRY.clear()
        memory_types_to_use = CPU_ONLY_MEMORY_TYPES if CPU_ONLY_MODE else VALID_MEMORY_TYPES
        for memory_type in memory_types_to_use:
            FUNC_REGISTRY[memory_type] = []

        # Phase 1: Scan processing directory and register native OpenHCS functions
        _scan_and_register_functions()

        # Phase 2: Register external library functions
        _register_external_libraries()

        logger.info(
            "Function registry initialized with %d functions across %d backends",
            sum(len(funcs) for funcs in FUNC_REGISTRY.values()),
            # Report the buckets that exist, not every valid type: in CPU-only
            # mode only the CPU memory types are present.
            len(FUNC_REGISTRY),
        )

        # Mark registry as initialized
        _registry_initialized = True

160 

161 

def load_prebuilt_registry(registry_data: Dict) -> None:
    """
    Replace the registry contents with a pre-built registry.

    Intended for subprocess workers that want to skip function discovery and
    reuse a registry built in the main process.

    Args:
        registry_data: Dictionary containing the pre-built registry; its
            entries are copied into FUNC_REGISTRY after the registry has
            been emptied.
    """
    global _registry_initialized

    with _registry_lock:
        # Swap the contents in place so existing references to FUNC_REGISTRY
        # keep pointing at the live registry.
        FUNC_REGISTRY.clear()
        FUNC_REGISTRY.update(registry_data)
        _registry_initialized = True

        # Tally the functions across every backend bucket for the log line.
        total_functions = 0
        for backend_functions in FUNC_REGISTRY.values():
            total_functions += len(backend_functions)
        logger.info(f"Loaded pre-built registry with {total_functions} functions")

181 

182 

def _scan_and_register_functions() -> None:
    """
    Scan the processing directory for native OpenHCS functions.

    Walks every module under the ``openhcs.processing`` package, imports it,
    and registers each function whose ``input_memory_type`` and
    ``output_memory_type`` attributes are equal, valid, and belong to a
    backend that currently has a bucket in FUNC_REGISTRY.

    Functions for a valid but inactive backend (for example a non-numpy
    backend in CPU-only mode) are skipped instead of raising KeyError, so
    they no longer abort registration of the remaining functions in the same
    module.

    This is Phase 1 of initialization - only native OpenHCS functions.
    External library functions are registered in Phase 2.
    """
    from openhcs import processing

    processing_path = os.path.dirname(processing.__file__)
    processing_package = "openhcs.processing"

    logger.info("Phase 1: Scanning for native OpenHCS functions in %s", processing_path)

    def _on_package_error(package_name: str) -> None:
        # walk_packages() imports each sub-package itself in order to descend
        # into it. Without an onerror callback, any non-ImportError raised by
        # that import propagates and terminates the whole walk. The loop body
        # below already warns about modules that fail to import, so this only
        # needs a debug-level record.
        logger.debug(
            "Package %s could not be imported while walking: %s",
            package_name, sys.exc_info()[1]
        )

    # Walk through all modules in the processing package
    for _, module_name, is_pkg in pkgutil.walk_packages(
        [processing_path], f"{processing_package}.", onerror=_on_package_error
    ):
        try:
            # Import the module
            logger.debug(f"Scanning module: {module_name}")
            module = importlib.import_module(module_name)

            # Skip packages (their modules are yielded separately by the walk)
            if is_pkg:
                logger.debug(f"Skipping package: {module_name}")
                continue

            # Find all functions in the module
            function_count = 0
            for name, obj in inspect.getmembers(module, inspect.isfunction):
                # Only functions carrying both memory type attributes qualify
                if not (hasattr(obj, "input_memory_type") and hasattr(obj, "output_memory_type")):
                    continue

                input_type = obj.input_memory_type
                output_type = obj.output_memory_type

                # Register only if input and output types match and are valid
                if input_type != output_type or input_type not in VALID_MEMORY_TYPES:
                    continue

                # Valid type without a bucket: the backend is not active
                if input_type not in FUNC_REGISTRY:
                    logger.debug(
                        "Skipping %s.%s: backend '%s' is not active",
                        module_name, name, input_type
                    )
                    continue

                _register_function(obj, input_type)
                function_count += 1

            logger.debug(f"Module {module_name}: found {function_count} registerable functions")
        except Exception as e:
            logger.warning("Error importing module %s: %s", module_name, e)

229 

230 

def _apply_unified_decoration(original_func, func_name, memory_type, create_wrapper=True):
    """
    Decorate an external library function for use with the registry.

    The steps, in order:
    1. Tag ``original_func`` directly with ``input_memory_type`` and
       ``output_memory_type`` set to ``memory_type.value``.
    2. Unless ``create_wrapper`` is false, wrap the function with the memory
       type decorator from ``openhcs.core.memory.decorators`` that matches
       ``memory_type``. (The original notes state that dtype preservation is
       handled by those decorators.)
    3. If the function's defining module is loaded and exposes an attribute
       named ``func_name``, rebind that attribute to the wrapper.

    Args:
        original_func: The original external library function
        func_name: Attribute name to look up and replace in the defining module
        memory_type: MemoryType enum member (NUMPY, CUPY, TORCH, TENSORFLOW,
            JAX or PYCLESPERANTO)
        create_wrapper: Whether to apply the memory type decorator (default: True)

    Returns:
        The wrapped function when ``create_wrapper`` is true, otherwise the
        tagged original function.
    """
    from openhcs.constants import MemoryType
    import sys

    memory_attrs = ("input_memory_type", "output_memory_type")

    # Step 1: tag the original function itself
    for attr in memory_attrs:
        setattr(original_func, attr, memory_type.value)

    if not create_wrapper:
        return original_func

    # Step 2: pick the decorator that corresponds to the memory type
    from openhcs.core.memory.decorators import numpy, cupy, torch, tensorflow, jax, pyclesperanto

    decorators_in_order = (
        ("NUMPY", numpy),
        ("CUPY", cupy),
        ("TORCH", torch),
        ("TENSORFLOW", tensorflow),
        ("JAX", jax),
        ("PYCLESPERANTO", pyclesperanto),
    )
    for member_name, decorate in decorators_in_order:
        # Enum members are resolved lazily, one comparison at a time
        if memory_type == getattr(MemoryType, member_name):
            wrapper_func = decorate(original_func)
            break
    else:
        # No decorator matched: fall back to the tagged original
        wrapper_func = original_func
        for attr in memory_attrs:
            setattr(wrapper_func, attr, memory_type.value)

    # Step 3: rebind the name in the defining module, if it is loaded
    module_name = original_func.__module__
    if module_name not in sys.modules:
        return wrapper_func

    target_module = sys.modules[module_name]
    if hasattr(target_module, func_name):
        setattr(target_module, func_name, wrapper_func)
        logger.debug(f"Replaced {module_name}.{func_name} with enhanced function")

    return wrapper_func

292 

293 

def _register_external_libraries() -> None:
    """
    Phase 2: register functions from the external library registries.

    Each registry (pyclesperanto, scikit-image, CuPy) is imported,
    instantiated and asked to register its functions directly. The three
    attempts are independent: an ImportError is logged as a warning, any
    other exception is logged as an error, and processing continues with the
    next registry either way.

    Kept separate from the core scan to avoid circular dependencies;
    registration is direct and is not meant to trigger re-initialization.
    """
    logger.info("Phase 2: Registering external library functions using unified registries...")

    # pyclesperanto
    try:
        from openhcs.processing.backends.lib_registry.pyclesperanto_registry import (
            PyclesperantoRegistry as registry_cls,
        )
        registry_cls().register_functions_direct()
        logger.info("Successfully registered pyclesperanto functions")
    except ImportError as e:
        logger.warning(f"Could not register pyclesperanto functions: {e}")
    except Exception as e:
        logger.error(f"Error registering pyclesperanto functions: {e}")

    # scikit-image
    try:
        from openhcs.processing.backends.lib_registry.scikit_image_registry import (
            SkimageRegistry as registry_cls,
        )
        registry_cls().register_functions_direct()
        logger.info("Successfully registered scikit-image functions")
    except ImportError as e:
        logger.warning(f"Could not register scikit-image functions: {e}")
    except Exception as e:
        logger.error(f"Error registering scikit-image functions: {e}")

    # CuPy
    try:
        from openhcs.processing.backends.lib_registry.cupy_registry import (
            CupyRegistry as registry_cls,
        )
        registry_cls().register_functions_direct()
        logger.info("Successfully registered CuPy ndimage functions")
    except ImportError as e:
        logger.warning(f"Could not register CuPy functions: {e}")
    except Exception as e:
        logger.error(f"Error registering CuPy functions: {e}")

332 

333 

def register_function(func: Callable, backend: Optional[str] = None, **kwargs) -> None:
    """
    Manually register a function with the function registry.

    This is the public API for registering functions that are not auto-discovered
    by the module scanner (e.g., dynamically decorated functions).

    In CPU-only mode, a function whose target backend is valid but has no
    bucket in the registry is skipped with a debug message rather than
    raising KeyError.

    Args:
        func: The function to register (must have input_memory_type and output_memory_type attributes)
        backend: Optional backend name (defaults to func.input_memory_type)
        **kwargs: Additional metadata (ignored for compatibility)

    Raises:
        ValueError: If function doesn't have required memory type attributes
        ValueError: If memory types are invalid
    """
    with _registry_lock:
        # Ensure registry is initialized
        if not _registry_initialized:
            _auto_initialize_registry()

        # Not every callable has __name__ (e.g. functools.partial objects);
        # fall back to repr so the documented ValueError is raised instead of
        # an AttributeError while formatting the message.
        try:
            func_name = func.__name__
        except AttributeError:
            func_name = repr(func)

        # Validate function has required attributes
        if not hasattr(func, "input_memory_type") or not hasattr(func, "output_memory_type"):
            raise ValueError(
                f"Function '{func_name}' must have input_memory_type and output_memory_type attributes"
            )

        input_type = func.input_memory_type
        output_type = func.output_memory_type

        # Validate memory types
        if input_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid input memory type: {input_type}")
        if output_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid output memory type: {output_type}")

        # Use input_memory_type as backend if not specified
        memory_type = backend or input_type
        if memory_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid backend memory type: {memory_type}")

        # CPU-only mode creates buckets only for the CPU memory types, so a
        # valid GPU backend has nowhere to be stored; skip it quietly.
        if CPU_ONLY_MODE and memory_type not in FUNC_REGISTRY:
            logger.debug(
                "CPU-only mode: skipping registration of '%s' for inactive backend '%s'",
                func_name, memory_type
            )
            return

        # Register the function
        _register_function(func, memory_type)

377 

378 

def _register_function(func: Callable, memory_type: str) -> None:
    """
    Add a function to the bucket of one memory type.

    Internal helper shared by the automatic scan and manual registration.
    A function already present in the bucket is left untouched; otherwise it
    is appended and tagged with a ``backend`` attribute.

    Args:
        func: The function to register
        memory_type: The memory type (e.g., "numpy", "cupy", "torch"); must
            already be a key of FUNC_REGISTRY
    """
    bucket = FUNC_REGISTRY[memory_type]

    if func not in bucket:
        bucket.append(func)

        # Record the backend on the function for easier inspection
        func.backend = memory_type

        logger.debug(
            "Registered function '%s' for memory type '%s'",
            func.__name__, memory_type
        )
        return

    # Duplicate registration: nothing to add
    logger.debug(
        "Function '%s' already registered for memory type '%s'",
        func.__name__, memory_type
    )

407 

408 

def get_functions_by_memory_type(memory_type: str) -> List[Callable]:
    """
    Get all functions registered for a specific memory type.

    Thread-safe: Uses a lock to ensure consistent access to the global registry.

    If the registry has not been initialized yet, it is auto-initialized
    first (a warning is logged); no error is raised for that case.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        A new list with the functions registered for the specified memory
        type. The list is empty when the memory type is valid but has no
        bucket in the registry (for example a non-numpy type in CPU-only mode).

    Raises:
        ValueError: If the memory type is not valid
    """
    with _registry_lock:
        # Check if registry is initialized (should be auto-initialized on import)
        if not _registry_initialized:
            logger.warning("Function registry not initialized, auto-initializing now")
            _auto_initialize_registry()

        # Check if memory type is valid
        if memory_type not in VALID_MEMORY_TYPES:
            raise ValueError(
                f"Invalid memory type: {memory_type}. "
                f"Valid types are: {', '.join(sorted(VALID_MEMORY_TYPES))}"
            )

        # Return a copy so callers cannot modify the registry. A valid type
        # may have no bucket (CPU-only mode), in which case nothing is
        # registered for it and the result is simply empty.
        return list(FUNC_REGISTRY.get(memory_type, ()))

440 

441 

def get_function_info(func: Callable) -> Dict[str, Any]:
    """
    Get information about a registered function.

    Works for any callable carrying the memory type attributes, including
    callables that have no ``__name__`` (their ``repr`` is used as the name).

    Args:
        func: The function to get information about

    Returns:
        A dictionary with the keys ``name``, ``input_memory_type``,
        ``output_memory_type``, ``backend`` (falls back to the input memory
        type when the function has no ``backend`` attribute), ``doc`` and
        ``module``.

    Raises:
        ValueError: If the function does not have memory type attributes
    """
    # Callables such as functools.partial objects or callable instances have
    # no __name__; without this fallback the lookup raised AttributeError
    # before the documented ValueError could be produced.
    try:
        func_name = func.__name__
    except AttributeError:
        func_name = repr(func)

    if not hasattr(func, "input_memory_type") or not hasattr(func, "output_memory_type"):
        raise ValueError(
            f"Function '{func_name}' does not have memory type attributes"
        )

    return {
        "name": func_name,
        "input_memory_type": func.input_memory_type,
        "output_memory_type": func.output_memory_type,
        "backend": getattr(func, "backend", func.input_memory_type),
        "doc": getattr(func, "__doc__", None),
        "module": getattr(func, "__module__", None),
    }

468 

469 

def is_registry_initialized() -> bool:
    """
    Report whether the function registry has been marked as initialized.

    Thread-safe: the initialization flag is read while holding the registry lock.

    Returns:
        True if the registry is initialized, False otherwise
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized

481 

482 

def get_valid_memory_types() -> Set[str]:
    """
    Return the valid memory type names.

    Returns:
        A new set holding the valid memory type names; modifying it does not
        affect VALID_MEMORY_TYPES.
    """
    return set(VALID_MEMORY_TYPES)

491 

492 

493# Import hook system removed - using existing comprehensive registries 

494 

495 

def get_function_by_name(function_name: str, memory_type: str) -> Optional[Callable]:
    """
    Look up a registered function by its name within one memory type.

    The candidates come from get_functions_by_memory_type(), so any error
    raised there (such as ValueError for an invalid memory type) propagates
    to the caller.

    Args:
        function_name: Name of the function to find (compared against ``__name__``)
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        The first registered function whose name matches, or None if there
        is no match
    """
    candidates = get_functions_by_memory_type(memory_type)
    return next(
        (candidate for candidate in candidates if candidate.__name__ == function_name),
        None,
    )

518 

519 

def get_all_function_names(memory_type: str) -> List[str]:
    """
    List the names of all functions registered for one memory type.

    The functions come from get_functions_by_memory_type(), so any error
    raised there (such as ValueError for an invalid memory type) propagates
    to the caller.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        A list of function names, in registration order
    """
    names: List[str] = []
    for registered in get_functions_by_memory_type(memory_type):
        names.append(registered.__name__)
    return names

536 

537 

# Auto-initialize the registry on module import (following storage_registry pattern).
# Initialization is skipped when the OPENHCS_SUBPROCESS_MODE environment
# variable is set to any non-empty value, so subprocess workers start faster;
# such workers can populate the registry with load_prebuilt_registry(), and
# register_function() / get_functions_by_memory_type() still auto-initialize
# on first use if the registry is not marked initialized.
# NOTE: `os` is already imported at the top of the module; this repeated
# import is redundant but harmless.
import os
if not os.environ.get('OPENHCS_SUBPROCESS_MODE'):
    _auto_initialize_registry()