[Core][RFC] limit the total number of inlined bytes in task request rpc

Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
This commit is contained in:
Chen Shen 2021-08-12 13:55:54 -07:00 committed by GitHub
parent ec8409ff06
commit 9565fa549e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 22 deletions

View file

@ -106,7 +106,8 @@ cdef class CoreWorker:
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
owner_address=*)
owner_address=*,
c_bool inline_small_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,

View file

@ -322,11 +322,15 @@ cdef prepare_args(
cdef:
size_t size
int64_t put_threshold
int64_t rpc_inline_threshold
int64_t total_inlined
shared_ptr[CBuffer] arg_data
c_vector[CObjectID] inlined_ids
worker = ray.worker.global_worker
put_threshold = RayConfig.instance().max_direct_call_object_size()
total_inlined = 0
rpc_inline_threshold = RayConfig.instance().task_rpc_inlined_bytes_limit()
for arg in args:
if isinstance(arg, ObjectRef):
c_arg = (<ObjectRef>arg).native()
@ -353,7 +357,8 @@ cdef prepare_args(
# plasma here. This is inefficient for small objects, but inlined
# arguments aren't associated ObjectRefs right now so this is a
# simple fix for reference counting purposes.
if <int64_t>size <= put_threshold:
if <int64_t>size <= put_threshold and \
(<int64_t>size + total_inlined <= rpc_inline_threshold):
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](size))
if size > 0:
@ -367,10 +372,12 @@ cdef prepare_args(
arg_data, string_to_buffer(metadata),
inlined_ids))))
inlined_ids.clear()
total_inlined += <int64_t>size
else:
args_vector.push_back(unique_ptr[CTaskArg](
new CTaskArgByReference(CObjectID.FromBinary(
core_worker.put_serialized_object(serialized_arg)),
core_worker.put_serialized_object(
serialized_arg, inline_small_object=False)),
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress())))
@ -1064,7 +1071,8 @@ cdef class CoreWorker:
c_vector[CObjectID] contained_ids,
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
owner_address=None):
owner_address=None,
c_bool inline_small_object=True):
cdef:
unique_ptr[CAddress] c_owner_address
@ -1075,7 +1083,8 @@ cdef class CoreWorker:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
metadata, data_size, contained_ids,
c_object_id, data, created_by_worker,
move(c_owner_address)))
move(c_owner_address),
inline_small_object))
else:
c_object_id[0] = object_ref.native()
if owner_address is None:
@ -1161,7 +1170,8 @@ cdef class CoreWorker:
def put_serialized_object(self, serialized_object,
ObjectRef object_ref=None,
c_bool pin_object=True,
owner_address=None):
owner_address=None,
c_bool inline_small_object=True):
cdef:
CObjectID c_object_id
shared_ptr[CBuffer] data
@ -1174,12 +1184,13 @@ cdef class CoreWorker:
metadata = string_to_buffer(serialized_object.metadata)
put_threshold = RayConfig.instance().max_direct_call_object_size()
put_small_object_in_memory_store = (
RayConfig.instance().put_small_object_in_memory_store())
RayConfig.instance().put_small_object_in_memory_store() and
inline_small_object)
total_bytes = serialized_object.total_bytes
object_already_exists = self._create_put_buffer(
metadata, total_bytes, object_ref,
ObjectRefsToVector(serialized_object.contained_object_refs),
&c_object_id, &data, True, owner_address)
&c_object_id, &data, True, owner_address, inline_small_object)
if not object_already_exists:
if total_bytes > 0:

View file

@ -192,7 +192,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[CObjectID] &contained_object_ids,
CObjectID *object_id, shared_ptr[CBuffer] *data,
c_bool created_by_worker,
const unique_ptr[CAddress] &owner_address)
const unique_ptr[CAddress] &owner_address,
c_bool inline_small_object)
CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const CObjectID &object_id,

View file

@ -55,7 +55,7 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_bool put_small_object_in_memory_store() const
int64_t task_output_inlined_bytes_limit() const
int64_t task_rpc_inlined_bytes_limit() const
uint32_t max_tasks_in_flight_per_worker() const

View file

