Coverage for openhcs/io/metadata_writer.py: 62.7%
71 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
"""
Atomic metadata writer for OpenHCS with concurrency safety.

Provides specialized atomic operations for OpenHCS metadata files with proper
locking and merging to prevent race conditions in multiprocessing environments.
"""

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Union

from .atomic import atomic_update_json, FileLockError, LOCK_CONFIG

logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class MetadataConfig:
    """Configuration constants for metadata operations.

    The dataclass is frozen, so attributes of an instance cannot be
    reassigned after construction.
    """

    # File name of the metadata JSON file inside a plate root directory.
    METADATA_FILENAME: str = "openhcs_metadata.json"
    # Top-level JSON key holding the per-subdirectory metadata mapping.
    SUBDIRECTORIES_KEY: str = "subdirectories"
    # Top-level JSON key holding the available-backends mapping.
    AVAILABLE_BACKENDS_KEY: str = "available_backends"
    # Default lock timeout, taken from the lock configuration in ``.atomic``.
    DEFAULT_TIMEOUT: float = LOCK_CONFIG.DEFAULT_TIMEOUT


# Module-wide instance used by the rest of this file to read the constants.
METADATA_CONFIG = MetadataConfig()
@dataclass(frozen=True)
class MetadataUpdateRequest:
    """Parameter object for metadata update operations.

    Bundles the arguments consumed by
    ``AtomicMetadataWriter.create_or_update_metadata``. The instance is
    frozen: its fields cannot be reassigned (the dicts they refer to are
    still ordinary mutable dicts).
    """

    # Location of the metadata JSON file to create or update.
    metadata_path: Union[str, Path]
    # Key under the "subdirectories" mapping whose entry is written.
    sub_dir: str
    # Value stored for ``sub_dir``; replaces any existing entry.
    metadata: Dict[str, Any]
    # When not None, also written under the "available_backends" key.
    available_backends: Optional[Dict[str, bool]] = None
class MetadataWriteError(Exception):
    """Raised when metadata write operations fail."""
