Coverage for openhcs/textual_tui/widgets/shared/signature_analyzer.py: 0.0%

559 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1# File: openhcs/textual_tui/widgets/shared/signature_analyzer.py 

2 

3import ast 

4import inspect 

5import dataclasses 

6import re 

7from typing import Any, Dict, Callable, get_type_hints, NamedTuple, Union, Optional, Type 

8from dataclasses import dataclass 

9import openhcs.config_framework.lazy_factory as lazy_module 

10import openhcs.core.config as config_module 

11 

12 

@dataclass(frozen=True)
class AnalysisConstants:
    """Constants for signature analysis to eliminate magic strings.

    Frozen so the shared module-level instance below is immutable.
    """
    INIT_METHOD_SUFFIX: str = ".__init__"  # __qualname__ suffix identifying a constructor
    SELF_PARAM: str = "self"  # instance-method receiver name; always skipped
    CLS_PARAM: str = "cls"  # classmethod receiver name; always skipped
    DUNDER_PREFIX: str = "__"  # leading marker of internal/reserved parameters
    DUNDER_SUFFIX: str = "__"  # trailing marker of internal/reserved parameters


# Create constants instance for use throughout the module
CONSTANTS = AnalysisConstants()

25 

26 

class ParameterInfo(NamedTuple):
    """Information about a parameter."""
    name: str  # parameter/field name exactly as it appears in the signature
    param_type: type  # resolved type hint (analyzers fall back to Any or str when unresolvable)
    default_value: Any  # declared default, or None when the parameter is required
    is_required: bool  # True when the signature declares no default
    description: Optional[str] = None  # parameter description parsed from the docstring, if any

34 

class DocstringInfo(NamedTuple):
    """Structured information extracted from a docstring.

    Attributes:
        summary: First line or brief description, if any.
        description: Full (possibly multi-line) description, if any.
        parameters: Mapping of parameter name -> description text.
        returns: Return value description, if any.
        examples: Usage examples text, if any.
    """
    summary: Optional[str] = None
    description: Optional[str] = None
    # Default to an empty mapping instead of None so consumers can safely call
    # `info.parameters.get(name)` without a null check (a default-constructed
    # DocstringInfo previously carried None here, which crashed such lookups).
    # NamedTuple defaults are class-level and shared: treat this dict as
    # read-only and never mutate it in place.
    parameters: Dict[str, str] = {}
    returns: Optional[str] = None
    examples: Optional[str] = None

42 

