|
from __future__ import annotations |
|
|
|
import asyncio |
|
import contextvars |
|
import os |
|
import re |
|
import signal |
|
import sys |
|
import threading |
|
import time |
|
from asyncio import ( |
|
AbstractEventLoop, |
|
Future, |
|
Task, |
|
ensure_future, |
|
get_running_loop, |
|
sleep, |
|
) |
|
from contextlib import ExitStack, contextmanager |
|
from subprocess import Popen |
|
from traceback import format_tb |
|
from typing import ( |
|
Any, |
|
Callable, |
|
Coroutine, |
|
Generator, |
|
Generic, |
|
Hashable, |
|
Iterable, |
|
Iterator, |
|
TypeVar, |
|
cast, |
|
overload, |
|
) |
|
|
|
from prompt_toolkit.buffer import Buffer |
|
from prompt_toolkit.cache import SimpleCache |
|
from prompt_toolkit.clipboard import Clipboard, InMemoryClipboard |
|
from prompt_toolkit.cursor_shapes import AnyCursorShapeConfig, to_cursor_shape_config |
|
from prompt_toolkit.data_structures import Size |
|
from prompt_toolkit.enums import EditingMode |
|
from prompt_toolkit.eventloop import ( |
|
InputHook, |
|
get_traceback_from_context, |
|
new_eventloop_with_inputhook, |
|
run_in_executor_with_context, |
|
) |
|
from prompt_toolkit.eventloop.utils import call_soon_threadsafe |
|
from prompt_toolkit.filters import Condition, Filter, FilterOrBool, to_filter |
|
from prompt_toolkit.formatted_text import AnyFormattedText |
|
from prompt_toolkit.input.base import Input |
|
from prompt_toolkit.input.typeahead import get_typeahead, store_typeahead |
|
from prompt_toolkit.key_binding.bindings.page_navigation import ( |
|
load_page_navigation_bindings, |
|
) |
|
from prompt_toolkit.key_binding.defaults import load_key_bindings |
|
from prompt_toolkit.key_binding.emacs_state import EmacsState |
|
from prompt_toolkit.key_binding.key_bindings import ( |
|
Binding, |
|
ConditionalKeyBindings, |
|
GlobalOnlyKeyBindings, |
|
KeyBindings, |
|
KeyBindingsBase, |
|
KeysTuple, |
|
merge_key_bindings, |
|
) |
|
from prompt_toolkit.key_binding.key_processor import KeyPressEvent, KeyProcessor |
|
from prompt_toolkit.key_binding.vi_state import ViState |
|
from prompt_toolkit.keys import Keys |
|
from prompt_toolkit.layout.containers import Container, Window |
|
from prompt_toolkit.layout.controls import BufferControl, UIControl |
|
from prompt_toolkit.layout.dummy import create_dummy_layout |
|
from prompt_toolkit.layout.layout import Layout, walk |
|
from prompt_toolkit.output import ColorDepth, Output |
|
from prompt_toolkit.renderer import Renderer, print_formatted_text |
|
from prompt_toolkit.search import SearchState |
|
from prompt_toolkit.styles import ( |
|
BaseStyle, |
|
DummyStyle, |
|
DummyStyleTransformation, |
|
DynamicStyle, |
|
StyleTransformation, |
|
default_pygments_style, |
|
default_ui_style, |
|
merge_styles, |
|
) |
|
from prompt_toolkit.utils import Event, in_main_thread |
|
|
|
from .current import get_app_session, set_app |
|
from .run_in_terminal import in_terminal, run_in_terminal |
|
|
|
# Public API of this module.
__all__ = [
    "Application",
]


# Short alias for the key press event type.
E = KeyPressEvent

# Type of the value the application finishes with, i.e. what `run()` and
# `run_async()` return.
_AppResult = TypeVar("_AppResult")

# Signature of the application event callbacks: they receive the
# `Application` instance and return nothing.
ApplicationEventHandler = Callable[["Application[_AppResult]"], None]

