Coverage for openhcs/io/disk.py: 50.7%

291 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1# openhcs/io/storage/backends/disk.py 

2""" 

3Disk-based storage backend implementation. 

4 

5This module provides a concrete implementation of the storage backend interfaces 

6for local disk storage. It strictly enforces VFS boundaries and doctrinal clauses. 

7""" 

8 

9import fnmatch 

10import logging 

11import os 

12import shutil 

13from pathlib import Path 

14from typing import Any, Callable, Dict, List, Optional, Set, Union 

15from os import PathLike 

16 

17import numpy as np 

18 

19from openhcs.constants.constants import FileFormat 

20from openhcs.io.base import StorageBackend 

21from openhcs.io.backend_registry import StorageBackendMeta 

22from openhcs.constants.constants import Backend 

23 

# Module-scoped logger, named after this module so records can be filtered per module.
logger = logging.getLogger(name=__name__)

25 

26 

def optional_import(module_name):
    """Import ``module_name`` if it is available, otherwise return ``None``.

    Uses ``importlib.import_module`` so that a dotted name returns the
    submodule itself: ``optional_import("jax.numpy")`` yields ``jax.numpy``.
    (``__import__("jax.numpy")`` would return the top-level ``jax`` package.)

    Args:
        module_name: Absolute module name, optionally dotted.

    Returns:
        The imported module, or ``None`` if it (or one of its parent packages)
        raises ``ImportError`` on import. Other exceptions raised while
        importing propagate unchanged.
    """
    import importlib

    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None

32 

# Optional third-party dependencies are bound at module scope rather than on
# backend instances, which keeps backend objects picklable.
# When OPENHCS_SUBPROCESS_NO_GPU is "1" (subprocess runner mode) the GPU
# frameworks are not imported at all and their names are bound to None.
if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') != '1':
    torch = optional_import("torch")
    jax = optional_import("jax")
    jnp = optional_import("jax.numpy")
    cupy = optional_import("cupy")
    tf = optional_import("tensorflow")
else:
    torch = jax = jnp = cupy = tf = None
    logger.info("Subprocess runner mode - skipping GPU library imports in disk backend")

# tifffile is not a GPU library, so it is attempted in every mode.
tifffile = optional_import("tifffile")

49 

class FileFormatRegistry:
    """Registry mapping file extensions to a (writer, reader) pair.

    Extensions are normalised to lower case on every call, so ``".TIF"`` and
    ``".tif"`` address the same entry.
    """

    def __init__(self):
        # extension -> writer(path, data) / reader(path)
        self._writers: Dict[str, Callable[[Path, Any], None]] = dict()
        self._readers: Dict[str, Callable[[Path], Any]] = dict()

    def register(self, ext: str, writer: Callable, reader: Callable):
        """Associate ``writer`` and ``reader`` with ``ext``, replacing any previous entry."""
        key = ext.lower()
        self._writers[key], self._readers[key] = writer, reader

    def get_writer(self, ext: str) -> Callable:
        """Return the writer registered for ``ext``; raises ``KeyError`` if none."""
        key = ext.lower()
        return self._writers[key]

    def get_reader(self, ext: str) -> Callable:
        """Return the reader registered for ``ext``; raises ``KeyError`` if none."""
        key = ext.lower()
        return self._readers[key]

    def is_registered(self, ext: str) -> bool:
        """Return True only when ``ext`` has both a writer and a reader."""
        key = ext.lower()
        return all(key in table for table in (self._writers, self._readers))

68 

69 

