[DataFrame] Implement Memoizer (#2157)

* Implement Memoizer

* Add LRUCache

* Add comments
This commit is contained in:
Simon Mo 2018-06-05 07:18:12 -07:00 committed by Devin Petersohn
parent 451cdb43f6
commit a139a5df8c

View file

@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import pandas as pd
import numpy as np
import ray
@ -10,6 +11,96 @@ from . import get_npartitions
_NAN_BLOCKS = {}
_MEMOIZER_CAPACITY = 1000 # Capacity per function
class LRUCache:
"""A LRUCache implemented with collections.OrderedDict
Notes:
- OrderedDict will record the order each item is inserted.
- The head of the queue will be LRU items.
"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = collections.OrderedDict()
def __contains__(self, key):
return key in self.cache
def __getitem__(self, key):
"""Retrieve item from cache and re-insert it to the back of the queue
"""
value = self.cache.pop(key)
self.cache[key] = value
return value
def __setitem__(self, key, value):
if key in self.cache:
self.cache.pop(key)
if len(self.cache) >= self.capacity:
# Pop oldest items at the beginning of the queue
self.cache.popitem(last=False)
self.cache[key] = value
class memoize:
"""A basic memoizer that cache the input and output of the remote function
Notes:
- How is this implemented?
This meoizer is implemented by adding a caching layer to the remote
function's remote attribute. When user call f.remote(*args), we will
first check against the cache, and then call the ray remote function
if we can't find the return value in the cache.
- When should this be used?
This should be used when we anticipate temporal locality for the
function. For example, we can reasonally assume users will perform
columnar operation repetitively over time (like sum() or loc[]).
- Caveat
Don't use this decorator if the any argument to the remote function
will mutate. Following snippet will fail
```py
@memoize
@ray.remote
def f(obj):
...
mutable_obj = [1]
oid_1 = f.remote(mutable_obj) # will be cached
mutable_obj.append(3)
oid_2 = f.remote(mutable_obj) # cache hit!
oid_1 == oid_2 # True!
```
In short, use this function sparingly. The ideal case is that all
inputs are ray ObjectIDs because they are immutable objects.
- Future Development
- Fix the mutability bug
- Dynamic cache size (Fixed as 1000 for now)
"""
def __init__(self, f):
# Save of remote function
self.old_remote_func = f.remote
self.cache = LRUCache(capacity=_MEMOIZER_CAPACITY)
def remote(self, *args):
"""Return cached result if the arguments are cached
"""
args = tuple(args)
if args in self.cache:
cached_result = self.cache[args]
return cached_result
result = self.old_remote_func(*args)
self.cache[args] = result
return result
def _get_nan_block_id(n_row=1, n_col=1, transpose=False):
@ -308,6 +399,7 @@ def create_blocks_helper(df, npartitions, axis):
return blocks
@memoize
@ray.remote
def _blocks_to_col(*partition):
if len(partition):
@ -317,6 +409,7 @@ def _blocks_to_col(*partition):
return pd.Series()
@memoize
@ray.remote
def _blocks_to_row(*partition):
row_part = pd.concat(partition, axis=1, copy=False)\