[C++ Worker]Add some APIs for task call part one (#16499)

This commit is contained in:
qicosmos 2021-08-05 17:25:36 +08:00 committed by GitHub
parent d3155bc1a8
commit f1f7d4a085
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 642 additions and 139 deletions

View file

@ -58,9 +58,9 @@ class Ray {
/// Get a single object from the object store.
/// This method will be blocked until the object is ready.
///
/// \param[in] object The object reference which should be returned.
/// \return shared pointer of the result.
/// \throws RayException if task or worker failed, or object is unreconstructable.
template <typename T>
static std::shared_ptr<T> Get(const ObjectRef<T> &object);
@ -98,6 +98,23 @@ class Ray {
template <typename F>
static ActorCreator<F> Actor(F create_func);
/// Get a handle to a global named actor.
/// Gets a handle to a global named actor with the given name. The actor must have been
/// created with global name specified.
///
/// \param[in] name The global name of the named actor.
/// \return An ActorHandle to the actor if the actor of specified name exists or an
/// empty optional object.
template <typename T>
inline static boost::optional<ActorHandle<T>> GetGlobalActor(
const std::string &actor_name);
/// Intentionally exit the current actor.
///
/// Forwards the request to the ExitActor() method of the runtime object returned by
/// ray::internal::RayRuntime(). Any exception raised by the runtime propagates to the
/// caller unchanged.
/// NOTE(review): meant to be called from inside an actor; the concrete exception type
/// raised on a driver or a non-actor worker is decided by the runtime implementation.
static void ExitActor() {
  auto &&runtime = ray::internal::RayRuntime();
  runtime->ExitActor();
}
private:
static std::once_flag is_inited_;
@ -109,6 +126,10 @@ class Ray {
template <typename FuncType>
static ActorCreator<FuncType> CreateActorInternal(FuncType &func);
template <typename T>
inline static boost::optional<ActorHandle<T>> GetActorInternal(
bool global, const std::string &actor_name);
};
} // namespace api
@ -202,5 +223,25 @@ ActorCreator<F> Ray::Actor(F create_func) {
return CreateActorInternal<F>(create_func);
}
/// Resolve a named actor to a handle.
///
/// \param[in] global Forwarded unchanged to the runtime's GetActorId().
/// \param[in] actor_name Name of the actor to look up.
/// \return A handle wrapping the id reported by the runtime, or an empty optional when
/// the name is empty or the runtime reports an empty id.
template <typename T>
boost::optional<ActorHandle<T>> Ray::GetActorInternal(bool global,
                                                      const std::string &actor_name) {
  using MaybeHandle = boost::optional<ActorHandle<T>>;
  if (!actor_name.empty()) {
    // Only consult the runtime for a non-empty name.
    auto resolved_id = ray::internal::RayRuntime()->GetActorId(global, actor_name);
    if (!resolved_id.empty()) {
      return MaybeHandle(ActorHandle<T>(resolved_id));
    }
  }
  return MaybeHandle();
}
/// Look up an actor by its global name.
///
/// \param[in] actor_name The global name of the actor.
/// \return Whatever GetActorInternal() yields for a global lookup of this name.
template <typename T>
boost::optional<ActorHandle<T>> Ray::GetGlobalActor(const std::string &actor_name) {
  constexpr bool kGlobalName = true;
  return GetActorInternal<T>(kGlobalName, actor_name);
}
} // namespace api
} // namespace ray

View file

@ -15,6 +15,7 @@
#pragma once
#include <ray/api/actor_handle.h>
#include <ray/api/task_options.h>
namespace ray {
namespace api {
@ -33,10 +34,36 @@ class ActorCreator {
template <typename... Args>
ActorHandle<GetActorType<F>> Remote(Args &&... args);
/// Store the name that will be passed along in the actor creation options.
/// \param[in] name The actor name; it is moved into the options.
/// \return *this, so calls can be chained.
ActorCreator &SetGlobalName(std::string name) {
  auto &options = create_options_;
  options.name = std::move(name);
  return *this;
}
/// Replace the whole resource map of the actor creation options.
/// \param[in] resources Resource name to quantity; it is moved into the options.
/// \return *this, so calls can be chained.
ActorCreator &SetResources(std::unordered_map<std::string, double> resources) {
  auto &options = create_options_;
  options.resources = std::move(resources);
  return *this;
}
/// Set the quantity of a single resource in the actor creation options.
///
/// A quantity that was previously stored under the same name is overwritten.
/// \param[in] name The resource name.
/// \param[in] value The requested quantity.
/// \return *this, so calls can be chained.
ActorCreator &SetResource(std::string name, double value) {
  // operator[] assigns even when the key exists; emplace() would silently keep the
  // old quantity and turn this setter into a no-op.
  create_options_.resources[std::move(name)] = value;
  return *this;
}
/// Store the max_restarts value of the actor creation options.
/// \param[in] max_restarts The value to store.
/// \return *this, so calls can be chained.
ActorCreator &SetMaxRestarts(int max_restarts) {
  auto &options = create_options_;
  options.max_restarts = max_restarts;
  return *this;
}
/// Store the max_concurrency value of the actor creation options.
/// \param[in] max_concurrency The value to store.
/// \return *this, so calls can be chained.
ActorCreator &SetMaxConcurrency(int max_concurrency) {
  auto &options = create_options_;
  options.max_concurrency = max_concurrency;
  return *this;
}
private:
RayRuntime *runtime_;
RemoteFunctionHolder remote_function_holder_;
std::vector<ray::api::TaskArg> args_;
ActorCreationOptions create_options_{};
};
// ---------- implementation ----------
@ -44,8 +71,10 @@ template <typename F>
template <typename... Args>
ActorHandle<GetActorType<F>> ActorCreator<F>::Remote(Args &&... args) {
StaticCheck<F, Args...>();
CheckTaskOptions(create_options_.resources);
Arguments::WrapArgs(&args_, std::forward<Args>(args)...);
auto returned_actor_id = runtime_->CreateActor(remote_function_holder_, args_);
auto returned_actor_id =
runtime_->CreateActor(remote_function_holder_, args_, create_options_);
return ActorHandle<GetActorType<F>>(returned_actor_id);
}
} // namespace api

View file

@ -41,6 +41,9 @@ class ActorHandle {
template <typename F>
ActorTaskCaller<F> Task(F actor_func);
void Kill();
void Kill(bool no_restart);
/// Make ActorHandle serializable
MSGPACK_DEFINE(id_);
@ -74,5 +77,15 @@ ActorTaskCaller<F> ActorHandle<ActorType>::Task(F actor_func) {
std::move(remote_func_holder));
}
/// Kill the actor behind this handle, asking the runtime not to restart it.
/// Equivalent to Kill(true).
template <typename ActorType>
void ActorHandle<ActorType>::Kill() {
  Kill(/*no_restart=*/true);
}
/// Kill the actor behind this handle.
/// \param[in] no_restart Forwarded unchanged to the runtime's KillActor().
template <typename ActorType>
void ActorHandle<ActorType>::Kill(bool no_restart) {
  auto &&runtime = ray::internal::RayRuntime();
  runtime->KillActor(id_, no_restart);
}
} // namespace api
} // namespace ray

