Basic C++ worker implementation (#6125)

This commit is contained in:
SongGuyang 2020-03-27 23:01:08 +08:00 committed by GitHub
parent 93b5c38b7d
commit c195dc8f88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 2464 additions and 1 deletions

View file

@ -298,6 +298,20 @@ matrix:
- if [ $RAY_CI_RLLIB_FULL_AFFECTED != "1" ]; then exit; fi
- ./ci/keep_alive bazel test --build_tests_only --test_tag_filters=tests_dir_J,tests_dir_K,tests_dir_L,tests_dir_M,tests_dir_N,tests_dir_O,tests_dir_P,tests_dir_Q,tests_dir_R,tests_dir_S,tests_dir_T,tests_dir_U,tests_dir_V,tests_dir_W,tests_dir_X,tests_dir_Y,tests_dir_Z --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors rllib/...
# Cpp worker test
- os: linux
env:
- TESTSUITE=cpp_worker
- PYTHON=3.6
install:
- eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py`
- ./ci/travis/install-bazel.sh
- ./ci/suppress_output ./ci/travis/install-dependencies.sh
- export PATH="$HOME/miniconda/bin:$PATH"
- ./ci/suppress_output ./ci/travis/install-ray.sh
script:
- bazel test //cpp:all --build_tests_only --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=streamed
install:
- eval `python $TRAVIS_BUILD_DIR/ci/travis/determine_tests_to_run.py`
- if [ $RAY_CI_SERVE_AFFECTED != "1" ] && [ $RAY_CI_TUNE_AFFECTED != "1" ] && [ $RAY_CI_RLLIB_AFFECTED != "1" ] && [ $RAY_CI_PYTHON_AFFECTED != "1" ]; then exit; fi

View file

@ -270,6 +270,7 @@ cc_library(
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_googletest//:gtest",
"@msgpack",
"@plasma//:plasma_client",
],
)

59
bazel/BUILD.msgpack Normal file
View file

@ -0,0 +1,59 @@
filegroup(
name = "c_headers",
srcs = glob([
"include/*.h",
"include/**/*.h",
"include/**/**/*.h",
"include/**/**/**/*.h",
],
exclude = [
"include/msgpack.h",
],
),
)
filegroup(
name = "cxx_headers",
srcs = glob([
"include/*.hpp",
"include/**/*.hpp",
"include/**/**/*.hpp",
"include/**/**/**/*.hpp",
],
exclude = [
"include/msgpack.hpp",
],
),
)
filegroup(
name = "source_files",
srcs = [
"src/objectc.c",
"src/unpack.c",
"src/version.c",
"src/vrefbuffer.c",
"src/zone.c",
],
)
cc_library(
name = "msgpack",
srcs = [
":c_headers",
":cxx_headers",
":source_files",
],
hdrs = [
"include/msgpack.h",
"include/msgpack.hpp",
],
includes = [
"include",
],
strip_include_prefix = "include",
copts = [
#"-std=c++11",
],
visibility = ["//visibility:public"],
)

View file

@ -234,3 +234,10 @@ def ray_deps_setup():
url = "https://github.com/rules-proto-grpc/rules_proto_grpc/archive/a74fef39c5fe636580083545f76d1eab74f6450d.tar.gz",
sha256 = "2f6606151ec042e23396f07de9e7dcf6ca9a5db1d2b09f0cc93a7fc7f4008d1b",
)
auto_http_archive(
name = "msgpack",
build_file = True,
url = "https://github.com/msgpack/msgpack-c/archive/8085ab8721090a447cf98bb802d1406ad7afe420.tar.gz",
sha256 = "83c37c9ad926bbee68d564d9f53c6cbb057c1f755c264043ddd87d89e36d15bb",
)

View file

@ -45,6 +45,6 @@ done
pushd $ROOT_DIR/../..
BAZEL_FILES="bazel/BUILD bazel/BUILD.plasma bazel/ray.bzl BUILD.bazel java/BUILD.bazel
streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE"
cpp/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE"
buildifier -mode=$RUN_TYPE -diff_command="diff -u" $BAZEL_FILES
popd

87
cpp/BUILD.bazel Normal file
View file

@ -0,0 +1,87 @@
# Bazel build
# C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html
load("//bazel:ray.bzl", "COPTS")
cc_library(
name = "libray_api_header",
hdrs = glob([
"include/ray/*.h",
"include/ray/**/*.h",
"include/ray/**/**/*.h",
]),
copts = COPTS,
strip_include_prefix = "include",
deps = [
"//:core_worker_lib",
"//:ray_common",
"@msgpack",
],
)
cc_binary(
name = "libray_api.so",
srcs = glob([
"src/ray/api.cc",
"src/ray/api/*.cc",
"src/ray/api/*.h",
"src/ray/app/*.cc",
"src/ray/app/*.h",
"src/ray/runtime/*.cc",
"src/ray/runtime/*.h",
"src/ray/runtime/**/*.cc",
"src/ray/runtime/**/*.h",
"src/ray/runtime/task/*.cc",
"src/ray/runtime/task/*.h",
"src/ray/util/*.cc",
"src/ray/util/*.h",
"src/ray/*.cc",
"src/ray/*.h",
]),
copts = COPTS,
linkopts = ["-ldl"],
linkshared = 1,
linkstatic = False,
visibility = ["//visibility:public"],
deps = [
"libray_api_header",
"//:core_worker_lib",
"//:ray_common",
"//:ray_util",
"@boost//:asio",
"@boost//:thread",
"@com_google_absl//absl/synchronization",
"@msgpack",
],
)
cc_import(
name = "ray_api",
shared_library = "libray_api.so",
)
cc_binary(
name = "example",
testonly = 1,
srcs = [
"src/example/example.cc",
],
copts = COPTS,
deps = [
"libray_api_header",
"ray_api",
],
)
cc_test(
name = "api_test",
srcs = glob([
"src/ray/test/*.cc",
]),
copts = COPTS,
deps = [
"libray_api_header",
"ray_api",
"@com_google_googletest//:gtest_main",
],
)

231
cpp/include/ray/api.h Normal file
View file

@ -0,0 +1,231 @@
#pragma once
#include <memory>
#include <ray/api/generated/actor_funcs.generated.h>
#include <ray/api/generated/create_funcs.generated.h>
#include <ray/api/generated/funcs.generated.h>
#include <ray/api/ray_runtime.h>
#include <msgpack.hpp>
#include "ray/core.h"
namespace ray {
namespace api {
template <typename T>
class RayObject;
template <typename T>
class RayActor;
class WaitResult;
class Ray {
public:
/// Initialize Ray runtime.
static void Init();
/// Store an object in the object store.
///
/// \param[in] obj The object which should be stored.
/// \return RayObject A reference to the object in the object store.
template <typename T>
static RayObject<T> Put(const T &obj);
/// Get a list of objects from the object store.
/// This method will be blocked until all the objects are ready.
///
/// \param[in] ids The object id array which should be got.
/// \return shared pointer array of the result.
template <typename T>
static std::vector<std::shared_ptr<T>> Get(const std::vector<ObjectID> &ids);
/// Get a list of objects from the object store.
/// This method will be blocked until all the objects are ready.
///
/// \param[in] objects The object array which should be got.
/// \return shared pointer array of the result.
template <typename T>
static std::vector<std::shared_ptr<T>> Get(const std::vector<RayObject<T>> &ids);
/// Wait for a list of RayObjects to be locally available,
/// until specified number of objects are ready, or specified timeout has passed.
///
/// \param[in] ids The object id array which should be waited.
/// \param[in] num_objects The minimum number of objects to wait.
/// \param[in] timeout_ms The maximum wait time in milliseconds.
/// \return Two arrays, one containing locally available objects, one containing the
/// rest.
static WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms);
/// Include the `Call` methods for calling remote functions.
#include "api/generated/call_funcs.generated.h"
/// Include the `CreateActor` methods for creating actors.
#include "api/generated/create_actors.generated.h"
private:
static RayRuntime *runtime_;
static std::once_flag is_inited_;
/// Used by RayObject to implement .Get()
template <typename T>
static std::shared_ptr<T> Get(const RayObject<T> &object);
template <typename ReturnType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
static RayObject<ReturnType> CallInternal(FuncType &func, ExecFuncType &exec_func,
ArgTypes &... args);
template <typename ReturnType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
static RayActor<ReturnType> CreateActorInternal(FuncType &func, ExecFuncType &exec_func,
ArgTypes &... args);
template <typename ReturnType, typename ActorType, typename FuncType,
typename ExecFuncType, typename... ArgTypes>
static RayObject<ReturnType> CallActorInternal(FuncType &actor_func,
ExecFuncType &exec_func,
RayActor<ActorType> &actor,
ArgTypes &... args);
/// Include the `Call` methods for calling actor methods.
/// Used by RayActor to implement .Call()
#include "api/generated/call_actors.generated.h"
template <typename T>
friend class RayObject;
template <typename ActorType>
friend class RayActor;
};
} // namespace api
} // namespace ray
// --------- inline implementation ------------
#include <ray/api/arguments.h>
#include <ray/api/ray_actor.h>
#include <ray/api/ray_object.h>
#include <ray/api/serializer.h>
#include <ray/api/wait_result.h>
namespace ray {
namespace api {
template <typename T>
inline static std::vector<ObjectID> RayObjectsToObjectIDs(
const std::vector<RayObject<T>> &ray_objects) {
std::vector<ObjectID> object_ids;
for (auto it = ray_objects.begin(); it != ray_objects.end(); it++) {
object_ids.push_back(it->ID());
}
return object_ids;
}
template <typename T>
inline RayObject<T> Ray::Put(const T &obj) {
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
Serializer::Serialize(packer, obj);
auto id = runtime_->Put(buffer);
return RayObject<T>(id);
}
template <typename T>
inline std::shared_ptr<T> Ray::Get(const RayObject<T> &object) {
auto packed_object = runtime_->Get(object.ID());
msgpack::unpacker unpacker;
unpacker.reserve_buffer(packed_object->size());
memcpy(unpacker.buffer(), packed_object->data(), packed_object->size());
unpacker.buffer_consumed(packed_object->size());
std::shared_ptr<T> return_object(new T);
Serializer::Deserialize(unpacker, return_object.get());
return return_object;
}
template <typename T>
inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<ObjectID> &ids) {
auto result = runtime_->Get(ids);
std::vector<std::shared_ptr<T>> return_objects;
return_objects.reserve(result.size());
for (auto it = result.begin(); it != result.end(); it++) {
msgpack::unpacker unpacker;
unpacker.reserve_buffer((*it)->size());
memcpy(unpacker.buffer(), (*it)->data(), (*it)->size());
unpacker.buffer_consumed((*it)->size());
std::shared_ptr<T> obj(new T);
Serializer::Deserialize(unpacker, obj.get());
return_objects.push_back(obj);
}
return return_objects;
}
template <typename T>
inline std::vector<std::shared_ptr<T>> Ray::Get(const std::vector<RayObject<T>> &ids) {
auto object_ids = RayObjectsToObjectIDs<T>(ids);
return Get<T>(object_ids);
}
inline WaitResult Ray::Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) {
return runtime_->Wait(ids, num_objects, timeout_ms);
}
template <typename ReturnType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
inline RayObject<ReturnType> Ray::CallInternal(FuncType &func, ExecFuncType &exec_func,
ArgTypes &... args) {
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
Arguments::WrapArgs(packer, args...);
RemoteFunctionPtrHolder ptr;
ptr.function_pointer = reinterpret_cast<uintptr_t>(func);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
auto returned_object_id = runtime_->Call(ptr, buffer);
return RayObject<ReturnType>(returned_object_id);
}
template <typename ReturnType, typename FuncType, typename ExecFuncType,
typename... ArgTypes>
inline RayActor<ReturnType> Ray::CreateActorInternal(FuncType &create_func,
ExecFuncType &exec_func,
ArgTypes &... args) {
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
Arguments::WrapArgs(packer, args...);
RemoteFunctionPtrHolder ptr;
ptr.function_pointer = reinterpret_cast<uintptr_t>(create_func);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
auto returned_actor_id = runtime_->CreateActor(ptr, buffer);
return RayActor<ReturnType>(returned_actor_id);
}
template <typename ReturnType, typename ActorType, typename FuncType,
typename ExecFuncType, typename... ArgTypes>
inline RayObject<ReturnType> Ray::CallActorInternal(FuncType &actor_func,
ExecFuncType &exec_func,
RayActor<ActorType> &actor,
ArgTypes &... args) {
std::shared_ptr<msgpack::sbuffer> buffer(new msgpack::sbuffer());
msgpack::packer<msgpack::sbuffer> packer(buffer.get());
Arguments::WrapArgs(packer, args...);
RemoteFunctionPtrHolder ptr;
MemberFunctionPtrHolder holder = *(MemberFunctionPtrHolder *)(&actor_func);
ptr.function_pointer = reinterpret_cast<uintptr_t>(holder.value[0]);
ptr.exec_function_pointer = reinterpret_cast<uintptr_t>(exec_func);
auto returned_object_id = runtime_->CallActor(ptr, actor.ID(), buffer);
return RayObject<ReturnType>(returned_object_id);
}
#include <ray/api/generated/exec_funcs.generated.h>
#include <ray/api/generated/call_funcs_impl.generated.h>
#include <ray/api/generated/create_actors_impl.generated.h>
#include <ray/api/generated/call_actors_impl.generated.h>
} // namespace api
} // namespace ray

View file

@ -0,0 +1,84 @@
#pragma once
#include <ray/api/serializer.h>
#include <msgpack.hpp>
namespace ray {
namespace api {
class Arguments {
public:
static void WrapArgs(msgpack::packer<msgpack::sbuffer> &packer);
template <typename Arg1Type>
static void WrapArgs(msgpack::packer<msgpack::sbuffer> &packer, Arg1Type &arg1);
template <typename Arg1Type, typename... OtherArgTypes>
static void WrapArgs(msgpack::packer<msgpack::sbuffer> &packer, Arg1Type &arg1,
OtherArgTypes &... args);
static void UnwrapArgs(msgpack::unpacker &unpacker);
template <typename Arg1Type>
static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr<Arg1Type> *arg1);
template <typename Arg1Type, typename... OtherArgTypes>
static void UnwrapArgs(msgpack::unpacker &unpacker, std::shared_ptr<Arg1Type> *arg1,
std::shared_ptr<OtherArgTypes> *... args);
};
// --------- inline implementation ------------
#include <typeinfo>
inline void Arguments::WrapArgs(msgpack::packer<msgpack::sbuffer> &packer) {}
template <typename Arg1Type>
inline void Arguments::WrapArgs(msgpack::packer<msgpack::sbuffer> &packer,
Arg1Type &arg1) {
/// Notice RayObjectClassPrefix should be modified by RayObject class name or namespace.
static const std::string RayObjectClassPrefix = "N3ray3api9RayObject";
std::string type_name = typeid(arg1).name();
if (type_name.rfind(RayObjectClassPrefix, 0) == 0) {
/// Pass by reference.
Serializer::Serialize(packer, true);
} else {
/// Pass by value.
Serializer::Serialize(packer, false);
}
Serializer::Serialize(packer, arg1);
}
template <typename Arg1Type, typename... OtherArgTypes>
inline void Arguments::WrapArgs(msgpack::packer<msgpack::sbuffer> &packer, Arg1Type &arg1,
OtherArgTypes &... args) {
WrapArgs(packer, arg1);
WrapArgs(packer, args...);
}
inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker) {}
template <typename Arg1Type>
inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker,
std::shared_ptr<Arg1Type> *arg1) {
bool is_ray_object;
Serializer::Deserialize(unpacker, &is_ray_object);
if (is_ray_object) {
RayObject<Arg1Type> ray_object;
Serializer::Deserialize(unpacker, &ray_object);
*arg1 = ray_object.Get();
} else {
Serializer::Deserialize(unpacker, arg1);
}
}
template <typename Arg1Type, typename... OtherArgTypes>
inline void Arguments::UnwrapArgs(msgpack::unpacker &unpacker,
std::shared_ptr<Arg1Type> *arg1,
std::shared_ptr<OtherArgTypes> *... args) {
UnwrapArgs(unpacker, arg1);
UnwrapArgs(unpacker, args...);
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,41 @@
/// This file is auto-generated. DO NOT EDIT.
/// The following `Call` methods are used to call remote functions of actors.
/// Their arguments and return types are as following:
/// \param[in] actor_func The function pointer to be remote execution.
/// \param[in] arg1...argn The function arguments passed by a value or RayObject.
/// \return RayObject.
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
RayObject<ReturnType> Call(ActorFunc0<ActorType, ReturnType> actor_func);
// 1 arg
template <typename ReturnType, typename Arg1Type>
RayObject<ReturnType> Call(ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func,
Arg1Type arg1);
template <typename ReturnType, typename Arg1Type>
RayObject<ReturnType> Call(ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func,
RayObject<Arg1Type> &arg1);
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func, Arg1Type arg1,
Arg2Type arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayObject<Arg1Type> &arg1, Arg2Type arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func, Arg1Type arg1,
RayObject<Arg2Type> &arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2);

View file

@ -0,0 +1,57 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ActorType>
template <typename ReturnType>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc0<ActorType, ReturnType> actor_func) {
return Ray::Call(actor_func, *this);
}
// 1 arg
template <typename ActorType>
template <typename ReturnType, typename Arg1Type>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func, Arg1Type arg1) {
return Ray::Call(actor_func, *this, arg1);
}
template <typename ActorType>
template <typename ReturnType, typename Arg1Type>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func, RayObject<Arg1Type> &arg1) {
return Ray::Call(actor_func, *this, arg1);
}
// 2 args
template <typename ActorType>
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func, Arg1Type arg1,
Arg2Type arg2) {
return Ray::Call(actor_func, *this, arg1, arg2);
}
template <typename ActorType>
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayObject<Arg1Type> &arg1, Arg2Type arg2) {
return Ray::Call(actor_func, *this, arg1, arg2);
}
template <typename ActorType>
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func, Arg1Type arg1,
RayObject<Arg2Type> &arg2) {
return Ray::Call(actor_func, *this, arg1, arg2);
}
template <typename ActorType>
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> RayActor<ActorType>::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2) {
return Ray::Call(actor_func, *this, arg1, arg2);
}

View file

@ -0,0 +1,15 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ActorType, typename ReturnType>
using ActorFunc0 = ReturnType (ActorType::*)();
// 1 arg
template <typename ActorType, typename ReturnType, typename Arg1Type>
using ActorFunc1 = ReturnType (ActorType::*)(Arg1Type);
// 2 args
template <typename ActorType, typename ReturnType, typename Arg1Type, typename Arg2Type>
using ActorFunc2 = ReturnType (ActorType::*)(Arg1Type, Arg2Type);

View file

@ -0,0 +1,38 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType, typename ActorType>
static RayObject<ReturnType> Call(ActorFunc0<ActorType, ReturnType> actor_func,
RayActor<ActorType> &actor);
// 1 arg
template <typename ReturnType, typename ActorType, typename Arg1Type>
static RayObject<ReturnType> Call(ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func,
RayActor<ActorType> &actor, Arg1Type arg1);
template <typename ReturnType, typename ActorType, typename Arg1Type>
static RayObject<ReturnType> Call(ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func,
RayActor<ActorType> &actor, RayObject<Arg1Type> &arg1);
// 2 args
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, Arg1Type arg1, Arg2Type arg2);
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, RayObject<Arg1Type> &arg1, Arg2Type arg2);
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, Arg1Type arg1, RayObject<Arg2Type> &arg2);
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2);

View file

@ -0,0 +1,60 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType, typename ActorType>
RayObject<ReturnType> Ray::Call(ActorFunc0<ActorType, ReturnType> actor_func,
RayActor<ActorType> &actor) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType>, actor);
}
// 1 arg
template <typename ReturnType, typename ActorType, typename Arg1Type>
RayObject<ReturnType> Ray::Call(ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func,
RayActor<ActorType> &actor, Arg1Type arg1) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType, Arg1Type>, actor, arg1);
}
template <typename ReturnType, typename ActorType, typename Arg1Type>
RayObject<ReturnType> Ray::Call(ActorFunc1<ActorType, ReturnType, Arg1Type> actor_func,
RayActor<ActorType> &actor, RayObject<Arg1Type> &arg1) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType, Arg1Type>, actor, arg1);
}
// 2 args
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, Arg1Type arg1, Arg2Type arg2) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType, Arg1Type, Arg2Type>, actor,
arg1, arg2);
}
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, RayObject<Arg1Type> &arg1, Arg2Type arg2) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType, Arg1Type, Arg2Type>, actor,
arg1, arg2);
}
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, Arg1Type arg1, RayObject<Arg2Type> &arg2) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType, Arg1Type, Arg2Type>, actor,
arg1, arg2);
}
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(
ActorFunc2<ActorType, ReturnType, Arg1Type, Arg2Type> actor_func,
RayActor<ActorType> &actor, RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2) {
return CallActorInternal<ReturnType, ActorType>(
actor_func, ActorExecFunction<ReturnType, ActorType, Arg1Type, Arg2Type>, actor,
arg1, arg2);
}

View file

@ -0,0 +1,37 @@
/// This file is auto-generated. DO NOT EDIT.
/// The following `Call` methods are used to call remote functions.
/// Their arguments and return types are as following:
/// \param[in] func The function pointer to be remote execution.
/// \param[in] arg1...argn The function arguments passed by a value or RayObject.
/// \return RayObject.
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
static RayObject<ReturnType> Call(Func0<ReturnType> func);
// 1 arg
template <typename ReturnType, typename Arg1Type>
static RayObject<ReturnType> Call(Func1<ReturnType, Arg1Type> func, Arg1Type arg1);
template <typename ReturnType, typename Arg1Type>
static RayObject<ReturnType> Call(Func1<ReturnType, Arg1Type> func,
RayObject<Arg1Type> &arg1);
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(Func2<ReturnType, Arg1Type, Arg2Type> func,
Arg1Type arg1, Arg2Type arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(Func2<ReturnType, Arg1Type, Arg2Type> func,
RayObject<Arg1Type> &arg1, Arg2Type arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(Func2<ReturnType, Arg1Type, Arg2Type> func,
Arg1Type arg1, RayObject<Arg2Type> &arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayObject<ReturnType> Call(Func2<ReturnType, Arg1Type, Arg2Type> func,
RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2);

View file

@ -0,0 +1,48 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
RayObject<ReturnType> Ray::Call(Func0<ReturnType> func) {
return CallInternal<ReturnType>(func, NormalExecFunction<ReturnType>);
}
// 1 arg
template <typename ReturnType, typename Arg1Type>
RayObject<ReturnType> Ray::Call(Func1<ReturnType, Arg1Type> func, Arg1Type arg1) {
return CallInternal<ReturnType>(func, NormalExecFunction<ReturnType, Arg1Type>, arg1);
}
template <typename ReturnType, typename Arg1Type>
RayObject<ReturnType> Ray::Call(Func1<ReturnType, Arg1Type> func,
RayObject<Arg1Type> &arg1) {
return CallInternal<ReturnType>(func, NormalExecFunction<ReturnType, Arg1Type>, arg1);
}
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(Func2<ReturnType, Arg1Type, Arg2Type> func, Arg1Type arg1,
Arg2Type arg2) {
return CallInternal<ReturnType>(
func, NormalExecFunction<ReturnType, Arg1Type, Arg2Type>, arg1, arg2);
}
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(Func2<ReturnType, Arg1Type, Arg2Type> func,
RayObject<Arg1Type> &arg1, Arg2Type arg2) {
return CallInternal<ReturnType>(
func, NormalExecFunction<ReturnType, Arg1Type, Arg2Type>, arg1, arg2);
}
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(Func2<ReturnType, Arg1Type, Arg2Type> func, Arg1Type arg1,
RayObject<Arg2Type> &arg2) {
return CallInternal<ReturnType>(
func, NormalExecFunction<ReturnType, Arg1Type, Arg2Type>, arg1, arg2);
}
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayObject<ReturnType> Ray::Call(Func2<ReturnType, Arg1Type, Arg2Type> func,
RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2) {
return CallInternal<ReturnType>(
func, NormalExecFunction<ReturnType, Arg1Type, Arg2Type>, arg1, arg2);
}

View file

@ -0,0 +1,42 @@
/// This file is auto-generated. DO NOT EDIT.
/// The following `Call` methods are used to call remote functions and create an actor.
/// Their arguments and return types are as following:
/// \param[in] create_func The function pointer to be remote execution.
/// \param[in] arg1...argn The function arguments passed by a value or RayObject.
/// \return RayActor.
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
static RayActor<ReturnType> CreateActor(CreateActorFunc0<ReturnType> create_func);
// 1 arg
template <typename ReturnType, typename Arg1Type>
static RayActor<ReturnType> CreateActor(
CreateActorFunc1<ReturnType, Arg1Type> create_func, Arg1Type arg1);
template <typename ReturnType, typename Arg1Type>
static RayActor<ReturnType> CreateActor(
CreateActorFunc1<ReturnType, Arg1Type> create_func, RayObject<Arg1Type> &arg1);
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayActor<ReturnType> CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func, Arg1Type arg1,
Arg2Type arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayActor<ReturnType> CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func,
RayObject<Arg1Type> &arg1, Arg2Type arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayActor<ReturnType> CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func, Arg1Type arg1,
RayObject<Arg2Type> &arg2);
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
static RayActor<ReturnType> CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func,
RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2);

View file

@ -0,0 +1,56 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
RayActor<ReturnType> Ray::CreateActor(CreateActorFunc0<ReturnType> create_func) {
return CreateActorInternal<ReturnType>(create_func,
CreateActorExecFunction<ReturnType *>);
}
// 1 arg
template <typename ReturnType, typename Arg1Type>
RayActor<ReturnType> Ray::CreateActor(CreateActorFunc1<ReturnType, Arg1Type> create_func,
Arg1Type arg1) {
return CreateActorInternal<ReturnType>(
create_func, CreateActorExecFunction<ReturnType *, Arg1Type>, arg1);
}
template <typename ReturnType, typename Arg1Type>
RayActor<ReturnType> Ray::CreateActor(CreateActorFunc1<ReturnType, Arg1Type> create_func,
RayObject<Arg1Type> &arg1) {
return CreateActorInternal<ReturnType>(
create_func, CreateActorExecFunction<ReturnType *, Arg1Type>, arg1);
}
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayActor<ReturnType> Ray::CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func, Arg1Type arg1,
Arg2Type arg2) {
return CreateActorInternal<ReturnType>(
create_func, CreateActorExecFunction<ReturnType *, Arg1Type, Arg2Type>, arg1, arg2);
}
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayActor<ReturnType> Ray::CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func,
RayObject<Arg1Type> &arg1, Arg2Type arg2) {
return CreateActorInternal<ReturnType>(
create_func, CreateActorExecFunction<ReturnType *, Arg1Type, Arg2Type>, arg1, arg2);
}
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayActor<ReturnType> Ray::CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func, Arg1Type arg1,
RayObject<Arg2Type> &arg2) {
return CreateActorInternal<ReturnType>(
create_func, CreateActorExecFunction<ReturnType *, Arg1Type, Arg2Type>, arg1, arg2);
}
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
RayActor<ReturnType> Ray::CreateActor(
CreateActorFunc2<ReturnType, Arg1Type, Arg2Type> create_func,
RayObject<Arg1Type> &arg1, RayObject<Arg2Type> &arg2) {
return CreateActorInternal<ReturnType>(
create_func, CreateActorExecFunction<ReturnType *, Arg1Type, Arg2Type>, arg1, arg2);
}

View file

@ -0,0 +1,14 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
using CreateActorFunc0 = ReturnType *(*)();
// 1 arg
template <typename ReturnType, typename Arg1Type>
using CreateActorFunc1 = ReturnType *(*)(Arg1Type);
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
using CreateActorFunc2 = ReturnType *(*)(Arg1Type, Arg2Type);

View file

@ -0,0 +1,159 @@
/// This file is auto-generated. DO NOT EDIT.
/// The following execution functions are wrappers of remote functions.
/// Execution functions make remote functions executable in distributed system.
/// NormalExecFunction the wrapper of normal remote function.
/// CreateActorExecFunction the wrapper of actor creation function.
/// ActorExecFunction the wrapper of actor member function.
// TODO(Guyang Song): code generation
template <typename ReturnType, typename CastReturnType, typename... OtherArgTypes>
std::shared_ptr<msgpack::sbuffer> ExecuteNormalFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer, TaskType task_type,
std::shared_ptr<OtherArgTypes> &... args) {
msgpack::unpacker unpacker;
unpacker.reserve_buffer(args_buffer->size());
memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size());
unpacker.buffer_consumed(args_buffer->size());
Arguments::UnwrapArgs(unpacker, &args...);
ReturnType return_value;
typedef ReturnType (*Func)(OtherArgTypes...);
Func func = (Func)(base_addr + func_offset);
return_value = (*func)(*args...);
std::shared_ptr<msgpack::sbuffer> returnBuffer(new msgpack::sbuffer());
msgpack::packer<msgpack::sbuffer> packer(returnBuffer.get());
Serializer::Serialize(packer, (CastReturnType)(return_value));
return returnBuffer;
}
template <typename ReturnType, typename ActorType, typename... OtherArgTypes>
std::shared_ptr<msgpack::sbuffer> ExecuteActorFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer,
std::shared_ptr<msgpack::sbuffer> &actor_buffer,
std::shared_ptr<OtherArgTypes> &... args) {
msgpack::unpacker actor_unpacker;
actor_unpacker.reserve_buffer(actor_buffer->size());
memcpy(actor_unpacker.buffer(), actor_buffer->data(), actor_buffer->size());
actor_unpacker.buffer_consumed(actor_buffer->size());
uintptr_t actor_ptr;
Serializer::Deserialize(actor_unpacker, &actor_ptr);
ActorType *actor_object = (ActorType *)actor_ptr;
msgpack::unpacker unpacker;
unpacker.reserve_buffer(args_buffer->size());
memcpy(unpacker.buffer(), args_buffer->data(), args_buffer->size());
unpacker.buffer_consumed(args_buffer->size());
Arguments::UnwrapArgs(unpacker, &args...);
ReturnType return_value;
typedef ReturnType (ActorType::*Func)(OtherArgTypes...);
MemberFunctionPtrHolder holder;
holder.value[0] = base_addr + func_offset;
holder.value[1] = 0;
Func func = *((Func *)&holder);
return_value = (actor_object->*func)(*args...);
std::shared_ptr<msgpack::sbuffer> returnBuffer(new msgpack::sbuffer());
msgpack::packer<msgpack::sbuffer> packer(returnBuffer.get());
Serializer::Serialize(packer, return_value);
return returnBuffer;
}
// 0 args
template <typename ReturnType>
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
return ExecuteNormalFunction<ReturnType, ReturnType>(
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK);
}
// 1 arg
template <typename ReturnType, typename Arg1Type>
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
std::shared_ptr<Arg1Type> arg1_ptr;
return ExecuteNormalFunction<ReturnType, ReturnType>(
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK, arg1_ptr);
}
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
std::shared_ptr<msgpack::sbuffer> NormalExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
std::shared_ptr<Arg1Type> arg1_ptr;
std::shared_ptr<Arg2Type> arg2_ptr;
return ExecuteNormalFunction<ReturnType, ReturnType>(
base_addr, func_offset, args_buffer, TaskType::NORMAL_TASK, arg1_ptr, arg2_ptr);
}
// 0 args
template <typename ReturnType>
std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
return ExecuteNormalFunction<ReturnType, uintptr_t>(base_addr, func_offset, args_buffer,
TaskType::ACTOR_CREATION_TASK);
}
// 1 arg
template <typename ReturnType, typename Arg1Type>
std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
std::shared_ptr<Arg1Type> arg1_ptr;
return ExecuteNormalFunction<ReturnType, uintptr_t>(
base_addr, func_offset, args_buffer, TaskType::ACTOR_CREATION_TASK, arg1_ptr);
}
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
std::shared_ptr<msgpack::sbuffer> CreateActorExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer) {
std::shared_ptr<Arg1Type> arg1_ptr;
std::shared_ptr<Arg2Type> arg2_ptr;
return ExecuteNormalFunction<ReturnType, uintptr_t>(base_addr, func_offset, args_buffer,
TaskType::ACTOR_CREATION_TASK,
arg1_ptr, arg2_ptr);
}
// 0 args
template <typename ReturnType, typename ActorType>
std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer,
std::shared_ptr<msgpack::sbuffer> &actor_buffer) {
return ExecuteActorFunction<ReturnType, ActorType>(base_addr, func_offset, args_buffer,
actor_buffer);
}
// 1 arg
template <typename ReturnType, typename ActorType, typename Arg1Type>
std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer,
std::shared_ptr<msgpack::sbuffer> &actor_buffer) {
std::shared_ptr<Arg1Type> arg1_ptr;
return ExecuteActorFunction<ReturnType, ActorType>(base_addr, func_offset, args_buffer,
actor_buffer, arg1_ptr);
}
// 2 args
template <typename ReturnType, typename ActorType, typename Arg1Type, typename Arg2Type>
std::shared_ptr<msgpack::sbuffer> ActorExecFunction(
uintptr_t base_addr, size_t func_offset,
std::shared_ptr<msgpack::sbuffer> &args_buffer,
std::shared_ptr<msgpack::sbuffer> &actor_buffer) {
std::shared_ptr<Arg1Type> arg1_ptr;
std::shared_ptr<Arg2Type> arg2_ptr;
return ExecuteActorFunction<ReturnType, ActorType>(base_addr, func_offset, args_buffer,
actor_buffer, arg1_ptr, arg2_ptr);
}

View file

@ -0,0 +1,15 @@
// TODO(Guyang Song): code generation
// 0 args
template <typename ReturnType>
using Func0 = ReturnType (*)();
// 1 arg
template <typename ReturnType, typename Arg1Type>
using Func1 = ReturnType (*)(Arg1Type);
// 2 args
template <typename ReturnType, typename Arg1Type, typename Arg2Type>
using Func2 = ReturnType (*)(Arg1Type, Arg2Type);

View file

@ -0,0 +1,52 @@
#pragma once
#include "ray/core.h"
namespace ray {
namespace api {
#include <ray/api/generated/actor_funcs.generated.h>
/// A handle to an actor which can be used to invoke a remote actor method, with the
/// `Call` method.
/// \param ActorType The type of the concrete actor class.
/// Note, the `Call` method is defined in actor_call.generated.h.
template <typename ActorType>
class RayActor {
public:
RayActor();
RayActor(const ActorID &id);
/// Get a untyped ID of the actor
const ActorID &ID() const;
/// Include the `Call` methods for calling remote functions.
#include <ray/api/generated/actor_call.generated.h>
/// Make RayActor serializable
MSGPACK_DEFINE(id_);
private:
ActorID id_;
};
// ---------- implementation ----------
template <typename ActorType>
RayActor<ActorType>::RayActor() {}
template <typename ActorType>
RayActor<ActorType>::RayActor(const ActorID &id) {
id_ = id;
}
template <typename ActorType>
const ActorID &RayActor<ActorType>::ID() const {
return id_;
}
#include <ray/api/generated/actor_call_impl.generated.h>
} // namespace api
} // namespace ray

View file

@ -0,0 +1,20 @@
#pragma once
namespace ray {
namespace api {
enum class RunMode { SINGLE_PROCESS, SINGLE_BOX, CLUSTER };
enum class WorkerMode { NONE, DRIVER, WORKER };
/// TODO(Guyang Song): Make configuration complete and use to initialize.
class RayConfig {
public:
WorkerMode workerMode = WorkerMode::DRIVER;
RunMode runMode = RunMode::SINGLE_PROCESS;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,18 @@
#pragma once
#include <exception>
#include <string>
namespace ray {
namespace api {
class RayException : public std::exception {
public:
RayException(const std::string &msg) : msg_(msg){};
const char *what() const noexcept override { return msg_.c_str(); };
std::string msg_;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,67 @@
#pragma once
#include <memory>
#include <utility>
#include <msgpack.hpp>
#include "ray/core.h"
namespace ray {
namespace api {
/// Represents an object in the object store..
/// \param T The type of object.
template <typename T>
class RayObject {
public:
RayObject();
RayObject(const ObjectID &id);
bool operator==(const RayObject<T> &object) const;
/// Get a untyped ID of the object
const ObjectID &ID() const;
/// Get the object from the object store.
/// This method will be blocked until the object is ready.
///
/// \return shared pointer of the result.
std::shared_ptr<T> Get() const;
/// Make RayObject serializable
MSGPACK_DEFINE(id_);
private:
ObjectID id_;
};
// ---------- implementation ----------
#include <ray/api.h>
template <typename T>
RayObject<T>::RayObject() {}
template <typename T>
RayObject<T>::RayObject(const ObjectID &id) {
id_ = id;
}
template <typename T>
inline bool RayObject<T>::operator==(const RayObject<T> &object) const {
return id_ == object.id_;
}
template <typename T>
const ObjectID &RayObject<T>::ID() const {
return id_;
}
template <typename T>
inline std::shared_ptr<T> RayObject<T>::Get() const {
return Ray::Get(*this);
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,46 @@
#pragma once
#include <cstdint>
#include <memory>
#include <msgpack.hpp>
#include <typeinfo>
#include <vector>
#include <ray/api/wait_result.h>
#include "ray/core.h"
namespace ray {
namespace api {
struct MemberFunctionPtrHolder {
uintptr_t value[2];
};
struct RemoteFunctionPtrHolder {
/// The remote function pointer
uintptr_t function_pointer;
/// The executable function pointer
uintptr_t exec_function_pointer;
};
class RayRuntime {
public:
virtual ObjectID Put(std::shared_ptr<msgpack::sbuffer> data) = 0;
virtual std::shared_ptr<msgpack::sbuffer> Get(const ObjectID &id) = 0;
virtual std::vector<std::shared_ptr<msgpack::sbuffer>> Get(
const std::vector<ObjectID> &ids) = 0;
virtual WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) = 0;
virtual ObjectID Call(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) = 0;
virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) = 0;
virtual ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
std::shared_ptr<msgpack::sbuffer> args) = 0;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,41 @@
#pragma once
#include <ray/api/ray_exception.h>
#include <msgpack.hpp>
namespace ray {
namespace api {
class Serializer {
public:
template <typename T>
static void Serialize(msgpack::packer<msgpack::sbuffer> &packer, const T &val);
template <typename T>
static void Deserialize(msgpack::unpacker &unpacker, T *val);
};
// ---------- implementation ----------
template <typename T>
inline void Serializer::Serialize(msgpack::packer<msgpack::sbuffer> &packer,
const T &val) {
packer.pack(val);
return;
}
template <typename T>
inline void Serializer::Deserialize(msgpack::unpacker &unpacker, T *val) {
msgpack::object_handle oh;
bool result = unpacker.next(oh);
if (result == false) {
throw RayException("unpack error");
}
msgpack::object obj = oh.get();
obj.convert(*val);
return;
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,22 @@
#pragma once
#include <vector>
#include "ray/core.h"
namespace ray {
namespace api {
class WaitResult {
public:
/// The object id array of ready objects
std::vector<ObjectID> ready;
/// The object id array of unready objects
std::vector<ObjectID> unready;
WaitResult(){};
WaitResult(std::vector<ObjectID> &&ready_objects,
std::vector<ObjectID> &&unready_objects)
: ready(std::move(ready_objects)), unready(std::move(unready_objects)){};
};
} // namespace api
} // namespace ray

13
cpp/include/ray/core.h Normal file
View file

@ -0,0 +1,13 @@
#pragma once
#include "ray/common/buffer.h"
#include "ray/common/function_descriptor.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/common/task/task_common.h"
#include "ray/common/task/task_spec.h"
#include "ray/common/task/task_util.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/util/logging.h"

View file

@ -0,0 +1,76 @@
/// This is a complete example of writing a distributed program using the C ++ worker API.
/// including the header
#include <ray/api.h>
/// using namespace
using namespace ray::api;
/// general function of user code
int Return1() { return 1; }
int Plus1(int x) { return x + 1; }
int Plus(int x, int y) { return x + y; }
/// a class of user code
class Counter {
public:
int count;
Counter() { count = 0; }
static Counter *FactoryCreate() { return new Counter(); }
/// non static function
int Add(int x) {
count += x;
return count;
}
};
int main() {
/// initialization
Ray::Init();
/// put and get object
auto obj = Ray::Put(123);
auto getRsult = obj.Get();
/// general function remote callargs passed by value
auto r0 = Ray::Call(Return1);
auto r1 = Ray::Call(Plus1, 1);
auto r2 = Ray::Call(Plus, 1, 2);
int result0 = *(r0.Get());
int result1 = *(r1.Get());
int result2 = *(r2.Get());
std::cout << "Ray::call with value results: " << result0 << " " << result1 << " "
<< result2 << std::endl;
/// general function remote callargs passed by reference
auto r3 = Ray::Call(Return1);
auto r4 = Ray::Call(Plus1, r3);
auto r5 = Ray::Call(Plus, r4, 1);
int result3 = *(r3.Get());
int result4 = *(r4.Get());
int result5 = *(r5.Get());
std::cout << "Ray::call with reference results: " << result3 << " " << result4 << " "
<< result5 << std::endl;
/// create actor and actor function remote call
RayActor<Counter> actor = Ray::CreateActor(Counter::FactoryCreate);
auto r6 = actor.Call(&Counter::Add, 5);
auto r7 = actor.Call(&Counter::Add, 1);
auto r8 = actor.Call(&Counter::Add, 1);
auto r9 = actor.Call(&Counter::Add, r8);
int result6 = *(r6.Get());
int result7 = *(r7.Get());
int result8 = *(r8.Get());
int result9 = *(r9.Get());
std::cout << "Ray::call with actor results: " << result6 << " " << result7 << " "
<< result8 << " " << result9 << std::endl;
}

20
cpp/src/ray/api.cc Normal file
View file

@ -0,0 +1,20 @@
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include "runtime/abstract_ray_runtime.h"
namespace ray {
namespace api {
RayRuntime *Ray::runtime_ = nullptr;
std::once_flag Ray::is_inited_;
void Ray::Init() {
std::call_once(is_inited_, [] {
runtime_ = AbstractRayRuntime::DoInit(std::make_shared<RayConfig>());
});
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,105 @@
#include "abstract_ray_runtime.h"
#include <cassert>
#include <ray/api.h>
#include <ray/api/ray_config.h>
#include <ray/api/ray_exception.h>
#include "../util/address_helper.h"
#include "local_mode_ray_runtime.h"
namespace ray {
namespace api {
AbstractRayRuntime *AbstractRayRuntime::DoInit(std::shared_ptr<RayConfig> config) {
AbstractRayRuntime *runtime;
if (config->runMode == RunMode::SINGLE_PROCESS) {
GenerateBaseAddressOfCurrentLibrary();
runtime = new LocalModeRayRuntime(config);
} else {
throw RayException("Only single process mode supported now");
}
RAY_CHECK(runtime);
return runtime;
}
void AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data,
const ObjectID &object_id) {
object_store_->Put(object_id, data);
}
ObjectID AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {
ObjectID object_id =
ObjectID::ForPut(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex(),
static_cast<uint8_t>(TaskTransportType::RAYLET));
Put(data, object_id);
return object_id;
}
std::shared_ptr<msgpack::sbuffer> AbstractRayRuntime::Get(const ObjectID &object_id) {
return object_store_->Get(object_id, -1);
}
std::vector<std::shared_ptr<msgpack::sbuffer>> AbstractRayRuntime::Get(
const std::vector<ObjectID> &ids) {
return object_store_->Get(ids, -1);
}
WaitResult AbstractRayRuntime::Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) {
return object_store_->Wait(ids, num_objects, timeout_ms);
}
ObjectID AbstractRayRuntime::Call(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
InvocationSpec invocationSpec;
invocationSpec.task_id =
TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task
invocationSpec.actor_id = ActorID::Nil();
invocationSpec.args = args;
invocationSpec.func_offset =
(size_t)(fptr.function_pointer - dynamic_library_base_addr);
invocationSpec.exec_func_offset =
(size_t)(fptr.exec_function_pointer - dynamic_library_base_addr);
return task_submitter_->SubmitTask(invocationSpec);
}
ActorID AbstractRayRuntime::CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
return task_submitter_->CreateActor(fptr, args);
}
ObjectID AbstractRayRuntime::CallActor(const RemoteFunctionPtrHolder &fptr,
const ActorID &actor,
std::shared_ptr<msgpack::sbuffer> args) {
InvocationSpec invocationSpec;
invocationSpec.task_id =
TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task
invocationSpec.actor_id = actor;
invocationSpec.args = args;
invocationSpec.func_offset =
(size_t)(fptr.function_pointer - dynamic_library_base_addr);
invocationSpec.exec_func_offset =
(size_t)(fptr.exec_function_pointer - dynamic_library_base_addr);
return task_submitter_->SubmitActorTask(invocationSpec);
}
const TaskID &AbstractRayRuntime::GetCurrentTaskId() {
return worker_->GetCurrentTaskID();
}
const JobID &AbstractRayRuntime::GetCurrentJobID() { return worker_->GetCurrentJobID(); }
ActorID AbstractRayRuntime::GetNextActorID() {
const int next_task_index = worker_->GetNextTaskIndex();
const ActorID actor_id = ActorID::Of(worker_->GetCurrentJobID(),
worker_->GetCurrentTaskID(), next_task_index);
return actor_id;
}
const std::unique_ptr<WorkerContext> &AbstractRayRuntime::GetWorkerContext() {
return worker_;
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,62 @@
#pragma once
#include <mutex>
#include <ray/api/ray_config.h>
#include <ray/api/ray_runtime.h>
#include <msgpack.hpp>
#include "./object/object_store.h"
#include "./task/task_executor.h"
#include "./task/task_submitter.h"
#include "ray/core.h"
namespace ray {
namespace api {
class AbstractRayRuntime : public RayRuntime {
public:
virtual ~AbstractRayRuntime(){};
void Put(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id);
ObjectID Put(std::shared_ptr<msgpack::sbuffer> data);
std::shared_ptr<msgpack::sbuffer> Get(const ObjectID &id);
std::vector<std::shared_ptr<msgpack::sbuffer>> Get(const std::vector<ObjectID> &ids);
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
ObjectID Call(RemoteFunctionPtrHolder &fptr, std::shared_ptr<msgpack::sbuffer> args);
ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ObjectID CallActor(const RemoteFunctionPtrHolder &fptr, const ActorID &actor,
std::shared_ptr<msgpack::sbuffer> args);
ActorID GetNextActorID();
const TaskID &GetCurrentTaskId();
const JobID &GetCurrentJobID();
const std::unique_ptr<WorkerContext> &GetWorkerContext();
protected:
std::shared_ptr<RayConfig> config_;
std::unique_ptr<WorkerContext> worker_;
std::unique_ptr<TaskSubmitter> task_submitter_;
std::unique_ptr<TaskExecutor> task_executor_;
std::unique_ptr<ObjectStore> object_store_;
private:
static AbstractRayRuntime *DoInit(std::shared_ptr<RayConfig> config);
void Execute(const TaskSpecification &task_spec);
friend class Ray;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,22 @@
#include "local_mode_ray_runtime.h"
#include <ray/api.h>
#include "../util/address_helper.h"
#include "./object/local_mode_object_store.h"
#include "./object/object_store.h"
#include "./task/local_mode_task_submitter.h"
namespace ray {
namespace api {
LocalModeRayRuntime::LocalModeRayRuntime(std::shared_ptr<RayConfig> config) {
config_ = config;
worker_ =
std::unique_ptr<WorkerContext>(new WorkerContext(WorkerType::DRIVER, JobID::Nil()));
object_store_ = std::unique_ptr<ObjectStore>(new LocalModeObjectStore(*this));
task_submitter_ = std::unique_ptr<TaskSubmitter>(new LocalModeTaskSubmitter(*this));
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,17 @@
#pragma once
#include <unordered_map>
#include "abstract_ray_runtime.h"
#include "ray/core.h"
namespace ray {
namespace api {
class LocalModeRayRuntime : public AbstractRayRuntime {
public:
LocalModeRayRuntime(std::shared_ptr<RayConfig> config);
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,89 @@
#include <algorithm>
#include <chrono>
#include <list>
#include <thread>
#include <ray/api/ray_exception.h>
#include "../abstract_ray_runtime.h"
#include "local_mode_object_store.h"
namespace ray {
namespace api {
LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime)
: local_mode_ray_tuntime_(local_mode_ray_tuntime) {
memory_store_ =
std::unique_ptr<::ray::CoreWorkerMemoryStore>(new ::ray::CoreWorkerMemoryStore());
}
void LocalModeObjectStore::PutRaw(const ObjectID &object_id,
std::shared_ptr<msgpack::sbuffer> data) {
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(data->data()), data->size(), true);
auto status = memory_store_->Put(
::ray::RayObject(buffer, nullptr, std::vector<ObjectID>()), object_id);
if (!status) {
throw RayException("Put object error");
}
}
std::shared_ptr<msgpack::sbuffer> LocalModeObjectStore::GetRaw(const ObjectID &object_id,
int timeout_ms) {
std::vector<ObjectID> object_ids;
object_ids.push_back(object_id);
auto buffers = GetRaw(object_ids, timeout_ms);
RAY_CHECK(buffers.size() == 1);
return buffers[0];
}
std::vector<std::shared_ptr<msgpack::sbuffer>> LocalModeObjectStore::GetRaw(
const std::vector<ObjectID> &ids, int timeout_ms) {
std::vector<std::shared_ptr<::ray::RayObject>> results;
::ray::Status status =
memory_store_->Get(ids, (int)ids.size(), timeout_ms,
*local_mode_ray_tuntime_.GetWorkerContext(), false, &results);
if (!status.ok()) {
throw RayException("Get object error: " + status.ToString());
}
RAY_CHECK(results.size() == ids.size());
std::vector<std::shared_ptr<msgpack::sbuffer>> result_sbuffers;
result_sbuffers.reserve(results.size());
for (size_t i = 0; i < results.size(); i++) {
auto data_buffer = results[i]->GetData();
auto sbuffer = std::make_shared<msgpack::sbuffer>(data_buffer->Size());
sbuffer->write(reinterpret_cast<const char *>(data_buffer->Data()),
data_buffer->Size());
result_sbuffers.push_back(sbuffer);
}
return result_sbuffers;
}
WaitResult LocalModeObjectStore::Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) {
absl::flat_hash_set<ObjectID> memory_object_ids;
for (const auto &object_id : ids) {
memory_object_ids.insert(object_id);
}
absl::flat_hash_set<ObjectID> ready;
::ray::Status status =
memory_store_->Wait(memory_object_ids, num_objects, timeout_ms,
*local_mode_ray_tuntime_.GetWorkerContext(), &ready);
if (!status.ok()) {
throw RayException("Wait object error: " + status.ToString());
}
std::vector<ObjectID> ready_vector;
ready_vector.reserve(ready.size());
std::vector<ObjectID> unready_vector;
unready_vector.reserve(ids.size() - ready.size());
for (size_t i = 0; i < ids.size(); i++) {
if (ready.find(ids[i]) != ready.end()) {
ready_vector.push_back(ids[i]);
} else {
unready_vector.push_back(ids[i]);
}
}
WaitResult result(std::move(ready_vector), std::move(unready_vector));
return result;
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,33 @@
#pragma once
#include <unordered_map>
#include "ray/core.h"
#include "../local_mode_ray_runtime.h"
#include "object_store.h"
namespace ray {
namespace api {
class LocalModeObjectStore : public ObjectStore {
public:
LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime);
WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects, int timeout_ms);
private:
void PutRaw(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data);
std::shared_ptr<msgpack::sbuffer> GetRaw(const ObjectID &object_id, int timeout_ms);
std::vector<std::shared_ptr<msgpack::sbuffer>> GetRaw(const std::vector<ObjectID> &ids,
int timeout_ms);
std::unique_ptr<::ray::CoreWorkerMemoryStore> memory_store_;
LocalModeRayRuntime &local_mode_ray_tuntime_;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,24 @@
#include "object_store.h"
#include <memory>
#include <utility>
namespace ray {
namespace api {
void ObjectStore::Put(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data) {
PutRaw(object_id, data);
}
std::shared_ptr<msgpack::sbuffer> ObjectStore::Get(const ObjectID &object_id,
int timeout_ms) {
return GetRaw(object_id, timeout_ms);
}
std::vector<std::shared_ptr<msgpack::sbuffer>> ObjectStore::Get(
const std::vector<ObjectID> &ids, int timeout_ms) {
return GetRaw(ids, timeout_ms);
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,65 @@
#pragma once
#include <memory>
#include <ray/api/wait_result.h>
#include <msgpack.hpp>
namespace ray {
namespace api {
class ObjectStore {
public:
/// The default timeout to get object.
static const int default_get_timeout_ms = 1000;
virtual ~ObjectStore(){};
/// Store an object in the object store.
///
/// \param[in] object_id The object which should be stored.
/// \param[in] data The Serialized object buffer which should be stored.
void Put(const ObjectID &object_id, std::shared_ptr<msgpack::sbuffer> data);
/// Get a single object from the object store.
/// This method will be blocked until the object are ready or wait for timeout.
///
/// \param[in] object_id The object id which should be got.
/// \param[in] timeout_ms The maximum wait time in milliseconds.
/// \return shared pointer of the result buffer.
std::shared_ptr<msgpack::sbuffer> Get(const ObjectID &object_id,
int timeout_ms = default_get_timeout_ms);
/// Get a list of objects from the object store.
/// This method will be blocked until all the objects are ready or wait for timeout.
///
/// \param[in] ids The object id array which should be got.
/// \param[in] timeout_ms The maximum wait time in milliseconds.
/// \return shared pointer array of the result buffer.
std::vector<std::shared_ptr<msgpack::sbuffer>> Get(
const std::vector<ObjectID> &ids, int timeout_ms = default_get_timeout_ms);
/// Wait for a list of RayObjects to be locally available,
/// until specified number of objects are ready, or specified timeout has passed.
///
/// \param[in] ids The object id array which should be waited.
/// \param[in] num_objects The minimum number of objects to wait.
/// \param[in] timeout_ms The maximum wait time in milliseconds.
/// \return WaitResult Two arrays, one containing locally available objects, one
/// containing the rest.
virtual WaitResult Wait(const std::vector<ObjectID> &ids, int num_objects,
int timeout_ms) = 0;
private:
virtual void PutRaw(const ObjectID &object_id,
std::shared_ptr<msgpack::sbuffer> data) = 0;
virtual std::shared_ptr<msgpack::sbuffer> GetRaw(const ObjectID &object_id,
int timeout_ms) = 0;
virtual std::vector<std::shared_ptr<msgpack::sbuffer>> GetRaw(
const std::vector<ObjectID> &ids, int timeout_ms) = 0;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,22 @@
#pragma once
#include <msgpack.hpp>
#include "ray/core.h"
namespace ray {
namespace api {
class InvocationSpec {
public:
TaskID task_id;
ActorID actor_id;
int actor_counter;
/// Remote function offset from base address.
size_t func_offset;
/// Executable function offset from base address.
size_t exec_func_offset;
std::shared_ptr<msgpack::sbuffer> args;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,110 @@
#include <boost/asio/post.hpp>
#include <memory>
#include <ray/api/ray_exception.h>
#include "../../util/address_helper.h"
#include "../abstract_ray_runtime.h"
#include "local_mode_task_submitter.h"
namespace ray {
namespace api {
LocalModeTaskSubmitter::LocalModeTaskSubmitter(
LocalModeRayRuntime &local_mode_ray_tuntime)
: local_mode_ray_tuntime_(local_mode_ray_tuntime) {
thread_pool_.reset(new boost::asio::thread_pool(10));
}
ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskType type) {
/// TODO(Guyang Song): Make the infomation of TaskSpecification more reasonable
/// We just reuse the TaskSpecification class and make the single process mode work.
/// Maybe some infomation of TaskSpecification are not reasonable or invalid.
/// We will enhance this after implement the cluster mode.
auto functionDescriptor = FunctionDescriptorBuilder::BuildCpp(
"SingleProcess", std::to_string(invocation.func_offset),
std::to_string(invocation.exec_func_offset));
rpc::Address address;
std::unordered_map<std::string, double> required_resources;
std::unordered_map<std::string, double> required_placement_resources;
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(invocation.task_id, rpc::Language::CPP, functionDescriptor,
local_mode_ray_tuntime_.GetCurrentJobID(),
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources);
if (type == TaskType::NORMAL_TASK) {
} else if (type == TaskType::ACTOR_CREATION_TASK) {
builder.SetActorCreationTaskSpec(invocation.actor_id);
} else if (type == TaskType::ACTOR_TASK) {
const TaskID actor_creation_task_id =
TaskID::ForActorCreationTask(invocation.actor_id);
const ObjectID actor_creation_dummy_object_id = ObjectID::ForTaskReturn(
actor_creation_task_id, 1, static_cast<int>(ray::TaskTransportType::RAYLET));
builder.SetActorTaskSpec(invocation.actor_id, actor_creation_dummy_object_id,
ObjectID(), invocation.actor_counter);
} else {
throw RayException("unknown task type");
}
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(invocation.args->data()), invocation.args->size(),
true);
/// TODO(Guyang Song): Use both 'AddByRefArg' and 'AddByValueArg' to distinguish
builder.AddByValueArg(::ray::RayObject(buffer, nullptr, std::vector<ObjectID>()));
auto task_specification = builder.Build();
ObjectID return_object_id =
task_specification.ReturnId(0, ray::TaskTransportType::RAYLET);
std::shared_ptr<msgpack::sbuffer> actor;
std::shared_ptr<absl::Mutex> mutex;
if (type == TaskType::ACTOR_TASK) {
absl::MutexLock lock(&actor_contexts_mutex_);
actor = actor_contexts_.at(invocation.actor_id).get()->current_actor;
mutex = actor_contexts_.at(invocation.actor_id).get()->actor_mutex;
}
AbstractRayRuntime *runtime = &local_mode_ray_tuntime_;
if (type == TaskType::ACTOR_CREATION_TASK || type == TaskType::ACTOR_TASK) {
/// TODO(Guyang Song): Handle task dependencies.
/// Execute actor task directly in the main thread because we must guarantee the actor
/// task executed by calling order.
TaskExecutor::Invoke(task_specification, actor, runtime);
} else {
boost::asio::post(*thread_pool_.get(),
std::bind(
[actor, mutex, runtime](TaskSpecification &ts) {
if (mutex) {
absl::MutexLock lock(mutex.get());
}
TaskExecutor::Invoke(ts, actor, runtime);
},
std::move(task_specification)));
}
return return_object_id;
}
ObjectID LocalModeTaskSubmitter::SubmitTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::NORMAL_TASK);
}
ActorID LocalModeTaskSubmitter::CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) {
ActorID id = local_mode_ray_tuntime_.GetNextActorID();
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
ExecFunction exec_function = (ExecFunction)(fptr.exec_function_pointer);
auto data =
(*exec_function)(dynamic_library_base_addr,
(size_t)(fptr.function_pointer - dynamic_library_base_addr), args);
std::unique_ptr<ActorContext> actorContext(new ActorContext());
actorContext->current_actor = data;
absl::MutexLock lock(&actor_contexts_mutex_);
actor_contexts_.emplace(id, std::move(actorContext));
return id;
}
ObjectID LocalModeTaskSubmitter::SubmitActorTask(const InvocationSpec &invocation) {
return Submit(invocation, TaskType::ACTOR_TASK);
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,39 @@
#pragma once
#include <boost/asio/thread_pool.hpp>
#include <memory>
#include <queue>
#include "../local_mode_ray_runtime.h"
#include "absl/synchronization/mutex.h"
#include "invocation_spec.h"
#include "ray/core.h"
#include "task_executor.h"
#include "task_submitter.h"
namespace ray {
namespace api {
class LocalModeTaskSubmitter : public TaskSubmitter {
public:
LocalModeTaskSubmitter(LocalModeRayRuntime &local_mode_ray_tuntime);
ObjectID SubmitTask(const InvocationSpec &invocation);
ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args);
ObjectID SubmitActorTask(const InvocationSpec &invocation);
private:
std::unordered_map<ActorID, std::unique_ptr<ActorContext>> actor_contexts_;
absl::Mutex actor_contexts_mutex_;
std::unique_ptr<boost::asio::thread_pool> thread_pool_;
LocalModeRayRuntime &local_mode_ray_tuntime_;
ObjectID Submit(const InvocationSpec &invocation, TaskType type);
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,46 @@
#include <memory>
#include "../../util/address_helper.h"
#include "../abstract_ray_runtime.h"
#include "task_executor.h"
namespace ray {
namespace api {
// TODO(Guyang Song): Make a common task execution function used for both local mode and
// cluster mode.
std::unique_ptr<ObjectID> TaskExecutor::Execute(const InvocationSpec &invocation) {
return std::unique_ptr<ObjectID>(new ObjectID());
};
void TaskExecutor::Invoke(const TaskSpecification &task_spec,
std::shared_ptr<msgpack::sbuffer> actor,
AbstractRayRuntime *runtime) {
auto args = std::make_shared<msgpack::sbuffer>(task_spec.ArgDataSize(0));
/// TODO(Guyang Song): Avoid the memory copy.
args->write(reinterpret_cast<const char *>(task_spec.ArgData(0)),
task_spec.ArgDataSize(0));
auto function_descriptor = task_spec.FunctionDescriptor();
auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
std::shared_ptr<msgpack::sbuffer> data;
if (actor) {
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args,
std::shared_ptr<msgpack::sbuffer> object);
ExecFunction exec_function = (ExecFunction)(
dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset()));
data = (*exec_function)(dynamic_library_base_addr,
std::stoul(typed_descriptor->FunctionOffset()), args, actor);
} else {
typedef std::shared_ptr<msgpack::sbuffer> (*ExecFunction)(
uintptr_t base_addr, size_t func_offset, std::shared_ptr<msgpack::sbuffer> args);
ExecFunction exec_function = (ExecFunction)(
dynamic_library_base_addr + std::stoul(typed_descriptor->ExecFunctionOffset()));
data = (*exec_function)(dynamic_library_base_addr,
std::stoul(typed_descriptor->FunctionOffset()), args);
}
runtime->Put(std::move(data), task_spec.ReturnId(0, ray::TaskTransportType::RAYLET));
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,34 @@
#pragma once
#include <memory>
#include "absl/synchronization/mutex.h"
#include "invocation_spec.h"
#include "ray/core.h"
namespace ray {
namespace api {
class AbstractRayRuntime;
class ActorContext {
public:
std::shared_ptr<msgpack::sbuffer> current_actor = nullptr;
std::shared_ptr<absl::Mutex> actor_mutex;
ActorContext() { actor_mutex = std::shared_ptr<absl::Mutex>(new absl::Mutex); }
};
class TaskExecutor {
public:
/// TODO(Guyang Song): support multiple tasks execution
std::unique_ptr<ObjectID> Execute(const InvocationSpec &invocation);
static void Invoke(const TaskSpecification &task_spec,
std::shared_ptr<msgpack::sbuffer> actor,
AbstractRayRuntime *runtime);
virtual ~TaskExecutor(){};
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,25 @@
#pragma once
#include <memory>
#include <ray/api/ray_runtime.h>
#include "invocation_spec.h"
namespace ray {
namespace api {
class TaskSubmitter {
public:
TaskSubmitter(){};
virtual ~TaskSubmitter(){};
virtual ObjectID SubmitTask(const InvocationSpec &invocation) = 0;
virtual ActorID CreateActor(RemoteFunctionPtrHolder &fptr,
std::shared_ptr<msgpack::sbuffer> args) = 0;
virtual ObjectID SubmitActorTask(const InvocationSpec &invocation) = 0;
};
} // namespace api
} // namespace ray

View file

@ -0,0 +1,134 @@
#include <gtest/gtest.h>
#include <ray/api.h>
#include <future>
#include <thread>
using namespace ray::api;
int Return1() { return 1; }
int Plus1(int x) { return x + 1; }
int Plus(int x, int y) { return x + y; }
class Counter {
public:
int count;
MSGPACK_DEFINE(count);
Counter() { count = 0; }
static Counter *FactoryCreate() {
Counter *counter = new Counter();
return counter;
}
int Plus1(int x) { return x + 1; }
int Plus(int x, int y) { return x + y; }
int Add(int x) {
count += x;
return count;
}
};
TEST(RayApiTest, PutTest) {
Ray::Init();
auto obj1 = Ray::Put(1);
auto i1 = obj1.Get();
EXPECT_EQ(1, *i1);
}
TEST(RayApiTest, WaitTest) {
Ray::Init();
auto r0 = Ray::Call(Return1);
auto r1 = Ray::Call(Plus1, 3);
auto r2 = Ray::Call(Plus, 2, 3);
std::vector<ObjectID> objects = {r0.ID(), r1.ID(), r2.ID()};
WaitResult result = Ray::Wait(objects, 3, 1000);
EXPECT_EQ(result.ready.size(), 3);
EXPECT_EQ(result.unready.size(), 0);
std::vector<std::shared_ptr<int>> getResult = Ray::Get<int>(objects);
EXPECT_EQ(getResult.size(), 3);
EXPECT_EQ(*getResult[0], 1);
EXPECT_EQ(*getResult[1], 4);
EXPECT_EQ(*getResult[2], 5);
}
TEST(RayApiTest, CallWithValueTest) {
auto r0 = Ray::Call(Return1);
auto r1 = Ray::Call(Plus1, 3);
auto r2 = Ray::Call(Plus, 2, 3);
int result0 = *(r0.Get());
int result1 = *(r1.Get());
int result2 = *(r2.Get());
EXPECT_EQ(result0, 1);
EXPECT_EQ(result1, 4);
EXPECT_EQ(result2, 5);
}
TEST(RayApiTest, CallWithObjectTest) {
auto rt0 = Ray::Call(Return1);
auto rt1 = Ray::Call(Plus1, rt0);
auto rt2 = Ray::Call(Plus, rt1, 3);
auto rt3 = Ray::Call(Plus1, 3);
auto rt4 = Ray::Call(Plus, rt2, rt3);
int return0 = *(rt0.Get());
int return1 = *(rt1.Get());
int return2 = *(rt2.Get());
int return3 = *(rt3.Get());
int return4 = *(rt4.Get());
EXPECT_EQ(return0, 1);
EXPECT_EQ(return1, 2);
EXPECT_EQ(return2, 5);
EXPECT_EQ(return3, 4);
EXPECT_EQ(return4, 9);
}
TEST(RayApiTest, ActorTest) {
Ray::Init();
RayActor<Counter> actor = Ray::CreateActor(Counter::FactoryCreate);
auto rt1 = actor.Call(&Counter::Add, 1);
auto rt2 = actor.Call(&Counter::Add, 2);
auto rt3 = actor.Call(&Counter::Add, 3);
auto rt4 = actor.Call(&Counter::Add, rt3);
int return1 = *(rt1.Get());
int return2 = *(rt2.Get());
int return3 = *(rt3.Get());
int return4 = *(rt4.Get());
EXPECT_EQ(return1, 1);
EXPECT_EQ(return2, 3);
EXPECT_EQ(return3, 6);
EXPECT_EQ(return4, 12);
}
TEST(RayApiTest, CompareWithFuture) {
// future from a packaged_task
std::packaged_task<int(int)> task(Plus1);
std::future<int> f1 = task.get_future();
std::thread t(std::move(task), 1);
int rt1 = f1.get();
// future from an async()
std::future<int> f2 = std::async(std::launch::async, Plus1, 1);
int rt2 = f2.get();
// Ray API
Ray::Init();
auto f3 = Ray::Call(Plus1, 1);
int rt3 = *f3.Get();
EXPECT_EQ(rt1, 2);
EXPECT_EQ(rt2, 2);
EXPECT_EQ(rt3, 2);
t.join();
}

View file

@ -0,0 +1,43 @@
#include <gtest/gtest.h>
#include <ray/api.h>
using namespace ray::api;
TEST(SerializationTest, TypeHybridTest) {
uint32_t in_arg1 = 123456789, out_arg1;
std::string in_arg2 = "123567ABC", out_arg2;
// 1 arg
// marshall
msgpack::sbuffer buffer1;
msgpack::packer<msgpack::sbuffer> pk1(&buffer1);
Serializer::Serialize(pk1, in_arg1);
// unmarshall
msgpack::unpacker upk1;
upk1.reserve_buffer(buffer1.size());
memcpy(upk1.buffer(), buffer1.data(), buffer1.size());
upk1.buffer_consumed(buffer1.size());
Serializer::Deserialize(upk1, &out_arg1);
EXPECT_EQ(in_arg1, out_arg1);
// 2 args
// marshall
msgpack::sbuffer buffer2;
msgpack::packer<msgpack::sbuffer> pk2(&buffer2);
Serializer::Serialize(pk2, in_arg1);
Serializer::Serialize(pk2, in_arg2);
// unmarshall
msgpack::unpacker upk2;
upk2.reserve_buffer(buffer2.size());
memcpy(upk2.buffer(), buffer2.data(), buffer2.size());
upk2.buffer_consumed(buffer2.size());
Serializer::Deserialize(upk2, &out_arg1);
Serializer::Deserialize(upk2, &out_arg2);
EXPECT_EQ(in_arg1, out_arg1);
EXPECT_EQ(in_arg2, out_arg2);
}

View file

@ -0,0 +1,36 @@
#include <gtest/gtest.h>
#include <ray/api.h>
#include <chrono>
#include <thread>
using namespace ray::api;
int slow_function(int i) {
std::this_thread::sleep_for(std::chrono::seconds(i));
return i;
}
TEST(RaySlowFunctionTest, BaseTest) {
Ray::Init();
auto time1 = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch());
auto r0 = Ray::Call(slow_function, 1);
auto r1 = Ray::Call(slow_function, 2);
auto r2 = Ray::Call(slow_function, 3);
auto r3 = Ray::Call(slow_function, 4);
int result0 = *(r0.Get());
int result1 = *(r1.Get());
int result2 = *(r2.Get());
int result3 = *(r3.Get());
auto time2 = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch());
EXPECT_EQ(result0, 1);
EXPECT_EQ(result1, 2);
EXPECT_EQ(result2, 3);
EXPECT_EQ(result3, 4);
EXPECT_LT(time2.count() - time1.count(), 4200);
}

View file

@ -0,0 +1,16 @@
#include <dlfcn.h>
#include <stdint.h>
namespace ray {
namespace api {
uintptr_t dynamic_library_base_addr;
extern "C" void GenerateBaseAddressOfCurrentLibrary() {
Dl_info info;
dladdr((void *)GenerateBaseAddressOfCurrentLibrary, &info);
dynamic_library_base_addr = (uintptr_t)info.dli_fbase;
return;
}
} // namespace api
} // namespace ray

View file

@ -0,0 +1,14 @@
#pragma once
#include <dlfcn.h>
#include <stdint.h>
namespace ray {
namespace api {
/// A base address which is used to calculate function offset
extern uintptr_t dynamic_library_base_addr;
/// A fixed C language function which help to get infomation from dladdr
extern "C" void GenerateBaseAddressOfCurrentLibrary();
} // namespace api
} // namespace ray

View file

@ -44,12 +44,25 @@ FunctionDescriptor FunctionDescriptorBuilder::BuildPython(
return ray::FunctionDescriptor(new PythonFunctionDescriptor(std::move(descriptor)));
}
FunctionDescriptor FunctionDescriptorBuilder::BuildCpp(
const std::string &lib_name, const std::string &function_offset,
const std::string &exec_function_offset) {
rpc::FunctionDescriptor descriptor;
auto typed_descriptor = descriptor.mutable_cpp_function_descriptor();
typed_descriptor->set_lib_name(lib_name);
typed_descriptor->set_function_offset(function_offset);
typed_descriptor->set_exec_function_offset(exec_function_offset);
return ray::FunctionDescriptor(new CppFunctionDescriptor(std::move(descriptor)));
}
FunctionDescriptor FunctionDescriptorBuilder::FromProto(rpc::FunctionDescriptor message) {
switch (message.function_descriptor_case()) {
case ray::FunctionDescriptorType::kJavaFunctionDescriptor:
return ray::FunctionDescriptor(new ray::JavaFunctionDescriptor(std::move(message)));
case ray::FunctionDescriptorType::kPythonFunctionDescriptor:
return ray::FunctionDescriptor(new ray::PythonFunctionDescriptor(std::move(message)));
case ray::FunctionDescriptorType::kCppFunctionDescriptor:
return ray::FunctionDescriptor(new ray::CppFunctionDescriptor(std::move(message)));
default:
break;
}
@ -75,6 +88,13 @@ FunctionDescriptor FunctionDescriptorBuilder::FromVector(
function_descriptor_list[2], // function name
function_descriptor_list[3] // function hash
);
} else if (language == rpc::Language::CPP) {
RAY_CHECK(function_descriptor_list.size() == 3);
return FunctionDescriptorBuilder::BuildCpp(
function_descriptor_list[0], // lib name
function_descriptor_list[1], // function offset
function_descriptor_list[2] // exec function offset
);
} else {
RAY_LOG(FATAL) << "Unspported language " << language;
return FunctionDescriptorBuilder::Empty();

View file

@ -153,6 +153,44 @@ class PythonFunctionDescriptor : public FunctionDescriptorInterface {
const rpc::PythonFunctionDescriptor *typed_message_;
};
class CppFunctionDescriptor : public FunctionDescriptorInterface {
public:
/// Construct from a protobuf message object.
/// The input message will be **copied** into this object.
///
/// \param message The protobuf message.
explicit CppFunctionDescriptor(rpc::FunctionDescriptor message)
: FunctionDescriptorInterface(std::move(message)) {
RAY_CHECK(message_->function_descriptor_case() ==
ray::FunctionDescriptorType::kCppFunctionDescriptor);
typed_message_ = &(message_->cpp_function_descriptor());
}
virtual size_t Hash() const {
return std::hash<int>()(ray::FunctionDescriptorType::kCppFunctionDescriptor) ^
std::hash<std::string>()(typed_message_->lib_name()) ^
std::hash<std::string>()(typed_message_->function_offset()) ^
std::hash<std::string>()(typed_message_->exec_function_offset());
}
virtual std::string ToString() const {
return "{type=CppFunctionDescriptor, lib_name=" + typed_message_->lib_name() +
", function_offset=" + typed_message_->function_offset() +
", exec_function_offset=" + typed_message_->exec_function_offset() + "}";
}
std::string LibName() const { return typed_message_->lib_name(); }
std::string FunctionOffset() const { return typed_message_->function_offset(); }
std::string ExecFunctionOffset() const {
return typed_message_->exec_function_offset();
}
private:
const rpc::CppFunctionDescriptor *typed_message_;
};
typedef std::shared_ptr<FunctionDescriptorInterface> FunctionDescriptor;
inline bool operator==(const FunctionDescriptor &left, const FunctionDescriptor &right) {
@ -190,6 +228,13 @@ class FunctionDescriptorBuilder {
const std::string &function_name,
const std::string &function_hash);
/// Build a CppFunctionDescriptor.
///
/// \return a ray::CppFunctionDescriptor
static FunctionDescriptor BuildCpp(const std::string &lib_name,
const std::string &function_offset,
const std::string &exec_function_offset);
/// Build a ray::FunctionDescriptor according to input message.
///
/// \return new ray::FunctionDescriptor

View file

@ -20,6 +20,7 @@
#include <chrono>
#include <cstring>
#include <msgpack.hpp>
#include <mutex>
#include <random>
#include <string>
@ -106,6 +107,8 @@ class UniqueID : public BaseID<UniqueID> {
UniqueID() : BaseID() {}
MSGPACK_DEFINE(id_);
protected:
UniqueID(const std::string &binary);
@ -126,6 +129,8 @@ class JobID : public BaseID<JobID> {
JobID() : BaseID() {}
MSGPACK_DEFINE(id_);
private:
uint8_t id_[kLength];
};
@ -171,6 +176,8 @@ class ActorID : public BaseID<ActorID> {
/// \return The job id to which this actor belongs.
JobID JobId() const;
MSGPACK_DEFINE(id_);
private:
uint8_t id_[kLength];
};
@ -238,6 +245,8 @@ class TaskID : public BaseID<TaskID> {
/// \return The `JobID` of the job which creates this task.
JobID JobId() const;
MSGPACK_DEFINE(id_);
private:
uint8_t id_[kLength];
};
@ -371,6 +380,8 @@ class ObjectID : public BaseID<ObjectID> {
/// \return The computed object ID.
static ObjectID ForActorHandle(const ActorID &actor_id);
MSGPACK_DEFINE(id_);
private:
/// A helper method to generate an ObjectID.
static ObjectID GenerateObjectId(const std::string &task_id_binary,

View file

@ -67,11 +67,22 @@ message PythonFunctionDescriptor {
string function_hash = 4;
}
/// Function descriptor for C/C++.
message CppFunctionDescriptor {
/// Dynamic library name which contains the remote function.
string lib_name = 1;
/// Remote function offset from base address.
string function_offset = 2;
/// Executable function offset from base address.
string exec_function_offset = 3;
}
// A union wrapper for various function descriptor types.
message FunctionDescriptor {
oneof function_descriptor {
JavaFunctionDescriptor java_function_descriptor = 1;
PythonFunctionDescriptor python_function_descriptor = 2;
CppFunctionDescriptor cpp_function_descriptor = 3;
}
}