# Signal numbers, or `None` on platforms where the `signal` module does not
# define them.
_SIGWINCH = getattr(signal, "SIGWINCH", None)
_SIGTSTP = getattr(signal, "SIGTSTP", None)
|
|
|
|
|
class Application(Generic[_AppResult]): |
|
""" |
|
The main Application class! |
|
This glues everything together. |
|
|
|
:param layout: A :class:`~prompt_toolkit.layout.Layout` instance. |
|
:param key_bindings: |
|
:class:`~prompt_toolkit.key_binding.KeyBindingsBase` instance for |
|
the key bindings. |
|
:param clipboard: :class:`~prompt_toolkit.clipboard.Clipboard` to use. |
|
:param full_screen: When True, run the application on the alternate screen buffer. |
|
:param color_depth: Any :class:`~.ColorDepth` value, a callable that |
|
returns a :class:`~.ColorDepth` or `None` for default. |
|
:param erase_when_done: (bool) Clear the application output when it finishes. |
|
:param reverse_vi_search_direction: Normally, in Vi mode, a '/' searches |
|
forward and a '?' searches backward. In Readline mode, this is usually |
|
reversed. |
|
:param min_redraw_interval: Number of seconds to wait between redraws. Use |
|
this for applications where `invalidate` is called a lot. This could cause |
|
a lot of terminal output, which some terminals are not able to process. |
|
|
|
`None` means that every `invalidate` will be scheduled right away |
|
(which is usually fine). |
|
|
|
When one `invalidate` is called, but a scheduled redraw of a previous |
|
`invalidate` call has not been executed yet, nothing will happen in any |
|
case. |
|
|
|
:param max_render_postpone_time: When there is high CPU (a lot of other |
|
scheduled calls), postpone the rendering max x seconds. '0' means: |
|
don't postpone. '.5' means: try to draw at least twice a second. |
|
|
|
:param refresh_interval: Automatically invalidate the UI every so many |
|
seconds. When `None` (the default), only invalidate when `invalidate` |
|
has been called. |
|
|
|
:param terminal_size_polling_interval: Poll the terminal size every so many
seconds. Useful if the application runs in a thread other than the
main thread, where SIGWINCH can't be handled, or on Windows.
|
|
|
Filters: |
|
|
|
:param mouse_support: (:class:`~prompt_toolkit.filters.Filter` or |
|
boolean). When True, enable mouse support. |
|
:param paste_mode: :class:`~prompt_toolkit.filters.Filter` or boolean. |
|
:param editing_mode: :class:`~prompt_toolkit.enums.EditingMode`. |
|
|
|
:param enable_page_navigation_bindings: When `True`, enable the page |
|
navigation key bindings. These include both Emacs and Vi bindings like |
|
page-up, page-down and so on to scroll through pages. Mostly useful for |
|
creating an editor or other full screen applications. Probably, you |
|
don't want this for the implementation of a REPL. By default, this is |
|
enabled if `full_screen` is set. |
|
|
|
Callbacks (all of these should accept an |
|
:class:`~prompt_toolkit.application.Application` object as input.) |
|
|
|
:param on_reset: Called during reset. |
|
:param on_invalidate: Called when the UI has been invalidated. |
|
:param before_render: Called right before rendering. |
|
:param after_render: Called right after rendering. |
|
|
|
I/O: |
|
(Note that the preferred way to change the input/output is by creating an |
|
`AppSession` with the required input/output objects. If you need multiple |
|
applications running at the same time, you have to create a separate |
|
`AppSession` using a `with create_app_session():` block. |
|
|
|
:param input: :class:`~prompt_toolkit.input.Input` instance. |
|
:param output: :class:`~prompt_toolkit.output.Output` instance. (Probably |
|
Vt100_Output or Win32Output.) |
|
|
|
Usage: |
|
|
|
app = Application(...) |
|
app.run() |
|
|
|
# Or |
|
await app.run_async() |
|
""" |
|
|
|
def __init__(
    self,
    layout: Layout | None = None,
    style: BaseStyle | None = None,
    include_default_pygments_style: FilterOrBool = True,
    style_transformation: StyleTransformation | None = None,
    key_bindings: KeyBindingsBase | None = None,
    clipboard: Clipboard | None = None,
    full_screen: bool = False,
    color_depth: (ColorDepth | Callable[[], ColorDepth | None] | None) = None,
    mouse_support: FilterOrBool = False,
    enable_page_navigation_bindings: None
    | (FilterOrBool) = None,  # `None` means: follow `full_screen` (see below).
    paste_mode: FilterOrBool = False,
    editing_mode: EditingMode = EditingMode.EMACS,
    erase_when_done: bool = False,
    reverse_vi_search_direction: FilterOrBool = False,
    min_redraw_interval: float | int | None = None,
    max_render_postpone_time: float | int | None = 0.01,
    refresh_interval: float | None = None,
    terminal_size_polling_interval: float | None = 0.5,
    cursor: AnyCursorShapeConfig = None,
    on_reset: ApplicationEventHandler[_AppResult] | None = None,
    on_invalidate: ApplicationEventHandler[_AppResult] | None = None,
    before_render: ApplicationEventHandler[_AppResult] | None = None,
    after_render: ApplicationEventHandler[_AppResult] | None = None,
    # I/O.
    input: Input | None = None,
    output: Output | None = None,
) -> None:
    """
    Store the configuration, create the renderer and the key processor, and
    finish with a call to :meth:`reset`.

    See the class docstring for the meaning of the individual parameters.
    """
    # When `enable_page_navigation_bindings` is not given, it becomes a
    # filter that follows the `full_screen` attribute dynamically.
    if enable_page_navigation_bindings is None:
        enable_page_navigation_bindings = Condition(lambda: self.full_screen)

    # Normalize everything that accepts "filter or bool" into a filter.
    paste_mode = to_filter(paste_mode)
    mouse_support = to_filter(mouse_support)
    reverse_vi_search_direction = to_filter(reverse_vi_search_direction)
    enable_page_navigation_bindings = to_filter(enable_page_navigation_bindings)
    include_default_pygments_style = to_filter(include_default_pygments_style)

    # Fall back to placeholder objects so that the rest of the class never
    # has to deal with a missing layout or style transformation.
    if layout is None:
        layout = create_dummy_layout()

    if style_transformation is None:
        style_transformation = DummyStyleTransformation()

    self.style = style
    self.style_transformation = style_transformation

    # Key bindings: the user supplied ones plus the two built-in sets.
    self.key_bindings = key_bindings
    self._default_bindings = load_key_bindings()
    self._page_navigation_bindings = load_page_navigation_bindings()

    self.layout = layout
    self.clipboard = clipboard or InMemoryClipboard()
    self.full_screen: bool = full_screen
    # Raw setting; resolved lazily by the `color_depth` property.
    self._color_depth = color_depth
    self.mouse_support = mouse_support

    self.paste_mode = paste_mode
    self.editing_mode = editing_mode
    self.erase_when_done = erase_when_done
    self.reverse_vi_search_direction = reverse_vi_search_direction
    self.enable_page_navigation_bindings = enable_page_navigation_bindings
    self.min_redraw_interval = min_redraw_interval
    self.max_render_postpone_time = max_render_postpone_time
    self.refresh_interval = refresh_interval
    self.terminal_size_polling_interval = terminal_size_polling_interval

    self.cursor = to_cursor_shape_config(cursor)

    # Events. Each one is created with this application as its sender and
    # the optional user callback as initial handler.
    self.on_invalidate = Event(self, on_invalidate)
    self.on_reset = Event(self, on_reset)
    self.before_render = Event(self, before_render)
    self.after_render = Event(self, after_render)

    # I/O. Explicit arguments win; otherwise use the current app session.
    session = get_app_session()
    self.output = output or session.output
    self.input = input or session.input

    # Extra callables to execute once at the start of the next run. They are
    # called and then removed in `_pre_run`.
    self.pre_run_callables: list[Callable[[], None]] = []

    # Run state. `future`, `loop`, `_loop_thread` and `context` are only set
    # while `run_async` is executing.
    self._is_running = False
    self.future: Future[_AppResult] | None = None
    self.loop: AbstractEventLoop | None = None
    self._loop_thread: threading.Thread | None = None
    self.context: contextvars.Context | None = None

    # Quoted insert flag.
    # NOTE(review): presumably toggled by the key bindings; not used in this
    # part of the file.
    self.quoted_insert = False

    # Per editing mode state, reset in `reset()`.
    self.vi_state = ViState()
    self.emacs_state = EmacsState()

    # Number of seconds `run_async` waits after receiving input before it
    # flushes the pending keys from the input object.
    self.ttimeoutlen = 0.5

    # NOTE(review): presumably the counterpart of Vim's `timeoutlen` (how long
    # to wait for the rest of a multi-key binding), in seconds. It is not
    # consumed in this part of the file; confirm against the key processor.
    self.timeoutlen = 1.0

    # One merged stylesheet, shared with the renderer below.
    self._merged_style = self._create_merged_style(include_default_pygments_style)

    self.renderer = Renderer(
        self._merged_style,
        self.output,
        full_screen=full_screen,
        mouse_support=mouse_support,
        cpr_not_supported_callback=self.cpr_not_supported_callback,
    )

    # Incremented by `_redraw` every time the UI is rendered.
    self.render_counter = 0

    # `True` while a redraw has been scheduled but not executed yet.
    self._invalidated = False
    # The invalidate events of the UI controls that we are subscribed to.
    self._invalidate_events: list[
        Event[object]
    ] = []
    # `time.time()` of the last redraw; only maintained when
    # `min_redraw_interval` is set.
    self._last_redraw_time = 0.0

    # The key processor, fed with the keys read from the input.
    # (`_CombinedRegistry` is defined elsewhere in this file.)
    self.key_processor = KeyProcessor(_CombinedRegistry(self))

    # State for "run in terminal". While `_running_in_terminal` is true,
    # `_redraw` does not render. `run_async` awaits `_running_in_terminal_f`
    # during teardown when it is set.
    self._running_in_terminal = False
    self._running_in_terminal_f: Future[None] | None = None

    # Bring everything into its initial state (this also fires `on_reset`).
    self.reset()
