[core] Do not spill back tasks blocked on args to blocked nodes (#16488)

This commit is contained in:
Stephanie Wang 2021-07-20 17:13:02 -07:00 committed by GitHub
parent f4f702c595
commit dad8db46e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 288 additions and 102 deletions

View file

@ -1889,6 +1889,19 @@ cdef class CoreWorker:
# so all the serialization and validation is done in one place
return json.dumps(result_dict, sort_keys=True)
def get_task_submission_stats(self):
cdef:
int64_t num_tasks_submitted
int64_t num_leases_requested
with nogil:
num_tasks_submitted = (
CCoreWorkerProcess.GetCoreWorker().GetNumTasksSubmitted())
num_leases_requested = (
CCoreWorkerProcess.GetCoreWorker().GetNumLeasesRequested())
return (num_tasks_submitted, num_leases_requested)
cdef void async_callback(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *user_callback) with gil:

View file

@ -244,6 +244,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_bool IsExiting() const
int64_t GetNumTasksSubmitted() const
int64_t GetNumLeasesRequested() const
cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions":
CWorkerType worker_type
CLanguage language

View file

@ -139,8 +139,12 @@ def store_stats_summary(reply):
int(reply.store_stats.restored_bytes_total / (1024 * 1024) /
reply.store_stats.restore_time_total_s)))
if reply.store_stats.consumed_bytes > 0:
store_summary += ("Objects consumed by Ray tasks: {} MiB.".format(
store_summary += ("Objects consumed by Ray tasks: {} MiB.\n".format(
int(reply.store_stats.consumed_bytes / (1024 * 1024))))
if reply.store_stats.object_pulls_queued:
store_summary += (
"Object fetches queued, waiting for available memory.")
return store_summary

View file

@ -10,12 +10,13 @@ import numpy as np
import pytest
import ray
from ray.internal.internal_api import memory_summary
import ray.util.accelerators
import ray.cluster_utils
import ray.test_utils
from ray.test_utils import (wait_for_condition, new_scheduler_enabled,
Semaphore, object_memory_usage)
Semaphore, object_memory_usage, SignalActor)
logger = logging.getLogger(__name__)
@ -446,16 +447,18 @@ def test_lease_request_leak(shutdown_only):
@pytest.mark.skipif(sys.platform == "win32", reason="Fails on windows")
def test_many_args(ray_start_cluster):
# This test ensures that a task will run where its task dependencies are
# located, even when those objects are borrowed.
cluster = ray_start_cluster
object_size = int(1e6)
# Disable worker caching so worker leases are not reused, and disable
# inlining of return objects so return objects are always put into Plasma.
for _ in range(4):
cluster.add_node(
num_cpus=1, object_store_memory=(4 * object_size * 25))
cluster.add_node(
num_cpus=1,
_system_config={
# Lower this to prevent excessive delays in pull retries.
"object_manager_pull_timeout_ms": 100,
"debug_dump_period_milliseconds": 1000,
},
object_store_memory=int(1e8))
for _ in range(3):
cluster.add_node(num_cpus=1, object_store_memory=int(1e8))
ray.init(address=cluster.address)
@ray.remote
@ -467,15 +470,54 @@ def test_many_args(ray_start_cluster):
def put():
return np.zeros(object_size, dtype=np.uint8)
xs = [put.remote() for _ in range(100)]
xs = [put.remote() for _ in range(200)]
ray.wait(xs, num_returns=len(xs), fetch_local=False)
num_tasks_submitted_before, num_leases_requested_before = (
ray.worker.global_worker.core_worker.get_task_submission_stats())
tasks = []
for i in range(100):
args = [np.random.choice(xs) for _ in range(25)]
args = [np.random.choice(xs) for _ in range(10)]
tasks.append(f.remote(i, *args))
ray.get(tasks, timeout=30)
num_tasks_submitted, num_leases_requested = (
ray.worker.global_worker.core_worker.get_task_submission_stats())
num_tasks_submitted -= num_tasks_submitted_before
num_leases_requested -= num_leases_requested_before
print("submitted:", num_tasks_submitted, "leases requested:",
num_leases_requested)
assert num_tasks_submitted == 100
assert num_leases_requested <= 10 * num_tasks_submitted
def test_pull_manager_at_capacity_reports(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0, object_store_memory=int(1e8))
ray.init(address=cluster.address)
cluster.add_node(num_cpus=1, object_store_memory=int(1e8))
object_size = int(1e7)
refs = []
for _ in range(20):
refs.append(ray.put(np.zeros(object_size, dtype=np.uint8)))
def fetches_queued():
return "fetches queued" in memory_summary(stats_only=True)
assert not fetches_queued()
@ray.remote
def f(s, ref):
ray.get(s.wait.remote())
signal = SignalActor.remote()
xs = [f.remote(signal, ref) for ref in refs]
wait_for_condition(fetches_queued)
signal.send.remote()
ray.get(xs)
wait_for_condition(lambda: not fetches_queued())
if __name__ == "__main__":
import pytest

View file

@ -840,6 +840,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Create a profile event with a reference to the core worker's profiler.
std::unique_ptr<worker::ProfileEvent> CreateProfileEvent(const std::string &event_type);
int64_t GetNumTasksSubmitted() const {
return direct_task_submitter_->GetNumTasksSubmitted();
}
int64_t GetNumLeasesRequested() const {
return direct_task_submitter_->GetNumLeasesRequested();
}
public:
/// Allocate the return object for an executing task. The caller should write into the
/// data buffer of the allocated buffer, then call SealReturnObject() to seal it.

View file

@ -20,6 +20,7 @@ namespace ray {
Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId();
num_tasks_submitted_++;
if (task_spec.IsActorCreationTask()) {
// Synchronously register the actor to GCS server.
@ -475,6 +476,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
// Create a TaskSpecification with an overwritten TaskID to make sure we don't reuse the
// same TaskID to request a worker
num_leases_requested_++;
auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage();
resource_spec_msg.set_task_id(TaskID::ForFakeTask().Binary());
TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);

View file

@ -99,6 +99,13 @@ class CoreWorkerDirectTaskSubmitter {
return scheduling_key_entries_.empty();
}
int64_t GetNumTasksSubmitted() const { return num_tasks_submitted_; }
int64_t GetNumLeasesRequested() {
absl::MutexLock lock(&mu_);
return num_leases_requested_;
}
private:
/// Schedule more work onto an idle worker or return it back to the raylet if
/// no more tasks are queued for submission. If an error was encountered
@ -347,6 +354,9 @@ class CoreWorkerDirectTaskSubmitter {
// Retries cancelation requests if they were not successful.
absl::optional<boost::asio::steady_timer> cancel_retry_timer_;
int64_t num_tasks_submitted_ = 0;
int64_t num_leases_requested_ GUARDED_BY(mu_) = 0;
};
}; // namespace ray

View file

@ -945,6 +945,7 @@ void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
stats->set_object_store_bytes_avail(config_.object_store_memory);
stats->set_num_local_objects(local_objects_.size());
stats->set_consumed_bytes(plasma::plasma_store_runner->GetConsumedBytes());
stats->set_object_pulls_queued(pull_manager_->HasPullsQueued());
}
void ObjectManager::Tick(const boost::system::error_code &e) {

View file

@ -299,6 +299,8 @@ class ObjectManager : public ObjectManagerInterface,
return static_cast<double>(used_memory_) / config_.object_store_memory;
}
bool PullManagerHasPullsQueued() const { return pull_manager_->HasPullsQueued(); }
private:
friend class TestObjectManager;

