mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Core]delete shim pid flag (#21853)
Now that we have `startup-token` to identify the registering worker, the shim pid flag is no longer needed.
This commit is contained in:
parent
7fc1683bab
commit
51393abc16
20 changed files with 31 additions and 137 deletions
|
@ -1044,7 +1044,7 @@ cdef class CoreWorker:
|
|||
node_ip_address, node_manager_port, raylet_ip_address,
|
||||
local_mode, driver_name, stdout_file, stderr_file,
|
||||
serialized_job_config, metrics_agent_port, runtime_env_hash,
|
||||
worker_shim_pid, startup_token):
|
||||
startup_token):
|
||||
self.is_local_mode = local_mode
|
||||
|
||||
cdef CCoreWorkerOptions options = CCoreWorkerOptions()
|
||||
|
@ -1094,7 +1094,6 @@ cdef class CoreWorker:
|
|||
options.metrics_agent_port = metrics_agent_port
|
||||
options.connect_on_start = False
|
||||
options.runtime_env_hash = runtime_env_hash
|
||||
options.worker_shim_pid = worker_shim_pid
|
||||
options.startup_token = startup_token
|
||||
CCoreWorkerProcess.Initialize(options)
|
||||
|
||||
|
|
|
@ -318,7 +318,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
|||
int metrics_agent_port
|
||||
c_bool connect_on_start
|
||||
int runtime_env_hash
|
||||
int worker_shim_pid
|
||||
int startup_token
|
||||
|
||||
cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess":
|
||||
|
|
|
@ -35,12 +35,9 @@ parser.add_argument(
|
|||
|
||||
args, remaining_args = parser.parse_known_args()
|
||||
|
||||
# add worker-shim-pid argument
|
||||
remaining_args.append("--worker-shim-pid={}".format(os.getpid()))
|
||||
py_executable: str = sys.executable
|
||||
command_str = " ".join([f"exec {py_executable}"] + remaining_args)
|
||||
child_pid = os.fork()
|
||||
print(f"shim pid:{os.getpid()} , worker pid:{child_pid}")
|
||||
if child_pid == 0:
|
||||
# child process
|
||||
os.execvp("bash", ["bash", "-c", command_str])
|
||||
|
|
|
@ -774,12 +774,6 @@ def main():
|
|||
required=False,
|
||||
type=str,
|
||||
help="Password for connecting to Redis")
|
||||
parser.add_argument(
|
||||
"--worker-shim-pid",
|
||||
required=False,
|
||||
type=int,
|
||||
default=0,
|
||||
help="The PID of the process for setup worker runtime env.")
|
||||
parser.add_argument(
|
||||
"--metrics-agent-port",
|
||||
required=False,
|
||||
|
|
|
@ -1342,7 +1342,6 @@ def connect(node,
|
|||
namespace=None,
|
||||
job_config=None,
|
||||
runtime_env_hash=0,
|
||||
worker_shim_pid=0,
|
||||
startup_token=0,
|
||||
ray_debugger_external=False):
|
||||
"""Connect this worker to the raylet, to Plasma, and to Redis.
|
||||
|
@ -1358,8 +1357,6 @@ def connect(node,
|
|||
job_id: The ID of job. If it's None, then we will generate one.
|
||||
job_config (ray.job_config.JobConfig): The job configuration.
|
||||
runtime_env_hash (int): The hash of the runtime env for this worker.
|
||||
worker_shim_pid (int): The PID of the process for setup worker
|
||||
runtime env.
|
||||
startup_token (int): The startup token of the process assigned to
|
||||
it during startup as a command line argument.
|
||||
ray_debugger_host (bool): The host to bind a Ray debugger to on
|
||||
|
@ -1525,8 +1522,7 @@ def connect(node,
|
|||
gcs_options, logs_dir, node.node_ip_address, node.node_manager_port,
|
||||
node.raylet_ip_address, (mode == LOCAL_MODE), driver_name,
|
||||
log_stdout_file_path, log_stderr_file_path, serialized_job_config,
|
||||
node.metrics_agent_port, runtime_env_hash, worker_shim_pid,
|
||||
startup_token)
|
||||
node.metrics_agent_port, runtime_env_hash, startup_token)
|
||||
|
||||
# Notify raylet that the core worker is ready.
|
||||
worker.core_worker.notify_raylet()
|
||||
|
|
|
@ -123,12 +123,6 @@ parser.add_argument(
|
|||
type=int,
|
||||
default=0,
|
||||
help="The computed hash of the runtime env for this worker.")
|
||||
parser.add_argument(
|
||||
"--worker-shim-pid",
|
||||
required=False,
|
||||
type=int,
|
||||
default=0,
|
||||
help="The PID of the process for setup worker runtime env.")
|
||||
parser.add_argument(
|
||||
"--startup-token",
|
||||
required=True,
|
||||
|
@ -199,7 +193,6 @@ if __name__ == "__main__":
|
|||
node,
|
||||
mode=mode,
|
||||
runtime_env_hash=args.runtime_env_hash,
|
||||
worker_shim_pid=args.worker_shim_pid,
|
||||
startup_token=args.startup_token,
|
||||
ray_debugger_external=args.ray_debugger_external)
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import argparse
|
||||
import logging
|
||||
import os
|
||||
|
||||
from ray._private.runtime_env.context import RuntimeEnvContext
|
||||
from ray.core.generated.common_pb2 import Language
|
||||
|
@ -21,8 +20,6 @@ parser.add_argument(
|
|||
|
||||
if __name__ == "__main__":
|
||||
args, remaining_args = parser.parse_known_args()
|
||||
# NOTE(chenk008): we still need it to start worker in container.
|
||||
remaining_args.append("--worker-shim-pid={}".format(os.getpid()))
|
||||
# NOTE(edoakes): args.serialized_runtime_env_context is only None when
|
||||
# we're starting the main Ray client proxy server. That case should
|
||||
# probably not even go through this codepath.
|
||||
|
|
|
@ -26,8 +26,6 @@ class MockWorkerInterface : public WorkerInterface {
|
|||
MOCK_METHOD(WorkerID, WorkerId, (), (const, override));
|
||||
MOCK_METHOD(Process, GetProcess, (), (const, override));
|
||||
MOCK_METHOD(void, SetProcess, (Process proc), (override));
|
||||
MOCK_METHOD(Process, GetShimProcess, (), (const, override));
|
||||
MOCK_METHOD(void, SetShimProcess, (Process proc), (override));
|
||||
MOCK_METHOD(Language, GetLanguage, (), (const, override));
|
||||
MOCK_METHOD(const std::string, IpAddress, (), (const, override));
|
||||
MOCK_METHOD(void, Connect, (int port), (override));
|
||||
|
|
|
@ -112,8 +112,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
|
|||
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
|
||||
options_.worker_type, worker_context_.GetCurrentJobID(), options_.runtime_env_hash,
|
||||
options_.language, options_.node_ip_address, &raylet_client_status,
|
||||
&local_raylet_id, &assigned_port, &serialized_job_config, options_.worker_shim_pid,
|
||||
options_.startup_token);
|
||||
&local_raylet_id, &assigned_port, &serialized_job_config, options_.startup_token);
|
||||
|
||||
if (!raylet_client_status.ok()) {
|
||||
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
|
||||
|
|
|
@ -78,8 +78,7 @@ struct CoreWorkerOptions {
|
|||
serialized_job_config(""),
|
||||
metrics_agent_port(-1),
|
||||
connect_on_start(true),
|
||||
runtime_env_hash(0),
|
||||
worker_shim_pid(0) {}
|
||||
runtime_env_hash(0) {}
|
||||
|
||||
/// Type of this worker (i.e., DRIVER or WORKER).
|
||||
WorkerType worker_type;
|
||||
|
@ -161,8 +160,6 @@ struct CoreWorkerOptions {
|
|||
bool connect_on_start;
|
||||
/// The hash of the runtime env for this worker.
|
||||
int runtime_env_hash;
|
||||
/// The PID of the process for setup worker runtime env.
|
||||
pid_t worker_shim_pid;
|
||||
/// The startup token of the process assigned to it
|
||||
/// during startup via command line arguments.
|
||||
/// This is needed because the actual core worker process
|
||||
|
|
|
@ -123,8 +123,6 @@ table RegisterClientRequest {
|
|||
worker_id: string;
|
||||
// The process ID of this worker.
|
||||
worker_pid: long;
|
||||
// The PID of the process for setup worker runtime env.
|
||||
worker_shim_pid: long;
|
||||
// The startup token of the process assigned to
|
||||
// it during startup as a command line argument.
|
||||
startup_token: long;
|
||||
|
|
|
@ -1081,7 +1081,6 @@ void NodeManager::ProcessRegisterClientRequestMessage(
|
|||
const int runtime_env_hash = static_cast<int>(message->runtime_env_hash());
|
||||
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
|
||||
pid_t pid = message->worker_pid();
|
||||
pid_t worker_shim_pid = message->worker_shim_pid();
|
||||
StartupToken worker_startup_token = message->startup_token();
|
||||
std::string worker_ip_address = string_from_flatbuf(*message->ip_address());
|
||||
// TODO(suquark): Use `WorkerType` in `common.proto` without type converting.
|
||||
|
@ -1121,8 +1120,8 @@ void NodeManager::ProcessRegisterClientRequestMessage(
|
|||
worker_type == rpc::WorkerType::SPILL_WORKER ||
|
||||
worker_type == rpc::WorkerType::RESTORE_WORKER) {
|
||||
// Register the new worker.
|
||||
auto status = worker_pool_.RegisterWorker(worker, pid, worker_shim_pid,
|
||||
worker_startup_token, send_reply_callback);
|
||||
auto status = worker_pool_.RegisterWorker(worker, pid, worker_startup_token,
|
||||
send_reply_callback);
|
||||
if (!status.ok()) {
|
||||
// If the worker failed to register to Raylet, trigger task dispatching here to
|
||||
// allow new worker processes to be started (if capped by
|
||||
|
@ -1132,7 +1131,6 @@ void NodeManager::ProcessRegisterClientRequestMessage(
|
|||
} else {
|
||||
// Register the new driver.
|
||||
RAY_CHECK(pid >= 0);
|
||||
// Don't need to set shim pid for driver
|
||||
worker->SetProcess(Process::FromPid(pid));
|
||||
// Compute a dummy driver task id from a given driver.
|
||||
const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id);
|
||||
|
|
|
@ -70,9 +70,6 @@ class MockWorker : public WorkerInterface {
|
|||
StartupToken GetStartupToken() const { return 0; }
|
||||
void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; }
|
||||
|
||||
Process GetShimProcess() const { return Process::CreateNewDummy(); }
|
||||
void SetShimProcess(Process proc) { RAY_CHECK(false) << "Method unused"; }
|
||||
|
||||
Language GetLanguage() const {
|
||||
RAY_CHECK(false) << "Method unused";
|
||||
return Language::PYTHON;
|
||||
|
|
|
@ -74,16 +74,6 @@ void Worker::SetStartupToken(StartupToken startup_token) {
|
|||
startup_token_ = startup_token;
|
||||
}
|
||||
|
||||
Process Worker::GetShimProcess() const {
|
||||
RAY_CHECK(worker_type_ != rpc::WorkerType::DRIVER);
|
||||
return shim_proc_;
|
||||
}
|
||||
|
||||
void Worker::SetShimProcess(Process proc) {
|
||||
RAY_CHECK(shim_proc_.IsNull()); // this procedure should not be called multiple times
|
||||
shim_proc_ = std::move(proc);
|
||||
}
|
||||
|
||||
Language Worker::GetLanguage() const { return language_; }
|
||||
|
||||
const std::string Worker::IpAddress() const { return ip_address_; }
|
||||
|
|
|
@ -52,9 +52,6 @@ class WorkerInterface {
|
|||
/// Return the worker process's startup token
|
||||
virtual StartupToken GetStartupToken() const = 0;
|
||||
virtual void SetProcess(Process proc) = 0;
|
||||
/// Return the worker shim process.
|
||||
virtual Process GetShimProcess() const = 0;
|
||||
virtual void SetShimProcess(Process proc) = 0;
|
||||
virtual Language GetLanguage() const = 0;
|
||||
virtual const std::string IpAddress() const = 0;
|
||||
/// Connect this worker's gRPC client.
|
||||
|
@ -145,9 +142,6 @@ class Worker : public WorkerInterface {
|
|||
/// Return the worker process's startup token
|
||||
StartupToken GetStartupToken() const;
|
||||
void SetProcess(Process proc);
|
||||
/// Return this worker shim process.
|
||||
Process GetShimProcess() const;
|
||||
void SetShimProcess(Process proc);
|
||||
Language GetLanguage() const;
|
||||
const std::string IpAddress() const;
|
||||
/// Connect this worker's gRPC client.
|
||||
|
@ -228,9 +222,6 @@ class Worker : public WorkerInterface {
|
|||
Process proc_;
|
||||
/// The worker's process's startup_token
|
||||
StartupToken startup_token_;
|
||||
/// The worker's shim process. The shim process PID is the same with worker process PID,
|
||||
/// except starting worker process in container.
|
||||
Process shim_proc_;
|
||||
/// The language type of this worker.
|
||||
Language language_;
|
||||
/// The type of the worker.
|
||||
|
|
|
@ -402,7 +402,6 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
|
|||
worker_command_args.push_back("--serialized-runtime-env-context=" +
|
||||
serialized_runtime_env_context);
|
||||
} else {
|
||||
// The "shim process" setup worker is not needed, so do not run it.
|
||||
// Check that the arg really is the path to the setup worker before erasing it, to
|
||||
// prevent breaking tests that mock out the worker command args.
|
||||
if (worker_command_args.size() >= 2 &&
|
||||
|
@ -649,22 +648,18 @@ boost::optional<const rpc::JobConfig &> WorkerPool::GetJobConfig(
|
|||
}
|
||||
|
||||
Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker,
|
||||
pid_t pid, pid_t worker_shim_pid,
|
||||
StartupToken worker_startup_token,
|
||||
pid_t pid, StartupToken worker_startup_token,
|
||||
std::function<void(Status, int)> send_reply_callback) {
|
||||
RAY_CHECK(worker);
|
||||
auto &state = GetStateForLanguage(worker->GetLanguage());
|
||||
auto it = state.starting_worker_processes.find(worker_startup_token);
|
||||
if (it == state.starting_worker_processes.end()) {
|
||||
RAY_LOG(WARNING)
|
||||
<< "Received a register request from an unknown worker shim process: "
|
||||
<< worker_shim_pid << ", token: " << worker_startup_token;
|
||||
RAY_LOG(WARNING) << "Received a register request from an unknown token: "
|
||||
<< worker_startup_token;
|
||||
Status status = Status::Invalid("Unknown worker");
|
||||
send_reply_callback(status, /*port=*/0);
|
||||
return status;
|
||||
}
|
||||
auto shim_process = Process::FromPid(worker_shim_pid);
|
||||
worker->SetShimProcess(shim_process);
|
||||
auto process = Process::FromPid(pid);
|
||||
worker->SetProcess(process);
|
||||
|
||||
|
@ -695,9 +690,7 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
|
|||
|
||||
void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
|
||||
auto &state = GetStateForLanguage(worker->GetLanguage());
|
||||
const auto &shim_process = worker->GetShimProcess();
|
||||
const StartupToken worker_startup_token = worker->GetStartupToken();
|
||||
RAY_CHECK(shim_process.IsValid());
|
||||
|
||||
auto it = state.starting_worker_processes.find(worker_startup_token);
|
||||
if (it != state.starting_worker_processes.end()) {
|
||||
|
@ -964,7 +957,6 @@ void WorkerPool::TryKillingIdleWorkers() {
|
|||
// This is possible because a Java worker process may hold multiple workers.
|
||||
continue;
|
||||
}
|
||||
auto shim_process = idle_worker->GetShimProcess();
|
||||
auto worker_startup_token = idle_worker->GetStartupToken();
|
||||
auto &worker_state = GetStateForLanguage(idle_worker->GetLanguage());
|
||||
|
||||
|
|
|
@ -217,7 +217,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
|
|||
///
|
||||
/// \param[in] worker The worker to be registered.
|
||||
/// \param[in] pid The PID of the worker.
|
||||
/// \param[in] worker_shim_pid The PID of the process for setup worker runtime env.
|
||||
/// \param[in] worker_startup_token The startup token of the process assigned to
|
||||
/// it during startup as a command line argument.
|
||||
/// \param[in] send_reply_callback The callback to invoke after registration is
|
||||
|
@ -225,7 +224,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
|
|||
/// Returns 0 if the worker should bind on a random port.
|
||||
/// \return If the registration is successful.
|
||||
Status RegisterWorker(const std::shared_ptr<WorkerInterface> &worker, pid_t pid,
|
||||
pid_t worker_shim_pid, StartupToken worker_startup_token,
|
||||
StartupToken worker_startup_token,
|
||||
std::function<void(Status, int)> send_reply_callback);
|
||||
|
||||
/// To be invoked when a worker is started. This method should be called when the worker
|
||||
|
|
|
@ -242,7 +242,6 @@ class WorkerPoolMock : public WorkerPool {
|
|||
mock_worker_rpc_clients_.emplace(worker->WorkerId(), rpc_client);
|
||||
if (set_process && !proc.IsNull()) {
|
||||
worker->SetProcess(proc);
|
||||
worker->SetShimProcess(proc);
|
||||
}
|
||||
return worker;
|
||||
}
|
||||
|
@ -300,7 +299,7 @@ class WorkerPoolMock : public WorkerPool {
|
|||
startup_tokens_by_proc_[it->first],
|
||||
// Don't set process to ensure the `RegisterWorker` succeeds below.
|
||||
false);
|
||||
RAY_CHECK_OK(RegisterWorker(worker, it->first.GetId(), it->first.GetId(),
|
||||
RAY_CHECK_OK(RegisterWorker(worker, it->first.GetId(),
|
||||
startup_tokens_by_proc_[it->first],
|
||||
[](Status, int) {}));
|
||||
OnWorkerStarted(worker);
|
||||
|
@ -565,9 +564,8 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
|
|||
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1);
|
||||
// Check that we cannot lookup the worker before it's registered.
|
||||
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr);
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
worker_pool_->OnWorkerStarted(worker);
|
||||
// Check that we can lookup the worker after it's registered.
|
||||
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker);
|
||||
|
@ -585,7 +583,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
|
|||
TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) {
|
||||
auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON);
|
||||
auto status = worker_pool_->RegisterWorker(
|
||||
worker, 1234, 1234, -1, [](const Status & /*unused*/, int /*unused*/) {});
|
||||
worker, 1234, -1, [](const Status & /*unused*/, int /*unused*/) {});
|
||||
ASSERT_FALSE(status.ok());
|
||||
}
|
||||
|
||||
|
@ -795,7 +793,7 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) {
|
|||
for (const auto &process : started_processes) {
|
||||
auto worker = worker_pool_->CreateWorker(Process());
|
||||
worker->SetStartupToken(worker_pool_->GetStartupToken(process));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, process.GetId(), process.GetId(),
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, process.GetId(),
|
||||
worker_pool_->GetStartupToken(process),
|
||||
[](Status, int) {}));
|
||||
// Calling `RegisterWorker` won't affect the counter of starting worker processes.
|
||||
|
@ -1053,12 +1051,10 @@ TEST_F(WorkerPoolTest, NoPopOnCrashedWorkerProcess) {
|
|||
// We now imitate worker process crashing while core worker initializing.
|
||||
|
||||
// 1. we register both workers.
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker1, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker2, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker1, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker2, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
|
||||
// 2. announce worker port for worker 1. When interacting with worker pool, it's
|
||||
// PushWorker.
|
||||
|
@ -1102,9 +1098,8 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) {
|
|||
auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON, job_id);
|
||||
worker->SetStartupToken(worker_pool_->GetStartupToken(proc));
|
||||
workers.push_back(worker);
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
worker_pool_->OnWorkerStarted(worker);
|
||||
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker);
|
||||
worker_pool_->PushWorker(worker);
|
||||
|
@ -1194,9 +1189,8 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) {
|
|||
auto [proc, token] = worker_pool_->StartWorkerProcess(
|
||||
Language::PYTHON, rpc::WorkerType::SPILL_WORKER, job_id, &status);
|
||||
auto worker = CreateSpillWorker(Process());
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
worker_pool_->OnWorkerStarted(worker);
|
||||
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker);
|
||||
worker_pool_->PushSpillWorker(worker);
|
||||
|
@ -1206,9 +1200,8 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) {
|
|||
auto [proc, token] = worker_pool_->StartWorkerProcess(
|
||||
Language::PYTHON, rpc::WorkerType::RESTORE_WORKER, job_id, &status);
|
||||
auto worker = CreateRestoreWorker(Process());
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
worker_pool_->OnWorkerStarted(worker);
|
||||
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker);
|
||||
worker_pool_->PushRestoreWorker(worker);
|
||||
|
@ -1251,9 +1244,8 @@ TEST_F(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects) {
|
|||
auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON, job_id);
|
||||
worker->SetStartupToken(worker_pool_->GetStartupToken(proc));
|
||||
workers.push_back(worker);
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(
|
||||
worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {}));
|
||||
worker_pool_->OnWorkerStarted(worker);
|
||||
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker);
|
||||
worker_pool_->PushWorker(worker);
|
||||
|
@ -1317,7 +1309,7 @@ TEST_F(WorkerPoolTest, TestWorkerCappingWithExitDelay) {
|
|||
auto worker = worker_pool_->CreateWorker(Process(), language);
|
||||
worker->SetStartupToken(worker_pool_->GetStartupToken(proc));
|
||||
workers.push_back(worker);
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(),
|
||||
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(),
|
||||
worker_pool_->GetStartupToken(proc),
|
||||
[](Status, int) {}));
|
||||
worker_pool_->OnWorkerStarted(worker);
|
||||
|
@ -1623,31 +1615,6 @@ TEST_F(WorkerPoolTest, CacheWorkersByRuntimeEnvHash) {
|
|||
worker_pool_->ClearProcesses();
|
||||
}
|
||||
|
||||
TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) {
|
||||
auto task_spec = ExampleTaskSpec();
|
||||
auto worker = worker_pool_->PopWorkerSync(task_spec);
|
||||
ASSERT_NE(worker, nullptr);
|
||||
auto last_process = worker_pool_->LastStartedWorkerProcess();
|
||||
pid_t shim_pid = last_process.GetId();
|
||||
ASSERT_EQ(shim_pid, worker->GetShimProcess().GetId());
|
||||
|
||||
// test dedicated worker
|
||||
std::vector<std::string> actor_jvm_options;
|
||||
actor_jvm_options.insert(
|
||||
actor_jvm_options.end(),
|
||||
{"-Dmy-actor.hello=foo", "-Dmy-actor.world=bar", "-Xmx2g", "-Xms1g"});
|
||||
auto task_id = TaskID::ForDriverTask(JOB_ID);
|
||||
auto actor_id = ActorID::Of(JOB_ID, task_id, 1);
|
||||
TaskSpecification java_task_spec = ExampleTaskSpec(
|
||||
ActorID::Nil(), Language::JAVA, JOB_ID, actor_id, actor_jvm_options, task_id);
|
||||
worker = worker_pool_->PopWorkerSync(java_task_spec);
|
||||
ASSERT_NE(worker, nullptr);
|
||||
last_process = worker_pool_->LastStartedWorkerProcess();
|
||||
shim_pid = last_process.GetId();
|
||||
ASSERT_EQ(shim_pid, worker->GetShimProcess().GetId());
|
||||
worker_pool_->ClearProcesses();
|
||||
}
|
||||
|
||||
TEST_F(WorkerPoolTest, WorkerNoLeaks) {
|
||||
std::shared_ptr<WorkerInterface> popped_worker;
|
||||
const auto task_spec = ExampleTaskSpec();
|
||||
|
|
|
@ -101,20 +101,15 @@ raylet::RayletClient::RayletClient(
|
|||
rpc::WorkerType worker_type, const JobID &job_id, const int &runtime_env_hash,
|
||||
const Language &language, const std::string &ip_address, Status *status,
|
||||
NodeID *raylet_id, int *port, std::string *serialized_job_config,
|
||||
pid_t worker_shim_pid, StartupToken startup_token)
|
||||
StartupToken startup_token)
|
||||
: grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) {
|
||||
conn_ = std::make_unique<raylet::RayletConnection>(io_service, raylet_socket, -1, -1);
|
||||
|
||||
// When the "shim process" which is used for setuping runtime_env is not needed,
|
||||
// the worker_shim_pid will be 0.
|
||||
if (worker_shim_pid == 0) {
|
||||
worker_shim_pid = getpid();
|
||||
}
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
// TODO(suquark): Use `WorkerType` in `common.proto` without converting to int.
|
||||
auto message = protocol::CreateRegisterClientRequest(
|
||||
fbb, static_cast<int>(worker_type), to_flatbuf(fbb, worker_id), getpid(),
|
||||
worker_shim_pid, startup_token, to_flatbuf(fbb, job_id), runtime_env_hash, language,
|
||||
startup_token, to_flatbuf(fbb, job_id), runtime_env_hash, language,
|
||||
fbb.CreateString(ip_address),
|
||||
/*port=*/0, fbb.CreateString(*serialized_job_config));
|
||||
fbb.Finish(message);
|
||||
|
|
|
@ -244,7 +244,6 @@ class RayletClient : public RayletClientInterface {
|
|||
/// \param serialized_job_config If this is a driver connection, the job config
|
||||
/// provided by driver will be passed to Raylet. If this is a worker connection,
|
||||
/// this will be populated with the current job config.
|
||||
/// \param worker_shim_pid The PID of the process for setup worker runtime env.
|
||||
/// \param startup_token The startup token of the process assigned to
|
||||
/// it during startup as a command line argument.
|
||||
RayletClient(instrumented_io_context &io_service,
|
||||
|
@ -253,8 +252,7 @@ class RayletClient : public RayletClientInterface {
|
|||
rpc::WorkerType worker_type, const JobID &job_id,
|
||||
const int &runtime_env_hash, const Language &language,
|
||||
const std::string &ip_address, Status *status, NodeID *raylet_id,
|
||||
int *port, std::string *serialized_job_config, pid_t worker_shim_pid,
|
||||
StartupToken startup_token);
|
||||
int *port, std::string *serialized_job_config, StartupToken startup_token);
|
||||
|
||||
/// Connect to the raylet via grpc only.
|
||||
///
|
||||
|
|
Loading…
Add table
Reference in a new issue