Coverage for openhcs/processing/func_registry.py: 37.2%

227 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Function registry for processing backends. 

3 

4This module provides a registry for functions that can be executed by different 

5processing backends (numpy, cupy, torch, etc.). It automatically scans the 

6processing directory to register functions with matching input and output 

7memory types. 

8 

9The function registry is a global singleton that is initialized during application 

10startup and shared across all components. 

11 

12Valid memory types: 

13- numpy 

14- cupy 

15- torch 

16- tensorflow 

17- jax 

- pyclesperanto 

18 

19Thread Safety: 

20 All functions in this module are thread-safe and use a lock to ensure 

21 consistent access to the global registry. 

22""" 

23from __future__ import annotations 

24 

25import importlib 

26import inspect 

27import logging 

28import os 

29import pkgutil 

30import sys 

31import threading 

32from typing import Any, Callable, Dict, List, Optional, Set 

33 

logger = logging.getLogger(__name__)

# Thread-safe lock for registry access
_registry_lock = threading.Lock()

# Import hook system for auto-decorating external libraries.
# ``__builtins__`` is a CPython implementation detail: it is a dict inside an
# imported module but the ``builtins`` module object when the file is executed
# as ``__main__``. Handle both shapes so the lookup cannot fail with a
# ``TypeError`` from subscripting a module.
_original_import = (
    __builtins__['__import__']
    if isinstance(__builtins__, dict)
    else __builtins__.__import__
)
_decoration_applied = set()
_import_hook_installed = False

# Global registry of functions, keyed by registry name
# Structure: {registry_name: [function1, function2, ...]}
FUNC_REGISTRY: Dict[str, List[Callable]] = {}

# Valid memory types
VALID_MEMORY_TYPES = {"numpy", "cupy", "torch", "tensorflow", "jax", "pyclesperanto"}

# CPU-only memory types (for CI/testing without GPU)
CPU_ONLY_MEMORY_TYPES = {"numpy"}

# CPU-only mode is enabled when OPENHCS_CPU_ONLY is "true" (case-insensitive)
CPU_ONLY_MODE = os.getenv('OPENHCS_CPU_ONLY', 'false').lower() == 'true'

# Flag to track if the registry has been initialized
_registry_initialized = False

# Flag to track if we're currently in the initialization process (prevent recursion)
_registry_initializing = False

62 

63 

64# Import hook system removed - using existing comprehensive registries with clean decoration 

65 

66 

67# Import hook decoration functions removed - using existing registries 

68 

69 

def _create_virtual_modules() -> None:
    """
    Publish external-library functions under mirrored ``openhcs.<module>`` names.

    Every function returned by ``RegistryService`` that has a ``slice_by_slice``
    attribute, has no ``__processing_contract__`` attribute, and whose
    ``__module__`` does not start with ``openhcs.`` is attached to a module
    named ``openhcs.<original module path>``. Missing modules along that path
    (from the second path component onwards) are created as empty
    ``types.ModuleType`` objects and inserted into ``sys.modules``.
    """
    import types
    from openhcs.processing.backends.lib_registry.registry_service import RegistryService

    registered = RegistryService.get_all_functions_with_metadata()

    # Map each mirrored module name to the {function name: function} it should expose.
    functions_by_module = {}
    for entry in registered.values():
        is_external_slice_func = (
            hasattr(entry.func, 'slice_by_slice')
            and not hasattr(entry.func, '__processing_contract__')
            and not entry.func.__module__.startswith('openhcs.')
        )
        if not is_external_slice_func:
            continue
        mirror_name = f'openhcs.{entry.func.__module__}'
        functions_by_module.setdefault(mirror_name, {})[entry.func.__name__] = entry.func

    # Expand every mirrored name into itself plus its intermediate packages
    # ('openhcs.a', 'openhcs.a.b', ...); the bare 'openhcs' prefix is excluded.
    module_paths = set()
    for mirror_name in functions_by_module:
        segments = mirror_name.split('.')
        for depth in range(2, len(segments) + 1):
            module_paths.add('.'.join(segments[:depth]))

    # Sorted order guarantees a parent path is handled before its children.
    created_modules = []
    for module_path in sorted(module_paths):
        if module_path in sys.modules:
            continue
        placeholder = types.ModuleType(module_path)
        placeholder.__doc__ = f"Virtual module mirroring {module_path.replace('openhcs.', '')} with OpenHCS decorations"
        sys.modules[module_path] = placeholder
        created_modules.append(module_path)

    # Attach the collected functions to their mirrored modules.
    for mirror_name, members in functions_by_module.items():
        if mirror_name not in sys.modules:
            continue
        host_module = sys.modules[mirror_name]
        for member_name, member in members.items():
            setattr(host_module, member_name, member)

    if created_modules:
        logger.info(f"Created {len(created_modules)} virtual modules: {', '.join(created_modules)}")

121 

122 

def _auto_initialize_registry() -> None:
    """
    Populate ``FUNC_REGISTRY`` from ``RegistryService`` if not already done.

    Does nothing when ``_registry_initialized`` is already set. Otherwise the
    registry is cleared, refilled with every function reported by
    ``RegistryService`` (grouped by ``metadata.registry.library_name``),
    optionally reduced by CPU-only filtering, and virtual modules are created.

    This function does not acquire ``_registry_lock`` itself.

    Raises:
        Exception: Any error raised during initialization is logged and re-raised.
    """
    global _registry_initialized

    if _registry_initialized:
        return

    try:
        FUNC_REGISTRY.clear()

        # Phase 1: Register all functions from RegistryService (includes OpenHCS and external libraries)
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService
        discovered = RegistryService.get_all_functions_with_metadata()

        # Keys of ``discovered`` are composite (backend:function_name) and are
        # not needed here; functions are grouped by their registry's library name.
        for entry in discovered.values():
            FUNC_REGISTRY.setdefault(entry.registry.library_name, []).append(entry.func)

        # Phase 2: Apply CPU-only filtering if enabled
        if CPU_ONLY_MODE:
            logger.info("CPU-only mode enabled - filtering to numpy functions only")
            _apply_cpu_only_filtering()

        function_total = sum(len(funcs) for funcs in FUNC_REGISTRY.values())
        logger.info(
            "Function registry auto-initialized with %d functions across %d registries",
            function_total,
            len(FUNC_REGISTRY)
        )

        # The flag is set before virtual modules are created, so a failure in
        # _create_virtual_modules() propagates with the flag already True.
        _registry_initialized = True

        _create_virtual_modules()

    except Exception as e:
        logger.error(f"Failed to auto-initialize function registry: {e}")
        raise

175 

176 

def initialize_registry() -> None:
    """
    Initialize the function registry from ``RegistryService``.

    The whole operation runs while holding ``_registry_lock``. If the registry
    is already initialized, an informational message is logged and the function
    returns without touching ``FUNC_REGISTRY``; it does not re-initialize and
    does not raise in that case.

    Otherwise ``FUNC_REGISTRY`` is cleared, refilled with every function
    reported by ``RegistryService`` (grouped by
    ``metadata.registry.library_name``), optionally reduced by CPU-only
    filtering, and virtual modules are created.
    """
    global _registry_initialized

    with _registry_lock:
        if _registry_initialized:
            logger.info("Function registry already initialized, skipping manual initialization")
            return

        FUNC_REGISTRY.clear()

        # Phase 1: Register all functions from RegistryService (includes OpenHCS and external libraries)
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService
        discovered = RegistryService.get_all_functions_with_metadata()

        # Keys of ``discovered`` are composite (backend:function_name) and are
        # not needed here; functions are grouped by their registry's library name.
        for entry in discovered.values():
            FUNC_REGISTRY.setdefault(entry.registry.library_name, []).append(entry.func)

        # Phase 2: Apply CPU-only filtering if enabled
        if CPU_ONLY_MODE:
            logger.info("CPU-only mode enabled - filtering to numpy functions only")
            _apply_cpu_only_filtering()

        function_total = sum(len(funcs) for funcs in FUNC_REGISTRY.values())
        logger.info(
            "Function registry initialized with %d functions across %d registries",
            function_total,
            len(FUNC_REGISTRY)
        )

        # The flag is set before virtual modules are created.
        _registry_initialized = True

        _create_virtual_modules()

232 

233 

def load_prebuilt_registry(registry_data: Dict) -> None:
    """
    Replace the registry contents with pre-built data and mark it initialized.

    Intended for subprocess workers that receive a registry built in the main
    process and therefore skip function discovery. No virtual modules are
    created and no CPU-only filtering is applied by this function.

    Args:
        registry_data: Mapping copied into ``FUNC_REGISTRY``; its values must
            support ``len()`` because they are counted for the log message.
    """
    global _registry_initialized

    with _registry_lock:
        FUNC_REGISTRY.clear()
        FUNC_REGISTRY.update(registry_data)
        _registry_initialized = True

        function_total = 0
        for funcs in FUNC_REGISTRY.values():
            function_total += len(funcs)
        logger.info(f"Loaded pre-built registry with {function_total} functions")

253 

254 

def _scan_and_register_functions() -> None:
    """
    Import every module under ``openhcs.processing`` and register its functions.

    Each non-package module is imported and inspected. A function is registered
    under the ``"openhcs"`` registry when it has both ``input_memory_type`` and
    ``output_memory_type`` attributes and both values are members of
    ``VALID_MEMORY_TYPES`` (the two values need not be equal).

    Any exception raised while handling a module is logged as a warning and the
    scan continues with the next module.
    """
    from openhcs import processing

    processing_path = os.path.dirname(processing.__file__)
    processing_package = "openhcs.processing"

    logger.info("Phase 1: Scanning for native OpenHCS functions in %s", processing_path)

    module_walker = pkgutil.walk_packages([processing_path], f"{processing_package}.")
    for _, module_name, is_pkg in module_walker:
        try:
            logger.debug(f"Scanning module: {module_name}")
            module = importlib.import_module(module_name)

            # Packages are imported but not inspected; their modules are walked separately.
            if is_pkg:
                logger.debug(f"Skipping package: {module_name}")
                continue

            function_count = 0
            for _member_name, candidate in inspect.getmembers(module, inspect.isfunction):
                has_memory_types = (
                    hasattr(candidate, "input_memory_type")
                    and hasattr(candidate, "output_memory_type")
                )
                if not has_memory_types:
                    continue

                input_type = candidate.input_memory_type
                output_type = candidate.output_memory_type
                if input_type not in VALID_MEMORY_TYPES or output_type not in VALID_MEMORY_TYPES:
                    continue

                _register_function(candidate, "openhcs")
                function_count += 1

            logger.debug(f"Module {module_name}: found {function_count} registerable functions")
        except Exception as e:
            logger.warning("Error importing module %s: %s", module_name, e)

301 

302 

def _apply_unified_decoration(original_func, func_name, memory_type, create_wrapper=True):
    """
    Tag an external library function with a memory type and optionally wrap it.

    The work happens in up to three steps:
    1. ``input_memory_type`` / ``output_memory_type`` are set directly on
       ``original_func`` (always).
    2. When ``create_wrapper`` is true, the decorator from
       ``openhcs.core.memory.decorators`` matching ``memory_type`` is applied.
    3. When ``create_wrapper`` is true and the function's defining module is
       loaded and has an attribute named ``func_name``, that attribute is
       replaced by the result of step 2.

    Args:
        original_func: The original external library function (mutated in place by step 1)
        func_name: Attribute name looked up on the function's module for replacement
        memory_type: MemoryType member; its ``.value`` is stored on the function
        create_wrapper: Whether to perform steps 2 and 3 (default: True)

    Returns:
        ``original_func`` when ``create_wrapper`` is false, otherwise the
        decorated function (or ``original_func`` for an unrecognized memory type)
    """
    from openhcs.constants import MemoryType

    # Step 1: Direct decoration (for subprocess compatibility)
    type_name = memory_type.value
    original_func.input_memory_type = type_name
    original_func.output_memory_type = memory_type.value

    if not create_wrapper:
        return original_func

    # Step 2: Apply memory type decorator (includes dtype preservation, streams, OOM recovery)
    from openhcs.core.memory.decorators import numpy, cupy, torch, tensorflow, jax, pyclesperanto

    # Checked in order with ``==``; the first match wins.
    decorators_by_type = (
        (MemoryType.NUMPY, numpy),
        (MemoryType.CUPY, cupy),
        (MemoryType.TORCH, torch),
        (MemoryType.TENSORFLOW, tensorflow),
        (MemoryType.JAX, jax),
        (MemoryType.PYCLESPERANTO, pyclesperanto),
    )
    for known_type, decorate in decorators_by_type:
        if memory_type == known_type:
            wrapper_func = decorate(original_func)
            break
    else:
        # Fallback for unknown memory types: use the function unwrapped
        wrapper_func = original_func
        wrapper_func.input_memory_type = memory_type.value
        wrapper_func.output_memory_type = memory_type.value

    # Step 3: Module replacement (for best user experience and pickling compatibility)
    module_name = original_func.__module__
    if module_name in sys.modules and hasattr(sys.modules[module_name], func_name):
        setattr(sys.modules[module_name], func_name, wrapper_func)
        logger.debug(f"Replaced {module_name}.{func_name} with enhanced function")

    return wrapper_func

363 

364 

365 

366 

def register_function(func: Callable, backend: str = None, **kwargs) -> None:
    """
    Manually add a function to the function registry.

    Public entry point for functions that were not picked up automatically
    (e.g., dynamically decorated functions). The registry is auto-initialized
    first if necessary. All work is done while holding ``_registry_lock``.

    Args:
        func: The function to register (must have input_memory_type and output_memory_type attributes)
        backend: Name of the registry to add the function to; a falsy value
            selects the "openhcs" registry
        **kwargs: Additional metadata (ignored for compatibility)

    Raises:
        ValueError: If the function lacks the required memory type attributes
        ValueError: If either memory type is not in VALID_MEMORY_TYPES
        ValueError: If the chosen registry name is not a key of FUNC_REGISTRY
    """
    with _registry_lock:
        if not _registry_initialized:
            _auto_initialize_registry()

        has_memory_types = (
            hasattr(func, "input_memory_type") and hasattr(func, "output_memory_type")
        )
        if not has_memory_types:
            raise ValueError(
                f"Function '{func.__name__}' must have input_memory_type and output_memory_type attributes"
            )

        input_type, output_type = func.input_memory_type, func.output_memory_type

        if input_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid input memory type: {input_type}")
        if output_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid output memory type: {output_type}")

        # Only registries that already exist may receive manual registrations.
        registry_name = backend if backend else "openhcs"
        if registry_name not in FUNC_REGISTRY:
            raise ValueError(f"Invalid registry name: {registry_name}")

        _register_function(func, registry_name)

410 

411 

def _apply_cpu_only_filtering() -> None:
    """
    Reduce ``FUNC_REGISTRY`` in place to functions whose output is numpy.

    A function is kept only if it has an ``output_memory_type`` attribute equal
    to ``"numpy"``; the input memory type is not inspected. Registries left
    with no functions are removed from ``FUNC_REGISTRY`` entirely.
    """
    # Iterate over a snapshot of the names because entries may be deleted.
    for registry_name in list(FUNC_REGISTRY):
        numpy_functions = [
            func
            for func in FUNC_REGISTRY[registry_name]
            if hasattr(func, 'output_memory_type') and func.output_memory_type == "numpy"
        ]

        if not numpy_functions:
            del FUNC_REGISTRY[registry_name]
        else:
            FUNC_REGISTRY[registry_name] = numpy_functions

426 

427 

def _register_function(func: Callable, registry_name: str) -> None:
    """
    Register a function for a specific registry.

    This is an internal function used during automatic scanning and manual
    registration. It does not acquire ``_registry_lock``; callers that need
    locking must hold it themselves.

    If ``registry_name`` is not yet a key of ``FUNC_REGISTRY`` an empty list is
    created for it. A function that is already present in the registry's list
    is left untouched. On successful registration the function gains a
    ``registry`` attribute holding the registry name.

    Args:
        func: The function to register
        registry_name: The registry name (e.g., "openhcs", "skimage", "pyclesperanto")
    """
    # Create the registry list on demand so a first registration cannot fail
    # with KeyError when the registry has not been populated yet.
    registered = FUNC_REGISTRY.setdefault(registry_name, [])

    # Skip if function is already registered
    if func in registered:
        logger.debug(
            "Function '%s' already registered for registry '%s'",
            func.__name__, registry_name
        )
        return

    registered.append(func)

    # Add registry attribute for easier inspection
    setattr(func, "registry", registry_name)

    # The original logged an undefined ``memory_type`` name here, which raised
    # NameError after every successful registration.
    logger.debug(
        "Registered function '%s' for registry '%s'",
        func.__name__, registry_name
    )

456 

457 

def get_functions_by_memory_type(memory_type: str) -> List[Callable]:
    """
    Collect all functions associated with a memory type.

    Functions are gathered from ``RegistryService`` and, if the legacy
    ``FUNC_REGISTRY`` is initialized and has a key equal to ``memory_type``,
    from that registry as well. Results are concatenated without
    de-duplication.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        A list of functions for the specified memory type

    Raises:
        ValueError: If the memory type is not valid
    """
    if memory_type not in VALID_MEMORY_TYPES:
        raise ValueError(
            f"Invalid memory type: {memory_type}. "
            f"Valid types are: {', '.join(sorted(VALID_MEMORY_TYPES))}"
        )

    from openhcs.processing.backends.lib_registry.registry_service import RegistryService
    all_functions = RegistryService.get_all_functions_with_metadata()

    matches = []
    for metadata in all_functions.values():
        # Pattern 1: the owning registry declares a MEMORY_TYPE; it alone decides.
        registry_declares_type = (
            hasattr(metadata, 'registry') and hasattr(metadata.registry, 'MEMORY_TYPE')
        )
        if registry_declares_type:
            if metadata.registry.MEMORY_TYPE == memory_type:
                matches.append(metadata.func)
            continue

        # Pattern 2: functions tagged 'openhcs' are matched on their own attributes.
        if not (metadata.tags and 'openhcs' in metadata.tags):
            continue

        func = metadata.func
        matches_input_type = hasattr(func, 'input_memory_type') and func.input_memory_type == memory_type
        if matches_input_type or (hasattr(func, 'backend') and func.backend == memory_type):
            matches.append(func)

    # Also include legacy FUNC_REGISTRY functions for backward compatibility.
    # NOTE(review): the initializers in this module key FUNC_REGISTRY by
    # registry/library name, so this lookup only hits when such a name equals
    # the memory type - confirm this is intended.
    with _registry_lock:
        if _registry_initialized and memory_type in FUNC_REGISTRY:
            matches.extend(FUNC_REGISTRY[memory_type])

    return matches

507 

508 

def get_function_info(func: Callable) -> Dict[str, Any]:
    """
    Describe a function that carries memory type attributes.

    Args:
        func: The function to describe

    Returns:
        A dictionary with the keys "name", "input_memory_type",
        "output_memory_type", "backend", "doc" and "module". "backend" is the
        function's ``backend`` attribute when present, otherwise its
        ``input_memory_type``.

    Raises:
        ValueError: If the function lacks either memory type attribute
    """
    has_memory_types = hasattr(func, "input_memory_type") and hasattr(func, "output_memory_type")
    if not has_memory_types:
        raise ValueError(
            f"Function '{func.__name__}' does not have memory type attributes"
        )

    info: Dict[str, Any] = {}
    info["name"] = func.__name__
    info["input_memory_type"] = func.input_memory_type
    info["output_memory_type"] = func.output_memory_type
    # Fall back to the input memory type when no explicit backend is set.
    info["backend"] = getattr(func, "backend", func.input_memory_type)
    info["doc"] = func.__doc__
    info["module"] = func.__module__
    return info

535 

536 

def is_registry_initialized() -> bool:
    """
    Report whether the function registry has been initialized.

    The flag is read while holding ``_registry_lock``.

    Returns:
        True if the registry is initialized, False otherwise
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized

548 

549 

def get_valid_memory_types() -> Set[str]:
    """
    Return the valid memory type names.

    Returns:
        A new set with the contents of ``VALID_MEMORY_TYPES``; mutating it does
        not affect the module-level constant
    """
    return set(VALID_MEMORY_TYPES)

558 

559 

560# Import hook system removed - using existing comprehensive registries 

561 

562 

def get_function_by_name(function_name: str, memory_type: str) -> Optional[Callable]:
    """
    Look up a function by its ``__name__`` among those of a memory type.

    Args:
        function_name: Name of the function to find
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        The first function returned by ``get_functions_by_memory_type`` whose
        ``__name__`` equals ``function_name``, or None if there is no match

    Raises:
        ValueError: If the memory type is not valid (raised by
            ``get_functions_by_memory_type``)
    """
    candidates = get_functions_by_memory_type(memory_type)
    return next(
        (candidate for candidate in candidates if candidate.__name__ == function_name),
        None,
    )

585 

586 

def get_all_function_names(memory_type: str) -> List[str]:
    """
    List the ``__name__`` of every function found for a memory type.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        Function names in the order returned by
        ``get_functions_by_memory_type``; duplicates are not removed

    Raises:
        ValueError: If the memory type is not valid (raised by
            ``get_functions_by_memory_type``)
    """
    names = []
    for func in get_functions_by_memory_type(memory_type):
        names.append(func.__name__)
    return names

603 

604 

605# LAZY INITIALIZATION: Don't auto-initialize on import to avoid blocking GUI startup 

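606# The registry auto-initializes lazily the first time register_function() is called; get_functions_by_memory_type() does not trigger initialization and only reads FUNC_REGISTRY once it has been initialized.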

607# This prevents importing GPU libraries (cupy, torch, etc.) during module import, which 

608# blocks the main thread due to Python's GIL even when done in a background thread. 

609# 

610# For subprocess runner mode, set OPENHCS_SUBPROCESS_NO_GPU=1 to skip GPU library imports entirely. 

611import os 

612# if not os.environ.get('OPENHCS_SUBPROCESS_NO_GPU'): 

613# _auto_initialize_registry()