Coverage for openhcs/core/orchestrator/gpu_scheduler.py: 60.4%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2GPU scheduler registry for OpenHCS. 

3 

4This module provides a thread-safe registry for GPU assignment during 

5multi-pipeline execution, enforcing maximum concurrency limits per GPU 

6and ensuring deterministic GPU assignment without runtime fallbacks. 

7 

8The GPU registry is a global singleton that must be initialized exactly once 

9during application startup, before any pipeline threads are created. It is 

10shared across all threads to ensure consistent GPU resource management. 

11 

12Thread Safety: 

13 All functions in this module are thread-safe and use a lock to ensure 

14 consistent access to the global registry. 

15 

16Doctrinal Clauses: 

17- Clause 12 — Absolute Clean Execution 

18- Clause 88 — No Inferred Capabilities 

19- Clause 293 — Predeclared GPU Availability 

20- Clause 295 — GPU Scheduling Affinity 

21""" 

22 

23import logging 

24import math 

25import os 

26import threading 

27from typing import Dict, List, Optional 

28 

29# DEFAULT_NUM_WORKERS removed 

30from openhcs.core.memory.gpu_utils import (check_cupy_gpu_available, 

31 check_jax_gpu_available, 

32 check_tf_gpu_available, 

33 check_torch_gpu_available) 

34# Import necessary config classes 

35from openhcs.core.config import GlobalPipelineConfig, get_default_global_config 

36 

37 

# Module-level logger; named after the module so records are attributable
# when this module is used across the application.
logger = logging.getLogger(__name__)  # Ensure logger is consistently named if used across module

# Thread-safe lock guarding every read/write of the registry state below.
_registry_lock = threading.Lock()

# GPU registry singleton.
# Structure: {gpu_id: {"max_pipelines": int}}
# Simplified: removed unused "active" count since no runtime coordination exists
GPU_REGISTRY: Dict[int, Dict[str, int]] = {}

# Flag to track if the registry has been initialized; initialize_gpu_registry()
# raises if it is already True, so initialization happens at most once.
_registry_initialized = False

50 

51 

def initialize_gpu_registry(configured_num_workers: int) -> None:
    """
    Initialize the GPU registry based on available GPUs and configured number of workers.

    This function detects available GPUs, calculates the maximum number of
    concurrent pipelines per GPU (influenced by `configured_num_workers`),
    and initializes the GPU registry.

    Args:
        configured_num_workers (int): The number of workers specified in the
                                      global configuration, used as a fallback
                                      if os.cpu_count() is not available or to
                                      influence pipelines per GPU.

    Must be called exactly once during application startup, before any
    pipeline threads are created. The registry is a global singleton
    shared across all threads.

    If no GPUs are detected, a warning is logged, the registry is left empty,
    and the function returns normally (GPU memory types are simply unavailable).

    Thread-safe: Uses a lock to ensure consistent access to the global registry.

    Raises:
        RuntimeError: If the registry has already been initialized.
    """
    global GPU_REGISTRY, _registry_initialized

    with _registry_lock:
        # Fail loudly on double initialization rather than silently rebuilding
        # state that other threads may already rely on.
        if _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry already initialized. "
                "Cannot reinitialize during execution."
            )

        # Detect available GPUs across all supported frameworks.
        available_gpus = _detect_available_gpus()
        logger.info(f"Detected GPUs: {available_gpus}")

        if not available_gpus:
            # Note: this is NOT an error — the application can run CPU-only.
            logger.warning("No GPUs detected. GPU memory types will not be available.")
            _registry_initialized = True
            GPU_REGISTRY.clear()
            return

        # Use CPU count as a proxy for maximum concurrency, falling back to the
        # configured worker count when os.cpu_count() returns None.
        max_cpu_threads = os.cpu_count() or configured_num_workers
        if max_cpu_threads <= 0:  # Ensure positive
            max_cpu_threads = 1

        # Spread the CPU-thread budget evenly across GPUs, rounding up so
        # every GPU can host at least one pipeline.
        max_pipelines_per_gpu = math.ceil(max_cpu_threads / len(available_gpus))

        # Initialize registry
        GPU_REGISTRY.clear()
        for gpu_id in available_gpus:
            GPU_REGISTRY[gpu_id] = {"max_pipelines": max_pipelines_per_gpu}

        logger.info(
            "GPU registry initialized with %s GPUs. Maximum %s pipelines per GPU.",
            len(available_gpus), max_pipelines_per_gpu
        )

        # Mark registry as initialized
        _registry_initialized = True

116 

117 

def _detect_available_gpus() -> List[int]:
    """
    Detect available GPUs across all supported frameworks.

    Returns:
        List of available GPU IDs
    """
    found = set()

    # Best-effort probes: a failure in one framework must not prevent
    # detection through the others, so each probe is isolated.
    probes = (
        (check_cupy_gpu_available, "Cupy GPU detection failed: %s"),
        (check_torch_gpu_available, "Torch GPU detection failed: %s"),
        (check_tf_gpu_available, "TensorFlow GPU detection failed: %s"),
    )
    for probe, failure_msg in probes:
        try:
            gpu_id = probe()
            if gpu_id is not None:
                found.add(gpu_id)
        except Exception as e:
            logger.debug(failure_msg, e)

    # Skip JAX GPU detection to prevent thread explosion
    # JAX creates 54+ threads during jax.devices() call
    # JAX GPU detection will be done lazily only when JAX functions are actually used
    # TODO: Add JAX GPU detection back when we have proper lazy initialization
    logger.debug("Skipping JAX GPU detection to prevent thread explosion")

    return sorted(found)

158 

159 

160# NOTE: acquire_gpu_slot() and release_gpu_slot() functions removed 

161# These were orphaned code that was never actually called in the execution path. 

162# GPU assignment happens at compilation time via GPUMemoryTypeValidator, not at runtime. 

163 

164 

def get_gpu_registry_status() -> Dict[int, Dict[str, int]]:
    """
    Get the current status of the GPU registry.

    Thread-safe: Uses a lock to ensure consistent access to the global registry.

    Returns:
        Copy of the GPU registry

    Raises:
        RuntimeError: If the GPU registry is not initialized
    """
    with _registry_lock:
        # Reading before initialization is a sequencing bug — fail loudly.
        if not _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry not initialized. "
                "Must call initialize_gpu_registry() first."
            )

        # Deep-enough copy (per-GPU dicts included) so callers cannot
        # mutate the shared registry through the returned mapping.
        snapshot: Dict[int, Dict[str, int]] = {}
        for gpu_id, info in GPU_REGISTRY.items():
            snapshot[gpu_id] = info.copy()
        return snapshot

188 

189 

def is_gpu_registry_initialized() -> bool:
    """
    Check if the GPU registry has been initialized.

    Thread-safe: Uses a lock to ensure consistent access to the initialization flag.

    Returns:
        True if the registry is initialized, False otherwise
    """
    # Snapshot the flag under the lock, then release before returning.
    with _registry_lock:
        initialized = _registry_initialized
    return initialized

201 

202 

def setup_global_gpu_registry(global_config: Optional[GlobalPipelineConfig] = None) -> None:
    """
    Initializes the global GPU registry using the provided or default global configuration.

    This function should be called once at application startup. It ensures that the
    GPU registry is initialized with worker configurations derived from the
    GlobalPipelineConfig.

    Args:
        global_config (Optional[GlobalPipelineConfig]): An optional pre-loaded global
            configuration object. If None, the default global configuration will be used.
    """
    # initialize_gpu_registry() performs its own locked check-and-set of the
    # initialized flag; this early exit only makes repeated setup a no-op.
    if is_gpu_registry_initialized():
        logger.info("GPU registry is already initialized. Skipping setup.")
        return

    # Prefer the caller's configuration; otherwise fall back to the default.
    config_to_use: GlobalPipelineConfig
    if global_config is not None:
        config_to_use = global_config
    else:
        logger.info("No global_config provided to setup_global_gpu_registry, using default configuration.")
        config_to_use = get_default_global_config()

    # FAIL LOUD: no try-except — initialization errors bubble up to the caller.
    initialize_gpu_registry(configured_num_workers=config_to_use.num_workers)
    logger.info("Global GPU registry setup complete via setup_global_gpu_registry.")