class DiskStorageBackend(StorageBackend, metaclass=StorageBackendMeta):
    """Disk storage backend with automatic metaclass registration.

    ``load``/``save`` pick a reader/writer from the file's (lower-cased)
    extension using a ``FileFormatRegistry`` filled in by
    ``_register_formats``. Formats backed by an optional library (torch, jax,
    cupy, tensorflow, tifffile) are registered only when that library was
    importable when this module was loaded.

    NOTE(review): ``delete``, ``delete_all``, ``is_dir``, ``move``, ``stat`` and
    ``copy`` raise ``StorageResolutionError``, but that name is not imported in
    this module's import block, so those paths currently surface as
    ``NameError``. Confirm where the exception is defined and import it.
    """

    # Backend type from enum for registration (presumably consumed by StorageBackendMeta).
    _backend_type = Backend.DISK.value

    def __init__(self):
        self.format_registry = FileFormatRegistry()
        self._register_formats()

    def _register_formats(self):
        """Populate ``self.format_registry`` with every available format.

        Registration order is preserved, so if two formats share an extension
        the one registered later wins.

        NOTE(review): each ``FileFormat.<X>.value`` is iterated as a collection
        of extension strings. Confirm the enum stores lists/tuples: a bare
        string value would register one entry per character.
        """
        formats = [
            # NumPy is a hard dependency, so it is always available.
            (FileFormat.NUMPY.value, np.save, np.load),
        ]

        if torch:
            formats.append((FileFormat.TORCH.value, torch.save, torch.load))

        if jax and jnp:
            formats.append((FileFormat.JAX.value, self._jax_writer, self._jax_reader))

        if cupy:
            formats.append((FileFormat.CUPY.value, self._cupy_writer, self._cupy_reader))

        if tf:
            formats.append((FileFormat.TENSORFLOW.value, self._tensorflow_writer, self._tensorflow_reader))

        if tifffile:
            formats.append((FileFormat.TIFF.value, self._tiff_writer, self._tiff_reader))

        # Plain text needs no optional library.
        formats.append((FileFormat.TEXT.value, self._text_writer, self._text_reader))

        for extensions, writer, reader in formats:
            for ext in extensions:
                self.format_registry.register(ext.lower(), writer, reader)

    # Format-specific writer/reader functions (pickleable bound methods).
    # save()/load() forward their **kwargs to every writer/reader, so each one
    # accepts **kwargs even when it has no use for them.

    def _jax_writer(self, path, data, **kwargs):
        # Fetch the array to host with jax.device_get and store it as .npy bytes.
        # Writing through a file handle stops np.save from appending ".npy" to
        # a path whose registered suffix is something else.
        with open(path, "wb") as fh:
            np.save(fh, jax.device_get(data))

    def _jax_reader(self, path, **kwargs):
        return jnp.array(np.load(path))

    def _cupy_writer(self, path, data, **kwargs):
        # File handle for the same reason as _jax_writer: keep the exact path.
        with open(path, "wb") as fh:
            cupy.save(fh, data)

    def _cupy_reader(self, path, **kwargs):
        return cupy.load(path)

    def _tensorflow_writer(self, path, data, **kwargs):
        tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data))

    def _tensorflow_reader(self, path, **kwargs):
        # NOTE(review): out_type is fixed to float32; tensors serialized with a
        # different dtype will fail to parse.
        return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)

    def _tiff_writer(self, path, data, **kwargs):
        tifffile.imwrite(path, data)

    def _tiff_reader(self, path, **kwargs):
        """Read a TIFF file, with fallbacks for symlinks that fail to open directly."""
        path_obj = Path(path)

        if not path_obj.is_symlink():
            return tifffile.imread(str(path))

        # First try reading the symlink directly (let the OS follow it).
        try:
            return tifffile.imread(str(path))
        except FileNotFoundError:
            pass

        try:
            # Next try the raw link target. A relative target is relative to
            # the directory containing the link, not to the current working
            # directory.
            target_path = path_obj.readlink()
            if not target_path.is_absolute():
                target_path = path_obj.parent / target_path
            return tifffile.imread(str(target_path))
        except FileNotFoundError:
            # Last resort: resolve the whole link chain.
            return tifffile.imread(str(path_obj.resolve()))

    def _text_writer(self, path, data, **kwargs):
        path.write_text(str(data))

    def _text_reader(self, path, **kwargs):
        return path.read_text()

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from disk using the reader registered for the file's extension.

        Args:
            file_path: Path to the file to load. Its case-insensitive suffix
                selects the reader.
            **kwargs: Forwarded unchanged to the reader. For the NumPy and torch
                formats the reader is ``np.load`` / ``torch.load`` itself; the
                backend's own readers accept and ignore extra kwargs.

        Returns:
            The loaded data

        Raises:
            TypeError: If file_path cannot be converted to a Path
            ValueError: If no reader is registered for the extension, or if the
                reader raises (including for a missing file); the original
                exception is chained as ``__cause__``
        """
        disk_path = Path(file_path)
        ext = disk_path.suffix.lower()
        if not self.format_registry.is_registered(ext):
            raise ValueError(f"No reader registered for extension '{ext}'")

        try:
            reader = self.format_registry.get_reader(ext)
            return reader(disk_path, **kwargs)
        except Exception as e:
            raise ValueError(f"Error loading data from {disk_path}: {e}") from e

    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """
        Save data to disk using the writer registered for the file's extension.

        Args:
            data: The data to save
            output_path: Destination path. Its case-insensitive suffix selects
                the writer. Parent directories are not created.
            **kwargs: Forwarded unchanged to the writer. For the NumPy and torch
                formats the writer is ``np.save`` / ``torch.save`` itself; the
                backend's own writers accept and ignore extra kwargs.

        Raises:
            TypeError: If output_path cannot be converted to a Path
            ValueError: If no writer is registered for the extension, or if the
                writer raises; the original exception is chained as ``__cause__``
        """
        disk_output_path = Path(output_path)
        ext = disk_output_path.suffix.lower()
        if not self.format_registry.is_registered(ext):
            raise ValueError(f"No writer registered for extension '{ext}'")

        try:
            writer = self.format_registry.get_writer(ext)
            return writer(disk_output_path, data, **kwargs)
        except Exception as e:
            raise ValueError(f"Error saving data to {disk_output_path}: {e}") from e

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files sequentially using ``load``.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments passed to ``load`` for every file

        Returns:
            List of loaded data objects in the same order as file_paths. The
            first failing load raises and aborts the batch.
        """
        return [self.load(file_path, **kwargs) for file_path in file_paths]

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Save multiple files sequentially using ``save``.

        Every item whose detected memory type is not NumPy is first converted to
        a NumPy array with the OpenHCS memory conversion system. All items are
        converted before any file is written.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)
            **kwargs: Additional arguments passed to ``save`` for every file

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")

        # Imported lazily; these are OpenHCS core modules.
        from openhcs.core.memory.converters import convert_memory
        from openhcs.core.memory.stack_utils import _detect_memory_type
        from openhcs.constants.constants import MemoryType

        cpu_data_list = []
        for data in data_list:
            source_type = _detect_memory_type(data)

            if source_type == MemoryType.NUMPY.value:
                # Already NumPy: use as-is.
                cpu_data_list.append(data)
            else:
                # CPU roundtrip is allowed because the data is going to disk anyway.
                numpy_data = convert_memory(
                    data=data,
                    source_type=source_type,
                    target_type=MemoryType.NUMPY.value,
                    gpu_id=0,  # Placeholder since numpy doesn't use GPU ID
                    allow_cpu_roundtrip=True
                )
                cpu_data_list.append(numpy_data)

        for cpu_data, output_path in zip(cpu_data_list, output_paths):
            self.save(cpu_data, output_path, **kwargs)

    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str, Path]]:
        """
        List files on disk, optionally filtering by pattern and extensions.

        Args:
            directory: Directory to search.
            pattern: Optional filename pattern. Non-recursive mode uses it as a
                glob relative to ``directory``; recursive mode tests each file
                with ``Path.match`` (matched against the right end of the path).
            extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}).
                Extensions should include the dot and are case-insensitive.
            recursive: Whether to search recursively (breadth-first, shallower
                files first).

        Returns:
            List of matching file paths, as strings. Non-recursive mode also
            includes symlinks, even broken ones; recursive mode includes only
            entries for which ``is_file()`` is true.

        Raises:
            TypeError: If directory cannot be converted to a Path
            ValueError: If the path is not an existing directory
        """
        disk_directory = Path(directory)

        if not disk_directory.is_dir():
            raise ValueError(f"Path is not a directory: {disk_directory}")

        if recursive:
            # Breadth-first traversal prioritizes shallower files.
            files = self._list_files_breadth_first(disk_directory, pattern)
        else:
            glob_pattern = pattern if pattern else "*"
            # Include both regular files and symlinks (even broken ones).
            files = [p for p in disk_directory.glob(glob_pattern) if p.is_file() or p.is_symlink()]

        if extensions:
            lowercase_extensions = {ext.lower() for ext in extensions}
            files = [f for f in files if f.suffix.lower() in lowercase_extensions]

        return [str(f) for f in files]

    def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]:
        """
        List files using breadth-first traversal to prioritize shallower files.

        Files in the root directory come before files in subdirectories, which
        is important for metadata detection. Directories that raise
        ``PermissionError``/``OSError`` while being listed are skipped silently.

        NOTE(review): ``entry.is_dir()`` follows symlinks, so directories
        reachable through symlinks are traversed too, and a symlink cycle is
        re-entered until the OS refuses the path.

        Args:
            directory: Root directory to search
            pattern: Optional pattern tested with ``Path.match``

        Returns:
            List of file paths sorted by depth, then by path string
        """
        from collections import deque

        files = []
        dirs_to_search = deque([(directory, 0)])  # (path, depth)

        while dirs_to_search:
            current_dir, depth = dirs_to_search.popleft()

            try:
                for entry in current_dir.iterdir():
                    if entry.is_file():
                        if pattern is None or entry.match(pattern):
                            files.append((entry, depth))
                    elif entry.is_dir():
                        dirs_to_search.append((entry, depth + 1))
            except (PermissionError, OSError):
                # Skip directories we can't read
                continue

        # Sort by depth first, then by path for consistent ordering.
        files.sort(key=lambda x: (x[1], str(x[0])))

        return [file_path for file_path, _ in files]

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """Return the names (not full paths) of the entries in a directory.

        Raises:
            FileNotFoundError: If the path does not exist
            NotADirectoryError: If the path is not a directory
        """
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")
        if not path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")
        return [entry.name for entry in path.iterdir()]

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a file, symlink, or empty directory at the given disk path.

        Symlinks are removed as links (never followed), including broken
        symlinks and symlinks pointing at directories.

        Args:
            path: Path to delete

        Raises:
            FileNotFoundError: If nothing exists at path (not even a broken symlink)
            IsADirectoryError: If removing a real directory fails (typically
                because it is not empty)
            OSError: If unlinking a file or symlink fails (e.g. PermissionError)
            StorageResolutionError: If deletion fails with a non-OS error
        """
        path = Path(path)

        # exists() follows symlinks, so also accept a (possibly broken) link itself.
        if not (path.exists() or path.is_symlink()):
            raise FileNotFoundError(f"Cannot delete: path does not exist: {path}")

        # rmdir() on a symlink to a directory fails, so links always take the unlink path.
        is_real_dir = path.is_dir() and not path.is_symlink()

        try:
            if is_real_dir:
                # Do not allow recursive deletion
                path.rmdir()  # will raise OSError if directory is not empty
            else:
                path.unlink()
        except IsADirectoryError:
            raise
        except OSError as e:
            if is_real_dir:
                raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e
            # A failed unlink is not a directory problem; surface the real error.
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete {path}") from e

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a file or directory and all its contents from disk.

        Symlinks (including broken ones and links to directories) are unlinked
        rather than followed, so the link target is never touched.

        Args:
            path: Filesystem path to delete

        Raises:
            FileNotFoundError: If nothing exists at path (not even a broken symlink)
            StorageResolutionError: If deletion fails for any reason
        """
        path = Path(path)

        if not (path.exists() or path.is_symlink()):
            raise FileNotFoundError(f"Path does not exist: {path}")

        try:
            # shutil.rmtree refuses symlinks, so links are unlinked like files.
            if path.is_file() or path.is_symlink():
                path.unlink()
            else:
                shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete: {path}") from e

    def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]:
        """
        Ensure a directory exists on disk, creating missing parents as needed.

        Args:
            directory: Path to the directory to ensure exists

        Returns:
            The ``directory`` argument exactly as passed in

        Raises:
            TypeError: If directory cannot be converted to a Path
            ValueError: If there is an OS error creating the directory
        """
        # 🔒 Clause 17 — VFS Boundary Enforcement
        try:
            disk_directory = Path(directory)
            disk_directory.mkdir(parents=True, exist_ok=True)
            return directory
        except OSError as e:
            # 🔒 Clause 65 — No Fallback Logic
            # Propagate the error with additional context
            raise ValueError(f"Error creating directory {disk_directory}: {e}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        """Return True if path exists (symlinks are followed, so broken links return False)."""
        return Path(path).exists()

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        """Create ``link_name`` as a symlink pointing at the resolved absolute ``source``.

        Parent directories of ``link_name`` are created if missing.

        Raises:
            FileNotFoundError: If source does not exist
            FileExistsError: If link_name already exists (or is a broken
                symlink) and overwrite is False

        NOTE(review): with overwrite=True an existing entry is removed with
        ``unlink()``, which fails if link_name is a real directory.
        """
        source = Path(source).resolve()
        link_name = Path(link_name)  # Don't resolve link_name - we want the actual symlink path

        if not source.exists():
            raise FileNotFoundError(f"Source path does not exist: {source}")

        # Check if target exists and handle overwrite policy
        if link_name.exists() or link_name.is_symlink():
            if not overwrite:
                raise FileExistsError(f"Target already exists: {link_name}")
            link_name.unlink()  # Remove existing file/symlink only if overwrite=True

        link_name.parent.mkdir(parents=True, exist_ok=True)
        link_name.symlink_to(source)

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """Return True if path is a symlink (whether or not its target exists)."""
        return Path(path).is_symlink()

    def is_file(self, path: Union[str, Path]) -> bool:
        """Return True if path, after following symlinks, is a regular file.

        Raises:
            FileNotFoundError: If the path does not exist (including broken symlinks)
            IsADirectoryError: If the path resolves to a directory
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        # Resolve symlinks and return True only if final target is a file
        resolved = path.resolve(strict=True)

        if resolved.is_dir():
            raise IsADirectoryError(f"Path is a directory: {path}")

        return resolved.is_file()

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a given disk path is a directory.

        Follows filesystem symlinks to determine the actual resolved structure.

        Args:
            path: Filesystem path (absolute or relative)

        Returns:
            bool: True if path resolves to a directory (never returns False)

        Raises:
            FileNotFoundError: If the path or symlink target does not exist
            NotADirectoryError: If the resolved target is not a directory
            StorageResolutionError: For unexpected filesystem resolution errors
        """
        try:
            path = Path(path)

            if not path.exists():
                raise FileNotFoundError(f"Path does not exist: {path}")

            # Follow symlinks to final real target
            resolved = path.resolve(strict=True)

            if not resolved.is_dir():
                raise NotADirectoryError(f"Path is not a directory: {path}")

            return True

        except FileNotFoundError:
            raise  # broken symlink or missing path
        except NotADirectoryError:
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve directory: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory on disk with ``shutil.move``, refusing to overwrite.

        Raises:
            FileNotFoundError: If source does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On failure to move
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source path does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            shutil.move(str(src), str(dst))
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src} to {dst}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a disk-backed path.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': raw link text from ``os.readlink`` (symlinks only;
                  may be relative)
                - 'exists': False when missing; for symlinks, whether the link
                  target exists; True otherwise

        Raises:
            StorageResolutionError: On access or resolution failure, or for an
                object that is neither file, directory nor symlink
        """
        path_str = str(path)
        try:
            if not os.path.lexists(path_str):  # lexists: a broken symlink still counts as present
                return {
                    "type": "missing",
                    "path": path_str,
                    "exists": False
                }

            if os.path.islink(path_str):
                try:
                    resolved = os.readlink(path_str)
                    target_exists = os.path.exists(path_str)
                except OSError as e:
                    raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e

                return {
                    "type": "symlink",
                    "path": path_str,
                    "target": resolved,
                    "exists": target_exists
                }

            if os.path.isdir(path_str):
                return {
                    "type": "directory",
                    "path": path_str,
                    "exists": True
                }

            if os.path.isfile(path_str):
                return {
                    "type": "file",
                    "path": path_str,
                    "exists": True
                }

            # Neither link, dir nor regular file (e.g. a FIFO or socket).
            raise StorageResolutionError(f"Unknown filesystem object at: {path_str}")

        except Exception as e:
            # NOTE(review): this also re-wraps the StorageResolutionErrors raised above.
            raise StorageResolutionError(f"Failed to stat disk path: {path}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file or directory to a new location.

        - Does not overwrite destination.
        - Will raise if destination exists.
        - Directories are copied with ``shutil.copytree``, everything else with
          ``shutil.copy2`` (which also copies metadata).

        Raises:
            FileExistsError: If destination already exists
            FileNotFoundError: If source is missing
            StorageResolutionError: On structural failure
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            if src.is_dir():
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e