Coverage for openhcs/core/pipeline/funcstep_contract_validator.py: 48.1%
148 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2FuncStep memory contract validator for OpenHCS.
4This module provides the FuncStepContractValidator class, which is responsible for
5validating memory type declarations for FunctionStep instances in a pipeline.
7Doctrinal Clauses:
8- Clause 65 — No Fallback Logic
9- Clause 88 — No Inferred Capabilities
10- Clause 101 — Memory Type Declaration
11- Clause 106-A — Declared Memory Types
12- Clause 308 — Named Positional Enforcement
13"""
15import inspect
16import logging
17from typing import Any, Callable, Dict, List, Optional, Tuple
19from openhcs.constants.constants import VALID_MEMORY_TYPES
20from openhcs.core.steps.function_step import FunctionStep
22logger = logging.getLogger(__name__)
24# ===== DECLARATIVE DEFAULT VALUES =====
25# These declarations control defaults and may be moved to configuration in the future
27# Simple, direct error messages
def missing_memory_type_error(func_name, step_name):
    """Build the error text for a function that lacks memory type declarations.

    The message names the offending function and step, then shows a
    wrong/correct usage example based on the OpenHCS function registry.

    Args:
        func_name: Name of the function without a memory type decorator.
        step_name: Name of the step that contains the function.

    Returns:
        The multi-line, user-facing error message.
    """
    # Problem statement plus the one-line remedy.
    headline = (
        f"Function '{func_name}' in step '{step_name}' needs memory type decorator (@numpy, @cupy, @torch, etc.)\n"
        f"\n"
        f"💡 SOLUTION: Use OpenHCS registry functions instead of raw external library functions:\n"
        f"\n"
    )
    # What the user most likely wrote.
    wrong_usage = (
        f"❌ WRONG:\n"
        f" import pyclesperanto as cle\n"
        f" step = FunctionStep(func=cle.{func_name}, name='{step_name}')\n"
        f"\n"
    )
    # The registry-based replacement.
    correct_usage = (
        f"✅ CORRECT:\n"
        f" from openhcs.processing.func_registry import get_function_by_name\n"
        f" {func_name}_func = get_function_by_name('{func_name}', 'pyclesperanto') # or 'numpy', 'cupy'\n"
        f" step = FunctionStep(func={func_name}_func, name='{step_name}')\n"
        f"\n"
    )
    registry_hint = f"📋 Available functions: Use get_all_function_names('pyclesperanto') to see all options"
    return headline + wrong_usage + correct_usage + registry_hint
def inconsistent_memory_types_error(step_name, func1, func2):
    """Return the message for a step whose functions disagree on memory types.

    Args:
        step_name: Name of the step being reported.
        func1: Description of the first function (interpolated as-is).
        func2: Description of the second function (interpolated as-is).

    Returns:
        A single-line error message.
    """
    message = f"Functions in step '{step_name}' have different memory types: {func1} vs {func2}"
    return message
def invalid_memory_type_error(func_name, input_type, output_type, valid_types):
    """Return the message for a function declaring unknown memory types.

    Args:
        func_name: Name of the offending function.
        input_type: The declared input memory type.
        output_type: The declared output memory type.
        valid_types: Printable listing of the accepted memory types.

    Returns:
        A single-line error message.
    """
    message = f"Function '{func_name}' has invalid memory types: {input_type}/{output_type}. Valid: {valid_types}"
    return message
def invalid_function_error(location, func):
    """Return the message for a non-function entry found inside a pattern.

    Args:
        location: Human-readable position of the entry within the pattern.
        func: The offending object (formatted with ``str``).

    Returns:
        A single-line error message.
    """
    message = f"Invalid function in {location}: {func}"
    return message
def invalid_pattern_error(pattern):
    """Return the message for a function pattern of an unsupported shape.

    Args:
        pattern: The rejected pattern object (formatted with ``str``).

    Returns:
        A single-line error message.
    """
    message = f"Invalid function pattern: {pattern}"
    return message
def missing_required_args_error(func_name, step_name, missing_args):
    """Return the message for a function whose required args were not supplied.

    Args:
        func_name: Name of the function being called.
        step_name: Name of the step that contains the function.
        missing_args: The argument names that were not provided.

    Returns:
        A single-line error message.
    """
    message = f"Function '{func_name}' in step '{step_name}' missing required args: {missing_args}"
    return message
def complex_pattern_error(step_name):
    """Return the message for a step that must use a simple function pattern.

    Args:
        step_name: Name of the step being reported.

    Returns:
        A single-line error message.
    """
    message = f"Step '{step_name}' with special decorators must use simple function pattern"
    return message
64class FuncStepContractValidator:
65 """
66 Validator for FunctionStep memory type contracts.
68 This validator enforces Clause 101 (Memory Type Declaration), Clause 88
69 (No Inferred Capabilities), and Clause 308 (Named Positional Enforcement)
70 by requiring explicit memory type declarations and named positional arguments
71 for all FunctionStep instances and their functions.
73 Key principles:
74 1. All functions in a FunctionStep must have consistent memory types
75 2. The shared memory types are set as the step's memory types in the step plan
76 3. Memory types must be validated at plan time, not runtime
77 4. No fallback or inference of memory types is allowed
78 5. All function patterns (callable, tuple, list, dict) are supported
79 6. When using (func, kwargs) pattern, all required positional arguments must be
80 explicitly provided in the kwargs dict
81 """
83 @staticmethod
84 def validate_pipeline(steps: List[Any], pipeline_context: Optional[Dict[str, Any]] = None, orchestrator=None) -> Dict[str, Dict[str, str]]:
85 """
86 Validate memory type contracts and function patterns for all FunctionStep instances in a pipeline.
88 This validator must run after the materialization and path planners to ensure
89 proper plan integration. It verifies that these planners have run by checking
90 the pipeline_context for planner execution flags and by validating the presence
91 of required fields in the step plans.
93 Args:
94 steps: The steps in the pipeline
95 pipeline_context: Optional context object with planner execution flags
96 orchestrator: Optional orchestrator for dict pattern key validation
98 Returns:
99 Dictionary mapping step UIDs to memory type dictionaries
101 Raises:
102 ValueError: If any FunctionStep violates memory type contracts or dict pattern validation
103 AssertionError: If required planners have not run before this validator
104 """
105 # Validate steps
106 if not steps: 106 ↛ 107line 106 didn't jump to line 107 because the condition on line 106 was never true
107 logger.warning("No steps provided to FuncStepContractValidator")
108 return {}
110 # Verify that required planners have run before this validator
111 if pipeline_context is not None: 111 ↛ 127line 111 didn't jump to line 127 because the condition on line 111 was always true
112 # Check that step plans exist and have required fields from planners
113 if not pipeline_context.step_plans:
114 raise AssertionError(
115 "Clause 101 Violation: Step plans must be initialized before FuncStepContractValidator."
116 )
118 # Check that materialization planner has run by verifying read_backend/write_backend exist
119 sample_step_id = next(iter(pipeline_context.step_plans.keys()))
120 sample_plan = pipeline_context.step_plans[sample_step_id]
121 if 'read_backend' not in sample_plan or 'write_backend' not in sample_plan:
122 raise AssertionError(
123 "Clause 101 Violation: Materialization planner must run before FuncStepContractValidator. "
124 "Step plans missing read_backend/write_backend fields."
125 )
126 else:
127 logger.warning(
128 "No pipeline_context provided to FuncStepContractValidator. "
129 "Cannot verify planner execution order. Falling back to attribute checks."
130 )
132 # Create step memory types dictionary
133 step_memory_types = {}
135 # Process each step in the pipeline
136 for i, step in enumerate(steps):
137 # Only validate FunctionStep instances
138 if isinstance(step, FunctionStep): 138 ↛ 136line 138 didn't jump to line 136 because the condition on line 138 was always true
139 # Verify that other planners have run before this validator by checking attributes
140 # This is a fallback verification when pipeline_context is not provided
141 try:
142 # Check for path planner fields (using dunder names)
143 _ = step.__input_dir__
144 _ = step.__output_dir__
145 except AttributeError as e:
146 raise AssertionError(
147 f"Clause 101 Violation: Required planners must run before FuncStepContractValidator. "
148 f"Missing attribute: {e}. Path planner must run first."
149 ) from e
151 memory_types = FuncStepContractValidator.validate_funcstep(step, orchestrator)
152 step_memory_types[step.step_id] = memory_types
156 return step_memory_types
158 @staticmethod
159 def validate_funcstep(step: FunctionStep, orchestrator=None) -> Dict[str, str]:
160 """
161 Validate memory type contracts, func_pattern structure, and dict pattern keys for a FunctionStep instance.
162 If special I/O or chainbreaker decorators are used, the func_pattern must be simple.
164 Args:
165 step: The FunctionStep to validate
166 orchestrator: Optional orchestrator for dict pattern key validation
168 Returns:
169 Dictionary of validated memory types
171 Raises:
172 ValueError: If the FunctionStep violates memory type contracts, structural rules,
173 or dict pattern key validation.
174 """
175 # Extract the function pattern and name from the step
176 func_pattern = step.func # Renamed for clarity in this context
177 step_name = step.name
179 # 1. Check if any function in the pattern uses special contract decorators
180 # _extract_functions_from_pattern will raise ValueError if func_pattern itself is invalid (e.g. None, or bad structure)
181 all_callables = FuncStepContractValidator._extract_functions_from_pattern(func_pattern, step_name)
183 uses_special_contracts = False
184 if all_callables: # Only check attributes if we have actual callables 184 ↛ 196line 184 didn't jump to line 196 because the condition on line 184 was always true
185 for f_callable in all_callables:
186 if hasattr(f_callable, '__special_inputs__') or \
187 hasattr(f_callable, '__special_outputs__') or \
188 hasattr(f_callable, '__chain_breaker__'):
189 uses_special_contracts = True
190 break
192 # 2. Special contracts validation is handled by validate_pattern_structure() below
193 # No additional restrictions needed - all valid patterns support special contracts
195 # 3. Validate dict pattern keys if orchestrator is available
196 if orchestrator is not None and isinstance(func_pattern, dict) and step.group_by is not None: 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true
197 FuncStepContractValidator._validate_dict_pattern_keys(
198 func_pattern, step.group_by, step_name, orchestrator
199 )
201 # 4. Proceed with existing memory type validation using the original func_pattern
202 input_type, output_type = FuncStepContractValidator.validate_function_pattern(
203 func_pattern, step_name)
205 # Return the validated memory types and store the func for stateless execution
206 return {
207 'input_memory_type': input_type,
208 'output_memory_type': output_type,
209 'func': func_pattern # Store the validated func for stateless execution
210 }
212 @staticmethod
213 def validate_function_pattern(
214 func: Any,
215 step_name: str
216 ) -> Tuple[str, str]:
217 """
218 Validate memory type contracts for a function pattern.
220 Args:
221 func: The function pattern to validate
222 step_name: The name of the step containing the function
224 Returns:
225 Tuple of (input_memory_type, output_memory_type)
227 Raises:
228 ValueError: If the function pattern violates memory type contracts
229 """
230 # Extract all functions from the pattern
231 functions = FuncStepContractValidator.validate_pattern_structure(func, step_name)
233 if not functions: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true
234 raise ValueError(f"No valid functions found in pattern for step {step_name}")
236 # Get memory types from the first function
237 first_fn = functions[0]
239 # Validate that the function has explicit memory type declarations
240 try:
241 input_type = first_fn.input_memory_type
242 output_type = first_fn.output_memory_type
243 except AttributeError as exc:
244 raise ValueError(missing_memory_type_error(first_fn.__name__, step_name)) from exc
246 # Validate memory types against known valid types
247 if input_type not in VALID_MEMORY_TYPES or output_type not in VALID_MEMORY_TYPES: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true
248 raise ValueError(invalid_memory_type_error(
249 first_fn.__name__, input_type, output_type, ", ".join(sorted(VALID_MEMORY_TYPES))
250 ))
252 # Validate that all functions have valid memory type declarations
253 for fn in functions[1:]: 253 ↛ 255line 253 didn't jump to line 255 because the loop on line 253 never started
254 # Validate that the function has explicit memory type declarations
255 try:
256 fn_input_type = fn.input_memory_type
257 fn_output_type = fn.output_memory_type
258 except AttributeError as exc:
259 raise ValueError(missing_memory_type_error(fn.__name__, step_name)) from exc
261 # Validate memory types against known valid types
262 if fn_input_type not in VALID_MEMORY_TYPES or fn_output_type not in VALID_MEMORY_TYPES:
263 raise ValueError(invalid_memory_type_error(
264 fn.__name__, fn_input_type, fn_output_type, ", ".join(sorted(VALID_MEMORY_TYPES))
265 ))
267 # Return first function's input type and last function's output type
268 last_function = functions[-1]
269 return input_type, last_function.output_memory_type
271 @staticmethod
272 def _validate_required_args(func: Callable, kwargs: Dict[str, Any], step_name: str) -> None:
273 """
274 Validate that all required positional arguments are provided in kwargs.
276 This enforces Clause 308 (Named Positional Enforcement) by requiring that
277 all required positional arguments are explicitly provided in the kwargs dict
278 when using the (func, kwargs) pattern.
280 Args:
281 func: The function to validate
282 kwargs: The kwargs dict to check
283 step_name: The name of the step containing the function
285 Raises:
286 ValueError: If any required positional arguments are missing from kwargs
287 """
288 # Get the function signature
289 sig = inspect.signature(func)
291 # Collect names of required positional arguments
292 required_args = []
293 for name, param in sig.parameters.items():
294 # Check if parameter is positional (POSITIONAL_ONLY or POSITIONAL_OR_KEYWORD)
295 if param.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD):
296 # Check if parameter has no default value
297 if param.default is inspect.Parameter.empty:
298 required_args.append(name)
300 # Check if all required args are in kwargs
301 missing_args = [arg for arg in required_args if arg not in kwargs]
303 # Raise error if any required args are missing
304 if missing_args:
305 raise ValueError(missing_required_args_error(func.__name__, step_name, missing_args))
307 @staticmethod
308 def _validate_dict_pattern_keys(
309 func_pattern: dict,
310 group_by,
311 step_name: str,
312 orchestrator
313 ) -> None:
314 """
315 Validate that dict function pattern keys match available component keys.
317 This validation ensures compile-time guarantee that dict patterns will work
318 at runtime by checking that all dict keys exist in the actual component data.
320 Args:
321 func_pattern: Dict function pattern to validate
322 group_by: GroupBy enum specifying component type
323 step_name: Name of the step containing the function
324 orchestrator: Orchestrator for component key access
326 Raises:
327 ValueError: If dict pattern keys don't match available component keys
328 """
329 # Get available component keys from orchestrator
330 try:
331 available_keys = orchestrator.get_component_keys(group_by)
332 available_keys_set = set(str(key) for key in available_keys)
333 except Exception as e:
334 raise ValueError(f"Failed to get component keys for {group_by.value}: {e}")
336 # Check each dict key against available keys
337 pattern_keys = list(func_pattern.keys())
338 pattern_keys_set = set(str(key) for key in pattern_keys)
340 # Try direct string match first
341 missing_keys = pattern_keys_set - available_keys_set
343 if missing_keys:
344 # Try integer conversion for missing keys
345 still_missing = set()
346 for key in missing_keys:
347 try:
348 # Try converting pattern key to int and check if int version exists in available keys
349 key_as_int = int(key)
350 if str(key_as_int) not in available_keys_set:
351 still_missing.add(key)
352 except (ValueError, TypeError):
353 # Try converting available keys to int and check if string key matches
354 found_as_int = False
355 for avail_key in available_keys_set:
356 try:
357 if int(avail_key) == int(key):
358 found_as_int = True
359 break
360 except (ValueError, TypeError):
361 continue
362 if not found_as_int:
363 still_missing.add(key)
365 if still_missing:
366 raise ValueError(
367 f"Function pattern keys not found in available {group_by.value} components for step '{step_name}'. "
368 f"Missing keys: {sorted(still_missing)}. "
369 f"Available keys: {sorted(available_keys)}. "
370 f"Function pattern keys must match component values from the plate data."
371 )
373 @staticmethod
374 def validate_pattern_structure(
375 func: Any,
376 step_name: str
377 ) -> List[Callable]:
378 """
379 Validate and extract all functions from a function pattern.
381 This is a public wrapper for _extract_functions_from_pattern that provides
382 a stable API for pattern structure validation.
384 Supports nested patterns of arbitrary depth, including:
385 - Direct callable
386 - Tuple of (callable, kwargs)
387 - List of callables or patterns
388 - Dict of keyed callables or patterns
390 Args:
391 func: The function pattern to validate and extract functions from
392 step_name: The name of the step or component containing the function
394 Returns:
395 List of functions in the pattern
397 Raises:
398 ValueError: If the function pattern is invalid
399 """
400 return FuncStepContractValidator._extract_functions_from_pattern(func, step_name)
402 @staticmethod
403 def _extract_functions_from_pattern(
404 func: Any,
405 step_name: str
406 ) -> List[Callable]:
407 """
408 Extract all functions from a function pattern.
410 Supports nested patterns of arbitrary depth, including:
411 - Direct callable
412 - Tuple of (callable, kwargs)
413 - List of callables or patterns
414 - Dict of keyed callables or patterns
416 Args:
417 func: The function pattern to extract functions from
418 step_name: The name of the step containing the function
420 Returns:
421 List of functions in the pattern
423 Raises:
424 ValueError: If the function pattern is invalid
425 """
426 functions = []
428 # Case 1: Direct callable
429 if callable(func) and not isinstance(func, type):
430 functions.append(func)
431 return functions
433 # Case 2: Tuple of (callable, kwargs)
434 if (isinstance(func, tuple) and len(func) == 2 and
435 callable(func[0]) and isinstance(func[1], dict)):
436 # The kwargs dict is optional - if provided, it will be used during execution
437 # No need to validate required args here as the execution logic handles this gracefully
438 functions.append(func[0])
439 return functions
441 # Case 3: List of patterns
442 if isinstance(func, list): 442 ↛ 454line 442 didn't jump to line 454 because the condition on line 442 was always true
443 for i, f in enumerate(func):
444 # Recursively extract functions from nested patterns
445 if isinstance(f, (list, dict, tuple)) or (callable(f) and not isinstance(f, type)): 445 ↛ 450line 445 didn't jump to line 450 because the condition on line 445 was always true
446 nested_functions = FuncStepContractValidator._extract_functions_from_pattern(
447 f, step_name)
448 functions.extend(nested_functions)
449 else:
450 raise ValueError(invalid_function_error(f"list at index {i}", f))
451 return functions
453 # Case 4: Dict of keyed patterns
454 if isinstance(func, dict):
455 for key, f in func.items():
456 # Recursively extract functions from nested patterns
457 if isinstance(f, (list, dict, tuple)) or (callable(f) and not isinstance(f, type)):
458 nested_functions = FuncStepContractValidator._extract_functions_from_pattern(
459 f, step_name)
460 functions.extend(nested_functions)
461 else:
462 raise ValueError(invalid_function_error(f"dict with key '{key}'", f))
463 return functions
465 # Invalid type
466 raise ValueError(invalid_pattern_error(func))