Only grant or reject spillback lease request (#20050)

This commit is contained in:
Jiajun Yao 2021-11-14 21:34:28 -08:00 committed by GitHub
parent 440da92263
commit 61778a952d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 438 additions and 377 deletions

View file

@ -104,7 +104,7 @@ class Node:
ray_params._system_config) > 0 and (not head ray_params._system_config) > 0 and (not head
and not connect_only): and not connect_only):
raise ValueError( raise ValueError(
"Internal config parameters can only be set on the head node.") "System config parameters can only be set on the head node.")
self._raylet_ip_address = raylet_ip_address self._raylet_ip_address = raylet_ip_address

View file

@ -31,13 +31,13 @@ class MockWorkerLeaseInterface : public WorkerLeaseInterface {
public: public:
MOCK_METHOD( MOCK_METHOD(
void, RequestWorkerLease, void, RequestWorkerLease,
(const ray::TaskSpecification &resource_spec, (const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size), const int64_t backlog_size),
(override)); (override));
MOCK_METHOD( MOCK_METHOD(
void, RequestWorkerLease, void, RequestWorkerLease,
(const rpc::TaskSpec &task_spec, (const rpc::TaskSpec &task_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size), const int64_t backlog_size),
(override)); (override));
@ -122,13 +122,13 @@ class MockRayletClientInterface : public RayletClientInterface {
(override)); (override));
MOCK_METHOD( MOCK_METHOD(
void, RequestWorkerLease, void, RequestWorkerLease,
(const ray::TaskSpecification &resource_spec, (const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size), const int64_t backlog_size),
(override)); (override));
MOCK_METHOD( MOCK_METHOD(
void, RequestWorkerLease, void, RequestWorkerLease,
(const rpc::TaskSpec &resource_spec, (const rpc::TaskSpec &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size), const int64_t backlog_size),
(override)); (override));

View file

