diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5e5147460..f61404231 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -27,6 +27,7 @@ namespace ray { namespace core { +namespace { // Duration between internal book-keeping heartbeats. const uint64_t kInternalHeartbeatMillis = 1000; @@ -89,6 +90,16 @@ ObjectLocation CreateObjectLocation(const rpc::GetObjectLocationsOwnerReply &rep /// The global instance of `CoreWorkerProcess`. std::unique_ptr core_worker_process; +/// Teriminate the process without cleaning up the resources. +/// It will flush the log if logging_enabled is set to true. +void QuickExit(bool logging_enabled) { + if (logging_enabled) { + RayLog::ShutDownRayLog(); + } + _Exit(1); +} +} // namespace + thread_local std::weak_ptr CoreWorkerProcess::current_core_worker_; void CoreWorkerProcess::Initialize(const CoreWorkerOptions &options) { @@ -148,18 +159,8 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options) // NOTE(kfstorm): any initialization depending on RayConfig must happen after this line. InitializeSystemConfig(); - if (options_.num_workers == 1) { - // We need to create the worker instance here if: - // 1. This is a driver process. In this case, the driver is ready to use right after - // the CoreWorkerProcess::Initialize. - // 2. This is a Python worker process. In this case, Python will invoke some core - // worker APIs before `CoreWorkerProcess::RunTaskExecutionLoop` is called. So we need - // to create the worker instance here. One example of invocations is - // https://github.com/ray-project/ray/blob/45ce40e5d44801193220d2c546be8de0feeef988/python/ray/worker.py#L1281. - if (options_.worker_type == WorkerType::DRIVER || - options_.language == Language::PYTHON) { - CreateWorker(); - } + if (ShouldCreateGlobalWorkerOnConstruction()) { + CreateWorker(); } // Assume stats module will be initialized exactly once in once process. @@ -257,6 +258,18 @@ void CoreWorkerProcess::InitializeSystemConfig() { RayConfig::instance().initialize(promise.get_future().get()); } +bool CoreWorkerProcess::ShouldCreateGlobalWorkerOnConstruction() const { + // We need to create the worker instance here if: + // 1. This is a driver process. In this case, the driver is ready to use right after + // the CoreWorkerProcess::Initialize. + // 2. This is a Python worker process. In this case, Python will invoke some core + // worker APIs before `CoreWorkerProcess::RunTaskExecutionLoop` is called. So we need + // to create the worker instance here. One example of invocations is + // https://github.com/ray-project/ray/blob/45ce40e5d44801193220d2c546be8de0feeef988/python/ray/worker.py#L1281. + return options_.num_workers == 1 && (options_.worker_type == WorkerType::DRIVER || + options_.language == Language::PYTHON); +} + std::shared_ptr CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) { if (!core_worker_process) { return nullptr; @@ -273,6 +286,16 @@ CoreWorker &CoreWorkerProcess::GetCoreWorker() { EnsureInitialized(); if (core_worker_process->options_.num_workers == 1) { auto global_worker = core_worker_process->GetGlobalWorker(); + if (core_worker_process->ShouldCreateGlobalWorkerOnConstruction() && !global_worker) { + // This could only happen when the worker has already been shutdown. + // In this case, we should exit without crashing. + // TODO (scv119): A better solution could be returning error code + // and handling it at language frontend. + RAY_LOG(ERROR) << "The global worker has already been shutdown. This happens when " + "the language frontend accesses the Ray's worker after it is " + "shutdown. The process will exit"; + QuickExit(core_worker_process->options_.enable_logging); + } RAY_CHECK(global_worker) << "global_worker_ must not be NULL"; return *global_worker; } @@ -417,11 +440,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Avoid using FATAL log or RAY_CHECK here because they may create a core dump file. RAY_LOG(ERROR) << "Failed to register worker " << worker_id << " to Raylet. " << raylet_client_status; - if (options_.enable_logging) { - RayLog::ShutDownRayLog(); - } // Quit the process immediately. - _Exit(1); + QuickExit(options_.enable_logging); } connected_ = true; @@ -2882,13 +2902,10 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request, << " has received a force kill request after the cancellation. Killing " "a worker..."; Disconnect(); - if (options_.enable_logging) { - RayLog::ShutDownRayLog(); - } - // NOTE(hchen): Use `_Exit()` to force-exit this process without doing cleanup. + // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup. // `exit()` will destruct static objects in an incorrect order, which will lead to // core dumps. - _Exit(1); + QuickExit(options_.enable_logging); } } @@ -2921,13 +2938,10 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request, "please create the Java actor with some dynamic options to make it being " "hosted in a dedicated worker process."; } - if (options_.enable_logging) { - RayLog::ShutDownRayLog(); - } - // NOTE(hchen): Use `_Exit()` to force-exit this process without doing cleanup. + // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup. // `exit()` will destruct static objects in an incorrect order, which will lead to // core dumps. - _Exit(1); + QuickExit(options_.enable_logging); } else { Exit(rpc::WorkerExitType::INTENDED_EXIT); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b695e521a..b57b87c2c 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -291,6 +291,9 @@ class CoreWorkerProcess { void InitializeSystemConfig(); + /// Check that if the global worker should be created on construction. + bool ShouldCreateGlobalWorkerOnConstruction() const; + /// Get the `CoreWorker` instance by worker ID. /// /// \param[in] workerId The worker ID.