[C++ API] fix wrong arg handling for object references in TaskExecutor, TaskArgByReference (#21236)

Previously, reference arguments were handled incorrectly: the object reference itself was serialized into the args buffer passed to the user function, instead of the resolved `RayObject`.

This was wrong because CoreWorker is the component responsible for ensuring that all ObjectReferences are resolved and serialized into `RayObject`s by the time `task_execution_callback` is invoked; no component downstream of the callback should do this.

This resulted in the following error for large objects, which are not converted into `TaskArg::value` because they exceed 100KB:
```
C++ exception with description "Invalid: invalid arguments: std::bad_cast" thrown in the test body.
```
This was not caught because there were no tests for large objects; such a test has now been added.
This commit is contained in:
jon-chuang 2022-01-17 12:08:15 +08:00 committed by GitHub
parent 86bbf28e4c
commit 5f7224bd51
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 36 additions and 29 deletions

View file

@ -128,8 +128,13 @@ std::vector<std::unique_ptr<::ray::TaskArg>> TransformArgs(
memory_buffer, nullptr, std::vector<rpc::ObjectReference>())); memory_buffer, nullptr, std::vector<rpc::ObjectReference>()));
} else { } else {
RAY_CHECK(arg.id); RAY_CHECK(arg.id);
ray_arg = absl::make_unique<ray::TaskArgByReference>(ObjectID::FromBinary(*arg.id), auto id = ObjectID::FromBinary(*arg.id);
ray::rpc::Address{}, auto owner_address = ray::rpc::Address{};
if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
owner_address = core_worker.GetOwnerAddress(id);
}
ray_arg = absl::make_unique<ray::TaskArgByReference>(id, owner_address,
/*call_site=*/""); /*call_site=*/"");
} }
ray_args.push_back(std::move(ray_arg)); ray_args.push_back(std::move(ray_arg));

View file

