Coverage for openhcs/pyqt_gui/widgets/log_viewer.py: 0.0%
769 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2PyQt6 Log Viewer Window
4Provides comprehensive log viewing capabilities with real-time tailing, search functionality,
5and integration with OpenHCS subprocess execution. Reimplements log viewing using Qt widgets
6for native desktop integration.
7"""
9import logging
10from typing import Optional, List, Set, Tuple
11from pathlib import Path
13from PyQt6.QtWidgets import (
14 QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QComboBox,
15 QTextEdit, QToolBar, QLineEdit, QCheckBox, QPushButton, QDialog
16)
17from PyQt6.QtGui import QSyntaxHighlighter, QTextDocument
18from PyQt6.QtCore import QObject, QTimer, QFileSystemWatcher, pyqtSignal, pyqtSlot, Qt, QRegularExpression, QThread
19from PyQt6.QtGui import QTextCharFormat, QColor, QAction, QFont, QTextCursor
21from openhcs.io.filemanager import FileManager
22from openhcs.core.log_utils import LogFileInfo
23from openhcs.pyqt_gui.utils.log_detection_utils import (
24 get_current_tui_log_path, discover_logs, discover_all_logs
25)
26from openhcs.core.log_utils import (
27 classify_log_file, is_openhcs_log_file, infer_base_log_path
28)
29from openhcs.pyqt_gui.utils.process_tracker import (
30 ProcessTracker, extract_pid_from_log_filename, get_log_display_name, get_log_tooltip
31)
33# Import Pygments for advanced syntax highlighting
34from pygments import highlight
35from pygments.lexers import PythonLexer, get_lexer_by_name
36from pygments.formatters import get_formatter_by_name
37from pygments.token import Token
38from pygments.style import Style
39from pygments.styles import get_style_by_name
40from dataclasses import dataclass
41from typing import Dict, Tuple
43logger = logging.getLogger(__name__)
46@dataclass
47class LogColorScheme:
48 """
49 Centralized color scheme for log highlighting with semantic color names.
51 Supports light/dark theme variants and ensures WCAG accessibility compliance.
52 All colors meet minimum 4.5:1 contrast ratio for normal text readability.
53 """
55 # Log level colors with semantic meaning (WCAG 4.5:1 compliant)
56 log_critical_fg: Tuple[int, int, int] = (255, 255, 255) # White text
57 log_critical_bg: Tuple[int, int, int] = (139, 0, 0) # Dark red background
58 log_error_color: Tuple[int, int, int] = (255, 85, 85) # Brighter red - WCAG compliant
59 log_warning_color: Tuple[int, int, int] = (255, 140, 0) # Dark orange - attention grabbing
60 log_info_color: Tuple[int, int, int] = (100, 160, 210) # Brighter steel blue - WCAG compliant
61 log_debug_color: Tuple[int, int, int] = (160, 160, 160) # Lighter gray - better contrast
63 # Metadata and structural colors
64 timestamp_color: Tuple[int, int, int] = (105, 105, 105) # Dim gray - unobtrusive
65 logger_name_color: Tuple[int, int, int] = (147, 112, 219) # Medium slate blue - distinctive
66 memory_address_color: Tuple[int, int, int] = (255, 182, 193) # Light pink - technical data
67 file_path_color: Tuple[int, int, int] = (34, 139, 34) # Forest green - file system
69 # Python syntax colors (following VS Code dark theme conventions)
70 python_keyword_color: Tuple[int, int, int] = (86, 156, 214) # Blue - language keywords
71 python_string_color: Tuple[int, int, int] = (206, 145, 120) # Orange - string literals
72 python_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - numeric values
73 python_operator_color: Tuple[int, int, int] = (212, 212, 212) # Light gray - operators/punctuation
74 python_name_color: Tuple[int, int, int] = (156, 220, 254) # Light blue - identifiers
75 python_function_color: Tuple[int, int, int] = (220, 220, 170) # Yellow - function names
76 python_class_color: Tuple[int, int, int] = (78, 201, 176) # Teal - class names
77 python_builtin_color: Tuple[int, int, int] = (86, 156, 214) # Blue - built-in functions
78 python_comment_color: Tuple[int, int, int] = (106, 153, 85) # Green - comments
80 # Special highlighting colors
81 exception_color: Tuple[int, int, int] = (255, 69, 0) # Red orange - error types
82 function_call_color: Tuple[int, int, int] = (255, 215, 0) # Gold - function invocations
83 boolean_color: Tuple[int, int, int] = (86, 156, 214) # Blue - True/False/None
85 # Enhanced syntax colors (Phase 1 additions)
86 tuple_parentheses_color: Tuple[int, int, int] = (255, 215, 0) # Gold - tuple delimiters
87 set_braces_color: Tuple[int, int, int] = (255, 140, 0) # Dark orange - set delimiters
88 class_representation_color: Tuple[int, int, int] = (78, 201, 176) # Teal - <class 'name'>
89 function_representation_color: Tuple[int, int, int] = (220, 220, 170) # Yellow - <function name>
90 module_path_color: Tuple[int, int, int] = (147, 112, 219) # Medium slate blue - module.path
91 hex_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 0xFF
92 scientific_notation_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 1.23e-4
93 binary_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 0b1010
94 octal_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 0o755
95 python_special_color: Tuple[int, int, int] = (255, 20, 147) # Deep pink - __name__
96 single_quoted_string_color: Tuple[int, int, int] = (206, 145, 120) # Orange - 'string'
97 list_comprehension_color: Tuple[int, int, int] = (156, 220, 254) # Light blue - [x for x in y]
98 generator_expression_color: Tuple[int, int, int] = (156, 220, 254) # Light blue - (x for x in y)
100 @classmethod
101 def create_dark_theme(cls) -> 'LogColorScheme':
102 """
103 Create a dark theme variant with adjusted colors for dark backgrounds.
105 Returns:
106 LogColorScheme: Dark theme color scheme with higher contrast
107 """
108 return cls(
109 # Enhanced colors for dark backgrounds with better contrast
110 log_error_color=(255, 100, 100), # Brighter red
111 log_info_color=(120, 180, 230), # Brighter steel blue
112 timestamp_color=(160, 160, 160), # Lighter gray
113 python_string_color=(236, 175, 150), # Brighter orange
114 python_number_color=(200, 230, 190), # Brighter green
115 # Other colors remain the same as they work well on dark backgrounds
116 )
118 @classmethod
119 def create_light_theme(cls) -> 'LogColorScheme':
120 """
121 Create a light theme variant with adjusted colors for light backgrounds.
123 Returns:
124 LogColorScheme: Light theme color scheme with appropriate contrast
125 """
126 return cls(
127 # Darker colors for light backgrounds with WCAG compliance
128 log_error_color=(180, 20, 40), # Darker red
129 log_info_color=(30, 80, 130), # Darker steel blue
130 log_warning_color=(200, 100, 0), # Darker orange
131 timestamp_color=(60, 60, 60), # Darker gray
132 logger_name_color=(100, 60, 160), # Darker slate blue
133 python_string_color=(150, 80, 60), # Darker orange
134 python_number_color=(120, 140, 100), # Darker green
135 memory_address_color=(200, 120, 140), # Darker pink
136 file_path_color=(20, 100, 20), # Darker forest green
137 exception_color=(200, 40, 0), # Darker red orange
138 # Adjust other colors for light background contrast
139 )
141 def to_qcolor(self, color_tuple: Tuple[int, int, int]) -> QColor:
142 """
143 Convert RGB tuple to QColor object.
145 Args:
146 color_tuple: RGB color tuple (r, g, b)
148 Returns:
149 QColor: Qt color object
150 """
151 return QColor(*color_tuple)
154class LogFileDetector(QObject):
155 """
156 Detects new log files in directory using efficient file monitoring.
158 Uses QFileSystemWatcher to monitor directory changes and set operations
159 for efficient new file detection. Handles base_log_path as file prefix
160 and watches the parent directory.
161 """
163 # Signals
164 new_log_detected = pyqtSignal(object) # LogFileInfo object
165 _server_scan_complete = pyqtSignal(list) # List of LogFileInfo from server scan
167 def __init__(self, base_log_path: Optional[str] = None):
168 """
169 Initialize LogFileDetector.
171 Args:
172 base_log_path: Base path for subprocess log files (file prefix, not directory)
173 """
174 super().__init__()
175 self._base_log_path = base_log_path
176 self._previous_files: Set[Path] = set()
177 self._watcher = QFileSystemWatcher()
178 self._watcher.directoryChanged.connect(self._on_directory_changed)
179 self._watching_directory: Optional[Path] = None
181 logger.debug(f"LogFileDetector initialized with base_log_path: {base_log_path}")
183 def start_watching(self, directory: Path) -> None:
184 """
185 Start watching directory for new log files.
187 Args:
188 directory: Directory to watch for new log files
189 """
190 if not directory.exists():
191 logger.warning(f"Cannot watch non-existent directory: {directory}")
192 return
194 # Stop any existing watching
195 self.stop_watching()
197 # Add directory to watcher
198 success = self._watcher.addPath(str(directory))
199 if success:
200 self._watching_directory = directory
201 # Initialize previous files set
202 self._previous_files = self.scan_directory(directory)
203 logger.debug(f"Started watching directory: {directory}")
204 logger.debug(f"Initial file count: {len(self._previous_files)}")
205 else:
206 logger.error(f"Failed to add directory to watcher: {directory}")
208 def stop_watching(self) -> None:
209 """Stop file watching and cleanup."""
210 if self._watching_directory:
211 self._watcher.removePath(str(self._watching_directory))
212 self._watching_directory = None
213 self._previous_files.clear()
214 logger.debug("Stopped file watching")
216 def scan_directory(self, directory: Path) -> Set[Path]:
217 """
218 Scan directory for .log files.
220 Args:
221 directory: Directory to scan
223 Returns:
224 Set[Path]: Set of Path objects for .log files found
225 """
226 try:
227 log_files = set(directory.glob("*.log"))
228 logger.debug(f"Scanned directory {directory}: found {len(log_files)} .log files")
229 return log_files
230 except (FileNotFoundError, PermissionError) as e:
231 logger.warning(f"Error scanning directory {directory}: {e}")
232 return set()
234 def detect_new_files(self, current_files: Set[Path]) -> Set[Path]:
235 """
236 Use set.difference() to find new files efficiently.
238 Args:
239 current_files: Current set of files in directory
241 Returns:
242 Set[Path]: Set of newly discovered files
243 """
244 new_files = current_files.difference(self._previous_files)
245 if new_files:
246 logger.debug(f"Detected {len(new_files)} new files: {[f.name for f in new_files]}")
248 # Update previous files set
249 self._previous_files = current_files
250 return new_files
254 def _on_directory_changed(self, directory_path: str) -> None:
255 """
256 Handle QFileSystemWatcher directory change signal.
258 Args:
259 directory_path: Path of directory that changed
260 """
261 directory = Path(directory_path)
262 logger.debug(f"Directory changed: {directory}")
264 # Scan directory for current files
265 current_files = self.scan_directory(directory)
267 # Detect new files
268 new_files = self.detect_new_files(current_files)
270 # Process new files
271 for file_path in new_files:
272 if file_path.exists() and is_openhcs_log_file(file_path):
273 try:
274 # For general watching, try to infer base_log_path from the file name
275 effective_base_log_path = self._base_log_path
276 if not effective_base_log_path and 'subprocess_' in file_path.name:
277 effective_base_log_path = infer_base_log_path(file_path)
279 log_info = classify_log_file(file_path, effective_base_log_path,
280 include_tui_log=False)
282 logger.info(f"New relevant log file detected: {file_path} (type: {log_info.log_type})")
283 self.new_log_detected.emit(log_info)
284 except Exception as e:
285 logger.error(f"Error classifying new log file {file_path}: {e}")
288class LogHighlighter(QSyntaxHighlighter):
289 """
290 Advanced syntax highlighter for log files using Pygments.
292 Provides sophisticated highlighting for OpenHCS log format with support for:
293 - Log levels and timestamps
294 - Python code snippets and data structures
295 - Memory addresses and function signatures
296 - Complex nested dictionaries and lists
297 - Exception tracebacks and file paths
298 """
300 def __init__(self, parent: QTextDocument, color_scheme: LogColorScheme = None):
301 """
302 Initialize the log highlighter with optional color scheme.
304 Args:
305 parent: QTextDocument to apply highlighting to
306 color_scheme: Color scheme to use (defaults to dark theme)
307 """
308 super().__init__(parent)
309 self.color_scheme = color_scheme or LogColorScheme()
310 self.setup_pygments_styles()
311 self.setup_highlighting_rules()
313 def setup_pygments_styles(self) -> None:
314 """Setup Pygments token to QTextCharFormat mapping using color scheme."""
315 cs = self.color_scheme # Shorthand for readability
317 # Create a mapping from Pygments tokens to Qt text formats
318 self.token_formats = {
319 # Log levels with distinct colors and backgrounds
320 'log_critical': self._create_format(
321 cs.to_qcolor(cs.log_critical_fg),
322 cs.to_qcolor(cs.log_critical_bg),
323 bold=True
324 ),
325 'log_error': self._create_format(cs.to_qcolor(cs.log_error_color), bold=True),
326 'log_warning': self._create_format(cs.to_qcolor(cs.log_warning_color), bold=True),
327 'log_info': self._create_format(cs.to_qcolor(cs.log_info_color), bold=True),
328 'log_debug': self._create_format(cs.to_qcolor(cs.log_debug_color)),
330 # Timestamps and metadata
331 'timestamp': self._create_format(cs.to_qcolor(cs.timestamp_color)),
332 'logger_name': self._create_format(cs.to_qcolor(cs.logger_name_color), bold=True),
334 # Python syntax highlighting (for complex data structures)
335 Token.Keyword: self._create_format(cs.to_qcolor(cs.python_keyword_color), bold=True),
336 Token.String: self._create_format(cs.to_qcolor(cs.python_string_color)),
337 Token.String.Single: self._create_format(cs.to_qcolor(cs.python_string_color)),
338 Token.String.Double: self._create_format(cs.to_qcolor(cs.python_string_color)),
339 Token.Number: self._create_format(cs.to_qcolor(cs.python_number_color)),
340 Token.Number.Integer: self._create_format(cs.to_qcolor(cs.python_number_color)),
341 Token.Number.Float: self._create_format(cs.to_qcolor(cs.python_number_color)),
342 Token.Number.Hex: self._create_format(cs.to_qcolor(cs.python_number_color)),
343 Token.Number.Oct: self._create_format(cs.to_qcolor(cs.python_number_color)),
344 Token.Number.Bin: self._create_format(cs.to_qcolor(cs.python_number_color)),
345 Token.Operator: self._create_format(cs.to_qcolor(cs.python_operator_color)),
346 Token.Punctuation: self._create_format(cs.to_qcolor(cs.python_operator_color)),
347 Token.Name: self._create_format(cs.to_qcolor(cs.python_name_color)),
348 Token.Name.Function: self._create_format(cs.to_qcolor(cs.python_function_color), bold=True),
349 Token.Name.Class: self._create_format(cs.to_qcolor(cs.python_class_color), bold=True),
350 Token.Name.Builtin: self._create_format(cs.to_qcolor(cs.python_builtin_color)),
351 Token.Comment: self._create_format(cs.to_qcolor(cs.python_comment_color)),
352 Token.Literal: self._create_format(cs.to_qcolor(cs.python_number_color)),
354 # Special patterns for log content
355 'memory_address': self._create_format(cs.to_qcolor(cs.memory_address_color)),
356 'file_path': self._create_format(cs.to_qcolor(cs.file_path_color)),
357 'exception': self._create_format(cs.to_qcolor(cs.exception_color), bold=True),
358 'function_call': self._create_format(cs.to_qcolor(cs.function_call_color)),
359 'dict_key': self._create_format(cs.to_qcolor(cs.python_name_color)),
360 'boolean': self._create_format(cs.to_qcolor(cs.boolean_color), bold=True),
362 # Enhanced Python syntax elements (Phase 1)
363 'tuple_parentheses': self._create_format(cs.to_qcolor(cs.tuple_parentheses_color)),
364 'set_braces': self._create_format(cs.to_qcolor(cs.set_braces_color)),
365 'class_representation': self._create_format(cs.to_qcolor(cs.class_representation_color), bold=True),
366 'function_representation': self._create_format(cs.to_qcolor(cs.function_representation_color), bold=True),
367 'module_path': self._create_format(cs.to_qcolor(cs.module_path_color)),
368 'hex_number': self._create_format(cs.to_qcolor(cs.hex_number_color)),
369 'scientific_notation': self._create_format(cs.to_qcolor(cs.scientific_notation_color)),
370 'binary_number': self._create_format(cs.to_qcolor(cs.binary_number_color)),
371 'octal_number': self._create_format(cs.to_qcolor(cs.octal_number_color)),
372 'python_special': self._create_format(cs.to_qcolor(cs.python_special_color), bold=True),
373 'single_quoted_string': self._create_format(cs.to_qcolor(cs.single_quoted_string_color)),
374 'list_comprehension': self._create_format(cs.to_qcolor(cs.list_comprehension_color)),
375 'generator_expression': self._create_format(cs.to_qcolor(cs.generator_expression_color)),
376 }
378 def _create_format(self, fg_color: QColor, bg_color: QColor = None, bold: bool = False) -> QTextCharFormat:
379 """Create a QTextCharFormat with specified properties."""
380 format = QTextCharFormat()
381 format.setForeground(fg_color)
382 if bg_color:
383 format.setBackground(bg_color)
384 if bold:
385 format.setFontWeight(QFont.Weight.Bold)
386 return format
388 def setup_highlighting_rules(self) -> None:
389 """Setup regex patterns for log-specific highlighting."""
390 self.highlighting_rules = []
392 # Log level patterns (highest priority)
393 log_levels = [
394 ("CRITICAL", self.token_formats['log_critical']),
395 ("ERROR", self.token_formats['log_error']),
396 ("WARNING", self.token_formats['log_warning']),
397 ("INFO", self.token_formats['log_info']),
398 ("DEBUG", self.token_formats['log_debug']),
399 ]
401 for level, format in log_levels:
402 pattern = QRegularExpression(rf"\b{level}\b")
403 self.highlighting_rules.append((pattern, format))
405 # Timestamp pattern: YYYY-MM-DD HH:MM:SS,mmm
406 timestamp_pattern = QRegularExpression(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}")
407 self.highlighting_rules.append((timestamp_pattern, self.token_formats['timestamp']))
409 # Logger names (e.g., openhcs.core.orchestrator)
410 logger_pattern = QRegularExpression(r"openhcs\.[a-zA-Z0-9_.]+")
411 self.highlighting_rules.append((logger_pattern, self.token_formats['logger_name']))
413 # Memory addresses (e.g., 0x7f1640dd8e00)
414 memory_pattern = QRegularExpression(r"0x[0-9a-fA-F]+")
415 self.highlighting_rules.append((memory_pattern, self.token_formats['memory_address']))
417 # File paths in tracebacks
418 filepath_pattern = QRegularExpression(r'["\']?/[^"\'\s]+\.py["\']?')
419 self.highlighting_rules.append((filepath_pattern, self.token_formats['file_path']))
421 # Exception names
422 exception_pattern = QRegularExpression(r'\b[A-Z][a-zA-Z]*Error\b|\b[A-Z][a-zA-Z]*Exception\b')
423 self.highlighting_rules.append((exception_pattern, self.token_formats['exception']))
425 # Function calls with parentheses
426 function_pattern = QRegularExpression(r'\b[a-zA-Z_][a-zA-Z0-9_]*\(\)')
427 self.highlighting_rules.append((function_pattern, self.token_formats['function_call']))
429 # Boolean values
430 boolean_pattern = QRegularExpression(r'\b(True|False|None)\b')
431 self.highlighting_rules.append((boolean_pattern, self.token_formats['boolean']))
433 # Enhanced Python syntax elements
435 # Single-quoted strings (complement to double-quoted)
436 single_quote_pattern = QRegularExpression(r"'[^']*'")
437 self.highlighting_rules.append((single_quote_pattern, self.token_formats['single_quoted_string']))
439 # Class representations: <class 'module.ClassName'>
440 class_repr_pattern = QRegularExpression(r"<class '[^']*'>")
441 self.highlighting_rules.append((class_repr_pattern, self.token_formats['class_representation']))
443 # Function representations: <function name at 0xaddress>
444 function_repr_pattern = QRegularExpression(r"<function [^>]+ at 0x[0-9a-fA-F]+>")
445 self.highlighting_rules.append((function_repr_pattern, self.token_formats['function_representation']))
447 # Extended module paths (beyond just openhcs)
448 module_path_pattern = QRegularExpression(r"\b[a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*){2,}")
449 self.highlighting_rules.append((module_path_pattern, self.token_formats['module_path']))
451 # Hexadecimal numbers (beyond memory addresses): 0xFF, 0x1A2B
452 hex_number_pattern = QRegularExpression(r"\b0[xX][0-9a-fA-F]+\b")
453 self.highlighting_rules.append((hex_number_pattern, self.token_formats['hex_number']))
455 # Scientific notation: 1.23e-4, 5.67E+10
456 scientific_pattern = QRegularExpression(r"\b\d+\.?\d*[eE][+-]?\d+\b")
457 self.highlighting_rules.append((scientific_pattern, self.token_formats['scientific_notation']))
459 # Binary literals: 0b1010
460 binary_pattern = QRegularExpression(r"\b0[bB][01]+\b")
461 self.highlighting_rules.append((binary_pattern, self.token_formats['binary_number']))
463 # Octal literals: 0o755
464 octal_pattern = QRegularExpression(r"\b0[oO][0-7]+\b")
465 self.highlighting_rules.append((octal_pattern, self.token_formats['octal_number']))
467 # Python special constants: __name__, __main__, __file__, etc.
468 python_special_pattern = QRegularExpression(r"\b__[a-zA-Z_][a-zA-Z0-9_]*__\b")
469 self.highlighting_rules.append((python_special_pattern, self.token_formats['python_special']))
471 logger.debug(f"Setup {len(self.highlighting_rules)} highlighting rules")
473 def set_color_scheme(self, color_scheme: LogColorScheme) -> None:
474 """
475 Update the color scheme and refresh highlighting.
477 Args:
478 color_scheme: New color scheme to apply
479 """
480 self.color_scheme = color_scheme
481 self.setup_pygments_styles()
482 self.setup_highlighting_rules()
483 # Trigger re-highlighting of the entire document
484 self.rehighlight()
485 logger.debug(f"Applied new color scheme with {len(self.token_formats)} token formats")
487 def switch_to_dark_theme(self) -> None:
488 """Switch to dark theme color scheme."""
489 self.set_color_scheme(LogColorScheme.create_dark_theme())
491 def switch_to_light_theme(self) -> None:
492 """Switch to light theme color scheme."""
493 self.set_color_scheme(LogColorScheme.create_light_theme())
495 @classmethod
496 def load_color_scheme_from_config(cls, config_path: str = None) -> LogColorScheme:
497 """
498 Load color scheme from external configuration file.
500 Args:
501 config_path: Path to JSON/YAML config file (optional)
503 Returns:
504 LogColorScheme: Loaded color scheme or default if file not found
505 """
506 if config_path and Path(config_path).exists():
507 try:
508 import json
509 with open(config_path, 'r') as f:
510 config = json.load(f)
512 # Create color scheme from config
513 scheme_kwargs = {}
514 for key, value in config.items():
515 if key.endswith('_color') or key.endswith('_fg') or key.endswith('_bg'):
516 if isinstance(value, list) and len(value) == 3:
517 scheme_kwargs[key] = tuple(value)
519 return LogColorScheme(**scheme_kwargs)
521 except Exception as e:
522 logger.warning(f"Failed to load color scheme from {config_path}: {e}")
524 return LogColorScheme() # Return default scheme
526 def highlightBlock(self, text: str) -> None:
527 """
528 Apply highlighting to text block using regex patterns.
530 Uses regex patterns for log-specific content (timestamps, log levels, etc.).
531 Fast and doesn't block the UI.
532 """
533 # Apply log-specific patterns
534 for pattern, format in self.highlighting_rules:
535 iterator = pattern.globalMatch(text)
536 while iterator.hasNext():
537 match = iterator.next()
538 start = match.capturedStart()
539 length = match.capturedLength()
540 self.setFormat(start, length, format)
543class LogFileLoader(QThread):
544 """Background thread for loading large log files without blocking UI."""
546 # Signals
547 content_loaded = pyqtSignal(str) # Emits file content when loaded
548 load_failed = pyqtSignal(str) # Emits error message on failure
550 def __init__(self, log_path: Path):
551 super().__init__()
552 self.log_path = log_path
554 def run(self):
555 """Load file content in background thread."""
556 try:
557 with open(self.log_path, 'r', encoding='utf-8', errors='replace') as f:
558 content = f.read()
559 self.content_loaded.emit(content)
560 except Exception as e:
561 self.load_failed.emit(str(e))
564class LogViewerWindow(QMainWindow):
565 """Main log viewer window with dropdown, search, and real-time tailing."""
567 window_closed = pyqtSignal()
568 _subprocess_scan_complete = pyqtSignal(list) # Internal signal for async subprocess scan
569 _server_scan_complete = pyqtSignal(list) # Internal signal for async server scan
571 def __init__(self, file_manager: FileManager, service_adapter, parent=None):
572 super().__init__(parent)
573 self.file_manager = file_manager
574 self.service_adapter = service_adapter
576 # State
577 self.current_log_path: Optional[Path] = None
578 self.current_file_position: int = 0
579 self.auto_scroll_enabled: bool = True
580 self.tailing_paused: bool = False
582 # Search state
583 self.current_search_text: str = ""
584 self.search_highlights: List[QTextCursor] = []
586 # Components
587 self.log_selector: QComboBox = None
588 self.search_toolbar: QToolBar = None
589 self.log_display: QTextEdit = None
590 self.file_detector: LogFileDetector = None
591 self.tail_timer: QTimer = None
592 self.highlighter: LogHighlighter = None
593 self.file_loader: Optional[LogFileLoader] = None # Async file loader
594 self.server_scan_timer: QTimer = None # Periodic ZMQ server scanning
595 self._pending_log_to_load: Optional[Path] = None # Log to load when window is shown
597 # Process tracking for alive/dead process indication
598 self.process_tracker = ProcessTracker()
599 self.process_update_timer: QTimer = None
600 self.show_alive_only: bool = False # Filter to show only logs from running processes
602 # Master list of all discovered logs (single source of truth)
603 # Dropdown is a filtered VIEW of this list
604 self._all_discovered_logs: List[LogFileInfo] = []
606 # Track session start time to filter out old logs from previous sessions
607 # Use current process start time, not log viewer init time
608 self._session_start_time = self._get_process_start_time()
610 self.setup_ui()
611 self.setup_connections()
612 self.initialize_logs()
613 self.start_process_tracking()
615 def setup_ui(self) -> None:
616 """Setup complete UI layout with exact widget hierarchy."""
617 self.setWindowTitle("Log Viewer")
618 self.setMinimumSize(800, 600)
620 # Central widget with main layout
621 central_widget = QWidget()
622 self.setCentralWidget(central_widget)
623 main_layout = QVBoxLayout(central_widget)
625 # Log selector dropdown
626 self.log_selector = QComboBox()
627 self.log_selector.setMinimumHeight(30)
628 main_layout.addWidget(self.log_selector)
630 # Search toolbar (initially hidden)
631 self.search_toolbar = QToolBar("Search")
632 self.search_toolbar.setVisible(False)
634 # Search input
635 self.search_input = QLineEdit()
636 self.search_input.setPlaceholderText("Search logs...")
637 self.search_toolbar.addWidget(self.search_input)
639 # Search options
640 self.case_sensitive_cb = QCheckBox("Case sensitive")
641 self.search_toolbar.addWidget(self.case_sensitive_cb)
643 self.regex_cb = QCheckBox("Regex")
644 self.search_toolbar.addWidget(self.regex_cb)
646 # Search navigation buttons
647 self.prev_button = QPushButton("Previous")
648 self.next_button = QPushButton("Next")
649 self.close_search_button = QPushButton("Close")
651 self.search_toolbar.addWidget(self.prev_button)
652 self.search_toolbar.addWidget(self.next_button)
653 self.search_toolbar.addWidget(self.close_search_button)
655 main_layout.addWidget(self.search_toolbar)
657 # Log display area
658 self.log_display = QTextEdit()
659 self.log_display.setReadOnly(True)
660 self.log_display.setFont(QFont("Consolas", 10)) # Monospace font for logs
661 main_layout.addWidget(self.log_display)
663 # Control buttons layout
664 control_layout = QHBoxLayout()
666 self.auto_scroll_btn = QPushButton("Auto-scroll")
667 self.auto_scroll_btn.setCheckable(True)
668 self.auto_scroll_btn.setChecked(True)
670 self.pause_btn = QPushButton("Pause")
671 self.pause_btn.setCheckable(True)
673 self.clear_btn = QPushButton("Clear")
674 self.bottom_btn = QPushButton("Bottom")
676 # Process filter checkbox
677 self.show_alive_only_cb = QCheckBox("Show only running processes")
678 self.show_alive_only_cb.setToolTip("Filter logs to show only those from currently running processes")
680 control_layout.addWidget(self.auto_scroll_btn)
681 control_layout.addWidget(self.pause_btn)
682 control_layout.addWidget(self.clear_btn)
683 control_layout.addWidget(self.bottom_btn)
684 control_layout.addWidget(self.show_alive_only_cb)
685 control_layout.addStretch() # Push buttons to left
687 main_layout.addLayout(control_layout)
689 # Setup syntax highlighting
690 self.highlighter = LogHighlighter(self.log_display.document())
692 # Setup window-local Ctrl+F shortcut
693 search_action = QAction("Search", self)
694 search_action.setShortcut("Ctrl+F")
695 search_action.triggered.connect(self.toggle_search_toolbar)
696 self.addAction(search_action)
698 logger.debug("LogViewerWindow UI setup complete")
700 def setup_connections(self) -> None:
701 """Setup signal/slot connections."""
702 # Log selector
703 self.log_selector.currentIndexChanged.connect(self.on_log_selection_changed)
705 # Search functionality
706 self.search_input.returnPressed.connect(self.perform_search)
707 self.prev_button.clicked.connect(self.find_previous)
708 self.next_button.clicked.connect(self.find_next)
709 self.close_search_button.clicked.connect(self.toggle_search_toolbar)
711 # Control buttons
712 self.auto_scroll_btn.toggled.connect(self.toggle_auto_scroll)
713 self.pause_btn.toggled.connect(self.toggle_pause_tailing)
714 self.clear_btn.clicked.connect(self.clear_log_display)
715 self.bottom_btn.clicked.connect(self.scroll_to_bottom)
716 self.show_alive_only_cb.stateChanged.connect(self.on_filter_changed)
718 # Internal signals
719 self._subprocess_scan_complete.connect(self._on_subprocess_scan_complete)
720 self._server_scan_complete.connect(self._on_server_scan_complete)
722 logger.debug("LogViewerWindow connections setup complete")
724 def showEvent(self, event):
725 """Override showEvent to load log when window is first shown."""
726 super().showEvent(event)
728 # Load pending log on first show
729 if self._pending_log_to_load:
730 self.switch_to_log(self._pending_log_to_load)
731 self._pending_log_to_load = None
733 def initialize_logs(self) -> None:
734 """Initialize with main log only, then scan for subprocess logs in background."""
735 # Only discover the main log initially (fast startup)
736 initial_logs = []
737 try:
738 from openhcs.core.log_utils import get_current_log_file_path, classify_log_file
739 from pathlib import Path
741 main_log_path = get_current_log_file_path()
742 main_log = Path(main_log_path)
743 if main_log.exists():
744 log_info = classify_log_file(main_log, None, True)
745 initial_logs.append(log_info)
746 logger.debug("Discovered main log")
747 except Exception as e:
748 logger.warning(f"Error discovering main log: {e}")
749 # Continue without main log
750 pass
752 # Store main log in master list
753 self._all_discovered_logs = initial_logs.copy()
755 # Populate dropdown with main log immediately (fast)
756 if initial_logs:
757 self.populate_log_dropdown(initial_logs)
758 # Store first log to load when window is shown (defer loading)
759 self._pending_log_to_load = initial_logs[0].path
761 # Start monitoring for new logs
762 self.start_monitoring()
764 # Scan for existing subprocess logs in background (async - doesn't block)
765 self._scan_subprocess_logs_async()
767 # Scan for servers in background (async - doesn't block)
768 self._scan_servers_async()
770 def _scan_subprocess_logs_async(self) -> None:
771 """Scan for existing subprocess logs in background thread (non-blocking)."""
772 import threading
774 def scan_and_update():
775 """Background thread function."""
776 subprocess_logs = self._scan_for_subprocess_logs()
777 # Emit signal to update UI on main thread
778 self._subprocess_scan_complete.emit(subprocess_logs)
780 thread = threading.Thread(target=scan_and_update, daemon=True)
781 thread.start()
782 logger.debug("Started async subprocess log scan in background")
784 def _scan_servers_async(self) -> None:
785 """Scan for ZMQ servers in background thread (non-blocking)."""
786 import threading
788 def scan_and_update():
789 """Background thread function."""
790 server_logs = self._scan_for_server_logs()
791 # Emit signal to update UI on main thread
792 self._server_scan_complete.emit(server_logs)
794 thread = threading.Thread(target=scan_and_update, daemon=True)
795 thread.start()
796 logger.debug("Started async server scan in background")
798 @pyqtSlot(list)
799 def _on_subprocess_scan_complete(self, subprocess_logs: List[LogFileInfo]) -> None:
800 """Handle subprocess scan completion on UI thread."""
801 if not subprocess_logs:
802 logger.debug("No subprocess logs found during scan")
803 return
805 # Add subprocess logs to master list (avoid duplicates by path)
806 existing_paths = {log.path for log in self._all_discovered_logs}
807 new_logs_added = 0
808 for subprocess_log in subprocess_logs:
809 if subprocess_log.path not in existing_paths:
810 self._all_discovered_logs.append(subprocess_log)
811 new_logs_added += 1
813 # Repopulate dropdown from master list
814 self.populate_log_dropdown(self._all_discovered_logs)
816 logger.info(f"Added {new_logs_added} subprocess logs from current session "
817 f"(scanned {len(subprocess_logs)} total)")
819 @pyqtSlot(list)
820 def _on_server_scan_complete(self, server_logs: List[LogFileInfo]) -> None:
821 """Handle server scan completion on UI thread."""
822 if not server_logs:
823 logger.debug("No server logs found during scan")
824 return
826 # Add server logs to master list (avoid duplicates by path)
827 existing_paths = {log.path for log in self._all_discovered_logs}
828 new_logs_added = 0
829 for server_log in server_logs:
830 if server_log.path not in existing_paths:
831 self._all_discovered_logs.append(server_log)
832 new_logs_added += 1
834 # Repopulate dropdown from master list
835 self.populate_log_dropdown(self._all_discovered_logs)
837 logger.info(f"Added {new_logs_added} new server logs to dropdown (scanned {len(server_logs)} total)")
839 def _scan_for_subprocess_logs(self) -> List[LogFileInfo]:
840 """
841 Efficiently scan log directory for subprocess logs from current session.
842 Uses os.scandir() and filters by mtime FIRST before parsing.
843 Returns list of LogFileInfo for discovered subprocess log files.
844 """
845 from openhcs.core.log_utils import classify_log_file, is_openhcs_log_file
846 from pathlib import Path
847 import os
849 logger.debug("Scanning for subprocess logs from current session...")
851 try:
852 # Get log directory
853 log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs"
855 if not log_dir.exists():
856 return []
858 # Use os.scandir() for efficiency - it's faster than glob and gives us stat info
859 session_logs = []
860 total_scanned = 0
861 filtered_by_time = 0
863 # Calculate cutoff time (session start - 5 second buffer)
864 cutoff_time = self._session_start_time - 5.0
866 # Scan directory efficiently
867 with os.scandir(log_dir) as entries:
868 for entry in entries:
869 total_scanned += 1
871 # Skip non-.log files immediately
872 if not entry.name.endswith('.log'):
873 continue
875 # Filter by mtime FIRST (cheap filesystem check)
876 # This avoids parsing thousands of old log files
877 try:
878 stat_info = entry.stat()
879 if stat_info.st_mtime < cutoff_time:
880 filtered_by_time += 1
881 continue
882 except OSError:
883 continue
885 # Now check if it's an OpenHCS log (still just filename check, no file I/O)
886 log_path = Path(entry.path)
887 if not is_openhcs_log_file(log_path):
888 continue
890 # Finally, classify it (this is the expensive part, but we only do it for recent files)
891 try:
892 log_info = classify_log_file(log_path, None, include_tui_log=False)
893 session_logs.append(log_info)
894 except Exception as e:
895 logger.debug(f"Failed to classify {log_path}: {e}")
897 logger.info(f"Found {len(session_logs)} subprocess logs from current session "
898 f"(scanned {total_scanned} files, filtered {filtered_by_time} by time)")
900 return session_logs
901 except Exception as e:
902 logger.warning(f"Error scanning for subprocess logs: {e}")
903 return []
905 def _scan_for_server_logs(self) -> List[LogFileInfo]:
906 """
907 Scan for running ZMQ servers and Napari viewers by pinging common ports.
908 Returns list of LogFileInfo for discovered server log files.
909 """
910 from openhcs.core.log_utils import classify_log_file
911 from pathlib import Path
912 import zmq
913 import pickle
915 logger.debug("Scanning for running ZMQ/streaming servers...")
916 discovered_logs = []
918 # Scan all streaming ports using current global config
919 # This ensures we find viewers launched with custom ports
920 from openhcs.core.config import get_all_streaming_ports
921 ports_to_scan = get_all_streaming_ports(num_ports_per_type=10) # Uses global config by default
923 def ping_server(port: int) -> dict:
924 """Ping a server and return pong response, or None if no response."""
925 from openhcs.constants.constants import CONTROL_PORT_OFFSET
926 from openhcs.runtime.zmq_base import get_zmq_transport_url, get_default_transport_mode
928 control_port = port + CONTROL_PORT_OFFSET
929 try:
930 context = zmq.Context()
931 socket = context.socket(zmq.REQ)
932 socket.setsockopt(zmq.LINGER, 0)
933 socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout (servers may be busy)
935 # Use transport mode-aware URL (IPC or TCP)
936 transport_mode = get_default_transport_mode()
937 control_url = get_zmq_transport_url(control_port, transport_mode, 'localhost')
938 socket.connect(control_url)
940 # Send ping
941 socket.send(pickle.dumps({'type': 'ping'}))
943 # Wait for pong
944 response = socket.recv()
945 pong = pickle.loads(response)
947 socket.close()
948 context.term()
949 logger.debug(f"Port {port} responded: {pong}")
950 return pong
951 except Exception as e:
952 logger.debug(f"Port {port} no response: {e}")
953 return None
955 # Scan all ports (execution server + all streaming types)
956 for port in ports_to_scan:
957 pong = ping_server(port)
958 if pong and pong.get('log_file_path'):
959 log_path = Path(pong['log_file_path'])
960 if log_path.exists():
961 log_info = classify_log_file(log_path, None, False)
962 discovered_logs.append(log_info)
963 viewer_type = pong.get('viewer', 'ZMQ server')
964 logger.debug(f"Discovered {viewer_type} log: {log_path}")
966 return discovered_logs
968 # Dropdown Management Methods
969 def populate_log_dropdown(self, log_files: List[LogFileInfo]) -> None:
970 """
971 Populate QComboBox with log files with process status indicators.
973 Args:
974 log_files: List of LogFileInfo objects to add to dropdown
975 """
976 self.log_selector.clear()
978 # Sort logs: TUI first, main subprocess, then workers by timestamp
979 sorted_logs = sorted(log_files, key=self._log_sort_key)
981 # Filter if "show alive only" is enabled
982 if self.show_alive_only:
983 sorted_logs = [
984 log_info for log_info in sorted_logs
985 if self._is_log_from_alive_process(log_info)
986 ]
988 for log_info in sorted_logs:
989 # Add process status indicator to display name
990 display_name = get_log_display_name(log_info.path, self.process_tracker)
991 tooltip = get_log_tooltip(log_info.path, self.process_tracker)
993 self.log_selector.addItem(display_name, log_info)
994 # Set tooltip for the item
995 self.log_selector.setItemData(self.log_selector.count() - 1, tooltip, Qt.ItemDataRole.ToolTipRole)
997 logger.debug(f"Populated dropdown with {len(sorted_logs)} log files (filtered: {self.show_alive_only})")
999 def _log_sort_key(self, log_info: LogFileInfo) -> tuple:
1000 """
1001 Generate sort key for log files.
1003 Args:
1004 log_info: LogFileInfo to generate sort key for
1006 Returns:
1007 tuple: Sort key (priority, timestamp)
1008 """
1009 # Priority: TUI=0, main=1, worker=2, unknown=3
1010 priority_map = {"tui": 0, "main": 1, "worker": 2, "unknown": 3}
1011 priority = priority_map.get(log_info.log_type, 3)
1013 # Use file modification time as secondary sort
1014 try:
1015 timestamp = log_info.path.stat().st_mtime
1016 except (OSError, AttributeError):
1017 timestamp = 0
1019 return (priority, -timestamp) # Negative timestamp for newest first
1021 def clear_subprocess_logs(self) -> None:
1022 """Remove all non-TUI logs from dropdown and switch to TUI log."""
1023 import traceback
1024 logger.error(f"🔥 DEBUG: clear_subprocess_logs called! Stack trace:")
1025 for line in traceback.format_stack():
1026 logger.error(f"🔥 DEBUG: {line.strip()}")
1028 current_logs = []
1030 # Collect TUI logs only
1031 for i in range(self.log_selector.count()):
1032 log_info = self.log_selector.itemData(i)
1033 if log_info and log_info.log_type == "tui":
1034 current_logs.append(log_info)
1036 # Repopulate with TUI logs only
1037 self.populate_log_dropdown(current_logs)
1039 # Auto-select TUI log if available
1040 if current_logs:
1041 self.switch_to_log(current_logs[0].path)
1043 logger.info("Cleared subprocess logs, kept TUI logs")
1045 def add_new_log(self, log_file_info: LogFileInfo) -> None:
1046 """
1047 Add new log to dropdown maintaining sort order.
1049 Args:
1050 log_file_info: New LogFileInfo to add
1051 """
1052 # Add to master list (avoid duplicates by path)
1053 existing_paths = {log.path for log in self._all_discovered_logs}
1054 if log_file_info.path not in existing_paths:
1055 self._all_discovered_logs.append(log_file_info)
1057 # Repopulate dropdown from master list
1058 self.populate_log_dropdown(self._all_discovered_logs)
1060 logger.info(f"Added new log to dropdown: {log_file_info.display_name}")
1061 else:
1062 logger.debug(f"Log already exists, skipping: {log_file_info.display_name}")
1064 def on_log_selection_changed(self, index: int) -> None:
1065 """
1066 Handle dropdown selection change - switch log display.
1068 Args:
1069 index: Selected index in dropdown
1070 """
1071 if index >= 0:
1072 log_info = self.log_selector.itemData(index)
1073 if log_info:
1074 self.switch_to_log(log_info.path)
1076 def switch_to_log(self, log_path: Path) -> None:
1077 """
1078 Switch log display to show specified log file.
1080 Args:
1081 log_path: Path to log file to display
1082 """
1083 try:
1084 # Stop current tailing
1085 if self.tail_timer and self.tail_timer.isActive():
1086 self.tail_timer.stop()
1088 # Stop any existing file loader
1089 if self.file_loader and self.file_loader.isRunning():
1090 self.file_loader.wait()
1092 # Validate file exists
1093 if not log_path.exists():
1094 self.log_display.setText(f"Log file not found: {log_path}")
1095 return
1097 # Store path for later use
1098 self.current_log_path = log_path
1100 # ALWAYS use async loading to prevent UI blocking
1101 # QSyntaxHighlighter is already lazy - it only highlights visible blocks
1102 file_size = log_path.stat().st_size
1103 logger.debug(f"Loading log file ({file_size} bytes) asynchronously")
1104 self.log_display.setText(f"Loading log file ({file_size // 1024} KB)...")
1106 # Create and start async loader
1107 self.file_loader = LogFileLoader(log_path)
1108 self.file_loader.content_loaded.connect(self._on_file_loaded)
1109 self.file_loader.load_failed.connect(self._on_file_load_failed)
1110 self.file_loader.start()
1112 except Exception as e:
1113 logger.error(f"Error switching to log {log_path}: {e}")
1114 raise
1116 def _on_file_loaded(self, content: str) -> None:
1117 """Handle file content loaded (either sync or async)."""
1118 try:
1119 # Set content - QSyntaxHighlighter only processes visible blocks automatically
1120 self.log_display.setText(content)
1122 # Update file position
1123 self.current_file_position = len(content.encode('utf-8'))
1125 # Start tailing if not paused
1126 if not self.tailing_paused and self.current_log_path:
1127 self.start_log_tailing(self.current_log_path)
1129 # Scroll to bottom if auto-scroll enabled
1130 if self.auto_scroll_enabled:
1131 self.scroll_to_bottom()
1133 logger.info(f"Loaded log file: {self.current_log_path}")
1135 except Exception as e:
1136 logger.error(f"Error displaying loaded content: {e}")
1138 def _on_file_load_failed(self, error_msg: str) -> None:
1139 """Handle file load failure."""
1140 self.log_display.setText(f"Failed to load log file: {error_msg}")
1141 logger.error(f"Failed to load log file: {error_msg}")
1143 # Search Functionality Methods
1144 def toggle_search_toolbar(self) -> None:
1145 """Show/hide search toolbar (Ctrl+F handler)."""
1146 if self.search_toolbar.isVisible():
1147 # Hide toolbar and clear highlights
1148 self.search_toolbar.setVisible(False)
1149 self.clear_search_highlights()
1150 else:
1151 # Show toolbar and focus search input
1152 self.search_toolbar.setVisible(True)
1153 self.search_input.setFocus()
1154 self.search_input.selectAll()
1156 def perform_search(self) -> None:
1157 """Search in log display using QTextEdit.find()."""
1158 search_text = self.search_input.text()
1159 if not search_text:
1160 self.clear_search_highlights()
1161 return
1163 # Clear previous highlights if search text changed
1164 if search_text != self.current_search_text:
1165 self.clear_search_highlights()
1166 self.current_search_text = search_text
1167 self.highlight_all_matches(search_text)
1169 # Find next occurrence
1170 flags = QTextDocument.FindFlag(0)
1171 if self.case_sensitive_cb.isChecked():
1172 flags |= QTextDocument.FindFlag.FindCaseSensitively
1174 found = self.log_display.find(search_text, flags)
1175 if not found:
1176 # Try from beginning
1177 cursor = self.log_display.textCursor()
1178 cursor.movePosition(cursor.MoveOperation.Start)
1179 self.log_display.setTextCursor(cursor)
1180 self.log_display.find(search_text, flags)
1182 def highlight_all_matches(self, search_text: str) -> None:
1183 """
1184 Highlight all matches of search text in the document.
1186 Args:
1187 search_text: Text to search and highlight
1188 """
1189 if not search_text:
1190 return
1192 # Create highlight format
1193 highlight_format = QTextCharFormat()
1194 highlight_format.setBackground(QColor(255, 255, 0, 100)) # Yellow with transparency
1196 # Search through entire document
1197 document = self.log_display.document()
1198 cursor = QTextCursor(document)
1200 flags = QTextDocument.FindFlag(0)
1201 if self.case_sensitive_cb.isChecked():
1202 flags |= QTextDocument.FindFlag.FindCaseSensitively
1204 self.search_highlights.clear()
1206 while True:
1207 cursor = document.find(search_text, cursor, flags)
1208 if cursor.isNull():
1209 break
1211 # Apply highlight
1212 cursor.mergeCharFormat(highlight_format)
1213 self.search_highlights.append(cursor)
1215 logger.debug(f"Highlighted {len(self.search_highlights)} search matches")
1217 def clear_search_highlights(self) -> None:
1218 """Clear all search highlights from the document."""
1219 # Reset format for all highlighted text
1220 for cursor in self.search_highlights:
1221 if not cursor.isNull():
1222 # Reset to default format
1223 default_format = QTextCharFormat()
1224 cursor.setCharFormat(default_format)
1226 self.search_highlights.clear()
1227 self.current_search_text = ""
1229 def find_next(self) -> None:
1230 """Find next search result."""
1231 self.perform_search()
1233 def find_previous(self) -> None:
1234 """Find previous search result."""
1235 search_text = self.search_input.text()
1236 if not search_text:
1237 return
1239 flags = QTextDocument.FindFlag.FindBackward
1240 if self.case_sensitive_cb.isChecked():
1241 flags |= QTextDocument.FindFlag.FindCaseSensitively
1243 found = self.log_display.find(search_text, flags)
1244 if not found:
1245 # Try from end
1246 cursor = self.log_display.textCursor()
1247 cursor.movePosition(cursor.MoveOperation.End)
1248 self.log_display.setTextCursor(cursor)
1249 self.log_display.find(search_text, flags)
1251 # Control Button Methods
1252 def toggle_auto_scroll(self, enabled: bool) -> None:
1253 """Toggle auto-scroll to bottom."""
1254 self.auto_scroll_enabled = enabled
1255 logger.debug(f"Auto-scroll {'enabled' if enabled else 'disabled'}")
1257 def toggle_pause_tailing(self, paused: bool) -> None:
1258 """Toggle pause/resume log tailing."""
1259 self.tailing_paused = paused
1260 if paused and self.tail_timer:
1261 self.tail_timer.stop()
1262 elif not paused and self.current_log_path:
1263 self.start_log_tailing(self.current_log_path)
1264 logger.debug(f"Log tailing {'paused' if paused else 'resumed'}")
1266 def clear_log_display(self) -> None:
1267 """Clear current log display content."""
1268 self.log_display.clear()
1269 logger.debug("Log display cleared")
1271 def scroll_to_bottom(self) -> None:
1272 """Scroll log display to bottom."""
1273 scrollbar = self.log_display.verticalScrollBar()
1274 scrollbar.setValue(scrollbar.maximum())
1278 # Real-time Tailing Methods
1279 def start_log_tailing(self, log_path: Path) -> None:
1280 """
1281 Start tailing log file with QTimer (100ms interval).
1283 Args:
1284 log_path: Path to log file to tail
1285 """
1286 # Stop any existing timer
1287 if self.tail_timer:
1288 self.tail_timer.stop()
1290 # Create new timer
1291 self.tail_timer = QTimer()
1292 self.tail_timer.timeout.connect(self.read_log_incremental)
1293 self.tail_timer.start(100) # 100ms interval
1295 logger.debug(f"Started tailing log file: {log_path}")
1297 def stop_log_tailing(self) -> None:
1298 """Stop current log tailing."""
1299 if self.tail_timer:
1300 self.tail_timer.stop()
1301 self.tail_timer = None
1302 logger.debug("Stopped log tailing")
1304 def read_log_incremental(self) -> None:
1305 """Read new content from current log file (track file position)."""
1306 if not self.current_log_path or not self.current_log_path.exists():
1307 return
1309 try:
1310 # Get current file size
1311 current_size = self.current_log_path.stat().st_size
1313 # Handle log rotation (file size decreased)
1314 if current_size < self.current_file_position:
1315 logger.info(f"Log rotation detected for {self.current_log_path}")
1316 self.current_file_position = 0
1317 # Optionally clear display or add rotation marker
1318 self.log_display.append("\n--- Log rotated ---\n")
1320 # Read new content if file grew
1321 if current_size > self.current_file_position:
1322 with open(self.current_log_path, 'rb') as f:
1323 f.seek(self.current_file_position)
1324 new_data = f.read(current_size - self.current_file_position)
1326 # Decode new content
1327 try:
1328 new_content = new_data.decode('utf-8', errors='replace')
1329 except UnicodeDecodeError:
1330 new_content = new_data.decode('latin-1', errors='replace')
1332 if new_content:
1333 # Check if user has scrolled up (disable auto-scroll)
1334 scrollbar = self.log_display.verticalScrollBar()
1335 was_at_bottom = scrollbar.value() >= scrollbar.maximum() - 10
1337 # Append new content
1338 cursor = self.log_display.textCursor()
1339 cursor.movePosition(cursor.MoveOperation.End)
1340 cursor.insertText(new_content)
1342 # Auto-scroll if enabled and user was at bottom
1343 if self.auto_scroll_enabled and was_at_bottom:
1344 self.scroll_to_bottom()
1346 # Update file position
1347 self.current_file_position = current_size
1349 except (OSError, PermissionError) as e:
1350 logger.warning(f"Error reading log file {self.current_log_path}: {e}")
1351 # Handle file deletion/recreation
1352 if not self.current_log_path.exists():
1353 logger.info(f"Log file deleted: {self.current_log_path}")
1354 self.log_display.append(f"\n--- Log file deleted: {self.current_log_path} ---\n")
1355 # Try to reconnect after a delay
1356 QTimer.singleShot(1000, self._attempt_reconnection)
1357 except Exception as e:
1358 logger.error(f"Unexpected error in log tailing: {e}")
1359 raise
1361 def _attempt_reconnection(self) -> None:
1362 """Attempt to reconnect to log file after deletion."""
1363 if self.current_log_path and self.current_log_path.exists():
1364 logger.info(f"Log file recreated, reconnecting: {self.current_log_path}")
1365 self.current_file_position = 0
1366 self.log_display.append(f"\n--- Reconnected to: {self.current_log_path} ---\n")
1367 # File will be read on next timer tick
1369 # External Integration Methods
1370 def start_monitoring(self, base_log_path: Optional[str] = None) -> None:
1371 """Start monitoring for new logs."""
1372 if self.file_detector:
1373 self.file_detector.stop_watching()
1375 # Get log directory
1376 log_directory = Path(base_log_path).parent if base_log_path else Path.home() / ".local" / "share" / "openhcs" / "logs"
1378 # Start file watching
1379 self.file_detector = LogFileDetector(base_log_path)
1380 self.file_detector.new_log_detected.connect(self.add_new_log)
1381 self.file_detector.start_watching(log_directory)
1383 def stop_monitoring(self) -> None:
1384 """Stop monitoring for new logs."""
1385 if self.file_detector:
1386 self.file_detector.stop_watching()
1387 self.file_detector = None
1388 logger.info("Stopped monitoring for new logs")
1390 def start_process_tracking(self) -> None:
1391 """Start periodic process status updates."""
1392 # Initial update
1393 self.process_tracker.update()
1395 # Setup timer for periodic updates (every 2 seconds)
1396 self.process_update_timer = QTimer()
1397 self.process_update_timer.timeout.connect(self.update_process_status)
1398 self.process_update_timer.start(2000) # 2 second interval
1400 logger.debug("Started process tracking")
1402 def update_process_status(self) -> None:
1403 """Update process status and refresh dropdown if needed."""
1404 # Update process tracker
1405 self.process_tracker.update()
1407 # Refresh dropdown to update status indicators
1408 # Only if we have logs loaded
1409 if self.log_selector.count() > 0:
1410 # Remember current selection
1411 current_index = self.log_selector.currentIndex()
1412 current_log_info = self.log_selector.itemData(current_index) if current_index >= 0 else None
1414 # Temporarily disconnect signal to avoid triggering reload
1415 self.log_selector.currentIndexChanged.disconnect(self.on_log_selection_changed)
1417 try:
1418 # Repopulate from master list with updated status indicators
1419 self.populate_log_dropdown(self._all_discovered_logs)
1421 # Restore selection if possible
1422 if current_log_info:
1423 # Find the same log in the new dropdown
1424 for i in range(self.log_selector.count()):
1425 log_info = self.log_selector.itemData(i)
1426 if log_info and log_info.path == current_log_info.path:
1427 self.log_selector.setCurrentIndex(i)
1428 break
1429 finally:
1430 # Reconnect signal
1431 self.log_selector.currentIndexChanged.connect(self.on_log_selection_changed)
1433 def _get_process_start_time(self) -> float:
1434 """
1435 Get the start time of the current process.
1437 Returns:
1438 float: Process start time as Unix timestamp
1439 """
1440 try:
1441 import psutil
1442 import os
1443 process = psutil.Process(os.getpid())
1444 return process.create_time()
1445 except Exception as e:
1446 logger.warning(f"Failed to get process start time: {e}")
1447 # Fallback to current time
1448 import time
1449 return time.time()
1451 def _is_log_from_current_session(self, log_info: LogFileInfo) -> bool:
1452 """
1453 Check if a log file was created during the current session.
1455 Args:
1456 log_info: LogFileInfo to check
1458 Returns:
1459 bool: True if log was created after session start time
1460 """
1461 try:
1462 # Get file modification time (when log was created/last written)
1463 mtime = log_info.path.stat().st_mtime
1464 # Allow a small buffer (5 seconds) to account for timing differences
1465 return mtime >= (self._session_start_time - 5.0)
1466 except (OSError, FileNotFoundError):
1467 # If we can't stat the file, exclude it
1468 return False
1470 def _is_log_from_alive_process(self, log_info: LogFileInfo) -> bool:
1471 """
1472 Check if a log file is from a currently running process.
1474 Args:
1475 log_info: LogFileInfo to check
1477 Returns:
1478 bool: True if process is alive or unknown, False if terminated
1479 """
1480 pid = extract_pid_from_log_filename(log_info.path)
1481 if pid is None:
1482 # No PID found - assume it's a main log (always show)
1483 return True
1484 return self.process_tracker.is_alive(pid)
1486 def on_filter_changed(self, state: int) -> None:
1487 """
1488 Handle filter checkbox state change.
1490 Args:
1491 state: Qt.CheckState value
1492 """
1493 self.show_alive_only = (state == Qt.CheckState.Checked.value)
1495 # Refresh dropdown with filter applied
1496 # Always use master list as source, not the current dropdown
1497 # This ensures all logs are available when filter is toggled off
1498 if self._all_discovered_logs:
1499 self.populate_log_dropdown(self._all_discovered_logs)
1501 logger.debug(f"Filter changed: show_alive_only={self.show_alive_only}")
1503 def cleanup(self) -> None:
1504 """Cleanup all resources and background processes."""
1505 try:
1506 # Stop tailing timer
1507 if hasattr(self, 'tail_timer') and self.tail_timer and self.tail_timer.isActive():
1508 self.tail_timer.stop()
1509 self.tail_timer.deleteLater()
1510 self.tail_timer = None
1512 # Stop process tracking timer
1513 if hasattr(self, 'process_update_timer') and self.process_update_timer and self.process_update_timer.isActive():
1514 self.process_update_timer.stop()
1515 self.process_update_timer.deleteLater()
1516 self.process_update_timer = None
1518 # Stop file monitoring
1519 self.stop_monitoring()
1521 # Clean up file detector
1522 if hasattr(self, 'file_detector') and self.file_detector:
1523 self.file_detector.stop_watching()
1524 self.file_detector = None
1526 except Exception as e:
1527 logger.warning(f"Error during log viewer cleanup: {e}")
1529 def closeEvent(self, event) -> None:
1530 """Handle window close event."""
1531 if self.file_detector:
1532 self.file_detector.stop_watching()
1533 if self.tail_timer:
1534 self.tail_timer.stop()
1535 if hasattr(self, 'process_update_timer') and self.process_update_timer:
1536 self.process_update_timer.stop()
1537 self.window_closed.emit()
1538 super().closeEvent(event)