Coverage for openhcs/textual_tui/windows/terminal_window.py: 0.0%

270 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1"""Terminal window for OpenHCS Textual TUI.""" 

2 

3from textual.app import ComposeResult 

4from textual.widgets import Static 

5from textual.containers import Vertical 

6 

7from openhcs.textual_tui.windows.base_window import BaseOpenHCSWindow 

8 

9# Fix textual-terminal compatibility with Textual 3.5.0+ 

10import textual.app 

11from textual.color import ANSI_COLORS 

# Compatibility shim (per the file's own note: textual-terminal vs. Textual
# 3.5.0+). Probe for the attribute and only install ANSI_COLORS under the
# DEFAULT_COLORS name when the probe fails, so an existing value is never
# overwritten.
try:
    textual.app.DEFAULT_COLORS
except AttributeError:
    textual.app.DEFAULT_COLORS = ANSI_COLORS

14 

15# Import textual-terminal with compatibility fix 

16try: 

17 from textual_terminal import Terminal 

18 from textual_terminal._terminal import TerminalEmulator 

19 TERMINAL_AVAILABLE = True 

20 

21 # Import our extracted terminal enhancements 

22 from openhcs.textual_tui.services.terminal_enhancements import terminal_enhancements 

23 

24 # Import Gate One terminal for enhanced features 

25 from openhcs.textual_tui.services.terminal import Terminal as GateOneTerminal 

26 

27 # Monkey-patch Terminal to track cursor styles 

28 _original_terminal_recv = Terminal.recv 

29 

import asyncio
import logging
import re

_recv_logger = logging.getLogger(__name__)

# Internal cursor-style codes stored on ``Terminal.cursor_style``.
_CURSOR_INVISIBLE = 0
_CURSOR_BLOCK = 1
_CURSOR_UNDERLINE = 3
_CURSOR_BAR = 5

# Escape-sequence patterns, compiled once instead of on every "stdout" message.
# DECSCUSR: ESC [ Ps SP q
_DECSCUSR_RE = re.compile(r'\x1b\[([0-6]) q')
# Linux console cursor sequences: ESC [ ? Ps c
_LINUX_CURSOR_RE = re.compile(r'\x1b\[\?([0-9]+)c')
# ESC ] 50 ; CursorShape=N BEL
_CURSOR_SHAPE_RE = re.compile(r'\x1b\]50;CursorShape=([0-2])\x07')
# ESC ] 12 ; color BEL.  The class must exclude the BEL terminator itself; the
# previous spelling ``[^\\x07]`` excluded the characters "\", "x", "0" and "7",
# so colours such as "#ff0000" never matched.
_CURSOR_COLOR_RE = re.compile(r'\x1b\]12;[^\x07]*\x07')
# Cursor show/hide: ESC [ ? 25 h / ESC [ ? 25 l
_CURSOR_VISIBILITY_RE = re.compile(r'\x1b\[\?25([hl])')
# Generic CSI sequence, used for mouse-tracking detection.
_ANSI_SEQUENCE_RE = re.compile(r"(\x1b\[\??[\d;]*[a-zA-Z])")
_DECSET_PREFIX = "\x1b[?"

# DECSCUSR parameter -> internal style
# (0=default, 1=blinking block, 2=steady block, 3/4=underline, 5/6=bar).
_DECSCUSR_STYLES = {
    0: _CURSOR_BLOCK,
    1: _CURSOR_BLOCK,
    2: _CURSOR_BLOCK,
    3: _CURSOR_UNDERLINE,
    4: _CURSOR_UNDERLINE,
    5: _CURSOR_BAR,
    6: _CURSOR_BAR,
}
# Linux console parameter -> internal style
# (0=normal block, 1=invisible, 8=very visible, shown as underline).
_LINUX_CURSOR_STYLES = {
    0: _CURSOR_BLOCK,
    1: _CURSOR_INVISIBLE,
    8: _CURSOR_UNDERLINE,
}
# CursorShape parameter -> internal style (0=block, 1=bar, 2=underline).
_CURSOR_SHAPE_STYLES = {
    0: _CURSOR_BLOCK,
    1: _CURSOR_BAR,
    2: _CURSOR_UNDERLINE,
}
# Rich style applied on top of the cursor cell for each visible cursor style.
_CURSOR_RICH_STYLES = {
    _CURSOR_BLOCK: "reverse blink",
    _CURSOR_UNDERLINE: "underline bold blink",
    _CURSOR_BAR: "bold blink",
}


