Coverage for openhcs/core/pipeline/funcstep_contract_validator.py: 56.8%
183 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1"""
2FuncStep memory contract validator for OpenHCS.
4This module provides the FuncStepContractValidator class, which is responsible for
5validating memory type declarations for FunctionStep instances in a pipeline.
7Doctrinal Clauses:
8- Clause 65 — No Fallback Logic
9- Clause 88 — No Inferred Capabilities
10- Clause 101 — Memory Type Declaration
11- Clause 106-A — Declared Memory Types
12- Clause 308 — Named Positional Enforcement
13"""
15import inspect
16import logging
17from typing import Any, Callable, Dict, List, Optional, Tuple
19from openhcs.constants.constants import VALID_MEMORY_TYPES, get_openhcs_config
20from openhcs.core.steps.function_step import FunctionStep
22from openhcs.core.components.validation import GenericValidator
24logger = logging.getLogger(__name__)
26# ===== DECLARATIVE DEFAULT VALUES =====
27# These declarations control defaults and may be moved to configuration in the future
29# Simple, direct error messages
def missing_memory_type_error(func_name, step_name):
    """Build the multi-line help text for a function lacking memory type declarations.

    Args:
        func_name: Name of the undecorated function.
        step_name: Name of the step that uses the function.

    Returns:
        A newline-separated message that states the problem and shows a
        wrong/correct usage example based on the OpenHCS function registry.
    """
    # One entry per output line; joined with "\n" so the last line has no
    # trailing newline.
    message_lines = [
        f"Function '{func_name}' in step '{step_name}' needs memory type decorator (@numpy, @cupy, @torch, etc.)",
        "",
        "💡 SOLUTION: Use OpenHCS registry functions instead of raw external library functions:",
        "",
        "❌ WRONG:",
        " import pyclesperanto as cle",
        f" step = FunctionStep(func=cle.{func_name}, name='{step_name}')",
        "",
        "✅ CORRECT:",
        " from openhcs.processing.func_registry import get_function_by_name",
        f" {func_name}_func = get_function_by_name('{func_name}', 'pyclesperanto') # or 'numpy', 'cupy'",
        f" step = FunctionStep(func={func_name}_func, name='{step_name}')",
        "",
        "📋 Available functions: Use get_all_function_names('pyclesperanto') to see all options",
    ]
    return "\n".join(message_lines)
def inconsistent_memory_types_error(step_name, func1, func2):
    """Return the message for two functions in one step with differing memory types.

    Args:
        step_name: Name of the step containing both functions.
        func1: Description of the first function (inserted verbatim).
        func2: Description of the second function (inserted verbatim).
    """
    message = f"Functions in step '{step_name}' have different memory types: {func1} vs {func2}"
    return message
def invalid_memory_type_error(func_name, input_type, output_type, valid_types):
    """Return the message for a function whose declared memory types are not recognised.

    Args:
        func_name: Name of the offending function.
        input_type: The declared input memory type.
        output_type: The declared output memory type.
        valid_types: Pre-formatted listing of the accepted memory types.
    """
    declared = f"{input_type}/{output_type}"
    return f"Function '{func_name}' has invalid memory types: {declared}. Valid: {valid_types}"
def invalid_function_error(location, func):
    """Return the message for a non-callable entry found inside a function pattern.

    Args:
        location: Human-readable position of the entry (e.g. a list index or dict key).
        func: The offending object; rendered with its default string formatting.
    """
    message = f"Invalid function in {location}: {func}"
    return message
def invalid_pattern_error(pattern):
    """Return the message for a function pattern of an unsupported shape.

    Args:
        pattern: The rejected pattern object; rendered with its default string formatting.
    """
    message = f"Invalid function pattern: {pattern}"
    return message
