Coverage for openhcs/io/metadata_writer.py: 62.7%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

"""
Atomic metadata writer for OpenHCS with concurrency safety.

Provides specialized atomic operations for OpenHCS metadata files with proper
locking and merging to prevent race conditions in multiprocessing environments.
"""

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Union

# Lock-aware JSON helpers shared across the io package.
from .atomic import atomic_update_json, FileLockError, LOCK_CONFIG

# Module-level logger; classes below keep per-instance references to the same logger.
logger = logging.getLogger(__name__)

16 

17 

@dataclass(frozen=True)
class MetadataConfig:
    """Configuration constants for metadata operations."""
    # File name of the per-plate metadata document.
    METADATA_FILENAME: str = "openhcs_metadata.json"
    # Top-level JSON key holding per-subdirectory metadata dicts.
    SUBDIRECTORIES_KEY: str = "subdirectories"
    # Top-level JSON key holding backend-availability flags.
    AVAILABLE_BACKENDS_KEY: str = "available_backends"
    # Lock-acquisition timeout, inherited from the atomic-IO layer's default.
    DEFAULT_TIMEOUT: float = LOCK_CONFIG.DEFAULT_TIMEOUT


# Shared immutable configuration instance used throughout this module.
METADATA_CONFIG = MetadataConfig()

28 

29 

@dataclass(frozen=True)
class MetadataUpdateRequest:
    """Parameter object for metadata update operations."""
    # Target metadata file; both str and Path are accepted.
    metadata_path: Union[str, Path]
    # Name of the subdirectory whose metadata is being written.
    sub_dir: str
    # Metadata payload to store under that subdirectory.
    metadata: Dict[str, Any]
    # Optional backend-availability flags to record alongside the payload.
    available_backends: Optional[Dict[str, bool]] = None

37 

38 

class MetadataWriteError(Exception):
    """Raised when metadata write operations fail."""

42 

43 

class AtomicMetadataWriter:
    """Atomic metadata writer with file locking for concurrent safety.

    Every public method funnels through :func:`atomic_update_json`, which
    performs the read-modify-write under a file lock so concurrent processes
    cannot clobber each other's updates.
    """

    def __init__(self, timeout: float = METADATA_CONFIG.DEFAULT_TIMEOUT):
        """
        Args:
            timeout: Maximum seconds to wait for the file lock before failing.
        """
        self.timeout = timeout
        # Per-instance alias of the module logger (same name, same handlers).
        self.logger = logging.getLogger(__name__)

    def _execute_update(self, metadata_path: Union[str, Path], update_func: Callable,
                        default_data: Optional[Dict] = None) -> None:
        """Execute an atomic JSON update, translating lock failures.

        Args:
            metadata_path: Path to the metadata file to update.
            update_func: Callable receiving the current data (or ``default_data``
                / ``None`` when the file is absent) and returning the new data.
            default_data: Seed data used when the file does not yet exist.

        Raises:
            MetadataWriteError: If the underlying lock cannot be acquired.
        """
        try:
            atomic_update_json(metadata_path, update_func, self.timeout, default_data)
        except FileLockError as e:
            raise MetadataWriteError(f"Failed to update metadata: {e}") from e

    def _ensure_subdirectories_structure(self, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return *data* (or a fresh dict) guaranteed to contain the subdirectories key."""
        data = data or {}
        data.setdefault(METADATA_CONFIG.SUBDIRECTORIES_KEY, {})
        return data

    def _create_subdirectory_update(self, sub_dir: str, metadata: Dict[str, Any]) -> Callable:
        """Build an update function that stores *metadata* under *sub_dir*."""
        def update_func(data):
            data = self._ensure_subdirectories_structure(data)
            data[METADATA_CONFIG.SUBDIRECTORIES_KEY][sub_dir] = metadata
            return data
        return update_func

    def update_subdirectory_metadata(self, metadata_path: Union[str, Path], sub_dir: str,
                                     metadata: Dict[str, Any]) -> None:
        """Atomically update metadata for a specific subdirectory."""
        update_func = self._create_subdirectory_update(sub_dir, metadata)
        self._execute_update(metadata_path, update_func, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        # Lazy %-style args: formatting is skipped entirely when DEBUG is disabled.
        self.logger.debug("Updated subdirectory '%s' in %s", sub_dir, metadata_path)

    def update_available_backends(self, metadata_path: Union[str, Path],
                                  available_backends: Dict[str, bool]) -> None:
        """Atomically update available backends in metadata.

        Raises:
            MetadataWriteError: If the metadata file does not exist (no
                ``default_data`` is supplied, so missing files yield ``None``).
        """
        def update_func(data):
            if data is None:
                raise MetadataWriteError("Cannot update backends: metadata file does not exist")
            data[METADATA_CONFIG.AVAILABLE_BACKENDS_KEY] = available_backends
            return data

        self._execute_update(metadata_path, update_func)
        self.logger.debug("Updated available backends in %s", metadata_path)

    def merge_subdirectory_metadata(self, metadata_path: Union[str, Path],
                                    subdirectory_updates: Dict[str, Dict[str, Any]]) -> None:
        """Atomically merge multiple subdirectory metadata updates in one locked write."""
        def update_func(data):
            data = self._ensure_subdirectories_structure(data)
            data[METADATA_CONFIG.SUBDIRECTORIES_KEY].update(subdirectory_updates)
            return data

        self._execute_update(metadata_path, update_func, {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug("Merged %d subdirectories in %s", len(subdirectory_updates), metadata_path)

    def create_or_update_metadata(self, request: MetadataUpdateRequest) -> None:
        """Atomically create or update metadata file with subdirectory and backend info.

        Writes ``request.metadata`` under ``request.sub_dir`` and, when
        ``request.available_backends`` is provided, records it in the same
        locked transaction.
        """
        subdir_update = self._create_subdirectory_update(request.sub_dir, request.metadata)

        def update_func(data):
            data = subdir_update(data)
            if request.available_backends is not None:
                data[METADATA_CONFIG.AVAILABLE_BACKENDS_KEY] = request.available_backends
            return data

        self._execute_update(request.metadata_path, update_func,
                             {METADATA_CONFIG.SUBDIRECTORIES_KEY: {}})
        self.logger.debug("Created/updated metadata for '%s' in %s",
                          request.sub_dir, request.metadata_path)

113 

114 

def get_metadata_path(plate_root: Union[str, Path]) -> Path:
    """
    Get the standard metadata file path for a plate root directory.

    Args:
        plate_root: Path to the plate root directory

    Returns:
        Path to the metadata file
    """
    root = Path(plate_root)
    return root / METADATA_CONFIG.METADATA_FILENAME