Coverage for openhcs/processing/func_registry.py: 37.2%
227 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Function registry for processing backends.
4This module provides a registry for functions that can be executed by different
5processing backends (numpy, cupy, torch, etc.). It automatically scans the
6processing directory to register functions with matching input and output
7memory types.
9The function registry is a global singleton that is initialized during application
10startup and shared across all components.
Valid memory types:
- numpy
- cupy
- torch
- tensorflow
- jax
- pyclesperanto
19Thread Safety:
20 All functions in this module are thread-safe and use a lock to ensure
21 consistent access to the global registry.
22"""
23from __future__ import annotations
25import importlib
26import inspect
27import logging
28import os
29import pkgutil
30import sys
31import threading
32from typing import Any, Callable, Dict, List, Optional, Set
import builtins  # needed only for the portable ``__import__`` lookup below

logger = logging.getLogger(__name__)

# Lock guarding access to the module-level registry state
_registry_lock = threading.Lock()

# Leftover state from the import-hook system (the hook itself has been removed).
# ``builtins.__import__`` is used rather than ``__builtins__['__import__']``:
# ``__builtins__`` is a dict only inside imported modules; when a module runs as
# ``__main__`` it is the ``builtins`` module object and subscripting it raises TypeError.
_original_import = builtins.__import__
_decoration_applied = set()
_import_hook_installed = False

# Global registry of functions, keyed by registry (library) name
# Structure: {registry_name: [function1, function2, ...]}
FUNC_REGISTRY: Dict[str, List[Callable]] = {}

# Valid memory types
VALID_MEMORY_TYPES = {"numpy", "cupy", "torch", "tensorflow", "jax", "pyclesperanto"}

# CPU-only memory types (for CI/testing without GPU)
CPU_ONLY_MEMORY_TYPES = {"numpy"}

# CPU-only mode is enabled when OPENHCS_CPU_ONLY is set to "true" (case-insensitive)
CPU_ONLY_MODE = os.getenv('OPENHCS_CPU_ONLY', 'false').lower() == 'true'

# Flag to track if the registry has been initialized
_registry_initialized = False

# Flag to track if we're currently in the initialization process (prevent recursion)
# NOTE(review): nothing in this module reads or sets this flag after definition - confirm it is still needed.
_registry_initializing = False
64# Import hook system removed - using existing comprehensive registries with clean decoration
67# Import hook decoration functions removed - using existing registries
def _create_virtual_modules() -> None:
    """
    Publish external-library functions under a mirrored ``openhcs.`` module namespace.

    Every function returned by RegistryService that has a ``slice_by_slice``
    attribute, has no ``__processing_contract__`` attribute and whose
    ``__module__`` does not start with ``openhcs.`` is attached to a module named
    ``openhcs.<original module path>``. Module objects that are missing from
    ``sys.modules`` (including intermediate path segments) are created and
    inserted there; modules already present are reused as-is.
    """
    import types
    from openhcs.processing.backends.lib_registry.registry_service import RegistryService

    registered = RegistryService.get_all_functions_with_metadata()

    # Bucket qualifying functions: {virtual module path: {function name: function}}
    grouped = {}
    for entry in registered.values():
        if not hasattr(entry.func, 'slice_by_slice'):
            continue
        if hasattr(entry.func, '__processing_contract__'):
            continue
        if entry.func.__module__.startswith('openhcs.'):
            continue
        mirror_path = f'openhcs.{entry.func.__module__}'
        grouped.setdefault(mirror_path, {})[entry.func.__name__] = entry.func

    # Every prefix from 'openhcs.<first segment>' down to the full path needs a module object
    required_paths = set()
    for mirror_path in grouped:
        segments = mirror_path.split('.')
        required_paths.update(
            '.'.join(segments[:depth]) for depth in range(2, len(segments) + 1)
        )

    # Sorted order guarantees a parent path is handled before its children
    newly_created = []
    for path in sorted(required_paths):
        if path in sys.modules:
            continue
        placeholder = types.ModuleType(path)
        placeholder.__doc__ = f"Virtual module mirroring {path.replace('openhcs.', '')} with OpenHCS decorations"
        sys.modules[path] = placeholder
        newly_created.append(path)

    # Attach the collected functions to their leaf modules
    for mirror_path, members in grouped.items():
        if mirror_path not in sys.modules:
            continue
        holder = sys.modules[mirror_path]
        for member_name, member in members.items():
            setattr(holder, member_name, member)

    if newly_created:
        logger.info(f"Created {len(newly_created)} virtual modules: {', '.join(newly_created)}")
def _auto_initialize_registry() -> None:
    """
    Populate FUNC_REGISTRY from RegistryService unless that has already happened.

    Steps, in order: clear FUNC_REGISTRY, file every discovered function under its
    registry's ``library_name``, apply CPU-only filtering when CPU_ONLY_MODE is
    set, mark the registry as initialized, then create the virtual modules.

    This function does not take ``_registry_lock`` itself; within this module it
    is invoked from ``register_function`` while that lock is held.

    Raises:
        Exception: Any error raised during initialization is logged and re-raised.
    """
    global _registry_initialized

    if _registry_initialized:
        return

    try:
        # Start from a clean slate
        FUNC_REGISTRY.clear()

        # Phase 1: pull every function known to RegistryService (OpenHCS and external libraries)
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService
        discovered = list(RegistryService.get_all_functions_with_metadata().values())

        # First make sure each registry has a bucket ...
        for entry in discovered:
            FUNC_REGISTRY.setdefault(entry.registry.library_name, [])

        # ... then file each function into its bucket
        for entry in discovered:
            FUNC_REGISTRY[entry.registry.library_name].append(entry.func)

        # Phase 2: optional CPU-only filtering
        if CPU_ONLY_MODE:
            logger.info("CPU-only mode enabled - filtering to numpy functions only")
            _apply_cpu_only_filtering()

        function_total = sum(map(len, FUNC_REGISTRY.values()))
        logger.info(
            "Function registry auto-initialized with %d functions across %d registries",
            function_total,
            len(FUNC_REGISTRY)
        )

        # The flag is set before virtual-module creation, exactly as before
        _registry_initialized = True

        _create_virtual_modules()

    except Exception as e:
        logger.error(f"Failed to auto-initialize function registry: {e}")
        raise
def initialize_registry() -> None:
    """
    Initialize the function registry from RegistryService.

    If the registry is already marked as initialized, this logs a message and
    returns without touching it; there is no option to force re-initialization.
    Otherwise FUNC_REGISTRY is cleared and repopulated, CPU-only filtering is
    applied when CPU_ONLY_MODE is set, the registry is marked initialized and
    the virtual modules are created.

    Thread-safe: the whole operation runs while holding the registry lock.
    """
    global _registry_initialized

    with _registry_lock:
        # Nothing to do when a previous initialization already completed
        if _registry_initialized:
            logger.info("Function registry already initialized, skipping manual initialization")
            return

        FUNC_REGISTRY.clear()

        # Phase 1: pull every function known to RegistryService (OpenHCS and external libraries)
        from openhcs.processing.backends.lib_registry.registry_service import RegistryService
        discovered = list(RegistryService.get_all_functions_with_metadata().values())

        # Create a bucket per registry first, then fill the buckets
        for entry in discovered:
            FUNC_REGISTRY.setdefault(entry.registry.library_name, [])
        for entry in discovered:
            FUNC_REGISTRY[entry.registry.library_name].append(entry.func)

        # Phase 2: optional CPU-only filtering
        if CPU_ONLY_MODE:
            logger.info("CPU-only mode enabled - filtering to numpy functions only")
            _apply_cpu_only_filtering()

        function_total = sum(map(len, FUNC_REGISTRY.values()))
        logger.info(
            "Function registry initialized with %d functions across %d registries",
            function_total,
            len(FUNC_REGISTRY)
        )

        # The flag is set before virtual-module creation, exactly as before
        _registry_initialized = True

        _create_virtual_modules()
def load_prebuilt_registry(registry_data: Dict) -> None:
    """
    Replace the registry contents with an already-built mapping.

    FUNC_REGISTRY is cleared, updated from ``registry_data`` and marked as
    initialized, so no function discovery is performed by this call.

    Thread-safe: runs while holding the registry lock.

    Args:
        registry_data: Mapping to copy into FUNC_REGISTRY. Its values are passed
            to ``len()`` when counting the loaded functions.
    """
    global _registry_initialized

    with _registry_lock:
        FUNC_REGISTRY.clear()
        FUNC_REGISTRY.update(registry_data)
        _registry_initialized = True

        loaded_count = sum(map(len, FUNC_REGISTRY.values()))
        logger.info(f"Loaded pre-built registry with {loaded_count} functions")
def _scan_and_register_functions() -> None:
    """
    Import every module under ``openhcs.processing`` and register tagged functions.

    Each non-package module is inspected for plain functions carrying both
    ``input_memory_type`` and ``output_memory_type`` attributes. A function is
    registered under the "openhcs" registry when both values are in
    VALID_MEMORY_TYPES; the two values do not have to be equal.

    Any exception raised while importing or processing a module (registration
    included) is logged as a warning and the scan moves on to the next module.
    """
    from openhcs import processing

    package_prefix = "openhcs.processing"
    package_dir = os.path.dirname(processing.__file__)

    logger.info("Phase 1: Scanning for native OpenHCS functions in %s", package_dir)

    for _finder, dotted_name, is_package in pkgutil.walk_packages([package_dir], f"{package_prefix}."):
        try:
            logger.debug(f"Scanning module: {dotted_name}")
            loaded = importlib.import_module(dotted_name)

            # Packages are imported but not inspected; their modules are visited on their own
            if is_package:
                logger.debug(f"Skipping package: {dotted_name}")
                continue

            registered = 0
            for _member_name, candidate in inspect.getmembers(loaded, inspect.isfunction):
                if not (hasattr(candidate, "input_memory_type") and hasattr(candidate, "output_memory_type")):
                    continue

                source_type = getattr(candidate, "input_memory_type")
                result_type = getattr(candidate, "output_memory_type")

                # Both ends must be known memory types (mixed types are allowed)
                if source_type not in VALID_MEMORY_TYPES or result_type not in VALID_MEMORY_TYPES:
                    continue

                _register_function(candidate, "openhcs")
                registered += 1

            logger.debug(f"Module {dotted_name}: found {registered} registerable functions")
        except Exception as e:
            logger.warning("Error importing module %s: %s", dotted_name, e)
def _apply_unified_decoration(original_func, func_name, memory_type, create_wrapper=True):
    """
    Tag an external library function with a memory type and optionally wrap it.

    The function is processed in up to three steps:
    1. ``input_memory_type`` / ``output_memory_type`` are set on the original
       function to ``memory_type.value``.
    2. Unless ``create_wrapper`` is false, the decorator from
       ``openhcs.core.memory.decorators`` matching ``memory_type`` is applied.
       For an unrecognized memory type the original function is used and tagged again.
    3. If the function's defining module is loaded and has an attribute named
       ``func_name``, that attribute is rebound to the resulting function.

    Args:
        original_func: The original external library function
        func_name: Attribute name to rebind in the function's defining module
        memory_type: MemoryType enum member (its ``.value`` is stored on the function)
        create_wrapper: Whether to apply the memory type decorator (default: True)

    Returns:
        The decorated function, or ``original_func`` when ``create_wrapper`` is false
    """
    from openhcs.constants import MemoryType

    # Step 1: tag the original function directly
    original_func.input_memory_type = memory_type.value
    original_func.output_memory_type = memory_type.value

    if not create_wrapper:
        return original_func

    # Step 2: pick the decorator that matches the memory type
    from openhcs.core.memory.decorators import numpy, cupy, torch, tensorflow, jax, pyclesperanto

    # Member names are resolved lazily, in the same order the comparisons were originally made
    decorator_by_member = (
        ("NUMPY", numpy),
        ("CUPY", cupy),
        ("TORCH", torch),
        ("TENSORFLOW", tensorflow),
        ("JAX", jax),
        ("PYCLESPERANTO", pyclesperanto),
    )
    for member_name, decorate in decorator_by_member:
        if memory_type == getattr(MemoryType, member_name):
            enhanced = decorate(original_func)
            break
    else:
        # Unknown memory type: fall back to the undecorated function
        enhanced = original_func
        enhanced.input_memory_type = memory_type.value
        enhanced.output_memory_type = memory_type.value

    # Step 3: rebind the name in the defining module, when that module exposes it
    owner_name = original_func.__module__
    if owner_name in sys.modules and hasattr(sys.modules[owner_name], func_name):
        setattr(sys.modules[owner_name], func_name, enhanced)
        logger.debug(f"Replaced {owner_name}.{func_name} with enhanced function")

    return enhanced
def register_function(func: Callable, backend: Optional[str] = None, **kwargs) -> None:
    """
    Manually register a function with the function registry.

    This is the public API for registering functions that are not picked up by
    automatic discovery (e.g., dynamically decorated functions). If the registry
    has not been initialized yet, it is auto-initialized first.

    Thread-safe: the whole operation runs while holding the registry lock.

    Args:
        func: The callable to register (must have input_memory_type and
            output_memory_type attributes with values in VALID_MEMORY_TYPES)
        backend: Name of the registry to add the function to. When omitted (or
            falsy) the function is registered under "openhcs". The registry
            must already exist in FUNC_REGISTRY.
        **kwargs: Additional metadata (ignored for compatibility)

    Raises:
        ValueError: If the function doesn't have the required memory type attributes
        ValueError: If either memory type is invalid
        ValueError: If the target registry does not exist
    """
    with _registry_lock:
        # Ensure registry is initialized
        if not _registry_initialized:
            _auto_initialize_registry()

        # Validate function has required attributes
        if not hasattr(func, "input_memory_type") or not hasattr(func, "output_memory_type"):
            # Callable objects (e.g. functools.partial) may lack __name__; fall back to
            # repr() so the caller still gets the documented ValueError.
            display_name = getattr(func, "__name__", repr(func))
            raise ValueError(
                f"Function '{display_name}' must have input_memory_type and output_memory_type attributes"
            )

        input_type = func.input_memory_type
        output_type = func.output_memory_type

        # Validate memory types
        if input_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid input memory type: {input_type}")
        if output_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid output memory type: {output_type}")

        # Use backend if specified, otherwise register as openhcs
        registry_name = backend or "openhcs"
        if registry_name not in FUNC_REGISTRY:
            raise ValueError(f"Invalid registry name: {registry_name}")

        # Register the function
        _register_function(func, registry_name)
def _apply_cpu_only_filtering() -> None:
    """
    Reduce FUNC_REGISTRY in place to functions whose output memory type is numpy.

    A function is kept only when it has an ``output_memory_type`` attribute equal
    to "numpy". Registries left without any function are removed entirely.
    """
    # Iterate over a snapshot of the keys because entries may be deleted
    for name in list(FUNC_REGISTRY):
        numpy_only = [
            fn for fn in FUNC_REGISTRY[name]
            if hasattr(fn, 'output_memory_type') and fn.output_memory_type == "numpy"
        ]

        if not numpy_only:
            del FUNC_REGISTRY[name]
            continue

        FUNC_REGISTRY[name] = numpy_only
def _register_function(func: Callable, registry_name: str) -> None:
    """
    Register a function for a specific registry.

    This is an internal function used during automatic scanning and manual
    registration. It does not take the registry lock itself.

    A function that is already present in the registry is left untouched.
    Otherwise it is appended and receives a ``registry`` attribute holding the
    registry name.

    Args:
        func: The function to register
        registry_name: The registry name (e.g., "openhcs", "skimage", "pyclesperanto")
    """
    # Create the bucket on demand: the module scanner may register into "openhcs"
    # before any registry has been set up, which used to raise KeyError.
    bucket = FUNC_REGISTRY.setdefault(registry_name, [])
    display_name = getattr(func, "__name__", repr(func))

    # Skip if function is already registered
    if func in bucket:
        logger.debug(
            "Function '%s' already registered for registry '%s'",
            display_name, registry_name
        )
        return

    # Add function to registry
    bucket.append(func)

    # Add registry attribute for easier inspection
    setattr(func, "registry", registry_name)

    # The previous log call referenced an undefined name (memory_type) and raised
    # NameError after every successful registration; log the registry name instead.
    logger.debug(
        "Registered function '%s' for registry '%s'",
        display_name, registry_name
    )
def get_functions_by_memory_type(memory_type: str) -> List[Callable]:
    """
    Collect the functions associated with a memory type.

    Functions come from two sources: the entries returned by RegistryService,
    and - when the registry is initialized - the FUNC_REGISTRY list stored under
    ``memory_type``, which is appended afterwards. No de-duplication is done.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        A list of functions for the specified memory type

    Raises:
        ValueError: If the memory type is not valid
    """
    if memory_type not in VALID_MEMORY_TYPES:
        raise ValueError(
            f"Invalid memory type: {memory_type}. "
            f"Valid types are: {', '.join(sorted(VALID_MEMORY_TYPES))}"
        )

    from openhcs.processing.backends.lib_registry.registry_service import RegistryService
    catalog = RegistryService.get_all_functions_with_metadata()

    matches = []
    for entry in catalog.values():
        # Case 1: the owning registry declares a MEMORY_TYPE - that value alone decides
        if hasattr(entry, 'registry') and hasattr(entry.registry, 'MEMORY_TYPE'):
            if entry.registry.MEMORY_TYPE == memory_type:
                matches.append(entry.func)
            continue

        # Case 2: entries tagged 'openhcs' - decide from the function's own attributes
        if not (entry.tags and 'openhcs' in entry.tags):
            continue
        candidate = entry.func
        by_input_type = hasattr(candidate, 'input_memory_type') and candidate.input_memory_type == memory_type
        if by_input_type or (hasattr(candidate, 'backend') and candidate.backend == memory_type):
            matches.append(candidate)

    # Legacy FUNC_REGISTRY entries are included for backward compatibility
    # NOTE(review): FUNC_REGISTRY appears to be keyed by registry name elsewhere in this
    # module, so this lookup only hits when a registry is named like a memory type - confirm.
    with _registry_lock:
        if _registry_initialized and memory_type in FUNC_REGISTRY:
            matches.extend(FUNC_REGISTRY[memory_type])

    return matches
def get_function_info(func: Callable) -> Dict[str, Any]:
    """
    Get information about a registered function.

    Works for any callable carrying the memory type attributes, including
    callable objects without ``__name__`` (e.g. ``functools.partial``), for which
    ``repr(func)`` is reported as the name.

    Args:
        func: The function to get information about

    Returns:
        A dictionary with the keys "name", "input_memory_type",
        "output_memory_type", "backend" (the ``backend`` attribute, falling back
        to the input memory type), "doc" and "module"

    Raises:
        ValueError: If the function does not have memory type attributes
    """
    # Not every callable has __name__; fall back to repr() so the documented
    # ValueError (rather than AttributeError) is raised below.
    display_name = getattr(func, "__name__", repr(func))

    if not hasattr(func, "input_memory_type") or not hasattr(func, "output_memory_type"):
        raise ValueError(
            f"Function '{display_name}' does not have memory type attributes"
        )

    return {
        "name": display_name,
        "input_memory_type": func.input_memory_type,
        "output_memory_type": func.output_memory_type,
        "backend": getattr(func, "backend", func.input_memory_type),
        "doc": getattr(func, "__doc__", None),
        "module": getattr(func, "__module__", None)
    }
def is_registry_initialized() -> bool:
    """
    Report whether the function registry has been marked as initialized.

    Thread-safe: the flag is read while holding the registry lock.

    Returns:
        True if the registry is initialized, False otherwise
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized
def get_valid_memory_types() -> Set[str]:
    """
    Return the valid memory type names.

    Returns:
        A new set, so callers can modify it without affecting VALID_MEMORY_TYPES
    """
    return set(VALID_MEMORY_TYPES)
