Coverage for openhcs/processing/func_registry.py: 43.5%
196 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
Function registry for processing backends.

This module provides a registry for functions that can be executed by different
processing backends (numpy, cupy, torch, etc.). It automatically scans the
processing directory to register functions with matching input and output
memory types.

The function registry is a global singleton that is initialized during application
startup and shared across all components.

Valid memory types:
- numpy
- cupy
- torch
- tensorflow
- jax
- pyclesperanto

Thread Safety:
    All functions in this module are thread-safe and use a lock to ensure
    consistent access to the global registry.
22"""
from __future__ import annotations

import builtins
import importlib
import inspect
import logging
import os
import pkgutil
import sys
import threading
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
logger = logging.getLogger(__name__)

# Thread-safe lock for registry access
_registry_lock = threading.Lock()

# Import hook system for auto-decorating external libraries.
# The original import function is read from the `builtins` module rather than
# `__builtins__`: the latter is a CPython implementation detail that is a dict
# in imported modules but a module object in `__main__`, so subscripting it is
# not reliable.
_original_import = builtins.__import__
_decoration_applied = set()
_import_hook_installed = False

# Global registry of functions by backend type
# Structure: {backend_name: [function1, function2, ...]}
FUNC_REGISTRY: Dict[str, List[Callable]] = {}

# Valid memory types
VALID_MEMORY_TYPES = {"numpy", "cupy", "torch", "tensorflow", "jax", "pyclesperanto"}

# CPU-only memory types (for CI/testing without GPU)
CPU_ONLY_MEMORY_TYPES = {"numpy"}

# CPU-only mode is enabled when OPENHCS_CPU_ONLY is set to "true" (case-insensitive)
CPU_ONLY_MODE = os.getenv('OPENHCS_CPU_ONLY', 'false').lower() == 'true'

# Flag to track if the registry has been initialized
_registry_initialized = False

# Flag to track if we're currently in the initialization process (prevent recursion)
_registry_initializing = False
64# Import hook system removed - using existing comprehensive registries with clean decoration
67# Import hook decoration functions removed - using existing registries
def _auto_initialize_registry() -> None:
    """
    Auto-initialize the function registry on module import.

    This follows the same pattern as storage_registry in openhcs.io.base.

    The registry is reset to one empty bucket per enabled memory type
    (only the CPU-only types when CPU_ONLY_MODE is set), then populated by
    scanning native functions (Phase 1) and external libraries (Phase 2).

    The call is a no-op when the registry is already initialized or when an
    initialization is already in progress; the latter guards against
    re-entrant calls made while the scan itself is running.

    Any exception raised during population is logged with its traceback and
    the registry falls back to empty buckets; the registry is still marked
    initialized so the failing scan is not retried on every access.
    """
    global _registry_initialized, _registry_initializing

    if _registry_initialized or _registry_initializing:
        return

    # Use CPU-only memory types if CPU_ONLY_MODE is enabled
    memory_types_to_use = CPU_ONLY_MEMORY_TYPES if CPU_ONLY_MODE else VALID_MEMORY_TYPES

    _registry_initializing = True
    try:
        # Clear and initialize the registry with the enabled memory types
        FUNC_REGISTRY.clear()
        for memory_type in memory_types_to_use:
            FUNC_REGISTRY[memory_type] = []

        if CPU_ONLY_MODE:
            logger.info("CPU-only mode enabled - only registering numpy functions")

        # Phase 1: Scan processing directory and register native OpenHCS functions
        _scan_and_register_functions()

        # Phase 2: Register external library functions
        _register_external_libraries()

        total_functions = sum(len(funcs) for funcs in FUNC_REGISTRY.values())
        # Report the number of buckets that actually exist, not the number of
        # all known memory types (the two differ in CPU-only mode).
        logger.info(
            "Function registry auto-initialized with %d functions across %d backends",
            total_functions,
            len(FUNC_REGISTRY)
        )
    except Exception as e:
        # exc_info keeps the traceback so the root cause is diagnosable
        logger.error("Failed to auto-initialize function registry: %s", e, exc_info=True)
        # Initialize empty registry as fallback
        FUNC_REGISTRY.clear()
        for memory_type in memory_types_to_use:
            FUNC_REGISTRY[memory_type] = []
    finally:
        _registry_initializing = False

    # Mark registry as initialized (reached on success and on handled failure)
    _registry_initialized = True
def initialize_registry() -> None:
    """
    Initialize the function registry and scan for functions to register.

    This function is optional since the registry auto-initializes on import.
    If the registry is already initialized the call logs a message and
    returns without touching the registry; it does not re-initialize and it
    does not raise.

    Thread-safe: Uses a lock to ensure consistent access to the global registry.
    """
    global _registry_initialized

    with _registry_lock:
        # Check if registry is already initialized
        if _registry_initialized:
            logger.info("Function registry already initialized, skipping manual initialization")
            return

        # Clear and initialize the registry with the enabled memory types
        FUNC_REGISTRY.clear()
        memory_types_to_use = CPU_ONLY_MEMORY_TYPES if CPU_ONLY_MODE else VALID_MEMORY_TYPES
        for memory_type in memory_types_to_use:
            FUNC_REGISTRY[memory_type] = []

        # Phase 1: Scan processing directory and register native OpenHCS functions
        _scan_and_register_functions()

        # Phase 2: Register external library functions
        _register_external_libraries()

        # Count the buckets that actually exist; in CPU-only mode this is
        # smaller than the full set of valid memory types.
        logger.info(
            "Function registry initialized with %d functions across %d backends",
            sum(len(funcs) for funcs in FUNC_REGISTRY.values()),
            len(FUNC_REGISTRY)
        )

        # Mark registry as initialized
        _registry_initialized = True
def load_prebuilt_registry(registry_data: Dict) -> None:
    """
    Replace the global registry contents with a pre-built registry.

    Intended for subprocess workers, which can skip function discovery by
    loading a registry that was built in the main process.

    Thread-safe: the swap and the initialized flag are updated under the
    registry lock.

    Args:
        registry_data: Dictionary containing the pre-built registry
    """
    global _registry_initialized

    with _registry_lock:
        # Swap the contents in place so existing references to FUNC_REGISTRY stay valid
        FUNC_REGISTRY.clear()
        FUNC_REGISTRY.update(registry_data)
        _registry_initialized = True

        total_functions = 0
        for funcs in FUNC_REGISTRY.values():
            total_functions += len(funcs)
        logger.info(f"Loaded pre-built registry with {total_functions} functions")
def _scan_and_register_functions() -> None:
    """
    Scan the processing directory for native OpenHCS functions.

    This function recursively imports all modules in the processing package
    and registers functions whose input_memory_type and output_memory_type
    attributes are equal, valid, and enabled in the current registry.

    Functions whose memory type is valid but has no bucket in FUNC_REGISTRY
    (for example GPU functions in CPU-only mode) are skipped individually, so
    they do not abort registration of the rest of their module.

    This is Phase 1 of initialization - only native OpenHCS functions.
    External library functions are registered in Phase 2.

    Errors raised while importing or scanning a module are logged as
    warnings and do not stop the scan of the remaining modules.
    """
    from openhcs import processing

    processing_path = os.path.dirname(processing.__file__)
    processing_package = "openhcs.processing"

    logger.info("Phase 1: Scanning for native OpenHCS functions in %s", processing_path)

    # Walk through all modules in the processing package
    for _, module_name, is_pkg in pkgutil.walk_packages([processing_path], f"{processing_package}."):
        try:
            # Import the module
            logger.debug("Scanning module: %s", module_name)
            module = importlib.import_module(module_name)

            # Skip packages (we'll process their modules separately)
            if is_pkg:
                logger.debug("Skipping package: %s", module_name)
                continue

            # Find all functions in the module
            function_count = 0
            for _name, obj in inspect.getmembers(module, inspect.isfunction):
                # Only functions carrying both memory type attributes are candidates
                if not (hasattr(obj, "input_memory_type") and hasattr(obj, "output_memory_type")):
                    continue

                input_type = getattr(obj, "input_memory_type")
                output_type = getattr(obj, "output_memory_type")

                # Register only if input and output types match and are valid
                if input_type != output_type or input_type not in VALID_MEMORY_TYPES:
                    continue

                # A valid type without a registry bucket is disabled for this
                # run; skip it instead of letting the registry lookup raise
                # KeyError and abort the rest of this module.
                if input_type not in FUNC_REGISTRY:
                    logger.debug(
                        "Skipping function '%s': memory type '%s' is not enabled",
                        getattr(obj, "__name__", obj), input_type
                    )
                    continue

                _register_function(obj, input_type)
                function_count += 1

            logger.debug("Module %s: found %d registerable functions", module_name, function_count)
        except Exception as e:
            logger.warning("Error importing module %s: %s", module_name, e)
def _apply_unified_decoration(original_func, func_name, memory_type, create_wrapper=True):
    """
    Apply the shared decoration pattern used for external library functions.

    NOTE: Dtype preservation is handled at the decorator level in decorators.py.

    The pattern has three steps:
    1. Stamp input/output memory type attributes directly on the original
       function (for subprocess compatibility).
    2. Wrap the function with the memory type decorator that matches
       ``memory_type``.
    3. If the function's home module is loaded and exposes ``func_name``,
       rebind that attribute to the wrapper (for user experience and pickling
       compatibility).

    Args:
        original_func: The original external library function
        func_name: Attribute name to rebind on the function's module
        memory_type: MemoryType enum value (NUMPY, CUPY, PYCLESPERANTO, TORCH, TENSORFLOW, JAX)
        create_wrapper: Whether to apply the memory type decorator (default: True)

    Returns:
        The wrapper when create_wrapper is True, otherwise the original
        function carrying only the stamped attributes.
    """
    from openhcs.constants import MemoryType
    import sys

    # Step 1: stamp the memory type directly on the original function
    original_func.input_memory_type = memory_type.value
    original_func.output_memory_type = memory_type.value

    if not create_wrapper:
        return original_func

    # Step 2: pick the decorator matching the requested memory type
    from openhcs.core.memory.decorators import numpy, cupy, torch, tensorflow, jax, pyclesperanto

    decorator_table = (
        (MemoryType.NUMPY, numpy),
        (MemoryType.CUPY, cupy),
        (MemoryType.TORCH, torch),
        (MemoryType.TENSORFLOW, tensorflow),
        (MemoryType.JAX, jax),
        (MemoryType.PYCLESPERANTO, pyclesperanto),
    )
    for candidate_type, decorate in decorator_table:
        if memory_type == candidate_type:
            wrapper_func = decorate(original_func)
            break
    else:
        # Unknown memory type: no decorator available, use the original as-is
        wrapper_func = original_func
        wrapper_func.input_memory_type = memory_type.value
        wrapper_func.output_memory_type = memory_type.value

    # Step 3: rebind the name on the function's home module, when loaded
    module_name = original_func.__module__
    if module_name not in sys.modules:
        return wrapper_func

    target_module = sys.modules[module_name]
    if hasattr(target_module, func_name):
        setattr(target_module, func_name, wrapper_func)
        logger.debug(f"Replaced {module_name}.{func_name} with enhanced function")

    return wrapper_func
def _register_external_libraries() -> None:
    """
    Phase 2: Register external library functions using the unified registries.

    Kept separate from core scanning to avoid circular dependencies.
    External library registration should use direct registration, not trigger
    re-initialization.

    Each library is attempted independently: an ImportError is logged as a
    warning, any other exception as an error, and the next library is still
    attempted.
    """
    logger.info("Phase 2: Registering external library functions using unified registries...")

    # --- pyclesperanto ---
    try:
        from openhcs.processing.backends.lib_registry.pyclesperanto_registry import PyclesperantoRegistry
        PyclesperantoRegistry().register_functions_direct()
        logger.info("Successfully registered pyclesperanto functions")
    except ImportError as e:
        logger.warning(f"Could not register pyclesperanto functions: {e}")
    except Exception as e:
        logger.error(f"Error registering pyclesperanto functions: {e}")

    # --- scikit-image ---
    try:
        from openhcs.processing.backends.lib_registry.scikit_image_registry import SkimageRegistry
        SkimageRegistry().register_functions_direct()
        logger.info("Successfully registered scikit-image functions")
    except ImportError as e:
        logger.warning(f"Could not register scikit-image functions: {e}")
    except Exception as e:
        logger.error(f"Error registering scikit-image functions: {e}")

    # --- CuPy ---
    try:
        from openhcs.processing.backends.lib_registry.cupy_registry import CupyRegistry
        CupyRegistry().register_functions_direct()
        logger.info("Successfully registered CuPy ndimage functions")
    except ImportError as e:
        logger.warning(f"Could not register CuPy functions: {e}")
    except Exception as e:
        logger.error(f"Error registering CuPy functions: {e}")
def register_function(func: Callable, backend: str = None, **kwargs) -> None:
    """
    Manually register a function with the function registry.

    Public entry point for functions that the module scanner does not
    discover on its own (e.g., dynamically decorated functions). If the
    registry has not been initialized yet, it is auto-initialized first.

    Args:
        func: The function to register (must have input_memory_type and output_memory_type attributes)
        backend: Optional backend name (defaults to func.input_memory_type)
        **kwargs: Additional metadata (ignored for compatibility)

    Raises:
        ValueError: If function doesn't have required memory type attributes
        ValueError: If the input, output, or backend memory type is invalid
    """
    with _registry_lock:
        # Make sure the registry buckets exist before registering
        if not _registry_initialized:
            _auto_initialize_registry()

        # Both memory type attributes are mandatory
        has_memory_types = hasattr(func, "input_memory_type") and hasattr(func, "output_memory_type")
        if not has_memory_types:
            raise ValueError(
                f"Function '{func.__name__}' must have input_memory_type and output_memory_type attributes"
            )

        input_type = func.input_memory_type
        output_type = func.output_memory_type

        # Reject memory types outside the known set
        if input_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid input memory type: {input_type}")
        if output_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid output memory type: {output_type}")

        # An explicit (truthy) backend wins; otherwise fall back to the input type
        memory_type = backend if backend else input_type
        if memory_type not in VALID_MEMORY_TYPES:
            raise ValueError(f"Invalid backend memory type: {memory_type}")

        _register_function(func, memory_type)
def _register_function(func: Callable, memory_type: str) -> None:
    """
    Add a function to the registry bucket of one memory type.

    Internal helper shared by automatic scanning and manual registration.
    A function already present in the bucket is left untouched. On a new
    registration the function also receives a ``backend`` attribute holding
    the memory type.

    Args:
        func: The function to register
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")
    """
    bucket = FUNC_REGISTRY[memory_type]

    # Nothing to do when the function is already in this bucket
    if func in bucket:
        logger.debug(
            "Function '%s' already registered for memory type '%s'",
            func.__name__, memory_type
        )
        return

    bucket.append(func)

    # Record the backend on the function for easier inspection
    func.backend = memory_type

    logger.debug(
        "Registered function '%s' for memory type '%s'",
        func.__name__, memory_type
    )
def get_functions_by_memory_type(memory_type: str) -> List[Callable]:
    """
    Get all functions registered for a specific memory type.

    Thread-safe: Uses a lock to ensure consistent access to the global registry.

    If the registry has not been initialized yet, it is auto-initialized
    (with a warning) rather than raising.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        A new list of the functions registered for the memory type. The list
        is empty when the memory type is valid but has no bucket in the
        current registry (for example a GPU type in CPU-only mode).

    Raises:
        ValueError: If the memory type is not valid
    """
    with _registry_lock:
        # Check if registry is initialized (should be auto-initialized on import)
        if not _registry_initialized:
            logger.warning("Function registry not initialized, auto-initializing now")
            _auto_initialize_registry()

        # Check if memory type is valid
        if memory_type not in VALID_MEMORY_TYPES:
            raise ValueError(
                f"Invalid memory type: {memory_type}. "
                f"Valid types are: {', '.join(sorted(VALID_MEMORY_TYPES))}"
            )

        # Return a copy to prevent external modification. A valid type may
        # have no bucket, so a missing key yields an empty list instead of
        # raising KeyError.
        return list(FUNC_REGISTRY.get(memory_type, ()))
def get_function_info(func: Callable) -> Dict[str, Any]:
    """
    Get information about a registered function.

    Works for any callable carrying the memory type attributes, including
    callables without a ``__name__`` (such as ``functools.partial`` objects),
    for which ``repr(func)`` is reported as the name.

    Args:
        func: The function to get information about

    Returns:
        A dictionary with the keys "name", "input_memory_type",
        "output_memory_type", "backend" (falls back to the input memory type
        when the function has no ``backend`` attribute), "doc" and "module".

    Raises:
        ValueError: If the function does not have memory type attributes
    """
    # Not every callable has __name__; fall back to repr so the documented
    # ValueError is not masked by an AttributeError.
    name = getattr(func, "__name__", None)
    if name is None:
        name = repr(func)

    if not hasattr(func, "input_memory_type") or not hasattr(func, "output_memory_type"):
        raise ValueError(
            f"Function '{name}' does not have memory type attributes"
        )

    return {
        "name": name,
        "input_memory_type": func.input_memory_type,
        "output_memory_type": func.output_memory_type,
        "backend": getattr(func, "backend", func.input_memory_type),
        "doc": getattr(func, "__doc__", None),
        "module": getattr(func, "__module__", None)
    }
def is_registry_initialized() -> bool:
    """
    Report whether the function registry has been initialized.

    Thread-safe: the initialization flag is read while holding the registry lock.

    Returns:
        True if the registry is initialized, False otherwise
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized
def get_valid_memory_types() -> Set[str]:
    """
    Return the valid memory type names.

    Returns:
        A new set of valid memory type names; mutating it does not affect
        the module-level VALID_MEMORY_TYPES.
    """
    return set(VALID_MEMORY_TYPES)
