Coverage for openhcs/textual_tui/windows/terminal_window.py: 0.0%

271 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1"""Terminal window for OpenHCS Textual TUI.""" 

2 

3from pathlib import Path 

4from textual.app import ComposeResult 

5from textual.widgets import Button, Static 

6from textual.containers import Container, Horizontal, Vertical 

7 

8from openhcs.textual_tui.windows.base_window import BaseOpenHCSWindow 

9 

10# Fix textual-terminal compatibility with Textual 3.5.0+ 

11import textual.app 

12from textual.color import ANSI_COLORS 

13if not hasattr(textual.app, 'DEFAULT_COLORS'): 

14 textual.app.DEFAULT_COLORS = ANSI_COLORS 

15 

16# Import textual-terminal with compatibility fix 

17try: 

18 from textual_terminal import Terminal 

19 from textual_terminal._terminal import TerminalEmulator 

20 TERMINAL_AVAILABLE = True 

21 

22 # Import our extracted terminal enhancements 

23 from openhcs.textual_tui.services.terminal_enhancements import terminal_enhancements 

24 

25 # Import Gate One terminal for enhanced features 

26 from openhcs.textual_tui.services.terminal import Terminal as GateOneTerminal 

27 

28 # Monkey-patch Terminal to track cursor styles 

29 _original_terminal_recv = Terminal.recv 

30 

31 async def _patched_recv(self): 

32 """Patched recv that tracks cursor style and renders appropriately.""" 

33 import re 

34 import asyncio 

35 

36 # Initialize cursor_style if not present 

37 if not hasattr(self, 'cursor_style'): 

38 self.cursor_style = 1 # Default block cursor 

39 

40 try: 

41 while True: 

42 message = await self.recv_queue.get() 

43 cmd = message[0] 

44 if cmd == "setup": 

45 await self.send_queue.put(["set_size", self.nrow, self.ncol]) 

46 elif cmd == "stdout": 

47 chars = message[1] 

48 

49 # Track cursor style sequences - comprehensive detection 

50 

51 # Debug: Look for cursor sequences specifically 

52 if ' q' in chars or 'q' in chars: 

53 print(f"DEBUG: Raw chars containing 'q': {repr(chars)}") 

54 

55 # Standard DECSCUSR sequences: ESC [ Ps q (vim style) 

56 cursor_style_pattern = re.compile(r'\x1b\[([0-6]) q') 

57 for match in cursor_style_pattern.finditer(chars): 

58 style_num = int(match.group(1)) 

59 old_style = self.cursor_style 

60 if style_num in [0, 1, 2]: # 0=default, 1=blinking block, 2=steady block 

61 self.cursor_style = 1 # block 

62 elif style_num in [3, 4]: # 3=blinking underline, 4=steady underline 

63 self.cursor_style = 3 # underline 

64 elif style_num in [5, 6]: # 5=blinking bar, 6=steady bar 

65 self.cursor_style = 5 # bar 

66 print(f"DEBUG: VIM cursor style changed from {old_style} to {self.cursor_style} (sequence: {repr(match.group(0))})") 

67 

68 # Linux terminal sequences: ESC [ ? Ps c 

69 linux_cursor_pattern = re.compile(r'\x1b\[\?([0-9]+)c') 

70 for match in linux_cursor_pattern.finditer(chars): 

71 style_num = int(match.group(1)) 

72 old_style = self.cursor_style 

73 if style_num == 0: 

74 self.cursor_style = 1 # normal block 

75 elif style_num == 1: 

76 self.cursor_style = 0 # invisible 

77 elif style_num == 8: 

78 self.cursor_style = 3 # very visible (underline) 

79 print(f"DEBUG: Linux cursor style changed from {old_style} to {self.cursor_style} (sequence: {match.group(0)})") 

80 

81 # Vim-style cursor sequences: ESC ] 50 ; CursorShape=N BEL 

82 vim_cursor_pattern = re.compile(r'\x1b\]50;CursorShape=([0-2])\x07') 

83 for match in vim_cursor_pattern.finditer(chars): 

84 shape_num = int(match.group(1)) 

85 old_style = self.cursor_style 

86 if shape_num == 0: 

87 self.cursor_style = 1 # block 

88 elif shape_num == 1: 

89 self.cursor_style = 5 # bar 

90 elif shape_num == 2: 

91 self.cursor_style = 3 # underline 

92 print(f"DEBUG: Vim cursor style changed from {old_style} to {self.cursor_style} (sequence: {match.group(0)})") 

93 

94 # Alternative vim sequences: ESC ] 12 ; color BEL (cursor color) 

95 # We don't change style but log it 

