This commit is contained in:
Stephanie Wang 2020-03-20 18:53:28 -07:00 committed by GitHub
parent ec50037ee1
commit 6a12a31b2e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 36 additions and 36 deletions

View file

@ -1752,7 +1752,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) {
ASSERT_TRUE(lineage_deleted.empty());
// We should keep lineage for owned objects.
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address());
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
rc->RemoveLocalReference(id, nullptr);
@ -1771,7 +1771,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
for (int i = 0; i < 3; i++) {
ObjectID id = ObjectID::FromRandom();
ids.push_back(id);
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address());
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
}
rc->SetReleaseLineageCallback(

View file

@ -786,7 +786,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) {
std::vector<ObjectID> ids(buffers.size());
for (size_t i = 0; i < ids.size(); i++) {
ids[i] = ObjectID::FromRandom().WithDirectTransportType();
RAY_CHECK_OK(provider.Put(buffers[i], ids[i]));
RAY_CHECK(provider.Put(buffers[i], ids[i]));
}
absl::flat_hash_set<ObjectID> wait_ids(ids.begin(), ids.end());
@ -843,7 +843,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) {
std::vector<ObjectID> unready_ids(buffers.size());
for (size_t i = 0; i < unready_ids.size(); i++) {
ready_ids[i] = ObjectID::FromRandom().WithDirectTransportType();
RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i]));
RAY_CHECK(provider.Put(buffers[i], ready_ids[i]));
unready_ids[i] = ObjectID::FromRandom().WithDirectTransportType();
}
@ -851,7 +851,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) {
sleep(1);
for (size_t i = 0; i < unready_ids.size(); i++) {
RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i]));
RAY_CHECK(provider.Put(buffers[i], unready_ids[i]));
}
};

View file

@ -136,9 +136,9 @@ TEST_F(DirectActorTransportTest, TestDependencies) {
// Put the dependencies in the store in the same order as task submission.
auto data = GenerateRandomObject();
ASSERT_TRUE(store_->Put(*data, obj1).ok());
ASSERT_TRUE(store_->Put(*data, obj1));
ASSERT_EQ(worker_client_->callbacks.size(), 1);
ASSERT_TRUE(store_->Put(*data, obj2).ok());
ASSERT_TRUE(store_->Put(*data, obj2));
ASSERT_EQ(worker_client_->callbacks.size(), 2);
}
@ -165,9 +165,9 @@ TEST_F(DirectActorTransportTest, TestOutOfOrderDependencies) {
// Put the dependencies in the store in the opposite order of task
// submission.
auto data = GenerateRandomObject();
ASSERT_TRUE(store_->Put(*data, obj2).ok());
ASSERT_TRUE(store_->Put(*data, obj2));
ASSERT_EQ(worker_client_->callbacks.size(), 0);
ASSERT_TRUE(store_->Put(*data, obj1).ok());
ASSERT_TRUE(store_->Put(*data, obj1));
ASSERT_EQ(worker_client_->callbacks.size(), 2);
}

View file

