Coverage for openhcs/io/atomic.py: 75.4%

110 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Atomic file operations with locking for OpenHCS. 

3 

4Provides utilities for atomic read-modify-write operations with file locking 

5to prevent concurrency issues in multiprocessing environments. 

6""" 

7 

8import json 

9import logging 

10import os 

11import tempfile 

12import time 

13from contextlib import contextmanager 

14from dataclasses import dataclass 

15from pathlib import Path 

16from typing import Any, Callable, Dict, Optional, TypeVar, Union 

17 

# Cross-platform file locking: prefer POSIX fcntl, fall back to the
# third-party portalocker package where fcntl is unavailable (Windows).
try:
    import fcntl  # POSIX advisory locking (Unix/Linux/macOS)
    FCNTL_AVAILABLE = True
except ImportError:
    # Windows compatibility - use portalocker
    import portalocker
    FCNTL_AVAILABLE = False

# Module-level logger, keyed by module path per logging convention.
logger = logging.getLogger(__name__)

# Generic type variable — appears unused in this module's visible code;
# presumably reserved for typed helpers. TODO(review): confirm or remove.
T = TypeVar('T')

30 

31 

@dataclass(frozen=True)
class LockConfig:
    """Configuration constants for file locking operations."""
    # Seconds to wait for lock acquisition before raising FileLockTimeoutError.
    DEFAULT_TIMEOUT: float = 30.0
    # Seconds to sleep between lock acquisition attempts.
    DEFAULT_POLL_INTERVAL: float = 0.1
    # Suffix appended to a target path to derive its lock-file path.
    LOCK_SUFFIX: str = '.lock'
    # Prefix for temporary files created during atomic writes.
    TEMP_PREFIX: str = '.tmp'
    # Indentation level passed to json.dump when writing JSON files.
    JSON_INDENT: int = 2


# Module-wide singleton providing the default settings used by this module.
LOCK_CONFIG = LockConfig()

43 

44 

class FileLockError(Exception):
    """Base error for failed file-locking and atomic-write operations."""

48 

49 

class FileLockTimeoutError(FileLockError):
    """Raised when a file lock cannot be acquired within the timeout."""

53 

54 

@contextmanager
def file_lock(
    lock_path: Union[str, Path],
    timeout: float = LOCK_CONFIG.DEFAULT_TIMEOUT,
    poll_interval: float = LOCK_CONFIG.DEFAULT_POLL_INTERVAL
):
    """Hold an exclusive advisory lock at ``lock_path`` for the with-block.

    The lock file's parent directory is created on demand. Acquisition is
    retried every ``poll_interval`` seconds until ``timeout`` elapses.

    Raises:
        FileLockTimeoutError: If the lock cannot be acquired in time.
        FileLockError: If acquisition fails for any other reason (note that
            exceptions raised inside the guarded block are wrapped too).
    """
    lock_path = Path(lock_path)
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    fd = None
    try:
        fd = _acquire_lock_with_timeout(lock_path, timeout, poll_interval)
        yield
    except FileLockTimeoutError:
        # Re-raise timeouts unchanged so callers can distinguish them from
        # other locking failures.
        raise
    except Exception as e:
        raise FileLockError(f"File lock operation failed for {lock_path}: {e}") from e
    finally:
        # Always release the lock and delete its on-disk file, even when the
        # body (or acquisition) raised.
        _cleanup_lock(fd, lock_path)

75 

76 

def _acquire_lock_with_timeout(lock_path: Path, timeout: float, poll_interval: float) -> int:
    """Acquire the file lock, polling until ``timeout`` seconds elapse.

    Args:
        lock_path: Path of the lock file to acquire.
        timeout: Maximum seconds to keep retrying.
        poll_interval: Seconds to sleep between attempts.

    Returns:
        The open file descriptor holding the exclusive lock.

    Raises:
        FileLockTimeoutError: If no attempt succeeds before the deadline.
    """
    # monotonic() is immune to wall-clock adjustments, unlike time.time().
    deadline = time.monotonic() + timeout
    while True:
        # Compare against None explicitly: a valid descriptor could in
        # principle be 0, which a truthiness test would misread as failure.
        lock_fd = _try_acquire_lock(lock_path)
        if lock_fd is not None:
            return lock_fd
        # Check the deadline *after* at least one attempt so a zero or
        # negative timeout still succeeds when the lock is free.
        if time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)

    raise FileLockTimeoutError(f"Failed to acquire lock {lock_path} within {timeout}s")

87 

88 

def _try_acquire_lock(lock_path: Path) -> Optional[int]:
    """Attempt a single non-blocking exclusive lock acquisition.

    Args:
        lock_path: Path of the lock file to open and lock.

    Returns:
        An open file descriptor holding the lock, or None if the lock file
        cannot be opened or the lock is currently held elsewhere.
    """
    try:
        lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
    except OSError:
        return None

    try:
        if FCNTL_AVAILABLE:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            # Windows: use portalocker
            portalocker.lock(lock_fd, portalocker.LOCK_EX | portalocker.LOCK_NB)
    except (OSError, IOError):
        # Lock held by someone else: close the descriptor we just opened so
        # repeated failed attempts don't leak one fd per poll.
        os.close(lock_fd)
        return None

    logger.debug(f"Acquired file lock: {lock_path}")
    return lock_fd

102 

103 

def _cleanup_lock(lock_fd: Optional[int], lock_path: Path) -> None:
    """Release the advisory lock (if held) and remove the lock file.

    Best-effort: release/removal errors are logged rather than raised, so
    cleanup running in ``file_lock``'s finally block can never mask the
    original exception.

    Args:
        lock_fd: Descriptor holding the lock, or None if never acquired.
        lock_path: Path of the on-disk lock file to remove.
    """
    if lock_fd is not None:
        try:
            if FCNTL_AVAILABLE:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
            else:
                # Windows: use portalocker
                portalocker.unlock(lock_fd)
            os.close(lock_fd)
            logger.debug(f"Released file lock: {lock_path}")
        except Exception as e:
            logger.warning(f"Error releasing lock {lock_path}: {e}")

    # unlink(missing_ok=True) avoids the exists()/unlink() race where another
    # process removes the file between the check and the call.
    try:
        lock_path.unlink(missing_ok=True)
    except Exception as e:
        logger.warning(f"Error removing lock file {lock_path}: {e}")

123 

124 

def atomic_write_json(
    file_path: Union[str, Path],
    data: Dict[str, Any],
    indent: int = LOCK_CONFIG.JSON_INDENT,
    ensure_directory: bool = True
) -> None:
    """Atomically write JSON data to file using temporary file + rename.

    Data is serialized to a temp file in the destination's directory (so the
    rename never crosses filesystems), then moved into place.

    Args:
        file_path: Destination path for the JSON document.
        data: JSON-serializable mapping to write.
        indent: Indentation passed to json.dump.
        ensure_directory: Create missing parent directories first.

    Raises:
        FileLockError: If serialization or the final replace fails.
    """
    file_path = Path(file_path)

    if ensure_directory:
        file_path.parent.mkdir(parents=True, exist_ok=True)

    tmp_path = None
    try:
        tmp_path = _write_to_temp_file(file_path, data, indent)
        # Use os.replace() instead of os.rename() for atomic replacement on all platforms
        # os.rename() fails on Windows if destination exists, os.replace() works on both Unix and Windows
        os.replace(tmp_path, str(file_path))
        logger.debug(f"Atomically wrote JSON to {file_path}")
    except Exception as e:
        # Don't leave an orphaned temp file behind when the replace fails.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise FileLockError(f"Atomic JSON write failed for {file_path}: {e}") from e

145 

146 

def _write_to_temp_file(file_path: Path, data: Dict[str, Any], indent: int) -> str:
    """Serialize ``data`` to a sibling temp file and return the temp path.

    The temp file lives in ``file_path``'s directory (named with the
    configured prefix plus the target name) and is fsync'd before return so
    a subsequent rename publishes fully-written content.
    """
    fd, tmp_name = tempfile.mkstemp(
        dir=file_path.parent,
        prefix=f"{LOCK_CONFIG.TEMP_PREFIX}{file_path.name}",
        suffix='.json',
        text=True,
    )
    with os.fdopen(fd, 'w') as tmp_file:
        json.dump(data, tmp_file, indent=indent)
        tmp_file.flush()
        # Force the bytes to disk before the caller renames the file.
        os.fsync(fd)
    return tmp_name

160 

161 

def atomic_update_json(
    file_path: Union[str, Path],
    update_func: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
    lock_timeout: float = LOCK_CONFIG.DEFAULT_TIMEOUT,
    default_data: Optional[Dict[str, Any]] = None
) -> None:
    """Read-modify-write ``file_path`` as JSON under an exclusive file lock.

    ``update_func`` receives the current contents (or ``default_data`` when
    the file is missing or unreadable) and returns the new contents, which
    are then written back atomically.

    Raises:
        FileLockError: If the update callable raises, or locking/writing fails.
    """
    file_path = Path(file_path)

    # The lock file sits next to the target, named "<target>.<ext>.lock".
    lock_name = f'{file_path.suffix}{LOCK_CONFIG.LOCK_SUFFIX}'
    with file_lock(file_path.with_suffix(lock_name), timeout=lock_timeout):
        existing = _read_json_or_default(file_path, default_data)

        try:
            new_data = update_func(existing)
        except Exception as e:
            raise FileLockError(f"Update function failed for {file_path}: {e}") from e

        atomic_write_json(file_path, new_data)
        logger.debug(f"Atomically updated JSON file: {file_path}")

182 

183 

184def _read_json_or_default(file_path: Path, default_data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: 

185 """Read JSON file or return default data if file doesn't exist or is invalid.""" 

186 if not file_path.exists(): 

187 return default_data 

188 

189 try: 

190 with open(file_path, 'r') as f: 

191 return json.load(f) 

192 except (json.JSONDecodeError, IOError) as e: 

193 logger.warning(f"Failed to read {file_path}, using default: {e}") 

194 return default_data