def _track_cursor_style(self, chars):
    """Update ``self.cursor_style`` from cursor-shape sequences found in *chars*.

    The three sequence families are scanned in a fixed order (DECSCUSR, Linux
    console, CursorShape); within and across families the last match wins.
    Parameters that have no mapping leave the current style untouched.
    Diagnostics go to the module logger at DEBUG level rather than stdout.
    """
    families = (
        ("DECSCUSR", _DECSCUSR_RE, _DECSCUSR_STYLES),
        ("Linux", _LINUX_CURSOR_RE, _LINUX_CURSOR_STYLES),
        ("CursorShape", _CURSOR_SHAPE_RE, _CURSOR_SHAPE_STYLES),
    )
    for label, pattern, style_map in families:
        for match in pattern.finditer(chars):
            old_style = self.cursor_style
            self.cursor_style = style_map.get(int(match.group(1)), old_style)
            _recv_logger.debug(
                "%s cursor style changed from %s to %s (sequence: %r)",
                label, old_style, self.cursor_style, match.group(0),
            )

    # Cursor colour and visibility do not affect the tracked style; they are
    # only logged.
    for match in _CURSOR_COLOR_RE.finditer(chars):
        _recv_logger.debug("Cursor color sequence detected: %r", match.group(0))
    for match in _CURSOR_VISIBILITY_RE.finditer(chars):
        if match.group(1) == 'l':
            _recv_logger.debug("Cursor hidden")
        else:
            _recv_logger.debug("Cursor shown")


def _track_mouse_reporting(self, chars):
    """Set ``self.mouse_tracking`` when *chars* toggles DEC private mode 1000."""
    for sequence_match in _ANSI_SEQUENCE_RE.finditer(chars):
        sequence = sequence_match.group(0)
        if not sequence.startswith(_DECSET_PREFIX):
            continue
        parameters = sequence.removeprefix(_DECSET_PREFIX).split(";")
        if "1000h" in parameters:
            self.mouse_tracking = True
        if "1000l" in parameters:
            self.mouse_tracking = False


def _render_screen_lines(self):
    """Build one ``rich.text.Text`` per screen row, drawing the cursor by style.

    Returns:
        A list of ``Text`` objects, one per row of ``self._screen``.
    """
    from rich.text import Text

    screen = self._screen
    cursor = screen.cursor
    cursor_style = getattr(self, 'cursor_style', _CURSOR_BLOCK)
    last_column = screen.columns - 1

    lines = []
    for y in range(screen.lines):
        line_text = Text()
        line = screen.buffer[y]
        style_change_pos = 0
        for x in range(screen.columns):
            char = line[x]
            is_cursor = cursor.x == x and cursor.y == y and not cursor.hidden

            # A bar cursor replaces the glyph with a thin left-aligned block;
            # every other style keeps the cell's own character.
            if is_cursor and cursor_style == _CURSOR_BAR:
                line_text.append("▎")
            else:
                line_text.append(char.data)

            # Run-based styling, kept exactly as in the code this patch
            # replaces: flush the previous cell's style whenever the style
            # changes or the last column is reached.
            if x > 0:
                last_char = line[x - 1]
                if not self.char_style_cmp(char, last_char) or x == last_column:
                    last_style = self.char_rich_style(last_char)
                    line_text.stylize(last_style, style_change_pos, x + 1)
                    style_change_pos = x

            if is_cursor:
                cursor_rich_style = _CURSOR_RICH_STYLES.get(cursor_style)
                # No entry (e.g. the invisible style) means no cursor overlay.
                if cursor_rich_style is not None:
                    line_text.stylize(cursor_rich_style, x, x + 1)

        lines.append(line_text)
    return lines


