Coverage for openhcs/io/memory.py: 31.7% — 294 statements (coverage.py v7.11.0, created at 2025-11-04 02:09 +0000)
1# openhcs/io/memory.py (formerly openhcs/io/storage/backends/memory.py)
2"""
3Memory storage backend module for OpenHCS.
5This module provides an in-memory implementation of the MicroscopyStorageBackend interface.
6It stores data in memory and supports overlay operations for materializing data to disk when needed.
8This implementation enforces Clause 106-A (Declared Memory Types) and
9Clause 251 (Declarative Memory Conversion Interface) by requiring explicit
10memory type declarations and providing declarative conversion methods.
11"""
import copy as py_copy  # needed by copy() via py_copy.deepcopy — was missing, causing NameError
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from openhcs.constants.constants import Backend
from openhcs.io.base import StorageBackend

# NOTE(review): StorageResolutionError is raised throughout this module but is
# never imported here — confirm its home module and add the import.
20logger = logging.getLogger(__name__)
class MemoryStorageBackend(StorageBackend):
    """Memory storage backend with automatic registration."""

    _backend_type = Backend.MEMORY.value

    def __init__(self, shared_dict: Optional[Dict[str, Any]] = None):
        """
        Initialize the in-memory store.

        Args:
            shared_dict: Optional externally supplied mapping to use as the
                backing store — e.g. a multiprocessing.Manager().dict() for
                sharing memory between processes. When None, a fresh private
                dictionary is created.
        """
        # Flat mapping of normalized path -> value; a None value marks a
        # directory entry (see ensure_directory / is_dir).
        self._memory_store = shared_dict if shared_dict is not None else {}
        # Directory-like namespaces declared via ensure_directory().
        self._prefixes = set()
