From d390344a8f9d0d3f772ab01a1c5a2280c887f562 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Wed, 9 Jun 2021 22:05:52 -0700
Subject: [PATCH] Enable plasma fallback allocations by default (#16244)

---
 python/ray/_raylet.pyx                       |  3 ++
 python/ray/includes/ray_config.pxd           |  2 +
 python/ray/includes/ray_config.pxi           |  4 ++
 python/ray/tests/test_failure_4.py           |  3 ++
 python/ray/tests/test_object_spilling.py     | 10 ++++-
 python/ray/tests/test_plasma_unlimited.py    | 23 ++++++++++++
 src/ray/common/ray_config_def.h              |  1 +
 src/ray/object_manager/plasma/common.h       |  2 -
 .../plasma/create_request_queue.cc           | 16 ++------
 src/ray/object_manager/plasma/store.cc       | 29 +--------------
 src/ray/object_manager/pull_manager.cc       |  3 +-
 .../object_manager/test/pull_manager_test.cc | 37 +++++++++++++++----
 12 files changed, 81 insertions(+), 52 deletions(-)

diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index c555b9a29..544c7205c 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -1492,6 +1492,9 @@ cdef class CoreWorker:
 
         return resources_dict
 
+    def plasma_unlimited(self):
+        return RayConfig.instance().plasma_unlimited()
+
     def profile_event(self, c_string event_type, object extra_data=None):
         if RayConfig.instance().enable_timeline():
             return ProfileEvent.make(
diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd
index 309132cf7..a7c8bf54c 100644
--- a/python/ray/includes/ray_config.pxd
+++ b/python/ray/includes/ray_config.pxd
@@ -63,4 +63,6 @@ cdef extern from "ray/common/ray_config.h" nogil:
 
         c_bool automatic_object_deletion_enabled() const
 
+        c_bool plasma_unlimited() const
+
         uint32_t max_grpc_message_size() const
diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi
index d6c28805c..8ffd3e883 100644
--- a/python/ray/includes/ray_config.pxi
+++ b/python/ray/includes/ray_config.pxi
@@ -108,6 +108,10 @@ cdef class Config:
     def automatic_object_deletion_enabled():
         return RayConfig.instance().automatic_object_deletion_enabled()
 
+    @staticmethod
+    def plasma_unlimited():
+        return RayConfig.instance().plasma_unlimited()
+
     @staticmethod
     def max_grpc_message_size():
         return RayConfig.instance().max_grpc_message_size()
diff --git a/python/ray/tests/test_failure_4.py b/python/ray/tests/test_failure_4.py
index 09d1fad7e..b7346fd6e 100644
--- a/python/ray/tests/test_failure_4.py
+++ b/python/ray/tests/test_failure_4.py
@@ -16,6 +16,9 @@ def test_fill_object_store_exception(shutdown_only):
         object_store_memory=10**8,
         _system_config={"automatic_object_spilling_enabled": False})
 
+    if ray.worker.global_worker.core_worker.plasma_unlimited():
+        return  # No exception is raised.
+
     @ray.remote
     def expensive_task():
         return np.zeros((10**8) // 10, dtype=np.uint8)
diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py
index 670db326e..f53dde77a 100644
--- a/python/ray/tests/test_object_spilling.py
+++ b/python/ray/tests/test_object_spilling.py
@@ -106,8 +106,11 @@ def test_default_config(shutdown_only):
             "object_store_full_delay_ms": 100
         })
     assert "object_spilling_config" not in ray.worker._global_node._config
-    with pytest.raises(ray.exceptions.ObjectStoreFullError):
+    if ray.worker.global_worker.core_worker.plasma_unlimited():
         run_basic_workload()
+    else:
+        with pytest.raises(ray.exceptions.ObjectStoreFullError):
+            run_basic_workload()
     ray.shutdown()
 
     # Make sure when we use a different config, it is reflected.
@@ -163,8 +166,11 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
     arr = np.random.rand(5 * 1024 * 1024)  # 40 MB
     ref = ray.get(ray.put(arr))  # noqa
     # Since the ref exists, it should raise OOM.
-    with pytest.raises(ray.exceptions.ObjectStoreFullError):
+    if ray.worker.global_worker.core_worker.plasma_unlimited():
         ref2 = ray.put(arr)  # noqa
+    else:
+        with pytest.raises(ray.exceptions.ObjectStoreFullError):
+            ref2 = ray.put(arr)  # noqa
 
     wait_for_condition(lambda: is_dir_empty(temp_folder))
     assert_no_thrashing(address["redis_address"])
diff --git a/python/ray/tests/test_plasma_unlimited.py b/python/ray/tests/test_plasma_unlimited.py
index 429ef1241..4a719cc90 100644
--- a/python/ray/tests/test_plasma_unlimited.py
+++ b/python/ray/tests/test_plasma_unlimited.py
@@ -164,6 +164,29 @@ def test_task_unlimited_multiget_args():
         ray.shutdown()
 
 
+# TODO(ekl) enable this test once we implement this behavior.
+# @pytest.mark.skipif(
+#     platform.system() == "Windows", reason="Need to fix up for Windows.")
+# def test_task_unlimited_huge_args():
+#     try:
+#         address = _init_ray()
+#
+#         # PullManager should raise an error, since the set of task args is
+#         # too huge to fit into memory.
+#         @ray.remote
+#         def consume(*refs):
+#             return "ok"
+#
+#         # Too many refs to fit into memory.
+#         refs = []
+#         for _ in range(10):
+#             refs.append(ray.put(np.zeros(200 * MB, dtype=np.uint8)))
+#
+#         with pytest.raises(Exception):
+#             ray.get(consume.remote(*refs))
+#     finally:
+#         ray.shutdown()
+
 if __name__ == "__main__":
     import sys
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index c6d75d283..3c9978181 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -397,6 +397,7 @@ RAY_CONFIG(int64_t, max_fused_object_count, 2000)
 RAY_CONFIG(bool, automatic_object_deletion_enabled, true)
 
 /// Grace period until we throw the OOM error to the application in seconds.
+/// In unlimited allocation mode, this is the delay before falling back to filesystem allocations.
 RAY_CONFIG(int64_t, oom_grace_period_s, 10)
 
 /// Whether or not the external storage is file system.
diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h
index 2090d014b..a330d87e4 100644
--- a/src/ray/object_manager/plasma/common.h
+++ b/src/ray/object_manager/plasma/common.h
@@ -43,8 +43,6 @@ enum class ObjectState : int {
   PLASMA_CREATED = 1,
   /// Object is sealed and stored in the local Plasma Store.
   PLASMA_SEALED = 2,
-  /// Object is evicted to external store.
-  PLASMA_EVICTED = 3,
 };
 
 /// This type is used by the Plasma store. It is here because it is exposed to
diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc
index 2ae276e35..ce596218d 100644
--- a/src/ray/object_manager/plasma/create_request_queue.cc
+++ b/src/ray/object_manager/plasma/create_request_queue.cc
@@ -115,10 +115,6 @@ Status CreateRequestQueue::ProcessRequests() {
         oom_start_time_ns_ = now;
      }
       auto grace_period_ns = oom_grace_period_ns_;
-      if (plasma_unlimited_) {
-        // Use a faster timeout in unlimited allocation mode to avoid excess latency.
-        grace_period_ns = std::min(grace_period_ns, (int64_t)2e9);
-      }
       if (status.IsTransientObjectStoreFull() || spill_objects_callback_()) {
         oom_start_time_ns_ = -1;
         return Status::TransientObjectStoreFull("Waiting for objects to seal or spill.");
@@ -130,11 +126,9 @@
       } else {
         if (plasma_unlimited_) {
           // Trigger the fallback allocator.
-          RAY_CHECK_OK(ProcessRequest(*request_it, /*fallback_allocator=*/true));
-          // Note that we don't reset oom_start_time_ns_ until we complete a
-          // "normal" allocation.
-          FinishRequest(request_it);
-        } else {
+          status = ProcessRequest(*request_it, /*fallback_allocator=*/true);
+        }
+        if (!status.ok()) {
           std::string dump = "";
           if (dump_debug_info_callback_ && !logged_oom) {
             dump = dump_debug_info_callback_();
@@ -144,10 +138,8 @@
               << (*request_it)->object_id << " of size "
               << (*request_it)->object_size / 1024 / 1024 << "MB\n"
               << dump;
-          // Raise OOM. In this case, the request will be marked as OOM.
-          // We don't return so that we can process the next entry right away.
-          FinishRequest(request_it);
         }
+        FinishRequest(request_it);
       }
     }
   }
diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc
index 41ae230e4..5e2f9ee99 100644
--- a/src/ray/object_manager/plasma/store.cc
+++ b/src/ray/object_manager/plasma/store.cc
@@ -267,7 +267,7 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, MEMFD_TYPE *fd, int64_t *map_s
   // Fallback to allocating from the filesystem.
   if (pointer == nullptr && RayConfig::instance().plasma_unlimited() &&
       fallback_allocator) {
-    RAY_LOG(ERROR)
+    RAY_LOG(INFO)
         << "Shared memory store full, falling back to allocating from filesystem: "
         << size;
     pointer = reinterpret_cast<uint8_t *>(
@@ -560,30 +560,6 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
     // If necessary, record that this client is using this object. In the case
     // where entry == NULL, this will be called from SealObject.
     AddToClientObjectIds(object_id, entry, client);
-  } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) {
-    // Make sure the object pointer is not already allocated
-    RAY_CHECK(!entry->pointer);
-
-    PlasmaError error = PlasmaError::OK;
-    // TODO(ekl) this is dead code (we don't use the plasma eviction path).
-    // We should remove this.
-    entry->pointer =
-        AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd,
-                       &entry->map_size, &entry->offset, client, /*is_create=*/false,
-                       /*fallback_allocator=*/true, &error);
-    if (entry->pointer) {
-      // TODO(suquark): Not sure if this old behavior is still compatible
-      // with our current object spilling mechanics.
-      entry->state = ObjectState::PLASMA_CREATED;
-      entry->create_time = std::time(nullptr);
-      eviction_policy_.ObjectCreated(object_id, client.get(), false);
-      AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
-    } else {
-      // We are out of memory and cannot allocate memory for this object.
-      // Change the state of the object back to PLASMA_EVICTED so some
-      // other request can try again.
-      entry->state = ObjectState::PLASMA_EVICTED;
-    }
   } else {
     // Add a placeholder plasma object to the get request to indicate that the
     // object is not present. This will be parsed by the client. We set the
@@ -674,8 +650,7 @@ void PlasmaStore::ReleaseObject(const ObjectID &object_id,
 // Check if an object is present.
 ObjectStatus PlasmaStore::ContainsObject(const ObjectID &object_id) {
   auto entry = GetObjectTableEntry(&store_info_, object_id);
-  return entry && (entry->state == ObjectState::PLASMA_SEALED ||
-                   entry->state == ObjectState::PLASMA_EVICTED)
+  return entry && entry->state == ObjectState::PLASMA_SEALED
             ? ObjectStatus::OBJECT_FOUND
             : ObjectStatus::OBJECT_NOT_FOUND;
 }
diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc
index 32472b824..84461ef2f 100644
--- a/src/ray/object_manager/pull_manager.cc
+++ b/src/ray/object_manager/pull_manager.cc
@@ -93,8 +93,7 @@ bool PullManager::ActivateNextPullBundleRequest(const Queue &bundles,
     return false;
   }
 
-  if (next_request_it->second.num_object_sizes_missing > 0 &&
-      !RayConfig::instance().plasma_unlimited()) {
+  if (next_request_it->second.num_object_sizes_missing > 0) {
     // There is at least one object size missing. We should not activate the
     // bundle, since it may put us over the available capacity.
     return false;
diff --git a/src/ray/object_manager/test/pull_manager_test.cc b/src/ray/object_manager/test/pull_manager_test.cc
index 300c46d16..411b98982 100644
--- a/src/ray/object_manager/test/pull_manager_test.cc
+++ b/src/ray/object_manager/test/pull_manager_test.cc
@@ -499,6 +499,7 @@ TEST_P(PullManagerTest, TestDuplicateObjectsAreActivatedAndCleanedUp) {
 }
 
 TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
+  auto is_worker_request = GetParam();
   /// Test admission control for a single pull bundle request. We should
   /// activate the request when we are under the reported capacity and
   /// deactivate it when we are over.
@@ -507,7 +508,7 @@
   size_t object_size = 2;
   AssertNumActiveRequestsEquals(0);
   std::vector<rpc::ObjectReference> objects_to_locate;
-  auto req_id = pull_manager_.Pull(refs, GetParam(), &objects_to_locate);
+  auto req_id = pull_manager_.Pull(refs, is_worker_request, &objects_to_locate);
   ASSERT_EQ(ObjectRefsToIds(objects_to_locate), oids);
 
   std::unordered_set<NodeID> client_ids;
@@ -529,6 +530,14 @@
   ASSERT_TRUE(num_abort_calls_.empty());
   ASSERT_EQ(num_object_store_full_calls_, 0);
   pull_manager_.UpdatePullsBasedOnAvailableMemory(oids.size() * object_size - 1);
+
+  // In unlimited mode, we fulfill all ray.gets using the fallback allocator.
+  if (RayConfig::instance().plasma_unlimited() && is_worker_request) {
+    AssertNumActiveRequestsEquals(3);
+    ASSERT_EQ(num_object_store_full_calls_, 0);
+    return;
+  }
+
   AssertNumActiveRequestsEquals(0);
   ASSERT_EQ(num_object_store_full_calls_, 1);
   for (auto &oid : oids) {
@@ -569,6 +578,7 @@
 }
 
 TEST_P(PullManagerWithAdmissionControlTest, TestQueue) {
+  bool is_worker_request = GetParam();
   /// Test admission control for a queue of pull bundle requests. We should
   /// activate as many requests as we can, subject to the reported capacity.
   int object_size = 2;
@@ -581,7 +591,7 @@
     auto refs = CreateObjectRefs(num_oids_per_request);
     auto oids = ObjectRefsToIds(refs);
     std::vector<rpc::ObjectReference> objects_to_locate;
-    auto req_id = pull_manager_.Pull(refs, GetParam(), &objects_to_locate);
+    auto req_id = pull_manager_.Pull(refs, is_worker_request, &objects_to_locate);
     ASSERT_EQ(ObjectRefsToIds(objects_to_locate), oids);
     bundles.push_back(oids);
 
@@ -599,12 +609,17 @@
   for (int capacity = 0; capacity < 20; capacity++) {
     int num_requests_expected =
         std::min(num_requests, capacity / (object_size * num_oids_per_request));
+    if (RayConfig::instance().plasma_unlimited() && is_worker_request) {
+      num_requests_expected = num_requests;
+    }
     pull_manager_.UpdatePullsBasedOnAvailableMemory(capacity);
 
     AssertNumActiveRequestsEquals(num_requests_expected * num_oids_per_request);
-    // The total requests that are active is under the specified capacity.
-    ASSERT_TRUE(
-        IsUnderCapacity(num_requests_expected * num_oids_per_request * object_size));
+    if (!RayConfig::instance().plasma_unlimited()) {
+      // The total requests that are active is under the specified capacity.
+      ASSERT_TRUE(
+          IsUnderCapacity(num_requests_expected * num_oids_per_request * object_size));
+    }
     // This is the maximum number of requests that can be served at once that
     // is under the capacity.
     if (num_requests_expected < num_requests) {
@@ -634,6 +649,10 @@
 }
 
 TEST_P(PullManagerWithAdmissionControlTest, TestCancel) {
+  auto is_worker_request = GetParam();
+  if (RayConfig::instance().plasma_unlimited() && is_worker_request) {
+    return;  // This case isn't meaningful to test.
+  }
   /// Test admission control while requests are cancelled out-of-order. When an
   /// active request is cancelled, we should activate another request in the
   /// queue, if there is one that satisfies the reported capacity.
@@ -646,7 +665,7 @@
   std::vector<uint64_t> req_ids;
   for (auto &ref : refs) {
     std::vector<rpc::ObjectReference> objects_to_locate;
-    auto req_id = pull_manager_.Pull({ref}, GetParam(), &objects_to_locate);
+    auto req_id = pull_manager_.Pull({ref}, is_worker_request, &objects_to_locate);
     req_ids.push_back(req_id);
   }
   for (size_t i = 0; i < object_sizes.size(); i++) {
@@ -773,7 +792,11 @@ TEST_F(PullManagerWithAdmissionControlTest, TestPrioritizeWorkerRequests) {
   // the same type by FIFO order.
   pull_manager_.UpdatePullsBasedOnAvailableMemory(2);
   ASSERT_TRUE(pull_manager_.IsObjectActive(worker_oids[0]));
-  ASSERT_FALSE(pull_manager_.IsObjectActive(worker_oids[1]));
+  if (RayConfig::instance().plasma_unlimited()) {
+    ASSERT_TRUE(pull_manager_.IsObjectActive(worker_oids[1]));
+  } else {
+    ASSERT_FALSE(pull_manager_.IsObjectActive(worker_oids[1]));
+  }
   ASSERT_FALSE(pull_manager_.IsObjectActive(task_oids[0]));
   ASSERT_FALSE(pull_manager_.IsObjectActive(task_oids[1]));
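
Usage sketch (not part of the patch): the snippet below illustrates the behavior
this change enables by default. With plasma_unlimited on, puts that exceed the
object store's capacity no longer raise ObjectStoreFullError; once the
oom_grace_period_s delay expires, they are served by filesystem-backed fallback
allocations (PlasmaStore::AllocateMemory above). It mirrors the patterns in
test_failure_4.py and test_plasma_unlimited.py; the store size, array sizes, and
config overrides are illustrative assumptions, not values taken from the patch.

    import numpy as np
    import ray

    MB = 1024 * 1024

    # Disable spilling so any overflow must be served by the fallback
    # allocator, and shrink the OOM grace period (assumed override) so the
    # fallback triggers quickly instead of after the default 10 seconds.
    ray.init(
        object_store_memory=100 * MB,
        _system_config={
            "automatic_object_spilling_enabled": False,
            "oom_grace_period_s": 1,
        })

    if ray.worker.global_worker.core_worker.plasma_unlimited():
        # Pin roughly 4x the store's capacity in deserialized copies. With
        # plasma_unlimited off these puts would raise
        # ray.exceptions.ObjectStoreFullError; with it on (the new default)
        # they succeed via filesystem-backed fallback allocations.
        pinned = [
            ray.get(ray.put(np.zeros(50 * MB, dtype=np.uint8)))
            for _ in range(8)
        ]
        assert all(arr.nbytes == 50 * MB for arr in pinned)

    ray.shutdown()

Since plasma_unlimited is an ordinary RAY_CONFIG entry, passing
"plasma_unlimited": False in the same _system_config dict should restore the
pre-patch strict OOM behavior.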