Coverage for openhcs/io/filemanager.py: 39.5%

220 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2FileManager directory operations. 

3 

4This module contains the directory-related methods of the FileManager class, 

5including directory listing, existence checking, mkdir, symlink, and mirror operations. 

6""" 

7 

8import logging 

9from pathlib import Path 

10from typing import List, Set, Union, Tuple, Any 

11 

12from openhcs.constants.constants import DEFAULT_IMAGE_EXTENSIONS 

13from openhcs.io.base import DataSink 

14from openhcs.io.exceptions import StorageResolutionError 

15 

16logger = logging.getLogger(__name__) 

17 

18class FileManager: 

19 

def __init__(self, registry):
    """
    Bind this FileManager to a backend registry.

    The manager is a pure router: it keeps no default backend, so every
    operation has to name the backend it wants.

    Args:
        registry: Mapping of backend name to backend instance. It is stored
            unchanged on ``self.registry`` and consulted by ``_get_backend``.

    Raises:
        ValueError: If ``registry`` is None.

    Thread Safety:
        Per the original author's guidance, an instance should be scoped to a
        single execution context and not shared across pipelines or threads.
    """
    if registry is None:
        raise ValueError("Registry must be provided to FileManager. Default fallback has been removed.")

    self.registry = registry
    logger.debug("FileManager initialized with registry")

51 

def _get_backend(self, backend_name: str) -> "DataSink":
    """
    Look up a backend instance in the registry by name.

    The name is lower-cased before the lookup, so "Disk" and "disk" resolve
    to the same entry. The registry value is returned as-is; no copy is made.

    Args:
        backend_name: Name of the backend to get (e.g., "disk", "memory", "zarr")

    Returns:
        The backend instance stored in ``self.registry`` under the normalized name.

    Raises:
        StorageResolutionError: If ``backend_name`` is None or not a string,
            if the normalized name is not in the registry, or if the registry
            lookup itself fails.
    """
    # Validate BEFORE normalizing: calling .lower() first would raise
    # AttributeError on None and make this check unreachable.
    if backend_name is None:
        raise StorageResolutionError(f"Backend '{backend_name}' not found in registry")
    if not isinstance(backend_name, str):
        raise StorageResolutionError(
            f"Backend name must be a string, got {type(backend_name).__name__}"
        )

    # Normalize backend name
    backend_name = backend_name.lower()

    try:
        if backend_name not in self.registry:
            raise KeyError(f"Backend '{backend_name}' not found in registry")

        # Return the backend instance directly
        return self.registry[backend_name]
    except Exception as e:
        # Any lookup failure (missing key, broken registry) surfaces as one error type
        raise StorageResolutionError(f"Failed to get backend '{backend_name}': {e}") from e

90 

def load(self, file_path: Union[str, Path], backend: str, **kwargs) -> Any:
    """
    Load one file through the named backend.

    The path is handed to the backend untouched; no format inference or
    fallback happens at this layer.

    Args:
        file_path: Path of the file to load (str or Path)
        backend: Name of the backend to route to
        **kwargs: Forwarded verbatim to the backend's ``load`` method

    Returns:
        Any: Whatever the backend's ``load`` returns

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if the
            backend raises anything else (the original error is chained).
    """
    try:
        return self._get_backend(backend).load(file_path, **kwargs)
    except StorageResolutionError:
        # Already the right error type; do not re-wrap
        raise
    except Exception as e:
        logger.error(f"Unexpected error during load from {file_path} with backend {backend}: {e}", exc_info=True)
        raise StorageResolutionError(
            f"Failed to load file at {file_path} using backend '{backend}'"
        ) from e

120 

def save(self, data: Any, output_path: Union[str, Path], backend: str, **kwargs) -> None:
    """
    Save one data object through the named backend.

    If the instance carries a non-empty ``_materialization_context`` mapping,
    its entries are passed to the backend as extra keyword arguments;
    explicitly supplied ``kwargs`` win when keys collide.

    Args:
        data: The object to save; passed to the backend untouched
        output_path: Destination path (str or Path); passed to the backend untouched
        backend: Name of the backend to route to
        **kwargs: Forwarded to the backend's ``save`` method

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if the
            backend raises anything else (the original error is chained).
    """
    try:
        target = self._get_backend(backend)

        # The attribute is optional; a missing or empty context means "no extras"
        context = getattr(self, '_materialization_context', None)
        call_kwargs = {**context, **kwargs} if context else kwargs

        target.save(data, output_path, **call_kwargs)
    except StorageResolutionError:
        # Already the right error type; do not re-wrap
        raise
    except Exception as e:
        logger.error(f"Unexpected error during save to {output_path} with backend {backend}: {e}", exc_info=True)
        raise StorageResolutionError(
            f"Failed to save data to {output_path} using backend '{backend}'"
        ) from e

156 

def load_batch(self, file_paths: List[Union[str, Path]], backend: str, **kwargs) -> List[Any]:
    """
    Load several files in one call through the named backend.

    Args:
        file_paths: Paths of the files to load
        backend: Name of the backend to route to
        **kwargs: Forwarded verbatim to the backend's ``load_batch`` method

    Returns:
        Whatever the backend's ``load_batch`` returns.

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if the
            backend raises anything else (the original error is chained).
    """
    try:
        return self._get_backend(backend).load_batch(file_paths, **kwargs)
    except StorageResolutionError:
        raise
    except Exception as e:
        logger.error(f"Unexpected error during batch load with backend {backend}: {e}", exc_info=True)
        raise StorageResolutionError(
            f"Failed to load batch of {len(file_paths)} files using backend '{backend}'"
        ) from e

182 

def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], backend: str, **kwargs) -> None:
    """
    Save several data objects in one call through the named backend.

    No validation is done here; both lists are handed to the backend as-is,
    so any length check is the backend's responsibility.

    Args:
        data_list: Objects to save
        output_paths: Destination paths, one per object
        backend: Name of the backend to route to
        **kwargs: Forwarded verbatim to the backend's ``save_batch`` method

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if the
            backend raises anything else (the original error is chained).
    """
    try:
        self._get_backend(backend).save_batch(data_list, output_paths, **kwargs)
    except StorageResolutionError:
        raise
    except Exception as e:
        logger.error(f"Unexpected error during batch save with backend {backend}: {e}", exc_info=True)
        raise StorageResolutionError(
            f"Failed to save batch of {len(data_list)} files using backend '{backend}'"
        ) from e

207 

def list_image_files(self, directory: Union[str, Path], backend: str,
                     pattern: str = None, extensions: Set[str] = DEFAULT_IMAGE_EXTENSIONS,
                     recursive: bool = False) -> List[str]:
    """
    List image files under a directory via the named backend.

    This is the backend's ``list_files`` call with ``extensions`` defaulting
    to ``DEFAULT_IMAGE_EXTENSIONS``; the result is ordered with
    ``openhcs.core.utils.natural_sort``. The directory is converted to ``str``
    but otherwise not validated or normalized.

    Args:
        directory: Directory to search (str or Path)
        backend: Name of the backend to route to
        pattern: Optional filter pattern, forwarded to the backend
        extensions: File extensions to keep, forwarded to the backend
        recursive: Whether to search recursively, forwarded to the backend

    Returns:
        The naturally sorted listing produced by the backend.

    Raises:
        StorageResolutionError: If the backend cannot be resolved.
        Errors raised by the backend's ``list_files`` propagate unchanged.
    """
    source = self._get_backend(backend)

    # Imported lazily, after backend resolution, exactly as the lookup order requires
    from openhcs.core.utils import natural_sort
    return natural_sort(source.list_files(str(directory), pattern, extensions, recursive))

240 

241 

def list_files(self, directory: Union[str, Path], backend: str,
               pattern: str = None, extensions: Set[str] = None, recursive: bool = False,
               **kwargs) -> List[str]:
    """
    List files under a directory via the named backend.

    The directory is converted to ``str`` but otherwise not validated or
    normalized. The backend's listing is ordered with
    ``openhcs.core.utils.natural_sort`` before it is returned.

    Args:
        directory: Directory to search (str or Path)
        backend: Name of the backend to route to
        pattern: Optional filter pattern, forwarded to the backend
        extensions: Optional set of extensions to keep, forwarded to the backend
        recursive: Whether to search recursively, forwarded to the backend
        **kwargs: Backend-specific arguments, forwarded verbatim

    Returns:
        The naturally sorted listing produced by the backend.

    Raises:
        StorageResolutionError: If the backend cannot be resolved.
        Errors raised by the backend's ``list_files`` propagate unchanged.
    """
    source = self._get_backend(backend)

    # Imported lazily, after backend resolution
    from openhcs.core.utils import natural_sort
    listing = source.list_files(str(directory), pattern, extensions, recursive, **kwargs)
    return natural_sort(listing)

276 

277 

def find_file_recursive(self, directory: Union[str, Path], filename: str, backend: str) -> Union[str, None]:
    """
    Locate a file by name anywhere below a directory.

    Runs ``list_files`` with ``recursive=True`` and returns the first entry,
    in the order ``list_files`` yields them, whose final path component
    equals ``filename``.

    Args:
        directory: Directory to search (str or Path)
        filename: Exact file name to look for (compared to ``Path(entry).name``)
        backend: Name of the backend to route to

    Returns:
        The matching path as returned by ``list_files``, or None if nothing matches.

    Raises:
        Whatever ``list_files`` raises; nothing is caught here.
    """
    candidates = self.list_files(directory, backend, recursive=True)
    return next(
        (candidate for candidate in candidates if Path(candidate).name == filename),
        None,
    )

307 

308 

def list_dir(self, path: Union[str, Path], backend: str) -> List[str]:
    """
    List the entries of a directory via the named backend.

    Args:
        path: Directory to list (str or Path)
        backend: Name of the backend to route to

    Returns:
        The backend's ``list_dir`` result, ordered with
        ``openhcs.core.utils.natural_sort``.

    Raises:
        TypeError: If ``path`` is neither str nor Path.
        StorageResolutionError: If the backend cannot be resolved.
        FileNotFoundError, NotADirectoryError: Propagated unchanged from the
            backend so callers can use them to tell files from directories.
        RuntimeError: For any other failure (the original error is chained).
    """
    if not isinstance(path, (str, Path)):
        raise TypeError(f"Expected str or Path, got {type(path)}")

    path = str(path)
    lister = self._get_backend(backend)

    try:
        from openhcs.core.utils import natural_sort
        return natural_sort(lister.list_dir(path))
    except (FileNotFoundError, NotADirectoryError):
        # Structural signals: callers rely on these exact types
        raise
    except Exception as e:
        raise RuntimeError(f"Unexpected failure in list_dir({path}) for backend {backend}") from e

327 

def ensure_directory(self, directory: Union[str, Path], backend: str) -> str:
    """
    Ask the named backend to make sure a directory exists.

    The path is converted to ``str`` and passed straight to the backend's
    ``ensure_directory``; no validation or normalization is done here.

    Args:
        directory: Directory that should exist (str or Path)
        backend: Name of the backend to route to

    Returns:
        Whatever the backend's ``ensure_directory`` returns.

    Raises:
        StorageResolutionError: If the backend cannot be resolved.
        Errors raised by the backend propagate unchanged.
    """
    return self._get_backend(backend).ensure_directory(str(directory))

354 

355 

356 

def exists(self, path: Union[str, Path], backend: str) -> bool:
    """
    Ask the named backend whether a path exists.

    The path is converted to ``str`` and passed straight to the backend's
    ``exists``; no validation or normalization is done here.

    Args:
        path: Path to check (str or Path)
        backend: Name of the backend to route to

    Returns:
        Whatever the backend's ``exists`` returns.

    Raises:
        StorageResolutionError: If the backend cannot be resolved.
        Errors raised by the backend propagate unchanged.
    """
    return self._get_backend(backend).exists(str(path))

383 

384 

def mirror_directory_with_symlinks(
    self,
    source_dir: Union[str, Path],
    target_dir: Union[str, Path],
    backend: str,
    recursive: bool = True,
    overwrite_symlinks_only: bool = False
) -> int:
    """
    Create, under target_dir, a symlink for every file found under source_dir.

    Files are discovered with ``collect_dirs_and_files`` and each one is
    linked at the same relative location below ``target_dir`` through
    ``create_symlink``. Input paths are not validated or normalized here.

    Args:
        source_dir: Directory whose files are linked (str or Path)
        target_dir: Directory that receives the symlinks (str or Path)
        backend: Name of the backend to route to
        recursive: If True (default) files in subdirectories are linked too;
            if False only files directly inside ``source_dir`` are linked.
        overwrite_symlinks_only: Forwarded to ``create_symlink``. If True,
            existing symlinks may be replaced but regular files may not.
            If False (default), nothing is overwritten.

    Returns:
        int: Number of symlinks created

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if
            collecting files or creating any symlink fails. This includes a
            ``FileExistsError`` from ``create_symlink``, which is wrapped and
            available as ``__cause__``.
        Errors from the initial ``ensure_directory`` call propagate unchanged.
    """
    # Resolve the backend first so an unknown name fails before any side effect
    self._get_backend(backend)

    # Created outside the try block: failures here are not wrapped
    self.ensure_directory(target_dir, backend)

    try:
        # Honor the caller's `recursive` choice instead of always descending
        _, all_files = self.collect_dirs_and_files(source_dir, backend, recursive=recursive)

        source_root = Path(source_dir)
        target_root = Path(target_dir)

        symlink_count = 0
        for file_path in all_files:
            relative = Path(file_path).relative_to(source_root)
            self.create_symlink(
                file_path,
                str(target_root / relative),
                backend,
                overwrite_symlinks_only=overwrite_symlinks_only,
            )
            symlink_count += 1

        return symlink_count

    except Exception as e:
        raise StorageResolutionError(f"Failed to mirror directory {source_dir} to {target_dir} with backend {backend}") from e

449 

def create_symlink(
    self,
    source_path: Union[str, Path],
    symlink_path: Union[str, Path],
    backend: str,
    overwrite_symlinks_only: bool = False
) -> bool:
    """
    Create a symlink at symlink_path pointing to source_path.

    Overwrite policy:
      * target absent: the link is created with ``overwrite=False``;
      * target present and ``overwrite_symlinks_only`` is False: refused;
      * target present and ``overwrite_symlinks_only`` is True: replaced only
        if ``is_symlink`` reports the existing target as a symlink.

    Both paths are converted to ``str`` before reaching the backend.

    Args:
        source_path: What the link should point to (str or Path)
        symlink_path: Where the link is created (str or Path)
        backend: Name of the backend to route to
        overwrite_symlinks_only: Allow replacing an existing symlink (never a regular file)

    Returns:
        bool: True when the backend call completed

    Raises:
        FileExistsError: If the target exists and may not be overwritten, or
            if the backend itself raises FileExistsError.
        StorageResolutionError: If the backend cannot be resolved, or for any
            other backend failure (the original error is chained).
    """
    backend_instance = self._get_backend(backend)

    try:
        link_target = str(symlink_path)
        occupied = backend_instance.exists(link_target)

        if occupied and not overwrite_symlinks_only:
            # No overwrite allowed
            raise FileExistsError(f"Target already exists: {symlink_path}")

        if occupied and not self.is_symlink(symlink_path, backend):
            raise FileExistsError(
                f"Target exists and is not a symlink (overwrite_symlinks_only=True): {symlink_path}"
            )

        # Overwrite only when something (a symlink, by now) is in the way
        backend_instance.create_symlink(str(source_path), link_target, overwrite=bool(occupied))
        return True
    except FileExistsError:
        # Policy violations and backend-reported collisions keep their type
        raise
    except Exception as e:
        raise StorageResolutionError(
            f"Failed to create symlink from {source_path} to {symlink_path} with backend {backend}"
        ) from e

509 

def delete(self, path: Union[str, Path], backend: str, recursive: bool = False) -> bool:
    """
    Delete a path via the named backend.

    The path is converted to ``str`` and passed to the backend's ``delete``;
    no validation or normalization is done here.

    Args:
        path: Path to delete (str or Path)
        backend: Name of the backend to route to
        recursive: Accepted for interface compatibility but NOT forwarded to
            the backend; it currently has no effect. Use ``delete_all`` for
            recursive removal.

    Returns:
        Whatever the backend's ``delete`` returns.

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if the
            backend's ``delete`` raises anything (including FileNotFoundError;
            the original error is chained).
    """
    remover = self._get_backend(backend)

    try:
        return remover.delete(str(path))
    except Exception as e:
        raise StorageResolutionError(
            f"Failed to delete {path} with backend {backend}"
        ) from e

542 

def delete_all(self, path: Union[str, Path], backend: str) -> bool:
    """
    Delete a path through the backend's ``delete_all`` operation.

    This layer only converts the path to ``str`` and dispatches; what
    "delete all" means (files, symlinks, directory trees) is defined
    entirely by the backend.

    Args:
        path: The path to delete
        backend: Name of the backend to route to

    Returns:
        True once the backend call has returned without raising

    Raises:
        StorageResolutionError: If the backend cannot be resolved, or if the
            backend's ``delete_all`` raises anything (the original error is chained).
    """
    remover = self._get_backend(backend)
    path_str = str(path)

    try:
        remover.delete_all(path_str)
    except Exception as e:
        raise StorageResolutionError(
            f"Failed to delete_all({path_str}) using backend '{backend}'"
        ) from e
    else:
        return True

572 

573 

def copy(self, source_path: Union[str, Path], dest_path: Union[str, Path], backend: str) -> bool:
    """
    Copy source_path to dest_path using the given backend.

    The copy is refused if the backend reports that the destination already
    exists. Otherwise the destination's parent directory is ensured and the
    backend's ``copy`` is invoked. Both paths reach the backend as ``str``,
    matching the other path operations of this class.

    Args:
        source_path: Path to copy from (str or Path)
        dest_path: Path to copy to (str or Path)
        backend: Name of the backend to route to

    Returns:
        Whatever the backend's ``copy`` returns.

    Raises:
        StorageResolutionError: If the backend cannot be resolved, if the
            destination already exists, or if any step fails. The underlying
            error (e.g. ``FileExistsError``) is chained as ``__cause__``.
    """
    backend_instance = self._get_backend(backend)

    try:
        src = str(source_path)
        dst = str(dest_path)

        # Prevent overwriting
        if backend_instance.exists(dst):
            raise FileExistsError(f"Destination already exists: {dest_path}")

        # Ensure destination parent exists
        self.ensure_directory(Path(dest_path).parent, backend)

        # Delegate to backend-native copy
        return backend_instance.copy(src, dst)
    except Exception as e:
        raise StorageResolutionError(
            f"Failed to copy from {source_path} to {dest_path} on backend {backend}"
        ) from e

604 

605 

def move(self, source_path: Union[str, Path], dest_path: Union[str, Path], backend: str,
         replace_symlinks: bool = False) -> bool:
    """
    Move source_path to dest_path using the given backend.

    Destination handling:
      * destination absent: the move proceeds;
      * destination present and ``replace_symlinks`` is False: refused;
      * destination present and ``replace_symlinks`` is True: the destination
        is deleted first, but only if the backend reports it is a symlink.

    The destination's parent directory is ensured before the backend's
    ``move`` is invoked. All paths reach the backend as ``str``, matching the
    other path operations of this class.

    Args:
        source_path: Source file or directory path
        dest_path: Destination file or directory path
        backend: Name of the backend to route to
        replace_symlinks: Allow replacing an existing symlink at the destination

    Returns:
        Whatever the backend's ``move`` returns.

    Raises:
        StorageResolutionError: If the backend cannot be resolved, if the
            destination exists and may not be replaced, or if any step fails.
            The underlying error (e.g. ``FileExistsError``) is chained as
            ``__cause__``.
    """
    backend_instance = self._get_backend(backend)

    try:
        src = str(source_path)
        dst = str(dest_path)

        if backend_instance.exists(dst):
            if not replace_symlinks:
                # replace_symlinks=False, don't allow any overwriting
                raise FileExistsError(f"Destination already exists: {dest_path}")
            if not backend_instance.is_symlink(dst):
                # Destination exists but is not a symlink
                raise FileExistsError(f"Destination already exists and is not a symlink: {dest_path}")

            logger.debug("Destination is a symlink, removing before move: %s", dest_path)
            backend_instance.delete(dst)

        self.ensure_directory(Path(dest_path).parent, backend)
        return backend_instance.move(src, dst)

    except Exception as e:
        raise StorageResolutionError(
            f"Failed to move from {source_path} to {dest_path} on backend {backend}"
        ) from e

654 

def collect_dirs_and_files(
    self,
    base_dir: Union[str, Path],
    backend: str,
    recursive: bool = True
) -> Tuple[List[str], List[str]]:
    """
    Collect directories and files starting from base_dir using breadth-first traversal.

    A path is classified by calling ``list_dir`` on it: success means
    directory; ``NotADirectoryError`` or ``FileNotFoundError`` means it is
    recorded as a file. Any other error is logged as a warning and the path
    is skipped. Each path is classified exactly once, so no directory appears
    twice in the result.

    Args:
        base_dir: Path where the traversal starts (str or Path)
        backend: Name of the backend to route to
        recursive: If True (default) descend into every subdirectory; if False
            only ``base_dir`` and its immediate entries are reported.

    Returns:
        (dirs, files): Lists of string paths for directories and files, each
        ordered with ``openhcs.core.utils.natural_sort``. ``dirs`` includes
        ``base_dir`` itself when it can be listed.
    """
    from collections import deque

    base_dir = str(base_dir)
    # deque gives O(1) pops from the left, i.e. FIFO order for breadth-first traversal
    queue = deque([base_dir])
    dirs: List[str] = []
    files: List[str] = []

    while queue:
        current_path = queue.popleft()

        try:
            entries = self.list_dir(current_path, backend)
        except (NotADirectoryError, FileNotFoundError):
            files.append(current_path)
            continue
        except Exception as e:
            # Fail-safe: skip unexpected issues rather than abort the whole walk
            logger.warning(
                "[collect_dirs_and_files] Unexpected error at %s: %s: %s",
                current_path, type(e).__name__, e,
            )
            continue

        dirs.append(current_path)

        if entries is None:
            # Defensive fallback: entries must be iterable
            logger.warning("[collect_dirs_and_files] WARNING: list_dir() returned None at %s", current_path)
            continue

        for entry in entries:
            full_path = str(Path(current_path) / entry)

            if recursive:
                # Classified when dequeued; appending here as well would
                # record every subdirectory twice and list it twice.
                queue.append(full_path)
                continue

            # Non-recursive: classify the immediate entry without descending
            try:
                self.list_dir(full_path, backend)
            except (NotADirectoryError, FileNotFoundError):
                files.append(full_path)
            except Exception as e:
                logger.warning(
                    "[collect_dirs_and_files] Skipping %s: %s: %s",
                    full_path, type(e).__name__, e,
                )
            else:
                dirs.append(full_path)

    # Apply natural sorting to both dirs and files before returning
    from openhcs.core.utils import natural_sort
    return natural_sort(dirs), natural_sort(files)

709 

def is_file(self, path: Union[str, Path], backend: str) -> bool:
    """
    Check if a given path is a file using the specified backend.

    The path is converted to ``str`` before it reaches the backend, matching
    ``exists`` and ``is_symlink``.

    Args:
        path: Path to check (raw string or Path)
        backend: Name of the backend to route to

    Returns:
        The backend's ``is_file`` result, or False if resolving the backend
        or querying it raises any exception. This check never raises.
    """
    try:
        return self._get_backend(backend).is_file(str(path))
    except Exception:
        # Deliberate best-effort: any failure is reported as "not a file"
        return False

727 

def is_dir(self, path: Union[str, Path], backend: str) -> bool:
    """
    Check if a given path is a directory using the specified backend.

    The path is converted to ``str`` before it reaches the backend, matching
    ``exists`` and ``is_symlink``.

    Args:
        path: Path to check (raw string or Path)
        backend: Name of the backend to route to

    Returns:
        The backend's ``is_dir`` result, or False if the backend raises
        ``FileNotFoundError`` or ``NotADirectoryError``.

    Raises:
        StorageResolutionError: If the backend cannot be resolved or raises
            any other exception (the original error is chained).
    """
    try:
        return self._get_backend(backend).is_dir(str(path))
    except (FileNotFoundError, NotADirectoryError):
        # Return False for files or non-existent paths instead of raising
        return False
    except Exception as e:
        raise StorageResolutionError(
            f"Failed to check if {path} is a directory with backend '{backend}'"
        ) from e

752 

def is_symlink(self, path: Union[str, Path], backend: str) -> bool:
    """
    Report whether a path is a symbolic link according to the named backend.

    Args:
        path: Path to check (raw string or Path); converted to ``str`` for the backend
        backend: Name of the backend to route to

    Returns:
        The backend's ``is_symlink`` result, or False if resolving the backend
        or querying it raises any exception. This check never raises.
    """
    try:
        checker = self._get_backend(backend)
        answer = checker.is_symlink(str(path))
    except Exception:
        # Deliberate best-effort: any failure is reported as "not a symlink"
        return False
    return answer

770