Coverage for openhcs/io/disk.py: 64.8%

270 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

# openhcs/io/disk.py

2""" 

3Disk-based storage backend implementation. 

4 

5This module provides a concrete implementation of the storage backend interfaces 

6for local disk storage. It strictly enforces VFS boundaries and doctrinal clauses. 

7""" 

8 

9import fnmatch 

10import logging 

11import os 

12import shutil 

13from pathlib import Path 

14from typing import Any, Callable, Dict, List, Optional, Set, Union 

15from os import PathLike 

16 

17import numpy as np 

18 

19from openhcs.constants.constants import FileFormat 

20from openhcs.io.base import StorageBackend 

21 

22logger = logging.getLogger(__name__) 

23 

24 

def optional_import(module_name):
    """
    Import ``module_name`` if it is available, otherwise return ``None``.

    ``importlib.import_module`` is used instead of the built-in ``__import__``
    because, for a dotted name such as ``"jax.numpy"``, ``__import__`` returns
    the top-level package (``jax``) rather than the requested submodule.

    Args:
        module_name: Absolute module name, optionally dotted.

    Returns:
        The imported module object, or ``None`` if the import raises
        ``ImportError`` (which includes ``ModuleNotFoundError``).
    """
    # Function-scope import keeps this block independent of the file header.
    import importlib

    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None

30 

# Optional third-party backends, resolved once when this module is imported.
# They live at module level (not on instances) to avoid pickle issues; a name
# is None when optional_import() could not import the corresponding package.
torch, jax, jnp, cupy, tf, tifffile = (
    optional_import(module_name)
    for module_name in (
        "torch",
        "jax",
        "jax.numpy",
        "cupy",
        "tensorflow",
        "tifffile",
    )
)

38 

class FileFormatRegistry:
    """Case-insensitive table mapping a file extension to its writer and reader."""

    def __init__(self):
        # Both tables are keyed by the lower-cased extension string.
        self._writers: Dict[str, Callable[[Path, Any], None]] = {}
        self._readers: Dict[str, Callable[[Path], Any]] = {}

    def register(self, ext: str, writer: Callable, reader: Callable):
        """Store ``writer`` and ``reader`` under ``ext.lower()``, replacing any earlier entry."""
        key = ext.lower()
        self._readers[key] = reader
        self._writers[key] = writer

    def get_writer(self, ext: str) -> Callable:
        """Return the writer registered for ``ext``; raises KeyError if there is none."""
        key = ext.lower()
        return self._writers[key]

    def get_reader(self, ext: str) -> Callable:
        """Return the reader registered for ``ext``; raises KeyError if there is none."""
        key = ext.lower()
        return self._readers[key]

    def is_registered(self, ext: str) -> bool:
        """Return True only when both a writer and a reader exist for ``ext``."""
        key = ext.lower()
        return key in self._writers and key in self._readers

57 

58 

59class DiskStorageBackend(StorageBackend): 

def __init__(self):
    """Create the extension registry and fill it with every available format."""
    registry = FileFormatRegistry()
    self.format_registry = registry
    self._register_formats()

63 

def _register_formats(self):
    """
    Fill ``self.format_registry`` with a writer/reader pair per extension.

    NumPy and plain text are always registered. Torch, JAX, CuPy, TensorFlow
    and TIFF are registered only when their module-level import handle is
    truthy. Each ``FileFormat`` member's ``.value`` is iterated to obtain the
    extension strings, and every extension is registered lower-cased.
    Registration order is fixed; since the registry overwrites on re-register,
    a later format replaces an earlier one on any shared extension.
    """
    # (extensions, writer, reader) triples, in registration order.
    triples = [(FileFormat.NUMPY.value, np.save, np.load)]

    if torch:
        triples.append((FileFormat.TORCH.value, torch.save, torch.load))

    if jax and jnp:
        triples.append((FileFormat.JAX.value, self._jax_writer, self._jax_reader))

    if cupy:
        triples.append((FileFormat.CUPY.value, self._cupy_writer, self._cupy_reader))

    if tf:
        triples.append(
            (FileFormat.TENSORFLOW.value, self._tensorflow_writer, self._tensorflow_reader)
        )

    if tifffile:
        triples.append((FileFormat.TIFF.value, self._tiff_writer, self._tiff_reader))

    triples.append((FileFormat.TEXT.value, self._text_writer, self._text_reader))

    register = self.format_registry.register
    for extensions, writer, reader in triples:
        for ext in extensions:
            register(ext.lower(), writer, reader)

