Coverage for openhcs/textual_tui/widgets/status_bar.py: 0.0%

222 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2StatusBar Widget for OpenHCS Textual TUI 

3 

4Bottom status bar with real-time log streaming. 

5Shows live log messages from OpenHCS operations. 

6""" 

7 

8import logging 

9import time 

10from datetime import datetime 

11from collections import deque 

12from typing import Optional, List 

13from pathlib import Path 

14 

15from textual import work 

16from textual.app import ComposeResult 

17from textual.reactive import reactive 

18from textual.widgets import Static 

19from textual.widget import Widget 

20from textual.worker import get_current_worker 

21from textual.events import Click 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26class TUILogHandler(logging.Handler): 

27 """Custom logging handler that feeds log messages to the TUI status bar with batching.""" 

28 

29 def __init__(self, status_bar: 'StatusBar'): 

30 super().__init__() 

31 self.status_bar = status_bar 

32 self.setLevel(logging.INFO) # Only show INFO and above in status bar 

33 

34 # Format for status bar (compact) 

35 formatter = logging.Formatter('%(levelname)s: %(message)s') 

36 self.setFormatter(formatter) 

37 

38 # Batching to reduce call_from_thread frequency 

39 self.pending_messages = [] 

40 self.last_update_time = 0 

41 self.batch_interval = 0.5 # Batch messages for 500ms 

42 

43 def emit(self, record): 

44 """Emit a log record to the status bar with batching.""" 

45 try: 

46 msg = self.format(record) 

47 

48 # Add to pending messages 

49 self.pending_messages.append((msg, record.levelname)) 

50 

51 # Only update UI if enough time has passed (batching) 

52 current_time = time.time() 

53 if current_time - self.last_update_time >= self.batch_interval: 

54 self._flush_pending_messages() 

55 self.last_update_time = current_time 

56 

57 except (AttributeError, ValueError, TypeError) as e: 

58 # Don't let logging errors crash the app, but log the issue 

59 import logging 

60 logger = logging.getLogger(__name__) 

61 logger.debug(f"Status bar emit error (non-critical): {e}") 

62 

63 def _flush_pending_messages(self): 

64 """Flush all pending messages to the UI.""" 

65 if not self.pending_messages: 

66 return 

67 

68 # Take the most recent message as the current display 

69 latest_msg, latest_level = self.pending_messages[-1] 

70 

71 # Clear pending messages 

72 self.pending_messages.clear() 

73 

74 # Update UI with latest message only (avoid spam) 

75 try: 

76 if hasattr(self.status_bar.app, 'call_from_thread'): 

77 self.status_bar.app.call_from_thread(self.status_bar.add_log_message, latest_msg, latest_level) 

78 else: 

79 # Fallback for direct calls 

80 self.status_bar.add_log_message(latest_msg, latest_level) 

81 except Exception: 

82 pass 

83 

84 

85class StatusBar(Widget): 

86 """ 

87 Top status bar widget with real-time log streaming. 

88 

89 Layout: |————————————————————————— Live Log Messages | 

90 Shows live log messages from OpenHCS operations in real-time. 