class DocstringExtractor:
    """Extract structured information from docstrings."""

    @staticmethod
    def extract(target: Union[Callable, type]) -> DocstringInfo:
        """Extract docstring information from function or class.

        Args:
            target: Function, method, or class to extract docstring from

        Returns:
            DocstringInfo with parsed docstring components (empty on falsy
            targets or when no docstring is found)
        """
        if not target:
            return DocstringInfo()

        # ENHANCEMENT: Handle lazy dataclasses by extracting from their base class
        actual_target = DocstringExtractor._resolve_lazy_target(target)

        # inspect.getdoc searches the MRO and normalizes indentation.
        docstring = inspect.getdoc(actual_target)
        if not docstring:
            return DocstringInfo()

        # Try AST-based parsing first for better accuracy
        try:
            return DocstringExtractor._parse_docstring_ast(actual_target, docstring)
        except Exception:
            # Fall back to regex-based parsing
            return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _resolve_lazy_target(target: Union[Callable, type]) -> Union[Callable, type]:
        """Resolve lazy dataclass to its base class for docstring extraction.

        Lazy dataclasses are dynamically created and may not have proper docstrings.
        This method attempts to find the original base class that the lazy class
        was created from.
        """
        if not hasattr(target, '__name__'):
            return target

        # Check if this looks like a lazy dataclass (starts with "Lazy")
        if target.__name__.startswith('Lazy'):
            # Try to find the base class in the MRO
            for base in getattr(target, '__mro__', []):
                if base != target and base.__name__ != 'object':
                    # Found a base class that's not the lazy class itself
                    if not base.__name__.startswith('Lazy'):
                        return base

        return target

    @staticmethod
    def _parse_docstring_ast(target: Union[Callable, type], docstring: str) -> DocstringInfo:
        """Parse docstring using AST for more accurate extraction.

        This method uses AST to parse the source code and extract docstring
        information more accurately, especially for complex multiline descriptions.
        """
        try:
            # Get source code (raises OSError/TypeError for builtins or
            # dynamically created objects -> handled by the except below).
            source = inspect.getsource(target)
            tree = ast.parse(source)

            # Find the function/class node whose docstring matches the one we
            # were given (both inspect.getdoc and ast.get_docstring clean
            # indentation, so the comparison is on normalized text).
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    if ast.get_docstring(node) == docstring:
                        return DocstringExtractor._parse_ast_docstring(node, docstring)

            # Fallback to regex parsing if AST parsing fails
            return DocstringExtractor._parse_docstring(docstring)

        except Exception:
            # Fallback to regex parsing
            return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _parse_ast_docstring(node: Union[ast.FunctionDef, ast.ClassDef], docstring: str) -> DocstringInfo:
        """Parse docstring from AST node with enhanced multiline support."""
        # For now, use the improved regex parser.
        # This can be extended later with more sophisticated AST-based parsing.
        return DocstringExtractor._parse_docstring(docstring)

    @staticmethod
    def _parse_docstring(docstring: str) -> DocstringInfo:
        """Parse a docstring into structured components with improved multiline support.

        Supports multiple docstring formats:
        - Google style (Args:, Returns:, Examples:)
        - NumPy style (Parameters, Returns, Examples)
        - Sphinx style (:param name:, :returns:)
        - Simple format (just description)

        Uses improved parsing for multiline parameter descriptions that continues
        until a blank line or new parameter/section is encountered.
        """
        lines = docstring.strip().split('\n')

        summary = None
        description_lines = []
        parameters = {}
        returns = None
        examples = None

        # State machine: which section the line-scanner is currently inside.
        current_section = 'description'
        current_param = None
        current_param_lines = []

        def _finalize_current_param():
            """Finalize the current parameter description.

            Closure: reads the loop's latest bindings of current_param and
            current_param_lines at call time.
            """
            if current_param and current_param_lines:
                param_desc = '\n'.join(current_param_lines).strip()
                parameters[current_param] = param_desc

        for i, line in enumerate(lines):
            original_line = line
            line = line.strip()

            # Handle both Google/Sphinx style (with colons) and NumPy style (without colons)
            if line.lower() in ('args:', 'arguments:', 'parameters:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'parameters'
                # NOTE(review): both branches just continue, so a NumPy '---'
                # separator line is not actually consumed here; it is processed
                # (and harmlessly ignored) on its own iteration.
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('args', 'arguments', 'parameters') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style section headers (without colons, followed by dashes)
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'parameters'
                continue
            elif line.lower() in ('returns:', 'return:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'returns'
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('returns', 'return') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style returns section
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'returns'
                continue
            elif line.lower() in ('examples:', 'example:'):
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'examples'
                if i + 1 < len(lines) and lines[i+1].strip().startswith('---'):  # Skip NumPy style separator
                    continue
                continue
            elif line.lower() in ('examples', 'example') and i + 1 < len(lines) and lines[i+1].strip().startswith('-'):
                # NumPy-style examples section
                _finalize_current_param()
                current_param = None
                current_param_lines = []
                current_section = 'examples'
                continue

            if current_section == 'description':
                # First non-empty line becomes the summary; everything after it
                # (including blank lines) accumulates into the description.
                if not summary and line:
                    summary = line
                else:
                    description_lines.append(original_line)  # Keep original indentation

            elif current_section == 'parameters':
                # Enhanced parameter parsing to handle multiple formats
                param_match_google = re.match(r'^(\w+):\s*(.+)', line)
                param_match_sphinx = re.match(r'^:param\s+(\w+):\s*(.+)', line)
                param_match_numpy = re.match(r'^(\w+)\s*:\s*(.+)', line)
                # New: Handle pyclesperanto-style inline parameters (param_name: type description)
                # NOTE(review): the Google pattern above also matches this shape
                # and is tested first, so the inline branch rarely (if ever) wins.
                param_match_inline = re.match(r'^(\w+):\s*(\w+(?:\[.*?\])?|\w+(?:\s*\|\s*\w+)*)\s+(.+)', line)
                # New: Handle parameters that start with bullet points or dashes
                param_match_bullet = re.match(r'^[-•*]\s*(\w+):\s*(.+)', line)

                if param_match_google or param_match_sphinx or param_match_numpy or param_match_inline or param_match_bullet:
                    _finalize_current_param()

                    if param_match_google:
                        param_name, param_desc = param_match_google.groups()
                    elif param_match_sphinx:
                        param_name, param_desc = param_match_sphinx.groups()
                    elif param_match_numpy:
                        param_name, param_desc = param_match_numpy.groups()
                    elif param_match_inline:
                        param_name, param_type, param_desc = param_match_inline.groups()
                        param_desc = f"{param_type} - {param_desc}"  # Include type in description
                    elif param_match_bullet:
                        param_name, param_desc = param_match_bullet.groups()

                    current_param = param_name
                    current_param_lines = [param_desc.strip()]
                elif current_param and (original_line.startswith(' ') or original_line.startswith('\t')):
                    # Indented continuation line
                    current_param_lines.append(line)
                elif not line:
                    # Blank line terminates the current parameter description.
                    _finalize_current_param()
                    current_param = None
                    current_param_lines = []
                elif current_param:
                    # Non-indented continuation line (part of the same block)
                    current_param_lines.append(line)
                else:
                    # Try to parse inline parameter definitions in a single block.
                    # This handles cases where parameters are listed without clear separation.
                    inline_params = DocstringExtractor._parse_inline_parameters(line)
                    for param_name, param_desc in inline_params.items():
                        parameters[param_name] = param_desc

            elif current_section == 'returns':
                if returns is None:
                    returns = line
                else:
                    returns += '\n' + line

            elif current_section == 'examples':
                if examples is None:
                    examples = line
                else:
                    examples += '\n' + line

        # Flush any parameter still being accumulated when the docstring ends.
        _finalize_current_param()

        description = '\n'.join(description_lines).strip()
        # Avoid duplicating a one-line docstring as both summary and description.
        if description == summary:
            description = None

        return DocstringInfo(
            summary=summary,
            description=description,
            parameters=parameters or {},
            returns=returns,
            examples=examples
        )

    @staticmethod
    def _parse_inline_parameters(line: str) -> Dict[str, str]:
        """Parse parameters from a single line containing multiple parameter definitions.

        Handles formats like:
        - "input_image: Image Input image to process. footprint: Image Structuring element..."
        - "param1: type1 description1. param2: type2 description2."
        """
        parameters = {}

        import re

        # Strategy: Use a flexible pattern that works with the pyclesperanto format.
        # Pattern matches: param_name: everything up to the next param_name: or end of string.
        param_pattern = r'(\w+):\s*([^:]*?)(?=\s+\w+:|$)'
        matches = re.findall(param_pattern, line)

        for param_name, param_desc in matches:
            if param_desc.strip():
                # Clean up the description (remove trailing periods, extra whitespace)
                clean_desc = param_desc.strip().rstrip('.')
                parameters[param_name] = clean_desc

        return parameters

309 

310 

311class SignatureAnalyzer: 

312 """Universal analyzer for extracting parameter information from any target.""" 

313 

314 # Class-level cache for field documentation to avoid re-parsing 

315 _field_docs_cache = {} 

316 

317 @staticmethod 

318 def analyze(target: Union[Callable, Type, object], skip_first_param: Optional[bool] = None) -> Dict[str, ParameterInfo]: 

319 """Extract parameter information from any target: function, constructor, dataclass, or instance. 

320 

321 Args: 

322 target: Function, constructor, dataclass type, or dataclass instance 

323 skip_first_param: Whether to skip the first parameter (after self/cls). 

324 If None, auto-detects based on context: 

325 - False for step constructors (all params are configuration) 

326 - True for image processing functions (first param is image data) 

327 

328 Returns: 

329 Dict mapping parameter names to ParameterInfo 

330 """ 

331 if not target: 

332 return {} 

333 

334 # Dispatch based on target type 

335 if inspect.isclass(target): 

336 if dataclasses.is_dataclass(target): 

337 return SignatureAnalyzer._analyze_dataclass(target) 

338 else: 

339 # Try to analyze constructor 

340 return SignatureAnalyzer._analyze_callable(target.__init__, skip_first_param) 

341 elif dataclasses.is_dataclass(target): 

342 # Instance of dataclass 

343 return SignatureAnalyzer._analyze_dataclass_instance(target) 

344 else: 

345 # Function, method, or other callable 

346 return SignatureAnalyzer._analyze_callable(target, skip_first_param) 

347 

348 @staticmethod 

349 def _analyze_callable(callable_obj: Callable, skip_first_param: Optional[bool] = None) -> Dict[str, ParameterInfo]: 

350 """Extract parameter information from callable signature. 

351 

352 Args: 

353 callable_obj: The callable to analyze 

354 skip_first_param: Whether to skip the first parameter (after self/cls). 

355 If None, auto-detects based on context. 

356 """ 

357 sig = inspect.signature(callable_obj) 

358 # Build comprehensive namespace for forward reference resolution 

359 # Start with function's globals (which contain the actual types), then add our modules as fallback 

360 globalns = { 

361 **vars(lazy_module), 

362 **vars(config_module), 

363 **getattr(callable_obj, '__globals__', {}) 

364 } 

365 

366 # For OpenHCS functions, prioritize the function's actual module globals 

367 if hasattr(callable_obj, '__module__') and callable_obj.__module__: 

368 try: 

369 import sys 

370 actual_module = sys.modules.get(callable_obj.__module__) 

371 if actual_module: 

372 # Function's module globals should take precedence for type resolution 

373 globalns = { 

374 **vars(lazy_module), 

375 **vars(config_module), 

376 **vars(actual_module) # This overwrites with the actual module types 

377 } 

378 except Exception: 

379 pass # Fall back to original globalns 

380 

381 try: 

382 type_hints = get_type_hints(callable_obj, globalns=globalns) 

383 except (NameError, AttributeError) as e: 

384 # If type hint resolution fails, try with just the function's original globals 

385 try: 

386 type_hints = get_type_hints(callable_obj, globalns=getattr(callable_obj, '__globals__', {})) 

387 except: 

388 # If that still fails, return empty type hints 

389 type_hints = {} 

390 except Exception: 

391 # For any other type hint resolution errors, return empty type hints 

392 type_hints = {} 

393 

394 

395 

396 # Extract docstring information (with fallback for robustness) 

397 try: 

398 docstring_info = DocstringExtractor.extract(callable_obj) 

399 except: 

400 docstring_info = None 

401 

402 if not docstring_info: 

403 docstring_info = DocstringInfo() 

404 

405 parameters = {} 

406 param_list = list(sig.parameters.items()) 

407 

408 # Determine skip behavior: explicit parameter overrides auto-detection 

409 should_skip_first_param = ( 

410 skip_first_param if skip_first_param is not None 

411 else SignatureAnalyzer._should_skip_first_parameter(callable_obj) 

412 ) 

413 

414 first_param_after_self_skipped = False 

415 

416 for i, (param_name, param) in enumerate(param_list): 

417 # Always skip self/cls 

418 if param_name in (CONSTANTS.SELF_PARAM, CONSTANTS.CLS_PARAM): 

419 continue 

420 

421 # Always skip dunder parameters (internal/reserved fields) 

422 if param_name.startswith(CONSTANTS.DUNDER_PREFIX) and param_name.endswith(CONSTANTS.DUNDER_SUFFIX): 

423 continue 

424 

425 # Skip first parameter for image processing functions only 

426 if should_skip_first_param and not first_param_after_self_skipped: 

427 first_param_after_self_skipped = True 

428 continue 

429 

430 # Handle **kwargs parameters - try to extract original function signature 

431 if param.kind == inspect.Parameter.VAR_KEYWORD: 

432 # Try to find the original function if this is a wrapper 

433 original_params = SignatureAnalyzer._extract_original_parameters(callable_obj) 

434 if original_params: 

435 parameters.update(original_params) 

436 continue 

437 

438 from typing import Any 

439 param_type = type_hints.get(param_name, Any) 

440 default_value = param.default if param.default != inspect.Parameter.empty else None 

441 is_required = param.default == inspect.Parameter.empty 

442 

443 

444 

445 # Get parameter description from docstring 

446 param_description = docstring_info.parameters.get(param_name) if docstring_info else None 

447 

448 parameters[param_name] = ParameterInfo( 

449 name=param_name, 

450 param_type=param_type, 

451 default_value=default_value, 

452 is_required=is_required, 

453 description=param_description 

454 ) 

455 

456 return parameters 

457 

458 @staticmethod 

459 def _should_skip_first_parameter(callable_obj: Callable) -> bool: 

460 """ 

461 Determine if the first parameter should be skipped for any callable. 

462 

463 Universal logic that works with any object: 

464 - Constructors (__init__ methods): don't skip (all params are configuration) 

465 - All other callables: skip first param (assume it's data being processed) 

466 """ 

467 # Check if this is any __init__ method (constructor) 

468 if (hasattr(callable_obj, '__qualname__') and 

469 callable_obj.__qualname__.endswith(CONSTANTS.INIT_METHOD_SUFFIX)): 

470 return False 

471 

472 # Everything else: skip first parameter 

473 return True 

474 

475 @staticmethod 

476 def _extract_original_parameters(callable_obj: Callable) -> Dict[str, ParameterInfo]: 

477 """ 

478 Extract parameters from the original function if this is a wrapper with **kwargs. 

479 

480 This handles cases where scikit-image or other auto-registered functions 

481 are wrapped with (image, **kwargs) signatures. 

482 """ 

483 try: 

484 # Check if this function has access to the original function 

485 # Common patterns: __wrapped__, closure variables, etc. 

486 

487 # Pattern 1: Check if it's a functools.wraps wrapper 

488 if hasattr(callable_obj, '__wrapped__'): 

489 return SignatureAnalyzer._analyze_callable(callable_obj.__wrapped__) 

490 

491 # Pattern 2: Check closure for original function reference 

492 if hasattr(callable_obj, '__closure__') and callable_obj.__closure__: 

493 for cell in callable_obj.__closure__: 

494 if hasattr(cell.cell_contents, '__call__'): 

495 # Found a callable in closure - might be the original function 

496 try: 

497 orig_sig = inspect.signature(cell.cell_contents) 

498 # Skip if it also has **kwargs (avoid infinite recursion) 

499 if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in orig_sig.parameters.values()): 

500 continue 

501 return SignatureAnalyzer._analyze_callable(cell.cell_contents) 

502 except: 

503 continue 

504 

505 # Pattern 3: Try to extract from function name and module 

506 # This is a fallback for scikit-image functions 

507 if hasattr(callable_obj, '__name__') and hasattr(callable_obj, '__module__'): 

508 func_name = callable_obj.__name__ 

509 module_name = callable_obj.__module__ 

510 

511 # Try to find the original function in scikit-image 

512 if 'skimage' in module_name: 

513 try: 

514 import importlib 

515 # Extract the actual module path (remove wrapper module parts) 

516 if 'scikit_image_registry' in module_name: 

517 # This is our wrapper, try to find the original in skimage 

518 for skimage_module in ['skimage.filters', 'skimage.morphology', 

519 'skimage.segmentation', 'skimage.feature', 

520 'skimage.measure', 'skimage.transform', 

521 'skimage.restoration', 'skimage.exposure']: 

522 try: 

523 mod = importlib.import_module(skimage_module) 

524 if hasattr(mod, func_name): 

525 orig_func = getattr(mod, func_name) 

526 return SignatureAnalyzer._analyze_callable(orig_func) 

527 except: 

528 continue 

529 except: 

530 pass 

531 

532 return {} 

533 

534 except Exception: 

535 return {} 

536 

537 @staticmethod 

538 def _analyze_dataclass(dataclass_type: type) -> Dict[str, ParameterInfo]: 

539 """Extract parameter information from dataclass fields.""" 

540 try: 

541 type_hints = get_type_hints(dataclass_type) 

542 

543 # Extract docstring information from dataclass 

544 docstring_info = DocstringExtractor.extract(dataclass_type) 

545 

546 # Extract inline field documentation using AST 

547 inline_docs = SignatureAnalyzer._extract_inline_field_docs(dataclass_type) 

548 

549 # ENHANCEMENT: For dataclasses modified by decorators (like GlobalPipelineConfig), 

550 # also extract field documentation from the field types themselves 

551 field_type_docs = SignatureAnalyzer._extract_field_type_docs(dataclass_type) 

552 

553 parameters = {} 

554 

555 for field in dataclasses.fields(dataclass_type): 

556 param_type = type_hints.get(field.name, str) 

557 

558 # Get default value 

559 if field.default != dataclasses.MISSING: 

560 default_value = field.default 

561 is_required = False 

562 elif field.default_factory != dataclasses.MISSING: 

563 default_value = field.default_factory() 

564 is_required = False 

565 else: 

566 default_value = None 

567 is_required = True 

568 

569 # Get field description from multiple sources (priority order) 

570 field_description = None 

571 

572 # 1. Field metadata (highest priority) 

573 if hasattr(field, 'metadata') and 'description' in field.metadata: 

574 field_description = field.metadata['description'] 

575 # 2. Inline documentation strings (from AST parsing) 

576 elif field.name in inline_docs: 

577 field_description = inline_docs[field.name] 

578 # 3. Field type documentation (for decorator-modified classes) 

579 elif field.name in field_type_docs: 

580 field_description = field_type_docs[field.name] 

581 # 4. Docstring parameters (fallback) 

582 elif docstring_info.parameters and field.name in docstring_info.parameters: 

583 field_description = docstring_info.parameters.get(field.name) 

584 # 5. CRITICAL FIX: Use inheritance-aware field documentation extraction 

585 else: 

586 field_description = SignatureAnalyzer.extract_field_documentation(dataclass_type, field.name) 

587 

588 parameters[field.name] = ParameterInfo( 

589 name=field.name, 

590 param_type=param_type, 

591 default_value=default_value, 

592 is_required=is_required, 

593 description=field_description 

594 ) 

595 

596 return parameters 

597 

598 except Exception: 

599 # Return empty dict on error 

600 return {} 

601 

    @staticmethod
    def _extract_inline_field_docs(dataclass_type: type) -> Dict[str, str]:
        """Extract inline field documentation strings using AST parsing.

        This handles multiple patterns used for field documentation:

        Pattern 1 - Next line string literal:
            @dataclass
            class Config:
                field_name: str = "default"
                '''Field description here.'''

        Pattern 2 - Same line string literal (less common):
            @dataclass
            class Config:
                field_name: str = "default"  # '''Field description'''

        Pattern 3 - Traditional docstring parameters (handled by DocstringExtractor):
            @dataclass
            class Config:
                '''
                Args:
                    field_name: Field description here.
                '''
                field_name: str = "default"

        Returns a mapping of field name -> description; empty on any failure.
        """
        try:
            import ast
            import re

            # Try to get source code - handle cases where it might not be available
            source = None
            try:
                source = inspect.getsource(dataclass_type)
            except (OSError, TypeError):
                # ENHANCEMENT: For decorator-modified classes, try multiple source file strategies
                try:
                    # Strategy 1: Try the file where the class is currently defined
                    source_file = inspect.getfile(dataclass_type)
                    with open(source_file, 'r', encoding='utf-8') as f:
                        file_content = f.read()
                    source = SignatureAnalyzer._extract_class_source_from_file(file_content, dataclass_type.__name__)

                    # Strategy 2: If that fails, try to find the original source file.
                    # This handles decorator-modified classes where inspect.getfile()
                    # returns the wrong file.
                    if not source:
                        try:
                            import os
                            source_dir = os.path.dirname(source_file)

                            # Try common source files in the same directory
                            candidate_files = []

                            # If the current file is lazy_config.py, try config.py
                            if source_file.endswith('lazy_config.py'):
                                candidate_files.append(os.path.join(source_dir, 'config.py'))

                            # Try other common patterns (every sibling .py file)
                            for filename in os.listdir(source_dir):
                                if filename.endswith('.py') and filename != os.path.basename(source_file):
                                    candidate_files.append(os.path.join(source_dir, filename))

                            # Try each candidate file
                            for candidate_file in candidate_files:
                                if os.path.exists(candidate_file):
                                    with open(candidate_file, 'r', encoding='utf-8') as f:
                                        candidate_content = f.read()
                                    source = SignatureAnalyzer._extract_class_source_from_file(candidate_content, dataclass_type.__name__)
                                    if source:  # Found it!
                                        break
                        except Exception:
                            pass
                except Exception:
                    pass

            if not source:
                return {}

            tree = ast.parse(source)

            # Find the class definition - be more flexible with class name matching
            class_node = None
            target_class_name = dataclass_type.__name__

            # Handle cases where the class might have been renamed or modified
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    # Try exact match first
                    if node.name == target_class_name:
                        class_node = node
                        break
                    # Also try without common prefixes/suffixes that decorators might add
                    base_name = target_class_name.replace('Lazy', '').replace('Config', '')
                    node_base_name = node.name.replace('Lazy', '').replace('Config', '')
                    if base_name and node_base_name and base_name == node_base_name:
                        class_node = node
                        break

            if not class_node:
                return {}

            field_docs = {}
            source_lines = source.split('\n')

            # Method 1: Look for field assignments followed by string literals (next line)
            for i, node in enumerate(class_node.body):
                if isinstance(node, ast.AnnAssign) and hasattr(node.target, 'id'):
                    field_name = node.target.id

                    # Check if the next node is a string literal (documentation)
                    if i + 1 < len(class_node.body):
                        next_node = class_node.body[i + 1]
                        if isinstance(next_node, ast.Expr):
                            # Handle both ast.Constant (Python 3.8+) and ast.Str (older versions)
                            if isinstance(next_node.value, ast.Constant) and isinstance(next_node.value.value, str):
                                field_docs[field_name] = next_node.value.value.strip()
                                continue
                            elif hasattr(ast, 'Str') and isinstance(next_node.value, ast.Str):
                                field_docs[field_name] = next_node.value.s.strip()
                                continue

                    # Method 2: Check for inline comments on the same line.
                    # Get the line number of the field definition
                    field_line_num = node.lineno - 1  # Convert to 0-based indexing
                    if 0 <= field_line_num < len(source_lines):
                        line = source_lines[field_line_num]

                        # Look for string literals in comments on the same line
                        # Pattern: field: type = value  # "Documentation"
                        comment_match = re.search(r'#\s*["\']([^"\']+)["\']', line)
                        if comment_match:
                            field_docs[field_name] = comment_match.group(1).strip()
                            continue

                        # Look for triple-quoted strings on the same line
                        # Pattern: field: type = value  """Documentation"""
                        triple_quote_match = re.search(r'"""([^"]+)"""|\'\'\'([^\']+)\'\'\'', line)
                        if triple_quote_match:
                            doc_text = triple_quote_match.group(1) or triple_quote_match.group(2)
                            field_docs[field_name] = doc_text.strip()

            return field_docs

        except Exception as e:
            # Return empty dict if AST parsing fails.
            # Could add logging here for debugging: logger.debug(f"AST parsing failed: {e}")
            return {}

749 

    @staticmethod
    def _extract_field_type_docs(dataclass_type: type) -> Dict[str, str]:
        """Extract field documentation from field types for decorator-modified dataclasses.

        This handles cases where dataclasses have been modified by decorators (like @auto_create_decorator)
        that inject fields from other dataclasses. In such cases, the AST parsing of the main class
        won't find documentation for the injected fields, so we need to extract documentation from
        the field types themselves.

        For example, GlobalPipelineConfig has injected fields like 'path_planning_config' of type
        PathPlanningConfig. We extract the class docstring from PathPlanningConfig to use as the
        field description.
        """
        try:
            import dataclasses

            field_type_docs = {}

            # Get all dataclass fields
            if not dataclasses.is_dataclass(dataclass_type):
                return {}

            fields = dataclasses.fields(dataclass_type)

            for field in fields:
                # Check if this field's type is a dataclass.
                # NOTE(review): field.type may be a *string* under PEP 563 /
                # string annotations; this code assumes a real type object -
                # confirm upstream usage.
                field_type = field.type

                # Handle Optional types
                if hasattr(field_type, '__origin__') and field_type.__origin__ is Union:
                    # Extract the non-None type from Optional[T]
                    args = field_type.__args__
                    non_none_types = [arg for arg in args if arg is not type(None)]
                    if len(non_none_types) == 1:
                        field_type = non_none_types[0]

                # If the field type is a dataclass, extract its docstring as field documentation
                if dataclasses.is_dataclass(field_type):
                    # ENHANCEMENT: Resolve lazy dataclasses to their base classes for documentation
                    resolved_field_type = SignatureAnalyzer._resolve_lazy_dataclass_for_docs(field_type)

                    docstring_info = DocstringExtractor.extract(resolved_field_type)
                    if docstring_info.summary:
                        field_type_docs[field.name] = docstring_info.summary
                    elif docstring_info.description:
                        # Use first line of description if no summary
                        first_line = docstring_info.description.split('\n')[0].strip()
                        if first_line:
                            field_type_docs[field.name] = first_line

            return field_type_docs

        except Exception as e:
            # Return empty dict if extraction fails (best-effort by design)
            return {}

805 

806 @staticmethod 

807 def _extract_class_source_from_file(file_content: str, class_name: str) -> Optional[str]: 

808 """Extract the source code for a specific class from a file. 

809 

810 This method is used when inspect.getsource() fails (e.g., for decorator-modified classes) 

811 to extract the class definition directly from the source file. 

812 

813 Args: 

814 file_content: The content of the source file 

815 class_name: The name of the class to extract 

816 

817 Returns: 

818 The source code for the class, or None if not found 

819 """ 

820 try: 

821 lines = file_content.split('\n') 

822 class_lines = [] 

823 in_class = False 

824 class_indent = 0 

825 

826 for line in lines: 

827 # Look for the class definition 

828 if line.strip().startswith(f'class {class_name}'): 

829 in_class = True 

830 class_indent = len(line) - len(line.lstrip()) 

831 class_lines.append(line) 

832 elif in_class: 

833 # Check if we've reached the end of the class 

834 if line.strip() and not line.startswith(' ') and not line.startswith('\t'): 

835 # Non-indented line that's not empty - end of class 

836 break 

837 elif line.strip() and len(line) - len(line.lstrip()) <= class_indent: 

838 # Line at same or less indentation than class - end of class 

839 break 

840 else: 

841 # Still inside the class 

842 class_lines.append(line) 

843 

844 if class_lines: 

845 return '\n'.join(class_lines) 

846 return None 

847 

848 except Exception: 

849 return None 

850 

851 @staticmethod 

852 def extract_field_documentation(dataclass_type: type, field_name: str) -> Optional[str]: 

853 """Extract documentation for a specific field from a dataclass. 

854 

855 This method tries multiple approaches to find documentation for a specific field: 

856 1. Inline field documentation (AST parsing) 

857 2. Field type documentation (for nested dataclasses) 

858 3. Docstring parameters 

859 4. Field metadata 

860 

861 Args: 

862 dataclass_type: The dataclass type containing the field 

863 field_name: Name of the field to get documentation for 

864 

865 Returns: 

866 Field documentation string, or None if not found 

867 """ 

868 try: 

869 import dataclasses 

870 

871 if not dataclasses.is_dataclass(dataclass_type): 

872 return None 

873 

874 # ENHANCEMENT: Resolve lazy dataclasses to their base classes 

875 # PipelineConfig should resolve to GlobalPipelineConfig for documentation 

876 resolved_type = SignatureAnalyzer._resolve_lazy_dataclass_for_docs(dataclass_type) 

877 

878 # Check cache first for performance 

879 cache_key = (resolved_type.__name__, resolved_type.__module__) 

880 if cache_key not in SignatureAnalyzer._field_docs_cache: 

881 # Extract all field documentation for this dataclass and cache it 

882 SignatureAnalyzer._field_docs_cache[cache_key] = SignatureAnalyzer._extract_all_field_docs(resolved_type) 

883 

884 cached_docs = SignatureAnalyzer._field_docs_cache[cache_key] 

885 if field_name in cached_docs: 

886 return cached_docs[field_name] 

887 

888 return None 

889 

890 except Exception: 

891 return None 

892 

893 @staticmethod 

894 def _resolve_lazy_dataclass_for_docs(dataclass_type: type) -> type: 

895 """Resolve lazy dataclasses to their base classes for documentation extraction. 

896 

897 This handles the case where PipelineConfig (lazy) should resolve to GlobalPipelineConfig 

898 for documentation purposes. 

899 

900 Args: 

901 dataclass_type: The dataclass type (potentially lazy) 

902 

903 Returns: 

904 The resolved dataclass type for documentation extraction 

905 """ 

906 try: 

907 # Check if this is a lazy dataclass by looking for common patterns 

908 class_name = dataclass_type.__name__ 

909 

910 # Handle PipelineConfig -> GlobalPipelineConfig 

911 if class_name == 'PipelineConfig': 

912 try: 

913 from openhcs.core.config import GlobalPipelineConfig 

914 return GlobalPipelineConfig 

915 except ImportError: 

916 pass 

917 

918 # Handle LazyXxxConfig -> XxxConfig mappings 

919 if class_name.startswith('Lazy') and class_name.endswith('Config'): 

920 try: 

921 # Remove 'Lazy' prefix: LazyWellFilterConfig -> WellFilterConfig 

922 base_class_name = class_name[4:] # Remove 'Lazy' 

923 

924 # Try to import from openhcs.core.config 

925 from openhcs.core import config as config_module 

926 if hasattr(config_module, base_class_name): 

927 return getattr(config_module, base_class_name) 

928 except (ImportError, AttributeError): 

929 pass 

930 

931 # For other lazy dataclasses, try to find the Global version 

932 if not class_name.startswith('Global') and class_name.endswith('Config'): 

933 try: 

934 # Try to find GlobalXxxConfig version 

935 global_class_name = f'Global{class_name}' 

936 module = __import__(dataclass_type.__module__, fromlist=[global_class_name]) 

937 if hasattr(module, global_class_name): 

938 return getattr(module, global_class_name) 

939 except (ImportError, AttributeError): 

940 pass 

941 

942 # If no resolution found, return the original type 

943 return dataclass_type 

944 

945 except Exception: 

946 return dataclass_type 

947 

948 @staticmethod 

949 def _extract_all_field_docs(dataclass_type: type) -> Dict[str, str]: 

950 """Extract all field documentation for a dataclass and return as a dictionary. 

951 

952 This method combines all documentation extraction approaches and caches the results. 

953 

954 Args: 

955 dataclass_type: The dataclass type to extract documentation from 

956 

957 Returns: 

958 Dictionary mapping field names to their documentation 

959 """ 

960 all_docs = {} 

961 

962 try: 

963 import dataclasses 

964 

965 # Try inline field documentation first 

966 inline_docs = SignatureAnalyzer._extract_inline_field_docs(dataclass_type) 

967 all_docs.update(inline_docs) 

968 

969 # Try field type documentation (for nested dataclasses) 

970 field_type_docs = SignatureAnalyzer._extract_field_type_docs(dataclass_type) 

971 for field_name, doc in field_type_docs.items(): 

972 if field_name not in all_docs: # Don't overwrite inline docs 

973 all_docs[field_name] = doc 

974 

975 # Try docstring parameters 

976 docstring_info = DocstringExtractor.extract(dataclass_type) 

977 if docstring_info.parameters: 

978 for field_name, doc in docstring_info.parameters.items(): 

979 if field_name not in all_docs: # Don't overwrite previous docs 

980 all_docs[field_name] = doc 

981 

982 # Try field metadata 

983 fields = dataclasses.fields(dataclass_type) 

984 for field in fields: 

985 if field.name not in all_docs: # Don't overwrite previous docs 

986 if hasattr(field, 'metadata') and 'description' in field.metadata: 

987 all_docs[field.name] = field.metadata['description'] 

988 

989 # ENHANCEMENT: Try inheritance - check parent classes for missing field documentation 

990 for field in fields: 

991 if field.name not in all_docs: # Only for fields still missing documentation 

992 # Walk up the inheritance chain 

993 for base_class in dataclass_type.__mro__[1:]: # Skip the class itself 

994 if base_class == object: 

995 continue 

996 if dataclasses.is_dataclass(base_class): 

997 # Check if this base class has the field with documentation 

998 try: 

999 base_fields = dataclasses.fields(base_class) 

1000 base_field_names = [f.name for f in base_fields] 

1001 if field.name in base_field_names: 

1002 # Try to get documentation from the base class 

1003 inherited_doc = SignatureAnalyzer.extract_field_documentation(base_class, field.name) 

1004 if inherited_doc: 

1005 all_docs[field.name] = inherited_doc 

1006 break # Found documentation, stop looking 

1007 except Exception: 

1008 continue # Try next base class 

1009 

1010 except Exception: 

1011 pass # Return whatever we managed to extract 

1012 

1013 return all_docs 

1014 

1015 @staticmethod 

1016 def extract_field_documentation_from_context(field_name: str, context_types: list[type]) -> Optional[str]: 

1017 """Extract field documentation by searching through multiple dataclass types. 

1018 

1019 This method is useful when you don't know exactly which dataclass contains 

1020 a field, but you have a list of candidate types to search through. 

1021 

1022 Args: 

1023 field_name: Name of the field to get documentation for 

1024 context_types: List of dataclass types to search through 

1025 

1026 Returns: 

1027 Field documentation string, or None if not found 

1028 """ 

1029 for dataclass_type in context_types: 

1030 if dataclass_type: 

1031 doc = SignatureAnalyzer.extract_field_documentation(dataclass_type, field_name) 

1032 if doc: 

1033 return doc 

1034 return None 

1035 

1036 @staticmethod 

1037 def _analyze_dataclass_instance(instance: object) -> Dict[str, ParameterInfo]: 

1038 """Extract parameter information from a dataclass instance.""" 

1039 try: 

1040 # Get the type and analyze it 

1041 dataclass_type = type(instance) 

1042 parameters = SignatureAnalyzer._analyze_dataclass(dataclass_type) 

1043 

1044 # Update default values with current instance values 

1045 # For lazy dataclasses, use object.__getattribute__ to preserve None values for placeholders 

1046 for name, param_info in parameters.items(): 

1047 if hasattr(instance, name): 

1048 # Check if this is a lazy dataclass that should preserve None values 

1049 if hasattr(instance, '_resolve_field_value'): 

1050 # This is a lazy dataclass - use object.__getattribute__ to get stored value 

1051 current_value = object.__getattribute__(instance, name) 

1052 else: 

1053 # Regular dataclass - use normal getattr 

1054 current_value = getattr(instance, name) 

1055 

1056 # Create new ParameterInfo with current value as default 

1057 parameters[name] = ParameterInfo( 

1058 name=param_info.name, 

1059 param_type=param_info.param_type, 

1060 default_value=current_value, 

1061 is_required=param_info.is_required, 

1062 description=param_info.description 

1063 ) 

1064 

1065 return parameters 

1066 

1067 except Exception: 

1068 return {} 

1069 

1070 # Duplicate method removed - using the fixed version above