Coverage for openhcs/io/base.py: 53.3%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1# openhcs/io/storage/backends/base.py 

2""" 

3Abstract base classes for storage backends. 

4 

5This module defines the fundamental interfaces for storage backends, 

6independent of specific implementations. It establishes the contract 

7that all storage backends must fulfill. 

8""" 

9 

10import logging 

11from abc import ABC, abstractmethod 

12from pathlib import Path 

13from typing import Any, Dict, List, Optional, Set, Type, Union, Callable 

14from functools import wraps 

15from openhcs.constants.constants import Backend 

16from openhcs.io.exceptions import StorageResolutionError 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class DataSink(ABC):
    """
    Abstract interface for anything that can receive data.

    Captures the minimal contract shared by storage backends and
    streaming destinations alike: a way to send one piece of data and a
    way to send many at once.

    Design notes (OpenHCS principles):
    - Fail-loud: errors are raised explicitly, never swallowed
    - Minimal: only the operations every destination must support
    - Generic: any kind of data destination can implement this
    """

    @abstractmethod
    def save(self, data: Any, identifier: Union[str, Path], **kwargs) -> None:
        """
        Deliver a single data object to this destination.

        Args:
            data: Object to deliver.
            identifier: Unique, path-like identifier for the data.
            **kwargs: Backend-specific options.

        Raises:
            TypeError: If ``identifier`` is not an acceptable type.
            ValueError: If the data cannot be delivered.
        """

    @abstractmethod
    def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None:
        """
        Deliver several data objects to this destination in one call.

        Args:
            data_list: Objects to deliver.
            identifiers: One identifier per object; must be the same
                length as ``data_list``.
            **kwargs: Backend-specific options.

        Raises:
            ValueError: If ``data_list`` and ``identifiers`` differ in
                length, or if any object cannot be delivered.
            TypeError: If any identifier is not an acceptable type.
        """

66 

67 