123 

124 # Format-specific writer/reader functions (pickleable) 

def _jax_writer(self, path, data, **kwargs):
    """
    Write a JAX array to ``path`` with ``np.save``.

    The array is first fetched with ``jax.device_get``. Extra keyword
    arguments forwarded by ``save()`` are accepted and ignored.
    """
    np.save(path, jax.device_get(data))

def _jax_reader(self, path, **kwargs):
    """
    Load ``path`` with ``np.load`` and return it as a JAX array.

    Extra keyword arguments forwarded by ``load()`` are accepted and ignored,
    mirroring the writer.
    """
    # Reach jax.numpy through the top-level ``jax`` handle. The module-level
    # ``jnp`` is produced by optional_import("jax.numpy"); when that helper is
    # built on __import__, it yields the ``jax`` package itself, so
    # ``jnp.array`` would be looked up on the wrong module.
    return jax.numpy.array(np.load(path))

130 

def _cupy_writer(self, path, data, **kwargs):
    """Write ``data`` to ``path`` with ``cupy.save``; extra keyword arguments are ignored."""
    cupy.save(path, data)

def _cupy_reader(self, path, **kwargs):
    """
    Load ``path`` with ``cupy.load``.

    ``load()`` forwards its keyword arguments to the reader, so they are
    accepted (and ignored) here just as the writer does for ``save()``.
    """
    return cupy.load(path)

136 

def _tensorflow_writer(self, path, data, **kwargs):
    """
    Serialize ``data`` with ``tf.io.serialize_tensor`` and write it to ``path``.

    ``path`` must provide ``as_posix()`` (``save()`` passes a ``Path``).
    Extra keyword arguments are accepted and ignored.
    """
    tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data))

def _tensorflow_reader(self, path, **kwargs):
    """
    Read ``path`` and parse it back into a tensor.

    ``load()`` forwards its keyword arguments to the reader, so they are
    accepted (and ignored) here just as the writer does for ``save()``.
    """
    # NOTE(review): out_type is fixed to float32, so tensors saved with another
    # dtype presumably cannot be parsed back — confirm whether that is intended.
    return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)

142 

def _tiff_writer(self, path, data, **kwargs):
    """Write ``data`` to ``path`` with ``tifffile.imwrite``; extra keyword arguments are ignored."""
    tifffile.imwrite(path, data)

def _tiff_reader(self, path, **kwargs):
    """
    Read ``path`` with ``tifffile.imread``.

    ``load()`` forwards its keyword arguments to the reader, so they are
    accepted (and ignored) here just as the writer does for ``save()``.
    """
    return tifffile.imread(path)

148 

def _text_writer(self, path, data, **kwargs):
    """
    Write ``str(data)`` to ``path`` using ``Path.write_text``.

    ``**kwargs`` is accepted and ignored so that this writer tolerates the
    keyword arguments ``save()`` forwards, like every other writer here.
    """
    path.write_text(str(data))

def _text_reader(self, path, **kwargs):
    """
    Return the contents of ``path`` via ``Path.read_text``.

    ``**kwargs`` is accepted and ignored so that keyword arguments forwarded
    by ``load()`` do not raise ``TypeError``.
    """
    return path.read_text()

154 

155 

