[C++ worker]Support ActorHandle type parameter (#27364)

Now c++ worker doesn't support `ActorHandle` type parameter.
When we pass an `ActorHandle` object to a task, it will incur this error:
![image](https://user-images.githubusercontent.com/5276001/182349872-a616ff55-6a2b-454d-9831-18877b56c228.png)
The reason is that caller just deserializes the actor handle but doesn't register it to core worker, so if we call tasks of the actor, it will not be found in local.
This commit is contained in:
Tao Wang 2022-08-04 16:39:52 +08:00 committed by GitHub
parent b6765bb4f3
commit d4a1cebaa3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 74 additions and 4 deletions

View file

@ -30,6 +30,9 @@ class ActorHandle {
ActorHandle(const std::string &id) { id_ = id; }
// Used to identify its type.
static bool IsActorHandle() { return true; }
/// Get a untyped ID of the actor
const std::string &ID() const { return id_; }
@ -65,11 +68,17 @@ class ActorHandle {
return {ray::internal::GetRayRuntime().get(), id_, std::move(remote_func_holder)};
}
void Kill() { ray::internal::GetRayRuntime()->KillActor(id_, true); }
void Kill() { Kill(true); }
void Kill(bool no_restart) {
ray::internal::GetRayRuntime()->KillActor(id_, no_restart);
}
static ActorHandle FromBytes(const std::string &serialized_actor_handle) {
std::string id = ray::internal::GetRayRuntime()->DeserializeAndRegisterActorHandle(
serialized_actor_handle);
return ActorHandle(id);
}
/// Make ActorHandle serializable
MSGPACK_DEFINE(id_);

View file

@ -20,6 +20,7 @@
#include <ray/api/xlang_function.h>
#include <msgpack.hpp>
#include <type_traits>
namespace ray {
namespace internal {
@ -44,8 +45,16 @@ class Arguments {
}
} else {
if (lang_type == LangType::CPP) {
msgpack::sbuffer buffer = Serializer::Serialize(std::forward<InputArgTypes>(arg));
PushValueArg(task_args, std::move(buffer));
if constexpr (is_actor_handle_v<InputArgTypes>) {
auto serialized_actor_handle =
RayRuntimeHolder::Instance().Runtime()->SerializeActorHandle(arg.ID());
msgpack::sbuffer buffer = Serializer::Serialize(serialized_actor_handle);
PushValueArg(task_args, std::move(buffer));
} else {
msgpack::sbuffer buffer =
Serializer::Serialize(std::forward<InputArgTypes>(arg));
PushValueArg(task_args, std::move(buffer));
}
} else {
// Fill dummy field for handling kwargs.
if (lang_type == LangType::PYTHON) {

View file

@ -117,6 +117,10 @@ struct Invoker {
if constexpr (is_object_ref_v<T>) {
// Construct an ObjectRef<T> by id.
return T(std::string(args_buffer.data(), args_buffer.size()));
} else if constexpr (is_actor_handle_v<T>) {
auto actor_handle =
Serializer::Deserialize<std::string>(args_buffer.data(), args_buffer.size());
return T::FromBytes(actor_handle);
} else {
auto [success, value] =
Serializer::DeserializeWhenNil<T>(args_buffer.data(), args_buffer.size());

View file

@ -93,6 +93,9 @@ class RayRuntime {
virtual PlacementGroup GetPlacementGroup(const std::string &name) = 0;
virtual bool IsLocalMode() { return false; }
virtual std::string GetNamespace() = 0;
virtual std::string SerializeActorHandle(const std::string &actor_id) = 0;
virtual std::string DeserializeAndRegisterActorHandle(
const std::string &serialized_actor_handle) = 0;
};
} // namespace internal
} // namespace ray

View file

@ -51,6 +51,16 @@ struct is_object_ref_t<T, std::void_t<decltype(std::declval<T>().IsObjectRef())>
template <typename T>
auto constexpr is_object_ref_v = is_object_ref_t<T>::value;
template <class, class = void>
struct is_actor_handle_t : std::false_type {};
template <class T>
struct is_actor_handle_t<T, std::void_t<decltype(std::declval<T>().IsActorHandle())>>
: std::true_type {};
template <typename T>
auto constexpr is_actor_handle_v = is_actor_handle_t<T>::value;
template <class, class = void>
struct is_python_t : std::false_type {};

View file

@ -366,5 +366,22 @@ std::string AbstractRayRuntime::GetNamespace() {
return core_worker.GetJobConfig().ray_namespace();
}
std::string AbstractRayRuntime::SerializeActorHandle(const std::string &actor_id) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
std::string output;
ObjectID actor_handle_id;
auto status = core_worker.SerializeActorHandle(
ActorID::FromBinary(actor_id), &output, &actor_handle_id);
return output;
}
std::string AbstractRayRuntime::DeserializeAndRegisterActorHandle(
const std::string &serialized_actor_handle) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
return core_worker
.DeserializeAndRegisterActorHandle(serialized_actor_handle, ObjectID::Nil())
.Binary();
}
} // namespace internal
} // namespace ray

View file

@ -108,6 +108,9 @@ class AbstractRayRuntime : public RayRuntime {
virtual PlacementGroup GetPlacementGroup(const std::string &name);
std::string GetNamespace();
std::string SerializeActorHandle(const std::string &actor_id);
std::string DeserializeAndRegisterActorHandle(
const std::string &serialized_actor_handle);
protected:
std::unique_ptr<TaskSubmitter> task_submitter_;

View file

@ -238,6 +238,15 @@ TEST(RayClusterModeTest, FullTest) {
EXPECT_FALSE(Counter::IsProcessAlive(pid));
}
TEST(RayClusterModeTest, ActorHandleTest) {
auto actor1 = ray::Actor(RAY_FUNC(Counter::FactoryCreate)).Remote();
auto obj1 = actor1.Task(&Counter::Plus1).Remote();
EXPECT_EQ(1, *obj1.Get());
auto actor2 = ray::Actor(RAY_FUNC(Counter::FactoryCreate)).Remote();
auto obj2 = actor2.Task(&Counter::Plus1ForActor).Remote(actor1);
EXPECT_EQ(2, *obj2.Get());
}
TEST(RayClusterModeTest, PythonInvocationTest) {
auto py_actor_handle =
ray::Actor(ray::PyActorClass{"test_cross_language_invocation", "Counter"})

View file

@ -91,6 +91,10 @@ ray::ActorHandle<Counter> Counter::CreateChildActor(std::string actor_name) {
std::string Counter::GetNamespaceInActor() { return ray::GetNamespace(); }
int Counter::Plus1ForActor(ray::ActorHandle<Counter> actor) {
return *actor.Task(&Counter::Plus1).Remote().Get();
}
RAY_REMOTE(RAY_FUNC(Counter::FactoryCreate),
Counter::FactoryCreateException,
RAY_FUNC(Counter::FactoryCreate, int),
@ -107,7 +111,8 @@ RAY_REMOTE(RAY_FUNC(Counter::FactoryCreate),
&Counter::GetIntVal,
&Counter::Initialized,
&Counter::CreateChildActor,
&Counter::GetEnvVar);
&Counter::GetEnvVar,
&Counter::Plus1ForActor);
RAY_REMOTE(ActorConcurrentCall::FactoryCreate, &ActorConcurrentCall::CountDown);

View file

@ -38,6 +38,7 @@ class Counter {
bool CheckRestartInActorCreationTask();
bool CheckRestartInActorTask();
ray::ActorHandle<Counter> CreateChildActor(std::string actor_name);
int Plus1ForActor(ray::ActorHandle<Counter> actor);
std::string GetNamespaceInActor();