diff --git a/BUILD.bazel b/BUILD.bazel index d26295b79..7d0955bb8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -272,6 +272,7 @@ cc_library( "@com_github_grpc_grpc//:grpc++", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_set", + "@com_google_absl//absl/memory", "@com_google_googletest//:gtest", "@msgpack", "@plasma//:plasma_client", @@ -542,6 +543,16 @@ cc_test( ], ) +cc_test( + name = "object_recovery_manager_test", + srcs = ["src/ray/core_worker/test/object_recovery_manager_test.cc"], + copts = COPTS, + deps = [ + ":core_worker_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "scheduling_queue_test", srcs = ["src/ray/core_worker/test/scheduling_queue_test.cc"], diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index db16c7881..b1fd2ddb2 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -203,6 +203,14 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_reconstruction", + size = "medium", + srcs = ["test_reconstruction.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + py_test( name = "test_reference_counting", size = "medium", diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py new file mode 100644 index 000000000..d0d7cf4f5 --- /dev/null +++ b/python/ray/tests/test_reconstruction.py @@ -0,0 +1,279 @@ +import json +import sys + +import numpy as np +import pytest + +import ray +from ray.cluster_utils import Cluster +from ray.test_utils import ( + wait_for_condition, ) + + +def test_cached_object(ray_start_cluster): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the initial object. + node_to_kill = cluster.add_node( + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) + cluster.add_node( + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + @ray.remote + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def dependent_task(x): + return + + obj = large_object.options(resources={"node1": 1}).remote() + ray.get(dependent_task.options(resources={"node2": 1}).remote(obj)) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) + assert wait_for_condition( + lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) + + for _ in range(20): + large_object.options(resources={"node2": 1}).remote() + + ray.get(dependent_task.remote(obj)) + + +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_reconstruction_cached_dependency(ray_start_cluster, + reconstruction_enabled): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, + "free_objects_period_milliseconds": -1, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the initial object. 
+ node_to_kill = cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.add_node( + num_cpus=1, + resources={"node2": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.wait_for_nodes() + ray.init(address=cluster.address, _internal_config=config) + + @ray.remote(max_retries=0) + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def chain(x): + return x + + @ray.remote + def dependent_task(x): + return + + obj = large_object.options(resources={"node2": 1}).remote() + obj = chain.options(resources={"node1": 1}).remote(obj) + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + assert wait_for_condition( + lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10) + + for _ in range(20): + large_object.options(resources={"node2": 1}).remote() + + if reconstruction_enabled: + ray.get(dependent_task.remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError) as e: + ray.get(dependent_task.remote(obj)) + with pytest.raises(ray.exceptions.UnreconstructableError): + raise e.as_instanceof_cause() + + +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, + "free_objects_period_milliseconds": -1, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the initial object. + node_to_kill = cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.add_node( + num_cpus=1, + resources={"node2": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.wait_for_nodes() + ray.init(address=cluster.address, _internal_config=config) + + @ray.remote(max_retries=1 if reconstruction_enabled else 0) + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def dependent_task(x): + return + + obj = large_object.options(resources={"node1": 1}).remote() + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + + if reconstruction_enabled: + ray.get(dependent_task.remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError) as e: + ray.get(dependent_task.remote(obj)) + with pytest.raises(ray.exceptions.UnreconstructableError): + raise e.as_instanceof_cause() + + +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, + "free_objects_period_milliseconds": -1, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the initial object. 
+ node_to_kill = cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.add_node( + num_cpus=1, + resources={"node2": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.wait_for_nodes() + ray.init(address=cluster.address, _internal_config=config) + + @ray.remote(max_retries=1 if reconstruction_enabled else 0) + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def chain(x): + return x + + @ray.remote + def dependent_task(x): + return + + obj = large_object.options(resources={"node2": 1}).remote() + downstream = [chain.remote(obj) for _ in range(4)] + for obj in downstream: + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + + if reconstruction_enabled: + for obj in downstream: + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError) as e: + for obj in downstream: + ray.get( + dependent_task.options(resources={ + "node1": 1 + }).remote(obj)) + with pytest.raises(ray.exceptions.UnreconstructableError): + raise e.as_instanceof_cause() + + +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, + "free_objects_period_milliseconds": -1, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node( + num_cpus=0, _internal_config=config, object_store_memory=10**8) + node_to_kill = cluster.add_node( + num_cpus=1, object_store_memory=10**8, _internal_config=config) + cluster.wait_for_nodes() + ray.init(address=cluster.address, _internal_config=config) + + @ray.remote(max_retries=1 if reconstruction_enabled else 0) + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def chain(x): + return x + + @ray.remote + def dependent_task(x): + return x + + obj = large_object.remote() + for _ in range(20): + obj = chain.remote(obj) + ray.get(dependent_task.remote(obj)) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, object_store_memory=10**8, _internal_config=config) + + if reconstruction_enabled: + ray.get(dependent_task.remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError) as e: + ray.get(dependent_task.remote(obj)) + with pytest.raises(ray.exceptions.UnreconstructableError): + raise e.as_instanceof_cause() + + +if __name__ == "__main__": + import pytest + sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/ray_object.h b/src/ray/common/ray_object.h index 525901a19..e785e98c7 100644 --- a/src/ray/common/ray_object.h +++ b/src/ray/common/ray_object.h @@ -15,6 +15,7 @@ #ifndef RAY_COMMON_RAY_OBJECT_H #define RAY_COMMON_RAY_OBJECT_H +#include "absl/types/optional.h" #include "ray/common/buffer.h" #include "ray/common/id.h" #include "ray/protobuf/gcs.pb.h" diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6f625c101..260b8118a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -274,6 +274,14 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ 
RAY_CHECK_OK(gcs_client_->Connect(io_service_)); RegisterToGcs(); + // Register a callback to monitor removed nodes. + auto on_node_change = [this](const ClientID &node_id, const rpc::GcsNodeInfo &data) { + if (data.state() == rpc::GcsNodeInfo::DEAD) { + OnNodeRemoved(data); + } + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); + actor_manager_ = std::unique_ptr(new ActorManager(gcs_client_->Actors())); // Initialize profiler. @@ -342,8 +350,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ boost::asio::chrono::milliseconds(kInternalHeartbeatMillis)); internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1)); - io_thread_ = std::thread(&CoreWorker::RunIOService, this); - plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( options_.store_socket, local_raylet_client_, options_.check_signals, /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), @@ -359,14 +365,18 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ task_manager_.reset(new TaskManager( memory_store_, reference_counter_, actor_manager_, - [this](const TaskSpecification &spec) { - // Retry after a delay to emulate the existing Raylet reconstruction - // behaviour. TODO(ekl) backoff exponentially. - uint32_t delay = RayConfig::instance().task_retry_delay_ms(); - RAY_LOG(ERROR) << "Will resubmit task after a " << delay - << "ms delay: " << spec.DebugString(); - absl::MutexLock lock(&mutex_); - to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec)); + [this](const TaskSpecification &spec, bool delay) { + if (delay) { + // Retry after a delay to emulate the existing Raylet reconstruction + // behaviour. TODO(ekl) backoff exponentially. + uint32_t delay = RayConfig::instance().task_retry_delay_ms(); + RAY_LOG(ERROR) << "Will resubmit task after a " << delay + << "ms delay: " << spec.DebugString(); + absl::MutexLock lock(&mutex_); + to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec)); + } else { + RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); + } })); // Create an entry for the driver task in the task table. 
This task is @@ -394,19 +404,19 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ return std::shared_ptr( new rpc::CoreWorkerClient(addr, *client_call_manager_)); }; + auto raylet_client_factory = [this](const std::string ip_address, int port) { + auto grpc_client = + rpc::NodeManagerWorkerClient::make(ip_address, port, *client_call_manager_); + return std::shared_ptr( + new raylet::RayletClient(std::move(grpc_client))); + }; direct_actor_submitter_ = std::unique_ptr( new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_, task_manager_)); direct_task_submitter_ = std::unique_ptr(new CoreWorkerDirectTaskSubmitter( - rpc_address_, local_raylet_client_, client_factory, - [this](const std::string ip_address, int port) { - auto grpc_client = rpc::NodeManagerWorkerClient::make(ip_address, port, - *client_call_manager_); - return std::shared_ptr( - new raylet::RayletClient(std::move(grpc_client))); - }, + rpc_address_, local_raylet_client_, client_factory, raylet_client_factory, memory_store_, task_manager_, local_raylet_id, RayConfig::instance().worker_lease_timeout_milliseconds())); future_resolver_.reset(new FutureResolver(memory_store_, client_factory)); @@ -414,6 +424,44 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ if (direct_task_receiver_ != nullptr) { direct_task_receiver_->Init(client_factory, rpc_address_, local_raylet_client_); } + + auto object_lookup_fn = [this](const ObjectID &object_id, + const ObjectLookupCallback &callback) { + return gcs_client_->Objects().AsyncGetLocations( + object_id, + [this, object_id, callback](const Status &status, + const std::vector &results) { + RAY_CHECK_OK(status); + std::vector locations; + for (const auto &result : results) { + const auto &node_id = ClientID::FromBinary(result.manager()); + auto node = gcs_client_->Nodes().Get(node_id); + RAY_CHECK(node.has_value()); + if (node->state() == rpc::GcsNodeInfo::ALIVE) { + rpc::Address address; + address.set_raylet_id(node->node_id()); + address.set_ip_address(node->node_manager_address()); + address.set_port(node->node_manager_port()); + locations.push_back(address); + } + } + callback(object_id, locations); + }); + }; + object_recovery_manager_ = + std::unique_ptr(new ObjectRecoveryManager( + rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn, + task_manager_, reference_counter_, memory_store_, + [this](const ObjectID &object_id, bool pin_object) { + RAY_CHECK_OK(Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), + /*contained_object_ids=*/{}, object_id, + /*pin_object=*/pin_object)); + }, + RayConfig::instance().lineage_pinning_enabled())); + + // Start the IO thread after all other members have been initialized, in case + // the thread calls back into any of our members. + io_thread_ = std::thread(&CoreWorker::RunIOService, this); } void CoreWorker::Shutdown() { @@ -502,6 +550,22 @@ void CoreWorker::RunIOService() { io_service_.run(); } +void CoreWorker::OnNodeRemoved(const rpc::GcsNodeInfo &node_info) { + const auto node_id = ClientID::FromBinary(node_info.node_id()); + RAY_LOG(INFO) << "Node failure " << node_id; + const auto lost_objects = reference_counter_->ResetObjectsOnRemovedNode(node_id); + // Delete the objects from the in-memory store to indicate that they are not + // available. 
The object recovery manager will guarantee that a new value + // will eventually be stored for the objects (either an + // UnreconstructableError or a value reconstructed from lineage). + memory_store_->Delete(lost_objects); + for (const auto &object_id : lost_objects) { + RAY_LOG(INFO) << "Object " << object_id << " lost due to node failure " << node_id; + // The lost object must have been owned by us. + RAY_CHECK_OK(object_recovery_manager_->RecoverObject(object_id)); + } +} + void CoreWorker::WaitForShutdown() { if (io_thread_.joinable()) { io_thread_.join(); @@ -1611,6 +1675,8 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques void CoreWorker::HandleWaitForObjectEviction( const rpc::WaitForObjectEvictionRequest &request, rpc::WaitForObjectEvictionReply *reply, rpc::SendReplyCallback send_reply_callback) { + // TODO(swang): Drop requests from raylets that executed an older version of + // the task. if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -1695,7 +1761,9 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & rpc::SendReplyCallback send_reply_callback) { absl::MutexLock lock(&mutex_); auto stats = reply->mutable_core_worker_stats(); - stats->set_num_pending_tasks(task_manager_->NumPendingTasks()); + // TODO(swang): Differentiate between tasks that are currently pending + // execution and tasks that have finished but may be retried. + stats->set_num_pending_tasks(task_manager_->NumSubmissibleTasks()); stats->set_task_queue_length(task_queue_length_); stats->set_num_executed_tasks(num_executed_tasks_); stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope()); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 21cfe5f3f..1a803cc63 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -23,6 +23,7 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/future_resolver.h" +#include "ray/core_worker/object_recovery_manager.h" #include "ray/core_worker/profiling.h" #include "ray/core_worker/reference_count.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" @@ -867,6 +868,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } + /// Handler if a raylet node is removed from the cluster. + void OnNodeRemoved(const rpc::GcsNodeInfo &node_info); + const CoreWorkerOptions options_; /// Callback to get the current language (e.g., Python) call site. @@ -961,6 +965,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Interface to submit non-actor tasks directly to leased workers. std::unique_ptr direct_task_submitter_; + /// Manages recovery of objects stored in remote plasma nodes. + std::unique_ptr object_recovery_manager_; + /// The `actor_handles_` field could be mutated concurrently due to multi-threading, we /// need a mutex to protect it. mutable absl::Mutex actor_handles_mutex_; diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc new file mode 100644 index 000000000..235e67c72 --- /dev/null +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -0,0 +1,145 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/object_recovery_manager.h" + +#include "ray/util/util.h" + +namespace ray { + +Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { + // Check the ReferenceCounter to see if there is a location for the object. + bool pinned = false; + bool owned_by_us = reference_counter_->IsPlasmaObjectPinned(object_id, &pinned); + if (!owned_by_us) { + return Status::Invalid( + "Object reference no longer exists or is not owned by us. Either lineage pinning " + "is disabled or this object ID is borrowed."); + } + + bool already_pending_recovery = true; + if (!pinned) { + { + absl::MutexLock lock(&mu_); + // Mark that we are attempting recovery for this object to prevent + // duplicate reconstructions of the same object. + already_pending_recovery = !objects_pending_recovery_.insert(object_id).second; + } + } + + if (!already_pending_recovery) { + RAY_LOG(INFO) << "Starting recovery for object " << object_id; + in_memory_store_->GetAsync( + object_id, [this, object_id](std::shared_ptr obj) { + absl::MutexLock lock(&mu_); + RAY_CHECK(objects_pending_recovery_.erase(object_id)) << object_id; + RAY_LOG(INFO) << "Recovery complete for object " << object_id; + }); + // Lookup the object in the GCS to find another copy. + RAY_RETURN_NOT_OK(object_lookup_( + object_id, + [this](const ObjectID &object_id, const std::vector &locations) { + PinOrReconstructObject(object_id, locations); + })); + } else { + RAY_LOG(DEBUG) << "Recovery already started for object " << object_id; + } + return Status::OK(); +} + +void ObjectRecoveryManager::PinOrReconstructObject( + const ObjectID &object_id, const std::vector &locations) { + RAY_LOG(INFO) << "Lost object " << object_id << " has " << locations.size() + << " locations"; + if (!locations.empty()) { + auto locations_copy = locations; + const auto location = locations_copy.back(); + locations_copy.pop_back(); + PinExistingObjectCopy(object_id, location, locations_copy); + } else if (lineage_reconstruction_enabled_) { + // There are no more copies to pin, try to reconstruct the object. + ReconstructObject(object_id); + } else { + reconstruction_failure_callback_(object_id, /*pin_object=*/true); + } +} + +void ObjectRecoveryManager::PinExistingObjectCopy( + const ObjectID &object_id, const rpc::Address &raylet_address, + const std::vector &other_locations) { + // If a copy still exists, pin the object by sending a + // PinObjectIDs RPC. 
+ const auto node_id = ClientID::FromBinary(raylet_address.raylet_id()); + RAY_LOG(DEBUG) << "Trying to pin copy of lost object " << object_id << " at node " + << node_id; + + std::shared_ptr client; + if (node_id == ClientID::FromBinary(rpc_address_.raylet_id())) { + client = local_object_pinning_client_; + } else { + absl::MutexLock lock(&mu_); + auto client_it = remote_object_pinning_clients_.find(node_id); + if (client_it == remote_object_pinning_clients_.end()) { + RAY_LOG(DEBUG) << "Connecting to raylet " << node_id; + client_it = remote_object_pinning_clients_ + .emplace(node_id, client_factory_(raylet_address.ip_address(), + raylet_address.port())) + .first; + } + client = client_it->second; + } + + RAY_UNUSED(client->PinObjectIDs( + rpc_address_, {object_id}, + [this, object_id, other_locations, node_id](const Status &status, + const rpc::PinObjectIDsReply &reply) { + if (status.ok()) { + // TODO(swang): Make sure that the node is still alive when + // marking the object as pinned. + RAY_CHECK(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + object_id)); + reference_counter_->UpdateObjectPinnedAtRaylet(object_id, node_id); + } else { + RAY_LOG(INFO) << "Error pinning new copy of lost object " << object_id + << ", trying again"; + PinOrReconstructObject(object_id, other_locations); + } + })); +} + +void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { + // Notify the task manager that we are retrying the task that created this + // object. + const auto task_id = object_id.TaskId(); + std::vector task_deps; + auto status = task_resubmitter_->ResubmitTask(task_id, &task_deps); + + if (status.ok()) { + // Try to recover the task's dependencies. + for (const auto &dep : task_deps) { + auto status = RecoverObject(dep); + if (!status.ok()) { + RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": " + << status.message(); + // We do not pin the dependency because we may not be the owner. + reconstruction_failure_callback_(dep, /*pin_object=*/false); + } + } + } else { + RAY_LOG(INFO) << "Failed to reconstruct object " << object_id; + reconstruction_failure_callback_(object_id, /*pin_object=*/true); + } +} + +} // namespace ray diff --git a/src/ray/core_worker/object_recovery_manager.h b/src/ray/core_worker/object_recovery_manager.h new file mode 100644 index 000000000..b324714e6 --- /dev/null +++ b/src/ray/core_worker/object_recovery_manager.h @@ -0,0 +1,151 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#ifndef RAY_CORE_WORKER_OBJECT_RECOVERY_MANAGER_H
+#define RAY_CORE_WORKER_OBJECT_RECOVERY_MANAGER_H
+
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "ray/common/id.h"
+#include "ray/core_worker/reference_count.h"
+#include "ray/core_worker/store_provider/memory_store/memory_store.h"
+#include "ray/core_worker/task_manager.h"
+#include "ray/raylet/raylet_client.h"
+
+namespace ray {
+
+typedef std::function<std::shared_ptr<PinObjectsInterface>(
+    const std::string &ip_address, int port)>
+    ObjectPinningClientFactoryFn;
+
+typedef std::function<void(const ObjectID &object_id,
+                           const std::vector<rpc::Address> &raylet_locations)>
+    ObjectLookupCallback;
+
+class ObjectRecoveryManager {
+ public:
+  ObjectRecoveryManager(const rpc::Address &rpc_address,
+                        ObjectPinningClientFactoryFn client_factory,
+                        std::shared_ptr<PinObjectsInterface> local_object_pinning_client,
+                        std::function<Status(const ObjectID &object_id,
+                                             const ObjectLookupCallback &callback)>
+                            object_lookup,
+                        std::shared_ptr<TaskResubmissionInterface> task_resubmitter,
+                        std::shared_ptr<ReferenceCounter> reference_counter,
+                        std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
+                        std::function<void(const ObjectID &object_id, bool pin_object)>
+                            reconstruction_failure_callback,
+                        bool lineage_reconstruction_enabled)
+      : rpc_address_(rpc_address),
+        client_factory_(client_factory),
+        local_object_pinning_client_(local_object_pinning_client),
+        object_lookup_(object_lookup),
+        task_resubmitter_(task_resubmitter),
+        reference_counter_(reference_counter),
+        in_memory_store_(in_memory_store),
+        reconstruction_failure_callback_(reconstruction_failure_callback),
+        lineage_reconstruction_enabled_(lineage_reconstruction_enabled) {}
+
+  /// Recover an object that was stored in plasma. This will only succeed for
+  /// objects that are lost from memory and that this process owns (returns
+  /// Status::Invalid if false). This method is idempotent for overlapping
+  /// recovery operations on the same object. This class will guarantee that
+  /// each recovery operation ends in either success (by storing a new value
+  /// for the object in the direct memory/plasma store) or failure (by calling
+  /// the given reconstruction failure callback).
+  ///
+  /// Algorithm:
+  /// 1. Check that the object is missing from the direct memory store and that
+  /// we own the object. If either is false, then fail the recovery operation.
+  /// 2. Look up the object in the global directory to check for other
+  /// locations of the object. If another location exists, attempt to pin it.
+  /// If the pinning is successful, then mark the recovery as a success by
+  /// storing a new value for the object in the direct memory store.
+  /// 3. If pinning fails at all locations for the object (or there are no
+  /// locations), attempt to reconstruct the object by resubmitting the task
+  /// that created the object. If the task resubmission fails, then fail the
+  /// recovery operation.
+  /// 4. If task resubmission succeeds, recursively attempt to recover any
+  /// plasma arguments to the task. The recovery operation will succeed once
+  /// the task completes and stores a new value for its return object.
+  ///
+  /// \return OK if recovery for the object has successfully started, Invalid
+  /// if the object is not recoverable because we do not own it. Note that the
+  /// Status::OK value only indicates that the recovery operation has started,
+  /// but does not guarantee that the recovery operation is successful.
+  Status RecoverObject(const ObjectID &object_id);
+
+ private:
+  /// Pin a new copy for a lost object from the given locations or, if that
+  /// fails, attempt to reconstruct it by resubmitting the task that created
+  /// the object.
+ void PinOrReconstructObject(const ObjectID &object_id, + const std::vector &locations); + + /// Pin a new copy for the object at the given location. If that fails, then + /// try one of the other locations. + void PinExistingObjectCopy(const ObjectID &object_id, + const rpc::Address &raylet_address, + const std::vector &other_locations); + + /// Reconstruct an object by resubmitting the task that created it. + void ReconstructObject(const ObjectID &object_id); + + /// Used to resubmit tasks. + std::shared_ptr task_resubmitter_; + + /// Used to check whether we own an object. + std::shared_ptr reference_counter_; + + /// Address of our RPC server. + rpc::Address rpc_address_; + + /// Factory for producing new clients to pin objects at remote nodes. + const ObjectPinningClientFactoryFn client_factory_; + + // Client that can be used to pin objects from the local raylet. + std::shared_ptr local_object_pinning_client_; + + /// Function to lookup an object's locations from the global database. + const std::function + object_lookup_; + + /// Used to store object values (InPlasmaError) if recovery succeeds. + std::shared_ptr in_memory_store_; + + /// Callback to call if recovery fails. + const std::function + reconstruction_failure_callback_; + + /// Whether lineage reconstruction is enabled. If disabled, then we will try + /// to pin new copies for a lost object, but we will never reconstruct it + /// through task submission. + const bool lineage_reconstruction_enabled_; + + /// Protects below fields. + mutable absl::Mutex mu_; + + /// Cache of gRPC clients to remote raylets for pinning objects. + absl::flat_hash_map> + remote_object_pinning_clients_ GUARDED_BY(mu_); + + /// Objects that are currently pending recovery. Calls to RecoverObject for + /// objects currently in this set are idempotent. 
+ absl::flat_hash_set objects_pending_recovery_ GUARDED_BY(mu_); +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_OBJECT_RECOVERY_MANAGER_H diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index b3b86dae3..a41b618f2 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -234,6 +234,16 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( deleted); } +void ReferenceCounter::UpdateResubmittedTaskReferences( + const std::vector &argument_ids) { + absl::MutexLock lock(&mutex_); + for (const ObjectID &argument_id : argument_ids) { + auto it = object_id_refs_.find(argument_id); + RAY_CHECK(it != object_id_refs_.end()); + it->second.submitted_task_ref_count++; + } +} + void ReferenceCounter::UpdateFinishedTaskReferences( const std::vector &argument_ids, bool release_lineage, const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs, @@ -323,8 +333,12 @@ bool ReferenceCounter::GetOwner(const ObjectID &object_id, TaskID *owner_id, } if (it->second.owner.has_value()) { - *owner_id = it->second.owner.value().first; - *owner_address = it->second.owner.value().second; + if (owner_id) { + *owner_id = it->second.owner.value().first; + } + if (owner_address) { + *owner_address = it->second.owner.value().second; + } return true; } else { return false; @@ -404,6 +418,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, RAY_LOG(DEBUG) << "Calling on_delete for object " << id; it->second.on_delete(id); it->second.on_delete = nullptr; + it->second.pinned_at_raylet_id.reset(); } if (deleted) { deleted->push_back(id); @@ -430,12 +445,66 @@ bool ReferenceCounter::SetDeleteCallback( auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { return false; + } else if (it->second.OutOfScope() && + !it->second.ShouldDelete(lineage_pinning_enabled_)) { + // The object has already gone out of scope but cannot be deleted yet. Do + // not set the deletion callback because it may never get called. + return false; } + RAY_CHECK(!it->second.on_delete) << object_id; it->second.on_delete = callback; return true; } +std::vector ReferenceCounter::ResetObjectsOnRemovedNode( + const ClientID &raylet_id) { + absl::MutexLock lock(&mutex_); + std::vector lost_objects; + for (auto &it : object_id_refs_) { + const auto &object_id = it.first; + auto &ref = it.second; + if (ref.pinned_at_raylet_id.value_or(ClientID::Nil()) == raylet_id) { + lost_objects.push_back(object_id); + ref.pinned_at_raylet_id.reset(); + if (ref.on_delete) { + ref.on_delete(object_id); + ref.on_delete = nullptr; + } + } + } + return lost_objects; +} + +void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, + const ClientID &raylet_id) { + absl::MutexLock lock(&mutex_); + auto it = object_id_refs_.find(object_id); + if (it != object_id_refs_.end()) { + // The object is still in scope. Track the raylet location until the object + // has gone out of scope or the raylet fails, whichever happens first. + RAY_CHECK(!it->second.pinned_at_raylet_id.has_value()); + // Only the owner tracks the location. 
+ RAY_CHECK(it->second.owned_by_us); + if (!it->second.OutOfScope()) { + it->second.pinned_at_raylet_id = raylet_id; + } + } +} + +bool ReferenceCounter::IsPlasmaObjectPinned(const ObjectID &object_id, + bool *pinned) const { + absl::MutexLock lock(&mutex_); + auto it = object_id_refs_.find(object_id); + if (it != object_id_refs_.end()) { + if (it->second.owned_by_us) { + *pinned = it->second.pinned_at_raylet_id.has_value(); + return true; + } + } + return false; +} + bool ReferenceCounter::HasReference(const ObjectID &object_id) const { absl::MutexLock lock(&mutex_); return object_id_refs_.find(object_id) != object_id_refs_.end(); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 30a9fcb11..122e864ea 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -86,6 +86,14 @@ class ReferenceCounter { const std::vector &argument_ids_to_remove = std::vector(), std::vector *deleted = nullptr) LOCKS_EXCLUDED(mutex_); + /// Add references for the object dependencies of a resubmitted task. This + /// does not increment the arguments' lineage ref counts because we should + /// have already incremented them when the task was first submitted. + /// + /// \param[in] argument_ids The arguments of the task to add references for. + void UpdateResubmittedTaskReferences(const std::vector &argument_ids) + LOCKS_EXCLUDED(mutex_); + /// Update object references that were given to a submitted task. The task /// may still be borrowing any object IDs that were contained in its /// arguments. This should be called when inlined dependencies are inlined or @@ -165,8 +173,8 @@ class ReferenceCounter { /// \param[in] object_id The ID of the object to look up. /// \param[out] owner_id The TaskID of the object owner. /// \param[out] owner_address The address of the object owner. - bool GetOwner(const ObjectID &object_id, TaskID *owner_id, - rpc::Address *owner_address) const LOCKS_EXCLUDED(mutex_); + bool GetOwner(const ObjectID &object_id, TaskID *owner_id = nullptr, + rpc::Address *owner_address = nullptr) const LOCKS_EXCLUDED(mutex_); /// Manually delete the objects from the reference counter. void DeleteReferences(const std::vector &object_ids) LOCKS_EXCLUDED(mutex_); @@ -177,6 +185,9 @@ class ReferenceCounter { const std::function callback) LOCKS_EXCLUDED(mutex_); + void ResetDeleteCallbacks(const std::vector &object_ids) + LOCKS_EXCLUDED(mutex_); + /// Set a callback for when we are no longer borrowing this object (when our /// ref count goes to 0). /// @@ -267,6 +278,33 @@ class ReferenceCounter { const std::vector &inner_ids, const rpc::WorkerAddress &owner_address) LOCKS_EXCLUDED(mutex_); + /// Update the pinned location of an object stored in plasma. + /// + /// \param[in] object_id The object to update. + /// \param[in] raylet_id The raylet that is now pinning the object ID. + void UpdateObjectPinnedAtRaylet(const ObjectID &object_id, const ClientID &raylet_id) + LOCKS_EXCLUDED(mutex_); + + /// Check whether the object is pinned at a remote plasma store node. + /// + /// \param[in] object_id The object to check. + /// \param[out] pinned Whether the object was pinned at a remote plasma store + /// node. + /// \return True if the object exists and is owned by us, false otherwise. We + /// return false here because a borrower should not know the pinned location + /// for an object. 
+ bool IsPlasmaObjectPinned(const ObjectID &object_id, bool *pinned) const + LOCKS_EXCLUDED(mutex_); + + /// Get and reset the objects that were pinned on the given node. This + /// method should be called upon a node failure, to determine which plasma + /// objects were lost. If a deletion callback was set for a lost object, it + /// will be invoked and reset. + /// + /// \param[in] node_id The node whose object store has been removed. + /// \return The set of objects that were pinned on the given node. + std::vector ResetObjectsOnRemovedNode(const ClientID &raylet_id); + /// Whether we have a reference to a particular ObjectID. /// /// \param[in] object_id The object ID to check for. @@ -350,6 +388,10 @@ class ReferenceCounter { /// if we do not know the object's owner (because distributed ref counting /// is not yet implemented). absl::optional> owner; + // If this object is owned by us and stored in plasma, and reference + // counting is enabled, then some raylet must be pinning the object value. + // This is the address of that raylet. + absl::optional pinned_at_raylet_id; /// The local ref count for the ObjectID in the language frontend. size_t local_ref_count = 0; diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index ef5da8ed5..4c2757828 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -1800,6 +1800,11 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { // The task finishes but is retryable. rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out); + // We should fail to set the deletion callback because the object has + // already gone out of scope. + ASSERT_FALSE(rc->SetDeleteCallback( + id, [&](const ObjectID &object_id) { ASSERT_FALSE(true); })); + ASSERT_EQ(out.size(), 1); out.clear(); ASSERT_TRUE(lineage_deleted.empty()); @@ -1815,6 +1820,81 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { ASSERT_EQ(rc->NumObjectIDsInScope(), 0); } +TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) { + std::vector out; + std::vector lineage_deleted; + + ObjectID id = ObjectID::FromRandom(); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + + rc->SetReleaseLineageCallback( + [&](const ObjectID &object_id, std::vector *ids_to_release) { + lineage_deleted.push_back(object_id); + }); + + // Local references. + rc->AddLocalReference(id, ""); + ASSERT_TRUE(rc->HasReference(id)); + + // Submit 2 dependent tasks. + rc->UpdateSubmittedTaskReferences({id}); + rc->UpdateSubmittedTaskReferences({id}); + rc->RemoveLocalReference(id, nullptr); + ASSERT_TRUE(rc->HasReference(id)); + + // Both tasks finish, 1 is retryable. + rc->UpdateFinishedTaskReferences({id}, true, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out); + // The dependency is no longer in scope, but we still keep a reference to it + // because it is in the lineage of the retryable task. + ASSERT_EQ(out.size(), 1); + ASSERT_TRUE(rc->HasReference(id)); + + // Simulate retrying the task. 
+ rc->UpdateResubmittedTaskReferences({id}); + rc->UpdateFinishedTaskReferences({id}, true, empty_borrower, empty_refs, &out); + ASSERT_FALSE(rc->HasReference(id)); + ASSERT_EQ(lineage_deleted.size(), 1); +} + +TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { + auto deleted = std::make_shared>(); + auto callback = [&](const ObjectID &object_id) { deleted->insert(object_id); }; + + ObjectID borrowed_id = ObjectID::FromRandom(); + rc->AddLocalReference(borrowed_id, ""); + bool pinned = false; + ASSERT_FALSE(rc->IsPlasmaObjectPinned(borrowed_id, &pinned)); + + ObjectID id = ObjectID::FromRandom(); + ClientID node_id = ClientID::FromRandom(); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddLocalReference(id, ""); + ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); + ASSERT_FALSE(pinned); + rc->UpdateObjectPinnedAtRaylet(id, node_id); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); + ASSERT_TRUE(pinned); + + rc->RemoveLocalReference(id, nullptr); + ASSERT_FALSE(rc->IsPlasmaObjectPinned(id, &pinned)); + ASSERT_TRUE(deleted->count(id) > 0); + deleted->clear(); + + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddLocalReference(id, ""); + ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); + rc->UpdateObjectPinnedAtRaylet(id, node_id); + auto objects = rc->ResetObjectsOnRemovedNode(node_id); + ASSERT_EQ(objects.size(), 1); + ASSERT_EQ(objects[0], id); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); + ASSERT_FALSE(pinned); + ASSERT_TRUE(deleted->count(id) > 0); + deleted->clear(); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 7967fa6d9..c7b7b01c0 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -154,7 +154,7 @@ class CoreWorkerMemoryStore { std::shared_ptr raylet_client_ = nullptr; /// Protects the data structures below. - absl::Mutex mu_; + mutable absl::Mutex mu_; /// Set of objects that should be promoted to plasma once available. 
absl::flat_hash_set promoted_to_plasma_ GUARDED_BY(mu_); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 0a7c9da3a..a314f7749 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -78,6 +78,50 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, } } +Status TaskManager::ResubmitTask(const TaskID &task_id, + std::vector *task_deps) { + TaskSpecification spec; + bool resubmit = false; + { + absl::MutexLock lock(&mu_); + auto it = submissible_tasks_.find(task_id); + if (it == submissible_tasks_.end()) { + return Status::Invalid("Task spec missing"); + } + + if (!it->second.pending) { + resubmit = true; + it->second.pending = true; + RAY_CHECK(it->second.num_retries_left > 0); + it->second.num_retries_left--; + spec = it->second.spec; + } + } + + for (size_t i = 0; i < spec.NumArgs(); i++) { + if (spec.ArgByRef(i)) { + for (size_t j = 0; j < spec.ArgIdCount(i); j++) { + task_deps->push_back(spec.ArgId(i, j)); + } + } else { + const auto &inlined_ids = spec.ArgInlinedIds(i); + for (const auto &inlined_id : inlined_ids) { + task_deps->push_back(inlined_id); + } + } + } + + if (!task_deps->empty()) { + reference_counter_->UpdateResubmittedTaskReferences(*task_deps); + } + + if (resubmit) { + retry_task_callback_(spec, /*delay=*/false); + } + + return Status::OK(); +} + void TaskManager::DrainAndShutdown(std::function shutdown) { bool has_pending_tasks = false; { @@ -127,6 +171,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, RAY_LOG(DEBUG) << "Completing task " << task_id; std::vector direct_return_ids; + std::vector plasma_return_ids; for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); @@ -136,7 +181,14 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, // Mark it as in plasma with a dummy object. RAY_CHECK( in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); + const auto pinned_at_raylet_id = ClientID::FromBinary(worker_addr.raylet_id()); + reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); } else { + // NOTE(swang): If a direct object was promoted to plasma, then we do not + // record the node ID that it was pinned at, which means that we will not + // be able to reconstruct it if the plasma object copy is lost. However, + // this is okay because the pinned copy is on the local node, so we will + // fate-share with the object if the local node fails. std::shared_ptr data_buffer; if (return_object.data().size() > 0) { data_buffer = std::make_shared( @@ -235,7 +287,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ if (num_retries_left > 0) { RAY_LOG(ERROR) << num_retries_left << " retries left for task " << spec.TaskId() << ", attempting to resubmit."; - retry_task_callback_(spec); + retry_task_callback_(spec, /*delay=*/true); } else { // Throttled logging of task failure errors. 
{ @@ -380,11 +432,4 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, } } -TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const { - absl::MutexLock lock(&mu_); - auto it = submissible_tasks_.find(task_id); - RAY_CHECK(it != submissible_tasks_.end()); - return it->second.spec; -} - } // namespace ray diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 1743a93ff..d9c74fd07 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -42,9 +42,17 @@ class TaskFinisherInterface { virtual ~TaskFinisherInterface() {} }; -using RetryTaskCallback = std::function; +class TaskResubmissionInterface { + public: + virtual Status ResubmitTask(const TaskID &task_id, + std::vector *task_deps) = 0; -class TaskManager : public TaskFinisherInterface { + virtual ~TaskResubmissionInterface() {} +}; + +using RetryTaskCallback = std::function; + +class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { public: TaskManager(std::shared_ptr in_memory_store, std::shared_ptr reference_counter, @@ -73,6 +81,19 @@ class TaskManager : public TaskFinisherInterface { const TaskSpecification &spec, const std::string &call_site, int max_retries = 0); + /// Resubmit a task that has completed execution before. This is used to + /// reconstruct objects stored in Plasma that were lost. + /// + /// \param[in] task_id The ID of the task to resubmit. + /// \param[out] task_deps The object dependencies of the resubmitted task, + /// i.e. all arguments that were not inlined in the task spec. The caller is + /// responsible for making sure that these dependencies become available, so + /// that the resubmitted task can run. This is only populated if the task was + /// not already pending and was successfully resubmitted. + /// \return OK if the task was successfully resubmitted or was + /// already pending, Invalid if the task spec is no longer present. + Status ResubmitTask(const TaskID &task_id, std::vector *task_deps); + /// Wait for all pending tasks to finish, and then shutdown. /// /// \param shutdown The shutdown callback to call. @@ -108,9 +129,6 @@ class TaskManager : public TaskFinisherInterface { void OnTaskDependenciesInlined(const std::vector &inlined_dependency_ids, const std::vector &contained_ids) override; - /// Return the spec for a pending task. - TaskSpecification GetTaskSpec(const TaskID &task_id) const; - /// Return whether this task can be submitted for execution. /// /// \param[in] task_id ID of the task to query. diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc new file mode 100644 index 000000000..9f69ba93f --- /dev/null +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -0,0 +1,238 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/core_worker/transport/direct_task_transport.h" + +#include "gtest/gtest.h" +#include "ray/common/task/task_spec.h" +#include "ray/common/task/task_util.h" +#include "ray/common/test_util.h" +#include "ray/core_worker/object_recovery_manager.h" +#include "ray/core_worker/store_provider/memory_store/memory_store.h" +#include "ray/raylet/raylet_client.h" + +namespace ray { + +// Used to prevent leases from timing out when not testing that logic. It would +// be better to use a mock clock or lease manager interface, but that's high +// overhead for the very simple timeout logic we currently have. +int64_t kLongTimeout = 1024 * 1024 * 1024; + +class MockTaskResubmitter : public TaskResubmissionInterface { + public: + MockTaskResubmitter() {} + + void AddTask(const TaskID &task_id, std::vector task_deps) { + task_specs[task_id] = task_deps; + } + + Status ResubmitTask(const TaskID &task_id, std::vector *task_deps) { + if (task_specs.find(task_id) == task_specs.end()) { + return Status::Invalid(""); + } + + for (const auto &dep : task_specs[task_id]) { + task_deps->push_back(dep); + } + num_tasks_resubmitted++; + return Status::OK(); + } + + std::unordered_map> task_specs; + int num_tasks_resubmitted = 0; +}; + +class MockRayletClient : public PinObjectsInterface { + public: + ray::Status PinObjectIDs( + const rpc::Address &caller_address, const std::vector &object_ids, + const ray::rpc::ClientCallback &callback) override { + RAY_LOG(INFO) << "PinObjectIDs " << object_ids.size(); + callbacks.push_back(callback); + return Status::OK(); + } + + size_t Flush() { + size_t flushed = callbacks.size(); + for (const auto &callback : callbacks) { + callback(Status::OK(), rpc::PinObjectIDsReply()); + } + callbacks.clear(); + return flushed; + } + + std::list> callbacks = {}; +}; + +class MockObjectDirectory { + public: + void AsyncGetLocations(const ObjectID &object_id, + const ObjectLookupCallback &callback) { + callbacks.push_back({object_id, callback}); + } + + void SetLocations(const ObjectID &object_id, + const std::vector &addresses) { + locations[object_id] = addresses; + } + + size_t Flush() { + size_t flushed = callbacks.size(); + for (const auto &pair : callbacks) { + pair.second(pair.first, locations[pair.first]); + } + for (size_t i = 0; i < flushed; i++) { + callbacks.erase(callbacks.begin()); + } + return flushed; + } + + std::vector> callbacks = {}; + std::unordered_map> locations; +}; + +class ObjectRecoveryManagerTest : public ::testing::Test { + public: + ObjectRecoveryManagerTest() + : local_raylet_id_(ClientID::FromRandom()), + object_directory_(std::make_shared()), + memory_store_(std::make_shared()), + raylet_client_(std::make_shared()), + task_resubmitter_(std::make_shared()), + ref_counter_(std::make_shared( + rpc::Address(), /*distributed_ref_counting_enabled=*/true, + /*lineage_pinning_enabled=*/true)), + manager_(rpc::Address(), + [&](const std::string &ip, int port) { return raylet_client_; }, + raylet_client_, + [&](const ObjectID &object_id, const ObjectLookupCallback &callback) { + object_directory_->AsyncGetLocations(object_id, callback); + return Status::OK(); + }, + task_resubmitter_, ref_counter_, memory_store_, + [&](const ObjectID &object_id, bool pin_object) { + RAY_CHECK(failed_reconstructions_.count(object_id) == 0); + failed_reconstructions_[object_id] = pin_object; + + std::string meta = + std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); + auto metadata = const_cast( + reinterpret_cast(meta.data())); + auto meta_buffer = + 
std::make_shared(metadata, meta.size()); + auto data = RayObject(nullptr, meta_buffer, std::vector()); + RAY_CHECK(memory_store_->Put(data, object_id)); + + ref_counter_->UpdateObjectPinnedAtRaylet(object_id, local_raylet_id_); + }, + /*lineage_reconstruction_enabled=*/true) {} + + ClientID local_raylet_id_; + std::unordered_map failed_reconstructions_; + + std::shared_ptr object_directory_; + std::shared_ptr memory_store_; + std::shared_ptr raylet_client_; + std::shared_ptr task_resubmitter_; + std::shared_ptr ref_counter_; + ObjectRecoveryManager manager_; +}; + +TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(failed_reconstructions_.empty()); + ASSERT_TRUE(object_directory_->Flush() == 1); + ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); +} + +TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + std::vector addresses({rpc::Address()}); + object_directory_->SetLocations(object_id, addresses); + + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(object_directory_->Flush() == 1); + ASSERT_TRUE(raylet_client_->Flush() == 1); + ASSERT_TRUE(failed_reconstructions_.empty()); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); +} + +TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + task_resubmitter_->AddTask(object_id.TaskId(), {}); + + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(object_directory_->Flush() == 1); + + ASSERT_TRUE(failed_reconstructions_.empty()); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 1); +} + +TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ref_counter_->AddLocalReference(object_id, ""); + + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + // A second attempt to recover the object will not trigger any more + // callbacks. + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(object_directory_->Flush() == 1); + failed_reconstructions_.clear(); + + // The object has been marked as failed. Another attempt to recover the + // object will not trigger any callbacks. + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_EQ(object_directory_->Flush(), 0); + + // The object is removed and can be recovered again. 
+ auto objects = ref_counter_->ResetObjectsOnRemovedNode(local_raylet_id_); + ASSERT_EQ(objects.size(), 1); + ASSERT_EQ(objects[0], object_id); + memory_store_->Delete(objects); + ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(object_directory_->Flush() == 1); +} + +TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { + std::vector object_ids; + std::vector dependencies; + for (int i = 0; i < 3; i++) { + ObjectID object_id = ObjectID::FromRandom(); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + task_resubmitter_->AddTask(object_id.TaskId(), dependencies); + dependencies = {object_id}; + object_ids.push_back(object_id); + } + + ASSERT_TRUE(manager_.RecoverObject(object_ids.back()).ok()); + for (int i = 0; i < 3; i++) { + RAY_LOG(INFO) << i; + ASSERT_EQ(object_directory_->Flush(), 1); + ASSERT_TRUE(failed_reconstructions_.empty()); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, i + 1); + } +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 74e98e8fb..9e9e0d48f 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -51,7 +51,7 @@ class TaskManagerTest : public ::testing::Test { /*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))), actor_manager_(std::shared_ptr(new MockActorManager())), manager_(store_, reference_counter_, actor_manager_, - [this](const TaskSpecification &spec) { + [this](const TaskSpecification &spec, bool delay) { num_retries_++; return Status::OK(); }) {} @@ -421,6 +421,65 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); } +// Test to make sure that the task manager only resubmits tasks whose specs are +// pinned and that are not already pending execution. +TEST_F(TaskManagerLineageTest, TestResubmitTask) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + // Submit a task with 2 arguments. + ObjectID dep1 = ObjectID::FromRandom(); + ObjectID dep2 = ObjectID::FromRandom(); + auto spec = CreateTaskHelper(1, {dep1, dep2}); + int num_retries = 3; + + // Cannot resubmit a task whose spec we do not have. + std::vector resubmitted_task_deps; + ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); + ASSERT_TRUE(resubmitted_task_deps.empty()); + ASSERT_EQ(num_retries_, 0); + + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + // A task that is already pending does not get resubmitted. + ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); + ASSERT_TRUE(resubmitted_task_deps.empty()); + ASSERT_EQ(num_retries_, 0); + + // The task completes. + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + reference_counter_->AddLocalReference(return_id, ""); + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + auto data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + return_object->set_in_plasma(true); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + + // The task finished, its return ID is still in scope, and the return object + // was stored in plasma. It is okay to resubmit it now. 
+ ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); + ASSERT_EQ(resubmitted_task_deps, spec.GetDependencies()); + ASSERT_EQ(num_retries_, 1); + resubmitted_task_deps.clear(); + + // The return ID goes out of scope. + reference_counter_->RemoveLocalReference(return_id, nullptr); + // The task is still pending execution. + ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); + // A task that is already pending does not get resubmitted. + ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); + ASSERT_TRUE(resubmitted_task_deps.empty()); + ASSERT_EQ(num_retries_, 1); + + // The resubmitted task finishes. + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + // The task cannot be resubmitted because its spec has been released. + ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); + ASSERT_TRUE(resubmitted_task_deps.empty()); + ASSERT_EQ(num_retries_, 1); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e0be0a8a1..292f653ee 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -162,6 +162,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, node_manager_service_(io_service, *this), client_call_manager_(io_service), new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()) { + RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. cluster_resource_map_.emplace(self_node_id_, @@ -3371,10 +3372,13 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, } RAY_LOG(DEBUG) << "Pinning object " << object_id; - pinned_objects_.emplace( - object_id, std::unique_ptr(new RayObject( - std::make_shared(plasma_results[i].data), - std::make_shared(plasma_results[i].metadata), {}))); + RAY_CHECK( + pinned_objects_ + .emplace(object_id, + std::unique_ptr(new RayObject( + std::make_shared(plasma_results[i].data), + std::make_shared(plasma_results[i].metadata), {}))) + .second); i++; // Send a long-running RPC request to the owner for each object. When we get a @@ -3394,7 +3398,9 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, pinned_objects_.erase(object_id); // Try to evict all copies of the object from the cluster. - objects_to_free_.push_back(object_id); + if (free_objects_period_ >= 0) { + objects_to_free_.push_back(object_id); + } if (objects_to_free_.size() == RayConfig::instance().free_objects_batch_size() || free_objects_period_ == 0) { @@ -3411,10 +3417,6 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, } void NodeManager::FlushObjectsToFree() { - if (free_objects_period_ < 0) { - return; - } - if (!objects_to_free_.empty()) { RAY_LOG(DEBUG) << "Freeing " << objects_to_free_.size() << " out-of-scope objects"; object_manager_.FreeObjects(objects_to_free_, /*local_only=*/false); diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index 0c1acb5c8..e82d6cc4d 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -48,6 +48,17 @@ namespace ray { typedef boost::asio::generic::stream_protocol local_stream_protocol; typedef boost::asio::basic_stream_socket local_stream_socket; +/// Interface for pinning objects. Abstract for testing. 
+class PinObjectsInterface { + public: + /// Request to a raylet to pin a plasma object. The callback will be sent via gRPC. + virtual ray::Status PinObjectIDs( + const rpc::Address &caller_address, const std::vector &object_ids, + const ray::rpc::ClientCallback &callback) = 0; + + virtual ~PinObjectsInterface(){}; +}; + /// Interface for leasing workers. Abstract for testing. class WorkerLeaseInterface { public: @@ -129,7 +140,9 @@ class RayletConnection { std::mutex write_mutex_; }; -class RayletClient : public WorkerLeaseInterface, public DependencyWaiterInterface { +class RayletClient : public PinObjectsInterface, + public WorkerLeaseInterface, + public DependencyWaiterInterface { public: /// Connect to the raylet. /// @@ -287,7 +300,7 @@ class RayletClient : public WorkerLeaseInterface, public DependencyWaiterInterfa ray::Status PinObjectIDs( const rpc::Address &caller_address, const std::vector &object_ids, - const ray::rpc::ClientCallback &callback); + const ray::rpc::ClientCallback &callback) override; ray::Status GlobalGC(const rpc::ClientCallback &callback);
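
The new `test_reconstruction.py` cases above drive this recovery path end to end. Below is a minimal standalone sketch of the user-visible behavior they check; it is not part of the change itself and assumes the same `ray.cluster_utils.Cluster` helper and `_internal_config` keys used in the tests above, with lineage pinning enabled and `max_retries > 0` on the producing task so that it is eligible for resubmission:

```python
import json

import numpy as np

import ray
from ray.cluster_utils import Cluster

# Same config keys as the new tests: enable lineage pinning so the owner keeps
# the task spec for plasma objects and can resubmit it if the only pinned copy
# is lost.
config = json.dumps({
    "num_heartbeats_timeout": 10,
    "raylet_heartbeat_timeout_milliseconds": 100,
    "lineage_pinning_enabled": 1,
    "free_objects_period_milliseconds": -1,
})

cluster = Cluster()
# Head node with no resources; it owns the driver's objects.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node that will hold the only pinned copy of the object.
node_to_kill = cluster.add_node(
    num_cpus=1,
    resources={"node1": 1},
    object_store_memory=10**8,
    _internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address, _internal_config=config)


# max_retries > 0 makes the task resubmittable during recovery.
@ray.remote(max_retries=1)
def large_object():
    return np.zeros(10**7, dtype=np.uint8)


@ray.remote
def dependent_task(x):
    return


obj = large_object.options(resources={"node1": 1}).remote()
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))

# Kill the node that pinned the only copy, then bring up a replacement.
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
    num_cpus=1,
    resources={"node1": 1},
    object_store_memory=10**8,
    _internal_config=config)

# The owner detects the node failure, resubmits large_object(), and the
# dependent task runs against the reconstructed value instead of failing.
ray.get(dependent_task.remote(obj))
```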
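
When lineage pinning is disabled, or the producing task was declared with `max_retries=0`, the recovery manager cannot resubmit the task, so its failure callback stores an `OBJECT_UNRECONSTRUCTABLE` error for the object instead. A short follow-up to the sketch above, mirroring the disabled branch of the new tests:

```python
import pytest

# Assumes the cluster, tasks, and `obj` from the previous sketch, but started
# with "lineage_pinning_enabled": 0 in `config` and max_retries=0 on
# large_object(). After the node holding the only copy is removed, the
# dependent task fails; the wrapped cause is
# ray.exceptions.UnreconstructableError.
with pytest.raises(ray.exceptions.RayTaskError):
    ray.get(dependent_task.remote(obj))
```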