Add external module as a node scaler. (#1703)

* WIP: add external module as a node scaler.

* Fix style.

* Add tests, fix style issues.

* Fix typos.

* Fix test error.

* Fix node provider path.

* Add function to spli pkg from class.

* Add doc.

* Correct documentation.

* Debugging....

* Debugging....

* Add __init__.py to tests.

* add more output for debugging

* Add more test, fix error with import.

* Add a small detail to the documentation.

* Update autoscaler.py
This commit is contained in:
Christian Barra 2018-03-18 00:59:13 +01:00 committed by Eric Liang
parent e3685fca5e
commit 070e27ea7a
5 changed files with 77 additions and 2 deletions

View file

@ -157,6 +157,21 @@ with GPU worker nodes instead.
MarketType: spot
InstanceType: p2.xlarge
External Node Provider
--------------------------
Ray also supports external node providers (check `node_provider.py <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/node_provider.py>`__ implementation).
You can specify the external node provider using the yaml config:
.. code-block:: yaml
provider:
type: external
module: mypackage.myclass
The module needs to be in the format `package.provider_class` or `package.sub_package.provider_class`.
Additional Cloud providers
--------------------------

View file

@ -53,8 +53,9 @@ CLUSTER_CONFIG_SCHEMA = {
# Cloud-provider specific configuration.
"provider": ({
"type": (str, REQUIRED), # e.g. aws
"region": (str, REQUIRED), # e.g. us-east-1
"availability_zone": (str, REQUIRED), # e.g. us-east-1a
"region": (str, OPTIONAL), # e.g. us-east-1
"availability_zone": (str, OPTIONAL), # e.g. us-east-1a
"module": (str, OPTIONAL), # module, if using external node provider
}, REQUIRED),
# How Ray will authenticate with newly launched nodes.

View file

@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import importlib
import os
import yaml
@ -25,6 +26,7 @@ NODE_PROVIDERS = {
"kubernetes": None,
"docker": None,
"local_cluster": None,
"external": None, # Import an external module
}
DEFAULT_CONFIGS = {
@ -37,8 +39,30 @@ DEFAULT_CONFIGS = {
}
def load_class(path):
"""
Load a class at runtime given a full path.
Example of the path: mypkg.mysubpkg.myclass
"""
class_data = path.split(".")
if len(class_data) < 2:
raise ValueError(
"You need to pass a valid path like mymodule.provider_class"
)
module_path = ".".join(class_data[:-1])
class_str = class_data[-1]
module = importlib.import_module(module_path)
return getattr(module, class_str)
def get_node_provider(provider_config, cluster_name):
if provider_config["type"] == "external":
provider_cls = load_class(path=provider_config["module"])
return provider_cls(provider_config, cluster_name)
importer = NODE_PROVIDERS.get(provider_config["type"])
if importer is None:
raise NotImplementedError(
"Unsupported node provider: {}".format(provider_config["type"]))

0
test/__init__.py Normal file
View file

View file

@ -507,6 +507,41 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitFor(lambda: len(runner.calls) > num_calls)
def testExternalNodeScaler(self):
config = SMALL_CLUSTER.copy()
config["provider"] = {
"type": "external",
"module": "ray.autoscaler.node_provider.NodeProvider",
}
config_path = self.write_config(config)
autoscaler = StandardAutoscaler(
config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
self.assertIsInstance(autoscaler.provider, NodeProvider)
def testExternalNodeScalerWrongImport(self):
config = SMALL_CLUSTER.copy()
config["provider"] = {
"type": "external",
"module": "mymodule.provider_class",
}
invalid_provider = self.write_config(config)
self.assertRaises(
ImportError,
lambda: StandardAutoscaler(
invalid_provider, LoadMetrics(), update_interval_s=0))
def testExternalNodeScalerWrongModuleFormat(self):
config = SMALL_CLUSTER.copy()
config["provider"] = {
"type": "external",
"module": "does-not-exist",
}
invalid_provider = self.write_config(config)
self.assertRaises(
ValueError,
lambda: StandardAutoscaler(
invalid_provider, LoadMetrics(), update_interval_s=0))
if __name__ == "__main__":
unittest.main(verbosity=2)