Coverage for openhcs/io/disk.py: 64.8%
270 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
# openhcs/io/disk.py
"""
Disk-based storage backend implementation.

This module provides a concrete implementation of the storage backend interfaces
for local disk storage. It strictly enforces VFS boundaries and doctrinal clauses.
"""
import errno
import fnmatch
import importlib
import logging
import os
import shutil
from os import PathLike
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union

import numpy as np

from openhcs.constants.constants import FileFormat
from openhcs.io.base import StorageBackend

# NOTE(review): StorageResolutionError is raised by several methods in this
# module but is not imported here; confirm where it is defined and import it.
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def optional_import(module_name):
    """
    Import a module by dotted name, returning ``None`` when it is unavailable.

    A bare ``__import__("pkg.sub")`` returns the top-level package ``pkg``;
    ``importlib.import_module`` returns the named submodule itself, which is
    what callers binding e.g. ``jax.numpy`` need.

    Args:
        module_name: Absolute dotted module name (e.g. ``"jax.numpy"``).

    Returns:
        The imported module, or ``None`` if importing it raised ``ImportError``.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None
# Optional dependencies at module level (not instance level to avoid pickle issues).
# Each name holds whatever optional_import() returns: a module, or None when
# the import failed.
torch = optional_import("torch")
jax = optional_import("jax")
jnp = optional_import("jax.numpy")
cupy = optional_import("cupy")
tf = optional_import("tensorflow")
tifffile = optional_import("tifffile")
class FileFormatRegistry:
    """
    Registry mapping file extensions to a writer and a reader callable.

    Extensions are lower-cased on registration and on every lookup, so
    ``".TIF"`` and ``".tif"`` address the same entry.
    """

    def __init__(self):
        # Both maps are keyed by the lower-cased extension string.
        self._writers: Dict[str, Callable[..., Any]] = {}
        self._readers: Dict[str, Callable[..., Any]] = {}

    def register(self, ext: str, writer: Callable, reader: Callable):
        """Register (or replace) the writer and reader for ``ext``."""
        key = ext.lower()
        self._writers[key] = writer
        self._readers[key] = reader

    def get_writer(self, ext: str) -> Callable:
        """Return the writer for ``ext``; raises ``KeyError`` if unregistered."""
        return self._writers[ext.lower()]

    def get_reader(self, ext: str) -> Callable:
        """Return the reader for ``ext``; raises ``KeyError`` if unregistered."""
        return self._readers[ext.lower()]

    def is_registered(self, ext: str) -> bool:
        """Return True when both a writer and a reader exist for ``ext``."""
        key = ext.lower()
        return key in self._writers and key in self._readers
59class DiskStorageBackend(StorageBackend):
60 def __init__(self):
61 self.format_registry = FileFormatRegistry()
62 self._register_formats()
64 def _register_formats(self):
65 formats = []
67 # NumPy
68 formats.append((
69 FileFormat.NUMPY.value,
70 np.save,
71 np.load
72 ))
74 if torch: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true
75 formats.append((
76 FileFormat.TORCH.value,
77 torch.save,
78 torch.load
79 ))
81 if jax and jnp: 81 ↛ 89line 81 didn't jump to line 89 because the condition on line 81 was always true
82 formats.append((
83 FileFormat.JAX.value,
84 self._jax_writer,
85 self._jax_reader
86 ))
88 # CuPy
89 if cupy: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true
90 formats.append((
91 FileFormat.CUPY.value,
92 self._cupy_writer,
93 self._cupy_reader
94 ))
96 # TensorFlow
97 if tf: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true
98 formats.append((
99 FileFormat.TENSORFLOW.value,
100 self._tensorflow_writer,
101 self._tensorflow_reader
102 ))
104 # TIFF
105 if tifffile: 105 ↛ 113line 105 didn't jump to line 113 because the condition on line 105 was always true
106 formats.append((
107 FileFormat.TIFF.value,
108 self._tiff_writer,
109 self._tiff_reader
110 ))
112 # Plain Text
113 formats.append((
114 FileFormat.TEXT.value,
115 self._text_writer,
116 self._text_reader
117 ))
119 # Register everything
120 for extensions, writer, reader in formats:
121 for ext in extensions:
122 self.format_registry.register(ext.lower(), writer, reader)
124 # Format-specific writer/reader functions (pickleable)
125 def _jax_writer(self, path, data, **kwargs):
126 np.save(path, jax.device_get(data))
128 def _jax_reader(self, path):
129 return jnp.array(np.load(path))
131 def _cupy_writer(self, path, data, **kwargs):
132 cupy.save(path, data)
134 def _cupy_reader(self, path):
135 return cupy.load(path)
137 def _tensorflow_writer(self, path, data, **kwargs):
138 tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data))
140 def _tensorflow_reader(self, path):
141 return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)
143 def _tiff_writer(self, path, data, **kwargs):
144 tifffile.imwrite(path, data)
146 def _tiff_reader(self, path):
147 return tifffile.imread(path)
149 def _text_writer(self, path, data):
150 path.write_text(str(data))
152 def _text_reader(self, path):
153 return path.read_text()
156 def load(self, file_path: Union[str, Path], **kwargs) -> Any:
157 """
158 Load data from disk based on explicit content type.
160 Args:
161 file_path: Path to the file to load
162 **kwargs: Additional arguments for the load operation, must include 'content_type'
163 to explicitly specify the type of content to load
165 Returns:
166 The loaded data
168 Raises:
169 TypeError: If file_path is not a valid path type or content_type is not specified
170 FileNotFoundError: If the file does not exist
171 ValueError: If the file cannot be loaded
172 """
174 disk_path = Path(file_path)
175 ext = disk_path.suffix.lower()
176 if not self.format_registry.is_registered(ext): 176 ↛ 177line 176 didn't jump to line 177 because the condition on line 176 was never true
177 raise ValueError(f"No writer registered for extension '{ext}'")
179 try:
180 reader = self.format_registry.get_reader(ext)
181 return reader(disk_path, **kwargs)
182 except Exception as e:
183 raise ValueError(f"Error loading data from {disk_path}: {e}") from e
185 def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
186 """
187 Save data to disk based on explicit content type.
189 Args:
190 data: The data to save
191 output_path: Path where the data should be saved
192 **kwargs: Additional arguments for the save operation, must include 'content_type'
193 to explicitly specify the type of content to save
195 Raises:
196 TypeError: If output_path is not a valid path type or content_type is not specified
197 ValueError: If the data cannot be saved
198 """
199 disk_output_path = Path(output_path)
200 ext = disk_output_path.suffix.lower()
201 if not self.format_registry.is_registered(ext): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 raise ValueError(f"No writer registered for extension '{ext}'")
204 try:
205 writer = self.format_registry.get_writer(ext)
206 return writer(disk_output_path, data, **kwargs )
207 except Exception as e:
208 raise ValueError(f"Error saving data to {disk_output_path}: {e}") from e
210 def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
211 """
212 Load multiple files sequentially using existing load method.
214 Args:
215 file_paths: List of file paths to load
216 **kwargs: Additional arguments passed to load method
218 Returns:
219 List of loaded data objects in the same order as file_paths
220 """
221 results = []
222 for file_path in file_paths:
223 result = self.load(file_path, **kwargs)
224 results.append(result)
225 return results
227 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
228 """
229 Save multiple files sequentially using existing save method.
231 Converts GPU arrays to CPU numpy arrays before saving using OpenHCS memory conversion system.
233 Args:
234 data_list: List of data objects to save
235 output_paths: List of destination paths (must match length of data_list)
236 **kwargs: Additional arguments passed to save method
238 Raises:
239 ValueError: If data_list and output_paths have different lengths
240 """
241 if len(data_list) != len(output_paths): 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")
244 # Convert GPU arrays to CPU numpy arrays using OpenHCS memory conversion system
245 from openhcs.core.memory.converters import convert_memory
246 from openhcs.core.memory.stack_utils import _detect_memory_type
247 from openhcs.constants.constants import MemoryType
249 cpu_data_list = []
250 for data in data_list:
251 # Detect the memory type of the data
252 source_type = _detect_memory_type(data)
254 # Convert to numpy if not already numpy
255 if source_type == MemoryType.NUMPY.value: 255 ↛ 261line 255 didn't jump to line 261 because the condition on line 255 was always true
256 # Already numpy, use as-is
257 cpu_data_list.append(data)
258 else:
259 # Convert to numpy using OpenHCS memory conversion system
260 # Allow CPU roundtrip since we're explicitly going to disk
261 numpy_data = convert_memory(
262 data=data,
263 source_type=source_type,
264 target_type=MemoryType.NUMPY.value,
265 gpu_id=0, # Placeholder since numpy doesn't use GPU ID
266 allow_cpu_roundtrip=True
267 )
268 cpu_data_list.append(numpy_data)
270 # Save converted data using existing save method
271 for cpu_data, output_path in zip(cpu_data_list, output_paths):
272 self.save(cpu_data, output_path, **kwargs)
274 def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
275 extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str,Path]]:
276 """
277 List files on disk, optionally filtering by pattern and extensions.
279 Args:
280 directory: Directory to search.
281 pattern: Optional glob pattern to match filenames.
282 extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}).
283 Extensions should include the dot and are case-insensitive.
284 recursive: Whether to search recursively.
286 Returns:
287 List of paths to matching files.
289 Raises:
290 TypeError: If directory is not a valid path type
291 FileNotFoundError: If the directory does not exist
292 """
293 disk_directory = Path(directory)
296 if not disk_directory.is_dir(): 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true
297 raise ValueError(f"Path is not a directory: {disk_directory}")
299 # Use appropriate search strategy based on recursion
300 if recursive:
301 # Use breadth-first traversal to prioritize shallower files
302 files = self._list_files_breadth_first(disk_directory, pattern)
303 else:
304 glob_pattern = pattern if pattern else "*"
305 files = [p for p in disk_directory.glob(glob_pattern) if p.is_file()]
307 # Filter by extensions if provided
308 if extensions:
309 # Convert extensions to lowercase for case-insensitive comparison
310 lowercase_extensions = {ext.lower() for ext in extensions}
311 files = [f for f in files if f.suffix.lower() in lowercase_extensions]
313 # Return paths as strings
314 return [str(f) for f in files]
316 def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]:
317 """
318 List files using breadth-first traversal to prioritize shallower files.
320 This ensures that files in the root directory are found before files
321 in subdirectories, which is important for metadata detection.
323 Args:
324 directory: Root directory to search
325 pattern: Optional glob pattern to match filenames
327 Returns:
328 List of file paths sorted by depth (shallower first)
329 """
330 from collections import deque
332 files = []
333 # Use deque for breadth-first traversal
334 dirs_to_search = deque([(directory, 0)]) # (path, depth)
336 while dirs_to_search:
337 current_dir, depth = dirs_to_search.popleft()
339 try:
340 # Get all entries in current directory
341 for entry in current_dir.iterdir():
342 if entry.is_file():
343 # Check if file matches pattern
344 if pattern is None or entry.match(pattern): 344 ↛ 341line 344 didn't jump to line 341 because the condition on line 344 was always true
345 files.append((entry, depth))
346 elif entry.is_dir(): 346 ↛ 341line 346 didn't jump to line 341 because the condition on line 346 was always true
347 # Add subdirectory to queue for later processing
348 dirs_to_search.append((entry, depth + 1))
349 except (PermissionError, OSError):
350 # Skip directories we can't read
351 continue
353 # Sort by depth first, then by path for consistent ordering
354 files.sort(key=lambda x: (x[1], str(x[0])))
356 # Return just the paths
357 return [file_path for file_path, _ in files]
359 def list_dir(self, path: Union[str, Path]) -> List[str]:
360 path = Path(path)
361 if not path.exists(): 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true
362 raise FileNotFoundError(f"Path does not exist: {path}")
363 if not path.is_dir():
364 raise NotADirectoryError(f"Not a directory: {path}")
365 return [entry.name for entry in path.iterdir()]
368 def delete(self, path: Union[str, Path]) -> None:
369 """
370 Delete a file or empty directory at the given disk path.
372 Args:
373 path: Path to delete
375 Raises:
376 FileNotFoundError: If path does not exist
377 IsADirectoryError: If path is a directory and not empty
378 StorageResolutionError: If deletion fails for unknown reasons
379 """
380 path = Path(path)
382 if not path.exists(): 382 ↛ 383line 382 didn't jump to line 383 because the condition on line 382 was never true
383 raise FileNotFoundError(f"Cannot delete: path does not exist: {path}")
385 try:
386 if path.is_dir():
387 # Do not allow recursive deletion
388 path.rmdir() # will raise OSError if directory is not empty
389 else:
390 path.unlink()
391 except IsADirectoryError:
392 raise
393 except OSError as e:
394 raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e
395 except Exception as e:
396 raise StorageResolutionError(f"Failed to delete {path}") from e
398 def delete_all(self, path: Union[str, Path]) -> None:
399 """
400 Recursively delete a file or directory and all its contents from disk.
402 Args:
403 path: Filesystem path to delete
405 Raises:
406 FileNotFoundError: If the path does not exist
407 StorageResolutionError: If deletion fails for any reason
408 """
409 path = Path(path)
411 if not path.exists(): 411 ↛ 412line 411 didn't jump to line 412 because the condition on line 411 was never true
412 raise FileNotFoundError(f"Path does not exist: {path}")
414 try:
415 if path.is_file(): 415 ↛ 416line 415 didn't jump to line 416 because the condition on line 415 was never true
416 path.unlink()
417 else:
418 # Safe, recursive removal of directories
419 import shutil
420 shutil.rmtree(path)
421 except Exception as e:
422 raise StorageResolutionError(f"Failed to recursively delete: {path}") from e
425 def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]:
426 """
427 Ensure a directory exists on disk.
429 Args:
430 directory: Path to the directory to ensure exists
432 Returns:
433 Path to the directory
435 Raises:
436 TypeError: If directory is not a valid path type
437 ValueError: If there is an error creating the directory
438 """
439 # 🔒 Clause 17 — VFS Boundary Enforcement
440 try:
441 disk_directory = Path(directory)
442 disk_directory.mkdir(parents=True, exist_ok=True)
443 return directory
444 except OSError as e:
445 # 🔒 Clause 65 — No Fallback Logic
446 # Propagate the error with additional context
447 raise ValueError(f"Error creating directory {disk_directory}: {e}") from e
449 def exists(self, path: Union[str, Path]) -> bool:
450 return Path(path).exists()
452 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
453 source = Path(source).resolve()
454 link_name = Path(link_name) # Don't resolve link_name - we want the actual symlink path
456 if not source.exists(): 456 ↛ 457line 456 didn't jump to line 457 because the condition on line 456 was never true
457 raise FileNotFoundError(f"Source path does not exist: {source}")
459 # Check if target exists and handle overwrite policy
460 if link_name.exists() or link_name.is_symlink(): 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true
461 if not overwrite:
462 raise FileExistsError(f"Target already exists: {link_name}")
463 link_name.unlink() # Remove existing file/symlink only if overwrite=True
465 link_name.parent.mkdir(parents=True, exist_ok=True)
466 link_name.symlink_to(source)
469 def is_symlink(self, path: Union[str, Path]) -> bool:
470 return Path(path).is_symlink()
473 def is_file(self, path: Union[str, Path]) -> bool:
474 path = Path(path)
476 if not path.exists(): 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true
477 raise FileNotFoundError(f"Path does not exist: {path}")
479 # Resolve symlinks and return True only if final target is a file
480 resolved = path.resolve(strict=True)
482 if resolved.is_dir(): 482 ↛ 483line 482 didn't jump to line 483 because the condition on line 482 was never true
483 raise IsADirectoryError(f"Path is a directory: {path}")
485 return resolved.is_file()
487 def is_dir(self, path: Union[str, Path]) -> bool:
488 """
489 Check if a given disk path is a directory.
491 Follows filesystem symlinks to determine the actual resolved structure.
493 Args:
494 path: Filesystem path (absolute or relative)
496 Returns:
497 bool: True if path resolves to a directory
499 Raises:
500 FileNotFoundError: If the path or symlink target does not exist
501 NotADirectoryError: If the resolved target is not a directory
502 StorageResolutionError: For unexpected filesystem resolution errors
503 """
504 from pathlib import Path
506 try:
507 path = Path(path)
509 if not path.exists(): 509 ↛ 510line 509 didn't jump to line 510 because the condition on line 509 was never true
510 raise FileNotFoundError(f"Path does not exist: {path}")
512 # Follow symlinks to final real target
513 resolved = path.resolve(strict=True)
515 if not resolved.is_dir(): 515 ↛ 516line 515 didn't jump to line 516 because the condition on line 515 was never true
516 raise NotADirectoryError(f"Path is not a directory: {path}")
518 return True
520 except FileNotFoundError:
521 raise # broken symlink or missing path
522 except NotADirectoryError:
523 raise
524 except Exception as e:
525 raise StorageResolutionError(f"Failed to resolve directory: {path}") from e
527 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
528 """
529 Move a file or directory on disk. Follows symlinks and performs overwrite-safe move.
531 Raises:
532 FileNotFoundError: If source does not exist
533 FileExistsError: If destination already exists
534 StorageResolutionError: On failure to move
535 """
536 import shutil
537 from pathlib import Path
539 src = Path(src)
540 dst = Path(dst)
542 if not src.exists(): 542 ↛ 543line 542 didn't jump to line 543 because the condition on line 542 was never true
543 raise FileNotFoundError(f"Source path does not exist: {src}")
544 if dst.exists(): 544 ↛ 545line 544 didn't jump to line 545 because the condition on line 544 was never true
545 raise FileExistsError(f"Destination already exists: {dst}")
547 try:
548 shutil.move(str(src), str(dst))
549 except Exception as e:
550 raise StorageResolutionError(f"Failed to move {src} to {dst}") from e
552 def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
553 """
554 Return structural metadata about a disk-backed path.
556 Returns:
557 dict with keys:
558 - 'type': 'file', 'directory', 'symlink', or 'missing'
559 - 'path': str(path)
560 - 'target': resolved target if symlink
561 - 'exists': bool
563 Raises:
564 StorageResolutionError: On access or resolution failure
565 """
566 path_str = str(path)
567 try:
568 if not os.path.lexists(path_str): # includes broken symlinks
569 return {
570 "type": "missing",
571 "path": path_str,
572 "exists": False
573 }
575 if os.path.islink(path_str):
576 try:
577 resolved = os.readlink(path_str)
578 target_exists = os.path.exists(path_str)
579 except OSError as e:
580 raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e
582 return {
583 "type": "symlink",
584 "path": path_str,
585 "target": resolved,
586 "exists": target_exists
587 }
589 if os.path.isdir(path_str):
590 return {
591 "type": "directory",
592 "path": path_str,
593 "exists": True
594 }
596 if os.path.isfile(path_str):
597 return {
598 "type": "file",
599 "path": path_str,
600 "exists": True
601 }
603 raise StorageResolutionError(f"Unknown filesystem object at: {path_str}")
605 except Exception as e:
606 raise StorageResolutionError(f"Failed to stat disk path: {path}") from e
608 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
609 """
610 Copy a file or directory to a new location.
612 - Does not overwrite destination.
613 - Will raise if destination exists.
614 - Supports file-to-file and dir-to-dir copies.
616 Raises:
617 FileExistsError: If destination already exists
618 FileNotFoundError: If source is missing
619 StorageResolutionError: On structural failure
620 """
621 src = Path(src)
622 dst = Path(dst)
624 if not src.exists():
625 raise FileNotFoundError(f"Source does not exist: {src}")
626 if dst.exists():
627 raise FileExistsError(f"Destination already exists: {dst}")
629 try:
630 if src.is_dir():
631 shutil.copytree(src, dst)
632 else:
633 shutil.copy2(src, dst)
634 except Exception as e:
635 raise StorageResolutionError(f"Failed to copy {src} → {dst}") from e