async def _patched_recv(self):
    """Replacement for ``Terminal.recv`` that tracks and renders cursor styles.

    Consumes messages from ``self.recv_queue`` until the task is cancelled:

    * ``["setup"]``: replies on ``self.send_queue`` with the current size.
    * ``["stdout", chars]``: records cursor-style and mouse-tracking changes,
      feeds *chars* to ``self.stream``, rebuilds ``self._display`` and refreshes.
    * ``["disconnect"]``: calls ``self.stop()``.

    Cancellation is swallowed so the coroutine ends quietly.
    """
    # Instances created before the patch was applied may lack the attribute.
    if not hasattr(self, 'cursor_style'):
        self.cursor_style = _CURSOR_BLOCK

    try:
        while True:
            message = await self.recv_queue.get()
            cmd = message[0]
            if cmd == "setup":
                await self.send_queue.put(["set_size", self.nrow, self.ncol])
            elif cmd == "stdout":
                chars = message[1]

                _track_cursor_style(self, chars)
                _track_mouse_reporting(self, chars)

                try:
                    self.stream.feed(chars)
                except TypeError as error:
                    from textual import log
                    log.warning("could not feed:", error)

                from textual_terminal._terminal import TerminalDisplay
                self._display = TerminalDisplay(_render_screen_lines(self))
                self.refresh()
            elif cmd == "disconnect":
                self.stop()
    except asyncio.CancelledError:
        pass

185 

186 # Apply the monkey patch 

187 Terminal.recv = _patched_recv 

188 

189 # Monkey-patch TerminalEmulator for better environment 

190 _original_open_terminal = TerminalEmulator.open_terminal 

191 

def _patched_open_terminal(self, command: str):
    """Fork a pty child running *command* with ``TERM=linux`` and a UTF-8 locale.

    Replacement for ``TerminalEmulator.open_terminal``. According to the
    original note, the ``linux`` terminal type makes programs emit Unicode
    box-drawing characters directly instead of VT100 line-drawing sequences.

    Args:
        command: Command line to execute; split with ``shlex.split``.

    Returns:
        The parent's file descriptor for the pty. The child PID is stored on
        ``self.pid``.
    """
    import os
    import pty
    import shlex
    from pathlib import Path

    self.pid, fd = pty.fork()
    if self.pid == 0:
        # Child process. Nothing may escape this branch: an exception that
        # propagated from here would unwind into a duplicate copy of the
        # parent application running inside the pty.
        try:
            argv = shlex.split(command)
            env = dict(
                TERM="linux",
                LC_ALL="en_US.UTF-8",
                LC_CTYPE="en_US.UTF-8",
                LANG="en_US.UTF-8",
                HOME=str(Path.home()),
            )
            # Carry over the few variables a shell needs from the parent.
            env.update(
                (key, os.environ[key])
                for key in ('PATH', 'USER', 'SHELL')
                if key in os.environ
            )
            # On success the process image is replaced and nothing below runs.
            os.execvpe(argv[0], argv, env)
        except Exception as error:  # boundary of the forked child
            try:
                # fd 2 is the pty slave, so the message shows up in the widget.
                message = f"terminal: cannot run {command!r}: {error}\n"
                os.write(2, message.encode("utf-8", errors="replace"))
            except OSError:
                pass
        finally:
            # Leave immediately, skipping the parent's cleanup handlers.
            os._exit(127)
    return fd

217 

218 # Apply the monkey patch 

219 TerminalEmulator.open_terminal = _patched_open_terminal 

220 

221except ImportError: 

222 TERMINAL_AVAILABLE = False 

223 # Create a placeholder Terminal class 

class Terminal(Static):
    """Stand-in widget used when ``textual_terminal`` cannot be imported.

    Shows installation instructions and accepts the calls the window makes
    on a terminal widget, doing nothing for each.
    """

    def __init__(self, command=None, **kwargs):
        """Render the install hint and remember *command* for reference."""
        notice = (
            "Terminal not available\n\n"
            "Install textual-terminal:\n"
            "pip install textual-terminal"
        )
        super().__init__(notice, **kwargs)
        self.command = command

    def clear(self):
        """No-op: there is no terminal buffer to clear."""
        return None

    def write(self, text):
        """No-op: *text* is discarded."""
        return None

234 

235 