@ -572,14 +572,14 @@ def test_task_output_inline_bytes_limit(ray_start_cluster):
# Disable worker caching so worker leases are not reused; set object
# inlining size threshold and enable storing of small objects in in-memory
# object store so the borrowed ref is inlined.
# set task_output_inlined_bytes_limit which only allows inline 20 bytes.
# set task_rpc_inlined_bytes_limit which only allows inline 20 bytes.
cluster.add_node(
num_cpus=1,
resources={"pin_head": 1},
_system_config={
"worker_lease_timeout_milliseconds": 0,
"max_direct_call_object_size": 100 * 1024,
"task_output_inlined_bytes_limit": 20,
"task_rpc_inlined_bytes_limit": 20,
"put_small_object_in_memory_store": True,
},
)
@ -591,7 +591,8 @@ def test_task_output_inline_bytes_limit(ray_start_cluster):
return list(range(5))
@ray.remote(resources={"pin_worker": 1})
def sum(numbers):
def sum():
numbers = f.remote()
result = 0
for i, ref in enumerate(numbers):
result += ray.get(ref)
@ -603,9 +604,44 @@ def test_task_output_inline_bytes_limit(ray_start_cluster):
assert not inlined
return result
results = f.remote()
g_ref = sum.remote(results)
assert ray.get(g_ref) == 10
assert ray.get(sum.remote()) == 10
def test_task_arguments_inline_bytes_limit(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(
num_cpus=1,
resources={"pin_head": 1},
_system_config={
"max_direct_call_object_size": 100 * 1024,
# if task_rpc_inlined_bytes_limit is greater than
# max_grpc_message_size, this test fails.
"task_rpc_inlined_bytes_limit": 18 * 1024,
"max_grpc_message_size": 20 * 1024,
"put_small_object_in_memory_store": True,
},
)
cluster.add_node(num_cpus=1, resources={"pin_worker": 1})
ray.init(address=cluster.address)
@ray.remote(resources={"pin_worker": 1})
def foo(ref1, ref2, ref3):
return ref1 == ref2 + ref3
@ray.remote(resources={"pin_head": 1})
def bar():
# if the refs are inlined, the test fails.
# refs = [ray.put(np.random.rand(1024) for _ in range(3))]
# return ray.get(
# foo.remote(refs[0], refs[1], refs[2]))
return ray.get(
foo.remote(
np.random.rand(1024), # 8k
np.random.rand(1024), # 8k
np.random.rand(1024))) # 8k
ray.get(bar.remote())
if __name__ == "__main__":

View file

@ -297,8 +297,8 @@ RAY_CONFIG(int64_t, enable_metrics_collection, true)
/// Whether put small objects in the local memory store.
RAY_CONFIG(bool, put_small_object_in_memory_store, false)
// Max number bytes of inlined objects in a task execution result.
RAY_CONFIG(int64_t, task_output_inlined_bytes_limit, 10 * 1024 * 1024)
// Max number bytes of inlined objects in a task rpc request/response.
RAY_CONFIG(int64_t, task_rpc_inlined_bytes_limit, 10 * 1024 * 1024)
/// Maximum number of tasks that can be in flight between an owner and a worker for which
/// the owner has been granted a lease. A value >1 is used when we want to enable

View file

@ -1128,7 +1128,8 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data,
bool created_by_worker,
const std::unique_ptr<rpc::Address> &owner_address) {
const std::unique_ptr<rpc::Address> &owner_address,
bool inline_small_object) {
*object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(),
worker_context_.GetNextPutIndex());
rpc::Address real_owner_address =
@ -1173,7 +1174,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
if ((options_.is_local_mode ||
(RayConfig::instance().put_small_object_in_memory_store() &&
static_cast<int64_t>(data_size) < max_direct_call_object_size_)) &&
owned_by_us) {
owned_by_us && inline_small_object) {
*data = std::make_shared<LocalMemoryBuffer>(data_size);
} else {
if (status.ok()) {
@ -2097,7 +2098,7 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
(static_cast<int64_t>(data_size) < max_direct_call_object_size_ &&
// ensure we don't exceed the limit if we allocate this object inline.
(task_output_inlined_bytes + static_cast<int64_t>(data_size) <=
RayConfig::instance().task_output_inlined_bytes_limit()))) {
RayConfig::instance().task_rpc_inlined_bytes_limit()))) {
data_buffer = std::make_shared<LocalMemoryBuffer>(data_size);
task_output_inlined_bytes += static_cast<int64_t>(data_size);
} else {

View file

@ -540,12 +540,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] object create by worker or not.
/// \param[in] owner_address The address of object's owner. If not provided,
/// defaults to this worker.
/// \param[in] inline_small_object wether to inline create this object if it's
/// small.
/// \return Status.
Status CreateOwned(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data,
bool created_by_worker,
const std::unique_ptr<rpc::Address> &owner_address = nullptr);
const std::unique_ptr<rpc::Address> &owner_address = nullptr,
bool inline_small_object = true);
/// Create and return a buffer in the object store that can be directly written
/// into, for an object ID that already exists. After writing to the buffer, the