Don't release resources during plasma fetch (#13025)

This commit is contained in:
Eric Liang 2020-12-21 18:32:40 -08:00 committed by GitHub
parent 015a0f9935
commit 8068041006
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 75 additions and 23 deletions

View file

@ -563,5 +563,59 @@ def test_fusion_objects(tmp_path, shutdown_only):
assert is_test_passing
# Regression scenario for https://github.com/ray-project/ray/issues/12912
def do_test_release_resource(tmp_path, expect_released):
    """Check whether a task blocked on a plasma fetch gives up its CPU.

    Starts a single-CPU Ray instance with filesystem object spilling, fills
    the object store so earlier objects must be spilled, then launches a task
    that repeatedly calls ``ray.get`` with a very short timeout on the first
    object. A second "canary" task is submitted afterwards and waited on for
    two seconds.

    Args:
        tmp_path: Base directory; spilled objects go to ``tmp_path / "spill"``.
        expect_released: Value passed as the
            ``release_resources_during_plasma_fetch`` system config. The
            canary must become ready within the wait timeout when this is
            truthy and must not become ready when it is falsy.
    """
    spill_dir = tmp_path / "spill"
    store_bytes = 75 * 1024 * 1024
    blob_bytes = 50 * 1024 * 1024

    spilling_config = json.dumps({
        "type": "filesystem",
        "params": {
            "directory_path": str(spill_dir)
        }
    })
    system_config = {
        "max_io_workers": 1,
        "release_resources_during_plasma_fetch": expect_released,
        "automatic_object_spilling_enabled": True,
        "object_spilling_config": spilling_config,
    }
    ray.init(
        num_cpus=1,
        object_store_memory=store_bytes,
        _system_config=system_config)

    # The first object is the one the blocked task will try to fetch.
    plasma_obj = ray.put(np.ones(blob_bytes, dtype=np.uint8))
    # Each extra put is larger than the remaining store capacity, which
    # forces spilling.
    for _ in range(5):
        ray.put(np.ones(blob_bytes, dtype=np.uint8))

    @ray.remote
    def sneaky_task_tries_to_steal_released_resources():
        print("resources were released!")

    @ray.remote
    def f(dep):
        # Spin forever on short-timeout gets; timeouts are expected.
        while True:
            try:
                ray.get(dep[0], timeout=0.001)
            except ray.exceptions.GetTimeoutError:
                continue

    # Hold on to the reference of the blocked task for the whole test.
    blocked_task = f.remote([plasma_obj])  # noqa
    canary = sneaky_task_tries_to_steal_released_resources.remote()
    ready = ray.wait([canary], timeout=2)[0]

    # The canary is ready if and only if resources were expected to be
    # released.
    assert bool(ready) is bool(expect_released)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="Failing on Windows.")
def test_no_release_during_plasma_fetch(tmp_path, shutdown_only):
    """Run the release-resource scenario with expect_released=False.

    Skipped on Windows. Delegates entirely to do_test_release_resource.
    """
    do_test_release_resource(tmp_path, expect_released=False)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="Failing on Windows.")
def test_release_during_plasma_fetch(tmp_path, shutdown_only):
    """Run the release-resource scenario with expect_released=True.

    Skipped on Windows. Delegates entirely to do_test_release_resource.
    """
    do_test_release_resource(tmp_path, expect_released=True)
# When executed directly, run this file's tests through pytest and exit with
# pytest's return code.
if __name__ == "__main__":
    sys.exit(pytest.main(["-sv", __file__]))

View file

@ -274,6 +274,10 @@ RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000)
/// Whether to start the Plasma Store as a Raylet thread.
RAY_CONFIG(bool, plasma_store_as_thread, false)
/// Whether to release worker CPUs during plasma fetches.
/// See https://github.com/ray-project/ray/issues/12912 for further discussion.
RAY_CONFIG(bool, release_resources_during_plasma_fetch, false)
/// The interval at which the gcs client will check if the address of gcs service has
/// changed. When the address changed, we will resubscribe again.
RAY_CONFIG(int64_t, gcs_service_address_check_interval_milliseconds, 1000)

View file