class TerminalWindow(BaseOpenHCSWindow):
    """Terminal window using textual-window system with embedded terminal."""

    DEFAULT_CSS = """
    TerminalWindow {
        width: 80; height: 24;
        min-width: 80; min-height: 20;
    }
    TerminalWindow #content_pane {
        padding: 0;
    }
    TerminalWindow #terminal {
        height: 1fr;
        width: 100%;
    }
    """

    def __init__(self, shell_command: str = None, **kwargs):
        """
        Initialize terminal window.

        Args:
            shell_command: Optional command to run (defaults to current shell)
            **kwargs: Forwarded unchanged to ``BaseOpenHCSWindow``.
        """
        import logging

        # Resolve the shell before calling super() so it can be used in the title.
        self.shell_command = shell_command or self._get_current_shell()
        shell_name = self._shell_display_name(self.shell_command)

        logger = logging.getLogger(__name__)
        logger.info("Terminal: Initializing with shell: %s", self.shell_command)

        super().__init__(
            window_id="terminal",
            title=f"Terminal ({shell_name})",
            mode="temporary",
            **kwargs
        )

        # Track wrapper script for cleanup
        self.wrapper_script_path = None

    @staticmethod
    def _shell_display_name(shell_command: str) -> str:
        """Return the executable's base name from a command line.

        The command usually carries arguments (``"/bin/bash -l"``); only the
        first token is used so the title reads ``bash`` rather than ``bash -l``.
        If the command cannot be tokenized, the whole string is used.
        """
        import os
        import shlex

        try:
            argv = shlex.split(shell_command)
        except ValueError:  # e.g. unbalanced quotes
            argv = []
        executable = argv[0] if argv else shell_command
        return os.path.basename(executable)

    def _get_current_shell(self) -> str:
        """Get the current shell command with login flag.

        Tries, in order: the ``SHELL`` environment variable, the parent
        process (via ``psutil`` when installed), the user's passwd entry, and
        a list of common shell paths. Falls back to ``/bin/bash -l``.

        Returns:
            A command line of the form ``"<shell path> -l"``.
        """
        import logging
        import os

        logger = logging.getLogger(__name__)

        # Method 1: Check SHELL environment variable
        shell_env = os.environ.get('SHELL')
        if shell_env and os.path.exists(shell_env):
            logger.debug("Terminal: Using shell from SHELL env var: %s", shell_env)
            return f"{shell_env} -l"  # Login shell

        # Method 2: Check parent process (what launched the TUI).
        # psutil is optional. It is imported in its own try block because an
        # ``except (ImportError, psutil.X)`` clause evaluates ``psutil.X``
        # while handling the failed import, which raises UnboundLocalError.
        try:
            import psutil
        except ImportError:
            psutil = None

        if psutil is not None:
            known_shells = ('bash', 'zsh', 'fish', 'tcsh', 'csh', 'sh')
            try:
                parent_process = psutil.Process().parent()
                if parent_process and parent_process.name() in known_shells:
                    try:
                        shell_path = parent_process.exe()
                        logger.debug(
                            "Terminal: Using shell from parent process: %s",
                            shell_path,
                        )
                        return f"{shell_path} -l"  # Login shell
                    except (psutil.AccessDenied, psutil.NoSuchProcess):
                        # Fall back to name-based lookup
                        shell_path = f"/bin/{parent_process.name()}"
                        logger.debug(
                            "Terminal: Using shell from parent process name: %s",
                            shell_path,
                        )
                        return f"{shell_path} -l"  # Login shell
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        # Method 3: Shell recorded in the passwd database for the current user
        try:
            import pwd
            user_shell = pwd.getpwuid(os.getuid()).pw_shell
            if user_shell and os.path.exists(user_shell):
                return f"{user_shell} -l"  # Login shell
        except (ImportError, KeyError, OSError):
            pass

        # Method 4: Try to detect common shells in order of preference
        common_shells = (
            '/bin/zsh', '/usr/bin/zsh',
            '/bin/bash', '/usr/bin/bash',
            '/bin/fish', '/usr/bin/fish',
            '/bin/sh',
        )
        for shell_path in common_shells:
            if os.path.exists(shell_path):
                return f"{shell_path} -l"  # Login shell

        # Fallback: bash (original behavior)
        logger.debug("Terminal: Using fallback shell: /bin/bash -l")
        return "/bin/bash -l"

    def _create_environment_wrapper(self) -> str:
        """Create a wrapper script that exports key environment variables.

        The script exports ``PATH``, ``HOME``, ``USER``, ``SHELL`` and ``PWD``
        (those that are set) and then ``exec``s ``self.shell_command``. Its
        path is stored on ``self.wrapper_script_path`` for later cleanup.

        Returns:
            Path of the executable temporary script.
        """
        import os
        import shlex
        import stat
        import tempfile

        # A previous compose may have left a script behind; do not leak it.
        self._remove_wrapper_script()

        # Export key environment variables that the monkey-patch doesn't handle.
        # shlex.quote() single-quotes each value so "$", backticks and
        # backslashes reach the shell literally instead of being expanded.
        important_vars = ('PATH', 'HOME', 'USER', 'SHELL', 'PWD')
        env_exports = [
            f"export {key}={shlex.quote(os.environ[key])}"
            for key in important_vars
            if key in os.environ
        ]

        script_lines = [
            "#!/bin/bash",
            "# Export important environment variables",
            *env_exports,
            "",
            "# Launch the shell (terminal capabilities handled by monkey-patch)",
            f"exec {self.shell_command}",
            "",
        ]

        # Write to temporary file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
            f.write("\n".join(script_lines))
            script_path = f.name

        # Owner-only: the script is run by this process's own child and
        # contains environment values that other users need not read.
        os.chmod(script_path, stat.S_IRWXU)

        # Store for cleanup
        self.wrapper_script_path = script_path
        return script_path

    def _remove_wrapper_script(self) -> None:
        """Delete the wrapper script, if any, ignoring filesystem errors."""
        import os

        script_path = getattr(self, 'wrapper_script_path', None)
        if not script_path:
            return
        try:
            os.unlink(script_path)
        except OSError:
            pass  # Best-effort cleanup: already gone or not removable
        self.wrapper_script_path = None

    def compose(self) -> ComposeResult:
        """Compose the terminal window content - full window, no buttons."""
        with Vertical():
            if TERMINAL_AVAILABLE:
                # Run the shell through the wrapper script so it starts with
                # the exported environment variables.
                yield Terminal(
                    command=self._create_environment_wrapper(),
                    id="terminal"
                )
            else:
                yield Terminal(id="terminal")

    async def on_key(self, event) -> None:
        """Handle key presses for terminal shortcuts.

        ``ctrl+l`` sends ``clear`` to the terminal (when available);
        ``ctrl+d`` closes the window.
        """
        if event.key == "ctrl+l":
            if TERMINAL_AVAILABLE:
                await self.send_command("clear")
        elif event.key == "ctrl+d":
            self.close_window()

    async def send_command(self, command: str):
        """Send a command to the terminal, followed by a newline.

        Does nothing when the terminal backend is unavailable or the widget
        has no send queue.
        """
        terminal = self.query_one("#terminal", Terminal)
        send_queue = getattr(terminal, 'send_queue', None)
        if not (TERMINAL_AVAILABLE and send_queue):
            return
        # Send each character of the command, then the enter key
        for char in command:
            await send_queue.put(["stdin", char])
        await send_queue.put(["stdin", "\n"])

    def analyze_terminal_output(self, text: str) -> dict:
        """
        Analyze terminal output using enhanced escape sequence parsing.

        Returns:
            Dictionary with parsed information about colors, styles, etc.
            Empty when the terminal backend is unavailable or parsing fails.
        """
        if not TERMINAL_AVAILABLE:
            return {}

        import logging

        try:
            parts = terminal_enhancements.parse_enhanced_escape_sequences(text)

            analysis = {
                'has_colors': False,
                'has_styles': False,
                'title_changes': [],
                'color_sequences': [],
                'text_parts': [],
            }

            for text_part, seq_type, params in parts:
                if seq_type == 'text':
                    analysis['text_parts'].append(text_part)
                elif seq_type == 'csi' and params.get('command') == 'm':
                    # Color/style sequence
                    color_info = terminal_enhancements.parse_color_sequence(
                        params.get('params', [])
                    )
                    analysis['color_sequences'].append(color_info)
                    if color_info.get('fg_color') or color_info.get('bg_color'):
                        analysis['has_colors'] = True
                    if any(color_info.get(k) for k in ('bold', 'italic', 'underline')):
                        analysis['has_styles'] = True
                elif seq_type == 'title':
                    analysis['title_changes'].append(params.get('title', ''))

            return analysis

        except Exception:
            # Still fall back to an empty analysis, but leave a trace so
            # parser failures are diagnosable.
            logging.getLogger(__name__).debug(
                "Terminal: failed to analyze terminal output", exc_info=True
            )
            return {}

    def on_mount(self) -> None:
        """Called when terminal window is mounted."""
        terminal = self.query_one("#terminal", Terminal)
        if TERMINAL_AVAILABLE:
            terminal.start()  # Start the terminal emulator
            terminal.focus()

    def on_unmount(self) -> None:
        """Called when terminal window is unmounted - cleanup wrapper script."""
        self._remove_wrapper_script()