Task and actor fate sharing with the owner process (#6818)

* Add test

* Kill workers leased by failed workers

* merge

* shorten test

* Add node failure test case

* Fix FromBinary for nil IDs, add assertions

* Test

* Fate sharing on node removal, fix owner address bug

* lint

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com>

* fix

* Remove unneeded test

* fix IDs

Co-authored-by: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com>
Stephanie Wang 2020-01-20 16:44:04 -08:00 committed by GitHub
parent 14016535a5
commit 815cd0e39a
10 changed files with 191 additions and 24 deletions
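
In short, a task or actor now fate-shares with the worker process that owns it: if the owner process or its node dies, the raylet kills the leased worker running the child and frees its resources. A condensed sketch of the user-visible behavior, distilled from the new test below (the helper and class names are illustrative):

    import os
    import time

    import ray

    @ray.remote
    def sleep_forever():
        time.sleep(1000)

    @ray.remote
    class Parent(object):
        def get_pid(self):
            return os.getpid()

        def start_child(self):
            # The child task is owned by this actor's worker process.
            ray.get(sleep_forever.remote())

    ray.init()
    parent = Parent.remote()
    pid = ray.get(parent.get_pid.remote())
    parent.start_child.remote()  # blocks forever; do not ray.get() it

    # Killing the owner process triggers fate sharing: the raylet kills the
    # leased worker running sleep_forever, so its resources are released.
    os.kill(pid, 9)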

@@ -149,10 +149,11 @@ def wait_for_condition(condition_predictor,
Whether the condition is met within the timeout.
"""
time_elapsed = 0
start = time.time()
while time_elapsed <= timeout_ms:
if condition_predictor():
return True
time_elapsed += retry_interval_ms
time_elapsed = (time.time() - start) * 1000
time.sleep(retry_interval_ms / 1000.0)
return False
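
With this fix the helper measures elapsed wall-clock time and sleeps between attempts instead of only incrementing a counter, so retry_interval_ms actually paces the polling. A small usage sketch (the predicate here is hypothetical; the new test below polls resource availability the same way):

    from ray.test_utils import wait_for_condition

    done = []

    def predicate():
        return len(done) > 0

    # True as soon as predicate() holds, False after roughly 5 seconds of polling.
    ok = wait_for_condition(predicate, timeout_ms=5000, retry_interval_ms=100)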

@@ -15,6 +15,7 @@ import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import (
relevant_errors,
wait_for_condition,
wait_for_errors,
RayTestTimeoutException,
)
@@ -991,6 +992,84 @@ def test_serialized_id(ray_start_cluster):
ray.get(get.remote([obj], True))
def test_fate_sharing(ray_start_cluster):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
})
cluster = Cluster()
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node to place the parent actor.
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1})
# Node to place the child actor.
cluster.add_node(num_cpus=1, resources={"child": 1})
cluster.wait_for_nodes()
ray.init(address=cluster.address)
@ray.remote
def sleep():
time.sleep(1000)
@ray.remote(resources={"child": 1})
def probe():
return
@ray.remote
class Actor(object):
def __init__(self):
return
def start_child(self, use_actors):
if use_actors:
child = Actor.options(resources={"child": 1}).remote()
ray.get(child.sleep.remote())
else:
ray.get(sleep.options(resources={"child": 1}).remote())
def sleep(self):
time.sleep(1000)
def get_pid(self):
return os.getpid()
# Returns whether the "child" resource is available.
def child_resource_available():
p = probe.remote()
ready, _ = ray.wait([p], timeout=1)
return len(ready) > 0
# Test fate sharing if the parent process dies.
def test_process_failure(use_actors):
a = Actor.options(resources={"parent": 1}).remote()
pid = ray.get(a.get_pid.remote())
a.start_child.remote(use_actors=use_actors)
# Wait for the child to be scheduled.
assert wait_for_condition(
lambda: not child_resource_available(), timeout_ms=10000)
# Kill the parent process.
os.kill(pid, 9)
assert wait_for_condition(child_resource_available, timeout_ms=10000)
# Test fate sharing if the parent node dies.
def test_node_failure(node_to_kill, use_actors):
a = Actor.options(resources={"parent": 1}).remote()
a.start_child.remote(use_actors=use_actors)
# Wait for the child to be scheduled.
assert wait_for_condition(
lambda: not child_resource_available(), timeout_ms=10000)
# Kill the parent process.
cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1})
assert wait_for_condition(child_resource_available, timeout_ms=10000)
return node_to_kill
test_process_failure(use_actors=True)
test_process_failure(use_actors=False)
node_to_kill = test_node_failure(node_to_kill, use_actors=True)
node_to_kill = test_node_failure(node_to_kill, use_actors=False)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

@@ -74,7 +74,9 @@ class BaseID {
protected:
BaseID(const std::string &binary) {
std::memcpy(const_cast<uint8_t *>(this->Data()), binary.data(), T::Size());
RAY_CHECK(binary.size() == Size() || binary.size() == 0)
<< "expected size is " << Size() << ", but got " << binary.size();
std::memcpy(const_cast<uint8_t *>(this->Data()), binary.data(), binary.size());
}
// All IDs are immutable for hash evaluations. MutableData is only allowed to be
// used at construction time, so this function is protected.
@@ -389,7 +391,9 @@ std::ostream &operator<<(std::ostream &os, const ObjectID &id);
\
private: \
explicit type(const std::string &binary) { \
std::memcpy(&id_, binary.data(), kUniqueIDSize); \
RAY_CHECK(binary.size() == Size() || binary.size() == 0) \
<< "expected size is " << Size() << ", but got " << binary.size(); \
std::memcpy(&id_, binary.data(), binary.size()); \
} \
};
@@ -416,10 +420,10 @@ T BaseID<T>::FromRandom() {
template <typename T>
T BaseID<T>::FromBinary(const std::string &binary) {
RAY_CHECK(binary.size() == T::Size())
RAY_CHECK(binary.size() == T::Size() || binary.size() == 0)
<< "expected size is " << T::Size() << ", but got " << binary.size();
T t = T::Nil();
std::memcpy(t.MutableData(), binary.data(), T::Size());
std::memcpy(t.MutableData(), binary.data(), binary.size());
return t;
}
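
With the relaxed check, FromBinary and the private string constructors accept an empty string and yield the nil ID; the fate-sharing code relies on this when an rpc::Address has no worker_id or raylet_id set. A minimal Python sketch of the contract, not Ray's actual Python API (the 0xff fill mirrors the C++ Nil() value; the ID size is illustrative):

    ID_SIZE = 28  # each ID type defines its own Size()

    def from_binary(binary):
        # Empty input now yields the nil ID; any other size mismatch still fails.
        assert len(binary) in (0, ID_SIZE), \
            "expected size is %d, but got %d" % (ID_SIZE, len(binary))
        nil = bytearray(b"\xff" * ID_SIZE)
        nil[:len(binary)] = binary
        return bytes(nil)

    assert from_binary(b"") == b"\xff" * ID_SIZE  # the nil ID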

@@ -185,6 +185,10 @@ TaskID TaskSpecification::CallerId() const {
return TaskID::FromBinary(message_->caller_id());
}
const rpc::Address &TaskSpecification::CallerAddress() const {
return message_->caller_address();
}
// === Below are getter methods specific to actor tasks.
ActorID TaskSpecification::ActorId() const {

@@ -147,6 +147,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
TaskID CallerId() const;
const rpc::Address &CallerAddress() const;
uint64_t ActorCounter() const;
ObjectID ActorCreationDummyObjectId() const;

@@ -198,7 +198,7 @@ ray::Status NodeManager::RegisterGcs() {
// in their rpc::Address to the ID of a failed raylet.
const auto &worker_failure_handler =
[this](const WorkerID &id, const gcs::WorkerFailureData &worker_failure_data) {
HandleUnexpectedWorkerFailure(id, worker_failure_data);
HandleUnexpectedWorkerFailure(worker_failure_data.worker_address());
};
RAY_CHECK_OK(gcs_client_->Workers().AsyncSubscribeToWorkerFailures(
worker_failure_handler, /*done_callback=*/nullptr));
@@ -222,12 +222,6 @@ ray::Status NodeManager::RegisterGcs() {
return ray::Status::OK();
}
void NodeManager::HandleUnexpectedWorkerFailure(
const WorkerID &worker_id, const gcs::WorkerFailureData &worker_failed_data) {
RAY_LOG(DEBUG) << "Worker " << worker_id << " failed";
// TODO: Clean up after the failure: If the failed worker is our owner, then exit.
}
void NodeManager::KillWorker(std::shared_ptr<Worker> worker) {
#ifdef _WIN32
// TODO(mehrdadn): Implement graceful process termination mechanism
@@ -527,6 +521,54 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
// guarantee that all tasks get flushed eventually, in case one of the tasks
// in our local cache was supposed to be flushed by the node that died.
lineage_cache_.FlushAllUncommittedTasks();
// Clean up workers that were owned by processes that were on the failed
// node.
rpc::Address address;
address.set_raylet_id(node_info.node_id());
HandleUnexpectedWorkerFailure(address);
}
void NodeManager::HandleUnexpectedWorkerFailure(const rpc::Address &address) {
const WorkerID worker_id = WorkerID::FromBinary(address.worker_id());
const ClientID node_id = ClientID::FromBinary(address.raylet_id());
if (!worker_id.IsNil()) {
RAY_LOG(DEBUG) << "Worker " << worker_id << " failed";
failed_workers_cache_.insert(worker_id);
} else {
RAY_CHECK(!node_id.IsNil());
failed_nodes_cache_.insert(node_id);
}
// TODO(swang): Also clean up any lease requests owned by the failed worker
// from the task queues. This is only necessary for lease requests that are
// infeasible, since requests that are fulfilled will get canceled during
// dispatch.
for (const auto &pair : leased_workers_) {
auto &worker = pair.second;
const auto owner_worker_id =
WorkerID::FromBinary(worker->GetOwnerAddress().worker_id());
const auto owner_node_id =
WorkerID::FromBinary(worker->GetOwnerAddress().raylet_id());
RAY_LOG(DEBUG) << "Lease " << worker->WorkerId() << " owned by " << owner_worker_id;
RAY_CHECK(!owner_worker_id.IsNil() && !owner_node_id.IsNil());
if (!worker->IsDetachedActor()) {
if (!worker_id.IsNil()) {
// If the failed worker was a leased worker's owner, then kill the leased worker.
if (owner_worker_id == worker_id) {
RAY_LOG(INFO) << "Owner process " << owner_worker_id
<< " died, killing leased worker " << worker->WorkerId();
KillWorker(worker);
}
} else if (owner_node_id == node_id) {
// If the leased worker's owner was on the failed node, then kill the leased
// worker.
RAY_LOG(INFO) << "Owner node " << owner_node_id << " died, killing leased worker "
<< worker->WorkerId();
KillWorker(worker);
}
}
}
}
void NodeManager::ResourceCreateUpdated(const ClientID &client_id,
@@ -1578,11 +1620,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
// task directly on the worker.
RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId();
TaskID task_id = task.GetTaskSpecification().TaskId();
rpc::Address owner_address = task.GetTaskSpecification().CallerAddress();
task.OnDispatchInstead(
[this, task_id, reply, send_reply_callback](
[this, owner_address, reply, send_reply_callback](
const std::shared_ptr<void> granted, const std::string &address, int port,
const WorkerID &worker_id, const ResourceIdSet &resource_ids) {
RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id;
reply->mutable_worker_address()->set_ip_address(address);
reply->mutable_worker_address()->set_port(port);
reply->mutable_worker_address()->set_worker_id(worker_id.Binary());
@@ -1603,12 +1645,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
}
send_reply_callback(Status::OK(), nullptr, nullptr);
// TODO(swang): Kill worker if other end hangs up.
// TODO(swang): Implement a lease term by which the owner needs to return the
// worker.
RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end())
<< "Worker is already leased out " << worker_id;
leased_workers_[worker_id] = std::static_pointer_cast<Worker>(granted);
auto worker = std::static_pointer_cast<Worker>(granted);
leased_workers_[worker_id] = worker;
});
task.OnSpillbackInstead(
[reply, task_id, send_reply_callback](const ClientID &spillback_to,
@@ -1627,6 +1668,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
rpc::SendReplyCallback send_reply_callback) {
// Read the resource spec submitted by the client.
auto worker_id = WorkerID::FromBinary(request.worker_id());
RAY_LOG(DEBUG) << "Return worker " << worker_id;
std::shared_ptr<Worker> worker = leased_workers_[worker_id];
if (new_scheduler_enabled_) {
@@ -2319,10 +2361,27 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
if (task.GetTaskSpecification().IsDetachedActor()) {
worker->MarkDetachedActor();
}
const auto owner_worker_id = WorkerID::FromBinary(spec.CallerAddress().worker_id());
const auto owner_node_id = ClientID::FromBinary(spec.CallerAddress().raylet_id());
RAY_CHECK(!owner_worker_id.IsNil());
RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id << " to worker "
<< worker->WorkerId() << ", owner ID " << owner_worker_id;
task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(),
worker->WorkerId(),
spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds()
: worker->GetTaskResourceIds());
// If the owner has died since this task was queued, cancel the task by
// killing the worker.
if (failed_workers_cache_.count(owner_worker_id) > 0 ||
failed_nodes_cache_.count(owner_node_id) > 0) {
// TODO(swang): Skip assigning this task to this worker instead of
// killing the worker?
KillWorker(worker);
}
post_assign_callbacks->push_back([this, worker, task_id]() {
RAY_LOG(DEBUG) << "Finished assigning task " << task_id << " to worker "
<< worker->WorkerId();
@@ -2386,6 +2445,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
// direct actor creation calls because this ID is used later if the actor
// requires objects from plasma.
worker.AssignTaskId(TaskID::Nil());
worker.SetOwnerAddress(rpc::Address());
}
// Direct actors will be assigned tasks via the core worker and therefore are
// not idle.
@@ -2965,6 +3025,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<Worker> &worker,
auto spec = assigned_task.GetTaskSpecification();
// We successfully assigned the task to the worker.
worker->AssignTaskId(spec.TaskId());
worker->SetOwnerAddress(spec.CallerAddress());
worker->AssignJobId(spec.JobId());
// TODO(swang): For actors with multiple actor handles, to
// guarantee that tasks are replayed in the same order after a
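
Taken together, these node_manager changes record each leased worker's owner at assignment time, cache owner failures reported by the GCS, and kill any lease whose owner process or node has died (detached actors are exempt). A schematic Python sketch of that bookkeeping, not the actual C++ interfaces:

    class LeaseFateSharing(object):
        """Schematic model of the raylet-side state added in this change."""

        def __init__(self):
            self.leased_workers = {}     # worker_id -> (owner_worker_id, owner_node_id)
            self.failed_workers = set()  # mirrors failed_workers_cache_
            self.failed_nodes = set()    # mirrors failed_nodes_cache_

        def assign_task(self, worker_id, owner_worker_id, owner_node_id):
            # Record the owner on dispatch; if the owner already failed while
            # the lease request was queued, kill the worker instead.
            self.leased_workers[worker_id] = (owner_worker_id, owner_node_id)
            if (owner_worker_id in self.failed_workers
                    or owner_node_id in self.failed_nodes):
                self.kill_worker(worker_id)

        def handle_unexpected_failure(self, failed_worker=None, failed_node=None):
            # Called when GCS pubsub reports a dead worker or a removed node.
            if failed_worker is not None:
                self.failed_workers.add(failed_worker)
            if failed_node is not None:
                self.failed_nodes.add(failed_node)
            for worker_id, (owner_w, owner_n) in list(self.leased_workers.items()):
                if owner_w == failed_worker or owner_n == failed_node:
                    self.kill_worker(worker_id)

        def kill_worker(self, worker_id):
            self.leased_workers.pop(worker_id, None)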

@@ -121,10 +121,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Handle an unexpected failure notification from GCS pubsub.
///
/// \param worker_id The ID of the failed worker.
/// \param worker_data Data associated with the worker failure.
void HandleUnexpectedWorkerFailure(const WorkerID &worker_id,
const gcs::WorkerFailureData &worker_failed_data);
/// \param worker_address The address of the worker that died.
void HandleUnexpectedWorkerFailure(const rpc::Address &worker_address);
/// Handler for the addition of a new node.
///
@@ -659,6 +657,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Map of workers leased out to direct call clients.
std::unordered_map<WorkerID, std::shared_ptr<Worker>> leased_workers_;
/// Map from owner worker ID to a list of worker IDs that the owner has a
/// lease on.
absl::flat_hash_map<WorkerID, std::vector<WorkerID>> leased_workers_by_owner_;
/// Whether new schedule is enabled.
const bool new_scheduler_enabled_;
@@ -687,8 +689,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;
/// XXX
/// Wait for a task's arguments to become ready.
void WaitForTaskArgsRequests(std::pair<ScheduleFn, Task> &work);
// TODO(swang): Evict entries from these caches.
/// Cache for the WorkerFailureTable in the GCS.
absl::flat_hash_set<WorkerID> failed_workers_cache_;
/// Cache for the ClientTable in the GCS.
absl::flat_hash_set<ClientID> failed_nodes_cache_;
};
} // namespace raylet

@@ -383,7 +383,7 @@ void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) {
auto task_lease_data = std::make_shared<TaskLeaseData>();
task_lease_data->set_task_id(task_id.Binary());
task_lease_data->set_node_manager_id(client_id_.Hex());
task_lease_data->set_node_manager_id(client_id_.Binary());
task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000);
task_lease_data->set_timeout(it->second.lease_period);
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAddTaskLease(task_lease_data, nullptr));
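
The node_manager_id fix stores the raylet's ID in raw binary form rather than hex; the hex string is twice the expected length, so it would not round-trip through the stricter binary ID size checks added above. A tiny illustration (the byte length is only illustrative):

    import os

    raw_id = os.urandom(28)   # stand-in for client_id_.Binary()
    hex_id = raw_id.hex()     # what client_id_.Hex() stored before the fix
    assert len(hex_id.encode()) == 2 * len(raw_id)  # wrong length for a binary ID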

@@ -91,6 +91,9 @@ const std::shared_ptr<LocalClientConnection> Worker::Connection() const {
return connection_;
}
void Worker::SetOwnerAddress(const rpc::Address &address) { owner_address_ = address; }
const rpc::Address &Worker::GetOwnerAddress() const { return owner_address_; }
const ResourceIdSet &Worker::GetLifetimeResourceIds() const {
return lifetime_resource_ids_;
}

@@ -51,6 +51,8 @@ class Worker {
void MarkDetachedActor();
bool IsDetachedActor() const;
const std::shared_ptr<LocalClientConnection> Connection() const;
void SetOwnerAddress(const rpc::Address &address);
const rpc::Address &GetOwnerAddress() const;
const ResourceIdSet &GetLifetimeResourceIds() const;
void SetLifetimeResourceIds(ResourceIdSet &resource_ids);
@@ -117,6 +119,9 @@ class Worker {
/// Whether the worker is detached. This applies when the worker is an actor.
/// Detached actor means the actor's creator can exit without killing this actor.
bool is_detached_actor_;
/// The address of this worker's owner. The owner is the worker that
/// currently holds the lease on this worker, if any.
rpc::Address owner_address_;
};
} // namespace raylet