Coverage for openhcs/pyqt_gui/widgets/log_viewer.py: 0.0%

769 statements  

coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2PyQt6 Log Viewer Window 

3 

4Provides comprehensive log viewing capabilities with real-time tailing, search functionality, 

5and integration with OpenHCS subprocess execution. Reimplements log viewing using Qt widgets 

6for native desktop integration. 

7""" 

8 

9import logging 

10from typing import Optional, List, Set, Tuple 

11from pathlib import Path 

12 

13from PyQt6.QtWidgets import ( 

14 QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QComboBox, 

15 QTextEdit, QToolBar, QLineEdit, QCheckBox, QPushButton, QDialog 

16) 

17from PyQt6.QtGui import QSyntaxHighlighter, QTextDocument 

18from PyQt6.QtCore import QObject, QTimer, QFileSystemWatcher, pyqtSignal, pyqtSlot, Qt, QRegularExpression, QThread 

19from PyQt6.QtGui import QTextCharFormat, QColor, QAction, QFont, QTextCursor 

20 

21from openhcs.io.filemanager import FileManager 

22from openhcs.core.log_utils import LogFileInfo 

23from openhcs.pyqt_gui.utils.log_detection_utils import ( 

24 get_current_tui_log_path, discover_logs, discover_all_logs 

25) 

26from openhcs.core.log_utils import ( 

27 classify_log_file, is_openhcs_log_file, infer_base_log_path 

28) 

29from openhcs.pyqt_gui.utils.process_tracker import ( 

30 ProcessTracker, extract_pid_from_log_filename, get_log_display_name, get_log_tooltip 

31) 

32 

33# Import Pygments for advanced syntax highlighting 

34from pygments import highlight 

35from pygments.lexers import PythonLexer, get_lexer_by_name 

36from pygments.formatters import get_formatter_by_name 

37from pygments.token import Token 

38from pygments.style import Style 

39from pygments.styles import get_style_by_name 

40from dataclasses import dataclass 


42 

43logger = logging.getLogger(__name__) 

44 

45 

46@dataclass 

47class LogColorScheme: 

48 """ 

49 Centralized color scheme for log highlighting with semantic color names. 

50 

51 Supports light/dark theme variants and ensures WCAG accessibility compliance. 

52 All colors meet minimum 4.5:1 contrast ratio for normal text readability. 

53 """ 

54 

55 # Log level colors with semantic meaning (WCAG 4.5:1 compliant) 

56 log_critical_fg: Tuple[int, int, int] = (255, 255, 255) # White text 

57 log_critical_bg: Tuple[int, int, int] = (139, 0, 0) # Dark red background 

58 log_error_color: Tuple[int, int, int] = (255, 85, 85) # Brighter red - WCAG compliant 

59 log_warning_color: Tuple[int, int, int] = (255, 140, 0) # Dark orange - attention grabbing 

60 log_info_color: Tuple[int, int, int] = (100, 160, 210) # Brighter steel blue - WCAG compliant 

61 log_debug_color: Tuple[int, int, int] = (160, 160, 160) # Lighter gray - better contrast 

62 

63 # Metadata and structural colors 

64 timestamp_color: Tuple[int, int, int] = (105, 105, 105) # Dim gray - unobtrusive 

65 logger_name_color: Tuple[int, int, int] = (147, 112, 219) # Medium slate blue - distinctive 

66 memory_address_color: Tuple[int, int, int] = (255, 182, 193) # Light pink - technical data 

67 file_path_color: Tuple[int, int, int] = (34, 139, 34) # Forest green - file system 

68 

69 # Python syntax colors (following VS Code dark theme conventions) 

70 python_keyword_color: Tuple[int, int, int] = (86, 156, 214) # Blue - language keywords 

71 python_string_color: Tuple[int, int, int] = (206, 145, 120) # Orange - string literals 

72 python_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - numeric values 

73 python_operator_color: Tuple[int, int, int] = (212, 212, 212) # Light gray - operators/punctuation 

74 python_name_color: Tuple[int, int, int] = (156, 220, 254) # Light blue - identifiers 

75 python_function_color: Tuple[int, int, int] = (220, 220, 170) # Yellow - function names 

76 python_class_color: Tuple[int, int, int] = (78, 201, 176) # Teal - class names 

77 python_builtin_color: Tuple[int, int, int] = (86, 156, 214) # Blue - built-in functions 

78 python_comment_color: Tuple[int, int, int] = (106, 153, 85) # Green - comments 

79 

80 # Special highlighting colors 

81 exception_color: Tuple[int, int, int] = (255, 69, 0) # Red orange - error types 

82 function_call_color: Tuple[int, int, int] = (255, 215, 0) # Gold - function invocations 

83 boolean_color: Tuple[int, int, int] = (86, 156, 214) # Blue - True/False/None 

84 

85 # Enhanced syntax colors (Phase 1 additions) 

86 tuple_parentheses_color: Tuple[int, int, int] = (255, 215, 0) # Gold - tuple delimiters 

87 set_braces_color: Tuple[int, int, int] = (255, 140, 0) # Dark orange - set delimiters 

88 class_representation_color: Tuple[int, int, int] = (78, 201, 176) # Teal - <class 'name'> 

89 function_representation_color: Tuple[int, int, int] = (220, 220, 170) # Yellow - <function name> 

90 module_path_color: Tuple[int, int, int] = (147, 112, 219) # Medium slate blue - module.path 

91 hex_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 0xFF 

92 scientific_notation_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 1.23e-4 

93 binary_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 0b1010 

94 octal_number_color: Tuple[int, int, int] = (181, 206, 168) # Light green - 0o755 

95 python_special_color: Tuple[int, int, int] = (255, 20, 147) # Deep pink - __name__ 

96 single_quoted_string_color: Tuple[int, int, int] = (206, 145, 120) # Orange - 'string' 

97 list_comprehension_color: Tuple[int, int, int] = (156, 220, 254) # Light blue - [x for x in y] 

98 generator_expression_color: Tuple[int, int, int] = (156, 220, 254) # Light blue - (x for x in y) 

99 

100 @classmethod 

101 def create_dark_theme(cls) -> 'LogColorScheme': 

102 """ 

103 Create a dark theme variant with adjusted colors for dark backgrounds. 

104 

105 Returns: 

106 LogColorScheme: Dark theme color scheme with higher contrast 

107 """ 

108 return cls( 

109 # Enhanced colors for dark backgrounds with better contrast 

110 log_error_color=(255, 100, 100), # Brighter red 

111 log_info_color=(120, 180, 230), # Brighter steel blue 

112 timestamp_color=(160, 160, 160), # Lighter gray 

113 python_string_color=(236, 175, 150), # Brighter orange 

114 python_number_color=(200, 230, 190), # Brighter green 

115 # Other colors remain the same as they work well on dark backgrounds 

116 ) 

117 

118 @classmethod 

119 def create_light_theme(cls) -> 'LogColorScheme': 

120 """ 

121 Create a light theme variant with adjusted colors for light backgrounds. 

122 

123 Returns: 

124 LogColorScheme: Light theme color scheme with appropriate contrast 

125 """ 

126 return cls( 

127 # Darker colors for light backgrounds with WCAG compliance 

128 log_error_color=(180, 20, 40), # Darker red 

129 log_info_color=(30, 80, 130), # Darker steel blue 

130 log_warning_color=(200, 100, 0), # Darker orange 

131 timestamp_color=(60, 60, 60), # Darker gray 

132 logger_name_color=(100, 60, 160), # Darker slate blue 

133 python_string_color=(150, 80, 60), # Darker orange 

134 python_number_color=(120, 140, 100), # Darker green 

135 memory_address_color=(200, 120, 140), # Darker pink 

136 file_path_color=(20, 100, 20), # Darker forest green 

137 exception_color=(200, 40, 0), # Darker red orange 

138 # Adjust other colors for light background contrast 

139 ) 

140 

141 def to_qcolor(self, color_tuple: Tuple[int, int, int]) -> QColor: 

142 """ 

143 Convert RGB tuple to QColor object. 

144 

145 Args: 

146 color_tuple: RGB color tuple (r, g, b) 

147 

148 Returns: 

149 QColor: Qt color object 

150 """ 

151 return QColor(*color_tuple) 

152 

153 
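# --- Illustrative sketch (not part of the original file) ---------------------
# How a caller might pick a LogColorScheme variant and convert one of its RGB
# tuples for Qt. The helper name is hypothetical and nothing below runs at
# import time.
def _example_theme_color() -> QColor:
    scheme = LogColorScheme.create_light_theme()
    # The light theme overrides log_error_color with a darker red for contrast.
    return scheme.to_qcolor(scheme.log_error_color)  # QColor(180, 20, 40)
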

154class LogFileDetector(QObject): 

155 """ 

156 Detects new log files in a directory using efficient file monitoring. 

157  

158 Uses QFileSystemWatcher to monitor directory changes and set operations 

159 for efficient new file detection. Handles base_log_path as file prefix 

160 and watches the parent directory. 

161 """ 

162 

163 # Signals 

164 new_log_detected = pyqtSignal(object) # LogFileInfo object 

165 _server_scan_complete = pyqtSignal(list) # List of LogFileInfo from server scan 

166 

167 def __init__(self, base_log_path: Optional[str] = None): 

168 """ 

169 Initialize LogFileDetector. 

170  

171 Args: 

172 base_log_path: Base path for subprocess log files (file prefix, not directory) 

173 """ 

174 super().__init__() 

175 self._base_log_path = base_log_path 

176 self._previous_files: Set[Path] = set() 

177 self._watcher = QFileSystemWatcher() 

178 self._watcher.directoryChanged.connect(self._on_directory_changed) 

179 self._watching_directory: Optional[Path] = None 

180 

181 logger.debug(f"LogFileDetector initialized with base_log_path: {base_log_path}") 

182 

183 def start_watching(self, directory: Path) -> None: 

184 """ 

185 Start watching directory for new log files. 

186  

187 Args: 

188 directory: Directory to watch for new log files 

189 """ 

190 if not directory.exists(): 

191 logger.warning(f"Cannot watch non-existent directory: {directory}") 

192 return 

193 

194 # Stop any existing watching 

195 self.stop_watching() 

196 

197 # Add directory to watcher 

198 success = self._watcher.addPath(str(directory)) 

199 if success: 

200 self._watching_directory = directory 

201 # Initialize previous files set 

202 self._previous_files = self.scan_directory(directory) 

203 logger.debug(f"Started watching directory: {directory}") 

204 logger.debug(f"Initial file count: {len(self._previous_files)}") 

205 else: 

206 logger.error(f"Failed to add directory to watcher: {directory}") 

207 

208 def stop_watching(self) -> None: 

209 """Stop file watching and cleanup.""" 

210 if self._watching_directory: 

211 self._watcher.removePath(str(self._watching_directory)) 

212 self._watching_directory = None 

213 self._previous_files.clear() 

214 logger.debug("Stopped file watching") 

215 

216 def scan_directory(self, directory: Path) -> Set[Path]: 

217 """ 

218 Scan directory for .log files. 

219  

220 Args: 

221 directory: Directory to scan 

222  

223 Returns: 

224 Set[Path]: Set of Path objects for .log files found 

225 """ 

226 try: 

227 log_files = set(directory.glob("*.log")) 

228 logger.debug(f"Scanned directory {directory}: found {len(log_files)} .log files") 

229 return log_files 

230 except (FileNotFoundError, PermissionError) as e: 

231 logger.warning(f"Error scanning directory {directory}: {e}") 

232 return set() 

233 

234 def detect_new_files(self, current_files: Set[Path]) -> Set[Path]: 

235 """ 

236 Use set.difference() to find new files efficiently. 

237  

238 Args: 

239 current_files: Current set of files in directory 

240  

241 Returns: 

242 Set[Path]: Set of newly discovered files 

243 """ 

244 new_files = current_files.difference(self._previous_files) 

245 if new_files: 

246 logger.debug(f"Detected {len(new_files)} new files: {[f.name for f in new_files]}") 

247 

248 # Update previous files set 

249 self._previous_files = current_files 

250 return new_files 

251 

252 

253 

254 def _on_directory_changed(self, directory_path: str) -> None: 

255 """ 

256 Handle QFileSystemWatcher directory change signal. 

257  

258 Args: 

259 directory_path: Path of directory that changed 

260 """ 

261 directory = Path(directory_path) 

262 logger.debug(f"Directory changed: {directory}") 

263 

264 # Scan directory for current files 

265 current_files = self.scan_directory(directory) 

266 

267 # Detect new files 

268 new_files = self.detect_new_files(current_files) 

269 

270 # Process new files 

271 for file_path in new_files: 

272 if file_path.exists() and is_openhcs_log_file(file_path): 

273 try: 

274 # For general watching, try to infer base_log_path from the file name 

275 effective_base_log_path = self._base_log_path 

276 if not effective_base_log_path and 'subprocess_' in file_path.name: 

277 effective_base_log_path = infer_base_log_path(file_path) 

278 

279 log_info = classify_log_file(file_path, effective_base_log_path, 

280 include_tui_log=False) 

281 

282 logger.info(f"New relevant log file detected: {file_path} (type: {log_info.log_type})") 

283 self.new_log_detected.emit(log_info) 

284 except Exception as e: 

285 logger.error(f"Error classifying new log file {file_path}: {e}") 

286 

287 
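# --- Illustrative sketch (not part of the original file) ---------------------
# The set-difference strategy LogFileDetector.detect_new_files() relies on,
# shown with plain sets of paths. The helper name and sample paths are
# hypothetical; nothing below runs at import time.
def _example_set_difference_detection() -> Set[Path]:
    previous = {Path("/tmp/openhcs_main.log")}
    current = {Path("/tmp/openhcs_main.log"), Path("/tmp/openhcs_subprocess_1234.log")}
    # Only the newly appeared file is reported, exactly as in detect_new_files().
    return current.difference(previous)  # {Path('/tmp/openhcs_subprocess_1234.log')}
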

288class LogHighlighter(QSyntaxHighlighter): 

289 """ 

290 Advanced syntax highlighter for log files using Pygments. 

291 

292 Provides sophisticated highlighting for OpenHCS log format with support for: 

293 - Log levels and timestamps 

294 - Python code snippets and data structures 

295 - Memory addresses and function signatures 

296 - Complex nested dictionaries and lists 

297 - Exception tracebacks and file paths 

298 """ 

299 

300 def __init__(self, parent: QTextDocument, color_scheme: LogColorScheme = None): 

301 """ 

302 Initialize the log highlighter with optional color scheme. 

303 

304 Args: 

305 parent: QTextDocument to apply highlighting to 

306 color_scheme: Color scheme to use (defaults to dark theme) 

307 """ 

308 super().__init__(parent) 

309 self.color_scheme = color_scheme or LogColorScheme() 

310 self.setup_pygments_styles() 

311 self.setup_highlighting_rules() 

312 

313 def setup_pygments_styles(self) -> None: 

314 """Set up the mapping from Pygments tokens to QTextCharFormat objects using the color scheme.""" 

315 cs = self.color_scheme # Shorthand for readability 

316 

317 # Create a mapping from Pygments tokens to Qt text formats 

318 self.token_formats = { 

319 # Log levels with distinct colors and backgrounds 

320 'log_critical': self._create_format( 

321 cs.to_qcolor(cs.log_critical_fg), 

322 cs.to_qcolor(cs.log_critical_bg), 

323 bold=True 

324 ), 

325 'log_error': self._create_format(cs.to_qcolor(cs.log_error_color), bold=True), 

326 'log_warning': self._create_format(cs.to_qcolor(cs.log_warning_color), bold=True), 

327 'log_info': self._create_format(cs.to_qcolor(cs.log_info_color), bold=True), 

328 'log_debug': self._create_format(cs.to_qcolor(cs.log_debug_color)), 

329 

330 # Timestamps and metadata 

331 'timestamp': self._create_format(cs.to_qcolor(cs.timestamp_color)), 

332 'logger_name': self._create_format(cs.to_qcolor(cs.logger_name_color), bold=True), 

333 

334 # Python syntax highlighting (for complex data structures) 

335 Token.Keyword: self._create_format(cs.to_qcolor(cs.python_keyword_color), bold=True), 

336 Token.String: self._create_format(cs.to_qcolor(cs.python_string_color)), 

337 Token.String.Single: self._create_format(cs.to_qcolor(cs.python_string_color)), 

338 Token.String.Double: self._create_format(cs.to_qcolor(cs.python_string_color)), 

339 Token.Number: self._create_format(cs.to_qcolor(cs.python_number_color)), 

340 Token.Number.Integer: self._create_format(cs.to_qcolor(cs.python_number_color)), 

341 Token.Number.Float: self._create_format(cs.to_qcolor(cs.python_number_color)), 

342 Token.Number.Hex: self._create_format(cs.to_qcolor(cs.python_number_color)), 

343 Token.Number.Oct: self._create_format(cs.to_qcolor(cs.python_number_color)), 

344 Token.Number.Bin: self._create_format(cs.to_qcolor(cs.python_number_color)), 

345 Token.Operator: self._create_format(cs.to_qcolor(cs.python_operator_color)), 

346 Token.Punctuation: self._create_format(cs.to_qcolor(cs.python_operator_color)), 

347 Token.Name: self._create_format(cs.to_qcolor(cs.python_name_color)), 

348 Token.Name.Function: self._create_format(cs.to_qcolor(cs.python_function_color), bold=True), 

349 Token.Name.Class: self._create_format(cs.to_qcolor(cs.python_class_color), bold=True), 

350 Token.Name.Builtin: self._create_format(cs.to_qcolor(cs.python_builtin_color)), 

351 Token.Comment: self._create_format(cs.to_qcolor(cs.python_comment_color)), 

352 Token.Literal: self._create_format(cs.to_qcolor(cs.python_number_color)), 

353 

354 # Special patterns for log content 

355 'memory_address': self._create_format(cs.to_qcolor(cs.memory_address_color)), 

356 'file_path': self._create_format(cs.to_qcolor(cs.file_path_color)), 

357 'exception': self._create_format(cs.to_qcolor(cs.exception_color), bold=True), 

358 'function_call': self._create_format(cs.to_qcolor(cs.function_call_color)), 

359 'dict_key': self._create_format(cs.to_qcolor(cs.python_name_color)), 

360 'boolean': self._create_format(cs.to_qcolor(cs.boolean_color), bold=True), 

361 

362 # Enhanced Python syntax elements (Phase 1) 

363 'tuple_parentheses': self._create_format(cs.to_qcolor(cs.tuple_parentheses_color)), 

364 'set_braces': self._create_format(cs.to_qcolor(cs.set_braces_color)), 

365 'class_representation': self._create_format(cs.to_qcolor(cs.class_representation_color), bold=True), 

366 'function_representation': self._create_format(cs.to_qcolor(cs.function_representation_color), bold=True), 

367 'module_path': self._create_format(cs.to_qcolor(cs.module_path_color)), 

368 'hex_number': self._create_format(cs.to_qcolor(cs.hex_number_color)), 

369 'scientific_notation': self._create_format(cs.to_qcolor(cs.scientific_notation_color)), 

370 'binary_number': self._create_format(cs.to_qcolor(cs.binary_number_color)), 

371 'octal_number': self._create_format(cs.to_qcolor(cs.octal_number_color)), 

372 'python_special': self._create_format(cs.to_qcolor(cs.python_special_color), bold=True), 

373 'single_quoted_string': self._create_format(cs.to_qcolor(cs.single_quoted_string_color)), 

374 'list_comprehension': self._create_format(cs.to_qcolor(cs.list_comprehension_color)), 

375 'generator_expression': self._create_format(cs.to_qcolor(cs.generator_expression_color)), 

376 } 

377 

378 def _create_format(self, fg_color: QColor, bg_color: QColor = None, bold: bool = False) -> QTextCharFormat: 

379 """Create a QTextCharFormat with specified properties.""" 

380 fmt = QTextCharFormat() 

381 fmt.setForeground(fg_color) 

382 if bg_color: 

383 fmt.setBackground(bg_color) 

384 if bold: 

385 fmt.setFontWeight(QFont.Weight.Bold) 

386 return fmt 

387 

388 def setup_highlighting_rules(self) -> None: 

389 """Set up regex patterns for log-specific highlighting.""" 

390 self.highlighting_rules = [] 

391 

392 # Log level patterns (highest priority) 

393 log_levels = [ 

394 ("CRITICAL", self.token_formats['log_critical']), 

395 ("ERROR", self.token_formats['log_error']), 

396 ("WARNING", self.token_formats['log_warning']), 

397 ("INFO", self.token_formats['log_info']), 

398 ("DEBUG", self.token_formats['log_debug']), 

399 ] 

400 

401 for level, fmt in log_levels: 

402 pattern = QRegularExpression(rf"\b{level}\b") 

403 self.highlighting_rules.append((pattern, fmt)) 

404 

405 # Timestamp pattern: YYYY-MM-DD HH:MM:SS,mmm 

406 timestamp_pattern = QRegularExpression(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}") 

407 self.highlighting_rules.append((timestamp_pattern, self.token_formats['timestamp'])) 

408 

409 # Logger names (e.g., openhcs.core.orchestrator) 

410 logger_pattern = QRegularExpression(r"openhcs\.[a-zA-Z0-9_.]+") 

411 self.highlighting_rules.append((logger_pattern, self.token_formats['logger_name'])) 

412 

413 # Memory addresses (e.g., 0x7f1640dd8e00) 

414 memory_pattern = QRegularExpression(r"0x[0-9a-fA-F]+") 

415 self.highlighting_rules.append((memory_pattern, self.token_formats['memory_address'])) 

416 

417 # File paths in tracebacks 

418 filepath_pattern = QRegularExpression(r'["\']?/[^"\'\s]+\.py["\']?') 

419 self.highlighting_rules.append((filepath_pattern, self.token_formats['file_path'])) 

420 

421 # Exception names 

422 exception_pattern = QRegularExpression(r'\b[A-Z][a-zA-Z]*Error\b|\b[A-Z][a-zA-Z]*Exception\b') 

423 self.highlighting_rules.append((exception_pattern, self.token_formats['exception'])) 

424 

425 # Function calls with parentheses 

426 function_pattern = QRegularExpression(r'\b[a-zA-Z_][a-zA-Z0-9_]*\(\)') 

427 self.highlighting_rules.append((function_pattern, self.token_formats['function_call'])) 

428 

429 # Boolean values 

430 boolean_pattern = QRegularExpression(r'\b(True|False|None)\b') 

431 self.highlighting_rules.append((boolean_pattern, self.token_formats['boolean'])) 

432 

433 # Enhanced Python syntax elements 

434 

435 # Single-quoted strings (complement to double-quoted) 

436 single_quote_pattern = QRegularExpression(r"'[^']*'") 

437 self.highlighting_rules.append((single_quote_pattern, self.token_formats['single_quoted_string'])) 

438 

439 # Class representations: <class 'module.ClassName'> 

440 class_repr_pattern = QRegularExpression(r"<class '[^']*'>") 

441 self.highlighting_rules.append((class_repr_pattern, self.token_formats['class_representation'])) 

442 

443 # Function representations: <function name at 0xaddress> 

444 function_repr_pattern = QRegularExpression(r"<function [^>]+ at 0x[0-9a-fA-F]+>") 

445 self.highlighting_rules.append((function_repr_pattern, self.token_formats['function_representation'])) 

446 

447 # Extended module paths (beyond just openhcs) 

448 module_path_pattern = QRegularExpression(r"\b[a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*){2,}") 

449 self.highlighting_rules.append((module_path_pattern, self.token_formats['module_path'])) 

450 

451 # Hexadecimal numbers (beyond memory addresses): 0xFF, 0x1A2B 

452 hex_number_pattern = QRegularExpression(r"\b0[xX][0-9a-fA-F]+\b") 

453 self.highlighting_rules.append((hex_number_pattern, self.token_formats['hex_number'])) 

454 

455 # Scientific notation: 1.23e-4, 5.67E+10 

456 scientific_pattern = QRegularExpression(r"\b\d+\.?\d*[eE][+-]?\d+\b") 

457 self.highlighting_rules.append((scientific_pattern, self.token_formats['scientific_notation'])) 

458 

459 # Binary literals: 0b1010 

460 binary_pattern = QRegularExpression(r"\b0[bB][01]+\b") 

461 self.highlighting_rules.append((binary_pattern, self.token_formats['binary_number'])) 

462 

463 # Octal literals: 0o755 

464 octal_pattern = QRegularExpression(r"\b0[oO][0-7]+\b") 

465 self.highlighting_rules.append((octal_pattern, self.token_formats['octal_number'])) 

466 

467 # Python special constants: __name__, __main__, __file__, etc. 

468 python_special_pattern = QRegularExpression(r"\b__[a-zA-Z_][a-zA-Z0-9_]*__\b") 

469 self.highlighting_rules.append((python_special_pattern, self.token_formats['python_special'])) 

470 

471 logger.debug(f"Setup {len(self.highlighting_rules)} highlighting rules") 

472 

473 def set_color_scheme(self, color_scheme: LogColorScheme) -> None: 

474 """ 

475 Update the color scheme and refresh highlighting. 

476 

477 Args: 

478 color_scheme: New color scheme to apply 

479 """ 

480 self.color_scheme = color_scheme 

481 self.setup_pygments_styles() 

482 self.setup_highlighting_rules() 

483 # Trigger re-highlighting of the entire document 

484 self.rehighlight() 

485 logger.debug(f"Applied new color scheme with {len(self.token_formats)} token formats") 

486 

487 def switch_to_dark_theme(self) -> None: 

488 """Switch to dark theme color scheme.""" 

489 self.set_color_scheme(LogColorScheme.create_dark_theme()) 

490 

491 def switch_to_light_theme(self) -> None: 

492 """Switch to light theme color scheme.""" 

493 self.set_color_scheme(LogColorScheme.create_light_theme()) 

494 

495 @classmethod 

496 def load_color_scheme_from_config(cls, config_path: str = None) -> LogColorScheme: 

497 """ 

498 Load color scheme from external configuration file. 

499 

500 Args: 

501 config_path: Path to JSON/YAML config file (optional) 

502 

503 Returns: 

504 LogColorScheme: Loaded color scheme or default if file not found 

505 """ 

506 if config_path and Path(config_path).exists(): 

507 try: 

508 import json 

509 with open(config_path, 'r') as f: 

510 config = json.load(f) 

511 

512 # Create color scheme from config 

513 scheme_kwargs = {} 

514 for key, value in config.items(): 

515 if key.endswith('_color') or key.endswith('_fg') or key.endswith('_bg'): 

516 if isinstance(value, list) and len(value) == 3: 

517 scheme_kwargs[key] = tuple(value) 

518 

519 return LogColorScheme(**scheme_kwargs) 

520 

521 except Exception as e: 

522 logger.warning(f"Failed to load color scheme from {config_path}: {e}") 

523 

524 return LogColorScheme() # Return default scheme 

525 
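# Illustrative sketch (not part of the original file): the JSON layout that
# load_color_scheme_from_config() expects -- keys ending in _color/_fg/_bg
# mapped to three-element RGB lists. The file path is hypothetical.
#
#     {
#         "log_error_color": [255, 85, 85],
#         "log_critical_fg": [255, 255, 255],
#         "log_critical_bg": [139, 0, 0]
#     }
#
#     scheme = LogHighlighter.load_color_scheme_from_config("/path/to/log_colors.json")
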

526 def highlightBlock(self, text: str) -> None: 

527 """ 

528 Apply highlighting to text block using regex patterns. 

529 

530 Uses regex patterns for log-specific content (timestamps, log levels, etc.). 

531 Fast and doesn't block the UI. 

532 """ 

533 # Apply log-specific patterns 

534 for pattern, fmt in self.highlighting_rules: 

535 iterator = pattern.globalMatch(text) 

536 while iterator.hasNext(): 

537 match = iterator.next() 

538 start = match.capturedStart() 

539 length = match.capturedLength() 

540 self.setFormat(start, length, fmt) 

541 

542 
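# --- Illustrative sketch (not part of the original file) ---------------------
# Mirrors the matching loop in LogHighlighter.highlightBlock() against a
# hypothetical OpenHCS log line, returning the (start, length) spans that the
# timestamp rule would format. Nothing below runs at import time.
def _example_timestamp_spans() -> List[Tuple[int, int]]:
    sample = "2025-11-04 02:09:00,123 - openhcs.core.orchestrator - INFO - Pipeline started"
    pattern = QRegularExpression(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}")
    spans = []
    iterator = pattern.globalMatch(sample)
    while iterator.hasNext():
        match = iterator.next()
        spans.append((match.capturedStart(), match.capturedLength()))
    return spans  # [(0, 23)]
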

543class LogFileLoader(QThread): 

544 """Background thread for loading large log files without blocking UI.""" 

545 

546 # Signals 

547 content_loaded = pyqtSignal(str) # Emits file content when loaded 

548 load_failed = pyqtSignal(str) # Emits error message on failure 

549 

550 def __init__(self, log_path: Path): 

551 super().__init__() 

552 self.log_path = log_path 

553 

554 def run(self): 

555 """Load file content in background thread.""" 

556 try: 

557 with open(self.log_path, 'r', encoding='utf-8', errors='replace') as f: 

558 content = f.read() 

559 self.content_loaded.emit(content) 

560 except Exception as e: 

561 self.load_failed.emit(str(e)) 

562 

563 
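# --- Illustrative sketch (not part of the original file) ---------------------
# How a caller might wire LogFileLoader's signals. Assumes a running
# QApplication event loop so cross-thread signal delivery works; the function
# name and print callbacks are hypothetical.
def _example_load_log_async(log_path: Path) -> LogFileLoader:
    loader = LogFileLoader(log_path)
    loader.content_loaded.connect(lambda text: print(f"loaded {len(text)} characters"))
    loader.load_failed.connect(lambda msg: print(f"load failed: {msg}"))
    loader.start()  # run() executes in a background thread
    return loader   # keep a reference so the QThread is not garbage collected
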

564class LogViewerWindow(QMainWindow): 

565 """Main log viewer window with dropdown, search, and real-time tailing.""" 

566 

567 window_closed = pyqtSignal() 

568 _subprocess_scan_complete = pyqtSignal(list) # Internal signal for async subprocess scan 

569 _server_scan_complete = pyqtSignal(list) # Internal signal for async server scan 

570 

571 def __init__(self, file_manager: FileManager, service_adapter, parent=None): 

572 super().__init__(parent) 

573 self.file_manager = file_manager 

574 self.service_adapter = service_adapter 

575 

576 # State 

577 self.current_log_path: Optional[Path] = None 

578 self.current_file_position: int = 0 

579 self.auto_scroll_enabled: bool = True 

580 self.tailing_paused: bool = False 

581 

582 # Search state 

583 self.current_search_text: str = "" 

584 self.search_highlights: List[QTextCursor] = [] 

585 

586 # Components 

587 self.log_selector: QComboBox = None 

588 self.search_toolbar: QToolBar = None 

589 self.log_display: QTextEdit = None 

590 self.file_detector: LogFileDetector = None 

591 self.tail_timer: QTimer = None 

592 self.highlighter: LogHighlighter = None 

593 self.file_loader: Optional[LogFileLoader] = None # Async file loader 

594 self.server_scan_timer: QTimer = None # Periodic ZMQ server scanning 

595 self._pending_log_to_load: Optional[Path] = None # Log to load when window is shown 

596 

597 # Process tracking for alive/dead process indication 

598 self.process_tracker = ProcessTracker() 

599 self.process_update_timer: QTimer = None 

600 self.show_alive_only: bool = False # Filter to show only logs from running processes 

601 

602 # Master list of all discovered logs (single source of truth) 

603 # Dropdown is a filtered VIEW of this list 

604 self._all_discovered_logs: List[LogFileInfo] = [] 

605 

606 # Track session start time to filter out old logs from previous sessions 

607 # Use current process start time, not log viewer init time 

608 self._session_start_time = self._get_process_start_time() 

609 

610 self.setup_ui() 

611 self.setup_connections() 

612 self.initialize_logs() 

613 self.start_process_tracking() 

614 

615 def setup_ui(self) -> None: 

616 """Set up the complete UI layout and widget hierarchy.""" 

617 self.setWindowTitle("Log Viewer") 

618 self.setMinimumSize(800, 600) 

619 

620 # Central widget with main layout 

621 central_widget = QWidget() 

622 self.setCentralWidget(central_widget) 

623 main_layout = QVBoxLayout(central_widget) 

624 

625 # Log selector dropdown 

626 self.log_selector = QComboBox() 

627 self.log_selector.setMinimumHeight(30) 

628 main_layout.addWidget(self.log_selector) 

629 

630 # Search toolbar (initially hidden) 

631 self.search_toolbar = QToolBar("Search") 

632 self.search_toolbar.setVisible(False) 

633 

634 # Search input 

635 self.search_input = QLineEdit() 

636 self.search_input.setPlaceholderText("Search logs...") 

637 self.search_toolbar.addWidget(self.search_input) 

638 

639 # Search options 

640 self.case_sensitive_cb = QCheckBox("Case sensitive") 

641 self.search_toolbar.addWidget(self.case_sensitive_cb) 

642 

643 self.regex_cb = QCheckBox("Regex") 

644 self.search_toolbar.addWidget(self.regex_cb) 

645 

646 # Search navigation buttons 

647 self.prev_button = QPushButton("Previous") 

648 self.next_button = QPushButton("Next") 

649 self.close_search_button = QPushButton("Close") 

650 

651 self.search_toolbar.addWidget(self.prev_button) 

652 self.search_toolbar.addWidget(self.next_button) 

653 self.search_toolbar.addWidget(self.close_search_button) 

654 

655 main_layout.addWidget(self.search_toolbar) 

656 

657 # Log display area 

658 self.log_display = QTextEdit() 

659 self.log_display.setReadOnly(True) 

660 self.log_display.setFont(QFont("Consolas", 10)) # Monospace font for logs 

661 main_layout.addWidget(self.log_display) 

662 

663 # Control buttons layout 

664 control_layout = QHBoxLayout() 

665 

666 self.auto_scroll_btn = QPushButton("Auto-scroll") 

667 self.auto_scroll_btn.setCheckable(True) 

668 self.auto_scroll_btn.setChecked(True) 

669 

670 self.pause_btn = QPushButton("Pause") 

671 self.pause_btn.setCheckable(True) 

672 

673 self.clear_btn = QPushButton("Clear") 

674 self.bottom_btn = QPushButton("Bottom") 

675 

676 # Process filter checkbox 

677 self.show_alive_only_cb = QCheckBox("Show only running processes") 

678 self.show_alive_only_cb.setToolTip("Filter logs to show only those from currently running processes") 

679 

680 control_layout.addWidget(self.auto_scroll_btn) 

681 control_layout.addWidget(self.pause_btn) 

682 control_layout.addWidget(self.clear_btn) 

683 control_layout.addWidget(self.bottom_btn) 

684 control_layout.addWidget(self.show_alive_only_cb) 

685 control_layout.addStretch() # Push buttons to left 

686 

687 main_layout.addLayout(control_layout) 

688 

689 # Setup syntax highlighting 

690 self.highlighter = LogHighlighter(self.log_display.document()) 

691 

692 # Setup window-local Ctrl+F shortcut 

693 search_action = QAction("Search", self) 

694 search_action.setShortcut("Ctrl+F") 

695 search_action.triggered.connect(self.toggle_search_toolbar) 

696 self.addAction(search_action) 

697 

698 logger.debug("LogViewerWindow UI setup complete") 

699 

700 def setup_connections(self) -> None: 

701 """Set up signal/slot connections.""" 

702 # Log selector 

703 self.log_selector.currentIndexChanged.connect(self.on_log_selection_changed) 

704 

705 # Search functionality 

706 self.search_input.returnPressed.connect(self.perform_search) 

707 self.prev_button.clicked.connect(self.find_previous) 

708 self.next_button.clicked.connect(self.find_next) 

709 self.close_search_button.clicked.connect(self.toggle_search_toolbar) 

710 

711 # Control buttons 

712 self.auto_scroll_btn.toggled.connect(self.toggle_auto_scroll) 

713 self.pause_btn.toggled.connect(self.toggle_pause_tailing) 

714 self.clear_btn.clicked.connect(self.clear_log_display) 

715 self.bottom_btn.clicked.connect(self.scroll_to_bottom) 

716 self.show_alive_only_cb.stateChanged.connect(self.on_filter_changed) 

717 

718 # Internal signals 

719 self._subprocess_scan_complete.connect(self._on_subprocess_scan_complete) 

720 self._server_scan_complete.connect(self._on_server_scan_complete) 

721 

722 logger.debug("LogViewerWindow connections setup complete") 

723 

724 def showEvent(self, event): 

725 """Override showEvent to load log when window is first shown.""" 

726 super().showEvent(event) 

727 

728 # Load pending log on first show 

729 if self._pending_log_to_load: 

730 self.switch_to_log(self._pending_log_to_load) 

731 self._pending_log_to_load = None 

732 

733 def initialize_logs(self) -> None: 

734 """Initialize with main log only, then scan for subprocess logs in background.""" 

735 # Only discover the main log initially (fast startup) 

736 initial_logs = [] 

737 try: 

738 from openhcs.core.log_utils import get_current_log_file_path, classify_log_file 

739 from pathlib import Path 

740 

741 main_log_path = get_current_log_file_path() 

742 main_log = Path(main_log_path) 

743 if main_log.exists(): 

744 log_info = classify_log_file(main_log, None, include_tui_log=True) 

745 initial_logs.append(log_info) 

746 logger.debug("Discovered main log") 

747 except Exception as e: 

748 logger.warning(f"Error discovering main log: {e}") 

749 # Continue without main log 

750 pass 

751 

752 # Store main log in master list 

753 self._all_discovered_logs = initial_logs.copy() 

754 

755 # Populate dropdown with main log immediately (fast) 

756 if initial_logs: 

757 self.populate_log_dropdown(initial_logs) 

758 # Store first log to load when window is shown (defer loading) 

759 self._pending_log_to_load = initial_logs[0].path 

760 

761 # Start monitoring for new logs 

762 self.start_monitoring() 

763 

764 # Scan for existing subprocess logs in background (async - doesn't block) 

765 self._scan_subprocess_logs_async() 

766 

767 # Scan for servers in background (async - doesn't block) 

768 self._scan_servers_async() 

769 

770 def _scan_subprocess_logs_async(self) -> None: 

771 """Scan for existing subprocess logs in background thread (non-blocking).""" 

772 import threading 

773 

774 def scan_and_update(): 

775 """Background thread function.""" 

776 subprocess_logs = self._scan_for_subprocess_logs() 

777 # Emit signal to update UI on main thread 

778 self._subprocess_scan_complete.emit(subprocess_logs) 

779 

780 thread = threading.Thread(target=scan_and_update, daemon=True) 

781 thread.start() 

782 logger.debug("Started async subprocess log scan in background") 

783 

784 def _scan_servers_async(self) -> None: 

785 """Scan for ZMQ servers in background thread (non-blocking).""" 

786 import threading 

787 

788 def scan_and_update(): 

789 """Background thread function.""" 

790 server_logs = self._scan_for_server_logs() 

791 # Emit signal to update UI on main thread 

792 self._server_scan_complete.emit(server_logs) 

793 

794 thread = threading.Thread(target=scan_and_update, daemon=True) 

795 thread.start() 

796 logger.debug("Started async server scan in background") 

797 
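# Illustrative sketch (not part of the original file): the pattern both
# _scan_*_async helpers follow -- do the slow work in a daemon thread, then
# hand results back to the GUI thread through a pyqtSignal so Qt widgets are
# only touched from the main thread. Names below are hypothetical.
#
#     def _scan_async(self):
#         def work():
#             results = self._expensive_scan()      # hypothetical helper
#             self._scan_complete.emit(results)     # queued back to the UI thread
#         threading.Thread(target=work, daemon=True).start()
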

798 @pyqtSlot(list) 

799 def _on_subprocess_scan_complete(self, subprocess_logs: List[LogFileInfo]) -> None: 

800 """Handle subprocess scan completion on UI thread.""" 

801 if not subprocess_logs: 

802 logger.debug("No subprocess logs found during scan") 

803 return 

804 

805 # Add subprocess logs to master list (avoid duplicates by path) 

806 existing_paths = {log.path for log in self._all_discovered_logs} 

807 new_logs_added = 0 

808 for subprocess_log in subprocess_logs: 

809 if subprocess_log.path not in existing_paths: 

810 self._all_discovered_logs.append(subprocess_log) 

811 new_logs_added += 1 

812 

813 # Repopulate dropdown from master list 

814 self.populate_log_dropdown(self._all_discovered_logs) 

815 

816 logger.info(f"Added {new_logs_added} subprocess logs from current session " 

817 f"(scanned {len(subprocess_logs)} total)") 

818 

819 @pyqtSlot(list) 

820 def _on_server_scan_complete(self, server_logs: List[LogFileInfo]) -> None: 

821 """Handle server scan completion on UI thread.""" 

822 if not server_logs: 

823 logger.debug("No server logs found during scan") 

824 return 

825 

826 # Add server logs to master list (avoid duplicates by path) 

827 existing_paths = {log.path for log in self._all_discovered_logs} 

828 new_logs_added = 0 

829 for server_log in server_logs: 

830 if server_log.path not in existing_paths: 

831 self._all_discovered_logs.append(server_log) 

832 new_logs_added += 1 

833 

834 # Repopulate dropdown from master list 

835 self.populate_log_dropdown(self._all_discovered_logs) 

836 

837 logger.info(f"Added {new_logs_added} new server logs to dropdown (scanned {len(server_logs)} total)") 

838 

839 def _scan_for_subprocess_logs(self) -> List[LogFileInfo]: 

840 """ 

841 Efficiently scan log directory for subprocess logs from current session. 

842 Uses os.scandir() and filters by mtime FIRST before parsing. 

843 Returns list of LogFileInfo for discovered subprocess log files. 

844 """ 

845 from openhcs.core.log_utils import classify_log_file, is_openhcs_log_file 

846 from pathlib import Path 

847 import os 

848 

849 logger.debug("Scanning for subprocess logs from current session...") 

850 

851 try: 

852 # Get log directory 

853 log_dir = Path.home() / ".local" / "share" / "openhcs" / "logs" 

854 

855 if not log_dir.exists(): 

856 return [] 

857 

858 # Use os.scandir() for efficiency - it's faster than glob and gives us stat info 

859 session_logs = [] 

860 total_scanned = 0 

861 filtered_by_time = 0 

862 

863 # Calculate cutoff time (session start - 5 second buffer) 

864 cutoff_time = self._session_start_time - 5.0 

865 

866 # Scan directory efficiently 

867 with os.scandir(log_dir) as entries: 

868 for entry in entries: 

869 total_scanned += 1 

870 

871 # Skip non-.log files immediately 

872 if not entry.name.endswith('.log'): 

873 continue 

874 

875 # Filter by mtime FIRST (cheap filesystem check) 

876 # This avoids parsing thousands of old log files 

877 try: 

878 stat_info = entry.stat() 

879 if stat_info.st_mtime < cutoff_time: 

880 filtered_by_time += 1 

881 continue 

882 except OSError: 

883 continue 

884 

885 # Now check if it's an OpenHCS log (still just filename check, no file I/O) 

886 log_path = Path(entry.path) 

887 if not is_openhcs_log_file(log_path): 

888 continue 

889 

890 # Finally, classify it (this is the expensive part, but we only do it for recent files) 

891 try: 

892 log_info = classify_log_file(log_path, None, include_tui_log=False) 

893 session_logs.append(log_info) 

894 except Exception as e: 

895 logger.debug(f"Failed to classify {log_path}: {e}") 

896 

897 logger.info(f"Found {len(session_logs)} subprocess logs from current session " 

898 f"(scanned {total_scanned} files, filtered {filtered_by_time} by time)") 

899 

900 return session_logs 

901 except Exception as e: 

902 logger.warning(f"Error scanning for subprocess logs: {e}") 

903 return [] 

904 
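# Illustrative sketch (not part of the original file): the mtime-first filter
# used above, in isolation. Cheap stat() checks discard old files before any
# per-file classification work. Names below are hypothetical.
#
#     cutoff = session_start_time - 5.0
#     with os.scandir(log_dir) as entries:
#         recent = [Path(e.path) for e in entries
#                   if e.name.endswith('.log') and e.stat().st_mtime >= cutoff]
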

905 def _scan_for_server_logs(self) -> List[LogFileInfo]: 

906 """ 

907 Scan for running ZMQ servers and Napari viewers by pinging common ports. 

908 Returns list of LogFileInfo for discovered server log files. 

909 """ 

910 from openhcs.core.log_utils import classify_log_file 

911 from pathlib import Path 

912 import zmq 

913 import pickle 

914 

915 logger.debug("Scanning for running ZMQ/streaming servers...") 

916 discovered_logs = [] 

917 

918 # Scan all streaming ports using current global config 

919 # This ensures we find viewers launched with custom ports 

920 from openhcs.core.config import get_all_streaming_ports 

921 ports_to_scan = get_all_streaming_ports(num_ports_per_type=10) # Uses global config by default 

922 

923 def ping_server(port: int) -> dict: 

924 """Ping a server and return pong response, or None if no response.""" 

925 from openhcs.constants.constants import CONTROL_PORT_OFFSET 

926 from openhcs.runtime.zmq_base import get_zmq_transport_url, get_default_transport_mode 

927 

928 control_port = port + CONTROL_PORT_OFFSET 

929 try: 

930 context = zmq.Context() 

931 socket = context.socket(zmq.REQ) 

932 socket.setsockopt(zmq.LINGER, 0) 

933 socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout (servers may be busy) 

934 

935 # Use transport mode-aware URL (IPC or TCP) 

936 transport_mode = get_default_transport_mode() 

937 control_url = get_zmq_transport_url(control_port, transport_mode, 'localhost') 

938 socket.connect(control_url) 

939 

940 # Send ping 

941 socket.send(pickle.dumps({'type': 'ping'})) 

942 

943 # Wait for pong 

944 response = socket.recv() 

945 pong = pickle.loads(response) 

946 

947 socket.close() 

948 context.term() 

949 logger.debug(f"Port {port} responded: {pong}") 

950 return pong 

951 except Exception as e: 

952 logger.debug(f"Port {port} no response: {e}") 

953 return None 

954 

955 # Scan all ports (execution server + all streaming types) 

956 for port in ports_to_scan: 

957 pong = ping_server(port) 

958 if pong and pong.get('log_file_path'): 

959 log_path = Path(pong['log_file_path']) 

960 if log_path.exists(): 

961 log_info = classify_log_file(log_path, None, include_tui_log=False) 

962 discovered_logs.append(log_info) 

963 viewer_type = pong.get('viewer', 'ZMQ server') 

964 logger.debug(f"Discovered {viewer_type} log: {log_path}") 

965 

966 return discovered_logs 

967 
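# Illustrative sketch (not part of the original file): the ping/pong exchange
# used above, reduced to plain pyzmq + pickle over TCP. The port number is
# hypothetical; the real code derives the control URL from
# get_zmq_transport_url() and CONTROL_PORT_OFFSET.
#
#     ctx = zmq.Context()
#     sock = ctx.socket(zmq.REQ)
#     sock.setsockopt(zmq.RCVTIMEO, 1000)
#     sock.connect("tcp://localhost:5556")
#     sock.send(pickle.dumps({'type': 'ping'}))
#     pong = pickle.loads(sock.recv())   # e.g. {'log_file_path': '/path/to/server.log'}
#     sock.close()
#     ctx.term()
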

968 # Dropdown Management Methods 

969 def populate_log_dropdown(self, log_files: List[LogFileInfo]) -> None: 

970 """ 

971 Populate QComboBox with log files with process status indicators. 

972 

973 Args: 

974 log_files: List of LogFileInfo objects to add to dropdown 

975 """ 

976 self.log_selector.clear() 

977 

978 # Sort logs: TUI first, main subprocess, then workers by timestamp 

979 sorted_logs = sorted(log_files, key=self._log_sort_key) 

980 

981 # Filter if "show alive only" is enabled 

982 if self.show_alive_only: 

983 sorted_logs = [ 

984 log_info for log_info in sorted_logs 

985 if self._is_log_from_alive_process(log_info) 

986 ] 

987 

988 for log_info in sorted_logs: 

989 # Add process status indicator to display name 

990 display_name = get_log_display_name(log_info.path, self.process_tracker) 

991 tooltip = get_log_tooltip(log_info.path, self.process_tracker) 

992 

993 self.log_selector.addItem(display_name, log_info) 

994 # Set tooltip for the item 

995 self.log_selector.setItemData(self.log_selector.count() - 1, tooltip, Qt.ItemDataRole.ToolTipRole) 

996 

997 logger.debug(f"Populated dropdown with {len(sorted_logs)} log files (filtered: {self.show_alive_only})") 

998 

999 def _log_sort_key(self, log_info: LogFileInfo) -> tuple: 

1000 """ 

1001 Generate sort key for log files. 

1002 

1003 Args: 

1004 log_info: LogFileInfo to generate sort key for 

1005 

1006 Returns: 

1007 tuple: Sort key (priority, timestamp) 

1008 """ 

1009 # Priority: TUI=0, main=1, worker=2, unknown=3 

1010 priority_map = {"tui": 0, "main": 1, "worker": 2, "unknown": 3} 

1011 priority = priority_map.get(log_info.log_type, 3) 

1012 

1013 # Use file modification time as secondary sort 

1014 try: 

1015 timestamp = log_info.path.stat().st_mtime 

1016 except (OSError, AttributeError): 

1017 timestamp = 0 

1018 

1019 return (priority, -timestamp) # Negative timestamp for newest first 

1020 
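# Illustrative sketch (not part of the original file): the ordering that
# _log_sort_key produces -- lower priority first, then newest mtime first
# because the timestamp is negated. The mtime values are hypothetical.
#
#     tui log,    mtime=100  ->  (0, -100)
#     main log,   mtime=200  ->  (1, -200)
#     worker log, mtime=300  ->  (2, -300)
#     worker log, mtime=250  ->  (2, -250)   # sorts after the newer worker log
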

1021 def clear_subprocess_logs(self) -> None: 

1022 """Remove all non-TUI logs from dropdown and switch to TUI log.""" 

1023 logger.debug("clear_subprocess_logs called", stack_info=True) 

1027 

1028 current_logs = [] 

1029 

1030 # Collect TUI logs only 

1031 for i in range(self.log_selector.count()): 

1032 log_info = self.log_selector.itemData(i) 

1033 if log_info and log_info.log_type == "tui": 

1034 current_logs.append(log_info) 

1035 

1036 # Repopulate with TUI logs only 

1037 self.populate_log_dropdown(current_logs) 

1038 

1039 # Auto-select TUI log if available 

1040 if current_logs: 

1041 self.switch_to_log(current_logs[0].path) 

1042 

1043 logger.info("Cleared subprocess logs, kept TUI logs") 

1044 

1045 def add_new_log(self, log_file_info: LogFileInfo) -> None: 

1046 """ 

1047 Add new log to dropdown maintaining sort order. 

1048 

1049 Args: 

1050 log_file_info: New LogFileInfo to add 

1051 """ 

1052 # Add to master list (avoid duplicates by path) 

1053 existing_paths = {log.path for log in self._all_discovered_logs} 

1054 if log_file_info.path not in existing_paths: 

1055 self._all_discovered_logs.append(log_file_info) 

1056 

1057 # Repopulate dropdown from master list 

1058 self.populate_log_dropdown(self._all_discovered_logs) 

1059 

1060 logger.info(f"Added new log to dropdown: {log_file_info.display_name}") 

1061 else: 

1062 logger.debug(f"Log already exists, skipping: {log_file_info.display_name}") 

1063 

1064 def on_log_selection_changed(self, index: int) -> None: 

1065 """ 

1066 Handle dropdown selection change - switch log display. 

1067 

1068 Args: 

1069 index: Selected index in dropdown 

1070 """ 

1071 if index >= 0: 

1072 log_info = self.log_selector.itemData(index) 

1073 if log_info: 

1074 self.switch_to_log(log_info.path) 

1075 

1076 def switch_to_log(self, log_path: Path) -> None: 

1077 """ 

1078 Switch log display to show specified log file. 

1079 

1080 Args: 

1081 log_path: Path to log file to display 

1082 """ 

1083 try: 

1084 # Stop current tailing 

1085 if self.tail_timer and self.tail_timer.isActive(): 

1086 self.tail_timer.stop() 

1087 

1088 # Wait for any in-flight file loader to finish before switching 

1089 if self.file_loader and self.file_loader.isRunning(): 

1090 self.file_loader.wait() 

1091 

1092 # Validate file exists 

1093 if not log_path.exists(): 

1094 self.log_display.setText(f"Log file not found: {log_path}") 

1095 return 

1096 

1097 # Store path for later use 

1098 self.current_log_path = log_path 

1099 

1100 # ALWAYS use async loading to prevent UI blocking 

1101 # QSyntaxHighlighter is already lazy - it only highlights visible blocks 

1102 file_size = log_path.stat().st_size 

1103 logger.debug(f"Loading log file ({file_size} bytes) asynchronously") 

1104 self.log_display.setText(f"Loading log file ({file_size // 1024} KB)...") 

1105 

1106 # Create and start async loader 

1107 self.file_loader = LogFileLoader(log_path) 

1108 self.file_loader.content_loaded.connect(self._on_file_loaded) 

1109 self.file_loader.load_failed.connect(self._on_file_load_failed) 

1110 self.file_loader.start() 

1111 

1112 except Exception as e: 

1113 logger.error(f"Error switching to log {log_path}: {e}") 

1114 raise 

1115 

1116 def _on_file_loaded(self, content: str) -> None: 

1117 """Handle content delivered by the async file loader.""" 

1118 try: 

1119 # Set content - QSyntaxHighlighter only processes visible blocks automatically 

1120 self.log_display.setText(content) 

1121 

1122 # Update file position 

1123 self.current_file_position = len(content.encode('utf-8')) 

1124 

1125 # Start tailing if not paused 

1126 if not self.tailing_paused and self.current_log_path: 

1127 self.start_log_tailing(self.current_log_path) 

1128 

1129 # Scroll to bottom if auto-scroll enabled 

1130 if self.auto_scroll_enabled: 

1131 self.scroll_to_bottom() 

1132 

1133 logger.info(f"Loaded log file: {self.current_log_path}") 

1134 

1135 except Exception as e: 

1136 logger.error(f"Error displaying loaded content: {e}") 

1137 

1138 def _on_file_load_failed(self, error_msg: str) -> None: 

1139 """Handle file load failure.""" 

1140 self.log_display.setText(f"Failed to load log file: {error_msg}") 

1141 logger.error(f"Failed to load log file: {error_msg}") 

1142 

1143 # Search Functionality Methods 

1144 def toggle_search_toolbar(self) -> None: 

1145 """Show/hide search toolbar (Ctrl+F handler).""" 

1146 if self.search_toolbar.isVisible(): 

1147 # Hide toolbar and clear highlights 

1148 self.search_toolbar.setVisible(False) 

1149 self.clear_search_highlights() 

1150 else: 

1151 # Show toolbar and focus search input 

1152 self.search_toolbar.setVisible(True) 

1153 self.search_input.setFocus() 

1154 self.search_input.selectAll() 

1155 

1156 def perform_search(self) -> None: 

1157 """Search in log display using QTextEdit.find().""" 

1158 search_text = self.search_input.text() 

1159 if not search_text: 

1160 self.clear_search_highlights() 

1161 return 

1162 

1163 # Clear previous highlights if search text changed 

1164 if search_text != self.current_search_text: 

1165 self.clear_search_highlights() 

1166 self.current_search_text = search_text 

1167 self.highlight_all_matches(search_text) 

1168 

1169 # Find next occurrence 

1170 flags = QTextDocument.FindFlag(0) 

1171 if self.case_sensitive_cb.isChecked(): 

1172 flags |= QTextDocument.FindFlag.FindCaseSensitively 

1173 

1174 found = self.log_display.find(search_text, flags) 

1175 if not found: 

1176 # Try from beginning 

1177 cursor = self.log_display.textCursor() 

1178 cursor.movePosition(cursor.MoveOperation.Start) 

1179 self.log_display.setTextCursor(cursor) 

1180 self.log_display.find(search_text, flags) 

1181 
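# Illustrative sketch (not part of the original file): how the currently
# unused "Regex" checkbox could be honoured here, assuming Qt's overload of
# QTextEdit.find() that accepts a QRegularExpression.
#
#     if self.regex_cb.isChecked():
#         query = QRegularExpression(search_text)
#         if not self.case_sensitive_cb.isChecked():
#             query.setPatternOptions(QRegularExpression.PatternOption.CaseInsensitiveOption)
#         found = self.log_display.find(query, flags)
#     else:
#         found = self.log_display.find(search_text, flags)
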

1182 def highlight_all_matches(self, search_text: str) -> None: 

1183 """ 

1184 Highlight all matches of search text in the document. 

1185 

1186 Args: 

1187 search_text: Text to search and highlight 

1188 """ 

1189 if not search_text: 

1190 return 

1191 

1192 # Create highlight format 

1193 highlight_format = QTextCharFormat() 

1194 highlight_format.setBackground(QColor(255, 255, 0, 100)) # Yellow with transparency 

1195 

1196 # Search through entire document 

1197 document = self.log_display.document() 

1198 cursor = QTextCursor(document) 

1199 

1200 flags = QTextDocument.FindFlag(0) 

1201 if self.case_sensitive_cb.isChecked(): 

1202 flags |= QTextDocument.FindFlag.FindCaseSensitively 

1203 

1204 self.search_highlights.clear() 

1205 

1206 while True: 

1207 cursor = document.find(search_text, cursor, flags) 

1208 if cursor.isNull(): 

1209 break 

1210 

1211 # Apply highlight 

1212 cursor.mergeCharFormat(highlight_format) 

1213 self.search_highlights.append(cursor) 

1214 

1215 logger.debug(f"Highlighted {len(self.search_highlights)} search matches") 

1216 

1217 def clear_search_highlights(self) -> None: 

1218 """Clear all search highlights from the document.""" 

1219 # Reset format for all highlighted text 

1220 for cursor in self.search_highlights: 

1221 if not cursor.isNull(): 

1222 # Reset to default format 

1223 default_format = QTextCharFormat() 

1224 cursor.setCharFormat(default_format) 

1225 

1226 self.search_highlights.clear() 

1227 self.current_search_text = "" 

1228 

1229 def find_next(self) -> None: 

1230 """Find next search result.""" 

1231 self.perform_search() 

1232 

1233 def find_previous(self) -> None: 

1234 """Find previous search result.""" 

1235 search_text = self.search_input.text() 

1236 if not search_text: 

1237 return 

1238 

1239 flags = QTextDocument.FindFlag.FindBackward 

1240 if self.case_sensitive_cb.isChecked(): 

1241 flags |= QTextDocument.FindFlag.FindCaseSensitively 

1242 

1243 found = self.log_display.find(search_text, flags) 

1244 if not found: 

1245 # Try from end 

1246 cursor = self.log_display.textCursor() 

1247 cursor.movePosition(cursor.MoveOperation.End) 

1248 self.log_display.setTextCursor(cursor) 

1249 self.log_display.find(search_text, flags) 

1250 

1251 # Control Button Methods 

1252 def toggle_auto_scroll(self, enabled: bool) -> None: 

1253 """Toggle auto-scroll to bottom.""" 

1254 self.auto_scroll_enabled = enabled 

1255 logger.debug(f"Auto-scroll {'enabled' if enabled else 'disabled'}") 

1256 

1257 def toggle_pause_tailing(self, paused: bool) -> None: 

1258 """Toggle pause/resume log tailing.""" 

1259 self.tailing_paused = paused 

1260 if paused and self.tail_timer: 

1261 self.tail_timer.stop() 

1262 elif not paused and self.current_log_path: 

1263 self.start_log_tailing(self.current_log_path) 

1264 logger.debug(f"Log tailing {'paused' if paused else 'resumed'}") 

1265 

1266 def clear_log_display(self) -> None: 

1267 """Clear current log display content.""" 

1268 self.log_display.clear() 

1269 logger.debug("Log display cleared") 

1270 

1271 def scroll_to_bottom(self) -> None: 

1272 """Scroll log display to bottom.""" 

1273 scrollbar = self.log_display.verticalScrollBar() 

1274 scrollbar.setValue(scrollbar.maximum()) 

1275 

1276 

1277 

1278 # Real-time Tailing Methods 

1279 def start_log_tailing(self, log_path: Path) -> None: 

1280 """ 

1281 Start tailing log file with QTimer (100ms interval). 

1282 

1283 Args: 

1284 log_path: Path to log file to tail 

1285 """ 

1286 # Stop any existing timer 

1287 if self.tail_timer: 

1288 self.tail_timer.stop() 

1289 

1290 # Create new timer 

1291 self.tail_timer = QTimer() 

1292 self.tail_timer.timeout.connect(self.read_log_incremental) 

1293 self.tail_timer.start(100) # 100ms interval 

1294 

1295 logger.debug(f"Started tailing log file: {log_path}") 

1296 

1297 def stop_log_tailing(self) -> None: 

1298 """Stop current log tailing.""" 

1299 if self.tail_timer: 

1300 self.tail_timer.stop() 

1301 self.tail_timer = None 

1302 logger.debug("Stopped log tailing") 

1303 

1304 def read_log_incremental(self) -> None: 

1305 """Read new content from current log file (track file position).""" 

1306 if not self.current_log_path or not self.current_log_path.exists(): 

1307 return 

1308 

1309 try: 

1310 # Get current file size 

1311 current_size = self.current_log_path.stat().st_size 

1312 

1313 # Handle log rotation (file size decreased) 

1314 if current_size < self.current_file_position: 

1315 logger.info(f"Log rotation detected for {self.current_log_path}") 

1316 self.current_file_position = 0 

1317 # Optionally clear display or add rotation marker 

1318 self.log_display.append("\n--- Log rotated ---\n") 

1319 

1320 # Read new content if file grew 

1321 if current_size > self.current_file_position: 

1322 with open(self.current_log_path, 'rb') as f: 

1323 f.seek(self.current_file_position) 

1324 new_data = f.read(current_size - self.current_file_position) 

1325 

1326 # Decode new content (errors='replace' cannot raise, so no fallback is needed) 

1327 new_content = new_data.decode('utf-8', errors='replace') 

1331 

1332 if new_content: 

1333 # Check if user has scrolled up (disable auto-scroll) 

1334 scrollbar = self.log_display.verticalScrollBar() 

1335 was_at_bottom = scrollbar.value() >= scrollbar.maximum() - 10 

1336 

1337 # Append new content 

1338 cursor = self.log_display.textCursor() 

1339 cursor.movePosition(cursor.MoveOperation.End) 

1340 cursor.insertText(new_content) 

1341 

1342 # Auto-scroll if enabled and user was at bottom 

1343 if self.auto_scroll_enabled and was_at_bottom: 

1344 self.scroll_to_bottom() 

1345 

1346 # Update file position 

1347 self.current_file_position = current_size 

1348 

1349 except (OSError, PermissionError) as e: 

1350 logger.warning(f"Error reading log file {self.current_log_path}: {e}") 

1351 # Handle file deletion/recreation 

1352 if not self.current_log_path.exists(): 

1353 logger.info(f"Log file deleted: {self.current_log_path}") 

1354 self.log_display.append(f"\n--- Log file deleted: {self.current_log_path} ---\n") 

1355 # Try to reconnect after a delay 

1356 QTimer.singleShot(1000, self._attempt_reconnection) 

1357 except Exception as e: 

1358 logger.error(f"Unexpected error in log tailing: {e}") 

1359 raise 

1360 
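# Illustrative sketch (not part of the original file): the core of the
# incremental read above, in isolation -- remember the byte offset, seek to
# it, and read only what was appended. Names below are hypothetical.
#
#     size = path.stat().st_size
#     if size > position:
#         with open(path, 'rb') as f:
#             f.seek(position)
#             appended = f.read(size - position).decode('utf-8', errors='replace')
#         position = size
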

1361 def _attempt_reconnection(self) -> None: 

1362 """Attempt to reconnect to log file after deletion.""" 

1363 if self.current_log_path and self.current_log_path.exists(): 

1364 logger.info(f"Log file recreated, reconnecting: {self.current_log_path}") 

1365 self.current_file_position = 0 

1366 self.log_display.append(f"\n--- Reconnected to: {self.current_log_path} ---\n") 

1367 # File will be read on next timer tick 

1368 

1369 # External Integration Methods 

1370 def start_monitoring(self, base_log_path: Optional[str] = None) -> None: 

1371 """Start monitoring for new logs.""" 

1372 if self.file_detector: 

1373 self.file_detector.stop_watching() 

1374 

1375 # Get log directory 

1376 log_directory = Path(base_log_path).parent if base_log_path else Path.home() / ".local" / "share" / "openhcs" / "logs" 

1377 

1378 # Start file watching 

1379 self.file_detector = LogFileDetector(base_log_path) 

1380 self.file_detector.new_log_detected.connect(self.add_new_log) 

1381 self.file_detector.start_watching(log_directory) 

1382 

1383 def stop_monitoring(self) -> None: 

1384 """Stop monitoring for new logs.""" 

1385 if self.file_detector: 

1386 self.file_detector.stop_watching() 

1387 self.file_detector = None 

1388 logger.info("Stopped monitoring for new logs") 

1389 

1390 def start_process_tracking(self) -> None: 

1391 """Start periodic process status updates.""" 

1392 # Initial update 

1393 self.process_tracker.update() 

1394 

1395 # Setup timer for periodic updates (every 2 seconds) 

1396 self.process_update_timer = QTimer() 

1397 self.process_update_timer.timeout.connect(self.update_process_status) 

1398 self.process_update_timer.start(2000) # 2 second interval 

1399 

1400 logger.debug("Started process tracking") 

1401 

1402 def update_process_status(self) -> None: 

1403 """Update process status and refresh dropdown if needed.""" 

1404 # Update process tracker 

1405 self.process_tracker.update() 

1406 

1407 # Refresh dropdown to update status indicators 

1408 # Only if we have logs loaded 

1409 if self.log_selector.count() > 0: 

1410 # Remember current selection 

1411 current_index = self.log_selector.currentIndex() 

1412 current_log_info = self.log_selector.itemData(current_index) if current_index >= 0 else None 

1413 

1414 # Temporarily disconnect signal to avoid triggering reload 

1415 self.log_selector.currentIndexChanged.disconnect(self.on_log_selection_changed) 

1416 

1417 try: 

1418 # Repopulate from master list with updated status indicators 

1419 self.populate_log_dropdown(self._all_discovered_logs) 

1420 

1421 # Restore selection if possible 

1422 if current_log_info: 

1423 # Find the same log in the new dropdown 

1424 for i in range(self.log_selector.count()): 

1425 log_info = self.log_selector.itemData(i) 

1426 if log_info and log_info.path == current_log_info.path: 

1427 self.log_selector.setCurrentIndex(i) 

1428 break 

1429 finally: 

1430 # Reconnect signal 

1431 self.log_selector.currentIndexChanged.connect(self.on_log_selection_changed) 

1432 

1433 def _get_process_start_time(self) -> float: 

1434 """ 

1435 Get the start time of the current process. 

1436 

1437 Returns: 

1438 float: Process start time as Unix timestamp 

1439 """ 

1440 try: 

1441 import psutil 

1442 import os 

1443 process = psutil.Process(os.getpid()) 

1444 return process.create_time() 

1445 except Exception as e: 

1446 logger.warning(f"Failed to get process start time: {e}") 

1447 # Fallback to current time 

1448 import time 

1449 return time.time() 

1450 

1451 def _is_log_from_current_session(self, log_info: LogFileInfo) -> bool: 

1452 """ 

1453 Check if a log file was created during the current session. 

1454 

1455 Args: 

1456 log_info: LogFileInfo to check 

1457 

1458 Returns: 

1459 bool: True if log was created after session start time 

1460 """ 

1461 try: 

1462 # Get file modification time (when log was created/last written) 

1463 mtime = log_info.path.stat().st_mtime 

1464 # Allow a small buffer (5 seconds) to account for timing differences 

1465 return mtime >= (self._session_start_time - 5.0) 

1466 except (OSError, FileNotFoundError): 

1467 # If we can't stat the file, exclude it 

1468 return False 

1469 

1470 def _is_log_from_alive_process(self, log_info: LogFileInfo) -> bool: 

1471 """ 

1472 Check if a log file is from a currently running process. 

1473 

1474 Args: 

1475 log_info: LogFileInfo to check 

1476 

1477 Returns: 

1478 bool: True if process is alive or unknown, False if terminated 

1479 """ 

1480 pid = extract_pid_from_log_filename(log_info.path) 

1481 if pid is None: 

1482 # No PID found - assume it's a main log (always show) 

1483 return True 

1484 return self.process_tracker.is_alive(pid) 

1485 

1486 def on_filter_changed(self, state: int) -> None: 

1487 """ 

1488 Handle filter checkbox state change. 

1489 

1490 Args: 

1491 state: Qt.CheckState value 

1492 """ 

1493 self.show_alive_only = (state == Qt.CheckState.Checked.value) 

1494 

1495 # Refresh dropdown with filter applied 

1496 # Always use master list as source, not the current dropdown 

1497 # This ensures all logs are available when filter is toggled off 

1498 if self._all_discovered_logs: 

1499 self.populate_log_dropdown(self._all_discovered_logs) 

1500 

1501 logger.debug(f"Filter changed: show_alive_only={self.show_alive_only}") 

1502 

1503 def cleanup(self) -> None: 

1504 """Cleanup all resources and background processes.""" 

1505 try: 

1506 # Stop tailing timer 

1507 if hasattr(self, 'tail_timer') and self.tail_timer and self.tail_timer.isActive(): 

1508 self.tail_timer.stop() 

1509 self.tail_timer.deleteLater() 

1510 self.tail_timer = None 

1511 

1512 # Stop process tracking timer 

1513 if hasattr(self, 'process_update_timer') and self.process_update_timer and self.process_update_timer.isActive(): 

1514 self.process_update_timer.stop() 

1515 self.process_update_timer.deleteLater() 

1516 self.process_update_timer = None 

1517 

1518 # Stop file monitoring 

1519 self.stop_monitoring() 

1520 

1521 # Clean up file detector 

1522 if hasattr(self, 'file_detector') and self.file_detector: 

1523 self.file_detector.stop_watching() 

1524 self.file_detector = None 

1525 

1526 except Exception as e: 

1527 logger.warning(f"Error during log viewer cleanup: {e}") 

1528 

1529 def closeEvent(self, event) -> None: 

1530 """Handle window close event.""" 

1531 if self.file_detector: 

1532 self.file_detector.stop_watching() 

1533 if self.tail_timer: 

1534 self.tail_timer.stop() 

1535 if hasattr(self, 'process_update_timer') and self.process_update_timer: 

1536 self.process_update_timer.stop() 

1537 self.window_closed.emit() 

1538 super().closeEvent(event)
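
# --- Illustrative usage sketch (not part of the original file) ---------------
# Minimal way to open the viewer standalone. Assumes FileManager() can be
# constructed without arguments and that a None service_adapter is acceptable;
# both are our assumptions, not guarantees from this module.
if __name__ == "__main__":
    import sys
    from PyQt6.QtWidgets import QApplication

    app = QApplication(sys.argv)
    window = LogViewerWindow(file_manager=FileManager(), service_adapter=None)
    window.show()
    sys.exit(app.exec())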