Coverage for openhcs/textual_tui/windows/gateone-terminal-widget.py: 0.0%
244 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 02:09 +0000
1"""
2Gate One Terminal Widget for Textual
3A terminal emulator widget using Gate One's terminal.py instead of pyte
4"""
6import os
7import pty
8import asyncio
9import struct
10import logging
11from typing import Optional, Tuple
12from pathlib import Path
14# Unix-only imports (TUI is deprecated and not supported on Windows)
15try:
16 import fcntl
17 import termios
18 UNIX_AVAILABLE = True
19except ImportError:
20 UNIX_AVAILABLE = False
22from rich.style import Style
23from rich.segment import Segment
25from textual.widget import Widget
26from textual.reactive import reactive
27from textual import events
28from textual.strip import Strip
30# You'll need to extract terminal.py from Gate One
31# Download from: https://github.com/liftoff/GateOne
32try:
33 from terminal import Terminal as GateOneTerminal
34except ImportError:
35 raise ImportError(
36 "Please extract terminal.py from Gate One:\n"
37 "1. git clone https://github.com/liftoff/GateOne\n"
38 "2. Copy GateOne/gateone/terminal.py to your project"
39 )
42class GateOneTextualTerminal(Widget):
43 """A Textual widget that uses Gate One's terminal emulator."""
45 DEFAULT_CSS = """
46 GateOneTextualTerminal {
47 width: 100%;
48 height: 100%;
49 }
50 """
52 # Reactive properties
53 cursor_visible = reactive(True)
54 cursor_style = reactive(1) # 1=block, 3=underline, 5=bar
56 def __init__(
57 self,
58 command: Optional[str] = None,
59 size: Tuple[int, int] = (80, 24),
60 **kwargs
61 ):
62 super().__init__(**kwargs)
64 # Terminal dimensions
65 self.cols, self.rows = size
67 # Command to run
68 self.command = command or os.environ.get('SHELL', '/bin/bash')
70 # Gate One terminal instance
71 self.terminal = GateOneTerminal(rows=self.rows, cols=self.cols)
73 # Set up callbacks
74 self._setup_callbacks()
76 # PTY file descriptors
77 self.pty_master = None
78 self.pty_pid = None
80 # Async reader task
81 self._reader_task = None
83 # Color palette (will be populated from terminal)
84 self.colors = self._default_colors()
86 # For handling OSC sequences (like pywal)
87 self.osc_buffer = ""
89 # Logger
90 self.log = logging.getLogger(__name__)
92 def _default_colors(self) -> dict:
93 """Default 16-color palette."""
94 return {
95 0: "#000000", # Black
96 1: "#cd0000", # Red
97 2: "#00cd00", # Green
98 3: "#cdcd00", # Yellow
99 4: "#0000ee", # Blue
100 5: "#cd00cd", # Magenta
101 6: "#00cdcd", # Cyan
102 7: "#e5e5e5", # White
103 8: "#7f7f7f", # Bright Black
104 9: "#ff0000", # Bright Red
105 10: "#00ff00", # Bright Green
106 11: "#ffff00", # Bright Yellow
107 12: "#5c5cff", # Bright Blue
108 13: "#ff00ff", # Bright Magenta
109 14: "#00ffff", # Bright Cyan
110 15: "#ffffff", # Bright White
111 }
113 def _setup_callbacks(self):
114 """Set up Gate One terminal callbacks."""
115 t = self.terminal
117 # Screen content changed
118 t.callbacks[t.CALLBACK_CHANGED] = self._on_terminal_change
120 # Cursor position changed
121 t.callbacks[t.CALLBACK_CURSOR_POS] = self._on_cursor_move
123 # Mode changes (for cursor styles, etc)
124 t.callbacks[t.CALLBACK_MODE] = self._on_mode_change
126 # Title changes
127 t.callbacks[t.CALLBACK_TITLE] = self._on_title_change
129 # Bell
130 t.callbacks[t.CALLBACK_BELL] = self._on_bell
132 # For handling special escape sequences
133 t.callbacks[t.CALLBACK_ESC] = self._on_escape_sequence
135 # OSC sequences (for colors)
136 t.callbacks[t.CALLBACK_OSC] = self._on_osc_sequence
138 def _on_terminal_change(self):
139 """Called when terminal content changes."""
140 self.refresh()
142 def _on_cursor_move(self):
143 """Called when cursor moves."""
144 self.refresh()
146 def _on_mode_change(self, mode, value):
147 """Called when terminal mode changes."""
148 self.log.debug(f"Mode change: {mode} = {value}")
150 # Handle cursor visibility
151 if mode == 'cursor':
152 self.cursor_visible = value
154 self.refresh()
156 def _on_title_change(self, title):
157 """Called when terminal title changes."""
158 # You could emit an event here to update window title
159 self.log.debug(f"Title changed: {title}")
161 def _on_bell(self):
162 """Called when bell character is received."""
163 # You could flash the screen or play a sound
164 self.log.debug("Bell!")
166 def _on_escape_sequence(self, seq):
167 """Handle special escape sequences like cursor style."""
168 self.log.debug(f"Escape sequence: {repr(seq)}")
170 # Check for DECSCUSR (cursor style) - ESC [ Ps SP q
171 if seq.endswith(' q'):
172 try:
173 # Extract parameter
174 param = seq[2:-2].strip()
175 if param.isdigit():
176 style = int(param)
177 if style in [0, 1, 2]:
178 self.cursor_style = 1 # Block
179 elif style in [3, 4]:
180 self.cursor_style = 3 # Underline
181 elif style in [5, 6]:
182 self.cursor_style = 5 # Bar
183 self.log.debug(f"Cursor style changed to: {self.cursor_style}")
184 except:
185 pass
187 def _on_osc_sequence(self, command, text):
188 """Handle OSC sequences (for pywal colors)."""
189 self.log.debug(f"OSC: command={command}, text={text}")
191 # OSC 4 - Change color palette
192 if command == 4:
193 # Format: "index;rgb:rr/gg/bb"
194 parts = text.split(';', 1)
195 if len(parts) == 2:
196 try:
197 index = int(parts[0])
198 color = parts[1]
199 if color.startswith('rgb:'):
200 # Convert rgb:rr/gg/bb to #rrggbb
201 rgb = color[4:].split('/')
202 if len(rgb) == 3:
203 # Handle both 8-bit and 16-bit formats
204 r = rgb[0][:2]
205 g = rgb[1][:2]
206 b = rgb[2][:2]
207 hex_color = f"#{r}{g}{b}"
208 self.colors[index] = hex_color
209 self.log.debug(f"Color {index} = {hex_color}")
210 except:
211 pass
213 # OSC 10 - Change foreground color
214 elif command == 10:
215 self.log.debug(f"Foreground color: {text}")
217 # OSC 11 - Change background color
218 elif command == 11:
219 self.log.debug(f"Background color: {text}")
221 async def on_mount(self):
222 """Start the terminal when widget is mounted."""
223 await self.start_terminal()
225 async def on_unmount(self):
226 """Clean up when widget is unmounted."""
227 await self.stop_terminal()
229 async def start_terminal(self):
230 """Start the PTY and terminal process."""
231 # Create PTY
232 self.pty_master, pty_slave = pty.openpty()
234 # Get terminal size
235 cols, rows = self.size.width, self.size.height
236 if cols > 0 and rows > 0:
237 self.resize_terminal(cols, rows)
239 # Fork and exec
240 self.pty_pid = os.fork()
242 if self.pty_pid == 0: # Child process
243 os.close(self.pty_master)
245 # Make slave the controlling terminal
246 os.setsid()
247 os.dup2(pty_slave, 0) # stdin
248 os.dup2(pty_slave, 1) # stdout
249 os.dup2(pty_slave, 2) # stderr
250 os.close(pty_slave)
252 # Set environment
253 env = os.environ.copy()
254 env['TERM'] = 'xterm-256color' # Gate One supports this well
255 env['COLORTERM'] = 'truecolor'
257 # Load pywal colors if available
258 pywal_sequence_file = Path.home() / '.cache' / 'wal' / 'sequences'
259 if pywal_sequence_file.exists():
260 # This will be sent to our terminal on startup
261 env['PYWAL_SEQUENCES'] = str(pywal_sequence_file)
263 # Execute shell
264 os.execve(self.command, [self.command], env)
266 else: # Parent process
267 os.close(pty_slave)
269 # Make non-blocking
270 import fcntl
271 flags = fcntl.fcntl(self.pty_master, fcntl.F_GETFL)
272 fcntl.fcntl(self.pty_master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
274 # Start reader task
275 self._reader_task = asyncio.create_task(self._read_pty())
277 # Send pywal sequences if available
278 await self._apply_pywal_colors()
280 async def _apply_pywal_colors(self):
281 """Apply pywal color sequences if available."""
282 pywal_sequence_file = Path.home() / '.cache' / 'wal' / 'sequences'
283 if pywal_sequence_file.exists():
284 try:
285 sequences = pywal_sequence_file.read_bytes()
286 os.write(self.pty_master, sequences)
287 self.log.debug("Applied pywal color sequences")
288 except Exception as e:
289 self.log.error(f"Failed to apply pywal colors: {e}")
291 async def stop_terminal(self):
292 """Stop the terminal process."""
293 if self._reader_task:
294 self._reader_task.cancel()
296 if self.pty_master:
297 os.close(self.pty_master)
299 if self.pty_pid:
300 try:
301 os.kill(self.pty_pid, 9)
302 os.waitpid(self.pty_pid, 0)
303 except:
304 pass
306 async def _read_pty(self):
307 """Read data from PTY and feed to terminal."""
308 loop = asyncio.get_event_loop()
310 while True:
311 try:
312 # Read available data
313 data = await loop.run_in_executor(None, os.read, self.pty_master, 4096)
314 if data:
315 # Decode and feed to terminal
316 text = data.decode('utf-8', errors='replace')
317 self.terminal.write(text)
318 else:
319 break
320 except OSError:
321 await asyncio.sleep(0.01)
322 except Exception as e:
323 self.log.error(f"PTY read error: {e}")
324 break
326 def resize_terminal(self, cols: int, rows: int):
327 """Resize the terminal."""
328 self.cols = cols
329 self.rows = rows
331 # Resize Gate One terminal
332 self.terminal.resize(rows, cols)
334 # Resize PTY
335 if self.pty_master:
336 size = struct.pack('HHHH', rows, cols, 0, 0)
337 fcntl.ioctl(self.pty_master, termios.TIOCSWINSZ, size)
339 def on_resize(self, event):
340 """Handle widget resize."""
341 # Calculate character dimensions
342 cols = self.size.width
343 rows = self.size.height
345 if cols > 0 and rows > 0:
346 self.resize_terminal(cols, rows)
348 def render_line(self, y: int) -> Strip:
349 """Render a single line of the terminal."""
350 segments = []
352 # Get line from terminal
353 if y < len(self.terminal.screen):
354 line = self.terminal.screen[y]
356 for x, char in enumerate(line):
357 # Get character data
358 char_data = char.data if hasattr(char, 'data') else ' '
359 if not char_data:
360 char_data = ' '
362 # Build style from character attributes
363 style = Style()
365 # Foreground color
366 if hasattr(char, 'fg') and char.fg != 'default':
367 if isinstance(char.fg, int) and char.fg < 16:
368 style = style + Style(color=self.colors.get(char.fg, "#ffffff"))
369 else:
370 style = style + Style(color=str(char.fg))
372 # Background color
373 if hasattr(char, 'bg') and char.bg != 'default':
374 if isinstance(char.bg, int) and char.bg < 16:
375 style = style + Style(bgcolor=self.colors.get(char.bg, "#000000"))
376 else:
377 style = style + Style(bgcolor=str(char.bg))
379 # Text attributes
380 if hasattr(char, 'bold') and char.bold:
381 style = style + Style(bold=True)
382 if hasattr(char, 'italic') and char.italic:
383 style = style + Style(italic=True)
384 if hasattr(char, 'underscore') and char.underscore:
385 style = style + Style(underline=True)
386 if hasattr(char, 'strikethrough') and char.strikethrough:
387 style = style + Style(strike=True)
389 # IMPORTANT: Handle reverse video (inversion)
390 if hasattr(char, 'reverse') and char.reverse:
391 # Swap foreground and background
392 fg = style.color
393 bg = style.bgcolor
394 style = Style(
395 color=bg or "#000000",
396 bgcolor=fg or "#ffffff",
397 bold=style.bold,
398 italic=style.italic,
399 underline=style.underline,
400 strike=style.strike
401 )
403 # Handle cursor
404 is_cursor = (x == self.terminal.cursor.x and
405 y == self.terminal.cursor.y and
406 self.cursor_visible and
407 not self.terminal.cursor.hidden)
409 if is_cursor:
410 if self.cursor_style == 1: # Block cursor
411 style = style + Style(reverse=True, blink=True)
412 elif self.cursor_style == 3: # Underline cursor
413 style = style + Style(underline=True, bold=True, blink=True)
414 elif self.cursor_style == 5: # Bar cursor
415 # For bar cursor, we modify the character
416 if char_data == ' ':
417 char_data = '▎' # Left one-eighth block
418 style = style + Style(bold=True, blink=True)
420 segments.append(Segment(char_data, style))
422 return Strip(segments)
424 def on_key(self, event: events.Key) -> None:
425 """Handle keyboard input."""
426 if not self.pty_master:
427 return
429 # Map key to escape sequence
430 key_map = {
431 "up": "\x1b[A",
432 "down": "\x1b[B",
433 "right": "\x1b[C",
434 "left": "\x1b[D",
435 "home": "\x1b[H",
436 "end": "\x1b[F",
437 "pageup": "\x1b[5~",
438 "pagedown": "\x1b[6~",
439 "insert": "\x1b[2~",
440 "delete": "\x1b[3~",
441 "f1": "\x1bOP",
442 "f2": "\x1bOQ",
443 "f3": "\x1bOR",
444 "f4": "\x1bOS",
445 "f5": "\x1b[15~",
446 "f6": "\x1b[17~",
447 "f7": "\x1b[18~",
448 "f8": "\x1b[19~",
449 "f9": "\x1b[20~",
450 "f10": "\x1b[21~",
451 "f11": "\x1b[23~",
452 "f12": "\x1b[24~",
453 "escape": "\x1b",
454 "enter": "\r",
455 "tab": "\t",
456 "backspace": "\x7f",
457 }
459 # Check for special keys
460 if event.key in key_map:
461 data = key_map[event.key]
462 elif event.character:
463 # Regular character
464 data = event.character
466 # Handle Ctrl combinations
467 if event.key.startswith("ctrl+"):
468 char = event.key[5:]
469 if len(char) == 1 and char.isalpha():
470 # Ctrl+A = 1, Ctrl+B = 2, etc.
471 data = chr(ord(char.upper()) - ord('A') + 1)
472 else:
473 return
475 # Write to PTY
476 try:
477 os.write(self.pty_master, data.encode('utf-8'))
478 except:
479 pass
481 def on_paste(self, event: events.Paste) -> None:
482 """Handle paste events."""
483 if self.pty_master and event.text:
484 try:
485 # Send bracketed paste sequences
486 os.write(self.pty_master, b'\x1b[200~')
487 os.write(self.pty_master, event.text.encode('utf-8'))
488 os.write(self.pty_master, b'\x1b[201~')
489 except:
490 pass
493# Example usage with your window system
494if __name__ == "__main__":
495 from textual.app import App
497 class TerminalApp(App):
498 def compose(self):
499 yield GateOneTextualTerminal()
501 app = TerminalApp()
502 app.run()