From df2c2a7ce5392f21a0056fabc7aacd12ec933ca2 Mon Sep 17 00:00:00 2001 From: SongGuyang Date: Tue, 17 Nov 2020 06:30:35 +0800 Subject: [PATCH] [cpp worker] support pass by reference on cluster mode (#11753) --- cpp/include/ray/api.h | 22 ++-- cpp/include/ray/api/actor_creator.h | 8 +- cpp/include/ray/api/actor_task_caller.h | 12 +- cpp/include/ray/api/arguments.h | 115 +++++++++++------- .../ray/api/generated/exec_funcs.generated.h | 36 +++--- cpp/include/ray/api/ray_runtime.h | 6 +- cpp/include/ray/api/task_caller.h | 8 +- cpp/src/example/example.cc | 2 +- cpp/src/ray/runtime/abstract_ray_runtime.cc | 24 ++-- cpp/src/ray/runtime/abstract_ray_runtime.h | 8 +- cpp/src/ray/runtime/local_mode_ray_runtime.cc | 7 ++ cpp/src/ray/runtime/local_mode_ray_runtime.h | 2 + cpp/src/ray/runtime/task/invocation_spec.h | 2 +- .../runtime/task/local_mode_task_submitter.cc | 48 +++----- .../runtime/task/local_mode_task_submitter.h | 8 +- .../ray/runtime/task/native_task_submitter.cc | 40 ++---- .../ray/runtime/task/native_task_submitter.h | 8 +- cpp/src/ray/runtime/task/task_executor.cc | 77 +++++++----- cpp/src/ray/runtime/task/task_executor.h | 10 +- cpp/src/ray/runtime/task/task_submitter.h | 6 +- cpp/src/ray/test/cluster/cluster_mode_test.cc | 7 +- cpp/src/ray/worker/default_worker.cc | 2 + 22 files changed, 235 insertions(+), 223 deletions(-) diff --git a/cpp/include/ray/api.h b/cpp/include/ray/api.h index caf86558f..7fd739852 100644 --- a/cpp/include/ray/api.h +++ b/cpp/include/ray/api.h @@ -197,13 +197,12 @@ template inline TaskCaller Ray::TaskInternal(FuncType &func, ExecFuncType &exec_func, ArgTypes &... args) { - std::shared_ptr buffer(new msgpack::sbuffer()); - msgpack::packer packer(buffer.get()); - Arguments::WrapArgs(packer, args...); + std::vector> task_args; + Arguments::WrapArgs(&task_args, args...); RemoteFunctionPtrHolder ptr; ptr.function_pointer = reinterpret_cast(func); ptr.exec_function_pointer = reinterpret_cast(exec_func); - return TaskCaller(runtime_.get(), ptr, buffer); + return TaskCaller(runtime_.get(), ptr, std::move(task_args)); } template Ray::CreateActorInternal(FuncType &create_func, ExecFuncType &exec_func, ArgTypes &... args) { - std::shared_ptr buffer(new msgpack::sbuffer()); - msgpack::packer packer(buffer.get()); - Arguments::WrapArgs(packer, args...); + std::vector> task_args; + Arguments::WrapArgs(&task_args, args...); RemoteFunctionPtrHolder ptr; ptr.function_pointer = reinterpret_cast(create_func); ptr.exec_function_pointer = reinterpret_cast(exec_func); - return ActorCreator(runtime_.get(), ptr, buffer); + return ActorCreator(runtime_.get(), ptr, std::move(task_args)); } template Ray::CallActorInternal(FuncType &actor_func, ExecFuncType &exec_func, ActorHandle &actor, ArgTypes &... args) { - std::shared_ptr buffer(new msgpack::sbuffer()); - msgpack::packer packer(buffer.get()); - Arguments::WrapArgs(packer, args...); + std::vector> task_args; + Arguments::WrapArgs(&task_args, args...); RemoteFunctionPtrHolder ptr; MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func); ptr.function_pointer = reinterpret_cast(holder.value[0]); ptr.exec_function_pointer = reinterpret_cast(exec_func); - return ActorTaskCaller(runtime_.get(), actor.ID(), ptr, buffer); + return ActorTaskCaller(runtime_.get(), actor.ID(), ptr, + std::move(task_args)); } // TODO(barakmich): These includes are generated files that do not contain their diff --git a/cpp/include/ray/api/actor_creator.h b/cpp/include/ray/api/actor_creator.h index a70fb166a..6f886a3aa 100644 --- a/cpp/include/ray/api/actor_creator.h +++ b/cpp/include/ray/api/actor_creator.h @@ -12,14 +12,14 @@ class ActorCreator { ActorCreator(); ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr, - std::shared_ptr args); + std::vector> &&args); ActorHandle Remote(); private: RayRuntime *runtime_; RemoteFunctionPtrHolder ptr_; - std::shared_ptr args_; + std::vector> args_; }; // ---------- implementation ---------- @@ -29,8 +29,8 @@ ActorCreator::ActorCreator() {} template ActorCreator::ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr, - std::shared_ptr args) - : runtime_(runtime), ptr_(ptr), args_(args) {} + std::vector> &&args) + : runtime_(runtime), ptr_(ptr), args_(std::move(args)) {} template ActorHandle ActorCreator::Remote() { diff --git a/cpp/include/ray/api/actor_task_caller.h b/cpp/include/ray/api/actor_task_caller.h index f720db777..761fa521f 100644 --- a/cpp/include/ray/api/actor_task_caller.h +++ b/cpp/include/ray/api/actor_task_caller.h @@ -12,7 +12,7 @@ class ActorTaskCaller { ActorTaskCaller(); ActorTaskCaller(RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr, - std::shared_ptr args); + std::vector> &&args); ObjectRef Remote(); @@ -20,7 +20,7 @@ class ActorTaskCaller { RayRuntime *runtime_; ActorID id_; RemoteFunctionPtrHolder ptr_; - std::shared_ptr args_; + std::vector> args_; }; // ---------- implementation ---------- @@ -29,10 +29,10 @@ template ActorTaskCaller::ActorTaskCaller() {} template -ActorTaskCaller::ActorTaskCaller(RayRuntime *runtime, ActorID id, - RemoteFunctionPtrHolder ptr, - std::shared_ptr args) - : runtime_(runtime), id_(id), ptr_(ptr), args_(args) {} +ActorTaskCaller::ActorTaskCaller( + RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr, + std::vector> &&args) + : runtime_(runtime), id_(id), ptr_(ptr), args_(std::move(args)) {} template ObjectRef ActorTaskCaller::Remote() { diff --git a/cpp/include/ray/api/arguments.h b/cpp/include/ray/api/arguments.h index 090c8e25d..b71c55fd4 100644 --- a/cpp/include/ray/api/arguments.h +++ b/cpp/include/ray/api/arguments.h @@ -1,7 +1,9 @@ #pragma once +#include #include +#include "ray/common/task/task_util.h" #include @@ -10,75 +12,100 @@ namespace api { class Arguments { public: - static void WrapArgs(msgpack::packer &packer); + static void WrapArgs(std::vector> *task_args); template - static void WrapArgs(msgpack::packer &packer, Arg1Type &arg1); - - template - static void WrapArgs(msgpack::packer &packer, Arg1Type &arg1, - OtherArgTypes &... args); - - static void UnwrapArgs(msgpack::unpacker &unpacker); + static void WrapArgs(std::vector> *task_args, + Arg1Type &arg1); template - static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr *arg1); + static void WrapArgs(std::vector> *task_args, + ObjectRef &arg1); template - static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr *arg1, + static void WrapArgs(std::vector> *task_args, + Arg1Type &arg1, OtherArgTypes &... args); + + static void UnwrapArgs(const std::vector> &args_buffer, + int &arg_index); + + template + static void UnwrapArgs(const std::vector> &args_buffer, + int &arg_index, std::shared_ptr *arg1); + + template + static void UnwrapArgs(const std::vector> &args_buffer, + int &arg_index, std::shared_ptr *arg1, std::shared_ptr *... args); }; // --------- inline implementation ------------ #include -inline void Arguments::WrapArgs(msgpack::packer &packer) {} +inline void Arguments::WrapArgs(std::vector> *task_args) { +} template -inline void Arguments::WrapArgs(msgpack::packer &packer, +inline void Arguments::WrapArgs(std::vector> *task_args, Arg1Type &arg1) { + /// TODO(Guyang Song): optimize the memory copy. + msgpack::sbuffer buffer; + msgpack::packer packer(buffer); /// Notice ObjectRefClassPrefix should be modified by ObjectRef class name or namespace. static const std::string ObjectRefClassPrefix = "N3ray3api9ObjectRef"; std::string type_name = typeid(arg1).name(); - if (type_name.rfind(ObjectRefClassPrefix, 0) == 0) { - /// Pass by reference. - Serializer::Serialize(packer, true); - } else { - /// Pass by value. - Serializer::Serialize(packer, false); - } + RAY_CHECK(type_name.rfind(ObjectRefClassPrefix, 0) != 0) + << "ObjectRef can not be wrapped"; Serializer::Serialize(packer, arg1); + auto memory_buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(buffer.data()), buffer.size(), true); + /// Pass by value. + auto task_arg = new TaskArgByValue(std::make_shared<::ray::RayObject>( + memory_buffer, nullptr, std::vector())); + task_args->emplace_back(task_arg); } -template -inline void Arguments::WrapArgs(msgpack::packer &packer, Arg1Type &arg1, - OtherArgTypes &... args) { - WrapArgs(packer, arg1); - WrapArgs(packer, args...); -} - -inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker) {} - template -inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker, - std::shared_ptr *arg1) { - bool is_object_ref; - Serializer::Deserialize(unpacker, &is_object_ref); - if (is_object_ref) { - ObjectRef object_ref; - Serializer::Deserialize(unpacker, &object_ref); - *arg1 = object_ref.Get(); - } else { - Serializer::Deserialize(unpacker, arg1); - } +inline void Arguments::WrapArgs(std::vector> *task_args, + ObjectRef &arg1) { + /// Pass by reference. + auto task_arg = new TaskArgByReference(arg1.ID(), rpc::Address()); + task_args->emplace_back(task_arg); } template -inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker, - std::shared_ptr *arg1, - std::shared_ptr *... args) { - UnwrapArgs(unpacker, arg1); - UnwrapArgs(unpacker, args...); +inline void Arguments::WrapArgs(std::vector> *task_args, + Arg1Type &arg1, OtherArgTypes &... args) { + WrapArgs(task_args, arg1); + WrapArgs(task_args, args...); +} + +inline void Arguments::UnwrapArgs( + const std::vector> &args_buffer, int &arg_index) {} + +template +inline void Arguments::UnwrapArgs( + const std::vector> &args_buffer, int &arg_index, + std::shared_ptr *arg1) { + std::shared_ptr sbuffer; + auto arg_buffer = args_buffer[arg_index]->GetData(); + sbuffer = std::make_shared(arg_buffer->Size()); + /// TODO(Guyang Song): Avoid the memory copy. + sbuffer->write(reinterpret_cast(arg_buffer->Data()), arg_buffer->Size()); + msgpack::unpacker unpacker; + unpacker.reserve_buffer(sbuffer->size()); + memcpy(unpacker.buffer(), sbuffer->data(), sbuffer->size()); + unpacker.buffer_consumed(sbuffer->size()); + Serializer::Deserialize(unpacker, arg1); + arg_index++; +} + +template +inline void Arguments::UnwrapArgs( + const std::vector> &args_buffer, int &arg_index, + std::shared_ptr *arg1, std::shared_ptr *... args) { + UnwrapArgs(args_buffer, arg_index, arg1); + UnwrapArgs(args_buffer, arg_index, args...); } } // namespace api diff --git a/cpp/include/ray/api/generated/exec_funcs.generated.h b/cpp/include/ray/api/generated/exec_funcs.generated.h index 0b218824f..04f051d8f 100644 --- a/cpp/include/ray/api/generated/exec_funcs.generated.h +++ b/cpp/include/ray/api/generated/exec_funcs.generated.h @@ -10,13 +10,10 @@ template std::shared_ptr ExecuteNormalFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer, TaskType task_type, + const std::vector> &args_buffer, TaskType task_type, std::shared_ptr &... args) { - msgpack::unpacker unpacker; - unpacker.reserve_buffer(args_buffer->size()); - memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size()); - unpacker.buffer_consumed(args_buffer->size()); - Arguments::UnwrapArgs(unpacker, &args...); + int arg_index = 0; + Arguments::UnwrapArgs(args_buffer, arg_index, &args...); ReturnType return_value; typedef ReturnType (*Func)(OtherArgTypes...); @@ -33,7 +30,7 @@ std::shared_ptr ExecuteNormalFunction( template std::shared_ptr ExecuteActorFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer, + const std::vector> &args_buffer, std::shared_ptr &actor_buffer, std::shared_ptr &... args) { msgpack::unpacker actor_unpacker; @@ -44,11 +41,8 @@ std::shared_ptr ExecuteActorFunction( Serializer::Deserialize(actor_unpacker, &actor_ptr); ActorType *actor_object = (ActorType *)actor_ptr; - msgpack::unpacker unpacker; - unpacker.reserve_buffer(args_buffer->size()); - memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size()); - unpacker.buffer_consumed(args_buffer->size()); - Arguments::UnwrapArgs(unpacker, &args...); + int arg_index = 0; + Arguments::UnwrapArgs(args_buffer, arg_index, &args...); ReturnType return_value; typedef ReturnType (ActorType::*Func)(OtherArgTypes...); @@ -68,7 +62,7 @@ std::shared_ptr ExecuteActorFunction( template std::shared_ptr NormalExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer) { + const std::vector> &args_buffer) { return ExecuteNormalFunction( base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK); } @@ -77,7 +71,7 @@ std::shared_ptr NormalExecFunction( template std::shared_ptr NormalExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer) { + const std::vector> &args_buffer) { std::shared_ptr arg1_ptr; return ExecuteNormalFunction( base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK, arg1_ptr); @@ -87,7 +81,7 @@ std::shared_ptr NormalExecFunction( template std::shared_ptr NormalExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer) { + const std::vector> &args_buffer) { std::shared_ptr arg1_ptr; std::shared_ptr arg2_ptr; return ExecuteNormalFunction( @@ -98,7 +92,7 @@ std::shared_ptr NormalExecFunction( template std::shared_ptr CreateActorExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer) { + const std::vector> &args_buffer) { return ExecuteNormalFunction(base_addr, func_offset, args_buffer, TaskType::ACTOR_CREATION_TASK); } @@ -107,7 +101,7 @@ std::shared_ptr CreateActorExecFunction( template std::shared_ptr CreateActorExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer) { + const std::vector> &args_buffer) { std::shared_ptr arg1_ptr; return ExecuteNormalFunction( base_addr, func_offset, args_buffer, TaskType::ACTOR_CREATION_TASK, arg1_ptr); @@ -117,7 +111,7 @@ std::shared_ptr CreateActorExecFunction( template std::shared_ptr CreateActorExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer) { + const std::vector> &args_buffer) { std::shared_ptr arg1_ptr; std::shared_ptr arg2_ptr; return ExecuteNormalFunction(base_addr, func_offset, args_buffer, @@ -129,7 +123,7 @@ std::shared_ptr CreateActorExecFunction( template std::shared_ptr ActorExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer, + const std::vector> &args_buffer, std::shared_ptr &actor_buffer) { return ExecuteActorFunction(base_addr, func_offset, args_buffer, actor_buffer); @@ -139,7 +133,7 @@ std::shared_ptr ActorExecFunction( template std::shared_ptr ActorExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer, + const std::vector> &args_buffer, std::shared_ptr &actor_buffer) { std::shared_ptr arg1_ptr; return ExecuteActorFunction(base_addr, func_offset, args_buffer, @@ -150,7 +144,7 @@ std::shared_ptr ActorExecFunction( template std::shared_ptr ActorExecFunction( uintptr_t base_addr, size_t func_offset, - std::shared_ptr &args_buffer, + const std::vector> &args_buffer, std::shared_ptr &actor_buffer) { std::shared_ptr arg1_ptr; std::shared_ptr arg2_ptr; diff --git a/cpp/include/ray/api/ray_runtime.h b/cpp/include/ray/api/ray_runtime.h index 45d06242c..eaa1c013d 100644 --- a/cpp/include/ray/api/ray_runtime.h +++ b/cpp/include/ray/api/ray_runtime.h @@ -37,11 +37,11 @@ class RayRuntime { int timeout_ms) = 0; virtual ObjectID Call(const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) = 0; + std::vector> &args) = 0; virtual ActorID CreateActor(const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) = 0; + std::vector> &args) = 0; virtual ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, - std::shared_ptr args) = 0; + std::vector> &args) = 0; }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/include/ray/api/task_caller.h b/cpp/include/ray/api/task_caller.h index 763e6089a..a5e1c5f28 100644 --- a/cpp/include/ray/api/task_caller.h +++ b/cpp/include/ray/api/task_caller.h @@ -12,14 +12,14 @@ class TaskCaller { TaskCaller(); TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr, - std::shared_ptr args); + std::vector> &&args); ObjectRef Remote(); private: RayRuntime *runtime_; RemoteFunctionPtrHolder ptr_; - std::shared_ptr args_; + std::vector> args_; }; // ---------- implementation ---------- @@ -29,8 +29,8 @@ TaskCaller::TaskCaller() {} template TaskCaller::TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr, - std::shared_ptr args) - : runtime_(runtime), ptr_(ptr), args_(args) {} + std::vector> &&args) + : runtime_(runtime), ptr_(ptr), args_(std::move(args)) {} template ObjectRef TaskCaller::Remote() { diff --git a/cpp/src/example/example.cc b/cpp/src/example/example.cc index 0f10c31e4..1375136ca 100644 --- a/cpp/src/example/example.cc +++ b/cpp/src/example/example.cc @@ -5,7 +5,7 @@ #include /// using namespace -using namespace ray::api; +using namespace ::ray::api; /// general function of user code int Return1() { return 1; } diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index d5a102470..0b3ea9381 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -74,7 +74,7 @@ WaitResult AbstractRayRuntime::Wait(const std::vector &ids, int num_ob InvocationSpec BuildInvocationSpec(TaskType task_type, std::string lib_name, const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args, + std::vector> &args, const ActorID &actor) { InvocationSpec invocation_spec; invocation_spec.task_type = task_type; @@ -83,27 +83,28 @@ InvocationSpec BuildInvocationSpec(TaskType task_type, std::string lib_name, invocation_spec.lib_name = lib_name; invocation_spec.fptr = fptr; invocation_spec.actor_id = actor; - invocation_spec.args = args; + invocation_spec.args = std::move(args); return invocation_spec; } ObjectID AbstractRayRuntime::Call(const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) { + std::vector> &args) { auto invocation_spec = BuildInvocationSpec( TaskType::NORMAL_TASK, this->config_->lib_name, fptr, args, ActorID::Nil()); return task_submitter_->SubmitTask(invocation_spec); } -ActorID AbstractRayRuntime::CreateActor(const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) { +ActorID AbstractRayRuntime::CreateActor( + const RemoteFunctionPtrHolder &fptr, + std::vector> &args) { auto invocation_spec = BuildInvocationSpec( TaskType::ACTOR_CREATION_TASK, this->config_->lib_name, fptr, args, ActorID::Nil()); return task_submitter_->CreateActor(invocation_spec); } -ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr, - const ActorID &actor, - std::shared_ptr args) { +ObjectID AbstractRayRuntime::CallActor( + const RemoteFunctionPtrHolder &fptr, const ActorID &actor, + std::vector> &args) { auto invocation_spec = BuildInvocationSpec(TaskType::ACTOR_TASK, this->config_->lib_name, fptr, args, actor); return task_submitter_->SubmitActorTask(invocation_spec); @@ -115,13 +116,6 @@ const TaskID &AbstractRayRuntime::GetCurrentTaskId() { const JobID &AbstractRayRuntime::GetCurrentJobID() { return worker_->GetCurrentJobID(); } -ActorID AbstractRayRuntime::GetNextActorID() { - const int next_task_index = worker_->GetNextTaskIndex(); - const ActorID actor_id = ActorID::Of(worker_->GetCurrentJobID(), - worker_->GetCurrentTaskID(), next_task_index); - return actor_id; -} - const std::unique_ptr &AbstractRayRuntime::GetWorkerContext() { return worker_; } diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.h b/cpp/src/ray/runtime/abstract_ray_runtime.h index 9f41fd6f3..ce22893d4 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.h +++ b/cpp/src/ray/runtime/abstract_ray_runtime.h @@ -32,15 +32,13 @@ class AbstractRayRuntime : public RayRuntime { WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); ObjectID Call(const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args); + std::vector> &args); ActorID CreateActor(const RemoteFunctionPtrHolder &fptr, - std::shared_ptr args); + std::vector> &args); ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, - std::shared_ptr args); - - ActorID GetNextActorID(); + std::vector> &args); const TaskID &GetCurrentTaskId(); diff --git a/cpp/src/ray/runtime/local_mode_ray_runtime.cc b/cpp/src/ray/runtime/local_mode_ray_runtime.cc index fa671bf70..7c8d41138 100644 --- a/cpp/src/ray/runtime/local_mode_ray_runtime.cc +++ b/cpp/src/ray/runtime/local_mode_ray_runtime.cc @@ -19,5 +19,12 @@ LocalModeRayRuntime::LocalModeRayRuntime(std::shared_ptr config) { task_submitter_ = std::unique_ptr(new LocalModeTaskSubmitter(*this)); } +ActorID LocalModeRayRuntime::GetNextActorID() { + const int next_task_index = worker_->GetNextTaskIndex(); + const ActorID actor_id = ActorID::Of(worker_->GetCurrentJobID(), + worker_->GetCurrentTaskID(), next_task_index); + return actor_id; +} + } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/local_mode_ray_runtime.h b/cpp/src/ray/runtime/local_mode_ray_runtime.h index 7d0eed0ad..5afca1eba 100644 --- a/cpp/src/ray/runtime/local_mode_ray_runtime.h +++ b/cpp/src/ray/runtime/local_mode_ray_runtime.h @@ -12,6 +12,8 @@ namespace api { class LocalModeRayRuntime : public AbstractRayRuntime { public: LocalModeRayRuntime(std::shared_ptr config); + + ActorID GetNextActorID(); }; } // namespace api diff --git a/cpp/src/ray/runtime/task/invocation_spec.h b/cpp/src/ray/runtime/task/invocation_spec.h index 07f840b31..0982d15c2 100644 --- a/cpp/src/ray/runtime/task/invocation_spec.h +++ b/cpp/src/ray/runtime/task/invocation_spec.h @@ -18,7 +18,7 @@ class InvocationSpec { int actor_counter; std::string lib_name; RemoteFunctionPtrHolder fptr; - std::shared_ptr args; + std::vector> args; }; } // namespace api } // namespace ray diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 7ea76eef9..61acd9298 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -18,7 +18,7 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter( thread_pool_.reset(new boost::asio::thread_pool(10)); } -ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) { +ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation) { /// TODO(Guyang Song): Make the infomation of TaskSpecification more reasonable /// We just reuse the TaskSpecification class and make the single process mode work. /// Maybe some infomation of TaskSpecification are not reasonable or invalid. @@ -47,6 +47,7 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) { PlacementGroupID::Nil(), true); if (invocation.task_type == TaskType::NORMAL_TASK) { } else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) { + invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID(); builder.SetActorCreationTaskSpec(invocation.actor_id); } else if (invocation.task_type == TaskType::ACTOR_TASK) { const TaskID actor_creation_task_id = @@ -58,13 +59,9 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) { } else { throw RayException("unknown task type"); } - auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( - reinterpret_cast(invocation.args->data()), invocation.args->size(), - true); - /// TODO(Guyang Song): Use both 'AddByRefArg' and 'AddByValueArg' to distinguish - auto arg = TaskArgByValue( - std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector())); - builder.AddArg(arg); + for (size_t i = 0; i < invocation.args.size(); i++) { + builder.AddArg(*invocation.args[i]); + } auto task_specification = builder.Build(); ObjectID return_object_id = task_specification.ReturnId(0); @@ -81,47 +78,34 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) { /// TODO(Guyang Song): Handle task dependencies. /// Execute actor task directly in the main thread because we must guarantee the actor /// task executed by calling order. - TaskExecutor::Invoke(task_specification, actor, runtime, dynamic_library_base_addr); + TaskExecutor::Invoke(task_specification, actor, runtime, dynamic_library_base_addr, + actor_contexts_, actor_contexts_mutex_); } else { boost::asio::post(*thread_pool_.get(), std::bind( - [actor, mutex, runtime](TaskSpecification &ts) { + [actor, mutex, runtime, this](TaskSpecification &ts) { if (mutex) { absl::MutexLock lock(mutex.get()); } - TaskExecutor::Invoke(ts, actor, runtime, - dynamic_library_base_addr); + TaskExecutor::Invoke( + ts, actor, runtime, dynamic_library_base_addr, + this->actor_contexts_, this->actor_contexts_mutex_); }, std::move(task_specification))); } return return_object_id; } -ObjectID LocalModeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) { +ObjectID LocalModeTaskSubmitter::SubmitTask(InvocationSpec &invocation) { return Submit(invocation); } -ActorID LocalModeTaskSubmitter::CreateActor(const InvocationSpec &invocation) { - if (dynamic_library_base_addr == 0) { - dynamic_library_base_addr = - GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer); - } - ActorID id = local_mode_ray_tuntime_.GetNextActorID(); - typedef std::shared_ptr (*ExecFunction)( - uintptr_t base_addr, size_t func_offset, std::shared_ptr args); - ExecFunction exec_function = (ExecFunction)(invocation.fptr.exec_function_pointer); - auto data = (*exec_function)( - dynamic_library_base_addr, - (size_t)(invocation.fptr.function_pointer - dynamic_library_base_addr), - invocation.args); - std::unique_ptr actorContext(new ActorContext()); - actorContext->current_actor = data; - absl::MutexLock lock(&actor_contexts_mutex_); - actor_contexts_.emplace(id, std::move(actorContext)); - return id; +ActorID LocalModeTaskSubmitter::CreateActor(InvocationSpec &invocation) { + Submit(invocation); + return invocation.actor_id; } -ObjectID LocalModeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) { +ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation) { return Submit(invocation); } diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.h b/cpp/src/ray/runtime/task/local_mode_task_submitter.h index 22ad1f39d..79ac6a6f0 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.h +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.h @@ -18,11 +18,11 @@ class LocalModeTaskSubmitter : public TaskSubmitter { public: LocalModeTaskSubmitter(LocalModeRayRuntime &local_mode_ray_tuntime); - ObjectID SubmitTask(const InvocationSpec &invocation); + ObjectID SubmitTask(InvocationSpec &invocation); - ActorID CreateActor(const InvocationSpec &invocation); + ActorID CreateActor(InvocationSpec &invocation); - ObjectID SubmitActorTask(const InvocationSpec &invocation); + ObjectID SubmitActorTask(InvocationSpec &invocation); private: std::unordered_map> actor_contexts_; @@ -33,7 +33,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter { LocalModeRayRuntime &local_mode_ray_tuntime_; - ObjectID Submit(const InvocationSpec &invocation); + ObjectID Submit(InvocationSpec &invocation); }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index 4039dc76a..7fb4dacc8 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -8,7 +8,7 @@ namespace ray { namespace api { -RayFunction BuildRayFunction(const InvocationSpec &invocation) { +RayFunction BuildRayFunction(InvocationSpec &invocation) { auto base_addr = GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer); auto func_offset = (size_t)(invocation.fptr.function_pointer - base_addr); @@ -18,41 +18,25 @@ RayFunction BuildRayFunction(const InvocationSpec &invocation) { return RayFunction(Language::CPP, function_descriptor); } -void BuildTaskArgs(const InvocationSpec &invocation, - std::vector> &args) { - if (invocation.args->size() > 0) { - auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( - reinterpret_cast(invocation.args->data()), invocation.args->size(), - true); - auto task_arg = new TaskArgByValue( - std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector())); - args.emplace_back(task_arg); - } -} - -ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation) { - std::vector> args; - BuildTaskArgs(invocation, args); +ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); std::vector return_ids; if (invocation.task_type == TaskType::ACTOR_TASK) { - core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation), args, - TaskOptions(), &return_ids); + core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation), + invocation.args, TaskOptions(), &return_ids); } else { - core_worker.SubmitTask(BuildRayFunction(invocation), args, TaskOptions(), &return_ids, - 1, std::make_pair(PlacementGroupID::Nil(), -1), true); + core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, TaskOptions(), + &return_ids, 1, std::make_pair(PlacementGroupID::Nil(), -1), + true); } return return_ids[0]; } -ObjectID NativeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) { +ObjectID NativeTaskSubmitter::SubmitTask(InvocationSpec &invocation) { return Submit(invocation); } -ActorID NativeTaskSubmitter::CreateActor(const InvocationSpec &invocation) { - std::vector> args; - BuildTaskArgs(invocation, args); - +ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); std::unordered_map resources; @@ -67,15 +51,15 @@ ActorID NativeTaskSubmitter::CreateActor(const InvocationSpec &invocation) { name, /*is_asyncio=*/false}; ActorID actor_id; - auto status = core_worker.CreateActor(BuildRayFunction(invocation), args, actor_options, - "", &actor_id); + auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args, + actor_options, "", &actor_id); if (!status.ok()) { throw RayException("Create actor error"); } return actor_id; } -ObjectID NativeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) { +ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation) { return Submit(invocation); } diff --git a/cpp/src/ray/runtime/task/native_task_submitter.h b/cpp/src/ray/runtime/task/native_task_submitter.h index 064170088..ad395ddc1 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.h +++ b/cpp/src/ray/runtime/task/native_task_submitter.h @@ -10,14 +10,14 @@ namespace api { class NativeTaskSubmitter : public TaskSubmitter { public: - ObjectID SubmitTask(const InvocationSpec &invocation); + ObjectID SubmitTask(InvocationSpec &invocation); - ActorID CreateActor(const InvocationSpec &invocation); + ActorID CreateActor(InvocationSpec &invocation); - ObjectID SubmitActorTask(const InvocationSpec &invocation); + ObjectID SubmitActorTask(InvocationSpec &invocation); private: - ObjectID Submit(const InvocationSpec &invocation); + ObjectID Submit(InvocationSpec &invocation); }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index b07f58a7a..029d08ebc 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -17,7 +17,7 @@ TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_) // TODO(Guyang Song): Make a common task execution function used for both local mode and // cluster mode. -std::unique_ptr TaskExecutor::Execute(const InvocationSpec &invocation) { +std::unique_ptr TaskExecutor::Execute(InvocationSpec &invocation) { abstract_ray_tuntime_.GetWorkerContext(); return std::unique_ptr(new ObjectID()); }; @@ -25,7 +25,7 @@ std::unique_ptr TaskExecutor::Execute(const InvocationSpec &invocation Status TaskExecutor::ExecuteTask( TaskType task_type, const std::string task_name, const RayFunction &ray_function, const std::unordered_map &required_resources, - const std::vector> &args, + const std::vector> &args_buffer, const std::vector &arg_reference_ids, const std::vector &return_ids, std::vector> *results) { @@ -38,39 +38,32 @@ Status TaskExecutor::ExecuteTask( std::string lib_name = typed_descriptor->LibName(); std::string func_offset = typed_descriptor->FunctionOffset(); std::string exec_func_offset = typed_descriptor->ExecFunctionOffset(); - std::shared_ptr args_sbuffer; - if (args.size() > 0) { - auto args_buffer = args[0]->GetData(); - args_sbuffer = std::make_shared(args_buffer->Size()); - /// TODO(Guyang Song): Avoid the memory copy. - args_sbuffer->write(reinterpret_cast(args_buffer->Data()), - args_buffer->Size()); - } else { - args_sbuffer = std::make_shared(); - } auto base_addr = FunctionHelper::GetInstance().GetBaseAddress(lib_name); std::shared_ptr data = nullptr; if (task_type == TaskType::ACTOR_CREATION_TASK) { typedef std::shared_ptr (*ExecFunction)( - uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + uintptr_t base_addr, size_t func_offset, + const std::vector> &args_buffer); ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset)); data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), - args_sbuffer); + args_buffer); current_actor_ = data; } else if (task_type == TaskType::ACTOR_TASK) { RAY_CHECK(current_actor_ != nullptr); typedef std::shared_ptr (*ExecFunction)( - uintptr_t base_addr, size_t func_offset, std::shared_ptr args, + uintptr_t base_addr, size_t func_offset, + const std::vector> &args_buffer, std::shared_ptr object); ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset)); data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), - args_sbuffer, current_actor_); + args_buffer, current_actor_); } else { // NORMAL_TASK typedef std::shared_ptr (*ExecFunction)( - uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + uintptr_t base_addr, size_t func_offset, + const std::vector> &args_buffer); ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset)); data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), - args_sbuffer); + args_buffer); } std::vector data_sizes; @@ -95,34 +88,56 @@ Status TaskExecutor::ExecuteTask( return ray::Status::OK(); } -void TaskExecutor::Invoke(const TaskSpecification &task_spec, - std::shared_ptr actor, - AbstractRayRuntime *runtime, const uintptr_t base_addr) { - auto args = std::make_shared(task_spec.ArgDataSize(0)); - /// TODO(Guyang Song): Avoid the memory copy. - args->write(reinterpret_cast(task_spec.ArgData(0)), - task_spec.ArgDataSize(0)); +void TaskExecutor::Invoke( + const TaskSpecification &task_spec, std::shared_ptr actor, + AbstractRayRuntime *runtime, const uintptr_t base_addr, + std::unordered_map> &actor_contexts, + absl::Mutex &actor_contexts_mutex) { + std::vector> args_buffer; + for (size_t i = 0; i < task_spec.NumArgs(); i++) { + std::shared_ptr<::ray::LocalMemoryBuffer> memory_buffer = nullptr; + if (task_spec.ArgByRef(i)) { + auto arg = runtime->Get(task_spec.ArgId(i)); + memory_buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(arg->data()), arg->size(), true); + } else { + memory_buffer = std::make_shared<::ray::LocalMemoryBuffer>( + const_cast(task_spec.ArgData(i)), task_spec.ArgDataSize(i), true); + } + args_buffer.emplace_back( + std::make_shared(memory_buffer, nullptr, std::vector())); + } + auto function_descriptor = task_spec.FunctionDescriptor(); auto typed_descriptor = function_descriptor->As(); std::shared_ptr data; if (actor) { typedef std::shared_ptr (*ExecFunction)( - uintptr_t base_addr, size_t func_offset, std::shared_ptr args, + uintptr_t base_addr, size_t func_offset, + std::vector> & args_buffer, std::shared_ptr object); unsigned long offset = std::stoul(typed_descriptor->ExecFunctionOffset()); auto address = base_addr + offset; ExecFunction exec_function = (ExecFunction)(address); data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), - args, actor); + args_buffer, actor); } else { typedef std::shared_ptr (*ExecFunction)( - uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + uintptr_t base_addr, size_t func_offset, + std::vector> & args_buffer); ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(typed_descriptor->ExecFunctionOffset())); - data = - (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), args); + data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), + args_buffer); + } + if (task_spec.IsActorCreationTask()) { + std::unique_ptr actorContext(new ActorContext()); + actorContext->current_actor = data; + absl::MutexLock lock(&actor_contexts_mutex); + actor_contexts.emplace(task_spec.ActorCreationId(), std::move(actorContext)); + } else { + runtime->Put(std::move(data), task_spec.ReturnId(0)); } - runtime->Put(std::move(data), task_spec.ReturnId(0)); } } // namespace api diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index 754ccd20d..5d02f3e00 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -25,11 +25,13 @@ class TaskExecutor { TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_); /// TODO(Guyang Song): support multiple tasks execution - std::unique_ptr Execute(const InvocationSpec &invocation); + std::unique_ptr Execute(InvocationSpec &invocation); - static void Invoke(const TaskSpecification &task_spec, - std::shared_ptr actor, AbstractRayRuntime *runtime, - const uintptr_t base_addr); + static void Invoke( + const TaskSpecification &task_spec, std::shared_ptr actor, + AbstractRayRuntime *runtime, const uintptr_t base_addr, + std::unordered_map> &actor_contexts, + absl::Mutex &actor_contexts_mutex); static Status ExecuteTask( TaskType task_type, const std::string task_name, const RayFunction &ray_function, diff --git a/cpp/src/ray/runtime/task/task_submitter.h b/cpp/src/ray/runtime/task/task_submitter.h index 15e3fae97..bed613b4d 100644 --- a/cpp/src/ray/runtime/task/task_submitter.h +++ b/cpp/src/ray/runtime/task/task_submitter.h @@ -15,11 +15,11 @@ class TaskSubmitter { virtual ~TaskSubmitter(){}; - virtual ObjectID SubmitTask(const InvocationSpec &invocation) = 0; + virtual ObjectID SubmitTask(InvocationSpec &invocation) = 0; - virtual ActorID CreateActor(const InvocationSpec &invocation) = 0; + virtual ActorID CreateActor(InvocationSpec &invocation) = 0; - virtual ObjectID SubmitActorTask(const InvocationSpec &invocation) = 0; + virtual ObjectID SubmitActorTask(InvocationSpec &invocation) = 0; }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index e3e0b990b..0115519c9 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -4,7 +4,7 @@ #include #include -using namespace ray::api; +using namespace ::ray::api; /// general function of user code int Return1() { return 1; } @@ -63,6 +63,11 @@ TEST(RayClusterModeTest, FullTest) { int actor_task_result2 = *(Ray::Get(actor_object2)); EXPECT_EQ(6, actor_task_result2); + /// actor task with args which pass by reference + ActorHandle actor3 = Ray::Actor(Counter::FactoryCreate, 6).Remote(); + auto actor_object3 = actor3.Task(&Counter::Add, actor_object2).Remote(); + int actor_task_result3 = *(Ray::Get(actor_object3)); + EXPECT_EQ(12, actor_task_result3); Ray::Shutdown(); } diff --git a/cpp/src/ray/worker/default_worker.cc b/cpp/src/ray/worker/default_worker.cc index f6f483580..2ebfb8d6c 100644 --- a/cpp/src/ray/worker/default_worker.cc +++ b/cpp/src/ray/worker/default_worker.cc @@ -3,6 +3,8 @@ #include #include +using namespace ::ray; + namespace ray { namespace api {