Coverage for openhcs/processing/backends/lib_registry/openhcs_registry.py: 70.0%

94 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2OpenHCS native function registry. 

3 

4This registry processes OpenHCS functions that have been decorated with 

5explicit contract declarations, allowing them to skip runtime testing 

6while producing the same FunctionMetadata format as external libraries. 

7""" 

8 

9import logging 

10import numpy as np 

11from typing import Dict, List, Tuple, Any, Callable 

12import importlib 

13from functools import wraps 

14 

15from openhcs.processing.backends.lib_registry.unified_registry import LibraryRegistryBase, FunctionMetadata, ProcessingContract 

16 

17logger = logging.getLogger(__name__) 

18 

19 

class OpenHCSRegistry(LibraryRegistryBase):
    """
    Registry for OpenHCS native functions with explicit contract support.

    This registry processes OpenHCS functions that have been decorated with
    explicit contract declarations, allowing them to skip runtime testing
    while producing the same FunctionMetadata format as external libraries.
    """

    # Required abstract class attributes
    MODULES_TO_SCAN = []  # Replaced per instance in __init__ with the discovered modules
    MEMORY_TYPE = None  # OpenHCS functions have their own memory type attributes
    FLOAT_DTYPE = np.float32

    def __init__(self):
        """Initialise the registry under the name "openhcs" and discover its modules."""
        super().__init__("openhcs")
        # Set modules to scan to OpenHCS processing modules
        self.MODULES_TO_SCAN = self._get_openhcs_modules()

    def _get_openhcs_modules(self) -> List[str]:
        """Get list of OpenHCS processing modules to scan using automatic discovery.

        Walks every module below the ``openhcs.processing.backends`` package
        directory, skips anything whose dotted name contains ``lib_registry``
        or ``__pycache__``, and keeps the modules that import successfully.

        Returns:
            Fully qualified module names, in the order they were discovered.
        """
        import os
        import pkgutil

        # This file sits in <backends>/lib_registry/, so going up two levels
        # yields the backends directory itself.
        backends_path = os.path.dirname(os.path.dirname(__file__))

        modules = []

        # Walk through all modules in openhcs.processing.backends recursively
        for _finder, module_name, _ispkg in pkgutil.walk_packages(
            [backends_path],
            "openhcs.processing.backends.",
        ):
            # Skip lib_registry modules to avoid circular imports
            if "lib_registry" in module_name:
                continue

            # Skip __pycache__ and other non-module files
            if "__pycache__" in module_name:
                continue

            try:
                # Try to import the module to ensure it's valid
                importlib.import_module(module_name)
            except ImportError as e:
                # Module has import issues, skip it but log for debugging
                logger.debug("Skipping module %s: %s", module_name, e)
                continue

            modules.append(module_name)

        return modules

    def get_modules_to_scan(self) -> List[Tuple[str, Any]]:
        """Get modules to scan for OpenHCS functions.

        Returns:
            ``(module_name, module_object)`` pairs for every entry of
            ``MODULES_TO_SCAN`` that can be imported. Entries that fail with
            ``ImportError`` are logged as warnings and left out.
        """
        modules = []
        for module_name in self.MODULES_TO_SCAN:
            try:
                module = importlib.import_module(module_name)
            except ImportError as e:
                logger.warning("Could not import OpenHCS module %s: %s", module_name, e)
                continue
            modules.append((module_name, module))
        return modules

    # ===== ESSENTIAL ABC METHODS =====
    def get_library_version(self) -> str:
        """Get OpenHCS version.

        Returns:
            ``openhcs.__version__`` when it exists, otherwise ``'unknown'``.
        """
        try:
            import openhcs
            return getattr(openhcs, '__version__', 'unknown')
        except Exception:
            # Best effort: any failure maps to 'unknown', but KeyboardInterrupt
            # and SystemExit are no longer swallowed as a bare ``except:`` would.
            return 'unknown'

    def is_library_available(self) -> bool:
        """OpenHCS is always available."""
        return True

    def get_library_object(self):
        """Return OpenHCS processing module."""
        import openhcs.processing
        return openhcs.processing

    def get_memory_type(self) -> str:
        """Return placeholder memory type.

        This returns the class-level ``MEMORY_TYPE``, which is ``None`` for
        this registry.
        """
        # NOTE(review): annotated ``-> str`` but the value is None here;
        # confirm what the base class expects.
        return self.MEMORY_TYPE

    def get_display_name(self) -> str:
        """Get display name for OpenHCS."""
        return "OpenHCS"

    def get_module_patterns(self) -> List[str]:
        """Get module patterns for OpenHCS."""
        return ["openhcs"]

    @staticmethod
    def _summary_line(docstring) -> str:
        """Return the first non-blank line of *docstring*, stripped, or ``""``."""
        for line in (docstring or "").splitlines():
            stripped = line.strip()
            if stripped:
                return stripped
        return ""

    def discover_functions(self) -> Dict[str, FunctionMetadata]:
        """Discover OpenHCS functions with explicit contracts.

        Every plain function found in the scanned modules that carries a
        ``__processing_contract__`` attribute is wrapped according to that
        contract and registered under a module-qualified name.

        Returns:
            Mapping from the generated unique name to its FunctionMetadata.
        """
        import inspect

        functions = {}

        for module_name, module in self.get_modules_to_scan():
            for name, func in inspect.getmembers(module, inspect.isfunction):
                # Simple: if it has a processing contract, include it
                if not hasattr(func, '__processing_contract__'):
                    continue
                contract = getattr(func, '__processing_contract__')

                # Apply contract wrapper (adds slice_by_slice for FLEXIBLE)
                wrapped_func = self.apply_contract_wrapper(func, contract)

                # Generate unique function name using module information
                unique_name = self._generate_function_name(name, module_name)

                # NOTE(review): tags are fixed to ["openhcs"]; _generate_tags()
                # is not called here - confirm whether module-specific tags
                # are wanted.
                metadata = FunctionMetadata(
                    name=unique_name,
                    func=wrapped_func,
                    contract=contract,
                    registry=self,
                    module=func.__module__ or "",
                    # First non-blank line, so docstrings that open with a
                    # newline still yield their summary instead of "".
                    doc=self._summary_line(func.__doc__),
                    tags=["openhcs"],
                    original_name=name,
                )

                functions[unique_name] = metadata

        return functions

    def _generate_function_name(self, original_name: str, module_name: str) -> str:
        """Generate unique function name for OpenHCS functions.

        The dotted components of *module_name* that follow ``backends`` are
        joined with underscores and prefixed to *original_name*.

        Returns:
            ``"<prefix>_<original_name>"``, or *original_name* unchanged when
            *module_name* is not a string, has no ``backends`` component, or
            has nothing after it.
        """
        if not isinstance(module_name, str):
            return original_name

        module_parts = module_name.split('.')
        # Find meaningful part after 'backends'
        try:
            backends_idx = module_parts.index('backends')
        except ValueError:
            return original_name

        meaningful_parts = module_parts[backends_idx + 1:]
        if not meaningful_parts:
            return original_name

        prefix = '_'.join(meaningful_parts)
        return f"{prefix}_{original_name}"

    def _generate_tags(self, module_name: str) -> List[str]:
        """Generate tags for OpenHCS functions.

        Returns:
            ``['openhcs']`` followed by each of ``'analysis'``,
            ``'preprocessing'`` and ``'segmentation'`` (in that order) that
            appears as a dotted component of *module_name*.
        """
        tags = ['openhcs']

        # Add module-specific tags
        if isinstance(module_name, str):
            module_parts = module_name.split('.')
            tags.extend(
                category
                for category in ('analysis', 'preprocessing', 'segmentation')
                if category in module_parts
            )

        return tags