Coverage for openhcs/io/base.py: 73.0% (83 statements)
# openhcs/io/base.py
2"""
3Abstract base classes for storage backends.
5This module defines the fundamental interfaces for storage backends,
6independent of specific implementations. It establishes the contract
7that all storage backends must fulfill.
8"""
10import logging
11import threading
12from abc import ABC, abstractmethod
13from pathlib import Path
14from typing import Any, Dict, List, Optional, Set, Union
15from openhcs.constants.constants import Backend
16from openhcs.io.exceptions import StorageResolutionError
17from openhcs.core.auto_register_meta import AutoRegisterMeta
19logger = logging.getLogger(__name__)
22class DataSink(ABC):
23 """
24 Abstract base class for data destinations.
26 Defines the minimal interface for sending data to any destination,
27 whether storage, streaming, or other data handling systems.
29 This interface follows OpenHCS principles:
30 - Fail-loud: No defensive programming, explicit error handling
31 - Minimal: Only essential operations both storage and streaming need
32 - Generic: Enables any type of data destination backend
33 """
35 @abstractmethod
36 def save(self, data: Any, identifier: Union[str, Path], **kwargs) -> None:
37 """
38 Send data to the destination.
40 Args:
41 data: The data to send
42 identifier: Unique identifier for the data (path-like for compatibility)
43 **kwargs: Backend-specific arguments
45 Raises:
46 TypeError: If identifier is not a valid type
47 ValueError: If data cannot be sent to destination
48 """
49 pass
51 @abstractmethod
52 def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None:
53 """
54 Send multiple data objects to the destination in a single operation.
56 Args:
57 data_list: List of data objects to send
58 identifiers: List of unique identifiers (must match length of data_list)
59 **kwargs: Backend-specific arguments
61 Raises:
62 ValueError: If data_list and identifiers have different lengths
63 TypeError: If any identifier is not a valid type
64 ValueError: If any data cannot be sent to destination
65 """
66 pass
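
# --- Illustrative sketch (not part of this module) ---
# A minimal, hypothetical DataSink that collects saved objects in a dict.
# The class name _DictSink is invented for illustration; only the fail-loud
# contract (TypeError for bad identifiers, ValueError for mismatched batch
# lengths, no silent fallbacks) comes from the interface above.
class _DictSink(DataSink):
    def __init__(self):
        self._store: Dict[str, Any] = {}

    def save(self, data: Any, identifier: Union[str, Path], **kwargs) -> None:
        if not isinstance(identifier, (str, Path)):
            raise TypeError(f"identifier must be str or Path, got {type(identifier)!r}")
        self._store[str(identifier)] = data

    def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None:
        if len(data_list) != len(identifiers):
            raise ValueError("data_list and identifiers must have the same length")
        for data, identifier in zip(data_list, identifiers):
            self.save(data, identifier, **kwargs)
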
class DataSource(ABC):
    """
    Abstract base class for read-only data sources.

    Defines the minimal interface for loading data from any source,
    whether filesystem, virtual workspace, remote storage, or databases.

    This is the read-only counterpart to DataSink.
    """
    @abstractmethod
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from a file path.

        Args:
            file_path: Path to the file to load
            **kwargs: Backend-specific arguments

        Returns:
            The loaded data

        Raises:
            FileNotFoundError: If the file does not exist
            TypeError: If file_path is not a valid type
            ValueError: If the data cannot be loaded
        """
        pass

    @abstractmethod
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files in a single batch operation.

        Args:
            file_paths: List of file paths to load
            **kwargs: Backend-specific arguments

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If any file does not exist
            TypeError: If any file_path is not a valid type
            ValueError: If any data cannot be loaded
        """
        pass
    @abstractmethod
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False,
                   **kwargs) -> List[str]:
        """
        List files in a directory.

        Args:
            directory: Directory to list files from
            pattern: Optional glob pattern to filter files
            extensions: Optional set of file extensions to filter (e.g., {'.tif', '.png'})
            recursive: Whether to search recursively
            **kwargs: Backend-specific arguments

        Returns:
            List of file paths (absolute or relative depending on backend)
        """
        pass

    @abstractmethod
    def exists(self, path: Union[str, Path]) -> bool:
        """Check if a path exists."""
        pass

    @abstractmethod
    def is_file(self, path: Union[str, Path]) -> bool:
        """Check if a path is a file."""
        pass

    @abstractmethod
    def is_dir(self, path: Union[str, Path]) -> bool:
        """Check if a path is a directory."""
        pass

    @abstractmethod
    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """List immediate entries in a directory (names only)."""
        pass
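
# --- Illustrative usage sketch (not part of this module) ---
# A typical read path against any DataSource: enumerate matching files, then
# load them in one batch call. `source` and the directory path are hypothetical.
#
#     paths = source.list_files("/plates/plate1", extensions={".tif"}, recursive=True)
#     images = source.load_batch(paths)
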
class VirtualBackend(DataSink):
    """
    Abstract base for backends that provide virtual filesystem semantics.

    Virtual backends generate file listings on-demand without real filesystem operations.
    Examples: OMERO (generates filenames from plate structure), S3 (lists objects), HTTP APIs.

    Virtual backends may require additional context via kwargs.
    Backends MUST validate required kwargs and raise TypeError if missing.
    """

    @abstractmethod
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from virtual path.

        Args:
            file_path: Virtual path to load
            **kwargs: Backend-specific context (e.g., plate_id for OMERO)

        Returns:
            The loaded data

        Raises:
            FileNotFoundError: If the virtual path does not exist
            TypeError: If required kwargs are missing
            ValueError: If the data cannot be loaded
        """
        pass

    @abstractmethod
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple virtual paths in a single batch operation.

        Args:
            file_paths: List of virtual paths to load
            **kwargs: Backend-specific context

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If any virtual path does not exist
            TypeError: If required kwargs are missing
            ValueError: If any data cannot be loaded
        """
        pass

    @abstractmethod
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False,
                   **kwargs) -> List[str]:
        """
        Generate virtual file listing.

        Args:
            directory: Virtual directory path
            pattern: Optional file pattern filter
            extensions: Optional set of file extensions to filter
            recursive: Whether to list recursively
            **kwargs: Backend-specific context (e.g., plate_id for OMERO)

        Returns:
            List of virtual filenames

        Raises:
            TypeError: If required kwargs are missing
            ValueError: If directory is invalid
        """
        pass

    @property
    def requires_filesystem_validation(self) -> bool:
        """
        Whether this backend requires filesystem validation.

        Virtual backends return False - they don't have real filesystem paths.
        Real backends return True - they need path validation.

        Returns:
            False for virtual backends
        """
        return False
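
# --- Illustrative sketch (not part of this module) ---
# One way a virtual backend can honour the "validate required kwargs" rule
# stated above: fail loudly with TypeError before doing any work. The helper
# name and the 'plate_id' kwarg are hypothetical.
def _require_kwarg(kwargs: Dict[str, Any], name: str) -> Any:
    if name not in kwargs:
        raise TypeError(f"Missing required keyword argument: {name!r}")
    return kwargs[name]

# Inside a hypothetical OMERO-style load():
#     plate_id = _require_kwarg(kwargs, "plate_id")
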
class BackendBase(metaclass=AutoRegisterMeta):
    """
    Base class for all storage backends (read-only and read-write).

    Defines the registry and common interface for backend discovery.
    Concrete backends should inherit from StorageBackend or ReadOnlyBackend.
    """
    __registry_key__ = '_backend_type'

    @property
    @abstractmethod
    def requires_filesystem_validation(self) -> bool:
        """Whether this backend requires filesystem validation."""
        pass
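
# --- Illustrative sketch (not part of this module) ---
# Assuming AutoRegisterMeta registers concrete subclasses under the value of
# the attribute named by __registry_key__ ('_backend_type'), a concrete
# backend would declare its key as a class attribute. The class and key below
# are hypothetical:
#
#     class DiskBackend(StorageBackend):
#         _backend_type = "disk"
#         ...
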
class ReadOnlyBackend(BackendBase, DataSource):
    """
    Abstract base class for read-only storage backends with auto-registration.

    Use this for backends that only need to read data (virtual workspaces,
    read-only mounts, archive viewers, etc.).

    Inherits from BackendBase (for registration) and DataSource (for read interface).
    No write operations - clean separation of concerns.

    Concrete implementations are automatically registered via AutoRegisterMeta.
    """

    @property
    def requires_filesystem_validation(self) -> bool:
        """
        Whether this backend requires filesystem validation.

        Returns:
            False for virtual/remote backends, True for local filesystem
        """
        return False

    # Inherits all abstract methods from DataSource:
    # - load(), load_batch()
    # - list_files(), list_dir()
    # - exists(), is_file(), is_dir()


class StorageBackend(BackendBase, DataSource, DataSink):
    """
    Abstract base class for read-write storage backends.

    Extends DataSource (read) and DataSink (write) with file system operations
    for backends that provide persistent storage with file-like semantics.

    Concrete implementations are automatically registered via AutoRegisterMeta.
    """
    # Inherits load(), load_batch(), list_files(), etc. from DataSource
    # Inherits save() and save_batch() from DataSink

    @property
    def requires_filesystem_validation(self) -> bool:
        """
        Whether this backend requires filesystem validation.

        Returns:
            True for real filesystem backends (default for StorageBackend)
        """
        return True

    def exists(self, path: Union[str, Path]) -> bool:
        """
        Declarative truth test: does the path resolve to a valid object?

        A path only 'exists' if:
        - it is a valid file or directory
        - or it is a symlink that resolves to a valid file or directory

        Returns:
            bool: True if path structurally resolves to a real object
        """
        try:
            return self.is_file(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            pass
        except IsADirectoryError:
            # Path exists but is a directory, so check if it's a valid directory
            try:
                return self.is_dir(path)
            except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
                return False

        # is_file raised a resolution error; the path may still be a valid directory
        try:
            return self.is_dir(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            return False
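
# --- Illustrative usage sketch (not part of this module) ---
# exists() is derived entirely from is_file()/is_dir(), so concrete backends
# only implement those two primitives. For a hypothetical disk backend:
#
#     backend = get_backend("disk")
#     backend.exists("/data/plate1/a01.tif")   # True if it resolves to a file
#     backend.exists("/data/broken-symlink")   # False: resolves to nothing
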
def _create_storage_registry() -> Dict[str, DataSink]:
    """
    Create a new storage registry using metaclass-based discovery.

    This function creates a dictionary mapping backend names to their respective
    storage backend instances using automatic discovery and registration.

    It returns Dict[str, DataSink] to support both StorageBackend and StreamingBackend.

    Returns:
        A dictionary mapping backend names to DataSink instances (polymorphic)

    Note:
        The metaclass-based registry system performs automatic backend
        discovery, eliminating hardcoded imports.
    """
    # Import the metaclass-based registry system
    from openhcs.io.backend_registry import create_storage_registry

    return create_storage_registry()
class _LazyStorageRegistry(dict):
    """
    Storage registry that auto-initializes on first access.

    This maintains backward compatibility with existing code that
    directly accesses storage_registry without calling ensure_storage_registry().
    All read operations trigger lazy initialization, while write operations
    (like OMERO backend registration) work without initialization.
    """

    def __getitem__(self, key):
        ensure_storage_registry()
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        # Allow setting without initialization (for OMERO backend registration)
        return super().__setitem__(key, value)

    def __contains__(self, key):
        ensure_storage_registry()
        return super().__contains__(key)

    def get(self, key, default=None):
        ensure_storage_registry()
        return super().get(key, default)

    def keys(self):
        ensure_storage_registry()
        return super().keys()

    def values(self):
        ensure_storage_registry()
        return super().values()

    def items(self):
        ensure_storage_registry()
        return super().items()
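
# --- Illustrative behaviour sketch (not part of this module) ---
# Reads trigger initialization; writes do not. This lets late-registered
# backends (e.g. OMERO) insert themselves without paying the import cost.
# The backend instance below is hypothetical:
#
#     storage_registry["omero"] = omero_backend   # write: no initialization
#     storage_registry["disk"]                    # first read: full registry loads
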
# Global singleton storage registry - created lazily on first access
# This is the shared registry instance that all components should use
storage_registry: Dict[str, DataSink] = _LazyStorageRegistry()
_registry_initialized = False
# Use RLock (reentrant lock) to allow same thread to acquire lock multiple times
# This prevents deadlocks when gc.collect() triggers __del__ methods that access storage_registry
_registry_lock = threading.RLock()


def ensure_storage_registry() -> None:
    """
    Ensure storage registry is initialized.

    Lazily creates the registry on first access to avoid importing
    GPU-heavy backends during module import. This provides instant
    imports while maintaining backward compatibility.

    Thread-safe: Multiple threads can call this simultaneously.
    """
    global _registry_initialized

    # Double-checked locking pattern for thread safety
    if not _registry_initialized:
        with _registry_lock:
            if not _registry_initialized:
                storage_registry.update(_create_storage_registry())
                _registry_initialized = True
                logger.info("Lazily initialized storage registry")
def get_backend(backend_type: str) -> DataSink:
    """
    Get a backend by type, ensuring registry is initialized.

    Args:
        backend_type: Backend type (e.g., 'disk', 'memory', 'zarr')

    Returns:
        Backend instance

    Raises:
        KeyError: If backend type not found
    """
    ensure_storage_registry()

    backend_key = backend_type.lower()
    if backend_key not in storage_registry:
        raise KeyError(f"Backend '{backend_type}' not found. "
                       f"Available: {list(storage_registry.keys())}")

    return storage_registry[backend_key]
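
# --- Illustrative usage sketch (not part of this module) ---
# Lookup lower-cases the key, and an unknown key fails loudly with the list
# of available backends:
#
#     memory_backend = get_backend("MEMORY")   # same as get_backend("memory")
#     get_backend("nonexistent")               # KeyError listing available keys
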
def reset_memory_backend() -> None:
    """
    Clear files from the memory backend while preserving directory structure.

    This function clears all file entries from the existing memory backend but preserves
    directory entries (None values). This prevents key collisions between plate executions
    while maintaining the directory structure needed for subsequent operations.

    Benefits over full reset:
    - Preserves directory structure created by path planner
    - Prevents "Parent path does not exist" errors on subsequent runs
    - Avoids key collisions for special inputs/outputs
    - Maintains performance by not recreating directory hierarchy

    Note:
        This only affects the memory backend. Other backends (disk, zarr) are not modified.
        Caller is responsible for calling gc.collect() and GPU cleanup after this function.
    """
    # Clear files from existing memory backend while preserving directories
    memory_backend = storage_registry[Backend.MEMORY.value]
    memory_backend.clear_files_only()
    logger.info("Memory backend reset - files cleared, directories preserved")
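
# --- Illustrative usage sketch (not part of this module) ---
# Per the Note above, post-reset cleanup belongs to the caller. A plausible
# between-plate sequence (the GPU cleanup step is backend-specific and omitted):
#
#     import gc
#     reset_memory_backend()
#     gc.collect()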