class StorageBackend(DataSink):
    """
    Abstract base class for persistent storage operations.

    Extends DataSink with retrieval capabilities and file system operations
    for backends that provide persistent storage with file-like semantics.

    Concrete implementations should use StorageBackendMeta for automatic
    registration.
    """

    # Inherits save() and save_batch() from DataSink; both are re-declared
    # below with storage-specific (path-based) documentation.

    @abstractmethod
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from a file.

        Args:
            file_path: Path to the file to load.
            **kwargs: Additional arguments for the load operation.

        Returns:
            The loaded data.

        Raises:
            FileNotFoundError: If the file does not exist.
            TypeError: If ``file_path`` is not a valid path type.
            ValueError: If the file cannot be loaded.
        """

    @abstractmethod
    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """
        Save data to a file.

        Args:
            data: The data to save.
            output_path: Path where the data should be saved.
            **kwargs: Additional arguments for the save operation.

        Raises:
            TypeError: If ``output_path`` is not a valid path type.
            ValueError: If the data cannot be saved.
        """

    @abstractmethod
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files in a single batch operation.

        Args:
            file_paths: List of file paths to load.
            **kwargs: Additional arguments for the load operation.

        Returns:
            List of loaded data objects in the same order as ``file_paths``.

        Raises:
            FileNotFoundError: If any file does not exist.
            TypeError: If any file path is not a valid path type.
            ValueError: If any file cannot be loaded.
        """

    @abstractmethod
    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Save multiple data objects in a single batch operation.

        Args:
            data_list: List of data objects to save.
            output_paths: List of destination paths (must match length of
                ``data_list``).
            **kwargs: Additional arguments for the save operation.

        Raises:
            ValueError: If ``data_list`` and ``output_paths`` have different
                lengths, or if any data cannot be saved.
            TypeError: If any output path is not a valid path type.
        """

    @abstractmethod
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Path]:
        """
        List files in a directory, optionally filtering by pattern and extensions.

        Args:
            directory: Directory to search.
            pattern: Optional glob pattern to match filenames.
            extensions: Optional set of file extensions to filter by
                (e.g., {'.tif', '.png'}). Extensions should include the dot
                and are case-insensitive.
            recursive: Whether to search recursively.

        Returns:
            List of paths to matching files.

        Raises:
            TypeError: If the directory is not a valid path type.
            FileNotFoundError: If the directory does not exist.
        """

    @abstractmethod
    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """
        List the names of immediate entries in a directory.

        Args:
            path: Directory path to list.

        Returns:
            List of entry names (not full paths) in the directory.

        Raises:
            FileNotFoundError: If the path does not exist.
            NotADirectoryError: If the path is not a directory.
            TypeError: If the path is not a valid path type.
        """

    @abstractmethod
    def delete(self, file_path: Union[str, Path]) -> None:
        """
        Delete a file.

        Args:
            file_path: Path to the file to delete.

        Raises:
            TypeError: If ``file_path`` is not a valid path type.
            FileNotFoundError: If the file does not exist.
            ValueError: If the file cannot be deleted.
        """

    @abstractmethod
    def delete_all(self, file_path: Union[str, Path]) -> None:
        """
        Delete a file, or a folder together with its entire contents.

        Args:
            file_path: Path to the file or folder to delete.

        Raises:
            TypeError: If ``file_path`` is not a valid path type.
            ValueError: If the file or folder cannot be deleted.
        """

    @abstractmethod
    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it if necessary.

        Args:
            directory: Path to the directory to ensure exists.

        Returns:
            The path to the directory.

        Raises:
            TypeError: If the directory is not a valid path type.
            ValueError: If the directory cannot be created.
        """

    @abstractmethod
    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path]) -> None:
        """
        Create a symlink at ``link_name`` pointing to ``source``.

        Args:
            source: Path to the source file.
            link_name: Path where the symlink should be created.

        Raises:
            TypeError: If either path is not a valid path type.
        """

    @abstractmethod
    def is_symlink(self, source: Union[str, Path]) -> bool:
        """
        Check whether a path is a symlink.

        Args:
            source: Path to check.

        Raises:
            TypeError: If the path is not a valid path type.
        """

    @abstractmethod
    def is_file(self, source: Union[str, Path]) -> bool:
        """
        Check whether a path is a file.

        Args:
            source: Path to check.

        Raises:
            TypeError: If the path is not a valid path type.
        """

    @abstractmethod
    def is_dir(self, source: Union[str, Path]) -> bool:
        """
        Check whether a path is a directory.

        Args:
            source: Path to check.

        Raises:
            TypeError: If the path is not a valid path type.
        """

    @abstractmethod
    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory from ``src`` to ``dst``.

        Args:
            src: Path to the source file.
            dst: Path to the destination file.

        Raises:
            TypeError: If either path is not a valid path type.
            FileNotFoundError: If the source file does not exist.
            FileExistsError: If the destination file already exists.
            ValueError: If the file cannot be moved.
        """

    @abstractmethod
    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file or directory from ``src`` to ``dst``.

        Args:
            src: Path to the source file.
            dst: Path to the destination file.

        Raises:
            TypeError: If either path is not a valid path type.
            FileNotFoundError: If the source file does not exist.
            FileExistsError: If the destination file already exists.
            ValueError: If the file cannot be copied.
        """

    @abstractmethod
    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Get metadata for a file or directory.

        Args:
            path: Path to the file or directory to inspect.

        Raises:
            TypeError: If the path is not a valid path type.
            FileNotFoundError: If the path does not exist.
        """

    def exists(self, path: Union[str, Path]) -> bool:
        """
        Declarative truth test: does the path resolve to a valid object?

        A path only 'exists' if:
        - it is a valid file or directory
        - or it is a symlink that resolves to a valid file or directory

        Returns:
            bool: True if path structurally resolves to a real object.
        """
        try:
            return self.is_file(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            pass
        except IsADirectoryError:
            # Path exists but is a directory, so check if it's a valid directory
            try:
                return self.is_dir(path)
            except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
                return False

        # If is_file failed for other reasons, try is_dir
        try:
            return self.is_dir(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            return False

363 

364 

def _create_storage_registry() -> Dict[str, DataSink]:
    """
    Build a fresh storage registry via metaclass-based discovery.

    Delegates to the metaclass registry system, which discovers and
    registers backends automatically — no hardcoded imports required.
    The mapping is keyed by backend name and holds DataSink instances,
    so both StorageBackend and StreamingBackend implementations fit.

    Returns:
        Mapping of backend name to DataSink instance (polymorphic).
    """
    # Imported lazily so that merely importing this module does not pull
    # in every backend (some of which have heavy GPU dependencies).
    from openhcs.io.backend_registry import create_storage_registry

    return create_storage_registry()

385 

386 

# Global singleton storage registry - created lazily to avoid GPU imports in
# subprocess mode. This is the shared registry instance all components use.
import os

if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
    # Subprocess runner mode: start empty; ensure_storage_registry() fills
    # it on demand with only the backends actually needed.
    storage_registry: Dict[str, DataSink] = {}
    logger.info("Subprocess runner mode - storage registry will be created lazily")
else:
    # Normal mode: build the full registry eagerly at import time.
    storage_registry: Dict[str, DataSink] = _create_storage_registry()

397 

398 

def ensure_storage_registry() -> None:
    """
    Make sure the shared storage registry is populated.

    In subprocess runner mode the registry starts empty so that GPU
    libraries are not imported during subprocess initialization; this
    fills it on first use. A no-op once the registry has entries.
    """
    global storage_registry
    if storage_registry:
        return
    storage_registry.update(_create_storage_registry())
    logger.info("Lazily created storage registry")

410 

411 

def reset_memory_backend() -> None:
    """
    Clear files from the memory backend while preserving directory structure.

    Clears all file entries from the existing memory backend but preserves
    directory entries (None values). This prevents key collisions between
    plate executions while maintaining the directory structure needed for
    subsequent operations.

    Benefits over a full reset:
    - Preserves directory structure created by the path planner
    - Prevents "Parent path does not exist" errors on subsequent runs
    - Avoids key collisions for special inputs/outputs
    - Maintains performance by not recreating the directory hierarchy

    Note:
        Only affects the memory backend. Other backends (disk, zarr) are
        not modified.
    """
    # Backend is already imported at module level; the former function-local
    # re-import was redundant and has been removed.
    memory_backend = storage_registry[Backend.MEMORY.value]
    memory_backend.clear_files_only()
    logger.info("Memory backend reset - files cleared, directories preserved")