View file

@ -238,6 +238,11 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(int64_t num_bytes_available)
RAY_LOG(DEBUG) << "Updating pulls based on available memory: " << num_bytes_available;
}
num_bytes_available_ = num_bytes_available;
// Assume that initially we have enough capacity for all
// pulls. This will get set to true if there is at least one
// bundle request that we cannot activate due to lack of
// space.
std::vector<ObjectID> objects_to_pull;
std::unordered_set<ObjectID> object_ids_to_cancel;
// If there are any get requests (highest priority), try to activate them. Since we
@ -666,6 +671,11 @@ bool PullManager::PullRequestActiveOrWaitingForMetadata(uint64_t request_id) con
return bundle_it->second.num_object_sizes_missing > 0;
}
bool PullManager::HasPullsQueued() const {
absl::MutexLock lock(&active_objects_mu_);
return active_object_pull_requests_.size() != object_pull_requests_.size();
}
std::string PullManager::BundleInfo(const Queue &bundles,
uint64_t highest_id_being_pulled) const {
auto it = bundles.begin();

View file

@ -134,6 +134,11 @@ class PullManager {
/// earlier request is waiting for metadata.
bool PullRequestActiveOrWaitingForMetadata(uint64_t request_id) const;
/// Whether we are have requests queued that are not currently active. This
/// can happen when we are at capacity in the object store or temporarily, if
/// there are object sizes missing.
bool HasPullsQueued() const;
std::string DebugString() const;
/// Returns the number of bytes of quota remaining. When this is less than zero,

View file

@ -396,6 +396,7 @@ TEST_P(PullManagerTest, TestBasic) {
std::vector<rpc::ObjectReference> objects_to_locate;
auto req_id = pull_manager_.Pull(refs, prio, &objects_to_locate);
ASSERT_EQ(ObjectRefsToIds(objects_to_locate), oids);
ASSERT_TRUE(pull_manager_.HasPullsQueued());
std::unordered_set<NodeID> client_ids;
client_ids.insert(NodeID::FromRandom());
@ -409,6 +410,7 @@ TEST_P(PullManagerTest, TestBasic) {
ASSERT_EQ(num_send_pull_request_calls_, oids.size());
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
AssertNumActiveRequestsEquals(oids.size());
ASSERT_FALSE(pull_manager_.HasPullsQueued());
// Don't pull an object if it's local.
object_is_local_ = true;
@ -428,6 +430,7 @@ TEST_P(PullManagerTest, TestBasic) {
ASSERT_EQ(num_abort_calls_[oid], 1);
ASSERT_FALSE(pull_manager_.IsObjectActive(oid));
}
ASSERT_FALSE(pull_manager_.HasPullsQueued());
// Don't pull a remote object if we've canceled.
object_is_local_ = false;
@ -437,6 +440,7 @@ TEST_P(PullManagerTest, TestBasic) {
pull_manager_.OnLocationChange(oids[i], client_ids, "", NodeID::Nil(), 0);
}
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_FALSE(pull_manager_.HasPullsQueued());
AssertNoLeaks();
}
@ -634,6 +638,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
std::vector<rpc::ObjectReference> objects_to_locate;
auto req_id = pull_manager_.Pull(refs, prio, &objects_to_locate);
ASSERT_EQ(ObjectRefsToIds(objects_to_locate), oids);
ASSERT_TRUE(pull_manager_.HasPullsQueued());
std::unordered_set<NodeID> client_ids;
client_ids.insert(NodeID::FromRandom());
@ -649,6 +654,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
ASSERT_TRUE(pull_manager_.IsObjectActive(oids[i]));
}
ASSERT_TRUE(pull_manager_.PullRequestActiveOrWaitingForMetadata(req_id));
ASSERT_FALSE(pull_manager_.HasPullsQueued());
// Reduce the available memory.
ASSERT_TRUE(num_abort_calls_.empty());
@ -657,6 +663,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
// In unlimited mode, we fulfill all ray.gets using the fallback allocator.
if (RayConfig::instance().plasma_unlimited() && GetParam()) {
ASSERT_FALSE(pull_manager_.HasPullsQueued());
AssertNumActiveRequestsEquals(3);
AssertNumActiveBundlesEquals(1);
if (!RayConfig::instance().plasma_unlimited()) {
@ -666,6 +673,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
}
if (RayConfig::instance().pull_manager_min_active_pulls() == 0) {
ASSERT_TRUE(pull_manager_.HasPullsQueued());
AssertNumActiveRequestsEquals(0);
if (!RayConfig::instance().plasma_unlimited()) {
ASSERT_EQ(num_object_store_full_calls_, 1);
@ -676,6 +684,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
}
ASSERT_FALSE(pull_manager_.PullRequestActiveOrWaitingForMetadata(req_id));
} else {
ASSERT_FALSE(pull_manager_.HasPullsQueued());
AssertNumActiveRequestsEquals(3);
if (!RayConfig::instance().plasma_unlimited()) {
ASSERT_EQ(num_object_store_full_calls_, 1);

View file

@ -444,27 +444,31 @@ message ResourcesData {
bytes node_id = 1;
// Resource capacity currently available on this node manager.
map<string, double> resources_available = 2;
// Indicates whether available resources is changed. Only used when light
// heartbeat enabled.
bool resources_available_changed = 3;
// Whether this node has object pulls queued. This can happen if
// the node has more pull requests than available object store
// memory. This is a proxy for available object store memory.
bool object_pulls_queued = 3;
// Indicates whether available resources or object_pulls_queued is
// changed. Only used when light heartbeat enabled.
bool resources_available_changed = 4;
// Total resource capacity configured for this node manager.
map<string, double> resources_total = 4;
map<string, double> resources_total = 5;
// Aggregate outstanding resource load on this node manager.
map<string, double> resource_load = 5;
map<string, double> resource_load = 6;
// Indicates whether resource load is changed. Only used when
// light heartbeat enabled.
bool resource_load_changed = 6;
bool resource_load_changed = 7;
// The resource load on this node, sorted by resource shape.
ResourceLoad resource_load_by_shape = 7;
ResourceLoad resource_load_by_shape = 8;
// Whether this node manager is requesting global GC.
bool should_global_gc = 8;
bool should_global_gc = 9;
// IP address of the node.
string node_manager_address = 9;
string node_manager_address = 10;
// Normal task resources.
map<string, double> resources_normal_task = 10;
map<string, double> resources_normal_task = 11;
// Indicates whether resource normal task is changed. Only used when
// light heartbeat enabled.
bool resources_normal_task_changed = 11;
bool resources_normal_task_changed = 12;
}
message ResourceUsageBatchData {

View file

@ -145,6 +145,10 @@ message ObjectStoreStats {
int64 num_local_objects = 11;
// The number of plasma object bytes that are consumed by core workers.
int64 consumed_bytes = 12;
// Whether this node has object pulls queued. This can happen if
// the node has more pull requests than available object store
// memory.
bool object_pulls_queued = 13;
}
message GetNodeStatsReply {

View file

@ -36,6 +36,7 @@ class TaskDependencyManagerInterface {
const std::vector<rpc::ObjectReference> &required_objects) = 0;
virtual void RemoveTaskDependencies(const TaskID &task_id) = 0;
virtual bool TaskDependenciesBlocked(const TaskID &task_id) const = 0;
virtual bool CheckObjectLocal(const ObjectID &object_id) const = 0;
virtual ~TaskDependencyManagerInterface(){};
};

View file

@ -287,7 +287,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
cluster_resource_scheduler_ =
std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
self_node_id_.Binary(), local_resources.GetTotalResources().GetResourceMap(),
[this]() { return object_manager_.GetUsedMemory(); }));
[this]() { return object_manager_.GetUsedMemory(); },
[this]() { return object_manager_.PullManagerHasPullsQueued(); }));
auto get_node_info_func = [this](const NodeID &node_id) {
return gcs_client_->Nodes().Get(node_id);
@ -2257,6 +2258,9 @@ rpc::ObjectStoreStats AccumulateStoreStats(
cur_store.num_local_objects());
store_stats.set_consumed_bytes(store_stats.consumed_bytes() +
cur_store.consumed_bytes());
if (cur_store.object_pulls_queued()) {
store_stats.set_object_pulls_queued(true);
}
}
return store_stats;
}

View file

@ -75,9 +75,11 @@ std::vector<double> VectorFixedPointToVectorDouble(
/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
StringIdMap &string_to_int_map,
const std::unordered_map<std::string, double> &resource_map) {
const std::unordered_map<std::string, double> &resource_map,
bool requires_object_store_memory) {
ResourceRequest resource_request;
resource_request.requires_object_store_memory = requires_object_store_memory;
resource_request.predefined_resources.resize(PredefinedResources_MAX);
for (auto const &resource : resource_map) {
@ -200,7 +202,13 @@ float NodeResources::CalculateCriticalResourceUtilization() const {
return highest;
}
bool NodeResources::IsAvailable(const ResourceRequest &resource_request) const {
bool NodeResources::IsAvailable(const ResourceRequest &resource_request,
bool ignore_pull_manager_at_capacity) const {
if (!ignore_pull_manager_at_capacity && resource_request.requires_object_store_memory &&
object_pulls_queued) {
RAY_LOG(DEBUG) << "At pull manager capacity";
return false;
}
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (i >= this->predefined_resources.size()) {
@ -214,6 +222,7 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request) const {
const auto &demand = resource_request.predefined_resources[i];
if (resource < demand) {
RAY_LOG(DEBUG) << "At resource capacity";
return false;
}
}
@ -227,6 +236,7 @@ bool NodeResources::IsAvailable(const ResourceRequest &resource_request) const {
return false;
}
}
return true;
}

View file

@ -65,6 +65,9 @@ class ResourceRequest {
std::vector<FixedPoint> predefined_resources;
/// List of custom resources required by the task.
std::unordered_map<int64_t, FixedPoint> custom_resources;
/// Whether this task requires object store memory.
/// TODO(swang): This should be a quantity instead of a flag.
bool requires_object_store_memory = false;
/// Check whether the request contains no resources.
bool IsEmpty() const;
/// Returns human-readable string for this task request.
@ -144,18 +147,22 @@ class NodeResources {
NodeResources() {}
NodeResources(const NodeResources &other)
: predefined_resources(other.predefined_resources),
custom_resources(other.custom_resources) {}
custom_resources(other.custom_resources),
object_pulls_queued(other.object_pulls_queued) {}
/// Available and total capacities for predefined resources.
std::vector<ResourceCapacity> predefined_resources;
/// Map containing custom resources. The key of each entry represents the
/// custom resource ID.
absl::flat_hash_map<int64_t, ResourceCapacity> custom_resources;
bool object_pulls_queued = false;
/// Amongst CPU, memory, and object store memory, calculate the utilization percentage
/// of each resource and return the highest.
float CalculateCriticalResourceUtilization() const;
/// Returns true if the node has the available resources to run the task.
/// Note: This doesn't account for the binpacking of unit resources.
bool IsAvailable(const ResourceRequest &resource_request) const;
bool IsAvailable(const ResourceRequest &resource_request,
bool ignore_at_capacity = false) const;
/// Returns true if the node's total resources are enough to run the task.
/// Note: This doesn't account for the binpacking of unit resources.
bool IsFeasible(const ResourceRequest &resource_request) const;
@ -220,6 +227,7 @@ NodeResources ResourceMapToNodeResources(
/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
StringIdMap &string_to_int_map,
const std::unordered_map<std::string, double> &resource_map);
const std::unordered_map<std::string, double> &resource_map,
bool requires_object_store_memory);
} // namespace ray

View file

@ -42,9 +42,11 @@ ClusterResourceScheduler::ClusterResourceScheduler(
ClusterResourceScheduler::ClusterResourceScheduler(
const std::string &local_node_id,
const std::unordered_map<std::string, double> &local_node_resources,
std::function<int64_t(void)> get_used_object_store_memory)
std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity)
: hybrid_spillback_(RayConfig::instance().scheduler_hybrid_scheduling()),
spread_threshold_(RayConfig::instance().scheduler_spread_threshold()) {
spread_threshold_(RayConfig::instance().scheduler_spread_threshold()),
get_pull_manager_at_capacity_(get_pull_manager_at_capacity) {
local_node_id_ = string_to_int_map_.Insert(local_node_id);
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, local_node_resources, local_node_resources);
@ -133,6 +135,8 @@ bool ClusterResourceScheduler::UpdateNode(const std::string &node_id_string,
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].available = entry.second.available;
}
local_view.object_pulls_queued = resource_data.object_pulls_queued();
}
AddOrUpdateNode(node_id, local_view);
@ -159,16 +163,6 @@ bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
return RemoveNode(node_id);
}
bool ClusterResourceScheduler::IsLocallyFeasible(
const std::unordered_map<std::string, double> shape) {
const ResourceRequest resource_request =
ResourceMapToResourceRequest(string_to_int_map_, shape);
RAY_CHECK(nodes_.contains(local_node_id_));
const auto &it = nodes_.find(local_node_id_);
RAY_CHECK(it != nodes_.end());
return IsFeasible(resource_request, it->second.GetLocalView());
}
bool ClusterResourceScheduler::IsFeasible(const ResourceRequest &resource_request,
const NodeResources &resources) const {
// First, check predefined resources.
@ -198,6 +192,15 @@ int64_t ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_
int64_t node_id,
const NodeResources &resources) const {
int violations = 0;
if (resource_request.requires_object_store_memory && resources.object_pulls_queued &&
node_id != local_node_id_) {
// It's okay if the local node's pull manager is at capacity because we
// will eventually spill the task back from the waiting queue if its args
// cannot be pulled.
return -1;
}
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (resource_request.predefined_resources[i] >
@ -350,10 +353,11 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
}
std::string ClusterResourceScheduler::GetBestSchedulableNode(
const std::unordered_map<std::string, double> &task_resources, bool actor_creation,
bool force_spillback, int64_t *total_violations, bool *is_infeasible) {
ResourceRequest resource_request =
ResourceMapToResourceRequest(string_to_int_map_, task_resources);
const std::unordered_map<std::string, double> &task_resources,
bool requires_object_store_memory, bool actor_creation, bool force_spillback,
int64_t *total_violations, bool *is_infeasible) {
ResourceRequest resource_request = ResourceMapToResourceRequest(
string_to_int_map_, task_resources, requires_object_store_memory);
int64_t node_id = GetBestSchedulableNode(
resource_request, actor_creation, force_spillback, total_violations, is_infeasible);
@ -396,6 +400,11 @@ bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources(
std::max(FixedPoint(0), it->second.available - task_req_custom_resource.second);
}
}
// TODO(swang): We should also subtract object store memory if the task has
// arguments. Right now we do not modify object_pulls_queued in case of
// performance regressions in spillback.
return true;
}
@ -937,8 +946,9 @@ bool ClusterResourceScheduler::AllocateLocalTaskResources(
const std::unordered_map<std::string, double> &task_resources,
std::shared_ptr<TaskResourceInstances> task_allocation) {
RAY_CHECK(task_allocation != nullptr);
ResourceRequest resource_request =
ResourceMapToResourceRequest(string_to_int_map_, task_resources);
// We don't track object store memory demands so no need to allocate them.
ResourceRequest resource_request = ResourceMapToResourceRequest(
string_to_int_map_, task_resources, /*requires_object_store_memory=*/false);
return AllocateLocalTaskResources(resource_request, task_allocation);
}
@ -959,8 +969,8 @@ std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx)
bool ClusterResourceScheduler::AllocateRemoteTaskResources(
const std::string &node_string,
const std::unordered_map<std::string, double> &task_resources) {
ResourceRequest resource_request =
ResourceMapToResourceRequest(string_to_int_map_, task_resources);
ResourceRequest resource_request = ResourceMapToResourceRequest(
string_to_int_map_, task_resources, /*requires_object_store_memory=*/false);
auto node_id = string_to_int_map_.Insert(node_string);
RAY_CHECK(node_id != local_node_id_);
return SubtractRemoteNodeAvailableResources(node_id, resource_request);
@ -1045,6 +1055,15 @@ void ClusterResourceScheduler::FillResourceUsage(rpc::ResourcesData &resources_d
(*resources_data.mutable_resources_total())[label] = capacity.total.Double();
}
}
if (get_pull_manager_at_capacity_ != nullptr) {
resources.object_pulls_queued = get_pull_manager_at_capacity_();
if (last_report_resources_->object_pulls_queued != resources.object_pulls_queued) {
resources_data.set_object_pulls_queued(resources.object_pulls_queued);
resources_data.set_resources_available_changed(true);
}
}
if (resources != *last_report_resources_.get()) {
last_report_resources_.reset(new NodeResources(resources));
}

