[runtime_env] Gracefully fail tasks when an environment fails to be set up (#17249)

Edward Oakes 2021-07-28 15:25:02 -05:00 committed by GitHub
parent 72abf81900
commit 7007c6271d
14 changed files with 163 additions and 61 deletions
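
At a high level, this change makes a task whose runtime_env cannot be installed (for example, an unresolvable conda dependency) fail promptly with a RuntimeEnvSetupError, instead of being left pending while the raylet only logs the error. A minimal sketch of the resulting driver-side behavior, mirroring the test_invalid_conda_env test added in this commit:

import ray
from ray.exceptions import RuntimeEnvSetupError

ray.init()

@ray.remote
def f():
    return 1

# A conda dependency that cannot be resolved, so the env setup fails.
bad_env = {"conda": {"dependencies": ["this_doesnt_exist"]}}

try:
    ray.get(f.options(runtime_env=bad_env).remote())
except RuntimeEnvSetupError as e:
    print(e)  # Explains that the runtime environment failed to be installed.

# Tasks with a valid (or no) runtime_env still run normally.
assert ray.get(f.remote()) == 1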

View file

@@ -1,4 +1,5 @@
import asyncio
from dataclasses import dataclass
import json
import logging
from ray._private.ray_logging import setup_component_logger
@@ -18,9 +19,18 @@ from ray.workers.pluggable_runtime_env import (RuntimeEnvContext,
logger = logging.getLogger(__name__)
@dataclass
class CreatedEnvResult:
# Whether or not the env was installed correctly.
success: bool
# If success is True, will be a serialized RuntimeEnvContext
# If success is False, will be an error message.
result: str
class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
runtime_env_agent_pb2_grpc.RuntimeEnvServiceServicer):
"""A rpc server to create or delete runtime envs.
"""An RPC server to create and delete runtime envs.
Attributes:
dashboard_agent: The DashboardAgent object contains global config.
@@ -34,9 +44,9 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
self._logging_params = dashboard_agent.logging_params
self._per_job_logger_cache = dict()
runtime_env.PKG_DIR = dashboard_agent.runtime_env_dir
# Maps a serialized runtime env dict to the serialized
# RuntimeEnvContext arising from the creation of the env.
self._created_env_cache: Dict[str, str] = dict()
# Cache the results of creating envs to avoid repeatedly calling into
# conda and other slow calls.
self._env_cache: Dict[str, CreatedEnvResult] = dict()
def get_or_create_logger(self, job_id: bytes):
job_id = job_id.decode()
@@ -65,18 +75,26 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
return await loop.run_in_executor(None, run_setup_with_logger)
serialized_env = request.serialized_runtime_env
if serialized_env in self._created_env_cache:
serialized_context = self._created_env_cache[serialized_env]
logger.info("Runtime env already created. Env: %s, context: %s",
serialized_env,
self._created_env_cache[serialized_env])
if serialized_env in self._env_cache:
serialized_context = self._env_cache[serialized_env]
result = self._env_cache[serialized_env]
if result.success:
context = result.result
logger.info("Runtime env already created successfully. "
f"Env: {serialized_env}, context: {context}")
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=serialized_context)
serialized_runtime_env_context=context)
else:
error_message = result.result
logger.info("Runtime env already failed. "
f"Env: {serialized_env}, err: {error_message}")
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message)
logger.info("Creating runtime env: %s.",
request.serialized_runtime_env)
runtime_env_dict = json.loads(request.serialized_runtime_env or "{}")
logger.info(f"Creating runtime env: {serialized_env}")
runtime_env_dict = json.loads(serialized_env or "{}")
uris = runtime_env_dict.get("uris")
runtime_env_context: RuntimeEnvContext = None
error_message = None
@@ -88,7 +106,7 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
# But we don't initialize internal kv in agent now.
pass
runtime_env_context = await _setup_runtime_env(
request.serialized_runtime_env, self._session_dir)
serialized_env, self._session_dir)
break
except Exception as ex:
logger.exception("Runtime env creation failed.")
@@ -100,14 +118,17 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
"Runtime env creation failed for %d times, "
"don't retry any more.",
runtime_env_consts.RUNTIME_ENV_RETRY_TIMES)
self._env_cache[serialized_env] = CreatedEnvResult(
False, error_message)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message)
serialized_context = runtime_env_context.serialize()
self._created_env_cache[serialized_env] = serialized_context
self._env_cache[serialized_env] = CreatedEnvResult(
True, serialized_context)
logger.info("Successfully created runtime env: %s, the context: %s",
request.serialized_runtime_env, serialized_context)
serialized_env, serialized_context)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=serialized_context)
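
For reference, the caching behavior introduced in this file can be summarized in a small standalone sketch: both successes and failures are cached under the serialized runtime_env string, so a repeated request for a broken env fails immediately instead of re-running conda setup. EnvCache, get_or_create, and setup_fn below are illustrative names, not the agent's actual API:

from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class CreatedEnvResult:
    # Whether or not the env was installed correctly.
    success: bool
    # If success is True, a serialized RuntimeEnvContext;
    # if success is False, an error message.
    result: str


class EnvCache:
    """Simplified sketch of the agent's env result cache (illustrative only)."""

    def __init__(self):
        self._env_cache: Dict[str, CreatedEnvResult] = {}

    def get_or_create(self, serialized_env: str,
                      setup_fn: Callable[[str], str]) -> CreatedEnvResult:
        # Hits return the cached result, whether it was a success or a failure.
        if serialized_env in self._env_cache:
            return self._env_cache[serialized_env]
        try:
            context = setup_fn(serialized_env)
            result = CreatedEnvResult(True, context)
        except Exception as e:
            result = CreatedEnvResult(False, str(e))
        self._env_cache[serialized_env] = result
        return result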

View file

@@ -267,6 +267,16 @@ class AsyncioActorExit(RayError):
pass
class RuntimeEnvSetupError(RayError):
"""Raised when a runtime environment fails to be set up."""
def __str__(self):
return (
"The runtime environment for this task or actor failed to be "
"installed. Corresponding error logs should have been streamed "
"to the driver's STDOUT.")
RAY_EXCEPTION_TYPES = [
PlasmaObjectNotAvailable,
RayError,
@@ -277,4 +287,5 @@ RAY_EXCEPTION_TYPES = [
ObjectLostError,
GetTimeoutError,
AsyncioActorExit,
RuntimeEnvSetupError,
]

View file

@@ -6,9 +6,10 @@ import ray.cloudpickle as pickle
from ray import ray_constants
import ray._private.utils
from ray.gcs_utils import ErrorType
from ray.exceptions import (
RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError,
TaskCancelledError, WorkerCrashedError, ObjectLostError, RaySystemError)
from ray.exceptions import (RayError, PlasmaObjectNotAvailable, RayTaskError,
RayActorError, TaskCancelledError,
WorkerCrashedError, ObjectLostError,
RaySystemError, RuntimeEnvSetupError)
from ray._raylet import (
split_buffer,
unpack_pickle5_buffers,
@@ -223,6 +224,8 @@ class SerializationContext:
return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectLostError(object_ref.hex())
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError()
else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
"Tried to get object that has been promoted to plasma."

View file

@@ -3,9 +3,12 @@ import pytest
import sys
import random
import tempfile
import time
import requests
from pathlib import Path
import ray
from ray.exceptions import RuntimeEnvSetupError
from ray.test_utils import (run_string_as_driver,
run_string_as_driver_nonblocking)
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
@@ -803,6 +806,32 @@ def test_working_dir_override_failure(shutdown_only):
B.options(runtime_env={"working_dir": "."}).remote()
@pytest.mark.skipif(
sys.platform == "win32", reason="runtime_env unsupported on Windows.")
def test_invalid_conda_env(shutdown_only):
ray.init()
@ray.remote
def f():
pass
start = time.time()
bad_env = {"conda": {"dependencies": ["this_doesnt_exist"]}}
with pytest.raises(RuntimeEnvSetupError):
ray.get(f.options(runtime_env=bad_env).remote())
first_time = time.time() - start
# Check that another valid task can run.
ray.get(f.remote())
# The second time this runs it should be faster as the error is cached.
start = time.time()
with pytest.raises(RuntimeEnvSetupError):
ray.get(f.options(runtime_env=bad_env).remote())
assert (time.time() - start) < (first_time / 2.0)
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-sv", __file__]))

View file

@@ -505,7 +505,23 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
pending_lease_request = std::make_pair(nullptr, TaskID::Nil());
if (status.ok()) {
if (reply.canceled()) {
if (reply.runtime_env_setup_failed()) {
// If the runtime_env failed to be set up, we fail all of the pending
// tasks in the queue. This makes an implicit assumption that runtime_env
// failures are not transient -- we may consider adding some retries
// in the future.
auto &task_queue = scheduling_key_entry.task_queue;
while (!task_queue.empty()) {
auto &task_spec = task_queue.front();
RAY_UNUSED(task_finisher_->MarkPendingTaskFailed(
task_spec.TaskId(), task_spec, rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED,
nullptr));
task_queue.pop_front();
}
if (scheduling_key_entry.CanDelete()) {
scheduling_key_entries_.erase(scheduling_key);
}
} else if (reply.canceled()) {
RAY_LOG(DEBUG) << "Lease canceled " << task_id;
RequestNewWorkerIfNeeded(scheduling_key);
} else if (!reply.worker_address().raylet_id().empty()) {
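
The comment above captures the client-side policy: a runtime_env setup failure is assumed to be non-transient, so every task queued under the same scheduling key is failed at once with RUNTIME_ENV_SETUP_FAILED, whereas an ordinary canceled lease still triggers a retry. A rough Python analogue of that branch (illustrative names, not Ray's actual C++ API):

from collections import deque

RUNTIME_ENV_SETUP_FAILED = "RUNTIME_ENV_SETUP_FAILED"  # mirrors the new ErrorType value


def handle_lease_reply(reply, task_queue: deque, mark_task_failed, request_new_worker):
    if reply.runtime_env_setup_failed:
        # Non-transient failure: drain the queue and fail every pending task.
        while task_queue:
            task_spec = task_queue.popleft()
            mark_task_failed(task_spec, RUNTIME_ENV_SETUP_FAILED)
    elif reply.canceled:
        # Transient cancellation: just request a new worker lease and retry.
        request_new_worker()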

View file

@@ -115,8 +115,10 @@ enum ErrorType {
OBJECT_IN_PLASMA = 4;
// Indicates that an object has been cancelled.
TASK_CANCELLED = 5;
// Inidicates that creating the GCS service failed to create the actor.
// Indicates that the GCS service failed to create the actor.
ACTOR_CREATION_FAILED = 6;
// Indicates that the runtime_env failed to be created.
RUNTIME_ENV_SETUP_FAILED = 7;
}
/// The task exception encapsulates all information about task

View file

@@ -38,8 +38,11 @@ message RequestWorkerLeaseReply {
// Whether this lease request was canceled. In this case, the
// client should try again if the resources are still required.
bool canceled = 4;
// Whether creating the runtime environment for this lease request failed.
// If this is true, the corresponding task should be failed by the client.
bool runtime_env_setup_failed = 5;
// PID of the worker process.
uint32 worker_pid = 5;
uint32 worker_pid = 6;
}
message PrepareBundleResourcesRequest {

View file

@@ -183,6 +183,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
config.worker_commands,
/*starting_worker_timeout_callback=*/
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
/*runtime_env_setup_failed_callback=*/
[this](const TaskID &task_id) {
RAY_CHECK(cluster_task_manager_->CancelTask(
task_id, /*runtime_env_setup_failed=*/true));
},
config.ray_debugger_external,
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
client_call_manager_(io_service),

View file

@@ -468,14 +468,16 @@ void ClusterTaskManager::ReturnWorkerResources(std::shared_ptr<WorkerInterface>
ReleaseWorkerResources(worker);
}
void ReplyCancelled(Work &work) {
void ReplyCancelled(Work &work, bool runtime_env_setup_failed) {
auto reply = std::get<1>(work);
auto callback = std::get<2>(work);
reply->set_canceled(true);
reply->set_runtime_env_setup_failed(runtime_env_setup_failed);
callback();
}
bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
bool ClusterTaskManager::CancelTask(const TaskID &task_id,
bool runtime_env_setup_failed) {
// TODO(sang): There is a lot of repetitive code around task backlogs. We should
// refactor it.
for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end();
@@ -486,7 +488,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
if (task.GetTaskSpecification().TaskId() == task_id) {
RemoveFromBacklogTracker(task);
RAY_LOG(DEBUG) << "Canceling task " << task_id;
ReplyCancelled(*work_it);
ReplyCancelled(*work_it, runtime_env_setup_failed);
work_queue.erase(work_it);
if (work_queue.empty()) {
tasks_to_schedule_.erase(shapes_it);
@@ -502,7 +504,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
const auto &task = std::get<0>(*work_it);
if (task.GetTaskSpecification().TaskId() == task_id) {
RemoveFromBacklogTracker(task);
ReplyCancelled(*work_it);
ReplyCancelled(*work_it, runtime_env_setup_failed);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
@@ -523,7 +525,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
const auto &task = std::get<0>(*work_it);
if (task.GetTaskSpecification().TaskId() == task_id) {
RemoveFromBacklogTracker(task);
ReplyCancelled(*work_it);
ReplyCancelled(*work_it, runtime_env_setup_failed);
work_queue.erase(work_it);
if (work_queue.empty()) {
infeasible_tasks_.erase(shapes_it);
@@ -537,7 +539,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
if (iter != waiting_tasks_index_.end()) {
const auto &task = std::get<0>(*iter->second);
RemoveFromBacklogTracker(task);
ReplyCancelled(*iter->second);
ReplyCancelled(*iter->second, runtime_env_setup_failed);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
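
On the raylet side, ReplyCancelled now threads the failure reason into the lease reply, so the same CancelTask path covers both ordinary cancellation and env-setup failure; the client uses the extra flag to decide between retrying the lease and failing the task. In rough Python terms (illustrative, not the generated protobuf API):

def reply_cancelled(reply, send_reply, runtime_env_setup_failed: bool):
    # The reply is marked canceled either way; the extra flag tells the client
    # whether this is a retryable cancellation or a permanent env-setup failure.
    reply.canceled = True
    reply.runtime_env_setup_failed = runtime_env_setup_failed
    send_reply()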

View file

@@ -100,10 +100,12 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
/// \param runtime_env_setup_failed: If this is being cancelled because the env setup
/// failed.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id) override;
bool CancelTask(const TaskID &task_id, bool runtime_env_setup_failed = false) override;
/// Populate the list of pending or infeasible actor tasks for node stats.
///

View file

@@ -88,10 +88,13 @@ class ClusterTaskManagerInterface {
/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
/// \param runtime_env_setup_failed: If this is being cancelled because the env setup
/// failed.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
virtual bool CancelTask(const TaskID &task_id) = 0;
virtual bool CancelTask(const TaskID &task_id,
bool runtime_env_setup_failed = false) = 0;
/// Queue task and schedule. This happens when processing the worker lease request.
///

View file

@@ -57,14 +57,14 @@ namespace ray {
namespace raylet {
WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id,
WorkerPool::WorkerPool(
instrumented_io_context &io_service, const NodeID node_id,
const std::string node_address, int num_workers_soft_limit,
int num_initial_python_workers_for_first_job,
int maximum_startup_concurrency, int min_worker_port,
int max_worker_port, const std::vector<int> &worker_ports,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
int num_initial_python_workers_for_first_job, int maximum_startup_concurrency,
int min_worker_port, int max_worker_port, const std::vector<int> &worker_ports,
std::shared_ptr<gcs::GcsClient> gcs_client, const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback,
std::function<void(const TaskID &)> runtime_env_setup_failed_callback,
int ray_debugger_external, const std::function<double()> get_time)
: io_service_(&io_service),
node_id_(node_id),
@@ -73,6 +73,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
maximum_startup_concurrency_(maximum_startup_concurrency),
gcs_client_(std::move(gcs_client)),
starting_worker_timeout_callback_(starting_worker_timeout_callback),
runtime_env_setup_failed_callback_(runtime_env_setup_failed_callback),
ray_debugger_external(ray_debugger_external),
first_job_registered_python_worker_count_(0),
first_job_driver_wait_num_python_workers_(std::min(
@@ -935,20 +936,19 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
state.tasks_with_pending_runtime_envs.emplace(task_spec.TaskId());
agent_manager_->CreateRuntimeEnv(
task_spec.JobId(), task_spec.SerializedRuntimeEnv(),
[start_worker_process_fn, &state, task_spec, dynamic_options,
[this, start_worker_process_fn, &state, task_spec, dynamic_options,
allocated_instances_serialized_json](
bool done, const std::string &serialized_runtime_env_context) {
bool success, const std::string &serialized_runtime_env_context) {
state.tasks_with_pending_runtime_envs.erase(task_spec.TaskId());
if (!done) {
// TODO(guyang.sgy): Reschedule to other nodes when create runtime env
// failed.
RAY_LOG(ERROR) << "Create runtime env(for dedicated actor) rpc failed. "
"Wait for next time to retry or reschedule.";
return;
}
if (success) {
start_worker_process_fn(
task_spec, state, dynamic_options, true, GetRuntimeEnvHash(task_spec),
task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context);
} else {
RAY_LOG(ERROR) << "Creating runtime environment failed. The "
"corresponding task will be failed.";
runtime_env_setup_failed_callback_(task_spec.TaskId());
}
});
} else {
proc =
@@ -995,16 +995,17 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
// create runtime env.
agent_manager_->CreateRuntimeEnv(
task_spec.JobId(), task_spec.SerializedRuntimeEnv(),
[start_worker_process_fn, &state, task_spec, runtime_env_hash](
[this, start_worker_process_fn, &state, task_spec, runtime_env_hash](
bool successful, const std::string &serialized_runtime_env_context) {
if (!successful) {
// TODO(guyang.sgy): Reschedule to other nodes when create runtime env
// failed.
return;
}
if (successful) {
start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context);
} else {
RAY_LOG(ERROR) << "Creating runtime environment failed. The "
"corresponding task will be failed.";
runtime_env_setup_failed_callback_(task_spec.TaskId());
}
});
} else {
proc = start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, "",
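
Both PopWorker paths above now share the same callback contract: on success the worker process is started with the serialized context, and on failure the new runtime_env_setup_failed_callback_ hands the task ID back to the node manager, which cancels it with runtime_env_setup_failed=true rather than leaving the task pending as before. A condensed Python sketch of that contract (illustrative names only):

def on_runtime_env_created(success: bool, serialized_context: str,
                           start_worker, fail_task, task_id):
    if success:
        # Env is ready: launch the worker with the serialized context.
        start_worker(serialized_context)
    else:
        # Propagate the failure so the raylet cancels the lease and the client
        # fails the task with RUNTIME_ENV_SETUP_FAILED.
        fail_task(task_id)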

View file

@@ -137,6 +137,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback,
std::function<void(const TaskID &)> runtime_env_setup_failed_callback,
int ray_debugger_external, const std::function<double()> get_time);
/// Destructor responsible for freeing a set of workers owned by this class.
@@ -546,6 +547,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The callback that will be triggered once starting a worker times out.
std::function<void()> starting_worker_timeout_callback_;
/// The callback that will be triggered when a runtime_env setup for a task fails.
std::function<void(const TaskID &)> runtime_env_setup_failed_callback_;
/// If 1, expose Ray debuggers started by the workers externally (to this node).
int ray_debugger_external;
FRIEND_TEST(WorkerPoolTest, InitialWorkerProcessCount);

View file

@@ -98,7 +98,8 @@ class WorkerPoolMock : public WorkerPool {
const WorkerCommandMap &worker_commands)
: WorkerPool(io_service, NodeID::FromRandom(), "", POOL_SIZE_SOFT_LIMIT, 0,
MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands,
[]() {}, 0, [this]() { return current_time_ms_; }),
[]() {}, [](const TaskID &) {}, 0,
[this]() { return current_time_ms_; }),
last_worker_process_() {
SetNodeManagerPort(1);
}