View file

@ -17,7 +17,7 @@
#include <ray/api/arguments.h>
#include <ray/api/object_ref.h>
#include <ray/api/static_check.h>
#include <ray/api/task_options.h>
namespace ray {
namespace api {
@ -35,11 +35,27 @@ class ActorTaskCaller {
template <typename... Args>
ObjectRef<boost::callable_traits::return_type_t<F>> Remote(Args &&... args);
/// Store the name that will be passed along in the call options of this actor task.
/// \param[in] name The task name; it is moved into the options.
/// \return *this, so calls can be chained.
ActorTaskCaller &SetName(std::string name) {
  auto &options = task_options_;
  options.name = std::move(name);
  return *this;
}
/// Replace the whole resource map of the call options of this actor task.
/// \param[in] resources Resource name to quantity; it is moved into the options.
/// \return *this, so calls can be chained.
ActorTaskCaller &SetResources(std::unordered_map<std::string, double> resources) {
  auto &options = task_options_;
  options.resources = std::move(resources);
  return *this;
}
/// Set the quantity of a single resource in the call options of this actor task.
///
/// A quantity that was previously stored under the same name is overwritten.
/// \param[in] name The resource name.
/// \param[in] value The requested quantity.
/// \return *this, so calls can be chained.
ActorTaskCaller &SetResource(std::string name, double value) {
  // operator[] assigns even when the key exists; emplace() would silently keep the
  // old quantity and turn this setter into a no-op.
  task_options_.resources[std::move(name)] = value;
  return *this;
}
private:
RayRuntime *runtime_;
std::string id_;
RemoteFunctionHolder remote_function_holder_;
std::vector<ray::api::TaskArg> args_;
CallOptions task_options_;
};
// ---------- implementation ----------
@ -50,9 +66,11 @@ ObjectRef<boost::callable_traits::return_type_t<F>> ActorTaskCaller<F>::Remote(
Args &&... args) {
using ReturnType = boost::callable_traits::return_type_t<F>;
StaticCheck<F, Args...>();
CheckTaskOptions(task_options_.resources);
Arguments::WrapArgs(&args_, std::forward<Args>(args)...);
auto returned_object_id = runtime_->CallActor(remote_function_holder_, id_, args_);
auto returned_object_id =
runtime_->CallActor(remote_function_holder_, id_, args_, task_options_);
return ObjectRef<ReturnType>(returned_object_id);
}

View file

@ -88,27 +88,18 @@ struct Invoker {
using RetrunType = boost::callable_traits::return_type_t<Function>;
using ArgsTuple = RemoveReference_t<boost::callable_traits::args_t<Function>>;
if (std::tuple_size<ArgsTuple>::value != args_buffer.size()) {
return PackError("Arguments number not match");
throw std::invalid_argument("Arguments number not match");
}
msgpack::sbuffer result;
ArgsTuple tp{};
try {
bool is_ok = GetArgsTuple(
tp, args_buffer, std::make_index_sequence<std::tuple_size<ArgsTuple>::value>{});
if (!is_ok) {
return PackError("arguments error");
}
result = Invoker<Function>::Call<RetrunType>(func, std::move(tp));
} catch (msgpack::type_error &e) {
result = PackError(std::string("invalid arguments: ") + e.what());
} catch (const std::exception &e) {
result = PackError(std::string("function execute exception: ") + e.what());
} catch (...) {
result = PackError("unknown exception");
bool is_ok = GetArgsTuple(
tp, args_buffer, std::make_index_sequence<std::tuple_size<ArgsTuple>::value>{});
if (!is_ok) {
throw std::invalid_argument("Arguments error");
}
result = Invoker<Function>::Call<RetrunType>(func, std::move(tp));
return result;
}
@ -119,31 +110,23 @@ struct Invoker {
using ArgsTuple =
RemoveReference_t<RemoveFirst_t<boost::callable_traits::args_t<Function>>>;
if (std::tuple_size<ArgsTuple>::value != args_buffer.size()) {
return PackError("Arguments number not match");
throw std::invalid_argument("Arguments number not match");
}
msgpack::sbuffer result;
ArgsTuple tp{};
try {
bool is_ok = GetArgsTuple(
tp, args_buffer, std::make_index_sequence<std::tuple_size<ArgsTuple>::value>{});
if (!is_ok) {
return PackError("arguments error");
}
uint64_t actor_ptr =
ray::api::Serializer::Deserialize<uint64_t>(ptr->data(), ptr->size());
using Self = boost::callable_traits::class_of_t<Function>;
Self *self = (Self *)actor_ptr;
result = Invoker<Function>::CallMember<RetrunType>(func, self, std::move(tp));
} catch (msgpack::type_error &e) {
result = PackError(std::string("invalid arguments: ") + e.what());
} catch (const std::exception &e) {
result = PackError(std::string("function execute exception: ") + e.what());
} catch (...) {
result = PackError("unknown exception");
bool is_ok = GetArgsTuple(
tp, args_buffer, std::make_index_sequence<std::tuple_size<ArgsTuple>::value>{});
if (!is_ok) {
throw std::invalid_argument("Arguments error");
}
uint64_t actor_ptr =
ray::api::Serializer::Deserialize<uint64_t>(ptr->data(), ptr->size());
using Self = boost::callable_traits::class_of_t<Function>;
Self *self = (Self *)actor_ptr;
result = Invoker<Function>::CallMember<RetrunType>(func, self, std::move(tp));
return result;
}

View file

@ -34,7 +34,7 @@ inline void CheckResult(const std::shared_ptr<msgpack::sbuffer> &packed_object)
auto tp = Serializer::Deserialize<std::tuple<int, std::string>>(
packed_object->data(), packed_object->size(), 1);
std::string err_msg = std::get<1>(tp);
throw RayException(err_msg);
throw RayTaskException(err_msg);
}
}

View file

@ -29,9 +29,30 @@ class RayException : public std::exception {
std::string msg_;
};
/// RayException subtype; NativeObjectStore::CheckException throws it when the object
/// metadata equals the ACTOR_DIED error type.
class RayActorException : public RayException {
 public:
  RayActorException(const std::string &msg) : RayException(msg) {}
};
/// RayException subtype; NativeObjectStore::CheckException throws it when the object
/// metadata equals the TASK_EXECUTION_EXCEPTION error type.
class RayTaskException : public RayException {
 public:
  RayTaskException(const std::string &msg) : RayException(msg) {}
};
/// RayException subtype; NativeObjectStore::CheckException throws it when the object
/// metadata equals the WORKER_DIED error type.
class RayWorkerException : public RayException {
 public:
  RayWorkerException(const std::string &msg) : RayException(msg) {}
};
/// RayException subtype; NativeObjectStore::CheckException throws it when the object
/// metadata equals the OBJECT_UNRECONSTRUCTABLE error type.
class UnreconstructableException : public RayException {
 public:
  UnreconstructableException(const std::string &msg) : RayException(msg) {}
};
/// RayException subtype carrying only a message.
/// NOTE(review): presumably raised when a remote function cannot be resolved by name;
/// confirm against the throw sites.
class RayFunctionNotFound : public RayException {
 public:
  RayFunctionNotFound(const std::string &msg) : RayException(msg) {}
};
} // namespace api
} // namespace ray

