[Core] Fix get_actor consistency on ray.kill (#20178)

* Improve race condition on ray.kill

* Fix a bug.

* Fix a bug

* fix core worker test

* done
This commit is contained in:
SangBin Cho 2021-11-15 16:29:47 +09:00 committed by GitHub
parent 61778a952d
commit 477b6265d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 31 deletions

View file

@ -1311,7 +1311,6 @@ def test_get_actor_after_killed(shutdown_only):
actor = A.options(
name="actor", namespace="namespace", lifetime="detached").remote()
ray.kill(actor)
# This could be flaky due to our caching named actor mechanism.
with pytest.raises(ValueError):
ray.get_actor("actor", namespace="namespace")
@ -1324,12 +1323,6 @@ def test_get_actor_after_killed(shutdown_only):
assert ray.get(
ray.get_actor("actor_2", namespace="namespace").ready.remote())
# TODO(sang): This currently doesn't pass without time.sleep.
# ray.kill(actor, no_restart=False)
# # Now the actor is killed.
# with pytest.raises(ValueError):
# ray.get_actor("actor_2", namespace="namespace")
def test_get_actor_race_condition(shutdown_only):
@ray.remote

View file

@ -24,7 +24,8 @@ rpc::ActorHandle CreateInnerActorHandle(
const rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries) {
const std::string &extension_data, int64_t max_task_retries, const std::string &name,
const std::string &ray_namespace) {
rpc::ActorHandle inner;
inner.set_actor_id(actor_id.Data(), actor_id.Size());
inner.set_owner_id(owner_id.Binary());
@ -36,6 +37,8 @@ rpc::ActorHandle CreateInnerActorHandle(
inner.set_actor_cursor(initial_cursor.Binary());
inner.set_extension_data(extension_data);
inner.set_max_task_retries(max_task_retries);
inner.set_name(name);
inner.set_ray_namespace(ray_namespace);
return inner;
}
@ -61,6 +64,9 @@ rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
actor_table_data.task_spec().actor_creation_task_spec().extension_data());
inner.set_max_task_retries(
actor_table_data.task_spec().actor_creation_task_spec().max_task_retries());
inner.set_name(actor_table_data.task_spec().actor_creation_task_spec().name());
inner.set_ray_namespace(
actor_table_data.task_spec().actor_creation_task_spec().ray_namespace());
return inner;
}
@ -69,10 +75,12 @@ ActorHandle::ActorHandle(
const rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries)
const std::string &extension_data, int64_t max_task_retries, const std::string &name,
const std::string &ray_namespace)
: ActorHandle(CreateInnerActorHandle(
actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language,
actor_creation_task_function_descriptor, extension_data, max_task_retries)) {}
actor_creation_task_function_descriptor, extension_data, max_task_retries, name,
ray_namespace)) {}
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}
@ -103,5 +111,9 @@ void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec,
void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }
std::string ActorHandle::GetName() const { return inner_.name(); }
std::string ActorHandle::GetNamespace() const { return inner_.ray_namespace(); }
} // namespace core
} // namespace ray

View file

