Move actor task submission to io service (#9093)

This commit is contained in:
Simon Mo 2020-06-23 10:07:33 -07:00 committed by GitHub
parent 306ca75737
commit b6d425526d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 31 additions and 37 deletions

View file

@ -109,7 +109,8 @@ include "includes/serialization.pxi"
include "includes/libcoreworker.pxi"
include "includes/global_state_accessor.pxi"
# Expose GCC & Clang macro to report whether C++ optimizations were enabled during compilation.
# Expose GCC & Clang macro to report whether C++ optimizations were enabled
# during compilation.
OPTIMIZED = __OPTIMIZE__
logger = logging.getLogger(__name__)
@ -953,11 +954,10 @@ cdef class CoreWorker:
prepare_args(self, language, args, &args_vector)
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
c_actor_id,
ray_function,
args_vector, task_options, &return_ids))
CCoreWorkerProcess.GetCoreWorker().SubmitActorTask(
c_actor_id,
ray_function,
args_vector, task_options, &return_ids)
return VectorToObjectIDs(return_ids)

View file

@ -89,7 +89,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options,
const c_string &extension_data, CActorID *actor_id)
CRayStatus SubmitActorTask(
void SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
c_vector[CObjectID] *return_ids)

View file

@ -1171,12 +1171,12 @@ Status CoreWorker::CreateActor(const RayFunction &function,
return status;
}
Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
ActorHandle *actor_handle = nullptr;
RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle));
RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle));
// Add one for actor cursor object id for tasks.
const int num_returns = task_options.num_returns + 1;
@ -1199,16 +1199,16 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
return_ids->pop_back();
// Submit task.
Status status;
TaskSpecification task_spec = builder.Build();
if (options_.is_local_mode) {
ExecuteTaskLocalMode(task_spec, actor_id);
} else {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite(), actor_handle->MaxTaskRetries());
status = direct_actor_submitter_->SubmitTask(task_spec);
io_service_.post([this, task_spec]() {
RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec));
});
}
return status;
}
Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) {

View file

@ -579,10 +579,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \return Status error if the task is invalid or if the task submission
/// failed. Tasks can be invalid for direct actor calls because not all tasks
/// are currently supported.
Status SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
void SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<TaskArg> &args, const TaskOptions &task_options,
std::vector<ObjectID> *return_ids);
/// Tell an actor to exit immediately, without completing outstanding work.
///

View file

@ -170,10 +170,9 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
auto task_options = ToTaskOptions(env, numReturns, callOptions);
std::vector<ObjectID> return_ids;
auto status = ray::CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
ray::CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
actor_id, ray_function, task_args, task_options, &return_ids);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
return NativeIdVectorToJavaByteArrayList(env, return_ids);
}

View file

@ -225,8 +225,8 @@ int CoreWorkerTest::GetActorPid(const ActorID &actor_id,
RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"GetWorkerPid", "", "", "")};
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args,
options, &return_ids));
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(actor_id, func, args, options,
&return_ids);
std::vector<std::shared_ptr<ray::RayObject>> results;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().Get(return_ids, -1, &results));
@ -306,7 +306,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
ASSERT_TRUE(return_ids[0].IsReturnObject());
@ -348,8 +348,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
std::vector<ObjectID> return_ids;
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
auto status = driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_TRUE(status.ok());
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
@ -412,7 +411,7 @@ void CoreWorkerTest::TestActorRestart(
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
// Verify if it's expected data.
std::vector<std::shared_ptr<RayObject>> results;
@ -455,7 +454,7 @@ void CoreWorkerTest::TestActorFailure(
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
all_results.emplace_back(std::make_pair(return_ids[0], buffer1));
@ -606,7 +605,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"MergeInputArgsAsOutput", "", "", ""));
RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
ASSERT_EQ(return_ids.size(), 1);
object_ids.emplace_back(return_ids[0]);
}

View file

@ -29,11 +29,8 @@ void Transport::SendInternal(std::shared_ptr<LocalMemoryBuffer> buffer,
std::move(buffer), meta, std::vector<ObjectID>(), true)));
std::vector<std::shared_ptr<RayObject>> results;
ray::Status st = CoreWorkerProcess::GetCoreWorker().SubmitActorTask(
peer_actor_id_, function, args, options, &return_ids);
if (!st.ok()) {
STREAMING_LOG(ERROR) << "SubmitActorTask failed. " << st;
}
CoreWorkerProcess::GetCoreWorker().SubmitActorTask(peer_actor_id_, function, args,
options, &return_ids);
}
void Transport::Send(std::shared_ptr<LocalMemoryBuffer> buffer) {

View file

@ -91,7 +91,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
RayFunction func{ray::Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "init", "")};
RAY_CHECK_OK(driver.SubmitActorTask(self_actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(self_actor_id, func, args, options, &return_ids);
}
void SubmitTestToActor(ActorID &actor_id, const std::string test) {
@ -107,7 +107,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", test, "execute_test", "")};
RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
}
bool CheckCurTest(ActorID &actor_id, const std::string test_name) {
@ -123,7 +123,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"", "", "check_current_test_status", "")};
RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids));
driver.SubmitActorTask(actor_id, func, args, options, &return_ids);
std::vector<bool> wait_results;
std::vector<std::shared_ptr<RayObject>> results;