Exit workers when raylet dies unexpectedly (#6014)

This commit is contained in:
Edward Oakes 2019-10-30 20:29:25 -07:00 committed by GitHub
parent e9e78871b9
commit 16e9dfd2e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 17 deletions

View file

@ -176,14 +176,21 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
} }
CoreWorker::~CoreWorker() { CoreWorker::~CoreWorker() {
io_service_.stop(); Shutdown();
io_thread_.join(); io_thread_.join();
if (worker_type_ == WorkerType::WORKER) { }
task_execution_service_.stop();
} void CoreWorker::Shutdown() {
if (log_dir_ != "") { if (!shutdown_) {
RayLog::ShutDownRayLog(); io_service_.stop();
if (worker_type_ == WorkerType::WORKER) {
task_execution_service_.stop();
}
if (log_dir_ != "") {
RayLog::ShutDownRayLog();
}
} }
shutdown_ = true;
} }
void CoreWorker::Disconnect() { void CoreWorker::Disconnect() {
@ -236,18 +243,21 @@ void CoreWorker::ReportActiveObjectIDs() {
absl::MutexLock lock(&object_ref_mu_); absl::MutexLock lock(&object_ref_mu_);
// Only send a heartbeat when the set of active object IDs has changed because the // Only send a heartbeat when the set of active object IDs has changed because the
// raylet only modifies the set of IDs when it receives a heartbeat. // raylet only modifies the set of IDs when it receives a heartbeat.
if (active_object_ids_updated_) { // TODO(edoakes): this is currently commented out because this heartbeat causes the
RAY_LOG(DEBUG) << "Sending " << active_object_ids_.size() << " object IDs to raylet."; // workers to die when the raylet crashes unexpectedly. Without this, they could
if (active_object_ids_.size() > // hang idle forever because they wait for the raylet to push tasks via gRPC.
RayConfig::instance().raylet_max_active_object_ids()) { // if (active_object_ids_updated_) {
RAY_LOG(WARNING) << active_object_ids_.size() RAY_LOG(DEBUG) << "Sending " << active_object_ids_.size() << " object IDs to raylet.";
<< "object IDs are currently in scope. " if (active_object_ids_.size() > RayConfig::instance().raylet_max_active_object_ids()) {
<< "This may lead to required objects being garbage collected."; RAY_LOG(WARNING) << active_object_ids_.size() << "object IDs are currently in scope. "
} << "This may lead to required objects being garbage collected.";
std::unordered_set<ObjectID> copy;
copy.insert(active_object_ids_.begin(), active_object_ids_.end());
RAY_CHECK_OK(raylet_client_->ReportActiveObjectIDs(copy));
} }
std::unordered_set<ObjectID> copy(active_object_ids_.begin(), active_object_ids_.end());
if (!raylet_client_->ReportActiveObjectIDs(copy).ok()) {
RAY_LOG(ERROR) << "Raylet connection failed. Shutting down.";
Shutdown();
}
// }
// Reset the timer from the previous expiration time to avoid drift. // Reset the timer from the previous expiration time to avoid drift.
heartbeat_timer_.expires_at( heartbeat_timer_.expires_at(

View file

@ -282,6 +282,10 @@ class CoreWorker {
/// Run the io_service_ event loop. This should be called in a background thread. /// Run the io_service_ event loop. This should be called in a background thread.
void RunIOService(); void RunIOService();
/// Shut down the worker completely.
/// \return void.
void Shutdown();
/// Send the list of active object IDs to the raylet. /// Send the list of active object IDs to the raylet.
void ReportActiveObjectIDs() LOCKS_EXCLUDED(object_ref_mu_); void ReportActiveObjectIDs() LOCKS_EXCLUDED(object_ref_mu_);
@ -361,6 +365,9 @@ class CoreWorker {
/// worker context. /// worker context.
TaskID main_thread_task_id_; TaskID main_thread_task_id_;
// Flag indicating whether this worker has been shut down.
bool shutdown_ = false;
/// Event loop where the IO events are handled. e.g. async GCS operations. /// Event loop where the IO events are handled. e.g. async GCS operations.
boost::asio::io_service io_service_; boost::asio::io_service io_service_;