@ -178,15 +178,18 @@ class MockRayletClient : public WorkerLeaseInterface {
} }
void RequestWorkerLease( void RequestWorkerLease(
const TaskSpecification &resource_spec, const TaskSpecification &resource_spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback, const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size) override { const int64_t backlog_size) override {
num_workers_requested += 1; num_workers_requested += 1;
if (grant_or_reject) {
num_grant_or_reject_leases_requested += 1;
}
callbacks.push_back(callback); callbacks.push_back(callback);
} }
void RequestWorkerLease( void RequestWorkerLease(
const rpc::TaskSpec &task_spec, const rpc::TaskSpec &task_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) override { const int64_t backlog_size = -1) override {
num_workers_requested += 1; num_workers_requested += 1;
@ -207,10 +210,12 @@ class MockRayletClient : public WorkerLeaseInterface {
// Trigger reply to RequestWorkerLease. // Trigger reply to RequestWorkerLease.
bool GrantWorkerLease(const std::string &address, int port, bool GrantWorkerLease(const std::string &address, int port,
const NodeID &retry_at_raylet_id, bool cancel = false, const NodeID &retry_at_raylet_id, bool cancel = false,
std::string worker_id = std::string()) { std::string worker_id = std::string(), bool reject = false) {
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
if (cancel) { if (cancel) {
reply.set_canceled(true); reply.set_canceled(true);
} else if (reject) {
reply.set_rejected(true);
} else if (!retry_at_raylet_id.IsNil()) { } else if (!retry_at_raylet_id.IsNil()) {
reply.mutable_retry_at_raylet_address()->set_ip_address(address); reply.mutable_retry_at_raylet_address()->set_ip_address(address);
reply.mutable_retry_at_raylet_address()->set_port(port); reply.mutable_retry_at_raylet_address()->set_port(port);
@ -250,6 +255,7 @@ class MockRayletClient : public WorkerLeaseInterface {
~MockRayletClient() {} ~MockRayletClient() {}
int num_grant_or_reject_leases_requested = 0;
int num_workers_requested = 0; int num_workers_requested = 0;
int num_workers_returned = 0; int num_workers_returned = 0;
int num_workers_disconnected = 0; int num_workers_disconnected = 0;
@ -1108,6 +1114,7 @@ TEST(DirectTaskTransportTest, TestSpillbackRoundTrip) {
TaskSpecification task = BuildEmptyTaskSpec(); TaskSpecification task = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task).ok()); ASSERT_TRUE(submitter.SubmitTask(task).ok());
ASSERT_EQ(raylet_client->num_grant_or_reject_leases_requested, 0);
ASSERT_EQ(raylet_client->num_workers_requested, 1); ASSERT_EQ(raylet_client->num_workers_requested, 1);
ASSERT_EQ(raylet_client->num_workers_returned, 0); ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(worker_client->callbacks.size(), 0); ASSERT_EQ(worker_client->callbacks.size(), 0);
@ -1117,12 +1124,15 @@ TEST(DirectTaskTransportTest, TestSpillbackRoundTrip) {
auto remote_raylet_id = NodeID::FromRandom(); auto remote_raylet_id = NodeID::FromRandom();
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 7777, remote_raylet_id)); ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 7777, remote_raylet_id));
ASSERT_EQ(remote_lease_clients.count(7777), 1); ASSERT_EQ(remote_lease_clients.count(7777), 1);
ASSERT_EQ(remote_lease_clients[7777]->num_workers_requested, 1);
// Confirm that the spillback lease request has grant_or_reject set to true.
ASSERT_EQ(remote_lease_clients[7777]->num_grant_or_reject_leases_requested, 1);
// Confirm that lease policy is not consulted on spillback. // Confirm that lease policy is not consulted on spillback.
ASSERT_EQ(lease_policy->num_lease_policy_consults, 1); ASSERT_EQ(lease_policy->num_lease_policy_consults, 1);
ASSERT_FALSE(raylet_client->GrantWorkerLease("remote", 1234, NodeID::Nil())); ASSERT_FALSE(raylet_client->GrantWorkerLease("remote", 1234, NodeID::Nil()));
// Trigger a spillback back to the local node. // Trigger a rejection back to the local node.
ASSERT_TRUE( ASSERT_TRUE(remote_lease_clients[7777]->GrantWorkerLease("local", 1234, local_raylet_id,
remote_lease_clients[7777]->GrantWorkerLease("local", 1234, local_raylet_id)); false, "", /*reject=*/true));
// We should not have created another lease client to the local raylet. // We should not have created another lease client to the local raylet.
ASSERT_EQ(remote_lease_clients.size(), 1); ASSERT_EQ(remote_lease_clients.size(), 1);
// There should be no more callbacks on the remote node. // There should be no more callbacks on the remote node.
@ -1130,6 +1140,8 @@ TEST(DirectTaskTransportTest, TestSpillbackRoundTrip) {
remote_lease_clients[7777]->GrantWorkerLease("remote", 1234, NodeID::Nil())); remote_lease_clients[7777]->GrantWorkerLease("remote", 1234, NodeID::Nil()));
// The worker is returned to the local node. // The worker is returned to the local node.
ASSERT_EQ(raylet_client->num_grant_or_reject_leases_requested, 0);
ASSERT_EQ(raylet_client->num_workers_requested, 2);
ASSERT_TRUE(raylet_client->GrantWorkerLease("local", 1234, NodeID::Nil())); ASSERT_TRUE(raylet_client->GrantWorkerLease("local", 1234, NodeID::Nil()));
ASSERT_TRUE(worker_client->ReplyPushTask()); ASSERT_TRUE(worker_client->ReplyPushTask());
ASSERT_EQ(raylet_client->num_workers_returned, 1); ASSERT_EQ(raylet_client->num_workers_returned, 1);

View file

@ -520,6 +520,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary()); resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary());
const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg); const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);
rpc::Address best_node_address; rpc::Address best_node_address;
const bool is_spillback = (raylet_address != nullptr);
if (raylet_address == nullptr) { if (raylet_address == nullptr) {
// If no raylet address is given, find the best worker for our next lease request. // If no raylet address is given, find the best worker for our next lease request.
best_node_address = lease_policy_->GetBestNodeForTask(resource_spec); best_node_address = lease_policy_->GetBestNodeForTask(resource_spec);
@ -534,7 +535,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
lease_client->RequestWorkerLease( lease_client->RequestWorkerLease(
resource_spec, resource_spec,
[this, scheduling_key, task_id, raylet_address = *raylet_address]( /*grant_or_reject=*/is_spillback,
[this, scheduling_key, task_id, is_spillback, raylet_address = *raylet_address](
const Status &status, const rpc::RequestWorkerLeaseReply &reply) { const Status &status, const rpc::RequestWorkerLeaseReply &reply) {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
@ -561,6 +563,14 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
} else if (reply.canceled()) { } else if (reply.canceled()) {
RAY_LOG(DEBUG) << "Lease canceled " << task_id; RAY_LOG(DEBUG) << "Lease canceled " << task_id;
RequestNewWorkerIfNeeded(scheduling_key); RequestNewWorkerIfNeeded(scheduling_key);
} else if (reply.rejected()) {
RAY_LOG(DEBUG) << "Lease rejected " << task_id;
// It might happen when the first raylet has a stale view
// of the spillback raylet resources.
// Retry the request at the first raylet since the resource view may be
// refreshed.
RAY_CHECK(is_spillback);
RequestNewWorkerIfNeeded(scheduling_key);
} else if (!reply.worker_address().raylet_id().empty()) { } else if (!reply.worker_address().raylet_id().empty()) {
// We got a lease for a worker. Add the lease client state and try to // We got a lease for a worker. Add the lease client state and try to
// assign work to the worker. // assign work to the worker.
@ -577,6 +587,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
/*error=*/false, resources_copy); /*error=*/false, resources_copy);
} else { } else {
// The raylet redirected us to a different raylet to retry at. // The raylet redirected us to a different raylet to retry at.
RAY_CHECK(!is_spillback);
RAY_LOG(DEBUG) << "Redirect lease for task " << task_id << " from raylet " RAY_LOG(DEBUG) << "Redirect lease for task " << task_id << " from raylet "
<< NodeID::FromBinary(raylet_address.raylet_id()) << NodeID::FromBinary(raylet_address.raylet_id())
<< " to raylet " << " to raylet "

View file

@ -229,6 +229,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
// backlog in GCS. // backlog in GCS.
lease_client->RequestWorkerLease( lease_client->RequestWorkerLease(
actor->GetActorTableData().task_spec(), actor->GetActorTableData().task_spec(),
RayConfig::instance().gcs_actor_scheduling_enabled(),
[this, actor, node](const Status &status, [this, actor, node](const Status &status,
const rpc::RequestWorkerLeaseReply &reply) { const rpc::RequestWorkerLeaseReply &reply) {
HandleWorkerLeaseReply(actor, node, status, reply); HandleWorkerLeaseReply(actor, node, status, reply);

View file

@ -81,8 +81,8 @@ TEST_F(GcsActorSchedulerTest, KillWorkerLeak1) {
actor_data.set_actor_id(actor_id.Binary()); actor_data.set_actor_id(actor_id.Binary());
auto actor = std::make_shared<GcsActor>(actor_data); auto actor = std::make_shared<GcsActor>(actor_data);
std::function<void(const Status &, const rpc::RequestWorkerLeaseReply &)> cb; std::function<void(const Status &, const rpc::RequestWorkerLeaseReply &)> cb;
EXPECT_CALL(*raylet_client, RequestWorkerLease(An<const rpc::TaskSpec &>(), _, _)) EXPECT_CALL(*raylet_client, RequestWorkerLease(An<const rpc::TaskSpec &>(), _, _, _))
.WillOnce(testing::SaveArg<1>(&cb)); .WillOnce(testing::SaveArg<2>(&cb));
// Ensure actor is killed // Ensure actor is killed
EXPECT_CALL(*core_worker_client, KillActor(_, _)); EXPECT_CALL(*core_worker_client, KillActor(_, _));
actor_scheduler->Schedule(actor); actor_scheduler->Schedule(actor);
@ -110,8 +110,8 @@ TEST_F(GcsActorSchedulerTest, KillWorkerLeak2) {
rpc::ClientCallback<rpc::RequestWorkerLeaseReply> request_worker_lease_cb; rpc::ClientCallback<rpc::RequestWorkerLeaseReply> request_worker_lease_cb;
// Ensure actor is killed // Ensure actor is killed
EXPECT_CALL(*core_worker_client, KillActor(_, _)); EXPECT_CALL(*core_worker_client, KillActor(_, _));
EXPECT_CALL(*raylet_client, RequestWorkerLease(An<const rpc::TaskSpec &>(), _, _)) EXPECT_CALL(*raylet_client, RequestWorkerLease(An<const rpc::TaskSpec &>(), _, _, _))
.WillOnce(testing::SaveArg<1>(&request_worker_lease_cb)); .WillOnce(testing::SaveArg<2>(&request_worker_lease_cb));
std::function<void(ray::Status)> async_put_with_index_cb; std::function<void(ray::Status)> async_put_with_index_cb;
// Leasing successfully // Leasing successfully

View file

@ -77,7 +77,7 @@ struct GcsServerMocker {
/// WorkerLeaseInterface /// WorkerLeaseInterface
void RequestWorkerLease( void RequestWorkerLease(
const ray::TaskSpecification &resource_spec, const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback, const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) override { const int64_t backlog_size = -1) override {
num_workers_requested += 1; num_workers_requested += 1;
@ -85,7 +85,7 @@ struct GcsServerMocker {
} }
void RequestWorkerLease( void RequestWorkerLease(
const rpc::TaskSpec &spec, const rpc::TaskSpec &spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback, const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) override { const int64_t backlog_size = -1) override {
num_workers_requested += 1; num_workers_requested += 1;

View file

@ -44,6 +44,10 @@ message RequestWorkerLeaseRequest {
TaskSpec resource_spec = 1; TaskSpec resource_spec = 1;
// Worker's backlog size for this spec's shape. // Worker's backlog size for this spec's shape.
int64 backlog_size = 2; int64 backlog_size = 2;
// If it's true, either grant the lease if the task is
// locally schedulable or reject the request.
// Else, the raylet may return another raylet at which to retry the request.
bool grant_or_reject = 3;
} }
message RequestWorkerLeaseReply { message RequestWorkerLeaseReply {

View file

@ -1573,7 +1573,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
rpc::Task task_message; rpc::Task task_message;
task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
RayTask task(task_message); RayTask task(task_message);
bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask(); const bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask();
ActorID actor_id = ActorID::Nil(); ActorID actor_id = ActorID::Nil();
metrics_num_task_scheduled_ += 1; metrics_num_task_scheduled_ += 1;
@ -1593,48 +1593,36 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
worker_pool_.PrestartWorkers(task_spec, request.backlog_size(), available_cpus); worker_pool_.PrestartWorkers(task_spec, request.backlog_size(), available_cpus);
} }
if (!(RayConfig::instance().gcs_actor_scheduling_enabled() && auto send_reply_callback_wrapper = [this, is_actor_creation_task, actor_id, reply,
task.GetTaskSpecification().IsActorCreationTask())) { send_reply_callback](
cluster_task_manager_->QueueAndScheduleTask(task, reply, send_reply_callback);
return;
}
auto send_reply_callback_wrapper = [this, actor_id, reply, send_reply_callback](
Status status, std::function<void()> success, Status status, std::function<void()> success,
std::function<void()> failure) { std::function<void()> failure) {
if (!reply->rejected()) { // If resources are not enough due to normal tasks' preemption
send_reply_callback(status, success, failure); // for GCS based actor scheduling, return a rejection
return; // with normal task resource usages so GCS can update
// its resource view of this raylet.
if (reply->rejected() && is_actor_creation_task) {
ResourceSet normal_task_resources =
cluster_task_manager_->CalcNormalTaskResources();
RAY_LOG(DEBUG) << "Reject leasing as the raylet has no enough resources."
<< " actor_id = " << actor_id
<< ", normal_task_resources = " << normal_task_resources.ToString()
<< ", local_resoruce_view = "
<< cluster_resource_scheduler_->GetLocalResourceViewString();
auto resources_data = reply->mutable_resources_data();
resources_data->set_node_id(self_node_id_.Binary());
resources_data->set_resources_normal_task_changed(true);
auto &normal_task_map = *(resources_data->mutable_resources_normal_task());
normal_task_map = {normal_task_resources.GetResourceMap().begin(),
normal_task_resources.GetResourceMap().end()};
resources_data->set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
} }
// If the reqiured resource and normal task resource exceed available resource, send_reply_callback(status, success, failure);
// reject it.
ResourceSet normal_task_resources = cluster_task_manager_->CalcNormalTaskResources();
RAY_LOG(DEBUG) << "Reject leasing as the raylet has no enough resources."
<< " actor_id = " << actor_id
<< ", normal_task_resources = " << normal_task_resources.ToString()
<< ", local_resoruce_view = "
<< cluster_resource_scheduler_->GetLocalResourceViewString();
auto resources_data = reply->mutable_resources_data();
resources_data->set_node_id(self_node_id_.Binary());
resources_data->set_resources_normal_task_changed(true);
auto &normal_task_map = *(resources_data->mutable_resources_normal_task());
normal_task_map = {normal_task_resources.GetResourceMap().begin(),
normal_task_resources.GetResourceMap().end()};
resources_data->set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
send_reply_callback(Status::OK(), /*success=*/nullptr, /*failure=*/nullptr);
}; };
// If resources are not enough due to normal tasks' preemption, return a rejection with cluster_task_manager_->QueueAndScheduleTask(task, request.grant_or_reject(), reply,
// normal task resource usages. send_reply_callback_wrapper);
if (!cluster_task_manager_->IsLocallySchedulable(task)) {
reply->set_rejected(true);
send_reply_callback_wrapper(Status::OK(), /*success=*/nullptr, /*failure=*/nullptr);
return;
}
cluster_task_manager_->QueueAndScheduleTask(task, reply, send_reply_callback_wrapper);
} }
void NodeManager::HandlePrepareBundleResources( void NodeManager::HandlePrepareBundleResources(

View file

@ -196,8 +196,6 @@ struct Node {
Node(const NodeResources &resources) Node(const NodeResources &resources)
: last_reported_(resources), local_view_(resources) {} : last_reported_(resources), local_view_(resources) {}
void ResetLocalView() { local_view_ = last_reported_; }
NodeResources *GetMutableLocalView() { return &local_view_; } NodeResources *GetMutableLocalView() { return &local_view_; }
const NodeResources &GetLocalView() const { return local_view_; } const NodeResources &GetLocalView() const { return local_view_; }

View file

@ -18,20 +18,17 @@
#include "ray/common/grpc_util.h" #include "ray/common/grpc_util.h"
#include "ray/common/ray_config.h" #include "ray/common/ray_config.h"
#include "ray/raylet/scheduling/scheduling_policy.h"
namespace ray { namespace ray {
ClusterResourceScheduler::ClusterResourceScheduler()
: spread_threshold_(RayConfig::instance().scheduler_spread_threshold()){};
ClusterResourceScheduler::ClusterResourceScheduler( ClusterResourceScheduler::ClusterResourceScheduler(
int64_t local_node_id, const NodeResources &local_node_resources, int64_t local_node_id, const NodeResources &local_node_resources,
gcs::GcsClient &gcs_client) gcs::GcsClient &gcs_client)
: spread_threshold_(RayConfig::instance().scheduler_spread_threshold()), : local_node_id_(local_node_id),
local_node_id_(local_node_id),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()), gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()),
gcs_client_(&gcs_client) { gcs_client_(&gcs_client) {
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, nodes_, RayConfig::instance().scheduler_spread_threshold());
InitResourceUnitInstanceInfo(); InitResourceUnitInstanceInfo();
AddOrUpdateNode(local_node_id_, local_node_resources); AddOrUpdateNode(local_node_id_, local_node_resources);
InitLocalResources(local_node_resources); InitLocalResources(local_node_resources);
@ -42,10 +39,11 @@ ClusterResourceScheduler::ClusterResourceScheduler(
const absl::flat_hash_map<std::string, double> &local_node_resources, const absl::flat_hash_map<std::string, double> &local_node_resources,
gcs::GcsClient &gcs_client, std::function<int64_t(void)> get_used_object_store_memory, gcs::GcsClient &gcs_client, std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity) std::function<bool(void)> get_pull_manager_at_capacity)
: spread_threshold_(RayConfig::instance().scheduler_spread_threshold()), : get_pull_manager_at_capacity_(get_pull_manager_at_capacity),
get_pull_manager_at_capacity_(get_pull_manager_at_capacity),
gcs_client_(&gcs_client) { gcs_client_(&gcs_client) {
local_node_id_ = string_to_int_map_.Insert(local_node_id); local_node_id_ = string_to_int_map_.Insert(local_node_id);
scheduling_policy_ = std::make_unique<raylet_scheduling_policy::SchedulingPolicy>(
local_node_id_, nodes_, RayConfig::instance().scheduler_spread_threshold());
NodeResources node_resources = ResourceMapToNodeResources( NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, local_node_resources, local_node_resources); string_to_int_map_, local_node_resources, local_node_resources);
@ -172,17 +170,15 @@ bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
return RemoveNode(node_id); return RemoveNode(node_id);
} }
int64_t ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request, bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request,
int64_t node_id, int64_t node_id,
const NodeResources &resources) const { const NodeResources &resources) const {
int violations = 0;
if (resource_request.requires_object_store_memory && resources.object_pulls_queued && if (resource_request.requires_object_store_memory && resources.object_pulls_queued &&
node_id != local_node_id_) { node_id != local_node_id_) {
// It's okay if the local node's pull manager is at capacity because we // It's okay if the local node's pull manager is at capacity because we
// will eventually spill the task back from the waiting queue if its args // will eventually spill the task back from the waiting queue if its args
// cannot be pulled. // cannot be pulled.
return -1; return false;
} }
// First, check predefined resources. // First, check predefined resources.
@ -191,7 +187,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_
resources.predefined_resources[i].available) { resources.predefined_resources[i].available) {
// A hard constraint has been violated, so we cannot schedule // A hard constraint has been violated, so we cannot schedule
// this resource request. // this resource request.
return -1; return false;
} }
} }
@ -202,16 +198,16 @@ int64_t ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_
if (it == resources.custom_resources.end()) { if (it == resources.custom_resources.end()) {
// Requested resource doesn't exist at this node. // Requested resource doesn't exist at this node.
// This is a hard constraint so cannot schedule this resource request. // This is a hard constraint so cannot schedule this resource request.
return -1; return false;
} else { } else {
if (task_req_custom_resource.second > it->second.available) { if (task_req_custom_resource.second > it->second.available) {
// Resource constraint is violated. // Resource constraint is violated.
return -1; return false;
} }
} }
} }
return violations; return true;
} }
int64_t ClusterResourceScheduler::GetBestSchedulableNode( int64_t ClusterResourceScheduler::GetBestSchedulableNode(
@ -246,16 +242,11 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
return best_node; return best_node;
} }
auto spread_threshold = spread_threshold_;
// If the scheduling decision is made by gcs, we ignore the spread threshold
if (actor_creation && RayConfig::instance().gcs_actor_scheduling_enabled()) {
spread_threshold = 1.0;
}
// TODO (Alex): Setting require_available == force_spillback is a hack in order to // TODO (Alex): Setting require_available == force_spillback is a hack in order to
// remain bug compatible with the legacy scheduling algorithms. // remain bug compatible with the legacy scheduling algorithms.
int64_t best_node_id = raylet_scheduling_policy::HybridPolicy( int64_t best_node_id = scheduling_policy_->HybridPolicy(
resource_request, local_node_id_, nodes_, spread_threshold, force_spillback, resource_request, force_spillback, force_spillback,
force_spillback, [this](auto node_id) { return this->NodeAlive(node_id); }); [this](auto node_id) { return this->NodeAlive(node_id); });
*is_infeasible = best_node_id == -1 ? true : false; *is_infeasible = best_node_id == -1 ? true : false;
if (!*is_infeasible) { if (!*is_infeasible) {
// TODO (Alex): Support soft constraints if needed later. // TODO (Alex): Support soft constraints if needed later.
@ -298,7 +289,7 @@ bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources(
NodeResources *resources = it->second.GetMutableLocalView(); NodeResources *resources = it->second.GetMutableLocalView();
// Just double check this node can still schedule the resource request. // Just double check this node can still schedule the resource request.
if (IsSchedulable(resource_request, node_id, *resources) == -1) { if (!IsSchedulable(resource_request, node_id, *resources)) {
return false; return false;
} }
@ -969,16 +960,6 @@ void ClusterResourceScheduler::FillResourceUsage(rpc::ResourcesData &resources_d
last_report_resources_.reset(new NodeResources(node_resources)); last_report_resources_.reset(new NodeResources(node_resources));
} }
// Reset all local views for remote nodes. This is needed in case tasks that
// we spilled back to a remote node were not actually scheduled on the
// node. Then, the remote node's resource availability may not change and
// so it may not send us another update.
for (auto &node : nodes_) {
if (node.first != local_node_id_) {
node.second.ResetLocalView();
}
}
// Automatically report object store usage. // Automatically report object store usage.
// XXX: this MUTATES the resources field, which is needed since we are storing // XXX: this MUTATES the resources field, which is needed since we are storing
// it in last_report_resources_. // it in last_report_resources_.
@ -1083,7 +1064,7 @@ bool ClusterResourceScheduler::IsLocallySchedulable(
const absl::flat_hash_map<std::string, double> &shape) { const absl::flat_hash_map<std::string, double> &shape) {
auto resource_request = ResourceMapToResourceRequest( auto resource_request = ResourceMapToResourceRequest(
string_to_int_map_, shape, /*requires_object_store_memory=*/false); string_to_int_map_, shape, /*requires_object_store_memory=*/false);
return IsSchedulable(resource_request, local_node_id_, GetLocalNodeResources()) == 0; return IsSchedulable(resource_request, local_node_id_, GetLocalNodeResources());
} }
} // namespace ray } // namespace ray

View file

@ -29,6 +29,7 @@
#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h" #include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h"
#include "ray/raylet/scheduling/fixed_point.h" #include "ray/raylet/scheduling/fixed_point.h"
#include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/raylet/scheduling/scheduling_policy.h"
#include "ray/util/logging.h" #include "ray/util/logging.h"
#include "src/ray/protobuf/gcs.pb.h" #include "src/ray/protobuf/gcs.pb.h"
@ -41,7 +42,7 @@ using rpc::HeartbeatTableData;
/// resources at those nodes. /// resources at those nodes.
class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
public: public:
ClusterResourceScheduler(void); ClusterResourceScheduler() {}
/// Constructor initializing the resources associated with the local node. /// Constructor initializing the resources associated with the local node.
/// ///
/// \param local_node_id: ID of local node, /// \param local_node_id: ID of local node,
@ -94,20 +95,16 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
/// are available when we call this function, and this way we avoid /// are available when we call this function, and this way we avoid
/// a map find call which could be expensive.) /// a map find call which could be expensive.)
/// ///
/// \return: -1, if the request cannot be scheduled. This happens when at /// \return: Whether the request can be scheduled.
/// least a hard constraints is violated. bool IsSchedulable(const ResourceRequest &resource_request, int64_t node_id,
/// >= 0, the number soft constraint violations. If 0, no const NodeResources &resources) const;
/// constraint is violated.
int64_t IsSchedulable(const ResourceRequest &resource_request, int64_t node_id,
const NodeResources &resources) const;
/// Find a node in the cluster on which we can schedule a given resource request. /// Find a node in the cluster on which we can schedule a given resource request.
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy. /// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
/// In legacy mode, see `GetBestSchedulableNodeLegacy` for a description of the policy.
/// ///
/// \param resource_request: Task to be scheduled. /// \param resource_request: Task to be scheduled.
/// \param actor_creation: True if this is an actor creation task. /// \param actor_creation: True if this is an actor creation task.
/// \param force_spillback For non-actor creation requests, pick a remote /// \param force_spillback: For non-actor creation requests, pick a remote
/// feasible node. If this is false, then the task may be scheduled to the /// feasible node. If this is false, then the task may be scheduled to the
/// local node. /// local node.
/// \param violations: The number of soft constraint violations associated /// \param violations: The number of soft constraint violations associated
@ -123,8 +120,7 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
int64_t *violations, bool *is_infeasible); int64_t *violations, bool *is_infeasible);
/// Similar to /// Similar to
/// int64_t GetBestSchedulableNode(const ResourceRequest &resource_request, int64_t /// int64_t GetBestSchedulableNode(...)
/// *violations)
/// but the return value is different: /// but the return value is different:
/// \return "", if no node can schedule the current request; otherwise, /// \return "", if no node can schedule the current request; otherwise,
/// return the ID in string format of a node that can schedule the /// return the ID in string format of a node that can schedule the
@ -412,13 +408,13 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
bool SubtractRemoteNodeAvailableResources(int64_t node_id, bool SubtractRemoteNodeAvailableResources(int64_t node_id,
const ResourceRequest &resource_request); const ResourceRequest &resource_request);
/// The threshold at which to switch from packing to spreading.
const float spread_threshold_;
/// List of nodes in the clusters and their resources organized as a map. /// List of nodes in the clusters and their resources organized as a map.
/// The key of the map is the node ID. /// The key of the map is the node ID.
absl::flat_hash_map<int64_t, Node> nodes_; absl::flat_hash_map<int64_t, Node> nodes_;
/// Identifier of local node. /// Identifier of local node.
int64_t local_node_id_; int64_t local_node_id_;
/// The scheduling policy to use.
std::unique_ptr<raylet_scheduling_policy::SchedulingPolicy> scheduling_policy_;
/// Internally maintained random number generator. /// Internally maintained random number generator.
std::mt19937_64 gen_; std::mt19937_64 gen_;
/// Resources of local node. /// Resources of local node.

View file

@ -337,7 +337,6 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
bool is_infeasible; bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode( int64_t node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, false, false, &violations, &is_infeasible); resource_request, false, false, &violations, &is_infeasible);
ASSERT_TRUE(node_id != -1);
ASSERT_EQ(node_id, 1); ASSERT_EQ(node_id, 1);
ASSERT_TRUE(violations == 0); ASSERT_TRUE(violations == 0);
@ -1142,15 +1141,13 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
ASSERT_FALSE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation)); ASSERT_FALSE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation));
for (int num_slots_available = 0; num_slots_available <= 2; num_slots_available++) { for (int num_slots_available = 0; num_slots_available <= 2; num_slots_available++) {
// Remote node reports updated resource availability.
resource_scheduler.AddOrUpdateNode(remote, {{"CPU", 2.}},
{{"CPU", num_slots_available}});
rpc::ResourcesData data; rpc::ResourcesData data;
int64_t t; int64_t t;
bool is_infeasible; bool is_infeasible;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
// Resource usage report tick should reset the remote node's resources. // Remote node reports update local view.
resource_scheduler.FillResourceUsage(data); resource_scheduler.AddOrUpdateNode(remote, {{"CPU", 2.}},
{{"CPU", num_slots_available}});
for (int j = 0; j < num_slots_available; j++) { for (int j = 0; j < num_slots_available; j++) {
ASSERT_EQ(remote, resource_scheduler.GetBestSchedulableNode( ASSERT_EQ(remote, resource_scheduler.GetBestSchedulableNode(
task_spec, false, false, true, &t, &is_infeasible)); task_spec, false, false, true, &t, &is_infeasible));
@ -1249,35 +1246,6 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
node_ids[51]); node_ids[51]);
} }
TEST_F(ClusterResourceSchedulerTest, ActorDecision) {
auto local_node = NodeID::FromRandom();
auto remote_node = NodeID::FromRandom();
std::string cpu = "CPU";
absl::flat_hash_map<std::string, double> resource;
resource[cpu] = 2.0;
absl::flat_hash_map<std::string, double> available;
available[cpu] = 1.5;
ClusterResourceScheduler resource_scheduler(local_node.Binary(), resource,
*gcs_client_);
resource_scheduler.AddOrUpdateNode(remote_node.Binary(), resource, available);
auto usage = std::vector<double>{1.0};
resource_scheduler.SubtractCPUResourceInstances(usage);
RayConfig::instance().gcs_actor_scheduling_enabled() = false;
RayConfig::instance().scheduler_spread_threshold() = 0.6;
absl::flat_hash_map<std::string, double> require;
require[cpu] = 1.0;
int64_t violations = 0;
bool is_feasible = false;
auto node = resource_scheduler.GetBestSchedulableNode(require, false, true, false,
&violations, &is_feasible);
ASSERT_EQ(node, remote_node.Binary());
RayConfig::instance().gcs_actor_scheduling_enabled() = true;
node = resource_scheduler.GetBestSchedulableNode(require, false, true, false,
&violations, &is_feasible);
ASSERT_EQ(node, local_node.Binary());
}
TEST_F(ClusterResourceSchedulerTest, CustomResourceInstanceTest) { TEST_F(ClusterResourceSchedulerTest, CustomResourceInstanceTest) {
RayConfig::instance().initialize( RayConfig::instance().initialize(
R"( R"(

View file

@ -74,15 +74,10 @@ bool ClusterTaskManager::SchedulePendingTasks() {
RayTask task = work->task; RayTask task = work->task;
RAY_LOG(DEBUG) << "Scheduling pending task " RAY_LOG(DEBUG) << "Scheduling pending task "
<< task.GetTaskSpecification().TaskId(); << task.GetTaskSpecification().TaskId();
auto placement_resources = std::string node_id_string =
task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(); GetBestSchedulableNode(*work,
// This argument is used to set violation, which is an unsupported feature now. /*requires_object_store_memory=*/false,
int64_t _unused; /*force_spillback=*/false, &is_infeasible);
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
placement_resources,
/*requires_object_store_memory=*/false,
task.GetTaskSpecification().IsActorCreationTask(),
/*force_spillback=*/false, &_unused, &is_infeasible);
// There is no node that has available resources to run the request. // There is no node that has available resources to run the request.
// Move on to the next shape. // Move on to the next shape.
@ -406,13 +401,10 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
bool ClusterTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work, bool ClusterTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
bool &is_infeasible) { bool &is_infeasible) {
const auto &spec = work->task.GetTaskSpecification(); std::string node_id_string =
int64_t _unused; GetBestSchedulableNode(*work,
auto placement_resources = spec.GetRequiredPlacementResources().GetResourceMap(); /*requires_object_store_memory=*/false,
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( /*force_spillback=*/false, &is_infeasible);
placement_resources,
/*requires_object_store_memory=*/false, spec.IsActorCreationTask(),
/*force_spillback=*/false, &_unused, &is_infeasible);
if (is_infeasible || node_id_string == self_node_id_.Binary() || if (is_infeasible || node_id_string == self_node_id_.Binary() ||
node_id_string.empty()) { node_id_string.empty()) {
@ -425,14 +417,14 @@ bool ClusterTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &wor
} }
void ClusterTaskManager::QueueAndScheduleTask( void ClusterTaskManager::QueueAndScheduleTask(
const RayTask &task, rpc::RequestWorkerLeaseReply *reply, const RayTask &task, bool grant_or_reject, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) { rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(DEBUG) << "Queuing and scheduling task " RAY_LOG(DEBUG) << "Queuing and scheduling task "
<< task.GetTaskSpecification().TaskId(); << task.GetTaskSpecification().TaskId();
metric_tasks_queued_++; metric_tasks_queued_++;
auto work = std::make_shared<internal::Work>(task, reply, [send_reply_callback] { auto work = std::make_shared<internal::Work>(
send_reply_callback(Status::OK(), nullptr, nullptr); task, grant_or_reject, reply,
}); [send_reply_callback] { send_reply_callback(Status::OK(), nullptr, nullptr); });
const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass();
// If the scheduling class is infeasible, just add the work to the infeasible queue // If the scheduling class is infeasible, just add the work to the infeasible queue
// directly. // directly.
@ -1042,16 +1034,11 @@ void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() {
RayTask task = work->task; RayTask task = work->task;
RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:" RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:"
<< task.GetTaskSpecification().TaskId(); << task.GetTaskSpecification().TaskId();
auto placement_resources =
task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap();
// This argument is used to set violation, which is an unsupported feature now.
int64_t _unused;
bool is_infeasible; bool is_infeasible;
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( std::string node_id_string =
placement_resources, GetBestSchedulableNode(*work,
/*requires_object_store_memory=*/false, /*requires_object_store_memory=*/false,
task.GetTaskSpecification().IsActorCreationTask(), /*force_spillback=*/false, &is_infeasible);
/*force_spillback=*/false, &_unused, &is_infeasible);
// There is no node that has available resources to run the request. // There is no node that has available resources to run the request.
// Move on to the next shape. // Move on to the next shape.
@ -1150,6 +1137,14 @@ void ClusterTaskManager::Dispatch(
void ClusterTaskManager::Spillback(const NodeID &spillback_to, void ClusterTaskManager::Spillback(const NodeID &spillback_to,
const std::shared_ptr<internal::Work> &work) { const std::shared_ptr<internal::Work> &work) {
auto send_reply_callback = work->callback;
if (work->grant_or_reject) {
work->reply->set_rejected(true);
send_reply_callback();
return;
}
metric_tasks_spilled_++; metric_tasks_spilled_++;
const auto &task = work->task; const auto &task = work->task;
const auto &task_spec = task.GetTaskSpecification(); const auto &task_spec = task.GetTaskSpecification();
@ -1171,11 +1166,6 @@ void ClusterTaskManager::Spillback(const NodeID &spillback_to,
reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port()); reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port());
reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary());
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
reply->set_rejected(true);
}
auto send_reply_callback = work->callback;
send_reply_callback(); send_reply_callback();
} }
@ -1321,18 +1311,14 @@ void ClusterTaskManager::SpillWaitingTasks() {
bool force_spillback = task_dependency_manager_.TaskDependenciesBlocked(task_id); bool force_spillback = task_dependency_manager_.TaskDependenciesBlocked(task_id);
RAY_LOG(DEBUG) << "Attempting to spill back waiting task " << task_id RAY_LOG(DEBUG) << "Attempting to spill back waiting task " << task_id
<< " to remote node. Force spillback? " << force_spillback; << " to remote node. Force spillback? " << force_spillback;
auto placement_resources =
task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap();
int64_t _unused;
bool is_infeasible; bool is_infeasible;
// TODO(swang): The policy currently does not account for the amount of // TODO(swang): The policy currently does not account for the amount of
// object store memory availability. Ideally, we should pick the node with // object store memory availability. Ideally, we should pick the node with
// the most memory availability. // the most memory availability.
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( std::string node_id_string =
placement_resources, GetBestSchedulableNode(*(*it),
/*requires_object_store_memory=*/true, /*requires_object_store_memory=*/true,
task.GetTaskSpecification().IsActorCreationTask(), /*force_spillback=*/force_spillback, &is_infeasible);
/*force_spillback=*/force_spillback, &_unused, &is_infeasible);
if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) { if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) {
NodeID node_id = NodeID::FromBinary(node_id_string); NodeID node_id = NodeID::FromBinary(node_id_string);
Spillback(node_id, *it); Spillback(node_id, *it);
@ -1399,5 +1385,25 @@ ResourceSet ClusterTaskManager::CalcNormalTaskResources() const {
return ResourceSet(total_normal_task_resources); return ResourceSet(total_normal_task_resources);
} }
std::string ClusterTaskManager::GetBestSchedulableNode(const internal::Work &work,
bool requires_object_store_memory,
bool force_spillback,
bool *is_infeasible) {
// If the local node is available, we should directly return it instead of
// going through the full hybrid policy since we don't want spillback.
if (work.grant_or_reject && !force_spillback && IsLocallySchedulable(work.task)) {
*is_infeasible = false;
return self_node_id_.Binary();
}
// This argument is used to set violation, which is an unsupported feature now.
int64_t _unused;
return cluster_resource_scheduler_->GetBestSchedulableNode(
work.task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(),
requires_object_store_memory,
work.task.GetTaskSpecification().IsActorCreationTask(), force_spillback, &_unused,
is_infeasible);
}
} // namespace raylet } // namespace raylet
} // namespace ray } // namespace ray

View file

@ -67,12 +67,14 @@ enum class UnscheduledWorkCause {
class Work { class Work {
public: public:
RayTask task; RayTask task;
const bool grant_or_reject;
rpc::RequestWorkerLeaseReply *reply; rpc::RequestWorkerLeaseReply *reply;
std::function<void(void)> callback; std::function<void(void)> callback;
std::shared_ptr<TaskResourceInstances> allocated_instances; std::shared_ptr<TaskResourceInstances> allocated_instances;
Work(RayTask task, rpc::RequestWorkerLeaseReply *reply, Work(RayTask task, bool grant_or_reject, rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> callback, WorkStatus status = WorkStatus::WAITING) std::function<void(void)> callback, WorkStatus status = WorkStatus::WAITING)
: task(task), : task(task),
grant_or_reject(grant_or_reject),
reply(reply), reply(reply),
callback(callback), callback(callback),
allocated_instances(nullptr), allocated_instances(nullptr),
@ -158,9 +160,12 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
/// Queue task and schedule. This hanppens when processing the worker lease request. /// Queue task and schedule. This hanppens when processing the worker lease request.
/// ///
/// \param task: The incoming task to be queued and scheduled. /// \param task: The incoming task to be queued and scheduled.
/// \param grant_or_reject: True if we we should either grant or reject the request
/// but no spillback.
/// \param reply: The reply of the lease request. /// \param reply: The reply of the lease request.
/// \param send_reply_callback: The function used during dispatching. /// \param send_reply_callback: The function used during dispatching.
void QueueAndScheduleTask(const RayTask &task, rpc::RequestWorkerLeaseReply *reply, void QueueAndScheduleTask(const RayTask &task, bool grant_or_reject,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override; rpc::SendReplyCallback send_reply_callback) override;
/// Move tasks from waiting to ready for dispatch. Called when a task's /// Move tasks from waiting to ready for dispatch. Called when a task's
@ -293,6 +298,11 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
// queue. // queue.
void SpillWaitingTasks(); void SpillWaitingTasks();
/// Helper method to get the best node for running the task.
std::string GetBestSchedulableNode(const internal::Work &work,
bool requires_object_store_memory,
bool force_spillback, bool *is_infeasible);
const NodeID &self_node_id_; const NodeID &self_node_id_;
/// Responsible for resource tracking/view of the cluster. /// Responsible for resource tracking/view of the cluster.
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_; std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;

View file

@ -106,9 +106,11 @@ class ClusterTaskManagerInterface {
/// Queue task and schedule. This hanppens when processing the worker lease request. /// Queue task and schedule. This hanppens when processing the worker lease request.
/// ///
/// \param task: The incoming task to be queued and scheduled. /// \param task: The incoming task to be queued and scheduled.
/// \param grant_or_reject: True if we we should either grant or reject the request
/// but no spillback.
/// \param reply: The reply of the lease request. /// \param reply: The reply of the lease request.
/// \param send_reply_callback: The function used during dispatching. /// \param send_reply_callback: The function used during dispatching.
virtual void QueueAndScheduleTask(const RayTask &task, virtual void QueueAndScheduleTask(const RayTask &task, bool grant_or_reject,
rpc::RequestWorkerLeaseReply *reply, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0; rpc::SendReplyCallback send_reply_callback) = 0;

View file

@ -334,7 +334,7 @@ TEST_F(ClusterTaskManagerTest, BasicTest) {
*callback_occurred_ptr = true; *callback_occurred_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -390,9 +390,9 @@ TEST_F(ClusterTaskManagerTest, DispatchQueueNonBlockingTest) {
auto empty_callback = [](Status, std::function<void()>, std::function<void()>) {}; auto empty_callback = [](Status, std::function<void()>, std::function<void()>) {};
// Ensure task_A is not at the front of the queue. // Ensure task_A is not at the front of the queue.
task_manager_.QueueAndScheduleTask(task_B_1, &reply_B_1, empty_callback); task_manager_.QueueAndScheduleTask(task_B_1, false, &reply_B_1, empty_callback);
task_manager_.QueueAndScheduleTask(task_A, &reply_A, callback); task_manager_.QueueAndScheduleTask(task_A, false, &reply_A, callback);
task_manager_.QueueAndScheduleTask(task_B_2, &reply_B_2, empty_callback); task_manager_.QueueAndScheduleTask(task_B_2, false, &reply_B_2, empty_callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// Push a worker that can only run task A. // Push a worker that can only run task A.
@ -431,7 +431,7 @@ TEST_F(ClusterTaskManagerTest, BlockedWorkerDiesTest) {
*callback_occurred_ptr = true; *callback_occurred_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
@ -476,7 +476,7 @@ TEST_F(ClusterTaskManagerTest, BlockedWorkerDies2Test) {
*callback_occurred_ptr = true; *callback_occurred_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
@ -521,7 +521,7 @@ TEST_F(ClusterTaskManagerTest, NoFeasibleNodeTest) {
*callback_called_ptr = true; *callback_called_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_called); ASSERT_FALSE(callback_called);
@ -558,7 +558,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
missing_objects_.insert(missing_arg); missing_objects_.insert(missing_arg);
std::unordered_set<TaskID> expected_subscribed_tasks = { std::unordered_set<TaskID> expected_subscribed_tasks = {
task.GetTaskSpecification().TaskId()}; task.GetTaskSpecification().TaskId()};
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
@ -571,7 +571,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
/* This task can run */ /* This task can run */
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1); auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1);
task_manager_.QueueAndScheduleTask(task2, &reply, callback); task_manager_.QueueAndScheduleTask(task2, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
@ -614,6 +614,61 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
AssertNoLeaks(); AssertNoLeaks();
} }
TEST_F(ClusterTaskManagerTest, TestGrantOrReject) {
std::shared_ptr<MockWorker> worker1 =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
std::shared_ptr<MockWorker> worker2 =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1235);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker1));
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker2));
int num_callbacks = 0;
auto callback = [&](Status, std::function<void()>, std::function<void()>) {
num_callbacks++;
};
auto remote_node_id = NodeID::FromRandom();
AddNode(remote_node_id, 8);
auto task1 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply local_reply;
task_manager_.QueueAndScheduleTask(task1, /*grant_or_reject=*/false, &local_reply,
callback);
pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 1);
// The first task was dispatched.
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 1);
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}});
rpc::RequestWorkerLeaseReply spillback_reply;
task_manager_.QueueAndScheduleTask(task2, /*grant_or_reject=*/false, &spillback_reply,
callback);
pool_.TriggerCallbacks();
// The second task was spilled.
ASSERT_EQ(num_callbacks, 2);
ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(),
remote_node_id.Binary());
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 1);
auto task3 = CreateTask({{ray::kCPU_ResourceLabel, 1}});
task_manager_.QueueAndScheduleTask(task3, /*grant_or_reject=*/true, &local_reply,
callback);
pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 3);
// The third task was dispatched.
ASSERT_EQ(leased_workers_.size(), 2);
ASSERT_EQ(pool_.workers.size(), 0);
while (!leased_workers_.empty()) {
RayTask finished_task;
task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task);
leased_workers_.erase(leased_workers_.begin());
}
AssertNoLeaks();
}
TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
/* /*
Test the race condition in which a task is assigned to the local node, but Test the race condition in which a task is assigned to the local node, but
@ -634,7 +689,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
/* Blocked on starting a worker. */ /* Blocked on starting a worker. */
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}); auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply local_reply; rpc::RequestWorkerLeaseReply local_reply;
task_manager_.QueueAndScheduleTask(task, &local_reply, callback); task_manager_.QueueAndScheduleTask(task, false, &local_reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
@ -642,12 +697,24 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
// Resources are no longer available for the second. // Resources are no longer available for the second.
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}); auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply spillback_reply; rpc::RequestWorkerLeaseReply reject_reply;
task_manager_.QueueAndScheduleTask(task2, &spillback_reply, callback); task_manager_.QueueAndScheduleTask(task2, /*grant_or_reject=*/true, &reject_reply,
callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// The second task was spilled. // The second task was rejected.
ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(num_callbacks, 1);
ASSERT_TRUE(reject_reply.rejected());
ASSERT_EQ(leased_workers_.size(), 0);
// Resources are no longer available for the third.
auto task3 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply spillback_reply;
task_manager_.QueueAndScheduleTask(task3, false, &spillback_reply, callback);
pool_.TriggerCallbacks();
// The third task was spilled.
ASSERT_EQ(num_callbacks, 2);
ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(), ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(),
remote_node_id.Binary()); remote_node_id.Binary());
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -657,8 +724,8 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker));
task_manager_.ScheduleAndDispatchTasks(); task_manager_.ScheduleAndDispatchTasks();
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// Check that both tasks got removed from the queue. // Check that all tasks got removed from the queue.
ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(num_callbacks, 3);
// The first task was dispatched. // The first task was dispatched.
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
// Leave one alive worker. // Leave one alive worker.
@ -688,7 +755,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
// RayTask not queued so we can't cancel it. // RayTask not queued so we can't cancel it.
ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId()));
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// RayTask is now in dispatch queue. // RayTask is now in dispatch queue.
@ -703,7 +770,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker));
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// RayTask is now running so we can't cancel it. // RayTask is now running so we can't cancel it.
@ -740,7 +807,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) {
*callback_called_ptr = true; *callback_called_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// RayTask is now queued so cancellation works. // RayTask is now queued so cancellation works.
@ -782,7 +849,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
*callback_called_ptr = true; *callback_called_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_TRUE(callback_called); ASSERT_TRUE(callback_called);
// Now {CPU: 7, GPU: 4, MEM:128} // Now {CPU: 7, GPU: 4, MEM:128}
@ -799,7 +866,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
*callback_called_ptr = true; *callback_called_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_called); // No worker available. ASSERT_FALSE(callback_called); // No worker available.
// Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task. // Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task.
@ -817,7 +884,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
*callback_called_ptr = true; *callback_called_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_called); // Infeasible. ASSERT_FALSE(callback_called); // Infeasible.
// Now there is also an infeasible task {CPU: 9}. // Now there is also an infeasible task {CPU: 9}.
@ -835,7 +902,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
*callback_called_ptr = true; *callback_called_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_called); // Infeasible. ASSERT_FALSE(callback_called); // Infeasible.
// Now there is also an infeasible task {CPU: 10}. // Now there is also an infeasible task {CPU: 10}.
@ -905,7 +972,7 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
// Don't add the fist task to `to_cancel`. // Don't add the fist task to `to_cancel`.
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}}); RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}});
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
task_manager_.SetWorkerBacklog(task.GetTaskSpecification().GetSchedulingClass(), task_manager_.SetWorkerBacklog(task.GetTaskSpecification().GetSchedulingClass(),
worker_id_submitting_first_task, 10 - i); worker_id_submitting_first_task, 10 - i);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
@ -913,7 +980,7 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
for (int i = 1; i < 10; i++) { for (int i = 1; i < 10; i++) {
RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}}); RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}});
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
task_manager_.SetWorkerBacklog(task.GetTaskSpecification().GetSchedulingClass(), task_manager_.SetWorkerBacklog(task.GetTaskSpecification().GetSchedulingClass(),
WorkerID::FromRandom(), 10 - i); WorkerID::FromRandom(), 10 - i);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
@ -995,7 +1062,7 @@ TEST_F(ClusterTaskManagerTest, OwnerDeadTest) {
pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker));
is_owner_alive_ = false; is_owner_alive_ = false;
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
@ -1024,7 +1091,7 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) {
std::function<void()>) { std::function<void()>) {
*callback_occurred = true; *callback_occurred = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
@ -1071,7 +1138,7 @@ TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) {
std::function<void()>) { std::function<void()>) {
*callback_occurred = true; *callback_occurred = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
@ -1083,7 +1150,7 @@ TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) {
std::function<void()>) { std::function<void()>) {
*callback_occurred2 = true; *callback_occurred2 = true;
}; };
task_manager_.QueueAndScheduleTask(task2, &reply2, callback2); task_manager_.QueueAndScheduleTask(task2, false, &reply2, callback2);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
} }
@ -1104,7 +1171,7 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasksForResourceAcquisition) {
std::function<void()>) { std::function<void()>) {
*callback_occurred = true; *callback_occurred = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_TRUE(*callback_occurred); ASSERT_TRUE(*callback_occurred);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
@ -1126,7 +1193,7 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasksForResourceAcquisition) {
std::function<void()>) { std::function<void()>) {
*callback_occurred2 = true; *callback_occurred2 = true;
}; };
task_manager_.QueueAndScheduleTask(task2, &reply2, callback2); task_manager_.QueueAndScheduleTask(task2, false, &reply2, callback2);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_FALSE(*callback_occurred2); ASSERT_FALSE(*callback_occurred2);
ASSERT_TRUE(task_manager_.AnyPendingTasksForResourceAcquisition( ASSERT_TRUE(task_manager_.AnyPendingTasksForResourceAcquisition(
@ -1156,7 +1223,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
missing_objects_.insert(missing_arg); missing_objects_.insert(missing_arg);
std::unordered_set<TaskID> expected_subscribed_tasks = { std::unordered_set<TaskID> expected_subscribed_tasks = {
task.GetTaskSpecification().TaskId()}; task.GetTaskSpecification().TaskId()};
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
@ -1196,7 +1263,7 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
rpc::RequestWorkerLeaseReply reply1; rpc::RequestWorkerLeaseReply reply1;
bool callback_occurred1 = false; bool callback_occurred1 = false;
task_manager_.QueueAndScheduleTask( task_manager_.QueueAndScheduleTask(
task1, &reply1, task1, false, &reply1,
[&callback_occurred1](Status, std::function<void()>, std::function<void()>) { [&callback_occurred1](Status, std::function<void()>, std::function<void()>) {
callback_occurred1 = true; callback_occurred1 = true;
}); });
@ -1216,7 +1283,7 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
rpc::RequestWorkerLeaseReply reply2; rpc::RequestWorkerLeaseReply reply2;
bool callback_occurred2 = false; bool callback_occurred2 = false;
task_manager_.QueueAndScheduleTask( task_manager_.QueueAndScheduleTask(
task2, &reply2, task2, false, &reply2,
[&callback_occurred2](Status, std::function<void()>, std::function<void()>) { [&callback_occurred2](Status, std::function<void()>, std::function<void()>) {
callback_occurred2 = true; callback_occurred2 = true;
}); });
@ -1307,7 +1374,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillWaitingTasks) {
auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0]; auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0];
missing_objects_.insert(missing_arg); missing_objects_.insert(missing_arg);
} }
task_manager_.QueueAndScheduleTask(task, replies[i].get(), callback); task_manager_.QueueAndScheduleTask(task, false, replies[i].get(), callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
} }
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
@ -1387,7 +1454,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsMemoryTest) {
// This task can run. // This task can run.
default_arg_size_ = 600; default_arg_size_ = 600;
auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1);
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
@ -1396,7 +1463,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsMemoryTest) {
// This task cannot run because it would put us over the memory threshold. // This task cannot run because it would put us over the memory threshold.
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1);
task_manager_.QueueAndScheduleTask(task2, &reply, callback); task_manager_.QueueAndScheduleTask(task2, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
@ -1441,7 +1508,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsSameMemoryTest) {
// This task can run. // This task can run.
default_arg_size_ = 600; default_arg_size_ = 600;
auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1);
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
@ -1451,7 +1518,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsSameMemoryTest) {
// This task can run because it depends on the same object as the first task. // This task can run because it depends on the same object as the first task.
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1, auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1,
task.GetTaskSpecification().GetDependencyIds()); task.GetTaskSpecification().GetDependencyIds());
task_manager_.QueueAndScheduleTask(task2, &reply, callback); task_manager_.QueueAndScheduleTask(task2, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(num_callbacks, 2);
ASSERT_EQ(leased_workers_.size(), 2); ASSERT_EQ(leased_workers_.size(), 2);
@ -1480,7 +1547,7 @@ TEST_F(ClusterTaskManagerTest, LargeArgsNoStarvationTest) {
default_arg_size_ = 2000; default_arg_size_ = 2000;
auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1);
pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::static_pointer_cast<WorkerInterface>(worker));
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
@ -1524,7 +1591,7 @@ TEST_F(ClusterTaskManagerTest, PopWorkerExactlyOnce) {
*callback_occurred_ptr = true; *callback_occurred_ptr = true;
}; };
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
// Make sure callback doesn't occurred. // Make sure callback doesn't occurred.
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
@ -1579,7 +1646,7 @@ TEST_F(ClusterTaskManagerTestWithoutCPUsAtHead, OneCpuInfeasibleTask) {
for (int i = 0; i < num_cases; ++i) { for (int i = 0; i < num_cases; ++i) {
RayTask task = CreateTask({{ray::kCPU_ResourceLabel, cpu_request[i]}}); RayTask task = CreateTask({{ray::kCPU_ResourceLabel, cpu_request[i]}});
task_manager_.QueueAndScheduleTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, false, &reply, callback);
pool_.TriggerCallbacks(); pool_.TriggerCallbacks();
// The task cannot run because there is only 1 node (head) with 0 CPU. // The task cannot run because there is only 1 node (head) with 0 CPU.

View file

@ -36,20 +36,16 @@ bool DoesNodeHaveGPUs(const NodeResources &resources) {
} }
} // namespace } // namespace
int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request, int64_t SchedulingPolicy::HybridPolicyWithFilter(
const int64_t local_node_id, const ResourceRequest &resource_request, bool force_spillback, bool require_available,
const absl::flat_hash_map<int64_t, Node> &nodes, std::function<bool(int64_t)> is_node_available, NodeFilter node_filter) {
float spread_threshold, bool force_spillback,
bool require_available,
std::function<bool(int64_t)> is_node_available,
NodeFilter node_filter) {
// Step 1: Generate the traversal order. We guarantee that the first node is local, to // Step 1: Generate the traversal order. We guarantee that the first node is local, to
// encourage local scheduling. The rest of the traversal order should be globally // encourage local scheduling. The rest of the traversal order should be globally
// consistent, to encourage using "warm" workers. // consistent, to encourage using "warm" workers.
std::vector<int64_t> round; std::vector<int64_t> round;
round.reserve(nodes.size()); round.reserve(nodes_.size());
const auto local_it = nodes.find(local_node_id); const auto local_it = nodes_.find(local_node_id_);
RAY_CHECK(local_it != nodes.end()); RAY_CHECK(local_it != nodes_.end());
auto predicate = [node_filter, &is_node_available]( auto predicate = [node_filter, &is_node_available](
int64_t node_id, const NodeResources &node_resources) { int64_t node_id, const NodeResources &node_resources) {
if (!is_node_available(node_id)) { if (!is_node_available(node_id)) {
@ -71,13 +67,13 @@ int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
// so that // so that
// 1. It's first in traversal order. // 1. It's first in traversal order.
// 2. It's easy to avoid sorting it. // 2. It's easy to avoid sorting it.
if (predicate(local_node_id, local_node_view) && !force_spillback) { if (predicate(local_node_id_, local_node_view) && !force_spillback) {
round.push_back(local_node_id); round.push_back(local_node_id_);
} }
const auto start_index = round.size(); const auto start_index = round.size();
for (const auto &pair : nodes) { for (const auto &pair : nodes_) {
if (pair.first != local_node_id && if (pair.first != local_node_id_ &&
predicate(pair.first, pair.second.GetLocalView())) { predicate(pair.first, pair.second.GetLocalView())) {
round.push_back(pair.first); round.push_back(pair.first);
} }
@ -94,15 +90,15 @@ int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
auto round_it = round.begin(); auto round_it = round.begin();
for (; round_it != round.end(); round_it++) { for (; round_it != round.end(); round_it++) {
const auto &node_id = *round_it; const auto &node_id = *round_it;
const auto &it = nodes.find(node_id); const auto &it = nodes_.find(node_id);
RAY_CHECK(it != nodes.end()); RAY_CHECK(it != nodes_.end());
const auto &node = it->second; const auto &node = it->second;
if (!node.GetLocalView().IsFeasible(resource_request)) { if (!node.GetLocalView().IsFeasible(resource_request)) {
continue; continue;
} }
bool ignore_pull_manager_at_capacity = false; bool ignore_pull_manager_at_capacity = false;
if (node_id == local_node_id) { if (node_id == local_node_id_) {
// It's okay if the local node's pull manager is at // It's okay if the local node's pull manager is at
// capacity because we will eventually spill the task // capacity because we will eventually spill the task
// back from the waiting queue if its args cannot be // back from the waiting queue if its args cannot be
@ -119,7 +115,7 @@ int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
<< " with critical resource utilization " << " with critical resource utilization "
<< critical_resource_utilization << " based on local view " << critical_resource_utilization << " based on local view "
<< node.GetLocalView().DebugString(StringIdMap()); << node.GetLocalView().DebugString(StringIdMap());
if (critical_resource_utilization < spread_threshold) { if (critical_resource_utilization < spread_threshold_) {
critical_resource_utilization = 0; critical_resource_utilization = 0;
} }
@ -150,29 +146,27 @@ int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
return best_node_id; return best_node_id;
} }
int64_t HybridPolicy(const ResourceRequest &resource_request, const int64_t local_node_id, int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request,
const absl::flat_hash_map<int64_t, Node> &nodes, bool force_spillback, bool require_available,
float spread_threshold, bool force_spillback, bool require_available, std::function<bool(int64_t)> is_node_available,
std::function<bool(int64_t)> is_node_available, bool scheduler_avoid_gpu_nodes) {
bool scheduler_avoid_gpu_nodes) {
if (!scheduler_avoid_gpu_nodes || IsGPURequest(resource_request)) { if (!scheduler_avoid_gpu_nodes || IsGPURequest(resource_request)) {
return HybridPolicyWithFilter(resource_request, local_node_id, nodes, return HybridPolicyWithFilter(resource_request, force_spillback, require_available,
spread_threshold, force_spillback, require_available,
std::move(is_node_available)); std::move(is_node_available));
} }
// Try schedule on non-GPU nodes. // Try schedule on non-GPU nodes.
auto best_node_id = HybridPolicyWithFilter( auto best_node_id = HybridPolicyWithFilter(resource_request, force_spillback,
resource_request, local_node_id, nodes, spread_threshold, force_spillback, /*require_available*/ true,
/*require_available*/ true, is_node_available, NodeFilter::kNonGpu); is_node_available, NodeFilter::kNonGpu);
if (best_node_id != -1) { if (best_node_id != -1) {
return best_node_id; return best_node_id;
} }
// If we cannot find any available node from non-gpu nodes, fallback to the original // If we cannot find any available node from non-gpu nodes, fallback to the original
// scheduling // scheduling
return HybridPolicyWithFilter(resource_request, local_node_id, nodes, spread_threshold, return HybridPolicyWithFilter(resource_request, force_spillback, require_available,
force_spillback, require_available, is_node_available); is_node_available);
} }
} // namespace raylet_scheduling_policy } // namespace raylet_scheduling_policy

View file

@ -21,77 +21,82 @@
namespace ray { namespace ray {
namespace raylet_scheduling_policy { namespace raylet_scheduling_policy {
/// This scheduling policy was designed with the following assumptions in mind: class SchedulingPolicy {
/// 1. Scheduling a task on a new node incurs a cold start penalty (warming the worker public:
/// pool). SchedulingPolicy(int64_t local_node_id, const absl::flat_hash_map<int64_t, Node> &nodes,
/// 2. Past a certain utilization threshold, a big noisy neighbor problem occurs (caused float spread_threshold)
/// by object spilling). : local_node_id_(local_node_id),
/// 3. Locality is helpful, but generally outweighed by (1) and (2). nodes_(nodes),
/// spread_threshold_(spread_threshold) {}
/// In order to solve these problems, we use the following scheduling policy.
/// 1. Generate a traversal.
/// 2. Run a priority scheduler.
///
/// A node's priorities are determined by the following factors:
/// * Always skip infeasible nodes
/// * Always prefer available nodes over feasible nodes.
/// * Break ties in available/feasible by critical resource utilization.
/// * Critical resource utilization below a threshold should be truncated to 0.
///
/// The traversal order should:
/// * Prioritize the local node above all others.
/// * All other nodes should have a globally fixed priority across the cluster.
///
/// We call this a hybrid policy because below the threshold, the traversal and truncation
/// properties will lead to packing of nodes. Above the threshold, the policy will act
/// like a traditional weighted round robin.
///
/// \param resource_request: The resource request we're attempting to schedule.
/// \param local_node_id: The id of the local node, which is needed for traversal order.
/// \param nodes: The summary view of all the nodes that can be scheduled on.
/// \param spread_threshold: Below this threshold, critical resource utilization will be
/// truncated to 0.
/// \param scheduler_avoid_gpu_nodes: if set, we would try scheduling
/// CPU-only requests on CPU-only nodes, and will fallback to scheduling on GPU nodes if
/// needed.
///
/// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to
/// schedule on.
int64_t HybridPolicy(
const ResourceRequest &resource_request, const int64_t local_node_id,
const absl::flat_hash_map<int64_t, Node> &nodes, float spread_threshold,
bool force_spillback, bool require_available,
std::function<bool(int64_t)> is_node_available,
bool scheduler_avoid_gpu_nodes = RayConfig::instance().scheduler_avoid_gpu_nodes());
enum class NodeFilter { /// This scheduling policy was designed with the following assumptions in mind:
/// Default scheduling. /// 1. Scheduling a task on a new node incurs a cold start penalty (warming the worker
kAny, /// pool).
/// Schedule on GPU only nodes. /// 2. Past a certain utilization threshold, a big noisy neighbor problem occurs
kGPU, /// (caused by object spilling).
/// Schedule on nodes that don't have GPU. Since GPUs are more scarce resources, we need /// 3. Locality is helpful, but generally outweighed by (1) and (2).
/// special handling for this. ///
kNonGpu /// In order to solve these problems, we use the following scheduling policy.
/// 1. Generate a traversal.
/// 2. Run a priority scheduler.
///
/// A node's priorities are determined by the following factors:
/// * Always skip infeasible nodes
/// * Always prefer available nodes over feasible nodes.
/// * Break ties in available/feasible by critical resource utilization.
/// * Critical resource utilization below a threshold should be truncated to 0.
///
/// The traversal order should:
/// * Prioritize the local node above all others.
/// * All other nodes should have a globally fixed priority across the cluster.
///
/// We call this a hybrid policy because below the threshold, the traversal and
/// truncation properties will lead to packing of nodes. Above the threshold, the policy
/// will act like a traditional weighted round robin.
///
/// \param resource_request: The resource request we're attempting to schedule.
/// \param scheduler_avoid_gpu_nodes: if set, we would try scheduling
/// CPU-only requests on CPU-only nodes, and will fallback to scheduling on GPU nodes if
/// needed.
///
/// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to
/// schedule on.
int64_t HybridPolicy(
const ResourceRequest &resource_request, bool force_spillback,
bool require_available, std::function<bool(int64_t)> is_node_available,
bool scheduler_avoid_gpu_nodes = RayConfig::instance().scheduler_avoid_gpu_nodes());
private:
/// Identifier of local node.
const int64_t local_node_id_;
/// List of nodes in the clusters and their resources organized as a map.
/// The key of the map is the node ID.
const absl::flat_hash_map<int64_t, Node> &nodes_;
/// The threshold at which to switch from packing to spreading.
const float spread_threshold_;
enum class NodeFilter {
/// Default scheduling.
kAny,
/// Schedule on GPU only nodes.
kGPU,
/// Schedule on nodes that don't have GPU. Since GPUs are more scarce resources, we
/// need
/// special handling for this.
kNonGpu
};
/// \param resource_request: The resource request we're attempting to schedule.
/// \param node_filter: defines the subset of nodes were are allowed to schedule on.
/// can be one of kAny (can schedule on all nodes), kGPU (can only schedule on kGPU
/// nodes), kNonGpu (can only schedule on non-GPU nodes.
///
/// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to
/// schedule on.
int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
bool force_spillback, bool require_available,
std::function<bool(int64_t)> is_node_available,
NodeFilter node_filter = NodeFilter::kAny);
}; };
/// \param resource_request: The resource request we're attempting to schedule.
/// \param local_node_id: The id of the local node, which is needed for traversal order.
/// \param nodes: The summary view of all the nodes that can be scheduled on.
/// \param spread_threshold: Below this threshold, critical resource utilization will be
/// truncated to 0.
/// \param node_filter: defines the subset of nodes were are allowed to schedule on.
/// can be one of kAny (can schedule on all nodes), kGPU (can only schedule on kGPU
/// nodes), kNonGpu (can only schedule on non-GPU nodes.
///
/// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to
/// schedule on.
int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
const int64_t local_node_id,
const absl::flat_hash_map<int64_t, Node> &nodes,
float spread_threshold, bool force_spillback,
bool require_available,
std::function<bool(int64_t)> is_node_available,
NodeFilter node_filter = NodeFilter::kAny);
} // namespace raylet_scheduling_policy } // namespace raylet_scheduling_policy
} // namespace ray } // namespace ray

View file

@ -133,8 +133,8 @@ TEST_F(SchedulingPolicyTest, AvailableTruncationTest) {
nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0)); nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, false, false, [](auto) { return true; }); .HybridPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node); ASSERT_EQ(to_schedule, local_node);
} }
@ -150,8 +150,8 @@ TEST_F(SchedulingPolicyTest, AvailableTieBreakTest) {
nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0)); nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(1.5, 2, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(1.5, 2, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.50)
req, local_node, nodes, 0.50, false, false, [](auto) { return true; }); .HybridPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
@ -169,8 +169,8 @@ TEST_F(SchedulingPolicyTest, AvailableOverFeasibleTest) {
nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 1)); nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 1));
nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 1)); nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 1));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.50)
req, local_node, nodes, 0.50, false, false, [](auto) { return true; }); .HybridPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
@ -186,8 +186,8 @@ TEST_F(SchedulingPolicyTest, InfeasibleTest) {
nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 0)); nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.50)
req, local_node, nodes, 0.50, false, false, [](auto) { return true; }); .HybridPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, -1); ASSERT_EQ(to_schedule, -1);
} }
@ -202,8 +202,8 @@ TEST_F(SchedulingPolicyTest, BarelyFeasibleTest) {
absl::flat_hash_map<int64_t, Node> nodes; absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(0, 1, 0, 0, 0, 1)); nodes.emplace(local_node, CreateNodeResources(0, 1, 0, 0, 0, 1));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.50)
req, local_node, nodes, 0.50, false, false, [](auto) { return true; }); .HybridPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node); ASSERT_EQ(to_schedule, local_node);
} }
@ -220,8 +220,8 @@ TEST_F(SchedulingPolicyTest, TruncationAcrossFeasibleNodesTest) {
nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 1)); nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 1));
nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 1)); nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 1));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, false, false, [](auto) { return true; }); .HybridPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node); ASSERT_EQ(to_schedule, local_node);
} }
@ -238,8 +238,8 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackIfAvailableTest) {
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 10)); nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 10));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, true, true, [](auto) { return true; }); .HybridPolicy(req, true, true, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
@ -257,31 +257,39 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) {
// non GPU, and the remote node does not have GPUs, thus // non GPU, and the remote node does not have GPUs, thus
// we should schedule on remote node. // we should schedule on remote node.
const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
const int to_schedule = raylet_scheduling_policy::HybridPolicy( const int to_schedule =
ResourceMapToResourceRequest(map, {{"CPU", 1}}, false), local_node, nodes, 0.51, raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
false, true, [](auto) { return true; }, true); .HybridPolicy(
ResourceMapToResourceRequest(map, {{"CPU", 1}}, false), false, true,
[](auto) { return true; }, true);
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
{ {
// A GPU request should be scheduled on a GPU node. // A GPU request should be scheduled on a GPU node.
const ResourceRequest req = ResourceMapToResourceRequest(map, {{"GPU", 1}}, false); const ResourceRequest req = ResourceMapToResourceRequest(map, {{"GPU", 1}}, false);
const int to_schedule = raylet_scheduling_policy::HybridPolicy( const int to_schedule =
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, true); raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
.HybridPolicy(
req, false, true, [](auto) { return true; }, true);
ASSERT_EQ(to_schedule, local_node); ASSERT_EQ(to_schedule, local_node);
} }
{ {
// A CPU request can be be scheduled on a CPU node. // A CPU request can be be scheduled on a CPU node.
const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
const int to_schedule = raylet_scheduling_policy::HybridPolicy( const int to_schedule =
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, true); raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
.HybridPolicy(
req, false, true, [](auto) { return true; }, true);
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
{ {
// A mixed CPU/GPU request should be scheduled on a GPU node. // A mixed CPU/GPU request should be scheduled on a GPU node.
const ResourceRequest req = const ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
const int to_schedule = raylet_scheduling_policy::HybridPolicy( const int to_schedule =
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, true); raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
.HybridPolicy(
req, false, true, [](auto) { return true; }, true);
ASSERT_EQ(to_schedule, local_node); ASSERT_EQ(to_schedule, local_node);
} }
} }
@ -298,8 +306,10 @@ TEST_F(SchedulingPolicyTest, SchedulenCPURequestsOnGPUNodeAsALastResort) {
nodes.emplace(local_node, CreateNodeResources(0, 10, 0, 0, 0, 0)); nodes.emplace(local_node, CreateNodeResources(0, 10, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(1, 1, 0, 0, 1, 1)); nodes.emplace(remote_node, CreateNodeResources(1, 1, 0, 0, 1, 1));
const int to_schedule = raylet_scheduling_policy::HybridPolicy( const int to_schedule =
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, true); raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
.HybridPolicy(
req, false, true, [](auto) { return true; }, true);
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
@ -315,8 +325,8 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackTest) {
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 1)); nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 1));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, true, false, [](auto) { return true; }); .HybridPolicy(req, true, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node); ASSERT_EQ(to_schedule, remote_node);
} }
@ -333,8 +343,8 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackOnlyFeasibleLocallyTest) {
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1)); nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 0)); nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, true, false, [](auto) { return true; }); .HybridPolicy(req, true, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, -1); ASSERT_EQ(to_schedule, -1);
} }
@ -354,27 +364,31 @@ TEST_F(SchedulingPolicyTest, NonGpuNodePreferredSchedulingTest) {
nodes.emplace(remote_node_2, CreateNodeResources(3, 3, 0, 0, 0, 0)); nodes.emplace(remote_node_2, CreateNodeResources(3, 3, 0, 0, 0, 0));
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false); ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int to_schedule = raylet_scheduling_policy::HybridPolicy( int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, .HybridPolicy(
/*gpu_avoid_scheduling*/ true); req, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_1); ASSERT_EQ(to_schedule, remote_node_1);
req = ResourceMapToResourceRequest(map, {{"CPU", 3}}, false); req = ResourceMapToResourceRequest(map, {{"CPU", 3}}, false);
to_schedule = raylet_scheduling_policy::HybridPolicy( to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, .HybridPolicy(
/*gpu_avoid_scheduling*/ true); req, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_2); ASSERT_EQ(to_schedule, remote_node_2);
req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false); req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
to_schedule = raylet_scheduling_policy::HybridPolicy( to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, .HybridPolicy(
/*gpu_avoid_scheduling*/ true); req, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, local_node); ASSERT_EQ(to_schedule, local_node);
req = ResourceMapToResourceRequest(map, {{"CPU", 2}}, false); req = ResourceMapToResourceRequest(map, {{"CPU", 2}}, false);
to_schedule = raylet_scheduling_policy::HybridPolicy( to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes, 0.51)
req, local_node, nodes, 0.51, false, true, [](auto) { return true; }, .HybridPolicy(
/*gpu_avoid_scheduling*/ true); req, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_1); ASSERT_EQ(to_schedule, remote_node_1);
} }

