Coverage for openhcs/io/memory.py: 31.7% — 294 statements (coverage.py v7.11.0, created at 2025-11-04 02:09 +0000)

1# openhcs/io/storage/backends/memory.py 

2""" 

3Memory storage backend module for OpenHCS. 

4 

5This module provides an in-memory implementation of the MicroscopyStorageBackend interface. 

6It stores data in memory and supports overlay operations for materializing data to disk when needed. 

7 

8This implementation enforces Clause 106-A (Declared Memory Types) and 

9Clause 251 (Declarative Memory Conversion Interface) by requiring explicit 

10memory type declarations and providing declarative conversion methods. 

11""" 

12 

import copy as py_copy
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from openhcs.constants.constants import Backend
from openhcs.io.base import StorageBackend

19 

20logger = logging.getLogger(__name__) 

21 

22 

class MemoryStorageBackend(StorageBackend):
    """In-memory storage backend with automatic registration.

    Storage layout: a *flat* mapping from normalized POSIX path strings to
    values, where a value of ``None`` marks a directory entry and any other
    value is file content.

    NOTE(review): the symlink/move/copy/stat helpers below instead traverse
    ``_memory_store`` as a *nested* dict-of-dicts tree, which is inconsistent
    with the flat layout used by load/save/list_files/ensure_directory.
    Coverage shows those code paths never executed; confirm the intended
    layout before relying on them.

    NOTE(review): ``StorageResolutionError`` is raised in several error paths
    but is not imported by this module — as written those paths would raise
    ``NameError`` instead. TODO: import it from the project's exception
    module.
    """

    # Registration key consumed by the StorageBackend auto-registration hook.
    _backend_type = Backend.MEMORY.value

    def __init__(self, shared_dict: Optional[Dict[str, Any]] = None):
        """
        Initialize the memory storage.

        Args:
            shared_dict: If provided, uses this dictionary as the storage
                backend. This is useful for sharing memory between processes
                with a multiprocessing.Manager.dict. If None, a new local
                dictionary is created.
        """
        self._memory_store = shared_dict if shared_dict is not None else {}
        self._prefixes = set()  # Declared directory-like namespaces

    def _normalize(self, path: Union[str, Path], bypass_normalization=False) -> str:
        """
        Normalize a path into the POSIX string form used as a store key.

        Paths are stored as-is (absolute paths preserved) so that
        cross-backend operations keep their original roots.

        Args:
            path: Path to normalize (absolute or relative).
            bypass_normalization: Retained for interface compatibility; both
                the bypass and the normal branch produced the identical
                ``as_posix()`` result, so the flag has no effect.

        Returns:
            Normalized path string.
        """
        # Both original branches reduced to as_posix(); collapsed here while
        # keeping the parameter so existing callers continue to work.
        return Path(path).as_posix()

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        """Return the object stored at *file_path*.

        Raises:
            FileNotFoundError: If the path does not exist in the store.
            IsADirectoryError: If the path is a directory entry (None value).
        """
        key = self._normalize(file_path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path not found: {file_path}")

        value = self._memory_store[key]
        if value is None:
            raise IsADirectoryError(f"Path is a directory: {file_path}")

        return value

    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        """Store *data* under *output_path*.

        Raises:
            FileNotFoundError: If the parent directory entry does not exist.
            FileExistsError: If the path is already present in the store.
        """
        key = self._normalize(output_path)

        # Parent directory must already exist (flat key structure: the parent
        # is represented by its own None-valued entry).
        parent_path = self._normalize(Path(key).parent)
        if parent_path != '.' and parent_path not in self._memory_store:
            raise FileNotFoundError(f"Parent path does not exist: {output_path}")

        # Refuse silent overwrite.
        if key in self._memory_store:
            raise FileExistsError(f"Path already exists: {output_path}")
        self._memory_store[key] = data

    def load_batch(self, file_paths: List[Union[str, Path]]) -> List[Any]:
        """
        Load multiple files sequentially using the existing load method.

        Args:
            file_paths: List of file paths to load

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        return [self.load(file_path) for file_path in file_paths]

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]]) -> None:
        """
        Save multiple files sequentially using the existing save method.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")

        for data, output_path in zip(data_list, output_paths):
            self.save(data, output_path)

    def list_files(
        self,
        directory: Union[str, Path],
        pattern: Optional[str] = "*",
        extensions: Optional[Set[str]] = None,
        recursive: bool = False
    ) -> List[Path]:
        """List file paths under *directory*, breadth-first.

        Args:
            directory: Directory key to list.
            pattern: fnmatch pattern applied to the filename; None matches all.
            extensions: Optional set of suffixes (e.g. {'.tif'}) to keep.
            recursive: If False, only direct children are returned.

        Returns:
            Paths sorted by depth first, then lexicographically.

        Raises:
            FileNotFoundError: If the directory entry does not exist.
            NotADirectoryError: If the entry is a file.
        """
        from fnmatch import fnmatch

        dir_key = self._normalize(directory)

        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {directory}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        matches = []
        dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key

        # Iterate over a snapshot so concurrent mutation cannot break us.
        for stored_path, value in list(self._memory_store.items()):
            if not stored_path.startswith(dir_prefix):
                continue

            rel_path = stored_path[len(dir_prefix):]

            # Non-recursive listing excludes anything in a subdirectory.
            if not recursive and "/" in rel_path:
                continue

            # Only include files (directories have value None).
            if value is not None:
                filename = Path(rel_path).name
                if pattern is None or fnmatch(filename, pattern):
                    if not extensions or Path(filename).suffix in extensions:
                        # Depth drives the breadth-first ordering below.
                        depth = rel_path.count('/')
                        matches.append((Path(stored_path), depth))

        # Sort by depth first (breadth-first), then by path for determinism.
        matches.sort(key=lambda item: (item[1], str(item[0])))
        return [p for p, _ in matches]

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        """Return the names of the direct children of *path*.

        Raises:
            FileNotFoundError: If the directory entry does not exist.
            NotADirectoryError: If the entry is a file.
        """
        dir_key = self._normalize(path)

        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {path}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")

        children = set()
        dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key

        for stored_path in list(self._memory_store.keys()):
            if stored_path.startswith(dir_prefix):
                rel_path = stored_path[len(dir_prefix):]
                if "/" not in rel_path:
                    # Direct child (file or directory entry).
                    children.add(rel_path)
                else:
                    # Deeper entry: surface only its first path component.
                    children.add(rel_path.split("/")[0])

        # Sorted for deterministic output (the original returned arbitrary
        # set order, which is not reproducible across runs).
        return sorted(children)

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a file or empty directory from the in-memory store.

        This method does not support recursive deletion.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            IsADirectoryError: If path is a non-empty directory
            StorageResolutionError: For unexpected internal failures
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        # Directory entries (None) may only be deleted when empty.
        if self._memory_store[key] is None:
            dir_prefix = key + "/" if not key.endswith("/") else key
            for stored_path in list(self._memory_store.keys()):
                if stored_path.startswith(dir_prefix):
                    raise IsADirectoryError(f"Cannot delete non-empty directory: {path}")

        try:
            del self._memory_store[key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete path from memory store: {path}") from e

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a file, empty directory, or a nested directory tree
        from the in-memory store.

        This method is the only allowed way to recursively delete in memory backend.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If internal deletion fails
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        try:
            # Delete the entry itself, then any descendants sharing its prefix.
            del self._memory_store[key]

            dir_prefix = key + "/" if not key.endswith("/") else key
            keys_to_delete = [k for k in list(self._memory_store.keys()) if k.startswith(dir_prefix)]
            for k in keys_to_delete:
                del self._memory_store[k]

        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete path: {path}") from e

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        """Create *directory* and every missing ancestor as directory entries.

        Returns:
            The normalized directory as a Path.
        """
        key = self._normalize(directory)
        self._prefixes.add(key if key.endswith("/") else key + "/")

        # Create each level of the hierarchy as a None-valued (directory) entry.
        parts = Path(key).parts
        for i in range(1, len(parts) + 1):
            partial_path = self._normalize(Path(*parts[:i]))
            if partial_path not in self._memory_store:
                self._memory_store[partial_path] = None  # Directory = None value

        return Path(key)

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        """Create a MemorySymlink object at *link_name* pointing to *source*.

        NOTE(review): this traverses ``_memory_store`` as a nested dict tree,
        unlike the flat-key methods above — confirm intended layout.

        Raises:
            FileNotFoundError: If the source or destination parent is missing.
            FileExistsError: If the destination exists and overwrite is False.
        """
        src_parts = str(source).strip("/").split("/")
        dst_parts = str(link_name).strip("/").split("/")

        # Traverse to source
        src_dict = self._memory_store
        for part in src_parts[:-1]:
            src_dict = src_dict.get(part)
            if not isinstance(src_dict, dict):
                raise FileNotFoundError(f"Invalid symlink source path: {source}")
        src_key = src_parts[-1]
        if src_key not in src_dict:
            raise FileNotFoundError(f"Symlink source not found: {source}")

        # Traverse to destination parent
        dst_dict = self._memory_store
        for part in dst_parts[:-1]:
            dst_dict = dst_dict.get(part)
            if dst_dict is None or not isinstance(dst_dict, dict):
                raise FileNotFoundError(f"Destination parent path does not exist: {link_name}")

        dst_key = dst_parts[-1]
        if dst_key in dst_dict:
            if not overwrite:
                raise FileExistsError(f"Symlink destination already exists: {link_name}")
            # Remove existing entry if overwrite=True
            del dst_dict[dst_key]

        dst_dict[dst_key] = MemorySymlink(target=str(source))

    def is_symlink(self, path: Union[str, Path]) -> bool:
        """Return True if *path* resolves to a MemorySymlink object.

        NOTE(review): nested-dict traversal — see class docstring.
        """
        parts = str(path).strip("/").split("/")
        current = self._memory_store

        for part in parts[:-1]:
            current = current.get(part)
            if not isinstance(current, dict):
                return False

        return isinstance(current.get(parts[-1]), MemorySymlink)

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check if a memory path points to a file.

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a directory
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path does not exist: {path}")

        if self._memory_store[key] is None:
            raise IsADirectoryError(f"Path is a directory: {path}")

        return True

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a memory path points to a directory.

        Args:
            path: Path to check

        Returns:
            bool: True if path is a directory

        Raises:
            FileNotFoundError: If path does not exist
            NotADirectoryError: If path is not a directory
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path does not exist: {path}")

        if self._memory_store[key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")

        return True

    def _resolve_path(self, path: Union[str, Path]) -> Optional[Any]:
        """
        Resolves a memory-style virtual path into an in-memory object (file or directory).

        This performs a pure dictionary traversal. It never coerces types or guesses structure.
        If any intermediate path component is missing or not a dict, resolution fails.

        NOTE(review): nested-dict traversal — see class docstring.

        Args:
            path: Memory-style path, e.g., 'root/dir1/file.txt'

        Returns:
            The object at that path (could be dict or content object), or None if not found
        """
        components = str(path).strip("/").split("/")
        current = self._memory_store

        for comp in components:
            if not isinstance(current, dict):
                return None  # hit a file too early
            if comp not in current:
                return None
            current = current[comp]

        return current

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory within the memory store. Symlinks are preserved as objects.

        NOTE(review): nested-dict traversal — see class docstring.

        Raises:
            FileNotFoundError: If src path or dst parent path does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On structure violations
        """
        def _resolve_parent(path: Union[str, Path]):
            # Split into (ancestor components, final name).
            parts = str(path).strip("/").split("/")
            return parts[:-1], parts[-1]

        src_parts, src_name = _resolve_parent(src)
        dst_parts, dst_name = _resolve_parent(dst)

        # Traverse to src
        src_dict = self._memory_store
        for part in src_parts:
            src_dict = src_dict.get(part)
            if not isinstance(src_dict, dict):
                raise FileNotFoundError(f"Source path invalid: {src}")
        if src_name not in src_dict:
            raise FileNotFoundError(f"Source not found: {src}")

        # Traverse to dst parent — do not create
        dst_dict = self._memory_store
        for part in dst_parts:
            dst_dict = dst_dict.get(part)
            if dst_dict is None:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            if not isinstance(dst_dict, dict):
                raise StorageResolutionError(f"Destination path is not a directory: {part}")

        if dst_name in dst_dict:
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            dst_dict[dst_name] = src_dict.pop(src_name)
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src} to {dst}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file, directory, or symlink within the memory store.

        - Respects structural separation (no fallback)
        - Will not overwrite destination
        - Will not create missing parent directories
        - Symlinks are copied as objects

        NOTE(review): nested-dict traversal — see class docstring.

        Raises:
            FileNotFoundError: If src does not exist or dst parent is missing
            FileExistsError: If dst already exists
            StorageResolutionError: On invalid structure
        """
        def _resolve_parent(path: Union[str, Path]):
            # Split into (ancestor components, final name).
            parts = str(path).strip("/").split("/")
            return parts[:-1], parts[-1]

        src_parts, src_name = _resolve_parent(src)
        dst_parts, dst_name = _resolve_parent(dst)

        # Traverse to src object
        src_dict = self._memory_store
        for part in src_parts:
            src_dict = src_dict.get(part)
            if not isinstance(src_dict, dict):
                raise FileNotFoundError(f"Source path invalid: {src}")
        if src_name not in src_dict:
            raise FileNotFoundError(f"Source not found: {src}")
        obj = src_dict[src_name]

        # Traverse to dst parent (do not create)
        dst_dict = self._memory_store
        for part in dst_parts:
            dst_dict = dst_dict.get(part)
            if dst_dict is None:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            if not isinstance(dst_dict, dict):
                raise StorageResolutionError(f"Destination path is not a directory: {part}")

        if dst_name in dst_dict:
            raise FileExistsError(f"Destination already exists: {dst}")

        # Deep copy to avoid aliasing between src and dst entries.
        # (Requires `import copy as py_copy` at module level — previously
        # missing, which made this line a NameError.)
        try:
            dst_dict[dst_name] = py_copy.deepcopy(obj)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a memory-backed path.

        NOTE(review): nested-dict traversal — see class docstring.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': symlink target if applicable
                - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        parts = str(path).strip("/").split("/")
        current = self._memory_store

        try:
            for part in parts[:-1]:
                current = current.get(part)
                if current is None:
                    return {
                        "type": "missing",
                        "path": str(path),
                        "exists": False
                    }
                if not isinstance(current, dict):
                    raise StorageResolutionError(f"Invalid intermediate path segment: {part}")

            final_key = parts[-1]
            if final_key not in current:
                return {
                    "type": "missing",
                    "path": str(path),
                    "exists": False
                }

            obj = current[final_key]

            if isinstance(obj, MemorySymlink):
                return {
                    "type": "symlink",
                    "path": str(path),
                    "target": obj.target,
                    "exists": self._resolve_path(obj.target) is not None
                }

            if isinstance(obj, dict):
                return {
                    "type": "directory",
                    "path": str(path),
                    "exists": True
                }

            return {
                "type": "file",
                "path": str(path),
                "exists": True
            }

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat memory path: {path}") from e

    def clear_files_only(self) -> None:
        """
        Clear all files from the memory store while preserving directory structure.

        This method removes all file entries (non-None values) but keeps directory
        entries (None values) intact. This prevents key collisions when reusing
        the same processing context while maintaining the directory structure
        needed for subsequent operations.

        Note:
            - Directories (entries with None values) are preserved
            - Files (entries with non-None values) are deleted
            - Symlinks are also deleted as they are considered file-like objects
            - GPU objects are explicitly deleted before removal
            - Caller is responsible for calling gc.collect() and GPU cleanup after this method
        """
        try:
            files_to_delete = []
            gpu_objects_found = 0

            # Snapshot items so deletion below cannot invalidate iteration.
            for key, value in list(self._memory_store.items()):
                # Files and symlinks have non-None values; directories are None.
                if value is not None:
                    files_to_delete.append(key)

                    # GPU-backed objects get an explicit cleanup pass first.
                    if self._is_gpu_object(value):
                        gpu_objects_found += 1
                        self._explicit_gpu_delete(value, key)

            for key in files_to_delete:
                del self._memory_store[key]

            logger.debug(f"Cleared {len(files_to_delete)} files from memory backend (including {gpu_objects_found} GPU objects), "
                         f"preserved {len(self._memory_store)} directories")

        except Exception as e:
            raise StorageResolutionError("Failed to clear files from memory store") from e

    def _is_gpu_object(self, obj: Any) -> bool:
        """
        Check if an object is a GPU tensor/array that needs explicit cleanup.

        Args:
            obj: Object to check

        Returns:
            True if object is a GPU tensor/array
        """
        try:
            # PyTorch tensors expose .is_cuda alongside .device.
            if hasattr(obj, 'device') and hasattr(obj, 'is_cuda'):
                if obj.is_cuda:
                    return True

            # CuPy arrays are identified by their type name.
            if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
                return True

            # Other GPU arrays: inspect the device attribute's type string.
            if hasattr(obj, 'device') and hasattr(obj.device, 'type'):
                if 'cuda' in str(obj.device.type).lower() or 'gpu' in str(obj.device.type).lower():
                    return True

            return False
        except Exception:
            # If we can't determine, assume it's not a GPU object
            return False

    def _explicit_gpu_delete(self, obj: Any, key: str) -> None:
        """
        Explicitly delete a GPU object and clear its memory.

        Args:
            obj: GPU object to delete
            key: Memory backend key for logging
        """
        try:
            # For PyTorch tensors
            if hasattr(obj, 'device') and hasattr(obj, 'is_cuda') and obj.is_cuda:
                device_id = obj.device.index if obj.device.index is not None else 0
                # Move to CPU first to free GPU memory, then delete
                obj_cpu = obj.cpu()
                del obj_cpu
                logger.debug(f"🔥 EXPLICIT GPU DELETE: PyTorch tensor {key} on device {device_id}")
                return

            # For CuPy arrays
            if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
                # CuPy arrays are automatically freed when deleted
                logger.debug(f"🔥 EXPLICIT GPU DELETE: CuPy array {key}")
                return

            # For other GPU objects
            if hasattr(obj, 'device'):
                logger.debug(f"🔥 EXPLICIT GPU DELETE: GPU object {key} on device {obj.device}")

        except Exception as e:
            logger.warning(f"Failed to explicitly delete GPU object {key}: {e}")

class MemorySymlink:
    """Marker object representing a symlink entry in the memory store."""

    def __init__(self, target: str):
        # The target is expected to be a normalized key path into the store.
        self.target = target

    def __repr__(self):
        return "<MemorySymlink → " + self.target + ">"