91 """ 

92 

93 # Reactive state 

94 current_log_message = reactive("Ready") 

95 last_updated = reactive("") 

96 

97 def __init__(self, max_history: int = 100): 

98 """Initialize the status bar with log streaming.""" 

99 super().__init__() 

100 self.last_updated = datetime.now().strftime("%H:%M:%S") 

101 self.max_history = max_history 

102 self.log_history = deque(maxlen=max_history) # Keep recent log messages 

103 self.log_handler: Optional[TUILogHandler] = None 

104 

105 # Log file monitoring for subprocess logs - using Textual workers 

106 self.log_file_path: Optional[str] = None 

107 self.log_file_position: int = 0 

108 self.log_monitor_worker = None 

109 self.subprocess_base_log_path: Optional[str] = None # For ReactiveLogMonitor 

110 

111 logger.debug("StatusBar initialized with real-time logging") 

112 

113 def compose(self) -> ComposeResult: 

114 """Compose the status bar layout.""" 

115 # Make the status bar clickable to open Toolong 

116 yield Static( 

117 self.get_log_display() + " 📋", 

118 id="log_display", 

119 markup=False, 

120 classes="clickable-status" 

121 ) 

122 

123 def get_log_display(self) -> str: 

124 """Get the formatted log display string.""" 

125 timestamp = datetime.now().strftime("%H:%M:%S") 

126 return f"[{timestamp}] {self.current_log_message}" 

127 

128 async def on_click(self, event: Click) -> None: 

129 """Handle click on status bar to open Toolong log viewer.""" 

130 try: 

131 await self.open_toolong_viewer() 

132 except Exception as e: 

133 logger.error(f"Failed to open Toolong viewer: {e}") 

134 self.add_log_message(f"Error opening log viewer: {e}", "ERROR") 

135 

136 async def open_toolong_viewer(self) -> None: 

137 """Open Toolong log viewer in a window using ReactiveLogMonitor.""" 

138 from openhcs.textual_tui.windows.toolong_window import ToolongWindow 

139 from textual.css.query import NoMatches 

140 

141 # Determine base log path for subprocess logs (if any) 

142 base_log_path = "" 

143 if hasattr(self, 'subprocess_base_log_path') and self.subprocess_base_log_path: 

144 base_log_path = self.subprocess_base_log_path 

145 

146 # Try to find existing window - if it doesn't exist, create new one 

147 try: 

148 window = self.app.query_one(ToolongWindow) 

149 # Window exists, just open it 

150 window.open_state = True 

151 except NoMatches: 

152 # Expected case: window doesn't exist yet, create new one 

153 window = ToolongWindow(base_log_path=base_log_path) 

154 await self.app.mount(window) 

155 window.open_state = True 

156 

157 def watch_current_log_message(self, message: str) -> None: 

158 """Update the display when current_log_message changes.""" 

159 self.last_updated = datetime.now().strftime("%H:%M:%S") 

160 # Use call_later to defer DOM operations to next event loop cycle 

161 self.call_later(self._update_log_display) 

162 

163 def _update_log_display(self) -> None: 

164 """Update the log display - deferred to avoid blocking reactive watchers.""" 

165 try: 

166 log_display = self.query_one("#log_display") 

167 log_display.update(self.get_log_display()) 

168 except Exception: 

169 # Widget might not be mounted yet 

170 pass 

171 

172 def on_mount(self) -> None: 

173 """Set up log handler when widget is mounted.""" 

174 self.setup_log_handler() 

175 self.start_log_file_monitoring() 

176 

177 def on_unmount(self) -> None: 

178 """Clean up log handler and background thread when widget is unmounted.""" 

179 self.cleanup_log_handler() 

180 self.stop_log_file_monitoring() 

181 

182 def setup_log_handler(self) -> None: 

183 """Set up the custom log handler to capture OpenHCS logs.""" 

184 if self.log_handler is None: 

185 self.log_handler = TUILogHandler(self) 

186 

187 # Add handler to OpenHCS root logger to capture all OpenHCS logs 

188 openhcs_logger = logging.getLogger("openhcs") 

189 openhcs_logger.addHandler(self.log_handler) 

190 

191 logger.debug("Real-time log handler attached to OpenHCS logger") 

192 

193 def cleanup_log_handler(self) -> None: 

194 """Remove the log handler.""" 

195 if self.log_handler is not None: 

196 openhcs_logger = logging.getLogger("openhcs") 

197 openhcs_logger.removeHandler(self.log_handler) 

198 self.log_handler = None 

199 logger.debug("Real-time log handler removed") 

200 

201 def add_log_message(self, message: str, level: str) -> None: 

202 """ 

203 Add a new log message to the status bar. 

204 

