diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 188db98b8..2bbf27635 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -20,7 +20,8 @@ import ray._private.utils from ray.util.queue import Queue, _QueueActor, Empty import requests from ray.scripts.scripts import main as ray_main - +from ray.workers.pluggable_runtime_env import (RuntimeEnvContext, + get_hook_logger) try: from prometheus_client.parser import text_string_to_metric_families except (ImportError, ModuleNotFoundError): @@ -594,6 +595,15 @@ def set_setup_func(): runtime_env.VAR = "hello world" +def sleep_setup_runtime_env(runtime_env: dict, session_dir): + logger = get_hook_logger() + logger.info(f"Setting up runtime environment {runtime_env}") + logger.info("Simulating long runtime env setup. Sleeping for 15s...") + time.sleep(15) + logger.info("Finished sleeping for 15s") + return RuntimeEnvContext() + + class BatchQueue(Queue): def __init__(self, maxsize: int = 0, actor_options: Optional[Dict] = None) -> None: diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 78563bf00..a31bd362a 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -9,8 +9,8 @@ from pathlib import Path import ray from ray.exceptions import RuntimeEnvSetupError -from ray.test_utils import (run_string_as_driver, - run_string_as_driver_nonblocking) +from ray.test_utils import ( + run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition) from ray._private.utils import (get_wheel_filename, get_master_wheel_url, get_release_wheel_url) import ray.experimental.internal_kv as kv @@ -832,6 +832,74 @@ def test_invalid_conda_env(shutdown_only): assert (time.time() - start) < (first_time / 2.0) +@pytest.mark.skipif( + sys.platform == "win32", reason="runtime_env unsupported on Windows.") +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "_system_config": { + "event_stats_print_interval_ms": 100, + "debug_dump_period_milliseconds": 100, + "event_stats": True + } + }], + indirect=True) +def test_no_spurious_worker_startup(ray_start_cluster): + """Test that no extra workers start up during a long env installation.""" + + cluster = ray_start_cluster + + # This hook sleeps for 15 seconds to simulate creating a runtime env. + cluster.add_node( + num_cpus=1, + runtime_env_setup_hook="ray.test_utils.sleep_setup_runtime_env") + + # Set a nonempty runtime env so that the runtime env setup hook is called. + runtime_env = {"env_vars": {"a": "b"}} + ray.init(address=cluster.address) + + @ray.remote + class Counter(object): + def __init__(self): + self.value = 0 + + def get(self): + return self.value + + # Instantiate an actor that requires the long runtime env installation. + a = Counter.options(runtime_env=runtime_env).remote() + assert ray.get(a.get.remote()) == 0 + + # Check "debug_state.txt" to ensure no extra workers were started. + session_dir = ray.worker.global_worker.node.address_info["session_dir"] + session_path = Path(session_dir) + debug_state_path = session_path / "debug_state.txt" + + def get_num_workers(): + with open(debug_state_path) as f: + for line in f.readlines(): + num_workers_prefix = "- num PYTHON workers: " + if num_workers_prefix in line: + return int(line[len(num_workers_prefix):]) + return None + + # Wait for "debug_state.txt" to be updated to reflect the started worker. + start = time.time() + wait_for_condition(lambda: get_num_workers() > 0) + time_waited = time.time() - start + print(f"Waited {time_waited} for debug_state.txt to be updated") + + # If any workers were unnecessarily started during the initial env + # installation, they will bypass the runtime env setup hook because the + # created env will have been cached and should be added to num_workers + # within a few seconds. Adjusting the default update period for + # debut_state.txt via this cluster_utils pytest fixture seems to be broken, + # so just check it for the next 10 seconds (the default period). + for i in range(100): + # Check that no more workers were started. + assert get_num_workers() <= 1 + time.sleep(0.1) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/raylet/agent_manager.h b/src/ray/raylet/agent_manager.h index 75a8b7a46..79a9e93f8 100644 --- a/src/ray/raylet/agent_manager.h +++ b/src/ray/raylet/agent_manager.h @@ -64,13 +64,14 @@ class AgentManager : public rpc::AgentManagerServiceHandler { /// Request agent to create runtime env. /// \param[in] runtime_env The runtime env. - void CreateRuntimeEnv(const JobID &job_id, const std::string &serialized_runtime_env, - CreateRuntimeEnvCallback callback); + virtual void CreateRuntimeEnv(const JobID &job_id, + const std::string &serialized_runtime_env, + CreateRuntimeEnvCallback callback); /// Request agent to delete runtime env. /// \param[in] runtime_env The runtime env. - void DeleteRuntimeEnv(const std::string &serialized_runtime_env, - DeleteRuntimeEnvCallback callback); + virtual void DeleteRuntimeEnv(const std::string &serialized_runtime_env, + DeleteRuntimeEnvCallback callback); private: void StartAgent(); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index e4aa2c921..e779faa47 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -945,8 +945,9 @@ std::shared_ptr WorkerPool::PopWorker( task_spec, state, dynamic_options, true, GetRuntimeEnvHash(task_spec), task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context); } else { - RAY_LOG(ERROR) << "Creating runtime environment failed. The " - "corresponding task will be failed."; + RAY_LOG(WARNING) << "Couldn't create a runtime environment for task " + << task_spec.TaskId() << ". The runtime environment was " + << task_spec.SerializedRuntimeEnv() << "."; runtime_env_setup_failed_callback_(task_spec.TaskId()); } }); @@ -990,23 +991,39 @@ std::shared_ptr WorkerPool::PopWorker( if (worker == nullptr) { // There are no more non-actor workers available to execute this task. // Start a new worker process. - if (task_spec.HasRuntimeEnv()) { - // create runtime env. - agent_manager_->CreateRuntimeEnv( - task_spec.JobId(), task_spec.SerializedRuntimeEnv(), - [this, start_worker_process_fn, &state, task_spec, runtime_env_hash]( - bool successful, const std::string &serialized_runtime_env_context) { - if (successful) { - start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, - task_spec.SerializedRuntimeEnv(), - serialized_runtime_env_context); - } else { - RAY_LOG(ERROR) << "Creating runtime environment failed. The " - "corresponding task will be failed."; - runtime_env_setup_failed_callback_(task_spec.TaskId()); - } - }); + // Create runtime env. If the env creation is already in progress on this + // node, skip this to prevent unnecessary CreateRuntimeEnv calls, which would + // unnecessarily start new worker processes. + auto it = runtime_env_statuses_.find(runtime_env_hash); + if (it == runtime_env_statuses_.end() || it->second == RuntimeEnvStatus::DONE) { + if (it == runtime_env_statuses_.end()) { + runtime_env_statuses_[runtime_env_hash] = RuntimeEnvStatus::PENDING; + } + agent_manager_->CreateRuntimeEnv( + task_spec.JobId(), task_spec.SerializedRuntimeEnv(), + [this, start_worker_process_fn, &state, task_spec, runtime_env_hash]( + bool successful, const std::string &serialized_runtime_env_context) { + runtime_env_statuses_[runtime_env_hash] = RuntimeEnvStatus::DONE; + if (successful) { + start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, + task_spec.SerializedRuntimeEnv(), + serialized_runtime_env_context); + } else { + RAY_LOG(WARNING) + << "Couldn't create a runtime environment for task " + << task_spec.TaskId() << ". The runtime environment was " + << task_spec.SerializedRuntimeEnv() << "."; + runtime_env_setup_failed_callback_(task_spec.TaskId()); + } + }); + } else { + RAY_LOG(DEBUG) << "PopWorker called for task " << task_spec.TaskId() + << " but the desired runtime env " + << task_spec.SerializedRuntimeEnv() + << " is pending installation. " + "No worker process will be started in this call."; + } } else { proc = start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, "", ""); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index bb7a4a8e6..4572a4245 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -40,6 +40,13 @@ namespace raylet { using WorkerCommandMap = std::unordered_map, std::hash>; +enum class RuntimeEnvStatus { + /// This runtime env is currently being installed. + PENDING, + /// This runtime env has completed installation (either successfully or not) + DONE +}; + /// \class WorkerPoolInterface /// /// Used for new scheduler unit tests. @@ -49,11 +56,11 @@ class WorkerPoolInterface { /// the worker back onto the pool once the worker has completed its work. /// /// \param task_spec The returned worker must be able to execute this task. - /// \param allocated_instances_serialized_json The allocated resouce instances + /// \param allocated_instances_serialized_json The allocated resource instances /// json string, it contains resource ID which assigned to this worker. /// Instance resource value will be like {"GPU":[10000,0,10000]}, non-instance /// resource value will be {"CPU":20000}. - /// \return An idle worker with tit he requested task spec. Returns nullptr if no + /// \return An idle worker with the requested task spec. Returns nullptr if no /// such worker exists. virtual std::shared_ptr PopWorker( const TaskSpecification &task_spec, @@ -562,7 +569,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// The number of registered workers of the first job. int first_job_registered_python_worker_count_; - /// The umber of initial Python workers to wait for the first job before the driver + /// The number of initial Python workers to wait for the first job before the driver /// receives RegisterClientReply. int first_job_driver_wait_num_python_workers_; @@ -575,6 +582,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Set of jobs whose drivers have exited. absl::flat_hash_set finished_jobs_; + /// Maps runtime env hash to its status. + std::unordered_map runtime_env_statuses_; + /// This map stores the same data as `idle_of_all_languages_`, but in a map structure /// for lookup performance. std::unordered_map, int64_t> diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 4127c8a20..5149aaca1 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -92,6 +92,29 @@ class MockRuntimeEnvAgentClient : public rpc::RuntimeEnvAgentClientInterface { }; }; +class MockAgentManager : public AgentManager { + public: + MockAgentManager(AgentManager::Options options, DelayExecutorFn delay_executor, + RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory, + bool start_agent = true) + : AgentManager(options, delay_executor, runtime_env_agent_client_factory, + start_agent){}; + void CreateRuntimeEnv(const JobID &job_id, const std::string &serialized_runtime_env, + CreateRuntimeEnvCallback callback) override { + queued_callbacks.push(callback); + }; + + void PopAndInvokeCallback() { + if (queued_callbacks.size() > 0) { + CreateRuntimeEnvCallback callback = queued_callbacks.front(); + queued_callbacks.pop(); + callback(/*success=*/true, /*serialized_runtime_env_context=*/""); + } + } + + std::queue queued_callbacks; +}; + class WorkerPoolMock : public WorkerPool { public: explicit WorkerPoolMock(instrumented_io_context &io_service, @@ -1233,6 +1256,68 @@ TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) { ASSERT_TRUE(worker_pool_->PopWorker(java_task_spec) != nullptr); } +TEST_F(WorkerPoolTest, NoSpuriousWorkerStartupDuringEnvInstall) { + std::vector agent_commands = {}; + const NodeID node_id = NodeID::FromRandom(); + auto options = AgentManager::Options({node_id, agent_commands}); + auto agent_manager = std::make_shared( + std::move(options), + /*delay_executor=*/ + [this](std::function task, uint32_t delay_ms) { + return execute_after(io_service_, task, delay_ms); + }, + /*runtime_env_agent_factory=*/ + [](const std::string &ip_address, int port) { + return std::shared_ptr( + new MockRuntimeEnvAgentClient()); + }, + false); + worker_pool_->SetAgentManager(agent_manager); + + ASSERT_EQ(worker_pool_->GetProcessSize(), 0); + const auto normal_task_spec = + ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), + /*dynamic_options=*/{}, TaskID::Nil(), "mock_runtime_env_1"); + + // Pop worker for a task with runtime env. + auto popped_worker = worker_pool_->PopWorker(normal_task_spec); + + // No idle workers. + ASSERT_EQ(popped_worker, nullptr); + + // Worker does not start because CreateRuntimeEnvCallback has not yet been invoked. + ASSERT_EQ(worker_pool_->GetProcessSize(), 0); + + // Simulate the following situation: before runtime env is done installing, + // PopWorker is called several more times for the same task. + for (int i = 0; i < 10; i++) { + worker_pool_->PopWorker(normal_task_spec); + } + + // Still, no workers should have started. + ASSERT_EQ(worker_pool_->GetProcessSize(), 0); + + // AgentManager::CreateRuntimeEnv() should have only been called once. + ASSERT_EQ(agent_manager->queued_callbacks.size(), 1); + + // Finish installing runtime env, call CreateRuntimeEnvCallback. + agent_manager->PopAndInvokeCallback(); + + // Only one worker process is started. + ASSERT_EQ(worker_pool_->GetProcessSize(), 1); + + // Now that the env has been created, we should be able to start workers for this env + // in parallel. The soft limit of the number of workers is 5, so just add 4 more. + for (int i = 0; i < 4; i++) { + worker_pool_->PopWorker(normal_task_spec); + } + ASSERT_EQ(agent_manager->queued_callbacks.size(), 4); + for (int i = 0; i < 4; i++) { + agent_manager->PopAndInvokeCallback(); + } + ASSERT_EQ(worker_pool_->GetProcessSize(), 5); +} + } // namespace raylet } // namespace ray