def load(self, file_path: Union[str, Path], **kwargs) -> Any:
    """
    Load data from disk with the reader registered for the file's extension.

    Args:
        file_path: Path to the file to load; its lower-cased suffix selects
            the reader.
        **kwargs: Forwarded unchanged to the reader callable.

    Returns:
        Whatever the selected reader returns.

    Raises:
        TypeError: If ``file_path`` cannot be converted to a ``Path``.
        ValueError: If no reader is registered for the extension, or if the
            reader raises for any reason (including a missing file); the
            original exception is chained as ``__cause__``.
    """
    disk_path = Path(file_path)
    ext = disk_path.suffix.lower()
    if not self.format_registry.is_registered(ext):
        # This is the read path, so report a missing reader (the message
        # previously said "writer").
        raise ValueError(f"No reader registered for extension '{ext}'")

    try:
        reader = self.format_registry.get_reader(ext)
        return reader(disk_path, **kwargs)
    except Exception as e:
        raise ValueError(f"Error loading data from {disk_path}: {e}") from e

184 

def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
    """
    Write ``data`` to disk with the writer registered for the path's extension.

    Args:
        data: Object handed to the writer.
        output_path: Destination path; its lower-cased suffix selects the writer.
        **kwargs: Forwarded unchanged to the writer callable.

    Returns:
        The writer's return value.

    Raises:
        ValueError: If the extension has no registered format, or if the
            writer raises; the original exception is chained as ``__cause__``.
    """
    target = Path(output_path)
    suffix = target.suffix.lower()
    registry = self.format_registry

    if not registry.is_registered(suffix):
        raise ValueError(f"No writer registered for extension '{suffix}'")

    try:
        write = registry.get_writer(suffix)
        return write(target, data, **kwargs)
    except Exception as exc:
        raise ValueError(f"Error saving data to {target}: {exc}") from exc

209 

def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
    """
    Load several files one after another through ``self.load``.

    Args:
        file_paths: Paths to load, in order.
        **kwargs: Passed to every ``self.load`` call.

    Returns:
        The loaded objects, in the same order as ``file_paths``.
    """
    return [self.load(file_path, **kwargs) for file_path in file_paths]

226 

def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
    """
    Save several objects one after another through ``self.save``.

    Every item whose detected memory type is not NumPy is first converted to
    NumPy with ``convert_memory`` (CPU round-trip allowed). All conversions
    happen before the first file is written.

    Args:
        data_list: Objects to save.
        output_paths: Destination paths, one per item in ``data_list``.
        **kwargs: Passed to every ``self.save`` call.

    Raises:
        ValueError: If ``data_list`` and ``output_paths`` differ in length.
    """
    n_items, n_paths = len(data_list), len(output_paths)
    if n_items != n_paths:
        raise ValueError(f"data_list length ({n_items}) must match output_paths length ({n_paths})")

    # Imported inside the method, after the length check.
    from openhcs.core.memory.converters import convert_memory
    from openhcs.core.memory.stack_utils import _detect_memory_type
    from openhcs.constants.constants import MemoryType

    def _as_numpy(item):
        """Return ``item`` unchanged if it is already NumPy, else its NumPy conversion."""
        kind = _detect_memory_type(item)
        if kind == MemoryType.NUMPY.value:
            return item
        return convert_memory(
            data=item,
            source_type=kind,
            target_type=MemoryType.NUMPY.value,
            gpu_id=0,  # placeholder: the NumPy target does not use a GPU id
            allow_cpu_roundtrip=True,
        )

    converted = [_as_numpy(item) for item in data_list]

    for array, destination in zip(converted, output_paths):
        self.save(array, destination, **kwargs)

273 