@ -36,7 +36,8 @@ class ActorHandle {
const rpc::Address &owner_address, const JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries);
const std::string &extension_data, int64_t max_task_retries,
const std::string &name, const std::string &ray_namespace);
/// Constructs an ActorHandle from a serialized string.
explicit ActorHandle(const std::string &serialized);
@ -83,6 +84,10 @@ class ActorHandle {
int64_t MaxTaskRetries() const { return inner_.max_task_retries(); }
std::string GetName() const;
std::string GetNamespace() const;
private:
// Protobuf-defined persistent state of the actor handle.
const rpc::ActorHandle inner_;

View file

@ -183,6 +183,20 @@ bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
return inserted;
}
void ActorManager::OnActorKilled(const ActorID &actor_id) {
const auto &actor_handle = GetActorHandle(actor_id);
const auto &actor_name = actor_handle->GetName();
const auto &ray_namespace = actor_handle->GetNamespace();
/// Invalidate named actor cache.
if (!actor_name.empty()) {
RAY_LOG(DEBUG) << "Actor name cache is invalided for the actor of name " << actor_name
<< " namespace " << ray_namespace << " id " << actor_id;
absl::MutexLock lock(&cache_mutex_);
cached_actor_name_to_ids_.erase(GenerateCachedActorName(ray_namespace, actor_name));
}
}
void ActorManager::WaitForActorOutOfScope(
const ActorID &actor_id,
std::function<void(const ActorID &)> actor_out_of_scope_callback) {
@ -223,11 +237,7 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
if (actor_data.state() == rpc::ActorTableData::RESTARTING) {
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false);
} else if (actor_data.state() == rpc::ActorTableData::DEAD) {
if (!actor_data.name().empty()) {
absl::MutexLock lock(&cache_mutex_);
cached_actor_name_to_ids_.erase(
GenerateCachedActorName(actor_data.ray_namespace(), actor_data.name()));
}
OnActorKilled(actor_id);
std::shared_ptr<rpc::RayException> creation_task_exception = nullptr;
if (actor_data.has_creation_task_exception()) {
RAY_LOG(INFO) << "Creation task formatted exception: "

View file

@ -116,9 +116,10 @@ class ActorManager {
/// This is used for debugging purpose.
std::vector<ObjectID> GetActorHandleIDsFromHandles();
/// Check if named actor is cached locally.
/// If it has been cached, core worker will not get actor id by name from GCS.
ActorID GetCachedNamedActorID(const std::string &actor_name);
/// Function that's invoked when the actor is permanatly dead.
///
/// \param actor_id The actor id of the handle that will be invalidated.
void OnActorKilled(const ActorID &actor_id);
private:
bool AddNewActorHandle(std::unique_ptr<ActorHandle> actor_handle,
@ -152,6 +153,10 @@ class ActorManager {
const ActorID &actor_id, const ObjectID &actor_creation_return_id,
bool is_self = false);
/// Check if named actor is cached locally.
/// If it has been cached, core worker will not get actor id by name from GCS.
ActorID GetCachedNamedActorID(const std::string &actor_name);
/// Handle actor state notification published from GCS.
///
/// \param[in] actor_id The actor id of this notification.

View file

@ -1799,11 +1799,15 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.serialized_runtime_env,
actor_creation_options.runtime_env_uris);
// If the namespace is not specified, get it from the job.
const auto &ray_namespace = (actor_creation_options.ray_namespace.empty()
? job_config_->ray_namespace()
: actor_creation_options.ray_namespace);
auto actor_handle = std::make_unique<ActorHandle>(
actor_id, GetCallerId(), rpc_address_, job_id,
/*actor_cursor=*/ObjectID::FromIndex(actor_creation_task_id, 1),
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data,
actor_creation_options.max_task_retries);
actor_creation_options.max_task_retries, actor_name, ray_namespace);
std::string serialized_actor_handle;
actor_handle->Serialize(&serialized_actor_handle);
builder.SetActorCreationTaskSpec(
@ -1811,7 +1815,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.max_task_retries,
actor_creation_options.dynamic_worker_options,
actor_creation_options.max_concurrency, actor_creation_options.is_detached,
actor_name, actor_creation_options.ray_namespace, actor_creation_options.is_asyncio,
actor_name, ray_namespace, actor_creation_options.is_asyncio,
actor_creation_options.concurrency_groups, extension_data);
// Add the actor handle before we submit the actor creation task, since the
// actor handle must be in scope by the time the GCS sends the
@ -2079,7 +2083,9 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_r
cb(Status::Invalid(stream.str()));
}
});
return f.get();
const auto &status = f.get();
actor_manager_->OnActorKilled(actor_id);
return status;
}
Status CoreWorker::KillActorLocalMode(const ActorID &actor_id) {

View file

@ -143,7 +143,7 @@ class ActorManagerTest : public ::testing::Test {
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "");
EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _))
.WillRepeatedly(testing::Return(true));
actor_manager_->AddNewActorHandle(move(actor_handle), call_site, caller_address,
@ -169,7 +169,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "");
EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _))
.WillRepeatedly(testing::Return(true));
@ -182,7 +182,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
auto actor_handle2 = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "");
// Make sure the same actor id adding will return false.
ASSERT_FALSE(actor_manager_->AddNewActorHandle(move(actor_handle2), call_site,
caller_address, false));
@ -222,7 +222,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) {
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0);
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0, "", "");
EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _))
.WillRepeatedly(testing::Return(true));
ObjectID outer_object_id = ObjectID::Nil();

View file

@ -514,7 +514,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "",
0);
0, "", "");
// Manually create `num_tasks` task specs, and for each of them create a
// `PushTaskRequest`, this is to batch performance of TaskSpec
@ -631,10 +631,10 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
TEST_F(ZeroNodeTest, TestActorHandle) {
// Test actor handle serialization and deserialization round trip.
JobID job_id = NextJobId();
ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0);
ActorHandle original(
ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(),
rpc::Address(), job_id, ObjectID::FromRandom(), Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0, "", "");
std::string output;
original.Serialize(&output);
ActorHandle deserialized(output);

View file

@ -55,6 +55,12 @@ message ActorHandle {
// How many times tasks may be retried on this actor if the actor fails.
int64 max_task_retries = 9;
// The name of the actor.
string name = 10;
// The namespace that this actor belongs to.
string ray_namespace = 11;
}
message ReturnObject {