[cpp worker] support cluster mode (#9977)

This commit is contained in:
SongGuyang 2020-09-18 11:08:18 +08:00 committed by GitHub
parent 1b295a17cb
commit 5cbc411e38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 488 additions and 260 deletions

View file

@ -21,6 +21,7 @@ cc_library(
"src/ray/util/*.h",
"src/ray/*.cc",
"src/ray/*.h",
"src/ray/worker/default_worker.cc",
]),
hdrs = glob([
"include/ray/*.h",
@ -29,8 +30,8 @@ cc_library(
]),
copts = COPTS,
linkopts = ["-ldl"],
linkstatic = True,
strip_include_prefix = "include",
# linkstatic = False,
visibility = ["//visibility:public"],
deps = [
"//:core_worker_lib",
@ -43,35 +44,6 @@ cc_library(
],
)
cc_binary(
name = "default_worker",
srcs = [
"src/ray/worker/default_worker.cc",
],
copts = COPTS,
linkstatic = True,
deps = [
"//:core_worker_lib",
],
)
genrule(
name = "ray_cpp_pkg",
srcs = [
"default_worker",
"ray_api",
],
outs = ["ray_cpp_pkg.out"],
cmd = """
WORK_DIR="$$(pwd)" &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(location default_worker) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
echo "$$WORK_DIR" > $@
""",
local = 1,
)
cc_binary(
name = "example",
testonly = 1,
@ -85,19 +57,6 @@ cc_binary(
],
)
cc_binary(
name = "example_cluster_mode",
testonly = 1,
srcs = glob([
"src/example/example_cluster_mode.cc",
]),
copts = COPTS,
linkstatic = False,
deps = [
"ray_api",
],
)
cc_test(
name = "api_test",
srcs = glob([
@ -113,13 +72,31 @@ cc_test(
cc_test(
name = "cluster_mode_test",
testonly = 0,
srcs = glob([
"src/ray/test/cluster/*.cc",
]),
copts = COPTS,
linkstatic = False,
linkstatic = True,
deps = [
"ray_api",
"@com_google_googletest//:gtest_main",
],
)
genrule(
name = "ray_cpp_pkg",
srcs = [
"cluster_mode_test",
"ray_api",
],
outs = ["ray_cpp_pkg.out"],
cmd = """
WORK_DIR="$$(pwd)" &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(location cluster_mode_test) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" &&
cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
echo "$$WORK_DIR" > $@
""",
local = 1,
)

View file

@ -85,7 +85,7 @@ class Ray {
#include "api/generated/create_actors.generated.h"
private:
static RayRuntime *runtime_;
static std::shared_ptr<RayRuntime> runtime_;
static std::once_flag is_inited_;
@ -203,7 +203,7 @@ inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func, ExecFuncType &ex
RemoteFunctionPtrHolder ptr;
ptr.function_pointer = reinterpret_cast<uintptr_t>(func);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
return TaskCaller<ReturnType>(runtime_, ptr, buffer);
return TaskCaller<ReturnType>(runtime_.get(), ptr, buffer);
}
template <typename ActorType, typename FuncType, typename ExecFuncType,
@ -217,7 +217,7 @@ inline ActorCreator<ActorType> Ray::CreateActorInternal(FuncType &create_func,
RemoteFunctionPtrHolder ptr;
ptr.function_pointer = reinterpret_cast<uintptr_t>(create_func);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
return ActorCreator<ActorType>(runtime_, ptr, buffer);
return ActorCreator<ActorType>(runtime_.get(), ptr, buffer);
}
template <typename ReturnType, typename ActorType, typename FuncType,
@ -233,7 +233,7 @@ inline ActorTaskCaller<ReturnType> Ray::CallActorInternal(FuncType &actor_func,
MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func);
ptr.function_pointer = reinterpret_cast<uintptr_t>(holder.value[0]);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
return ActorTaskCaller<ReturnType>(runtime_, actor.ID(), ptr, buffer);
return ActorTaskCaller<ReturnType>(runtime_.get(), actor.ID(), ptr, buffer);
}
// TODO(barakmich): These includes are generated files that do not contain their

View file

@ -24,6 +24,14 @@ class RayConfig {
int node_manager_port = 62665;
std::string lib_name = "";
std::string store_socket = "";
std::string raylet_socket = "";
std::string session_dir = "";
static std::shared_ptr<RayConfig> GetInstance();
private:

View file

@ -36,9 +36,9 @@ class RayRuntime {
virtual WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) = 0;
virtual ObjectID Call(RemoteFunctionPtrHolder &fptr,
virtual ObjectID Call(const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) = 0;
virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
virtual ActorID CreateActor(const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) = 0;
virtual ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
std::shared_ptr<msgpack::sbuffer> args) = 0;

View file

@ -0,0 +1,9 @@
#pragma once
namespace ray {
namespace api {
/// Entry point of the C++ default worker, exposed as a plain function so that a
/// binary with its own `main` can forward its command line to it.
/// \param argc Number of entries in `argv`.
/// \param argv Command-line arguments, passed through unchanged by the caller.
/// \return An int status; NOTE(review): its meaning is not visible from this header.
int default_worker_main(int argc, char **argv);
} // namespace api
} // namespace ray

View file

@ -1,27 +0,0 @@
/// This is a complete example of writing a distributed program using the C++ worker API.
/// including the header
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include <ray/util/logging.h>
/// using namespace
using namespace ray::api;
/// Example entry point: initializes Ray in cluster mode, puts one integer, reads it
/// back, logs the value and shuts Ray down. `argc` and `argv` are not used.
int main(int argc, char **argv) {
RAY_LOG(INFO) << "Start cpp worker example";
/// initialization to cluster mode
ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER;
/// Set redis ip to connect to an existing ray cluster (left disabled in this example).
/// ray::api::RayConfig::GetInstance()->redis_ip = "127.0.0.1";
Ray::Init();
/// put and get object
auto obj = Ray::Put(123);
auto get_result = *(obj.Get());
RAY_LOG(INFO) << "Get result: " << get_result;
Ray::Shutdown();
}

View file

@ -7,7 +7,7 @@
namespace ray {
namespace api {
RayRuntime *Ray::runtime_ = nullptr;
std::shared_ptr<RayRuntime> Ray::runtime_ = nullptr;
std::once_flag Ray::is_inited_;
void Ray::Init() {

View file

@ -14,22 +14,30 @@
namespace ray {
namespace api {
AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr<RayConfig> config) {
AbstractRayRuntime *runtime;
std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::abstract_ray_runtime_ = nullptr;
std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::DoInit(
std::shared_ptr<RayConfig> config) {
std::shared_ptr<AbstractRayRuntime> runtime;
if (config->run_mode == RunMode::SINGLE_PROCESS) {
GenerateBaseAddressOfCurrentLibrary();
runtime = new LocalModeRayRuntime(config);
runtime = std::shared_ptr<AbstractRayRuntime>(new LocalModeRayRuntime(config));
} else {
ProcessHelper::getInstance().RayStart(config);
runtime = new NativeRayRuntime(config);
ProcessHelper::GetInstance().RayStart(config, TaskExecutor::ExecuteTask);
runtime = std::shared_ptr<AbstractRayRuntime>(new NativeRayRuntime(config));
}
runtime->config_ = config;
RAY_CHECK(runtime);
abstract_ray_runtime_ = runtime;
return runtime;
}
/// Returns the runtime held in the static `abstract_ray_runtime_` member, which is
/// nullptr until a runtime has been stored in it.
std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::GetInstance() {
return abstract_ray_runtime_;
}
void AbstractRayRuntime::DoShutdown(std::shared_ptr<RayConfig> config) {
if (config->run_mode == RunMode::CLUSTER) {
ProcessHelper::getInstance().RayStop(config);
ProcessHelper::GetInstance().RayStop(config);
}
}
@ -64,40 +72,41 @@ WaitResult AbstractRayRuntime::Wait(const std::vector<ObjectID> &ids, int num_ob
return object_store_->Wait(ids, num_objects, timeout_ms);
}
ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
InvocationSpec invocationSpec;
// TODO(Guyang Song): make it from different task
invocationSpec.task_id = TaskID::ForFakeTask();
invocationSpec.name = "";
invocationSpec.actor_id = ActorID::Nil();
invocationSpec.args = args;
invocationSpec.func_offset =
(size_t)(fptr.function_pointer - dynamic_library_base_addr);
invocationSpec.exec_func_offset =
(size_t)(fptr.exec_function_pointer - dynamic_library_base_addr);
return task_submitter_->SubmitTask(invocationSpec);
InvocationSpec BuildInvocationSpec(TaskType task_type, std::string lib_name,
const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args,
const ActorID &actor) {
InvocationSpec invocation_spec;
invocation_spec.task_type = task_type;
invocation_spec.task_id =
TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task
invocation_spec.lib_name = lib_name;
invocation_spec.fptr = fptr;
invocation_spec.actor_id = actor;
invocation_spec.args = args;
return invocation_spec;
}
ActorID AbstractRayRuntime::CreateActor(RemoteFunctionPtrHolder &fptr,
ObjectID AbstractRayRuntime::Call(const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
auto invocation_spec = BuildInvocationSpec(
TaskType::NORMAL_TASK, this->config_->lib_name, fptr, args, ActorID::Nil());
return task_submitter_->SubmitTask(invocation_spec);
}
ActorID AbstractRayRuntime::CreateActor(const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
return task_submitter_->CreateActor(fptr, args);
auto invocation_spec = BuildInvocationSpec(
TaskType::ACTOR_CREATION_TASK, this->config_->lib_name, fptr, args, ActorID::Nil());
return task_submitter_->CreateActor(invocation_spec);
}
ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr,
const ActorID &actor,
std::shared_ptr<msgpack::sbuffer> args) {
InvocationSpec invocationSpec;
// TODO(Guyang Song): make it from different task
invocationSpec.task_id = TaskID::ForFakeTask();
invocationSpec.name = "";
invocationSpec.actor_id = actor;
invocationSpec.args = args;
invocationSpec.func_offset =
(size_t)(fptr.function_pointer - dynamic_library_base_addr);
invocationSpec.exec_func_offset =
(size_t)(fptr.exec_function_pointer - dynamic_library_base_addr);
return task_submitter_->SubmitActorTask(invocationSpec);
auto invocation_spec = BuildInvocationSpec(TaskType::ACTOR_TASK,
this->config_->lib_name, fptr, args, actor);
return task_submitter_->SubmitActorTask(invocation_spec);
}
const TaskID &AbstractRayRuntime::GetCurrentTaskId() {

View file

@ -31,9 +31,10 @@ class AbstractRayRuntime : public RayRuntime {
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
ObjectID Call(RemoteFunctionPtrHolder &fptr, std::shared_ptr<msgpack::sbuffer> args);
ObjectID Call(const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
ActorID CreateActor(const RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
@ -47,6 +48,8 @@ class AbstractRayRuntime : public RayRuntime {
const std::unique_ptr<WorkerContext> &GetWorkerContext();
static std::shared_ptr<AbstractRayRuntime> GetInstance();
protected:
std::shared_ptr<RayConfig> config_;
std::unique_ptr<WorkerContext> worker_;
@ -55,7 +58,8 @@ class AbstractRayRuntime : public RayRuntime {
std::unique_ptr<ObjectStore> object_store_;
private:
static AbstractRayRuntime *DoInit(std::shared_ptr<RayConfig> config);
static std::shared_ptr<AbstractRayRuntime> abstract_ray_runtime_;
static std::shared_ptr<AbstractRayRuntime> DoInit(std::shared_ptr<RayConfig> config);
static void DoShutdown(std::shared_ptr<RayConfig> config);

View file

@ -14,7 +14,7 @@ namespace api {
NativeRayRuntime::NativeRayRuntime(std::shared_ptr<RayConfig> config) {
config_ = config;
object_store_ = std::unique_ptr<ObjectStore>(new NativeObjectStore(*this));
task_submitter_ = std::unique_ptr<TaskSubmitter>(new NativeTaskSubmitter(*this));
task_submitter_ = std::unique_ptr<TaskSubmitter>(new NativeTaskSubmitter());
task_executor_ = std::unique_ptr<TaskExecutor>(new TaskExecutor(*this));
}

View file

@ -1,6 +1,7 @@
#pragma once
#include <ray/api/ray_runtime.h>
#include <msgpack.hpp>
#include "ray/core.h"
@ -10,14 +11,13 @@ namespace api {
class InvocationSpec {
public:
TaskType task_type;
TaskID task_id;
std::string name;
ActorID actor_id;
int actor_counter;
/// Remote function offset from base address.
size_t func_offset;
/// Executable function offset from base address.
size_t exec_func_offset;
std::string lib_name;
RemoteFunctionPtrHolder fptr;
std::shared_ptr<msgpack::sbuffer> args;
};
} // namespace api

View file

@ -18,14 +18,21 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter(
thread_pool_.reset(new boost::asio::thread_pool(10));
}
ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) {
ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) {
/// TODO(Guyang Song): Make the information in TaskSpecification more reasonable.
/// We just reuse the TaskSpecification class to make the single process mode work.
/// Some of the information in TaskSpecification may be unreasonable or invalid.
/// We will enhance this after implementing the cluster mode.
if (dynamic_library_base_addr == 0) {
dynamic_library_base_addr =
GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer);
}
auto func_offset =
(size_t)(invocation.fptr.function_pointer - dynamic_library_base_addr);
auto exec_func_offset =
(size_t)(invocation.fptr.exec_function_pointer - dynamic_library_base_addr);
auto functionDescriptor = FunctionDescriptorBuilder::BuildCpp(
"SingleProcess", std::to_string(invocation.func_offset),
std::to_string(invocation.exec_func_offset));
"SingleProcess", std::to_string(func_offset), std::to_string(exec_func_offset));
rpc::Address address;
std::unordered_map<std::string, double> required_resources;
std::unordered_map<std::string, double> required_placement_resources;
@ -38,10 +45,10 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources,
PlacementGroupID::Nil());
if (type == TaskType::NORMAL_TASK) {
} else if (type == TaskType::ACTOR_CREATION_TASK) {
if (invocation.task_type == TaskType::NORMAL_TASK) {
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
builder.SetActorCreationTaskSpec(invocation.actor_id);
} else if (type == TaskType::ACTOR_TASK) {
} else if (invocation.task_type == TaskType::ACTOR_TASK) {
const TaskID actor_creation_task_id =
TaskID::ForActorCreationTask(invocation.actor_id);
const ObjectID actor_creation_dummy_object_id =
@ -63,17 +70,18 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
std::shared_ptr<msgpack::sbuffer> actor;
std::shared_ptr<absl::Mutex> mutex;
if (type == TaskType::ACTOR_TASK) {
if (invocation.task_type == TaskType::ACTOR_TASK) {
absl::MutexLock lock(&actor_contexts_mutex_);
actor = actor_contexts_.at(invocation.actor_id).get()->current_actor;
mutex = actor_contexts_.at(invocation.actor_id).get()->actor_mutex;
}
AbstractRayRuntime *runtime = &local_mode_ray_tuntime_;
if (type == TaskType::ACTOR_CREATION_TASK || type == TaskType::ACTOR_TASK) {
if (invocation.task_type == TaskType::ACTOR_CREATION_TASK ||
invocation.task_type == TaskType::ACTOR_TASK) {
/// TODO(Guyang Song): Handle task dependencies.
/// Execute actor task directly in the main thread because we must guarantee the actor
/// task executed by calling order.
TaskExecutor::Invoke(task_specification, actor, runtime);
TaskExecutor::Invoke(task_specification, actor, runtime, dynamic_library_base_addr);
} else {
boost::asio::post(*thread_pool_.get(),
std::bind(
@ -81,7 +89,8 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
if (mutex) {
absl::MutexLock lock(mutex.get());
}
TaskExecutor::Invoke(ts, actor, runtime);
TaskExecutor::Invoke(ts, actor, runtime,
dynamic_library_base_addr);
},
std::move(task_specification)));
}
@ -89,18 +98,22 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
}
ObjectID LocalModeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::NORMAL_TASK);
return Submit(invocation);
}
ActorID LocalModeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
ActorID LocalModeTaskSubmitter::CreateActor(const InvocationSpec &invocation) {
if (dynamic_library_base_addr == 0) {
dynamic_library_base_addr =
GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer);
}
ActorID id = local_mode_ray_tuntime_.GetNextActorID();
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
ExecFunction exec_function = (ExecFunction)(fptr.exec_function_pointer);
auto data =
(*exec_function)(dynamic_library_base_addr,
(size_t)(fptr.function_pointer - dynamic_library_base_addr), args);
ExecFunction exec_function = (ExecFunction)(invocation.fptr.exec_function_pointer);
auto data = (*exec_function)(
dynamic_library_base_addr,
(size_t)(invocation.fptr.function_pointer - dynamic_library_base_addr),
invocation.args);
std::unique_ptr<ActorContext> actorContext(new ActorContext());
actorContext->current_actor = data;
absl::MutexLock lock(&actor_contexts_mutex_);
@ -109,7 +122,7 @@ ActorID LocalModeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr,
}
ObjectID LocalModeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::ACTOR_TASK);
return Submit(invocation);
}
} // namespace api

View file

@ -20,8 +20,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {
ObjectID SubmitTask(const InvocationSpec &invocation);
ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ActorID CreateActor(const InvocationSpec &invocation);
ObjectID SubmitActorTask(const InvocationSpec &invocation);
@ -34,7 +33,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {
LocalModeRayRuntime &local_mode_ray_tuntime_;
ObjectID Submit(const InvocationSpec &invocation, TaskType type);
ObjectID Submit(const InvocationSpec &invocation);
};
} // namespace api
} // namespace ray

View file

@ -8,24 +8,86 @@
namespace ray {
namespace api {
NativeTaskSubmitter::NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime_)
: native_ray_tuntime_(native_ray_tuntime_) {}
void SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options, std::vector<ObjectID> *return_ids);
ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) {
return ObjectID();
ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation) {
auto base_addr =
GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer);
auto func_offset = (size_t)(invocation.fptr.function_pointer - base_addr);
auto exec_func_offset = (size_t)(invocation.fptr.exec_function_pointer - base_addr);
auto function_descriptor = FunctionDescriptorBuilder::BuildCpp(
invocation.lib_name, std::to_string(func_offset), std::to_string(exec_func_offset));
auto ray_function = RayFunction(Language::CPP, function_descriptor);
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(invocation.args->data()), invocation.args->size(),
true);
std::vector<std::unique_ptr<ray::TaskArg>> args;
auto task_arg = new TaskArgByValue(
std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector<ObjectID>()));
args.emplace_back(task_arg);
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
std::vector<ObjectID> return_ids;
if (invocation.task_type == TaskType::ACTOR_TASK) {
core_worker.SubmitActorTask(invocation.actor_id, ray_function, args, TaskOptions(),
&return_ids);
} else {
core_worker.SubmitTask(ray_function, args, TaskOptions(), &return_ids, 1,
std::make_pair(PlacementGroupID::Nil(), -1));
}
return return_ids[0];
}
ObjectID NativeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::NORMAL_TASK);
return Submit(invocation);
}
ActorID NativeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
return native_ray_tuntime_.GetNextActorID();
ActorID NativeTaskSubmitter::CreateActor(const InvocationSpec &invocation) {
auto base_addr =
GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer);
auto func_offset = (size_t)(invocation.fptr.function_pointer - base_addr);
auto exec_func_offset = (size_t)(invocation.fptr.exec_function_pointer - base_addr);
auto function_descriptor = FunctionDescriptorBuilder::BuildCpp(
invocation.lib_name, std::to_string(func_offset), std::to_string(exec_func_offset));
auto ray_function = RayFunction(Language::CPP, function_descriptor);
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(invocation.args->data()), invocation.args->size(),
true);
std::vector<std::unique_ptr<ray::TaskArg>> args;
auto task_arg = new TaskArgByValue(
std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector<ObjectID>()));
args.emplace_back(task_arg);
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
std::unordered_map<std::string, double> resources;
std::string name = "";
ActorCreationOptions actor_options{0,
0,
1,
resources,
resources,
{},
/*is_detached=*/false,
name,
/*is_asyncio=*/false};
ActorID actor_id;
auto status = core_worker.CreateActor(ray_function, args, actor_options, "", &actor_id);
if (!status.ok()) {
throw RayException("Create actor error");
}
return actor_id;
}
ObjectID NativeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::ACTOR_TASK);
return Submit(invocation);
}
} // namespace api

