Coverage report for openhcs/io/memory.py: 35.4% of 319 statements covered (coverage.py v7.10.3, created 2025-08-14 05:57 +0000).
1# openhcs/io/storage/backends/memory.py
2"""
3Memory storage backend module for OpenHCS.
5This module provides an in-memory implementation of the MicroscopyStorageBackend interface.
6It stores data in memory using MemoryWrapper objects and supports overlay operations
7for materializing data to disk when needed.
9This implementation enforces Clause 106-A (Declared Memory Types) and
10Clause 251 (Declarative Memory Conversion Interface) by requiring explicit
11memory type declarations and providing declarative conversion methods.
12"""
14import fnmatch
15import logging
16from pathlib import Path
17from typing import Any, Dict, List, Literal, Optional, Set, Union
18from os import PathLike
19import copy as pycopy
21from openhcs.io.base import StorageBackend
23logger = logging.getLogger(__name__)
class MemoryStorageBackend(StorageBackend):
    """In-memory storage backend keyed by normalized POSIX path strings.

    Entries map a normalized path key to a stored object; a value of None
    marks a directory entry rather than a file.
    """

    def __init__(self, shared_dict: Optional[Dict[str, Any]] = None):
        """
        Initialize the memory storage.

        Args:
            shared_dict: If provided, uses this dictionary as the storage
                backend. This is useful for sharing memory between processes
                with a multiprocessing.Manager dict. If None, a new local
                dictionary is created.
        """
        # Flat mapping: normalized path -> data (None marks a directory).
        self._memory_store = shared_dict if shared_dict is not None else {}
        # Declared directory-like namespaces (prefixes ending in "/").
        self._prefixes = set()
