|
""" |
|
Key bindings registry. |
|
|
|
A `KeyBindings` object is a container that holds a list of key bindings. It has a |
|
very efficient internal data structure for checking which key bindings apply |
|
for a pressed key. |
|
|
|
Typical usage:: |
|
|
|
kb = KeyBindings() |
|
|
|
@kb.add(Keys.ControlX, Keys.ControlC, filter=INSERT) |
|
def handler(event): |
|
# Handle ControlX-ControlC key sequence. |
|
pass |
|
|
|
It is also possible to combine multiple KeyBindings objects. We do this in the |
|
default key bindings. There are some KeyBindings objects that contain the Emacs |
|
bindings, while others contain the Vi bindings. They are merged together using |
|
`merge_key_bindings`. |
|
|
|
We also have a `ConditionalKeyBindings` object that can enable/disable a group of |
|
key bindings at once. |
|
|
|
|
|
It is also possible to add a filter to a function, before a key binding has |
|
been assigned, through the `key_binding` decorator.:: |
|
|
|
# First define a key handler with the `filter`. |
|
@key_binding(filter=condition) |
|
def my_key_binding(event): |
|
... |
|
|
|
# Later, add it to the key bindings. |
|
kb.add(Keys.A, my_key_binding) |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
from abc import ABCMeta, abstractmethod, abstractproperty |
|
from inspect import isawaitable |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Any, |
|
Callable, |
|
Coroutine, |
|
Hashable, |
|
Sequence, |
|
Tuple, |
|
TypeVar, |
|
Union, |
|
cast, |
|
) |
|
|
|
from prompt_toolkit.cache import SimpleCache |
|
from prompt_toolkit.filters import FilterOrBool, Never, to_filter |
|
from prompt_toolkit.keys import KEY_ALIASES, Keys |
|
|
|
if TYPE_CHECKING: |
|
|
|
from .key_processor import KeyPressEvent |
|
|
|
|
|
|
|
|
|
|
|
# Alias for `object`, used as the return type of key handlers in
# `KeyHandlerCallable`. `Binding.call` compares a handler's result against
# `NotImplemented` to decide whether the application must be invalidated.
NotImplementedOrNone = object
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Names exported by `from <this module> import *`.
__all__ = [
    "NotImplementedOrNone",
    "Binding",
    "KeyBindingsBase",
    "KeyBindings",
    "ConditionalKeyBindings",
    "merge_key_bindings",
    "DynamicKeyBindings",
    "GlobalOnlyKeyBindings",
]
|
|
|
|
|
|
|
|
|
|
|
# Signature of a key handler: it takes a `KeyPressEvent` and returns either a
# `NotImplementedOrNone` value directly, or a coroutine that resolves to one.
KeyHandlerCallable = Callable[
    ["KeyPressEvent"],
    Union["NotImplementedOrNone", Coroutine[Any, Any, "NotImplementedOrNone"]],
]
|
|
|
|
|
class Binding:
    """
    Key binding: (key sequence + handler + filter).
    (Immutable binding class.)

    :param keys: Tuple with the key sequence this binding is for.
    :param handler: Callable that receives the `KeyPressEvent`. It may return
        a value directly, or an awaitable that produces the value.
    :param filter: Filter or bool; stored as a filter in `self.filter`.
    :param eager: Filter or bool; stored as a filter in `self.eager`.
    :param is_global: Filter or bool; stored as a filter in `self.is_global`.
    :param save_before: Callable that takes the event and returns a bool.
        (Returns True by default.)
    :param record_in_macro: When True, record this key binding when a macro
        is recorded. (True by default.)
    """

    def __init__(
        self,
        keys: tuple[Keys | str, ...],
        handler: KeyHandlerCallable,
        filter: FilterOrBool = True,
        eager: FilterOrBool = False,
        is_global: FilterOrBool = False,
        save_before: Callable[[KeyPressEvent], bool] = (lambda e: True),
        record_in_macro: FilterOrBool = True,
    ) -> None:
        self.keys = keys
        self.handler = handler
        # Booleans are accepted for convenience; normalize them to filters.
        self.filter = to_filter(filter)
        self.eager = to_filter(eager)
        self.is_global = to_filter(is_global)
        self.save_before = save_before
        self.record_in_macro = to_filter(record_in_macro)

    def call(self, event: KeyPressEvent) -> None:
        """
        Run the handler for `event`.

        The application is invalidated unless the handler's result is the
        `NotImplemented` singleton. When the handler returns an awaitable,
        it is awaited in a background task of the application and the same
        rule is applied to the awaited value.

        :param event: The `KeyPressEvent` passed on to the handler.
        """
        result = self.handler(event)

        # If the handler returned an awaitable, await it in a background task.
        if isawaitable(result):
            awaitable = cast(Coroutine[Any, Any, "NotImplementedOrNone"], result)

            async def bg_task() -> None:
                outcome = await awaitable
                # `NotImplemented` is a singleton: compare by identity, so
                # that no `__eq__`/`__ne__` of the returned object is run.
                if outcome is not NotImplemented:
                    event.app.invalidate()

            event.app.create_background_task(bg_task())

        elif result is not NotImplemented:
            event.app.invalidate()

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}(keys={self.keys!r}, handler={self.handler!r})"
        )
|
|
|
|
|
|
|
# A key sequence: a tuple whose items are `Keys` members or strings.
KeysTuple = Tuple[Union[Keys, str], ...]
|
|
|
|
|
class KeyBindingsBase(metaclass=ABCMeta):
    """
    Interface for a KeyBindings.
    """

    # `abc.abstractproperty` is deprecated; stacking `property` on top of
    # `abstractmethod` is the supported way to declare an abstract property.
    @property
    @abstractmethod
    def _version(self) -> Hashable:
        """
        For cache invalidation. - This should increase every time that
        something changes.
        """
        return 0

    @abstractmethod
    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        """
        Return a list of key bindings that can handle these keys.
        (This also returns inactive bindings, so the `filter` still has to
        be called, for checking it.)

        :param keys: tuple of keys.
        """
        return []

    @abstractmethod
    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        """
        Return a list of key bindings that handle a key sequence starting with
        `keys`. (It only returns bindings for which the sequences are longer
        than `keys`. And like `get_bindings_for_keys`, it also includes
        inactive bindings.)

        :param keys: tuple of keys.
        """
        return []

    @property
    @abstractmethod
    def bindings(self) -> list[Binding]:
        """
        List of `Binding` objects.
        (These need to be exposed, so that `KeyBindings` objects can be merged
        together.)
        """
        return []
|
|
|
|
|
|
|
|
|
# Type variable for the decorator returned by `KeyBindings.add`: the decorated
# object is either a key handler callable or a `Binding`, and the decorator
# returns that same object.
T = TypeVar("T", bound=Union[KeyHandlerCallable, Binding])
|
|
|
|
|
class KeyBindings(KeyBindingsBase):
    """
    A container for a set of key bindings.

    Example usage::

        kb = KeyBindings()

        @kb.add('c-t')
        def _(event):
            print('Control-T pressed')

        @kb.add('c-a', 'c-b')
        def _(event):
            print('Control-A pressed, followed by Control-B')

        @kb.add('c-x', filter=is_searching)
        def _(event):
            print('Control-X pressed')  # Works only if we are searching.

    """

    def __init__(self) -> None:
        self._bindings: list[Binding] = []
        # Lookup results per key sequence; both caches are emptied by
        # `_clear_cache` whenever the list of bindings changes.
        self._get_bindings_for_keys_cache: SimpleCache[KeysTuple, list[Binding]] = (
            SimpleCache(maxsize=10000)
        )
        self._get_bindings_starting_with_keys_cache: SimpleCache[
            KeysTuple, list[Binding]
        ] = SimpleCache(maxsize=1000)
        self.__version = 0  # Incremented on every change (see `_version`).

    def _clear_cache(self) -> None:
        """Bump the version counter and drop all cached lookup results."""
        self.__version += 1
        self._get_bindings_for_keys_cache.clear()
        self._get_bindings_starting_with_keys_cache.clear()

    @property
    def bindings(self) -> list[Binding]:
        """The list of `Binding` objects held by this container."""
        return self._bindings

    @property
    def _version(self) -> Hashable:
        """Counter that is incremented each time `_clear_cache` runs."""
        return self.__version

    def add(
        self,
        *keys: Keys | str,
        filter: FilterOrBool = True,
        eager: FilterOrBool = False,
        is_global: FilterOrBool = False,
        save_before: Callable[[KeyPressEvent], bool] = (lambda e: True),
        record_in_macro: FilterOrBool = True,
    ) -> Callable[[T], T]:
        """
        Decorator for adding a key binding.

        The decorated object is returned unchanged. It can be a plain handler
        function or a `Binding` (for instance one created with the
        `key_binding` decorator).

        :param keys: One or more keys; each one is passed through
            `_parse_key`. At least one key is required.
        :param filter: :class:`~prompt_toolkit.filters.Filter` to determine
            when this key binding is active.
        :param eager: :class:`~prompt_toolkit.filters.Filter` or `bool`.
            When True, ignore potential longer matches when this key binding is
            hit. E.g. when there is an active eager key binding for Ctrl-X,
            execute the handler immediately and ignore the key binding for
            Ctrl-X Ctrl-E of which it is a prefix.
        :param is_global: When this key bindings is added to a `Container` or
            `Control`, make it a global (always active) binding.
        :param save_before: Callable that takes an `Event` and returns True if
            we should save the current buffer, before handling the event.
            (That's the default.)
        :param record_in_macro: Record these key bindings when a macro is
            being recorded. (True by default.)
        """
        assert keys

        keys = tuple(_parse_key(k) for k in keys)

        if isinstance(filter, Never):
            # A `Never` filter means this binding is not registered at all;
            # the decorator just hands the function back.
            def decorator(func: T) -> T:
                return func

        else:

            def decorator(func: T) -> T:
                if isinstance(func, Binding):
                    # Re-bind an existing `Binding` to these keys: its filter
                    # is AND-ed with ours, `eager`/`is_global` are OR-ed, and
                    # `save_before`/`record_in_macro` are taken from `func`.
                    self.bindings.append(
                        Binding(
                            keys,
                            func.handler,
                            filter=func.filter & to_filter(filter),
                            eager=to_filter(eager) | func.eager,
                            is_global=to_filter(is_global) | func.is_global,
                            save_before=func.save_before,
                            record_in_macro=func.record_in_macro,
                        )
                    )
                else:
                    self.bindings.append(
                        Binding(
                            keys,
                            cast(KeyHandlerCallable, func),
                            filter=filter,
                            eager=eager,
                            is_global=is_global,
                            save_before=save_before,
                            record_in_macro=record_in_macro,
                        )
                    )
                self._clear_cache()

                return func

        return decorator

    def remove(self, *args: Keys | str | KeyHandlerCallable) -> None:
        """
        Remove a key binding.

        This expects either a function that was given to `add` method as
        parameter or a sequence of key bindings. Every binding that matches
        is removed.

        Raises `ValueError` when no bindings was found.

        Usage::

            remove(handler)  # Pass handler.
            remove('c-x', 'c-a')  # Or pass the key bindings.
        """
        wanted: object

        if callable(args[0]):
            assert len(args) == 1
            function = args[0]
            wanted = function

            # Keep everything that is not bound to the given function.
            remaining = [b for b in self.bindings if not (b.handler == function)]

        else:
            # Keep everything that is not bound to this sequence of keys.
            keys = tuple(
                _parse_key(k) for k in cast(Tuple[Union[Keys, str], ...], args)
            )
            wanted = keys

            remaining = [b for b in self.bindings if not (b.keys == keys)]

        if len(remaining) == len(self.bindings):
            # Nothing matched. (`wanted` is the handler or the parsed keys, so
            # this message can be built for both calling conventions.)
            raise ValueError(f"Binding not found: {wanted!r}")

        # Build the new content first and assign it in one step: removing
        # items from a list while iterating over it skips the element that
        # follows each removal. Slice assignment keeps the list object itself.
        self.bindings[:] = remaining
        self._clear_cache()

    # Aliases for `add` and `remove`.
    add_binding = add
    remove_binding = remove

    @staticmethod
    def _keys_match(binding_keys: KeysTuple, keys: KeysTuple) -> bool:
        """
        Compare `keys` position by position with the start of `binding_keys`.
        A `Keys.Any` in the binding matches every key. Only the positions
        present in both tuples are compared.
        """
        return all(i == j or i == Keys.Any for i, j in zip(binding_keys, keys))

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        """
        Return a list of key bindings that can handle this key.
        (This also returns inactive bindings, so the `filter` still has to
        be called, for checking it.)

        Bindings containing more `Keys.Any` wildcards come first in the
        result; among bindings with the same number of wildcards, the order
        in which they were added is kept.

        :param keys: tuple of keys.
        """

        def get() -> list[Binding]:
            result: list[tuple[int, Binding]] = []

            for b in self.bindings:
                if len(keys) == len(b.keys) and self._keys_match(b.keys, keys):
                    any_count = sum(1 for k in b.keys if k == Keys.Any)
                    result.append((any_count, b))

            # Stable sort on descending wildcard count.
            result = sorted(result, key=lambda item: -item[0])

            return [item[1] for item in result]

        return self._get_bindings_for_keys_cache.get(keys, get)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        """
        Return a list of key bindings that handle a key sequence starting with
        `keys`. (It only returns bindings for which the sequences are longer
        than `keys`. And like `get_bindings_for_keys`, it also includes
        inactive bindings.)

        :param keys: tuple of keys.
        """

        def get() -> list[Binding]:
            return [
                b
                for b in self.bindings
                if len(keys) < len(b.keys) and self._keys_match(b.keys, keys)
            ]

        return self._get_bindings_starting_with_keys_cache.get(keys, get)
|
|
|
|
|
def _parse_key(key: Keys | str) -> str | Keys:
    """
    Normalize one key given to `add`/`remove`.

    `Keys` members are returned as they are. Strings are first translated
    through `KEY_ALIASES`, and "space" becomes " ". The result is returned
    as a `Keys` member when it is a valid `Keys` value, and otherwise as the
    string itself, provided that it is exactly one character long.

    Raises `ValueError` for any other string.
    """
    # Nothing to resolve for a `Keys` member.
    if isinstance(key, Keys):
        return key

    # Apply the alias table; unknown names are kept unchanged.
    resolved = KEY_ALIASES.get(key, key)

    # "space" is spelled out, the key itself is the space character.
    if resolved == "space":
        resolved = " "

    # Prefer the `Keys` member when the string is one of its values.
    try:
        return Keys(resolved)
    except ValueError:
        pass

    # Otherwise only a single character is acceptable.
    if len(resolved) == 1:
        return resolved

    raise ValueError(f"Invalid key: {resolved}")
|
|
|
|
|
def key_binding(
    filter: FilterOrBool = True,
    eager: FilterOrBool = False,
    is_global: FilterOrBool = False,
    save_before: Callable[[KeyPressEvent], bool] = (lambda event: True),
    record_in_macro: FilterOrBool = True,
) -> Callable[[KeyHandlerCallable], Binding]:
    """
    Decorator factory that wraps a handler function in a `Binding` with an
    empty key sequence. The keys are assigned later, when the `Binding` is
    added to a `KeyBindings` object.

    :param filter: Filter or bool for the resulting binding.
    :param eager: Filter or bool for the resulting binding.
    :param is_global: Filter or bool for the resulting binding.
    :param save_before: Callable that takes the event and returns a bool.
    :param record_in_macro: Filter or bool for the resulting binding.
    """
    assert save_before is None or callable(save_before)

    # Convert the options once, here, rather than on every decoration.
    active_filter = to_filter(filter)
    eager_filter = to_filter(eager)
    global_filter = to_filter(is_global)
    macro_filter = to_filter(record_in_macro)

    def decorator(function: KeyHandlerCallable) -> Binding:
        # No keys yet: `()` is a placeholder key sequence.
        return Binding(
            (),
            function,
            filter=active_filter,
            eager=eager_filter,
            is_global=global_filter,
            save_before=save_before,
            record_in_macro=macro_filter,
        )

    return decorator
|
|
|
|
|
class _Proxy(KeyBindingsBase):
    """
    Base class for key bindings objects that delegate to another
    `KeyBindingsBase` instance, kept in `self._bindings2`.

    Subclasses implement `_update_cache`, which is called before every
    access so that `self._bindings2` and `self._last_version` are current.
    """

    def __init__(self) -> None:
        self._last_version: Hashable = ()
        # The object all lookups are forwarded to.
        self._bindings2: KeyBindingsBase = KeyBindings()

    def _update_cache(self) -> None:
        """
        If `self._last_version` is outdated, then this should update
        the version and `self._bindings2`.
        """
        raise NotImplementedError

    def _synced(self) -> KeyBindingsBase:
        """Refresh the cache and return the up to date delegate."""
        self._update_cache()
        return self._bindings2

    # Proxy methods to self._bindings2.

    @property
    def bindings(self) -> list[Binding]:
        return self._synced().bindings

    @property
    def _version(self) -> Hashable:
        self._update_cache()
        return self._last_version

    def get_bindings_for_keys(self, keys: KeysTuple) -> list[Binding]:
        return self._synced().get_bindings_for_keys(keys)

    def get_bindings_starting_with_keys(self, keys: KeysTuple) -> list[Binding]:
        return self._synced().get_bindings_starting_with_keys(keys)
|
|
|
|
|
class ConditionalKeyBindings(_Proxy):
    """
    Wraps around a `KeyBindings`. Disable/enable all the key bindings according to
    the given (additional) filter.::

        @Condition
        def setting_is_true():
            return True  # or False

        registry = ConditionalKeyBindings(key_bindings, setting_is_true)

    Key bindings that are added to the wrapped object later on are
    enabled/disabled according to the given `filter` as well.

    :param key_bindings: The key bindings object to wrap.
    :param filter: :class:`~prompt_toolkit.filters.Filter` object (or bool).
    """

    def __init__(
        self, key_bindings: KeyBindingsBase, filter: FilterOrBool = True
    ) -> None:
        _Proxy.__init__(self)

        self.key_bindings = key_bindings
        self.filter = to_filter(filter)

    def _update_cache(self) -> None:
        "Rebuild our filtered copy when the wrapped key bindings have changed."
        current_version = self.key_bindings._version

        if self._last_version == current_version:
            return

        # Copy every binding, AND-ing our own filter into its filter.
        conditional = KeyBindings()
        conditional.bindings.extend(
            Binding(
                keys=original.keys,
                handler=original.handler,
                filter=self.filter & original.filter,
                eager=original.eager,
                is_global=original.is_global,
                save_before=original.save_before,
                record_in_macro=original.record_in_macro,
            )
            for original in self.key_bindings.bindings
        )

        self._bindings2 = conditional
        self._last_version = current_version
|
|
|
|
|
class _MergedKeyBindings(_Proxy):
    """
    Present several key bindings objects as a single one.

    The bindings of all given registries are concatenated, in the order of
    the registries, into one :class:`.KeyBindings` to which lookups are
    forwarded.

    :param registries: List of :class:`.KeyBindings` objects.
    """

    def __init__(self, registries: Sequence[KeyBindingsBase]) -> None:
        _Proxy.__init__(self)
        self.registries = registries

    def _update_cache(self) -> None:
        """
        Rebuild the merged bindings when the version of any registry differs
        from the versions seen at the previous rebuild.
        """
        current_version = tuple(registry._version for registry in self.registries)

        if self._last_version == current_version:
            return

        merged = KeyBindings()
        merged.bindings.extend(
            binding for registry in self.registries for binding in registry.bindings
        )

        self._bindings2 = merged
        self._last_version = current_version
|
|
|
|
|
def merge_key_bindings(bindings: Sequence[KeyBindingsBase]) -> _MergedKeyBindings:
    """
    Combine several :class:`.KeyBindings` objects into one object that
    exposes the bindings of all of them.

    Usage::

        bindings = merge_key_bindings([bindings1, bindings2, ...])

    :param bindings: Sequence of key bindings objects to merge.
    """
    merged = _MergedKeyBindings(bindings)
    return merged
|
|
|
|
|
class DynamicKeyBindings(_Proxy):
    """
    KeyBindings class that can dynamically return any KeyBindings.

    :param get_key_bindings: Callable that returns a :class:`.KeyBindings`
        instance, or `None` (an empty `KeyBindings` is used in that case).
    """

    def __init__(self, get_key_bindings: Callable[[], KeyBindingsBase | None]) -> None:
        # Initialize the proxy state like the other `_Proxy` subclasses do,
        # so that `_bindings2` and `_last_version` always exist.
        _Proxy.__init__(self)

        self.get_key_bindings = get_key_bindings
        # NOTE(review): the two attributes below are assigned here but not
        # read anywhere in this class.
        self.__version = 0
        self._last_child_version = None
        self._dummy = KeyBindings()  # Used when the callable returns None.

    def _update_cache(self) -> None:
        """
        Ask the callable for the current key bindings and forward to that
        object directly (no copy is made). The version is the pair of the
        object's `id` and its own `_version`.
        """
        key_bindings = self.get_key_bindings() or self._dummy
        assert isinstance(key_bindings, KeyBindingsBase)
        version = id(key_bindings), key_bindings._version

        self._bindings2 = key_bindings
        self._last_version = version
|
|
|
|
|
class GlobalOnlyKeyBindings(_Proxy):
    """
    Wrapper around a :class:`.KeyBindings` object that only exposes the global
    key bindings.

    :param key_bindings: The key bindings object to wrap.
    """

    def __init__(self, key_bindings: KeyBindingsBase) -> None:
        _Proxy.__init__(self)
        self.key_bindings = key_bindings

    def _update_cache(self) -> None:
        """
        Rebuild the list of global bindings when the version of the wrapped
        key bindings has changed. A binding is kept when calling its
        `is_global` filter returns a true value at rebuild time.
        """
        current_version = self.key_bindings._version

        if self._last_version == current_version:
            return

        global_only = KeyBindings()
        global_only.bindings.extend(
            binding for binding in self.key_bindings.bindings if binding.is_global()
        )

        self._bindings2 = global_only
        self._last_version = current_version
|
|