493# Import hook system removed - using existing comprehensive registries
def get_function_by_name(function_name: str, memory_type: str) -> Optional[Callable]:
    """
    Look up a registered function by its name within one memory type.

    Args:
        function_name: Name of the function to find
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        The first registered function whose ``__name__`` equals
        ``function_name``, or None when there is no match

    Raises:
        ValueError: If the memory type is not valid (raised by
            get_functions_by_memory_type)
    """
    candidates = get_functions_by_memory_type(memory_type)
    matches = (candidate for candidate in candidates if candidate.__name__ == function_name)
    return next(matches, None)
def get_all_function_names(memory_type: str) -> List[str]:
    """
    List the names of all functions registered for one memory type.

    Args:
        memory_type: The memory type (e.g., "numpy", "cupy", "torch")

    Returns:
        The ``__name__`` of each registered function, in registry order

    Raises:
        ValueError: If the memory type is not valid (raised by
            get_functions_by_memory_type)
    """
    names: List[str] = []
    for registered in get_functions_by_memory_type(memory_type):
        names.append(registered.__name__)
    return names
# Populate the registry as a side effect of importing this module
# (following the storage_registry pattern). Subprocess workers set
# OPENHCS_SUBPROCESS_MODE to skip this step for faster startup.
import os
if not os.getenv('OPENHCS_SUBPROCESS_MODE'):
    _auto_initialize_registry()