View file

@ -49,7 +49,8 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
ClusterResourceScheduler(
const std::string &local_node_id,
const std::unordered_map<std::string, double> &local_node_resources,
std::function<int64_t(void)> get_used_object_store_memory = nullptr);
std::function<int64_t(void)> get_used_object_store_memory = nullptr,
std::function<bool(void)> get_pull_manager_at_capacity = nullptr);
// Mapping from predefined resource indexes to resource strings
std::string GetResourceNameFromIndex(int64_t res_idx);
@ -78,13 +79,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
bool RemoveNode(int64_t node_id);
bool RemoveNode(const std::string &node_id_string) override;
/// Check whether a resource request is feasible on a given node. A node is
/// feasible if it has the total resources needed to eventually execute the
/// task, even if those resources are currently allocated.
///
/// \param shape The resource demand's shape.
bool IsLocallyFeasible(const std::unordered_map<std::string, double> shape);
/// Check whether a resource request is feasible on a given node. A node is
/// feasible if it has the total resources needed to eventually execute the
/// task, even if those resources are currently allocated.
@ -180,8 +174,8 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
// resource request.
std::string GetBestSchedulableNode(
const std::unordered_map<std::string, double> &resource_request,
bool actor_creation, bool force_spillback, int64_t *violations,
bool *is_infeasible);
bool requires_object_store_memory, bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
@ -458,6 +452,8 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
std::unique_ptr<NodeResources> last_report_resources_;
/// Function to get used object store memory.
std::function<int64_t(void)> get_used_object_store_memory_;
/// Function to get whether the pull manager is at capacity.
std::function<bool(void)> get_pull_manager_at_capacity_;
// Specify predefine resources that consists of unit-size instances.
std::unordered_set<int64_t> predefined_unit_instance_resources_{};

