Coverage for openhcs/core/log_utils.py: 0.0%
96 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Core Log Utilities for OpenHCS
4Unified log discovery, classification, and monitoring utilities
5shared between TUI and PyQt GUI implementations.
6"""
8import logging
9import re
10from pathlib import Path
11from typing import Set, Dict, Optional, List
12from dataclasses import dataclass
14from openhcs.textual_tui.widgets.plate_manager import get_current_log_file_path as _get_current_log_file_path
16logger = logging.getLogger(__name__)
@dataclass
class LogFileInfo:
    """Record describing one discovered log file.

    Fields:
        path: Location of the log file.
        log_type: Classification label for the file.
        worker_id: Worker identifier, when one is known.
        display_name: Human-readable label; derived in ``__post_init__``
            when left empty.
    """
    path: Path
    log_type: str  # "tui", "main", "worker", "unknown"
    worker_id: Optional[str] = None
    display_name: Optional[str] = None

    def __post_init__(self):
        """Derive ``display_name`` from the log type when none was supplied."""
        # A caller-provided (non-empty) name always wins.
        if self.display_name:
            return

        kind = self.log_type
        if kind == "tui":
            label = "Main Process"
        elif kind == "main":
            label = "Main Subprocess"
        elif kind == "worker" and self.worker_id:
            label = f"Worker {self.worker_id}"
        else:
            # Unknown types, and workers without an id, fall back to the file name.
            label = self.path.name
        self.display_name = label
def get_current_log_file_path() -> str:
    """
    Return the path of the log file currently in use.

    Delegates to the helper imported as ``_get_current_log_file_path`` and
    turns its ``None`` result into an exception so callers never have to
    handle a missing path themselves.

    Returns:
        str: Path to the current log file

    Raises:
        RuntimeError: If the helper reports no log file (returns None)
    """
    current_path = _get_current_log_file_path()
    if current_path is not None:
        return current_path
    raise RuntimeError("No file handler found in logging configuration")
def discover_logs(base_log_path: Optional[str] = None, include_main_log: bool = True,
                  log_directory: Optional[Path] = None) -> List[LogFileInfo]:
    """
    Discover OpenHCS log files and return them as classified LogFileInfo objects.

    Discovery runs in up to two stages:

    1. If ``include_main_log`` is true, the current main process log is added
       when it can be resolved and exists on disk. Any failure here is logged
       at debug level and otherwise ignored (best effort).
    2. If ``base_log_path`` is given, the directory containing it is scanned
       for ``*.log`` files that match that base (see ``is_relevant_log_file``).
       Otherwise ``log_directory`` is scanned for OpenHCS log files
       (see ``is_openhcs_log_file``).

    Each path appears at most once in the result.

    Args:
        base_log_path: Base path for specific subprocess logs (optional)
        include_main_log: Whether to include the current main process log.
            Also forwarded to ``classify_log_file`` as its TUI-log flag.
        log_directory: Directory to search when ``base_log_path`` is not
            given (defaults to ``~/.local/share/openhcs/logs``). Ignored
            when ``base_log_path`` is provided.

    Returns:
        List of LogFileInfo objects for discovered log files
    """
    discovered_logs: List[LogFileInfo] = []
    # Paths already recorded; a set keeps the duplicate check O(1) per file.
    seen_paths = set()

    def _add(log_file: Path, base: Optional[str]) -> None:
        """Classify ``log_file`` and record it unless its path is already present."""
        if log_file in seen_paths:
            return
        log_info = classify_log_file(log_file, base, include_main_log)
        seen_paths.add(log_file)
        discovered_logs.append(log_info)

    # Include current main process log if requested
    if include_main_log:
        try:
            main_log = Path(get_current_log_file_path())
            if main_log.exists():
                _add(main_log, base_log_path)
        except Exception:
            # Deliberately best-effort: a missing main log must not abort
            # discovery, but leave a trace so the failure can be diagnosed.
            logger.debug("Main log not available, continuing discovery", exc_info=True)

    if base_log_path:
        # Discover subprocess logs that sit next to the given base path
        log_dir = Path(base_log_path).parent
        if log_dir.exists():
            for log_file in log_dir.glob("*.log"):
                if is_relevant_log_file(log_file, base_log_path):
                    _add(log_file, base_log_path)
    else:
        # Discover all OpenHCS logs if no specific base_log_path
        if log_directory is None:
            log_directory = Path.home() / ".local" / "share" / "openhcs" / "logs"

        if log_directory.exists():
            for log_file in log_directory.glob("*.log"):
                if is_openhcs_log_file(log_file) and log_file not in seen_paths:
                    # Infer base_log_path for proper classification
                    inferred_base = infer_base_log_path(log_file) if 'subprocess_' in log_file.name else None
                    _add(log_file, inferred_base)

    return discovered_logs
def classify_log_file(log_path: Path, base_log_path: Optional[str] = None, include_tui_log: bool = True) -> LogFileInfo:
    """
    Classify a log file and extract its metadata.

    Classification order:
      1. ``"tui"``    - ``log_path`` equals the current log file path
                        (only checked when ``include_tui_log`` is true).
      2. ``"main"``   - file name is exactly ``{base_name}.log``.
      3. ``"worker"`` - file name is ``{base_name}_worker_<id>...log``; the
                        worker id is the text after ``_worker_`` up to the
                        next underscore or the ``.log`` suffix.
      4. ``"unknown"`` - anything else (a debug message is logged).

    Args:
        log_path: Path to log file
        base_log_path: Base path for subprocess log files; only its final
            path component is used for matching
        include_tui_log: Whether to check for TUI log classification

    Returns:
        LogFileInfo with classification and metadata
    """
    file_name = log_path.name

    # Step 1: is this the log the current process is writing to?
    if include_tui_log:
        try:
            current_log = Path(get_current_log_file_path())
        except RuntimeError:
            # No current log file could be determined; try the other patterns.
            current_log = None
        if current_log is not None and log_path == current_log:
            return LogFileInfo(log_path, "tui", display_name="Main Process")

    # Steps 2 and 3: subprocess patterns need a base name to match against.
    if base_log_path:
        base_name = Path(base_log_path).name

        if file_name == f"{base_name}.log":
            return LogFileInfo(log_path, "main", display_name="Main Subprocess")

        worker_prefix = f"{base_name}_worker_"
        if file_name.startswith(worker_prefix) and file_name.endswith('.log'):
            # Drop the prefix and the 4-character ".log" suffix, then keep
            # only the part before any further underscore.
            remainder = file_name[len(worker_prefix):-4]
            worker_id = remainder.partition('_')[0]
            return LogFileInfo(log_path, "worker", worker_id, display_name=f"Worker {worker_id}")

    # Step 4: nothing matched.
    logger.debug(f"Unrecognized log file pattern: {file_name}")
    return LogFileInfo(log_path, "unknown")
def is_relevant_log_file(file_path: Path, base_log_path: Optional[str]) -> bool:
    """
    Decide whether a file belongs to the log set rooted at ``base_log_path``.

    A file is relevant when its name is either ``{base_name}.log`` or
    ``{base_name}_worker_*.log``, where ``base_name`` is the final path
    component of ``base_log_path``. Only file names are compared; the
    directories are not.

    Args:
        file_path: Path to file to check
        base_log_path: Base path for subprocess log files

    Returns:
        bool: True if file is relevant for monitoring; always False when
        ``base_log_path`` is empty or None
    """
    if not base_log_path:
        return False

    base_name = Path(base_log_path).name
    candidate = file_path.name

    matches_main = candidate == f"{base_name}.log"
    matches_worker = candidate.startswith(f"{base_name}_worker_") and candidate.endswith('.log')
    return matches_main or matches_worker
def is_openhcs_log_file(file_path: Path) -> bool:
    """
    Tell whether a file name follows the OpenHCS log naming scheme.

    The name must end with ``.log``, start with ``openhcs_`` and contain
    either ``unified_`` or ``subprocess_``. Only the file name is inspected.

    Args:
        file_path: Path to file to check

    Returns:
        bool: True if file is an OpenHCS log file
    """
    name = file_path.name
    if not (name.endswith('.log') and name.startswith('openhcs_')):
        return False
    return any(marker in name for marker in ('unified_', 'subprocess_'))
def infer_base_log_path(file_path: Path) -> str:
    """
    Reconstruct the base log path that a subprocess log file was derived from.

    For a worker log, the base name is everything before the first
    ``_worker_`` in the file name; for any other file it is the file name
    without its final extension. The result keeps the file's directory.

    Args:
        file_path: Path to subprocess log file

    Returns:
        str: Inferred base log path
    """
    # partition() yields an empty separator when "_worker_" is absent.
    head, marker, _ = file_path.name.partition('_worker_')
    base_name = head if marker else file_path.stem
    return str(file_path.parent / base_name)