Coverage for openhcs/core/log_utils.py: 0.0%

120 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Core Log Utilities for OpenHCS 

3 

4Unified log discovery, classification, and monitoring utilities 

5shared between TUI and PyQt GUI implementations. 

6""" 

7 

8import logging 

9import time 

10from pathlib import Path 

11from typing import Optional, List 

12from dataclasses import dataclass 

13 

14logger = logging.getLogger(__name__) 

15 

16 

def get_current_log_file_path() -> str:
    """
    Get the current log file path from the logging system.

    The root logger is searched first, then the "openhcs" logger; the
    ``baseFilename`` of the first ``logging.FileHandler`` found is returned.
    When neither logger has a file handler, the default log directory
    (~/.local/share/openhcs/logs) is created if necessary and a new
    timestamped ``openhcs_subprocess_<epoch>.log`` path inside it is
    returned. Only the directory is created here, not the file.

    Returns:
        str: Path of the log file.

    Raises:
        RuntimeError: If the lookup or the directory creation fails. The
            original exception is attached as ``__cause__``.
    """
    try:
        # Root logger takes precedence; the package logger is the fallback.
        for candidate in (logging.getLogger(), logging.getLogger("openhcs")):
            for handler in candidate.handlers:
                if isinstance(handler, logging.FileHandler):
                    return handler.baseFilename

        # Last resort: build a default path in the standard log directory.
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        return str(log_dir / f"openhcs_subprocess_{int(time.time())}.log")

    except Exception as e:
        logger.error(f"Failed to get current log file path: {e}")
        # Chain explicitly so the traceback shows the root cause.
        raise RuntimeError(f"Could not determine log file path: {e}") from e

40 

41 

@dataclass
class LogFileInfo:
    """Record describing one discovered log file."""
    path: Path  # location of the log file
    log_type: str  # classification tag, e.g. "tui", "main", "worker", "unknown"
    worker_id: Optional[str] = None  # identifier of the worker, when applicable
    display_name: Optional[str] = None  # human-readable label; derived if empty

    def __post_init__(self):
        """Derive ``display_name`` when the caller did not supply one."""
        if self.display_name:
            return
        self.display_name = self._default_display_name()

    def _default_display_name(self) -> str:
        """Return the label implied by ``log_type`` (falls back to the file name)."""
        kind = self.log_type
        if kind == "tui":
            return "Main Process"
        if kind == "main":
            return "Main Subprocess"
        if kind == "worker" and self.worker_id:
            return f"Worker {self.worker_id}"
        # Worker without an id, or any other type: show the bare file name.
        return self.path.name

61 

62 

def discover_logs(base_log_path: Optional[str] = None, include_main_log: bool = True,
                  log_directory: Optional[Path] = None) -> List[LogFileInfo]:
    """
    Discover OpenHCS log files and return as classified LogFileInfo objects.

    Two discovery modes exist:

    * ``base_log_path`` given: only the directory containing that path is
      scanned, and only files matching the subprocess naming scheme for that
      base are kept. ``log_directory`` is not consulted in this mode.
    * ``base_log_path`` omitted: ``log_directory`` (or the standard OpenHCS
      log directory when it is None) is scanned for every OpenHCS log file.

    A path is never reported twice, so the main log is not repeated when a
    directory scan encounters it again.

    Args:
        base_log_path: Base path for specific subprocess logs (optional)
        include_main_log: Whether to include the current main process log
        log_directory: Directory to search (defaults to standard OpenHCS log directory)

    Returns:
        List of LogFileInfo objects for discovered log files
    """
    discovered_logs = []
    seen_paths = set()  # paths already collected, for O(1) duplicate checks

    # Include current main process log if requested
    if include_main_log:
        try:
            main_log = Path(get_current_log_file_path())
            if main_log.exists():
                discovered_logs.append(classify_log_file(main_log, base_log_path, include_main_log))
                seen_paths.add(main_log)
        except Exception as e:
            # Best effort: a missing main log must not abort discovery.
            logger.debug(f"Main log not available, continuing discovery: {e}")

    if base_log_path:
        # Discover subprocess logs that belong to the given base path
        log_dir = Path(base_log_path).parent
        if log_dir.exists():
            for log_file in log_dir.glob("*.log"):
                if log_file in seen_paths or not is_relevant_log_file(log_file, base_log_path):
                    continue
                discovered_logs.append(classify_log_file(log_file, base_log_path, include_main_log))
                seen_paths.add(log_file)
    else:
        # Discover all OpenHCS logs when no specific base_log_path is given
        if log_directory is None:
            log_directory = Path.home() / ".local" / "share" / "openhcs" / "logs"

        if log_directory.exists():
            for log_file in log_directory.glob("*.log"):
                if log_file in seen_paths or not is_openhcs_log_file(log_file):
                    continue
                # Infer base_log_path for proper classification
                inferred_base = infer_base_log_path(log_file) if 'subprocess_' in log_file.name else None
                discovered_logs.append(classify_log_file(log_file, inferred_base, include_main_log))
                seen_paths.add(log_file)

    return discovered_logs

113 

114 

def classify_log_file(log_path: Path, base_log_path: Optional[str] = None, include_tui_log: bool = True) -> LogFileInfo:
    """
    Classify a log file from its name and extract metadata.

    Checks are applied in this order; the first match wins:

    1. current process log (only when ``include_tui_log`` is True) -> "tui"
    2. ``openhcs_zmq_server_port_{port}_...log`` -> "zmq_server"
    3. ``zmq_worker_exec_{exec_id}_worker_{pid}...log`` -> "zmq_worker"
    4. ``napari_detached_port_{port}.log`` -> "napari"
    5. ``{base_name}.log`` -> "main" (needs ``base_log_path``)
    6. ``{base_name}_worker_{id}...log`` -> "worker" (needs ``base_log_path``)
    7. anything else -> "unknown"

    Note: with ``include_tui_log`` enabled this function queries the logging
    system via ``get_current_log_file_path()``, so it is not a pure function
    of its arguments.

    Args:
        log_path: Path to log file
        base_log_path: Base path for subprocess log files
        include_tui_log: Whether to check for TUI log classification

    Returns:
        LogFileInfo with classification and metadata
    """
    file_name = log_path.name

    # Check if it's the current TUI log
    if include_tui_log:
        try:
            if log_path == Path(get_current_log_file_path()):
                return LogFileInfo(log_path, "tui", display_name="Main Process")
        except RuntimeError:
            pass  # TUI log not found, continue with other classification

    # File name without a trailing ".log". Only the suffix is removed, so a
    # ".log" occurring elsewhere in the name is left intact.
    name_root = file_name[:-len('.log')] if file_name.endswith('.log') else file_name

    # Check for ZMQ server logs (openhcs_zmq_server_port_{port}_{timestamp}.log)
    zmq_server_prefix = 'openhcs_zmq_server_port_'
    if file_name.startswith(zmq_server_prefix):
        # split() always yields at least one item, so test the item itself
        # to detect a missing port.
        port = name_root[len(zmq_server_prefix):].split('_')[0] or 'unknown'
        return LogFileInfo(log_path, "zmq_server", display_name=f"ZMQ Server (port {port})")

    # Check for ZMQ worker logs
    zmq_worker_prefix = 'zmq_worker_exec_'
    if file_name.startswith(zmq_worker_prefix):
        parts = name_root[len(zmq_worker_prefix):].split('_worker_')
        if len(parts) == 2:
            worker_pid = parts[1].split('_')[0]  # PID is first part after _worker_
            return LogFileInfo(log_path, "zmq_worker", worker_pid, display_name=f"ZMQ Worker {worker_pid}")

    # Check for Napari viewer logs
    napari_prefix = 'napari_detached_port_'
    if file_name.startswith(napari_prefix):
        port = name_root[len(napari_prefix):]
        return LogFileInfo(log_path, "napari", display_name=f"Napari Viewer (port {port})")

    # Check subprocess logs if base_log_path is provided
    if base_log_path:
        base_name = Path(base_log_path).name

        # Main subprocess log: exact match
        if file_name == f"{base_name}.log":
            return LogFileInfo(log_path, "main", display_name="Main Subprocess")

        # Worker log: {base_name}_worker_*.log
        worker_prefix = f"{base_name}_worker_"
        if file_name.startswith(worker_prefix) and file_name.endswith('.log'):
            # Worker ID is the first underscore-separated token after the prefix
            worker_id = file_name[len(worker_prefix):-len('.log')].split('_')[0]
            return LogFileInfo(log_path, "worker", worker_id, display_name=f"Worker {worker_id}")

    # Unknown or malformed log file
    logger.debug(f"Unrecognized log file pattern: {file_name}")
    return LogFileInfo(log_path, "unknown")

177 

178 

def is_relevant_log_file(file_path: Path, base_log_path: Optional[str]) -> bool:
    """
    Decide whether a file belongs to the subprocess run identified by base_log_path.

    A file is relevant when its name is either ``{base_name}.log`` or
    ``{base_name}_worker_*.log``, where ``base_name`` is the final path
    component of ``base_log_path``.

    Args:
        file_path: Path to file to check
        base_log_path: Base path for subprocess log files

    Returns:
        bool: True if file is relevant for monitoring; always False when
        base_log_path is empty or None
    """
    if not base_log_path:
        return False

    stem = Path(base_log_path).name
    candidate = file_path.name

    is_main_log = candidate == stem + ".log"
    is_worker_log = candidate.startswith(stem + "_worker_") and candidate.endswith(".log")
    return is_main_log or is_worker_log

204 

205 

def is_openhcs_log_file(file_path: Path) -> bool:
    """
    Tell whether a file name looks like an OpenHCS log.

    Args:
        file_path: Path to file to check

    Returns:
        bool: True if the name ends in ``.log`` and starts with one of the
        known OpenHCS prefixes
    """
    # Known prefixes and the processes that write them:
    # - openhcs_              (openhcs_unified_*, openhcs_subprocess_*,
    #                          openhcs_zmq_server_port_*)
    # - pyqt_gui_subprocess_  (PyQt subprocess runner)
    # - zmq_worker_           (ZMQ worker processes)
    # - napari_detached_      (Napari viewer)
    known_prefixes = (
        'openhcs_',
        'pyqt_gui_subprocess_',
        'zmq_worker_',
        'napari_detached_',
    )

    name = file_path.name
    return name.endswith('.log') and name.startswith(known_prefixes)

237 

238 

def infer_base_log_path(file_path: Path) -> str:
    """
    Derive the base log path that a subprocess log file was created from.

    For worker logs the base name is everything before the first
    ``_worker_`` marker; for any other file it is the name without its
    final extension.

    Args:
        file_path: Path to subprocess log file

    Returns:
        str: Inferred base log path (same directory as file_path)
    """
    head, marker, _ = file_path.name.partition('_worker_')
    # An empty marker means '_worker_' was absent: treat as a main subprocess log.
    base_name = head if marker else file_path.stem
    return str(file_path.parent / base_name)

259 

260 

261 

262 

263 

264