Coverage for openhcs/core/pipeline/funcstep_contract_validator.py: 56.8%

183 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2FuncStep memory contract validator for OpenHCS. 

3 

4This module provides the FuncStepContractValidator class, which is responsible for 

5validating memory type declarations for FunctionStep instances in a pipeline. 

6 

7Doctrinal Clauses: 

8- Clause 65 — No Fallback Logic 

9- Clause 88 — No Inferred Capabilities 

10- Clause 101 — Memory Type Declaration 

11- Clause 106-A — Declared Memory Types 

12- Clause 308 — Named Positional Enforcement 

13""" 

14 

15import inspect 

16import logging 

17from typing import Any, Callable, Dict, List, Optional, Tuple 

18 

19from openhcs.constants.constants import VALID_MEMORY_TYPES, get_openhcs_config 

20from openhcs.core.steps.function_step import FunctionStep 

21 

22from openhcs.core.components.validation import GenericValidator 

23 

24logger = logging.getLogger(__name__) 

25 

26# ===== DECLARATIVE DEFAULT VALUES ===== 

27# These declarations control defaults and may be moved to configuration in the future 

28 

29# Simple, direct error messages 

def missing_memory_type_error(func_name: str, step_name: str) -> str:
    """Build the error text for a function lacking memory type declarations.

    Args:
        func_name: Name of the offending function; interpolated into both the
            headline and the example snippets.
        step_name: Name of the step that contains the function.

    Returns:
        A multi-line message: a one-line diagnosis followed by a WRONG/CORRECT
        usage example pointing at the OpenHCS function registry.
    """
    # NOTE(review): whitespace inside these literals is reproduced exactly as it
    # appears in the reviewed text, which may have collapsed runs of spaces
    # (e.g. the single leading space on the example lines) - confirm against
    # the repository copy.
    # Only the pieces that interpolate a value carry the f-prefix.
    return (
        f"Function '{func_name}' in step '{step_name}' needs memory type decorator (@numpy, @cupy, @torch, etc.)\n"
        "\n"
        "💡 SOLUTION: Use OpenHCS registry functions instead of raw external library functions:\n"
        "\n"
        "❌ WRONG:\n"
        " import pyclesperanto as cle\n"
        f" step = FunctionStep(func=cle.{func_name}, name='{step_name}')\n"
        "\n"
        "✅ CORRECT:\n"
        " from openhcs.processing.func_registry import get_function_by_name\n"
        f" {func_name}_func = get_function_by_name('{func_name}', 'pyclesperanto') # or 'numpy', 'cupy'\n"
        f" step = FunctionStep(func={func_name}_func, name='{step_name}')\n"
        "\n"
        "📋 Available functions: Use get_all_function_names('pyclesperanto') to see all options"
    )

47 

def inconsistent_memory_types_error(step_name: str, func1: Any, func2: Any) -> str:
    """Build the error text for two entries of one step whose memory types differ.

    Args:
        step_name: Name of the step containing the conflicting functions.
        func1: First conflicting entry; formatted with ``str()``.
        func2: Second conflicting entry; formatted with ``str()``.

    Returns:
        The single-line error message.
    """
    return f"Functions in step '{step_name}' have different memory types: {func1} vs {func2}"

50 

def invalid_memory_type_error(func_name: str, input_type: Any, output_type: Any, valid_types: Any) -> str:
    """Build the error text for a function declaring an unknown memory type.

    Args:
        func_name: Name of the offending function.
        input_type: The declared input memory type.
        output_type: The declared output memory type.
        valid_types: Description of the accepted memory types; interpolated
            as-is, so callers should pass it already formatted.

    Returns:
        The single-line error message.
    """
    return f"Function '{func_name}' has invalid memory types: {input_type}/{output_type}. Valid: {valid_types}"

53 

def invalid_function_error(location: str, func: Any) -> str:
    """Build the error text for a non-callable entry found inside a pattern.

    Args:
        location: Human-readable position of the entry within the pattern.
        func: The offending object; formatted with ``str()``.

    Returns:
        The single-line error message.
    """
    return f"Invalid function in {location}: {func}"

56 

def invalid_pattern_error(pattern: Any) -> str:
    """Build the error text for a function pattern of an unsupported shape.

    Args:
        pattern: The rejected pattern object; formatted with ``str()``.

    Returns:
        The single-line error message.
    """
    return f"Invalid function pattern: {pattern}"

59 

def missing_required_args_error(func_name: str, step_name: str, missing_args: Any) -> str:
    """Build the error text for required arguments absent from a kwargs dict.

    Args:
        func_name: Name of the function whose arguments are missing.
        step_name: Name of the step containing the function.
        missing_args: The missing argument names; formatted with ``str()``.

    Returns:
        The single-line error message.
    """
    return f"Function '{func_name}' in step '{step_name}' missing required args: {missing_args}"

62 

def complex_pattern_error(step_name: str) -> str:
    """Build the error text for a special-decorator step using a non-simple pattern.

    Args:
        step_name: Name of the offending step.

    Returns:
        The single-line error message.
    """
    return f"Step '{step_name}' with special decorators must use simple function pattern"

65 

class FuncStepContractValidator:
    """
    Validator for FunctionStep memory type contracts.

    This validator enforces Clause 101 (Memory Type Declaration), Clause 88
    (No Inferred Capabilities), and Clause 308 (Named Positional Enforcement)
    by requiring explicit memory type declarations for every function used by
    a FunctionStep.

    Key principles:
    1. Every function in a FunctionStep must declare valid memory types
    2. A step's memory types are the first function's input type and the
       last function's output type
    3. Memory types must be validated at plan time, not runtime
    4. No fallback or inference of memory types is allowed
    5. All function patterns (callable, tuple, list, dict) are supported
    """

    @staticmethod
    def validate_pipeline(steps: List[Any], pipeline_context: Optional[Any] = None, orchestrator=None) -> Dict[int, Dict[str, Any]]:
        """
        Validate memory type contracts and function patterns for all FunctionStep instances in a pipeline.

        This validator must run after the materialization and path planners.
        When a pipeline_context is given, its step plans are checked for the
        fields the materialization planner writes. Independently of that, every
        FunctionStep is checked for the attributes the path planner sets.

        Args:
            steps: The steps in the pipeline
            pipeline_context: Optional context object exposing ``step_plans``
            orchestrator: Optional orchestrator for dict pattern key validation

        Returns:
            Dictionary mapping each FunctionStep's index in ``steps`` to the
            dictionary returned by validate_funcstep. Steps that are not
            FunctionStep instances are skipped and get no entry.

        Raises:
            ValueError: If any FunctionStep violates memory type contracts or dict pattern validation
            AssertionError: If required planners have not run before this validator
        """
        if not steps:
            logger.warning("No steps provided to FuncStepContractValidator")
            return {}

        # Verify that required planners have run before this validator
        if pipeline_context is not None:
            if not pipeline_context.step_plans:
                raise AssertionError(
                    "Clause 101 Violation: Step plans must be initialized before FuncStepContractValidator."
                )

            # Only the first step plan is sampled for the materialization
            # planner's read_backend/write_backend fields.
            sample_step_index = next(iter(pipeline_context.step_plans.keys()))
            sample_plan = pipeline_context.step_plans[sample_step_index]
            if 'read_backend' not in sample_plan or 'write_backend' not in sample_plan:
                raise AssertionError(
                    "Clause 101 Violation: Materialization planner must run before FuncStepContractValidator. "
                    "Step plans missing read_backend/write_backend fields."
                )
        else:
            logger.warning(
                "No pipeline_context provided to FuncStepContractValidator. "
                "Cannot verify planner execution order. Falling back to attribute checks."
            )

        step_memory_types = {}

        for i, step in enumerate(steps):
            # Only validate FunctionStep instances
            if not isinstance(step, FunctionStep):
                continue

            # The path planner is expected to have set these attributes; this
            # check runs whether or not a pipeline_context was supplied.
            try:
                _ = step.__input_dir__
                _ = step.__output_dir__
            except AttributeError as e:
                raise AssertionError(
                    f"Clause 101 Violation: Required planners must run before FuncStepContractValidator. "
                    f"Missing attribute: {e}. Path planner must run first."
                ) from e

            # Keyed by step index, not by a step id
            step_memory_types[i] = FuncStepContractValidator.validate_funcstep(step, orchestrator)

        return step_memory_types

    @staticmethod
    def validate_funcstep(step: "FunctionStep", orchestrator=None) -> Dict[str, Any]:
        """
        Validate memory type contracts, func_pattern structure, and dict pattern keys for a FunctionStep instance.

        Side effect: when ``step.group_by`` names a component that is also in
        ``step.variable_components``, ``step.group_by`` is reset to None (with
        a warning) before the generic validation runs.

        Args:
            step: The FunctionStep to validate
            orchestrator: Optional orchestrator for dict pattern key validation

        Returns:
            Dictionary with keys 'input_memory_type', 'output_memory_type' and
            'func' (the step's function pattern, unchanged)

        Raises:
            ValueError: If the FunctionStep violates memory type contracts, structural rules,
                or dict pattern key validation.
        """
        func_pattern = step.func
        step_name = step.name

        # 1. Structural check: extraction raises ValueError when func_pattern
        #    is invalid (e.g. None, or bad structure). The extracted callables
        #    themselves are not needed here.
        FuncStepContractValidator._extract_functions_from_pattern(func_pattern, step_name)

        # 2. Validate using generic validation system
        config = get_openhcs_config()
        validator = GenericValidator(config)

        # Check for constraint violation: group_by in variable_components
        component_values = [vc.value for vc in step.variable_components]
        if step.group_by and step.group_by.value in component_values:
            # Auto-resolve constraint violation by nullifying group_by
            logger.warning(
                f"Step '{step_name}': Auto-resolved group_by conflict. "
                f"Set group_by to None due to conflict with variable_components {component_values}. "
                f"Original group_by was {step.group_by.value}."
            )
            step.group_by = None

        # Validate step configuration after auto-resolution
        validation_result = validator.validate_step(
            step.variable_components, step.group_by, func_pattern, step_name
        )
        if not validation_result.is_valid:
            raise ValueError(validation_result.error_message)

        # Validate dict pattern keys if orchestrator is available
        if orchestrator is not None and isinstance(func_pattern, dict) and step.group_by is not None:
            dict_validation_result = validator.validate_dict_pattern_keys(
                func_pattern, step.group_by, step_name, orchestrator
            )
            if not dict_validation_result.is_valid:
                raise ValueError(dict_validation_result.error_message)

        # 3. Memory type validation using the original func_pattern
        input_type, output_type = FuncStepContractValidator.validate_function_pattern(
            func_pattern, step_name)

        return {
            'input_memory_type': input_type,
            'output_memory_type': output_type,
            'func': func_pattern  # Store the validated func for stateless execution
        }

    @staticmethod
    def validate_function_pattern(
        func: Any,
        step_name: str
    ) -> Tuple[str, str]:
        """
        Validate memory type contracts for a function pattern.

        Every function extracted from the pattern must expose
        ``input_memory_type`` and ``output_memory_type`` attributes whose
        values are members of VALID_MEMORY_TYPES.

        Args:
            func: The function pattern to validate
            step_name: The name of the step containing the function

        Returns:
            Tuple of (first function's input memory type,
            last function's output memory type)

        Raises:
            ValueError: If the pattern yields no functions, or any function is
                missing a declaration or declares an unknown memory type
        """
        functions = FuncStepContractValidator.validate_pattern_structure(func, step_name)

        if not functions:
            raise ValueError(f"No valid functions found in pattern for step {step_name}")

        first_input_type = None
        last_output_type = None
        for position, fn in enumerate(functions):
            # Callables such as functools.partial have no __name__; fall back
            # to repr so building the error message cannot itself fail.
            fn_name = getattr(fn, '__name__', repr(fn))

            # The function must carry explicit memory type declarations
            try:
                fn_input_type = fn.input_memory_type
                fn_output_type = fn.output_memory_type
            except AttributeError as exc:
                raise ValueError(missing_memory_type_error(fn_name, step_name)) from exc

            # Declared types must be known memory types
            if fn_input_type not in VALID_MEMORY_TYPES or fn_output_type not in VALID_MEMORY_TYPES:
                raise ValueError(invalid_memory_type_error(
                    fn_name, fn_input_type, fn_output_type, ", ".join(sorted(VALID_MEMORY_TYPES))
                ))

            if position == 0:
                first_input_type = fn_input_type
            last_output_type = fn_output_type

        return first_input_type, last_output_type

    @staticmethod
    def _validate_required_args(func: Callable, kwargs: Dict[str, Any], step_name: str) -> None:
        """
        Validate that all required positional arguments are provided in kwargs.

        This enforces Clause 308 (Named Positional Enforcement): every
        positional parameter without a default must appear as a key of the
        kwargs dict when using the (func, kwargs) pattern.

        Args:
            func: The function to validate
            kwargs: The kwargs dict to check
            step_name: The name of the step containing the function

        Raises:
            ValueError: If any required positional arguments are missing from kwargs
        """
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        # Required = positional kind, no default, and not supplied by name
        missing_args = [
            name
            for name, param in inspect.signature(func).parameters.items()
            if param.kind in positional_kinds
            and param.default is inspect.Parameter.empty
            and name not in kwargs
        ]

        if missing_args:
            func_name = getattr(func, '__name__', repr(func))
            raise ValueError(missing_required_args_error(func_name, step_name, missing_args))

    @staticmethod
    def _validate_dict_pattern_keys(
        func_pattern: dict,
        group_by,
        step_name: str,
        orchestrator
    ) -> None:
        """
        Validate that dict function pattern keys match available component keys.

        Keys are compared as strings first. A pattern key that has no exact
        string match is still accepted when both it and some available key
        parse as the same integer (so "1" matches "01" and vice versa).

        Args:
            func_pattern: Dict function pattern to validate
            group_by: GroupBy enum specifying component type
            step_name: Name of the step containing the function
            orchestrator: Orchestrator for component key access

        Raises:
            ValueError: If the component keys cannot be obtained, or if dict
                pattern keys don't match available component keys
        """
        # Get available component keys from orchestrator
        try:
            available_keys = list(orchestrator.get_component_keys(group_by))
            available_keys_set = {str(key) for key in available_keys}
        except Exception as e:
            raise ValueError(f"Failed to get component keys for {group_by.value}: {e}") from e

        # Direct string match first
        pattern_keys_set = {str(key) for key in func_pattern}
        missing_keys = pattern_keys_set - available_keys_set
        if not missing_keys:
            return

        # Integer view of the available keys; non-numeric keys are skipped
        available_as_int = set()
        for avail_key in available_keys_set:
            try:
                available_as_int.add(int(avail_key))
            except (ValueError, TypeError):
                continue

        # A missing key is rescued only if it is numeric and its integer value
        # matches the integer value of some available key.
        still_missing = set()
        for key in missing_keys:
            try:
                key_as_int = int(key)
            except (ValueError, TypeError):
                still_missing.add(key)
                continue
            if key_as_int not in available_as_int:
                still_missing.add(key)

        if still_missing:
            raise ValueError(
                f"Function pattern keys not found in available {group_by.value} components for step '{step_name}'. "
                f"Missing keys: {sorted(still_missing)}. "
                f"Available keys: {sorted(available_keys)}. "
                f"Function pattern keys must match component values from the plate data."
            )

    @staticmethod
    def validate_pattern_structure(
        func: Any,
        step_name: str
    ) -> List[Callable]:
        """
        Validate and extract all functions from a function pattern.

        This is a public wrapper for _extract_functions_from_pattern that provides
        a stable API for pattern structure validation.

        Supports nested patterns of arbitrary depth, including:
        - Direct callable
        - Tuple of (callable, kwargs)
        - List of callables or patterns
        - Dict of keyed callables or patterns

        Args:
            func: The function pattern to validate and extract functions from
            step_name: The name of the step or component containing the function

        Returns:
            List of functions in the pattern

        Raises:
            ValueError: If the function pattern is invalid
        """
        return FuncStepContractValidator._extract_functions_from_pattern(func, step_name)

    @staticmethod
    def _is_function_reference(obj):
        """Check if an object is a FunctionReference.

        Returns False when the compiler module cannot be imported.
        """
        try:
            from openhcs.core.pipeline.compiler import FunctionReference
        except ImportError:
            return False
        return isinstance(obj, FunctionReference)

    @staticmethod
    def _resolve_function_reference(func_or_ref):
        """Resolve a FunctionReference to an actual function, or return the original."""
        from openhcs.core.pipeline.compiler import FunctionReference
        if isinstance(func_or_ref, FunctionReference):
            return func_or_ref.resolve()
        return func_or_ref

    @staticmethod
    def _extract_functions_from_pattern(
        func: Any,
        step_name: str
    ) -> List[Callable]:
        """
        Extract all functions from a function pattern.

        Supports nested patterns of arbitrary depth, including:
        - Direct callable
        - FunctionReference objects
        - Tuple of (callable/FunctionReference, kwargs)
        - List of callables or patterns
        - Dict of keyed callables or patterns

        Classes are never treated as callables. An empty list or dict yields
        an empty result rather than an error.

        Args:
            func: The function pattern to extract functions from
            step_name: The name of the step containing the function

        Returns:
            List of functions in the pattern, in traversal order

        Raises:
            ValueError: If the function pattern is invalid
        """
        # Imported inside the function, as in the rest of this class
        # (presumably to avoid an import cycle with the compiler - TODO confirm).
        from openhcs.core.pipeline.compiler import FunctionReference

        def is_plain_callable(obj):
            # Classes are callable but are not accepted as step functions
            return callable(obj) and not isinstance(obj, type)

        def is_nested_pattern(obj):
            return isinstance(obj, (list, dict, tuple, FunctionReference)) or is_plain_callable(obj)

        # Case 1: Direct FunctionReference
        if isinstance(func, FunctionReference):
            return [func.resolve()]

        # Case 2: Direct callable
        if is_plain_callable(func):
            return [func]

        # Case 3: Tuple of (callable/FunctionReference, kwargs)
        if isinstance(func, tuple) and len(func) == 2 and isinstance(func[1], dict):
            resolved_first = FuncStepContractValidator._resolve_function_reference(func[0])
            if is_plain_callable(resolved_first):
                # The kwargs dict is not validated here; required-argument
                # handling is left to the execution logic.
                return [resolved_first]
            # Otherwise fall through; such a tuple ends at the final raise.

        functions = []

        # Case 4: List of patterns
        if isinstance(func, list):
            for i, f in enumerate(func):
                if not is_nested_pattern(f):
                    raise ValueError(invalid_function_error(f"list at index {i}", f))
                functions.extend(
                    FuncStepContractValidator._extract_functions_from_pattern(f, step_name))
            return functions

        # Case 5: Dict of keyed patterns
        if isinstance(func, dict):
            for key, f in func.items():
                if not is_nested_pattern(f):
                    raise ValueError(invalid_function_error(f"dict with key '{key}'", f))
                functions.extend(
                    FuncStepContractValidator._extract_functions_from_pattern(f, step_name))
            return functions

        # Invalid type
        raise ValueError(invalid_pattern_error(func))