|
|
|
def _create_merged_style(self, include_default_pygments_style: Filter) -> BaseStyle:
    """
    Build the stylesheet used for rendering.

    Three layers are merged, in this order: the default UI style, the
    default Pygments style (only while `include_default_pygments_style`
    evaluates to true), and the user style in `self.style`. The last two
    are looked up dynamically, so later changes are picked up.
    """
    fallback = DummyStyle()
    pygments = default_pygments_style()

    def pick_pygments_style() -> BaseStyle:
        # Evaluated on every lookup, so the filter may change over time.
        return pygments if include_default_pygments_style() else fallback

    optional_pygments_layer = DynamicStyle(pick_pygments_style)

    layers = [
        default_ui_style(),
        optional_pygments_layer,
        DynamicStyle(lambda: self.style),
    ]
    return merge_styles(layers)
|
|
|
@property
def color_depth(self) -> ColorDepth:
    """
    The :class:`.ColorDepth` that is currently in effect.

    Resolution order:

    - The color depth passed to this application, if any. When a callable
      was passed, it is called and its return value is used.
    - When that yields `None`, the default color depth reported by the
      :class:`.Output` implementation. (For outputs created through
      `output.defaults.create_output`, that default comes from the
      $PROMPT_TOOLKIT_COLOR_DEPTH environment variable.)
    """
    configured = self._color_depth
    resolved = configured() if callable(configured) else configured

    if resolved is not None:
        return resolved
    return self.output.get_default_color_depth()
|
|
|
@property
def current_buffer(self) -> Buffer:
    """
    The :class:`~.Buffer` that currently has the focus.

    When no real buffer is focused, a fresh dummy :class:`.Buffer` is
    returned instead of `None`, so that callers don't have to check for
    `None` or catch exceptions all the time.
    """
    focused = self.layout.current_buffer
    if focused:
        return focused
    return Buffer(name="dummy-buffer")
|
|
|
@property
def current_search_state(self) -> SearchState:
    """
    The :class:`.SearchState` of the focused :class:`.BufferControl`.

    When the focused control is not a `BufferControl`, a new, empty
    `SearchState` is returned.
    """
    focused_control = self.layout.current_control
    if not isinstance(focused_control, BufferControl):
        return SearchState()
    return focused_control.search_state
|
|
|
def reset(self) -> None:
    """
    Bring the application back into its initial state, ready for reading
    the next input.

    This clears the exit style and the set of background tasks, resets the
    renderer, key processor, layout and editing mode states, fires
    `on_reset`, and finally makes sure a focusable window has the focus.
    """
    self.exit_style = ""

    self._background_tasks: set[Task[None]] = set()

    # Reset the components in a fixed order, then notify listeners.
    self.renderer.reset()
    self.key_processor.reset()
    self.layout.reset()
    self.vi_state.reset()
    self.emacs_state.reset()

    self.on_reset.fire()

    # If the currently focused control can't take the focus, move the focus
    # to the first window whose content is focusable.
    layout = self.layout
    if layout.current_control.is_focusable():
        return

    for window in layout.find_all_windows():
        if window.content.is_focusable():
            layout.current_window = window
            return
|
|
|
def invalidate(self) -> None:
    """
    Thread safe way of sending a repaint trigger to the input event loop.

    Nothing happens when the application is not running, when there is no
    usable event loop, or when a previously requested redraw is still
    pending.
    """
    # Not running: there is nothing to repaint.
    if not self._is_running:
        return

    # No loop yet, or the loop was closed already.
    if self.loop is None or self.loop.is_closed():
        return

    # Never queue a second redraw while an earlier one has not run yet.
    if self._invalidated:
        return
    self._invalidated = True

    # Notify listeners from within the event loop.
    self.loop.call_soon_threadsafe(self.on_invalidate.fire)

    def perform_redraw() -> None:
        self._invalidated = False
        self._redraw()

    def enqueue_redraw() -> None:
        call_soon_threadsafe(
            perform_redraw,
            max_postpone_time=self.max_render_postpone_time,
            loop=self.loop,
        )

    if not self.min_redraw_interval:
        enqueue_redraw()
        return

    # A minimum interval is configured: redraw right away when the last
    # redraw is long enough ago, otherwise sleep for the remaining time.
    elapsed = time.time() - self._last_redraw_time
    if elapsed < self.min_redraw_interval:

        async def delayed_redraw() -> None:
            await sleep(cast(float, self.min_redraw_interval) - elapsed)
            enqueue_redraw()

        self.loop.call_soon_threadsafe(
            lambda: self.create_background_task(delayed_redraw())
        )
    else:
        enqueue_redraw()
|
|
|
@property
def invalidated(self) -> bool:
    """
    True when a redraw operation has been scheduled.

    The flag is set by :meth:`invalidate` and cleared again right before
    the scheduled redraw is executed.
    """
    return self._invalidated
|
|
|
def _redraw(self, render_as_done: bool = False) -> None: |
|
""" |
|
Render the command line again. (Not thread safe!) (From other threads, |
|
or if unsure, use :meth:`.Application.invalidate`.) |
|
|
|
:param render_as_done: make sure to put the cursor after the UI. |
|
""" |
|
|
|
def run_in_context() -> None: |
|
|
|
if self._is_running and not self._running_in_terminal: |
|
if self.min_redraw_interval: |
|
self._last_redraw_time = time.time() |
|
|
|
|
|
self.render_counter += 1 |
|
self.before_render.fire() |
|
|
|
if render_as_done: |
|
if self.erase_when_done: |
|
self.renderer.erase() |
|
else: |
|
|
|
self.renderer.render(self, self.layout, is_done=render_as_done) |
|
else: |
|
self.renderer.render(self, self.layout) |
|
|
|
self.layout.update_parents_relations() |
|
|
|
|
|
self.after_render.fire() |
|
|
|
self._update_invalidate_events() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.context is not None: |
|
self.context.copy().run(run_in_context) |
|
|
|
def _start_auto_refresh_task(self) -> None: |
|
""" |
|
Start a while/true loop in the background for automatic invalidation of |
|
the UI. |
|
""" |
|
if self.refresh_interval is not None and self.refresh_interval != 0: |
|
|
|
async def auto_refresh(refresh_interval: float) -> None: |
|
while True: |
|
await sleep(refresh_interval) |
|
self.invalidate() |
|
|
|
self.create_background_task(auto_refresh(self.refresh_interval)) |
|
|
|
def _update_invalidate_events(self) -> None: |
|
""" |
|
Make sure to attach 'invalidate' handlers to all invalidate events in |
|
the UI. |
|
""" |
|
|
|
|
|
for ev in self._invalidate_events: |
|
ev -= self._invalidate_handler |
|
|
|
|
|
|
|
def gather_events() -> Iterable[Event[object]]: |
|
for c in self.layout.find_all_controls(): |
|
yield from c.get_invalidate_events() |
|
|
|
self._invalidate_events = list(gather_events()) |
|
|
|
for ev in self._invalidate_events: |
|
ev += self._invalidate_handler |
|
|
|
def _invalidate_handler(self, sender: object) -> None:
    """
    Handler for invalidate events coming from UIControls.

    (This bridges the difference in signature between an event handler and
    `self.invalidate`. It also needs to be a method, not a nested
    function, so that we can remove it from the events again.)

    :param sender: The object that fired the event. It is ignored.
    """
    self.invalidate()