205 Args: 

206 message: Log message to display 

207 level: Log level (INFO, WARNING, ERROR, etc.) 

208 """ 

209 # Store in history 

210 timestamp = datetime.now().strftime("%H:%M:%S") 

211 log_entry = f"[{timestamp}] {message}" 

212 self.log_history.append(log_entry) 

213 

214 # Update current display 

215 self.current_log_message = message 

216 

217 def get_recent_logs(self, count: int = 10) -> list: 

218 """Get the most recent log messages.""" 

219 return list(self.log_history)[-count:] 

220 

221 def collect_log_files(self) -> List[str]: 

222 """Collect all available OpenHCS log files for viewing.""" 

223 log_files = [] 

224 

225 # Add main TUI log file if available 

226 if self.log_file_path and Path(self.log_file_path).exists(): 

227 log_files.append(self.log_file_path) 

228 

229 # Look for subprocess and worker log files in the log directory 

230 if self.log_file_path: 

231 log_dir = Path(self.log_file_path).parent 

232 

233 # Find subprocess logs 

234 subprocess_logs = list(log_dir.glob("openhcs_subprocess_*.log")) 

235 log_files.extend(str(f) for f in subprocess_logs) 

236 

237 # Find worker logs 

238 worker_logs = list(log_dir.glob("openhcs_subprocess_*_worker_*.log")) 

239 log_files.extend(str(f) for f in worker_logs) 

240 

241 # Remove duplicates and sort 

242 log_files = sorted(list(set(log_files))) 

243 

244 logger.info(f"Collected {len(log_files)} log files for Toolong viewer") 

245 return log_files 

246 

247 def start_log_monitoring(self, base_log_path: str) -> None: 

248 """Start log monitoring for subprocess with base path tracking.""" 

249 self.subprocess_base_log_path = base_log_path 

250 logger.info(f"StatusBar: Started log monitoring for base path: {base_log_path}") 

251 

252 def stop_log_monitoring(self) -> None: 

253 """Stop log monitoring and clear subprocess base path.""" 

254 self.subprocess_base_log_path = None 

255 logger.info("StatusBar: Stopped log monitoring") 

256 

257 # Legacy methods for compatibility 

258 def set_status(self, message: str) -> None: 

259 """Legacy method - now adds as INFO log.""" 

260 self.add_log_message(f"Status: {message}", "INFO") 

261 

262 def set_error(self, error_message: str) -> None: 

263 """Legacy method - now adds as ERROR log.""" 

264 self.add_log_message(f"ERROR: {error_message}", "ERROR") 

265 

266 def set_info(self, info_message: str) -> None: 

267 """Legacy method - now adds as INFO log.""" 

268 self.add_log_message(f"INFO: {info_message}", "INFO") 

269 

270 def clear_status(self) -> None: 

271 """Legacy method - now shows ready message.""" 

272 self.current_log_message = "Ready" 

273 

274 def start_log_file_monitoring(self) -> None: 

275 """Start monitoring the log file for subprocess updates using Textual worker.""" 

276 # Get current log file path from the logging system 

277 self.log_file_path = self.get_current_log_file_path() 

278 if self.log_file_path and Path(self.log_file_path).exists(): 

279 # Start from end of file to only show new logs 

280 self.log_file_position = Path(self.log_file_path).stat().st_size 

281 

282 # Stop any existing monitoring 

283 self.stop_log_file_monitoring() 

284 

285 # Start Textual worker using @work decorated method 

286 self.log_monitor_worker = self._log_monitor_worker() 

287 logger.debug(f"Started worker log file monitoring: {self.log_file_path}") 

288 

289 def stop_log_file_monitoring(self) -> None: 

290 """Stop monitoring the log file and cleanup worker.""" 

291 if self.log_monitor_worker and not self.log_monitor_worker.is_finished: 

292 self.log_monitor_worker.cancel() 

293 self.log_monitor_worker = None 

294 logger.debug("Stopped log file monitoring") 

295 

296 def get_current_log_file_path(self) -> Optional[str]: 

297 """Get the current log file path from the logging system.""" 

298 try: 

299 # Get the root logger and find the FileHandler 

300 root_logger = logging.getLogger() 

301 for handler in root_logger.handlers: 

302 if isinstance(handler, logging.FileHandler): 

303 return handler.baseFilename 

304 

305 # Fallback: try to get from openhcs logger 

306 openhcs_logger = logging.getLogger("openhcs") 

307 for handler in openhcs_logger.handlers: 

308 if isinstance(handler, logging.FileHandler): 

309 return handler.baseFilename 

310 

311 return None 

312 except Exception as e: 

313 logger.debug(f"Could not determine log file path: {e}") 

314 return None 

315 

316 @work(thread=True, exclusive=True) 

317 def _log_monitor_worker(self) -> None: 

318 """Textual worker that monitors log file for changes.""" 

319 worker = get_current_worker() 

320 

321 while not worker.is_cancelled: 

322 try: 

323 if not self.log_file_path or not Path(self.log_file_path).exists(): 

324 time.sleep(0.5) # Check every 500ms (reduced frequency) 

325 continue 

326 

327 # Check for new content 

328 current_size = Path(self.log_file_path).stat().st_size 

329 if current_size > self.log_file_position: 

330 # Read new content 

331 with open(self.log_file_path, 'r') as f: 

332 f.seek(self.log_file_position) 

333 new_lines = f.readlines() 

334 self.log_file_position = f.tell() 

335 

336 # Process new log lines and batch UI updates 

337 messages_to_add = [] 

338 for line in new_lines: 

339 line = line.strip() 

340 if line and self.is_subprocess_log(line): 

341 # Extract the message part for display 

342 message = self.extract_log_message(line) 

343 if message: 

344 messages_to_add.append(message) 

345 

346 # Batch update UI if we have messages and not cancelled 

347 if messages_to_add and not worker.is_cancelled: 

348 # Thread-safe UI update - batch all messages at once 

349 self.app.call_from_thread(self._batch_add_log_messages, messages_to_add) 

350 

351 # Sleep for 2 seconds before next check (much reduced frequency) 

352 time.sleep(2.0) 

353 

354 except Exception as e: 

355 logger.debug(f"Error in log monitor worker: {e}") 

356 time.sleep(1.0) # Longer sleep on error 

357 

358 def _batch_add_log_messages(self, messages: List[str]) -> None: 

359 """Add multiple log messages at once to reduce reactive watcher triggers.""" 

360 if not messages: 

361 return 

362 

363 # Add all messages to history without triggering reactive updates 

364 timestamp = datetime.now().strftime("%H:%M:%S") 

365 for message in messages: 

366 log_entry = f"[{timestamp}] {message}" 

367 self.log_history.append(log_entry) 

368 

369 # Only trigger ONE reactive update with the latest message 

370 self.current_log_message = messages[-1] 

371 

372 def is_subprocess_log(self, line: str) -> bool: 

373 """Accept ANY log line - show all subprocess output.""" 

374 return True # Show everything 

375 

376 def extract_log_message(self, line: str) -> Optional[str]: 

377 """Extract the message part from any log line, removing timestamp.""" 

378 try: 

379 # Remove timestamp: "2025-06-18 01:00:50,281 - logger - level - message" 

380 # Find first " - " and take everything after it 

381 if " - " in line: 

382 parts = line.split(" - ", 1) 

383 if len(parts) > 1: 

384 clean_line = parts[1].strip() 

385 else: 

386 clean_line = line.strip() 

387 else: 

388 clean_line = line.strip() 

389 

390 # Allow more characters (200 instead of 100) 

391 if len(clean_line) > 200: 

392 return clean_line[:197] + "..." 

393 return clean_line 

394 except Exception: 

395 return line.strip() if line else None