Coverage for openhcs/textual_tui/windows/gateone-terminal-widget.py: 0.0%

244 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-14 05:57 +0000

1""" 

2Gate One Terminal Widget for Textual 

3A terminal emulator widget using Gate One's terminal.py instead of pyte 

4""" 

5 

import asyncio
import codecs
import fcntl
import logging
import os
import pty
import signal
import struct
import termios
from pathlib import Path
from typing import Optional, Tuple, List, Callable

from rich.text import Text
from rich.style import Style
from rich.console import Console
from rich.segment import Segment
from rich.color import Color as RichColor

from textual.widget import Widget
from textual.reactive import reactive
from textual.geometry import Size
from textual import events
from textual.strip import Strip

27 

# You'll need to extract terminal.py from Gate One
# Download from: https://github.com/liftoff/GateOne
try:
    from terminal import Terminal as GateOneTerminal
except ImportError as exc:
    # Re-raise with setup instructions, but keep the original failure chained
    # so the traceback still shows which import actually went wrong.
    raise ImportError(
        "Please extract terminal.py from Gate One:\n"
        "1. git clone https://github.com/liftoff/GateOne\n"
        "2. Copy GateOne/gateone/terminal.py to your project"
    ) from exc

38 

39 

class GateOneTextualTerminal(Widget):
    """A Textual widget that uses Gate One's terminal emulator.

    The widget forks a child process (the user's shell by default) on a
    pseudo-terminal, feeds everything the child prints into a Gate One
    ``Terminal`` instance, and renders that emulator's screen one line at a
    time.  Key and paste events are translated and written back to the PTY.

    NOTE(review): the members used on the Gate One object (``callbacks``,
    ``CALLBACK_*``, ``screen``, ``cursor``, ``write``, ``resize``) are taken
    on trust from the original code - confirm them against the copy of
    terminal.py that is actually installed.
    """

    DEFAULT_CSS = """
    GateOneTextualTerminal {
        width: 100%;
        height: 100%;
    }
    """

    # Key events are only delivered to the focused widget, so the terminal
    # must be focusable to receive any input at all.
    can_focus = True

    # Reactive properties
    cursor_visible = reactive(True)
    cursor_style = reactive(1)  # 1=block, 3=underline, 5=bar

    # DECSCUSR parameter -> internal cursor style (see cursor_style above).
    _CURSOR_STYLES = {0: 1, 1: 1, 2: 1, 3: 3, 4: 3, 5: 5, 6: 5}

    # Characters accepted inside an "rgb:" colour channel.
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    # Textual key name -> byte sequence sent to the child process.
    _KEY_MAP = {
        "up": "\x1b[A",
        "down": "\x1b[B",
        "right": "\x1b[C",
        "left": "\x1b[D",
        "home": "\x1b[H",
        "end": "\x1b[F",
        "pageup": "\x1b[5~",
        "pagedown": "\x1b[6~",
        "insert": "\x1b[2~",
        "delete": "\x1b[3~",
        "f1": "\x1bOP",
        "f2": "\x1bOQ",
        "f3": "\x1bOR",
        "f4": "\x1bOS",
        "f5": "\x1b[15~",
        "f6": "\x1b[17~",
        "f7": "\x1b[18~",
        "f8": "\x1b[19~",
        "f9": "\x1b[20~",
        "f10": "\x1b[21~",
        "f11": "\x1b[23~",
        "f12": "\x1b[24~",
        "escape": "\x1b",
        "enter": "\r",
        "tab": "\t",
        "backspace": "\x7f",
    }

    def __init__(
        self,
        command: Optional[str] = None,
        size: Tuple[int, int] = (80, 24),
        **kwargs
    ):
        """Create the widget and its emulator; no process is started yet.

        Args:
            command: Program to run inside the terminal.  Falls back to
                ``$SHELL`` and then ``/bin/bash``.
            size: Initial ``(cols, rows)`` of the emulated screen.
            **kwargs: Forwarded to ``Widget.__init__``.
        """
        super().__init__(**kwargs)

        # Textual widgets already expose a read-only ``log`` property, so the
        # stdlib logger has to live under a different attribute name.
        self._logger = logging.getLogger(__name__)

        # Terminal dimensions
        self.cols, self.rows = size

        # Command to run
        self.command = command or os.environ.get('SHELL', '/bin/bash')

        # Gate One terminal instance
        self.terminal = GateOneTerminal(rows=self.rows, cols=self.cols)
        self._setup_callbacks()

        # PTY master file descriptor and child pid (None until started)
        self.pty_master = None
        self.pty_pid = None

        # Async reader task
        self._reader_task = None

        # Colour palette, updated at runtime by OSC 4 sequences
        self.colors = self._default_colors()

        # For handling OSC sequences (like pywal)
        self.osc_buffer = ""

    def _default_colors(self) -> dict:
        """Return a fresh copy of the default 16-colour palette."""
        return {
            0: "#000000",   # Black
            1: "#cd0000",   # Red
            2: "#00cd00",   # Green
            3: "#cdcd00",   # Yellow
            4: "#0000ee",   # Blue
            5: "#cd00cd",   # Magenta
            6: "#00cdcd",   # Cyan
            7: "#e5e5e5",   # White
            8: "#7f7f7f",   # Bright Black
            9: "#ff0000",   # Bright Red
            10: "#00ff00",  # Bright Green
            11: "#ffff00",  # Bright Yellow
            12: "#5c5cff",  # Bright Blue
            13: "#ff00ff",  # Bright Magenta
            14: "#00ffff",  # Bright Cyan
            15: "#ffffff",  # Bright White
        }

    @staticmethod
    def _pywal_sequence_file() -> Path:
        """Return the path pywal is expected to store its sequences under."""
        return Path.home() / '.cache' / 'wal' / 'sequences'

    # ------------------------------------------------------------------
    # Gate One callbacks
    # ------------------------------------------------------------------

    def _setup_callbacks(self):
        """Register this widget's handlers on the Gate One terminal.

        NOTE(review): assumes the Terminal instance exposes the CALLBACK_*
        constants and a flat ``callbacks`` mapping - confirm.
        """
        t = self.terminal
        handlers = (
            (t.CALLBACK_CHANGED, self._on_terminal_change),   # screen content
            (t.CALLBACK_CURSOR_POS, self._on_cursor_move),    # cursor position
            (t.CALLBACK_MODE, self._on_mode_change),          # mode changes
            (t.CALLBACK_TITLE, self._on_title_change),        # title changes
            (t.CALLBACK_BELL, self._on_bell),                 # bell
            (t.CALLBACK_ESC, self._on_escape_sequence),       # special escapes
            (t.CALLBACK_OSC, self._on_osc_sequence),          # OSC (colours)
        )
        for event_id, handler in handlers:
            t.callbacks[event_id] = handler

    def _on_terminal_change(self):
        """Called when terminal content changes."""
        self.refresh()

    def _on_cursor_move(self):
        """Called when cursor moves."""
        self.refresh()

    def _on_mode_change(self, mode, value):
        """Called when terminal mode changes; tracks cursor visibility."""
        self._logger.debug("Mode change: %s = %s", mode, value)

        if mode == 'cursor':
            self.cursor_visible = value

        self.refresh()

    def _on_title_change(self, title):
        """Called when terminal title changes (currently only logged)."""
        self._logger.debug("Title changed: %s", title)

    def _on_bell(self):
        """Called when bell character is received (currently only logged)."""
        self._logger.debug("Bell!")

    def _on_escape_sequence(self, seq):
        """Handle special escape sequences like cursor style.

        Only DECSCUSR (``ESC [ Ps SP q``) is interpreted; any other sequence
        is logged and ignored.
        """
        self._logger.debug("Escape sequence: %r", seq)

        if not seq.endswith(' q'):
            return

        # Drop the two leading and two trailing characters; what remains is
        # treated as the numeric parameter.
        param = seq[2:-2].strip()
        if not param.isdigit():
            return
        try:
            requested = int(param)
        except ValueError:
            # isdigit() also accepts characters int() cannot convert.
            return

        style = self._CURSOR_STYLES.get(requested)
        if style is None:
            self._logger.debug("Ignoring unknown cursor style: %s", requested)
            return

        self.cursor_style = style
        self._logger.debug("Cursor style changed to: %s", self.cursor_style)

    def _on_osc_sequence(self, command, text):
        """Handle OSC sequences (for pywal colors).

        OSC 4 updates the palette; OSC 10 and 11 are only logged.  ``command``
        may be an int or a numeric string.
        """
        self._logger.debug("OSC: command=%s, text=%s", command, text)

        try:
            code = int(command)
        except (TypeError, ValueError):
            return

        if code == 4:
            self._apply_palette_update(text)
        elif code == 10:
            self._logger.debug("Foreground color: %s", text)
        elif code == 11:
            self._logger.debug("Background color: %s", text)

    def _apply_palette_update(self, text):
        """Apply an OSC 4 payload of ``index;rgb:rr/gg/bb`` pairs to the palette.

        Several pairs may be chained with ``;``.  Malformed pairs are skipped
        so bad input can never put an unparseable colour into the palette.
        """
        fields = text.split(';')
        for raw_index, spec in zip(fields[0::2], fields[1::2]):
            try:
                index = int(raw_index)
            except ValueError:
                continue
            if not 0 <= index <= 255:
                continue

            hex_color = self._parse_rgb_spec(spec)
            if hex_color is None:
                continue

            self.colors[index] = hex_color
            self._logger.debug("Color %s = %s", index, hex_color)

    @classmethod
    def _parse_rgb_spec(cls, spec: str) -> Optional[str]:
        """Convert ``rgb:r/g/b`` (1-4 hex digits per channel) to ``#rrggbb``.

        Returns None when ``spec`` is not in that form.
        """
        if not spec.startswith('rgb:'):
            return None

        channels = spec[4:].split('/')
        if len(channels) != 3:
            return None

        scaled = []
        for channel in channels:
            if not 1 <= len(channel) <= 4:
                return None
            if not all(digit in cls._HEX_DIGITS for digit in channel):
                return None
            # Scale from the channel's own bit depth down (or up) to 8 bits.
            full_scale = (1 << (4 * len(channel))) - 1
            scaled.append(int(channel, 16) * 255 // full_scale)

        return "#{:02x}{:02x}{:02x}".format(*scaled)

    # ------------------------------------------------------------------
    # Process / PTY lifecycle
    # ------------------------------------------------------------------

    async def on_mount(self):
        """Start the terminal when widget is mounted."""
        await self.start_terminal()

    async def on_unmount(self):
        """Clean up when widget is unmounted."""
        await self.stop_terminal()

    async def start_terminal(self):
        """Start the PTY and terminal process.

        Opens a PTY pair, forks, runs ``self.command`` in the child with the
        slave side as stdin/stdout/stderr, and starts the reader task in the
        parent.
        """
        master_fd, slave_fd = pty.openpty()
        self.pty_master = master_fd

        # Give the PTY a window size before the child starts: the widget's
        # size when it is laid out already, the configured size otherwise.
        cols, rows = self.size.width, self.size.height
        if cols > 0 and rows > 0:
            self.resize_terminal(cols, rows)
        else:
            self.resize_terminal(self.cols, self.rows)

        try:
            pid = os.fork()
        except OSError:
            os.close(master_fd)
            os.close(slave_fd)
            self.pty_master = None
            raise

        if pid == 0:
            # Child process: never returns.
            self._exec_child(master_fd, slave_fd)

        # Parent process
        self.pty_pid = pid
        os.close(slave_fd)

        # Non-blocking so the reader never stalls the event loop.
        os.set_blocking(master_fd, False)

        self._reader_task = asyncio.create_task(self._read_pty())

        await self._apply_pywal_colors()

    def _exec_child(self, master_fd: int, slave_fd: int) -> None:
        """Turn the forked child into ``self.command``; never returns.

        Whatever goes wrong, the child leaves through ``os._exit`` so it can
        never fall back into the parent's application code.
        """
        try:
            os.close(master_fd)

            # New session, then make the slave its controlling terminal.
            os.setsid()
            try:
                fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
            except (AttributeError, OSError):
                pass  # best effort: not every platform needs or has it

            for std_fd in (0, 1, 2):  # stdin, stdout, stderr
                os.dup2(slave_fd, std_fd)
            if slave_fd > 2:
                os.close(slave_fd)

            env = os.environ.copy()
            env['TERM'] = 'xterm-256color'
            env['COLORTERM'] = 'truecolor'

            # Advertise the pywal sequence file to the child when it exists.
            sequence_file = self._pywal_sequence_file()
            if sequence_file.exists():
                env['PYWAL_SEQUENCES'] = str(sequence_file)

            # execvpe behaves like execve for paths containing a slash and
            # additionally resolves bare command names through PATH.
            os.execvpe(self.command, [self.command], env)
        except BaseException as exc:
            try:
                message = f"Failed to start {self.command}: {exc}\n"
                os.write(2, message.encode('utf-8', errors='replace'))
            except OSError:
                pass
        finally:
            os._exit(127)

    async def _apply_pywal_colors(self):
        """Apply pywal color sequences if available.

        The sequences are fed to the emulator directly.  Writing them to the
        PTY master would hand them to the child as keyboard input.
        """
        sequence_file = self._pywal_sequence_file()
        if not sequence_file.exists():
            return

        try:
            sequences = sequence_file.read_bytes()
            self.terminal.write(sequences.decode('utf-8', errors='replace'))
            self._logger.debug("Applied pywal color sequences")
        except Exception as e:
            # Purely cosmetic start-up step: log and carry on.
            self._logger.error("Failed to apply pywal colors: %s", e)

    async def stop_terminal(self):
        """Stop the terminal process.

        Cancels the reader, closes the PTY master, then kills and reaps the
        child.  State is reset as it goes, so a second call is a no-op.
        """
        task, self._reader_task = self._reader_task, None
        if task is not None:
            task.cancel()
            # Let the reader unregister its fd watcher before the fd closes.
            await asyncio.gather(task, return_exceptions=True)

        master_fd, self.pty_master = self.pty_master, None
        if master_fd is not None:
            try:
                os.close(master_fd)
            except OSError as exc:
                self._logger.debug("Closing PTY failed: %s", exc)

        pid, self.pty_pid = self.pty_pid, None
        if pid:
            try:
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                pass  # already gone
            try:
                os.waitpid(pid, 0)
            except ChildProcessError:
                pass  # already reaped

    async def _read_pty(self):
        """Read data from PTY and feed to terminal.

        Waits for the event loop to report the master fd readable instead of
        polling, and stops when the child side is closed (EOF or an error
        other than "no data yet").
        """
        fd = self.pty_master
        if fd is None:
            return

        loop = asyncio.get_running_loop()
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

        readable = asyncio.Event()
        loop.add_reader(fd, readable.set)
        try:
            while True:
                await readable.wait()
                readable.clear()

                try:
                    data = os.read(fd, 4096)
                except BlockingIOError:
                    continue  # spurious wake-up, nothing to read yet
                except OSError as exc:
                    self._logger.debug("PTY closed: %s", exc)
                    break

                if not data:
                    break

                text = decoder.decode(data)
                if not text:
                    continue
                try:
                    self.terminal.write(text)
                except Exception as e:
                    # Third-party parser: log and stop rather than crash.
                    self._logger.error("PTY read error: %s", e)
                    break
        finally:
            loop.remove_reader(fd)

    def resize_terminal(self, cols: int, rows: int):
        """Resize the emulator and, when one is open, the PTY.

        Non-positive dimensions are ignored.
        """
        if cols < 1 or rows < 1:
            return

        self.cols = cols
        self.rows = rows

        # Resize Gate One terminal
        self.terminal.resize(rows, cols)

        # Resize PTY: struct winsize is (rows, cols, xpixel, ypixel)
        if self.pty_master is not None:
            winsize = struct.pack('HHHH', rows, cols, 0, 0)
            fcntl.ioctl(self.pty_master, termios.TIOCSWINSZ, winsize)

    def on_resize(self, event):
        """Handle widget resize."""
        cols = self.size.width
        rows = self.size.height

        if cols > 0 and rows > 0:
            self.resize_terminal(cols, rows)

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def render_line(self, y: int) -> Strip:
        """Render a single line of the terminal.

        Returns an empty strip for rows beyond the emulator's screen.
        """
        screen = self.terminal.screen
        if y >= len(screen):
            return Strip([])

        # Column of the cursor on this row, or None when it is not drawn here.
        cursor_col = None
        if self.cursor_visible:
            cursor = self.terminal.cursor
            if y == cursor.y and not cursor.hidden:
                cursor_col = cursor.x

        segments = []
        for x, char in enumerate(screen[y]):
            glyph = self._cell_text(char)
            style = self._cell_style(char)
            if x == cursor_col:
                glyph, style = self._apply_cursor(glyph, style)
            segments.append(Segment(glyph, style))

        return Strip(segments)

    @staticmethod
    def _cell_text(char) -> str:
        """Return the text of one screen cell, never empty.

        NOTE(review): cells may be plain strings or objects with ``.data``,
        depending on the terminal.py in use - both are accepted.
        """
        if isinstance(char, str):
            return char or ' '
        return getattr(char, 'data', None) or ' '

    def _resolve_color(self, value, fallback: str) -> Optional[str]:
        """Translate a cell colour attribute into a Rich colour string.

        Returns None for the default colour.  Indices below 16 use this
        widget's palette; other ints up to 255 use Rich's ``color(N)`` form.
        """
        if value is None or value == 'default':
            return None
        if isinstance(value, int):
            if value < 16:
                return self.colors.get(value, fallback)
            if value <= 255:
                return f"color({value})"
        return str(value)

    def _cell_style(self, char) -> Style:
        """Build the Rich style for one screen cell from its attributes."""
        fg = self._resolve_color(getattr(char, 'fg', 'default'), "#ffffff")
        bg = self._resolve_color(getattr(char, 'bg', 'default'), "#000000")

        # Reverse video: swap the colours, using black and white for the
        # unset ones.
        if getattr(char, 'reverse', False):
            fg, bg = bg or "#000000", fg or "#ffffff"

        return Style(
            color=fg,
            bgcolor=bg,
            bold=True if getattr(char, 'bold', False) else None,
            italic=True if getattr(char, 'italic', False) else None,
            underline=True if getattr(char, 'underscore', False) else None,
            strike=True if getattr(char, 'strikethrough', False) else None,
        )

    def _apply_cursor(self, glyph: str, style: Style) -> Tuple[str, Style]:
        """Return ``(glyph, style)`` decorated for the current cursor style."""
        shape = self.cursor_style
        if shape == 1:  # Block cursor
            return glyph, style + Style(reverse=True, blink=True)
        if shape == 3:  # Underline cursor
            return glyph, style + Style(underline=True, bold=True, blink=True)
        if shape == 5:  # Bar cursor
            # Only an empty cell can show the bar glyph without hiding text.
            if glyph == ' ':
                glyph = '▎'  # Left one-quarter block
            return glyph, style + Style(bold=True, blink=True)
        return glyph, style

    # ------------------------------------------------------------------
    # Input
    # ------------------------------------------------------------------

    def _write_to_pty(self, payload: bytes) -> bool:
        """Write ``payload`` to the PTY master; return True when all was sent.

        Failures are logged and swallowed: input is best-effort and must not
        crash the UI.
        """
        fd = self.pty_master
        if fd is None:
            return False

        pending = memoryview(payload)
        while pending:
            try:
                written = os.write(fd, pending)
            except BlockingIOError:
                self._logger.debug(
                    "PTY busy, dropped %d bytes of input", len(pending)
                )
                return False
            except OSError as exc:
                self._logger.debug("PTY write failed: %s", exc)
                return False
            pending = pending[written:]
        return True

    def _translate_key(self, event: events.Key) -> Optional[str]:
        """Map a key event to the text to send, or None when there is none."""
        key = event.key

        mapped = self._KEY_MAP.get(key)
        if mapped is not None:
            return mapped

        # Ctrl+A = 1, Ctrl+B = 2, etc.
        if key.startswith("ctrl+"):
            letter = key[5:].lower()
            if len(letter) == 1 and 'a' <= letter <= 'z':
                return chr(ord(letter) - ord('a') + 1)

        return event.character or None

    def on_key(self, event: events.Key) -> None:
        """Handle keyboard input."""
        if self.pty_master is None:
            return

        data = self._translate_key(event)
        if data is None:
            return

        self._write_to_pty(data.encode('utf-8'))

    def on_paste(self, event: events.Paste) -> None:
        """Handle paste events.

        NOTE(review): bracketed-paste markers are always sent, whether or not
        the child enabled that mode - confirm this is intended.
        """
        if self.pty_master is None or not event.text:
            return

        # Strip the end marker from the pasted text so it cannot terminate
        # the bracketed paste early.
        body = event.text.replace('\x1b[201~', '')
        self._write_to_pty(b'\x1b[200~' + body.encode('utf-8') + b'\x1b[201~')

489 

490 

491# Example usage with your window system 

if __name__ == "__main__":
    # Minimal demo: a Textual app whose only widget is the terminal.
    from textual.app import App

    class TerminalApp(App):
        """Demo application that shows a single Gate One terminal."""

        def compose(self):
            """Yield the terminal widget as the app's only child."""
            yield GateOneTextualTerminal()

    app = TerminalApp()
    app.run()