|
|
|
def _on_resize(self) -> None:
    """
    When the window size changes, we erase the current output and request
    again the cursor position. When the CPR answer arrives, the output is
    drawn again.
    """
    # Erase, request the cursor position and redraw.
    # NOTE(review): the order of these three calls is presumably
    # significant; keep it as is. `_request_absolute_cursor_position` is
    # defined outside this part of the file.
    self.renderer.erase(leave_alternate_screen=False)
    self._request_absolute_cursor_position()
    self._redraw()
|
|
|
def _pre_run(self, pre_run: Callable[[], None] | None = None) -> None: |
|
""" |
|
Called during `run`. |
|
|
|
`self.future` should be set to the new future at the point where this |
|
is called in order to avoid data races. `pre_run` can be used to set a |
|
`threading.Event` to synchronize with UI termination code, running in |
|
another thread that would call `Application.exit`. (See the progress |
|
bar code for an example.) |
|
""" |
|
if pre_run: |
|
pre_run() |
|
|
|
|
|
for c in self.pre_run_callables: |
|
c() |
|
del self.pre_run_callables[:] |
|
|
|
async def run_async(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    slow_callback_duration: float = 0.5,
) -> _AppResult:
    """
    Run the prompt_toolkit :class:`~prompt_toolkit.application.Application`
    until :meth:`~prompt_toolkit.application.Application.exit` has been
    called. Return the value that was passed to
    :meth:`~prompt_toolkit.application.Application.exit`.

    This is the main entry point for a prompt_toolkit
    :class:`~prompt_toolkit.application.Application` and usually the only
    place where the event loop is actually running.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param handle_sigint: Handle SIGINT signal if possible. This will call
        the `<sigint>` key binding when a SIGINT is received. (This only
        works in the main thread.)
    :param slow_callback_duration: Display warnings if code scheduled in
        the asyncio event loop takes more time than this. The asyncio
        default of `0.1` is sometimes not sufficient on a slow system,
        because exceptionally, the drawing of the app, which happens in the
        event loop, can take a bit longer from time to time.
    """
    assert not self._is_running, "Application is already running."

    if not in_main_thread() or sys.platform == "win32":
        # The SIGINT handler is not installed outside the main thread, and
        # not on Windows.
        handle_sigint = False

    async def _run_async(f: asyncio.Future[_AppResult]) -> _AppResult:
        # Capture the current context. Input callbacks and redraws are
        # executed in copies of it.
        context = contextvars.copy_context()
        self.context = context

        # The pending "flush input" timer, if any. It is cancelled and
        # restarted every time new input arrives.
        flush_task: asyncio.Task[None] | None = None

        # Reset, then run the start-up hooks.
        self.reset()
        self._pre_run(pre_run)

        # Feed the stored typeahead for this input first.
        self.key_processor.feed_multiple(get_typeahead(self.input))
        self.key_processor.process_keys()

        def read_from_input() -> None:
            nonlocal flush_task

            # Ignore input when we are not running anymore, unless the
            # renderer is still waiting for a CPR response.
            if not self._is_running and not self.renderer.waiting_for_cpr:
                return

            # Get keys from the input object.
            keys = self.input.read_keys()

            # Feed them to the key processor.
            self.key_processor.feed_multiple(keys)
            self.key_processor.process_keys()

            # Finish with `EOFError` when the input stream was closed.
            if self.input.closed:
                if not f.done():
                    f.set_exception(EOFError)
            else:
                # (Re)start the timer that flushes the input.
                if flush_task:
                    flush_task.cancel()
                flush_task = self.create_background_task(auto_flush_input())

        def read_from_input_in_context() -> None:
            # Run the input callback in a copy of the captured context.
            context.copy().run(read_from_input)

        async def auto_flush_input() -> None:
            # Flush the input after `ttimeoutlen` seconds, unless this task
            # gets cancelled because more input arrived in the meantime.
            await sleep(self.ttimeoutlen)
            flush_input()

        def flush_input() -> None:
            if not self.is_done:
                # Get the flushed keys and feed them to the key processor.
                keys = self.input.flush_keys()
                self.key_processor.feed_multiple(keys)
                self.key_processor.process_keys()

                # NOTE(review): unlike `read_from_input`, there is no
                # `f.done()` guard here. `set_exception` raises
                # `InvalidStateError` on a future that is already done;
                # confirm that the `is_done` check above rules that out.
                if self.input.closed:
                    f.set_exception(EOFError)

        # Enter raw mode, attach the input callback and the resize handler.
        # (`attach_winch_signal_handler` is defined elsewhere in this file.)
        with self.input.raw_mode(), self.input.attach(
            read_from_input_in_context
        ), attach_winch_signal_handler(self._on_resize):
            # Draw the UI.
            self._request_absolute_cursor_position()
            self._redraw()
            self._start_auto_refresh_task()

            self.create_background_task(self._poll_output_size())

            # Wait for the UI to finish.
            try:
                result = await f
            finally:
                # In any case when the application finishes, successfully
                # or with an error: render one last time in 'done' state.
                try:
                    self._redraw(render_as_done=True)
                finally:
                    # Reset the renderer even when that last redraw failed.
                    self.renderer.reset()

                    # From here on, draws that are still scheduled must not
                    # paint anymore (`_redraw` checks this flag).
                    self._is_running = False

                    # Detach from the invalidate events of the UI controls,
                    # so that they can't trigger a repaint of a terminated
                    # application.
                    for ev in self._invalidate_events:
                        ev -= self._invalidate_handler
                    self._invalidate_events = []

                    # Wait for outstanding CPR responses.
                    if self.output.responds_to_cpr:
                        await self.renderer.wait_for_cpr_responses()

                    # Wait for a pending run-in-terminal to finish.
                    previous_run_in_terminal_f = self._running_in_terminal_f

                    if previous_run_in_terminal_f:
                        await previous_run_in_terminal_f

                    # Keep unprocessed input as typeahead for next time.
                    store_typeahead(self.input, self.key_processor.empty_queue())

            return result

    @contextmanager
    def set_loop() -> Iterator[AbstractEventLoop]:
        # Remember the running loop and its thread for the duration of the
        # run.
        loop = get_running_loop()
        self.loop = loop
        self._loop_thread = threading.current_thread()

        try:
            yield loop
        finally:
            self.loop = None
            self._loop_thread = None

    @contextmanager
    def set_is_running() -> Iterator[None]:
        self._is_running = True
        try:
            yield
        finally:
            self._is_running = False

    @contextmanager
    def set_handle_sigint(loop: AbstractEventLoop) -> Iterator[None]:
        if handle_sigint:
            # (`_restore_sigint_from_ctypes` is defined elsewhere in this
            # file.)
            with _restore_sigint_from_ctypes():
                # Translate SIGINT into a call to the key processor's
                # `send_sigint`, scheduled on the loop.
                loop.add_signal_handler(
                    signal.SIGINT,
                    lambda *_: loop.call_soon_threadsafe(
                        self.key_processor.send_sigint
                    ),
                )
                try:
                    yield
                finally:
                    loop.remove_signal_handler(signal.SIGINT)
        else:
            yield

    @contextmanager
    def set_exception_handler_ctx(loop: AbstractEventLoop) -> Iterator[None]:
        if set_exception_handler:
            # Install our handler, and put the previous one back afterwards.
            previous_exc_handler = loop.get_exception_handler()
            loop.set_exception_handler(self._handle_exception)
            try:
                yield
            finally:
                loop.set_exception_handler(previous_exc_handler)

        else:
            yield

    @contextmanager
    def set_callback_duration(loop: AbstractEventLoop) -> Iterator[None]:
        # Use the requested slow callback threshold during the run.
        original_slow_callback_duration = loop.slow_callback_duration
        loop.slow_callback_duration = slow_callback_duration
        try:
            yield
        finally:
            # Restore the original value.
            loop.slow_callback_duration = original_slow_callback_duration

    @contextmanager
    def create_future(
        loop: AbstractEventLoop,
    ) -> Iterator[asyncio.Future[_AppResult]]:
        # The future that represents the outcome of this run.
        f = loop.create_future()
        self.future = f

        try:
            yield f
        finally:
            # The run is over; drop the reference again.
            self.future = None

    with ExitStack() as stack:
        stack.enter_context(set_is_running())

        # Start without a pending redraw. Otherwise a stale flag would make
        # `invalidate` ignore every request.
        self._invalidated = False

        loop = stack.enter_context(set_loop())

        stack.enter_context(set_handle_sigint(loop))
        stack.enter_context(set_exception_handler_ctx(loop))
        stack.enter_context(set_callback_duration(loop))
        stack.enter_context(set_app(self))
        stack.enter_context(self._enable_breakpointhook())

        f = stack.enter_context(create_future(loop))

        try:
            return await _run_async(f)
        finally:
            # Cancel the background tasks and wait for them, while the
            # context managers above are still active.
            await self.cancel_and_wait_for_background_tasks()

    # The `with` body always returns or raises, so this line is only reached
    # if a context manager suppressed an exception.
    # NOTE(review): presumably kept for the benefit of the type checker.
    assert False, "unreachable"
