Change core worker C++ namespace to ray::core (#17610)

This commit is contained in:
Hao Chen 2021-08-08 23:34:25 +08:00 committed by GitHub
parent c315596ed2
commit 0858f0e4f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
80 changed files with 512 additions and 418 deletions

View file

@ -21,6 +21,8 @@
namespace ray {
namespace api {
using ray::core::WorkerType;
enum class RunMode { SINGLE_PROCESS, CLUSTER };
class ConfigInternal {

View file

@ -40,6 +40,10 @@ msgpack::sbuffer PackError(std::string error_msg) {
}
} // namespace internal
namespace api {
using ray::core::CoreWorkerProcess;
using ray::core::WorkerType;
std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::abstract_ray_runtime_ = nullptr;
std::shared_ptr<AbstractRayRuntime> AbstractRayRuntime::DoInit() {
@ -200,8 +204,7 @@ std::string GetFullName(bool global, const std::string &name) {
return "";
}
return global ? name
: ::ray::CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex() +
"-" + name;
: CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex() + "-" + name;
}
/// TODO(qicosmos): Now only support global name, will support the name of a current job.
@ -231,7 +234,7 @@ void AbstractRayRuntime::KillActor(const std::string &str_actor_id, bool no_rest
void AbstractRayRuntime::ExitActor() {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
if (ConfigInternal::Instance().worker_type != ray::WorkerType::WORKER ||
if (ConfigInternal::Instance().worker_type != WorkerType::WORKER ||
core_worker.GetActorId().IsNil()) {
throw std::logic_error("This shouldn't be called on a non-actor worker.");
}

View file

@ -30,6 +30,8 @@
namespace ray {
namespace api {
using ray::core::WorkerContext;
class RayIntentionalSystemExitException : public RayException {
public:
RayIntentionalSystemExitException(const std::string &msg) : RayException(msg){};
@ -99,4 +101,4 @@ class AbstractRayRuntime : public RayRuntime {
friend class Ray;
};
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -25,7 +25,7 @@ namespace api {
LocalModeRayRuntime::LocalModeRayRuntime() {
worker_ = std::make_unique<WorkerContext>(
ray::WorkerType::DRIVER, ComputeDriverIdFromJob(JobID::Nil()), JobID::Nil());
ray::core::WorkerType::DRIVER, ComputeDriverIdFromJob(JobID::Nil()), JobID::Nil());
object_store_ = std::unique_ptr<ObjectStore>(new LocalModeObjectStore(*this));
task_submitter_ = std::unique_ptr<TaskSubmitter>(new LocalModeTaskSubmitter(*this));
}
@ -38,4 +38,4 @@ ActorID LocalModeRayRuntime::GetNextActorID() {
}
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -25,9 +25,10 @@
namespace ray {
namespace api {
LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime)
: local_mode_ray_tuntime_(local_mode_ray_tuntime) {
memory_store_ = std::make_unique<::ray::CoreWorkerMemoryStore>();
memory_store_ = std::make_unique<CoreWorkerMemoryStore>();
}
void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
@ -106,4 +107,4 @@ void LocalModeObjectStore::AddLocalReference(const std::string &id) { return; }
void LocalModeObjectStore::RemoveLocalReference(const std::string &id) { return; }
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -23,6 +23,8 @@
namespace ray {
namespace api {
using ray::core::CoreWorkerMemoryStore;
class LocalModeObjectStore : public ObjectStore {
public:
LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime);
@ -44,10 +46,10 @@ class LocalModeObjectStore : public ObjectStore {
std::vector<std::shared_ptr<msgpack::sbuffer>> GetRaw(const std::vector<ObjectID> &ids,
int timeout_ms);
std::unique_ptr<::ray::CoreWorkerMemoryStore> memory_store_;
std::unique_ptr<CoreWorkerMemoryStore> memory_store_;
LocalModeRayRuntime &local_mode_ray_tuntime_;
};
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -26,6 +26,8 @@
namespace ray {
namespace api {
using ray::core::CoreWorkerProcess;
void NativeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
ObjectID *object_id) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
@ -128,4 +130,4 @@ void NativeObjectStore::RemoveLocalReference(const std::string &id) {
}
}
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -45,4 +45,4 @@ class NativeObjectStore : public ObjectStore {
};
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -21,6 +21,9 @@
namespace ray {
namespace api {
using ray::core::CoreWorkerProcess;
using ray::core::TaskOptions;
RayFunction BuildRayFunction(InvocationSpec &invocation) {
auto function_descriptor = FunctionDescriptorBuilder::BuildCpp(
invocation.remote_function_holder.function_name);
@ -56,16 +59,16 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
std::unordered_map<std::string, double> resources;
std::string name = create_options.name;
std::string ray_namespace = "";
ray::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
ray::core::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
ActorID actor_id;
auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args,
actor_options, "", &actor_id);

View file

@ -66,6 +66,8 @@ GetRemoteFunctions() {
namespace api {
using ray::core::CoreWorkerProcess;
std::shared_ptr<msgpack::sbuffer> TaskExecutor::current_actor_ = nullptr;
TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
@ -174,7 +176,7 @@ Status TaskExecutor::ExecuteTask(
auto &result_id = return_ids[0];
auto result_ptr = &(*results)[0];
int64_t task_output_inlined_bytes = 0;
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
result_id, data_size, meta_buffer, std::vector<ray::ObjectID>(),
task_output_inlined_bytes, result_ptr));
@ -185,8 +187,7 @@ Status TaskExecutor::ExecuteTask(
}
}
RAY_CHECK_OK(
ray::CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
}
return ray::Status::OK();
}

View file

@ -27,6 +27,7 @@
namespace ray {
namespace internal {
/// Execute remote functions by networking stream.
msgpack::sbuffer TaskExecutionHandler(const std::string &func_name,
const std::vector<msgpack::sbuffer> &args_buffer,
@ -44,6 +45,8 @@ BOOST_DLL_ALIAS(internal::GetRemoteFunctions, GetRemoteFunctions);
namespace api {
using ray::core::RayFunction;
class AbstractRayRuntime;
class ActorContext {
@ -85,4 +88,4 @@ class TaskExecutor {
static std::shared_ptr<msgpack::sbuffer> current_actor_;
};
} // namespace api
} // namespace ray
} // namespace ray

View file

@ -23,6 +23,9 @@
namespace ray {
namespace api {
using ray::core::CoreWorkerProcess;
using ray::core::WorkerType;
/// IP address by which the local node can be reached *from* the `address`.
///
/// The behavior should be the same as `node_ip_address_from_perspective` from Ray Python
@ -77,8 +80,7 @@ void ProcessHelper::StopRayNode() {
void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) {
std::string redis_ip = ConfigInternal::Instance().redis_ip;
if (ConfigInternal::Instance().worker_type == ray::WorkerType::DRIVER &&
redis_ip.empty()) {
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER && redis_ip.empty()) {
redis_ip = "127.0.0.1";
StartRayNode(ConfigInternal::Instance().redis_port,
ConfigInternal::Instance().redis_password);
@ -99,7 +101,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
}
std::unique_ptr<ray::gcs::GlobalStateAccessor> global_state_accessor = nullptr;
if (ConfigInternal::Instance().worker_type == ray::WorkerType::DRIVER) {
if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER) {
global_state_accessor.reset(new ray::gcs::GlobalStateAccessor(
redis_address, ConfigInternal::Instance().redis_password));
RAY_CHECK(global_state_accessor->Connect()) << "Failed to connect to GCS.";

View file

@ -21,6 +21,8 @@
namespace ray {
namespace api {
using ray::core::CoreWorkerOptions;
class ProcessHelper {
public:
void RayStart(CoreWorkerOptions::TaskExecutionCallback callback);

View file

@ -21,9 +21,9 @@
int main(int argc, char **argv) {
RAY_LOG(INFO) << "CPP default worker started.";
ray::api::ConfigInternal::Instance().worker_type = ray::WorkerType::WORKER;
ray::api::ConfigInternal::Instance().worker_type = ray::core::WorkerType::WORKER;
ray::api::RayConfig config;
ray::api::Ray::Init(config, &argc, &argv);
::ray::CoreWorkerProcess::RunTaskExecutionLoop();
::ray::core::CoreWorkerProcess::RunTaskExecutionLoop();
return 0;
}

View file

@ -144,11 +144,11 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef cppclass CLanguage "Language":
pass
cdef cppclass CWorkerType "ray::WorkerType":
cdef cppclass CWorkerType "ray::core::WorkerType":
pass
cdef cppclass CTaskType "ray::TaskType":
pass
cdef cppclass CPlacementStrategy "ray::PlacementStrategy":
cdef cppclass CPlacementStrategy "ray::core::PlacementStrategy":
pass
cdef cppclass CAddress "ray::rpc::Address":
CAddress()
@ -166,11 +166,11 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CWorkerType WORKER_TYPE_WORKER "ray::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::WorkerType::DRIVER"
cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::WorkerType::SPILL_WORKER"
cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::WorkerType::RESTORE_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_UTIL_WORKER "ray::WorkerType::UTIL_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_WORKER "ray::core::WorkerType::WORKER"
cdef CWorkerType WORKER_TYPE_DRIVER "ray::core::WorkerType::DRIVER"
cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::core::WorkerType::SPILL_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::core::WorkerType::RESTORE_WORKER" # noqa: E501
cdef CWorkerType WORKER_TYPE_UTIL_WORKER "ray::core::WorkerType::UTIL_WORKER" # noqa: E501
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK"
@ -179,13 +179,13 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef extern from "src/ray/protobuf/common.pb.h" nogil:
cdef CPlacementStrategy PLACEMENT_STRATEGY_PACK \
"ray::PlacementStrategy::PACK"
"ray::core::PlacementStrategy::PACK"
cdef CPlacementStrategy PLACEMENT_STRATEGY_SPREAD \
"ray::PlacementStrategy::SPREAD"
"ray::core::PlacementStrategy::SPREAD"
cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_PACK \
"ray::PlacementStrategy::STRICT_PACK"
"ray::core::PlacementStrategy::STRICT_PACK"
cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_SPREAD \
"ray::PlacementStrategy::STRICT_SPREAD"
"ray::core::PlacementStrategy::STRICT_SPREAD"
cdef extern from "ray/common/task/scheduling_resources.h" nogil:
cdef cppclass ResourceSet "ray::ResourceSet":
@ -230,7 +230,7 @@ cdef extern from "ray/common/ray_object.h" nogil:
c_bool IsInPlasmaError() const
cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CRayFunction "ray::RayFunction":
cdef cppclass CRayFunction "ray::core::RayFunction":
CRayFunction()
CRayFunction(CLanguage language,
const CFunctionDescriptor &function_descriptor)
@ -247,7 +247,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CTaskArgByValue "ray::TaskArgByValue":
CTaskArgByValue(const shared_ptr[CRayObject] &data)
cdef cppclass CTaskOptions "ray::TaskOptions":
cdef cppclass CTaskOptions "ray::core::TaskOptions":
CTaskOptions()
CTaskOptions(c_string name, int num_returns,
unordered_map[c_string, double] &resources)
@ -258,7 +258,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, c_string]
&override_environment_variables)
cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions":
CActorCreationOptions()
CActorCreationOptions(
int64_t max_restarts,
@ -276,7 +276,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
&override_environment_variables)
cdef cppclass CPlacementGroupCreationOptions \
"ray::PlacementGroupCreationOptions":
"ray::core::PlacementGroupCreationOptions":
CPlacementGroupCreationOptions()
CPlacementGroupCreationOptions(
const c_string &name,
@ -285,7 +285,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
c_bool is_detached
)
cdef cppclass CObjectLocation "ray::ObjectLocation":
cdef cppclass CObjectLocation "ray::core::ObjectLocation":
const CNodeID &GetPrimaryNodeID() const
const uint64_t GetObjectSize() const
const c_vector[CNodeID] &GetNodeIDs() const

View file

@ -62,38 +62,38 @@ ctypedef void (*plasma_callback_function) \
ctypedef shared_ptr[const CActorHandle] ActorHandleSharedPtr
cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfiler "ray::worker::Profiler":
cdef cppclass CProfiler "ray::core::worker::Profiler":
void Start()
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
cdef cppclass CProfileEvent "ray::core::worker::ProfileEvent":
CProfileEvent(const shared_ptr[CProfiler] profiler,
const c_string &event_type)
void SetExtraData(const c_string &extra_data)
cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
cdef cppclass CProfileEvent "ray::core::worker::ProfileEvent":
void SetExtraData(const c_string &extra_data)
cdef extern from "ray/core_worker/fiber.h" nogil:
cdef cppclass CFiberEvent "ray::FiberEvent":
cdef cppclass CFiberEvent "ray::core::FiberEvent":
CFiberEvent()
void Wait()
void Notify()
cdef extern from "ray/core_worker/context.h" nogil:
cdef cppclass CWorkerContext "ray::WorkerContext":
cdef cppclass CWorkerContext "ray::core::WorkerContext":
c_bool CurrentActorIsAsync()
const c_string &GetCurrentSerializedRuntimeEnv()
cdef extern from "ray/core_worker/core_worker.h" nogil:
cdef cppclass CActorHandle "ray::ActorHandle":
cdef cppclass CActorHandle "ray::core::ActorHandle":
CActorID GetActorID() const
CJobID CreationJobID() const
CLanguage ActorLanguage() const
CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const
c_string ExtensionData() const
cdef cppclass CCoreWorker "ray::CoreWorker":
cdef cppclass CCoreWorker "ray::core::CoreWorker":
void ConnectToRaylet()
CWorkerType GetWorkerType()
CLanguage GetLanguage()
@ -248,7 +248,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int64_t GetNumLeasesRequested() const
cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions":
cdef cppclass CCoreWorkerOptions "ray::core::CoreWorkerOptions":
CWorkerType worker_type
CLanguage language
c_string store_socket
@ -307,7 +307,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int runtime_env_hash
int worker_shim_pid
cdef cppclass CCoreWorkerProcess "ray::CoreWorkerProcess":
cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
@staticmethod
void Initialize(const CCoreWorkerOptions &options)
# Only call this in CoreWorker.__cinit__,

View file

@ -6,7 +6,7 @@ import json
import traceback
cdef class ProfileEvent:
"""Cython wrapper class of C++ `ray::worker::ProfileEvent`."""
"""Cython wrapper class of C++ `ray::core::worker::ProfileEvent`."""
cdef:
unique_ptr[CProfileEvent] inner
object extra_data

View file

@ -16,15 +16,16 @@
#include <memory>
namespace {
namespace ray {
namespace core {
ray::rpc::ActorHandle CreateInnerActorHandle(
rpc::ActorHandle CreateInnerActorHandle(
const class ActorID &actor_id, const TaskID &owner_id,
const ray::rpc::Address &owner_address, const class JobID &job_id,
const rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries) {
ray::rpc::ActorHandle inner;
rpc::ActorHandle inner;
inner.set_actor_id(actor_id.Data(), actor_id.Size());
inner.set_owner_id(owner_id.Binary());
inner.mutable_owner_address()->CopyFrom(owner_address);
@ -38,15 +39,15 @@ ray::rpc::ActorHandle CreateInnerActorHandle(
return inner;
}
ray::rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized) {
ray::rpc::ActorHandle inner;
rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized) {
rpc::ActorHandle inner;
inner.ParseFromString(serialized);
return inner;
}
ray::rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
const ray::rpc::ActorTableData &actor_table_data) {
ray::rpc::ActorHandle inner;
rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
const rpc::ActorTableData &actor_table_data) {
rpc::ActorHandle inner;
inner.set_actor_id(actor_table_data.actor_id());
inner.set_owner_id(actor_table_data.parent_id());
inner.mutable_owner_address()->CopyFrom(actor_table_data.owner_address());
@ -54,7 +55,7 @@ ray::rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
inner.set_actor_language(actor_table_data.task_spec().language());
inner.mutable_actor_creation_task_function_descriptor()->CopyFrom(
actor_table_data.task_spec().function_descriptor());
ray::TaskSpecification task_spec(actor_table_data.task_spec());
TaskSpecification task_spec(actor_table_data.task_spec());
inner.set_actor_cursor(task_spec.ReturnId(0).Binary());
inner.set_extension_data(
actor_table_data.task_spec().actor_creation_task_spec().extension_data());
@ -63,15 +64,11 @@ ray::rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
return inner;
}
} // namespace
namespace ray {
ActorHandle::ActorHandle(
const class ActorID &actor_id, const TaskID &owner_id,
const rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries)
: ActorHandle(CreateInnerActorHandle(
actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language,
@ -106,4 +103,5 @@ void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec,
void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }
} // namespace core
} // namespace ray

View file

@ -24,17 +24,18 @@
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace core {
class ActorHandle {
public:
ActorHandle(ray::rpc::ActorHandle inner)
ActorHandle(rpc::ActorHandle inner)
: inner_(inner), actor_cursor_(ObjectID::FromBinary(inner_.actor_cursor())) {}
// Constructs a new ActorHandle as part of the actor creation process.
ActorHandle(const ActorID &actor_id, const TaskID &owner_id,
const rpc::Address &owner_address, const JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data, int64_t max_task_retries);
/// Constructs an ActorHandle from a serialized string.
@ -55,8 +56,8 @@ class ActorHandle {
Language ActorLanguage() const { return inner_.actor_language(); };
ray::FunctionDescriptor ActorCreationTaskFunctionDescriptor() const {
return ray::FunctionDescriptorBuilder::FromProto(
FunctionDescriptor ActorCreationTaskFunctionDescriptor() const {
return FunctionDescriptorBuilder::FromProto(
inner_.actor_creation_task_function_descriptor());
};
@ -84,7 +85,7 @@ class ActorHandle {
private:
// Protobuf-defined persistent state of the actor handle.
const ray::rpc::ActorHandle inner_;
const rpc::ActorHandle inner_;
/// The unique id of the dummy object returned by the previous task.
/// TODO: This can be removed once we schedule actor tasks by task counter
@ -100,4 +101,5 @@ class ActorHandle {
FRIEND_TEST(ZeroNodeTest, TestActorHandle);
};
} // namespace core
} // namespace ray

View file

@ -17,6 +17,7 @@
#include "ray/gcs/pb_util.h"
namespace ray {
namespace core {
ActorID ActorManager::RegisterActorHandle(std::unique_ptr<ActorHandle> actor_handle,
const ObjectID &outer_object_id,
@ -179,4 +180,5 @@ std::vector<ObjectID> ActorManager::GetActorHandleIDsFromHandles() {
return actor_handle_ids;
}
} // namespace core
} // namespace ray

View file

@ -21,6 +21,7 @@
#include "ray/gcs/gcs_client.h"
namespace ray {
namespace core {
class ActorCreatorInterface {
public:
@ -202,4 +203,5 @@ class ActorManager {
GUARDED_BY(mutex_);
};
} // namespace core
} // namespace ray

View file

@ -15,6 +15,7 @@
#include "ray/core_worker/common.h"
namespace ray {
namespace core {
std::string WorkerTypeString(WorkerType type) {
// TODO(suquark): Use proto3 utils to get the string.
@ -45,4 +46,5 @@ std::string LanguageString(Language language) {
return "";
}
} // namespace core
} // namespace ray

View file

@ -23,6 +23,7 @@
#include "ray/util/util.h"
namespace ray {
namespace core {
using WorkerType = rpc::WorkerType;
@ -36,18 +37,16 @@ std::string LanguageString(Language language);
class RayFunction {
public:
RayFunction() {}
RayFunction(Language language, const ray::FunctionDescriptor &function_descriptor)
RayFunction(Language language, const FunctionDescriptor &function_descriptor)
: language_(language), function_descriptor_(function_descriptor) {}
Language GetLanguage() const { return language_; }
const ray::FunctionDescriptor &GetFunctionDescriptor() const {
return function_descriptor_;
}
const FunctionDescriptor &GetFunctionDescriptor() const { return function_descriptor_; }
private:
Language language_;
ray::FunctionDescriptor function_descriptor_;
FunctionDescriptor function_descriptor_;
};
/// Options for all tasks (actor and non-actor) except for actor creation.
@ -223,4 +222,5 @@ class ObjectLocation {
const NodeID spilled_node_id_;
};
} // namespace core
} // namespace ray

View file

@ -15,6 +15,7 @@
#include "ray/core_worker/context.h"
namespace ray {
namespace core {
/// per-thread context for core worker.
struct WorkerThreadContext {
@ -254,4 +255,5 @@ WorkerThreadContext &WorkerContext::GetThreadContext() {
return *thread_context_;
}
} // namespace core
} // namespace ray

View file

@ -20,6 +20,7 @@
#include "ray/core_worker/common.h"
namespace ray {
namespace core {
struct WorkerThreadContext;
@ -111,4 +112,5 @@ class WorkerContext {
static thread_local std::unique_ptr<WorkerThreadContext> thread_context_;
};
} // namespace core
} // namespace ray

View file

@ -26,20 +26,19 @@
#include "ray/util/util.h"
namespace ray {
namespace {
namespace core {
// Duration between internal book-keeping heartbeats.
const uint64_t kInternalHeartbeatMillis = 1000;
void BuildCommonTaskSpec(
ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
const std::string name, const TaskID &current_task_id, const uint64_t task_index,
const TaskID &caller_id, const ray::rpc::Address &address,
const ray::RayFunction &function,
const std::vector<std::unique_ptr<ray::TaskArg>> &args, uint64_t num_returns,
const TaskID &caller_id, const rpc::Address &address, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
std::vector<ObjectID> *return_ids, const ray::BundleID &bundle_id,
std::vector<ObjectID> *return_ids, const BundleID &bundle_id,
bool placement_group_capture_child_tasks, const std::string debugger_breakpoint,
const std::string &serialized_runtime_env,
const std::unordered_map<std::string, std::string> &override_environment_variables,
@ -63,18 +62,18 @@ void BuildCommonTaskSpec(
}
}
ray::JobID GetProcessJobID(const ray::CoreWorkerOptions &options) {
if (options.worker_type == ray::WorkerType::DRIVER) {
JobID GetProcessJobID(const CoreWorkerOptions &options) {
if (options.worker_type == WorkerType::DRIVER) {
RAY_CHECK(!options.job_id.IsNil());
} else {
RAY_CHECK(options.job_id.IsNil());
}
if (options.worker_type == ray::WorkerType::WORKER) {
if (options.worker_type == WorkerType::WORKER) {
// For workers, the job ID is assigned by Raylet via an environment variable.
const char *job_id_env = std::getenv(kEnvVarKeyJobId);
RAY_CHECK(job_id_env);
return ray::JobID::FromHex(job_id_env);
return JobID::FromHex(job_id_env);
}
return options.job_id;
}
@ -93,7 +92,6 @@ ObjectLocation CreateObjectLocation(const rpc::GetObjectLocationsOwnerReply &rep
object_info.spilled_url(),
NodeID::FromBinary(object_info.spilled_node_id()));
}
} // namespace
/// The global instance of `CoreWorkerProcess`.
std::unique_ptr<CoreWorkerProcess> core_worker_process;
@ -175,13 +173,13 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options)
// by all of core worker.
RAY_LOG(DEBUG) << "Stats setup in core worker.";
// Initialize stats in core worker global tags.
const ray::stats::TagsType global_tags = {{ray::stats::ComponentKey, "core_worker"},
{ray::stats::VersionKey, "2.0.0.dev0"}};
const stats::TagsType global_tags = {{stats::ComponentKey, "core_worker"},
{stats::VersionKey, "2.0.0.dev0"}};
// NOTE(lingxuan.zlx): We assume RayConfig is initialized before it's used.
// RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize
// for java worker or in constructor of CoreWorker for python worker.
ray::stats::Init(global_tags, options_.metrics_agent_port);
stats::Init(global_tags, options_.metrics_agent_port);
#ifndef _WIN32
// NOTE(kfstorm): std::atexit should be put at the end of `CoreWorkerProcess`
@ -199,7 +197,7 @@ CoreWorkerProcess::~CoreWorkerProcess() {
RAY_LOG(INFO) << "Destructing CoreWorkerProcess. pid: " << getpid();
RAY_LOG(DEBUG) << "Stats stop in core worker.";
// Shutdown stats module if worker process exits.
ray::stats::Shutdown();
stats::Shutdown();
if (options_.enable_logging) {
RayLog::ShutDownRayLog();
}
@ -459,7 +457,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
options_.gcs_options.password_,
/*enable_sync_conn=*/false, /*enable_async_conn=*/false,
/*enable_subscribe_conn=*/true);
gcs_client_ = std::make_shared<ray::gcs::ServiceBasedGcsClient>(
gcs_client_ = std::make_shared<gcs::ServiceBasedGcsClient>(
gcs_options, [this](std::pair<std::string, int> *address) {
absl::MutexLock lock(&gcs_server_address_mutex_);
if (gcs_server_address_.second != 0) {
@ -512,7 +510,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
new rpc::CoreWorkerClient(addr, *client_call_manager_));
});
if (options_.worker_type == ray::WorkerType::WORKER) {
if (options_.worker_type == WorkerType::WORKER) {
periodical_runner_.RunFnPeriodically(
[this] { CheckForRayletFailure(); },
RayConfig::instance().raylet_death_check_interval_milliseconds());
@ -522,9 +520,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
options_.store_socket, local_raylet_client_, reference_counter_,
options_.check_signals,
/*warmup=*/
(options_.worker_type != ray::WorkerType::SPILL_WORKER &&
options_.worker_type != ray::WorkerType::RESTORE_WORKER &&
options_.worker_type != ray::WorkerType::UTIL_WORKER),
(options_.worker_type != WorkerType::SPILL_WORKER &&
options_.worker_type != WorkerType::RESTORE_WORKER &&
options_.worker_type != WorkerType::UTIL_WORKER),
/*get_current_call_site=*/boost::bind(&CoreWorker::CurrentCallSite, this)));
memory_store_.reset(new CoreWorkerMemoryStore(
[this](const RayObject &object, const ObjectID &object_id) {
@ -3228,4 +3226,5 @@ std::shared_ptr<gcs::GcsClient> CoreWorker::GetGcsClient() const { return gcs_cl
bool CoreWorker::IsExiting() const { return exiting_; }
} // namespace core
} // namespace ray

View file

@ -54,6 +54,7 @@
/// 4) Add a method to the CoreWorker class below: "CoreWorker::HandleExampleCall"
namespace ray {
namespace core {
class CoreWorker;
@ -1422,4 +1423,5 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::unique_ptr<rpc::JobConfig> job_config_;
};
} // namespace core
} // namespace ray

View file

@ -18,6 +18,7 @@
#include "ray/util/logging.h"
namespace ray {
namespace core {
/// Used by async actor mode. The fiber event will be used
/// from python to switch control among different coroutines.
@ -141,4 +142,5 @@ class FiberState {
std::thread fiber_runner_thread_;
};
} // namespace core
} // namespace ray

View file

@ -15,6 +15,7 @@
#include "ray/core_worker/future_resolver.h"
namespace ray {
namespace core {
void FutureResolver::ResolveFutureAsync(const ObjectID &object_id,
const rpc::Address &owner_address) {
@ -94,4 +95,5 @@ void FutureResolver::ProcessResolvedObject(const ObjectID &object_id,
}
}
} // namespace core
} // namespace ray

View file

@ -24,6 +24,7 @@
#include "src/ray/protobuf/core_worker.pb.h"
namespace ray {
namespace core {
using ReportLocalityDataCallback =
std::function<void(const ObjectID &, const absl::flat_hash_set<NodeID> &, uint64_t)>;
@ -76,4 +77,5 @@ class FutureResolver {
const rpc::Address rpc_address_;
};
} // namespace core
} // namespace ray

View file

@ -15,6 +15,7 @@
#include "ray/core_worker/gcs_server_address_updater.h"
namespace ray {
namespace core {
GcsServerAddressUpdater::GcsServerAddressUpdater(
const std::string raylet_ip_address, const int port,
@ -67,4 +68,5 @@ void GcsServerAddressUpdater::UpdateGcsServerAddress() {
});
}
} // namespace core
} // namespace ray

View file

@ -19,6 +19,7 @@
#include "ray/raylet_client/raylet_client.h"
namespace ray {
namespace core {
class GcsServerAddressUpdater {
public:
@ -46,4 +47,5 @@ class GcsServerAddressUpdater {
int32_t failed_ping_count_ = 0;
};
} // namespace core
} // namespace ray

View file

@ -15,6 +15,7 @@
#include "ray/core_worker/lease_policy.h"
namespace ray {
namespace core {
rpc::Address LocalityAwareLeasePolicy::GetBestNodeForTask(const TaskSpecification &spec) {
if (auto node_id = GetBestNodeIdForTask(spec)) {
@ -58,4 +59,5 @@ rpc::Address LocalLeasePolicy::GetBestNodeForTask(const TaskSpecification &spec)
return local_node_rpc_address_;
}
} // namespace core
} // namespace ray

View file

@ -22,6 +22,7 @@
#include "src/ray/protobuf/common.pb.h"
namespace ray {
namespace core {
struct LocalityData {
uint64_t object_size;
@ -95,4 +96,5 @@ class LocalLeasePolicy : public LeasePolicyInterface {
const rpc::Address local_node_rpc_address_;
};
} // namespace core
} // namespace ray

View file

@ -29,22 +29,21 @@ jobject java_task_executor = nullptr;
/// Store Java instances of function descriptor in the cache to avoid unnessesary JNI
/// operations.
thread_local std::unordered_map<size_t,
std::vector<std::pair<ray::FunctionDescriptor, jobject>>>
std::vector<std::pair<FunctionDescriptor, jobject>>>
executor_function_descriptor_cache;
inline ray::gcs::GcsClientOptions ToGcsClientOptions(JNIEnv *env,
jobject gcs_client_options) {
inline gcs::GcsClientOptions ToGcsClientOptions(JNIEnv *env, jobject gcs_client_options) {
std::string ip = JavaStringToNativeString(
env, (jstring)env->GetObjectField(gcs_client_options, java_gcs_client_options_ip));
int port = env->GetIntField(gcs_client_options, java_gcs_client_options_port);
std::string password = JavaStringToNativeString(
env,
(jstring)env->GetObjectField(gcs_client_options, java_gcs_client_options_password));
return ray::gcs::GcsClientOptions(ip, port, password);
return gcs::GcsClientOptions(ip, port, password);
}
jobject ToJavaArgs(JNIEnv *env, jbooleanArray java_check_results,
const std::vector<std::shared_ptr<ray::RayObject>> &args) {
const std::vector<std::shared_ptr<RayObject>> &args) {
if (java_check_results == nullptr) {
// If `java_check_results` is null, it means that `checkByteBufferArguments`
// failed. In this case, just return null here. The args won't be used anyway.
@ -52,10 +51,10 @@ jobject ToJavaArgs(JNIEnv *env, jbooleanArray java_check_results,
} else {
jboolean *check_results = env->GetBooleanArrayElements(java_check_results, nullptr);
size_t i = 0;
jobject args_array_list = NativeVectorToJavaList<std::shared_ptr<ray::RayObject>>(
jobject args_array_list = NativeVectorToJavaList<std::shared_ptr<RayObject>>(
env, args,
[check_results, &i](JNIEnv *env,
const std::shared_ptr<ray::RayObject> &native_object) {
const std::shared_ptr<RayObject> &native_object) {
if (*(check_results + (i++))) {
// If the type of this argument is ByteBuffer, we create a
// DirectByteBuffer here To avoid data copy.
@ -95,14 +94,13 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
jobject gcsClientOptions, jint numWorkersPerProcess, jstring logDir,
jbyteArray jobConfig) {
auto task_execution_callback =
[](ray::TaskType task_type, const std::string task_name,
const ray::RayFunction &ray_function,
[](TaskType task_type, const std::string task_name, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
const std::vector<ObjectID> &return_ids, const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb) {
std::vector<std::shared_ptr<RayObject>> *results,
std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb) {
JNIEnv *env = GetJNIEnv();
RAY_CHECK(java_task_executor);
@ -139,13 +137,13 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
// Check whether the exception is `IntentionalSystemExit`.
jthrowable throwable = env->ExceptionOccurred();
if (throwable) {
ray::Status status_to_return = ray::Status::OK();
Status status_to_return = Status::OK();
if (env->IsInstanceOf(throwable,
java_ray_intentional_system_exit_exception_class)) {
status_to_return = ray::Status::IntentionalSystemExit();
status_to_return = Status::IntentionalSystemExit();
} else if (env->IsInstanceOf(throwable, java_ray_actor_exception_class)) {
creation_task_exception_pb = SerializeActorCreationException(env, throwable);
status_to_return = ray::Status::CreationTaskError();
status_to_return = Status::CreationTaskError();
} else {
RAY_LOG(ERROR) << "Unkown java exception was thrown while executing tasks.";
}
@ -157,8 +155,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
int64_t task_output_inlined_bytes = 0;
// Process return objects.
if (!return_ids.empty()) {
std::vector<std::shared_ptr<ray::RayObject>> return_objects;
JavaListToNativeVector<std::shared_ptr<ray::RayObject>>(
std::vector<std::shared_ptr<RayObject>> return_objects;
JavaListToNativeVector<std::shared_ptr<RayObject>>(
env, java_return_objects, &return_objects,
[](JNIEnv *env, jobject java_native_ray_object) {
return JavaNativeRayObjectToNativeRayObject(env, java_native_ray_object);
@ -172,7 +170,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
auto &contained_object_id = return_objects[i]->GetNestedIds();
auto result_ptr = &(*results)[0];
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
result_id, data_size, metadata, contained_object_id,
task_output_inlined_bytes, result_ptr));
@ -185,15 +183,15 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
}
}
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().SealReturnObject(
result_id, result));
RAY_CHECK_OK(
CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
}
}
env->DeleteLocalRef(java_check_results);
env->DeleteLocalRef(java_return_objects);
env->DeleteLocalRef(args_array_list);
return ray::Status::OK();
return Status::OK();
};
auto gc_collect = []() {
@ -216,9 +214,9 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
}
};
auto on_worker_shutdown = [](const ray::WorkerID &worker_id) {
auto on_worker_shutdown = [](const WorkerID &worker_id) {
JNIEnv *env = GetJNIEnv();
auto worker_id_bytes = IdToJavaByteArray<ray::WorkerID>(env, worker_id);
auto worker_id_bytes = IdToJavaByteArray<WorkerID>(env, worker_id);
if (java_task_executor) {
env->CallVoidMethod(java_task_executor,
java_native_task_executor_on_worker_shutdown, worker_id_bytes);
@ -228,12 +226,12 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
std::string serialized_job_config =
(jobConfig == nullptr ? "" : JavaByteArrayToNativeString(env, jobConfig));
ray::CoreWorkerOptions options;
options.worker_type = static_cast<ray::WorkerType>(workerMode);
options.language = ray::Language::JAVA;
CoreWorkerOptions options;
options.worker_type = static_cast<WorkerType>(workerMode);
options.language = Language::JAVA;
options.store_socket = JavaStringToNativeString(env, storeSocket);
options.raylet_socket = JavaStringToNativeString(env, rayletSocket);
options.job_id = JavaByteArrayToId<ray::JobID>(env, jobId);
options.job_id = JavaByteArrayToId<JobID>(env, jobId);
options.gcs_options = ToGcsClientOptions(env, gcsClientOptions);
options.enable_logging = true;
options.log_dir = JavaStringToNativeString(env, logDir);
@ -251,13 +249,13 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
options.serialized_job_config = serialized_job_config;
options.metrics_agent_port = -1;
ray::CoreWorkerProcess::Initialize(options);
CoreWorkerProcess::Initialize(options);
}
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeRunTaskExecutor(
JNIEnv *env, jclass o, jobject javaTaskExecutor) {
java_task_executor = javaTaskExecutor;
ray::CoreWorkerProcess::RunTaskExecutionLoop();
CoreWorkerProcess::RunTaskExecutionLoop();
java_task_executor = nullptr;
// NOTE(kfstorm): It's possible that users spawn non-daemon Java threads. If these
@ -271,7 +269,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeRunTaskExecuto
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeShutdown(JNIEnv *env,
jclass o) {
ray::CoreWorkerProcess::Shutdown();
CoreWorkerProcess::Shutdown();
}
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetResource(
@ -279,7 +277,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetResource(
const auto node_id = JavaByteArrayToId<NodeID>(env, nodeId);
const char *native_resource_name = env->GetStringUTFChars(resourceName, JNI_FALSE);
auto status = ray::CoreWorkerProcess::GetCoreWorker().SetResource(
auto status = CoreWorkerProcess::GetCoreWorker().SetResource(
native_resource_name, static_cast<double>(capacity), node_id);
env->ReleaseStringUTFChars(resourceName, native_resource_name);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
@ -292,21 +290,21 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetActorIdOfNamedActor(JNIEnv *env, j
const char *native_actor_name = env->GetStringUTFChars(actor_name, JNI_FALSE);
auto full_name = GetFullName(global, native_actor_name);
const auto actor_handle = ray::CoreWorkerProcess::GetCoreWorker()
const auto actor_handle = CoreWorkerProcess::GetCoreWorker()
.GetNamedActorHandle(full_name, /*ray_namespace=*/"")
.first;
ray::ActorID actor_id;
ActorID actor_id;
if (actor_handle) {
actor_id = actor_handle->GetActorID();
} else {
actor_id = ray::ActorID::Nil();
actor_id = ActorID::Nil();
}
return IdToJavaByteArray<ray::ActorID>(env, actor_id);
return IdToJavaByteArray<ActorID>(env, actor_id);
}
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeKillActor(
JNIEnv *env, jclass, jbyteArray actorId, jboolean noRestart) {
auto status = ray::CoreWorkerProcess::GetCoreWorker().KillActor(
auto status = CoreWorkerProcess::GetCoreWorker().KillActor(
JavaByteArrayToId<ActorID>(env, actorId),
/*force_kill=*/true, noRestart);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
@ -314,8 +312,8 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeKillActor(
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetCoreWorker(
JNIEnv *env, jclass, jbyteArray workerId) {
const auto worker_id = JavaByteArrayToId<ray::WorkerID>(env, workerId);
ray::CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id);
const auto worker_id = JavaByteArrayToId<WorkerID>(env, workerId);
CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id);
}
#ifdef __cplusplus

View file

@ -28,28 +28,28 @@ extern "C" {
JNIEXPORT jint JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeGetLanguage(
JNIEnv *env, jclass o, jbyteArray actorId) {
auto actor_id = JavaByteArrayToId<ray::ActorID>(env, actorId);
auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
const auto native_actor_handle =
ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id);
CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id);
return native_actor_handle->ActorLanguage();
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_actor_NativeActorHandle_nativeGetActorCreationTaskFunctionDescriptor(
JNIEnv *env, jclass o, jbyteArray actorId) {
auto actor_id = JavaByteArrayToId<ray::ActorID>(env, actorId);
auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
const auto native_actor_handle =
ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id);
CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id);
auto function_descriptor = native_actor_handle->ActorCreationTaskFunctionDescriptor();
return NativeRayFunctionDescriptorToJavaStringList(env, function_descriptor);
}
JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeSerialize(
JNIEnv *env, jclass o, jbyteArray actorId) {
auto actor_id = JavaByteArrayToId<ray::ActorID>(env, actorId);
auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
std::string output;
ObjectID actor_handle_id;
ray::Status status = ray::CoreWorkerProcess::GetCoreWorker().SerializeActorHandle(
Status status = CoreWorkerProcess::GetCoreWorker().SerializeActorHandle(
actor_id, &output, &actor_handle_id);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return NativeStringToJavaByteArray(env, output);
@ -61,11 +61,10 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *env, jclas
auto buffer = JavaByteArrayToNativeBuffer(env, data);
RAY_CHECK(buffer->Size() > 0);
auto binary = std::string(reinterpret_cast<char *>(buffer->Data()), buffer->Size());
auto actor_id =
ray::CoreWorkerProcess::GetCoreWorker().DeserializeAndRegisterActorHandle(
binary, /*outer_object_id=*/ObjectID::Nil());
auto actor_id = CoreWorkerProcess::GetCoreWorker().DeserializeAndRegisterActorHandle(
binary, /*outer_object_id=*/ObjectID::Nil());
return IdToJavaByteArray<ray::ActorID>(env, actor_id);
return IdToJavaByteArray<ActorID>(env, actor_id);
}
#ifdef __cplusplus

View file

@ -28,8 +28,7 @@ extern "C" {
JNIEXPORT jint JNICALL
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType(JNIEnv *env,
jclass) {
auto task_spec =
ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentTask();
auto task_spec = CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentTask();
RAY_CHECK(task_spec) << "Current task is not set.";
return static_cast<int>(task_spec->GetMessage().type());
}
@ -37,38 +36,38 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType(JNIEnv
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskId(JNIEnv *env,
jclass) {
const ray::TaskID &task_id =
ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentTaskID();
return IdToJavaByteBuffer<ray::TaskID>(env, task_id);
const TaskID &task_id =
CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentTaskID();
return IdToJavaByteBuffer<TaskID>(env, task_id);
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentJobId(JNIEnv *env,
jclass) {
const auto &job_id =
ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentJobID();
return IdToJavaByteBuffer<ray::JobID>(env, job_id);
CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentJobID();
return IdToJavaByteBuffer<JobID>(env, job_id);
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId(JNIEnv *env,
jclass) {
const auto &worker_id =
ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetWorkerID();
return IdToJavaByteBuffer<ray::WorkerID>(env, worker_id);
CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetWorkerID();
return IdToJavaByteBuffer<WorkerID>(env, worker_id);
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentActorId(JNIEnv *env,
jclass) {
const auto &actor_id =
ray::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
return IdToJavaByteBuffer<ray::ActorID>(env, actor_id);
CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
return IdToJavaByteBuffer<ActorID>(env, actor_id);
}
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetRpcAddress(JNIEnv *env, jclass) {
const auto &rpc_address = ray::CoreWorkerProcess::GetCoreWorker().GetRpcAddress();
const auto &rpc_address = CoreWorkerProcess::GetCoreWorker().GetRpcAddress();
return NativeStringToJavaByteArray(env, rpc_address.SerializeAsString());
}

View file

@ -28,30 +28,27 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeCreateGlobalStateAccessor(
JNIEnv *env, jobject o, jstring j_redis_address, jstring j_redis_passowrd) {
std::string redis_address = JavaStringToNativeString(env, j_redis_address);
std::string redis_password = JavaStringToNativeString(env, j_redis_passowrd);
ray::gcs::GlobalStateAccessor *gcs_accessor =
new ray::gcs::GlobalStateAccessor(redis_address, redis_password);
gcs::GlobalStateAccessor *gcs_accessor =
new gcs::GlobalStateAccessor(redis_address, redis_password);
return reinterpret_cast<jlong>(gcs_accessor);
}
JNIEXPORT void JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeDestroyGlobalStateAccessor(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
delete gcs_accessor;
}
JNIEXPORT jboolean JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeConnect(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
return gcs_accessor->Connect();
}
JNIEXPORT jobject JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllJobInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto job_info_list = gcs_accessor->GetAllJobInfo();
return NativeVectorToJavaList<std::string>(
env, job_info_list, [](JNIEnv *env, const std::string &str) {
@ -62,17 +59,15 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetA
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNextJobID(JNIEnv *env, jobject o,
jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
const auto &job_id = gcs_accessor->GetNextJobID();
return IdToJavaByteArray<ray::JobID>(env, job_id);
return IdToJavaByteArray<JobID>(env, job_id);
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env, jobject o,
jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto node_info_list = gcs_accessor->GetAllNodeInfo();
return NativeVectorToJavaList<std::string>(
env, node_info_list, [](JNIEnv *env, const std::string &str) {
@ -83,9 +78,8 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env, jo
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray node_id_bytes) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto node_id = JavaByteArrayToId<ray::NodeID>(env, node_id_bytes);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto node_id = JavaByteArrayToId<NodeID>(env, node_id_bytes);
auto node_resource_info = gcs_accessor->GetNodeResourceInfo(node_id);
return static_cast<jbyteArray>(NativeStringToJavaByteArray(env, node_resource_info));
}
@ -93,8 +87,7 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto actor_info_list = gcs_accessor->GetAllActorInfo();
return NativeVectorToJavaList<std::string>(
env, actor_info_list, [](JNIEnv *env, const std::string &str) {
@ -107,8 +100,7 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetActorInfo(JNIEnv *env, jobj
jlong gcs_accessor_ptr,
jbyteArray actorId) {
const auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto actor_info = gcs_accessor->GetActorInfo(actor_id);
if (actor_info) {
return NativeStringToJavaByteArray(env, *actor_info);
@ -120,9 +112,8 @@ JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray placement_group_id_bytes) {
const auto placement_group_id =
JavaByteArrayToId<ray::PlacementGroupID>(env, placement_group_id_bytes);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
JavaByteArrayToId<PlacementGroupID>(env, placement_group_id_bytes);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto placement_group = gcs_accessor->GetPlacementGroupInfo(placement_group_id);
if (placement_group) {
return NativeStringToJavaByteArray(env, *placement_group);
@ -135,8 +126,7 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfoByName(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jstring name, jboolean global) {
std::string placement_group_name = JavaStringToNativeString(env, name);
auto full_name = GetFullName(global, placement_group_name);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
// Java doesn't support namespaces.
auto placement_group = gcs_accessor->GetPlacementGroupByName(full_name, "");
if (placement_group) {
@ -148,8 +138,7 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfoByName(
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllPlacementGroupInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto placement_group_info_list = gcs_accessor->GetAllPlacementGroupInfo();
return NativeVectorToJavaList<std::string>(
env, placement_group_info_list, [](JNIEnv *env, const std::string &str) {
@ -162,8 +151,7 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalKV(JNIEnv *env, job
jlong gcs_accessor_ptr,
jstring k) {
std::string key = JavaStringToNativeString(env, k);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto value = gcs_accessor->GetInternalKV(key);
if (value) {
return NativeStringToJavaByteArray(env, *value);
@ -175,8 +163,7 @@ JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeToConnectForDriver(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jstring nodeIpAddress) {
std::string node_ip_address = JavaStringToNativeString(env, nodeIpAddress);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
std::string node_to_connect;
auto status =
gcs_accessor->GetNodeToConnectForDriver(node_ip_address, &node_to_connect);

View file

@ -70,7 +70,7 @@ JNIEXPORT jlong JNICALL Java_io_ray_runtime_metric_NativeMetric_registerGaugeNat
std::vector<TagKeyType> tag_keys;
MetricTransform(env, j_name, j_description, j_unit, tag_key_list, &metric_name,
&description, &unit, tag_keys);
auto *gauge = new ray::stats::Gauge(metric_name, description, unit, tag_keys);
auto *gauge = new stats::Gauge(metric_name, description, unit, tag_keys);
return reinterpret_cast<jlong>(gauge);
}
@ -83,7 +83,7 @@ JNIEXPORT jlong JNICALL Java_io_ray_runtime_metric_NativeMetric_registerCountNat
std::vector<TagKeyType> tag_keys;
MetricTransform(env, j_name, j_description, j_unit, tag_key_list, &metric_name,
&description, &unit, tag_keys);
auto *count = new ray::stats::Count(metric_name, description, unit, tag_keys);
auto *count = new stats::Count(metric_name, description, unit, tag_keys);
return reinterpret_cast<jlong>(count);
}
@ -96,7 +96,7 @@ JNIEXPORT jlong JNICALL Java_io_ray_runtime_metric_NativeMetric_registerSumNativ
std::vector<TagKeyType> tag_keys;
MetricTransform(env, j_name, j_description, j_unit, tag_key_list, &metric_name,
&description, &unit, tag_keys);
auto *sum = new ray::stats::Sum(metric_name, description, unit, tag_keys);
auto *sum = new stats::Sum(metric_name, description, unit, tag_keys);
return reinterpret_cast<jlong>(sum);
}
@ -114,22 +114,20 @@ JNIEXPORT jlong JNICALL Java_io_ray_runtime_metric_NativeMetric_registerHistogra
JavaDoubleArrayToNativeDoubleVector(env, j_boundaries, &boundaries);
auto *histogram =
new ray::stats::Histogram(metric_name, description, unit, boundaries, tag_keys);
new stats::Histogram(metric_name, description, unit, boundaries, tag_keys);
return reinterpret_cast<jlong>(histogram);
}
JNIEXPORT void JNICALL Java_io_ray_runtime_metric_NativeMetric_unregisterMetricNative(
JNIEnv *env, jclass obj, jlong metric_native_pointer) {
ray::stats::Metric *metric =
reinterpret_cast<ray::stats::Metric *>(metric_native_pointer);
stats::Metric *metric = reinterpret_cast<stats::Metric *>(metric_native_pointer);
delete metric;
}
JNIEXPORT void JNICALL Java_io_ray_runtime_metric_NativeMetric_recordNative(
JNIEnv *env, jclass obj, jlong metric_native_pointer, jdouble value,
jobject tag_key_list, jobject tag_value_list) {
ray::stats::Metric *metric =
reinterpret_cast<ray::stats::Metric *>(metric_native_pointer);
stats::Metric *metric = reinterpret_cast<stats::Metric *>(metric_native_pointer);
std::vector<std::string> tag_key_str_list;
std::vector<std::string> tag_value_str_list;
JavaStringListToNativeStringVector(env, tag_key_list, &tag_key_str_list);

View file

@ -22,27 +22,26 @@
#include "ray/core_worker/core_worker.h"
#include "ray/gcs/gcs_client/global_state_accessor.h"
ray::Status PutSerializedObject(
JNIEnv *env, jobject obj, ray::ObjectID object_id, ray::ObjectID *out_object_id,
bool pin_object = true,
const std::unique_ptr<ray::rpc::Address> &owner_address = nullptr) {
Status PutSerializedObject(JNIEnv *env, jobject obj, ObjectID object_id,
ObjectID *out_object_id, bool pin_object = true,
const std::unique_ptr<rpc::Address> &owner_address = nullptr) {
auto native_ray_object = JavaNativeRayObjectToNativeRayObject(env, obj);
RAY_CHECK(native_ray_object != nullptr);
size_t data_size = 0;
if (native_ray_object->HasData()) {
data_size = native_ray_object->GetData()->Size();
}
std::shared_ptr<ray::Buffer> data;
ray::Status status;
std::shared_ptr<Buffer> data;
Status status;
if (object_id.IsNil()) {
status = ray::CoreWorkerProcess::GetCoreWorker().CreateOwned(
status = CoreWorkerProcess::GetCoreWorker().CreateOwned(
native_ray_object->GetMetadata(), data_size, native_ray_object->GetNestedIds(),
out_object_id, &data, /*created_by_worker=*/true,
/*owner_address=*/owner_address);
} else {
status = ray::CoreWorkerProcess::GetCoreWorker().CreateExisting(
status = CoreWorkerProcess::GetCoreWorker().CreateExisting(
native_ray_object->GetMetadata(), data_size, object_id,
ray::CoreWorkerProcess::GetCoreWorker().GetRpcAddress(), &data,
CoreWorkerProcess::GetCoreWorker().GetRpcAddress(), &data,
/*created_by_worker=*/true);
*out_object_id = object_id;
}
@ -57,14 +56,14 @@ ray::Status PutSerializedObject(
memcpy(data->Data(), native_ray_object->GetData()->Data(), data->Size());
}
if (object_id.IsNil()) {
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().SealOwned(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealOwned(
*out_object_id, pin_object, owner_address));
} else {
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().SealExisting(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealExisting(
*out_object_id, /* pin_object = */ false, owner_address));
}
}
return ray::Status::OK();
return Status::OK();
}
#ifdef __cplusplus
@ -74,20 +73,19 @@ extern "C" {
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativePut__Lio_ray_runtime_object_NativeRayObject_2_3B(
JNIEnv *env, jclass, jobject obj, jbyteArray owner_actor_id_bytes) {
ray::ObjectID object_id;
std::unique_ptr<ray::rpc::Address> owner_address = nullptr;
ObjectID object_id;
std::unique_ptr<rpc::Address> owner_address = nullptr;
if (owner_actor_id_bytes) {
ray::rpc::ActorTableData actor_table_data;
rpc::ActorTableData actor_table_data;
{
/// Get actor info from GCS synchronously.
std::unique_ptr<std::string> serialized_actor_table_data;
std::promise<bool> promise;
auto gcs_client = ray::CoreWorkerProcess::GetCoreWorker().GetGcsClient();
auto gcs_client = CoreWorkerProcess::GetCoreWorker().GetGcsClient();
RAY_CHECK_OK(gcs_client->Actors().AsyncGet(
ActorID::FromBinary(JavaByteArrayToNativeString(env, owner_actor_id_bytes)),
[&serialized_actor_table_data, &promise](
const ray::Status &status,
const boost::optional<ray::rpc::ActorTableData> &result) {
const Status &status, const boost::optional<rpc::ActorTableData> &result) {
RAY_CHECK_OK(status);
if (result) {
serialized_actor_table_data.reset(
@ -98,20 +96,20 @@ Java_io_ray_runtime_object_NativeObjectStore_nativePut__Lio_ray_runtime_object_N
promise.get_future().get();
actor_table_data.ParseFromString(*serialized_actor_table_data);
}
owner_address = std::make_unique<ray::rpc::Address>(actor_table_data.address());
owner_address = std::make_unique<rpc::Address>(actor_table_data.address());
}
auto status = PutSerializedObject(env, obj, /*object_id=*/ray::ObjectID::Nil(),
auto status = PutSerializedObject(env, obj, /*object_id=*/ObjectID::Nil(),
/*out_object_id=*/&object_id, /*pin_object=*/true,
/*owner_address=*/owner_address);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return IdToJavaByteArray<ray::ObjectID>(env, object_id);
return IdToJavaByteArray<ObjectID>(env, object_id);
}
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativePut___3BLio_ray_runtime_object_NativeRayObject_2(
JNIEnv *env, jclass, jbyteArray objectId, jobject obj) {
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
ray::ObjectID dummy_object_id;
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
ObjectID dummy_object_id;
auto status =
PutSerializedObject(env, obj, object_id,
/*out_object_id=*/&dummy_object_id, /*pin_object=*/true);
@ -120,29 +118,28 @@ Java_io_ray_runtime_object_NativeObjectStore_nativePut___3BLio_ray_runtime_objec
JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeGet(
JNIEnv *env, jclass, jobject ids, jlong timeoutMs) {
std::vector<ray::ObjectID> object_ids;
JavaListToNativeVector<ray::ObjectID>(
env, ids, &object_ids, [](JNIEnv *env, jobject id) {
return JavaByteArrayToId<ray::ObjectID>(env, static_cast<jbyteArray>(id));
});
std::vector<std::shared_ptr<ray::RayObject>> results;
auto status = ray::CoreWorkerProcess::GetCoreWorker().Get(object_ids,
(int64_t)timeoutMs, &results);
std::vector<ObjectID> object_ids;
JavaListToNativeVector<ObjectID>(env, ids, &object_ids, [](JNIEnv *env, jobject id) {
return JavaByteArrayToId<ObjectID>(env, static_cast<jbyteArray>(id));
});
std::vector<std::shared_ptr<RayObject>> results;
auto status =
CoreWorkerProcess::GetCoreWorker().Get(object_ids, (int64_t)timeoutMs, &results);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return NativeVectorToJavaList<std::shared_ptr<ray::RayObject>>(
return NativeVectorToJavaList<std::shared_ptr<RayObject>>(
env, results, NativeRayObjectToJavaNativeRayObject);
}
JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeWait(
JNIEnv *env, jclass, jobject objectIds, jint numObjects, jlong timeoutMs,
jboolean fetch_local) {
std::vector<ray::ObjectID> object_ids;
JavaListToNativeVector<ray::ObjectID>(
std::vector<ObjectID> object_ids;
JavaListToNativeVector<ObjectID>(
env, objectIds, &object_ids, [](JNIEnv *env, jobject id) {
return JavaByteArrayToId<ray::ObjectID>(env, static_cast<jbyteArray>(id));
return JavaByteArrayToId<ObjectID>(env, static_cast<jbyteArray>(id));
});
std::vector<bool> results;
auto status = ray::CoreWorkerProcess::GetCoreWorker().Wait(
auto status = CoreWorkerProcess::GetCoreWorker().Wait(
object_ids, (int)numObjects, (int64_t)timeoutMs, &results, (bool)fetch_local);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return NativeVectorToJavaList<bool>(env, results, [](JNIEnv *env, const bool &item) {
@ -155,22 +152,21 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeWai
JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete(
JNIEnv *env, jclass, jobject objectIds, jboolean localOnly) {
std::vector<ray::ObjectID> object_ids;
JavaListToNativeVector<ray::ObjectID>(
std::vector<ObjectID> object_ids;
JavaListToNativeVector<ObjectID>(
env, objectIds, &object_ids, [](JNIEnv *env, jobject id) {
return JavaByteArrayToId<ray::ObjectID>(env, static_cast<jbyteArray>(id));
return JavaByteArrayToId<ObjectID>(env, static_cast<jbyteArray>(id));
});
auto status =
ray::CoreWorkerProcess::GetCoreWorker().Delete(object_ids, (bool)localOnly);
auto status = CoreWorkerProcess::GetCoreWorker().Delete(object_ids, (bool)localOnly);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(
JNIEnv *env, jclass, jbyteArray workerId, jbyteArray objectId) {
auto worker_id = JavaByteArrayToId<ray::WorkerID>(env, workerId);
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
auto core_worker = ray::CoreWorkerProcess::TryGetWorker(worker_id);
auto worker_id = JavaByteArrayToId<WorkerID>(env, workerId);
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id);
RAY_CHECK(core_worker);
core_worker->AddLocalReference(object_id);
}
@ -178,12 +174,12 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference(
JNIEnv *env, jclass, jbyteArray workerId, jbyteArray objectId) {
auto worker_id = JavaByteArrayToId<ray::WorkerID>(env, workerId);
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
auto worker_id = JavaByteArrayToId<WorkerID>(env, workerId);
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
// We can't control the timing of Java GC, so it's normal that this method is called but
// core worker is shutting down (or already shut down). If we can't get a core worker
// instance here, skip calling the `RemoveLocalReference` method.
auto core_worker = ray::CoreWorkerProcess::TryGetWorker(worker_id);
auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id);
if (core_worker) {
core_worker->RemoveLocalReference(object_id);
}
@ -192,10 +188,10 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference(
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeGetAllReferenceCounts(JNIEnv *env,
jclass) {
auto reference_counts = ray::CoreWorkerProcess::GetCoreWorker().GetAllReferenceCounts();
return NativeMapToJavaMap<ray::ObjectID, std::pair<size_t, size_t>>(
auto reference_counts = CoreWorkerProcess::GetCoreWorker().GetAllReferenceCounts();
return NativeMapToJavaMap<ObjectID, std::pair<size_t, size_t>>(
env, reference_counts,
[](JNIEnv *env, const ray::ObjectID &key) {
[](JNIEnv *env, const ObjectID &key) {
return IdToJavaByteArray<ObjectID>(env, key);
},
[](JNIEnv *env, const std::pair<size_t, size_t> &value) {
@ -211,22 +207,21 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetAllReferenceCounts(JNIEnv
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeGetOwnerAddress(JNIEnv *env, jclass,
jbyteArray objectId) {
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
const auto &rpc_address =
ray::CoreWorkerProcess::GetCoreWorker().GetOwnerAddress(object_id);
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
const auto &rpc_address = CoreWorkerProcess::GetCoreWorker().GetOwnerAddress(object_id);
return NativeStringToJavaByteArray(env, rpc_address.SerializeAsString());
}
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativePromoteAndGetOwnershipInfo(
JNIEnv *env, jclass, jbyteArray objectId) {
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
ray::CoreWorkerProcess::GetCoreWorker().PromoteObjectToPlasma(object_id);
ray::rpc::Address address;
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
CoreWorkerProcess::GetCoreWorker().PromoteObjectToPlasma(object_id);
rpc::Address address;
// TODO(ekl) send serialized object status to Java land.
std::string serialized_object_status;
ray::CoreWorkerProcess::GetCoreWorker().GetOwnershipInfo(object_id, &address,
&serialized_object_status);
CoreWorkerProcess::GetCoreWorker().GetOwnershipInfo(object_id, &address,
&serialized_object_status);
auto address_str = address.SerializeAsString();
auto arr = NativeStringToJavaByteArray(env, address_str);
return arr;
@ -236,18 +231,18 @@ JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeRegisterOwnershipInfoAndResolveFuture(
JNIEnv *env, jclass, jbyteArray objectId, jbyteArray outerObjectId,
jbyteArray ownerAddress) {
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
auto outer_objectId = ray::ObjectID::Nil();
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
auto outer_objectId = ObjectID::Nil();
if (outerObjectId != NULL) {
outer_objectId = JavaByteArrayToId<ray::ObjectID>(env, outerObjectId);
outer_objectId = JavaByteArrayToId<ObjectID>(env, outerObjectId);
}
auto ownerAddressStr = JavaByteArrayToNativeString(env, ownerAddress);
ray::rpc::Address address;
rpc::Address address;
address.ParseFromString(ownerAddressStr);
// TODO(ekl) populate serialized object status from Java land.
ray::rpc::GetObjectStatusReply object_status;
rpc::GetObjectStatusReply object_status;
auto serialized_status = object_status.SerializeAsString();
ray::CoreWorkerProcess::GetCoreWorker().RegisterOwnershipInfoAndResolveFuture(
CoreWorkerProcess::GetCoreWorker().RegisterOwnershipInfoAndResolveFuture(
object_id, outer_objectId, address, serialized_status);
}

View file

@ -29,11 +29,11 @@ inline jint GetHashCodeOfJavaObject(JNIEnv *env, jobject java_object) {
}
/// Store C++ instances of ray function in the cache to avoid unnessesary JNI operations.
thread_local std::unordered_map<jint, std::vector<std::pair<jobject, ray::RayFunction>>>
thread_local std::unordered_map<jint, std::vector<std::pair<jobject, RayFunction>>>
submitter_function_descriptor_cache;
inline const ray::RayFunction &ToRayFunction(JNIEnv *env, jobject functionDescriptor,
jint hash) {
inline const RayFunction &ToRayFunction(JNIEnv *env, jobject functionDescriptor,
jint hash) {
auto &fd_vector = submitter_function_descriptor_cache[hash];
for (auto &pair : fd_vector) {
if (env->CallBooleanMethod(pair.first, java_object_equals, functionDescriptor)) {
@ -52,37 +52,35 @@ inline const ray::RayFunction &ToRayFunction(JNIEnv *env, jobject functionDescri
auto language = static_cast<::Language>(
env->CallIntMethod(java_language, java_language_get_number));
RAY_CHECK_JAVA_EXCEPTION(env);
ray::FunctionDescriptor function_descriptor =
ray::FunctionDescriptorBuilder::FromVector(language, function_descriptor_list);
FunctionDescriptor function_descriptor =
FunctionDescriptorBuilder::FromVector(language, function_descriptor_list);
fd_vector.emplace_back(env->NewGlobalRef(functionDescriptor),
ray::RayFunction(language, function_descriptor));
RayFunction(language, function_descriptor));
return fd_vector.back().second;
}
inline std::vector<std::unique_ptr<ray::TaskArg>> ToTaskArgs(JNIEnv *env, jobject args) {
std::vector<std::unique_ptr<ray::TaskArg>> task_args;
JavaListToNativeVector<std::unique_ptr<ray::TaskArg>>(
inline std::vector<std::unique_ptr<TaskArg>> ToTaskArgs(JNIEnv *env, jobject args) {
std::vector<std::unique_ptr<TaskArg>> task_args;
JavaListToNativeVector<std::unique_ptr<TaskArg>>(
env, args, &task_args, [](JNIEnv *env, jobject arg) {
auto java_id = env->GetObjectField(arg, java_function_arg_id);
if (java_id) {
auto java_id_bytes = static_cast<jbyteArray>(
env->CallObjectMethod(java_id, java_base_id_get_bytes));
RAY_CHECK_JAVA_EXCEPTION(env);
auto id = JavaByteArrayToId<ray::ObjectID>(env, java_id_bytes);
auto id = JavaByteArrayToId<ObjectID>(env, java_id_bytes);
auto java_owner_address =
env->GetObjectField(arg, java_function_arg_owner_address);
RAY_CHECK(java_owner_address);
auto owner_address =
JavaProtobufObjectToNativeProtobufObject<ray::rpc::Address>(
env, java_owner_address);
return std::unique_ptr<ray::TaskArg>(
new ray::TaskArgByReference(id, owner_address));
auto owner_address = JavaProtobufObjectToNativeProtobufObject<rpc::Address>(
env, java_owner_address);
return std::unique_ptr<TaskArg>(new TaskArgByReference(id, owner_address));
}
auto java_value =
static_cast<jbyteArray>(env->GetObjectField(arg, java_function_arg_value));
RAY_CHECK(java_value) << "Both id and value of FunctionArg are null.";
auto value = JavaNativeRayObjectToNativeRayObject(env, java_value);
return std::unique_ptr<ray::TaskArg>(new ray::TaskArgByValue(value));
return std::unique_ptr<TaskArg>(new TaskArgByValue(value));
});
return task_args;
}
@ -101,23 +99,23 @@ inline std::unordered_map<std::string, double> ToResources(JNIEnv *env,
});
}
inline std::pair<ray::PlacementGroupID, int64_t> ToPlacementGroupOptions(
JNIEnv *env, jobject callOptions) {
auto placement_group_options = std::make_pair(ray::PlacementGroupID::Nil(), -1);
inline std::pair<PlacementGroupID, int64_t> ToPlacementGroupOptions(JNIEnv *env,
jobject callOptions) {
auto placement_group_options = std::make_pair(PlacementGroupID::Nil(), -1);
auto group = env->GetObjectField(callOptions, java_task_creation_options_group);
if (group) {
auto placement_group_id = env->GetObjectField(group, java_placement_group_id);
auto java_id_bytes = static_cast<jbyteArray>(
env->CallObjectMethod(placement_group_id, java_base_id_get_bytes));
RAY_CHECK_JAVA_EXCEPTION(env);
auto id = JavaByteArrayToId<ray::PlacementGroupID>(env, java_id_bytes);
auto id = JavaByteArrayToId<PlacementGroupID>(env, java_id_bytes);
auto index = env->GetIntField(callOptions, java_task_creation_options_bundle_index);
placement_group_options = std::make_pair(id, index);
}
return placement_group_options;
}
inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptions) {
inline TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptions) {
std::unordered_map<std::string, double> resources;
std::string name = "";
std::string concurrency_group_name = "";
@ -138,20 +136,20 @@ inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject call
}
}
ray::TaskOptions task_options{name, numReturns, resources, concurrency_group_name};
TaskOptions task_options{name, numReturns, resources, concurrency_group_name};
return task_options;
}
inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
jobject actorCreationOptions) {
inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
jobject actorCreationOptions) {
bool global = false;
std::string name = "";
int64_t max_restarts = 0;
std::unordered_map<std::string, double> resources;
std::vector<std::string> dynamic_worker_options;
uint64_t max_concurrency = 1;
auto placement_options = std::make_pair(ray::PlacementGroupID::Nil(), -1);
std::vector<ray::ConcurrencyGroup> concurrency_groups;
auto placement_options = std::make_pair(PlacementGroupID::Nil(), -1);
std::vector<ConcurrencyGroup> concurrency_groups;
if (actorCreationOptions) {
global =
@ -181,7 +179,7 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
auto java_id_bytes = static_cast<jbyteArray>(
env->CallObjectMethod(placement_group_id, java_base_id_get_bytes));
RAY_CHECK_JAVA_EXCEPTION(env);
auto id = JavaByteArrayToId<ray::PlacementGroupID>(env, java_id_bytes);
auto id = JavaByteArrayToId<PlacementGroupID>(env, java_id_bytes);
auto index = env->GetIntField(actorCreationOptions,
java_actor_creation_options_bundle_index);
placement_options = std::make_pair(id, index);
@ -224,7 +222,7 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
// TODO(suquark): support passing namespace for Java. Currently
// there is no use case.
std::string ray_namespace = "";
ray::ActorCreationOptions actor_creation_options{
ActorCreationOptions actor_creation_options{
max_restarts,
0, // TODO: Allow setting max_task_retries from Java.
static_cast<int>(max_concurrency),
@ -243,20 +241,20 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
return actor_creation_options;
}
inline ray::PlacementStrategy ConvertStrategy(jint java_strategy) {
inline PlacementStrategy ConvertStrategy(jint java_strategy) {
switch (java_strategy) {
case 0:
return ray::rpc::PACK;
return rpc::PACK;
case 1:
return ray::rpc::SPREAD;
return rpc::SPREAD;
case 2:
return ray::rpc::STRICT_PACK;
return rpc::STRICT_PACK;
default:
return ray::rpc::STRICT_SPREAD;
return rpc::STRICT_SPREAD;
}
}
inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
inline PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
JNIEnv *env, jobject placementGroupCreationOptions) {
// We have make sure the placementGroupCreationOptions is not null in java api.
bool global = env->GetBooleanField(placementGroupCreationOptions,
@ -288,9 +286,8 @@ inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
});
});
auto full_name = GetFullName(global, name);
return ray::PlacementGroupCreationOptions(full_name, ConvertStrategy(java_strategy),
bundles,
/*is_detached=*/false);
return PlacementGroupCreationOptions(full_name, ConvertStrategy(java_strategy), bundles,
/*is_detached=*/false);
}
#ifdef __cplusplus
@ -308,7 +305,7 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub
std::vector<ObjectID> return_ids;
// TODO (kfstorm): Allow setting `max_retries` via `CallOptions`.
ray::CoreWorkerProcess::GetCoreWorker().SubmitTask(
CoreWorkerProcess::GetCoreWorker().SubmitTask(
ray_function, task_args, task_options, &return_ids,
/*max_retries=*/0,
/*placement_options=*/placement_group_options,
@ -333,19 +330,19 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor(
auto actor_creation_options = ToActorCreationOptions(env, actorCreationOptions);
ActorID actor_id;
auto status = ray::CoreWorkerProcess::GetCoreWorker().CreateActor(
auto status = CoreWorkerProcess::GetCoreWorker().CreateActor(
ray_function, task_args, actor_creation_options,
/*extension_data*/ "", &actor_id);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return IdToJavaByteArray<ray::ActorID>(env, actor_id);
return IdToJavaByteArray<ActorID>(env, actor_id);
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
JNIEnv *env, jclass p, jbyteArray actorId, jobject functionDescriptor,
jint functionDescriptorHash, jobject args, jint numReturns, jobject callOptions) {
auto actor_id = JavaByteArrayToId<ray::ActorID>(env, actorId);
auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
const auto &ray_function =
ToRayFunction(env, functionDescriptor, functionDescriptorHash);
auto task_args = ToTaskArgs(env, args);
@ -353,8 +350,8 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
auto task_options = ToTaskOptions(env, numReturns, callOptions);
std::vector<ObjectID> return_ids;
ray::CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, ray_function, task_args, task_options, &return_ids);
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, ray_function, task_args,
task_options, &return_ids);
// This is to avoid creating an empty java list and boost performance.
if (return_ids.empty()) {
@ -368,20 +365,20 @@ JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeCreatePlacementGroup(
JNIEnv *env, jclass, jobject placementGroupCreationOptions) {
auto options = ToPlacementGroupCreationOptions(env, placementGroupCreationOptions);
ray::PlacementGroupID placement_group_id;
auto status = ray::CoreWorkerProcess::GetCoreWorker().CreatePlacementGroup(
PlacementGroupID placement_group_id;
auto status = CoreWorkerProcess::GetCoreWorker().CreatePlacementGroup(
options, &placement_group_id);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return IdToJavaByteArray<ray::PlacementGroupID>(env, placement_group_id);
return IdToJavaByteArray<PlacementGroupID>(env, placement_group_id);
}
JNIEXPORT void JNICALL
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeRemovePlacementGroup(
JNIEnv *env, jclass p, jbyteArray placement_group_id_bytes) {
const auto placement_group_id =
JavaByteArrayToId<ray::PlacementGroupID>(env, placement_group_id_bytes);
JavaByteArrayToId<PlacementGroupID>(env, placement_group_id_bytes);
auto status =
ray::CoreWorkerProcess::GetCoreWorker().RemovePlacementGroup(placement_group_id);
CoreWorkerProcess::GetCoreWorker().RemovePlacementGroup(placement_group_id);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
@ -389,8 +386,8 @@ JNIEXPORT jboolean JNICALL
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeWaitPlacementGroupReady(
JNIEnv *env, jclass p, jbyteArray placement_group_id_bytes, jint timeout_seconds) {
const auto placement_group_id =
JavaByteArrayToId<ray::PlacementGroupID>(env, placement_group_id_bytes);
auto status = ray::CoreWorkerProcess::GetCoreWorker().WaitPlacementGroupReady(
JavaByteArrayToId<PlacementGroupID>(env, placement_group_id_bytes);
auto status = CoreWorkerProcess::GetCoreWorker().WaitPlacementGroupReady(
placement_group_id, timeout_seconds);
if (status.IsNotFound()) {
env->ThrowNew(java_ray_exception_class, status.message().c_str());

View file

@ -24,6 +24,9 @@
#include "ray/common/ray_object.h"
#include "ray/core_worker/core_worker.h"
using namespace ray;
using namespace ray::core;
/// Boolean class
extern jclass java_boolean_class;
/// Constructor of Boolean class
@ -276,7 +279,7 @@ extern JavaVM *jvm;
/// Represents a byte buffer of Java byte array.
/// The destructor will automatically call ReleaseByteArrayElements.
/// NOTE: Instances of this class cannot be used across threads.
class JavaByteArrayBuffer : public ray::Buffer {
class JavaByteArrayBuffer : public Buffer {
public:
JavaByteArrayBuffer(JNIEnv *env, jbyteArray java_byte_array)
: env_(env), java_byte_array_(java_byte_array) {
@ -488,9 +491,9 @@ inline jobject NativeMapToJavaMap(
return java_map;
}
/// Convert a C++ ray::Buffer to a Java byte array.
/// Convert a C++ Buffer to a Java byte array.
inline jbyteArray NativeBufferToJavaByteArray(JNIEnv *env,
const std::shared_ptr<ray::Buffer> buffer) {
const std::shared_ptr<Buffer> buffer) {
if (!buffer) {
return nullptr;
}
@ -511,9 +514,9 @@ inline std::shared_ptr<JavaByteArrayBuffer> JavaByteArrayToNativeBuffer(
return std::make_shared<JavaByteArrayBuffer>(env, javaByteArray);
}
/// Convert a Java NativeRayObject to a C++ ray::RayObject.
/// NOTE: the returned ray::RayObject cannot be used across threads.
inline std::shared_ptr<ray::RayObject> JavaNativeRayObjectToNativeRayObject(
/// Convert a Java NativeRayObject to a C++ RayObject.
/// NOTE: the returned RayObject cannot be used across threads.
inline std::shared_ptr<RayObject> JavaNativeRayObjectToNativeRayObject(
JNIEnv *env, const jobject &java_obj) {
if (!java_obj) {
return nullptr;
@ -521,8 +524,8 @@ inline std::shared_ptr<ray::RayObject> JavaNativeRayObjectToNativeRayObject(
auto java_data = (jbyteArray)env->GetObjectField(java_obj, java_native_ray_object_data);
auto java_metadata =
(jbyteArray)env->GetObjectField(java_obj, java_native_ray_object_metadata);
std::shared_ptr<ray::Buffer> data_buffer = JavaByteArrayToNativeBuffer(env, java_data);
std::shared_ptr<ray::Buffer> metadata_buffer =
std::shared_ptr<Buffer> data_buffer = JavaByteArrayToNativeBuffer(env, java_data);
std::shared_ptr<Buffer> metadata_buffer =
JavaByteArrayToNativeBuffer(env, java_metadata);
if (data_buffer && data_buffer->Size() == 0) {
data_buffer = nullptr;
@ -533,19 +536,18 @@ inline std::shared_ptr<ray::RayObject> JavaNativeRayObjectToNativeRayObject(
auto java_contained_ids =
env->GetObjectField(java_obj, java_native_ray_object_contained_object_ids);
std::vector<ray::ObjectID> contained_object_ids;
JavaListToNativeVector<ray::ObjectID>(
std::vector<ObjectID> contained_object_ids;
JavaListToNativeVector<ObjectID>(
env, java_contained_ids, &contained_object_ids, [](JNIEnv *env, jobject id) {
return JavaByteArrayToId<ray::ObjectID>(env, static_cast<jbyteArray>(id));
return JavaByteArrayToId<ObjectID>(env, static_cast<jbyteArray>(id));
});
env->DeleteLocalRef(java_contained_ids);
return std::make_shared<ray::RayObject>(data_buffer, metadata_buffer,
contained_object_ids);
return std::make_shared<RayObject>(data_buffer, metadata_buffer, contained_object_ids);
}
/// Convert a C++ ray::RayObject to a Java NativeRayObject.
/// Convert a C++ RayObject to a Java NativeRayObject.
inline jobject NativeRayObjectToJavaNativeRayObject(
JNIEnv *env, const std::shared_ptr<ray::RayObject> &rayObject) {
JNIEnv *env, const std::shared_ptr<RayObject> &rayObject) {
if (!rayObject) {
return nullptr;
}
@ -559,19 +561,18 @@ inline jobject NativeRayObjectToJavaNativeRayObject(
return java_obj;
}
// TODO(po): Convert C++ ray::FunctionDescriptor to Java FunctionDescriptor
// TODO(po): Convert C++ FunctionDescriptor to Java FunctionDescriptor
inline jobject NativeRayFunctionDescriptorToJavaStringList(
JNIEnv *env, const ray::FunctionDescriptor &function_descriptor) {
if (function_descriptor->Type() ==
ray::FunctionDescriptorType::kJavaFunctionDescriptor) {
auto typed_descriptor = function_descriptor->As<ray::JavaFunctionDescriptor>();
JNIEnv *env, const FunctionDescriptor &function_descriptor) {
if (function_descriptor->Type() == FunctionDescriptorType::kJavaFunctionDescriptor) {
auto typed_descriptor = function_descriptor->As<JavaFunctionDescriptor>();
std::vector<std::string> function_descriptor_list = {typed_descriptor->ClassName(),
typed_descriptor->FunctionName(),
typed_descriptor->Signature()};
return NativeStringVectorToJavaStringList(env, function_descriptor_list);
} else if (function_descriptor->Type() ==
ray::FunctionDescriptorType::kPythonFunctionDescriptor) {
auto typed_descriptor = function_descriptor->As<ray::PythonFunctionDescriptor>();
FunctionDescriptorType::kPythonFunctionDescriptor) {
auto typed_descriptor = function_descriptor->As<PythonFunctionDescriptor>();
std::vector<std::string> function_descriptor_list = {
typed_descriptor->ModuleName(), typed_descriptor->ClassName(),
typed_descriptor->FunctionName(), typed_descriptor->FunctionHash()};
@ -609,17 +610,16 @@ inline std::string GetFullName(bool global, std::string name) {
return "";
}
return global ? name
: ::ray::CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex() +
"-" + name;
: CoreWorkerProcess::GetCoreWorker().GetCurrentJobId().Hex() + "-" + name;
}
inline std::shared_ptr<ray::LocalMemoryBuffer> SerializeActorCreationException(
inline std::shared_ptr<LocalMemoryBuffer> SerializeActorCreationException(
JNIEnv *env, jthrowable creation_exception) {
jbyteArray exception_jbyte_array = static_cast<jbyteArray>(
env->CallObjectMethod(creation_exception, java_ray_exception_to_bytes));
int len = env->GetArrayLength(exception_jbyte_array);
auto buf = std::make_shared<ray::LocalMemoryBuffer>(len);
auto buf = std::make_shared<LocalMemoryBuffer>(len);
env->GetByteArrayRegion(exception_jbyte_array, 0, len,
reinterpret_cast<jbyte *>(buf->Data()));
return buf;
}
}

View file

@ -17,6 +17,7 @@
#include "ray/util/util.h"
namespace ray {
namespace core {
bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
// Check the ReferenceCounter to see if there is a location for the object.
@ -151,4 +152,5 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
}
}
} // namespace core
} // namespace ray

View file

@ -23,6 +23,7 @@
#include "ray/raylet_client/raylet_client.h"
namespace ray {
namespace core {
typedef std::function<std::shared_ptr<PinObjectsInterface>(const std::string &ip_address,
int port)>
@ -146,4 +147,5 @@ class ObjectRecoveryManager {
absl::flat_hash_set<ObjectID> objects_pending_recovery_ GUARDED_BY(mu_);
};
} // namespace core
} // namespace ray

View file

@ -17,6 +17,7 @@
#include <chrono>
namespace ray {
namespace core {
namespace worker {
@ -89,4 +90,5 @@ void Profiler::FlushEvents() {
} // namespace worker
} // namespace core
} // namespace ray

View file

@ -23,6 +23,7 @@
#include "ray/gcs/gcs_client.h"
namespace ray {
namespace core {
namespace worker {
@ -84,4 +85,5 @@ class ProfileEvent {
} // namespace worker
} // namespace core
} // namespace ray

View file

@ -29,6 +29,7 @@
namespace {} // namespace
namespace ray {
namespace core {
bool ReferenceCounter::OwnObjects() const {
absl::MutexLock lock(&mutex_);
@ -68,7 +69,7 @@ ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto(
const ReferenceTableProto &proto) {
ReferenceTable refs;
for (const auto &ref : proto) {
refs.emplace(ray::ObjectID::FromBinary(ref.reference().object_id()),
refs.emplace(ObjectID::FromBinary(ref.reference().object_id()),
Reference::FromProto(ref));
}
return refs;
@ -1226,4 +1227,5 @@ void ReferenceCounter::Reference::ToProto(rpc::ObjectReferenceCount *ref) const
}
}
} // namespace core
} // namespace ray

View file

@ -31,6 +31,7 @@
#include "src/ray/protobuf/common.pb.h"
namespace ray {
namespace core {
// Interface for mocking.
class ReferenceCounterInterface {
@ -821,4 +822,5 @@ class ReferenceCounter : public ReferenceCounterInterface,
pubsub::SubscriberInterface *object_info_subscriber_;
};
} // namespace core
} // namespace ray

View file

@ -28,6 +28,7 @@
#include "ray/pubsub/subscriber.h"
namespace ray {
namespace core {
static const rpc::Address empty_borrower;
static const ReferenceCounter::ReferenceTableProto empty_refs;
@ -2332,6 +2333,7 @@ TEST_F(ReferenceCountTest, TestRemoveOwnedObject) {
ASSERT_FALSE(rc->HasReference(id));
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -19,6 +19,7 @@
#include "ray/core_worker/core_worker.h"
namespace ray {
namespace core {
// Notify the user about an unhandled error after this amount of time. This only
// applies to interactive console (e.g., IPython), see:
@ -585,4 +586,5 @@ MemoryStoreStats CoreWorkerMemoryStore::GetMemoryStoreStatisticalData() {
return item;
}
} // namespace core
} // namespace ray

View file

@ -26,6 +26,7 @@
#include "ray/core_worker/reference_count.h"
namespace ray {
namespace core {
struct MemoryStoreStats {
int32_t num_in_plasma = 0;
@ -234,4 +235,5 @@ class CoreWorkerMemoryStore {
int64_t used_object_store_memory_ GUARDED_BY(mu_) = 0;
};
} // namespace core
} // namespace ray

View file

@ -20,6 +20,7 @@
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace core {
void BufferTracker::Record(const ObjectID &object_id, TrackedBuffer *buffer,
const std::string &call_site) {
@ -439,4 +440,5 @@ Status CoreWorkerPlasmaStoreProvider::WarmupStore() {
return Status::OK();
}
} // namespace core
} // namespace ray

View file

@ -26,6 +26,7 @@
#include "ray/raylet_client/raylet_client.h"
namespace ray {
namespace core {
class TrackedBuffer;
@ -215,4 +216,5 @@ class CoreWorkerPlasmaStoreProvider {
std::shared_ptr<BufferTracker> buffer_tracker_;
};
} // namespace core
} // namespace ray

View file

@ -21,6 +21,7 @@
#include "msgpack.hpp"
namespace ray {
namespace core {
// Start throttling task failure logs once we hit this threshold.
const int64_t kTaskFailureThrottlingThreshold = 50;
@ -466,8 +467,8 @@ void TaskManager::MarkPendingTaskFailed(
packer.pack_bin(pb_serialized_exception.size());
packer.pack_bin_body(pb_serialized_exception.data(),
pb_serialized_exception.size());
ray::LocalMemoryBuffer final_buffer(msgpack_serialized_exception.size() +
kMessagePackOffset);
LocalMemoryBuffer final_buffer(msgpack_serialized_exception.size() +
kMessagePackOffset);
// copy msgpack-serialized bytes
std::memcpy(final_buffer.Data() + kMessagePackOffset,
msgpack_serialized_exception.data(),
@ -505,4 +506,5 @@ std::vector<TaskID> TaskManager::GetPendingChildrenTasks(
return ret_vec;
}
} // namespace core
} // namespace ray

View file

@ -24,6 +24,7 @@
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace core {
class TaskFinisherInterface {
public:
@ -289,4 +290,5 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
std::function<void()> shutdown_hook_ GUARDED_BY(mu_) = nullptr;
};
} // namespace core
} // namespace ray

View file

@ -24,6 +24,7 @@
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
namespace ray {
namespace core {
using ::testing::_;
@ -34,7 +35,7 @@ class MockActorInfoAccessor : public gcs::ServiceBasedActorInfoAccessor {
~MockActorInfoAccessor() {}
ray::Status AsyncSubscribe(
Status AsyncSubscribe(
const ActorID &actor_id,
const gcs::SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const gcs::StatusCallback &done) {
@ -137,8 +138,8 @@ class ActorManagerTest : public ::testing::Test {
ActorID actor_id = ActorID::Of(job_id, task_id, 1);
const auto caller_address = rpc::Address();
const auto call_site = "";
RayFunction function(ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""));
RayFunction function(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
@ -164,8 +165,8 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
ActorID actor_id = ActorID::Of(job_id, task_id, 1);
const auto caller_address = rpc::Address();
const auto call_site = "";
RayFunction function(ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""));
RayFunction function(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0);
@ -217,8 +218,8 @@ TEST_F(ActorManagerTest, RegisterActorHandles) {
ActorID actor_id = ActorID::Of(job_id, task_id, 1);
const auto caller_address = rpc::Address();
const auto call_site = "";
RayFunction function(ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""));
RayFunction function(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
auto actor_handle = absl::make_unique<ActorHandle>(
actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "", 0);
@ -287,6 +288,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationAlive) {
actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data));
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -44,6 +44,7 @@ int node_manager_port = 0;
} // namespace
namespace ray {
namespace core {
static void flushall_redis(void) {
redisContext *context = redisConnect("127.0.0.1", 6379);
@ -58,8 +59,8 @@ ActorID CreateActorHelper(std::unordered_map<std::string, double> &resources,
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"actor creation task", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"actor creation task", "", "", ""));
std::vector<std::unique_ptr<TaskArg>> args;
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>())));
@ -205,13 +206,13 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id,
std::vector<std::unique_ptr<TaskArg>> args;
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"GetWorkerPid", "", "", "")};
RayFunction func{Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("GetWorkerPid", "", "", "")};
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options,
&return_ids);
std::vector<std::shared_ptr<ray::RayObject>> results;
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results));
if (nullptr == results[0]->GetData()) {
@ -243,8 +244,8 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
std::make_shared<RayObject>(buffer1, nullptr, std::vector<ObjectID>())));
args.emplace_back(new TaskArgByReference(object_id, driver.GetRpcAddress()));
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
TaskOptions options;
std::vector<ObjectID> return_ids;
driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0,
@ -253,7 +254,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<ray::RayObject>> results;
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(driver.Get(return_ids, -1, &results));
ASSERT_EQ(results.size(), 1);
@ -288,13 +289,13 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<ray::RayObject>> results;
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(driver.Get(return_ids, -1, &results));
ASSERT_EQ(results.size(), 1);
@ -330,13 +331,13 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
std::vector<std::shared_ptr<ray::RayObject>> results;
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(driver.Get(return_ids, -1, &results));
ASSERT_EQ(results.size(), 1);
@ -392,8 +393,8 @@ void CoreWorkerTest::TestActorRestart(
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
@ -435,8 +436,8 @@ void CoreWorkerTest::TestActorFailure(
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
@ -486,8 +487,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
// to benchmark performance.
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction function(ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""));
RayFunction function(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""));
std::vector<std::unique_ptr<TaskArg>> args;
args.emplace_back(new TaskArgByValue(
std::make_shared<RayObject>(buffer, nullptr, std::vector<ObjectID>())));
@ -569,8 +570,8 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
TaskOptions options{"", 1, resources};
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RayFunction func(Language::PYTHON, FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
@ -627,10 +628,10 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
TEST_F(ZeroNodeTest, TestActorHandle) {
// Test actor handle serialization and deserialization round trip.
JobID job_id = NextJobId();
ActorHandle original(
ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(),
rpc::Address(), job_id, ObjectID::FromRandom(), Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0);
ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0);
std::string output;
original.Serialize(&output);
ActorHandle deserialized(output);
@ -838,10 +839,10 @@ TEST_F(SingleNodeTest, TestCancelTasks) {
auto &driver = CoreWorkerProcess::GetCoreWorker();
// Create two functions, each implementing a while(true) loop.
RayFunction func1(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"WhileTrueLoop", "", "", ""));
RayFunction func2(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"WhileTrueLoop", "", "", ""));
RayFunction func1(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("WhileTrueLoop", "", "", ""));
RayFunction func2(Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("WhileTrueLoop", "", "", ""));
// Return IDs for the two functions that implement while(true) loops.
std::vector<ObjectID> return_ids1;
std::vector<ObjectID> return_ids2;
@ -915,6 +916,7 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodesFailure) {
TestActorFailure(resources);
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -23,6 +23,7 @@
#include "ray/rpc/worker/core_worker_client.h"
namespace ray {
namespace core {
using ::testing::_;
using ::testing::ElementsAre;
@ -648,6 +649,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
StopIOService();
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -23,13 +23,14 @@
#include "ray/rpc/worker/core_worker_client.h"
namespace ray {
namespace core {
// Used to prevent leases from timing out when not testing that logic. It would
// be better to use a mock clock or lease manager interface, but that's high
// overhead for the very simple timeout logic we currently have.
int64_t kLongTimeout = 1024 * 1024 * 1024;
TaskSpecification BuildTaskSpec(const std::unordered_map<std::string, double> &resources,
const ray::FunctionDescriptor &function_descriptor);
const FunctionDescriptor &function_descriptor);
// Calls BuildTaskSpec with empty resources map and empty function descriptor
TaskSpecification BuildEmptyTaskSpec();
@ -134,8 +135,8 @@ class MockTaskFinisher : public TaskFinisherInterface {
class MockRayletClient : public WorkerLeaseInterface {
public:
ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) override {
Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) override {
if (disconnect_worker) {
num_workers_disconnected++;
} else {
@ -145,7 +146,7 @@ class MockRayletClient : public WorkerLeaseInterface {
}
void RequestWorkerLease(
const ray::TaskSpecification &resource_spec,
const TaskSpecification &resource_spec,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size) override {
num_workers_requested += 1;
@ -391,7 +392,7 @@ TEST(LocalDependencyResolverTest, TestInlinedObjectIds) {
}
TaskSpecification BuildTaskSpec(const std::unordered_map<std::string, double> &resources,
const ray::FunctionDescriptor &function_descriptor) {
const FunctionDescriptor &function_descriptor) {
TaskSpecBuilder builder;
rpc::Address empty_address;
builder.SetCommonTaskSpec(TaskID::Nil(), "dummy_task", Language::PYTHON,
@ -403,8 +404,8 @@ TaskSpecification BuildTaskSpec(const std::unordered_map<std::string, double> &r
TaskSpecification BuildEmptyTaskSpec() {
std::unordered_map<std::string, double> empty_resources;
ray::FunctionDescriptor empty_descriptor =
ray::FunctionDescriptorBuilder::BuildPython("", "", "", "");
FunctionDescriptor empty_descriptor =
FunctionDescriptorBuilder::BuildPython("", "", "", "");
return BuildTaskSpec(empty_resources, empty_descriptor);
}
@ -996,10 +997,10 @@ TEST(DirectTaskTransportTest, TestSchedulingKeys) {
std::unordered_map<std::string, double> resources1({{"a", 1.0}});
std::unordered_map<std::string, double> resources2({{"b", 2.0}});
ray::FunctionDescriptor descriptor1 =
ray::FunctionDescriptorBuilder::BuildPython("a", "", "", "");
ray::FunctionDescriptor descriptor2 =
ray::FunctionDescriptorBuilder::BuildPython("b", "", "", "");
FunctionDescriptor descriptor1 =
FunctionDescriptorBuilder::BuildPython("a", "", "", "");
FunctionDescriptor descriptor2 =
FunctionDescriptorBuilder::BuildPython("b", "", "", "");
// Tasks with different resources should request different worker leases.
RAY_LOG(INFO) << "Test different resources";
@ -1984,6 +1985,7 @@ TEST(DirectTaskTransportTest, TestNoWorkerRequestedIfStealingUnavailable) {
ASSERT_EQ(worker_client->steal_callbacks.size(), 0);
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -18,6 +18,7 @@
#include "ray/common/task/task_spec.h"
namespace ray {
namespace core {
TaskSpecification CreateFakeTask(std::vector<ObjectID> deps) {
TaskSpecification spec;
@ -208,4 +209,5 @@ TEST(LocalityAwareLeasePolicyTest, TestBestLocalityFallbackAddrFetchFail) {
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), fallback_node);
}
} // namespace core
} // namespace ray

View file

@ -20,6 +20,7 @@
#include "ray/common/test_util.h"
namespace ray {
namespace core {
TEST(TestMemoryStore, TestReportUnhandledErrors) {
std::vector<std::shared_ptr<RayObject>> results;
@ -126,6 +127,7 @@ TEST(TestMemoryStore, TestMemoryStoreStats) {
ASSERT_EQ(item.used_object_store_memory, expected_item3.used_object_store_memory);
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -20,6 +20,7 @@
using namespace std::placeholders;
namespace ray {
namespace core {
/// A mock C++ worker used by core_worker_test.cc to verify the task submission/execution
/// interfaces in both single node and cross-nodes scenarios. As the raylet client can
@ -65,11 +66,10 @@ class MockWorker {
const std::string &debugger_breakpoint,
std::vector<std::shared_ptr<RayObject>> *results) {
// Note that this doesn't include dummy object id.
const ray::FunctionDescriptor function_descriptor =
ray_function.GetFunctionDescriptor();
const FunctionDescriptor function_descriptor = ray_function.GetFunctionDescriptor();
RAY_CHECK(function_descriptor->Type() ==
ray::FunctionDescriptorType::kPythonFunctionDescriptor);
auto typed_descriptor = function_descriptor->As<ray::PythonFunctionDescriptor>();
FunctionDescriptorType::kPythonFunctionDescriptor);
auto typed_descriptor = function_descriptor->As<PythonFunctionDescriptor>();
if ("actor creation task" == typed_descriptor->ModuleName()) {
return Status::OK();
@ -142,6 +142,7 @@ class MockWorker {
int64_t prev_seq_no_ = 0;
};
} // namespace core
} // namespace ray
int main(int argc, char **argv) {
@ -151,7 +152,8 @@ int main(int argc, char **argv) {
auto node_manager_port = std::stoi(std::string(argv[3]));
ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, "");
ray::MockWorker worker(store_socket, raylet_socket, node_manager_port, gcs_options);
ray::core::MockWorker worker(store_socket, raylet_socket, node_manager_port,
gcs_options);
worker.RunTaskExecutionLoop();
return 0;
}

View file

@ -26,6 +26,7 @@
#include "ray/raylet_client/raylet_client.h"
namespace ray {
namespace core {
// Used to prevent leases from timing out when not testing that logic. It would
// be better to use a mock clock or lease manager interface, but that's high
@ -60,7 +61,7 @@ class MockRayletClient : public PinObjectsInterface {
public:
void PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override {
const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) override {
RAY_LOG(INFO) << "PinObjectIDs " << object_ids.size();
callbacks.push_back(callback);
}
@ -254,6 +255,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
}
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -20,6 +20,7 @@
#include "ray/core_worker/transport/direct_actor_transport.h"
namespace ray {
namespace core {
class MockActorSchedulingQueue {
public:
@ -356,6 +357,7 @@ TEST(SchedulingQueueTest, TestStealingOddNumberTasks) {
ASSERT_EQ(n_steal, 5);
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -24,6 +24,7 @@
#include "ray/pubsub/mock_pubsub.h"
namespace ray {
namespace core {
TaskSpecification CreateTaskHelper(uint64_t num_returns,
std::vector<ObjectID> dependencies) {
@ -523,6 +524,7 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
ASSERT_EQ(num_retries_, 1);
}
} // namespace core
} // namespace ray
int main(int argc, char **argv) {

View file

@ -15,6 +15,7 @@
#include "ray/core_worker/transport/dependency_resolver.h"
namespace ray {
namespace core {
struct TaskState {
TaskState(TaskSpecification t,
@ -115,4 +116,5 @@ void LocalDependencyResolver::ResolveDependencies(TaskSpecification &task,
}
}
} // namespace core
} // namespace ray

View file

@ -22,6 +22,7 @@
#include "ray/core_worker/task_manager.h"
namespace ray {
namespace core {
// This class is thread-safe.
class LocalDependencyResolver {
@ -58,4 +59,5 @@ class LocalDependencyResolver {
absl::Mutex mu_;
};
} // namespace core
} // namespace ray

View file

@ -21,6 +21,7 @@
using ray::rpc::ActorTableData;
namespace ray {
namespace core {
void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists(
const ActorID &actor_id) {
@ -121,7 +122,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) {
queue.rpc_client = nullptr;
core_worker_client_pool_->Disconnect(ray::WorkerID::FromBinary(queue.worker_id));
core_worker_client_pool_->Disconnect(WorkerID::FromBinary(queue.worker_id));
queue.worker_id.clear();
queue.pending_force_kill.reset();
}
@ -621,4 +622,5 @@ void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(bool is_asyncio,
fiber_max_concurrency_ = fiber_max_concurrency;
}
} // namespace core
} // namespace ray

View file

@ -39,6 +39,7 @@
namespace {} // namespace
namespace ray {
namespace core {
/// The max time to wait for out-of-order tasks.
const int kMaxReorderWaitSeconds = 30;
@ -782,7 +783,7 @@ class CoreWorkerDirectTaskReceiver {
std::vector<std::shared_ptr<RayObject>> *return_objects,
ReferenceCounter::ReferenceTableProto *borrower_refs)>;
using OnTaskDone = std::function<ray::Status()>;
using OnTaskDone = std::function<Status()>;
CoreWorkerDirectTaskReceiver(WorkerContext &worker_context,
instrumented_io_context &main_io_service,
@ -857,4 +858,5 @@ class CoreWorkerDirectTaskReceiver {
void SetMaxActorConcurrency(bool is_asyncio, int fiber_max_concurrency);
};
} // namespace core
} // namespace ray

View file

@ -17,6 +17,7 @@
#include "ray/core_worker/transport/dependency_resolver.h"
namespace ray {
namespace core {
Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId();
@ -194,7 +195,7 @@ bool CoreWorkerDirectTaskSubmitter::FindOptimalVictimForStealing(
((candidate_entry.tasks_in_flight > victim_entry.tasks_in_flight) &&
candidate_addr.worker_id != thief_addr.worker_id)) {
// We copy the candidate's rpc::Address (instead of its rpc::WorkerAddress) because
// objects of type 'ray::rpc::WorkerAddress' cannot be assigned as their copy
// objects of type 'rpc::WorkerAddress' cannot be assigned as their copy
// assignment operator is implicitly deleted
*victim_raw_addr = candidate_addr.ToProto();
}
@ -749,4 +750,5 @@ Status CoreWorkerDirectTaskSubmitter::CancelRemoteTask(const ObjectID &object_id
return Status::OK();
}
}; // namespace ray
} // namespace core
} // namespace ray

View file

@ -32,6 +32,7 @@
#include "ray/rpc/worker/core_worker_client_pool.h"
namespace ray {
namespace core {
typedef std::function<std::shared_ptr<WorkerLeaseInterface>(const std::string &ip_address,
int port)>
@ -359,4 +360,5 @@ class CoreWorkerDirectTaskSubmitter {
int64_t num_leases_requested_ GUARDED_BY(mu_) = 0;
};
}; // namespace ray
} // namespace core
} // namespace ray

View file

@ -38,10 +38,10 @@ namespace rpc {
server_call_factories->emplace_back(std::move(HANDLER##_call_factory));
// Define a void RPC client method.
#define DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(METHOD) \
virtual void Handle##METHOD(const rpc::METHOD##Request &request, \
rpc::METHOD##Reply *reply, \
rpc::SendReplyCallback send_reply_callback) = 0;
#define DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(METHOD) \
virtual void Handle##METHOD(const ::ray::rpc::METHOD##Request &request, \
::ray::rpc::METHOD##Reply *reply, \
::ray::rpc::SendReplyCallback send_reply_callback) = 0;
class GrpcService;

View file

@ -18,6 +18,9 @@
namespace ray {
namespace streaming {
using ray::core::CoreWorkerProcess;
using ray::core::TaskOptions;
void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffer> buffer,
RayFunction &function, int return_num,
std::vector<ObjectID> &return_ids) {

View file

@ -22,6 +22,8 @@
namespace ray {
namespace streaming {
using ray::core::RayFunction;
/// Send buffer internal
/// \param[in] buffer buffer to be sent.
/// \param[in] function the function descriptor of peer's function.
@ -32,4 +34,4 @@ void SendInternal(const ActorID &peer_actor_id, std::shared_ptr<LocalMemoryBuffe
std::vector<ObjectID> &return_ids);
} // namespace streaming
} // namespace ray
} // namespace ray

View file

@ -10,6 +10,8 @@
namespace ray {
namespace streaming {
using namespace ray::core;
enum class TransferCreationStatus : uint32_t {
FreshStarted = 0,
PullOk = 1,
@ -27,8 +29,8 @@ struct StreamingQueueInfo {
struct ChannelCreationParameter {
ActorID actor_id;
std::shared_ptr<ray::RayFunction> async_function;
std::shared_ptr<ray::RayFunction> sync_function;
std::shared_ptr<RayFunction> async_function;
std::shared_ptr<RayFunction> sync_function;
};
/// PrducerChannelinfo and ConsumerChannelInfo contains channel information and

View file

@ -114,8 +114,8 @@ void JavaStringListToNativeStringVector(JNIEnv *env, jobject java_list,
});
}
std::shared_ptr<ray::RayFunction> FunctionDescriptorToRayFunction(
JNIEnv *env, jobject functionDescriptor) {
std::shared_ptr<RayFunction> FunctionDescriptorToRayFunction(JNIEnv *env,
jobject functionDescriptor) {
jclass java_language_class = LoadClass(env, "io/ray/runtime/generated/Common$Language");
jclass java_function_descriptor_class =
LoadClass(env, "io/ray/runtime/functionmanager/FunctionDescriptor");
@ -136,8 +136,8 @@ std::shared_ptr<ray::RayFunction> FunctionDescriptorToRayFunction(
&function_descriptor_list);
ray::FunctionDescriptor function_descriptor =
ray::FunctionDescriptorBuilder::FromVector(language, function_descriptor_list);
ray::RayFunction ray_function(language, function_descriptor);
return std::make_shared<ray::RayFunction>(ray_function);
RayFunction ray_function(language, function_descriptor);
return std::make_shared<RayFunction>(ray_function);
}
void ParseChannelInitParameters(

View file

@ -8,6 +8,8 @@
#include "ray/core_worker/common.h"
#include "util/streaming_logging.h"
using namespace ray::core;
class UniqueIdFromJByteArray {
private:
JNIEnv *_env;
@ -100,8 +102,8 @@ jint throwRuntimeException(JNIEnv *env, const char *message);
jint throwChannelInitException(JNIEnv *env, const char *message,
const std::vector<ray::ObjectID> &abnormal_queues);
jint throwChannelInterruptException(JNIEnv *env, const char *message);
std::shared_ptr<ray::RayFunction> FunctionDescriptorToRayFunction(
JNIEnv *env, jobject functionDescriptor);
std::shared_ptr<RayFunction> FunctionDescriptorToRayFunction(JNIEnv *env,
jobject functionDescriptor);
void ParseChannelInitParameters(
JNIEnv *env, jobject param_obj,
std::vector<ray::streaming::ChannelCreationParameter> &parameter_vec);

View file

@ -11,6 +11,8 @@
namespace ray {
namespace streaming {
using namespace ray::core;
enum class StreamingQueueStatus : uint32_t {
OK = 0,
Timeout = 1,

View file

@ -7,6 +7,8 @@
namespace ray {
namespace streaming {
using namespace ray::core;
/// Transport is the transfer endpoint to a specific actor, buffers can be sent to peer
/// through direct actor call.
class Transport {

View file

@ -7,6 +7,8 @@
namespace ray {
namespace streaming {
using namespace ray::core;
ray::ObjectID RandomObjectID() { return ObjectID::FromRandom(); }
static void flushall_redis(void) {