Coverage for openhcs/io/zarr.py: 44.3%
509 statements
coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
# openhcs/io/zarr.py
"""
Zarr storage backend module for OpenHCS.

This module provides a Zarr-backed implementation of the MicroscopyStorageBackend interface.
It stores data in a Zarr store on disk and supports overlay operations
for materializing data to disk when needed.
"""

import fnmatch
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Set, Tuple, Union

import numpy as np
import zarr

try:
    from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata
    from ome_zarr.io import parse_url
    OME_ZARR_AVAILABLE = True
except ImportError:
    OME_ZARR_AVAILABLE = False

from openhcs.constants.constants import Backend, DEFAULT_IMAGE_EXTENSIONS
from openhcs.io.base import StorageBackend
from openhcs.io.exceptions import StorageResolutionError

logger = logging.getLogger(__name__)

class ZarrStorageBackend(StorageBackend):
    """
    Zarr storage backend implementation with configurable compression.

    This class provides a concrete implementation of the storage backend interfaces
    for Zarr storage. It stores data in a Zarr store on disk with configurable
    compression algorithms and settings.

    Features:
    - Single-chunk batch operations for 40x performance improvement
    - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none)
    - Configurable compression levels
    - Full path mapping for batch operations
    """

    def __init__(self, zarr_config: Optional['ZarrConfig'] = None):
        """
        Initialize Zarr backend with ZarrConfig.

        Args:
            zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None)
        """
        # Import here to avoid circular imports
        from openhcs.core.config import ZarrConfig

        if zarr_config is None:
            zarr_config = ZarrConfig()

        self.config = zarr_config

        # Convenience attributes
        self.compression_level = zarr_config.compression_level

        # Create actual compressor from config
        self.compressor = self.config.compressor.create_compressor(
            self.config.compression_level,
            self.config.shuffle
        )
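
    # Construction sketch (hypothetical config values; ZarrConfig defaults are
    # defined in openhcs.core.config and not shown here):
    #
    #     from openhcs.core.config import ZarrConfig
    #     backend = ZarrStorageBackend()                # all defaults
    #     backend = ZarrStorageBackend(ZarrConfig())    # explicit config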

    def _get_compressor(self) -> Optional[Any]:
        """
        Get the configured compressor with appropriate settings.

        Returns:
            Configured compressor instance, or None for no compression
        """
        if self.compressor is None:
            return None

        # If compression_level is specified and the compressor supports it
        if self.compression_level is not None:
            # Check if compressor has a level parameter
            if hasattr(self.compressor, '__class__'):
                try:
                    # Create new instance with compression level
                    compressor_class = self.compressor.__class__
                    if 'level' in compressor_class.__init__.__code__.co_varnames:
                        return compressor_class(level=self.compression_level)
                    elif 'clevel' in compressor_class.__init__.__code__.co_varnames:
                        return compressor_class(clevel=self.compression_level)
                except (AttributeError, TypeError):
                    # Fall back to original compressor if level setting fails
                    pass

        return self.compressor
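
    # Illustrative sketch of the level-remapping logic above, assuming the
    # numcodecs codecs commonly used with zarr (Blosc takes `clevel`, Zlib
    # takes `level`):
    #
    #     from numcodecs import Blosc, Zlib
    #     Blosc(cname='zstd', clevel=5)   # 'clevel' branch applies
    #     Zlib(level=5)                   # 'level' branch applies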

    def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]:
        """
        Split path into zarr store and key without auto-injection.
        The path planner provides the complete storage path.

        Maps paths to zarr store and key:
        - Directory: "/path/to/plate/images.zarr" → Store: "/path/to/plate/images.zarr", Key: ""
        - File: "/path/to/plate/images.zarr/A01.tif" → Store: "/path/to/plate/images.zarr", Key: "A01.tif"

        Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility.
        """
        path = Path(path)

        # If path has no extension or ends with .zarr, treat as directory (zarr store)
        if not path.suffix or path.suffix == '.zarr':
            # Directory path - treat as zarr store
            store_path = path
            relative_key = ""
        else:
            # File path - parent is zarr store, filename is key
            store_path = path.parent
            relative_key = path.name

        # CRITICAL: Create DirectoryStore with dimension_separator='/' for OME-ZARR compatibility
        # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0')
        store = zarr.DirectoryStore(str(store_path), dimension_separator='/')
        return store, relative_key
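
    # Path-splitting behavior sketch (hypothetical paths):
    #
    #     backend._split_store_and_key("/data/plate/images.zarr")
    #     # → (DirectoryStore("/data/plate/images.zarr"), "")
    #     backend._split_store_and_key("/data/plate/images.zarr/A01.tif")
    #     # → (DirectoryStore("/data/plate/images.zarr"), "A01.tif")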

    def save(self, data: Any, output_path: Union[str, Path], **kwargs):
        """
        Save data to Zarr at the given output_path.

        Will only write if the key does not already exist.
        Will NOT overwrite or delete existing data.

        Raises:
            FileExistsError: If destination key already exists
            StorageResolutionError: If creation fails
        """
        store, key = self._split_store_and_key(output_path)
        group = zarr.group(store=store)

        if key in group:
            raise FileExistsError(f"Zarr key already exists: {output_path}")

        chunks = kwargs.get("chunks")
        if chunks is None:
            chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1))

        try:
            # Create array with correct shape and dtype, then assign data
            array = group.create_dataset(
                name=key,
                shape=data.shape,
                dtype=data.dtype,
                chunks=chunks,
                compressor=kwargs.get("compressor", self._get_compressor()),
                overwrite=False  # 🔒 Must be False by doctrine
            )
            array[:] = data
        except Exception as e:
            raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e
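
    # Usage sketch (hypothetical path; a second save to the same key raises
    # FileExistsError by design):
    #
    #     backend.save(np.zeros((4, 512, 512), dtype=np.uint16),
    #                  "/data/plate/images.zarr/A01.tif",
    #                  chunk_divisor=2)  # optional; default of 1 yields a single full-shape chunk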

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load from a zarr array using the stored filename mapping.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments (zarr_config not needed)

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If the expected zarr store is not found
            KeyError: If a filename is not found in the filename map
        """
        if not file_paths:
            return []

        # Use _split_store_and_key to get store path from first file path
        store, _ = self._split_store_and_key(file_paths[0])
        store_path = Path(store.path)

        # FAIL LOUD: Store must exist
        if not store_path.exists():
            raise FileNotFoundError(f"Expected zarr store not found: {store_path}")

        root = zarr.open_group(store=store, mode='r')

        # Group files by well based on OME-ZARR structure
        well_to_files = {}
        well_to_indices = {}

        # Search OME-ZARR structure for requested files
        for row_name in root.group_keys():
            if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                row_group = root[row_name]
                for col_name in row_group.group_keys():
                    if col_name.isdigit():  # Column directory (01, 02, etc.)
                        well_group = row_group[col_name]
                        well_name = f"{row_name}{col_name}"

                        # Check if this well has our filename mapping in the field array
                        if "0" in well_group.group_keys():
                            field_group = well_group["0"]
                            if "0" in field_group.array_keys():
                                field_array = field_group["0"]
                                if "openhcs_filename_map" in field_array.attrs:
                                    filename_map = dict(field_array.attrs["openhcs_filename_map"])

                                    # Check which requested files are in this well
                                    for i, path in enumerate(file_paths):
                                        filename = Path(path).name  # Use filename only for matching
                                        if filename in filename_map:
                                            if well_name not in well_to_files:
                                                well_to_files[well_name] = []
                                                well_to_indices[well_name] = []
                                            well_to_files[well_name].append(i)  # Original position in file_paths
                                            well_to_indices[well_name].append(filename_map[filename])  # 5D coordinates (field, channel, z)

        # Load data from each well using a single well chunk
        results = [None] * len(file_paths)  # Pre-allocate results list

        for well_name, file_positions in well_to_files.items():
            row, col = well_name[0], well_name[1:]
            well_group = root[row][col]
            well_indices = well_to_indices[well_name]

            # Load entire well field array in a single operation (well chunking)
            field_group = well_group["0"]
            field_array = field_group["0"]
            all_well_data = field_array[:]  # Single I/O operation for entire well

            # Extract requested 2D slices using 5D coordinates
            for file_pos, coords_5d in zip(file_positions, well_indices):
                field_idx, channel_idx, z_idx = coords_5d
                # Extract 2D slice: (field, channel, z, y, x) -> (y, x)
                results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :]

        logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells")
        return results
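
    # Round-trip sketch (hypothetical filenames; assumes the store was written
    # by save_batch below, which records the filename map):
    #
    #     imgs = backend.load_batch([
    #         "/data/plate/images.zarr/A01_f0_c0_z0.tif",
    #         "/data/plate/images.zarr/A01_f0_c0_z1.tif",
    #     ])
    #     # imgs[0].shape == (height, width)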

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support.

        Args:
            data_list: List of image data to save
            output_paths: List of output file paths
            **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col
        """
        # Extract required parameters from kwargs
        chunk_name = kwargs.get('chunk_name')
        n_channels = kwargs.get('n_channels')
        n_z = kwargs.get('n_z')
        n_fields = kwargs.get('n_fields')
        row = kwargs.get('row')
        col = kwargs.get('col')

        # Validate required parameters
        if chunk_name is None:
            raise ValueError("chunk_name must be provided")
        if n_channels is None:
            raise ValueError("n_channels must be provided")
        if n_z is None:
            raise ValueError("n_z must be provided")
        if n_fields is None:
            raise ValueError("n_fields must be provided")
        if row is None:
            raise ValueError("row must be provided")
        if col is None:
            raise ValueError("col must be provided")

        if not data_list:
            logger.warning(f"Empty data list for chunk {chunk_name}")
            return

        if not OME_ZARR_AVAILABLE:
            raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr")

        # Use _split_store_and_key to get store path from first output path
        store, _ = self._split_store_and_key(output_paths[0])
        store_path = Path(store.path)

        logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}")

        # Convert GPU arrays to CPU arrays before saving
        cpu_data_list = []
        for data in data_list:
            if hasattr(data, 'get'):  # CuPy array
                cpu_data_list.append(data.get())
            elif hasattr(data, 'cpu'):  # PyTorch tensor
                cpu_data_list.append(data.cpu().numpy())
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # JAX array on GPU
                import jax
                cpu_data_list.append(jax.device_get(data))
            else:  # Already a CPU array (NumPy, etc.)
                cpu_data_list.append(data)

        # Ensure parent directory exists
        store_path.parent.mkdir(parents=True, exist_ok=True)

        # Use _split_store_and_key to get a properly configured store with dimension_separator='/'
        store, _ = self._split_store_and_key(store_path)
        root = zarr.group(store=store)  # Open existing or create new group without mode conflicts

        # Set OME metadata if not already present
        if "ome" not in root.attrs:
            root.attrs["ome"] = {"version": "0.4"}

        # Get the store for compatibility with existing code
        store = root.store

        # Write plate metadata with locking to prevent concurrent corruption (if enabled)
        should_write_plate_metadata = kwargs.get('write_plate_metadata', self.config.write_plate_metadata)
        if should_write_plate_metadata:
            self._ensure_plate_metadata_with_lock(root, row, col, store_path)

        # Create HCS-compliant structure: plate/row/col/field/resolution
        # Create row group if it doesn't exist
        if row not in root:
            row_group = root.create_group(row)
        else:
            row_group = root[row]

        # Create well group (remove existing if present to allow overwrite)
        if col in row_group:
            del row_group[col]
        well_group = row_group.create_group(col)

        # Add HCS well metadata
        well_metadata = {
            "images": [
                {
                    "path": "0",  # Single image containing all fields
                    "acquisition": 0
                }
            ],
            "version": "0.5"
        }
        well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata}

        # Create field group (single field "0" containing all field data)
        field_group = well_group.require_group("0")

        # Always use the full 5D structure: (fields, channels, z, y, x)
        # Define OME-NGFF compliant axes
        axes = [
            {'name': 'field', 'type': 'field'},  # Custom field type - allowed before space axes
            {'name': 'c', 'type': 'channel'},
            {'name': 'z', 'type': 'space'},
            {'name': 'y', 'type': 'space'},
            {'name': 'x', 'type': 'space'}
        ]

        # Get image dimensions
        sample_image = cpu_data_list[0]
        height, width = sample_image.shape[-2:]

        # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x)
        target_shape = [n_fields, n_channels, n_z, height, width]

        # Stack and reshape data
        stacked_data = np.stack(cpu_data_list, axis=0)

        # Calculate total expected images for validation
        total_expected = n_fields * n_channels * n_z
        if len(data_list) != total_expected:
            logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} "
                           f"(fields={n_fields}, channels={n_channels}, z={n_z})")

        # Always reshape to the 5D structure
        reshaped_data = stacked_data.reshape(target_shape)

        logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape} → {reshaped_data.shape}")
        axes_names = [ax['name'] for ax in axes]
        logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}")

        # Write OME-ZARR well metadata with a single field (well-chunked approach)
        write_well_metadata(well_group, ['0'])

        # Use a single-chunk approach for optimal performance
        storage_options = {
            "chunks": reshaped_data.shape,  # Single chunk for the entire well
            "compressor": self._get_compressor()
        }

        # Write as a single well-chunked array with proper multi-dimensional axes
        write_image(
            image=reshaped_data,
            group=field_group,
            axes=axes,
            storage_options=storage_options,
            scaler=None,  # Single scale only, for performance
            compute=True
        )

        # Axes are already correctly set by the write_image function

        # Store filename mapping with 5D coordinates (field, channel, z, y, x).
        # Convert each flat index to 5D coordinates for proper zarr slicing.
        filename_map = {}
        for i, path in enumerate(output_paths):
            # Calculate 5D coordinates from flat index
            field_idx = i // (n_channels * n_z)
            remaining = i % (n_channels * n_z)
            channel_idx = remaining // n_z
            z_idx = remaining % n_z

            # Store as tuple (field, channel, z) - y,x are full slices
            filename_map[Path(path).name] = (field_idx, channel_idx, z_idx)

        field_array = field_group['0']
        field_array.attrs["openhcs_filename_map"] = filename_map
        field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths]
        field_array.attrs["openhcs_dimensions"] = {
            "n_fields": n_fields,
            "n_channels": n_channels,
            "n_z": n_z
        }

        logger.debug(f"Successfully saved batch for chunk {chunk_name}")

        # Aggressive memory cleanup
        del cpu_data_list
        import gc
        gc.collect()
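
    # Worked example of the flat-index → (field, channel, z) decomposition above,
    # with hypothetical n_channels=2, n_z=3 (so n_channels * n_z = 6):
    #
    #     i = 7  → field_idx = 7 // 6 = 1
    #              remaining = 7 % 6 = 1
    #              channel_idx = 1 // 3 = 0
    #              z_idx = 1 % 3 = 1
    #
    # This assumes output_paths are ordered field-major, then channel, then z,
    # matching the reshape to (n_fields, n_channels, n_z, y, x).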

    def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None:
        """Ensure plate-level metadata includes ALL existing wells, using file locking."""
        import fcntl

        lock_path = store_path.with_suffix('.metadata.lock')

        try:
            with open(lock_path, 'w') as lock_file:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
                self._ensure_plate_metadata(root, row, col)
        except Exception as e:
            logger.error(f"Failed to update plate metadata with lock: {e}")
            raise
        finally:
            if lock_path.exists():
                lock_path.unlink()

    def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None:
        """Ensure plate-level metadata includes ALL existing wells in the store."""
        # Scan the store for all existing wells
        all_rows = set()
        all_cols = set()
        all_wells = []

        for row_name in root.group_keys():
            if isinstance(root[row_name], zarr.Group):  # Ensure it's a row group
                row_group = root[row_name]
                all_rows.add(row_name)

                for col_name in row_group.group_keys():
                    if isinstance(row_group[col_name], zarr.Group):  # Ensure it's a well group
                        all_cols.add(col_name)
                        well_path = f"{row_name}/{col_name}"
                        all_wells.append(well_path)

        # Include the current well being added (it might not exist yet)
        all_rows.add(row)
        all_cols.add(col)
        current_well_path = f"{row}/{col}"
        if current_well_path not in all_wells:
            all_wells.append(current_well_path)

        # Sort for consistent ordering
        sorted_rows = sorted(all_rows)
        sorted_cols = sorted(all_cols)
        sorted_wells = sorted(all_wells)

        # Build wells metadata with proper indices
        wells_metadata = []
        for well_path in sorted_wells:
            well_row, well_col = well_path.split("/")
            row_index = sorted_rows.index(well_row)
            col_index = sorted_cols.index(well_col)
            wells_metadata.append({
                "path": well_path,
                "rowIndex": row_index,
                "columnIndex": col_index
            })

        # Add acquisition metadata for HCS compliance
        acquisitions = [
            {
                "id": 0,
                "name": "default_acquisition",
                "maximumfieldcount": 1  # Single field containing all field data
            }
        ]

        # Write complete HCS plate metadata
        write_plate_metadata(
            root,
            sorted_rows,
            sorted_cols,
            wells_metadata,
            acquisitions=acquisitions,
            field_count=1,
            name="OpenHCS_Plate"
        )
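
    # Illustrative shape of the plate metadata that write_plate_metadata emits
    # (abbreviated sketch; exact keys depend on the ome-zarr-py version):
    #
    #     root.attrs["plate"] ≈ {
    #         "name": "OpenHCS_Plate",
    #         "rows": [{"name": "A"}, ...],
    #         "columns": [{"name": "01"}, ...],
    #         "wells": [{"path": "A/01", "rowIndex": 0, "columnIndex": 0}, ...],
    #         "acquisitions": [{"id": 0, "name": "default_acquisition", ...}],
    #     }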

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        store, key = self._split_store_and_key(file_path)
        group = zarr.group(store=store)

        visited = set()
        while self.is_symlink(key):
            if key in visited:
                raise RuntimeError(f"Zarr symlink loop detected at {key}")
            visited.add(key)
            key = group[key].attrs["_symlink"]

        if key not in group:
            raise FileNotFoundError(f"No array found at key '{key}'")

        return group[key][:]

    def list_files(self,
                   directory: Union[str, Path],
                   pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None,
                   recursive: bool = False) -> List[Path]:
        """
        List all file-like entries (i.e., arrays) in a Zarr store, optionally filtered.
        Returns filenames from array attributes (output_paths) if available.
        """
        store, relative_key = self._split_store_and_key(directory)
        result: List[Path] = []

        def _matches_filters(name: str) -> bool:
            if pattern and not fnmatch.fnmatch(name, pattern):
                return False
            if extensions:
                return any(name.lower().endswith(ext.lower()) for ext in extensions)
            return True

        try:
            # Open zarr group and traverse the OME-ZARR structure
            group = zarr.open_group(store=store)

            # Check if this is an OME-ZARR structure (has plate metadata)
            if "plate" in group.attrs:
                # OME-ZARR structure: traverse A/01/ wells
                for row_name in group.group_keys():
                    if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                        row_group = group[row_name]
                        for col_name in row_group.group_keys():
                            if col_name.isdigit():  # Column directory (01, 02, etc.)
                                well_group = row_group[col_name]

                                # Get filenames from field array metadata
                                if "0" in well_group.group_keys():
                                    field_group = well_group["0"]
                                    if "0" in field_group.array_keys():
                                        field_array = field_group["0"]
                                        if "openhcs_output_paths" in field_array.attrs:
                                            output_paths = field_array.attrs["openhcs_output_paths"]
                                            for filename in output_paths:
                                                filename_only = Path(filename).name
                                                if _matches_filters(filename_only):
                                                    result.append(Path(filename))
            else:
                # Legacy flat structure: get array keys directly
                array_keys = list(group.array_keys())
                for array_key in array_keys:
                    try:
                        array = group[array_key]
                        if "output_paths" in array.attrs:
                            # Get original filenames from array attributes
                            output_paths = array.attrs["output_paths"]
                            for filename in output_paths:
                                filename_only = Path(filename).name
                                if _matches_filters(filename_only):
                                    result.append(Path(filename))
                    except Exception:
                        # Skip arrays that can't be accessed
                        continue

        except Exception as e:
            raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e

        return result
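
    # Usage sketch (hypothetical store contents; pattern and extensions are
    # both optional filters):
    #
    #     paths = backend.list_files("/data/plate/images.zarr",
    #                                pattern="A01_*",
    #                                extensions={".tif"})
    #     # → [Path(...), ...] — the output paths recorded at save time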

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        store, relative_key = self._split_store_and_key(path)

        # Normalize key for the Zarr API
        key = relative_key.rstrip("/")

        try:
            # Zarr 3.x uses an async API - convert the async generator to a list
            import asyncio

            async def _get_entries():
                entries = []
                async for entry in store.list_dir(key):
                    entries.append(entry)
                return entries

            return asyncio.run(_get_entries())
        except KeyError:
            raise NotADirectoryError(f"Zarr path is not a directory: {path}")
        except FileNotFoundError:
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a Zarr array (file) or empty group (directory) at the given path.

        Args:
            path: Zarr path or URI

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a non-empty group
            StorageResolutionError: For unexpected failures
        """
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            zarr_obj = zarr.open(path, mode='r')
        except Exception as e:
            raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e

        # Determine if it's a file (array) or directory (group)
        if isinstance(zarr_obj, zarr.core.Array):
            try:
                shutil.rmtree(path)  # Array folders can be deleted directly
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e
        elif isinstance(zarr_obj, zarr.hierarchy.Group):
            if os.listdir(path):
                raise IsADirectoryError(f"Zarr group is not empty: {path}")
            try:
                os.rmdir(path)
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e
        else:
            raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}")

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a Zarr array or group (file or directory).

        This is the only permitted recursive deletion method for the Zarr backend.

        Args:
            path: The path shared across all backends

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails
        """
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        path = Path(path)

        # If the path has no file extension, treat this as a directory existence check.
        # This handles auto_detect_patterns asking "does this directory exist?"
        if not path.suffix:
            return path.exists()

        # Otherwise, check zarr key existence (for actual files)
        store, key = self._split_store_and_key(path)

        # First check whether the zarr store itself exists on disk
        if not Path(store.path).exists():
            return False

        try:
            root_group = zarr.group(store=store)
            return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys())
        except Exception:
            # If we can't open the zarr store, it doesn't exist
            return False

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        No-op for the zarr backend - zarr stores handle their own structure.

        Zarr doesn't have filesystem directories that need to be "ensured".
        Store creation and group structure are handled by save operations.
        """
        return Path(directory)

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        store, src_key = self._split_store_and_key(source)
        store2, dst_key = self._split_store_and_key(link_name)

        if store.path != store2.path:
            raise ValueError("Symlinks must exist within the same .zarr store")

        group = zarr.group(store=store)
        if src_key not in group:
            raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store")

        if dst_key in group:
            if not overwrite:
                raise FileExistsError(f"Symlink target already exists at: {dst_key}")
            # Remove existing entry if overwrite=True
            del group[dst_key]

        # Create a new group at the symlink path
        link_group = group.require_group(dst_key)
        link_group.attrs["_symlink"] = src_key  # Store as a declared string
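
    # Symlink sketch (hypothetical keys within a single store):
    #
    #     backend.create_symlink("/data/plate/images.zarr/A01.tif",
    #                            "/data/plate/images.zarr/A01_alias.tif")
    #     backend.is_symlink("/data/plate/images.zarr/A01_alias.tif")  # → True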

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """
        Check if the given Zarr path represents a logical symlink (based on the attribute contract).

        Returns:
            bool: True if the key exists and has an OpenHCS-declared symlink attribute,
                  False if the key doesn't exist or is not a symlink
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            obj = group[key]
            attrs = getattr(obj, "attrs", {})

            if "_symlink" not in attrs:
                return False

            # Enforce that the _symlink attr matches the schema (e.g. str or list of path components)
            if not isinstance(attrs["_symlink"], str):
                raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}")

            return True
        except KeyError:
            # Key doesn't exist, so it's not a symlink
            return False
        except Exception as e:
            raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e

    def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]:
        shape = data.shape

        # Simple logic: divide each dimension by chunk_divisor, with a minimum of 1
        return tuple(max(1, s // chunk_divisor) for s in shape)
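
    # Worked example: for data.shape == (4, 512, 512),
    #     chunk_divisor=1  → chunks (4, 512, 512)  (single chunk, the default)
    #     chunk_divisor=10 → chunks (1, 51, 51)    (the max(1, ...) guard keeps 4 // 10 at 1)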

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check if a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks.

        Args:
            path: Zarr store path (may point to a key within the store)

        Returns:
            bool: True if the resolved path is a Zarr array

        Raises:
            FileNotFoundError: If the path does not exist or is a broken symlink
            IsADirectoryError: If the resolved object is a Zarr group
            StorageResolutionError: For other failures
        """
        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            # Resolve symlinks (Zarr-native, via .attrs)
            seen_keys = set()
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break  # resolution complete

            # Now obj is the resolved target
            if isinstance(obj, zarr.core.Array):
                return True
            elif isinstance(obj, zarr.hierarchy.Group):
                raise IsADirectoryError(f"Zarr path is a group (directory): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")
        except (FileNotFoundError, IsADirectoryError, StorageResolutionError):
            # Propagate the documented exceptions unchanged
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a Zarr path resolves to a directory (i.e., a Zarr group).

        Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink'].

        Args:
            path: Zarr path or URI

        Returns:
            bool: True if the path resolves to a Zarr group

        Raises:
            FileNotFoundError: If the path or resolved target does not exist
            NotADirectoryError: If the resolved target is not a group
            StorageResolutionError: For symlink cycles or other failures
        """
        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            seen_keys = set()

            # Resolve the symlink chain
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break

            # obj is resolved
            if isinstance(obj, zarr.hierarchy.Group):
                return True
            elif isinstance(obj, zarr.core.Array):
                raise NotADirectoryError(f"Zarr path is an array (file): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")
        except (FileNotFoundError, NotADirectoryError, StorageResolutionError):
            # Propagate the documented exceptions unchanged
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a Zarr key or object (array/group) from one location to another, resolving symlinks.

        Supports:
        - Disk or memory stores
        - Zarr-native symlinks
        - Key renames within a group
        - Full copy+delete across stores if needed

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        # Resolve symlinks if present
        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            # Note: _split_store_and_key builds a fresh store per call, so compare
            # store paths rather than object identity
            if src_store.path == dst_store.path:
                # Native move within the same Zarr store
                src_group.move(src_key, dst_key)
            else:
                # Cross-store: perform a manual copy + delete
                zarr.copy(obj, dst_group, name=dst_key)
                del src_group[src_key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a Zarr key or object (array/group) from one location to another.

        - Resolves Zarr-native symlinks before copying
        - Prevents overwrite unless explicitly allowed (future feature)
        - Works across memory or disk stores

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            zarr.copy(obj, dst_group, name=dst_key)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a Zarr path.

        Returns:
            dict with keys:
            - 'type': 'file', 'directory', 'symlink', or 'missing'
            - 'key': final resolved key
            - 'target': symlink target if applicable
            - 'store': repr(store)
            - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            if key in group:
                obj = group[key]
                attrs = getattr(obj, "attrs", {})
                is_link = "_symlink" in attrs

                if is_link:
                    target = attrs["_symlink"]
                    if not isinstance(target, str):
                        raise StorageResolutionError(f"Invalid symlink format at {key}")
                    return {
                        "type": "symlink",
                        "key": key,
                        "target": target,
                        "store": repr(store),
                        "exists": target in group
                    }

                if isinstance(obj, zarr.Array):
                    return {
                        "type": "file",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }
                elif isinstance(obj, zarr.Group):
                    return {
                        "type": "directory",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                raise StorageResolutionError(f"Unknown object type at: {key}")
            else:
                return {
                    "type": "missing",
                    "key": key,
                    "store": repr(store),
                    "exists": False
                }
        except Exception as e:
            raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e
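
# Example stat() results (hypothetical store contents):
#
#     backend.stat("/data/plate/images.zarr/A01.tif")
#     # → {"type": "file", "key": "A01.tif", "store": "...", "exists": True}
#     backend.stat("/data/plate/images.zarr/missing.tif")
#     # → {"type": "missing", "key": "missing.tif", "store": "...", "exists": False}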

class ZarrSymlink:
    """
    Represents a symbolic link in a Zarr store.

    It stores the target path of the symlink.
    """

    def __init__(self, target: str):
        self.target = target

    def __repr__(self):
        return f"<ZarrSymlink → {self.target}>"