Remove is_direct logic from the raylet (#7698)

This commit is contained in:
Edward Oakes 2020-03-23 17:09:35 -05:00 committed by GitHub
parent 3fa2e4a346
commit 9318b29f5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 49 additions and 103 deletions

View file

@ -600,7 +600,7 @@ class NodeStats(threading.Thread):
"jobId": ray.utils.binary_to_hex(
actor_data.job_id),
"state": actor_data.state,
"isDirectCall": actor_data.is_direct_call,
"isDirectCall": True,
"timestamp": actor_data.timestamp
}
else:

View file

@ -333,7 +333,7 @@ class GlobalState:
"IPAddress": actor_table_data.owner_address.ip_address,
"Port": actor_table_data.owner_address.port
},
"IsDirectCall": actor_table_data.is_direct_call,
"IsDirectCall": True,
"State": actor_table_data.state,
"Timestamp": actor_table_data.timestamp,
}

View file

@ -221,16 +221,6 @@ ObjectID TaskSpecification::ActorDummyObject() const {
return ReturnId(NumReturns() - 1, TaskTransportType::RAYLET);
}
bool TaskSpecification::IsDirectCall() const { return message_->is_direct_call(); }
bool TaskSpecification::IsDirectActorCreationCall() const {
if (IsActorCreationTask()) {
return message_->actor_creation_task_spec().is_direct_call();
} else {
return false;
}
}
int TaskSpecification::MaxActorConcurrency() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().max_concurrency();
@ -261,7 +251,6 @@ std::string TaskSpecification::DebugString() const {
// Print actor creation task spec.
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId()
<< ", max_reconstructions=" << MaxActorReconstructions()
<< ", is_direct_call=" << IsDirectCall()
<< ", max_concurrency=" << MaxActorConcurrency()
<< ", is_asyncio_actor=" << IsAsyncioActor()
<< ", is_detached=" << IsDetachedActor() << "}";

View file

@ -159,8 +159,6 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
bool IsDirectCall() const;
bool IsDirectActorCreationCall() const;
int MaxActorConcurrency() const;
bool IsAsyncioActor() const;

View file

@ -27,7 +27,7 @@ class TaskSpecBuilder {
const TaskID &task_id, const Language &language,
const ray::FunctionDescriptor &function_descriptor, const JobID &job_id,
const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id,
const rpc::Address &caller_address, uint64_t num_returns, bool is_direct_call,
const rpc::Address &caller_address, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources) {
message_->set_type(TaskType::NORMAL_TASK);
@ -40,7 +40,6 @@ class TaskSpecBuilder {
message_->set_caller_id(caller_id.Binary());
message_->mutable_caller_address()->CopyFrom(caller_address);
message_->set_num_returns(num_returns);
message_->set_is_direct_call(is_direct_call);
message_->mutable_required_resources()->insert(required_resources.begin(),
required_resources.end());
message_->mutable_required_placement_resources()->insert(
@ -65,7 +64,6 @@ class TaskSpecBuilder {
message_->set_caller_id(caller_id.Binary());
message_->mutable_caller_address()->CopyFrom(caller_address);
message_->set_num_returns(0);
message_->set_is_direct_call(false);
return *this;
}
@ -105,8 +103,7 @@ class TaskSpecBuilder {
TaskSpecBuilder &SetActorCreationTaskSpec(
const ActorID &actor_id, uint64_t max_reconstructions = 0,
const std::vector<std::string> &dynamic_worker_options = {},
bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false,
bool is_asyncio = false) {
int max_concurrency = 1, bool is_detached = false, bool is_asyncio = false) {
message_->set_type(TaskType::ACTOR_CREATION_TASK);
auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
actor_creation_spec->set_actor_id(actor_id.Binary());
@ -114,7 +111,6 @@ class TaskSpecBuilder {
for (const auto &option : dynamic_worker_options) {
actor_creation_spec->add_dynamic_worker_options(option);
}
actor_creation_spec->set_is_direct_call(is_direct_call);
actor_creation_spec->set_max_concurrency(max_concurrency);
actor_creation_spec->set_is_asyncio(is_asyncio);
actor_creation_spec->set_is_detached(is_detached);

View file

@ -103,13 +103,13 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
if (task_spec.IsNormalTask()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
current_task_is_direct_call_ = task_spec.IsDirectCall();
current_task_is_direct_call_ = true;
} else if (task_spec.IsActorCreationTask()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
RAY_CHECK(current_actor_id_.IsNil());
current_actor_id_ = task_spec.ActorCreationId();
current_actor_is_direct_call_ = task_spec.IsDirectActorCreationCall();
current_actor_is_direct_call_ = true;
current_actor_max_concurrency_ = task_spec.MaxActorConcurrency();
current_actor_is_asyncio_ = task_spec.IsAsyncioActor();
} else if (task_spec.IsActorTask()) {

View file

@ -40,8 +40,7 @@ void BuildCommonTaskSpec(
builder.SetCommonTaskSpec(task_id, function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, current_task_id,
task_index, caller_id, address, num_returns,
/*is_direct_transport_type=*/true, required_resources,
required_placement_resources);
required_resources, required_placement_resources);
// Set task arguments.
for (const auto &arg : args) {
if (arg.IsPassedByReference()) {
@ -827,11 +826,11 @@ Status CoreWorker::CreateActor(const RayFunction &function,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, actor_creation_options.resources,
actor_creation_options.placement_resources, &return_ids);
builder.SetActorCreationTaskSpec(
actor_id, actor_creation_options.max_reconstructions,
actor_creation_options.dynamic_worker_options,
/*is_direct_call=*/true, actor_creation_options.max_concurrency,
actor_creation_options.is_detached, actor_creation_options.is_asyncio);
builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions,
actor_creation_options.dynamic_worker_options,
actor_creation_options.max_concurrency,
actor_creation_options.is_detached,
actor_creation_options.is_asyncio);
*return_actor_id = actor_id;
TaskSpecification task_spec = builder.Build();

View file

@ -643,8 +643,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(),
function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0,
RandomTaskId(), address, num_returns, /*is_direct*/ false,
resources, resources);
RandomTaskId(), address, num_returns, resources, resources);
// Set task arguments.
for (const auto &arg : args) {
if (arg.IsPassedByReference()) {

View file

@ -293,7 +293,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map<std::string, double> &r
rpc::Address empty_address;
builder.SetCommonTaskSpec(TaskID::Nil(), Language::PYTHON, function_descriptor,
JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address,
1, true, resources, resources);
1, resources, resources);
return builder.Build();
}

View file

@ -74,7 +74,6 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
// Set the fields that change when the actor is restarted.
actor_info_ptr->set_remaining_reconstructions(remaining_reconstructions);
actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall());
actor_info_ptr->mutable_address()->CopyFrom(address);
actor_info_ptr->mutable_owner_address()->CopyFrom(
task_spec.GetMessage().caller_address());

View file

@ -113,10 +113,8 @@ message TaskSpec {
// Task specification for an actor task.
// This field is only valid when `type == ACTOR_TASK`.
ActorTaskSpec actor_task_spec = 15;
// Whether this task is a direct call task.
bool is_direct_call = 16;
// Number of times this task may be retried on worker failure.
int32 max_retries = 17;
int32 max_retries = 16;
}
// Argument in the task.
@ -146,14 +144,12 @@ message ActorCreationTaskSpec {
// the placeholder strings (`RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_0`,
// `RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_1`, etc) in the worker command.
repeated string dynamic_worker_options = 4;
// Whether direct actor call is used.
bool is_direct_call = 5;
// The max number of concurrent calls for direct call actors.
int32 max_concurrency = 6;
int32 max_concurrency = 5;
// Whether the actor is persistent
bool is_detached = 7;
bool is_detached = 6;
// Whether the actor use async actor calls
bool is_asyncio = 8;
bool is_asyncio = 7;
}
// Task spec of an actor task.

View file

@ -127,12 +127,10 @@ message ActorTableData {
Address address = 9;
// The address of the the actor's owner (parent).
Address owner_address = 10;
// Whether direct actor call is used.
bool is_direct_call = 11;
// Whether the actor is persistent.
bool is_detached = 12;
bool is_detached = 11;
// Timestamp that the actor is created or reconstructed.
double timestamp = 13;
double timestamp = 12;
}
message ErrorTableData {

View file

@ -174,15 +174,13 @@ LineageCache::LineageCache(const ClientID &self_node_id,
/// A helper function to add some uncommitted lineage to the local cache.
void LineageCache::AddUncommittedLineage(const TaskID &task_id,
const Lineage &uncommitted_lineage) {
// TODO(edoakes): remove this method, it's currently only called in unit tests.
RAY_LOG(DEBUG) << "Adding uncommitted task " << task_id << " on " << self_node_id_;
// If the entry is not found in the lineage to merge, then we stop since
// there is nothing to copy into the merged lineage.
auto entry = uncommitted_lineage.GetEntry(task_id);
if (!entry) {
return;
} else if (entry->TaskData().GetTaskSpecification().IsDirectCall()) {
// Disable lineage logging for direct tasks.
return;
}
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED);
@ -200,10 +198,7 @@ void LineageCache::AddUncommittedLineage(const TaskID &task_id,
}
bool LineageCache::CommitTask(const Task &task) {
if (task.GetTaskSpecification().IsDirectCall()) {
// Disable lineage logging for direct tasks.
return true;
}
// TODO(edoakes): remove this method, it's currently only called in unit tests.
const TaskID task_id = task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "Committing task " << task_id << " on " << self_node_id_;

View file

@ -197,7 +197,7 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""),
JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address,
num_returns, false, {}, {});
num_returns, {}, {});
for (const auto &arg : arguments) {
builder.AddByRefArg(arg);
}

View file

@ -1439,18 +1439,8 @@ void NodeManager::ProcessPrepareActorCheckpointRequest(
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
RAY_CHECK(worker && worker->GetActorId() == actor_id);
std::shared_ptr<ActorCheckpointData> checkpoint_data;
if (actor_entry->second.GetTableData().is_direct_call()) {
checkpoint_data =
actor_entry->second.GenerateCheckpointData(actor_entry->first, nullptr);
} else {
// Find the task that is running on this actor.
const auto task_id = worker->GetAssignedTaskId();
const Task &task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING);
// Generate checkpoint data.
checkpoint_data =
actor_entry->second.GenerateCheckpointData(actor_entry->first, &task);
}
std::shared_ptr<ActorCheckpointData> checkpoint_data =
actor_entry->second.GenerateCheckpointData(actor_entry->first, nullptr);
// Write checkpoint data to GCS.
RAY_CHECK_OK(gcs_client_->Actors().AsyncAddCheckpoint(
@ -2005,17 +1995,6 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
return;
}
// Add the task and its uncommitted lineage to the lineage cache.
if (forwarded) {
lineage_cache_.AddUncommittedLineage(task_id, uncommitted_lineage);
} else {
if (!lineage_cache_.CommitTask(task)) {
RAY_LOG(WARNING)
<< "Task " << task_id
<< " already committed to the GCS. This is most likely due to reconstruction.";
}
}
if (spec.IsActorTask()) {
// Check whether we know the location of the actor.
const auto actor_entry = actor_registry_.find(spec.ActorId());
@ -2464,7 +2443,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
if (!spec.IsActorCreationTask() && !spec.IsActorTask()) {
worker.AssignJobId(JobID::Nil());
}
if (!spec.IsDirectActorCreationCall()) {
if (!spec.IsActorCreationTask()) {
// Unset the worker's assigned task. We keep the assigned task ID for
// direct actor creation calls because this ID is used later if the actor
// requires objects from plasma.
@ -2473,7 +2452,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
}
// Direct actors will be assigned tasks via the core worker and therefore are
// not idle.
return !spec.IsDirectActorCreationCall();
return !spec.IsActorCreationTask();
}
std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTask(
@ -2498,7 +2477,6 @@ std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTas
// This is the first time that the actor has been created, so the number
// of remaining reconstructions is the max.
actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_is_direct_call(task_spec.IsDirectActorCreationCall());
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
actor_info_ptr->mutable_owner_address()->CopyFrom(
task_spec.GetMessage().caller_address());
@ -2807,12 +2785,12 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
}
}
bool NodeManager::IsDirectActorCreationTask(const TaskID &task_id) {
bool NodeManager::IsActorCreationTask(const TaskID &task_id) {
auto actor_id = task_id.ActorId();
if (!actor_id.IsNil() && task_id == TaskID::ForActorCreationTask(actor_id)) {
// This task ID corresponds to an actor creation task.
auto iter = actor_registry_.find(actor_id);
if (iter != actor_registry_.end() && iter->second.GetTableData().is_direct_call()) {
if (iter != actor_registry_.end()) {
// This actor is direct call actor.
return true;
}
@ -2854,7 +2832,7 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) {
// So here we check for direct actor creation task explicitly to allow this case.
auto iter = waiting_task_id_set.begin();
while (iter != waiting_task_id_set.end()) {
if (IsDirectActorCreationTask(*iter)) {
if (IsActorCreationTask(*iter)) {
RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter
<< " when handling object missing for " << object_id;
iter = waiting_task_id_set.erase(iter);

View file

@ -617,8 +617,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Repeat the process as long as we can schedule a task.
void NewSchedulerSchedulePendingTasks();
/// Whether a task is an direct actor creation task.
bool IsDirectActorCreationTask(const TaskID &task_id);
/// Whether a task is an actor creation task.
bool IsActorCreationTask(const TaskID &task_id);
/// ID of this node.
ClientID self_node_id_;

View file

@ -337,23 +337,21 @@ void TaskDependencyManager::TaskPending(const Task &task) {
// and eventually assigned to a worker. In this case we need
// the task lease to make sure there's only one raylet can
// resubmit the task.
if (task.GetTaskSpecification().IsDirectCall()) {
// We can use `OnDispatch` to differeniate whether this task is
// a worker lease request.
// For direct actor creation task:
// - when it's submitted by core worker, we guarantee that
// we always request a new worker lease, in that case
// `OnDispatch` is overriden to an actual callback.
// - when it's resubmitted by raylet because of reconstruction,
// `OnDispatch` will not be overriden and thus is nullptr.
if (task.GetTaskSpecification().IsActorCreationTask() &&
task.OnDispatch() == nullptr) {
// This is an actor creation task, and it's being reconstructed,
// in this case we still need the task lease. Note that we don't
// require task lease for direct actor creation task.
} else {
return;
}
//
// We can use `OnDispatch` to differeniate whether this task is
// a worker lease request.
// For direct actor creation task:
// - when it's submitted by core worker, we guarantee that
// we always request a new worker lease, in that case
// `OnDispatch` is overriden to an actual callback.
// - when it's resubmitted by raylet because of reconstruction,
// `OnDispatch` will not be overriden and thus is nullptr.
if (task.GetTaskSpecification().IsActorCreationTask() && task.OnDispatch() == nullptr) {
// This is an actor creation task, and it's being reconstructed,
// in this case we still need the task lease. Note that we don't
// require task lease for direct actor creation task.
} else {
return;
}
TaskID task_id = task.GetTaskSpecification().TaskId();

View file

@ -110,7 +110,8 @@ static inline Task ExampleTask(const std::vector<ObjectID> &arguments,
builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address,
num_returns, false, {}, {});
num_returns, {}, {});
builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, {}, 1, false, false);
for (const auto &arg : arguments) {
builder.AddByRefArg(arg);
}