Coverage for openhcs/textual_tui/windows/terminal_window.py: 0.0%
270 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""Terminal window for OpenHCS Textual TUI."""
3from textual.app import ComposeResult
4from textual.widgets import Static
5from textual.containers import Vertical
7from openhcs.textual_tui.windows.base_window import BaseOpenHCSWindow
9# Fix textual-terminal compatibility with Textual 3.5.0+
10import textual.app
11from textual.color import ANSI_COLORS
12if not hasattr(textual.app, 'DEFAULT_COLORS'):
13 textual.app.DEFAULT_COLORS = ANSI_COLORS
15# Import textual-terminal with compatibility fix
16try:
17 from textual_terminal import Terminal
18 from textual_terminal._terminal import TerminalEmulator
19 TERMINAL_AVAILABLE = True
21 # Import our extracted terminal enhancements
22 from openhcs.textual_tui.services.terminal_enhancements import terminal_enhancements
24 # Import Gate One terminal for enhanced features
25 from openhcs.textual_tui.services.terminal import Terminal as GateOneTerminal
27 # Monkey-patch Terminal to track cursor styles
28 _original_terminal_recv = Terminal.recv
30 async def _patched_recv(self):
31 """Patched recv that tracks cursor style and renders appropriately."""
32 import re
33 import asyncio
35 # Initialize cursor_style if not present
36 if not hasattr(self, 'cursor_style'):
37 self.cursor_style = 1 # Default block cursor
39 try:
40 while True:
41 message = await self.recv_queue.get()
42 cmd = message[0]
43 if cmd == "setup":
44 await self.send_queue.put(["set_size", self.nrow, self.ncol])
45 elif cmd == "stdout":
46 chars = message[1]
48 # Track cursor style sequences - comprehensive detection
50 # Debug: Look for cursor sequences specifically
51 if ' q' in chars or 'q' in chars:
52 print(f"DEBUG: Raw chars containing 'q': {repr(chars)}")
54 # Standard DECSCUSR sequences: ESC [ Ps q (vim style)
55 cursor_style_pattern = re.compile(r'\x1b\[([0-6]) q')
56 for match in cursor_style_pattern.finditer(chars):
57 style_num = int(match.group(1))
58 old_style = self.cursor_style
59 if style_num in [0, 1, 2]: # 0=default, 1=blinking block, 2=steady block
60 self.cursor_style = 1 # block
61 elif style_num in [3, 4]: # 3=blinking underline, 4=steady underline
62 self.cursor_style = 3 # underline
63 elif style_num in [5, 6]: # 5=blinking bar, 6=steady bar
64 self.cursor_style = 5 # bar
65 print(f"DEBUG: VIM cursor style changed from {old_style} to {self.cursor_style} (sequence: {repr(match.group(0))})")
67 # Linux terminal sequences: ESC [ ? Ps c
68 linux_cursor_pattern = re.compile(r'\x1b\[\?([0-9]+)c')
69 for match in linux_cursor_pattern.finditer(chars):
70 style_num = int(match.group(1))
71 old_style = self.cursor_style
72 if style_num == 0:
73 self.cursor_style = 1 # normal block
74 elif style_num == 1:
75 self.cursor_style = 0 # invisible
76 elif style_num == 8:
77 self.cursor_style = 3 # very visible (underline)
78 print(f"DEBUG: Linux cursor style changed from {old_style} to {self.cursor_style} (sequence: {match.group(0)})")
80 # Vim-style cursor sequences: ESC ] 50 ; CursorShape=N BEL
81 vim_cursor_pattern = re.compile(r'\x1b\]50;CursorShape=([0-2])\x07')
82 for match in vim_cursor_pattern.finditer(chars):
83 shape_num = int(match.group(1))
84 old_style = self.cursor_style
85 if shape_num == 0:
86 self.cursor_style = 1 # block
87 elif shape_num == 1:
88 self.cursor_style = 5 # bar
89 elif shape_num == 2:
90 self.cursor_style = 3 # underline
91 print(f"DEBUG: Vim cursor style changed from {old_style} to {self.cursor_style} (sequence: {match.group(0)})")
93 # Alternative vim sequences: ESC ] 12 ; color BEL (cursor color)
94 # We don't change style but log it
95 vim_color_pattern = re.compile(r'\x1b\]12;[^\\x07]*\x07')
96 for match in vim_color_pattern.finditer(chars):
97 print(f"DEBUG: Vim cursor color sequence detected: {match.group(0)}")
99 # Cursor visibility sequences
100 cursor_visibility_pattern = re.compile(r'\x1b\[\?25([hl])')
101 for match in cursor_visibility_pattern.finditer(chars):
102 visibility = match.group(1)
103 if visibility == 'l': # hide cursor
104 print("DEBUG: Cursor hidden")
105 elif visibility == 'h': # show cursor
106 print("DEBUG: Cursor shown")
108 # Handle mouse tracking (from original)
109 _re_ansi_sequence = re.compile(r"(\x1b\[\??[\d;]*[a-zA-Z])")
110 DECSET_PREFIX = "\x1b[?"
112 for sep_match in re.finditer(_re_ansi_sequence, chars):
113 sequence = sep_match.group(0)
114 if sequence.startswith(DECSET_PREFIX):
115 parameters = sequence.removeprefix(DECSET_PREFIX).split(";")
116 if "1000h" in parameters:
117 self.mouse_tracking = True
118 if "1000l" in parameters:
119 self.mouse_tracking = False
121 try:
122 self.stream.feed(chars)
123 except TypeError as error:
124 from textual import log
125 log.warning("could not feed:", error)
127 # Custom display building with cursor styles
128 from rich.text import Text
129 lines = []
130 for y in range(self._screen.lines):
131 line_text = Text()
132 line = self._screen.buffer[y]
133 style_change_pos: int = 0
134 for x in range(self._screen.columns):
135 char = line[x]
137 # Check if this is the cursor position
138 is_cursor = (
139 self._screen.cursor.x == x
140 and self._screen.cursor.y == y
141 and not self._screen.cursor.hidden
142 )
144 # Modify character for cursor styles
145 char_data = char.data
146 if is_cursor:
147 cursor_style = getattr(self, 'cursor_style', 1)
148 if cursor_style == 5: # bar cursor
149 # Use a thin left-aligned vertical bar
150 char_data = "▎" # Left one-quarter block (slightly thicker but still thin)
151 elif cursor_style == 3: # underline cursor
152 char_data = char.data # Keep original character
154 line_text.append(char_data)
156 # Handle styling (from original)
157 if x > 0:
158 last_char = line[x - 1]
159 if not self.char_style_cmp(char, last_char) or x == self._screen.columns - 1:
160 last_style = self.char_rich_style(last_char)
161 line_text.stylize(last_style, style_change_pos, x + 1)
162 style_change_pos = x
164 # Apply cursor styling
165 if is_cursor:
166 cursor_style = getattr(self, 'cursor_style', 1)
167 if cursor_style == 1: # block cursor
168 line_text.stylize("reverse blink", x, x + 1)
169 elif cursor_style == 3: # underline cursor
170 line_text.stylize("underline bold blink", x, x + 1)
171 elif cursor_style == 5: # bar cursor
172 # Make the thin bar visible and blinking
173 line_text.stylize("bold blink", x, x + 1)
175 lines.append(line_text)
177 from textual_terminal._terminal import TerminalDisplay
178 self._display = TerminalDisplay(lines)
179 self.refresh()
181 elif cmd == "disconnect":
182 self.stop()
183 except asyncio.CancelledError:
184 pass
186 # Apply the monkey patch
187 Terminal.recv = _patched_recv
189 # Monkey-patch TerminalEmulator for better environment
190 _original_open_terminal = TerminalEmulator.open_terminal
192 def _patched_open_terminal(self, command: str):
193 """Patched version that uses linux terminal type for proper Unicode box-drawing."""
194 import pty
195 import shlex
196 import os
197 from pathlib import Path
199 self.pid, fd = pty.fork()
200 if self.pid == 0:
201 argv = shlex.split(command)
202 # Use linux terminal type - this makes programs output Unicode directly
203 # instead of VT100 line-drawing sequences that need translation
204 env = dict(
205 TERM="linux", # Critical: linux terminal outputs Unicode box-drawing directly
206 LC_ALL="en_US.UTF-8",
207 LC_CTYPE="en_US.UTF-8",
208 LANG="en_US.UTF-8",
209 HOME=str(Path.home()),
210 )
211 # Add current PATH and other important vars
212 for key in ['PATH', 'USER', 'SHELL']:
213 if key in os.environ:
214 env[key] = os.environ[key]
215 os.execvpe(argv[0], argv, env)
216 return fd
218 # Apply the monkey patch
219 TerminalEmulator.open_terminal = _patched_open_terminal
221except ImportError:
222 TERMINAL_AVAILABLE = False
223 # Create a placeholder Terminal class
224 class Terminal(Static):
225 def __init__(self, command=None, **kwargs):
226 super().__init__("Terminal not available\n\nInstall textual-terminal:\npip install textual-terminal", **kwargs)
227 self.command = command
229 def clear(self):
230 pass
232 def write(self, text):
233 pass
236class TerminalWindow(BaseOpenHCSWindow):
237 """Terminal window using textual-window system with embedded terminal."""
239 DEFAULT_CSS = """
240 TerminalWindow {
241 width: 80; height: 24;
242 min-width: 80; min-height: 20;
243 }
244 TerminalWindow #content_pane {
245 padding: 0;
246 }
247 TerminalWindow #terminal {
248 height: 1fr;
249 width: 100%;
250 }
251 """
253 def __init__(self, shell_command: str = None, **kwargs):
254 """
255 Initialize terminal window.
257 Args:
258 shell_command: Optional command to run (defaults to current shell)
259 """
260 # Get shell before calling super() so we can use it in title
261 self.shell_command = shell_command or self._get_current_shell()
263 # Extract shell name for title
264 import os
265 import logging
266 shell_name = os.path.basename(self.shell_command)
268 logger = logging.getLogger(__name__)
269 logger.info(f"Terminal: Initializing with shell: {self.shell_command}")
271 super().__init__(
272 window_id="terminal",
273 title=f"Terminal ({shell_name})",
274 mode="temporary",
275 **kwargs
276 )
278 # Track wrapper script for cleanup
279 self.wrapper_script_path = None
281 def _get_current_shell(self) -> str:
282 """Get the current shell command with login flag."""
283 import os
284 import logging
286 logger = logging.getLogger(__name__)
288 # Method 1: Check SHELL environment variable
289 shell_env = os.environ.get('SHELL')
290 if shell_env and os.path.exists(shell_env):
291 logger.debug(f"Terminal: Using shell from SHELL env var: {shell_env}")
292 return f"{shell_env} -l" # Login shell
294 # Method 2: Check parent process (what launched the TUI)
295 try:
296 import psutil
297 current_process = psutil.Process()
298 parent_process = current_process.parent()
299 if parent_process and parent_process.name() in ['bash', 'zsh', 'fish', 'tcsh', 'csh', 'sh']:
300 # Try to get the full path
301 try:
302 shell_path = parent_process.exe()
303 logger.debug(f"Terminal: Using shell from parent process: {shell_path}")
304 return f"{shell_path} -l" # Login shell
305 except (psutil.AccessDenied, psutil.NoSuchProcess):
306 # Fall back to name-based lookup
307 shell_name = parent_process.name()
308 shell_path = f"/bin/{shell_name}"
309 logger.debug(f"Terminal: Using shell from parent process name: {shell_path}")
310 return f"{shell_path} -l" # Login shell
311 except (ImportError, psutil.NoSuchProcess, psutil.AccessDenied):
312 pass
314 # Method 3: Check what shell the user is actually using
315 try:
316 # Get the shell from /etc/passwd for current user
317 import pwd
318 user_shell = pwd.getpwuid(os.getuid()).pw_shell
319 if user_shell and os.path.exists(user_shell):
320 return f"{user_shell} -l" # Login shell
321 except (ImportError, KeyError, OSError):
322 pass
324 # Method 4: Try to detect common shells in order of preference
325 common_shells = ['/bin/zsh', '/usr/bin/zsh', '/bin/bash', '/usr/bin/bash', '/bin/fish', '/usr/bin/fish', '/bin/sh']
326 for shell_path in common_shells:
327 if os.path.exists(shell_path):
328 return f"{shell_path} -l" # Login shell
330 # Fallback: bash (original behavior)
331 logger.debug("Terminal: Using fallback shell: /bin/bash -l")
332 return "/bin/bash -l"
334 def _create_environment_wrapper(self) -> str:
335 """Create a wrapper command that exports current environment variables."""
336 import os
337 import tempfile
338 import stat
340 # Export key environment variables that the monkey-patch doesn't handle
341 important_vars = ['PATH', 'HOME', 'USER', 'SHELL', 'PWD']
342 env_exports = []
344 for key in important_vars:
345 if key in os.environ:
346 escaped_value = os.environ[key].replace('"', '\\"')
347 env_exports.append(f'export {key}="{escaped_value}"')
349 script_content = f'''#!/bin/bash
350# Export important environment variables
351{chr(10).join(env_exports)}
353# Launch the shell (terminal capabilities handled by monkey-patch)
354exec {self.shell_command}
355'''
357 # Write to temporary file
358 with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
359 f.write(script_content)
360 script_path = f.name
362 # Make executable
363 os.chmod(script_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH)
365 # Store for cleanup
366 self.wrapper_script_path = script_path
368 return script_path
370 def compose(self) -> ComposeResult:
371 """Compose the terminal window content - full window, no buttons."""
372 with Vertical():
373 if TERMINAL_AVAILABLE:
374 # Create wrapper script that exports all environment variables
375 wrapper_command = self._create_environment_wrapper()
377 yield Terminal(
378 command=wrapper_command,
379 id="terminal"
380 )
381 else:
382 yield Terminal(id="terminal")
384 async def on_key(self, event) -> None:
385 """Handle key presses for terminal shortcuts."""
386 # Ctrl+L to clear terminal (like most terminals)
387 if event.key == "ctrl+l":
388 if TERMINAL_AVAILABLE:
389 await self.send_command("clear")
390 # Ctrl+D to close terminal (like most terminals)
391 elif event.key == "ctrl+d":
392 self.close_window()
394 async def send_command(self, command: str):
395 """Send a command to the terminal."""
396 terminal = self.query_one("#terminal", Terminal)
397 if TERMINAL_AVAILABLE and hasattr(terminal, 'send_queue') and terminal.send_queue:
398 # Send each character of the command
399 for char in command:
400 await terminal.send_queue.put(["stdin", char])
401 # Send enter key
402 await terminal.send_queue.put(["stdin", "\n"])
404 def analyze_terminal_output(self, text: str) -> dict:
405 """
406 Analyze terminal output using enhanced escape sequence parsing.
408 Returns:
409 Dictionary with parsed information about colors, styles, etc.
410 """
411 if not TERMINAL_AVAILABLE:
412 return {}
414 try:
415 parts = terminal_enhancements.parse_enhanced_escape_sequences(text)
417 analysis = {
418 'has_colors': False,
419 'has_styles': False,
420 'title_changes': [],
421 'color_sequences': [],
422 'text_parts': [],
423 }
425 for text_part, seq_type, params in parts:
426 if seq_type == 'text':
427 analysis['text_parts'].append(text_part)
428 elif seq_type == 'csi' and params.get('command') == 'm':
429 # Color/style sequence
430 color_info = terminal_enhancements.parse_color_sequence(params.get('params', []))
431 analysis['color_sequences'].append(color_info)
432 if color_info.get('fg_color') or color_info.get('bg_color'):
433 analysis['has_colors'] = True
434 if any(color_info.get(k) for k in ['bold', 'italic', 'underline']):
435 analysis['has_styles'] = True
436 elif seq_type == 'title':
437 analysis['title_changes'].append(params.get('title', ''))
439 return analysis
441 except Exception as e:
442 # Fallback to empty analysis if parsing fails
443 return {}
445 def on_mount(self) -> None:
446 """Called when terminal window is mounted."""
447 terminal = self.query_one("#terminal", Terminal)
448 if TERMINAL_AVAILABLE:
449 terminal.start() # Start the terminal emulator
450 terminal.focus()
452 def on_unmount(self) -> None:
453 """Called when terminal window is unmounted - cleanup wrapper script."""
454 import os
455 if self.wrapper_script_path and os.path.exists(self.wrapper_script_path):
456 try:
457 os.unlink(self.wrapper_script_path)
458 except OSError:
459 pass # Ignore cleanup errors