View file

@ -15,6 +15,7 @@
#pragma once
#include <ray/api/function_manager.h>
#include <ray/api/task_options.h>
#include <cstdint>
#include <memory>
@ -53,14 +54,20 @@ class RayRuntime {
int timeout_ms) = 0;
virtual std::string Call(const RemoteFunctionHolder &remote_function_holder,
std::vector<ray::api::TaskArg> &args) = 0;
std::vector<ray::api::TaskArg> &args,
const CallOptions &task_options) = 0;
virtual std::string CreateActor(const RemoteFunctionHolder &remote_function_holder,
std::vector<ray::api::TaskArg> &args) = 0;
std::vector<ray::api::TaskArg> &args,
const ActorCreationOptions &create_options) = 0;
virtual std::string CallActor(const RemoteFunctionHolder &remote_function_holder,
const std::string &actor,
std::vector<ray::api::TaskArg> &args) = 0;
std::vector<ray::api::TaskArg> &args,
const CallOptions &call_options) = 0;
virtual void AddLocalReference(const std::string &id) = 0;
virtual void RemoveLocalReference(const std::string &id) = 0;
virtual std::string GetActorId(bool global, const std::string &actor_name) = 0;
virtual void KillActor(const std::string &str_actor_id, bool no_restart) = 0;
virtual void ExitActor() = 0;
};
} // namespace api
} // namespace ray

View file

@ -43,8 +43,8 @@ class Serializer {
}
template <typename T>
static T Deserialize(const char *data, size_t size, size_t &off) {
msgpack::unpacked unpacked = msgpack::unpack(data, size, off);
static T Deserialize(const char *data, size_t size, size_t *off) {
msgpack::unpacked unpacked = msgpack::unpack(data, size, *off);
return unpacked.get().as<T>();
}

View file

@ -15,7 +15,7 @@
#pragma once
#include <ray/api/static_check.h>
#include <ray/api/task_options.h>
namespace ray {
namespace api {
@ -29,11 +29,27 @@ class TaskCaller {
template <typename... Args>
ObjectRef<boost::callable_traits::return_type_t<F>> Remote(Args &&... args);
/// Store the name that will be passed along in the call options of this task.
/// \param[in] name The task name; it is moved into the options.
/// \return *this, so calls can be chained.
TaskCaller &SetName(std::string name) {
  auto &options = task_options_;
  options.name = std::move(name);
  return *this;
}
/// Replace the whole resource map of the call options of this task.
/// \param[in] resources Resource name to quantity; it is moved into the options.
/// \return *this, so calls can be chained.
TaskCaller &SetResources(std::unordered_map<std::string, double> resources) {
  auto &options = task_options_;
  options.resources = std::move(resources);
  return *this;
}
/// Set the quantity of a single resource in the call options of this task.
///
/// A quantity that was previously stored under the same name is overwritten.
/// \param[in] name The resource name.
/// \param[in] value The requested quantity.
/// \return *this, so calls can be chained.
TaskCaller &SetResource(std::string name, double value) {
  // operator[] assigns even when the key exists; emplace() would silently keep the
  // old quantity and turn this setter into a no-op.
  task_options_.resources[std::move(name)] = value;
  return *this;
}
private:
RayRuntime *runtime_;
RemoteFunctionHolder remote_function_holder_{};
std::string function_name_;
std::vector<ray::api::TaskArg> args_;
CallOptions task_options_;
};
// ---------- implementation ----------
@ -51,9 +67,10 @@ template <typename... Args>
ObjectRef<boost::callable_traits::return_type_t<F>> TaskCaller<F>::Remote(
Args &&... args) {
StaticCheck<F, Args...>();
CheckTaskOptions(task_options_.resources);
using ReturnType = boost::callable_traits::return_type_t<F>;
Arguments::WrapArgs(&args_, std::forward<Args>(args)...);
auto returned_object_id = runtime_->Call(remote_function_holder_, args_);
auto returned_object_id = runtime_->Call(remote_function_holder_, args_, task_options_);
return ObjectRef<ReturnType>(returned_object_id);
}
} // namespace api

View file

@ -0,0 +1,54 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cmath>
namespace ray {
namespace api {
/// Validate the resource requirements attached to a task call or an actor creation.
///
/// \param[in] resources Map from resource name to requested quantity.
/// \throws RayException if a resource name is empty, if a quantity is not strictly
/// positive (zero, negative or NaN), or if a quantity greater than 1.0 has a
/// fractional part.
inline void CheckTaskOptions(const std::unordered_map<std::string, double> &resources) {
  for (const auto &pair : resources) {
    const std::string &resource_name = pair.first;
    const double quantity = pair.second;
    // Written as !(quantity > 0) rather than quantity <= 0 so that NaN, which compares
    // false against everything, is rejected together with zero and negative values.
    if (resource_name.empty() || !(quantity > 0)) {
      throw ray::api::RayException(
          "Resource values should be positive. Specified resource: " + resource_name +
          " = " + std::to_string(quantity) + ".");
    }
    // Note: A resource value should be an integer if it is greater than 1.0.
    // e.g. 3.0 is a valid resource value, but 3.5 is not.
    double intpart;
    if (quantity > 1 && std::modf(quantity, &intpart) != 0.0) {
      throw ray::api::RayException(
          "A resource value should be an integer if it is greater than 1.0. Specified "
          "resource: " +
          resource_name + " = " + std::to_string(quantity) + ".");
    }
  }
}
/// Per-call options for a normal task or an actor task.
struct CallOptions {
  /// Task name; empty by default.
  std::string name{};
  /// Resource name to requested quantity; empty by default.
  std::unordered_map<std::string, double> resources{};
};
/// Options applied when an actor is created.
struct ActorCreationOptions {
  /// Actor name; empty by default.
  std::string name{};
  /// Resource name to requested quantity; empty by default.
  std::unordered_map<std::string, double> resources{};
  /// Defaults to 0.
  int max_restarts{0};
  /// Defaults to 1.
  int max_concurrency{1};
};
} // namespace api
} // namespace ray

View file

@ -148,26 +148,27 @@ InvocationSpec BuildInvocationSpec1(TaskType task_type,
}
std::string AbstractRayRuntime::Call(const RemoteFunctionHolder &remote_function_holder,
std::vector<ray::api::TaskArg> &args) {
std::vector<ray::api::TaskArg> &args,
const CallOptions &task_options) {
auto invocation_spec = BuildInvocationSpec1(
TaskType::NORMAL_TASK, remote_function_holder, args, ActorID::Nil());
return task_submitter_->SubmitTask(invocation_spec).Binary();
return task_submitter_->SubmitTask(invocation_spec, task_options).Binary();
}
std::string AbstractRayRuntime::CreateActor(
const RemoteFunctionHolder &remote_function_holder,
std::vector<ray::api::TaskArg> &args) {
std::vector<ray::api::TaskArg> &args, const ActorCreationOptions &create_options) {
auto invocation_spec = BuildInvocationSpec1(
TaskType::ACTOR_CREATION_TASK, remote_function_holder, args, ActorID::Nil());
return task_submitter_->CreateActor(invocation_spec).Binary();
return task_submitter_->CreateActor(invocation_spec, create_options).Binary();
}
std::string AbstractRayRuntime::CallActor(
const RemoteFunctionHolder &remote_function_holder, const std::string &actor,
std::vector<ray::api::TaskArg> &args) {
std::vector<ray::api::TaskArg> &args, const CallOptions &call_options) {
auto invocation_spec = BuildInvocationSpec1(
TaskType::ACTOR_TASK, remote_function_holder, args, ActorID::FromBinary(actor));
return task_submitter_->SubmitActorTask(invocation_spec).Binary();
return task_submitter_->SubmitActorTask(invocation_spec, call_options).Binary();
}
const TaskID &AbstractRayRuntime::GetCurrentTaskId() {
@ -194,5 +195,48 @@ void AbstractRayRuntime::RemoveLocalReference(const std::string &id) {
}
}
/// Build the fully qualified form of an actor name.
///
/// \param[in] global When true the name is returned as is.
/// \param[in] name The actor name.
/// \return "" for an empty name, `name` itself when `global` is true, otherwise the hex
/// string of the current job id, a '-' separator, and `name`.
std::string GetFullName(bool global, const std::string &name) {
  if (name.empty()) {
    return "";
  }
  if (global) {
    return name;
  }
  // Job-scoped names are prefixed with the id of the job the core worker is running.
  const std::string job_hex =
      ::ray::CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex();
  return job_hex + "-" + name;
}
/// Look up a named actor through the core worker and return its binary actor id.
///
/// TODO(qicosmos): Now only support global name, will support the name of a current job.
///
/// \param[in] global Whether the name is a global one; currently it only affects the
/// computed full name, which is not used for the lookup yet (see the note below).
/// \param[in] actor_name The name to look up.
/// \return The binary actor id, or "" when the lookup status is not OK (the status
/// message is logged as a warning in that case).
std::string AbstractRayRuntime::GetActorId(bool global, const std::string &actor_name) {
  auto &core_worker = CoreWorkerProcess::GetCoreWorker();
  // NOTE(review): the full name is computed but the lookup below still uses the raw
  // `actor_name`; confirm whether job-scoped lookups should pass `full_actor_name`.
  auto full_actor_name = GetFullName(global, actor_name);
  auto pair = core_worker.GetNamedActorHandle(actor_name, "");
  if (!pair.second.ok()) {
    RAY_LOG(WARNING) << pair.second.message();
    return "";
  }
  // Bind by reference: the handle is only read here, no extra owner is needed.
  const auto &actor_handle = pair.first;
  RAY_CHECK(actor_handle);
  return actor_handle->GetActorID().Binary();
}
void AbstractRayRuntime::KillActor(const std::string &str_actor_id, bool no_restart) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
ray::ActorID actor_id = ray::ActorID::FromBinary(str_actor_id);
Status status = core_worker.KillActor(actor_id, true, no_restart);
if (!status.ok()) {
throw RayException(status.message());
}
}
/// Exit the current actor by raising RayIntentionalSystemExitException.
///
/// \throws std::logic_error if the configured worker type is not WORKER or the core
/// worker has a nil actor id.
/// \throws RayIntentionalSystemExitException otherwise; this function never returns
/// normally.
void AbstractRayRuntime::ExitActor() {
  auto &worker = CoreWorkerProcess::GetCoreWorker();
  // The actor id is only queried when the process is a worker at all.
  const bool is_actor_worker =
      ConfigInternal::Instance().worker_type == ray::WorkerType::WORKER &&
      !worker.GetActorId().IsNil();
  if (!is_actor_worker) {
    throw std::logic_error("This shouldn't be called on a non-actor worker.");
  }
  throw RayIntentionalSystemExitException("SystemExit");
}
} // namespace api
} // namespace ray

