[Runtime Env] Plumbing runtime env failure error message to the exception: Task [1/3] (#22032)

This PR is the first step toward a more informative runtime env exception. After all three PRs are merged, we can entirely turn off the runtime env logs streamed to drivers.

This first PR handles only the task exception; a sketch of the user-visible change follows the checklist below.

TODO
- [x] Task (this PR)
- [ ] Actor
- [ ] Turn off runtime env logs & improve error msgs
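Roughly, the user-visible effect of this first PR is that a task whose runtime env fails to be set up now raises `RuntimeEnvSetupError` carrying the agent's error message, instead of a generic one-liner. A minimal sketch mirroring the updated `test_invalid_conda_env` below; the exact text after the "Failed to setup runtime environment." header depends on the conda failure:

```python
import ray
from ray.exceptions import RuntimeEnvSetupError

ray.init()

@ray.remote
def f():
    return 1

# A conda env with a nonexistent package fails to build.
bad_env = {"conda": {"dependencies": ["this_doesnt_exist"]}}

try:
    ray.get(f.options(runtime_env=bad_env).remote())
except RuntimeEnvSetupError as e:
    # Previously: "The runtime_env failed to be set up."
    # Now the message starts with "Failed to setup runtime environment."
    # and includes the agent-side error (e.g. conda's "ResolvePackageNotFound").
    print(e)
```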
SangBin Cho 2022-02-04 09:47:04 +09:00 committed by GitHub
parent dd935874ee
commit d7fc7d2e9d
20 changed files with 227 additions and 115 deletions


@ -156,7 +156,7 @@ def test_runtime_env_setup_failure(job_sdk_client):
wait_for_condition(_check_job_failed, client=client, job_id=job_id)
status = client.get_job_status(job_id)
assert "The runtime_env failed to be set up" in status.message
assert "Failed to setup runtime environment" in status.message
def test_submit_job_with_exception_in_driver(job_sdk_client):


@ -5,6 +5,7 @@ import json
import logging
import os
import time
import traceback
from typing import Dict, Set
from ray._private.utils import import_attr
@ -261,9 +262,10 @@ class RuntimeEnvAgent(
serialized_env, request.serialized_allocated_resource_instances
)
break
except Exception as ex:
self._logger.exception("Runtime env creation failed.")
error_message = str(ex)
except Exception:
err_msg = f"Failed to create runtime env {serialized_env}."
self._logger.exception(err_msg)
error_message = f"{err_msg}\n{traceback.format_exc()}"
await asyncio.sleep(
runtime_env_consts.RUNTIME_ENV_RETRY_INTERVAL_MS / 1000
)
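For context, the agent hunk above keeps the existing retry loop but now captures the full traceback into the error message it later reports back to the raylet. A minimal sketch of that pattern in isolation (the `create_env` callable and retry constants here are placeholders, not the agent's real API):

```python
import asyncio
import traceback

async def create_with_retries(create_env, serialized_env, num_retries=3,
                              retry_interval_s=1.0):
    """Retry env creation; keep the last traceback as the reported error."""
    error_message = None
    for _ in range(num_retries):
        try:
            return await create_env(serialized_env), None
        except Exception:
            err_msg = f"Failed to create runtime env {serialized_env}."
            # The real agent also logs this via self._logger.exception(err_msg).
            error_message = f"{err_msg}\n{traceback.format_exc()}"
            await asyncio.sleep(retry_interval_s)
    # After exhausting retries, error_message is sent back in the RPC reply.
    return None, error_message
```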


@ -110,8 +110,11 @@ def create_conda_env_if_needed(
logger.info(f"Creating conda environment {prefix}")
exit_code, output = exec_cmd_stream_to_logger(create_cmd, logger)
if exit_code != 0:
shutil.rmtree(prefix)
raise RuntimeError(f"Failed to install conda environment {prefix}:\n{output}")
if os.path.exists(prefix):
shutil.rmtree(prefix)
raise RuntimeError(
f"Failed to install conda environment {prefix}:\nOutput:\n{output}"
)
def delete_conda_env(prefix: str, logger: Optional[logging.Logger] = None) -> bool:
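The conda hunk above makes two changes: it only removes the prefix directory if it actually exists, and it includes the command output in the raised error. A rough standalone sketch of the same clean-up-and-raise pattern, using plain `subprocess` rather than Ray's internal exec helper:

```python
import os
import shutil
import subprocess

def create_env_or_raise(create_cmd, prefix):
    proc = subprocess.run(create_cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        # Remove the partially created environment, if any.
        if os.path.exists(prefix):
            shutil.rmtree(prefix)
        raise RuntimeError(
            f"Failed to install conda environment {prefix}:\n"
            f"Output:\n{proc.stdout}{proc.stderr}"
        )
```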


@ -522,10 +522,21 @@ class AsyncioActorExit(RayError):
class RuntimeEnvSetupError(RayError):
"""Raised when a runtime environment fails to be set up."""
"""Raised when a runtime environment fails to be set up.
params:
error_message: The error message that explains
why runtime env setup has failed.
"""
def __init__(self, error_message: str = None):
self.error_message = error_message
def __str__(self):
return "The runtime_env failed to be set up."
msgs = ["Failed to setup runtime environment."]
if self.error_message:
msgs.append(self.error_message)
return "\n".join(msgs)
class TaskPlacementGroupRemoved(RayError):
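With the new `__str__`, the detailed message is appended below a generic header. A quick illustration using the class exactly as defined above:

```python
from ray.exceptions import RuntimeEnvSetupError

err = RuntimeEnvSetupError(
    error_message="ResolvePackageNotFound:\n  - this_doesnt_exist"
)
print(str(err))
# Failed to setup runtime environment.
# ResolvePackageNotFound:
#   - this_doesnt_exist
```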


@ -200,14 +200,19 @@ class SerializationContext:
raise DeserializationError()
return obj
def _deserialize_actor_died_error(self, data, metadata_fields):
if not data:
return RayActorError()
def _deserialize_error_info(self, data, metadata_fields):
assert data
pb_bytes = self._deserialize_msgpack_data(data, metadata_fields)
assert pb_bytes
ray_error_info = RayErrorInfo()
ray_error_info.ParseFromString(pb_bytes)
return ray_error_info
def _deserialize_actor_died_error(self, data, metadata_fields):
if not data:
return RayActorError()
ray_error_info = self._deserialize_error_info(data, metadata_fields)
assert ray_error_info.HasField("actor_died_error")
if ray_error_info.actor_died_error.HasField("creation_task_failure_context"):
return RayError.from_ray_exception(
@ -291,7 +296,12 @@ class SerializationContext:
object_ref.hex(), object_ref.owner_address(), object_ref.call_site()
)
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError()
error_info = self._deserialize_error_info(data, metadata_fields)
# TODO(sang): Assert instead once actor also reports error messages.
error_msg = ""
if error_info.HasField("runtime_env_setup_failed_error"):
error_msg = error_info.runtime_env_setup_failed_error.error_message
return RuntimeEnvSetupError(error_message=error_msg)
elif error_type == ErrorType.Value("TASK_PLACEMENT_GROUP_REMOVED"):
return TaskPlacementGroupRemoved()
elif error_type == ErrorType.Value("ACTOR_PLACEMENT_GROUP_REMOVED"):


@ -137,7 +137,11 @@ def test_invalid_conda_env(shutdown_only):
start = time.time()
bad_env = {"conda": {"dependencies": ["this_doesnt_exist"]}}
with pytest.raises(RuntimeEnvSetupError):
with pytest.raises(
RuntimeEnvSetupError,
# The actual error message should be included in the exception.
match="ResolvePackageNotFound",
):
ray.get(f.options(runtime_env=bad_env).remote())
first_time = time.time() - start
@ -150,7 +154,7 @@ def test_invalid_conda_env(shutdown_only):
# The second time this runs it should be faster as the error is cached.
start = time.time()
with pytest.raises(RuntimeEnvSetupError):
with pytest.raises(RuntimeEnvSetupError, match="ResolvePackageNotFound"):
ray.get(f.options(runtime_env=bad_env).remote())
assert (time.time() - start) < (first_time / 2.0)
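The `match=` argument of `pytest.raises` is applied with `re.search` against the string form of the exception, so matching on the bare substring "ResolvePackageNotFound" works only because the exception now embeds the agent's output. A tiny self-contained illustration (with a stand-in exception class):

```python
import pytest

class FakeSetupError(Exception):
    pass

def test_match_searches_exception_text():
    with pytest.raises(FakeSetupError, match="ResolvePackageNotFound"):
        raise FakeSetupError(
            "Failed to setup runtime environment.\n"
            "ResolvePackageNotFound: this_doesnt_exist"
        )
```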


@ -33,7 +33,10 @@ class MockClusterTaskManagerInterface : public ClusterTaskManagerInterface {
(const, override));
MOCK_METHOD(void, TaskFinished,
(std::shared_ptr<WorkerInterface> worker, RayTask *task), (override));
MOCK_METHOD(bool, CancelTask, (const TaskID &task_id, bool runtime_env_setup_failed),
MOCK_METHOD(bool, CancelTask,
(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message),
(override));
MOCK_METHOD(void, QueueAndScheduleTask,
(const RayTask &task, rpc::RequestWorkerLeaseReply *reply,


@ -382,8 +382,12 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
if (reply.failure_type() ==
rpc::RequestWorkerLeaseReply::
SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED) {
rpc::RayErrorInfo error_info;
error_info.mutable_runtime_env_setup_failed_error()->set_error_message(
reply.scheduling_failure_message());
RAY_UNUSED(task_finisher_->FailPendingTask(
task_spec.TaskId(), rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED));
task_spec.TaskId(), rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED,
/*status*/ nullptr, &error_info));
} else {
if (task_spec.IsActorCreationTask()) {
RAY_UNUSED(task_finisher_->FailPendingTask(


@ -1047,7 +1047,7 @@ void GcsActorManager::OnActorSchedulingFailed(
case rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED:
error_msg =
"Could not create the actor because its associated runtime env failed to be "
"created..";
"created.";
break;
default:
RAY_LOG(FATAL) << "Unknown error, failure type "


@ -186,6 +186,7 @@ enum ErrorType {
message RayErrorInfo {
oneof error {
ActorDeathCause actor_died_error = 2;
RuntimeEnvFailedContext runtime_env_setup_failed_error = 3;
}
}
@ -204,13 +205,13 @@ message ActorDeathCause {
oneof context {
// Indicates that this actor is marked as DEAD due to actor creation task failure.
RayException creation_task_failure_context = 1;
ActorDeathRuntimeEnvFailedContext runtime_env_failed_context = 2;
RuntimeEnvFailedContext runtime_env_failed_context = 2;
ActorDiedErrorContext actor_died_error_context = 3;
}
}
// ---Actor death contexts start----
// Indicates that this actor is marked as DEAD due to runtime environment setup failure.
message ActorDeathRuntimeEnvFailedContext {
message RuntimeEnvFailedContext {
// TODO(sang,lixin) Get this error message from agent.
string error_message = 1;
}
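The renamed `RuntimeEnvFailedContext` is what the core worker fills into `RayErrorInfo` and what `_deserialize_error_info` parses back out. A hedged round-trip sketch with the generated Python bindings, assuming the usual `ray.core.generated.common_pb2` module path:

```python
from ray.core.generated.common_pb2 import RayErrorInfo

# Sender side: attach the runtime env setup error message.
info = RayErrorInfo()
info.runtime_env_setup_failed_error.error_message = "Failed to create runtime env ..."
payload = info.SerializeToString()

# Receiver side (mirrors _deserialize_error_info above).
parsed = RayErrorInfo()
parsed.ParseFromString(payload)
assert parsed.HasField("runtime_env_setup_failed_error")
print(parsed.runtime_env_setup_failed_error.error_message)
```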


@ -55,15 +55,17 @@ message RequestWorkerLeaseRequest {
message RequestWorkerLeaseReply {
enum SchedulingFailureType {
// The default failure type is "not failed".
NOT_FAILED = 0;
// Scheduling is failed on this node.
SCHEDULING_FAILED = 0;
SCHEDULING_FAILED = 1;
// Scheduling is cancelled because task/actor's placement is removed.
SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED = 1;
SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED = 2;
// Scheduling is cancelled because task/actor's runtime environment setup is failed
SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED = 2;
SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED = 3;
// Scheduling is cancelled because task/actor is intentionally cancelled. E.g.,
// ray.kill or ray.cancel
SCHEDULING_CANCELLED_INTENDED = 3;
SCHEDULING_CANCELLED_INTENDED = 4;
}
// Address of the leased worker. If this is empty, then the request should be
@ -73,8 +75,7 @@ message RequestWorkerLeaseReply {
Address retry_at_raylet_address = 2;
// Resource mapping ids acquired by the leased worker.
repeated ResourceMapEntry resource_mapping = 3;
// Whether this lease request was canceled. In this case, the
// client should try again if the resources are still required.
// Whether this lease request was canceled.
bool canceled = 4;
// PID of the worker process.
uint32 worker_pid = 6;
@ -83,7 +84,11 @@ message RequestWorkerLeaseReply {
// The (normal task) resources data to be carried by the Reply.
ResourcesData resources_data = 8;
// Scheduling failure type.
// Must be set only when canceled is set.
SchedulingFailureType failure_type = 9;
// The error message explaining why scheduling has failed.
// Must be an empty string if failure_type is `NOT_FAILED`.
string scheduling_failure_message = 10;
}
message PrepareBundleResourcesRequest {


@ -133,16 +133,20 @@ void AgentManager::CreateRuntimeEnv(
CreateRuntimeEnvCallback callback) {
// If the agent cannot be started, fail the request.
if (!should_start_agent_) {
RAY_LOG(ERROR) << "Not all required Ray dependencies for the runtime_env "
"feature were found. To install the required dependencies, "
<< "please run `pip install \"ray[default]\"`.";
std::stringstream str_stream;
str_stream << "Not all required Ray dependencies for the runtime_env "
"feature were found. To install the required dependencies, "
<< "please run `pip install \"ray[default]\"`.";
const auto &error_message = str_stream.str();
RAY_LOG(ERROR) << error_message;
// Execute the callback after the currently executing callback finishes. Otherwise
// the task may be erased from the dispatch queue during the queue iteration in
// ClusterTaskManager::DispatchScheduledTasksToWorkers(), invalidating the iterator
// and causing a segfault.
delay_executor_(
[callback] {
callback(/*successful=*/false, /*serialized_runtime_env_context=*/"");
[callback = std::move(callback), error_message] {
callback(/*successful=*/false, /*serialized_runtime_env_context=*/"",
/*setup_error_message*/ error_message);
},
0);
return;
@ -151,12 +155,17 @@ void AgentManager::CreateRuntimeEnv(
if (runtime_env_agent_client_ == nullptr) {
// If the agent cannot be restarted anymore, fail the request.
if (agent_restart_count_ >= RayConfig::instance().agent_max_restart_count()) {
RAY_LOG(WARNING) << "Runtime environment " << serialized_runtime_env
<< " cannot be created on this node because the agent is dead.";
std::stringstream str_stream;
str_stream << "Runtime environment " << serialized_runtime_env
<< " cannot be created on this node because the agent is dead.";
const auto &error_message = str_stream.str();
RAY_LOG(WARNING) << error_message;
delay_executor_(
[callback, serialized_runtime_env] {
[callback = std::move(callback),
serialized_runtime_env = std::move(serialized_runtime_env), error_message] {
callback(/*successful=*/false,
/*serialized_runtime_env_context=*/serialized_runtime_env);
/*serialized_runtime_env_context=*/serialized_runtime_env,
/*setup_error_message*/ error_message);
},
0);
return;
@ -167,7 +176,7 @@ void AgentManager::CreateRuntimeEnv(
<< serialized_runtime_env;
delay_executor_(
[this, job_id, serialized_runtime_env, serialized_allocated_resource_instances,
callback] {
callback = std::move(callback)] {
CreateRuntimeEnv(job_id, serialized_runtime_env,
serialized_allocated_resource_instances, callback);
},
@ -180,26 +189,29 @@ void AgentManager::CreateRuntimeEnv(
request.set_serialized_allocated_resource_instances(
serialized_allocated_resource_instances);
runtime_env_agent_client_->CreateRuntimeEnv(
request,
[this, job_id, serialized_runtime_env, serialized_allocated_resource_instances,
callback](const Status &status, const rpc::CreateRuntimeEnvReply &reply) {
request, [this, job_id, serialized_runtime_env,
serialized_allocated_resource_instances, callback = std::move(callback)](
const Status &status, const rpc::CreateRuntimeEnvReply &reply) {
if (status.ok()) {
if (reply.status() == rpc::AGENT_RPC_STATUS_OK) {
callback(true, reply.serialized_runtime_env_context());
callback(true, reply.serialized_runtime_env_context(),
/*setup_error_message*/ "");
} else {
RAY_LOG(ERROR) << "Failed to create runtime env: " << serialized_runtime_env
<< ", error message: " << reply.error_message();
callback(false, reply.serialized_runtime_env_context());
RAY_LOG(INFO) << "Failed to create runtime env: " << serialized_runtime_env
<< ", error message: " << reply.error_message();
callback(false, reply.serialized_runtime_env_context(),
/*setup_error_message*/ reply.error_message());
}
} else {
RAY_LOG(ERROR)
// TODO(sang): Invoke a callback if it fails more than X times.
RAY_LOG(INFO)
<< "Failed to create the runtime env: " << serialized_runtime_env
<< ", status = " << status
<< ", maybe there are some network problems, will retry it later.";
delay_executor_(
[this, job_id, serialized_runtime_env,
serialized_allocated_resource_instances, callback] {
serialized_allocated_resource_instances, callback = std::move(callback)] {
CreateRuntimeEnv(job_id, serialized_runtime_env,
serialized_allocated_resource_instances, callback);
},
@ -228,6 +240,7 @@ void AgentManager::DeleteURIs(const std::vector<std::string> &uris,
if (reply.status() == rpc::AGENT_RPC_STATUS_OK) {
callback(true);
} else {
// TODO(sang): Find a better way to deliver error messages in this case.
RAY_LOG(ERROR) << "Failed to delete URIs"
<< ", error message: " << reply.error_message();
callback(false);


@ -35,8 +35,14 @@ typedef std::function<std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
const std::string &ip_address, int port)>
RuntimeEnvAgentClientFactoryFn;
/// Callback that's called after the runtime env is created.
/// \param[in] successful Whether or not the creation was successful.
/// \param[in] serialized_runtime_env_context Serialized context.
/// \param[in] setup_error_message The error message if runtime env creation fails.
/// It must only be set when successful == false.
typedef std::function<void(bool successful,
const std::string &serialized_runtime_env_context)>
const std::string &serialized_runtime_env_context,
const std::string &setup_error_message)>
CreateRuntimeEnvCallback;
typedef std::function<void(bool successful)> DeleteURIsCallback;


@ -154,7 +154,8 @@ bool ClusterTaskManager::PoppedWorkerHandler(
const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const TaskID &task_id, SchedulingClass scheduling_class,
const std::shared_ptr<internal::Work> &work, bool is_detached_actor,
const rpc::Address &owner_address) {
const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message) {
const auto &reply = work->reply;
const auto &callback = work->callback;
bool canceled = work->GetState() == internal::WorkStatus::CANCELLED;
@ -246,7 +247,8 @@ bool ClusterTaskManager::PoppedWorkerHandler(
// `CancelTask`.
CancelTask(
task_id,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED);
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_RUNTIME_ENV_SETUP_FAILED,
/*scheduling_failure_message*/ runtime_env_setup_error_message);
} else {
// In other cases, set the work status `WAITING` to make this task
// could be re-dispatched.
@ -452,10 +454,11 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
worker_pool_.PopWorker(
spec,
[this, task_id, scheduling_class, work, is_detached_actor, owner_address](
const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) -> bool {
const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
return PoppedWorkerHandler(worker, status, task_id, scheduling_class, work,
is_detached_actor, owner_address);
is_detached_actor, owner_address,
runtime_env_setup_error_message);
},
allocated_instances_serialized_json);
work_it++;
@ -663,17 +666,20 @@ void ClusterTaskManager::ReleaseTaskArgs(const TaskID &task_id) {
}
void ReplyCancelled(std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type) {
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto reply = work->reply;
auto callback = work->callback;
reply->set_canceled(true);
reply->set_failure_type(failure_type);
reply->set_scheduling_failure_message(scheduling_failure_message);
callback();
}
bool ClusterTaskManager::CancelTask(
const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type) {
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
// TODO(sang): There are lots of repetitive code around task backlogs. We should
// refactor them.
for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end();
@ -683,7 +689,7 @@ bool ClusterTaskManager::CancelTask(
const auto &task = (*work_it)->task;
if (task.GetTaskSpecification().TaskId() == task_id) {
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from schedule queue.";
ReplyCancelled(*work_it, failure_type);
ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
work_queue.erase(work_it);
if (work_queue.empty()) {
tasks_to_schedule_.erase(shapes_it);
@ -699,7 +705,7 @@ bool ClusterTaskManager::CancelTask(
const auto &task = (*work_it)->task;
if (task.GetTaskSpecification().TaskId() == task_id) {
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(*work_it, failure_type);
ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
if ((*work_it)->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
@ -729,7 +735,7 @@ bool ClusterTaskManager::CancelTask(
const auto &task = (*work_it)->task;
if (task.GetTaskSpecification().TaskId() == task_id) {
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from infeasible queue.";
ReplyCancelled(*work_it, failure_type);
ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
work_queue.erase(work_it);
if (work_queue.empty()) {
infeasible_tasks_.erase(shapes_it);
@ -742,7 +748,7 @@ bool ClusterTaskManager::CancelTask(
auto iter = waiting_tasks_index_.find(task_id);
if (iter != waiting_tasks_index_.end()) {
const auto &task = (*iter->second)->task;
ReplyCancelled(*iter->second, failure_type);
ReplyCancelled(*iter->second, failure_type, scheduling_failure_message);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());


@ -203,10 +203,10 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(
const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED) override;
bool CancelTask(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "") override;
/// Populate the list of pending or infeasible actor tasks for node stats.
///
@ -294,7 +294,8 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
PopWorkerStatus status, const TaskID &task_id,
SchedulingClass scheduling_class,
const std::shared_ptr<internal::Work> &work,
bool is_detached_actor, const rpc::Address &owner_address);
bool is_detached_actor, const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message);
/// (Step 3) Attempts to dispatch all tasks which are ready to run. A task
/// will be dispatched if it is on `tasks_to_dispatch_` and there are still


@ -88,7 +88,8 @@ class ClusterTaskManagerInterface {
virtual bool CancelTask(
const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED) = 0;
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "") = 0;
/// Set the worker backlog size for a particular scheduling class.
///


@ -66,12 +66,15 @@ class MockWorkerPool : public WorkerPoolInterface {
return {};
}
void TriggerCallbacksWithNotOKStatus(PopWorkerStatus status) {
void TriggerCallbacksWithNotOKStatus(
PopWorkerStatus status, const std::string &runtime_env_setup_error_msg = "") {
RAY_CHECK(status != PopWorkerStatus::OK);
for (const auto &pair : callbacks) {
for (const auto &callback : pair.second) {
// No task should be dispatched.
ASSERT_FALSE(callback(nullptr, status));
ASSERT_FALSE(
callback(nullptr, status,
/*runtime_env_setup_error_msg*/ runtime_env_setup_error_msg));
}
}
callbacks.clear();
@ -88,7 +91,7 @@ class MockWorkerPool : public WorkerPoolInterface {
RAY_CHECK(!list.empty());
for (auto list_it = list.begin(); list_it != list.end();) {
auto &callback = *list_it;
dispatched = callback(worker, PopWorkerStatus::OK);
dispatched = callback(worker, PopWorkerStatus::OK, "");
list_it = list.erase(list_it);
if (dispatched) {
break;
@ -914,12 +917,15 @@ TEST_F(ClusterTaskManagerTest, NotOKPopWorkerTest) {
ASSERT_EQ(NumTasksToDispatchWithStatus(internal::WorkStatus::WAITING), 0);
ASSERT_EQ(NumRunningTasks(), 1);
// The task should be cancelled.
pool_.TriggerCallbacksWithNotOKStatus(PopWorkerStatus::RuntimeEnvCreationFailed);
const auto runtime_env_error_msg = "Runtime env error message";
pool_.TriggerCallbacksWithNotOKStatus(PopWorkerStatus::RuntimeEnvCreationFailed,
runtime_env_error_msg);
ASSERT_TRUE(callback_called);
ASSERT_EQ(NumTasksToDispatchWithStatus(internal::WorkStatus::WAITING_FOR_WORKER), 0);
ASSERT_EQ(NumTasksToDispatchWithStatus(internal::WorkStatus::WAITING), 0);
ASSERT_EQ(NumRunningTasks(), 0);
ASSERT_TRUE(reply.canceled());
ASSERT_EQ(reply.scheduling_failure_message(), runtime_env_error_msg);
AssertNoLeaks();
}


@ -172,6 +172,10 @@ void WorkerPool::SetAgentManager(std::shared_ptr<AgentManager> agent_manager) {
void WorkerPool::PopWorkerCallbackAsync(const PopWorkerCallback &callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) {
// This method shouldn't be invoked when runtime env creation has failed because
// in that case the callback is invoked immediately, not through this path.
RAY_CHECK(status != PopWorkerStatus::RuntimeEnvCreationFailed);
// Call back this function asynchronously to make sure executed in different stack.
io_service_->post([this, callback, worker,
status]() { PopWorkerCallbackInternal(callback, worker, status); },
@ -182,7 +186,7 @@ void WorkerPool::PopWorkerCallbackInternal(const PopWorkerCallback &callback,
std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) {
RAY_CHECK(callback);
auto used = callback(worker, status);
auto used = callback(worker, status, /*runtime_env_setup_error_message*/ "");
if (worker && !used) {
// The invalid worker not used, restore it to worker pool.
PushWorker(worker);
@ -613,14 +617,15 @@ void WorkerPool::HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job
<< ". The runtime environment was " << runtime_env << ".";
CreateRuntimeEnv(
runtime_env, job_id,
[job_id](bool successful, const std::string &serialized_runtime_env_context) {
[job_id](bool successful, const std::string &serialized_runtime_env_context,
const std::string &setup_error_message) {
if (successful) {
RAY_LOG(INFO) << "[Eagerly] Create runtime env successful for job " << job_id
<< ". The result context was " << serialized_runtime_env_context
<< ".";
} else {
RAY_LOG(ERROR) << "[Eagerly] Couldn't create a runtime environment for job "
<< job_id << ".";
<< job_id << ". Error message: " << setup_error_message;
}
});
}
@ -882,7 +887,11 @@ void WorkerPool::InvokePopWorkerCallbackForProcess(
*task_id = it->second.task_id;
const auto &callback = it->second.callback;
RAY_CHECK(callback);
*worker_used = callback(worker, status);
// This method shouldn't be invoked when runtime env creation has failed because
// in that case the callback is invoked immediately, not through this path.
RAY_CHECK(status != PopWorkerStatus::RuntimeEnvCreationFailed);
*worker_used = callback(worker, status, /*runtime_env_setup_error_message*/ "");
starting_workers_to_tasks.erase(it);
}
}
@ -1135,14 +1144,16 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
CreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
[this, start_worker_process_fn, callback, &state, task_spec, dynamic_options](
bool successful, const std::string &serialized_runtime_env_context) {
bool successful, const std::string &serialized_runtime_env_context,
const std::string &setup_error_message) {
if (successful) {
start_worker_process_fn(task_spec, state, dynamic_options, true,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context, callback);
} else {
process_failed_runtime_env_setup_failed_++;
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed,
/*runtime_env_setup_error_message*/ setup_error_message);
RAY_LOG(WARNING)
<< "Create runtime env failed for task " << task_spec.TaskId()
<< " and couldn't create the dedicated worker.";
@ -1194,14 +1205,16 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
CreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
[this, start_worker_process_fn, callback, &state, task_spec](
bool successful, const std::string &serialized_runtime_env_context) {
bool successful, const std::string &serialized_runtime_env_context,
const std::string &setup_error_message) {
if (successful) {
start_worker_process_fn(task_spec, state, {}, false,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context, callback);
} else {
process_failed_runtime_env_setup_failed_++;
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed,
/*runtime_env_setup_error_message*/ setup_error_message);
RAY_LOG(WARNING)
<< "Create runtime env failed for task " << task_spec.TaskId()
<< " and couldn't create the worker.";
@ -1493,20 +1506,21 @@ WorkerPool::IOWorkerState &WorkerPool::GetIOWorkerStateFromWorkerType(
void WorkerPool::CreateRuntimeEnv(
const std::string &serialized_runtime_env, const JobID &job_id,
const std::function<void(bool, const std::string &)> &callback,
const CreateRuntimeEnvCallback &callback,
const std::string &serialized_allocated_resource_instances) {
// create runtime env.
agent_manager_->CreateRuntimeEnv(
job_id, serialized_runtime_env, serialized_allocated_resource_instances,
[job_id, serialized_runtime_env, callback](
bool successful, const std::string &serialized_runtime_env_context) {
[job_id, serialized_runtime_env = std::move(serialized_runtime_env), callback](
bool successful, const std::string &serialized_runtime_env_context,
const std::string &setup_error_message) {
if (successful) {
callback(true, serialized_runtime_env_context);
callback(true, serialized_runtime_env_context, "");
} else {
RAY_LOG(WARNING) << "Couldn't create a runtime environment for job " << job_id
<< ". The runtime environment was " << serialized_runtime_env
<< ".";
callback(false, "");
callback(false, "", setup_error_message);
}
});
}


@ -61,10 +61,17 @@ enum PopWorkerStatus {
RuntimeEnvCreationFailed = 4,
};
/// \Return true if the worker was used. Otherwise, return false and the worker will be
/// returned to the worker pool.
/// \param[in] worker The started worker instance. Nullptr if worker is not started.
/// \param[in] status The pop worker status. OK if things go well. Otherwise, it will
/// contain the error status.
/// \param[in] runtime_env_setup_error_message The error message
/// when runtime env setup fails. This should be empty unless status ==
/// RuntimeEnvCreationFailed.
/// \return true if the worker was used. Otherwise, return false
/// and the worker will be returned to the worker pool.
using PopWorkerCallback = std::function<bool(
const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status)>;
const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message)>;
/// \class WorkerPoolInterface
///
@ -621,7 +628,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Create runtime env asynchronously by runtime env agent.
void CreateRuntimeEnv(
const std::string &serialized_runtime_env, const JobID &job_id,
const std::function<void(bool, const std::string &)> &callback,
const CreateRuntimeEnvCallback &callback,
const std::string &serialized_allocated_resource_instances = "{}");
void AddStartingWorkerProcess(


@ -33,6 +33,7 @@ int POOL_SIZE_SOFT_LIMIT = 5;
int WORKER_REGISTER_TIMEOUT_SECONDS = 3;
JobID JOB_ID = JobID::FromInt(1);
std::string BAD_RUNTIME_ENV = "bad runtime env";
const std::string BAD_RUNTIME_ENV_ERROR_MSG = "bad runtime env";
std::vector<Language> LANGUAGES = {Language::PYTHON, Language::JAVA};
@ -86,6 +87,7 @@ class MockRuntimeEnvAgentClient : public rpc::RuntimeEnvAgentClientInterface {
rpc::CreateRuntimeEnvReply reply;
if (request.serialized_runtime_env() == BAD_RUNTIME_ENV) {
reply.set_status(rpc::AGENT_RPC_STATUS_FAILED);
reply.set_error_message(BAD_RUNTIME_ENV_ERROR_MSG);
} else {
rpc::RuntimeEnv runtime_env;
if (google::protobuf::util::JsonStringToMessage(request.serialized_runtime_env(),
@ -314,23 +316,27 @@ class WorkerPoolMock : public WorkerPool {
// worker synchronously.
// \param[in] push_workers If true, tries to push the workers from the started
// processes.
std::shared_ptr<WorkerInterface> PopWorkerSync(const TaskSpecification &task_spec,
bool push_workers = true,
PopWorkerStatus *worker_status = nullptr,
int timeout_worker_number = 0) {
std::shared_ptr<WorkerInterface> PopWorkerSync(
const TaskSpecification &task_spec, bool push_workers = true,
PopWorkerStatus *worker_status = nullptr, int timeout_worker_number = 0,
std::string *runtime_env_error_msg = nullptr) {
std::shared_ptr<WorkerInterface> popped_worker = nullptr;
std::promise<bool> promise;
this->PopWorker(task_spec,
[&popped_worker, worker_status, &promise](
const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) -> bool {
popped_worker = worker;
if (worker_status != nullptr) {
*worker_status = status;
}
promise.set_value(true);
return true;
});
this->PopWorker(
task_spec,
[&popped_worker, worker_status, &promise, runtime_env_error_msg](
const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
popped_worker = worker;
if (worker_status != nullptr) {
*worker_status = status;
}
if (runtime_env_error_msg) {
*runtime_env_error_msg = runtime_env_setup_error_message;
}
promise.set_value(true);
return true;
});
if (push_workers) {
PushWorkers(timeout_worker_number);
}
@ -773,9 +779,10 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) {
// Try to pop some workers. Some worker processes will be started.
for (int i = 0; i < MAXIMUM_STARTUP_CONCURRENCY; i++) {
worker_pool_->PopWorker(task_spec,
[](const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) -> bool { return true; });
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool { return true; });
auto last_process = worker_pool_->LastStartedWorkerProcess();
RAY_CHECK(last_process.IsValid());
started_processes.push_back(last_process);
@ -783,9 +790,10 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) {
// Can't start a new worker process at this point.
ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting());
worker_pool_->PopWorker(task_spec,
[](const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) -> bool { return true; });
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool { return true; });
ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting());
std::vector<std::shared_ptr<WorkerInterface>> workers;
@ -803,9 +811,10 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) {
// Can't start a new worker process at this point.
ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting());
worker_pool_->PopWorker(task_spec,
[](const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) -> bool { return true; });
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool { return true; });
ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting());
// Call `OnWorkerStarted` to emulate worker port announcement.
@ -1622,7 +1631,8 @@ TEST_F(WorkerPoolTest, WorkerNoLeaks) {
// Pop a worker and don't dispatch.
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status) -> bool {
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
// Don't dispatch this worker.
return false;
});
@ -1637,7 +1647,8 @@ TEST_F(WorkerPoolTest, WorkerNoLeaks) {
// Pop a worker and don't dispatch.
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status) -> bool {
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
// Don't dispatch this worker.
return false;
});
@ -1647,7 +1658,8 @@ TEST_F(WorkerPoolTest, WorkerNoLeaks) {
// Pop a worker and dispatch.
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status) -> bool {
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
// Dispatch this worker.
return true;
});
@ -1665,9 +1677,10 @@ TEST_F(WorkerPoolTest, PopWorkerStatus) {
// Startup worker processes to maximum.
for (int i = 0; i < MAXIMUM_STARTUP_CONCURRENCY; i++) {
auto task_spec = ExampleTaskSpec();
worker_pool_->PopWorker(task_spec,
[](const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status) -> bool { return true; });
worker_pool_->PopWorker(
task_spec,
[](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool { return true; });
}
ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting());
@ -1705,11 +1718,13 @@ TEST_F(WorkerPoolTest, PopWorkerStatus) {
const auto task_spec_with_bad_runtime_env = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, job_id, ActorID::Nil(), {"XXX=YYY"},
TaskID::FromRandom(JobID::Nil()), ExampleRuntimeEnvInfoFromString(BAD_RUNTIME_ENV));
popped_worker =
worker_pool_->PopWorkerSync(task_spec_with_bad_runtime_env, true, &status);
std::string error_msg;
popped_worker = worker_pool_->PopWorkerSync(task_spec_with_bad_runtime_env, true,
&status, 0, &error_msg);
// PopWorker failed and the status is `RuntimeEnvCreationFailed`.
ASSERT_EQ(popped_worker, nullptr);
ASSERT_EQ(status, PopWorkerStatus::RuntimeEnvCreationFailed);
ASSERT_EQ(error_msg, BAD_RUNTIME_ENV_ERROR_MSG);
// Create a task with available runtime env.
const auto task_spec_with_runtime_env = ExampleTaskSpec(