View file

@ -898,21 +898,21 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
// No feasible nodes.
int64_t total_violations;
bool is_infeasible;
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false, false,
&total_violations, &is_infeasible),
"");
// Feasible remote node, but doesn't currently have resources available. We
// should spill there.
resource_scheduler.AddOrUpdateNode("remote_feasible", resource_spec, {{"CPU", 0.}});
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false, false,
&total_violations, &is_infeasible),
"remote_feasible");
// Feasible remote node, and it currently has resources available. We should
// prefer to spill there.
resource_scheduler.AddOrUpdateNode("remote_available", resource_spec, resource_spec);
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false, false,
&total_violations, &is_infeasible),
"remote_available");
}
@ -1079,16 +1079,16 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
// Resource usage report tick should reset the remote node's resources.
resource_scheduler.FillResourceUsage(data);
for (int j = 0; j < num_slots_available; j++) {
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec, false, false, &t,
&is_infeasible),
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec, false, false,
false, &t, &is_infeasible),
"remote");
// Allocate remote resources.
ASSERT_TRUE(resource_scheduler.AllocateRemoteTaskResources("remote", task_spec));
}
// Our local view says there are not enough resources on the remote node to
// schedule another task.
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec, false, false, &t,
&is_infeasible),
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec, false, false, false,
&t, &is_infeasible),
"");
ASSERT_FALSE(
resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation));
@ -1106,28 +1106,28 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) {
bool is_infeasible;
std::string result = resource_scheduler.GetBestSchedulableNode(
resource_request, false, false, &t, &is_infeasible);
resource_request, false, false, false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
resource_scheduler.AddLocalResourceInstances("custom123", {0., 1.0, 1.0});
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false, &t,
&is_infeasible);
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false,
false, &t, &is_infeasible);
ASSERT_FALSE(result.empty()) << resource_scheduler.DebugString();
resource_request["custom123"] = 3;
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false, &t,
&is_infeasible);
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false,
false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
resource_scheduler.AddLocalResourceInstances("custom123", {1.0});
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false, &t,
&is_infeasible);
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false,
false, &t, &is_infeasible);
ASSERT_FALSE(result.empty());
resource_scheduler.DeleteLocalResource("custom123");
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false, &t,
&is_infeasible);
result = resource_scheduler.GetBestSchedulableNode(resource_request, false, false,
false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
}
@ -1153,24 +1153,24 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
int64_t total_violations;
bool is_infeasible;
// Normally we prefer local.
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
/*force_spillback=*/false,
&total_violations, &is_infeasible),
"local");
// If spillback is forced, we try to spill to remote, but only if there is a
// schedulable node.
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
/*force_spillback=*/true,
&total_violations, &is_infeasible),
"");
// Choose a remote node that has the resources available.
resource_scheduler.AddOrUpdateNode(std::to_string(50), resource_spec, {});
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
/*force_spillback=*/true,
&total_violations, &is_infeasible),
"");
resource_scheduler.AddOrUpdateNode(std::to_string(51), resource_spec, resource_spec);
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false,
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, false,
/*force_spillback=*/true,
&total_violations, &is_infeasible),
"51");

