Coverage for openhcs/io/zarr.py: 7.1%
513 statements
# openhcs/io/storage/backends/zarr.py
"""
Zarr storage backend module for OpenHCS.

This module provides a Zarr-backed implementation of the MicroscopyStorageBackend interface.
It stores data in a Zarr store on disk and supports overlay operations
for materializing data to disk when needed.
"""

import fnmatch
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Set, Tuple, Union

import numpy as np
import zarr

try:
    from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata
    from ome_zarr.io import parse_url
    OME_ZARR_AVAILABLE = True
except ImportError:
    OME_ZARR_AVAILABLE = False

from openhcs.constants.constants import Backend, DEFAULT_IMAGE_EXTENSIONS
from openhcs.io.base import StorageBackend
from openhcs.io.backend_registry import StorageBackendMeta
from openhcs.io.exceptions import StorageResolutionError

logger = logging.getLogger(__name__)


class ZarrStorageBackend(StorageBackend, metaclass=StorageBackendMeta):
    """
    Zarr storage backend implementation with configurable compression.

    This class provides a concrete implementation of the storage backend interfaces
    for Zarr storage. It stores data in a Zarr store on disk with configurable
    compression algorithms and settings. The metaclass registers the backend
    automatically under its `_backend_type`.

    Features:
    - Single-chunk batch operations for 40x performance improvement
    - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none)
    - Configurable compression levels
    - Full path mapping for batch operations
    """

    # Backend type from enum for registration
    _backend_type = Backend.ZARR.value

    def __init__(self, zarr_config: Optional['ZarrConfig'] = None):
        """
        Initialize Zarr backend with ZarrConfig.

        Args:
            zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None)
        """
        # Import here to avoid circular imports
        from openhcs.core.config import ZarrConfig

        if zarr_config is None:
            zarr_config = ZarrConfig()

        self.config = zarr_config

        # Convenience attributes
        self.compression_level = zarr_config.compression_level

        # Create actual compressor from config
        self.compressor = self.config.compressor.create_compressor(
            self.config.compression_level,
            self.config.shuffle
        )
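
    # Construction sketch (hedged: assumes ZarrConfig exposes the compressor,
    # compression_level, and shuffle fields referenced above):
    #   from openhcs.core.config import ZarrConfig
    #   backend = ZarrStorageBackend(ZarrConfig(compression_level=5))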

    def _get_compressor(self) -> Optional[Any]:
        """
        Get the configured compressor with appropriate settings.

        Returns:
            Configured compressor instance or None for no compression
        """
        if self.compressor is None:
            return None

        # If a compression level is specified and the compressor supports it
        if self.compression_level is not None:
            try:
                # Create a new instance with the compression level, matching
                # whichever keyword the compressor class accepts
                compressor_class = self.compressor.__class__
                if 'level' in compressor_class.__init__.__code__.co_varnames:
                    return compressor_class(level=self.compression_level)
                elif 'clevel' in compressor_class.__init__.__code__.co_varnames:
                    return compressor_class(clevel=self.compression_level)
            except (AttributeError, TypeError):
                # Fall back to the original compressor if level setting fails
                pass

        return self.compressor
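
    # Example (known numcodecs behavior): numcodecs.Zlib takes `level`, while
    # numcodecs.Blosc takes `clevel`, so compression_level=5 becomes
    # Zlib(level=5) or Blosc(clevel=5) depending on the configured compressor.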

    def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]:
        """
        Split path into zarr store and key without auto-injection.
        Path planner now provides the complete storage path.

        Maps paths to zarr store and key:
        - Directory: "/path/to/plate/images.zarr" → Store: "/path/to/plate/images.zarr", Key: ""
        - File: "/path/to/plate/images.zarr/A01.tif" → Store: "/path/to/plate/images.zarr", Key: "A01.tif"

        Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility.
        """
        path = Path(path)

        # If path has no extension or ends with .zarr, treat as a directory (zarr store)
        if not path.suffix or path.suffix == '.zarr':
            # Directory path - treat as zarr store
            store_path = path
            relative_key = ""
        else:
            # File path - parent is the zarr store, filename is the key
            store_path = path.parent
            relative_key = path.name

        # CRITICAL: Create DirectoryStore with dimension_separator='/' for OME-ZARR compatibility
        # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0')
        store = zarr.DirectoryStore(str(store_path), dimension_separator='/')
        return store, relative_key

    def save(self, data: Any, output_path: Union[str, Path], **kwargs):
        """
        Save data to Zarr at the given output_path.

        Will only write if the key does not already exist.
        Will NOT overwrite or delete existing data.

        Raises:
            FileExistsError: If destination key already exists
            StorageResolutionError: If creation fails
        """
        store, key = self._split_store_and_key(output_path)
        group = zarr.group(store=store)

        if key in group:
            raise FileExistsError(f"Zarr key already exists: {output_path}")

        chunks = kwargs.get("chunks")
        if chunks is None:
            chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1))

        try:
            # Create array with correct shape and dtype, then assign data
            array = group.create_dataset(
                name=key,
                shape=data.shape,
                dtype=data.dtype,
                chunks=chunks,
                compressor=kwargs.get("compressor", self._get_compressor()),
                overwrite=False  # 🔒 Must be False by doctrine
            )
            array[:] = data
        except Exception as e:
            raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e
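
    # Usage sketch (hypothetical paths; save refuses to overwrite existing keys):
    #   backend = ZarrStorageBackend()
    #   backend.save(np.zeros((4, 512, 512), dtype=np.uint16),
    #                "/data/plate/images.zarr/A01.tif")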

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load from zarr array using filename mapping.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments (zarr_config not needed)

        Returns:
            List of loaded data objects in same order as file_paths

        Raises:
            FileNotFoundError: If expected zarr store not found
            KeyError: If filename not found in filename_map
        """
        if not file_paths:
            return []

        # Use _split_store_and_key to get the store path from the first file path
        store, _ = self._split_store_and_key(file_paths[0])
        store_path = Path(store.path)

        # FAIL LOUD: Store must exist
        if not store_path.exists():
            raise FileNotFoundError(f"Expected zarr store not found: {store_path}")

        root = zarr.open_group(store=store, mode='r')

        # Group files by well based on OME-ZARR structure
        well_to_files = {}
        well_to_indices = {}

        # Search OME-ZARR structure for requested files
        for row_name in root.group_keys():
            if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                row_group = root[row_name]
                for col_name in row_group.group_keys():
                    if col_name.isdigit():  # Column directory (01, 02, etc.)
                        well_group = row_group[col_name]
                        well_name = f"{row_name}{col_name}"

                        # Check if this well has our filename mapping in the field array
                        if "0" in well_group.group_keys():
                            field_group = well_group["0"]
                            if "0" in field_group.array_keys():
                                field_array = field_group["0"]
                                if "openhcs_filename_map" in field_array.attrs:
                                    filename_map = dict(field_array.attrs["openhcs_filename_map"])

                                    # Check which requested files are in this well
                                    for i, path in enumerate(file_paths):
                                        filename = Path(path).name  # Use filename only for matching
                                        if filename in filename_map:
                                            if well_name not in well_to_files:
                                                well_to_files[well_name] = []
                                                well_to_indices[well_name] = []
                                            well_to_files[well_name].append(i)  # Original position in file_paths
                                            well_to_indices[well_name].append(filename_map[filename])  # 5D coordinates (field, channel, z)

        # Load data from each well using a single well chunk
        results = [None] * len(file_paths)  # Pre-allocate results array

        for well_name, file_positions in well_to_files.items():
            row, col = well_name[0], well_name[1:]
            well_group = root[row][col]
            well_indices = well_to_indices[well_name]

            # Load the entire well field array in a single operation (well chunking)
            field_group = well_group["0"]
            field_array = field_group["0"]
            all_well_data = field_array[:]  # Single I/O operation for the entire well

            # Extract requested 2D slices using 5D coordinates
            for file_pos, coords_5d in zip(file_positions, well_indices):
                field_idx, channel_idx, z_idx = coords_5d
                # Extract 2D slice: (field, channel, z, y, x) -> (y, x)
                results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :]

        logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells")
        return results
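
    # Usage sketch (hypothetical filenames): paths must name images previously
    # written by save_batch, since lookup goes through each well's
    # "openhcs_filename_map" attribute rather than the filesystem:
    #   images = backend.load_batch(["/data/plate/images.zarr/A01_f0_c0_z0.tif"])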

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support.

        Args:
            data_list: List of image data to save
            output_paths: List of output file paths
            **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col
        """
        # Extract required parameters from kwargs
        chunk_name = kwargs.get('chunk_name')
        n_channels = kwargs.get('n_channels')
        n_z = kwargs.get('n_z')
        n_fields = kwargs.get('n_fields')
        row = kwargs.get('row')
        col = kwargs.get('col')

        # Validate required parameters
        if chunk_name is None:
            raise ValueError("chunk_name must be provided")
        if n_channels is None:
            raise ValueError("n_channels must be provided")
        if n_z is None:
            raise ValueError("n_z must be provided")
        if n_fields is None:
            raise ValueError("n_fields must be provided")
        if row is None:
            raise ValueError("row must be provided")
        if col is None:
            raise ValueError("col must be provided")

        if not data_list:
            logger.warning(f"Empty data list for chunk {chunk_name}")
            return

        if not OME_ZARR_AVAILABLE:
            raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr")

        # Use _split_store_and_key to get the store path from the first output path
        store, _ = self._split_store_and_key(output_paths[0])
        store_path = Path(store.path)

        logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}")

        # Convert GPU arrays to CPU arrays before saving
        cpu_data_list = []
        for data in data_list:
            if hasattr(data, 'get'):  # CuPy array
                cpu_data_list.append(data.get())
            elif hasattr(data, 'cpu'):  # PyTorch tensor
                cpu_data_list.append(data.cpu().numpy())
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # JAX on GPU
                import jax
                cpu_data_list.append(jax.device_get(data))
            else:  # Already a CPU array (NumPy, etc.)
                cpu_data_list.append(data)

        # Ensure parent directory exists
        store_path.parent.mkdir(parents=True, exist_ok=True)

        # Use _split_store_and_key to get a properly configured store with dimension_separator='/'
        store, _ = self._split_store_and_key(store_path)
        root = zarr.group(store=store)  # Open existing or create new group without mode conflicts

        # Set OME metadata if not already present
        if "ome" not in root.attrs:
            root.attrs["ome"] = {"version": "0.4"}

        # Get the store for compatibility with existing code
        store = root.store

        # Write plate metadata with locking to prevent concurrent corruption (if enabled)
        should_write_plate_metadata = kwargs.get('write_plate_metadata', self.config.write_plate_metadata)
        if should_write_plate_metadata:
            self._ensure_plate_metadata_with_lock(root, row, col, store_path)

        # Create HCS-compliant structure: plate/row/col/field/resolution
        # Create row group if it doesn't exist
        if row not in root:
            row_group = root.create_group(row)
        else:
            row_group = root[row]

        # Create well group (remove existing if present to allow overwrite)
        if col in row_group:
            del row_group[col]
        well_group = row_group.create_group(col)

        # Add HCS well metadata
        well_metadata = {
            "images": [
                {
                    "path": "0",  # Single image containing all fields
                    "acquisition": 0
                }
            ],
            "version": "0.5"
        }
        well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata}

        # Create field group (single field "0" containing all field data)
        field_group = well_group.require_group("0")

        # Always use full 5D structure: (fields, channels, z, y, x)
        # Define OME-NGFF compliant axes
        axes = [
            {'name': 'field', 'type': 'field'},  # Custom field type - allowed before space
            {'name': 'c', 'type': 'channel'},
            {'name': 'z', 'type': 'space'},
            {'name': 'y', 'type': 'space'},
            {'name': 'x', 'type': 'space'}
        ]

        # Get image dimensions
        sample_image = cpu_data_list[0]
        height, width = sample_image.shape[-2:]

        # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x)
        target_shape = [n_fields, n_channels, n_z, height, width]

        # Stack and reshape data
        stacked_data = np.stack(cpu_data_list, axis=0)

        # Calculate total expected images for validation
        total_expected = n_fields * n_channels * n_z
        if len(data_list) != total_expected:
            logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} "
                           f"(fields={n_fields}, channels={n_channels}, z={n_z})")

        # Always reshape to 5D structure
        reshaped_data = stacked_data.reshape(target_shape)

        logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape} → {reshaped_data.shape}")
        axes_names = [ax['name'] for ax in axes]
        logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}")

        # Write OME-ZARR well metadata with a single field (well-chunked approach)
        write_well_metadata(well_group, ['0'])

        # Use a single-chunk approach for optimal performance
        storage_options = {
            "chunks": reshaped_data.shape,  # Single chunk for the entire well
            "compressor": self._get_compressor()
        }

        # Write as a single well-chunked array with proper multi-dimensional axes
        write_image(
            image=reshaped_data,
            group=field_group,
            axes=axes,
            storage_options=storage_options,
            scaler=None,  # Single scale only for performance
            compute=True
        )

        # Axes are already correctly set by the write_image function

        # Store filename mapping with 5D coordinates (field, channel, z, y, x)
        # Convert flat index to 5D coordinates for proper zarr slicing
        filename_map = {}
        for i, path in enumerate(output_paths):
            # Calculate 5D coordinates from flat index
            field_idx = i // (n_channels * n_z)
            remaining = i % (n_channels * n_z)
            channel_idx = remaining // n_z
            z_idx = remaining % n_z

            # Store as tuple (field, channel, z) - y,x are full slices
            filename_map[Path(path).name] = (field_idx, channel_idx, z_idx)
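
        # Worked example: with n_channels=2 and n_z=3, flat index i=5 maps to
        # field_idx = 5 // 6 = 0, channel_idx = (5 % 6) // 3 = 1, z_idx = 5 % 3 = 2,
        # i.e. the slice reshaped_data[0, 1, 2, :, :].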

        field_array = field_group['0']
        field_array.attrs["openhcs_filename_map"] = filename_map
        field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths]
        field_array.attrs["openhcs_dimensions"] = {
            "n_fields": n_fields,
            "n_channels": n_channels,
            "n_z": n_z
        }

        logger.debug(f"Successfully saved batch for chunk {chunk_name}")

        # Aggressive memory cleanup
        del cpu_data_list
        import gc
        gc.collect()
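
    # Usage sketch (hypothetical values): saving one well (A/01) of a 2-field,
    # 1-channel, 3-z acquisition, i.e. len(images) == len(paths) == 6, as a
    # single well-chunked 5D array:
    #   backend.save_batch(images, paths, chunk_name="A01", n_fields=2,
    #                      n_channels=1, n_z=3, row="A", col="01")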

    def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None:
        """Ensure plate-level metadata includes ALL existing wells, with file locking."""
        import fcntl

        lock_path = store_path.with_suffix('.metadata.lock')

        try:
            with open(lock_path, 'w') as lock_file:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
                self._ensure_plate_metadata(root, row, col)
        except Exception as e:
            logger.error(f"Failed to update plate metadata with lock: {e}")
            raise
        finally:
            if lock_path.exists():
                lock_path.unlink()

    def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None:
        """Ensure plate-level metadata includes ALL existing wells in the store."""
        # Scan the store for all existing wells
        all_rows = set()
        all_cols = set()
        all_wells = []

        for row_name in root.group_keys():
            if isinstance(root[row_name], zarr.Group):  # Ensure it's a row group
                row_group = root[row_name]
                all_rows.add(row_name)

                for col_name in row_group.group_keys():
                    if isinstance(row_group[col_name], zarr.Group):  # Ensure it's a well group
                        all_cols.add(col_name)
                        well_path = f"{row_name}/{col_name}"
                        all_wells.append(well_path)

        # Include the current well being added (might not exist yet)
        all_rows.add(row)
        all_cols.add(col)
        current_well_path = f"{row}/{col}"
        if current_well_path not in all_wells:
            all_wells.append(current_well_path)

        # Sort for consistent ordering
        sorted_rows = sorted(all_rows)
        sorted_cols = sorted(all_cols)
        sorted_wells = sorted(all_wells)

        # Build wells metadata with proper indices
        wells_metadata = []
        for well_path in sorted_wells:
            well_row, well_col = well_path.split("/")
            row_index = sorted_rows.index(well_row)
            col_index = sorted_cols.index(well_col)
            wells_metadata.append({
                "path": well_path,
                "rowIndex": row_index,
                "columnIndex": col_index
            })

        # Add acquisition metadata for HCS compliance
        acquisitions = [
            {
                "id": 0,
                "name": "default_acquisition",
                "maximumfieldcount": 1  # Single field containing all field data
            }
        ]

        # Write complete HCS plate metadata
        write_plate_metadata(
            root,
            sorted_rows,
            sorted_cols,
            wells_metadata,
            acquisitions=acquisitions,
            field_count=1,
            name="OpenHCS_Plate"
        )
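
    # Example: with wells A/01 and B/03 present, rows sort to ["A", "B"] and
    # columns to ["01", "03"], so A/01 gets (rowIndex=0, columnIndex=0) and
    # B/03 gets (rowIndex=1, columnIndex=1).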

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        store, key = self._split_store_and_key(file_path)
        group = zarr.group(store=store)

        # Resolve Zarr-native symlinks via the "_symlink" attribute
        # (is_symlink expects a full path, so check attrs directly here)
        visited = set()
        while key in group and "_symlink" in group[key].attrs:
            if key in visited:
                raise RuntimeError(f"Zarr symlink loop detected at {key}")
            visited.add(key)
            key = group[key].attrs["_symlink"]

        if key not in group:
            raise FileNotFoundError(f"No array found at key '{key}'")
        return group[key][:]

    def list_files(self,
                   directory: Union[str, Path],
                   pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None,
                   recursive: bool = False) -> List[Path]:
        """
        List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered.
        Returns filenames from array attributes (output_paths) if available.
        """
        store, relative_key = self._split_store_and_key(directory)
        result: List[Path] = []

        def _matches_filters(name: str) -> bool:
            if pattern and not fnmatch.fnmatch(name, pattern):
                return False
            if extensions:
                return any(name.lower().endswith(ext.lower()) for ext in extensions)
            return True

        try:
            # Open zarr group and traverse OME-ZARR structure
            group = zarr.open_group(store=store)

            # Check if this is an OME-ZARR structure (has plate metadata)
            if "plate" in group.attrs:
                # OME-ZARR structure: traverse A/01/ wells
                for row_name in group.group_keys():
                    if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                        row_group = group[row_name]
                        for col_name in row_group.group_keys():
                            if col_name.isdigit():  # Column directory (01, 02, etc.)
                                well_group = row_group[col_name]

                                # Get filenames from field array metadata
                                if "0" in well_group.group_keys():
                                    field_group = well_group["0"]
                                    if "0" in field_group.array_keys():
                                        field_array = field_group["0"]
                                        if "openhcs_output_paths" in field_array.attrs:
                                            output_paths = field_array.attrs["openhcs_output_paths"]
                                            for filename in output_paths:
                                                filename_only = Path(filename).name
                                                if _matches_filters(filename_only):
                                                    result.append(Path(filename))
            else:
                # Legacy flat structure: get array keys directly
                for array_key in group.array_keys():
                    try:
                        array = group[array_key]
                        if "output_paths" in array.attrs:
                            # Get original filenames from array attributes
                            output_paths = array.attrs["output_paths"]
                            for filename in output_paths:
                                filename_only = Path(filename).name
                                if _matches_filters(filename_only):
                                    result.append(Path(filename))
                    except Exception:
                        # Skip arrays that can't be accessed
                        continue

        except Exception as e:
            raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e

        return result

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        store, relative_key = self._split_store_and_key(path)

        # Normalize key for the Zarr API
        key = relative_key.rstrip("/")

        try:
            # Zarr 3.x uses an async store API - convert the async generator to a list
            import asyncio

            async def _get_entries():
                entries = []
                async for entry in store.list_dir(key):
                    entries.append(entry)
                return entries

            return asyncio.run(_get_entries())
        except KeyError:
            raise NotADirectoryError(f"Zarr path is not a directory: {path}")
        except FileNotFoundError:
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a Zarr array (file) or empty group (directory) at the given path.

        Args:
            path: Zarr path or URI

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a non-empty group
            StorageResolutionError: For unexpected failures
        """
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            zarr_obj = zarr.open(path, mode='r')
        except Exception as e:
            raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e

        # Determine if it's a file (array) or directory (group)
        if isinstance(zarr_obj, zarr.core.Array):
            try:
                shutil.rmtree(path)  # Array folders can be deleted directly
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e
        elif isinstance(zarr_obj, zarr.hierarchy.Group):
            if os.listdir(path):
                raise IsADirectoryError(f"Zarr group is not empty: {path}")
            try:
                os.rmdir(path)
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e
        else:
            raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}")

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a Zarr array or group (file or directory).

        This is the only permitted recursive deletion method for the Zarr backend.

        Args:
            path: The path shared across all backends

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails
        """
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        path = Path(path)

        # If path has no file extension, treat as a directory existence check
        # This handles auto_detect_patterns asking "does this directory exist?"
        if not path.suffix:
            return path.exists()

        # Otherwise, check zarr key existence (for actual files)
        store, key = self._split_store_and_key(path)

        # First check whether the zarr store itself exists on disk
        # (_split_store_and_key always returns a DirectoryStore with a .path)
        if not Path(store.path).exists():
            return False

        try:
            root_group = zarr.group(store=store)
            return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys())
        except Exception:
            # If we can't open the zarr store, it doesn't exist
            return False

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        No-op for the zarr backend - zarr stores handle their own structure.

        Zarr doesn't have filesystem directories that need to be "ensured".
        Store creation and group structure are handled by save operations.
        """
        return Path(directory)

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        store, src_key = self._split_store_and_key(source)
        store2, dst_key = self._split_store_and_key(link_name)

        # DirectoryStore exposes its on-disk location via .path
        if store.path != store2.path:
            raise ValueError("Symlinks must exist within the same .zarr store")

        group = zarr.group(store=store)
        if src_key not in group:
            raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store")

        if dst_key in group:
            if not overwrite:
                raise FileExistsError(f"Symlink target already exists at: {dst_key}")
            # Remove existing entry if overwrite=True
            del group[dst_key]

        # Create a new group at the symlink path
        link_group = group.require_group(dst_key)
        link_group.attrs["_symlink"] = src_key  # Store as declared string

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """
        Check if the given Zarr path represents a logical symlink (based on attribute contract).

        Returns:
            bool: True if the key exists and has an OpenHCS-declared symlink attribute,
                  False if the key doesn't exist or is not a symlink
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            obj = group[key]
            attrs = getattr(obj, "attrs", {})

            if "_symlink" not in attrs:
                return False

            # Enforce that the _symlink attr matches the schema (a string key)
            if not isinstance(attrs["_symlink"], str):
                raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}")

            return True
        except KeyError:
            # Key doesn't exist, so it's not a symlink
            return False
        except Exception as e:
            raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e

    def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]:
        shape = data.shape

        # Simple logic: divide each dimension by chunk_divisor, with a minimum of 1
        return tuple(max(1, s // chunk_divisor) for s in shape)
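
    # Example: shape (8, 2048, 2048) with chunk_divisor=4 yields chunks
    # (2, 512, 512); the default divisor of 1 keeps the array as one chunk.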

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check if a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks.

        Args:
            path: Zarr store path (may point to a key within the store)

        Returns:
            bool: True if the resolved path is a Zarr array

        Raises:
            FileNotFoundError: If path does not exist or the symlink is broken
            IsADirectoryError: If the resolved object is a Zarr group
            StorageResolutionError: For other failures
        """
        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            # Resolve symlinks (Zarr-native, via .attrs)
            seen_keys = set()
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break  # resolution complete

            # Now obj is the resolved target
            if isinstance(obj, zarr.core.Array):
                return True
            elif isinstance(obj, zarr.hierarchy.Group):
                raise IsADirectoryError(f"Zarr path is a group (directory): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except (FileNotFoundError, IsADirectoryError):
            # Preserve the documented exception types instead of re-wrapping them
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a Zarr path resolves to a directory (i.e., a Zarr group).

        Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink'].

        Args:
            path: Zarr path or URI

        Returns:
            bool: True if path resolves to a Zarr group

        Raises:
            FileNotFoundError: If path or resolved target does not exist
            NotADirectoryError: If resolved target is not a group
            StorageResolutionError: For symlink cycles or other failures
        """
        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            seen_keys = set()

            # Resolve symlink chain
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break

            # obj is resolved
            if isinstance(obj, zarr.hierarchy.Group):
                return True
            elif isinstance(obj, zarr.core.Array):
                raise NotADirectoryError(f"Zarr path is an array (file): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except (FileNotFoundError, NotADirectoryError):
            # Preserve the documented exception types instead of re-wrapping them
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a Zarr key or object (array/group) from one location to another, resolving symlinks.

        Supports:
        - Disk or memory stores
        - Zarr-native symlinks
        - Key renames within a group
        - Full copy+delete across stores if needed

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        # Resolve symlinks if present
        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            # Compare store paths: each _split_store_and_key call builds a fresh
            # store object, so an identity check (`is`) would never match
            if src_store.path == dst_store.path:
                # Native move within the same Zarr group/store
                src_group.move(src_key, dst_key)
            else:
                # Cross-store: perform manual copy + delete via the zarr.copy convenience API
                zarr.copy(obj, dst_group, name=dst_key)
                del src_group[src_key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a Zarr key or object (array/group) from one location to another.

        - Resolves Zarr-native symlinks before copying
        - Prevents overwrite unless explicitly allowed (future feature)
        - Works across memory or disk stores

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            # zarr.copy is the convenience API for copying arrays/groups between groups
            zarr.copy(obj, dst_group, name=dst_key)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a Zarr path.

        Returns:
            dict with keys:
            - 'type': 'file', 'directory', 'symlink', or 'missing'
            - 'key': final resolved key
            - 'target': symlink target if applicable
            - 'store': repr(store)
            - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            if key in group:
                obj = group[key]
                attrs = getattr(obj, "attrs", {})
                is_link = "_symlink" in attrs

                if is_link:
                    target = attrs["_symlink"]
                    if not isinstance(target, str):
                        raise StorageResolutionError(f"Invalid symlink format at {key}")
                    return {
                        "type": "symlink",
                        "key": key,
                        "target": target,
                        "store": repr(store),
                        "exists": target in group
                    }

                if isinstance(obj, zarr.Array):
                    return {
                        "type": "file",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }
                elif isinstance(obj, zarr.Group):
                    return {
                        "type": "directory",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                raise StorageResolutionError(f"Unknown object type at: {key}")
            else:
                return {
                    "type": "missing",
                    "key": key,
                    "store": repr(store),
                    "exists": False
                }

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e


class ZarrSymlink:
    """
    Represents a symbolic link in a Zarr store.

    Stores the target path (key) that the symlink points to.
    """

    def __init__(self, target: str):
        self.target = target

    def __repr__(self):
        return f"<ZarrSymlink → {self.target}>"
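

# Minimal smoke-test sketch (an assumption-laden illustration, not part of the
# backend's public contract): round-trips a small array through save/load using
# the default ZarrConfig in a temporary directory. The key name "sample.tif" is
# hypothetical.
if __name__ == "__main__":
    import tempfile

    backend = ZarrStorageBackend()
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "demo.zarr" / "sample.tif"
        data = np.arange(16, dtype=np.uint16).reshape(4, 4)
        backend.save(data, target)
        loaded = backend.load(target)
        assert np.array_equal(data, loaded)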