39 def _normalize(self, path: Union[str, Path],bypass_normalization=False) -> str:
40 """
41 Normalize paths for memory backend storage.
43 Memory backend uses relative paths internally to avoid conflicts
44 between absolute paths from different systems. This method converts
45 absolute paths to relative paths by removing the root component.
47 Args:
48 path: Path to normalize (absolute or relative)
50 Returns:
51 Normalized relative path string
52 """
53 path_obj = Path(path)
55 if bypass_normalization: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true
56 return path_obj.as_posix()
58 # Store paths as-is - no forced relative conversion
59 # This preserves absolute paths which are needed for cross-backend operations
60 return path_obj.as_posix()
62 def load(self, file_path: Union[str, Path], **kwargs) -> Any:
63 key = self._normalize(file_path)
65 if key not in self._memory_store: 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 raise FileNotFoundError(f"Memory path not found: {file_path}")
68 value = self._memory_store[key]
69 if value is None: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 raise IsADirectoryError(f"Path is a directory: {file_path}")
72 return value
74 def save(self, data: Any, output_path: Union[str, Path], **kwargs) -> None:
75 key = self._normalize(output_path)
77 # Check if parent directory exists (simple flat structure)
78 parent_path = self._normalize(Path(key).parent)
79 if parent_path != '.' and parent_path not in self._memory_store: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 raise FileNotFoundError(f"Parent path does not exist: {output_path}")
82 # Check if file already exists
83 if key in self._memory_store: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 raise FileExistsError(f"Path already exists: {output_path}")
85 self._memory_store[key] = data
87 # Save the file
89 def load_batch(self, file_paths: List[Union[str, Path]]) -> List[Any]:
90 """
91 Load multiple files sequentially using existing load method.
93 Args:
94 file_paths: List of file paths to load
95 **kwargs: Additional arguments passed to load method
97 Returns:
98 List of loaded data objects in the same order as file_paths
99 """
100 results = []
101 for file_path in file_paths:
102 result = self.load(file_path)
103 results.append(result)
104 return results
106 def save_batch(self, data_list: List[Any], output_paths: List[Union[str, Path]]) -> None:
107 """
108 Save multiple files sequentially using existing save method.
110 Args:
111 data_list: List of data objects to save
112 output_paths: List of destination paths (must match length of data_list)
113 **kwargs: Additional arguments passed to save method
115 Raises:
116 ValueError: If data_list and output_paths have different lengths
117 """
118 if len(data_list) != len(output_paths): 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true
119 raise ValueError(f"data_list length ({len(data_list)}) must match output_paths length ({len(output_paths)})")
121 for data, output_path in zip(data_list, output_paths):
122 self.save(data, output_path)
124 def list_files(
125 self,
126 directory: Union[str, Path],
127 pattern: str = "*",
128 extensions: Optional[Set[str]] = None,
129 recursive: bool = False
130 ) -> List[Path]:
131 from fnmatch import fnmatch
133 dir_key = self._normalize(directory)
135 # Check if directory exists and is a directory
136 if dir_key not in self._memory_store: 136 ↛ 137line 136 didn't jump to line 137 because the condition on line 136 was never true
137 raise FileNotFoundError(f"Directory not found: {directory}")
138 if self._memory_store[dir_key] is not None: 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true
139 raise NotADirectoryError(f"Path is not a directory: {directory}")
141 result = []
142 dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key
144 for path, value in list(self._memory_store.items()):
145 # Skip if not under this directory
146 if not path.startswith(dir_prefix):
147 continue
149 # Get relative path from directory
150 rel_path = path[len(dir_prefix):]
152 # Skip if recursive=False and path has subdirectories
153 if not recursive and "/" in rel_path: 153 ↛ 154line 153 didn't jump to line 154 because the condition on line 153 was never true
154 continue
156 # Only include files (value is not None)
157 if value is not None: 157 ↛ 144line 157 didn't jump to line 144 because the condition on line 157 was always true
158 filename = Path(rel_path).name
159 # If pattern is None, match all files
160 if pattern is None or fnmatch(filename, pattern): 160 ↛ 144line 160 didn't jump to line 144 because the condition on line 160 was always true
161 if not extensions or Path(filename).suffix in extensions: 161 ↛ 144line 161 didn't jump to line 144 because the condition on line 161 was always true
162 # Calculate depth for breadth-first sorting
163 depth = rel_path.count('/')
164 result.append((Path(path), depth))
166 # Sort by depth first (breadth-first), then by path for consistency
167 result.sort(key=lambda x: (x[1], str(x[0])))
169 # Return just the paths
170 return [path for path, _ in result]
172 def list_dir(self, path: Union[str, Path]) -> List[str]:
173 dir_key = self._normalize(path)
175 # Check if directory exists and is a directory
176 if dir_key not in self._memory_store:
177 raise FileNotFoundError(f"Directory not found: {path}")
178 if self._memory_store[dir_key] is not None:
179 raise NotADirectoryError(f"Path is not a directory: {path}")
181 # Find all direct children of this directory
182 result = set()
183 dir_prefix = dir_key + "/" if not dir_key.endswith("/") else dir_key
185 for stored_path in list(self._memory_store.keys()):
186 if stored_path.startswith(dir_prefix):
187 rel_path = stored_path[len(dir_prefix):]
188 # Only direct children (no subdirectories)
189 if "/" not in rel_path:
190 result.add(rel_path)
191 else:
192 # Add the first directory component
193 first_dir = rel_path.split("/")[0]
194 result.add(first_dir)
196 return list(result)
199 def delete(self, path: Union[str, Path]) -> None:
200 """
201 Delete a file or empty directory from the in-memory store.
203 This method does not support recursive deletion.
205 Args:
206 path: Virtual path to delete
208 Raises:
209 FileNotFoundError: If the path does not exist
210 IsADirectoryError: If path is a non-empty directory
211 StorageResolutionError: For unexpected internal failures
212 """
213 key = self._normalize(path)
215 if key not in self._memory_store: 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true
216 raise FileNotFoundError(f"Path not found: {path}")
218 # If it's a directory, check if it's empty
219 if self._memory_store[key] is None: 219 ↛ 221line 219 didn't jump to line 221 because the condition on line 219 was never true
220 # Check if directory has any children
221 dir_prefix = key + "/" if not key.endswith("/") else key
222 for stored_path in list(self._memory_store.keys()):
223 if stored_path.startswith(dir_prefix):
224 raise IsADirectoryError(f"Cannot delete non-empty directory: {path}")
226 try:
227 del self._memory_store[key]
228 except Exception as e:
229 raise StorageResolutionError(f"Failed to delete path from memory store: {path}") from e
231 def delete_all(self, path: Union[str, Path]) -> None:
232 """
233 Recursively delete a file, empty directory, or a nested directory tree
234 from the in-memory store.
236 This method is the only allowed way to recursively delete in memory backend.
238 Args:
239 path: Virtual path to delete
241 Raises:
242 FileNotFoundError: If the path does not exist
243 StorageResolutionError: If internal deletion fails
244 """
245 key = self._normalize(path)
247 if key not in self._memory_store:
248 raise FileNotFoundError(f"Path not found: {path}")
250 try:
251 # Delete the path itself
252 del self._memory_store[key]
254 # If it was a directory, delete all children
255 dir_prefix = key + "/" if not key.endswith("/") else key
256 keys_to_delete = [k for k in list(self._memory_store.keys()) if k.startswith(dir_prefix)]
257 for k in keys_to_delete:
258 del self._memory_store[k]
260 except Exception as e:
261 raise StorageResolutionError(f"Failed to recursively delete path: {path}") from e
263 def ensure_directory(self, directory: Union[str, Path]) -> Path:
264 key = self._normalize(directory)
265 self._prefixes.add(key if key.endswith("/") else key + "/")
267 # Create the entire directory hierarchy
268 path_obj = Path(key)
269 parts = path_obj.parts
271 # Create each parent directory in the hierarchy
272 for i in range(1, len(parts) + 1):
273 partial_path = self._normalize(Path(*parts[:i]))
274 if partial_path not in self._memory_store:
275 self._memory_store[partial_path] = None # Directory = None value
277 return Path(key)
280 def create_symlink(self, source: Union[str, Path], link_name: Union[str, Path], overwrite: bool = False):
281 src_parts = str(source).strip("/").split("/")
282 dst_parts = str(link_name).strip("/").split("/")
284 # Traverse to source
285 src_dict = self._memory_store
286 for part in src_parts[:-1]:
287 src_dict = src_dict.get(part)
288 if not isinstance(src_dict, dict):
289 raise FileNotFoundError(f"Invalid symlink source path: {source}")
290 src_key = src_parts[-1]
291 if src_key not in src_dict:
292 raise FileNotFoundError(f"Symlink source not found: {source}")
294 # Traverse to destination parent
295 dst_dict = self._memory_store
296 for part in dst_parts[:-1]:
297 dst_dict = dst_dict.get(part)
298 if dst_dict is None or not isinstance(dst_dict, dict):
299 raise FileNotFoundError(f"Destination parent path does not exist: {link_name}")
301 dst_key = dst_parts[-1]
302 if dst_key in dst_dict:
303 if not overwrite:
304 raise FileExistsError(f"Symlink destination already exists: {link_name}")
305 # Remove existing entry if overwrite=True
306 del dst_dict[dst_key]
308 dst_dict[dst_key] = MemorySymlink(target=str(source))
310 def is_symlink(self, path: Union[str, Path]) -> bool:
311 parts = str(path).strip("/").split("/")
312 current = self._memory_store
314 for part in parts[:-1]:
315 current = current.get(part)
316 if not isinstance(current, dict):
317 return False
319 key = parts[-1]
320 return isinstance(current.get(key), MemorySymlink)
322 def is_file(self, path: Union[str, Path]) -> bool:
323 """
324 Check if a memory path points to a file.
326 Raises:
327 FileNotFoundError: If path does not exist
328 IsADirectoryError: If path is a directory
329 """
330 key = self._normalize(path)
332 if key not in self._memory_store:
333 raise FileNotFoundError(f"Memory path does not exist: {path}")
335 value = self._memory_store[key]
336 if value is None:
337 raise IsADirectoryError(f"Path is a directory: {path}")
339 return True
341 def is_dir(self, path: Union[str, Path]) -> bool:
342 """
343 Check if a memory path points to a directory.
345 Args:
346 path: Path to check
348 Returns:
349 bool: True if path is a directory
351 Raises:
352 FileNotFoundError: If path does not exist
353 NotADirectoryError: If path is not a directory
354 """
355 key = self._normalize(path)
357 if key not in self._memory_store:
358 raise FileNotFoundError(f"Memory path does not exist: {path}")
360 value = self._memory_store[key]
361 if value is not None: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true
362 raise NotADirectoryError(f"Path is not a directory: {path}")
364 return True
366 def _resolve_path(self, path: Union[str, Path]) -> Optional[Any]:
367 """
368 Resolves a memory-style virtual path into an in-memory object (file or directory).
370 This performs a pure dictionary traversal. It never coerces types or guesses structure.
371 If any intermediate path component is missing or not a dict, resolution fails.
373 Args:
374 path: Memory-style path, e.g., 'root/dir1/file.txt'
376 Returns:
377 The object at that path (could be dict or content object), or None if not found
378 """
379 components = str(path).strip("/").split("/")
380 current = self._memory_store # root dict, e.g., {"root": {"file.txt": "data"}}
382 for comp in components:
383 if not isinstance(current, dict):
384 return None # hit a file too early
385 if comp not in current:
386 return None
387 current = current[comp]
389 return current
391 def move(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
392 """
393 Move a file or directory within the memory store. Symlinks are preserved as objects.
395 Raises:
396 FileNotFoundError: If src path or dst parent path does not exist
397 FileExistsError: If destination already exists
398 StorageResolutionError: On structure violations
399 """
400 def _resolve_parent(path: Union[str, Path]):
401 parts = str(path).strip("/").split("/")
402 return parts[:-1], parts[-1]
404 src_parts, src_name = _resolve_parent(src)
405 dst_parts, dst_name = _resolve_parent(dst)
407 # Traverse to src
408 src_dict = self._memory_store
409 for part in src_parts:
410 src_dict = src_dict.get(part)
411 if not isinstance(src_dict, dict):
412 raise FileNotFoundError(f"Source path invalid: {src}")
413 if src_name not in src_dict:
414 raise FileNotFoundError(f"Source not found: {src}")
416 # Traverse to dst parent — do not create
417 dst_dict = self._memory_store
418 for part in dst_parts:
419 dst_dict = dst_dict.get(part)
420 if dst_dict is None:
421 raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
422 if not isinstance(dst_dict, dict):
423 raise StorageResolutionError(f"Destination path is not a directory: {part}")
425 if dst_name in dst_dict:
426 raise FileExistsError(f"Destination already exists: {dst}")
428 try:
429 dst_dict[dst_name] = src_dict.pop(src_name)
430 except Exception as e:
431 raise StorageResolutionError(f"Failed to move {src} to {dst}") from e
433 def copy(self, src: Union[str, Path], dst: Union[str, Path]) -> None:
434 """
435 Copy a file, directory, or symlink within the memory store.
437 - Respects structural separation (no fallback)
438 - Will not overwrite destination
439 - Will not create missing parent directories
440 - Symlinks are copied as objects
442 Raises:
443 FileNotFoundError: If src does not exist or dst parent is missing
444 FileExistsError: If dst already exists
445 StorageResolutionError: On invalid structure
446 """
447 def _resolve_parent(path: Union[str, Path]):
448 parts = str(path).strip("/").split("/")
449 return parts[:-1], parts[-1]
451 src_parts, src_name = _resolve_parent(src)
452 dst_parts, dst_name = _resolve_parent(dst)
454 # Traverse to src object
455 src_dict = self._memory_store
456 for part in src_parts:
457 src_dict = src_dict.get(part)
458 if not isinstance(src_dict, dict):
459 raise FileNotFoundError(f"Source path invalid: {src}")
460 if src_name not in src_dict:
461 raise FileNotFoundError(f"Source not found: {src}")
462 obj = src_dict[src_name]
464 # Traverse to dst parent (do not create)
465 dst_dict = self._memory_store
466 for part in dst_parts:
467 dst_dict = dst_dict.get(part)
468 if dst_dict is None:
469 raise FileNotFoundError(f"Destination parent path does not exist: {dst}")
470 if not isinstance(dst_dict, dict):
471 raise StorageResolutionError(f"Destination path is not a directory: {part}")
473 if dst_name in dst_dict:
474 raise FileExistsError(f"Destination already exists: {dst}")
476 # Perform copy (deep to avoid aliasing)
477 try:
478 dst_dict[dst_name] = py_copy.deepcopy(obj)
479 except Exception as e:
480 raise StorageResolutionError(f"Failed to copy {src} to {dst}") from e
482 def stat(self, path: Union[str, Path]) -> Dict[str, Any]:
483 """
484 Return structural metadata about a memory-backed path.
486 Returns:
487 dict with keys:
488 - 'type': 'file', 'directory', 'symlink', or 'missing'
489 - 'path': str(path)
490 - 'target': symlink target if applicable
491 - 'exists': bool
493 Raises:
494 StorageResolutionError: On resolution failure
495 """
496 parts = str(path).strip("/").split("/")
497 current = self._memory_store
499 try:
500 for part in parts[:-1]:
501 current = current.get(part)
502 if current is None:
503 return {
504 "type": "missing",
505 "path": str(path),
506 "exists": False
507 }
508 if not isinstance(current, dict):
509 raise StorageResolutionError(f"Invalid intermediate path segment: {part}")
511 final_key = parts[-1]
512 if final_key not in current:
513 return {
514 "type": "missing",
515 "path": str(path),
516 "exists": False
517 }
519 obj = current[final_key]
521 if isinstance(obj, MemorySymlink):
522 return {
523 "type": "symlink",
524 "path": str(path),
525 "target": obj.target,
526 "exists": self._resolve_path(obj.target) is not None
527 }
529 if isinstance(obj, dict):
530 return {
531 "type": "directory",
532 "path": str(path),
533 "exists": True
534 }
536 return {
537 "type": "file",
538 "path": str(path),
539 "exists": True
540 }
542 except Exception as e:
543 raise StorageResolutionError(f"Failed to stat memory path: {path}") from e
545 def clear_files_only(self) -> None:
546 """
547 Clear all files from the memory store while preserving directory structure.
549 This method removes all file entries (non-None values) but keeps directory
550 entries (None values) intact. This prevents key collisions when reusing
551 the same processing context while maintaining the directory structure
552 needed for subsequent operations.
554 Note:
555 - Directories (entries with None values) are preserved
556 - Files (entries with non-None values) are deleted
557 - Symlinks are also deleted as they are considered file-like objects
558 - GPU objects are explicitly deleted before removal
559 - Caller is responsible for calling gc.collect() and GPU cleanup after this method
560 """
561 try:
562 # Collect keys and objects to delete (preserve directories)
563 files_to_delete = []
564 gpu_objects_found = 0
566 for key, value in list(self._memory_store.items()): 566 ↛ 568line 566 didn't jump to line 568 because the loop on line 566 never started
567 # Delete files (non-None values) and symlinks, but keep directories (None values)
568 if value is not None:
569 files_to_delete.append(key)
571 # Check if this is a GPU object that needs explicit cleanup
572 if self._is_gpu_object(value):
573 gpu_objects_found += 1
574 self._explicit_gpu_delete(value, key)
576 # Delete all file entries from memory store
577 for key in files_to_delete: 577 ↛ 578line 577 didn't jump to line 578 because the loop on line 577 never started
578 del self._memory_store[key]
580 logger.debug(f"Cleared {len(files_to_delete)} files from memory backend (including {gpu_objects_found} GPU objects), "
581 f"preserved {len(self._memory_store)} directories")
583 except Exception as e:
584 raise StorageResolutionError("Failed to clear files from memory store") from e
586 def _is_gpu_object(self, obj: Any) -> bool:
587 """
588 Check if an object is a GPU tensor/array that needs explicit cleanup.
590 Args:
591 obj: Object to check
593 Returns:
594 True if object is a GPU tensor/array
595 """
596 try:
597 # Check for PyTorch tensors on GPU
598 if hasattr(obj, 'device') and hasattr(obj, 'is_cuda'):
599 if obj.is_cuda:
600 return True
602 # Check for CuPy arrays
603 if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
604 return True
606 # Check for other GPU arrays by device attribute
607 if hasattr(obj, 'device') and hasattr(obj.device, 'type'):
608 if 'cuda' in str(obj.device.type).lower() or 'gpu' in str(obj.device.type).lower():
609 return True
611 return False
612 except Exception:
613 # If we can't determine, assume it's not a GPU object
614 return False
616 def _explicit_gpu_delete(self, obj: Any, key: str) -> None:
617 """
618 Explicitly delete a GPU object and clear its memory.
620 Args:
621 obj: GPU object to delete
622 key: Memory backend key for logging
623 """
624 try:
625 # For PyTorch tensors
626 if hasattr(obj, 'device') and hasattr(obj, 'is_cuda') and obj.is_cuda:
627 device_id = obj.device.index if obj.device.index is not None else 0
628 # Move to CPU first to free GPU memory, then delete
629 obj_cpu = obj.cpu()
630 del obj_cpu
631 logger.debug(f"🔥 EXPLICIT GPU DELETE: PyTorch tensor {key} on device {device_id}")
632 return
634 # For CuPy arrays
635 if hasattr(obj, '__class__') and 'cupy' in str(type(obj)):
636 # CuPy arrays are automatically freed when deleted
637 logger.debug(f"🔥 EXPLICIT GPU DELETE: CuPy array {key}")
638 return
640 # For other GPU objects
641 if hasattr(obj, 'device'):
642 logger.debug(f"🔥 EXPLICIT GPU DELETE: GPU object {key} on device {obj.device}")
644 except Exception as e:
645 logger.warning(f"Failed to explicitly delete GPU object {key}: {e}")
class MemorySymlink:
    """Marker object representing a symbolic link inside the memory store."""

    def __init__(self, target: str):
        # target must be a normalized key path
        self.target = target

    def __repr__(self):
        return f"<MemorySymlink → {self.target}>"