Coverage for src/polystore/atomic.py: 27% (136 statements)

coverage.py v7.11.0, created at 2025-11-03 06:58 +0000

"""
Atomic file operations with locking for OpenHCS.

Provides utilities for atomic read-modify-write operations with file locking
to prevent concurrency issues in multiprocessing environments.
"""

import json
import logging
import os
import tempfile
import time
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, Optional, TypeVar, Union

# Cross-platform file locking
try:
    import fcntl
    FCNTL_AVAILABLE = True
except ImportError:
    # Windows compatibility - use portalocker
    import portalocker
    FCNTL_AVAILABLE = False

logger = logging.getLogger(__name__)

T = TypeVar('T')


@dataclass(frozen=True)
class LockConfig:
    """Configuration constants for file locking operations."""
    DEFAULT_TIMEOUT: float = 30.0
    DEFAULT_POLL_INTERVAL: float = 0.1
    LOCK_SUFFIX: str = '.lock'
    TEMP_PREFIX: str = '.tmp'
    JSON_INDENT: int = 2


LOCK_CONFIG = LockConfig()
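
# LOCK_CONFIG is a frozen, module-level set of defaults; per-call tuning goes
# through the function parameters below rather than by mutating the dataclass.
# An illustrative (hypothetical) override of the defaults:
#
#     with file_lock("state.json.lock", timeout=5.0, poll_interval=0.05):
#         ...  # critical section under a tighter deadline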


class FileLockError(Exception):
    """Raised when file locking operations fail."""
    pass


class FileLockTimeoutError(FileLockError):
    """Raised when file lock acquisition times out."""
    pass


@contextmanager
def file_lock(
    lock_path: Union[str, Path],
    timeout: float = LOCK_CONFIG.DEFAULT_TIMEOUT,
    poll_interval: float = LOCK_CONFIG.DEFAULT_POLL_INTERVAL
):
    """Context manager for exclusive file locking."""
    lock_path = Path(lock_path)
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    lock_fd = None
    try:
        lock_fd = _acquire_lock_with_timeout(lock_path, timeout, poll_interval)
        yield
    except FileLockTimeoutError:
        raise
    except Exception as e:
        raise FileLockError(f"File lock operation failed for {lock_path}: {e}") from e
    finally:
        _cleanup_lock(lock_fd, lock_path)
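
# Usage sketch for file_lock (the lock file name is illustrative): hold an
# exclusive cross-process lock for the duration of a critical section.
# FileLockTimeoutError is raised if the lock cannot be acquired in time; any
# other failure inside the block surfaces as FileLockError.
#
#     with file_lock("results.json.lock", timeout=10.0):
#         ...  # read-modify-write the protected resource here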


def _acquire_lock_with_timeout(lock_path: Path, timeout: float, poll_interval: float) -> int:
    """Acquire file lock with timeout and return file descriptor."""
    deadline = time.time() + timeout

    while time.time() < deadline:
        # Compare against None explicitly: file descriptor 0 is valid but
        # falsy, so a bare truthiness test could drop an acquired lock.
        if (lock_fd := _try_acquire_lock(lock_path)) is not None:
            return lock_fd
        time.sleep(poll_interval)

    raise FileLockTimeoutError(f"Failed to acquire lock {lock_path} within {timeout}s")


def _try_acquire_lock(lock_path: Path) -> Optional[int]:
    """Try to acquire the lock once; return the fd on success, else None."""
    lock_fd = None
    try:
        lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
        if FCNTL_AVAILABLE:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            # Windows: use portalocker
            portalocker.lock(lock_fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
        logger.debug(f"Acquired file lock: {lock_path}")
        return lock_fd
    except (OSError, IOError):
        # Close the descriptor when open succeeded but locking failed;
        # otherwise every contended poll would leak a file descriptor.
        if lock_fd is not None:
            try:
                os.close(lock_fd)
            except OSError:
                pass
        return None


def _cleanup_lock(lock_fd: Optional[int], lock_path: Path) -> None:
    """Clean up file lock resources."""
    if lock_fd is not None:
        try:
            if FCNTL_AVAILABLE:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
            else:
                # Windows: use portalocker
                portalocker.unlock(lock_fd)
            os.close(lock_fd)
            logger.debug(f"Released file lock: {lock_path}")
        except Exception as e:
            logger.warning(f"Error releasing lock {lock_path}: {e}")

    # Best-effort removal of the lock file. Unlinking after release is a known
    # race in flock-style locking (a waiter that already opened the old inode
    # can lock it while a new lock file is created); it is traded off here
    # against accumulating stale .lock files.
    if lock_path.exists():
        try:
            lock_path.unlink()
        except Exception as e:
            logger.warning(f"Error removing lock file {lock_path}: {e}")


@contextmanager
def atomic_write(file_path: Union[str, Path], mode: str = 'w', ensure_directory: bool = True):
    """
    Context manager for atomic file writes.

    Writes to a temporary file first, then renames it to the target path.
    This makes the operation atomic: either the target receives the complete
    new contents or it is left unchanged, with no partial writes visible.

    Args:
        file_path: Target file path
        mode: File mode ('w' for text, 'wb' for binary)
        ensure_directory: Create parent directory if it doesn't exist

    Example:
        with atomic_write("output.txt") as f:
            f.write("data")
    """
    file_path = Path(file_path)

    if ensure_directory:
        file_path.parent.mkdir(parents=True, exist_ok=True)

    # Create the temporary file in the same directory so os.replace() is a
    # same-filesystem rename
    tmp_file = tempfile.NamedTemporaryFile(
        mode=mode,
        dir=file_path.parent,
        prefix=f"{LOCK_CONFIG.TEMP_PREFIX}{file_path.name}",
        delete=False
    )
    tmp_path = tmp_file.name
    try:
        with tmp_file:
            yield tmp_file
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
    except Exception:
        # Clean up the temp file on error; the with-block has already closed
        # it, so the unlink also works on Windows.
        try:
            os.unlink(tmp_path)
        except Exception:
            pass
        raise

    # Atomically replace target file
    try:
        os.replace(tmp_path, str(file_path))
        logger.debug(f"Atomically wrote to {file_path}")
    except Exception as e:
        try:
            os.unlink(tmp_path)
        except Exception:
            pass
        raise FileLockError(f"Atomic write failed for {file_path}: {e}") from e
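
# Sketch of binary-mode use (file name illustrative): the same guarantee
# applies to bytes, since mode is passed straight to NamedTemporaryFile.
#
#     with atomic_write("checkpoint.bin", mode="wb") as f:
#         f.write(b"\x00" * 1024)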


def atomic_write_json(
    file_path: Union[str, Path],
    data: Dict[str, Any],
    indent: int = LOCK_CONFIG.JSON_INDENT,
    ensure_directory: bool = True
) -> None:
    """Atomically write JSON data to file using temporary file + rename."""
    file_path = Path(file_path)

    if ensure_directory:
        file_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        tmp_path = _write_to_temp_file(file_path, data, indent)
        # Use os.replace() instead of os.rename() for atomic replacement on
        # all platforms: os.rename() fails on Windows if the destination
        # exists, while os.replace() works on both Unix and Windows.
        os.replace(tmp_path, str(file_path))
        logger.debug(f"Atomically wrote JSON to {file_path}")
    except Exception as e:
        raise FileLockError(f"Atomic JSON write failed for {file_path}: {e}") from e
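
# Usage sketch (path and payload illustrative): readers never observe a
# half-written JSON document because the rename is atomic.
#
#     atomic_write_json("run_metadata.json", {"status": "complete", "n": 42})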


def _write_to_temp_file(file_path: Path, data: Dict[str, Any], indent: int) -> str:
    """Write data to a temporary file and return its path."""
    tmp_file = tempfile.NamedTemporaryFile(
        mode='w',
        dir=file_path.parent,
        prefix=f"{LOCK_CONFIG.TEMP_PREFIX}{file_path.name}",
        suffix='.json',
        delete=False
    )
    try:
        with tmp_file:
            json.dump(data, tmp_file, indent=indent)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())
        return tmp_file.name
    except Exception:
        # Remove the temp file if serialization fails, so failed writes do
        # not leave stray .tmp files next to the target.
        try:
            os.unlink(tmp_file.name)
        except OSError:
            pass
        raise


def atomic_update_json(
    file_path: Union[str, Path],
    update_func: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
    lock_timeout: float = LOCK_CONFIG.DEFAULT_TIMEOUT,
    default_data: Optional[Dict[str, Any]] = None
) -> None:
    """Atomically update JSON file using read-modify-write with file locking."""
    file_path = Path(file_path)
    lock_path = file_path.with_suffix(f'{file_path.suffix}{LOCK_CONFIG.LOCK_SUFFIX}')

    with file_lock(lock_path, timeout=lock_timeout):
        current_data = _read_json_or_default(file_path, default_data)

        try:
            updated_data = update_func(current_data)
        except Exception as e:
            raise FileLockError(f"Update function failed for {file_path}: {e}") from e

        atomic_write_json(file_path, updated_data)
        logger.debug(f"Atomically updated JSON file: {file_path}")
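
# Read-modify-write sketch (file name and update function are illustrative):
# the update runs under the lock, so concurrent workers cannot interleave
# their read and write phases.
#
#     def add_entry(data):
#         data = dict(data or {})
#         data.setdefault("wells", []).append("A01")
#         return data
#
#     atomic_update_json("plate_results.json", add_entry, default_data={})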


def _read_json_or_default(file_path: Path, default_data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Read JSON file or return default data if file doesn't exist or is invalid."""
    if not file_path.exists():
        return default_data

    try:
        with open(file_path, 'r') as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        logger.warning(f"Failed to read {file_path}, using default: {e}")
        return default_data
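
# End-to-end sketch in a multiprocessing setting (names illustrative): several
# workers update one JSON file; the per-file lock serializes the
# read-modify-write cycles so no update is lost.
#
#     from multiprocessing import Pool
#
#     def record(worker_id):
#         atomic_update_json(
#             "progress.json",
#             lambda d: {**(d or {}), str(worker_id): "done"},
#             default_data={},
#         )
#
#     if __name__ == "__main__":
#         with Pool(4) as pool:
#             pool.map(record, range(8))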