Coverage for openhcs/textual_tui/windows/gateone-terminal-widget.py: 0.0%
244 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-14 05:57 +0000
1"""
2Gate One Terminal Widget for Textual
3A terminal emulator widget using Gate One's terminal.py instead of pyte
4"""
6import os
7import pty
8import asyncio
9import struct
10import fcntl
11import termios
12import logging
13from typing import Optional, Tuple, List, Callable
14from pathlib import Path
16from rich.text import Text
17from rich.style import Style
18from rich.console import Console
19from rich.segment import Segment
20from rich.color import Color as RichColor
22from textual.widget import Widget
23from textual.reactive import reactive
24from textual.geometry import Size
25from textual import events
26from textual.strip import Strip
28# You'll need to extract terminal.py from Gate One
29# Download from: https://github.com/liftoff/GateOne
30try:
31 from terminal import Terminal as GateOneTerminal
32except ImportError:
33 raise ImportError(
34 "Please extract terminal.py from Gate One:\n"
35 "1. git clone https://github.com/liftoff/GateOne\n"
36 "2. Copy GateOne/gateone/terminal.py to your project"
37 )
40class GateOneTextualTerminal(Widget):
41 """A Textual widget that uses Gate One's terminal emulator."""
43 DEFAULT_CSS = """
44 GateOneTextualTerminal {
45 width: 100%;
46 height: 100%;
47 }
48 """
50 # Reactive properties
51 cursor_visible = reactive(True)
52 cursor_style = reactive(1) # 1=block, 3=underline, 5=bar
54 def __init__(
55 self,
56 command: Optional[str] = None,
57 size: Tuple[int, int] = (80, 24),
58 **kwargs
59 ):
60 super().__init__(**kwargs)
62 # Terminal dimensions
63 self.cols, self.rows = size
65 # Command to run
66 self.command = command or os.environ.get('SHELL', '/bin/bash')
68 # Gate One terminal instance
69 self.terminal = GateOneTerminal(rows=self.rows, cols=self.cols)
71 # Set up callbacks
72 self._setup_callbacks()
74 # PTY file descriptors
75 self.pty_master = None
76 self.pty_pid = None
78 # Async reader task
79 self._reader_task = None
81 # Color palette (will be populated from terminal)
82 self.colors = self._default_colors()
84 # For handling OSC sequences (like pywal)
85 self.osc_buffer = ""
87 # Logger
88 self.log = logging.getLogger(__name__)
90 def _default_colors(self) -> dict:
91 """Default 16-color palette."""
92 return {
93 0: "#000000", # Black
94 1: "#cd0000", # Red
95 2: "#00cd00", # Green
96 3: "#cdcd00", # Yellow
97 4: "#0000ee", # Blue
98 5: "#cd00cd", # Magenta
99 6: "#00cdcd", # Cyan
100 7: "#e5e5e5", # White
101 8: "#7f7f7f", # Bright Black
102 9: "#ff0000", # Bright Red
103 10: "#00ff00", # Bright Green
104 11: "#ffff00", # Bright Yellow
105 12: "#5c5cff", # Bright Blue
106 13: "#ff00ff", # Bright Magenta
107 14: "#00ffff", # Bright Cyan
108 15: "#ffffff", # Bright White
109 }
111 def _setup_callbacks(self):
112 """Set up Gate One terminal callbacks."""
113 t = self.terminal
115 # Screen content changed
116 t.callbacks[t.CALLBACK_CHANGED] = self._on_terminal_change
118 # Cursor position changed
119 t.callbacks[t.CALLBACK_CURSOR_POS] = self._on_cursor_move
121 # Mode changes (for cursor styles, etc)
122 t.callbacks[t.CALLBACK_MODE] = self._on_mode_change
124 # Title changes
125 t.callbacks[t.CALLBACK_TITLE] = self._on_title_change
127 # Bell
128 t.callbacks[t.CALLBACK_BELL] = self._on_bell
130 # For handling special escape sequences
131 t.callbacks[t.CALLBACK_ESC] = self._on_escape_sequence
133 # OSC sequences (for colors)
134 t.callbacks[t.CALLBACK_OSC] = self._on_osc_sequence
136 def _on_terminal_change(self):
137 """Called when terminal content changes."""
138 self.refresh()
140 def _on_cursor_move(self):
141 """Called when cursor moves."""
142 self.refresh()
144 def _on_mode_change(self, mode, value):
145 """Called when terminal mode changes."""
146 self.log.debug(f"Mode change: {mode} = {value}")
148 # Handle cursor visibility
149 if mode == 'cursor':
150 self.cursor_visible = value
152 self.refresh()
154 def _on_title_change(self, title):
155 """Called when terminal title changes."""
156 # You could emit an event here to update window title
157 self.log.debug(f"Title changed: {title}")
159 def _on_bell(self):
160 """Called when bell character is received."""
161 # You could flash the screen or play a sound
162 self.log.debug("Bell!")
164 def _on_escape_sequence(self, seq):
165 """Handle special escape sequences like cursor style."""
166 self.log.debug(f"Escape sequence: {repr(seq)}")
168 # Check for DECSCUSR (cursor style) - ESC [ Ps SP q
169 if seq.endswith(' q'):
170 try:
171 # Extract parameter
172 param = seq[2:-2].strip()
173 if param.isdigit():
174 style = int(param)
175 if style in [0, 1, 2]:
176 self.cursor_style = 1 # Block
177 elif style in [3, 4]:
178 self.cursor_style = 3 # Underline
179 elif style in [5, 6]:
180 self.cursor_style = 5 # Bar
181 self.log.debug(f"Cursor style changed to: {self.cursor_style}")
182 except:
183 pass
185 def _on_osc_sequence(self, command, text):
186 """Handle OSC sequences (for pywal colors)."""
187 self.log.debug(f"OSC: command={command}, text={text}")
189 # OSC 4 - Change color palette
190 if command == 4:
191 # Format: "index;rgb:rr/gg/bb"
192 parts = text.split(';', 1)
193 if len(parts) == 2:
194 try:
195 index = int(parts[0])
196 color = parts[1]
197 if color.startswith('rgb:'):
198 # Convert rgb:rr/gg/bb to #rrggbb
199 rgb = color[4:].split('/')
200 if len(rgb) == 3:
201 # Handle both 8-bit and 16-bit formats
202 r = rgb[0][:2]
203 g = rgb[1][:2]
204 b = rgb[2][:2]
205 hex_color = f"#{r}{g}{b}"
206 self.colors[index] = hex_color
207 self.log.debug(f"Color {index} = {hex_color}")
208 except:
209 pass
211 # OSC 10 - Change foreground color
212 elif command == 10:
213 self.log.debug(f"Foreground color: {text}")
215 # OSC 11 - Change background color
216 elif command == 11:
217 self.log.debug(f"Background color: {text}")
219 async def on_mount(self):
220 """Start the terminal when widget is mounted."""
221 await self.start_terminal()
223 async def on_unmount(self):
224 """Clean up when widget is unmounted."""
225 await self.stop_terminal()
227 async def start_terminal(self):
228 """Start the PTY and terminal process."""
229 # Create PTY
230 self.pty_master, pty_slave = pty.openpty()
232 # Get terminal size
233 cols, rows = self.size.width, self.size.height
234 if cols > 0 and rows > 0:
235 self.resize_terminal(cols, rows)
237 # Fork and exec
238 self.pty_pid = os.fork()
240 if self.pty_pid == 0: # Child process
241 os.close(self.pty_master)
243 # Make slave the controlling terminal
244 os.setsid()
245 os.dup2(pty_slave, 0) # stdin
246 os.dup2(pty_slave, 1) # stdout
247 os.dup2(pty_slave, 2) # stderr
248 os.close(pty_slave)
250 # Set environment
251 env = os.environ.copy()
252 env['TERM'] = 'xterm-256color' # Gate One supports this well
253 env['COLORTERM'] = 'truecolor'
255 # Load pywal colors if available
256 pywal_sequence_file = Path.home() / '.cache' / 'wal' / 'sequences'
257 if pywal_sequence_file.exists():
258 # This will be sent to our terminal on startup
259 env['PYWAL_SEQUENCES'] = str(pywal_sequence_file)
261 # Execute shell
262 os.execve(self.command, [self.command], env)
264 else: # Parent process
265 os.close(pty_slave)
267 # Make non-blocking
268 import fcntl
269 flags = fcntl.fcntl(self.pty_master, fcntl.F_GETFL)
270 fcntl.fcntl(self.pty_master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
272 # Start reader task
273 self._reader_task = asyncio.create_task(self._read_pty())
275 # Send pywal sequences if available
276 await self._apply_pywal_colors()
278 async def _apply_pywal_colors(self):
279 """Apply pywal color sequences if available."""
280 pywal_sequence_file = Path.home() / '.cache' / 'wal' / 'sequences'
281 if pywal_sequence_file.exists():
282 try:
283 sequences = pywal_sequence_file.read_bytes()
284 os.write(self.pty_master, sequences)
285 self.log.debug("Applied pywal color sequences")
286 except Exception as e:
287 self.log.error(f"Failed to apply pywal colors: {e}")
289 async def stop_terminal(self):
290 """Stop the terminal process."""
291 if self._reader_task:
292 self._reader_task.cancel()
294 if self.pty_master:
295 os.close(self.pty_master)
297 if self.pty_pid:
298 try:
299 os.kill(self.pty_pid, 9)
300 os.waitpid(self.pty_pid, 0)
301 except:
302 pass
304 async def _read_pty(self):
305 """Read data from PTY and feed to terminal."""
306 loop = asyncio.get_event_loop()
308 while True:
309 try:
310 # Read available data
311 data = await loop.run_in_executor(None, os.read, self.pty_master, 4096)
312 if data:
313 # Decode and feed to terminal
314 text = data.decode('utf-8', errors='replace')
315 self.terminal.write(text)
316 else:
317 break
318 except OSError:
319 await asyncio.sleep(0.01)
320 except Exception as e:
321 self.log.error(f"PTY read error: {e}")
322 break
324 def resize_terminal(self, cols: int, rows: int):
325 """Resize the terminal."""
326 self.cols = cols
327 self.rows = rows
329 # Resize Gate One terminal
330 self.terminal.resize(rows, cols)
332 # Resize PTY
333 if self.pty_master:
334 size = struct.pack('HHHH', rows, cols, 0, 0)
335 fcntl.ioctl(self.pty_master, termios.TIOCSWINSZ, size)
337 def on_resize(self, event):
338 """Handle widget resize."""
339 # Calculate character dimensions
340 cols = self.size.width
341 rows = self.size.height
343 if cols > 0 and rows > 0:
344 self.resize_terminal(cols, rows)
346 def render_line(self, y: int) -> Strip:
347 """Render a single line of the terminal."""
348 segments = []
350 # Get line from terminal
351 if y < len(self.terminal.screen):
352 line = self.terminal.screen[y]
354 for x, char in enumerate(line):
355 # Get character data
356 char_data = char.data if hasattr(char, 'data') else ' '
357 if not char_data:
358 char_data = ' '
360 # Build style from character attributes
361 style = Style()
363 # Foreground color
364 if hasattr(char, 'fg') and char.fg != 'default':
365 if isinstance(char.fg, int) and char.fg < 16:
366 style = style + Style(color=self.colors.get(char.fg, "#ffffff"))
367 else:
368 style = style + Style(color=str(char.fg))
370 # Background color
371 if hasattr(char, 'bg') and char.bg != 'default':
372 if isinstance(char.bg, int) and char.bg < 16:
373 style = style + Style(bgcolor=self.colors.get(char.bg, "#000000"))
374 else:
375 style = style + Style(bgcolor=str(char.bg))
377 # Text attributes
378 if hasattr(char, 'bold') and char.bold:
379 style = style + Style(bold=True)
380 if hasattr(char, 'italic') and char.italic:
381 style = style + Style(italic=True)
382 if hasattr(char, 'underscore') and char.underscore:
383 style = style + Style(underline=True)
384 if hasattr(char, 'strikethrough') and char.strikethrough:
385 style = style + Style(strike=True)
387 # IMPORTANT: Handle reverse video (inversion)
388 if hasattr(char, 'reverse') and char.reverse:
389 # Swap foreground and background
390 fg = style.color
391 bg = style.bgcolor
392 style = Style(
393 color=bg or "#000000",
394 bgcolor=fg or "#ffffff",
395 bold=style.bold,
396 italic=style.italic,
397 underline=style.underline,
398 strike=style.strike
399 )
401 # Handle cursor
402 is_cursor = (x == self.terminal.cursor.x and
403 y == self.terminal.cursor.y and
404 self.cursor_visible and
405 not self.terminal.cursor.hidden)
407 if is_cursor:
408 if self.cursor_style == 1: # Block cursor
409 style = style + Style(reverse=True, blink=True)
410 elif self.cursor_style == 3: # Underline cursor
411 style = style + Style(underline=True, bold=True, blink=True)
412 elif self.cursor_style == 5: # Bar cursor
413 # For bar cursor, we modify the character
414 if char_data == ' ':
415 char_data = '▎' # Left one-eighth block
416 style = style + Style(bold=True, blink=True)
418 segments.append(Segment(char_data, style))
420 return Strip(segments)
422 def on_key(self, event: events.Key) -> None:
423 """Handle keyboard input."""
424 if not self.pty_master:
425 return
427 # Map key to escape sequence
428 key_map = {
429 "up": "\x1b[A",
430 "down": "\x1b[B",
431 "right": "\x1b[C",
432 "left": "\x1b[D",
433 "home": "\x1b[H",
434 "end": "\x1b[F",
435 "pageup": "\x1b[5~",
436 "pagedown": "\x1b[6~",
437 "insert": "\x1b[2~",
438 "delete": "\x1b[3~",
439 "f1": "\x1bOP",
440 "f2": "\x1bOQ",
441 "f3": "\x1bOR",
442 "f4": "\x1bOS",
443 "f5": "\x1b[15~",
444 "f6": "\x1b[17~",
445 "f7": "\x1b[18~",
446 "f8": "\x1b[19~",
447 "f9": "\x1b[20~",
448 "f10": "\x1b[21~",
449 "f11": "\x1b[23~",
450 "f12": "\x1b[24~",
451 "escape": "\x1b",
452 "enter": "\r",
453 "tab": "\t",
454 "backspace": "\x7f",
455 }
457 # Check for special keys
458 if event.key in key_map:
459 data = key_map[event.key]
460 elif event.character:
461 # Regular character
462 data = event.character
464 # Handle Ctrl combinations
465 if event.key.startswith("ctrl+"):
466 char = event.key[5:]
467 if len(char) == 1 and char.isalpha():
468 # Ctrl+A = 1, Ctrl+B = 2, etc.
469 data = chr(ord(char.upper()) - ord('A') + 1)
470 else:
471 return
473 # Write to PTY
474 try:
475 os.write(self.pty_master, data.encode('utf-8'))
476 except:
477 pass
479 def on_paste(self, event: events.Paste) -> None:
480 """Handle paste events."""
481 if self.pty_master and event.text:
482 try:
483 # Send bracketed paste sequences
484 os.write(self.pty_master, b'\x1b[200~')
485 os.write(self.pty_master, event.text.encode('utf-8'))
486 os.write(self.pty_master, b'\x1b[201~')
487 except:
488 pass
491# Example usage with your window system
492if __name__ == "__main__":
493 from textual.app import App
495 class TerminalApp(App):
496 def compose(self):
497 yield GateOneTextualTerminal()
499 app = TerminalApp()
500 app.run()