From 5cbc411e387469dbafd9a8046337ad87ae31bf26 Mon Sep 17 00:00:00 2001 From: SongGuyang Date: Fri, 18 Sep 2020 11:08:18 +0800 Subject: [PATCH] [cpp worker] support cluster mode (#9977) --- cpp/BUILD.bazel | 65 +++++-------- cpp/include/ray/api.h | 8 +- cpp/include/ray/api/ray_config.h | 8 ++ cpp/include/ray/api/ray_runtime.h | 4 +- cpp/include/ray/experimental/default_worker.h | 9 ++ cpp/src/example/example_cluster_mode.cc | 27 ------ cpp/src/ray/api.cc | 2 +- cpp/src/ray/runtime/abstract_ray_runtime.cc | 75 ++++++++------- cpp/src/ray/runtime/abstract_ray_runtime.h | 10 +- cpp/src/ray/runtime/native_ray_runtime.cc | 2 +- cpp/src/ray/runtime/task/invocation_spec.h | 8 +- .../runtime/task/local_mode_task_submitter.cc | 49 ++++++---- .../runtime/task/local_mode_task_submitter.h | 5 +- .../ray/runtime/task/native_task_submitter.cc | 80 ++++++++++++++-- .../ray/runtime/task/native_task_submitter.h | 13 +-- cpp/src/ray/runtime/task/task_executor.cc | 93 +++++++++++++++++-- cpp/src/ray/runtime/task/task_executor.h | 13 ++- cpp/src/ray/runtime/task/task_submitter.h | 3 +- cpp/src/ray/test/cluster/cluster_mode_test.cc | 59 +++++++++++- cpp/src/ray/util/address_helper.cc | 12 +-- cpp/src/ray/util/address_helper.h | 4 +- cpp/src/ray/util/function_helper.cc | 45 +++++++++ cpp/src/ray/util/function_helper.h | 22 +++++ cpp/src/ray/util/process_helper.cc | 35 ++++--- cpp/src/ray/util/process_helper.h | 5 +- cpp/src/ray/worker/default_worker.cc | 85 +++++------------ python/ray/services.py | 7 +- 27 files changed, 488 insertions(+), 260 deletions(-) create mode 100644 cpp/include/ray/experimental/default_worker.h delete mode 100644 cpp/src/example/example_cluster_mode.cc create mode 100644 cpp/src/ray/util/function_helper.cc create mode 100644 cpp/src/ray/util/function_helper.h diff --git a/cpp/BUILD.bazel b/cpp/BUILD.bazel index ed8dfd626..af82486a0 100644 --- a/cpp/BUILD.bazel +++ b/cpp/BUILD.bazel @@ -21,6 +21,7 @@ cc_library( "src/ray/util/*.h", "src/ray/*.cc", "src/ray/*.h", + "src/ray/worker/default_worker.cc", ]), hdrs = glob([ "include/ray/*.h", @@ -29,8 +30,8 @@ cc_library( ]), copts = COPTS, linkopts = ["-ldl"], + linkstatic = True, strip_include_prefix = "include", - # linkstatic = False, visibility = ["//visibility:public"], deps = [ "//:core_worker_lib", @@ -43,35 +44,6 @@ cc_library( ], ) -cc_binary( - name = "default_worker", - srcs = [ - "src/ray/worker/default_worker.cc", - ], - copts = COPTS, - linkstatic = True, - deps = [ - "//:core_worker_lib", - ], -) - -genrule( - name = "ray_cpp_pkg", - srcs = [ - "default_worker", - "ray_api", - ], - outs = ["ray_cpp_pkg.out"], - cmd = """ - WORK_DIR="$$(pwd)" && - mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - cp -f $(location default_worker) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && - echo "$$WORK_DIR" > $@ - """, - local = 1, -) - cc_binary( name = "example", testonly = 1, @@ -85,19 +57,6 @@ cc_binary( ], ) -cc_binary( - name = "example_cluster_mode", - testonly = 1, - srcs = glob([ - "src/example/example_cluster_mode.cc", - ]), - copts = COPTS, - linkstatic = False, - deps = [ - "ray_api", - ], -) - cc_test( name = "api_test", srcs = glob([ @@ -113,13 +72,31 @@ cc_test( cc_test( name = "cluster_mode_test", + testonly = 0, srcs = glob([ "src/ray/test/cluster/*.cc", ]), copts = COPTS, - linkstatic = False, + linkstatic = True, deps = [ "ray_api", "@com_google_googletest//:gtest_main", ], ) + +genrule( + name = "ray_cpp_pkg", + srcs = [ + "cluster_mode_test", + "ray_api", + ], + outs = ["ray_cpp_pkg.out"], + cmd = """ + WORK_DIR="$$(pwd)" && + mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + cp -f $(location cluster_mode_test) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" && + cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" && + echo "$$WORK_DIR" > $@ + """, + local = 1, +) diff --git a/cpp/include/ray/api.h b/cpp/include/ray/api.h index 53f8026e3..caf86558f 100644 --- a/cpp/include/ray/api.h +++ b/cpp/include/ray/api.h @@ -85,7 +85,7 @@ class Ray { #include "api/generated/create_actors.generated.h" private: - static RayRuntime *runtime_; + static std::shared_ptr runtime_; static std::once_flag is_inited_; @@ -203,7 +203,7 @@ inline TaskCaller Ray::TaskInternal(FuncType &func, ExecFuncType &ex RemoteFunctionPtrHolder ptr; ptr.function_pointer = reinterpret_cast(func); ptr.exec_function_pointer = reinterpret_cast(exec_func); - return TaskCaller(runtime_, ptr, buffer); + return TaskCaller(runtime_.get(), ptr, buffer); } template Ray::CreateActorInternal(FuncType &create_func, RemoteFunctionPtrHolder ptr; ptr.function_pointer = reinterpret_cast(create_func); ptr.exec_function_pointer = reinterpret_cast(exec_func); - return ActorCreator(runtime_, ptr, buffer); + return ActorCreator(runtime_.get(), ptr, buffer); } template Ray::CallActorInternal(FuncType &actor_func, MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func); ptr.function_pointer = reinterpret_cast(holder.value[0]); ptr.exec_function_pointer = reinterpret_cast(exec_func); - return ActorTaskCaller(runtime_, actor.ID(), ptr, buffer); + return ActorTaskCaller(runtime_.get(), actor.ID(), ptr, buffer); } // TODO(barakmich): These includes are generated files that do not contain their diff --git a/cpp/include/ray/api/ray_config.h b/cpp/include/ray/api/ray_config.h index f693e99fc..b6bc55d5d 100644 --- a/cpp/include/ray/api/ray_config.h +++ b/cpp/include/ray/api/ray_config.h @@ -24,6 +24,14 @@ class RayConfig { int node_manager_port = 62665; + std::string lib_name = ""; + + std::string store_socket = ""; + + std::string raylet_socket = ""; + + std::string session_dir = ""; + static std::shared_ptr GetInstance(); private: diff --git a/cpp/include/ray/api/ray_runtime.h b/cpp/include/ray/api/ray_runtime.h index ff39c5b42..45d06242c 100644 --- a/cpp/include/ray/api/ray_runtime.h +++ b/cpp/include/ray/api/ray_runtime.h @@ -36,9 +36,9 @@ class RayRuntime { virtual WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms) = 0; - virtual ObjectID Call(RemoteFunctionPtrHolder &fptr, + virtual ObjectID Call(const RemoteFunctionPtrHolder &fptr, std::shared_ptr args) = 0; - virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + virtual ActorID CreateActor(const RemoteFunctionPtrHolder &fptr, std::shared_ptr args) = 0; virtual ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, std::shared_ptr args) = 0; diff --git a/cpp/include/ray/experimental/default_worker.h b/cpp/include/ray/experimental/default_worker.h new file mode 100644 index 000000000..2c0e02259 --- /dev/null +++ b/cpp/include/ray/experimental/default_worker.h @@ -0,0 +1,9 @@ +#pragma once + +namespace ray { +namespace api { + +int default_worker_main(int argc, char **argv); + +} // namespace api +} // namespace ray diff --git a/cpp/src/example/example_cluster_mode.cc b/cpp/src/example/example_cluster_mode.cc deleted file mode 100644 index ca178fc8b..000000000 --- a/cpp/src/example/example_cluster_mode.cc +++ /dev/null @@ -1,27 +0,0 @@ - -/// This is a complete example of writing a distributed program using the C ++ worker API. - -/// including the header -#include -#include -#include - -/// using namespace -using namespace ray::api; - -int main(int argc, char **argv) { - RAY_LOG(INFO) << "Start cpp worker example"; - - /// initialization to cluster mode - ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; - /// Set redis ip to connect an existing ray cluster. - /// ray::api::RayConfig::GetInstance()->redis_ip = "127.0.0.1"; - Ray::Init(); - - /// put and get object - auto obj = Ray::Put(123); - auto get_result = *(obj.Get()); - - RAY_LOG(INFO) << "Get result: " << get_result; - Ray::Shutdown(); -} diff --git a/cpp/src/ray/api.cc b/cpp/src/ray/api.cc index 60eb75819..5bd603875 100644 --- a/cpp/src/ray/api.cc +++ b/cpp/src/ray/api.cc @@ -7,7 +7,7 @@ namespace ray { namespace api { -RayRuntime *Ray::runtime_ = nullptr; +std::shared_ptr Ray::runtime_ = nullptr; std::once_flag Ray::is_inited_; void Ray::Init() { diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index d2016863c..d5a102470 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -14,22 +14,30 @@ namespace ray { namespace api { -AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr config) { - AbstractRayRuntime *runtime; +std::shared_ptr AbstractRayRuntime::abstract_ray_runtime_ = nullptr; + +std::shared_ptr AbstractRayRuntime::DoInit( + std::shared_ptr config) { + std::shared_ptr runtime; if (config->run_mode == RunMode::SINGLE_PROCESS) { - GenerateBaseAddressOfCurrentLibrary(); - runtime = new LocalModeRayRuntime(config); + runtime = std::shared_ptr(new LocalModeRayRuntime(config)); } else { - ProcessHelper::getInstance().RayStart(config); - runtime = new NativeRayRuntime(config); + ProcessHelper::GetInstance().RayStart(config, TaskExecutor::ExecuteTask); + runtime = std::shared_ptr(new NativeRayRuntime(config)); } + runtime->config_ = config; RAY_CHECK(runtime); + abstract_ray_runtime_ = runtime; return runtime; } +std::shared_ptr AbstractRayRuntime::GetInstance() { + return abstract_ray_runtime_; +} + void AbstractRayRuntime::DoShutdown(std::shared_ptr config) { if (config->run_mode == RunMode::CLUSTER) { - ProcessHelper::getInstance().RayStop(config); + ProcessHelper::GetInstance().RayStop(config); } } @@ -64,40 +72,41 @@ WaitResult AbstractRayRuntime::Wait(const std::vector &ids, int num_ob return object_store_->Wait(ids, num_objects, timeout_ms); } -ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) { - InvocationSpec invocationSpec; - // TODO(Guyang Song): make it from different task - invocationSpec.task_id = TaskID::ForFakeTask(); - invocationSpec.name = ""; - invocationSpec.actor_id = ActorID::Nil(); - invocationSpec.args = args; - invocationSpec.func_offset = - (size_t)(fptr.function_pointer - dynamic_library_base_addr); - invocationSpec.exec_func_offset = - (size_t)(fptr.exec_function_pointer - dynamic_library_base_addr); - return task_submitter_->SubmitTask(invocationSpec); +InvocationSpec BuildInvocationSpec(TaskType task_type, std::string lib_name, + const RemoteFunctionPtrHolder &fptr, + std::shared_ptr args, + const ActorID &actor) { + InvocationSpec invocation_spec; + invocation_spec.task_type = task_type; + invocation_spec.task_id = + TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + invocation_spec.lib_name = lib_name; + invocation_spec.fptr = fptr; + invocation_spec.actor_id = actor; + invocation_spec.args = args; + return invocation_spec; } -ActorID AbstractRayRuntime::CreateActor(RemoteFunctionPtrHolder &fptr, +ObjectID AbstractRayRuntime::Call(const RemoteFunctionPtrHolder &fptr, + std::shared_ptr args) { + auto invocation_spec = BuildInvocationSpec( + TaskType::NORMAL_TASK, this->config_->lib_name, fptr, args, ActorID::Nil()); + return task_submitter_->SubmitTask(invocation_spec); +} + +ActorID AbstractRayRuntime::CreateActor(const RemoteFunctionPtrHolder &fptr, std::shared_ptr args) { - return task_submitter_->CreateActor(fptr, args); + auto invocation_spec = BuildInvocationSpec( + TaskType::ACTOR_CREATION_TASK, this->config_->lib_name, fptr, args, ActorID::Nil()); + return task_submitter_->CreateActor(invocation_spec); } ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, std::shared_ptr args) { - InvocationSpec invocationSpec; - // TODO(Guyang Song): make it from different task - invocationSpec.task_id = TaskID::ForFakeTask(); - invocationSpec.name = ""; - invocationSpec.actor_id = actor; - invocationSpec.args = args; - invocationSpec.func_offset = - (size_t)(fptr.function_pointer - dynamic_library_base_addr); - invocationSpec.exec_func_offset = - (size_t)(fptr.exec_function_pointer - dynamic_library_base_addr); - return task_submitter_->SubmitActorTask(invocationSpec); + auto invocation_spec = BuildInvocationSpec(TaskType::ACTOR_TASK, + this->config_->lib_name, fptr, args, actor); + return task_submitter_->SubmitActorTask(invocation_spec); } const TaskID &AbstractRayRuntime::GetCurrentTaskId() { diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.h b/cpp/src/ray/runtime/abstract_ray_runtime.h index 00535e9fa..9f41fd6f3 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.h +++ b/cpp/src/ray/runtime/abstract_ray_runtime.h @@ -31,9 +31,10 @@ class AbstractRayRuntime : public RayRuntime { WaitResult Wait(const std::vector &ids, int num_objects, int timeout_ms); - ObjectID Call(RemoteFunctionPtrHolder &fptr, std::shared_ptr args); + ObjectID Call(const RemoteFunctionPtrHolder &fptr, + std::shared_ptr args); - ActorID CreateActor(RemoteFunctionPtrHolder &fptr, + ActorID CreateActor(const RemoteFunctionPtrHolder &fptr, std::shared_ptr args); ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor, @@ -47,6 +48,8 @@ class AbstractRayRuntime : public RayRuntime { const std::unique_ptr &GetWorkerContext(); + static std::shared_ptr GetInstance(); + protected: std::shared_ptr config_; std::unique_ptr worker_; @@ -55,7 +58,8 @@ class AbstractRayRuntime : public RayRuntime { std::unique_ptr object_store_; private: - static AbstractRayRuntime *DoInit(std::shared_ptr config); + static std::shared_ptr abstract_ray_runtime_; + static std::shared_ptr DoInit(std::shared_ptr config); static void DoShutdown(std::shared_ptr config); diff --git a/cpp/src/ray/runtime/native_ray_runtime.cc b/cpp/src/ray/runtime/native_ray_runtime.cc index 04bb17e0d..478e8c611 100644 --- a/cpp/src/ray/runtime/native_ray_runtime.cc +++ b/cpp/src/ray/runtime/native_ray_runtime.cc @@ -14,7 +14,7 @@ namespace api { NativeRayRuntime::NativeRayRuntime(std::shared_ptr config) { config_ = config; object_store_ = std::unique_ptr(new NativeObjectStore(*this)); - task_submitter_ = std::unique_ptr(new NativeTaskSubmitter(*this)); + task_submitter_ = std::unique_ptr(new NativeTaskSubmitter()); task_executor_ = std::unique_ptr(new TaskExecutor(*this)); } diff --git a/cpp/src/ray/runtime/task/invocation_spec.h b/cpp/src/ray/runtime/task/invocation_spec.h index b436ef5fc..07f840b31 100644 --- a/cpp/src/ray/runtime/task/invocation_spec.h +++ b/cpp/src/ray/runtime/task/invocation_spec.h @@ -1,6 +1,7 @@ #pragma once +#include #include #include "ray/core.h" @@ -10,14 +11,13 @@ namespace api { class InvocationSpec { public: + TaskType task_type; TaskID task_id; std::string name; ActorID actor_id; int actor_counter; - /// Remote function offset from base address. - size_t func_offset; - /// Executable function offset from base address. - size_t exec_func_offset; + std::string lib_name; + RemoteFunctionPtrHolder fptr; std::shared_ptr args; }; } // namespace api diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 8f2f28f2d..82896a5af 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -18,14 +18,21 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter( thread_pool_.reset(new boost::asio::thread_pool(10)); } -ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) { +ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) { /// TODO(Guyang Song): Make the infomation of TaskSpecification more reasonable /// We just reuse the TaskSpecification class and make the single process mode work. /// Maybe some infomation of TaskSpecification are not reasonable or invalid. /// We will enhance this after implement the cluster mode. + if (dynamic_library_base_addr == 0) { + dynamic_library_base_addr = + GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer); + } + auto func_offset = + (size_t)(invocation.fptr.function_pointer - dynamic_library_base_addr); + auto exec_func_offset = + (size_t)(invocation.fptr.exec_function_pointer - dynamic_library_base_addr); auto functionDescriptor = FunctionDescriptorBuilder::BuildCpp( - "SingleProcess", std::to_string(invocation.func_offset), - std::to_string(invocation.exec_func_offset)); + "SingleProcess", std::to_string(func_offset), std::to_string(exec_func_offset)); rpc::Address address; std::unordered_map required_resources; std::unordered_map required_placement_resources; @@ -38,10 +45,10 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, required_resources, required_placement_resources, PlacementGroupID::Nil()); - if (type == TaskType::NORMAL_TASK) { - } else if (type == TaskType::ACTOR_CREATION_TASK) { + if (invocation.task_type == TaskType::NORMAL_TASK) { + } else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) { builder.SetActorCreationTaskSpec(invocation.actor_id); - } else if (type == TaskType::ACTOR_TASK) { + } else if (invocation.task_type == TaskType::ACTOR_TASK) { const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(invocation.actor_id); const ObjectID actor_creation_dummy_object_id = @@ -63,17 +70,18 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy std::shared_ptr actor; std::shared_ptr mutex; - if (type == TaskType::ACTOR_TASK) { + if (invocation.task_type == TaskType::ACTOR_TASK) { absl::MutexLock lock(&actor_contexts_mutex_); actor = actor_contexts_.at(invocation.actor_id).get()->current_actor; mutex = actor_contexts_.at(invocation.actor_id).get()->actor_mutex; } AbstractRayRuntime *runtime = &local_mode_ray_tuntime_; - if (type == TaskType::ACTOR_CREATION_TASK || type == TaskType::ACTOR_TASK) { + if (invocation.task_type == TaskType::ACTOR_CREATION_TASK || + invocation.task_type == TaskType::ACTOR_TASK) { /// TODO(Guyang Song): Handle task dependencies. /// Execute actor task directly in the main thread because we must guarantee the actor /// task executed by calling order. - TaskExecutor::Invoke(task_specification, actor, runtime); + TaskExecutor::Invoke(task_specification, actor, runtime, dynamic_library_base_addr); } else { boost::asio::post(*thread_pool_.get(), std::bind( @@ -81,7 +89,8 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy if (mutex) { absl::MutexLock lock(mutex.get()); } - TaskExecutor::Invoke(ts, actor, runtime); + TaskExecutor::Invoke(ts, actor, runtime, + dynamic_library_base_addr); }, std::move(task_specification))); } @@ -89,18 +98,22 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy } ObjectID LocalModeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) { - return Submit(invocation, TaskType::NORMAL_TASK); + return Submit(invocation); } -ActorID LocalModeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) { +ActorID LocalModeTaskSubmitter::CreateActor(const InvocationSpec &invocation) { + if (dynamic_library_base_addr == 0) { + dynamic_library_base_addr = + GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer); + } ActorID id = local_mode_ray_tuntime_.GetNextActorID(); typedef std::shared_ptr (*ExecFunction)( uintptr_t base_addr, size_t func_offset, std::shared_ptr args); - ExecFunction exec_function = (ExecFunction)(fptr.exec_function_pointer); - auto data = - (*exec_function)(dynamic_library_base_addr, - (size_t)(fptr.function_pointer - dynamic_library_base_addr), args); + ExecFunction exec_function = (ExecFunction)(invocation.fptr.exec_function_pointer); + auto data = (*exec_function)( + dynamic_library_base_addr, + (size_t)(invocation.fptr.function_pointer - dynamic_library_base_addr), + invocation.args); std::unique_ptr actorContext(new ActorContext()); actorContext->current_actor = data; absl::MutexLock lock(&actor_contexts_mutex_); @@ -109,7 +122,7 @@ ActorID LocalModeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr, } ObjectID LocalModeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) { - return Submit(invocation, TaskType::ACTOR_TASK); + return Submit(invocation); } } // namespace api diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.h b/cpp/src/ray/runtime/task/local_mode_task_submitter.h index e2470df62..22ad1f39d 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.h +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.h @@ -20,8 +20,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter { ObjectID SubmitTask(const InvocationSpec &invocation); - ActorID CreateActor(RemoteFunctionPtrHolder &fptr, - std::shared_ptr args); + ActorID CreateActor(const InvocationSpec &invocation); ObjectID SubmitActorTask(const InvocationSpec &invocation); @@ -34,7 +33,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter { LocalModeRayRuntime &local_mode_ray_tuntime_; - ObjectID Submit(const InvocationSpec &invocation, TaskType type); + ObjectID Submit(const InvocationSpec &invocation); }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/native_task_submitter.cc b/cpp/src/ray/runtime/task/native_task_submitter.cc index a162035b3..abb82f55a 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.cc +++ b/cpp/src/ray/runtime/task/native_task_submitter.cc @@ -8,24 +8,86 @@ namespace ray { namespace api { -NativeTaskSubmitter::NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime_) - : native_ray_tuntime_(native_ray_tuntime_) {} +void SubmitActorTask(const ActorID &actor_id, const RayFunction &function, + const std::vector> &args, + const TaskOptions &task_options, std::vector *return_ids); -ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) { - return ObjectID(); +ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation) { + auto base_addr = + GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer); + + auto func_offset = (size_t)(invocation.fptr.function_pointer - base_addr); + auto exec_func_offset = (size_t)(invocation.fptr.exec_function_pointer - base_addr); + auto function_descriptor = FunctionDescriptorBuilder::BuildCpp( + invocation.lib_name, std::to_string(func_offset), std::to_string(exec_func_offset)); + auto ray_function = RayFunction(Language::CPP, function_descriptor); + + auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(invocation.args->data()), invocation.args->size(), + true); + std::vector> args; + auto task_arg = new TaskArgByValue( + std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector())); + args.emplace_back(task_arg); + + auto &core_worker = CoreWorkerProcess::GetCoreWorker(); + std::vector return_ids; + if (invocation.task_type == TaskType::ACTOR_TASK) { + core_worker.SubmitActorTask(invocation.actor_id, ray_function, args, TaskOptions(), + &return_ids); + } else { + core_worker.SubmitTask(ray_function, args, TaskOptions(), &return_ids, 1, + std::make_pair(PlacementGroupID::Nil(), -1)); + } + return return_ids[0]; } ObjectID NativeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) { - return Submit(invocation, TaskType::NORMAL_TASK); + return Submit(invocation); } -ActorID NativeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) { - return native_ray_tuntime_.GetNextActorID(); +ActorID NativeTaskSubmitter::CreateActor(const InvocationSpec &invocation) { + auto base_addr = + GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer); + + auto func_offset = (size_t)(invocation.fptr.function_pointer - base_addr); + auto exec_func_offset = (size_t)(invocation.fptr.exec_function_pointer - base_addr); + auto function_descriptor = FunctionDescriptorBuilder::BuildCpp( + invocation.lib_name, std::to_string(func_offset), std::to_string(exec_func_offset)); + auto ray_function = RayFunction(Language::CPP, function_descriptor); + + auto buffer = std::make_shared<::ray::LocalMemoryBuffer>( + reinterpret_cast(invocation.args->data()), invocation.args->size(), + true); + std::vector> args; + auto task_arg = new TaskArgByValue( + std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector())); + args.emplace_back(task_arg); + + auto &core_worker = CoreWorkerProcess::GetCoreWorker(); + + std::unordered_map resources; + std::string name = ""; + ActorCreationOptions actor_options{0, + 0, + 1, + resources, + resources, + {}, + /*is_detached=*/false, + name, + /*is_asyncio=*/false}; + ActorID actor_id; + auto status = core_worker.CreateActor(ray_function, args, actor_options, "", &actor_id); + if (!status.ok()) { + throw RayException("Create actor error"); + } + + return actor_id; } ObjectID NativeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) { - return Submit(invocation, TaskType::ACTOR_TASK); + return Submit(invocation); } } // namespace api diff --git a/cpp/src/ray/runtime/task/native_task_submitter.h b/cpp/src/ray/runtime/task/native_task_submitter.h index 3812f3d47..064170088 100644 --- a/cpp/src/ray/runtime/task/native_task_submitter.h +++ b/cpp/src/ray/runtime/task/native_task_submitter.h @@ -1,9 +1,5 @@ #pragma once -#include -#include -#include - #include "../native_ray_runtime.h" #include "invocation_spec.h" #include "ray/core.h" @@ -14,19 +10,14 @@ namespace api { class NativeTaskSubmitter : public TaskSubmitter { public: - NativeTaskSubmitter(NativeRayRuntime &native_ray_tuntime); - ObjectID SubmitTask(const InvocationSpec &invocation); - ActorID CreateActor(RemoteFunctionPtrHolder &fptr, - std::shared_ptr args); + ActorID CreateActor(const InvocationSpec &invocation); ObjectID SubmitActorTask(const InvocationSpec &invocation); private: - NativeRayRuntime &native_ray_tuntime_; - - ObjectID Submit(const InvocationSpec &invocation, TaskType type); + ObjectID Submit(const InvocationSpec &invocation); }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index cdbda5a1d..b064eb14c 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -4,11 +4,14 @@ #include #include "../../util/address_helper.h" +#include "../../util/function_helper.h" #include "../abstract_ray_runtime.h" namespace ray { namespace api { +std::shared_ptr TaskExecutor::current_actor_ = nullptr; + TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_) : abstract_ray_tuntime_(abstract_ray_tuntime_) {} @@ -19,9 +22,79 @@ std::unique_ptr TaskExecutor::Execute(const InvocationSpec &invocation return std::unique_ptr(new ObjectID()); }; +Status TaskExecutor::ExecuteTask( + TaskType task_type, const std::string task_name, const RayFunction &ray_function, + const std::unordered_map &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, + std::vector> *results) { + RAY_LOG(INFO) << "TaskExecutor::ExecuteTask"; + RAY_CHECK(ray_function.GetLanguage() == Language::CPP); + auto function_descriptor = ray_function.GetFunctionDescriptor(); + RAY_CHECK(function_descriptor->Type() == + ray::FunctionDescriptorType::kCppFunctionDescriptor); + auto typed_descriptor = function_descriptor->As(); + std::string lib_name = typed_descriptor->LibName(); + std::string func_offset = typed_descriptor->FunctionOffset(); + std::string exec_func_offset = typed_descriptor->ExecFunctionOffset(); + + auto args_buffer = args[0]->GetData(); + auto args_sbuffer = std::make_shared(args_buffer->Size()); + /// TODO(Guyang Song): Avoid the memory copy. + args_sbuffer->write(reinterpret_cast(args_buffer->Data()), + args_buffer->Size()); + auto base_addr = FunctionHelper::GetInstance().GetBaseAddress(lib_name); + + std::shared_ptr data = nullptr; + if (task_type == TaskType::ACTOR_CREATION_TASK) { + typedef std::shared_ptr (*ExecFunction)( + uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset)); + data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), + args_sbuffer); + current_actor_ = data; + } else if (task_type == TaskType::ACTOR_TASK) { + RAY_CHECK(current_actor_ != nullptr); + typedef std::shared_ptr (*ExecFunction)( + uintptr_t base_addr, size_t func_offset, std::shared_ptr args, + std::shared_ptr object); + ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset)); + data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), + args_sbuffer, current_actor_); + } else { // NORMAL_TASK + typedef std::shared_ptr (*ExecFunction)( + uintptr_t base_addr, size_t func_offset, std::shared_ptr args); + ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset)); + data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), + args_sbuffer); + } + + std::vector data_sizes; + std::vector> metadatas; + std::vector> contained_object_ids; + if (task_type != TaskType::ACTOR_CREATION_TASK) { + metadatas.push_back(nullptr); + data_sizes.push_back(data->size()); + contained_object_ids.push_back(std::vector()); + } + + RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObjects( + return_ids, data_sizes, metadatas, contained_object_ids, results)); + if (task_type != TaskType::ACTOR_CREATION_TASK) { + auto result = (*results)[0]; + if (result != nullptr) { + if (result->HasData()) { + memcpy(result->GetData()->Data(), data->data(), data_sizes[0]); + } + } + } + return ray::Status::OK(); +} + void TaskExecutor::Invoke(const TaskSpecification &task_spec, std::shared_ptr actor, - AbstractRayRuntime *runtime) { + AbstractRayRuntime *runtime, const uintptr_t base_addr) { auto args = std::make_shared(task_spec.ArgDataSize(0)); /// TODO(Guyang Song): Avoid the memory copy. args->write(reinterpret_cast(task_spec.ArgData(0)), @@ -33,19 +106,21 @@ void TaskExecutor::Invoke(const TaskSpecification &task_spec, typedef std::shared_ptr (*ExecFunction)( uintptr_t base_addr, size_t func_offset, std::shared_ptr args, std::shared_ptr object); - ExecFunction exec_function = (ExecFunction)( - dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset())); - data = (*exec_function)(dynamic_library_base_addr, - std::stoul(typed_descriptor->FunctionOffset()), args, actor); + unsigned long offset = std::stoul(typed_descriptor->ExecFunctionOffset()); + auto address = base_addr + offset; + ExecFunction exec_function = (ExecFunction)(address); + data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), + args, actor); } else { typedef std::shared_ptr (*ExecFunction)( uintptr_t base_addr, size_t func_offset, std::shared_ptr args); - ExecFunction exec_function = (ExecFunction)( - dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset())); - data = (*exec_function)(dynamic_library_base_addr, - std::stoul(typed_descriptor->FunctionOffset()), args); + ExecFunction exec_function = + (ExecFunction)(base_addr + std::stoul(typed_descriptor->ExecFunctionOffset())); + data = + (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), args); } runtime->Put(std::move(data), task_spec.ReturnId(0)); } + } // namespace api } // namespace ray diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index 40e7acf3c..754ccd20d 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -28,13 +28,22 @@ class TaskExecutor { std::unique_ptr Execute(const InvocationSpec &invocation); static void Invoke(const TaskSpecification &task_spec, - std::shared_ptr actor, - AbstractRayRuntime *runtime); + std::shared_ptr actor, AbstractRayRuntime *runtime, + const uintptr_t base_addr); + + static Status ExecuteTask( + TaskType task_type, const std::string task_name, const RayFunction &ray_function, + const std::unordered_map &required_resources, + const std::vector> &args, + const std::vector &arg_reference_ids, + const std::vector &return_ids, + std::vector> *results); virtual ~TaskExecutor(){}; private: AbstractRayRuntime &abstract_ray_tuntime_; + static std::shared_ptr current_actor_; }; } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/runtime/task/task_submitter.h b/cpp/src/ray/runtime/task/task_submitter.h index 36e6555c3..15e3fae97 100644 --- a/cpp/src/ray/runtime/task/task_submitter.h +++ b/cpp/src/ray/runtime/task/task_submitter.h @@ -17,8 +17,7 @@ class TaskSubmitter { virtual ObjectID SubmitTask(const InvocationSpec &invocation) = 0; - virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr, - std::shared_ptr args) = 0; + virtual ActorID CreateActor(const InvocationSpec &invocation) = 0; virtual ObjectID SubmitActorTask(const InvocationSpec &invocation) = 0; }; diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 98830336a..be7916723 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -2,14 +2,65 @@ #include #include #include +#include using namespace ray::api; -TEST(RayClusterModeTest, PutTest) { +/// general function of user code +int Plus1(int x) { return x + 1; } + +/// a class of user code +class Counter { + public: + int count; + + Counter(int init) { count = init; } + + static Counter *FactoryCreate(int init) { return new Counter(init); } + /// non static function + int Add(int x) { + count += x; + return count; + } +}; + +TEST(RayClusterModeTest, FullTest) { + /// initialization to cluster mode ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER; + /// TODO(Guyang Song): add the dynamic library name + ray::api::RayConfig::GetInstance()->lib_name = ""; Ray::Init(); - auto obj1 = Ray::Put(12345); - auto i1 = obj1.Get(); - EXPECT_EQ(12345, *i1); + + /// put and get object + auto obj = Ray::Put(12345); + auto get_result = *(Ray::Get(obj)); + EXPECT_EQ(12345, get_result); + + auto task_obj = Ray::Task(Plus1, 5).Remote(); + int task_result = *(Ray::Get(task_obj)); + EXPECT_EQ(6, task_result); + + ActorHandle actor = Ray::Actor(Counter::FactoryCreate, 1).Remote(); + auto actor_object = actor.Task(&Counter::Add, 5).Remote(); + int actor_task_result = *(Ray::Get(actor_object)); + EXPECT_EQ(6, actor_task_result); + Ray::Shutdown(); +} + +/// TODO(Guyang Song): Separate default worker from this test. +/// Currently, we compile `default_worker` and `cluster_mode_test` in one single binary, +/// to work around a symbol conflicting issue. +/// This is the main function of the binary, and we use the `is_default_worker` arg to +/// tell if this binary is used as `default_worker` or `cluster_mode_test`. +int main(int argc, char **argv) { + const char *default_worker_magic = "is_default_worker"; + /// `is_default_worker` is the last arg of `argv` + if (argc > 1 && + memcmp(argv[argc - 1], default_worker_magic, strlen(default_worker_magic)) == 0) { + default_worker_main(argc, argv); + return 0; + } + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); } \ No newline at end of file diff --git a/cpp/src/ray/util/address_helper.cc b/cpp/src/ray/util/address_helper.cc index 92560bebe..d676c627b 100644 --- a/cpp/src/ray/util/address_helper.cc +++ b/cpp/src/ray/util/address_helper.cc @@ -4,13 +4,13 @@ namespace ray { namespace api { -uintptr_t dynamic_library_base_addr; - -extern "C" void GenerateBaseAddressOfCurrentLibrary() { +uintptr_t GetBaseAddressOfLibraryFromAddr(void *addr) { Dl_info info; - dladdr((void *)GenerateBaseAddressOfCurrentLibrary, &info); - dynamic_library_base_addr = (uintptr_t)info.dli_fbase; - return; + dladdr(addr, &info); + return (uintptr_t)info.dli_fbase; } + +uintptr_t dynamic_library_base_addr = + GetBaseAddressOfLibraryFromAddr((void *)GetBaseAddressOfLibraryFromAddr); } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/address_helper.h b/cpp/src/ray/util/address_helper.h index 7d41cdf10..dede3f353 100644 --- a/cpp/src/ray/util/address_helper.h +++ b/cpp/src/ray/util/address_helper.h @@ -8,7 +8,7 @@ namespace api { /// A base address which is used to calculate function offset extern uintptr_t dynamic_library_base_addr; -/// A fixed C language function which help to get infomation from dladdr -extern "C" void GenerateBaseAddressOfCurrentLibrary(); +/// Get the base address of libary which the function address belongs to. +uintptr_t GetBaseAddressOfLibraryFromAddr(void *addr); } // namespace api } // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/function_helper.cc b/cpp/src/ray/util/function_helper.cc new file mode 100644 index 000000000..5dfa8a012 --- /dev/null +++ b/cpp/src/ray/util/function_helper.cc @@ -0,0 +1,45 @@ + +#include "function_helper.h" +#include +#include +#include +#include +#include "address_helper.h" +#include "ray/core.h" + +namespace ray { +namespace api { + +uintptr_t base_addr = 0; + +static const uintptr_t BaseAddressForHandle(void *handle) { + /// TODO(Guyang Song): Implement a cross-platform function. + /// Not Implemented. + return -1; +} + +uintptr_t FunctionHelper::LoadLibrary(std::string lib_name) { + if (dynamic_library_base_addr != 0) { + /// Base address has been generated. + return dynamic_library_base_addr; + } + /// Generate base address from library. + RAY_LOG(INFO) << "Start load library " << lib_name; + void *example = dlopen(lib_name.c_str(), RTLD_LAZY); + uintptr_t base_addr = BaseAddressForHandle(example); + RAY_CHECK(base_addr > 0); + RAY_LOG(INFO) << "Loaded library " << lib_name << " to base address " << base_addr; + loaded_library_.emplace(lib_name, base_addr); + return base_addr; +} + +uintptr_t FunctionHelper::GetBaseAddress(std::string lib_name) { + auto got = loaded_library_.find(lib_name); + if (got == loaded_library_.end()) { + return LoadLibrary(lib_name); + } + return got->second; +} + +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/function_helper.h b/cpp/src/ray/util/function_helper.h new file mode 100644 index 000000000..c302f903e --- /dev/null +++ b/cpp/src/ray/util/function_helper.h @@ -0,0 +1,22 @@ +#pragma once +#include +#include + +namespace ray { +namespace api { + +class FunctionHelper { + public: + uintptr_t GetBaseAddress(std::string lib_name); + + static FunctionHelper &GetInstance() { + static FunctionHelper functionHelper; + return functionHelper; + } + + private: + std::unordered_map loaded_library_; + uintptr_t LoadLibrary(std::string lib_name); +}; +} // namespace api +} // namespace ray \ No newline at end of file diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index 737700865..3f7a50044 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -23,10 +23,11 @@ static std::string GetSessionDir(std::string redis_ip, int port, std::string pas return session_dir; } -static void StartRayNode(int redis_port, std::string redis_password) { - std::vector cmdargs({"ray", "start", "--head", "--redis-port", - std::to_string(redis_port), "--redis-password", - redis_password}); +static void StartRayNode(int redis_port, std::string redis_password, + int node_manager_port) { + std::vector cmdargs( + {"ray", "start", "--head", "--port", std::to_string(redis_port), "--redis-password", + redis_password, "--node-manager-port", std::to_string(node_manager_port)}); RAY_LOG(INFO) << CreateCommandLine(cmdargs); RAY_CHECK(!Process::Spawn(cmdargs, true).second); sleep(5); @@ -37,18 +38,28 @@ static void StopRayNode() { std::vector cmdargs({"ray", "stop"}); RAY_LOG(INFO) << CreateCommandLine(cmdargs); RAY_CHECK(!Process::Spawn(cmdargs, true).second); - usleep(1000 * 1000); + sleep(3); return; } -void ProcessHelper::RayStart(std::shared_ptr config) { +void ProcessHelper::RayStart(std::shared_ptr config, + CoreWorkerOptions::TaskExecutionCallback callback) { std::string redis_ip = config->redis_ip; - if (redis_ip.empty()) { + if (config->worker_type == WorkerType::DRIVER && redis_ip.empty()) { redis_ip = "127.0.0.1"; - StartRayNode(config->redis_port, config->redis_password); + StartRayNode(config->redis_port, config->redis_password, config->node_manager_port); } - auto session_dir = GetSessionDir(redis_ip, config->redis_port, config->redis_password); + auto session_dir = + config->session_dir.empty() + ? GetSessionDir(redis_ip, config->redis_port, config->redis_password) + : config->session_dir; + + auto store_socket = config->store_socket.empty() ? session_dir + "/sockets/plasma_store" + : config->store_socket; + + auto raylet_socket = config->raylet_socket.empty() ? session_dir + "/sockets/raylet" + : config->raylet_socket; gcs::GcsClientOptions gcs_options = gcs::GcsClientOptions(redis_ip, config->redis_port, config->redis_password); @@ -56,11 +67,12 @@ void ProcessHelper::RayStart(std::shared_ptr config) { CoreWorkerOptions options; options.worker_type = config->worker_type; options.language = Language::CPP; - options.store_socket = session_dir + "/sockets/plasma_store"; - options.raylet_socket = session_dir + "/sockets/raylet"; + options.store_socket = store_socket; + options.raylet_socket = raylet_socket; options.job_id = JobID::FromInt(1); options.gcs_options = gcs_options; options.enable_logging = true; + options.log_dir = session_dir + "/logs"; options.install_failure_signal_handler = true; options.node_ip_address = "127.0.0.1"; options.node_manager_port = config->node_manager_port; @@ -69,6 +81,7 @@ void ProcessHelper::RayStart(std::shared_ptr config) { options.ref_counting_enabled = true; options.num_workers = 1; options.metrics_agent_port = -1; + options.task_execution_callback = callback; CoreWorkerProcess::Initialize(options); } diff --git a/cpp/src/ray/util/process_helper.h b/cpp/src/ray/util/process_helper.h index 3dcc10ebb..bfddfe194 100644 --- a/cpp/src/ray/util/process_helper.h +++ b/cpp/src/ray/util/process_helper.h @@ -8,10 +8,11 @@ namespace api { class ProcessHelper { public: - void RayStart(std::shared_ptr config); + void RayStart(std::shared_ptr config, + CoreWorkerOptions::TaskExecutionCallback callback); void RayStop(std::shared_ptr config); - static ProcessHelper &getInstance() { + static ProcessHelper &GetInstance() { static ProcessHelper processHelper; return processHelper; } diff --git a/cpp/src/ray/worker/default_worker.cc b/cpp/src/ray/worker/default_worker.cc index 1c3157bd4..f6f483580 100644 --- a/cpp/src/ray/worker/default_worker.cc +++ b/cpp/src/ray/worker/default_worker.cc @@ -1,71 +1,34 @@ -#define BOOST_BIND_NO_PLACEHOLDERS -#include "ray/core_worker/context.h" -#include "ray/core_worker/core_worker.h" -using namespace std::placeholders; +#include +#include +#include namespace ray { - namespace api { -class DefaultWorker { - public: - DefaultWorker(const std::string &store_socket, const std::string &raylet_socket, - int node_manager_port, const gcs::GcsClientOptions &gcs_options, - const std::string &session_dir) { - CoreWorkerOptions options; - options.worker_type = WorkerType::WORKER; - options.language = Language::CPP; - options.store_socket = store_socket; - options.raylet_socket = raylet_socket; - options.job_id = JobID::FromInt(1); - options.gcs_options = gcs_options; - options.enable_logging = true; - options.log_dir = session_dir + "/logs"; - options.install_failure_signal_handler = true; - options.node_ip_address = "127.0.0.1"; - options.node_manager_port = node_manager_port; - options.raylet_ip_address = "127.0.0.1"; - options.task_execution_callback = - std::bind(&DefaultWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8); - options.ref_counting_enabled = true; - options.num_workers = 1; - options.metrics_agent_port = -1; - - CoreWorkerProcess::Initialize(options); - } - - void RunTaskExecutionLoop() { CoreWorkerProcess::RunTaskExecutionLoop(); } - - private: - Status ExecuteTask(TaskType task_type, const std::string task_name, - const RayFunction &ray_function, - const std::unordered_map &required_resources, - const std::vector> &args, - const std::vector &arg_reference_ids, - const std::vector &return_ids, - std::vector> *results) { - /// TODO(Guyang Song): Make task execution worked. - return Status::TypeError("Task executor not implemented"); - } -}; -} // namespace api -} // namespace ray - -int main(int argc, char **argv) { +int default_worker_main(int argc, char **argv) { RAY_LOG(INFO) << "CPP default worker started"; + RAY_CHECK(argc == 8); - RAY_CHECK(argc == 6); - auto store_socket = std::string(argv[1]); - auto raylet_socket = std::string(argv[2]); - auto node_manager_port = std::stoi(std::string(argv[3])); - auto redis_password = std::string(std::string(argv[4])); - auto session_dir = std::string(std::string(argv[5])); + auto config = ray::api::RayConfig::GetInstance(); + config->run_mode = RunMode::CLUSTER; + config->worker_type = ray::WorkerType::WORKER; + config->store_socket = std::string(argv[1]); + config->raylet_socket = std::string(argv[2]); + config->node_manager_port = std::stoi(std::string(argv[3])); + std::string redis_address = std::string(std::string(argv[4])); + auto pos = redis_address.find(':'); + RAY_CHECK(pos != std::string::npos); + config->redis_ip = redis_address.substr(0, pos); + config->redis_port = std::stoi(redis_address.substr(pos + 1, redis_address.length())); + config->redis_password = std::string(std::string(argv[5])); + config->session_dir = std::string(std::string(argv[6])); - /// TODO(Guyang Song): Delete this hard code and get address from redis. - ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, redis_password); - ray::api::DefaultWorker worker(store_socket, raylet_socket, node_manager_port, - gcs_options, session_dir); - worker.RunTaskExecutionLoop(); + Ray::Init(); + + ::ray::CoreWorkerProcess::RunTaskExecutionLoop(); return 0; } + +} // namespace api +} // namespace ray diff --git a/python/ray/services.py b/python/ray/services.py index 9ce6a6e66..c36776ce6 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1484,9 +1484,14 @@ def build_cpp_worker_command( Returns: The command string for starting CPP worker. """ + + # TODO(Guyang Song): Remove the arg is_default_worker. + # See `cluster_mode_test.cc` for why this workaround is currently needed + # for C++ workers. command = [ DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name, - str(node_manager_port), redis_password, session_dir + str(node_manager_port), redis_address, redis_password, session_dir, + "is_default_worker" ] return command