Coverage for src/polystore/memory.py: 86%

263 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-03 06:58 +0000

1# polystore/memory.py 

2""" 

3Memory storage backend module. 

4 

5This module provides an in-memory implementation of the StorageBackend interface. 

6It stores data in memory and supports directory operations. 

7""" 

8 

9import copy as py_copy 

10import logging 

11from pathlib import Path, PurePosixPath 

12from typing import Any 

13 

14from .base import StorageBackend 

15from .exceptions import StorageResolutionError 

16 

17logger = logging.getLogger(__name__) 

18 

19 

class MemoryBackend(StorageBackend):
    """Memory storage backend with automatic registration.

    Entries live in one flat mapping from normalized absolute POSIX path
    strings to values:

    - ``None`` marks a directory,
    - a ``MemorySymlink`` instance marks a symlink,
    - any other value is treated as file content.
    """

    _backend_type = "memory"

    def __init__(self, shared_dict: dict[str, Any] | None = None):
        """
        Initializes the memory storage.

        Args:
            shared_dict: If provided, uses this dictionary as the storage backend.
                This is useful for sharing memory between processes with a
                multiprocessing.Manager.dict. If None, a new local
                dictionary is created.
        """
        self._memory_store = shared_dict if shared_dict is not None else {}
        self._prefixes: set[str] = set()  # Declared directory-like namespaces

    @staticmethod
    def _dir_prefix(key: str) -> str:
        """Return *key* with a trailing slash, for prefix-matching its descendants."""
        return key if key.endswith("/") else key + "/"

    def _normalize(self, path: str | Path, bypass_normalization=False) -> str:
        """
        Normalize paths for memory backend storage.

        Memory backend uses absolute paths internally for consistency.
        This method ensures paths are converted to absolute form.

        Args:
            path: Path to normalize (absolute or relative)
            bypass_normalization: If True, return the POSIX form of the path
                without forcing a leading slash

        Returns:
            Normalized absolute path string
        """
        posix_path = Path(path).as_posix()
        if bypass_normalization or posix_path.startswith("/"):
            return posix_path
        # Relative paths are anchored at the virtual root.
        return "/" + posix_path

    def load(self, file_path: str | Path, **kwargs) -> Any:
        """
        Return the object stored at ``file_path``.

        Args:
            file_path: Path of the entry to load
            **kwargs: Accepted for interface compatibility; not used here

        Raises:
            FileNotFoundError: If no entry exists at the path
            IsADirectoryError: If the entry is a directory marker
        """
        key = self._normalize(file_path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path not found: {file_path}")

        value = self._memory_store[key]
        if value is None:
            raise IsADirectoryError(f"Path is a directory: {file_path}")

        return value

    def save(self, data: Any, output_path: str | Path, **kwargs) -> None:
        """
        Store ``data`` at ``output_path`` without overwriting.

        Note that ``None`` is the directory marker, so saving ``None`` creates
        an entry that the other methods treat as a directory.

        Args:
            data: Object to store
            output_path: Destination path
            **kwargs: Accepted for interface compatibility; not used here

        Raises:
            FileNotFoundError: If the parent path is not in the store
            FileExistsError: If an entry already exists at the path
        """
        key = self._normalize(output_path)

        # Check if parent directory exists (simple flat structure)
        parent_path = self._normalize(Path(key).parent)
        if parent_path != "." and parent_path not in self._memory_store:
            raise FileNotFoundError(f"Parent path does not exist: {output_path}")

        # Refuse to overwrite an existing entry
        if key in self._memory_store:
            raise FileExistsError(f"Path already exists: {output_path}")

        self._memory_store[key] = data

    def load_batch(self, file_paths: list[str | Path], **kwargs) -> list[Any]:
        """
        Load multiple files sequentially using existing load method.

        Args:
            file_paths: List of file paths to load
            **kwargs: Additional arguments passed to load method

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        return [self.load(file_path, **kwargs) for file_path in file_paths]

    def save_batch(
        self, data_list: list[Any], output_paths: list[str | Path], **kwargs
    ) -> None:
        """
        Save multiple files sequentially using existing save method.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)
            **kwargs: Additional arguments passed to save method

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(
                f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})"
            )

        for data, output_path in zip(data_list, output_paths, strict=True):
            self.save(data, output_path, **kwargs)

    def list_files(
        self,
        directory: str | Path,
        pattern: str | None = "*",
        extensions: set[str] | None = None,
        recursive: bool = False,
    ) -> list[Path]:
        """
        List file entries under ``directory``.

        Args:
            directory: Directory to search
            pattern: fnmatch-style pattern applied to the file name; None matches all
            extensions: If non-empty, only names whose suffix is in this set are kept
            recursive: If True, include files in nested directories

        Returns:
            Paths ordered by depth below ``directory`` first, then by path

        Raises:
            FileNotFoundError: If the directory is not in the store
            NotADirectoryError: If the path refers to a non-directory entry
        """
        from fnmatch import fnmatch

        dir_key = self._normalize(directory)

        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {directory}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        dir_prefix = self._dir_prefix(dir_key)
        matches: list[tuple[int, str]] = []

        # Snapshot the items so the scan is unaffected by concurrent changes.
        for path, value in list(self._memory_store.items()):
            # Directories (None) are never reported; neither is anything
            # outside this directory.
            if value is None or not path.startswith(dir_prefix):
                continue

            rel_path = path[len(dir_prefix):]
            if not recursive and "/" in rel_path:
                continue

            filename = Path(rel_path).name
            if pattern is not None and not fnmatch(filename, pattern):
                continue
            if extensions and Path(filename).suffix not in extensions:
                continue

            # Depth drives the breadth-first ordering of the result.
            matches.append((rel_path.count("/"), path))

        matches.sort()
        return [Path(path) for _, path in matches]

    def list_dir(self, path: str | Path) -> list[str]:
        """
        Return the names of the direct children of a directory, sorted.

        Children are derived from stored keys, so a nested entry contributes
        its first path component even if that intermediate directory has no
        entry of its own.

        Raises:
            FileNotFoundError: If the directory is not in the store
            NotADirectoryError: If the path refers to a non-directory entry
        """
        dir_key = self._normalize(path)

        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {path}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")

        dir_prefix = self._dir_prefix(dir_key)
        children: set[str] = set()

        for stored_path in list(self._memory_store.keys()):
            if not stored_path.startswith(dir_prefix):
                continue
            rel_path = stored_path[len(dir_prefix):]
            # The root key "/" matches its own prefix and yields an empty
            # remainder; it is not a child of itself.
            if rel_path:
                children.add(rel_path.split("/", 1)[0])

        # Sorted so callers get a deterministic order.
        return sorted(children)

    def delete(self, path: str | Path) -> None:
        """
        Delete a file or empty directory from the in-memory store.

        This method does not support recursive deletion.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            IsADirectoryError: If path is a non-empty directory
            StorageResolutionError: For unexpected internal failures
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        # A directory may only be removed when nothing lives beneath it.
        if self._memory_store[key] is None:
            dir_prefix = self._dir_prefix(key)
            for stored_path in list(self._memory_store.keys()):
                # `stored_path != key` keeps the root "/" from counting as
                # its own child.
                if stored_path != key and stored_path.startswith(dir_prefix):
                    raise IsADirectoryError(f"Cannot delete non-empty directory: {path}")

        try:
            del self._memory_store[key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete path from memory store: {path}") from e

    def delete_all(self, path: str | Path) -> None:
        """
        Recursively delete a file, empty directory, or a nested directory tree
        from the in-memory store.

        This method is the only allowed way to recursively delete in memory backend.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If internal deletion fails
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        try:
            # Delete the path itself
            del self._memory_store[key]

            # Delete everything stored beneath it (no-op for plain files)
            dir_prefix = self._dir_prefix(key)
            for child in [
                k for k in list(self._memory_store.keys()) if k.startswith(dir_prefix)
            ]:
                del self._memory_store[child]

        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete path: {path}") from e

    def ensure_directory(self, directory: str | Path) -> PurePosixPath:
        """
        Create ``directory`` and any missing ancestors as directory entries.

        Entries that already exist along the way are left untouched.

        Returns:
            The normalized directory path as a PurePosixPath
        """
        key = self._normalize(directory)
        self._prefixes.add(self._dir_prefix(key))

        # Create each level of the hierarchy, root first
        parts = Path(key).parts
        for depth in range(1, len(parts) + 1):
            partial_path = self._normalize(Path(*parts[:depth]))
            if partial_path not in self._memory_store:
                self._memory_store[partial_path] = None  # Directory = None value

        # Return a POSIX-style path object so string conversion preserves
        # forward slashes across platforms (important for Windows CI/tests)
        return PurePosixPath(key)

    def create_symlink(self, source: str | Path, link_name: str | Path, overwrite: bool = False):
        """
        Store a ``MemorySymlink`` at ``link_name`` pointing at ``source``.

        Args:
            source: Existing path the link points to
            link_name: Path at which the link entry is created
            overwrite: If True, replace an existing entry at ``link_name``

        Raises:
            FileNotFoundError: If the source or the link's parent is missing
            FileExistsError: If ``link_name`` exists and overwrite is False
        """
        src_key = self._normalize(source)
        link_key = self._normalize(link_name)

        # Mirror disk backend semantics: require the target to exist.
        if src_key not in self._memory_store:
            raise FileNotFoundError(f"Symlink source not found: {source}")

        # Check destination parent exists
        link_parent = self._normalize(Path(link_key).parent)
        if link_parent != "." and link_parent not in self._memory_store:
            raise FileNotFoundError(f"Destination parent path does not exist: {link_name}")

        # An existing entry is only replaced when overwrite is requested;
        # the assignment below then overwrites it in place.
        if link_key in self._memory_store and not overwrite:
            raise FileExistsError(f"Symlink destination already exists: {link_name}")

        # Store the normalized key: MemorySymlink targets must be normalized
        # key paths so they resolve identically on every platform.
        self._memory_store[link_key] = MemorySymlink(target=src_key)

    def is_symlink(self, path: str | Path) -> bool:
        """Return True if the entry at ``path`` is a MemorySymlink."""
        key = self._normalize(path)
        return isinstance(self._memory_store.get(key), MemorySymlink)

    def exists(self, path: str | Path) -> bool:
        """
        Check if a path exists in memory storage.

        Args:
            path: Path to check

        Returns:
            bool: True if path exists (as file or directory), False otherwise
        """
        return self._normalize(path) in self._memory_store

    def is_file(self, path: str | Path) -> bool:
        """
        Check if a memory path points to a file.

        Raises:
            IsADirectoryError: If path exists and is a directory

        Returns:
            bool: True if path exists and is a file, False otherwise
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            return False

        # None is the directory marker
        if self._memory_store[key] is None:
            raise IsADirectoryError(f"Path is a directory: {path}")
        return True

    def is_dir(self, path: str | Path) -> bool:
        """
        Check if a memory path points to a directory.

        Args:
            path: Path to check

        Raises:
            NotADirectoryError: If path exists and is a file

        Returns:
            bool: True if path exists and is a directory, False otherwise
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            return False

        # Anything other than the None marker is not a directory
        if self._memory_store[key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")
        return True

    def _resolve_path(self, path: str | Path) -> Any | None:
        """
        Resolves a memory-style virtual path into an in-memory object (file or directory).

        Because directories are stored as None, a None result does not
        distinguish "directory" from "missing"; use ``exists`` for that.

        Args:
            path: Memory-style path, e.g., '/root/dir1/file.txt'

        Returns:
            The object at that path (could be None for directory or content for file), or None if not found
        """
        return self._memory_store.get(self._normalize(path))

    def move(self, src: str | Path, dst: str | Path) -> None:
        """
        Move a file or directory within the memory store. Symlinks are preserved as objects.

        Raises:
            FileNotFoundError: If src path or dst parent path does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On structure violations (destination
                parent is not a directory, or a directory is moved into itself)
        """
        src_key = self._normalize(src)
        dst_key = self._normalize(dst)

        # Check source exists
        if src_key not in self._memory_store:
            raise FileNotFoundError(f"Source not found: {src}")

        # Check destination parent exists and is a directory
        dst_parent = self._normalize(Path(dst_key).parent)
        if dst_parent != ".":
            if dst_parent not in self._memory_store:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            # Check if parent is actually a directory (None value)
            if self._memory_store[dst_parent] is not None:
                raise StorageResolutionError(f"Destination parent is not a directory: {dst_parent}")

        # Check destination doesn't exist
        if dst_key in self._memory_store:
            raise FileExistsError(f"Destination already exists: {dst}")

        src_prefix = self._dir_prefix(src_key)
        is_directory = self._memory_store[src_key] is None

        # Moving a directory beneath itself would re-key the freshly created
        # destination along with the children and corrupt the tree.
        if is_directory and dst_key.startswith(src_prefix):
            raise StorageResolutionError(f"Cannot move a directory into itself: {src} -> {dst}")

        # Move the item (works for files, directories and symlinks)
        self._memory_store[dst_key] = self._memory_store.pop(src_key)

        # If moving a directory, also re-key everything stored under it
        if is_directory:
            dst_prefix = self._dir_prefix(dst_key)
            keys_to_move = [k for k in list(self._memory_store.keys()) if k.startswith(src_prefix)]
            for key in keys_to_move:
                new_key = dst_prefix + key[len(src_prefix):]
                self._memory_store[new_key] = self._memory_store.pop(key)

    def copy(self, src: str | Path, dst: str | Path) -> None:
        """
        Copy a file, directory, or symlink within the memory store.

        - Respects structural separation (no fallback)
        - Will not overwrite destination
        - Will not create missing parent directories
        - Symlinks are copied as objects
        - Values are deep-copied so source and copy do not alias

        Raises:
            FileNotFoundError: If src does not exist or dst parent is missing
            FileExistsError: If dst already exists
            StorageResolutionError: On invalid structure (destination parent
                is not a directory, or a directory is copied into itself)
        """
        src_key = self._normalize(src)
        dst_key = self._normalize(dst)

        # Check source exists
        if src_key not in self._memory_store:
            raise FileNotFoundError(f"Source not found: {src}")

        # Check destination parent exists and is a directory
        dst_parent = self._normalize(Path(dst_key).parent)
        if dst_parent != ".":
            if dst_parent not in self._memory_store:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            # Check if parent is actually a directory (None value)
            if self._memory_store[dst_parent] is not None:
                raise StorageResolutionError(f"Destination parent is not a directory: {dst_parent}")

        # Check destination doesn't exist
        if dst_key in self._memory_store:
            raise FileExistsError(f"Destination already exists: {dst}")

        src_prefix = self._dir_prefix(src_key)
        is_directory = self._memory_store[src_key] is None

        # Copying a directory beneath itself would make the new destination
        # part of the set being copied.
        if is_directory and dst_key.startswith(src_prefix):
            raise StorageResolutionError(f"Cannot copy a directory into itself: {src} -> {dst}")

        # Copy the item (deep copy to avoid aliasing)
        self._memory_store[dst_key] = py_copy.deepcopy(self._memory_store[src_key])

        # If copying a directory, also copy everything stored under it
        if is_directory:
            dst_prefix = self._dir_prefix(dst_key)
            keys_to_copy = [k for k in list(self._memory_store.keys()) if k.startswith(src_prefix)]
            for key in keys_to_copy:
                new_key = dst_prefix + key[len(src_prefix):]
                self._memory_store[new_key] = py_copy.deepcopy(self._memory_store[key])

    def stat(self, path: str | Path) -> dict[str, Any]:
        """
        Return structural metadata about a memory-backed path.

        Returns:
            dict with keys:
            - 'type': 'file', 'directory', 'symlink', or 'missing'
            - 'path': str(path)
            - 'target': symlink target (symlinks only)
            - 'exists': bool; for a symlink, whether its target is in the store

        Raises:
            StorageResolutionError: On resolution failure
        """
        key = self._normalize(path)

        try:
            if key not in self._memory_store:
                return {"type": "missing", "path": str(path), "exists": False}

            obj = self._memory_store[key]

            if isinstance(obj, MemorySymlink):
                # Test key membership rather than the stored value: a
                # directory target is stored as None and would otherwise be
                # reported as missing.
                target_exists = self._normalize(obj.target) in self._memory_store
                return {
                    "type": "symlink",
                    "path": str(path),
                    "target": obj.target,
                    "exists": target_exists,
                }

            # None is the directory marker; everything else is a file
            entry_type = "directory" if obj is None else "file"
            return {"type": entry_type, "path": str(path), "exists": True}

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat memory path: {path}") from e

    def clear_files_only(self) -> None:
        """
        Clear all files from the memory store while preserving directory structure.

        This method removes all file entries (non-None values) but keeps directory
        entries (None values) intact. This prevents key collisions when reusing
        the same processing context while maintaining the directory structure
        needed for subsequent operations.

        Note:
            - Directories (entries with None values) are preserved
            - Files (entries with non-None values) are deleted
            - Symlinks are also deleted as they are considered file-like objects
            - GPU objects are detected and logged before their entries are removed
            - Caller is responsible for calling gc.collect() and GPU cleanup after this method

        Raises:
            StorageResolutionError: If clearing fails
        """
        try:
            files_to_delete = []
            gpu_objects_found = 0

            # Snapshot the items so entries can be removed afterwards
            for key, value in list(self._memory_store.items()):
                if value is None:
                    continue  # keep directories
                files_to_delete.append(key)

                if self._is_gpu_object(value):
                    gpu_objects_found += 1
                    self._explicit_gpu_delete(value, key)

            for key in files_to_delete:
                del self._memory_store[key]

            logger.debug(
                "Cleared %d files from memory backend (including %d GPU objects), "
                "preserved %d directories",
                len(files_to_delete),
                gpu_objects_found,
                len(self._memory_store),
            )

        except Exception as e:
            raise StorageResolutionError("Failed to clear files from memory store") from e

    def _is_gpu_object(self, obj: Any) -> bool:
        """
        Check if an object is a GPU tensor/array that needs explicit cleanup.

        Detection is duck-typed so no GPU library has to be imported.

        Args:
            obj: Object to check

        Returns:
            True if object is a GPU tensor/array
        """
        try:
            # PyTorch-style tensors expose both `device` and `is_cuda`
            if hasattr(obj, "device") and hasattr(obj, "is_cuda") and obj.is_cuda:
                return True

            # CuPy arrays are recognised by their type name
            if "cupy" in str(type(obj)):
                return True

            # Other array types: inspect the device's type string
            if hasattr(obj, "device") and hasattr(obj.device, "type"):
                device_type = str(obj.device.type).lower()
                if "cuda" in device_type or "gpu" in device_type:
                    return True

            return False
        except Exception:
            # If we can't determine, assume it's not a GPU object
            return False

    def _explicit_gpu_delete(self, obj: Any, key: str) -> None:
        """
        Log the removal of a GPU object from the store.

        No device-to-host copy is made: copying a tensor to the CPU creates a
        second object and leaves the GPU allocation in place. This method
        itself releases nothing; the caller removes the store entry and is
        responsible for gc and GPU cache cleanup afterwards.

        Args:
            obj: GPU object being removed
            key: Memory backend key for logging
        """
        try:
            # For PyTorch tensors
            if hasattr(obj, "device") and hasattr(obj, "is_cuda") and obj.is_cuda:
                device_id = obj.device.index if obj.device.index is not None else 0
                logger.debug("🔥 EXPLICIT GPU DELETE: PyTorch tensor %s on device %s", key, device_id)
                return

            # For CuPy arrays
            if "cupy" in str(type(obj)):
                logger.debug("🔥 EXPLICIT GPU DELETE: CuPy array %s", key)
                return

            # For other GPU objects
            if hasattr(obj, "device"):
                logger.debug("🔥 EXPLICIT GPU DELETE: GPU object %s on device %s", key, obj.device)

        except Exception as e:
            logger.warning("Failed to explicitly delete GPU object %s: %s", key, e)

625 

626 

class MemorySymlink:
    """Value object stored in the memory backend to represent a symlink."""

    def __init__(self, target: str):
        # Must be a normalized key path
        self.target = target

    def __repr__(self):
        return "<MemorySymlink → {}>".format(self.target)