[Core] synchronize job config to worker when it registers to raylet (#13402)

This commit is contained in:
ZhuSenlin 2021-02-23 11:48:54 +08:00 committed by GitHub
parent fcd0dee581
commit 8e0b2d07f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 92 additions and 13 deletions

View file

@ -55,6 +55,7 @@ from ray.includes.common cimport (
CPlacementStrategy,
CRayFunction,
CWorkerType,
CJobConfig,
move,
LANGUAGE_CPP,
LANGUAGE_JAVA,
@ -1649,6 +1650,13 @@ cdef class CoreWorker:
resource_name.encode("ascii"), capacity,
CNodeID.FromBinary(client_id.binary()))
def get_job_config(self):
cdef CJobConfig c_job_config = \
CCoreWorkerProcess.GetCoreWorker().GetJobConfig()
job_config = ray.gcs_utils.JobConfig()
job_config.ParseFromString(c_job_config.SerializeAsString())
return job_config
cdef void async_callback(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *user_callback) with gil:

View file

@ -279,3 +279,7 @@ cdef extern from "ray/gcs/gcs_client.h" nogil:
CGcsClientOptions(const c_string &ip, int port,
const c_string &password,
c_bool is_test_client)
cdef extern from "src/ray/protobuf/gcs.pb.h" nogil:
cdef cppclass CJobConfig "ray::rpc::JobConfig":
const c_string &SerializeAsString()

View file

@ -34,6 +34,7 @@ from ray.includes.common cimport (
CWorkerType,
CLanguage,
CGcsClientOptions,
CJobConfig,
)
from ray.includes.function_descriptor cimport (
CFunctionDescriptor,
@ -211,6 +212,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CNodeID &client_Id)
CRayStatus SpillObjects(const c_vector[CObjectID] &object_ids)
CJobConfig GetJobConfig()
cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions":
CWorkerType worker_type
CLanguage language

View file

@ -785,6 +785,36 @@ def test_override_environment_variables_complex(shutdown_only):
}).remote("z")) == "job_z")
def test_sync_job_config(shutdown_only):
num_java_workers_per_process = 8
worker_env = {
"key": "value",
}
ray.init(
job_config=ray.job_config.JobConfig(
num_java_workers_per_process=num_java_workers_per_process,
worker_env=worker_env))
# Check that the job config is synchronized at the driver side.
job_config = ray.worker.global_worker.core_worker.get_job_config()
assert (job_config.num_java_workers_per_process ==
num_java_workers_per_process)
assert (job_config.worker_env == worker_env)
@ray.remote
def get_job_config():
job_config = ray.worker.global_worker.core_worker.get_job_config()
return job_config.SerializeToString()
# Check that the job config is synchronized at the worker side.
job_config = ray.gcs_utils.JobConfig()
job_config.ParseFromString(ray.get(get_job_config.remote()))
assert (job_config.num_java_workers_per_process ==
num_java_workers_per_process)
assert (job_config.worker_env == worker_env)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

View file

@ -334,12 +334,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
Status raylet_client_status;
NodeID local_raylet_id;
int assigned_port;
std::string serialized_job_config = options_.serialized_job_config;
std::string system_config;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
options_.node_ip_address, &raylet_client_status, &local_raylet_id, &assigned_port,
&system_config, options_.serialized_job_config));
&system_config, &serialized_job_config));
if (!raylet_client_status.ok()) {
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
@ -358,6 +359,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// NOTE(edoakes): any initialization depending on RayConfig must happen after this line.
RayConfig::instance().initialize(system_config);
// Parse job config from serialized string.
job_config_.reset(new rpc::JobConfig());
job_config_->ParseFromString(serialized_job_config);
// Start RPC server after all the task receivers are properly initialized and we have
// our assigned port from the raylet.
core_worker_server_ = std::unique_ptr<rpc::GrpcServer>(
@ -2661,4 +2667,6 @@ void CoreWorker::SetActorTitle(const std::string &title) {
actor_title_ = title;
}
const rpc::JobConfig &CoreWorker::GetJobConfig() const { return *job_config_; }
} // namespace ray

View file

@ -955,6 +955,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void GetAsync(const ObjectID &object_id, SetResultCallback success_callback,
void *python_future);
// Get serialized job configuration.
const rpc::JobConfig &GetJobConfig() const;
private:
void SetCurrentTaskId(const TaskID &task_id);
@ -1270,6 +1273,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
int64_t max_direct_call_object_size_;
friend class CoreWorkerTest;
std::unique_ptr<rpc::JobConfig> job_config_;
};
} // namespace ray

View file

