From c9314098b90d197884ce4e52639aff1dd45e6adc Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Mon, 25 Nov 2019 14:48:19 -0700 Subject: [PATCH] Implement direct task worker lease timeouts (#6188) --- src/ray/common/ray_config_def.h | 4 + src/ray/core_worker/core_worker.cc | 2 +- .../test/direct_task_transport_test.cc | 109 ++++++++++++++---- .../transport/direct_task_transport.cc | 10 +- .../transport/direct_task_transport.h | 15 ++- 5 files changed, 106 insertions(+), 34 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 0387b2c4f..cd3b03182 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -50,6 +50,10 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024) /// this many milliseconds. RAY_CONFIG(int64_t, initial_reconstruction_timeout_milliseconds, 10000) +/// The maximum duration that workers can hold on to another worker's lease +/// for direct task submission until it must be returned to the raylet. +RAY_CONFIG(int64_t, worker_lease_timeout_milliseconds, 500) + /// The duration between heartbeats sent from the workers to the raylet. /// If set to a negative value, the heartbeats will not be sent. 
/// These are used to report active object IDs for garbage collection and diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8c63e43cb..8df0ab721 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -208,7 +208,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, return std::shared_ptr( new RayletClient(std::move(grpc_client))); }, - memory_store_)); + memory_store_, RayConfig::instance().worker_lease_timeout_milliseconds())); } CoreWorker::~CoreWorker() { diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 024d8a250..80fdbf5d3 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -9,6 +9,11 @@ namespace ray { +// Used to prevent leases from timing out when not testing that logic. It would +// be better to use a mock clock or lease manager interface, but that's high +// overhead for the very simple timeout logic we currently have. 
+int64_t kLongTimeout = 1024 * 1024 * 1024; + class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: ray::Status PushNormalTask( @@ -73,8 +78,8 @@ class MockRayletClient : public WorkerLeaseInterface { TEST(TestMemoryStore, TestPromoteToPlasma) { bool num_plasma_puts = 0; - auto mem = std::shared_ptr(new CoreWorkerMemoryStore( - [&](const RayObject &obj, const ObjectID &obj_id) { num_plasma_puts += 1; })); + auto mem = std::make_shared( + [&](const RayObject &obj, const ObjectID &obj_id) { num_plasma_puts += 1; }); ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); auto data = GenerateRandomObject(); @@ -96,7 +101,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) { } TEST(LocalDependencyResolverTest, TestNoDependencies) { - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto store = std::make_shared(); LocalDependencyResolver resolver(store); TaskSpecification task; bool ok = false; @@ -105,7 +110,7 @@ TEST(LocalDependencyResolverTest, TestNoDependencies) { } TEST(LocalDependencyResolverTest, TestIgnorePlasmaDependencies) { - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto store = std::make_shared(); LocalDependencyResolver resolver(store); ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::RAYLET); TaskSpecification task; @@ -118,7 +123,7 @@ TEST(LocalDependencyResolverTest, TestIgnorePlasmaDependencies) { } TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) { - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto store = std::make_shared(); LocalDependencyResolver resolver(store); ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); @@ -139,7 +144,7 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) { } 
TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) { - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto store = std::make_shared(); LocalDependencyResolver resolver(store); ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); @@ -162,7 +167,7 @@ TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) { } TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) { - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto store = std::make_shared(); LocalDependencyResolver resolver(store); ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); @@ -188,10 +193,11 @@ TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) { TEST(DirectTaskTransportTest, TestSubmitOneTask) { auto raylet_client = std::make_shared(); - auto worker_client = std::shared_ptr(new MockWorkerClient()); - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; }; - CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store); + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + kLongTimeout); TaskSpecification task; task.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); @@ -210,10 +216,11 @@ TEST(DirectTaskTransportTest, TestSubmitOneTask) { TEST(DirectTaskTransportTest, TestHandleTaskFailure) { auto raylet_client = std::make_shared(); - auto worker_client = std::shared_ptr(new MockWorkerClient()); - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); auto factory = [&](const rpc::WorkerAddress &addr) { return 
worker_client; }; - CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store); + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + kLongTimeout); TaskSpecification task; task.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); @@ -228,10 +235,11 @@ TEST(DirectTaskTransportTest, TestHandleTaskFailure) { TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) { auto raylet_client = std::make_shared(); - auto worker_client = std::shared_ptr(new MockWorkerClient()); - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; }; - CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store); + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + kLongTimeout); TaskSpecification task1; TaskSpecification task2; TaskSpecification task3; @@ -269,10 +277,11 @@ TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) { TEST(DirectTaskTransportTest, TestReuseWorkerLease) { auto raylet_client = std::make_shared(); - auto worker_client = std::shared_ptr(new MockWorkerClient()); - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; }; - CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store); + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + kLongTimeout); TaskSpecification task1; TaskSpecification task2; TaskSpecification task3; @@ -312,10 +321,11 @@ TEST(DirectTaskTransportTest, TestReuseWorkerLease) { TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) { auto raylet_client = std::make_shared(); - auto worker_client = std::shared_ptr(new MockWorkerClient()); - auto store = std::shared_ptr(new 
CoreWorkerMemoryStore()); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; }; - CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store); + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + kLongTimeout); TaskSpecification task1; TaskSpecification task2; task1.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); @@ -346,8 +356,8 @@ TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) { TEST(DirectTaskTransportTest, TestSpillback) { auto raylet_client = std::make_shared(); - auto worker_client = std::shared_ptr(new MockWorkerClient()); - auto store = std::shared_ptr(new CoreWorkerMemoryStore()); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; }; std::unordered_map> remote_lease_clients; @@ -360,7 +370,7 @@ TEST(DirectTaskTransportTest, TestSpillback) { return client; }; CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, lease_client_factory, - store); + store, kLongTimeout); TaskSpecification task; task.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); @@ -390,6 +400,55 @@ TEST(DirectTaskTransportTest, TestSpillback) { ASSERT_EQ(remote_lease_clients[remote_raylet_id]->num_workers_disconnected, 0); } +TEST(DirectTaskTransportTest, TestWorkerLeaseTimeout) { + auto raylet_client = std::make_shared(); + auto worker_client = std::make_shared(); + auto store = std::make_shared(); + auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; }; + CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store, + /*lease_timeout_ms=*/5); + TaskSpecification task1; + TaskSpecification task2; + TaskSpecification task3; + task1.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); + task2.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); + 
task3.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); + + ASSERT_TRUE(submitter.SubmitTask(task1).ok()); + ASSERT_TRUE(submitter.SubmitTask(task2).ok()); + ASSERT_TRUE(submitter.SubmitTask(task3).ok()); + ASSERT_EQ(raylet_client->num_workers_requested, 1); + + // Task 1 is pushed. + ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1000, ClientID::Nil())); + ASSERT_EQ(worker_client->callbacks.size(), 1); + ASSERT_EQ(raylet_client->num_workers_requested, 2); + + // Task 1 finishes with failure; the worker is returned due to the error even though + // it hasn't timed out. + worker_client->callbacks[0](Status::IOError("worker dead"), rpc::PushTaskReply()); + ASSERT_EQ(worker_client->callbacks.size(), 1); + ASSERT_EQ(raylet_client->num_workers_returned, 0); + ASSERT_EQ(raylet_client->num_workers_disconnected, 1); + + // Task 2 runs successfully on the second worker; the worker is returned due to the + // timeout. + ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil())); + usleep(10 * 1000); // Sleep for 10ms, causing the lease to time out. + ASSERT_EQ(worker_client->callbacks.size(), 2); + worker_client->callbacks[1](Status::OK(), rpc::PushTaskReply()); + ASSERT_EQ(raylet_client->num_workers_returned, 1); + ASSERT_EQ(raylet_client->num_workers_disconnected, 1); + + // Task 3 runs successfully on the third worker; the worker is returned even though it + // hasn't timed out. 
+ ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1002, ClientID::Nil())); + ASSERT_EQ(worker_client->callbacks.size(), 3); + worker_client->callbacks[2](Status::OK(), rpc::PushTaskReply()); + ASSERT_EQ(raylet_client->num_workers_returned, 2); + ASSERT_EQ(raylet_client->num_workers_disconnected, 1); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index a3f1539de..418ee55e8 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -27,7 +27,9 @@ void CoreWorkerDirectTaskSubmitter::HandleWorkerLeaseGranted( std::shared_ptr(client_factory_(addr)); RAY_LOG(INFO) << "Connected to " << addr.first << ":" << addr.second; } - worker_to_lease_client_[addr] = std::move(lease_client); + int64_t expiration = current_time_ms() + lease_timeout_ms_; + worker_to_lease_client_.emplace(addr, + std::make_pair(std::move(lease_client), expiration)); } // Try to assign it work. 
 @@ -37,10 +39,10 @@ void CoreWorkerDirectTaskSubmitter::HandleWorkerLeaseGranted( void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr, bool was_error) { absl::MutexLock lock(&mu_); - if (queued_tasks_.empty() || was_error) { - auto lease_client = std::move(worker_to_lease_client_[addr]); + auto entry = worker_to_lease_client_[addr]; + if (was_error || queued_tasks_.empty() || current_time_ms() > entry.second) { + RAY_CHECK_OK(entry.first->ReturnWorker(addr.second, was_error)); worker_to_lease_client_.erase(addr); - RAY_CHECK_OK(lease_client->ReturnWorker(addr.second, was_error)); } else { auto &client = *client_cache_[addr]; PushNormalTask(addr, client, queued_tasks_.front()); diff --git a/src/ray/core_worker/transport/direct_task_transport.h b/src/ray/core_worker/transport/direct_task_transport.h index e672ed8d5..b9459ea55 100644 --- a/src/ray/core_worker/transport/direct_task_transport.h +++ b/src/ray/core_worker/transport/direct_task_transport.h @@ -23,12 +23,14 @@ class CoreWorkerDirectTaskSubmitter { CoreWorkerDirectTaskSubmitter(std::shared_ptr<WorkerLeaseInterface> lease_client, rpc::ClientFactoryFn client_factory, LeaseClientFactoryFn lease_client_factory, - std::shared_ptr<CoreWorkerMemoryStore> store) + std::shared_ptr<CoreWorkerMemoryStore> store, + int64_t lease_timeout_ms) : local_lease_client_(lease_client), client_factory_(client_factory), lease_client_factory_(lease_client_factory), in_memory_store_(store), - resolver_(in_memory_store_) {} + resolver_(in_memory_store_), + lease_timeout_ms_(lease_timeout_ms) {} /// Schedule a task for direct submission to a worker. /// @@ -85,6 +87,10 @@ class CoreWorkerDirectTaskSubmitter { /// Resolve local and remote dependencies; LocalDependencyResolver resolver_; + /// The timeout for worker leases; after this duration, workers will be returned + /// to the raylet. + int64_t lease_timeout_ms_; + // Protects task submission state below. 
 absl::Mutex mu_; @@ -93,8 +99,9 @@ class CoreWorkerDirectTaskSubmitter { client_cache_ GUARDED_BY(mu_); /// Map from worker address to the lease client through which it should be - /// returned. - absl::flat_hash_map<rpc::WorkerAddress, std::shared_ptr<WorkerLeaseInterface>> + /// returned and its lease expiration time. + absl::flat_hash_map<rpc::WorkerAddress, + std::pair<std::shared_ptr<WorkerLeaseInterface>, int64_t>> worker_to_lease_client_ GUARDED_BY(mu_); // Whether we have a request to the Raylet to acquire a new worker in flight.