Coverage for openhcs/io/disk.py: 50.7%
291 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
# openhcs/io/disk.py
"""
Disk-based storage backend implementation.

This module provides a concrete implementation of the storage backend interfaces
for local disk storage. It strictly enforces VFS boundaries and doctrinal clauses.
"""
import fnmatch
import importlib
import logging
import os
import shutil
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union

import numpy as np

from openhcs.constants.constants import Backend
from openhcs.constants.constants import FileFormat
from openhcs.io.backend_registry import StorageBackendMeta
from openhcs.io.base import StorageBackend
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def optional_import(module_name):
    """Import ``module_name`` and return the module, or ``None`` if unavailable.

    ``importlib.import_module`` is used rather than the builtin ``__import__``
    because ``__import__("pkg.sub")`` returns the top-level package ``pkg``,
    whereas callers of this helper (e.g. ``optional_import("jax.numpy")``)
    expect the submodule itself.

    Args:
        module_name: Absolute, possibly dotted, module name.

    Returns:
        The imported module object, or ``None`` when the import raises
        ``ImportError`` (which includes ``ModuleNotFoundError``).
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None
# Optional dependencies at module level (not instance level to avoid pickle issues)
# Skip GPU libraries in subprocess runner mode
if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':
    torch = None
    jax = None
    jnp = None
    cupy = None
    tf = None
    logger.info("Subprocess runner mode - skipping GPU library imports in disk backend")
else:
    torch = optional_import("torch")
    jax = optional_import("jax")
    # Importing "jax.numpy" binds the submodule as an attribute of the ``jax``
    # package. Read it from there instead of trusting the helper's return
    # value: the builtin ``__import__`` returns the top-level package for a
    # dotted name, which would leave ``jnp`` pointing at ``jax`` itself.
    if jax is not None and optional_import("jax.numpy") is not None:
        jnp = jax.numpy
    else:
        jnp = None
    cupy = optional_import("cupy")
    tf = optional_import("tensorflow")
# tifffile is imported regardless of the subprocess/no-GPU switch above.
tifffile = optional_import("tifffile")
class FileFormatRegistry:
    """Case-insensitive table mapping a file extension to its writer and reader.

    Extensions are lowercased on registration and on every lookup, so
    ``".TIF"`` and ``".tif"`` refer to the same entry.
    """

    def __init__(self):
        # Two parallel maps keyed by lowercase extension.
        self._writers: Dict[str, Callable[[Path, Any], None]] = {}
        self._readers: Dict[str, Callable[[Path], Any]] = {}

    def register(self, ext: str, writer: Callable, reader: Callable):
        """Associate ``ext`` with a writer/reader pair, replacing any previous pair."""
        key = ext.lower()
        self._writers[key] = writer
        self._readers[key] = reader

    def get_writer(self, ext: str) -> Callable:
        """Return the writer for ``ext``; raises ``KeyError`` if none is registered."""
        key = ext.lower()
        return self._writers[key]

    def get_reader(self, ext: str) -> Callable:
        """Return the reader for ``ext``; raises ``KeyError`` if none is registered."""
        key = ext.lower()
        return self._readers[key]

    def is_registered(self, ext: str) -> bool:
        """Return True only when both a writer and a reader exist for ``ext``."""
        key = ext.lower()
        if key not in self._writers:
            return False
        return key in self._readers
class DiskStorageBackend(StorageBackend, metaclass=StorageBackendMeta):
    """Disk storage backend with automatic metaclass registration.

    ``load``/``save`` dispatch through a ``FileFormatRegistry`` keyed by the
    lowercase file extension of the path. The remaining methods wrap
    ``pathlib``/``os``/``shutil`` filesystem operations.

    NOTE(review): several methods raise ``StorageResolutionError``, but no
    import of that name is visible in this file. Confirm it is in scope;
    otherwise those error paths fail with ``NameError``.
    """

    # Backend type from enum for registration
    _backend_type = Backend.DISK.value

    def __init__(self):
        self.format_registry = FileFormatRegistry()
        self._register_formats()

    def _register_formats(self):
        """Register a writer/reader pair for each format whose library imported.

        Each ``FileFormat.<X>.value`` is iterated as a collection of
        extensions; every extension is registered in lowercase.
        """
        # NumPy is a hard dependency; np.save/np.load already take (path, data).
        formats = [(FileFormat.NUMPY.value, np.save, np.load)]

        if torch:
            # torch.save takes (obj, f), so it needs an adapter (see _torch_writer).
            formats.append((FileFormat.TORCH.value, self._torch_writer, self._torch_reader))

        if jax and jnp:
            formats.append((FileFormat.JAX.value, self._jax_writer, self._jax_reader))

        # CuPy
        if cupy:
            formats.append((FileFormat.CUPY.value, self._cupy_writer, self._cupy_reader))

        # TensorFlow
        if tf:
            formats.append((
                FileFormat.TENSORFLOW.value,
                self._tensorflow_writer,
                self._tensorflow_reader,
            ))

        # TIFF
        if tifffile:
            formats.append((FileFormat.TIFF.value, self._tiff_writer, self._tiff_reader))

        # Plain Text
        formats.append((FileFormat.TEXT.value, self._text_writer, self._text_reader))

        # Register everything
        for extensions, writer, reader in formats:
            for ext in extensions:
                self.format_registry.register(ext.lower(), writer, reader)

    # Format-specific writer/reader functions (pickleable).
    # Writers are called as writer(path, data, **kwargs) and readers as
    # reader(path, **kwargs), so every adapter accepts **kwargs even when it
    # ignores them; otherwise any keyword passed to load()/save() would raise
    # TypeError.
    def _torch_writer(self, path, data, **kwargs):
        # torch.save's signature is (obj, f): the reverse of the registry's
        # (path, data) calling convention.
        torch.save(data, path, **kwargs)

    def _torch_reader(self, path, **kwargs):
        return torch.load(path, **kwargs)

    def _jax_writer(self, path, data, **kwargs):
        np.save(path, jax.device_get(data))

    def _jax_reader(self, path, **kwargs):
        return jnp.array(np.load(path))

    def _cupy_writer(self, path, data, **kwargs):
        cupy.save(path, data)

    def _cupy_reader(self, path, **kwargs):
        return cupy.load(path)

    def _tensorflow_writer(self, path, data, **kwargs):
        tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data))

    def _tensorflow_reader(self, path, **kwargs):
        # The tensor is always parsed as float32.
        return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)

    def _tiff_writer(self, path, data, **kwargs):
        tifffile.imwrite(path, data)

    def _tiff_reader(self, path, **kwargs):
        """Read a TIFF, retrying through the link target when ``path`` is a symlink."""
        path_obj = Path(path)

        if not path_obj.is_symlink():
            return tifffile.imread(str(path))

        # First try reading the symlink directly (let the OS follow it).
        try:
            return tifffile.imread(str(path))
        except FileNotFoundError:
            # If that fails, try the link's immediate target.
            try:
                target_path = path_obj.readlink()
                # A relative link target is relative to the directory that
                # contains the link, not to the current working directory.
                if not target_path.is_absolute():
                    target_path = path_obj.parent / target_path
                return tifffile.imread(str(target_path))
            except FileNotFoundError:
                # Last resort: fully resolve the symlink chain.
                resolved_path = path_obj.resolve()
                return tifffile.imread(str(resolved_path))

    def _text_writer(self, path, data, **kwargs):
        path.write_text(str(data))

    def _text_reader(self, path, **kwargs):
        return path.read_text()

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from disk using the reader registered for the file extension.

        Args:
            file_path: Path to the file to load
            **kwargs: Forwarded unchanged to the registered reader

        Returns:
            The loaded data

        Raises:
            ValueError: If no reader is registered for the extension, or if the
                reader fails for any reason (the original exception, including
                a missing-file error, is chained as the cause)
        """
        disk_path = Path(file_path)
        ext = disk_path.suffix.lower()
        if not self.format_registry.is_registered(ext):
            raise ValueError(f"No reader registered for extension '{ext}'")

        try:
            reader = self.format_registry.get_reader(ext)
            return reader(disk_path, **kwargs)
        except Exception as e:
            raise ValueError(f"Error loading data from {disk_path}: {e}") from e

    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """
        Save data to disk using the writer registered for the file extension.

        Args:
            data: The data to save
            output_path: Path where the data should be saved
            **kwargs: Forwarded unchanged to the registered writer

        Raises:
            ValueError: If no writer is registered for the extension, or if the
                writer fails for any reason (the original exception is chained)
        """
        disk_output_path = Path(output_path)
        ext = disk_output_path.suffix.lower()
        if not self.format_registry.is_registered(ext):
            raise ValueError(f"No writer registered for extension '{ext}'")

        try:
            writer = self.format_registry.get_writer(ext)
            return writer(disk_output_path, data, **kwargs)
        except Exception as e:
            raise ValueError(f"Error saving data to {disk_output_path}: {e}") from e

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files sequentially using the existing load method.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments passed to load method

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        return [self.load(file_path, **kwargs) for file_path in file_paths]

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Save multiple files sequentially using the existing save method.

        Every item whose detected memory type is not NumPy is converted to a
        NumPy array with the OpenHCS memory conversion system before any file
        is written.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)
            **kwargs: Additional arguments passed to save method

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")

        # Imported here rather than at module level, as in the original code.
        from openhcs.core.memory.converters import convert_memory
        from openhcs.core.memory.stack_utils import _detect_memory_type
        from openhcs.constants.constants import MemoryType

        cpu_data_list = []
        for data in data_list:
            source_type = _detect_memory_type(data)

            if source_type == MemoryType.NUMPY.value:
                # Already numpy, use as-is
                cpu_data_list.append(data)
                continue

            # Allow CPU roundtrip since we're explicitly going to disk
            numpy_data = convert_memory(
                data=data,
                source_type=source_type,
                target_type=MemoryType.NUMPY.value,
                gpu_id=0,  # Placeholder since numpy doesn't use GPU ID
                allow_cpu_roundtrip=True
            )
            cpu_data_list.append(numpy_data)

        # All conversions succeeded; now write the files.
        for cpu_data, output_path in zip(cpu_data_list, output_paths):
            self.save(cpu_data, output_path, **kwargs)

    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str,Path]]:
        """
        List files on disk, optionally filtering by pattern and extensions.

        Args:
            directory: Directory to search.
            pattern: Optional glob pattern to match filenames.
            extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}).
                        Extensions should include the dot and are case-insensitive.
            recursive: Whether to search recursively (breadth-first, shallower
                       files first).

        Returns:
            List of matching paths, each converted to ``str``.

        Raises:
            ValueError: If ``directory`` does not exist or is not a directory
        """
        disk_directory = Path(directory)

        if not disk_directory.is_dir():
            raise ValueError(f"Path is not a directory: {disk_directory}")

        if recursive:
            # Breadth-first traversal prioritizes shallower files.
            files = self._list_files_breadth_first(disk_directory, pattern)
        else:
            glob_pattern = pattern if pattern else "*"
            # Include both regular files and symlinks (even broken ones)
            files = [p for p in disk_directory.glob(glob_pattern) if p.is_file() or p.is_symlink()]

        if extensions:
            # Lowercase both sides for a case-insensitive comparison.
            lowercase_extensions = {ext.lower() for ext in extensions}
            files = [f for f in files if f.suffix.lower() in lowercase_extensions]

        return [str(f) for f in files]

    def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]:
        """
        List files using breadth-first traversal to prioritize shallower files.

        This ensures that files in the root directory are found before files
        in subdirectories, which is important for metadata detection.

        Args:
            directory: Root directory to search
            pattern: Optional glob pattern, tested with ``Path.match``

        Returns:
            List of file paths sorted by depth (shallower first), then by path
        """
        from collections import deque

        files = []
        dirs_to_search = deque([(directory, 0)])  # (path, depth)

        while dirs_to_search:
            current_dir, depth = dirs_to_search.popleft()

            try:
                for entry in current_dir.iterdir():
                    if entry.is_file():
                        if pattern is None or entry.match(pattern):
                            files.append((entry, depth))
                    elif entry.is_dir():
                        # Queue the subdirectory for a later pass.
                        dirs_to_search.append((entry, depth + 1))
            except OSError:
                # Skip directories we can't read (PermissionError is an OSError).
                continue

        # Sort by depth first, then by path for consistent ordering
        files.sort(key=lambda x: (x[1], str(x[0])))

        return [file_path for file_path, _ in files]

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """Return the names of the entries directly inside ``path``.

        Raises:
            FileNotFoundError: If ``path`` does not exist
            NotADirectoryError: If ``path`` is not a directory
        """
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")
        if not path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")
        return [entry.name for entry in path.iterdir()]

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a file or empty directory at the given disk path.

        Args:
            path: Path to delete

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a directory and not empty
            StorageResolutionError: If deletion fails for unknown reasons
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Cannot delete: path does not exist: {path}")

        try:
            if path.is_dir():
                # Do not allow recursive deletion
                path.rmdir()  # will raise OSError if directory is not empty
            else:
                path.unlink()
        except IsADirectoryError:
            raise
        except OSError as e:
            # NOTE(review): every OSError, including a failed file unlink, is
            # reported as a non-empty directory. Kept as-is to preserve the
            # exception type callers see.
            raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete {path}") from e

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a file or directory and all its contents from disk.

        A symlink is removed itself; its target is left untouched.

        Args:
            path: Filesystem path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails for any reason
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        try:
            # shutil.rmtree refuses to operate on a symbolic link, so a link
            # to a directory must be unlinked like a file.
            if path.is_symlink() or path.is_file():
                path.unlink()
            else:
                shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete: {path}") from e

    def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]:
        """
        Ensure a directory exists on disk, creating missing parents.

        Args:
            directory: Path to the directory to ensure exists

        Returns:
            The ``directory`` argument, unchanged

        Raises:
            TypeError: If directory is not a valid path type
            ValueError: If there is an error creating the directory
        """
        # Clause 17 - VFS Boundary Enforcement
        try:
            disk_directory = Path(directory)
            disk_directory.mkdir(parents=True, exist_ok=True)
            return directory
        except OSError as e:
            # Clause 65 - No Fallback Logic
            # Propagate the error with additional context
            raise ValueError(f"Error creating directory {disk_directory}: {e}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        """Return whether ``path`` exists (``Path.exists`` semantics)."""
        return Path(path).exists()

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        """Create a symlink at ``link_name`` pointing to the resolved ``source``.

        Missing parent directories of ``link_name`` are created.

        Raises:
            FileNotFoundError: If ``source`` does not exist
            FileExistsError: If ``link_name`` exists and ``overwrite`` is False
        """
        source = Path(source).resolve()
        link_name = Path(link_name)  # Don't resolve link_name - we want the actual symlink path

        if not source.exists():
            raise FileNotFoundError(f"Source path does not exist: {source}")

        # is_symlink() also catches a broken link, for which exists() is False.
        if link_name.exists() or link_name.is_symlink():
            if not overwrite:
                raise FileExistsError(f"Target already exists: {link_name}")
            link_name.unlink()  # Remove existing file/symlink only if overwrite=True

        link_name.parent.mkdir(parents=True, exist_ok=True)
        link_name.symlink_to(source)

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """Return whether ``path`` itself is a symbolic link."""
        return Path(path).is_symlink()

    def is_file(self, path: Union[str, Path]) -> bool:
        """Return True if ``path`` resolves (following symlinks) to a file.

        Raises:
            FileNotFoundError: If ``path`` does not exist
            IsADirectoryError: If ``path`` resolves to a directory
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        # Resolve symlinks and return True only if final target is a file
        resolved = path.resolve(strict=True)

        if resolved.is_dir():
            raise IsADirectoryError(f"Path is a directory: {path}")

        return resolved.is_file()

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a given disk path is a directory.

        Follows filesystem symlinks to determine the actual resolved structure.
        This method never returns False: a non-directory raises instead.

        Args:
            path: Filesystem path (absolute or relative)

        Returns:
            bool: True if path resolves to a directory

        Raises:
            FileNotFoundError: If the path or symlink target does not exist
            NotADirectoryError: If the resolved target is not a directory
            StorageResolutionError: For unexpected filesystem resolution errors
        """
        try:
            path = Path(path)

            if not path.exists():
                raise FileNotFoundError(f"Path does not exist: {path}")

            # Follow symlinks to final real target
            resolved = path.resolve(strict=True)

            if not resolved.is_dir():
                raise NotADirectoryError(f"Path is not a directory: {path}")

            return True

        except (FileNotFoundError, NotADirectoryError):
            # Documented outcomes (missing path, broken symlink, not a
            # directory) propagate unchanged.
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve directory: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory on disk, refusing to overwrite the destination.

        Raises:
            FileNotFoundError: If source does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On failure to move
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source path does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            shutil.move(str(src), str(dst))
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src} to {dst}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a disk-backed path.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': link target as returned by os.readlink (symlinks only)
                - 'exists': bool (for a symlink: whether its target exists)

        Raises:
            StorageResolutionError: On access or resolution failure
        """
        path_str = str(path)
        try:
            # lexists() is True for broken symlinks, so they are reported as
            # 'symlink' below rather than 'missing'.
            if not os.path.lexists(path_str):
                return {
                    "type": "missing",
                    "path": path_str,
                    "exists": False
                }

            if os.path.islink(path_str):
                try:
                    resolved = os.readlink(path_str)
                    target_exists = os.path.exists(path_str)
                except OSError as e:
                    raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e

                return {
                    "type": "symlink",
                    "path": path_str,
                    "target": resolved,
                    "exists": target_exists
                }

            if os.path.isdir(path_str):
                return {
                    "type": "directory",
                    "path": path_str,
                    "exists": True
                }

            if os.path.isfile(path_str):
                return {
                    "type": "file",
                    "path": path_str,
                    "exists": True
                }

            raise StorageResolutionError(f"Unknown filesystem object at: {path_str}")

        except StorageResolutionError:
            # Already specific; do not bury it under the generic message below.
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to stat disk path: {path}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file or directory to a new location.

        - Does not overwrite destination.
        - Will raise if destination exists.
        - Supports file-to-file and dir-to-dir copies.

        Raises:
            FileExistsError: If destination already exists
            FileNotFoundError: If source is missing
            StorageResolutionError: On structural failure
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            if src.is_dir():
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src} → {dst}") from e