From 5f7224bd51151e3ae8e3c46e95242dd4e42b9dd7 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Mon, 17 Jan 2022 12:08:15 +0800 Subject: [PATCH] [C++ API] fix wrong arg handling for object references in `TaskExecutor`, `TaskArgByReference` (#21236) Previously, a ref arg was handled incorrectly: the object ref itself, instead of the resolved `RayObject`, was serialized into the args buffer passed to the user function. This is wrong because CoreWorker is the component responsible for ensuring that all ObjectReferences are resolved and serialized into `RayObject`s at the time of the `task_execution_callback` invocation, not any component downstream of the callback. This resulted in the following error for large objects, which are not turned into `TaskArg::value` because they are over 100KB. ``` C++ exception with description "Invalid: invalid arguments: std::bad_cast" thrown in the test body. ``` This was not caught due to a lack of testing for large objects; such a test has now been added. --- cpp/src/ray/runtime/abstract_ray_runtime.cc | 9 +++-- cpp/src/ray/runtime/task/task_executor.cc | 13 ++------ cpp/src/ray/test/cluster/cluster_mode_test.cc | 33 +++++++++++-------- cpp/src/ray/test/cluster/plus.cc | 4 ++- cpp/src/ray/test/cluster/plus.h | 2 ++ src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/reference_count.cc | 2 +- 7 files changed, 36 insertions(+), 29 deletions(-) diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 4937e9a88..b00839d6a 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -128,8 +128,13 @@ std::vector> TransformArgs( memory_buffer, nullptr, std::vector())); } else { RAY_CHECK(arg.id); - ray_arg = absl::make_unique(ObjectID::FromBinary(*arg.id), - ray::rpc::Address{}, + auto id = ObjectID::FromBinary(*arg.id); + auto owner_address = ray::rpc::Address{}; + if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) { + auto 
&core_worker = CoreWorkerProcess::GetCoreWorker(); + owner_address = core_worker.GetOwnerAddress(id); + } + ray_arg = absl::make_unique(id, owner_address, /*call_site=*/""); } ray_args.push_back(std::move(ray_arg)); diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index b10e0986e..67c4ff892 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -144,18 +144,9 @@ Status TaskExecutor::ExecuteTask( std::shared_ptr data = nullptr; ArgsBufferList ray_args_buffer; for (size_t i = 0; i < args_buffer.size(); i++) { - auto &ref = arg_refs.at(i); - bool is_ref_arg = (ref.object_id() != ray::ObjectID::Nil().Binary()); - + auto &arg = args_buffer.at(i); msgpack::sbuffer sbuf; - - if (is_ref_arg) { - sbuf.write(ref.object_id().data(), ref.object_id().size()); - } else { - auto &arg = args_buffer.at(i); - sbuf.write((const char *)(arg->GetData()->Data()), arg->GetData()->Size()); - } - + sbuf.write((const char *)(arg->GetData()->Data()), arg->GetData()->Size()); ray_args_buffer.push_back(std::move(sbuf)); } if (task_type == ray::TaskType::ACTOR_CREATION_TASK) { diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 13eb311e9..3e36f7a5b 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -196,6 +196,13 @@ TEST(RayClusterModeTest, FullTest) { EXPECT_EQ(result15, 29); EXPECT_EQ(result16, 30); + /// Test Put, Get & Remote for large objects + std::array arr; + auto r17 = ray::Put(arr); + auto r18 = ray::Task(ReturnLargeArray).Remote(r17); + EXPECT_EQ(arr, *(ray::Get(r17))); + EXPECT_EQ(arr, *(ray::Get(r18))); + uint64_t pid = *actor1.Task(&Counter::GetPid).Remote().Get(); EXPECT_TRUE(Counter::IsProcessAlive(pid)); @@ -286,20 +293,20 @@ TEST(RayClusterModeTest, LocalRefrenceTest) { } TEST(RayClusterModeTest, DependencyRefrenceTest) { - auto r1 = 
std::make_unique>(ray::Task(Return1).Remote()); - auto object_id = ray::ObjectID::FromBinary(r1->ID()); - EXPECT_TRUE(CheckRefCount({{object_id, std::make_pair(1, 0)}})); + { + auto r1 = ray::Task(Return1).Remote(); + auto object_id = ray::ObjectID::FromBinary(r1.ID()); + EXPECT_TRUE(CheckRefCount({{object_id, std::make_pair(1, 0)}})); - auto r2 = std::make_unique>(ray::Task(Plus1).Remote(*r1)); - EXPECT_TRUE( - CheckRefCount({{object_id, std::make_pair(1, 1)}, - {ray::ObjectID::FromBinary(r2->ID()), std::make_pair(1, 0)}})); - r2->Get(); - EXPECT_TRUE( - CheckRefCount({{object_id, std::make_pair(1, 0)}, - {ray::ObjectID::FromBinary(r2->ID()), std::make_pair(1, 0)}})); - r1.reset(); - r2.reset(); + auto r2 = ray::Task(Plus1).Remote(r1); + EXPECT_TRUE( + CheckRefCount({{object_id, std::make_pair(1, 1)}, + {ray::ObjectID::FromBinary(r2.ID()), std::make_pair(1, 0)}})); + r2.Get(); + EXPECT_TRUE( + CheckRefCount({{object_id, std::make_pair(1, 0)}, + {ray::ObjectID::FromBinary(r2.ID()), std::make_pair(1, 0)}})); + } EXPECT_TRUE(CheckRefCount({})); } diff --git a/cpp/src/ray/test/cluster/plus.cc b/cpp/src/ray/test/cluster/plus.cc index 74f6d85a8..0b69d6995 100644 --- a/cpp/src/ray/test/cluster/plus.cc +++ b/cpp/src/ray/test/cluster/plus.cc @@ -17,6 +17,8 @@ int Return1() { return 1; }; int Plus1(int x) { return x + 1; }; int Plus(int x, int y) { return x + y; }; + +std::array ReturnLargeArray(std::array x) { return x; }; void ThrowTask() { throw std::logic_error("error"); } -RAY_REMOTE(Return1, Plus1, Plus, ThrowTask); +RAY_REMOTE(Return1, Plus1, Plus, ThrowTask, ReturnLargeArray); diff --git a/cpp/src/ray/test/cluster/plus.h b/cpp/src/ray/test/cluster/plus.h index ab8e66696..09f4dc777 100644 --- a/cpp/src/ray/test/cluster/plus.h +++ b/cpp/src/ray/test/cluster/plus.h @@ -21,3 +21,5 @@ int Return1(); int Plus1(int x); int Plus(int x, int y); void ThrowTask(); + +std::array ReturnLargeArray(std::array x); diff --git a/src/ray/core_worker/core_worker.cc 
b/src/ray/core_worker/core_worker.cc index 56cd4912e..acc47e223 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -733,7 +733,7 @@ rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const { RAY_CHECK(has_owner) << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " "(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray " - "does not know which task will create them. " + "does not know which task created them. " "If this was not how your object ID was generated, please file an issue " "at https://github.com/ray-project/ray/issues/"; return owner_address; diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 561dca4ac..5c1d5743d 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -468,7 +468,7 @@ std::vector ReferenceCounter::GetOwnerAddresses( << " Object IDs generated randomly (ObjectID.from_random()) or out-of-band " "(ObjectID.from_binary(...)) cannot be passed to ray.get(), ray.wait(), or " "as " - "a task argument because Ray does not know which task will create them. " + "a task argument because Ray does not know which task created them. " "If this was not how your object ID was generated, please file an issue " "at https://github.com/ray-project/ray/issues/"; // TODO(swang): Java does not seem to keep the ref count properly, so the