Coverage for openhcs/textual_tui/widgets/status_bar.py: 0.0%
223 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2StatusBar Widget for OpenHCS Textual TUI
4Bottom status bar with real-time log streaming.
5Shows live log messages from OpenHCS operations.
6"""
8import logging
9import os
10import time
11from datetime import datetime
12from collections import deque
13from typing import Optional, List
14from pathlib import Path
16from textual import work
17from textual.app import ComposeResult
18from textual.reactive import reactive
19from textual.widgets import Static
20from textual.widget import Widget
21from textual.worker import get_current_worker
22from textual.events import Click
24logger = logging.getLogger(__name__)
class TUILogHandler(logging.Handler):
    """Logging handler that feeds log records to the TUI status bar with batching.

    Records at INFO level and above are formatted as ``LEVEL: message`` and
    queued. The queue is flushed to the status bar at most once per
    ``batch_interval`` seconds, and only the newest queued message is
    displayed; older queued messages are discarded.

    NOTE(review): flushing is only triggered by an incoming record. A record
    that arrives inside the batch interval stays queued until a later record
    arrives; there is no timer-driven flush.
    """

    def __init__(self, status_bar: 'StatusBar'):
        """Create a handler bound to a status bar.

        Args:
            status_bar: Object exposing an ``app`` attribute and an
                ``add_log_message(message, level)`` method.
        """
        super().__init__()
        self.status_bar = status_bar
        self.setLevel(logging.INFO)  # Only show INFO and above in status bar

        # Format for status bar (compact)
        self.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))

        # Batching to reduce call_from_thread frequency
        self.pending_messages = []
        self.last_update_time = 0
        self.batch_interval = 0.5  # Batch messages for 500ms

    def emit(self, record):
        """Queue a formatted record and flush if the batch interval has elapsed.

        Any exception raised while formatting or flushing is reported at DEBUG
        level and suppressed, so a logging problem never propagates to the
        code that issued the log call.

        Args:
            record: The ``logging.LogRecord`` to display.
        """
        try:
            msg = self.format(record)

            # Add to pending messages
            self.pending_messages.append((msg, record.levelname))

            # Only update UI if enough time has passed (batching)
            current_time = time.time()
            if current_time - self.last_update_time >= self.batch_interval:
                self._flush_pending_messages()
                self.last_update_time = current_time

        except Exception as e:
            # Don't let logging errors crash the app, but log the issue.
            # DEBUG is below this handler's INFO threshold.
            logging.getLogger(__name__).debug(
                "Status bar emit error (non-critical): %s", e
            )

    def _flush_pending_messages(self):
        """Send the newest pending message to the status bar and clear the queue.

        The message is delivered through ``app.call_from_thread`` when the app
        provides it. If that call raises ``RuntimeError`` the message is
        delivered by calling ``add_log_message`` directly instead of being
        dropped. Any other failure is reported at DEBUG level and suppressed.
        """
        if not self.pending_messages:
            return

        # Take the most recent message as the current display
        latest_msg, latest_level = self.pending_messages[-1]

        # Clear pending messages
        self.pending_messages.clear()

        # Update UI with latest message only (avoid spam)
        try:
            deliver = self.status_bar.add_log_message
            call_from_thread = getattr(self.status_bar.app, 'call_from_thread', None)
            if call_from_thread is None:
                # Fallback for direct calls
                deliver(latest_msg, latest_level)
                return
            try:
                call_from_thread(deliver, latest_msg, latest_level)
            except RuntimeError:
                # Textual documents RuntimeError for call_from_thread when it
                # is invoked on the app's own thread (or the app is not
                # running). Records logged from the UI thread land here, so
                # deliver them directly rather than losing them.
                deliver(latest_msg, latest_level)
        except Exception as e:
            # Best effort: the status bar must never break logging.
            logging.getLogger(__name__).debug(
                "Status bar flush error (non-critical): %s", e
            )
class StatusBar(Widget):
    """
    Status bar widget with real-time log streaming.

    Layout: |————————————————————————— Live Log Messages |
    Shows live log messages from OpenHCS operations in real-time.

    Messages arrive from two sources: a ``TUILogHandler`` attached to the
    ``openhcs`` logger, and a worker that tails the current log file.
    Clicking the bar opens the Toolong log viewer window.
    """

    # Reactive state
    current_log_message = reactive("Ready")
    last_updated = reactive("")

    # Suffix appended to the rendered text as a hint that the bar is clickable
    _CLICK_HINT = " 📋"

    def __init__(self, max_history: int = 100):
        """Initialize the status bar with log streaming.

        Args:
            max_history: Maximum number of log entries kept in ``log_history``.
        """
        super().__init__()
        self.last_updated = datetime.now().strftime("%H:%M:%S")
        self.max_history = max_history
        self.log_history = deque(maxlen=max_history)  # Keep recent log messages
        self.log_handler: Optional[TUILogHandler] = None

        # Log file monitoring for subprocess logs - using Textual workers
        self.log_file_path: Optional[str] = None
        self.log_file_position: int = 0
        self.log_monitor_worker = None
        self.subprocess_base_log_path: Optional[str] = None  # For ReactiveLogMonitor

        logger.debug("StatusBar initialized with real-time logging")

    def compose(self) -> ComposeResult:
        """Compose the status bar layout: a single clickable ``Static``."""
        # Make the status bar clickable to open Toolong
        yield Static(
            self.get_log_display() + self._CLICK_HINT,
            id="log_display",
            markup=False,
            classes="clickable-status"
        )

    def get_log_display(self) -> str:
        """Return the current message prefixed with the current ``[HH:MM:SS]`` time."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        return f"[{timestamp}] {self.current_log_message}"

    async def on_click(self, event: Click) -> None:
        """Handle click on status bar to open Toolong log viewer.

        A failure is logged and shown in the bar as an ERROR message.
        """
        try:
            await self.open_toolong_viewer()
        except Exception as e:
            logger.error("Failed to open Toolong viewer: %s", e)
            self.add_log_message(f"Error opening log viewer: {e}", "ERROR")

    async def open_toolong_viewer(self) -> None:
        """Open Toolong log viewer in a window using ReactiveLogMonitor.

        Re-opens the existing ``ToolongWindow`` if the app already has one;
        otherwise creates and mounts a new one.
        """
        from openhcs.textual_tui.windows.toolong_window import ToolongWindow
        from textual.css.query import NoMatches

        # Determine base log path for subprocess logs (if any)
        base_log_path = getattr(self, 'subprocess_base_log_path', None) or ""

        # Try to find existing window - if it doesn't exist, create new one
        try:
            window = self.app.query_one(ToolongWindow)
        except NoMatches:
            # Expected case: window doesn't exist yet, create new one
            window = ToolongWindow(base_log_path=base_log_path)
            await self.app.mount(window)
        window.open_state = True

    def watch_current_log_message(self, message: str) -> None:
        """Update the display when current_log_message changes."""
        self.last_updated = datetime.now().strftime("%H:%M:%S")
        # Use call_later to defer DOM operations to next event loop cycle
        self.call_later(self._update_log_display)

    def _update_log_display(self) -> None:
        """Update the log display - deferred to avoid blocking reactive watchers.

        Renders the same text as ``compose`` (message plus click hint) so the
        hint stays visible after the first update.
        """
        try:
            log_display = self.query_one("#log_display")
            log_display.update(self.get_log_display() + self._CLICK_HINT)
        except Exception:
            # Widget might not be mounted yet; a display refresh is best effort
            pass

    def on_mount(self) -> None:
        """Set up log handler and log file monitoring when widget is mounted."""
        self.setup_log_handler()
        self.start_log_file_monitoring()

    def on_unmount(self) -> None:
        """Clean up log handler and monitoring worker when widget is unmounted."""
        self.cleanup_log_handler()
        self.stop_log_file_monitoring()

    def setup_log_handler(self) -> None:
        """Set up the custom log handler to capture OpenHCS logs.

        Does nothing if a handler is already installed.
        """
        if self.log_handler is None:
            self.log_handler = TUILogHandler(self)

            # Add handler to OpenHCS root logger to capture all OpenHCS logs
            logging.getLogger("openhcs").addHandler(self.log_handler)

            logger.debug("Real-time log handler attached to OpenHCS logger")

    def cleanup_log_handler(self) -> None:
        """Remove the log handler from the ``openhcs`` logger, if installed."""
        if self.log_handler is not None:
            logging.getLogger("openhcs").removeHandler(self.log_handler)
            self.log_handler = None
            logger.debug("Real-time log handler removed")

    def add_log_message(self, message: str, level: str) -> None:
        """
        Add a new log message to the status bar.

        The message is stored in ``log_history`` with a timestamp and becomes
        the currently displayed message.

        Args:
            message: Log message to display
            level: Log level (INFO, WARNING, ERROR, etc.); accepted but not
                used by this method
        """
        # Store in history
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_history.append(f"[{timestamp}] {message}")

        # Update current display
        self.current_log_message = message

    def get_recent_logs(self, count: int = 10) -> list:
        """Get the most recent log messages.

        Args:
            count: Maximum number of entries to return.

        Returns:
            Up to ``count`` newest history entries, oldest first. An empty
            list when ``count`` is zero or negative.
        """
        if count <= 0:
            # [-0:] would return the whole history, so handle it explicitly
            return []
        return list(self.log_history)[-count:]

    def collect_log_files(self) -> List[str]:
        """Collect all available OpenHCS log files for viewing.

        Returns:
            Sorted, de-duplicated paths: the main log file (if it exists) plus
            subprocess and worker logs found in the same directory. Empty when
            no log file path is known.
        """
        found = set()

        if self.log_file_path:
            main_log = Path(self.log_file_path)

            # Add main TUI log file if available
            if main_log.exists():
                found.add(self.log_file_path)

            # Look for subprocess and worker log files in the log directory
            log_dir = main_log.parent
            found.update(str(f) for f in log_dir.glob("openhcs_subprocess_*.log"))
            # Worker logs also match the pattern above; the set removes duplicates
            found.update(str(f) for f in log_dir.glob("openhcs_subprocess_*_worker_*.log"))

        log_files = sorted(found)

        logger.info("Collected %d log files for Toolong viewer", len(log_files))
        return log_files

    def start_log_monitoring(self, base_log_path: str) -> None:
        """Start log monitoring for subprocess with base path tracking.

        Args:
            base_log_path: Base path stored for the Toolong viewer to use.
        """
        self.subprocess_base_log_path = base_log_path
        logger.info("StatusBar: Started log monitoring for base path: %s", base_log_path)

    def stop_log_monitoring(self) -> None:
        """Stop log monitoring and clear subprocess base path."""
        self.subprocess_base_log_path = None
        logger.info("StatusBar: Stopped log monitoring")

    # Legacy methods for compatibility
    def set_status(self, message: str) -> None:
        """Legacy method - now adds as INFO log."""
        self.add_log_message(f"Status: {message}", "INFO")

    def set_error(self, error_message: str) -> None:
        """Legacy method - now adds as ERROR log."""
        self.add_log_message(f"ERROR: {error_message}", "ERROR")

    def set_info(self, info_message: str) -> None:
        """Legacy method - now adds as INFO log."""
        self.add_log_message(f"INFO: {info_message}", "INFO")

    def clear_status(self) -> None:
        """Legacy method - now shows ready message."""
        self.current_log_message = "Ready"

    def start_log_file_monitoring(self) -> None:
        """Start monitoring the log file for subprocess updates using Textual worker."""
        # Get current log file path from the logging system
        self.log_file_path = self.get_current_log_file_path()
        if self.log_file_path and Path(self.log_file_path).exists():
            # Start from end of file to only show new logs
            self.log_file_position = Path(self.log_file_path).stat().st_size

        # Stop any existing monitoring
        self.stop_log_file_monitoring()

        # Start Textual worker using @work decorated method
        self.log_monitor_worker = self._log_monitor_worker()
        logger.debug("Started worker log file monitoring: %s", self.log_file_path)

    def stop_log_file_monitoring(self) -> None:
        """Stop monitoring the log file and cleanup worker."""
        if self.log_monitor_worker and not self.log_monitor_worker.is_finished:
            self.log_monitor_worker.cancel()
        self.log_monitor_worker = None
        logger.debug("Stopped log file monitoring")

    def get_current_log_file_path(self) -> Optional[str]:
        """Get the current log file path from the logging system.

        Returns:
            ``baseFilename`` of the first ``logging.FileHandler`` found on the
            root logger, then on the ``openhcs`` logger; ``None`` if neither
            has one or the lookup fails.
        """
        try:
            # Root logger first, then fall back to the openhcs logger
            for candidate in (logging.getLogger(), logging.getLogger("openhcs")):
                for handler in candidate.handlers:
                    if isinstance(handler, logging.FileHandler):
                        return handler.baseFilename
            return None
        except Exception as e:
            logger.debug("Could not determine log file path: %s", e)
            return None

    @work(thread=True, exclusive=True)
    def _log_monitor_worker(self) -> None:
        """Textual worker that monitors log file for changes.

        Polls ``log_file_path`` until the worker is cancelled. New lines past
        ``log_file_position`` are cleaned with ``extract_log_message`` and
        handed to the UI in one batch via ``app.call_from_thread``.
        """
        worker = get_current_worker()

        while not worker.is_cancelled:
            try:
                if not self.log_file_path or not Path(self.log_file_path).exists():
                    time.sleep(0.5)  # Check every 500ms (reduced frequency)
                    continue

                # Check for new content
                current_size = Path(self.log_file_path).stat().st_size
                if current_size < self.log_file_position:
                    # File shrank (truncated or replaced, e.g. by rotation):
                    # the stored offset is past the end, so restart from 0
                    self.log_file_position = 0

                if current_size > self.log_file_position:
                    # Read new content. errors='replace' keeps an undecodable
                    # byte from raising before the offset advances, which
                    # would make the loop retry the same bytes forever.
                    with open(self.log_file_path, 'r', errors='replace') as f:
                        f.seek(self.log_file_position)
                        new_lines = f.readlines()
                        self.log_file_position = f.tell()

                    # Process new log lines and batch UI updates
                    messages_to_add = []
                    for raw_line in new_lines:
                        line = raw_line.strip()
                        if line and self.is_subprocess_log(line):
                            # Extract the message part for display
                            message = self.extract_log_message(line)
                            if message:
                                messages_to_add.append(message)

                    # Batch update UI if we have messages and not cancelled
                    if messages_to_add and not worker.is_cancelled:
                        # Thread-safe UI update - batch all messages at once
                        self.app.call_from_thread(self._batch_add_log_messages, messages_to_add)

                # Sleep for 2 seconds before next check (much reduced frequency)
                time.sleep(2.0)

            except Exception as e:
                logger.debug("Error in log monitor worker: %s", e)
                time.sleep(1.0)  # Longer sleep on error

    def _batch_add_log_messages(self, messages: List[str]) -> None:
        """Add multiple log messages at once to reduce reactive watcher triggers.

        Args:
            messages: Messages to record, oldest first. The last one becomes
                the displayed message.
        """
        if not messages:
            return

        # Add all messages to history without triggering reactive updates
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_history.extend(f"[{timestamp}] {message}" for message in messages)

        # Only trigger ONE reactive update with the latest message
        self.current_log_message = messages[-1]

    def is_subprocess_log(self, line: str) -> bool:
        """Accept ANY log line - show all subprocess output."""
        return True  # Show everything

    def extract_log_message(self, line: str) -> Optional[str]:
        """Extract the message part from any log line, removing timestamp.

        Everything up to and including the first ``" - "`` is dropped, so
        ``"2025-06-18 01:00:50,281 - logger - level - message"`` becomes
        ``"logger - level - message"``. Lines without that separator are only
        stripped. Results longer than 200 characters are cut to 197 plus
        ``"..."``.

        Args:
            line: Raw log line.

        Returns:
            The cleaned text, or ``None`` when ``line`` is ``None``.
        """
        try:
            _, separator, remainder = line.partition(" - ")
            clean_line = remainder.strip() if separator else line.strip()

            # Allow more characters (200 instead of 100)
            if len(clean_line) > 200:
                return clean_line[:197] + "..."
            return clean_line
        except Exception:
            return line.strip() if line else None