Coverage for openhcs/core/pipeline/gpu_memory_validator.py: 24.0% (62 statements).
Report generated by coverage.py v7.10.3 at 2025-08-14 05:57 +0000.

1""" 

2GPU memory type validator for OpenHCS. 

3 

4This module provides the GPUMemoryTypeValidator class, which is responsible for 

5validating GPU memory types and assigning GPU IDs to steps requiring GPU memory. 

6 

7Doctrinal Clauses: 

8- Clause 66 — Immutability After Construction 

9- Clause 88 — No Inferred Capabilities 

10- Clause 293 — GPU Pre-Declaration Enforcement 

11- Clause 295 — GPU Scheduling Affinity 

12""" 

13 

14import logging 

15from typing import Any, Dict 

16 

17from openhcs.constants.constants import VALID_GPU_MEMORY_TYPES 

18from openhcs.core.utils import optional_import 

19 

20# LAZY IMPORT: Import gpu_scheduler only when needed to avoid circular dependency 

21# from openhcs.core.orchestrator.gpu_scheduler import get_gpu_registry_status 

22 

23logger = logging.getLogger(__name__) 

24 

25 

def _validate_required_libraries(required_libraries: set) -> None:
    """
    Validate that required GPU libraries are installed.

    Args:
        required_libraries: Set of memory-type names (e.g. "cupy", "torch")
            that require the corresponding library to be importable. Memory
            types outside the known GPU libraries are intentionally ignored.

    Raises:
        ValueError: If any required library is not installed.
    """
    # Known GPU memory types whose backing library must be importable.
    # Iterating this tuple (rather than the input set) keeps the error
    # message order deterministic.
    checked_libraries = ("cupy", "torch", "tensorflow", "jax")

    missing_libraries = [
        library
        for library in checked_libraries
        if library in required_libraries and optional_import(library) is None
    ]

    if missing_libraries:
        raise ValueError(
            f"🔥 COMPILATION FAILED: Required GPU libraries not installed: {', '.join(missing_libraries)}. "
            f"Pipeline contains functions decorated with @{'/'.join(missing_libraries)}_func but the corresponding "
            f"libraries are not available. Install the missing libraries or remove the functions from your pipeline."
        )

62 

63 

class GPUMemoryTypeValidator:
    """
    Validator for GPU memory types in step plans.

    This validator ensures that all declared GPU memory types are compatible
    with available hardware, assigns valid GPU device IDs to steps requiring
    GPU memory using the centralized GPU scheduler registry, and fails loudly
    if no suitable GPU is available.

    Key principles:
    1. All declared GPU memory types must be validated
    2. Steps requiring GPU memory must be assigned a valid GPU device ID via the scheduler
    3. Validation must fail loudly if required GPU hardware is unavailable
    4. No inference or mutation of declared memory types is allowed
    5. GPU assignment must be thread-safe and respect concurrency limits
    """

    @staticmethod
    def validate_step_plans(
        step_plans: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Dict[str, Any]]:
        """
        Validate GPU memory types in step plans and assign GPU IDs.

        This method checks each step plan for GPU memory types and
        assigns a GPU ID to the step plan if needed. The GPU ID is
        assigned during planning/compilation, not during execution.

        Args:
            step_plans: Dictionary mapping step IDs to step plans. Step plans
                declaring a GPU memory type are mutated in place to carry a
                'gpu_id' entry.

        Returns:
            Dictionary mapping step IDs to dictionaries containing GPU assignments

        Raises:
            ValueError: If required GPU libraries are missing, the GPU registry
                is inaccessible, or no GPUs are available.
        """
        # Collect every declared GPU memory type. A non-empty set means at
        # least one step requires a GPU, so a separate boolean flag is not
        # needed.
        required_libraries = set()
        for step_plan in step_plans.values():
            for memory_type in (step_plan.get('input_memory_type'),
                                step_plan.get('output_memory_type')):
                if memory_type in VALID_GPU_MEMORY_TYPES:
                    required_libraries.add(memory_type)

        # If no step requires GPU, return empty assignments
        if not required_libraries:
            return {}

        # Validate that required libraries are installed (fails loudly)
        _validate_required_libraries(required_libraries)

        # Get GPU registry status (lazy import to avoid circular dependency)
        try:
            from openhcs.core.orchestrator.gpu_scheduler import get_gpu_registry_status
            gpu_registry = get_gpu_registry_status()
            logger.info("GPU registry status: %s", gpu_registry)
        except Exception as e:
            raise ValueError(f"🔥 COMPILATION FAILED: Cannot access GPU registry: {e}. GPU functions require initialized GPU registry!") from e

        if not gpu_registry:
            raise ValueError(
                "🔥 COMPILATION FAILED: No GPUs available in registry but pipeline contains GPU-decorated functions (@torch, @cupy, etc.)!"
            )

        # Assign the first available GPU to every GPU step. Assignment happens
        # at compilation time, not runtime, and sharing one device across the
        # pipeline preserves GPU affinity (Clause 295).
        gpu_id = next(iter(gpu_registry))

        # GPU ID is written to step plans only, not to any wider context.
        gpu_assignments = {}
        for step_id, step_plan in step_plans.items():
            input_memory_type = step_plan.get('input_memory_type')
            output_memory_type = step_plan.get('output_memory_type')

            if (input_memory_type in VALID_GPU_MEMORY_TYPES or
                    output_memory_type in VALID_GPU_MEMORY_TYPES):
                step_plan['gpu_id'] = gpu_id
                gpu_assignments[step_id] = {"gpu_id": gpu_id}

                # Log assignment for debugging
                logger.debug(
                    "Step %s assigned gpu_id %s for memory types: %s/%s",
                    step_id, gpu_id, input_memory_type, output_memory_type
                )

        return gpu_assignments