[core worker] Refactor CoreWorker member classes (#5062)

* Move store client mutex inside CoreWorkerPlasmaStoreProvider

* Move PlasmaClient inside CoreWorkerStoreProvider

* Remove CoreWorkerObjectInterface's ref to CoreWorker

* Remove WorkerLanguage

* Remove CoreWorkerTaskInterface's ref to CoreWorker

* Remove CoreWorkerTaskExecutionInterface's ref to CoreWorker

* lint

* move comment

* Fix build

* Fix build
This commit is contained in:
Stephanie Wang 2019-07-02 15:30:30 -07:00 committed by GitHub
parent 1cf7728f35
commit 71d4637b75
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 106 additions and 166 deletions

View file

@ -5,7 +5,7 @@
#include "ray/common/buffer.h"
#include "ray/common/id.h"
#include "ray/protobuf/gcs.pb.h"
#include "ray/gcs/format/gcs_generated.h"
#include "ray/raylet/raylet_client.h"
#include "ray/raylet/task_spec.h"
@ -17,7 +17,7 @@ enum class WorkerType { WORKER, DRIVER };
/// Information about a remote function.
struct RayFunction {
/// Language of the remote function.
const ray::rpc::Language language;
const Language language;
/// Function descriptor of the remote function.
const std::vector<std::string> function_descriptor;
};
@ -107,48 +107,6 @@ enum class StoreProviderType { PLASMA };
enum class TaskTransportType { RAYLET };
/// Translate from ray::rpc::Language to Language type (required by raylet client).
///
/// \param[in] language Language for a task.
/// \return Translated task language.
inline ::Language ToRayletTaskLanguage(ray::rpc::Language language) {
switch (language) {
case ray::rpc::Language::JAVA:
return ::Language::JAVA;
break;
case ray::rpc::Language::PYTHON:
return ::Language::PYTHON;
break;
case ray::rpc::Language::CPP:
return ::Language::CPP;
break;
default:
RAY_LOG(FATAL) << "Invalid language specified: " << static_cast<int>(language);
break;
}
}
/// Translate from Language to ray::rpc::Language type (required by core worker).
///
/// \param[in] language Language for a task.
/// \return Translated task language.
inline ray::rpc::Language ToRpcTaskLanguage(::Language language) {
switch (language) {
case Language::JAVA:
return ray::rpc::Language::JAVA;
break;
case Language::PYTHON:
return ray::rpc::Language::PYTHON;
break;
case Language::CPP:
return ray::rpc::Language::CPP;
break;
default:
RAY_LOG(FATAL) << "Invalid language specified: " << static_cast<int>(language);
break;
}
}
} // namespace ray
#endif // RAY_CORE_WORKER_COMMON_H

View file

@ -3,31 +3,22 @@
namespace ray {
CoreWorker::CoreWorker(const enum WorkerType worker_type,
const ray::rpc::Language language, const std::string &store_socket,
const std::string &raylet_socket, const JobID &job_id)
CoreWorker::CoreWorker(const enum WorkerType worker_type, const ::Language language,
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id)
: worker_type_(worker_type),
language_(language),
store_socket_(store_socket),
raylet_socket_(raylet_socket),
worker_context_(worker_type, job_id),
// TODO(zhijunfu): currently RayletClient would crash in its constructor
// if it cannot connect to Raylet after a number of retries, this needs
// to be changed so that the worker (java/python .etc) can retrieve and
// handle the error instead of crashing.
raylet_client_(raylet_socket_,
ClientID::FromBinary(worker_context_.GetWorkerID().Binary()),
(worker_type_ == ray::WorkerType::WORKER),
worker_context_.GetCurrentJobID(), ToRayletTaskLanguage(language_)),
task_interface_(*this),
object_interface_(*this),
task_execution_interface_(*this) {
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this needs to be changed
// so that the worker (java/python .etc) can retrieve and handle the error
// instead of crashing.
auto status = store_client_.Connect(store_socket_);
if (!status.ok()) {
RAY_LOG(ERROR) << "Connecting plasma store failed when trying to construct"
<< " core worker: " << status.message();
throw std::runtime_error(status.message());
}
}
worker_context_.GetCurrentJobID(), language_),
task_interface_(worker_context_, raylet_client_),
object_interface_(worker_context_, raylet_client_, store_socket),
task_execution_interface_(worker_context_, raylet_client_, object_interface_) {}
} // namespace ray

