Coverage for openhcs/io/base.py: 53.3% (41 statements) — coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
1# openhcs/io/storage/backends/base.py
2"""
3Abstract base classes for storage backends.
5This module defines the fundamental interfaces for storage backends,
6independent of specific implementations. It establishes the contract
7that all storage backends must fulfill.
8"""
10import logging
11from abc import ABC, abstractmethod
12from pathlib import Path
13from typing import Any, Dict, List, Optional, Set, Type, Union, Callable
14from functools import wraps
15from openhcs.constants.constants import Backend
16from openhcs.io.exceptions import StorageResolutionError
18logger = logging.getLogger(__name__)
class DataSink(ABC):
    """
    Abstract interface for anything that can receive data.

    This is the minimal contract shared by every destination backend,
    whether it persists data (storage) or forwards it (streaming).

    Design principles (OpenHCS conventions):
    - Fail-loud: errors surface as explicit exceptions, never swallowed
    - Minimal: only the operations common to storage and streaming
    - Generic: any kind of data destination can implement it
    """

    @abstractmethod
    def save(self, data: Any, identifier: Union[str, Path], **kwargs) -> None:
        """
        Deliver a single data object to the destination.

        Args:
            data: Object to deliver.
            identifier: Unique, path-like identifier for the data.
            **kwargs: Backend-specific options.

        Raises:
            TypeError: If ``identifier`` is not a valid type.
            ValueError: If the data cannot be delivered.
        """

    @abstractmethod
    def save_batch(self, data_list: List[Any], identifiers: List[Union[str, Path]], **kwargs) -> None:
        """
        Deliver several data objects to the destination in one operation.

        Args:
            data_list: Objects to deliver.
            identifiers: One unique identifier per object; must be the same
                length as ``data_list``.
            **kwargs: Backend-specific options.

        Raises:
            ValueError: If ``data_list`` and ``identifiers`` differ in length,
                or if any object cannot be delivered.
            TypeError: If any identifier is not a valid type.
        """