|
|
|
def run(
    self,
    pre_run: Callable[[], None] | None = None,
    set_exception_handler: bool = True,
    handle_sigint: bool = True,
    in_thread: bool = False,
    inputhook: InputHook | None = None,
) -> _AppResult:
    """
    A blocking 'run' call that waits until the UI is finished.

    This will run the application in a fresh asyncio event loop.

    :param pre_run: Optional callable, which is called right after the
        "reset" of the application.
    :param set_exception_handler: When set, in case of an exception, go out
        of the alternate screen and hide the application, display the
        exception, and wait for the user to press ENTER.
    :param in_thread: When true, run the application in a background
        thread, and block the current thread until the application
        terminates. This is useful if we need to be sure the application
        won't use the current event loop (asyncio does not support nested
        event loops). A new event loop will be created in this background
        thread, and that loop will also be closed when the background
        thread terminates. When this is used, it's especially important to
        make sure that all asyncio background tasks are managed through
        `get_app().create_background_task()`, so that unfinished tasks are
        properly cancelled before the event loop is closed. This is used
        for instance in ptpython.
    :param handle_sigint: Handle SIGINT signal. Call the key binding for
        `Keys.SIGINT`. (This only works in the main thread.)
    :param inputhook: Optional input hook. When given, the application runs
        in a new event loop created by `new_eventloop_with_inputhook`, and
        that loop is closed again afterwards.
    :returns: The result of :meth:`run_async`.
    :raises BaseException: With `in_thread`, any exception raised in the
        background thread is re-raised in the calling thread.
    """
    if in_thread:
        result: _AppResult
        exception: BaseException | None = None

        def run_in_thread() -> None:
            nonlocal result, exception
            try:
                result = self.run(
                    pre_run=pre_run,
                    set_exception_handler=set_exception_handler,
                    # Signal handling only works in the main thread.
                    handle_sigint=False,
                    inputhook=inputhook,
                )
            except BaseException as e:
                # Hand the exception over to the calling thread.
                exception = e

        thread = threading.Thread(target=run_in_thread)
        thread.start()
        thread.join()

        if exception is not None:
            raise exception
        return result

    coro = self.run_async(
        pre_run=pre_run,
        set_exception_handler=set_exception_handler,
        handle_sigint=handle_sigint,
    )

    def _called_from_ipython() -> bool:
        # True when an IPython older than 8.18 is loaded and the frame three
        # levels up lives in its terminal `interactiveshell.py`. Any failure
        # while inspecting this counts as "no".
        try:
            return (
                sys.modules["IPython"].version_info < (8, 18, 0, "")
                and "IPython/terminal/interactiveshell.py"
                in sys._getframe(3).f_code.co_filename
            )
        except BaseException:
            return False

    if inputhook is not None:
        # Create a new event loop with the given input hook and run the
        # application in it.
        loop = new_eventloop_with_inputhook(inputhook)
        try:
            return loop.run_until_complete(coro)
        finally:
            # Always finalize and close the loop we created, also when the
            # application raised. Otherwise the loop and its resources
            # leak. (This mirrors what `asyncio.run` does.)
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()

    elif _called_from_ipython():
        # Reuse the current event loop when there is one; fall back to
        # `asyncio.run` when `get_event_loop` refuses to provide one.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            return asyncio.run(coro)
        else:
            return loop.run_until_complete(coro)

    else:
        # The normal case: a fresh event loop, managed by `asyncio.run`.
        return asyncio.run(coro)
|
|
|
def _handle_exception(
    self, loop: AbstractEventLoop, context: dict[str, Any]
) -> None:
    """
    Exception handler for the event loop.

    The traceback and the exception are printed while the application is
    hidden (using `in_terminal`), after which we wait for the user to press
    ENTER.

    :param loop: The event loop that reports the exception (not used).
    :param context: The asyncio exception context dictionary.
    """
    # Format the traceback right now, not when the coroutine gets to run.
    traceback_text = "".join(format_tb(get_traceback_from_context(context)))

    async def report_in_terminal() -> None:
        async with in_terminal():
            print("\nUnhandled exception in event loop:")
            print(traceback_text)
            print("Exception {}".format(context.get("exception")))

            await _do_wait_for_enter("Press ENTER to continue...")

    ensure_future(report_in_terminal())
