Coverage for openhcs/core/log_utils.py: 0.0%
120 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Core Log Utilities for OpenHCS
4Unified log discovery, classification, and monitoring utilities
5shared between TUI and PyQt GUI implementations.
6"""
8import logging
9import time
10from pathlib import Path
11from typing import Optional, List
12from dataclasses import dataclass
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def get_current_log_file_path() -> str:
    """
    Get the current log file path from the logging system.

    The root logger is searched first and the "openhcs" logger second; the
    ``baseFilename`` of the first ``logging.FileHandler`` found is returned.
    If neither logger has a file handler, a timestamped default path under
    ``~/.local/share/openhcs/logs`` is returned. The directory is created if
    it is missing; the log file itself is not created here.

    Returns:
        str: Path of the log file.

    Raises:
        RuntimeError: If the path cannot be determined. The triggering
            exception is attached as ``__cause__``.
    """
    try:
        # None selects the root logger; "openhcs" is the fallback. Loggers are
        # looked up lazily so the fallback is only touched when needed.
        for logger_name in (None, "openhcs"):
            for handler in logging.getLogger(logger_name).handlers:
                if isinstance(handler, logging.FileHandler):
                    return handler.baseFilename

        # Last resort: create a default path
        log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        return str(log_dir / f"openhcs_subprocess_{int(time.time())}.log")

    except Exception as e:
        logger.error(f"Failed to get current log file path: {e}")
        # Chain explicitly so the original traceback is reported as the cause.
        raise RuntimeError(f"Could not determine log file path: {e}") from e
@dataclass
class LogFileInfo:
    """Metadata describing one discovered log file."""
    path: Path
    # Classification label. Values produced in this module: "tui", "main",
    # "worker", "zmq_server", "zmq_worker", "napari" and "unknown".
    log_type: str
    worker_id: Optional[str] = None
    display_name: Optional[str] = None

    def __post_init__(self):
        """Fill in ``display_name`` when the caller left it empty."""
        if self.display_name:
            return

        kind = self.log_type
        if kind == "tui":
            label = "Main Process"
        elif kind == "main":
            label = "Main Subprocess"
        elif kind == "worker" and self.worker_id:
            label = f"Worker {self.worker_id}"
        else:
            # Every other case falls back to the bare file name.
            label = self.path.name
        self.display_name = label
def discover_logs(base_log_path: Optional[str] = None, include_main_log: bool = True,
                  log_directory: Optional[Path] = None) -> List[LogFileInfo]:
    """
    Discover OpenHCS log files and return as classified LogFileInfo objects.

    When ``base_log_path`` is given, only ``*.log`` files in its parent
    directory that ``is_relevant_log_file`` accepts are collected, and
    ``log_directory`` is not consulted. Otherwise every ``*.log`` file in
    ``log_directory`` that ``is_openhcs_log_file`` accepts is collected.
    Each path is reported at most once.

    Args:
        base_log_path: Base path for specific subprocess logs (optional)
        include_main_log: Whether to include the current main process log
        log_directory: Directory to search when no base_log_path is given
            (defaults to ~/.local/share/openhcs/logs)

    Returns:
        List of LogFileInfo objects for discovered log files
    """
    discovered_logs: List[LogFileInfo] = []
    # Paths already collected; a set keeps the duplicate check O(1) per file.
    seen_paths = set()

    def _record(log_file: Path, base: Optional[str]) -> None:
        """Classify log_file and append it unless it was already collected."""
        if log_file in seen_paths:
            return
        seen_paths.add(log_file)
        discovered_logs.append(classify_log_file(log_file, base, include_main_log))

    # Include current main process log if requested
    if include_main_log:
        try:
            main_log = Path(get_current_log_file_path())
            if main_log.exists():
                _record(main_log, base_log_path)
        except Exception:
            # Best effort: discovery continues without the main log, but the
            # reason is kept visible at debug level instead of being dropped.
            logger.debug("Main log not available, continuing without it", exc_info=True)

    if base_log_path:
        # Discover subprocess logs that live next to base_log_path
        log_dir = Path(base_log_path).parent
        if log_dir.exists():
            for log_file in log_dir.glob("*.log"):
                if is_relevant_log_file(log_file, base_log_path):
                    _record(log_file, base_log_path)
    else:
        # Discover all OpenHCS logs if no specific base_log_path
        if log_directory is None:
            log_directory = Path.home() / ".local" / "share" / "openhcs" / "logs"

        if log_directory.exists():
            for log_file in log_directory.glob("*.log"):
                if log_file in seen_paths or not is_openhcs_log_file(log_file):
                    continue
                # Infer base_log_path for proper classification
                inferred_base = infer_base_log_path(log_file) if 'subprocess_' in log_file.name else None
                _record(log_file, inferred_base)

    return discovered_logs
def classify_log_file(log_path: Path, base_log_path: Optional[str] = None, include_tui_log: bool = True) -> LogFileInfo:
    """
    Classify a log file and extract metadata from its file name.

    Checks run in this order and the first match wins: current TUI log,
    ZMQ server log, ZMQ worker log, Napari viewer log, then (only when
    base_log_path is given) main subprocess log and worker log. Anything
    else is classified as "unknown".

    Not a pure function: with include_tui_log=True it calls
    get_current_log_file_path(), and unrecognized names are logged at debug
    level.

    Args:
        log_path: Path to log file
        base_log_path: Base path for subprocess log files
        include_tui_log: Whether to check for TUI log classification

    Returns:
        LogFileInfo with classification and metadata
    """
    log_suffix = '.log'
    file_name = log_path.name
    # Strip only a trailing ".log"; a ".log" embedded elsewhere in the name
    # must stay part of the parsed fields.
    stem = file_name[:-len(log_suffix)] if file_name.endswith(log_suffix) else file_name

    # Check if it's the current TUI log
    if include_tui_log:
        try:
            tui_log_path = get_current_log_file_path()
            if log_path == Path(tui_log_path):
                return LogFileInfo(log_path, "tui", display_name="Main Process")
        except RuntimeError:
            pass  # TUI log not found, continue with other classification

    # Check for ZMQ server logs (openhcs_zmq_server_port_{port}_{timestamp}.log)
    zmq_server_prefix = 'openhcs_zmq_server_port_'
    if file_name.startswith(zmq_server_prefix):
        # str.split never returns an empty list, so test the field itself to
        # fall back to 'unknown' when the port is missing.
        port = stem[len(zmq_server_prefix):].split('_')[0] or 'unknown'
        return LogFileInfo(log_path, "zmq_server", display_name=f"ZMQ Server (port {port})")

    # Check for ZMQ worker logs ({exec_id}_worker_{pid}... after the prefix)
    zmq_worker_prefix = 'zmq_worker_exec_'
    if file_name.startswith(zmq_worker_prefix):
        parts = stem[len(zmq_worker_prefix):].split('_worker_')
        if len(parts) == 2:
            worker_pid = parts[1].split('_')[0]  # PID is first part after _worker_
            return LogFileInfo(log_path, "zmq_worker", worker_pid, display_name=f"ZMQ Worker {worker_pid}")
        # Malformed ZMQ worker names fall through to the checks below.

    # Check for Napari viewer logs
    napari_prefix = 'napari_detached_port_'
    if file_name.startswith(napari_prefix):
        port = stem[len(napari_prefix):]
        return LogFileInfo(log_path, "napari", display_name=f"Napari Viewer (port {port})")

    # Check subprocess logs if base_log_path is provided
    if base_log_path:
        base_name = Path(base_log_path).name

        # Check if it's the main subprocess log: exact match
        if file_name == f"{base_name}{log_suffix}":
            return LogFileInfo(log_path, "main", display_name="Main Subprocess")

        # Check if it's a worker log: {base_name}_worker_*.log
        worker_prefix = f"{base_name}_worker_"
        if file_name.startswith(worker_prefix) and file_name.endswith(log_suffix):
            # Worker ID is the first underscore-separated field after the prefix
            worker_id = stem[len(worker_prefix):].split('_')[0]
            return LogFileInfo(log_path, "worker", worker_id, display_name=f"Worker {worker_id}")

    # Unknown or malformed log file
    logger.debug(f"Unrecognized log file pattern: {file_name}")
    return LogFileInfo(log_path, "unknown")
def is_relevant_log_file(file_path: Path, base_log_path: Optional[str]) -> bool:
    """
    Decide whether a file belongs to the subprocess run named by base_log_path.

    Only file names are compared; the directory of file_path is not checked.

    Args:
        file_path: Path to file to check
        base_log_path: Base path for subprocess log files

    Returns:
        bool: True for "{base}.log" or "{base}_worker_*.log", where base is
        the final component of base_log_path; False otherwise, including
        when base_log_path is empty or None
    """
    if not base_log_path:
        return False

    stem = Path(base_log_path).name
    candidate = file_path.name

    is_main_log = candidate == f"{stem}.log"
    is_worker_log = candidate.startswith(f"{stem}_worker_") and candidate.endswith('.log')
    return is_main_log or is_worker_log
def is_openhcs_log_file(file_path: Path) -> bool:
    """
    Check if a file is an OpenHCS log file, judged by its name alone.

    Args:
        file_path: Path to file to check

    Returns:
        bool: True if the name ends with ".log" and starts with one of the
        known OpenHCS prefixes
    """
    # Known prefixes and the files they cover:
    # - openhcs_              openhcs_unified_*.log (main UI process),
    #                         openhcs_subprocess_*.log (subprocess runner),
    #                         openhcs_zmq_server_port_*.log (ZMQ execution server)
    # - pyqt_gui_subprocess_  PyQt subprocess runner
    # - zmq_worker_           zmq_worker_exec_*.log (ZMQ worker processes)
    # - napari_detached_      napari_detached_port_*.log (Napari viewer)
    known_prefixes = (
        'openhcs_',
        'pyqt_gui_subprocess_',
        'zmq_worker_',
        'napari_detached_',
    )

    name = file_path.name
    return name.endswith('.log') and name.startswith(known_prefixes)
def infer_base_log_path(file_path: Path) -> str:
    """
    Derive the base log path that a subprocess log file name belongs to.

    Args:
        file_path: Path to subprocess log file

    Returns:
        str: The file's directory joined with the base name. For worker logs
        the base name is everything before the first "_worker_"; otherwise
        it is the file name without its final extension.
    """
    head, marker, _ = file_path.name.partition('_worker_')
    # An empty marker means "_worker_" is absent: treat the file as a main
    # subprocess log and drop only the extension.
    base_name = head if marker else file_path.stem
    return str(file_path.parent / base_name)