@ -149,6 +149,8 @@ table RegisterClientReply {
port: int;
// Internal config options.
system_config: string;
// The config bytes of this job serialized with protobuf.
serialized_job_config: string;
}
table AnnounceWorkerPort {

View file

@ -1081,12 +1081,18 @@ void NodeManager::ProcessRegisterClientRequestMessage(
std::make_shared<Worker>(job_id, worker_id, language, worker_type,
worker_ip_address, client, client_call_manager_));
auto send_reply_callback = [this, client](Status status, int assigned_port) {
auto send_reply_callback = [this, client, job_id](Status status, int assigned_port) {
flatbuffers::FlatBufferBuilder fbb;
std::string serialized_job_config;
auto job_config = worker_pool_.GetJobConfig(job_id);
if (job_config != boost::none) {
serialized_job_config = (*job_config).SerializeAsString();
}
auto reply = ray::protocol::CreateRegisterClientReply(
fbb, status.ok(), fbb.CreateString(status.ToString()),
to_flatbuf(fbb, self_node_id_), assigned_port,
fbb.CreateString(initial_config_.raylet_config));
fbb.CreateString(initial_config_.raylet_config),
fbb.CreateString(serialized_job_config));
fbb.Finish(reply);
client->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply), fbb.GetSize(),

View file

@ -371,6 +371,13 @@ void WorkerPool::HandleJobFinished(const JobID &job_id) {
// unfinished_jobs_.erase(job_id);
}
boost::optional<const rpc::JobConfig &> WorkerPool::GetJobConfig(
const JobID &job_id) const {
auto iter = all_jobs_.find(job_id);
return iter == all_jobs_.end() ? boost::none
: boost::optional<const rpc::JobConfig &>(iter->second);
}
Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker,
pid_t pid,
std::function<void(Status, int)> send_reply_callback) {

View file

@ -135,6 +135,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// \return Void.
void HandleJobFinished(const JobID &job_id);
/// \brief Get the job config by job id.
///
/// \param job_id ID of the job.
/// \return Job config if given job is running, else nullptr.
boost::optional<const rpc::JobConfig &> GetJobConfig(const JobID &job_id) const;
/// Register a new worker. The Worker should be added by the caller to the
/// pool after it becomes idle (e.g., requests a work assignment).
///

View file

@ -86,11 +86,8 @@ raylet::RayletClient::RayletClient(
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, Status *status, NodeID *raylet_id, int *port,
std::string *system_config, const std::string &job_config)
: grpc_client_(std::move(grpc_client)),
worker_id_(worker_id),
job_id_(job_id),
job_config_(job_config) {
std::string *system_config, std::string *serialized_job_config)
: grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) {
// For C++14, we could use std::make_unique
conn_ = std::unique_ptr<raylet::RayletConnection>(
new raylet::RayletConnection(io_service, raylet_socket, -1, -1));
@ -100,7 +97,7 @@ raylet::RayletClient::RayletClient(
auto message = protocol::CreateRegisterClientRequest(
fbb, static_cast<int>(worker_type), to_flatbuf(fbb, worker_id), getpid(),
to_flatbuf(fbb, job_id), language, fbb.CreateString(ip_address), /*port=*/0,
fbb.CreateString(job_config_));
fbb.CreateString(*serialized_job_config));
fbb.Finish(message);
// Register the process ID with the raylet.
// NOTE(swang): If raylet exits and we are registered as a worker, we will get killed.
@ -127,6 +124,7 @@ raylet::RayletClient::RayletClient(
RAY_CHECK(system_config);
*system_config = reply_message->system_config()->str();
*serialized_job_config = reply_message->serialized_job_config()->str();
}
Status raylet::RayletClient::Disconnect() {

View file

@ -193,16 +193,19 @@ class RayletClient : public RayletClientInterface {
/// \param ip_address The IP address of the worker.
/// \param status This will be populated with the result of connection attempt.
/// \param raylet_id This will be populated with the local raylet's NodeID.
/// \param system_config This will be populated with internal config parameters
/// provided by the raylet.
/// \param port The port that the worker should listen on for gRPC requests. If
/// 0, the worker should choose a random port.
/// \param system_config This will be populated with internal config parameters
/// provided by the raylet.
/// \param serialized_job_config If this is a driver connection, the job config
/// provided by driver will be passed to Raylet. If this is a worker connection,
/// this will be populated with the current job config.
RayletClient(boost::asio::io_service &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, Status *status, NodeID *raylet_id,
int *port, std::string *system_config, const std::string &job_config);
int *port, std::string *system_config, std::string *serialized_job_config);
/// Connect to the raylet via grpc only.
///
@ -403,7 +406,6 @@ class RayletClient : public RayletClientInterface {
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client_;
const WorkerID worker_id_;
const JobID job_id_;
const std::string job_config_;
/// A map from resource name to the resource IDs that are currently reserved
/// for this worker. Each pair consists of the resource ID and the fraction