mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
Fix plasma bug (#7322)
This commit is contained in:
parent
44b4394afa
commit
9964657815
2 changed files with 19 additions and 6 deletions
|
@ -916,6 +916,15 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head, num_actors):
|
||||||
}],
|
}],
|
||||||
indirect=True)
|
indirect=True)
|
||||||
def test_fill_object_store_exception(ray_start_cluster_head):
|
def test_fill_object_store_exception(ray_start_cluster_head):
|
||||||
|
@ray.remote
|
||||||
|
def expensive_task():
|
||||||
|
return np.zeros((10**8) // 10, dtype=np.uint8)
|
||||||
|
|
||||||
|
with pytest.raises(ray.exceptions.RayTaskError) as e:
|
||||||
|
ray.get([expensive_task.remote() for _ in range(20)])
|
||||||
|
with pytest.raises(ray.exceptions.ObjectStoreFullError):
|
||||||
|
raise e.as_instanceof_cause()
|
||||||
|
|
||||||
@ray.remote
|
@ray.remote
|
||||||
class LargeMemoryActor:
|
class LargeMemoryActor:
|
||||||
def some_expensive_task(self):
|
def some_expensive_task(self):
|
||||||
|
|
|
@ -480,16 +480,20 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
|
||||||
&result_map, &got_exception));
|
&result_map, &got_exception));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Erase any objects that were promoted to plasma from the results. These get
|
||||||
|
// requests will be retried at the plasma store.
|
||||||
|
for (auto it = result_map.begin(); it != result_map.end(); it++) {
|
||||||
|
if (it->second->IsInPlasmaError()) {
|
||||||
|
RAY_LOG(DEBUG) << it->first << " in plasma, doing fetch-and-get";
|
||||||
|
plasma_object_ids.insert(it->first);
|
||||||
|
result_map.erase(it);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!got_exception) {
|
if (!got_exception) {
|
||||||
// If any of the objects have been promoted to plasma, then we retry their
|
// If any of the objects have been promoted to plasma, then we retry their
|
||||||
// gets at the provider plasma. Once we get the objects from plasma, we flip
|
// gets at the provider plasma. Once we get the objects from plasma, we flip
|
||||||
// the transport type again and return them for the original direct call ids.
|
// the transport type again and return them for the original direct call ids.
|
||||||
for (const auto &pair : result_map) {
|
|
||||||
if (pair.second->IsInPlasmaError()) {
|
|
||||||
RAY_LOG(DEBUG) << pair.first << " in plasma, doing fetch-and-get";
|
|
||||||
plasma_object_ids.insert(pair.first);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int64_t local_timeout_ms = timeout_ms;
|
int64_t local_timeout_ms = timeout_ms;
|
||||||
if (timeout_ms >= 0) {
|
if (timeout_ms >= 0) {
|
||||||
local_timeout_ms = std::max(static_cast<int64_t>(0),
|
local_timeout_ms = std::max(static_cast<int64_t>(0),
|
||||||
|
|
Loading…
Add table
Reference in a new issue