mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
Fix restoration request dedup issues. (#13546)
This commit is contained in:
parent
bfe147a6a8
commit
e544c008df
2 changed files with 21 additions and 3 deletions
|
@ -306,6 +306,8 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
|
||||||
// If the same object is restoring, we dedup here.
|
// If the same object is restoring, we dedup here.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
RAY_CHECK(objects_pending_restore_.emplace(object_id).second)
|
||||||
|
<< "Object dedupe wasn't done properly. Please report if you see this issue.";
|
||||||
io_worker_pool_.PopRestoreWorker([this, object_id, object_url, callback](
|
io_worker_pool_.PopRestoreWorker([this, object_id, object_url, callback](
|
||||||
std::shared_ptr<WorkerInterface> io_worker) {
|
std::shared_ptr<WorkerInterface> io_worker) {
|
||||||
auto start_time = absl::GetCurrentTimeNanos();
|
auto start_time = absl::GetCurrentTimeNanos();
|
||||||
|
@ -313,8 +315,6 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
|
||||||
rpc::RestoreSpilledObjectsRequest request;
|
rpc::RestoreSpilledObjectsRequest request;
|
||||||
request.add_spilled_objects_url(std::move(object_url));
|
request.add_spilled_objects_url(std::move(object_url));
|
||||||
request.add_object_ids_to_restore(object_id.Binary());
|
request.add_object_ids_to_restore(object_id.Binary());
|
||||||
RAY_CHECK(objects_pending_restore_.emplace(object_id).second)
|
|
||||||
<< "Object dedupe wasn't done properly. Please report if you see this issue.";
|
|
||||||
io_worker->rpc_client()->RestoreSpilledObjects(
|
io_worker->rpc_client()->RestoreSpilledObjects(
|
||||||
request,
|
request,
|
||||||
[this, start_time, object_id, callback, io_worker](
|
[this, start_time, object_id, callback, io_worker](
|
||||||
|
|
|
@ -150,7 +150,7 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
|
||||||
|
|
||||||
void PopRestoreWorker(
|
void PopRestoreWorker(
|
||||||
std::function<void(std::shared_ptr<WorkerInterface>)> callback) override {
|
std::function<void(std::shared_ptr<WorkerInterface>)> callback) override {
|
||||||
callback(io_worker);
|
restoration_callbacks.push_back(callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
void PopDeleteWorker(
|
void PopDeleteWorker(
|
||||||
|
@ -158,6 +158,17 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
|
||||||
callback(io_worker);
|
callback(io_worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RestoreWorkerPushed() {
|
||||||
|
if (restoration_callbacks.size() == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const auto callback = restoration_callbacks.front();
|
||||||
|
callback(io_worker);
|
||||||
|
restoration_callbacks.pop_front();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::list<std::function<void(std::shared_ptr<WorkerInterface>)>> restoration_callbacks;
|
||||||
std::shared_ptr<MockIOWorkerClient> io_worker_client =
|
std::shared_ptr<MockIOWorkerClient> io_worker_client =
|
||||||
std::make_shared<MockIOWorkerClient>();
|
std::make_shared<MockIOWorkerClient>();
|
||||||
std::shared_ptr<WorkerInterface> io_worker =
|
std::shared_ptr<WorkerInterface> io_worker =
|
||||||
|
@ -322,6 +333,13 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
||||||
num_times_fired++;
|
num_times_fired++;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
ASSERT_EQ(num_times_fired, 0);
|
||||||
|
|
||||||
|
// When restore workers are pushed, the request should be dedupped.
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
worker_pool.RestoreWorkerPushed();
|
||||||
|
ASSERT_EQ(num_times_fired, 0);
|
||||||
|
}
|
||||||
worker_pool.io_worker_client->ReplyRestoreObjects(10);
|
worker_pool.io_worker_client->ReplyRestoreObjects(10);
|
||||||
ASSERT_EQ(num_times_fired, 1);
|
ASSERT_EQ(num_times_fired, 1);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue