mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Core] Better log messages for job failures (#20363)
<!-- Please add a reviewer to the assignee section when you create a PR. If you don't have access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? One user has an issue where one of the raylets can no longer schedule tasks because `num_worker_not_started_by_job_config_not_exist` > 0. This PR adds better log messages to help determine whether the root cause is that the job information is not properly propagated from GCS to the raylet through Redis pubsub. ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :(
This commit is contained in:
parent
2b3d0c691f
commit
137aec04c0
3 changed files with 24 additions and 3 deletions
|
@ -542,7 +542,9 @@ void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
|
||||||
}
|
}
|
||||||
|
|
||||||
void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) {
|
void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) {
|
||||||
RAY_LOG(DEBUG) << "HandleJobStarted for job " << job_id;
|
RAY_LOG(INFO) << "New job has started. Job id " << job_id << " Driver pid "
|
||||||
|
<< job_data.driver_pid() << " is dead: " << job_data.is_dead()
|
||||||
|
<< " driver address: " << job_data.driver_ip_address();
|
||||||
worker_pool_.HandleJobStarted(job_id, job_data.config());
|
worker_pool_.HandleJobStarted(job_id, job_data.config());
|
||||||
// NOTE: Technically `HandleJobStarted` isn't idempotent because we'll
|
// NOTE: Technically `HandleJobStarted` isn't idempotent because we'll
|
||||||
// increment the ref count multiple times. This is fine because
|
// increment the ref count multiple times. This is fine because
|
||||||
|
|
|
@ -193,6 +193,7 @@ Process WorkerPool::StartWorkerProcess(
|
||||||
RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet.";
|
RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet.";
|
||||||
// Will reschedule ready tasks in `NodeManager::HandleJobStarted`.
|
// Will reschedule ready tasks in `NodeManager::HandleJobStarted`.
|
||||||
*status = PopWorkerStatus::JobConfigMissing;
|
*status = PopWorkerStatus::JobConfigMissing;
|
||||||
|
process_failed_job_config_missing_++;
|
||||||
return Process();
|
return Process();
|
||||||
}
|
}
|
||||||
job_config = &it->second;
|
job_config = &it->second;
|
||||||
|
@ -215,6 +216,7 @@ Process WorkerPool::StartWorkerProcess(
|
||||||
<< " workers of language type " << static_cast<int>(language)
|
<< " workers of language type " << static_cast<int>(language)
|
||||||
<< " pending registration";
|
<< " pending registration";
|
||||||
*status = PopWorkerStatus::TooManyStartingWorkerProcesses;
|
*status = PopWorkerStatus::TooManyStartingWorkerProcesses;
|
||||||
|
process_failed_rate_limited_++;
|
||||||
return Process();
|
return Process();
|
||||||
}
|
}
|
||||||
// Either there are no workers pending registration or the worker start is being forced.
|
// Either there are no workers pending registration or the worker start is being forced.
|
||||||
|
@ -447,6 +449,7 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
|
||||||
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
|
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
|
||||||
<< ") have not registered to raylet within timeout.";
|
<< ") have not registered to raylet within timeout.";
|
||||||
PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration;
|
PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration;
|
||||||
|
process_failed_pending_registration_++;
|
||||||
bool found;
|
bool found;
|
||||||
bool used;
|
bool used;
|
||||||
TaskID task_id;
|
TaskID task_id;
|
||||||
|
@ -1067,13 +1070,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
|
||||||
// create runtime env.
|
// create runtime env.
|
||||||
CreateRuntimeEnv(
|
CreateRuntimeEnv(
|
||||||
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
|
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
|
||||||
[start_worker_process_fn, callback, &state, task_spec, dynamic_options](
|
[this, start_worker_process_fn, callback, &state, task_spec, dynamic_options](
|
||||||
bool successful, const std::string &serialized_runtime_env_context) {
|
bool successful, const std::string &serialized_runtime_env_context) {
|
||||||
if (successful) {
|
if (successful) {
|
||||||
start_worker_process_fn(task_spec, state, dynamic_options, true,
|
start_worker_process_fn(task_spec, state, dynamic_options, true,
|
||||||
task_spec.SerializedRuntimeEnv(),
|
task_spec.SerializedRuntimeEnv(),
|
||||||
serialized_runtime_env_context, callback);
|
serialized_runtime_env_context, callback);
|
||||||
} else {
|
} else {
|
||||||
|
process_failed_runtime_env_setup_failed_++;
|
||||||
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
|
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
|
||||||
RAY_LOG(WARNING)
|
RAY_LOG(WARNING)
|
||||||
<< "Create runtime env failed for task " << task_spec.TaskId()
|
<< "Create runtime env failed for task " << task_spec.TaskId()
|
||||||
|
@ -1125,13 +1129,14 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
|
||||||
// create runtime env.
|
// create runtime env.
|
||||||
CreateRuntimeEnv(
|
CreateRuntimeEnv(
|
||||||
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
|
task_spec.SerializedRuntimeEnv(), task_spec.JobId(),
|
||||||
[start_worker_process_fn, callback, &state, task_spec](
|
[this, start_worker_process_fn, callback, &state, task_spec](
|
||||||
bool successful, const std::string &serialized_runtime_env_context) {
|
bool successful, const std::string &serialized_runtime_env_context) {
|
||||||
if (successful) {
|
if (successful) {
|
||||||
start_worker_process_fn(task_spec, state, {}, false,
|
start_worker_process_fn(task_spec, state, {}, false,
|
||||||
task_spec.SerializedRuntimeEnv(),
|
task_spec.SerializedRuntimeEnv(),
|
||||||
serialized_runtime_env_context, callback);
|
serialized_runtime_env_context, callback);
|
||||||
} else {
|
} else {
|
||||||
|
process_failed_runtime_env_setup_failed_++;
|
||||||
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
|
callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed);
|
||||||
RAY_LOG(WARNING)
|
RAY_LOG(WARNING)
|
||||||
<< "Create runtime env failed for task " << task_spec.TaskId()
|
<< "Create runtime env failed for task " << task_spec.TaskId()
|
||||||
|
@ -1381,6 +1386,14 @@ std::unordered_set<std::shared_ptr<WorkerInterface>> WorkerPool::GetWorkersByPro
|
||||||
std::string WorkerPool::DebugString() const {
|
std::string WorkerPool::DebugString() const {
|
||||||
std::stringstream result;
|
std::stringstream result;
|
||||||
result << "WorkerPool:";
|
result << "WorkerPool:";
|
||||||
|
result << "\n- registered jobs: " << all_jobs_.size() - finished_jobs_.size();
|
||||||
|
result << "\n- process_failed_job_config_missing: "
|
||||||
|
<< process_failed_job_config_missing_;
|
||||||
|
result << "\n- process_failed_rate_limited: " << process_failed_rate_limited_;
|
||||||
|
result << "\n- process_failed_pending_registration: "
|
||||||
|
<< process_failed_pending_registration_;
|
||||||
|
result << "\n- process_failed_runtime_env_setup_failed: "
|
||||||
|
<< process_failed_runtime_env_setup_failed_;
|
||||||
for (const auto &entry : states_by_lang_) {
|
for (const auto &entry : states_by_lang_) {
|
||||||
result << "\n- num " << Language_Name(entry.first)
|
result << "\n- num " << Language_Name(entry.first)
|
||||||
<< " workers: " << entry.second.registered_workers.size();
|
<< " workers: " << entry.second.registered_workers.size();
|
||||||
|
|
|
@ -681,6 +681,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
|
||||||
const std::function<double()> get_time_;
|
const std::function<double()> get_time_;
|
||||||
/// Agent manager.
|
/// Agent manager.
|
||||||
std::shared_ptr<AgentManager> agent_manager_;
|
std::shared_ptr<AgentManager> agent_manager_;
|
||||||
|
|
||||||
|
/// Stats
|
||||||
|
int64_t process_failed_job_config_missing_ = 0;
|
||||||
|
int64_t process_failed_rate_limited_ = 0;
|
||||||
|
int64_t process_failed_pending_registration_ = 0;
|
||||||
|
int64_t process_failed_runtime_env_setup_failed_ = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace raylet
|
} // namespace raylet
|
||||||
|
|
Loading…
Add table
Reference in a new issue