Coverage for openhcs/processing/backends/lib_registry/unified_registry.py: 72.5%
405 statements
1"""
2Unified registry base class for external library function registration.
4This module provides a common base class that eliminates ~70% of code duplication
5across library registries (pyclesperanto, scikit-image, cupy, etc.) while enforcing
6consistent behavior and making it impossible to skip dynamic testing or hardcode
7function lists.
9Key Benefits:
10- Eliminates ~1000+ lines of duplicated code
11- Enforces consistent testing and registration patterns
12- Makes adding new libraries trivial (60-120 lines vs 350-400)
13- Centralizes bug fixes and improvements
14- Type-safe abstract interface prevents shortcuts
16Architecture:
17- LibraryRegistryBase: Abstract base class with common functionality
18- ProcessingContract: Unified contract enum across all libraries
19- Dimension error adapter factory for consistent error handling
20- Integrated caching system using existing cache_utils.py patterns
21"""
23import importlib
24import inspect
25import json
26import logging
27import time
28from abc import ABC, abstractmethod
29from dataclasses import dataclass, field
30from enum import Enum
31from functools import wraps
32from typing import Any, Callable, Dict, List, Optional, Tuple, Type
35from openhcs.core.xdg_paths import get_cache_file_path
36from openhcs.core.memory.stack_utils import unstack_slices, stack_slices
37from openhcs.core.auto_register_meta import AutoRegisterMeta, LazyDiscoveryDict
39logger = logging.getLogger(__name__)
42# Enums for OpenHCS principle compliance (replace magic strings)
43class ModuleFilterComponents(Enum):
44 """Components to filter out when generating tags from module paths."""
45 BACKENDS = "backends"
46 PROCESSING = "processing"
47 OPENHCS = "openhcs"
49 @classmethod
50 def should_skip(cls, component: str) -> bool:
51 """Check if component should be skipped in tag generation."""
52 return any(component == item.value for item in cls)
55class ProcessingContract(Enum):
56 """
57 Unified contract classification with direct method execution.
58 """
59 PURE_3D = "_execute_pure_3d"
60 PURE_2D = "_execute_pure_2d"
61 FLEXIBLE = "_execute_flexible"
62 VOLUMETRIC_TO_SLICE = "_execute_volumetric_to_slice"
64 def execute(self, registry, func, image, *args, **kwargs):
65 """Execute the contract method on the registry."""
66 method = getattr(registry, self.value)
67 return method(func, image, *args, **kwargs)
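
# Illustrative sketch (not part of the runtime logic): each ProcessingContract member holds the
# name of a LibraryRegistryBase execution method, so dispatch is a single getattr call.
# Assuming `registry` is any concrete registry and `blur` is an already-wrapped function:
#
#     ProcessingContract.PURE_2D.execute(registry, blur, stack)
#     # is equivalent to: registry._execute_pure_2d(blur, stack)
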
@dataclass(frozen=True)
class FunctionMetadata:
    """Clean metadata with no library-specific leakage."""

    # Core fields only
    name: str
    func: Callable
    contract: ProcessingContract
    registry: 'LibraryRegistryBase'  # Reference to the registry that registered this function - REQUIRED
    module: str = ""
    doc: str = ""
    tags: List[str] = field(default_factory=list)
    original_name: str = ""  # Original function name for cache reconstruction

    def get_memory_type(self) -> str:
        """
        Get the actual memory type (backend) of this function.

        Returns the function's input_memory_type if available, otherwise falls back
        to the registry's memory type. This ensures the UI shows the actual backend
        (cupy, numpy, etc.) instead of the registry name (openhcs).

        Returns:
            Memory type string (e.g., "cupy", "numpy", "torch", "pyclesperanto")
        """
        # First try to get memory type from function attributes
        if hasattr(self.func, 'input_memory_type'):
            return self.func.input_memory_type
        elif hasattr(self.func, 'output_memory_type'):
            return self.func.output_memory_type
        elif hasattr(self.func, 'backend'):
            return self.func.backend

        # Fallback to registry memory type
        return self.registry.get_memory_type()

    def get_registry_name(self) -> str:
        """
        Get the registry name that registered this function.

        Returns:
            Registry name string (e.g., "openhcs", "skimage", "cupy", "pyclesperanto")
        """
        return self.registry.library_name
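
# Illustrative usage (assuming `registry` is a concrete registry and `meta` came from
# registry.discover_functions()):
#
#     meta.get_memory_type()    # e.g. "cupy" - taken from func.input_memory_type when set
#     meta.get_registry_name()  # e.g. "cupy" - always the owning registry's library_name
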
class LibraryRegistryBase(ABC, metaclass=AutoRegisterMeta):
    """
    Minimal ABC for all library registries.

    Provides only essential contracts that all registries must implement,
    regardless of whether they use runtime testing or explicit contracts.

    Registry auto-created and stored as LibraryRegistryBase.__registry__.
    Subclasses auto-register by setting _registry_name class attribute.
    """
    __registry_key__ = '_registry_name'

    _registry_name: Optional[str] = None  # Override in subclasses (e.g., 'pyclesperanto', 'cupy')

    # Common exclusions across all libraries
    COMMON_EXCLUSIONS = {
        'imread', 'imsave', 'load', 'save', 'read', 'write',
        'show', 'imshow', 'plot', 'display', 'view', 'visualize',
        'info', 'help', 'version', 'test', 'benchmark'
    }

    # Abstract class attributes - each implementation must define these
    MODULES_TO_SCAN: List[str]
    MEMORY_TYPE: str  # Memory type string value (e.g., "pyclesperanto", "cupy", "numpy")
    FLOAT_DTYPE: Any  # Library-specific float32 type (np.float32, cp.float32, etc.)

    def __init__(self, library_name: str):
        """
        Initialize registry for a specific library.

        Args:
            library_name: Name of the library (e.g., "pyclesperanto", "skimage")
        """
        self.library_name = library_name
        self._cache_path = get_cache_file_path(f"{library_name}_function_metadata.json")

    # ===== ESSENTIAL ABC METHODS =====

    # ===== LIBRARY IDENTIFICATION =====
    @abstractmethod
    def get_library_version(self) -> str:
        """Get library version for cache validation."""
        pass

    @abstractmethod
    def is_library_available(self) -> bool:
        """Check if the library is available for import."""
        pass

    # ===== FUNCTION DISCOVERY =====
    @abstractmethod
    def discover_functions(self) -> Dict[str, FunctionMetadata]:
        """Discover and return function metadata. Must be implemented by subclasses."""
        pass

    # ===== CONTRACT HANDLING =====
    def apply_contract_wrapper(self, func: Callable, contract: ProcessingContract) -> Callable:
        """Apply contract wrapper with parameter injection (enabled + slice_by_slice for FLEXIBLE)."""
        from functools import wraps
        import inspect

        original_sig = inspect.signature(func)
        param_names = {p.name for p in original_sig.parameters.values()}

        # Define injectable parameters: enabled for all, slice_by_slice for FLEXIBLE
        injectable_params = [('enabled', True, bool)]
        if contract == ProcessingContract.FLEXIBLE:
            injectable_params.append(('slice_by_slice', False, bool))

        # Filter out already-existing parameters
        params_to_add = [(name, default, annotation) for name, default, annotation in injectable_params if name not in param_names]

        # If nothing to inject, return original function
        if not params_to_add:
            return func

        # Build new parameter list (insert before **kwargs)
        new_params = list(original_sig.parameters.values())
        insert_index = next((i for i, p in enumerate(new_params) if p.kind == inspect.Parameter.VAR_KEYWORD), len(new_params))

        for param_name, default_value, annotation in params_to_add:
            new_params.insert(insert_index, inspect.Parameter(param_name, inspect.Parameter.KEYWORD_ONLY, default=default_value, annotation=annotation))
            insert_index += 1

        # Create wrapper
        @wraps(func)
        def wrapper(image, *args, **kwargs):
            for param_name, _, _ in injectable_params:
                if param_name in kwargs:
                    setattr(func, param_name, kwargs[param_name])
            return contract.execute(self, func, image, *args, **kwargs)

        # Set defaults and signature
        for param_name, default_value, _ in injectable_params:
            setattr(wrapper, param_name, default_value)

        wrapper.__signature__ = original_sig.replace(parameters=new_params)
        wrapper.__annotations__ = getattr(func, '__annotations__', {}).copy()
        for param_name, _, annotation in injectable_params:
            wrapper.__annotations__[param_name] = annotation

        return wrapper
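
    # Illustrative call (assuming `registry` is a concrete registry and `median` is an
    # already-adapted FLEXIBLE function): the wrapper produced above exposes the injected
    # keyword-only parameters, e.g.
    #
    #     wrapped = registry.apply_contract_wrapper(median, ProcessingContract.FLEXIBLE)
    #     wrapped(stack)                        # default: whole-stack (3D) execution
    #     wrapped(stack, slice_by_slice=True)   # toggles slice-wise execution via _execute_flexible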
    def _inject_optional_dataclass_params(self, func: Callable) -> Callable:
        """Inject optional lazy dataclass parameters into function signature.

        Can be disabled by setting ENABLE_CONFIG_INJECTION = False.
        """
        # Configuration flag to enable/disable config injection
        ENABLE_CONFIG_INJECTION = False  # Set to True to re-enable config injection

        if not ENABLE_CONFIG_INJECTION:
            return func  # Return function unchanged when disabled

        # Original injection logic (unreachable while disabled, preserved for re-enabling)
        import inspect
        from functools import wraps
        from typing import Optional

        # Get original signature
        original_sig = inspect.signature(func)
        original_params = list(original_sig.parameters.values())

        # Import existing lazy config types
        from openhcs.core.config import LazyNapariStreamingConfig, LazyFijiStreamingConfig, LazyStepMaterializationConfig

        # Define common lazy dataclass parameters to inject
        dataclass_params = [
            ('napari_streaming_config', 'Optional[LazyNapariStreamingConfig]', LazyNapariStreamingConfig),
            ('fiji_streaming_config', 'Optional[LazyFijiStreamingConfig]', LazyFijiStreamingConfig),
            ('step_materialization_config', 'Optional[LazyStepMaterializationConfig]', LazyStepMaterializationConfig),
        ]

        # Check if any parameters need to be added
        existing_param_names = {p.name for p in original_params}
        params_to_add = [(name, type_hint, lazy_class) for name, type_hint, lazy_class in dataclass_params
                         if name not in existing_param_names]

        if not params_to_add:
            return func  # No parameters to add

        # Create new parameters
        new_params = original_params.copy()

        # Find insertion point (before **kwargs if it exists)
        insert_index = len(new_params)
        for i, param in enumerate(new_params):
            if param.kind == inspect.Parameter.VAR_KEYWORD:
                insert_index = i
                break

        # Add dataclass parameters
        for param_name, type_hint, lazy_class in params_to_add:
            new_param = inspect.Parameter(
                param_name,
                inspect.Parameter.KEYWORD_ONLY,
                default=None,
                annotation=Optional[lazy_class]  # Use actual type object, not string
            )
            new_params.insert(insert_index, new_param)
            insert_index += 1

        # Create enhanced wrapper function
        @wraps(func)
        def enhanced_wrapper(*args, **kwargs):
            # Extract dataclass parameters from kwargs (they're just ignored for now)
            regular_kwargs = {k: v for k, v in kwargs.items()
                              if k not in [name for name, _, _ in dataclass_params]}

            # Call original function with regular parameters only
            return func(*args, **regular_kwargs)

        # Apply the modified signature
        new_sig = original_sig.replace(parameters=new_params)
        enhanced_wrapper.__signature__ = new_sig

        # Enhance annotations
        if hasattr(func, '__annotations__'):
            enhanced_wrapper.__annotations__ = func.__annotations__.copy()
        else:
            enhanced_wrapper.__annotations__ = {}

        # Add type annotations for injected parameters
        from typing import Optional
        for param_name, type_hint, lazy_class in params_to_add:
            enhanced_wrapper.__annotations__[param_name] = Optional[lazy_class]

        return enhanced_wrapper
    def _get_function_by_name(self, module_path: str, func_name: str):
        """Get function object by module path and name."""
        module = importlib.import_module(module_path)
        return getattr(module, func_name)

    # ===== PROCESSING CONTRACT EXECUTION METHODS =====
    def _execute_slice_by_slice(self, func, image, *args, **kwargs):
        """Shared slice-by-slice execution logic."""
        if image.ndim == 3:
            from openhcs.core.memory.stack_utils import unstack_slices, stack_slices
            from openhcs.core.memory.converters import detect_memory_type
            mem = detect_memory_type(image)
            slices = unstack_slices(image, mem, 0)
            results = [func(sl, *args, **kwargs) for sl in slices]
            return stack_slices(results, mem, 0)
        return func(image, *args, **kwargs)

    def _execute_pure_3d(self, func, image, *args, **kwargs):
        """Execute 3D→3D function directly (no change)."""
        return func(image, *args, **kwargs)

    def _execute_pure_2d(self, func, image, *args, **kwargs):
        """Execute 2D→2D function with unstack/restack wrapper."""
        # Get memory type from the decorated function
        memory_type = func.output_memory_type
        slices = unstack_slices(image, memory_type, 0)
        results = [func(sl, *args, **kwargs) for sl in slices]
        return stack_slices(results, memory_type, 0)

    def _execute_flexible(self, func, image, *args, **kwargs):
        """Execute function that handles both 3D→3D and 2D→2D with toggle."""
        # Check if slice_by_slice attribute is set on the function
        slice_by_slice = getattr(func, 'slice_by_slice', False)
        if slice_by_slice:
            # Reuse the 2D-only execution logic (unstack -> process -> restack)
            return self._execute_pure_2d(func, image, *args, **kwargs)
        else:
            # Use 3D-only execution logic (no modification)
            return self._execute_pure_3d(func, image, *args, **kwargs)

    def _execute_volumetric_to_slice(self, func, image, *args, **kwargs):
        """Execute 3D→2D function, returning the result as a single-slice 3D array."""
        # Get memory type from the decorated function
        memory_type = func.output_memory_type
        result_2d = func(image, *args, **kwargs)
        return stack_slices([result_2d], memory_type, 0)
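
    # Illustrative mapping of contracts to array handling (assuming a (Z, Y, X) stack):
    #   PURE_3D             -> func(stack) as-is
    #   PURE_2D             -> unstack into Z slices, func(slice) each, restack
    #   FLEXIBLE            -> PURE_3D path by default, PURE_2D path when slice_by_slice is set
    #   VOLUMETRIC_TO_SLICE -> func(stack) yields a 2D result, re-wrapped as a 1-slice stack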
    # ===== CACHING METHODS =====
    def _load_or_discover_functions(self) -> Dict[str, FunctionMetadata]:
        """Load functions from cache or discover them if cache is invalid."""
        logger.info(f"🔄 _load_or_discover_functions called for {self.library_name}")

        cached_functions = self._load_from_cache()
        if cached_functions is not None:
            logger.info(f"✅ Loaded {len(cached_functions)} {self.library_name} functions from cache")
            return cached_functions

        logger.info(f"🔍 Cache miss for {self.library_name} - performing full discovery")
        functions = self.discover_functions()
        self._save_to_cache(functions)
        return functions

    def _load_from_cache(self) -> Optional[Dict[str, FunctionMetadata]]:
        """Load function metadata from cache with validation."""
        logger.debug(f"📂 LOAD FROM CACHE: Checking cache for {self.library_name}")

        if not self._cache_path.exists():
            logger.debug(f"📂 LOAD FROM CACHE: No cache file exists at {self._cache_path}")
            return None

        try:
            with open(self._cache_path, 'r') as f:
                cache_data = json.load(f)
        except json.JSONDecodeError:
            logger.warning(f"Corrupt cache file {self._cache_path}, rebuilding")
            self._cache_path.unlink(missing_ok=True)
            return None

        if 'functions' not in cache_data:
            return None

        cached_version = cache_data.get('library_version', 'unknown')
        current_version = self.get_library_version()
        if cached_version != current_version:
            logger.info(f"{self.library_name} version changed ({cached_version} → {current_version}) - cache invalid")
            return None

        cache_timestamp = cache_data.get('timestamp', 0)
        cache_age_days = (time.time() - cache_timestamp) / (24 * 3600)
        if cache_age_days > 7:
            logger.debug(f"Cache is {cache_age_days:.1f} days old - rebuilding")
            return None

        logger.debug(f"📂 LOAD FROM CACHE: Loading {len(cache_data['functions'])} functions for {self.library_name}")

        functions = {}
        for func_name, cached_data in cache_data['functions'].items():
            original_name = cached_data.get('original_name', func_name)
            func = self._get_function_by_name(cached_data['module'], original_name)
            contract = ProcessingContract[cached_data['contract']]

            # Apply the same wrappers as during discovery
            has_adapter = hasattr(self, 'create_library_adapter')
            logger.debug(f"📂 LOAD FROM CACHE: {func_name} - hasattr(create_library_adapter)={has_adapter}")

            if has_adapter:
                # External library - apply library adapter + contract wrapper + param injection
                adapted_func = self.create_library_adapter(func, contract)
                contract_wrapped_func = self.apply_contract_wrapper(adapted_func, contract)
                final_func = self._inject_optional_dataclass_params(contract_wrapped_func)
            else:
                # OpenHCS - apply contract wrapper + param injection
                contract_wrapped_func = self.apply_contract_wrapper(func, contract)
                final_func = self._inject_optional_dataclass_params(contract_wrapped_func)

            metadata = FunctionMetadata(
                name=func_name,
                func=final_func,
                contract=contract,
                registry=self,
                module=cached_data.get('module', ''),
                doc=cached_data.get('doc', ''),
                tags=cached_data.get('tags', []),
                original_name=cached_data.get('original_name', func_name)
            )
            functions[func_name] = metadata

        return functions

    def _save_to_cache(self, functions: Dict[str, FunctionMetadata]) -> None:
        """Save function metadata to cache."""
        cache_data = {
            'cache_version': '1.0',
            'library_version': self.get_library_version(),
            'timestamp': time.time(),
            'functions': {
                func_name: {
                    'name': metadata.name,
                    'original_name': metadata.original_name,
                    'module': metadata.module,
                    'contract': metadata.contract.name,
                    'doc': metadata.doc,
                    'tags': metadata.tags
                }
                for func_name, metadata in functions.items()
            }
        }

        self._cache_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self._cache_path, 'w') as f:
            json.dump(cache_data, f, indent=2)

        logger.info(f"💾 Saved {len(functions)} {self.library_name} functions to cache")
    def get_memory_type(self) -> str:
        """Get the memory type string value for this library."""
        return self.MEMORY_TYPE

    def get_module_patterns(self) -> List[str]:
        """Get module patterns that identify this library (can be overridden by implementations)."""
        # Default: just the library name
        return [self.library_name.lower()]

    def get_display_name(self) -> str:
        """Get display name for this library (can be overridden by implementations)."""
        # Default: capitalize library name
        return self.library_name.title()

    # ===== FUNCTION DISCOVERY =====
    def get_modules_to_scan(self) -> List[Tuple[str, Any]]:
        """
        Get list of (module_name, module_object) tuples to scan for functions.
        Uses the MODULES_TO_SCAN class attribute and library object from get_library_object().

        Returns:
            List of (name, module) pairs where name is for identification
            and module is the actual module object to scan.
        """
        library = self.get_library_object()
        modules = []
        for module_name in self.MODULES_TO_SCAN:
            if module_name == "":
                # Empty string means scan the main library namespace
                module = library
                modules.append(("main", module))
            else:
                module = getattr(library, module_name)
                modules.append((module_name, module))
        return modules

    @abstractmethod
    def get_library_object(self):
        """Get the main library object to scan for modules. Library-specific implementation."""
        pass


class RuntimeTestingRegistryBase(LibraryRegistryBase):
    """
    Extended ABC for libraries that require runtime testing.

    Adds runtime testing methods for libraries that don't have explicit
    processing contracts and need behavioral classification through testing.
    """

    def create_test_arrays(self) -> Tuple[Any, Any]:
        """
        Create test arrays appropriate for this library.

        Returns:
            Tuple of (test_3d, test_2d) arrays for behavior testing
        """
        test_3d = self._create_array((3, 20, 20), self._get_float_dtype())
        test_2d = self._create_array((20, 20), self._get_float_dtype())
        return test_3d, test_2d

    @abstractmethod
    def _create_array(self, shape: Tuple[int, ...], dtype):
        """Create array with specified shape and dtype. Library-specific implementation."""
        pass

    def _get_float_dtype(self):
        """Get the appropriate float dtype for this library."""
        return self.FLOAT_DTYPE

    # ===== CORE BEHAVIOR CONTRACT =====
    def classify_function_behavior(self, func: Callable, declared_contract: Optional[ProcessingContract] = None) -> Tuple[ProcessingContract, bool]:
        """Classify function behavior by testing 3D and 2D inputs, or use declared contract if provided."""

        # Fast path: If an explicit contract is declared, use it directly (skip runtime testing)
        if declared_contract is not None:
            return declared_contract, True
        test_3d, test_2d = self.create_test_arrays()

        def test_function(test_array):
            """Test function with array, return (success, result)."""
            try:
                result = func(test_array)
                return True, result
            except Exception:
                return False, None

        works_3d, result_3d = test_function(test_3d)
        works_2d, _ = test_function(test_2d)

        # Classification lookup table
        classification_map = {
            (True, True): self._classify_dual_support(result_3d),
            (True, False): ProcessingContract.PURE_3D,
            (False, True): ProcessingContract.PURE_2D,
            (False, False): None  # Invalid function
        }

        contract = classification_map[(works_3d, works_2d)]
        is_valid = works_3d or works_2d

        return contract, is_valid
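
    # Classification outcomes, restating the lookup table above (examples are illustrative):
    #   works on 3D only              -> (PURE_3D, True)
    #   works on 2D only              -> (PURE_2D, True)
    #   works on both, 3D gives 2D    -> (VOLUMETRIC_TO_SLICE, True)   e.g. a projection
    #   works on both, 3D gives 3D    -> (FLEXIBLE, True)              e.g. a gaussian filter
    #   works on neither              -> (None, False), and the function is rejected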
    def _classify_dual_support(self, result_3d):
        """Classify functions that work on both 3D and 2D inputs."""
        if result_3d is not None:
            # Handle tuple results (some functions return multiple arrays)
            if isinstance(result_3d, tuple):
                # Check the first element if it's a tuple
                first_result = result_3d[0] if len(result_3d) > 0 else None
                if hasattr(first_result, 'ndim') and first_result.ndim == 2:
                    return ProcessingContract.VOLUMETRIC_TO_SLICE
            # Handle single array results
            elif hasattr(result_3d, 'ndim') and result_3d.ndim == 2:
                return ProcessingContract.VOLUMETRIC_TO_SLICE
        return ProcessingContract.FLEXIBLE

    @abstractmethod
    def _stack_2d_results(self, func, test_3d):
        """Stack 2D results. Library-specific implementation required."""
        pass

    @abstractmethod
    def _arrays_close(self, arr1, arr2):
        """Compare arrays. Library-specific implementation required."""
        pass

    def create_library_adapter(self, original_func: Callable, contract: ProcessingContract) -> Callable:
        """Create adapter with library-specific processing only."""
        import inspect
        func_name = getattr(original_func, '__name__', 'unknown')

        logger.debug(f"🔧 CREATE LIBRARY ADAPTER: {func_name} from {getattr(original_func, '__module__', 'unknown')}")

        # Get original signature to preserve it
        original_sig = inspect.signature(original_func)

        def adapter(image, *args, **kwargs):
            processed_image = self._preprocess_input(image, func_name)
            result = contract.execute(self, original_func, processed_image, *args, **kwargs)
            return self._postprocess_output(result, image, func_name)

        # Apply wraps and preserve signature
        wrapped_adapter = wraps(original_func)(adapter)
        wrapped_adapter.__signature__ = original_sig

        # Preserve and enhance annotations
        if hasattr(original_func, '__annotations__'):
            wrapped_adapter.__annotations__ = original_func.__annotations__.copy()
        else:
            wrapped_adapter.__annotations__ = {}

        # Extract type hints from docstring if annotations are missing
        self._enhance_annotations_from_docstring(wrapped_adapter, original_func)

        # Set memory type attributes for contract execution compatibility
        # Only set if registry has a specific memory type (external libraries)
        if self.MEMORY_TYPE is not None:
            wrapped_adapter.input_memory_type = self.MEMORY_TYPE
            wrapped_adapter.output_memory_type = self.MEMORY_TYPE
        wrapped_adapter.stream_to_napari = False

        return wrapped_adapter
    def _enhance_annotations_from_docstring(self, wrapped_func: Callable, original_func: Callable):
        """Extract type hints from docstring using mathematical simplification approach."""
        try:
            # Import from shared UI utilities (no circular dependency)
            from openhcs.introspection.signature_analyzer import SignatureAnalyzer
            import numpy as np

            logger.debug(f"🔍 ENHANCE ANNOTATIONS: {original_func.__name__} from {original_func.__module__}")

            # Unified type extraction with compatibility validation (mathematical simplification)
            TYPE_PATTERNS = {'ndarray': np.ndarray, 'array': np.ndarray, 'array_like': np.ndarray,
                             'int': int, 'integer': int, 'float': float, 'scalar': float,
                             'bool': bool, 'boolean': bool, 'str': str, 'string': str,
                             'tuple': tuple, 'list': list, 'dict': dict, 'sequence': list}

            COMPATIBLE_DEFAULTS = {float: (int, float, range), int: (int, float),
                                   list: (list, tuple, range), tuple: (list, tuple, range)}

            param_info = SignatureAnalyzer.analyze(original_func, skip_first_param=False)

            # Inline type extraction and validation (single-use function inlining rule)
            enhanced_count = 0
            for param_name, info in param_info.items():
                if param_name not in wrapped_func.__annotations__ and info.description:
                    # Extract first line of description (NumPy/SciPy convention: type is always on first line)
                    # This avoids false matches from type keywords appearing later in the description
                    first_line = info.description.split('\n')[0].strip().lower()
                    # Remove optional markers and split on 'or' for union types
                    first_line = first_line.replace(', optional', '').replace(' optional', '').split(' or ')[0].strip()

                    # Type extraction with priority patterns
                    python_type = (str if first_line.startswith('{') and '}' in first_line
                                   else list if any(p in first_line for p in ['sequence', 'iterable', 'array of', 'list of'])
                                   else next((t for pattern, t in TYPE_PATTERNS.items() if pattern in first_line), None))

                    # Inline compatibility check (single-use function inlining rule)
                    if python_type and (info.default_value is None or
                                        type(info.default_value) in COMPATIBLE_DEFAULTS.get(python_type, (python_type,))):
                        logger.debug(f" ✓ Enhanced {param_name}: {python_type} (from first_line='{first_line[:50]}')")
                        wrapped_func.__annotations__[param_name] = python_type
                        enhanced_count += 1
                    elif info.description:
                        logger.debug(f" ✗ Could not enhance {param_name}: first_line='{first_line[:50]}', extracted_type={python_type}")

            if enhanced_count > 0:
                logger.debug(f" 📝 Enhanced {enhanced_count} annotations for {original_func.__name__}")
                logger.debug(f" Final annotations: {wrapped_func.__annotations__}")
        except Exception as e:
            logger.error(f" ❌ Error enhancing annotations for {original_func.__name__}: {e}", exc_info=True)

    @abstractmethod
    def _preprocess_input(self, image, func_name: str):
        """Preprocess input image. Library-specific implementation."""
        pass

    @abstractmethod
    def _postprocess_output(self, result, original_image, func_name: str):
        """Postprocess output result. Library-specific implementation."""
        pass
    # ===== BASIC FILTERING =====
    def should_include_function(self, func: Callable, func_name: str) -> bool:
        """Single method for all filtering logic (blacklist, signature, etc.)"""
        # Skip private functions
        if func_name.startswith('_'):
            return False

        # Skip exclusions (check both common and library-specific)
        exclusions = getattr(self.__class__, 'EXCLUSIONS', self.COMMON_EXCLUSIONS)
        if func_name.lower() in exclusions:
            return False

        # Skip classes and types
        if inspect.isclass(func) or isinstance(func, type):
            return False

        # Must be callable
        if not callable(func):
            return False

        # Pure functions must have at least one parameter
        sig = inspect.signature(func)
        params = list(sig.parameters.values())
        if not params:
            return False

        # Validate that type hints can be resolved (skip functions with missing dependencies)
        if not self._validate_type_hints(func, func_name):
            return False

        # Library-specific signature validation
        return self._check_first_parameter(params[0], func_name)

    def _validate_type_hints(self, func: Callable, func_name: str) -> bool:
        """
        Validate that function type hints can be resolved.

        Returns False if type hints reference missing dependencies (e.g., torch when not installed).
        This prevents functions with unresolvable type hints from being registered.
        """
        try:
            from typing import get_type_hints
            # Try to resolve type hints - this will fail if dependencies are missing
            get_type_hints(func)
            return True
        except NameError as e:
            # Type hint references a missing dependency (e.g., 'torch' not defined)
            logger.warning(f"Skipping function '{func_name}' due to unresolvable type hints: {e}")
            return False
        except Exception:
            # Other type hint resolution errors - be conservative and allow the function
            # (this handles edge cases where get_type_hints fails for other reasons)
            return True

    @abstractmethod
    def _check_first_parameter(self, first_param, func_name: str) -> bool:
        """Check if first parameter meets library-specific criteria. Library-specific implementation."""
        pass
    # ===== RUNTIME TESTING IMPLEMENTATION =====
    def discover_functions(self) -> Dict[str, FunctionMetadata]:
        """Discover and classify all library functions with runtime testing."""
        functions = {}
        modules = self.get_modules_to_scan()
        logger.info(f"🔍 Starting function discovery for {self.library_name}")
        logger.info(f"📦 Scanning {len(modules)} modules: {[name for name, _ in modules]}")

        total_tested = 0
        total_accepted = 0

        for module_name, module in modules:
            logger.info(f" 📦 Analyzing {module_name} ({module})...")
            module_tested = 0
            module_accepted = 0

            for name in dir(module):
                if name.startswith("_"):
                    continue

                func = getattr(module, name)
                full_path = self._get_full_function_path(module, name, module_name)

                if not self.should_include_function(func, name):
                    rejection_reason = self._get_rejection_reason(func, name)
                    if rejection_reason != "private":
                        logger.debug(f" 🚫 Skipping {full_path}: {rejection_reason}")
                    continue

                module_tested += 1
                total_tested += 1

                contract, is_valid = self.classify_function_behavior(func)
                logger.debug(f" 🧪 Testing {full_path}")
                logger.debug(f" Classification: {contract.name if contract else contract}")

                if not is_valid:
                    logger.debug(" ❌ Rejected: Invalid classification")
                    continue

                doc_lines = (func.__doc__ or "").splitlines()
                first_line_doc = doc_lines[0] if doc_lines else ""
                func_name = self._generate_function_name(name, module_name)

                # Apply library adapter (preprocessing/postprocessing)
                adapted_func = self.create_library_adapter(func, contract)

                # Apply contract wrapper (slice_by_slice for FLEXIBLE)
                contract_wrapped_func = self.apply_contract_wrapper(adapted_func, contract)

                # Inject optional dataclass parameters
                final_func = self._inject_optional_dataclass_params(contract_wrapped_func)

                metadata = FunctionMetadata(
                    name=func_name,
                    func=final_func,
                    contract=contract,
                    registry=self,
                    module=func.__module__ or "",
                    doc=first_line_doc,
                    tags=self._generate_tags(name),
                    original_name=name
                )

                functions[func_name] = metadata
                module_accepted += 1
                total_accepted += 1
                logger.debug(f" ✅ Accepted as '{func_name}'")

            logger.debug(f" 📊 Module {module_name}: {module_accepted}/{module_tested} functions accepted")

        logger.info(f"✅ Discovery complete: {total_accepted}/{total_tested} functions accepted")
        return functions
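
    # Note on wrapping order (as implemented above and mirrored in _load_from_cache):
    # raw library function -> create_library_adapter (pre/postprocessing)
    #                      -> apply_contract_wrapper (enabled / slice_by_slice injection)
    #                      -> _inject_optional_dataclass_params (currently a no-op)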
    def _get_full_function_path(self, module, func_name: str, module_name: str) -> str:
        """Generate full module path for logging."""
        if module_name == "main":
            return f"{self.library_name}.{func_name}"
        else:
            # Extract clean module path
            module_str = str(module)
            if "'" in module_str:
                clean_path = module_str.split("'")[1]
                return f"{clean_path}.{func_name}"
            else:
                return f"{module_name}.{func_name}"

    def _get_rejection_reason(self, func: Callable, func_name: str) -> str:
        """Get detailed reason why a function was rejected."""
        # Check each rejection criterion in order
        if func_name.startswith('_'):
            return "private"

        exclusions = getattr(self.__class__, 'EXCLUSIONS', self.COMMON_EXCLUSIONS)
        if func_name.lower() in exclusions:
            return "blacklisted"

        if inspect.isclass(func) or isinstance(func, type):
            return "is class/type"

        if not callable(func):
            return "not callable"

        try:
            sig = inspect.signature(func)
            params = list(sig.parameters.values())
            if not params:
                return "no parameters (not pure function)"
        except (ValueError, TypeError):
            return "invalid signature"

        return "unknown"
    # ===== CUSTOMIZATION HOOKS =====
    def _generate_function_name(self, name: str, module_name: str) -> str:
        """Generate function name. Override in subclasses for custom naming."""
        return name

    def _generate_tags(self, func_name: str) -> List[str]:
        """Generate tags using library name."""
        return [self.library_name]


# ============================================================================
# Registry Export
# ============================================================================
# Auto-created registry from LibraryRegistryBase
LIBRARY_REGISTRIES = LibraryRegistryBase.__registry__
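
# Illustrative sketch only: what a minimal runtime-tested registry subclass might look like.
# The class and library names (DemoRegistry, "demolib") are hypothetical; real subclasses such
# as the pyclesperanto or cupy registries live in their own modules and follow the same pattern.
#
#     import numpy as np
#
#     class DemoRegistry(RuntimeTestingRegistryBase):
#         _registry_name = "demolib"   # AutoRegisterMeta records the subclass under this key
#         MODULES_TO_SCAN = [""]       # "" means scan the main library namespace
#         MEMORY_TYPE = "numpy"
#         FLOAT_DTYPE = np.float32
#
#         def get_library_object(self):
#             import demolib
#             return demolib
#
#         def get_library_version(self) -> str:
#             return "1.0"
#
#         def is_library_available(self) -> bool:
#             return True
#
#         def _create_array(self, shape, dtype):
#             return np.zeros(shape, dtype=dtype)
#
#         # ... plus trivial implementations of _stack_2d_results, _arrays_close,
#         # _preprocess_input, _postprocess_output and _check_first_parameter.
#
# Once such a subclass is imported, it appears in LIBRARY_REGISTRIES under its _registry_name.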