@ -139,7 +139,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) {
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto data = GenerateRandomObject();
ASSERT_TRUE(mem->Put(*data, obj1).ok());
ASSERT_TRUE(mem->Put(*data, obj1));
// Test getting an already existing object.
ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj1) != nullptr);
@ -148,7 +148,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) {
// Testing getting an object that doesn't exist yet causes promotion.
ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj2) == nullptr);
ASSERT_TRUE(num_plasma_puts == 0);
ASSERT_TRUE(mem->Put(*data, obj2).ok());
ASSERT_FALSE(mem->Put(*data, obj2));
ASSERT_TRUE(num_plasma_puts == 1);
// The next time you get it, it's already there so no need to promote.
@ -191,7 +191,7 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) {
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto data = RayObject(nullptr, meta_buffer, std::vector<ObjectID>());
ASSERT_TRUE(store->Put(data, obj1).ok());
ASSERT_TRUE(store->Put(data, obj1));
TaskSpecification task;
task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary());
ASSERT_TRUE(task.ArgId(0, 0).IsDirectCallType());
@ -213,8 +213,8 @@ TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) {
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto data = GenerateRandomObject();
// Ensure the data is already present in the local store.
ASSERT_TRUE(store->Put(*data, obj1).ok());
ASSERT_TRUE(store->Put(*data, obj2).ok());
ASSERT_TRUE(store->Put(*data, obj1));
ASSERT_TRUE(store->Put(*data, obj2));
TaskSpecification task;
task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary());
task.GetMutableMessage().add_args()->add_object_ids(obj2.Binary());
@ -244,8 +244,8 @@ TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) {
resolver.ResolveDependencies(task, [&ok]() { ok = true; });
ASSERT_EQ(resolver.NumPendingTasks(), 1);
ASSERT_TRUE(!ok);
ASSERT_TRUE(store->Put(*data, obj1).ok());
ASSERT_TRUE(store->Put(*data, obj2).ok());
ASSERT_TRUE(store->Put(*data, obj1));
ASSERT_TRUE(store->Put(*data, obj2));
// Tests that the task proto was rewritten to have inline argument values after
// resolution completes.
ASSERT_TRUE(ok);
@ -273,8 +273,8 @@ TEST(LocalDependencyResolverTest, TestInlinedObjectIds) {
resolver.ResolveDependencies(task, [&ok]() { ok = true; });
ASSERT_EQ(resolver.NumPendingTasks(), 1);
ASSERT_TRUE(!ok);
ASSERT_TRUE(store->Put(*data, obj1).ok());
ASSERT_TRUE(store->Put(*data, obj2).ok());
ASSERT_TRUE(store->Put(*data, obj1));
ASSERT_TRUE(store->Put(*data, obj2));
// Tests that the task proto was rewritten to have inline argument values after
// resolution completes.
ASSERT_TRUE(ok);
@ -698,16 +698,16 @@ TEST(DirectTaskTransportTest, TestSchedulingKeys) {
ObjectID plasma2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
// Ensure the data is already present in the local store for direct call objects.
auto data = GenerateRandomObject();
ASSERT_TRUE(store->Put(*data, direct1).ok());
ASSERT_TRUE(store->Put(*data, direct2).ok());
ASSERT_TRUE(store->Put(*data, direct1));
ASSERT_TRUE(store->Put(*data, direct2));
// Force plasma objects to be promoted.
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto plasma_data = RayObject(nullptr, meta_buffer, std::vector<ObjectID>());
ASSERT_TRUE(store->Put(plasma_data, plasma1).ok());
ASSERT_TRUE(store->Put(plasma_data, plasma2).ok());
ASSERT_TRUE(store->Put(plasma_data, plasma1));
ASSERT_TRUE(store->Put(plasma_data, plasma2));
TaskSpecification same_deps_1 = BuildTaskSpec(resources1, descriptor1);
same_deps_1.GetMutableMessage().add_args()->add_object_ids(direct1.Binary());

View file

@ -197,7 +197,7 @@ TEST_F(TaskManagerTest, TestLineageEvicted) {
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
auto spec = CreateTaskHelper(1, {dep1, dep2});
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, num_retries);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
rpc::PushTaskReply reply;
@ -216,7 +216,7 @@ TEST_F(TaskManagerTest, TestLineageEvicted) {
// Once the return ID goes out of scope, the task spec and its dependencies
// are released.
reference_counter_->AddLocalReference(return_id);
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, nullptr);
ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
@ -234,9 +234,9 @@ TEST_F(TaskManagerLineageTest, TestLineagePinned) {
auto spec = CreateTaskHelper(1, {dep1, dep2});
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, num_retries);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -274,9 +274,9 @@ TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) {
auto spec = CreateTaskHelper(1, {dep1, dep2});
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, num_retries);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -309,9 +309,9 @@ TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) {
auto spec = CreateTaskHelper(1, {dep1, dep2});
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, num_retries);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@ -346,13 +346,13 @@ TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) {
rpc::Address caller_address;
ObjectID dep = ObjectID::FromRandom();
reference_counter_->AddLocalReference(dep);
reference_counter_->AddLocalReference(dep, "");
for (int i = 0; i < 3; i++) {
auto spec = CreateTaskHelper(1, {dep});
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, num_retries);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id);
reference_counter_->AddLocalReference(return_id, "");
// The task completes.
rpc::PushTaskReply reply;
@ -388,13 +388,13 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
rpc::Address caller_address;
ObjectID dep = ObjectID::FromRandom();
reference_counter_->AddLocalReference(dep);
reference_counter_->AddLocalReference(dep, "");
for (int i = 0; i < 3; i++) {
auto spec = CreateTaskHelper(1, {dep});
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, num_retries);
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id);
reference_counter_->AddLocalReference(return_id, "");
// The task completes.
rpc::PushTaskReply reply;