Coverage for openhcs/io/atomic.py: 78.8%

101 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Atomic file operations with locking for OpenHCS. 

3 

4Provides utilities for atomic read-modify-write operations with file locking 

5to prevent concurrency issues in multiprocessing environments. 

6""" 

7 

8import fcntl 

9import json 

10import logging 

11import os 

12import tempfile 

13import time 

14from contextlib import contextmanager 

15from dataclasses import dataclass 

16from pathlib import Path 

17from typing import Any, Callable, Dict, Optional, TypeVar, Union 

18 

19logger = logging.getLogger(__name__) 

20 

21T = TypeVar('T') 

22 

23 

@dataclass(frozen=True)
class LockConfig:
    """Immutable bundle of tuning constants for file-locking operations.

    Frozen so the shared module-level instance cannot be mutated at runtime.
    """
    DEFAULT_TIMEOUT: float = 30.0        # seconds to wait for a lock before giving up
    DEFAULT_POLL_INTERVAL: float = 0.1   # seconds between acquisition attempts
    LOCK_SUFFIX: str = '.lock'           # appended to a file's suffix to name its lock file
    TEMP_PREFIX: str = '.tmp'            # prefix for temp files used by atomic writes
    JSON_INDENT: int = 2                 # indentation level for serialized JSON


# Shared, read-only configuration instance used for the module's defaults.
LOCK_CONFIG = LockConfig()

35 

36 

class FileLockError(Exception):
    """Base error for failures in file-locking and atomic-write operations."""

40 

41 

class FileLockTimeoutError(FileLockError):
    """Raised when a lock cannot be acquired within the allotted timeout."""

45 

46 

@contextmanager
def file_lock(
    lock_path: Union[str, Path],
    timeout: float = LOCK_CONFIG.DEFAULT_TIMEOUT,
    poll_interval: float = LOCK_CONFIG.DEFAULT_POLL_INTERVAL
):
    """Context manager holding an exclusive advisory lock on *lock_path*.

    Args:
        lock_path: Path of the lock file (created if missing, removed on exit).
        timeout: Maximum seconds to wait for the lock.
        poll_interval: Seconds to sleep between acquisition attempts.

    Raises:
        FileLockTimeoutError: If the lock is not acquired within *timeout*.
        FileLockError: If any other error occurs while the lock is held.
    """
    lock_path = Path(lock_path)
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    lock_fd = None
    try:
        lock_fd = _acquire_lock_with_timeout(lock_path, timeout, poll_interval)
        yield
    except FileLockError:
        # Already a domain error (includes FileLockTimeoutError): re-raise
        # as-is instead of wrapping it in a second FileLockError layer.
        raise
    except Exception as e:
        raise FileLockError(f"File lock operation failed for {lock_path}: {e}") from e
    finally:
        _cleanup_lock(lock_fd, lock_path)

67 

68 

def _acquire_lock_with_timeout(lock_path: Path, timeout: float, poll_interval: float) -> int:
    """Acquire the lock at *lock_path*, retrying until *timeout* elapses.

    Uses a monotonic clock so the deadline is immune to wall-clock jumps
    (NTP adjustments, DST), and always makes at least one attempt even when
    timeout <= 0.

    Returns:
        The open file descriptor holding the lock.

    Raises:
        FileLockTimeoutError: If the lock is not acquired before the deadline.
    """
    deadline = time.monotonic() + timeout

    while True:
        if lock_fd := _try_acquire_lock(lock_path):
            return lock_fd
        if time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)

    raise FileLockTimeoutError(f"Failed to acquire lock {lock_path} within {timeout}s")

79 

80 

def _try_acquire_lock(lock_path: Path) -> Optional[int]:
    """Attempt a single non-blocking exclusive lock acquisition.

    Returns:
        The file descriptor holding the lock, or None when the lock is
        currently held elsewhere (or the lock file cannot be opened).
    """
    lock_fd = None
    try:
        lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        logger.debug(f"Acquired file lock: {lock_path}")
        return lock_fd
    except (OSError, IOError):
        # flock() can fail after a successful open(): close the descriptor
        # so contention polls don't leak one fd per attempt.
        if lock_fd is not None:
            try:
                os.close(lock_fd)
            except OSError:
                pass
        return None

90 

91 

92def _cleanup_lock(lock_fd: Optional[int], lock_path: Path) -> None: 

93 """Clean up file lock resources.""" 

94 if lock_fd is not None: 94 ↛ 102line 94 didn't jump to line 102 because the condition on line 94 was always true

95 try: 

96 fcntl.flock(lock_fd, fcntl.LOCK_UN) 

97 os.close(lock_fd) 

98 logger.debug(f"Released file lock: {lock_path}") 

99 except Exception as e: 

100 logger.warning(f"Error releasing lock {lock_path}: {e}") 

101 

102 if lock_path.exists(): 102 ↛ exitline 102 didn't return from function '_cleanup_lock' because the condition on line 102 was always true

103 try: 

104 lock_path.unlink() 

105 except Exception as e: 

106 logger.warning(f"Error removing lock file {lock_path}: {e}") 

107 

108 

def atomic_write_json(
    file_path: Union[str, Path],
    data: Dict[str, Any],
    indent: int = LOCK_CONFIG.JSON_INDENT,
    ensure_directory: bool = True
) -> None:
    """Atomically write *data* as JSON to *file_path* via temp file + rename.

    Args:
        file_path: Destination path.
        data: JSON-serializable mapping to write.
        indent: Indentation passed to json.dump.
        ensure_directory: Create missing parent directories when True.

    Raises:
        FileLockError: If serialization or the rename fails.
    """
    file_path = Path(file_path)

    if ensure_directory:
        file_path.parent.mkdir(parents=True, exist_ok=True)

    tmp_path = None
    try:
        tmp_path = _write_to_temp_file(file_path, data, indent)
        # os.replace is atomic on POSIX and, unlike os.rename, also
        # overwrites an existing target on every platform.
        os.replace(tmp_path, str(file_path))
        logger.debug(f"Atomically wrote JSON to {file_path}")
    except Exception as e:
        # Don't leave a stray temp file behind on failure.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise FileLockError(f"Atomic JSON write failed for {file_path}: {e}") from e

127 

128 

def _write_to_temp_file(file_path: Path, data: Dict[str, Any], indent: int) -> str:
    """Serialize *data* to a temp file next to *file_path* and return its path.

    The temp file lives in the destination directory so the later rename stays
    on the same filesystem (and therefore atomic); fsync() ensures the bytes
    reach disk before the caller renames the file into place.

    Raises:
        Exception: Propagates serialization/IO errors after removing the
            partially-written temp file.
    """
    tmp_file = tempfile.NamedTemporaryFile(
        mode='w',
        dir=file_path.parent,
        prefix=f"{LOCK_CONFIG.TEMP_PREFIX}{file_path.name}",
        suffix='.json',
        delete=False
    )
    try:
        with tmp_file:
            json.dump(data, tmp_file, indent=indent)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        return tmp_file.name
    except Exception:
        # delete=False means a failed dump would otherwise leave a partial
        # temp file behind; remove it before propagating the error.
        try:
            os.unlink(tmp_file.name)
        except OSError:
            pass
        raise

142 

143 

def atomic_update_json(
    file_path: Union[str, Path],
    update_func: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
    lock_timeout: float = LOCK_CONFIG.DEFAULT_TIMEOUT,
    default_data: Optional[Dict[str, Any]] = None
) -> None:
    """Read-modify-write a JSON file atomically under an exclusive file lock.

    Args:
        file_path: JSON file to update.
        update_func: Receives the current contents (or *default_data* when the
            file is missing or unreadable) and returns the new contents.
        lock_timeout: Seconds to wait for the lock.
        default_data: Fallback passed to *update_func* when no data exists.

    Raises:
        FileLockError: If locking, the update function, or the write fails.
        FileLockTimeoutError: If the lock is not acquired in time.
    """
    file_path = Path(file_path)
    # The lock file sits beside the target: data.json -> data.json.lock
    lock_path = file_path.with_suffix(f'{file_path.suffix}{LOCK_CONFIG.LOCK_SUFFIX}')

    with file_lock(lock_path, timeout=lock_timeout):
        current_data = _read_json_or_default(file_path, default_data)

        try:
            updated_data = update_func(current_data)
        except Exception as e:
            raise FileLockError(f"Update function failed for {file_path}: {e}") from e

        atomic_write_json(file_path, updated_data)
        logger.debug(f"Atomically updated JSON file: {file_path}")

164 

165 

166def _read_json_or_default(file_path: Path, default_data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: 

167 """Read JSON file or return default data if file doesn't exist or is invalid.""" 

168 if not file_path.exists(): 

169 return default_data 

170 

171 try: 

172 with open(file_path, 'r') as f: 

173 return json.load(f) 

174 except (json.JSONDecodeError, IOError) as e: 

175 logger.warning(f"Failed to read {file_path}, using default: {e}") 

176 return default_data