Coverage for openhcs/io/zarr.py: 44.3%

509 statements  

coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1# openhcs/io/zarr.py 

2""" 

3Zarr storage backend module for OpenHCS. 

4 

5This module provides a Zarr-backed implementation of the StorageBackend interface. 

6It stores data in a Zarr store on disk and supports overlay operations 

7for materializing data to disk when needed. 

8""" 

9 

10import fnmatch 

11import logging 

12from pathlib import Path 

13from typing import Any, Dict, List, Literal, Optional, Set, Tuple, Union 

14 

15import numpy as np 

16import zarr 

17 

18try: 

19 from ome_zarr.writer import write_image, write_plate_metadata, write_well_metadata 

20 from ome_zarr.io import parse_url 

21 OME_ZARR_AVAILABLE = True 

22except ImportError: 

23 OME_ZARR_AVAILABLE = False 

24 

25from openhcs.constants.constants import Backend, DEFAULT_IMAGE_EXTENSIONS 

26from openhcs.io.base import StorageBackend 

27from openhcs.io.exceptions import StorageResolutionError 

28 

29logger = logging.getLogger(__name__) 

30 

31 

32class ZarrStorageBackend(StorageBackend): 

33 """ 

34 Zarr storage backend implementation with configurable compression. 

35 

36 This class provides a concrete implementation of the storage backend interfaces 

37 for Zarr storage. It stores data in a Zarr store on disk with configurable 

38 compression algorithms and settings. 

39 

40 Features: 

41 - Single-chunk batch operations for 40x performance improvement 

42 - Configurable compression (Blosc, Zlib, LZ4, Zstd, or none) 

43 - Configurable compression levels 

44 - Full path mapping for batch operations 

45 """ 

46 

47 def __init__(self, zarr_config: Optional['ZarrConfig'] = None): 

48 """ 

49 Initialize Zarr backend with ZarrConfig. 

50 

51 Args: 

52 zarr_config: ZarrConfig dataclass with all zarr settings (uses defaults if None) 

53 """ 

54 # Import here to avoid circular imports 

55 from openhcs.core.config import ZarrConfig 

56 

57 if zarr_config is None: 

58 zarr_config = ZarrConfig() 

59 

60 self.config = zarr_config 

61 

62 # Convenience attributes 

63 self.compression_level = zarr_config.compression_level 

64 

65 # Create actual compressor from config 

66 self.compressor = self.config.compressor.create_compressor( 

67 self.config.compression_level, 

68 self.config.shuffle 

69 ) 

70 
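A minimal construction sketch (hedged: it assumes the import path from the coverage header, openhcs.io.zarr, and shows only the ZarrConfig fields referenced in this file):

    from openhcs.core.config import ZarrConfig
    from openhcs.io.zarr import ZarrStorageBackend

    backend = ZarrStorageBackend()               # falls back to ZarrConfig() defaults
    explicit = ZarrStorageBackend(ZarrConfig())  # equivalent, with an explicit config
    print(explicit.compression_level)            # convenience mirror of config.compression_level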

71 def _get_compressor(self) -> Optional[Any]: 

72 """ 

73 Get the configured compressor with appropriate settings. 

74 

75 Returns: 

76 Configured compressor instance or None for no compression 

77 """ 

78 if self.compressor is None: [78 ↛ 79: condition on line 78 was never true]

79 return None 

80 

81 # If compression_level is specified and compressor supports it 

82 if self.compression_level is not None: [82 ↛ 96: condition on line 82 was always true]

83 # Check if compressor has level parameter 

84 if hasattr(self.compressor, '__class__'): [84 ↛ 96: condition on line 84 was always true]

85 try: 

86 # Create new instance with compression level 

87 compressor_class = self.compressor.__class__ 

88 if 'level' in compressor_class.__init__.__code__.co_varnames: [88 ↛ 89: condition on line 88 was never true]

89 return compressor_class(level=self.compression_level) 

90 elif 'clevel' in compressor_class.__init__.__code__.co_varnames: [90 ↛ 91: condition on line 90 was never true]

91 return compressor_class(clevel=self.compression_level) 

92 except (AttributeError, TypeError): 

93 # Fall back to original compressor if level setting fails 

94 pass 

95 

96 return self.compressor 

97 
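For reference, the two keyword spellings probed above correspond to real numcodecs codecs: Zlib takes level, while Blosc takes clevel. A standalone sketch of the same introspection (assuming numcodecs, a zarr dependency, is installed):

    from numcodecs import Blosc, Zlib

    for codec_cls in (Zlib, Blosc):
        params = codec_cls.__init__.__code__.co_varnames
        kw = 'level' if 'level' in params else ('clevel' if 'clevel' in params else None)
        print(codec_cls.__name__, '->', kw)  # Zlib -> level, Blosc -> clevel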

98 

99 

100 def _split_store_and_key(self, path: Union[str, Path]) -> Tuple[Any, str]: 

101 """ 

102 Split path into zarr store and key without auto-injection. 

103 Path planner now provides the complete storage path. 

104 

105 Maps paths to zarr store and key: 

106 - Directory: "/path/to/plate/images.zarr" → Store: "/path/to/plate/images.zarr", Key: "" 

107 - File: "/path/to/plate/images.zarr/A01.tif" → Store: "/path/to/plate/images.zarr", Key: "A01.tif" 

108 

109 Returns a DirectoryStore with dimension_separator='/' for OME-ZARR compatibility. 

110 """ 

111 path = Path(path) 

112 

113 # If path has no extension or ends with .zarr, treat as directory (zarr store) 

114 if not path.suffix or path.suffix == '.zarr': 

115 # Directory path - treat as zarr store 

116 store_path = path 

117 relative_key = "" 

118 else: 

119 # File path - parent is zarr store, filename is key 

120 store_path = path.parent 

121 relative_key = path.name 

122 

123 # CRITICAL: Create DirectoryStore with dimension_separator='/' for OME-ZARR compatibility 

124 # This ensures chunk paths use '/' instead of '.' (e.g., '0/0/0' not '0.0.0') 

125 store = zarr.DirectoryStore(str(store_path), dimension_separator='/') 

126 return store, relative_key 

127 
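An illustration of the mapping (paths hypothetical; constructing a DirectoryStore touches no disk until something is written):

    from openhcs.io.zarr import ZarrStorageBackend

    backend = ZarrStorageBackend()
    store, key = backend._split_store_and_key("/data/plate/images.zarr")
    print(store.path, repr(key))   # /data/plate/images.zarr ''
    store, key = backend._split_store_and_key("/data/plate/images.zarr/A01.tif")
    print(store.path, repr(key))   # /data/plate/images.zarr 'A01.tif'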

128 def save(self, data: Any, output_path: Union[str, Path], **kwargs): 

129 """ 

130 Save data to Zarr at the given output_path. 

131 

132 Will only write if the key does not already exist. 

133 Will NOT overwrite or delete existing data. 

134 

135 Raises: 

136 FileExistsError: If destination key already exists 

137 StorageResolutionError: If creation fails 

138 """ 

139 store, key = self._split_store_and_key(output_path) 

140 group = zarr.group(store=store) 

141 

142 if key in group: 

143 raise FileExistsError(f"Zarr key already exists: {output_path}") 

144 

145 chunks = kwargs.get("chunks") 

146 if chunks is None: 

147 chunks = self._auto_chunks(data, chunk_divisor=kwargs.get("chunk_divisor", 1)) 

148 

149 try: 

150 # Create array with correct shape and dtype, then assign data 

151 array = group.create_dataset( 

152 name=key, 

153 shape=data.shape, 

154 dtype=data.dtype, 

155 chunks=chunks, 

156 compressor=kwargs.get("compressor", self._get_compressor()), 

157 overwrite=False # 🔒 Must be False by doctrine 

158 ) 

159 array[:] = data 

160 except Exception as e: 

161 raise StorageResolutionError(f"Failed to save to Zarr: {output_path}") from e 

162 
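A save/load roundtrip sketch against a throwaway store (store and key names are arbitrary):

    import tempfile
    from pathlib import Path

    import numpy as np

    from openhcs.io.zarr import ZarrStorageBackend

    tmp = Path(tempfile.mkdtemp())
    backend = ZarrStorageBackend()
    data = np.arange(12, dtype=np.uint16).reshape(3, 4)
    backend.save(data, tmp / "demo.zarr" / "img.tif")   # creates key 'img.tif' in demo.zarr
    assert (backend.load(tmp / "demo.zarr" / "img.tif") == data).all()
    # saving to the same key again raises FileExistsError by design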

163 def load_batch(self, file_paths: List[Union[str, Path]], **kwargs) -> List[Any]: 

164 """ 

165 Load from zarr array using filename mapping. 

166 

167 Args: 

168 file_paths: List of file paths to load 

169 **kwargs: Additional arguments (zarr_config not needed) 

170 

171 Returns: 

172 List of loaded data objects in same order as file_paths 

173 

174 Raises: 

175 FileNotFoundError: If expected zarr store not found 

176 KeyError: If filename not found in filename_map 

177 """ 

178 if not file_paths: [178 ↛ 179: condition on line 178 was never true]

179 return [] 

180 

181 # Use _split_store_and_key to get store path from first file path 

182 store, _ = self._split_store_and_key(file_paths[0]) 

183 store_path = Path(store.path) 

184 

185 # FAIL LOUD: Store must exist 

186 if not store_path.exists(): [186 ↛ 187: condition on line 186 was never true]

187 raise FileNotFoundError(f"Expected zarr store not found: {store_path}") 

188 root = zarr.open_group(store=store, mode='r') 

189 

190 # Group files by well based on OME-ZARR structure 

191 well_to_files = {} 

192 well_to_indices = {} 

193 

194 # Search OME-ZARR structure for requested files 

195 for row_name in root.group_keys(): 

196 if len(row_name) == 1 and row_name.isalpha(): # Row directory (A, B, etc.) [196 ↛ 195: condition on line 196 was always true]

197 row_group = root[row_name] 

198 for col_name in row_group.group_keys(): 

199 if col_name.isdigit(): # Column directory (01, 02, etc.) [199 ↛ 198: condition on line 199 was always true]

200 well_group = row_group[col_name] 

201 well_name = f"{row_name}{col_name}" 

202 

203 # Check if this well has our filename mapping in the field array 

204 if "0" in well_group.group_keys(): 204 ↛ 198line 204 didn't jump to line 198 because the condition on line 204 was always true

205 field_group = well_group["0"] 

206 if "0" in field_group.array_keys(): 206 ↛ 198line 206 didn't jump to line 198 because the condition on line 206 was always true

207 field_array = field_group["0"] 

208 if "openhcs_filename_map" in field_array.attrs: 208 ↛ 198line 208 didn't jump to line 198 because the condition on line 208 was always true

209 filename_map = dict(field_array.attrs["openhcs_filename_map"]) 

210 

211 # Check which requested files are in this well 

212 for i, path in enumerate(file_paths): 

213 filename = Path(path).name # Use filename only for matching 

214 if filename in filename_map: 

215 if well_name not in well_to_files: 

216 well_to_files[well_name] = [] 

217 well_to_indices[well_name] = [] 

218 well_to_files[well_name].append(i) # Original position in file_paths 

219 well_to_indices[well_name].append(filename_map[filename]) # 5D coordinates (field, channel, z) 

220 

221 # Load data from each well using single well chunk 

222 results = [None] * len(file_paths) # Pre-allocate results array 

223 

224 for well_name, file_positions in well_to_files.items(): 

225 row, col = well_name[0], well_name[1:] 

226 well_group = root[row][col] 

227 well_indices = well_to_indices[well_name] 

228 

229 # Load entire well field array in single operation (well chunking) 

230 field_group = well_group["0"] 

231 field_array = field_group["0"] 

232 all_well_data = field_array[:] # Single I/O operation for entire well 

233 

234 # Extract requested 2D slices using 5D coordinates 

235 for file_pos, coords_5d in zip(file_positions, well_indices): 

236 field_idx, channel_idx, z_idx = coords_5d 

237 # Extract 2D slice: (field, channel, z, y, x) -> (y, x) 

238 results[file_pos] = all_well_data[field_idx, channel_idx, z_idx, :, :] # 2D slice 

239 

240 logger.debug(f"Loaded {len(file_paths)} images from zarr store at {store_path} from {len(well_to_files)} wells") 

241 return results 

242 
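Usage sketch, assuming a store previously populated by save_batch below (paths are hypothetical; results always come back in the order of the input paths):

    from openhcs.io.zarr import ZarrStorageBackend

    backend = ZarrStorageBackend()
    paths = [
        "/data/plate/images.zarr/A01_f0_c0_z0.tif",
        "/data/plate/images.zarr/A01_f0_c0_z1.tif",
    ]
    images = backend.load_batch(paths)  # one bulk read per well, then 2D slicing
    assert len(images) == len(paths)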

243 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]], **kwargs) -> None: 

244 """Save multiple images using ome-zarr-py for proper OME-ZARR compliance with multi-dimensional support. 

245 

246 Args: 

247 data_list: List of image data to save 

248 output_paths: List of output file paths 

249 **kwargs: Must include chunk_name, n_channels, n_z, n_fields, row, col 

250 """ 

251 

252 # Extract required parameters from kwargs 

253 chunk_name = kwargs.get('chunk_name') 

254 n_channels = kwargs.get('n_channels') 

255 n_z = kwargs.get('n_z') 

256 n_fields = kwargs.get('n_fields') 

257 row = kwargs.get('row') 

258 col = kwargs.get('col') 

259 

260 # Validate required parameters 

261 if chunk_name is None: [261 ↛ 262: condition on line 261 was never true]

262 raise ValueError("chunk_name must be provided") 

263 if n_channels is None: [263 ↛ 264: condition on line 263 was never true]

264 raise ValueError("n_channels must be provided") 

265 if n_z is None: [265 ↛ 266: condition on line 265 was never true]

266 raise ValueError("n_z must be provided") 

267 if n_fields is None: [267 ↛ 268: condition on line 267 was never true]

268 raise ValueError("n_fields must be provided") 

269 if row is None: [269 ↛ 270: condition on line 269 was never true]

270 raise ValueError("row must be provided") 

271 if col is None: [271 ↛ 272: condition on line 271 was never true]

272 raise ValueError("col must be provided") 

273 

274 if not data_list: [274 ↛ 275: condition on line 274 was never true]

275 logger.warning(f"Empty data list for chunk {chunk_name}") 

276 return 

277 

278 if not OME_ZARR_AVAILABLE: [278 ↛ 279: condition on line 278 was never true]

279 raise ImportError("ome-zarr package is required. Install with: pip install ome-zarr") 

280 

281 # Use _split_store_and_key to get store path from first output path 

282 store, _ = self._split_store_and_key(output_paths[0]) 

283 store_path = Path(store.path) 

284 

285 logger.debug(f"Saving batch for chunk {chunk_name} with {len(data_list)} images to row={row}, col={col}") 

286 

287 # Convert GPU arrays to CPU arrays before saving 

288 cpu_data_list = [] 

289 for data in data_list: 

290 if hasattr(data, 'get'): # CuPy array [290 ↛ 291: condition on line 290 was never true]

291 cpu_data_list.append(data.get()) 

292 elif hasattr(data, 'cpu'): # PyTorch tensor [292 ↛ 293: condition on line 292 was never true]

293 cpu_data_list.append(data.cpu().numpy()) 

294 elif hasattr(data, 'device') and 'cuda' in str(data.device).lower(): # JAX on GPU [294 ↛ 295: condition on line 294 was never true]

295 import jax 

296 cpu_data_list.append(jax.device_get(data)) 

297 else: # Already CPU array (NumPy, etc.) 

298 cpu_data_list.append(data) 

299 

300 # Ensure parent directory exists 

301 store_path.parent.mkdir(parents=True, exist_ok=True) 

302 

303 # Use _split_store_and_key to get properly configured store with dimension_separator='/' 

304 store, _ = self._split_store_and_key(store_path) 

305 root = zarr.group(store=store) # Open existing or create new group without mode conflicts 

306 

307 # Set OME metadata if not already present 

308 if "ome" not in root.attrs: 

309 root.attrs["ome"] = {"version": "0.4"} 

310 

311 # Get the store for compatibility with existing code 

312 store = root.store 

313 

314 # Write plate metadata with locking to prevent concurrent corruption (if enabled) 

315 should_write_plate_metadata = kwargs.get('write_plate_metadata', self.config.write_plate_metadata) 

316 if should_write_plate_metadata: [316 ↛ 321: condition on line 316 was always true]

317 self._ensure_plate_metadata_with_lock(root, row, col, store_path) 

318 

319 # Create HCS-compliant structure: plate/row/col/field/resolution 

320 # Create row group if it doesn't exist 

321 if row not in root: 

322 row_group = root.create_group(row) 

323 else: 

324 row_group = root[row] 

325 

326 # Create well group (remove existing if present to allow overwrite) 

327 if col in row_group: 

328 del row_group[col] 

329 well_group = row_group.create_group(col) 

330 

331 # Add HCS well metadata 

332 well_metadata = { 

333 "images": [ 

334 { 

335 "path": "0", # Single image containing all fields 

336 "acquisition": 0 

337 } 

338 ], 

339 "version": "0.5" 

340 } 

341 well_group.attrs["ome"] = {"version": "0.5", "well": well_metadata} 

342 

343 # Create field group (single field "0" containing all field data) 

344 field_group = well_group.require_group("0") 

345 

346 # Always use full 5D structure: (fields, channels, z, y, x) 

347 # Define OME-NGFF compliant axes 

348 axes = [ 

349 {'name': 'field', 'type': 'field'}, # Custom field type - allowed before space 

350 {'name': 'c', 'type': 'channel'}, 

351 {'name': 'z', 'type': 'space'}, 

352 {'name': 'y', 'type': 'space'}, 

353 {'name': 'x', 'type': 'space'} 

354 ] 

355 

356 # Get image dimensions 

357 sample_image = cpu_data_list[0] 

358 height, width = sample_image.shape[-2:] 

359 

360 # Always reshape to full 5D: (n_fields, n_channels, n_z, y, x) 

361 target_shape = [n_fields, n_channels, n_z, height, width] 

362 

363 # Stack and reshape data 

364 stacked_data = np.stack(cpu_data_list, axis=0) 

365 

366 # Calculate total expected images for validation 

367 total_expected = n_fields * n_channels * n_z 

368 if len(data_list) != total_expected: [368 ↛ 369: condition on line 368 was never true]

369 logger.warning(f"Data count mismatch: got {len(data_list)}, expected {total_expected} " 

370 f"(fields={n_fields}, channels={n_channels}, z={n_z})") 

371 

372 # Always reshape to 5D structure 

373 reshaped_data = stacked_data.reshape(target_shape) 

374 

375 logger.info(f"Zarr save_batch: {len(data_list)} images → {stacked_data.shape}{reshaped_data.shape}") 

376 axes_names = [ax['name'] for ax in axes] 

377 logger.info(f"Dimensions: fields={n_fields}, channels={n_channels}, z={n_z}, axes={''.join(axes_names)}") 

378 

379 # Re-fetch the field group (require_group on line 344 already created it) 

380 if "0" in well_group: [380 ↛ 383: condition on line 380 was always true]

381 field_group = well_group["0"] 

382 else: 

383 field_group = well_group.create_group("0") 

384 

385 # Write OME-ZARR well metadata with single field (well-chunked approach) 

386 write_well_metadata(well_group, ['0']) 

387 

388 # Use single chunk approach for optimal performance 

389 storage_options = { 

390 "chunks": reshaped_data.shape, # Single chunk for entire well 

391 "compressor": self._get_compressor() 

392 } 

393 

394 # Write as single well-chunked array with proper multi-dimensional axes 

395 write_image( 

396 image=reshaped_data, 

397 group=field_group, 

398 axes=axes, 

399 storage_options=storage_options, 

400 scaler=None, # Single scale only for performance 

401 compute=True 

402 ) 

403 

404 # Axes are already correctly set by write_image function 

405 

406 # Store filename mapping with 5D coordinates (field, channel, z, y, x) 

407 # Convert flat index to 5D coordinates for proper zarr slicing 

408 filename_map = {} 

409 for i, path in enumerate(output_paths): 

410 # Calculate 5D coordinates from flat index 

411 field_idx = i // (n_channels * n_z) 

412 remaining = i % (n_channels * n_z) 

413 channel_idx = remaining // n_z 

414 z_idx = remaining % n_z 

415 

416 # Store as tuple (field, channel, z) - y,x are full slices 

417 filename_map[Path(path).name] = (field_idx, channel_idx, z_idx) 

418 

419 field_array = field_group['0'] 

420 field_array.attrs["openhcs_filename_map"] = filename_map 

421 field_array.attrs["openhcs_output_paths"] = [str(path) for path in output_paths] 

422 field_array.attrs["openhcs_dimensions"] = { 

423 "n_fields": n_fields, 

424 "n_channels": n_channels, 

425 "n_z": n_z 

426 } 

427 

428 logger.debug(f"Successfully saved batch for chunk {chunk_name}") 

429 

430 # Aggressive memory cleanup 

431 del cpu_data_list 

432 import gc 

433 gc.collect() 

434 
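The flat-index arithmetic on lines 411-414 orders images field-major, then channel, then z; a standalone check that the decomposition is lossless (dimensions illustrative):

    n_fields, n_channels, n_z = 2, 2, 3
    for i in range(n_fields * n_channels * n_z):
        field_idx = i // (n_channels * n_z)
        remaining = i % (n_channels * n_z)
        channel_idx = remaining // n_z
        z_idx = remaining % n_z
        # recomposing the flat index proves the round trip
        assert i == (field_idx * n_channels + channel_idx) * n_z + z_idx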

435 def _ensure_plate_metadata_with_lock(self, root: zarr.Group, row: str, col: str, store_path: Path) -> None: 

436 """Ensure plate-level metadata includes ALL existing wells with file locking.""" 

437 import fcntl 

438 

439 lock_path = store_path.with_suffix('.metadata.lock') 

440 

441 try: 

442 with open(lock_path, 'w') as lock_file: 

443 fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) 

444 self._ensure_plate_metadata(root, row, col) 

445 except Exception as e: 

446 logger.error(f"Failed to update plate metadata with lock: {e}") 

447 raise 

448 finally: 

449 if lock_path.exists(): [449 ↛ exit: condition on line 449 was always true]

450 lock_path.unlink() 

451 

452 def _ensure_plate_metadata(self, root: zarr.Group, row: str, col: str) -> None: 

453 """Ensure plate-level metadata includes ALL existing wells in the store.""" 

454 

455 # Scan the store for all existing wells 

456 all_rows = set() 

457 all_cols = set() 

458 all_wells = [] 

459 

460 for row_name in root.group_keys(): 

461 if isinstance(root[row_name], zarr.Group): # Ensure it's a row group [461 ↛ 460: condition on line 461 was always true]

462 row_group = root[row_name] 

463 all_rows.add(row_name) 

464 

465 for col_name in row_group.group_keys(): 

466 if isinstance(row_group[col_name], zarr.Group): # Ensure it's a well group [466 ↛ 465: condition on line 466 was always true]

467 all_cols.add(col_name) 

468 well_path = f"{row_name}/{col_name}" 

469 all_wells.append(well_path) 

470 

471 # Include the current well being added (might not exist yet) 

472 all_rows.add(row) 

473 all_cols.add(col) 

474 current_well_path = f"{row}/{col}" 

475 if current_well_path not in all_wells: 

476 all_wells.append(current_well_path) 

477 

478 # Sort for consistent ordering 

479 sorted_rows = sorted(all_rows) 

480 sorted_cols = sorted(all_cols) 

481 sorted_wells = sorted(all_wells) 

482 

483 # Build wells metadata with proper indices 

484 wells_metadata = [] 

485 for well_path in sorted_wells: 

486 well_row, well_col = well_path.split("/") 

487 row_index = sorted_rows.index(well_row) 

488 col_index = sorted_cols.index(well_col) 

489 wells_metadata.append({ 

490 "path": well_path, 

491 "rowIndex": row_index, 

492 "columnIndex": col_index 

493 }) 

494 

495 # Add acquisition metadata for HCS compliance 

496 acquisitions = [ 

497 { 

498 "id": 0, 

499 "name": "default_acquisition", 

500 "maximumfieldcount": 1 # Single field containing all field data 

501 } 

502 ] 

503 

504 # Write complete HCS plate metadata 

505 write_plate_metadata( 

506 root, 

507 sorted_rows, 

508 sorted_cols, 

509 wells_metadata, 

510 acquisitions=acquisitions, 

511 field_count=1, 

512 name="OpenHCS_Plate" 

513 ) 

514 
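For orientation, write_plate_metadata leaves OME-NGFF plate attributes on the root group; roughly the following (values illustrative, exact keys depend on the installed ome-zarr version):

    root.attrs["plate"] == {
        "name": "OpenHCS_Plate",
        "rows": [{"name": "A"}],
        "columns": [{"name": "01"}],
        "wells": [{"path": "A/01", "rowIndex": 0, "columnIndex": 0}],
        "acquisitions": [{"id": 0, "name": "default_acquisition", "maximumfieldcount": 1}],
        "field_count": 1,
        "version": "0.4",
    }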

515 

516 

517 

518 

519 

520 def load(self, file_path: Union[str, Path], **kwargs) -> Any: 

521 store, key = self._split_store_and_key(file_path) 

522 group = zarr.group(store=store) 

523 

524 visited = set() 

525 while key in group and "_symlink" in group[key].attrs: # resolve symlinks within this store; is_symlink(key) would re-split the bare key against the CWD 

526 if key in visited: 

527 raise RuntimeError(f"Zarr symlink loop detected at {key}") 

528 visited.add(key) 

529 key = group[key].attrs["_symlink"] 

530 

531 if key not in group: 

532 raise FileNotFoundError(f"No array found at key '{key}'") 

533 return group[key][:] 

534 

535 def list_files(self, 

536 directory: Union[str, Path], 

537 pattern: Optional[str] = None, 

538 extensions: Optional[Set[str]] = None, 

539 recursive: bool = False) -> List[Path]: 

540 """ 

541 List all file-like entries (i.e. arrays) in a Zarr store, optionally filtered. 

542 Returns filenames from array attributes (output_paths) if available. 

543 """ 

544 

545 store, relative_key = self._split_store_and_key(directory) 

546 result: List[Path] = [] 

547 

548 def _matches_filters(name: str) -> bool: 

549 if pattern and not fnmatch.fnmatch(name, pattern): [549 ↛ 550: condition on line 549 was never true]

550 return False 

551 if extensions: [551 ↛ 553: condition on line 551 was always true]

552 return any(name.lower().endswith(ext.lower()) for ext in extensions) 

553 return True 

554 

555 try: 

556 # Open zarr group and traverse OME-ZARR structure 

557 group = zarr.open_group(store=store) 

558 

559 # Check if this is OME-ZARR structure (has plate metadata) 

560 if "plate" in group.attrs: 560 ↛ 582line 560 didn't jump to line 582 because the condition on line 560 was always true

561 # OME-ZARR structure: traverse A/01/ wells 

562 for row_name in group.group_keys(): 

563 if len(row_name) == 1 and row_name.isalpha(): # Row directory (A, B, etc.) [563 ↛ 562: condition on line 563 was always true]

564 row_group = group[row_name] 

565 for col_name in row_group.group_keys(): 

566 if col_name.isdigit(): # Column directory (01, 02, etc.) [566 ↛ 565: condition on line 566 was always true]

567 well_group = row_group[col_name] 

568 

569 # Get filenames from field array metadata 

570 if "0" in well_group.group_keys(): 570 ↛ 565line 570 didn't jump to line 565 because the condition on line 570 was always true

571 field_group = well_group["0"] 

572 if "0" in field_group.array_keys(): 572 ↛ 565line 572 didn't jump to line 565 because the condition on line 572 was always true

573 field_array = field_group["0"] 

574 if "openhcs_output_paths" in field_array.attrs: 574 ↛ 565line 574 didn't jump to line 565 because the condition on line 574 was always true

575 output_paths = field_array.attrs["openhcs_output_paths"] 

576 for filename in output_paths: 

577 filename_only = Path(filename).name 

578 if _matches_filters(filename_only): [578 ↛ 576: condition on line 578 was always true]

579 result.append(Path(filename)) 

580 else: 

581 # Legacy flat structure: get array keys directly 

582 array_keys = list(group.array_keys()) 

583 for array_key in array_keys: 

584 try: 

585 array = group[array_key] 

586 if "output_paths" in array.attrs: 

587 # Get original filenames from array attributes 

588 output_paths = array.attrs["output_paths"] 

589 for filename in output_paths: 

590 filename_only = Path(filename).name 

591 if _matches_filters(filename_only): 

592 result.append(Path(filename)) 

593 

594 except Exception: 

595 # Skip arrays that can't be accessed 

596 continue 

597 

598 except Exception as e: 

599 raise StorageResolutionError(f"Failed to list zarr arrays: {e}") from e 

600 

601 return result 

602 

603 def list_dir(self, path: Union[str, Path]) -> List[str]: 

604 store, relative_key = self._split_store_and_key(path) 

605 

606 # Normalize key for Zarr API 

607 key = relative_key.rstrip("/") 

608 

609 try: 

610 # Zarr 3.x uses async API - convert async generator to list 

611 import asyncio 

612 async def _get_entries(): 

613 entries = [] 

614 async for entry in store.list_dir(key): 

615 entries.append(entry) 

616 return entries 

617 return asyncio.run(_get_entries()) 

618 except KeyError: 

619 raise NotADirectoryError(f"Zarr path is not a directory: {path}") 

620 except FileNotFoundError: 

621 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

622 

623 

624 def delete(self, path: Union[str, Path]) -> None: 

625 """ 

626 Delete a Zarr array (file) or empty group (directory) at the given path. 

627 

628 Args: 

629 path: Zarr path or URI 

630 

631 Raises: 

632 FileNotFoundError: If path does not exist 

633 IsADirectoryError: If path is a non-empty group 

634 StorageResolutionError: For unexpected failures 

635 """ 

636 import zarr 

637 import shutil 

638 import os 

639 

640 path = str(path) 

641 

642 if not os.path.exists(path): 

643 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

644 

645 try: 

646 zarr_obj = zarr.open(path, mode='r') 

647 except Exception as e: 

648 raise StorageResolutionError(f"Failed to open Zarr path: {path}") from e 

649 

650 # Determine if it's a file (array) or directory (group) 

651 if isinstance(zarr_obj, zarr.Array): 

652 try: 

653 shutil.rmtree(path) # Array folders can be deleted directly 

654 except Exception as e: 

655 raise StorageResolutionError(f"Failed to delete Zarr array: {path}") from e 

656 

657 elif isinstance(zarr_obj, zarr.Group): 

658 if os.listdir(path): 

659 raise IsADirectoryError(f"Zarr group is not empty: {path}") 

660 try: 

661 os.rmdir(path) 

662 except Exception as e: 

663 raise StorageResolutionError(f"Failed to delete empty Zarr group: {path}") from e 

664 else: 

665 raise StorageResolutionError(f"Unrecognized Zarr object type at: {path}") 

666 

667 def delete_all(self, path: Union[str, Path]) -> None: 

668 """ 

669 Recursively delete a Zarr array or group (file or directory). 

670 

671 This is the only permitted recursive deletion method for the Zarr backend. 

672 

673 Args: 

674 path: The path shared across all storage backends 

675 

676 Raises: 

677 FileNotFoundError: If the path does not exist 

678 StorageResolutionError: If deletion fails 

679 """ 

680 import os 

681 import shutil 

682 

683 path = str(path) 

684 

685 if not os.path.exists(path): 

686 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

687 

688 try: 

689 shutil.rmtree(path) 

690 except Exception as e: 

691 raise StorageResolutionError(f"Failed to recursively delete Zarr path: {path}") from e 

692 

693 def exists(self, path: Union[str, Path]) -> bool: 

694 path = Path(path) 

695 

696 # If path has no file extension, treat as directory existence check 

697 # This handles auto_detect_patterns asking "does this directory exist?" 

698 if not path.suffix: [698 ↛ 702: condition on line 698 was always true]

699 return path.exists() 

700 

701 # Otherwise, check zarr key existence (for actual files) 

702 store, key = self._split_store_and_key(path) 

703 

704 # First check if the zarr store itself exists 

705 # The store from _split_store_and_key is always a DirectoryStore; check its on-disk path 

706 store_path = Path(store.path) 

707 if not store_path.exists(): 

708 return False 

709 

710 try: 

711 root_group = zarr.group(store=store) 

712 return key in root_group or any(k.startswith(key.rstrip("/") + "/") for k in root_group.array_keys()) 

713 except Exception: 

714 # If we can't open the zarr store, it doesn't exist 

715 return False 

716 

717 def ensure_directory(self, directory: Union[str, Path]) -> Path: 

718 """ 

719 No-op for zarr backend - zarr stores handle their own structure. 

720 

721 Zarr doesn't have filesystem directories that need to be "ensured". 

722 Store creation and group structure is handled by save operations. 

723 """ 

724 return Path(directory) 

725 

726 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False): 

727 store, src_key = self._split_store_and_key(source) 

728 store2, dst_key = self._split_store_and_key(link_name) 

729 

730 if store.path != store2.path: # DirectoryStore exposes its root directory as .path 

731 raise ValueError("Symlinks must exist within the same .zarr store") 

732 

733 group = zarr.group(store=store) 

734 if src_key not in group: 

735 raise FileNotFoundError(f"Source key '{src_key}' not found in Zarr store") 

736 

737 if dst_key in group: 

738 if not overwrite: 

739 raise FileExistsError(f"Symlink target already exists at: {dst_key}") 

740 # Remove existing entry if overwrite=True 

741 del group[dst_key] 

742 

743 # Create a new group at the symlink path 

744 link_group = group.require_group(dst_key) 

745 link_group.attrs["_symlink"] = src_key # Store as declared string 

746 

747 def is_symlink(self, path: Union[str, Path]) -> bool: 

748 """ 

749 Check if the given Zarr path represents a logical symlink (based on attribute contract). 

750 

751 Returns: 

752 bool: True if the key exists and has an OpenHCS-declared symlink attribute 

753 False if the key doesn't exist or is not a symlink 

754 """ 

755 store, key = self._split_store_and_key(path) 

756 group = zarr.group(store=store) 

757 

758 try: 

759 obj = group[key] 

760 attrs = getattr(obj, "attrs", {}) 

761 

762 if "_symlink" not in attrs: 

763 return False 

764 

765 # Enforce that the _symlink attr matches schema (e.g. str or list of path components) 

766 if not isinstance(attrs["_symlink"], str): 

767 raise StorageResolutionError(f"Invalid symlink format in Zarr attrs at: {path}") 

768 

769 return True 

770 except KeyError: 

771 # Key doesn't exist, so it's not a symlink 

772 return False 

773 except Exception as e: 

774 raise StorageResolutionError(f"Failed to inspect Zarr symlink at: {path}") from e 

775 
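A roundtrip sketch for these attribute-based symlinks (throwaway store; names arbitrary):

    import tempfile
    from pathlib import Path

    import numpy as np

    from openhcs.io.zarr import ZarrStorageBackend

    tmp = Path(tempfile.mkdtemp()) / "store.zarr"
    backend = ZarrStorageBackend()
    backend.save(np.zeros((2, 2), dtype=np.uint8), tmp / "orig.tif")
    backend.create_symlink(tmp / "orig.tif", tmp / "alias.tif")
    assert backend.is_symlink(tmp / "alias.tif")         # alias carries the _symlink attr
    assert not backend.is_symlink(tmp / "orig.tif")      # a real array is not a symlink
    assert (backend.load(tmp / "alias.tif") == 0).all()  # load follows the link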

776 def _auto_chunks(self, data: Any, chunk_divisor: int = 1) -> Tuple[int, ...]: 

777 shape = data.shape 

778 

779 # Divide each dimension by chunk_divisor (default 1), clamped to a minimum of 1 

780 return tuple(max(1, s // chunk_divisor) for s in shape) 

781 
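A worked example of the divisor logic (shape illustrative):

    import numpy as np

    from openhcs.io.zarr import ZarrStorageBackend

    backend = ZarrStorageBackend()
    stack = np.zeros((8, 512, 512), dtype=np.uint16)
    print(backend._auto_chunks(stack))                   # (8, 512, 512): divisor 1 keeps one chunk
    print(backend._auto_chunks(stack, chunk_divisor=4))  # (2, 128, 128)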

782 def is_file(self, path: Union[str, Path]) -> bool: 

783 """ 

784 Check if a Zarr path points to a file (Zarr array), resolving both OS and Zarr-native symlinks. 

785 

786 Args: 

787 path: Zarr store path (may point to key within store) 

788 

789 Returns: 

790 bool: True if resolved path is a Zarr array 

791 

792 Raises: 

793 FileNotFoundError: If path does not exist or broken symlink 

794 IsADirectoryError: If resolved object is a Zarr group 

795 StorageResolutionError: For other failures 

796 """ 

797 import os; path = str(path) # 'os' was missing here; is_dir below imports it locally too 

798 

799 if not os.path.exists(path): 

800 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

801 

802 try: 

803 store, key = self._split_store_and_key(path) 

804 group = zarr.group(store=store) 

805 

806 # Resolve symlinks (Zarr-native, via .attrs) 

807 seen_keys = set() 

808 while True: 

809 if key not in group: 

810 raise FileNotFoundError(f"Zarr key does not exist: {key}") 

811 obj = group[key] 

812 

813 if hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

814 if key in seen_keys: 

815 raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}") 

816 seen_keys.add(key) 

817 key = obj.attrs["_symlink"] 

818 continue 

819 break # resolution complete 

820 

821 # Now obj is the resolved target 

822 if isinstance(obj, zarr.Array): 

823 return True 

824 elif isinstance(obj, zarr.Group): 

825 raise IsADirectoryError(f"Zarr path is a group (directory): {path}") 

826 else: 

827 raise StorageResolutionError(f"Unknown Zarr object at: {path}") 

828 

829 except Exception as e: 

830 raise StorageResolutionError(f"Failed to resolve Zarr file path: {path}") from e 

831 

832 def is_dir(self, path: Union[str, Path]) -> bool: 

833 """ 

834 Check if a Zarr path resolves to a directory (i.e., a Zarr group). 

835  

836 Resolves both OS-level symlinks and Zarr-native symlinks via .attrs['_symlink']. 

837  

838 Args: 

839 path: Zarr path or URI 

840  

841 Returns: 

842 bool: True if path resolves to a Zarr group 

843  

844 Raises: 

845 FileNotFoundError: If path or resolved target does not exist 

846 NotADirectoryError: If resolved target is not a group 

847 StorageResolutionError: For symlink cycles or other failures 

848 """ 

849 import os 

850 

851 

852 path = str(path) 

853 

854 if not os.path.exists(path): 

855 raise FileNotFoundError(f"Zarr path does not exist: {path}") 

856 

857 try: 

858 store, key = self._split_store_and_key(path) 

859 group = zarr.group(store=store) 

860 

861 seen_keys = set() 

862 

863 # Resolve symlink chain 

864 while True: 

865 if key not in group: 

866 raise FileNotFoundError(f"Zarr key does not exist: {key}") 

867 obj = group[key] 

868 

869 if hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

870 if key in seen_keys: 

871 raise StorageResolutionError(f"Symlink cycle detected in Zarr at: {key}") 

872 seen_keys.add(key) 

873 key = obj.attrs["_symlink"] 

874 continue 

875 break 

876 

877 # obj is resolved 

878 if isinstance(obj, zarr.Group): 

879 return True 

880 elif isinstance(obj, zarr.Array): 

881 raise NotADirectoryError(f"Zarr path is an array (file): {path}") 

882 else: 

883 raise StorageResolutionError(f"Unknown Zarr object at: {path}") 

884 

885 except Exception as e: 

886 raise StorageResolutionError(f"Failed to resolve Zarr directory path: {path}") from e 

887 

888 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None: 

889 """ 

890 Move a Zarr key or object (array/group) from one location to another, resolving symlinks. 

891  

892 Supports: 

893 - Disk or memory stores 

894 - Zarr-native symlinks 

895 - Key renames within group 

896 - Full copy+delete across stores if needed 

897  

898 Raises: 

899 FileNotFoundError: If src does not exist 

900 FileExistsError: If dst already exists 

901 StorageResolutionError: On failure 

902 """ 

903 import zarr 

904 

905 src_store, src_key = self._split_store_and_key(src) 

906 dst_store, dst_key = self._split_store_and_key(dst) 

907 

908 src_group = zarr.group(store=src_store) 

909 dst_group = zarr.group(store=dst_store) 

910 

911 if src_key not in src_group: 

912 raise FileNotFoundError(f"Zarr source key does not exist: {src_key}") 

913 if dst_key in dst_group: 

914 raise FileExistsError(f"Zarr destination key already exists: {dst_key}") 

915 

916 obj = src_group[src_key] 

917 

918 # Resolve symlinks if present 

919 seen_keys = set() 

920 while hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

921 if src_key in seen_keys: 

922 raise StorageResolutionError(f"Symlink cycle detected at: {src_key}") 

923 seen_keys.add(src_key) 

924 src_key = obj.attrs["_symlink"] 

925 obj = src_group[src_key] 

926 

927 try: 

928 if src_store is dst_store: 

929 # Native move within the same Zarr group/store 

930 src_group.move(src_key, dst_key) 

931 else: 

932 # Cross-store: perform manual copy + delete 

933 zarr.copy(obj, dst_group, name=dst_key) # zarr.copy handles arrays and groups; Array has no .copy method 

934 del src_group[src_key] 

935 except Exception as e: 

936 raise StorageResolutionError(f"Failed to move {src_key} to {dst_key}") from e 

937 

938 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None: 

939 """ 

940 Copy a Zarr key or object (array/group) from one location to another. 

941 

942 - Resolves Zarr-native symlinks before copying 

943 - Prevents overwrite unless explicitly allowed (future feature) 

944 - Works across memory or disk stores 

945 

946 Raises: 

947 FileNotFoundError: If src does not exist 

948 FileExistsError: If dst already exists 

949 StorageResolutionError: On failure 

950 """ 

951 import zarr 

952 

953 src_store, src_key = self._split_store_and_key(src) 

954 dst_store, dst_key = self._split_store_and_key(dst) 

955 

956 src_group = zarr.group(store=src_store) 

957 dst_group = zarr.group(store=dst_store) 

958 

959 if src_key not in src_group: 

960 raise FileNotFoundError(f"Zarr source key does not exist: {src_key}") 

961 if dst_key in dst_group: 

962 raise FileExistsError(f"Zarr destination key already exists: {dst_key}") 

963 

964 obj = src_group[src_key] 

965 

966 seen_keys = set() 

967 while hasattr(obj, "attrs") and "_symlink" in obj.attrs: 

968 if src_key in seen_keys: 

969 raise StorageResolutionError(f"Symlink cycle detected at: {src_key}") 

970 seen_keys.add(src_key) 

971 src_key = obj.attrs["_symlink"] 

972 obj = src_group[src_key] 

973 

974 try: 

975 zarr.copy(obj, dst_group, name=dst_key) # use zarr.copy; Array has no .copy method 

976 except Exception as e: 

977 raise StorageResolutionError(f"Failed to copy {src_key} to {dst_key}") from e 

978 

979 def stat(self, path: Union[str, Path]) -> Dict[str, Any]: 

980 """ 

981 Return structural metadata about a Zarr path. 

982 

983 Returns: 

984 dict with keys: 

985 - 'type': 'file', 'directory', 'symlink', or 'missing' 

986 - 'key': final resolved key 

987 - 'target': symlink target if applicable 

988 - 'store': repr(store) 

989 - 'exists': bool 

990 

991 Raises: 

992 StorageResolutionError: On resolution failure 

993 """ 

994 store, key = self._split_store_and_key(path) 

995 group = zarr.group(store=store) 

996 

997 try: 

998 if key in group: 

999 obj = group[key] 

1000 attrs = getattr(obj, "attrs", {}) 

1001 is_link = "_symlink" in attrs 

1002 

1003 if is_link: 

1004 target = attrs["_symlink"] 

1005 if not isinstance(target, str): 

1006 raise StorageResolutionError(f"Invalid symlink format at {key}") 

1007 return { 

1008 "type": "symlink", 

1009 "key": key, 

1010 "target": target, 

1011 "store": repr(store), 

1012 "exists": target in group 

1013 } 

1014 

1015 if isinstance(obj, zarr.Array): 

1016 return { 

1017 "type": "file", 

1018 "key": key, 

1019 "store": repr(store), 

1020 "exists": True 

1021 } 

1022 

1023 elif isinstance(obj, zarr.Group): 

1024 return { 

1025 "type": "directory", 

1026 "key": key, 

1027 "store": repr(store), 

1028 "exists": True 

1029 } 

1030 

1031 raise StorageResolutionError(f"Unknown object type at: {key}") 

1032 else: 

1033 return { 

1034 "type": "missing", 

1035 "key": key, 

1036 "store": repr(store), 

1037 "exists": False 

1038 } 

1039 

1040 except Exception as e: 

1041 raise StorageResolutionError(f"Failed to stat Zarr key {key}") from e 

1042 
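Continuing the symlink sketch under create_symlink above, stat distinguishes the cases (illustrative):

    print(backend.stat(tmp / "orig.tif")["type"])     # file
    print(backend.stat(tmp / "alias.tif")["type"])    # symlink, with "target": "orig.tif"
    print(backend.stat(tmp / "missing.tif")["type"])  # missing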

1043class ZarrSymlink: 

1044 """ 

1045 Represents a symbolic link in a Zarr store. 

1046 

1047 It records the target path of a Zarr-native (attribute-based) 

1048 symlink entry. 

1049 """ 

1050 def __init__(self, target: str): 

1051 self.target = target 

1052 

1053 def __repr__(self): 

1054 return f"<ZarrSymlink → {self.target}>"