[Core] [runtime env] Avoid spurious worker startup (#17422)

This commit is contained in:
architkulkarni 2021-08-05 13:46:23 -07:00 committed by GitHub
parent 667851f0ad
commit e84ae6caa5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 219 additions and 28 deletions

View file

@ -20,7 +20,8 @@ import ray._private.utils
from ray.util.queue import Queue, _QueueActor, Empty
import requests
from ray.scripts.scripts import main as ray_main
from ray.workers.pluggable_runtime_env import (RuntimeEnvContext,
get_hook_logger)
try:
from prometheus_client.parser import text_string_to_metric_families
except (ImportError, ModuleNotFoundError):
@ -594,6 +595,15 @@ def set_setup_func():
runtime_env.VAR = "hello world"
def sleep_setup_runtime_env(runtime_env: dict, session_dir):
logger = get_hook_logger()
logger.info(f"Setting up runtime environment {runtime_env}")
logger.info("Simulating long runtime env setup. Sleeping for 15s...")
time.sleep(15)
logger.info("Finished sleeping for 15s")
return RuntimeEnvContext()
class BatchQueue(Queue):
def __init__(self, maxsize: int = 0,
actor_options: Optional[Dict] = None) -> None:

View file

@ -9,8 +9,8 @@ from pathlib import Path
import ray
from ray.exceptions import RuntimeEnvSetupError
from ray.test_utils import (run_string_as_driver,
run_string_as_driver_nonblocking)
from ray.test_utils import (
run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition)
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url)
import ray.experimental.internal_kv as kv
@ -832,6 +832,74 @@ def test_invalid_conda_env(shutdown_only):
assert (time.time() - start) < (first_time / 2.0)
@pytest.mark.skipif(
sys.platform == "win32", reason="runtime_env unsupported on Windows.")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"_system_config": {
"event_stats_print_interval_ms": 100,
"debug_dump_period_milliseconds": 100,
"event_stats": True
}
}],
indirect=True)
def test_no_spurious_worker_startup(ray_start_cluster):
"""Test that no extra workers start up during a long env installation."""
cluster = ray_start_cluster
# This hook sleeps for 15 seconds to simulate creating a runtime env.
cluster.add_node(
num_cpus=1,
runtime_env_setup_hook="ray.test_utils.sleep_setup_runtime_env")
# Set a nonempty runtime env so that the runtime env setup hook is called.
runtime_env = {"env_vars": {"a": "b"}}
ray.init(address=cluster.address)
@ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def get(self):
return self.value
# Instantiate an actor that requires the long runtime env installation.
a = Counter.options(runtime_env=runtime_env).remote()
assert ray.get(a.get.remote()) == 0
# Check "debug_state.txt" to ensure no extra workers were started.
session_dir = ray.worker.global_worker.node.address_info["session_dir"]
session_path = Path(session_dir)
debug_state_path = session_path / "debug_state.txt"
def get_num_workers():
with open(debug_state_path) as f:
for line in f.readlines():
num_workers_prefix = "- num PYTHON workers: "
if num_workers_prefix in line:
return int(line[len(num_workers_prefix):])
return None
# Wait for "debug_state.txt" to be updated to reflect the started worker.
start = time.time()
wait_for_condition(lambda: get_num_workers() > 0)
time_waited = time.time() - start
print(f"Waited {time_waited} for debug_state.txt to be updated")
# If any workers were unnecessarily started during the initial env
# installation, they will bypass the runtime env setup hook because the
# created env will have been cached and should be added to num_workers
# within a few seconds. Adjusting the default update period for
# debut_state.txt via this cluster_utils pytest fixture seems to be broken,
# so just check it for the next 10 seconds (the default period).
for i in range(100):
# Check that no more workers were started.
assert get_num_workers() <= 1
time.sleep(0.1)
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-sv", __file__]))

View file

