Coverage for src/polystore/zarr.py: 20%
600 statements
coverage.py v7.11.0, created at 2025-11-03 06:58 +0000
# openhcs/io/storage/backends/zarr.py
"""
Zarr storage backend module for OpenHCS.

This module provides a Zarr-backed implementation of the MicroscopyStorageBackend interface.
It stores data in a Zarr store on disk and supports overlay operations
for materializing data to disk when needed.
"""

import fnmatch
import logging
import threading
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import numpy as np
import zarr

# Lazy ome-zarr loading to avoid the dask → GPU library chain at import time
_ome_zarr_state = {'available': None, 'cache': {}, 'event': threading.Event(), 'thread': None}

logger = logging.getLogger(__name__)


# Decorator for passthrough to disk backend
def passthrough_to_disk(*extensions: str, ensure_parent_dir: bool = False):
    """
    Decorator that automatically passes certain file types through to the disk backend.

    Zarr only supports array data, so non-array files (JSON, CSV, TXT, ROI.ZIP, etc.)
    are automatically delegated to the disk backend.

    Uses introspection to find the path parameter (any parameter with 'path' in its name).

    Args:
        *extensions: File extensions to pass through (e.g., '.json', '.csv', '.txt')
        ensure_parent_dir: If True, ensure the parent directory exists before calling the
            disk backend (for save operations)

    Usage:
        @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True)
        def save(self, data, output_path, **kwargs):
            # Zarr-specific save logic here
            ...
    """
    import inspect

    def decorator(method: Callable) -> Callable:
        # Use introspection to find the path parameter index at decoration time
        sig = inspect.signature(method)
        path_param_index = None

        for i, (param_name, param) in enumerate(sig.parameters.items()):
            if param_name == 'self':
                continue
            # Find the first parameter with 'path' in its name
            if 'path' in param_name.lower():
                # Adjust for the self parameter (subtract 1 since 'self' is not in *args)
                path_param_index = i - 1
                break

        if path_param_index is None:
            raise ValueError(f"No path parameter found in {method.__name__} signature. "
                             f"Expected a parameter with 'path' in its name.")

        @wraps(method)
        def wrapper(self, *args, **kwargs):
            # Extract the path from args at the discovered index
            path_arg = None
            if len(args) > path_param_index:
                arg = args[path_param_index]
                if isinstance(arg, (str, Path)):
                    path_arg = str(arg)

            # Check whether the path matches a passthrough extension
            if path_arg and any(path_arg.endswith(ext) for ext in extensions):
                # Use the local backend registry to avoid an OpenHCS dependency
                disk_backend = get_backend_instance('disk')

                # Ensure the parent directory exists if requested (for save operations)
                if ensure_parent_dir:
                    parent_dir = Path(path_arg).parent
                    disk_backend.ensure_directory(parent_dir)

                # Call the same method on the disk backend
                return getattr(disk_backend, method.__name__)(*args, **kwargs)

            # Otherwise, call the original method
            return method(self, *args, **kwargs)

        return wrapper

    return decorator
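
# Illustrative routing example (hypothetical call sites, not part of this module): with a
# decorated save() like the one later in this file,
#     backend.save(table, "step1/results.csv")  ->  delegated to the disk backend
#     backend.save(image, "step1/A01.tif")      ->  handled by the Zarr-specific body
# The path argument is located by name introspection ('output_path' contains 'path').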


def _load_ome_zarr():
    """Load ome-zarr and cache its imports."""
    try:
        logger.info("Loading ome-zarr...")
        from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata
        from ome_zarr.io import parse_url

        _ome_zarr_state['cache'] = {
            'write_image': write_image,
            'write_plate_metadata': write_plate_metadata,
            'write_well_metadata': write_well_metadata,
            'parse_url': parse_url
        }
        _ome_zarr_state['available'] = True
        logger.info("ome-zarr loaded successfully")
    except ImportError as e:
        _ome_zarr_state['available'] = False
        logger.warning(f"ome-zarr not available: {e}")
    finally:
        _ome_zarr_state['event'].set()


def start_ome_zarr_loading_async():
    """Start loading ome-zarr in a background thread (safe to call multiple times)."""
    if _ome_zarr_state['thread'] is None and _ome_zarr_state['available'] is None:
        _ome_zarr_state['thread'] = threading.Thread(
            target=_load_ome_zarr, daemon=True, name="ome-zarr-loader"
        )
        _ome_zarr_state['thread'].start()
        logger.info("Started ome-zarr background loading")


def _ensure_ome_zarr(timeout: float = 30.0):
    """
    Ensure ome-zarr is loaded, waiting for the background load if needed.

    Returns: Tuple of (write_image, write_plate_metadata, write_well_metadata, parse_url)
    Raises: ImportError if ome-zarr is not available, TimeoutError if loading times out
    """
    # Load synchronously if loading was never started
    if _ome_zarr_state['available'] is None and _ome_zarr_state['thread'] is None:
        logger.warning("ome-zarr not pre-loaded, loading synchronously (will block)")
        _load_ome_zarr()

    # Wait for background loading
    if not _ome_zarr_state['event'].is_set():
        logger.info("Waiting for ome-zarr background loading...")
        if not _ome_zarr_state['event'].wait(timeout):
            raise TimeoutError(f"ome-zarr loading timed out after {timeout}s")

    # Check availability
    if not _ome_zarr_state['available']:
        raise ImportError("ome-zarr library not available. Install with: pip install ome-zarr")

    cache = _ome_zarr_state['cache']
    return (cache['write_image'], cache['write_plate_metadata'],
            cache['write_well_metadata'], cache['parse_url'])
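
# Typical call sequence (sketch): start the import early so pipeline code never blocks,
# then resolve the cached functions right before writing.
#
#     start_ome_zarr_loading_async()   # e.g. at application startup
#     ...
#     write_image, write_plate_metadata, write_well_metadata, parse_url = _ensure_ome_zarr()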


# Cross-platform file locking
try:
    import fcntl
    FCNTL_AVAILABLE = True
except ImportError:
    import portalocker
    FCNTL_AVAILABLE = False

from .backend_registry import get_backend_instance
from .base import StorageBackend
from .exceptions import StorageResolutionError


class ZarrStorageBackend(StorageBackend):
    """
    Zarr storage backend with automatic registration and configurable compression.

    This class provides a concrete implementation of the storage backend interfaces
    for Zarr storage. It stores data in a Zarr store on disk with configurable
    compression algorithms and settings.

    Features:
    - Single-chunk batch operations for a 40x performance improvement
    - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none)
    - Configurable compression levels
    - Full path mapping for batch operations
    """

    # Use a simple backend type string to avoid depending on OpenHCS enums
    _backend_type = "zarr"

    def __init__(self, zarr_config: Optional['ZarrConfig'] = None):
        """
        Initialize the Zarr backend with a ZarrConfig.

        Args:
            zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None)
        """
        # Import the local ZarrConfig to remain OpenHCS-agnostic
        from .config import ZarrConfig

        if zarr_config is None:
            zarr_config = ZarrConfig()

        self.config = zarr_config

        # Convenience attributes
        self.compression_level = zarr_config.compression_level

        # Create the actual compressor from the config (shuffle always enabled for Blosc)
        self.compressor = self.config.compressor.create_compressor(
            self.config.compression_level,
            shuffle=True  # Always enable shuffle for better compression
        )

    def _get_compressor(self) -> Optional[Any]:
        """
        Get the configured compressor with appropriate settings.

        Returns:
            Configured compressor instance, or None for no compression
        """
        if self.compressor is None:
            return None

        # If compression_level is specified and the compressor supports it
        if self.compression_level is not None:
            # Check whether the compressor class exposes a level parameter
            if hasattr(self.compressor, '__class__'):
                try:
                    # Create a new instance with the compression level applied
                    compressor_class = self.compressor.__class__
                    if 'level' in compressor_class.__init__.__code__.co_varnames:
                        return compressor_class(level=self.compression_level)
                    elif 'clevel' in compressor_class.__init__.__code__.co_varnames:
                        return compressor_class(clevel=self.compression_level)
                except (AttributeError, TypeError):
                    # Fall back to the original compressor if setting the level fails
                    pass

        return self.compressor

    def _calculate_chunks(self, data_shape: Tuple[int, ...]) -> Tuple[int, ...]:
        """
        Calculate the chunk shape based on the configured strategy.

        Args:
            data_shape: Shape of the 5D array (fields, channels, z, y, x)

        Returns:
            Chunk shape tuple
        """
        from .config import ZarrChunkStrategy

        match self.config.chunk_strategy:
            case ZarrChunkStrategy.WELL:
                # Single chunk for the entire well (current behavior, optimal for batch I/O)
                return data_shape
            case ZarrChunkStrategy.FILE:
                # One chunk per individual file: (1, 1, 1, y, x)
                # Each original TIFF is compressed separately
                return (1, 1, 1, data_shape[3], data_shape[4])
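
    # Worked example (hypothetical well shape): for data_shape == (9, 2, 5, 2048, 2048),
    # i.e. 9 fields x 2 channels x 5 z-planes of 2048x2048 images:
    #   ZarrChunkStrategy.WELL -> (9, 2, 5, 2048, 2048)   (one chunk for the whole well)
    #   ZarrChunkStrategy.FILE -> (1, 1, 1, 2048, 2048)   (one chunk per original image)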

    def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]:
        """
        Split a path into a zarr store and a key.

        The zarr store is always the directory containing the image files, regardless of backend.
        For example:
        - "/path/to/plate_outputs/images/A01.tif" → Store: "/path/to/plate_outputs/images", Key: "A01.tif"
        - "/path/to/plate.zarr/images/A01.tif" → Store: "/path/to/plate.zarr/images", Key: "A01.tif"

        The images directory itself becomes the zarr store - zarr files are added within it.
        A zarr store does not need a folder name ending in .zarr.

        Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility.
        """
        path = Path(path)

        # If the path has a file extension (like .tif), the parent directory is the zarr store
        if path.suffix:
            # File path - the parent directory (e.g., "images") is the zarr store
            store_path = path.parent
            relative_key = path.name
        else:
            # Directory path - treat as the zarr store
            store_path = path
            relative_key = ""

        # CRITICAL: Create a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility
        # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0')
        store = zarr.DirectoryStore(str(store_path), dimension_separator='/')
        return store, relative_key

    @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True)
    def save(self, data: Any, output_path: Union[str, Path], **kwargs):
        """
        Save data to Zarr at the given output_path.

        Will only write if the key does not already exist.
        Will NOT overwrite or delete existing data.

        Raises:
            FileExistsError: If the destination key already exists
            StorageResolutionError: If creation fails
        """
        # Zarr-specific save logic (non-array files automatically pass through to disk)
        store, key = self._split_store_and_key(output_path)
        group = zarr.group(store=store)

        if key in group:
            raise FileExistsError(f"Zarr key already exists: {output_path}")

        chunks = kwargs.get("chunks")
        if chunks is None:
            chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1))

        try:
            # Create an array with the correct shape and dtype, then assign the data
            array = group.create_dataset(
                name=key,
                shape=data.shape,
                dtype=data.dtype,
                chunks=chunks,
                compressor=kwargs.get("compressor", self._get_compressor()),
                overwrite=False  # 🔒 Must be False by doctrine
            )
            array[:] = data
        except Exception as e:
            raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load from a zarr array using the filename mapping.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments (zarr_config not needed)

        Returns:
            List of loaded data objects in the same order as file_paths

        Raises:
            FileNotFoundError: If the expected zarr store is not found
            KeyError: If a filename is not found in the filename map
        """
        if not file_paths:
            return []

        # Use _split_store_and_key to get the store path from the first file path
        store, _ = self._split_store_and_key(file_paths[0])
        store_path = Path(store.path)

        # FAIL LOUD: the store must exist
        if not store_path.exists():
            raise FileNotFoundError(f"Expected zarr store not found: {store_path}")

        root = zarr.open_group(store=store, mode='r')

        # Group files by well based on the OME-ZARR structure
        well_to_files = {}
        well_to_indices = {}

        # Search the OME-ZARR structure for the requested files
        for row_name in root.group_keys():
            if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                row_group = root[row_name]
                for col_name in row_group.group_keys():
                    if col_name.isdigit():  # Column directory (01, 02, etc.)
                        well_group = row_group[col_name]
                        well_name = f"{row_name}{col_name}"

                        # Check if this well has our filename mapping in the field array
                        if "0" in well_group.group_keys():
                            field_group = well_group["0"]
                            if "0" in field_group.array_keys():
                                field_array = field_group["0"]
                                if "openhcs_filename_map" in field_array.attrs:
                                    filename_map = dict(field_array.attrs["openhcs_filename_map"])

                                    # Check which requested files are in this well
                                    for i, path in enumerate(file_paths):
                                        filename = Path(path).name  # Match on filename only
                                        if filename in filename_map:
                                            if well_name not in well_to_files:
                                                well_to_files[well_name] = []
                                                well_to_indices[well_name] = []
                                            well_to_files[well_name].append(i)  # Original position in file_paths
                                            well_to_indices[well_name].append(filename_map[filename])  # 5D coordinates (field, channel, z)

        # Load data from each well using a single well chunk
        results = [None] * len(file_paths)  # Pre-allocate the results list

        for well_name, file_positions in well_to_files.items():
            row, col = well_name[0], well_name[1:]
            well_group = root[row][col]
            well_indices = well_to_indices[well_name]

            # Load the entire well field array in a single operation (well chunking)
            field_group = well_group["0"]
            field_array = field_group["0"]
            all_well_data = field_array[:]  # Single I/O operation for the entire well

            # Extract the requested 2D slices using 5D coordinates
            for file_pos, coords_5d in zip(file_positions, well_indices):
                field_idx, channel_idx, z_idx = coords_5d
                # Extract a 2D slice: (field, channel, z, y, x) -> (y, x)
                results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :]

        logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells")
        return results
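
    # Usage sketch (hypothetical paths): the store is inferred from the first path, and each
    # returned element is the 2D (y, x) plane for the corresponding input path:
    #   planes = backend.load_batch(["/data/plate/images/A01_c0_z0.tif",
    #                                "/data/plate/images/A01_c0_z1.tif"])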

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support.

        Args:
            data_list: List of image data to save
            output_paths: List of output file paths
            **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col
        """
        # Ensure ome-zarr is loaded (waits for the background load if needed)
        write_image, write_plate_metadata, write_well_metadata, _ = _ensure_ome_zarr()

        # Extract required parameters from kwargs
        chunk_name = kwargs.get('chunk_name')
        n_channels = kwargs.get('n_channels')
        n_z = kwargs.get('n_z')
        n_fields = kwargs.get('n_fields')
        row = kwargs.get('row')
        col = kwargs.get('col')

        # Validate required parameters
        if chunk_name is None:
            raise ValueError("chunk_name must be provided")
        if n_channels is None:
            raise ValueError("n_channels must be provided")
        if n_z is None:
            raise ValueError("n_z must be provided")
        if n_fields is None:
            raise ValueError("n_fields must be provided")
        if row is None:
            raise ValueError("row must be provided")
        if col is None:
            raise ValueError("col must be provided")

        if not data_list:
            logger.warning(f"Empty data list for chunk {chunk_name}")
            return

        if not _ome_zarr_state['available']:
            raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr")

        # Use _split_store_and_key to get the store path from the first output path
        store, _ = self._split_store_and_key(output_paths[0])
        store_path = Path(store.path)

        logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}")

        # Convert GPU arrays to CPU arrays before saving
        cpu_data_list = []
        for data in data_list:
            if hasattr(data, 'get'):  # CuPy array
                cpu_data_list.append(data.get())
            elif hasattr(data, 'cpu'):  # PyTorch tensor
                cpu_data_list.append(data.cpu().numpy())
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # JAX on GPU
                import jax
                cpu_data_list.append(jax.device_get(data))
            else:  # Already a CPU array (NumPy, etc.)
                cpu_data_list.append(data)

        # Ensure the parent directory exists
        store_path.parent.mkdir(parents=True, exist_ok=True)

        # Use _split_store_and_key to get a properly configured store with dimension_separator='/'
        store, _ = self._split_store_and_key(store_path)
        root = zarr.group(store=store)  # Open existing or create new group without mode conflicts

        # Set OME metadata if not already present
        if "ome" not in root.attrs:
            root.attrs["ome"] = {"version": "0.4"}

        # Get the store for compatibility with existing code
        store = root.store

        # Write plate metadata with locking to prevent concurrent corruption
        # Always enabled for OME-ZARR HCS compliance
        self._ensure_plate_metadata_with_lock(root, row, col, store_path)

        # Create an HCS-compliant structure: plate/row/col/field/resolution
        # Create the row group if it doesn't exist
        if row not in root:
            row_group = root.create_group(row)
        else:
            row_group = root[row]

        # Create the well group (remove an existing one if present to allow overwrite)
        if col in row_group:
            del row_group[col]
        well_group = row_group.create_group(col)

        # Add HCS well metadata
        well_metadata = {
            "images": [
                {
                    "path": "0",  # Single image containing all fields
                    "acquisition": 0
                }
            ],
            "version": "0.5"
        }
        well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata}

        # Create the field group (single field "0" containing all field data)
        field_group = well_group.require_group("0")

        # Always use the full 5D structure: (fields, channels, z, y, x)
        # Define OME-NGFF compliant axes
        axes = [
            {'name': 'field', 'type': 'field'},  # Custom field type - allowed before space
            {'name': 'c', 'type': 'channel'},
            {'name': 'z', 'type': 'space'},
            {'name': 'y', 'type': 'space'},
            {'name': 'x', 'type': 'space'}
        ]

        # Get image dimensions
        sample_image = cpu_data_list[0]
        height, width = sample_image.shape[-2:]

        # Always reshape to the full 5D shape: (n_fields, n_channels, n_z, y, x)
        target_shape = [n_fields, n_channels, n_z, height, width]

        # Stack and reshape the data
        stacked_data = np.stack(cpu_data_list, axis=0)

        # Calculate the total expected number of images for validation
        total_expected = n_fields * n_channels * n_z
        if len(data_list) != total_expected:
            logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} "
                           f"(fields={n_fields}, channels={n_channels}, z={n_z})")

        # Log detailed shape information before the reshape
        logger.info("🔍 ZARR RESHAPE DEBUG:")
        logger.info(f" - Input: {len(data_list)} images")
        logger.info(f" - Stacked shape: {stacked_data.shape}")
        logger.info(f" - Stacked size: {stacked_data.size}")
        logger.info(f" - Target shape: {target_shape}")
        logger.info(f" - Target size: {np.prod(target_shape)}")
        logger.info(f" - Sample image shape: {sample_image.shape}")
        logger.info(f" - Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, h={height}, w={width}")

        # Always reshape to the 5D structure
        reshaped_data = stacked_data.reshape(target_shape)

        logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape} → {reshaped_data.shape}")
        axes_names = [ax['name'] for ax in axes]
        logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}")

        # Reuse the field group (single field "0" containing all field data)
        if "0" in well_group:
            field_group = well_group["0"]
        else:
            field_group = well_group.create_group("0")

        # Write OME-ZARR well metadata with a single field (well-chunked approach)
        write_well_metadata(well_group, ['0'])

        # Calculate chunks based on the configured strategy
        storage_options = {
            "chunks": self._calculate_chunks(reshaped_data.shape),
            "compressor": self._get_compressor()
        }

        # Write as a single well-chunked array with proper multi-dimensional axes
        write_image(
            image=reshaped_data,
            group=field_group,
            axes=axes,
            storage_options=storage_options,
            scaler=None,  # Single scale only, for performance
            compute=True
        )

        # Axes are already correctly set by the write_image function

        # Store the filename mapping with 5D coordinates (field, channel, z, y, x)
        # Convert each flat index to 5D coordinates for proper zarr slicing
        filename_map = {}
        for i, path in enumerate(output_paths):
            # Calculate 5D coordinates from the flat index
            field_idx = i // (n_channels * n_z)
            remaining = i % (n_channels * n_z)
            channel_idx = remaining // n_z
            z_idx = remaining % n_z

            # Store as a tuple (field, channel, z) - y, x are full slices
            filename_map[Path(path).name] = (field_idx, channel_idx, z_idx)

        field_array = field_group['0']
        field_array.attrs["openhcs_filename_map"] = filename_map
        field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths]
        field_array.attrs["openhcs_dimensions"] = {
            "n_fields": n_fields,
            "n_channels": n_channels,
            "n_z": n_z
        }

        logger.debug(f"Successfully saved batch for chunk {chunk_name}")

        # Aggressive memory cleanup
        del cpu_data_list
        import gc
        gc.collect()
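
    # Worked example of the flat-index -> (field, channel, z) mapping above, assuming
    # n_channels=2 and n_z=3 (6 images per field, ordered by field, then channel, then z):
    #   i = 7  ->  field = 7 // 6 = 1, remaining = 7 % 6 = 1, channel = 1 // 3 = 0, z = 1 % 3 = 1
    # so output_paths[7] maps to well-array coordinates (1, 0, 1).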

    def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None:
        """Ensure plate-level metadata includes ALL existing wells, using file locking."""
        lock_path = store_path.with_suffix('.metadata.lock')

        try:
            with open(lock_path, 'w') as lock_file:
                if FCNTL_AVAILABLE:
                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
                else:
                    portalocker.lock(lock_file, portalocker.LOCK_EX)
                self._ensure_plate_metadata(root, row, col)
        except Exception as e:
            logger.error(f"Failed to update plate metadata with lock: {e}")
            raise
        finally:
            if lock_path.exists():
                lock_path.unlink()

    def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None:
        """Ensure plate-level metadata includes ALL existing wells in the store."""
        # Ensure ome-zarr is loaded
        _, write_plate_metadata, _, _ = _ensure_ome_zarr()

        # Scan the store for all existing wells
        all_rows = set()
        all_cols = set()
        all_wells = []

        for row_name in root.group_keys():
            if isinstance(root[row_name], zarr.Group):  # Ensure it's a row group
                row_group = root[row_name]
                all_rows.add(row_name)

                for col_name in row_group.group_keys():
                    if isinstance(row_group[col_name], zarr.Group):  # Ensure it's a well group
                        all_cols.add(col_name)
                        well_path = f"{row_name}/{col_name}"
                        all_wells.append(well_path)

        # Include the current well being added (it might not exist yet)
        all_rows.add(row)
        all_cols.add(col)
        current_well_path = f"{row}/{col}"
        if current_well_path not in all_wells:
            all_wells.append(current_well_path)

        # Sort for consistent ordering
        sorted_rows = sorted(all_rows)
        sorted_cols = sorted(all_cols)
        sorted_wells = sorted(all_wells)

        # Build wells metadata with the proper indices
        wells_metadata = []
        for well_path in sorted_wells:
            well_row, well_col = well_path.split("/")
            row_index = sorted_rows.index(well_row)
            col_index = sorted_cols.index(well_col)
            wells_metadata.append({
                "path": well_path,
                "rowIndex": row_index,
                "columnIndex": col_index
            })

        # Add acquisition metadata for HCS compliance
        acquisitions = [
            {
                "id": 0,
                "name": "default_acquisition",
                "maximumfieldcount": 1  # Single field containing all field data
            }
        ]

        # Write the complete HCS plate metadata
        write_plate_metadata(
            root,
            sorted_rows,
            sorted_cols,
            wells_metadata,
            acquisitions=acquisitions,
            field_count=1,
            name="OpenHCS_Plate"
        )
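
    # Example of the metadata built above (hypothetical store already holding wells A/01 and B/03):
    #   sorted_rows = ['A', 'B'], sorted_cols = ['01', '03']
    #   wells_metadata = [{"path": "A/01", "rowIndex": 0, "columnIndex": 0},
    #                     {"path": "B/03", "rowIndex": 1, "columnIndex": 1}]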

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load a single file from a zarr store.

        For the OME-ZARR structure with filename mapping, delegates to load_batch.
        For the legacy flat structure or direct keys, uses a direct key lookup.

        Args:
            file_path: Path to the file to load
            **kwargs: Additional arguments

        Returns:
            Loaded array data

        Raises:
            FileNotFoundError: If the file is not found in the zarr store
        """
        store, key = self._split_store_and_key(file_path)
        group = zarr.group(store=store)

        # Check if this is an OME-ZARR structure with filename mapping
        if "plate" in group.attrs:
            # OME-ZARR structure: use load_batch, which understands the filename mapping
            result = self.load_batch([file_path], **kwargs)
            if not result:
                raise FileNotFoundError(f"File not found in OME-ZARR store: {file_path}")
            return result[0]

        # Legacy flat structure: direct key lookup with symlink resolution
        visited = set()
        while self.is_symlink(key):
            if key in visited:
                raise RuntimeError(f"Zarr symlink loop detected at {key}")
            visited.add(key)
            key = group[key].attrs["_symlink"]

        if key not in group:
            raise FileNotFoundError(f"No array found at key '{key}'")

        return group[key][:]

    def list_files(self,
                   directory: Union[str, Path],
                   pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None,
                   recursive: bool = False) -> List[Path]:
        """
        List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered.

        Returns filenames from array attributes (output_paths) if available.
        """
        store, relative_key = self._split_store_and_key(directory)
        result: List[Path] = []

        def _matches_filters(name: str) -> bool:
            if pattern and not fnmatch.fnmatch(name, pattern):
                return False
            if extensions:
                return any(name.lower().endswith(ext.lower()) for ext in extensions)
            return True

        try:
            # Open the zarr group and traverse the OME-ZARR structure
            group = zarr.open_group(store=store)

            # Check if this is an OME-ZARR structure (has plate metadata)
            if "plate" in group.attrs:
                # OME-ZARR structure: traverse A/01/ wells
                for row_name in group.group_keys():
                    if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                        row_group = group[row_name]
                        for col_name in row_group.group_keys():
                            if col_name.isdigit():  # Column directory (01, 02, etc.)
                                well_group = row_group[col_name]

                                # Get filenames from the field array metadata
                                if "0" in well_group.group_keys():
                                    field_group = well_group["0"]
                                    if "0" in field_group.array_keys():
                                        field_array = field_group["0"]
                                        if "openhcs_output_paths" in field_array.attrs:
                                            output_paths = field_array.attrs["openhcs_output_paths"]
                                            for filename in output_paths:
                                                filename_only = Path(filename).name
                                                if _matches_filters(filename_only):
                                                    result.append(Path(filename))
            else:
                # Legacy flat structure: get array keys directly
                array_keys = list(group.array_keys())
                for array_key in array_keys:
                    try:
                        array = group[array_key]
                        if "output_paths" in array.attrs:
                            # Get the original filenames from the array attributes
                            output_paths = array.attrs["output_paths"]
                            for filename in output_paths:
                                filename_only = Path(filename).name
                                if _matches_filters(filename_only):
                                    result.append(Path(filename))
                    except Exception:
                        # Skip arrays that can't be accessed
                        continue

        except Exception as e:
            raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e

        return result

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        store, relative_key = self._split_store_and_key(path)

        # Normalize the key for the Zarr API
        key = relative_key.rstrip("/")

        try:
            # Zarr 3.x uses an async API - convert the async generator to a list
            import asyncio

            async def _get_entries():
                entries = []
                async for entry in store.list_dir(key):
                    entries.append(entry)
                return entries

            return asyncio.run(_get_entries())
        except KeyError:
            raise NotADirectoryError(f"Zarr path is not a directory: {path}")
        except FileNotFoundError:
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a Zarr array (file) or empty group (directory) at the given path.

        Args:
            path: Zarr path or URI

        Raises:
            FileNotFoundError: If the path does not exist
            IsADirectoryError: If the path is a non-empty group
            StorageResolutionError: For unexpected failures
        """
        import os
        import shutil

        # Pass text files (JSON, CSV, TXT) through to the disk backend
        path_str = str(path)
        if path_str.endswith(('.json', '.csv', '.txt')):
            disk_backend = get_backend_instance('disk')
            return disk_backend.delete(path)

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            zarr_obj = zarr.open(path, mode='r')
        except Exception as e:
            raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e

        # Determine whether it's a file (array) or a directory (group)
        if isinstance(zarr_obj, zarr.core.Array):
            try:
                shutil.rmtree(path)  # Array folders can be deleted directly
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e
        elif isinstance(zarr_obj, zarr.hierarchy.Group):
            if os.listdir(path):
                raise IsADirectoryError(f"Zarr group is not empty: {path}")
            try:
                os.rmdir(path)
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e
        else:
            raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}")

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a Zarr array or group (file or directory).

        This is the only permitted recursive deletion method for the Zarr backend.

        Args:
            path: The path shared across all backends

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails
        """
        import os
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e

    @passthrough_to_disk('.json', '.csv', '.txt')
    def exists(self, path: Union[str, Path]) -> bool:
        # Zarr-specific existence check (text files automatically pass through to disk)
        path = Path(path)

        # If the path has no file extension, treat this as a directory existence check
        # This handles auto_detect_patterns asking "does this directory exist?"
        if not path.suffix:
            return path.exists()

        # Otherwise, check zarr key existence (for actual files)
        store, key = self._split_store_and_key(path)

        # First check whether the zarr store itself exists
        if isinstance(store, str):
            store_path = Path(store)
            if not store_path.exists():
                return False

        try:
            root_group = zarr.group(store=store)
            return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys())
        except Exception:
            # If we can't open the zarr store, it doesn't exist
            return False

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        No-op for the zarr backend - zarr stores handle their own structure.

        Zarr doesn't have filesystem directories that need to be "ensured".
        Store creation and group structure are handled by save operations.
        """
        return Path(directory)

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        store, src_key = self._split_store_and_key(source)
        store2, dst_key = self._split_store_and_key(link_name)

        if store.path != store2.path:
            raise ValueError("Symlinks must exist within the same .zarr store")

        group = zarr.group(store=store)
        if src_key not in group:
            raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store")

        if dst_key in group:
            if not overwrite:
                raise FileExistsError(f"Symlink target already exists at: {dst_key}")
            # Remove the existing entry if overwrite=True
            del group[dst_key]

        # Create a new group at the symlink path
        link_group = group.require_group(dst_key)
        link_group.attrs["_symlink"] = src_key  # Store as a declared string

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """
        Check whether the given Zarr path represents a logical symlink (based on the attribute contract).

        Returns:
            bool: True if the key exists and has an OpenHCS-declared symlink attribute,
                  False if the key doesn't exist or is not a symlink
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            obj = group[key]
            attrs = getattr(obj, "attrs", {})

            if "_symlink" not in attrs:
                return False

            # Enforce that the _symlink attr matches the schema (e.g. str or list of path components)
            if not isinstance(attrs["_symlink"], str):
                raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}")

            return True
        except KeyError:
            # The key doesn't exist, so it's not a symlink
            return False
        except Exception as e:
            raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e

    def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]:
        shape = data.shape

        # Simple logic: divide each dimension by chunk_divisor, with a minimum of 1
        return tuple(max(1, s // chunk_divisor) for s in shape)
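
    # Example: for data.shape == (4, 2048, 2048) and chunk_divisor=4 this returns (1, 512, 512);
    # with the default chunk_divisor=1 the chunk shape equals the full array shape.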

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check whether a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks.

        Args:
            path: Zarr store path (may point to a key within the store)

        Returns:
            bool: True if the resolved path is a Zarr array

        Raises:
            FileNotFoundError: If the path does not exist or the symlink is broken
            IsADirectoryError: If the resolved object is a Zarr group
            StorageResolutionError: For other failures
        """
        import os

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            # Resolve symlinks (Zarr-native, via .attrs)
            seen_keys = set()
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break  # Resolution complete

            # Now obj is the resolved target
            if isinstance(obj, zarr.core.Array):
                return True
            elif isinstance(obj, zarr.hierarchy.Group):
                raise IsADirectoryError(f"Zarr path is a group (directory): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check whether a Zarr path resolves to a directory (i.e., a Zarr group).

        Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink'].

        Args:
            path: Zarr path or URI

        Returns:
            bool: True if the path resolves to a Zarr group

        Raises:
            FileNotFoundError: If the path or resolved target does not exist
            NotADirectoryError: If the resolved target is not a group
            StorageResolutionError: For symlink cycles or other failures
        """
        import os

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            seen_keys = set()

            # Resolve the symlink chain
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break

            # obj is now resolved
            if isinstance(obj, zarr.hierarchy.Group):
                return True
            elif isinstance(obj, zarr.core.Array):
                raise NotADirectoryError(f"Zarr path is an array (file): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a Zarr key or object (array/group) from one location to another, resolving symlinks.

        Supports:
        - Disk or memory stores
        - Zarr-native symlinks
        - Key renames within a group
        - Full copy + delete across stores if needed

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        # Resolve symlinks if present
        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            if src_store is dst_store:
                # Native move within the same Zarr group/store
                src_group.move(src_key, dst_key)
            else:
                # Cross-store: perform a manual copy + delete
                obj.copy(dst_group, name=dst_key)
                del src_group[src_key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a Zarr key or object (array/group) from one location to another.

        - Resolves Zarr-native symlinks before copying
        - Prevents overwrite unless explicitly allowed (future feature)
        - Works across memory or disk stores

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        # Resolve symlinks if present
        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            obj.copy(dst_group, name=dst_key)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a Zarr path.

        Returns:
            dict with keys:
            - 'type': 'file', 'directory', 'symlink', or 'missing'
            - 'key': final resolved key
            - 'target': symlink target if applicable
            - 'store': repr(store)
            - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            if key in group:
                obj = group[key]
                attrs = getattr(obj, "attrs", {})
                is_link = "_symlink" in attrs

                if is_link:
                    target = attrs["_symlink"]
                    if not isinstance(target, str):
                        raise StorageResolutionError(f"Invalid symlink format at {key}")
                    return {
                        "type": "symlink",
                        "key": key,
                        "target": target,
                        "store": repr(store),
                        "exists": target in group
                    }

                if isinstance(obj, zarr.Array):
                    return {
                        "type": "file",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }
                elif isinstance(obj, zarr.Group):
                    return {
                        "type": "directory",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                raise StorageResolutionError(f"Unknown object type at: {key}")
            else:
                return {
                    "type": "missing",
                    "key": key,
                    "store": repr(store),
                    "exists": False
                }

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e


class ZarrSymlink:
    """
    Represents a symbolic link in a Zarr store.

    Stores the target path of the symlink.
    """

    def __init__(self, target: str):
        self.target = target

    def __repr__(self):
        return f"<ZarrSymlink → {self.target}>"


# Backwards-compatible name used by the package public API
ZarrBackend = ZarrStorageBackend
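

if __name__ == "__main__":
    # Illustrative usage sketch only (not part of the backend API): writes one dummy well and
    # reads two planes back. Assumes ome-zarr is installed and the default ZarrConfig applies;
    # the directory layout and filenames below are hypothetical.
    import tempfile

    backend = ZarrStorageBackend()
    images_dir = Path(tempfile.mkdtemp()) / "plate_outputs" / "images"

    # 1 field x 2 channels x 3 z-planes of 64x64 pixels for well A/01
    paths = [images_dir / f"A01_f000_c{c}_z{z}.tif" for c in range(2) for z in range(3)]
    data = [np.full((64, 64), i, dtype=np.uint16) for i in range(len(paths))]

    backend.save_batch(data, paths, chunk_name="A01",
                       n_fields=1, n_channels=2, n_z=3, row="A", col="01")
    planes = backend.load_batch(paths[:2])
    print(len(planes), planes[0].shape)  # expected: 2 (64, 64)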