Coverage for openhcs/io/disk.py: 51.6%

357 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1# openhcs/io/storage/backends/disk.py 

2""" 

3Disk-based storage backend implementation. 

4 

5This module provides a concrete implementation of the storage backend interfaces 

6for local disk storage. It strictly enforces VFS boundaries and doctrinal clauses. 

7""" 

8 

9import logging 

10import os 

11import shutil 

12from pathlib import Path 

13from typing import Any, Callable, Dict, List, Optional, Set, Union 

14 

15import numpy as np 

16 

17from openhcs.constants.constants import FileFormat, Backend 

18from openhcs.io.base import StorageBackend  # NOTE: StorageResolutionError, raised in several methods below, is never imported in this module

19 

20logger = logging.getLogger(__name__) 

21 

22 

23def optional_import(module_name): 

24 try: 

25 return __import__(module_name) 

26 except ImportError: 

27 return None 
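
# Sketch (editorial, unnumbered; not part of disk.py): optional_import degrades
# to None instead of raising, so the handler table below can be built without
# hard dependencies. Module names are illustrative only.
#   tifffile_mod = optional_import("tifffile")   # module object if installed
#   missing = optional_import("no_such_module")  # None, no ImportError
#   if tifffile_mod is None:
#       print("tifffile not available; TIFF handlers will be skipped")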

28 

29# Optional dependencies at module level (not instance level to avoid pickle issues) 

30# Skip GPU libraries in subprocess runner mode 

31if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':  # branch 31 ↛ 32 never taken (condition never true)

32 torch = None 

33 jax = None 

34 jnp = None 

35 cupy = None 

36 tf = None 

37 logger.info("Subprocess runner mode - skipping GPU library imports in disk backend") 

38else: 

39 from openhcs.core.lazy_gpu_imports import torch, jax, jnp, cupy, tf 

40tifffile = optional_import("tifffile") 

41 

42class FileFormatRegistry: 

43 def __init__(self): 

44 self._writers: Dict[str, Callable[[Path, Any], None]] = {} 

45 self._readers: Dict[str, Callable[[Path], Any]] = {} 

46 

47 def register(self, ext: str, writer: Callable, reader: Callable): 

48 ext = ext.lower() 

49 self._writers[ext] = writer 

50 self._readers[ext] = reader 

51 

52 def get_writer(self, ext: str) -> Callable: 

53 return self._writers[ext.lower()] 

54 

55 def get_reader(self, ext: str) -> Callable: 

56 return self._readers[ext.lower()] 

57 

58 def is_registered(self, ext: str) -> bool: 

59 return ext.lower() in self._writers and ext.lower() in self._readers 
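
# Sketch (editorial, not part of disk.py): round-tripping a handler pair through
# FileFormatRegistry; the lambda writer/reader is illustrative only.
#   from pathlib import Path
#   reg = FileFormatRegistry()
#   reg.register(".TXT", lambda p, d: p.write_text(str(d)), lambda p: p.read_text())
#   assert reg.is_registered(".txt")            # keys are lowercased on both ends
#   reg.get_writer(".txt")(Path("a.txt"), "hi")
#   assert reg.get_reader(".txt")(Path("a.txt")) == "hi"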

60 

61 

62class DiskStorageBackend(StorageBackend): 

63 """Disk storage backend with automatic registration.""" 

64 _backend_type = Backend.DISK.value 

65 def __init__(self): 

66 self.format_registry = FileFormatRegistry() 

67 self._register_formats() 

68 

69 def _register_formats(self): 

70 """ 

71 Register all file format handlers. 

72 

73 Uses enum-driven registration to eliminate boilerplate. 

74 Complex formats (CSV, JSON, TIFF, ROI.ZIP, TEXT) use custom handlers. 

75 Simple formats (NumPy, Torch) use library save/load directly; JAX, CuPy and TensorFlow use thin wrapper methods. 

76 """ 

77 # Format handler metadata: (FileFormat enum, module_check, writer, reader) 

78 # A writer/reader of None means the backing library is unavailable; that format is skipped 

79 format_handlers = [ 

80 # Simple formats - use library save/load directly 

81 (FileFormat.NUMPY, True, np.save, np.load), 

82 (FileFormat.TORCH, torch, torch.save if torch else None, torch.load if torch else None), 

83 (FileFormat.JAX, (jax and jnp), self._jax_writer, self._jax_reader), 

84 (FileFormat.CUPY, cupy, self._cupy_writer, self._cupy_reader), 

85 (FileFormat.TENSORFLOW, tf, self._tensorflow_writer, self._tensorflow_reader), 

86 

87 # Complex formats - use custom handlers 

88 (FileFormat.TIFF, tifffile, self._tiff_writer, self._tiff_reader), 

89 (FileFormat.TEXT, True, self._text_writer, self._text_reader), 

90 (FileFormat.JSON, True, self._json_writer, self._json_reader), 

91 (FileFormat.CSV, True, self._csv_writer, self._csv_reader), 

92 (FileFormat.ROI, True, self._roi_zip_writer, self._roi_zip_reader), 

93 ] 

94 

95 # Register all available formats 

96 for file_format, module_available, writer, reader in format_handlers: 

97 if not module_available or writer is None or reader is None: 

98 continue 

99 

100 # Register all extensions for this format 

101 for ext in file_format.value: 

102 self.format_registry.register(ext.lower(), writer, reader) 
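
# Sketch (editorial, not part of disk.py): the loop above relies on each
# FileFormat member's value being an iterable of extensions, e.g. a
# hypothetical FileFormat.TIFF.value == ('.tif', '.tiff'), so one handler
# pair covers every spelling of a format:
#   for ext in ('.tif', '.tiff'):
#       registry.register(ext, tiff_writer, tiff_reader)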

103 

104 # Format-specific writer/reader functions (pickleable) 

105 # Only needed for formats that require special handling beyond library save/load 

106 

107 def _jax_writer(self, path, data, **kwargs): 

108 """JAX arrays must be moved to CPU before saving.""" 

109 np.save(path, jax.device_get(data)) 

110 

111 def _jax_reader(self, path): 

112 """Load NumPy array and convert to JAX.""" 

113 return jnp.array(np.load(path)) 

114 

115 def _cupy_writer(self, path, data, **kwargs): 

116 """CuPy has its own save format.""" 

117 cupy.save(path, data) 

118 

119 def _cupy_reader(self, path): 

120 """Load CuPy array from disk.""" 

121 return cupy.load(path) 

122 

123 def _tensorflow_writer(self, path, data, **kwargs): 

124 """TensorFlow uses tensor serialization.""" 

125 tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data)) 

126 

127 def _tensorflow_reader(self, path): 

128 """Load and deserialize TensorFlow tensor.""" 

129 return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)  # NOTE: assumes serialized tensors are float32 

130 

131 def _tiff_writer(self, path, data, **kwargs): 

132 tifffile.imwrite(path, data) 

133 

134 def _tiff_reader(self, path): 

135 # For symlinks, try multiple approaches to handle filesystem issues 

136 path_obj = Path(path) 

137 

138 if path_obj.is_symlink():  # branch 138 ↛ 140 never taken (condition never true)

139 # First try reading the symlink directly (let OS handle it) 

140 try: 

141 return tifffile.imread(str(path)) 

142 except FileNotFoundError: 

143 # If that fails, try the target path 

144 try: 

145 target_path = path_obj.readlink() 

146 return tifffile.imread(str(target_path)) 

147 except FileNotFoundError: 

148 # If target doesn't exist, try resolving the symlink 

149 resolved_path = path_obj.resolve() 

150 return tifffile.imread(str(resolved_path)) 

151 else: 

152 return tifffile.imread(str(path)) 
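
# Sketch (editorial, not part of disk.py): the symlink branch above tries three
# spellings of the same file in order; roughly equivalent to:
#   for candidate in (path_obj, path_obj.readlink(), path_obj.resolve()):
#       try:
#           return tifffile.imread(str(candidate))
#       except FileNotFoundError:
#           continue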

153 

154 def _text_writer(self, path, data, **kwargs): 

155 """Write text data to file. Accepts and ignores extra kwargs for compatibility.""" 

156 path.write_text(str(data)) 

157 

158 def _text_reader(self, path): 

159 return path.read_text() 

160 

161 def _json_writer(self, path, data, **kwargs): 

162 import json 

163 # Ensure parent directory exists 

164 path.parent.mkdir(parents=True, exist_ok=True) 

165 path.write_text(json.dumps(data, indent=2)) 

166 

167 def _json_reader(self, path): 

168 import json 

169 return json.loads(path.read_text()) 

170 

171 def _csv_writer(self, path, data, **kwargs): 

172 import csv 

173 # Assume data is a list of rows or a dict 

174 with path.open('w', newline='') as f: 

175 if isinstance(data, dict):  # branch 175 ↛ 177 never taken (condition never true)

176 # Write dict as CSV with headers 

177 writer = csv.DictWriter(f, fieldnames=data.keys()) 

178 writer.writeheader() 

179 writer.writerow(data) 

180 elif isinstance(data, list) and len(data) > 0:  # branch 180 ↛ 181 never taken (condition never true)

181 if isinstance(data[0], dict): 

182 # List of dicts 

183 writer = csv.DictWriter(f, fieldnames=data[0].keys()) 

184 writer.writeheader() 

185 writer.writerows(data) 

186 else: 

187 # List of lists/tuples 

188 writer = csv.writer(f) 

189 writer.writerows(data) 

190 else: 

191 # Fallback: write as single row 

192 writer = csv.writer(f) 

193 writer.writerow([data]) 

194 

195 def _roi_zip_writer(self, path, data, **kwargs): 

196 """Write ROIs to .roi.zip archive. Wrapper for _save_rois.""" 

197 # data should be a list of ROI objects 

198 self._save_rois(data, path, **kwargs) 

199 

200 def _roi_zip_reader(self, path, **kwargs): 

201 """Read ROIs from .roi.zip archive.""" 

202 from openhcs.core.roi import load_rois_from_zip 

203 return load_rois_from_zip(path) 

204 

205 def _csv_reader(self, path): 

206 import csv 

207 with path.open('r', newline='') as f: 

208 reader = csv.DictReader(f) 

209 return list(reader) 

210 

211 

212 def load(self, file_path: Union[str, Path], **kwargs) -> Any: 

213 """ 

214 Load data from disk, dispatching on the file's extension. 

215 

216 Args: 

217 file_path: Path to the file to load 

218 **kwargs: Additional arguments forwarded to the reader registered 

219 for the file's extension 

220 

221 Returns: 

222 The loaded data 

223 

224 Raises: 

225 TypeError: If file_path is not a valid path type 

226 FileNotFoundError: If the file does not exist 

227 ValueError: If no reader is registered for the extension or the file cannot be loaded 

228 """ 

229 

230 disk_path = Path(file_path) 

231 

232 # Handle double extensions (e.g., .roi.zip, .csv.zip) 

233 # Check if file has double extension by looking at suffixes 

234 ext = None 

235 if len(disk_path.suffixes) >= 2:  # branch 235 ↛ 237 never taken (condition never true)

236 # Try double extension first (e.g., '.roi.zip') 

237 double_ext = ''.join(disk_path.suffixes[-2:]).lower() 

238 if self.format_registry.is_registered(double_ext): 

239 ext = double_ext 

240 

241 # Fall back to single extension if double extension not registered 

242 if ext is None:  # branch 242 ↛ 245 never taken (condition always true)

243 ext = disk_path.suffix.lower() 

244 

245 if not self.format_registry.is_registered(ext):  # branch 245 ↛ 246 never taken (condition never true)

246 raise ValueError(f"No reader registered for extension '{ext}'") 

247 

248 try: 

249 reader = self.format_registry.get_reader(ext) 

250 return reader(disk_path, **kwargs) 

251 except Exception as e: 

252 raise ValueError(f"Error loading data from {disk_path}: {e}") from e 
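
# Sketch (editorial, not part of disk.py): extension resolution in load(),
# shown on a hypothetical path. Registered double suffixes win over single:
#   p = Path("results/A01.roi.zip")
#   p.suffixes                  # ['.roi', '.zip']
#   ''.join(p.suffixes[-2:])    # '.roi.zip' -> used if registered
#   p.suffix                    # '.zip'     -> fallback otherwise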

253 

254 def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None: 

255 """ 

256 Save data to disk, dispatching on the destination file's extension. 

257 

258 Args: 

259 data: The data to save 

260 output_path: Path where the data should be saved 

261 **kwargs: Additional arguments forwarded to the writer registered 

262 for the destination file's extension 

263 

264 Raises: 

265 TypeError: If output_path is not a valid path type 

266 ValueError: If no writer is registered for the extension or the data cannot be saved 

267 """ 

268 from openhcs.core.roi import ROI 

269 

270 disk_output_path = Path(output_path) 

271 

272 # Explicit type dispatch for ROI data 

273 if isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI): 

274 # ROI data - delegate to _save_rois, which writes a .roi.zip archive 

275 images_dir = kwargs.pop('images_dir', None) 

276 self._save_rois(data, disk_output_path, images_dir=images_dir, **kwargs) 

277 return 

278 

279 ext = disk_output_path.suffix.lower() 

280 if not self.format_registry.is_registered(ext):  # branch 280 ↛ 281 never taken (condition never true)

281 raise ValueError(f"No writer registered for extension '{ext}'") 

282 

283 try: 

284 writer = self.format_registry.get_writer(ext) 

285 return writer(disk_output_path, data, **kwargs) 

286 except Exception as e: 

287 raise ValueError(f"Error saving data to {disk_output_path}: {e}") from e 
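
# Sketch (editorial, not part of disk.py): save() has one type-based fast path
# (a non-empty list of ROI objects goes to _save_rois) and otherwise dispatches
# on the suffix. Hypothetical usage:
#   backend = DiskStorageBackend()
#   backend.ensure_directory("out")
#   backend.save(np.zeros((8, 8), dtype=np.uint16), "out/plane.tif")  # TIFF writer
#   backend.save({"n": 3}, "out/meta.json")                           # JSON writer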

288 

289 def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]: 

290 """ 

291 Load multiple files sequentially using existing load method. 

292 

293 Args: 

294 file_paths: List of file paths to load 

295 **kwargs: Additional arguments passed to load method 

296 

297 Returns: 

298 List of loaded data objects in the same order as file_paths 

299 """ 

300 results = [] 

301 for file_path in file_paths: 

302 result = self.load(file_path, **kwargs) 

303 results.append(result) 

304 return results 

305 

306 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None: 

307 """ 

308 Save multiple files sequentially using existing save method. 

309 

310 Converts GPU arrays to CPU numpy arrays before saving using OpenHCS memory conversion system. 

311 

312 Args: 

313 data_list: List of data objects to save 

314 output_paths: List of destination paths (must match length of data_list) 

315 **kwargs: Additional arguments passed to save method 

316 

317 Raises: 

318 ValueError: If data_list and output_paths have different lengths 

319 """ 

320 if len(data_list) != len(output_paths):  # branch 320 ↛ 321 never taken (condition never true)

321 raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})") 

322 

323 # Convert GPU arrays to CPU numpy arrays using OpenHCS memory conversion system 

324 from openhcs.core.memory.converters import convert_memory, detect_memory_type 

325 from openhcs.constants.constants import MemoryType 

326 

327 cpu_data_list = [] 

328 for data in data_list: 

329 # Detect the memory type of the data 

330 source_type = detect_memory_type(data) 

331 

332 # Convert to numpy if not already numpy 

333 if source_type == MemoryType.NUMPY.value:  # branch 333 ↛ 339 never taken (condition always true)

334 # Already numpy, use as-is 

335 cpu_data_list.append(data) 

336 else: 

337 # Convert to numpy using OpenHCS memory conversion system 

338 # Allow CPU roundtrip since we're explicitly going to disk 

339 numpy_data = convert_memory( 

340 data=data, 

341 source_type=source_type, 

342 target_type=MemoryType.NUMPY.value, 

343 gpu_id=0 # Placeholder since numpy doesn't use GPU ID 

344 ) 

345 cpu_data_list.append(numpy_data) 

346 

347 # Save converted data using existing save method 

348 for cpu_data, output_path in zip(cpu_data_list, output_paths): 

349 self.save(cpu_data, output_path, **kwargs) 
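
# Sketch (editorial, not part of disk.py): the GPU-to-CPU hop in save_batch,
# isolated. The keyword shape matches the convert_memory call above; the torch
# tensor input is hypothetical.
#   t = torch.ones((4, 4), device="cuda")
#   arr = convert_memory(data=t,
#                        source_type=detect_memory_type(t),
#                        target_type=MemoryType.NUMPY.value,
#                        gpu_id=0)  # placeholder; numpy has no GPU id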

350 

351 def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None, 

352 extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str,Path]]: 

353 """ 

354 List files on disk, optionally filtering by pattern and extensions. 

355 

356 Args: 

357 directory: Directory to search. 

358 pattern: Optional glob pattern to match filenames. 

359 extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}). 

360 Extensions should include the dot and are case-insensitive. 

361 recursive: Whether to search recursively. 

362 

363 Returns: 

364 List of paths to matching files. 

365 

366 Raises: 

367 TypeError: If directory is not a valid path type 

368 ValueError: If the path is not an existing directory 

369 """ 

370 disk_directory = Path(directory) 

371 

372 if not disk_directory.is_dir():  # branch 372 ↛ 373 never taken (condition never true)

373 raise ValueError(f"Path is not a directory: {disk_directory}") 

374 

375 # Use appropriate search strategy based on recursion 

376 if recursive: 

377 # Use breadth-first traversal to prioritize shallower files 

378 files = self._list_files_breadth_first(disk_directory, pattern) 

379 else: 

380 glob_pattern = pattern if pattern else "*" 

381 # Include both regular files and symlinks (even broken ones) 

382 files = [p for p in disk_directory.glob(glob_pattern) if p.is_file() or p.is_symlink()] 

383 

384 # Filter out macOS metadata files (._* files) that interfere with parsing 

385 files = [f for f in files if not f.name.startswith('._')] 

386 

387 # Filter by extensions if provided 

388 if extensions: 

389 # Convert extensions to lowercase for case-insensitive comparison 

390 lowercase_extensions = {ext.lower() for ext in extensions} 

391 files = [f for f in files if f.suffix.lower() in lowercase_extensions] 

392 

393 # Return paths as strings 

394 return [str(f) for f in files] 
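
# Sketch (editorial, not part of disk.py): extension filtering in list_files is
# case-insensitive, and macOS '._*' sidecars are always dropped. Hypothetical
# directory holding A01.tif, A02.TIF, ._A01.tif and notes.txt:
#   backend.list_files("plate", extensions={".TIF"})
#   # -> ['plate/A01.tif', 'plate/A02.TIF']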

395 

396 def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]: 

397 """ 

398 List files using breadth-first traversal to prioritize shallower files. 

399 

400 This ensures that files in the root directory are found before files 

401 in subdirectories, which is important for metadata detection. 

402 

403 Args: 

404 directory: Root directory to search 

405 pattern: Optional glob pattern to match filenames 

406 

407 Returns: 

408 List of file paths sorted by depth (shallower first) 

409 """ 

410 from collections import deque 

411 

412 files = [] 

413 # Use deque for breadth-first traversal 

414 dirs_to_search = deque([(directory, 0)]) # (path, depth) 

415 

416 while dirs_to_search: 

417 current_dir, depth = dirs_to_search.popleft() 

418 

419 try: 

420 # Get all entries in current directory 

421 for entry in current_dir.iterdir(): 

422 if entry.is_file(): 

423 # Filter out macOS metadata files (._* files) that interfere with parsing 

424 if entry.name.startswith('._'):  # branch 424 ↛ 425 never taken (condition never true)

425 continue 

426 # Check if file matches pattern 

427 if pattern is None or entry.match(pattern):  # branch 427 ↛ 421 never taken (condition always true)

428 files.append((entry, depth)) 

429 elif entry.is_dir():  # branch 429 ↛ 421 never taken (condition always true)

430 # Add subdirectory to queue for later processing 

431 dirs_to_search.append((entry, depth + 1)) 

432 except (PermissionError, OSError): 

433 # Skip directories we can't read 

434 continue 

435 

436 # Sort by depth first, then by path for consistent ordering 

437 files.sort(key=lambda x: (x[1], str(x[0]))) 

438 

439 # Return just the paths 

440 return [file_path for file_path, _ in files] 
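
# Sketch (editorial, not part of disk.py): why breadth-first order matters.
# Given a hypothetical tree with plate/Index.xml and plate/images/A01.tif, the
# depth-then-path sort yields the root-level metadata file first:
#   backend._list_files_breadth_first(Path("plate"))
#   # -> [Path('plate/Index.xml'), Path('plate/images/A01.tif')]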

441 

442 def list_dir(self, path: Union[str, Path]) -> List[str]: 

443 path = Path(path) 

444 if not path.exists():  # branch 444 ↛ 445 never taken (condition never true)

445 raise FileNotFoundError(f"Path does not exist: {path}") 

446 if not path.is_dir():  # branch 446 ↛ 447 never taken (condition never true)

447 raise NotADirectoryError(f"Not a directory: {path}") 

448 return [entry.name for entry in path.iterdir()] 

449 

450 

451 def delete(self, path: Union[str, Path]) -> None: 

452 """ 

453 Delete a file or empty directory at the given disk path. 

454 

455 Args: 

456 path: Path to delete 

457 

458 Raises: 

459 FileNotFoundError: If path does not exist 

460 IsADirectoryError: If path is a directory and not empty 

461 StorageResolutionError: If deletion fails for unknown reasons 

462 """ 

463 path = Path(path) 

464 

465 if not path.exists(): 

466 raise FileNotFoundError(f"Cannot delete: path does not exist: {path}") 

467 

468 try: 

469 if path.is_dir(): 

470 # Do not allow recursive deletion 

471 path.rmdir() # will raise OSError if directory is not empty 

472 else: 

473 path.unlink() 

474 except IsADirectoryError: 

475 raise 

476 except OSError as e: 

477 raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e 

478 except Exception as e: 

479 raise StorageResolutionError(f"Failed to delete {path}") from e 

480 

481 def delete_all(self, path: Union[str, Path]) -> None: 

482 """ 

483 Recursively delete a file or directory and all its contents from disk. 

484 

485 Args: 

486 path: Filesystem path to delete 

487 

488 Raises: 

489 FileNotFoundError: If the path does not exist 

490 StorageResolutionError: If deletion fails for any reason 

491 """ 

492 path = Path(path) 

493 

494 if not path.exists(): 

495 raise FileNotFoundError(f"Path does not exist: {path}") 

496 

497 try: 

498 if path.is_file(): 

499 path.unlink() 

500 else: 

501 # Safe, recursive removal of directories 

502 import shutil 

503 shutil.rmtree(path) 

504 except Exception as e: 

505 raise StorageResolutionError(f"Failed to recursively delete: {path}") from e 
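
# Sketch (editorial, not part of disk.py): delete() vs delete_all() on a
# hypothetical non-empty directory:
#   backend.delete("results")      # raises IsADirectoryError (rmdir on non-empty dir)
#   backend.delete_all("results")  # removes the whole tree via shutil.rmtree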

506 

507 

508 def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]: 

509 """ 

510 Ensure a directory exists on disk. 

511 

512 Args: 

513 directory: Path to the directory to ensure exists 

514 

515 Returns: 

516 Path to the directory 

517 

518 Raises: 

519 TypeError: If directory is not a valid path type 

520 ValueError: If there is an error creating the directory 

521 """ 

522 # 🔒 Clause 17 — VFS Boundary Enforcement 

523 try: 

524 disk_directory = Path(directory) 

525 disk_directory.mkdir(parents=True, exist_ok=True) 

526 return directory 

527 except OSError as e: 

528 # 🔒 Clause 65 — No Fallback Logic 

529 # Propagate the error with additional context 

530 raise ValueError(f"Error creating directory {disk_directory}: {e}") from e 

531 

532 def exists(self, path: Union[str, Path]) -> bool: 

533 return Path(path).exists() 

534 

535 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False): 

536 source = Path(source).resolve() 

537 link_name = Path(link_name) # Don't resolve link_name - we want the actual symlink path 

538 

539 if not source.exists(): 

540 raise FileNotFoundError(f"Source path does not exist: {source}") 

541 

542 # Check if target exists and handle overwrite policy 

543 if link_name.exists() or link_name.is_symlink(): 

544 if not overwrite: 

545 raise FileExistsError(f"Target already exists: {link_name}") 

546 link_name.unlink() # Remove existing file/symlink only if overwrite=True 

547 

548 link_name.parent.mkdir(parents=True, exist_ok=True) 

549 # On Windows, symlink_to() requires target_is_directory to be set correctly 

550 # On Unix, this parameter is ignored, so it's safe to always specify it 

551 link_name.symlink_to(source, target_is_directory=source.is_dir()) 
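
# Sketch (editorial, not part of disk.py): overwrite semantics of
# create_symlink, with hypothetical paths:
#   backend.create_symlink("raw/A01.tif", "view/A01.tif")                  # creates link
#   backend.create_symlink("raw/A01.tif", "view/A01.tif")                  # FileExistsError
#   backend.create_symlink("raw/A01.tif", "view/A01.tif", overwrite=True)  # replaces link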

552 

553 

554 def is_symlink(self, path: Union[str, Path]) -> bool: 

555 return Path(path).is_symlink() 

556 

557 

558 def is_file(self, path: Union[str, Path]) -> bool: 

559 path = Path(path) 

560 

561 if not path.exists():  # branch 561 ↛ 562 never taken (condition never true)

562 raise FileNotFoundError(f"Path does not exist: {path}") 

563 

564 # Resolve symlinks and return True only if final target is a file 

565 resolved = path.resolve(strict=True) 

566 

567 if resolved.is_dir():  # branch 567 ↛ 568 never taken (condition never true)

568 raise IsADirectoryError(f"Path is a directory: {path}") 

569 

570 return resolved.is_file() 

571 

572 def is_dir(self, path: Union[str, Path]) -> bool: 

573 """ 

574 Check if a given disk path is a directory. 

575 

576 Follows filesystem symlinks to determine the actual resolved structure. 

577 

578 Args: 

579 path: Filesystem path (absolute or relative) 

580 

581 Returns: 

582 bool: True if path resolves to a directory 

583 

584 Raises: 

585 FileNotFoundError: If the path or symlink target does not exist 

586 NotADirectoryError: If the resolved target is not a directory 

587 """ 

588 path = Path(path) 

589 

590 if not path.exists():  # branch 590 ↛ 591 never taken (condition never true)

591 raise FileNotFoundError(f"Path does not exist: {path}") 

592 

593 # Follow symlinks to final real target 

594 resolved = path.resolve(strict=True) 

595 

596 if not resolved.is_dir():  # branch 596 ↛ 597 never taken (condition never true)

597 raise NotADirectoryError(f"Path is not a directory: {path}") 

598 

599 return True 

600 

601 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None: 

602 """ 

603 Move a file or directory on disk. Follows symlinks and performs overwrite-safe move. 

604 

605 Raises: 

606 FileNotFoundError: If source does not exist 

607 FileExistsError: If destination already exists 

608 StorageResolutionError: On failure to move 

609 """ 

610 import shutil 

611 from pathlib import Path 

612 

613 src = Path(src) 

614 dst = Path(dst) 

615 

616 if not src.exists(): 

617 raise FileNotFoundError(f"Source path does not exist: {src}") 

618 if dst.exists(): 

619 raise FileExistsError(f"Destination already exists: {dst}") 

620 

621 try: 

622 shutil.move(str(src), str(dst)) 

623 except Exception as e: 

624 raise StorageResolutionError(f"Failed to move {src} to {dst}") from e 

625 

626 def stat(self, path: Union[str, Path]) -> Dict[str, Any]: 

627 """ 

628 Return structural metadata about a disk-backed path. 

629 

630 Returns: 

631 dict with keys: 

632 - 'type': 'file', 'directory', 'symlink', or 'missing' 

633 - 'path': str(path) 

634 - 'target': resolved target if symlink 

635 - 'exists': bool 

636 

637 Raises: 

638 StorageResolutionError: On access or resolution failure 

639 """ 

640 path_str = str(path) 

641 try: 

642 if not os.path.lexists(path_str): # includes broken symlinks 

643 return { 

644 "type": "missing", 

645 "path": path_str, 

646 "exists": False 

647 } 

648 

649 if os.path.islink(path_str): 

650 try: 

651 resolved = os.readlink(path_str) 

652 target_exists = os.path.exists(path_str) 

653 except OSError as e: 

654 raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e 

655 

656 return { 

657 "type": "symlink", 

658 "path": path_str, 

659 "target": resolved, 

660 "exists": target_exists 

661 } 

662 

663 if os.path.isdir(path_str): 

664 return { 

665 "type": "directory", 

666 "path": path_str, 

667 "exists": True 

668 } 

669 

670 if os.path.isfile(path_str): 

671 return { 

672 "type": "file", 

673 "path": path_str, 

674 "exists": True 

675 } 

676 

677 raise StorageResolutionError(f"Unknown filesystem object at: {path_str}") 

678 

679 except Exception as e: 

680 raise StorageResolutionError(f"Failed to stat disk path: {path}") from e 
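
# Sketch (editorial, not part of disk.py): shapes of stat() results, with
# hypothetical paths:
#   backend.stat("plate/A01.tif")
#   # {'type': 'file', 'path': 'plate/A01.tif', 'exists': True}
#   backend.stat("plate/broken_link")   # link whose target was deleted
#   # {'type': 'symlink', 'path': 'plate/broken_link', 'target': 'gone.tif', 'exists': False}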

681 

682 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None: 

683 """ 

684 Copy a file or directory to a new location. 

685  

686 - Does not overwrite destination. 

687 - Will raise if destination exists. 

688 - Supports file-to-file and dir-to-dir copies. 

689  

690 Raises: 

691 FileExistsError: If destination already exists 

692 FileNotFoundError: If source is missing 

693 StorageResolutionError: On structural failure 

694 """ 

695 src = Path(src) 

696 dst = Path(dst) 

697 

698 if not src.exists(): 

699 raise FileNotFoundError(f"Source does not exist: {src}") 

700 if dst.exists(): 

701 raise FileExistsError(f"Destination already exists: {dst}") 

702 

703 try: 

704 if src.is_dir(): 

705 shutil.copytree(src, dst) 

706 else: 

707 shutil.copy2(src, dst) 

708 except Exception as e: 

709 raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e 

710 

711 def _save_rois(self, rois: List, output_path: Path, images_dir: str = None, **kwargs) -> str: 

712 """Save ROIs as .roi.zip archive (ImageJ standard format). 

713 

714 Args: 

715 rois: List of ROI objects 

716 output_path: Output path (e.g., /disk/plate_001/step_7_results/A01_rois_step7.roi.zip) 

717 images_dir: Images directory path (unused for disk backend) 

718 

719 Returns: 

720 Path where ROIs were saved 

721 """ 

722 import zipfile 

723 import numpy as np 

724 from openhcs.core.roi import PolygonShape, MaskShape, PointShape, EllipseShape 

725 

726 output_path = Path(output_path) 

727 

728 # Ensure output directory exists 

729 output_path.parent.mkdir(parents=True, exist_ok=True) 

730 

731 # Ensure output path has .roi.zip extension 

732 if not output_path.name.endswith('.roi.zip'):  # branch 732 ↛ 733 never taken (condition never true)

733 output_path = output_path.with_suffix('.roi.zip') 

734 

735 try: 

736 from roifile import ImagejRoi 

737 except ImportError: 

738 logger.error("roifile library not available - cannot save ROIs") 

739 raise ImportError("roifile library required for ROI saving. Install with: pip install roifile") 

740 

741 # Create .roi.zip archive 

742 roi_count = 0 

743 with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf: 

744 for idx, roi in enumerate(rois): 

745 for shape in roi.shapes: 

746 if isinstance(shape, PolygonShape):  # branch 746 ↛ 760 never taken (condition always true)

747 # Convert polygon to ImageJ ROI 

748 # roifile expects (x, y) coordinates, but we have (y, x) 

749 coords_xy = shape.coordinates[:, [1, 0]] # Swap columns 

750 ij_roi = ImagejRoi.frompoints(coords_xy) 

751 

752 # Use incrementing counter for unique filenames (avoid duplicate names from label values) 

753 ij_roi.name = f"ROI_{roi_count + 1}" 

754 

755 # Write to zip archive 

756 roi_bytes = ij_roi.tobytes() 

757 zf.writestr(f"{roi_count + 1:04d}.roi", roi_bytes) 

758 roi_count += 1 

759 

760 elif isinstance(shape, PointShape): 

761 # Convert point to ImageJ ROI 

762 coords_xy = np.array([[shape.x, shape.y]]) 

763 ij_roi = ImagejRoi.frompoints(coords_xy) 

764 

765 ij_roi.name = f"ROI_{roi_count + 1}" 

766 

767 roi_bytes = ij_roi.tobytes() 

768 zf.writestr(f"{roi_count + 1:04d}.roi", roi_bytes) 

769 roi_count += 1 

770 

771 elif isinstance(shape, EllipseShape): 

772 # Convert ellipse to polygon approximation (ImageJ ROI format limitation) 

773 # Generate 64 points around the ellipse 

774 theta = np.linspace(0, 2 * np.pi, 64) 

775 x = shape.center_x + shape.radius_x * np.cos(theta) 

776 y = shape.center_y + shape.radius_y * np.sin(theta) 

777 coords_xy = np.column_stack([x, y]) 

778 

779 ij_roi = ImagejRoi.frompoints(coords_xy) 

780 ij_roi.name = f"ROI_{roi_count + 1}" 

781 

782 roi_bytes = ij_roi.tobytes() 

783 zf.writestr(f"{roi_count + 1:04d}.roi", roi_bytes) 

784 roi_count += 1 

785 

786 elif isinstance(shape, MaskShape): 

787 # Skip mask shapes - ImageJ ROI format doesn't support binary masks 

788 logger.warning(f"Skipping mask shape for ROI {idx} - not supported in ImageJ .roi format") 

789 continue 

790 

791 logger.info(f"Saved {roi_count} ROIs to .roi.zip archive: {output_path}") 

792 return str(output_path)
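
# Sketch (editorial, not part of disk.py): the ellipse branch above is the
# standard parametrization x = cx + rx*cos(theta), y = cy + ry*sin(theta)
# sampled at 64 angles; self-contained with hypothetical center/radii:
#   theta = np.linspace(0, 2 * np.pi, 64)
#   pts = np.column_stack([10 + 4 * np.cos(theta),   # x: center 10, radius 4
#                          20 + 2 * np.sin(theta)])  # y: center 20, radius 2
#   pts.shape  # (64, 2), ready for ImagejRoi.frompoints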