|
|
|
@contextmanager |
|
def _enable_breakpointhook(self) -> Generator[None, None, None]: |
|
""" |
|
Install our custom breakpointhook for the duration of this context |
|
manager. (We will only install the hook if no other custom hook was |
|
set.) |
|
""" |
|
if sys.breakpointhook == sys.__breakpointhook__: |
|
sys.breakpointhook = self._breakpointhook |
|
|
|
try: |
|
yield |
|
finally: |
|
sys.breakpointhook = sys.__breakpointhook__ |
|
else: |
|
yield |
|
|
|
def _breakpointhook(self, *a: object, **kw: object) -> None:
    """
    Breakpointhook which uses PDB, but ensures that the application is
    hidden and input echoing is restored during each debugger dispatch.

    This can be called from any thread. In any case, the application's
    event loop will be blocked while the PDB input is displayed. The event
    will continue after leaving the debugger.

    The positional and keyword arguments are accepted but not used.
    """
    # `self` is rebound inside `CustomPdb.trace_dispatch` below, so keep the
    # application available under another name.
    app = self

    # Only needed when a breakpoint is actually hit; import lazily.
    import pdb
    from types import FrameType

    TraceDispatch = Callable[[FrameType, str, Any], Any]

    @contextmanager
    def hide_app_from_eventloop_thread() -> Generator[None, None, None]:
        """Stop application if `__breakpointhook__` is called from within
        the App's event loop."""
        # Hide the application.
        app.renderer.erase()

        # Detach the input and switch to cooked mode for the duration of
        # the debugger dispatch.
        with app.input.detach():
            with app.input.cooked_mode():
                yield

    @contextmanager
    def hide_app_from_other_thread() -> Generator[None, None, None]:
        """Stop application if `__breakpointhook__` is called from a
        thread other than the App's event loop."""
        # `ready`: the loop has hidden the application.
        # `done`: the debugger dispatch is over, the loop may continue.
        ready = threading.Event()
        done = threading.Event()

        async def in_loop() -> None:
            # Runs inside the application's event loop: hide the
            # application there.
            app.renderer.erase()

            with app.input.detach():
                with app.input.cooked_mode():
                    ready.set()
                    # Blocking wait (not an `await`): this deliberately
                    # stalls the event loop until the other thread is done
                    # with the debugger dispatch.
                    done.wait()

        # NOTE(review): `create_background_task` is documented as not
        # threadsafe, but this runs in a thread other than the loop's
        # thread. Confirm that this is acceptable here.
        self.create_background_task(in_loop())
        ready.wait()
        try:
            yield
        finally:
            done.set()

    class CustomPdb(pdb.Pdb):
        def trace_dispatch(
            self, frame: FrameType, event: str, arg: Any
        ) -> TraceDispatch:
            # No loop thread recorded (it is only set while `run_async` is
            # executing): dispatch as usual.
            if app._loop_thread is None:
                return super().trace_dispatch(frame, event, arg)

            # Called from the thread that runs the event loop.
            if app._loop_thread == threading.current_thread():
                with hide_app_from_eventloop_thread():
                    return super().trace_dispatch(frame, event, arg)

            # Called from any other thread.
            with hide_app_from_other_thread():
                return super().trace_dispatch(frame, event, arg)

    # Start the debugger in the caller's frame, not in this hook.
    frame = sys._getframe().f_back
    CustomPdb(stdout=sys.__stdout__).set_trace(frame)
|
|
|
def create_background_task(
    self, coroutine: Coroutine[Any, Any, None]
) -> asyncio.Task[None]:
    """
    Start a coroutine as a background task of the running application.
    Background tasks that are still unfinished when the `Application`
    terminates will be cancelled.

    As long as Python versions before 3.11 are supported, task groups (and
    exception groups) can't be used. Because of that, background tasks are
    not allowed to raise exceptions. If they do, the exception handler of
    the event loop is called.

    Once Python 3.11 is the minimum supported version, a `TaskGroup` with
    the lifetime of `Application.run_async()` could run the background
    tasks instead.

    This is not threadsafe.

    :param coroutine: The coroutine to run.
    :returns: The task that wraps the coroutine.
    """
    # Prefer the application's own loop; without one, use the running loop.
    event_loop = self.loop
    if not event_loop:
        event_loop = get_running_loop()

    background: asyncio.Task[None] = event_loop.create_task(coroutine)

    # Track the task until `_on_background_task_done` removes it again.
    self._background_tasks.add(background)
    background.add_done_callback(self._on_background_task_done)
    return background
|
|
|
def _on_background_task_done(self, task: asyncio.Task[None]) -> None:
    """
    Done-callback for tasks started through `create_background_task`.

    Removes the task from `_background_tasks`. When the task ended with an
    exception (as opposed to finishing normally or being cancelled), that
    exception is passed to the exception handler of the running event loop.

    :param task: The background task that completed.
    """
    self._background_tasks.discard(task)

    # `Task.exception()` raises `CancelledError` for a cancelled task, so
    # cancellation has to be ruled out before asking for the exception.
    error = None if task.cancelled() else task.exception()
    if error is None:
        return

    context = {
        "message": f"prompt_toolkit.Application background task {task!r} "
        "raised an unexpected exception.",
        "exception": error,
        "task": task,
    }
    get_running_loop().call_exception_handler(context)
|
|
|
async def cancel_and_wait_for_background_tasks(self) -> None:
    """
    Cancel every registered background task and wait until all of them
    have actually finished.

    (If we had nurseries like Trio, this would be the `__aexit__` of a
    nursery.)

    `asyncio.wait()` is used for the waiting part. It does not raise the
    exceptions of the tasks it waits for, so a `CancelledError` coming out
    of this coroutine means that this coroutine itself got cancelled.
    Exceptions of the background tasks are reported separately, by
    `_on_background_task_done`.
    """
    tasks = self._background_tasks

    for background_task in tasks:
        background_task.cancel()

    # `asyncio.wait()` refuses an empty collection of awaitables.
    if not tasks:
        return

    await asyncio.wait(tasks, timeout=None, return_when=asyncio.ALL_COMPLETED)
|
|
|
async def _poll_output_size(self) -> None:
    """
    Coroutine that keeps polling the dimensions of the output.

    Meant for the cases in which `attach_winch_signal_handler` does not
    help:
    - If we are not running in the main thread.
    - On Windows.

    Returns right away when `terminal_size_polling_interval` is `None`.
    Otherwise it never returns by itself: it sleeps for the interval,
    reads `self.output.get_size()` and calls `self._on_resize()` each time
    the size is different from the one seen in the previous round.
    """
    interval = self.terminal_size_polling_interval
    if interval is None:
        return

    # No size is known before the first poll, so the first reading can
    # never count as a resize.
    last_size: Size | None = None

    while True:
        await asyncio.sleep(interval)
        current_size = self.output.get_size()

        if last_size is not None and current_size != last_size:
            self._on_resize()
        last_size = current_size