def missing_required_args_error(func_name, step_name, missing_args):
    """Return the message for a function call that omits required arguments.

    Args:
        func_name: Name of the function being validated.
        step_name: Name of the step that uses the function.
        missing_args: The argument names that were not supplied; rendered
            with its default string formatting.
    """
    subject = f"Function '{func_name}' in step '{step_name}'"
    return f"{subject} missing required args: {missing_args}"
def complex_pattern_error(step_name):
    """Return the message for a specially decorated step that uses a non-simple pattern.

    Args:
        step_name: Name of the offending step.
    """
    message = f"Step '{step_name}' with special decorators must use simple function pattern"
    return message
class FuncStepContractValidator:
    """
    Validator for FunctionStep memory type contracts.

    This validator enforces Clause 101 (Memory Type Declaration), Clause 88
    (No Inferred Capabilities), and Clause 308 (Named Positional Enforcement)
    by requiring explicit memory type declarations and named positional arguments
    for all FunctionStep instances and their functions.

    Key principles:
    1. All functions in a FunctionStep must have consistent memory types
    2. The shared memory types are set as the step's memory types in the step plan
    3. Memory types must be validated at plan time, not runtime
    4. No fallback or inference of memory types is allowed
    5. All function patterns (callable, tuple, list, dict) are supported
    6. When using (func, kwargs) pattern, all required positional arguments must be
       explicitly provided in the kwargs dict

    NOTE(review): as written, ``validate_function_pattern`` checks that every
    function declares *valid* memory types but does not compare them with each
    other (principle 1), and ``_validate_required_args`` is not invoked by any
    method of this class (principle 6). Confirm whether that is intentional.
    """

    @staticmethod
    def validate_pipeline(
        steps: List[Any],
        pipeline_context: Optional[Any] = None,
        orchestrator=None,
    ) -> Dict[int, Dict[str, Any]]:
        """
        Validate memory type contracts and function patterns for all FunctionStep instances in a pipeline.

        This validator must run after the materialization and path planners. When a
        ``pipeline_context`` is supplied, its ``step_plans`` must be non-empty and the
        first plan must contain ``read_backend`` and ``write_backend``. Independently
        of that, every FunctionStep must expose ``__input_dir__`` and
        ``__output_dir__`` attributes.

        Args:
            steps: The steps in the pipeline
            pipeline_context: Optional context object exposing a ``step_plans`` mapping
            orchestrator: Optional orchestrator for dict pattern key validation

        Returns:
            Dictionary mapping the index of each FunctionStep in ``steps`` to the
            dictionary returned by ``validate_funcstep``. Empty if ``steps`` is empty.

        Raises:
            ValueError: If any FunctionStep violates memory type contracts or dict pattern validation
            AssertionError: If required planners have not run before this validator
        """
        if not steps:
            logger.warning("No steps provided to FuncStepContractValidator")
            return {}

        # Verify that required planners have run before this validator
        if pipeline_context is None:
            logger.warning(
                "No pipeline_context provided to FuncStepContractValidator. "
                "Cannot verify planner execution order. Falling back to attribute checks."
            )
        else:
            step_plans = pipeline_context.step_plans
            if not step_plans:
                raise AssertionError(
                    "Clause 101 Violation: Step plans must be initialized before FuncStepContractValidator."
                )

            # Only the first plan is sampled to detect whether the
            # materialization planner has populated the backend fields.
            sample_plan = next(iter(step_plans.values()))
            if 'read_backend' not in sample_plan or 'write_backend' not in sample_plan:
                raise AssertionError(
                    "Clause 101 Violation: Materialization planner must run before FuncStepContractValidator. "
                    "Step plans missing read_backend/write_backend fields."
                )

        step_memory_types = {}

        for index, step in enumerate(steps):
            # Only FunctionStep instances are validated; other steps are skipped
            if not isinstance(step, FunctionStep):
                continue

            # Attribute-based check that the path planner has run. It is
            # performed whether or not a pipeline_context was provided.
            try:
                _ = step.__input_dir__
                _ = step.__output_dir__
            except AttributeError as e:
                raise AssertionError(
                    f"Clause 101 Violation: Required planners must run before FuncStepContractValidator. "
                    f"Missing attribute: {e}. Path planner must run first."
                ) from e

            # Results are keyed by the step's position in the pipeline
            step_memory_types[index] = FuncStepContractValidator.validate_funcstep(step, orchestrator)

        return step_memory_types

    @staticmethod
    def validate_funcstep(step: FunctionStep, orchestrator=None) -> Dict[str, Any]:
        """
        Validate memory type contracts, func_pattern structure, and dict pattern keys for a FunctionStep instance.

        Side effect: if ``step.group_by`` names a component that also appears in
        ``step.variable_components``, ``step.group_by`` is reset to ``None`` and a
        warning is logged before validation continues.

        Args:
            step: The FunctionStep to validate
            orchestrator: Optional orchestrator for dict pattern key validation

        Returns:
            Dictionary with keys ``input_memory_type``, ``output_memory_type`` and
            ``func`` (the step's function pattern, unchanged).

        Raises:
            ValueError: If the FunctionStep violates memory type contracts, structural rules,
                        or dict pattern key validation.
        """
        func_pattern = step.func
        step_name = step.name

        # 1. Fail fast on a structurally invalid pattern (raises ValueError).
        # The extracted callables themselves are not needed here.
        FuncStepContractValidator._extract_functions_from_pattern(func_pattern, step_name)

        # 2. Validate using generic validation system
        validator = GenericValidator(get_openhcs_config())

        # Constraint: group_by must not be one of the variable components.
        # The conflict is auto-resolved by dropping group_by.
        original_group_by = step.group_by
        if original_group_by:
            component_values = [vc.value for vc in step.variable_components]
            if original_group_by.value in component_values:
                logger.warning(
                    "Step '%s': Auto-resolved group_by conflict. "
                    "Set group_by to None due to conflict with variable_components %s. "
                    "Original group_by was %s.",
                    step_name,
                    component_values,
                    original_group_by.value,
                )
                step.group_by = None

        # Validate step configuration after auto-resolution
        validation_result = validator.validate_step(
            step.variable_components, step.group_by, func_pattern, step_name
        )
        if not validation_result.is_valid:
            raise ValueError(validation_result.error_message)

        # Dict pattern keys can only be checked with an orchestrator and a group_by
        if orchestrator is not None and isinstance(func_pattern, dict) and step.group_by is not None:
            dict_validation_result = validator.validate_dict_pattern_keys(
                func_pattern, step.group_by, step_name, orchestrator
            )
            if not dict_validation_result.is_valid:
                raise ValueError(dict_validation_result.error_message)

        # 3. Memory type validation on the original func_pattern
        input_type, output_type = FuncStepContractValidator.validate_function_pattern(
            func_pattern, step_name)

        return {
            'input_memory_type': input_type,
            'output_memory_type': output_type,
            'func': func_pattern  # Store the validated func for stateless execution
        }

    @staticmethod
    def validate_function_pattern(
        func: Any,
        step_name: str
    ) -> Tuple[str, str]:
        """
        Validate memory type contracts for a function pattern.

        Every function in the pattern must declare ``input_memory_type`` and
        ``output_memory_type`` attributes whose values are in VALID_MEMORY_TYPES.
        Functions are checked in pattern order; the first failure raises.

        Args:
            func: The function pattern to validate
            step_name: The name of the step containing the function

        Returns:
            Tuple of (input memory type of the first function,
            output memory type of the last function)

        Raises:
            ValueError: If the function pattern violates memory type contracts
        """
        functions = FuncStepContractValidator.validate_pattern_structure(func, step_name)

        if not functions:
            raise ValueError(f"No valid functions found in pattern for step {step_name}")

        declared_types = [
            FuncStepContractValidator._declared_memory_types(fn, step_name)
            for fn in functions
        ]

        first_input_type = declared_types[0][0]
        last_output_type = declared_types[-1][1]
        return first_input_type, last_output_type

    @staticmethod
    def _declared_memory_types(fn: Callable, step_name: str) -> Tuple[str, str]:
        """Return ``(input_memory_type, output_memory_type)`` of ``fn`` after validating both.

        Raises:
            ValueError: If either attribute is missing or not in VALID_MEMORY_TYPES.
        """
        # Callables such as functools.partial have no __name__; fall back to
        # repr so the intended ValueError is raised, not a second AttributeError.
        fn_name = getattr(fn, '__name__', repr(fn))

        try:
            input_type = fn.input_memory_type
            output_type = fn.output_memory_type
        except AttributeError as exc:
            raise ValueError(missing_memory_type_error(fn_name, step_name)) from exc

        if input_type not in VALID_MEMORY_TYPES or output_type not in VALID_MEMORY_TYPES:
            raise ValueError(invalid_memory_type_error(
                fn_name, input_type, output_type, ", ".join(sorted(VALID_MEMORY_TYPES))
            ))

        return input_type, output_type

    @staticmethod
    def _validate_required_args(func: Callable, kwargs: Dict[str, Any], step_name: str) -> None:
        """
        Validate that all required positional arguments are provided in kwargs.

        This enforces Clause 308 (Named Positional Enforcement) by requiring that
        all required positional arguments are explicitly provided in the kwargs dict
        when using the (func, kwargs) pattern.

        Args:
            func: The function to validate
            kwargs: The kwargs dict to check
            step_name: The name of the step containing the function

        Raises:
            ValueError: If any required positional arguments are missing from kwargs
        """
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )

        # Required = positional parameter without a default value
        missing_args = [
            name
            for name, param in inspect.signature(func).parameters.items()
            if param.kind in positional_kinds
            and param.default is inspect.Parameter.empty
            and name not in kwargs
        ]

        if missing_args:
            func_name = getattr(func, '__name__', repr(func))
            raise ValueError(missing_required_args_error(func_name, step_name, missing_args))

    @staticmethod
    def _validate_dict_pattern_keys(
        func_pattern: dict,
        group_by,
        step_name: str,
        orchestrator
    ) -> None:
        """
        Validate that dict function pattern keys match available component keys.

        Keys are compared as strings first. Keys that do not match textually are
        then compared by integer value, so that e.g. pattern key ``"1"`` matches
        available key ``"01"`` and vice versa.

        Args:
            func_pattern: Dict function pattern to validate
            group_by: GroupBy enum specifying component type
            step_name: Name of the step containing the function
            orchestrator: Orchestrator for component key access

        Raises:
            ValueError: If the component keys cannot be obtained, or if dict pattern
                keys don't match available component keys
        """
        try:
            # Materialize once so the keys can be reused in the error message
            # even if the orchestrator returns a one-shot iterable.
            available_keys = list(orchestrator.get_component_keys(group_by))
            available_keys_set = {str(key) for key in available_keys}
        except Exception as e:
            raise ValueError(f"Failed to get component keys for {group_by.value}: {e}") from e

        # Direct string match first
        missing_keys = {str(key) for key in func_pattern} - available_keys_set
        if not missing_keys:
            return

        # Integer values of all available keys that parse as integers
        available_ints = set()
        for avail_key in available_keys_set:
            try:
                available_ints.add(int(avail_key))
            except ValueError:
                continue

        # A key is still missing if it is not an integer, or its integer value
        # matches no available key.
        still_missing = set()
        for key in missing_keys:
            try:
                matched = int(key) in available_ints
            except ValueError:
                matched = False
            if not matched:
                still_missing.add(key)

        if still_missing:
            try:
                available_display = sorted(available_keys)
            except TypeError:
                # Mixed key types are not mutually orderable; order by text instead
                available_display = sorted(available_keys, key=str)
            raise ValueError(
                f"Function pattern keys not found in available {group_by.value} components for step '{step_name}'. "
                f"Missing keys: {sorted(still_missing)}. "
                f"Available keys: {available_display}. "
                f"Function pattern keys must match component values from the plate data."
            )

    @staticmethod
    def validate_pattern_structure(
        func: Any,
        step_name: str
    ) -> List[Callable]:
        """
        Validate and extract all functions from a function pattern.

        This is a public wrapper for _extract_functions_from_pattern that provides
        a stable API for pattern structure validation.

        Supports nested patterns of arbitrary depth, including:
        - Direct callable
        - Tuple of (callable, kwargs)
        - List of callables or patterns
        - Dict of keyed callables or patterns

        Args:
            func: The function pattern to validate and extract functions from
            step_name: The name of the step or component containing the function

        Returns:
            List of functions in the pattern

        Raises:
            ValueError: If the function pattern is invalid
        """
        return FuncStepContractValidator._extract_functions_from_pattern(func, step_name)

    @staticmethod
    def _is_function_reference(obj):
        """Check if an object is a FunctionReference.

        Returns False when the compiler module cannot be imported.
        """
        try:
            from openhcs.core.pipeline.compiler import FunctionReference
        except ImportError:
            return False
        return isinstance(obj, FunctionReference)

    @staticmethod
    def _resolve_function_reference(func_or_ref):
        """Resolve a FunctionReference to an actual function, or return the original."""
        from openhcs.core.pipeline.compiler import FunctionReference
        if isinstance(func_or_ref, FunctionReference):
            return func_or_ref.resolve()
        return func_or_ref

    @staticmethod
    def _extract_functions_from_pattern(
        func: Any,
        step_name: str
    ) -> List[Callable]:
        """
        Extract all functions from a function pattern.

        Supports nested patterns of arbitrary depth, including:
        - Direct callable
        - FunctionReference objects
        - Tuple of (callable/FunctionReference, kwargs)
        - List of callables or patterns
        - Dict of keyed callables or patterns

        Classes are never accepted as callables. The kwargs of a
        (callable, kwargs) tuple are not inspected here.

        Args:
            func: The function pattern to extract functions from
            step_name: The name of the step containing the function

        Returns:
            List of functions in the pattern, in traversal order. An empty list
            or dict yields an empty result.

        Raises:
            ValueError: If the function pattern is invalid
        """
        # Imported locally, as in the rest of this class — presumably to avoid
        # a circular import with the compiler module; TODO confirm.
        from openhcs.core.pipeline.compiler import FunctionReference

        # Case 1: Direct FunctionReference
        if isinstance(func, FunctionReference):
            return [func.resolve()]

        # Case 2: Direct callable
        if callable(func) and not isinstance(func, type):
            return [func]

        # Case 3: Tuple of (callable/FunctionReference, kwargs)
        if isinstance(func, tuple) and len(func) == 2 and isinstance(func[1], dict):
            resolved_first = FuncStepContractValidator._resolve_function_reference(func[0])
            if callable(resolved_first) and not isinstance(resolved_first, type):
                return [resolved_first]
            # Otherwise fall through: a tuple is neither list nor dict, so it
            # is rejected as an invalid pattern below.

        # Cases 4 and 5: List or dict of nested patterns. Each entry is paired
        # with a description of its location for error reporting.
        if isinstance(func, list):
            located_entries = [(f"list at index {i}", f) for i, f in enumerate(func)]
        elif isinstance(func, dict):
            located_entries = [(f"dict with key '{key}'", f) for key, f in func.items()]
        else:
            # Invalid type
            raise ValueError(invalid_pattern_error(func))

        functions = []
        for location, nested in located_entries:
            is_valid_pattern = (
                isinstance(nested, (list, dict, tuple, FunctionReference)) or
                (callable(nested) and not isinstance(nested, type))
            )
            if not is_valid_pattern:
                raise ValueError(invalid_function_error(location, nested))
            functions.extend(
                FuncStepContractValidator._extract_functions_from_pattern(nested, step_name)
            )
        return functions