def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
               extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str,Path]]:
    """
    List files under ``directory``, optionally filtered by pattern and extension.

    Args:
        directory: Directory to search.
        pattern: Optional glob pattern; every file matches when omitted.
        extensions: Optional set of suffixes including the dot (e.g.
            ``{'.tif', '.png'}``), compared case-insensitively.
        recursive: When True, descend into subdirectories via
            ``_list_files_breadth_first`` (shallower files first); otherwise
            only ``directory`` itself is globbed.

    Returns:
        Matching file paths as strings.

    Raises:
        ValueError: If ``directory`` is not an existing directory.
    """
    root = Path(directory)
    if not root.is_dir():
        raise ValueError(f"Path is not a directory: {root}")

    if recursive:
        candidates = self._list_files_breadth_first(root, pattern)
    else:
        candidates = [entry for entry in root.glob(pattern or "*") if entry.is_file()]

    if extensions:
        wanted = {ext.lower() for ext in extensions}
        candidates = [entry for entry in candidates if entry.suffix.lower() in wanted]

    return list(map(str, candidates))

315 

def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]:
    """
    Collect files below ``directory``, ordered shallowest first.

    Directories are visited through a FIFO queue. A directory whose listing
    raises ``OSError`` (``PermissionError`` included) is abandoned at that
    point and traversal continues with the next queued directory.

    Args:
        directory: Root directory of the search.
        pattern: Optional pattern tested with ``Path.match``; every file is
            kept when it is None.

    Returns:
        File paths sorted by depth, then by their string form.
    """
    from collections import deque

    pending = deque([(directory, 0)])  # (directory, depth below the root)
    found = []  # (depth, str(path), path)

    while pending:
        folder, level = pending.popleft()
        try:
            for child in folder.iterdir():
                if child.is_file():
                    if pattern is not None and not child.match(pattern):
                        continue
                    found.append((level, str(child), child))
                elif child.is_dir():
                    # Queue the subdirectory; it is listed after the current level.
                    pending.append((child, level + 1))
        except OSError:
            # Unreadable directory: keep what was collected so far and move on.
            continue

    found.sort(key=lambda item: (item[0], item[1]))
    return [child for _, _, child in found]

358 

def list_dir(self, path: Union[str, Path]) -> List[str]:
    """
    Return the names of the entries directly inside ``path``.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        NotADirectoryError: If ``path`` exists but is not a directory.
    """
    target = Path(path)
    if not target.exists():
        raise FileNotFoundError(f"Path does not exist: {target}")
    if not target.is_dir():
        raise NotADirectoryError(f"Not a directory: {target}")

    names = []
    for child in target.iterdir():
        names.append(child.name)
    return names

366 

367 

def delete(self, path: Union[str, Path]) -> None:
    """
    Delete a file, a symlink, or an empty directory at the given disk path.

    A symlink is removed itself (its target is never touched), including
    broken symlinks and symlinks that point at a directory. Previously a
    broken symlink was reported as missing, and a symlink to a directory was
    handed to ``rmdir()`` and misreported as a non-empty directory.

    Args:
        path: Path to delete

    Raises:
        FileNotFoundError: If path does not exist
        IsADirectoryError: If removal fails with any ``OSError`` (typically a
            non-empty directory)
        StorageResolutionError: If deletion fails with a non-OSError exception
    """
    path = Path(path)
    is_link = path.is_symlink()

    # exists() follows symlinks, so a broken link must be detected separately.
    if not is_link and not path.exists():
        raise FileNotFoundError(f"Cannot delete: path does not exist: {path}")

    try:
        if path.is_dir() and not is_link:
            # Do not allow recursive deletion
            path.rmdir()  # raises OSError if the directory is not empty
        else:
            path.unlink()
    except IsADirectoryError:
        raise
    except OSError as e:
        # NOTE(review): every OSError (e.g. a permission failure on a file) is
        # still reported as a non-empty directory; kept so callers see the
        # same exception type as before.
        raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e
    except Exception as e:
        # NOTE(review): StorageResolutionError is not imported in the visible
        # part of this file — confirm where it is defined.
        raise StorageResolutionError(f"Failed to delete {path}") from e

397 