|
|
|
def cpr_not_supported_callback(self) -> None:
    """
    Called when the cursor position response was not received in time.

    Writes a warning to the output, through `run_in_terminal`. Nothing is
    printed when `self.output.responds_to_cpr` is false.
    """
    if not self.output.responds_to_cpr:
        return

    def write_warning() -> None:
        "Write the warning and flush the output."
        self.output.write(
            "WARNING: your terminal doesn't support cursor position requests (CPR).\r\n"
        )
        self.output.flush()

    run_in_terminal(write_warning)
|
|
|
@overload
def exit(self) -> None:
    """Exit without arguments."""


@overload
def exit(self, *, result: _AppResult, style: str = "") -> None:
    """Exit with `_AppResult`."""


@overload
def exit(
    self, *, exception: BaseException | type[BaseException], style: str = ""
) -> None:
    """Exit with exception."""


def exit(
    self,
    result: _AppResult | None = None,
    exception: BaseException | type[BaseException] | None = None,
    style: str = "",
) -> None:
    """
    Exit the application, by completing `self.future` with either a result
    or an exception.

    .. note::

        If `Application.exit` is called before `Application.run()` is
        called, then the `Application` won't exit (because the
        `Application.future` doesn't correspond to the current run). Use a
        `pre_run` hook and an event to synchronize the closing if there's a
        chance this can happen.

    :param result: Set this result for the application.
    :param exception: Set this exception as the result for an application.
        For a prompt, this is often `EOFError` or `KeyboardInterrupt`.
    :param style: Apply this style on the whole content when quitting,
        often this is 'class:exiting' for a prompt. (Used when
        `erase_when_done` is not set.) It is stored in `self.exit_style`.
    :raises Exception: When there is no future (the application is not
        running) or when the future is already done.
    """
    # A result and an exception are mutually exclusive.
    assert not (result is not None and exception is not None)

    future = self.future

    if future is None:
        raise Exception("Application is not running. Application.exit() failed.")

    if future.done():
        raise Exception("Return value already set. Application.exit() failed.")

    self.exit_style = style

    if exception is None:
        future.set_result(cast(_AppResult, result))
    else:
        future.set_exception(exception)
|
|
|
def _request_absolute_cursor_position(self) -> None:
    """
    Send CPR request.

    The request is forwarded to the renderer only while the input queue of
    the key processor is empty and the application is not done yet.
    """
    if self.key_processor.input_queue or self.is_done:
        return

    self.renderer.request_absolute_cursor_position()
|
|
|
async def run_system_command(
    self,
    command: str,
    wait_for_enter: bool = True,
    display_before_text: AnyFormattedText = "",
    wait_text: str = "Press ENTER to continue...",
) -> None:
    """
    Run system command (While hiding the prompt. When finished, all the
    output will scroll above the prompt.)

    :param command: Shell command to be executed.
    :param wait_for_enter: Wait for the user to press enter, when the
        command is finished.
    :param display_before_text: If given, text to be displayed before the
        command executes.
    :param wait_text: Text shown while waiting for the enter key press.
    """
    async with in_terminal():
        # Hand the child process the file descriptors of the application's
        # own input and output. Objects without a `fileno()` method make us
        # fall back to the stdin/stdout of this process.
        try:
            stdin_fd = self.input.fileno()
        except AttributeError:
            stdin_fd = sys.stdin.fileno()

        try:
            stdout_fd = self.output.fileno()
        except AttributeError:
            stdout_fd = sys.stdout.fileno()

        def spawn_and_wait() -> None:
            "Print the leading text, start the command and wait for it."
            self.print_text(display_before_text)
            process = Popen(command, shell=True, stdin=stdin_fd, stdout=stdout_fd)
            process.wait()

        # `Popen.wait()` blocks, so the command runs in an executor.
        await run_in_executor_with_context(spawn_and_wait)

        if wait_for_enter:
            await _do_wait_for_enter(wait_text)
|
|
|
def suspend_to_background(self, suspend_group: bool = True) -> None:
    """
    (Not thread safe -- to be called from inside the key bindings.)
    Suspend process.

    Does nothing when `_SIGTSTP` is `None`.

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    if _SIGTSTP is None:
        return

    def send_sigtstp() -> None:
        "Send the `_SIGTSTP` signal to the process (group)."
        signum = cast(int, _SIGTSTP)

        # With pid 0, `os.kill` signals every process in the process group
        # of the current process.
        target_pid = 0 if suspend_group else os.getpid()
        os.kill(target_pid, signum)

    run_in_terminal(send_sigtstp)
|
|
|
def print_text(
    self, text: AnyFormattedText, style: BaseStyle | None = None
) -> None:
    """
    Print formatted text to the output of this application, using its
    color depth and style transformation.

    (When the UI is running, this method has to be called through
    `run_in_terminal`, otherwise it will destroy the UI.)

    :param text: The formatted text to print, for instance a list of
        ``(style_str, text)`` tuples.
    :param style: Style class to use. Falls back to `self._merged_style`
        when not given.
    """
    effective_style = style if style else self._merged_style

    print_formatted_text(
        output=self.output,
        formatted_text=text,
        style=effective_style,
        color_depth=self.color_depth,
        style_transformation=self.style_transformation,
    )
|
|
|
@property
def is_running(self) -> bool:
    """
    `True` while the application is active/running (the value of the
    `_is_running` flag).
    """
    return self._is_running
|
|
|
@property
def is_done(self) -> bool:
    """
    `True` when the application has a future and that future is done;
    `False` otherwise (also when there is no future at all).
    """
    return self.future.done() if self.future else False
|
|
|
def get_used_style_strings(self) -> list[str]:
    """
    Return a sorted list of the style strings that were used. This is
    helpful for debugging, and for writing a new `Style`.

    The style strings are the keys of the renderer's `_attrs_for_style`
    mapping. Every run of whitespace in them is collapsed into one space
    and surrounding whitespace is stripped. An empty list is returned when
    that mapping is missing or empty.
    """
    attrs_for_style = self.renderer._attrs_for_style
    if not attrs_for_style:
        return []

    normalized = [
        re.sub(r"\s+", " ", style_str).strip()
        for style_str in attrs_for_style.keys()
    ]
    normalized.sort()
    return normalized
|
|
|
|
|
class _CombinedRegistry(KeyBindingsBase):
    """
    The key bindings of an `Application`.

    Merges the key bindings found in the layout (seen from the window that
    currently has the focus) with the key bindings of the application
    itself. The merged result is cached for every combination of current
    window and set of controls in the layout.
    """

    def __init__(self, app: Application[_AppResult]) -> None:
        self.app = app

        # Merged key bindings, keyed by (current window, all controls).
        self._cache: SimpleCache[
            tuple[Window, frozenset[UIControl]], KeyBindingsBase
        ] = SimpleCache()

    @property
    def _version(self) -> Hashable:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    @property
    def bindings(self) -> list[Binding]:
        """Not needed - this object is not going to be wrapped in another
        KeyBindings object."""
        raise NotImplementedError

    def _create_key_bindings(
        self, current_window: Window, other_controls: list[UIControl]
    ) -> KeyBindingsBase:
        """
        Build the merged key bindings for the case where `current_window`
        is the current window.

        The key bindings are gathered in this order, and the resulting list
        is reversed before it is merged:

        1. Those of `current_window` and of each of its parents, walking
           upwards until a modal container or the top of the layout is
           reached.
        2. Those of every other container below the container where that
           walk ended, wrapped in `GlobalOnlyKeyBindings`.
        3. `app.key_bindings`, when set.
        4. The page navigation bindings, conditional on
           `app.enable_page_navigation_bindings`.
        5. The default bindings of the application.

        :param current_window: The window to start collecting from.
        :param other_controls: Not used in here; the caller uses the same
            controls as part of its cache key.
        """
        collected: list[KeyBindingsBase] = []
        visited = set()

        # Step 1: walk upwards, starting at the current window.
        container: Container = current_window
        while True:
            visited.add(container)

            own_bindings = container.get_key_bindings()
            if own_bindings is not None:
                collected.append(own_bindings)

            if container.is_modal():
                break

            parent = self.app.layout.get_parent(container)
            if parent is None:
                break
            container = parent

        # Step 2: everything else underneath the container we ended at.
        for child in walk(container):
            if child in visited:
                continue

            child_bindings = child.get_key_bindings()
            if child_bindings is not None:
                collected.append(GlobalOnlyKeyBindings(child_bindings))

        # Step 3: bindings of the application itself.
        if self.app.key_bindings:
            collected.append(self.app.key_bindings)

        # Steps 4 and 5: page navigation and default bindings.
        collected.append(
            ConditionalKeyBindings(
                self.app._page_navigation_bindings,
                self.app.enable_page_navigation_bindings,
            )
        )
        collected.append(self.app._default_bindings)

        # The bindings of the current window have to end up last.
        # NOTE(review): presumably because later entries take priority in
        # `merge_key_bindings` -- confirm against that function.
        collected.reverse()

        return merge_key_bindings(collected)

    @property
    def _key_bindings(self) -> KeyBindingsBase:
        """
        The merged key bindings for the current state of the layout, taken
        from the cache when this combination of current window and controls
        was seen before.
        """
        layout = self.app.layout
        window = layout.current_window
        controls = list(layout.find_all_controls())

        def build() -> KeyBindingsBase:
            return self._create_key_bindings(window, controls)

        return self._cache.get((window, frozenset(controls)), build)

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings of the current layout state."""
        return self._key_bindings.get_bindings_for_keys(keys)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        """Delegate to the merged key bindings of the current layout state."""
        return self._key_bindings.get_bindings_starting_with_keys(keys)
