|
from __future__ import annotations |
|
|
|
import sys |
|
|
|
assert sys.platform != "win32" |
|
|
|
import contextlib |
|
import io |
|
import termios |
|
import tty |
|
from asyncio import AbstractEventLoop, get_running_loop |
|
from typing import Callable, ContextManager, Generator, TextIO |
|
|
|
from ..key_binding import KeyPress |
|
from .base import Input |
|
from .posix_utils import PosixStdinReader |
|
from .vt100_parser import Vt100Parser |
|
|
|
__all__ = [ |
|
"Vt100Input", |
|
"raw_mode", |
|
"cooked_mode", |
|
] |
|
|
|
|
|
class Vt100Input(Input): |
|
""" |
|
Vt100 input for Posix systems. |
|
(This uses a posix file descriptor that can be registered in the event loop.) |
|
""" |
|
|
|
|
|
|
|
_fds_not_a_terminal: set[int] = set() |
|
|
|
def __init__(self, stdin: TextIO) -> None: |
|
|
|
|
|
try: |
|
|
|
stdin.fileno() |
|
except io.UnsupportedOperation as e: |
|
if "idlelib.run" in sys.modules: |
|
raise io.UnsupportedOperation( |
|
"Stdin is not a terminal. Running from Idle is not supported." |
|
) from e |
|
else: |
|
raise io.UnsupportedOperation("Stdin is not a terminal.") from e |
|
|
|
|
|
|
|
|
|
|
|
|
|
isatty = stdin.isatty() |
|
fd = stdin.fileno() |
|
|
|
if not isatty and fd not in Vt100Input._fds_not_a_terminal: |
|
msg = "Warning: Input is not a terminal (fd=%r).\n" |
|
sys.stderr.write(msg % fd) |
|
sys.stderr.flush() |
|
Vt100Input._fds_not_a_terminal.add(fd) |
|
|
|
|
|
self.stdin = stdin |
|
|
|
|
|
|
|
self._fileno = stdin.fileno() |
|
|
|
self._buffer: list[KeyPress] = [] |
|
self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding) |
|
self.vt100_parser = Vt100Parser( |
|
lambda key_press: self._buffer.append(key_press) |
|
) |
|
|
|
def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: |
|
""" |
|
Return a context manager that makes this input active in the current |
|
event loop. |
|
""" |
|
return _attached_input(self, input_ready_callback) |
|
|
|
def detach(self) -> ContextManager[None]: |
|
""" |
|
Return a context manager that makes sure that this input is not active |
|
in the current event loop. |
|
""" |
|
return _detached_input(self) |
|
|
|
def read_keys(self) -> list[KeyPress]: |
|
"Read list of KeyPress." |
|
|
|
data = self.stdin_reader.read() |
|
|
|
|
|
self.vt100_parser.feed(data) |
|
|
|
|
|
result = self._buffer |
|
self._buffer = [] |
|
return result |
|
|
|
def flush_keys(self) -> list[KeyPress]: |
|
""" |
|
Flush pending keys and return them. |
|
(Used for flushing the 'escape' key.) |
|
""" |
|
|
|
|
|
self.vt100_parser.flush() |
|
|
|
|
|
result = self._buffer |
|
self._buffer = [] |
|
return result |
|
|
|
@property |
|
def closed(self) -> bool: |
|
return self.stdin_reader.closed |
|
|
|
def raw_mode(self) -> ContextManager[None]: |
|
return raw_mode(self.stdin.fileno()) |
|
|
|
def cooked_mode(self) -> ContextManager[None]: |
|
return cooked_mode(self.stdin.fileno()) |
|
|
|
def fileno(self) -> int: |
|
return self.stdin.fileno() |
|
|
|
def typeahead_hash(self) -> str: |
|
return f"fd-{self._fileno}" |
|
|
|
|
|
_current_callbacks: dict[ |
|
tuple[AbstractEventLoop, int], Callable[[], None] | None |
|
] = {} |
|
|
|
|
|
@contextlib.contextmanager |
|
def _attached_input( |
|
input: Vt100Input, callback: Callable[[], None] |
|
) -> Generator[None, None, None]: |
|
""" |
|
Context manager that makes this input active in the current event loop. |
|
|
|
:param input: :class:`~prompt_toolkit.input.Input` object. |
|
:param callback: Called when the input is ready to read. |
|
""" |
|
loop = get_running_loop() |
|
fd = input.fileno() |
|
previous = _current_callbacks.get((loop, fd)) |
|
|
|
def callback_wrapper() -> None: |
|
"""Wrapper around the callback that already removes the reader when |
|
the input is closed. Otherwise, we keep continuously calling this |
|
callback, until we leave the context manager (which can happen a bit |
|
later). This fixes issues when piping /dev/null into a prompt_toolkit |
|
application.""" |
|
if input.closed: |
|
loop.remove_reader(fd) |
|
callback() |
|
|
|
try: |
|
loop.add_reader(fd, callback_wrapper) |
|
except PermissionError: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
raise EOFError |
|
|
|
_current_callbacks[loop, fd] = callback |
|
|
|
try: |
|
yield |
|
finally: |
|
loop.remove_reader(fd) |
|
|
|
if previous: |
|
loop.add_reader(fd, previous) |
|
_current_callbacks[loop, fd] = previous |
|
else: |
|
del _current_callbacks[loop, fd] |
|
|
|
|
|
@contextlib.contextmanager |
|
def _detached_input(input: Vt100Input) -> Generator[None, None, None]: |
|
loop = get_running_loop() |
|
fd = input.fileno() |
|
previous = _current_callbacks.get((loop, fd)) |
|
|
|
if previous: |
|
loop.remove_reader(fd) |
|
_current_callbacks[loop, fd] = None |
|
|
|
try: |
|
yield |
|
finally: |
|
if previous: |
|
loop.add_reader(fd, previous) |
|
_current_callbacks[loop, fd] = previous |
|
|
|
|
|
class raw_mode: |
|
""" |
|
:: |
|
|
|
with raw_mode(stdin): |
|
''' the pseudo-terminal stdin is now used in raw mode ''' |
|
|
|
We ignore errors when executing `tcgetattr` fails. |
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, fileno: int) -> None: |
|
self.fileno = fileno |
|
self.attrs_before: list[int | list[bytes | int]] | None |
|
try: |
|
self.attrs_before = termios.tcgetattr(fileno) |
|
except termios.error: |
|
|
|
self.attrs_before = None |
|
|
|
def __enter__(self) -> None: |
|
|
|
try: |
|
newattr = termios.tcgetattr(self.fileno) |
|
except termios.error: |
|
pass |
|
else: |
|
newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG]) |
|
newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
newattr[tty.CC][termios.VMIN] = 1 |
|
|
|
termios.tcsetattr(self.fileno, termios.TCSANOW, newattr) |
|
|
|
@classmethod |
|
def _patch_lflag(cls, attrs: int) -> int: |
|
return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) |
|
|
|
@classmethod |
|
def _patch_iflag(cls, attrs: int) -> int: |
|
return attrs & ~( |
|
|
|
|
|
|
|
termios.IXON |
|
| termios.IXOFF |
|
| |
|
|
|
termios.ICRNL |
|
| termios.INLCR |
|
| termios.IGNCR |
|
) |
|
|
|
def __exit__(self, *a: object) -> None: |
|
if self.attrs_before is not None: |
|
try: |
|
termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) |
|
except termios.error: |
|
pass |
|
|
|
|
|
|
|
|
|
|
|
class cooked_mode(raw_mode): |
|
""" |
|
The opposite of ``raw_mode``, used when we need cooked mode inside a |
|
`raw_mode` block. Used in `Application.run_in_terminal`.:: |
|
|
|
with cooked_mode(stdin): |
|
''' the pseudo-terminal stdin is now used in cooked mode. ''' |
|
""" |
|
|
|
@classmethod |
|
def _patch_lflag(cls, attrs: int) -> int: |
|
return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) |
|
|
|
@classmethod |
|
def _patch_iflag(cls, attrs: int) -> int: |
|
|
|
|
|
|
|
|
|
return attrs | termios.ICRNL |
|
|