used bytes / str (3.4/2.7) and data storage, so the bin foot print is now hashable

This commit is contained in:
Richard Hartmann 2015-11-11 21:59:42 +01:00
parent 6ea441c83c
commit 009d7241c8
2 changed files with 73 additions and 47 deletions

View file

@ -27,10 +27,10 @@ __min_int32 = -2147483648
def __int_to_bytes(i):
m = 0xff
assert i >= 0
ba = bytearray()
ba = str()
while i > 0:
b = i & m
ba += bytearray([b])
ba += str(bytearray([b]))
i = i >> 8
return ba[::-1]
@ -38,68 +38,81 @@ def __bytes_to_int(ba):
i = 0
for b in ba:
i = i << 8
i += b
i += ord(b)
return i
BYTES_CLASS = bytearray
def char_eq_byte(ch, b):
return ord(ch) == b
def byte_eq_byte(b1, b2):
return b1 == b2
if sys.version_info.major > 2:
str_to_bytes = lambda s: BYTES_CLASS(s, 'utf8')
BIN_TYPE = bytes
str_to_bytes = lambda s: bytes(s, 'utf8')
bytes_to_str = lambda b: str(b, 'utf8')
int_to_bytes = lambda i: i.to_bytes(ceil(i.bit_length() / 8), 'big')
bytes_to_int = lambda ba: int.from_bytes(ba, 'big')
np_load = lambda ba: np.loads(ba)
init_BYTES = lambda b: bytes(b)
comp_id = byte_eq_byte
char_to_byte = lambda ch: ord(ch)
BIN_TYPE = str
str_to_bytes = lambda s: s
bytes_to_str = lambda b: str(b)
LONG_TYPE = long
int_to_bytes = __int_to_bytes
bytes_to_int = __bytes_to_int
np_load = lambda ba: np.loads(str(ba))
init_BYTES = lambda b: str(bytearray(b))
comp_id = char_eq_byte
char_to_byte = lambda ch: ch
class BFLoadError(Exception):
def _dump_spec(ob):
if ob == True:
b = BYTES_CLASS([_SPEC, ord('T')])
b = init_BYTES([_SPEC, char_to_byte('T')])
elif ob == False:
b = BYTES_CLASS([_SPEC, ord('F')])
b = init_BYTES([_SPEC, char_to_byte('F')])
elif ob == None:
b = BYTES_CLASS([_SPEC, ord('N')])
b = init_BYTES([_SPEC, char_to_byte('N')])
raise RuntimeError("object is not of 'special' kind!")
return b
def _load_spec(b):
assert b[0] == _SPEC
if b[1] == ord('T'):
assert comp_id(b[0], _SPEC)
if b[1] == char_to_byte('T'):
return True, 2
elif b[1] == ord('F'):
elif b[1] == char_to_byte('F'):
return False, 2
elif b[1] == ord('N'):
elif b[1] == char_to_byte('N'):
return None, 2
raise BFLoadError("unknown code for 'special' {}".format(b[1]))
def _dump_int_32(ob):
b = BYTES_CLASS([_INT_32])
b = init_BYTES([_INT_32])
b += struct.pack('>i', ob)
return b
def _load_int_32(b):
assert b[0] == _INT_32
assert comp_id(b[0], _INT_32)
i = struct.unpack('>i', b[1:5])[0]
return i, 5
def _dump_int(ob):
if ob < 0:
b = init_BYTES([_INT_NEG])
ob *= -1
b = init_BYTES([_INT])
ib = int_to_bytes(ob)
num_bytes = len(ib)
@ -108,9 +121,9 @@ def _dump_int(ob):
return b
def _load_int(b):
if b[0] == _INT:
if comp_id(b[0], _INT):
m = 1
elif b[0] == _INT_NEG:
elif comp_id(b[0], _INT_NEG):
m = -1
assert False
@ -119,29 +132,29 @@ def _load_int(b):
return i, num_bytes + 5
def _dump_float(ob):
b = init_BYTES([_FLOAT])
b += struct.pack('>d', ob)
return b
def _load_float(b):
assert b[0] == _FLOAT
assert comp_id(b[0],_FLOAT)
f = struct.unpack('>d', b[1:9])[0]
return f, 9
def _dump_complex(ob):
b = init_BYTES([_COMPLEX])
b += struct.pack('>d', ob.real)
b += struct.pack('>d', ob.imag)
return b
def _load_complex(b):
assert b[0] == _COMPLEX
assert comp_id(b[0], _COMPLEX)
re = struct.unpack('>d', b[1:9])[0]
im = struct.unpack('>d', b[9:17])[0]
return re + 1j*im, 13
def _dump_str(ob):
b = init_BYTES([_STR])
str_bytes = str_to_bytes(ob)
num_bytes = len(str_bytes)
b += struct.pack('>I', num_bytes)
@ -149,26 +162,29 @@ def _dump_str(ob):
return b
def _load_str(b):
assert b[0] == _STR
assert comp_id(b[0], _STR)
num_bytes = struct.unpack('>I', b[1:5])[0]
s = bytes_to_str(b[5:5+num_bytes])
return s, 5+num_bytes
def _dump_bytes(ob):
b = init_BYTES([_BYTES])
num_bytes = len(ob)
b += struct.pack('>I', num_bytes)
b += ob
if isinstance(b, str):
b += str(ob)
b += ob
return b
def _load_bytes(b):
assert b[0] == _BYTES
assert comp_id(b[0], _BYTES)
num_bytes = struct.unpack('>I', b[1:5])[0]
b_ = b[5:5+num_bytes]
return b_, 5+num_bytes
def _dump_tuple(t):
b = init_BYTES([_TUPLE])
size = len(t)
b += struct.pack('>I', size)
for ti in t:
@ -176,7 +192,7 @@ def _dump_tuple(t):
return b
def _load_tuple(b):
assert b[0] == _TUPLE
assert comp_id(b[0], _TUPLE)
size = struct.unpack('>I', b[1:5])[0]
idx = 5
t = []
@ -187,7 +203,7 @@ def _load_tuple(b):
return tuple(t), idx
def _dump_namedtuple(t):
size = len(t)
b += struct.pack('>I', size)
b += _dump(t.__class__.__name__)
@ -196,7 +212,7 @@ def _dump_namedtuple(t):
return b
def _load_namedtuple(b):
assert b[0] == _NAMEDTUPLE
assert comp_id(b[0], _NAMEDTUPLE)
size = struct.unpack('>I', b[1:5])[0]
class_name, len_ob = _load(b[5:])
idx = 5 + len_ob
@ -208,7 +224,7 @@ def _load_namedtuple(b):
return (class_name, tuple(t)), idx
def _dump_list(t):
b = init_BYTES([_LIST])
size = len(t)
b += struct.pack('>I', size)
for ti in t:
@ -216,7 +232,7 @@ def _dump_list(t):
return b
def _load_list(b):
assert b[0] == _LIST
assert comp_id(b[0], _LIST)
size = struct.unpack('>I', b[1:5])[0]
idx = 5
t = []
@ -227,7 +243,7 @@ def _load_list(b):
return t, idx
def _dump_np_array(np_array):
b = init_BYTES([_NPARRAY])
nparray_bytes = np.ndarray.dumps(np_array)
size = len(nparray_bytes)
b += struct.pack('>I', size)
@ -235,13 +251,13 @@ def _dump_np_array(np_array):
return b
def _load_np_array(b):
assert b[0] == _NPARRAY
assert comp_id(b[0], _NPARRAY)
size = struct.unpack('>I', b[1:5])[0]
npa = np_load(b[5: size+5])
return npa, size+5
def _dump_getstate(ob):
b = init_BYTES([_GETSTATE])
state = ob.__getstate__()
obj_type = ob.__class__.__name__
b += _dump(str(obj_type))
@ -250,13 +266,13 @@ def _dump_getstate(ob):
return b
def _load_getstate(b):
assert b[0] == _GETSTATE
assert comp_id(b[0], _GETSTATE)
obj_type, l_obj_type = _load(b[1:])
state, l_state = _load(b[l_obj_type+1:])
return (obj_type, state), l_obj_type+l_state+1
def _dump_dict(ob):
b = init_BYTES([_DICT])
keys = ob.keys()
bin_keys = []
for k in keys:
@ -265,7 +281,7 @@ def _dump_dict(ob):
return b
def _load_dict(b):
assert b[0] == _DICT
assert comp_id(b[0], _DICT)
sorted_keys_value, l = _load_list(b[1:])
res_dict = {}
for i in range(len(sorted_keys_value)):
@ -289,7 +305,7 @@ def _dump(ob):
return _dump_complex(ob)
elif isinstance(ob, str):
return _dump_str(ob)
elif isinstance(ob, (bytearray, bytes)):
elif isinstance(ob, bytes):
return _dump_bytes(ob)
elif isinstance(ob, tuple):
if hasattr(ob, '_fields'):
@ -309,6 +325,8 @@ def _dump(ob):
def _load(b):
identifier = b[0]
if isinstance(identifier, str):
identifier = ord(identifier)
if identifier == _SPEC:
return _load_spec(b)
elif identifier == _INT_32:
@ -336,19 +354,17 @@ def _load(b):
elif identifier == _GETSTATE:
return _load_getstate(b)
if isinstance(identifier, str):
identifier = ord(identifier)
raise BFLoadError("unknown identifier '{}'".format(hex(identifier)))
def dump(ob):
returns the binary footprint of the object 'ob' as BYTES_CLASS
returns the binary footprint of the object 'ob' as bytes
return _dump(ob)
def load(b):
reconstruct the object from the binary footprint given an BYTES_CLASS 'ba'
reconstruct the object from the binary footprint given an bytes 'ba'
return _load(b)[0]

