Coverage for openhcs/io/filemanager.py: 56.2%

220 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

1""" 

2FileManager: backend-agnostic routing of storage operations. 

3 

4This module defines the FileManager class, which dispatches load/save, batch I/O, 

5listing, existence checks, directory creation, symlink, mirror, copy, move, and delete operations to named storage backends. 

6""" 

7 

8import logging 

9import os 

10from pathlib import Path 

11from typing import List, Set, Union, Tuple, Optional, Any 

12 

13from openhcs.constants.constants import DEFAULT_IMAGE_EXTENSIONS 

14from openhcs.io.base import DataSink 

15from openhcs.io.exceptions import PathMismatchError, StorageResolutionError 

16from openhcs.validation import validate_path_types, validate_backend_parameter 

17import traceback 

18 

19logger = logging.getLogger(__name__) 

20 

class FileManager:
    """
    Backend-agnostic router for storage operations.

    Every public method takes the name of a backend, resolves it through the
    registry supplied at construction time, and forwards the call to that
    backend. There is no default backend and no fallback behaviour.
    """

    def __init__(self, registry):
        """
        Initialize the file manager.

        Args:
            registry: Mapping of backend name -> backend instance
                (Dict[str, DataSink]; includes StorageBackend and
                StreamingBackend). Must be provided.

        Raises:
            ValueError: If registry is None.

        Note:
            This class maintains no default backend or fallback behavior.
            Each operation must explicitly specify which backend to use.

        Thread Safety:
            Each FileManager instance must be scoped to a single execution context.
            Do NOT share FileManager instances across pipelines or threads.
            For isolation, create a dedicated registry for each FileManager.
        """
        if registry is None:
            raise ValueError("Registry must be provided to FileManager. Default fallback has been removed.")

        self.registry = registry

        logger.debug("FileManager initialized with registry")

    def _get_backend(self, backend_name: str) -> DataSink:
        """
        Look up a backend instance by name in the instance registry.

        The name is lower-cased before the lookup. The object stored in the
        registry is returned as-is, so FileManagers sharing a registry share
        backend instances.

        Args:
            backend_name: Name of the backend to get (e.g., "disk", "memory", "zarr")

        Returns:
            The backend instance stored under the normalized name.

        Raises:
            StorageResolutionError: If backend_name is None or is not a key
                of the registry.
        """
        # Validate BEFORE normalizing: calling .lower() on None would raise
        # AttributeError and make this check unreachable.
        if backend_name is None:
            raise StorageResolutionError(f"Backend '{backend_name}' not found in registry")

        backend_name = backend_name.lower()

        try:
            if backend_name not in self.registry:
                raise KeyError(f"Backend '{backend_name}' not found in registry")
            return self.registry[backend_name]
        except Exception as e:
            raise StorageResolutionError(f"Failed to get backend '{backend_name}': {e}") from e

    def load(self, file_path: Union[str, Path], backend: str, **kwargs) -> Any:
        """
        Load data from a file using the specified backend.

        The path is forwarded unchanged; no inference or fallback happens here.

        Args:
            file_path: Path to the file to load (str or Path)
            backend: Registry name of the backend to use
            **kwargs: Additional keyword arguments passed to the backend's load method

        Returns:
            Whatever the backend's ``load`` returns.

        Raises:
            StorageResolutionError: If the backend cannot be resolved, or if the
                backend call raises anything (including a backend without a
                ``load`` attribute); the original exception is chained.
        """
        try:
            backend_instance = self._get_backend(backend)
            return backend_instance.load(file_path, **kwargs)
        except StorageResolutionError:
            # Already the documented error type; do not double-wrap.
            raise
        except Exception as e:
            logger.error(
                "Unexpected error during load from %s with backend %s: %s",
                file_path, backend, e, exc_info=True,
            )
            raise StorageResolutionError(
                f"Failed to load file at {file_path} using backend '{backend}'"
            ) from e

    def save(self, data: Any, output_path: Union[str, Path], backend: str, **kwargs) -> None:
        """
        Save data to a file using the specified backend.

        Data and path are forwarded unchanged; no transformation, format
        inference, or fallback happens here.

        Args:
            data: The data object to save
            output_path: Destination path to write to (str or Path)
            backend: Registry name of the backend to use
            **kwargs: Additional keyword arguments passed to the backend's save method

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend call raises; the original exception is chained.
        """
        try:
            backend_instance = self._get_backend(backend)
            backend_instance.save(data, output_path, **kwargs)
        except StorageResolutionError:
            raise
        except Exception as e:
            logger.error(
                "Unexpected error during save to %s with backend %s: %s",
                output_path, backend, e, exc_info=True,
            )
            raise StorageResolutionError(
                f"Failed to save data to {output_path} using backend '{backend}'"
            ) from e

    def load_batch(self, file_paths: List[Union[str, Path]], backend: str, **kwargs) -> List[Any]:
        """
        Load multiple files using the specified backend.

        Args:
            file_paths: List of file paths to load
            backend: Registry name of the backend to use
            **kwargs: Additional keyword arguments passed to the backend's load_batch method

        Returns:
            Whatever the backend's ``load_batch`` returns.

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend call raises; the original exception is chained.
        """
        try:
            backend_instance = self._get_backend(backend)
            return backend_instance.load_batch(file_paths, **kwargs)
        except StorageResolutionError:
            raise
        except Exception as e:
            logger.error(
                "Unexpected error during batch load with backend %s: %s",
                backend, e, exc_info=True,
            )
            raise StorageResolutionError(
                f"Failed to load batch of {len(file_paths)} files using backend '{backend}'"
            ) from e

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], backend: str, **kwargs) -> None:
        """
        Save multiple data objects using the specified backend.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths, forwarded unchanged
                (no length check is performed here)
            backend: Registry name of the backend to use
            **kwargs: Additional keyword arguments passed to the backend's save_batch method

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend call raises (any backend exception is wrapped, with
                the original chained).
        """
        try:
            backend_instance = self._get_backend(backend)
            backend_instance.save_batch(data_list, output_paths, **kwargs)
        except StorageResolutionError:
            raise
        except Exception as e:
            logger.error(
                "Unexpected error during batch save with backend %s: %s",
                backend, e, exc_info=True,
            )
            raise StorageResolutionError(
                f"Failed to save batch of {len(data_list)} files using backend '{backend}'"
            ) from e

    def list_image_files(self, directory: Union[str, Path], backend: str,
                         pattern: Optional[str] = None, extensions: Set[str] = DEFAULT_IMAGE_EXTENSIONS,
                         recursive: bool = False) -> List[str]:
        """
        List image files in a directory using the specified backend.

        Identical to :meth:`list_files` except that ``extensions`` defaults to
        DEFAULT_IMAGE_EXTENSIONS.

        Args:
            directory: Directory to search (str or Path)
            backend: Registry name of the backend to use
            pattern: Pattern to filter files (e.g., "*.tif")
            extensions: Set of file extensions to filter by
            recursive: Whether to search recursively

        Returns:
            Naturally sorted result of the backend's ``list_files``.

        Raises:
            StorageResolutionError: If the backend cannot be resolved.
            Exceptions raised by the backend propagate unchanged.
        """
        return self.list_files(
            directory, backend, pattern=pattern, extensions=extensions, recursive=recursive
        )

    def list_files(self, directory: Union[str, Path], backend: str,
                   pattern: Optional[str] = None, extensions: Optional[Set[str]] = None,
                   recursive: bool = False) -> List[str]:
        """
        List files in a directory using the specified backend.

        The directory is converted with ``str()`` and forwarded; no validation
        or normalization is performed here.

        Args:
            directory: Directory to search (str or Path)
            backend: Registry name of the backend to use
            pattern: Pattern to filter files (e.g., "*.txt")
            extensions: Set of file extensions to filter by
            recursive: Whether to search recursively

        Returns:
            Naturally sorted result of the backend's ``list_files``.

        Raises:
            StorageResolutionError: If the backend cannot be resolved.
            Exceptions raised by the backend propagate unchanged.
        """
        backend_instance = self._get_backend(backend)

        # Imported at call time, as in the rest of this class.
        from openhcs.core.utils import natural_sort
        files = backend_instance.list_files(str(directory), pattern, extensions, recursive)
        return natural_sort(files)

    def find_file_recursive(self, directory: Union[str, Path], filename: str, backend: str) -> Optional[str]:
        """
        Find a file by name anywhere below a directory.

        Uses :meth:`list_files` with ``recursive=True`` and returns the first
        listed path whose final component equals ``filename``.

        Args:
            directory: Directory to search (str or Path)
            filename: Name of the file to find
            backend: Registry name of the backend to use

        Returns:
            The matching path, or None if no listed path matches.

        Raises:
            StorageResolutionError: If the backend cannot be resolved.
            Exceptions raised by the backend propagate unchanged.
        """
        all_files = self.list_files(directory, backend, recursive=True)
        return next((candidate for candidate in all_files if Path(candidate).name == filename), None)

    def list_dir(self, path: Union[str, Path], backend: str) -> List[str]:
        """
        List the entries of a directory using the specified backend.

        Args:
            path: Directory to list (str or Path)
            backend: Registry name of the backend to use

        Returns:
            Naturally sorted result of the backend's ``list_dir``.

        Raises:
            TypeError: If path is neither str nor Path.
            StorageResolutionError: If the backend cannot be resolved.
            FileNotFoundError, NotADirectoryError: Propagated unchanged from the
                backend; collect_dirs_and_files relies on them to tell files
                from directories.
            RuntimeError: Any other backend failure (original chained).
        """
        if not isinstance(path, (str, Path)):
            raise TypeError(f"Expected str or Path, got {type(path)}")

        path = str(path)
        backend_instance = self._get_backend(backend)

        try:
            from openhcs.core.utils import natural_sort
            entries = backend_instance.list_dir(path)
            return natural_sort(entries)
        except (FileNotFoundError, NotADirectoryError):
            # Let these bubble up for structural truth-checking
            raise
        except Exception as e:
            raise RuntimeError(f"Unexpected failure in list_dir({path}) for backend {backend}") from e

    def ensure_directory(self, directory: Union[str, Path], backend: str) -> str:
        """
        Ensure a directory exists by delegating to the backend.

        Args:
            directory: Directory to ensure exists (str or Path)
            backend: Registry name of the backend to use

        Returns:
            Whatever the backend's ``ensure_directory`` returns.

        Raises:
            StorageResolutionError: If the backend cannot be resolved.
            Exceptions raised by the backend propagate unchanged.
        """
        backend_instance = self._get_backend(backend)
        return backend_instance.ensure_directory(str(directory))

    def exists(self, path: Union[str, Path], backend: str) -> bool:
        """
        Check whether a path exists according to the backend.

        Args:
            path: Path to check (str or Path)
            backend: Registry name of the backend to use

        Returns:
            Whatever the backend's ``exists`` returns.

        Raises:
            StorageResolutionError: If the backend cannot be resolved.
            Exceptions raised by the backend propagate unchanged.
        """
        backend_instance = self._get_backend(backend)
        return backend_instance.exists(str(path))

    def mirror_directory_with_symlinks(
        self,
        source_dir: Union[str, Path],
        target_dir: Union[str, Path],
        backend: str,
        recursive: bool = True,
        overwrite_symlinks_only: bool = False
    ) -> int:
        """
        Create, under target_dir, a symlink for every file found under source_dir.

        Files are discovered with :meth:`collect_dirs_and_files`; each symlink is
        placed at the same path relative to target_dir that the file has relative
        to source_dir, and is created through :meth:`create_symlink`.

        Args:
            source_dir: Directory whose files are linked (str or Path)
            target_dir: Directory that receives the symlinks (str or Path);
                ensured to exist before linking starts
            backend: Registry name of the backend to use
            recursive: Whether files in subdirectories are included
            overwrite_symlinks_only: Forwarded to create_symlink. If True,
                existing symlinks may be replaced but other existing targets
                are rejected; if False (default) any existing target is rejected.

        Returns:
            int: Number of symlinks created

        Raises:
            StorageResolutionError: If the backend cannot be resolved or any
                step other than the existence policy fails (original chained).
            FileExistsError: If a target exists and may not be overwritten.
        """
        # Outside the try on purpose: failures here propagate unwrapped, as
        # they did for the first ensure_directory call of the original code.
        self.ensure_directory(target_dir, backend)

        try:
            # Honour the caller's `recursive` flag (it used to be ignored).
            _, all_files = self.collect_dirs_and_files(source_dir, backend, recursive=recursive)

            source_root = Path(source_dir)
            target_root = Path(target_dir)
            symlink_count = 0

            for file_path in all_files:
                rel_path = Path(file_path).relative_to(source_root)
                self.create_symlink(
                    file_path,
                    str(target_root / rel_path),
                    backend,
                    overwrite_symlinks_only=overwrite_symlinks_only,
                )
                symlink_count += 1

            return symlink_count
        except FileExistsError:
            # Documented contract; create_symlink raises it deliberately.
            raise
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to mirror directory {source_dir} to {target_dir} with backend {backend}"
            ) from e

    def create_symlink(
        self,
        source_path: Union[str, Path],
        symlink_path: Union[str, Path],
        backend: str,
        overwrite_symlinks_only: bool = False
    ) -> bool:
        """
        Create a symbolic link at symlink_path pointing to source_path.

        Args:
            source_path: Path to the source file or directory (str or Path)
            symlink_path: Path where the symlink should be created (str or Path)
            backend: Registry name of the backend to use
            overwrite_symlinks_only: If True, an existing symlink at
                symlink_path is overwritten; any other existing target is
                rejected. If False, every existing target is rejected.

        Returns:
            bool: True on success (failures raise).

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend call fails (original chained).
            FileExistsError: If the target exists and may not be overwritten,
                or if the backend itself raises FileExistsError.
        """
        backend_instance = self._get_backend(backend)
        source_str = str(source_path)
        symlink_str = str(symlink_path)

        try:
            target_exists = bool(backend_instance.exists(symlink_str))

            if target_exists:
                if not overwrite_symlinks_only:
                    raise FileExistsError(f"Target already exists: {symlink_path}")
                if not self.is_symlink(symlink_path, backend):
                    raise FileExistsError(
                        f"Target exists and is not a symlink (overwrite_symlinks_only=True): {symlink_path}"
                    )

            # Overwrite is requested only when an existing symlink was found.
            backend_instance.create_symlink(source_str, symlink_str, overwrite=target_exists)
            return True
        except FileExistsError:
            # Re-raise FileExistsError from our check or from backend
            raise
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to create symlink from {source_path} to {symlink_path} with backend {backend}"
            ) from e

    def delete(self, path: Union[str, Path], backend: str, recursive: bool = False) -> bool:
        """
        Delete a file or directory.

        Args:
            path: Path to the file or directory to delete (str or Path)
            backend: Registry name of the backend to use
            recursive: If True, delegate to :meth:`delete_all` (this flag used
                to be silently ignored). If False, call the backend's
                ``delete``.

        Returns:
            The backend's ``delete`` result, or True after a recursive delete.

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend call raises; the original exception (e.g. a
                FileNotFoundError) is chained as ``__cause__``.
        """
        if recursive:
            return self.delete_all(path, backend)

        backend_instance = self._get_backend(backend)

        try:
            return backend_instance.delete(str(path))
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to delete {path} with backend {backend}"
            ) from e

    def delete_all(self, path: Union[str, Path], backend: str) -> bool:
        """
        Delete whatever is at the given path via the backend's ``delete_all``.

        No fallback, coercion, or resolution happens here; deletion semantics
        are entirely those of the backend.

        Args:
            path: The path to delete
            backend: Registry name of the backend to use

        Returns:
            True if the backend call returned without raising.

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend call raises; the original exception is chained.
        """
        backend_instance = self._get_backend(backend)
        path_str = str(path)

        try:
            backend_instance.delete_all(path_str)
            return True
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to delete_all({path_str}) using backend '{backend}'"
            ) from e

    def copy(self, source_path: Union[str, Path], dest_path: Union[str, Path], backend: str) -> bool:
        """
        Copy source_path to dest_path using the given backend.

        The destination must not exist. Its parent directory is ensured before
        the backend's ``copy`` is called.

        Args:
            source_path: Path to copy from (str or Path)
            dest_path: Path to copy to (str or Path)
            backend: Registry name of the backend to use

        Returns:
            Whatever the backend's ``copy`` returns.

        Raises:
            FileExistsError: If destination exists
            FileNotFoundError: If raised by the backend (e.g. missing source)
            StorageResolutionError: If the backend cannot be resolved or any
                other failure occurs (original chained).
        """
        backend_instance = self._get_backend(backend)
        source_str = str(source_path)
        dest_str = str(dest_path)

        try:
            # Prevent overwriting
            if backend_instance.exists(dest_str):
                raise FileExistsError(f"Destination already exists: {dest_path}")

            self.ensure_directory(Path(dest_str).parent, backend)
            return backend_instance.copy(source_str, dest_str)
        except (FileExistsError, FileNotFoundError):
            # Part of the documented contract; previously these were wrapped
            # by the generic handler below and never reached callers.
            raise
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to copy from {source_path} to {dest_path} on backend {backend}"
            ) from e

    def move(self, source_path: Union[str, Path], dest_path: Union[str, Path], backend: str,
             replace_symlinks: bool = False) -> bool:
        """
        Move source_path to dest_path using the given backend.

        By default an existing destination is rejected. With
        ``replace_symlinks=True`` an existing destination that the backend
        reports as a symlink is deleted first; any other existing destination
        is still rejected. The destination's parent directory is ensured
        before the backend's ``move`` is called.

        Args:
            source_path: Source file or directory path
            dest_path: Destination file or directory path
            backend: Registry name of the backend to use
            replace_symlinks: Allow replacing an existing symlink at dest_path

        Returns:
            Whatever the backend's ``move`` returns.

        Raises:
            FileExistsError: If destination exists and replace_symlinks=False, or if
                destination exists and is not a symlink when replace_symlinks=True
            FileNotFoundError: If raised by the backend (e.g. missing source)
            StorageResolutionError: If the backend cannot be resolved or any
                other failure occurs (original chained).
        """
        backend_instance = self._get_backend(backend)
        source_str = str(source_path)
        dest_str = str(dest_path)

        try:
            if backend_instance.exists(dest_str):
                if not replace_symlinks:
                    raise FileExistsError(f"Destination already exists: {dest_path}")
                if not backend_instance.is_symlink(dest_str):
                    raise FileExistsError(f"Destination already exists and is not a symlink: {dest_path}")
                logger.debug("Destination is a symlink, removing before move: %s", dest_path)
                backend_instance.delete(dest_str)

            self.ensure_directory(Path(dest_str).parent, backend)
            return backend_instance.move(source_str, dest_str)
        except (FileExistsError, FileNotFoundError):
            # Part of the documented contract; previously these were wrapped
            # by the generic handler below and never reached callers.
            raise
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to move from {source_path} to {dest_path} on backend {backend}"
            ) from e

    def collect_dirs_and_files(
        self,
        base_dir: Union[str, Path],
        backend: str,
        recursive: bool = True
    ) -> Tuple[List[str], List[str]]:
        """
        Collect directories and files starting from base_dir, breadth-first.

        A path is classified by calling :meth:`list_dir` on it: success means
        directory; NotADirectoryError or FileNotFoundError means file; any
        other exception is logged and the path is skipped. Each path is
        listed once and appears at most once in the result.

        Args:
            base_dir: Starting path (str or Path)
            backend: Registry name of the backend to use
            recursive: If True, descend into every subdirectory. If False,
                only the direct entries of base_dir are classified.

        Returns:
            (dirs, files): Naturally sorted lists of string paths. ``dirs``
            includes base_dir itself when it is a directory.
        """
        from collections import deque
        from openhcs.core.utils import natural_sort

        dirs: List[str] = []
        files: List[str] = []

        # Each item is (path, expand): `expand` says whether the path's entries
        # should be queued. The base is always expanded; deeper directories
        # only when `recursive` is set. FIFO order gives breadth-first traversal.
        queue = deque([(str(base_dir), True)])

        while queue:
            current_path, expand = queue.popleft()

            try:
                entries = self.list_dir(current_path, backend)
            except (NotADirectoryError, FileNotFoundError):
                files.append(current_path)
                continue
            except Exception as e:
                # Fail-safe: skip unexpected issues rather than abort the walk.
                logger.warning(
                    "[collect_dirs_and_files] Skipping %s: %s: %s",
                    current_path, type(e).__name__, e,
                )
                continue

            dirs.append(current_path)

            if not expand:
                continue

            if entries is None:
                # Defensive fallback: entries must be iterable
                logger.warning(
                    "[collect_dirs_and_files] WARNING: list_dir() returned None at %s",
                    current_path,
                )
                continue

            for entry in entries:
                queue.append((str(Path(current_path) / entry), recursive))

        return natural_sort(dirs), natural_sort(files)

    def is_file(self, path: Union[str, Path], backend: str) -> bool:
        """
        Check if a given path is a file using the specified backend.

        Args:
            path: Path to check (str or Path)
            backend: Registry name of the backend to use

        Returns:
            The backend's ``is_file`` result; False if resolving the backend
            or the backend call raises anything.
        """
        try:
            backend_instance = self._get_backend(backend)
            return backend_instance.is_file(str(path))
        except Exception:
            # Deliberate best-effort: any error is reported as "not a file".
            return False

    def is_dir(self, path: Union[str, Path], backend: str) -> bool:
        """
        Check if a given path is a directory using the specified backend.

        Args:
            path: Path to check (str or Path)
            backend: Registry name of the backend to use

        Returns:
            The backend's ``is_dir`` result; False if the backend raises
            FileNotFoundError or NotADirectoryError.

        Raises:
            StorageResolutionError: If the backend cannot be resolved or the
                backend raises any other exception (original chained).
        """
        try:
            backend_instance = self._get_backend(backend)
            return backend_instance.is_dir(str(path))
        except (FileNotFoundError, NotADirectoryError):
            # Return False for files or non-existent paths instead of raising
            return False
        except Exception as e:
            raise StorageResolutionError(
                f"Failed to check if {path} is a directory with backend '{backend}'"
            ) from e

    def is_symlink(self, path: Union[str, Path], backend: str) -> bool:
        """
        Check if a given path is a symbolic link using the specified backend.

        Args:
            path: Path to check (str or Path)
            backend: Registry name of the backend to use

        Returns:
            The backend's ``is_symlink`` result; False if resolving the
            backend or the backend call raises anything.
        """
        try:
            backend_instance = self._get_backend(backend)
            return backend_instance.is_symlink(str(path))
        except Exception:
            # Deliberate best-effort: any error is reported as "not a symlink".
            return False

763