View file

@ -30,6 +30,10 @@
namespace ray {
namespace api {
/// RayException subtype thrown by AbstractRayRuntime::ExitActor to signal that the
/// actor asked to exit on purpose.
class RayIntentionalSystemExitException : public RayException {
 public:
  RayIntentionalSystemExitException(const std::string &msg) : RayException(msg) {}
};
class AbstractRayRuntime : public RayRuntime {
public:
virtual ~AbstractRayRuntime(){};
@ -38,6 +42,8 @@ class AbstractRayRuntime : public RayRuntime {
void Put(std::shared_ptr<msgpack::sbuffer> data, const ObjectID &object_id);
void Put(ray::rpc::ErrorType type, const ObjectID &object_id);
std::string Put(std::shared_ptr<msgpack::sbuffer> data);
std::shared_ptr<msgpack::sbuffer> Get(const std::string &id);
@ -48,18 +54,26 @@ class AbstractRayRuntime : public RayRuntime {
int timeout_ms);
std::string Call(const RemoteFunctionHolder &remote_function_holder,
std::vector<ray::api::TaskArg> &args);
std::vector<ray::api::TaskArg> &args, const CallOptions &task_options);
std::string CreateActor(const RemoteFunctionHolder &remote_function_holder,
std::vector<ray::api::TaskArg> &args);
std::vector<ray::api::TaskArg> &args,
const ActorCreationOptions &create_options);
std::string CallActor(const RemoteFunctionHolder &remote_function_holder,
const std::string &actor, std::vector<ray::api::TaskArg> &args);
const std::string &actor, std::vector<ray::api::TaskArg> &args,
const CallOptions &call_options);
void AddLocalReference(const std::string &id);
void RemoveLocalReference(const std::string &id);
std::string GetActorId(bool global, const std::string &actor_name);
void KillActor(const std::string &str_actor_id, bool no_restart);
void ExitActor();
const TaskID &GetCurrentTaskId();
const JobID &GetCurrentJobID();

View file

@ -61,6 +61,19 @@ std::shared_ptr<msgpack::sbuffer> NativeObjectStore::GetRaw(const ObjectID &obje
return buffers[0];
}
void NativeObjectStore::CheckException(const std::string &meta_str,
const std::shared_ptr<Buffer> &data_buffer) {
if (meta_str == std::to_string(ray::rpc::ErrorType::WORKER_DIED)) {
throw RayWorkerException({(char *)data_buffer->Data(), data_buffer->Size()});
} else if (meta_str == std::to_string(ray::rpc::ErrorType::ACTOR_DIED)) {
throw RayActorException({(char *)data_buffer->Data(), data_buffer->Size()});
} else if (meta_str == std::to_string(ray::rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE)) {
throw UnreconstructableException({(char *)data_buffer->Data(), data_buffer->Size()});
} else if (meta_str == std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION)) {
throw RayTaskException({(char *)data_buffer->Data(), data_buffer->Size()});
}
}
std::vector<std::shared_ptr<msgpack::sbuffer>> NativeObjectStore::GetRaw(
const std::vector<ObjectID> &ids, int timeout_ms) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
@ -73,7 +86,13 @@ std::vector<std::shared_ptr<msgpack::sbuffer>> NativeObjectStore::GetRaw(
std::vector<std::shared_ptr<msgpack::sbuffer>> result_sbuffers;
result_sbuffers.reserve(results.size());
for (size_t i = 0; i < results.size(); i++) {
auto data_buffer = results[i]->GetData();
const auto &meta = results[i]->GetMetadata();
const auto &data_buffer = results[i]->GetData();
if (meta != nullptr) {
std::string meta_str((char *)meta->Data(), meta->Size());
CheckException(meta_str, data_buffer);
}
auto sbuffer = std::make_shared<msgpack::sbuffer>(data_buffer->Size());
sbuffer->write(reinterpret_cast<const char *>(data_buffer->Data()),
data_buffer->Size());

View file

@ -40,6 +40,8 @@ class NativeObjectStore : public ObjectStore {
std::vector<std::shared_ptr<msgpack::sbuffer>> GetRaw(const std::vector<ObjectID> &ids,
int timeout_ms);
void CheckException(const std::string &meta_str,
const std::shared_ptr<Buffer> &data_buffer);
};
} // namespace api

View file

@ -30,7 +30,8 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter(
thread_pool_.reset(new boost::asio::thread_pool(10));
}
ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation) {
ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
const ActorCreationOptions &options) {
/// TODO(Guyang Song): Make the information of TaskSpecification more reasonable.
/// We just reuse the TaskSpecification class and make the single process mode work.
/// Maybe some information of TaskSpecification is not reasonable or invalid.
@ -52,7 +53,9 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation) {
if (invocation.task_type == TaskType::NORMAL_TASK) {
} else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) {
invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID();
builder.SetActorCreationTaskSpec(invocation.actor_id, "");
builder.SetActorCreationTaskSpec(invocation.actor_id, /*serialized_actor_handle=*/"",
options.max_restarts, /*max_task_retries=*/0, {},
options.max_concurrency);
} else if (invocation.task_type == TaskType::ACTOR_TASK) {
const TaskID actor_creation_task_id =
TaskID::ForActorCreationTask(invocation.actor_id);
@ -100,17 +103,20 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation) {
return return_object_id;
}
ObjectID LocalModeTaskSubmitter::SubmitTask(InvocationSpec &invocation) {
return Submit(invocation);
ObjectID LocalModeTaskSubmitter::SubmitTask(InvocationSpec &invocation,
const CallOptions &call_options) {
return Submit(invocation, {});
}
ActorID LocalModeTaskSubmitter::CreateActor(InvocationSpec &invocation) {
Submit(invocation);
ActorID LocalModeTaskSubmitter::CreateActor(InvocationSpec &invocation,
const ActorCreationOptions &create_options) {
Submit(invocation, create_options);
return invocation.actor_id;
}
ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation) {
return Submit(invocation);
ObjectID LocalModeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
const CallOptions &call_options) {
return Submit(invocation, {});
}
} // namespace api

