Coverage for openhcs/io/zarr.py: 7.1%

513 statements  

coverage.py v7.10.7, created at 2025-10-01 18:33 +0000

# openhcs/io/storage/backends/zarr.py

"""
Zarr storage backend module for OpenHCS.

This module provides a Zarr-backed implementation of the MicroscopyStorageBackend interface.
It stores data in a Zarr store on disk and supports overlay operations
for materializing data to disk when needed.
"""

import fnmatch
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Set, Tuple, Union

import numpy as np
import zarr

try:
    from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata
    from ome_zarr.io import parse_url
    OME_ZARR_AVAILABLE = True
except ImportError:
    OME_ZARR_AVAILABLE = False

from openhcs.constants.constants import Backend, DEFAULT_IMAGE_EXTENSIONS
from openhcs.io.base import StorageBackend
from openhcs.io.backend_registry import StorageBackendMeta
from openhcs.io.exceptions import StorageResolutionError

logger = logging.getLogger(__name__)

class ZarrStorageBackend(StorageBackend, metaclass=StorageBackendMeta):
    """
    Zarr storage backend implementation with configurable compression.

    This class provides a concrete implementation of the storage backend interfaces
    for Zarr storage. It stores data in a Zarr store on disk with configurable
    compression algorithms and settings. Registration happens automatically via
    the StorageBackendMeta metaclass.

    Features:
    - Single-chunk batch operations for 40x performance improvement
    - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none)
    - Configurable compression levels
    - Full path mapping for batch operations
    """

    # Backend type from enum for registration
    _backend_type = Backend.ZARR.value

    def __init__(self, zarr_config: Optional['ZarrConfig'] = None):
        """
        Initialize Zarr backend with ZarrConfig.

        Args:
            zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None)
        """
        # Import here to avoid circular imports
        from openhcs.core.config import ZarrConfig

        if zarr_config is None:
            zarr_config = ZarrConfig()

        self.config = zarr_config

        # Convenience attributes
        self.compression_level = zarr_config.compression_level

        # Create actual compressor from config
        self.compressor = self.config.compressor.create_compressor(
            self.config.compression_level,
            self.config.shuffle
        )

    def _get_compressor(self) -> Optional[Any]:
        """
        Get the configured compressor with appropriate settings.

        Returns:
            Configured compressor instance or None for no compression
        """
        if self.compressor is None:
            return None

        # If a compression level is specified, rebuild the compressor with it
        if self.compression_level is not None:
            try:
                # Create a new instance with the compression level, using whichever
                # keyword the codec accepts ('level' for Zlib-style codecs,
                # 'clevel' for Blosc-style codecs)
                compressor_class = self.compressor.__class__
                if 'level' in compressor_class.__init__.__code__.co_varnames:
                    return compressor_class(level=self.compression_level)
                elif 'clevel' in compressor_class.__init__.__code__.co_varnames:
                    return compressor_class(clevel=self.compression_level)
            except (AttributeError, TypeError):
                # Fall back to original compressor if level setting fails
                pass

        return self.compressor
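    # Worked example (a sketch assuming numcodecs codecs; not part of this module):
    # the keyword introspection above distinguishes Blosc-style from Zlib-style
    # codecs, so a configured level of 5 yields Blosc(clevel=5) or Zlib(level=5):
    #
    #   from numcodecs import Blosc, Zlib
    #   'clevel' in Blosc.__init__.__code__.co_varnames  # True -> Blosc(clevel=5)
    #   'level' in Zlib.__init__.__code__.co_varnames    # True -> Zlib(level=5)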

    def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]:
        """
        Split path into zarr store and key without auto-injection.
        The path planner now provides the complete storage path.

        Maps paths to zarr store and key:
        - Directory: "/path/to/plate/images.zarr" → Store: "/path/to/plate/images.zarr", Key: ""
        - File: "/path/to/plate/images.zarr/A01.tif" → Store: "/path/to/plate/images.zarr", Key: "A01.tif"

        Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility.
        """
        path = Path(path)

        # If the path has no extension or ends with .zarr, treat it as a directory (zarr store)
        if not path.suffix or path.suffix == '.zarr':
            # Directory path - treat as zarr store
            store_path = path
            relative_key = ""
        else:
            # File path - parent is zarr store, filename is key
            store_path = path.parent
            relative_key = path.name

        # CRITICAL: Create DirectoryStore with dimension_separator='/' for OME-ZARR compatibility
        # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0')
        store = zarr.DirectoryStore(str(store_path), dimension_separator='/')
        return store, relative_key
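    # Usage sketch (hypothetical paths): the split is purely lexical, driven by
    # the presence of a file extension:
    #
    #   backend = ZarrStorageBackend()
    #   store, key = backend._split_store_and_key("/plate/images.zarr/A01.tif")
    #   # store.path -> "/plate/images.zarr", key -> "A01.tif"
    #   store, key = backend._split_store_and_key("/plate/images.zarr")
    #   # store.path -> "/plate/images.zarr", key -> ""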

    def save(self, data: Any, output_path: Union[str, Path], **kwargs):
        """
        Save data to Zarr at the given output_path.

        Will only write if the key does not already exist.
        Will NOT overwrite or delete existing data.

        Raises:
            FileExistsError: If destination key already exists
            StorageResolutionError: If creation fails
        """
        store, key = self._split_store_and_key(output_path)
        group = zarr.group(store=store)

        if key in group:
            raise FileExistsError(f"Zarr key already exists: {output_path}")

        chunks = kwargs.get("chunks")
        if chunks is None:
            chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1))

        try:
            # Create array with correct shape and dtype, then assign data
            array = group.create_dataset(
                name=key,
                shape=data.shape,
                dtype=data.dtype,
                chunks=chunks,
                compressor=kwargs.get("compressor", self._get_compressor()),
                overwrite=False  # 🔒 Must be False by doctrine
            )
            array[:] = data
        except Exception as e:
            raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e
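    # Usage sketch (hypothetical paths): saving a NumPy stack as a new key; a
    # second save to the same key raises FileExistsError by design:
    #
    #   import numpy as np
    #   backend = ZarrStorageBackend()
    #   img = np.zeros((3, 512, 512), dtype=np.uint16)
    #   backend.save(img, "/plate/images.zarr/A01.tif")
    #   backend.save(img, "/plate/images.zarr/A01.tif")  # raises FileExistsError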

    def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]:
        """
        Load from zarr array using filename mapping.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments (zarr_config not needed)

        Returns:
            List of loaded data objects in same order as file_paths

        Raises:
            FileNotFoundError: If expected zarr store not found
            KeyError: If filename not found in filename_map
        """
        if not file_paths:
            return []

        # Use _split_store_and_key to get store path from first file path
        store, _ = self._split_store_and_key(file_paths[0])
        store_path = Path(store.path)

        # FAIL LOUD: Store must exist
        if not store_path.exists():
            raise FileNotFoundError(f"Expected zarr store not found: {store_path}")
        root = zarr.open_group(store=store, mode='r')

        # Group files by well based on OME-ZARR structure
        well_to_files = {}
        well_to_indices = {}

        # Search OME-ZARR structure for requested files
        for row_name in root.group_keys():
            if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                row_group = root[row_name]
                for col_name in row_group.group_keys():
                    if col_name.isdigit():  # Column directory (01, 02, etc.)
                        well_group = row_group[col_name]
                        well_name = f"{row_name}{col_name}"

                        # Check if this well has our filename mapping in the field array
                        if "0" in well_group.group_keys():
                            field_group = well_group["0"]
                            if "0" in field_group.array_keys():
                                field_array = field_group["0"]
                                if "openhcs_filename_map" in field_array.attrs:
                                    filename_map = dict(field_array.attrs["openhcs_filename_map"])

                                    # Check which requested files are in this well
                                    for i, path in enumerate(file_paths):
                                        filename = Path(path).name  # Use filename only for matching
                                        if filename in filename_map:
                                            if well_name not in well_to_files:
                                                well_to_files[well_name] = []
                                                well_to_indices[well_name] = []
                                            well_to_files[well_name].append(i)  # Original position in file_paths
                                            well_to_indices[well_name].append(filename_map[filename])  # 5D coordinates (field, channel, z)

        # Load data from each well using single well chunk
        results = [None] * len(file_paths)  # Pre-allocate results array

        for well_name, file_positions in well_to_files.items():
            row, col = well_name[0], well_name[1:]
            well_group = root[row][col]
            well_indices = well_to_indices[well_name]

            # Load entire well field array in single operation (well chunking)
            field_group = well_group["0"]
            field_array = field_group["0"]
            all_well_data = field_array[:]  # Single I/O operation for entire well

            # Extract requested 2D slices using 5D coordinates
            for file_pos, coords_5d in zip(file_positions, well_indices):
                field_idx, channel_idx, z_idx = coords_5d
                # Extract 2D slice: (field, channel, z, y, x) -> (y, x)
                results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :]

        logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells")
        return results
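    # Usage sketch (hypothetical paths): callers pass the same virtual file paths
    # that were given to save_batch; results come back in the same order:
    #
    #   paths = ["/plate/images.zarr/A01_f0_c0_z0.tif",
    #            "/plate/images.zarr/A01_f0_c0_z1.tif"]
    #   images = backend.load_batch(paths)  # two 2D numpy arrays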

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None:
        """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support.

        Args:
            data_list: List of image data to save
            output_paths: List of output file paths
            **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col
        """

        # Extract required parameters from kwargs
        chunk_name = kwargs.get('chunk_name')
        n_channels = kwargs.get('n_channels')
        n_z = kwargs.get('n_z')
        n_fields = kwargs.get('n_fields')
        row = kwargs.get('row')
        col = kwargs.get('col')

        # Validate required parameters
        if chunk_name is None:
            raise ValueError("chunk_name must be provided")
        if n_channels is None:
            raise ValueError("n_channels must be provided")
        if n_z is None:
            raise ValueError("n_z must be provided")
        if n_fields is None:
            raise ValueError("n_fields must be provided")
        if row is None:
            raise ValueError("row must be provided")
        if col is None:
            raise ValueError("col must be provided")

        if not data_list:
            logger.warning(f"Empty data list for chunk {chunk_name}")
            return

        if not OME_ZARR_AVAILABLE:
            raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr")

        # Use _split_store_and_key to get store path from first output path
        store, _ = self._split_store_and_key(output_paths[0])
        store_path = Path(store.path)

        logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}")

        # Convert GPU arrays to CPU arrays before saving
        cpu_data_list = []
        for data in data_list:
            if hasattr(data, 'get'):  # CuPy array
                cpu_data_list.append(data.get())
            elif hasattr(data, 'cpu'):  # PyTorch tensor
                cpu_data_list.append(data.cpu().numpy())
            elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # JAX on GPU
                import jax
                cpu_data_list.append(jax.device_get(data))
            else:  # Already CPU array (NumPy, etc.)
                cpu_data_list.append(data)

        # Ensure parent directory exists
        store_path.parent.mkdir(parents=True, exist_ok=True)

        # Use _split_store_and_key to get a properly configured store with dimension_separator='/'
        store, _ = self._split_store_and_key(store_path)
        root = zarr.group(store=store)  # Open existing or create new group without mode conflicts

        # Set OME metadata if not already present
        if "ome" not in root.attrs:
            root.attrs["ome"] = {"version": "0.4"}

        # Get the store for compatibility with existing code
        store = root.store

        # Write plate metadata with locking to prevent concurrent corruption (if enabled)
        should_write_plate_metadata = kwargs.get('write_plate_metadata', self.config.write_plate_metadata)
        if should_write_plate_metadata:
            self._ensure_plate_metadata_with_lock(root, row, col, store_path)

        # Create HCS-compliant structure: plate/row/col/field/resolution
        # Create row group if it doesn't exist
        if row not in root:
            row_group = root.create_group(row)
        else:
            row_group = root[row]

        # Create well group (remove existing if present to allow overwrite)
        if col in row_group:
            del row_group[col]
        well_group = row_group.create_group(col)

        # Add HCS well metadata
        well_metadata = {
            "images": [
                {
                    "path": "0",  # Single image containing all fields
                    "acquisition": 0
                }
            ],
            "version": "0.5"
        }
        well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata}

        # Create field group (single field "0" containing all field data)
        field_group = well_group.require_group("0")

        # Always use full 5D structure: (fields, channels, z, y, x)
        # Define OME-NGFF compliant axes
        axes = [
            {'name': 'field', 'type': 'field'},  # Custom field type - allowed before space
            {'name': 'c', 'type': 'channel'},
            {'name': 'z', 'type': 'space'},
            {'name': 'y', 'type': 'space'},
            {'name': 'x', 'type': 'space'}
        ]

        # Get image dimensions
        sample_image = cpu_data_list[0]
        height, width = sample_image.shape[-2:]

        # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x)
        target_shape = [n_fields, n_channels, n_z, height, width]

        # Stack and reshape data
        stacked_data = np.stack(cpu_data_list, axis=0)

        # Calculate total expected images for validation
        total_expected = n_fields * n_channels * n_z
        if len(data_list) != total_expected:
            logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} "
                           f"(fields={n_fields}, channels={n_channels}, z={n_z})")

        # Always reshape to 5D structure
        reshaped_data = stacked_data.reshape(target_shape)

        logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape} → {reshaped_data.shape}")
        axes_names = [ax['name'] for ax in axes]
        logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}")

        # Write OME-ZARR well metadata with single field (well-chunked approach)
        write_well_metadata(well_group, ['0'])

        # Use single chunk approach for optimal performance
        storage_options = {
            "chunks": reshaped_data.shape,  # Single chunk for entire well
            "compressor": self._get_compressor()
        }

        # Write as single well-chunked array with proper multi-dimensional axes
        write_image(
            image=reshaped_data,
            group=field_group,
            axes=axes,
            storage_options=storage_options,
            scaler=None,  # Single scale only for performance
            compute=True
        )

        # Axes are already correctly set by write_image function

        # Store filename mapping with 5D coordinates (field, channel, z, y, x)
        # Convert flat index to 5D coordinates for proper zarr slicing
        filename_map = {}
        for i, path in enumerate(output_paths):
            # Calculate 5D coordinates from flat index
            field_idx = i // (n_channels * n_z)
            remaining = i % (n_channels * n_z)
            channel_idx = remaining // n_z
            z_idx = remaining % n_z

            # Store as tuple (field, channel, z) - y,x are full slices
            filename_map[Path(path).name] = (field_idx, channel_idx, z_idx)
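        # Worked example (hypothetical counts): with n_channels=2 and n_z=3, flat
        # index i=7 decomposes as field_idx = 7 // 6 = 1, remaining = 7 % 6 = 1,
        # channel_idx = 1 // 3 = 0, z_idx = 1 % 3 = 1, i.e. coordinates (1, 0, 1).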

        field_array = field_group['0']
        field_array.attrs["openhcs_filename_map"] = filename_map
        field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths]
        field_array.attrs["openhcs_dimensions"] = {
            "n_fields": n_fields,
            "n_channels": n_channels,
            "n_z": n_z
        }

        logger.debug(f"Successfully saved batch for chunk {chunk_name}")

        # Aggressive memory cleanup
        del cpu_data_list
        import gc
        gc.collect()
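    # Usage sketch (hypothetical values): save_batch expects the per-well image
    # list flattened in field-major order, plus the dimension counts and well
    # coordinates as kwargs:
    #
    #   backend.save_batch(
    #       images,  # list of 2D arrays, len = n_fields * n_channels * n_z
    #       ["/plate/images.zarr/A01_f0_c0_z0.tif", ...],
    #       chunk_name="A01", n_fields=1, n_channels=2, n_z=3, row="A", col="01",
    #   )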

    def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None:
        """Ensure plate-level metadata includes ALL existing wells with file locking."""
        import fcntl

        lock_path = store_path.with_suffix('.metadata.lock')

        try:
            with open(lock_path, 'w') as lock_file:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
                self._ensure_plate_metadata(root, row, col)
        except Exception as e:
            logger.error(f"Failed to update plate metadata with lock: {e}")
            raise
        finally:
            if lock_path.exists():
                lock_path.unlink()

    def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None:
        """Ensure plate-level metadata includes ALL existing wells in the store."""

        # Scan the store for all existing wells
        all_rows = set()
        all_cols = set()
        all_wells = []

        for row_name in root.group_keys():
            if isinstance(root[row_name], zarr.Group):  # Ensure it's a row group
                row_group = root[row_name]
                all_rows.add(row_name)

                for col_name in row_group.group_keys():
                    if isinstance(row_group[col_name], zarr.Group):  # Ensure it's a well group
                        all_cols.add(col_name)
                        well_path = f"{row_name}/{col_name}"
                        all_wells.append(well_path)

        # Include the current well being added (might not exist yet)
        all_rows.add(row)
        all_cols.add(col)
        current_well_path = f"{row}/{col}"
        if current_well_path not in all_wells:
            all_wells.append(current_well_path)

        # Sort for consistent ordering
        sorted_rows = sorted(all_rows)
        sorted_cols = sorted(all_cols)
        sorted_wells = sorted(all_wells)

        # Build wells metadata with proper indices
        wells_metadata = []
        for well_path in sorted_wells:
            well_row, well_col = well_path.split("/")
            row_index = sorted_rows.index(well_row)
            col_index = sorted_cols.index(well_col)
            wells_metadata.append({
                "path": well_path,
                "rowIndex": row_index,
                "columnIndex": col_index
            })

        # Add acquisition metadata for HCS compliance
        acquisitions = [
            {
                "id": 0,
                "name": "default_acquisition",
                "maximumfieldcount": 1  # Single field containing all field data
            }
        ]

        # Write complete HCS plate metadata
        write_plate_metadata(
            root,
            sorted_rows,
            sorted_cols,
            wells_metadata,
            acquisitions=acquisitions,
            field_count=1,
            name="OpenHCS_Plate"
        )
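    # Worked example (hypothetical wells): with wells A/01, A/02, and B/01 present,
    # sorted_rows = ["A", "B"] and sorted_cols = ["01", "02"], so well "B/01" gets
    # {"path": "B/01", "rowIndex": 1, "columnIndex": 0}.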

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        store, key = self._split_store_and_key(file_path)
        group = zarr.group(store=store)

        visited = set()
        while self.is_symlink(key):
            if key in visited:
                raise RuntimeError(f"Zarr symlink loop detected at {key}")
            visited.add(key)
            key = group[key].attrs["_symlink"]

        if key not in group:
            raise FileNotFoundError(f"No array found at key '{key}'")
        return group[key][:]

    def list_files(self,
                   directory: Union[str, Path],
                   pattern: Optional[str] = None,
                   extensions: Optional[Set[str]] = None,
                   recursive: bool = False) -> List[Path]:
        """
        List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered.
        Returns filenames from array attributes (output_paths) if available.
        """

        store, relative_key = self._split_store_and_key(directory)
        result: List[Path] = []

        def _matches_filters(name: str) -> bool:
            if pattern and not fnmatch.fnmatch(name, pattern):
                return False
            if extensions:
                return any(name.lower().endswith(ext.lower()) for ext in extensions)
            return True

        try:
            # Open zarr group and traverse OME-ZARR structure
            group = zarr.open_group(store=store)

            # Check if this is OME-ZARR structure (has plate metadata)
            if "plate" in group.attrs:
                # OME-ZARR structure: traverse A/01/ wells
                for row_name in group.group_keys():
                    if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)
                        row_group = group[row_name]
                        for col_name in row_group.group_keys():
                            if col_name.isdigit():  # Column directory (01, 02, etc.)
                                well_group = row_group[col_name]

                                # Get filenames from field array metadata
                                if "0" in well_group.group_keys():
                                    field_group = well_group["0"]
                                    if "0" in field_group.array_keys():
                                        field_array = field_group["0"]
                                        if "openhcs_output_paths" in field_array.attrs:
                                            output_paths = field_array.attrs["openhcs_output_paths"]
                                            for filename in output_paths:
                                                filename_only = Path(filename).name
                                                if _matches_filters(filename_only):
                                                    result.append(Path(filename))
            else:
                # Legacy flat structure: get array keys directly
                array_keys = list(group.array_keys())
                for array_key in array_keys:
                    try:
                        array = group[array_key]
                        if "output_paths" in array.attrs:
                            # Get original filenames from array attributes
                            output_paths = array.attrs["output_paths"]
                            for filename in output_paths:
                                filename_only = Path(filename).name
                                if _matches_filters(filename_only):
                                    result.append(Path(filename))

                    except Exception:
                        # Skip arrays that can't be accessed
                        continue

        except Exception as e:
            raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e

        return result

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        store, relative_key = self._split_store_and_key(path)

        # Normalize key for Zarr API
        key = relative_key.rstrip("/")

        try:
            # Zarr 3.x uses an async API - convert the async generator to a list
            import asyncio

            async def _get_entries():
                entries = []
                async for entry in store.list_dir(key):
                    entries.append(entry)
                return entries

            return asyncio.run(_get_entries())
        except KeyError:
            raise NotADirectoryError(f"Zarr path is not a directory: {path}")
        except FileNotFoundError:
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a Zarr array (file) or empty group (directory) at the given path.

        Args:
            path: Zarr path or URI

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a non-empty group
            StorageResolutionError: For unexpected failures
        """
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            zarr_obj = zarr.open(path, mode='r')
        except Exception as e:
            raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e

        # Determine if it's a file (array) or directory (group)
        if isinstance(zarr_obj, zarr.core.Array):
            try:
                shutil.rmtree(path)  # Array folders can be deleted directly
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e

        elif isinstance(zarr_obj, zarr.hierarchy.Group):
            if os.listdir(path):
                raise IsADirectoryError(f"Zarr group is not empty: {path}")
            try:
                os.rmdir(path)
            except Exception as e:
                raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e
        else:
            raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}")

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a Zarr array or group (file or directory).

        This is the only permitted recursive deletion method for the Zarr backend.

        Args:
            path: The path shared across all backends

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If deletion fails
        """
        import shutil

        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            shutil.rmtree(path)
        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e

    def exists(self, path: Union[str, Path]) -> bool:
        path = Path(path)

        # If path has no file extension, treat as directory existence check
        # This handles auto_detect_patterns asking "does this directory exist?"
        if not path.suffix:
            return path.exists()

        # Otherwise, check zarr key existence (for actual files)
        store, key = self._split_store_and_key(path)

        # First check if the zarr store itself exists on disk
        if not Path(store.path).exists():
            return False

        try:
            root_group = zarr.group(store=store)
            return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys())
        except Exception:
            # If we can't open the zarr store, it doesn't exist
            return False

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """
        No-op for the zarr backend - zarr stores handle their own structure.

        Zarr doesn't have filesystem directories that need to be "ensured".
        Store creation and group structure are handled by save operations.
        """
        return Path(directory)

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        store, src_key = self._split_store_and_key(source)
        store2, dst_key = self._split_store_and_key(link_name)

        if store.path != store2.path:
            raise ValueError("Symlinks must exist within the same .zarr store")

        group = zarr.group(store=store)
        if src_key not in group:
            raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store")

        if dst_key in group:
            if not overwrite:
                raise FileExistsError(f"Symlink target already exists at: {dst_key}")
            # Remove existing entry if overwrite=True
            del group[dst_key]

        # Create a new group at the symlink path
        link_group = group.require_group(dst_key)
        link_group.attrs["_symlink"] = src_key  # Store target as a declared string
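    # Usage sketch (hypothetical keys): a symlink is just a group whose attrs
    # carry the target key; load() follows the chain:
    #
    #   backend.create_symlink("/plate/images.zarr/A01.tif",
    #                          "/plate/images.zarr/A01_alias.tif")
    #   data = backend.load("/plate/images.zarr/A01_alias.tif")  # resolves to A01.tif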

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """
        Check if the given Zarr path represents a logical symlink (based on attribute contract).

        Returns:
            bool: True if the key exists and has an OpenHCS-declared symlink attribute,
                  False if the key doesn't exist or is not a symlink
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            obj = group[key]
            attrs = getattr(obj, "attrs", {})

            if "_symlink" not in attrs:
                return False

            # Enforce that the _symlink attr matches the schema (a string key)
            if not isinstance(attrs["_symlink"], str):
                raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}")

            return True
        except KeyError:
            # Key doesn't exist, so it's not a symlink
            return False
        except Exception as e:
            raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e

    def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]:
        shape = data.shape

        # Divide each dimension by chunk_divisor, with a minimum chunk size of 1
        return tuple(max(1, s // chunk_divisor) for s in shape)
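    # Worked example (hypothetical shape): for data of shape (4, 3, 1080, 1920)
    # and chunk_divisor=2, the chunks are (2, 1, 540, 960); with the default
    # divisor of 1, the whole array is a single chunk.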

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check if a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks.

        Args:
            path: Zarr store path (may point to key within store)

        Returns:
            bool: True if resolved path is a Zarr array

        Raises:
            FileNotFoundError: If path does not exist or is a broken symlink
            IsADirectoryError: If resolved object is a Zarr group
            StorageResolutionError: For other failures
        """
        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            # Resolve symlinks (Zarr-native, via .attrs)
            seen_keys = set()
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break  # resolution complete

            # Now obj is the resolved target
            if isinstance(obj, zarr.core.Array):
                return True
            elif isinstance(obj, zarr.hierarchy.Group):
                raise IsADirectoryError(f"Zarr path is a group (directory): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except (FileNotFoundError, IsADirectoryError):
            # Propagate the documented exceptions unchanged
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a Zarr path resolves to a directory (i.e., a Zarr group).

        Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink'].

        Args:
            path: Zarr path or URI

        Returns:
            bool: True if path resolves to a Zarr group

        Raises:
            FileNotFoundError: If path or resolved target does not exist
            NotADirectoryError: If resolved target is not a group
            StorageResolutionError: For symlink cycles or other failures
        """
        path = str(path)

        if not os.path.exists(path):
            raise FileNotFoundError(f"Zarr path does not exist: {path}")

        try:
            store, key = self._split_store_and_key(path)
            group = zarr.group(store=store)

            seen_keys = set()

            # Resolve symlink chain
            while True:
                if key not in group:
                    raise FileNotFoundError(f"Zarr key does not exist: {key}")
                obj = group[key]

                if hasattr(obj, "attrs") and "_symlink" in obj.attrs:
                    if key in seen_keys:
                        raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}")
                    seen_keys.add(key)
                    key = obj.attrs["_symlink"]
                    continue
                break

            # obj is resolved
            if isinstance(obj, zarr.hierarchy.Group):
                return True
            elif isinstance(obj, zarr.core.Array):
                raise NotADirectoryError(f"Zarr path is an array (file): {path}")
            else:
                raise StorageResolutionError(f"Unknown Zarr object at: {path}")

        except (FileNotFoundError, NotADirectoryError):
            # Propagate the documented exceptions unchanged
            raise
        except Exception as e:
            raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a Zarr key or object (array/group) from one location to another, resolving symlinks.

        Supports:
        - Disk or memory stores
        - Zarr-native symlinks
        - Key renames within a group
        - Full copy+delete across stores if needed

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        # Resolve symlinks if present
        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            if src_store.path == dst_store.path:
                # Native move within the same Zarr store
                src_group.move(src_key, dst_key)
            else:
                # Cross-store: perform manual copy + delete via the zarr convenience API
                zarr.copy(obj, dst_group, name=dst_key)
                del src_group[src_key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a Zarr key or object (array/group) from one location to another.

        - Resolves Zarr-native symlinks before copying
        - Prevents overwrite unless explicitly allowed (future feature)
        - Works across memory or disk stores

        Raises:
            FileNotFoundError: If src does not exist
            FileExistsError: If dst already exists
            StorageResolutionError: On failure
        """
        src_store, src_key = self._split_store_and_key(src)
        dst_store, dst_key = self._split_store_and_key(dst)

        src_group = zarr.group(store=src_store)
        dst_group = zarr.group(store=dst_store)

        if src_key not in src_group:
            raise FileNotFoundError(f"Zarr source key does not exist: {src_key}")
        if dst_key in dst_group:
            raise FileExistsError(f"Zarr destination key already exists: {dst_key}")

        obj = src_group[src_key]

        seen_keys = set()
        while hasattr(obj, "attrs") and "_symlink" in obj.attrs:
            if src_key in seen_keys:
                raise StorageResolutionError(f"Symlink cycle detected at: {src_key}")
            seen_keys.add(src_key)
            src_key = obj.attrs["_symlink"]
            obj = src_group[src_key]

        try:
            # zarr.copy is the convenience API for copying arrays/groups between groups
            zarr.copy(obj, dst_group, name=dst_key)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a Zarr path.

        Returns:
            dict with keys:
            - 'type': 'file', 'directory', 'symlink', or 'missing'
            - 'key': final resolved key
            - 'target': symlink target if applicable
            - 'store': repr(store)
            - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        store, key = self._split_store_and_key(path)
        group = zarr.group(store=store)

        try:
            if key in group:
                obj = group[key]
                attrs = getattr(obj, "attrs", {})
                is_link = "_symlink" in attrs

                if is_link:
                    target = attrs["_symlink"]
                    if not isinstance(target, str):
                        raise StorageResolutionError(f"Invalid symlink format at {key}")
                    return {
                        "type": "symlink",
                        "key": key,
                        "target": target,
                        "store": repr(store),
                        "exists": target in group
                    }

                if isinstance(obj, zarr.Array):
                    return {
                        "type": "file",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                elif isinstance(obj, zarr.Group):
                    return {
                        "type": "directory",
                        "key": key,
                        "store": repr(store),
                        "exists": True
                    }

                raise StorageResolutionError(f"Unknown object type at: {key}")
            else:
                return {
                    "type": "missing",
                    "key": key,
                    "store": repr(store),
                    "exists": False
                }

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e
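    # Usage sketch (hypothetical keys): stat() classifies a key without loading
    # data; for the alias created in the create_symlink sketch it might return
    # {"type": "symlink", "key": "A01_alias.tif", "target": "A01.tif",
    #  "store": "<DirectoryStore ...>", "exists": True}.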

class ZarrSymlink:
    """
    Represents a symbolic link in a Zarr store.

    Stores the target path of the symlink so it can be surfaced as a
    lightweight placeholder object.
    """
    def __init__(self, target: str):
        self.target = target

    def __repr__(self):
        return f"<ZarrSymlink → {self.target}>"