Coverage for openhcs/core/orchestrator/gpu_scheduler.py: 60.4%
71 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
"""
GPU scheduler registry for OpenHCS.

This module provides a thread-safe registry for GPU assignment during
multi-pipeline execution, enforcing maximum concurrency limits per GPU
and ensuring deterministic GPU assignment without runtime fallbacks.

The GPU registry is a global singleton that must be initialized exactly once
during application startup, before any pipeline threads are created. It is
shared across all threads to ensure consistent GPU resource management.

Thread Safety:
    All functions in this module are thread-safe and use a lock to ensure
    consistent access to the global registry.

Doctrinal Clauses:
- Clause 12 — Absolute Clean Execution
- Clause 88 — No Inferred Capabilities
- Clause 293 — Predeclared GPU Availability
- Clause 295 — GPU Scheduling Affinity
"""
23import logging
24import math
25import os
26import threading
27from typing import Dict, List, Optional
29# DEFAULT_NUM_WORKERS removed
30from openhcs.core.memory.gpu_utils import (check_cupy_gpu_available,
31 check_jax_gpu_available,
32 check_tf_gpu_available,
33 check_torch_gpu_available)
34# Import necessary config classes
35from openhcs.core.config import GlobalPipelineConfig, get_default_global_config
# Module-level logger; named after the module for consistent log routing.
logger = logging.getLogger(__name__)

# Lock guarding every read/write of the registry state below.
_registry_lock = threading.Lock()

# GPU registry singleton.
# Structure: {gpu_id: {"max_pipelines": int}}
# Simplified: the unused "active" count was removed since no runtime
# coordination exists.
GPU_REGISTRY: Dict[int, Dict[str, int]] = {}

# Set to True exactly once, by initialize_gpu_registry().
_registry_initialized = False
def initialize_gpu_registry(configured_num_workers: int) -> None:
    """
    Initialize the GPU registry based on available GPUs and the configured
    number of workers.

    Detects available GPUs, derives the maximum number of concurrent
    pipelines per GPU (influenced by ``configured_num_workers``), and
    populates the global registry.

    Args:
        configured_num_workers (int): The worker count from the global
            configuration; used as a fallback when ``os.cpu_count()`` is
            unavailable, and thereby influences pipelines per GPU.

    Must be called exactly once during application startup, before any
    pipeline threads are created. The registry is a global singleton
    shared across all threads.

    Thread-safe: serialized through the module-level registry lock.

    Raises:
        RuntimeError: If the registry is already initialized.
    """
    global GPU_REGISTRY, _registry_initialized

    with _registry_lock:
        # Re-initialization during execution is a doctrinal violation.
        if _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry already initialized. "
                "Cannot reinitialize during execution."
            )

        detected = _detect_available_gpus()
        logger.info(f"Detected GPUs: {detected}")

        if not detected:
            # No GPUs: mark initialized with an empty registry so status
            # queries still work; GPU memory types simply stay unavailable.
            logger.warning("No GPUs detected. GPU memory types will not be available.")
            _registry_initialized = True
            GPU_REGISTRY.clear()
            return

        # CPU count is a proxy for maximum concurrency; fall back to the
        # configured worker count when os.cpu_count() returns None, and
        # clamp to at least one thread.
        cpu_threads = os.cpu_count() or configured_num_workers
        if cpu_threads <= 0:
            cpu_threads = 1

        # Spread the CPU-thread budget evenly across the detected GPUs.
        per_gpu = math.ceil(cpu_threads / len(detected))

        GPU_REGISTRY.clear()
        GPU_REGISTRY.update({gpu_id: {"max_pipelines": per_gpu} for gpu_id in detected})

        logger.info(
            "GPU registry initialized with %s GPUs. Maximum %s pipelines per GPU.",
            len(detected), per_gpu
        )

        # Mark registry as initialized only after it is fully populated.
        _registry_initialized = True
def _detect_available_gpus() -> List[int]:
    """
    Detect available GPUs across all supported frameworks.

    Returns:
        Sorted list of available GPU IDs.
    """
    found = set()

    # Probe each framework in turn. A probe returning None means no GPU;
    # a probe raising is logged at debug level and otherwise ignored, so
    # one broken framework install cannot block detection via the others.
    probes = (
        ("Cupy GPU detection failed: %s", check_cupy_gpu_available),
        ("Torch GPU detection failed: %s", check_torch_gpu_available),
        ("TensorFlow GPU detection failed: %s", check_tf_gpu_available),
    )
    for failure_msg, probe in probes:
        try:
            gpu_id = probe()
        except Exception as e:
            logger.debug(failure_msg, e)
            continue
        if gpu_id is not None:
            found.add(gpu_id)

    # Skip JAX GPU detection to prevent thread explosion:
    # JAX creates 54+ threads during jax.devices() call.
    # JAX GPU detection will be done lazily only when JAX functions are actually used.
    # TODO: Add JAX GPU detection back when we have proper lazy initialization
    logger.debug("Skipping JAX GPU detection to prevent thread explosion")

    return sorted(found)
# NOTE: acquire_gpu_slot() and release_gpu_slot() functions removed.
# These were orphaned code that was never actually called in the execution path:
# GPU assignment happens at compilation time via GPUMemoryTypeValidator, not at runtime.
def get_gpu_registry_status() -> Dict[int, Dict[str, int]]:
    """
    Get the current status of the GPU registry.

    Thread-safe: serialized through the module-level registry lock.

    Returns:
        A copy of the GPU registry (per-GPU info dicts are copied too),
        so callers cannot mutate the shared global state.

    Raises:
        RuntimeError: If the GPU registry is not initialized.
    """
    with _registry_lock:
        # Initialization is a hard precondition — fail loud, never guess.
        if not _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry not initialized. "
                "Must call initialize_gpu_registry() first."
            )

        # Build a fresh snapshot rather than handing out live references.
        snapshot: Dict[int, Dict[str, int]] = {}
        for gpu_id, info in GPU_REGISTRY.items():
            snapshot[gpu_id] = dict(info)
        return snapshot
def is_gpu_registry_initialized() -> bool:
    """
    Report whether the GPU registry has been initialized.

    Thread-safe: the flag is read under the module-level registry lock.

    Returns:
        True if the registry is initialized, False otherwise.
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized
def setup_global_gpu_registry(global_config: Optional[GlobalPipelineConfig] = None) -> None:
    """
    Initialize the global GPU registry using the provided or default
    global configuration.

    Should be called once at application startup; a second call is a
    logged no-op. Worker configuration is derived from the
    GlobalPipelineConfig.

    Args:
        global_config (Optional[GlobalPipelineConfig]): An optional
            pre-loaded global configuration object. If None, the default
            global configuration will be used.
    """
    # initialize_gpu_registry() holds the registry lock and guards the
    # initialized flag itself; this early check only avoids redundant work.
    if is_gpu_registry_initialized():
        logger.info("GPU registry is already initialized. Skipping setup.")
        return

    config_to_use: GlobalPipelineConfig
    if global_config is not None:
        config_to_use = global_config
    else:
        logger.info("No global_config provided to setup_global_gpu_registry, using default configuration.")
        config_to_use = get_default_global_config()

    # FAIL LOUD: no try-except here — let exceptions bubble up to the caller.
    initialize_gpu_registry(configured_num_workers=config_to_use.num_workers)
    logger.info("Global GPU registry setup complete via setup_global_gpu_registry.")