Coverage for openhcs/core/config_cache.py: 0.0%

101 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

"""
Unified Global Configuration Cache System

Provides shared configuration caching logic with pluggable execution strategies
for different UI frameworks (async for TUI, Qt threading for PyQt).
"""

import logging
import dill as pickle
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, Callable, Any
from concurrent.futures import ThreadPoolExecutor

from openhcs.core.config import GlobalPipelineConfig, get_default_global_config

logger = logging.getLogger(__name__)

class CacheExecutionStrategy(ABC):
    """Abstract strategy for executing cache operations."""

    @abstractmethod
    async def execute_load(self, cache_file: Path) -> Optional[GlobalPipelineConfig]:
        """Execute cache load operation."""
        pass

    @abstractmethod
    async def execute_save(self, config: GlobalPipelineConfig, cache_file: Path) -> bool:
        """Execute cache save operation."""
        pass


class AsyncExecutionStrategy(CacheExecutionStrategy):
    """Async execution strategy for TUI."""

    async def execute_load(self, cache_file: Path) -> Optional[GlobalPipelineConfig]:
        import asyncio
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _sync_load_config, cache_file)

    async def execute_save(self, config: GlobalPipelineConfig, cache_file: Path) -> bool:
        import asyncio
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _sync_save_config, config, cache_file)

class QtExecutionStrategy(CacheExecutionStrategy):
    """Qt threading execution strategy for PyQt GUI."""

    def __init__(self, thread_pool=None):
        self.thread_pool = thread_pool or ThreadPoolExecutor(max_workers=2)

    async def execute_load(self, cache_file: Path) -> Optional[GlobalPipelineConfig]:
        # Convert to sync for Qt integration
        return _sync_load_config(cache_file)

    async def execute_save(self, config: GlobalPipelineConfig, cache_file: Path) -> bool:
        # Convert to sync for Qt integration
        return _sync_save_config(config, cache_file)
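# Illustrative sketch (not part of the original module): QtExecutionStrategy's
# coroutines never await anything, so a PyQt caller can drive them to completion
# with a plain asyncio.run(), e.g. inside a worker submitted to its thread pool.
# "some_cache_file" below is a hypothetical Path supplied by the caller.
#
#     import asyncio
#     strategy = QtExecutionStrategy()
#     config = asyncio.run(strategy.execute_load(some_cache_file))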

def _sync_load_config(cache_file: Path) -> Optional[GlobalPipelineConfig]:
    """Synchronous config loading implementation."""
    try:
        if not cache_file.exists():
            return None

        with open(cache_file, 'rb') as f:
            config = pickle.load(f)

        if isinstance(config, GlobalPipelineConfig):
            logger.debug(f"Loaded cached config from: {cache_file}")
            return config
        else:
            logger.warning(f"Invalid config type in cache: {type(config)}")
            return None

    except pickle.PickleError as e:
        logger.warning(f"Failed to unpickle cached config: {e}")
        return None
    except Exception as e:
        logger.warning(f"Failed to load cached config: {e}")
        return None

def _sync_save_config(config: GlobalPipelineConfig, cache_file: Path) -> bool:
    """Synchronous config saving implementation."""
    try:
        # Ensure cache directory exists
        cache_file.parent.mkdir(parents=True, exist_ok=True)

        with open(cache_file, 'wb') as f:
            pickle.dump(config, f)

        logger.debug(f"Saved config to cache: {cache_file}")
        return True

    except Exception as e:
        logger.error(f"Failed to save config to cache: {e}")
        return False
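# Editorial note: "pickle" in the two helpers above is dill, imported as pickle
# at the top of the module, so cached configs may contain objects the stdlib
# pickler rejects (e.g. lambdas or locally defined callables) and still
# round-trip through this cache.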

class UnifiedGlobalConfigCache:
    """
    Unified global configuration cache with pluggable execution strategies.

    Supports both async (TUI) and Qt threading (PyQt) execution patterns
    while sharing the core caching logic.
    """

    def __init__(self, cache_file: Optional[Path] = None, strategy: Optional[CacheExecutionStrategy] = None):
        if cache_file is None:
            from openhcs.core.xdg_paths import get_config_file_path
            cache_file = get_config_file_path("global_config.config")

        self.cache_file = cache_file
        self.strategy = strategy or AsyncExecutionStrategy()
        logger.debug(f"UnifiedGlobalConfigCache initialized with cache file: {self.cache_file}")

    async def load_cached_config(self) -> Optional[GlobalPipelineConfig]:
        """Load cached global config from disk."""
        return await self.strategy.execute_load(self.cache_file)

    async def save_config_to_cache(self, config: GlobalPipelineConfig) -> bool:
        """Save global config to cache."""
        return await self.strategy.execute_save(config, self.cache_file)

    async def clear_cache(self) -> bool:
        """Clear cached config by removing the cache file."""
        try:
            if self.cache_file.exists():
                self.cache_file.unlink()
                logger.info(f"Cleared config cache: {self.cache_file}")
            return True
        except Exception as e:
            logger.error(f"Failed to clear config cache: {e}")
            return False
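# Illustrative sketch (not part of the original module): driving the cache from
# an asyncio context with the default AsyncExecutionStrategy.
#
#     import asyncio
#
#     async def _startup():
#         cache = UnifiedGlobalConfigCache()          # async strategy by default
#         config = await cache.load_cached_config()   # None on a cold cache
#         if config is None:
#             config = get_default_global_config()
#         await cache.save_config_to_cache(config)
#         return config
#
#     config = asyncio.run(_startup())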

# Global instance for easy access
_global_config_cache: Optional[UnifiedGlobalConfigCache] = None


def get_global_config_cache(strategy: Optional[CacheExecutionStrategy] = None) -> UnifiedGlobalConfigCache:
    """Get global config cache instance with optional strategy."""
    global _global_config_cache
    if _global_config_cache is None or (strategy and _global_config_cache.strategy != strategy):
        _global_config_cache = UnifiedGlobalConfigCache(strategy=strategy)
    return _global_config_cache

async def load_cached_global_config(strategy: Optional[CacheExecutionStrategy] = None) -> GlobalPipelineConfig:
    """
    Load global config with cache fallback.

    Args:
        strategy: Optional execution strategy (defaults to async)

    Returns:
        GlobalPipelineConfig (cached or default)
    """
    try:
        cache = get_global_config_cache(strategy)
        cached_config = await cache.load_cached_config()
        if cached_config is not None:
            logger.info("Using cached global configuration")
            return cached_config
    except Exception as e:
        logger.warning(f"Failed to load cached config, using defaults: {e}")

    # Fallback to default config
    logger.info("Using default global configuration")
    return get_default_global_config()
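# Illustrative call sites (hypothetical, not defined in this module):
#     config = await load_cached_global_config()                        # TUI default
#     config = await load_cached_global_config(QtExecutionStrategy())   # PyQt GUI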

def load_cached_global_config_sync() -> GlobalPipelineConfig:
    """
    Synchronous version for startup scenarios.

    Returns:
        GlobalPipelineConfig (cached or default)
    """
    try:
        from openhcs.core.xdg_paths import get_config_file_path
        cache_file = get_config_file_path("global_config.config")
        cached_config = _sync_load_config(cache_file)
        if cached_config is not None:
            logger.info("Using cached global configuration")
            return cached_config
    except Exception as e:
        logger.warning(f"Failed to load cached config, using defaults: {e}")

    # Fallback to default config
    logger.info("Using default global configuration")
    return get_default_global_config()
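A minimal usage sketch, assuming the module is importable as openhcs.core.config_cache (the path shown in the report header); persist and saved are illustrative names, not part of the module:

import asyncio

from openhcs.core.config_cache import (
    get_global_config_cache,
    load_cached_global_config_sync,
)

# Before any event loop exists (e.g. at process startup):
config = load_cached_global_config_sync()


async def persist(edited_config):
    """Persist an updated config from an async context (e.g. the TUI)."""
    cache = get_global_config_cache()  # default AsyncExecutionStrategy
    return await cache.save_config_to_cache(edited_config)


saved = asyncio.run(persist(config))  # True on success, False otherwise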