Coverage for src/polystore/disk.py: 69%

362 statements  


# polystore/disk.py
"""
Disk-based storage backend implementation.

This module provides a concrete implementation of the storage backend interfaces
for local disk storage.
"""

import logging
import os
import shutil
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union

import numpy as np

from .formats import FileFormat
# StorageResolutionError is raised throughout this module; it is assumed here
# to be defined alongside StorageBackend in .base (the original listing never
# imported it, which would make every `raise StorageResolutionError` a NameError)
from .base import StorageBackend, StorageResolutionError
from .lazy_imports import get_torch, get_jax, get_jnp, get_cupy, get_tf

logger = logging.getLogger(__name__)


def optional_import(module_name):
    """Import a module by name, returning None if it is not installed."""
    try:
        return __import__(module_name)
    except ImportError:
        return None


# Optional dependencies at module level (not instance level to avoid pickle issues)
# Skip GPU libraries if running in no-GPU mode
if os.getenv('POLYSTORE_NO_GPU') == '1':
    torch = None
    jax = None
    jnp = None
    cupy = None
    tf = None
    logger.info("No-GPU mode - skipping GPU library imports in disk backend")
else:
    torch = get_torch()
    jax = get_jax()
    jnp = get_jnp()
    cupy = get_cupy()
    tf = get_tf()

tifffile = optional_import("tifffile")
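
# Illustrative sketch (a deployment assumption, not part of this module): the
# no-GPU switch must be set before the first import, because the branch above
# runs at import time.
#
#     $ POLYSTORE_NO_GPU=1 python -c "import polystore.disk"
#
# With the variable set, torch/jax/jnp/cupy/tf are all None, so the GPU-backed
# entries in DiskBackend._register_formats() below are skipped and only the
# NumPy, TIFF, text, JSON, CSV, and ROI handlers get registered.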

class FileFormatRegistry:
    """Case-insensitive mapping from file extensions to writer/reader callables."""

    def __init__(self):
        self._writers: Dict[str, Callable[[Path, Any], None]] = {}
        self._readers: Dict[str, Callable[[Path], Any]] = {}

    def register(self, ext: str, writer: Callable, reader: Callable):
        ext = ext.lower()
        self._writers[ext] = writer
        self._readers[ext] = reader

    def get_writer(self, ext: str) -> Callable:
        return self._writers[ext.lower()]

    def get_reader(self, ext: str) -> Callable:
        return self._readers[ext.lower()]

    def is_registered(self, ext: str) -> bool:
        return ext.lower() in self._writers and ext.lower() in self._readers
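
# A minimal sketch of the registry contract (illustrative only; the paths and
# data are made up). Writers are called as writer(path, data) and readers as
# reader(path), which is the convention DiskBackend.load/save rely on below:
#
#     registry = FileFormatRegistry()
#     registry.register(".npy", np.save, np.load)
#     assert registry.is_registered(".NPY")          # lookups are lowercased
#     registry.get_writer(".npy")(Path("/tmp/a.npy"), np.zeros(3))
#     arr = registry.get_reader(".npy")(Path("/tmp/a.npy"))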

class DiskBackend(StorageBackend):
    """Disk storage backend with automatic registration."""

    _backend_type = "disk"

    def __init__(self):
        self.format_registry = FileFormatRegistry()
        self._register_formats()

    def _register_formats(self):
        """
        Register all file format handlers.

        Uses enum-driven registration to eliminate boilerplate.
        Complex formats (CSV, JSON, TIFF, ROI.ZIP, TEXT) use custom handlers.
        Simple formats (NumPy, Torch, CuPy, JAX, TensorFlow) use library
        save/load directly or thin wrappers around them.
        """
        # Format handler metadata: (FileFormat enum, module_check, writer, reader)
        # A falsy module_check or a None writer/reader means the format's
        # library is unavailable, so the format is skipped entirely
        format_handlers = [
            # Simple formats - use library save/load directly
            (FileFormat.NUMPY, True, np.save, np.load),
            # torch.save takes (obj, file) while the registry convention is
            # writer(path, data), so a wrapper swaps the arguments
            (FileFormat.TORCH, torch, self._torch_writer, torch.load if torch else None),
            (FileFormat.JAX, (jax and jnp), self._jax_writer, self._jax_reader),
            (FileFormat.CUPY, cupy, self._cupy_writer, self._cupy_reader),
            (FileFormat.TENSORFLOW, tf, self._tensorflow_writer, self._tensorflow_reader),

            # Complex formats - use custom handlers
            (FileFormat.TIFF, tifffile, self._tiff_writer, self._tiff_reader),
            (FileFormat.TEXT, True, self._text_writer, self._text_reader),
            (FileFormat.JSON, True, self._json_writer, self._json_reader),
            (FileFormat.CSV, True, self._csv_writer, self._csv_reader),
            (FileFormat.ROI, True, self._roi_zip_writer, self._roi_zip_reader),
        ]

        # Register all available formats
        for file_format, module_available, writer, reader in format_handlers:
            if not module_available or writer is None or reader is None:
                continue

            # Register all extensions for this format
            for ext in file_format.extensions:
                self.format_registry.register(ext.lower(), writer, reader)

    # Format-specific writer/reader functions (pickleable)
    # Only needed for formats that require special handling beyond library save/load
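
    # For instance (a sketch; the exact extension strings live on the
    # FileFormat enum, which is not shown in this module):
    #
    #     backend = DiskBackend()
    #     backend.format_registry.is_registered(".npy")  # True, assuming ".npy"
    #                                                    # is in FileFormat.NUMPY.extensions
    #     # GPU formats appear only when their libraries were importable above.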

    def _torch_writer(self, path, data, **kwargs):
        """Wrapper fixing argument order: torch.save expects (obj, file)."""
        torch.save(data, path)

    def _jax_writer(self, path, data, **kwargs):
        """JAX arrays must be moved to CPU before saving."""
        np.save(path, jax.device_get(data))

    def _jax_reader(self, path):
        """Load a NumPy array and convert it to JAX."""
        return jnp.array(np.load(path))

    def _cupy_writer(self, path, data, **kwargs):
        """CuPy has its own save format."""
        cupy.save(path, data)

    def _cupy_reader(self, path):
        """Load CuPy array from disk."""
        return cupy.load(path)

    def _tensorflow_writer(self, path, data, **kwargs):
        """TensorFlow uses tensor serialization."""
        tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data))

    def _tensorflow_reader(self, path):
        """Load and deserialize a TensorFlow tensor.

        Note: the dtype is hardcoded, so this round-trips float32 tensors only.
        """
        return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)

    def _tiff_writer(self, path, data, **kwargs):
        tifffile.imwrite(path, data)

    def _tiff_reader(self, path):
        # For symlinks, try multiple approaches to handle filesystem issues
        path_obj = Path(path)

        if path_obj.is_symlink():
            # First try reading the symlink directly (let the OS handle it)
            try:
                return tifffile.imread(str(path))
            except FileNotFoundError:
                # If that fails, try the target path
                try:
                    target_path = path_obj.readlink()
                    return tifffile.imread(str(target_path))
                except FileNotFoundError:
                    # If the target doesn't exist, try resolving the symlink
                    resolved_path = path_obj.resolve()
                    return tifffile.imread(str(resolved_path))
        else:
            return tifffile.imread(str(path))

    def _text_writer(self, path, data, **kwargs):
        """Write text data to file. Accepts and ignores extra kwargs for compatibility."""
        path.write_text(str(data))

    def _text_reader(self, path):
        return path.read_text()

    def _json_writer(self, path, data, **kwargs):
        import json
        # Ensure parent directory exists
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2))

    def _json_reader(self, path):
        import json
        return json.loads(path.read_text())

    def _csv_writer(self, path, data, **kwargs):
        import csv
        # Assume data is a list of rows or a dict
        with path.open('w', newline='') as f:
            if isinstance(data, dict):
                # Write dict as CSV with headers
                writer = csv.DictWriter(f, fieldnames=data.keys())
                writer.writeheader()
                writer.writerow(data)
            elif isinstance(data, list) and len(data) > 0:
                if isinstance(data[0], dict):
                    # List of dicts
                    writer = csv.DictWriter(f, fieldnames=data[0].keys())
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    # List of lists/tuples
                    writer = csv.writer(f)
                    writer.writerows(data)
            else:
                # Fallback: write as single row
                writer = csv.writer(f)
                writer.writerow([data])

    def _roi_zip_writer(self, path, data, **kwargs):
        """Write ROIs to a .roi.zip archive. Thin wrapper around _save_rois."""
        # data should be a list of ROI objects
        self._save_rois(data, path, **kwargs)

    def _roi_zip_reader(self, path, **kwargs):
        """Read ROIs from a .roi.zip archive."""
        try:
            from openhcs.core.roi import load_rois_from_zip
            return load_rois_from_zip(path)
        except ImportError:
            raise ImportError("ROI support requires the openhcs package. Install with: pip install openhcs")

    def _csv_reader(self, path):
        import csv
        with path.open('r', newline='') as f:
            reader = csv.DictReader(f)
            return list(reader)

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from disk, dispatching on the file extension.

        Args:
            file_path: Path to the file to load
            **kwargs: Additional arguments forwarded to the format's reader

        Returns:
            The loaded data

        Raises:
            TypeError: If file_path is not a valid path type
            FileNotFoundError: If the file does not exist
            ValueError: If no reader is registered for the extension or the
                file cannot be loaded
        """
        disk_path = Path(file_path)

        # Handle double extensions (e.g., .roi.zip, .csv.zip)
        # Check if the file has a double extension by looking at suffixes
        ext = None
        if len(disk_path.suffixes) >= 2:
            # Try double extension first (e.g., '.roi.zip')
            double_ext = ''.join(disk_path.suffixes[-2:]).lower()
            if self.format_registry.is_registered(double_ext):
                ext = double_ext

        # Fall back to single extension if double extension not registered
        if ext is None:
            ext = disk_path.suffix.lower()

        if not self.format_registry.is_registered(ext):
            raise ValueError(f"No reader registered for extension '{ext}'")

        try:
            reader = self.format_registry.get_reader(ext)
            return reader(disk_path, **kwargs)
        except Exception as e:
            raise ValueError(f"Error loading data from {disk_path}: {e}") from e
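
    # Extension dispatch in practice (a sketch with made-up paths; assumes
    # tifffile is installed and that FileFormat.ROI declares ".roi.zip"):
    #
    #     backend.load("/data/img.tif")        # -> _tiff_reader
    #     backend.load("/data/cells.roi.zip")  # double suffix wins: _roi_zip_reader
    #     backend.load("/data/cells.tar.gz")   # falls back to ".gz" -> ValueError,
    #                                          # since nothing registers ".tar.gz" or ".gz"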

    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """
        Save data to disk, dispatching on the output path's extension.

        Args:
            data: The data to save
            output_path: Path where the data should be saved
            **kwargs: Additional arguments forwarded to the format's writer

        Raises:
            TypeError: If output_path is not a valid path type
            ValueError: If no writer is registered for the extension or the
                data cannot be saved
        """
        disk_output_path = Path(output_path)

        # Explicit type dispatch for ROI data (if openhcs is available)
        try:
            from openhcs.core.roi import ROI
            if isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI):
                # ROI data - save as an ImageJ .roi.zip archive
                images_dir = kwargs.pop('images_dir', None)
                self._save_rois(data, disk_output_path, images_dir=images_dir, **kwargs)
                return
        except ImportError:
            pass  # OpenHCS not available, skip ROI check

        ext = disk_output_path.suffix.lower()
        if not self.format_registry.is_registered(ext):
            raise ValueError(f"No writer registered for extension '{ext}'")

        try:
            writer = self.format_registry.get_writer(ext)
            return writer(disk_output_path, data, **kwargs)
        except Exception as e:
            raise ValueError(f"Error saving data to {disk_output_path}: {e}") from e
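
    # Round trip through the public API (a sketch; only numpy is assumed):
    #
    #     backend = DiskBackend()
    #     backend.save(np.arange(6).reshape(2, 3), "/tmp/demo.npy")
    #     arr = backend.load("/tmp/demo.npy")
    #
    # Note the asymmetry with load(): save() only inspects the final suffix,
    # so ROI lists are routed by the isinstance check above rather than by a
    # ".roi.zip" double-extension lookup.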

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files sequentially using the existing load method.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments passed to the load method

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        results = []
        for file_path in file_paths:
            result = self.load(file_path, **kwargs)
            results.append(result)
        return results

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Save multiple files sequentially using the existing save method.

        Any GPU-to-CPU conversion is handled by the individual format writers,
        not by this method.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)
            **kwargs: Additional arguments passed to the save method

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")

        # Save each data object using the existing save method
        for data, output_path in zip(data_list, output_paths):
            self.save(data, output_path, **kwargs)
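
    # Batch usage (a sketch; paths are made up). kwargs fan out to every call,
    # so they must be valid for all formats in the batch:
    #
    #     stacks = [np.zeros((8, 8)), np.ones((8, 8))]
    #     backend.save_batch(stacks, ["/tmp/a.npy", "/tmp/b.npy"])
    #     round_tripped = backend.load_batch(["/tmp/a.npy", "/tmp/b.npy"])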

    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str, Path]]:
        """
        List files on disk, optionally filtering by pattern and extensions.

        Args:
            directory: Directory to search.
            pattern: Optional glob pattern to match filenames.
            extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}).
                Extensions should include the dot and are case-insensitive.
            recursive: Whether to search recursively.

        Returns:
            List of paths to matching files.

        Raises:
            TypeError: If directory is not a valid path type
            ValueError: If the path is not a directory
        """
        disk_directory = Path(directory)

        if not disk_directory.is_dir():
            raise ValueError(f"Path is not a directory: {disk_directory}")

        # Use the appropriate search strategy based on recursion
        if recursive:
            # Use breadth-first traversal to prioritize shallower files
            files = self._list_files_breadth_first(disk_directory, pattern)
        else:
            glob_pattern = pattern if pattern else "*"
            # Include both regular files and symlinks (even broken ones)
            files = [p for p in disk_directory.glob(glob_pattern) if p.is_file() or p.is_symlink()]

        # Filter out macOS metadata files (._* files) that interfere with parsing
        files = [f for f in files if not f.name.startswith('._')]

        # Filter by extensions if provided
        if extensions:
            # Convert extensions to lowercase for case-insensitive comparison
            lowercase_extensions = {ext.lower() for ext in extensions}
            files = [f for f in files if f.suffix.lower() in lowercase_extensions]

        # Return paths as strings
        return [str(f) for f in files]
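
    # Typical call (a sketch; the directory layout is made up):
    #
    #     tiffs = backend.list_files("/data/plate_001", pattern="*.tif",
    #                                extensions={".TIF", ".tiff"}, recursive=True)
    #
    # Extension filtering only looks at the final suffix, so ".roi.zip" files
    # would have to be matched via extensions={".zip"} or a glob pattern.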

    def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]:
        """
        List files using breadth-first traversal to prioritize shallower files.

        This ensures that files in the root directory are found before files
        in subdirectories, which is important for metadata detection.

        Args:
            directory: Root directory to search
            pattern: Optional glob pattern to match filenames

        Returns:
            List of file paths sorted by depth (shallower first)
        """
        from collections import deque

        files = []
        # Use a deque for breadth-first traversal
        dirs_to_search = deque([(directory, 0)])  # (path, depth)

        while dirs_to_search:
            current_dir, depth = dirs_to_search.popleft()

            try:
                # Get all entries in the current directory
                for entry in current_dir.iterdir():
                    if entry.is_file():
                        # Filter out macOS metadata files (._* files) that interfere with parsing
                        if entry.name.startswith('._'):
                            continue
                        # Check if the file matches the pattern
                        if pattern is None or entry.match(pattern):
                            files.append((entry, depth))
                    elif entry.is_dir():
                        # Add subdirectory to the queue for later processing
                        dirs_to_search.append((entry, depth + 1))
            except (PermissionError, OSError):
                # Skip directories we can't read
                continue

        # Sort by depth first, then by path for consistent ordering
        files.sort(key=lambda x: (x[1], str(x[0])))

        # Return just the paths
        return [file_path for file_path, _ in files]
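
    # Ordering guarantee (a sketch over a made-up tree):
    #
    #     /root/metadata.json       depth 0   <- returned first
    #     /root/a/img_001.tif       depth 1
    #     /root/a/b/img_002.tif     depth 2   <- returned last
    #
    # Within a depth level, ties break lexicographically by path string.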

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """List the entry names (not full paths) of a directory."""
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")
        if not path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")
        return [entry.name for entry in path.iterdir()]

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a file or empty directory at the given disk path.

        Args:
            path: Path to delete

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a directory and not empty
            StorageResolutionError: If deletion fails for unknown reasons
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Cannot delete: path does not exist: {path}")

        try:
            if path.is_dir():
                # Do not allow recursive deletion
                path.rmdir()  # will raise OSError if directory is not empty
            else:
                path.unlink()
        except IsADirectoryError:
            raise
        except OSError as e:
            raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete {path}") from e

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a file or directory and all its contents from disk.

        Args:
            path: Filesystem path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails for any reason
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        try:
            if path.is_file():
                path.unlink()
            else:
                # Safe, recursive removal of directories
                # (shutil is imported at module level)
                shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete: {path}") from e

    def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]:
        """
        Ensure a directory exists on disk.

        Args:
            directory: Path to the directory to ensure exists

        Returns:
            Path to the directory

        Raises:
            TypeError: If directory is not a valid path type
            ValueError: If there is an error creating the directory
        """
        # 🔒 Clause 17 — VFS Boundary Enforcement
        try:
            disk_directory = Path(directory)
            disk_directory.mkdir(parents=True, exist_ok=True)
            return directory
        except OSError as e:
            # 🔒 Clause 65 — No Fallback Logic
            # Propagate the error with additional context
            raise ValueError(f"Error creating directory {disk_directory}: {e}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        return Path(path).exists()

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        source = Path(source).resolve()
        link_name = Path(link_name)  # Don't resolve link_name - we want the actual symlink path

        if not source.exists():
            raise FileNotFoundError(f"Source path does not exist: {source}")

        # Check if target exists and handle overwrite policy
        if link_name.exists() or link_name.is_symlink():
            if not overwrite:
                raise FileExistsError(f"Target already exists: {link_name}")
            link_name.unlink()  # Remove existing file/symlink only if overwrite=True

        link_name.parent.mkdir(parents=True, exist_ok=True)
        # On Windows, symlink_to() requires target_is_directory to be set correctly
        # On Unix, this parameter is ignored, so it's safe to always specify it
        link_name.symlink_to(source, target_is_directory=source.is_dir())
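
    # Overwrite semantics (a sketch; paths are made up):
    #
    #     backend.create_symlink("/data/plate_001", "/links/current")
    #     backend.create_symlink("/data/plate_002", "/links/current")  # FileExistsError
    #     backend.create_symlink("/data/plate_002", "/links/current", overwrite=True)
    #
    # The is_symlink() check matters because exists() is False for a broken
    # symlink, which should still be replaced.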

    def is_symlink(self, path: Union[str, Path]) -> bool:
        return Path(path).is_symlink()

    def is_file(self, path: Union[str, Path]) -> bool:
        """Check if a disk path resolves to a regular file (symlinks followed)."""
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        # Resolve symlinks and return True only if the final target is a file
        resolved = path.resolve(strict=True)

        if resolved.is_dir():
            raise IsADirectoryError(f"Path is a directory: {path}")

        return resolved.is_file()

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a given disk path is a directory.

        Follows filesystem symlinks to determine the actual resolved structure.

        Args:
            path: Filesystem path (absolute or relative)

        Returns:
            bool: True if path resolves to a directory

        Raises:
            FileNotFoundError: If the path or symlink target does not exist
            NotADirectoryError: If the resolved target is not a directory
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        # Follow symlinks to the final real target
        resolved = path.resolve(strict=True)

        if not resolved.is_dir():
            raise NotADirectoryError(f"Path is not a directory: {path}")

        return True

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory on disk. Follows symlinks and performs an overwrite-safe move.

        Raises:
            FileNotFoundError: If source does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On failure to move
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source path does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            shutil.move(str(src), str(dst))
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src} to {dst}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a disk-backed path.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': resolved target if symlink
                - 'exists': bool

        Raises:
            StorageResolutionError: On access or resolution failure
        """
        path_str = str(path)
        try:
            if not os.path.lexists(path_str):  # includes broken symlinks
                return {
                    "type": "missing",
                    "path": path_str,
                    "exists": False
                }

            if os.path.islink(path_str):
                try:
                    resolved = os.readlink(path_str)
                    target_exists = os.path.exists(path_str)
                except OSError as e:
                    raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e

                return {
                    "type": "symlink",
                    "path": path_str,
                    "target": resolved,
                    "exists": target_exists
                }

            if os.path.isdir(path_str):
                return {
                    "type": "directory",
                    "path": path_str,
                    "exists": True
                }

            if os.path.isfile(path_str):
                return {
                    "type": "file",
                    "path": path_str,
                    "exists": True
                }

            raise StorageResolutionError(f"Unknown filesystem object at: {path_str}")

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat disk path: {path}") from e
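
    # Shape of the returned metadata (a sketch; the path is made up):
    #
    #     backend.stat("/links/current")
    #     # {'type': 'symlink', 'path': '/links/current',
    #     #  'target': '/data/plate_002', 'exists': True}
    #
    # 'exists' reports whether the symlink *target* exists, so a broken link
    # yields type 'symlink' with exists=False rather than type 'missing'.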

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file or directory to a new location.

        - Does not overwrite the destination.
        - Will raise if the destination exists.
        - Supports file-to-file and dir-to-dir copies.

        Raises:
            FileExistsError: If destination already exists
            FileNotFoundError: If source is missing
            StorageResolutionError: On structural failure
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            if src.is_dir():
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e

    def _save_rois(self, rois: List, output_path: Path, images_dir: str = None, **kwargs) -> str:
        """Save ROIs as a .roi.zip archive (ImageJ standard format).

        Args:
            rois: List of ROI objects
            output_path: Output path (e.g., /disk/plate_001/step_7_results/A01_rois_step7.roi.zip)
            images_dir: Images directory path (unused for disk backend)

        Returns:
            Path where ROIs were saved
        """
        import zipfile

        try:
            from openhcs.core.roi import PolygonShape, MaskShape, PointShape, EllipseShape
        except ImportError:
            raise ImportError("ROI support requires the openhcs package")

        output_path = Path(output_path)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Ensure output path has the .roi.zip extension
        if not output_path.name.endswith('.roi.zip'):
            output_path = output_path.with_suffix('.roi.zip')

        try:
            from roifile import ImagejRoi
        except ImportError:
            logger.error("roifile library not available - cannot save ROIs")
            raise ImportError("roifile library required for ROI saving. Install with: pip install roifile")

        # Create the .roi.zip archive; each supported shape becomes one ImageJ ROI
        roi_count = 0
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            for idx, roi in enumerate(rois):
                for shape in roi.shapes:
                    if isinstance(shape, PolygonShape):
                        # roifile expects (x, y) coordinates, but we have (y, x)
                        coords_xy = shape.coordinates[:, [1, 0]]  # Swap columns
                    elif isinstance(shape, PointShape):
                        coords_xy = np.array([[shape.x, shape.y]])
                    elif isinstance(shape, EllipseShape):
                        # Approximate the ellipse with a 64-point polygon
                        # (a limitation of the point-based ImageJ ROI export used here)
                        theta = np.linspace(0, 2 * np.pi, 64)
                        x = shape.center_x + shape.radius_x * np.cos(theta)
                        y = shape.center_y + shape.radius_y * np.sin(theta)
                        coords_xy = np.column_stack([x, y])
                    elif isinstance(shape, MaskShape):
                        # Skip mask shapes - the ImageJ .roi format doesn't support binary masks
                        logger.warning(f"Skipping mask shape for ROI {idx} - not supported in ImageJ .roi format")
                        continue
                    else:
                        # Unknown shape types are skipped, as before
                        continue

                    ij_roi = ImagejRoi.frompoints(coords_xy)
                    # Use an incrementing counter for unique names (label values may repeat)
                    ij_roi.name = f"ROI_{roi_count + 1}"
                    zf.writestr(f"{roi_count + 1:04d}.roi", ij_roi.tobytes())
                    roi_count += 1

        logger.info(f"Saved {roi_count} ROIs to .roi.zip archive: {output_path}")
        return str(output_path)
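

# A minimal smoke test (an illustrative sketch, not part of the library; it
# assumes only numpy and a writable temp directory, and exercises the same
# save/load/list/stat paths documented above):
if __name__ == "__main__":
    import tempfile

    backend = DiskBackend()
    with tempfile.TemporaryDirectory() as tmp:
        npy_path = os.path.join(tmp, "demo.npy")
        backend.save(np.arange(12).reshape(3, 4), npy_path)

        arr = backend.load(npy_path)
        assert arr.shape == (3, 4)

        assert backend.list_files(tmp, extensions={".npy"}) == [npy_path]
        assert backend.stat(npy_path)["type"] == "file"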