# Coverage report header (coverage.py v7.11.0, created at 2025-11-04 02:09 +0000):
# openhcs/introspection/signature_analyzer.py — 37.7% of 589 statements covered.
# File: openhcs/introspection/signature_analyzer.py
3import ast
4import inspect
5import dataclasses
6import re
7from typing import Any, Dict, Callable, get_type_hints, NamedTuple, Union, Optional, Type
8from dataclasses import dataclass
10# Lazy imports for OpenHCS-specific type resolution (optional dependency)
11# These are only imported when needed for type hint resolution
12_lazy_module = None
13_config_module = None
16def _get_openhcs_modules():
17 """Lazy-load OpenHCS-specific modules for type resolution."""
18 global _lazy_module, _config_module
19 if _lazy_module is None:
20 try:
21 import openhcs.config_framework.lazy_factory as lazy_module
22 import openhcs.core.config as config_module
23 _lazy_module = lazy_module
24 _config_module = config_module
25 except ImportError:
26 # If OpenHCS modules aren't available, return empty dicts
27 _lazy_module = type('EmptyModule', (), {})()
28 _config_module = type('EmptyModule', (), {})()
29 return _lazy_module, _config_module
@dataclass(frozen=True)
class AnalysisConstants:
    """Constants for signature analysis to eliminate magic strings."""
    # Suffix of a constructor's __qualname__ (e.g. "MyClass.__init__").
    INIT_METHOD_SUFFIX: str = ".__init__"
    # Implicit receiver parameter names that are always skipped during analysis.
    SELF_PARAM: str = "self"
    CLS_PARAM: str = "cls"
    # Markers identifying dunder parameter names, which are treated as
    # internal/reserved fields and excluded from analysis.
    DUNDER_PREFIX: str = "__"
    DUNDER_SUFFIX: str = "__"


# Create constants instance for use throughout the module
CONSTANTS = AnalysisConstants()
class ParameterInfo(NamedTuple):
    """Information about a parameter (or dataclass field)."""
    name: str
    param_type: type  # resolved type hint; typing.Any when unresolvable
    default_value: Any  # None when the parameter has no default
    is_required: bool  # True when the parameter has no default value
    description: Optional[str] = None  # parameter description from docstring, if found
class DocstringInfo(NamedTuple):
    """Information extracted from a docstring.

    NOTE(review): ``parameters`` defaults to ``None`` rather than ``{}``;
    callers must guard with ``info.parameters or {}`` before mapping access.
    """
    summary: Optional[str] = None  # First line or brief description
    description: Optional[str] = None  # Full description
    parameters: Optional[Dict[str, str]] = None  # Parameter name -> description mapping
    returns: Optional[str] = None  # Return value description
    examples: Optional[str] = None  # Usage examples