560# Import hook system removed - using existing comprehensive registries
def get_function_by_name(function_name: str, memory_type: str) -> Optional[Callable]:
    """
    Look up a function by its ``__name__`` among those of a memory type.

    Args:
        function_name: Name of the function to find
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        The first function whose ``__name__`` matches, or None when there is none

    Raises:
        ValueError: If the memory type is not valid
    """
    candidates = get_functions_by_memory_type(memory_type)
    return next(
        (candidate for candidate in candidates if candidate.__name__ == function_name),
        None,
    )
def get_all_function_names(memory_type: str) -> List[str]:
    """
    List the ``__name__`` of every function found for a memory type.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        A list of function names, in the order the functions are returned by
        get_functions_by_memory_type

    Raises:
        ValueError: If the memory type is not valid
    """
    names = []
    for registered in get_functions_by_memory_type(memory_type):
        names.append(registered.__name__)
    return names
605# LAZY INITIALIZATION: Don't auto-initialize on import to avoid blocking GUI startup
606# The registry will auto-initialize on first access (when get_functions_by_memory_type is called)
607# This prevents importing GPU libraries (cupy, torch, etc.) during module import, which
608# blocks the main thread due to Python's GIL even when done in a background thread.
609#
610# For subprocess runner mode, set OPENHCS_SUBPROCESS_NO_GPU=1 to skip GPU library imports entirely.
611import os
612# if not os.environ.get('OPENHCS_SUBPROCESS_NO_GPU'):
613# _auto_initialize_registry()