Implement direct task worker lease timeouts (#6188)

This commit is contained in:
Edward Oakes 2019-11-25 14:48:19 -07:00 committed by GitHub
parent e72aef2ba6
commit c9314098b9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 106 additions and 34 deletions

View file

@ -50,6 +50,10 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
/// this many milliseconds.
RAY_CONFIG(int64_t, initial_reconstruction_timeout_milliseconds, 10000)
/// The maximum duration, in milliseconds, that workers can hold on to another
/// worker's lease for direct task submission until it must be returned to the
/// raylet. The direct task submitter compares the lease's expiration time
/// against the current time when the leased worker becomes idle.
RAY_CONFIG(int64_t, worker_lease_timeout_milliseconds, 500)
/// The duration between heartbeats sent from the workers to the raylet.
/// If set to a negative value, the heartbeats will not be sent.
/// These are used to report active object IDs for garbage collection and

View file

@ -208,7 +208,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
return std::shared_ptr<RayletClient>(
new RayletClient(std::move(grpc_client)));
},
memory_store_));
memory_store_, RayConfig::instance().worker_lease_timeout_milliseconds()));
}
CoreWorker::~CoreWorker() {

View file

@ -9,6 +9,11 @@
namespace ray {
// Used to prevent leases from timing out when not testing that logic. It would
// be better to use a mock clock or lease manager interface, but that's high
// overhead for the very simple timeout logic we currently have.
// The value is passed as the submitter's lease timeout in milliseconds
// (1024^3 ms is roughly 12 days). It is a compile-time constant so that no
// test can accidentally modify it and affect the tests that run afterwards.
constexpr int64_t kLongTimeout = 1024 * 1024 * 1024;
class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
ray::Status PushNormalTask(
@ -73,8 +78,8 @@ class MockRayletClient : public WorkerLeaseInterface {
TEST(TestMemoryStore, TestPromoteToPlasma) {
bool num_plasma_puts = 0;
auto mem = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore(
[&](const RayObject &obj, const ObjectID &obj_id) { num_plasma_puts += 1; }));
auto mem = std::make_shared<CoreWorkerMemoryStore>(
[&](const RayObject &obj, const ObjectID &obj_id) { num_plasma_puts += 1; });
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto data = GenerateRandomObject();
@ -96,7 +101,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) {
}
TEST(LocalDependencyResolverTest, TestNoDependencies) {
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto store = std::make_shared<CoreWorkerMemoryStore>();
LocalDependencyResolver resolver(store);
TaskSpecification task;
bool ok = false;
@ -105,7 +110,7 @@ TEST(LocalDependencyResolverTest, TestNoDependencies) {
}
TEST(LocalDependencyResolverTest, TestIgnorePlasmaDependencies) {
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto store = std::make_shared<CoreWorkerMemoryStore>();
LocalDependencyResolver resolver(store);
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::RAYLET);
TaskSpecification task;
@ -118,7 +123,7 @@ TEST(LocalDependencyResolverTest, TestIgnorePlasmaDependencies) {
}
TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) {
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto store = std::make_shared<CoreWorkerMemoryStore>();
LocalDependencyResolver resolver(store);
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
@ -139,7 +144,7 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) {
}
TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) {
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto store = std::make_shared<CoreWorkerMemoryStore>();
LocalDependencyResolver resolver(store);
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
@ -162,7 +167,7 @@ TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) {
}
TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) {
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto store = std::make_shared<CoreWorkerMemoryStore>();
LocalDependencyResolver resolver(store);
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
@ -188,10 +193,11 @@ TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) {
TEST(DirectTaskTransportTest, TestSubmitOneTask) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::shared_ptr<MockWorkerClient>(new MockWorkerClient());
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store);
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store,
kLongTimeout);
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
@ -210,10 +216,11 @@ TEST(DirectTaskTransportTest, TestSubmitOneTask) {
TEST(DirectTaskTransportTest, TestHandleTaskFailure) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::shared_ptr<MockWorkerClient>(new MockWorkerClient());
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store);
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store,
kLongTimeout);
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
@ -228,10 +235,11 @@ TEST(DirectTaskTransportTest, TestHandleTaskFailure) {
TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::shared_ptr<MockWorkerClient>(new MockWorkerClient());
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store);
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store,
kLongTimeout);
TaskSpecification task1;
TaskSpecification task2;
TaskSpecification task3;
@ -269,10 +277,11 @@ TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) {
TEST(DirectTaskTransportTest, TestReuseWorkerLease) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::shared_ptr<MockWorkerClient>(new MockWorkerClient());
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store);
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store,
kLongTimeout);
TaskSpecification task1;
TaskSpecification task2;
TaskSpecification task3;
@ -312,10 +321,11 @@ TEST(DirectTaskTransportTest, TestReuseWorkerLease) {
TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::shared_ptr<MockWorkerClient>(new MockWorkerClient());
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store);
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store,
kLongTimeout);
TaskSpecification task1;
TaskSpecification task2;
task1.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
@ -346,8 +356,8 @@ TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) {
TEST(DirectTaskTransportTest, TestSpillback) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::shared_ptr<MockWorkerClient>(new MockWorkerClient());
auto store = std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore());
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
std::unordered_map<ClientID, std::shared_ptr<MockRayletClient>> remote_lease_clients;
@ -360,7 +370,7 @@ TEST(DirectTaskTransportTest, TestSpillback) {
return client;
};
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, lease_client_factory,
store);
store, kLongTimeout);
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
@ -390,6 +400,55 @@ TEST(DirectTaskTransportTest, TestSpillback) {
ASSERT_EQ(remote_lease_clients[remote_raylet_id]->num_workers_disconnected, 0);
}
// Verifies when a leased worker is handed back to the raylet. Three tasks are
// submitted against a submitter configured with a 5ms lease timeout, and each
// one ends up on a separately granted worker:
//   1. the first worker is given back because its task failed,
//   2. the second worker is given back because its lease expired while the
//      task was running, even though another task is still queued,
//   3. the third worker is given back after its task succeeds.
TEST(DirectTaskTransportTest, TestWorkerLeaseTimeout) {
auto raylet_client = std::make_shared<MockRayletClient>();
auto worker_client = std::make_shared<MockWorkerClient>();
auto store = std::make_shared<CoreWorkerMemoryStore>();
// Every worker address maps to the same mock client, so all pushed-task
// callbacks accumulate in worker_client->callbacks.
auto factory = [&](const rpc::WorkerAddress &addr) { return worker_client; };
CoreWorkerDirectTaskSubmitter submitter(raylet_client, factory, nullptr, store,
/*lease_timeout_ms=*/5);
TaskSpecification task1;
TaskSpecification task2;
TaskSpecification task3;
task1.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
task2.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
task3.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
ASSERT_TRUE(submitter.SubmitTask(task1).ok());
ASSERT_TRUE(submitter.SubmitTask(task2).ok());
ASSERT_TRUE(submitter.SubmitTask(task3).ok());
// Only one lease request is outstanding even though three tasks were submitted.
ASSERT_EQ(raylet_client->num_workers_requested, 1);
// Task 1 is pushed.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1000, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 1);
// Granting the first lease triggers a second lease request.
ASSERT_EQ(raylet_client->num_workers_requested, 2);
// Task 1 finishes with failure; the worker is returned due to the error even though
// it hasn't timed out. The mock counts this as a disconnect rather than a normal
// return, as the assertions below show.
worker_client->callbacks[0](Status::IOError("worker dead"), rpc::PushTaskReply());
// No further task is pushed to the failed worker.
ASSERT_EQ(worker_client->callbacks.size(), 1);
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_workers_disconnected, 1);
// Task 2 runs successfully on the second worker; the worker is returned due to the
// timeout.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil()));
usleep(10 * 1000);  // Sleep for 10ms, causing the lease to time out.
ASSERT_EQ(worker_client->callbacks.size(), 2);
worker_client->callbacks[1](Status::OK(), rpc::PushTaskReply());
// Task 3 is still queued at this point, so the return is caused by the expired
// lease rather than by a lack of work.
ASSERT_EQ(raylet_client->num_workers_returned, 1);
ASSERT_EQ(raylet_client->num_workers_disconnected, 1);
// Task 3 runs successfully on the third worker; the worker is returned even though it
// hasn't timed out (presumably because no tasks remain queued -- confirm against
// the submitter's idle-worker handling).
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1002, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 3);
worker_client->callbacks[2](Status::OK(), rpc::PushTaskReply());
ASSERT_EQ(raylet_client->num_workers_returned, 2);
ASSERT_EQ(raylet_client->num_workers_disconnected, 1);
}
} // namespace ray
int main(int argc, char **argv) {

View file

@ -27,7 +27,9 @@ void CoreWorkerDirectTaskSubmitter::HandleWorkerLeaseGranted(
std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(addr));
RAY_LOG(INFO) << "Connected to " << addr.first << ":" << addr.second;
}
worker_to_lease_client_[addr] = std::move(lease_client);
int64_t expiration = current_time_ms() + lease_timeout_ms_;
worker_to_lease_client_.emplace(addr,
std::make_pair(std::move(lease_client), expiration));
}
// Try to assign it work.
@ -37,10 +39,10 @@ void CoreWorkerDirectTaskSubmitter::HandleWorkerLeaseGranted(
void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr,
bool was_error) {
absl::MutexLock lock(&mu_);
if (queued_tasks_.empty() || was_error) {
auto lease_client = std::move(worker_to_lease_client_[addr]);
auto entry = worker_to_lease_client_[addr];
if (was_error || queued_tasks_.empty() || current_time_ms() > entry.second) {
RAY_CHECK_OK(entry.first->ReturnWorker(addr.second, was_error));
worker_to_lease_client_.erase(addr);
RAY_CHECK_OK(lease_client->ReturnWorker(addr.second, was_error));
} else {
auto &client = *client_cache_[addr];
PushNormalTask(addr, client, queued_tasks_.front());

View file

@ -23,12 +23,14 @@ class CoreWorkerDirectTaskSubmitter {
CoreWorkerDirectTaskSubmitter(std::shared_ptr<WorkerLeaseInterface> lease_client,
rpc::ClientFactoryFn client_factory,
LeaseClientFactoryFn lease_client_factory,
std::shared_ptr<CoreWorkerMemoryStore> store)
std::shared_ptr<CoreWorkerMemoryStore> store,
int64_t lease_timeout_ms)
: local_lease_client_(lease_client),
client_factory_(client_factory),
lease_client_factory_(lease_client_factory),
in_memory_store_(store),
resolver_(in_memory_store_) {}
resolver_(in_memory_store_),
lease_timeout_ms_(lease_timeout_ms) {}
/// Schedule a task for direct submission to a worker.
///
@ -85,6 +87,10 @@ class CoreWorkerDirectTaskSubmitter {
/// Resolve local and remote dependencies;
LocalDependencyResolver resolver_;
/// The timeout for worker leases; after this duration, workers will be returned
/// to the raylet.
int64_t lease_timeout_ms_;
// Protects task submission state below.
absl::Mutex mu_;
@ -93,8 +99,9 @@ class CoreWorkerDirectTaskSubmitter {
client_cache_ GUARDED_BY(mu_);
/// Map from worker address to the lease client through which it should be
/// returned.
absl::flat_hash_map<rpc::WorkerAddress, std::shared_ptr<WorkerLeaseInterface>>
/// returned and its lease expiration time.
absl::flat_hash_map<rpc::WorkerAddress,
std::pair<std::shared_ptr<WorkerLeaseInterface>, int64_t>>
worker_to_lease_client_ GUARDED_BY(mu_);
// Whether we have a request to the Raylet to acquire a new worker in flight.