class StorageBackend(DataSink):
    """
    Abstract base class for persistent storage operations.

    Extends DataSink with retrieval capabilities and file system operations
    for backends that provide persistent storage with file-like semantics.

    Concrete implementations should use StorageBackendMeta for automatic registration.
    """

    # Inherits save() and save_batch() from DataSink

    @abstractmethod
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from a file.

        Args:
            file_path: Path to the file to load
            **kwargs: Additional arguments for the load operation

        Returns:
            The loaded data

        Raises:
            FileNotFoundError: If the file does not exist
            TypeError: If the file_path is not a valid path type
            ValueError: If the file cannot be loaded
        """
        pass

    @abstractmethod
    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """
        Save data to a file.

        Args:
            data: The data to save
            output_path: Path where the data should be saved
            **kwargs: Additional arguments for the save operation

        Raises:
            TypeError: If the output_path is not a valid path type
            ValueError: If the data cannot be saved
        """
        pass

    @abstractmethod
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files in a single batch operation.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments for the load operation

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If any file does not exist
            TypeError: If any file_path is not a valid path type
            ValueError: If any file cannot be loaded
        """
        pass

    @abstractmethod
    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Save multiple data objects in a single batch operation.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)
            **kwargs: Additional arguments for the save operation

        Raises:
            ValueError: If data_list and output_paths have different lengths
            TypeError: If any output_path is not a valid path type
            ValueError: If any data cannot be saved
        """
        pass

    @abstractmethod
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Path]:
        """
        List files in a directory, optionally filtering by pattern and extensions.

        Args:
            directory: Directory to search.
            pattern: Optional glob pattern to match filenames.
            extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}).
                Extensions should include the dot and are case-insensitive.
            recursive: Whether to search recursively.

        Returns:
            List of paths to matching files.

        Raises:
            TypeError: If the directory is not a valid path type
            FileNotFoundError: If the directory does not exist
        """
        pass

    @abstractmethod
    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """
        List the names of immediate entries in a directory.

        Args:
            path: Directory path to list.

        Returns:
            List of entry names (not full paths) in the directory.

        Raises:
            FileNotFoundError: If the path does not exist.
            NotADirectoryError: If the path is not a directory.
            TypeError: If the path is not a valid path type.
        """
        pass

    @abstractmethod
    def delete(self, file_path: Union[str, Path]) -> None:
        """
        Delete a file.

        Args:
            file_path: Path to the file to delete

        Raises:
            TypeError: If the file_path is not a valid path type
            FileNotFoundError: If the file does not exist
            ValueError: If the file cannot be deleted
        """
        pass

    @abstractmethod
    def delete_all(self, file_path: Union[str, Path]) -> None:
        """
        Deletes a file or a folder in full.

        Args:
            file_path: Path to the file or directory to delete

        Raises:
            TypeError: If the file_path is not a valid path type
            ValueError: If the file cannot be deleted
        """
        pass

    @abstractmethod
    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        Ensure a directory exists, creating it if necessary.

        Args:
            directory: Path to the directory to ensure exists

        Returns:
            The path to the directory

        Raises:
            TypeError: If the directory is not a valid path type
            ValueError: If the directory cannot be created
        """
        pass

    @abstractmethod
    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path]):
        """
        Creates a symlink from source to link_name.

        Args:
            source: Path to the source file
            link_name: Path where the symlink should be created

        Raises:
            TypeError: If the path is not a valid path type
        """
        pass

    @abstractmethod
    def is_symlink(self, source: Union[str, Path]) -> bool:
        """
        Checks if a path is a symlink.

        Args:
            source: Path to check

        Returns:
            True if the path is a symlink, False otherwise

        Raises:
            TypeError: If the path is not a valid path type
        """
        pass

    @abstractmethod
    def is_file(self, source: Union[str, Path]) -> bool:
        """
        Checks if a path is a file.

        Args:
            source: Path to check

        Returns:
            True if the path is a file, False otherwise

        Raises:
            TypeError: If the path is not a valid path type
        """
        pass

    @abstractmethod
    def is_dir(self, source: Union[str, Path]) -> bool:
        """
        Checks if a path is a directory.

        Args:
            source: Path to check

        Returns:
            True if the path is a directory, False otherwise

        Raises:
            TypeError: If the path is not a valid path type
        """
        pass

    @abstractmethod
    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory from src to dst.

        Args:
            src: Path to the source file
            dst: Path to the destination file

        Raises:
            TypeError: If the path is not a valid path type
            FileNotFoundError: If the source file does not exist
            FileExistsError: If the destination file already exists
            ValueError: If the file cannot be moved
        """
        pass

    @abstractmethod
    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file or directory from src to dst.

        Args:
            src: Path to the source file
            dst: Path to the destination file

        Raises:
            TypeError: If the path is not a valid path type
            FileNotFoundError: If the source file does not exist
            FileExistsError: If the destination file already exists
            ValueError: If the file cannot be copied
        """
        pass

    @abstractmethod
    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Get metadata for a file or directory.

        Args:
            path: Path to the file or directory to inspect

        Returns:
            Dictionary of backend-specific metadata for the path

        Raises:
            TypeError: If the path is not a valid path type
            FileNotFoundError: If the path does not exist
        """
        pass

    def exists(self, path: Union[str, Path]) -> bool:
        """
        Declarative truth test: does the path resolve to a valid object?

        A path only 'exists' if:
        - it is a valid file or directory
        - or it is a symlink that resolves to a valid file or directory

        Returns:
            bool: True if path structurally resolves to a real object
        """
        try:
            return self.is_file(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            pass
        except IsADirectoryError:
            # Path exists but is a directory, so check if it's a valid directory
            try:
                return self.is_dir(path)
            except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
                return False

        # If is_file failed for other reasons, try is_dir
        try:
            return self.is_dir(path)
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            return False
def _create_storage_registry() -> Dict[str, DataSink]:
    """
    Build a fresh storage registry via metaclass-based discovery.

    Backends register themselves through the metaclass registry system, so
    no hardcoded backend imports are needed here. The mapping is typed as
    Dict[str, DataSink] so it can hold both StorageBackend and
    StreamingBackend instances polymorphically.

    Returns:
        A dictionary mapping backend names to DataSink instances.
    """
    # Imported lazily: the registry module triggers backend discovery,
    # which must not happen at this module's import time.
    from openhcs.io.backend_registry import create_storage_registry

    registry = create_storage_registry()
    return registry
# Global singleton storage registry - created lazily to avoid GPU imports in subprocess mode
# This is the shared registry instance that all components should use
import os

if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
    # Subprocess runner mode - create minimal registry with only essential backends;
    # full population is deferred to ensure_storage_registry() so importing this
    # module does not pull in GPU-backed storage backends.
    storage_registry: Dict[str, DataSink] = {}
    logger.info("Subprocess runner mode - storage registry will be created lazily")
else:
    # Normal mode - create full registry at import time
    storage_registry: Dict[str, DataSink] = _create_storage_registry()
def ensure_storage_registry() -> None:
    """
    Make sure the global storage registry has been populated.

    In subprocess runner mode the registry is deliberately left empty at
    import time (to avoid loading GPU libraries); this fills it on first use.
    """
    global storage_registry
    if storage_registry:
        # Already populated - nothing to do.
        return
    storage_registry.update(_create_storage_registry())
    logger.info("Lazily created storage registry")
def reset_memory_backend() -> None:
    """
    Clear files from the memory backend while preserving directory structure.

    This function clears all file entries from the existing memory backend but preserves
    directory entries (None values). This prevents key collisions between plate executions
    while maintaining the directory structure needed for subsequent operations.

    Benefits over full reset:
    - Preserves directory structure created by path planner
    - Prevents "Parent path does not exist" errors on subsequent runs
    - Avoids key collisions for special inputs/outputs
    - Maintains performance by not recreating directory hierarchy

    Note:
        This only affects the memory backend. Other backends (disk, zarr) are not modified.
    """
    # Backend is already imported at module level; no local re-import needed.
    # Clear files from existing memory backend while preserving directories
    memory_backend = storage_registry[Backend.MEMORY.value]
    memory_backend.clear_files_only()
    logger.info("Memory backend reset - files cleared, directories preserved")