@ -64,13 +64,14 @@ class AgentManager : public rpc::AgentManagerServiceHandler {
/// Request agent to create runtime env.
/// \param[in] runtime_env The runtime env.
void CreateRuntimeEnv(const JobID &job_id, const std::string &serialized_runtime_env,
CreateRuntimeEnvCallback callback);
virtual void CreateRuntimeEnv(const JobID &job_id,
const std::string &serialized_runtime_env,
CreateRuntimeEnvCallback callback);
/// Request agent to delete runtime env.
/// \param[in] runtime_env The runtime env.
void DeleteRuntimeEnv(const std::string &serialized_runtime_env,
DeleteRuntimeEnvCallback callback);
virtual void DeleteRuntimeEnv(const std::string &serialized_runtime_env,
DeleteRuntimeEnvCallback callback);
private:
void StartAgent();

View file

@ -945,8 +945,9 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
task_spec, state, dynamic_options, true, GetRuntimeEnvHash(task_spec),
task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context);
} else {
RAY_LOG(ERROR) << "Creating runtime environment failed. The "
"corresponding task will be failed.";
RAY_LOG(WARNING) << "Couldn't create a runtime environment for task "
<< task_spec.TaskId() << ". The runtime environment was "
<< task_spec.SerializedRuntimeEnv() << ".";
runtime_env_setup_failed_callback_(task_spec.TaskId());
}
});
@ -990,23 +991,39 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
if (worker == nullptr) {
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
if (task_spec.HasRuntimeEnv()) {
// create runtime env.
agent_manager_->CreateRuntimeEnv(
task_spec.JobId(), task_spec.SerializedRuntimeEnv(),
[this, start_worker_process_fn, &state, task_spec, runtime_env_hash](
bool successful, const std::string &serialized_runtime_env_context) {
if (successful) {
start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context);
} else {
RAY_LOG(ERROR) << "Creating runtime environment failed. The "
"corresponding task will be failed.";
runtime_env_setup_failed_callback_(task_spec.TaskId());
}
});
// Create runtime env. If the env creation is already in progress on this
// node, skip this to prevent unnecessary CreateRuntimeEnv calls, which would
// unnecessarily start new worker processes.
auto it = runtime_env_statuses_.find(runtime_env_hash);
if (it == runtime_env_statuses_.end() || it->second == RuntimeEnvStatus::DONE) {
if (it == runtime_env_statuses_.end()) {
runtime_env_statuses_[runtime_env_hash] = RuntimeEnvStatus::PENDING;
}
agent_manager_->CreateRuntimeEnv(
task_spec.JobId(), task_spec.SerializedRuntimeEnv(),
[this, start_worker_process_fn, &state, task_spec, runtime_env_hash](
bool successful, const std::string &serialized_runtime_env_context) {
runtime_env_statuses_[runtime_env_hash] = RuntimeEnvStatus::DONE;
if (successful) {
start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context);
} else {
RAY_LOG(WARNING)
<< "Couldn't create a runtime environment for task "
<< task_spec.TaskId() << ". The runtime environment was "
<< task_spec.SerializedRuntimeEnv() << ".";
runtime_env_setup_failed_callback_(task_spec.TaskId());
}
});
} else {
RAY_LOG(DEBUG) << "PopWorker called for task " << task_spec.TaskId()
<< " but the desired runtime env "
<< task_spec.SerializedRuntimeEnv()
<< " is pending installation. "
"No worker process will be started in this call.";
}
} else {
proc = start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, "",
"");

View file