96 vim_color_pattern = re.compile(r'\x1b\]12;[^\\x07]*\x07') 

97 for match in vim_color_pattern.finditer(chars): 

98 print(f"DEBUG: Vim cursor color sequence detected: {match.group(0)}") 

99 

100 # Cursor visibility sequences 

101 cursor_visibility_pattern = re.compile(r'\x1b\[\?25([hl])') 

102 for match in cursor_visibility_pattern.finditer(chars): 

103 visibility = match.group(1) 

104 if visibility == 'l': # hide cursor 

105 print(f"DEBUG: Cursor hidden") 

106 elif visibility == 'h': # show cursor 

107 print(f"DEBUG: Cursor shown") 

108 

109 # Handle mouse tracking (from original) 

110 _re_ansi_sequence = re.compile(r"(\x1b\[\??[\d;]*[a-zA-Z])") 

111 DECSET_PREFIX = "\x1b[?" 

112 

113 for sep_match in re.finditer(_re_ansi_sequence, chars): 

114 sequence = sep_match.group(0) 

115 if sequence.startswith(DECSET_PREFIX): 

116 parameters = sequence.removeprefix(DECSET_PREFIX).split(";") 

117 if "1000h" in parameters: 

118 self.mouse_tracking = True 

119 if "1000l" in parameters: 

120 self.mouse_tracking = False 

121 

122 try: 

123 self.stream.feed(chars) 

124 except TypeError as error: 

125 from textual import log 

126 log.warning("could not feed:", error) 

127 

128 # Custom display building with cursor styles 

129 from rich.text import Text 

130 lines = [] 

131 for y in range(self._screen.lines): 

132 line_text = Text() 

133 line = self._screen.buffer[y] 

134 style_change_pos: int = 0 

135 for x in range(self._screen.columns): 

136 char = line[x] 

137 

138 # Check if this is the cursor position 

139 is_cursor = ( 

140 self._screen.cursor.x == x 

141 and self._screen.cursor.y == y 

142 and not self._screen.cursor.hidden 

143 ) 

144 

145 # Modify character for cursor styles 

146 char_data = char.data 

147 if is_cursor: 

148 cursor_style = getattr(self, 'cursor_style', 1) 

149 if cursor_style == 5: # bar cursor 

150 # Use a thin left-aligned vertical bar 

151 char_data = "▎" # Left one-quarter block (slightly thicker but still thin) 

152 elif cursor_style == 3: # underline cursor 

153 char_data = char.data # Keep original character 

154 

155 line_text.append(char_data) 

156 

157 # Handle styling (from original) 

158 if x > 0: 

159 last_char = line[x - 1] 

160 if not self.char_style_cmp(char, last_char) or x == self._screen.columns - 1: 

161 last_style = self.char_rich_style(last_char) 

162 line_text.stylize(last_style, style_change_pos, x + 1) 

163 style_change_pos = x 

164 

165 # Apply cursor styling 

166 if is_cursor: 

167 cursor_style = getattr(self, 'cursor_style', 1) 

168 if cursor_style == 1: # block cursor 

169 line_text.stylize("reverse blink", x, x + 1) 

170 elif cursor_style == 3: # underline cursor 

171 line_text.stylize("underline bold blink", x, x + 1) 

172 elif cursor_style == 5: # bar cursor 

173 # Make the thin bar visible and blinking 

174 line_text.stylize("bold blink", x, x + 1) 

175 

176 lines.append(line_text) 

177 

178 from textual_terminal._terminal import TerminalDisplay 

179 self._display = TerminalDisplay(lines) 

180 self.refresh() 

181 

182 elif cmd == "disconnect": 

183 self.stop() 

184 except asyncio.CancelledError: 

185 pass 

186 

187 # Apply the monkey patch 

188 Terminal.recv = _patched_recv 

189 

190 # Monkey-patch TerminalEmulator for better environment 

191 _original_open_terminal = TerminalEmulator.open_terminal 

192 

193 def _patched_open_terminal(self, command: str): 

194 """Patched version that uses linux terminal type for proper Unicode box-drawing.""" 

195 import pty 

196 import shlex 

197 import os 

198 from pathlib import Path 

199 

200 self.pid, fd = pty.fork() 

201 if self.pid == 0: 

202 argv = shlex.split(command) 

203 # Use linux terminal type - this makes programs output Unicode directly 

204 # instead of VT100 line-drawing sequences that need translation 

205 env = dict( 

206 TERM="linux", # Critical: linux terminal outputs Unicode box-drawing directly 

207 LC_ALL="en_US.UTF-8", 

208 LC_CTYPE="en_US.UTF-8", 

209 LANG="en_US.UTF-8", 

210 HOME=str(Path.home()), 

211 ) 

