ray/rllib/utils/memory.py

100 lines
3 KiB
Python

import numpy as np
import time
import os
import ray
FREE_DELAY_S = 10.0
MAX_FREE_QUEUE_SIZE = 100
_last_free_time = 0.0
_to_free = []
# TODO(ekl) remove this feature entirely. It's here for now just in case we
# need to turn it on for debugging.
RLLIB_DEBUG_EXPLICIT_FREE = bool(os.environ.get("RLLIB_DEBUG_EXPLICIT_FREE"))
def ray_get_and_free(object_ids):
"""Call ray.get and then queue the object ids for deletion.
This function should be used whenever possible in RLlib, to optimize
memory usage. The only exception is when an object_id is shared among
multiple readers.
Args:
object_ids (ObjectID|List[ObjectID]): Object ids to fetch and free.
Returns:
The result of ray.get(object_ids).
"""
if not RLLIB_DEBUG_EXPLICIT_FREE:
return ray.get(object_ids)
global _last_free_time
global _to_free
result = ray.get(object_ids)
if type(object_ids) is not list:
object_ids = [object_ids]
_to_free.extend(object_ids)
# batch calls to free to reduce overheads
now = time.time()
if (len(_to_free) > MAX_FREE_QUEUE_SIZE
or now - _last_free_time > FREE_DELAY_S):
ray.internal.free(_to_free)
_to_free = []
_last_free_time = now
return result
def aligned_array(size, dtype, align=64):
"""Returns an array of a given size that is 64-byte aligned.
The returned array can be efficiently copied into GPU memory by TensorFlow.
"""
n = size * dtype.itemsize
empty = np.empty(n + (align - 1), dtype=np.uint8)
data_align = empty.ctypes.data % align
offset = 0 if data_align == 0 else (align - data_align)
if n == 0:
# stop np from optimising out empty slice reference
output = empty[offset:offset + 1][0:0].view(dtype)
else:
output = empty[offset:offset + n].view(dtype)
assert len(output) == size, len(output)
assert output.ctypes.data % align == 0, output.ctypes.data
return output
def concat_aligned(items):
"""Concatenate arrays, ensuring the output is 64-byte aligned.
We only align float arrays; other arrays are concatenated as normal.
This should be used instead of np.concatenate() to improve performance
when the output array is likely to be fed into TensorFlow.
"""
if len(items) == 0:
return []
elif len(items) == 1:
# we assume the input is aligned. In any case, it doesn't help
# performance to force align it since that incurs a needless copy.
return items[0]
elif (isinstance(items[0], np.ndarray)
and items[0].dtype in [np.float32, np.float64, np.uint8]):
dtype = items[0].dtype
flat = aligned_array(sum(s.size for s in items), dtype)
batch_dim = sum(s.shape[0] for s in items)
new_shape = (batch_dim, ) + items[0].shape[1:]
output = flat.reshape(new_shape)
assert output.ctypes.data % 64 == 0, output.ctypes.data
np.concatenate(items, out=output)
return output
else:
return np.concatenate(items)