View file

@ -7,6 +7,7 @@
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/task_execution.h"
#include "ray/core_worker/task_interface.h"
#include "ray/gcs/format/gcs_generated.h"
#include "ray/raylet/raylet_client.h"
namespace ray {
@ -22,7 +23,7 @@ class CoreWorker {
/// \param[in] langauge Language of this worker.
///
/// NOTE(zhijunfu): the constructor would throw if a failure happens.
CoreWorker(const WorkerType worker_type, const ray::rpc::Language language,
CoreWorker(const WorkerType worker_type, const ::Language language,
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id = JobID::Nil());
@ -30,7 +31,7 @@ class CoreWorker {
enum WorkerType WorkerType() const { return worker_type_; }
/// Language of this worker.
ray::rpc::Language Language() const { return language_; }
::Language Language() const { return language_; }
/// Return the `CoreWorkerTaskInterface` that contains the methods related to task
/// submisson.
@ -49,10 +50,7 @@ class CoreWorker {
const enum WorkerType worker_type_;
/// Language of this worker.
const ray::rpc::Language language_;
/// Plasma store socket name.
const std::string store_socket_;
const ::Language language_;
/// raylet socket name.
const std::string raylet_socket_;
@ -60,12 +58,6 @@ class CoreWorker {
/// Worker context.
WorkerContext worker_context_;
/// Plasma store client.
plasma::PlasmaClient store_client_;
/// Mutex to protect store_client_.
std::mutex store_client_mutex_;
/// Raylet client.
RayletClient raylet_client_;
@ -77,10 +69,6 @@ class CoreWorker {
/// The `CoreWorkerTaskExecutionInterface` instance.
CoreWorkerTaskExecutionInterface task_execution_interface_;
friend class CoreWorkerTaskInterface;
friend class CoreWorkerObjectInterface;
friend class CoreWorkerTaskExecutionInterface;
};
} // namespace ray

View file

@ -124,9 +124,8 @@ class CoreWorkerTest : public ::testing::Test {
void TearDown() {}
void TestNormalTask(const std::unordered_map<std::string, double> &resources) {
CoreWorker driver(WorkerType::DRIVER, ray::rpc::Language::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
JobID::FromRandom());
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], JobID::FromRandom());
// Test pass by value.
{
@ -134,7 +133,7 @@ class CoreWorkerTest : public ::testing::Test {
auto buffer1 = std::make_shared<LocalMemoryBuffer>(array1, sizeof(array1));
RayFunction func{ray::rpc::Language::PYTHON, {}};
RayFunction func{Language::PYTHON, {}};
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(buffer1));
@ -165,7 +164,7 @@ class CoreWorkerTest : public ::testing::Test {
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByReference(object_id));
RayFunction func{ray::rpc::Language::PYTHON, {}};
RayFunction func{Language::PYTHON, {}};
TaskOptions options;
std::vector<ObjectID> return_ids;
@ -184,9 +183,8 @@ class CoreWorkerTest : public ::testing::Test {
}
void TestActorTask(const std::unordered_map<std::string, double> &resources) {
CoreWorker driver(WorkerType::DRIVER, ray::rpc::Language::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
JobID::FromRandom());
CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], JobID::FromRandom());
std::unique_ptr<ActorHandle> actor_handle;
@ -195,7 +193,7 @@ class CoreWorkerTest : public ::testing::Test {
uint8_t array[] = {1, 2, 3};
auto buffer = std::make_shared<LocalMemoryBuffer>(array, sizeof(array));
RayFunction func{ray::rpc::Language::PYTHON, {}};
RayFunction func{Language::PYTHON, {}};
std::vector<TaskArg> args;
args.emplace_back(TaskArg::PassByValue(buffer));
@ -223,7 +221,7 @@ class CoreWorkerTest : public ::testing::Test {
TaskOptions options{1, resources};
std::vector<ObjectID> return_ids;
RayFunction func{ray::rpc::Language::PYTHON, {}};
RayFunction func{Language::PYTHON, {}};
RAY_CHECK_OK(driver.Tasks().SubmitActorTask(*actor_handle, func, args, options,
&return_ids));
RAY_CHECK(return_ids.size() == 1);
@ -304,7 +302,7 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
TEST_F(ZeroNodeTest, TestActorHandle) {
ActorHandle handle1(ActorID::FromRandom(), ActorHandleID::FromRandom(),
ray::rpc::Language::JAVA,
::Language::JAVA,
{"org.ray.exampleClass", "exampleMethod", "exampleSignature"});
auto forkedHandle1 = handle1.Fork();
@ -336,7 +334,7 @@ TEST_F(ZeroNodeTest, TestActorHandle) {
}
TEST_F(SingleNodeTest, TestObjectInterface) {
CoreWorker core_worker(WorkerType::DRIVER, ray::rpc::Language::PYTHON,
CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
JobID::FromRandom());
@ -400,13 +398,11 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
}
TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) {
CoreWorker worker1(WorkerType::DRIVER, ray::rpc::Language::PYTHON,
raylet_store_socket_names_[0], raylet_socket_names_[0],
JobID::FromRandom());
CoreWorker worker1(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0],
raylet_socket_names_[0], JobID::FromRandom());
CoreWorker worker2(WorkerType::DRIVER, ray::rpc::Language::PYTHON,
raylet_store_socket_names_[1], raylet_socket_names_[1],
JobID::FromRandom());
CoreWorker worker2(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[1],
raylet_socket_names_[1], JobID::FromRandom());
uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8};
uint8_t array2[] = {10, 11, 12, 13, 14, 15};
@ -491,7 +487,7 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodes) {
TEST_F(SingleNodeTest, TestCoreWorkerConstructorFailure) {
try {
CoreWorker core_worker(WorkerType::DRIVER, ray::rpc::Language::PYTHON, "",
CoreWorker core_worker(WorkerType::DRIVER, Language::PYTHON, "",
raylet_socket_names_[0], JobID::FromRandom());
} catch (const std::exception &e) {
std::cout << "Caught exception when constructing core worker: " << e.what();

View file

@ -17,8 +17,8 @@ namespace ray {
class MockWorker {
public:
MockWorker(const std::string &store_socket, const std::string &raylet_socket)
: worker_(WorkerType::WORKER, ray::rpc::Language::PYTHON, store_socket,
raylet_socket, JobID::FromRandom()) {}
: worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket,
JobID::FromRandom()) {}
void Run() {
auto executor_func = [this](const RayFunction &ray_function,

View file

@ -1,23 +1,22 @@
#include "ray/core_worker/object_interface.h"
#include "ray/common/ray_config.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/store_provider/plasma_store_provider.h"
namespace ray {
CoreWorkerObjectInterface::CoreWorkerObjectInterface(CoreWorker &core_worker)
: core_worker_(core_worker) {
CoreWorkerObjectInterface::CoreWorkerObjectInterface(WorkerContext &worker_context,
RayletClient &raylet_client,
const std::string &store_socket)
: worker_context_(worker_context), raylet_client_(raylet_client) {
store_providers_.emplace(
static_cast<int>(StoreProviderType::PLASMA),
std::unique_ptr<CoreWorkerStoreProvider>(new CoreWorkerPlasmaStoreProvider(
core_worker_.store_client_, core_worker_.store_client_mutex_,
core_worker_.raylet_client_)));
std::unique_ptr<CoreWorkerStoreProvider>(
new CoreWorkerPlasmaStoreProvider(store_socket, raylet_client_)));
}
Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) {
ObjectID put_id = ObjectID::ForPut(core_worker_.worker_context_.GetCurrentTaskID(),
core_worker_.worker_context_.GetNextPutIndex());
ObjectID put_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(),
worker_context_.GetNextPutIndex());
*object_id = put_id;
return Put(object, put_id);
}
@ -32,8 +31,8 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results) {
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Get(
ids, timeout_ms, core_worker_.worker_context_.GetCurrentTaskID(), results);
return store_providers_[type]->Get(ids, timeout_ms, worker_context_.GetCurrentTaskID(),
results);
}
Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &object_ids,
@ -41,8 +40,7 @@ Status CoreWorkerObjectInterface::Wait(const std::vector<ObjectID> &object_ids,
std::vector<bool> *results) {
auto type = static_cast<int>(StoreProviderType::PLASMA);
return store_providers_[type]->Wait(object_ids, num_objects, timeout_ms,
core_worker_.worker_context_.GetCurrentTaskID(),
results);
worker_context_.GetCurrentTaskID(), results);
}
Status CoreWorkerObjectInterface::Delete(const std::vector<ObjectID> &object_ids,

View file

@ -6,6 +6,7 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/store_provider/store_provider.h"
namespace ray {
@ -16,7 +17,8 @@ class CoreWorkerStoreProvider;
/// The interface that contains all `CoreWorker` methods that are related to object store.
class CoreWorkerObjectInterface {
public:
CoreWorkerObjectInterface(CoreWorker &core_worker);
CoreWorkerObjectInterface(WorkerContext &worker_context, RayletClient &raylet_client,
const std::string &store_socket);
/// Put an object into object store.
///
@ -62,8 +64,10 @@ class CoreWorkerObjectInterface {
bool delete_creating_tasks);
private:
/// Reference to the parent CoreWorker instance.
CoreWorker &core_worker_;
/// Reference to the parent CoreWorker's context.
WorkerContext &worker_context_;
/// Reference to the parent CoreWorker's raylet client.
RayletClient &raylet_client_;
/// All the store providers supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerStoreProvider>> store_providers_;

View file

@ -7,11 +7,15 @@
namespace ray {
CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
plasma::PlasmaClient &store_client, std::mutex &store_client_mutex,
RayletClient &raylet_client)
: store_client_(store_client),
store_client_mutex_(store_client_mutex),
raylet_client_(raylet_client) {}
const std::string &store_socket, RayletClient &raylet_client)
: raylet_client_(raylet_client) {
auto status = store_client_.Connect(store_socket);
if (!status.ok()) {
RAY_LOG(ERROR) << "Connecting plasma store failed when trying to construct"
<< " core worker: " << status.message();
throw std::runtime_error(status.message());
}
}
Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object,
const ObjectID &object_id) {

View file

@ -17,8 +17,7 @@ class CoreWorker;
/// local and remote store, remote access is done via raylet.
class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
public:
CoreWorkerPlasmaStoreProvider(plasma::PlasmaClient &store_client,
std::mutex &store_client_mutex,
CoreWorkerPlasmaStoreProvider(const std::string &store_socket,
RayletClient &raylet_client);
/// Put an object with specified ID into object store.
@ -62,10 +61,10 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
private:
/// Plasma store client.
plasma::PlasmaClient &store_client_;
plasma::PlasmaClient store_client_;
/// Mutex to protect store_client_.
std::mutex &store_client_mutex_;
std::mutex store_client_mutex_;
/// Raylet client.
RayletClient &raylet_client_;

View file

@ -6,12 +6,12 @@
namespace ray {
CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(
CoreWorker &core_worker)
: core_worker_(core_worker) {
task_receivers.emplace(
static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskReceiver>(
new CoreWorkerRayletTaskReceiver(core_worker_.raylet_client_)));
WorkerContext &worker_context, RayletClient &raylet_client,
CoreWorkerObjectInterface &object_interface)
: worker_context_(worker_context), object_interface_(object_interface) {
task_receivers.emplace(static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskReceiver>(
new CoreWorkerRayletTaskReceiver(raylet_client)));
}
Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
@ -27,10 +27,9 @@ Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
for (const auto &task : tasks) {
const auto &spec = task.GetTaskSpecification();
core_worker_.worker_context_.SetCurrentTask(spec);
worker_context_.SetCurrentTask(spec);
ray::rpc::Language language = ToRpcTaskLanguage(spec.GetLanguage());
RayFunction func{language, spec.FunctionDescriptor()};
RayFunction func{spec.GetLanguage(), spec.FunctionDescriptor()};
std::vector<std::shared_ptr<RayObject>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));
@ -90,7 +89,7 @@ Status CoreWorkerTaskExecutionInterface::BuildArgsForExecutor(
}
std::vector<std::shared_ptr<RayObject>> results;
auto status = core_worker_.object_interface_.Get(object_ids_to_fetch, -1, &results);
auto status = object_interface_.Get(object_ids_to_fetch, -1, &results);
if (status.ok()) {
for (size_t i = 0; i < results.size(); i++) {
(*args)[indices[i]] = results[i];

View file

@ -4,7 +4,8 @@
#include "ray/common/buffer.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/store_provider/store_provider.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/transport/transport.h"
namespace ray {
@ -19,7 +20,10 @@ class TaskSpecification;
/// execution.
class CoreWorkerTaskExecutionInterface {
public:
CoreWorkerTaskExecutionInterface(CoreWorker &core_worker);
CoreWorkerTaskExecutionInterface(WorkerContext &worker_context,
RayletClient &raylet_client,
CoreWorkerObjectInterface &object_interface);
/// The callback provided app-language workers that executes tasks.
///
/// \param ray_function[in] Information about the function to execute.
@ -46,8 +50,10 @@ class CoreWorkerTaskExecutionInterface {
Status BuildArgsForExecutor(const raylet::TaskSpecification &spec,
std::vector<std::shared_ptr<RayObject>> *args);
/// Reference to the parent CoreWorker instance.
CoreWorker &core_worker_;
/// Reference to the parent CoreWorker's context.
WorkerContext &worker_context_;
/// Reference to the parent CoreWorker's objects interface.
CoreWorkerObjectInterface &object_interface_;
/// All the task task receivers supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerTaskReceiver>> task_receivers;

View file

@ -8,7 +8,7 @@ namespace ray {
ActorHandle::ActorHandle(
const class ActorID &actor_id, const class ActorHandleID &actor_handle_id,
const ray::rpc::Language actor_language,
const ::Language actor_language,
const std::vector<std::string> &actor_creation_task_function_descriptor) {
inner_.set_actor_id(actor_id.Data(), actor_id.Size());
inner_.set_actor_handle_id(actor_handle_id.Data(), actor_handle_id.Size());
@ -30,8 +30,8 @@ ray::ActorHandleID ActorHandle::ActorHandleID() const {
return ActorHandleID::FromBinary(inner_.actor_handle_id());
};
ray::rpc::Language ActorHandle::ActorLanguage() const {
return (ray::rpc::Language)inner_.actor_language();
::Language ActorHandle::ActorLanguage() const {
return (::Language)inner_.actor_language();
};
std::vector<std::string> ActorHandle::ActorCreationTaskFunctionDescriptor() const {
@ -92,19 +92,19 @@ std::vector<ray::ActorHandleID> ActorHandle::NewActorHandles() const {
void ActorHandle::ClearNewActorHandles() { new_actor_handles_.clear(); }
CoreWorkerTaskInterface::CoreWorkerTaskInterface(CoreWorker &core_worker)
: core_worker_(core_worker) {
task_submitters_.emplace(
static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskSubmitter>(
new CoreWorkerRayletTaskSubmitter(core_worker_.raylet_client_)));
CoreWorkerTaskInterface::CoreWorkerTaskInterface(WorkerContext &worker_context,
RayletClient &raylet_client)
: worker_context_(worker_context) {
task_submitters_.emplace(static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskSubmitter>(
new CoreWorkerRayletTaskSubmitter(raylet_client)));
}
Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
auto &context = core_worker_.worker_context_;
auto &context = worker_context_;
auto next_task_index = context.GetNextTaskIndex();
const auto task_id = GenerateTaskId(context.GetCurrentJobID(),
context.GetCurrentTaskID(), next_task_index);
@ -116,12 +116,11 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function,
}
auto task_arguments = BuildTaskArguments(args);
auto language = ToRayletTaskLanguage(function.language);
ray::raylet::TaskSpecification spec(context.GetCurrentJobID(),
context.GetCurrentTaskID(), next_task_index,
task_arguments, num_returns, task_options.resources,
language, function.function_descriptor);
function.language, function.function_descriptor);
std::vector<ObjectID> execution_dependencies;
TaskSpec task(std::move(spec), execution_dependencies);
@ -132,7 +131,7 @@ Status CoreWorkerTaskInterface::CreateActor(
const RayFunction &function, const std::vector<TaskArg> &args,
const ActorCreationOptions &actor_creation_options,
std::unique_ptr<ActorHandle> *actor_handle) {
auto &context = core_worker_.worker_context_;
auto &context = worker_context_;
auto next_task_index = context.GetNextTaskIndex();
const auto task_id = GenerateTaskId(context.GetCurrentJobID(),
context.GetCurrentTaskID(), next_task_index);
@ -147,7 +146,6 @@ Status CoreWorkerTaskInterface::CreateActor(
(*actor_handle)->SetActorCursor(return_ids[0]);
auto task_arguments = BuildTaskArguments(args);
auto language = ToRayletTaskLanguage(function.language);
// Note that the caller is supposed to specify required placement resources
// correctly via actor_creation_options.resources.
@ -155,8 +153,8 @@ Status CoreWorkerTaskInterface::CreateActor(
context.GetCurrentJobID(), context.GetCurrentTaskID(), next_task_index,
actor_creation_id, ObjectID::Nil(), actor_creation_options.max_reconstructions,
ActorID::Nil(), ActorHandleID::Nil(), 0, {}, task_arguments, 1,
actor_creation_options.resources, actor_creation_options.resources, language,
function.function_descriptor);
actor_creation_options.resources, actor_creation_options.resources,
function.language, function.function_descriptor);
std::vector<ObjectID> execution_dependencies;
TaskSpec task(std::move(spec), execution_dependencies);
@ -168,7 +166,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
auto &context = core_worker_.worker_context_;
auto &context = worker_context_;
auto next_task_index = context.GetNextTaskIndex();
const auto task_id = GenerateTaskId(context.GetCurrentJobID(),
context.GetCurrentTaskID(), next_task_index);
@ -184,7 +182,6 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
ObjectID::FromBinary(actor_handle.ActorID().Binary());
auto task_arguments = BuildTaskArguments(args);
auto language = ToRayletTaskLanguage(function.language);
std::unique_lock<std::mutex> guard(actor_handle.mutex_);
@ -193,7 +190,7 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle,
ActorID::Nil(), actor_creation_dummy_object_id, 0, actor_handle.ActorID(),
actor_handle.ActorHandleID(), actor_handle.IncreaseTaskCounter(),
actor_handle.NewActorHandles(), task_arguments, num_returns, task_options.resources,
task_options.resources, language, function.function_descriptor);
task_options.resources, function.language, function.function_descriptor);
std::vector<ObjectID> execution_dependencies;
execution_dependencies.push_back(actor_handle.ActorCursor());

View file

@ -5,6 +5,7 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/transport/transport.h"
#include "ray/protobuf/core_worker.pb.h"
#include "ray/raylet/task.h"
@ -44,7 +45,7 @@ struct ActorCreationOptions {
class ActorHandle {
public:
ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id,
const ray::rpc::Language actor_language,
const ::Language actor_language,
const std::vector<std::string> &actor_creation_task_function_descriptor);
ActorHandle(const ActorHandle &other);
@ -56,7 +57,7 @@ class ActorHandle {
ray::ActorHandleID ActorHandleID() const;
/// Language of the actor.
ray::rpc::Language ActorLanguage() const;
::Language ActorLanguage() const;
// Function descriptor of actor creation task.
std::vector<std::string> ActorCreationTaskFunctionDescriptor() const;
@ -110,7 +111,7 @@ class ActorHandle {
/// submission.
class CoreWorkerTaskInterface {
public:
CoreWorkerTaskInterface(CoreWorker &core_worker);
CoreWorkerTaskInterface(WorkerContext &worker_context, RayletClient &raylet_client);
/// Submit a normal task.
///
@ -146,10 +147,6 @@ class CoreWorkerTaskInterface {
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
private:
/// Reference to the parent CoreWorker instance.
CoreWorker &core_worker_;
private:
/// Build the arguments for a task spec.
///
@ -158,6 +155,9 @@ class CoreWorkerTaskInterface {
std::vector<std::shared_ptr<raylet::TaskArgument>> BuildTaskArguments(
const std::vector<TaskArg> &args);
/// Reference to the parent CoreWorker's context.
WorkerContext &worker_context_;
/// All the task submitters supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerTaskSubmitter>> task_submitters_;
};