support dynamic library loading in C++ worker (#13734)

This commit is contained in:
SongGuyang 2021-02-01 19:24:33 +08:00 committed by GitHub
parent 1d2ab018b0
commit 361e5f0bef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 144 additions and 243 deletions

View file

@ -45,6 +45,6 @@ done
pushd "$ROOT_DIR"/../..
BAZEL_FILES=(bazel/BUILD bazel/ray.bzl BUILD.bazel java/BUILD.bazel \
cpp/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE)
cpp/BUILD.bazel cpp/example/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE)
buildifier -mode=$RUN_TYPE -diff_command="diff -u" "${BAZEL_FILES[@]}"
popd

View file

@ -188,6 +188,9 @@ test_cpp() {
bazel build --config=ci //cpp:all
# shellcheck disable=SC2046
bazel test --config=ci $(./scripts/bazel_export_options) //cpp:all --build_tests_only
# run the cpp example
bazel run //cpp/example:example
}
test_wheels() {

View file

@ -21,7 +21,6 @@ cc_library(
"src/ray/util/*.h",
"src/ray/*.cc",
"src/ray/*.h",
"src/ray/worker/default_worker.cc",
]),
hdrs = glob([
"include/ray/*.h",
@ -45,18 +44,36 @@ cc_library(
)
cc_binary(
name = "example",
testonly = 1,
name = "default_worker",
srcs = glob([
"src/example/example.cc",
"src/ray/worker/default_worker.cc",
]),
copts = COPTS,
linkstatic = False,
linkstatic = True,
deps = [
"ray_api",
],
)
# Stages the C++ worker artifacts: copies the `default_worker` binary and the
# `ray_api` outputs into python/ray/core/src/ray/cpp/ beneath the directory the
# command runs in, then writes that directory's path to ray_cpp_pkg.out.
genrule(
name = "ray_cpp_pkg",
srcs = [
"default_worker",
"ray_api",
],
outs = ["ray_cpp_pkg.out"],
# `$$` is Bazel's escape for a literal shell `$`; `$(location ...)`,
# `$(locations ...)` and `$@` are substituted by Bazel before the shell runs.
# `ray_api` uses the plural form because it is copied as a set of files.
cmd = """
WORK_DIR="$$(pwd)" &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(location default_worker) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" &&
cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
echo "$$WORK_DIR" > $@
""",
# NOTE(review): presumably run with the local strategy so the copies land in
# the real working tree rather than a sandbox -- confirm before changing.
local = 1,
# Public so that packages outside //cpp can depend on the staged artifacts.
visibility = ["//visibility:public"],
)
# test
cc_test(
name = "api_test",
srcs = glob([
@ -76,27 +93,32 @@ cc_test(
srcs = glob([
"src/ray/test/cluster/*.cc",
]),
args = [
"$(location cluster_mode_test.so)",
],
copts = COPTS,
data = [
"cluster_mode_test.so",
"ray_cpp_pkg",
],
linkstatic = True,
deps = [
"ray_api",
"@com_github_gflags_gflags//:gflags",
"@com_google_googletest//:gtest_main",
],
)
genrule(
name = "ray_cpp_pkg",
srcs = [
"cluster_mode_test",
cc_binary(
name = "cluster_mode_test.so",
srcs = glob([
"src/ray/test/cluster/*.cc",
]),
copts = COPTS,
linkstatic = True,
deps = [
"ray_api",
"@com_github_gflags_gflags//:gflags",
"@com_google_googletest//:gtest_main",
],
outs = ["ray_cpp_pkg.out"],
cmd = """
WORK_DIR="$$(pwd)" &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(location cluster_mode_test) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" &&
cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
echo "$$WORK_DIR" > $@
""",
local = 1,
)

View file

@ -1,74 +0,0 @@
# Bazel development build for C++ API.
# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html
load("//bazel:ray.bzl", "COPTS")
# C++ worker API library for the development build: compiles the API, app,
# runtime and util sources together with the default worker entry point
# (src/ray/worker/default_worker.cc) and exports the headers under include/ray.
cc_library(
name = "ray_api",
srcs = glob([
"src/ray/api.cc",
"src/ray/api/*.cc",
"src/ray/api/*.h",
"src/ray/app/*.cc",
"src/ray/app/*.h",
"src/ray/runtime/*.cc",
"src/ray/runtime/*.h",
"src/ray/runtime/**/*.cc",
"src/ray/runtime/**/*.h",
"src/ray/runtime/task/*.cc",
"src/ray/runtime/task/*.h",
"src/ray/util/*.cc",
"src/ray/util/*.h",
"src/ray/*.cc",
"src/ray/*.h",
"src/ray/worker/default_worker.cc",
]),
hdrs = glob([
"include/ray/*.h",
"include/ray/**/*.h",
"include/ray/**/**/*.h",
]),
copts = COPTS,
# Link against libdl.
linkopts = ["-ldl"],
linkstatic = True,
# Lets dependents write `#include <ray/...>` without the leading "include/".
strip_include_prefix = "include",
visibility = ["//visibility:public"],
deps = [
"//:core_worker_lib",
"//:ray_common",
"//:ray_util",
"@boost//:asio",
"@boost//:thread",
"@com_google_absl//absl/synchronization",
"@msgpack",
],
)
# Example program built from every .cc file under src/ray/example/ and linked
# against the `ray_api` library defined in this file.
cc_binary(
name = "example",
srcs = glob([
"src/ray/example/*.cc",
]),
copts = COPTS,
linkstatic = True,
deps = [
"ray_api",
],
)
# Stages the development build: copies the `example` binary to
# python/ray/core/src/ray/cpp/default_worker (i.e. the example binary is
# installed under the default worker's file name) together with the `ray_api`
# outputs, then writes the working directory's path to ray_cpp_pkg.out.
genrule(
name = "ray_cpp_pkg",
srcs = [
"example",
"ray_api",
],
outs = ["ray_cpp_pkg.out"],
# `$$` is Bazel's escape for a literal shell `$`; `$(location ...)`,
# `$(locations ...)` and `$@` are substituted by Bazel before the shell runs.
cmd = """
WORK_DIR="$$(pwd)" &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(location example) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" &&
cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
echo "$$WORK_DIR" > $@
""",
# NOTE(review): presumably run with the local strategy so the copies land in
# the real working tree rather than a sandbox -- confirm before changing.
local = 1,
)

37
cpp/example/BUILD.bazel Normal file
View file

@ -0,0 +1,37 @@
# Bazel development build for C++ API.
# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html
load("//bazel:ray.bzl", "COPTS")
# Example driver program built from every .cc file in this package.
# `bazel run` passes it the location of the sibling `example.so` target through
# --dynamic-library-path; both that target and the staged C++ worker package
# (//cpp:ray_cpp_pkg) are listed as runtime data.
cc_binary(
name = "example",
srcs = glob([
"*.cc",
]),
# A single string holding both the flag and its value.
args = [
"--dynamic-library-path $(location example.so)",
],
copts = COPTS,
data = [
"example.so",
"//cpp:ray_cpp_pkg",
],
linkstatic = True,
deps = [
"//cpp:ray_api",
"@com_github_gflags_gflags//:gflags",
],
)
# Second target built from the same sources as `example`, output under the
# name example.so. The `example` target passes this file's location as its
# --dynamic-library-path argument.
# NOTE(review): `linkshared` is not set on this rule -- confirm that the
# produced file is loadable as a dynamic library on all supported platforms.
cc_binary(
name = "example.so",
srcs = glob([
"*.cc",
]),
copts = COPTS,
linkstatic = True,
deps = [
"//cpp:ray_api",
"@com_github_gflags_gflags//:gflags",
],
)

View file

@ -1,8 +1,12 @@
/// This is a complete example of writing a distributed program using the C++ worker API.
/// including the header
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include <ray/experimental/default_worker.h>
#include "gflags/gflags.h"
/// using namespace
using namespace ::ray::api;
/// general function of user code
@ -32,22 +36,25 @@ class Counter {
}
};
DEFINE_string(redis_address, "", "The ip address of redis server.");
DEFINE_string(dynamic_library_path, "", "The local path of the dynamic library.");
int main(int argc, char **argv) {
/// Currently, we compile `default_worker` and `example` in one single binary,
/// to work around a symbol conflicting issue.
/// This is the main function of the binary, and we use the `is_default_worker` arg to
/// tell if this binary is used as `default_worker` or `example`.
const char *default_worker_magic = "is_default_worker";
/// `is_default_worker` is the last arg of `argv`
if (argc > 1 &&
memcmp(argv[argc - 1], default_worker_magic, strlen(default_worker_magic)) == 0) {
default_worker_main(argc, argv);
return 0;
/// configuration
gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string dynamic_library_path = FLAGS_dynamic_library_path;
const std::string redis_address = FLAGS_redis_address;
gflags::ShutDownCommandLineFlags();
RAY_CHECK(!dynamic_library_path.empty())
<< "Please add a local dynamic library by '--dynamic-library-path'";
ray::api::RayConfig::GetInstance()->lib_name = dynamic_library_path;
if (!redis_address.empty()) {
ray::api::RayConfig::GetInstance()->SetRedisAddress(redis_address);
}
/// initialization to cluster mode
ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER;
/// Dynamic library loading is not supported yet.
ray::api::RayConfig::GetInstance()->lib_name = "";
::ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER;
/// initialization
Ray::Init();
/// put and get object
@ -86,7 +93,6 @@ int main(int argc, char **argv) {
/// general function remote callargs passed by value
auto r0 = Ray::Task(Return1).Remote();
auto r2 = Ray::Task(Plus, 3, 22).Remote();
int task_result3 = *(Ray::Get(r2));
std::cout << "task_result3 = " << task_result3 << std::endl;
@ -95,7 +101,6 @@ int main(int argc, char **argv) {
auto r4 = Ray::Task(Plus1, r3).Remote();
auto r5 = Ray::Task(Plus, r4, r3).Remote();
auto r6 = Ray::Task(Plus, r4, 10).Remote();
int task_result4 = *(Ray::Get(r6));
int task_result5 = *(Ray::Get(r5));
std::cout << "task_result4 = " << task_result4 << ", task_result5 = " << task_result5
@ -104,31 +109,30 @@ int main(int argc, char **argv) {
/// create actor and actor function remote call with args passed by value
ActorHandle<Counter> actor4 = Ray::Actor(Counter::FactoryCreate, 10).Remote();
auto r10 = actor4.Task(&Counter::Add, 8).Remote();
int actor_result4 = *(Ray::Get(r10));
std::cout << "actor_result4 = " << actor_result4 << std::endl;
/// create actor and task function remote call with args passed by reference
ActorHandle<Counter> actor5 = Ray::Actor(Counter::FactoryCreate, r10, 0).Remote();
auto r11 = actor5.Task(&Counter::Add, r0).Remote();
auto r12 = actor5.Task(&Counter::Add, r11).Remote();
auto r13 = actor5.Task(&Counter::Add, r10).Remote();
auto r14 = actor5.Task(&Counter::Add, r13).Remote();
auto r15 = Ray::Task(Plus, r0, r11).Remote();
auto r16 = Ray::Task(Plus1, r15).Remote();
int result12 = *(Ray::Get(r12));
int result14 = *(Ray::Get(r14));
int result11 = *(Ray::Get(r11));
int result13 = *(Ray::Get(r13));
int result16 = *(Ray::Get(r16));
int result15 = *(Ray::Get(r15));
std::cout << "Final result:" << std::endl;
std::cout << "result11 = " << result11 << ", result12 = " << result12
<< ", result13 = " << result13 << ", result14 = " << result14
<< ", result15 = " << result15 << ", result16 = " << result16 << std::endl;
/// shutdown
Ray::Shutdown();
return 0;
}

View file

@ -34,6 +34,13 @@ class RayConfig {
static std::shared_ptr<RayConfig> GetInstance();
/// Split a Redis address of the form `ip:port` and store the two parts in
/// `redis_ip` and `redis_port`.
///
/// The text before the first ':' becomes `redis_ip`; the text after it is
/// parsed with `std::stoi` and becomes `redis_port`.
///
/// \param address Redis address, e.g. `127.0.0.1:6379`. A RAY_CHECK fails when
/// the address contains no ':' or when nothing follows the ':'.
void SetRedisAddress(const std::string &address) {
  const auto separator = address.find(':');
  RAY_CHECK(separator != std::string::npos);
  // Reject "host:" up front: std::stoi("") would otherwise throw
  // std::invalid_argument with no hint about which setting was malformed.
  RAY_CHECK(separator + 1 < address.length())
      << "Redis address '" << address << "' has no port after ':'";
  redis_ip = address.substr(0, separator);
  redis_port = std::stoi(address.substr(separator + 1));
}
private:
static std::shared_ptr<RayConfig> config_;
};

View file

@ -1,9 +0,0 @@
#pragma once
namespace ray {
namespace api {
/// Entry point of the C++ default worker, declared here so that other
/// translation units can invoke it from their own `main`.
///
/// \param argc Number of entries in `argv`.
/// \param argv Command-line arguments forwarded from the process `main`.
/// \return An `int` result; its meaning is defined by the implementation.
int default_worker_main(int argc, char **argv);
} // namespace api
} // namespace ray

View file

@ -1,76 +0,0 @@
/// This is a complete example of writing a distributed program using the C++ worker API.
/// including the header
#include <ray/api.h>
/// using namespace
using namespace ::ray::api;
/// general function of user code
/// User function taking no arguments; always yields the constant 1.
int Return1() {
  const int one = 1;
  return one;
}
/// User function returning the successor of `x`.
int Plus1(int x) {
  const int incremented = 1 + x;
  return incremented;
}
/// User function returning the sum of its two arguments.
int Plus(int x, int y) {
  const int sum = y + x;
  return sum;
}
/// a class of user code
/// User-defined class used as an actor in this example: a running total
/// that calls to `Add` accumulate into.
class Counter {
 public:
  /// Current accumulated value.
  int count;

  /// Start the total at zero.
  Counter() : count(0) {}

  /// Factory passed to `Ray::Actor`; returns a heap-allocated Counter.
  static Counter *FactoryCreate() {
    Counter *instance = new Counter();
    return instance;
  }

  /// Add `x` to the running total and return the updated total.
  int Add(int x) {
    const int updated = count + x;
    count = updated;
    return updated;
  }
};
/// Example driver: initializes Ray, stores and fetches one object, then
/// submits plain-function tasks and actor-method tasks and prints their
/// results to standard output. The function contains no call to
/// `Ray::Shutdown` and no explicit return statement.
int main() {
/// initialization
Ray::Init();
/// put and get object
auto obj = Ray::Put(123);
/// the fetched value is not used again in this function
auto get_result = obj.Get();
/// remote calls on general functions, with args passed by value
auto r0 = Ray::Task(Return1).Remote();
auto r1 = Ray::Task(Plus1, 1).Remote();
auto r2 = Ray::Task(Plus, 1, 2).Remote();
int result0 = *(r0.Get());
int result1 = *(r1.Get());
int result2 = *(r2.Get());
std::cout << "Ray::call with value results: " << result0 << " " << result1 << " "
<< result2 << std::endl;
/// remote calls on general functions, with the results of earlier remote
/// calls (r3, r4) passed as args
auto r3 = Ray::Task(Return1).Remote();
auto r4 = Ray::Task(Plus1, r3).Remote();
auto r5 = Ray::Task(Plus, r4, 1).Remote();
int result3 = *(r3.Get());
int result4 = *(r4.Get());
int result5 = *(r5.Get());
std::cout << "Ray::call with reference results: " << result3 << " " << result4 << " "
<< result5 << std::endl;
/// create an actor from Counter::FactoryCreate and call its Add method remotely;
/// r9 passes r8, the result of an earlier call on the same actor, as its arg
ActorHandle<Counter> actor = Ray::Actor(Counter::FactoryCreate).Remote();
auto r6 = actor.Task(&Counter::Add, 5).Remote();
auto r7 = actor.Task(&Counter::Add, 1).Remote();
auto r8 = actor.Task(&Counter::Add, 1).Remote();
auto r9 = actor.Task(&Counter::Add, r8).Remote();
int result6 = *(r6.Get());
int result7 = *(r7.Get());
int result8 = *(r8.Get());
int result9 = *(r9.Get());
std::cout << "Ray::call with actor results: " << result6 << " " << result7 << " "
<< result8 << " " << result9 << std::endl;
}

View file

@ -29,7 +29,7 @@ Status TaskExecutor::ExecuteTask(
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results) {
RAY_LOG(INFO) << "TaskExecutor::ExecuteTask";
RAY_LOG(INFO) << "Execute task: " << TaskType_Name(task_type);
RAY_CHECK(ray_function.GetLanguage() == Language::CPP);
auto function_descriptor = ray_function.GetFunctionDescriptor();
RAY_CHECK(function_descriptor->Type() ==

View file

@ -2,7 +2,6 @@
#include <gtest/gtest.h>
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include <ray/experimental/default_worker.h>
using namespace ::ray::api;
@ -33,11 +32,16 @@ class Counter {
}
};
std::string lib_name = "";
std::string redis_ip = "";
TEST(RayClusterModeTest, FullTest) {
/// initialization to cluster mode
ray::api::RayConfig::GetInstance()->run_mode = RunMode::CLUSTER;
/// TODO(Guyang Song): add the dynamic library name
ray::api::RayConfig::GetInstance()->lib_name = "";
ray::api::RayConfig::GetInstance()->lib_name = lib_name;
ray::api::RayConfig::GetInstance()->redis_ip = redis_ip;
Ray::Init();
/// put and get object
@ -144,18 +148,11 @@ TEST(RayClusterModeTest, FullTest) {
Ray::Shutdown();
}
/// TODO(Guyang Song): Separate default worker from this test.
/// Currently, we compile `default_worker` and `cluster_mode_test` in one single binary,
/// to work around a symbol conflicting issue.
/// This is the main function of the binary, and we use the `is_default_worker` arg to
/// tell if this binary is used as `default_worker` or `cluster_mode_test`.
int main(int argc, char **argv) {
const char *default_worker_magic = "is_default_worker";
/// `is_default_worker` is the last arg of `argv`
if (argc > 1 &&
memcmp(argv[argc - 1], default_worker_magic, strlen(default_worker_magic)) == 0) {
default_worker_main(argc, argv);
return 0;
RAY_CHECK(argc == 2 || argc == 3);
lib_name = std::string(argv[1]);
if (argc == 3) {
redis_ip = std::string(argv[2]);
}
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();

View file

@ -14,19 +14,14 @@ uintptr_t base_addr = 0;
static const uintptr_t BaseAddressForHandle(void *handle) {
/// TODO(Guyang Song): Implement a cross-platform function.
/// Not Implemented.
return -1;
return (uintptr_t)((NULL == handle) ? NULL : (void *)*(size_t const *)(handle));
}
uintptr_t FunctionHelper::LoadLibrary(std::string lib_name) {
if (dynamic_library_base_addr != 0) {
/// Base address has been generated.
return dynamic_library_base_addr;
}
/// Generate base address from library.
RAY_LOG(INFO) << "Start load library " << lib_name;
void *example = dlopen(lib_name.c_str(), RTLD_LAZY);
uintptr_t base_addr = BaseAddressForHandle(example);
void *handle = dlopen(lib_name.c_str(), RTLD_LAZY);
uintptr_t base_addr = BaseAddressForHandle(handle);
RAY_CHECK(base_addr > 0);
RAY_LOG(INFO) << "Loaded library " << lib_name << " to base address " << base_addr;
loaded_library_.emplace(lib_name, base_addr);

View file

@ -3,14 +3,11 @@
#include <ray/api/ray_config.h>
#include <ray/util/logging.h>
using namespace ::ray;
namespace ray {
namespace api {
using namespace ::ray::api;
int default_worker_main(int argc, char **argv) {
RAY_LOG(INFO) << "CPP default worker started";
RAY_CHECK(argc == 8);
RAY_CHECK(argc == 7);
auto config = ray::api::RayConfig::GetInstance();
config->run_mode = RunMode::CLUSTER;
@ -19,10 +16,7 @@ int default_worker_main(int argc, char **argv) {
config->raylet_socket = std::string(argv[2]);
config->node_manager_port = std::stoi(std::string(argv[3]));
std::string redis_address = std::string(std::string(argv[4]));
auto pos = redis_address.find(':');
RAY_CHECK(pos != std::string::npos);
config->redis_ip = redis_address.substr(0, pos);
config->redis_port = std::stoi(redis_address.substr(pos + 1, redis_address.length()));
config->SetRedisAddress(redis_address);
config->redis_password = std::string(std::string(argv[5]));
config->session_dir = std::string(std::string(argv[6]));
@ -32,5 +26,7 @@ int default_worker_main(int argc, char **argv) {
return 0;
}
} // namespace api
} // namespace ray
/// Process entry point of the standalone C++ default worker binary.
///
/// Forwards the command-line arguments unchanged to `default_worker_main` and
/// uses its return value as the process exit status, so that a non-zero result
/// from the worker is not silently replaced by 0.
///
/// \param argc Number of entries in `argv`.
/// \param argv Command-line arguments supplied by whoever launched the worker.
/// \return The value returned by `default_worker_main`.
int main(int argc, char **argv) {
  return default_worker_main(argc, argv);
}

View file

@ -117,14 +117,17 @@ Ray provides Python, Java, and *EXPERIMENTAL* C++ API. And Ray uses Tasks (funct
| The C++ Ray API is currently experimental with limited support. You can track its development `here <https://github.com/ray-project/ray/milestone/17>`__ and report issues on GitHub.
| Run the following commands to get started:
| - Build ray from source with *bazel* as shown `here <https://docs.ray.io/en/master/development.html#building-ray-full>`__.
| - Run `"cd ray/cpp"`.
| - Run `"cp dev_BUILD.bazel BUILD.bazel"`.
| - Modify `src/ray/example/example.cc`.
| - Modify `cpp/example/example.cc`.
| - Run `"bazel build //cpp:example"`.
| Option 1: run the example directly with a dynamic library path. It will start a Ray cluster automatically.
| - Run `"ray stop"`.
| - Run `"bazel build //cpp:all"`.
| - Run `"bazel run //cpp:example"`.
| - Run `"./bazel-bin/cpp/example/example --dynamic-library-path=bazel-bin/cpp/example/example.so"`
| Option 2: connect to an existing Ray cluster with a known redis address (e.g. `127.0.0.1:6379`).
| - Run `"ray stop"`.
| - Run `"ray start --head --port 6379 --redis-password 5241590000000000 --node-manager-port 62665"`.
| - Run `"./bazel-bin/cpp/example/example --dynamic-library-path=bazel-bin/cpp/example/example.so --redis-address=127.0.0.1:6379"`.
.. literalinclude:: ../../cpp/src/ray/example/example.cc
.. literalinclude:: ../../cpp/example/example.cc
:language: cpp
You can also get started by visiting our `Tutorials <https://github.com/ray-project/tutorial>`_. For the latest wheels (nightlies), see the `installation page <installation.html>`__.

View file

@ -1580,13 +1580,9 @@ def build_cpp_worker_command(
The command string for starting CPP worker.
"""
# TODO(Guyang Song): Remove the arg is_default_worker.
# See `cluster_mode_test.cc` for why this workaround is currently needed
# for C++ workers.
command = [
DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name,
str(node_manager_port), redis_address, redis_password, session_dir,
"is_default_worker"
str(node_manager_port), redis_address, redis_password, session_dir
]
return command