Coverage for openhcs/textual_tui/windows/gateone-terminal-widget.py: 0.0%

244 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 02:09 +0000

1""" 

2Gate One Terminal Widget for Textual 

3A terminal emulator widget using Gate One's terminal.py instead of pyte 

4""" 

5 

6import os 

7import pty 

8import asyncio 

9import struct 

10import logging 

11from typing import Optional, Tuple 

12from pathlib import Path 

13 

14# Unix-only imports (TUI is deprecated and not supported on Windows) 

15try: 

16 import fcntl 

17 import termios 

18 UNIX_AVAILABLE = True 

19except ImportError: 

20 UNIX_AVAILABLE = False 

21 

22from rich.style import Style 

23from rich.segment import Segment 

24 

25from textual.widget import Widget 

26from textual.reactive import reactive 

27from textual import events 

28from textual.strip import Strip 

29 

30# You'll need to extract terminal.py from Gate One 

31# Download from: https://github.com/liftoff/GateOne 

32try: 

33 from terminal import Terminal as GateOneTerminal 

34except ImportError: 

35 raise ImportError( 

36 "Please extract terminal.py from Gate One:\n" 

37 "1. git clone https://github.com/liftoff/GateOne\n" 

38 "2. Copy GateOne/gateone/terminal.py to your project" 

39 ) 

40 

41 

42class GateOneTextualTerminal(Widget): 

43 """A Textual widget that uses Gate One's terminal emulator.""" 

44 

45 DEFAULT_CSS = """ 

46 GateOneTextualTerminal { 

47 width: 100%; 

48 height: 100%; 

49 } 

50 """ 

51 

52 # Reactive properties 

53 cursor_visible = reactive(True) 

54 cursor_style = reactive(1) # 1=block, 3=underline, 5=bar 

55 

56 def __init__( 

57 self, 

58 command: Optional[str] = None, 

59 size: Tuple[int, int] = (80, 24), 

60 **kwargs 

61 ): 

62 super().__init__(**kwargs) 

63 

64 # Terminal dimensions 

65 self.cols, self.rows = size 

66 

67 # Command to run 

68 self.command = command or os.environ.get('SHELL', '/bin/bash') 

69 

70 # Gate One terminal instance 

71 self.terminal = GateOneTerminal(rows=self.rows, cols=self.cols) 

72 

73 # Set up callbacks 

74 self._setup_callbacks() 

75 

76 # PTY file descriptors 

77 self.pty_master = None 

78 self.pty_pid = None 

79 

80 # Async reader task 

81 self._reader_task = None 

82 

83 # Color palette (will be populated from terminal) 

84 self.colors = self._default_colors() 

85 

86 # For handling OSC sequences (like pywal) 

87 self.osc_buffer = "" 

88 

89 # Logger 

90 self.log = logging.getLogger(__name__) 

91 

92 def _default_colors(self) -> dict: 

93 """Default 16-color palette.""" 

94 return { 

95 0: "#000000", # Black 

96 1: "#cd0000", # Red 

97 2: "#00cd00", # Green 

98 3: "#cdcd00", # Yellow 

99 4: "#0000ee", # Blue 

100 5: "#cd00cd", # Magenta 

101 6: "#00cdcd", # Cyan 

102 7: "#e5e5e5", # White 

103 8: "#7f7f7f", # Bright Black 

104 9: "#ff0000", # Bright Red 

105 10: "#00ff00", # Bright Green 

106 11: "#ffff00", # Bright Yellow 

107 12: "#5c5cff", # Bright Blue 

108 13: "#ff00ff", # Bright Magenta 

109 14: "#00ffff", # Bright Cyan 

110 15: "#ffffff", # Bright White 

111 } 

112 

113 def _setup_callbacks(self): 

114 """Set up Gate One terminal callbacks.""" 

115 t = self.terminal 

116 

117 # Screen content changed 

118 t.callbacks[t.CALLBACK_CHANGED] = self._on_terminal_change 

119 

120 # Cursor position changed 

121 t.callbacks[t.CALLBACK_CURSOR_POS] = self._on_cursor_move 

122 

123 # Mode changes (for cursor styles, etc) 

124 t.callbacks[t.CALLBACK_MODE] = self._on_mode_change 

125 

126 # Title changes 

127 t.callbacks[t.CALLBACK_TITLE] = self._on_title_change 

128 

129 # Bell 

130 t.callbacks[t.CALLBACK_BELL] = self._on_bell 

131 

