mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Prestart workers to avoid slow start when multi-tenancy is enabled (#12430)
This commit is contained in:
parent
0df55a139c
commit
9ad0f173d6
5 changed files with 80 additions and 2 deletions
|
@ -306,6 +306,11 @@ RAY_CONFIG(bool, enable_multi_tenancy,
|
|||
getenv("RAY_ENABLE_MULTI_TENANCY") == nullptr ||
|
||||
getenv("RAY_ENABLE_MULTI_TENANCY") == std::string("1"))
|
||||
|
||||
/// Whether to enable worker prestarting: https://github.com/ray-project/ray/issues/12052
|
||||
RAY_CONFIG(bool, enable_worker_prestart,
|
||||
getenv("RAY_ENABLE_WORKER_PRESTART") == nullptr ||
|
||||
getenv("RAY_ENABLE_WORKER_PRESTART") == std::string("1"))
|
||||
|
||||
/// The interval of periodic idle worker killing. A negative value means worker capping is
|
||||
/// disabled.
|
||||
RAY_CONFIG(int64_t, kill_idle_workers_interval_ms, 200)
|
||||
|
|
|
@ -1717,6 +1717,13 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
|
|||
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr));
|
||||
}
|
||||
|
||||
// Prestart optimization is only needed when multi-tenancy is on.
|
||||
if (RayConfig::instance().enable_multi_tenancy() &&
|
||||
RayConfig::instance().enable_worker_prestart()) {
|
||||
auto task_spec = task.GetTaskSpecification();
|
||||
worker_pool_.PrestartWorkers(task_spec, request.backlog_size());
|
||||
}
|
||||
|
||||
if (new_scheduler_enabled_) {
|
||||
auto task_spec = task.GetTaskSpecification();
|
||||
cluster_task_manager_->QueueTask(task, reply, [send_reply_callback]() {
|
||||
|
|
|
@ -890,6 +890,41 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
|
|||
return worker;
|
||||
}
|
||||
|
||||
void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
|
||||
int64_t backlog_size) {
|
||||
// Code path of task that needs a dedicated worker: an actor creation task with
|
||||
// dynamic worker options, or any task with environment variable overrides.
|
||||
if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) ||
|
||||
task_spec.OverrideEnvironmentVariables().size() > 0) {
|
||||
return; // Not handled.
|
||||
}
|
||||
|
||||
auto &state = GetStateForLanguage(task_spec.GetLanguage());
|
||||
// The number of available workers that can be used for this task spec.
|
||||
int num_usable_workers = state.idle.size();
|
||||
for (auto &entry : state.starting_worker_processes) {
|
||||
num_usable_workers += entry.second;
|
||||
}
|
||||
// The number of workers total regardless of suitability for this task.
|
||||
int num_workers_total = 0;
|
||||
for (const auto &worker : GetAllRegisteredWorkers()) {
|
||||
if (!worker->IsDead()) {
|
||||
num_workers_total++;
|
||||
}
|
||||
}
|
||||
auto desired_usable_workers =
|
||||
std::min<int64_t>(num_workers_soft_limit_ - num_workers_total, backlog_size);
|
||||
if (num_usable_workers < desired_usable_workers) {
|
||||
int64_t num_needed = desired_usable_workers - num_usable_workers;
|
||||
RAY_LOG(DEBUG) << "Prestarting " << num_needed << " workers given task backlog size "
|
||||
<< backlog_size << " and soft limit " << num_workers_soft_limit_;
|
||||
for (int i = 0; i < num_needed; i++) {
|
||||
StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
|
||||
task_spec.JobId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker) {
|
||||
auto &state = GetStateForLanguage(worker->GetLanguage());
|
||||
RAY_CHECK(RemoveWorker(state.registered_workers, worker));
|
||||
|
|
|
@ -228,6 +228,14 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
|
|||
/// such worker exists.
|
||||
std::shared_ptr<WorkerInterface> PopWorker(const TaskSpecification &task_spec);
|
||||
|
||||
/// Try to prestart a number of workers suitable the given task spec. Prestarting
|
||||
/// is needed since core workers request one lease at a time, if starting is slow,
|
||||
/// then it means it takes a long time to scale up when multi-tenancy is on.
|
||||
///
|
||||
/// \param task_spec The returned worker must be able to execute this task.
|
||||
/// \param backlog_size The number of tasks in the client backlog of this shape.
|
||||
void PrestartWorkers(const TaskSpecification &task_spec, int64_t backlog_size);
|
||||
|
||||
/// Return the current size of the worker pool for the requested language. Counts only
|
||||
/// idle workers.
|
||||
///
|
||||
|
|
|
@ -27,6 +27,7 @@ namespace raylet {
|
|||
int NUM_WORKERS_PER_PROCESS_JAVA = 3;
|
||||
int MAXIMUM_STARTUP_CONCURRENCY = 5;
|
||||
int MAX_IO_WORKER_SIZE = 2;
|
||||
int POOL_SIZE_SOFT_LIMIT = 5;
|
||||
JobID JOB_ID = JobID::FromInt(1);
|
||||
|
||||
std::vector<Language> LANGUAGES = {Language::PYTHON, Language::JAVA};
|
||||
|
@ -35,8 +36,8 @@ class WorkerPoolMock : public WorkerPool {
|
|||
public:
|
||||
explicit WorkerPoolMock(boost::asio::io_service &io_service,
|
||||
const WorkerCommandMap &worker_commands)
|
||||
: WorkerPool(io_service, 0, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr,
|
||||
worker_commands, {}, []() {}),
|
||||
: WorkerPool(io_service, 0, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0,
|
||||
0, {}, nullptr, worker_commands, {}, []() {}),
|
||||
last_worker_process_() {
|
||||
states_by_lang_[ray::Language::JAVA].num_workers_per_process =
|
||||
NUM_WORKERS_PER_PROCESS_JAVA;
|
||||
|
@ -315,6 +316,28 @@ TEST_P(WorkerPoolTest, InitialWorkerProcessCount) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_P(WorkerPoolTest, TestPrestartingWorkers) {
|
||||
if (RayConfig::instance().enable_multi_tenancy()) {
|
||||
const auto task_spec = ExampleTaskSpec();
|
||||
// Prestarts 2 workers.
|
||||
worker_pool_->PrestartWorkers(task_spec, 2);
|
||||
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 2);
|
||||
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 2);
|
||||
// Prestarts 1 more worker.
|
||||
worker_pool_->PrestartWorkers(task_spec, 3);
|
||||
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);
|
||||
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3);
|
||||
// No more needed.
|
||||
worker_pool_->PrestartWorkers(task_spec, 1);
|
||||
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 3);
|
||||
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 3);
|
||||
// Capped by soft limit of 5.
|
||||
worker_pool_->PrestartWorkers(task_spec, 20);
|
||||
ASSERT_EQ(worker_pool_->NumWorkersStarting(), 5);
|
||||
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 5);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(WorkerPoolTest, HandleWorkerPushPop) {
|
||||
// Try to pop a worker from the empty pool and make sure we don't get one.
|
||||
std::shared_ptr<WorkerInterface> popped_worker;
|
||||
|
|
Loading…
Add table
Reference in a new issue