Coverage for openhcs/io/zarr.py: 47.2%
602 statements
coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1# openhcs/io/zarr.py
2"""
3Zarr storage backend module for OpenHCS.
5This module provides a Zarr-backed implementation of the StorageBackend interface.
6It stores data in a Zarr store on disk and supports overlay operations
7for materializing data to disk when needed.
8"""
10import fnmatch
11import logging
12import threading
13from functools import wraps
14from pathlib import Path
15from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
17import numpy as np
18import zarr
20# Lazy ome-zarr loading to avoid dask → GPU library chain at import time
21_ome_zarr_state = {'available': None, 'cache': {}, 'event': threading.Event(), 'thread': None}
23logger = logging.getLogger(__name__)
26# Decorator for passthrough to disk backend
27def passthrough_to_disk(*extensions: str, ensure_parent_dir: bool = False):
28 """
29 Decorator to automatically passthrough certain file types to disk backend.
31 Zarr only supports array data, so non-array files (JSON, CSV, TXT, ROI.ZIP, etc.)
32 are automatically delegated to the disk backend.
34 Uses introspection to automatically find the path parameter (the first parameter with 'path' in its name).
36 Args:
37 *extensions: File extensions to passthrough (e.g., '.json', '.csv', '.txt')
38 ensure_parent_dir: If True, ensure parent directory exists before calling disk backend (for save operations)
40 Usage:
41 @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True)
42 def save(self, data, output_path, **kwargs):
43 # Zarr-specific save logic here
44 ...
45 """
46 import inspect
48 def decorator(method: Callable) -> Callable:
49 # Use introspection to find the path parameter index at decoration time
50 sig = inspect.signature(method)
51 path_param_index = None
53 for i, (param_name, param) in enumerate(sig.parameters.items()):
54 if param_name == 'self':
55 continue
56 # Find first parameter with 'path' in its name
57 if 'path' in param_name.lower():
58 # Adjust for self parameter (subtract 1 since we skip 'self' in args)
59 path_param_index = i - 1
60 break
62 if path_param_index is None:
63 raise ValueError(f"No path parameter found in {method.__name__} signature. "
64 f"Expected a parameter with 'path' in its name.")
66 @wraps(method)
67 def wrapper(self, *args, **kwargs):
68 # Extract path from args at the discovered index
69 path_arg = None
71 if len(args) > path_param_index:
72 arg = args[path_param_index]
73 if isinstance(arg, (str, Path)):
74 path_arg = str(arg)
76 # Check if path matches passthrough extensions
77 if path_arg and any(path_arg.endswith(ext) for ext in extensions):
78 from openhcs.constants.constants import Backend
79 from openhcs.io.backend_registry import get_backend_instance
80 disk_backend = get_backend_instance(Backend.DISK.value)
82 # Ensure parent directory exists if requested (for save operations)
83 if ensure_parent_dir:
84 parent_dir = Path(path_arg).parent
85 disk_backend.ensure_directory(parent_dir)
87 # Call the same method on disk backend
88 return getattr(disk_backend, method.__name__)(*args, **kwargs)
90 # Otherwise, call the original method
91 return method(self, *args, **kwargs)
93 return wrapper
94 return decorator
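A minimal, self-contained sketch of the same idea (toy names, not part of OpenHCS): the decorator inspects the wrapped method's signature once at decoration time, remembers where the path argument sits, and at call time routes matching extensions to a fallback handler instead of the wrapped method.

import inspect
from functools import wraps

def route_by_extension(*extensions, fallback=None):
    """Toy router: paths ending in `extensions` go to `fallback`, everything else to the method."""
    def decorator(method):
        sig = inspect.signature(method)
        # Index (into *args) of the first non-self parameter whose name contains 'path'
        path_index = next(i - 1 for i, name in enumerate(sig.parameters)
                          if name != 'self' and 'path' in name.lower())
        @wraps(method)
        def wrapper(self, *args, **kwargs):
            path = str(args[path_index]) if len(args) > path_index else ''
            if any(path.endswith(ext) for ext in extensions):
                return fallback(*args, **kwargs)
            return method(self, *args, **kwargs)
        return wrapper
    return decorator

class ToyBackend:
    @route_by_extension('.json', fallback=lambda path: f"disk handled {path}")
    def save(self, output_path):
        return f"zarr handled {output_path}"

assert ToyBackend().save("results.json") == "disk handled results.json"
assert ToyBackend().save("A01.tif") == "zarr handled A01.tif"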
97def _load_ome_zarr():
98 """Load ome-zarr and cache imports."""
99 try:
100 logger.info("Loading ome-zarr...")
101 from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata
102 from ome_zarr.io import parse_url
104 _ome_zarr_state['cache'] = {
105 'write_image': write_image,
106 'write_plate_metadata': write_plate_metadata,
107 'write_well_metadata': write_well_metadata,
108 'parse_url': parse_url
109 }
110 _ome_zarr_state['available'] = True
111 logger.info("ome-zarr loaded successfully")
112 except ImportError as e:
113 _ome_zarr_state['available'] = False
114 logger.warning(f"ome-zarr not available: {e}")
115 finally:
116 _ome_zarr_state['event'].set()
119def start_ome_zarr_loading_async():
120 """Start loading ome-zarr in background thread (safe to call multiple times)."""
121 if _ome_zarr_state['thread'] is None and _ome_zarr_state['available'] is None:
122 _ome_zarr_state['thread'] = threading.Thread(
123 target=_load_ome_zarr, daemon=True, name="ome-zarr-loader"
124 )
125 _ome_zarr_state['thread'].start()
126 logger.info("Started ome-zarr background loading")
129def _ensure_ome_zarr(timeout: float = 30.0):
130 """
131 Ensure ome-zarr is loaded, waiting for background load if needed.
133 Returns: Tuple of (write_image, write_plate_metadata, write_well_metadata, parse_url)
134 Raises: ImportError if ome-zarr not available, TimeoutError if loading times out
135 """
136 # Load synchronously if not started
137 if _ome_zarr_state['available'] is None and _ome_zarr_state['thread'] is None:
138 logger.warning("ome-zarr not pre-loaded, loading synchronously (will block)")
139 _load_ome_zarr()
141 # Wait for background loading
142 if not _ome_zarr_state['event'].is_set():
143 logger.info("Waiting for ome-zarr background loading...")
144 if not _ome_zarr_state['event'].wait(timeout):
145 raise TimeoutError(f"ome-zarr loading timed out after {timeout}s")
147 # Check availability
148 if not _ome_zarr_state['available']:
149 raise ImportError("ome-zarr library not available. Install with: pip install ome-zarr")
151 cache = _ome_zarr_state['cache']
152 return (cache['write_image'], cache['write_plate_metadata'],
153 cache['write_well_metadata'], cache['parse_url'])
155# Cross-platform file locking
156try:
157 import fcntl
158 FCNTL_AVAILABLE = True
159except ImportError:
160 import portalocker
161 FCNTL_AVAILABLE = False
163from openhcs.constants.constants import Backend
164from openhcs.io.base import StorageBackend
165from openhcs.io.exceptions import StorageResolutionError
168class ZarrStorageBackend(StorageBackend):
169 """Zarr storage backend with automatic registration."""
170 _backend_type = Backend.ZARR.value
171 """
172 Zarr storage backend implementation with configurable compression.
174 This class provides a concrete implementation of the storage backend interfaces
175 for Zarr storage. It stores data in a Zarr store on disk with configurable
176 compression algorithms and settings.
178 Features:
179 - Single-chunk batch operations for 40x performance improvement
180 - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none)
181 - Configurable compression levels
182 - Full path mapping for batch operations
183 """
185 def __init__(self, zarr_config: Optional['ZarrConfig'] = None):
186 """
187 Initialize Zarr backend with ZarrConfig.
189 Args:
190 zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None)
191 """
192 # Import here to avoid circular imports
193 from openhcs.core.config import ZarrConfig
195 if zarr_config is None:
196 zarr_config = ZarrConfig()
198 self.config = zarr_config
200 # Convenience attributes
201 self.compression_level = zarr_config.compression_level
203 # Create actual compressor from config (shuffle always enabled for Blosc)
204 self.compressor = self.config.compressor.create_compressor(
205 self.config.compression_level,
206 shuffle=True # Always enable shuffle for better compression
207 )
209 def _get_compressor(self) -> Optional[Any]:
210 """
211 Get the configured compressor with appropriate settings.
213 Returns:
214 Configured compressor instance or None for no compression
215 """
216 if self.compressor is None:
217 return None
219 # If compression_level is specified and compressor supports it
220 if self.compression_level is not None:
221 # Check if compressor has level parameter
222 if hasattr(self.compressor, '__class__'):
223 try:
224 # Create new instance with compression level
225 compressor_class = self.compressor.__class__
226 if 'level' in compressor_class.__init__.__code__.co_varnames:
227 return compressor_class(level=self.compression_level)
228 elif 'clevel' in compressor_class.__init__.__code__.co_varnames:
229 return compressor_class(clevel=self.compression_level)
230 except (AttributeError, TypeError):
231 # Fall back to original compressor if level setting fails
232 pass
234 return self.compressor
236 def _calculate_chunks(self, data_shape: Tuple[int, ...]) -> Tuple[int, ...]:
237 """
238 Calculate chunk shape based on configured strategy.
240 Args:
241 data_shape: Shape of the 5D array (fields, channels, z, y, x)
243 Returns:
244 Chunk shape tuple
245 """
246 from openhcs.core.config import ZarrChunkStrategy
248 match self.config.chunk_strategy:
249 case ZarrChunkStrategy.WELL:
250 # Single chunk for entire well (current behavior, optimal for batch I/O)
251 return data_shape
252 case ZarrChunkStrategy.FILE:
253 # One chunk per individual file: (1, 1, 1, y, x)
254 # Each original tif is compressed separately
255 return (1, 1, 1, data_shape[3], data_shape[4])
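For a well stacked as (fields, channels, z, y, x) = (9, 2, 5, 2048, 2048), the two strategies yield the following chunk shapes (illustrative values):

data_shape = (9, 2, 5, 2048, 2048)
# ZarrChunkStrategy.WELL -> (9, 2, 5, 2048, 2048)   one chunk per well, optimal for batch I/O
# ZarrChunkStrategy.FILE -> (1, 1, 1, 2048, 2048)   one chunk per original image, finer random access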
257 def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]:
258 """
259 Split path into zarr store and key.
261 The zarr store is always the directory containing the image files, regardless of backend.
262 For example:
263 - "/path/to/plate_outputs/images/A01.tif" → Store: "/path/to/plate_outputs/images", Key: "A01.tif"
264 - "/path/to/plate.zarr/images/A01.tif" → Store: "/path/to/plate.zarr/images", Key: "A01.tif"
266 The images directory itself becomes the zarr store - zarr files are added within it.
267 A zarr store doesn't need to have a folder name ending in .zarr.
269 Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility.
270 """
271 path = Path(path)
273 # If path has a file extension (like .tif), the parent directory is the zarr store
274 if path.suffix:
275 # File path - parent directory (e.g., "images") is the zarr store
276 store_path = path.parent
277 relative_key = path.name
278 else:
279 # Directory path - treat as zarr store
280 store_path = path
281 relative_key = ""
283 # CRITICAL: Create DirectoryStore with dimension_separator='/' for OME-ZARR compatibility
284 # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0')
285 store = zarr.DirectoryStore(str(store_path), dimension_separator='/')
286 return store, relative_key
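A hedged example of the split for a typical image path (paths hypothetical):

backend = ZarrStorageBackend()
store, key = backend._split_store_and_key("/data/plate_outputs/images/A01.tif")
# store -> zarr.DirectoryStore rooted at /data/plate_outputs/images, dimension_separator='/'
# key   -> "A01.tif"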
288 @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True)
289 def save(self, data: Any, output_path: Union[str, Path], **kwargs):
290 """
291 Save data to Zarr at the given output_path.
293 Will only write if the key does not already exist.
294 Will NOT overwrite or delete existing data.
296 Raises:
297 FileExistsError: If destination key already exists
298 StorageResolutionError: If creation fails
299 """
300 # Zarr-specific save logic (non-array files automatically passthrough to disk)
301 store, key = self._split_store_and_key(output_path)
302 group = zarr.group(store=store)
304 if key in group:
305 raise FileExistsError(f"Zarr key already exists: {output_path}")
307 chunks = kwargs.get("chunks")
308 if chunks is None:
309 chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1))
311 try:
312 # Create array with correct shape and dtype, then assign data
313 array = group.create_dataset(
314 name=key,
315 shape=data.shape,
316 dtype=data.dtype,
317 chunks=chunks,
318 compressor=kwargs.get("compressor", self._get_compressor()),
319 overwrite=False # 🔒 Must be False by doctrine
320 )
321 array[:] = data
322 except Exception as e:
323 raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e
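Usage sketch (paths and shapes are placeholders): an array goes through the zarr path below, while a '.json' path would be caught by the passthrough decorator and written via the disk backend.

import numpy as np

backend = ZarrStorageBackend()
backend.save(np.zeros((16, 512, 512), dtype=np.uint16), "/data/plate_outputs/images/A01.tif")
# backend.save(metrics_dict, "/data/plate_outputs/metrics.json")  # hypothetical: would passthrough to disk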
325 def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
326 """
327 Load from zarr array using filename mapping.
329 Args:
330 file_paths: List of file paths to load
331 **kwargs: Additional arguments (zarr_config not needed)
333 Returns:
334 List of loaded data objects in same order as file_paths
336 Raises:
337 FileNotFoundError: If expected zarr store not found
338 KeyError: If filename not found in filename_map
339 """
340 if not file_paths:
341 return []
343 # Use _split_store_and_key to get store path from first file path
344 store, _ = self._split_store_and_key(file_paths[0])
345 store_path = Path(store.path)
347 # FAIL LOUD: Store must exist
348 if not store_path.exists():
349 raise FileNotFoundError(f"Expected zarr store not found: {store_path}")
350 root = zarr.open_group(store=store, mode='r')
352 # Group files by well based on OME-ZARR structure
353 well_to_files = {}
354 well_to_indices = {}
356 # Search OME-ZARR structure for requested files
357 for row_name in root.group_keys():
358 if len(row_name) == 1 and row_name.isalpha(): # Row directory (A, B, etc.)
359 row_group = root[row_name]
360 for col_name in row_group.group_keys():
361 if col_name.isdigit(): # Column directory (01, 02, etc.)
362 well_group = row_group[col_name]
363 well_name = f"{row_name}{col_name}"
365 # Check if this well has our filename mapping in the field array
366 if "0" in well_group.group_keys():
367 field_group = well_group["0"]
368 if "0" in field_group.array_keys():
369 field_array = field_group["0"]
370 if "openhcs_filename_map" in field_array.attrs:
371 filename_map = dict(field_array.attrs["openhcs_filename_map"])
373 # Check which requested files are in this well
374 for i, path in enumerate(file_paths):
375 filename = Path(path).name # Use filename only for matching
376 if filename in filename_map:
377 if well_name not in well_to_files:
378 well_to_files[well_name] = []
379 well_to_indices[well_name] = []
380 well_to_files[well_name].append(i) # Original position in file_paths
381 well_to_indices[well_name].append(filename_map[filename]) # 5D coordinates (field, channel, z)
383 # Load data from each well using single well chunk
384 results = [None] * len(file_paths) # Pre-allocate results array
386 for well_name, file_positions in well_to_files.items():
387 row, col = well_name[0], well_name[1:]
388 well_group = root[row][col]
389 well_indices = well_to_indices[well_name]
391 # Load entire well field array in single operation (well chunking)
392 field_group = well_group["0"]
393 field_array = field_group["0"]
394 all_well_data = field_array[:] # Single I/O operation for entire well
396 # Extract requested 2D slices using 5D coordinates
397 for file_pos, coords_5d in zip(file_positions, well_indices):
398 field_idx, channel_idx, z_idx = coords_5d
399 # Extract 2D slice: (field, channel, z, y, x) -> (y, x)
400 results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :] # 2D slice
402 logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells")
403 return results
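Usage sketch continuing the examples above (filenames hypothetical): each well's 5D array is read once and the requested 2D planes are sliced out of it, so the number of zarr reads scales with wells, not with files.

paths = [
    "/data/plate_outputs/images/A01_s1_c1_z1.tif",
    "/data/plate_outputs/images/A01_s1_c2_z1.tif",
]
images = backend.load_batch(paths)   # list of 2D arrays, same order as `paths`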
405 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
406 """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support.
408 Args:
409 data_list: List of image data to save
410 output_paths: List of output file paths
411 **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col
412 """
414 # Ensure ome-zarr is loaded (waits for background load if needed)
415 write_image, write_plate_metadata, write_well_metadata, _ = _ensure_ome_zarr()
417 # Extract required parameters from kwargs
418 chunk_name = kwargs.get('chunk_name')
419 n_channels = kwargs.get('n_channels')
420 n_z = kwargs.get('n_z')
421 n_fields = kwargs.get('n_fields')
422 row = kwargs.get('row')
423 col = kwargs.get('col')
425 # Validate required parameters
426 if chunk_name is None:
427 raise ValueError("chunk_name must be provided")
428 if n_channels is None:
429 raise ValueError("n_channels must be provided")
430 if n_z is None:
431 raise ValueError("n_z must be provided")
432 if n_fields is None:
433 raise ValueError("n_fields must be provided")
434 if row is None:
435 raise ValueError("row must be provided")
436 if col is None:
437 raise ValueError("col must be provided")
439 if not data_list:
440 logger.warning(f"Empty data list for chunk {chunk_name}")
441 return
443 if not _ome_zarr_state['available']:
444 raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr")
446 # Use _split_store_and_key to get store path from first output path
447 store, _ = self._split_store_and_key(output_paths[0])
448 store_path = Path(store.path)
450 logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}")
452 # Convert GPU arrays to CPU arrays before saving
453 cpu_data_list = []
454 for data in data_list:
455 if hasattr(data, 'get'): # CuPy array
456 cpu_data_list.append(data.get())
457 elif hasattr(data, 'cpu'): # PyTorch tensor
458 cpu_data_list.append(data.cpu().numpy())
459 elif hasattr(data, 'device') and 'cuda' in str(data.device).lower(): # JAX on GPU
460 import jax
461 cpu_data_list.append(jax.device_get(data))
462 else: # Already CPU array (NumPy, etc.)
463 cpu_data_list.append(data)
465 # Ensure parent directory exists
466 store_path.parent.mkdir(parents=True, exist_ok=True)
468 # Use _split_store_and_key to get properly configured store with dimension_separator='/'
469 store, _ = self._split_store_and_key(store_path)
470 root = zarr.group(store=store) # Open existing or create new group without mode conflicts
472 # Set OME metadata if not already present
473 if "ome" not in root.attrs:
474 root.attrs["ome"] = {"version": "0.4"}
476 # Get the store for compatibility with existing code
477 store = root.store
479 # Write plate metadata with locking to prevent concurrent corruption
480 # Always enabled for OME-ZARR HCS compliance
481 self._ensure_plate_metadata_with_lock(root, row, col, store_path)
483 # Create HCS-compliant structure: plate/row/col/field/resolution
484 # Create row group if it doesn't exist
485 if row not in root:
486 row_group = root.create_group(row)
487 else:
488 row_group = root[row]
490 # Create well group (remove existing if present to allow overwrite)
491 if col in row_group:
492 del row_group[col]
493 well_group = row_group.create_group(col)
495 # Add HCS well metadata
496 well_metadata = {
497 "images": [
498 {
499 "path": "0", # Single image containing all fields
500 "acquisition": 0
501 }
502 ],
503 "version": "0.5"
504 }
505 well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata}
507 # Create field group (single field "0" containing all field data)
508 field_group = well_group.require_group("0")
510 # Always use full 5D structure: (fields, channels, z, y, x)
511 # Define OME-NGFF compliant axes
512 axes = [
513 {'name': 'field', 'type': 'field'}, # Custom field type - allowed before space
514 {'name': 'c', 'type': 'channel'},
515 {'name': 'z', 'type': 'space'},
516 {'name': 'y', 'type': 'space'},
517 {'name': 'x', 'type': 'space'}
518 ]
520 # Get image dimensions
521 sample_image = cpu_data_list[0]
522 height, width = sample_image.shape[-2:]
524 # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x)
525 target_shape = [n_fields, n_channels, n_z, height, width]
527 # Stack and reshape data
528 stacked_data = np.stack(cpu_data_list, axis=0)
530 # Calculate total expected images for validation
531 total_expected = n_fields * n_channels * n_z
532 if len(data_list) != total_expected:
533 logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} "
534 f"(fields={n_fields}, channels={n_channels}, z={n_z})")
536 # Log detailed shape information before reshape
537 logger.info(f"🔍 ZARR RESHAPE DEBUG:")
538 logger.info(f" - Input: {len(data_list)} images")
539 logger.info(f" - Stacked shape: {stacked_data.shape}")
540 logger.info(f" - Stacked size: {stacked_data.size}")
541 logger.info(f" - Target shape: {target_shape}")
542 logger.info(f" - Target size: {np.prod(target_shape)}")
543 logger.info(f" - Sample image shape: {sample_image.shape}")
544 logger.info(f" - Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, h={height}, w={width}")
546 # Always reshape to 5D structure
547 reshaped_data = stacked_data.reshape(target_shape)
549 logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape} → {reshaped_data.shape}")
550 axes_names = [ax['name'] for ax in axes]
551 logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}")
553 # Create field group (single field "0" containing all field data)
554 if "0" in well_group: 554 ↛ 557line 554 didn't jump to line 557 because the condition on line 554 was always true
555 field_group = well_group["0"]
556 else:
557 field_group = well_group.create_group("0")
559 # Write OME-ZARR well metadata with single field (well-chunked approach)
560 write_well_metadata(well_group, ['0'])
562 # Calculate chunks based on configured strategy
563 storage_options = {
564 "chunks": self._calculate_chunks(reshaped_data.shape),
565 "compressor": self._get_compressor()
566 }
568 # Write as single well-chunked array with proper multi-dimensional axes
569 write_image(
570 image=reshaped_data,
571 group=field_group,
572 axes=axes,
573 storage_options=storage_options,
574 scaler=None, # Single scale only for performance
575 compute=True
576 )
578 # Axes are already correctly set by write_image function
580 # Store filename mapping with 5D coordinates (field, channel, z, y, x)
581 # Convert flat index to 5D coordinates for proper zarr slicing
582 filename_map = {}
583 for i, path in enumerate(output_paths):
584 # Calculate 5D coordinates from flat index
585 field_idx = i // (n_channels * n_z)
586 remaining = i % (n_channels * n_z)
587 channel_idx = remaining // n_z
588 z_idx = remaining % n_z
590 # Store as tuple (field, channel, z) - y,x are full slices
591 filename_map[Path(path).name] = (field_idx, channel_idx, z_idx)
593 field_array = field_group['0']
594 field_array.attrs["openhcs_filename_map"] = filename_map
595 field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths]
596 field_array.attrs["openhcs_dimensions"] = {
597 "n_fields": n_fields,
598 "n_channels": n_channels,
599 "n_z": n_z
600 }
602 logger.debug(f"Successfully saved batch for chunk {chunk_name}")
604 # Aggressive memory cleanup
605 del cpu_data_list
606 import gc
607 gc.collect()
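The flat index used for the filename map above is field-major, then channel, then z. A small check of that arithmetic (values illustrative):

n_channels, n_z = 2, 3

def flat_to_coords(i):
    return (i // (n_channels * n_z), (i % (n_channels * n_z)) // n_z, i % n_z)

assert flat_to_coords(0) == (0, 0, 0)   # first image: field 0, channel 0, z 0
assert flat_to_coords(5) == (0, 1, 2)   # last image of field 0
assert flat_to_coords(7) == (1, 0, 1)   # field 1, channel 0, second z-plane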
609 def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None:
610 """Ensure plate-level metadata includes ALL existing wells with file locking."""
611 lock_path = store_path.with_suffix('.metadata.lock')
613 try:
614 with open(lock_path, 'w') as lock_file:
615 if FCNTL_AVAILABLE:
616 fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
617 else:
618 portalocker.lock(lock_file, portalocker.LOCK_EX)
619 self._ensure_plate_metadata(root, row, col)
620 except Exception as e:
621 logger.error(f"Failed to update plate metadata with lock: {e}")
622 raise
623 finally:
624 if lock_path.exists():
625 lock_path.unlink()
627 def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None:
628 """Ensure plate-level metadata includes ALL existing wells in the store."""
630 # Ensure ome-zarr is loaded
631 _, write_plate_metadata, _, _ = _ensure_ome_zarr()
633 # Scan the store for all existing wells
634 all_rows = set()
635 all_cols = set()
636 all_wells = []
638 for row_name in root.group_keys():
639 if isinstance(root[row_name], zarr.Group): # Ensure it's a row group
640 row_group = root[row_name]
641 all_rows.add(row_name)
643 for col_name in row_group.group_keys():
644 if isinstance(row_group[col_name], zarr.Group): # Ensure it's a well group
645 all_cols.add(col_name)
646 well_path = f"{row_name}/{col_name}"
647 all_wells.append(well_path)
649 # Include the current well being added (might not exist yet)
650 all_rows.add(row)
651 all_cols.add(col)
652 current_well_path = f"{row}/{col}"
653 if current_well_path not in all_wells:
654 all_wells.append(current_well_path)
656 # Sort for consistent ordering
657 sorted_rows = sorted(all_rows)
658 sorted_cols = sorted(all_cols)
659 sorted_wells = sorted(all_wells)
661 # Build wells metadata with proper indices
662 wells_metadata = []
663 for well_path in sorted_wells:
664 well_row, well_col = well_path.split("/")
665 row_index = sorted_rows.index(well_row)
666 col_index = sorted_cols.index(well_col)
667 wells_metadata.append({
668 "path": well_path,
669 "rowIndex": row_index,
670 "columnIndex": col_index
671 })
673 # Add acquisition metadata for HCS compliance
674 acquisitions = [
675 {
676 "id": 0,
677 "name": "default_acquisition",
678 "maximumfieldcount": 1 # Single field containing all field data
679 }
680 ]
682 # Write complete HCS plate metadata
683 write_plate_metadata(
684 root,
685 sorted_rows,
686 sorted_cols,
687 wells_metadata,
688 acquisitions=acquisitions,
689 field_count=1,
690 name="OpenHCS_Plate"
691 )
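For example, if the store already holds wells A/01 and B/03 and well A/02 is being written, the rebuilt plate metadata would be (illustrative):

# rows    = ['A', 'B']
# columns = ['01', '02', '03']
# wells   = [
#     {'path': 'A/01', 'rowIndex': 0, 'columnIndex': 0},
#     {'path': 'A/02', 'rowIndex': 0, 'columnIndex': 1},
#     {'path': 'B/03', 'rowIndex': 1, 'columnIndex': 2},
# ]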
698 def load(self, file_path: Union[str, Path], **kwargs) -> Any:
699 """
700 Load a single file from zarr store.
702 For OME-ZARR structure with filename mapping, delegates to load_batch.
703 For legacy flat structure or direct keys, uses direct key lookup.
705 Args:
706 file_path: Path to file to load
707 **kwargs: Additional arguments
709 Returns:
710 Loaded array data
712 Raises:
713 FileNotFoundError: If file not found in zarr store
714 """
715 store, key = self._split_store_and_key(file_path)
716 group = zarr.group(store=store)
718 # Check if this is OME-ZARR structure with filename mapping
719 if "plate" in group.attrs:
720 # OME-ZARR structure: use load_batch which understands filename mapping
721 result = self.load_batch([file_path], **kwargs)
722 if not result:
723 raise FileNotFoundError(f"File not found in OME-ZARR store: {file_path}")
724 return result[0]
726 # Legacy flat structure: direct key lookup with symlink resolution
727 visited = set()
728 while self.is_symlink(key):
729 if key in visited:
730 raise RuntimeError(f"Zarr symlink loop detected at {key}")
731 visited.add(key)
732 key = group[key].attrs["_symlink"]
734 if key not in group:
735 raise FileNotFoundError(f"No array found at key '{key}'")
736 return group[key][:]
738 def list_files(self,
739 directory: Union[str, Path],
740 pattern: Optional[str] = None,
741 extensions: Optional[Set[str]] = None,
742 recursive: bool = False) -> List[Path]:
743 """
744 List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered.
745 Returns filenames from array attributes (output_paths) if available.
746 """
748 store, relative_key = self._split_store_and_key(directory)
749 result: List[Path] = []
751 def _matches_filters(name: str) -> bool:
752 if pattern and not fnmatch.fnmatch(name, pattern):
753 return False
754 if extensions:
755 return any(name.lower().endswith(ext.lower()) for ext in extensions)
756 return True
758 try:
759 # Open zarr group and traverse OME-ZARR structure
760 group = zarr.open_group(store=store)
762 # Check if this is OME-ZARR structure (has plate metadata)
763 if "plate" in group.attrs: 763 ↛ 785line 763 didn't jump to line 785 because the condition on line 763 was always true
764 # OME-ZARR structure: traverse A/01/ wells
765 for row_name in group.group_keys():
766 if len(row_name) == 1 and row_name.isalpha(): # Row directory (A, B, etc.) 766 ↛ 765line 766 didn't jump to line 765 because the condition on line 766 was always true
767 row_group = group[row_name]
768 for col_name in row_group.group_keys():
769 if col_name.isdigit(): # Column directory (01, 02, etc.) 769 ↛ 768line 769 didn't jump to line 768 because the condition on line 769 was always true
770 well_group = row_group[col_name]
772 # Get filenames from field array metadata
773 if "0" in well_group.group_keys(): 773 ↛ 768line 773 didn't jump to line 768 because the condition on line 773 was always true
774 field_group = well_group["0"]
775 if "0" in field_group.array_keys(): 775 ↛ 768line 775 didn't jump to line 768 because the condition on line 775 was always true
776 field_array = field_group["0"]
777 if "openhcs_output_paths" in field_array.attrs: 777 ↛ 768line 777 didn't jump to line 768 because the condition on line 777 was always true
778 output_paths = field_array.attrs["openhcs_output_paths"]
779 for filename in output_paths:
780 filename_only = Path(filename).name
781 if _matches_filters(filename_only): 781 ↛ 779line 781 didn't jump to line 779 because the condition on line 781 was always true
782 result.append(Path(filename))
783 else:
784 # Legacy flat structure: get array keys directly
785 array_keys = list(group.array_keys())
786 for array_key in array_keys:
787 try:
788 array = group[array_key]
789 if "output_paths" in array.attrs:
790 # Get original filenames from array attributes
791 output_paths = array.attrs["output_paths"]
792 for filename in output_paths:
793 filename_only = Path(filename).name
794 if _matches_filters(filename_only):
795 result.append(Path(filename))
797 except Exception as e:
798 # Skip arrays that can't be accessed
799 continue
801 except Exception as e:
802 raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e
804 return result
806 def list_dir(self, path: Union[str, Path]) -> List[str]:
807 store, relative_key = self._split_store_and_key(path)
809 # Normalize key for Zarr API
810 key = relative_key.rstrip("/")
812 try:
813 # Zarr 3.x uses async API - convert async generator to list
814 import asyncio
815 async def _get_entries():
816 entries = []
817 async for entry in store.list_dir(key):
818 entries.append(entry)
819 return entries
820 return asyncio.run(_get_entries())
821 except KeyError:
822 raise NotADirectoryError(f"Zarr path is not a directory: {path}")
823 except FileNotFoundError:
824 raise FileNotFoundError(f"Zarr path does not exist: {path}")
827 def delete(self, path: Union[str, Path]) -> None:
828 """
829 Delete a Zarr array (file) or empty group (directory) at the given path.
831 Args:
832 path: Zarr path or URI
834 Raises:
835 FileNotFoundError: If path does not exist
836 IsADirectoryError: If path is a non-empty group
837 StorageResolutionError: For unexpected failures
838 """
839 import zarr
840 import shutil
841 import os
843 # Passthrough to disk backend for text files (JSON, CSV, TXT)
844 path_str = str(path)
845 if path_str.endswith(('.json', '.csv', '.txt')):
846 from openhcs.io.backend_registry import get_backend_instance
847 disk_backend = get_backend_instance(Backend.DISK.value)
848 return disk_backend.delete(path)
850 path = str(path)
852 if not os.path.exists(path):
853 raise FileNotFoundError(f"Zarr path does not exist: {path}")
855 try:
856 zarr_obj = zarr.open(path, mode='r')
857 except Exception as e:
858 raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e
860 # Determine if it's a file (array) or directory (group)
861 if isinstance(zarr_obj, zarr.core.Array):
862 try:
863 shutil.rmtree(path) # Array folders can be deleted directly
864 except Exception as e:
865 raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e
867 elif isinstance(zarr_obj, zarr.hierarchy.Group):
868 if os.listdir(path):
869 raise IsADirectoryError(f"Zarr group is not empty: {path}")
870 try:
871 os.rmdir(path)
872 except Exception as e:
873 raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e
874 else:
875 raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}")
877 def delete_all(self, path: Union[str, Path]) -> None:
878 """
879 Recursively delete a Zarr array or group (file or directory).
881 This is the only permitted recursive deletion method for the Zarr backend.
883 Args:
884 path: The path shared across all backends
886 Raises:
887 FileNotFoundError: If the path does not exist
888 StorageResolutionError: If deletion fails
889 """
890 import os
891 import shutil
893 path = str(path)
895 if not os.path.exists(path):
896 raise FileNotFoundError(f"Zarr path does not exist: {path}")
898 try:
899 shutil.rmtree(path)
900 except Exception as e:
901 raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e
903 @passthrough_to_disk('.json', '.csv', '.txt')
904 def exists(self, path: Union[str, Path]) -> bool:
905 # Zarr-specific existence check (text files automatically passthrough to disk)
906 path = Path(path)
908 # If path has no file extension, treat as directory existence check
909 # This handles auto_detect_patterns asking "does this directory exist?"
910 if not path.suffix:
911 return path.exists()
913 # Otherwise, check zarr key existence (for actual files)
914 store, key = self._split_store_and_key(path)
916 # First check if the zarr store itself exists
917 if isinstance(store, str):
918 store_path = Path(store)
919 if not store_path.exists():
920 return False
922 try:
923 root_group = zarr.group(store=store)
924 return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys())
925 except Exception:
926 # If we can't open the zarr store, it doesn't exist
927 return False
929 def ensure_directory(self, directory: Union[str, Path]) -> Path:
930 """
931 No-op for zarr backend - zarr stores handle their own structure.
933 Zarr doesn't have filesystem directories that need to be "ensured".
934 Store creation and group structure is handled by save operations.
935 """
936 return Path(directory)
938 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
939 store, src_key = self._split_store_and_key(source)
940 store2, dst_key = self._split_store_and_key(link_name)
942 if store.root != store2.root:
943 raise ValueError("Symlinks must exist within the same .zarr store")
945 group = zarr.group(store=store)
946 if src_key not in group:
947 raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store")
949 if dst_key in group:
950 if not overwrite:
951 raise FileExistsError(f"Symlink target already exists at: {dst_key}")
952 # Remove existing entry if overwrite=True
953 del group[dst_key]
955 # Create a new group at the symlink path
956 link_group = group.require_group(dst_key)
957 link_group.attrs["_symlink"] = src_key # Store as declared string
959 def is_symlink(self, path: Union[str, Path]) -> bool:
960 """
961 Check if the given Zarr path represents a logical symlink (based on attribute contract).
963 Returns:
964 bool: True if the key exists and has an OpenHCS-declared symlink attribute
965 False if the key doesn't exist or is not a symlink
966 """
967 store, key = self._split_store_and_key(path)
968 group = zarr.group(store=store)
970 try:
971 obj = group[key]
972 attrs = getattr(obj, "attrs", {})
974 if "_symlink" not in attrs:
975 return False
977 # Enforce that the _symlink attr matches schema (e.g. str or list of path components)
978 if not isinstance(attrs["_symlink"], str):
979 raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}")
981 return True
982 except KeyError:
983 # Key doesn't exist, so it's not a symlink
984 return False
985 except Exception as e:
986 raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e
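The symlink convention in one picture: a symlink is just a zarr group whose attrs carry a `_symlink` string naming another key in the same store. A hedged sketch continuing the earlier examples (paths hypothetical):

backend.create_symlink("/data/store/images/A01.tif", "/data/store/images/A01_link.tif")
backend.is_symlink("/data/store/images/A01_link.tif")   # True
# Under the hood: group["A01_link.tif"].attrs["_symlink"] == "A01.tif"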
988 def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]:
989 shape = data.shape
991 # Simple logic: divide each dim by chunk_divisor (default 1 = full-size chunks), with min 1
992 return tuple(max(1, s // chunk_divisor) for s in shape)
994 def is_file(self, path: Union[str, Path]) -> bool:
995 """
996 Check if a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks.
998 Args:
999 path: Zarr store path (may point to key within store)
1001 Returns:
1002 bool: True if resolved path is a Zarr array
1004 Raises:
1005 FileNotFoundError: If path does not exist or broken symlink
1006 IsADirectoryError: If resolved object is a Zarr group
1007 StorageResolutionError: For other failures
1008 """
1009 path = str(path)
1011 if not Path(path).exists():
1012 raise FileNotFoundError(f"Zarr path does not exist: {path}")
1014 try:
1015 store, key = self._split_store_and_key(path)
1016 group = zarr.group(store=store)
1018 # Resolve symlinks (Zarr-native, via .attrs)
1019 seen_keys = set()
1020 while True:
1021 if key not in group:
1022 raise FileNotFoundError(f"Zarr key does not exist: {key}")
1023 obj = group[key]
1025 if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
1026 if key in seen_keys:
1027 raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
1028 seen_keys.add(key)
1029 key = obj.attrs["_symlink"]
1030 continue
1031 break # resolution complete
1033 # Now obj is the resolved target
1034 if isinstance(obj, zarr.core.Array):
1035 return True
1036 elif isinstance(obj, zarr.hierarchy.Group):
1037 raise IsADirectoryError(f"Zarr path is a group (directory): {path}")
1038 else:
1039 raise StorageResolutionError(f"Unknown Zarr object at: {path}")
1041 except Exception as e:
1042 raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e
1044 def is_dir(self, path: Union[str, Path]) -> bool:
1045 """
1046 Check if a Zarr path resolves to a directory (i.e., a Zarr group).
1048 Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink'].
1050 Args:
1051 path: Zarr path or URI
1053 Returns:
1054 bool: True if path resolves to a Zarr group
1056 Raises:
1057 FileNotFoundError: If path or resolved target does not exist
1058 NotADirectoryError: If resolved target is not a group
1059 StorageResolutionError: For symlink cycles or other failures
1060 """
1061 import os
1064 path = str(path)
1066 if not os.path.exists(path):
1067 raise FileNotFoundError(f"Zarr path does not exist: {path}")
1069 try:
1070 store, key = self._split_store_and_key(path)
1071 group = zarr.group(store=store)
1073 seen_keys = set()
1075 # Resolve symlink chain
1076 while True:
1077 if key not in group:
1078 raise FileNotFoundError(f"Zarr key does not exist: {key}")
1079 obj = group[key]
1081 if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
1082 if key in seen_keys:
1083 raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
1084 seen_keys.add(key)
1085 key = obj.attrs["_symlink"]
1086 continue
1087 break
1089 # obj is resolved
1090 if isinstance(obj, zarr.hierarchy.Group):
1091 return True
1092 elif isinstance(obj, zarr.core.Array):
1093 raise NotADirectoryError(f"Zarr path is an array (file): {path}")
1094 else:
1095 raise StorageResolutionError(f"Unknown Zarr object at: {path}")
1097 except Exception as e:
1098 raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e
1100 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
1101 """
1102 Move a Zarr key or object (array/group) from one location to another, resolving symlinks.
1104 Supports:
1105 - Disk or memory stores
1106 - Zarr-native symlinks
1107 - Key renames within group
1108 - Full copy+delete across stores if needed
1110 Raises:
1111 FileNotFoundError: If src does not exist
1112 FileExistsError: If dst already exists
1113 StorageResolutionError: On failure
1114 """
1115 import zarr
1117 src_store, src_key = self._split_store_and_key(src)
1118 dst_store, dst_key = self._split_store_and_key(dst)
1120 src_group = zarr.group(store=src_store)
1121 dst_group = zarr.group(store=dst_store)
1123 if src_key not in src_group:
1124 raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
1125 if dst_key in dst_group:
1126 raise FileExistsError(f"Zarr destination key already exists: {dst_key}")
1128 obj = src_group[src_key]
1130 # Resolve symlinks if present
1131 seen_keys = set()
1132 while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
1133 if src_key in seen_keys:
1134 raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
1135 seen_keys.add(src_key)
1136 src_key = obj.attrs["_symlink"]
1137 obj = src_group[src_key]
1139 try:
1140 if src_store is dst_store:
1141 # Native move within the same Zarr group/store
1142 src_group.move(src_key, dst_key)
1143 else:
1144 # Cross-store: perform manual copy + delete
1145 obj.copy(dst_group, name=dst_key)
1146 del src_group[src_key]
1147 except Exception as e:
1148 raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e
1150 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
1151 """
1152 Copy a Zarr key or object (array/group) from one location to another.
1154 - Resolves Zarr-native symlinks before copying
1155 - Prevents overwrite unless explicitly allowed (future feature)
1156 - Works across memory or disk stores
1158 Raises:
1159 FileNotFoundError: If src does not exist
1160 FileExistsError: If dst already exists
1161 StorageResolutionError: On failure
1162 """
1163 import zarr
1165 src_store, src_key = self._split_store_and_key(src)
1166 dst_store, dst_key = self._split_store_and_key(dst)
1168 src_group = zarr.group(store=src_store)
1169 dst_group = zarr.group(store=dst_store)
1171 if src_key not in src_group:
1172 raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
1173 if dst_key in dst_group:
1174 raise FileExistsError(f"Zarr destination key already exists: {dst_key}")
1176 obj = src_group[src_key]
1178 seen_keys = set()
1179 while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
1180 if src_key in seen_keys:
1181 raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
1182 seen_keys.add(src_key)
1183 src_key = obj.attrs["_symlink"]
1184 obj = src_group[src_key]
1186 try:
1187 obj.copy(dst_group, name=dst_key)
1188 except Exception as e:
1189 raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e
1191 def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
1192 """
1193 Return structural metadata about a Zarr path.
1195 Returns:
1196 dict with keys:
1197 - 'type': 'file', 'directory', 'symlink', or 'missing'
1198 - 'key': final resolved key
1199 - 'target': symlink target if applicable
1200 - 'store': repr(store)
1201 - 'exists': bool
1203 Raises:
1204 StorageResolutionError: On resolution failure
1205 """
1206 store, key = self._split_store_and_key(path)
1207 group = zarr.group(store=store)
1209 try:
1210 if key in group:
1211 obj = group[key]
1212 attrs = getattr(obj, "attrs", {})
1213 is_link = "_symlink" in attrs
1215 if is_link:
1216 target = attrs["_symlink"]
1217 if not isinstance(target, str):
1218 raise StorageResolutionError(f"Invalid symlink format at {key}")
1219 return {
1220 "type": "symlink",
1221 "key": key,
1222 "target": target,
1223 "store": repr(store),
1224 "exists": target in group
1225 }
1227 if isinstance(obj, zarr.Array):
1228 return {
1229 "type": "file",
1230 "key": key,
1231 "store": repr(store),
1232 "exists": True
1233 }
1235 elif isinstance(obj, zarr.Group):
1236 return {
1237 "type": "directory",
1238 "key": key,
1239 "store": repr(store),
1240 "exists": True
1241 }
1243 raise StorageResolutionError(f"Unknown object type at: {key}")
1244 else:
1245 return {
1246 "type": "missing",
1247 "key": key,
1248 "store": repr(store),
1249 "exists": False
1250 }
1252 except Exception as e:
1253 raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e
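What `stat` might return for a symlink key (illustrative values; the store repr depends on the zarr version):

backend.stat("/data/store/images/A01_link.tif")
# {'type': 'symlink', 'key': 'A01_link.tif', 'target': 'A01.tif',
#  'store': '<DirectoryStore ...>', 'exists': True}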
1255class ZarrSymlink:
1256 """
1257 Represents a symbolic link in a Zarr store.
1259 It stores the target path (key) of the symlink.
1261 """
1262 def __init__(self, target: str):
1263 self.target = target
1265 def __repr__(self):
1266 return f"<ZarrSymlink → {self.target}>"