def delete_all(self, path: Union[str, Path]) -> None:
    """
    Recursively delete a file or directory and all its contents from disk.

    A symlink is removed itself and its target is left untouched; this covers
    broken symlinks and symlinks to directories. Previously a broken symlink
    was reported as missing, and a symlink to a directory reached
    ``shutil.rmtree``, which refuses to operate on symbolic links.

    Args:
        path: Filesystem path to delete

    Raises:
        FileNotFoundError: If the path does not exist
        StorageResolutionError: If deletion fails for any reason
    """
    path = Path(path)
    is_link = path.is_symlink()

    # exists() follows symlinks, so a broken link must be detected separately.
    if not is_link and not path.exists():
        raise FileNotFoundError(f"Path does not exist: {path}")

    try:
        if is_link or path.is_file():
            path.unlink()
        else:
            # Recursive removal of a real directory tree
            shutil.rmtree(path)
    except Exception as e:
        # NOTE(review): StorageResolutionError is not imported in the visible
        # part of this file — confirm where it is defined.
        raise StorageResolutionError(f"Failed to recursively delete: {path}") from e

423 

424 

def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]:
    """
    Create ``directory`` (and any missing parents) if it is not already there.

    Args:
        directory: Path of the directory that must exist afterwards.

    Returns:
        The ``directory`` argument exactly as it was passed in.

    Raises:
        TypeError: If ``directory`` cannot be converted to a ``Path``.
        ValueError: If creating the directory fails with an ``OSError``; the
            original error is chained as ``__cause__``.
    """
    target = Path(directory)
    try:
        target.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        # No fallback: surface the failure with the offending path attached.
        raise ValueError(f"Error creating directory {target}: {exc}") from exc
    return directory

448 

def exists(self, path: Union[str, Path]) -> bool:
    """Return the result of ``Path(path).exists()`` for the given disk path."""
    candidate = Path(path)
    return candidate.exists()

451 

def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
    """
    Create a symlink at ``link_name`` pointing to the resolved ``source``.

    Args:
        source: Existing path the link should point to; it is resolved first.
        link_name: Location of the new symlink (used as given, not resolved).
        overwrite: When True, an existing file or symlink at ``link_name`` is
            unlinked first.

    Raises:
        FileNotFoundError: If the resolved ``source`` does not exist.
        FileExistsError: If ``link_name`` is already taken and ``overwrite``
            is False.
    """
    target = Path(source).resolve()
    link = Path(link_name)

    if not target.exists():
        raise FileNotFoundError(f"Source path does not exist: {target}")

    # is_symlink() also catches a dangling link, which exists() reports as absent.
    occupied = link.exists() or link.is_symlink()
    if occupied and not overwrite:
        raise FileExistsError(f"Target already exists: {link}")
    if occupied:
        link.unlink()

    link.parent.mkdir(parents=True, exist_ok=True)
    link.symlink_to(target)

467 

468 

def is_symlink(self, path: Union[str, Path]) -> bool:
    """Return the result of ``Path(path).is_symlink()`` for the given disk path."""
    candidate = Path(path)
    return candidate.is_symlink()

471 

472 

def is_file(self, path: Union[str, Path]) -> bool:
    """
    Report whether ``path``, after following symlinks, is a regular file.

    Returns:
        The result of ``is_file()`` on the strictly resolved path.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        IsADirectoryError: If the resolved path is a directory (this method
            raises for directories rather than returning False).
    """
    candidate = Path(path)
    if not candidate.exists():
        raise FileNotFoundError(f"Path does not exist: {candidate}")

    # strict=True makes resolution fail if any path component is missing.
    final_target = candidate.resolve(strict=True)
    if final_target.is_dir():
        raise IsADirectoryError(f"Path is a directory: {candidate}")

    return final_target.is_file()

486 