40 def _normalize(self, path: Union[str, Path],bypass_normalization=False) -> str:
41 """
42 Normalize paths for memory backend storage.
44 Memory backend uses relative paths internally to avoid conflicts
45 between absolute paths from different systems. This method converts
46 absolute paths to relative paths by removing the root component.
48 Args:
49 path: Path to normalize (absolute or relative)
51 Returns:
52 Normalized relative path string
53 """
54 path_obj = Path(path)
56 if bypass_normalization: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 return path_obj.as_posix()
59 # Store paths as-is - no forced relative conversion
60 # This preserves absolute paths which are needed for cross-backend operations
61 return path_obj.as_posix()
63 def load(self, file_path: Union[str, Path], **kwargs) -> Any:
64 key = self._normalize(file_path)
66 if key not in self._memory_store: 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true
67 raise FileNotFoundError(f"Memory path not found: {file_path}")
69 value = self._memory_store[key]
70 if value is None: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 raise IsADirectoryError(f"Path is a directory: {file_path}")
73 return value
75 def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
76 key = self._normalize(output_path)
78 # Check if parent directory exists (simple flat structure)
79 parent_path = self._normalize(Path(key).parent)
80 if parent_path != '.' and parent_path not in self._memory_store: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true
81 raise FileNotFoundError(f"Parent path does not exist: {output_path}")
83 # Check if file already exists
84 if key in self._memory_store: 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true
85 raise FileExistsError(f"Path already exists: {output_path}")
86 self._memory_store[key] = data
88 # Save the file
90 def load_batch(self, file_paths: List[Union[str, Path]]) -> List[Any]:
91 """
92 Load multiple files sequentially using existing load method.
94 Args:
95 file_paths: List of file paths to load
96 **kwargs: Additional arguments passed to load method
98 Returns:
99 List of loaded data objects in the same order as file_paths
100 """
101 # 🔧 DEBUG: Show memory contents before batch load
102 print(f"🔧 MEMORY DEBUG: About to load {len(file_paths)} files")
103 print(f"🔧 MEMORY DEBUG: Requested paths: {[str(p) for p in file_paths]}")
104 print(f"🔧 MEMORY DEBUG: Total files in memory: {len(self._memory_store)}")
105 print(f"🔧 MEMORY DEBUG: Memory keys (first 10): {list(self._memory_store.keys())[:10]}")
107 # Show directory structure (thread-safe copy of keys)
108 directories = set()
109 for path in list(self._memory_store.keys()):
110 directories.add(str(Path(path).parent))
111 print(f"🔧 MEMORY DEBUG: Directories in memory: {sorted(directories)}")
113 results = []
114 for file_path in file_paths:
115 result = self.load(file_path)
116 results.append(result)
117 return results
119 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]]) -> None:
120 """
121 Save multiple files sequentially using existing save method.
123 Args:
124 data_list: List of data objects to save
125 output_paths: List of destination paths (must match length of data_list)
126 **kwargs: Additional arguments passed to save method
128 Raises:
129 ValueError: If data_list and output_paths have different lengths
130 """
131 if len(data_list) != len(output_paths): 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true
132 raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")
134 for data, output_path in zip(data_list, output_paths):
135 self.save(data, output_path)
137 # 🔧 DEBUG: Show memory contents after batch write
138 print(f"🔧 MEMORY DEBUG: Batch saved {len(output_paths)} files")
139 print(f"🔧 MEMORY DEBUG: Paths written: {[str(p) for p in output_paths]}")
140 print(f"🔧 MEMORY DEBUG: Total files in memory: {len(self._memory_store)}")
141 print(f"🔧 MEMORY DEBUG: Memory keys (first 10): {list(self._memory_store.keys())[:10]}")
143 # Show directory structure (thread-safe copy of keys)
144 directories = set()
145 for path in list(self._memory_store.keys()):
146 directories.add(str(Path(path).parent))
147 print(f"🔧 MEMORY DEBUG: Directories in memory: {sorted(directories)}")
149 def list_files(
150 self,
151 directory: Union[str, Path],
152 pattern: str = "*",
153 extensions: Optional[Set[str]] = None,
154 recursive: bool = False
155 ) -> List[Path]:
156 from fnmatch import fnmatch
158 dir_key = self._normalize(directory)
160 # Check if directory exists and is a directory
161 if dir_key not in self._memory_store: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true
162 raise FileNotFoundError(f"Directory not found: {directory}")
163 if self._memory_store[dir_key] is not None: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 raise NotADirectoryError(f"Path is not a directory: {directory}")
166 result = []
167 dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key
169 for path, value in list(self._memory_store.items()):
170 # Skip if not under this directory
171 if not path.startswith(dir_prefix):
172 continue
174 # Get relative path from directory
175 rel_path = path[len(dir_prefix):]
177 # Skip if recursive=False and path has subdirectories
178 if not recursive and "/" in rel_path: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 continue
181 # Only include files (value is not None)
182 if value is not None: 182 ↛ 169line 182 didn't jump to line 169 because the condition on line 182 was always true
183 filename = Path(rel_path).name
184 # If pattern is None, match all files
185 if pattern is None or fnmatch(filename, pattern): 185 ↛ 169line 185 didn't jump to line 169 because the condition on line 185 was always true
186 if not extensions or Path(filename).suffix in extensions: 186 ↛ 169line 186 didn't jump to line 169 because the condition on line 186 was always true
187 # Calculate depth for breadth-first sorting
188 depth = rel_path.count('/')
189 result.append((Path(path), depth))
191 # Sort by depth first (breadth-first), then by path for consistency
192 result.sort(key=lambda x: (x[1], str(x[0])))
194 # Return just the paths
195 return [path for path, _ in result]
197 def list_dir(self, path: Union[str, Path]) -> List[str]:
198 dir_key = self._normalize(path)
200 # Check if directory exists and is a directory
201 if dir_key not in self._memory_store:
202 raise FileNotFoundError(f"Directory not found: {path}")
203 if self._memory_store[dir_key] is not None:
204 raise NotADirectoryError(f"Path is not a directory: {path}")
206 # Find all direct children of this directory
207 result = set()
208 dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key
210 for stored_path in list(self._memory_store.keys()):
211 if stored_path.startswith(dir_prefix):
212 rel_path = stored_path[len(dir_prefix):]
213 # Only direct children (no subdirectories)
214 if "/" not in rel_path:
215 result.add(rel_path)
216 else:
217 # Add the first directory component
218 first_dir = rel_path.split("/")[0]
219 result.add(first_dir)
221 return list(result)
224 def delete(self, path: Union[str, Path]) -> None:
225 """
226 Delete a file or empty directory from the in-memory store.
228 This method does not support recursive deletion.
230 Args:
231 path: Virtual path to delete
233 Raises:
234 FileNotFoundError: If the path does not exist
235 IsADirectoryError: If path is a non-empty directory
236 StorageResolutionError: For unexpected internal failures
237 """
238 key = self._normalize(path)
240 if key not in self._memory_store: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true
241 raise FileNotFoundError(f"Path not found: {path}")
243 # If it's a directory, check if it's empty
244 if self._memory_store[key] is None: 244 ↛ 246line 244 didn't jump to line 246 because the condition on line 244 was never true
245 # Check if directory has any children
246 dir_prefix = key + "/" if not key.endswith("/") else key
247 for stored_path in list(self._memory_store.keys()):
248 if stored_path.startswith(dir_prefix):
249 raise IsADirectoryError(f"Cannot delete non-empty directory: {path}")
251 try:
252 del self._memory_store[key]
253 except Exception as e:
254 raise StorageResolutionError(f"Failed to delete path from memory store: {path}") from e
256 def delete_all(self, path: Union[str, Path]) -> None:
257 """
258 Recursively delete a file, empty directory, or a nested directory tree
259 from the in-memory store.
261 This method is the only allowed way to recursively delete in memory backend.
263 Args:
264 path: Virtual path to delete
266 Raises:
267 FileNotFoundError: If the path does not exist
268 StorageResolutionError: If internal deletion fails
269 """
270 key = self._normalize(path)
272 if key not in self._memory_store:
273 raise FileNotFoundError(f"Path not found: {path}")
275 try:
276 # Delete the path itself
277 del self._memory_store[key]
279 # If it was a directory, delete all children
280 dir_prefix = key + "/" if not key.endswith("/") else key
281 keys_to_delete = [k for k in list(self._memory_store.keys()) if k.startswith(dir_prefix)]
282 for k in keys_to_delete:
283 del self._memory_store[k]
285 except Exception as e:
286 raise StorageResolutionError(f"Failed to recursively delete path: {path}") from e
288 def ensure_directory(self, directory: Union[str, Path]) -> Path:
289 key = self._normalize(directory)
290 self._prefixes.add(key if key.endswith("/") else key + "/")
292 # Create the entire directory hierarchy
293 path_obj = Path(key)
294 parts = path_obj.parts
296 # Create each parent directory in the hierarchy
297 for i in range(1, len(parts) + 1):
298 partial_path = self._normalize(Path(*parts[:i]))
299 if partial_path not in self._memory_store:
300 self._memory_store[partial_path] = None # Directory = None value
302 return Path(key)
305 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
306 src_parts = str(source).strip("/").split("/")
307 dst_parts = str(link_name).strip("/").split("/")
309 # Traverse to source
310 src_dict = self._memory_store
311 for part in src_parts[:-1]:
312 src_dict = src_dict.get(part)
313 if not isinstance(src_dict, dict):
314 raise FileNotFoundError(f"Invalid symlink source path: {source}")
315 src_key = src_parts[-1]
316 if src_key not in src_dict:
317 raise FileNotFoundError(f"Symlink source not found: {source}")
319 # Traverse to destination parent
320 dst_dict = self._memory_store
321 for part in dst_parts[:-1]:
322 dst_dict = dst_dict.get(part)
323 if dst_dict is None or not isinstance(dst_dict, dict):
324 raise FileNotFoundError(f"Destination parent path does not exist: {link_name}")
326 dst_key = dst_parts[-1]
327 if dst_key in dst_dict:
328 if not overwrite:
329 raise FileExistsError(f"Symlink destination already exists: {link_name}")
330 # Remove existing entry if overwrite=True
331 del dst_dict[dst_key]
333 dst_dict[dst_key] = MemorySymlink(target=str(source))
335 def is_symlink(self, path: Union[str, Path]) -> bool:
336 parts = str(path).strip("/").split("/")
337 current = self._memory_store
339 for part in parts[:-1]:
340 current = current.get(part)
341 if not isinstance(current, dict):
342 return False
344 key = parts[-1]
345 return isinstance(current.get(key), MemorySymlink)
347 def is_file(self, path: Union[str, Path]) -> bool:
348 """
349 Check if a memory path points to a file.
351 Raises:
352 FileNotFoundError: If path does not exist
353 IsADirectoryError: If path is a directory
354 """
355 key = self._normalize(path)
357 if key not in self._memory_store:
358 raise FileNotFoundError(f"Memory path does not exist: {path}")
360 value = self._memory_store[key]
361 if value is None:
362 raise IsADirectoryError(f"Path is a directory: {path}")
364 return True
366 def is_dir(self, path: Union[str, Path]) -> bool:
367 """
368 Check if a memory path points to a directory.
370 Args:
371 path: Path to check
373 Returns:
374 bool: True if path is a directory
376 Raises:
377 FileNotFoundError: If path does not exist
378 NotADirectoryError: If path is not a directory
379 """
380 key = self._normalize(path)
382 if key not in self._memory_store:
383 raise FileNotFoundError(f"Memory path does not exist: {path}")
385 value = self._memory_store[key]
386 if value is not None: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true
387 raise NotADirectoryError(f"Path is not a directory: {path}")
389 return True
391 def _resolve_path(self, path: Union[str, Path]) -> Optional[Any]:
392 """
393 Resolves a memory-style virtual path into an in-memory object (file or directory).
395 This performs a pure dictionary traversal. It never coerces types or guesses structure.
396 If any intermediate path component is missing or not a dict, resolution fails.
398 Args:
399 path: Memory-style path, e.g., 'root/dir1/file.txt'
401 Returns:
402 The object at that path (could be dict or content object), or None if not found
403 """
404 components = str(path).strip("/").split("/")
405 current = self._memory_store # root dict, e.g., {"root": {"file.txt": "data"}}
407 for comp in components:
408 if not isinstance(current, dict):
409 return None # hit a file too early
410 if comp not in current:
411 return None
412 current = current[comp]
414 return current
416 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
417 """
418 Move a file or directory within the memory store. Symlinks are preserved as objects.
420 Raises:
421 FileNotFoundError: If src path or dst parent path does not exist
422 FileExistsError: If destination already exists
423 StorageResolutionError: On structure violations
424 """
425 def _resolve_parent(path: Union[str, Path]):
426 parts = str(path).strip("/").split("/")
427 return parts[:-1], parts[-1]
429 src_parts, src_name = _resolve_parent(src)
430 dst_parts, dst_name = _resolve_parent(dst)
432 # Traverse to src
433 src_dict = self._memory_store
434 for part in src_parts:
435 src_dict = src_dict.get(part)
436 if not isinstance(src_dict, dict):
437 raise FileNotFoundError(f"Source path invalid: {src}")
438 if src_name not in src_dict:
439 raise FileNotFoundError(f"Source not found: {src}")
441 # Traverse to dst parent — do not create
442 dst_dict = self._memory_store
443 for part in dst_parts:
444 dst_dict = dst_dict.get(part)
445 if dst_dict is None:
446 raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
447 if not isinstance(dst_dict, dict):
448 raise StorageResolutionError(f"Destination path is not a directory: {part}")
450 if dst_name in dst_dict:
451 raise FileExistsError(f"Destination already exists: {dst}")
453 try:
454 dst_dict[dst_name] = src_dict.pop(src_name)
455 except Exception as e:
456 raise StorageResolutionError(f"Failed to move {src} to {dst}") from e
458 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
459 """
460 Copy a file, directory, or symlink within the memory store.
462 - Respects structural separation (no fallback)
463 - Will not overwrite destination
464 - Will not create missing parent directories
465 - Symlinks are copied as objects
467 Raises:
468 FileNotFoundError: If src does not exist or dst parent is missing
469 FileExistsError: If dst already exists
470 StorageResolutionError: On invalid structure
471 """
472 def _resolve_parent(path: Union[str, Path]):
473 parts = str(path).strip("/").split("/")
474 return parts[:-1], parts[-1]
476 src_parts, src_name = _resolve_parent(src)
477 dst_parts, dst_name = _resolve_parent(dst)
479 # Traverse to src object
480 src_dict = self._memory_store
481 for part in src_parts:
482 src_dict = src_dict.get(part)
483 if not isinstance(src_dict, dict):
484 raise FileNotFoundError(f"Source path invalid: {src}")
485 if src_name not in src_dict:
486 raise FileNotFoundError(f"Source not found: {src}")
487 obj = src_dict[src_name]
489 # Traverse to dst parent (do not create)
490 dst_dict = self._memory_store
491 for part in dst_parts:
492 dst_dict = dst_dict.get(part)
493 if dst_dict is None:
494 raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
495 if not isinstance(dst_dict, dict):
496 raise StorageResolutionError(f"Destination path is not a directory: {part}")
498 if dst_name in dst_dict:
499 raise FileExistsError(f"Destination already exists: {dst}")
501 # Perform copy (deep to avoid aliasing)
502 try:
503 dst_dict[dst_name] = py_copy.deepcopy(obj)
504 except Exception as e:
505 raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e
507 def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
508 """
509 Return structural metadata about a memory-backed path.
511 Returns:
512 dict with keys:
513 - 'type': 'file', 'directory', 'symlink', or 'missing'
514 - 'path': str(path)
515 - 'target': symlink target if applicable
516 - 'exists': bool
518 Raises:
519 StorageResolutionError: On resolution failure
520 """
521 parts = str(path).strip("/").split("/")
522 current = self._memory_store
524 try:
525 for part in parts[:-1]:
526 current = current.get(part)
527 if current is None:
528 return {
529 "type": "missing",
530 "path": str(path),
531 "exists": False
532 }
533 if not isinstance(current, dict):
534 raise StorageResolutionError(f"Invalid intermediate path segment: {part}")
536 final_key = parts[-1]
537 if final_key not in current:
538 return {
539 "type": "missing",
540 "path": str(path),
541 "exists": False
542 }
544 obj = current[final_key]
546 if isinstance(obj, MemorySymlink):
547 return {
548 "type": "symlink",
549 "path": str(path),
550 "target": obj.target,
551 "exists": self._resolve_path(obj.target) is not None
552 }
554 if isinstance(obj, dict):
555 return {
556 "type": "directory",
557 "path": str(path),
558 "exists": True
559 }
561 return {
562 "type": "file",
563 "path": str(path),
564 "exists": True
565 }
567 except Exception as e:
568 raise StorageResolutionError(f"Failed to stat memory path: {path}") from e
570 def clear_files_only(self) -> None:
571 """
572 Clear all files from the memory store while preserving directory structure.
574 This method removes all file entries (non-None values) but keeps directory
575 entries (None values) intact. This prevents key collisions when reusing
576 the same processing context while maintaining the directory structure
577 needed for subsequent operations.
579 Enhanced with explicit GPU memory cleanup to ensure VRAM is freed when
580 objects are deleted from the memory backend.
582 Note:
583 - Directories (entries with None values) are preserved
584 - Files (entries with non-None values) are deleted
585 - Symlinks are also deleted as they are considered file-like objects
586 - GPU objects are explicitly deleted and VRAM is cleared
587 """
588 try:
589 # Collect keys and objects to delete (preserve directories)
590 files_to_delete = []
591 gpu_objects_found = 0
593 for key, value in list(self._memory_store.items()): 593 ↛ 595line 593 didn't jump to line 595 because the loop on line 593 never started
594 # Delete files (non-None values) and symlinks, but keep directories (None values)
595 if value is not None:
596 files_to_delete.append(key)
598 # Check if this is a GPU object that needs explicit cleanup
599 if self._is_gpu_object(value):
600 gpu_objects_found += 1
601 self._explicit_gpu_delete(value, key)
603 # Delete all file entries from memory store
604 for key in files_to_delete: 604 ↛ 605line 604 didn't jump to line 605 because the loop on line 604 never started
605 del self._memory_store[key]
607 # Force garbage collection to ensure GPU objects are freed
608 import gc
609 collected = gc.collect()
611 # Trigger GPU memory cleanup for all frameworks
612 try:
613 from openhcs.core.memory.gpu_cleanup import cleanup_all_gpu_frameworks
614 cleanup_all_gpu_frameworks()
615 logger.debug(f"🔥 GPU CLEANUP: Triggered comprehensive GPU cleanup after memory backend clear")
616 except Exception as cleanup_error:
617 logger.warning(f"Failed to trigger GPU cleanup after memory backend clear: {cleanup_error}")
619 logger.debug(f"Cleared {len(files_to_delete)} files from memory backend (including {gpu_objects_found} GPU objects), "
620 f"preserved {len(self._memory_store)} directories, collected {collected} objects")
622 except Exception as e:
623 raise StorageResolutionError(f"Failed to clear files from memory store") from e
625 def _is_gpu_object(self, obj: Any) -> bool:
626 """
627 Check if an object is a GPU tensor/array that needs explicit cleanup.
629 Args:
630 obj: Object to check
632 Returns:
633 True if object is a GPU tensor/array
634 """
635 try:
636 # Check for PyTorch tensors on GPU
637 if hasattr(obj, 'device') and hasattr(obj, 'is_cuda'):
638 if obj.is_cuda:
639 return True
641 # Check for CuPy arrays
642 if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
643 return True
645 # Check for other GPU arrays by device attribute
646 if hasattr(obj, 'device') and hasattr(obj.device, 'type'):
647 if 'cuda' in str(obj.device.type).lower() or 'gpu' in str(obj.device.type).lower():
648 return True
650 return False
651 except Exception:
652 # If we can't determine, assume it's not a GPU object
653 return False
655 def _explicit_gpu_delete(self, obj: Any, key: str) -> None:
656 """
657 Explicitly delete a GPU object and clear its memory.
659 Args:
660 obj: GPU object to delete
661 key: Memory backend key for logging
662 """
663 try:
664 # For PyTorch tensors
665 if hasattr(obj, 'device') and hasattr(obj, 'is_cuda') and obj.is_cuda:
666 device_id = obj.device.index if obj.device.index is not None else 0
667 # Move to CPU first to free GPU memory, then delete
668 obj_cpu = obj.cpu()
669 del obj_cpu
670 logger.debug(f"🔥 EXPLICIT GPU DELETE: PyTorch tensor {key} on device {device_id}")
671 return
673 # For CuPy arrays
674 if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
675 # CuPy arrays are automatically freed when deleted
676 logger.debug(f"🔥 EXPLICIT GPU DELETE: CuPy array {key}")
677 return
679 # For other GPU objects
680 if hasattr(obj, 'device'):
681 logger.debug(f"🔥 EXPLICIT GPU DELETE: GPU object {key} on device {obj.device}")
683 except Exception as e:
684 logger.warning(f"Failed to explicitly delete GPU object {key}: {e}")
class MemorySymlink:
    """Marker object representing a symlink inside the memory store."""

    def __init__(self, target: str):
        # target must be a normalized key path naming the link's source.
        self.target = target

    def __repr__(self):
        return f"<MemorySymlink → {self.target}>"