Add interfaces for C++ worker cluster mode (#8859)

This commit is contained in:
SongGuyang 2020-06-15 10:13:19 +08:00 committed by GitHub
parent 19cc1ae781
commit 1583cd14ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 230 additions and 7 deletions

View file

@ -1,5 +1,7 @@
#pragma once
#include <memory>
#include <string>
namespace ray {
namespace api {
@ -11,9 +13,16 @@ enum class WorkerMode { NONE, DRIVER, WORKER };
/// TODO(Guyang Song): Make configuration complete and use to initialize.
class RayConfig {
public:
WorkerMode workerMode = WorkerMode::DRIVER;
WorkerMode worker_mode = WorkerMode::DRIVER;
RunMode runMode = RunMode::SINGLE_PROCESS;
RunMode run_mode = RunMode::SINGLE_PROCESS;
std::string redis_address;
static std::shared_ptr<RayConfig> GetInstance();
private:
static std::shared_ptr<RayConfig> config_;
};
} // namespace api

View file

@ -11,9 +11,8 @@ RayRuntime *Ray::runtime_ = nullptr;
std::once_flag Ray::is_inited_;
void Ray::Init() {
std::call_once(is_inited_, [] {
runtime_ = AbstractRayRuntime::DoInit(std::make_shared<RayConfig>());
});
std::call_once(is_inited_,
[] { runtime_ = AbstractRayRuntime::DoInit(RayConfig::GetInstance()); });
}
} // namespace api

16
cpp/src/ray/ray_config.cc Normal file
View file

@ -0,0 +1,16 @@
#include <ray/api/ray_config.h>
namespace ray {
namespace api {
std::shared_ptr<RayConfig> RayConfig::config_ = nullptr;
std::shared_ptr<RayConfig> RayConfig::GetInstance() {
if (config_ == nullptr) {
config_ = std::make_shared<RayConfig>();
}
return config_;
}
} // namespace api
} // namespace ray

View file

@ -7,17 +7,20 @@
#include <ray/api/ray_config.h>
#include <ray/api/ray_exception.h>
#include "../util/address_helper.h"
#include "../util/process_helper.h"
#include "local_mode_ray_runtime.h"
#include "native_ray_runtime.h"
namespace ray {
namespace api {
AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr<RayConfig> config) {
AbstractRayRuntime *runtime;
if (config->runMode == RunMode::SINGLE_PROCESS) {
if (config->run_mode == RunMode::SINGLE_PROCESS) {
GenerateBaseAddressOfCurrentLibrary();
runtime = new LocalModeRayRuntime(config);
} else {
throw RayException("Only single process mode supported now");
ProcessHelper::RayStart();
runtime = new NativeRayRuntime(config);
}
RAY_CHECK(runtime);
return runtime;

View file

@ -0,0 +1,23 @@
#include "native_ray_runtime.h"
#include <ray/api.h>
#include "../util/address_helper.h"
#include "./object/native_object_store.h"
#include "./object/object_store.h"
#include "./task/native_task_submitter.h"
namespace ray {
namespace api {
NativeRayRuntime::NativeRayRuntime(std::shared_ptr<RayConfig> config) {
config_ = config;
worker_ = std::unique_ptr<WorkerContext>(
new WorkerContext(WorkerType::DRIVER, WorkerID::Nil(), JobID::Nil()));
object_store_ = std::unique_ptr<ObjectStore>(new NativeObjectStore(*this));
task_submitter_ = std::unique_ptr<TaskSubmitter>(new NativeTaskSubmitter(*this));
task_executor_ = std::unique_ptr<TaskExecutor>(new TaskExecutor(*this));
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,17 @@
#pragma once
#include <unordered_map>
#include "abstract_ray_runtime.h"
#include "ray/core.h"
namespace ray {
namespace api {
class NativeRayRuntime : public AbstractRayRuntime {
public:
NativeRayRuntime(std::shared_ptr<RayConfig> config);
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,35 @@
#include <algorithm>
#include <chrono>
#include <list>
#include <thread>
#include <ray/api/ray_exception.h>
#include "../abstract_ray_runtime.h"
#include "native_object_store.h"
namespace ray {
namespace api {
NativeObjectStore::NativeObjectStore(NativeRayRuntime &native_ray_tuntime)
: native_ray_tuntime_(native_ray_tuntime) {}
void NativeObjectStore::PutRaw(const ObjectID &object_id,
std::shared_ptr<msgpack::sbuffer> data) {}
std::shared_ptr<msgpack::sbuffer> NativeObjectStore::GetRaw(const ObjectID &object_id,
int timeout_ms) {
return nullptr;
}
std::vector<std::shared_ptr<msgpack::sbuffer>> NativeObjectStore::GetRaw(
const std::vector<ObjectID> &ids, int timeout_ms) {
return std::vector<std::shared_ptr<msgpack::sbuffer>>();
}
WaitResult NativeObjectStore::Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) {
native_ray_tuntime_.GetWorkerContext();
return WaitResult();
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,31 @@
#pragma once
#include <unordered_map>
#include "ray/core.h"
#include "../native_ray_runtime.h"
#include "object_store.h"
namespace ray {
namespace api {
class NativeObjectStore : public ObjectStore {
public:
NativeObjectStore(NativeRayRuntime &native_ray_tuntime);
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
private:
void PutRaw(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data);
std::shared_ptr<msgpack::sbuffer> GetRaw(const ObjectID &object_id, int timeout_ms);
std::vector<std::shared_ptr<msgpack::sbuffer>> GetRaw(const std::vector<ObjectID> &ids,
int timeout_ms);
NativeRayRuntime &native_ray_tuntime_;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,30 @@
#include "native_task_submitter.h"
#include <ray/api/ray_exception.h>
#include "../../util/address_helper.h"
#include "../abstract_ray_runtime.h"
namespace ray {
namespace api {
NativeTaskSubmitter::NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime_)
: native_ray_tuntime_(native_ray_tuntime_) {}
ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) {
return ObjectID();
}
ObjectID NativeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::NORMAL_TASK);
}
ActorID NativeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
return native_ray_tuntime_.GetNextActorID();
}
ObjectID NativeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::ACTOR_TASK);
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,31 @@
#pragma once
#include <boost/asio/thread_pool.hpp>
#include <memory>
#include <queue>
#include "../native_ray_runtime.h"
#include "invocation_spec.h"
#include "ray/core.h"
#include "task_submitter.h"
namespace ray {
namespace api {
class NativeTaskSubmitter : public TaskSubmitter {
public:
NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime);
ObjectID SubmitTask(const InvocationSpec &invocation);
ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ObjectID SubmitActorTask(const InvocationSpec &invocation);
private:
NativeRayRuntime &native_ray_tuntime_;
ObjectID Submit(const InvocationSpec &invocation, TaskType type);
};
} // namespace api
} // namespace ray

View file

@ -8,9 +8,13 @@
namespace ray {
namespace api {
TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
: abstract_ray_tuntime_(abstract_ray_tuntime_) {}
// TODO(Guyang Song): Make a common task execution function used for both local mode and
// cluster mode.
std::unique_ptr<ObjectID> TaskExecutor::Execute(const InvocationSpec &invocation) {
abstract_ray_tuntime_.GetWorkerContext();
return std::unique_ptr<ObjectID>(new ObjectID());
};

View file

@ -21,6 +21,8 @@ class ActorContext {
class TaskExecutor {
public:
TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_);
/// TODO(Guyang Song): support multiple tasks execution
std::unique_ptr<ObjectID> Execute(const InvocationSpec &invocation);
@ -29,6 +31,9 @@ class TaskExecutor {
AbstractRayRuntime *runtime);
virtual ~TaskExecutor(){};
private:
AbstractRayRuntime &abstract_ray_tuntime_;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,9 @@
#include "process_helper.h"
namespace ray {
namespace api {
void ProcessHelper::RayStart() { return; }
} // namespace api
} // namespace ray

View file

@ -0,0 +1,11 @@
#pragma once
namespace ray {
namespace api {
class ProcessHelper {
public:
static void RayStart();
};
} // namespace api
} // namespace ray