class AtomicMetadataWriter:
    """Atomic metadata writer with file locking for concurrent safety.

    Each public method builds an ``update_func(data) -> data`` callback and
    passes it to ``atomic_update_json`` (imported from ``.atomic``), which is
    relied on to perform the locked read-modify-write of the JSON file.
    """

    def __init__(self, timeout: float = METADATA_CONFIG.DEFAULT_TIMEOUT):
        """Initialise the writer.

        Args:
            timeout: Value forwarded to ``atomic_update_json`` on every update.
        """
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def _execute_update(
        self,
        metadata_path: Union[str, Path],
        update_func: Callable,
        default_data: Optional[Dict] = None,
    ) -> None:
        """Execute atomic update with error handling.

        Args:
            metadata_path: Metadata JSON file to update.
            update_func: Callback that receives the current data and returns
                the data to store.
            default_data: Forwarded unchanged to ``atomic_update_json``.

        Raises:
            MetadataWriteError: If ``atomic_update_json`` raises
                ``FileLockError``. Any other exception (including one raised
                by ``update_func``) is not translated here.
        """
        try:
            atomic_update_json(metadata_path, update_func, self.timeout, default_data)
        except FileLockError as e:
            raise MetadataWriteError(f"Failed to update metadata: {e}") from e

    def _ensure_subdirectories_structure(self, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Ensure metadata has proper subdirectories structure.

        Args:
            data: Current metadata, or None/empty when there is none yet.

        Returns:
            A dict guaranteed to hold a dict under the subdirectories key.

        Raises:
            MetadataWriteError: If the subdirectories entry exists but is
                neither null nor a mapping.
        """
        data = data or {}
        key = METADATA_CONFIG.SUBDIRECTORIES_KEY
        subdirectories = data.get(key)
        if subdirectories is None:
            # Covers both a missing key and an explicit JSON null; a plain
            # setdefault() would keep the null and break the item assignment
            # performed by the callers.
            data[key] = {}
        elif not isinstance(subdirectories, dict):
            raise MetadataWriteError(
                f"Cannot update subdirectories: '{key}' must be a mapping, "
                f"found {type(subdirectories).__name__}"
            )
        return data

    def _create_subdirectory_update(self, sub_dir: str, metadata: Dict[str, Any]) -> Callable:
        """Create update function for subdirectory operations.

        Args:
            sub_dir: Key under the subdirectories mapping.
            metadata: Value stored for ``sub_dir`` (replaces any existing entry).

        Returns:
            A callback suitable for ``_execute_update``.
        """
        def update_func(data):
            data = self._ensure_subdirectories_structure(data)
            data[METADATA_CONFIG.SUBDIRECTORIES_KEY][sub_dir] = metadata
            return data

        return update_func

    def update_subdirectory_metadata(
        self,
        metadata_path: Union[str, Path],
        sub_dir: str,
        metadata: Dict[str, Any],
    ) -> None:
        """Atomically update metadata for a specific subdirectory.

        Args:
            metadata_path: Metadata JSON file to update.
            sub_dir: Subdirectory key whose entry is written.
            metadata: New entry for ``sub_dir``.

        Raises:
            MetadataWriteError: See ``_execute_update`` and
                ``_ensure_subdirectories_structure``.
        """
        update_func = self._create_subdirectory_update(sub_dir, metadata)
        self._execute_update(metadata_path, update_func, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug("Updated subdirectory '%s' in %s", sub_dir, metadata_path)

    def update_available_backends(
        self,
        metadata_path: Union[str, Path],
        available_backends: Dict[str, bool],
    ) -> None:
        """Atomically update available backends in metadata.

        No default data is supplied for this update; the callback refuses to
        proceed when it is handed None, which this code treats as "the
        metadata file does not exist".

        Args:
            metadata_path: Metadata JSON file to update.
            available_backends: Mapping stored under the available-backends key.

        Raises:
            MetadataWriteError: If the callback receives None, or as described
                in ``_execute_update``.
        """
        def update_func(data):
            if data is None:
                raise MetadataWriteError("Cannot update backends: metadata file does not exist")
            data[METADATA_CONFIG.AVAILABLE_BACKENDS_KEY] = available_backends
            return data

        self._execute_update(metadata_path, update_func)
        self.logger.debug("Updated available backends in %s", metadata_path)

    def merge_subdirectory_metadata(
        self,
        metadata_path: Union[str, Path],
        subdirectory_updates: Dict[str, Dict[str, Any]],
    ) -> None:
        """Atomically merge multiple subdirectory metadata updates.

        The merge is shallow: each key in ``subdirectory_updates`` replaces
        the whole existing entry for that subdirectory.

        Args:
            metadata_path: Metadata JSON file to update.
            subdirectory_updates: Mapping of subdirectory key to its new entry.

        Raises:
            MetadataWriteError: See ``_execute_update`` and
                ``_ensure_subdirectories_structure``.
        """
        def update_func(data):
            data = self._ensure_subdirectories_structure(data)
            data[METADATA_CONFIG.SUBDIRECTORIES_KEY].update(subdirectory_updates)
            return data

        self._execute_update(metadata_path, update_func, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug("Merged %s subdirectories in %s", len(subdirectory_updates), metadata_path)

    def create_or_update_metadata(self, request: MetadataUpdateRequest) -> None:
        """Atomically create or update metadata file with subdirectory and backend info.

        Args:
            request: Target path, subdirectory entry and optional backends
                mapping. Backends are written only when
                ``request.available_backends`` is not None.

        Raises:
            MetadataWriteError: See ``_execute_update`` and
                ``_ensure_subdirectories_structure``.
        """
        apply_subdirectory = self._create_subdirectory_update(request.sub_dir, request.metadata)
        available_backends = request.available_backends

        def update_func(data):
            # Subdirectory entry first, then (optionally) the backends, all
            # within the same update callback.
            data = apply_subdirectory(data)
            if available_backends is not None:
                data[METADATA_CONFIG.AVAILABLE_BACKENDS_KEY] = available_backends
            return data

        self._execute_update(request.metadata_path, update_func, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug("Created/updated metadata for '%s' in %s", request.sub_dir, request.metadata_path)
def get_metadata_path(plate_root: Union[str, Path]) -> Path:
    """
    Get the standard metadata file path for a plate root directory.

    The path is only computed; nothing is read from or written to disk.

    Args:
        plate_root: Path to the plate root directory

    Returns:
        Path to the metadata file
    """
    return Path(plate_root) / METADATA_CONFIG.METADATA_FILENAME