View file

@ -31,11 +31,12 @@ class LocalModeTaskSubmitter : public TaskSubmitter {
public:
LocalModeTaskSubmitter(LocalModeRayRuntime &local_mode_ray_tuntime);
ObjectID SubmitTask(InvocationSpec &invocation);
ObjectID SubmitTask(InvocationSpec &invocation, const CallOptions &call_options);
ActorID CreateActor(InvocationSpec &invocation);
ActorID CreateActor(InvocationSpec &invocation,
const ActorCreationOptions &create_options);
ObjectID SubmitActorTask(InvocationSpec &invocation);
ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);
private:
std::unordered_map<ActorID, std::unique_ptr<ActorContext>> actor_contexts_;
@ -46,7 +47,7 @@ class LocalModeTaskSubmitter : public TaskSubmitter {
LocalModeRayRuntime &local_mode_ray_tuntime_;
ObjectID Submit(InvocationSpec &invocation);
ObjectID Submit(InvocationSpec &invocation, const ActorCreationOptions &options);
};
} // namespace api
} // namespace ray

View file

@ -27,40 +27,45 @@ RayFunction BuildRayFunction(InvocationSpec &invocation) {
return RayFunction(ray::Language::CPP, function_descriptor);
}
ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation) {
ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
const CallOptions &call_options) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
TaskOptions options{};
options.name = call_options.name;
options.resources = call_options.resources;
std::vector<ObjectID> return_ids;
if (invocation.task_type == TaskType::ACTOR_TASK) {
core_worker.SubmitActorTask(invocation.actor_id, BuildRayFunction(invocation),
invocation.args, TaskOptions(), &return_ids);
invocation.args, options, &return_ids);
} else {
core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, TaskOptions(),
core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, options,
&return_ids, 1, std::make_pair(PlacementGroupID::Nil(), -1),
true, "");
}
return return_ids[0];
}
ObjectID NativeTaskSubmitter::SubmitTask(InvocationSpec &invocation) {
return Submit(invocation);
ObjectID NativeTaskSubmitter::SubmitTask(InvocationSpec &invocation,
const CallOptions &call_options) {
return Submit(invocation, call_options);
}
ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation) {
ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
const ActorCreationOptions &create_options) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
std::unordered_map<std::string, double> resources;
std::string name = "";
std::string name = create_options.name;
std::string ray_namespace = "";
ActorCreationOptions actor_options{0,
0,
1,
resources,
resources,
{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
ray::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
ActorID actor_id;
auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args,
actor_options, "", &actor_id);
@ -70,8 +75,9 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation) {
return actor_id;
}
ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation) {
return Submit(invocation);
ObjectID NativeTaskSubmitter::SubmitActorTask(InvocationSpec &invocation,
const CallOptions &task_options) {
return Submit(invocation, task_options);
}
} // namespace api

View file

@ -23,14 +23,15 @@ namespace api {
class NativeTaskSubmitter : public TaskSubmitter {
public:
ObjectID SubmitTask(InvocationSpec &invocation);
ObjectID SubmitTask(InvocationSpec &invocation, const CallOptions &call_options);
ActorID CreateActor(InvocationSpec &invocation);
ActorID CreateActor(InvocationSpec &invocation,
const ActorCreationOptions &create_options);
ObjectID SubmitActorTask(InvocationSpec &invocation);
ObjectID SubmitActorTask(InvocationSpec &invocation, const CallOptions &call_options);
private:
ObjectID Submit(InvocationSpec &invocation);
ObjectID Submit(InvocationSpec &invocation, const CallOptions &call_options);
};
} // namespace api
} // namespace ray

View file

