[core][1/2] Resubscribe when GCS restarts for raylet. (#24628)

## Why are these changes needed?

This PR fixes the resubscription path to GCS when GCS restarts.

When GCS restarts, it loses all subscription information, since everything is stored in memory. At runtime we therefore need to tell GCS what is currently subscribed.

The previous method:
- A thread in the core worker/raylet checks whether GCS has restarted.
- If it has restarted, the client sends a resubscribe request to GCS.

However, this does not work in the following cases:
- GCS restarts so quickly that the checker in the raylet/core worker misses it.
- GCS does not restart but is merely lagging because of network issues, in which case the resubscription is unnecessary.

In fact, GCS knows exactly when a resubscription is needed: when it restarts. So this PR makes GCS send a resubscribe request to each raylet, and the raylet then performs the resubscription (see the sketch after the checklist below).

There are two parts to making this work:

- [x] The raylet sends the resubscription.
- [ ] The raylet asks its core workers to send their resubscriptions.
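
For orientation, here is a minimal, self-contained sketch of the push-based flow. The `ToyGcs`/`ToyRaylet`/`ToyGcsClient` types are illustrative stand-ins only, not Ray's real classes; the actual wiring is the new `NotifyGCSRestart` RPC shown in the diffs below, sent from `GcsNodeManager::Initialize` and handled by `NodeManager::HandleNotifyGCSRestart`.

```cpp
// Toy model of the new protocol (C++17). On (re)start, the "GCS" walks its
// node table and pushes a notification to every alive "raylet"; each notified
// raylet simply re-runs its stored resubscribe callback.
#include <functional>
#include <iostream>
#include <unordered_map>

// Raylet-side stand-in for GcsClient: it only remembers how to resubscribe.
struct ToyGcsClient {
  std::function<void()> resubscribe_func;
  void AsyncResubscribe() {
    if (resubscribe_func) resubscribe_func();
  }
};

// Raylet stand-in: handling NotifyGCSRestart just means resubscribing.
struct ToyRaylet {
  ToyGcsClient gcs_client;
  void HandleNotifyGCSRestart() { gcs_client.AsyncResubscribe(); }
};

// GCS stand-in: instead of waiting for clients to poll and detect the
// restart, it notifies every node that is still registered as alive.
struct ToyGcs {
  std::unordered_map<int, ToyRaylet *> alive_nodes;
  void Initialize() {
    for (auto &[node_id, raylet] : alive_nodes) {
      std::cout << "GCS restarted, notifying node " << node_id << "\n";
      raylet->HandleNotifyGCSRestart();
    }
  }
};

int main() {
  ToyRaylet raylet;
  raylet.gcs_client.resubscribe_func = [] {
    std::cout << "raylet resubscribes to job/actor/node/worker channels\n";
  };

  ToyGcs gcs;
  gcs.alive_nodes[1] = &raylet;
  gcs.Initialize();  // Simulates the GCS coming back up after a crash.
}
```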
14 changed files with 193 additions and 6 deletions

View file

@@ -3,7 +3,10 @@ import sys
import ray
import ray._private.gcs_utils as gcs_utils
import pytest
import psutil
from time import sleep
from ray._private.test_utils import (
    generate_system_config_map,
    wait_for_condition,
@@ -155,6 +158,46 @@ def test_node_failure_detector_when_gcs_server_restart(
    wait_for_condition(condition, timeout=10)


@pytest.mark.parametrize(
    "ray_start_regular_with_external_redis",
    [
        generate_system_config_map(
            num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=60
        )
    ],
    indirect=True,
)
def test_actor_raylet_resubscription(ray_start_regular_with_external_redis):
    # start an actor
    @ray.remote
    class A:
        def ready(self):
            import os

            return os.getpid()

    actor = A.options(name="abc", max_restarts=0).remote()
    pid = ray.get(actor.ready.remote())

    print("actor is ready and kill gcs")
    ray.worker._global_node.kill_gcs_server()

    print("make actor exit")
    import psutil

    p = psutil.Process(pid)
    p.kill()

    from time import sleep

    sleep(1)

    print("start gcs")
    ray.worker._global_node.start_gcs_server()

    print("try actor method again")
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(actor.ready.remote())


@pytest.mark.parametrize(
    "ray_start_regular_with_external_redis",
    [
@@ -191,6 +234,66 @@ def test_del_actor_after_gcs_server_restart(ray_start_regular_with_external_redi
        ray.get_actor("abc")


@pytest.mark.parametrize(
    "ray_start_regular_with_external_redis",
    [
        generate_system_config_map(
            num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=60
        )
    ],
    indirect=True,
)
def test_raylet_resubscription(tmp_path, ray_start_regular_with_external_redis):
    # This test makes sure that resubscription in the raylet works.
    # If the subscription fails, the raylet will not get the worker failure
    # notification, and thus it won't kill the worker that is fate-sharing
    # with the failed one.
    @ray.remote
    def long_run():
        from time import sleep

        print("LONG_RUN")
        import os

        (tmp_path / "long_run.pid").write_text(str(os.getpid()))
        sleep(10000)

    @ray.remote
    def bar():
        import os

        return (
            os.getpid(),
            # Use a runtime env to make sure the task is running in a
            # different ray worker.
            long_run.options(runtime_env={"env_vars": {"P": ""}}).remote(),
        )

    (pid, obj_ref) = ray.get(bar.remote())

    long_run_pid = None

    def condition():
        nonlocal long_run_pid
        long_run_pid = int((tmp_path / "long_run.pid").read_text())
        return True

    wait_for_condition(condition, timeout=5)

    # Kill the GCS.
    ray.worker._global_node.kill_gcs_server()

    # Then kill the owner.
    p = psutil.Process(pid)
    p.kill()

    ray.worker._global_node.start_gcs_server()

    # The long_run worker should exit.
    wait_for_pid_to_exit(long_run_pid, 5)


@pytest.mark.parametrize("auto_reconnect", [True, False])
def test_gcs_client_reconnect(ray_start_regular_with_external_redis, auto_reconnect):
    gcs_address = ray.worker.global_worker.gcs_client.address

View file

@@ -145,6 +145,12 @@ class MockNodeManager : public NodeManager {
               rpc::GetSystemConfigReply *reply,
               rpc::SendReplyCallback send_reply_callback),
              (override));
  MOCK_METHOD(void,
              HandleNotifyGCSRestart,
              (const rpc::NotifyGCSRestartRequest &request,
               rpc::NotifyGCSRestartReply *reply,
               rpc::SendReplyCallback send_reply_callback),
              (override));
  MOCK_METHOD(void,
              HandleGetGcsServerAddress,
              (const rpc::GetGcsServerAddressRequest &request,

View file

@@ -215,6 +215,10 @@ class MockRayletClientInterface : public RayletClientInterface {
              GetResourceLoad,
              (const rpc::ClientCallback<rpc::GetResourceLoadReply> &callback),
              (override));
  MOCK_METHOD(void,
              NotifyGCSRestart,
              (const rpc::ClientCallback<rpc::NotifyGCSRestartReply> &callback),
              (override));
  MOCK_METHOD(void,
              ShutdownRaylet,
              (const NodeID &node_id,

View file

@@ -83,6 +83,14 @@ Status GcsClient::Connect(instrumented_io_context &io_service) {
  gcs_rpc_client_ = std::make_shared<rpc::GcsRpcClient>(
      options_.gcs_address_, options_.gcs_port_, *client_call_manager_);
  resubscribe_func_ = [this]() {
    job_accessor_->AsyncResubscribe();
    actor_accessor_->AsyncResubscribe();
    node_accessor_->AsyncResubscribe();
    node_resource_accessor_->AsyncResubscribe();
    worker_accessor_->AsyncResubscribe();
  };

  rpc::Address gcs_address;
  gcs_address.set_ip_address(options_.gcs_address_);
  gcs_address.set_port(options_.gcs_port_);

View file

@@ -88,6 +88,13 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
  /// Return client information for debug.
  virtual std::string DebugString() const { return ""; }

  /// Resubscribe to GCS to recover from a GCS failure.
  void AsyncResubscribe() {
    if (resubscribe_func_ != nullptr) {
      resubscribe_func_();
    }
  }

  /// Get the sub-interface for accessing actor information in GCS.
  /// This function is thread safe.
  ActorInfoAccessor &Actors() {

@@ -173,6 +180,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
  // Gcs rpc client
  std::shared_ptr<rpc::GcsRpcClient> gcs_rpc_client_;
  std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
  std::function<void()> resubscribe_func_;
};
} // namespace gcs

View file

@@ -253,12 +253,26 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id) {
}

void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) {
  for (const auto &item : gcs_init_data.Nodes()) {
    if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
      AddNode(std::make_shared<rpc::GcsNodeInfo>(item.second));
    } else if (item.second.state() == rpc::GcsNodeInfo::DEAD) {
      dead_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
      sorted_dead_node_list_.emplace_back(item.first, item.second.timestamp());
  for (const auto &[node_id, node_info] : gcs_init_data.Nodes()) {
    if (node_info.state() == rpc::GcsNodeInfo::ALIVE) {
      AddNode(std::make_shared<rpc::GcsNodeInfo>(node_info));
      // Ask the raylet to resubscribe in case this is a GCS restart.
      // The protocol is correct because when a new node joins, the raylet will:
      //   - RegisterNode (write the node to the node table), then
      //   - set up its subscriptions.
      // So we only need to ask the registered nodes to resubscribe; a node that
      // failed to register will crash on the client side due to the
      // registration failure.
      rpc::Address remote_address;
      remote_address.set_raylet_id(node_info.node_id());
      remote_address.set_ip_address(node_info.node_manager_address());
      remote_address.set_port(node_info.node_manager_port());
      auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(remote_address);
      raylet_client->NotifyGCSRestart(nullptr);
    } else if (node_info.state() == rpc::GcsNodeInfo::DEAD) {
      dead_nodes_.emplace(node_id, std::make_shared<rpc::GcsNodeInfo>(node_info));
      sorted_dead_node_list_.emplace_back(node_id, node_info.timestamp());
    }
  }
  sorted_dead_node_list_.sort(
View file

@@ -291,6 +291,9 @@ struct GcsServerMocker {
        bool graceful,
        const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback) override{};

    void NotifyGCSRestart(
        const rpc::ClientCallback<rpc::NotifyGCSRestartReply> &callback) override{};

    ~MockRayletClient() {}

    int num_workers_requested = 0;

View file

@@ -315,10 +315,16 @@ message GetResourceLoadReply {
  ResourcesData resources = 1;
}

message NotifyGCSRestartRequest {}

message NotifyGCSRestartReply {}

// Service for inter-node-manager communication.
service NodeManagerService {
  // Update the node's view of the cluster resource usage.
  rpc UpdateResourceUsage(UpdateResourceUsageRequest) returns (UpdateResourceUsageReply);
  // Handle the case when GCS restarted.
  rpc NotifyGCSRestart(NotifyGCSRestartRequest) returns (NotifyGCSRestartReply);
  // Get the resource load of the raylet.
  rpc GetResourceLoad(GetResourceLoadRequest) returns (GetResourceLoadReply);
  // Request the current resource usage from this raylet
View file

@@ -1075,6 +1075,13 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
  return;
}

void NodeManager::HandleNotifyGCSRestart(const rpc::NotifyGCSRestartRequest &request,
                                         rpc::NotifyGCSRestartReply *reply,
                                         rpc::SendReplyCallback send_reply_callback) {
  gcs_client_->AsyncResubscribe();
  send_reply_callback(Status::OK(), nullptr, nullptr);
}

void NodeManager::UpdateResourceUsage(const NodeID &node_id,
                                      const rpc::ResourcesData &resource_data) {
  if (!cluster_resource_scheduler_->GetClusterResourceManager().UpdateNode(

View file

@@ -601,6 +601,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
                          rpc::GetObjectsInfoReply *reply,
                          rpc::SendReplyCallback send_reply_callback) override;

  /// Handle a `NotifyGCSRestart` request.
  void HandleNotifyGCSRestart(const rpc::NotifyGCSRestartRequest &request,
                              rpc::NotifyGCSRestartReply *reply,
                              rpc::SendReplyCallback send_reply_callback) override;

  /// Trigger local GC on each worker of this raylet.
  void DoLocalGC(bool triggered_by_global_gc = false);

View file

@@ -517,6 +517,12 @@ void raylet::RayletClient::GetResourceLoad(
  grpc_client_->GetResourceLoad(request, callback);
}

void raylet::RayletClient::NotifyGCSRestart(
    const rpc::ClientCallback<rpc::NotifyGCSRestartReply> &callback) {
  rpc::NotifyGCSRestartRequest request;
  grpc_client_->NotifyGCSRestart(request, callback);
}

void raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id,
                                             const rpc::Address &owner_address) {
  flatbuffers::FlatBufferBuilder fbb;

View file

@@ -188,6 +188,9 @@ class RayletClientInterface : public PinObjectsInterface,
  virtual void GetGcsServerAddress(
      const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) = 0;

  virtual void NotifyGCSRestart(
      const rpc::ClientCallback<rpc::NotifyGCSRestartReply> &callback) = 0;

  virtual void ShutdownRaylet(
      const NodeID &node_id,
      bool graceful,

@@ -468,6 +471,9 @@ class RayletClient : public RayletClientInterface {
  void GetResourceLoad(
      const rpc::ClientCallback<rpc::GetResourceLoadReply> &callback) override;

  void NotifyGCSRestart(
      const rpc::ClientCallback<rpc::NotifyGCSRestartReply> &callback) override;

  // Subscribe to receive notification on plasma object
  void SubscribeToPlasma(const ObjectID &object_id, const rpc::Address &owner_address);

View file

@@ -97,6 +97,12 @@ class NodeManagerWorkerClient
                         grpc_client_,
                         /*method_timeout_ms*/ -1, )

  /// Notify the raylet that GCS has restarted.
  VOID_RPC_CLIENT_METHOD(NodeManagerService,
                         NotifyGCSRestart,
                         grpc_client_,
                         /*method_timeout_ms*/ -1, )

  /// Request a worker lease.
  VOID_RPC_CLIENT_METHOD(NodeManagerService,
                         RequestWorkerLease,

View file

@@ -28,6 +28,7 @@ namespace rpc {
  RPC_SERVICE_HANDLER(NodeManagerService, UpdateResourceUsage, -1)   \
  RPC_SERVICE_HANDLER(NodeManagerService, RequestResourceReport, -1) \
  RPC_SERVICE_HANDLER(NodeManagerService, GetResourceLoad, -1)       \
  RPC_SERVICE_HANDLER(NodeManagerService, NotifyGCSRestart, -1)      \
  RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease, -1)    \
  RPC_SERVICE_HANDLER(NodeManagerService, ReportWorkerBacklog, -1)   \
  RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker, -1)          \

@@ -75,6 +76,10 @@ class NodeManagerServiceHandler {
                                     rpc::GetResourceLoadReply *reply,
                                     rpc::SendReplyCallback send_reply_callback) = 0;

  virtual void HandleNotifyGCSRestart(const rpc::NotifyGCSRestartRequest &request,
                                      rpc::NotifyGCSRestartReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) = 0;

  virtual void HandleRequestWorkerLease(const RequestWorkerLeaseRequest &request,
                                        RequestWorkerLeaseReply *reply,
                                        SendReplyCallback send_reply_callback) = 0;