Coverage for openhcs/textual_tui/widgets/status_bar.py: 0.0%

223 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2StatusBar Widget for OpenHCS Textual TUI 

3 

4Bottom status bar with real-time log streaming. 

5Shows live log messages from OpenHCS operations. 

6""" 

7 

8import logging 

9import os 

10import time 

11from datetime import datetime 

12from collections import deque 

13from typing import Optional, List 

14from pathlib import Path 

15 

16from textual import work 

17from textual.app import ComposeResult 

18from textual.reactive import reactive 

19from textual.widgets import Static 

20from textual.widget import Widget 

21from textual.worker import get_current_worker 

22from textual.events import Click 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27class TUILogHandler(logging.Handler): 

28 """Custom logging handler that feeds log messages to the TUI status bar with batching.""" 

29 

30 def __init__(self, status_bar: 'StatusBar'): 

31 super().__init__() 

32 self.status_bar = status_bar 

33 self.setLevel(logging.INFO) # Only show INFO and above in status bar 

34 

35 # Format for status bar (compact) 

36 formatter = logging.Formatter('%(levelname)s: %(message)s') 

37 self.setFormatter(formatter) 

38 

39 # Batching to reduce call_from_thread frequency 

40 self.pending_messages = [] 

41 self.last_update_time = 0 

42 self.batch_interval = 0.5 # Batch messages for 500ms 

43 

44 def emit(self, record): 

45 """Emit a log record to the status bar with batching.""" 

46 try: 

47 msg = self.format(record) 

48 

49 # Add to pending messages 

50 self.pending_messages.append((msg, record.levelname)) 

51 

52 # Only update UI if enough time has passed (batching) 

53 current_time = time.time() 

54 if current_time - self.last_update_time >= self.batch_interval: 

55 self._flush_pending_messages() 

56 self.last_update_time = current_time 

57 

58 except (AttributeError, ValueError, TypeError) as e: 

59 # Don't let logging errors crash the app, but log the issue 

60 import logging 

61 logger = logging.getLogger(__name__) 

62 logger.debug(f"Status bar emit error (non-critical): {e}") 

63 

64 def _flush_pending_messages(self): 

65 """Flush all pending messages to the UI.""" 

66 if not self.pending_messages: 

67 return 

68 

69 # Take the most recent message as the current display 

70 latest_msg, latest_level = self.pending_messages[-1] 

71 

72 # Clear pending messages 

73 self.pending_messages.clear() 

74 

75 # Update UI with latest message only (avoid spam) 

76 try: 

77 if hasattr(self.status_bar.app, 'call_from_thread'): 

78 self.status_bar.app.call_from_thread(self.status_bar.add_log_message, latest_msg, latest_level) 

79 else: 

80 # Fallback for direct calls 

81 self.status_bar.add_log_message(latest_msg, latest_level) 

82 except Exception: 

83 pass 

84 

85 

86class StatusBar(Widget): 

87 """ 

88 Top status bar widget with real-time log streaming. 

89 

90 Layout: |————————————————————————— Live Log Messages | 

91 Shows live log messages from OpenHCS operations in real-time. 

