Coverage for openhcs/processing/backends/lib_registry/openhcs_registry.py: 70.0%
94 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2OpenHCS native function registry.
4This registry processes OpenHCS functions that have been decorated with
5explicit contract declarations, allowing them to skip runtime testing
6while producing the same FunctionMetadata format as external libraries.
7"""
9import logging
10import numpy as np
11from typing import Dict, List, Tuple, Any, Callable
12import importlib
13from functools import wraps
15from openhcs.processing.backends.lib_registry.unified_registry import LibraryRegistryBase, FunctionMetadata, ProcessingContract
17logger = logging.getLogger(__name__)
20class OpenHCSRegistry(LibraryRegistryBase):
21 """
22 Registry for OpenHCS native functions with explicit contract support.
24 This registry processes OpenHCS functions that have been decorated with
25 explicit contract declarations, allowing them to skip runtime testing
26 while producing the same FunctionMetadata format as external libraries.
27 """
29 # Required abstract class attributes
30 MODULES_TO_SCAN = [] # Will be set dynamically
31 MEMORY_TYPE = None # OpenHCS functions have their own memory type attributes
32 FLOAT_DTYPE = np.float32
34 def __init__(self):
35 super().__init__("openhcs")
36 # Set modules to scan to OpenHCS processing modules
37 self.MODULES_TO_SCAN = self._get_openhcs_modules()
39 def _get_openhcs_modules(self) -> List[str]:
40 """Get list of OpenHCS processing modules to scan using automatic discovery."""
41 import pkgutil
42 import os
44 modules = []
46 # Get the backends directory path
47 backends_path = os.path.dirname(__file__) # lib_registry directory
48 backends_path = os.path.dirname(backends_path) # backends directory
50 # Walk through all modules in openhcs.processing.backends recursively
51 for importer, module_name, ispkg in pkgutil.walk_packages(
52 [backends_path],
53 "openhcs.processing.backends."
54 ):
55 # Skip lib_registry modules to avoid circular imports
56 if "lib_registry" in module_name: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 continue
59 # Skip __pycache__ and other non-module files
60 if "__pycache__" in module_name: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true
61 continue
63 try:
64 # Try to import the module to ensure it's valid
65 importlib.import_module(module_name)
66 modules.append(module_name)
67 except ImportError as e:
68 # Module has import issues, skip it but log for debugging
69 logger.debug(f"Skipping module {module_name}: {e}")
70 continue
72 return modules
74 def get_modules_to_scan(self) -> List[Tuple[str, Any]]:
75 """Get modules to scan for OpenHCS functions."""
76 modules = []
77 for module_name in self.MODULES_TO_SCAN:
78 try:
79 module = importlib.import_module(module_name)
80 modules.append((module_name, module))
81 except ImportError as e:
82 logger.warning(f"Could not import OpenHCS module {module_name}: {e}")
83 return modules
87 # ===== ESSENTIAL ABC METHODS =====
88 def get_library_version(self) -> str:
89 """Get OpenHCS version."""
90 try:
91 import openhcs
92 return getattr(openhcs, '__version__', 'unknown')
93 except:
94 return 'unknown'
96 def is_library_available(self) -> bool:
97 """OpenHCS is always available."""
98 return True
100 def get_library_object(self):
101 """Return OpenHCS processing module."""
102 import openhcs.processing
103 return openhcs.processing
105 def get_memory_type(self) -> str:
106 """Return placeholder memory type."""
107 return self.MEMORY_TYPE
109 def get_display_name(self) -> str:
110 """Get display name for OpenHCS."""
111 return "OpenHCS"
113 def get_module_patterns(self) -> List[str]:
114 """Get module patterns for OpenHCS."""
115 return ["openhcs"]
119 def discover_functions(self) -> Dict[str, FunctionMetadata]:
120 """Discover OpenHCS functions with explicit contracts."""
121 functions = {}
122 modules = self.get_modules_to_scan()
124 for module_name, module in modules:
125 import inspect
126 for name, func in inspect.getmembers(module, inspect.isfunction):
127 # Simple: if it has a processing contract, include it
128 if hasattr(func, '__processing_contract__'):
129 contract = getattr(func, '__processing_contract__')
131 # Apply contract wrapper (adds slice_by_slice for FLEXIBLE)
132 wrapped_func = self.apply_contract_wrapper(func, contract)
134 # Generate unique function name using module information
135 unique_name = self._generate_function_name(name, module_name)
137 metadata = FunctionMetadata(
138 name=unique_name,
139 func=wrapped_func,
140 contract=contract,
141 registry=self,
142 module=func.__module__ or "",
143 doc=(func.__doc__ or "").splitlines()[0] if func.__doc__ else "",
144 tags=["openhcs"],
145 original_name=name
146 )
148 functions[unique_name] = metadata
150 return functions
154 def _generate_function_name(self, original_name: str, module_name: str) -> str:
155 """Generate unique function name for OpenHCS functions."""
156 # Extract meaningful part from module name
157 if isinstance(module_name, str): 157 ↛ 169line 157 didn't jump to line 169 because the condition on line 157 was always true
158 module_parts = module_name.split('.')
159 # Find meaningful part after 'backends'
160 try:
161 backends_idx = module_parts.index('backends')
162 meaningful_parts = module_parts[backends_idx+1:]
163 if meaningful_parts: 163 ↛ 169line 163 didn't jump to line 169 because the condition on line 163 was always true
164 prefix = '_'.join(meaningful_parts)
165 return f"{prefix}_{original_name}"
166 except ValueError:
167 pass
169 return original_name
171 def _generate_tags(self, module_name: str) -> List[str]:
172 """Generate tags for OpenHCS functions."""
173 tags = ['openhcs']
175 # Add module-specific tags
176 if isinstance(module_name, str):
177 module_parts = module_name.split('.')
178 if 'analysis' in module_parts:
179 tags.append('analysis')
180 if 'preprocessing' in module_parts:
181 tags.append('preprocessing')
182 if 'segmentation' in module_parts:
183 tags.append('segmentation')
185 return tags