def is_dir(self, path: Union[str, Path]) -> bool:
    """
    Verify that ``path``, after following symlinks, is a directory.

    This method never returns False: it returns True or raises.

    Args:
        path: Filesystem path (absolute or relative)

    Returns:
        bool: True when the path resolves to a directory

    Raises:
        FileNotFoundError: If the path or symlink target does not exist
        NotADirectoryError: If the resolved target is not a directory
        StorageResolutionError: For any other exception raised while checking
    """
    try:
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        # strict=True makes resolution fail if any path component is missing.
        if not path.resolve(strict=True).is_dir():
            raise NotADirectoryError(f"Path is not a directory: {path}")

        return True
    except (FileNotFoundError, NotADirectoryError):
        # Documented outcomes pass through untouched.
        raise
    except Exception as e:
        # NOTE(review): StorageResolutionError is not imported in the visible
        # part of this file — confirm where it is defined.
        raise StorageResolutionError(f"Failed to resolve directory: {path}") from e

526 

def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
    """
    Move a file or directory with ``shutil.move``, refusing to overwrite.

    Raises:
        FileNotFoundError: If source does not exist
        FileExistsError: If destination already exists
        StorageResolutionError: If ``shutil.move`` raises
    """
    source, destination = Path(src), Path(dst)

    if not source.exists():
        raise FileNotFoundError(f"Source path does not exist: {source}")
    if destination.exists():
        raise FileExistsError(f"Destination already exists: {destination}")

    try:
        shutil.move(str(source), str(destination))
    except Exception as exc:
        # NOTE(review): StorageResolutionError is not imported in the visible
        # part of this file — confirm where it is defined.
        raise StorageResolutionError(f"Failed to move {source} to {destination}") from exc

551 

def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
    """
    Return structural metadata about a disk-backed path.

    Returns:
        dict with keys:
        - 'type': 'file', 'directory', 'symlink', or 'missing'
        - 'path': str(path)
        - 'target': the raw ``os.readlink`` value (symlinks only)
        - 'exists': bool; for a symlink, whether its target exists

        A broken symlink is reported as 'symlink' with 'exists' False, not as
        'missing', because presence is tested with ``os.path.lexists``.

    Raises:
        StorageResolutionError: On access or resolution failure
    """
    path_str = str(path)
    try:
        if not os.path.lexists(path_str):
            return {"type": "missing", "path": path_str, "exists": False}

        if os.path.islink(path_str):
            try:
                resolved = os.readlink(path_str)
                target_exists = os.path.exists(path_str)  # follows the link
            except OSError as e:
                raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e

            return {
                "type": "symlink",
                "path": path_str,
                "target": resolved,
                "exists": target_exists,
            }

        if os.path.isdir(path_str):
            return {"type": "directory", "path": path_str, "exists": True}

        if os.path.isfile(path_str):
            return {"type": "file", "path": path_str, "exists": True}

        raise StorageResolutionError(f"Unknown filesystem object at: {path_str}")

    except StorageResolutionError:
        # Keep the specific message raised above instead of letting the
        # generic handler below replace it.
        # NOTE(review): StorageResolutionError is not imported in the visible
        # part of this file — confirm where it is defined.
        raise
    except Exception as e:
        raise StorageResolutionError(f"Failed to stat disk path: {path}") from e

607 

def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
    """
    Copy a file or directory to a new location.

    - Does not overwrite destination.
    - Will raise if destination exists.
    - Directories are copied with ``shutil.copytree``, everything else with
      ``shutil.copy2``.

    Raises:
        FileExistsError: If destination already exists
        FileNotFoundError: If source is missing
        StorageResolutionError: If the underlying copy raises
    """
    src = Path(src)
    dst = Path(dst)

    if not src.exists():
        raise FileNotFoundError(f"Source does not exist: {src}")
    if dst.exists():
        raise FileExistsError(f"Destination already exists: {dst}")

    try:
        if src.is_dir():
            shutil.copytree(src, dst)
        else:
            shutil.copy2(src, dst)
    except Exception as e:
        # The message previously ran both paths together with no separator;
        # it now mirrors the wording used by move().
        # NOTE(review): StorageResolutionError is not imported in the visible
        # part of this file — confirm where it is defined.
        raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e