Coverage for openhcs/core/auto_register_meta.py: 73.8%
288 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Generic metaclass infrastructure for automatic plugin registration.
4This module provides reusable metaclass infrastructure for Pattern A registry systems
5(1:1 class-to-plugin mapping with automatic discovery). It eliminates code duplication
6across MicroscopeHandlerMeta, StorageBackendMeta, and ContextProviderMeta.
8Pattern Selection Guide:
9-----------------------
10Use AutoRegisterMeta (Pattern A) when:
11- You have a 1:1 mapping between classes and plugins
12- Plugins should be automatically discovered and registered
13- Registration happens at class definition time
14- Simple metadata (just a key and maybe one secondary registry)
16Use Service Pattern (Pattern B) when:
17- You have many-to-one mapping (multiple items per plugin)
18- Complex metadata (FunctionMetadata with 8+ fields)
19- Need aggregation across multiple sources
20- Examples: Function registry, Format registry
22Use Functional Registry (Pattern C) when:
23- Simple type-to-handler mappings
24- No state needed
25- Functional programming style preferred
26- Examples: Widget creation registries
28Use Manual Registration (Pattern D) when:
29- Complex initialization logic required
30- Explicit control over registration timing needed
31- Very few plugins (< 3)
32- Examples: ZMQ servers, Pipeline steps
34Architecture:
35------------
36AutoRegisterMeta uses a configuration-driven approach:
371. RegistryConfig defines registration behavior
382. AutoRegisterMeta applies the configuration during class creation
393. Domain-specific metaclasses provide thin wrappers with their config
41This maintains domain-specific features while eliminating duplication.
42"""
44import importlib
45import logging
46from abc import ABCMeta
47from dataclasses import dataclass
48from typing import Dict, Type, Optional, Callable, Any
50logger = logging.getLogger(__name__)
52# Lazy import to avoid circular dependency
53_registry_cache_manager = None
55def _get_cache_manager():
56 """Lazy import of RegistryCacheManager to avoid circular imports."""
57 global _registry_cache_manager
58 if _registry_cache_manager is None:
59 from openhcs.core.registry_cache import (
60 RegistryCacheManager,
61 CacheConfig,
62 serialize_plugin_class,
63 deserialize_plugin_class,
64 get_package_file_mtimes
65 )
66 _registry_cache_manager = {
67 'RegistryCacheManager': RegistryCacheManager,
68 'CacheConfig': CacheConfig,
69 'serialize_plugin_class': serialize_plugin_class,
70 'deserialize_plugin_class': deserialize_plugin_class,
71 'get_package_file_mtimes': get_package_file_mtimes
72 }
73 return _registry_cache_manager
76# Type aliases for clarity
77RegistryDict = Dict[str, Type]
78KeyExtractor = Callable[[str, Type], str]
80# Constants for key sources
81PRIMARY_KEY = 'primary'
84class SecondaryRegistryDict(dict):
85 """
86 Dict for secondary registries that auto-triggers primary registry discovery.
88 When accessed, this dict triggers discovery of the primary registry,
89 which populates both the primary and secondary registries.
90 """
92 def __init__(self, primary_registry: 'LazyDiscoveryDict'):
93 super().__init__()
94 self._primary_registry = primary_registry
96 def _ensure_discovered(self):
97 """Trigger discovery of primary registry (which populates this secondary registry)."""
98 if hasattr(self._primary_registry, '_discover'): 98 ↛ exitline 98 didn't return from function '_ensure_discovered' because the condition on line 98 was always true
99 self._primary_registry._discover()
101 def __getitem__(self, key):
102 self._ensure_discovered()
103 return super().__getitem__(key)
105 def __contains__(self, key):
106 self._ensure_discovered()
107 return super().__contains__(key)
109 def __iter__(self):
110 self._ensure_discovered()
111 return super().__iter__()
113 def __len__(self):
114 self._ensure_discovered()
115 return super().__len__()
117 def keys(self):
118 self._ensure_discovered()
119 return super().keys()
121 def values(self):
122 self._ensure_discovered()
123 return super().values()
125 def items(self):
126 self._ensure_discovered()
127 return super().items()
129 def get(self, key, default=None):
130 self._ensure_discovered()
131 return super().get(key, default)
134class LazyDiscoveryDict(dict):
135 """
136 Dict that auto-discovers plugins on first access with optional caching.
138 Supports caching discovered plugins to speed up subsequent application starts.
139 Cache is validated against package version and file modification times.
140 """
142 def __init__(self, enable_cache: bool = True):
143 """
144 Initialize lazy discovery dict.
146 Args:
147 enable_cache: If True, use caching to speed up discovery
148 """
149 super().__init__()
150 self._base_class = None
151 self._config = None
152 self._discovered = False
153 self._enable_cache = enable_cache
154 self._cache_manager = None
156 def _set_config(self, base_class: Type, config: 'RegistryConfig') -> None:
157 self._base_class = base_class
158 self._config = config
160 # Initialize cache manager if caching is enabled
161 if self._enable_cache and config.discovery_package: 161 ↛ exitline 161 didn't return from function '_set_config' because the condition on line 161 was always true
162 try:
163 cache_utils = _get_cache_manager()
165 # Get version getter (use openhcs version)
166 def get_version():
167 try:
168 import openhcs
169 return openhcs.__version__
170 except:
171 return "unknown"
173 self._cache_manager = cache_utils['RegistryCacheManager'](
174 cache_name=f"{config.registry_name.replace(' ', '_')}_registry",
175 version_getter=get_version,
176 serializer=cache_utils['serialize_plugin_class'],
177 deserializer=cache_utils['deserialize_plugin_class'],
178 config=cache_utils['CacheConfig'](
179 max_age_days=7,
180 check_mtimes=True # Validate file modifications
181 )
182 )
183 except Exception as e:
184 logger.debug(f"Failed to initialize cache manager: {e}")
185 self._cache_manager = None
187 def _discover(self) -> None:
188 """Run discovery once, using cache if available."""
189 if self._discovered or not self._config or not self._config.discovery_package:
190 return
191 self._discovered = True
193 # Try to load from cache first
194 if self._cache_manager: 194 ↛ 208line 194 didn't jump to line 208 because the condition on line 194 was always true
195 try:
196 cached_plugins = self._cache_manager.load_cache()
197 if cached_plugins is not None:
198 # Reconstruct registry from cache
199 self.update(cached_plugins)
200 logger.info(
201 f"✅ Loaded {len(self)} {self._config.registry_name}s from cache"
202 )
203 return
204 except Exception as e:
205 logger.debug(f"Cache load failed for {self._config.registry_name}: {e}")
207 # Cache miss or disabled - perform full discovery
208 try:
209 pkg = importlib.import_module(self._config.discovery_package)
211 if self._config.discovery_function: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 self._config.discovery_function(
213 pkg.__path__,
214 f"{self._config.discovery_package}.",
215 self._base_class
216 )
217 else:
218 root = self._config.discovery_package.split('.')[0]
219 mod = importlib.import_module(f"{root}.core.registry_discovery")
220 func = (
221 mod.discover_registry_classes_recursive
222 if self._config.discovery_recursive
223 else mod.discover_registry_classes
224 )
225 func(pkg.__path__, f"{self._config.discovery_package}.", self._base_class)
227 logger.debug(f"Discovered {len(self)} {self._config.registry_name}s")
229 # Save to cache if enabled
230 if self._cache_manager: 230 ↛ exitline 230 didn't return from function '_discover' because the condition on line 230 was always true
231 try:
232 cache_utils = _get_cache_manager()
233 file_mtimes = cache_utils['get_package_file_mtimes'](
234 self._config.discovery_package
235 )
236 self._cache_manager.save_cache(dict(self), file_mtimes)
237 except Exception as e:
238 logger.debug(f"Failed to save cache for {self._config.registry_name}: {e}")
240 except Exception as e:
241 logger.warning(f"Discovery failed: {e}")
243 def __getitem__(self, k):
244 self._discover()
245 return super().__getitem__(k)
247 def __contains__(self, k):
248 self._discover()
249 return super().__contains__(k)
251 def __iter__(self):
252 self._discover()
253 return super().__iter__()
255 def __len__(self):
256 self._discover()
257 return super().__len__()
259 def keys(self):
260 self._discover()
261 return super().keys()
263 def values(self):
264 self._discover()
265 return super().values()
267 def items(self):
268 self._discover()
269 return super().items()
271 def get(self, k, default=None):
272 self._discover()
273 return super().get(k, default)
276@dataclass(frozen=True)
277class SecondaryRegistry:
278 """Configuration for a secondary registry (e.g., metadata handlers)."""
279 registry_dict: RegistryDict
280 key_source: str # 'primary' or attribute name
281 attr_name: str # Attribute to check on the class
284@dataclass(frozen=True)
285class RegistryConfig:
286 """
287 Configuration for automatic class registration behavior.
289 This dataclass encapsulates all the configuration needed for metaclass
290 registration, making the pattern explicit and easy to understand.
292 Attributes:
293 registry_dict: Dictionary to register classes into (e.g., MICROSCOPE_HANDLERS)
294 key_attribute: Name of class attribute containing the registration key
295 (e.g., '_microscope_type', '_backend_type', '_context_type')
296 key_extractor: Optional function to derive key from class name if key_attribute
297 is not set. Signature: (class_name: str, cls: Type) -> str
298 skip_if_no_key: If True, skip registration when key_attribute is None.
299 If False, require either key_attribute or key_extractor.
300 secondary_registries: Optional list of secondary registry configurations
301 log_registration: If True, log debug message when class is registered
302 registry_name: Human-readable name for logging (e.g., 'microscope handler')
303 discovery_package: Optional package name to auto-discover (e.g., 'openhcs.microscopes')
304 discovery_recursive: If True, use recursive discovery (default: False)
306 Examples:
307 # Microscope handlers with name-based key extraction and secondary registry
308 RegistryConfig(
309 registry_dict=MICROSCOPE_HANDLERS,
310 key_attribute='_microscope_type',
311 key_extractor=extract_key_from_handler_suffix,
312 skip_if_no_key=False,
313 secondary_registries=[
314 SecondaryRegistry(
315 registry_dict=METADATA_HANDLERS,
316 key_source=PRIMARY_KEY,
317 attr_name='_metadata_handler_class'
318 )
319 ],
320 log_registration=True,
321 registry_name='microscope handler'
322 )
324 # Storage backends with explicit key and skip-if-none behavior
325 RegistryConfig(
326 registry_dict=STORAGE_BACKENDS,
327 key_attribute='_backend_type',
328 skip_if_no_key=True,
329 registry_name='storage backend'
330 )
332 # Context providers with simple explicit key
333 RegistryConfig(
334 registry_dict=CONTEXT_PROVIDERS,
335 key_attribute='_context_type',
336 skip_if_no_key=True,
337 registry_name='context provider'
338 )
339 """
340 registry_dict: RegistryDict
341 key_attribute: str
342 key_extractor: Optional[KeyExtractor] = None
343 skip_if_no_key: bool = False
344 secondary_registries: Optional[list[SecondaryRegistry]] = None
345 log_registration: bool = True
346 registry_name: str = "plugin"
347 discovery_package: Optional[str] = None # Auto-inferred from base class module if None
348 discovery_recursive: bool = False
349 discovery_function: Optional[Callable] = None # Custom discovery function
352class AutoRegisterMeta(ABCMeta):
353 """
354 Generic metaclass for automatic plugin registration (Pattern A).
356 This metaclass automatically registers concrete classes in a global registry
357 when they are defined, eliminating the need for manual registration calls.
359 Features:
360 - Skips abstract classes (checks __abstractmethods__)
361 - Supports explicit keys via class attributes
362 - Supports derived keys via key extraction functions
363 - Supports secondary registries (e.g., metadata handlers)
364 - Configurable skip-if-no-key behavior
365 - Debug logging for registration events
367 Usage:
368 # Create domain-specific metaclass
369 class MicroscopeHandlerMeta(AutoRegisterMeta):
370 def __new__(mcs, name, bases, attrs):
371 return super().__new__(mcs, name, bases, attrs,
372 registry_config=_MICROSCOPE_REGISTRY_CONFIG)
374 # Use in class definition
375 class ImageXpressHandler(MicroscopeHandler, metaclass=MicroscopeHandlerMeta):
376 _microscope_type = 'imagexpress' # Optional if key_extractor is provided
377 _metadata_handler_class = ImageXpressMetadata # Optional secondary registration
379 Design Principles:
380 - Explicit configuration over magic behavior
381 - Preserve all domain-specific features
382 - Zero breaking changes to existing code
383 - Easy to understand and debug
384 """
386 def __new__(mcs, name: str, bases: tuple, attrs: dict,
387 registry_config: Optional[RegistryConfig] = None):
388 """
389 Create a new class and register it if appropriate.
391 Args:
392 name: Name of the class being created
393 bases: Base classes
394 attrs: Class attributes dictionary
395 registry_config: Configuration for registration behavior.
396 If None, auto-configures from class attributes or skips registration.
398 Returns:
399 The newly created class
400 """
401 # Create the class using ABCMeta
402 new_class = super().__new__(mcs, name, bases, attrs)
404 # Auto-configure registry if not provided but class has __registry__ attributes
405 if registry_config is None:
406 registry_config = mcs._auto_configure_registry(new_class, attrs)
407 if registry_config is None: 407 ↛ 408line 407 didn't jump to line 408 because the condition on line 407 was never true
408 return new_class # No config and no auto-config possible
410 # Set up lazy discovery if registry dict supports it (only once for base class)
411 if isinstance(registry_config.registry_dict, LazyDiscoveryDict) and not registry_config.registry_dict._config:
412 from dataclasses import replace
414 # Auto-infer discovery_package from base class module if not specified
415 config = registry_config
416 if config.discovery_package is None: 416 ↛ 426line 416 didn't jump to line 426 because the condition on line 416 was always true
417 # Extract package from base class module (e.g., 'openhcs.microscopes.microscope_base' → 'openhcs.microscopes')
418 module_parts = new_class.__module__.rsplit('.', 1)
419 inferred_package = module_parts[0] if len(module_parts) > 1 else new_class.__module__
420 # Create new config with inferred package
421 config = replace(config, discovery_package=inferred_package)
422 logger.debug(f"Auto-inferred discovery_package='{inferred_package}' from {new_class.__module__}")
424 # Auto-infer discovery_recursive based on package structure
425 # Check if package has subdirectories with __init__.py (indicating nested structure)
426 if config.discovery_package: 426 ↛ 454line 426 didn't jump to line 454 because the condition on line 426 was always true
427 try:
428 pkg = importlib.import_module(config.discovery_package)
429 if hasattr(pkg, '__path__'): 429 ↛ 454line 429 didn't jump to line 454 because the condition on line 429 was always true
430 import os
431 has_subpackages = False
432 for path in pkg.__path__:
433 if os.path.isdir(path): 433 ↛ 440line 433 didn't jump to line 440 because the condition on line 433 was always true
434 # Check if any subdirectories contain __init__.py
435 for entry in os.listdir(path):
436 subdir = os.path.join(path, entry)
437 if os.path.isdir(subdir) and os.path.exists(os.path.join(subdir, '__init__.py')): 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true
438 has_subpackages = True
439 break
440 if has_subpackages: 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true
441 break
443 # Only override if discovery_recursive is still at default (False)
444 # This allows explicit overrides to take precedence
445 if has_subpackages and not config.discovery_recursive: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true
446 config = replace(config, discovery_recursive=True)
447 logger.debug(f"Auto-inferred discovery_recursive=True for '{config.discovery_package}' (has subpackages)")
448 elif not has_subpackages and config.discovery_recursive: 448 ↛ 449line 448 didn't jump to line 449 because the condition on line 448 was never true
449 logger.debug(f"Keeping explicit discovery_recursive=True for '{config.discovery_package}' (no subpackages detected)")
450 except Exception as e:
451 logger.debug(f"Failed to auto-infer discovery_recursive: {e}")
453 # Auto-wrap secondary registries with SecondaryRegistryDict
454 if config.secondary_registries:
455 import sys
456 wrapped_secondaries = []
457 module = sys.modules.get(new_class.__module__)
459 for sec_reg in config.secondary_registries:
460 # Check if secondary registry needs wrapping
461 if isinstance(sec_reg.registry_dict, dict) and not isinstance(sec_reg.registry_dict, SecondaryRegistryDict): 461 ↛ 483line 461 didn't jump to line 483 because the condition on line 461 was always true
462 # Create a new SecondaryRegistryDict wrapping the primary registry
463 wrapped_dict = SecondaryRegistryDict(registry_config.registry_dict)
464 # Copy any existing entries from the old dict
465 wrapped_dict.update(sec_reg.registry_dict)
467 # Find and update the module global variable
468 if module: 468 ↛ 476line 468 didn't jump to line 476 because the condition on line 468 was always true
469 for var_name, var_value in vars(module).items(): 469 ↛ 476line 469 didn't jump to line 476 because the loop on line 469 didn't complete
470 if var_value is sec_reg.registry_dict:
471 setattr(module, var_name, wrapped_dict)
472 logger.debug(f"Auto-wrapped secondary registry '{var_name}' in {new_class.__module__}")
473 break
475 # Create new SecondaryRegistry with wrapped dict
476 wrapped_sec_reg = SecondaryRegistry(
477 registry_dict=wrapped_dict,
478 key_source=sec_reg.key_source,
479 attr_name=sec_reg.attr_name
480 )
481 wrapped_secondaries.append(wrapped_sec_reg)
482 else:
483 wrapped_secondaries.append(sec_reg)
485 # Rebuild config with wrapped secondary registries
486 config = replace(config, secondary_registries=wrapped_secondaries)
488 registry_config.registry_dict._set_config(new_class, config)
490 # Only register concrete classes (not abstract base classes)
491 if not bases or getattr(new_class, '__abstractmethods__', None):
492 return new_class
494 # Get or derive the registration key
495 key = mcs._get_registration_key(name, new_class, registry_config)
497 # Handle missing key
498 if key is None: 498 ↛ 499line 498 didn't jump to line 499 because the condition on line 498 was never true
499 return mcs._handle_missing_key(name, registry_config)
501 # Register in primary registry
502 mcs._register_class(new_class, key, registry_config)
504 # Handle secondary registrations
505 if registry_config.secondary_registries:
506 mcs._register_secondary(new_class, key, registry_config.secondary_registries)
508 # Log registration if enabled
509 if registry_config.log_registration: 509 ↛ 512line 509 didn't jump to line 512 because the condition on line 509 was always true
510 logger.debug(f"Auto-registered {name} as '{key}' {registry_config.registry_name}")
512 return new_class
514 @staticmethod
515 def _get_registration_key(name: str, cls: Type, config: RegistryConfig) -> Optional[str]:
516 """Get the registration key for a class (explicit or derived)."""
517 # Try explicit key first
518 key = getattr(cls, config.key_attribute, None)
519 if key is not None: 519 ↛ 523line 519 didn't jump to line 523 because the condition on line 519 was always true
520 return key
522 # Try key extractor if provided
523 if config.key_extractor is not None:
524 return config.key_extractor(name, cls)
526 return None
528 @staticmethod
529 def _handle_missing_key(name: str, config: RegistryConfig) -> Type:
530 """Handle case where no registration key is available."""
531 if config.skip_if_no_key:
532 if config.log_registration:
533 logger.debug(f"Skipping registration for {name} - no {config.key_attribute}")
534 return None # Will be returned from __new__
535 else:
536 raise ValueError(
537 f"Class {name} must have {config.key_attribute} attribute "
538 f"or provide a key_extractor in registry config"
539 )
541 @classmethod
542 def _auto_configure_registry(mcs, new_class: Type, attrs: dict) -> Optional[RegistryConfig]:
543 """
544 Auto-configure registry from metaclass OR base class attributes.
546 Priority:
547 1. Metaclass attributes (__registry_dict__, __registry_key__ on metaclass)
548 2. Base class attributes (__registry_key__ on class, auto-create __registry__)
549 3. Parent class __registry__ (inherit from parent)
551 Returns:
552 RegistryConfig if auto-configuration successful, None otherwise
553 """
554 # Check if the metaclass has __registry_dict__ attribute (old style)
555 registry_dict = getattr(mcs, '__registry_dict__', None)
557 # If no metaclass registry, check if base class wants auto-creation or inheritance
558 if registry_dict is None: 558 ↛ 588line 558 didn't jump to line 588 because the condition on line 558 was always true
559 # First check if any parent class has __registry__ (inherit from parent)
560 # This takes priority over creating a new registry
561 for base in new_class.__mro__[1:]: # Skip self
562 if hasattr(base, '__registry__'):
563 registry_dict = base.__registry__
564 key_attribute = getattr(base, '__registry_key__', None)
565 key_extractor = getattr(base, '__key_extractor__', None)
566 skip_if_no_key = getattr(base, '__skip_if_no_key__', True)
567 secondary_registries = getattr(base, '__secondary_registries__', None)
568 registry_name = getattr(base, '__registry_name__', None)
569 break
570 else:
571 # No parent registry found - check if class explicitly defines __registry_key__
572 # (only create new registry if __registry_key__ is in the class body, not inherited)
573 key_attribute = attrs.get('__registry_key__')
574 if key_attribute is not None: 574 ↛ 585line 574 didn't jump to line 585 because the condition on line 574 was always true
575 # Auto-create registry dict and store on the class
576 registry_dict = LazyDiscoveryDict()
577 new_class.__registry__ = registry_dict
579 # Get other optional attributes from class
580 key_extractor = attrs.get('__key_extractor__')
581 skip_if_no_key = attrs.get('__skip_if_no_key__', True)
582 secondary_registries = attrs.get('__secondary_registries__')
583 registry_name = attrs.get('__registry_name__')
584 else:
585 return None # No registry configuration found
586 else:
587 # Old style: get from metaclass
588 key_attribute = getattr(mcs, '__registry_key__', '_registry_key')
589 key_extractor = getattr(mcs, '__key_extractor__', None)
590 skip_if_no_key = getattr(mcs, '__skip_if_no_key__', True)
591 secondary_registries = getattr(mcs, '__secondary_registries__', None)
592 registry_name = getattr(mcs, '__registry_name__', None)
594 # Auto-derive registry name if not provided
595 if registry_name is None: 595 ↛ 606line 595 didn't jump to line 606 because the condition on line 595 was always true
596 # Derive from class name: "StorageBackend" → "storage backend"
597 clean_name = new_class.__name__
598 for suffix in ['Base', 'Meta', 'Handler', 'Registry']:
599 if clean_name.endswith(suffix):
600 clean_name = clean_name[:-len(suffix)]
601 break
602 # Convert CamelCase to space-separated lowercase
603 import re
604 registry_name = re.sub(r'([A-Z])', r' \1', clean_name).strip().lower()
606 logger.debug(f"Auto-configured registry for {new_class.__name__}: "
607 f"key_attribute={key_attribute}, registry_name={registry_name}")
609 return RegistryConfig(
610 registry_dict=registry_dict,
611 key_attribute=key_attribute,
612 key_extractor=key_extractor,
613 skip_if_no_key=skip_if_no_key,
614 secondary_registries=secondary_registries,
615 registry_name=registry_name
616 )
618 @staticmethod
619 def _register_class(cls: Type, key: str, config: RegistryConfig) -> None:
620 """Register class in primary registry."""
621 config.registry_dict[key] = cls
622 setattr(cls, config.key_attribute, key)
624 @staticmethod
625 def _register_secondary(
626 cls: Type,
627 primary_key: str,
628 secondary_registries: list[SecondaryRegistry]
629 ) -> None:
630 """Handle secondary registry registrations."""
631 for sec_reg in secondary_registries:
632 value = getattr(cls, sec_reg.attr_name, None)
633 if value is None: 633 ↛ 637line 633 didn't jump to line 637 because the condition on line 633 was always true
634 continue
636 # Determine the key for secondary registration
637 if sec_reg.key_source == PRIMARY_KEY:
638 secondary_key = primary_key
639 else:
640 secondary_key = getattr(cls, sec_reg.key_source, None)
641 if secondary_key is None:
642 logger.warning(
643 f"Cannot register {sec_reg.attr_name} for {cls.__name__} - "
644 f"no {sec_reg.key_source} attribute"
645 )
646 continue
648 # Register in secondary registry
649 sec_reg.registry_dict[secondary_key] = value
650 logger.debug(f"Auto-registered {sec_reg.attr_name} from {cls.__name__} as '{secondary_key}'")
653# Helper functions for common key extraction patterns
655def make_suffix_extractor(suffix: str) -> KeyExtractor:
656 """
657 Create a key extractor that removes a suffix from class names.
659 Args:
660 suffix: The suffix to remove (e.g., 'Handler', 'Backend')
662 Returns:
663 A key extractor function
665 Examples:
666 extract_handler = make_suffix_extractor('Handler')
667 extract_handler('ImageXpressHandler', cls) -> 'imagexpress'
669 extract_backend = make_suffix_extractor('Backend')
670 extract_backend('DiskStorageBackend', cls) -> 'diskstorage'
671 """
672 suffix_len = len(suffix)
674 def extractor(name: str, cls: Type) -> str:
675 if name.endswith(suffix):
676 return name[:-suffix_len].lower()
677 return name.lower()
679 return extractor
682# Pre-built extractors for common patterns
683extract_key_from_handler_suffix = make_suffix_extractor('Handler')
684extract_key_from_backend_suffix = make_suffix_extractor('Backend')