progress on HDF5 backend

Richard Hartmann 2016-08-30 00:35:02 +02:00
parent 955e3dfd5b
commit 69597b011f
2 changed files with 326 additions and 353 deletions

@@ -12,6 +12,8 @@ import pickle
import warnings
import random
from . import binfootprint as bf
import progress
try:
@@ -580,6 +582,7 @@ class PersistentDataStructure_HDF5(object):
self.__classname = self.__class__.__name__
self.verbose = verbose
if gr is None:
self._is_group = False
self._open = False
self._name = name
self._path = abspath(path)
@@ -590,57 +593,108 @@ class PersistentDataStructure_HDF5(object):
self._filename = join(self._path, self._name + '.hdf5')
self.open()
else:
self._is_group = True
self._open = True
self._name = None
self._path = None
self._filename = None
self.db = gr
def _md5(self, key):
if isinstance(key, str):
key = key.encode('utf8')
return hashlib.md5(key).hexdigest()
def _convkey(self, key):
if isinstance(key, str):
return key
elif isinstance(key, (bytearray, bytes)):
return np.void(key)
if not isinstance(key, (bytes, bytearray)):
key = bf.dump(key)
return key
def _md5(self, binkey):
return hashlib.md5(binkey).hexdigest()
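
Note: the reworked key handling first normalizes every key to a byte string and only then hashes it, so str, bytes and arbitrary picklable objects share one code path: bf.dump (from the bundled binfootprint module) serializes the key, and the md5 hex digest of those bytes names the bucket group inside the HDF5 file. A minimal standalone sketch, assuming binfootprint is importable as a top-level package:

    import hashlib
    import binfootprint as bf   # assumption: standalone install of the bundled module

    def convkey(key):
        # reduce any key to a deterministic byte string
        if not isinstance(key, (bytes, bytearray)):
            key = bf.dump(key)
        return key

    binkey = convkey(('tuple', 'key', 42))
    bucket_name = hashlib.md5(binkey).hexdigest()   # 32 hex chars -> bucket group name
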
def __create_group(self, key, overwrite):
binkey = self._convkey(key)
_md5 = self._md5(binkey)
try:
gr_md5 = self.db[_md5]
except KeyError:
gr_md5 = self.db.create_group(_md5)
gr_md5.attrs['cnt'] = 0
for k in gr_md5:
test_binkey = gr_md5[k].attrs['key'].tostring()
if binkey == test_binkey:
if not overwrite:
raise KeyError("key exists but overwrite == False")
del gr_md5[k]
break
name = "gr{}".format(gr_md5.attrs['cnt'])
gr = gr_md5.create_group(name)
gr_md5.attrs['cnt'] = gr_md5.attrs['cnt'] + 1
gr.attrs['key'] = np.void(binkey)
return gr, binkey
def __set_dataset(self, key, data, overwrite):
binkey = self._convkey(key)
_md5 = self._md5(binkey)
try:
gr_md5 = self.db[_md5]
except KeyError:
gr_md5 = self.db.create_group(_md5)
gr_md5.attrs['cnt'] = 0
for k in gr_md5:
test_binkey = gr_md5[k].attrs['key'].tostring()
if binkey == test_binkey:
if not overwrite:
raise KeyError("key exists but overwrite == False")
del gr_md5[k]
break
name = "ds{}".format(gr_md5.attrs['cnt'])
try:
dataset = gr_md5.create_dataset(name, data=data )
dataset.attrs['pickle'] = False
except ValueError:
dataset = gr_md5.create_dataset(name, data=np.void(pickle.dumps(data)) )
dataset.attrs['pickle'] = True
gr_md5.attrs['cnt'] = gr_md5.attrs['cnt']+1
dataset.attrs['key'] = np.void(binkey)
return dataset, binkey
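
Note: __set_dataset lets h5py store the value natively and only falls back to an opaque pickle blob when h5py rejects the type, recording the choice in the 'pickle' attribute so reads can reverse it. A self-contained sketch of that fallback; the commit catches ValueError, but depending on the h5py version the rejection may also surface as TypeError, so both are caught here:

    import pickle
    import numpy as np
    import h5py

    value = {'arbitrary': 'python object'}          # not natively storable in HDF5
    with h5py.File('sketch.hdf5', 'a') as f:
        try:
            ds = f.create_dataset('ds0', data=value)
            ds.attrs['pickle'] = False
        except (TypeError, ValueError):
            # np.void wraps the pickle bytes as an opaque scalar
            ds = f.create_dataset('ds0', data=np.void(pickle.dumps(value)))
            ds.attrs['pickle'] = True
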
# implements '[]' operator setter
def __setitem__(self, key, value, overwrite=True):
if isinstance(value, self.__class__):
gr, binkey = self.__create_group(key, overwrite)
gr.update(value.db)
gr.attrs['key'] = np.void(binkey)
else:
raise TypeError("bad key type")
self.__set_dataset(key, value, overwrite)
def __dataset_to_object(self, dataset):
if isinstance(dataset, h5py.Dataset):
data = dataset.value
if dataset.attrs['pickle'] == True:
return pickle.loads(data)
else:
return data
else:
return PersistentDataStructure_HDF5(gr=dataset, verbose=self.verbose)
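
Note: __dataset_to_object is the read-side inverse: datasets come back either as raw data or, when the 'pickle' flag is set, as the unpickled object, while groups are re-wrapped as PersistentDataStructure_HDF5 instances. One caveat: dataset.value was deprecated and later removed from h5py; a version-proof variant of the dataset branch would read:

    def dataset_to_object(ds):
        data = ds[()]                             # modern spelling of the old ds.value
        if ds.attrs['pickle']:
            return pickle.loads(data.tobytes())   # np.void blob -> bytes -> object
        return data
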
# implements '[]' operator getter
def __getitem__(self, key):
_md5 = self._md5(key)
if self.verbose > 2:
print("__getitem__")
print("key : ", key)
print("md5 : ", _md5)
binkey = self._convkey(key)
_md5 = self._md5(binkey)
try:
gr_md5 = self.db[_md5]
if self.verbose > 2:
print("gr_md5 found")
gr_md5 = self.db[_md5]
except KeyError:
raise KeyError("key not found in {}".format(self.__classname))
for k in gr_md5:
test_key = gr_md5[k].attrs['key']
if isinstance(test_key, np.void):
test_key = test_key.tostring()
if self.verbose > 2:
print("test against key", test_key)
if key == test_key:
dat = gr_md5[k]
print(type(dat))
print()
if isinstance(dat, h5py.Dataset):
return gr_md5[k].value
else:
return PersistentDataStructure_HDF5(gr=gr_md5[k], verbose=self.verbose)
test_binkey = gr_md5[k].attrs['key'].tostring()
if binkey == test_binkey:
return self.__dataset_to_object(gr_md5[k])
raise KeyError("key not found in {}".format(self.__classnme))
raise KeyError("key not found in {}".format(self.__classname))
# implements the 'in' statement
def __contains__(self, key):
@@ -648,84 +702,7 @@ class PersistentDataStructure_HDF5(object):
self.__getitem__(key)
return True
except KeyError:
return False
def __create_group(self, key):
_md5 = self._md5(key)
if self.verbose > 2:
print("__create_group")
print("key : ", key)
print("md5 : ", _md5)
try:
gr_md5 = self.db[_md5]
if self.verbose > 2:
print("found md5 group")
except KeyError:
gr_md5 = self.db.create_group(_md5)
if self.verbose > 2:
print("create md5 group")
for dat in gr_md5:
test_key = dat.attrs['key']
if isinstance(test_key, np.void):
test_key = test_key.tostring()
if self.verbose > 2:
print("compare with test_key in md5_group: ", format(test_key))
if key == test_key:
raise RuntimeError("key must not exist when creating a group")
n = len(gr_md5)
gr = gr_md5.create_group('gr{}'.format(n))
gr.attrs['key'] = self._convkey(key)
if self.verbose > 2:
print("create group as {}th object in gr_md5".format(n))
return gr
def __set_dataset(self, key, data):
_md5 = self._md5(key)
if self.verbose > 2:
print("__set_dataset")
print("key : ", key)
print("md5 : ", _md5)
print("data: ", data)
try:
gr_md5 = self.db[_md5]
if self.verbose > 2:
print("found md5 group")
except KeyError:
gr_md5 = self.db.create_group(_md5)
print("create md5 group")
for k in gr_md5:
test_key = gr_md5[k].attrs['key']
if isinstance(test_key, np.void):
test_key = test_key.tostring()
if self.verbose > 2:
print("compare with test_key in md5_group: ", format(test_key))
if key == test_key:
raise RuntimeError("key must not exist when creating a dataset")
n = len(gr_md5)
try:
dat = gr_md5.create_dataset('dat{}'.format(n), data=data)
if self.verbose > 2:
print("set dataset from pure data")
except TypeError:
dat = gr_md5.create_dataset('dat{}'.format(n), data=np.void(pickle.dumps(data)))
if self.verbose > 2:
print("set dataset from binary data")
dat.attrs['key'] = self._convkey(key)
return dat
# implements '[]' operator setter
def __setitem__(self, key, value):
if isinstance(value, self.__class__):
raise NotImplementedError
else:
self.__set_dataset(key, value)
return False
def __len__(self):
l = 0
@@ -735,26 +712,34 @@ class PersistentDataStructure_HDF5(object):
# implements '[]' operator deletion
def __delitem__(self, key):
_md5 = self._md5(key)
binkey = self._convkey(key)
_md5 = self._md5(binkey)
try:
gr_md5 = self.db[_md5]
except KeyError:
return
for k in gr_md5:
test_key = gr_md5[k].attrs['key']
if isinstance(test_key, np.void):
test_key = test_key.tostring()
test_binkey = gr_md5[k].attrs['key'].tostring()
if key == test_key:
if binkey == test_binkey:
del gr_md5[k]
if len(gr_md5) == 0:
del self.db[_md5]
# implements the iterator
def __iter__(self):
self.need_open()
for gr_md5 in self.db.values():
for ob in gr_md5.values():
yield self.__dataset_to_object(ob)
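
Note: the new __iter__ walks all bucket groups and yields the stored values (unwrapped through __dataset_to_object), not the keys, so iteration behaves unlike a dict. A quick illustration against the commit's own API (the name is illustrative):

    with PersistentDataStructure_HDF5(name='iter_demo', verbose=0) as pds:
        pds.clear()
        pds['a'] = 1
        pds['b'] = 2
        assert len(list(pds)) == 2    # two values, order unspecified
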
def open(self):
if self._filename is None:
raise RuntimeError("can not open a group")
if self._is_group:
raise RuntimeError("can not open a group")
if self.verbose > 1:
print("open", self._filename)
self.db = h5py.File(self._filename)
self._open = True
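
Note: h5py.File(self._filename) leans on h5py's old default mode ('a'); later h5py versions first warned about the missing mode and then changed the default, so the future-proof spelling of the same intent is explicit:

    # 'a': read/write if the file exists, create it otherwise
    self.db = h5py.File(self._filename, mode='a')
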
@@ -769,7 +754,7 @@ class PersistentDataStructure_HDF5(object):
raise RuntimeError("{} needs to be open".format(self.__classname))
def close(self):
if self._filename is None:
if self._is_group:
raise RuntimeError("can not close as group")
self.db.close()
@@ -779,17 +764,15 @@ class PersistentDataStructure_HDF5(object):
return self
def __exit__(self, exc_type, exc_value, traceback):
if self._filename is not None:
if not self._is_group:
self.close()
def clear(self):
self.need_open()
for k in self.db:
del self.db[k]
for k in self.db.attrs:
del self.db.attrs[k]
self.db.attrs['cnt'] = 0
def erase(self):
if self.verbose > 1:
@@ -803,26 +786,26 @@ class PersistentDataStructure_HDF5(object):
def setData(self, key, value, overwrite=False):
if overwrite:
self.__delitem__(key)
self.__setitem__(key, value)
self.__setitem__(key, value, overwrite)
def newSubData(self, key):
pass
def newSubData(self, key, overwrite=False):
gr, binkey = self.__create_group(key, overwrite)
return PersistentDataStructure_HDF5(gr = gr, verbose = self.verbose)
def getData(self, key, create_sub_data = False):
try:
return self.__getitem__(key)
except KeyError:
if create_sub_data:
return PersistentDataStructure_HDF5(gr = self.__create_group(key), verbose = self.verbose)
return self.newSubData(key)
else:
raise
def setDataFromSubData(self, key, subData, overwrite=False):
pass
self.setData(key, subData, overwrite)
def mergeOtherPDS(self, other_db_name, other_db_path = './', update = 'error', status_interval=5):
pass
raise NotImplementedError
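
Note: taken together, the HDF5 backend keeps the PersistentDataStructure surface of the sqlite version. A usage sketch against the API as it stands in this commit (the name is illustrative):

    from jobmanager.persistentData import PersistentDataStructure_HDF5 as PDS

    with PDS(name='example', verbose=0) as data:
        data.clear()
        data.setData('answer', 42)
        assert data.getData('answer') == 42
        with data.getData('sub', create_sub_data=True) as sub:
            sub['x'] = [1.0, 2.0, 3.0]
        assert 'sub' in data
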

@@ -18,43 +18,75 @@ sys.path = [split(dirname(abspath(__file__)))[0]] + sys.path
from jobmanager.persistentData import PersistentDataStructure_HDF5 as PDS
VERBOSE = 3
VERBOSE = 1
def test_md5_clash():
with PDS(name='test_data', verbose=VERBOSE) as data:
data.clear()
data._md5 = lambda key: str(np.random.randint(0,2))
for i in range(100):
data['a{}'.format(i)] = i
assert len(data) == 100
for i in range(100):
data.newSubData('s{}'.format(i))
assert len(data) == 200
n = 0
for k in data:
n += 1
assert n == 200
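
Note: the new test_md5_clash pins down collision handling: _md5 is monkeypatched to return only '0' or '1', forcing all 200 entries into at most two bucket groups, so the per-bucket 'cnt' counter and the byte-exact key comparison must keep every entry addressable. The same trick in isolation (name illustrative):

    with PDS(name='clash_demo', verbose=0) as data:
        data.clear()
        data._md5 = lambda binkey: '0'    # every key lands in one bucket group
        data['x'] = 1
        data['y'] = 2
        assert data['x'] == 1 and data['y'] == 2
        assert len(data) == 2
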
def test_pd():
try:
with PDS(name='test_data', verbose=VERBOSE) as data:
key = 'a'
value = 1
data.setData(key=key, value=value)
assert data.getData(key) == value
assert len(data) == 1
with PDS(name='test_data', verbose=VERBOSE) as data:
data.clear()
key = 'a'
value = 1
try:
data.getData(key)
except KeyError as e:
pass
data.setData(key=key, value=value)
assert data.getData(key) == value
assert len(data) == 1
key_sub = 'zz'
with data.getData(key_sub, create_sub_data=True) as sub_data:
sub_data.setData(key=key, value=3)
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
key_sub = 'zz'
with data.getData(key_sub, create_sub_data=True) as sub_data:
print(type(sub_data))
print()
sub_data.setData(key=key, value=3)
print()
with sub_data.getData(key_sub, create_sub_data=True) as sub_sub_data:
sub_sub_data.setData(key=key, value=4)
assert sub_sub_data.getData(key) == 4
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
print("#"*4)
with sub_data.getData(key_sub, create_sub_data=True) as sub_sub_data:
sub_sub_data.setData(key=key, value=4)
assert sub_sub_data.getData(key) == 4
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
print("#"*4)
with sub_data.getData(key_sub, create_sub_data=True) as sub_sub_data:
assert sub_sub_data.getData(key) == 4
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
with sub_data.getData(key_sub, create_sub_data=True) as sub_sub_data:
assert sub_sub_data.getData(key) == 4
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
with PDS(name='test_data', verbose=VERBOSE) as data:
data['d1'] = ('ö', 4, [0])
with PDS(name='test_data', verbose=VERBOSE) as data:
d1 = data['d1']
assert d1[0] == 'ö'
assert d1[1] == 4
assert d1[2] == [0]
with PDS(name='test_data', verbose=VERBOSE) as data:
data.clear()
data.newSubData(key='sub_1', overwrite = False)
with PDS(name='test_data', verbose=VERBOSE) as data:
try:
data.newSubData(key='sub_1', overwrite = False)
except KeyError:
pass
finally:
print()
data.erase()
data.newSubData(key='sub_1', overwrite = True)
def test_pd_bytes():
t1 = (3.4, 4.5, 5.6, 6.7, 7.8, 8.9)
@@ -77,35 +109,15 @@ def test_pd_bytes():
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key=b1) as sub_data:
for i in range(2, 10):
assert sub_data[i] == t2
assert np.all(sub_data[i] == t2)
assert base_data[b2] == t1
assert np.all(base_data[b2] == t1)
finally:
print()
base_data.erase()
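
Note: the switch from plain == to np.all(...) in these assertions reflects that tuples round-trip through h5py as numpy arrays, where == is elementwise and a bare comparison would be ambiguous in a boolean context. A compact illustration (name illustrative):

    with PDS(name='roundtrip_demo', verbose=0) as d:
        d.clear()
        d['t'] = (3.4, 4.5, 5.6)
        back = d['t']                 # comes back as a numpy array, not a tuple
        assert np.all(back == (3.4, 4.5, 5.6))
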
def test_directory_removal():
try:
with PDS(name='data', verbose=VERBOSE) as data:
with data.newSubData('s1') as s1:
s1['bla'] = 9
f = open(file=data._dirname + '/other_file', mode='w')
f.close()
print("now there should be a warning, because there is an unknown file in the directory!")
finally:
try:
data.erase()
except UserWarning:
pass
assert exists(data._dirname)
os.remove(data._dirname + '/other_file')
os.rmdir(data._dirname)
def test_mp_read_from_sqlite():
import sqlitedict as sqd
import multiprocessing as mp
@@ -150,121 +162,104 @@ def test_from_existing_sub_data():
t1 = (3.4, 4.5, 5.6, 6.7, 7.8, 8.9)
t2 = (3.4, 4.5, 5.6, 6.7, 7.8, 8.9, 9,1)
try:
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
sub_data[100] = t1
sub_data[200] = t2
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
sub_sub_data['t'] = 'hallo Welt'
base_data.setDataFromSubData(key='sub2', subData = sub_data)
with PDS(name='base', verbose=VERBOSE) as base_data:
base_data.clear()
with base_data.getData(key='s1', create_sub_data = True) as s1:
s1['d1'] = 1
s1['d2'] = 'b'
with PDS(name='base', verbose=VERBOSE) as base_data:
assert base_data['s1']['d1'] == 1
assert base_data['s1']['d2'] == 'b'
base_data.setDataFromSubData('s2', base_data['s1'])
assert base_data['s2']['d1'] == 1
assert base_data['s2']['d2'] == 'b'
del base_data['s1']
assert base_data['s2']['d1'] == 1
assert base_data['s2']['d2'] == 'b'
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
sub_data[100] = t1
sub_data[200] = t2
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
sub_sub_data['t'] = 'hallo Welt'
base_data.setDataFromSubData(key='sub2', subData = sub_data)
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub2', create_sub_data = False) as sub_data:
assert np.all(sub_data[100] == t1)
assert np.all(sub_data[200] == t2)
with sub_data.getData(key = 'subsub1', create_sub_data = False) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
base_data['sub2'] = sub_data
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub2', create_sub_data = False) as sub_data:
assert sub_data[100] == t1
assert sub_data[200] == t2
with sub_data.getData(key = 'subsub1', create_sub_data = False) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
base_data['sub2'] = sub_data
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub2', create_sub_data = False) as sub_data:
assert sub_data[100] == t1
assert sub_data[200] == t2
with sub_data.getData(key = 'subsub1', create_sub_data = False) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
sub_sub_data['t'] = 'sub2:hallo Welt'
sub_data[100] = "sub2:t1"
sub_data[200] = "sub2:t2"
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
assert sub_data[100] == t1
assert sub_data[200] == t2
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub2', create_sub_data = False) as sub_data:
with sub_data.getData(key = 'subsub1', create_sub_data = False) as sub_sub_data:
assert sub_sub_data['t'] == 'sub2:hallo Welt'
assert sub_data[100] == "sub2:t1"
assert sub_data[200] == "sub2:t2"
finally:
print()
base_data.erase()
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub2', create_sub_data = False) as sub_data:
assert np.all(sub_data[100] == t1)
assert np.all(sub_data[200] == t2)
with sub_data.getData(key = 'subsub1', create_sub_data = False) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
sub_sub_data['t'] = 'sub2:hallo Welt'
sub_data[100] = "sub2:t1"
sub_data[200] = "sub2:t2"
with PDS(name='base', verbose=VERBOSE) as base_data:
base_data.clear()
with base_data.getData(key = 'sub1', create_sub_data = True) as sub1:
sub1['npa'] = np.linspace(0,1,10)
sub1['val'] = 'hallo ich bin sub1'
base_data['sub2'] = sub1
with PDS(name='base', verbose=VERBOSE) as base_data:
npa1 = base_data['sub1']['npa']
npa2 = base_data['sub1']['npa']
assert type(npa1) == np.ndarray
assert type(npa2) == np.ndarray
def test_remove_sub_data_and_check_len():
try:
with PDS(name='base', verbose=VERBOSE) as base_data:
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
sub_data[100] = 't1'
sub_data[200] = 't2'
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
sub_sub_data['t'] = 'hallo Welt'
assert len(sub_data) == 3, "len = {}".format(len(sub_data))
assert len(base_data) == 1
base_data['copy_of_sub1'] = sub_data
assert len(base_data) == 2
del base_data['sub1']
assert len(base_data) == 1
with base_data.getData(key='copy_of_sub1', create_sub_data = True) as sub_data:
assert len(sub_data) == 3
assert sub_data[100] == 't1'
assert sub_data[200] == 't2'
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
assert ('sub1' not in base_data)
finally:
base_data.erase()
def test_show_stat():
try:
with PDS(name='test_data', verbose=VERBOSE) as data:
key = 'a'
value = 1
data.setData(key=key, value=value)
assert data.getData(key) == value
key_sub = 'zz'
with data.getData(key_sub, create_sub_data=True) as sub_data:
sub_data.setData(key=key, value=3)
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
with PDS(name='base', verbose=VERBOSE) as base_data:
base_data.clear()
with base_data.getData(key='sub1', create_sub_data = True) as sub_data:
sub_data[100] = 't1'
sub_data[200] = 't2'
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
sub_sub_data['t'] = 'hallo Welt'
key_sub_bin = pickle.dumps(key_sub, protocol=2)
with sub_data.getData(key_sub_bin, create_sub_data=True) as sub_sub_data:
sub_sub_data.setData(key=key, value=4)
assert sub_sub_data.getData(key) == 4
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
with sub_data.getData(key_sub_bin, create_sub_data=True) as sub_sub_data:
assert sub_sub_data.getData(key) == 4
assert sub_data.getData(key) == 3
assert data.getData(key) == 1
assert len(sub_data) == 3, "len = {}".format(len(sub_data))
assert len(base_data) == 1
base_data['copy_of_sub1'] = sub_data
assert len(base_data) == 2
del base_data['sub1']
assert len(base_data) == 1
with base_data.getData(key='copy_of_sub1', create_sub_data = True) as sub_data:
assert len(sub_data) == 3
assert sub_data[100] == 't1'
assert sub_data[200] == 't2'
with sub_data.getData(key = 'subsub1', create_sub_data = True) as sub_sub_data:
assert sub_sub_data['t'] == 'hallo Welt'
data.show_stat(recursive=True)
finally:
data.erase()
assert ('sub1' not in base_data)
def slow_len(pd):
n = 0
@@ -273,61 +268,55 @@ def slow_len(pd):
return n
def test_len():
try:
with PDS(name='data', verbose=VERBOSE) as data:
assert len(data) == 0
assert slow_len(data) == 0
data['a'] = 1
assert len(data) == 1
assert slow_len(data) == 1
for i in range(1, 8):
data[i*10] = i
assert len(data) == 8
assert slow_len(data) == 8
with PDS(name='data', verbose=VERBOSE) as data:
assert len(data) == 8
assert slow_len(data) == 8
data.clear()
assert len(data) == 0
assert slow_len(data) == 0
with PDS(name='data', verbose=VERBOSE) as data:
assert len(data) == 0
assert slow_len(data) == 0
finally:
data.erase()
with PDS(name='data', verbose=VERBOSE) as data:
data.clear()
assert len(data) == 0
assert slow_len(data) == 0
data['a'] = 1
assert len(data) == 1
assert slow_len(data) == 1
for i in range(1, 8):
data[i*10] = i
assert len(data) == 8
assert slow_len(data) == 8
with PDS(name='data', verbose=VERBOSE) as data:
assert len(data) == 8
assert slow_len(data) == 8
data.clear()
assert len(data) == 0
assert slow_len(data) == 0
with PDS(name='data', verbose=VERBOSE) as data:
assert len(data) == 0
assert slow_len(data) == 0
def test_clear():
try:
with PDS(name='data', verbose=VERBOSE) as data:
data['a'] = 1
data['b'] = 2
with data.newSubData('s1') as s1:
s1['bla'] = 9
with data.newSubData('s2') as s2:
s2['bla2'] = 18
with data['s1'] as s1:
s1['t'] = 'tmp'
s1.clear()
with data['s1'] as s1:
assert len(s1) == 0
assert slow_len(s1) == 0
data.clear()
dir_content = os.listdir(data._dirname)
assert len(dir_content) == 1
assert dir_content[0] == 'data.db'
finally:
data.erase()
with PDS(name='data', verbose=VERBOSE) as data:
data.clear()
data['a'] = 1
data['b'] = 2
with data.newSubData('s1') as s1:
s1['bla'] = 9
with data.newSubData('s2') as s2:
s2['bla2'] = 18
with data['s1'] as s1:
s1['t'] = 'tmp'
s1.clear()
with data['s1'] as s1:
assert len(s1) == 0
assert slow_len(s1) == 0
data.clear()
def test_not_in():
try:
@@ -518,17 +507,18 @@ def test_merge_fname_conflict():
if __name__ == "__main__":
test_clear()
test_pd()
# test_pd_bytes()
# test_directory_removal()
# test_mp_read_from_sqlite()
# test_from_existing_sub_data()
# test_remove_sub_data_and_check_len()
# test_show_stat()
# test_len()
# test_clear()
# test_not_in()
# test_npa()
test_md5_clash()
test_pd_bytes()
test_mp_read_from_sqlite()
test_from_existing_sub_data()
test_remove_sub_data_and_check_len()
test_len()
test_not_in()
test_npa()
# test_merge()
# test_merge_fname_conflict()
pass