Coverage for openhcs/io/metadata_writer.py: 84.5%

61 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Atomic metadata writer for OpenHCS with concurrency safety. 

3 

4Provides specialized atomic operations for OpenHCS metadata files with proper 

5locking and merging to prevent race conditions in multiprocessing environments. 

6""" 

7 

8import logging 

9from dataclasses import dataclass 

10from pathlib import Path 

11from typing import Any, Callable, Dict, Optional, Union 

12 

13from .atomic import atomic_update_json, FileLockError, LOCK_CONFIG 

14 

15logger = logging.getLogger(__name__) 

16 

17 

def get_subdirectory_name(input_dir: Union[str, Path], plate_path: Union[str, Path]) -> str:
    """
    Determine the subdirectory name used as a metadata key.

    The plate root itself is encoded as "."; any other directory is
    identified by its final path component.

    Args:
        input_dir: Input directory path.
        plate_path: Plate root path.

    Returns:
        "." when input_dir is the plate root, otherwise the directory name.
    """
    resolved_input = Path(input_dir)
    if resolved_input == Path(plate_path):
        return "."
    return resolved_input.name

35 

36 

def resolve_subdirectory_path(subdir_name: str, plate_path: Union[str, Path]) -> Path:
    """
    Convert a metadata subdirectory name back into a filesystem path.

    Inverse of get_subdirectory_name(): "." maps to the plate root, any
    other name maps to a child of the plate root.

    Args:
        subdir_name: Subdirectory name from metadata ("." for plate root).
        plate_path: Plate root path.

    Returns:
        The plate root for ".", otherwise plate_path / subdir_name.
    """
    root = Path(plate_path)
    return root / subdir_name if subdir_name != "." else root

53 

54 

@dataclass(frozen=True)
class MetadataConfig:
    """Configuration constants for metadata operations."""
    # Filename of the per-plate metadata file, placed at the plate root.
    METADATA_FILENAME: str = "openhcs_metadata.json"
    # Top-level JSON key mapping subdirectory names to their metadata dicts.
    SUBDIRECTORIES_KEY: str = "subdirectories"
    # Key (top-level and per-subdirectory) holding the backend-availability map.
    AVAILABLE_BACKENDS_KEY: str = "available_backends"
    # Lock-acquisition timeout, inherited from the shared atomic-file lock config.
    DEFAULT_TIMEOUT: float = LOCK_CONFIG.DEFAULT_TIMEOUT


# Module-wide singleton; all code in this module reads constants from here.
METADATA_CONFIG = MetadataConfig()

65 

66 

class MetadataWriteError(Exception):
    """Raised when an atomic metadata write or merge operation fails."""

70 

71 

class AtomicMetadataWriter:
    """Atomic metadata writer with file locking for concurrent safety.

    All public methods delegate to atomic_update_json(), which performs the
    read-modify-write under a file lock so concurrent processes cannot
    interleave partial updates.
    """

    def __init__(self, timeout: float = METADATA_CONFIG.DEFAULT_TIMEOUT):
        # Maximum seconds to wait for the metadata file lock.
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def _execute_update(self, metadata_path: Union[str, Path], update_func: Callable, default_data: Optional[Dict] = None) -> None:
        """Run update_func atomically on the JSON file, translating lock errors.

        Args:
            metadata_path: Path to the metadata JSON file.
            update_func: Callable receiving the parsed data (or None/default
                when the file does not exist) and returning the new data.
            default_data: Data passed to update_func when the file is absent.

        Raises:
            MetadataWriteError: If the lock cannot be acquired or the update fails.
        """
        try:
            atomic_update_json(metadata_path, update_func, self.timeout, default_data)
        except FileLockError as e:
            raise MetadataWriteError(f"Failed to update metadata: {e}") from e

    def _ensure_subdirectories_structure(self, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return data (or a new dict) guaranteed to contain the subdirectories key."""
        data = data or {}
        data.setdefault(METADATA_CONFIG.SUBDIRECTORIES_KEY, {})
        return data

    def update_available_backends(self, metadata_path: Union[str, Path], available_backends: Dict[str, bool]) -> None:
        """Atomically replace the top-level available-backends map in metadata.

        Args:
            metadata_path: Path to the metadata JSON file (must already exist).
            available_backends: Mapping of backend name -> availability flag.

        Raises:
            MetadataWriteError: If the file does not exist or the update fails.
        """
        def update_func(data):
            if data is None:
                raise MetadataWriteError("Cannot update backends: metadata file does not exist")
            # Copy so later caller-side mutation cannot alter what gets persisted.
            data[METADATA_CONFIG.AVAILABLE_BACKENDS_KEY] = dict(available_backends)
            return data

        self._execute_update(metadata_path, update_func)
        self.logger.debug(f"Updated available backends in {metadata_path}")

    def merge_subdirectory_metadata(self, metadata_path: Union[str, Path], subdirectory_updates: Dict[str, Dict[str, Any]]) -> None:
        """Atomically merge multiple subdirectory metadata updates.

        Performs deep merge for nested dicts (like available_backends), shallow update for other fields.

        Example:
            Existing: {"TimePoint_1": {"available_backends": {"disk": True}, "main": True}}
            Updates:  {"TimePoint_1": {"available_backends": {"zarr": True}, "main": False}}
            Result:   {"TimePoint_1": {"available_backends": {"disk": True, "zarr": True}, "main": False}}

        Args:
            metadata_path: Path to the metadata JSON file (created with an
                empty subdirectories structure if absent).
            subdirectory_updates: Mapping of subdirectory name -> field updates.

        Raises:
            MetadataWriteError: If the locked update fails.
        """
        def update_func(data):
            data = self._ensure_subdirectories_structure(data)
            subdirs = data[METADATA_CONFIG.SUBDIRECTORIES_KEY]

            for subdir_name, updates in subdirectory_updates.items():
                if subdir_name in subdirs:
                    # Merge into existing subdirectory entry.
                    existing = subdirs[subdir_name]
                    for key, value in updates.items():
                        if key == METADATA_CONFIG.AVAILABLE_BACKENDS_KEY and isinstance(value, dict):
                            # Deep merge the backends map; copy so the stored
                            # dict is not aliased to the caller's value.
                            existing_backends = existing.get(key, {})
                            existing[key] = {**existing_backends, **value}
                        else:
                            # Shallow update for other fields.
                            existing[key] = value
                else:
                    # Create a new subdirectory entry. Copy one level deep:
                    # the original code stored `updates` by reference, so a
                    # caller mutating its dict after the call would silently
                    # corrupt what gets written to disk.
                    subdirs[subdir_name] = {
                        key: dict(value) if isinstance(value, dict) else value
                        for key, value in updates.items()
                    }

            return data

        self._execute_update(metadata_path, update_func, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug(f"Merged {len(subdirectory_updates)} subdirectories in {metadata_path}")

140 

141 

142 

143 

def get_metadata_path(plate_root: Union[str, Path]) -> Path:
    """
    Build the standard metadata file path for a plate root directory.

    Args:
        plate_root: Path to the plate root directory.

    Returns:
        Path to the plate's metadata JSON file.
    """
    root = Path(plate_root)
    return root.joinpath(METADATA_CONFIG.METADATA_FILENAME)