|
from __future__ import annotations |
|
|
|
import sys |
|
|
|
assert sys.platform != "win32" |
|
|
|
import os |
|
from contextlib import contextmanager |
|
from typing import ContextManager, Iterator, TextIO, cast |
|
|
|
from ..utils import DummyContext |
|
from .base import PipeInput |
|
from .vt100 import Vt100Input |
|
|
|
__all__ = [ |
|
"PosixPipeInput", |
|
] |
|
|
|
|
|
class _Pipe: |
|
"Wrapper around os.pipe, that ensures we don't double close any end." |
|
|
|
def __init__(self) -> None: |
|
self.read_fd, self.write_fd = os.pipe() |
|
self._read_closed = False |
|
self._write_closed = False |
|
|
|
def close_read(self) -> None: |
|
"Close read-end if not yet closed." |
|
if self._read_closed: |
|
return |
|
|
|
os.close(self.read_fd) |
|
self._read_closed = True |
|
|
|
def close_write(self) -> None: |
|
"Close write-end if not yet closed." |
|
if self._write_closed: |
|
return |
|
|
|
os.close(self.write_fd) |
|
self._write_closed = True |
|
|
|
def close(self) -> None: |
|
"Close both read and write ends." |
|
self.close_read() |
|
self.close_write() |
|
|
|
|
|
class PosixPipeInput(Vt100Input, PipeInput): |
|
""" |
|
Input that is send through a pipe. |
|
This is useful if we want to send the input programmatically into the |
|
application. Mostly useful for unit testing. |
|
|
|
Usage:: |
|
|
|
with PosixPipeInput.create() as input: |
|
input.send_text('inputdata') |
|
""" |
|
|
|
_id = 0 |
|
|
|
def __init__(self, _pipe: _Pipe, _text: str = "") -> None: |
|
|
|
self.pipe = _pipe |
|
|
|
class Stdin: |
|
encoding = "utf-8" |
|
|
|
def isatty(stdin) -> bool: |
|
return True |
|
|
|
def fileno(stdin) -> int: |
|
return self.pipe.read_fd |
|
|
|
super().__init__(cast(TextIO, Stdin())) |
|
self.send_text(_text) |
|
|
|
|
|
self.__class__._id += 1 |
|
self._id = self.__class__._id |
|
|
|
@classmethod |
|
@contextmanager |
|
def create(cls, text: str = "") -> Iterator[PosixPipeInput]: |
|
pipe = _Pipe() |
|
try: |
|
yield PosixPipeInput(_pipe=pipe, _text=text) |
|
finally: |
|
pipe.close() |
|
|
|
def send_bytes(self, data: bytes) -> None: |
|
os.write(self.pipe.write_fd, data) |
|
|
|
def send_text(self, data: str) -> None: |
|
"Send text to the input." |
|
os.write(self.pipe.write_fd, data.encode("utf-8")) |
|
|
|
def raw_mode(self) -> ContextManager[None]: |
|
return DummyContext() |
|
|
|
def cooked_mode(self) -> ContextManager[None]: |
|
return DummyContext() |
|
|
|
def close(self) -> None: |
|
"Close pipe fds." |
|
|
|
|
|
|
|
|
|
self.pipe.close_write() |
|
|
|
def typeahead_hash(self) -> str: |
|
""" |
|
This needs to be unique for every `PipeInput`. |
|
""" |
|
return f"pipe-input-{self._id}" |
|
|