View file

@ -298,7 +298,7 @@ Status raylet::RayletClient::FreeObjects(const std::vector<ObjectID> &object_ids
} }
void raylet::RayletClient::RequestWorkerLease( void raylet::RayletClient::RequestWorkerLease(
const rpc::TaskSpec &task_spec, const rpc::TaskSpec &task_spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback, const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size) { const int64_t backlog_size) {
google::protobuf::Arena arena; google::protobuf::Arena arena;
@ -310,6 +310,7 @@ void raylet::RayletClient::RequestWorkerLease(
// used any more. // used any more.
request->unsafe_arena_set_allocated_resource_spec( request->unsafe_arena_set_allocated_resource_spec(
const_cast<rpc::TaskSpec *>(&task_spec)); const_cast<rpc::TaskSpec *>(&task_spec));
request->set_grant_or_reject(grant_or_reject);
request->set_backlog_size(backlog_size); request->set_backlog_size(backlog_size);
grpc_client_->RequestWorkerLease(*request, callback); grpc_client_->RequestWorkerLease(*request, callback);
} }

View file

@ -62,14 +62,16 @@ class WorkerLeaseInterface {
public: public:
/// Requests a worker from the raylet. The callback will be sent via gRPC. /// Requests a worker from the raylet. The callback will be sent via gRPC.
/// \param resource_spec Resources that should be allocated for the worker. /// \param resource_spec Resources that should be allocated for the worker.
/// \param grant_or_reject: True if we we should either grant or reject the request
/// but no spillback.
/// \param callback: The callback to call when the request finishes.
/// \param backlog_size The queue length for the given shape on the CoreWorker. /// \param backlog_size The queue length for the given shape on the CoreWorker.
/// \return ray::Status
virtual void RequestWorkerLease( virtual void RequestWorkerLease(
const ray::TaskSpecification &resource_spec, const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) = 0; const int64_t backlog_size = -1) = 0;
virtual void RequestWorkerLease( virtual void RequestWorkerLease(
const rpc::TaskSpec &task_spec, const rpc::TaskSpec &task_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) = 0; const int64_t backlog_size = -1) = 0;
@ -373,14 +375,15 @@ class RayletClient : public RayletClientInterface {
/// Implements WorkerLeaseInterface. /// Implements WorkerLeaseInterface.
void RequestWorkerLease( void RequestWorkerLease(
const ray::TaskSpecification &resource_spec, const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size) override { const int64_t backlog_size) override {
RequestWorkerLease(resource_spec.GetMessage(), callback, backlog_size); RequestWorkerLease(resource_spec.GetMessage(), grant_or_reject, callback,
backlog_size);
} }
void RequestWorkerLease( void RequestWorkerLease(
const rpc::TaskSpec &resource_spec, const rpc::TaskSpec &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback, const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size) override; const int64_t backlog_size) override;