[GCS]only update states related fields when publish actor table data (#13448)

This commit is contained in:
Tao Wang 2021-01-28 11:12:57 +08:00 committed by GitHub
parent cb95ff1e56
commit 56ee6ef55f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 121 additions and 16 deletions

View file

@ -221,15 +221,25 @@ class StatsCollector(dashboard_utils.DashboardHeadModule):
RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS)
# Receive actors from channel.
state_keys = ("state", "address", "numRestarts", "timestamp", "pid")
async for sender, msg in receiver.iter():
try:
_, actor_table_data = msg
actor_id, actor_table_data = msg
pubsub_message = ray.gcs_utils.PubSubMessage.FromString(
actor_table_data)
message = ray.gcs_utils.ActorTableData.FromString(
pubsub_message.data)
actor_table_data = actor_table_data_to_dict(message)
_process_actor_table_data(actor_table_data)
# If actor is not new registered but updated, we only update
# states related fields.
if actor_table_data["state"] != "DEPENDENCIES_UNREADY":
actor_id = actor_id.decode("UTF-8")[len(
ray.gcs_utils.TablePrefix_ACTOR_string + ":"):]
actor_table_data_copy = dict(DataSource.actors[actor_id])
for k in state_keys:
actor_table_data_copy[k] = actor_table_data[k]
actor_table_data = actor_table_data_copy
actor_id = actor_table_data["actorId"]
job_id = actor_table_data["jobId"]
node_id = actor_table_data["address"]["rayletId"]

View file

@ -7,9 +7,12 @@ import traceback
import random
import pytest
import ray
import redis
import threading
import ray.new_dashboard.modules.stats_collector.stats_collector_consts \
as stats_collector_consts
import ray.new_dashboard.utils as dashboard_utils
import ray.ray_constants as ray_constants
from datetime import datetime, timedelta
from ray.cluster_utils import Cluster
from ray.new_dashboard.tests.conftest import * # noqa
@ -417,5 +420,85 @@ def test_nil_node(enable_test_module, disable_aiohttp_cache,
raise Exception(f"Timed out while testing, {ex_stack}")
def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard):
timeout = 5
assert (wait_until_server_available(ray_start_with_dashboard["webui_url"])
is True)
address_info = ray_start_with_dashboard
address = address_info["redis_address"]
address = address.split(":")
assert len(address) == 2
client = redis.StrictRedis(
host=address[0],
port=int(address[1]),
password=ray_constants.REDIS_DEFAULT_PASSWORD)
p = client.pubsub(ignore_subscribe_messages=True)
p.psubscribe(ray.gcs_utils.RAY_ACTOR_PUBSUB_PATTERN)
@ray.remote
class DummyActor:
def __init__(self):
pass
# Create a dummy actor.
a = DummyActor.remote()
def handle_pub_messages(client, msgs, timeout, expect_num):
start_time = time.time()
while time.time() - start_time < timeout and len(msgs) < expect_num:
msg = client.get_message()
if msg is None:
time.sleep(0.01)
continue
pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(msg["data"])
actor_data = ray.gcs_utils.ActorTableData.FromString(
pubsub_msg.data)
msgs.append(actor_data)
msgs = []
handle_pub_messages(p, msgs, timeout, 2)
# Assert we received published actor messages with state
# DEPENDENCIES_UNREADY and ALIVE.
assert len(msgs) == 2
# Kill actor.
ray.kill(a)
handle_pub_messages(p, msgs, timeout, 3)
# Assert we received published actor messages with state DEAD.
assert len(msgs) == 3
def actor_table_data_to_dict(message):
return dashboard_utils.message_to_dict(
message, {
"actorId", "parentId", "jobId", "workerId", "rayletId",
"actorCreationDummyObjectId", "callerId", "taskId",
"parentTaskId", "sourceActorId", "placementGroupId"
},
including_default_value_fields=False)
non_state_keys = ("actorId", "jobId", "taskSpec")
for msg in msgs:
actor_data_dict = actor_table_data_to_dict(msg)
# DEPENDENCIES_UNREADY is 0, which would not be keeped in dict. We
# need check its original value.
if msg.state == 0:
assert len(actor_data_dict) > 5
for k in non_state_keys:
assert k in actor_data_dict
# For status that is not DEPENDENCIES_UNREADY, only states fields will
# be published.
elif actor_data_dict["state"] in ("ALIVE", "DEAD"):
assert actor_data_dict.keys() == {
"state", "address", "timestamp", "pid"
}
else:
raise Exception("Unknown state: {}".format(
actor_data_dict["state"]))
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -275,7 +275,7 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
subscribe(ActorID::FromHex(id), actor_data);
};
return client_impl_->GetGcsPubSub().Subscribe(ACTOR_CHANNEL, actor_id.Hex(),
on_subscribe, subscribe_done);

View file

@ -207,7 +207,7 @@ void ServiceBasedGcsClient::ReconnectGcsServer() {
RAY_LOG(INFO)
<< "Repeated reconnection in "
<< RayConfig::instance().minimum_gcs_reconnect_interval_milliseconds()
<< "milliseconds, return directly.";
<< " milliseconds, return directly.";
return;
}

View file

@ -503,9 +503,9 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(), *actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*actor_table_data)->SerializeAsString(), nullptr));
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
}));
@ -677,7 +677,6 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
// between memory cache and storage.
mutable_actor_table_data->set_num_restarts(num_restarts + 1);
mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING);
const auto actor_table_data = actor->GetActorTableData();
// Make sure to reset the address before flushing to GCS. Otherwise,
// GCS will mistakenly consider this lease request succeeds when restarting.
actor->UpdateAddress(rpc::Address());
@ -685,10 +684,11 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(),
nullptr));
[this, actor_id, mutable_actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(),
nullptr));
}));
gcs_actor_scheduler_->Schedule(actor);
} else {
@ -701,6 +701,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
}
mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD);
mutable_actor_table_data->set_timestamp(current_sys_time_ms());
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
@ -713,7 +714,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
mutable_actor_table_data->SerializeAsString(), nullptr));
GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(),
nullptr));
}));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
@ -754,9 +756,9 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &ac
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, actor_table_data,
[this, actor_id, actor_table_data, actor](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(actor_table_data)->SerializeAsString(), nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_create_callbacks_.

View file

@ -316,7 +316,6 @@ class GcsActorManager : public rpc::ActorInfoHandler {
absl::flat_hash_set<ActorID> GetUnresolvedActorsByOwnerWorker(
const NodeID &node_id, const WorkerID &worker_id) const;
private:
/// Reconstruct the specified actor.
///
/// \param actor The target actor to be reconstructed.
@ -346,6 +345,17 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param actor The actor to be killed.
void AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor);
std::shared_ptr<rpc::ActorTableData> GenActorDataOnlyWithStates(
const rpc::ActorTableData &actor) {
auto actor_delta = std::make_shared<rpc::ActorTableData>();
actor_delta->set_state(actor.state());
actor_delta->mutable_address()->CopyFrom(actor.address());
actor_delta->set_num_restarts(actor.num_restarts());
actor_delta->set_timestamp(actor.timestamp());
actor_delta->set_pid(actor.pid());
return actor_delta;
}
/// Callbacks of pending `RegisterActor` requests.
/// Maps actor ID to actor registration callbacks, which is used to filter duplicated
/// messages from a driver/worker caused by some network problems.