View file

@ -1,9 +1,5 @@
#pragma once
#include <boost/asio/thread_pool.hpp>
#include <memory>
#include <queue>
#include "../native_ray_runtime.h"
#include "invocation_spec.h"
#include "ray/core.h"
@ -14,19 +10,14 @@ namespace api {
class NativeTaskSubmitter : public TaskSubmitter {
public:
NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime);
ObjectID SubmitTask(const InvocationSpec &invocation);
ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ActorID CreateActor(const InvocationSpec &invocation);
ObjectID SubmitActorTask(const InvocationSpec &invocation);
private:
NativeRayRuntime &native_ray_tuntime_;
ObjectID Submit(const InvocationSpec &invocation, TaskType type);
ObjectID Submit(const InvocationSpec &invocation);
};
} // namespace api
} // namespace ray

View file

@ -4,11 +4,14 @@
#include <memory>
#include "../../util/address_helper.h"
#include "../../util/function_helper.h"
#include "../abstract_ray_runtime.h"
namespace ray {
namespace api {
std::shared_ptr<msgpack::sbuffer> TaskExecutor::current_actor_ = nullptr;
TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
: abstract_ray_tuntime_(abstract_ray_tuntime_) {}
@ -19,9 +22,79 @@ std::unique_ptr<ObjectID> TaskExecutor::Execute(const InvocationSpec &invocation
return std::unique_ptr<ObjectID>(new ObjectID());
};
/// Runs one C++ task on behalf of the core worker.
///
/// The function descriptor carries a library name plus the decimal offsets of the user
/// function and of its exec wrapper relative to that library's base address. The first
/// task argument is copied into a msgpack buffer and handed to the exec wrapper. For
/// every task type except ACTOR_CREATION_TASK the wrapper's output is then copied into
/// the first allocated return object.
///
/// \param task_type Selects the wrapper signature. ACTOR_TASK additionally passes the
/// actor state saved by an earlier ACTOR_CREATION_TASK; ACTOR_CREATION_TASK saves the
/// wrapper's output as that state.
/// \param task_name Not used by this function.
/// \param ray_function Must be a CPP function with a CppFunctionDescriptor.
/// \param required_resources Not used by this function.
/// \param args Task arguments; only `args[0]` is read, so it must be present.
/// \param arg_reference_ids Not used by this function.
/// \param return_ids Forwarded to AllocateReturnObjects.
/// \param results Output vector filled by AllocateReturnObjects.
/// \return Status::OK(); violated preconditions abort through RAY_CHECK.
Status TaskExecutor::ExecuteTask(
    TaskType task_type, const std::string task_name, const RayFunction &ray_function,
    const std::unordered_map<std::string, double> &required_resources,
    const std::vector<std::shared_ptr<RayObject>> &args,
    const std::vector<ObjectID> &arg_reference_ids,
    const std::vector<ObjectID> &return_ids,
    std::vector<std::shared_ptr<RayObject>> *results) {
  RAY_LOG(INFO) << "TaskExecutor::ExecuteTask";
  RAY_CHECK(ray_function.GetLanguage() == Language::CPP);
  auto function_descriptor = ray_function.GetFunctionDescriptor();
  RAY_CHECK(function_descriptor->Type() ==
            ray::FunctionDescriptorType::kCppFunctionDescriptor);
  auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
  const std::string lib_name = typed_descriptor->LibName();

  // args[0] is dereferenced right below; fail the check instead of indexing an empty
  // vector.
  RAY_CHECK(!args.empty());
  auto args_buffer = args[0]->GetData();
  auto args_sbuffer = std::make_shared<msgpack::sbuffer>(args_buffer->Size());
  /// TODO(Guyang Song): Avoid the memory copy.
  args_sbuffer->write(reinterpret_cast<const char *>(args_buffer->Data()),
                      args_buffer->Size());

  const uintptr_t base_addr = FunctionHelper::GetInstance().GetBaseAddress(lib_name);

  const bool is_actor_task = task_type == TaskType::ACTOR_TASK;
  const bool is_actor_creation = task_type == TaskType::ACTOR_CREATION_TASK;
  if (is_actor_task) {
    // An actor task needs the state produced by the actor creation task.
    RAY_CHECK(current_actor_ != nullptr);
  }

  // Both offsets are transported as decimal strings; parse each exactly once.
  const uintptr_t exec_addr =
      base_addr + std::stoul(typed_descriptor->ExecFunctionOffset());
  const size_t func_offset = std::stoul(typed_descriptor->FunctionOffset());

  std::shared_ptr<msgpack::sbuffer> data = nullptr;
  if (is_actor_task) {
    using ActorExecFunction = std::shared_ptr<msgpack::sbuffer> (*)(
        uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args,
        std::shared_ptr<msgpack::sbuffer> object);
    auto exec_function = reinterpret_cast<ActorExecFunction>(exec_addr);
    data = (*exec_function)(base_addr, func_offset, args_sbuffer, current_actor_);
  } else {
    // Normal tasks and actor creation tasks share the three-argument wrapper.
    using ExecFunction = std::shared_ptr<msgpack::sbuffer> (*)(
        uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
    auto exec_function = reinterpret_cast<ExecFunction>(exec_addr);
    data = (*exec_function)(base_addr, func_offset, args_sbuffer);
    if (is_actor_creation) {
      current_actor_ = data;
    }
  }

  std::vector<size_t> data_sizes;
  std::vector<std::shared_ptr<ray::Buffer>> metadatas;
  std::vector<std::vector<ray::ObjectID>> contained_object_ids;
  if (!is_actor_creation) {
    // The wrapper output is dereferenced for its size and contents below.
    RAY_CHECK(data != nullptr);
    metadatas.push_back(nullptr);
    data_sizes.push_back(data->size());
    contained_object_ids.push_back(std::vector<ray::ObjectID>());
  }

  RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObjects(
      return_ids, data_sizes, metadatas, contained_object_ids, results));
  if (!is_actor_creation) {
    const auto &result = (*results)[0];
    if (result != nullptr && result->HasData()) {
      memcpy(result->GetData()->Data(), data->data(), data_sizes[0]);
    }
  }
  return ray::Status::OK();
}
void TaskExecutor::Invoke(const TaskSpecification &task_spec,
std::shared_ptr<msgpack::sbuffer> actor,
AbstractRayRuntime *runtime) {
AbstractRayRuntime *runtime, const uintptr_t base_addr) {
auto args = std::make_shared<msgpack::sbuffer>(task_spec.ArgDataSize(0));
/// TODO(Guyang Song): Avoid the memory copy.
args->write(reinterpret_cast<const char *>(task_spec.ArgData(0)),
@ -33,19 +106,21 @@ void TaskExecutor::Invoke(const TaskSpecification &task_spec,
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args,
std::shared_ptr<msgpack::sbuffer> object);
ExecFunction exec_function = (ExecFunction)(
dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset()));
data = (*exec_function)(dynamic_library_base_addr,
std::stoul(typed_descriptor->FunctionOffset()), args, actor);
unsigned long offset = std::stoul(typed_descriptor->ExecFunctionOffset());
auto address = base_addr + offset;
ExecFunction exec_function = (ExecFunction)(address);
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
args, actor);
} else {
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
ExecFunction exec_function = (ExecFunction)(
dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset()));
data = (*exec_function)(dynamic_library_base_addr,
std::stoul(typed_descriptor->FunctionOffset()), args);
ExecFunction exec_function =
(ExecFunction)(base_addr + std::stoul(typed_descriptor->ExecFunctionOffset()));
data =
(*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), args);
}
runtime->Put(std::move(data), task_spec.ReturnId(0));
}
} // namespace api
} // namespace ray