@ -232,18 +232,16 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_
Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool release_resources) {
std::vector<std::shared_ptr<RayObject>> *results) {
return GetImpl(object_ids, num_objects, timeout_ms, ctx, remove_after_get, results,
/*abort_if_any_object_is_exception=*/true, release_resources);
/*abort_if_any_object_is_exception=*/true);
}
Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
int num_objects, int64_t timeout_ms,
const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception,
bool release_resources) {
bool abort_if_any_object_is_exception) {
(*results).resize(object_ids.size(), nullptr);
std::shared_ptr<GetRequest> get_request;
@ -301,8 +299,7 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
// Wait for remaining objects (or timeout).
if (should_notify_raylet) {
// SANG-TODO Implement memory store get
RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(release_resources));
RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources=*/true));
}
bool done = false;
@ -377,11 +374,11 @@ Status CoreWorkerMemoryStore::Get(
const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception, bool release_resources) {
bool *got_exception) {
const std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
std::vector<std::shared_ptr<RayObject>> result_objects;
RAY_RETURN_NOT_OK(Get(id_vector, id_vector.size(), timeout_ms, ctx,
/*remove_after_get=*/false, &result_objects, release_resources));
/*remove_after_get=*/false, &result_objects));
for (size_t i = 0; i < id_vector.size(); i++) {
if (result_objects[i] != nullptr) {
@ -404,9 +401,8 @@ Status CoreWorkerMemoryStore::Wait(const absl::flat_hash_set<ObjectID> &object_i
std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
std::vector<std::shared_ptr<RayObject>> result_objects;
RAY_CHECK(object_ids.size() == id_vector.size());
auto status =
GetImpl(id_vector, num_objects, timeout_ms, ctx, false, &result_objects,
/*abort_if_any_object_is_exception=*/false, /*release_resources=*/true);
auto status = GetImpl(id_vector, num_objects, timeout_ms, ctx, false, &result_objects,
/*abort_if_any_object_is_exception=*/false);
// Ignore TimedOut statuses since we return ready objects explicitly.
if (!status.IsTimedOut()) {
RAY_RETURN_NOT_OK(status);

View file

@ -58,14 +58,13 @@ class CoreWorkerMemoryStore {
/// \return Status.
Status Get(const std::vector<ObjectID> &object_ids, int num_objects, int64_t timeout_ms,
const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool release_resources = true);
std::vector<std::shared_ptr<RayObject>> *results);
/// Convenience wrapper around Get() that stores results in a given result map.
Status Get(const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception, bool release_resources = true);
bool *got_exception);
/// Convenience wrapper around Get() that stores ready objects in a given result set.
Status Wait(const absl::flat_hash_set<ObjectID> &object_ids, int num_objects,
@ -138,12 +137,11 @@ class CoreWorkerMemoryStore {
private:
/// See the public version of `Get` for meaning of the other arguments.
/// \param[in] abort_if_any_object_is_exception Whether we should abort if any object
/// is an exception.
/// \param[in] release_resources true if memory store blocking get needs to release
/// resources.
Status GetImpl(const std::vector<ObjectID> &object_ids, int num_objects,
int64_t timeout_ms, const WorkerContext &ctx, bool remove_after_get,
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception, bool release_resources);
bool abort_if_any_object_is_exception);
/// Optional callback for putting objects into the plasma store.
std::function<void(const RayObject &, const ObjectID &)> store_in_plasma_;

View file

@ -226,7 +226,7 @@ Status CoreWorkerPlasmaStoreProvider::Get(
const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception, bool release_resources) {
bool *got_exception) {
int64_t batch_size = RayConfig::instance().worker_fetch_request_size();
std::vector<ObjectID> batch_ids;
absl::flat_hash_set<ObjectID> remaining(object_ids.begin(), object_ids.end());
@ -277,7 +277,8 @@ Status CoreWorkerPlasmaStoreProvider::Get(
size_t previous_size = remaining.size();
// This is a separate IPC from the FetchAndGet in direct call mode.
if (ctx.CurrentTaskIsDirectCall() && ctx.ShouldReleaseResourcesOnBlockingCalls()) {
RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked(release_resources));
RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked(
RayConfig::instance().release_resources_during_plasma_fetch()));
}
RAY_RETURN_NOT_OK(
FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout,
@ -334,9 +335,8 @@ Status CoreWorkerPlasmaStoreProvider::Wait(
// This is a separate IPC from the Wait in direct call mode.
if (ctx.CurrentTaskIsDirectCall() && ctx.ShouldReleaseResourcesOnBlockingCalls()) {
// SANG-TODO Implement wait
RAY_RETURN_NOT_OK(
raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true));
RAY_RETURN_NOT_OK(raylet_client_->NotifyDirectCallTaskBlocked(
RayConfig::instance().release_resources_during_plasma_fetch()));
}
const auto owner_addresses = reference_counter_->GetOwnerAddresses(id_vector);
RAY_RETURN_NOT_OK(

View file

@ -90,7 +90,7 @@ class CoreWorkerPlasmaStoreProvider {
Status Get(const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
bool *got_exception, bool release_resources = true);
bool *got_exception);
Status Contains(const ObjectID &object_id, bool *has_object);