Coverage for src/polystore/zarr.py: 20% (600 statements)
coverage.py v7.11.0, created at 2025-11-03 06:58 +0000

# src/polystore/zarr.py

"""
Zarr storage backend module for OpenHCS.

This module provides a Zarr-backed implementation of the StorageBackend
interface. It stores data in a Zarr store on disk and supports overlay
operations for materializing data to disk when needed.
"""

import fnmatch
import logging
import threading
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import numpy as np
import zarr

# Lazy ome-zarr loading to avoid pulling the dask → GPU library chain at import time
_ome_zarr_state = {'available': None, 'cache': {}, 'event': threading.Event(), 'thread': None}

logger = logging.getLogger(__name__)


# Decorator for passthrough to disk backend
def passthrough_to_disk(*extensions: str, ensure_parent_dir: bool = False):
    """
    Decorator that automatically passes certain file types through to the disk backend.

    Zarr only supports array data, so non-array files (JSON, CSV, TXT, ROI.ZIP, etc.)
    are delegated to the disk backend.

    Uses introspection to find the path parameter (the first parameter with 'path'
    in its name).

    Args:
        *extensions: File extensions to pass through (e.g., '.json', '.csv', '.txt')
        ensure_parent_dir: If True, ensure the parent directory exists before calling
            the disk backend (for save operations)

    Usage:
        @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True)
        def save(self, data, output_path, **kwargs):
            # Zarr-specific save logic here
            ...
    """
    import inspect

    def decorator(method: Callable) -> Callable:
        # Use introspection to find the path parameter index at decoration time
        sig = inspect.signature(method)
        path_param_index = None

        for i, (param_name, param) in enumerate(sig.parameters.items()):
            if param_name == 'self':
                continue
            # Find the first parameter with 'path' in its name
            if 'path' in param_name.lower():
                # Adjust for the self parameter (subtract 1, since 'self' is not in args)
                path_param_index = i - 1
                break

        if path_param_index is None:
            raise ValueError(f"No path parameter found in {method.__name__} signature. "
                             f"Expected a parameter with 'path' in its name.")

        @wraps(method)
        def wrapper(self, *args, **kwargs):
            # Extract the path from args at the discovered index
            path_arg = None

            if len(args) > path_param_index:
                arg = args[path_param_index]
                if isinstance(arg, (str, Path)):
                    path_arg = str(arg)

            # Check whether the path matches a passthrough extension
            if path_arg and any(path_arg.endswith(ext) for ext in extensions):
                # Use the local backend registry to avoid an OpenHCS dependency
                disk_backend = get_backend_instance('disk')

                # Ensure the parent directory exists if requested (for save operations)
                if ensure_parent_dir:
                    parent_dir = Path(path_arg).parent
                    disk_backend.ensure_directory(parent_dir)

                # Call the same method on the disk backend
                return getattr(disk_backend, method.__name__)(*args, **kwargs)

            # Otherwise, call the original method
            return method(self, *args, **kwargs)

        return wrapper
    return decorator
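
# Illustrative sketch (not part of the original module): how a decorated method
# routes by extension. The class and paths here are hypothetical, and the
# forwarded call assumes a 'disk' backend is registered.
def _demo_passthrough_routing():
    class _Demo:
        @passthrough_to_disk('.json', ensure_parent_dir=True)
        def save(self, data, output_path, **kwargs):
            return "handled by zarr"

    demo = _Demo()
    # "out/result.json" matches '.json', so this call is forwarded to
    # get_backend_instance('disk').save(data, "out/result.json").
    demo.save(b"{}", "out/result.json")
    # "stack.tif" does not match, so the zarr-specific body runs instead.
    assert demo.save(None, "stack.tif") == "handled by zarr"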


def _load_ome_zarr():
    """Load ome-zarr and cache its imports."""
    try:
        logger.info("Loading ome-zarr...")
        from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata
        from ome_zarr.io import parse_url

        _ome_zarr_state['cache'] = {
            'write_image': write_image,
            'write_plate_metadata': write_plate_metadata,
            'write_well_metadata': write_well_metadata,
            'parse_url': parse_url
        }
        _ome_zarr_state['available'] = True
        logger.info("ome-zarr loaded successfully")
    except ImportError as e:
        _ome_zarr_state['available'] = False
        logger.warning(f"ome-zarr not available: {e}")
    finally:
        _ome_zarr_state['event'].set()


def start_ome_zarr_loading_async():
    """Start loading ome-zarr in a background thread (safe to call multiple times)."""
    if _ome_zarr_state['thread'] is None and _ome_zarr_state['available'] is None:
        _ome_zarr_state['thread'] = threading.Thread(
            target=_load_ome_zarr, daemon=True, name="ome-zarr-loader"
        )
        _ome_zarr_state['thread'].start()
        logger.info("Started ome-zarr background loading")


def _ensure_ome_zarr(timeout: float = 30.0):
    """
    Ensure ome-zarr is loaded, waiting for the background load if needed.

    Returns: Tuple of (write_image, write_plate_metadata, write_well_metadata, parse_url)
    Raises: ImportError if ome-zarr is not available, TimeoutError if loading times out
    """
    # Load synchronously if loading was never started
    if _ome_zarr_state['available'] is None and _ome_zarr_state['thread'] is None:
        logger.warning("ome-zarr not pre-loaded, loading synchronously (will block)")
        _load_ome_zarr()

    # Wait for background loading
    if not _ome_zarr_state['event'].is_set():
        logger.info("Waiting for ome-zarr background loading...")
        if not _ome_zarr_state['event'].wait(timeout):
            raise TimeoutError(f"ome-zarr loading timed out after {timeout}s")

    # Check availability
    if not _ome_zarr_state['available']:
        raise ImportError("ome-zarr library not available. Install with: pip install ome-zarr")

    cache = _ome_zarr_state['cache']
    return (cache['write_image'], cache['write_plate_metadata'],
            cache['write_well_metadata'], cache['parse_url'])
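
# Usage sketch (hypothetical, not part of the original module): the intended
# lazy-loading pattern. Call the async starter early (e.g., at application
# startup) so that _ensure_ome_zarr() returns immediately at write time.
def _demo_lazy_ome_zarr():
    start_ome_zarr_loading_async()  # kick off the background import
    # ... application startup continues while ome-zarr imports in a thread ...
    write_image, write_plate, write_well, parse_url = _ensure_ome_zarr(timeout=30.0)
    # The four callables are now cached in _ome_zarr_state['cache'].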

# Cross-platform file locking
try:
    import fcntl
    FCNTL_AVAILABLE = True
except ImportError:
    import portalocker
    FCNTL_AVAILABLE = False

from .backend_registry import get_backend_instance
from .base import StorageBackend
from .exceptions import StorageResolutionError


class ZarrStorageBackend(StorageBackend):
    """
    Zarr storage backend implementation with configurable compression and
    automatic registration.

    This class provides a concrete implementation of the storage backend interfaces
    for Zarr storage. It stores data in a Zarr store on disk with configurable
    compression algorithms and settings.

    Features:
    - Single-chunk batch operations for a ~40x performance improvement
    - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none)
    - Configurable compression levels
    - Full path mapping for batch operations
    """

    # Use a simple backend type string to avoid depending on OpenHCS enums
    _backend_type = "zarr"

    def __init__(self, zarr_config: Optional['ZarrConfig'] = None):
        """
        Initialize the Zarr backend with a ZarrConfig.

        Args:
            zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None)
        """
        # Import the local ZarrConfig to remain OpenHCS-agnostic
        from .config import ZarrConfig

        if zarr_config is None:
            zarr_config = ZarrConfig()

        self.config = zarr_config

        # Convenience attributes
        self.compression_level = zarr_config.compression_level

        # Create the actual compressor from config (shuffle always enabled for Blosc)
        self.compressor = self.config.compressor.create_compressor(
            self.config.compression_level,
            shuffle=True  # Always enable shuffle for better compression
        )

    def _get_compressor(self) -> Optional[Any]:
        """
        Get the configured compressor with appropriate settings.

        Returns:
            Configured compressor instance, or None for no compression
        """
        if self.compressor is None:
            return None

        # If a compression level is specified and the compressor supports it,
        # rebuild the compressor with that level
        if self.compression_level is not None:
            try:
                compressor_class = self.compressor.__class__
                if 'level' in compressor_class.__init__.__code__.co_varnames:
                    return compressor_class(level=self.compression_level)
                elif 'clevel' in compressor_class.__init__.__code__.co_varnames:
                    return compressor_class(clevel=self.compression_level)
            except (AttributeError, TypeError):
                # Fall back to the original compressor if level setting fails
                pass

        return self.compressor

    def _calculate_chunks(self, data_shape: Tuple[int, ...]) -> Tuple[int, ...]:
        """
        Calculate the chunk shape based on the configured strategy.

        Args:
            data_shape: Shape of the 5D array (fields, channels, z, y, x)

        Returns:
            Chunk shape tuple
        """
        from .config import ZarrChunkStrategy

        match self.config.chunk_strategy:
            case ZarrChunkStrategy.WELL:
                # Single chunk for the entire well (optimal for batch I/O)
                return data_shape
            case ZarrChunkStrategy.FILE:
                # One chunk per individual file: (1, 1, 1, y, x).
                # Each original tif is compressed separately.
                return (1, 1, 1, data_shape[3], data_shape[4])
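
    # Worked example (illustrative, not part of the original module): for a well
    # of shape (fields=9, channels=2, z=5, y=2048, x=2048),
    #   ZarrChunkStrategy.WELL -> chunks (9, 2, 5, 2048, 2048): one chunk, one I/O op
    #   ZarrChunkStrategy.FILE -> chunks (1, 1, 1, 2048, 2048): one chunk per source tif
    # Note that for any other strategy value the match statement above falls
    # through and the method implicitly returns None.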


    def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]:
        """
        Split a path into a zarr store and a key.

        The zarr store is always the directory containing the image files, regardless of backend.
        For example:
        - "/path/to/plate_outputs/images/A01.tif" → store: "/path/to/plate_outputs/images", key: "A01.tif"
        - "/path/to/plate.zarr/images/A01.tif" → store: "/path/to/plate.zarr/images", key: "A01.tif"

        The images directory itself becomes the zarr store; zarr files are added within it.
        A zarr store's directory name does not need to end in .zarr.

        Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility.
        """
        path = Path(path)

        # If the path has a file extension (like .tif), the parent directory is the zarr store
        if path.suffix:
            # File path - the parent directory (e.g., "images") is the zarr store
            store_path = path.parent
            relative_key = path.name
        else:
            # Directory path - treat it as the zarr store
            store_path = path
            relative_key = ""

        # CRITICAL: Create the DirectoryStore with dimension_separator='/' for OME-ZARR
        # compatibility. This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0', not '0.0.0').
        store = zarr.DirectoryStore(str(store_path), dimension_separator='/')
        return store, relative_key
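
    # Worked example (illustrative, not part of the original module):
    #   _split_store_and_key("/data/plate_outputs/images/A01.tif")
    #     -> (DirectoryStore("/data/plate_outputs/images"), "A01.tif")
    #   _split_store_and_key("/data/plate_outputs/images")
    #     -> (DirectoryStore("/data/plate_outputs/images"), "")
    # The suffix check means any dotted final path component is treated as a file key.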

    @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True)
    def save(self, data: Any, output_path: Union[str, Path], **kwargs):
        """
        Save data to Zarr at the given output_path.

        Writes only if the key does not already exist; it will NOT overwrite
        or delete existing data.

        Raises:
            FileExistsError: If the destination key already exists
            StorageResolutionError: If creation fails
        """
        # Zarr-specific save logic (non-array files automatically pass through to disk)
        store, key = self._split_store_and_key(output_path)
        group = zarr.group(store=store)

        if key in group:
            raise FileExistsError(f"Zarr key already exists: {output_path}")

        chunks = kwargs.get("chunks")
        if chunks is None:
            chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1))

        try:
            # Create the array with the correct shape and dtype, then assign the data
            array = group.create_dataset(
                name=key,
                shape=data.shape,
                dtype=data.dtype,
                chunks=chunks,
                compressor=kwargs.get("compressor", self._get_compressor()),
                overwrite=False  # 🔒 Must be False by doctrine
            )
            array[:] = data
        except Exception as e:
            raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load from a zarr array using the filename mapping.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments (zarr_config not needed)

        Returns:
            List of loaded data objects, in the same order as file_paths

        Raises:
            FileNotFoundError: If the expected zarr store is not found
            KeyError: If a filename is not found in the filename map
        """
        if not file_paths:
            return []

        # Use _split_store_and_key to get the store path from the first file path
        store, _ = self._split_store_and_key(file_paths[0])
        store_path = Path(store.path)

        # FAIL LOUD: the store must exist
        if not store_path.exists():
            raise FileNotFoundError(f"Expected zarr store not found: {store_path}")
        root = zarr.open_group(store=store, mode='r')

        # Group files by well based on the OME-ZARR structure
        well_to_files = {}
        well_to_indices = {}

        # Search the OME-ZARR structure for the requested files
        for row_name in root.group_keys():
            if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                row_group = root[row_name]
                for col_name in row_group.group_keys():
                    if col_name.isdigit():  # Column directory (01, 02, etc.)
                        well_group = row_group[col_name]
                        well_name = f"{row_name}{col_name}"

                        # Check whether this well holds our filename mapping in the field array
                        if "0" in well_group.group_keys():
                            field_group = well_group["0"]
                            if "0" in field_group.array_keys():
                                field_array = field_group["0"]
                                if "openhcs_filename_map" in field_array.attrs:
                                    filename_map = dict(field_array.attrs["openhcs_filename_map"])

                                    # Record which requested files live in this well
                                    for i, path in enumerate(file_paths):
                                        filename = Path(path).name  # Match on filename only
                                        if filename in filename_map:
                                            if well_name not in well_to_files:
                                                well_to_files[well_name] = []
                                                well_to_indices[well_name] = []
                                            well_to_files[well_name].append(i)  # Original position in file_paths
                                            well_to_indices[well_name].append(filename_map[filename])  # 5D coordinates (field, channel, z)

        # Load data from each well using a single well chunk
        results = [None] * len(file_paths)  # Pre-allocate the results list

        for well_name, file_positions in well_to_files.items():
            row, col = well_name[0], well_name[1:]
            well_group = root[row][col]
            well_indices = well_to_indices[well_name]

            # Load the entire well field array in a single operation (well chunking)
            field_group = well_group["0"]
            field_array = field_group["0"]
            all_well_data = field_array[:]  # Single I/O operation for the entire well

            # Extract the requested 2D slices using the 5D coordinates
            for file_pos, coords_5d in zip(file_positions, well_indices):
                field_idx, channel_idx, z_idx = coords_5d
                # Extract a 2D slice: (field, channel, z, y, x) -> (y, x)
                results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :]

        logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells")
        return results
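
    # Usage sketch (hypothetical paths, not part of the original module): loading
    # two planes that were written into well A/01 by save_batch. Matching is by
    # filename only, via the per-well "openhcs_filename_map" attribute.
    #
    #   backend = ZarrStorageBackend()
    #   planes = backend.load_batch([
    #       "/data/plate_outputs/images/A01_f0_c0_z0.tif",
    #       "/data/plate_outputs/images/A01_f0_c1_z2.tif",
    #   ])
    #   # planes[i] is a 2D (y, x) slice extracted from the well's 5D array.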

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """Save multiple images using ome-zarr-py for OME-ZARR compliance with multi-dimensional support.

        Args:
            data_list: List of image data to save
            output_paths: List of output file paths
            **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col
        """

        # Ensure ome-zarr is loaded (waits for the background load if needed,
        # and raises ImportError if the library is unavailable)
        write_image, write_plate_metadata, write_well_metadata, _ = _ensure_ome_zarr()

        # Extract the required parameters from kwargs
        chunk_name = kwargs.get('chunk_name')
        n_channels = kwargs.get('n_channels')
        n_z = kwargs.get('n_z')
        n_fields = kwargs.get('n_fields')
        row = kwargs.get('row')
        col = kwargs.get('col')

        # Validate the required parameters
        if chunk_name is None:
            raise ValueError("chunk_name must be provided")
        if n_channels is None:
            raise ValueError("n_channels must be provided")
        if n_z is None:
            raise ValueError("n_z must be provided")
        if n_fields is None:
            raise ValueError("n_fields must be provided")
        if row is None:
            raise ValueError("row must be provided")
        if col is None:
            raise ValueError("col must be provided")

        if not data_list:
            logger.warning(f"Empty data list for chunk {chunk_name}")
            return

        # Use _split_store_and_key to get the store path from the first output path
        store, _ = self._split_store_and_key(output_paths[0])
        store_path = Path(store.path)

        logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}")

        # Convert GPU arrays to CPU arrays before saving
        cpu_data_list = []
        for data in data_list:
            if hasattr(data, 'get'):  # CuPy array
                cpu_data_list.append(data.get())
            elif hasattr(data, 'cpu'):  # PyTorch tensor
                cpu_data_list.append(data.cpu().numpy())
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # JAX on GPU
                import jax
                cpu_data_list.append(jax.device_get(data))
            else:  # Already a CPU array (NumPy, etc.)
                cpu_data_list.append(data)

        # Ensure the parent directory exists
        store_path.parent.mkdir(parents=True, exist_ok=True)

        # Re-split to get a properly configured store with dimension_separator='/'
        store, _ = self._split_store_and_key(store_path)
        root = zarr.group(store=store)  # Open existing or create new group without mode conflicts

        # Set OME metadata if not already present
        if "ome" not in root.attrs:
            root.attrs["ome"] = {"version": "0.4"}

        # Write plate metadata with locking to prevent concurrent corruption.
        # Always enabled for OME-ZARR HCS compliance.
        self._ensure_plate_metadata_with_lock(root, row, col, store_path)

        # Create the HCS-compliant structure: plate/row/col/field/resolution.
        # Create the row group if it doesn't exist.
        if row not in root:
            row_group = root.create_group(row)
        else:
            row_group = root[row]

        # Create the well group (remove any existing one to allow overwrite)
        if col in row_group:
            del row_group[col]
        well_group = row_group.create_group(col)

        # Add HCS well metadata
        well_metadata = {
            "images": [
                {
                    "path": "0",  # Single image containing all fields
                    "acquisition": 0
                }
            ],
            "version": "0.5"
        }
        well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata}

        # Create the field group (single field "0" containing all field data)
        field_group = well_group.require_group("0")

        # Always use the full 5D structure: (fields, channels, z, y, x).
        # Define OME-NGFF compliant axes.
        axes = [
            {'name': 'field', 'type': 'field'},  # Custom field type - allowed before space
            {'name': 'c', 'type': 'channel'},
            {'name': 'z', 'type': 'space'},
            {'name': 'y', 'type': 'space'},
            {'name': 'x', 'type': 'space'}
        ]

        # Get the image dimensions
        sample_image = cpu_data_list[0]
        height, width = sample_image.shape[-2:]

        # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x)
        target_shape = [n_fields, n_channels, n_z, height, width]

        # Stack the data
        stacked_data = np.stack(cpu_data_list, axis=0)

        # Calculate the total expected image count for validation
        total_expected = n_fields * n_channels * n_z
        if len(data_list) != total_expected:
            logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} "
                           f"(fields={n_fields}, channels={n_channels}, z={n_z})")

        # Log detailed shape information before the reshape
        logger.info(f"🔍 ZARR RESHAPE DEBUG:")
        logger.info(f"  - Input: {len(data_list)} images")
        logger.info(f"  - Stacked shape: {stacked_data.shape}")
        logger.info(f"  - Stacked size: {stacked_data.size}")
        logger.info(f"  - Target shape: {target_shape}")
        logger.info(f"  - Target size: {np.prod(target_shape)}")
        logger.info(f"  - Sample image shape: {sample_image.shape}")
        logger.info(f"  - Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, h={height}, w={width}")

        # Always reshape to the 5D structure
        reshaped_data = stacked_data.reshape(target_shape)

        logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape}{reshaped_data.shape}")
        axes_names = [ax['name'] for ax in axes]
        logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}")

        # Write OME-ZARR well metadata with the single field (well-chunked approach)
        write_well_metadata(well_group, ['0'])

        # Calculate chunks based on the configured strategy
        storage_options = {
            "chunks": self._calculate_chunks(reshaped_data.shape),
            "compressor": self._get_compressor()
        }

        # Write as a single well-chunked array with proper multi-dimensional axes
        write_image(
            image=reshaped_data,
            group=field_group,
            axes=axes,
            storage_options=storage_options,
            scaler=None,  # Single scale only, for performance
            compute=True
        )

        # Axes are already correctly set by the write_image function

        # Store the filename mapping with 5D coordinates (field, channel, z).
        # Convert each flat index to 5D coordinates for proper zarr slicing.
        filename_map = {}
        for i, path in enumerate(output_paths):
            # Calculate 5D coordinates from the flat index
            field_idx = i // (n_channels * n_z)
            remaining = i % (n_channels * n_z)
            channel_idx = remaining // n_z
            z_idx = remaining % n_z

            # Store as a tuple (field, channel, z) - y, x are full slices
            filename_map[Path(path).name] = (field_idx, channel_idx, z_idx)

        field_array = field_group['0']
        field_array.attrs["openhcs_filename_map"] = filename_map
        field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths]
        field_array.attrs["openhcs_dimensions"] = {
            "n_fields": n_fields,
            "n_channels": n_channels,
            "n_z": n_z
        }

        logger.debug(f"Successfully saved batch for chunk {chunk_name}")

        # Aggressive memory cleanup
        del cpu_data_list
        import gc
        gc.collect()
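
    # Worked example (illustrative, not part of the original module) of the
    # flat-index -> (field, channel, z) mapping used for the filename map above.
    # Images are ordered z-fastest, then channel, then field, so with
    # n_channels=2 and n_z=3, flat index 5 decomposes as:
    #   field_idx   = 5 // (2 * 3) = 0
    #   remaining   = 5 %  (2 * 3) = 5
    #   channel_idx = 5 // 3       = 1
    #   z_idx       = 5 %  3       = 2
    # i.e. the sixth image is (field 0, channel 1, z 2).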


    def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None:
        """Ensure plate-level metadata includes ALL existing wells, with file locking."""
        lock_path = store_path.with_suffix('.metadata.lock')

        try:
            with open(lock_path, 'w') as lock_file:
                if FCNTL_AVAILABLE:
                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
                else:
                    portalocker.lock(lock_file, portalocker.LOCK_EX)
                self._ensure_plate_metadata(root, row, col)
        except Exception as e:
            logger.error(f"Failed to update plate metadata with lock: {e}")
            raise
        finally:
            if lock_path.exists():
                lock_path.unlink()

    def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None:
        """Ensure plate-level metadata includes ALL existing wells in the store."""

        # Ensure ome-zarr is loaded
        _, write_plate_metadata, _, _ = _ensure_ome_zarr()

        # Scan the store for all existing wells
        all_rows = set()
        all_cols = set()
        all_wells = []

        for row_name in root.group_keys():
            if isinstance(root[row_name], zarr.Group):  # Ensure it's a row group
                row_group = root[row_name]
                all_rows.add(row_name)

                for col_name in row_group.group_keys():
                    if isinstance(row_group[col_name], zarr.Group):  # Ensure it's a well group
                        all_cols.add(col_name)
                        well_path = f"{row_name}/{col_name}"
                        all_wells.append(well_path)

        # Include the current well being added (it might not exist yet)
        all_rows.add(row)
        all_cols.add(col)
        current_well_path = f"{row}/{col}"
        if current_well_path not in all_wells:
            all_wells.append(current_well_path)

        # Sort for consistent ordering
        sorted_rows = sorted(all_rows)
        sorted_cols = sorted(all_cols)
        sorted_wells = sorted(all_wells)

        # Build wells metadata with the proper indices
        wells_metadata = []
        for well_path in sorted_wells:
            well_row, well_col = well_path.split("/")
            row_index = sorted_rows.index(well_row)
            col_index = sorted_cols.index(well_col)
            wells_metadata.append({
                "path": well_path,
                "rowIndex": row_index,
                "columnIndex": col_index
            })

        # Add acquisition metadata for HCS compliance
        acquisitions = [
            {
                "id": 0,
                "name": "default_acquisition",
                "maximumfieldcount": 1  # Single field containing all field data
            }
        ]

        # Write the complete HCS plate metadata
        write_plate_metadata(
            root,
            sorted_rows,
            sorted_cols,
            wells_metadata,
            acquisitions=acquisitions,
            field_count=1,
            name="OpenHCS_Plate"
        )

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """
        Load a single file from the zarr store.

        For the OME-ZARR structure with a filename mapping, delegates to load_batch.
        For the legacy flat structure or direct keys, uses direct key lookup.

        Args:
            file_path: Path of the file to load
            **kwargs: Additional arguments

        Returns:
            Loaded array data

        Raises:
            FileNotFoundError: If the file is not found in the zarr store
        """
        store, key = self._split_store_and_key(file_path)
        group = zarr.group(store=store)

        # Check whether this is an OME-ZARR structure with a filename mapping
        if "plate" in group.attrs:
            # OME-ZARR structure: use load_batch, which understands the filename mapping
            result = self.load_batch([file_path], **kwargs)
            if not result:
                raise FileNotFoundError(f"File not found in OME-ZARR store: {file_path}")
            return result[0]

        # Legacy flat structure: direct key lookup with symlink resolution
        visited = set()
        while self.is_symlink(key):
            if key in visited:
                raise RuntimeError(f"Zarr symlink loop detected at {key}")
            visited.add(key)
            key = group[key].attrs["_symlink"]

        if key not in group:
            raise FileNotFoundError(f"No array found at key '{key}'")
        return group[key][:]

    def list_files(self,
                   directory: Union[str, Path],
                   pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None,
                   recursive: bool = False) -> List[Path]:
        """
        List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered.

        Returns filenames from array attributes (output_paths) when available.
        """

        store, relative_key = self._split_store_and_key(directory)
        result: List[Path] = []

        def _matches_filters(name: str) -> bool:
            if pattern and not fnmatch.fnmatch(name, pattern):
                return False
            if extensions:
                return any(name.lower().endswith(ext.lower()) for ext in extensions)
            return True

        try:
            # Open the zarr group and traverse the OME-ZARR structure
            group = zarr.open_group(store=store)

            # Check whether this is an OME-ZARR structure (has plate metadata)
            if "plate" in group.attrs:
                # OME-ZARR structure: traverse A/01/-style wells
                for row_name in group.group_keys():
                    if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                        row_group = group[row_name]
                        for col_name in row_group.group_keys():
                            if col_name.isdigit():  # Column directory (01, 02, etc.)
                                well_group = row_group[col_name]

                                # Get filenames from the field array metadata
                                if "0" in well_group.group_keys():
                                    field_group = well_group["0"]
                                    if "0" in field_group.array_keys():
                                        field_array = field_group["0"]
                                        if "openhcs_output_paths" in field_array.attrs:
                                            output_paths = field_array.attrs["openhcs_output_paths"]
                                            for filename in output_paths:
                                                filename_only = Path(filename).name
                                                if _matches_filters(filename_only):
                                                    result.append(Path(filename))
            else:
                # Legacy flat structure: read the array keys directly
                array_keys = list(group.array_keys())
                for array_key in array_keys:
                    try:
                        array = group[array_key]
                        if "output_paths" in array.attrs:
                            # Get the original filenames from the array attributes
                            output_paths = array.attrs["output_paths"]
                            for filename in output_paths:
                                filename_only = Path(filename).name
                                if _matches_filters(filename_only):
                                    result.append(Path(filename))

                    except Exception:
                        # Skip arrays that can't be accessed
                        continue

        except Exception as e:
            raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e

        return result

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        store, relative_key = self._split_store_and_key(path)

        # Normalize the key for the Zarr API
        key = relative_key.rstrip("/")

        try:
            # Zarr 3.x exposes an async store API - convert the async generator to a list
            import asyncio

            async def _get_entries():
                entries = []
                async for entry in store.list_dir(key):
                    entries.append(entry)
                return entries

            return asyncio.run(_get_entries())
        except KeyError:
            raise NotADirectoryError(f"Zarr path is not a directory: {path}")
        except FileNotFoundError:
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a Zarr array (file) or empty group (directory) at the given path.

        Args:
            path: Zarr path or URI

        Raises:
            FileNotFoundError: If the path does not exist
            IsADirectoryError: If the path is a non-empty group
            StorageResolutionError: For unexpected failures
        """
        import shutil
        import os

        # Passthrough to the disk backend for text files (JSON, CSV, TXT)
        path_str = str(path)
        if path_str.endswith(('.json', '.csv', '.txt')):
            disk_backend = get_backend_instance('disk')
            return disk_backend.delete(path)

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            zarr_obj = zarr.open(path, mode='r')
        except Exception as e:
            raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e

        # Determine whether it's a file (array) or a directory (group)
        if isinstance(zarr_obj, zarr.core.Array):
            try:
                shutil.rmtree(path)  # Array folders can be deleted directly
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e

        elif isinstance(zarr_obj, zarr.hierarchy.Group):
            if os.listdir(path):
                raise IsADirectoryError(f"Zarr group is not empty: {path}")
            try:
                os.rmdir(path)
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e
        else:
            raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}")

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a Zarr array or group (file or directory).

        This is the only permitted recursive deletion method for the Zarr backend.

        Args:
            path: The path shared across all backends

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails
        """
        import os
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e

    @passthrough_to_disk('.json', '.csv', '.txt')
    def exists(self, path: Union[str, Path]) -> bool:
        # Zarr-specific existence check (text files automatically pass through to disk)
        path = Path(path)

        # If the path has no file extension, treat this as a directory existence check.
        # This handles auto_detect_patterns asking "does this directory exist?"
        if not path.suffix:
            return path.exists()

        # Otherwise, check zarr key existence (for actual files)
        store, key = self._split_store_and_key(path)

        # First check that the zarr store itself exists on disk
        # (_split_store_and_key always returns a DirectoryStore, whose root is store.path)
        if not Path(store.path).exists():
            return False

        try:
            root_group = zarr.group(store=store)
            return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys())
        except Exception:
            # If we can't open the zarr store, it doesn't exist
            return False

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        No-op for the zarr backend - zarr stores handle their own structure.

        Zarr doesn't have filesystem directories that need to be "ensured".
        Store creation and group structure are handled by save operations.
        """
        return Path(directory)

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        store, src_key = self._split_store_and_key(source)
        store2, dst_key = self._split_store_and_key(link_name)

        # DirectoryStore exposes its root directory as .path
        if store.path != store2.path:
            raise ValueError("Symlinks must exist within the same .zarr store")

        group = zarr.group(store=store)
        if src_key not in group:
            raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store")

        if dst_key in group:
            if not overwrite:
                raise FileExistsError(f"Symlink target already exists at: {dst_key}")
            # Remove the existing entry if overwrite=True
            del group[dst_key]

        # Create a new group at the symlink path
        link_group = group.require_group(dst_key)
        link_group.attrs["_symlink"] = src_key  # Store the target as a declared string

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """
        Check whether the given Zarr path represents a logical symlink (based on the attribute contract).

        Returns:
            bool: True if the key exists and has an OpenHCS-declared symlink attribute,
                  False if the key doesn't exist or is not a symlink
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            obj = group[key]
            attrs = getattr(obj, "attrs", {})

            if "_symlink" not in attrs:
                return False

            # Enforce that the _symlink attr matches the schema (a string key)
            if not isinstance(attrs["_symlink"], str):
                raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}")

            return True
        except KeyError:
            # The key doesn't exist, so it's not a symlink
            return False
        except Exception as e:
            raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e

    def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]:
        shape = data.shape

        # Simple logic: divide each dimension by chunk_divisor, with a minimum of 1
        return tuple(max(1, s // chunk_divisor) for s in shape)

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check whether a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks.

        Args:
            path: Zarr store path (may point to a key within the store)

        Returns:
            bool: True if the resolved path is a Zarr array

        Raises:
            FileNotFoundError: If the path does not exist or is a broken symlink
            IsADirectoryError: If the resolved object is a Zarr group
            StorageResolutionError: For other failures
        """
        import os  # needed for the existence check below

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            # Resolve symlinks (Zarr-native, via .attrs)
            seen_keys = set()
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break  # resolution complete

            # obj is now the resolved target
            if isinstance(obj, zarr.core.Array):
                return True
            elif isinstance(obj, zarr.hierarchy.Group):
                raise IsADirectoryError(f"Zarr path is a group (directory): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except (FileNotFoundError, IsADirectoryError):
            # Preserve the documented exception types instead of wrapping them
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check whether a Zarr path resolves to a directory (i.e., a Zarr group).

        Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink'].

        Args:
            path: Zarr path or URI

        Returns:
            bool: True if the path resolves to a Zarr group

        Raises:
            FileNotFoundError: If the path or resolved target does not exist
            NotADirectoryError: If the resolved target is not a group
            StorageResolutionError: For symlink cycles or other failures
        """
        import os

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            seen_keys = set()

            # Resolve the symlink chain
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break

            # obj is now resolved
            if isinstance(obj, zarr.hierarchy.Group):
                return True
            elif isinstance(obj, zarr.core.Array):
                raise NotADirectoryError(f"Zarr path is an array (file): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except (FileNotFoundError, NotADirectoryError):
            # Preserve the documented exception types instead of wrapping them
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a Zarr key or object (array/group) from one location to another, resolving symlinks.

        Supports:
        - Disk or memory stores
        - Zarr-native symlinks
        - Key renames within a group
        - Full copy + delete across stores if needed

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        # Resolve symlinks if present
        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            if src_store is dst_store:
                # Native move within the same Zarr group/store
                src_group.move(src_key, dst_key)
            else:
                # Cross-store: perform a manual copy + delete via zarr's convenience API
                zarr.copy(obj, dst_group, name=dst_key)
                del src_group[src_key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a Zarr key or object (array/group) from one location to another.

        - Resolves Zarr-native symlinks before copying
        - Prevents overwrite unless explicitly allowed (future feature)
        - Works across memory or disk stores

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            # Copy via zarr's convenience API (arrays and groups have no .copy method)
            zarr.copy(obj, dst_group, name=dst_key)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a Zarr path.

        Returns:
            dict with keys:
            - 'type': 'file', 'directory', 'symlink', or 'missing'
            - 'key': final resolved key
            - 'target': symlink target, if applicable
            - 'store': repr(store)
            - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            if key in group:
                obj = group[key]
                attrs = getattr(obj, "attrs", {})
                is_link = "_symlink" in attrs

                if is_link:
                    target = attrs["_symlink"]
                    if not isinstance(target, str):
                        raise StorageResolutionError(f"Invalid symlink format at {key}")
                    return {
                        "type": "symlink",
                        "key": key,
                        "target": target,
                        "store": repr(store),
                        "exists": target in group
                    }

                if isinstance(obj, zarr.Array):
                    return {
                        "type": "file",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                elif isinstance(obj, zarr.Group):
                    return {
                        "type": "directory",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                raise StorageResolutionError(f"Unknown object type at: {key}")
            else:
                return {
                    "type": "missing",
                    "key": key,
                    "store": repr(store),
                    "exists": False
                }

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e
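
# Usage sketch (hypothetical store and keys, not part of the original module):
# interpreting stat() results.
def _demo_stat(backend: ZarrStorageBackend):
    info = backend.stat("/data/plate_outputs/images/A01.tif")
    if info["type"] == "symlink":
        # 'target' holds the destination key; 'exists' says whether it resolves
        print(f"link -> {info['target']} (resolves: {info['exists']})")
    elif info["type"] == "file":
        print(f"array at key {info['key']}")
    elif info["type"] == "missing":
        print("no such key")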


class ZarrSymlink:
    """
    Represents a symbolic link in a Zarr store.

    Stores the target key of the symlink.
    """
    def __init__(self, target: str):
        self.target = target

    def __repr__(self):
        return f"<ZarrSymlink → {self.target}>"


# Backwards-compatible name used by the package's public API
ZarrBackend = ZarrStorageBackend