diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 4937e9a88..b00839d6a 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -128,8 +128,13 @@ std::vector> TransformArgs( memory_buffer, nullptr, std::vector())); } else { RAY_CHECK(arg.id); - ray_arg = absl::make_unique(ObjectID::FromBinary(*arg.id), - ray::rpc::Address{}, + auto id = ObjectID::FromBinary(*arg.id); + auto owner_address = ray::rpc::Address{}; + if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) { + auto &core_worker = CoreWorkerProcess::GetCoreWorker(); + owner_address = core_worker.GetOwnerAddress(id); + } + ray_arg = absl::make_unique(id, owner_address, /*call_site=*/""); } ray_args.push_back(std::move(ray_arg)); diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index b10e0986e..67c4ff892 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -144,18 +144,9 @@ Status TaskExecutor::ExecuteTask( std::shared_ptr data = nullptr; ArgsBufferList ray_args_buffer; for (size_t i = 0; i < args_buffer.size(); i++) { - auto &ref = arg_refs.at(i); - bool is_ref_arg = (ref.object_id() != ray::ObjectID::Nil().Binary()); - + auto &arg = args_buffer.at(i); msgpack::sbuffer sbuf; - - if (is_ref_arg) { - sbuf.write(ref.object_id().data(), ref.object_id().size()); - } else { - auto &arg = args_buffer.at(i); - sbuf.write((const char *)(arg->GetData()->Data()), arg->GetData()->Size()); - } - + sbuf.write((const char *)(arg->GetData()->Data()), arg->GetData()->Size()); ray_args_buffer.push_back(std::move(sbuf)); } if (task_type == ray::TaskType::ACTOR_CREATION_TASK) { diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 13eb311e9..3e36f7a5b 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -196,6 +196,13 @@ TEST(RayClusterModeTest, FullTest) { EXPECT_EQ(result15, 29); EXPECT_EQ(result16, 30); + /// Test Put, Get & Remote for large objects + std::array arr; + auto r17 = ray::Put(arr); + auto r18 = ray::Task(ReturnLargeArray).Remote(r17); + EXPECT_EQ(arr, *(ray::Get(r17))); + EXPECT_EQ(arr, *(ray::Get(r18))); + uint64_t pid = *actor1.Task(&Counter::GetPid).Remote().Get(); EXPECT_TRUE(Counter::IsProcessAlive(pid)); @@ -286,20 +293,20 @@ TEST(RayClusterModeTest, LocalRefrenceTest) { } TEST(RayClusterModeTest, DependencyRefrenceTest) { - auto r1 = std::make_unique>(ray::Task(Return1).Remote()); - auto object_id = ray::ObjectID::FromBinary(r1->ID()); - EXPECT_TRUE(CheckRefCount({{object_id, std::make_pair(1, 0)}})); + { + auto r1 = ray::Task(Return1).Remote(); + auto object_id = ray::ObjectID::FromBinary(r1.ID()); + EXPECT_TRUE(CheckRefCount({{object_id, std::make_pair(1, 0)}})); - auto r2 = std::make_unique>(ray::Task(Plus1).Remote(*r1)); - EXPECT_TRUE( - CheckRefCount({{object_id, std::make_pair(1, 1)}, - {ray::ObjectID::FromBinary(r2->ID()), std::make_pair(1, 0)}})); - r2->Get(); - EXPECT_TRUE( - CheckRefCount({{object_id, std::make_pair(1, 0)}, - {ray::ObjectID::FromBinary(r2->ID()), std::make_pair(1, 0)}})); - r1.reset(); - r2.reset(); + auto r2 = ray::Task(Plus1).Remote(r1); + EXPECT_TRUE( + CheckRefCount({{object_id, std::make_pair(1, 1)}, + {ray::ObjectID::FromBinary(r2.ID()), std::make_pair(1, 0)}})); + r2.Get(); + EXPECT_TRUE( + CheckRefCount({{object_id, std::make_pair(1, 0)}, + {ray::ObjectID::FromBinary(r2.ID()), std::make_pair(1, 0)}})); + } EXPECT_TRUE(CheckRefCount({})); } diff --git a/cpp/src/ray/test/cluster/plus.cc b/cpp/src/ray/test/cluster/plus.cc index 74f6d85a8..0b69d6995 100644 --- a/cpp/src/ray/test/cluster/plus.cc +++ b/cpp/src/ray/test/cluster/plus.cc @@ -17,6 +17,8 @@ int Return1() { return 1; }; int Plus1(int x) { return x + 1; }; int Plus(int x, int y) { return x + y; }; + +std::array ReturnLargeArray(std::array x) { return x; }; void ThrowTask() { throw std::logic_error("error"); } -RAY_REMOTE(Return1, Plus1, Plus, ThrowTask); +RAY_REMOTE(Return1, Plus1, Plus, ThrowTask, ReturnLargeArray); diff --git a/cpp/src/ray/test/cluster/plus.h b/cpp/src/ray/test/cluster/plus.h index ab8e66696..09f4dc777 100644 --- a/cpp/src/ray/test/cluster/plus.h +++ b/cpp/src/ray/test/cluster/plus.h @@ -21,3 +21,5 @@ int Return1(); int Plus1(int x); int Plus(int x, int y); void ThrowTask(); + +std::array ReturnLargeArray(std::array x); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 56cd4912e..acc47e223 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -733,7 +733,7 @@ rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const { RAY_CHECK(has_owner) << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " "(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray " - "does not know which task will create them. " + "does not know which task created them. " "If this was not how your object ID was generated, please file an issue " "at https://github.com/ray-project/ray/issues/"; return owner_address; diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 561dca4ac..5c1d5743d 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -468,7 +468,7 @@ std::vector ReferenceCounter::GetOwnerAddresses( << " Object IDs generated randomly (ObjectID.from_random()) or out-of-band " "(ObjectID.from_binary(...)) cannot be passed to ray.get(), ray.wait(), or " "as " - "a task argument because Ray does not know which task will create them. " + "a task argument because Ray does not know which task created them. " "If this was not how your object ID was generated, please file an issue " "at https://github.com/ray-project/ray/issues/"; // TODO(swang): Java does not seem to keep the ref count properly, so the