132 # For handling special escape sequences 

133 t.callbacks[t.CALLBACK_ESC] = self._on_escape_sequence 

134 

135 # OSC sequences (for colors) 

136 t.callbacks[t.CALLBACK_OSC] = self._on_osc_sequence 

137 

138 def _on_terminal_change(self): 

139 """Called when terminal content changes.""" 

140 self.refresh() 

141 

142 def _on_cursor_move(self): 

143 """Called when cursor moves.""" 

144 self.refresh() 

145 

146 def _on_mode_change(self, mode, value): 

147 """Called when terminal mode changes.""" 

148 self.log.debug(f"Mode change: {mode} = {value}") 

149 

150 # Handle cursor visibility 

151 if mode == 'cursor': 

152 self.cursor_visible = value 

153 

154 self.refresh() 

155 

156 def _on_title_change(self, title): 

157 """Called when terminal title changes.""" 

158 # You could emit an event here to update window title 

159 self.log.debug(f"Title changed: {title}") 

160 

161 def _on_bell(self): 

162 """Called when bell character is received.""" 

163 # You could flash the screen or play a sound 

164 self.log.debug("Bell!") 

165 

166 def _on_escape_sequence(self, seq): 

167 """Handle special escape sequences like cursor style.""" 

168 self.log.debug(f"Escape sequence: {repr(seq)}") 

169 

170 # Check for DECSCUSR (cursor style) - ESC [ Ps SP q 

171 if seq.endswith(' q'): 

172 try: 

173 # Extract parameter 

174 param = seq[2:-2].strip() 

175 if param.isdigit(): 

176 style = int(param) 

177 if style in [0, 1, 2]: 

178 self.cursor_style = 1 # Block 

179 elif style in [3, 4]: 

180 self.cursor_style = 3 # Underline 

181 elif style in [5, 6]: 

182 self.cursor_style = 5 # Bar 

183 self.log.debug(f"Cursor style changed to: {self.cursor_style}") 

184 except: 

185 pass 

186 

187 def _on_osc_sequence(self, command, text): 

188 """Handle OSC sequences (for pywal colors).""" 

189 self.log.debug(f"OSC: command={command}, text={text}") 

190 

191 # OSC 4 - Change color palette 

192 if command == 4: 

193 # Format: "index;rgb:rr/gg/bb" 

194 parts = text.split(';', 1) 

195 if len(parts) == 2: 

196 try: 

197 index = int(parts[0]) 

198 color = parts[1] 

199 if color.startswith('rgb:'): 

200 # Convert rgb:rr/gg/bb to #rrggbb 

201 rgb = color[4:].split('/') 

202 if len(rgb) == 3: 

203 # Handle both 8-bit and 16-bit formats 

204 r = rgb[0][:2] 

205 g = rgb[1][:2] 

206 b = rgb[2][:2] 

207 hex_color = f"#{r}{g}{b}" 

208 self.colors[index] = hex_color 

209 self.log.debug(f"Color {index} = {hex_color}") 

210 except: 

211 pass 

212 

213 # OSC 10 - Change foreground color 

214 elif command == 10: 

215 self.log.debug(f"Foreground color: {text}") 

216 

217 # OSC 11 - Change background color 

218 elif command == 11: 

219 self.log.debug(f"Background color: {text}") 

220 

221 async def on_mount(self): 

222 """Start the terminal when widget is mounted.""" 

223 await self.start_terminal() 

224 

225 async def on_unmount(self): 

226 """Clean up when widget is unmounted.""" 

227 await self.stop_terminal() 

228 

229 async def start_terminal(self): 

230 """Start the PTY and terminal process.""" 

231 # Create PTY 

232 self.pty_master, pty_slave = pty.openpty() 

233 

234 # Get terminal size 

235 cols, rows = self.size.width, self.size.height 

236 if cols > 0 and rows > 0: 

237 self.resize_terminal(cols, rows) 

238 

239 # Fork and exec 

240 self.pty_pid = os.fork() 

241 

242 if self.pty_pid == 0: # Child process 

243 os.close(self.pty_master) 

244 

245 # Make slave the controlling terminal 

246 os.setsid() 

247 os.dup2(pty_slave, 0) # stdin 

248 os.dup2(pty_slave, 1) # stdout 

249 os.dup2(pty_slave, 2) # stderr 

250 os.close(pty_slave) 

251 

252 # Set environment 

253 env = os.environ.copy() 

254 env['TERM'] = 'xterm-256color' # Gate One supports this well 

