[core] Reconstruction for lost plasma objects (#7733)

* Add a lineage_ref_count to References

* Refactor TaskManager to store TaskEntry as a struct

* Refactor to fix deadlock between TaskManager and ReferenceCounter
Add references to task specs

* Pin TaskEntries and References in the lineage of any ObjectIDs in scope

* Fix deadlock, convert num_plasma_returns to a set of object IDs

* fix unit tests

* Feature flag

* Do not release lineage for objects that were promoted to plasma

* fix build

* fix build

* Remove num executions

* Remove num executions

* Add pinned locations to ReferenceCounter, empty handler for node death

* Fix num returns for actor tasks, fix Put return value

* Add regression test

* Clear pinned locations and callbacks on node removal

* Clear pinned locations and callbacks on node removal

* Simplify num return values

* Remove unused

* doc

* tmp

* Set num returns

* Move lineage pinning flag to ReferenceCounter

* comments

* Recover from plasma failures by pinning a new copy

* Basic object reconstruction, no concurrent reqs yet

* reconstruction test suite and a few fixes:
- fix for disabling lineage
- fix for updating submitted task refs

* Handle concurrent attempts to recover the same object

* Fix deadlock in DrainAndShutdown

* Revert "[core] Revert lineage pinning (#7499) (#7692)"

This reverts commit ba86a02b37.

* debug rllib

* debug rllib

* turn on all rllib tests again

* debug rllib

* Fix drain bug, check number of pending tasks

* revert rllib debug

* remove todo

* Trigger rllib tests

* revert rllib debug commit

* Split out logic into ObjectRecoveryManager

* Fix python tests

* Refactor to remove dependency on gcs client

* Unit tests

* Move pinned at node ID to direct memory store

* Unit test fixes and lint

* simplify and more tests

* Add ResubmitTask test for TaskManager

* Doc

* fix build

* comments

* Fix

* debug

* Update

* fix

* Fix

* Fix bad status handling, unit test

* Fix build
commit d7eef808b8 (parent 18e9a076e5)
Stephanie Wang, 2020-04-11 16:52:57 -07:00, committed by GitHub
18 changed files with 1284 additions and 48 deletions
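
The user-visible contract, condensed from the new test suite below: with lineage pinning enabled, a plasma object lost to node failure is recreated by resubmitting the task that produced it, so a later ray.get on the same ObjectID still returns a value. A minimal sketch of that pattern (cluster setup, config flags, and the failure injection are elided; see test_reconstruction.py below for the real sequence):

import numpy as np
import ray

# max_retries must be nonzero for the owner to resubmit the task
# (the tests use max_retries=1 if reconstruction_enabled else 0).
@ray.remote(max_retries=1)
def large_object():
    # Large enough to live in plasma rather than being inlined.
    return np.zeros(10**7, dtype=np.uint8)

@ray.remote
def dependent_task(x):
    return

obj = large_object.remote()
ray.get(dependent_task.remote(obj))   # succeeds: a pinned plasma copy exists
# ... the node holding the only copy of obj dies ...
ray.get(dependent_task.remote(obj))   # still succeeds: large_object() is re-executed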

BUILD.bazel

@@ -272,6 +272,7 @@ cc_library(
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/memory",
"@com_google_googletest//:gtest",
"@msgpack",
"@plasma//:plasma_client",
@@ -542,6 +543,16 @@ cc_test(
],
)
cc_test(
name = "object_recovery_manager_test",
srcs = ["src/ray/core_worker/test/object_recovery_manager_test.cc"],
copts = COPTS,
deps = [
":core_worker_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "scheduling_queue_test",
srcs = ["src/ray/core_worker/test/scheduling_queue_test.cc"],

python/ray/tests/BUILD

@@ -203,6 +203,14 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_reconstruction",
size = "medium",
srcs = ["test_reconstruction.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_reference_counting",
size = "medium",

python/ray/tests/test_reconstruction.py (new file)

@@ -0,0 +1,279 @@
import json
import sys
import numpy as np
import pytest
import ray
from ray.cluster_utils import Cluster
from ray.test_utils import wait_for_condition
def test_cached_object(ray_start_cluster):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
})
cluster = Cluster()
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
cluster.add_node(
num_cpus=1, resources={"node2": 1}, object_store_memory=10**8)
cluster.wait_for_nodes()
ray.init(address=cluster.address)
@ray.remote
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def dependent_task(x):
return
obj = large_object.options(resources={"node1": 1}).remote()
ray.get(dependent_task.options(resources={"node2": 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
assert wait_for_condition(
lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10)
for _ in range(20):
large_object.options(resources={"node2": 1}).remote()
ray.get(dependent_task.remote(obj))
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_reconstruction_cached_dependency(ray_start_cluster,
reconstruction_enabled):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
})
cluster = Cluster()
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1,
resources={"node1": 1},
object_store_memory=10**8,
_internal_config=config)
cluster.add_node(
num_cpus=1,
resources={"node2": 1},
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
@ray.remote(max_retries=0)
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def chain(x):
return x
@ray.remote
def dependent_task(x):
return
obj = large_object.options(resources={"node2": 1}).remote()
obj = chain.options(resources={"node1": 1}).remote(obj)
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1,
resources={"node1": 1},
object_store_memory=10**8,
_internal_config=config)
assert wait_for_condition(
lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10)
for _ in range(20):
large_object.options(resources={"node2": 1}).remote()
if reconstruction_enabled:
ray.get(dependent_task.remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.UnreconstructableError):
raise e.as_instanceof_cause()
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
})
cluster = Cluster()
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1,
resources={"node1": 1},
object_store_memory=10**8,
_internal_config=config)
cluster.add_node(
num_cpus=1,
resources={"node2": 1},
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def dependent_task(x):
return
obj = large_object.options(resources={"node1": 1}).remote()
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1,
resources={"node1": 1},
object_store_memory=10**8,
_internal_config=config)
if reconstruction_enabled:
ray.get(dependent_task.remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.UnreconstructableError):
raise e.as_instanceof_cause()
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
})
cluster = Cluster()
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1,
resources={"node1": 1},
object_store_memory=10**8,
_internal_config=config)
cluster.add_node(
num_cpus=1,
resources={"node2": 1},
object_store_memory=10**8,
_internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def chain(x):
return x
@ray.remote
def dependent_task(x):
return
obj = large_object.options(resources={"node2": 1}).remote()
downstream = [chain.remote(obj) for _ in range(4)]
for obj in downstream:
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1,
resources={"node1": 1},
object_store_memory=10**8,
_internal_config=config)
if reconstruction_enabled:
for obj in downstream:
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
for obj in downstream:
ray.get(
dependent_task.options(resources={
"node1": 1
}).remote(obj))
with pytest.raises(ray.exceptions.UnreconstructableError):
raise e.as_instanceof_cause()
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
})
cluster = Cluster()
# Head node with no resources.
cluster.add_node(
num_cpus=0, _internal_config=config, object_store_memory=10**8)
node_to_kill = cluster.add_node(
num_cpus=1, object_store_memory=10**8, _internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)
@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def chain(x):
return x
@ray.remote
def dependent_task(x):
return x
obj = large_object.remote()
for _ in range(20):
obj = chain.remote(obj)
ray.get(dependent_task.remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, object_store_memory=10**8, _internal_config=config)
if reconstruction_enabled:
ray.get(dependent_task.remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.UnreconstructableError):
raise e.as_instanceof_cause()
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

src/ray/common/ray_object.h

@@ -15,6 +15,7 @@
#ifndef RAY_COMMON_RAY_OBJECT_H
#define RAY_COMMON_RAY_OBJECT_H
#include "absl/types/optional.h"
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/protobuf/gcs.pb.h"

src/ray/core_worker/core_worker.cc

@@ -274,6 +274,14 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RegisterToGcs();
// Register a callback to monitor removed nodes.
auto on_node_change = [this](const ClientID &node_id, const rpc::GcsNodeInfo &data) {
if (data.state() == rpc::GcsNodeInfo::DEAD) {
OnNodeRemoved(data);
}
};
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr));
actor_manager_ = std::unique_ptr<ActorManager>(new ActorManager(gcs_client_->Actors()));
// Initialize profiler.
@@ -342,8 +350,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
boost::asio::chrono::milliseconds(kInternalHeartbeatMillis));
internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1));
-  io_thread_ = std::thread(&CoreWorker::RunIOService, this);
plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
options_.store_socket, local_raylet_client_, options_.check_signals,
/*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
@@ -359,7 +365,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
task_manager_.reset(new TaskManager(
memory_store_, reference_counter_, actor_manager_,
-      [this](const TaskSpecification &spec) {
[this](const TaskSpecification &spec, bool delay) {
if (delay) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
uint32_t delay = RayConfig::instance().task_retry_delay_ms();
@@ -367,6 +374,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
<< "ms delay: " << spec.DebugString();
absl::MutexLock lock(&mutex_);
to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec));
} else {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
}
}));
// Create an entry for the driver task in the task table. This task is
@@ -394,19 +404,19 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
};
auto raylet_client_factory = [this](const std::string ip_address, int port) {
auto grpc_client =
rpc::NodeManagerWorkerClient::make(ip_address, port, *client_call_manager_);
return std::shared_ptr<raylet::RayletClient>(
new raylet::RayletClient(std::move(grpc_client)));
};
direct_actor_submitter_ = std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_,
task_manager_));
direct_task_submitter_ =
std::unique_ptr<CoreWorkerDirectTaskSubmitter>(new CoreWorkerDirectTaskSubmitter(
-          rpc_address_, local_raylet_client_, client_factory,
-          [this](const std::string ip_address, int port) {
-            auto grpc_client = rpc::NodeManagerWorkerClient::make(ip_address, port,
-                                                                  *client_call_manager_);
-            return std::shared_ptr<raylet::RayletClient>(
-                new raylet::RayletClient(std::move(grpc_client)));
-          },
rpc_address_, local_raylet_client_, client_factory, raylet_client_factory,
memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds()));
future_resolver_.reset(new FutureResolver(memory_store_, client_factory));
@@ -414,6 +424,44 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
if (direct_task_receiver_ != nullptr) {
direct_task_receiver_->Init(client_factory, rpc_address_, local_raylet_client_);
}
auto object_lookup_fn = [this](const ObjectID &object_id,
const ObjectLookupCallback &callback) {
return gcs_client_->Objects().AsyncGetLocations(
object_id,
[this, object_id, callback](const Status &status,
const std::vector<rpc::ObjectTableData> &results) {
RAY_CHECK_OK(status);
std::vector<rpc::Address> locations;
for (const auto &result : results) {
const auto &node_id = ClientID::FromBinary(result.manager());
auto node = gcs_client_->Nodes().Get(node_id);
RAY_CHECK(node.has_value());
if (node->state() == rpc::GcsNodeInfo::ALIVE) {
rpc::Address address;
address.set_raylet_id(node->node_id());
address.set_ip_address(node->node_manager_address());
address.set_port(node->node_manager_port());
locations.push_back(address);
}
}
callback(object_id, locations);
});
};
object_recovery_manager_ =
std::unique_ptr<ObjectRecoveryManager>(new ObjectRecoveryManager(
rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn,
task_manager_, reference_counter_, memory_store_,
[this](const ObjectID &object_id, bool pin_object) {
RAY_CHECK_OK(Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE),
/*contained_object_ids=*/{}, object_id,
/*pin_object=*/pin_object));
},
RayConfig::instance().lineage_pinning_enabled()));
// Start the IO thread after all other members have been initialized, in case
// the thread calls back into any of our members.
io_thread_ = std::thread(&CoreWorker::RunIOService, this);
}
void CoreWorker::Shutdown() {
@@ -502,6 +550,22 @@ void CoreWorker::RunIOService() {
io_service_.run();
}
void CoreWorker::OnNodeRemoved(const rpc::GcsNodeInfo &node_info) {
const auto node_id = ClientID::FromBinary(node_info.node_id());
RAY_LOG(INFO) << "Node failure " << node_id;
const auto lost_objects = reference_counter_->ResetObjectsOnRemovedNode(node_id);
// Delete the objects from the in-memory store to indicate that they are not
// available. The object recovery manager will guarantee that a new value
// will eventually be stored for the objects (either an
// UnreconstructableError or a value reconstructed from lineage).
memory_store_->Delete(lost_objects);
for (const auto &object_id : lost_objects) {
RAY_LOG(INFO) << "Object " << object_id << " lost due to node failure " << node_id;
// The lost object must have been owned by us.
RAY_CHECK_OK(object_recovery_manager_->RecoverObject(object_id));
}
}
void CoreWorker::WaitForShutdown() {
if (io_thread_.joinable()) {
io_thread_.join();
@@ -1611,6 +1675,8 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques
void CoreWorker::HandleWaitForObjectEviction(
const rpc::WaitForObjectEvictionRequest &request,
rpc::WaitForObjectEvictionReply *reply, rpc::SendReplyCallback send_reply_callback) {
// TODO(swang): Drop requests from raylets that executed an older version of
// the task.
if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()),
send_reply_callback)) {
return;
@@ -1695,7 +1761,9 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
rpc::SendReplyCallback send_reply_callback) {
absl::MutexLock lock(&mutex_);
auto stats = reply->mutable_core_worker_stats();
-  stats->set_num_pending_tasks(task_manager_->NumPendingTasks());
// TODO(swang): Differentiate between tasks that are currently pending
// execution and tasks that have finished but may be retried.
stats->set_num_pending_tasks(task_manager_->NumSubmissibleTasks());
stats->set_task_queue_length(task_queue_length_);
stats->set_num_executed_tasks(num_executed_tasks_);
stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope());
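
OnNodeRemoved above ties the pieces together: reset every reference that was pinned on the dead node, drop the stale OBJECT_IN_PLASMA markers from the in-memory store, then kick off recovery for each lost object. A minimal Python model of that flow (the dict arguments are stand-ins for the ReferenceCounter and memory store, not Ray APIs):

def on_node_removed(node_id, pinned_at, memory_store, recover_object):
    # pinned_at: {object_id: node_id or None}, standing in for the
    # Reference::pinned_at_raylet_id field of each owned object.
    lost = [oid for oid, nid in pinned_at.items() if nid == node_id]
    for oid in lost:
        pinned_at[oid] = None        # ResetObjectsOnRemovedNode
        memory_store.pop(oid, None)  # delete the OBJECT_IN_PLASMA marker
    for oid in lost:
        recover_object(oid)          # must end in a new value or an error
    return lost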

src/ray/core_worker/core_worker.h

@@ -23,6 +23,7 @@
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/future_resolver.h"
#include "ray/core_worker/object_recovery_manager.h"
#include "ray/core_worker/profiling.h"
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
@@ -867,6 +868,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}
}
/// Handler if a raylet node is removed from the cluster.
void OnNodeRemoved(const rpc::GcsNodeInfo &node_info);
const CoreWorkerOptions options_;
/// Callback to get the current language (e.g., Python) call site.
@@ -961,6 +965,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
// Interface to submit non-actor tasks directly to leased workers.
std::unique_ptr<CoreWorkerDirectTaskSubmitter> direct_task_submitter_;
/// Manages recovery of objects stored in remote plasma nodes.
std::unique_ptr<ObjectRecoveryManager> object_recovery_manager_;
/// The `actor_handles_` field could be mutated concurrently due to multi-threading, we
/// need a mutex to protect it.
mutable absl::Mutex actor_handles_mutex_;

src/ray/core_worker/object_recovery_manager.cc (new file)

@@ -0,0 +1,145 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/core_worker/object_recovery_manager.h"
#include "ray/util/util.h"
namespace ray {
Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
// Check the ReferenceCounter to see if there is a location for the object.
bool pinned = false;
bool owned_by_us = reference_counter_->IsPlasmaObjectPinned(object_id, &pinned);
if (!owned_by_us) {
return Status::Invalid(
"Object reference no longer exists or is not owned by us. Either lineage pinning "
"is disabled or this object ID is borrowed.");
}
bool already_pending_recovery = true;
if (!pinned) {
{
absl::MutexLock lock(&mu_);
// Mark that we are attempting recovery for this object to prevent
// duplicate reconstructions of the same object.
already_pending_recovery = !objects_pending_recovery_.insert(object_id).second;
}
}
if (!already_pending_recovery) {
RAY_LOG(INFO) << "Starting recovery for object " << object_id;
in_memory_store_->GetAsync(
object_id, [this, object_id](std::shared_ptr<RayObject> obj) {
absl::MutexLock lock(&mu_);
RAY_CHECK(objects_pending_recovery_.erase(object_id)) << object_id;
RAY_LOG(INFO) << "Recovery complete for object " << object_id;
});
// Lookup the object in the GCS to find another copy.
RAY_RETURN_NOT_OK(object_lookup_(
object_id,
[this](const ObjectID &object_id, const std::vector<rpc::Address> &locations) {
PinOrReconstructObject(object_id, locations);
}));
} else {
RAY_LOG(DEBUG) << "Recovery already started for object " << object_id;
}
return Status::OK();
}
void ObjectRecoveryManager::PinOrReconstructObject(
const ObjectID &object_id, const std::vector<rpc::Address> &locations) {
RAY_LOG(INFO) << "Lost object " << object_id << " has " << locations.size()
<< " locations";
if (!locations.empty()) {
auto locations_copy = locations;
const auto location = locations_copy.back();
locations_copy.pop_back();
PinExistingObjectCopy(object_id, location, locations_copy);
} else if (lineage_reconstruction_enabled_) {
// There are no more copies to pin, try to reconstruct the object.
ReconstructObject(object_id);
} else {
reconstruction_failure_callback_(object_id, /*pin_object=*/true);
}
}
void ObjectRecoveryManager::PinExistingObjectCopy(
const ObjectID &object_id, const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations) {
// If a copy still exists, pin the object by sending a
// PinObjectIDs RPC.
const auto node_id = ClientID::FromBinary(raylet_address.raylet_id());
RAY_LOG(DEBUG) << "Trying to pin copy of lost object " << object_id << " at node "
<< node_id;
std::shared_ptr<PinObjectsInterface> client;
if (node_id == ClientID::FromBinary(rpc_address_.raylet_id())) {
client = local_object_pinning_client_;
} else {
absl::MutexLock lock(&mu_);
auto client_it = remote_object_pinning_clients_.find(node_id);
if (client_it == remote_object_pinning_clients_.end()) {
RAY_LOG(DEBUG) << "Connecting to raylet " << node_id;
client_it = remote_object_pinning_clients_
.emplace(node_id, client_factory_(raylet_address.ip_address(),
raylet_address.port()))
.first;
}
client = client_it->second;
}
RAY_UNUSED(client->PinObjectIDs(
rpc_address_, {object_id},
[this, object_id, other_locations, node_id](const Status &status,
const rpc::PinObjectIDsReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id, node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object " << object_id
<< ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
}));
}
void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
// Notify the task manager that we are retrying the task that created this
// object.
const auto task_id = object_id.TaskId();
std::vector<ObjectID> task_deps;
auto status = task_resubmitter_->ResubmitTask(task_id, &task_deps);
if (status.ok()) {
// Try to recover the task's dependencies.
for (const auto &dep : task_deps) {
auto status = RecoverObject(dep);
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": "
<< status.message();
// We do not pin the dependency because we may not be the owner.
reconstruction_failure_callback_(dep, /*pin_object=*/false);
}
}
} else {
RAY_LOG(INFO) << "Failed to reconstruct object " << object_id;
reconstruction_failure_callback_(object_id, /*pin_object=*/true);
}
}
} // namespace ray
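
The control flow above, flattened: try the remaining known locations one at a time, and only fall back to lineage reconstruction (or the failure callback) once no copies are left. A synchronous Python model of PinOrReconstructObject (the real code advances through locations via PinObjectIDs RPC callbacks rather than a loop):

def pin_or_reconstruct(object_id, locations, try_pin, reconstruct, fail):
    # try_pin returns True if some raylet pinned a copy for us.
    while locations:
        location = locations.pop()   # matches the back()/pop_back() order above
        if try_pin(object_id, location):
            return True
    if reconstruct is not None:      # lineage_reconstruction_enabled_
        return reconstruct(object_id)
    fail(object_id)                  # reconstruction_failure_callback_
    return False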

src/ray/core_worker/object_recovery_manager.h (new file)

@@ -0,0 +1,151 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RAY_CORE_WORKER_OBJECT_RECOVERY_MANAGER_H
#define RAY_CORE_WORKER_OBJECT_RECOVERY_MANAGER_H
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/task_manager.h"
#include "ray/raylet/raylet_client.h"
namespace ray {
typedef std::function<std::shared_ptr<PinObjectsInterface>(const std::string &ip_address,
int port)>
ObjectPinningClientFactoryFn;
typedef std::function<void(const ObjectID &object_id,
const std::vector<rpc::Address> &raylet_locations)>
ObjectLookupCallback;
class ObjectRecoveryManager {
public:
ObjectRecoveryManager(const rpc::Address &rpc_address,
ObjectPinningClientFactoryFn client_factory,
std::shared_ptr<PinObjectsInterface> local_object_pinning_client,
std::function<Status(const ObjectID &object_id,
const ObjectLookupCallback &callback)>
object_lookup,
std::shared_ptr<TaskResubmissionInterface> task_resubmitter,
std::shared_ptr<ReferenceCounter> reference_counter,
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
std::function<void(const ObjectID &object_id, bool pin_object)>
reconstruction_failure_callback,
bool lineage_reconstruction_enabled)
: rpc_address_(rpc_address),
client_factory_(client_factory),
local_object_pinning_client_(local_object_pinning_client),
object_lookup_(object_lookup),
task_resubmitter_(task_resubmitter),
reference_counter_(reference_counter),
in_memory_store_(in_memory_store),
reconstruction_failure_callback_(reconstruction_failure_callback),
lineage_reconstruction_enabled_(lineage_reconstruction_enabled) {}
/// Recover an object that was stored in plasma. This will only succeed for
/// objects that are lost from memory and that this process owns (otherwise
/// returns Status::Invalid). This method is idempotent for overlapping
/// recovery operations on the same object. This class will guarantee that
/// each recovery operation ends in either success (by storing a new value
/// for the object in the direct memory/plasma store) or failure (by calling
/// the given reconstruction failure callback).
///
/// Algorithm:
/// 1. Check that the object is missing from the direct memory store and that
/// we own the object. If either is false, then fail the recovery operation.
/// 2. Look up the object in the global directory to check for other
/// locations of the object. If another location exists, attempt to pin it.
/// If the pinning is successful, then mark the recovery as a success by
/// storing a new value for the object in the direct memory store.
/// 3. If pinning fails at all locations for the object (or there are no
/// locations), attempt to reconstruct the object by resubmitting the task
/// that created the object. If the task resubmission fails, then
/// fail the recovery operation.
/// 4. If task resubmission succeeds, recursively attempt to recover any
/// plasma arguments to the task. The recovery operation will succeed once
/// the task completes and stores a new value for its return object.
///
/// \return OK if recovery for the object has successfully started, Invalid
/// if the object is not recoverable because we do not own it. Note that the
/// Status::OK value only indicates that the recovery operation has started,
/// but does not guarantee that the recovery operation is successful.
Status RecoverObject(const ObjectID &object_id);
private:
/// Pin a new copy for a lost object from the given locations or, if that
/// fails, attempt to reconstruct it by resubmitting the task that created
/// the object.
void PinOrReconstructObject(const ObjectID &object_id,
const std::vector<rpc::Address> &locations);
/// Pin a new copy for the object at the given location. If that fails, then
/// try one of the other locations.
void PinExistingObjectCopy(const ObjectID &object_id,
const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations);
/// Reconstruct an object by resubmitting the task that created it.
void ReconstructObject(const ObjectID &object_id);
/// Used to resubmit tasks.
std::shared_ptr<TaskResubmissionInterface> task_resubmitter_;
/// Used to check whether we own an object.
std::shared_ptr<ReferenceCounter> reference_counter_;
/// Address of our RPC server.
rpc::Address rpc_address_;
/// Factory for producing new clients to pin objects at remote nodes.
const ObjectPinningClientFactoryFn client_factory_;
// Client that can be used to pin objects from the local raylet.
std::shared_ptr<PinObjectsInterface> local_object_pinning_client_;
/// Function to lookup an object's locations from the global database.
const std::function<Status(const ObjectID &object_id,
const ObjectLookupCallback &callback)>
object_lookup_;
/// Used to store object values (InPlasmaError) if recovery succeeds.
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_;
/// Callback to call if recovery fails.
const std::function<void(const ObjectID &object_id, bool pin_object)>
reconstruction_failure_callback_;
/// Whether lineage reconstruction is enabled. If disabled, then we will try
/// to pin new copies for a lost object, but we will never reconstruct it
/// through task submission.
const bool lineage_reconstruction_enabled_;
/// Protects below fields.
mutable absl::Mutex mu_;
/// Cache of gRPC clients to remote raylets for pinning objects.
absl::flat_hash_map<ClientID, std::shared_ptr<PinObjectsInterface>>
remote_object_pinning_clients_ GUARDED_BY(mu_);
/// Objects that are currently pending recovery. Calls to RecoverObject for
/// objects currently in this set are idempotent.
absl::flat_hash_set<ObjectID> objects_pending_recovery_ GUARDED_BY(mu_);
};
} // namespace ray
#endif // RAY_CORE_WORKER_OBJECT_RECOVERY_MANAGER_H
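
One detail worth pulling out of the header: RecoverObject's idempotence comes from a mutex-guarded pending set, with the completion callback erasing the ID so a later failure of the same object can be recovered again. A small Python model of that guard (names are illustrative, not a Ray API):

import threading

class RecoveryGuard:
    def __init__(self):
        self._mu = threading.Lock()
        self._pending = set()  # objects_pending_recovery_

    def recover(self, object_id, start_recovery):
        with self._mu:
            if object_id in self._pending:
                return  # recovery already in flight: this call is a no-op
            self._pending.add(object_id)
        # start_recovery must eventually call the done callback, e.g. from
        # the GetAsync callback that fires once a new value is stored.
        start_recovery(object_id, lambda: self._done(object_id))

    def _done(self, object_id):
        with self._mu:
            self._pending.discard(object_id)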

src/ray/core_worker/reference_count.cc

@@ -234,6 +234,16 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
deleted);
}
void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
RAY_CHECK(it != object_id_refs_.end());
it->second.submitted_task_ref_count++;
}
}
void ReferenceCounter::UpdateFinishedTaskReferences(
const std::vector<ObjectID> &argument_ids, bool release_lineage,
const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs,
@@ -323,8 +333,12 @@ bool ReferenceCounter::GetOwner(const ObjectID &object_id, TaskID *owner_id,
}
if (it->second.owner.has_value()) {
if (owner_id) {
*owner_id = it->second.owner.value().first;
}
if (owner_address) {
*owner_address = it->second.owner.value().second;
}
return true;
} else {
return false;
@@ -404,6 +418,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
RAY_LOG(DEBUG) << "Calling on_delete for object " << id;
it->second.on_delete(id);
it->second.on_delete = nullptr;
it->second.pinned_at_raylet_id.reset();
}
if (deleted) {
deleted->push_back(id);
@@ -430,12 +445,66 @@ bool ReferenceCounter::SetDeleteCallback(
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
return false;
} else if (it->second.OutOfScope() &&
!it->second.ShouldDelete(lineage_pinning_enabled_)) {
// The object has already gone out of scope but cannot be deleted yet. Do
// not set the deletion callback because it may never get called.
return false;
}
RAY_CHECK(!it->second.on_delete) << object_id;
it->second.on_delete = callback;
return true;
}
std::vector<ObjectID> ReferenceCounter::ResetObjectsOnRemovedNode(
const ClientID &raylet_id) {
absl::MutexLock lock(&mutex_);
std::vector<ObjectID> lost_objects;
for (auto &it : object_id_refs_) {
const auto &object_id = it.first;
auto &ref = it.second;
if (ref.pinned_at_raylet_id.value_or(ClientID::Nil()) == raylet_id) {
lost_objects.push_back(object_id);
ref.pinned_at_raylet_id.reset();
if (ref.on_delete) {
ref.on_delete(object_id);
ref.on_delete = nullptr;
}
}
}
return lost_objects;
}
void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id,
const ClientID &raylet_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it != object_id_refs_.end()) {
// The object is still in scope. Track the raylet location until the object
// has gone out of scope or the raylet fails, whichever happens first.
RAY_CHECK(!it->second.pinned_at_raylet_id.has_value());
// Only the owner tracks the location.
RAY_CHECK(it->second.owned_by_us);
if (!it->second.OutOfScope()) {
it->second.pinned_at_raylet_id = raylet_id;
}
}
}
bool ReferenceCounter::IsPlasmaObjectPinned(const ObjectID &object_id,
bool *pinned) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it != object_id_refs_.end()) {
if (it->second.owned_by_us) {
*pinned = it->second.pinned_at_raylet_id.has_value();
return true;
}
}
return false;
}
bool ReferenceCounter::HasReference(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
return object_id_refs_.find(object_id) != object_id_refs_.end();
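
The new state is one optional field per owned reference, plus an interaction with the existing delete callback: node removal clears the pinned location and fires on_delete so raylet-side eviction waiters are cleaned up. A toy reference table showing just that interaction (field names mirror the C++ Reference struct; this is a model, not the real class):

class Reference:
    def __init__(self, owned_by_us=True):
        self.owned_by_us = owned_by_us
        self.pinned_at_raylet_id = None  # set by UpdateObjectPinnedAtRaylet
        self.on_delete = None            # set by SetDeleteCallback

def reset_objects_on_removed_node(object_id_refs, raylet_id):
    lost = []
    for object_id, ref in object_id_refs.items():
        if ref.pinned_at_raylet_id == raylet_id:
            lost.append(object_id)
            ref.pinned_at_raylet_id = None
            if ref.on_delete:
                ref.on_delete(object_id)  # invoke and reset the callback
                ref.on_delete = None
    return lost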

src/ray/core_worker/reference_count.h

@@ -86,6 +86,14 @@ class ReferenceCounter {
const std::vector<ObjectID> &argument_ids_to_remove = std::vector<ObjectID>(),
std::vector<ObjectID> *deleted = nullptr) LOCKS_EXCLUDED(mutex_);
/// Add references for the object dependencies of a resubmitted task. This
/// does not increment the arguments' lineage ref counts because we should
/// have already incremented them when the task was first submitted.
///
/// \param[in] argument_ids The arguments of the task to add references for.
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> &argument_ids)
LOCKS_EXCLUDED(mutex_);
/// Update object references that were given to a submitted task. The task
/// may still be borrowing any object IDs that were contained in its
/// arguments. This should be called when inlined dependencies are inlined or
@@ -165,8 +173,8 @@ class ReferenceCounter {
/// \param[in] object_id The ID of the object to look up.
/// \param[out] owner_id The TaskID of the object owner.
/// \param[out] owner_address The address of the object owner.
-  bool GetOwner(const ObjectID &object_id, TaskID *owner_id,
-                rpc::Address *owner_address) const LOCKS_EXCLUDED(mutex_);
bool GetOwner(const ObjectID &object_id, TaskID *owner_id = nullptr,
rpc::Address *owner_address = nullptr) const LOCKS_EXCLUDED(mutex_);
/// Manually delete the objects from the reference counter.
void DeleteReferences(const std::vector<ObjectID> &object_ids) LOCKS_EXCLUDED(mutex_);
@@ -177,6 +185,9 @@ class ReferenceCounter {
const std::function<void(const ObjectID &)> callback)
LOCKS_EXCLUDED(mutex_);
void ResetDeleteCallbacks(const std::vector<ObjectID> &object_ids)
LOCKS_EXCLUDED(mutex_);
/// Set a callback for when we are no longer borrowing this object (when our
/// ref count goes to 0).
///
@@ -267,6 +278,33 @@ class ReferenceCounter {
const std::vector<ObjectID> &inner_ids,
const rpc::WorkerAddress &owner_address) LOCKS_EXCLUDED(mutex_);
/// Update the pinned location of an object stored in plasma.
///
/// \param[in] object_id The object to update.
/// \param[in] raylet_id The raylet that is now pinning the object ID.
void UpdateObjectPinnedAtRaylet(const ObjectID &object_id, const ClientID &raylet_id)
LOCKS_EXCLUDED(mutex_);
/// Check whether the object is pinned at a remote plasma store node.
///
/// \param[in] object_id The object to check.
/// \param[out] pinned Whether the object was pinned at a remote plasma store
/// node.
/// \return True if the object exists and is owned by us, false otherwise. We
/// return false here because a borrower should not know the pinned location
/// for an object.
bool IsPlasmaObjectPinned(const ObjectID &object_id, bool *pinned) const
LOCKS_EXCLUDED(mutex_);
/// Get and reset the objects that were pinned on the given node. This
/// method should be called upon a node failure, to determine which plasma
/// objects were lost. If a deletion callback was set for a lost object, it
/// will be invoked and reset.
///
/// \param[in] node_id The node whose object store has been removed.
/// \return The set of objects that were pinned on the given node.
std::vector<ObjectID> ResetObjectsOnRemovedNode(const ClientID &raylet_id);
/// Whether we have a reference to a particular ObjectID.
///
/// \param[in] object_id The object ID to check for.
@@ -350,6 +388,10 @@ class ReferenceCounter {
/// if we do not know the object's owner (because distributed ref counting
/// is not yet implemented).
absl::optional<std::pair<TaskID, rpc::Address>> owner;
// If this object is owned by us and stored in plasma, and reference
// counting is enabled, then some raylet must be pinning the object value.
// This is the address of that raylet.
absl::optional<ClientID> pinned_at_raylet_id;
/// The local ref count for the ObjectID in the language frontend.
size_t local_ref_count = 0;

src/ray/core_worker/test/reference_count_test.cc

@@ -1800,6 +1800,11 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
// The task finishes but is retryable.
rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out);
// We should fail to set the deletion callback because the object has
// already gone out of scope.
ASSERT_FALSE(rc->SetDeleteCallback(
id, [&](const ObjectID &object_id) { ASSERT_FALSE(true); }));
ASSERT_EQ(out.size(), 1);
out.clear();
ASSERT_TRUE(lineage_deleted.empty());
@@ -1815,6 +1820,81 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
ASSERT_EQ(rc->NumObjectIDsInScope(), 0);
}
TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
std::vector<ObjectID> out;
std::vector<ObjectID> lineage_deleted;
ObjectID id = ObjectID::FromRandom();
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
lineage_deleted.push_back(object_id);
});
// Local references.
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
// Submit 2 dependent tasks.
rc->UpdateSubmittedTaskReferences({id});
rc->UpdateSubmittedTaskReferences({id});
rc->RemoveLocalReference(id, nullptr);
ASSERT_TRUE(rc->HasReference(id));
// Both tasks finish, 1 is retryable.
rc->UpdateFinishedTaskReferences({id}, true, empty_borrower, empty_refs, &out);
rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out);
// The dependency is no longer in scope, but we still keep a reference to it
// because it is in the lineage of the retryable task.
ASSERT_EQ(out.size(), 1);
ASSERT_TRUE(rc->HasReference(id));
// Simulate retrying the task.
rc->UpdateResubmittedTaskReferences({id});
rc->UpdateFinishedTaskReferences({id}, true, empty_borrower, empty_refs, &out);
ASSERT_FALSE(rc->HasReference(id));
ASSERT_EQ(lineage_deleted.size(), 1);
}
TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
auto deleted = std::make_shared<std::unordered_set<ObjectID>>();
auto callback = [&](const ObjectID &object_id) { deleted->insert(object_id); };
ObjectID borrowed_id = ObjectID::FromRandom();
rc->AddLocalReference(borrowed_id, "");
bool pinned = false;
ASSERT_FALSE(rc->IsPlasmaObjectPinned(borrowed_id, &pinned));
ObjectID id = ObjectID::FromRandom();
ClientID node_id = ClientID::FromRandom();
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned));
ASSERT_FALSE(pinned);
rc->UpdateObjectPinnedAtRaylet(id, node_id);
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned));
ASSERT_TRUE(pinned);
rc->RemoveLocalReference(id, nullptr);
ASSERT_FALSE(rc->IsPlasmaObjectPinned(id, &pinned));
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
rc->UpdateObjectPinnedAtRaylet(id, node_id);
auto objects = rc->ResetObjectsOnRemovedNode(node_id);
ASSERT_EQ(objects.size(), 1);
ASSERT_EQ(objects[0], id);
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned));
ASSERT_FALSE(pinned);
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
}
} // namespace ray
int main(int argc, char **argv) {

src/ray/core_worker/store_provider/memory_store/memory_store.h

@@ -154,7 +154,7 @@ class CoreWorkerMemoryStore {
std::shared_ptr<raylet::RayletClient> raylet_client_ = nullptr;
/// Protects the data structures below.
-  absl::Mutex mu_;
mutable absl::Mutex mu_;
/// Set of objects that should be promoted to plasma once available.
absl::flat_hash_set<ObjectID> promoted_to_plasma_ GUARDED_BY(mu_);

src/ray/core_worker/task_manager.cc

@@ -78,6 +78,50 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
}
}
Status TaskManager::ResubmitTask(const TaskID &task_id,
std::vector<ObjectID> *task_deps) {
TaskSpecification spec;
bool resubmit = false;
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
if (it == submissible_tasks_.end()) {
return Status::Invalid("Task spec missing");
}
if (!it->second.pending) {
resubmit = true;
it->second.pending = true;
RAY_CHECK(it->second.num_retries_left > 0);
it->second.num_retries_left--;
spec = it->second.spec;
}
}
for (size_t i = 0; i < spec.NumArgs(); i++) {
if (spec.ArgByRef(i)) {
for (size_t j = 0; j < spec.ArgIdCount(i); j++) {
task_deps->push_back(spec.ArgId(i, j));
}
} else {
const auto &inlined_ids = spec.ArgInlinedIds(i);
for (const auto &inlined_id : inlined_ids) {
task_deps->push_back(inlined_id);
}
}
}
if (!task_deps->empty()) {
reference_counter_->UpdateResubmittedTaskReferences(*task_deps);
}
if (resubmit) {
retry_task_callback_(spec, /*delay=*/false);
}
return Status::OK();
}
void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {
bool has_pending_tasks = false;
{
@@ -127,6 +171,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
RAY_LOG(DEBUG) << "Completing task " << task_id;
std::vector<ObjectID> direct_return_ids;
std::vector<ObjectID> plasma_return_ids;
for (int i = 0; i < reply.return_objects_size(); i++) {
const auto &return_object = reply.return_objects(i);
ObjectID object_id = ObjectID::FromBinary(return_object.object_id());
@@ -136,7 +181,14 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
// Mark it as in plasma with a dummy object.
RAY_CHECK(
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
const auto pinned_at_raylet_id = ClientID::FromBinary(worker_addr.raylet_id());
reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id);
} else {
// NOTE(swang): If a direct object was promoted to plasma, then we do not
// record the node ID that it was pinned at, which means that we will not
// be able to reconstruct it if the plasma object copy is lost. However,
// this is okay because the pinned copy is on the local node, so we will
// fate-share with the object if the local node fails.
std::shared_ptr<LocalMemoryBuffer> data_buffer;
if (return_object.data().size() > 0) {
data_buffer = std::make_shared<LocalMemoryBuffer>(
@@ -235,7 +287,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
if (num_retries_left > 0) {
RAY_LOG(ERROR) << num_retries_left << " retries left for task " << spec.TaskId()
<< ", attempting to resubmit.";
-      retry_task_callback_(spec);
retry_task_callback_(spec, /*delay=*/true);
} else {
// Throttled logging of task failure errors.
{
@@ -380,11 +432,4 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
}
}
-TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const {
-  absl::MutexLock lock(&mu_);
-  auto it = submissible_tasks_.find(task_id);
-  RAY_CHECK(it != submissible_tasks_.end());
-  return it->second.spec;
-}
} // namespace ray
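
ResubmitTask above has three outcomes: Invalid if the spec has been released, OK with no action if the task is already pending (someone is already producing the value), and OK with a retry consumed plus dependency refs re-added otherwise. A condensed Python model of that decision (entry fields follow the TaskEntry struct; dependency collection only matters on an actual resubmission, since the default-constructed spec has no args):

def resubmit_task(submissible_tasks, task_id, update_resubmitted_refs, retry_task):
    entry = submissible_tasks.get(task_id)
    if entry is None:
        return "Invalid: task spec missing"
    if entry.pending:
        return "OK"                    # value is already being produced
    entry.pending = True
    assert entry.num_retries_left > 0  # lineage pinning kept the retry budget
    entry.num_retries_left -= 1
    task_deps = list(entry.spec.dependencies)
    if task_deps:
        update_resubmitted_refs(task_deps)  # re-pin args, not their lineage
    retry_task(entry.spec, delay=False)     # resubmit immediately
    return "OK"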

src/ray/core_worker/task_manager.h

@@ -42,9 +42,17 @@ class TaskFinisherInterface {
virtual ~TaskFinisherInterface() {}
};
-using RetryTaskCallback = std::function<void(const TaskSpecification &spec)>;

class TaskResubmissionInterface {
 public:
  virtual Status ResubmitTask(const TaskID &task_id,
                              std::vector<ObjectID> *task_deps) = 0;

  virtual ~TaskResubmissionInterface() {}
};

using RetryTaskCallback = std::function<void(const TaskSpecification &spec, bool delay)>;

-class TaskManager : public TaskFinisherInterface {
class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
public:
TaskManager(std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
std::shared_ptr<ReferenceCounter> reference_counter,
@@ -73,6 +81,19 @@ class TaskManager : public TaskFinisherInterface {
const TaskSpecification &spec, const std::string &call_site,
int max_retries = 0);
/// Resubmit a task that has completed execution before. This is used to
/// reconstruct objects stored in Plasma that were lost.
///
/// \param[in] task_id The ID of the task to resubmit.
/// \param[out] task_deps The object dependencies of the resubmitted task,
/// i.e. all arguments that were not inlined in the task spec. The caller is
/// responsible for making sure that these dependencies become available, so
/// that the resubmitted task can run. This is only populated if the task was
/// not already pending and was successfully resubmitted.
/// \return OK if the task was successfully resubmitted or was
/// already pending, Invalid if the task spec is no longer present.
Status ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps);
/// Wait for all pending tasks to finish, and then shutdown.
///
/// \param shutdown The shutdown callback to call.
@@ -108,9 +129,6 @@ class TaskManager : public TaskFinisherInterface {
void OnTaskDependenciesInlined(const std::vector<ObjectID> &inlined_dependency_ids,
const std::vector<ObjectID> &contained_ids) override;
-  /// Return the spec for a pending task.
-  TaskSpecification GetTaskSpec(const TaskID &task_id) const;
/// Return whether this task can be submitted for execution.
///
/// \param[in] task_id ID of the task to query.

src/ray/core_worker/test/object_recovery_manager_test.cc (new file)

@@ -0,0 +1,238 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/core_worker/transport/direct_task_transport.h"
#include "gtest/gtest.h"
#include "ray/common/task/task_spec.h"
#include "ray/common/task/task_util.h"
#include "ray/common/test_util.h"
#include "ray/core_worker/object_recovery_manager.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/raylet/raylet_client.h"
namespace ray {
// Used to prevent leases from timing out when not testing that logic. It would
// be better to use a mock clock or lease manager interface, but that's high
// overhead for the very simple timeout logic we currently have.
int64_t kLongTimeout = 1024 * 1024 * 1024;
class MockTaskResubmitter : public TaskResubmissionInterface {
public:
MockTaskResubmitter() {}
void AddTask(const TaskID &task_id, std::vector<ObjectID> task_deps) {
task_specs[task_id] = task_deps;
}
Status ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
if (task_specs.find(task_id) == task_specs.end()) {
return Status::Invalid("");
}
for (const auto &dep : task_specs[task_id]) {
task_deps->push_back(dep);
}
num_tasks_resubmitted++;
return Status::OK();
}
std::unordered_map<TaskID, std::vector<ObjectID>> task_specs;
int num_tasks_resubmitted = 0;
};
class MockRayletClient : public PinObjectsInterface {
public:
ray::Status PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override {
RAY_LOG(INFO) << "PinObjectIDs " << object_ids.size();
callbacks.push_back(callback);
return Status::OK();
}
size_t Flush() {
size_t flushed = callbacks.size();
for (const auto &callback : callbacks) {
callback(Status::OK(), rpc::PinObjectIDsReply());
}
callbacks.clear();
return flushed;
}
std::list<rpc::ClientCallback<rpc::PinObjectIDsReply>> callbacks = {};
};
class MockObjectDirectory {
public:
void AsyncGetLocations(const ObjectID &object_id,
const ObjectLookupCallback &callback) {
callbacks.push_back({object_id, callback});
}
void SetLocations(const ObjectID &object_id,
const std::vector<rpc::Address> &addresses) {
locations[object_id] = addresses;
}
size_t Flush() {
size_t flushed = callbacks.size();
for (const auto &pair : callbacks) {
pair.second(pair.first, locations[pair.first]);
}
for (size_t i = 0; i < flushed; i++) {
callbacks.erase(callbacks.begin());
}
return flushed;
}
std::vector<std::pair<ObjectID, ObjectLookupCallback>> callbacks = {};
std::unordered_map<ObjectID, std::vector<rpc::Address>> locations;
};
class ObjectRecoveryManagerTest : public ::testing::Test {
public:
ObjectRecoveryManagerTest()
: local_raylet_id_(ClientID::FromRandom()),
object_directory_(std::make_shared<MockObjectDirectory>()),
memory_store_(std::make_shared<CoreWorkerMemoryStore>()),
raylet_client_(std::make_shared<MockRayletClient>()),
task_resubmitter_(std::make_shared<MockTaskResubmitter>()),
ref_counter_(std::make_shared<ReferenceCounter>(
rpc::Address(), /*distributed_ref_counting_enabled=*/true,
/*lineage_pinning_enabled=*/true)),
manager_(rpc::Address(),
[&](const std::string &ip, int port) { return raylet_client_; },
raylet_client_,
[&](const ObjectID &object_id, const ObjectLookupCallback &callback) {
object_directory_->AsyncGetLocations(object_id, callback);
return Status::OK();
},
task_resubmitter_, ref_counter_, memory_store_,
[&](const ObjectID &object_id, bool pin_object) {
RAY_CHECK(failed_reconstructions_.count(object_id) == 0);
failed_reconstructions_[object_id] = pin_object;
std::string meta =
std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(
reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer =
std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto data = RayObject(nullptr, meta_buffer, std::vector<ObjectID>());
RAY_CHECK(memory_store_->Put(data, object_id));
ref_counter_->UpdateObjectPinnedAtRaylet(object_id, local_raylet_id_);
},
/*lineage_reconstruction_enabled=*/true) {}
ClientID local_raylet_id_;
std::unordered_map<ObjectID, bool> failed_reconstructions_;
std::shared_ptr<MockObjectDirectory> object_directory_;
std::shared_ptr<CoreWorkerMemoryStore> memory_store_;
std::shared_ptr<MockRayletClient> raylet_client_;
std::shared_ptr<MockTaskResubmitter> task_resubmitter_;
std::shared_ptr<ReferenceCounter> ref_counter_;
ObjectRecoveryManager manager_;
};
TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0);
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
}
TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0);
std::vector<rpc::Address> addresses({rpc::Address()});
object_directory_->SetLocations(object_id, addresses);
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(raylet_client_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
}
TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0);
task_resubmitter_->AddTask(object_id.TaskId(), {});
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 1);
}
TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0);
ref_counter_->AddLocalReference(object_id, "");
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
// A second attempt to recover the object will not trigger any more
// callbacks.
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(object_directory_->Flush() == 1);
failed_reconstructions_.clear();
// The object has been marked as failed. Another attempt to recover the
// object will not trigger any callbacks.
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_EQ(object_directory_->Flush(), 0);
// The object is removed and can be recovered again.
auto objects = ref_counter_->ResetObjectsOnRemovedNode(local_raylet_id_);
ASSERT_EQ(objects.size(), 1);
ASSERT_EQ(objects[0], object_id);
memory_store_->Delete(objects);
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(object_directory_->Flush() == 1);
}
TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
std::vector<ObjectID> object_ids;
std::vector<ObjectID> dependencies;
for (int i = 0; i < 3; i++) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0);
task_resubmitter_->AddTask(object_id.TaskId(), dependencies);
dependencies = {object_id};
object_ids.push_back(object_id);
}
ASSERT_TRUE(manager_.RecoverObject(object_ids.back()).ok());
for (int i = 0; i < 3; i++) {
RAY_LOG(INFO) << i;
ASSERT_EQ(object_directory_->Flush(), 1);
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, i + 1);
}
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

src/ray/core_worker/test/task_manager_test.cc

@@ -51,7 +51,7 @@ class TaskManagerTest : public ::testing::Test {
/*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))),
actor_manager_(std::shared_ptr<ActorManagerInterface>(new MockActorManager())),
manager_(store_, reference_counter_, actor_manager_,
-           [this](const TaskSpecification &spec) {
[this](const TaskSpecification &spec, bool delay) {
num_retries_++;
return Status::OK();
}) {}
@@ -421,6 +421,65 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
}
// Test to make sure that the task manager only resubmits tasks whose specs are
// pinned and that are not already pending execution.
TEST_F(TaskManagerLineageTest, TestResubmitTask) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
// Submit a task with 2 arguments.
ObjectID dep1 = ObjectID::FromRandom();
ObjectID dep2 = ObjectID::FromRandom();
auto spec = CreateTaskHelper(1, {dep1, dep2});
int num_retries = 3;
// Cannot resubmit a task whose spec we do not have.
std::vector<ObjectID> resubmitted_task_deps;
ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok());
ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 0);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
// A task that is already pending does not get resubmitted.
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok());
ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 0);
// The task completes.
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id, "");
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(true);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// The task finished, its return ID is still in scope, and the return object
// was stored in plasma. It is okay to resubmit it now.
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok());
ASSERT_EQ(resubmitted_task_deps, spec.GetDependencies());
ASSERT_EQ(num_retries_, 1);
resubmitted_task_deps.clear();
// The return ID goes out of scope.
reference_counter_->RemoveLocalReference(return_id, nullptr);
// The task is still pending execution.
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
// A task that is already pending does not get resubmitted.
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok());
ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 1);
// The resubmitted task finishes.
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
// The task cannot be resubmitted because its spec has been released.
ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok());
ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 1);
}
} // namespace ray
int main(int argc, char **argv) {

src/ray/raylet/node_manager.cc

@@ -162,6 +162,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
node_manager_service_(io_service, *this),
client_call_manager_(io_service),
new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()) {
RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
RAY_CHECK(heartbeat_period_.count() > 0);
// Initialize the resource map with own cluster resource configuration.
cluster_resource_map_.emplace(self_node_id_,
@@ -3371,10 +3372,13 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
}
RAY_LOG(DEBUG) << "Pinning object " << object_id;
-    pinned_objects_.emplace(
-        object_id, std::unique_ptr<RayObject>(new RayObject(
-                       std::make_shared<PlasmaBuffer>(plasma_results[i].data),
-                       std::make_shared<PlasmaBuffer>(plasma_results[i].metadata), {})));
    RAY_CHECK(
        pinned_objects_
            .emplace(object_id,
                     std::unique_ptr<RayObject>(new RayObject(
                         std::make_shared<PlasmaBuffer>(plasma_results[i].data),
                         std::make_shared<PlasmaBuffer>(plasma_results[i].metadata), {})))
            .second);
i++;
// Send a long-running RPC request to the owner for each object. When we get a
@@ -3394,7 +3398,9 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
pinned_objects_.erase(object_id);
// Try to evict all copies of the object from the cluster.
if (free_objects_period_ >= 0) {
objects_to_free_.push_back(object_id);
}
if (objects_to_free_.size() ==
RayConfig::instance().free_objects_batch_size() ||
free_objects_period_ == 0) {
@@ -3411,10 +3417,6 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
}
void NodeManager::FlushObjectsToFree() {
-  if (free_objects_period_ < 0) {
-    return;
-  }
if (!objects_to_free_.empty()) {
RAY_LOG(DEBUG) << "Freeing " << objects_to_free_.size() << " out-of-scope objects";
object_manager_.FreeObjects(objects_to_free_, /*local_only=*/false);
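
The eager-free change above is subtle: the `free_objects_period_ >= 0` check moves from flush time to enqueue time, so a negative period (as set by free_objects_period_milliseconds: -1 in the tests) now prevents lost-and-recovered copies from ever being queued for cluster-wide eviction. The resulting policy, as a few lines of Python:

def on_object_out_of_scope(objects_to_free, object_id, period_ms, batch_size, flush):
    if period_ms >= 0:               # negative period: eager freeing disabled
        objects_to_free.append(object_id)
    if len(objects_to_free) == batch_size or period_ms == 0:
        flush(objects_to_free)       # FreeObjects(..., local_only=false)
        objects_to_free.clear()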

src/ray/raylet/raylet_client.h

@@ -48,6 +48,17 @@ namespace ray {
typedef boost::asio::generic::stream_protocol local_stream_protocol;
typedef boost::asio::basic_stream_socket<local_stream_protocol> local_stream_socket;
/// Interface for pinning objects. Abstract for testing.
class PinObjectsInterface {
public:
/// Request to a raylet to pin a plasma object. The callback will be sent via gRPC.
virtual ray::Status PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) = 0;
virtual ~PinObjectsInterface(){};
};
/// Interface for leasing workers. Abstract for testing.
class WorkerLeaseInterface {
public:
@@ -129,7 +140,9 @@ class RayletConnection {
std::mutex write_mutex_;
};
-class RayletClient : public WorkerLeaseInterface, public DependencyWaiterInterface {
class RayletClient : public PinObjectsInterface,
public WorkerLeaseInterface,
public DependencyWaiterInterface {
public:
/// Connect to the raylet.
///
@@ -287,7 +300,7 @@ class RayletClient : public WorkerLeaseInterface, public DependencyWaiterInterfa
ray::Status PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
-      const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback);
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override;
ray::Status GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback);
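
Extracting PinObjectIDs into PinObjectsInterface is what lets object_recovery_manager_test.cc inject MockRayletClient in place of a real raylet connection. The same seam, sketched in Python for comparison (hypothetical names; the mock mirrors the Flush pattern used in the test above):

from abc import ABC, abstractmethod

class PinObjectsInterface(ABC):
    @abstractmethod
    def pin_object_ids(self, caller_address, object_ids, callback):
        ...

class MockRayletClient(PinObjectsInterface):
    """Test double: records callbacks, fires them all on flush()."""
    def __init__(self):
        self.callbacks = []

    def pin_object_ids(self, caller_address, object_ids, callback):
        self.callbacks.append(callback)

    def flush(self, ok=True):
        flushed, self.callbacks = self.callbacks, []
        for callback in flushed:
            callback(ok)
        return len(flushed)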