From 1583cd14ef14e8aac19ce38f80e25feeed278a39 Mon Sep 17 00:00:00 2001 From: SongGuyang Date: Mon, 15 Jun 2020 10:13:19 +0800 Subject: [PATCH] Add interfaces for C++ worker cluster mode (#8859) --- cpp/include/ray/api/ray_config.h | 13 +++++-- cpp/src/ray/api.cc | 5 ++- cpp/src/ray/ray_config.cc | 16 +++++++++ cpp/src/ray/runtime/abstract_ray_runtime.cc | 7 ++-- cpp/src/ray/runtime/native_ray_runtime.cc | 23 ++++++++++++ cpp/src/ray/runtime/native_ray_runtime.h | 17 +++++++++ .../ray/runtime/object/native_object_store.cc | 35 +++++++++++++++++++ .../ray/runtime/object/native_object_store.h | 31 ++++++++++++++++ .../ray/runtime/task/native_task_submitter.cc | 30 ++++++++++++++++ .../ray/runtime/task/native_task_submitter.h | 31 ++++++++++++++++ cpp/src/ray/runtime/task/task_executor.cc | 4 +++ cpp/src/ray/runtime/task/task_executor.h | 5 +++ cpp/src/ray/util/process_helper.cc | 9 +++++ cpp/src/ray/util/process_helper.h | 11 ++++++ 14 files changed, 230 insertions(+), 7 deletions(-) create mode 100644 cpp/src/ray/ray_config.cc create mode 100644 cpp/src/ray/runtime/native_ray_runtime.cc create mode 100644 cpp/src/ray/runtime/native_ray_runtime.h create mode 100644 cpp/src/ray/runtime/object/native_object_store.cc create mode 100644 cpp/src/ray/runtime/object/native_object_store.h create mode 100644 cpp/src/ray/runtime/task/native_task_submitter.cc create mode 100644 cpp/src/ray/runtime/task/native_task_submitter.h create mode 100644 cpp/src/ray/util/process_helper.cc create mode 100644 cpp/src/ray/util/process_helper.h diff --git a/cpp/include/ray/api/ray_config.h b/cpp/include/ray/api/ray_config.h index 3f5528f3f..268ae144c 100644 --- a/cpp/include/ray/api/ray_config.h +++ b/cpp/include/ray/api/ray_config.h @@ -1,5 +1,7 @@ #pragma once +#include +#include namespace ray { namespace api { @@ -11,9 +13,16 @@ enum class WorkerMode { NONE, DRIVER, WORKER }; /// TODO(Guyang Song): Make configuration complete and use to initialize. class RayConfig { public: - WorkerMode workerMode = WorkerMode::DRIVER; + WorkerMode worker_mode = WorkerMode::DRIVER; - RunMode runMode = RunMode::SINGLE_PROCESS; + RunMode run_mode = RunMode::SINGLE_PROCESS; + + std::string redis_address; + + static std::shared_ptr GetInstance(); + + private: + static std::shared_ptr config_; }; } // namespace api diff --git a/cpp/src/ray/api.cc b/cpp/src/ray/api.cc index c10ce6915..22a7f1d27 100644 --- a/cpp/src/ray/api.cc +++ b/cpp/src/ray/api.cc @@ -11,9 +11,8 @@ RayRuntime *Ray::runtime_ = nullptr; std::once_flag Ray::is_inited_; void Ray::Init() { - std::call_once(is_inited_, [] { - runtime_ = AbstractRayRuntime::DoInit(std::make_shared()); - }); + std::call_once(is_inited_, + [] { runtime_ = AbstractRayRuntime::DoInit(RayConfig::GetInstance()); }); } } // namespace api diff --git a/cpp/src/ray/ray_config.cc b/cpp/src/ray/ray_config.cc new file mode 100644 index 000000000..dee958449 --- /dev/null +++ b/cpp/src/ray/ray_config.cc @@ -0,0 +1,16 @@ + +#include + +namespace ray { +namespace api { + +std::shared_ptr RayConfig::config_ = nullptr; + +std::shared_ptr RayConfig::GetInstance() { + if (config_ == nullptr) { + config_ = std::make_shared(); + } + return config_; +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 3fb2491d1..c7cfcb5d9 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -7,17 +7,20 @@ #include #include #include "../util/address_helper.h" +#include "../util/process_helper.h" #include "local_mode_ray_runtime.h" +#include "native_ray_runtime.h" namespace ray { namespace api { AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr config) { AbstractRayRuntime *runtime; - if (config->runMode == RunMode::SINGLE_PROCESS) { + if (config->run_mode == RunMode::SINGLE_PROCESS) { GenerateBaseAddressOfCurrentLibrary(); runtime = new LocalModeRayRuntime(config); } else { - throw RayException("Only single process mode supported now"); + ProcessHelper::RayStart(); + runtime = new NativeRayRuntime(config); } RAY_CHECK(runtime); return runtime; diff --git a/cpp/src/ray/runtime/native_ray_runtime.cc b/cpp/src/ray/runtime/native_ray_runtime.cc new file mode 100644 index 000000000..d07189b6b --- /dev/null +++ b/cpp/src/ray/runtime/native_ray_runtime.cc @@ -0,0 +1,23 @@ + +#include "native_ray_runtime.h" + +#include +#include "../util/address_helper.h" +#include "./object/native_object_store.h" +#include "./object/object_store.h" +#include "./task/native_task_submitter.h" + +namespace ray { +namespace api { + +NativeRayRuntime::NativeRayRuntime(std::shared_ptr config) { + config_ = config; + worker_ = std::unique_ptr( + new WorkerContext(WorkerType::DRIVER, WorkerID::Nil(), JobID::Nil())); + object_store_ = std::unique_ptr(new NativeObjectStore(*this)); + task_submitter_ = std::unique_ptr(new NativeTaskSubmitter(*this)); + task_executor_ = std::unique_ptr(new TaskExecutor(*this)); +} + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/native_ray_runtime.h b/cpp/src/ray/runtime/native_ray_runtime.h new file mode 100644 index 000000000..c2d907a18 --- /dev/null +++ b/cpp/src/ray/runtime/native_ray_runtime.h @@ -0,0 +1,17 @@ + +#pragma once + +#include +#include "abstract_ray_runtime.h" +#include "ray/core.h" + +namespace ray { +namespace api { + +class NativeRayRuntime : public AbstractRayRuntime { + public: + NativeRayRuntime(std::shared_ptr config); +}; + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/object/native_object_store.cc b/cpp/src/ray/runtime/object/native_object_store.cc new file mode 100644 index 000000000..b22278abd --- /dev/null +++ b/cpp/src/ray/runtime/object/native_object_store.cc @@ -0,0 +1,35 @@ + +#include +#include +#include +#include + +#include +#include "../abstract_ray_runtime.h" +#include "native_object_store.h" + +namespace ray { +namespace api { +NativeObjectStore::NativeObjectStore(NativeRayRuntime &native_ray_tuntime) + : native_ray_tuntime_(native_ray_tuntime) {} + +void NativeObjectStore::PutRaw(const ObjectID &object_id, + std::shared_ptr data) {} + +std::shared_ptr NativeObjectStore::GetRaw(const ObjectID &object_id, + int timeout_ms) { + return nullptr; +} + +std::vector> NativeObjectStore::GetRaw( + const std::vector &ids, int timeout_ms) { + return std::vector>(); +} + +WaitResult NativeObjectStore::Wait(const std::vector &ids, int num_objects, + int timeout_ms) { + native_ray_tuntime_.GetWorkerContext(); + return WaitResult(); +} +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/object/native_object_store.h b/cpp/src/ray/runtime/object/native_object_store.h new file mode 100644 index 000000000..0f633452c --- /dev/null +++ b/cpp/src/ray/runtime/object/native_object_store.h @@ -0,0 +1,31 @@ + +#pragma once + +#include +#include "ray/core.h" + +#include "../native_ray_runtime.h" +#include "object_store.h" + +namespace ray { +namespace api { + +class NativeObjectStore : public ObjectStore { + public: + NativeObjectStore(NativeRayRuntime &native_ray_tuntime); + + WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); + + private: + void PutRaw(const ObjectID &object_id, std::shared_ptr data); + + std::shared_ptr GetRaw(const ObjectID &object_id, int timeout_ms); + + std::vector> GetRaw(const std::vector &ids, + int timeout_ms); + + NativeRayRuntime &native_ray_tuntime_; +}; + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc new file mode 100644 index 000000000..a929aaece --- /dev/null +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -0,0 +1,30 @@ +#include "native_task_submitter.h" +#include +#include "../../util/address_helper.h" +#include "../abstract_ray_runtime.h" + +namespace ray { +namespace api { + +NativeTaskSubmitter::NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime_) + : native_ray_tuntime_(native_ray_tuntime_) {} + +ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) { + return ObjectID(); +} + +ObjectID NativeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) { + return Submit(invocation, TaskType::NORMAL_TASK); +} + +ActorID NativeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) { + return native_ray_tuntime_.GetNextActorID(); +} + +ObjectID NativeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) { + return Submit(invocation, TaskType::ACTOR_TASK); +} + +} // namespace api +} // namespace ray diff --git a/cpp/src/ray/runtime/task/native_task_submitter.h b/cpp/src/ray/runtime/task/native_task_submitter.h new file mode 100644 index 000000000..5131b5c7b --- /dev/null +++ b/cpp/src/ray/runtime/task/native_task_submitter.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include "../native_ray_runtime.h" +#include "invocation_spec.h" +#include "ray/core.h" +#include "task_submitter.h" + +namespace ray { +namespace api { + +class NativeTaskSubmitter : public TaskSubmitter { + public: + NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime); + + ObjectID SubmitTask(const InvocationSpec &invocation); + + ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + std::shared_ptr args); + + ObjectID SubmitActorTask(const InvocationSpec &invocation); + + private: + NativeRayRuntime &native_ray_tuntime_; + + ObjectID Submit(const InvocationSpec &invocation, TaskType type); +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index 48599267b..23f7b6657 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -8,9 +8,13 @@ namespace ray { namespace api { +TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_) + : abstract_ray_tuntime_(abstract_ray_tuntime_) {} + // TODO(Guyang Song): Make a common task execution function used for both local mode and // cluster mode. std::unique_ptr TaskExecutor::Execute(const InvocationSpec &invocation) { + abstract_ray_tuntime_.GetWorkerContext(); return std::unique_ptr(new ObjectID()); }; diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index f68b5b51a..f7a8675a1 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -21,6 +21,8 @@ class ActorContext { class TaskExecutor { public: + TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_); + /// TODO(Guyang Song): support multiple tasks execution std::unique_ptr Execute(const InvocationSpec &invocation); @@ -29,6 +31,9 @@ class TaskExecutor { AbstractRayRuntime *runtime); virtual ~TaskExecutor(){}; + + private: + AbstractRayRuntime &abstract_ray_tuntime_; }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc new file mode 100644 index 000000000..da23bd0e4 --- /dev/null +++ b/cpp/src/ray/util/process_helper.cc @@ -0,0 +1,9 @@ +#include "process_helper.h" + +namespace ray { +namespace api { + +void ProcessHelper::RayStart() { return; } + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/process_helper.h b/cpp/src/ray/util/process_helper.h new file mode 100644 index 000000000..fd9be3a5a --- /dev/null +++ b/cpp/src/ray/util/process_helper.h @@ -0,0 +1,11 @@ +#pragma once + +namespace ray { +namespace api { + +class ProcessHelper { + public: + static void RayStart(); +}; +} // namespace api +} // namespace ray \ No newline at end of file