diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5c3cad4cf..8144bf555 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1044,7 +1044,7 @@ cdef class CoreWorker: node_ip_address, node_manager_port, raylet_ip_address, local_mode, driver_name, stdout_file, stderr_file, serialized_job_config, metrics_agent_port, runtime_env_hash, - worker_shim_pid, startup_token): + startup_token): self.is_local_mode = local_mode cdef CCoreWorkerOptions options = CCoreWorkerOptions() @@ -1094,7 +1094,6 @@ cdef class CoreWorker: options.metrics_agent_port = metrics_agent_port options.connect_on_start = False options.runtime_env_hash = runtime_env_hash - options.worker_shim_pid = worker_shim_pid options.startup_token = startup_token CCoreWorkerProcess.Initialize(options) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 2776f3177..054aa132b 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -318,7 +318,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: int metrics_agent_port c_bool connect_on_start int runtime_env_hash - int worker_shim_pid int startup_token cdef cppclass CCoreWorkerProcess "ray::core::CoreWorkerProcess": diff --git a/python/ray/tests/mock_setup_worker.py b/python/ray/tests/mock_setup_worker.py index a65b8eda2..3d43cb609 100644 --- a/python/ray/tests/mock_setup_worker.py +++ b/python/ray/tests/mock_setup_worker.py @@ -35,12 +35,9 @@ parser.add_argument( args, remaining_args = parser.parse_known_args() -# add worker-shim-pid argument -remaining_args.append("--worker-shim-pid={}".format(os.getpid())) py_executable: str = sys.executable command_str = " ".join([f"exec {py_executable}"] + remaining_args) child_pid = os.fork() -print(f"shim pid:{os.getpid()} , worker pid:{child_pid}") if child_pid == 0: # child process os.execvp("bash", ["bash", "-c", command_str]) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index 6518cf962..e9a1792fa 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -774,12 +774,6 @@ def main(): required=False, type=str, help="Password for connecting to Redis") - parser.add_argument( - "--worker-shim-pid", - required=False, - type=int, - default=0, - help="The PID of the process for setup worker runtime env.") parser.add_argument( "--metrics-agent-port", required=False, diff --git a/python/ray/worker.py b/python/ray/worker.py index 7c3fdd687..47ed6cd7d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1342,7 +1342,6 @@ def connect(node, namespace=None, job_config=None, runtime_env_hash=0, - worker_shim_pid=0, startup_token=0, ray_debugger_external=False): """Connect this worker to the raylet, to Plasma, and to Redis. @@ -1358,8 +1357,6 @@ def connect(node, job_id: The ID of job. If it's None, then we will generate one. job_config (ray.job_config.JobConfig): The job configuration. runtime_env_hash (int): The hash of the runtime env for this worker. - worker_shim_pid (int): The PID of the process for setup worker - runtime env. startup_token (int): The startup token of the process assigned to it during startup as a command line argument. ray_debugger_host (bool): The host to bind a Ray debugger to on @@ -1525,8 +1522,7 @@ def connect(node, gcs_options, logs_dir, node.node_ip_address, node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE), driver_name, log_stdout_file_path, log_stderr_file_path, serialized_job_config, - node.metrics_agent_port, runtime_env_hash, worker_shim_pid, - startup_token) + node.metrics_agent_port, runtime_env_hash, startup_token) # Notify raylet that the core worker is ready. worker.core_worker.notify_raylet() diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 9d58ef716..56cf5b185 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -123,12 +123,6 @@ parser.add_argument( type=int, default=0, help="The computed hash of the runtime env for this worker.") -parser.add_argument( - "--worker-shim-pid", - required=False, - type=int, - default=0, - help="The PID of the process for setup worker runtime env.") parser.add_argument( "--startup-token", required=True, @@ -199,7 +193,6 @@ if __name__ == "__main__": node, mode=mode, runtime_env_hash=args.runtime_env_hash, - worker_shim_pid=args.worker_shim_pid, startup_token=args.startup_token, ray_debugger_external=args.ray_debugger_external) diff --git a/python/ray/workers/setup_worker.py b/python/ray/workers/setup_worker.py index e783e34b6..a10120431 100644 --- a/python/ray/workers/setup_worker.py +++ b/python/ray/workers/setup_worker.py @@ -1,6 +1,5 @@ import argparse import logging -import os from ray._private.runtime_env.context import RuntimeEnvContext from ray.core.generated.common_pb2 import Language @@ -21,8 +20,6 @@ parser.add_argument( if __name__ == "__main__": args, remaining_args = parser.parse_known_args() - # NOTE(chenk008): we still need it to start worker in container. - remaining_args.append("--worker-shim-pid={}".format(os.getpid())) # NOTE(edoakes): args.serialized_runtime_env_context is only None when # we're starting the main Ray client proxy server. That case should # probably not even go through this codepath. diff --git a/src/mock/ray/raylet/worker.h b/src/mock/ray/raylet/worker.h index 4b91e9ec4..bf1120e70 100644 --- a/src/mock/ray/raylet/worker.h +++ b/src/mock/ray/raylet/worker.h @@ -26,8 +26,6 @@ class MockWorkerInterface : public WorkerInterface { MOCK_METHOD(WorkerID, WorkerId, (), (const, override)); MOCK_METHOD(Process, GetProcess, (), (const, override)); MOCK_METHOD(void, SetProcess, (Process proc), (override)); - MOCK_METHOD(Process, GetShimProcess, (), (const, override)); - MOCK_METHOD(void, SetShimProcess, (Process proc), (override)); MOCK_METHOD(Language, GetLanguage, (), (const, override)); MOCK_METHOD(const std::string, IpAddress, (), (const, override)); MOCK_METHOD(void, Connect, (int port), (override)); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9c4f1bdbe..7e71d0b17 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -112,8 +112,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(), options_.worker_type, worker_context_.GetCurrentJobID(), options_.runtime_env_hash, options_.language, options_.node_ip_address, &raylet_client_status, - &local_raylet_id, &assigned_port, &serialized_job_config, options_.worker_shim_pid, - options_.startup_token); + &local_raylet_id, &assigned_port, &serialized_job_config, options_.startup_token); if (!raylet_client_status.ok()) { // Avoid using FATAL log or RAY_CHECK here because they may create a core dump file. diff --git a/src/ray/core_worker/core_worker_options.h b/src/ray/core_worker/core_worker_options.h index e98605d01..56021c0bb 100644 --- a/src/ray/core_worker/core_worker_options.h +++ b/src/ray/core_worker/core_worker_options.h @@ -78,8 +78,7 @@ struct CoreWorkerOptions { serialized_job_config(""), metrics_agent_port(-1), connect_on_start(true), - runtime_env_hash(0), - worker_shim_pid(0) {} + runtime_env_hash(0) {} /// Type of this worker (i.e., DRIVER or WORKER). WorkerType worker_type; @@ -161,8 +160,6 @@ struct CoreWorkerOptions { bool connect_on_start; /// The hash of the runtime env for this worker. int runtime_env_hash; - /// The PID of the process for setup worker runtime env. - pid_t worker_shim_pid; /// The startup token of the process assigned to it /// during startup via command line arguments. /// This is needed because the actual core worker process diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index ed152ab2b..a3ce616dd 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -123,8 +123,6 @@ table RegisterClientRequest { worker_id: string; // The process ID of this worker. worker_pid: long; - // The PID of the process for setup worker runtime env. - worker_shim_pid: long; // The startup token of the process assigned to // it during startup as a command line argument. startup_token: long; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1cd798110..156fd6a12 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1081,7 +1081,6 @@ void NodeManager::ProcessRegisterClientRequestMessage( const int runtime_env_hash = static_cast(message->runtime_env_hash()); WorkerID worker_id = from_flatbuf(*message->worker_id()); pid_t pid = message->worker_pid(); - pid_t worker_shim_pid = message->worker_shim_pid(); StartupToken worker_startup_token = message->startup_token(); std::string worker_ip_address = string_from_flatbuf(*message->ip_address()); // TODO(suquark): Use `WorkerType` in `common.proto` without type converting. @@ -1121,8 +1120,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( worker_type == rpc::WorkerType::SPILL_WORKER || worker_type == rpc::WorkerType::RESTORE_WORKER) { // Register the new worker. - auto status = worker_pool_.RegisterWorker(worker, pid, worker_shim_pid, - worker_startup_token, send_reply_callback); + auto status = worker_pool_.RegisterWorker(worker, pid, worker_startup_token, + send_reply_callback); if (!status.ok()) { // If the worker failed to register to Raylet, trigger task dispatching here to // allow new worker processes to be started (if capped by @@ -1132,7 +1131,6 @@ void NodeManager::ProcessRegisterClientRequestMessage( } else { // Register the new driver. RAY_CHECK(pid >= 0); - // Don't need to set shim pid for driver worker->SetProcess(Process::FromPid(pid)); // Compute a dummy driver task id from a given driver. const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id); diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 3c8eabcaa..749398c95 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -70,9 +70,6 @@ class MockWorker : public WorkerInterface { StartupToken GetStartupToken() const { return 0; } void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; } - Process GetShimProcess() const { return Process::CreateNewDummy(); } - void SetShimProcess(Process proc) { RAY_CHECK(false) << "Method unused"; } - Language GetLanguage() const { RAY_CHECK(false) << "Method unused"; return Language::PYTHON; diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index 5051a51ed..d55501e9e 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -74,16 +74,6 @@ void Worker::SetStartupToken(StartupToken startup_token) { startup_token_ = startup_token; } -Process Worker::GetShimProcess() const { - RAY_CHECK(worker_type_ != rpc::WorkerType::DRIVER); - return shim_proc_; -} - -void Worker::SetShimProcess(Process proc) { - RAY_CHECK(shim_proc_.IsNull()); // this procedure should not be called multiple times - shim_proc_ = std::move(proc); -} - Language Worker::GetLanguage() const { return language_; } const std::string Worker::IpAddress() const { return ip_address_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index e94e70f4e..dce852a66 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -52,9 +52,6 @@ class WorkerInterface { /// Return the worker process's startup token virtual StartupToken GetStartupToken() const = 0; virtual void SetProcess(Process proc) = 0; - /// Return the worker shim process. - virtual Process GetShimProcess() const = 0; - virtual void SetShimProcess(Process proc) = 0; virtual Language GetLanguage() const = 0; virtual const std::string IpAddress() const = 0; /// Connect this worker's gRPC client. @@ -145,9 +142,6 @@ class Worker : public WorkerInterface { /// Return the worker process's startup token StartupToken GetStartupToken() const; void SetProcess(Process proc); - /// Return this worker shim process. - Process GetShimProcess() const; - void SetShimProcess(Process proc); Language GetLanguage() const; const std::string IpAddress() const; /// Connect this worker's gRPC client. @@ -228,9 +222,6 @@ class Worker : public WorkerInterface { Process proc_; /// The worker's process's startup_token StartupToken startup_token_; - /// The worker's shim process. The shim process PID is the same with worker process PID, - /// except starting worker process in container. - Process shim_proc_; /// The language type of this worker. Language language_; /// The type of the worker. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index babb8a867..66d5f2a32 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -402,7 +402,6 @@ std::tuple WorkerPool::StartWorkerProcess( worker_command_args.push_back("--serialized-runtime-env-context=" + serialized_runtime_env_context); } else { - // The "shim process" setup worker is not needed, so do not run it. // Check that the arg really is the path to the setup worker before erasing it, to // prevent breaking tests that mock out the worker command args. if (worker_command_args.size() >= 2 && @@ -649,22 +648,18 @@ boost::optional WorkerPool::GetJobConfig( } Status WorkerPool::RegisterWorker(const std::shared_ptr &worker, - pid_t pid, pid_t worker_shim_pid, - StartupToken worker_startup_token, + pid_t pid, StartupToken worker_startup_token, std::function send_reply_callback) { RAY_CHECK(worker); auto &state = GetStateForLanguage(worker->GetLanguage()); auto it = state.starting_worker_processes.find(worker_startup_token); if (it == state.starting_worker_processes.end()) { - RAY_LOG(WARNING) - << "Received a register request from an unknown worker shim process: " - << worker_shim_pid << ", token: " << worker_startup_token; + RAY_LOG(WARNING) << "Received a register request from an unknown token: " + << worker_startup_token; Status status = Status::Invalid("Unknown worker"); send_reply_callback(status, /*port=*/0); return status; } - auto shim_process = Process::FromPid(worker_shim_pid); - worker->SetShimProcess(shim_process); auto process = Process::FromPid(pid); worker->SetProcess(process); @@ -695,9 +690,7 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker void WorkerPool::OnWorkerStarted(const std::shared_ptr &worker) { auto &state = GetStateForLanguage(worker->GetLanguage()); - const auto &shim_process = worker->GetShimProcess(); const StartupToken worker_startup_token = worker->GetStartupToken(); - RAY_CHECK(shim_process.IsValid()); auto it = state.starting_worker_processes.find(worker_startup_token); if (it != state.starting_worker_processes.end()) { @@ -964,7 +957,6 @@ void WorkerPool::TryKillingIdleWorkers() { // This is possible because a Java worker process may hold multiple workers. continue; } - auto shim_process = idle_worker->GetShimProcess(); auto worker_startup_token = idle_worker->GetStartupToken(); auto &worker_state = GetStateForLanguage(idle_worker->GetLanguage()); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 5a02120bf..b311d6611 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -217,7 +217,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// /// \param[in] worker The worker to be registered. /// \param[in] pid The PID of the worker. - /// \param[in] worker_shim_pid The PID of the process for setup worker runtime env. /// \param[in] worker_startup_token The startup token of the process assigned to /// it during startup as a command line argument. /// \param[in] send_reply_callback The callback to invoke after registration is @@ -225,7 +224,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Returns 0 if the worker should bind on a random port. /// \return If the registration is successful. Status RegisterWorker(const std::shared_ptr &worker, pid_t pid, - pid_t worker_shim_pid, StartupToken worker_startup_token, + StartupToken worker_startup_token, std::function send_reply_callback); /// To be invoked when a worker is started. This method should be called when the worker diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 83613279a..8027144a9 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -242,7 +242,6 @@ class WorkerPoolMock : public WorkerPool { mock_worker_rpc_clients_.emplace(worker->WorkerId(), rpc_client); if (set_process && !proc.IsNull()) { worker->SetProcess(proc); - worker->SetShimProcess(proc); } return worker; } @@ -300,7 +299,7 @@ class WorkerPoolMock : public WorkerPool { startup_tokens_by_proc_[it->first], // Don't set process to ensure the `RegisterWorker` succeeds below. false); - RAY_CHECK_OK(RegisterWorker(worker, it->first.GetId(), it->first.GetId(), + RAY_CHECK_OK(RegisterWorker(worker, it->first.GetId(), startup_tokens_by_proc_[it->first], [](Status, int) {})); OnWorkerStarted(worker); @@ -565,9 +564,8 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1); // Check that we cannot lookup the worker before it's registered. ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); worker_pool_->OnWorkerStarted(worker); // Check that we can lookup the worker after it's registered. ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); @@ -585,7 +583,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) { auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON); auto status = worker_pool_->RegisterWorker( - worker, 1234, 1234, -1, [](const Status & /*unused*/, int /*unused*/) {}); + worker, 1234, -1, [](const Status & /*unused*/, int /*unused*/) {}); ASSERT_FALSE(status.ok()); } @@ -795,7 +793,7 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) { for (const auto &process : started_processes) { auto worker = worker_pool_->CreateWorker(Process()); worker->SetStartupToken(worker_pool_->GetStartupToken(process)); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, process.GetId(), process.GetId(), + RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, process.GetId(), worker_pool_->GetStartupToken(process), [](Status, int) {})); // Calling `RegisterWorker` won't affect the counter of starting worker processes. @@ -1053,12 +1051,10 @@ TEST_F(WorkerPoolTest, NoPopOnCrashedWorkerProcess) { // We now imitate worker process crashing while core worker initializing. // 1. we register both workers. - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker1, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker2, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker1, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker2, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); // 2. announce worker port for worker 1. When interacting with worker pool, it's // PushWorker. @@ -1102,9 +1098,8 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON, job_id); worker->SetStartupToken(worker_pool_->GetStartupToken(proc)); workers.push_back(worker); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); worker_pool_->OnWorkerStarted(worker); ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); worker_pool_->PushWorker(worker); @@ -1194,9 +1189,8 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { auto [proc, token] = worker_pool_->StartWorkerProcess( Language::PYTHON, rpc::WorkerType::SPILL_WORKER, job_id, &status); auto worker = CreateSpillWorker(Process()); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); worker_pool_->OnWorkerStarted(worker); ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); worker_pool_->PushSpillWorker(worker); @@ -1206,9 +1200,8 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { auto [proc, token] = worker_pool_->StartWorkerProcess( Language::PYTHON, rpc::WorkerType::RESTORE_WORKER, job_id, &status); auto worker = CreateRestoreWorker(Process()); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); worker_pool_->OnWorkerStarted(worker); ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); worker_pool_->PushRestoreWorker(worker); @@ -1251,9 +1244,8 @@ TEST_F(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects) { auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON, job_id); worker->SetStartupToken(worker_pool_->GetStartupToken(proc)); workers.push_back(worker); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), - worker_pool_->GetStartupToken(proc), - [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker( + worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); worker_pool_->OnWorkerStarted(worker); ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); worker_pool_->PushWorker(worker); @@ -1317,7 +1309,7 @@ TEST_F(WorkerPoolTest, TestWorkerCappingWithExitDelay) { auto worker = worker_pool_->CreateWorker(Process(), language); worker->SetStartupToken(worker_pool_->GetStartupToken(proc)); workers.push_back(worker); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), + RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), worker_pool_->GetStartupToken(proc), [](Status, int) {})); worker_pool_->OnWorkerStarted(worker); @@ -1623,31 +1615,6 @@ TEST_F(WorkerPoolTest, CacheWorkersByRuntimeEnvHash) { worker_pool_->ClearProcesses(); } -TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) { - auto task_spec = ExampleTaskSpec(); - auto worker = worker_pool_->PopWorkerSync(task_spec); - ASSERT_NE(worker, nullptr); - auto last_process = worker_pool_->LastStartedWorkerProcess(); - pid_t shim_pid = last_process.GetId(); - ASSERT_EQ(shim_pid, worker->GetShimProcess().GetId()); - - // test dedicated worker - std::vector actor_jvm_options; - actor_jvm_options.insert( - actor_jvm_options.end(), - {"-Dmy-actor.hello=foo", "-Dmy-actor.world=bar", "-Xmx2g", "-Xms1g"}); - auto task_id = TaskID::ForDriverTask(JOB_ID); - auto actor_id = ActorID::Of(JOB_ID, task_id, 1); - TaskSpecification java_task_spec = ExampleTaskSpec( - ActorID::Nil(), Language::JAVA, JOB_ID, actor_id, actor_jvm_options, task_id); - worker = worker_pool_->PopWorkerSync(java_task_spec); - ASSERT_NE(worker, nullptr); - last_process = worker_pool_->LastStartedWorkerProcess(); - shim_pid = last_process.GetId(); - ASSERT_EQ(shim_pid, worker->GetShimProcess().GetId()); - worker_pool_->ClearProcesses(); -} - TEST_F(WorkerPoolTest, WorkerNoLeaks) { std::shared_ptr popped_worker; const auto task_spec = ExampleTaskSpec(); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 6f692c6d4..8331a7cb6 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -101,20 +101,15 @@ raylet::RayletClient::RayletClient( rpc::WorkerType worker_type, const JobID &job_id, const int &runtime_env_hash, const Language &language, const std::string &ip_address, Status *status, NodeID *raylet_id, int *port, std::string *serialized_job_config, - pid_t worker_shim_pid, StartupToken startup_token) + StartupToken startup_token) : grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) { conn_ = std::make_unique(io_service, raylet_socket, -1, -1); - // When the "shim process" which is used for setuping runtime_env is not needed, - // the worker_shim_pid will be 0. - if (worker_shim_pid == 0) { - worker_shim_pid = getpid(); - } flatbuffers::FlatBufferBuilder fbb; // TODO(suquark): Use `WorkerType` in `common.proto` without converting to int. auto message = protocol::CreateRegisterClientRequest( fbb, static_cast(worker_type), to_flatbuf(fbb, worker_id), getpid(), - worker_shim_pid, startup_token, to_flatbuf(fbb, job_id), runtime_env_hash, language, + startup_token, to_flatbuf(fbb, job_id), runtime_env_hash, language, fbb.CreateString(ip_address), /*port=*/0, fbb.CreateString(*serialized_job_config)); fbb.Finish(message); diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index a0c2ea61d..db3acca1a 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -244,7 +244,6 @@ class RayletClient : public RayletClientInterface { /// \param serialized_job_config If this is a driver connection, the job config /// provided by driver will be passed to Raylet. If this is a worker connection, /// this will be populated with the current job config. - /// \param worker_shim_pid The PID of the process for setup worker runtime env. /// \param startup_token The startup token of the process assigned to /// it during startup as a command line argument. RayletClient(instrumented_io_context &io_service, @@ -253,8 +252,7 @@ class RayletClient : public RayletClientInterface { rpc::WorkerType worker_type, const JobID &job_id, const int &runtime_env_hash, const Language &language, const std::string &ip_address, Status *status, NodeID *raylet_id, - int *port, std::string *serialized_job_config, pid_t worker_shim_pid, - StartupToken startup_token); + int *port, std::string *serialized_job_config, StartupToken startup_token); /// Connect to the raylet via grpc only. ///