Coverage for openhcs/core/pipeline/funcstep_contract_validator.py: 48.1%

148 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2FuncStep memory contract validator for OpenHCS. 

3 

4This module provides the FuncStepContractValidator class, which is responsible for 

5validating memory type declarations for FunctionStep instances in a pipeline. 

6 

7Doctrinal Clauses: 

8- Clause 65 — No Fallback Logic 

9- Clause 88 — No Inferred Capabilities 

10- Clause 101 — Memory Type Declaration 

11- Clause 106-A — Declared Memory Types 

12- Clause 308 — Named Positional Enforcement 

13""" 

14 

15import inspect 

16import logging 

17from typing import Any, Callable, Dict, List, Optional, Tuple 

18 

19from openhcs.constants.constants import VALID_MEMORY_TYPES 

20from openhcs.core.steps.function_step import FunctionStep 

21 

22logger = logging.getLogger(__name__) 

23 

24# ===== DECLARATIVE DEFAULT VALUES ===== 

25# These declarations control defaults and may be moved to configuration in the future 

26 

27# Simple, direct error messages 

def missing_memory_type_error(func_name, step_name):
    """Build the multi-line message for a function lacking memory type declarations.

    The message names the offending function and step, then shows a
    wrong/correct usage example pointing the user at the OpenHCS function
    registry.

    Args:
        func_name: Name of the function that has no memory type attributes.
        step_name: Name of the step that contains the function.

    Returns:
        The formatted, newline-separated error message (no trailing newline).
    """
    message_lines = [
        f"Function '{func_name}' in step '{step_name}' needs memory type decorator (@numpy, @cupy, @torch, etc.)",
        "",
        "💡 SOLUTION: Use OpenHCS registry functions instead of raw external library functions:",
        "",
        "❌ WRONG:",
        " import pyclesperanto as cle",
        f" step = FunctionStep(func=cle.{func_name}, name='{step_name}')",
        "",
        "✅ CORRECT:",
        " from openhcs.processing.func_registry import get_function_by_name",
        f" {func_name}_func = get_function_by_name('{func_name}', 'pyclesperanto') # or 'numpy', 'cupy'",
        f" step = FunctionStep(func={func_name}_func, name='{step_name}')",
        "",
        "📋 Available functions: Use get_all_function_names('pyclesperanto') to see all options",
    ]
    return "\n".join(message_lines)

45 

def inconsistent_memory_types_error(step_name, func1, func2):
    """Return the message for two functions in one step with differing memory types.

    Args:
        step_name: Name of the step containing both functions.
        func1: Description of the first function (or its memory types).
        func2: Description of the second function (or its memory types).
    """
    template = "Functions in step '{step_name}' have different memory types: {func1} vs {func2}"
    return template.format(step_name=step_name, func1=func1, func2=func2)

48 

def invalid_memory_type_error(func_name, input_type, output_type, valid_types):
    """Return the message for a function declaring an unknown memory type.

    Args:
        func_name: Name of the offending function.
        input_type: The declared input memory type.
        output_type: The declared output memory type.
        valid_types: Pre-formatted listing of the accepted memory types.
    """
    declared = f"{input_type}/{output_type}"
    return f"Function '{func_name}' has invalid memory types: {declared}. Valid: {valid_types}"

51 

def invalid_function_error(location, func):
    """Return the message for an invalid entry found inside a function pattern.

    Args:
        location: Human-readable position of the entry (e.g. a list index or dict key).
        func: The offending entry itself.
    """
    template = "Invalid function in {location}: {func}"
    return template.format(location=location, func=func)

54 

def invalid_pattern_error(pattern):
    """Return the message for a function pattern of an unsupported shape.

    Args:
        pattern: The rejected pattern object; it is interpolated into the message.
    """
    prefix = "Invalid function pattern: "
    return f"{prefix}{pattern}"

57 

def missing_required_args_error(func_name, step_name, missing_args):
    """Return the message for a function whose required arguments were not supplied.

    Args:
        func_name: Name of the function being called.
        step_name: Name of the step containing the function.
        missing_args: The argument names that were not provided.
    """
    template = "Function '{func_name}' in step '{step_name}' missing required args: {missing_args}"
    return template.format(func_name=func_name, step_name=step_name, missing_args=missing_args)

60 

def complex_pattern_error(step_name):
    """Return the message for a step that combines special decorators with a non-simple pattern.

    Args:
        step_name: Name of the offending step.
    """
    template = "Step '{step_name}' with special decorators must use simple function pattern"
    return template.format(step_name=step_name)

63 

class FuncStepContractValidator:
    """
    Validator for FunctionStep memory type contracts.

    This validator enforces Clause 101 (Memory Type Declaration), Clause 88
    (No Inferred Capabilities), and Clause 308 (Named Positional Enforcement)
    by requiring explicit memory type declarations on every function used by
    a FunctionStep.

    What the code in this class does:
    1. Every function in a step's pattern must expose ``input_memory_type``
       and ``output_memory_type`` attributes, both members of
       ``VALID_MEMORY_TYPES``.
    2. The step's memory types are the first function's input type and the
       last function's output type. Types are validated per function; they
       are not compared between functions here.
    3. Validation happens at plan time; nothing is inferred or defaulted.
    4. All function patterns (callable, ``(callable, kwargs)`` tuple, list,
       dict, arbitrarily nested) are supported.
    5. ``_validate_required_args`` is available to check that a kwargs dict
       covers every required positional parameter of a function.
    """

    @staticmethod
    def validate_pipeline(
        steps: List[Any],
        pipeline_context: Optional[Any] = None,
        orchestrator=None
    ) -> Dict[str, Dict[str, Any]]:
        """
        Validate memory type contracts and function patterns for all FunctionStep instances in a pipeline.

        This validator must run after the materialization and path planners.
        When ``pipeline_context`` is given, its ``step_plans`` must be
        non-empty and the first plan must contain ``read_backend`` and
        ``write_backend``. Independently of that, every FunctionStep must
        expose ``__input_dir__`` and ``__output_dir__``.

        Args:
            steps: The steps in the pipeline. Non-FunctionStep entries are skipped.
            pipeline_context: Optional context object exposing a ``step_plans`` mapping
            orchestrator: Optional orchestrator for dict pattern key validation

        Returns:
            Dictionary mapping step IDs to the dict returned by ``validate_funcstep``.
            Empty when ``steps`` is empty.

        Raises:
            ValueError: If any FunctionStep violates memory type contracts or dict pattern validation
            AssertionError: If required planners have not run before this validator
        """
        if not steps:
            logger.warning("No steps provided to FuncStepContractValidator")
            return {}

        # Verify that required planners have run before this validator
        if pipeline_context is not None:
            step_plans = pipeline_context.step_plans
            if not step_plans:
                raise AssertionError(
                    "Clause 101 Violation: Step plans must be initialized before FuncStepContractValidator."
                )

            # Only the first plan is sampled as evidence that the
            # materialization planner has populated the backend fields.
            sample_plan = next(iter(step_plans.values()))
            if 'read_backend' not in sample_plan or 'write_backend' not in sample_plan:
                raise AssertionError(
                    "Clause 101 Violation: Materialization planner must run before FuncStepContractValidator. "
                    "Step plans missing read_backend/write_backend fields."
                )
        else:
            logger.warning(
                "No pipeline_context provided to FuncStepContractValidator. "
                "Cannot verify planner execution order. Falling back to attribute checks."
            )

        step_memory_types = {}

        for step in steps:
            # Only validate FunctionStep instances
            if not isinstance(step, FunctionStep):
                continue

            # The path planner is expected to have set these attributes;
            # reading them is the check (the values themselves are unused).
            try:
                _ = step.__input_dir__
                _ = step.__output_dir__
            except AttributeError as e:
                raise AssertionError(
                    f"Clause 101 Violation: Required planners must run before FuncStepContractValidator. "
                    f"Missing attribute: {e}. Path planner must run first."
                ) from e

            step_memory_types[step.step_id] = FuncStepContractValidator.validate_funcstep(
                step, orchestrator)

        return step_memory_types

    @staticmethod
    def validate_funcstep(step: "FunctionStep", orchestrator=None) -> Dict[str, Any]:
        """
        Validate memory type contracts, func_pattern structure, and dict pattern keys for a FunctionStep instance.

        Args:
            step: The FunctionStep to validate. Reads ``step.func``, ``step.name`` and,
                for dict patterns with an orchestrator, ``step.group_by``.
            orchestrator: Optional orchestrator for dict pattern key validation

        Returns:
            Dictionary with ``input_memory_type``, ``output_memory_type`` and
            ``func`` (the unmodified function pattern).

        Raises:
            ValueError: If the FunctionStep violates memory type contracts, structural rules,
                or dict pattern key validation.
        """
        func_pattern = step.func
        step_name = step.name

        # 1. Structural validation first: this raises ValueError for an
        # invalid pattern (e.g. None or a malformed tuple) before any
        # orchestrator lookup is attempted. The extracted callables are not
        # needed here; every valid pattern may use special contracts.
        FuncStepContractValidator._extract_functions_from_pattern(func_pattern, step_name)

        # 2. Validate dict pattern keys if orchestrator is available
        if orchestrator is not None and isinstance(func_pattern, dict) and step.group_by is not None:
            FuncStepContractValidator._validate_dict_pattern_keys(
                func_pattern, step.group_by, step_name, orchestrator
            )

        # 3. Memory type validation using the original func_pattern
        input_type, output_type = FuncStepContractValidator.validate_function_pattern(
            func_pattern, step_name)

        return {
            'input_memory_type': input_type,
            'output_memory_type': output_type,
            'func': func_pattern  # Store the validated func for stateless execution
        }

    @staticmethod
    def _callable_name(fn: Any) -> str:
        """Return ``fn.__name__`` or, for callables without one, ``repr(fn)``.

        ``functools.partial`` objects and callable instances have no
        ``__name__``; falling back to ``repr`` keeps error reporting from
        failing with an unrelated AttributeError.
        """
        return getattr(fn, '__name__', repr(fn))

    @staticmethod
    def validate_function_pattern(
        func: Any,
        step_name: str
    ) -> Tuple[str, str]:
        """
        Validate memory type contracts for a function pattern.

        Every function in the pattern must have ``input_memory_type`` and
        ``output_memory_type`` attributes whose values are in
        ``VALID_MEMORY_TYPES``. Functions are checked in pattern order and
        the first violation is reported.

        Args:
            func: The function pattern to validate
            step_name: The name of the step containing the function

        Returns:
            Tuple of (first function's input memory type, last function's output memory type)

        Raises:
            ValueError: If the pattern is invalid or empty, or if any function
                lacks memory type declarations or declares an invalid type
        """
        functions = FuncStepContractValidator.validate_pattern_structure(func, step_name)

        if not functions:
            raise ValueError(f"No valid functions found in pattern for step {step_name}")

        first_input_type = None
        last_output_type = None

        for position, fn in enumerate(functions):
            # Validate that the function has explicit memory type declarations
            try:
                fn_input_type = fn.input_memory_type
                fn_output_type = fn.output_memory_type
            except AttributeError as exc:
                raise ValueError(missing_memory_type_error(
                    FuncStepContractValidator._callable_name(fn), step_name
                )) from exc

            # Validate memory types against known valid types
            if fn_input_type not in VALID_MEMORY_TYPES or fn_output_type not in VALID_MEMORY_TYPES:
                raise ValueError(invalid_memory_type_error(
                    FuncStepContractValidator._callable_name(fn),
                    fn_input_type,
                    fn_output_type,
                    ", ".join(sorted(VALID_MEMORY_TYPES))
                ))

            if position == 0:
                first_input_type = fn_input_type
            last_output_type = fn_output_type

        # First function's input type and last function's output type
        return first_input_type, last_output_type

    @staticmethod
    def _validate_required_args(func: Callable, kwargs: Dict[str, Any], step_name: str) -> None:
        """
        Validate that all required positional arguments are provided in kwargs.

        This enforces Clause 308 (Named Positional Enforcement): every
        POSITIONAL_ONLY or POSITIONAL_OR_KEYWORD parameter of ``func`` that
        has no default must appear as a key in ``kwargs``.

        Args:
            func: The function to validate
            kwargs: The kwargs dict to check
            step_name: The name of the step containing the function

        Raises:
            ValueError: If any required positional arguments are missing from kwargs
        """
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )

        # Signature order is preserved so the error lists arguments as declared.
        missing_args = [
            name
            for name, param in inspect.signature(func).parameters.items()
            if param.kind in positional_kinds
            and param.default is inspect.Parameter.empty
            and name not in kwargs
        ]

        if missing_args:
            raise ValueError(missing_required_args_error(
                FuncStepContractValidator._callable_name(func), step_name, missing_args
            ))

    @staticmethod
    def _validate_dict_pattern_keys(
        func_pattern: dict,
        group_by,
        step_name: str,
        orchestrator
    ) -> None:
        """
        Validate that dict function pattern keys match available component keys.

        Keys are compared as strings first. A pattern key with no exact
        string match is still accepted when it and some available key both
        parse as the same integer (so ``"01"`` matches ``"1"`` in either
        direction).

        Args:
            func_pattern: Dict function pattern to validate
            group_by: Object with a ``value`` attribute naming the component type;
                passed to ``orchestrator.get_component_keys``
            step_name: Name of the step containing the function
            orchestrator: Object providing ``get_component_keys(group_by)``

        Raises:
            ValueError: If the component keys cannot be retrieved, or if any
                pattern key matches no available component key
        """
        # Get available component keys from orchestrator
        try:
            available_keys_set = {str(key) for key in orchestrator.get_component_keys(group_by)}
        except Exception as e:
            raise ValueError(f"Failed to get component keys for {group_by.value}: {e}") from e

        pattern_keys_set = {str(key) for key in func_pattern}

        # Direct string match first
        missing_keys = pattern_keys_set - available_keys_set
        if not missing_keys:
            return

        # Integer view of the available keys, built once for the numeric fallback.
        available_ints = set()
        for avail_key in available_keys_set:
            try:
                available_ints.add(int(avail_key))
            except (ValueError, TypeError):
                continue

        still_missing = set()
        for key in missing_keys:
            try:
                if int(key) in available_ints:
                    continue
            except (ValueError, TypeError):
                # Non-numeric key: only an exact string match could have satisfied it.
                pass
            still_missing.add(key)

        if still_missing:
            # The string-normalised set is sorted so mixed-type component
            # keys cannot raise TypeError while building the message.
            raise ValueError(
                f"Function pattern keys not found in available {group_by.value} components for step '{step_name}'. "
                f"Missing keys: {sorted(still_missing)}. "
                f"Available keys: {sorted(available_keys_set)}. "
                f"Function pattern keys must match component values from the plate data."
            )

    @staticmethod
    def validate_pattern_structure(
        func: Any,
        step_name: str
    ) -> List[Callable]:
        """
        Validate and extract all functions from a function pattern.

        This is a public wrapper for _extract_functions_from_pattern that provides
        a stable API for pattern structure validation.

        Supports nested patterns of arbitrary depth, including:
        - Direct callable
        - Tuple of (callable, kwargs)
        - List of callables or patterns
        - Dict of keyed callables or patterns

        Args:
            func: The function pattern to validate and extract functions from
            step_name: The name of the step or component containing the function

        Returns:
            List of functions in the pattern

        Raises:
            ValueError: If the function pattern is invalid
        """
        return FuncStepContractValidator._extract_functions_from_pattern(func, step_name)

    @staticmethod
    def _extract_functions_from_pattern(
        func: Any,
        step_name: str
    ) -> List[Callable]:
        """
        Extract all functions from a function pattern.

        Supports nested patterns of arbitrary depth, including:
        - Direct callable (classes are not accepted as direct callables)
        - Tuple of exactly (callable, kwargs dict)
        - List of callables or patterns
        - Dict of keyed callables or patterns

        Args:
            func: The function pattern to extract functions from
            step_name: The name of the step containing the function

        Returns:
            List of functions in the pattern, in traversal order. An empty
            list or dict yields an empty result.

        Raises:
            ValueError: If the function pattern is invalid
        """
        # Case 1: Direct callable
        if callable(func) and not isinstance(func, type):
            return [func]

        # Case 2: Tuple of (callable, kwargs). The kwargs are not checked
        # against the signature here.
        # NOTE(review): unlike Case 1, a class is accepted as the tuple's
        # callable - confirm whether that asymmetry is intended.
        if (isinstance(func, tuple) and len(func) == 2 and
                callable(func[0]) and isinstance(func[1], dict)):
            return [func[0]]

        # Cases 3 and 4: containers. Pair each entry with a description of
        # where it sits so a bad entry can be reported precisely.
        if isinstance(func, list):
            located_entries = [(f"list at index {i}", f) for i, f in enumerate(func)]
        elif isinstance(func, dict):
            located_entries = [(f"dict with key '{key}'", f) for key, f in func.items()]
        else:
            # Invalid type
            raise ValueError(invalid_pattern_error(func))

        functions = []
        for location, entry in located_entries:
            is_pattern = isinstance(entry, (list, dict, tuple))
            is_function = callable(entry) and not isinstance(entry, type)
            if not (is_pattern or is_function):
                raise ValueError(invalid_function_error(location, entry))
            # Recursively extract functions from nested patterns
            functions.extend(
                FuncStepContractValidator._extract_functions_from_pattern(entry, step_name))
        return functions