Spaces:
Runtime error
Runtime error
# Xlib.protocol.rq -- structure primitives for request, events and errors | |
# | |
# Copyright (C) 2000-2002 Peter Liljenberg <petli@ctrl-c.liu.se> | |
# | |
# This library is free software; you can redistribute it and/or | |
# modify it under the terms of the GNU Lesser General Public License | |
# as published by the Free Software Foundation; either version 2.1 | |
# of the License, or (at your option) any later version. | |
# | |
# This library is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. | |
# See the GNU Lesser General Public License for more details. | |
# | |
# You should have received a copy of the GNU Lesser General Public | |
# License along with this library; if not, write to the | |
# Free Software Foundation, Inc., | |
# 59 Temple Place, | |
# Suite 330, | |
# Boston, MA 02111-1307 USA | |
# Standard modules | |
import sys | |
import traceback | |
import struct | |
from array import array | |
# Python 2/3 compatibility. | |
from six import PY3, binary_type, byte2int, indexbytes, iterbytes | |
# Xlib modules | |
from .. import X | |
from ..support import lock | |
def decode_string(bs): | |
return bs.decode('latin1') | |
if PY3: | |
def encode_array(a): | |
return a.tobytes() | |
else: | |
def encode_array(a): | |
return a.tostring() | |
class BadDataError(Exception): pass | |
# These are struct codes, we know their byte sizes | |
signed_codes = { 1: 'b', 2: 'h', 4: 'l' } | |
unsigned_codes = { 1: 'B', 2: 'H', 4: 'L' } | |
# Unfortunately, we don't know the array sizes of B, H and L, since | |
# these use the underlying architecture's size for a char, short and | |
# long. Therefore we probe for their sizes, and additionally create | |
# a mapping that translates from struct codes to array codes. | |
# | |
# Bleah. | |
array_unsigned_codes = { } | |
struct_to_array_codes = { } | |
for c in 'bhil': | |
size = array(c).itemsize | |
array_unsigned_codes[size] = c.upper() | |
try: | |
struct_to_array_codes[signed_codes[size]] = c | |
struct_to_array_codes[unsigned_codes[size]] = c.upper() | |
except KeyError: | |
pass | |
# print array_unsigned_codes, struct_to_array_codes | |
class Field(object): | |
"""Field objects represent the data fields of a Struct. | |
Field objects must have the following attributes: | |
name -- the field name, or None | |
structcode -- the struct codes representing this field | |
structvalues -- the number of values encodes by structcode | |
Additionally, these attributes should either be None or real methods: | |
check_value -- check a value before it is converted to binary | |
parse_value -- parse a value after it has been converted from binary | |
If one of these attributes are None, no check or additional | |
parsings will be done one values when converting to or from binary | |
form. Otherwise, the methods should have the following behaviour: | |
newval = check_value(val) | |
Check that VAL is legal when converting to binary form. The | |
value can also be converted to another Python value. In any | |
case, return the possibly new value. NEWVAL should be a | |
single Python value if structvalues is 1, a tuple of | |
structvalues elements otherwise. | |
newval = parse_value(val, display) | |
VAL is an unpacked Python value, which now can be further | |
refined. DISPLAY is the current Display object. Return the | |
new value. VAL will be a single value if structvalues is 1, | |
a tuple of structvalues elements otherwise. | |
If `structcode' is None the Field must have the method | |
f.parse_binary_value() instead. See its documentation string for | |
details. | |
""" | |
name = None | |
default = None | |
structcode = None | |
structvalues = 0 | |
check_value = None | |
parse_value = None | |
keyword_args = False | |
def __init__(self): | |
pass | |
def parse_binary_value(self, data, display, length, format): | |
"""value, remaindata = f.parse_binary_value(data, display, length, format) | |
Decode a value for this field from the binary string DATA. | |
If there are a LengthField and/or a FormatField connected to this | |
field, their values will be LENGTH and FORMAT, respectively. If | |
there are no such fields the parameters will be None. | |
DISPLAY is the display involved, which is really only used by | |
the Resource fields. | |
The decoded value is returned as VALUE, and the remaining part | |
of DATA shold be returned as REMAINDATA. | |
""" | |
raise RuntimeError('Neither structcode or parse_binary_value ' \ | |
'provided for {0}'.format(self)) | |
class Pad(Field): | |
def __init__(self, size): | |
self.size = size | |
self.value = b'\0' * size | |
self.structcode = '{0}x'.format(size) | |
self.structvalues = 0 | |
class ConstantField(Field): | |
def __init__(self, value): | |
self.value = value | |
class Opcode(ConstantField): | |
structcode = 'B' | |
structvalues = 1 | |
class ReplyCode(ConstantField): | |
structcode = 'B' | |
structvalues = 1 | |
def __init__(self): | |
self.value = 1 | |
class LengthField(Field): | |
"""A LengthField stores the length of some other Field whose size | |
may vary, e.g. List and String8. | |
Its name should be the same as the name of the field whose size | |
it stores. The other_fields attribute can be used to specify the | |
names of other fields whose sizes are stored by this field, so | |
a single length field can set the length of multiple fields. | |
The lf.get_binary_value() method of LengthFields is not used, instead | |
a lf.get_binary_length() should be provided. | |
Unless LengthField.get_binary_length() is overridden in child classes, | |
there should also be a lf.calc_length(). | |
""" | |
structcode = 'L' | |
structvalues = 1 | |
other_fields = None | |
def calc_length(self, length): | |
"""newlen = lf.calc_length(length) | |
Return a new length NEWLEN based on the provided LENGTH. | |
""" | |
return length | |
class TotalLengthField(LengthField): | |
pass | |
class RequestLength(TotalLengthField): | |
structcode = 'H' | |
structvalues = 1 | |
def calc_length(self, length): | |
return length // 4 | |
class ReplyLength(TotalLengthField): | |
structcode = 'L' | |
structvalues = 1 | |
def calc_length(self, length): | |
return (length - 32) // 4 | |
class LengthOf(LengthField): | |
def __init__(self, name, size): | |
if isinstance(name, (list, tuple)): | |
self.name = name[0] | |
self.other_fields = name[1:] | |
else: | |
self.name = name | |
self.structcode = unsigned_codes[size] | |
class OddLength(LengthField): | |
structcode = 'B' | |
structvalues = 1 | |
def __init__(self, name): | |
self.name = name | |
def calc_length(self, length): | |
return length % 2 | |
def parse_value(self, value, display): | |
if value == 0: | |
return 'even' | |
else: | |
return 'odd' | |
class FormatField(Field): | |
"""A FormatField encodes the format of some other field, in a manner | |
similar to LengthFields. | |
The ff.get_binary_value() method is not used, replaced by | |
ff.get_binary_format(). | |
""" | |
structvalues = 1 | |
def __init__(self, name, size): | |
self.name = name | |
self.structcode = unsigned_codes[size] | |
Format = FormatField | |
class ValueField(Field): | |
def __init__(self, name, default = None): | |
self.name = name | |
self.default = default | |
class Int8(ValueField): | |
structcode = 'b' | |
structvalues = 1 | |
class Int16(ValueField): | |
structcode = 'h' | |
structvalues = 1 | |
class Int32(ValueField): | |
structcode = 'l' | |
structvalues = 1 | |
class Card8(ValueField): | |
structcode = 'B' | |
structvalues = 1 | |
class Card16(ValueField): | |
structcode = 'H' | |
structvalues = 1 | |
class Card32(ValueField): | |
structcode = 'L' | |
structvalues = 1 | |
class Resource(Card32): | |
cast_function = '__resource__' | |
class_name = 'resource' | |
def __init__(self, name, codes = (), default = None): | |
Card32.__init__(self, name, default) | |
self.codes = codes | |
def check_value(self, value): | |
if hasattr(value, self.cast_function): | |
return getattr(value, self.cast_function)() | |
else: | |
return value | |
def parse_value(self, value, display): | |
# if not display: | |
# return value | |
if value in self.codes: | |
return value | |
c = display.get_resource_class(self.class_name) | |
if c: | |
return c(display, value) | |
else: | |
return value | |
class Window(Resource): | |
cast_function = '__window__' | |
class_name = 'window' | |
class Pixmap(Resource): | |
cast_function = '__pixmap__' | |
class_name = 'pixmap' | |
class Drawable(Resource): | |
cast_function = '__drawable__' | |
class_name = 'drawable' | |
class Fontable(Resource): | |
cast_function = '__fontable__' | |
class_name = 'fontable' | |
class Font(Resource): | |
cast_function = '__font__' | |
class_name = 'font' | |
class GC(Resource): | |
cast_function = '__gc__' | |
class_name = 'gc' | |
class Colormap(Resource): | |
cast_function = '__colormap__' | |
class_name = 'colormap' | |
class Cursor(Resource): | |
cast_function = '__cursor__' | |
class_name = 'cursor' | |
class Bool(ValueField): | |
structvalues = 1 | |
structcode = 'B' | |
def check_value(self, value): | |
return not not value | |
class Set(ValueField): | |
structvalues = 1 | |
def __init__(self, name, size, values, default = None): | |
ValueField.__init__(self, name, default) | |
self.structcode = unsigned_codes[size] | |
self.values = values | |
def check_value(self, val): | |
if val not in self.values: | |
raise ValueError('field %s: argument %s not in %s' | |
% (self.name, val, self.values)) | |
return val | |
class Gravity(Set): | |
def __init__(self, name): | |
Set.__init__(self, name, 1, (X.ForgetGravity, X.StaticGravity, | |
X.NorthWestGravity, X.NorthGravity, | |
X.NorthEastGravity, X.WestGravity, | |
X.CenterGravity, X.EastGravity, | |
X.SouthWestGravity, X.SouthGravity, | |
X.SouthEastGravity)) | |
class FixedBinary(ValueField): | |
structvalues = 1 | |
def __init__(self, name, size): | |
ValueField.__init__(self, name) | |
self.structcode = '{0}s'.format(size) | |
class Binary(ValueField): | |
structcode = None | |
def __init__(self, name, pad = 1): | |
ValueField.__init__(self, name) | |
self.pad = pad | |
def pack_value(self, val): | |
val_bytes = val | |
slen = len(val_bytes) | |
if self.pad: | |
return val_bytes + b'\0' * ((4 - slen % 4) % 4), slen, None | |
else: | |
return val_bytes, slen, None | |
def parse_binary_value(self, data, display, length, format): | |
if length is None: | |
return data, b'' | |
if self.pad: | |
slen = length + ((4 - length % 4) % 4) | |
else: | |
slen = length | |
return data[:length], data[slen:] | |
class String8(ValueField): | |
structcode = None | |
def __init__(self, name, pad = 1): | |
ValueField.__init__(self, name) | |
self.pad = pad | |
def pack_value(self, val): | |
if isinstance(val, bytes): | |
val_bytes = val | |
else: | |
val_bytes = val.encode() | |
slen = len(val_bytes) | |
if self.pad: | |
return val_bytes + b'\0' * ((4 - slen % 4) % 4), slen, None | |
else: | |
return val_bytes, slen, None | |
def parse_binary_value(self, data, display, length, format): | |
if length is None: | |
return decode_string(data), b'' | |
if self.pad: | |
slen = length + ((4 - length % 4) % 4) | |
else: | |
slen = length | |
data_str = decode_string(data[:length]) | |
return data_str, data[slen:] | |
class String16(ValueField): | |
structcode = None | |
def __init__(self, name, pad = 1): | |
ValueField.__init__(self, name) | |
self.pad = pad | |
def pack_value(self, val): | |
"""Convert 8-byte string into 16-byte list""" | |
if isinstance(val, bytes): | |
val = list(iterbytes(val)) | |
slen = len(val) | |
if self.pad: | |
pad = b'\0\0' * (slen % 2) | |
else: | |
pad = b'' | |
return struct.pack('>' + 'H' * slen, *val) + pad, slen, None | |
def parse_binary_value(self, data, display, length, format): | |
if length == 'odd': | |
length = len(data) // 2 - 1 | |
elif length == 'even': | |
length = len(data) // 2 | |
if self.pad: | |
slen = length + (length % 2) | |
else: | |
slen = length | |
return struct.unpack('>' + 'H' * length, data[:length * 2]), data[slen * 2:] | |
class List(ValueField): | |
"""The List, FixedList and Object fields store compound data objects. | |
The type of data objects must be provided as an object with the | |
following attributes and methods: | |
... | |
""" | |
structcode = None | |
def __init__(self, name, type, pad = 1): | |
ValueField.__init__(self, name) | |
self.type = type | |
self.pad = pad | |
def parse_binary_value(self, data, display, length, format): | |
if length is None: | |
ret = [] | |
if self.type.structcode is None: | |
while data: | |
val, data = self.type.parse_binary(data, display) | |
ret.append(val) | |
else: | |
scode = '=' + self.type.structcode | |
slen = struct.calcsize(scode) | |
pos = 0 | |
while pos + slen <= len(data): | |
v = struct.unpack(scode, data[pos: pos + slen]) | |
if self.type.structvalues == 1: | |
v = v[0] | |
if self.type.parse_value is None: | |
ret.append(v) | |
else: | |
ret.append(self.type.parse_value(v, display)) | |
pos = pos + slen | |
data = data[pos:] | |
else: | |
ret = [None] * int(length) | |
if self.type.structcode is None: | |
for i in range(0, length): | |
ret[i], data = self.type.parse_binary(data, display) | |
else: | |
scode = '=' + self.type.structcode | |
slen = struct.calcsize(scode) | |
pos = 0 | |
for i in range(0, length): | |
v = struct.unpack(scode, data[pos: pos + slen]) | |
if self.type.structvalues == 1: | |
v = v[0] | |
if self.type.parse_value is None: | |
ret[i] = v | |
else: | |
ret[i] = self.type.parse_value(v, display) | |
pos = pos + slen | |
data = data[pos:] | |
if self.pad: | |
data = data[len(data) % 4:] | |
return ret, data | |
def pack_value(self, val): | |
# Single-char values, we'll assume that means integer lists. | |
if self.type.structcode and len(self.type.structcode) == 1: | |
if self.type.check_value is not None: | |
val = [self.type.check_value(v) for v in val] | |
a = array(struct_to_array_codes[self.type.structcode], val) | |
data = encode_array(a) | |
else: | |
data = [] | |
for v in val: | |
data.append(self.type.pack_value(v)) | |
data = b''.join(data) | |
if self.pad: | |
dlen = len(data) | |
data = data + b'\0' * ((4 - dlen % 4) % 4) | |
return data, len(val), None | |
class FixedList(List): | |
def __init__(self, name, size, type, pad = 1): | |
List.__init__(self, name, type, pad) | |
self.size = size | |
def parse_binary_value(self, data, display, length, format): | |
return List.parse_binary_value(self, data, display, self.size, format) | |
def pack_value(self, val): | |
if len(val) != self.size: | |
raise BadDataError('length mismatch for FixedList %s' % self.name) | |
return List.pack_value(self, val) | |
class Object(ValueField): | |
def __init__(self, name, type, default = None): | |
ValueField.__init__(self, name, default) | |
self.type = type | |
self.structcode = self.type.structcode | |
self.structvalues = self.type.structvalues | |
def parse_binary_value(self, data, display, length, format): | |
return self.type.parse_binary(data, display) | |
def parse_value(self, val, display): | |
return self.type.parse_value(val, display) | |
def pack_value(self, val): | |
return self.type.pack_value(val) | |
def check_value(self, val): | |
if isinstance(val, tuple): | |
vals = [] | |
i = 0 | |
for f in self.type.fields: | |
if f.name: | |
if f.check_value is None: | |
v = val[i] | |
else: | |
v = f.check_value(val[i]) | |
if f.structvalues == 1: | |
vals.append(v) | |
else: | |
vals.extend(v) | |
i = i + 1 | |
return vals | |
if isinstance(val, dict): | |
data = val | |
elif isinstance(val, DictWrapper): | |
data = val._data | |
else: | |
raise TypeError('Object value must be tuple, dictionary or DictWrapper: %s' % val) | |
vals = [] | |
for f in self.type.fields: | |
if f.name: | |
if f.check_value is None: | |
v = data[f.name] | |
else: | |
v = f.check_value(data[f.name]) | |
if f.structvalues == 1: | |
vals.append(v) | |
else: | |
vals.extend(v) | |
return vals | |
class PropertyData(ValueField): | |
structcode = None | |
def parse_binary_value(self, data, display, length, format): | |
if length is None: | |
length = len(data) // (format // 8) | |
else: | |
length = int(length) | |
if format == 0: | |
ret = None | |
elif format == 8: | |
ret = (8, data[:length]) | |
data = data[length + ((4 - length % 4) % 4):] | |
elif format == 16: | |
ret = (16, array(array_unsigned_codes[2], data[:2 * length])) | |
data = data[2 * (length + length % 2):] | |
elif format == 32: | |
ret = (32, array(array_unsigned_codes[4], data[:4 * length])) | |
data = data[4 * length:] | |
return ret, data | |
def pack_value(self, value): | |
fmt, val = value | |
if fmt not in (8, 16, 32): | |
raise BadDataError('Invalid property data format {0}'.format(fmt)) | |
if isinstance(val, binary_type): | |
size = fmt // 8 | |
vlen = len(val) | |
if vlen % size: | |
vlen = vlen - vlen % size | |
data = val[:vlen] | |
else: | |
data = val | |
dlen = vlen // size | |
else: | |
if isinstance(val, tuple): | |
val = list(val) | |
size = fmt // 8 | |
a = array(array_unsigned_codes[size], val) | |
data = encode_array(a) | |
dlen = len(val) | |
dl = len(data) | |
data = data + b'\0' * ((4 - dl % 4) % 4) | |
return data, dlen, fmt | |
class FixedPropertyData(PropertyData): | |
def __init__(self, name, size): | |
PropertyData.__init__(self, name) | |
self.size = size | |
def parse_binary_value(self, data, display, length, format): | |
return PropertyData.parse_binary_value(self, data, display, | |
self.size // (format // 8), format) | |
def pack_value(self, value): | |
data, dlen, fmt = PropertyData.pack_value(self, value) | |
if len(data) != self.size: | |
raise BadDataError('Wrong data length for FixedPropertyData: %s' | |
% (value, )) | |
return data, dlen, fmt | |
class ValueList(Field): | |
structcode = None | |
keyword_args = True | |
default = 'usekeywords' | |
def __init__(self, name, mask, pad, *fields): | |
self.name = name | |
self.maskcode = '={0}{1}x'.format(unsigned_codes[mask], pad).encode() | |
self.maskcodelen = struct.calcsize(self.maskcode) | |
self.fields = [] | |
flag = 1 | |
for f in fields: | |
if f.name: | |
self.fields.append((f, flag)) | |
flag = flag << 1 | |
def pack_value(self, arg, keys): | |
mask = 0 | |
data = b'' | |
if arg == self.default: | |
arg = keys | |
for field, flag in self.fields: | |
if field.name in arg: | |
mask = mask | flag | |
val = arg[field.name] | |
if field.check_value is not None: | |
val = field.check_value(val) | |
d = struct.pack('=' + field.structcode, val) | |
data = data + d + b'\0' * (4 - len(d)) | |
return struct.pack(self.maskcode, mask) + data, None, None | |
def parse_binary_value(self, data, display, length, format): | |
r = {} | |
mask = int(struct.unpack(self.maskcode, data[:self.maskcodelen])[0]) | |
data = data[self.maskcodelen:] | |
for field, flag in self.fields: | |
if mask & flag: | |
if field.structcode: | |
vals = struct.unpack('=' + field.structcode, | |
data[:struct.calcsize('=' + field.structcode)]) | |
if field.structvalues == 1: | |
vals = vals[0] | |
if field.parse_value is not None: | |
vals = field.parse_value(vals, display) | |
else: | |
vals, d = field.parse_binary_value(data[:4], display, None, None) | |
r[field.name] = vals | |
data = data[4:] | |
return DictWrapper(r), data | |
class KeyboardMapping(ValueField): | |
structcode = None | |
def parse_binary_value(self, data, display, length, format): | |
if length is None: | |
dlen = len(data) | |
else: | |
dlen = 4 * length * format | |
a = array(array_unsigned_codes[4], bytes(data[:dlen])) | |
ret = [] | |
for i in range(0, len(a), format): | |
ret.append(a[i : i + format]) | |
return ret, data[dlen:] | |
def pack_value(self, value): | |
keycodes = 0 | |
for v in value: | |
keycodes = max(keycodes, len(v)) | |
a = array(array_unsigned_codes[4]) | |
for v in value: | |
for k in v: | |
a.append(k) | |
for i in range(len(v), keycodes): | |
a.append(X.NoSymbol) | |
return encode_array(a), len(value), keycodes | |
class ModifierMapping(ValueField): | |
structcode = None | |
def parse_binary_value(self, data, display, length, format): | |
a = array(array_unsigned_codes[1], data[:8 * format]) | |
ret = [] | |
for i in range(0, 8): | |
ret.append(a[i * format : (i + 1) * format]) | |
return ret, data[8 * format:] | |
def pack_value(self, value): | |
if len(value) != 8: | |
raise BadDataError('ModifierMapping list should have eight elements') | |
keycodes = 0 | |
for v in value: | |
keycodes = max(keycodes, len(v)) | |
a = array(array_unsigned_codes[1]) | |
for v in value: | |
for k in v: | |
a.append(k) | |
for i in range(len(v), keycodes): | |
a.append(0) | |
return encode_array(a), len(value), keycodes | |
class EventField(ValueField): | |
structcode = None | |
def pack_value(self, value): | |
if not isinstance(value, Event): | |
raise BadDataError('%s is not an Event for field %s' % (value, self.name)) | |
return value._binary, None, None | |
def parse_binary_value(self, data, display, length, format): | |
from . import event | |
estruct = display.event_classes.get(byte2int(data) & 0x7f, event.AnyEvent) | |
if type(estruct) == dict: | |
# this etype refers to a set of sub-events with individual subcodes | |
estruct = estruct[indexbytes(data, 1)] | |
return estruct(display = display, binarydata = data[:32]), data[32:] | |
# | |
# Objects usable for List and FixedList fields. | |
# Struct is also usable. | |
# | |
class ScalarObj(object): | |
def __init__(self, code): | |
self.structcode = code | |
self.structvalues = 1 | |
self.parse_value = None | |
self.check_value = None | |
Card8Obj = ScalarObj('B') | |
Card16Obj = ScalarObj('H') | |
Card32Obj = ScalarObj('L') | |
class ResourceObj(object): | |
structcode = 'L' | |
structvalues = 1 | |
def __init__(self, class_name): | |
self.class_name = class_name | |
self.check_value = None | |
def parse_value(self, value, display): | |
# if not display: | |
# return value | |
c = display.get_resource_class(self.class_name) | |
if c: | |
return c(display, value) | |
else: | |
return value | |
WindowObj = ResourceObj('window') | |
ColormapObj = ResourceObj('colormap') | |
class StrClass(object): | |
structcode = None | |
def pack_value(self, val): | |
return (chr(len(val)) + val).encode() | |
def parse_binary(self, data, display): | |
slen = byte2int(data) + 1 | |
return decode_string(data[1:slen]), data[slen:] | |
Str = StrClass() | |
class Struct(object): | |
"""Struct objects represents a binary data structure. It can | |
contain both fields with static and dynamic sizes. However, all | |
static fields must appear before all dynamic fields. | |
Fields are represented by various subclasses of the abstract base | |
class Field. The fields of a structure are given as arguments | |
when instantiating a Struct object. | |
Struct objects have two public methods: | |
to_binary() -- build a binary representation of the structure | |
with the values given as arguments | |
parse_binary() -- convert a binary (string) representation into | |
a Python dictionary or object. | |
These functions will be generated dynamically for each Struct | |
object to make conversion as fast as possible. They are | |
generated the first time the methods are called. | |
""" | |
def __init__(self, *fields): | |
self.fields = fields | |
# Structures for to_binary, parse_value and parse_binary | |
self.static_codes = '=' | |
self.static_values = 0 | |
self.static_fields = [] | |
self.static_size = None | |
self.var_fields = [] | |
for f in self.fields: | |
# Append structcode if there is one and we haven't | |
# got any varsize fields yet. | |
if f.structcode is not None: | |
assert not self.var_fields | |
self.static_codes = self.static_codes + f.structcode | |
# Only store fields with values | |
if f.structvalues > 0: | |
self.static_fields.append(f) | |
self.static_values = self.static_values + f.structvalues | |
# If we have got one varsize field, all the rest must | |
# also be varsize fields. | |
else: | |
self.var_fields.append(f) | |
self.static_size = struct.calcsize(self.static_codes) | |
if self.var_fields: | |
self.structcode = None | |
self.structvalues = 0 | |
else: | |
self.structcode = self.static_codes[1:] | |
self.structvalues = self.static_values | |
# These functions get called only once, as they will override | |
# themselves with dynamically created functions in the Struct | |
# object | |
def to_binary(self, *varargs, **keys): | |
"""data = s.to_binary(...) | |
Convert Python values into the binary representation. The | |
arguments will be all value fields with names, in the order | |
given when the Struct object was instantiated. With one | |
exception: fields with default arguments will be last. | |
Returns the binary representation as the string DATA. | |
""" | |
# Emulate Python function argument handling with our field names | |
names = [f.name for f in self.fields \ | |
if isinstance(f, ValueField) and f.name] | |
field_args = dict(zip(names, varargs)) | |
if set(field_args).intersection(keys): | |
dupes = ", ".join(set(field_args).intersection(keys)) | |
raise TypeError("{0} arguments were passed both positionally and by keyword".format(dupes)) | |
field_args.update(keys) | |
for f in self.fields: | |
if f.name and (f.name not in field_args): | |
if f.default is None: | |
raise TypeError("Missing required argument {0}".format(f.name)) | |
field_args[f.name] = f.default | |
# /argument handling | |
# First pack all varfields so their lengths and formats are | |
# available when we pack their static LengthFields and | |
# FormatFields | |
total_length = self.static_size | |
var_vals = {} | |
lengths = {} | |
formats = {} | |
for f in self.var_fields: | |
if f.keyword_args: | |
v, l, fm = f.pack_value(field_args[f.name], keys) | |
else: | |
v, l, fm = f.pack_value(field_args[f.name]) | |
var_vals[f.name] = v | |
lengths[f.name] = l | |
formats[f.name] = fm | |
total_length += len(v) | |
# Construct item list for struct.pack call, packing all static fields. | |
pack_items = [] | |
for f in self.static_fields: | |
if isinstance(f, LengthField): | |
# If this is a total length field, insert | |
# the calculated field value here | |
if isinstance(f, TotalLengthField): | |
pack_items.append(f.calc_length(total_length)) | |
else: | |
pack_items.append(f.calc_length(lengths[f.name])) | |
# Format field, just insert the value we got previously | |
elif isinstance(f, FormatField): | |
pack_items.append(formats[f.name]) | |
# A constant field, insert its value directly | |
elif isinstance(f, ConstantField): | |
pack_items.append(f.value) | |
# Value fields | |
else: | |
if f.structvalues == 1: | |
# If there's a value check/convert function, call it | |
if f.check_value is not None: | |
pack_items.append(f.check_value(field_args[f.name])) | |
# Else just use the argument as provided | |
else: | |
pack_items.append(field_args[f.name]) | |
# Multivalue field. Handled like single valuefield, | |
# but the value are tuple unpacked into separate arguments | |
# which are appended to pack_items | |
else: | |
if f.check_value is not None: | |
pack_items.extend(f.check_value(field_args[f.name])) | |
else: | |
pack_items.extend(field_args[f.name]) | |
static_part = struct.pack(self.static_codes, *pack_items) | |
var_parts = [var_vals[f.name] for f in self.var_fields] | |
return static_part + b''.join(var_parts) | |
def pack_value(self, value): | |
""" This function allows Struct objects to be used in List and | |
Object fields. Each item represents the arguments to pass to | |
to_binary, either a tuple, a dictionary or a DictWrapper. | |
""" | |
if type(value) is tuple: | |
return self.to_binary(*value) | |
elif isinstance(value, dict): | |
return self.to_binary(**value) | |
elif isinstance(value, DictWrapper): | |
return self.to_binary(**value._data) | |
else: | |
raise BadDataError('%s is not a tuple or a list' % (value)) | |
def parse_value(self, val, display, rawdict = False): | |
"""This function is used by List and Object fields to convert | |
Struct objects with no var_fields into Python values. | |
""" | |
ret = {} | |
vno = 0 | |
for f in self.static_fields: | |
# Fields without names should be ignored, and there should | |
# not be any length or format fields if this function | |
# ever gets called. (If there were such fields, there should | |
# be a matching field in var_fields and then parse_binary | |
# would have been called instead. | |
if not f.name: | |
pass | |
elif isinstance(f, LengthField): | |
pass | |
elif isinstance(f, FormatField): | |
pass | |
# Value fields | |
else: | |
# If this field has a parse_value method, call it, otherwise | |
# use the unpacked value as is. | |
if f.structvalues == 1: | |
field_val = val[vno] | |
else: | |
field_val = val[vno:vno+f.structvalues] | |
if f.parse_value is not None: | |
field_val = f.parse_value(field_val, display, rawdict=rawdict) | |
ret[f.name] = field_val | |
vno = vno + f.structvalues | |
if not rawdict: | |
return DictWrapper(ret) | |
return ret | |
def parse_binary(self, data, display, rawdict = False): | |
"""values, remdata = s.parse_binary(data, display, rawdict = False) | |
Convert a binary representation of the structure into Python values. | |
DATA is a string or a buffer containing the binary data. | |
DISPLAY should be a Xlib.protocol.display.Display object if | |
there are any Resource fields or Lists with ResourceObjs. | |
The Python values are returned as VALUES. If RAWDICT is true, | |
a Python dictionary is returned, where the keys are field | |
names and the values are the corresponding Python value. If | |
RAWDICT is false, a DictWrapper will be returned where all | |
fields are available as attributes. | |
REMDATA are the remaining binary data, unused by the Struct object. | |
""" | |
ret = {} | |
val = struct.unpack(self.static_codes, data[:self.static_size]) | |
lengths = {} | |
formats = {} | |
vno = 0 | |
for f in self.static_fields: | |
# Fields without name should be ignored. This is typically | |
# pad and constant fields | |
if not f.name: | |
pass | |
# Store index in val for Length and Format fields, to be used | |
# when treating varfields. | |
elif isinstance(f, LengthField): | |
f_names = [f.name] | |
if f.other_fields: | |
f_names.extend(f.other_fields) | |
field_val = val[vno] | |
if f.parse_value is not None: | |
field_val = f.parse_value(field_val, display) | |
for f_name in f_names: | |
lengths[f_name] = field_val | |
elif isinstance(f, FormatField): | |
formats[f.name] = val[vno] | |
# Treat value fields the same was as in parse_value. | |
else: | |
if f.structvalues == 1: | |
field_val = val[vno] | |
else: | |
field_val = val[vno:vno+f.structvalues] | |
if f.parse_value is not None: | |
field_val = f.parse_value(field_val, display) | |
ret[f.name] = field_val | |
vno = vno + f.structvalues | |
data = data[self.static_size:] | |
# Call parse_binary_value for each var_field, passing the | |
# length and format values from the unpacked val. | |
for f in self.var_fields: | |
ret[f.name], data = f.parse_binary_value(data, display, | |
lengths.get(f.name), | |
formats.get(f.name), | |
) | |
if not rawdict: | |
ret = DictWrapper(ret) | |
return ret, data | |
class TextElements8(ValueField): | |
string_textitem = Struct( LengthOf('string', 1), | |
Int8('delta'), | |
String8('string', pad = 0) ) | |
def pack_value(self, value): | |
data = b'' | |
args = {} | |
for v in value: | |
# Let values be simple strings, meaning a delta of 0 | |
if type(v) in (str, bytes): | |
v = (0, v) | |
# A tuple, it should be (delta, string) | |
# Encode it as one or more textitems | |
if isinstance(v, (tuple, dict, DictWrapper)): | |
if isinstance(v, tuple): | |
delta, m_str = v | |
else: | |
delta = v['delta'] | |
m_str = v['string'] | |
while delta or m_str: | |
args['delta'] = delta | |
args['string'] = m_str[:254] | |
data = data + self.string_textitem.to_binary(*(), **args) | |
delta = 0 | |
m_str = m_str[254:] | |
# Else an integer, i.e. a font change | |
else: | |
# Use fontable cast function if instance | |
if isinstance(v, Fontable): | |
v = v.__fontable__() | |
data = data + struct.pack('>BL', 255, v) | |
# Pad out to four byte length | |
dlen = len(data) | |
return data + b'\0' * ((4 - dlen % 4) % 4), None, None | |
def parse_binary_value(self, data, display, length, format): | |
values = [] | |
while 1: | |
if len(data) < 2: | |
break | |
# font change | |
if byte2int(data) == 255: | |
values.append(struct.unpack('>L', bytes(data[1:5]))[0]) | |
data = data[5:] | |
# skip null strings | |
elif byte2int(data) == 0 and indexbytes(data, 1) == 0: | |
data = data[2:] | |
# string with delta | |
else: | |
v, data = self.string_textitem.parse_binary(data, display) | |
values.append(v) | |
return values, '' | |
class TextElements16(TextElements8): | |
string_textitem = Struct( LengthOf('string', 1), | |
Int8('delta'), | |
String16('string', pad = 0) ) | |
class GetAttrData(object): | |
def __getattr__(self, attr): | |
try: | |
if self._data: | |
return self._data[attr] | |
else: | |
raise AttributeError(attr) | |
except KeyError: | |
raise AttributeError(attr) | |
class DictWrapper(GetAttrData): | |
def __init__(self, dict): | |
self.__dict__['_data'] = dict | |
def __getitem__(self, key): | |
return self._data[key] | |
def __setitem__(self, key, value): | |
self._data[key] = value | |
def __delitem__(self, key): | |
del self._data[key] | |
def __setattr__(self, key, value): | |
self._data[key] = value | |
def __delattr__(self, key): | |
del self._data[key] | |
def __str__(self): | |
return str(self._data) | |
def __repr__(self): | |
return '%s(%s)' % (self.__class__.__name__, repr(self._data)) | |
def __lt__(self, other): | |
if isinstance(other, DictWrapper): | |
return self._data < other._data | |
else: | |
return self._data < other | |
def __gt__(self, other): | |
if isinstance(other, DictWrapper): | |
return self._data > other._data | |
else: | |
return self._data > other | |
def __eq__(self, other): | |
if isinstance(other, DictWrapper): | |
return self._data == other._data | |
else: | |
return self._data == other | |
class Request(object): | |
def __init__(self, display, onerror = None, *args, **keys): | |
self._errorhandler = onerror | |
self._binary = self._request.to_binary(*args, **keys) | |
self._serial = None | |
display.send_request(self, onerror is not None) | |
def _set_error(self, error): | |
if self._errorhandler is not None: | |
return call_error_handler(self._errorhandler, error, self) | |
else: | |
return 0 | |
class ReplyRequest(GetAttrData): | |
def __init__(self, display, defer = False, *args, **keys): | |
self._display = display | |
self._binary = self._request.to_binary(*args, **keys) | |
self._serial = None | |
self._data = None | |
self._error = None | |
self._response_lock = lock.allocate_lock() | |
self._display.send_request(self, True) | |
if not defer: | |
self.reply() | |
def reply(self): | |
# Send request and wait for reply if we hasn't | |
# already got one. This means that reply() can safely | |
# be called more than one time. | |
self._response_lock.acquire() | |
while self._data is None and self._error is None: | |
self._display.send_recv_lock.acquire() | |
self._response_lock.release() | |
self._display.send_and_recv(request = self._serial) | |
self._response_lock.acquire() | |
self._response_lock.release() | |
self._display = None | |
# If error has been set, raise it | |
if self._error: | |
raise self._error | |
def _parse_response(self, data): | |
self._response_lock.acquire() | |
self._data, d = self._reply.parse_binary(data, self._display, rawdict = True) | |
self._response_lock.release() | |
def _set_error(self, error): | |
self._response_lock.acquire() | |
self._error = error | |
self._response_lock.release() | |
return 1 | |
def __repr__(self): | |
return '<%s serial = %s, data = %s, error = %s>' % (self.__class__.__name__, self._serial, self._data, self._error) | |
class Event(GetAttrData): | |
def __init__(self, binarydata = None, display = None, | |
**keys): | |
if binarydata: | |
self._binary = binarydata | |
self._data, data = self._fields.parse_binary(binarydata, display, | |
rawdict = True) | |
# split event type into type and send_event bit | |
self._data['send_event'] = not not self._data['type'] & 0x80 | |
self._data['type'] = self._data['type'] & 0x7f | |
else: | |
if self._code: | |
keys['type'] = self._code | |
keys['sequence_number'] = 0 | |
self._binary = self._fields.to_binary(**keys) | |
keys['send_event'] = 0 | |
self._data = keys | |
def __repr__(self): | |
kwlist = [] | |
for kw, val in self._data.items(): | |
if kw == 'send_event': | |
continue | |
if kw == 'type' and self._data['send_event']: | |
val = val | 0x80 | |
kwlist.append('%s = %s' % (kw, repr(val))) | |
kws = ', '.join(kwlist) | |
return '%s(%s)' % (self.__class__.__name__, kws) | |
def __lt__(self, other): | |
if isinstance(other, Event): | |
return self._data < other._data | |
else: | |
return self._data < other | |
def __gt__(self, other): | |
if isinstance(other, Event): | |
return self._data > other._data | |
else: | |
return self._data > other | |
def __eq__(self, other): | |
if isinstance(other, Event): | |
return self._data == other._data | |
else: | |
return self._data == other | |
def call_error_handler(handler, error, request): | |
try: | |
return handler(error, request) | |
except: | |
sys.stderr.write('Exception raised by error handler.\n') | |
traceback.print_exc() | |
return 0 | |