Coverage for openhcs/core/log_utils.py: 0.0%

96 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Core Log Utilities for OpenHCS 

3 

4Unified log discovery, classification, and monitoring utilities 

5shared between TUI and PyQt GUI implementations. 

6""" 

7 

8import logging 

9import re 

10from pathlib import Path 

11from typing import Set, Dict, Optional, List 

12from dataclasses import dataclass 

13 

14from openhcs.textual_tui.widgets.plate_manager import get_current_log_file_path as _get_current_log_file_path 

15 

16logger = logging.getLogger(__name__) 

17 

18 

@dataclass
class LogFileInfo:
    """Metadata describing a single discovered log file.

    When ``display_name`` is left empty, ``__post_init__`` derives one from
    ``log_type`` (and ``worker_id`` for worker logs), falling back to the
    file name.
    """
    path: Path
    log_type: str  # "tui", "main", "worker", "unknown"
    worker_id: Optional[str] = None
    display_name: Optional[str] = None

    def __post_init__(self):
        """Fill in ``display_name`` when the caller did not supply one."""
        if self.display_name:
            return  # an explicit, non-empty name always wins

        if self.log_type == "tui":
            label = "Main Process"
        elif self.log_type == "main":
            label = "Main Subprocess"
        elif self.log_type == "worker" and self.worker_id:
            label = f"Worker {self.worker_id}"
        else:
            # Unknown type, or a worker without an id: show the file name.
            label = self.path.name
        self.display_name = label

38 

39 

def get_current_log_file_path() -> str:
    """
    Return the path of the log file currently used by the logging system.

    Delegates to the imported ``_get_current_log_file_path`` helper and
    raises instead of handing a ``None`` result back to the caller.

    Returns:
        str: Path to the current log file

    Raises:
        RuntimeError: If the helper returns ``None`` (no log file found)
    """
    current_path = _get_current_log_file_path()
    if current_path is not None:
        return current_path
    raise RuntimeError("No file handler found in logging configuration")

56 

57 

def discover_logs(base_log_path: Optional[str] = None, include_main_log: bool = True,
                  log_directory: Optional[Path] = None) -> List[LogFileInfo]:
    """
    Discover OpenHCS log files and return them as classified LogFileInfo objects.

    Discovery runs in up to two stages:

    1. If ``include_main_log`` is true, the current main process log (as
       reported by ``get_current_log_file_path``) is added when it exists.
    2. If ``base_log_path`` is given, ``*.log`` files in its parent directory
       that satisfy ``is_relevant_log_file`` are added. Otherwise, ``*.log``
       files in ``log_directory`` that satisfy ``is_openhcs_log_file`` are
       added.

    Each path is reported at most once.

    Args:
        base_log_path: Base path for specific subprocess logs (optional)
        include_main_log: Whether to include the current main process log;
            also forwarded to ``classify_log_file`` as its TUI-log switch
        log_directory: Directory to search when ``base_log_path`` is not
            given (defaults to ``~/.local/share/openhcs/logs``). Ignored
            when ``base_log_path`` is provided.

    Returns:
        List of LogFileInfo objects for discovered log files
    """
    discovered_logs: List[LogFileInfo] = []
    seen_paths: Set[Path] = set()  # O(1) duplicate checks across all stages

    # Include current main process log if requested
    if include_main_log:
        try:
            main_log = Path(get_current_log_file_path())
            if main_log.exists():
                discovered_logs.append(
                    classify_log_file(main_log, base_log_path, include_main_log)
                )
                seen_paths.add(main_log)
        except Exception:
            # Deliberately best-effort: a missing main log must not prevent
            # discovery of the other logs, but leave a trace for debugging.
            logger.debug("Main process log not available; skipping", exc_info=True)

    if base_log_path:
        # Discover subprocess logs that sit next to the given base path
        log_dir = Path(base_log_path).parent
        if log_dir.exists():
            for log_file in log_dir.glob("*.log"):
                if log_file in seen_paths:
                    continue
                if is_relevant_log_file(log_file, base_log_path):
                    discovered_logs.append(
                        classify_log_file(log_file, base_log_path, include_main_log)
                    )
                    seen_paths.add(log_file)
    else:
        # No specific base path: discover all OpenHCS logs in the directory
        if log_directory is None:
            log_directory = Path.home() / ".local" / "share" / "openhcs" / "logs"

        if log_directory.exists():
            for log_file in log_directory.glob("*.log"):
                if log_file in seen_paths or not is_openhcs_log_file(log_file):
                    continue
                # Infer base_log_path so subprocess logs classify properly
                inferred_base = (
                    infer_base_log_path(log_file)
                    if 'subprocess_' in log_file.name
                    else None
                )
                discovered_logs.append(
                    classify_log_file(log_file, inferred_base, include_main_log)
                )
                seen_paths.add(log_file)

    return discovered_logs

108 

109 

def classify_log_file(log_path: Path, base_log_path: Optional[str] = None, include_tui_log: bool = True) -> LogFileInfo:
    """
    Classify a log file and extract metadata.

    Checks are applied in order, first match wins:

    1. ``"tui"``: ``log_path`` equals the current log file path (only when
       ``include_tui_log`` is true and that path can be determined).
    2. ``"main"``: the file name is exactly ``{base_name}.log``.
    3. ``"worker"``: the file name is ``{base_name}_worker_*.log``; the
       worker id is the text after ``_worker_`` up to the next underscore.
    4. ``"unknown"``: anything else.

    Note: this is not a pure function. With ``include_tui_log`` enabled it
    queries ``get_current_log_file_path``, and unrecognized names are logged
    at debug level.

    Args:
        log_path: Path to log file
        base_log_path: Base path for subprocess log files
        include_tui_log: Whether to check for TUI log classification

    Returns:
        LogFileInfo with classification and metadata
    """
    file_name = log_path.name

    # Check if it's the current TUI log
    if include_tui_log:
        try:
            tui_log_path = get_current_log_file_path()
        except RuntimeError:
            tui_log_path = None  # TUI log not found, continue with other classification
        if tui_log_path is not None and log_path == Path(tui_log_path):
            return LogFileInfo(log_path, "tui", display_name="Main Process")

    # Check subprocess logs if base_log_path is provided
    if base_log_path:
        base_name = Path(base_log_path).name

        # Main subprocess log: exact match
        if file_name == f"{base_name}.log":
            return LogFileInfo(log_path, "main", display_name="Main Subprocess")

        # Worker log: {base_name}_worker_*.log
        worker_prefix = f"{base_name}_worker_"
        if file_name.startswith(worker_prefix) and file_name.endswith('.log'):
            # Everything between _worker_ and .log, cut at the first underscore
            worker_part = file_name[len(worker_prefix):-len('.log')]
            worker_id = worker_part.split('_')[0] or None
            # With an empty id, leave display_name unset so LogFileInfo falls
            # back to the file name instead of showing a dangling "Worker ".
            display_name = f"Worker {worker_id}" if worker_id else None
            return LogFileInfo(log_path, "worker", worker_id, display_name=display_name)

    # Unknown or malformed log file
    logger.debug("Unrecognized log file pattern: %s", file_name)
    return LogFileInfo(log_path, "unknown")

151 

152 

def is_relevant_log_file(file_path: Path, base_log_path: Optional[str]) -> bool:
    """
    Decide whether a file belongs to the subprocess run named by a base path.

    Only file names are compared; the directory of ``file_path`` is not
    taken into account.

    Args:
        file_path: Path to file to check
        base_log_path: Base path for subprocess log files

    Returns:
        bool: True if the file name is ``{base_name}.log`` or
        ``{base_name}_worker_*.log``; False otherwise, including when
        ``base_log_path`` is empty or None
    """
    if not base_log_path:
        return False

    stem = Path(base_log_path).name
    candidate = file_path.name

    is_main_log = candidate == stem + ".log"
    is_worker_log = candidate.startswith(stem + "_worker_") and candidate.endswith(".log")
    return is_main_log or is_worker_log

178 

179 

def is_openhcs_log_file(file_path: Path) -> bool:
    """
    Tell whether a file name follows the OpenHCS log naming scheme.

    Args:
        file_path: Path to file to check

    Returns:
        bool: True if the name ends with ``.log``, starts with ``openhcs_``
        and contains ``unified_`` or ``subprocess_``
    """
    name = file_path.name
    if not (name.endswith('.log') and name.startswith('openhcs_')):
        return False
    return any(marker in name for marker in ('unified_', 'subprocess_'))

196 

197 

def infer_base_log_path(file_path: Path) -> str:
    """
    Derive the base log path that a subprocess log file name was built from.

    Args:
        file_path: Path to subprocess log file

    Returns:
        str: The file's directory joined with the part of the name before
        the first ``_worker_``, or with the name minus its extension when
        ``_worker_`` does not occur
    """
    prefix, marker, _ = file_path.name.partition('_worker_')
    # A non-empty marker means this is a worker log; otherwise drop the suffix.
    base_name = prefix if marker else file_path.stem
    return str(file_path.parent / base_name)

218 

219 

220 

221 

222 

223