Coverage for src/polystore/memory.py: 86%
263 statements
coverage.py v7.11.0, created at 2025-11-03 06:58 +0000
# polystore/memory.py
"""
Memory storage backend module.

This module provides an in-memory implementation of the StorageBackend interface.
It stores data in memory and supports directory operations.
"""

import copy as py_copy
import logging
from pathlib import Path, PurePosixPath
from typing import Any

from .base import StorageBackend
from .exceptions import StorageResolutionError

logger = logging.getLogger(__name__)


class MemoryBackend(StorageBackend):
    """Memory storage backend with automatic registration."""

    _backend_type = "memory"

    def __init__(self, shared_dict: dict[str, Any] | None = None):
        """
        Initializes the memory storage.

        Args:
            shared_dict: If provided, uses this dictionary as the storage backend.
                This is useful for sharing memory between processes with a
                multiprocessing.Manager().dict(). If None, a new local
                dictionary is created.
        """
        self._memory_store = shared_dict if shared_dict is not None else {}
        self._prefixes = set()  # Declared directory-like namespaces
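
    # Illustrative usage sketch (not part of the class API): the store can be a
    # manager-backed dict so several processes see the same virtual filesystem.
    # ``shared_dict`` is the real parameter above; the rest is hypothetical usage.
    #
    #     from multiprocessing import Manager
    #     backend = MemoryBackend(shared_dict=Manager().dict())
    #     backend.ensure_directory("/shared")
    #     backend.save({"answer": 42}, "/shared/result.json")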

    def _normalize(self, path: str | Path, bypass_normalization: bool = False) -> str:
        """
        Normalize paths for memory backend storage.

        Memory backend uses absolute paths internally for consistency.
        This method ensures paths are converted to absolute form.

        Args:
            path: Path to normalize (absolute or relative)
            bypass_normalization: If True, return the POSIX form of the path as-is

        Returns:
            Normalized absolute path string
        """
        path_obj = Path(path)

        if bypass_normalization:
            return path_obj.as_posix()

        # Convert to absolute path by prepending / if relative
        posix_path = path_obj.as_posix()
        if not posix_path.startswith("/"):
            posix_path = "/" + posix_path

        return posix_path
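
    # For reference (illustrative inputs), normalization is simply "make the path
    # absolute, POSIX-style":
    #     _normalize("data/file.txt")  -> "/data/file.txt"
    #     _normalize("/data/file.txt") -> "/data/file.txt"
    #     _normalize(Path("a") / "b")  -> "/a/b"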

    def load(self, file_path: str | Path, **kwargs) -> Any:
        key = self._normalize(file_path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path not found: {file_path}")

        value = self._memory_store[key]
        if value is None:
            raise IsADirectoryError(f"Path is a directory: {file_path}")

        return value

    def save(self, data: Any, output_path: str | Path, **kwargs) -> None:
        key = self._normalize(output_path)

        # Check if parent directory exists (simple flat structure)
        parent_path = self._normalize(Path(key).parent)
        if parent_path != "." and parent_path not in self._memory_store:
            raise FileNotFoundError(f"Parent path does not exist: {output_path}")

        # Check if file already exists
        if key in self._memory_store:
            raise FileExistsError(f"Path already exists: {output_path}")

        # Save the file
        self._memory_store[key] = data

    def load_batch(self, file_paths: list[str | Path]) -> list[Any]:
        """
        Load multiple files sequentially using the existing load method.

        Args:
            file_paths: List of file paths to load

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        results = []
        for file_path in file_paths:
            result = self.load(file_path)
            results.append(result)
        return results

    def save_batch(self, data_list: list[Any], output_paths: list[str | Path]) -> None:
        """
        Save multiple files sequentially using the existing save method.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(
                f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})"
            )

        for data, output_path in zip(data_list, output_paths, strict=False):
            self.save(data, output_path)

    def list_files(
        self,
        directory: str | Path,
        pattern: str | None = "*",
        extensions: set[str] | None = None,
        recursive: bool = False,
    ) -> list[Path]:
        from fnmatch import fnmatch

        dir_key = self._normalize(directory)

        # Check if directory exists and is a directory
        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {directory}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        result = []
        dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key

        for path, value in list(self._memory_store.items()):
            # Skip if not under this directory
            if not path.startswith(dir_prefix):
                continue

            # Get relative path from directory
            rel_path = path[len(dir_prefix) :]

            # Skip if recursive=False and path has subdirectories
            if not recursive and "/" in rel_path:
                continue

            # Only include files (value is not None)
            if value is not None:
                filename = Path(rel_path).name
                # If pattern is None, match all files
                if pattern is None or fnmatch(filename, pattern):
                    if not extensions or Path(filename).suffix in extensions:
                        # Calculate depth for breadth-first sorting
                        depth = rel_path.count("/")
                        result.append((Path(path), depth))

        # Sort by depth first (breadth-first), then by path for consistency
        result.sort(key=lambda x: (x[1], str(x[0])))

        # Return just the paths
        return [path for path, _ in result]
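
    # Ordering note (hypothetical keys): with recursive=True under "/data",
    # "/data/top.txt" (depth 0) sorts before "/data/sub/deep.txt" (depth 1);
    # entries at the same depth are ordered by their path string.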

    def list_dir(self, path: str | Path) -> list[str]:
        dir_key = self._normalize(path)

        # Check if directory exists and is a directory
        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {path}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")

        # Find all direct children of this directory
        result = set()
        dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key

        for stored_path in list(self._memory_store.keys()):
            if stored_path.startswith(dir_prefix):
                rel_path = stored_path[len(dir_prefix) :]
                # Only direct children (no subdirectories)
                if "/" not in rel_path:
                    result.add(rel_path)
                else:
                    # Add the first directory component
                    first_dir = rel_path.split("/")[0]
                    result.add(first_dir)

        return list(result)

    def delete(self, path: str | Path) -> None:
        """
        Delete a file or empty directory from the in-memory store.

        This method does not support recursive deletion.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            IsADirectoryError: If path is a non-empty directory
            StorageResolutionError: For unexpected internal failures
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        # If it's a directory, check if it's empty
        if self._memory_store[key] is None:
            # Check if directory has any children
            dir_prefix = key + "/" if not key.endswith("/") else key
            for stored_path in list(self._memory_store.keys()):
                if stored_path.startswith(dir_prefix):
                    raise IsADirectoryError(f"Cannot delete non-empty directory: {path}")

        try:
            del self._memory_store[key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete path from memory store: {path}") from e

    def delete_all(self, path: str | Path) -> None:
        """
        Recursively delete a file, empty directory, or a nested directory tree
        from the in-memory store.

        This method is the only supported way to delete recursively in the memory backend.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If internal deletion fails
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        try:
            # Delete the path itself
            del self._memory_store[key]

            # If it was a directory, delete all children
            dir_prefix = key + "/" if not key.endswith("/") else key
            keys_to_delete = [
                k for k in list(self._memory_store.keys()) if k.startswith(dir_prefix)
            ]
            for k in keys_to_delete:
                del self._memory_store[k]

        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete path: {path}") from e

    def ensure_directory(self, directory: str | Path) -> PurePosixPath:
        key = self._normalize(directory)
        self._prefixes.add(key if key.endswith("/") else key + "/")

        # Create the entire directory hierarchy
        path_obj = Path(key)
        parts = path_obj.parts

        # Create each parent directory in the hierarchy
        for i in range(1, len(parts) + 1):
            partial_path = self._normalize(Path(*parts[:i]))
            if partial_path not in self._memory_store:
                self._memory_store[partial_path] = None  # Directory = None value

        # Return a POSIX-style path object so string conversion preserves
        # forward slashes across platforms (important for Windows CI/tests)
        return PurePosixPath(key)
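
    # Illustrative (POSIX-style paths): ensure_directory("/a/b/c") registers
    # "/", "/a", "/a/b", and "/a/b/c" as directory entries (value None),
    # leaves existing entries untouched, and returns PurePosixPath("/a/b/c").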

    def create_symlink(self, source: str | Path, link_name: str | Path, overwrite: bool = False):
        src_key = self._normalize(source)
        link_key = self._normalize(link_name)

        # Mirror disk backend semantics: require the target to exist.
        if src_key not in self._memory_store:
            raise FileNotFoundError(f"Symlink source not found: {source}")

        # Check destination parent exists
        link_parent = self._normalize(Path(link_key).parent)
        if link_parent != "." and link_parent not in self._memory_store:
            raise FileNotFoundError(f"Destination parent path does not exist: {link_name}")

        # Check if destination already exists
        if link_key in self._memory_store:
            if not overwrite:
                raise FileExistsError(f"Symlink destination already exists: {link_name}")
            # Remove existing entry if overwrite=True
            del self._memory_store[link_key]

        self._memory_store[link_key] = MemorySymlink(target=str(source))

    def is_symlink(self, path: str | Path) -> bool:
        key = self._normalize(path)
        return isinstance(self._memory_store.get(key), MemorySymlink)

    def exists(self, path: str | Path) -> bool:
        """
        Check if a path exists in memory storage.

        Args:
            path: Path to check

        Returns:
            bool: True if path exists (as file or directory), False otherwise
        """
        key = self._normalize(path)
        return key in self._memory_store

    def is_file(self, path: str | Path) -> bool:
        """
        Check if a memory path points to a file.

        Raises:
            IsADirectoryError: If path exists and is a directory

        Returns:
            bool: True if path exists and is a file, False otherwise
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            return False

        value = self._memory_store[key]
        # Raise if it's a directory
        if value is None:
            raise IsADirectoryError(f"Path is a directory: {path}")
        # File if value is not None
        return True

    def is_dir(self, path: str | Path) -> bool:
        """
        Check if a memory path points to a directory.

        Args:
            path: Path to check

        Raises:
            NotADirectoryError: If path exists and is a file

        Returns:
            bool: True if path exists and is a directory, False otherwise
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            return False

        value = self._memory_store[key]
        # Raise if it's a file
        if value is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")
        # Directory if value is None
        return True

    def _resolve_path(self, path: str | Path) -> Any | None:
        """
        Resolves a memory-style virtual path into an in-memory object (file or directory).

        Args:
            path: Memory-style path, e.g., '/root/dir1/file.txt'

        Returns:
            The object stored at that path (None for a directory, the content for a
            file), or None if the path is not present in the store.
        """
        key = self._normalize(path)
        return self._memory_store.get(key)

    def move(self, src: str | Path, dst: str | Path) -> None:
        """
        Move a file or directory within the memory store. Symlinks are preserved as objects.

        Raises:
            FileNotFoundError: If src path or dst parent path does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On structure violations
        """
        src_key = self._normalize(src)
        dst_key = self._normalize(dst)

        # Check source exists
        if src_key not in self._memory_store:
            raise FileNotFoundError(f"Source not found: {src}")

        # Check destination parent exists and is a directory
        dst_parent = self._normalize(Path(dst_key).parent)
        if dst_parent != ".":
            if dst_parent not in self._memory_store:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            # Check if parent is actually a directory (None value)
            if self._memory_store[dst_parent] is not None:
                raise StorageResolutionError(f"Destination parent is not a directory: {dst_parent}")

        # Check destination doesn't exist
        if dst_key in self._memory_store:
            raise FileExistsError(f"Destination already exists: {dst}")

        # Move the item (works for files and directories)
        self._memory_store[dst_key] = self._memory_store.pop(src_key)

        # If moving a directory, also move all files/subdirs under it
        if self._memory_store[dst_key] is None:  # It's a directory
            src_prefix = src_key if src_key.endswith("/") else src_key + "/"
            dst_prefix = dst_key if dst_key.endswith("/") else dst_key + "/"

            # Find all items under source directory and move them
            keys_to_move = [k for k in self._memory_store.keys() if k.startswith(src_prefix)]
            for key in keys_to_move:
                rel_path = key[len(src_prefix) :]
                new_key = dst_prefix + rel_path
                self._memory_store[new_key] = self._memory_store.pop(key)

    def copy(self, src: str | Path, dst: str | Path) -> None:
        """
        Copy a file, directory, or symlink within the memory store.

        - Respects structural separation (no fallback)
        - Will not overwrite destination
        - Will not create missing parent directories
        - Symlinks are copied as objects

        Raises:
            FileNotFoundError: If src does not exist or dst parent is missing
            FileExistsError: If dst already exists
            StorageResolutionError: On invalid structure
        """
        src_key = self._normalize(src)
        dst_key = self._normalize(dst)

        # Check source exists
        if src_key not in self._memory_store:
            raise FileNotFoundError(f"Source not found: {src}")

        # Check destination parent exists and is a directory
        dst_parent = self._normalize(Path(dst_key).parent)
        if dst_parent != ".":
            if dst_parent not in self._memory_store:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            # Check if parent is actually a directory (None value)
            if self._memory_store[dst_parent] is not None:
                raise StorageResolutionError(f"Destination parent is not a directory: {dst_parent}")

        # Check destination doesn't exist
        if dst_key in self._memory_store:
            raise FileExistsError(f"Destination already exists: {dst}")

        # Copy the item (deep copy to avoid aliasing)
        self._memory_store[dst_key] = py_copy.deepcopy(self._memory_store[src_key])

        # If copying a directory, also copy all files/subdirs under it
        if self._memory_store[dst_key] is None:  # It's a directory
            src_prefix = src_key if src_key.endswith("/") else src_key + "/"
            dst_prefix = dst_key if dst_key.endswith("/") else dst_key + "/"

            # Find all items under source directory and copy them
            keys_to_copy = [k for k in self._memory_store.keys() if k.startswith(src_prefix)]
            for key in keys_to_copy:
                rel_path = key[len(src_prefix) :]
                new_key = dst_prefix + rel_path
                self._memory_store[new_key] = py_copy.deepcopy(self._memory_store[key])

    def stat(self, path: str | Path) -> dict[str, Any]:
        """
        Return structural metadata about a memory-backed path.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': symlink target if applicable
                - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        key = self._normalize(path)

        try:
            # Check if path exists in store
            if key not in self._memory_store:
                return {"type": "missing", "path": str(path), "exists": False}

            obj = self._memory_store[key]

            # Check if it's a symlink
            if isinstance(obj, MemorySymlink):
                # Check if the symlink target exists. Membership is checked on the
                # key (not the stored value) so a directory target, stored as None,
                # still counts as existing.
                target_exists = self._normalize(obj.target) in self._memory_store
                return {
                    "type": "symlink",
                    "path": str(path),
                    "target": obj.target,
                    "exists": target_exists,
                }

            # Check if it's a directory (None value)
            if obj is None:
                return {"type": "directory", "path": str(path), "exists": True}

            # Otherwise it's a file
            return {"type": "file", "path": str(path), "exists": True}

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat memory path: {path}") from e
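
    # Example return shapes (illustrative paths):
    #     stat("/data/img.npy") -> {"type": "file", "path": "/data/img.npy", "exists": True}
    #     stat("/data")         -> {"type": "directory", "path": "/data", "exists": True}
    #     stat("/missing")      -> {"type": "missing", "path": "/missing", "exists": False}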

    def clear_files_only(self) -> None:
        """
        Clear all files from the memory store while preserving directory structure.

        This method removes all file entries (non-None values) but keeps directory
        entries (None values) intact. This prevents key collisions when reusing
        the same processing context while maintaining the directory structure
        needed for subsequent operations.

        Note:
            - Directories (entries with None values) are preserved
            - Files (entries with non-None values) are deleted
            - Symlinks are also deleted as they are considered file-like objects
            - GPU objects are explicitly deleted before removal
            - Caller is responsible for calling gc.collect() and GPU cleanup after this method
        """
        try:
            # Collect keys and objects to delete (preserve directories)
            files_to_delete = []
            gpu_objects_found = 0

            for key, value in list(self._memory_store.items()):
                # Delete files (non-None values) and symlinks, but keep directories (None values)
                if value is not None:
                    files_to_delete.append(key)

                    # Check if this is a GPU object that needs explicit cleanup
                    if self._is_gpu_object(value):
                        gpu_objects_found += 1
                        self._explicit_gpu_delete(value, key)

            # Delete all file entries from memory store
            for key in files_to_delete:
                del self._memory_store[key]

            logger.debug(
                f"Cleared {len(files_to_delete)} files from memory backend (including {gpu_objects_found} GPU objects), "
                f"preserved {len(self._memory_store)} directories"
            )

        except Exception as e:
            raise StorageResolutionError("Failed to clear files from memory store") from e
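
    # Typical use (sketch): between processing rounds, drop the file payloads but
    # keep the directory skeleton so the same virtual paths can be written again:
    #     backend.clear_files_only()
    #     gc.collect()  # the caller's responsibility, per the docstring above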

    def _is_gpu_object(self, obj: Any) -> bool:
        """
        Check if an object is a GPU tensor/array that needs explicit cleanup.

        Args:
            obj: Object to check

        Returns:
            True if object is a GPU tensor/array
        """
        try:
            # Check for PyTorch tensors on GPU
            if hasattr(obj, "device") and hasattr(obj, "is_cuda"):
                if obj.is_cuda:
                    return True

            # Check for CuPy arrays
            if hasattr(obj, "__class__") and "cupy" in str(type(obj)):
                return True

            # Check for other GPU arrays by device attribute
            if hasattr(obj, "device") and hasattr(obj.device, "type"):
                if "cuda" in str(obj.device.type).lower() or "gpu" in str(obj.device.type).lower():
                    return True

            return False
        except Exception:
            # If we can't determine, assume it's not a GPU object
            return False

    def _explicit_gpu_delete(self, obj: Any, key: str) -> None:
        """
        Explicitly delete a GPU object and clear its memory.

        Args:
            obj: GPU object to delete
            key: Memory backend key for logging
        """
        try:
            # For PyTorch tensors
            if hasattr(obj, "device") and hasattr(obj, "is_cuda") and obj.is_cuda:
                device_id = obj.device.index if obj.device.index is not None else 0
                # Move to CPU first to free GPU memory, then delete
                obj_cpu = obj.cpu()
                del obj_cpu
                logger.debug(f"🔥 EXPLICIT GPU DELETE: PyTorch tensor {key} on device {device_id}")
                return

            # For CuPy arrays
            if hasattr(obj, "__class__") and "cupy" in str(type(obj)):
                # CuPy arrays are automatically freed when deleted
                logger.debug(f"🔥 EXPLICIT GPU DELETE: CuPy array {key}")
                return

            # For other GPU objects
            if hasattr(obj, "device"):
                logger.debug(f"🔥 EXPLICIT GPU DELETE: GPU object {key} on device {obj.device}")

        except Exception as e:
            logger.warning(f"Failed to explicitly delete GPU object {key}: {e}")


class MemorySymlink:
    def __init__(self, target: str):
        self.target = target  # Virtual path of the link target (normalized on lookup)

    def __repr__(self):
        return f"<MemorySymlink → {self.target}>"
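

# Minimal usage sketch (illustrative only, not part of the module's public surface).
# It exercises the directory-first contract: parent directories must exist before saving.
if __name__ == "__main__":
    backend = MemoryBackend()
    backend.ensure_directory("/data/run1")
    backend.save({"score": 0.9}, "/data/run1/metrics.json")
    backend.create_symlink("/data/run1/metrics.json", "/data/run1/latest")

    print(backend.load("/data/run1/metrics.json"))      # {'score': 0.9}
    print(backend.list_files("/data", recursive=True))  # metrics.json plus the symlink entry
    print(backend.stat("/data/run1/latest"))             # symlink metadata with its target
    backend.delete_all("/data")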