Coverage for openhcs/textual_tui/widgets/status_bar.py: 0.0%
222 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2StatusBar Widget for OpenHCS Textual TUI
4Bottom status bar with real-time log streaming.
5Shows live log messages from OpenHCS operations.
6"""
8import logging
9import time
10from datetime import datetime
11from collections import deque
12from typing import Optional, List
13from pathlib import Path
15from textual import work
16from textual.app import ComposeResult
17from textual.reactive import reactive
18from textual.widgets import Static
19from textual.widget import Widget
20from textual.worker import get_current_worker
21from textual.events import Click
23logger = logging.getLogger(__name__)
class TUILogHandler(logging.Handler):
    """Custom logging handler that feeds log messages to the TUI status bar with batching.

    Each record is formatted as ``"LEVEL: message"`` and queued in
    ``pending_messages``.  When at least ``batch_interval`` seconds have passed
    since the previous flush, the newest queued message is handed to
    ``status_bar.add_log_message`` and the remaining queued messages are
    discarded (the status bar only has room for one line).

    NOTE: flushing is only triggered from ``emit``.  A message that arrives
    less than ``batch_interval`` after the previous flush stays queued until
    some later record is emitted.
    """

    def __init__(self, status_bar: 'StatusBar'):
        """Create a handler bound to ``status_bar``.

        Args:
            status_bar: Object exposing ``app`` and
                ``add_log_message(message, level)``.
        """
        super().__init__()
        self.status_bar = status_bar
        self.setLevel(logging.INFO)  # Only show INFO and above in status bar

        # Format for status bar (compact)
        self.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))

        # Batching to reduce call_from_thread frequency
        self.pending_messages = []
        self.last_update_time = 0
        self.batch_interval = 0.5  # Batch messages for 500ms

    def emit(self, record):
        """Queue a formatted record and flush if the batch interval has elapsed.

        Formatting problems are reported at DEBUG level instead of being
        raised, so a bad log call cannot crash the application.
        """
        try:
            msg = self.format(record)

            # Add to pending messages
            self.pending_messages.append((msg, record.levelname))

            # Only update UI if enough time has passed (batching)
            current_time = time.time()
            if current_time - self.last_update_time >= self.batch_interval:
                self._flush_pending_messages()
                self.last_update_time = current_time
        except (AttributeError, ValueError, TypeError) as e:
            # DEBUG is below this handler's INFO threshold, so this record is
            # never fed back into the status bar.
            logging.getLogger(__name__).debug(
                f"Status bar emit error (non-critical): {e}"
            )

    def _flush_pending_messages(self):
        """Push the newest pending message to the status bar and drop the rest."""
        if not self.pending_messages:
            return

        # Take the most recent message as the current display
        latest_msg, latest_level = self.pending_messages[-1]

        # Clear pending messages
        self.pending_messages.clear()

        # Update UI with latest message only (avoid spam)
        try:
            call_from_thread = getattr(self.status_bar.app, 'call_from_thread', None)
            if call_from_thread is None:
                # Fallback for direct calls
                self.status_bar.add_log_message(latest_msg, latest_level)
                return

            # NOTE(review): call_from_thread waits for the UI thread while the
            # logging machinery holds this handler's lock; confirm that the UI
            # thread cannot block on the same lock at that moment.
            try:
                call_from_thread(
                    self.status_bar.add_log_message, latest_msg, latest_level
                )
            except RuntimeError:
                # Textual raises RuntimeError when call_from_thread is invoked
                # from the app's own thread (or before the app is running).
                # Previously that dropped the message; deliver it directly.
                self.status_bar.add_log_message(latest_msg, latest_level)
        except Exception as e:
            # Status bar updates are best-effort; never let them break logging.
            logging.getLogger(__name__).debug(
                f"Status bar flush error (non-critical): {e}"
            )
class StatusBar(Widget):
    """
    Status bar widget with real-time log streaming.

    Layout: |————————————————————————— Live Log Messages |
    Shows live log messages from OpenHCS operations in real-time.

    Messages reach the bar from two sources:
    a ``TUILogHandler`` attached to the ``"openhcs"`` logger, and a thread
    worker that tails the log file of the first ``logging.FileHandler`` found.
    Clicking the bar opens the Toolong log viewer window.
    """

    # Suffix shown after the message as a hint that the bar is clickable
    _CLICK_HINT = " 📋"
    # Longest message (in characters) returned by extract_log_message
    _MAX_MESSAGE_LENGTH = 200

    # Reactive state
    current_log_message = reactive("Ready")
    last_updated = reactive("")

    def __init__(self, max_history: int = 100):
        """Initialize the status bar with log streaming.

        Args:
            max_history: Maximum number of log entries kept in ``log_history``.
        """
        super().__init__()
        self.last_updated = datetime.now().strftime("%H:%M:%S")
        self.max_history = max_history
        self.log_history = deque(maxlen=max_history)  # Keep recent log messages
        self.log_handler: Optional[TUILogHandler] = None

        # Log file monitoring for subprocess logs - using Textual workers
        self.log_file_path: Optional[str] = None
        self.log_file_position: int = 0
        self.log_monitor_worker = None
        self.subprocess_base_log_path: Optional[str] = None  # For ReactiveLogMonitor

        logger.debug("StatusBar initialized with real-time logging")

    def compose(self) -> ComposeResult:
        """Compose the status bar layout (a single clickable ``Static``)."""
        # Make the status bar clickable to open Toolong
        yield Static(
            self._get_clickable_display(),
            id="log_display",
            markup=False,
            classes="clickable-status",
        )

    def get_log_display(self) -> str:
        """Get the formatted log display string: ``"[HH:MM:SS] message"``."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        return f"[{timestamp}] {self.current_log_message}"

    def _get_clickable_display(self) -> str:
        """Return the display string followed by the click-hint suffix."""
        return self.get_log_display() + self._CLICK_HINT

    async def on_click(self, event: Click) -> None:
        """Handle click on status bar to open Toolong log viewer."""
        try:
            await self.open_toolong_viewer()
        except Exception as e:
            logger.error(f"Failed to open Toolong viewer: {e}")
            self.add_log_message(f"Error opening log viewer: {e}", "ERROR")

    async def open_toolong_viewer(self) -> None:
        """Open Toolong log viewer in a window using ReactiveLogMonitor.

        Re-opens the existing ``ToolongWindow`` if the app already has one,
        otherwise mounts a new one.
        """
        from openhcs.textual_tui.windows.toolong_window import ToolongWindow
        from textual.css.query import NoMatches

        # Determine base log path for subprocess logs (if any)
        base_log_path = self.subprocess_base_log_path or ""

        # Try to find existing window - if it doesn't exist, create new one
        try:
            window = self.app.query_one(ToolongWindow)
        except NoMatches:
            # Expected case: window doesn't exist yet, create new one
            window = ToolongWindow(base_log_path=base_log_path)
            await self.app.mount(window)
        window.open_state = True

    def watch_current_log_message(self, message: str) -> None:
        """Update the display when current_log_message changes."""
        self.last_updated = datetime.now().strftime("%H:%M:%S")
        # Use call_later to defer DOM operations to next event loop cycle
        self.call_later(self._update_log_display)

    def _update_log_display(self) -> None:
        """Update the log display - deferred to avoid blocking reactive watchers."""
        try:
            log_display = self.query_one("#log_display")
            # Re-render with the same click hint that compose() shows, so the
            # hint does not disappear after the first message.
            log_display.update(self._get_clickable_display())
        except Exception:
            # Widget might not be mounted yet
            pass

    def on_mount(self) -> None:
        """Set up log handler and log file monitoring when widget is mounted."""
        self.setup_log_handler()
        self.start_log_file_monitoring()

    def on_unmount(self) -> None:
        """Clean up log handler and monitoring worker when widget is unmounted."""
        self.cleanup_log_handler()
        self.stop_log_file_monitoring()

    def setup_log_handler(self) -> None:
        """Set up the custom log handler to capture OpenHCS logs (idempotent)."""
        if self.log_handler is None:
            self.log_handler = TUILogHandler(self)

            # Add handler to OpenHCS root logger to capture all OpenHCS logs
            openhcs_logger = logging.getLogger("openhcs")
            openhcs_logger.addHandler(self.log_handler)

            logger.debug("Real-time log handler attached to OpenHCS logger")

    def cleanup_log_handler(self) -> None:
        """Remove the log handler from the OpenHCS logger, if attached."""
        if self.log_handler is not None:
            openhcs_logger = logging.getLogger("openhcs")
            openhcs_logger.removeHandler(self.log_handler)
            self.log_handler = None
            logger.debug("Real-time log handler removed")

    def add_log_message(self, message: str, level: str) -> None:
        """
        Add a new log message to the status bar.

        Args:
            message: Log message to display
            level: Log level (INFO, WARNING, ERROR, etc.); currently not stored
        """
        # Store in history
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.log_history.append(log_entry)

        # Update current display
        self.current_log_message = message

    def get_recent_logs(self, count: int = 10) -> list:
        """Get the most recent log messages.

        Args:
            count: Maximum number of entries to return.

        Returns:
            Up to ``count`` newest history entries, oldest first; an empty
            list when ``count`` is zero or negative.
        """
        if count <= 0:
            # list[-0:] would be the whole history, not "no entries"
            return []
        return list(self.log_history)[-count:]

    def collect_log_files(self) -> List[str]:
        """Collect all available OpenHCS log files for viewing.

        Returns:
            Sorted, de-duplicated paths: the main log file (if it exists) plus
            every ``openhcs_subprocess_*.log`` file in the same directory.
        """
        found = set()

        if self.log_file_path:
            main_log = Path(self.log_file_path)

            # Add main TUI log file if available
            if main_log.exists():
                found.add(self.log_file_path)

            # Subprocess logs; this pattern also matches the worker logs
            # (openhcs_subprocess_*_worker_*.log), so one scan covers both.
            found.update(str(f) for f in main_log.parent.glob("openhcs_subprocess_*.log"))

        log_files = sorted(found)

        logger.info(f"Collected {len(log_files)} log files for Toolong viewer")
        return log_files

    def start_log_monitoring(self, base_log_path: str) -> None:
        """Start log monitoring for subprocess with base path tracking."""
        self.subprocess_base_log_path = base_log_path
        logger.info(f"StatusBar: Started log monitoring for base path: {base_log_path}")

    def stop_log_monitoring(self) -> None:
        """Stop log monitoring and clear subprocess base path."""
        self.subprocess_base_log_path = None
        logger.info("StatusBar: Stopped log monitoring")

    # Legacy methods for compatibility
    def set_status(self, message: str) -> None:
        """Legacy method - now adds as INFO log."""
        self.add_log_message(f"Status: {message}", "INFO")

    def set_error(self, error_message: str) -> None:
        """Legacy method - now adds as ERROR log."""
        self.add_log_message(f"ERROR: {error_message}", "ERROR")

    def set_info(self, info_message: str) -> None:
        """Legacy method - now adds as INFO log."""
        self.add_log_message(f"INFO: {info_message}", "INFO")

    def clear_status(self) -> None:
        """Legacy method - now shows ready message."""
        self.current_log_message = "Ready"

    def start_log_file_monitoring(self) -> None:
        """Start monitoring the log file for subprocess updates using Textual worker."""
        # Get current log file path from the logging system
        self.log_file_path = self.get_current_log_file_path()
        if self.log_file_path and Path(self.log_file_path).exists():
            # Start from end of file to only show new logs
            self.log_file_position = Path(self.log_file_path).stat().st_size

        # Stop any existing monitoring
        self.stop_log_file_monitoring()

        # Start Textual worker using @work decorated method
        self.log_monitor_worker = self._log_monitor_worker()
        logger.debug(f"Started worker log file monitoring: {self.log_file_path}")

    def stop_log_file_monitoring(self) -> None:
        """Stop monitoring the log file and cleanup worker."""
        if self.log_monitor_worker and not self.log_monitor_worker.is_finished:
            self.log_monitor_worker.cancel()
        self.log_monitor_worker = None
        logger.debug("Stopped log file monitoring")

    def get_current_log_file_path(self) -> Optional[str]:
        """Get the current log file path from the logging system.

        Returns:
            ``baseFilename`` of the first ``logging.FileHandler`` found on the
            root logger, then on the ``"openhcs"`` logger; ``None`` if neither
            has one.
        """
        try:
            # Root logger first, then fall back to the openhcs logger
            for candidate in (logging.getLogger(), logging.getLogger("openhcs")):
                for handler in candidate.handlers:
                    if isinstance(handler, logging.FileHandler):
                        return handler.baseFilename

            return None
        except Exception as e:
            logger.debug(f"Could not determine log file path: {e}")
            return None

    @work(thread=True, exclusive=True)
    def _log_monitor_worker(self) -> None:
        """Textual worker that monitors log file for changes.

        Polls until cancelled: every 0.5s while no log file exists, every 2s
        otherwise, and backs off for 1s after an error.
        """
        worker = get_current_worker()

        while not worker.is_cancelled:
            try:
                if not self.log_file_path or not Path(self.log_file_path).exists():
                    time.sleep(0.5)  # Check every 500ms (reduced frequency)
                    continue

                messages_to_add = self._read_new_log_messages()

                # Batch update UI if we have messages and not cancelled
                if messages_to_add and not worker.is_cancelled:
                    # Thread-safe UI update - batch all messages at once
                    self.app.call_from_thread(self._batch_add_log_messages, messages_to_add)

                # Sleep for 2 seconds before next check (much reduced frequency)
                time.sleep(2.0)

            except Exception as e:
                logger.debug(f"Error in log monitor worker: {e}")
                time.sleep(1.0)  # Longer sleep on error

    def _read_new_log_messages(self) -> List[str]:
        """Read log lines appended since the last poll and return display messages.

        Advances ``log_file_position`` past whatever was read.
        """
        current_size = Path(self.log_file_path).stat().st_size

        if current_size < self.log_file_position:
            # File shrank (truncated or rotated): the stored offset is stale,
            # so restart from the beginning instead of waiting for the file to
            # grow past the old offset.
            self.log_file_position = 0

        if current_size <= self.log_file_position:
            return []

        # errors='replace' keeps a stray undecodable byte (e.g. a partially
        # written character) from aborting the read.
        with open(self.log_file_path, 'r', errors='replace') as f:
            f.seek(self.log_file_position)
            new_lines = f.readlines()
            self.log_file_position = f.tell()

        messages = []
        for line in new_lines:
            line = line.strip()
            if line and self.is_subprocess_log(line):
                # Extract the message part for display
                message = self.extract_log_message(line)
                if message:
                    messages.append(message)
        return messages

    def _batch_add_log_messages(self, messages: List[str]) -> None:
        """Add multiple log messages at once to reduce reactive watcher triggers."""
        if not messages:
            return

        # Add all messages to history without triggering reactive updates
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.log_history.extend(f"[{timestamp}] {message}" for message in messages)

        # Only trigger ONE reactive update with the latest message
        self.current_log_message = messages[-1]

    def is_subprocess_log(self, line: str) -> bool:
        """Accept ANY log line - show all subprocess output."""
        return True  # Show everything

    def extract_log_message(self, line: str) -> Optional[str]:
        """Extract the message part from any log line, removing timestamp.

        Everything up to and including the first ``" - "`` is dropped, e.g.
        ``"2025-06-18 01:00:50,281 - logger - level - message"`` becomes
        ``"logger - level - message"``.  Results longer than 200 characters
        are cut to 197 characters plus ``"..."``.

        Returns:
            The cleaned line, or ``None`` when ``line`` is ``None``.
        """
        if line is None:
            return None

        _, separator, remainder = line.partition(" - ")
        clean_line = (remainder if separator else line).strip()

        limit = self._MAX_MESSAGE_LENGTH
        if len(clean_line) > limit:
            return clean_line[:limit - 3] + "..."
        return clean_line