View file

@ -67,7 +67,9 @@ bool ClusterTaskManager::SchedulePendingTasks() {
// This argument is used to set violation, which is an unsupported feature now.
int64_t _unused;
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
placement_resources, task.GetTaskSpecification().IsActorCreationTask(),
placement_resources,
/*requires_object_store_memory=*/false,
task.GetTaskSpecification().IsActorCreationTask(),
/*force_spillback=*/false, &_unused, &is_infeasible);
// There is no node that has available resources to run the request.
@ -288,8 +290,9 @@ bool ClusterTaskManager::TrySpillback(const Work &work, bool &is_infeasible) {
int64_t _unused;
auto placement_resources = spec.GetRequiredPlacementResources().GetResourceMap();
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
placement_resources, spec.IsActorCreationTask(), /*force_spillback=*/false,
&_unused, &is_infeasible);
placement_resources,
/*requires_object_store_memory=*/false, spec.IsActorCreationTask(),
/*force_spillback=*/false, &_unused, &is_infeasible);
if (is_infeasible || node_id_string == self_node_id_.Binary() ||
node_id_string.empty()) {
@ -844,7 +847,9 @@ void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() {
int64_t _unused;
bool is_infeasible;
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
placement_resources, task.GetTaskSpecification().IsActorCreationTask(),
placement_resources,
/*requires_object_store_memory=*/false,
task.GetTaskSpecification().IsActorCreationTask(),
/*force_spillback=*/false, &_unused, &is_infeasible);
// There is no node that has available resources to run the request.
@ -1067,6 +1072,8 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
}
void ClusterTaskManager::SpillWaitingTasks() {
RAY_LOG(DEBUG) << "Attempting to spill back from waiting task queue, num waiting: "
<< waiting_task_queue_.size();
// Try to spill waiting tasks to a remote node, prioritizing those at the end
// of the queue. Waiting tasks are spilled if there are enough remote
// resources AND (we have no resources available locally OR their
@ -1082,6 +1089,7 @@ void ClusterTaskManager::SpillWaitingTasks() {
it--;
const auto &task = std::get<0>(*it);
const auto &task_id = task.GetTaskSpecification().TaskId();
// Check whether this task's dependencies are blocked (not being actively
// pulled). If this is true, then we should force the task onto a remote
// feasible node, even if we have enough resources available locally for
@ -1093,11 +1101,13 @@ void ClusterTaskManager::SpillWaitingTasks() {
task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap();
int64_t _unused;
bool is_infeasible;
// TODO(swang): The policy currently does not account for object store
// memory availability. Ideally, we should pick the node with the most
// memory availability.
// TODO(swang): The policy currently does not account for the amount of
// object store memory availability. Ideally, we should pick the node with
// the most memory availability.
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
placement_resources, task.GetTaskSpecification().IsActorCreationTask(),
placement_resources,
/*requires_object_store_memory=*/true,
task.GetTaskSpecification().IsActorCreationTask(),
/*force_spillback=*/force_spillback, &_unused, &is_infeasible);
if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) {
NodeID node_id = NodeID::FromBinary(node_id_string);

View file

@ -140,6 +140,8 @@ class MockTaskDependencyManager : public TaskDependencyManagerInterface {
return blocked_tasks.count(task_id);
}
bool CheckObjectLocal(const ObjectID &object_id) const { return true; }
std::unordered_set<ObjectID> &missing_objects_;
std::unordered_set<TaskID> subscribed_tasks;
std::unordered_set<TaskID> blocked_tasks;

View file

@ -44,7 +44,18 @@ int64_t HybridPolicy(const ResourceRequest &resource_request, const int64_t loca
continue;
}
bool is_available = node.GetLocalView().IsAvailable(resource_request);
bool ignore_pull_manager_at_capacity = false;
if (node_id == local_node_id) {
// It's okay if the local node's pull manager is at
// capacity because we will eventually spill the task
// back from the waiting queue if its args cannot be
// pulled.
ignore_pull_manager_at_capacity = true;
}
bool is_available = node.GetLocalView().IsAvailable(resource_request,
ignore_pull_manager_at_capacity);
RAY_LOG(DEBUG) << "Node " << node_id << " is "
<< (is_available ? "available" : "not available");
float critical_resource_utilization =
node.GetLocalView().CalculateCriticalResourceUtilization();
if (critical_resource_utilization < spread_threshold) {

View file

@ -24,8 +24,8 @@ class SchedulingPolicyTest : public ::testing::Test {};
TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) {
StringIdMap map;
auto task_req1 =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}});
auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}});
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}}, false);
auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
{
// Don't break with a non-resized predefined resources array.
NodeResources resources;
@ -47,8 +47,8 @@ TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) {
TEST_F(SchedulingPolicyTest, AvailableDefinitionTest) {
StringIdMap map;
auto task_req1 =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}});
auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}});
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}}, false);
auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
{
// Don't break with a non-resized predefined resources array.
NodeResources resources;
@ -111,7 +111,7 @@ TEST_F(SchedulingPolicyTest, AvailableTruncationTest) {
// has a lower critical resource utilization, but they're both truncated to 0, so we
// should still pick the local node (due to traversal order).
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}});
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -128,7 +128,7 @@ TEST_F(SchedulingPolicyTest, AvailableTieBreakTest) {
// In this test, the local node and a remote node are both available. The remote node
// has a lower critical resource utilization so we schedule on it.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}});
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -146,7 +146,8 @@ TEST_F(SchedulingPolicyTest, AvailableOverFeasibleTest) {
// utilization, but the remote node can run the task immediately, so we pick the remote
// node.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -162,7 +163,8 @@ TEST_F(SchedulingPolicyTest, AvailableOverFeasibleTest) {
TEST_F(SchedulingPolicyTest, InfeasibleTest) {
// All the nodes are infeasible, so we return -1.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -179,7 +181,8 @@ TEST_F(SchedulingPolicyTest, BarelyFeasibleTest) {
// Test the edge case where a task requires all of a node's resources, and the node is
// fully utilized.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
absl::flat_hash_map<int64_t, Node> nodes;
@ -194,7 +197,8 @@ TEST_F(SchedulingPolicyTest, TruncationAcrossFeasibleNodesTest) {
// Same as AvailableTruncationTest except now none of the nodes are available, but the
// tie break logic should apply to feasible nodes too.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -211,7 +215,8 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackIfAvailableTest) {
// The local node is better, but we force spillback, so we'll schedule on a non-local
// node anyways.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -227,7 +232,8 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackIfAvailableTest) {
TEST_F(SchedulingPolicyTest, ForceSpillbackTest) {
// The local node is available but disqualified.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
@ -244,7 +250,8 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackOnlyFeasibleLocallyTest) {
// The local node is better, but we force spillback, so we'll schedule on a non-local
// node anyways.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}});
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;