diff --git a/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java b/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java index 501ad151e..65494d532 100644 --- a/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java +++ b/java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java @@ -7,19 +7,13 @@ import java.util.Map; * The options class for RayCall or ActorCreation. */ public abstract class BaseTaskOptions { - public final Map resources; + public Map resources; public BaseTaskOptions() { resources = new HashMap<>(); } public BaseTaskOptions(Map resources) { - for (Map.Entry entry : resources.entrySet()) { - if (entry.getValue().compareTo(0.0) <= 0) { - throw new IllegalArgumentException(String.format("Resource capacity should be " + - "positive, but got resource %s = %f.", entry.getKey(), entry.getValue())); - } - } this.resources = resources; } diff --git a/java/example.conf b/java/example.conf index cbdaf781d..e6aaa37ea 100644 --- a/java/example.conf +++ b/java/example.conf @@ -14,7 +14,7 @@ ray { run-mode = CLUSTER // Available resources on this node. - resources: "CPU:4" + resources: "CPU:4,GPU:0" // The address of the redis server to connect, in format `ip:port`. // If not provided, Ray processes will be started locally, including diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 0cd6ee4f6..00d24714f 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -32,6 +32,7 @@ import org.ray.runtime.raylet.RayletClient; import org.ray.runtime.task.ArgumentsBuilder; import org.ray.runtime.task.TaskLanguage; import org.ray.runtime.task.TaskSpec; +import org.ray.runtime.util.ResourceUtil; import org.ray.runtime.util.UniqueIdUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -356,6 +357,11 @@ public abstract class AbstractRayRuntime implements RayRuntime { resources = new HashMap<>(taskOptions.resources); } + if (!resources.containsKey(ResourceUtil.CPU_LITERAL) + && !resources.containsKey(ResourceUtil.CPU_LITERAL.toLowerCase())) { + resources.put(ResourceUtil.CPU_LITERAL, 0.0); + } + int maxActorReconstruction = 0; if (taskOptions instanceof ActorCreationOptions) { maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions; diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java index 99e67288d..78bfda089 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -104,6 +104,10 @@ public class RayConfig { + "setting it to the number of CPU cores: {}", numCpu); resources.put("CPU", numCpu * 1.0); } + if (!resources.containsKey("GPU")) { + LOGGER.warn("No GPU resource is set in configuration, setting it to 0"); + resources.put("GPU", 0.0); + } } // Driver id. String driverId = config.getString("ray.driver.id"); diff --git a/java/streaming/src/main/resources/ray.conf b/java/streaming/src/main/resources/ray.conf index faf1ca9aa..4494cff06 100644 --- a/java/streaming/src/main/resources/ray.conf +++ b/java/streaming/src/main/resources/ray.conf @@ -1,5 +1,5 @@ ray { run-mode = SINGLE_PROCESS - resources = "CPU:4" + resources = "CPU:4,GPU:0" redis.address = "" } diff --git a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java index c3d0e4152..6ca9d7c9d 100644 --- a/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java +++ b/java/test/src/main/java/org/ray/api/test/ResourcesManagementTest.java @@ -46,30 +46,22 @@ public class ResourcesManagementTest extends BaseTest { @Test public void testMethods() { TestUtils.skipTestUnderSingleProcess(); - CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0)); + CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0)); // This is a case that can satisfy required resources. // The static resources for test are "CPU:4,RES-A:4". RayObject result1 = Ray.call(ResourcesManagementTest::echo, 100, callOptions1); Assert.assertEquals(100, (int) result1.get()); - CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0)); + CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 2.0)); // This is a case that can't satisfy required resources. // The static resources for test are "CPU:4,RES-A:4". final RayObject result2 = Ray.call(ResourcesManagementTest::echo, 200, callOptions2); WaitResult waitResult = Ray.wait(ImmutableList.of(result2), 1, 1000); - Assert.assertEquals(1, waitResult.getReady().size()); - Assert.assertEquals(0, waitResult.getUnready().size()); - - try { - CallOptions callOptions3 = new CallOptions(ImmutableMap.of("CPU", 0.0)); - Assert.fail(); - } catch (RuntimeException e) { - // We should receive a RuntimeException indicates that we should not - // pass a zero capacity resource. - } + Assert.assertEquals(0, waitResult.getReady().size()); + Assert.assertEquals(1, waitResult.getUnready().size()); } @Test @@ -77,7 +69,7 @@ public class ResourcesManagementTest extends BaseTest { TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions actorCreationOptions1 = - new ActorCreationOptions(ImmutableMap.of("CPU", 2.0)); + new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0)); // This is a case that can satisfy required resources. // The static resources for test are "CPU:4,RES-A:4". @@ -88,7 +80,7 @@ public class ResourcesManagementTest extends BaseTest { // This is a case that can't satisfy required resources. // The static resources for test are "CPU:4,RES-A:4". ActorCreationOptions actorCreationOptions2 = - new ActorCreationOptions(ImmutableMap.of("CPU", 8.0)); + new ActorCreationOptions(ImmutableMap.of("CPU", 8.0, "GPU", 0.0)); RayActor echo2 = Ray.createActor(Echo::new, actorCreationOptions2); diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 70e7e584d..872b93d22 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -44,6 +44,8 @@ cdef class Task: # Parse the resource map. if resource_map is not None: required_resources = resource_map_from_dict(resource_map) + if required_resources.count(b"CPU") == 0: + required_resources[b"CPU"] = 1.0 if placement_resource_map is not None: required_placement_resources = ( resource_map_from_dict(placement_resource_map)) diff --git a/python/ray/services.py b/python/ray/services.py index 4a6fd30f3..0fc7f4e73 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1029,25 +1029,14 @@ def check_and_update_resources(num_cpus, num_gpus, resources): if gpu_ids is not None: resources["GPU"] = min(resources["GPU"], len(gpu_ids)) - resources = { - resource_label: resource_quantity - for resource_label, resource_quantity in resources.items() - if resource_quantity != 0 - } - # Check types. for _, resource_quantity in resources.items(): assert (isinstance(resource_quantity, int) or isinstance(resource_quantity, float)) if (isinstance(resource_quantity, float) and not resource_quantity.is_integer()): - raise ValueError( - "Resource quantities must all be whole numbers. Received {}.". - format(resources)) - if resource_quantity < 0: - raise ValueError( - "Resource quantities must be nonnegative. Received {}.".format( - resources)) + raise ValueError("Resource quantities must all be whole numbers.") + if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY: raise ValueError("Resource quantities must be at most {}.".format( ray_constants.MAX_RESOURCE_QUANTITY)) @@ -1124,9 +1113,8 @@ def start_raylet(redis_address, # Limit the number of workers that can be started in parallel by the # raylet. However, make sure it is at least 1. - num_cpus_static = static_resources.get("CPU", 0) maximum_startup_concurrency = max( - 1, min(multiprocessing.cpu_count(), num_cpus_static)) + 1, min(multiprocessing.cpu_count(), static_resources["CPU"])) # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'. resource_argument = ",".join( diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c24c90e0d..d535fa593 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1948,15 +1948,15 @@ def test_multiple_raylets(ray_start_cluster): store_names = [] store_names += [ client["ObjectStoreSocketName"] for client in client_table - if client["Resources"].get("GPU", 0) == 0 + if client["Resources"]["GPU"] == 0 ] store_names += [ client["ObjectStoreSocketName"] for client in client_table - if client["Resources"].get("GPU", 0) == 5 + if client["Resources"]["GPU"] == 5 ] store_names += [ client["ObjectStoreSocketName"] for client in client_table - if client["Resources"].get("GPU", 0) == 1 + if client["Resources"]["GPU"] == 1 ] assert len(store_names) == 3 @@ -2126,32 +2126,6 @@ def test_many_custom_resources(shutdown_only): ray.get(results) -def test_zero_capacity_deletion_semantics(shutdown_only): - ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1}) - - def test(): - resources = ray.global_state.available_resources() - retry_count = 0 - - while resources and retry_count < 5: - time.sleep(0.1) - resources = ray.global_state.available_resources() - retry_count += 1 - - if retry_count >= 5: - raise RuntimeError("Resources were available even after retries.") - - return resources - - function = ray.remote( - num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test) - cluster_resources = ray.get(function.remote()) - - # All cluster resources should be utilized and - # cluster_resources must be empty - assert cluster_resources == {} - - @pytest.fixture def save_gpu_ids_shutdown_only(): # Record the curent value of this environment variable so that we can diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index bc82eb859..ee3d304c2 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -51,8 +51,8 @@ def test_uses_resources(ray_start_regular): while not resource_used: available_resources = ray.global_state.available_resources() - resource_used = available_resources.get( - "CPU", 0) == cluster_resources.get("CPU", 0) - 1 + resource_used = available_resources[ + "CPU"] == cluster_resources["CPU"] - 1 assert resource_used diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 27fea5694..7f10a9ee1 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -11,7 +11,7 @@ import time import traceback import ray -from ray.tune.error import AbortTrialExecution +from ray.tune.error import TuneError, AbortTrialExecution from ray.tune.logger import NoopLogger from ray.tune.trial import Trial, Resources, Checkpoint from ray.tune.trial_executor import TrialExecutor @@ -363,22 +363,17 @@ class RayTrialExecutor(TrialExecutor): resources = ray.services.check_and_update_resources( None, None, None) if not resources: - logger.warning( - "Cluster resources not detected or are 0. Retrying...") + logger.warning("Cluster resources not detected. Retrying...") time.sleep(0.5) - if not resources: - # NOTE: This hides the possibility that Ray may be waiting for - # clients to connect. - resources.setdefault("CPU", 0) - resources.setdefault("GPU", 0) - logger.warning("Cluster resources cannot be detected or are 0. " - "You can resume this experiment by passing in " - "`resume=True` to `run`.") + if not resources or "CPU" not in resources: + raise TuneError("Cluster resources cannot be detected. " + "You can resume this experiment by passing in " + "`resume=True` to `run`.") resources = resources.copy() - num_cpus = resources.pop("CPU", 0) - num_gpus = resources.pop("GPU", 0) + num_cpus = resources.pop("CPU") + num_gpus = resources.pop("GPU") custom_resources = resources self._avail_resources = Resources( diff --git a/python/ray/worker.py b/python/ray/worker.py index 3f4072087..f3c17601f 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -653,13 +653,6 @@ class Worker(object): raise ValueError( "Resource quantities must all be whole numbers.") - # Remove any resources with zero quantity requirements - resources = { - resource_label: resource_quantity - for resource_label, resource_quantity in resources.items() - if resource_quantity > 0 - } - if placement_resources is None: placement_resources = {} @@ -1877,7 +1870,7 @@ def connect(node, nil_actor_counter, # actor_counter. [], # new_actor_handles. [], # execution_dependencies. - {}, # resource_map. + {"CPU": 0}, # resource_map. {}, # placement_resource_map. ) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b8daec203..7e68eafa1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1469,16 +1469,15 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr // Get the CPU resources required by the running task. const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); double required_cpus = required_resources.GetNumCpus(); - std::unordered_map cpu_resources; - if (required_cpus > 0) { - cpu_resources[kCPU_ResourceLabel] = required_cpus; - } + const std::unordered_map cpu_resources = { + {kCPU_ResourceLabel, required_cpus}}; // Release the CPU resources. auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); local_available_resources_.Release(cpu_resource_ids); - cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( - ResourceSet(cpu_resources)); + RAY_CHECK( + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( + ResourceSet(cpu_resources))); worker->MarkBlocked(); // Try dispatching tasks since we may have released some resources. @@ -1522,11 +1521,9 @@ void NodeManager::HandleTaskUnblocked( // Get the CPU resources required by the running task. const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); double required_cpus = required_resources.GetNumCpus(); - std::unordered_map cpu_resources_map; - if (required_cpus > 0) { - cpu_resources_map[kCPU_ResourceLabel] = required_cpus; - } - const ResourceSet cpu_resources(cpu_resources_map); + const ResourceSet cpu_resources( + std::unordered_map({{kCPU_ResourceLabel, required_cpus}})); + // Check if we can reacquire the CPU resources. bool oversubscribed = !local_available_resources_.Contains(cpu_resources); @@ -1536,8 +1533,9 @@ void NodeManager::HandleTaskUnblocked( // reacquire here may be different from the ones that the task started with. auto const resource_ids = local_available_resources_.Acquire(cpu_resources); worker->AcquireTaskCpuResources(resource_ids); - cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire( - cpu_resources); + RAY_CHECK( + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire( + cpu_resources)); } else { // In this case, we simply don't reacquire the CPU resources for the worker. // The worker can keep running and when the task finishes, it will simply @@ -1629,7 +1627,7 @@ bool NodeManager::AssignTask(const Task &task) { auto acquired_resources = local_available_resources_.Acquire(spec.GetRequiredResources()); const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); - cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()); + RAY_CHECK(cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); if (spec.IsActorCreationTask()) { // Check that we are not placing an actor creation task on a node with 0 CPUs. @@ -1743,8 +1741,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // Release task's resources. The worker's lifetime resources are still held. auto const &task_resources = worker.GetTaskResourceIds(); local_available_resources_.Release(task_resources); - cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( - task_resources.ToResourceSet()); + RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( + task_resources.ToResourceSet())); worker.ResetTaskResourceIds(); // If this was an actor or actor creation task, handle the actor's new state. @@ -2036,9 +2034,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task, RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager " << node_manager_id; - - // TODO(romilb): We should probably revert the load subtraction from - // SchedulingPolicy::Schedule() // Mark the failed task as pending to let other raylets know that we still // have the task. TaskDependencyManager::TaskPending() is assumed to be // idempotent. diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index a8c1db4b1..01a3cde57 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -49,13 +49,11 @@ std::unordered_map SchedulingPolicy::Schedule( const auto &node_resources = client_resource_pair.second; ResourceSet available_node_resources = ResourceSet(node_resources.GetAvailableResources()); - // TODO(romilb): Why do we need to subtract load from available resources? - // Even if we don't the code path below for choosing a dst_client_id would be - // similar. - available_node_resources.SubtractResources(node_resources.GetLoadResources()); + available_node_resources.SubtractResourcesStrict(node_resources.GetLoadResources()); RAY_LOG(DEBUG) << "client_id " << node_client_id << " avail: " << node_resources.GetAvailableResources().ToString() - << " load: " << node_resources.GetLoadResources().ToString(); + << " load: " << node_resources.GetLoadResources().ToString() + << " avail-load: " << available_node_resources.ToString(); if (resource_demand.IsSubset(available_node_resources)) { // This node is a feasible candidate. diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index 17a9aba16..cd4a80cd0 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -9,39 +9,16 @@ namespace ray { namespace raylet { -bool EqualsZeroEpsilon(double quantity) { - if ((quantity <= EPSILON) && (quantity >= -1 * EPSILON)) { - return true; - } - return false; -} - -bool EqualsOneEpsilon(double quantity) { - if ((quantity <= 1 + EPSILON) && (quantity >= 1 - EPSILON)) { - return true; - } - return false; -} - ResourceSet::ResourceSet() {} ResourceSet::ResourceSet(const std::unordered_map &resource_map) - : resource_capacity_(resource_map) { - for (auto const &resource_pair : resource_capacity_) { - RAY_CHECK(resource_pair.second > 0 + EPSILON) - << "Resource " << resource_pair.first << " capacity is " << resource_pair.second - << ". Should have been greater than zero."; - } -} + : resource_capacity_(resource_map) {} ResourceSet::ResourceSet(const std::vector &resource_labels, const std::vector resource_capacity) { RAY_CHECK(resource_labels.size() == resource_capacity.size()); for (uint i = 0; i < resource_labels.size(); i++) { - RAY_CHECK(resource_capacity[i] > 0 + EPSILON) - << "Resource " << resource_labels[i] << " capacity is " << resource_capacity[i] - << ". Should have been greater than zero."; - resource_capacity_[resource_labels[i]] = resource_capacity[i]; + RAY_CHECK(AddResource(resource_labels[i], resource_capacity[i])); } } @@ -53,7 +30,13 @@ bool ResourceSet::operator==(const ResourceSet &rhs) const { bool ResourceSet::IsEmpty() const { // Check whether the capacity of each resource type is zero. Exit early if not. - return resource_capacity_.empty(); + if (resource_capacity_.empty()) return true; + for (const auto &resource_pair : resource_capacity_) { + if (resource_pair.second > 0) { + return false; + } + } + return true; } bool ResourceSet::IsSubset(const ResourceSet &other) const { @@ -61,7 +44,11 @@ bool ResourceSet::IsSubset(const ResourceSet &other) const { for (const auto &resource_pair : resource_capacity_) { const auto &resource_name = resource_pair.first; const double lhs_quantity = resource_pair.second; - double rhs_quantity = other.GetResource(resource_name); + double rhs_quantity = 0; + if (!other.GetResource(resource_name, &rhs_quantity)) { + // Resource not found in rhs, therefore lhs is not a subset of rhs. + return false; + } if (lhs_quantity > rhs_quantity) { // Resource found in rhs, but lhs capacity exceeds rhs capacity. return false; @@ -80,42 +67,41 @@ bool ResourceSet::IsEqual(const ResourceSet &rhs) const { return (this->IsSubset(rhs) && rhs.IsSubset(*this)); } +bool ResourceSet::AddResource(const std::string &resource_name, double capacity) { + resource_capacity_[resource_name] = capacity; + return true; +} + bool ResourceSet::RemoveResource(const std::string &resource_name) { throw std::runtime_error("Method not implemented"); } -void ResourceSet::SubtractResources(const ResourceSet &other) { - // Subtract the resources and delete any if new capacity is zero. - for (const auto &resource_pair : other.GetResourceMap()) { - const std::string &resource_label = resource_pair.first; - const double &resource_capacity = resource_pair.second; - if (resource_capacity_.count(resource_label) == 1) { - resource_capacity_[resource_label] -= resource_capacity; - if (resource_capacity_[resource_label] < 0 + EPSILON) { - resource_capacity_.erase(resource_label); - } - } - } -} -void ResourceSet::SubtractResourcesStrict(const ResourceSet &other) { - // Subtract the resources, make sure none goes below zero and delete any if new capacity - // is zero. +bool ResourceSet::SubtractResourcesStrict(const ResourceSet &other) { + // Subtract the resources and track whether a resource goes below zero. + bool oversubscribed = false; for (const auto &resource_pair : other.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &resource_capacity = resource_pair.second; RAY_CHECK(resource_capacity_.count(resource_label) == 1) << "Attempt to acquire unknown resource: " << resource_label; resource_capacity_[resource_label] -= resource_capacity; - // TODO(romilb): Double precision subtraction may sometimes be less than zero by a - // small epsilon - need to fix. - RAY_CHECK(resource_capacity_[resource_label] >= 0 - EPSILON) - << "Capacity of resource " << resource_label << " after subtraction is negative (" - << resource_capacity_[resource_label] << ")." - << " Debug: resource_capacity_:" << ToString() << ", other: " << other.ToString(); - if (EqualsZeroEpsilon(resource_capacity_[resource_label])) { - resource_capacity_.erase(resource_label); + if (resource_capacity_[resource_label] < 0) { + oversubscribed = true; } } + return !oversubscribed; +} + +// Perform a left join. +bool ResourceSet::AddResourcesStrict(const ResourceSet &other) { + // Return failure if attempting to perform vector addition with unknown labels. + for (const auto &resource_pair : other.GetResourceMap()) { + const std::string &resource_label = resource_pair.first; + const double &resource_capacity = resource_pair.second; + RAY_CHECK(resource_capacity_.count(resource_label) != 0); + resource_capacity_[resource_label] += resource_capacity; + } + return true; } // Perform an outer join. @@ -123,22 +109,33 @@ void ResourceSet::AddResources(const ResourceSet &other) { for (const auto &resource_pair : other.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &resource_capacity = resource_pair.second; - resource_capacity_[resource_label] += resource_capacity; + if (resource_capacity_.count(resource_label) == 0) { + // Add the new label if not found. + RAY_CHECK(AddResource(resource_label, resource_capacity)); + } else { + // Increment the resource by its capacity. + resource_capacity_[resource_label] += resource_capacity; + } } } -double ResourceSet::GetResource(const std::string &resource_name) const { - if (EqualsZeroEpsilon(resource_capacity_.count(resource_name))) { - return 0; +bool ResourceSet::GetResource(const std::string &resource_name, double *value) const { + if (!value) { + return false; } - double capacity = resource_capacity_.at(resource_name); - RAY_CHECK(capacity > 0 + EPSILON) << "Resource " << resource_name << " capacity is " - << capacity - << ". Should have been greater than zero."; - return capacity; + if (resource_capacity_.count(resource_name) == 0) { + *value = std::nan(""); + return false; + } + *value = resource_capacity_.at(resource_name); + return true; } -double ResourceSet::GetNumCpus() const { return GetResource(kCPU_ResourceLabel); } +double ResourceSet::GetNumCpus() const { + double num_cpus; + RAY_CHECK(GetResource(kCPU_ResourceLabel, &num_cpus)); + return num_cpus; +} const std::string ResourceSet::ToString() const { std::string return_string = ""; @@ -225,12 +222,6 @@ ResourceIds ResourceIds::Acquire(double resource_quantity) { if (fractional_pair.second >= resource_quantity) { auto return_pair = std::make_pair(fractional_pair.first, resource_quantity); fractional_pair.second -= resource_quantity; - - // Remove the fractional pair if the new capacity is 0 - if (EqualsZeroEpsilon(fractional_pair.second)) { - std::swap(fractional_pair, fractional_ids_[fractional_ids_.size() - 1]); - fractional_ids_.pop_back(); - } return ResourceIds({return_pair}); } } @@ -266,18 +257,15 @@ void ResourceIds::Release(const ResourceIds &resource_ids) { if (fractional_pair_it == fractional_ids_.end()) { fractional_ids_.push_back(fractional_pair_to_return); } else { + RAY_CHECK(fractional_pair_it->second < 1); fractional_pair_it->second += fractional_pair_to_return.second; - // TODO(romilb): Double precision addition may sometimes exceed 1 by a small epsilon - // - need to fix this. - RAY_CHECK(fractional_pair_it->second <= 1 + EPSILON) - << "Fractional Resource Id " << fractional_pair_it->first << " capacity is " - << fractional_pair_it->second << ". Should have been less than one."; // If this makes the ID whole, then return it to the list of whole IDs. - // TODO(romilb): Double precision addition may sometimes exceed 1 by a small epsilon - // - need to fix this. - if (EqualsOneEpsilon(fractional_pair_it->second)) { + if (fractional_pair_it->second >= 1) { whole_ids_.push_back(resource_id); - fractional_ids_.erase(fractional_pair_it); + fractional_pair_it->second -= 1; + if (fractional_pair_it->second < 1e-6) { + fractional_ids_.erase(fractional_pair_it); + } } } } @@ -295,10 +283,6 @@ const std::vector> &ResourceIds::FractionalIds() cons return fractional_ids_; } -bool ResourceIds::TotalQuantityIsZero() const { - return whole_ids_.empty() && fractional_ids_.empty(); -} - double ResourceIds::TotalQuantity() const { double total_quantity = whole_ids_.size(); for (auto const &fractional_pair : fractional_ids_) { @@ -346,9 +330,9 @@ bool ResourceIdSet::Contains(const ResourceSet &resource_set) const { for (auto const &resource_pair : resource_set.GetResourceMap()) { auto const &resource_name = resource_pair.first; double resource_quantity = resource_pair.second; - RAY_CHECK(resource_quantity > 0 + EPSILON) << "Resource " << resource_name - << " capacity is " << resource_quantity - << ". Should have been greater than zero."; + if (resource_quantity == 0) { + continue; + } auto it = available_resources_.find(resource_name); if (it == available_resources_.end()) { @@ -368,16 +352,14 @@ ResourceIdSet ResourceIdSet::Acquire(const ResourceSet &resource_set) { for (auto const &resource_pair : resource_set.GetResourceMap()) { auto const &resource_name = resource_pair.first; double resource_quantity = resource_pair.second; - RAY_CHECK(resource_quantity > 0 + EPSILON) << "Resource " << resource_name - << " capacity is " << resource_quantity - << ". Should have been greater than zero."; + + if (resource_quantity == 0) { + continue; + } auto it = available_resources_.find(resource_name); RAY_CHECK(it != available_resources_.end()); acquired_resources[resource_name] = it->second.Acquire(resource_quantity); - if (it->second.TotalQuantityIsZero()) { - available_resources_.erase(it); - } } return ResourceIdSet(acquired_resources); } @@ -386,10 +368,15 @@ void ResourceIdSet::Release(const ResourceIdSet &resource_id_set) { for (auto const &resource_pair : resource_id_set.AvailableResources()) { auto const &resource_name = resource_pair.first; auto const &resource_ids = resource_pair.second; - RAY_CHECK(!resource_ids.TotalQuantityIsZero()); + + if (resource_ids.TotalQuantity() == 0) { + continue; + } auto it = available_resources_.find(resource_name); if (it == available_resources_.end()) { + // This should not happen when Release is called on resources that were obtained + // through a corresponding call to Acquire. available_resources_[resource_name] = resource_ids; } else { it->second.Release(resource_ids); @@ -487,6 +474,18 @@ SchedulingResources::SchedulingResources(const ResourceSet &total) SchedulingResources::~SchedulingResources() {} +ResourceAvailabilityStatus SchedulingResources::CheckResourcesSatisfied( + ResourceSet &resources) const { + if (!resources.IsSubset(resources_total_)) { + return ResourceAvailabilityStatus::kInfeasible; + } + // Resource demand specified is feasible. Check if it's available. + if (!resources.IsSubset(resources_available_)) { + return ResourceAvailabilityStatus::kResourcesUnavailable; + } + return ResourceAvailabilityStatus::kFeasible; +} + const ResourceSet &SchedulingResources::GetAvailableResources() const { return resources_available_; } @@ -508,13 +507,13 @@ const ResourceSet &SchedulingResources::GetLoadResources() const { } // Return specified resources back to SchedulingResources. -void SchedulingResources::Release(const ResourceSet &resources) { - resources_available_.AddResources(resources); +bool SchedulingResources::Release(const ResourceSet &resources) { + return resources_available_.AddResourcesStrict(resources); } // Take specified resources from SchedulingResources. -void SchedulingResources::Acquire(const ResourceSet &resources) { - resources_available_.SubtractResourcesStrict(resources); +bool SchedulingResources::Acquire(const ResourceSet &resources) { + return resources_available_.SubtractResourcesStrict(resources); } std::string SchedulingResources::DebugString() const { diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index df1a53779..daf6372c4 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -8,25 +8,19 @@ #include "ray/raylet/format/node_manager_generated.h" -#define EPSILON 0.00001 - namespace ray { namespace raylet { const std::string kCPU_ResourceLabel = "CPU"; -/// \brief Test if the quantity is within epsilon bounds of 0. -/// -/// \param quantity: Quantity to check -/// \return True if -epsilon <= Quantity <= epsilon, False otherwise. -bool EqualsZeroEpsilon(double quantity); - -/// \brief Test if the quantity is within epsilon bounds of 1. -/// -/// \param quantity: Quantity to check -/// \return True if 1 - epsilon <= Quantity <= 1 + epsilon, False otherwise. -bool EqualsOneEpsilon(double quantity); +/// Resource availability status reports whether the resource requirement is +/// (1) infeasible, (2) feasible but currently unavailable, or (3) available. +enum class ResourceAvailabilityStatus : int { + kInfeasible, ///< Cannot ever satisfy resource requirements. + kResourcesUnavailable, ///< Feasible, but not currently available. + kFeasible ///< Feasible and currently available. +}; /// \class ResourceSet /// \brief Encapsulates and operates on a set of resources, including CPUs, @@ -73,12 +67,26 @@ class ResourceSet { /// False otherwise. bool IsSuperset(const ResourceSet &other) const; + /// \brief Add a new resource to the resource set. + /// + /// \param resource_name: name/label of the resource to add. + /// \param capacity: numeric capacity value for the resource to add. + /// \return True, if the resource was successfully added. False otherwise. + bool AddResource(const std::string &resource_name, double capacity); + /// \brief Remove the specified resource from the resource set. /// /// \param resource_name: name/label of the resource to remove. /// \return True, if the resource was successfully removed. False otherwise. bool RemoveResource(const std::string &resource_name); + /// \brief Add a set of resources to the current set of resources only if the resource + /// labels match. + /// + /// \param other: The other resource set to add. + /// \return True if the resource set was added successfully. False otherwise. + bool AddResourcesStrict(const ResourceSet &other); + /// \brief Aggregate resources from the other set into this set, adding any missing /// resource labels to this set. /// @@ -86,28 +94,21 @@ class ResourceSet { /// \return Void. void AddResources(const ResourceSet &other); - /// \brief Subtract a set of resources from the current set of resources. - /// Deletes any resource if the capacity after subtraction is zero or negative. + /// \brief Subtract a set of resources from the current set of resources, only if + /// resource labels match. /// /// \param other: The resource set to subtract from the current resource set. - /// \return Void. - void SubtractResources(const ResourceSet &other); - - /// \brief Subtract a set of resources from the current set of resources and - /// check that the post-subtraction result nonnegative. Assumes other - /// is a subset of the ResourceSet. Deletes any resource if the capacity after - /// subtraction is zero. - /// - /// \param other: The resource set to subtract from the current resource set. - /// \return Void. - void SubtractResourcesStrict(const ResourceSet &other); + /// \return True if the resource set was subtracted successfully. + /// False otherwise. + bool SubtractResourcesStrict(const ResourceSet &other); /// Return the capacity value associated with the specified resource. /// /// \param resource_name: Resource name for which capacity is requested. - /// \return The capacity value associated with the specified resource, zero if resource - /// does not exist. - double GetResource(const std::string &resource_name) const; + /// \param[out] value: Resource capacity value. + /// \return True if the resource capacity value was successfully retrieved. + /// False otherwise. + bool GetResource(const std::string &resource_name, double *value) const; /// Return the number of CPUs. /// @@ -125,7 +126,7 @@ class ResourceSet { const std::string ToString() const; private: - /// Resource capacity map. The capacities (double) are always positive. + /// Resource capacity map. std::unordered_map resource_capacity_; }; @@ -204,11 +205,6 @@ class ResourceIds { /// \return The fractional IDs. const std::vector> &FractionalIds() const; - /// \brief Check if ResourceIds has any resources. - /// - /// \return True if there are no whole or fractional resources. False otherwise. - bool TotalQuantityIsZero() const; - /// \brief Return the total quantity of resources, ignoring the specific IDs. /// /// \return The total quantity of the resource. @@ -333,6 +329,13 @@ class SchedulingResources { /// \brief SchedulingResources destructor. ~SchedulingResources(); + /// \brief Check if the specified resource request can be satisfied. + /// + /// \param set: The set of resources representing the resource request. + /// \return Availability status that specifies if the requested resource set + /// is feasible, infeasible, or feasible but unavailable. + ResourceAvailabilityStatus CheckResourcesSatisfied(ResourceSet &set) const; + /// \brief Request the set and capacity of resources currently available. /// /// \return Immutable set of resources with currently available capacity. @@ -360,14 +363,16 @@ class SchedulingResources { /// \brief Release the amount of resources specified. /// /// \param resources: the amount of resources to be released. - /// \return Void. - void Release(const ResourceSet &resources); + /// \return True if resources were successfully released. False otherwise. + bool Release(const ResourceSet &resources); /// \brief Acquire the amount of resources specified. /// /// \param resources: the amount of resources to be acquired. - /// \return Void. - void Acquire(const ResourceSet &resources); + /// \return True if resources were acquired without oversubscription. If this + /// returns false, then the resources were still acquired, but we are now at + /// negative resources. + bool Acquire(const ResourceSet &resources); /// Returns debug string for class. /// diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 9799dfb80..c548fc924 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -77,7 +77,7 @@ static inline TaskSpecification ExampleTaskSpec( std::vector function_descriptor(3); return TaskSpecification(DriverID::nil(), TaskID::nil(), 0, ActorID::nil(), ObjectID::nil(), 0, actor_id, ActorHandleID::nil(), 0, {}, {}, - 0, {}, {}, language, function_descriptor); + 0, {{}}, {{}}, language, function_descriptor); } TEST_F(WorkerPoolTest, HandleWorkerRegistration) {