from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import shutil
import tempfile
import time
import unittest

import yaml

import ray
from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \
    fillout_defaults, validate_config
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS
from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider
from ray.autoscaler.updater import NodeUpdaterThread
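

# Test doubles for the pieces StandardAutoscaler touches: MockNode stands in
# for a cloud instance, MockProcessRunner for the subprocess calls issued by
# node updaters, and MockProvider for the cloud API. AutoscalingTest.setUp
# registers MockProvider under the "mock" provider type in NODE_PROVIDERS so
# the autoscaler can be exercised without any real cloud credentials.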


class MockNode(object):
    def __init__(self, node_id, tags):
        self.node_id = node_id
        self.state = "pending"
        self.tags = tags
        self.external_ip = "1.2.3.4"
        self.internal_ip = "172.0.0.{}".format(self.node_id)

    def matches(self, tags):
        for k, v in tags.items():
            if k not in self.tags or self.tags[k] != v:
                return False
        return True


class MockProcessRunner(object):
    def __init__(self, fail_cmds=None):
        self.calls = []
        # Avoid a mutable default argument; None means "no failing commands".
        self.fail_cmds = fail_cmds or []

    def check_call(self, cmd, *args, **kwargs):
        for token in self.fail_cmds:
            if token in str(cmd):
                raise Exception("Failing command on purpose")
        self.calls.append(cmd)


class MockProvider(NodeProvider):
    def __init__(self):
        self.mock_nodes = {}
        self.next_id = 0
        self.throw = False
        self.fail_creates = False

    def nodes(self, tag_filters):
        if self.throw:
            raise Exception("oops")
        return [
            n.node_id for n in self.mock_nodes.values()
            if n.matches(tag_filters) and n.state != "terminated"
        ]

    def is_running(self, node_id):
        return self.mock_nodes[node_id].state == "running"

    def is_terminated(self, node_id):
        return self.mock_nodes[node_id].state == "terminated"

    def node_tags(self, node_id):
        return self.mock_nodes[node_id].tags

    def internal_ip(self, node_id):
        return self.mock_nodes[node_id].internal_ip

    def external_ip(self, node_id):
        return self.mock_nodes[node_id].external_ip

    def create_node(self, node_config, tags, count):
        if self.fail_creates:
            return
        for _ in range(count):
            self.mock_nodes[self.next_id] = MockNode(self.next_id, tags)
            self.next_id += 1

    def set_node_tags(self, node_id, tags):
        self.mock_nodes[node_id].tags.update(tags)

    def terminate_node(self, node_id):
        self.mock_nodes[node_id].state = "terminated"
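

# Note that MockProvider.nodes() filters out "terminated" nodes, so tests can
# simulate out-of-band termination (or instances coming up) simply by flipping
# MockNode.state, as testLaunchNewNodeOnOutOfBandTerminate does below.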


SMALL_CLUSTER = {
    "cluster_name": "default",
    "min_workers": 2,
    "max_workers": 2,
    "target_utilization_fraction": 0.8,
    "idle_timeout_minutes": 5,
    "provider": {
        "type": "mock",
        "region": "us-east-1",
        "availability_zone": "us-east-1a",
    },
    "docker": {
        "image": "example",
        "container_name": "mock",
    },
    "auth": {
        "ssh_user": "ubuntu",
        "ssh_private_key": "/dev/null",
    },
    "head_node": {
        "TestProp": 1,
    },
    "worker_nodes": {
        "TestProp": 2,
    },
    "file_mounts": {},
    "setup_commands": ["cmd1"],
    "head_setup_commands": ["cmd2"],
    "worker_setup_commands": ["cmd3"],
    "head_start_ray_commands": ["start_ray_head"],
    "worker_start_ray_commands": ["start_ray_worker"],
}
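

# Baseline cluster config shared by the tests below; each test copies it and
# overrides fields such as min_workers/max_workers before writing it out via
# write_config(). dict.copy() is shallow, so nested sections are deep-copied
# before mutation (see testLaunchConfigChange).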


class LoadMetricsTest(unittest.TestCase):
    def testUpdate(self):
        lm = LoadMetrics()
        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
        self.assertEqual(lm.approx_workers_used(), 0.5)
        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
        self.assertEqual(lm.approx_workers_used(), 1.0)
        lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0})
        self.assertEqual(lm.approx_workers_used(), 2.0)

    def testPruneByNodeIp(self):
        lm = LoadMetrics()
        lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0})
        lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0})
        lm.prune_active_ips({"1.1.1.1", "4.4.4.4"})
        self.assertEqual(lm.approx_workers_used(), 1.0)

    def testBottleneckResource(self):
        lm = LoadMetrics()
        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
        lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
        self.assertEqual(lm.approx_workers_used(), 1.88)
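
    # A note on the 1.88 above: update() takes (ip, static resources,
    # still-available resources), and approx_workers_used() appears to sum
    # each node's highest utilization fraction across resource types. That
    # gives 1.0 for the CPU-saturated node plus 14/16 = 0.875 for the GPU
    # node; the 1.875 total evidently compares equal to 1.88 here, so the
    # metric is presumably rounded to two decimals internally.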

    def testHeartbeat(self):
        lm = LoadMetrics()
        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
        lm.mark_active("2.2.2.2")
        self.assertIn("1.1.1.1", lm.last_heartbeat_time_by_ip)
        self.assertIn("2.2.2.2", lm.last_heartbeat_time_by_ip)
        self.assertNotIn("3.3.3.3", lm.last_heartbeat_time_by_ip)

    def testDebugString(self):
        lm = LoadMetrics()
        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
        lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
        debug = lm.debug_string()
        self.assertIn("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU", debug)
        self.assertIn("NumNodesConnected: 2", debug)
        self.assertIn("NumNodesUsed: 1.88", debug)


class AutoscalingTest(unittest.TestCase):
    def setUp(self):
        NODE_PROVIDERS["mock"] = lambda: (None, self.create_provider)
        self.provider = None
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        del NODE_PROVIDERS["mock"]
        shutil.rmtree(self.tmpdir)
        ray.worker.cleanup()

    def waitFor(self, condition):
        for _ in range(50):
            if condition():
                return
            time.sleep(0.1)
        raise Exception("Timed out waiting for {}".format(condition))

    def create_provider(self, config, cluster_name):
        assert self.provider
        return self.provider

    def write_config(self, config):
        path = self.tmpdir + "/simple.yaml"
        with open(path, "w") as f:
            f.write(yaml.dump(config))
        return path

    def testInvalidConfig(self):
        invalid_config = "/dev/null"
        self.assertRaises(
            ValueError,
            lambda: StandardAutoscaler(
                invalid_config, LoadMetrics(), update_interval_s=0))

    def testValidation(self):
        """Ensures that schema validation is working."""
        config = copy.deepcopy(SMALL_CLUSTER)
        try:
            validate_config(config)
        except Exception:
            self.fail("Test config did not pass validation test!")

        config["blah"] = "blah"
        with self.assertRaises(ValueError):
            validate_config(config)
        del config["blah"]

        config["provider"]["blah"] = "blah"
        with self.assertRaises(ValueError):
            validate_config(config)
        del config["provider"]["blah"]

        del config["provider"]
        with self.assertRaises(ValueError):
            validate_config(config)

    def testValidateDefaultConfig(self):
        config = {}
        config["provider"] = {
            "type": "aws",
            "region": "us-east-1",
            "availability_zone": "us-east-1a",
        }
        config = fillout_defaults(config)
        try:
            validate_config(config)
        except Exception:
            self.fail("Default config did not pass validation test!")

    def testScaleUp(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        self.assertEqual(len(self.provider.nodes({})), 0)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)

    def testTerminateOutdatedNodesGracefully(self):
        config = SMALL_CLUSTER.copy()
        config["min_workers"] = 5
        config["max_workers"] = 5
        config_path = self.write_config(config)
        self.provider = MockProvider()
        self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "Worker"}, 10)
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        self.assertEqual(len(self.provider.nodes({})), 10)

        # Gradually scales down to meet target size, never going too low
        for _ in range(10):
            autoscaler.update()
            self.assertLessEqual(len(self.provider.nodes({})), 5)
            self.assertGreaterEqual(len(self.provider.nodes({})), 4)

        # Eventually reaches steady state
        self.assertEqual(len(self.provider.nodes({})), 5)

    def testDynamicScaling(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_concurrent_launches=5,
            max_failures=0,
            update_interval_s=0)
        self.assertEqual(len(self.provider.nodes({})), 0)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)

        # Update the config to reduce the cluster size
        new_config = SMALL_CLUSTER.copy()
        new_config["max_workers"] = 1
        self.write_config(new_config)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 1)

        # Update the config to increase the cluster size
        new_config["min_workers"] = 10
        new_config["max_workers"] = 10
        self.write_config(new_config)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 6)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 10)
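
    # A note on the 6 -> 10 progression above: max_concurrent_launches=5
    # appears to cap how many nodes a single update() may launch, so the
    # first update after raising min_workers grows the single surviving
    # node to 1 + 5 = 6, and the following update adds the last 4 nodes.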

    def testUpdateThrottling(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_concurrent_launches=5,
            max_failures=0,
            update_interval_s=10)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        new_config = SMALL_CLUSTER.copy()
        new_config["max_workers"] = 1
        self.write_config(new_config)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)  # not updated yet
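
    # With update_interval_s=10 the second update() falls inside the
    # throttle window, so the autoscaler presumably skips re-reading the
    # config and defers the scale-down. The other tests pass
    # update_interval_s=0 precisely to disable this throttling.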

    def testLaunchConfigChange(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)

        # Update the config to change the node type. Deep-copy so that the
        # nested "worker_nodes" dict of the shared SMALL_CLUSTER fixture is
        # not mutated for later tests.
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["worker_nodes"]["InstanceType"] = "updated"
        self.write_config(new_config)
        existing_nodes = set(self.provider.nodes({}))
        for _ in range(5):
            autoscaler.update()
        new_nodes = set(self.provider.nodes({}))
        self.assertEqual(len(new_nodes), 2)
        self.assertEqual(len(new_nodes.intersection(existing_nodes)), 0)

    def testIgnoresCorruptedConfig(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_concurrent_launches=10,
            max_failures=0,
            update_interval_s=0)
        autoscaler.update()

        # Write a corrupted config
        self.write_config("asdf")
        for _ in range(10):
            autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)

        # Write a good config again
        new_config = SMALL_CLUSTER.copy()
        new_config["min_workers"] = 10
        new_config["max_workers"] = 10
        self.write_config(new_config)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 10)

    def testMaxFailures(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        self.provider.throw = True
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=2, update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.assertRaises(Exception, autoscaler.update)

    def testAbortOnCreationFailures(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        self.provider.fail_creates = True
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        self.assertRaises(AssertionError, autoscaler.update)

    def testLaunchNewNodeOnOutOfBandTerminate(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        for node in self.provider.mock_nodes.values():
            node.state = "terminated"
        self.assertEqual(len(self.provider.nodes({})), 0)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)

    def testConfiguresNewNodes(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            verbose_updates=True,
            node_updater_cls=NodeUpdaterThread,
            update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        assert len(
            self.provider.nodes({
                TAG_RAY_NODE_STATUS: "Uninitialized"
            })) == 2
        autoscaler.update()
        self.waitFor(
            lambda: len(self.provider.nodes(
                {TAG_RAY_NODE_STATUS: "Up-to-date"})) == 2)
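
    # The status-tag lifecycle exercised here and in the next two tests:
    # fresh nodes are tagged "Uninitialized", a NodeUpdaterThread replays
    # the setup commands through the mock process runner, and the tag then
    # becomes "Up-to-date" on success or "UpdateFailed" when a command
    # raises (see testReportsConfigFailures).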

    def testReportsConfigFailures(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner(fail_cmds=["cmd1"])
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            verbose_updates=True,
            node_updater_cls=NodeUpdaterThread,
            update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        assert len(
            self.provider.nodes({
                TAG_RAY_NODE_STATUS: "Uninitialized"
            })) == 2
        autoscaler.update()
        self.waitFor(
            lambda: len(self.provider.nodes(
                {TAG_RAY_NODE_STATUS: "UpdateFailed"})) == 2)

    def testConfiguresOutdatedNodes(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            verbose_updates=True,
            node_updater_cls=NodeUpdaterThread,
            update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        autoscaler.update()
        self.waitFor(
            lambda: len(self.provider.nodes(
                {TAG_RAY_NODE_STATUS: "Up-to-date"})) == 2)
        runner.calls = []
        new_config = SMALL_CLUSTER.copy()
        new_config["worker_setup_commands"] = ["cmdX", "cmdY"]
        self.write_config(new_config)
        autoscaler.update()
        autoscaler.update()
        self.waitFor(lambda: len(runner.calls) > 0)

    def testScaleUpBasedOnLoad(self):
        config = SMALL_CLUSTER.copy()
        config["min_workers"] = 2
        config["max_workers"] = 10
        config["target_utilization_fraction"] = 0.5
        config_path = self.write_config(config)
        self.provider = MockProvider()
        lm = LoadMetrics()
        autoscaler = StandardAutoscaler(
            config_path, lm, max_failures=0, update_interval_s=0)
        self.assertEqual(len(self.provider.nodes({})), 0)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)

        # Scales up as nodes are reported as used
        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0})
        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 4)
        lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 0})
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 6)

        # Holds steady when load is removed
        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2})
        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 6)

        # Scales down as nodes become unused
        lm.last_used_time_by_ip["172.0.0.0"] = 0
        lm.last_used_time_by_ip["172.0.0.1"] = 0
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 4)
        lm.last_used_time_by_ip["172.0.0.2"] = 0
        lm.last_used_time_by_ip["172.0.0.3"] = 0
        autoscaler.update()
        self.assertEqual(len(self.provider.nodes({})), 2)
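
    # A note on the sizing above: with target_utilization_fraction=0.5 the
    # target worker count appears to be approx_workers_used() divided by
    # that fraction, so 2 busy workers request 2 / 0.5 = 4 nodes and 3 busy
    # workers request 6. Scale-down is driven separately by
    # last_used_time_by_ip and idle_timeout_minutes, which is why merely
    # freeing resources holds the cluster steady until the nodes are also
    # marked long-idle.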

    def testRecoverUnhealthyWorkers(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        lm = LoadMetrics()
        autoscaler = StandardAutoscaler(
            config_path,
            lm,
            max_failures=0,
            process_runner=runner,
            verbose_updates=True,
            node_updater_cls=NodeUpdaterThread,
            update_interval_s=0)
        autoscaler.update()
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        autoscaler.update()
        self.waitFor(
            lambda: len(self.provider.nodes(
                {TAG_RAY_NODE_STATUS: "Up-to-date"})) == 2)

        # Mark a node as unhealthy
        lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0
        num_calls = len(runner.calls)
        autoscaler.update()
        self.waitFor(lambda: len(runner.calls) > num_calls)

    def testExternalNodeScaler(self):
        config = SMALL_CLUSTER.copy()
        config["provider"] = {
            "type": "external",
            "module": "ray.autoscaler.node_provider.NodeProvider",
        }
        config_path = self.write_config(config)
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        self.assertIsInstance(autoscaler.provider, NodeProvider)

    def testExternalNodeScalerWrongImport(self):
        config = SMALL_CLUSTER.copy()
        config["provider"] = {
            "type": "external",
            "module": "mymodule.provider_class",
        }
        invalid_provider = self.write_config(config)
        self.assertRaises(
            ImportError,
            lambda: StandardAutoscaler(
                invalid_provider, LoadMetrics(), update_interval_s=0))

    def testExternalNodeScalerWrongModuleFormat(self):
        config = SMALL_CLUSTER.copy()
        config["provider"] = {
            "type": "external",
            "module": "does-not-exist",
        }
        invalid_provider = self.write_config(config)
        self.assertRaises(
            ValueError,
            lambda: StandardAutoscaler(
                invalid_provider, LoadMetrics(), update_interval_s=0))


if __name__ == "__main__":
    unittest.main(verbosity=2)