mirror of
https://github.com/vale981/binfootprint
synced 2025-03-05 09:11:40 -05:00
move binfootprint from jobmanager
This commit is contained in:
commit
5697bf88da
4 changed files with 857 additions and 0 deletions
25
.travis.yml
Normal file
25
.travis.yml
Normal file
|
@ -0,0 +1,25 @@
|
|||
language: python
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
|
||||
python:
|
||||
- '2.7'
|
||||
- '3.4'
|
||||
- '3.5'
|
||||
|
||||
before_install:
|
||||
- which python
|
||||
- python --version
|
||||
|
||||
install:
|
||||
- pip install numpy
|
||||
- pip install pytest pytest-cov
|
||||
- pip install python-coveralls
|
||||
- pip freeze
|
||||
|
||||
script:
|
||||
- py.test tests/ -v --cov=binfootprint --cov-report term-missing
|
||||
|
||||
after_success:
|
||||
- coveralls
|
59
README.md
Normal file
59
README.md
Normal file
|
@ -0,0 +1,59 @@
|
|||
# binfootprint
|
||||
|
||||
Calculate a unique binary representation (binary footprint) for simple data structures
|
||||
with the intension to use this binary footprint as a loop up key for example in a data base.
|
||||
|
||||
The following atomic types are supported:
|
||||
* integer (32bit and python integer)
|
||||
* float (the usual 64bit)
|
||||
* complex (the usual 2 times 64bit)
|
||||
* string
|
||||
* bytes
|
||||
* numpy ndarray (see note below)
|
||||
|
||||
These atomic types can be structured arbitrarily nested using the following python standard containers:
|
||||
* tuple
|
||||
* named tuple
|
||||
* list
|
||||
* dict
|
||||
|
||||
Generating the binary footprint and reconstruction is done as follows:
|
||||
|
||||
```python
|
||||
import binfootprint as bf
|
||||
|
||||
data = ['hallo', 42]
|
||||
bin_key = bf.dump(data)
|
||||
|
||||
data_prime = bf.load(bin_key)
|
||||
print(data_prime)
|
||||
```
|
||||
|
||||
Further any class that implements __getstate__ may be used as a container as well. When reconstructing, the class needs to have the __setstate__ method implemented.
|
||||
Additionally the `bf.load` function required a mapping from the class name to the class object, like this:
|
||||
```python
|
||||
|
||||
import binfootprint as bf
|
||||
|
||||
class T(object):
|
||||
def __init__(self, a):
|
||||
self.a = a
|
||||
def __getstate__(self):
|
||||
return [self.a]
|
||||
def __setstate__(self, state):
|
||||
self.a = state[0]
|
||||
|
||||
ob = T(4)
|
||||
bin_ob = bf.dump(ob)
|
||||
|
||||
# reconstruction
|
||||
classes = {}
|
||||
classes['T'] = T
|
||||
ob_prime = bf.load(bin_ob, classes)
|
||||
|
||||
```
|
||||
|
||||
### Note on numpy ndarrays
|
||||
|
||||
As it has not been clarified/tested yet whether the buffer of the numpy ndarray is really unique also on different machines and architectures
|
||||
is it not assured that the binary footprint serves as a valid key.
|
593
binfootprint/binfootprint.py
Normal file
593
binfootprint/binfootprint.py
Normal file
|
@ -0,0 +1,593 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import division, print_function
|
||||
|
||||
"""
|
||||
This module intents to generate a binary representation of a python object
|
||||
where it is guaranteed that the same objects will result in the same binary
|
||||
representation.
|
||||
|
||||
By far not all python objects are supported. Here is the list of supported types
|
||||
|
||||
- special build-in constants: True, False, None
|
||||
- integer
|
||||
- float (64bit)
|
||||
- complex (128bit)
|
||||
- np.ndarray
|
||||
- list
|
||||
- tuple
|
||||
- dictionary
|
||||
- namedtuple (new since version 0x80, before it also need the 'classes' lookup when loaded)
|
||||
|
||||
For any nested combination of these objects it is also guaranteed that the
|
||||
original objects can be restored without any extra information.
|
||||
|
||||
Additionally
|
||||
|
||||
- 'getstate' (objects that implement __getstate__ and return a state that can be dumped as well)
|
||||
|
||||
can be dumped. To Restore these objects the load function needs a lookup given by the argument 'classes'
|
||||
which maps the objects class name (obj.__class__.__name__) to the actual class definition (the class object).
|
||||
Of course for these objects the __setstate__ method needs to be implemented.
|
||||
|
||||
NOTE: the tests pass python2.7 and python 3.4 so far, but it not yet been tested if the binary representation
|
||||
is the same among different python versions (they should be though!)
|
||||
"""
|
||||
|
||||
from collections import namedtuple
|
||||
from math import ceil
|
||||
import numpy as np
|
||||
import struct
|
||||
from sys import version_info
|
||||
|
||||
_spec_types = (bool, type(None))
|
||||
|
||||
_SPEC = 0x00 # True, False, None
|
||||
_INT_32 = 0x01
|
||||
_FLOAT = 0x02
|
||||
_COMPLEX = 0x03
|
||||
_STR = 0x04
|
||||
_BYTES = 0x05
|
||||
_INT = 0x06
|
||||
_TUPLE = 0x07
|
||||
_NAMEDTUPLE = 0x08
|
||||
_NPARRAY = 0x09
|
||||
_LIST = 0x0a
|
||||
_GETSTATE = 0x0b
|
||||
_DICT = 0x0c
|
||||
_INT_NEG = 0x0d
|
||||
_BFSTATE = 0x0e
|
||||
|
||||
_VERS = 0x80
|
||||
def getVersion():
|
||||
return _VERS
|
||||
|
||||
__max_int32 = +2147483647
|
||||
__min_int32 = -2147483648
|
||||
|
||||
def __int_to_bytes(i):
|
||||
m = 0xff
|
||||
assert i >= 0
|
||||
ba = str()
|
||||
while i > 0:
|
||||
b = i & m
|
||||
ba += str(bytearray([b]))
|
||||
i = i >> 8
|
||||
return ba[::-1]
|
||||
|
||||
def __bytes_to_int(ba):
|
||||
i = 0
|
||||
for b in ba:
|
||||
i = i << 8
|
||||
i += ord(b)
|
||||
return i
|
||||
|
||||
def char_eq_byte(ch, b):
|
||||
return ord(ch) == b
|
||||
|
||||
def byte_eq_byte(b1, b2):
|
||||
return b1 == b2
|
||||
|
||||
|
||||
|
||||
if version_info.major > 2:
|
||||
BIN_TYPE = bytes
|
||||
str_to_bytes = lambda s: bytes(s, 'utf8')
|
||||
bytes_to_str = lambda b: str(b, 'utf8')
|
||||
LONG_TYPE = int
|
||||
np_load = lambda ba: np.loads(ba)
|
||||
init_BYTES = lambda b: bytes(b)
|
||||
comp_id = byte_eq_byte
|
||||
char_to_byte = lambda ch: ord(ch)
|
||||
byte_to_ord = lambda b: b
|
||||
else:
|
||||
BIN_TYPE = str
|
||||
str_to_bytes = lambda s: s
|
||||
bytes_to_str = lambda b: str(b)
|
||||
LONG_TYPE = long
|
||||
np_load = lambda ba: np.loads(str(ba))
|
||||
init_BYTES = lambda b: str(bytearray(b))
|
||||
comp_id = char_eq_byte
|
||||
char_to_byte = lambda ch: ch
|
||||
byte_to_ord = lambda b: ord(b)
|
||||
|
||||
|
||||
int_to_bytes = lambda i: i.to_bytes(ceil(i.bit_length() / 8), 'big')
|
||||
bytes_to_int = lambda ba: int.from_bytes(ba, 'big')
|
||||
|
||||
try:
|
||||
int_to_bytes(2**77)
|
||||
except AttributeError:
|
||||
int_to_bytes = __int_to_bytes
|
||||
|
||||
__b_tmp = int_to_bytes(2**77)
|
||||
|
||||
try:
|
||||
bytes_to_int(__b_tmp)
|
||||
except AttributeError:
|
||||
bytes_to_int = __bytes_to_int
|
||||
|
||||
assert bytes_to_int(__b_tmp) == 2**77
|
||||
|
||||
class BFLoadError(Exception):
|
||||
pass
|
||||
|
||||
class BFUnkownClassError(Exception):
|
||||
def __init__(self, classname):
|
||||
Exception.__init__(self, "could not load object of type '{}', no class definition found in classes\n".format(classname)+
|
||||
"Please provide the lookup 'classes' when calling load, that maps the class name of the object to the actual "+
|
||||
"class definition (class object).")
|
||||
|
||||
def _dump_spec(ob):
|
||||
if ob == True:
|
||||
b = init_BYTES([_SPEC, char_to_byte('T')])
|
||||
elif ob == False:
|
||||
b = init_BYTES([_SPEC, char_to_byte('F')])
|
||||
elif ob == None:
|
||||
b = init_BYTES([_SPEC, char_to_byte('N')])
|
||||
else:
|
||||
raise RuntimeError("object is not of 'special' kind!")
|
||||
return b
|
||||
|
||||
def _load_spec(b):
|
||||
assert comp_id(b[0], _SPEC)
|
||||
if b[1] == char_to_byte('T'):
|
||||
return True, 2
|
||||
elif b[1] == char_to_byte('F'):
|
||||
return False, 2
|
||||
elif b[1] == char_to_byte('N'):
|
||||
return None, 2
|
||||
else:
|
||||
raise BFLoadError("unknown code for 'special' {}".format(b[1]))
|
||||
|
||||
def _dump_int_32(ob):
|
||||
b = init_BYTES([_INT_32])
|
||||
b += struct.pack('>i', ob)
|
||||
return b
|
||||
|
||||
def _load_int_32(b):
|
||||
assert comp_id(b[0], _INT_32)
|
||||
i = struct.unpack('>i', b[1:5])[0]
|
||||
return i, 5
|
||||
|
||||
def _dump_int(ob):
|
||||
if ob < 0:
|
||||
b = init_BYTES([_INT_NEG])
|
||||
ob *= -1
|
||||
else:
|
||||
b = init_BYTES([_INT])
|
||||
|
||||
ib = int_to_bytes(ob)
|
||||
num_bytes = len(ib)
|
||||
b += struct.pack('>I', num_bytes)
|
||||
b += ib
|
||||
return b
|
||||
|
||||
def _load_int(b):
|
||||
if comp_id(b[0], _INT):
|
||||
m = 1
|
||||
elif comp_id(b[0], _INT_NEG):
|
||||
m = -1
|
||||
else:
|
||||
assert False
|
||||
num_bytes = struct.unpack('>I', b[1:5])[0]
|
||||
i = m*bytes_to_int(b[5:5+num_bytes])
|
||||
return i, num_bytes + 5
|
||||
|
||||
def _dump_float(ob):
|
||||
b = init_BYTES([_FLOAT])
|
||||
b += struct.pack('>d', ob)
|
||||
return b
|
||||
|
||||
def _load_float(b):
|
||||
assert comp_id(b[0],_FLOAT)
|
||||
f = struct.unpack('>d', b[1:9])[0]
|
||||
return f, 9
|
||||
|
||||
def _dump_complex(ob):
|
||||
b = init_BYTES([_COMPLEX])
|
||||
b += struct.pack('>d', ob.real)
|
||||
b += struct.pack('>d', ob.imag)
|
||||
return b
|
||||
|
||||
def _load_complex(b):
|
||||
assert comp_id(b[0], _COMPLEX)
|
||||
re = struct.unpack('>d', b[1:9])[0]
|
||||
im = struct.unpack('>d', b[9:17])[0]
|
||||
return re + 1j*im, 13
|
||||
|
||||
def _dump_str(ob):
|
||||
b = init_BYTES([_STR])
|
||||
str_bytes = str_to_bytes(ob)
|
||||
num_bytes = len(str_bytes)
|
||||
b += struct.pack('>I', num_bytes)
|
||||
b += str_bytes
|
||||
return b
|
||||
|
||||
def _load_str(b):
|
||||
assert comp_id(b[0], _STR)
|
||||
num_bytes = struct.unpack('>I', b[1:5])[0]
|
||||
s = bytes_to_str(b[5:5+num_bytes])
|
||||
return s, 5+num_bytes
|
||||
|
||||
def _dump_bytes(ob):
|
||||
b = init_BYTES([_BYTES])
|
||||
num_bytes = len(ob)
|
||||
b += struct.pack('>I', num_bytes)
|
||||
if isinstance(b, str):
|
||||
b += str(ob)
|
||||
else:
|
||||
b += ob
|
||||
return b
|
||||
|
||||
def _load_bytes(b):
|
||||
assert comp_id(b[0], _BYTES)
|
||||
num_bytes = struct.unpack('>I', b[1:5])[0]
|
||||
b_ = b[5:5+num_bytes]
|
||||
return b_, 5+num_bytes
|
||||
|
||||
def _dump_tuple(t):
|
||||
b = init_BYTES([_TUPLE])
|
||||
size = len(t)
|
||||
b += struct.pack('>I', size)
|
||||
for ti in t:
|
||||
b += _dump(ti)
|
||||
return b
|
||||
|
||||
def _load_tuple(b, classes):
|
||||
assert comp_id(b[0], _TUPLE)
|
||||
size = struct.unpack('>I', b[1:5])[0]
|
||||
idx = 5
|
||||
t = []
|
||||
for i in range(size):
|
||||
ob, len_ob = _load(b[idx:], classes)
|
||||
t.append(ob)
|
||||
idx += len_ob
|
||||
return tuple(t), idx
|
||||
|
||||
|
||||
|
||||
def _dump_namedtuple(t):
|
||||
b = init_BYTES([_NAMEDTUPLE])
|
||||
size = len(t)
|
||||
b += struct.pack('>I', size)
|
||||
b += _dump(t.__class__.__name__)
|
||||
for i in range(size):
|
||||
b += _dump(t._fields[i])
|
||||
b += _dump(t[i])
|
||||
return b
|
||||
|
||||
def _load_namedtuple(b, classes):
|
||||
assert comp_id(b[0], _NAMEDTUPLE)
|
||||
size = struct.unpack('>I', b[1:5])[0]
|
||||
class_name, len_ob = _load_str(b[5:])
|
||||
idx = 5 + len_ob
|
||||
t = []
|
||||
fields = []
|
||||
for i in range(size):
|
||||
ob, len_ob = _load(b[idx:], classes)
|
||||
fields.append(ob)
|
||||
idx += len_ob
|
||||
|
||||
ob, len_ob = _load(b[idx:], classes)
|
||||
t.append(ob)
|
||||
idx += len_ob
|
||||
|
||||
np_class = namedtuple(class_name, fields)
|
||||
np_obj = np_class(*t)
|
||||
|
||||
return np_obj, idx
|
||||
|
||||
def _dump_list(t):
|
||||
b = init_BYTES([_LIST])
|
||||
size = len(t)
|
||||
b += struct.pack('>I', size)
|
||||
for ti in t:
|
||||
b += _dump(ti)
|
||||
return b
|
||||
|
||||
def _load_list(b, classes):
|
||||
assert comp_id(b[0], _LIST)
|
||||
size = struct.unpack('>I', b[1:5])[0]
|
||||
idx = 5
|
||||
t = []
|
||||
for i in range(size):
|
||||
ob, len_ob = _load(b[idx:], classes)
|
||||
t.append(ob)
|
||||
idx += len_ob
|
||||
return t, idx
|
||||
|
||||
def _dump_np_array(np_array):
|
||||
b = init_BYTES([_NPARRAY])
|
||||
nparray_bytes = np.ndarray.dumps(np_array)
|
||||
size = len(nparray_bytes)
|
||||
b += struct.pack('>I', size)
|
||||
b += nparray_bytes
|
||||
return b
|
||||
|
||||
def _load_np_array(b):
|
||||
assert comp_id(b[0], _NPARRAY)
|
||||
size = struct.unpack('>I', b[1:5])[0]
|
||||
npa = np_load(b[5: size+5])
|
||||
return npa, size+5
|
||||
|
||||
def _dump_getstate(ob):
|
||||
b = init_BYTES([_GETSTATE])
|
||||
state = ob.__getstate__()
|
||||
obj_type = ob.__class__.__name__
|
||||
b += _dump(str(obj_type))
|
||||
b += _dump(state)
|
||||
|
||||
return b
|
||||
|
||||
# def _dump_bfstate(ob):
|
||||
# b = init_BYTES([_BFSTATE])
|
||||
# state = ob.__bfstate__()
|
||||
# obj_type = ob.__class__.__name__
|
||||
# b += _dump(str(obj_type))
|
||||
# b += _dump(state)
|
||||
# return b
|
||||
|
||||
def _load_getstate(b, classes):
|
||||
assert comp_id(b[0], _GETSTATE)
|
||||
obj_type, l_obj_type = _load_str(b[1:])
|
||||
state, l_state = _load(b[l_obj_type+1:], classes)
|
||||
try:
|
||||
cls = classes[obj_type]
|
||||
except KeyError:
|
||||
raise BFUnkownClassError(obj_type)
|
||||
obj = cls.__new__(cls)
|
||||
obj.__setstate__(state)
|
||||
return obj, l_obj_type+l_state+1
|
||||
|
||||
def _dump_dict(ob):
|
||||
b = init_BYTES([_DICT])
|
||||
keys = ob.keys()
|
||||
bin_keys = []
|
||||
for k in keys:
|
||||
bin_keys.append( (_dump(k), _dump(ob[k])) )
|
||||
b += _dump_list(sorted(bin_keys))
|
||||
return b
|
||||
|
||||
def _load_dict(b, classes):
|
||||
assert comp_id(b[0], _DICT)
|
||||
sorted_keys_value, l = _load_list(b[1:], classes)
|
||||
res_dict = {}
|
||||
for i in range(len(sorted_keys_value)):
|
||||
key = _load(sorted_keys_value[i][0], classes)[0]
|
||||
value = _load(sorted_keys_value[i][1], classes)[0]
|
||||
res_dict[key] = value
|
||||
|
||||
return res_dict, l+1
|
||||
|
||||
def _dump(ob):
|
||||
if isinstance(ob, _spec_types):
|
||||
return _dump_spec(ob)
|
||||
elif isinstance(ob, (int, LONG_TYPE)):
|
||||
if (__min_int32 <= ob) and (ob <= __max_int32):
|
||||
return _dump_int_32(ob)
|
||||
else:
|
||||
return _dump_int(ob)
|
||||
elif isinstance(ob, float):
|
||||
return _dump_float(ob)
|
||||
elif isinstance(ob, complex):
|
||||
return _dump_complex(ob)
|
||||
elif isinstance(ob, str):
|
||||
return _dump_str(ob)
|
||||
elif isinstance(ob, bytes):
|
||||
return _dump_bytes(ob)
|
||||
elif isinstance(ob, tuple):
|
||||
if hasattr(ob, '_fields'):
|
||||
return _dump_namedtuple(ob)
|
||||
else:
|
||||
return _dump_tuple(ob)
|
||||
elif isinstance(ob, list):
|
||||
return _dump_list(ob)
|
||||
elif isinstance(ob, np.ndarray):
|
||||
return _dump_np_array(ob)
|
||||
elif isinstance(ob, dict):
|
||||
return _dump_dict(ob)
|
||||
elif hasattr(ob, '__getstate__'):
|
||||
return _dump_getstate(ob)
|
||||
elif hasattr(ob, '__bfstate__'):
|
||||
return _dump_bfstate(ob)
|
||||
else:
|
||||
raise RuntimeError("unsupported type for dump '{}'".format(type(ob)))
|
||||
|
||||
def _load(b, classes):
|
||||
identifier = b[0]
|
||||
if isinstance(identifier, str):
|
||||
identifier = ord(identifier)
|
||||
if identifier == _SPEC:
|
||||
return _load_spec(b)
|
||||
elif identifier == _INT_32:
|
||||
return _load_int_32(b)
|
||||
elif (identifier == _INT) or (identifier == _INT_NEG):
|
||||
return _load_int(b)
|
||||
elif identifier == _FLOAT:
|
||||
return _load_float(b)
|
||||
elif identifier == _COMPLEX:
|
||||
return _load_complex(b)
|
||||
elif identifier == _STR:
|
||||
return _load_str(b)
|
||||
elif identifier == _BYTES:
|
||||
return _load_bytes(b)
|
||||
elif identifier == _TUPLE:
|
||||
return _load_tuple(b, classes)
|
||||
elif identifier == _NAMEDTUPLE:
|
||||
return _load_namedtuple(b, classes)
|
||||
elif identifier == _LIST:
|
||||
return _load_list(b, classes)
|
||||
elif identifier == _NPARRAY:
|
||||
return _load_np_array(b)
|
||||
elif identifier == _DICT:
|
||||
return _load_dict(b, classes)
|
||||
elif identifier == _GETSTATE:
|
||||
return _load_getstate(b, classes)
|
||||
elif identifier == _BFSTATE:
|
||||
raise BFLoadError("BFSTATE objects can not be loaded")
|
||||
else:
|
||||
raise BFLoadError("unknown identifier '{}'".format(hex(identifier)))
|
||||
|
||||
def dump(ob, vers=_VERS):
|
||||
"""
|
||||
returns the binary footprint of the object 'ob' as bytes
|
||||
"""
|
||||
global _dump
|
||||
if vers == _VERS:
|
||||
return init_BYTES([_VERS]) + _dump(ob)
|
||||
elif vers < 0x80:
|
||||
__dump_tmp = _dump
|
||||
_dump = _dump_00
|
||||
try:
|
||||
res = _dump(ob)
|
||||
finally:
|
||||
_dump = __dump_tmp
|
||||
|
||||
return res
|
||||
|
||||
def load(b, classes={}):
|
||||
"""
|
||||
reconstruct the object from the binary footprint given an bytes 'ba'
|
||||
"""
|
||||
global _load
|
||||
vers = b[0]
|
||||
if byte_to_ord(vers) == _VERS:
|
||||
return _load(b[1:], classes)[0]
|
||||
elif byte_to_ord(vers) < 0x80:
|
||||
# very first version
|
||||
# has not even a version tag
|
||||
__load_tmp = _load
|
||||
_load = _load_00
|
||||
try:
|
||||
res = _load(b, classes)[0]
|
||||
finally:
|
||||
_load = __load_tmp
|
||||
|
||||
return res
|
||||
else:
|
||||
raise RuntimeError("unknown version tag found!")
|
||||
|
||||
|
||||
##################################################################
|
||||
####
|
||||
#### VERY FIRST VERSION -- NO VERSION TAG
|
||||
####
|
||||
##################################################################
|
||||
#
|
||||
# so the first two bytes must correspond to an identifier which are assumed
|
||||
# to be < 128 = 0x80
|
||||
|
||||
def _load_namedtuple_00(b, classes):
|
||||
assert comp_id(b[0], _NAMEDTUPLE)
|
||||
size = struct.unpack('>I', b[1:5])[0]
|
||||
class_name, len_ob = _load_str(b[5:])
|
||||
idx = 5 + len_ob
|
||||
t = []
|
||||
for i in range(size):
|
||||
ob, len_ob = _load(b[idx:], classes)
|
||||
t.append(ob)
|
||||
idx += len_ob
|
||||
try:
|
||||
np_class = classes[class_name]
|
||||
except KeyError:
|
||||
raise BFUnkownClassError(class_name)
|
||||
|
||||
obj = np_class(*t)
|
||||
|
||||
return obj, idx
|
||||
|
||||
|
||||
def _dump_namedtuple_00(t):
|
||||
b = init_BYTES([_NAMEDTUPLE])
|
||||
size = len(t)
|
||||
b += struct.pack('>I', size)
|
||||
b += _dump(t.__class__.__name__)
|
||||
for ti in t:
|
||||
b += _dump(ti)
|
||||
return b
|
||||
|
||||
|
||||
def _load_00(b, classes):
|
||||
identifier = b[0]
|
||||
if isinstance(identifier, str):
|
||||
identifier = ord(identifier)
|
||||
if identifier == _SPEC:
|
||||
return _load_spec(b)
|
||||
elif identifier == _INT_32:
|
||||
return _load_int_32(b)
|
||||
elif (identifier == _INT) or (identifier == _INT_NEG):
|
||||
return _load_int(b)
|
||||
elif identifier == _FLOAT:
|
||||
return _load_float(b)
|
||||
elif identifier == _COMPLEX:
|
||||
return _load_complex(b)
|
||||
elif identifier == _STR:
|
||||
return _load_str(b)
|
||||
elif identifier == _BYTES:
|
||||
return _load_bytes(b)
|
||||
elif identifier == _TUPLE:
|
||||
return _load_tuple(b, classes)
|
||||
elif identifier == _NAMEDTUPLE:
|
||||
return _load_namedtuple_00(b, classes)
|
||||
elif identifier == _LIST:
|
||||
return _load_list(b, classes)
|
||||
elif identifier == _NPARRAY:
|
||||
return _load_np_array(b)
|
||||
elif identifier == _DICT:
|
||||
return _load_dict(b, classes)
|
||||
elif identifier == _GETSTATE:
|
||||
return _load_getstate(b, classes)
|
||||
else:
|
||||
raise BFLoadError("unknown identifier '{}'".format(hex(identifier)))
|
||||
|
||||
def _dump_00(ob):
|
||||
if isinstance(ob, _spec_types):
|
||||
return _dump_spec(ob)
|
||||
elif isinstance(ob, (int, LONG_TYPE)):
|
||||
if (__min_int32 <= ob) and (ob <= __max_int32):
|
||||
return _dump_int_32(ob)
|
||||
else:
|
||||
return _dump_int(ob)
|
||||
elif isinstance(ob, float):
|
||||
return _dump_float(ob)
|
||||
elif isinstance(ob, complex):
|
||||
return _dump_complex(ob)
|
||||
elif isinstance(ob, str):
|
||||
return _dump_str(ob)
|
||||
elif isinstance(ob, bytes):
|
||||
return _dump_bytes(ob)
|
||||
elif isinstance(ob, tuple):
|
||||
if hasattr(ob, '_fields'):
|
||||
return _dump_namedtuple_00(ob)
|
||||
else:
|
||||
return _dump_tuple(ob)
|
||||
elif isinstance(ob, list):
|
||||
return _dump_list(ob)
|
||||
elif isinstance(ob, np.ndarray):
|
||||
return _dump_np_array(ob)
|
||||
elif isinstance(ob, dict):
|
||||
return _dump_dict(ob)
|
||||
elif hasattr(ob, '__getstate__'):
|
||||
return _dump_getstate(ob)
|
||||
else:
|
||||
raise RuntimeError("unsupported type for dump '{}'".format(type(ob)))
|
180
tests/test_binfootprint.py
Normal file
180
tests/test_binfootprint.py
Normal file
|
@ -0,0 +1,180 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import division, print_function
|
||||
|
||||
import sys
|
||||
|
||||
if sys.version_info.major > 2:
|
||||
import pathlib
|
||||
path = pathlib.PosixPath(__file__).absolute()
|
||||
jobmanager = path.parent.parent
|
||||
sys.path.insert(0, str(jobmanager))
|
||||
else:
|
||||
from os.path import abspath, dirname, split
|
||||
# Add parent directory to beginning of path variable
|
||||
sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
|
||||
|
||||
import binfootprint as bfp
|
||||
|
||||
import numpy as np
|
||||
from collections import namedtuple
|
||||
|
||||
import warnings
|
||||
warnings.filterwarnings('error')
|
||||
|
||||
def test_version_tag():
|
||||
ob = 5
|
||||
binob = bfp.dump(ob)
|
||||
assert bfp.byte_to_ord(binob[0]) == bfp.getVersion()
|
||||
|
||||
def test_atom():
|
||||
atoms = [12345678, 3.141, 'hallo Welt', 'öäüß', True, False, None, 2**65, -3**65, b'\xff\fe\03']
|
||||
|
||||
for atom in atoms:
|
||||
bin_atom = bfp.dump(atom)
|
||||
atom_prime = bfp.load(bin_atom)
|
||||
bin_ob_prime = bfp.dump(atom_prime)
|
||||
assert bin_atom == bin_ob_prime
|
||||
|
||||
hash(bin_atom)
|
||||
|
||||
def test_tuple():
|
||||
t = (12345678, 3.141, 'hallo Welt', 'öäüß', True, False, None, (3, tuple(), (4,5,None), 'test'))
|
||||
bin_tuple = bfp.dump(t)
|
||||
assert type(bin_tuple) is bfp.BIN_TYPE
|
||||
t_prime = bfp.load(bin_tuple)
|
||||
assert t == t_prime
|
||||
bin_ob_prime = bfp.dump(t_prime)
|
||||
assert bin_tuple == bin_ob_prime
|
||||
|
||||
def test_nparray():
|
||||
ob = np.random.randn(3,53,2)
|
||||
bin_ob = bfp.dump(ob)
|
||||
assert type(bin_ob) is bfp.BIN_TYPE
|
||||
ob_prime = bfp.load(bin_ob)
|
||||
assert np.all(ob == ob_prime)
|
||||
bin_ob_prime = bfp.dump(ob_prime)
|
||||
assert bin_ob == bin_ob_prime
|
||||
|
||||
ob = np.random.randn(3,53,2)
|
||||
ob = (ob, ob, 4, None)
|
||||
bin_ob = bfp.dump(ob)
|
||||
ob_prime = bfp.load(bin_ob)
|
||||
assert np.all(ob[0] == ob_prime[0])
|
||||
assert np.all(ob[1] == ob_prime[1])
|
||||
bin_ob_prime = bfp.dump(ob_prime)
|
||||
assert bin_ob == bin_ob_prime
|
||||
|
||||
def test_list():
|
||||
ob = [1,2,3]
|
||||
bin_ob = bfp.dump(ob)
|
||||
assert type(bin_ob) is bfp.BIN_TYPE
|
||||
ob_prime = bfp.load(bin_ob)
|
||||
assert np.all(ob == ob_prime)
|
||||
bin_ob_prime = bfp.dump(ob_prime)
|
||||
assert bin_ob == bin_ob_prime
|
||||
|
||||
ob = [1, (2,3), np.array([2j,3j])]
|
||||
bin_ob = bfp.dump(ob)
|
||||
ob_prime = bfp.load(bin_ob)
|
||||
bin_ob_prime = bfp.dump(ob_prime)
|
||||
assert bin_ob == bin_ob_prime
|
||||
|
||||
assert np.all(ob[0] == ob_prime[0])
|
||||
assert np.all(ob[1] == ob_prime[1])
|
||||
assert np.all(ob[2] == ob_prime[2])
|
||||
|
||||
def test_getstate():
|
||||
class T(object):
|
||||
def __init__(self, a):
|
||||
self.a = a
|
||||
def __getstate__(self):
|
||||
return [self.a]
|
||||
def __setstate__(self, state):
|
||||
self.a = state[0]
|
||||
|
||||
ob = T(4)
|
||||
bin_ob = bfp.dump(ob)
|
||||
assert type(bin_ob) is bfp.BIN_TYPE
|
||||
|
||||
classes = {}
|
||||
classes['T'] = T
|
||||
|
||||
ob_prime = bfp.load(bin_ob, classes)
|
||||
|
||||
assert np.all(ob.a == ob_prime.a)
|
||||
bin_ob_prime = bfp.dump(ob_prime)
|
||||
assert bin_ob == bin_ob_prime
|
||||
|
||||
try:
|
||||
ob_prime = bfp.load(bin_ob)
|
||||
except bfp.BFUnkownClassError:
|
||||
pass
|
||||
else:
|
||||
assert False, "binfootprint.BFUnkownClassError should have been raised"
|
||||
|
||||
def test_named_tuple():
|
||||
obj_type = namedtuple('obj_type', ['a','b','c'])
|
||||
|
||||
obj = obj_type(12345678, 3.141, 'hallo Welt')
|
||||
|
||||
bin_obj = bfp.dump(obj)
|
||||
assert type(bin_obj) is bfp.BIN_TYPE
|
||||
obj_prime = bfp.load(bin_obj)
|
||||
assert obj_prime.__class__.__name__ == obj.__class__.__name__
|
||||
assert obj_prime._fields == obj._fields
|
||||
assert obj_prime == obj
|
||||
bin_ob_prime = bfp.dump(obj_prime)
|
||||
assert bin_obj == bin_ob_prime
|
||||
|
||||
def test_complex():
|
||||
z = 3+4j
|
||||
bf = bfp.dump(z)
|
||||
assert type(bf) is bfp.BIN_TYPE
|
||||
zr = bfp.load(bf)
|
||||
assert zr == z
|
||||
|
||||
def test_dict():
|
||||
a = {'a':1, 5:5, 3+4j:'l', False: b'ab4+#'}
|
||||
bf = bfp.dump(a)
|
||||
assert type(bf) is bfp.BIN_TYPE
|
||||
a_restored = bfp.load(bf)
|
||||
for k in a:
|
||||
assert a[k] == a_restored[k]
|
||||
|
||||
def test_versions():
|
||||
nt = namedtuple('nt', ['x', 'y'])
|
||||
n = nt(4,5)
|
||||
n2 = nt(n, n)
|
||||
ob = [3, n, n2]
|
||||
|
||||
binob = bfp.dump(ob, vers = 0)
|
||||
try:
|
||||
bfp.load(binob)
|
||||
except bfp.BFUnkownClassError:
|
||||
pass
|
||||
else:
|
||||
assert False, "binfootprint.BFUnkownClassError should have been raised"
|
||||
|
||||
rest_ob = bfp.load(binob, {'nt': nt})
|
||||
assert rest_ob == ob
|
||||
|
||||
binob = bfp.dump(ob, vers = 0x80)
|
||||
rest_ob = bfp.load(binob)
|
||||
assert rest_ob == ob
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_version_tag()
|
||||
test_atom()
|
||||
test_tuple()
|
||||
test_nparray()
|
||||
test_list()
|
||||
test_getstate()
|
||||
test_named_tuple()
|
||||
test_complex()
|
||||
test_dict()
|
||||
test_versions()
|
||||
|
||||
|
Loading…
Add table
Reference in a new issue