View file

@ -20,17 +20,21 @@ import warnings
def test_atom():
atoms = [12345678, 3.141, 'hallo Welt', 'öäüß', True, False, None, 2**65, -3**65, bytearray([54,234,2]), b'\xff\fe\03']
atoms = [12345678, 3.141, 'hallo Welt', 'öäüß', True, False, None, 2**65, -3**65, b'\xff\fe\03']
for atom in atoms:
bin_atom = bfp.dump(atom)
assert type(bin_atom) is bfp.BIN_TYPE
atom_prime = bfp.load(bin_atom)
bin_ob_prime = bfp.dump(atom_prime)
assert bin_atom == bin_ob_prime
def test_tuple():
t = (12345678, 3.141, 'hallo Welt', 'öäüß', True, False, None, (3, tuple(), (4,5,None), 'test'))
bin_tuple = bfp._dump_tuple(t)
assert type(bin_tuple) is bfp.BIN_TYPE
t_prime = bfp._load_tuple(bin_tuple)[0]
assert t == t_prime
bin_ob_prime = bfp._dump(t_prime)
@ -39,6 +43,7 @@ def test_tuple():
def test_nparray():
ob = np.random.randn(3,53,2)
bin_ob = bfp._dump(ob)
assert type(bin_ob) is bfp.BIN_TYPE
ob_prime = bfp._load(bin_ob)[0]
assert np.all(ob == ob_prime)
bin_ob_prime = bfp._dump(ob_prime)
@ -56,6 +61,7 @@ def test_nparray():
def test_list():
ob = [1,2,3]
bin_ob = bfp._dump(ob)
assert type(bin_ob) is bfp.BIN_TYPE
ob_prime = bfp._load(bin_ob)[0]
assert np.all(ob == ob_prime)
bin_ob_prime = bfp._dump(ob_prime)
@ -82,6 +88,7 @@ def test_getstate():
ob = T(4)
bin_ob = bfp._dump(ob)
assert type(bin_ob) is bfp.BIN_TYPE
ob_prime_state = bfp._load(bin_ob)[0]
ob_prime = T.__new__(T)
@ -96,6 +103,7 @@ def test_named_tuple():
obj = obj_type(12345678, 3.141, 'hallo Welt')
bin_obj = bfp._dump(obj)
assert type(bin_obj) is bfp.BIN_TYPE
obj_prime = bfp._load(bin_obj)[0]
obj_prime_name, obj_prime_data = obj_prime
@ -110,12 +118,14 @@ def test_named_tuple():
def test_complex():
z = 3+4j
bf = bfp.dump(z)
assert type(bf) is bfp.BIN_TYPE
zr = bfp.load(bf)
assert zr == z
def test_dict():
a = {'a':1, 5:5, 3+4j:'l', False: b'ab4+#'}
bf = bfp.dump(a)
assert type(bf) is bfp.BIN_TYPE
a_restored = bfp.load(bf)
for k in a: