[refactor cluster-task-manager 0/n] move internal state into a separate header #22160

This is the first PR in a series that refactors the cluster task manager. Specifically, it moves the internal state types into a separate header file.
This commit is contained in:
Chen Shen 2022-02-06 22:17:33 -08:00 committed by GitHub
parent ff8af2edba
commit cc577c10ed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 80 deletions

View file

@ -22,6 +22,7 @@
#include "ray/raylet/dependency_manager.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/cluster_task_manager_interface.h"
#include "ray/raylet/scheduling/internal.h"
#include "ray/raylet/worker.h"
#include "ray/raylet/worker_pool.h"
#include "ray/rpc/grpc_client.h"
@ -31,86 +32,6 @@
namespace ray {
namespace raylet {
namespace internal {
/// Scheduling status tracked for a queued Work item.
enum class WorkStatus {
/// Waiting to be scheduled.
WAITING,
/// Waiting for a worker to start.
WAITING_FOR_WORKER,
/// Queued task has been cancelled.
CANCELLED,
};
/// This enum represents the cause of why work hasn't been scheduled yet.
enum class UnscheduledWorkCause {
/// Waiting for acquiring resources.
WAITING_FOR_RESOURCE_ACQUISITION,
/// Waiting for more plasma store memory to be available. This is set when we can't pin
/// task arguments due to the lack of memory.
WAITING_FOR_AVAILABLE_PLASMA_MEMORY,
/// Pending because there's no node that satisfies the resources in the cluster.
WAITING_FOR_RESOURCES_AVAILABLE,
/// Waiting because the worker wasn't available since job config for the worker wasn't
/// registered yet.
WORKER_NOT_FOUND_JOB_CONFIG_NOT_EXIST,
/// Waiting because the worker wasn't available since its registration timed out.
WORKER_NOT_FOUND_REGISTRATION_TIMEOUT,
/// Waiting because the worker wasn't available since it was rate limited.
WORKER_NOT_FOUND_RATE_LIMITED,
};
/// Work represents all the information needed to make a scheduling decision.
/// This includes the task, the information we need to communicate to
/// dispatch/spillback and the callback to trigger it.
///
/// A Work cannot be copied or copy-assigned. Besides the public data members
/// it carries a WorkStatus and, for the WAITING status, the
/// UnscheduledWorkCause that was last recorded.
class Work {
 public:
  /// The task this unit of work wraps.
  RayTask task;
  /// Flag fixed at construction time.
  const bool grant_or_reject;
  /// Flag fixed at construction time.
  const bool is_selected_based_on_locality;
  /// Raw pointer to the lease reply. Work's defaulted destructor never frees it.
  rpc::RequestWorkerLeaseReply *reply;
  /// Callback stored alongside the reply.
  std::function<void(void)> callback;
  /// Starts out as nullptr.
  /// NOTE(review): presumably filled in once resources are allocated for the
  /// task -- confirm against the scheduler code.
  std::shared_ptr<TaskResourceInstances> allocated_instances;

  /// Builds a Work item.
  ///
  /// \param task Task to schedule; taken by value and moved into the member.
  /// \param grant_or_reject Stored as-is in the const member of the same name.
  /// \param is_selected_based_on_locality Stored as-is in the const member of
  /// the same name.
  /// \param reply Pointer stored as-is; it is not owned by this object.
  /// \param callback Taken by value and moved into the member.
  /// \param status Initial status, WorkStatus::WAITING by default.
  Work(RayTask task, bool grant_or_reject, bool is_selected_based_on_locality,
       rpc::RequestWorkerLeaseReply *reply, std::function<void(void)> callback,
       WorkStatus status = WorkStatus::WAITING)
      // `task` and `callback` are by-value sink parameters: move them into the
      // members instead of copying them a second time.
      : task(std::move(task)),
        grant_or_reject(grant_or_reject),
        is_selected_based_on_locality(is_selected_based_on_locality),
        reply(reply),
        callback(std::move(callback)),
        allocated_instances(nullptr),
        status_(status) {}

  Work(const Work &) = delete;
  Work &operator=(const Work &) = delete;
  ~Work() = default;

  /// Set the state as waiting with the cause.
  void SetStateWaiting(const UnscheduledWorkCause &cause) {
    status_ = WorkStatus::WAITING;
    unscheduled_work_cause_ = cause;
  }

  /// Set the state as waiting for workers, meaning it is waiting for workers to start.
  /// The previously recorded unscheduled cause is left untouched.
  void SetStateWaitingForWorker() { status_ = WorkStatus::WAITING_FOR_WORKER; }

  /// Set the state as cancelled, meaning this task has to be unqueued from the node.
  /// The previously recorded unscheduled cause is left untouched.
  void SetStateCancelled() { status_ = WorkStatus::CANCELLED; }

  /// Returns the current status.
  WorkStatus GetState() const { return status_; }

  /// Returns the cause passed to the most recent SetStateWaiting() call, or
  /// WAITING_FOR_RESOURCE_ACQUISITION if it was never called.
  UnscheduledWorkCause GetUnscheduledCause() const { return unscheduled_work_cause_; }

 private:
  /// Current status; the constructor overrides this default with its argument.
  WorkStatus status_ = WorkStatus::WAITING;
  /// Only written by SetStateWaiting().
  UnscheduledWorkCause unscheduled_work_cause_ =
      UnscheduledWorkCause::WAITING_FOR_RESOURCE_ACQUISITION;
};
/// Signature of a callable that takes a node ID and returns a pointer to a
/// const rpc::GcsNodeInfo.
/// NOTE(review): presumably returns nullptr for an unknown node -- confirm with callers.
using NodeInfoGetter = std::function<const rpc::GcsNodeInfo *(const NodeID &node_id)>;
} // namespace internal
/// Manages the queuing and dispatching of tasks. The logic is as follows:
/// 1. Queue tasks for scheduling.
/// 2. Pick a node on the cluster which has the available resources to run a

View file

@ -0,0 +1,105 @@
// Copyright 2020-2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <functional>
#include <memory>
#include <utility>

#include "ray/common/ray_object.h"
#include "ray/common/task/task.h"
#include "ray/common/task/task_common.h"
namespace ray {
namespace raylet {
namespace internal {
/// Scheduling status tracked for a queued Work item.
enum class WorkStatus {
/// Waiting to be scheduled.
WAITING,
/// Waiting for a worker to start.
WAITING_FOR_WORKER,
/// Queued task has been cancelled.
CANCELLED,
};
/// This enum represents the cause of why work hasn't been scheduled yet.
enum class UnscheduledWorkCause {
/// Waiting for acquiring resources.
WAITING_FOR_RESOURCE_ACQUISITION,
/// Waiting for more plasma store memory to be available. This is set when we can't pin
/// task arguments due to the lack of memory.
WAITING_FOR_AVAILABLE_PLASMA_MEMORY,
/// Pending because there's no node that satisfies the resources in the cluster.
WAITING_FOR_RESOURCES_AVAILABLE,
/// Waiting because the worker wasn't available since job config for the worker wasn't
/// registered yet.
WORKER_NOT_FOUND_JOB_CONFIG_NOT_EXIST,
/// Waiting because the worker wasn't available since its registration timed out.
WORKER_NOT_FOUND_REGISTRATION_TIMEOUT,
/// Waiting because the worker wasn't available since it was rate limited.
WORKER_NOT_FOUND_RATE_LIMITED,
};
/// Work represents all the information needed to make a scheduling decision.
/// This includes the task, the information we need to communicate to
/// dispatch/spillback and the callback to trigger it.
///
/// A Work cannot be copied or copy-assigned. Besides the public data members
/// it carries a WorkStatus and, for the WAITING status, the
/// UnscheduledWorkCause that was last recorded.
class Work {
 public:
  /// The task this unit of work wraps.
  RayTask task;
  /// Flag fixed at construction time.
  const bool grant_or_reject;
  /// Flag fixed at construction time.
  const bool is_selected_based_on_locality;
  /// Raw pointer to the lease reply. Work's defaulted destructor never frees it.
  rpc::RequestWorkerLeaseReply *reply;
  /// Callback stored alongside the reply.
  std::function<void(void)> callback;
  /// Starts out as nullptr.
  /// NOTE(review): presumably filled in once resources are allocated for the
  /// task -- confirm against the scheduler code.
  std::shared_ptr<TaskResourceInstances> allocated_instances;

  /// Builds a Work item.
  ///
  /// \param task Task to schedule; taken by value and moved into the member.
  /// \param grant_or_reject Stored as-is in the const member of the same name.
  /// \param is_selected_based_on_locality Stored as-is in the const member of
  /// the same name.
  /// \param reply Pointer stored as-is; it is not owned by this object.
  /// \param callback Taken by value and moved into the member.
  /// \param status Initial status, WorkStatus::WAITING by default.
  Work(RayTask task, bool grant_or_reject, bool is_selected_based_on_locality,
       rpc::RequestWorkerLeaseReply *reply, std::function<void(void)> callback,
       WorkStatus status = WorkStatus::WAITING)
      // `task` and `callback` are by-value sink parameters: move them into the
      // members instead of copying them a second time.
      : task(std::move(task)),
        grant_or_reject(grant_or_reject),
        is_selected_based_on_locality(is_selected_based_on_locality),
        reply(reply),
        callback(std::move(callback)),
        allocated_instances(nullptr),
        status_(status) {}

  Work(const Work &) = delete;
  Work &operator=(const Work &) = delete;
  ~Work() = default;

  /// Set the state as waiting with the cause.
  void SetStateWaiting(const UnscheduledWorkCause &cause) {
    status_ = WorkStatus::WAITING;
    unscheduled_work_cause_ = cause;
  }

  /// Set the state as waiting for workers, meaning it is waiting for workers to start.
  /// The previously recorded unscheduled cause is left untouched.
  void SetStateWaitingForWorker() { status_ = WorkStatus::WAITING_FOR_WORKER; }

  /// Set the state as cancelled, meaning this task has to be unqueued from the node.
  /// The previously recorded unscheduled cause is left untouched.
  void SetStateCancelled() { status_ = WorkStatus::CANCELLED; }

  /// Returns the current status.
  WorkStatus GetState() const { return status_; }

  /// Returns the cause passed to the most recent SetStateWaiting() call, or
  /// WAITING_FOR_RESOURCE_ACQUISITION if it was never called.
  UnscheduledWorkCause GetUnscheduledCause() const { return unscheduled_work_cause_; }

 private:
  /// Current status; the constructor overrides this default with its argument.
  WorkStatus status_ = WorkStatus::WAITING;
  /// Only written by SetStateWaiting().
  UnscheduledWorkCause unscheduled_work_cause_ =
      UnscheduledWorkCause::WAITING_FOR_RESOURCE_ACQUISITION;
};
/// Signature of a callable that takes a node ID and returns a pointer to a
/// const rpc::GcsNodeInfo.
/// NOTE(review): presumably returns nullptr for an unknown node -- confirm with callers.
using NodeInfoGetter = std::function<const rpc::GcsNodeInfo *(const NodeID &node_id)>;
} // namespace internal
} // namespace raylet
} // namespace ray