Coverage for openhcs/core/orchestrator/gpu_scheduler.py: 65.9%
62 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2GPU scheduler registry for OpenHCS.
4This module provides a thread-safe registry for GPU assignment during
5multi-pipeline execution, enforcing maximum concurrency limits per GPU
6and ensuring deterministic GPU assignment without runtime fallbacks.
8The GPU registry is a global singleton that must be initialized exactly once
9during application startup, before any pipeline threads are created. It is
10shared across all threads to ensure consistent GPU resource management.
12Thread Safety:
13 All functions in this module are thread-safe and use a lock to ensure
14 consistent access to the global registry.
16Doctrinal Clauses:
17- Clause 12 — Absolute Clean Execution
18- Clause 88 — No Inferred Capabilities
19- Clause 293 — Predeclared GPU Availability
20- Clause 295 — GPU Scheduling Affinity
21"""
23import logging
24import math
25import os
26import threading
27from typing import Dict, List, Optional
29# DEFAULT_NUM_WORKERS removed
30from openhcs.core.lazy_gpu_imports import check_gpu_capability, check_installed_gpu_libraries
31import os
32# Import necessary config classes
33from openhcs.core.config import GlobalPipelineConfig
36logger = logging.getLogger(__name__) # Ensure logger is consistently named if used across module
38# Thread-safe lock for GPU registry access
39# Use RLock (reentrant lock) to allow same thread to acquire lock multiple times
40# This prevents deadlocks when gc.collect() triggers __del__ methods that access GPU registry
41_registry_lock = threading.RLock()
43# GPU registry singleton
44# Structure: {gpu_id: {"max_pipelines": int}}
45# Simplified: removed unused "active" count since no runtime coordination exists
46GPU_REGISTRY: Dict[int, Dict[str, int]] = {}
48# Flag to track if the registry has been initialized
49_registry_initialized = False
def initialize_gpu_registry(configured_num_workers: int) -> None:
    """
    Initialize the global GPU registry singleton.

    Detects available GPUs and derives the per-GPU concurrency cap from the
    machine's CPU count, falling back to ``configured_num_workers`` when
    ``os.cpu_count()`` is unavailable. Must be called exactly once during
    application startup, before any pipeline threads are created; the
    registry is shared across all threads afterwards.

    Args:
        configured_num_workers (int): Worker count from the global
            configuration, used as a fallback if ``os.cpu_count()``
            returns None.

    Thread-safe: all registry state is read and mutated under the module lock.

    Raises:
        RuntimeError: If the registry has already been initialized.
    """
    global GPU_REGISTRY, _registry_initialized

    with _registry_lock:
        # Re-initialization during execution is a doctrinal violation (Clause 295).
        if _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry already initialized. "
                "Cannot reinitialize during execution."
            )

        detected = _detect_available_gpus()
        logger.info(f"Detected GPUs: {detected}")

        if not detected:
            # No GPUs: mark initialization complete with an empty registry so
            # the rest of the system can proceed CPU-only.
            logger.warning("No GPUs detected. GPU memory types will not be available.")
            _registry_initialized = True
            GPU_REGISTRY.clear()
            return

        # CPU thread count is a proxy for total pipeline concurrency;
        # guarantee a positive value.
        cpu_slots = os.cpu_count() or configured_num_workers
        cpu_slots = max(cpu_slots, 1)

        # Spread the CPU budget evenly across GPUs, rounding up.
        per_gpu_cap = math.ceil(cpu_slots / len(detected))

        GPU_REGISTRY.clear()
        GPU_REGISTRY.update(
            {gpu_id: {"max_pipelines": per_gpu_cap} for gpu_id in detected}
        )

        logger.info(
            "GPU registry initialized with %s GPUs. Maximum %s pipelines per GPU.",
            len(detected), per_gpu_cap
        )

        # Mark registry as initialized only after it is fully populated.
        _registry_initialized = True
def _detect_available_gpus() -> List[int]:
    """
    Detect available GPUs across all supported frameworks.

    Probes each GPU library (cupy, torch, tensorflow, jax) via
    check_gpu_capability and collects the device IDs they report.

    Returns:
        Sorted list of available GPU IDs. Empty when detection is skipped
        (subprocess mode) or no library reports a usable device.
    """
    # Skip GPU detection if in subprocess mode
    if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
        return []

    available_gpus = set()

    # Check each GPU library; a failing probe is logged at debug level and
    # ignored so one broken framework cannot abort detection for the others.
    for lib_name in ['cupy', 'torch', 'tensorflow', 'jax']:
        try:
            gpu_id = check_gpu_capability(lib_name)
            if gpu_id is not None:
                available_gpus.add(gpu_id)
        except Exception as e:
            # Lazy %-style args avoid formatting when debug logging is off.
            logger.debug("%s GPU detection failed: %s", lib_name.capitalize(), e)

    # sorted() accepts any iterable directly; no intermediate list needed.
    return sorted(available_gpus)
143# NOTE: acquire_gpu_slot() and release_gpu_slot() functions removed
144# These were orphaned code that was never actually called in the execution path.
145# GPU assignment happens at compilation time via GPUMemoryTypeValidator, not at runtime.
def get_gpu_registry_status() -> Dict[int, Dict[str, int]]:
    """
    Return a snapshot of the current GPU registry.

    Thread-safe: the registry lock is held while the snapshot is built.

    Returns:
        A copy of the registry ({gpu_id: {"max_pipelines": n}}) so callers
        cannot mutate the shared global state.

    Raises:
        RuntimeError: If the GPU registry is not initialized.
    """
    with _registry_lock:
        # Fail loud if the registry was never set up.
        if not _registry_initialized:
            raise RuntimeError(
                "Clause 295 Violation: GPU registry not initialized. "
                "Must call initialize_gpu_registry() first."
            )

        # Build a per-entry copy to prevent external modification.
        snapshot = {}
        for gpu_id, info in GPU_REGISTRY.items():
            snapshot[gpu_id] = dict(info)
        return snapshot
def is_gpu_registry_initialized() -> bool:
    """
    Report whether the GPU registry has been initialized.

    Thread-safe: the initialization flag is read under the registry lock.

    Returns:
        True if initialize_gpu_registry() has completed, False otherwise.
    """
    with _registry_lock:
        initialized = _registry_initialized
    return initialized
def setup_global_gpu_registry(global_config: Optional[GlobalPipelineConfig] = None) -> None:
    """
    Initialize the global GPU registry from the given or default configuration.

    Intended to be called once at application startup. Derives the worker
    count from the GlobalPipelineConfig and delegates to
    initialize_gpu_registry(), which is itself thread-safe and single-shot.

    Args:
        global_config (Optional[GlobalPipelineConfig]): Pre-loaded global
            configuration. When None, a default GlobalPipelineConfig is used.
    """
    # initialize_gpu_registry() performs its own locked check-and-set, so a
    # plain pre-check here is enough to make repeated setup calls a no-op.
    if is_gpu_registry_initialized():
        logger.info("GPU registry is already initialized. Skipping setup.")
        return

    config_to_use: GlobalPipelineConfig
    if global_config is not None:
        config_to_use = global_config
    else:
        logger.info("No global_config provided to setup_global_gpu_registry, using default configuration.")
        config_to_use = GlobalPipelineConfig()

    # FAIL LOUD: no try/except — initialization errors propagate to the caller.
    initialize_gpu_registry(configured_num_workers=config_to_use.num_workers)
    logger.info("Global GPU registry setup complete via setup_global_gpu_registry.")