Coverage for openhcs/io/disk.py: 51.6%
357 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
# openhcs/io/storage/backends/disk.py
"""
Disk-based storage backend implementation.

This module provides a concrete implementation of the storage backend interfaces
for local disk storage. It strictly enforces VFS boundaries and doctrinal clauses.
"""
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union

import numpy as np

from openhcs.constants.constants import FileFormat, Backend
from openhcs.io.base import StorageBackend
# StorageResolutionError is raised throughout this module but was never
# imported; the import below assumes it lives in openhcs.io.exceptions.
from openhcs.io.exceptions import StorageResolutionError

logger = logging.getLogger(__name__)
def optional_import(module_name):
    try:
        return __import__(module_name)
    except ImportError:
        return None
# Optional dependencies at module level (not instance level to avoid pickle issues)
# Skip GPU libraries in subprocess runner mode
if os.getenv('OPENHCS_SUBPROCESS_NO_GPU') == '1':  # coverage: condition never true in this run
    torch = None
    jax = None
    jnp = None
    cupy = None
    tf = None
    logger.info("Subprocess runner mode - skipping GPU library imports in disk backend")
else:
    from openhcs.core.lazy_gpu_imports import torch, jax, jnp, cupy, tf

tifffile = optional_import("tifffile")
class FileFormatRegistry:
    def __init__(self):
        self._writers: Dict[str, Callable[[Path, Any], None]] = {}
        self._readers: Dict[str, Callable[[Path], Any]] = {}

    def register(self, ext: str, writer: Callable, reader: Callable):
        ext = ext.lower()
        self._writers[ext] = writer
        self._readers[ext] = reader

    def get_writer(self, ext: str) -> Callable:
        return self._writers[ext.lower()]

    def get_reader(self, ext: str) -> Callable:
        return self._readers[ext.lower()]

    def is_registered(self, ext: str) -> bool:
        return ext.lower() in self._writers and ext.lower() in self._readers
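# Editor's illustrative sketch (not part of the module): extensions are
# lowercased on both register and lookup, so matching is case-insensitive.
#
#     registry = FileFormatRegistry()
#     registry.register('.TXT', writer=lambda p, d: p.write_text(str(d)),
#                       reader=lambda p: p.read_text())
#     assert registry.is_registered('.txt')
#     registry.get_writer('.txt')(Path('note.txt'), 'hello')
#     assert registry.get_reader('.TXT')(Path('note.txt')) == 'hello'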
class DiskStorageBackend(StorageBackend):
    """Disk storage backend with automatic registration."""

    _backend_type = Backend.DISK.value

    def __init__(self):
        self.format_registry = FileFormatRegistry()
        self._register_formats()
    def _register_formats(self):
        """
        Register all file format handlers.

        Uses enum-driven registration to eliminate boilerplate.
        Complex formats (CSV, JSON, TIFF, ROI.ZIP, TEXT) use custom handlers.
        NumPy and Torch use the library's save/load directly; JAX, CuPy and
        TensorFlow use thin wrappers for device transfer and serialization.
        """
        # Format handler metadata: (FileFormat enum, module availability, writer, reader)
        # A falsy module or a None writer/reader means the library is unavailable
        # and the format is skipped below.
        format_handlers = [
            # Simple formats - library save/load (or a thin wrapper)
            (FileFormat.NUMPY, True, np.save, np.load),
            (FileFormat.TORCH, torch, torch.save if torch else None, torch.load if torch else None),
            (FileFormat.JAX, (jax and jnp), self._jax_writer, self._jax_reader),
            (FileFormat.CUPY, cupy, self._cupy_writer, self._cupy_reader),
            (FileFormat.TENSORFLOW, tf, self._tensorflow_writer, self._tensorflow_reader),

            # Complex formats - custom handlers
            (FileFormat.TIFF, tifffile, self._tiff_writer, self._tiff_reader),
            (FileFormat.TEXT, True, self._text_writer, self._text_reader),
            (FileFormat.JSON, True, self._json_writer, self._json_reader),
            (FileFormat.CSV, True, self._csv_writer, self._csv_reader),
            (FileFormat.ROI, True, self._roi_zip_writer, self._roi_zip_reader),
        ]

        # Register all available formats
        for file_format, module_available, writer, reader in format_handlers:
            if not module_available or writer is None or reader is None:
                continue

            # Register all extensions for this format
            for ext in file_format.value:
                self.format_registry.register(ext.lower(), writer, reader)
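    # Editor's note - a hypothetical sketch of the enum shape the loop above
    # assumes: each FileFormat value is an iterable of extensions, so one enum
    # entry registers every spelling of its extension at once.
    #
    #     class FileFormat(Enum):          # hypothetical illustration only
    #         TIFF = ('.tif', '.tiff')
    #         NUMPY = ('.npy',)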
    # Format-specific writer/reader functions (pickleable).
    # Only needed for formats that require special handling beyond library save/load.

    def _jax_writer(self, path, data, **kwargs):
        """JAX arrays must be moved to CPU before saving."""
        np.save(path, jax.device_get(data))

    def _jax_reader(self, path):
        """Load a NumPy array and convert it to JAX."""
        return jnp.array(np.load(path))

    def _cupy_writer(self, path, data, **kwargs):
        """CuPy has its own save format."""
        cupy.save(path, data)

    def _cupy_reader(self, path):
        """Load a CuPy array from disk."""
        return cupy.load(path)

    def _tensorflow_writer(self, path, data, **kwargs):
        """TensorFlow uses tensor serialization."""
        tf.io.write_file(path.as_posix(), tf.io.serialize_tensor(data))

    def _tensorflow_reader(self, path):
        """Load and deserialize a TensorFlow tensor (assumes float32 dtype)."""
        return tf.io.parse_tensor(tf.io.read_file(path.as_posix()), out_type=tf.dtypes.float32)
    def _tiff_writer(self, path, data, **kwargs):
        tifffile.imwrite(path, data)

    def _tiff_reader(self, path):
        # For symlinks, try multiple approaches to handle filesystem issues
        path_obj = Path(path)

        if path_obj.is_symlink():  # coverage: condition never true in this run
            # First try reading the symlink directly (let the OS handle it)
            try:
                return tifffile.imread(str(path))
            except FileNotFoundError:
                # If that fails, try the immediate target path
                try:
                    target_path = path_obj.readlink()
                    return tifffile.imread(str(target_path))
                except FileNotFoundError:
                    # If the target doesn't exist either, fully resolve the symlink
                    resolved_path = path_obj.resolve()
                    return tifffile.imread(str(resolved_path))
        else:
            return tifffile.imread(str(path))
    def _text_writer(self, path, data, **kwargs):
        """Write text data to file. Accepts and ignores extra kwargs for compatibility."""
        path.write_text(str(data))

    def _text_reader(self, path):
        return path.read_text()

    def _json_writer(self, path, data, **kwargs):
        import json
        # Ensure parent directory exists
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2))

    def _json_reader(self, path):
        import json
        return json.loads(path.read_text())
    def _csv_writer(self, path, data, **kwargs):
        import csv
        # Assume data is a list of rows or a dict
        with path.open('w', newline='') as f:
            if isinstance(data, dict):  # coverage: condition never true in this run
                # Write dict as a single CSV row with headers
                writer = csv.DictWriter(f, fieldnames=data.keys())
                writer.writeheader()
                writer.writerow(data)
            elif isinstance(data, list) and len(data) > 0:  # coverage: condition never true in this run
                if isinstance(data[0], dict):
                    # List of dicts
                    writer = csv.DictWriter(f, fieldnames=data[0].keys())
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    # List of lists/tuples
                    writer = csv.writer(f)
                    writer.writerows(data)
            else:
                # Fallback: write as a single row
                writer = csv.writer(f)
                writer.writerow([data])
    def _roi_zip_writer(self, path, data, **kwargs):
        """Write ROIs to a .roi.zip archive. Wrapper for _save_rois."""
        # data should be a list of ROI objects
        self._save_rois(data, path, **kwargs)

    def _roi_zip_reader(self, path, **kwargs):
        """Read ROIs from a .roi.zip archive."""
        from openhcs.core.roi import load_rois_from_zip
        return load_rois_from_zip(path)

    def _csv_reader(self, path):
        import csv
        with path.open('r', newline='') as f:
            reader = csv.DictReader(f)
            return list(reader)
    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load data from disk, dispatching on the file extension.

        Args:
            file_path: Path to the file to load
            **kwargs: Additional arguments passed through to the format reader

        Returns:
            The loaded data

        Raises:
            FileNotFoundError: If the file does not exist
            ValueError: If no reader is registered for the extension or the
                file cannot be loaded
        """
        disk_path = Path(file_path)

        # Handle double extensions (e.g., .roi.zip, .csv.zip) by checking the
        # last two suffixes before falling back to the single suffix
        ext = None
        if len(disk_path.suffixes) >= 2:  # coverage: condition never true in this run
            # Try the double extension first (e.g., '.roi.zip')
            double_ext = ''.join(disk_path.suffixes[-2:]).lower()
            if self.format_registry.is_registered(double_ext):
                ext = double_ext

        # Fall back to the single extension if no double extension is registered
        if ext is None:  # coverage: condition always true in this run
            ext = disk_path.suffix.lower()

        if not self.format_registry.is_registered(ext):  # coverage: condition never true in this run
            raise ValueError(f"No reader registered for extension '{ext}'")

        try:
            reader = self.format_registry.get_reader(ext)
            return reader(disk_path, **kwargs)
        except Exception as e:
            raise ValueError(f"Error loading data from {disk_path}: {e}") from e
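    # Illustrative usage sketch (editor's example; paths are hypothetical, and
    # '.roi.zip' is assumed to be a registered double extension):
    #
    #     backend = DiskStorageBackend()
    #     image = backend.load('/data/plate_001/A01.tif')      # -> _tiff_reader
    #     rois = backend.load('/data/plate_001/A01.roi.zip')   # double extension checked first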
    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """
        Save data to disk, dispatching on the file extension.

        Args:
            data: The data to save
            output_path: Path where the data should be saved
            **kwargs: Additional arguments passed through to the format writer

        Raises:
            ValueError: If no writer is registered for the extension or the
                data cannot be saved
        """
        from openhcs.core.roi import ROI

        disk_output_path = Path(output_path)

        # Explicit type dispatch for ROI data
        if isinstance(data, list) and len(data) > 0 and isinstance(data[0], ROI):
            # ROI data - save as a .roi.zip archive
            images_dir = kwargs.pop('images_dir', None)
            self._save_rois(data, disk_output_path, images_dir=images_dir, **kwargs)
            return

        ext = disk_output_path.suffix.lower()
        if not self.format_registry.is_registered(ext):  # coverage: condition never true in this run
            raise ValueError(f"No writer registered for extension '{ext}'")

        try:
            writer = self.format_registry.get_writer(ext)
            return writer(disk_output_path, data, **kwargs)
        except Exception as e:
            raise ValueError(f"Error saving data to {disk_output_path}: {e}") from e
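    # Illustrative usage sketch (editor's example; the path is hypothetical and
    # '.tif' is assumed to be among FileFormat.TIFF's registered extensions):
    #
    #     backend.save(np.zeros((2, 512, 512), dtype=np.uint16), '/data/out/A01.tif')
    #     # dispatches to _tiff_writer; an unregistered extension raises ValueError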
    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load multiple files sequentially using the existing load method.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments passed to the load method

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        results = []
        for file_path in file_paths:
            result = self.load(file_path, **kwargs)
            results.append(result)
        return results
    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """
        Save multiple files sequentially using the existing save method.

        Converts GPU arrays to CPU numpy arrays before saving, using the OpenHCS
        memory conversion system.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)
            **kwargs: Additional arguments passed to the save method

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):  # coverage: condition never true in this run
            raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")

        # Convert GPU arrays to CPU numpy arrays using the OpenHCS memory conversion system
        from openhcs.core.memory.converters import convert_memory, detect_memory_type
        from openhcs.constants.constants import MemoryType

        cpu_data_list = []
        for data in data_list:
            # Detect the memory type of the data
            source_type = detect_memory_type(data)

            if source_type == MemoryType.NUMPY.value:  # coverage: condition always true in this run
                # Already numpy, use as-is
                cpu_data_list.append(data)
            else:
                # Convert to numpy; a CPU roundtrip is acceptable since we are
                # explicitly going to disk anyway
                numpy_data = convert_memory(
                    data=data,
                    source_type=source_type,
                    target_type=MemoryType.NUMPY.value,
                    gpu_id=0  # placeholder; numpy doesn't use a GPU ID
                )
                cpu_data_list.append(numpy_data)

        # Save converted data using the existing save method
        for cpu_data, output_path in zip(cpu_data_list, output_paths):
            self.save(cpu_data, output_path, **kwargs)
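    # Illustrative sketch (editor's example; assumes torch is available and that
    # detect_memory_type / convert_memory recognize torch tensors):
    #
    #     gpu_stack = torch.zeros((1, 64, 64), device='cuda')
    #     backend.save_batch([gpu_stack], ['/data/out/A01.tif'])
    #     # the tensor is converted to a CPU numpy array before _tiff_writer runs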
    def list_files(self, directory: Union[str, Path], pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None, recursive: bool = False) -> List[Union[str, Path]]:
        """
        List files on disk, optionally filtering by pattern and extensions.

        Args:
            directory: Directory to search.
            pattern: Optional glob pattern to match filenames.
            extensions: Optional set of file extensions to filter by (e.g., {'.tif', '.png'}).
                Extensions should include the dot and are case-insensitive.
            recursive: Whether to search recursively.

        Returns:
            List of paths to matching files (as strings).

        Raises:
            ValueError: If the path is not a directory
        """
        disk_directory = Path(directory)

        if not disk_directory.is_dir():  # coverage: condition never true in this run
            raise ValueError(f"Path is not a directory: {disk_directory}")

        # Use the appropriate search strategy based on recursion
        if recursive:
            # Use breadth-first traversal to prioritize shallower files
            files = self._list_files_breadth_first(disk_directory, pattern)
        else:
            glob_pattern = pattern if pattern else "*"
            # Include both regular files and symlinks (even broken ones)
            files = [p for p in disk_directory.glob(glob_pattern) if p.is_file() or p.is_symlink()]

        # Filter out macOS metadata files (._* files) that interfere with parsing
        files = [f for f in files if not f.name.startswith('._')]

        # Filter by extensions if provided
        if extensions:
            # Convert extensions to lowercase for case-insensitive comparison
            lowercase_extensions = {ext.lower() for ext in extensions}
            files = [f for f in files if f.suffix.lower() in lowercase_extensions]

        # Return paths as strings
        return [str(f) for f in files]
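    # Illustrative usage sketch (editor's example; the path is hypothetical):
    #
    #     tifs = backend.list_files('/data/plate_001', extensions={'.TIF'}, recursive=True)
    #     # extension matching is case-insensitive and results come back as strings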
    def _list_files_breadth_first(self, directory: Path, pattern: Optional[str] = None) -> List[Path]:
        """
        List files using breadth-first traversal to prioritize shallower files.

        This ensures that files in the root directory are found before files
        in subdirectories, which is important for metadata detection.

        Args:
            directory: Root directory to search
            pattern: Optional glob pattern to match filenames

        Returns:
            List of file paths sorted by depth (shallower first)
        """
        from collections import deque

        files = []
        # Use a deque for breadth-first traversal
        dirs_to_search = deque([(directory, 0)])  # (path, depth)

        while dirs_to_search:
            current_dir, depth = dirs_to_search.popleft()

            try:
                # Get all entries in the current directory
                for entry in current_dir.iterdir():
                    if entry.is_file():
                        # Filter out macOS metadata files (._* files) that interfere with parsing
                        if entry.name.startswith('._'):  # coverage: condition never true in this run
                            continue
                        # Check if the file matches the pattern
                        if pattern is None or entry.match(pattern):  # coverage: condition always true in this run
                            files.append((entry, depth))
                    elif entry.is_dir():  # coverage: condition always true in this run
                        # Queue the subdirectory for later processing
                        dirs_to_search.append((entry, depth + 1))
            except (PermissionError, OSError):
                # Skip directories we can't read
                continue

        # Sort by depth first, then by path for consistent ordering
        files.sort(key=lambda x: (x[1], str(x[0])))

        # Return just the paths
        return [file_path for file_path, _ in files]
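    # Depth-ordering sketch (editor's example): for a tree containing
    # root/a.tif, root/sub/b.tif and root/sub/deep/c.tif, the traversal returns
    # [a.tif, sub/b.tif, sub/deep/c.tif] regardless of the order the OS yields
    # directory entries, because results are sorted on (depth, path).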
    def list_dir(self, path: Union[str, Path]) -> List[str]:
        path = Path(path)
        if not path.exists():  # coverage: condition never true in this run
            raise FileNotFoundError(f"Path does not exist: {path}")
        if not path.is_dir():  # coverage: condition never true in this run
            raise NotADirectoryError(f"Not a directory: {path}")
        return [entry.name for entry in path.iterdir()]
    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a file or empty directory at the given disk path.

        Args:
            path: Path to delete

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a directory and not empty
            StorageResolutionError: If deletion fails for unknown reasons
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Cannot delete: path does not exist: {path}")

        try:
            if path.is_dir():
                # Do not allow recursive deletion
                path.rmdir()  # raises OSError if the directory is not empty
            else:
                path.unlink()
        except IsADirectoryError:
            raise
        except OSError as e:
            raise IsADirectoryError(f"Cannot delete non-empty directory: {path}") from e
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete {path}") from e
    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a file or directory and all its contents from disk.

        Args:
            path: Filesystem path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails for any reason
        """
        path = Path(path)

        if not path.exists():
            raise FileNotFoundError(f"Path does not exist: {path}")

        try:
            if path.is_file():
                path.unlink()
            else:
                # Safe, recursive removal of directories
                shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete: {path}") from e
    def ensure_directory(self, directory: Union[str, Path]) -> Union[str, Path]:
        """
        Ensure a directory exists on disk.

        Args:
            directory: Path to the directory to ensure exists

        Returns:
            Path to the directory

        Raises:
            TypeError: If directory is not a valid path type
            ValueError: If there is an error creating the directory
        """
        # 🔒 Clause 17 — VFS Boundary Enforcement
        try:
            disk_directory = Path(directory)
            disk_directory.mkdir(parents=True, exist_ok=True)
            return directory
        except OSError as e:
            # 🔒 Clause 65 — No Fallback Logic
            # Propagate the error with additional context
            raise ValueError(f"Error creating directory {disk_directory}: {e}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        return Path(path).exists()
    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        source = Path(source).resolve()
        link_name = Path(link_name)  # don't resolve link_name - we want the actual symlink path

        if not source.exists():
            raise FileNotFoundError(f"Source path does not exist: {source}")

        # Check if the target exists and handle the overwrite policy
        if link_name.exists() or link_name.is_symlink():
            if not overwrite:
                raise FileExistsError(f"Target already exists: {link_name}")
            link_name.unlink()  # remove the existing file/symlink only if overwrite=True

        link_name.parent.mkdir(parents=True, exist_ok=True)
        # On Windows, symlink_to() requires target_is_directory to be set correctly.
        # On Unix the parameter is ignored, so it's safe to always specify it.
        link_name.symlink_to(source, target_is_directory=source.is_dir())
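    # Illustrative usage sketch (editor's example; paths are hypothetical):
    #
    #     backend.create_symlink('/data/plate_001/A01.tif', '/data/links/A01.tif')
    #     backend.create_symlink('/data/plate_001/A01.tif', '/data/links/A01.tif')
    #     # -> FileExistsError on the second call unless overwrite=True is passed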
    def is_symlink(self, path: Union[str, Path]) -> bool:
        return Path(path).is_symlink()

    def is_file(self, path: Union[str, Path]) -> bool:
        path = Path(path)

        if not path.exists():  # coverage: condition never true in this run
            raise FileNotFoundError(f"Path does not exist: {path}")

        # Resolve symlinks and return True only if the final target is a file
        resolved = path.resolve(strict=True)

        if resolved.is_dir():  # coverage: condition never true in this run
            raise IsADirectoryError(f"Path is a directory: {path}")

        return resolved.is_file()
    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a given disk path is a directory.

        Follows filesystem symlinks to determine the actual resolved structure.

        Args:
            path: Filesystem path (absolute or relative)

        Returns:
            bool: True if path resolves to a directory

        Raises:
            FileNotFoundError: If the path or symlink target does not exist
            NotADirectoryError: If the resolved target is not a directory
        """
        path = Path(path)

        if not path.exists():  # coverage: condition never true in this run
            raise FileNotFoundError(f"Path does not exist: {path}")

        # Follow symlinks to the final real target
        resolved = path.resolve(strict=True)

        if not resolved.is_dir():  # coverage: condition never true in this run
            raise NotADirectoryError(f"Path is not a directory: {path}")

        return True
    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory on disk. Follows symlinks and performs an overwrite-safe move.

        Raises:
            FileNotFoundError: If source does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On failure to move
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source path does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            shutil.move(str(src), str(dst))
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src} to {dst}") from e
    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a disk-backed path.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': resolved target if symlink
                - 'exists': bool

        Raises:
            StorageResolutionError: On access or resolution failure
        """
        path_str = str(path)
        try:
            if not os.path.lexists(path_str):  # includes broken symlinks
                return {
                    "type": "missing",
                    "path": path_str,
                    "exists": False
                }

            if os.path.islink(path_str):
                try:
                    resolved = os.readlink(path_str)
                    target_exists = os.path.exists(path_str)
                except OSError as e:
                    raise StorageResolutionError(f"Failed to resolve symlink: {path}") from e

                return {
                    "type": "symlink",
                    "path": path_str,
                    "target": resolved,
                    "exists": target_exists
                }

            if os.path.isdir(path_str):
                return {
                    "type": "directory",
                    "path": path_str,
                    "exists": True
                }

            if os.path.isfile(path_str):
                return {
                    "type": "file",
                    "path": path_str,
                    "exists": True
                }

            raise StorageResolutionError(f"Unknown filesystem object at: {path_str}")

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat disk path: {path}") from e
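    # Illustrative sketch (editor's example; paths are hypothetical). For
    # symlinks, 'target' is the raw readlink() value, not a fully resolved path:
    #
    #     backend.stat('/data/links/A01.tif')
    #     # -> {'type': 'symlink', 'path': '/data/links/A01.tif',
    #     #     'target': '/data/plate_001/A01.tif', 'exists': True}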
    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file or directory to a new location.

        - Does not overwrite the destination.
        - Will raise if the destination exists.
        - Supports file-to-file and dir-to-dir copies.

        Raises:
            FileExistsError: If destination already exists
            FileNotFoundError: If source is missing
            StorageResolutionError: On structural failure
        """
        src = Path(src)
        dst = Path(dst)

        if not src.exists():
            raise FileNotFoundError(f"Source does not exist: {src}")
        if dst.exists():
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            if src.is_dir():
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src} → {dst}") from e
    def _save_rois(self, rois: List, output_path: Path, images_dir: Optional[str] = None, **kwargs) -> str:
        """Save ROIs as a .roi.zip archive (ImageJ standard format).

        Args:
            rois: List of ROI objects
            output_path: Output path (e.g., /disk/plate_001/step_7_results/A01_rois_step7.roi.zip)
            images_dir: Images directory path (unused for the disk backend)

        Returns:
            Path where the ROIs were saved
        """
        import zipfile
        from openhcs.core.roi import PolygonShape, MaskShape, PointShape, EllipseShape

        output_path = Path(output_path)

        # Ensure the output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Ensure the output path has a .roi.zip extension
        if not output_path.name.endswith('.roi.zip'):  # coverage: condition never true in this run
            output_path = output_path.with_suffix('.roi.zip')

        try:
            from roifile import ImagejRoi
        except ImportError:
            logger.error("roifile library not available - cannot save ROIs")
            raise ImportError("roifile library required for ROI saving. Install with: pip install roifile")

        # Create the .roi.zip archive
        roi_count = 0
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            for idx, roi in enumerate(rois):
                for shape in roi.shapes:
                    if isinstance(shape, PolygonShape):  # coverage: condition always true in this run
                        # Convert the polygon to an ImageJ ROI.
                        # roifile expects (x, y) coordinates, but we have (y, x).
                        coords_xy = shape.coordinates[:, [1, 0]]  # swap columns
                        ij_roi = ImagejRoi.frompoints(coords_xy)

                        # Use an incrementing counter for unique filenames
                        # (avoids duplicate names from label values)
                        ij_roi.name = f"ROI_{roi_count + 1}"

                        # Write to the zip archive
                        roi_bytes = ij_roi.tobytes()
                        zf.writestr(f"{roi_count + 1:04d}.roi", roi_bytes)
                        roi_count += 1

                    elif isinstance(shape, PointShape):
                        # Convert the point to an ImageJ ROI
                        coords_xy = np.array([[shape.x, shape.y]])
                        ij_roi = ImagejRoi.frompoints(coords_xy)
                        ij_roi.name = f"ROI_{roi_count + 1}"

                        roi_bytes = ij_roi.tobytes()
                        zf.writestr(f"{roi_count + 1:04d}.roi", roi_bytes)
                        roi_count += 1

                    elif isinstance(shape, EllipseShape):
                        # Approximate the ellipse with a polygon (the ImageJ ROI
                        # format has no native ellipse): 64 points around the perimeter
                        theta = np.linspace(0, 2 * np.pi, 64)
                        x = shape.center_x + shape.radius_x * np.cos(theta)
                        y = shape.center_y + shape.radius_y * np.sin(theta)
                        coords_xy = np.column_stack([x, y])

                        ij_roi = ImagejRoi.frompoints(coords_xy)
                        ij_roi.name = f"ROI_{roi_count + 1}"

                        roi_bytes = ij_roi.tobytes()
                        zf.writestr(f"{roi_count + 1:04d}.roi", roi_bytes)
                        roi_count += 1

                    elif isinstance(shape, MaskShape):
                        # Skip mask shapes - the ImageJ ROI format doesn't support binary masks
                        logger.warning(f"Skipping mask shape for ROI {idx} - not supported in ImageJ .roi format")
                        continue

        logger.info(f"Saved {roi_count} ROIs to .roi.zip archive: {output_path}")
        return str(output_path)
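    # Illustrative round-trip sketch (editor's example; assumes the roifile
    # package and a hypothetical archive path). Each archive entry is a standard
    # ImageJ .roi record, so it can be read back without OpenHCS:
    #
    #     from roifile import roiread
    #     rois = roiread('/disk/plate_001/step_7_results/A01_rois_step7.roi.zip')
    #     print(rois[0].name, rois[0].coordinates()[:3])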