Coverage for openhcs/io/base.py: 73.0%

83 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

# openhcs/io/base.py

"""
Abstract base classes for storage backends.

This module defines the fundamental interfaces for storage backends,
independent of specific implementations. It establishes the contract
that all storage backends must fulfill.
"""

import logging
import threading
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from openhcs.constants.constants import Backend
from openhcs.io.exceptions import StorageResolutionError
from openhcs.core.auto_register_meta import AutoRegisterMeta

logger = logging.getLogger(__name__)


class DataSink(ABC):
    """
    Abstract base class for data destinations.

    Defines the minimal interface for sending data to any destination,
    whether storage, streaming, or other data handling systems.

    This interface follows OpenHCS principles:
    - Fail-loud: No defensive programming, explicit error handling
    - Minimal: Only essential operations both storage and streaming need
    - Generic: Enables any type of data destination backend
    """

    @abstractmethod
    def save(self, data: Any, identifier: Union[str, Path], **kwargs) -> None:
        """
        Send data to the destination.

        Args:
            data: The data to send
            identifier: Unique identifier for the data (path-like for compatibility)
            **kwargs: Backend-specific arguments

        Raises:
            TypeError: If identifier is not a valid type
            ValueError: If data cannot be sent to the destination
        """
        pass

    @abstractmethod
    def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None:
        """
        Send multiple data objects to the destination in a single operation.

        Args:
            data_list: List of data objects to send
            identifiers: List of unique identifiers (must match the length of data_list)
            **kwargs: Backend-specific arguments

        Raises:
            ValueError: If data_list and identifiers have different lengths,
                or if any data cannot be sent to the destination
            TypeError: If any identifier is not a valid type
        """
        pass
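
# Example (illustrative sketch, not part of this module): a minimal DataSink
# that collects saved objects in a dict. The class name and storage choice are
# hypothetical; it only demonstrates the fail-loud contract described above.
#
#     class DictSink(DataSink):
#         def __init__(self):
#             self._store: Dict[str, Any] = {}
#
#         def save(self, data: Any, identifier: Union[str, Path], **kwargs) -> None:
#             if not isinstance(identifier, (str, Path)):
#                 raise TypeError(f"Invalid identifier type: {type(identifier)}")
#             self._store[str(identifier)] = data
#
#         def save_batch(self, data_list, identifiers, **kwargs) -> None:
#             if len(data_list) != len(identifiers):
#                 raise ValueError("data_list and identifiers must have equal lengths")
#             for data, identifier in zip(data_list, identifiers):
#                 self.save(data, identifier, **kwargs)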


class DataSource(ABC):
    """
    Abstract base class for read-only data sources.

    Defines the minimal interface for loading data from any source,
    whether filesystem, virtual workspace, remote storage, or databases.

    This is the read-only counterpart to DataSink.
    """

    @abstractmethod
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from a file path.

        Args:
            file_path: Path to the file to load
            **kwargs: Backend-specific arguments

        Returns:
            The loaded data

        Raises:
            FileNotFoundError: If the file does not exist
            TypeError: If file_path is not a valid type
            ValueError: If the data cannot be loaded
        """
        pass

    @abstractmethod
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files in a single batch operation.

        Args:
            file_paths: List of file paths to load
            **kwargs: Backend-specific arguments

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If any file does not exist
            TypeError: If any file_path is not a valid type
            ValueError: If any data cannot be loaded
        """
        pass

    @abstractmethod
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False,
                   **kwargs) -> List[str]:
        """
        List files in a directory.

        Args:
            directory: Directory to list files from
            pattern: Optional glob pattern to filter files
            extensions: Optional set of file extensions to filter (e.g., {'.tif', '.png'})
            recursive: Whether to search recursively
            **kwargs: Backend-specific arguments

        Returns:
            List of file paths (absolute or relative depending on backend)
        """
        pass

    @abstractmethod
    def exists(self, path: Union[str, Path]) -> bool:
        """Check if a path exists."""
        pass

    @abstractmethod
    def is_file(self, path: Union[str, Path]) -> bool:
        """Check if a path is a file."""
        pass

    @abstractmethod
    def is_dir(self, path: Union[str, Path]) -> bool:
        """Check if a path is a directory."""
        pass

    @abstractmethod
    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """List immediate entries in a directory (names only)."""
        pass
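
# Example (illustrative usage sketch; the function and variable names are
# hypothetical): a consumer that discovers TIFF files and loads them in one
# batch using only the DataSource interface.
#
#     def load_tiffs(source: DataSource, directory: Union[str, Path]) -> List[Any]:
#         paths = source.list_files(directory, extensions={'.tif', '.tiff'})
#         return source.load_batch(paths)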


class VirtualBackend(DataSink):
    """
    Abstract base for backends that provide virtual filesystem semantics.

    Virtual backends generate file listings on-demand without real filesystem
    operations. Examples: OMERO (generates filenames from plate structure),
    S3 (lists objects), HTTP APIs.

    Virtual backends may require additional context via kwargs.
    Backends MUST validate required kwargs and raise TypeError if missing.
    """

    @abstractmethod
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from a virtual path.

        Args:
            file_path: Virtual path to load
            **kwargs: Backend-specific context (e.g., plate_id for OMERO)

        Returns:
            The loaded data

        Raises:
            FileNotFoundError: If the virtual path does not exist
            TypeError: If required kwargs are missing
            ValueError: If the data cannot be loaded
        """
        pass

    @abstractmethod
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple virtual paths in a single batch operation.

        Args:
            file_paths: List of virtual paths to load
            **kwargs: Backend-specific context

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If any virtual path does not exist
            TypeError: If required kwargs are missing
            ValueError: If any data cannot be loaded
        """
        pass

    @abstractmethod
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False,
                   **kwargs) -> List[str]:
        """
        Generate a virtual file listing.

        Args:
            directory: Virtual directory path
            pattern: Optional file pattern filter
            extensions: Optional set of file extensions to filter
            recursive: Whether to list recursively
            **kwargs: Backend-specific context (e.g., plate_id for OMERO)

        Returns:
            List of virtual filenames

        Raises:
            TypeError: If required kwargs are missing
            ValueError: If directory is invalid
        """
        pass

    @property
    def requires_filesystem_validation(self) -> bool:
        """
        Whether this backend requires filesystem validation.

        Virtual backends return False - they don't have real filesystem paths.
        Real backends return True - they need path validation.

        Returns:
            False for virtual backends
        """
        return False
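
# Example (illustrative sketch; 'PlateBackend' is hypothetical): the
# required-kwargs validation pattern the docstrings above mandate. 'plate_id'
# mirrors the OMERO example; save()/save_batch() from DataSink are omitted
# for brevity.
#
#     class PlateBackend(VirtualBackend):
#         def load(self, file_path, **kwargs):
#             if 'plate_id' not in kwargs:
#                 raise TypeError("PlateBackend.load() requires a 'plate_id' kwarg")
#             ...  # fetch the plane for (plate_id, file_path)
#
#         def load_batch(self, file_paths, **kwargs):
#             return [self.load(p, **kwargs) for p in file_paths]
#
#         def list_files(self, directory, pattern=None, extensions=None,
#                        recursive=False, **kwargs):
#             if 'plate_id' not in kwargs:
#                 raise TypeError("PlateBackend.list_files() requires a 'plate_id' kwarg")
#             ...  # generate virtual filenames from the plate structure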


class BackendBase(metaclass=AutoRegisterMeta):
    """
    Base class for all storage backends (read-only and read-write).

    Defines the registry and common interface for backend discovery.
    Concrete backends should inherit from StorageBackend or ReadOnlyBackend.
    """
    __registry_key__ = '_backend_type'

    @property
    @abstractmethod
    def requires_filesystem_validation(self) -> bool:
        """Whether this backend requires filesystem validation."""
        pass
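
# Example (illustrative sketch, assuming AutoRegisterMeta keys registration off
# the attribute named by __registry_key__; the exact mechanics live in
# openhcs.core.auto_register_meta and are not shown in this module):
#
#     class MyBackend(StorageBackend):
#         _backend_type = 'mybackend'   # hypothetical key the metaclass would pick up
#         ...                           # implement the DataSource/DataSink methods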


class ReadOnlyBackend(BackendBase, DataSource):
    """
    Abstract base class for read-only storage backends with auto-registration.

    Use this for backends that only need to read data (virtual workspaces,
    read-only mounts, archive viewers, etc.).

    Inherits from BackendBase (for registration) and DataSource (for read interface).
    No write operations - clean separation of concerns.

    Concrete implementations are automatically registered via AutoRegisterMeta.
    """

    @property
    def requires_filesystem_validation(self) -> bool:
        """
        Whether this backend requires filesystem validation.

        Returns:
            False by default (virtual/remote backends); local-filesystem
            read-only backends should override to return True
        """
        return False

    # Inherits all abstract methods from DataSource:
    # - load(), load_batch()
    # - list_files(), list_dir()
    # - exists(), is_file(), is_dir()


class StorageBackend(BackendBase, DataSource, DataSink):
    """
    Abstract base class for read-write storage backends.

    Extends DataSource (read) and DataSink (write) with file system operations
    for backends that provide persistent storage with file-like semantics.

    Concrete implementations are automatically registered via AutoRegisterMeta.
    """
    # Inherits load(), load_batch(), list_files(), etc. from DataSource
    # Inherits save() and save_batch() from DataSink

    @property
    def requires_filesystem_validation(self) -> bool:
        """
        Whether this backend requires filesystem validation.

        Returns:
            True for real filesystem backends (the default for StorageBackend)
        """
        return True

    def exists(self, path: Union[str, Path]) -> bool:
        """
        Declarative truth test: does the path resolve to a valid object?

        A path only 'exists' if:
        - it is a valid file or directory
        - or it is a symlink that resolves to a valid file or directory

        Returns:
            bool: True if the path structurally resolves to a real object
        """
        try:
            return self.is_file(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            pass
        except IsADirectoryError:
            # Path exists but is a directory, so check if it's a valid directory
            try:
                return self.is_dir(path)
            except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
                return False

        # is_file raised a recoverable resolution error above; the path may
        # still be a valid directory, so fall back to is_dir
        try:
            return self.is_dir(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            return False


def _create_storage_registry() -> Dict[str, DataSink]:
    """
    Create a new storage registry using metaclass-based discovery.

    Creates a dictionary mapping backend names to their respective storage
    backend instances using automatic discovery and registration.

    Returns Dict[str, DataSink] so that both StorageBackend and
    StreamingBackend instances can be registered polymorphically.

    Returns:
        A dictionary mapping backend names to DataSink instances

    Note:
        Uses the metaclass-based registry system for automatic backend
        discovery, eliminating hardcoded imports.
    """
    # Import the metaclass-based registry system
    from openhcs.io.backend_registry import create_storage_registry

    return create_storage_registry()


class _LazyStorageRegistry(dict):
    """
    Storage registry that auto-initializes on first access.

    This maintains backward compatibility with existing code that directly
    accesses storage_registry without calling ensure_storage_registry().
    All read operations trigger lazy initialization, while write operations
    (like OMERO backend registration) work without initialization.
    """

    def __getitem__(self, key):
        ensure_storage_registry()
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        # Allow setting without initialization (for OMERO backend registration)
        return super().__setitem__(key, value)

    def __contains__(self, key):
        ensure_storage_registry()
        return super().__contains__(key)

    def get(self, key, default=None):
        ensure_storage_registry()
        return super().get(key, default)

    def keys(self):
        ensure_storage_registry()
        return super().keys()

    def values(self):
        ensure_storage_registry()
        return super().values()

    def items(self):
        ensure_storage_registry()
        return super().items()
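
# Example (illustrative behavior sketch; the names are hypothetical): reads
# trigger lazy initialization, writes do not, so a late-registered backend can
# be installed before the heavy backends are ever imported.
#
#     storage_registry['omero'] = omero_backend   # no initialization triggered
#     disk = storage_registry['disk']             # first read initializes the registry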


# Global singleton storage registry - created lazily on first access.
# This is the shared registry instance that all components should use.
storage_registry: Dict[str, DataSink] = _LazyStorageRegistry()
_registry_initialized = False
# Use an RLock (reentrant lock) so the same thread can acquire the lock
# multiple times. This prevents deadlocks when gc.collect() triggers __del__
# methods that access storage_registry.
_registry_lock = threading.RLock()


def ensure_storage_registry() -> None:
    """
    Ensure the storage registry is initialized.

    Lazily creates the registry on first access to avoid importing GPU-heavy
    backends during module import. This provides instant imports while
    maintaining backward compatibility.

    Thread-safe: Multiple threads can call this simultaneously.
    """
    global _registry_initialized

    # Double-checked locking pattern for thread safety
    if not _registry_initialized:
        with _registry_lock:
            if not _registry_initialized:
                storage_registry.update(_create_storage_registry())
                _registry_initialized = True
                logger.info("Lazily initialized storage registry")


def get_backend(backend_type: str) -> DataSink:
    """
    Get a backend by type, ensuring the registry is initialized.

    Args:
        backend_type: Backend type (e.g., 'disk', 'memory', 'zarr')

    Returns:
        Backend instance

    Raises:
        KeyError: If the backend type is not found
    """
    ensure_storage_registry()

    backend_key = backend_type.lower()
    if backend_key not in storage_registry:
        raise KeyError(f"Backend '{backend_type}' not found. "
                       f"Available: {list(storage_registry.keys())}")

    return storage_registry[backend_key]
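
# Example (illustrative usage; the backend names come from the docstring above,
# and we assume 'memory' resolves to a read-write StorageBackend):
#
#     backend = get_backend('memory')
#     backend.save(image, 'plate1/A01/field1.tif')     # 'image' is hypothetical data
#     image_back = backend.load('plate1/A01/field1.tif')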


def reset_memory_backend() -> None:
    """
    Clear files from the memory backend while preserving directory structure.

    Clears all file entries from the existing memory backend but preserves
    directory entries (None values). This prevents key collisions between
    plate executions while maintaining the directory structure needed for
    subsequent operations.

    Benefits over a full reset:
    - Preserves the directory structure created by the path planner
    - Prevents "Parent path does not exist" errors on subsequent runs
    - Avoids key collisions for special inputs/outputs
    - Maintains performance by not recreating the directory hierarchy

    Note:
        This only affects the memory backend. Other backends (disk, zarr) are
        not modified. The caller is responsible for calling gc.collect() and
        GPU cleanup after this function.
    """
    # Clear files from the existing memory backend while preserving directories
    memory_backend = storage_registry[Backend.MEMORY.value]
    memory_backend.clear_files_only()
    logger.info("Memory backend reset - files cleared, directories preserved")
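
# Example (illustrative usage between plate executions; 'run_plate' is a
# hypothetical pipeline entry point, and the gc.collect() call reflects the
# caller responsibility documented in the Note above):
#
#     import gc
#
#     run_plate(plate_1)
#     reset_memory_backend()   # drop files, keep directory structure
#     gc.collect()             # caller-owned cleanup
#     run_plate(plate_2)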