@ -40,6 +40,13 @@ namespace raylet {
using WorkerCommandMap =
std::unordered_map<Language, std::vector<std::string>, std::hash<int>>;
enum class RuntimeEnvStatus {
/// This runtime env is currently being installed.
PENDING,
/// This runtime env has completed installation (either successfully or not)
DONE
};
/// \class WorkerPoolInterface
///
/// Used for new scheduler unit tests.
@ -49,11 +56,11 @@ class WorkerPoolInterface {
/// the worker back onto the pool once the worker has completed its work.
///
/// \param task_spec The returned worker must be able to execute this task.
/// \param allocated_instances_serialized_json The allocated resouce instances
/// \param allocated_instances_serialized_json The allocated resource instances
/// json string, it contains resource ID which assigned to this worker.
/// Instance resource value will be like {"GPU":[10000,0,10000]}, non-instance
/// resource value will be {"CPU":20000}.
/// \return An idle worker with tit he requested task spec. Returns nullptr if no
/// \return An idle worker with the requested task spec. Returns nullptr if no
/// such worker exists.
virtual std::shared_ptr<WorkerInterface> PopWorker(
const TaskSpecification &task_spec,
@ -562,7 +569,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// The number of registered workers of the first job.
int first_job_registered_python_worker_count_;
/// The umber of initial Python workers to wait for the first job before the driver
/// The number of initial Python workers to wait for the first job before the driver
/// receives RegisterClientReply.
int first_job_driver_wait_num_python_workers_;
@ -575,6 +582,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Set of jobs whose drivers have exited.
absl::flat_hash_set<JobID> finished_jobs_;
/// Maps runtime env hash to its status.
std::unordered_map<int, RuntimeEnvStatus> runtime_env_statuses_;
/// This map stores the same data as `idle_of_all_languages_`, but in a map structure
/// for lookup performance.
std::unordered_map<std::shared_ptr<WorkerInterface>, int64_t>

View file

@ -92,6 +92,29 @@ class MockRuntimeEnvAgentClient : public rpc::RuntimeEnvAgentClientInterface {
};
};
class MockAgentManager : public AgentManager {
public:
MockAgentManager(AgentManager::Options options, DelayExecutorFn delay_executor,
RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory,
bool start_agent = true)
: AgentManager(options, delay_executor, runtime_env_agent_client_factory,
start_agent){};
void CreateRuntimeEnv(const JobID &job_id, const std::string &serialized_runtime_env,
CreateRuntimeEnvCallback callback) override {
queued_callbacks.push(callback);
};
void PopAndInvokeCallback() {
if (queued_callbacks.size() > 0) {
CreateRuntimeEnvCallback callback = queued_callbacks.front();
queued_callbacks.pop();
callback(/*success=*/true, /*serialized_runtime_env_context=*/"");
}
}
std::queue<CreateRuntimeEnvCallback> queued_callbacks;
};
class WorkerPoolMock : public WorkerPool {
public:
explicit WorkerPoolMock(instrumented_io_context &io_service,
@ -1233,6 +1256,68 @@ TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) {
ASSERT_TRUE(worker_pool_->PopWorker(java_task_spec) != nullptr);
}
TEST_F(WorkerPoolTest, NoSpuriousWorkerStartupDuringEnvInstall) {
std::vector<std::string> agent_commands = {};
const NodeID node_id = NodeID::FromRandom();
auto options = AgentManager::Options({node_id, agent_commands});
auto agent_manager = std::make_shared<MockAgentManager>(
std::move(options),
/*delay_executor=*/
[this](std::function<void()> task, uint32_t delay_ms) {
return execute_after(io_service_, task, delay_ms);
},
/*runtime_env_agent_factory=*/
[](const std::string &ip_address, int port) {
return std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
new MockRuntimeEnvAgentClient());
},
false);
worker_pool_->SetAgentManager(agent_manager);
ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
const auto normal_task_spec =
ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(),
/*dynamic_options=*/{}, TaskID::Nil(), "mock_runtime_env_1");
// Pop worker for a task with runtime env.
auto popped_worker = worker_pool_->PopWorker(normal_task_spec);
// No idle workers.
ASSERT_EQ(popped_worker, nullptr);
// Worker does not start because CreateRuntimeEnvCallback has not yet been invoked.
ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
// Simulate the following situation: before runtime env is done installing,
// PopWorker is called several more times for the same task.
for (int i = 0; i < 10; i++) {
worker_pool_->PopWorker(normal_task_spec);
}
// Still, no workers should have started.
ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
// AgentManager::CreateRuntimeEnv() should have only been called once.
ASSERT_EQ(agent_manager->queued_callbacks.size(), 1);
// Finish installing runtime env, call CreateRuntimeEnvCallback.
agent_manager->PopAndInvokeCallback();
// Only one worker process is started.
ASSERT_EQ(worker_pool_->GetProcessSize(), 1);
// Now that the env has been created, we should be able to start workers for this env
// in parallel. The soft limit of the number of workers is 5, so just add 4 more.
for (int i = 0; i < 4; i++) {
worker_pool_->PopWorker(normal_task_spec);
}
ASSERT_EQ(agent_manager->queued_callbacks.size(), 4);
for (int i = 0; i < 4; i++) {
agent_manager->PopAndInvokeCallback();
}
ASSERT_EQ(worker_pool_->GetProcessSize(), 5);
}
} // namespace raylet
} // namespace ray