Coverage for openhcs/introspection/signature_analyzer.py: 37.7%

589 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1# File: openhcs/introspection/signature_analyzer.py 

2 

3import ast 

4import inspect 

5import dataclasses 

6import re 

7from typing import Any, Dict, Callable, get_type_hints, NamedTuple, Union, Optional, Type 

8from dataclasses import dataclass 

9 

10# Lazy imports for OpenHCS-specific type resolution (optional dependency) 

11# These are only imported when needed for type hint resolution 

12_lazy_module = None 

13_config_module = None 

14 

15 

def _get_openhcs_modules():
    """Lazy-load OpenHCS-specific modules for type resolution.

    Imports are deferred so that this analyzer module works even when the
    optional OpenHCS packages are absent.  On the first call the result is
    memoized in the module-level ``_lazy_module`` / ``_config_module`` slots;
    later calls return the cached pair.

    Returns:
        Tuple of (lazy_factory module, core config module).  When the imports
        fail, both are empty stand-in objects whose ``vars()`` is ``{}``,
        which keeps callers that merge ``vars(...)`` into a namespace working.
    """
    global _lazy_module, _config_module
    if _lazy_module is None:
        try:
            import openhcs.config_framework.lazy_factory as lazy_module
            import openhcs.core.config as config_module
        except ImportError:
            # OpenHCS not installed: cache inert placeholder objects so the
            # import is not retried on every call.
            _lazy_module = type('EmptyModule', (), {})()
            _config_module = type('EmptyModule', (), {})()
        else:
            _lazy_module = lazy_module
            _config_module = config_module
    return _lazy_module, _config_module

30 

31 

@dataclass(frozen=True)
class AnalysisConstants:
    """Constants for signature analysis to eliminate magic strings."""
    # Suffix of a constructor's __qualname__ (used to detect __init__ methods).
    INIT_METHOD_SUFFIX: str = ".__init__"
    # Implicit first parameters of instance/class methods, always skipped.
    SELF_PARAM: str = "self"
    CLS_PARAM: str = "cls"
    # Parameters named __like_this__ are treated as internal/reserved and skipped.
    DUNDER_PREFIX: str = "__"
    DUNDER_SUFFIX: str = "__"


# Create constants instance for use throughout the module
CONSTANTS = AnalysisConstants()

44 

45 

class ParameterInfo(NamedTuple):
    """Information about a single parameter of a callable or dataclass field."""
    name: str                       # Parameter/field name as it appears in the signature
    param_type: type                # Resolved type hint (falls back to Any/str when unresolvable)
    default_value: Any              # Default value, or None when the parameter is required
    is_required: bool               # True when no default exists in the signature
    description: Optional[str] = None  # Human-readable description parsed from the docstring, if any

53 

class DocstringInfo(NamedTuple):
    """Structured information extracted from a docstring.

    NOTE(review): a bare ``DocstringInfo()`` leaves ``parameters`` as None,
    not an empty dict — callers must guard before calling ``.get`` on it.
    """
    summary: Optional[str] = None                    # First non-blank line (brief description)
    description: Optional[str] = None                # Full multi-line description body
    parameters: Optional[Dict[str, str]] = None      # Parameter name -> description mapping
    returns: Optional[str] = None                    # Return value description
    examples: Optional[str] = None                   # Usage examples section text

61 