View file

@ -28,13 +28,22 @@ class TaskExecutor {
std::unique_ptr<ObjectID> Execute(const InvocationSpec &invocation);
static void Invoke(const TaskSpecification &task_spec,
std::shared_ptr<msgpack::sbuffer> actor,
AbstractRayRuntime *runtime);
std::shared_ptr<msgpack::sbuffer> actor, AbstractRayRuntime *runtime,
const uintptr_t base_addr);
static Status ExecuteTask(
TaskType task_type, const std::string task_name, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids,
std::vector<std::shared_ptr<RayObject>> *results);
virtual ~TaskExecutor(){};
private:
AbstractRayRuntime &abstract_ray_tuntime_;
static std::shared_ptr<msgpack::sbuffer> current_actor_;
};
} // namespace api
} // namespace ray

View file

@ -17,8 +17,7 @@ class TaskSubmitter {
virtual ObjectID SubmitTask(const InvocationSpec &invocation) = 0;
virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) = 0;
virtual ActorID CreateActor(const InvocationSpec &invocation) = 0;
virtual ObjectID SubmitActorTask(const InvocationSpec &invocation) = 0;
};

View file

@ -2,14 +2,65 @@
#include <cstring>
#include <gtest/gtest.h>
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include <ray/experimental/default_worker.h>
using namespace ray::api;
TEST(RayClusterModeTest, PutTest) {
/// plain free function from user code; the test submits it as a remote task
int Plus1(int x) {
  const int incremented = x + 1;
  return incremented;
}
/// user-defined class; the test creates it as an actor and calls `Add` on it
class Counter {
 public:
  /// running total
  int count;

  Counter(int init) : count(init) {}

  /// factory returning a heap-allocated Counter that starts at `init`
  static Counter *FactoryCreate(int init) { return new Counter(init); }

  /// non static function: adds `x` to the running total and returns the new total
  int Add(int x) { return count += x; }
};
TEST(RayClusterModeTest, FullTest) {
/// initialization to cluster mode
ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER;
/// TODO(Guyang Song): add the dynamic library name
ray::api::RayConfig::GetInstance()->lib_name = "";
Ray::Init();
auto obj1 = Ray::Put(12345);
auto i1 = obj1.Get();
EXPECT_EQ(12345, *i1);
/// put and get object
auto obj = Ray::Put(12345);
auto get_result = *(Ray::Get(obj));
EXPECT_EQ(12345, get_result);
auto task_obj = Ray::Task(Plus1, 5).Remote();
int task_result = *(Ray::Get(task_obj));
EXPECT_EQ(6, task_result);
ActorHandle<Counter> actor = Ray::Actor(Counter::FactoryCreate, 1).Remote();
auto actor_object = actor.Task(&Counter::Add, 5).Remote();
int actor_task_result = *(Ray::Get(actor_object));
EXPECT_EQ(6, actor_task_result);
Ray::Shutdown();
}
/// TODO(Guyang Song): Separate default worker from this test.
/// Currently, we compile `default_worker` and `cluster_mode_test` in one single binary,
/// to work around a symbol conflicting issue.
/// This is the main function of the binary, and we use the `is_default_worker` arg to
/// tell if this binary is used as `default_worker` or `cluster_mode_test`.
int main(int argc, char **argv) {
  const char *default_worker_magic = "is_default_worker";
  /// `is_default_worker` is the last arg of `argv`.
  /// Only the leading strlen(magic) characters are compared, i.e. this is a prefix
  /// match. strncmp stops at the terminating NUL of the argument, so a last argument
  /// shorter than the magic string is never read past its end.
  if (argc > 1 &&
      strncmp(argv[argc - 1], default_worker_magic, strlen(default_worker_magic)) == 0) {
    /// Run as the default worker; the worker's return value is not propagated.
    default_worker_main(argc, argv);
    return 0;
  }
  /// Otherwise behave as the regular gtest binary.
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

View file

@ -4,13 +4,13 @@
namespace ray {
namespace api {
uintptr_t dynamic_library_base_addr;
extern "C" void GenerateBaseAddressOfCurrentLibrary() {
uintptr_t GetBaseAddressOfLibraryFromAddr(void *addr) {
Dl_info info;
dladdr((void *)GenerateBaseAddressOfCurrentLibrary, &info);
dynamic_library_base_addr = (uintptr_t)info.dli_fbase;
return;
dladdr(addr, &info);
return (uintptr_t)info.dli_fbase;
}
uintptr_t dynamic_library_base_addr =
GetBaseAddressOfLibraryFromAddr((void *)GetBaseAddressOfLibraryFromAddr);
} // namespace api
} // namespace ray

View file

@ -8,7 +8,7 @@ namespace api {
/// A base address which is used to calculate function offset
extern uintptr_t dynamic_library_base_addr;
/// A fixed C language function which help to get infomation from dladdr
extern "C" void GenerateBaseAddressOfCurrentLibrary();
/// Get the base address of the library which the function address belongs to.
uintptr_t GetBaseAddressOfLibraryFromAddr(void *addr);
} // namespace api
} // namespace ray

View file

@ -0,0 +1,45 @@
#include "function_helper.h"
#include <dlfcn.h>
#include <stdio.h>
#include <string.h>
#include <memory>
#include "address_helper.h"
#include "ray/core.h"
namespace ray {
namespace api {
/// NOTE: Nothing in this file reads this variable. It is kept because it has external
/// linkage and may be referenced from another translation unit.
uintptr_t base_addr = 0;

/// Get the base address at which a library opened with `dlopen` is mapped.
///
/// \param handle The handle returned by `dlopen`.
/// \return The base address of the library, or 0 if it cannot be determined.
/// Currently always 0 because the lookup is not implemented yet.
static uintptr_t BaseAddressForHandle(void *handle) {
  /// TODO(Guyang Song): Implement a cross-platform function.
  /// Not Implemented. Report 0 rather than -1: converted to `uintptr_t`, -1 becomes the
  /// largest representable value and would pass the caller's validity check.
  (void)handle;
  return 0;
}

/// Resolve the base address to use for `lib_name`.
///
/// If `dynamic_library_base_addr` is already non-zero it is returned directly and the
/// library is not opened. Otherwise the library is opened with `dlopen`, its base
/// address is looked up, recorded in `loaded_library_` and returned.
///
/// \param lib_name The path or name of the library, passed to `dlopen`.
/// \return The base address. The process aborts through `RAY_CHECK` if the library
/// cannot be opened or its base address cannot be determined.
uintptr_t FunctionHelper::LoadLibrary(std::string lib_name) {
  if (dynamic_library_base_addr != 0) {
    /// Base address has been generated.
    return dynamic_library_base_addr;
  }
  /// Generate base address from library.
  RAY_LOG(INFO) << "Start load library " << lib_name;
  void *handle = dlopen(lib_name.c_str(), RTLD_LAZY);
  /// `dlopen` returns a null handle on failure; `dlerror` describes that failure.
  RAY_CHECK(handle != nullptr) << "Failed to load library " << lib_name << ": "
                               << dlerror();
  const uintptr_t library_base_addr = BaseAddressForHandle(handle);
  RAY_CHECK(library_base_addr != 0);
  RAY_LOG(INFO) << "Loaded library " << lib_name << " to base address "
                << library_base_addr;
  loaded_library_.emplace(lib_name, library_base_addr);
  return library_base_addr;
}
/// Return the base address recorded for `lib_name`.
///
/// Consults `loaded_library_` first and only falls back to `LoadLibrary` when no entry
/// for the name exists yet.
///
/// \param lib_name The library name used as the lookup key.
/// \return The recorded base address, or the result of `LoadLibrary(lib_name)`.
uintptr_t FunctionHelper::GetBaseAddress(std::string lib_name) {
  const auto entry = loaded_library_.find(lib_name);
  const bool already_loaded = entry != loaded_library_.end();
  return already_loaded ? entry->second : LoadLibrary(lib_name);
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,22 @@
#pragma once
#include <string>
#include <unordered_map>
namespace ray {
namespace api {
/// Keeps track of the base addresses of dynamic libraries, keyed by library name.
/// Accessed through the single instance returned by `GetInstance()`.
class FunctionHelper {
 public:
  /// Return the shared instance, which is constructed on the first call.
  static FunctionHelper &GetInstance() {
    static FunctionHelper instance;
    return instance;
  }

  /// Return the base address recorded for `lib_name`, loading the library first if no
  /// address has been recorded for it yet.
  uintptr_t GetBaseAddress(std::string lib_name);

 private:
  /// Determine the base address for `lib_name` and record it in `loaded_library_`.
  uintptr_t LoadLibrary(std::string lib_name);

  /// Base addresses recorded so far, keyed by library name.
  std::unordered_map<std::string, uintptr_t> loaded_library_;
};
} // namespace api
} // namespace ray

View file

@ -23,10 +23,11 @@ static std::string GetSessionDir(std::string redis_ip, int port, std::string pas
return session_dir;
}
static void StartRayNode(int redis_port, std::string redis_password) {
std::vector<std::string> cmdargs({"ray", "start", "--head", "--redis-port",
std::to_string(redis_port), "--redis-password",
redis_password});
static void StartRayNode(int redis_port, std::string redis_password,
int node_manager_port) {
std::vector<std::string> cmdargs(
{"ray", "start", "--head", "--port", std::to_string(redis_port), "--redis-password",
redis_password, "--node-manager-port", std::to_string(node_manager_port)});
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
sleep(5);
@ -37,18 +38,28 @@ static void StopRayNode() {
std::vector<std::string> cmdargs({"ray", "stop"});
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
usleep(1000 * 1000);
sleep(3);
return;
}
void ProcessHelper::RayStart(std::shared_ptr<RayConfig> config) {
void ProcessHelper::RayStart(std::shared_ptr<RayConfig> config,
CoreWorkerOptions::TaskExecutionCallback callback) {
std::string redis_ip = config->redis_ip;
if (redis_ip.empty()) {
if (config->worker_type == WorkerType::DRIVER && redis_ip.empty()) {
redis_ip = "127.0.0.1";
StartRayNode(config->redis_port, config->redis_password);
StartRayNode(config->redis_port, config->redis_password, config->node_manager_port);
}
auto session_dir = GetSessionDir(redis_ip, config->redis_port, config->redis_password);
auto session_dir =
config->session_dir.empty()
? GetSessionDir(redis_ip, config->redis_port, config->redis_password)
: config->session_dir;
auto store_socket = config->store_socket.empty() ? session_dir + "/sockets/plasma_store"
: config->store_socket;
auto raylet_socket = config->raylet_socket.empty() ? session_dir + "/sockets/raylet"
: config->raylet_socket;
gcs::GcsClientOptions gcs_options =
gcs::GcsClientOptions(redis_ip, config->redis_port, config->redis_password);
@ -56,11 +67,12 @@ void ProcessHelper::RayStart(std::shared_ptr<RayConfig> config) {
CoreWorkerOptions options;
options.worker_type = config->worker_type;
options.language = Language::CPP;
options.store_socket = session_dir + "/sockets/plasma_store";
options.raylet_socket = session_dir + "/sockets/raylet";
options.store_socket = store_socket;
options.raylet_socket = raylet_socket;
options.job_id = JobID::FromInt(1);
options.gcs_options = gcs_options;
options.enable_logging = true;
options.log_dir = session_dir + "/logs";
options.install_failure_signal_handler = true;
options.node_ip_address = "127.0.0.1";
options.node_manager_port = config->node_manager_port;
@ -69,6 +81,7 @@ void ProcessHelper::RayStart(std::shared_ptr<RayConfig> config) {
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
options.task_execution_callback = callback;
CoreWorkerProcess::Initialize(options);
}

View file

@ -8,10 +8,11 @@ namespace api {
class ProcessHelper {
public:
void RayStart(std::shared_ptr<RayConfig> config);
void RayStart(std::shared_ptr<RayConfig> config,
CoreWorkerOptions::TaskExecutionCallback callback);
void RayStop(std::shared_ptr<RayConfig> config);
static ProcessHelper &getInstance() {
static ProcessHelper &GetInstance() {
static ProcessHelper processHelper;
return processHelper;
}

View file

@ -1,71 +1,34 @@
#define BOOST_BIND_NO_PLACEHOLDERS
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
using namespace std::placeholders;
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include <ray/util/logging.h>
namespace ray {
namespace api {
/// Wraps a C++ worker process: the constructor initializes `CoreWorkerProcess` with
/// worker options and `RunTaskExecutionLoop` enters the task execution loop.
class DefaultWorker {
public:
/// Fill in `CoreWorkerOptions` for a C++ worker and initialize the core worker process.
///
/// \param store_socket Stored in `options.store_socket`.
/// \param raylet_socket Stored in `options.raylet_socket`.
/// \param node_manager_port Stored in `options.node_manager_port`.
/// \param gcs_options Stored in `options.gcs_options`.
/// \param session_dir Session directory; logs are written to `<session_dir>/logs`.
DefaultWorker(const std::string &store_socket, const std::string &raylet_socket,
int node_manager_port, const gcs::GcsClientOptions &gcs_options,
const std::string &session_dir) {
CoreWorkerOptions options;
options.worker_type = WorkerType::WORKER;
options.language = Language::CPP;
options.store_socket = store_socket;
options.raylet_socket = raylet_socket;
/// The job id is hard-coded to 1.
options.job_id = JobID::FromInt(1);
options.gcs_options = gcs_options;
options.enable_logging = true;
options.log_dir = session_dir + "/logs";
options.install_failure_signal_handler = true;
/// Node and raylet addresses are hard-coded to the loopback address.
options.node_ip_address = "127.0.0.1";
options.node_manager_port = node_manager_port;
options.raylet_ip_address = "127.0.0.1";
/// The callback is bound to `this` and forwards its eight arguments to `ExecuteTask`.
/// NOTE(review): presumably this object must outlive the core worker process, since
/// the bound callback keeps the raw `this` pointer -- confirm.
options.task_execution_callback =
std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8);
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
CoreWorkerProcess::Initialize(options);
}
/// Delegate to `CoreWorkerProcess::RunTaskExecutionLoop()`.
void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); }
private:
/// Task execution callback registered with the core worker.
///
/// None of the arguments are used yet.
///
/// \return Always `Status::TypeError("Task executor not implemented")`.
Status ExecuteTask(TaskType task_type, const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids,
std::vector<std::shared_ptr<RayObject>> *results) {
/// TODO(Guyang Song): Make task execution work.
return Status::TypeError("Task executor not implemented");
}
};
} // namespace api
} // namespace ray
int main(int argc, char **argv) {
int default_worker_main(int argc, char **argv) {
RAY_LOG(INFO) << "CPP default worker started";
RAY_CHECK(argc == 8);
RAY_CHECK(argc == 6);
auto store_socket = std::string(argv[1]);
auto raylet_socket = std::string(argv[2]);
auto node_manager_port = std::stoi(std::string(argv[3]));
auto redis_password = std::string(std::string(argv[4]));
auto session_dir = std::string(std::string(argv[5]));
auto config = ray::api::RayConfig::GetInstance();
config->run_mode = RunMode::CLUSTER;
config->worker_type = ray::WorkerType::WORKER;
config->store_socket = std::string(argv[1]);
config->raylet_socket = std::string(argv[2]);
config->node_manager_port = std::stoi(std::string(argv[3]));
std::string redis_address = std::string(std::string(argv[4]));
auto pos = redis_address.find(':');
RAY_CHECK(pos != std::string::npos);
config->redis_ip = redis_address.substr(0, pos);
config->redis_port = std::stoi(redis_address.substr(pos + 1, redis_address.length()));
config->redis_password = std::string(std::string(argv[5]));
config->session_dir = std::string(std::string(argv[6]));
/// TODO(Guyang Song): Delete this hard code and get address from redis.
ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, redis_password);
ray::api::DefaultWorker worker(store_socket, raylet_socket, node_manager_port,
gcs_options, session_dir);
worker.RunTaskExecutionLoop();
Ray::Init();
::ray::CoreWorkerProcess::RunTaskExecutionLoop();
return 0;
}
} // namespace api
} // namespace ray

View file

@ -1484,9 +1484,14 @@ def build_cpp_worker_command(
Returns:
The command string for starting CPP worker.
"""
# TODO(Guyang Song): Remove the arg is_default_worker.
# See `cluster_mode_test.cc` for why this workaround is currently needed
# for C++ workers.
command = [
DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name,
str(node_manager_port), redis_password, session_dir
str(node_manager_port), redis_address, redis_password, session_dir,
"is_default_worker"
]
return command