[C++ worker] set native library path for shared library search (#19376)

This commit is contained in:
Guyang Song 2021-10-18 16:03:49 +08:00 committed by GitHub
parent 1047914ee0
commit c04fb62f1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 69 additions and 17 deletions

View file

@ -79,15 +79,12 @@ genrule(
outs = ["ray_cpp_pkg.out"],
cmd = """
WORK_DIR="$$(pwd)" &&
mkdir -p "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(location default_worker) "$$WORK_DIR/python/ray/core/src/ray/cpp/default_worker" &&
cp -f $(locations ray_api) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
cp -f $(locations libray_api.so) "$$WORK_DIR/python/ray/core/src/ray/cpp/" &&
PY_CPP_DIR="$$WORK_DIR/python/ray/cpp" &&
rm -rf $$PY_CPP_DIR &&
BOOST_DIR="$$PY_CPP_DIR/include/boost/" &&
mkdir -p "$$BOOST_DIR" &&
mkdir -p "$$PY_CPP_DIR/lib/" &&
cp -f $(location default_worker) "$$PY_CPP_DIR/" &&
cp -f -r $$WORK_DIR/external/msgpack/include/* "$$PY_CPP_DIR/include" &&
cp -f -r "$$WORK_DIR/external/boost/boost/archive" "$$BOOST_DIR" &&
cp -f -r "$$WORK_DIR/external/boost/boost/assert" "$$BOOST_DIR" &&

View file

@ -4,10 +4,7 @@ ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE:-$0}")" || exit; pwd)"
bazel --nosystem_rc --nohome_rc build //:example
if [[ "$OSTYPE" == "darwin"* ]]; then
# TODO: We use `install_name_tool` to walk around the issue of dynamic libraries loading in macOS.
install_name_tool -change bazel-out/darwin-opt/bin/cpp/libray_api.so ./thirdparty/lib/libray_api.so ./bazel-bin/example.so
install_name_tool -change bazel-out/darwin-opt/bin/cpp/libray_api.so ./thirdparty/lib/libray_api.so ./bazel-bin/example
"${ROOT_DIR}"/bazel-bin/example
DYLD_LIBRARY_PATH="$ROOT_DIR/thirdparty/lib" "${ROOT_DIR}"/bazel-bin/example
else
LD_LIBRARY_PATH="$ROOT_DIR/thirdparty/lib" "${ROOT_DIR}"/bazel-bin/example
fi

View file

@ -52,7 +52,10 @@ GCS_SERVER_EXECUTABLE = os.path.join(
# Location of the cpp default worker executables.
DEFAULT_WORKER_EXECUTABLE = os.path.join(RAY_PATH,
"core/src/ray/cpp/default_worker")
"cpp/default_worker" + EXE_SUFFIX)
# Location of the native libraries.
DEFAULT_NATIVE_LIBRARY_PATH = os.path.join(RAY_PATH, "cpp/lib")
DASHBOARD_DEPENDENCY_ERROR_MESSAGE = (
"Not all Ray Dashboard dependencies were "
@ -1557,6 +1560,7 @@ def start_raylet(redis_address,
f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa
f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}", # noqa
f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}", # noqa
f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}",
f"--redis_password={redis_password or ''}",
f"--temp_dir={temp_dir}",
f"--session_dir={session_dir}",

View file

@ -141,7 +141,7 @@ if BUILD_JAVA or os.path.exists(
ray_files.append("ray/jars/ray_dist.jar")
if setup_spec.type == SetupType.RAY_CPP:
setup_spec.files_to_include += ["ray/core/src/ray/cpp/default_worker"]
setup_spec.files_to_include += ["ray/cpp/default_worker" + exe_suffix]
# C++ API library and project template files.
setup_spec.files_to_include += [
os.path.join(dirpath, filename)

View file

@ -48,6 +48,8 @@ DEFINE_string(python_worker_command, "", "Python worker command.");
DEFINE_string(java_worker_command, "", "Java worker command.");
DEFINE_string(agent_command, "", "Dashboard agent command.");
DEFINE_string(cpp_worker_command, "", "CPP worker command.");
DEFINE_string(native_library_path, "",
"The native library path which includes the core libraries.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_string(temp_dir, "", "Temporary directory.");
DEFINE_string(session_dir, "", "The path of this ray session directory.");
@ -94,6 +96,7 @@ int main(int argc, char *argv[]) {
const std::string java_worker_command = FLAGS_java_worker_command;
const std::string agent_command = FLAGS_agent_command;
const std::string cpp_worker_command = FLAGS_cpp_worker_command;
const std::string native_library_path = FLAGS_native_library_path;
const std::string redis_password = FLAGS_redis_password;
const std::string temp_dir = FLAGS_temp_dir;
const std::string session_dir = FLAGS_session_dir;
@ -187,6 +190,7 @@ int main(int argc, char *argv[]) {
node_manager_config.worker_commands.emplace(
make_pair(ray::Language::CPP, ParseCommandLine(cpp_worker_command)));
}
node_manager_config.native_library_path = native_library_path;
if (python_worker_command.empty() && java_worker_command.empty() &&
cpp_worker_command.empty()) {
RAY_LOG(FATAL) << "At least one of Python/Java/CPP worker command "

View file

@ -182,7 +182,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
config.num_workers_soft_limit, config.num_initial_python_workers_for_first_job,
config.maximum_startup_concurrency, config.min_worker_port,
config.max_worker_port, config.worker_ports, gcs_client_,
config.worker_commands,
config.worker_commands, config.native_library_path,
/*starting_worker_timeout_callback=*/
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
config.ray_debugger_external,

View file

@ -80,6 +80,8 @@ struct NodeManagerConfig {
int maximum_startup_concurrency;
/// The commands used to start the worker process, grouped by language.
WorkerCommandMap worker_commands;
/// The native library path which includes the core libraries.
std::string native_library_path;
/// The command used to start agent.
std::string agent_command;
/// The time between reports resources in milliseconds.

View file

@ -64,6 +64,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
int max_worker_port, const std::vector<int> &worker_ports,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
const std::string &native_library_path,
std::function<void()> starting_worker_timeout_callback,
int ray_debugger_external, const std::function<double()> get_time)
: worker_startup_token_counter_(0),
@ -73,6 +74,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
num_workers_soft_limit_(num_workers_soft_limit),
maximum_startup_concurrency_(maximum_startup_concurrency),
gcs_client_(std::move(gcs_client)),
native_library_path_(native_library_path),
starting_worker_timeout_callback_(starting_worker_timeout_callback),
ray_debugger_external(ray_debugger_external),
first_job_registered_python_worker_count_(0),
@ -229,6 +231,7 @@ Process WorkerPool::StartWorkerProcess(
std::vector<std::string> options;
// Append Ray-defined per-job options here
std::string code_search_path;
if (language == Language::JAVA || language == Language::CPP) {
if (job_config) {
std::string code_search_path_str;
@ -240,6 +243,7 @@ Process WorkerPool::StartWorkerProcess(
code_search_path_str += path;
}
if (!code_search_path_str.empty()) {
code_search_path = code_search_path_str;
if (language == Language::JAVA) {
code_search_path_str = "-Dray.job.code-search-path=" + code_search_path_str;
} else if (language == Language::CPP) {
@ -324,6 +328,32 @@ Process WorkerPool::StartWorkerProcess(
env.emplace(kEnvVarKeyJobId, job_id.Hex());
}
// TODO(SongGuyang): Maybe Python and Java also need native library path in future.
if (language == Language::CPP) {
// Set native library path for shared library search.
if (!native_library_path_.empty() || !code_search_path.empty()) {
#if defined(__APPLE__) || defined(__linux__) || defined(_WIN32)
#if defined(__APPLE__)
static const std::string kLibraryPathEnvName = "DYLD_LIBRARY_PATH";
#elif defined(__linux__)
static const std::string kLibraryPathEnvName = "LD_LIBRARY_PATH";
#elif defined(_WIN32)
static const std::string kLibraryPathEnvName = "PATH";
#endif
auto path_env_p = std::getenv(kLibraryPathEnvName.c_str());
std::string path_env = native_library_path_;
if (path_env_p != nullptr && strlen(path_env_p) != 0) {
path_env.append(":").append(path_env_p);
}
// Append per-job code search path to library path.
if (!code_search_path.empty()) {
path_env.append(":").append(code_search_path);
}
env.emplace(kLibraryPathEnvName, path_env);
#endif
}
}
if (language == Language::PYTHON || language == Language::JAVA) {
if (serialized_runtime_env != "{}" && serialized_runtime_env != "") {
worker_command_args.push_back("--serialized-runtime-env=" + serialized_runtime_env);
@ -445,12 +475,25 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args,
const ProcessEnvironment &env) {
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting worker process with command:";
std::string debug_info;
debug_info.append("Starting worker process with command:");
for (const auto &arg : worker_command_args) {
stream << " " << arg;
debug_info.append(" ").append(arg);
}
RAY_LOG(DEBUG) << stream.str();
debug_info.append(", and the envs:");
for (const auto &entry : env) {
debug_info.append(" ")
.append(entry.first)
.append(":")
.append(entry.second)
.append(",");
}
if (!env.empty()) {
// Erase the last ","
debug_info.pop_back();
}
debug_info.append(".");
RAY_LOG(DEBUG) << debug_info;
}
// Launch the process to create the worker.

View file

@ -156,6 +156,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// on. This takes precedence over min_worker_port and max_worker_port.
/// \param worker_commands The commands used to start the worker process, grouped by
/// language.
/// \param native_library_path The native library path which includes the core
/// libraries.
/// \param starting_worker_timeout_callback The callback that will be triggered once
/// it times out to start a worker.
/// \param ray_debugger_external Ray debugger in workers will be started in a way
@ -168,6 +170,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
const std::vector<int> &worker_ports,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
const std::string &native_library_path,
std::function<void()> starting_worker_timeout_callback,
int ray_debugger_external, const std::function<double()> get_time);
@ -624,6 +627,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
int node_manager_port_ = 0;
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The native library path which includes the core libraries.
std::string native_library_path_;
/// The callback that will be triggered once it times out to start a worker.
std::function<void()> starting_worker_timeout_callback_;
/// The callback that will be triggered when a runtime_env setup for a task fails.

View file

@ -105,8 +105,8 @@ class WorkerPoolMock : public WorkerPool {
&mock_worker_rpc_clients)
: WorkerPool(
io_service, NodeID::FromRandom(), "", POOL_SIZE_SOFT_LIMIT, 0,
MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands, []() {}, 0,
[this]() { return current_time_ms_; }),
MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands, "", []() {},
0, [this]() { return current_time_ms_; }),
last_worker_process_(),
instrumented_io_service_(io_service),
error_message_type_(1),