Coverage for openhcs/core/pipeline/gpu_memory_validator.py: 24.0%
62 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
"""
GPU memory type validator for OpenHCS.

This module provides the GPUMemoryTypeValidator class, which is responsible for
validating GPU memory types and assigning GPU IDs to steps requiring GPU memory.

Doctrinal Clauses:
- Clause 66 — Immutability After Construction
- Clause 88 — No Inferred Capabilities
- Clause 293 — GPU Pre-Declaration Enforcement
- Clause 295 — GPU Scheduling Affinity
"""
import logging
from typing import Any, Dict

from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES
from openhcs.core.utils import optional_import

# LAZY IMPORT: Import gpu_scheduler only when needed to avoid circular dependency
# from openhcs.core.orchestrator.gpu_scheduler import get_gpu_registry_status

logger = logging.getLogger(__name__)
def _validate_required_libraries(required_libraries: set) -> None:
    """
    Validate that required GPU libraries are installed.

    Args:
        required_libraries: Set of memory types that require library validation

    Raises:
        ValueError: If any required library is not installed
    """
    # Each GPU memory type shares its name with the module that backs it
    # ("cupy" -> import cupy, etc.), so one optional_import probe per known
    # type replaces the previous copy-pasted if/elif chain. The fixed tuple
    # also makes the error message's library order deterministic.
    probed_types = ("cupy", "torch", "tensorflow", "jax")
    missing_libraries = [
        memory_type
        for memory_type in probed_types
        if memory_type in required_libraries and optional_import(memory_type) is None
    ]

    if missing_libraries:
        raise ValueError(
            f"🔥 COMPILATION FAILED: Required GPU libraries not installed: {', '.join(missing_libraries)}. "
            f"Pipeline contains functions decorated with @{'/'.join(missing_libraries)}_func but the corresponding "
            f"libraries are not available. Install the missing libraries or remove the functions from your pipeline."
        )
class GPUMemoryTypeValidator:
    """
    Validator for GPU memory types in step plans.

    This validator ensures that all declared GPU memory types are compatible
    with available hardware, assigns valid GPU device IDs to steps requiring
    GPU memory using the centralized GPU scheduler registry, and fails loudly
    if no suitable GPU is available.

    Key principles:
    1. All declared GPU memory types must be validated
    2. Steps requiring GPU memory must be assigned a valid GPU device ID via the scheduler
    3. Validation must fail loudly if required GPU hardware is unavailable
    4. No inference or mutation of declared memory types is allowed
    5. GPU assignment must be thread-safe and respect concurrency limits
    """

    @staticmethod
    def validate_step_plans(
        step_plans: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Validate GPU memory types in step plans and assign GPU IDs.

        This method checks each step plan for GPU memory types and
        assigns a GPU ID to the step plan if needed. The GPU ID is
        assigned during planning/compilation, not during execution.

        Args:
            step_plans: Dictionary mapping step IDs to step plans

        Returns:
            Dictionary mapping step IDs to dictionaries containing GPU assignments

        Raises:
            ValueError: If required GPU libraries are missing, the GPU registry
                is inaccessible, or no GPUs are available
        """
        # Collect every declared GPU memory type. A non-empty set doubles as
        # the "this pipeline needs a GPU" signal, so no separate boolean flag
        # is required.
        required_libraries = set()
        for step_plan in step_plans.values():
            for key in ('input_memory_type', 'output_memory_type'):
                memory_type = step_plan.get(key)
                if memory_type in VALID_GPU_MEMORY_TYPES:
                    required_libraries.add(memory_type)

        # If no step requires GPU, return empty assignments
        if not required_libraries:
            return {}

        # Validate that required libraries are installed
        _validate_required_libraries(required_libraries)

        # Get GPU registry status (lazy import to avoid circular dependency)
        try:
            from openhcs.core.orchestrator.gpu_scheduler import get_gpu_registry_status
            gpu_registry = get_gpu_registry_status()
            logger.info("GPU registry status: %s", gpu_registry)
        except Exception as e:
            raise ValueError(f"🔥 COMPILATION FAILED: Cannot access GPU registry: {e}. GPU functions require initialized GPU registry!") from e

        if not gpu_registry:
            raise ValueError(
                "🔥 COMPILATION FAILED: No GPUs available in registry but pipeline contains GPU-decorated functions (@torch, @cupy, etc.)!"
            )

        # Assign the first registered GPU (actual load tracking was orphaned).
        # The same GPU ID goes to every GPU step so the whole pipeline keeps
        # GPU affinity; assignment happens at compilation time, not runtime.
        gpu_id = next(iter(gpu_registry))

        # Assign GPU ID to step plans only, not to context.
        gpu_assignments = {}
        for step_id, step_plan in step_plans.items():
            input_memory_type = step_plan.get('input_memory_type')
            output_memory_type = step_plan.get('output_memory_type')

            if (input_memory_type in VALID_GPU_MEMORY_TYPES or
                    output_memory_type in VALID_GPU_MEMORY_TYPES):
                step_plan['gpu_id'] = gpu_id
                gpu_assignments[step_id] = {"gpu_id": gpu_id}

                # Log assignment for debugging
                logger.debug(
                    "Step %s assigned gpu_id %s for memory types: %s/%s",
                    step_id, gpu_id, input_memory_type, output_memory_type
                )

        return gpu_assignments