Coverage for openhcs/textual_tui/windows/terminal_window.py: 0.0%
271 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""Terminal window for OpenHCS Textual TUI."""
3from pathlib import Path
4from textual.app import ComposeResult
5from textual.widgets import Button, Static
6from textual.containers import Container, Horizontal, Vertical
8from openhcs.textual_tui.windows.base_window import BaseOpenHCSWindow
10# Fix textual-terminal compatibility with Textual 3.5.0+
11import textual.app
12from textual.color import ANSI_COLORS
13if not hasattr(textual.app, 'DEFAULT_COLORS'):
14 textual.app.DEFAULT_COLORS = ANSI_COLORS
16# Import textual-terminal with compatibility fix
17try:
18 from textual_terminal import Terminal
19 from textual_terminal._terminal import TerminalEmulator
20 TERMINAL_AVAILABLE = True
22 # Import our extracted terminal enhancements
23 from openhcs.textual_tui.services.terminal_enhancements import terminal_enhancements
25 # Import Gate One terminal for enhanced features
26 from openhcs.textual_tui.services.terminal import Terminal as GateOneTerminal
28 # Monkey-patch Terminal to track cursor styles
29 _original_terminal_recv = Terminal.recv
import logging
import re

_recv_logger = logging.getLogger(__name__)

# Internal cursor styles used by the patched receive loop:
# 0 = invisible, 1 = block, 3 = underline, 5 = bar.
#
# Each rule pairs a compiled escape-sequence pattern with the mapping from the
# sequence's numeric parameter to an internal cursor style.  The rules are
# applied in this order; parameters missing from a mapping leave the style
# unchanged.  The patterns are compiled once here instead of once per message.
_CURSOR_STYLE_RULES = (
    # DECSCUSR: ESC [ Ps SP q  (0-2 block, 3-4 underline, 5-6 bar)
    (re.compile(r'\x1b\[([0-6]) q'), {0: 1, 1: 1, 2: 1, 3: 3, 4: 3, 5: 5, 6: 5}),
    # Linux console: ESC [ ? Ps c  (0 normal, 1 invisible, 8 very visible)
    (re.compile(r'\x1b\[\?([0-9]+)c'), {0: 1, 1: 0, 8: 3}),
    # ESC ] 50 ; CursorShape=N BEL  (0 block, 1 bar, 2 underline)
    (re.compile(r'\x1b\]50;CursorShape=([0-2])\x07'), {0: 1, 1: 5, 2: 3}),
)

# ESC ] 12 ; color BEL (cursor colour).  The payload is "anything but BEL";
# the previous pattern used ``[^\\x07]`` in a raw string, which excluded the
# characters ``\``, ``x``, ``0`` and ``7`` instead of the BEL byte.
_CURSOR_COLOR_RE = re.compile(r'\x1b\]12;[^\x07]*\x07')
_CURSOR_VISIBILITY_RE = re.compile(r'\x1b\[\?25([hl])')
_ANSI_SEQUENCE_RE = re.compile(r"(\x1b\[\??[\d;]*[a-zA-Z])")
_DECSET_PREFIX = "\x1b[?"

# Rich style applied on top of the cursor cell for each visible cursor style.
_CURSOR_RICH_STYLES = {
    1: "reverse blink",          # block
    3: "underline bold blink",   # underline
    5: "bold blink",             # bar (the cell's glyph is swapped as well)
}
_BAR_CURSOR_GLYPH = "▎"  # thin left-aligned vertical bar


def _scan_cursor_style(chars, current_style):
    """Return the cursor style in effect after the sequences found in *chars*."""
    style = current_style
    for pattern, style_by_param in _CURSOR_STYLE_RULES:
        for match in pattern.finditer(chars):
            previous = style
            style = style_by_param.get(int(match.group(1)), style)
            _recv_logger.debug(
                "Cursor style changed from %s to %s (sequence: %r)",
                previous, style, match.group(0),
            )
    return style


def _log_cursor_diagnostics(chars):
    """Log cursor colour/visibility sequences; they do not change any state."""
    if not _recv_logger.isEnabledFor(logging.DEBUG):
        return  # skip the regex scans entirely on the normal path
    for match in _CURSOR_COLOR_RE.finditer(chars):
        _recv_logger.debug("Cursor color sequence detected: %r", match.group(0))
    for match in _CURSOR_VISIBILITY_RE.finditer(chars):
        _recv_logger.debug(
            "Cursor hidden" if match.group(1) == 'l' else "Cursor shown"
        )


def _update_mouse_tracking(terminal, chars):
    """Set ``terminal.mouse_tracking`` from DECSET 1000h / 1000l sequences."""
    for match in _ANSI_SEQUENCE_RE.finditer(chars):
        sequence = match.group(0)
        if not sequence.startswith(_DECSET_PREFIX):
            continue
        parameters = sequence.removeprefix(_DECSET_PREFIX).split(";")
        if "1000h" in parameters:
            terminal.mouse_tracking = True
        if "1000l" in parameters:
            terminal.mouse_tracking = False


def _render_with_cursor(terminal):
    """Rebuild ``terminal._display`` from the screen buffer, drawing the cursor."""
    from rich.text import Text
    from textual_terminal._terminal import TerminalDisplay

    screen = terminal._screen
    cursor_style = getattr(terminal, 'cursor_style', 1)
    last_column = screen.columns - 1

    lines = []
    for y in range(screen.lines):
        line_text = Text()
        line = screen.buffer[y]
        style_change_pos = 0
        for x in range(screen.columns):
            char = line[x]
            is_cursor = (
                screen.cursor.x == x
                and screen.cursor.y == y
                and not screen.cursor.hidden
            )

            # Only the bar cursor replaces the glyph; block and underline
            # cursors are drawn purely through styling below.
            if is_cursor and cursor_style == 5:
                line_text.append(_BAR_CURSOR_GLYPH)
            else:
                line_text.append(char.data)

            # Cell styling, kept as in the receive loop this patch replaces:
            # flush the previous cell's style whenever the style changes or
            # the last column is reached.
            if x > 0:
                last_char = line[x - 1]
                if not terminal.char_style_cmp(char, last_char) or x == last_column:
                    last_style = terminal.char_rich_style(last_char)
                    line_text.stylize(last_style, style_change_pos, x + 1)
                    style_change_pos = x

            if is_cursor:
                cursor_rich_style = _CURSOR_RICH_STYLES.get(cursor_style)
                if cursor_rich_style is not None:  # style 0 (invisible) draws nothing
                    line_text.stylize(cursor_rich_style, x, x + 1)

        lines.append(line_text)

    terminal._display = TerminalDisplay(lines)
    terminal.refresh()


async def _patched_recv(self):
    """Receive loop replacing ``Terminal.recv`` that also tracks the cursor style.

    Messages are read from ``self.recv_queue`` until the task is cancelled:

    * ``["setup"]`` - replies on ``self.send_queue`` with
      ``["set_size", self.nrow, self.ncol]``.
    * ``["stdout", chars]`` - updates ``self.cursor_style`` and
      ``self.mouse_tracking`` from the escape sequences in *chars*, feeds
      *chars* to ``self.stream`` and redraws the display with the cursor
      rendered according to ``self.cursor_style``.
    * ``["disconnect"]`` - calls ``self.stop()``.

    Cancellation is swallowed so the task ends quietly.
    """
    import asyncio

    # Initialize cursor_style if not present
    if not hasattr(self, 'cursor_style'):
        self.cursor_style = 1  # Default block cursor

    try:
        while True:
            message = await self.recv_queue.get()
            cmd = message[0]
            if cmd == "setup":
                await self.send_queue.put(["set_size", self.nrow, self.ncol])
            elif cmd == "stdout":
                chars = message[1]

                self.cursor_style = _scan_cursor_style(chars, self.cursor_style)
                _log_cursor_diagnostics(chars)
                _update_mouse_tracking(self, chars)

                try:
                    self.stream.feed(chars)
                except TypeError as error:
                    from textual import log
                    log.warning("could not feed:", error)

                _render_with_cursor(self)
            elif cmd == "disconnect":
                self.stop()
    except asyncio.CancelledError:
        pass
187 # Apply the monkey patch
188 Terminal.recv = _patched_recv
190 # Monkey-patch TerminalEmulator for better environment
191 _original_open_terminal = TerminalEmulator.open_terminal
def _patched_open_terminal(self, command: str):
    """Fork a pty and exec *command* in the child with ``TERM=linux``.

    Replacement for ``TerminalEmulator.open_terminal``.  The child gets a
    minimal environment: ``TERM=linux`` (chosen by the original author so that
    programs emit Unicode box-drawing characters directly), a UTF-8 locale,
    ``HOME``, and ``PATH``/``USER``/``SHELL`` copied from the parent when set.

    Args:
        command: Shell-style command line; split with ``shlex.split``.

    Returns:
        The pty file descriptor, in the parent process.  ``self.pid`` is set
        to the child's pid.
    """
    import os
    import pty
    import shlex
    from pathlib import Path

    self.pid, fd = pty.fork()
    if self.pid == 0:
        # Child process.  It must never return into the caller: on any failure
        # it would otherwise keep running as a second copy of the application.
        try:
            argv = shlex.split(command)
            # Use linux terminal type - this makes programs output Unicode directly
            # instead of VT100 line-drawing sequences that need translation
            env = dict(
                TERM="linux",
                LC_ALL="en_US.UTF-8",
                LC_CTYPE="en_US.UTF-8",
                LANG="en_US.UTF-8",
                HOME=str(Path.home()),
            )
            # Add current PATH and other important vars
            for key in ('PATH', 'USER', 'SHELL'):
                if key in os.environ:
                    env[key] = os.environ[key]
            os.execvpe(argv[0], argv, env)
        except (OSError, ValueError, IndexError, RuntimeError) as error:
            # stderr is the pty here, so the message shows up in the widget.
            message = f"openhcs terminal: cannot run {command!r}: {error}\n"
            try:
                os.write(2, message.encode("utf-8", errors="replace"))
            except OSError:
                pass
        finally:
            # Only reached when exec did not replace the process image.
            os._exit(127)
    return fd
219 # Apply the monkey patch
220 TerminalEmulator.open_terminal = _patched_open_terminal
222except ImportError:
223 TERMINAL_AVAILABLE = False
224 # Create a placeholder Terminal class
class Terminal(Static):
    """Stand-in widget used when ``textual-terminal`` cannot be imported.

    It shows installation instructions and accepts the same ``command``
    argument as the real widget so callers can construct it the same way.
    """

    def __init__(self, command=None, **kwargs):
        notice = "Terminal not available\n\nInstall textual-terminal:\npip install textual-terminal"
        super().__init__(notice, **kwargs)
        # Stored only so the attribute exists; nothing is ever executed.
        self.command = command

    def clear(self):
        """Do nothing; there is no terminal buffer to clear."""
        return None

    def write(self, text):
        """Discard *text*; there is no terminal to write to."""
        return None
class TerminalWindow(BaseOpenHCSWindow):
    """Terminal window using textual-window system with embedded terminal.

    The window hosts a single ``Terminal`` widget (id ``terminal``).  When the
    real terminal widget is available, the shell is started through a
    temporary wrapper script that re-exports a few environment variables; the
    script is removed again when the window is unmounted.
    """

    DEFAULT_CSS = """
    TerminalWindow {
        width: 80; height: 24;
        min-width: 80; min-height: 20;
    }
    TerminalWindow #content_pane {
        padding: 0;
    }
    TerminalWindow #terminal {
        height: 1fr;
        width: 100%;
    }
    """

    # Process names accepted as "the parent process is a shell".
    _KNOWN_SHELL_NAMES = ('bash', 'zsh', 'fish', 'tcsh', 'csh', 'sh')

    # Shells probed, in order of preference, when nothing else identifies one.
    _COMMON_SHELL_PATHS = (
        '/bin/zsh', '/usr/bin/zsh',
        '/bin/bash', '/usr/bin/bash',
        '/bin/fish', '/usr/bin/fish',
        '/bin/sh',
    )

    def __init__(self, shell_command: str = None, **kwargs):
        """
        Initialize terminal window.

        Args:
            shell_command: Optional command to run (defaults to current shell)
            **kwargs: Forwarded unchanged to ``BaseOpenHCSWindow.__init__``.
        """
        import logging

        # Get shell before calling super() so we can use it in title
        self.shell_command = shell_command or self._get_current_shell()

        logging.getLogger(__name__).info(
            "Terminal: Initializing with shell: %s", self.shell_command
        )

        super().__init__(
            window_id="terminal",
            title=f"Terminal ({self._shell_display_name(self.shell_command)})",
            mode="temporary",
            **kwargs
        )

        # Track wrapper script for cleanup
        self.wrapper_script_path = None

    @staticmethod
    def _shell_display_name(shell_command: str) -> str:
        """Return the executable's base name from a command line.

        ``"/bin/bash -l"`` gives ``"bash"``; taking the basename of the whole
        command line would give ``"bash -l"``.
        """
        import os
        import shlex

        try:
            argv = shlex.split(shell_command)
        except ValueError:
            # Unbalanced quotes: fall back to plain whitespace splitting.
            argv = shell_command.split()
        return os.path.basename(argv[0]) if argv else shell_command

    @staticmethod
    def _login_command(shell_path: str) -> str:
        """Return the command line starting *shell_path* as a login shell."""
        import shlex

        # quote() leaves ordinary paths untouched and protects unusual ones.
        return f"{shlex.quote(shell_path)} -l"

    def _shell_from_parent_process(self):
        """Return the parent process's executable if it is a known shell.

        Returns ``None`` when psutil is not installed, the parent cannot be
        inspected, or the parent is not one of ``_KNOWN_SHELL_NAMES``.
        """
        import os

        # Import separately: an ``except`` clause that names ``psutil.*``
        # cannot itself handle a failed ``import psutil``.
        try:
            import psutil
        except ImportError:
            return None

        try:
            parent_process = psutil.Process().parent()
            if parent_process is None:
                return None
            shell_name = parent_process.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return None

        if shell_name not in self._KNOWN_SHELL_NAMES:
            return None

        try:
            shell_path = parent_process.exe()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            shell_path = ""
        if shell_path:
            return shell_path

        # Fall back to name-based lookup, but only if that file exists.
        fallback_path = f"/bin/{shell_name}"
        return fallback_path if os.path.exists(fallback_path) else None

    def _get_current_shell(self) -> str:
        """Get the current shell command with login flag.

        Tries, in order: the ``SHELL`` environment variable, the parent
        process (via psutil, when installed), the user's entry in the password
        database, a list of common shell paths, and finally ``/bin/bash``.
        """
        import logging
        import os

        logger = logging.getLogger(__name__)

        # Method 1: Check SHELL environment variable
        shell_env = os.environ.get('SHELL')
        if shell_env and os.path.exists(shell_env):
            logger.debug("Terminal: Using shell from SHELL env var: %s", shell_env)
            return self._login_command(shell_env)

        # Method 2: Check parent process (what launched the TUI)
        parent_shell = self._shell_from_parent_process()
        if parent_shell:
            logger.debug("Terminal: Using shell from parent process: %s", parent_shell)
            return self._login_command(parent_shell)

        # Method 3: Get the shell from the password database for current user
        try:
            import pwd
            user_shell = pwd.getpwuid(os.getuid()).pw_shell
        except (ImportError, KeyError, OSError):
            user_shell = None
        if user_shell and os.path.exists(user_shell):
            return self._login_command(user_shell)

        # Method 4: Try to detect common shells in order of preference
        for shell_path in self._COMMON_SHELL_PATHS:
            if os.path.exists(shell_path):
                return self._login_command(shell_path)

        # Fallback: bash (original behavior)
        logger.debug("Terminal: Using fallback shell: /bin/bash -l")
        return "/bin/bash -l"

    def _remove_wrapper_script(self) -> None:
        """Delete the wrapper script, if any, ignoring cleanup errors."""
        import os

        script_path = getattr(self, "wrapper_script_path", None)
        if not script_path:
            return
        try:
            os.unlink(script_path)
        except OSError:
            pass  # Best effort: the file may already be gone.
        self.wrapper_script_path = None

    def _create_environment_wrapper(self) -> str:
        """Create a wrapper script that exports current environment variables.

        The script exports ``PATH``, ``HOME``, ``USER``, ``SHELL`` and ``PWD``
        (those that are set) and then ``exec``s ``self.shell_command``.

        Returns:
            Path of the executable script, also stored in
            ``self.wrapper_script_path`` for cleanup.
        """
        import os
        import shlex
        import stat
        import tempfile

        # Do not leak an earlier script if this is called more than once.
        self._remove_wrapper_script()

        # Export key environment variables that the monkey-patch doesn't handle.
        # shlex.quote() yields a single-quoted word, so ``$``, backticks and
        # backslashes in a value are not expanded by the shell.
        env_exports = [
            f"export {key}={shlex.quote(os.environ[key])}"
            for key in ('PATH', 'HOME', 'USER', 'SHELL', 'PWD')
            if key in os.environ
        ]

        script_lines = [
            "#!/bin/bash",
            "# Export important environment variables",
            *env_exports,
            "",
            "# Launch the shell (terminal capabilities handled by monkey-patch)",
            f"exec {self.shell_command}",
            "",
        ]

        # Write to temporary file; surrogateescape round-trips environment
        # values that are not valid UTF-8.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.sh', delete=False,
            encoding='utf-8', errors='surrogateescape',
        ) as f:
            f.write("\n".join(script_lines))
            script_path = f.name

        # Store for cleanup before anything else can fail.
        self.wrapper_script_path = script_path

        # Make executable; only the owner ever needs to read or run it.
        os.chmod(script_path, stat.S_IRWXU)

        return script_path

    def compose(self) -> ComposeResult:
        """Compose the terminal window content - full window, no buttons."""
        with Vertical():
            if TERMINAL_AVAILABLE:
                # Create wrapper script that exports the environment variables
                yield Terminal(
                    command=self._create_environment_wrapper(),
                    id="terminal",
                )
            else:
                yield Terminal(id="terminal")

    async def on_key(self, event) -> None:
        """Handle key presses for terminal shortcuts."""
        # Ctrl+L to clear terminal (like most terminals)
        if event.key == "ctrl+l":
            if TERMINAL_AVAILABLE:
                await self.send_command("clear")
        # Ctrl+D to close terminal (like most terminals)
        elif event.key == "ctrl+d":
            self.close_window()

    async def send_command(self, command: str):
        """Send a command to the terminal, followed by a newline.

        Does nothing when the real terminal widget is unavailable or it has no
        ``send_queue`` yet.
        """
        terminal = self.query_one("#terminal", Terminal)
        if not (TERMINAL_AVAILABLE and getattr(terminal, 'send_queue', None)):
            return
        # Send each character of the command, then the enter key
        for char in command:
            await terminal.send_queue.put(["stdin", char])
        await terminal.send_queue.put(["stdin", "\n"])

    def analyze_terminal_output(self, text: str) -> dict:
        """
        Analyze terminal output using enhanced escape sequence parsing.

        Returns:
            Dictionary with parsed information about colors, styles, etc.
            Empty when the terminal is unavailable or parsing fails.
        """
        import logging

        if not TERMINAL_AVAILABLE:
            return {}

        analysis = {
            'has_colors': False,
            'has_styles': False,
            'title_changes': [],
            'color_sequences': [],
            'text_parts': [],
        }

        try:
            parts = terminal_enhancements.parse_enhanced_escape_sequences(text)

            for text_part, seq_type, params in parts:
                if seq_type == 'text':
                    analysis['text_parts'].append(text_part)
                elif seq_type == 'csi' and params.get('command') == 'm':
                    # Color/style sequence
                    color_info = terminal_enhancements.parse_color_sequence(params.get('params', []))
                    analysis['color_sequences'].append(color_info)
                    if color_info.get('fg_color') or color_info.get('bg_color'):
                        analysis['has_colors'] = True
                    if any(color_info.get(k) for k in ('bold', 'italic', 'underline')):
                        analysis['has_styles'] = True
                elif seq_type == 'title':
                    analysis['title_changes'].append(params.get('title', ''))
        except Exception:
            # Deliberate best effort: fall back to an empty analysis, but
            # leave a trace instead of hiding the failure completely.
            logging.getLogger(__name__).debug(
                "Terminal: could not analyze terminal output", exc_info=True
            )
            return {}

        return analysis

    def on_mount(self) -> None:
        """Called when terminal window is mounted."""
        terminal = self.query_one("#terminal", Terminal)
        if TERMINAL_AVAILABLE:
            terminal.start()  # Start the terminal emulator
            terminal.focus()

    def on_unmount(self) -> None:
        """Called when terminal window is unmounted - cleanup wrapper script."""
        self._remove_wrapper_script()