212 # Add current PATH and other important vars 

213 for key in ['PATH', 'USER', 'SHELL']: 

214 if key in os.environ: 

215 env[key] = os.environ[key] 

216 os.execvpe(argv[0], argv, env) 

217 return fd 

218 

219 # Apply the monkey patch 

220 TerminalEmulator.open_terminal = _patched_open_terminal 

221 

222except ImportError: 

223 TERMINAL_AVAILABLE = False 

224 # Create a placeholder Terminal class 

225 class Terminal(Static): 

226 def __init__(self, command=None, **kwargs): 

227 super().__init__("Terminal not available\n\nInstall textual-terminal:\npip install textual-terminal", **kwargs) 

228 self.command = command 

229 

230 def clear(self): 

231 pass 

232 

233 def write(self, text): 

234 pass 

235 

236 

237class TerminalWindow(BaseOpenHCSWindow): 

238 """Terminal window using textual-window system with embedded terminal.""" 

239 

240 DEFAULT_CSS = """ 

241 TerminalWindow { 

242 width: 80; height: 24; 

243 min-width: 80; min-height: 20; 

244 } 

245 TerminalWindow #content_pane { 

246 padding: 0; 

247 } 

248 TerminalWindow #terminal { 

249 height: 1fr; 

250 width: 100%; 

251 } 

252 """ 

253 

254 def __init__(self, shell_command: str = None, **kwargs): 

255 """ 

256 Initialize terminal window. 

257 

258 Args: 

259 shell_command: Optional command to run (defaults to current shell) 

260 """ 

261 # Get shell before calling super() so we can use it in title 

262 self.shell_command = shell_command or self._get_current_shell() 

263 

264 # Extract shell name for title 

265 import os 

266 import logging 

267 shell_name = os.path.basename(self.shell_command) 

268 

269 logger = logging.getLogger(__name__) 

270 logger.info(f"Terminal: Initializing with shell: {self.shell_command}") 

271 

272 super().__init__( 

273 window_id="terminal", 

274 title=f"Terminal ({shell_name})", 

275 mode="temporary", 

276 **kwargs 

277 ) 

278 

279 # Track wrapper script for cleanup 

280 self.wrapper_script_path = None 

281 

282 def _get_current_shell(self) -> str: 

283 """Get the current shell command with login flag.""" 

284 import os 

285 import logging 

286 

287 logger = logging.getLogger(__name__) 

288 

289 # Method 1: Check SHELL environment variable 

290 shell_env = os.environ.get('SHELL') 

291 if shell_env and os.path.exists(shell_env): 

292 logger.debug(f"Terminal: Using shell from SHELL env var: {shell_env}") 

293 return f"{shell_env} -l" # Login shell 

294 

295 # Method 2: Check parent process (what launched the TUI) 

296 try: 

297 import psutil 

298 current_process = psutil.Process() 

299 parent_process = current_process.parent() 

300 if parent_process and parent_process.name() in ['bash', 'zsh', 'fish', 'tcsh', 'csh', 'sh']: 

301 # Try to get the full path 

302 try: 

303 shell_path = parent_process.exe() 

304 logger.debug(f"Terminal: Using shell from parent process: {shell_path}") 

305 return f"{shell_path} -l" # Login shell 

306 except (psutil.AccessDenied, psutil.NoSuchProcess): 

307 # Fall back to name-based lookup 

308 shell_name = parent_process.name() 

309 shell_path = f"/bin/{shell_name}" 

310 logger.debug(f"Terminal: Using shell from parent process name: {shell_path}") 

311 return f"{shell_path} -l" # Login shell 

312 except (ImportError, psutil.NoSuchProcess, psutil.AccessDenied): 

313 pass 

314 

315 # Method 3: Check what shell the user is actually using 

316 try: 

317 # Get the shell from /etc/passwd for current user 

318 import pwd 

319 user_shell = pwd.getpwuid(os.getuid()).pw_shell 

320 if user_shell and os.path.exists(user_shell): 

321 return f"{user_shell} -l" # Login shell 

322 except (ImportError, KeyError, OSError): 

323 pass 

324 

325 # Method 4: Try to detect common shells in order of preference 

326 common_shells = ['/bin/zsh', '/usr/bin/zsh', '/bin/bash', '/usr/bin/bash', '/bin/fish', '/usr/bin/fish', '/bin/sh'] 

327 for shell_path in common_shells: 

328 if os.path.exists(shell_path): 

329 return f"{shell_path} -l" # Login shell 

330 

331 # Fallback: bash (original behavior) 

332 logger.debug("Terminal: Using fallback shell: /bin/bash -l") 

