Revert "[Job Submission][refactor 1/N] Add AgentInfo to GCSNodeInfo (#26302)" (#27242)

This reverts commit 14dee5f6a3.
SangBin Cho 2022-07-30 16:08:23 +09:00, committed by GitHub
parent 7da700e337
commit ec69fec1e0
23 changed files with 106 additions and 216 deletions

View file

@@ -1,6 +1,7 @@
import argparse
import asyncio
import io
import json
import logging
import logging.handlers
import os
@@ -12,10 +13,11 @@ import ray._private.services
import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
import ray.experimental.internal_kv as internal_kv
from ray._private.gcs_pubsub import GcsAioPublisher, GcsPublisher
from ray._private.gcs_utils import GcsAioClient, GcsClient
from ray._private.ray_logging import setup_component_logger
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc, common_pb2
from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc
from ray.experimental.internal_kv import (
_initialize_internal_kv,
_internal_kv_initialized,
@@ -260,20 +262,22 @@ class DashboardAgent:
# TODO: Use async version if performance is an issue
# -1 should indicate that http server is not started.
http_port = -1 if not self.http_server else self.http_server.http_port
internal_kv._internal_kv_put(
f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{self.node_id}",
json.dumps([http_port, self.grpc_port]),
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
# Register agent to agent manager.
raylet_stub = agent_manager_pb2_grpc.AgentManagerServiceStub(
self.aiogrpc_raylet_channel
)
await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest(
agent_info=common_pb2.AgentInfo(
id=self.agent_id,
pid=os.getpid(),
grpc_port=self.grpc_port,
http_port=http_port,
ip_address=self.ip,
)
agent_id=self.agent_id,
agent_port=self.grpc_port,
agent_ip_address=self.ip,
)
)
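
Read together, the hunks above restore the pre-#26302 discovery path: the agent advertises [http_port, grpc_port] in the internal KV, then registers with the raylet over gRPC using plain scalar fields instead of an AgentInfo message. A minimal sketch of that flow, assuming an initialized internal KV and an aio gRPC stub wired up as in the surrounding file:

import json

import ray.dashboard.consts as dashboard_consts
import ray.experimental.internal_kv as internal_kv
from ray._private import ray_constants
from ray.core.generated import agent_manager_pb2


async def advertise_and_register(raylet_stub, node_id, agent_id, ip, grpc_port, http_port):
    # Advertise the ports under a per-node key so the dashboard head can find them.
    internal_kv._internal_kv_put(
        f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}",
        json.dumps([http_port, grpc_port]),
        namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
    )
    # Register with the local raylet; note the flat fields restored by the revert.
    await raylet_stub.RegisterAgent(
        agent_manager_pb2.RegisterAgentRequest(
            agent_id=agent_id,
            agent_port=grpc_port,
            agent_ip_address=ip,
        )
    )
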

View file

@@ -1,6 +1,7 @@
from ray._private.ray_constants import env_integer
DASHBOARD_LOG_FILENAME = "dashboard.log"
DASHBOARD_AGENT_PORT_PREFIX = "DASHBOARD_AGENT_PORT_PREFIX:"
DASHBOARD_AGENT_LOG_FILENAME = "dashboard_agent.log"
DASHBOARD_AGENT_CHECK_PARENT_INTERVAL_SECONDS = 2
RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME = "RAY_STATE_SERVER_MAX_HTTP_REQUEST"

View file

@@ -1,4 +1,5 @@
import asyncio
import json
import logging
import re
import time
@@ -6,6 +7,7 @@ import time
import aiohttp.web
import ray._private.utils
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private import ray_constants
@@ -129,9 +131,10 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
try:
nodes = await self._get_nodes()
alive_node_ids = []
alive_node_infos = []
node_id_to_ip = {}
node_id_to_hostname = {}
agents = dict(DataSource.agents)
for node in nodes.values():
node_id = node["nodeId"]
ip = node["nodeManagerAddress"]
@@ -147,10 +150,20 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
node_id_to_hostname[node_id] = hostname
assert node["state"] in ["ALIVE", "DEAD"]
if node["state"] == "ALIVE":
agents[node_id] = [
node["agentInfo"]["httpPort"],
node["agentInfo"]["grpcPort"],
]
alive_node_ids.append(node_id)
alive_node_infos.append(node)
agents = dict(DataSource.agents)
for node_id in alive_node_ids:
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}" f"{node_id}"
# TODO: Use async version if performance is an issue
agent_port = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
if agent_port:
agents[node_id] = json.loads(agent_port)
for node_id in agents.keys() - set(alive_node_ids):
agents.pop(node_id, None)
DataSource.node_id_to_ip.reset(node_id_to_ip)
DataSource.node_id_to_hostname.reset(node_id_to_hostname)
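
On the head side, the revert swaps the GcsNodeInfo.agent_info lookup for a KV poll: fetch each alive node's advertised ports, then prune entries for nodes that are gone. The same pattern in isolation (a sketch; assumes the KV was populated by the agent as above):

import json

import ray.dashboard.consts as dashboard_consts
import ray.experimental.internal_kv as internal_kv
from ray._private import ray_constants


def refresh_agents(agents, alive_node_ids):
    for node_id in alive_node_ids:
        key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
        agent_port = internal_kv._internal_kv_get(
            key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
        )
        if agent_port:
            agents[node_id] = json.loads(agent_port)  # [http_port, grpc_port]
    # keys() - set(...) builds a fresh set, so popping while iterating is safe.
    for node_id in agents.keys() - set(alive_node_ids):
        agents.pop(node_id, None)
    return agents
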

View file

@@ -6,7 +6,6 @@ import time
import traceback
import random
import pytest
import psutil
import ray
import threading
from datetime import datetime, timedelta
@@ -19,7 +18,6 @@ from ray._private.test_utils import (
wait_for_condition,
wait_until_succeeded_without_exception,
)
from ray._private.state import state
logger = logging.getLogger(__name__)
@@ -350,33 +348,5 @@ def test_frequent_node_update(
wait_for_condition(verify, timeout=15)
# See detail: https://github.com/ray-project/ray/issues/24361
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_node_register_with_agent(ray_start_cluster_head):
def test_agent_port(pid, port):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")
for c in p.connections():
if c.status == psutil.CONN_LISTEN and c.laddr.port == port:
return
assert False
def test_agent_process(pid):
p = psutil.Process(pid)
assert p.cmdline()[2].endswith("dashboard/agent.py")
for node_info in state.node_table():
agent_info = node_info["AgentInfo"]
assert agent_info["IpAddress"] == node_info["NodeManagerAddress"]
test_agent_port(agent_info["Pid"], agent_info["GrpcPort"])
if agent_info["HttpPort"] >= 0:
test_agent_port(agent_info["Pid"], agent_info["HttpPort"])
else:
# Port conflicts may be caused by a previous
# test not killing the agent cleanly.
assert agent_info["HttpPort"] == -1
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@@ -111,6 +111,7 @@ def test_basic(ray_start_with_dashboard):
"""Dashboard test that starts a Ray cluster with a dashboard server running,
then hits the dashboard API and asserts that it receives sensible data."""
address_info = ray_start_with_dashboard
node_id = address_info["node_id"]
gcs_client = make_gcs_client(address_info)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
@@ -146,6 +147,11 @@ def test_basic(ray_start_with_dashboard):
namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
)
assert dashboard_rpc_address is not None
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
agent_ports = ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
assert agent_ports is not None
def test_raylet_and_agent_share_fate(shutdown_only):
@@ -790,6 +796,7 @@ def test_dashboard_port_conflict(ray_start_with_dashboard):
)
def test_gcs_check_alive(fast_gcs_failure_detection, ray_start_with_dashboard):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
all_processes = ray._private.worker._global_node.all_processes
dashboard_info = all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][0]
dashboard_proc = psutil.Process(dashboard_info.process.pid)

View file

@@ -164,12 +164,6 @@ class GlobalState:
"RayletSocketName": item.raylet_socket_name,
"MetricsExportPort": item.metrics_export_port,
"NodeName": item.node_name,
"AgentInfo": {
"IpAddress": item.agent_info.ip_address,
"GrpcPort": item.agent_info.grpc_port,
"HttpPort": item.agent_info.http_port,
"Pid": item.agent_info.pid,
},
}
node_info["alive"] = node_info["Alive"]
node_info["Resources"] = (

View file

@@ -68,8 +68,7 @@ py_test_module_list(
"test_healthcheck.py",
"test_kill_raylet_signal_log.py",
"test_memstat.py",
"test_protobuf_compatibility.py",
"test_scheduling_performance.py"
"test_protobuf_compatibility.py"
],
size = "medium",
tags = ["exclusive", "medium_size_python_tests_a_to_j", "team:core"],
@@ -121,6 +120,7 @@ py_test_module_list(
"test_multi_node_2.py",
"test_multinode_failures.py",
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_object_assign_owner.py",
"test_placement_group.py",
"test_placement_group_2.py",
@@ -138,6 +138,7 @@ py_test_module_list(
"test_runtime_env_fork_process.py",
"test_serialization.py",
"test_shuffle.py",
"test_state_api.py",
"test_state_api_log.py",
"test_state_api_summary.py",
"test_stress.py",
@@ -184,6 +185,7 @@ py_test_module_list(
"test_cross_language.py",
"test_environ.py",
"test_raylet_output.py",
"test_scheduling_performance.py",
"test_get_or_create_actor.py",
],
size = "small",
@@ -263,7 +265,6 @@ py_test_module_list(
"test_storage.py",
"test_reference_counting_2.py",
"test_exit_observability.py",
"test_state_api.py",
"test_usage_stats.py",
],
size = "large",
@@ -294,7 +295,6 @@ py_test_module_list(
"test_placement_group_mini_integration.py",
"test_scheduling_2.py",
"test_multi_node_3.py",
"test_multiprocessing.py",
],
size = "large",
tags = ["exclusive", "large_size_python_tests_shard_1", "team:core"],

View file

@@ -15,7 +15,6 @@ from pathlib import Path
from tempfile import gettempdir
from typing import List, Tuple
from unittest import mock
import signal
import pytest
@@ -205,19 +204,10 @@ def _ray_start(**kwargs):
init_kwargs.update(kwargs)
# Start the Ray processes.
address_info = ray.init("local", **init_kwargs)
agent_pids = []
for node in ray.nodes():
agent_pids.append(int(node["AgentInfo"]["Pid"]))
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
# Make sure the agent process is dead.
for pid in agent_pids:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
# Delete the cluster address just in case.
ray._private.utils.reset_ray_address()

View file

@@ -834,9 +834,8 @@ def test_ray_status(shutdown_only, monkeypatch):
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported on Windows")
def test_ray_status_multinode(ray_start_cluster):
NODE_NUMBER = 4
cluster = ray_start_cluster
for _ in range(NODE_NUMBER):
for _ in range(4):
cluster.add_node(num_cpus=2)
runner = CliRunner()
@@ -851,12 +850,8 @@ def test_ray_status_multinode(ray_start_cluster):
wait_for_condition(output_ready)
def check_result():
result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
return True
wait_for_condition(check_result)
result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
@pytest.mark.skipif(

View file

@@ -11,25 +11,17 @@ from ray._private.test_utils import wait_for_condition, run_string_as_driver_non
def get_all_ray_worker_processes():
processes = psutil.process_iter(attrs=["pid", "name", "cmdline"])
processes = [
p.info["cmdline"] for p in psutil.process_iter(attrs=["pid", "name", "cmdline"])
]
result = []
for p in processes:
cmd_line = p.info["cmdline"]
if cmd_line is not None and len(cmd_line) > 0 and "ray::" in cmd_line[0]:
if p is not None and len(p) > 0 and "ray::" in p[0]:
result.append(p)
return result
def kill_all_ray_worker_process():
ray_process = get_all_ray_worker_processes()
for p in ray_process:
try:
p.kill()
except Exception:
pass
@pytest.fixture
def short_gcs_publish_timeout(monkeypatch):
monkeypatch.setenv("RAY_MAX_GCS_PUBLISH_RETRIES", "3")
@@ -39,10 +31,6 @@ def short_gcs_publish_timeout(monkeypatch):
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_ray_shutdown(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is done."""
# Guard against a previous test not killing the relevant process,
# which would make the current test fail.
kill_all_ray_worker_process()
ray.init()
@ray.remote
@@ -63,10 +51,6 @@ def test_ray_shutdown(short_gcs_publish_timeout, shutdown_only):
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_driver_dead(short_gcs_publish_timeout, shutdown_only):
"""Make sure all ray workers are shutdown when driver is killed."""
# Guard against a previous test not killing the relevant process,
# which would make the current test fail.
kill_all_ray_worker_process()
driver = """
import ray
ray.init(_system_config={"gcs_rpc_server_reconnect_timeout_s": 1})
@@ -96,10 +80,6 @@ tasks = [f.remote() for _ in range(num_cpus)]
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_node_killed(short_gcs_publish_timeout, ray_start_cluster):
"""Make sure all ray workers when nodes are dead."""
# Avoiding the previous test doesn't kill the relevant process,
# thus making the current test fail.
kill_all_ray_worker_process()
cluster = ray_start_cluster
# head node.
cluster.add_node(
@@ -132,10 +112,6 @@ def test_node_killed(short_gcs_publish_timeout, ray_start_cluster):
@pytest.mark.skipif(platform.system() == "Windows", reason="Hang on Windows.")
def test_head_node_down(short_gcs_publish_timeout, ray_start_cluster):
"""Make sure all ray workers when head node is dead."""
# Avoiding the previous test doesn't kill the relevant process,
# thus making the current test fail.
kill_all_ray_worker_process()
cluster = ray_start_cluster
# head node.
head = cluster.add_node(

View file

@@ -10,6 +10,7 @@ import yaml
from click.testing import CliRunner
import ray
import ray.dashboard.consts as dashboard_consts
import ray._private.state as global_state
import ray._private.ray_constants as ray_constants
from ray._private.test_utils import (
@@ -1159,8 +1160,16 @@ async def test_state_data_source_client(ray_start_cluster):
wait_for_condition(lambda: len(ray.nodes()) == 2)
for node in ray.nodes():
node_id = node["NodeID"]
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
def get_port():
return ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
wait_for_condition(lambda: get_port() is not None)
# The second index is the gRPC port
port = node["AgentInfo"]["GrpcPort"]
port = json.loads(get_port())[1]
ip = node["NodeManagerAddress"]
client.register_agent_client(node_id, ip, port)
result = await client.get_runtime_envs_info(node_id)
@@ -1362,8 +1371,16 @@ async def test_state_data_source_client_limit_distributed_sources(ray_start_clus
"""
for node in ray.nodes():
node_id = node["NodeID"]
key = f"{dashboard_consts.DASHBOARD_AGENT_PORT_PREFIX}{node_id}"
def get_port():
return ray.experimental.internal_kv._internal_kv_get(
key, namespace=ray_constants.KV_NAMESPACE_DASHBOARD
)
wait_for_condition(lambda: get_port() is not None)
# The second index is the gRPC port
port = node["AgentInfo"]["GrpcPort"]
port = json.loads(get_port())[1]
ip = node["NodeManagerAddress"]
client.register_agent_client(node_id, ip, port)
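
Both call sites above depend on the payload layout the agent writes: a JSON-encoded pair whose second element is the gRPC port. A worked example with hypothetical port numbers:

import json

payload = json.dumps([52365, 63229])  # what the agent stores: [http_port, grpc_port]
http_port, grpc_port = json.loads(payload)
assert grpc_port == 63229  # index 1 is the gRPC port passed to register_agent_client
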
@@ -1476,13 +1493,8 @@ def test_cli_apis_sanity_check(ray_start_cluster):
)
)
# Test get workers by id
# Still need a `wait_for_condition` because the workers obtained
# through the API server are not filtered for the driver,
# while `global_state.workers` filters the driver out.
wait_for_condition(lambda: len(global_state.workers()) > 0)
workers = global_state.workers()
assert len(workers) > 0
worker_id = list(workers.keys())[0]
wait_for_condition(
lambda: verify_output(cli_get, ["workers", worker_id], ["worker_id", worker_id])

View file

@@ -76,9 +76,6 @@ RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)
/// The duration between raylet check memory pressure and send gc request
RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100)
/// If the raylet fails to get agent info, we will retry after this interval.
RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)
/// For a raylet, if the last resource report was sent more than this many
/// report periods ago, then a warning will be logged that the report
/// handler is drifting.

View file

@@ -243,7 +243,7 @@ python_grpc_compile(
proto_library(
name = "agent_manager_proto",
srcs = ["agent_manager.proto"],
deps = [":common_proto"],
deps = [],
)
python_grpc_compile(

View file

@@ -17,8 +17,6 @@ option cc_enable_arenas = true;
package ray.rpc;
import "src/ray/protobuf/common.proto";
enum AgentRpcStatus {
// OK.
AGENT_RPC_STATUS_OK = 0;
@@ -27,7 +25,9 @@ enum AgentRpcStatus {
}
message RegisterAgentRequest {
AgentInfo agent_info = 1;
int32 agent_id = 1;
int32 agent_port = 2;
string agent_ip_address = 3;
}
message RegisterAgentReply {

View file

@@ -677,17 +677,3 @@ message NamedActorInfo {
string ray_namespace = 1;
string name = 2;
}
// Info about an agent process.
message AgentInfo {
// The agent id.
int32 id = 1;
// The agent process pid.
int64 pid = 2;
// IP address of the agent process.
string ip_address = 3;
// The GRPC port number of the agent process.
int32 grpc_port = 4;
// The http port number of the agent process.
int32 http_port = 5;
}
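
With AgentInfo gone from common.proto, the registration request carries only the three scalars defined in agent_manager.proto above; the pid and http port that AgentInfo bundled no longer travel over this RPC (the http port is discovered via the internal KV instead). Constructing the post-revert message, with hypothetical values:

from ray.core.generated import agent_manager_pb2

request = agent_manager_pb2.RegisterAgentRequest(
    agent_id=424242,              # non-zero random id chosen by the raylet
    agent_port=63229,             # agent gRPC port; 0 signals a port conflict
    agent_ip_address="10.0.0.5",
)
assert request.agent_port == 63229
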

View file

@@ -247,8 +247,6 @@ message GcsNodeInfo {
// The user-provided identifier or name for this node.
string node_name = 12;
// The information of the agent process.
AgentInfo agent_info = 13;
}
message HeartbeatTableData {

View file

@@ -29,20 +29,19 @@ namespace raylet {
void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
rpc::RegisterAgentReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reported_agent_info_.CopyFrom(request.agent_info());
reported_agent_ip_address_ = request.agent_ip_address();
reported_agent_port_ = request.agent_port();
reported_agent_id_ = request.agent_id();
// TODO(SongGuyang): We should remove this after we find better port resolution.
// Note: `reported_agent_info_.grpc_port()` will be 0 if the agent's gRPC port is
// in conflict.
if (reported_agent_info_.grpc_port() != 0) {
// Note: `reported_agent_port_` will be 0 if the agent's gRPC port is in conflict.
if (reported_agent_port_ != 0) {
runtime_env_agent_client_ = runtime_env_agent_client_factory_(
reported_agent_info_.ip_address(), reported_agent_info_.grpc_port());
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_info_.ip_address()
<< ", port: " << reported_agent_info_.grpc_port()
<< ", id: " << reported_agent_info_.id();
reported_agent_ip_address_, reported_agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_ip_address_
<< ", port: " << reported_agent_port_ << ", id: " << reported_agent_id_;
} else {
RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: "
<< reported_agent_info_.ip_address()
<< ", id: " << reported_agent_info_.id()
<< reported_agent_ip_address_ << ", id: " << reported_agent_id_
<< ". The agent client in the raylet has been disabled.";
disable_agent_client_ = true;
}
@@ -57,19 +56,16 @@ void AgentManager::StartAgent() {
return;
}
// Create a non-zero random agent_id_ to pass to the child process
// Create a non-zero random agent_id to pass to the child process
// We cannot use the pid as the id because os.getpid() from the python process is not
// reliable when using a launcher.
// See https://github.com/ray-project/ray/issues/24361 and Python issue
// https://github.com/python/cpython/issues/83086
agent_id_ = 0;
while (agent_id_ == 0) {
agent_id_ = rand();
int agent_id = 0;
while (agent_id == 0) {
agent_id = rand();
}
// Make sure reported_agent_info_.id() is not equal to
// `agent_id_` until the agent has finished registering.
reported_agent_info_.set_id(0);
const std::string agent_id_str = std::to_string(agent_id_);
const std::string agent_id_str = std::to_string(agent_id);
std::vector<const char *> argv;
for (const std::string &arg : options_.agent_commands) {
argv.push_back(arg.c_str());
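
The loop above redraws random ids until it gets a non-zero one, because 0 is reserved to mean "not registered yet" and the child's os.getpid() cannot be trusted under launchers. The same invariant, illustratively, in Python:

import random


def make_agent_id() -> int:
    # 0 means "agent has not registered yet", so never hand it out as an id.
    agent_id = 0
    while agent_id == 0:
        agent_id = random.randrange(2**31)
    return agent_id
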
@@ -108,22 +104,21 @@ void AgentManager::StartAgent() {
<< ec.message();
}
std::thread monitor_thread([this, child]() mutable {
std::thread monitor_thread([this, child, agent_id]() mutable {
SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with id " << child.GetId()
<< ", register timeout "
RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms.";
auto timer = delay_executor_(
[this, child]() mutable {
if (!IsAgentRegistered()) {
if (reported_agent_info_.id() == 0) {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id_
[this, child, agent_id]() mutable {
if (reported_agent_id_ != agent_id) {
if (reported_agent_id_ == 0) {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " timed out before registering. ip "
<< reported_agent_info_.ip_address() << ", id "
<< reported_agent_info_.id();
<< reported_agent_ip_address_ << ", id "
<< reported_agent_id_;
} else {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id_
<< " but got id " << reported_agent_info_.id()
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " but got id " << reported_agent_id_
<< ", this is a fatal error";
}
child.Kill();
@@ -133,9 +128,9 @@ void AgentManager::StartAgent() {
int exit_code = child.Wait();
timer->cancel();
RAY_LOG(WARNING) << "Agent process with id " << agent_id_ << " exited, return value "
<< exit_code << ". ip " << reported_agent_info_.ip_address()
<< ". id " << reported_agent_info_.id();
RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exited, return value "
<< exit_code << ". ip " << reported_agent_ip_address_ << ". id "
<< reported_agent_id_;
RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the "
@@ -308,15 +303,5 @@ void AgentManager::DeleteRuntimeEnvIfPossible(
});
}
const ray::Status AgentManager::TryToGetAgentInfo(rpc::AgentInfo *agent_info) const {
if (IsAgentRegistered()) {
*agent_info = reported_agent_info_;
return ray::Status::OK();
} else {
std::string err_msg = "The agent has not finished registering yet.";
return ray::Status::Invalid(err_msg);
}
}
} // namespace raylet
} // namespace ray

View file

@@ -23,7 +23,6 @@
#include "ray/rpc/agent_manager/agent_manager_server.h"
#include "ray/rpc/runtime_env/runtime_env_client.h"
#include "ray/util/process.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace raylet {
@@ -89,28 +88,17 @@ class AgentManager : public rpc::AgentManagerServiceHandler {
virtual void DeleteRuntimeEnvIfPossible(const std::string &serialized_runtime_env,
DeleteRuntimeEnvIfPossibleCallback callback);
/// Try to get information about the agent process.
///
/// \param[out] agent_info The information of the agent process.
/// \return Status, if successful will return `ray::Status::OK`,
/// otherwise will return `ray::Status::Invalid`.
const ray::Status TryToGetAgentInfo(rpc::AgentInfo *agent_info) const;
private:
void StartAgent();
const bool IsAgentRegistered() const { return reported_agent_info_.id() == agent_id_; }
private:
Options options_;
/// We need to make sure `agent_id_` and `reported_agent_info_.id()` are not
/// equal until the agent process has finished registering. The initial value
/// of `reported_agent_info_.id()` is 0, so `agent_id_` is initialized to -1.
int agent_id_ = -1;
rpc::AgentInfo reported_agent_info_;
pid_t reported_agent_id_ = 0;
int reported_agent_port_ = 0;
/// Whether or not we intend to start the agent. This is false if we
/// are missing Ray Dashboard dependencies, for example.
bool should_start_agent_ = true;
std::string reported_agent_ip_address_;
DelayExecutorFn delay_executor_;
RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;
std::shared_ptr<rpc::RuntimeEnvAgentClientInterface> runtime_env_agent_client_;

View file

@@ -2860,10 +2860,6 @@ void NodeManager::PublishInfeasibleTaskError(const RayTask &task) const {
}
}
const ray::Status NodeManager::TryToGetAgentInfo(rpc::AgentInfo *agent_info) const {
return agent_manager_->TryToGetAgentInfo(agent_info);
}
} // namespace raylet
} // namespace ray

View file

@@ -237,13 +237,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
int64_t limit,
const std::function<void()> &on_all_replied);
/// Try to get information about the agent process.
///
/// \param[out] agent_info The information of the agent process.
/// \return Status, if successful will return `ray::Status::OK`,
/// otherwise will return `ray::Status::Invalid`.
const ray::Status TryToGetAgentInfo(rpc::AgentInfo *agent_info) const;
private:
/// Methods for handling nodes.

View file

@@ -97,6 +97,7 @@ Raylet::~Raylet() {}
void Raylet::Start() {
RAY_CHECK_OK(RegisterGcs());
// Start listening for clients.
DoAccept();
}
@@ -108,21 +109,6 @@ void Raylet::Stop() {
}
ray::Status Raylet::RegisterGcs() {
rpc::AgentInfo agent_info;
auto status = node_manager_.TryToGetAgentInfo(&agent_info);
if (status.ok()) {
self_node_info_.mutable_agent_info()->CopyFrom(agent_info);
} else {
// Because this function and `AgentManager::HandleRegisterAgent`
// are invoked on the same thread, we repost this function
// to main_service_ after the retry interval elapses.
std::this_thread::sleep_for(std::chrono::milliseconds(
RayConfig::instance().raylet_get_agent_info_interval_ms()));
main_service_.post([this]() { RAY_CHECK_OK(RegisterGcs()); },
"Raylet.TryToGetAgentInfoAndRegisterGcs");
return Status::OK();
}
auto register_callback = [this](const Status &status) {
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Raylet of id, " << self_node_id_

View file

@@ -68,7 +68,7 @@ class Raylet {
NodeID GetNodeId() const { return self_node_id_; }
private:
/// Try to get the agent info; on success, register the current node with the GCS.
/// Register GCS client.
ray::Status RegisterGcs();
/// Accept a client connection.

View file

@@ -503,8 +503,7 @@ class WorkerPoolTest : public ::testing::Test {
false);
rpc::RegisterAgentRequest request;
// Set agent port to a nonzero value to avoid invalid agent client.
request.mutable_agent_info()->set_grpc_port(12345);
request.mutable_agent_info()->set_http_port(54321);
request.set_agent_port(12345);
rpc::RegisterAgentReply reply;
auto send_reply_callback =
[](ray::Status status, std::function<void()> f1, std::function<void()> f2) {};