92 """ 

93 

94 # Reactive state 

95 current_log_message = reactive("Ready") 

96 last_updated = reactive("") 

97 

98 def __init__(self, max_history: int = 100): 

99 """Initialize the status bar with log streaming.""" 

100 super().__init__() 

101 self.last_updated = datetime.now().strftime("%H:%M:%S") 

102 self.max_history = max_history 

103 self.log_history = deque(maxlen=max_history) # Keep recent log messages 

104 self.log_handler: Optional[TUILogHandler] = None 

105 

106 # Log file monitoring for subprocess logs - using Textual workers 

107 self.log_file_path: Optional[str] = None 

108 self.log_file_position: int = 0 

109 self.log_monitor_worker = None 

110 self.subprocess_base_log_path: Optional[str] = None # For ReactiveLogMonitor 

111 

112 logger.debug("StatusBar initialized with real-time logging") 

113 

114 def compose(self) -> ComposeResult: 

115 """Compose the status bar layout.""" 

116 # Make the status bar clickable to open Toolong 

117 yield Static( 

118 self.get_log_display() + " 📋", 

119 id="log_display", 

120 markup=False, 

121 classes="clickable-status" 

122 ) 

123 

124 def get_log_display(self) -> str: 

125 """Get the formatted log display string.""" 

126 timestamp = datetime.now().strftime("%H:%M:%S") 

127 return f"[{timestamp}] {self.current_log_message}" 

128 

129 async def on_click(self, event: Click) -> None: 

130 """Handle click on status bar to open Toolong log viewer.""" 

131 try: 

132 await self.open_toolong_viewer() 

133 except Exception as e: 

134 logger.error(f"Failed to open Toolong viewer: {e}") 

135 self.add_log_message(f"Error opening log viewer: {e}", "ERROR") 

136 

137 async def open_toolong_viewer(self) -> None: 

138 """Open Toolong log viewer in a window using ReactiveLogMonitor.""" 

139 from openhcs.textual_tui.windows.toolong_window import ToolongWindow 

140 from textual.css.query import NoMatches 

141 

142 # Determine base log path for subprocess logs (if any) 

143 base_log_path = "" 

144 if hasattr(self, 'subprocess_base_log_path') and self.subprocess_base_log_path: 

145 base_log_path = self.subprocess_base_log_path 

146 

147 # Try to find existing window - if it doesn't exist, create new one 

148 try: 

149 window = self.app.query_one(ToolongWindow) 

150 # Window exists, just open it 

151 window.open_state = True 

152 except NoMatches: 

153 # Expected case: window doesn't exist yet, create new one 

154 window = ToolongWindow(base_log_path=base_log_path) 

155 await self.app.mount(window) 

156 window.open_state = True 

157 

158 def watch_current_log_message(self, message: str) -> None: 

159 """Update the display when current_log_message changes.""" 

160 self.last_updated = datetime.now().strftime("%H:%M:%S") 

161 # Use call_later to defer DOM operations to next event loop cycle 

162 self.call_later(self._update_log_display) 

163 

164 def _update_log_display(self) -> None: 

165 """Update the log display - deferred to avoid blocking reactive watchers.""" 

166 try: 

167 log_display = self.query_one("#log_display") 

168 log_display.update(self.get_log_display()) 

169 except Exception: 

170 # Widget might not be mounted yet 

171 pass 

172 

173 def on_mount(self) -> None: 

174 """Set up log handler when widget is mounted.""" 

175 self.setup_log_handler() 

176 self.start_log_file_monitoring() 

177 

178 def on_unmount(self) -> None: 

179 """Clean up log handler and background thread when widget is unmounted.""" 

180 self.cleanup_log_handler() 

181 self.stop_log_file_monitoring() 

182 

183 def setup_log_handler(self) -> None: 

184 """Set up the custom log handler to capture OpenHCS logs.""" 

185 if self.log_handler is None: 

186 self.log_handler = TUILogHandler(self) 

187 

188 # Add handler to OpenHCS root logger to capture all OpenHCS logs 

189 openhcs_logger = logging.getLogger("openhcs") 

190 openhcs_logger.addHandler(self.log_handler) 

191 

192 logger.debug("Real-time log handler attached to OpenHCS logger") 

193 

194 def cleanup_log_handler(self) -> None: 

195 """Remove the log handler.""" 

196 if self.log_handler is not None: 

197 openhcs_logger = logging.getLogger("openhcs") 

198 openhcs_logger.removeHandler(self.log_handler) 

199 self.log_handler = None 

200 logger.debug("Real-time log handler removed") 

201 

202 def add_log_message(self, message: str, level: str) -> None: 

203 """ 

204 Add a new log message to the status bar. 

205 