class DocstringExtractor:
    """Extract structured information from docstrings.

    Supports Google, NumPy, and Sphinx parameter styles, plus a couple of
    ad-hoc formats seen in third-party libraries (e.g. pyclesperanto-style
    inline parameter lists).
    """

    @staticmethod
    def extract(target: Union[Callable, type]) -> DocstringInfo:
        """Extract docstring information from function or class.

        Args:
            target: Function, method, or class to extract docstring from

        Returns:
            DocstringInfo with parsed docstring components (empty info when
            the target has no docstring)
        """
        # NOTE(review): any falsy target (not just None) short-circuits here.
        if not target:
            return DocstringInfo()

        # Lazy dataclasses are dynamically generated; read the docstring from
        # the original base class instead.
        actual_target = DocstringExtractor._resolve_lazy_target(target)

        # inspect.getdoc also inherits docstrings from base classes and
        # normalizes indentation.
        docstring = inspect.getdoc(actual_target)
        if not docstring:
            return DocstringInfo()

        # Try AST-based parsing first for better accuracy; any failure falls
        # back to the regex-based parser.
        try:
            return DocstringExtractor._parse_docstring_ast(actual_target, docstring)
        except Exception:
            return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _resolve_lazy_target(target: Union[Callable, type]) -> Union[Callable, type]:
        """Resolve lazy dataclass to its base class for docstring extraction.

        Lazy dataclasses are dynamically created and may not have proper
        docstrings.  This method attempts to find the original base class that
        the lazy class was created from by scanning the MRO for the first
        non-"Lazy"-prefixed base.
        """
        if not hasattr(target, '__name__'):
            return target

        # Heuristic: dynamically generated lazy classes are named "Lazy<Base>".
        if target.__name__.startswith('Lazy'):
            for base in getattr(target, '__mro__', []):
                if base != target and base.__name__ != 'object':
                    # First base that is itself not a lazy class wins.
                    if not base.__name__.startswith('Lazy'):
                        return base

        return target

    @staticmethod
    def _parse_docstring_ast(target: Union[Callable, type], docstring: str) -> DocstringInfo:
        """Parse docstring using AST for more accurate extraction.

        Locates the function/class node whose docstring matches the one we
        already read, then delegates to the node-aware parser.  Any failure
        (no source available, parse error, no matching node) falls back to
        regex parsing.
        """
        try:
            # inspect.getsource raises OSError/TypeError for builtins and
            # dynamically created objects — caught below.
            source = inspect.getsource(target)
            tree = ast.parse(source)

            # Find the node owning this exact docstring.
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    if ast.get_docstring(node) == docstring:
                        return DocstringExtractor._parse_ast_docstring(node, docstring)

            # No matching node found — fall back to regex parsing.
            return DocstringExtractor._parse_docstring(docstring)

        except Exception:
            # Fallback to regex parsing
            return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _parse_ast_docstring(node: Union[ast.FunctionDef, ast.ClassDef], docstring: str) -> DocstringInfo:
        """Parse docstring from AST node with enhanced multiline support.

        Currently a thin delegate to the regex parser; the AST node is
        accepted so this can be extended later without changing callers.
        """
        return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _parse_docstring(docstring: str) -> DocstringInfo:
        """Parse a docstring into structured components with multiline support.

        Supports multiple docstring formats:
        - Google style (Args:, Returns:, Examples:)
        - NumPy style (Parameters / ---- underline)
        - Sphinx style (:param name:, :returns:)
        - Simple format (just description)

        A parameter description continues across lines until a blank line or
        a new parameter/section header is encountered.
        """
        lines = docstring.strip().split('\n')

        summary = None
        description_lines = []
        parameters = {}
        returns = None
        examples = None

        # State machine: which section the current line belongs to, plus the
        # parameter (and its accumulated lines) currently being collected.
        current_section = 'description'
        current_param = None
        current_param_lines = []

        def _finalize_current_param():
            """Flush the accumulated lines of the in-progress parameter."""
            if current_param and current_param_lines:
                param_desc = '\n'.join(current_param_lines).strip()
                parameters[current_param] = param_desc

        for i, line in enumerate(lines):
            original_line = line  # keep indentation for continuation detection
            line = line.strip()

            # --- Section header detection -------------------------------
            # Google/Sphinx style headers end with a colon; NumPy style uses a
            # bare word underlined with dashes on the next line.
            if line.lower() in ('args:', 'arguments:', 'parameters:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'parameters'
                # NOTE(review): both branches continue — the separator line is
                # not actually consumed here; it is skipped later as a
                # non-matching line.
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):
                    continue
                continue
            elif line.lower() in ('args', 'arguments', 'parameters') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style section headers (without colons, followed by dashes)
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'parameters'
                continue
            elif line.lower() in ('returns:', 'return:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'returns'
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('returns', 'return') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style returns section
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'returns'
                continue
            elif line.lower() in ('examples:', 'example:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'examples'
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('examples', 'example') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style examples section
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'examples'
                continue

            # --- Section bodies -----------------------------------------
            if current_section == 'description':
                # First non-blank line becomes the summary; everything else
                # goes to the long description (with original indentation).
                if not summary and line:
                    summary = line
                else:
                    description_lines.append(original_line)

            elif current_section == 'parameters':
                # Several recognized parameter spellings:
                param_match_google = re.match(r'^(\w+):\s*(.+)', line)
                param_match_sphinx = re.match(r'^:param\s+(\w+):\s*(.+)', line)
                param_match_numpy = re.match(r'^(\w+)\s*:\s*(.+)', line)
                # pyclesperanto-style inline parameters (name: type description)
                param_match_inline = re.match(r'^(\w+):\s*(\w+(?:\[.*?\])?|\w+(?:\s*\|\s*\w+)*)\s+(.+)', line)
                # Parameters introduced by a bullet or dash
                param_match_bullet = re.match(r'^[-•*]\s*(\w+):\s*(.+)', line)

                if param_match_google or param_match_sphinx or param_match_numpy or param_match_inline or param_match_bullet:
                    _finalize_current_param()

                    # NOTE(review): the Google pattern matches any "name: rest"
                    # line, so the inline branch below can never be taken
                    # (anything it matches was already claimed by
                    # param_match_google) — its type-prefixed description is
                    # effectively dead code.
                    if param_match_google:
                        param_name, param_desc = param_match_google.groups()
                    elif param_match_sphinx:
                        param_name, param_desc = param_match_sphinx.groups()
                    elif param_match_numpy:
                        param_name, param_desc = param_match_numpy.groups()
                    elif param_match_inline:
                        param_name, param_type, param_desc = param_match_inline.groups()
                        param_desc = f"{param_type} - {param_desc}"  # Include type in description
                    elif param_match_bullet:
                        param_name, param_desc = param_match_bullet.groups()

                    current_param = param_name
                    current_param_lines = [param_desc.strip()]
                elif current_param and (original_line.startswith(' ') or original_line.startswith('\t')):
                    # Indented continuation line of the current parameter.
                    current_param_lines.append(line)
                elif not line:
                    # Blank line terminates the current parameter description.
                    _finalize_current_param()
                    current_param = None
                    current_param_lines = []
                elif current_param:
                    # Non-indented continuation line (part of the same block).
                    current_param_lines.append(line)
                else:
                    # Last resort: several "name: desc" pairs crammed into one
                    # line without clear separation.
                    inline_params = DocstringExtractor._parse_inline_parameters(line)
                    for param_name, param_desc in inline_params.items():
                        parameters[param_name] = param_desc

            elif current_section == 'returns':
                # Accumulate (stripped) lines, including blanks, verbatim.
                if returns is None:
                    returns = line
                else:
                    returns += '\n' + line

            elif current_section == 'examples':
                if examples is None:
                    examples = line
                else:
                    examples += '\n' + line

        # Flush any parameter still being collected at end of docstring.
        _finalize_current_param()

        description = '\n'.join(description_lines).strip()
        if description == summary:
            description = None

        return DocstringInfo(
            summary=summary,
            description=description,
            parameters=parameters or {},
            returns=returns,
            examples=examples
        )

    @staticmethod
    def _parse_inline_parameters(line: str) -> Dict[str, str]:
        """Parse parameters from a single line containing multiple definitions.

        Handles formats like:
        - "input_image: Image Input image to process. footprint: Image ..."
        - "param1: type1 description1. param2: type2 description2."
        """
        parameters = {}

        import re  # redundant (module-level import exists) but harmless

        # Each match: param_name, then everything up to the next "name:" or
        # end of string.  [^:] means a description containing a colon ends the
        # capture early — acceptable for the best-effort fallback this is.
        param_pattern = r'(\w+):\s*([^:]*?)(?=\s+\w+:|$)'
        matches = re.findall(param_pattern, line)

        for param_name, param_desc in matches:
            if param_desc.strip():
                # Normalize: trim whitespace and a trailing period.
                clean_desc = param_desc.strip().rstrip('.')
                parameters[param_name] = clean_desc

        return parameters

328 

329 

330class SignatureAnalyzer: 

331 """Universal analyzer for extracting parameter information from any target.""" 

332 

333 # Class-level cache for field documentation to avoid re-parsing 

334 _field_docs_cache = {} 

335 

336 # Class-level cache for dataclass analysis results to avoid expensive AST parsing 

337 _dataclass_analysis_cache = {} 

338 

339 @staticmethod 

340 def analyze(target: Union[Callable, Type, object], skip_first_param: Optional[bool] = None) -> Dict[str, ParameterInfo]: 

341 """Extract parameter information from any target: function, constructor, dataclass, or instance. 

342 

343 Args: 

344 target: Function, constructor, dataclass type, or dataclass instance 

345 skip_first_param: Whether to skip the first parameter (after self/cls). 

346 If None, auto-detects based on context: 

347 - False for step constructors (all params are configuration) 

348 - True for image processing functions (first param is image data) 

349 

350 Returns: 

351 Dict mapping parameter names to ParameterInfo 

352 """ 

353 if not target: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true

354 return {} 

355 

356 # Dispatch based on target type 

357 if inspect.isclass(target): 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true

358 if dataclasses.is_dataclass(target): 

359 return SignatureAnalyzer._analyze_dataclass(target) 

360 else: 

361 # Try to analyze constructor 

362 return SignatureAnalyzer._analyze_callable(target.__init__, skip_first_param) 

363 elif dataclasses.is_dataclass(target): 363 ↛ 365line 363 didn't jump to line 365 because the condition on line 363 was never true

364 # Instance of dataclass 

365 return SignatureAnalyzer._analyze_dataclass_instance(target) 

366 else: 

367 # Function, method, or other callable 

368 return SignatureAnalyzer._analyze_callable(target, skip_first_param) 

369 

370 @staticmethod 

371 def _analyze_callable(callable_obj: Callable, skip_first_param: Optional[bool] = None) -> Dict[str, ParameterInfo]: 

372 """Extract parameter information from callable signature. 

373 

374 Args: 

375 callable_obj: The callable to analyze 

376 skip_first_param: Whether to skip the first parameter (after self/cls). 

377 If None, auto-detects based on context. 

378 """ 

379 sig = inspect.signature(callable_obj) 

380 # Build comprehensive namespace for forward reference resolution 

381 # Start with function's globals (which contain the actual types), then add our modules as fallback 

382 lazy_module, config_module = _get_openhcs_modules() 

383 globalns = { 

384 **vars(lazy_module), 

385 **vars(config_module), 

386 **getattr(callable_obj, '__globals__', {}) 

387 } 

388 

389 # For OpenHCS functions, prioritize the function's actual module globals 

390 if hasattr(callable_obj, '__module__') and callable_obj.__module__: 390 ↛ 404line 390 didn't jump to line 404 because the condition on line 390 was always true

391 try: 

392 import sys 

393 actual_module = sys.modules.get(callable_obj.__module__) 

394 if actual_module: 394 ↛ 404line 394 didn't jump to line 404 because the condition on line 394 was always true

395 # Function's module globals should take precedence for type resolution 

396 globalns = { 

397 **vars(lazy_module), 

398 **vars(config_module), 

399 **vars(actual_module) # This overwrites with the actual module types 

400 } 

401 except Exception: 

402 pass # Fall back to original globalns 

403 

404 import logging 

405 logger = logging.getLogger(__name__) 

406 

407 try: 

408 type_hints = get_type_hints(callable_obj, globalns=globalns) 

409 logger.debug(f"🔍 SIG ANALYZER: get_type_hints succeeded for {callable_obj.__name__}: {type_hints}") 

410 except (NameError, AttributeError) as e: 

411 # If type hint resolution fails, try with just the function's original globals 

412 try: 

413 type_hints = get_type_hints(callable_obj, globalns=getattr(callable_obj, '__globals__', {})) 

414 logger.debug(f"🔍 SIG ANALYZER: get_type_hints with __globals__ succeeded for {callable_obj.__name__}: {type_hints}") 

415 except: 

416 # If that still fails, fall back to __annotations__ directly 

417 # This is critical for functions where type hints were added via docstring parsing 

418 # (e.g., cucim functions where _enhance_annotations_from_docstring added types) 

419 type_hints = getattr(callable_obj, '__annotations__', {}) 

420 logger.debug(f"🔍 SIG ANALYZER: Fell back to __annotations__ for {callable_obj.__name__}: {type_hints}") 

421 except Exception as ex: 

422 # For any other type hint resolution errors, fall back to __annotations__ 

423 # This ensures we don't lose type information that was added programmatically 

424 type_hints = getattr(callable_obj, '__annotations__', {}) 

425 logger.debug(f"🔍 SIG ANALYZER: Exception {ex}, fell back to __annotations__ for {callable_obj.__name__}: {type_hints}") 

426 

427 

428 

429 # Extract docstring information (with fallback for robustness) 

430 try: 

431 docstring_info = DocstringExtractor.extract(callable_obj) 

432 except: 

433 docstring_info = None 

434 

435 if not docstring_info: 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true

436 docstring_info = DocstringInfo() 

437 

438 parameters = {} 

439 param_list = list(sig.parameters.items()) 

440 

441 # Determine skip behavior: explicit parameter overrides auto-detection 

442 should_skip_first_param = ( 

443 skip_first_param if skip_first_param is not None 

444 else SignatureAnalyzer._should_skip_first_parameter(callable_obj) 

445 ) 

446 

447 first_param_after_self_skipped = False 

448 

449 for i, (param_name, param) in enumerate(param_list): 

450 # Always skip self/cls 

451 if param_name in (CONSTANTS.SELF_PARAM, CONSTANTS.CLS_PARAM): 

452 continue 

453 

454 # Always skip dunder parameters (internal/reserved fields) 

455 if param_name.startswith(CONSTANTS.DUNDER_PREFIX) and param_name.endswith(CONSTANTS.DUNDER_SUFFIX): 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true

456 continue 

457 

458 # Skip first parameter for image processing functions only 

459 if should_skip_first_param and not first_param_after_self_skipped: 

460 first_param_after_self_skipped = True 

461 continue 

462 

463 # Handle **kwargs parameters - try to extract original function signature 

464 if param.kind == inspect.Parameter.VAR_KEYWORD: 

465 # Try to find the original function if this is a wrapper 

466 original_params = SignatureAnalyzer._extract_original_parameters(callable_obj) 

467 if original_params: 

468 parameters.update(original_params) 

469 continue 

470 

471 from typing import Any 

472 param_type = type_hints.get(param_name, Any) 

473 default_value = param.default if param.default != inspect.Parameter.empty else None 

474 is_required = param.default == inspect.Parameter.empty 

475 

476 

477 

478 # Get parameter description from docstring 

479 param_description = docstring_info.parameters.get(param_name) if docstring_info else None 

480 

481 parameters[param_name] = ParameterInfo( 

482 name=param_name, 

483 param_type=param_type, 

484 default_value=default_value, 

485 is_required=is_required, 

486 description=param_description 

487 ) 

488 

489 return parameters 

490 

491 @staticmethod 

492 def _should_skip_first_parameter(callable_obj: Callable) -> bool: 

493 """ 

494 Determine if the first parameter should be skipped for any callable. 

495 

496 Universal logic that works with any object: 

497 - Constructors (__init__ methods): don't skip (all params are configuration) 

498 - All other callables: skip first param (assume it's data being processed) 

499 """ 

500 # Check if this is any __init__ method (constructor) 

501 if (hasattr(callable_obj, '__qualname__') and 

502 callable_obj.__qualname__.endswith(CONSTANTS.INIT_METHOD_SUFFIX)): 

503 return False 

504 

505 # Everything else: skip first parameter 

506 return True 

507 

    @staticmethod
    def _extract_original_parameters(callable_obj: Callable) -> Dict[str, ParameterInfo]:
        """
        Extract parameters from the original function if this is a wrapper with **kwargs.

        This handles cases where scikit-image or other auto-registered functions
        are wrapped with (image, **kwargs) signatures.  Three strategies are
        tried in order; the first that yields a result wins.  Any failure
        returns an empty dict (best-effort by design).
        """
        try:
            # Pattern 1: functools.wraps-style wrapper exposing __wrapped__.
            if hasattr(callable_obj, '__wrapped__'):
                return SignatureAnalyzer._analyze_callable(callable_obj.__wrapped__)

            # Pattern 2: scan closure cells for a callable that might be the
            # original function.
            if hasattr(callable_obj, '__closure__') and callable_obj.__closure__:
                for cell in callable_obj.__closure__:
                    if hasattr(cell.cell_contents, '__call__'):
                        try:
                            orig_sig = inspect.signature(cell.cell_contents)
                            # Skip if it also has **kwargs (avoid infinite recursion).
                            if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in orig_sig.parameters.values()):
                                continue
                            return SignatureAnalyzer._analyze_callable(cell.cell_contents)
                        except:  # NOTE(review): bare except — also masks KeyboardInterrupt
                            continue

            # Pattern 3: look the function up by name in well-known skimage
            # modules (fallback for our scikit-image registry wrappers).
            if hasattr(callable_obj, '__name__') and hasattr(callable_obj, '__module__'):
                func_name = callable_obj.__name__
                module_name = callable_obj.__module__

                if 'skimage' in module_name:
                    try:
                        import importlib
                        # Only our own wrapper module qualifies; probe the
                        # common skimage submodules for a same-named function.
                        if 'scikit_image_registry' in module_name:
                            for skimage_module in ['skimage.filters', 'skimage.morphology',
                                                   'skimage.segmentation', 'skimage.feature',
                                                   'skimage.measure', 'skimage.transform',
                                                   'skimage.restoration', 'skimage.exposure']:
                                try:
                                    mod = importlib.import_module(skimage_module)
                                    if hasattr(mod, func_name):
                                        orig_func = getattr(mod, func_name)
                                        return SignatureAnalyzer._analyze_callable(orig_func)
                                except:  # NOTE(review): bare except
                                    continue
                    except:  # NOTE(review): bare except
                        pass

            return {}

        except Exception:
            # Deliberate best-effort: extraction failure is not an error.
            return {}

569 

570 @staticmethod 

571 def _analyze_dataclass(dataclass_type: type) -> Dict[str, ParameterInfo]: 

572 """Extract parameter information from dataclass fields.""" 

573 import logging 

574 logger = logging.getLogger(__name__) 

575 

576 # PERFORMANCE: Check cache first to avoid expensive AST parsing 

577 # Use the class object itself as the key (classes are hashable and have stable identity) 

578 cache_key = dataclass_type 

579 if cache_key in SignatureAnalyzer._dataclass_analysis_cache: 

580 logger.info(f"✅ CACHE HIT for {dataclass_type.__name__} (id={id(dataclass_type)})") 

581 return SignatureAnalyzer._dataclass_analysis_cache[cache_key] 

582 

583 logger.info(f"❌ CACHE MISS for {dataclass_type.__name__} (id={id(dataclass_type)}), cache has {len(SignatureAnalyzer._dataclass_analysis_cache)} entries") 

584 

585 try: 

586 # Try to get type hints, fall back to __annotations__ if resolution fails 

587 try: 

588 type_hints = get_type_hints(dataclass_type) 

589 except Exception: 

590 # Fall back to __annotations__ for robustness 

591 type_hints = getattr(dataclass_type, '__annotations__', {}) 

592 

593 # Extract docstring information from dataclass 

594 docstring_info = DocstringExtractor.extract(dataclass_type) 

595 

596 # Extract inline field documentation using AST 

597 inline_docs = SignatureAnalyzer._extract_inline_field_docs(dataclass_type) 

598 

599 # ENHANCEMENT: For dataclasses modified by decorators (like GlobalPipelineConfig), 

600 # also extract field documentation from the field types themselves 

601 field_type_docs = SignatureAnalyzer._extract_field_type_docs(dataclass_type) 

602 

603 parameters = {} 

604 

605 for field in dataclasses.fields(dataclass_type): 

606 param_type = type_hints.get(field.name, str) 

607 

608 # Get default value 

609 if field.default != dataclasses.MISSING: 

610 default_value = field.default 

611 is_required = False 

612 elif field.default_factory != dataclasses.MISSING: 

613 default_value = field.default_factory() 

614 is_required = False 

615 else: 

616 default_value = None 

617 is_required = True 

618 

619 # Get field description from multiple sources (priority order) 

620 field_description = None 

621 

622 # 1. Field metadata (highest priority) 

623 if hasattr(field, 'metadata') and 'description' in field.metadata: 

624 field_description = field.metadata['description'] 

625 # 2. Inline documentation strings (from AST parsing) 

626 elif field.name in inline_docs: 

627 field_description = inline_docs[field.name] 

628 # 3. Field type documentation (for decorator-modified classes) 

629 elif field.name in field_type_docs: 

630 field_description = field_type_docs[field.name] 

631 # 4. Docstring parameters (fallback) 

632 elif docstring_info.parameters and field.name in docstring_info.parameters: 

633 field_description = docstring_info.parameters.get(field.name) 

634 # 5. CRITICAL FIX: Use inheritance-aware field documentation extraction 

635 else: 

636 field_description = SignatureAnalyzer.extract_field_documentation(dataclass_type, field.name) 

637 

638 parameters[field.name] = ParameterInfo( 

639 name=field.name, 

640 param_type=param_type, 

641 default_value=default_value, 

642 is_required=is_required, 

643 description=field_description 

644 ) 

645 

646 # PERFORMANCE: Cache the result to avoid re-parsing 

647 SignatureAnalyzer._dataclass_analysis_cache[cache_key] = parameters 

648 return parameters 

649 

650 except Exception: 

651 # Return empty dict on error (don't cache errors) 

652 return {} 

653 

654 @staticmethod 

655 def _extract_inline_field_docs(dataclass_type: type) -> Dict[str, str]: 

656 """Extract inline field documentation strings using AST parsing. 

657 

658 This handles multiple patterns used for field documentation: 

659 

660 Pattern 1 - Next line string literal: 

661 @dataclass 

662 class Config: 

663 field_name: str = "default" 

664 '''Field description here.''' 

665 

666 Pattern 2 - Same line string literal (less common): 

667 @dataclass 

668 class Config: 

669 field_name: str = "default" # '''Field description''' 

670 

671 Pattern 3 - Traditional docstring parameters (handled by DocstringExtractor): 

672 @dataclass 

673 class Config: 

674 ''' 

675 Args: 

676 field_name: Field description here. 

677 ''' 

678 field_name: str = "default" 

679 """ 

680 try: 

681 import ast 

682 import re 

683 

684 # Try to get source code - handle cases where it might not be available 

685 source = None 

686 try: 

687 source = inspect.getsource(dataclass_type) 

688 except (OSError, TypeError): 

689 # ENHANCEMENT: For decorator-modified classes, try multiple source file strategies 

690 try: 

691 # Strategy 1: Try the file where the class is currently defined 

692 source_file = inspect.getfile(dataclass_type) 

693 with open(source_file, 'r', encoding='utf-8') as f: 

694 file_content = f.read() 

695 source = SignatureAnalyzer._extract_class_source_from_file(file_content, dataclass_type.__name__) 

696 

697 # Strategy 2: If that fails, try to find the original source file 

698 # This handles decorator-modified classes where inspect.getfile() returns the wrong file 

699 if not source: 

700 try: 

701 import os 

702 source_dir = os.path.dirname(source_file) 

703 

704 # Try common source files in the same directory 

705 candidate_files = [] 

706 

707 # If the current file is lazy_config.py, try config.py 

708 if source_file.endswith('lazy_config.py'): 

709 candidate_files.append(os.path.join(source_dir, 'config.py')) 

710 

711 # Try other common patterns 

712 for filename in os.listdir(source_dir): 

713 if filename.endswith('.py') and filename != os.path.basename(source_file): 

714 candidate_files.append(os.path.join(source_dir, filename)) 

715 

716 # Try each candidate file 

717 for candidate_file in candidate_files: 

718 if os.path.exists(candidate_file): 

719 with open(candidate_file, 'r', encoding='utf-8') as f: 

720 candidate_content = f.read() 

721 source = SignatureAnalyzer._extract_class_source_from_file(candidate_content, dataclass_type.__name__) 

722 if source: # Found it! 

723 break 

724 except Exception: 

725 pass 

726 except Exception: 

727 pass 

728 

729 if not source: 

730 return {} 

731 

732 tree = ast.parse(source) 

733 

734 # Find the class definition - be more flexible with class name matching 

735 class_node = None 

736 target_class_name = dataclass_type.__name__ 

737 

738 # Handle cases where the class might have been renamed or modified 

739 for node in ast.walk(tree): 

740 if isinstance(node, ast.ClassDef): 

741 # Try exact match first 

742 if node.name == target_class_name: 

743 class_node = node 

744 break 

745 # Also try without common prefixes/suffixes that decorators might add 

746 base_name = target_class_name.replace('Lazy', '').replace('Config', '') 

747 node_base_name = node.name.replace('Lazy', '').replace('Config', '') 

748 if base_name and node_base_name and base_name == node_base_name: 

749 class_node = node 

750 break 

751 

752 if not class_node: 

753 return {} 

754 

755 field_docs = {} 

756 source_lines = source.split('\n') 

757 

758 # Method 1: Look for field assignments followed by string literals (next line) 

759 for i, node in enumerate(class_node.body): 

760 if isinstance(node, ast.AnnAssign) and hasattr(node.target, 'id'): 

761 field_name = node.target.id 

762 

763 # Check if the next node is a string literal (documentation) 

764 if i + 1 < len(class_node.body): 

765 next_node = class_node.body[i + 1] 

766 if isinstance(next_node, ast.Expr): 

767 # Handle both ast.Constant (Python 3.8+) and ast.Str (older versions) 

768 if isinstance(next_node.value, ast.Constant) and isinstance(next_node.value.value, str): 

769 field_docs[field_name] = next_node.value.value.strip() 

770 continue 

771 elif hasattr(ast, 'Str') and isinstance(next_node.value, ast.Str): 

772 field_docs[field_name] = next_node.value.s.strip() 

773 continue 

774 

775 # Method 2: Check for inline comments on the same line 

776 # Get the line number of the field definition 

777 field_line_num = node.lineno - 1 # Convert to 0-based indexing 

778 if 0 <= field_line_num < len(source_lines): 

779 line = source_lines[field_line_num] 

780 

781 # Look for string literals in comments on the same line 

782 # Pattern: field: type = value # """Documentation""" 

783 comment_match = re.search(r'#\s*["\']([^"\']+)["\']', line) 

784 if comment_match: 

785 field_docs[field_name] = comment_match.group(1).strip() 

786 continue 

787 

788 # Look for triple-quoted strings on the same line 

789 # Pattern: field: type = value """Documentation""" 

790 triple_quote_match = re.search(r'"""([^"]+)"""|\'\'\'([^\']+)\'\'\'', line) 

791 if triple_quote_match: 

792 doc_text = triple_quote_match.group(1) or triple_quote_match.group(2) 

793 field_docs[field_name] = doc_text.strip() 

794 

795 return field_docs 

796 

797 except Exception as e: 

798 # Return empty dict if AST parsing fails 

799 # Could add logging here for debugging: logger.debug(f"AST parsing failed: {e}") 

800 return {} 

801 

802 @staticmethod 

803 def _extract_field_type_docs(dataclass_type: type) -> Dict[str, str]: 

804 """Extract field documentation from field types for decorator-modified dataclasses. 

805 

806 This handles cases where dataclasses have been modified by decorators (like @auto_create_decorator) 

807 that inject fields from other dataclasses. In such cases, the AST parsing of the main class 

808 won't find documentation for the injected fields, so we need to extract documentation from 

809 the field types themselves. 

810 

811 For example, GlobalPipelineConfig has injected fields like 'path_planning_config' of type 

812 PathPlanningConfig. We extract the class docstring from PathPlanningConfig to use as the 

813 field description. 

814 """ 

815 try: 

816 import dataclasses 

817 

818 field_type_docs = {} 

819 

820 # Get all dataclass fields 

821 if not dataclasses.is_dataclass(dataclass_type): 

822 return {} 

823 

824 fields = dataclasses.fields(dataclass_type) 

825 

826 for field in fields: 

827 # Check if this field's type is a dataclass 

828 field_type = field.type 

829 

830 # Handle Optional types 

831 if hasattr(field_type, '__origin__') and field_type.__origin__ is Union: 

832 # Extract the non-None type from Optional[T] 

833 args = field_type.__args__ 

834 non_none_types = [arg for arg in args if arg is not type(None)] 

835 if len(non_none_types) == 1: 

836 field_type = non_none_types[0] 

837 

838 # If the field type is a dataclass, extract its docstring as field documentation 

839 if dataclasses.is_dataclass(field_type): 

840 # ENHANCEMENT: Resolve lazy dataclasses to their base classes for documentation 

841 resolved_field_type = SignatureAnalyzer._resolve_lazy_dataclass_for_docs(field_type) 

842 

843 docstring_info = DocstringExtractor.extract(resolved_field_type) 

844 if docstring_info.summary: 

845 field_type_docs[field.name] = docstring_info.summary 

846 elif docstring_info.description: 

847 # Use first line of description if no summary 

848 first_line = docstring_info.description.split('\n')[0].strip() 

849 if first_line: 

850 field_type_docs[field.name] = first_line 

851 

852 return field_type_docs 

853 

854 except Exception as e: 

855 # Return empty dict if extraction fails 

856 return {} 

857 

858 @staticmethod 

859 def _extract_class_source_from_file(file_content: str, class_name: str) -> Optional[str]: 

860 """Extract the source code for a specific class from a file. 

861 

862 This method is used when inspect.getsource() fails (e.g., for decorator-modified classes) 

863 to extract the class definition directly from the source file. 

864 

865 Args: 

866 file_content: The content of the source file 

867 class_name: The name of the class to extract 

868 

869 Returns: 

870 The source code for the class, or None if not found 

871 """ 

872 try: 

873 lines = file_content.split('\n') 

874 class_lines = [] 

875 in_class = False 

876 class_indent = 0 

877 

878 for line in lines: 

879 # Look for the class definition 

880 if line.strip().startswith(f'class {class_name}'): 

881 in_class = True 

882 class_indent = len(line) - len(line.lstrip()) 

883 class_lines.append(line) 

884 elif in_class: 

885 # Check if we've reached the end of the class 

886 if line.strip() and not line.startswith(' ') and not line.startswith('\t'): 

887 # Non-indented line that's not empty - end of class 

888 break 

889 elif line.strip() and len(line) - len(line.lstrip()) <= class_indent: 

890 # Line at same or less indentation than class - end of class 

891 break 

892 else: 

893 # Still inside the class 

894 class_lines.append(line) 

895 

896 if class_lines: 

897 return '\n'.join(class_lines) 

898 return None 

899 

900 except Exception: 

901 return None 

902 

903 @staticmethod 

904 def extract_field_documentation(dataclass_type: type, field_name: str) -> Optional[str]: 

905 """Extract documentation for a specific field from a dataclass. 

906 

907 This method tries multiple approaches to find documentation for a specific field: 

908 1. Inline field documentation (AST parsing) 

909 2. Field type documentation (for nested dataclasses) 

910 3. Docstring parameters 

911 4. Field metadata 

912 

913 Args: 

914 dataclass_type: The dataclass type containing the field 

915 field_name: Name of the field to get documentation for 

916 

917 Returns: 

918 Field documentation string, or None if not found 

919 """ 

920 try: 

921 import dataclasses 

922 

923 if not dataclasses.is_dataclass(dataclass_type): 

924 return None 

925 

926 # ENHANCEMENT: Resolve lazy dataclasses to their base classes 

927 # PipelineConfig should resolve to GlobalPipelineConfig for documentation 

928 resolved_type = SignatureAnalyzer._resolve_lazy_dataclass_for_docs(dataclass_type) 

929 

930 # Check cache first for performance 

931 cache_key = (resolved_type.__name__, resolved_type.__module__) 

932 if cache_key not in SignatureAnalyzer._field_docs_cache: 

933 # Extract all field documentation for this dataclass and cache it 

934 SignatureAnalyzer._field_docs_cache[cache_key] = SignatureAnalyzer._extract_all_field_docs(resolved_type) 

935 

936 cached_docs = SignatureAnalyzer._field_docs_cache[cache_key] 

937 if field_name in cached_docs: 

938 return cached_docs[field_name] 

939 

940 return None 

941 

942 except Exception: 

943 return None 

944 

945 @staticmethod 

946 def _resolve_lazy_dataclass_for_docs(dataclass_type: type) -> type: 

947 """Resolve lazy dataclasses to their base classes for documentation extraction. 

948 

949 This handles the case where PipelineConfig (lazy) should resolve to GlobalPipelineConfig 

950 for documentation purposes. 

951 

952 Args: 

953 dataclass_type: The dataclass type (potentially lazy) 

954 

955 Returns: 

956 The resolved dataclass type for documentation extraction 

957 """ 

958 try: 

959 # Check if this is a lazy dataclass by looking for common patterns 

960 class_name = dataclass_type.__name__ 

961 

962 # Handle PipelineConfig -> GlobalPipelineConfig 

963 if class_name == 'PipelineConfig': 

964 try: 

965 from openhcs.core.config import GlobalPipelineConfig 

966 return GlobalPipelineConfig 

967 except ImportError: 

968 pass 

969 

970 # Handle LazyXxxConfig -> XxxConfig mappings 

971 if class_name.startswith('Lazy') and class_name.endswith('Config'): 

972 try: 

973 # Remove 'Lazy' prefix: LazyWellFilterConfig -> WellFilterConfig 

974 base_class_name = class_name[4:] # Remove 'Lazy' 

975 

976 # Try to import from openhcs.core.config 

977 from openhcs.core import config as config_module 

978 if hasattr(config_module, base_class_name): 

979 return getattr(config_module, base_class_name) 

980 except (ImportError, AttributeError): 

981 pass 

982 

983 # For other lazy dataclasses, try to find the Global version 

984 if not class_name.startswith('Global') and class_name.endswith('Config'): 

985 try: 

986 # Try to find GlobalXxxConfig version 

987 global_class_name = f'Global{class_name}' 

988 module = __import__(dataclass_type.__module__, fromlist=[global_class_name]) 

989 if hasattr(module, global_class_name): 

990 return getattr(module, global_class_name) 

991 except (ImportError, AttributeError): 

992 pass 

993 

994 # If no resolution found, return the original type 

995 return dataclass_type 

996 

997 except Exception: 

998 return dataclass_type 

999 

1000 @staticmethod 

1001 def _extract_all_field_docs(dataclass_type: type) -> Dict[str, str]: 

1002 """Extract all field documentation for a dataclass and return as a dictionary. 

1003 

1004 This method combines all documentation extraction approaches and caches the results. 

1005 

1006 Args: 

1007 dataclass_type: The dataclass type to extract documentation from 

1008 

1009 Returns: 

1010 Dictionary mapping field names to their documentation 

1011 """ 

1012 all_docs = {} 

1013 

1014 try: 

1015 import dataclasses 

1016 

1017 # Try inline field documentation first 

1018 inline_docs = SignatureAnalyzer._extract_inline_field_docs(dataclass_type) 

1019 all_docs.update(inline_docs) 

1020 

1021 # Try field type documentation (for nested dataclasses) 

1022 field_type_docs = SignatureAnalyzer._extract_field_type_docs(dataclass_type) 

1023 for field_name, doc in field_type_docs.items(): 

1024 if field_name not in all_docs: # Don't overwrite inline docs 

1025 all_docs[field_name] = doc 

1026 

1027 # Try docstring parameters 

1028 docstring_info = DocstringExtractor.extract(dataclass_type) 

1029 if docstring_info.parameters: 

1030 for field_name, doc in docstring_info.parameters.items(): 

1031 if field_name not in all_docs: # Don't overwrite previous docs 

1032 all_docs[field_name] = doc 

1033 

1034 # Try field metadata 

1035 fields = dataclasses.fields(dataclass_type) 

1036 for field in fields: 

1037 if field.name not in all_docs: # Don't overwrite previous docs 

1038 if hasattr(field, 'metadata') and 'description' in field.metadata: 

1039 all_docs[field.name] = field.metadata['description'] 

1040 

1041 # ENHANCEMENT: Try inheritance - check parent classes for missing field documentation 

1042 for field in fields: 

1043 if field.name not in all_docs: # Only for fields still missing documentation 

1044 # Walk up the inheritance chain 

1045 for base_class in dataclass_type.__mro__[1:]: # Skip the class itself 

1046 if base_class == object: 

1047 continue 

1048 if dataclasses.is_dataclass(base_class): 

1049 # Check if this base class has the field with documentation 

1050 try: 

1051 base_fields = dataclasses.fields(base_class) 

1052 base_field_names = [f.name for f in base_fields] 

1053 if field.name in base_field_names: 

1054 # Try to get documentation from the base class 

1055 inherited_doc = SignatureAnalyzer.extract_field_documentation(base_class, field.name) 

1056 if inherited_doc: 

1057 all_docs[field.name] = inherited_doc 

1058 break # Found documentation, stop looking 

1059 except Exception: 

1060 continue # Try next base class 

1061 

1062 except Exception: 

1063 pass # Return whatever we managed to extract 

1064 

1065 return all_docs 

1066 

1067 @staticmethod 

1068 def extract_field_documentation_from_context(field_name: str, context_types: list[type]) -> Optional[str]: 

1069 """Extract field documentation by searching through multiple dataclass types. 

1070 

1071 This method is useful when you don't know exactly which dataclass contains 

1072 a field, but you have a list of candidate types to search through. 

1073 

1074 Args: 

1075 field_name: Name of the field to get documentation for 

1076 context_types: List of dataclass types to search through 

1077 

1078 Returns: 

1079 Field documentation string, or None if not found 

1080 """ 

1081 for dataclass_type in context_types: 

1082 if dataclass_type: 

1083 doc = SignatureAnalyzer.extract_field_documentation(dataclass_type, field_name) 

1084 if doc: 

1085 return doc 

1086 return None 

1087 

1088 @staticmethod 

1089 def _analyze_dataclass_instance(instance: object) -> Dict[str, ParameterInfo]: 

1090 """Extract parameter information from a dataclass instance.""" 

1091 try: 

1092 # Get the type and analyze it 

1093 dataclass_type = type(instance) 

1094 parameters = SignatureAnalyzer._analyze_dataclass(dataclass_type) 

1095 

1096 # Update default values with current instance values 

1097 # For lazy dataclasses, use object.__getattribute__ to preserve None values for placeholders 

1098 for name, param_info in parameters.items(): 

1099 if hasattr(instance, name): 

1100 # Check if this is a lazy dataclass that should preserve None values 

1101 if hasattr(instance, '_resolve_field_value'): 

1102 # This is a lazy dataclass - use object.__getattribute__ to get stored value 

1103 current_value = object.__getattribute__(instance, name) 

1104 else: 

1105 # Regular dataclass - use normal getattr 

1106 current_value = getattr(instance, name) 

1107 

1108 # Create new ParameterInfo with current value as default 

1109 parameters[name] = ParameterInfo( 

1110 name=param_info.name, 

1111 param_type=param_info.param_type, 

1112 default_value=current_value, 

1113 is_required=param_info.is_required, 

1114 description=param_info.description 

1115 ) 

1116 

1117 return parameters 

1118 

1119 except Exception: 

1120 return {} 

1121 

1122 # Duplicate method removed - using the fixed version above