Fix two types of eviction hangs (#5225)
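Previously, ray.get could block forever in two eviction scenarios: (1) an object created by the driver with ray.put is evicted from the object store and cannot be recreated by re-executing a task, and (2) a task's return value is evicted after the task metadata needed to re-execute it has itself been evicted from the GCS and the lineage cache. In both cases the raylet now marks the required object as failed with ErrorType::OBJECT_UNRECONSTRUCTABLE (via the new NodeManager::MarkObjectsAsFailed), so the blocked ray.get raises ray.exceptions.UnreconstructableError instead of hanging.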

Eric Liang, 2019-07-23 21:20:17 -07:00, committed by GitHub
parent 97c43284a6
commit 5b76238bce
9 changed files with 126 additions and 45 deletions

python/ray/tests/test_actor.py

@@ -1330,7 +1330,7 @@ def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
 def test_blocking_actor_task(shutdown_only):
-    ray.init(num_cpus=1, num_gpus=1)
+    ray.init(num_cpus=1, num_gpus=1, object_store_memory=int(10**8))
     @ray.remote(num_gpus=1)
     def f():
@@ -2023,7 +2023,7 @@ def test_lifetime_and_transient_resources(ray_start_regular):
     actor2s = [Actor2.remote() for _ in range(2)]
     results = [a.method.remote() for a in actor2s]
     ready_ids, remaining_ids = ray.wait(
-        results, num_returns=len(results), timeout=1.0)
+        results, num_returns=len(results), timeout=5.0)
     assert len(ready_ids) == 1

python/ray/tests/test_failure.py

@@ -688,7 +688,7 @@ def test_raylet_crash_when_get(ray_start_regular):
     thread = threading.Thread(target=sleep_to_kill_raylet)
     thread.start()
-    with pytest.raises(Exception, match=r".*Connection closed unexpectedly.*"):
+    with pytest.raises(ray.exceptions.UnreconstructableError):
         ray.get(nonexistent_id)
     thread.join()

python/ray/tests/test_stress.py

@@ -6,6 +6,7 @@ import json
 import numpy as np
 import os
 import pytest
+import sys
 import time
 import ray
@@ -479,6 +480,8 @@ def test_nondeterministic_task(ray_start_reconstruction):
 @pytest.mark.skipif(
     os.environ.get("RAY_USE_NEW_GCS") == "on",
     reason="Failing with new GCS API on Linux.")
+@pytest.mark.skipif(
+    sys.version_info < (3, 0), reason="This test requires Python 3.")
 @pytest.mark.parametrize(
     "ray_start_object_store_memory", [10**9], indirect=True)
 def test_driver_put_errors(ray_start_object_store_memory):
@@ -524,6 +527,7 @@ def test_driver_put_errors(ray_start_object_store_memory):
     errors = wait_for_errors(error_check)
     assert all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
+               or "ray.exceptions.UnreconstructableError" in error["message"]
               for error in errors)

python/ray/tests/test_unreconstructable_errors.py (new file)

@@ -0,0 +1,40 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import unittest
+
+import ray
+
+
+class TestUnreconstructableErrors(unittest.TestCase):
+    def setUp(self):
+        ray.init(object_store_memory=10000000, redis_max_memory=10000000)
+
+    def tearDown(self):
+        ray.shutdown()
+
+    def testDriverPutEvictedCannotReconstruct(self):
+        x_id = ray.put(np.zeros(1 * 1024 * 1024))
+        ray.get(x_id)
+        for _ in range(10):
+            ray.put(np.zeros(1 * 1024 * 1024))
+        self.assertRaises(ray.exceptions.UnreconstructableError,
+                          lambda: ray.get(x_id))
+
+    def testLineageEvictedReconstructionFails(self):
+        @ray.remote
+        def f(data):
+            return 0
+
+        x_id = f.remote(None)
+        ray.get(x_id)
+        for _ in range(400):
+            ray.get([f.remote(np.zeros(10000)) for _ in range(50)])
+        self.assertRaises(ray.exceptions.UnreconstructableError,
+                          lambda: ray.get(x_id))
+
+
+if __name__ == "__main__":
+    unittest.main(verbosity=2)
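The first test above pins down the user-visible contract for the put-eviction hang. The following is a minimal standalone sketch of the same pattern, not part of the commit; the store size and array sizes are illustrative, taken from the test's values:

import numpy as np

import ray

# A ~10 MB store, so each ~8 MB array forces eviction of the previous one.
ray.init(object_store_memory=10**7)

x_id = ray.put(np.zeros(1024 * 1024))  # 1M float64 zeros, about 8 MB
ray.get(x_id)  # succeeds while the buffer is still resident

# Churn the store until x_id's buffer is evicted.
for _ in range(10):
    ray.put(np.zeros(1024 * 1024))

try:
    ray.get(x_id)
except ray.exceptions.UnreconstructableError:
    # Before this commit, this get would block forever.
    print("object evicted and cannot be reconstructed")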

src/ray/raylet/node_manager.cc

@@ -88,7 +88,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
       scheduling_policy_(local_queues_),
       reconstruction_policy_(
           io_service_,
-          [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); },
+          [this](const TaskID &task_id, const ObjectID &required_object_id) {
+            HandleTaskReconstruction(task_id, required_object_id);
+          },
           RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
           gcs_client_->client_table().GetLocalClientId(), gcs_client_->task_lease_table(),
           object_directory_, gcs_client_->task_reconstruction_log()),
@@ -1384,9 +1386,27 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type
     // information about the TaskSpecification implementation.
     num_returns -= 1;
   }
-  const std::string meta = std::to_string(static_cast<int>(error_type));
+  // Determine which IDs should be marked as failed.
+  std::vector<plasma::ObjectID> objects_to_fail;
   for (int64_t i = 0; i < num_returns; i++) {
-    const auto object_id = spec.ReturnId(i).ToPlasmaId();
+    objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId());
+  }
+  const JobID job_id = task.GetTaskSpecification().JobId();
+  MarkObjectsAsFailed(error_type, objects_to_fail, job_id);
+  task_dependency_manager_.TaskCanceled(spec.TaskId());
+  // Notify the task dependency manager that we no longer need this task's
+  // object dependencies. TODO(swang): Ideally, we would check the return value
+  // here. However, we don't know at this point if the task was in the WAITING
+  // or READY queue before, in which case we would not have been subscribed to
+  // its dependencies.
+  task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId());
+}
+
+void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type,
+                                      const std::vector<plasma::ObjectID> objects_to_fail,
+                                      const JobID &job_id) {
+  const std::string meta = std::to_string(static_cast<int>(error_type));
+  for (const auto &object_id : objects_to_fail) {
     arrow::Status status = store_client_.CreateAndSeal(object_id, "", meta);
     if (!status.ok() && !plasma::IsPlasmaObjectExists(status)) {
       // If we failed to save the error code, log a warning and push an error message
@@ -1398,16 +1418,9 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_type
       std::string error_message = stream.str();
       RAY_LOG(WARNING) << error_message;
       RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
-          task.GetTaskSpecification().JobId(), "task", error_message, current_time_ms()));
+          job_id, "task", error_message, current_time_ms()));
     }
   }
-  task_dependency_manager_.TaskCanceled(spec.TaskId());
-  // Notify the task dependency manager that we no longer need this task's
-  // object dependencies. TODO(swang): Ideally, we would check the return value
-  // here. However, we don't know at this point if the task was in the WAITING
-  // or READY queue before, in which case we would not have been subscribed to
-  // its dependencies.
-  task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId());
 }
 
 void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
@@ -2024,34 +2037,41 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
   }
 }
 
-void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
+void NodeManager::HandleTaskReconstruction(const TaskID &task_id,
+                                           const ObjectID &required_object_id) {
   // Retrieve the task spec in order to re-execute the task.
   RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
       JobID::Nil(), task_id,
       /*success_callback=*/
-      [this](ray::gcs::RedisGcsClient *client, const TaskID &task_id,
-             const TaskTableData &task_data) {
+      [this, required_object_id](ray::gcs::RedisGcsClient *client, const TaskID &task_id,
+                                 const TaskTableData &task_data) {
        // The task was in the GCS task table. Use the stored task spec to
        // re-execute the task.
-        ResubmitTask(Task(task_data.task()));
+        ResubmitTask(Task(task_data.task()), required_object_id);
      },
      /*failure_callback=*/
-      [this](ray::gcs::RedisGcsClient *client, const TaskID &task_id) {
+      [this, required_object_id](ray::gcs::RedisGcsClient *client,
+                                 const TaskID &task_id) {
        // The task was not in the GCS task table. It must therefore be in the
        // lineage cache.
-        RAY_CHECK(lineage_cache_.ContainsTask(task_id))
-            << "Metadata of task " << task_id
-            << " not found in either GCS or lineage cache. It may have been evicted "
-            << "by the redis LRU configuration. Consider increasing the memory "
-               "allocation via "
-            << "ray.init(redis_max_memory=<max_memory_bytes>).";
-        // Use a copy of the cached task spec to re-execute the task.
-        const Task task = lineage_cache_.GetTaskOrDie(task_id);
-        ResubmitTask(task);
+        if (lineage_cache_.ContainsTask(task_id)) {
+          // Use a copy of the cached task spec to re-execute the task.
+          const Task task = lineage_cache_.GetTaskOrDie(task_id);
+          ResubmitTask(task, required_object_id);
+        } else {
+          RAY_LOG(WARNING)
+              << "Metadata of task " << task_id
+              << " not found in either GCS or lineage cache. It may have been evicted "
+              << "by the redis LRU configuration. Consider increasing the memory "
+                 "allocation via "
+              << "ray.init(redis_max_memory=<max_memory_bytes>).";
+          MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE,
+                              {required_object_id.ToPlasmaId()}, JobID::Nil());
+        }
      }));
 }
 
-void NodeManager::ResubmitTask(const Task &task) {
+void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object_id) {
   RAY_LOG(DEBUG) << "Attempting to resubmit task "
                  << task.GetTaskSpecification().TaskId();
@@ -2082,6 +2102,9 @@ void NodeManager::ResubmitTask(const Task &task) {
     RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
         task.GetTaskSpecification().JobId(), type, error_message.str(),
         current_time_ms()));
+    MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE,
+                        {required_object_id.ToPlasmaId()},
+                        task.GetTaskSpecification().JobId());
     return;
   }
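Taken together, the node manager changes close the lineage-eviction hang: when the reconstruction policy requests an object whose task metadata is missing from both the GCS and the lineage cache, or whose task cannot be resubmitted, the raylet seals an OBJECT_UNRECONSTRUCTABLE marker into plasma for that object instead of hitting a fatal check or leaving the requester waiting. A driver-side sketch of that scenario, not part of the commit and mirroring testLineageEvictedReconstructionFails above; the iteration counts and memory limits are illustrative:

import numpy as np

import ray

# Bounding redis_max_memory makes the GCS evict old task lineage via LRU.
ray.init(object_store_memory=10**7, redis_max_memory=10**7)


@ray.remote
def f(data):
    return 0


x_id = f.remote(None)
ray.get(x_id)

# Run enough tasks that both x_id's value and its task metadata are evicted.
for _ in range(400):
    ray.get([f.remote(np.zeros(10000)) for _ in range(50)])

try:
    ray.get(x_id)
except ray.exceptions.UnreconstructableError:
    # Previously the raylet died on a RAY_CHECK when the metadata was gone.
    print("lineage evicted; object cannot be reconstructed")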

src/ray/raylet/node_manager.h

@@ -189,6 +189,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// \param error_type The type of the error that caused this task to fail.
   /// \return Void.
   void TreatTaskAsFailed(const Task &task, const ErrorType &error_type);
 
+  /// Mark the specified objects as failed with the given error type.
+  ///
+  /// \param error_type The type of the error that caused this task to fail.
+  /// \param object_ids The object ids to store error messages into.
+  /// \param job_id The optional job to push errors to if the writes fail.
+  void MarkObjectsAsFailed(const ErrorType &error_type,
+                           const std::vector<plasma::ObjectID> object_ids,
+                           const JobID &job_id);
+
   /// This is similar to TreatTaskAsFailed, but it will only mark the task as
   /// failed if at least one of the task's return values is lost. A return
   /// value is lost if it has been created before, but no longer exists on any
@@ -257,14 +265,17 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// Handle a task whose return value(s) must be reconstructed.
   ///
   /// \param task_id The relevant task ID.
+  /// \param required_object_id The object id we are reconstructing for.
   /// \return Void.
-  void HandleTaskReconstruction(const TaskID &task_id);
+  void HandleTaskReconstruction(const TaskID &task_id,
+                                const ObjectID &required_object_id);
 
   /// Resubmit a task for execution. This is a task that was previously already
   /// submitted to a raylet but which must now be re-executed.
   ///
   /// \param task The task being resubmitted.
+  /// \param required_object_id The object id that triggered the resubmission.
   /// \return Void.
-  void ResubmitTask(const Task &task);
+  void ResubmitTask(const Task &task, const ObjectID &required_object_id);
 
   /// Attempt to forward a task to a remote different node manager. If this
   /// fails, the task will be resubmit locally.
   ///

src/ray/raylet/reconstruction_policy.cc

@@ -8,7 +8,7 @@ namespace raylet {
 
 ReconstructionPolicy::ReconstructionPolicy(
     boost::asio::io_service &io_service,
-    std::function<void(const TaskID &)> reconstruction_handler,
+    std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler,
     int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
     gcs::PubsubInterface<TaskID> &task_lease_pubsub,
     std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -63,8 +63,8 @@ void ReconstructionPolicy::SetTaskTimeout(
   });
 }
 
-void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
-                                                         bool success) {
+void ReconstructionPolicy::HandleReconstructionLogAppend(
+    const TaskID &task_id, const ObjectID &required_object_id, bool success) {
   auto it = listening_tasks_.find(task_id);
   if (it == listening_tasks_.end()) {
     return;
@@ -76,7 +76,7 @@ void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
   SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
 
   if (success) {
-    reconstruction_handler_(task_id);
+    reconstruction_handler_(task_id, required_object_id);
   }
 }
@@ -112,14 +112,14 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
   RAY_CHECK_OK(task_reconstruction_log_.AppendAt(
       JobID::Nil(), task_id, reconstruction_entry,
       /*success_callback=*/
-      [this](gcs::RedisGcsClient *client, const TaskID &task_id,
-             const TaskReconstructionData &data) {
-        HandleReconstructionLogAppend(task_id, /*success=*/true);
+      [this, required_object_id](gcs::RedisGcsClient *client, const TaskID &task_id,
+                                 const TaskReconstructionData &data) {
+        HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/true);
      },
      /*failure_callback=*/
-      [this](gcs::RedisGcsClient *client, const TaskID &task_id,
-             const TaskReconstructionData &data) {
-        HandleReconstructionLogAppend(task_id, /*success=*/false);
+      [this, required_object_id](gcs::RedisGcsClient *client, const TaskID &task_id,
+                                 const TaskReconstructionData &data) {
+        HandleReconstructionLogAppend(task_id, required_object_id, /*success=*/false);
      },
      reconstruction_attempt));

src/ray/raylet/reconstruction_policy.h

@@ -42,7 +42,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// lease notifications from.
   ReconstructionPolicy(
       boost::asio::io_service &io_service,
-      std::function<void(const TaskID &)> reconstruction_handler,
+      std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler,
       int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
       gcs::PubsubInterface<TaskID> &task_lease_pubsub,
       std::shared_ptr<ObjectDirectoryInterface> object_directory,
@@ -127,12 +127,13 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
   /// Handle the response for an attempt at adding an entry to the task
   /// reconstruction log.
-  void HandleReconstructionLogAppend(const TaskID &task_id, bool success);
+  void HandleReconstructionLogAppend(const TaskID &task_id, const ObjectID &object_id,
+                                     bool success);
 
   /// The event loop.
   boost::asio::io_service &io_service_;
 
   /// The handler to call for tasks that require reconstruction.
-  const std::function<void(const TaskID &)> reconstruction_handler_;
+  const std::function<void(const TaskID &, const ObjectID &)> reconstruction_handler_;
 
   /// The initial timeout within which a task lease notification must be
   /// received. Otherwise, reconstruction will be triggered.
   const int64_t initial_reconstruction_timeout_ms_;

src/ray/raylet/reconstruction_policy_test.cc

@@ -155,7 +155,9 @@ class ReconstructionPolicyTest : public ::testing::Test {
         reconstruction_timeout_ms_(50),
         reconstruction_policy_(std::make_shared<ReconstructionPolicy>(
             io_service_,
-            [this](const TaskID &task_id) { TriggerReconstruction(task_id); },
+            [this](const TaskID &task_id, const ObjectID &obj) {
+              TriggerReconstruction(task_id);
+            },
             reconstruction_timeout_ms_, ClientID::FromRandom(), mock_gcs_,
             mock_object_directory_, mock_gcs_)),
         timer_canceled_(false) {