@ -144,18 +144,9 @@ Status TaskExecutor::ExecuteTask(
std::shared_ptr<msgpack::sbuffer> data = nullptr; std::shared_ptr<msgpack::sbuffer> data = nullptr;
ArgsBufferList ray_args_buffer; ArgsBufferList ray_args_buffer;
for (size_t i = 0; i < args_buffer.size(); i++) { for (size_t i = 0; i < args_buffer.size(); i++) {
auto &ref = arg_refs.at(i); auto &arg = args_buffer.at(i);
bool is_ref_arg = (ref.object_id() != ray::ObjectID::Nil().Binary());
msgpack::sbuffer sbuf; msgpack::sbuffer sbuf;
sbuf.write((const char *)(arg->GetData()->Data()), arg->GetData()->Size());
if (is_ref_arg) {
sbuf.write(ref.object_id().data(), ref.object_id().size());
} else {
auto &arg = args_buffer.at(i);
sbuf.write((const char *)(arg->GetData()->Data()), arg->GetData()->Size());
}
ray_args_buffer.push_back(std::move(sbuf)); ray_args_buffer.push_back(std::move(sbuf));
} }
if (task_type == ray::TaskType::ACTOR_CREATION_TASK) { if (task_type == ray::TaskType::ACTOR_CREATION_TASK) {

View file

@ -196,6 +196,13 @@ TEST(RayClusterModeTest, FullTest) {
EXPECT_EQ(result15, 29); EXPECT_EQ(result15, 29);
EXPECT_EQ(result16, 30); EXPECT_EQ(result16, 30);
/// Test Put, Get & Remote for large objects
std::array<int, 100000> arr;
auto r17 = ray::Put(arr);
auto r18 = ray::Task(ReturnLargeArray).Remote(r17);
EXPECT_EQ(arr, *(ray::Get(r17)));
EXPECT_EQ(arr, *(ray::Get(r18)));
uint64_t pid = *actor1.Task(&Counter::GetPid).Remote().Get(); uint64_t pid = *actor1.Task(&Counter::GetPid).Remote().Get();
EXPECT_TRUE(Counter::IsProcessAlive(pid)); EXPECT_TRUE(Counter::IsProcessAlive(pid));
@ -286,20 +293,20 @@ TEST(RayClusterModeTest, LocalRefrenceTest) {
} }
TEST(RayClusterModeTest, DependencyRefrenceTest) { TEST(RayClusterModeTest, DependencyRefrenceTest) {
auto r1 = std::make_unique<ray::ObjectRef<int>>(ray::Task(Return1).Remote()); {
auto object_id = ray::ObjectID::FromBinary(r1->ID()); auto r1 = ray::Task(Return1).Remote();
EXPECT_TRUE(CheckRefCount({{object_id, std::make_pair(1, 0)}})); auto object_id = ray::ObjectID::FromBinary(r1.ID());
EXPECT_TRUE(CheckRefCount({{object_id, std::make_pair(1, 0)}}));
auto r2 = std::make_unique<ray::ObjectRef<int>>(ray::Task(Plus1).Remote(*r1)); auto r2 = ray::Task(Plus1).Remote(r1);
EXPECT_TRUE( EXPECT_TRUE(
CheckRefCount({{object_id, std::make_pair(1, 1)}, CheckRefCount({{object_id, std::make_pair(1, 1)},
{ray::ObjectID::FromBinary(r2->ID()), std::make_pair(1, 0)}})); {ray::ObjectID::FromBinary(r2.ID()), std::make_pair(1, 0)}}));
r2->Get(); r2.Get();
EXPECT_TRUE( EXPECT_TRUE(
CheckRefCount({{object_id, std::make_pair(1, 0)}, CheckRefCount({{object_id, std::make_pair(1, 0)},
{ray::ObjectID::FromBinary(r2->ID()), std::make_pair(1, 0)}})); {ray::ObjectID::FromBinary(r2.ID()), std::make_pair(1, 0)}}));
r1.reset(); }
r2.reset();
EXPECT_TRUE(CheckRefCount({})); EXPECT_TRUE(CheckRefCount({}));
} }

View file

@ -17,6 +17,8 @@
int Return1() { return 1; }; int Return1() { return 1; };
int Plus1(int x) { return x + 1; }; int Plus1(int x) { return x + 1; };
int Plus(int x, int y) { return x + y; }; int Plus(int x, int y) { return x + y; };
std::array<int, 100000> ReturnLargeArray(std::array<int, 100000> x) { return x; };
void ThrowTask() { throw std::logic_error("error"); } void ThrowTask() { throw std::logic_error("error"); }
RAY_REMOTE(Return1, Plus1, Plus, ThrowTask); RAY_REMOTE(Return1, Plus1, Plus, ThrowTask, ReturnLargeArray);

View file

@ -21,3 +21,5 @@ int Return1();
int Plus1(int x); int Plus1(int x);
int Plus(int x, int y); int Plus(int x, int y);
void ThrowTask(); void ThrowTask();
std::array<int, 100000> ReturnLargeArray(std::array<int, 100000> x);

View file

@ -733,7 +733,7 @@ rpc::Address CoreWorker::GetOwnerAddress(const ObjectID &object_id) const {
RAY_CHECK(has_owner) RAY_CHECK(has_owner)
<< "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray " "(ObjectID.from_binary(...)) cannot be passed as a task argument because Ray "
"does not know which task will create them. " "does not know which task created them. "
"If this was not how your object ID was generated, please file an issue " "If this was not how your object ID was generated, please file an issue "
"at https://github.com/ray-project/ray/issues/"; "at https://github.com/ray-project/ray/issues/";
return owner_address; return owner_address;

View file

@ -468,7 +468,7 @@ std::vector<rpc::Address> ReferenceCounter::GetOwnerAddresses(
<< " Object IDs generated randomly (ObjectID.from_random()) or out-of-band " << " Object IDs generated randomly (ObjectID.from_random()) or out-of-band "
"(ObjectID.from_binary(...)) cannot be passed to ray.get(), ray.wait(), or " "(ObjectID.from_binary(...)) cannot be passed to ray.get(), ray.wait(), or "
"as " "as "
"a task argument because Ray does not know which task will create them. " "a task argument because Ray does not know which task created them. "
"If this was not how your object ID was generated, please file an issue " "If this was not how your object ID was generated, please file an issue "
"at https://github.com/ray-project/ray/issues/"; "at https://github.com/ray-project/ray/issues/";
// TODO(swang): Java does not seem to keep the ref count properly, so the // TODO(swang): Java does not seem to keep the ref count properly, so the