[autoscaler] Fix tag cache bug, don't kill workers on error (#14424)

Alex Wu authored on 2021-03-02 11:06:06 -08:00 (committed by GitHub)
parent d921dca075
commit 4572c6cf0f
3 changed files with 18 additions and 10 deletions


@@ -427,8 +427,12 @@ class AWSNodeProvider(NodeProvider):
         else:
             node.terminate()
 
-        self.tag_cache.pop(node_id, None)
-        self.tag_cache_pending.pop(node_id, None)
+        # TODO (Alex): We are leaking the tag cache here. Naively, we would
+        # want to just remove the cache entry here, but terminating can be
+        # asynchronous or can fail, which would result in a use-after-free
+        # error. If this leak becomes bad, we can garbage collect the tag
+        # cache when the node cache is updated.
+        pass
 
     def terminate_nodes(self, node_ids):
         if not node_ids:
@@ -463,10 +467,6 @@ class AWSNodeProvider(NodeProvider):
         else:
             self.ec2.meta.client.terminate_instances(InstanceIds=node_ids)
 
-        for node_id in node_ids:
-            self.tag_cache.pop(node_id, None)
-            self.tag_cache_pending.pop(node_id, None)
-
     def _get_node(self, node_id):
         """Refresh and get info for this node, updating the cache."""
         self.non_terminated_nodes({})  # Side effect: updates cache
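Note: the TODO above deliberately leaves stale tag cache entries behind. If the leak ever needs to be addressed, one possible shape of the garbage collection it mentions is sketched below. This is a minimal sketch, not part of this commit: the _gc_tag_cache helper and its call site are hypothetical, and it assumes the provider's tag_cache_lock guards both tag dicts.

    # Hypothetical helper, not part of this commit: prune tag cache entries
    # for nodes that no longer exist. Could be called after the node cache
    # is refreshed (e.g. at the end of non_terminated_nodes).
    def _gc_tag_cache(self, live_node_ids):
        # Assumes tag_cache_lock guards tag_cache and tag_cache_pending.
        with self.tag_cache_lock:
            for node_id in list(self.tag_cache):
                if node_id not in live_node_ids:
                    self.tag_cache.pop(node_id, None)
                    self.tag_cache_pending.pop(node_id, None)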


@@ -414,12 +414,19 @@ class _CliLogger():
         record.levelname = _level_str
         rendered_message = self._formatter.format(record)
 
+        # We aren't using standard python logging convention, so we hardcode
+        # the log levels for now.
+        if _level_str in ["WARNING", "ERROR", "PANIC"]:
+            stream = sys.stderr
+        else:
+            stream = sys.stdout
+
         if not _linefeed:
-            sys.stdout.write(rendered_message)
-            sys.stdout.flush()
+            stream.write(rendered_message)
+            stream.flush()
             return
 
-        print(rendered_message)
+        print(rendered_message, file=stream)
 
     def indented(self):
         """Context manager that starts an indented block of output.

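The net effect of the change above is that WARNING, ERROR, and PANIC output from the CLI logger goes to stderr while everything else stays on stdout. A standalone sketch of that routing, for illustration only; the _pick_stream helper is hypothetical and not part of _CliLogger:

import sys

# Illustrative helper mirroring the stream selection added above.
def _pick_stream(level_str):
    if level_str in ["WARNING", "ERROR", "PANIC"]:
        return sys.stderr
    return sys.stdout

print(_pick_stream("INFO") is sys.stdout)    # True
print(_pick_stream("ERROR") is sys.stderr)   # True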

@@ -240,7 +240,8 @@ class Monitor:
     def _handle_failure(self, error):
         logger.exception("Error in monitor loop")
-        if self.autoscaler is not None:
+        if self.autoscaler is not None and \
+                os.environ.get("RAY_AUTOSCALER_FATESHARE_WORKERS", "") == "1":
             self.autoscaler.kill_workers()
         # Take down autoscaler workers if necessary.
         self.destroy_autoscaler_workers()
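With this change the monitor only kills workers after a monitor-loop error when RAY_AUTOSCALER_FATESHARE_WORKERS=1 is set in the monitor's environment. A minimal sketch of that opt-in check, with a hypothetical helper name that is not part of the commit:

import os

# Illustrative check mirroring the new gating condition: workers are only
# fate-shared with the monitor if the operator explicitly opts in.
def fateshare_workers_enabled():
    return os.environ.get("RAY_AUTOSCALER_FATESHARE_WORKERS", "") == "1"

# Opting back into the old behavior before the monitor process starts:
os.environ["RAY_AUTOSCALER_FATESHARE_WORKERS"] = "1"
assert fateshare_workers_enabled()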