mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[autoscaler] Create provider exactly once (#10703)
Co-authored-by: Alex Wu <itswu.alex@gmail.com>
This commit is contained in:
parent
67bf396ae7
commit
eb025ea8cb
2 changed files with 13 additions and 5 deletions
|
@@ -64,6 +64,9 @@ class StandardAutoscaler:
|
|||
process_runner=subprocess,
|
||||
update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S):
|
||||
self.config_path = config_path
|
||||
+ # Keep this before self.reset (self.provider needs to be created
|
||||
+ # exactly once).
|
||||
+ self.provider = None
|
||||
self.reset(errors_fatal=True)
|
||||
self.load_metrics = load_metrics
|
||||
|
||||
|
@@ -250,6 +253,7 @@ class StandardAutoscaler:
|
|||
self.should_update(node_id) for node_id in nodes):
|
||||
if node_id is not None:
|
||||
resources = self._node_resources(node_id)
|
||||
+ logger.debug(f"{node_id}: Starting new thread runner.")
|
||||
T.append(
|
||||
threading.Thread(
|
||||
target=self.spawn_updater,
|
||||
|
@@ -295,9 +299,9 @@ class StandardAutoscaler:
|
|||
self.config = new_config
|
||||
self.runtime_hash = new_runtime_hash
|
||||
self.file_mounts_contents_hash = new_file_mounts_contents_hash
|
||||
-
||||
- self.provider = get_node_provider(self.config["provider"],
|
||||
- self.config["cluster_name"])
|
||||
+ if not self.provider:
|
||||
+ self.provider = get_node_provider(self.config["provider"],
|
||||
+ self.config["cluster_name"])
|
||||
# Check whether we can enable the resource demand scheduler.
|
||||
if "available_node_types" in self.config:
|
||||
self.available_node_types = self.config["available_node_types"]
|
||||
|
@@ -462,6 +466,8 @@ class StandardAutoscaler:
|
|||
|
||||
def spawn_updater(self, node_id, init_commands, ray_start_commands,
|
||||
node_resources, docker_config):
|
||||
+ logger.info(f"Creating new (spawn_updater) updater thread for node"
|
||||
+ f" {node_id}.")
|
||||
updater = NodeUpdaterThread(
|
||||
node_id=node_id,
|
||||
provider_config=self.config["provider"],
|
||||
|
@@ -492,6 +498,8 @@ class StandardAutoscaler:
|
|||
return False
|
||||
if self.num_failed_updates.get(node_id, 0) > 0: # TODO(ekl) retry?
|
||||
return False
|
||||
+ logger.debug(f"{node_id} is not being updated and "
|
||||
+ "passes config check (can_update=True).")
|
||||
return True
|
||||
|
||||
def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
|
||||
|
|
|
@@ -455,7 +455,7 @@ def kill_node(config_file, yes, hard, override_cluster_name):
|
|||
|
||||
def monitor_cluster(cluster_config_file, num_lines, override_cluster_name):
|
||||
"""Tails the autoscaler logs of a Ray cluster."""
|
||||
- cmd = "tail -n {} -f /tmp/ray/session_*/logs/monitor*".format(num_lines)
|
||||
+ cmd = f"tail -n {num_lines} -f /tmp/ray/session_latest/logs/monitor*"
|
||||
exec_cluster(
|
||||
cluster_config_file,
|
||||
cmd=cmd,
|
||||
|
@@ -717,7 +717,7 @@ def get_or_create_head_node(config,
|
|||
logger, "get_or_create_head_node: "
|
||||
"Head node up-to-date, IP address is: {}", head_node_ip)
|
||||
|
||||
- monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
|
||||
+ monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*"
|
||||
if override_cluster_name:
|
||||
modifiers = " --cluster-name={}".format(
|
||||
quote(override_cluster_name))
|
||||
|
|
Loading…
Add table
Reference in a new issue