diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py
index 84d86738b..b5a4a842a 100644
--- a/python/ray/tests/test_failure.py
+++ b/python/ray/tests/test_failure.py
@@ -916,6 +916,15 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
     }],
     indirect=True)
 def test_fill_object_store_exception(ray_start_cluster_head):
+    @ray.remote
+    def expensive_task():
+        return np.zeros((10**8) // 10, dtype=np.uint8)
+
+    with pytest.raises(ray.exceptions.RayTaskError) as e:
+        ray.get([expensive_task.remote() for _ in range(20)])
+    with pytest.raises(ray.exceptions.ObjectStoreFullError):
+        raise e.as_instanceof_cause()
+
     @ray.remote
     class LargeMemoryActor:
         def some_expensive_task(self):
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 08e3dd46d..1d66982a1 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -480,16 +480,23 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
                                          &result_map, &got_exception));
   }
 
+  // Erase any objects that were promoted to plasma from the results. These get
+  // requests will be retried at the plasma store.
+  for (auto it = result_map.begin(); it != result_map.end();) {
+    if (it->second->IsInPlasmaError()) {
+      RAY_LOG(DEBUG) << it->first << " in plasma, doing fetch-and-get";
+      plasma_object_ids.insert(it->first);
+      // erase() invalidates the erased iterator; advance via its return value.
+      it = result_map.erase(it);
+    } else {
+      it++;
+    }
+  }
+
   if (!got_exception) {
     // If any of the objects have been promoted to plasma, then we retry their
     // gets at the provider plasma. Once we get the objects from plasma, we flip
     // the transport type again and return them for the original direct call ids.
-    for (const auto &pair : result_map) {
-      if (pair.second->IsInPlasmaError()) {
-        RAY_LOG(DEBUG) << pair.first << " in plasma, doing fetch-and-get";
-        plasma_object_ids.insert(pair.first);
-      }
-    }
     int64_t local_timeout_ms = timeout_ms;
     if (timeout_ms >= 0) {
       local_timeout_ms = std::max(static_cast<int64_t>(0),
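
The new loop in core_worker.cc erases entries from `result_map` while iterating over it. With `std::unordered_map`, `erase()` invalidates the erased element's iterator, so the loop has to advance through the iterator that `erase()` returns (available since C++11) rather than incrementing a stale one. Below is a minimal standalone sketch of that idiom using a toy map, not Ray's actual types:

```cpp
// Sketch: safely erasing entries from an unordered_map while iterating.
// The map contents here are illustrative placeholders, not Ray data.
#include <iostream>
#include <string>
#include <unordered_map>

int main() {
  std::unordered_map<int, std::string> results = {
      {1, "in_plasma"}, {2, "inline"}, {3, "in_plasma"}};

  for (auto it = results.begin(); it != results.end();) {
    if (it->second == "in_plasma") {
      // erase() returns the iterator following the erased element,
      // so the loop stays valid without a separate increment.
      it = results.erase(it);
    } else {
      ++it;
    }
  }

  for (const auto &pair : results) {
    std::cout << pair.first << " -> " << pair.second << "\n";  // prints: 2 -> inline
  }
  return 0;
}
```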