255 env['COLORTERM'] = 'truecolor' 

256 

257 # Load pywal colors if available 

258 pywal_sequence_file = Path.home() / '.cache' / 'wal' / 'sequences' 

259 if pywal_sequence_file.exists(): 

260 # This will be sent to our terminal on startup 

261 env['PYWAL_SEQUENCES'] = str(pywal_sequence_file) 

262 

263 # Execute shell 

264 os.execve(self.command, [self.command], env) 

265 

266 else: # Parent process 

267 os.close(pty_slave) 

268 

269 # Make non-blocking 

270 import fcntl 

271 flags = fcntl.fcntl(self.pty_master, fcntl.F_GETFL) 

272 fcntl.fcntl(self.pty_master, fcntl.F_SETFL, flags | os.O_NONBLOCK) 

273 

274 # Start reader task 

275 self._reader_task = asyncio.create_task(self._read_pty()) 

276 

277 # Send pywal sequences if available 

278 await self._apply_pywal_colors() 

279 

280 async def _apply_pywal_colors(self): 

281 """Apply pywal color sequences if available.""" 

282 pywal_sequence_file = Path.home() / '.cache' / 'wal' / 'sequences' 

283 if pywal_sequence_file.exists(): 

284 try: 

285 sequences = pywal_sequence_file.read_bytes() 

286 os.write(self.pty_master, sequences) 

287 self.log.debug("Applied pywal color sequences") 

288 except Exception as e: 

289 self.log.error(f"Failed to apply pywal colors: {e}") 

290 

291 async def stop_terminal(self): 

292 """Stop the terminal process.""" 

293 if self._reader_task: 

294 self._reader_task.cancel() 

295 

296 if self.pty_master: 

297 os.close(self.pty_master) 

298 

299 if self.pty_pid: 

300 try: 

301 os.kill(self.pty_pid, 9) 

302 os.waitpid(self.pty_pid, 0) 

303 except: 

304 pass 

305 

306 async def _read_pty(self): 

307 """Read data from PTY and feed to terminal.""" 

308 loop = asyncio.get_event_loop() 

309 

310 while True: 

311 try: 

312 # Read available data 

313 data = await loop.run_in_executor(None, os.read, self.pty_master, 4096) 

314 if data: 

315 # Decode and feed to terminal 

316 text = data.decode('utf-8', errors='replace') 

317 self.terminal.write(text) 

318 else: 

319 break 

320 except OSError: 

321 await asyncio.sleep(0.01) 

322 except Exception as e: 

323 self.log.error(f"PTY read error: {e}") 

324 break 

325 

326 def resize_terminal(self, cols: int, rows: int): 

327 """Resize the terminal.""" 

328 self.cols = cols 

329 self.rows = rows 

330 

331 # Resize Gate One terminal 

332 self.terminal.resize(rows, cols) 

333 

334 # Resize PTY 

335 if self.pty_master: 

336 size = struct.pack('HHHH', rows, cols, 0, 0) 

337 fcntl.ioctl(self.pty_master, termios.TIOCSWINSZ, size) 

338 

339 def on_resize(self, event): 

340 """Handle widget resize.""" 

341 # Calculate character dimensions 

342 cols = self.size.width 

343 rows = self.size.height 

344 

345 if cols > 0 and rows > 0: 

346 self.resize_terminal(cols, rows) 

347 

348 def render_line(self, y: int) -> Strip: 

349 """Render a single line of the terminal.""" 

350 segments = [] 

351 

352 # Get line from terminal 

353 if y < len(self.terminal.screen): 

354 line = self.terminal.screen[y] 

355 

356 for x, char in enumerate(line): 

357 # Get character data 

358 char_data = char.data if hasattr(char, 'data') else ' ' 

359 if not char_data: 

360 char_data = ' ' 

361 

362 # Build style from character attributes 

363 style = Style() 

364 

365 # Foreground color 

366 if hasattr(char, 'fg') and char.fg != 'default': 

367 if isinstance(char.fg, int) and char.fg < 16: 

368 style = style + Style(color=self.colors.get(char.fg, "#ffffff")) 

369 else: 

370 style = style + Style(color=str(char.fg)) 

371 

372 # Background color 

373 if hasattr(char, 'bg') and char.bg != 'default': 

374 if isinstance(char.bg, int) and char.bg < 16: 

375 style = style + Style(bgcolor=self.colors.get(char.bg, "#000000")) 

376 else: 

