Publish actor state PENDING_CREATION for dashboard showing. (#18666)

This commit is contained in:
Qing Wang 2021-09-18 15:44:58 +08:00 committed by GitHub
parent 948508efb8
commit 6f1d3f94db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 7 deletions

View file

@ -239,18 +239,17 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard):
msgs.append(actor_data) msgs.append(actor_data)
msgs = [] msgs = []
handle_pub_messages(p, msgs, timeout, 2) handle_pub_messages(p, msgs, timeout, 3)
# Assert we received published actor messages with state # Assert we received published actor messages with state
# DEPENDENCIES_UNREADY and ALIVE. # DEPENDENCIES_UNREADY, PENDING_CREATION and ALIVE.
assert len(msgs) == 2 assert len(msgs) == 3
# Kill actor. # Kill actor.
ray.kill(a) ray.kill(a)
handle_pub_messages(p, msgs, timeout, 3) handle_pub_messages(p, msgs, timeout, 4)
# Assert we received published actor messages with state DEAD. # Assert we received published actor messages with state DEAD.
assert len(msgs) == 3 assert len(msgs) == 4
def actor_table_data_to_dict(message): def actor_table_data_to_dict(message):
return dashboard_utils.message_to_dict( return dashboard_utils.message_to_dict(
@ -262,9 +261,10 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard):
including_default_value_fields=False) including_default_value_fields=False)
non_state_keys = ("actorId", "jobId", "taskSpec") non_state_keys = ("actorId", "jobId", "taskSpec")
for msg in msgs: for msg in msgs:
actor_data_dict = actor_table_data_to_dict(msg) actor_data_dict = actor_table_data_to_dict(msg)
# DEPENDENCIES_UNREADY is 0, which would not be keeped in dict. We # DEPENDENCIES_UNREADY is 0, which would not be kept in dict. We
# need check its original value. # need check its original value.
if msg.state == 0: if msg.state == 0:
assert len(actor_data_dict) > 5 assert len(actor_data_dict) > 5
@ -277,6 +277,12 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard):
"state", "address", "timestamp", "pid", "state", "address", "timestamp", "pid",
"creationTaskException", "rayNamespace" "creationTaskException", "rayNamespace"
} }
elif actor_data_dict["state"] == "PENDING_CREATION":
assert actor_data_dict.keys() == {
"state", "address", "actorId", "actorCreationDummyObjectId",
"jobId", "ownerAddress", "taskSpec", "className",
"serializedRuntimeEnv", "rayNamespace"
}
else: else:
raise Exception("Unknown state: {}".format( raise Exception("Unknown state: {}".format(
actor_data_dict["state"])) actor_data_dict["state"]))

View file

@ -476,6 +476,10 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
auto actor = auto actor =
std::make_shared<GcsActor>(request.task_spec(), get_ray_namespace_(job_id)); std::make_shared<GcsActor>(request.task_spec(), get_ray_namespace_(job_id));
actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION); actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION);
const auto &actor_table_data = actor->GetActorTableData();
// Pub this state for dashboard showing.
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(), nullptr));
RemoveUnresolvedActor(actor); RemoveUnresolvedActor(actor);
// Update the registered actor as its creation task specification may have changed due // Update the registered actor as its creation task specification may have changed due