mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[cpp worker] support pass by reference on cluster mode (#11753)
This commit is contained in:
parent
8d599bb3f5
commit
df2c2a7ce5
22 changed files with 235 additions and 223 deletions
|
@ -197,13 +197,12 @@ template <typename ReturnType, typename FuncType, typename ExecFuncType,
|
|||
typename... ArgTypes>
|
||||
inline TaskCaller<ReturnType> Ray::TaskInternal(FuncType &func, ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Arguments::WrapArgs(packer, args...);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> task_args;
|
||||
Arguments::WrapArgs(&task_args, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(func);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
return TaskCaller<ReturnType>(runtime_.get(), ptr, buffer);
|
||||
return TaskCaller<ReturnType>(runtime_.get(), ptr, std::move(task_args));
|
||||
}
|
||||
|
||||
template <typename ActorType, typename FuncType, typename ExecFuncType,
|
||||
|
@ -211,13 +210,12 @@ template <typename ActorType, typename FuncType, typename ExecFuncType,
|
|||
inline ActorCreator<ActorType> Ray::CreateActorInternal(FuncType &create_func,
|
||||
ExecFuncType &exec_func,
|
||||
ArgTypes &... args) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Arguments::WrapArgs(packer, args...);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> task_args;
|
||||
Arguments::WrapArgs(&task_args, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(create_func);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
return ActorCreator<ActorType>(runtime_.get(), ptr, buffer);
|
||||
return ActorCreator<ActorType>(runtime_.get(), ptr, std::move(task_args));
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename ActorType, typename FuncType,
|
||||
|
@ -226,14 +224,14 @@ inline ActorTaskCaller<ReturnType> Ray::CallActorInternal(FuncType &actor_func,
|
|||
ExecFuncType &exec_func,
|
||||
ActorHandle<ActorType> &actor,
|
||||
ArgTypes &... args) {
|
||||
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
|
||||
Arguments::WrapArgs(packer, args...);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> task_args;
|
||||
Arguments::WrapArgs(&task_args, args...);
|
||||
RemoteFunctionPtrHolder ptr;
|
||||
MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func);
|
||||
ptr.function_pointer = reinterpret_cast<uintptr_t>(holder.value[0]);
|
||||
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
|
||||
return ActorTaskCaller<ReturnType>(runtime_.get(), actor.ID(), ptr, buffer);
|
||||
return ActorTaskCaller<ReturnType>(runtime_.get(), actor.ID(), ptr,
|
||||
std::move(task_args));
|
||||
}
|
||||
|
||||
// TODO(barakmich): These includes are generated files that do not contain their
|
||||
|
|
|
@ -12,14 +12,14 @@ class ActorCreator {
|
|||
ActorCreator();
|
||||
|
||||
ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args);
|
||||
|
||||
ActorHandle<ActorType> Remote();
|
||||
|
||||
private:
|
||||
RayRuntime *runtime_;
|
||||
RemoteFunctionPtrHolder ptr_;
|
||||
std::shared_ptr<msgpack::sbuffer> args_;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> args_;
|
||||
};
|
||||
|
||||
// ---------- implementation ----------
|
||||
|
@ -29,8 +29,8 @@ ActorCreator<ActorType>::ActorCreator() {}
|
|||
|
||||
template <typename ActorType>
|
||||
ActorCreator<ActorType>::ActorCreator(RayRuntime *runtime, RemoteFunctionPtrHolder ptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args)
|
||||
: runtime_(runtime), ptr_(ptr), args_(args) {}
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args)
|
||||
: runtime_(runtime), ptr_(ptr), args_(std::move(args)) {}
|
||||
|
||||
template <typename ActorType>
|
||||
ActorHandle<ActorType> ActorCreator<ActorType>::Remote() {
|
||||
|
|
|
@ -12,7 +12,7 @@ class ActorTaskCaller {
|
|||
ActorTaskCaller();
|
||||
|
||||
ActorTaskCaller(RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args);
|
||||
|
||||
ObjectRef<ReturnType> Remote();
|
||||
|
||||
|
@ -20,7 +20,7 @@ class ActorTaskCaller {
|
|||
RayRuntime *runtime_;
|
||||
ActorID id_;
|
||||
RemoteFunctionPtrHolder ptr_;
|
||||
std::shared_ptr<msgpack::sbuffer> args_;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> args_;
|
||||
};
|
||||
|
||||
// ---------- implementation ----------
|
||||
|
@ -29,10 +29,10 @@ template <typename ReturnType>
|
|||
ActorTaskCaller<ReturnType>::ActorTaskCaller() {}
|
||||
|
||||
template <typename ReturnType>
|
||||
ActorTaskCaller<ReturnType>::ActorTaskCaller(RayRuntime *runtime, ActorID id,
|
||||
RemoteFunctionPtrHolder ptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args)
|
||||
: runtime_(runtime), id_(id), ptr_(ptr), args_(args) {}
|
||||
ActorTaskCaller<ReturnType>::ActorTaskCaller(
|
||||
RayRuntime *runtime, ActorID id, RemoteFunctionPtrHolder ptr,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args)
|
||||
: runtime_(runtime), id_(id), ptr_(ptr), args_(std::move(args)) {}
|
||||
|
||||
template <typename ReturnType>
|
||||
ObjectRef<ReturnType> ActorTaskCaller<ReturnType>::Remote() {
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <ray/api/object_ref.h>
|
||||
#include <ray/api/serializer.h>
|
||||
#include "ray/common/task/task_util.h"
|
||||
|
||||
#include <msgpack.hpp>
|
||||
|
||||
|
@ -10,75 +12,100 @@ namespace api {
|
|||
|
||||
class Arguments {
|
||||
public:
|
||||
static void WrapArgs(msgpack::packer<msgpack::sbuffer> &packer);
|
||||
static void WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args);
|
||||
|
||||
template <typename Arg1Type>
|
||||
static void WrapArgs(msgpack::packer<msgpack::sbuffer> &packer, Arg1Type &arg1);
|
||||
|
||||
template <typename Arg1Type, typename... OtherArgTypes>
|
||||
static void WrapArgs(msgpack::packer<msgpack::sbuffer> &packer, Arg1Type &arg1,
|
||||
OtherArgTypes &... args);
|
||||
|
||||
static void UnwrapArgs(msgpack::unpacker &unpacker);
|
||||
static void WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args,
|
||||
Arg1Type &arg1);
|
||||
|
||||
template <typename Arg1Type>
|
||||
static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr<Arg1Type> *arg1);
|
||||
static void WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args,
|
||||
ObjectRef<Arg1Type> &arg1);
|
||||
|
||||
template <typename Arg1Type, typename... OtherArgTypes>
|
||||
static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr<Arg1Type> *arg1,
|
||||
static void WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args,
|
||||
Arg1Type &arg1, OtherArgTypes &... args);
|
||||
|
||||
static void UnwrapArgs(const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
int &arg_index);
|
||||
|
||||
template <typename Arg1Type>
|
||||
static void UnwrapArgs(const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
int &arg_index, std::shared_ptr<Arg1Type> *arg1);
|
||||
|
||||
template <typename Arg1Type, typename... OtherArgTypes>
|
||||
static void UnwrapArgs(const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
int &arg_index, std::shared_ptr<Arg1Type> *arg1,
|
||||
std::shared_ptr<OtherArgTypes> *... args);
|
||||
};
|
||||
|
||||
// --------- inline implementation ------------
|
||||
#include <typeinfo>
|
||||
|
||||
inline void Arguments::WrapArgs(msgpack::packer<msgpack::sbuffer> &packer) {}
|
||||
inline void Arguments::WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args) {
|
||||
}
|
||||
|
||||
template <typename Arg1Type>
|
||||
inline void Arguments::WrapArgs(msgpack::packer<msgpack::sbuffer> &packer,
|
||||
inline void Arguments::WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args,
|
||||
Arg1Type &arg1) {
|
||||
/// TODO(Guyang Song): optimize the memory copy.
|
||||
msgpack::sbuffer buffer;
|
||||
msgpack::packer<msgpack::sbuffer> packer(buffer);
|
||||
/// Notice ObjectRefClassPrefix should be modified by ObjectRef class name or namespace.
|
||||
static const std::string ObjectRefClassPrefix = "N3ray3api9ObjectRef";
|
||||
std::string type_name = typeid(arg1).name();
|
||||
if (type_name.rfind(ObjectRefClassPrefix, 0) == 0) {
|
||||
/// Pass by reference.
|
||||
Serializer::Serialize(packer, true);
|
||||
} else {
|
||||
/// Pass by value.
|
||||
Serializer::Serialize(packer, false);
|
||||
}
|
||||
RAY_CHECK(type_name.rfind(ObjectRefClassPrefix, 0) != 0)
|
||||
<< "ObjectRef can not be wrapped";
|
||||
Serializer::Serialize(packer, arg1);
|
||||
auto memory_buffer = std::make_shared<::ray::LocalMemoryBuffer>(
|
||||
reinterpret_cast<uint8_t *>(buffer.data()), buffer.size(), true);
|
||||
/// Pass by value.
|
||||
auto task_arg = new TaskArgByValue(std::make_shared<::ray::RayObject>(
|
||||
memory_buffer, nullptr, std::vector<ObjectID>()));
|
||||
task_args->emplace_back(task_arg);
|
||||
}
|
||||
|
||||
template <typename Arg1Type, typename... OtherArgTypes>
|
||||
inline void Arguments::WrapArgs(msgpack::packer<msgpack::sbuffer> &packer, Arg1Type &arg1,
|
||||
OtherArgTypes &... args) {
|
||||
WrapArgs(packer, arg1);
|
||||
WrapArgs(packer, args...);
|
||||
}
|
||||
|
||||
inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker) {}
|
||||
|
||||
template <typename Arg1Type>
|
||||
inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker,
|
||||
std::shared_ptr<Arg1Type> *arg1) {
|
||||
bool is_object_ref;
|
||||
Serializer::Deserialize(unpacker, &is_object_ref);
|
||||
if (is_object_ref) {
|
||||
ObjectRef<Arg1Type> object_ref;
|
||||
Serializer::Deserialize(unpacker, &object_ref);
|
||||
*arg1 = object_ref.Get();
|
||||
} else {
|
||||
Serializer::Deserialize(unpacker, arg1);
|
||||
}
|
||||
inline void Arguments::WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args,
|
||||
ObjectRef<Arg1Type> &arg1) {
|
||||
/// Pass by reference.
|
||||
auto task_arg = new TaskArgByReference(arg1.ID(), rpc::Address());
|
||||
task_args->emplace_back(task_arg);
|
||||
}
|
||||
|
||||
template <typename Arg1Type, typename... OtherArgTypes>
|
||||
inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker,
|
||||
std::shared_ptr<Arg1Type> *arg1,
|
||||
std::shared_ptr<OtherArgTypes> *... args) {
|
||||
UnwrapArgs(unpacker, arg1);
|
||||
UnwrapArgs(unpacker, args...);
|
||||
inline void Arguments::WrapArgs(std::vector<std::unique_ptr<::ray::TaskArg>> *task_args,
|
||||
Arg1Type &arg1, OtherArgTypes &... args) {
|
||||
WrapArgs(task_args, arg1);
|
||||
WrapArgs(task_args, args...);
|
||||
}
|
||||
|
||||
inline void Arguments::UnwrapArgs(
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer, int &arg_index) {}
|
||||
|
||||
template <typename Arg1Type>
|
||||
inline void Arguments::UnwrapArgs(
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer, int &arg_index,
|
||||
std::shared_ptr<Arg1Type> *arg1) {
|
||||
std::shared_ptr<msgpack::sbuffer> sbuffer;
|
||||
auto arg_buffer = args_buffer[arg_index]->GetData();
|
||||
sbuffer = std::make_shared<msgpack::sbuffer>(arg_buffer->Size());
|
||||
/// TODO(Guyang Song): Avoid the memory copy.
|
||||
sbuffer->write(reinterpret_cast<const char *>(arg_buffer->Data()), arg_buffer->Size());
|
||||
msgpack::unpacker unpacker;
|
||||
unpacker.reserve_buffer(sbuffer->size());
|
||||
memcpy(unpacker.buffer(), sbuffer->data(), sbuffer->size());
|
||||
unpacker.buffer_consumed(sbuffer->size());
|
||||
Serializer::Deserialize(unpacker, arg1);
|
||||
arg_index++;
|
||||
}
|
||||
|
||||
template <typename Arg1Type, typename... OtherArgTypes>
|
||||
inline void Arguments::UnwrapArgs(
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer, int &arg_index,
|
||||
std::shared_ptr<Arg1Type> *arg1, std::shared_ptr<OtherArgTypes> *... args) {
|
||||
UnwrapArgs(args_buffer, arg_index, arg1);
|
||||
UnwrapArgs(args_buffer, arg_index, args...);
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
|
|
|
@ -10,13 +10,10 @@
|
|||
template <typename ReturnType, typename CastReturnType, typename... OtherArgTypes>
|
||||
std::shared_ptr<msgpack::sbuffer> ExecuteNormalFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer, TaskType task_type,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer, TaskType task_type,
|
||||
std::shared_ptr<OtherArgTypes> &... args) {
|
||||
msgpack::unpacker unpacker;
|
||||
unpacker.reserve_buffer(args_buffer->size());
|
||||
memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size());
|
||||
unpacker.buffer_consumed(args_buffer->size());
|
||||
Arguments::UnwrapArgs(unpacker, &args...);
|
||||
int arg_index = 0;
|
||||
Arguments::UnwrapArgs(args_buffer, arg_index, &args...);
|
||||
|
||||
ReturnType return_value;
|
||||
typedef ReturnType (*Func)(OtherArgTypes...);
|
||||
|
@ -33,7 +30,7 @@ std::shared_ptr<msgpack::sbuffer> ExecuteNormalFunction(
|
|||
template <typename ReturnType, typename ActorType, typename... OtherArgTypes>
|
||||
std::shared_ptr<msgpack::sbuffer> ExecuteActorFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> &actor_buffer,
|
||||
std::shared_ptr<OtherArgTypes> &... args) {
|
||||
msgpack::unpacker actor_unpacker;
|
||||
|
@ -44,11 +41,8 @@ std::shared_ptr<msgpack::sbuffer> ExecuteActorFunction(
|
|||
Serializer::Deserialize(actor_unpacker, &actor_ptr);
|
||||
ActorType *actor_object = (ActorType *)actor_ptr;
|
||||
|
||||
msgpack::unpacker unpacker;
|
||||
unpacker.reserve_buffer(args_buffer->size());
|
||||
memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size());
|
||||
unpacker.buffer_consumed(args_buffer->size());
|
||||
Arguments::UnwrapArgs(unpacker, &args...);
|
||||
int arg_index = 0;
|
||||
Arguments::UnwrapArgs(args_buffer, arg_index, &args...);
|
||||
|
||||
ReturnType return_value;
|
||||
typedef ReturnType (ActorType::*Func)(OtherArgTypes...);
|
||||
|
@ -68,7 +62,7 @@ std::shared_ptr<msgpack::sbuffer> ExecuteActorFunction(
|
|||
template <typename ReturnType>
|
||||
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
return ExecuteNormalFunction<ReturnType, ReturnType>(
|
||||
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK);
|
||||
}
|
||||
|
@ -77,7 +71,7 @@ std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
|
|||
template <typename ReturnType, typename Arg1Type>
|
||||
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
std::shared_ptr<Arg1Type> arg1_ptr;
|
||||
return ExecuteNormalFunction<ReturnType, ReturnType>(
|
||||
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK, arg1_ptr);
|
||||
|
@ -87,7 +81,7 @@ std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
|
|||
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
|
||||
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
std::shared_ptr<Arg1Type> arg1_ptr;
|
||||
std::shared_ptr<Arg2Type> arg2_ptr;
|
||||
return ExecuteNormalFunction<ReturnType, ReturnType>(
|
||||
|
@ -98,7 +92,7 @@ std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
|
|||
template <typename ReturnType>
|
||||
std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
return ExecuteNormalFunction<ReturnType, uintptr_t>(base_addr, func_offset, args_buffer,
|
||||
TaskType::ACTOR_CREATION_TASK);
|
||||
}
|
||||
|
@ -107,7 +101,7 @@ std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
|
|||
template <typename ReturnType, typename Arg1Type>
|
||||
std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
std::shared_ptr<Arg1Type> arg1_ptr;
|
||||
return ExecuteNormalFunction<ReturnType, uintptr_t>(
|
||||
base_addr, func_offset, args_buffer, TaskType::ACTOR_CREATION_TASK, arg1_ptr);
|
||||
|
@ -117,7 +111,7 @@ std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
|
|||
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
|
||||
std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer) {
|
||||
std::shared_ptr<Arg1Type> arg1_ptr;
|
||||
std::shared_ptr<Arg2Type> arg2_ptr;
|
||||
return ExecuteNormalFunction<ReturnType, uintptr_t>(base_addr, func_offset, args_buffer,
|
||||
|
@ -129,7 +123,7 @@ std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
|
|||
template <typename ReturnType, typename ActorType>
|
||||
std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> &actor_buffer) {
|
||||
return ExecuteActorFunction<ReturnType, ActorType>(base_addr, func_offset, args_buffer,
|
||||
actor_buffer);
|
||||
|
@ -139,7 +133,7 @@ std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
|
|||
template <typename ReturnType, typename ActorType, typename Arg1Type>
|
||||
std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> &actor_buffer) {
|
||||
std::shared_ptr<Arg1Type> arg1_ptr;
|
||||
return ExecuteActorFunction<ReturnType, ActorType>(base_addr, func_offset, args_buffer,
|
||||
|
@ -150,7 +144,7 @@ std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
|
|||
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
|
||||
std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::shared_ptr<msgpack::sbuffer> &args_buffer,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> &actor_buffer) {
|
||||
std::shared_ptr<Arg1Type> arg1_ptr;
|
||||
std::shared_ptr<Arg2Type> arg2_ptr;
|
||||
|
|
|
@ -37,11 +37,11 @@ class RayRuntime {
|
|||
int timeout_ms) = 0;
|
||||
|
||||
virtual ObjectID Call(const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args) = 0;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args) = 0;
|
||||
virtual ActorID CreateActor(const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args) = 0;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args) = 0;
|
||||
virtual ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
|
||||
std::shared_ptr<msgpack::sbuffer> args) = 0;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args) = 0;
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
|
@ -12,14 +12,14 @@ class TaskCaller {
|
|||
TaskCaller();
|
||||
|
||||
TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args);
|
||||
|
||||
ObjectRef<ReturnType> Remote();
|
||||
|
||||
private:
|
||||
RayRuntime *runtime_;
|
||||
RemoteFunctionPtrHolder ptr_;
|
||||
std::shared_ptr<msgpack::sbuffer> args_;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> args_;
|
||||
};
|
||||
|
||||
// ---------- implementation ----------
|
||||
|
@ -29,8 +29,8 @@ TaskCaller<ReturnType>::TaskCaller() {}
|
|||
|
||||
template <typename ReturnType>
|
||||
TaskCaller<ReturnType>::TaskCaller(RayRuntime *runtime, RemoteFunctionPtrHolder ptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args)
|
||||
: runtime_(runtime), ptr_(ptr), args_(args) {}
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &&args)
|
||||
: runtime_(runtime), ptr_(ptr), args_(std::move(args)) {}
|
||||
|
||||
template <typename ReturnType>
|
||||
ObjectRef<ReturnType> TaskCaller<ReturnType>::Remote() {
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
#include <ray/api.h>
|
||||
|
||||
/// using namespace
|
||||
using namespace ray::api;
|
||||
using namespace ::ray::api;
|
||||
|
||||
/// general function of user code
|
||||
int Return1() { return 1; }
|
||||
|
|
|
@ -74,7 +74,7 @@ WaitResult AbstractRayRuntime::Wait(const std::vector<ObjectID> &ids, int num_ob
|
|||
|
||||
InvocationSpec BuildInvocationSpec(TaskType task_type, std::string lib_name,
|
||||
const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args,
|
||||
const ActorID &actor) {
|
||||
InvocationSpec invocation_spec;
|
||||
invocation_spec.task_type = task_type;
|
||||
|
@ -83,27 +83,28 @@ InvocationSpec BuildInvocationSpec(TaskType task_type, std::string lib_name,
|
|||
invocation_spec.lib_name = lib_name;
|
||||
invocation_spec.fptr = fptr;
|
||||
invocation_spec.actor_id = actor;
|
||||
invocation_spec.args = args;
|
||||
invocation_spec.args = std::move(args);
|
||||
return invocation_spec;
|
||||
}
|
||||
|
||||
ObjectID AbstractRayRuntime::Call(const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args) {
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args) {
|
||||
auto invocation_spec = BuildInvocationSpec(
|
||||
TaskType::NORMAL_TASK, this->config_->lib_name, fptr, args, ActorID::Nil());
|
||||
return task_submitter_->SubmitTask(invocation_spec);
|
||||
}
|
||||
|
||||
ActorID AbstractRayRuntime::CreateActor(const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args) {
|
||||
ActorID AbstractRayRuntime::CreateActor(
|
||||
const RemoteFunctionPtrHolder &fptr,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args) {
|
||||
auto invocation_spec = BuildInvocationSpec(
|
||||
TaskType::ACTOR_CREATION_TASK, this->config_->lib_name, fptr, args, ActorID::Nil());
|
||||
return task_submitter_->CreateActor(invocation_spec);
|
||||
}
|
||||
|
||||
ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr,
|
||||
const ActorID &actor,
|
||||
std::shared_ptr<msgpack::sbuffer> args) {
|
||||
ObjectID AbstractRayRuntime::CallActor(
|
||||
const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args) {
|
||||
auto invocation_spec = BuildInvocationSpec(TaskType::ACTOR_TASK,
|
||||
this->config_->lib_name, fptr, args, actor);
|
||||
return task_submitter_->SubmitActorTask(invocation_spec);
|
||||
|
@ -115,13 +116,6 @@ const TaskID &AbstractRayRuntime::GetCurrentTaskId() {
|
|||
|
||||
const JobID &AbstractRayRuntime::GetCurrentJobID() { return worker_->GetCurrentJobID(); }
|
||||
|
||||
ActorID AbstractRayRuntime::GetNextActorID() {
|
||||
const int next_task_index = worker_->GetNextTaskIndex();
|
||||
const ActorID actor_id = ActorID::Of(worker_->GetCurrentJobID(),
|
||||
worker_->GetCurrentTaskID(), next_task_index);
|
||||
return actor_id;
|
||||
}
|
||||
|
||||
const std::unique_ptr<WorkerContext> &AbstractRayRuntime::GetWorkerContext() {
|
||||
return worker_;
|
||||
}
|
||||
|
|
|
@ -32,15 +32,13 @@ class AbstractRayRuntime : public RayRuntime {
|
|||
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
|
||||
|
||||
ObjectID Call(const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args);
|
||||
|
||||
ActorID CreateActor(const RemoteFunctionPtrHolder &fptr,
|
||||
std::shared_ptr<msgpack::sbuffer> args);
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args);
|
||||
|
||||
ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
|
||||
std::shared_ptr<msgpack::sbuffer> args);
|
||||
|
||||
ActorID GetNextActorID();
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> &args);
|
||||
|
||||
const TaskID &GetCurrentTaskId();
|
||||
|
||||
|
|
|
@ -19,5 +19,12 @@ LocalModeRayRuntime::LocalModeRayRuntime(std::shared_ptr<RayConfig> config) {
|
|||
task_submitter_ = std::unique_ptr<TaskSubmitter>(new LocalModeTaskSubmitter(*this));
|
||||
}
|
||||
|
||||
ActorID LocalModeRayRuntime::GetNextActorID() {
|
||||
const int next_task_index = worker_->GetNextTaskIndex();
|
||||
const ActorID actor_id = ActorID::Of(worker_->GetCurrentJobID(),
|
||||
worker_->GetCurrentTaskID(), next_task_index);
|
||||
return actor_id;
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
} // namespace ray
|
|
@ -12,6 +12,8 @@ namespace api {
|
|||
class LocalModeRayRuntime : public AbstractRayRuntime {
|
||||
public:
|
||||
LocalModeRayRuntime(std::shared_ptr<RayConfig> config);
|
||||
|
||||
ActorID GetNextActorID();
|
||||
};
|
||||
|
||||
} // namespace api
|
||||
|
|
|
@ -18,7 +18,7 @@ class InvocationSpec {
|
|||
int actor_counter;
|
||||
std::string lib_name;
|
||||
RemoteFunctionPtrHolder fptr;
|
||||
std::shared_ptr<msgpack::sbuffer> args;
|
||||
std::vector<std::unique_ptr<::ray::TaskArg>> args;
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
||||
|
|
|
@ -18,7 +18,7 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter(
|
|||
thread_pool_.reset(new boost::asio::thread_pool(10));
|
||||
}
|
||||
|
||||
ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) {
|
||||
ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation) {
|
||||
/// TODO(Guyang Song): Make the infomation of TaskSpecification more reasonable
|
||||
/// We just reuse the TaskSpecification class and make the single process mode work.
|
||||
/// Maybe some infomation of TaskSpecification are not reasonable or invalid.
|
||||
|
@ -47,6 +47,7 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) {
|
|||
PlacementGroupID::Nil(), true);
|
||||
if (invocation.task_type == TaskType::NORMAL_TASK) {
|
||||
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
|
||||
invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID();
|
||||
builder.SetActorCreationTaskSpec(invocation.actor_id);
|
||||
} else if (invocation.task_type == TaskType::ACTOR_TASK) {
|
||||
const TaskID actor_creation_task_id =
|
||||
|
@ -58,13 +59,9 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) {
|
|||
} else {
|
||||
throw RayException("unknown task type");
|
||||
}
|
||||
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
|
||||
reinterpret_cast<uint8_t *>(invocation.args->data()), invocation.args->size(),
|
||||
true);
|
||||
/// TODO(Guyang Song): Use both 'AddByRefArg' and 'AddByValueArg' to distinguish
|
||||
auto arg = TaskArgByValue(
|
||||
std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector<ObjectID>()));
|
||||
builder.AddArg(arg);
|
||||
for (size_t i = 0; i < invocation.args.size(); i++) {
|
||||
builder.AddArg(*invocation.args[i]);
|
||||
}
|
||||
auto task_specification = builder.Build();
|
||||
ObjectID return_object_id = task_specification.ReturnId(0);
|
||||
|
||||
|
@ -81,47 +78,34 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation) {
|
|||
/// TODO(Guyang Song): Handle task dependencies.
|
||||
/// Execute actor task directly in the main thread because we must guarantee the actor
|
||||
/// task executed by calling order.
|
||||
TaskExecutor::Invoke(task_specification, actor, runtime, dynamic_library_base_addr);
|
||||
TaskExecutor::Invoke(task_specification, actor, runtime, dynamic_library_base_addr,
|
||||
actor_contexts_, actor_contexts_mutex_);
|
||||
} else {
|
||||
boost::asio::post(*thread_pool_.get(),
|
||||
std::bind(
|
||||
[actor, mutex, runtime](TaskSpecification &ts) {
|
||||
[actor, mutex, runtime, this](TaskSpecification &ts) {
|
||||
if (mutex) {
|
||||
absl::MutexLock lock(mutex.get());
|
||||
}
|
||||
TaskExecutor::Invoke(ts, actor, runtime,
|
||||
dynamic_library_base_addr);
|
||||
TaskExecutor::Invoke(
|
||||
ts, actor, runtime, dynamic_library_base_addr,
|
||||
this->actor_contexts_, this->actor_contexts_mutex_);
|
||||
},
|
||||
std::move(task_specification)));
|
||||
}
|
||||
return return_object_id;
|
||||
}
|
||||
|
||||
ObjectID LocalModeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) {
|
||||
ObjectID LocalModeTaskSubmitter::SubmitTask(InvocationSpec &invocation) {
|
||||
return Submit(invocation);
|
||||
}
|
||||
|
||||
ActorID LocalModeTaskSubmitter::CreateActor(const InvocationSpec &invocation) {
|
||||
if (dynamic_library_base_addr == 0) {
|
||||
dynamic_library_base_addr =
|
||||
GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer);
|
||||
}
|
||||
ActorID id = local_mode_ray_tuntime_.GetNextActorID();
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
|
||||
ExecFunction exec_function = (ExecFunction)(invocation.fptr.exec_function_pointer);
|
||||
auto data = (*exec_function)(
|
||||
dynamic_library_base_addr,
|
||||
(size_t)(invocation.fptr.function_pointer - dynamic_library_base_addr),
|
||||
invocation.args);
|
||||
std::unique_ptr<ActorContext> actorContext(new ActorContext());
|
||||
actorContext->current_actor = data;
|
||||
absl::MutexLock lock(&actor_contexts_mutex_);
|
||||
actor_contexts_.emplace(id, std::move(actorContext));
|
||||
return id;
|
||||
ActorID LocalModeTaskSubmitter::CreateActor(InvocationSpec &invocation) {
|
||||
Submit(invocation);
|
||||
return invocation.actor_id;
|
||||
}
|
||||
|
||||
ObjectID LocalModeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) {
|
||||
ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation) {
|
||||
return Submit(invocation);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,11 +18,11 @@ class LocalModeTaskSubmitter : public TaskSubmitter {
|
|||
public:
|
||||
LocalModeTaskSubmitter(LocalModeRayRuntime &local_mode_ray_tuntime);
|
||||
|
||||
ObjectID SubmitTask(const InvocationSpec &invocation);
|
||||
ObjectID SubmitTask(InvocationSpec &invocation);
|
||||
|
||||
ActorID CreateActor(const InvocationSpec &invocation);
|
||||
ActorID CreateActor(InvocationSpec &invocation);
|
||||
|
||||
ObjectID SubmitActorTask(const InvocationSpec &invocation);
|
||||
ObjectID SubmitActorTask(InvocationSpec &invocation);
|
||||
|
||||
private:
|
||||
std::unordered_map<ActorID, std::unique_ptr<ActorContext>> actor_contexts_;
|
||||
|
@ -33,7 +33,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {
|
|||
|
||||
LocalModeRayRuntime &local_mode_ray_tuntime_;
|
||||
|
||||
ObjectID Submit(const InvocationSpec &invocation);
|
||||
ObjectID Submit(InvocationSpec &invocation);
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
|
@ -8,7 +8,7 @@
|
|||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
RayFunction BuildRayFunction(const InvocationSpec &invocation) {
|
||||
RayFunction BuildRayFunction(InvocationSpec &invocation) {
|
||||
auto base_addr =
|
||||
GetBaseAddressOfLibraryFromAddr((void *)invocation.fptr.function_pointer);
|
||||
auto func_offset = (size_t)(invocation.fptr.function_pointer - base_addr);
|
||||
|
@ -18,41 +18,25 @@ RayFunction BuildRayFunction(const InvocationSpec &invocation) {
|
|||
return RayFunction(Language::CPP, function_descriptor);
|
||||
}
|
||||
|
||||
void BuildTaskArgs(const InvocationSpec &invocation,
|
||||
std::vector<std::unique_ptr<ray::TaskArg>> &args) {
|
||||
if (invocation.args->size() > 0) {
|
||||
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
|
||||
reinterpret_cast<uint8_t *>(invocation.args->data()), invocation.args->size(),
|
||||
true);
|
||||
auto task_arg = new TaskArgByValue(
|
||||
std::make_shared<::ray::RayObject>(buffer, nullptr, std::vector<ObjectID>()));
|
||||
args.emplace_back(task_arg);
|
||||
}
|
||||
}
|
||||
|
||||
ObjectID NativeTaskSubmitter::Submit(const InvocationSpec &invocation) {
|
||||
std::vector<std::unique_ptr<ray::TaskArg>> args;
|
||||
BuildTaskArgs(invocation, args);
|
||||
ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation) {
|
||||
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
|
||||
std::vector<ObjectID> return_ids;
|
||||
if (invocation.task_type == TaskType::ACTOR_TASK) {
|
||||
core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation), args,
|
||||
TaskOptions(), &return_ids);
|
||||
core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation),
|
||||
invocation.args, TaskOptions(), &return_ids);
|
||||
} else {
|
||||
core_worker.SubmitTask(BuildRayFunction(invocation), args, TaskOptions(), &return_ids,
|
||||
1, std::make_pair(PlacementGroupID::Nil(), -1), true);
|
||||
core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, TaskOptions(),
|
||||
&return_ids, 1, std::make_pair(PlacementGroupID::Nil(), -1),
|
||||
true);
|
||||
}
|
||||
return return_ids[0];
|
||||
}
|
||||
|
||||
ObjectID NativeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) {
|
||||
ObjectID NativeTaskSubmitter::SubmitTask(InvocationSpec &invocation) {
|
||||
return Submit(invocation);
|
||||
}
|
||||
|
||||
ActorID NativeTaskSubmitter::CreateActor(const InvocationSpec &invocation) {
|
||||
std::vector<std::unique_ptr<ray::TaskArg>> args;
|
||||
BuildTaskArgs(invocation, args);
|
||||
|
||||
ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation) {
|
||||
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
|
||||
|
||||
std::unordered_map<std::string, double> resources;
|
||||
|
@ -67,15 +51,15 @@ ActorID NativeTaskSubmitter::CreateActor(const InvocationSpec &invocation) {
|
|||
name,
|
||||
/*is_asyncio=*/false};
|
||||
ActorID actor_id;
|
||||
auto status = core_worker.CreateActor(BuildRayFunction(invocation), args, actor_options,
|
||||
"", &actor_id);
|
||||
auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args,
|
||||
actor_options, "", &actor_id);
|
||||
if (!status.ok()) {
|
||||
throw RayException("Create actor error");
|
||||
}
|
||||
return actor_id;
|
||||
}
|
||||
|
||||
ObjectID NativeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) {
|
||||
ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation) {
|
||||
return Submit(invocation);
|
||||
}
|
||||
|
||||
|
|
|
@ -10,14 +10,14 @@ namespace api {
|
|||
|
||||
class NativeTaskSubmitter : public TaskSubmitter {
|
||||
public:
|
||||
ObjectID SubmitTask(const InvocationSpec &invocation);
|
||||
ObjectID SubmitTask(InvocationSpec &invocation);
|
||||
|
||||
ActorID CreateActor(const InvocationSpec &invocation);
|
||||
ActorID CreateActor(InvocationSpec &invocation);
|
||||
|
||||
ObjectID SubmitActorTask(const InvocationSpec &invocation);
|
||||
ObjectID SubmitActorTask(InvocationSpec &invocation);
|
||||
|
||||
private:
|
||||
ObjectID Submit(const InvocationSpec &invocation);
|
||||
ObjectID Submit(InvocationSpec &invocation);
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
|
@ -17,7 +17,7 @@ TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
|
|||
|
||||
// TODO(Guyang Song): Make a common task execution function used for both local mode and
|
||||
// cluster mode.
|
||||
std::unique_ptr<ObjectID> TaskExecutor::Execute(const InvocationSpec &invocation) {
|
||||
std::unique_ptr<ObjectID> TaskExecutor::Execute(InvocationSpec &invocation) {
|
||||
abstract_ray_tuntime_.GetWorkerContext();
|
||||
return std::unique_ptr<ObjectID>(new ObjectID());
|
||||
};
|
||||
|
@ -25,7 +25,7 @@ std::unique_ptr<ObjectID> TaskExecutor::Execute(const InvocationSpec &invocation
|
|||
Status TaskExecutor::ExecuteTask(
|
||||
TaskType task_type, const std::string task_name, const RayFunction &ray_function,
|
||||
const std::unordered_map<std::string, double> &required_resources,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
const std::vector<ObjectID> &arg_reference_ids,
|
||||
const std::vector<ObjectID> &return_ids,
|
||||
std::vector<std::shared_ptr<RayObject>> *results) {
|
||||
|
@ -38,39 +38,32 @@ Status TaskExecutor::ExecuteTask(
|
|||
std::string lib_name = typed_descriptor->LibName();
|
||||
std::string func_offset = typed_descriptor->FunctionOffset();
|
||||
std::string exec_func_offset = typed_descriptor->ExecFunctionOffset();
|
||||
std::shared_ptr<msgpack::sbuffer> args_sbuffer;
|
||||
if (args.size() > 0) {
|
||||
auto args_buffer = args[0]->GetData();
|
||||
args_sbuffer = std::make_shared<msgpack::sbuffer>(args_buffer->Size());
|
||||
/// TODO(Guyang Song): Avoid the memory copy.
|
||||
args_sbuffer->write(reinterpret_cast<const char *>(args_buffer->Data()),
|
||||
args_buffer->Size());
|
||||
} else {
|
||||
args_sbuffer = std::make_shared<msgpack::sbuffer>();
|
||||
}
|
||||
auto base_addr = FunctionHelper::GetInstance().GetBaseAddress(lib_name);
|
||||
std::shared_ptr<msgpack::sbuffer> data = nullptr;
|
||||
if (task_type == TaskType::ACTOR_CREATION_TASK) {
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer);
|
||||
ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_sbuffer);
|
||||
args_buffer);
|
||||
current_actor_ = data;
|
||||
} else if (task_type == TaskType::ACTOR_TASK) {
|
||||
RAY_CHECK(current_actor_ != nullptr);
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args,
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> object);
|
||||
ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_sbuffer, current_actor_);
|
||||
args_buffer, current_actor_);
|
||||
} else { // NORMAL_TASK
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
const std::vector<std::shared_ptr<RayObject>> &args_buffer);
|
||||
ExecFunction exec_function = (ExecFunction)(base_addr + std::stoul(exec_func_offset));
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_sbuffer);
|
||||
args_buffer);
|
||||
}
|
||||
|
||||
std::vector<size_t> data_sizes;
|
||||
|
@ -95,34 +88,56 @@ Status TaskExecutor::ExecuteTask(
|
|||
return ray::Status::OK();
|
||||
}
|
||||
|
||||
void TaskExecutor::Invoke(const TaskSpecification &task_spec,
|
||||
std::shared_ptr<msgpack::sbuffer> actor,
|
||||
AbstractRayRuntime *runtime, const uintptr_t base_addr) {
|
||||
auto args = std::make_shared<msgpack::sbuffer>(task_spec.ArgDataSize(0));
|
||||
/// TODO(Guyang Song): Avoid the memory copy.
|
||||
args->write(reinterpret_cast<const char *>(task_spec.ArgData(0)),
|
||||
task_spec.ArgDataSize(0));
|
||||
void TaskExecutor::Invoke(
|
||||
const TaskSpecification &task_spec, std::shared_ptr<msgpack::sbuffer> actor,
|
||||
AbstractRayRuntime *runtime, const uintptr_t base_addr,
|
||||
std::unordered_map<ActorID, std::unique_ptr<ActorContext>> &actor_contexts,
|
||||
absl::Mutex &actor_contexts_mutex) {
|
||||
std::vector<std::shared_ptr<RayObject>> args_buffer;
|
||||
for (size_t i = 0; i < task_spec.NumArgs(); i++) {
|
||||
std::shared_ptr<::ray::LocalMemoryBuffer> memory_buffer = nullptr;
|
||||
if (task_spec.ArgByRef(i)) {
|
||||
auto arg = runtime->Get(task_spec.ArgId(i));
|
||||
memory_buffer = std::make_shared<::ray::LocalMemoryBuffer>(
|
||||
reinterpret_cast<uint8_t *>(arg->data()), arg->size(), true);
|
||||
} else {
|
||||
memory_buffer = std::make_shared<::ray::LocalMemoryBuffer>(
|
||||
const_cast<uint8_t *>(task_spec.ArgData(i)), task_spec.ArgDataSize(i), true);
|
||||
}
|
||||
args_buffer.emplace_back(
|
||||
std::make_shared<RayObject>(memory_buffer, nullptr, std::vector<ObjectID>()));
|
||||
}
|
||||
|
||||
auto function_descriptor = task_spec.FunctionDescriptor();
|
||||
auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
|
||||
std::shared_ptr<msgpack::sbuffer> data;
|
||||
if (actor) {
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args,
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::vector<std::shared_ptr<RayObject>> & args_buffer,
|
||||
std::shared_ptr<msgpack::sbuffer> object);
|
||||
unsigned long offset = std::stoul(typed_descriptor->ExecFunctionOffset());
|
||||
auto address = base_addr + offset;
|
||||
ExecFunction exec_function = (ExecFunction)(address);
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args, actor);
|
||||
args_buffer, actor);
|
||||
} else {
|
||||
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
|
||||
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
|
||||
uintptr_t base_addr, size_t func_offset,
|
||||
std::vector<std::shared_ptr<RayObject>> & args_buffer);
|
||||
ExecFunction exec_function =
|
||||
(ExecFunction)(base_addr + std::stoul(typed_descriptor->ExecFunctionOffset()));
|
||||
data =
|
||||
(*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()), args);
|
||||
data = (*exec_function)(base_addr, std::stoul(typed_descriptor->FunctionOffset()),
|
||||
args_buffer);
|
||||
}
|
||||
if (task_spec.IsActorCreationTask()) {
|
||||
std::unique_ptr<ActorContext> actorContext(new ActorContext());
|
||||
actorContext->current_actor = data;
|
||||
absl::MutexLock lock(&actor_contexts_mutex);
|
||||
actor_contexts.emplace(task_spec.ActorCreationId(), std::move(actorContext));
|
||||
} else {
|
||||
runtime->Put(std::move(data), task_spec.ReturnId(0));
|
||||
}
|
||||
runtime->Put(std::move(data), task_spec.ReturnId(0));
|
||||
}
|
||||
|
||||
} // namespace api
|
||||
|
|
|
@ -25,11 +25,13 @@ class TaskExecutor {
|
|||
TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_);
|
||||
|
||||
/// TODO(Guyang Song): support multiple tasks execution
|
||||
std::unique_ptr<ObjectID> Execute(const InvocationSpec &invocation);
|
||||
std::unique_ptr<ObjectID> Execute(InvocationSpec &invocation);
|
||||
|
||||
static void Invoke(const TaskSpecification &task_spec,
|
||||
std::shared_ptr<msgpack::sbuffer> actor, AbstractRayRuntime *runtime,
|
||||
const uintptr_t base_addr);
|
||||
static void Invoke(
|
||||
const TaskSpecification &task_spec, std::shared_ptr<msgpack::sbuffer> actor,
|
||||
AbstractRayRuntime *runtime, const uintptr_t base_addr,
|
||||
std::unordered_map<ActorID, std::unique_ptr<ActorContext>> &actor_contexts,
|
||||
absl::Mutex &actor_contexts_mutex);
|
||||
|
||||
static Status ExecuteTask(
|
||||
TaskType task_type, const std::string task_name, const RayFunction &ray_function,
|
||||
|
|
|
@ -15,11 +15,11 @@ class TaskSubmitter {
|
|||
|
||||
virtual ~TaskSubmitter(){};
|
||||
|
||||
virtual ObjectID SubmitTask(const InvocationSpec &invocation) = 0;
|
||||
virtual ObjectID SubmitTask(InvocationSpec &invocation) = 0;
|
||||
|
||||
virtual ActorID CreateActor(const InvocationSpec &invocation) = 0;
|
||||
virtual ActorID CreateActor(InvocationSpec &invocation) = 0;
|
||||
|
||||
virtual ObjectID SubmitActorTask(const InvocationSpec &invocation) = 0;
|
||||
virtual ObjectID SubmitActorTask(InvocationSpec &invocation) = 0;
|
||||
};
|
||||
} // namespace api
|
||||
} // namespace ray
|
|
@ -4,7 +4,7 @@
|
|||
#include <ray/api/ray_config.h>
|
||||
#include <ray/experimental/default_worker.h>
|
||||
|
||||
using namespace ray::api;
|
||||
using namespace ::ray::api;
|
||||
|
||||
/// general function of user code
|
||||
int Return1() { return 1; }
|
||||
|
@ -63,6 +63,11 @@ TEST(RayClusterModeTest, FullTest) {
|
|||
int actor_task_result2 = *(Ray::Get(actor_object2));
|
||||
EXPECT_EQ(6, actor_task_result2);
|
||||
|
||||
/// actor task with args which pass by reference
|
||||
ActorHandle<Counter> actor3 = Ray::Actor(Counter::FactoryCreate, 6).Remote();
|
||||
auto actor_object3 = actor3.Task(&Counter::Add, actor_object2).Remote();
|
||||
int actor_task_result3 = *(Ray::Get(actor_object3));
|
||||
EXPECT_EQ(12, actor_task_result3);
|
||||
Ray::Shutdown();
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
#include <ray/api/ray_config.h>
|
||||
#include <ray/util/logging.h>
|
||||
|
||||
using namespace ::ray;
|
||||
|
||||
namespace ray {
|
||||
namespace api {
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue