Revert "Revert "use an agent-id rather than the process PID (#24968)"… (#25376)

This commit is contained in:
SangBin Cho 2022-06-02 08:28:48 +09:00 committed by GitHub
parent 80168a09a6
commit cb151d5ad6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 42 deletions

View file

@ -43,17 +43,19 @@ class DashboardAgent(object):
dashboard_agent_port, dashboard_agent_port,
gcs_address, gcs_address,
minimal, minimal,
temp_dir=None,
session_dir=None,
runtime_env_dir=None,
log_dir=None,
metrics_export_port=None, metrics_export_port=None,
node_manager_port=None, node_manager_port=None,
listen_port=0, listen_port=0,
object_store_name=None,
raylet_name=None,
logging_params=None,
disable_metrics_collection: bool = False, disable_metrics_collection: bool = False,
*, # the following are required kwargs
object_store_name: str,
raylet_name: str,
log_dir: str,
temp_dir: str,
session_dir: str,
runtime_env_dir: str,
logging_params: dict,
agent_id: int,
): ):
"""Initialize the DashboardAgent object.""" """Initialize the DashboardAgent object."""
# Public attributes are accessible for all agent modules. # Public attributes are accessible for all agent modules.
@ -76,6 +78,7 @@ class DashboardAgent(object):
self.logging_params = logging_params self.logging_params = logging_params
self.node_id = os.environ["RAY_NODE_ID"] self.node_id = os.environ["RAY_NODE_ID"]
self.metrics_collection_disabled = disable_metrics_collection self.metrics_collection_disabled = disable_metrics_collection
self.agent_id = agent_id
# TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is # TODO(edoakes): RAY_RAYLET_PID isn't properly set on Windows. This is
# only used for fate-sharing with the raylet and we need a different # only used for fate-sharing with the raylet and we need a different
# fate-sharing mechanism for Windows anyways. # fate-sharing mechanism for Windows anyways.
@ -203,7 +206,7 @@ class DashboardAgent(object):
await raylet_stub.RegisterAgent( await raylet_stub.RegisterAgent(
agent_manager_pb2.RegisterAgentRequest( agent_manager_pb2.RegisterAgentRequest(
agent_pid=os.getpid(), agent_id=self.agent_id,
agent_port=self.grpc_port, agent_port=self.grpc_port,
agent_ip_address=self.ip, agent_ip_address=self.ip,
) )
@ -354,6 +357,12 @@ if __name__ == "__main__":
action="store_true", action="store_true",
help=("If this arg is set, metrics report won't be enabled from the agent."), help=("If this arg is set, metrics report won't be enabled from the agent."),
) )
parser.add_argument(
"--agent-id",
required=True,
type=int,
help="ID to report when registering with raylet",
)
args = parser.parse_args() args = parser.parse_args()
try: try:
@ -383,6 +392,7 @@ if __name__ == "__main__":
raylet_name=args.raylet_name, raylet_name=args.raylet_name,
logging_params=logging_params, logging_params=logging_params,
disable_metrics_collection=args.disable_metrics_collection, disable_metrics_collection=args.disable_metrics_collection,
agent_id=args.agent_id,
) )
if os.environ.get("_RAY_AGENT_FAILING"): if os.environ.get("_RAY_AGENT_FAILING"):
raise Exception("Failure injection failure.") raise Exception("Failure injection failure.")

View file

@ -25,7 +25,7 @@ enum AgentRpcStatus {
} }
message RegisterAgentRequest { message RegisterAgentRequest {
int32 agent_pid = 1; int32 agent_id = 1;
int32 agent_port = 2; int32 agent_port = 2;
string agent_ip_address = 3; string agent_ip_address = 3;
} }

View file

@ -29,19 +29,19 @@ namespace raylet {
void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request,
rpc::RegisterAgentReply *reply, rpc::RegisterAgentReply *reply,
rpc::SendReplyCallback send_reply_callback) { rpc::SendReplyCallback send_reply_callback) {
agent_ip_address_ = request.agent_ip_address(); reported_agent_ip_address_ = request.agent_ip_address();
agent_port_ = request.agent_port(); reported_agent_port_ = request.agent_port();
agent_pid_ = request.agent_pid(); reported_agent_id_ = request.agent_id();
// TODO(SongGuyang): We should remove this after we find better port resolution. // TODO(SongGuyang): We should remove this after we find better port resolution.
// Note: `agent_port_` should be 0 if the grpc port of agent is in conflict. // Note: `agent_port_` should be 0 if the grpc port of agent is in conflict.
if (agent_port_ != 0) { if (reported_agent_port_ != 0) {
runtime_env_agent_client_ = runtime_env_agent_client_ = runtime_env_agent_client_factory_(
runtime_env_agent_client_factory_(agent_ip_address_, agent_port_); reported_agent_ip_address_, reported_agent_port_);
RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_ RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << reported_agent_ip_address_
<< ", port: " << agent_port_ << ", pid: " << agent_pid_; << ", port: " << reported_agent_port_ << ", id: " << reported_agent_id_;
} else { } else {
RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: " RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: "
<< agent_ip_address_ << ", pid: " << agent_pid_ << reported_agent_ip_address_ << ", id: " << reported_agent_id_
<< ". The agent client in the raylet has been disabled."; << ". The agent client in the raylet has been disabled.";
disable_agent_client_ = true; disable_agent_client_ = true;
} }
@ -56,30 +56,45 @@ void AgentManager::StartAgent() {
return; return;
} }
if (RAY_LOG_ENABLED(DEBUG)) { // Create a non-zero random agent_id to pass to the child process
std::stringstream stream; // We cannot use pid an id because os.getpid() from the python process is not
stream << "Starting agent process with command:"; // reliable when using a launcher.
for (const auto &arg : options_.agent_commands) { // See https://github.com/ray-project/ray/issues/24361 and Python issue
stream << " " << arg; // https://github.com/python/cpython/issues/83086
} int agent_id = 0;
RAY_LOG(DEBUG) << stream.str(); while (agent_id == 0) {
agent_id = rand();
} }
const std::string agent_id_str = std::to_string(agent_id);
// Launch the process to create the agent.
std::error_code ec;
std::vector<const char *> argv; std::vector<const char *> argv;
for (const std::string &arg : options_.agent_commands) { for (const std::string &arg : options_.agent_commands) {
argv.push_back(arg.c_str()); argv.push_back(arg.c_str());
} }
argv.push_back("--agent-id");
argv.push_back(agent_id_str.c_str());
// Disable metrics report if needed. // Disable metrics report if needed.
if (!RayConfig::instance().enable_metrics_collection()) { if (!RayConfig::instance().enable_metrics_collection()) {
argv.push_back("--disable-metrics-collection"); argv.push_back("--disable-metrics-collection");
} }
argv.push_back(NULL); argv.push_back(NULL);
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting agent process with command:";
for (const auto &arg : argv) {
stream << " " << arg;
}
RAY_LOG(DEBUG) << stream.str();
}
// Set node id to agent. // Set node id to agent.
ProcessEnvironment env; ProcessEnvironment env;
env.insert({"RAY_NODE_ID", options_.node_id.Hex()}); env.insert({"RAY_NODE_ID", options_.node_id.Hex()});
env.insert({"RAY_RAYLET_PID", std::to_string(getpid())}); env.insert({"RAY_RAYLET_PID", std::to_string(getpid())});
// Launch the process to create the agent.
std::error_code ec;
Process child(argv.data(), nullptr, ec, false, env); Process child(argv.data(), nullptr, ec, false, env);
if (!child.IsValid() || ec) { if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error. // The worker failed to start. This is a fatal error.
@ -87,17 +102,23 @@ void AgentManager::StartAgent() {
<< ec.message(); << ec.message();
} }
std::thread monitor_thread([this, child]() mutable { std::thread monitor_thread([this, child, agent_id]() mutable {
SetThreadName("agent.monitor"); SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId() RAY_LOG(INFO) << "Monitor agent process with id " << agent_id << ", register timeout "
<< ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms."; << RayConfig::instance().agent_register_timeout_ms() << "ms.";
auto timer = delay_executor_( auto timer = delay_executor_(
[this, child]() mutable { [this, child, agent_id]() mutable {
if (agent_pid_ != child.GetId()) { if (reported_agent_id_ != agent_id) {
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() if (reported_agent_id_ == 0) {
<< " has not registered. ip " << agent_ip_address_ RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< ", pid " << agent_pid_; << " timed out before registering. ip "
<< reported_agent_ip_address_ << ", id "
<< reported_agent_id_;
} else {
RAY_LOG(WARNING) << "Agent process expected id " << agent_id
<< " but got id " << reported_agent_id_
<< ", this is a fatal error";
}
child.Kill(); child.Kill();
} }
}, },
@ -105,9 +126,9 @@ void AgentManager::StartAgent() {
int exit_code = child.Wait(); int exit_code = child.Wait();
timer->cancel(); timer->cancel();
RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() RAY_LOG(WARNING) << "Agent process with id " << agent_id << " exit, return value "
<< " exit, return value " << exit_code << ". ip " << exit_code << ". ip " << reported_agent_ip_address_ << ". id "
<< agent_ip_address_ << ". pid " << agent_pid_; << reported_agent_id_;
RAY_LOG(ERROR) RAY_LOG(ERROR)
<< "The raylet exited immediately because the Ray agent failed. " << "The raylet exited immediately because the Ray agent failed. "
"The raylet fate shares with the agent. This can happen because the " "The raylet fate shares with the agent. This can happen because the "

View file

@ -93,12 +93,12 @@ class AgentManager : public rpc::AgentManagerServiceHandler {
private: private:
Options options_; Options options_;
pid_t agent_pid_ = 0; pid_t reported_agent_id_ = 0;
int agent_port_ = 0; int reported_agent_port_ = 0;
/// Whether or not we intend to start the agent. This is false if we /// Whether or not we intend to start the agent. This is false if we
/// are missing Ray Dashboard dependencies, for example. /// are missing Ray Dashboard dependencies, for example.
bool should_start_agent_ = true; bool should_start_agent_ = true;
std::string agent_ip_address_; std::string reported_agent_ip_address_;
DelayExecutorFn delay_executor_; DelayExecutorFn delay_executor_;
RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_; RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_;
std::shared_ptr<rpc::RuntimeEnvAgentClientInterface> runtime_env_agent_client_; std::shared_ptr<rpc::RuntimeEnvAgentClientInterface> runtime_env_agent_client_;