mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[C++ worker] Ray actor task for RAY_REMOTE (#15039)
This commit is contained in:
parent
9c5a0cfd7a
commit
e54dfd8cc5
17 changed files with 539 additions and 249 deletions
|
@ -20,6 +20,10 @@ class Counter {
|
|||
int count;
|
||||
|
||||
Counter(int init) { count = init; }
|
||||
template <typename... Args>
|
||||
static Counter *GenericFactoryCreate(Args... args) {
|
||||
return FactoryCreate(args...);
|
||||
}
|
||||
static Counter *FactoryCreate() { return new Counter(0); }
|
||||
static Counter *FactoryCreate(int init) { return new Counter(init); }
|
||||
static Counter *FactoryCreate(int init1, int init2) {
|
||||
|
@ -73,20 +77,21 @@ int main(int argc, char **argv) {
|
|||
std::cout << "task_result2 = " << task_result2 << std::endl;
|
||||
|
||||
/// actor task without args
|
||||
ActorHandle<Counter> actor1 = Ray::Actor(Counter::FactoryCreate).Remote();
|
||||
ActorHandle<Counter> actor1 = Ray::Actor(Counter::GenericFactoryCreate<>).Remote();
|
||||
auto actor_object1 = actor1.Task(&Counter::Plus1).Remote();
|
||||
int actor_result1 = *(Ray::Get(actor_object1));
|
||||
std::cout << "actor_result1 = " << actor_result1 << std::endl;
|
||||
|
||||
/// actor task with args
|
||||
ActorHandle<Counter> actor2 = Ray::Actor(Counter::FactoryCreate, 1).Remote();
|
||||
auto actor_object2 = actor2.Task(&Counter::Add, 5).Remote();
|
||||
ActorHandle<Counter> actor2 = Ray::Actor(Counter::GenericFactoryCreate<int>).Remote(1);
|
||||
auto actor_object2 = actor2.Task(&Counter::Add).Remote(5);
|
||||
int actor_result2 = *(Ray::Get(actor_object2));
|
||||
std::cout << "actor_result2 = " << actor_result2 << std::endl;
|
||||
|
||||
/// actor task with args which pass by reference
|
||||
ActorHandle<Counter> actor3 = Ray::Actor(Counter::FactoryCreate, 6, 0).Remote();
|
||||
auto actor_object3 = actor3.Task(&Counter::Add, actor_object2).Remote();
|
||||
ActorHandle<Counter> actor3 =
|
||||
Ray::Actor(Counter::GenericFactoryCreate<int, int>).Remote(6, 0);
|
||||
auto actor_object3 = actor3.Task(&Counter::Add).Remote(actor_object2);
|
||||
int actor_result3 = *(Ray::Get(actor_object3));
|
||||
std::cout << "actor_result3 = " << actor_result3 << std::endl;
|
||||
|
||||
|
@ -107,17 +112,18 @@ int main(int argc, char **argv) {
|
|||
<< std::endl;
|
||||
|
||||
/// create actor and actor function remote call with args passed by value
|
||||
ActorHandle<Counter> actor4 = Ray::Actor(Counter::FactoryCreate, 10).Remote();
|
||||
auto r10 = actor4.Task(&Counter::Add, 8).Remote();
|
||||
ActorHandle<Counter> actor4 = Ray::Actor(Counter::GenericFactoryCreate<int>).Remote(10);
|
||||
auto r10 = actor4.Task(&Counter::Add).Remote(8);
|
||||
int actor_result4 = *(Ray::Get(r10));
|
||||
std::cout << "actor_result4 = " << actor_result4 << std::endl;
|
||||
|
||||
/// create actor and task function remote call with args passed by reference
|
||||
ActorHandle<Counter> actor5 = Ray::Actor(Counter::FactoryCreate, r10, 0).Remote();
|
||||
auto r11 = actor5.Task(&Counter::Add, r0).Remote();
|
||||
auto r12 = actor5.Task(&Counter::Add, r11).Remote();
|
||||
auto r13 = actor5.Task(&Counter::Add, r10).Remote();
|
||||
auto r14 = actor5.Task(&Counter::Add, r13).Remote();
|
||||
ActorHandle<Counter> actor5 =
|
||||
Ray::Actor(Counter::GenericFactoryCreate<int, int>).Remote(r10, 0);
|
||||
auto r11 = actor5.Task(&Counter::Add).Remote(r0);
|
||||
auto r12 = actor5.Task(&Counter::Add).Remote(r11);
|
||||
auto r13 = actor5.Task(&Counter::Add).Remote(r10);
|
||||
auto r14 = actor5.Task(&Counter::Add).Remote(r13);
|
||||
auto r15 = Ray::Task(Plus).Remote(r0, r11);
|
||||
auto r16 = Ray::Task(Plus1).Remote(r15);
|
||||
int result12 = *(Ray::Get(r12));
|
||||
|
|
|
@ -79,42 +79,23 @@ class Ray {
|
|||
/// \param[in] args The function arguments passed by a value or ObjectRef.
|
||||
/// \return TaskCaller.
|
||||
template <typename F>
|
||||
static TaskCaller<boost::callable_traits::return_type_t<F>> Task(F func);
|
||||
static TaskCaller<F> Task(F func);
|
||||
|
||||
/// Generic version of creating an actor
|
||||
/// It is used for creating an actor, such as: ActorCreator<Counter> creator =
|
||||
/// Ray::Actor(Counter::FactoryCreate, 1).
|
||||
/// Ray::Actor(Counter::FactoryCreate<int>).Remote(1);
|
||||
template <typename ActorType, typename... Args>
|
||||
static ActorCreator<ActorType> Actor(
|
||||
CreateActorFunc<ActorType, typename FilterArgType<Args>::type...> create_func,
|
||||
Args... args);
|
||||
|
||||
/// TODO: The bellow specific version of creating an actor will be replaced with generic
|
||||
/// version later.
|
||||
#include <ray/api/generated/create_funcs.generated.h>
|
||||
|
||||
#include "api/generated/create_actors.generated.h"
|
||||
static ActorCreator<ActorType> Actor(CreateActorFunc<ActorType, Args...> create_func);
|
||||
|
||||
private:
|
||||
static std::once_flag is_inited_;
|
||||
|
||||
template <typename ReturnType, typename FuncType>
|
||||
static TaskCaller<ReturnType> TaskInternal(FuncType &func);
|
||||
template <typename FuncType>
|
||||
static TaskCaller<FuncType> TaskInternal(FuncType &func);
|
||||
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType,
|
||||
typename... ArgTypes>
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType>
|
||||
static ActorCreator<ActorType> CreateActorInternal(FuncType &func,
|
||||
ExecFuncType &exec_func,
|
||||
ArgTypes &... args);
|
||||
|
||||
/// Include the `Call` methods for calling actor methods.
|
||||
/// Used by ActorHandle to implement .Call()
|
||||
/// It is called by ActorHandle: Ray::Task(&Counter::Add, counter/*instance of
|
||||
/// Counter*/, 1);
|
||||
template <typename ReturnType, typename ActorType, typename... Args>
|
||||
static ActorTaskCaller<ReturnType> Task(
|
||||
ActorFunc<ActorType, ReturnType, typename FilterArgType<Args>::type...> actor_func,
|
||||
ActorHandle<ActorType> &actor, Args... args);
|
||||
ExecFuncType &exec_func);
|
||||
};
|
||||
|
||||
} // namespace api
|
||||
|
@ -170,8 +151,8 @@ inline WaitResult Ray::Wait(const std::vector<ObjectID> &ids, int num_objects,
|
|||
return ray::internal::RayRuntime()->Wait(ids, num_objects, timeout_ms);
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename FuncType>
|
||||
inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func) {
|
||||
template <typename FuncType>
|
||||
inline TaskCaller<FuncType> Ray::TaskInternal(FuncType &func) {
|
||||
RemoteFunctionPtrHolder ptr{};
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(func);
|
||||
if (ray::api::RayConfig::GetInstance()->use_ray_remote) {
|
||||
|
@ -182,55 +163,42 @@ inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func) {
|
|||
}
|
||||
ptr.function_name = std::move(function_name);
|
||||
}
|
||||
return TaskCaller<ReturnType>(ray::internal::RayRuntime().get(), ptr);
|
||||
return TaskCaller<FuncType>(ray::internal::RayRuntime().get(), ptr);
|
||||
}
|
||||
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType,
|
||||
typename... ArgTypes>
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType>
|
||||
inline ActorCreator<ActorType> Ray::CreateActorInternal(FuncType &create_func,
|
||||
ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> task_args;
|
||||
Arguments::WrapArgs(&task_args, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(create_func);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
return ActorCreator<ActorType>(ray::internal::RayRuntime().get(), ptr,
|
||||
std::move(task_args));
|
||||
ExecFuncType &exec_func) {
|
||||
RemoteFunctionPtrHolder ptr{};
|
||||
if (ray::api::RayConfig::GetInstance()->use_ray_remote) {
|
||||
auto function_name =
|
||||
ray::internal::FunctionManager::Instance().GetFunctionName(create_func);
|
||||
if (function_name.empty()) {
|
||||
throw RayException(
|
||||
"Function not found. Please use RAY_REMOTE to register this function.");
|
||||
}
|
||||
|
||||
ptr.function_name = std::move(function_name);
|
||||
} else {
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(create_func);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
}
|
||||
|
||||
return ActorCreator<ActorType>(ray::internal::RayRuntime().get(), ptr);
|
||||
}
|
||||
|
||||
/// Normal task.
|
||||
template <typename F>
|
||||
TaskCaller<boost::callable_traits::return_type_t<F>> Ray::Task(F func) {
|
||||
using ReturnType = boost::callable_traits::return_type_t<F>;
|
||||
|
||||
return TaskInternal<ReturnType>(func);
|
||||
TaskCaller<F> Ray::Task(F func) {
|
||||
return TaskInternal<F>(func);
|
||||
}
|
||||
|
||||
/// Generic version of creating an actor.
|
||||
/// Creating an actor.
|
||||
template <typename ActorType, typename... Args>
|
||||
ActorCreator<ActorType> Ray::Actor(
|
||||
CreateActorFunc<ActorType, typename FilterArgType<Args>::type...> create_func,
|
||||
Args... args) {
|
||||
ActorCreator<ActorType> Ray::Actor(CreateActorFunc<ActorType, Args...> create_func) {
|
||||
return CreateActorInternal<ActorType>(
|
||||
create_func,
|
||||
CreateActorExecFunction<ActorType *, typename FilterArgType<Args>::type...>,
|
||||
args...);
|
||||
}
|
||||
|
||||
/// TODO: The bellow specific version of creating an actor will be replaced with generic
|
||||
/// version later.
|
||||
#include <ray/api/generated/create_actors_impl.generated.h>
|
||||
|
||||
/// Actor task.
|
||||
template <typename ReturnType, typename ActorType, typename... Args>
|
||||
ActorTaskCaller<ReturnType> Ray::Task(
|
||||
ActorFunc<ActorType, ReturnType, typename FilterArgType<Args>::type...> actor_func,
|
||||
ActorHandle<ActorType> &actor, Args... args) {
|
||||
return CallActorInternal<ReturnType, ActorType>(
|
||||
actor_func,
|
||||
ActorExecFunction<ReturnType, ActorType, typename FilterArgType<Args>::type...>,
|
||||
actor, args...);
|
||||
CreateActorExecFunction<ActorType *, typename FilterArgType<Args>::type...>);
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
|
|
|
@ -12,10 +12,11 @@ class ActorCreator {
|
|||
public:
|
||||
ActorCreator();
|
||||
|
||||
ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args);
|
||||
ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr)
|
||||
: runtime_(runtime), ptr_(ptr) {}
|
||||
|
||||
ActorHandle<ActorType> Remote();
|
||||
template <typename... Args>
|
||||
ActorHandle<ActorType> Remote(Args... args);
|
||||
|
||||
private:
|
||||
RayRuntime *runtime_;
|
||||
|
@ -29,12 +30,9 @@ template <typename ActorType>
|
|||
ActorCreator<ActorType>::ActorCreator() {}
|
||||
|
||||
template <typename ActorType>
|
||||
ActorCreator<ActorType>::ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args)
|
||||
: runtime_(runtime), ptr_(ptr), args_(std::move(args)) {}
|
||||
|
||||
template <typename ActorType>
|
||||
ActorHandle<ActorType> ActorCreator<ActorType>::Remote() {
|
||||
template <typename... Args>
|
||||
ActorHandle<ActorType> ActorCreator<ActorType>::Remote(Args... args) {
|
||||
Arguments::WrapArgs(&args_, args...);
|
||||
auto returned_actor_id = runtime_->CreateActor(ptr_, args_);
|
||||
return ActorHandle<ActorType>(returned_actor_id);
|
||||
}
|
||||
|
|
|
@ -11,16 +11,6 @@
|
|||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
template <typename T>
|
||||
struct FilterArgType {
|
||||
using type = T;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct FilterArgType<ObjectRef<T>> {
|
||||
using type = T;
|
||||
};
|
||||
|
||||
template <typename ActorType, typename ReturnType, typename... Args>
|
||||
using ActorFunc = ReturnType (ActorType::*)(Args...);
|
||||
|
||||
|
@ -39,11 +29,8 @@ class ActorHandle {
|
|||
const ActorID &ID() const;
|
||||
|
||||
/// Include the `Call` methods for calling remote functions.
|
||||
|
||||
template <typename ReturnType, typename... Args>
|
||||
ActorTaskCaller<ReturnType> Task(
|
||||
ActorFunc<ActorType, ReturnType, typename FilterArgType<Args>::type...> actor_func,
|
||||
Args... args);
|
||||
template <typename F>
|
||||
ActorTaskCaller<F> Task(F actor_func);
|
||||
|
||||
/// Make ActorHandle serializable
|
||||
MSGPACK_DEFINE(id_);
|
||||
|
@ -53,22 +40,6 @@ class ActorHandle {
|
|||
};
|
||||
|
||||
// ---------- implementation ----------
|
||||
template <typename ReturnType, typename ActorType, typename FuncType,
|
||||
typename ExecFuncType, typename... ArgTypes>
|
||||
inline ActorTaskCaller<ReturnType> CallActorInternal(FuncType &actor_func,
|
||||
ExecFuncType &exec_func,
|
||||
ActorHandle<ActorType> &actor,
|
||||
ArgTypes &... args) {
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> task_args;
|
||||
Arguments::WrapArgs(&task_args, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func);
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(holder.value[0]);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
return ActorTaskCaller<ReturnType>(internal::RayRuntime().get(), actor.ID(), ptr,
|
||||
std::move(task_args));
|
||||
}
|
||||
|
||||
template <typename ActorType>
|
||||
ActorHandle<ActorType>::ActorHandle() {}
|
||||
|
||||
|
@ -83,14 +54,23 @@ const ActorID &ActorHandle<ActorType>::ID() const {
|
|||
}
|
||||
|
||||
template <typename ActorType>
|
||||
template <typename ReturnType, typename... Args>
|
||||
ActorTaskCaller<ReturnType> ActorHandle<ActorType>::Task(
|
||||
ActorFunc<ActorType, ReturnType, typename FilterArgType<Args>::type...> actor_func,
|
||||
Args... args) {
|
||||
return CallActorInternal<ReturnType, ActorType>(
|
||||
actor_func,
|
||||
ActorExecFunction<ReturnType, ActorType, typename FilterArgType<Args>::type...>,
|
||||
*this, args...);
|
||||
template <typename F>
|
||||
ActorTaskCaller<F> ActorHandle<ActorType>::Task(F actor_func) {
|
||||
RemoteFunctionPtrHolder ptr{};
|
||||
if (ray::api::RayConfig::GetInstance()->use_ray_remote) {
|
||||
auto function_name =
|
||||
ray::internal::FunctionManager::Instance().GetFunctionName(actor_func);
|
||||
if (function_name.empty()) {
|
||||
throw RayException(
|
||||
"Function not found. Please use RAY_REMOTE to register this function.");
|
||||
}
|
||||
ptr.function_name = std::move(function_name);
|
||||
return ActorTaskCaller<F>(internal::RayRuntime().get(), id_, ptr);
|
||||
}
|
||||
|
||||
MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func);
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(holder.value[0]);
|
||||
return ActorTaskCaller<F>(internal::RayRuntime().get(), id_, ptr);
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
|
|
|
@ -1,21 +1,29 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <ray/api/arguments.h>
|
||||
#include <ray/api/exec_funcs.h>
|
||||
#include <ray/api/object_ref.h>
|
||||
#include <ray/api/static_check.h>
|
||||
#include "ray/core.h"
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
template <typename ReturnType>
|
||||
template <typename F>
|
||||
class ActorTaskCaller {
|
||||
public:
|
||||
ActorTaskCaller();
|
||||
ActorTaskCaller() = default;
|
||||
|
||||
ActorTaskCaller(RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args)
|
||||
: runtime_(runtime), id_(id), ptr_(ptr), args_(std::move(args)) {}
|
||||
|
||||
ObjectRef<ReturnType> Remote();
|
||||
ActorTaskCaller(RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr)
|
||||
: runtime_(runtime), id_(id), ptr_(ptr) {}
|
||||
|
||||
template <typename... Args>
|
||||
ObjectRef<boost::callable_traits::return_type_t<F>> Remote(Args... args);
|
||||
|
||||
private:
|
||||
RayRuntime *runtime_;
|
||||
|
@ -26,19 +34,23 @@ class ActorTaskCaller {
|
|||
|
||||
// ---------- implementation ----------
|
||||
|
||||
template <typename ReturnType>
|
||||
ActorTaskCaller<ReturnType>::ActorTaskCaller() {}
|
||||
template <typename F>
|
||||
template <typename... Args>
|
||||
ObjectRef<boost::callable_traits::return_type_t<F>> ActorTaskCaller<F>::Remote(
|
||||
Args... args) {
|
||||
using ActorType = boost::callable_traits::class_of_t<F>;
|
||||
using ReturnType = boost::callable_traits::return_type_t<F>;
|
||||
StaticCheck<F, Args...>();
|
||||
if (!ray::api::RayConfig::GetInstance()->use_ray_remote) {
|
||||
auto exe_func =
|
||||
ActorExecFunction<ReturnType, ActorType, typename FilterArgType<Args>::type...>;
|
||||
ptr_.exec_function_pointer = reinterpret_cast<uintptr_t>(exe_func);
|
||||
}
|
||||
|
||||
template <typename ReturnType>
|
||||
ActorTaskCaller<ReturnType>::ActorTaskCaller(
|
||||
RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args)
|
||||
: runtime_(runtime), id_(id), ptr_(ptr), args_(std::move(args)) {}
|
||||
|
||||
template <typename ReturnType>
|
||||
ObjectRef<ReturnType> ActorTaskCaller<ReturnType>::Remote() {
|
||||
Arguments::WrapArgs(&args_, args...);
|
||||
auto returned_object_id = runtime_->CallActor(ptr_, id_, args_);
|
||||
return ObjectRef<ReturnType>(returned_object_id);
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
#pragma once
|
||||
|
||||
#include <ray/api/serializer.h>
|
||||
#include "absl/utility/utility.h"
|
||||
|
||||
#include <boost/callable_traits.hpp>
|
||||
#include <functional>
|
||||
|
@ -24,16 +23,24 @@
|
|||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "absl/utility/utility.h"
|
||||
#include "ray/core.h"
|
||||
|
||||
namespace ray {
|
||||
namespace internal {
|
||||
|
||||
template <typename T>
|
||||
inline static msgpack::sbuffer PackReturnValue(T result) {
|
||||
inline static absl::enable_if_t<!std::is_pointer<T>::value, msgpack::sbuffer>
|
||||
PackReturnValue(T result) {
|
||||
return ray::api::Serializer::Serialize(std::move(result));
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline static absl::enable_if_t<std::is_pointer<T>::value, msgpack::sbuffer>
|
||||
PackReturnValue(T result) {
|
||||
return ray::api::Serializer::Serialize((uint64_t)result);
|
||||
}
|
||||
|
||||
inline static msgpack::sbuffer PackVoid() {
|
||||
return ray::api::Serializer::Serialize(msgpack::type::nil_t());
|
||||
}
|
||||
|
@ -48,6 +55,17 @@ inline static msgpack::sbuffer PackError(std::string error_msg) {
|
|||
return sbuffer;
|
||||
}
|
||||
|
||||
template <typename>
|
||||
struct RemoveFirst;
|
||||
|
||||
template <class First, class... Second>
|
||||
struct RemoveFirst<std::tuple<First, Second...>> {
|
||||
using type = std::tuple<Second...>;
|
||||
};
|
||||
|
||||
template <class Tuple>
|
||||
using RemoveFirst_t = typename RemoveFirst<Tuple>::type;
|
||||
|
||||
/// It's help to invoke functions and member functions, the class Invoker<Function> help
|
||||
/// do type erase.
|
||||
template <typename Function>
|
||||
|
@ -82,6 +100,35 @@ struct Invoker {
|
|||
return result;
|
||||
}
|
||||
|
||||
static inline msgpack::sbuffer ApplyMember(
|
||||
const Function &func, msgpack::sbuffer *ptr,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
using ArgsTuple = RemoveFirst_t<boost::callable_traits::args_t<Function>>;
|
||||
if (std::tuple_size<ArgsTuple>::value != args_buffer.size()) {
|
||||
return PackError("Arguments number not match");
|
||||
}
|
||||
|
||||
msgpack::sbuffer result;
|
||||
ArgsTuple tp{};
|
||||
try {
|
||||
bool is_ok =
|
||||
GetArgsTuple(tp, args_buffer,
|
||||
absl::make_index_sequence<std::tuple_size<ArgsTuple>::value>{});
|
||||
if (!is_ok) {
|
||||
return PackError("arguments error");
|
||||
}
|
||||
result = Invoker<Function>::CallMember(func, ptr, std::move(tp));
|
||||
} catch (msgpack::type_error &e) {
|
||||
result = PackError(std::string("invalid arguments: ") + e.what());
|
||||
} catch (const std::exception &e) {
|
||||
result = PackError(std::string("function execute exception: ") + e.what());
|
||||
} catch (...) {
|
||||
result = PackError("unknown exception");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename T>
|
||||
static inline T ParseArg(char *data, size_t size, bool &is_ok) {
|
||||
|
@ -111,25 +158,56 @@ struct Invoker {
|
|||
template <typename F, typename... Args>
|
||||
static absl::enable_if_t<std::is_void<absl::result_of_t<F(Args...)>>::value,
|
||||
msgpack::sbuffer>
|
||||
Call(const F &f, std::tuple<Args...> tp) {
|
||||
CallInternal(f, absl::make_index_sequence<sizeof...(Args)>{}, std::move(tp));
|
||||
Call(const F &f, std::tuple<Args...> args) {
|
||||
CallInternal(f, absl::make_index_sequence<sizeof...(Args)>{}, std::move(args));
|
||||
return PackVoid();
|
||||
}
|
||||
|
||||
template <typename F, typename... Args>
|
||||
static absl::enable_if_t<!std::is_void<absl::result_of_t<F(Args...)>>::value,
|
||||
msgpack::sbuffer>
|
||||
Call(const F &f, std::tuple<Args...> tp) {
|
||||
auto r = CallInternal(f, absl::make_index_sequence<sizeof...(Args)>{}, std::move(tp));
|
||||
Call(const F &f, std::tuple<Args...> args) {
|
||||
auto r =
|
||||
CallInternal(f, absl::make_index_sequence<sizeof...(Args)>{}, std::move(args));
|
||||
return PackReturnValue(r);
|
||||
}
|
||||
|
||||
template <typename F, size_t... I, typename... Args>
|
||||
static absl::result_of_t<F(Args...)> CallInternal(const F &f,
|
||||
const absl::index_sequence<I...> &,
|
||||
std::tuple<Args...> tup) {
|
||||
(void)tup;
|
||||
return f(std::move(std::get<I>(tup))...);
|
||||
std::tuple<Args...> args) {
|
||||
(void)args;
|
||||
return f(std::move(std::get<I>(args))...);
|
||||
}
|
||||
|
||||
template <typename F, typename... Args>
|
||||
static absl::enable_if_t<std::is_void<boost::callable_traits::return_type_t<F>>::value,
|
||||
msgpack::sbuffer>
|
||||
CallMember(const F &f, msgpack::sbuffer *ptr, std::tuple<Args...> args) {
|
||||
CallMemberInternal(f, ptr, absl::make_index_sequence<sizeof...(Args)>{},
|
||||
std::move(args));
|
||||
return PackVoid();
|
||||
}
|
||||
|
||||
template <typename F, typename... Args>
|
||||
static absl::enable_if_t<!std::is_void<boost::callable_traits::return_type_t<F>>::value,
|
||||
msgpack::sbuffer>
|
||||
CallMember(const F &f, msgpack::sbuffer *ptr, std::tuple<Args...> args) {
|
||||
auto r = CallMemberInternal(f, ptr, absl::make_index_sequence<sizeof...(Args)>{},
|
||||
std::move(args));
|
||||
return PackReturnValue(r);
|
||||
}
|
||||
|
||||
template <typename F, size_t... I, typename... Args>
|
||||
static boost::callable_traits::return_type_t<F> CallMemberInternal(
|
||||
const F &f, msgpack::sbuffer *ptr, const absl::index_sequence<I...> &,
|
||||
std::tuple<Args...> args) {
|
||||
(void)args;
|
||||
uint64_t actor_ptr =
|
||||
ray::api::Serializer::Deserialize<uint64_t>(ptr->data(), ptr->size());
|
||||
using Self = boost::callable_traits::class_of_t<F>;
|
||||
Self *self = (Self *)actor_ptr;
|
||||
return (self->*f)(std::move(std::get<I>(args))...);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -153,20 +231,40 @@ class FunctionManager {
|
|||
}
|
||||
|
||||
template <typename Function>
|
||||
bool RegisterRemoteFunction(std::string const &name, const Function &f) {
|
||||
/// Now it is just support free function, it will be
|
||||
/// improved to support member function later.
|
||||
auto pair = func_ptr_to_key_map_.emplace((uint64_t)f, name);
|
||||
absl::enable_if_t<!std::is_member_function_pointer<Function>::value, bool>
|
||||
RegisterRemoteFunction(std::string const &name, const Function &f) {
|
||||
auto pair = func_ptr_to_key_map_.emplace(GetAddress(f), name);
|
||||
if (!pair.second) {
|
||||
return false;
|
||||
throw ray::api::RayException("Duplicate RAY_REMOTE function: " + name);
|
||||
}
|
||||
|
||||
return RegisterNonMemberFunc(name, f);
|
||||
bool ok = RegisterNonMemberFunc(name, f);
|
||||
if (!ok) {
|
||||
throw ray::api::RayException("Duplicate RAY_REMOTE function: " + name);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename Function>
|
||||
absl::enable_if_t<std::is_member_function_pointer<Function>::value, bool>
|
||||
RegisterRemoteFunction(std::string const &name, const Function &f) {
|
||||
auto pair = func_ptr_to_key_map_.emplace(GetAddress(f), name);
|
||||
if (!pair.second) {
|
||||
throw ray::api::RayException("Duplicate RAY_REMOTE function: " + name);
|
||||
}
|
||||
|
||||
bool ok = RegisterMemberFunc(name, f);
|
||||
if (!ok) {
|
||||
throw ray::api::RayException("Duplicate RAY_REMOTE function: " + name);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename Function>
|
||||
std::string GetFunctionName(const Function &f) {
|
||||
auto it = func_ptr_to_key_map_.find((uint64_t)f);
|
||||
auto it = func_ptr_to_key_map_.find(GetAddress(f));
|
||||
if (it == func_ptr_to_key_map_.end()) {
|
||||
return "";
|
||||
}
|
||||
|
@ -174,6 +272,17 @@ class FunctionManager {
|
|||
return it->second;
|
||||
}
|
||||
|
||||
std::function<msgpack::sbuffer(msgpack::sbuffer *,
|
||||
const std::vector<std::shared_ptr<RayObject>> &)>
|
||||
*GetMemberFunction(const std::string &func_name) {
|
||||
auto it = map_mem_func_invokers_.find(func_name);
|
||||
if (it == map_mem_func_invokers_.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return &it->second;
|
||||
}
|
||||
|
||||
private:
|
||||
FunctionManager() = default;
|
||||
~FunctionManager() = default;
|
||||
|
@ -188,10 +297,38 @@ class FunctionManager {
|
|||
.second;
|
||||
}
|
||||
|
||||
template <typename Function>
|
||||
bool RegisterMemberFunc(std::string const &name, Function f) {
|
||||
return map_mem_func_invokers_
|
||||
.emplace(name, std::bind(&Invoker<Function>::ApplyMember, std::move(f),
|
||||
std::placeholders::_1, std::placeholders::_2))
|
||||
.second;
|
||||
}
|
||||
|
||||
template <class Dest, class Source>
|
||||
Dest BitCast(const Source &source) {
|
||||
static_assert(sizeof(Dest) == sizeof(Source),
|
||||
"BitCast requires source and destination to be the same size");
|
||||
|
||||
Dest dest;
|
||||
memcpy(&dest, &source, sizeof(dest));
|
||||
return dest;
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
std::string GetAddress(F f) {
|
||||
auto arr = BitCast<std::array<char, sizeof(F)>>(f);
|
||||
return std::string(arr.data(), arr.size());
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::function<msgpack::sbuffer(
|
||||
const std::vector<std::shared_ptr<RayObject>> &)>>
|
||||
map_invokers_;
|
||||
std::unordered_map<uintptr_t, std::string> func_ptr_to_key_map_;
|
||||
std::unordered_map<std::string, std::function<msgpack::sbuffer(
|
||||
msgpack::sbuffer *,
|
||||
const std::vector<std::shared_ptr<RayObject>> &)>>
|
||||
map_mem_func_invokers_;
|
||||
std::unordered_map<std::string, std::string> func_ptr_to_key_map_;
|
||||
};
|
||||
} // namespace internal
|
||||
} // namespace ray
|
|
@ -48,9 +48,8 @@ class Serializer {
|
|||
}
|
||||
|
||||
static bool HasError(char *data, size_t size) {
|
||||
size_t off = 0;
|
||||
msgpack::unpacked unpacked = msgpack::unpack(data, 1, off);
|
||||
return unpacked.get().is_nil() && size > off;
|
||||
msgpack::unpacked unpacked = msgpack::unpack(data, size);
|
||||
return unpacked.get().is_nil() && size > 1;
|
||||
}
|
||||
};
|
||||
|
||||
|
|
53
cpp/include/ray/api/static_check.h
Normal file
53
cpp/include/ray/api/static_check.h
Normal file
|
@ -0,0 +1,53 @@
|
|||
// Copyright 2017 The Ray Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <ray/api/object_ref.h>
|
||||
|
||||
#include <boost/callable_traits.hpp>
|
||||
#include <type_traits>
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
template <typename T>
|
||||
struct FilterArgType {
|
||||
using type = T;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct FilterArgType<ObjectRef<T>> {
|
||||
using type = T;
|
||||
};
|
||||
|
||||
template <typename Function, typename... Args>
|
||||
inline absl::enable_if_t<!std::is_member_function_pointer<Function>::value>
|
||||
StaticCheck() {
|
||||
static_assert(std::is_same<std::tuple<typename FilterArgType<Args>::type...>,
|
||||
boost::callable_traits::args_t<Function>>::value,
|
||||
"arguments not match");
|
||||
}
|
||||
|
||||
template <typename Function, typename... Args>
|
||||
inline absl::enable_if_t<std::is_member_function_pointer<Function>::value> StaticCheck() {
|
||||
using ActorType = boost::callable_traits::class_of_t<Function>;
|
||||
static_assert(
|
||||
std::is_same<std::tuple<ActorType &, typename FilterArgType<Args>::type...>,
|
||||
boost::callable_traits::args_t<Function>>::value,
|
||||
"arguments not match");
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
} // namespace ray
|
|
@ -1,12 +1,13 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <ray/api/static_check.h>
|
||||
#include "ray/core.h"
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
template <typename ReturnType>
|
||||
template <typename F>
|
||||
class TaskCaller {
|
||||
public:
|
||||
TaskCaller();
|
||||
|
@ -14,13 +15,7 @@ class TaskCaller {
|
|||
TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr);
|
||||
|
||||
template <typename... Args>
|
||||
ObjectRef<ReturnType> Remote(Args... args) {
|
||||
Arguments::WrapArgs(&args_, args...);
|
||||
ptr_.exec_function_pointer = reinterpret_cast<uintptr_t>(
|
||||
NormalExecFunction<ReturnType, typename FilterArgType<Args>::type...>);
|
||||
auto returned_object_id = runtime_->Call(ptr_, args_);
|
||||
return ObjectRef<ReturnType>(returned_object_id);
|
||||
}
|
||||
ObjectRef<boost::callable_traits::return_type_t<F>> Remote(Args... args);
|
||||
|
||||
private:
|
||||
RayRuntime *runtime_;
|
||||
|
@ -31,11 +26,23 @@ class TaskCaller {
|
|||
|
||||
// ---------- implementation ----------
|
||||
|
||||
template <typename ReturnType>
|
||||
TaskCaller<ReturnType>::TaskCaller() {}
|
||||
template <typename F>
|
||||
TaskCaller<F>::TaskCaller() {}
|
||||
|
||||
template <typename ReturnType>
|
||||
TaskCaller<ReturnType>::TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr)
|
||||
template <typename F>
|
||||
TaskCaller<F>::TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr)
|
||||
: runtime_(runtime), ptr_(ptr) {}
|
||||
|
||||
template <typename F>
|
||||
template <typename... Args>
|
||||
ObjectRef<boost::callable_traits::return_type_t<F>> TaskCaller<F>::Remote(Args... args) {
|
||||
StaticCheck<F, Args...>();
|
||||
using ReturnType = boost::callable_traits::return_type_t<F>;
|
||||
Arguments::WrapArgs(&args_, args...);
|
||||
ptr_.exec_function_pointer = reinterpret_cast<uintptr_t>(
|
||||
NormalExecFunction<ReturnType, typename FilterArgType<Args>::type...>);
|
||||
auto returned_object_id = runtime_->Call(ptr_, args_);
|
||||
return ObjectRef<ReturnType>(returned_object_id);
|
||||
}
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
|
|
|
@ -14,7 +14,8 @@ namespace internal {
|
|||
/// Execute remote functions by networking stream.
|
||||
msgpack::sbuffer TaskExecutionHandler(
|
||||
const std::string &func_name,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
msgpack::sbuffer *actor_ptr) {
|
||||
if (func_name.empty()) {
|
||||
return PackError("Task function name is empty");
|
||||
}
|
||||
|
@ -22,13 +23,23 @@ msgpack::sbuffer TaskExecutionHandler(
|
|||
msgpack::sbuffer result;
|
||||
do {
|
||||
try {
|
||||
auto func_ptr = FunctionManager::Instance().GetFunction(func_name);
|
||||
if (func_ptr == nullptr) {
|
||||
result = PackError("unknown function: " + func_name);
|
||||
break;
|
||||
}
|
||||
if (actor_ptr) {
|
||||
auto func_ptr = FunctionManager::Instance().GetMemberFunction(func_name);
|
||||
if (func_ptr == nullptr) {
|
||||
result = PackError("unknown actor task: " + func_name);
|
||||
break;
|
||||
}
|
||||
|
||||
result = (*func_ptr)(args_buffer);
|
||||
result = (*func_ptr)(actor_ptr, args_buffer);
|
||||
} else {
|
||||
auto func_ptr = FunctionManager::Instance().GetFunction(func_name);
|
||||
if (func_ptr == nullptr) {
|
||||
result = PackError("unknown function: " + func_name);
|
||||
break;
|
||||
}
|
||||
|
||||
result = (*func_ptr)(args_buffer);
|
||||
}
|
||||
} catch (const std::exception &ex) {
|
||||
result = PackError(ex.what());
|
||||
}
|
||||
|
@ -77,31 +88,58 @@ Status TaskExecutor::ExecuteTask(
|
|||
|
||||
std::shared_ptr<msgpack::sbuffer> data = nullptr;
|
||||
if (task_type == TaskType::ACTOR_CREATION_TASK) {
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer);
|
||||
ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_buffer);
|
||||
if (!func_name.empty()) {
|
||||
auto entry_func = FunctionHelper::GetInstance().GetEntryFunction(lib_name);
|
||||
if (entry_func == nullptr) {
|
||||
return ray::Status::NotFound(lib_name + " not found");
|
||||
}
|
||||
|
||||
RAY_LOG(DEBUG) << "Get execute function" << func_name << " ok";
|
||||
auto result = entry_func(func_name, args_buffer, nullptr);
|
||||
RAY_LOG(DEBUG) << "Execute function" << func_name << " ok";
|
||||
data = std::make_shared<msgpack::sbuffer>(std::move(result));
|
||||
} else {
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer);
|
||||
ExecFunction exec_function =
|
||||
(ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_buffer);
|
||||
}
|
||||
|
||||
current_actor_ = data;
|
||||
} else if (task_type == TaskType::ACTOR_TASK) {
|
||||
RAY_CHECK(current_actor_ != nullptr);
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> object);
|
||||
ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_buffer, current_actor_);
|
||||
} else { // NORMAL_TASK
|
||||
if (!func_name.empty()) {
|
||||
auto execute_func = FunctionHelper::GetInstance().GetExecuteFunction(lib_name);
|
||||
if (execute_func == nullptr) {
|
||||
auto entry_func = FunctionHelper::GetInstance().GetEntryFunction(lib_name);
|
||||
if (entry_func == nullptr) {
|
||||
return ray::Status::NotFound(lib_name + " not found");
|
||||
}
|
||||
|
||||
RAY_LOG(DEBUG) << "Get execute function ok";
|
||||
auto result = execute_func(func_name, args_buffer);
|
||||
auto result = entry_func(func_name, args_buffer, current_actor_.get());
|
||||
RAY_LOG(DEBUG) << "Execute function ok";
|
||||
data = std::make_shared<msgpack::sbuffer>(std::move(result));
|
||||
} else {
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> object);
|
||||
ExecFunction exec_function =
|
||||
(ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_buffer, current_actor_);
|
||||
}
|
||||
} else { // NORMAL_TASK
|
||||
if (!func_name.empty()) {
|
||||
auto entry_func = FunctionHelper::GetInstance().GetEntryFunction(lib_name);
|
||||
if (entry_func == nullptr) {
|
||||
return ray::Status::NotFound(lib_name + " not found");
|
||||
}
|
||||
|
||||
RAY_LOG(DEBUG) << "Get execute function ok";
|
||||
auto result = entry_func(func_name, args_buffer, nullptr);
|
||||
RAY_LOG(DEBUG) << "Execute function ok";
|
||||
data = std::make_shared<msgpack::sbuffer>(std::move(result));
|
||||
} else {
|
||||
|
@ -162,10 +200,25 @@ void TaskExecutor::Invoke(
|
|||
|
||||
std::shared_ptr<msgpack::sbuffer> data;
|
||||
if (ray::api::RayConfig::GetInstance()->use_ray_remote) {
|
||||
auto result =
|
||||
internal::TaskExecutionHandler(typed_descriptor->FunctionName(), args_buffer);
|
||||
data = std::make_shared<msgpack::sbuffer>(std::move(result));
|
||||
runtime->Put(std::move(data), task_spec.ReturnId(0));
|
||||
if (actor) {
|
||||
auto result = internal::TaskExecutionHandler(typed_descriptor->FunctionName(),
|
||||
args_buffer, actor.get());
|
||||
data = std::make_shared<msgpack::sbuffer>(std::move(result));
|
||||
runtime->Put(std::move(data), task_spec.ReturnId(0));
|
||||
} else {
|
||||
auto result = internal::TaskExecutionHandler(typed_descriptor->FunctionName(),
|
||||
args_buffer, nullptr);
|
||||
data = std::make_shared<msgpack::sbuffer>(std::move(result));
|
||||
if (task_spec.IsActorCreationTask()) {
|
||||
std::unique_ptr<ActorContext> actorContext(new ActorContext());
|
||||
actorContext->current_actor = data;
|
||||
absl::MutexLock lock(&actor_contexts_mutex);
|
||||
actor_contexts.emplace(task_spec.ActorCreationId(), std::move(actorContext));
|
||||
} else {
|
||||
runtime->Put(std::move(data), task_spec.ReturnId(0));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,8 @@ namespace internal {
|
|||
/// Execute remote functions by networking stream.
|
||||
msgpack::sbuffer TaskExecutionHandler(
|
||||
const std::string &func_name,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer);
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
msgpack::sbuffer *actor_ptr);
|
||||
|
||||
BOOST_DLL_ALIAS(internal::TaskExecutionHandler, TaskExecutionHandler);
|
||||
} // namespace internal
|
||||
|
|
|
@ -116,11 +116,11 @@ TEST(RayApiTest, CallWithObjectTest) {
|
|||
TEST(RayApiTest, ActorTest) {
|
||||
Ray::Init();
|
||||
ActorHandle<Counter> actor = Ray::Actor(Counter::FactoryCreate).Remote();
|
||||
auto rt1 = actor.Task(&Counter::Add, 1).Remote();
|
||||
auto rt2 = actor.Task(&Counter::Add, 2).Remote();
|
||||
auto rt3 = actor.Task(&Counter::Add, 3).Remote();
|
||||
auto rt4 = actor.Task(&Counter::Add, rt3).Remote();
|
||||
auto rt5 = actor.Task(&Counter::Triple, 1, 2, 3).Remote();
|
||||
auto rt1 = actor.Task(&Counter::Add).Remote(1);
|
||||
auto rt2 = actor.Task(&Counter::Add).Remote(2);
|
||||
auto rt3 = actor.Task(&Counter::Add).Remote(3);
|
||||
auto rt4 = actor.Task(&Counter::Add).Remote(rt3);
|
||||
auto rt5 = actor.Task(&Counter::Triple).Remote(1, 2, 3);
|
||||
|
||||
int return1 = *(rt1.Get());
|
||||
int return2 = *(rt2.Get());
|
||||
|
|
|
@ -16,6 +16,11 @@ class Counter {
|
|||
int count;
|
||||
|
||||
Counter(int init) { count = init; }
|
||||
|
||||
template <typename... Args>
|
||||
static Counter *GenericFactoryCreate(Args... args) {
|
||||
return FactoryCreate(args...);
|
||||
}
|
||||
static Counter *FactoryCreate() { return new Counter(0); }
|
||||
static Counter *FactoryCreate(int init) { return new Counter(init); }
|
||||
static Counter *FactoryCreate(int init1, int init2) {
|
||||
|
@ -60,20 +65,21 @@ TEST(RayClusterModeTest, FullTest) {
|
|||
EXPECT_EQ(6, task_result);
|
||||
|
||||
/// actor task without args
|
||||
ActorHandle<Counter> actor1 = Ray::Actor(Counter::FactoryCreate).Remote();
|
||||
ActorHandle<Counter> actor1 = Ray::Actor(Counter::GenericFactoryCreate<>).Remote();
|
||||
auto actor_object1 = actor1.Task(&Counter::Plus1).Remote();
|
||||
int actor_task_result1 = *(Ray::Get(actor_object1));
|
||||
EXPECT_EQ(1, actor_task_result1);
|
||||
|
||||
/// actor task with args
|
||||
ActorHandle<Counter> actor2 = Ray::Actor(Counter::FactoryCreate, 1).Remote();
|
||||
auto actor_object2 = actor2.Task(&Counter::Add, 5).Remote();
|
||||
ActorHandle<Counter> actor2 = Ray::Actor(Counter::GenericFactoryCreate<int>).Remote(1);
|
||||
auto actor_object2 = actor2.Task(&Counter::Add).Remote(5);
|
||||
int actor_task_result2 = *(Ray::Get(actor_object2));
|
||||
EXPECT_EQ(6, actor_task_result2);
|
||||
|
||||
/// actor task with args which pass by reference
|
||||
ActorHandle<Counter> actor3 = Ray::Actor(Counter::FactoryCreate, 6, 0).Remote();
|
||||
auto actor_object3 = actor3.Task(&Counter::Add, actor_object2).Remote();
|
||||
ActorHandle<Counter> actor3 =
|
||||
Ray::Actor(Counter::GenericFactoryCreate<int, int>).Remote(6, 0);
|
||||
auto actor_object3 = actor3.Task(&Counter::Add).Remote(actor_object2);
|
||||
int actor_task_result3 = *(Ray::Get(actor_object3));
|
||||
EXPECT_EQ(12, actor_task_result3);
|
||||
|
||||
|
@ -106,11 +112,11 @@ TEST(RayClusterModeTest, FullTest) {
|
|||
EXPECT_EQ(result6, 12);
|
||||
|
||||
/// create actor and actor function remote call with args passed by value
|
||||
ActorHandle<Counter> actor4 = Ray::Actor(Counter::FactoryCreate, 10).Remote();
|
||||
auto r7 = actor4.Task(&Counter::Add, 5).Remote();
|
||||
auto r8 = actor4.Task(&Counter::Add, 1).Remote();
|
||||
auto r9 = actor4.Task(&Counter::Add, 3).Remote();
|
||||
auto r10 = actor4.Task(&Counter::Add, 8).Remote();
|
||||
ActorHandle<Counter> actor4 = Ray::Actor(Counter::GenericFactoryCreate<int>).Remote(10);
|
||||
auto r7 = actor4.Task(&Counter::Add).Remote(5);
|
||||
auto r8 = actor4.Task(&Counter::Add).Remote(1);
|
||||
auto r9 = actor4.Task(&Counter::Add).Remote(3);
|
||||
auto r10 = actor4.Task(&Counter::Add).Remote(8);
|
||||
|
||||
int result7 = *(Ray::Get(r7));
|
||||
int result8 = *(Ray::Get(r8));
|
||||
|
@ -122,12 +128,13 @@ TEST(RayClusterModeTest, FullTest) {
|
|||
EXPECT_EQ(result10, 27);
|
||||
|
||||
/// create actor and task function remote call with args passed by reference
|
||||
ActorHandle<Counter> actor5 = Ray::Actor(Counter::FactoryCreate, r10, 0).Remote();
|
||||
ActorHandle<Counter> actor5 =
|
||||
Ray::Actor(Counter::GenericFactoryCreate<int, int>).Remote(r10, 0);
|
||||
|
||||
auto r11 = actor5.Task(&Counter::Add, r0).Remote();
|
||||
auto r12 = actor5.Task(&Counter::Add, r11).Remote();
|
||||
auto r13 = actor5.Task(&Counter::Add, r10).Remote();
|
||||
auto r14 = actor5.Task(&Counter::Add, r13).Remote();
|
||||
auto r11 = actor5.Task(&Counter::Add).Remote(r0);
|
||||
auto r12 = actor5.Task(&Counter::Add).Remote(r11);
|
||||
auto r13 = actor5.Task(&Counter::Add).Remote(r10);
|
||||
auto r14 = actor5.Task(&Counter::Add).Remote(r13);
|
||||
auto r15 = Ray::Task(Plus).Remote(r0, r11);
|
||||
auto r16 = Ray::Task(Plus1).Remote(r15);
|
||||
|
||||
|
|
|
@ -13,6 +13,24 @@ RAY_REMOTE(Return1);
|
|||
RAY_REMOTE(Plus1);
|
||||
RAY_REMOTE(Plus);
|
||||
|
||||
class DummyObject {
|
||||
public:
|
||||
int count;
|
||||
|
||||
MSGPACK_DEFINE(count);
|
||||
DummyObject() { count = 0; };
|
||||
DummyObject(int init) { count = init; }
|
||||
|
||||
static DummyObject *FactoryCreate(int init) { return new DummyObject(init); }
|
||||
|
||||
int Add(int x) {
|
||||
count += x;
|
||||
return count;
|
||||
}
|
||||
};
|
||||
RAY_REMOTE(DummyObject::FactoryCreate);
|
||||
RAY_REMOTE(&DummyObject::Add);
|
||||
|
||||
std::string lib_name = "";
|
||||
|
||||
std::string redis_ip = "";
|
||||
|
@ -63,6 +81,47 @@ TEST(RayClusterModeTest, FullTest) {
|
|||
EXPECT_EQ(result4, 2);
|
||||
EXPECT_EQ(result5, 3);
|
||||
EXPECT_EQ(result6, 12);
|
||||
|
||||
/// create actor and actor function remote call with args passed by value
|
||||
ActorHandle<DummyObject> actor4 = Ray::Actor(DummyObject::FactoryCreate).Remote(10);
|
||||
auto r7 = actor4.Task(&DummyObject::Add).Remote(5);
|
||||
auto r8 = actor4.Task(&DummyObject::Add).Remote(1);
|
||||
auto r9 = actor4.Task(&DummyObject::Add).Remote(3);
|
||||
auto r10 = actor4.Task(&DummyObject::Add).Remote(8);
|
||||
|
||||
int result7 = *(Ray::Get(r7));
|
||||
int result8 = *(Ray::Get(r8));
|
||||
int result9 = *(Ray::Get(r9));
|
||||
int result10 = *(Ray::Get(r10));
|
||||
EXPECT_EQ(result7, 15);
|
||||
EXPECT_EQ(result8, 16);
|
||||
EXPECT_EQ(result9, 19);
|
||||
EXPECT_EQ(result10, 27);
|
||||
|
||||
/// create actor and task function remote call with args passed by reference
|
||||
ActorHandle<DummyObject> actor5 = Ray::Actor(DummyObject::FactoryCreate).Remote(r10);
|
||||
|
||||
auto r11 = actor5.Task(&DummyObject::Add).Remote(r0);
|
||||
auto r12 = actor5.Task(&DummyObject::Add).Remote(r11);
|
||||
auto r13 = actor5.Task(&DummyObject::Add).Remote(r10);
|
||||
auto r14 = actor5.Task(&DummyObject::Add).Remote(r13);
|
||||
// auto r15 = Ray::Task(Plus).Remote(r0, r11);
|
||||
// auto r16 = Ray::Task(Plus1).Remote(r15);
|
||||
|
||||
int result12 = *(Ray::Get(r12));
|
||||
int result14 = *(Ray::Get(r14));
|
||||
int result11 = *(Ray::Get(r11));
|
||||
int result13 = *(Ray::Get(r13));
|
||||
// int result16 = *(Ray::Get(r16));
|
||||
// int result15 = *(Ray::Get(r15));
|
||||
|
||||
EXPECT_EQ(result11, 28);
|
||||
EXPECT_EQ(result12, 56);
|
||||
EXPECT_EQ(result13, 83);
|
||||
EXPECT_EQ(result14, 166);
|
||||
// EXPECT_EQ(result15, 29);
|
||||
// EXPECT_EQ(result16, 30);
|
||||
|
||||
ray::api::RayConfig::GetInstance()->use_ray_remote = false;
|
||||
Ray::Shutdown();
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <ray/api.h>
|
||||
#include <ray/api/serializer.h>
|
||||
|
||||
#include "cpp/src/ray/runtime/task/task_executor.h"
|
||||
#include "cpp/src/ray/util/function_helper.h"
|
||||
#include "ray/core.h"
|
||||
|
@ -36,6 +37,26 @@ int NotRegisteredFunc(int x) { return x; }
|
|||
|
||||
void ExceptionFunc(int x) { throw std::invalid_argument(std::to_string(x)); }
|
||||
|
||||
class DummyObject {
|
||||
public:
|
||||
int count;
|
||||
|
||||
MSGPACK_DEFINE(count);
|
||||
DummyObject() = default;
|
||||
DummyObject(int init) {
|
||||
std::cout << "construct DummyObject\n";
|
||||
count = init;
|
||||
}
|
||||
|
||||
int Add(int x, int y) { return x + y; }
|
||||
|
||||
~DummyObject() { std::cout << "destruct DummyObject\n"; }
|
||||
|
||||
static DummyObject *FactoryCreate(int init) { return new DummyObject(init); }
|
||||
};
|
||||
RAY_REMOTE(DummyObject::FactoryCreate);
|
||||
RAY_REMOTE(&DummyObject::Add);
|
||||
|
||||
RAY_REMOTE(PlusOne);
|
||||
RAY_REMOTE(PlusTwo);
|
||||
RAY_REMOTE(VoidFuncNoArgs);
|
||||
|
@ -47,11 +68,10 @@ TEST(RayApiTest, DuplicateRegister) {
|
|||
EXPECT_TRUE(r);
|
||||
|
||||
/// Duplicate register
|
||||
bool r1 = FunctionManager::Instance().RegisterRemoteFunction("Return", Return);
|
||||
EXPECT_FALSE(r1);
|
||||
|
||||
bool r2 = FunctionManager::Instance().RegisterRemoteFunction("PlusOne", PlusOne);
|
||||
EXPECT_FALSE(r2);
|
||||
EXPECT_THROW(FunctionManager::Instance().RegisterRemoteFunction("Return", Return),
|
||||
RayException);
|
||||
EXPECT_THROW(FunctionManager::Instance().RegisterRemoteFunction("PlusOne", PlusOne),
|
||||
RayException);
|
||||
}
|
||||
|
||||
TEST(RayApiTest, NormalTask) {
|
||||
|
@ -100,20 +120,7 @@ TEST(RayApiTest, NotExistFunction) {
|
|||
EXPECT_THROW(Ray::Task(NotRegisteredFunc), RayException);
|
||||
}
|
||||
|
||||
TEST(RayApiTest, ArgumentsNotMatch) {
|
||||
/// Arguments number is not match.
|
||||
auto r = Ray::Task(PlusOne).Remote();
|
||||
EXPECT_THROW(r.Get(), RayException);
|
||||
|
||||
auto r1 = Ray::Task(PlusOne).Remote(1, 2);
|
||||
EXPECT_THROW(r1.Get(), RayException);
|
||||
|
||||
auto r2 = Ray::Task(ExceptionFunc).Remote();
|
||||
EXPECT_THROW(r2.Get(), RayException);
|
||||
|
||||
auto r3 = Ray::Task(ExceptionFunc).Remote(1, 2);
|
||||
EXPECT_THROW(r3.Get(), RayException);
|
||||
|
||||
TEST(RayApiTest, ExceptionTask) {
|
||||
/// Normal task Exception.
|
||||
auto r4 = Ray::Task(ExceptionFunc).Remote(2);
|
||||
EXPECT_THROW(r4.Get(), RayException);
|
||||
|
|
|
@ -54,8 +54,9 @@ std::shared_ptr<boost::dll::shared_library> FunctionHelper::LoadDll(
|
|||
}
|
||||
|
||||
std::function<msgpack::sbuffer(const std::string &,
|
||||
const std::vector<std::shared_ptr<::ray::RayObject>> &)>
|
||||
FunctionHelper::GetExecuteFunction(const std::string &lib_name) {
|
||||
const std::vector<std::shared_ptr<::ray::RayObject>> &,
|
||||
msgpack::sbuffer *)>
|
||||
FunctionHelper::GetEntryFunction(const std::string &lib_name) {
|
||||
auto it = funcs_.find(lib_name);
|
||||
if (it != funcs_.end()) {
|
||||
return it->second;
|
||||
|
@ -67,11 +68,11 @@ FunctionHelper::GetExecuteFunction(const std::string &lib_name) {
|
|||
}
|
||||
|
||||
try {
|
||||
auto execute_func = boost::dll::import_alias<msgpack::sbuffer(
|
||||
const std::string &, const std::vector<std::shared_ptr<::ray::RayObject>> &)>(
|
||||
*lib, "TaskExecutionHandler");
|
||||
funcs_.emplace(lib_name, execute_func);
|
||||
return execute_func;
|
||||
auto entry_func = boost::dll::import_alias<msgpack::sbuffer(
|
||||
const std::string &, const std::vector<std::shared_ptr<::ray::RayObject>> &,
|
||||
msgpack::sbuffer *)>(*lib, "TaskExecutionHandler");
|
||||
funcs_.emplace(lib_name, entry_func);
|
||||
return entry_func;
|
||||
} catch (std::exception &e) {
|
||||
RAY_LOG(WARNING) << "Get execute function failed, lib_name: " << lib_name
|
||||
<< ", failed reason: " << e.what();
|
||||
|
|
|
@ -21,8 +21,9 @@ class FunctionHelper {
|
|||
|
||||
std::shared_ptr<boost::dll::shared_library> LoadDll(const std::string &lib_name);
|
||||
std::function<msgpack::sbuffer(const std::string &,
|
||||
const std::vector<std::shared_ptr<::ray::RayObject>> &)>
|
||||
GetExecuteFunction(const std::string &lib_name);
|
||||
const std::vector<std::shared_ptr<::ray::RayObject>> &,
|
||||
msgpack::sbuffer *)>
|
||||
GetEntryFunction(const std::string &lib_name);
|
||||
|
||||
private:
|
||||
FunctionHelper() = default;
|
||||
|
@ -37,7 +38,8 @@ class FunctionHelper {
|
|||
std::unordered_map<
|
||||
std::string,
|
||||
std::function<msgpack::sbuffer(
|
||||
const std::string &, const std::vector<std::shared_ptr<::ray::RayObject>> &)>>
|
||||
const std::string &, const std::vector<std::shared_ptr<::ray::RayObject>> &,
|
||||
msgpack::sbuffer *)>>
|
||||
funcs_;
|
||||
};
|
||||
} // namespace api
|
||||
|
|
Loading…
Add table
Reference in a new issue