class DocstringExtractor:
    """Extract structured information from docstrings.

    Supports Google, NumPy, and Sphinx parameter conventions plus a couple of
    ad-hoc formats seen in wrapped third-party libraries (e.g. pyclesperanto).
    """

    @staticmethod
    def extract(target: Union[Callable, type]) -> DocstringInfo:
        """Extract docstring information from function or class.

        Args:
            target: Function, method, or class to extract docstring from

        Returns:
            DocstringInfo with parsed docstring components. On a missing
            docstring this is a default DocstringInfo whose ``parameters``
            is None — callers must guard accordingly.
        """
        if not target:
            return DocstringInfo()

        # ENHANCEMENT: Handle lazy dataclasses by extracting from their base class
        actual_target = DocstringExtractor._resolve_lazy_target(target)

        # inspect.getdoc also inherits docstrings from base classes and
        # normalizes indentation.
        docstring = inspect.getdoc(actual_target)
        if not docstring:
            return DocstringInfo()

        # Try AST-based parsing first for better accuracy
        try:
            return DocstringExtractor._parse_docstring_ast(actual_target, docstring)
        except Exception:
            # Fall back to regex-based parsing
            return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _resolve_lazy_target(target: Union[Callable, type]) -> Union[Callable, type]:
        """Resolve lazy dataclass to its base class for docstring extraction.

        Lazy dataclasses are dynamically created and may not have proper
        docstrings. This method attempts to find the original base class that
        the lazy class was created from (the naming convention is a leading
        "Lazy" prefix on the generated class).
        """
        if not hasattr(target, '__name__'):
            return target

        # Check if this looks like a lazy dataclass (starts with "Lazy")
        if target.__name__.startswith('Lazy'):
            # Try to find the first non-"Lazy" base class in the MRO
            for base in getattr(target, '__mro__', []):
                if base != target and base.__name__ != 'object':
                    # Found a base class that's not the lazy class itself
                    if not base.__name__.startswith('Lazy'):
                        return base

        return target

    @staticmethod
    def _parse_docstring_ast(target: Union[Callable, type], docstring: str) -> DocstringInfo:
        """Parse docstring using AST for more accurate extraction.

        This method uses AST to parse the source code and extract docstring
        information more accurately, especially for complex multiline
        descriptions. Falls back to the regex parser whenever source is
        unavailable (builtins, C extensions, interactively defined objects).
        """
        try:
            # Get source code — raises for objects without retrievable source.
            source = inspect.getsource(target)
            tree = ast.parse(source)

            # Find the function/class node whose docstring matches the one we
            # already normalized via inspect.getdoc.
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    if ast.get_docstring(node) == docstring:
                        return DocstringExtractor._parse_ast_docstring(node, docstring)

            # Fallback to regex parsing if no matching node was found
            return DocstringExtractor._parse_docstring(docstring)

        except Exception:
            # Fallback to regex parsing
            return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _parse_ast_docstring(node: Union[ast.FunctionDef, ast.ClassDef], docstring: str) -> DocstringInfo:
        """Parse docstring from AST node with enhanced multiline support.

        For now this delegates to the improved regex parser; the AST node is
        accepted so the signature can be extended later with true AST-based
        parsing without touching callers.
        """
        return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _parse_docstring(docstring: str) -> DocstringInfo:
        """Parse a docstring into structured components with improved multiline support.

        Supports multiple docstring formats:
        - Google style (Args:, Returns:, Examples:)
        - NumPy style (Parameters, Returns, Examples)
        - Sphinx style (:param name:, :returns:)
        - Simple format (just description)

        Uses improved parsing for multiline parameter descriptions that continues
        until a blank line or new parameter/section is encountered.
        """
        lines = docstring.strip().split('\n')

        summary = None
        description_lines = []
        parameters = {}
        returns = None
        examples = None

        # State machine: which section the current line belongs to, and the
        # parameter (if any) whose description is still being accumulated.
        current_section = 'description'
        current_param = None
        current_param_lines = []

        def _finalize_current_param():
            """Flush the accumulated description lines for the current parameter."""
            # Reads current_param/current_param_lines from the enclosing scope;
            # callers reset those names themselves after flushing.
            if current_param and current_param_lines:
                param_desc = '\n'.join(current_param_lines).strip()
                parameters[current_param] = param_desc

        for i, line in enumerate(lines):
            original_line = line  # preserved for indentation checks below
            line = line.strip()

            # Handle both Google/Sphinx style (with colons) and NumPy style (without colons)
            if line.lower() in ('args:', 'arguments:', 'parameters:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'parameters'
                # NOTE(review): both branches below just continue — the NumPy
                # "---" separator line itself is NOT actually skipped here; it
                # is consumed later as a non-matching parameter line. Confirm
                # whether skipping the separator was intended.
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('args', 'arguments', 'parameters') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style section headers (without colons, followed by dashes)
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'parameters'
                continue
            elif line.lower() in ('returns:', 'return:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'returns'
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('returns', 'return') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style returns section
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'returns'
                continue
            elif line.lower() in ('examples:', 'example:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'examples'
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('examples', 'example') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style examples section
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'examples'
                continue

            if current_section == 'description':
                # First non-empty line becomes the summary; the rest is body.
                if not summary and line:
                    summary = line
                else:
                    description_lines.append(original_line)  # Keep original indentation

            elif current_section == 'parameters':
                # Enhanced parameter parsing to handle multiple formats
                param_match_google = re.match(r'^(\w+):\s*(.+)', line)
                param_match_sphinx = re.match(r'^:param\s+(\w+):\s*(.+)', line)
                param_match_numpy = re.match(r'^(\w+)\s*:\s*(.+)', line)
                # Handle pyclesperanto-style inline parameters (param_name: type description)
                param_match_inline = re.match(r'^(\w+):\s*(\w+(?:\[.*?\])?|\w+(?:\s*\|\s*\w+)*)\s+(.+)', line)
                # Handle parameters that start with bullet points or dashes
                param_match_bullet = re.match(r'^[-•*]\s*(\w+):\s*(.+)', line)

                if param_match_google or param_match_sphinx or param_match_numpy or param_match_inline or param_match_bullet:
                    _finalize_current_param()

                    if param_match_google:
                        param_name, param_desc = param_match_google.groups()
                    elif param_match_sphinx:
                        param_name, param_desc = param_match_sphinx.groups()
                    elif param_match_numpy:
                        param_name, param_desc = param_match_numpy.groups()
                    elif param_match_inline:
                        # NOTE(review): any line matching the inline pattern also
                        # matches the Google pattern above, so this branch looks
                        # unreachable and the type never reaches the description.
                        # Confirm ordering before relying on the "type - desc" form.
                        param_name, param_type, param_desc = param_match_inline.groups()
                        param_desc = f"{param_type} - {param_desc}"  # Include type in description
                    elif param_match_bullet:
                        param_name, param_desc = param_match_bullet.groups()

                    current_param = param_name
                    current_param_lines = [param_desc.strip()]
                elif current_param and (original_line.startswith(' ') or original_line.startswith('\t')):
                    # Indented continuation line
                    current_param_lines.append(line)
                elif not line:
                    # Blank line terminates the current parameter description.
                    _finalize_current_param()
                    current_param = None
                    current_param_lines = []
                elif current_param:
                    # Non-indented continuation line (part of the same block)
                    current_param_lines.append(line)
                else:
                    # Try to parse inline parameter definitions in a single block.
                    # This handles cases where parameters are listed without
                    # clear separation.
                    inline_params = DocstringExtractor._parse_inline_parameters(line)
                    for param_name, param_desc in inline_params.items():
                        parameters[param_name] = param_desc

            elif current_section == 'returns':
                if returns is None:
                    returns = line
                else:
                    returns += '\n' + line

            elif current_section == 'examples':
                if examples is None:
                    examples = line
                else:
                    examples += '\n' + line

        # Flush a parameter still being accumulated at end of docstring.
        _finalize_current_param()

        description = '\n'.join(description_lines).strip()
        if description == summary:
            description = None

        return DocstringInfo(
            summary=summary,
            description=description,
            parameters=parameters or {},
            returns=returns,
            examples=examples
        )

    @staticmethod
    def _parse_inline_parameters(line: str) -> Dict[str, str]:
        """Parse parameters from a single line containing multiple parameter definitions.

        Handles formats like:
        - "input_image: Image Input image to process. footprint: Image Structuring element..."
        - "param1: type1 description1. param2: type2 description2."
        """
        parameters = {}

        import re

        # Strategy: use a flexible pattern that works with the pyclesperanto format.
        # Pattern matches: param_name: everything up to the next param_name: or end of string.
        # NOTE(review): the [^:]*? body excludes colons, so a description
        # containing ':' truncates early — confirm acceptable for the inputs seen.
        param_pattern = r'(\w+):\s*([^:]*?)(?=\s+\w+:|$)'
        matches = re.findall(param_pattern, line)

        for param_name, param_desc in matches:
            if param_desc.strip():
                # Clean up the description (remove trailing periods, extra whitespace)
                clean_desc = param_desc.strip().rstrip('.')
                parameters[param_name] = clean_desc

        return parameters
class SignatureAnalyzer:
    """Universal analyzer for extracting parameter information from any target."""

    # Class-level cache for field documentation to avoid re-parsing.
    # NOTE(review): both caches grow unboundedly over the process lifetime;
    # fine for a bounded set of analyzed classes — confirm if arbitrary
    # classes are fed in.
    _field_docs_cache = {}

    # Class-level cache mapping dataclass -> parameter dict, avoiding the
    # expensive AST parsing done in _analyze_dataclass.
    _dataclass_analysis_cache = {}
339 @staticmethod
340 def analyze(target: Union[Callable, Type, object], skip_first_param: Optional[bool] = None) -> Dict[str, ParameterInfo]:
341 """Extract parameter information from any target: function, constructor, dataclass, or instance.
343 Args:
344 target: Function, constructor, dataclass type, or dataclass instance
345 skip_first_param: Whether to skip the first parameter (after self/cls).
346 If None, auto-detects based on context:
347 - False for step constructors (all params are configuration)
348 - True for image processing functions (first param is image data)
350 Returns:
351 Dict mapping parameter names to ParameterInfo
352 """
353 if not target: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true
354 return {}
356 # Dispatch based on target type
357 if inspect.isclass(target): 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true
358 if dataclasses.is_dataclass(target):
359 return SignatureAnalyzer._analyze_dataclass(target)
360 else:
361 # Try to analyze constructor
362 return SignatureAnalyzer._analyze_callable(target.__init__, skip_first_param)
363 elif dataclasses.is_dataclass(target): 363 ↛ 365line 363 didn't jump to line 365 because the condition on line 363 was never true
364 # Instance of dataclass
365 return SignatureAnalyzer._analyze_dataclass_instance(target)
366 else:
367 # Function, method, or other callable
368 return SignatureAnalyzer._analyze_callable(target, skip_first_param)
370 @staticmethod
371 def _analyze_callable(callable_obj: Callable, skip_first_param: Optional[bool] = None) -> Dict[str, ParameterInfo]:
372 """Extract parameter information from callable signature.
374 Args:
375 callable_obj: The callable to analyze
376 skip_first_param: Whether to skip the first parameter (after self/cls).
377 If None, auto-detects based on context.
378 """
379 sig = inspect.signature(callable_obj)
380 # Build comprehensive namespace for forward reference resolution
381 # Start with function's globals (which contain the actual types), then add our modules as fallback
382 lazy_module, config_module = _get_openhcs_modules()
383 globalns = {
384 **vars(lazy_module),
385 **vars(config_module),
386 **getattr(callable_obj, '__globals__', {})
387 }
389 # For OpenHCS functions, prioritize the function's actual module globals
390 if hasattr(callable_obj, '__module__') and callable_obj.__module__: 390 ↛ 404line 390 didn't jump to line 404 because the condition on line 390 was always true
391 try:
392 import sys
393 actual_module = sys.modules.get(callable_obj.__module__)
394 if actual_module: 394 ↛ 404line 394 didn't jump to line 404 because the condition on line 394 was always true
395 # Function's module globals should take precedence for type resolution
396 globalns = {
397 **vars(lazy_module),
398 **vars(config_module),
399 **vars(actual_module) # This overwrites with the actual module types
400 }
401 except Exception:
402 pass # Fall back to original globalns
404 import logging
405 logger = logging.getLogger(__name__)
407 try:
408 type_hints = get_type_hints(callable_obj, globalns=globalns)
409 logger.debug(f"🔍 SIG ANALYZER: get_type_hints succeeded for {callable_obj.__name__}: {type_hints}")
410 except (NameError, AttributeError) as e:
411 # If type hint resolution fails, try with just the function's original globals
412 try:
413 type_hints = get_type_hints(callable_obj, globalns=getattr(callable_obj, '__globals__', {}))
414 logger.debug(f"🔍 SIG ANALYZER: get_type_hints with __globals__ succeeded for {callable_obj.__name__}: {type_hints}")
415 except:
416 # If that still fails, fall back to __annotations__ directly
417 # This is critical for functions where type hints were added via docstring parsing
418 # (e.g., cucim functions where _enhance_annotations_from_docstring added types)
419 type_hints = getattr(callable_obj, '__annotations__', {})
420 logger.debug(f"🔍 SIG ANALYZER: Fell back to __annotations__ for {callable_obj.__name__}: {type_hints}")
421 except Exception as ex:
422 # For any other type hint resolution errors, fall back to __annotations__
423 # This ensures we don't lose type information that was added programmatically
424 type_hints = getattr(callable_obj, '__annotations__', {})
425 logger.debug(f"🔍 SIG ANALYZER: Exception {ex}, fell back to __annotations__ for {callable_obj.__name__}: {type_hints}")
429 # Extract docstring information (with fallback for robustness)
430 try:
431 docstring_info = DocstringExtractor.extract(callable_obj)
432 except:
433 docstring_info = None
435 if not docstring_info: 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true
436 docstring_info = DocstringInfo()
438 parameters = {}
439 param_list = list(sig.parameters.items())
441 # Determine skip behavior: explicit parameter overrides auto-detection
442 should_skip_first_param = (
443 skip_first_param if skip_first_param is not None
444 else SignatureAnalyzer._should_skip_first_parameter(callable_obj)
445 )
447 first_param_after_self_skipped = False
449 for i, (param_name, param) in enumerate(param_list):
450 # Always skip self/cls
451 if param_name in (CONSTANTS.SELF_PARAM, CONSTANTS.CLS_PARAM):
452 continue
454 # Always skip dunder parameters (internal/reserved fields)
455 if param_name.startswith(CONSTANTS.DUNDER_PREFIX) and param_name.endswith(CONSTANTS.DUNDER_SUFFIX): 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true
456 continue
458 # Skip first parameter for image processing functions only
459 if should_skip_first_param and not first_param_after_self_skipped:
460 first_param_after_self_skipped = True
461 continue
463 # Handle **kwargs parameters - try to extract original function signature
464 if param.kind == inspect.Parameter.VAR_KEYWORD:
465 # Try to find the original function if this is a wrapper
466 original_params = SignatureAnalyzer._extract_original_parameters(callable_obj)
467 if original_params:
468 parameters.update(original_params)
469 continue
471 from typing import Any
472 param_type = type_hints.get(param_name, Any)
473 default_value = param.default if param.default != inspect.Parameter.empty else None
474 is_required = param.default == inspect.Parameter.empty
478 # Get parameter description from docstring
479 param_description = docstring_info.parameters.get(param_name) if docstring_info else None
481 parameters[param_name] = ParameterInfo(
482 name=param_name,
483 param_type=param_type,
484 default_value=default_value,
485 is_required=is_required,
486 description=param_description
487 )
489 return parameters
491 @staticmethod
492 def _should_skip_first_parameter(callable_obj: Callable) -> bool:
493 """
494 Determine if the first parameter should be skipped for any callable.
496 Universal logic that works with any object:
497 - Constructors (__init__ methods): don't skip (all params are configuration)
498 - All other callables: skip first param (assume it's data being processed)
499 """
500 # Check if this is any __init__ method (constructor)
501 if (hasattr(callable_obj, '__qualname__') and
502 callable_obj.__qualname__.endswith(CONSTANTS.INIT_METHOD_SUFFIX)):
503 return False
505 # Everything else: skip first parameter
506 return True
508 @staticmethod
509 def _extract_original_parameters(callable_obj: Callable) -> Dict[str, ParameterInfo]:
510 """
511 Extract parameters from the original function if this is a wrapper with **kwargs.
513 This handles cases where scikit-image or other auto-registered functions
514 are wrapped with (image, **kwargs) signatures.
515 """
516 try:
517 # Check if this function has access to the original function
518 # Common patterns: __wrapped__, closure variables, etc.
520 # Pattern 1: Check if it's a functools.wraps wrapper
521 if hasattr(callable_obj, '__wrapped__'):
522 return SignatureAnalyzer._analyze_callable(callable_obj.__wrapped__)
524 # Pattern 2: Check closure for original function reference
525 if hasattr(callable_obj, '__closure__') and callable_obj.__closure__: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true
526 for cell in callable_obj.__closure__:
527 if hasattr(cell.cell_contents, '__call__'):
528 # Found a callable in closure - might be the original function
529 try:
530 orig_sig = inspect.signature(cell.cell_contents)
531 # Skip if it also has **kwargs (avoid infinite recursion)
532 if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in orig_sig.parameters.values()):
533 continue
534 return SignatureAnalyzer._analyze_callable(cell.cell_contents)
535 except:
536 continue
538 # Pattern 3: Try to extract from function name and module
539 # This is a fallback for scikit-image functions
540 if hasattr(callable_obj, '__name__') and hasattr(callable_obj, '__module__'): 540 ↛ 565line 540 didn't jump to line 565 because the condition on line 540 was always true
541 func_name = callable_obj.__name__
542 module_name = callable_obj.__module__
544 # Try to find the original function in scikit-image
545 if 'skimage' in module_name: 545 ↛ 565line 545 didn't jump to line 565 because the condition on line 545 was always true
546 try:
547 import importlib
548 # Extract the actual module path (remove wrapper module parts)
549 if 'scikit_image_registry' in module_name: 549 ↛ 551line 549 didn't jump to line 551 because the condition on line 549 was never true
550 # This is our wrapper, try to find the original in skimage
551 for skimage_module in ['skimage.filters', 'skimage.morphology',
552 'skimage.segmentation', 'skimage.feature',
553 'skimage.measure', 'skimage.transform',
554 'skimage.restoration', 'skimage.exposure']:
555 try:
556 mod = importlib.import_module(skimage_module)
557 if hasattr(mod, func_name):
558 orig_func = getattr(mod, func_name)
559 return SignatureAnalyzer._analyze_callable(orig_func)
560 except:
561 continue
562 except:
563 pass
565 return {}
567 except Exception:
568 return {}
570 @staticmethod
571 def _analyze_dataclass(dataclass_type: type) -> Dict[str, ParameterInfo]:
572 """Extract parameter information from dataclass fields."""
573 import logging
574 logger = logging.getLogger(__name__)
576 # PERFORMANCE: Check cache first to avoid expensive AST parsing
577 # Use the class object itself as the key (classes are hashable and have stable identity)
578 cache_key = dataclass_type
579 if cache_key in SignatureAnalyzer._dataclass_analysis_cache:
580 logger.info(f"✅ CACHE HIT for {dataclass_type.__name__} (id={id(dataclass_type)})")
581 return SignatureAnalyzer._dataclass_analysis_cache[cache_key]
583 logger.info(f"❌ CACHE MISS for {dataclass_type.__name__} (id={id(dataclass_type)}), cache has {len(SignatureAnalyzer._dataclass_analysis_cache)} entries")
585 try:
586 # Try to get type hints, fall back to __annotations__ if resolution fails
587 try:
588 type_hints = get_type_hints(dataclass_type)
589 except Exception:
590 # Fall back to __annotations__ for robustness
591 type_hints = getattr(dataclass_type, '__annotations__', {})
593 # Extract docstring information from dataclass
594 docstring_info = DocstringExtractor.extract(dataclass_type)
596 # Extract inline field documentation using AST
597 inline_docs = SignatureAnalyzer._extract_inline_field_docs(dataclass_type)
599 # ENHANCEMENT: For dataclasses modified by decorators (like GlobalPipelineConfig),
600 # also extract field documentation from the field types themselves
601 field_type_docs = SignatureAnalyzer._extract_field_type_docs(dataclass_type)
603 parameters = {}
605 for field in dataclasses.fields(dataclass_type):
606 param_type = type_hints.get(field.name, str)
608 # Get default value
609 if field.default != dataclasses.MISSING:
610 default_value = field.default
611 is_required = False
612 elif field.default_factory != dataclasses.MISSING:
613 default_value = field.default_factory()
614 is_required = False
615 else:
616 default_value = None
617 is_required = True
619 # Get field description from multiple sources (priority order)
620 field_description = None
622 # 1. Field metadata (highest priority)
623 if hasattr(field, 'metadata') and 'description' in field.metadata:
624 field_description = field.metadata['description']
625 # 2. Inline documentation strings (from AST parsing)
626 elif field.name in inline_docs:
627 field_description = inline_docs[field.name]
628 # 3. Field type documentation (for decorator-modified classes)
629 elif field.name in field_type_docs:
630 field_description = field_type_docs[field.name]
631 # 4. Docstring parameters (fallback)
632 elif docstring_info.parameters and field.name in docstring_info.parameters:
633 field_description = docstring_info.parameters.get(field.name)
634 # 5. CRITICAL FIX: Use inheritance-aware field documentation extraction
635 else:
636 field_description = SignatureAnalyzer.extract_field_documentation(dataclass_type, field.name)
638 parameters[field.name] = ParameterInfo(
639 name=field.name,
640 param_type=param_type,
641 default_value=default_value,
642 is_required=is_required,
643 description=field_description
644 )
646 # PERFORMANCE: Cache the result to avoid re-parsing
647 SignatureAnalyzer._dataclass_analysis_cache[cache_key] = parameters
648 return parameters
650 except Exception:
651 # Return empty dict on error (don't cache errors)
652 return {}
    @staticmethod
    def _extract_inline_field_docs(dataclass_type: type) -> Dict[str, str]:
        """Extract inline field documentation strings using AST parsing.

        This handles multiple patterns used for field documentation:

        Pattern 1 - Next line string literal:
            @dataclass
            class Config:
                field_name: str = "default"
                '''Field description here.'''

        Pattern 2 - Same line string literal (less common):
            @dataclass
            class Config:
                field_name: str = "default"  # '''Field description'''

        Pattern 3 - Traditional docstring parameters (handled by DocstringExtractor):
            @dataclass
            class Config:
                '''
                Args:
                    field_name: Field description here.
                '''
                field_name: str = "default"

        Returns:
            Mapping of field name -> documentation string; {} when no source
            is recoverable or parsing fails.
        """
        try:
            import ast
            import re

            # Try to get source code - handle cases where it might not be available
            # (dynamically created / decorator-modified classes).
            source = None
            try:
                source = inspect.getsource(dataclass_type)
            except (OSError, TypeError):
                # ENHANCEMENT: For decorator-modified classes, try multiple source file strategies
                try:
                    # Strategy 1: Try the file where the class is currently defined
                    source_file = inspect.getfile(dataclass_type)
                    with open(source_file, 'r', encoding='utf-8') as f:
                        file_content = f.read()
                    source = SignatureAnalyzer._extract_class_source_from_file(file_content, dataclass_type.__name__)

                    # Strategy 2: If that fails, try to find the original source file.
                    # This handles decorator-modified classes where inspect.getfile()
                    # returns the wrong file.
                    # NOTE(review): this scans every .py file in the directory —
                    # potentially slow; results are cached upstream, so acceptable.
                    if not source:
                        try:
                            import os
                            source_dir = os.path.dirname(source_file)

                            # Try common source files in the same directory
                            candidate_files = []

                            # If the current file is lazy_config.py, try config.py first
                            if source_file.endswith('lazy_config.py'):
                                candidate_files.append(os.path.join(source_dir, 'config.py'))

                            # Try other common patterns
                            for filename in os.listdir(source_dir):
                                if filename.endswith('.py') and filename != os.path.basename(source_file):
                                    candidate_files.append(os.path.join(source_dir, filename))

                            # Try each candidate file
                            for candidate_file in candidate_files:
                                if os.path.exists(candidate_file):
                                    with open(candidate_file, 'r', encoding='utf-8') as f:
                                        candidate_content = f.read()
                                    source = SignatureAnalyzer._extract_class_source_from_file(candidate_content, dataclass_type.__name__)
                                    if source:  # Found it!
                                        break
                        except Exception:
                            pass
                except Exception:
                    pass

            if not source:
                return {}

            tree = ast.parse(source)

            # Find the class definition - be more flexible with class name matching
            class_node = None
            target_class_name = dataclass_type.__name__

            # Handle cases where the class might have been renamed or modified
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    # Try exact match first
                    if node.name == target_class_name:
                        class_node = node
                        break
                    # Also try without common prefixes/suffixes that decorators might add
                    base_name = target_class_name.replace('Lazy', '').replace('Config', '')
                    node_base_name = node.name.replace('Lazy', '').replace('Config', '')
                    if base_name and node_base_name and base_name == node_base_name:
                        class_node = node
                        break

            if not class_node:
                return {}

            field_docs = {}
            source_lines = source.split('\n')

            # Method 1: Look for field assignments followed by string literals (next line)
            for i, node in enumerate(class_node.body):
                if isinstance(node, ast.AnnAssign) and hasattr(node.target, 'id'):
                    field_name = node.target.id

                    # Check if the next node is a string literal (documentation)
                    if i + 1 < len(class_node.body):
                        next_node = class_node.body[i + 1]
                        if isinstance(next_node, ast.Expr):
                            # Handle both ast.Constant (Python 3.8+) and ast.Str (older versions)
                            if isinstance(next_node.value, ast.Constant) and isinstance(next_node.value.value, str):
                                field_docs[field_name] = next_node.value.value.strip()
                                continue
                            elif hasattr(ast, 'Str') and isinstance(next_node.value, ast.Str):
                                field_docs[field_name] = next_node.value.s.strip()
                                continue

                    # Method 2: Check for inline comments on the same line.
                    # Get the line number of the field definition.
                    field_line_num = node.lineno - 1  # Convert to 0-based indexing
                    if 0 <= field_line_num < len(source_lines):
                        line = source_lines[field_line_num]

                        # Look for string literals in comments on the same line
                        # Pattern: field: type = value  # '''Documentation'''
                        comment_match = re.search(r'#\s*["\']([^"\']+)["\']', line)
                        if comment_match:
                            field_docs[field_name] = comment_match.group(1).strip()
                            continue

                        # Look for triple-quoted strings on the same line
                        # Pattern: field: type = value  '''Documentation'''
                        triple_quote_match = re.search(r'"""([^"]+)"""|\'\'\'([^\']+)\'\'\'', line)
                        if triple_quote_match:
                            doc_text = triple_quote_match.group(1) or triple_quote_match.group(2)
                            field_docs[field_name] = doc_text.strip()

            return field_docs

        except Exception as e:
            # Return empty dict if AST parsing fails.
            # Could add logging here for debugging: logger.debug(f"AST parsing failed: {e}")
            return {}
802 @staticmethod
803 def _extract_field_type_docs(dataclass_type: type) -> Dict[str, str]:
804 """Extract field documentation from field types for decorator-modified dataclasses.
806 This handles cases where dataclasses have been modified by decorators (like @auto_create_decorator)
807 that inject fields from other dataclasses. In such cases, the AST parsing of the main class
808 won't find documentation for the injected fields, so we need to extract documentation from
809 the field types themselves.
811 For example, GlobalPipelineConfig has injected fields like 'path_planning_config' of type
812 PathPlanningConfig. We extract the class docstring from PathPlanningConfig to use as the
813 field description.
814 """
815 try:
816 import dataclasses
818 field_type_docs = {}
820 # Get all dataclass fields
821 if not dataclasses.is_dataclass(dataclass_type):
822 return {}
824 fields = dataclasses.fields(dataclass_type)
826 for field in fields:
827 # Check if this field's type is a dataclass
828 field_type = field.type
830 # Handle Optional types
831 if hasattr(field_type, '__origin__') and field_type.__origin__ is Union:
832 # Extract the non-None type from Optional[T]
833 args = field_type.__args__
834 non_none_types = [arg for arg in args if arg is not type(None)]
835 if len(non_none_types) == 1:
836 field_type = non_none_types[0]
838 # If the field type is a dataclass, extract its docstring as field documentation
839 if dataclasses.is_dataclass(field_type):
840 # ENHANCEMENT: Resolve lazy dataclasses to their base classes for documentation
841 resolved_field_type = SignatureAnalyzer._resolve_lazy_dataclass_for_docs(field_type)
843 docstring_info = DocstringExtractor.extract(resolved_field_type)
844 if docstring_info.summary:
845 field_type_docs[field.name] = docstring_info.summary
846 elif docstring_info.description:
847 # Use first line of description if no summary
848 first_line = docstring_info.description.split('\n')[0].strip()
849 if first_line:
850 field_type_docs[field.name] = first_line
852 return field_type_docs
854 except Exception as e:
855 # Return empty dict if extraction fails
856 return {}
858 @staticmethod
859 def _extract_class_source_from_file(file_content: str, class_name: str) -> Optional[str]:
860 """Extract the source code for a specific class from a file.
862 This method is used when inspect.getsource() fails (e.g., for decorator-modified classes)
863 to extract the class definition directly from the source file.
865 Args:
866 file_content: The content of the source file
867 class_name: The name of the class to extract
869 Returns:
870 The source code for the class, or None if not found
871 """
872 try:
873 lines = file_content.split('\n')
874 class_lines = []
875 in_class = False
876 class_indent = 0
878 for line in lines:
879 # Look for the class definition
880 if line.strip().startswith(f'class {class_name}'):
881 in_class = True
882 class_indent = len(line) - len(line.lstrip())
883 class_lines.append(line)
884 elif in_class:
885 # Check if we've reached the end of the class
886 if line.strip() and not line.startswith(' ') and not line.startswith('\t'):
887 # Non-indented line that's not empty - end of class
888 break
889 elif line.strip() and len(line) - len(line.lstrip()) <= class_indent:
890 # Line at same or less indentation than class - end of class
891 break
892 else:
893 # Still inside the class
894 class_lines.append(line)
896 if class_lines:
897 return '\n'.join(class_lines)
898 return None
900 except Exception:
901 return None
903 @staticmethod
904 def extract_field_documentation(dataclass_type: type, field_name: str) -> Optional[str]:
905 """Extract documentation for a specific field from a dataclass.
907 This method tries multiple approaches to find documentation for a specific field:
908 1. Inline field documentation (AST parsing)
909 2. Field type documentation (for nested dataclasses)
910 3. Docstring parameters
911 4. Field metadata
913 Args:
914 dataclass_type: The dataclass type containing the field
915 field_name: Name of the field to get documentation for
917 Returns:
918 Field documentation string, or None if not found
919 """
920 try:
921 import dataclasses
923 if not dataclasses.is_dataclass(dataclass_type):
924 return None
926 # ENHANCEMENT: Resolve lazy dataclasses to their base classes
927 # PipelineConfig should resolve to GlobalPipelineConfig for documentation
928 resolved_type = SignatureAnalyzer._resolve_lazy_dataclass_for_docs(dataclass_type)
930 # Check cache first for performance
931 cache_key = (resolved_type.__name__, resolved_type.__module__)
932 if cache_key not in SignatureAnalyzer._field_docs_cache:
933 # Extract all field documentation for this dataclass and cache it
934 SignatureAnalyzer._field_docs_cache[cache_key] = SignatureAnalyzer._extract_all_field_docs(resolved_type)
936 cached_docs = SignatureAnalyzer._field_docs_cache[cache_key]
937 if field_name in cached_docs:
938 return cached_docs[field_name]
940 return None
942 except Exception:
943 return None
945 @staticmethod
946 def _resolve_lazy_dataclass_for_docs(dataclass_type: type) -> type:
947 """Resolve lazy dataclasses to their base classes for documentation extraction.
949 This handles the case where PipelineConfig (lazy) should resolve to GlobalPipelineConfig
950 for documentation purposes.
952 Args:
953 dataclass_type: The dataclass type (potentially lazy)
955 Returns:
956 The resolved dataclass type for documentation extraction
957 """
958 try:
959 # Check if this is a lazy dataclass by looking for common patterns
960 class_name = dataclass_type.__name__
962 # Handle PipelineConfig -> GlobalPipelineConfig
963 if class_name == 'PipelineConfig':
964 try:
965 from openhcs.core.config import GlobalPipelineConfig
966 return GlobalPipelineConfig
967 except ImportError:
968 pass
970 # Handle LazyXxxConfig -> XxxConfig mappings
971 if class_name.startswith('Lazy') and class_name.endswith('Config'):
972 try:
973 # Remove 'Lazy' prefix: LazyWellFilterConfig -> WellFilterConfig
974 base_class_name = class_name[4:] # Remove 'Lazy'
976 # Try to import from openhcs.core.config
977 from openhcs.core import config as config_module
978 if hasattr(config_module, base_class_name):
979 return getattr(config_module, base_class_name)
980 except (ImportError, AttributeError):
981 pass
983 # For other lazy dataclasses, try to find the Global version
984 if not class_name.startswith('Global') and class_name.endswith('Config'):
985 try:
986 # Try to find GlobalXxxConfig version
987 global_class_name = f'Global{class_name}'
988 module = __import__(dataclass_type.__module__, fromlist=[global_class_name])
989 if hasattr(module, global_class_name):
990 return getattr(module, global_class_name)
991 except (ImportError, AttributeError):
992 pass
994 # If no resolution found, return the original type
995 return dataclass_type
997 except Exception:
998 return dataclass_type
1000 @staticmethod
1001 def _extract_all_field_docs(dataclass_type: type) -> Dict[str, str]:
1002 """Extract all field documentation for a dataclass and return as a dictionary.
1004 This method combines all documentation extraction approaches and caches the results.
1006 Args:
1007 dataclass_type: The dataclass type to extract documentation from
1009 Returns:
1010 Dictionary mapping field names to their documentation
1011 """
1012 all_docs = {}
1014 try:
1015 import dataclasses
1017 # Try inline field documentation first
1018 inline_docs = SignatureAnalyzer._extract_inline_field_docs(dataclass_type)
1019 all_docs.update(inline_docs)
1021 # Try field type documentation (for nested dataclasses)
1022 field_type_docs = SignatureAnalyzer._extract_field_type_docs(dataclass_type)
1023 for field_name, doc in field_type_docs.items():
1024 if field_name not in all_docs: # Don't overwrite inline docs
1025 all_docs[field_name] = doc
1027 # Try docstring parameters
1028 docstring_info = DocstringExtractor.extract(dataclass_type)
1029 if docstring_info.parameters:
1030 for field_name, doc in docstring_info.parameters.items():
1031 if field_name not in all_docs: # Don't overwrite previous docs
1032 all_docs[field_name] = doc
1034 # Try field metadata
1035 fields = dataclasses.fields(dataclass_type)
1036 for field in fields:
1037 if field.name not in all_docs: # Don't overwrite previous docs
1038 if hasattr(field, 'metadata') and 'description' in field.metadata:
1039 all_docs[field.name] = field.metadata['description']
1041 # ENHANCEMENT: Try inheritance - check parent classes for missing field documentation
1042 for field in fields:
1043 if field.name not in all_docs: # Only for fields still missing documentation
1044 # Walk up the inheritance chain
1045 for base_class in dataclass_type.__mro__[1:]: # Skip the class itself
1046 if base_class == object:
1047 continue
1048 if dataclasses.is_dataclass(base_class):
1049 # Check if this base class has the field with documentation
1050 try:
1051 base_fields = dataclasses.fields(base_class)
1052 base_field_names = [f.name for f in base_fields]
1053 if field.name in base_field_names:
1054 # Try to get documentation from the base class
1055 inherited_doc = SignatureAnalyzer.extract_field_documentation(base_class, field.name)
1056 if inherited_doc:
1057 all_docs[field.name] = inherited_doc
1058 break # Found documentation, stop looking
1059 except Exception:
1060 continue # Try next base class
1062 except Exception:
1063 pass # Return whatever we managed to extract
1065 return all_docs
1067 @staticmethod
1068 def extract_field_documentation_from_context(field_name: str, context_types: list[type]) -> Optional[str]:
1069 """Extract field documentation by searching through multiple dataclass types.
1071 This method is useful when you don't know exactly which dataclass contains
1072 a field, but you have a list of candidate types to search through.
1074 Args:
1075 field_name: Name of the field to get documentation for
1076 context_types: List of dataclass types to search through
1078 Returns:
1079 Field documentation string, or None if not found
1080 """
1081 for dataclass_type in context_types:
1082 if dataclass_type:
1083 doc = SignatureAnalyzer.extract_field_documentation(dataclass_type, field_name)
1084 if doc:
1085 return doc
1086 return None
1088 @staticmethod
1089 def _analyze_dataclass_instance(instance: object) -> Dict[str, ParameterInfo]:
1090 """Extract parameter information from a dataclass instance."""
1091 try:
1092 # Get the type and analyze it
1093 dataclass_type = type(instance)
1094 parameters = SignatureAnalyzer._analyze_dataclass(dataclass_type)
1096 # Update default values with current instance values
1097 # For lazy dataclasses, use object.__getattribute__ to preserve None values for placeholders
1098 for name, param_info in parameters.items():
1099 if hasattr(instance, name):
1100 # Check if this is a lazy dataclass that should preserve None values
1101 if hasattr(instance, '_resolve_field_value'):
1102 # This is a lazy dataclass - use object.__getattribute__ to get stored value
1103 current_value = object.__getattribute__(instance, name)
1104 else:
1105 # Regular dataclass - use normal getattr
1106 current_value = getattr(instance, name)
1108 # Create new ParameterInfo with current value as default
1109 parameters[name] = ParameterInfo(
1110 name=param_info.name,
1111 param_type=param_info.param_type,
1112 default_value=current_value,
1113 is_required=param_info.is_required,
1114 description=param_info.description
1115 )
1117 return parameters
1119 except Exception:
1120 return {}
1122 # Duplicate method removed - using the fixed version above