206 Args: 

207 message: Log message to display 

208 level: Log level (INFO, WARNING, ERROR, etc.) 

209 """ 

210 # Store in history 

211 timestamp = datetime.now().strftime("%H:%M:%S") 

212 log_entry = f"[{timestamp}] {message}" 

213 self.log_history.append(log_entry) 

214 

215 # Update current display 

216 self.current_log_message = message 

217 

218 def get_recent_logs(self, count: int = 10) -> list: 

219 """Get the most recent log messages.""" 

220 return list(self.log_history)[-count:] 

221 

222 def collect_log_files(self) -> List[str]: 

223 """Collect all available OpenHCS log files for viewing.""" 

224 log_files = [] 

225 

226 # Add main TUI log file if available 

227 if self.log_file_path and Path(self.log_file_path).exists(): 

228 log_files.append(self.log_file_path) 

229 

230 # Look for subprocess and worker log files in the log directory 

231 if self.log_file_path: 

232 log_dir = Path(self.log_file_path).parent 

233 

234 # Find subprocess logs 

235 subprocess_logs = list(log_dir.glob("openhcs_subprocess_*.log")) 

236 log_files.extend(str(f) for f in subprocess_logs) 

237 

238 # Find worker logs 

239 worker_logs = list(log_dir.glob("openhcs_subprocess_*_worker_*.log")) 

240 log_files.extend(str(f) for f in worker_logs) 

241 

242 # Remove duplicates and sort 

243 log_files = sorted(list(set(log_files))) 

244 

245 logger.info(f"Collected {len(log_files)} log files for Toolong viewer") 

246 return log_files 

247 

248 def start_log_monitoring(self, base_log_path: str) -> None: 

249 """Start log monitoring for subprocess with base path tracking.""" 

250 self.subprocess_base_log_path = base_log_path 

251 logger.info(f"StatusBar: Started log monitoring for base path: {base_log_path}") 

252 

253 def stop_log_monitoring(self) -> None: 

254 """Stop log monitoring and clear subprocess base path.""" 

255 self.subprocess_base_log_path = None 

256 logger.info("StatusBar: Stopped log monitoring") 

257 

258 # Legacy methods for compatibility 

259 def set_status(self, message: str) -> None: 

260 """Legacy method - now adds as INFO log.""" 

261 self.add_log_message(f"Status: {message}", "INFO") 

262 

263 def set_error(self, error_message: str) -> None: 

264 """Legacy method - now adds as ERROR log.""" 

265 self.add_log_message(f"ERROR: {error_message}", "ERROR") 

266 

267 def set_info(self, info_message: str) -> None: 

268 """Legacy method - now adds as INFO log.""" 

269 self.add_log_message(f"INFO: {info_message}", "INFO") 

270 

271 def clear_status(self) -> None: 

272 """Legacy method - now shows ready message.""" 

273 self.current_log_message = "Ready" 

274 

275 def start_log_file_monitoring(self) -> None: 

276 """Start monitoring the log file for subprocess updates using Textual worker.""" 

277 # Get current log file path from the logging system 

278 self.log_file_path = self.get_current_log_file_path() 

279 if self.log_file_path and Path(self.log_file_path).exists(): 

280 # Start from end of file to only show new logs 

281 self.log_file_position = Path(self.log_file_path).stat().st_size 

282 

283 # Stop any existing monitoring 

284 self.stop_log_file_monitoring() 

285 

286 # Start Textual worker using @work decorated method 

287 self.log_monitor_worker = self._log_monitor_worker() 

288 logger.debug(f"Started worker log file monitoring: {self.log_file_path}") 

289 

290 def stop_log_file_monitoring(self) -> None: 

291 """Stop monitoring the log file and cleanup worker.""" 

292 if self.log_monitor_worker and not self.log_monitor_worker.is_finished: 

293 self.log_monitor_worker.cancel() 

294 self.log_monitor_worker = None 

295 logger.debug("Stopped log file monitoring") 

296 

297 def get_current_log_file_path(self) -> Optional[str]: 

298 """Get the current log file path from the logging system.""" 

299 try: 

300 # Get the root logger and find the FileHandler 

301 root_logger = logging.getLogger() 

302 for handler in root_logger.handlers: 

303 if isinstance(handler, logging.FileHandler): 

304 return handler.baseFilename 

305 

306 # Fallback: try to get from openhcs logger 

307 openhcs_logger = logging.getLogger("openhcs") 

308 for handler in openhcs_logger.handlers: 

309 if isinstance(handler, logging.FileHandler): 

310 return handler.baseFilename 

311 

312 return None 

313 except Exception as e: 

314 logger.debug(f"Could not determine log file path: {e}") 

315 return None 

316 

317 @work(thread=True, exclusive=True) 

318 def _log_monitor_worker(self) -> None: 

319 """Textual worker that monitors log file for changes.""" 

320 worker = get_current_worker() 

321 

322 while not worker.is_cancelled: 

323 try: 

324 if not self.log_file_path or not Path(self.log_file_path).exists(): 

325 time.sleep(0.5) # Check every 500ms (reduced frequency) 

326 continue 

327 

328 # Check for new content 

329 current_size = Path(self.log_file_path).stat().st_size 

330 if current_size > self.log_file_position: 

331 # Read new content 

332 with open(self.log_file_path, 'r') as f: 

333 f.seek(self.log_file_position) 

334 new_lines = f.readlines() 

335 self.log_file_position = f.tell() 

336 

337 # Process new log lines and batch UI updates 

338 messages_to_add = [] 

339 for line in new_lines: 

340 line = line.strip() 

341 if line and self.is_subprocess_log(line): 

342 # Extract the message part for display 

343 message = self.extract_log_message(line) 

344 if message: 

345 messages_to_add.append(message) 

346 

347 # Batch update UI if we have messages and not cancelled 

348 if messages_to_add and not worker.is_cancelled: 

349 # Thread-safe UI update - batch all messages at once 

350 self.app.call_from_thread(self._batch_add_log_messages, messages_to_add) 

351 

352 # Sleep for 2 seconds before next check (much reduced frequency) 

353 time.sleep(2.0) 

354 

355 except Exception as e: 

356 logger.debug(f"Error in log monitor worker: {e}") 

357 time.sleep(1.0) # Longer sleep on error 

358 

359 def _batch_add_log_messages(self, messages: List[str]) -> None: 

360 """Add multiple log messages at once to reduce reactive watcher triggers.""" 

361 if not messages: 

362 return 

363 

364 # Add all messages to history without triggering reactive updates 

365 timestamp = datetime.now().strftime("%H:%M:%S") 

366 for message in messages: 

367 log_entry = f"[{timestamp}] {message}" 

368 self.log_history.append(log_entry) 

369 

370 # Only trigger ONE reactive update with the latest message 

371 self.current_log_message = messages[-1] 

372 

373 def is_subprocess_log(self, line: str) -> bool: 

374 """Accept ANY log line - show all subprocess output.""" 

375 return True # Show everything 

376 

377 def extract_log_message(self, line: str) -> Optional[str]: 

378 """Extract the message part from any log line, removing timestamp.""" 

379 try: 

380 # Remove timestamp: "2025-06-18 01:00:50,281 - logger - level - message" 

381 # Find first " - " and take everything after it 

382 if " - " in line: 

383 parts = line.split(" - ", 1) 

384 if len(parts) > 1: 

385 clean_line = parts[1].strip() 

386 else: 

387 clean_line = line.strip() 

388 else: 

389 clean_line = line.strip() 

390 

391 # Allow more characters (200 instead of 100) 

392 if len(clean_line) > 200: 

393 return clean_line[:197] + "..." 

394 return clean_line 

395 except Exception: 

396 return line.strip() if line else None