333 return "/bin/bash -l" 

334 

335 def _create_environment_wrapper(self) -> str: 

336 """Create a wrapper command that exports current environment variables.""" 

337 import os 

338 import tempfile 

339 import stat 

340 

341 # Export key environment variables that the monkey-patch doesn't handle 

342 important_vars = ['PATH', 'HOME', 'USER', 'SHELL', 'PWD'] 

343 env_exports = [] 

344 

345 for key in important_vars: 

346 if key in os.environ: 

347 escaped_value = os.environ[key].replace('"', '\\"') 

348 env_exports.append(f'export {key}="{escaped_value}"') 

349 

350 script_content = f'''#!/bin/bash 

351# Export important environment variables 

352{chr(10).join(env_exports)} 

353 

354# Launch the shell (terminal capabilities handled by monkey-patch) 

355exec {self.shell_command} 

356''' 

357 

358 # Write to temporary file 

359 with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f: 

360 f.write(script_content) 

361 script_path = f.name 

362 

363 # Make executable 

364 os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH) 

365 

366 # Store for cleanup 

367 self.wrapper_script_path = script_path 

368 

369 return script_path 

370 

371 def compose(self) -> ComposeResult: 

372 """Compose the terminal window content - full window, no buttons.""" 

373 with Vertical(): 

374 if TERMINAL_AVAILABLE: 

375 # Create wrapper script that exports all environment variables 

376 wrapper_command = self._create_environment_wrapper() 

377 

378 yield Terminal( 

379 command=wrapper_command, 

380 id="terminal" 

381 ) 

382 else: 

383 yield Terminal(id="terminal") 

384 

385 async def on_key(self, event) -> None: 

386 """Handle key presses for terminal shortcuts.""" 

387 # Ctrl+L to clear terminal (like most terminals) 

388 if event.key == "ctrl+l": 

389 if TERMINAL_AVAILABLE: 

390 await self.send_command("clear") 

391 # Ctrl+D to close terminal (like most terminals) 

392 elif event.key == "ctrl+d": 

393 self.close_window() 

394 

395 async def send_command(self, command: str): 

396 """Send a command to the terminal.""" 

397 terminal = self.query_one("#terminal", Terminal) 

398 if TERMINAL_AVAILABLE and hasattr(terminal, 'send_queue') and terminal.send_queue: 

399 # Send each character of the command 

400 for char in command: 

401 await terminal.send_queue.put(["stdin", char]) 

402 # Send enter key 

403 await terminal.send_queue.put(["stdin", "\n"]) 

404 

405 def analyze_terminal_output(self, text: str) -> dict: 

406 """ 

407 Analyze terminal output using enhanced escape sequence parsing. 

408 

409 Returns: 

410 Dictionary with parsed information about colors, styles, etc. 

411 """ 

412 if not TERMINAL_AVAILABLE: 

413 return {} 

414 

415 try: 

416 parts = terminal_enhancements.parse_enhanced_escape_sequences(text) 

417 

418 analysis = { 

419 'has_colors': False, 

420 'has_styles': False, 

421 'title_changes': [], 

422 'color_sequences': [], 

423 'text_parts': [], 

424 } 

425 

426 for text_part, seq_type, params in parts: 

427 if seq_type == 'text': 

428 analysis['text_parts'].append(text_part) 

429 elif seq_type == 'csi' and params.get('command') == 'm': 

430 # Color/style sequence 

431 color_info = terminal_enhancements.parse_color_sequence(params.get('params', [])) 

432 analysis['color_sequences'].append(color_info) 

433 if color_info.get('fg_color') or color_info.get('bg_color'): 

434 analysis['has_colors'] = True 

435 if any(color_info.get(k) for k in ['bold', 'italic', 'underline']): 

436 analysis['has_styles'] = True 

437 elif seq_type == 'title': 

438 analysis['title_changes'].append(params.get('title', '')) 

439 

440 return analysis 

441 

442 except Exception as e: 

443 # Fallback to empty analysis if parsing fails 

444 return {} 

445 

446 def on_mount(self) -> None: 

447 """Called when terminal window is mounted.""" 

448 terminal = self.query_one("#terminal", Terminal) 

449 if TERMINAL_AVAILABLE: 

450 terminal.start() # Start the terminal emulator 

451 terminal.focus() 

452 

453 def on_unmount(self) -> None: 

454 """Called when terminal window is unmounted - cleanup wrapper script.""" 

455 import os 

456 if self.wrapper_script_path and os.path.exists(self.wrapper_script_path): 

457 try: 

458 os.unlink(self.wrapper_script_path) 

459 except OSError: 

460 pass # Ignore cleanup errors