|
|
|
|
|
async def _do_wait_for_enter(wait_text: AnyFormattedText) -> None:
    """
    Run a sub application (the application of a `PromptSession`) that
    waits for the enter key press.

    This has two advantages over using 'input'/'raw_input':
    - This will share the same input/output I/O.
    - This doesn't block the event loop.

    A `KeyboardInterrupt` raised while waiting ends the wait as well.

    :param wait_text: The message to display while waiting.
    """
    from prompt_toolkit.shortcuts import PromptSession

    bindings = KeyBindings()

    @bindings.add("enter")
    def _accept(event: E) -> None:
        "Enter was pressed: leave the sub application."
        event.app.exit()

    @bindings.add(Keys.Any)
    def _swallow(event: E) -> None:
        "Disallow typing."

    prompt: PromptSession[None] = PromptSession(
        message=wait_text, key_bindings=bindings
    )

    try:
        await prompt.app.run_async()
    except KeyboardInterrupt:
        pass
|
|
|
|
|
@contextmanager
def attach_winch_signal_handler(
    handler: Callable[[], None],
) -> Generator[None, None, None]:
    """
    Attach the given callback as a WINCH signal handler within the context
    manager. Restore the original signal handler when done.

    The `Application.run` method will register SIGWINCH, so that it will
    properly repaint when the terminal window resizes. However, using
    `run_in_terminal`, we can temporarily send an application to the
    background, and run an other app in between, which will then overwrite
    the SIGWINCH. This is why it's important to restore the handler when the
    app terminates.

    Nothing is attached when the `signal` module has no `SIGWINCH`, or when
    we are not in the main thread.

    :param handler: Callable without arguments, to be called on SIGWINCH.
    """
    winch_signum = getattr(signal, "SIGWINCH", None)

    if winch_signum is None or not in_main_thread():
        yield
        return

    event_loop = get_running_loop()

    # Remember the handle that is registered right now, if any. This reads
    # the private `_signal_handlers` attribute of the loop; a loop without
    # that attribute is treated as having no previous handler.
    registered_handles = getattr(event_loop, "_signal_handlers", {})
    replaced_handle = registered_handles.get(winch_signum)

    try:
        event_loop.add_signal_handler(winch_signum, handler)
        yield
    finally:
        # Take our handler away again and put the previous one back.
        event_loop.remove_signal_handler(winch_signum)

        if replaced_handle is not None:
            event_loop.add_signal_handler(
                winch_signum,
                replaced_handle._callback,
                *replaced_handle._args,
            )
|
|
|
|
|
@contextmanager
def _restore_sigint_from_ctypes() -> Generator[None, None, None]:
    """
    Context manager that puts the SIGINT handler back when it exits.

    On entry, two things are recorded: the handler as known by the `signal`
    module, and the handler as returned by `PyOS_getsig` (called through
    `ctypes.pythonapi`). On exit, the first one is installed again with
    `signal.signal` (unless it was `None`) and the second one with
    `PyOS_setsig`.

    When `ctypes` can't be imported, this context manager does nothing.
    """
    try:
        from ctypes import c_int, c_void_p, pythonapi
    except ImportError:
        # Nothing can be saved or restored without ctypes.
        yield
        return

    # Declare the C signatures, so that ctypes passes and returns the
    # handler as a pointer-sized value:
    #   PyOS_sighandler_t PyOS_getsig(int i)
    get_os_handler = pythonapi.PyOS_getsig
    get_os_handler.restype = c_void_p
    get_os_handler.argtypes = (c_int,)

    #   PyOS_sighandler_t PyOS_setsig(int i, PyOS_sighandler_t h)
    set_os_handler = pythonapi.PyOS_setsig
    set_os_handler.restype = c_void_p
    set_os_handler.argtypes = (c_int, c_void_p)

    python_handler = signal.getsignal(signal.SIGINT)
    os_handler = get_os_handler(signal.SIGINT)

    try:
        yield
    finally:
        # `signal.getsignal` returns `None` for a handler that was not
        # installed from Python; that can't be passed to `signal.signal`.
        if python_handler is not None:
            signal.signal(signal.SIGINT, python_handler)
        set_os_handler(signal.SIGINT, os_handler)
|
|