@ -29,29 +29,25 @@ msgpack::sbuffer TaskExecutionHandler(const std::string &func_name,
const std::vector<msgpack::sbuffer> &args_buffer,
msgpack::sbuffer *actor_ptr) {
if (func_name.empty()) {
return PackError("Task function name is empty");
throw std::invalid_argument("Task function name is empty");
}
msgpack::sbuffer result;
do {
try {
if (actor_ptr) {
auto func_ptr = FunctionManager::Instance().GetMemberFunction(func_name);
if (func_ptr == nullptr) {
result = PackError("unknown actor task: " + func_name);
break;
}
result = (*func_ptr)(actor_ptr, args_buffer);
} else {
auto func_ptr = FunctionManager::Instance().GetFunction(func_name);
if (func_ptr == nullptr) {
result = PackError("unknown function: " + func_name);
break;
}
result = (*func_ptr)(args_buffer);
if (actor_ptr) {
auto func_ptr = FunctionManager::Instance().GetMemberFunction(func_name);
if (func_ptr == nullptr) {
result = PackError("unknown actor task: " + func_name);
break;
}
} catch (const std::exception &ex) {
result = PackError(ex.what());
result = (*func_ptr)(actor_ptr, args_buffer);
} else {
auto func_ptr = FunctionManager::Instance().GetFunction(func_name);
if (func_ptr == nullptr) {
result = PackError("unknown function: " + func_name);
break;
}
result = (*func_ptr)(args_buffer);
}
} while (0);
@ -82,6 +78,8 @@ std::unique_ptr<ObjectID> TaskExecutor::Execute(InvocationSpec &invocation) {
return std::make_unique<ObjectID>();
};
/// TODO(qicosmos): Need to add more details of the error messages, such as object id,
/// task id etc.
std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
const std::string &func_name, const std::vector<msgpack::sbuffer> &args_buffer,
msgpack::sbuffer *actor_ptr) {
@ -98,8 +96,24 @@ std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
RAY_LOG(DEBUG) << "Execute function " << func_name << " ok.";
return std::make_pair(ray::Status::OK(),
std::make_shared<msgpack::sbuffer>(std::move(result)));
} catch (ray::api::RayIntentionalSystemExitException &e) {
return std::make_pair(ray::Status::IntentionalSystemExit(), nullptr);
} catch (ray::api::RayException &e) {
return std::make_pair(ray::Status::NotFound(e.what()), nullptr);
} catch (msgpack::type_error &e) {
return std::make_pair(
ray::Status::Invalid(std::string("invalid arguments: ") + e.what()), nullptr);
} catch (const std::invalid_argument &e) {
return std::make_pair(
ray::Status::Invalid(std::string("function execute exception: ") + e.what()),
nullptr);
} catch (const std::exception &e) {
return std::make_pair(
ray::Status::Invalid(std::string("function execute exception: ") + e.what()),
nullptr);
} catch (...) {
return std::make_pair(ray::Status::UnknownError(std::string("unknown exception")),
nullptr);
}
}
@ -138,8 +152,20 @@ Status TaskExecutor::ExecuteTask(
std::tie(status, data) = GetExecuteResult(func_name, ray_args_buffer, nullptr);
}
std::shared_ptr<ray::LocalMemoryBuffer> meta_buffer = nullptr;
if (!status.ok()) {
return status;
if (status.IsIntentionalSystemExit()) {
return status;
}
std::string meta_str = std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION);
meta_buffer = std::make_shared<ray::LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(&meta_str[0]), meta_str.size(), true);
msgpack::sbuffer buf;
std::string msg = status.ToString();
buf.write(msg.data(), msg.size());
data = std::make_shared<msgpack::sbuffer>(std::move(buf));
}
results->resize(return_ids.size(), nullptr);
@ -149,7 +175,7 @@ Status TaskExecutor::ExecuteTask(
auto result_ptr = &(*results)[0];
int64_t task_output_inlined_bytes = 0;
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
result_id, data_size, nullptr, std::vector<ray::ObjectID>(),
result_id, data_size, meta_buffer, std::vector<ray::ObjectID>(),
task_output_inlined_bytes, result_ptr));
auto result = *result_ptr;
@ -186,23 +212,29 @@ void TaskExecutor::Invoke(
auto typed_descriptor = function_descriptor->As<ray::CppFunctionDescriptor>();
std::shared_ptr<msgpack::sbuffer> data;
if (actor) {
auto result = internal::TaskExecutionHandler(typed_descriptor->FunctionName(),
args_buffer, actor.get());
data = std::make_shared<msgpack::sbuffer>(std::move(result));
runtime->Put(std::move(data), task_spec.ReturnId(0));
} else {
auto result = internal::TaskExecutionHandler(typed_descriptor->FunctionName(),
args_buffer, nullptr);
data = std::make_shared<msgpack::sbuffer>(std::move(result));
if (task_spec.IsActorCreationTask()) {
std::unique_ptr<ActorContext> actorContext(new ActorContext());
actorContext->current_actor = data;
absl::MutexLock lock(&actor_contexts_mutex);
actor_contexts.emplace(task_spec.ActorCreationId(), std::move(actorContext));
} else {
try {
if (actor) {
auto result = internal::TaskExecutionHandler(typed_descriptor->FunctionName(),
args_buffer, actor.get());
data = std::make_shared<msgpack::sbuffer>(std::move(result));
runtime->Put(std::move(data), task_spec.ReturnId(0));
} else {
auto result = internal::TaskExecutionHandler(typed_descriptor->FunctionName(),
args_buffer, nullptr);
data = std::make_shared<msgpack::sbuffer>(std::move(result));
if (task_spec.IsActorCreationTask()) {
std::unique_ptr<ActorContext> actorContext(new ActorContext());
actorContext->current_actor = data;
absl::MutexLock lock(&actor_contexts_mutex);
actor_contexts.emplace(task_spec.ActorCreationId(), std::move(actorContext));
} else {
runtime->Put(std::move(data), task_spec.ReturnId(0));
}
}
} catch (std::exception &e) {
auto result = ray::internal::PackError(e.what());
auto data = std::make_shared<msgpack::sbuffer>(std::move(result));
runtime->Put(std::move(data), task_spec.ReturnId(0));
}
}

View file

@ -29,11 +29,14 @@ class TaskSubmitter {
/// Virtual destructor so that concrete submitters can be destroyed through a
/// TaskSubmitter pointer.
virtual ~TaskSubmitter(){};
/// Submit the task described by `invocation`.
/// \return An ObjectID (presumably the id of the task's return object -- confirm
/// against the concrete implementations).
virtual ObjectID SubmitTask(InvocationSpec &invocation) = 0;
/// Same as SubmitTask(invocation), additionally receiving per-call options.
virtual ObjectID SubmitTask(InvocationSpec &invocation,
const CallOptions &call_options) = 0;
/// Create the actor described by `invocation`.
/// \return The ActorID of the actor.
virtual ActorID CreateActor(InvocationSpec &invocation) = 0;
/// Same as CreateActor(invocation), additionally receiving actor creation options.
virtual ActorID CreateActor(InvocationSpec &invocation,
const ActorCreationOptions &create_options) = 0;
/// Submit the actor task described by `invocation`.
/// \return An ObjectID (presumably the id of the task's return object -- confirm).
virtual ObjectID SubmitActorTask(InvocationSpec &invocation) = 0;
/// Same as SubmitActorTask(invocation), additionally receiving per-call options.
virtual ObjectID SubmitActorTask(InvocationSpec &invocation,
const CallOptions &call_options) = 0;
};
} // namespace api
} // namespace ray

View file

