Coverage for openhcs/io/zarr.py: 47.2%

602 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1# openhcs/io/storage/backends/zarr.py 

2""" 

3Zarr storage backend module for OpenHCS. 

4 

5This module provides a Zarr-backed implementation of the MicroscopyStorageBackend interface. 

6It stores data in a Zarr store on disk and supports overlay operations 

7for materializing data to disk when needed. 

8""" 

9 

10import fnmatch 

11import logging
import os  # needed by is_file(), which calls os.path.exists() without a local import

12import threading 

13from functools import wraps 

14from pathlib import Path 

15from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union 

16 

17import numpy as np 

18import zarr 

19 

20# Lazy ome-zarr loading to avoid dask → GPU library chain at import time 

21_ome_zarr_state = {'available': None, 'cache': {}, 'event': threading.Event(), 'thread': None} 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26# Decorator for passthrough to disk backend 

27def passthrough_to_disk(*extensions: str, ensure_parent_dir: bool = False): 

28 """ 

29 Decorator to automatically passthrough certain file types to disk backend. 

30 

31 Zarr only supports array data, so non-array files (JSON, CSV, TXT, ROI.ZIP, etc.) 

32 are automatically delegated to the disk backend. 

33 

34 Uses introspection to automatically find the path parameter (any parameter with 'path' in its name). 

35 

36 Args: 

37 *extensions: File extensions to passthrough (e.g., '.json', '.csv', '.txt') 

38 ensure_parent_dir: If True, ensure parent directory exists before calling disk backend (for save operations) 

39 

40 Usage: 

41 @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True) 

42 def save(self, data, output_path, **kwargs): 

43 # Zarr-specific save logic here 

44 ... 

45 """ 

46 import inspect 

47 

48 def decorator(method: Callable) -> Callable: 

49 # Use introspection to find the path parameter index at decoration time 

50 sig = inspect.signature(method) 

51 path_param_index = None 

52 

53 for i, (param_name, param) in enumerate(sig.parameters.items()):  [53 ↛ 62: the loop on line 53 didn't complete]

54 if param_name == 'self': 

55 continue 

56 # Find first parameter with 'path' in its name 

57 if 'path' in param_name.lower(): 

58 # Adjust for self parameter (subtract 1 since we skip 'self' in args) 

59 path_param_index = i - 1 

60 break 

61 

62 if path_param_index is None:  [62 ↛ 63: the condition on line 62 was never true]

63 raise ValueError(f"No path parameter found in {method.__name__} signature. " 

64 f"Expected a parameter with 'path' in its name.") 

65 

66 @wraps(method) 

67 def wrapper(self, *args, **kwargs): 

68 # Extract path from args at the discovered index 

69 path_arg = None 

70 

71 if len(args) > path_param_index:  [71 ↛ 77: the condition on line 71 was always true]

72 arg = args[path_param_index] 

73 if isinstance(arg, (str, Path)):  [73 ↛ 77: the condition on line 73 was always true]

74 path_arg = str(arg) 

75 

76 # Check if path matches passthrough extensions 

77 if path_arg and any(path_arg.endswith(ext) for ext in extensions): 

78 from openhcs.constants.constants import Backend 

79 from openhcs.io.backend_registry import get_backend_instance 

80 disk_backend = get_backend_instance(Backend.DISK.value) 

81 

82 # Ensure parent directory exists if requested (for save operations) 

83 if ensure_parent_dir: 

84 parent_dir = Path(path_arg).parent 

85 disk_backend.ensure_directory(parent_dir) 

86 

87 # Call the same method on disk backend 

88 return getattr(disk_backend, method.__name__)(*args, **kwargs) 

89 

90 # Otherwise, call the original method 

91 return method(self, *args, **kwargs) 

92 

93 return wrapper 

94 return decorator 

95 
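# A minimal sketch of how the signature introspection above resolves the path
# parameter; the method below is illustrative, not one of the real backend methods:
#
#     @passthrough_to_disk('.json', ensure_parent_dir=True)
#     def save(self, data, output_path, **kwargs): ...
#
# 'output_path' is the first parameter containing 'path' (signature index 2, args
# index 1 once 'self' is dropped), so save(obj, "results/summary.json") is forwarded
# to the disk backend while save(stack, "images/A01.tif") runs the zarr-specific body.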

96 

97def _load_ome_zarr(): 

98 """Load ome-zarr and cache imports.""" 

99 try: 

100 logger.info("Loading ome-zarr...") 

101 from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata 

102 from ome_zarr.io import parse_url 

103 

104 _ome_zarr_state['cache'] = { 

105 'write_image': write_image, 

106 'write_plate_metadata': write_plate_metadata, 

107 'write_well_metadata': write_well_metadata, 

108 'parse_url': parse_url 

109 } 

110 _ome_zarr_state['available'] = True 

111 logger.info("ome-zarr loaded successfully") 

112 except ImportError as e: 

113 _ome_zarr_state['available'] = False 

114 logger.warning(f"ome-zarr not available: {e}") 

115 finally: 

116 _ome_zarr_state['event'].set() 

117 

118 

119def start_ome_zarr_loading_async(): 

120 """Start loading ome-zarr in background thread (safe to call multiple times).""" 

121 if _ome_zarr_state['thread'] is None and _ome_zarr_state['available'] is None: 

122 _ome_zarr_state['thread'] = threading.Thread( 

123 target=_load_ome_zarr, daemon=True, name="ome-zarr-loader" 

124 ) 

125 _ome_zarr_state['thread'].start() 

126 logger.info("Started ome-zarr background loading") 

127 

128 

129def _ensure_ome_zarr(timeout: float = 30.0): 

130 """ 

131 Ensure ome-zarr is loaded, waiting for background load if needed. 

132 

133 Returns: Tuple of (write_image, write_plate_metadata, write_well_metadata, parse_url) 

134 Raises: ImportError if ome-zarr not available, TimeoutError if loading times out 

135 """ 

136 # Load synchronously if not started 

137 if _ome_zarr_state['available'] is None and _ome_zarr_state['thread'] is None: 

138 logger.warning("ome-zarr not pre-loaded, loading synchronously (will block)") 

139 _load_ome_zarr() 

140 

141 # Wait for background loading 

142 if not _ome_zarr_state['event'].is_set():  [142 ↛ 143: the condition on line 142 was never true]

143 logger.info("Waiting for ome-zarr background loading...") 

144 if not _ome_zarr_state['event'].wait(timeout): 

145 raise TimeoutError(f"ome-zarr loading timed out after {timeout}s") 

146 

147 # Check availability 

148 if not _ome_zarr_state['available']:  [148 ↛ 149: the condition on line 148 was never true]

149 raise ImportError("ome-zarr library not available. Install with: pip install ome-zarr") 

150 

151 cache = _ome_zarr_state['cache'] 

152 return (cache['write_image'], cache['write_plate_metadata'], 

153 cache['write_well_metadata'], cache['parse_url']) 

154 
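# A minimal usage sketch of the lazy-loading pattern above, assuming ome-zarr is
# installed; the two calls are this module's own helpers:
#
#     start_ome_zarr_loading_async()   # e.g. at application startup
#     ...                              # other startup work overlaps the background import
#     write_image, write_plate_metadata, write_well_metadata, parse_url = _ensure_ome_zarr()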

155# Cross-platform file locking 

156try: 

157 import fcntl 

158 FCNTL_AVAILABLE = True 

159except ImportError: 

160 import portalocker 

161 FCNTL_AVAILABLE = False 

162 

163from openhcs.constants.constants import Backend 

164from openhcs.io.base import StorageBackend 

165from openhcs.io.exceptions import StorageResolutionError 

166 

167 

168class ZarrStorageBackend(StorageBackend): 

169 """Zarr storage backend with automatic registration.""" 

170 _backend_type = Backend.ZARR.value 

171 """ 

172 Zarr storage backend implementation with configurable compression. 

173 

174 This class provides a concrete implementation of the storage backend interfaces 

175 for Zarr storage. It stores data in a Zarr store on disk with configurable 

176 compression algorithms and settings. 

177 

178 Features: 

179 - Single-chunk batch operations for 40x performance improvement 

180 - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none) 

181 - Configurable compression levels 

182 - Full path mapping for batch operations 

183 """ 

184 

185 def __init__(self, zarr_config: Optional['ZarrConfig'] = None): 

186 """ 

187 Initialize Zarr backend with ZarrConfig. 

188 

189 Args: 

190 zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None) 

191 """ 

192 # Import here to avoid circular imports 

193 from openhcs.core.config import ZarrConfig 

194 

195 if zarr_config is None: 

196 zarr_config = ZarrConfig() 

197 

198 self.config = zarr_config 

199 

200 # Convenience attributes 

201 self.compression_level = zarr_config.compression_level 

202 

203 # Create actual compressor from config (shuffle always enabled for Blosc) 

204 self.compressor = self.config.compressor.create_compressor( 

205 self.config.compression_level, 

206 shuffle=True # Always enable shuffle for better compression 

207 ) 

208 

209 def _get_compressor(self) -> Optional[Any]: 

210 """ 

211 Get the configured compressor with appropriate settings. 

212 

213 Returns: 

214 Configured compressor instance or None for no compression 

215 """ 

216 if self.compressor is None:  [216 ↛ 217: the condition on line 216 was never true]

217 return None 

218 

219 # If compression_level is specified and compressor supports it 

220 if self.compression_level is not None:  [220 ↛ 234: the condition on line 220 was always true]

221 # Check if compressor has level parameter 

222 if hasattr(self.compressor, '__class__'):  [222 ↛ 234: the condition on line 222 was always true]

223 try: 

224 # Create new instance with compression level 

225 compressor_class = self.compressor.__class__ 

226 if 'level' in compressor_class.__init__.__code__.co_varnames:  [226 ↛ 228: the condition on line 226 was always true]

227 return compressor_class(level=self.compression_level) 

228 elif 'clevel' in compressor_class.__init__.__code__.co_varnames: 

229 return compressor_class(clevel=self.compression_level) 

230 except (AttributeError, TypeError): 

231 # Fall back to original compressor if level setting fails 

232 pass 

233 

234 return self.compressor 

235 
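# A minimal sketch of why the introspection above probes both 'level' and 'clevel',
# assuming the configured compressors are numcodecs codecs (the usual zarr v2 case):
#
#     from numcodecs import Blosc, Zlib
#     Zlib(level=5)                    # Zlib-style codecs take 'level'
#     Blosc(cname="zstd", clevel=5)    # Blosc takes 'clevel'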

236 def _calculate_chunks(self, data_shape: Tuple[int, ...]) -> Tuple[int, ...]: 

237 """ 

238 Calculate chunk shape based on configured strategy. 

239 

240 Args: 

241 data_shape: Shape of the 5D array (fields, channels, z, y, x) 

242 

243 Returns: 

244 Chunk shape tuple 

245 """ 

246 from openhcs.core.config import ZarrChunkStrategy 

247 

248 match self.config.chunk_strategy: 

249 case ZarrChunkStrategy.WELL:  [249 ↛ 252: the pattern on line 249 always matched]

250 # Single chunk for entire well (current behavior, optimal for batch I/O) 

251 return data_shape 

252 case ZarrChunkStrategy.FILE: 

253 # One chunk per individual file: (1, 1, 1, y, x) 

254 # Each original tif is compressed separately 

255 return (1, 1, 1, data_shape[3], data_shape[4]) 

256 
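# A worked example of the two strategies above for a hypothetical well of
# 9 fields x 2 channels x 5 z-planes of 2048x2048 images:
#     data_shape = (9, 2, 5, 2048, 2048)
#     ZarrChunkStrategy.WELL -> (9, 2, 5, 2048, 2048)   # one chunk for the whole well
#     ZarrChunkStrategy.FILE -> (1, 1, 1, 2048, 2048)   # one chunk per original image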

257 def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]: 

258 """ 

259 Split path into zarr store and key. 

260 

261 The zarr store is always the directory containing the image files, regardless of backend. 

262 For example: 

263 - "/path/to/plate_outputs/images/A01.tif" → Store: "/path/to/plate_outputs/images", Key: "A01.tif" 

264 - "/path/to/plate.zarr/images/A01.tif" → Store: "/path/to/plate.zarr/images", Key: "A01.tif" 

265 

266 The images directory itself becomes the zarr store - zarr files are added within it. 

267 A zarr store doesn't need to have a folder name ending in .zarr. 

268 

269 Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility. 

270 """ 

271 path = Path(path) 

272 

273 # If path has a file extension (like .tif), the parent directory is the zarr store 

274 if path.suffix: 

275 # File path - parent directory (e.g., "images") is the zarr store 

276 store_path = path.parent 

277 relative_key = path.name 

278 else: 

279 # Directory path - treat as zarr store 

280 store_path = path 

281 relative_key = "" 

282 

283 # CRITICAL: Create DirectoryStore with dimension_separator='/' for OME-ZARR compatibility 

284 # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0') 

285 store = zarr.DirectoryStore(str(store_path), dimension_separator='/') 

286 return store, relative_key 

287 
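# A minimal sketch of the split above for a file-like path (paths are hypothetical;
# assumes the zarr v2 DirectoryStore API used in this module):
#
#     backend = ZarrStorageBackend()
#     store, key = backend._split_store_and_key("/data/plate_outputs/images/A01.tif")
#     # store -> zarr.DirectoryStore("/data/plate_outputs/images", dimension_separator="/")
#     # key   -> "A01.tif"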

288 @passthrough_to_disk('.json', '.csv', '.txt', '.roi.zip', '.zip', ensure_parent_dir=True) 

289 def save(self, data: Any, output_path: Union[str, Path], **kwargs): 

290 """ 

291 Save data to Zarr at the given output_path. 

292 

293 Will only write if the key does not already exist. 

294 Will NOT overwrite or delete existing data. 

295 

296 Raises: 

297 FileExistsError: If destination key already exists 

298 StorageResolutionError: If creation fails 

299 """ 

300 # Zarr-specific save logic (non-array files automatically passthrough to disk) 

301 store, key = self._split_store_and_key(output_path) 

302 group = zarr.group(store=store) 

303 

304 if key in group: 

305 raise FileExistsError(f"Zarr key already exists: {output_path}") 

306 

307 chunks = kwargs.get("chunks") 

308 if chunks is None: 

309 chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1)) 

310 

311 try: 

312 # Create array with correct shape and dtype, then assign data 

313 array = group.create_dataset( 

314 name=key, 

315 shape=data.shape, 

316 dtype=data.dtype, 

317 chunks=chunks, 

318 compressor=kwargs.get("compressor", self._get_compressor()), 

319 overwrite=False # 🔒 Must be False by doctrine 

320 ) 

321 array[:] = data 

322 except Exception as e: 

323 raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e 

324 

325 def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]: 

326 """ 

327 Load from zarr array using filename mapping. 

328 

329 Args: 

330 file_paths: List of file paths to load 

331 **kwargs: Additional arguments (zarr_config not needed) 

332 

333 Returns: 

334 List of loaded data objects in same order as file_paths 

335 

336 Raises: 

337 FileNotFoundError: If expected zarr store not found 

338 KeyError: If filename not found in filename_map 

339 """ 

340 if not file_paths:  [340 ↛ 341: the condition on line 340 was never true]

341 return [] 

342 

343 # Use _split_store_and_key to get store path from first file path 

344 store, _ = self._split_store_and_key(file_paths[0]) 

345 store_path = Path(store.path) 

346 

347 # FAIL LOUD: Store must exist 

348 if not store_path.exists():  [348 ↛ 349: the condition on line 348 was never true]

349 raise FileNotFoundError(f"Expected zarr store not found: {store_path}") 

350 root = zarr.open_group(store=store, mode='r') 

351 

352 # Group files by well based on OME-ZARR structure 

353 well_to_files = {} 

354 well_to_indices = {} 

355 

356 # Search OME-ZARR structure for requested files 

357 for row_name in root.group_keys(): 

358 if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)  [358 ↛ 357: the condition on line 358 was always true]

359 row_group = root[row_name] 

360 for col_name in row_group.group_keys(): 

361 if col_name.isdigit():  # Column directory (01, 02, etc.)  [361 ↛ 360: the condition on line 361 was always true]

362 well_group = row_group[col_name] 

363 well_name = f"{row_name}{col_name}" 

364 

365 # Check if this well has our filename mapping in the field array 

366 if "0" in well_group.group_keys():  [366 ↛ 360: the condition on line 366 was always true]

367 field_group = well_group["0"] 

368 if "0" in field_group.array_keys():  [368 ↛ 360: the condition on line 368 was always true]

369 field_array = field_group["0"] 

370 if "openhcs_filename_map" in field_array.attrs:  [370 ↛ 360: the condition on line 370 was always true]

371 filename_map = dict(field_array.attrs["openhcs_filename_map"]) 

372 

373 # Check which requested files are in this well 

374 for i, path in enumerate(file_paths): 

375 filename = Path(path).name # Use filename only for matching 

376 if filename in filename_map: 

377 if well_name not in well_to_files: 

378 well_to_files[well_name] = [] 

379 well_to_indices[well_name] = [] 

380 well_to_files[well_name].append(i) # Original position in file_paths 

381 well_to_indices[well_name].append(filename_map[filename]) # 5D coordinates (field, channel, z) 

382 

383 # Load data from each well using single well chunk 

384 results = [None] * len(file_paths) # Pre-allocate results array 

385 

386 for well_name, file_positions in well_to_files.items(): 

387 row, col = well_name[0], well_name[1:] 

388 well_group = root[row][col] 

389 well_indices = well_to_indices[well_name] 

390 

391 # Load entire well field array in single operation (well chunking) 

392 field_group = well_group["0"] 

393 field_array = field_group["0"] 

394 all_well_data = field_array[:] # Single I/O operation for entire well 

395 

396 # Extract requested 2D slices using 5D coordinates 

397 for file_pos, coords_5d in zip(file_positions, well_indices): 

398 field_idx, channel_idx, z_idx = coords_5d 

399 # Extract 2D slice: (field, channel, z, y, x) -> (y, x) 

400 results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :] # 2D slice 

401 

402 logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells") 

403 return results 

404 

405 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None: 

406 """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support. 

407 

408 Args: 

409 data_list: List of image data to save 

410 output_paths: List of output file paths 

411 **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col 

412 """ 

413 

414 # Ensure ome-zarr is loaded (waits for background load if needed) 

415 write_image, write_plate_metadata, write_well_metadata, _ = _ensure_ome_zarr() 

416 

417 # Extract required parameters from kwargs 

418 chunk_name = kwargs.get('chunk_name') 

419 n_channels = kwargs.get('n_channels') 

420 n_z = kwargs.get('n_z') 

421 n_fields = kwargs.get('n_fields') 

422 row = kwargs.get('row') 

423 col = kwargs.get('col') 

424 

425 # Validate required parameters 

426 if chunk_name is None:  [426 ↛ 427: the condition on line 426 was never true]

427 raise ValueError("chunk_name must be provided") 

428 if n_channels is None:  [428 ↛ 429: the condition on line 428 was never true]

429 raise ValueError("n_channels must be provided") 

430 if n_z is None:  [430 ↛ 431: the condition on line 430 was never true]

431 raise ValueError("n_z must be provided") 

432 if n_fields is None:  [432 ↛ 433: the condition on line 432 was never true]

433 raise ValueError("n_fields must be provided") 

434 if row is None:  [434 ↛ 435: the condition on line 434 was never true]

435 raise ValueError("row must be provided") 

436 if col is None:  [436 ↛ 437: the condition on line 436 was never true]

437 raise ValueError("col must be provided") 

438 

439 if not data_list:  [439 ↛ 440: the condition on line 439 was never true]

440 logger.warning(f"Empty data list for chunk {chunk_name}") 

441 return 

442 

443 if not _ome_zarr_state['available']:  [443 ↛ 444: the condition on line 443 was never true]

444 raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr") 

445 

446 # Use _split_store_and_key to get store path from first output path 

447 store, _ = self._split_store_and_key(output_paths[0]) 

448 store_path = Path(store.path) 

449 

450 logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}") 

451 

452 # Convert GPU arrays to CPU arrays before saving 

453 cpu_data_list = [] 

454 for data in data_list: 

455 if hasattr(data, 'get'):  # CuPy array  [455 ↛ 456: the condition on line 455 was never true]

456 cpu_data_list.append(data.get()) 

457 elif hasattr(data, 'cpu'):  # PyTorch tensor  [457 ↛ 458: the condition on line 457 was never true]

458 cpu_data_list.append(data.cpu().numpy()) 

459 elif hasattr(data, 'device') and 'cuda' in str(data.device).lower():  # JAX on GPU  [459 ↛ 460: the condition on line 459 was never true]

460 import jax 

461 cpu_data_list.append(jax.device_get(data)) 

462 else: # Already CPU array (NumPy, etc.) 

463 cpu_data_list.append(data) 

464 

465 # Ensure parent directory exists 

466 store_path.parent.mkdir(parents=True, exist_ok=True) 

467 

468 # Use _split_store_and_key to get properly configured store with dimension_separator='/' 

469 store, _ = self._split_store_and_key(store_path) 

470 root = zarr.group(store=store) # Open existing or create new group without mode conflicts 

471 

472 # Set OME metadata if not already present 

473 if "ome" not in root.attrs: 

474 root.attrs["ome"] = {"version": "0.4"} 

475 

476 # Get the store for compatibility with existing code 

477 store = root.store 

478 

479 # Write plate metadata with locking to prevent concurrent corruption 

480 # Always enabled for OME-ZARR HCS compliance 

481 self._ensure_plate_metadata_with_lock(root, row, col, store_path) 

482 

483 # Create HCS-compliant structure: plate/row/col/field/resolution 

484 # Create row group if it doesn't exist 

485 if row not in root: 

486 row_group = root.create_group(row) 

487 else: 

488 row_group = root[row] 

489 

490 # Create well group (remove existing if present to allow overwrite) 

491 if col in row_group: 

492 del row_group[col] 

493 well_group = row_group.create_group(col) 

494 

495 # Add HCS well metadata 

496 well_metadata = { 

497 "images": [ 

498 { 

499 "path": "0", # Single image containing all fields 

500 "acquisition": 0 

501 } 

502 ], 

503 "version": "0.5" 

504 } 

505 well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata} 

506 

507 # Create field group (single field "0" containing all field data) 

508 field_group = well_group.require_group("0") 

509 

510 # Always use full 5D structure: (fields, channels, z, y, x) 

511 # Define OME-NGFF compliant axes 

512 axes = [ 

513 {'name': 'field', 'type': 'field'}, # Custom field type - allowed before space 

514 {'name': 'c', 'type': 'channel'}, 

515 {'name': 'z', 'type': 'space'}, 

516 {'name': 'y', 'type': 'space'}, 

517 {'name': 'x', 'type': 'space'} 

518 ] 

519 

520 # Get image dimensions 

521 sample_image = cpu_data_list[0] 

522 height, width = sample_image.shape[-2:] 

523 

524 # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x) 

525 target_shape = [n_fields, n_channels, n_z, height, width] 

526 

527 # Stack and reshape data 

528 stacked_data = np.stack(cpu_data_list, axis=0) 

529 

530 # Calculate total expected images for validation 

531 total_expected = n_fields * n_channels * n_z 

532 if len(data_list) != total_expected:  [532 ↛ 533: the condition on line 532 was never true]

533 logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} " 

534 f"(fields={n_fields}, channels={n_channels}, z={n_z})") 

535 

536 # Log detailed shape information before reshape 

537 logger.info(f"🔍 ZARR RESHAPE DEBUG:") 

538 logger.info(f" - Input: {len(data_list)} images") 

539 logger.info(f" - Stacked shape: {stacked_data.shape}") 

540 logger.info(f" - Stacked size: {stacked_data.size}") 

541 logger.info(f" - Target shape: {target_shape}") 

542 logger.info(f" - Target size: {np.prod(target_shape)}") 

543 logger.info(f" - Sample image shape: {sample_image.shape}") 

544 logger.info(f" - Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, h={height}, w={width}") 

545 

546 # Always reshape to 5D structure 

547 reshaped_data = stacked_data.reshape(target_shape) 

548 

549 logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape} → {reshaped_data.shape}")

550 axes_names = [ax['name'] for ax in axes] 

551 logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}") 

552 

553 # Create field group (single field "0" containing all field data) 

554 if "0" in well_group:  [554 ↛ 557: the condition on line 554 was always true]

555 field_group = well_group["0"] 

556 else: 

557 field_group = well_group.create_group("0") 

558 

559 # Write OME-ZARR well metadata with single field (well-chunked approach) 

560 write_well_metadata(well_group, ['0']) 

561 

562 # Calculate chunks based on configured strategy 

563 storage_options = { 

564 "chunks": self._calculate_chunks(reshaped_data.shape), 

565 "compressor": self._get_compressor() 

566 } 

567 

568 # Write as single well-chunked array with proper multi-dimensional axes 

569 write_image( 

570 image=reshaped_data, 

571 group=field_group, 

572 axes=axes, 

573 storage_options=storage_options, 

574 scaler=None, # Single scale only for performance 

575 compute=True 

576 ) 

577 

578 # Axes are already correctly set by write_image function 

579 

580 # Store filename mapping with 5D coordinates (field, channel, z, y, x) 

581 # Convert flat index to 5D coordinates for proper zarr slicing 
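# A worked example of the arithmetic below, assuming n_channels=2 and n_z=3
# (6 images per field): flat index i=7 gives
#     field_idx   = 7 // 6 = 1
#     remaining   = 7 %  6 = 1
#     channel_idx = 1 // 3 = 0
#     z_idx       = 1 %  3 = 1
# i.e. the 8th output path maps to field 1, channel 0, z-plane 1.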

582 filename_map = {} 

583 for i, path in enumerate(output_paths): 

584 # Calculate 5D coordinates from flat index 

585 field_idx = i // (n_channels * n_z) 

586 remaining = i % (n_channels * n_z) 

587 channel_idx = remaining // n_z 

588 z_idx = remaining % n_z 

589 

590 # Store as tuple (field, channel, z) - y,x are full slices 

591 filename_map[Path(path).name] = (field_idx, channel_idx, z_idx) 

592 

593 field_array = field_group['0'] 

594 field_array.attrs["openhcs_filename_map"] = filename_map 

595 field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths] 

596 field_array.attrs["openhcs_dimensions"] = { 

597 "n_fields": n_fields, 

598 "n_channels": n_channels, 

599 "n_z": n_z 

600 } 

601 

602 logger.debug(f"Successfully saved batch for chunk {chunk_name}") 

603 

604 # Aggressive memory cleanup 

605 del cpu_data_list 

606 import gc 

607 gc.collect() 

608 

609 def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None: 

610 """Ensure plate-level metadata includes ALL existing wells with file locking.""" 

611 lock_path = store_path.with_suffix('.metadata.lock') 

612 

613 try: 

614 with open(lock_path, 'w') as lock_file: 

615 if FCNTL_AVAILABLE:  [615 ↛ 618: the condition on line 615 was always true]

616 fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) 

617 else: 

618 portalocker.lock(lock_file, portalocker.LOCK_EX) 

619 self._ensure_plate_metadata(root, row, col) 

620 except Exception as e: 

621 logger.error(f"Failed to update plate metadata with lock: {e}") 

622 raise 

623 finally: 

624 if lock_path.exists():  [624 ↛ exit: didn't return from '_ensure_plate_metadata_with_lock' because the condition on line 624 was always true]

625 lock_path.unlink() 

626 

627 def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None: 

628 """Ensure plate-level metadata includes ALL existing wells in the store.""" 

629 

630 # Ensure ome-zarr is loaded 

631 _, write_plate_metadata, _, _ = _ensure_ome_zarr() 

632 

633 # Scan the store for all existing wells 

634 all_rows = set() 

635 all_cols = set() 

636 all_wells = [] 

637 

638 for row_name in root.group_keys(): 

639 if isinstance(root[row_name], zarr.Group):  # Ensure it's a row group  [639 ↛ 638: the condition on line 639 was always true]

640 row_group = root[row_name] 

641 all_rows.add(row_name) 

642 

643 for col_name in row_group.group_keys(): 

644 if isinstance(row_group[col_name], zarr.Group):  # Ensure it's a well group  [644 ↛ 643: the condition on line 644 was always true]

645 all_cols.add(col_name) 

646 well_path = f"{row_name}/{col_name}" 

647 all_wells.append(well_path) 

648 

649 # Include the current well being added (might not exist yet) 

650 all_rows.add(row) 

651 all_cols.add(col) 

652 current_well_path = f"{row}/{col}" 

653 if current_well_path not in all_wells: 

654 all_wells.append(current_well_path) 

655 

656 # Sort for consistent ordering 

657 sorted_rows = sorted(all_rows) 

658 sorted_cols = sorted(all_cols) 

659 sorted_wells = sorted(all_wells) 

660 

661 # Build wells metadata with proper indices 

662 wells_metadata = [] 

663 for well_path in sorted_wells: 

664 well_row, well_col = well_path.split("/") 

665 row_index = sorted_rows.index(well_row) 

666 col_index = sorted_cols.index(well_col) 

667 wells_metadata.append({ 

668 "path": well_path, 

669 "rowIndex": row_index, 

670 "columnIndex": col_index 

671 }) 

672 

673 # Add acquisition metadata for HCS compliance 

674 acquisitions = [ 

675 { 

676 "id": 0, 

677 "name": "default_acquisition", 

678 "maximumfieldcount": 1 # Single field containing all field data 

679 } 

680 ] 

681 

682 # Write complete HCS plate metadata 

683 write_plate_metadata( 

684 root, 

685 sorted_rows, 

686 sorted_cols, 

687 wells_metadata, 

688 acquisitions=acquisitions, 

689 field_count=1, 

690 name="OpenHCS_Plate" 

691 ) 

692 
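# A sketch of the plate metadata assembled above when the store already holds well
# A/01 and the current call adds B/02 (well layout is hypothetical):
#     sorted_rows = ["A", "B"];  sorted_cols = ["01", "02"]
#     wells_metadata = [
#         {"path": "A/01", "rowIndex": 0, "columnIndex": 0},
#         {"path": "B/02", "rowIndex": 1, "columnIndex": 1},
#     ]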

693 

694 

695 

696 

697 

698 def load(self, file_path: Union[str, Path], **kwargs) -> Any: 

699 """ 

700 Load a single file from zarr store. 

701 

702 For OME-ZARR structure with filename mapping, delegates to load_batch. 

703 For legacy flat structure or direct keys, uses direct key lookup. 

704 

705 Args: 

706 file_path: Path to file to load 

707 **kwargs: Additional arguments 

708 

709 Returns: 

710 Loaded array data 

711 

712 Raises: 

713 FileNotFoundError: If file not found in zarr store 

714 """ 

715 store, key = self._split_store_and_key(file_path) 

716 group = zarr.group(store=store) 

717 

718 # Check if this is OME-ZARR structure with filename mapping 

719 if "plate" in group.attrs: 

720 # OME-ZARR structure: use load_batch which understands filename mapping 

721 result = self.load_batch([file_path], **kwargs) 

722 if not result: 

723 raise FileNotFoundError(f"File not found in OME-ZARR store: {file_path}") 

724 return result[0] 

725 

726 # Legacy flat structure: direct key lookup with symlink resolution 

727 visited = set() 

728 while self.is_symlink(key): 

729 if key in visited: 

730 raise RuntimeError(f"Zarr symlink loop detected at {key}") 

731 visited.add(key) 

732 key = group[key].attrs["_symlink"] 

733 

734 if key not in group: 

735 raise FileNotFoundError(f"No array found at key '{key}'") 

736 return group[key][:] 

737 
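# A minimal usage sketch of the legacy flat-structure branch above (paths are
# hypothetical; relies on this module's numpy import):
#
#     backend = ZarrStorageBackend()
#     backend.save(np.zeros((4, 256, 256), dtype=np.uint16), "/data/images/A01.tif")
#     stack = backend.load("/data/images/A01.tif")   # direct key lookup, no plate metadata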

738 def list_files(self, 

739 directory: Union[str, Path], 

740 pattern: Optional[str] = None, 

741 extensions: Optional[Set[str]] = None, 

742 recursive: bool = False) -> List[Path]: 

743 """ 

744 List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered. 

745 Returns filenames from array attributes (output_paths) if available. 

746 """ 

747 

748 store, relative_key = self._split_store_and_key(directory) 

749 result: List[Path] = [] 

750 

751 def _matches_filters(name: str) -> bool: 

752 if pattern and not fnmatch.fnmatch(name, pattern):  [752 ↛ 753: the condition on line 752 was never true]

753 return False 

754 if extensions:  [754 ↛ 756: the condition on line 754 was always true]

755 return any(name.lower().endswith(ext.lower()) for ext in extensions) 

756 return True 

757 

758 try: 

759 # Open zarr group and traverse OME-ZARR structure 

760 group = zarr.open_group(store=store) 

761 

762 # Check if this is OME-ZARR structure (has plate metadata) 

763 if "plate" in group.attrs:  [763 ↛ 785: the condition on line 763 was always true]

764 # OME-ZARR structure: traverse A/01/ wells 

765 for row_name in group.group_keys(): 

766 if len(row_name) == 1 and row_name.isalpha():  # Row directory (A, B, etc.)  [766 ↛ 765: the condition on line 766 was always true]

767 row_group = group[row_name] 

768 for col_name in row_group.group_keys(): 

769 if col_name.isdigit():  # Column directory (01, 02, etc.)  [769 ↛ 768: the condition on line 769 was always true]

770 well_group = row_group[col_name] 

771 

772 # Get filenames from field array metadata 

773 if "0" in well_group.group_keys():  [773 ↛ 768: the condition on line 773 was always true]

774 field_group = well_group["0"] 

775 if "0" in field_group.array_keys():  [775 ↛ 768: the condition on line 775 was always true]

776 field_array = field_group["0"] 

777 if "openhcs_output_paths" in field_array.attrs:  [777 ↛ 768: the condition on line 777 was always true]

778 output_paths = field_array.attrs["openhcs_output_paths"] 

779 for filename in output_paths: 

780 filename_only = Path(filename).name 

781 if _matches_filters(filename_only):  [781 ↛ 779: the condition on line 781 was always true]

782 result.append(Path(filename)) 

783 else: 

784 # Legacy flat structure: get array keys directly 

785 array_keys = list(group.array_keys()) 

786 for array_key in array_keys: 

787 try: 

788 array = group[array_key] 

789 if "output_paths" in array.attrs: 

790 # Get original filenames from array attributes 

791 output_paths = array.attrs["output_paths"] 

792 for filename in output_paths: 

793 filename_only = Path(filename).name 

794 if _matches_filters(filename_only): 

795 result.append(Path(filename)) 

796 

797 except Exception as e: 

798 # Skip arrays that can't be accessed 

799 continue 

800 

801 except Exception as e: 

802 raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e 

803 

804 return result 

805 

806 def list_dir(self, path: Union[str, Path]) -> List[str]: 

807 store, relative_key = self._split_store_and_key(path) 

808 

809 # Normalize key for Zarr API 

810 key = relative_key.rstrip("/") 

811 

812 try: 

813 # Zarr 3.x uses async API - convert async generator to list 

814 import asyncio 

815 async def _get_entries(): 

816 entries = [] 

817 async for entry in store.list_dir(key): 

818 entries.append(entry) 

819 return entries 

820 return asyncio.run(_get_entries()) 

821 except KeyError: 

822 raise NotADirectoryError(f"Zarr path is not a directory: {path}") 

823 except FileNotFoundError: 

824 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

825 

826 

827 def delete(self, path: Union[str, Path]) -> None: 

828 """ 

829 Delete a Zarr array (file) or empty group (directory) at the given path. 

830 

831 Args: 

832 path: Zarr path or URI 

833 

834 Raises: 

835 FileNotFoundError: If path does not exist 

836 IsADirectoryError: If path is a non-empty group 

837 StorageResolutionError: For unexpected failures 

838 """ 

839 import zarr 

840 import shutil 

841 import os 

842 

843 # Passthrough to disk backend for text files (JSON, CSV, TXT) 

844 path_str = str(path) 

845 if path_str.endswith(('.json', '.csv', '.txt')): 

846 from openhcs.io.backend_registry import get_backend_instance 

847 disk_backend = get_backend_instance(Backend.DISK.value) 

848 return disk_backend.delete(path) 

849 

850 path = str(path) 

851 

852 if not os.path.exists(path): 

853 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

854 

855 try: 

856 zarr_obj = zarr.open(path, mode='r') 

857 except Exception as e: 

858 raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e 

859 

860 # Determine if it's a file (array) or directory (group) 

861 if isinstance(zarr_obj, zarr.core.Array): 

862 try: 

863 shutil.rmtree(path) # Array folders can be deleted directly 

864 except Exception as e: 

865 raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e 

866 

867 elif isinstance(zarr_obj, zarr.hierarchy.Group): 

868 if os.listdir(path): 

869 raise IsADirectoryError(f"Zarr group is not empty: {path}") 

870 try: 

871 os.rmdir(path) 

872 except Exception as e: 

873 raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e 

874 else: 

875 raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}") 

876 

877 def delete_all(self, path: Union[str, Path]) -> None: 

878 """ 

879 Recursively delete a Zarr array or group (file or directory). 

880 

881 This is the only permitted recursive deletion method for the Zarr backend. 

882 

883 Args: 

884 path: the path shared across all backends

885 

886 Raises: 

887 FileNotFoundError: If the path does not exist 

888 StorageResolutionError: If deletion fails 

889 """ 

890 import os 

891 import shutil 

892 

893 path = str(path) 

894 

895 if not os.path.exists(path): 

896 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

897 

898 try: 

899 shutil.rmtree(path) 

900 except Exception as e: 

901 raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e 

902 

903 @passthrough_to_disk('.json', '.csv', '.txt') 

904 def exists(self, path: Union[str, Path]) -> bool: 

905 # Zarr-specific existence check (text files automatically passthrough to disk) 

906 path = Path(path) 

907 

908 # If path has no file extension, treat as directory existence check 

909 # This handles auto_detect_patterns asking "does this directory exist?" 

910 if not path.suffix:  [910 ↛ 914: the condition on line 910 was always true]

911 return path.exists() 

912 

913 # Otherwise, check zarr key existence (for actual files) 

914 store, key = self._split_store_and_key(path) 

915 

916 # First check if the zarr store itself exists 

917 if isinstance(store, str): 

918 store_path = Path(store) 

919 if not store_path.exists(): 

920 return False 

921 

922 try: 

923 root_group = zarr.group(store=store) 

924 return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys()) 

925 except Exception: 

926 # If we can't open the zarr store, it doesn't exist 

927 return False 

928 

929 def ensure_directory(self, directory: Union[str, Path]) -> Path: 

930 """ 

931 No-op for zarr backend - zarr stores handle their own structure. 

932 

933 Zarr doesn't have filesystem directories that need to be "ensured". 

934 Store creation and group structure is handled by save operations. 

935 """ 

936 return Path(directory) 

937 

938 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False): 

939 store, src_key = self._split_store_and_key(source) 

940 store2, dst_key = self._split_store_and_key(link_name) 

941 

942 if store.root != store2.root: 

943 raise ValueError("Symlinks must exist within the same .zarr store") 

944 

945 group = zarr.group(store=store) 

946 if src_key not in group: 

947 raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store") 

948 

949 if dst_key in group: 

950 if not overwrite: 

951 raise FileExistsError(f"Symlink target already exists at: {dst_key}") 

952 # Remove existing entry if overwrite=True 

953 del group[dst_key] 

954 

955 # Create a new group at the symlink path 

956 link_group = group.require_group(dst_key) 

957 link_group.attrs["_symlink"] = src_key # Store as declared string 

958 

959 def is_symlink(self, path: Union[str, Path]) -> bool: 

960 """ 

961 Check if the given Zarr path represents a logical symlink (based on attribute contract). 

962 

963 Returns: 

964 bool: True if the key exists and has an OpenHCS-declared symlink attribute 

965 False if the key doesn't exist or is not a symlink 

966 """ 

967 store, key = self._split_store_and_key(path) 

968 group = zarr.group(store=store) 

969 

970 try: 

971 obj = group[key] 

972 attrs = getattr(obj, "attrs", {}) 

973 

974 if "_symlink" not in attrs: 

975 return False 

976 

977 # Enforce that the _symlink attr matches schema (e.g. str or list of path components) 

978 if not isinstance(attrs["_symlink"], str): 

979 raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}") 

980 

981 return True 

982 except KeyError: 

983 # Key doesn't exist, so it's not a symlink 

984 return False 

985 except Exception as e: 

986 raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e 

987 

988 def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]: 

989 shape = data.shape 

990 

991 # Divide each dim by chunk_divisor (default 1 keeps the full shape as one chunk), with a minimum of 1

992 return tuple(max(1, s // chunk_divisor) for s in shape) 

993 
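# Example of the divisor logic above: a (10, 2048, 2048) stack with chunk_divisor=4
# yields chunks of (2, 512, 512); the default divisor of 1 keeps the full shape as a
# single chunk.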

994 def is_file(self, path: Union[str, Path]) -> bool: 

995 """ 

996 Check if a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks. 

997 

998 Args: 

999 path: Zarr store path (may point to key within store) 

1000 

1001 Returns: 

1002 bool: True if resolved path is a Zarr array 

1003 

1004 Raises: 

1005 FileNotFoundError: If path does not exist or broken symlink 

1006 IsADirectoryError: If resolved object is a Zarr group 

1007 StorageResolutionError: For other failures 

1008 """ 

1009 path = str(path) 

1010 

1011 if not os.path.exists(path): 

1012 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

1013 

1014 try: 

1015 store, key = self._split_store_and_key(path) 

1016 group = zarr.group(store=store) 

1017 

1018 # Resolve symlinks (Zarr-native, via .attrs) 

1019 seen_keys = set() 

1020 while True: 

1021 if key not in group: 

1022 raise FileNotFoundError(f"Zarr key does not exist: {key}") 

1023 obj = group[key] 

1024 

1025 if hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

1026 if key in seen_keys: 

1027 raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}") 

1028 seen_keys.add(key) 

1029 key = obj.attrs["_symlink"] 

1030 continue 

1031 break # resolution complete 

1032 

1033 # Now obj is the resolved target 

1034 if isinstance(obj, zarr.core.Array): 

1035 return True 

1036 elif isinstance(obj, zarr.hierarchy.Group): 

1037 raise IsADirectoryError(f"Zarr path is a group (directory): {path}") 

1038 else: 

1039 raise StorageResolutionError(f"Unknown Zarr object at: {path}") 

1040 

1041 except Exception as e: 

1042 raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e 

1043 

1044 def is_dir(self, path: Union[str, Path]) -> bool: 

1045 """ 

1046 Check if a Zarr path resolves to a directory (i.e., a Zarr group). 

1047  

1048 Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink']. 

1049  

1050 Args: 

1051 path: Zarr path or URI 

1052  

1053 Returns: 

1054 bool: True if path resolves to a Zarr group 

1055  

1056 Raises: 

1057 FileNotFoundError: If path or resolved target does not exist 

1058 NotADirectoryError: If resolved target is not a group 

1059 StorageResolutionError: For symlink cycles or other failures 

1060 """ 

1061 import os 

1062 

1063 

1064 path = str(path) 

1065 

1066 if not os.path.exists(path): 

1067 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

1068 

1069 try: 

1070 store, key = self._split_store_and_key(path) 

1071 group = zarr.group(store=store) 

1072 

1073 seen_keys = set() 

1074 

1075 # Resolve symlink chain 

1076 while True: 

1077 if key not in group: 

1078 raise FileNotFoundError(f"Zarr key does not exist: {key}") 

1079 obj = group[key] 

1080 

1081 if hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

1082 if key in seen_keys: 

1083 raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}") 

1084 seen_keys.add(key) 

1085 key = obj.attrs["_symlink"] 

1086 continue 

1087 break 

1088 

1089 # obj is resolved 

1090 if isinstance(obj, zarr.hierarchy.Group): 

1091 return True 

1092 elif isinstance(obj, zarr.core.Array): 

1093 raise NotADirectoryError(f"Zarr path is an array (file): {path}") 

1094 else: 

1095 raise StorageResolutionError(f"Unknown Zarr object at: {path}") 

1096 

1097 except Exception as e: 

1098 raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e 

1099 

1100 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None: 

1101 """ 

1102 Move a Zarr key or object (array/group) from one location to another, resolving symlinks. 

1103  

1104 Supports: 

1105 - Disk or memory stores 

1106 - Zarr-native symlinks 

1107 - Key renames within group 

1108 - Full copy+delete across stores if needed 

1109  

1110 Raises: 

1111 FileNotFoundError: If src does not exist 

1112 FileExistsError: If dst already exists 

1113 StorageResolutionError: On failure 

1114 """ 

1115 import zarr 

1116 

1117 src_store, src_key = self._split_store_and_key(src) 

1118 dst_store, dst_key = self._split_store_and_key(dst) 

1119 

1120 src_group = zarr.group(store=src_store) 

1121 dst_group = zarr.group(store=dst_store) 

1122 

1123 if src_key not in src_group: 

1124 raise FileNotFoundError(f"Zarr source key does not exist: {src_key}") 

1125 if dst_key in dst_group: 

1126 raise FileExistsError(f"Zarr destination key already exists: {dst_key}") 

1127 

1128 obj = src_group[src_key] 

1129 

1130 # Resolve symlinks if present 

1131 seen_keys = set() 

1132 while hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

1133 if src_key in seen_keys: 

1134 raise StorageResolutionError(f"Symlink cycle detected at: {src_key}") 

1135 seen_keys.add(src_key) 

1136 src_key = obj.attrs["_symlink"] 

1137 obj = src_group[src_key] 

1138 

1139 try: 

1140 if src_store is dst_store: 

1141 # Native move within the same Zarr group/store 

1142 src_group.move(src_key, dst_key) 

1143 else: 

1144 # Cross-store: perform manual copy + delete 

1145 obj.copy(dst_group, name=dst_key) 

1146 del src_group[src_key] 

1147 except Exception as e: 

1148 raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e 

1149 

1150 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None: 

1151 """ 

1152 Copy a Zarr key or object (array/group) from one location to another. 

1153 

1154 - Resolves Zarr-native symlinks before copying 

1155 - Prevents overwrite unless explicitly allowed (future feature) 

1156 - Works across memory or disk stores 

1157 

1158 Raises: 

1159 FileNotFoundError: If src does not exist 

1160 FileExistsError: If dst already exists 

1161 StorageResolutionError: On failure 

1162 """ 

1163 import zarr 

1164 

1165 src_store, src_key = self._split_store_and_key(src) 

1166 dst_store, dst_key = self._split_store_and_key(dst) 

1167 

1168 src_group = zarr.group(store=src_store) 

1169 dst_group = zarr.group(store=dst_store) 

1170 

1171 if src_key not in src_group: 

1172 raise FileNotFoundError(f"Zarr source key does not exist: {src_key}") 

1173 if dst_key in dst_group: 

1174 raise FileExistsError(f"Zarr destination key already exists: {dst_key}") 

1175 

1176 obj = src_group[src_key] 

1177 

1178 seen_keys = set() 

1179 while hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

1180 if src_key in seen_keys: 

1181 raise StorageResolutionError(f"Symlink cycle detected at: {src_key}") 

1182 seen_keys.add(src_key) 

1183 src_key = obj.attrs["_symlink"] 

1184 obj = src_group[src_key] 

1185 

1186 try: 

1187 obj.copy(dst_group, name=dst_key) 

1188 except Exception as e: 

1189 raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e 

1190 

1191 def stat(self, path: Union[str, Path]) -> Dict[str, Any]: 

1192 """ 

1193 Return structural metadata about a Zarr path. 

1194 

1195 Returns: 

1196 dict with keys: 

1197 - 'type': 'file', 'directory', 'symlink', or 'missing' 

1198 - 'key': final resolved key 

1199 - 'target': symlink target if applicable 

1200 - 'store': repr(store) 

1201 - 'exists': bool 

1202 

1203 Raises: 

1204 StorageResolutionError: On resolution failure 

1205 """ 

1206 store, key = self._split_store_and_key(path) 

1207 group = zarr.group(store=store) 

1208 

1209 try: 

1210 if key in group: 

1211 obj = group[key] 

1212 attrs = getattr(obj, "attrs", {}) 

1213 is_link = "_symlink" in attrs 

1214 

1215 if is_link: 

1216 target = attrs["_symlink"] 

1217 if not isinstance(target, str): 

1218 raise StorageResolutionError(f"Invalid symlink format at {key}") 

1219 return { 

1220 "type": "symlink", 

1221 "key": key, 

1222 "target": target, 

1223 "store": repr(store), 

1224 "exists": target in group 

1225 } 

1226 

1227 if isinstance(obj, zarr.Array): 

1228 return { 

1229 "type": "file", 

1230 "key": key, 

1231 "store": repr(store), 

1232 "exists": True 

1233 } 

1234 

1235 elif isinstance(obj, zarr.Group): 

1236 return { 

1237 "type": "directory", 

1238 "key": key, 

1239 "store": repr(store), 

1240 "exists": True 

1241 } 

1242 

1243 raise StorageResolutionError(f"Unknown object type at: {key}") 

1244 else: 

1245 return { 

1246 "type": "missing", 

1247 "key": key, 

1248 "store": repr(store), 

1249 "exists": False 

1250 } 

1251 

1252 except Exception as e: 

1253 raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e 

1254 

1255class ZarrSymlink: 

1256 """ 

1257 Represents a symbolic link in a Zarr store. 

1258 

1259 It stores the target path of the symlink. 

1261 """ 

1262 def __init__(self, target: str): 

1263 self.target = target 

1264 

1265 def __repr__(self): 

1266 return f"<ZarrSymlink → {self.target}>"