Coverage for openhcs/io/metadata_writer.py: 84.5% (61 statements)
« prev ^ index » next — coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
"""
Atomic metadata writer for OpenHCS with concurrency safety.

Provides specialized atomic operations for OpenHCS metadata files with proper
locking and merging to prevent race conditions in multiprocessing environments.
"""
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Union

from .atomic import atomic_update_json, FileLockError, LOCK_CONFIG

# Module-level logger keyed to this module's import path.
logger = logging.getLogger(__name__)
def get_subdirectory_name(input_dir: Union[str, Path], plate_path: Union[str, Path]) -> str:
    """
    Determine subdirectory name for metadata.

    Returns "." if input_dir equals plate_path (plate root), otherwise returns
    the directory name.

    Args:
        input_dir: Input directory path
        plate_path: Plate root path

    Returns:
        Subdirectory name ("." for plate root, directory name otherwise)
    """
    candidate = Path(input_dir)
    root = Path(plate_path)
    if candidate == root:
        return "."
    return candidate.name
def resolve_subdirectory_path(subdir_name: str, plate_path: Union[str, Path]) -> Path:
    """
    Convert subdirectory name from metadata to actual path.

    Inverse of get_subdirectory_name(). Returns plate_path if subdir_name is ".",
    otherwise returns plate_path / subdir_name.

    Args:
        subdir_name: Subdirectory name from metadata ("." for plate root)
        plate_path: Plate root path

    Returns:
        Resolved path (plate_path for ".", plate_path/subdir_name otherwise)
    """
    root = Path(plate_path)
    if subdir_name == ".":
        return root
    return root / subdir_name
@dataclass(frozen=True)
class MetadataConfig:
    """Configuration constants for metadata operations."""

    # File name of the per-plate metadata JSON.
    METADATA_FILENAME: str = "openhcs_metadata.json"
    # Top-level key that maps subdirectory names to their metadata.
    SUBDIRECTORIES_KEY: str = "subdirectories"
    # Key that holds the backend-availability mapping.
    AVAILABLE_BACKENDS_KEY: str = "available_backends"
    # Lock timeout mirrored from the atomic-file layer's configuration.
    DEFAULT_TIMEOUT: float = LOCK_CONFIG.DEFAULT_TIMEOUT


# Shared module-level instance used as the single source of these constants.
METADATA_CONFIG = MetadataConfig()
class MetadataWriteError(Exception):
    """Raised when metadata write operations fail."""
class AtomicMetadataWriter:
    """Atomic metadata writer with file locking for concurrent safety."""

    def __init__(self, timeout: float = METADATA_CONFIG.DEFAULT_TIMEOUT):
        # Lock-acquisition timeout (seconds) forwarded to atomic_update_json.
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def _execute_update(self, metadata_path: Union[str, Path], update_func: Callable, default_data: Optional[Dict] = None) -> None:
        """Execute atomic update with error handling."""
        try:
            atomic_update_json(metadata_path, update_func, self.timeout, default_data)
        except FileLockError as lock_error:
            # Translate low-level lock failures into this module's error type.
            raise MetadataWriteError(f"Failed to update metadata: {lock_error}") from lock_error

    def _ensure_subdirectories_structure(self, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Ensure metadata has proper subdirectories structure."""
        if not data:
            data = {}
        if METADATA_CONFIG.SUBDIRECTORIES_KEY not in data:
            data[METADATA_CONFIG.SUBDIRECTORIES_KEY] = {}
        return data

    def update_available_backends(self, metadata_path: Union[str, Path], available_backends: Dict[str, bool]) -> None:
        """Atomically update available backends in metadata."""
        def _apply(data):
            # Backends can only be recorded against an existing metadata file.
            if data is None:
                raise MetadataWriteError("Cannot update backends: metadata file does not exist")
            data[METADATA_CONFIG.AVAILABLE_BACKENDS_KEY] = available_backends
            return data

        self._execute_update(metadata_path, _apply)
        self.logger.debug(f"Updated available backends in {metadata_path}")

    def merge_subdirectory_metadata(self, metadata_path: Union[str, Path], subdirectory_updates: Dict[str, Dict[str, Any]]) -> None:
        """Atomically merge multiple subdirectory metadata updates.

        Performs deep merge for nested dicts (like available_backends), shallow update for other fields.

        Example:
            Existing: {"TimePoint_1": {"available_backends": {"disk": True}, "main": True}}
            Updates: {"TimePoint_1": {"available_backends": {"zarr": True}, "main": False}}
            Result: {"TimePoint_1": {"available_backends": {"disk": True, "zarr": True}, "main": False}}
        """
        def _apply(data):
            data = self._ensure_subdirectories_structure(data)
            subdirs = data[METADATA_CONFIG.SUBDIRECTORIES_KEY]
            backends_key = METADATA_CONFIG.AVAILABLE_BACKENDS_KEY

            for name, fields in subdirectory_updates.items():
                if name not in subdirs:
                    # First time this subdirectory appears: adopt the update as-is.
                    subdirs[name] = fields
                    continue
                target = subdirs[name]
                for key, value in fields.items():
                    if key == backends_key and isinstance(value, dict):
                        # Deep merge: union of old and new backend flags, new wins.
                        merged = dict(target.get(key, {}))
                        merged.update(value)
                        target[key] = merged
                    else:
                        # Everything else is replaced wholesale.
                        target[key] = value

            return data

        self._execute_update(metadata_path, _apply, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug(f"Merged {len(subdirectory_updates)} subdirectories in {metadata_path}")
def get_metadata_path(plate_root: Union[str, Path]) -> Path:
    """
    Get the standard metadata file path for a plate root directory.

    Args:
        plate_root: Path to the plate root directory

    Returns:
        Path to the metadata file
    """
    root = Path(plate_root)
    return root.joinpath(METADATA_CONFIG.METADATA_FILENAME)