Coverage for openhcs/io/memory.py: 10.9%
318 statements
coverage.py v7.10.7, created at 2025-10-01 18:33 +0000
# openhcs/io/storage/backends/memory.py
"""
Memory storage backend module for OpenHCS.

This module provides an in-memory implementation of the MicroscopyStorageBackend interface.
It stores data in memory using MemoryWrapper objects and supports overlay operations
for materializing data to disk when needed.

This implementation enforces Clause 106-A (Declared Memory Types) and
Clause 251 (Declarative Memory Conversion Interface) by requiring explicit
memory type declarations and providing declarative conversion methods.
"""

import copy as pycopy
import fnmatch
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from openhcs.io.base import StorageBackend
from openhcs.io.backend_registry import StorageBackendMeta
# StorageResolutionError is raised throughout this module but was never
# imported in the original source; this import path is an assumption.
from openhcs.io.exceptions import StorageResolutionError
from openhcs.constants.constants import Backend

logger = logging.getLogger(__name__)


class MemoryStorageBackend(StorageBackend, metaclass=StorageBackendMeta):
    """Memory storage backend with automatic metaclass registration."""

    # Backend type from enum for registration
    _backend_type = Backend.MEMORY.value

    def __init__(self, shared_dict: Optional[Dict[str, Any]] = None):
        """
        Initialize the memory storage.

        Args:
            shared_dict: If provided, uses this dictionary as the storage
                backend. This is useful for sharing memory between processes
                via a multiprocessing.Manager().dict(). If None, a new local
                dictionary is created.
        """
        self._memory_store = shared_dict if shared_dict is not None else {}
        self._prefixes = set()  # Declared directory-like namespaces

    def _normalize(self, path: Union[str, Path], bypass_normalization: bool = False) -> str:
        """
        Normalize paths for memory backend storage.

        Paths are stored as POSIX-style strings and kept as-is: absolute
        paths are not forced to relative form, because absolute keys are
        needed for cross-backend operations.

        Args:
            path: Path to normalize (absolute or relative)
            bypass_normalization: If True, skip normalization (currently
                yields the same POSIX string as the normal path)

        Returns:
            Normalized path string
        """
        path_obj = Path(path)

        if bypass_normalization:
            return path_obj.as_posix()

        # Store paths as-is - no forced relative conversion.
        # This preserves absolute paths, which are needed for cross-backend operations.
        return path_obj.as_posix()
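
    # Behavior sketch (pathlib collapses "." components and doubled slashes):
    #     self._normalize("plate/A01/img.tif")  -> "plate/A01/img.tif"
    #     self._normalize("a//b/./c.tif")       -> "a/b/c.tif"
    #     self._normalize(Path("/abs/p.tif"))   -> "/abs/p.tif"   (absolute kept)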

    def load(self, file_path: Union[str, Path], **kwargs) -> Any:
        key = self._normalize(file_path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path not found: {file_path}")

        value = self._memory_store[key]
        if value is None:
            raise IsADirectoryError(f"Path is a directory: {file_path}")

        return value

    def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
        key = self._normalize(output_path)

        # Check if parent directory exists (simple flat structure)
        parent_path = self._normalize(Path(key).parent)
        if parent_path != '.' and parent_path not in self._memory_store:
            raise FileNotFoundError(f"Parent path does not exist: {output_path}")

        # Check if file already exists
        if key in self._memory_store:
            raise FileExistsError(f"Path already exists: {output_path}")

        # Save the file
        self._memory_store[key] = data
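
    # Contract sketch for save(), assuming a fresh backend instance:
    #     backend.save(b"x", "out/a.bin")   # FileNotFoundError: parent "out" missing
    #     backend.ensure_directory("out")
    #     backend.save(b"x", "out/a.bin")   # stored
    #     backend.save(b"y", "out/a.bin")   # FileExistsError: no overwrite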

    def load_batch(self, file_paths: List[Union[str, Path]]) -> List[Any]:
        """
        Load multiple files sequentially using the existing load method.

        Args:
            file_paths: List of file paths to load

        Returns:
            List of loaded data objects in the same order as file_paths
        """
        # Debug: show memory contents before the batch load
        logger.debug("MEMORY DEBUG: about to load %d files", len(file_paths))
        logger.debug("MEMORY DEBUG: requested paths: %s", [str(p) for p in file_paths])
        logger.debug("MEMORY DEBUG: total files in memory: %d", len(self._memory_store))
        logger.debug("MEMORY DEBUG: memory keys (first 10): %s",
                     list(self._memory_store.keys())[:10])

        # Show directory structure (thread-safe copy of keys)
        directories = set()
        for path in list(self._memory_store.keys()):
            directories.add(str(Path(path).parent))
        logger.debug("MEMORY DEBUG: directories in memory: %s", sorted(directories))

        results = []
        for file_path in file_paths:
            result = self.load(file_path)
            results.append(result)
        return results

    def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]]) -> None:
        """
        Save multiple files sequentially using the existing save method.

        Args:
            data_list: List of data objects to save
            output_paths: List of destination paths (must match length of data_list)

        Raises:
            ValueError: If data_list and output_paths have different lengths
        """
        if len(data_list) != len(output_paths):
            raise ValueError(
                f"data_list length ({len(data_list)}) must match "
                f"output_paths length ({len(output_paths)})"
            )

        for data, output_path in zip(data_list, output_paths):
            self.save(data, output_path)

        # Show directory structure after the batch save (thread-safe copy of keys)
        directories = set()
        for path in list(self._memory_store.keys()):
            directories.add(str(Path(path).parent))
        logger.debug("MEMORY DEBUG: directories in memory: %s", sorted(directories))

    def list_files(
        self,
        directory: Union[str, Path],
        pattern: str = "*",
        extensions: Optional[Set[str]] = None,
        recursive: bool = False
    ) -> List[Path]:
        dir_key = self._normalize(directory)

        # Check if directory exists and is a directory
        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {directory}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        result = []
        dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key

        for path, value in list(self._memory_store.items()):
            # Skip if not under this directory
            if not path.startswith(dir_prefix):
                continue

            # Get relative path from directory
            rel_path = path[len(dir_prefix):]

            # Skip if recursive=False and path has subdirectories
            if not recursive and "/" in rel_path:
                continue

            # Only include files (value is not None)
            if value is not None:
                filename = Path(rel_path).name
                # If pattern is None, match all files
                if pattern is None or fnmatch.fnmatch(filename, pattern):
                    if not extensions or Path(filename).suffix in extensions:
                        # Record depth for breadth-first sorting
                        depth = rel_path.count('/')
                        result.append((Path(path), depth))

        # Sort by depth first (breadth-first), then by path for consistency
        result.sort(key=lambda x: (x[1], str(x[0])))

        # Return just the paths
        return [path for path, _ in result]
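
    # Ordering sketch: with files "d/a.tif", "d/sub/b.tif", and
    # "d/sub/deep/c.tif" in the store, list_files("d", recursive=True)
    # returns them in exactly that order, because entries are sorted by
    # (depth, path) - shallowest first.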

    def list_dir(self, path: Union[str, Path]) -> List[str]:
        dir_key = self._normalize(path)

        # Check if directory exists and is a directory
        if dir_key not in self._memory_store:
            raise FileNotFoundError(f"Directory not found: {path}")
        if self._memory_store[dir_key] is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")

        # Find all direct children of this directory
        result = set()
        dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key

        for stored_path in list(self._memory_store.keys()):
            if stored_path.startswith(dir_prefix):
                rel_path = stored_path[len(dir_prefix):]
                # Only direct children (no subdirectories)
                if "/" not in rel_path:
                    result.add(rel_path)
                else:
                    # Add the first directory component
                    first_dir = rel_path.split("/")[0]
                    result.add(first_dir)

        return list(result)

    def delete(self, path: Union[str, Path]) -> None:
        """
        Delete a file or empty directory from the in-memory store.

        This method does not support recursive deletion.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            IsADirectoryError: If path is a non-empty directory
            StorageResolutionError: For unexpected internal failures
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        # If it's a directory, check if it's empty
        if self._memory_store[key] is None:
            # Check if directory has any children
            dir_prefix = key + "/" if not key.endswith("/") else key
            for stored_path in list(self._memory_store.keys()):
                if stored_path.startswith(dir_prefix):
                    raise IsADirectoryError(f"Cannot delete non-empty directory: {path}")

        try:
            del self._memory_store[key]
        except Exception as e:
            raise StorageResolutionError(f"Failed to delete path from memory store: {path}") from e

    def delete_all(self, path: Union[str, Path]) -> None:
        """
        Recursively delete a file, empty directory, or a nested directory tree
        from the in-memory store.

        This method is the only supported way to delete recursively in the
        memory backend.

        Args:
            path: Virtual path to delete

        Raises:
            FileNotFoundError: If the path does not exist
            StorageResolutionError: If internal deletion fails
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Path not found: {path}")

        try:
            # Delete the path itself
            del self._memory_store[key]

            # If it was a directory, delete all children
            dir_prefix = key + "/" if not key.endswith("/") else key
            keys_to_delete = [k for k in list(self._memory_store.keys()) if k.startswith(dir_prefix)]
            for k in keys_to_delete:
                del self._memory_store[k]

        except Exception as e:
            raise StorageResolutionError(f"Failed to recursively delete path: {path}") from e

    def ensure_directory(self, directory: Union[str, Path]) -> Path:
        key = self._normalize(directory)
        self._prefixes.add(key if key.endswith("/") else key + "/")

        # Create the entire directory hierarchy
        path_obj = Path(key)
        parts = path_obj.parts

        # Create each parent directory in the hierarchy
        for i in range(1, len(parts) + 1):
            partial_path = self._normalize(Path(*parts[:i]))
            if partial_path not in self._memory_store:
                self._memory_store[partial_path] = None  # Directory = None value

        return Path(key)
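
    # Hierarchy sketch: ensure_directory("a/b/c") inserts the directory
    # entries "a", "a/b", and "a/b/c" (each mapped to None), so save() can
    # find the parent entry for files placed directly in any of them.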

    def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
        # NOTE: this method (like is_symlink, move, copy, and stat below)
        # traverses the store as a nested dict, not the flat key scheme used
        # by the methods above.
        src_parts = str(source).strip("/").split("/")
        dst_parts = str(link_name).strip("/").split("/")

        # Traverse to source
        src_dict = self._memory_store
        for part in src_parts[:-1]:
            src_dict = src_dict.get(part)
            if not isinstance(src_dict, dict):
                raise FileNotFoundError(f"Invalid symlink source path: {source}")
        src_key = src_parts[-1]
        if src_key not in src_dict:
            raise FileNotFoundError(f"Symlink source not found: {source}")

        # Traverse to destination parent
        dst_dict = self._memory_store
        for part in dst_parts[:-1]:
            dst_dict = dst_dict.get(part)
            if dst_dict is None or not isinstance(dst_dict, dict):
                raise FileNotFoundError(f"Destination parent path does not exist: {link_name}")

        dst_key = dst_parts[-1]
        if dst_key in dst_dict:
            if not overwrite:
                raise FileExistsError(f"Symlink destination already exists: {link_name}")
            # Remove existing entry if overwrite=True
            del dst_dict[dst_key]

        dst_dict[dst_key] = MemorySymlink(target=str(source))

    def is_symlink(self, path: Union[str, Path]) -> bool:
        parts = str(path).strip("/").split("/")
        current = self._memory_store

        for part in parts[:-1]:
            current = current.get(part)
            if not isinstance(current, dict):
                return False

        key = parts[-1]
        return isinstance(current.get(key), MemorySymlink)

    def is_file(self, path: Union[str, Path]) -> bool:
        """
        Check if a memory path points to a file.

        Raises:
            FileNotFoundError: If path does not exist
            IsADirectoryError: If path is a directory
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path does not exist: {path}")

        value = self._memory_store[key]
        if value is None:
            raise IsADirectoryError(f"Path is a directory: {path}")

        return True

    def is_dir(self, path: Union[str, Path]) -> bool:
        """
        Check if a memory path points to a directory.

        Args:
            path: Path to check

        Returns:
            bool: True if path is a directory

        Raises:
            FileNotFoundError: If path does not exist
            NotADirectoryError: If path is not a directory
        """
        key = self._normalize(path)

        if key not in self._memory_store:
            raise FileNotFoundError(f"Memory path does not exist: {path}")

        value = self._memory_store[key]
        if value is not None:
            raise NotADirectoryError(f"Path is not a directory: {path}")

        return True

    def _resolve_path(self, path: Union[str, Path]) -> Optional[Any]:
        """
        Resolve a memory-style virtual path into an in-memory object (file or directory).

        This performs a pure dictionary traversal. It never coerces types or guesses
        structure. If any intermediate path component is missing or not a dict,
        resolution fails.

        Args:
            path: Memory-style path, e.g., 'root/dir1/file.txt'

        Returns:
            The object at that path (could be a dict or content object), or None if not found
        """
        components = str(path).strip("/").split("/")
        current = self._memory_store  # root dict, e.g., {"root": {"file.txt": "data"}}

        for comp in components:
            if not isinstance(current, dict):
                return None  # hit a file too early
            if comp not in current:
                return None
            current = current[comp]

        return current

    def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Move a file or directory within the memory store. Symlinks are preserved as objects.

        Raises:
            FileNotFoundError: If src path or dst parent path does not exist
            FileExistsError: If destination already exists
            StorageResolutionError: On structure violations
        """
        def _resolve_parent(path: Union[str, Path]):
            parts = str(path).strip("/").split("/")
            return parts[:-1], parts[-1]

        src_parts, src_name = _resolve_parent(src)
        dst_parts, dst_name = _resolve_parent(dst)

        # Traverse to src
        src_dict = self._memory_store
        for part in src_parts:
            src_dict = src_dict.get(part)
            if not isinstance(src_dict, dict):
                raise FileNotFoundError(f"Source path invalid: {src}")
        if src_name not in src_dict:
            raise FileNotFoundError(f"Source not found: {src}")

        # Traverse to dst parent - do not create
        dst_dict = self._memory_store
        for part in dst_parts:
            dst_dict = dst_dict.get(part)
            if dst_dict is None:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            if not isinstance(dst_dict, dict):
                raise StorageResolutionError(f"Destination path is not a directory: {part}")

        if dst_name in dst_dict:
            raise FileExistsError(f"Destination already exists: {dst}")

        try:
            dst_dict[dst_name] = src_dict.pop(src_name)
        except Exception as e:
            raise StorageResolutionError(f"Failed to move {src} to {dst}") from e

    def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
        """
        Copy a file, directory, or symlink within the memory store.

        - Respects structural separation (no fallback)
        - Will not overwrite destination
        - Will not create missing parent directories
        - Symlinks are copied as objects

        Raises:
            FileNotFoundError: If src does not exist or dst parent is missing
            FileExistsError: If dst already exists
            StorageResolutionError: On invalid structure
        """
        def _resolve_parent(path: Union[str, Path]):
            parts = str(path).strip("/").split("/")
            return parts[:-1], parts[-1]

        src_parts, src_name = _resolve_parent(src)
        dst_parts, dst_name = _resolve_parent(dst)

        # Traverse to src object
        src_dict = self._memory_store
        for part in src_parts:
            src_dict = src_dict.get(part)
            if not isinstance(src_dict, dict):
                raise FileNotFoundError(f"Source path invalid: {src}")
        if src_name not in src_dict:
            raise FileNotFoundError(f"Source not found: {src}")
        obj = src_dict[src_name]

        # Traverse to dst parent (do not create)
        dst_dict = self._memory_store
        for part in dst_parts:
            dst_dict = dst_dict.get(part)
            if dst_dict is None:
                raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
            if not isinstance(dst_dict, dict):
                raise StorageResolutionError(f"Destination path is not a directory: {part}")

        if dst_name in dst_dict:
            raise FileExistsError(f"Destination already exists: {dst}")

        # Perform copy (deep to avoid aliasing)
        try:
            dst_dict[dst_name] = pycopy.deepcopy(obj)
        except Exception as e:
            raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e

    def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
        """
        Return structural metadata about a memory-backed path.

        Returns:
            dict with keys:
                - 'type': 'file', 'directory', 'symlink', or 'missing'
                - 'path': str(path)
                - 'target': symlink target if applicable
                - 'exists': bool

        Raises:
            StorageResolutionError: On resolution failure
        """
        parts = str(path).strip("/").split("/")
        current = self._memory_store

        try:
            for part in parts[:-1]:
                current = current.get(part)
                if current is None:
                    return {
                        "type": "missing",
                        "path": str(path),
                        "exists": False
                    }
                if not isinstance(current, dict):
                    raise StorageResolutionError(f"Invalid intermediate path segment: {part}")

            final_key = parts[-1]
            if final_key not in current:
                return {
                    "type": "missing",
                    "path": str(path),
                    "exists": False
                }

            obj = current[final_key]

            if isinstance(obj, MemorySymlink):
                return {
                    "type": "symlink",
                    "path": str(path),
                    "target": obj.target,
                    "exists": self._resolve_path(obj.target) is not None
                }

            if isinstance(obj, dict):
                return {
                    "type": "directory",
                    "path": str(path),
                    "exists": True
                }

            return {
                "type": "file",
                "path": str(path),
                "exists": True
            }

        except Exception as e:
            raise StorageResolutionError(f"Failed to stat memory path: {path}") from e
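
    # Result sketch: stat() walks the store as a *nested* dict (the layout
    # assumed by move/copy/create_symlink), so given
    # self._memory_store == {"root": {"file.txt": "data"}}:
    #     stat("root/file.txt") -> {"type": "file", "path": "root/file.txt", "exists": True}
    #     stat("root/missing")  -> {"type": "missing", "path": "root/missing", "exists": False}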

    def clear_files_only(self) -> None:
        """
        Clear all files from the memory store while preserving directory structure.

        This method removes all file entries (non-None values) but keeps directory
        entries (None values) intact. This prevents key collisions when reusing
        the same processing context while maintaining the directory structure
        needed for subsequent operations.

        Enhanced with explicit GPU memory cleanup to ensure VRAM is freed when
        objects are deleted from the memory backend.

        Note:
            - Directories (entries with None values) are preserved
            - Files (entries with non-None values) are deleted
            - Symlinks are also deleted, as they are considered file-like objects
            - GPU objects are explicitly deleted and VRAM is cleared
        """
        try:
            # Collect keys and objects to delete (preserve directories)
            files_to_delete = []
            gpu_objects_found = 0

            for key, value in list(self._memory_store.items()):
                # Delete files (non-None values) and symlinks, but keep directories (None values)
                if value is not None:
                    files_to_delete.append(key)

                    # Check if this is a GPU object that needs explicit cleanup
                    if self._is_gpu_object(value):
                        gpu_objects_found += 1
                        self._explicit_gpu_delete(value, key)

            # Delete all file entries from memory store
            for key in files_to_delete:
                del self._memory_store[key]

            # Force garbage collection to ensure GPU objects are freed
            import gc
            collected = gc.collect()

            # Trigger GPU memory cleanup for all frameworks
            try:
                from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
                cleanup_all_gpu_frameworks()
                logger.debug("🔥 GPU CLEANUP: Triggered comprehensive GPU cleanup after memory backend clear")
            except Exception as cleanup_error:
                logger.warning(f"Failed to trigger GPU cleanup after memory backend clear: {cleanup_error}")

            logger.debug(f"Cleared {len(files_to_delete)} files from memory backend "
                         f"(including {gpu_objects_found} GPU objects), "
                         f"preserved {len(self._memory_store)} directories, "
                         f"collected {collected} objects")

        except Exception as e:
            raise StorageResolutionError("Failed to clear files from memory store") from e

    def _is_gpu_object(self, obj: Any) -> bool:
        """
        Check if an object is a GPU tensor/array that needs explicit cleanup.

        Args:
            obj: Object to check

        Returns:
            True if object is a GPU tensor/array
        """
        try:
            # Check for PyTorch tensors on GPU
            if hasattr(obj, 'device') and hasattr(obj, 'is_cuda'):
                if obj.is_cuda:
                    return True

            # Check for CuPy arrays
            if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
                return True

            # Check for other GPU arrays by device attribute
            if hasattr(obj, 'device') and hasattr(obj.device, 'type'):
                if 'cuda' in str(obj.device.type).lower() or 'gpu' in str(obj.device.type).lower():
                    return True

            return False
        except Exception:
            # If we can't determine, assume it's not a GPU object
            return False
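
    # Detection sketch (duck-typed; the objects named here are illustrative):
    #     torch.zeros(8, device="cuda")  -> True   (is_cuda is True)
    #     cupy.zeros(8)                  -> True   ("cupy" in the type name)
    #     numpy.zeros(8)                 -> False  (no CUDA markers)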

    def _explicit_gpu_delete(self, obj: Any, key: str) -> None:
        """
        Explicitly delete a GPU object and clear its memory.

        Args:
            obj: GPU object to delete
            key: Memory backend key for logging
        """
        try:
            # For PyTorch tensors
            if hasattr(obj, 'device') and hasattr(obj, 'is_cuda') and obj.is_cuda:
                device_id = obj.device.index if obj.device.index is not None else 0
                # Materialize a CPU copy and drop it; the GPU allocation is
                # released once the original tensor is garbage collected
                obj_cpu = obj.cpu()
                del obj_cpu
                logger.debug(f"🔥 EXPLICIT GPU DELETE: PyTorch tensor {key} on device {device_id}")
                return

            # For CuPy arrays
            if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
                # CuPy arrays are automatically freed when deleted
                logger.debug(f"🔥 EXPLICIT GPU DELETE: CuPy array {key}")
                return

            # For other GPU objects
            if hasattr(obj, 'device'):
                logger.debug(f"🔥 EXPLICIT GPU DELETE: GPU object {key} on device {obj.device}")

        except Exception as e:
            logger.warning(f"Failed to explicitly delete GPU object {key}: {e}")


class MemorySymlink:
    def __init__(self, target: str):
        self.target = target  # Must be a normalized key path

    def __repr__(self):
        return f"<MemorySymlink → {self.target}>"