|
from __future__ import annotations |
|
|
|
import sys |
|
|
|
assert sys.platform == "win32" |
|
|
|
from contextlib import contextmanager |
|
from ctypes import windll |
|
from ctypes.wintypes import HANDLE |
|
from typing import Callable, ContextManager, Iterator |
|
|
|
from prompt_toolkit.eventloop.win32 import create_win32_event |
|
|
|
from ..key_binding import KeyPress |
|
from ..utils import DummyContext |
|
from .base import PipeInput |
|
from .vt100_parser import Vt100Parser |
|
from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input |
|
|
|
__all__ = ["Win32PipeInput"] |
|
|
|
|
|
class Win32PipeInput(_Win32InputBase, PipeInput):
    """
    This is an input pipe that works on Windows.
    Text or bytes can be fed into the pipe, and key strokes can be read from
    the pipe. This is useful if we want to send the input programmatically into
    the application. Mostly useful for unit testing.

    Notice that even though it's Windows, we use vt100 escape sequences over
    the pipe.

    Instances need a win32 event handle, so create them through the
    :meth:`create` context manager, which also closes the handle afterwards.

    Usage::

        with Win32PipeInput.create() as input:
            input.send_text('inputdata')
    """

    # Class-wide counter; every instance takes the next value as its own id.
    _id = 0

    def __init__(self, _event: HANDLE) -> None:
        """
        :param _event: Win32 event handle that is signalled by `send_text`
            and `close`. The caller owns it; `create` closes it on exit.
        """
        super().__init__()

        # Event (handle) for registering this input in the event loop.
        # Use the handle we were given: `create` closes exactly that handle,
        # so creating a second event here would leak one handle per pipe.
        self._event = _event

        self._closed = False

        # Parser for incoming keys. Parsed key presses are collected in
        # `_buffer` until `read_keys` or `flush_keys` hands them out.
        self._buffer: list[KeyPress] = []
        self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key))

        # Identifier for every PipeInput for the hash.
        self.__class__._id += 1
        self._id = self.__class__._id

    @classmethod
    @contextmanager
    def create(cls) -> Iterator[Win32PipeInput]:
        """
        Context manager that yields a new `Win32PipeInput` and closes its
        underlying win32 event handle when the block is left.
        """
        event = create_win32_event()
        try:
            yield Win32PipeInput(_event=event)
        finally:
            windll.kernel32.CloseHandle(event)

    @property
    def closed(self) -> bool:
        "True once `close` has been called."
        return self._closed

    def fileno(self) -> int:
        """
        The windows pipe doesn't depend on the file handle.
        """
        raise NotImplementedError

    @property
    def handle(self) -> HANDLE:
        "The handle used for registering this pipe in the event loop."
        return self._event

    def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
        """
        Return a context manager that makes this input active in the current
        event loop.
        """
        return attach_win32_input(self, input_ready_callback)

    def detach(self) -> ContextManager[None]:
        """
        Return a context manager that makes sure that this input is not active
        in the current event loop.
        """
        return detach_win32_input(self)

    def read_keys(self) -> list[KeyPress]:
        "Read list of KeyPress."
        # Return result, and start a fresh buffer.
        result = self._buffer
        self._buffer = []

        # Reset the event, unless the pipe is closed: `close` sets the event
        # and it is left set from then on (presumably so that anything waiting
        # on the handle keeps being woken up to observe `closed`).
        if not self._closed:
            windll.kernel32.ResetEvent(self._event)

        return result

    def flush_keys(self) -> list[KeyPress]:
        """
        Flush pending keys and return them.
        (Used for flushing the 'escape' key.)
        """
        # Flush all pending keys from the parser into the buffer.
        self.vt100_parser.flush()

        # Return result, and start a fresh buffer.
        result = self._buffer
        self._buffer = []
        return result

    def send_bytes(self, data: bytes) -> None:
        "Send bytes to the input."
        # Bytes that are not valid UTF-8 are dropped ("ignore").
        self.send_text(data.decode("utf-8", "ignore"))

    def send_text(self, text: str) -> None:
        """
        Send text to the input.

        :raises ValueError: if the pipe has been closed.
        """
        if self._closed:
            raise ValueError("Attempt to write into a closed pipe.")

        # Feed the text to the parser; parsed keys end up in `_buffer`.
        self.vt100_parser.feed(text)

        # Signal the event to announce that input is available.
        windll.kernel32.SetEvent(self._event)

    def raw_mode(self) -> ContextManager[None]:
        "Return a no-op context manager."
        return DummyContext()

    def cooked_mode(self) -> ContextManager[None]:
        "Return a no-op context manager."
        return DummyContext()

    def close(self) -> None:
        "Close write-end of the pipe."
        self._closed = True
        # Signal the event so that the closed state gets noticed.
        windll.kernel32.SetEvent(self._event)

    def typeahead_hash(self) -> str:
        """
        This needs to be unique for every `PipeInput`.
        """
        return f"pipe-input-{self._id}"
|
|