|
""" |
|
Parser for the Telnet protocol. (Not a complete implementation of the telnet |
|
specification, but sufficient for a command line interface.) |
|
|
|
Inspired by `Twisted.conch.telnet`. |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
import struct |
|
from typing import Callable, Generator |
|
|
|
from .log import logger |
|
|
|
__all__ = [ |
|
"TelnetProtocolParser", |
|
] |
|
|
|
|
|
def int2byte(number: int) -> bytes: |
|
return bytes((number,)) |
|
|
|
|
|
|
|
NOP = int2byte(0) |
|
SGA = int2byte(3) |
|
|
|
IAC = int2byte(255) |
|
DO = int2byte(253) |
|
DONT = int2byte(254) |
|
LINEMODE = int2byte(34) |
|
SB = int2byte(250) |
|
WILL = int2byte(251) |
|
WONT = int2byte(252) |
|
MODE = int2byte(1) |
|
SE = int2byte(240) |
|
ECHO = int2byte(1) |
|
NAWS = int2byte(31) |
|
LINEMODE = int2byte(34) |
|
SUPPRESS_GO_AHEAD = int2byte(3) |
|
|
|
TTYPE = int2byte(24) |
|
SEND = int2byte(1) |
|
IS = int2byte(0) |
|
|
|
DM = int2byte(242) |
|
BRK = int2byte(243) |
|
IP = int2byte(244) |
|
AO = int2byte(245) |
|
AYT = int2byte(246) |
|
EC = int2byte(247) |
|
EL = int2byte(248) |
|
GA = int2byte(249) |
|
|
|
|
|
class TelnetProtocolParser: |
|
""" |
|
Parser for the Telnet protocol. |
|
Usage:: |
|
|
|
def data_received(data): |
|
print(data) |
|
|
|
def size_received(rows, columns): |
|
print(rows, columns) |
|
|
|
p = TelnetProtocolParser(data_received, size_received) |
|
p.feed(binary_data) |
|
""" |
|
|
|
def __init__( |
|
self, |
|
data_received_callback: Callable[[bytes], None], |
|
size_received_callback: Callable[[int, int], None], |
|
ttype_received_callback: Callable[[str], None], |
|
) -> None: |
|
self.data_received_callback = data_received_callback |
|
self.size_received_callback = size_received_callback |
|
self.ttype_received_callback = ttype_received_callback |
|
|
|
self._parser = self._parse_coroutine() |
|
self._parser.send(None) |
|
|
|
def received_data(self, data: bytes) -> None: |
|
self.data_received_callback(data) |
|
|
|
def do_received(self, data: bytes) -> None: |
|
"""Received telnet DO command.""" |
|
logger.info("DO %r", data) |
|
|
|
def dont_received(self, data: bytes) -> None: |
|
"""Received telnet DONT command.""" |
|
logger.info("DONT %r", data) |
|
|
|
def will_received(self, data: bytes) -> None: |
|
"""Received telnet WILL command.""" |
|
logger.info("WILL %r", data) |
|
|
|
def wont_received(self, data: bytes) -> None: |
|
"""Received telnet WONT command.""" |
|
logger.info("WONT %r", data) |
|
|
|
def command_received(self, command: bytes, data: bytes) -> None: |
|
if command == DO: |
|
self.do_received(data) |
|
|
|
elif command == DONT: |
|
self.dont_received(data) |
|
|
|
elif command == WILL: |
|
self.will_received(data) |
|
|
|
elif command == WONT: |
|
self.wont_received(data) |
|
|
|
else: |
|
logger.info("command received %r %r", command, data) |
|
|
|
def naws(self, data: bytes) -> None: |
|
""" |
|
Received NAWS. (Window dimensions.) |
|
""" |
|
if len(data) == 4: |
|
|
|
|
|
|
|
columns, rows = struct.unpack("!HH", data) |
|
self.size_received_callback(rows, columns) |
|
else: |
|
logger.warning("Wrong number of NAWS bytes") |
|
|
|
def ttype(self, data: bytes) -> None: |
|
""" |
|
Received terminal type. |
|
""" |
|
subcmd, data = data[0:1], data[1:] |
|
if subcmd == IS: |
|
ttype = data.decode("ascii") |
|
self.ttype_received_callback(ttype) |
|
else: |
|
logger.warning("Received a non-IS terminal type Subnegotiation") |
|
|
|
def negotiate(self, data: bytes) -> None: |
|
""" |
|
Got negotiate data. |
|
""" |
|
command, payload = data[0:1], data[1:] |
|
|
|
if command == NAWS: |
|
self.naws(payload) |
|
elif command == TTYPE: |
|
self.ttype(payload) |
|
else: |
|
logger.info("Negotiate (%r got bytes)", len(data)) |
|
|
|
def _parse_coroutine(self) -> Generator[None, bytes, None]: |
|
""" |
|
Parser state machine. |
|
Every 'yield' expression returns the next byte. |
|
""" |
|
while True: |
|
d = yield |
|
|
|
if d == int2byte(0): |
|
pass |
|
|
|
|
|
elif d == IAC: |
|
d2 = yield |
|
|
|
if d2 == IAC: |
|
self.received_data(d2) |
|
|
|
|
|
elif d2 in (NOP, DM, BRK, IP, AO, AYT, EC, EL, GA): |
|
self.command_received(d2, b"") |
|
|
|
|
|
elif d2 in (DO, DONT, WILL, WONT): |
|
d3 = yield |
|
self.command_received(d2, d3) |
|
|
|
|
|
elif d2 == SB: |
|
|
|
data = [] |
|
|
|
while True: |
|
d3 = yield |
|
|
|
if d3 == IAC: |
|
d4 = yield |
|
if d4 == SE: |
|
break |
|
else: |
|
data.append(d4) |
|
else: |
|
data.append(d3) |
|
|
|
self.negotiate(b"".join(data)) |
|
else: |
|
self.received_data(d) |
|
|
|
def feed(self, data: bytes) -> None: |
|
""" |
|
Feed data to the parser. |
|
""" |
|
for b in data: |
|
self._parser.send(int2byte(b)) |
|
|