@ -82,6 +82,23 @@ TEST(RayApiTest, LogTest) {
boost::filesystem::remove_all(log_path);
}
/// Exercises CheckTaskOptions with a series of resource maps and checks which of them
/// are expected to be rejected with a RayException.
TEST(RayApiTest, TaskOptionsCheckTest) {
  using ResourceMap = std::unordered_map<std::string, double>;

  // An entry with an empty resource name is expected to be rejected.
  ResourceMap empty_name{{"", 1.0}};
  EXPECT_THROW(CheckTaskOptions(empty_name), RayException);

  // A quantity of zero is expected to be rejected.
  ResourceMap zero_quantity{{"dummy", 0.0}};
  EXPECT_THROW(CheckTaskOptions(zero_quantity), RayException);

  // A whole-number quantity is expected to be accepted.
  ResourceMap whole_quantity{{"dummy", 2.0}};
  CheckTaskOptions(whole_quantity);

  // Adding an entry with the quantity 2.5 is expected to make the map invalid.
  ResourceMap with_fractional_entry{{"dummy", 2.0}, {"dummy1", 2.5}};
  EXPECT_THROW(CheckTaskOptions(with_fractional_entry), RayException);

  // The quantity 0.5 is expected to be accepted.
  ResourceMap half_quantity{{"dummy", 0.5}};
  CheckTaskOptions(half_quantity);
}
TEST(RayApiTest, PutTest) {
RayConfig config;
config.local_mode = true;

View file

@ -41,6 +41,10 @@ TEST(RayClusterModeTest, FullTest) {
auto get_result = *(Ray::Get(obj));
EXPECT_EQ(12345, get_result);
auto named_obj =
Ray::Task(Return1).SetName("named_task").SetResources({{"CPU", 1.0}}).Remote();
EXPECT_EQ(1, *named_obj.Get());
/// common task without args
auto task_obj = Ray::Task(Return1).Remote();
int task_result = *(Ray::Get(task_obj));
@ -51,6 +55,35 @@ TEST(RayClusterModeTest, FullTest) {
task_result = *(Ray::Get(task_obj));
EXPECT_EQ(6, task_result);
ActorHandle<Counter> actor = Ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetMaxRestarts(1)
.SetGlobalName("named_actor")
.Remote();
auto named_actor_obj = actor.Task(&Counter::Plus1)
.SetName("named_actor_task")
.SetResources({{"CPU", 1.0}})
.Remote();
EXPECT_EQ(1, *named_actor_obj.Get());
auto named_actor_handle_optional = Ray::GetGlobalActor<Counter>("named_actor");
EXPECT_TRUE(named_actor_handle_optional);
auto &named_actor_handle = *named_actor_handle_optional;
auto named_actor_obj1 = named_actor_handle.Task(&Counter::Plus1).Remote();
EXPECT_EQ(2, *named_actor_obj1.Get());
EXPECT_FALSE(Ray::GetGlobalActor<Counter>("not_exist_actor"));
named_actor_handle.Kill(false);
std::this_thread::sleep_for(std::chrono::seconds(2));
auto named_actor_obj2 = named_actor_handle.Task(&Counter::Plus1).Remote();
EXPECT_EQ(1, *named_actor_obj2.Get());
named_actor_handle.Kill();
std::this_thread::sleep_for(std::chrono::seconds(2));
EXPECT_THROW(named_actor_handle.Task(&Counter::Plus1).Remote().Get(),
RayActorException);
EXPECT_FALSE(Ray::GetGlobalActor<Counter>("named_actor"));
/// actor task without args
ActorHandle<Counter> actor1 = Ray::Actor(RAY_FUNC(Counter::FactoryCreate)).Remote();
auto actor_object1 = actor1.Task(&Counter::Plus1).Remote();
@ -146,11 +179,58 @@ TEST(RayClusterModeTest, FullTest) {
EXPECT_EQ(result15, 29);
EXPECT_EQ(result16, 30);
Ray::Shutdown();
uint64_t pid = *actor1.Task(&Counter::GetPid).Remote().Get();
EXPECT_TRUE(Counter::IsProcessAlive(pid));
if (FLAGS_external_cluster) {
ProcessHelper::GetInstance().StopRayNode();
}
auto actor_object4 = actor1.Task(&Counter::Exit).Remote();
std::this_thread::sleep_for(std::chrono::seconds(2));
EXPECT_THROW(actor_object4.Get(), RayActorException);
EXPECT_FALSE(Counter::IsProcessAlive(pid));
}
/// Creates an ActorConcurrentCall actor with a max concurrency of 3, submits three
/// CountDown calls and expects each of them to return "ok".
TEST(RayClusterModeTest, MaxConcurrentTest) {
  auto latch_actor =
      Ray::Actor(ActorConcurrentCall::FactoryCreate).SetMaxConcurrency(3).Remote();

  // All three calls are submitted before any result is fetched.
  auto first = latch_actor.Task(&ActorConcurrentCall::CountDown).Remote();
  auto second = latch_actor.Task(&ActorConcurrentCall::CountDown).Remote();
  auto third = latch_actor.Task(&ActorConcurrentCall::CountDown).Remote();

  for (auto *pending : {&first, &second, &third}) {
    EXPECT_EQ(*pending->Get(), "ok");
  }
}
/// Checks that actors and tasks requesting 1 CPU complete, while those requesting
/// 100 CPUs are still unready after a Wait with a timeout argument of 1000
/// (presumably because the test cluster has fewer than 100 CPUs -- confirm).
TEST(RayClusterModeTest, ResourcesManagementTest) {
  // Waits on a single reference and expects it to remain unready.
  auto expect_stays_unready = [](const ObjectRef<int> &ref) {
    std::vector<ObjectRef<int>> pending{ref};
    WaitResult<int> outcome = Ray::Wait(pending, 1, 1000);
    EXPECT_EQ(outcome.ready.size(), 0);
    EXPECT_EQ(outcome.unready.size(), 1);
  };

  // Actor requesting 1 CPU: its task is expected to finish.
  auto small_actor =
      Ray::Actor(RAY_FUNC(Counter::FactoryCreate)).SetResources({{"CPU", 1.0}}).Remote();
  auto small_actor_result = small_actor.Task(&Counter::Plus1).Remote();
  EXPECT_EQ(*small_actor_result.Get(), 1);

  // Actor requesting 100 CPUs: its task is expected to stay unready.
  auto huge_actor = Ray::Actor(RAY_FUNC(Counter::FactoryCreate))
                        .SetResources({{"CPU", 100.0}})
                        .Remote();
  auto huge_actor_result = huge_actor.Task(&Counter::Plus1).Remote();
  expect_stays_unready(huge_actor_result);

  // Normal task requesting 1 CPU: expected to finish.
  auto small_task_result = Ray::Task(Return1).SetResource("CPU", 1.0).Remote();
  EXPECT_EQ(*small_task_result.Get(), 1);

  // Normal task requesting 100 CPUs: expected to stay unready.
  auto huge_task_result = Ray::Task(Return1).SetResource("CPU", 100.0).Remote();
  expect_stays_unready(huge_task_result);
}
/// Checks that an exception thrown by user code is expected to surface from Get() as a
/// RayTaskException, both for a normal task and for an actor task.
TEST(RayClusterModeTest, ExceptionTest) {
// Normal task: the whole submit-and-get chain is evaluated inside EXPECT_THROW.
EXPECT_THROW(Ray::Task(ThrowTask).Remote().Get(), RayTaskException);
// Actor task: the actor is created with the one-int factory overload and the
// task is submitted first; only Get() is evaluated inside EXPECT_THROW.
auto actor1 = Ray::Actor(RAY_FUNC(Counter::FactoryCreate, int)).Remote(1);
auto object1 = actor1.Task(&Counter::ExceptionFunc).Remote();
EXPECT_THROW(object1.Get(), RayTaskException);
}
int main(int argc, char **argv) {
@ -158,5 +238,13 @@ int main(int argc, char **argv) {
cmd_argc = &argc;
cmd_argv = &argv;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
int ret = RUN_ALL_TESTS();
Ray::Shutdown();
if (FLAGS_external_cluster) {
ProcessHelper::GetInstance().StopRayNode();
}
return ret;
}

View file

@ -14,6 +14,13 @@
#include "counter.h"
#ifdef _WIN32
#include "windows.h"
#else
#include <cerrno>
#include "signal.h"
#include "unistd.h"
#endif
/// Construct a counter whose `count` starts at `init`.
Counter::Counter(int init) { count = init; }
/// Factory without arguments: returns a heap-allocated Counter constructed with 0.
/// The caller receives the raw pointer (presumably the Ray runtime takes ownership
/// when this is used as an actor creation function -- confirm).
Counter *Counter::FactoryCreate() { return new Counter(0); }
@ -34,5 +41,38 @@ int Counter::Add(int x) {
return count;
}
/// Ask the runtime to exit the current actor by calling Ray::ExitActor().
/// \return 1 if control returns from ExitActor().
/// NOTE(review): whether ExitActor() ever returns normally to this function cannot be
/// told from here -- confirm against the runtime implementation.
int Counter::Exit() {
ray::api::Ray::ExitActor();
return 1;
}
/// Check whether a process with the given id is currently running.
///
/// \param[in] pid The id of the process to probe.
/// \return false if `pid` cannot be a valid process id on this platform or no running
/// process with that id could be found; true otherwise.
bool Counter::IsProcessAlive(uint64_t pid) {
#ifdef _WIN32
  // Reject values that do not survive the conversion to the native id type instead of
  // silently probing a different process.
  const DWORD process_id = static_cast<DWORD>(pid);
  if (process_id != pid) {
    return false;
  }
  HANDLE process = OpenProcess(SYNCHRONIZE, FALSE, process_id);
  if (process == nullptr) {
    return false;
  }
  // A process object stays openable after the process has terminated for as long as
  // any handle to it is open, so a successful open alone does not prove liveness.
  // The handle becomes signaled on termination; a zero-timeout wait that times out
  // therefore means the process is still running.
  const bool running = WaitForSingleObject(process, 0) == WAIT_TIMEOUT;
  CloseHandle(process);
  return running;
#else
  // kill() gives 0 and negative ids a process-group meaning, so only strictly
  // positive ids that survive the conversion to pid_t are probed.
  const pid_t process_id = static_cast<pid_t>(pid);
  if (process_id <= 0 || static_cast<uint64_t>(process_id) != pid) {
    return false;
  }
  // Signal 0 only performs the existence and permission checks. Any failure other
  // than ESRCH (e.g. EPERM) still means that the process exists.
  const bool not_found = kill(process_id, 0) == -1 && errno == ESRCH;
  return !not_found;
#endif
}
/// Return the id of the process executing this call, obtained from
/// GetCurrentProcessId() on Windows and getpid() elsewhere.
uint64_t Counter::GetPid() {
#ifdef _WIN32
return GetCurrentProcessId();
#else
return getpid();
#endif
}
RAY_REMOTE(RAY_FUNC(Counter::FactoryCreate), RAY_FUNC(Counter::FactoryCreate, int),
RAY_FUNC(Counter::FactoryCreate, int, int), &Counter::Plus1, &Counter::Add);
RAY_FUNC(Counter::FactoryCreate, int, int), &Counter::Plus1, &Counter::Add,
&Counter::Exit, &Counter::GetPid, &Counter::ExceptionFunc);
RAY_REMOTE(ActorConcurrentCall::FactoryCreate, &ActorConcurrentCall::CountDown);

View file

@ -15,6 +15,8 @@
#pragma once
#include <ray/api.h>
#include <condition_variable>
#include <mutex>
/// a class of user code
class Counter {
@ -27,4 +29,47 @@ class Counter {
int Plus1();
int Add(int x);
/// Calls Ray::ExitActor() to exit the current actor; the definition returns 1
/// after that call.
int Exit();
/// Returns the id of the process executing this call.
uint64_t GetPid();
/// Always throws std::invalid_argument("error").
void ExceptionFunc() { throw std::invalid_argument("error"); }
/// Reports whether a process with the id `pid` appears to exist.
static bool IsProcessAlive(uint64_t pid);
};
/// A minimal count-down latch: Wait() blocks until CountDown() has brought the
/// internal counter down to zero.
class CountDownLatch {
 public:
  /// \param[in] count The number of CountDown() calls needed to release waiters.
  explicit CountDownLatch(size_t count) : m_count(count) {}

  /// Block the calling thread until the counter is zero. Returns immediately if it
  /// already is.
  void Wait() {
    std::unique_lock<std::mutex> guard(m_mutex);
    // Re-check after every wake-up; wake-ups may be spurious or premature.
    while (m_count != 0) {
      m_cv.wait(guard);
    }
  }

  /// Decrement the counter and wake all waiters. Does nothing once the counter is
  /// zero, so it can never underflow.
  void CountDown() {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (m_count == 0) {
      return;
    }
    --m_count;
    m_cv.notify_all();
  }

 private:
  std::mutex m_mutex;            // Protects m_count.
  std::condition_variable m_cv;  // Notified on every decrement.
  size_t m_count = 0;            // Remaining count-downs before waiters are released.
};
class ActorConcurrentCall {
public:
static ActorConcurrentCall *FactoryCreate() { return new ActorConcurrentCall(); }
std::string CountDown() {
contdown_.CountDown();
contdown_.Wait();
return "ok";
}
private:
CountDownLatch contdown_{3};
};

View file

@ -17,5 +17,6 @@
/// Task without arguments that always returns 1.
int Return1() {
  constexpr int kResult = 1;
  return kResult;
}
/// Task that returns its argument incremented by one.
int Plus1(int x) {
  const int incremented = x + 1;
  return incremented;
}
/// Task that returns the sum of its two arguments.
int Plus(int x, int y) {
  const int sum = x + y;
  return sum;
}
/// Task that always fails by throwing std::logic_error with the message "error".
void ThrowTask() {
  throw std::logic_error("error");
}
RAY_REMOTE(Return1, Plus1, Plus);
RAY_REMOTE(Return1, Plus1, Plus, ThrowTask);

View file

@ -19,4 +19,5 @@
/// general function of user code
int Return1();
int Plus1(int x);
int Plus(int x, int y);
int Plus(int x, int y);
void ThrowTask();

View file

@ -221,5 +221,5 @@ TEST(RayApiTest, NotExistFunction) {
/// Submits ExceptionFunc as a normal task with the argument 2 and expects fetching
/// its result to throw.
TEST(RayApiTest, ExceptionTask) {
/// Normal task exception: Get() on the returned reference is expected to throw.
auto r4 = Ray::Task(ExceptionFunc).Remote(2);
// NOTE(review): Get() is checked against both RayException and RayTaskException;
// presumably RayTaskException derives from RayException -- confirm.
EXPECT_THROW(r4.Get(), RayException);
EXPECT_THROW(r4.Get(), RayTaskException);
}