Coverage for openhcs/core/orchestrator/gpu_scheduler.py: 65.9%

62 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2GPU scheduler registry for OpenHCS. 

3 

4This module provides a thread-safe registry for GPU assignment during 

5multi-pipeline execution, enforcing maximum concurrency limits per GPU 

6and ensuring deterministic GPU assignment without runtime fallbacks. 

7 

8The GPU registry is a global singleton that must be initialized exactly once 

9during application startup, before any pipeline threads are created. It is 

10shared across all threads to ensure consistent GPU resource management. 

11 

12Thread Safety: 

13 All functions in this module are thread-safe and use a lock to ensure 

14 consistent access to the global registry. 

15 

16Doctrinal Clauses: 

17- Clause 12 — Absolute Clean Execution 

18- Clause 88 — No Inferred Capabilities 

19- Clause 293 — Predeclared GPU Availability 

20- Clause 295 — GPU Scheduling Affinity 

21""" 

22 

23import logging 

24import math 

25import os 

26import threading 

27from typing import Dict, List, Optional 

28 

29# DEFAULT_NUM_WORKERS removed 

30from openhcs.core.lazy_gpu_imports import check_gpu_capability, check_installed_gpu_libraries 

31import os 

32# Import necessary config classes 

33from openhcs.core.config import GlobalPipelineConfig 

34 

35 

# Module-level logger; named after the module so log records are attributable.
logger = logging.getLogger(__name__)  # Ensure logger is consistently named if used across module

# Thread-safe lock guarding all access to the GPU registry state below.
# Use RLock (reentrant lock) to allow the same thread to acquire the lock multiple times.
# This prevents deadlocks when gc.collect() triggers __del__ methods that access the GPU registry
# while the lock is already held by the same thread.
_registry_lock = threading.RLock()

# GPU registry singleton, populated once by initialize_gpu_registry().
# Structure: {gpu_id: {"max_pipelines": int}}
# Simplified: removed unused "active" count since no runtime coordination exists —
# GPU assignment happens at compilation time, not at runtime.
GPU_REGISTRY: Dict[int, Dict[str, int]] = {}

# One-shot flag: True once initialize_gpu_registry() has run (even when no GPUs were found).
# Must only be read/written while holding _registry_lock.
_registry_initialized = False

50 

51 

def initialize_gpu_registry(configured_num_workers: int) -> None:
    """
    Build the global GPU registry from the detected GPUs and the worker budget.

    Detects available GPUs, derives a per-GPU concurrency ceiling from the
    machine's CPU count (falling back to ``configured_num_workers`` when
    ``os.cpu_count()`` is unavailable), and populates the module-level
    ``GPU_REGISTRY`` singleton.

    Args:
        configured_num_workers: Worker count from the global configuration,
            used as the CPU-thread fallback when ``os.cpu_count()`` returns
            ``None``.

    Must be invoked exactly once at application startup, before any pipeline
    threads exist; the registry is shared by every thread afterwards.

    Thread-safe: all registry mutation happens under ``_registry_lock``.

    Raises:
        RuntimeError: If the registry has already been initialized.
    """
    global GPU_REGISTRY, _registry_initialized

    with _registry_lock:
        # Re-initialization during execution is forbidden (Clause 295).
        if _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry already initialized. "
                "Cannot reinitialize during execution."
            )

        gpu_ids = _detect_available_gpus()
        logger.info(f"Detected GPUs: {gpu_ids}")

        # CPU-only host: record the (empty) state and bail out early.
        if not gpu_ids:
            logger.warning("No GPUs detected. GPU memory types will not be available.")
            _registry_initialized = True
            GPU_REGISTRY.clear()
            return

        # CPU thread budget — cpu_count() may return None, then fall back to
        # the configured worker count; clamp to at least one.
        cpu_budget = os.cpu_count() or configured_num_workers
        cpu_budget = max(cpu_budget, 1)

        # Spread the CPU budget evenly across GPUs, rounding up.
        per_gpu_limit = math.ceil(cpu_budget / len(gpu_ids))

        # Mutate the singleton in place so existing references stay valid.
        GPU_REGISTRY.clear()
        GPU_REGISTRY.update(
            {gpu_id: {"max_pipelines": per_gpu_limit} for gpu_id in gpu_ids}
        )

        logger.info(
            "GPU registry initialized with %s GPUs. Maximum %s pipelines per GPU.",
            len(gpu_ids), per_gpu_limit
        )

        _registry_initialized = True

117 

118def _detect_available_gpus() -> List[int]: 

119 """ 

120 Detect available GPUs across all supported frameworks. 

121 

122 Returns: 

123 List of available GPU IDs 

124 """ 

125 # Skip GPU detection if in subprocess mode 

126 if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1': 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true

127 return [] 

128 

129 available_gpus = set() 

130 

131 # Check each GPU library 

132 for lib_name in ['cupy', 'torch', 'tensorflow', 'jax']: 

133 try: 

134 gpu_id = check_gpu_capability(lib_name) 

135 if gpu_id is not None: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 available_gpus.add(gpu_id) 

137 except Exception as e: 

138 logger.debug(f"{lib_name.capitalize()} GPU detection failed: {e}") 

139 

140 return sorted(list(available_gpus)) 

141 

142 

143# NOTE: acquire_gpu_slot() and release_gpu_slot() functions removed 

144# These were orphaned code that was never actually called in the execution path. 

145# GPU assignment happens at compilation time via GPUMemoryTypeValidator, not at runtime. 

146 

147 

def get_gpu_registry_status() -> Dict[int, Dict[str, int]]:
    """
    Return a snapshot of the current GPU registry.

    Thread-safe: the snapshot is taken while holding ``_registry_lock``.

    Returns:
        A fresh copy of the registry ({gpu_id: {"max_pipelines": int}}), so
        callers cannot mutate the shared singleton through it.

    Raises:
        RuntimeError: If the registry has not been initialized yet.
    """
    with _registry_lock:
        if not _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry not initialized. "
                "Must call initialize_gpu_registry() first."
            )

        # Shallow-copy each per-GPU entry so external mutation cannot leak in.
        return {device: dict(entry) for device, entry in GPU_REGISTRY.items()}

171 

172 

def is_gpu_registry_initialized() -> bool:
    """
    Report whether ``initialize_gpu_registry()`` has already run.

    Thread-safe: the flag is read under ``_registry_lock``.

    Returns:
        True once the registry has been initialized, False before then.
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized

184 

185 

def setup_global_gpu_registry(global_config: Optional[GlobalPipelineConfig] = None) -> None:
    """
    Initialize the global GPU registry from a pipeline configuration.

    Intended to be called once at application startup. Derives the worker
    count from the supplied (or default) ``GlobalPipelineConfig`` and
    delegates to ``initialize_gpu_registry``. A second call is a no-op.

    Args:
        global_config: Pre-loaded global configuration. When None, a default
            ``GlobalPipelineConfig`` is constructed and used instead.
    """
    # initialize_gpu_registry() performs its own locked check-and-set of the
    # initialized flag, so a plain pre-check here is sufficient.
    if is_gpu_registry_initialized():
        logger.info("GPU registry is already initialized. Skipping setup.")
        return

    effective_config = global_config
    if effective_config is None:
        logger.info("No global_config provided to setup_global_gpu_registry, using default configuration.")
        effective_config = GlobalPipelineConfig()

    # FAIL LOUD: no try/except here — initialization errors propagate to the caller.
    initialize_gpu_registry(configured_num_workers=effective_config.num_workers)
    logger.info("Global GPU registry setup complete via setup_global_gpu_registry.")