377 style = style + Style(bgcolor=str(char.bg)) 

378 

379 # Text attributes 

380 if hasattr(char, 'bold') and char.bold: 

381 style = style + Style(bold=True) 

382 if hasattr(char, 'italic') and char.italic: 

383 style = style + Style(italic=True) 

384 if hasattr(char, 'underscore') and char.underscore: 

385 style = style + Style(underline=True) 

386 if hasattr(char, 'strikethrough') and char.strikethrough: 

387 style = style + Style(strike=True) 

388 

389 # IMPORTANT: Handle reverse video (inversion) 

390 if hasattr(char, 'reverse') and char.reverse: 

391 # Swap foreground and background 

392 fg = style.color 

393 bg = style.bgcolor 

394 style = Style( 

395 color=bg or "#000000", 

396 bgcolor=fg or "#ffffff", 

397 bold=style.bold, 

398 italic=style.italic, 

399 underline=style.underline, 

400 strike=style.strike 

401 ) 

402 

403 # Handle cursor 

404 is_cursor = (x == self.terminal.cursor.x and 

405 y == self.terminal.cursor.y and 

406 self.cursor_visible and 

407 not self.terminal.cursor.hidden) 

408 

409 if is_cursor: 

410 if self.cursor_style == 1: # Block cursor 

411 style = style + Style(reverse=True, blink=True) 

412 elif self.cursor_style == 3: # Underline cursor  

413 style = style + Style(underline=True, bold=True, blink=True) 

414 elif self.cursor_style == 5: # Bar cursor 

415 # For bar cursor, we modify the character 

416 if char_data == ' ': 

417 char_data = '▎' # Left one-eighth block 

418 style = style + Style(bold=True, blink=True) 

419 

420 segments.append(Segment(char_data, style)) 

421 

422 return Strip(segments) 

423 

424 def on_key(self, event: events.Key) -> None: 

425 """Handle keyboard input.""" 

426 if not self.pty_master: 

427 return 

428 

429 # Map key to escape sequence 

430 key_map = { 

431 "up": "\x1b[A", 

432 "down": "\x1b[B", 

433 "right": "\x1b[C", 

434 "left": "\x1b[D", 

435 "home": "\x1b[H", 

436 "end": "\x1b[F", 

437 "pageup": "\x1b[5~", 

438 "pagedown": "\x1b[6~", 

439 "insert": "\x1b[2~", 

440 "delete": "\x1b[3~", 

441 "f1": "\x1bOP", 

442 "f2": "\x1bOQ", 

443 "f3": "\x1bOR", 

444 "f4": "\x1bOS", 

445 "f5": "\x1b[15~", 

446 "f6": "\x1b[17~", 

447 "f7": "\x1b[18~", 

448 "f8": "\x1b[19~", 

449 "f9": "\x1b[20~", 

450 "f10": "\x1b[21~", 

451 "f11": "\x1b[23~", 

452 "f12": "\x1b[24~", 

453 "escape": "\x1b", 

454 "enter": "\r", 

455 "tab": "\t", 

456 "backspace": "\x7f", 

457 } 

458 

459 # Check for special keys 

460 if event.key in key_map: 

461 data = key_map[event.key] 

462 elif event.character: 

463 # Regular character 

464 data = event.character 

465 

466 # Handle Ctrl combinations 

467 if event.key.startswith("ctrl+"): 

468 char = event.key[5:] 

469 if len(char) == 1 and char.isalpha(): 

470 # Ctrl+A = 1, Ctrl+B = 2, etc. 

471 data = chr(ord(char.upper()) - ord('A') + 1) 

472 else: 

473 return 

474 

475 # Write to PTY 

476 try: 

477 os.write(self.pty_master, data.encode('utf-8')) 

478 except: 

479 pass 

480 

481 def on_paste(self, event: events.Paste) -> None: 

482 """Handle paste events.""" 

483 if self.pty_master and event.text: 

484 try: 

485 # Send bracketed paste sequences 

486 os.write(self.pty_master, b'\x1b[200~') 

487 os.write(self.pty_master, event.text.encode('utf-8')) 

488 os.write(self.pty_master, b'\x1b[201~') 

489 except: 

490 pass 

491 

492 

493# Example usage with your window system 

494if __name__ == "__main__": 

495 from textual.app import App 

496 

497 class TerminalApp(App): 

498 def compose(self): 

499 yield GateOneTextualTerminal() 

500 

501 app = TerminalApp() 

502 app.run()