Enable plasma fallback allocations by default (#16244)

Eric Liang 2021-06-09 22:05:52 -07:00 committed by GitHub
parent 67761a4fc5
commit d390344a8f
12 changed files with 81 additions and 52 deletions


@@ -1492,6 +1492,9 @@ cdef class CoreWorker:
         return resources_dict
 
+    def plasma_unlimited(self):
+        return RayConfig.instance().plasma_unlimited()
+
     def profile_event(self, c_string event_type, object extra_data=None):
         if RayConfig.instance().enable_timeline():
             return ProfileEvent.make(
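
Note: the tests later in this commit query this new accessor through the global worker. A minimal sketch of that pattern (assumes an initialized Ray session of this era; the 100 MB figure is illustrative):

    import ray

    ray.init(object_store_memory=10**8)
    if ray.worker.global_worker.core_worker.plasma_unlimited():
        # Fallback allocations are on (now the default), so the tests skip
        # their ObjectStoreFullError assertions.
        print("plasma_unlimited is enabled")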


@@ -63,4 +63,6 @@ cdef extern from "ray/common/ray_config.h" nogil:
         c_bool automatic_object_deletion_enabled() const
 
+        c_bool plasma_unlimited() const
+
         uint32_t max_grpc_message_size() const


@@ -108,6 +108,10 @@ cdef class Config:
     def automatic_object_deletion_enabled():
         return RayConfig.instance().automatic_object_deletion_enabled()
 
+    @staticmethod
+    def plasma_unlimited():
+        return RayConfig.instance().plasma_unlimited()
+
     @staticmethod
     def max_grpc_message_size():
         return RayConfig.instance().max_grpc_message_size()


@@ -16,6 +16,9 @@ def test_fill_object_store_exception(shutdown_only):
         object_store_memory=10**8,
         _system_config={"automatic_object_spilling_enabled": False})
 
+    if ray.worker.global_worker.core_worker.plasma_unlimited():
+        return  # No exception is raised.
+
     @ray.remote
     def expensive_task():
         return np.zeros((10**8) // 10, dtype=np.uint8)
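
The behavior this guard encodes: with fallback allocations on by default, overfilling the shared-memory store no longer raises ObjectStoreFullError; allocations beyond capacity fall back to filesystem-backed memory. A hedged sketch of that behavior (sizes are illustrative, not taken from the diff):

    import numpy as np
    import ray

    ray.init(object_store_memory=10**8)  # 100 MB shared-memory store
    refs = []
    for _ in range(20):
        # 200 MB total: the later puts exceed the store, but with
        # plasma_unlimited they succeed via the fallback allocator
        # instead of raising ObjectStoreFullError.
        refs.append(ray.put(np.zeros(10**7, dtype=np.uint8)))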


@@ -106,6 +106,9 @@ def test_default_config(shutdown_only):
             "object_store_full_delay_ms": 100
         })
     assert "object_spilling_config" not in ray.worker._global_node._config
-    with pytest.raises(ray.exceptions.ObjectStoreFullError):
-        run_basic_workload()
+    if ray.worker.global_worker.core_worker.plasma_unlimited():
+        run_basic_workload()
+    else:
+        with pytest.raises(ray.exceptions.ObjectStoreFullError):
+            run_basic_workload()
     ray.shutdown()
@@ -163,6 +166,9 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
     arr = np.random.rand(5 * 1024 * 1024)  # 40 MB
     ref = ray.get(ray.put(arr))  # noqa
     # Since the ref exists, it should raise OOM.
-    with pytest.raises(ray.exceptions.ObjectStoreFullError):
-        ref2 = ray.put(arr)  # noqa
+    if ray.worker.global_worker.core_worker.plasma_unlimited():
+        ref2 = ray.put(arr)  # noqa
+    else:
+        with pytest.raises(ray.exceptions.ObjectStoreFullError):
+            ref2 = ray.put(arr)  # noqa


@@ -164,6 +164,29 @@ def test_task_unlimited_multiget_args():
         ray.shutdown()
 
 
+# TODO(ekl) enable this test once we implement this behavior.
+# @pytest.mark.skipif(
+#     platform.system() == "Windows", reason="Need to fix up for Windows.")
+# def test_task_unlimited_huge_args():
+#     try:
+#         address = _init_ray()
+#
+#         # PullManager should raise an error, since the set of task args is
+#         # too huge to fit into memory.
+#         @ray.remote
+#         def consume(*refs):
+#             return "ok"
+#
+#         # Too many refs to fit into memory.
+#         refs = []
+#         for _ in range(10):
+#             refs.append(ray.put(np.zeros(200 * MB, dtype=np.uint8)))
+#
+#         with pytest.raises(Exception):
+#             ray.get(consume.remote(*refs))
+#     finally:
+#         ray.shutdown()
+
 if __name__ == "__main__":
     import sys
     sys.exit(pytest.main(["-v", __file__]))


@@ -397,6 +397,7 @@ RAY_CONFIG(int64_t, max_fused_object_count, 2000)
 RAY_CONFIG(bool, automatic_object_deletion_enabled, true)
 
 /// Grace period until we throw the OOM error to the application in seconds.
+/// In unlimited allocation mode, this is the time delay prior to fallback allocating.
 RAY_CONFIG(int64_t, oom_grace_period_s, 10)
 
 /// Whether or not the external storage is file system.
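
Since oom_grace_period_s now doubles as the fallback-allocation delay, it can be shortened for latency-sensitive workloads. A sketch, assuming the usual _system_config plumbing that this commit's tests use (the 2-second value is illustrative, not from the diff):

    import ray

    # Wait only 2 s (instead of the 10 s default above) before falling
    # back to filesystem allocation when the shared-memory store is full.
    ray.init(_system_config={"oom_grace_period_s": 2})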


@@ -43,8 +43,6 @@ enum class ObjectState : int {
   PLASMA_CREATED = 1,
   /// Object is sealed and stored in the local Plasma Store.
   PLASMA_SEALED = 2,
-  /// Object is evicted to external store.
-  PLASMA_EVICTED = 3,
 };
 
 /// This type is used by the Plasma store. It is here because it is exposed to


@@ -115,10 +115,6 @@ Status CreateRequestQueue::ProcessRequests() {
         oom_start_time_ns_ = now;
       }
       auto grace_period_ns = oom_grace_period_ns_;
-      if (plasma_unlimited_) {
-        // Use a faster timeout in unlimited allocation mode to avoid excess latency.
-        grace_period_ns = std::min(grace_period_ns, (int64_t)2e9);
-      }
       if (status.IsTransientObjectStoreFull() || spill_objects_callback_()) {
         oom_start_time_ns_ = -1;
         return Status::TransientObjectStoreFull("Waiting for objects to seal or spill.");
@@ -130,11 +126,9 @@ Status CreateRequestQueue::ProcessRequests() {
       } else {
         if (plasma_unlimited_) {
           // Trigger the fallback allocator.
-          RAY_CHECK_OK(ProcessRequest(*request_it, /*fallback_allocator=*/true));
-          // Note that we don't reset oom_start_time_ns_ until we complete a
-          // "normal" allocation.
-          FinishRequest(request_it);
-        } else {
+          status = ProcessRequest(*request_it, /*fallback_allocator=*/true);
+        }
+        if (!status.ok()) {
           std::string dump = "";
           if (dump_debug_info_callback_ && !logged_oom) {
             dump = dump_debug_info_callback_();
@@ -144,10 +138,8 @@ Status CreateRequestQueue::ProcessRequests() {
               << (*request_it)->object_id << " of size "
               << (*request_it)->object_size / 1024 / 1024 << "MB\n"
               << dump;
-          // Raise OOM. In this case, the request will be marked as OOM.
-          // We don't return so that we can process the next entry right away.
-          FinishRequest(request_it);
         }
+        FinishRequest(request_it);
       }
     }
   }
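
After this simplification, the OOM branch makes a single fallback attempt (in unlimited mode) and finishes the request either way, so the queue keeps draining. A toy Python model of the resulting control flow (names like try_allocate and finish are stand-ins for illustration, not raylet APIs):

    def handle_oom(request, plasma_unlimited, try_allocate, finish):
        ok = False
        if plasma_unlimited:
            # One attempt with the fallback (filesystem) allocator.
            ok = try_allocate(request, fallback_allocator=True)
        if not ok:
            print("OOM: failed to create object", request)
        # Finish the request either way so the next entry is processed.
        finish(request)
        return ok

    # Example: in unlimited mode the fallback attempt succeeds.
    assert handle_oom("obj", True,
                      lambda r, fallback_allocator: True,
                      lambda r: None)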


@@ -267,7 +267,7 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, MEMFD_TYPE *fd, int64_t *map_s
   // Fallback to allocating from the filesystem.
   if (pointer == nullptr && RayConfig::instance().plasma_unlimited() &&
       fallback_allocator) {
-    RAY_LOG(ERROR)
+    RAY_LOG(INFO)
         << "Shared memory store full, falling back to allocating from filesystem: "
         << size;
     pointer = reinterpret_cast<uint8_t *>(
@@ -560,30 +560,6 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
       // If necessary, record that this client is using this object. In the case
       // where entry == NULL, this will be called from SealObject.
       AddToClientObjectIds(object_id, entry, client);
-    } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) {
-      // Make sure the object pointer is not already allocated
-      RAY_CHECK(!entry->pointer);
-      PlasmaError error = PlasmaError::OK;
-      // TODO(ekl) this is dead code (we don't use the plasma eviction path).
-      // We should remove this.
-      entry->pointer =
-          AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd,
-                         &entry->map_size, &entry->offset, client, /*is_create=*/false,
-                         /*fallback_allocator=*/true, &error);
-      if (entry->pointer) {
-        // TODO(suquark): Not sure if this old behavior is still compatible
-        // with our current object spilling mechanics.
-        entry->state = ObjectState::PLASMA_CREATED;
-        entry->create_time = std::time(nullptr);
-        eviction_policy_.ObjectCreated(object_id, client.get(), false);
-        AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
-      } else {
-        // We are out of memory and cannot allocate memory for this object.
-        // Change the state of the object back to PLASMA_EVICTED so some
-        // other request can try again.
-        entry->state = ObjectState::PLASMA_EVICTED;
-      }
     } else {
       // Add a placeholder plasma object to the get request to indicate that the
       // object is not present. This will be parsed by the client. We set the
@@ -674,8 +650,7 @@ void PlasmaStore::ReleaseObject(const ObjectID &object_id,
 // Check if an object is present.
 ObjectStatus PlasmaStore::ContainsObject(const ObjectID &object_id) {
   auto entry = GetObjectTableEntry(&store_info_, object_id);
-  return entry && (entry->state == ObjectState::PLASMA_SEALED ||
-                   entry->state == ObjectState::PLASMA_EVICTED)
+  return entry && entry->state == ObjectState::PLASMA_SEALED
              ? ObjectStatus::OBJECT_FOUND
              : ObjectStatus::OBJECT_NOT_FOUND;
 }


@@ -93,8 +93,7 @@ bool PullManager::ActivateNextPullBundleRequest(const Queue &bundles,
     return false;
   }
 
-  if (next_request_it->second.num_object_sizes_missing > 0 &&
-      !RayConfig::instance().plasma_unlimited()) {
+  if (next_request_it->second.num_object_sizes_missing > 0) {
     // There is at least one object size missing. We should not activate the
     // bundle, since it may put us over the available capacity.
     return false;


@@ -499,6 +499,7 @@ TEST_P(PullManagerTest, TestDuplicateObjectsAreActivatedAndCleanedUp) {
 }
 
 TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
+  auto is_worker_request = GetParam();
   /// Test admission control for a single pull bundle request. We should
   /// activate the request when we are under the reported capacity and
   /// deactivate it when we are over.
@@ -507,7 +508,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
   size_t object_size = 2;
   AssertNumActiveRequestsEquals(0);
   std::vector<rpc::ObjectReference> objects_to_locate;
-  auto req_id = pull_manager_.Pull(refs, GetParam(), &objects_to_locate);
+  auto req_id = pull_manager_.Pull(refs, is_worker_request, &objects_to_locate);
   ASSERT_EQ(ObjectRefsToIds(objects_to_locate), oids);
 
   std::unordered_set<NodeID> client_ids;
@@ -529,6 +530,14 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
   ASSERT_TRUE(num_abort_calls_.empty());
   ASSERT_EQ(num_object_store_full_calls_, 0);
   pull_manager_.UpdatePullsBasedOnAvailableMemory(oids.size() * object_size - 1);
+
+  // In unlimited mode, we fulfill all ray.gets using the fallback allocator.
+  if (RayConfig::instance().plasma_unlimited() && is_worker_request) {
+    AssertNumActiveRequestsEquals(3);
+    ASSERT_EQ(num_object_store_full_calls_, 0);
+    return;
+  }
+
   AssertNumActiveRequestsEquals(0);
   ASSERT_EQ(num_object_store_full_calls_, 1);
   for (auto &oid : oids) {
@@ -569,6 +578,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestBasic) {
 }
 
 TEST_P(PullManagerWithAdmissionControlTest, TestQueue) {
+  bool is_worker_request = GetParam();
   /// Test admission control for a queue of pull bundle requests. We should
   /// activate as many requests as we can, subject to the reported capacity.
   int object_size = 2;
@@ -581,7 +591,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestQueue) {
     auto refs = CreateObjectRefs(num_oids_per_request);
     auto oids = ObjectRefsToIds(refs);
     std::vector<rpc::ObjectReference> objects_to_locate;
-    auto req_id = pull_manager_.Pull(refs, GetParam(), &objects_to_locate);
+    auto req_id = pull_manager_.Pull(refs, is_worker_request, &objects_to_locate);
     ASSERT_EQ(ObjectRefsToIds(objects_to_locate), oids);
 
     bundles.push_back(oids);
@@ -599,12 +609,17 @@ TEST_P(PullManagerWithAdmissionControlTest, TestQueue) {
   for (int capacity = 0; capacity < 20; capacity++) {
     int num_requests_expected =
         std::min(num_requests, capacity / (object_size * num_oids_per_request));
+    if (RayConfig::instance().plasma_unlimited() && is_worker_request) {
+      num_requests_expected = num_requests;
+    }
     pull_manager_.UpdatePullsBasedOnAvailableMemory(capacity);
 
     AssertNumActiveRequestsEquals(num_requests_expected * num_oids_per_request);
+    if (!RayConfig::instance().plasma_unlimited()) {
       // The total requests that are active is under the specified capacity.
       ASSERT_TRUE(
           IsUnderCapacity(num_requests_expected * num_oids_per_request * object_size));
+    }
 
     // This is the maximum number of requests that can be served at once that
     // is under the capacity.
     if (num_requests_expected < num_requests) {
@@ -634,6 +649,10 @@ TEST_P(PullManagerWithAdmissionControlTest, TestQueue) {
 }
 
 TEST_P(PullManagerWithAdmissionControlTest, TestCancel) {
+  auto is_worker_request = GetParam();
+  if (RayConfig::instance().plasma_unlimited() && is_worker_request) {
+    return;  // This case isn't meaningful to test.
+  }
   /// Test admission control while requests are cancelled out-of-order. When an
   /// active request is cancelled, we should activate another request in the
   /// queue, if there is one that satisfies the reported capacity.
@@ -646,7 +665,7 @@ TEST_P(PullManagerWithAdmissionControlTest, TestCancel) {
   std::vector<int64_t> req_ids;
   for (auto &ref : refs) {
     std::vector<rpc::ObjectReference> objects_to_locate;
-    auto req_id = pull_manager_.Pull({ref}, GetParam(), &objects_to_locate);
+    auto req_id = pull_manager_.Pull({ref}, is_worker_request, &objects_to_locate);
     req_ids.push_back(req_id);
   }
   for (size_t i = 0; i < object_sizes.size(); i++) {
@@ -773,7 +792,11 @@ TEST_F(PullManagerWithAdmissionControlTest, TestPrioritizeWorkerRequests) {
   // the same type by FIFO order.
   pull_manager_.UpdatePullsBasedOnAvailableMemory(2);
   ASSERT_TRUE(pull_manager_.IsObjectActive(worker_oids[0]));
+  if (RayConfig::instance().plasma_unlimited()) {
+    ASSERT_TRUE(pull_manager_.IsObjectActive(worker_oids[1]));
+  } else {
     ASSERT_FALSE(pull_manager_.IsObjectActive(worker_oids[1]));
+  }
   ASSERT_FALSE(pull_manager_.IsObjectActive(task_oids[0]));
   ASSERT_FALSE(pull_manager_.IsObjectActive(task_oids[1]));