2020-07-27 11:34:47 +08:00
import abc
import asyncio
import collections
import datetime
import functools
import importlib
import inspect
2021-01-22 12:10:01 +08:00
import json
2020-07-27 11:34:47 +08:00
import logging
2021-01-22 12:10:01 +08:00
import os
2020-07-27 11:34:47 +08:00
import pkgutil
2021-01-22 12:10:01 +08:00
import socket
2021-08-21 03:04:21 -05:00
import time
2020-07-27 11:34:47 +08:00
import traceback
2020-10-28 14:49:31 +08:00
from abc import ABCMeta, abstractmethod
2021-01-22 12:10:01 +08:00
from base64 import b64decode
2020-08-30 14:09:34 +08:00
from collections import namedtuple
2021-01-22 12:10:01 +08:00
from collections.abc import MutableMapping, Mapping, Sequence
2020-08-25 04:24:23 +08:00
from typing import Any
2020-07-27 11:34:47 +08:00
from google.protobuf.json_format import MessageToDict
2021-01-22 12:10:01 +08:00
2021-09-15 11:17:15 -05:00
import ray.dashboard.consts as dashboard_consts
2020-10-10 13:27:05 +08:00
from ray.ray_constants import env_bool
2021-03-10 23:47:28 -07:00
from ray._private.utils import binary_to_hex
2020-10-10 13:27:05 +08:00
2021-08-21 03:04:21 -05:00
# All third-party dependencies that are not included in the minimal Ray
# installation must be included in this file. This allows us to determine if
# the agent has the necessary dependencies to be started.
2021-09-15 11:17:15 -05:00
from ray.dashboard.optional_deps import (aiohttp, aioredis, hdrs, FrozenList,
PathLike, RouteDef)
2021-08-21 03:04:21 -05:00
2020-10-10 13:27:05 +08:00
# asyncio.create_task does not exist on every interpreter this file runs on;
# fall back to asyncio.ensure_future when the attribute is missing.
try:
    create_task = asyncio.create_task
except AttributeError:
    create_task = asyncio.ensure_future

logger = logging.getLogger(__name__)
class DashboardAgentModule(abc.ABC):
    """Base class for modules loaded by the DashboardAgent."""

    def __init__(self, dashboard_agent):
        """
        Initialize current module when DashboardAgent loading modules.

        :param dashboard_agent: The DashboardAgent instance.
        """
        self._dashboard_agent = dashboard_agent

    # NOTE(review): the class derives from abc.ABC and ``run`` has no body
    # beyond its docstring, so it is declared abstract here.
    @abc.abstractmethod
    async def run(self, server):
        """
        Run the module in an asyncio loop. An agent module can provide
        servicers to the server.

        :param server: Asyncio GRPC server.
        """
class DashboardHeadModule(abc.ABC):
    """Base class for modules loaded by the DashboardHead."""

    def __init__(self, dashboard_head):
        """
        Initialize current module when DashboardHead loading modules.

        :param dashboard_head: The DashboardHead instance.
        """
        self._dashboard_head = dashboard_head

    # NOTE(review): the class derives from abc.ABC and ``run`` has no body
    # beyond its docstring, so it is declared abstract here.
    @abc.abstractmethod
    async def run(self, server):
        """
        Run the module in an asyncio loop. A head module can provide
        servicers to the server.

        :param server: Asyncio GRPC server.
        """
2020-07-27 11:34:47 +08:00
class ClassMethodRouteTable:
    """A helper class to bind http route to class method.

    Handlers are registered through the ``get`` / ``post`` / ... decorators
    with no instance attached (``_BindInfo.instance`` is ``None``).
    ``bind`` later records the instance, which is then passed as the first
    argument of the handler on every request.
    """

    # Maps HTTP method -> route path -> _BindInfo.
    _bind_map = collections.defaultdict(dict)
    _routes = aiohttp.web.RouteTableDef()

    class _BindInfo:
        """Where a handler was defined and which instance it is bound to."""

        def __init__(self, filename, lineno, instance):
            self.filename = filename
            self.lineno = lineno
            self.instance = instance

    @classmethod
    def routes(cls):
        """Return the route table holding every registered route."""
        return cls._routes

    @classmethod
    def bound_routes(cls):
        """Return a new route table restricted to routes that can be served.

        A ``RouteDef`` is kept only when ``bind`` has attached an instance
        to its handler.
        """
        bound_items = []
        for r in cls._routes._items:
            if isinstance(r, RouteDef):
                route_method = getattr(r.handler, "__route_method__")
                route_path = getattr(r.handler, "__route_path__")
                instance = cls._bind_map[route_method][route_path].instance
                if instance is not None:
                    bound_items.append(r)
            else:
                # NOTE(review): entries that are not RouteDef (e.g. added by
                # ``static``) have no bind info; they are assumed to be
                # always servable - confirm.
                bound_items.append(r)
        routes = aiohttp.web.RouteTableDef()
        routes._items = bound_items
        return routes

    @classmethod
    def _register_route(cls, method, path, **kwargs):
        """Build the decorator registering a handler for (method, path).

        :raises Exception: when applied to a second handler for the same
            method and path.
        """

        def _wrapper(handler):
            if path in cls._bind_map[method]:
                bind_info = cls._bind_map[method][path]
                raise Exception(f"Duplicated route path: {path}, "
                                f"previous one registered at "
                                f"{bind_info.filename}:{bind_info.lineno}")

            bind_info = cls._BindInfo(handler.__code__.co_filename,
                                      handler.__code__.co_firstlineno, None)

            @functools.wraps(handler)
            async def _handler_route(*args) -> aiohttp.web.Response:
                try:
                    # Make the route handler as a bound method.
                    # The args may be:
                    #   * (Request, )
                    #   * (self, Request)
                    req = args[-1]
                    return await handler(bind_info.instance, req)
                except Exception:
                    # Report the failure to the client as a REST response
                    # carrying the traceback instead of propagating it.
                    logger.exception("Handle %s %s failed.", method, path)
                    return rest_response(
                        success=False, message=traceback.format_exc())

            cls._bind_map[method][path] = bind_info
            _handler_route.__route_method__ = method
            _handler_route.__route_path__ = path
            return cls._routes.route(method, path, **kwargs)(_handler_route)

        return _wrapper

    @classmethod
    def head(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_HEAD, path, **kwargs)

    @classmethod
    def get(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_GET, path, **kwargs)

    @classmethod
    def post(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_POST, path, **kwargs)

    @classmethod
    def put(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_PUT, path, **kwargs)

    @classmethod
    def patch(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_PATCH, path, **kwargs)

    @classmethod
    def delete(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_DELETE, path, **kwargs)

    @classmethod
    def view(cls, path, **kwargs):
        return cls._register_route(hdrs.METH_ANY, path, **kwargs)

    @classmethod
    def static(cls, prefix: str, path: PathLike, **kwargs: Any) -> None:
        cls._routes.static(prefix, path, **kwargs)

    @classmethod
    def bind(cls, instance):
        """Attach ``instance`` to every route handler defined on its class."""

        def predicate(o):
            if inspect.ismethod(o):
                return hasattr(o, "__route_method__") and hasattr(
                    o, "__route_path__")
            return False

        handler_routes = inspect.getmembers(instance, predicate)
        for _, h in handler_routes:
            cls._bind_map[h.__func__.__route_method__][
                h.__func__.__route_path__].instance = instance
2020-08-30 14:09:34 +08:00
def dashboard_module(enable):
    """A decorator for dashboard module.

    :param enable: Value stored on the decorated class as
        ``__ray_dashboard_module_enable__``.
    :return: A class decorator that returns the same class object.
    """

    def _cls_wrapper(cls):
        cls.__ray_dashboard_module_enable__ = enable
        return cls

    return _cls_wrapper
2020-07-27 11:34:47 +08:00
def get_all_modules(module_type):
    """Import every dashboard module and list the enabled subclasses.

    :param module_type: Base class whose direct subclasses are collected.
    :return: Subclasses of ``module_type`` whose
        ``__ray_dashboard_module_enable__`` attribute is truthy or absent.
    """
    logger.info(f"Get all modules by type: {module_type.__name__}")
    import ray.dashboard.modules

    # Importing each module is what makes its classes show up in
    # module_type.__subclasses__() below.
    for module_loader, name, ispkg in pkgutil.walk_packages(
            ray.dashboard.modules.__path__,
            ray.dashboard.modules.__name__ + "."):
        importlib.import_module(name)
    return [
        m for m in module_type.__subclasses__()
        if getattr(m, "__ray_dashboard_module_enable__", True)
    ]
2020-07-27 11:34:47 +08:00
def to_posix_time(dt):
    """Return the seconds elapsed between the 1970-01-01 epoch and ``dt``.

    :param dt: A ``datetime.datetime``. A naive value is measured against a
        naive epoch; a timezone-aware value is measured against the UTC
        epoch, since naive and aware datetimes cannot be subtracted.
    :return: The offset in seconds as a float.
    """
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    else:
        epoch = datetime.datetime(1970, 1, 1)
    return (dt - epoch).total_seconds()
2020-08-25 04:24:23 +08:00
def address_tuple(address):
    """Normalize an address into an ``(ip, port)`` tuple.

    :param address: Either a tuple, returned unchanged, or a string of the
        form ``"host:port"``.
    :return: ``(host, int(port))``.
    :raises ValueError: if the string has no ``:`` or the port is not an
        integer.
    """
    if isinstance(address, tuple):
        return address
    # Split on the last colon only, so a host that itself contains colons
    # still yields exactly two parts.
    ip, port = address.rsplit(":", 1)
    return ip, int(port)
2020-07-27 11:34:47 +08:00
class CustomEncoder(json.JSONEncoder):
    """JSON encoder that also handles ``bytes`` and ``Immutable`` values."""

    def default(self, obj):
        """Encode bytes via ``binary_to_hex`` and unwrap immutable proxies.

        :raises TypeError: for any other unsupported type (from the base
            class).
        """
        if isinstance(obj, bytes):
            return binary_to_hex(obj)
        if isinstance(obj, Immutable):
            return obj.mutable()
        # Let the base class default method raise the TypeError
        return super().default(obj)
2021-10-23 10:48:16 -07:00
def rest_response(success, message, convert_google_style=True,
                  **kwargs) -> aiohttp.web.Response:
    """Build the JSON response returned by dashboard REST handlers.

    :param success: Value of the ``"result"`` field.
    :param message: Value of the ``"msg"`` field.
    :param convert_google_style: When true, ``kwargs`` is passed through
        ``to_google_style`` before being sent.
    :param kwargs: Payload sent in the ``"data"`` field.
    :return: An aiohttp JSON response serialized with ``CustomEncoder``.
    """
    # In the dev context we allow a dev server running on a
    # different port to consume the API, meaning we need to allow
    # cross-origin access
    if os.environ.get("RAY_DASHBOARD_DEV") == "1":
        headers = {"Access-Control-Allow-Origin": "*"}
    else:
        headers = {}
    return aiohttp.web.json_response(
        {
            "result": success,
            "msg": message,
            "data": to_google_style(kwargs) if convert_google_style else kwargs
        },
        dumps=functools.partial(json.dumps, cls=CustomEncoder),
        headers=headers)
2020-07-27 11:34:47 +08:00
def to_camel_case(snake_str):
    """Convert a snake str to camel case.

    :param snake_str: Underscore-separated string, e.g. ``"foo_bar"``.
    :return: The camel-cased string, e.g. ``"fooBar"``.
    """
    components = snake_str.split("_")
    # We capitalize the first letter of each component except the first one
    # with the 'title' method and join them together.
    return components[0] + "".join(x.title() for x in components[1:])
def to_google_style(d):
    """Recursive convert all keys in dict to google style.

    Keys are rewritten with ``to_camel_case``. Nested dicts, and dicts
    found directly inside list values, are converted recursively; every
    other value is copied as is.

    :param d: The dict to convert. It is not modified.
    :return: A new dict with converted keys.
    """
    new_dict = {}

    for k, v in d.items():
        if isinstance(v, dict):
            new_dict[to_camel_case(k)] = to_google_style(v)
        elif isinstance(v, list):
            new_dict[to_camel_case(k)] = [
                to_google_style(i) if isinstance(i, dict) else i for i in v
            ]
        else:
            new_dict[to_camel_case(k)] = v
    return new_dict
def message_to_dict(message, decode_keys=None, **kwargs):
    """Convert protobuf message to Python dict.

    :param message: The protobuf message.
    :param decode_keys: Optional collection of keys whose values are
        base64-decoded and re-encoded with ``binary_to_hex``. Nested dicts
        and dicts inside lists are processed recursively.
    :param kwargs: Extra keyword arguments forwarded to ``MessageToDict``.
    :return: The converted dict.
    """

    def _decode_keys(d):
        # Only existing keys are reassigned, so mutating ``d`` while
        # iterating over its items is safe.
        for k, v in d.items():
            if isinstance(v, dict):
                d[k] = _decode_keys(v)
            elif isinstance(v, list):
                d[k] = [
                    _decode_keys(i) if isinstance(i, dict) else i for i in v
                ]
            elif k in decode_keys:
                d[k] = binary_to_hex(b64decode(v))
        return d

    result = MessageToDict(message, use_integers_for_enums=False, **kwargs)
    if decode_keys:
        return _decode_keys(result)
    return result
2020-10-10 13:27:05 +08:00
# The cache value type used by aiohttp_cache.
# Fields: the response constructor kwargs ("data"), the time after which the
# entry is stale ("expiration") and the task that produced it ("task").
_AiohttpCacheValue = namedtuple("AiohttpCacheValue",
["data", "expiration", "task"])
# The methods with no request body used by aiohttp_cache.
# NOTE(review): the constant this comment describes is not present below.
#
# aiohttp_cache: decorator that caches the responses of an aiohttp handler
# in an OrderedDict.
# NOTE(review): this block is incomplete as it stands. The signature does
# not declare `ttl_seconds` or `maxsize` although the body reads both, the
# `enable=not env_bool(` default is never closed, and the `try:`, `else:`,
# closing-bracket and cache-maintenance lines appear to be missing. Restore
# them from version control before relying on this code.
def aiohttp_cache(
enable=not env_bool(
assert maxsize > 0
cache = collections.OrderedDict()
def _wrapper(handler):
if enable:
async def _cache_handler(*args) -> aiohttp.web.Response:
# Make the route handler as a bound method.
# The args may be:
# * (Request, )
# * (self, Request)
req = args[-1]
# Make key.
# NOTE(review): the condition choosing between the two key forms below is
# missing; presumably the body is only read for methods that carry one.
key = req.path_qs
key = (req.path_qs, await req.read())
# Query cache.
value = cache.get(key)
if value is not None:
if (not value.task.done()
or value.expiration >= time.time()):
# Update task not done or the data is not expired.
return aiohttp.web.Response(**value.data)
# Stores the finished task's response (or an error response built from the
# traceback when the task raised) in the cache with a fresh expiration.
def _update_cache(task):
response = task.result()
except Exception:
response = rest_response(
success=False, message=traceback.format_exc())
data = {
"status": response.status,
"headers": dict(response.headers),
"body": response.body,
cache[key] = _AiohttpCacheValue(data,
time.time() + ttl_seconds,
if len(cache) > maxsize:
return response
# NOTE(review): nothing visible attaches _update_cache to this task.
task = create_task(handler(*args))
# With no cached value the caller waits for the handler; otherwise the
# cached (stale) value is returned while the task runs.
if value is None:
return await task
return aiohttp.web.Response(**value.data)
suffix = f"[cache ttl={ttl_seconds}, max_size={maxsize}]"
_cache_handler.__name__ += suffix
_cache_handler.__qualname__ += suffix
return _cache_handler
return handler
# Supports bare use as `@aiohttp_cache`: the first positional argument is
# then the handler itself rather than a TTL.
if inspect.iscoroutinefunction(ttl_seconds):
target_func = ttl_seconds
ttl_seconds = dashboard_consts.AIOHTTP_CACHE_TTL_SECONDS
return _wrapper(target_func)
return _wrapper
2020-07-27 11:34:47 +08:00
class SignalManager:
    """Keeps track of signals so they can all be frozen together."""

    _signals = FrozenList()

    @classmethod
    def register(cls, sig):
        """Remember ``sig`` so that ``freeze`` covers it."""
        # NOTE(review): body rebuilt from the method name and the
        # FrozenList container - confirm against version control.
        cls._signals.append(sig)

    @classmethod
    def freeze(cls):
        """Freeze the registry and every registered signal."""
        # NOTE(review): body rebuilt from the visible loop header - confirm
        # against version control.
        cls._signals.freeze()
        for sig in cls._signals:
            sig.freeze()
class Signal(aiohttp.signals.Signal):
    """An aiohttp signal that registers itself with ``SignalManager``."""

    __slots__ = ()

    def __init__(self, owner):
        # NOTE(review): body rebuilt from the signature and from
        # SignalManager.register - confirm against version control.
        super().__init__(owner)
        SignalManager.register(self)
class Bunch(dict):
    """A dict with attribute-access."""

    def __getattr__(self, key):
        """Return ``self[key]``.

        :raises AttributeError: if ``key`` is not in the dict, so that
            ``getattr`` / ``hasattr`` behave as usual.
        """
        try:
            return self.__getitem__(key)
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        self.__setitem__(key, value)
class Change:
    """Notify change object."""

    def __init__(self, owner=None, old=None, new=None):
        """
        :param owner: The object that changed.
        :param old: The previous value, or None.
        :param new: The new value, or None.
        """
        self.owner = owner
        self.old = old
        self.new = new

    def __str__(self):
        # The closing parenthesis belongs at the end, after all three
        # fields, not directly after the owner type.
        return f"Change(owner: {type(self.owner)}, " \
               f"old: {self.old}, new: {self.new})"
2020-07-27 11:34:47 +08:00
class NotifyQueue:
    """Asyncio notify queue for Dict signal."""

    _queue = asyncio.Queue()

    @classmethod
    def put(cls, co):
        """Enqueue ``co`` without blocking."""
        cls._queue.put_nowait(co)

    @classmethod
    async def get(cls):
        """Wait for and return the next queued item."""
        return await cls._queue.get()
2020-10-28 14:49:31 +08:00
| Python | JSON |
| dict | object |
| list, tuple | array |
| str | string |
| int, float | number |
| True | true |
| False | false |
| None | null |
_json_compatible_types = {
dict, list, tuple, str, int, float, bool,
type(None), bytes
def is_immutable(self):
    """Always raise, reporting that objects of this type are immutable.

    :raises TypeError: unconditionally.
    """
    raise TypeError("%r objects are immutable" % self.__class__.__name__)
def make_immutable(value, strict=True):
    """Wrap ``value`` in an immutable proxy when it is a dict or a list.

    :param value: The value to wrap. Only exact ``dict`` / ``list``
        instances are wrapped; subclasses are not.
    :param strict: When true, reject any other value whose type is not in
        ``_json_compatible_types``.
    :return: An ``ImmutableDict``, an ``ImmutableList`` or ``value`` itself.
    :raises TypeError: in strict mode, for a type that is not allowed.
    """
    value_type = type(value)
    if value_type is dict:
        return ImmutableDict(value)
    if value_type is list:
        return ImmutableList(value)
    if strict and value_type not in _json_compatible_types:
        raise TypeError("Type {} can't be immutable.".format(value_type))
    return value
class Immutable(metaclass=ABCMeta):
    """Abstract base for read-only proxies around a mutable container."""

    @abstractmethod
    def mutable(self):
        """Return the underlying mutable object."""
class ImmutableList(Immutable, Sequence):
    """Makes a :class:`list` immutable.

    The wrapped list is not copied. Items are wrapped with
    ``make_immutable`` on first access and cached in ``_proxy``.
    """

    __slots__ = ("_list", "_proxy")

    def __init__(self, list_value):
        """
        :param list_value: A ``list`` or another ``ImmutableList``.
        :raises TypeError: for any other type, including list subclasses.
        """
        if type(list_value) not in (list, ImmutableList):
            raise TypeError(f"{type(list_value)} object is not a list.")
        if isinstance(list_value, ImmutableList):
            list_value = list_value.mutable()
        self._list = list_value
        self._proxy = [None] * len(list_value)

    def __reduce_ex__(self, protocol):
        return type(self), (self._list, )

    def mutable(self):
        """Return the wrapped list itself (not a copy)."""
        return self._list

    def __eq__(self, other):
        if isinstance(other, ImmutableList):
            other = other.mutable()
        return list.__eq__(self._list, other)

    def __ne__(self, other):
        if isinstance(other, ImmutableList):
            other = other.mutable()
        return list.__ne__(self._list, other)

    def __contains__(self, item):
        if isinstance(item, Immutable):
            item = item.mutable()
        return list.__contains__(self._list, item)

    def __getitem__(self, item):
        if isinstance(item, slice):
            # Indexing the proxy cache with a slice would hand back raw
            # cache entries (possibly None); wrap the sliced data instead.
            return ImmutableList(self._list[item])
        proxy = self._proxy[item]
        if proxy is None:
            proxy = self._proxy[item] = make_immutable(self._list[item])
        return proxy

    def __len__(self):
        return len(self._list)

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, list.__repr__(self._list))
class ImmutableDict(Immutable, Mapping):
    """Makes a :class:`dict` immutable.

    The wrapped dict is not copied. Values are wrapped with
    ``make_immutable`` on first access and cached in ``_proxy``.
    """

    __slots__ = ("_dict", "_proxy")

    def __init__(self, dict_value):
        """
        :param dict_value: A ``dict`` or another ``ImmutableDict``.
        :raises TypeError: for any other type, including dict subclasses.
        """
        if type(dict_value) not in (dict, ImmutableDict):
            raise TypeError(f"{type(dict_value)} object is not a dict.")
        if isinstance(dict_value, ImmutableDict):
            dict_value = dict_value.mutable()
        self._dict = dict_value
        self._proxy = {}

    def __reduce_ex__(self, protocol):
        return type(self), (self._dict, )

    def mutable(self):
        """Return the wrapped dict itself (not a copy)."""
        return self._dict

    def get(self, key, default=None):
        """Return ``self[key]``, or the immutable form of ``default``."""
        try:
            return self[key]
        except KeyError:
            return make_immutable(default)

    def __eq__(self, other):
        if isinstance(other, ImmutableDict):
            other = other.mutable()
        return dict.__eq__(self._dict, other)

    def __ne__(self, other):
        if isinstance(other, ImmutableDict):
            other = other.mutable()
        return dict.__ne__(self._dict, other)

    def __contains__(self, item):
        if isinstance(item, Immutable):
            item = item.mutable()
        return dict.__contains__(self._dict, item)

    def __getitem__(self, item):
        proxy = self._proxy.get(item, None)
        if proxy is None:
            proxy = self._proxy[item] = make_immutable(self._dict[item])
        return proxy

    def __len__(self) -> int:
        return len(self._dict)

    def __iter__(self):
        # Fill in proxies for keys never accessed so far, then iterate over
        # the proxy cache.
        if len(self._proxy) != len(self._dict):
            for key in self._dict.keys() - self._proxy.keys():
                self._proxy[key] = make_immutable(self._dict[key])
        return iter(self._proxy)

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, dict.__repr__(self._dict))
class Dict(ImmutableDict, MutableMapping):
    """A simple descriptor for dict type to notify data changes.

    :note: Only the first level data report change.
    """

    ChangeItem = namedtuple("DictChangeItem", ["key", "value"])

    def __init__(self, *args, **kwargs):
        super().__init__(dict(*args, **kwargs))
        self.signal = Signal(self)

    def __setitem__(self, key, value):
        """Store ``value`` and queue a ``Change`` if it differs.

        Nothing is queued when the signal has no receivers or when the new
        value equals the old one. A previous value of ``None`` is reported
        like a missing key (``old`` is left unset).
        """
        old = self._dict.pop(key, None)
        self._proxy.pop(key, None)
        self._dict[key] = value
        if len(self.signal) and old != value:
            if old is None:
                co = self.signal.send(
                    Change(owner=self, new=Dict.ChangeItem(key, value)))
            else:
                co = self.signal.send(
                    Change(
                        owner=self,
                        old=Dict.ChangeItem(key, old),
                        new=Dict.ChangeItem(key, value)))
            NotifyQueue.put(co)

    def __delitem__(self, key):
        """Remove ``key`` if present and queue a ``Change`` for it.

        A missing key is ignored rather than raising ``KeyError``.
        """
        old = self._dict.pop(key, None)
        self._proxy.pop(key, None)
        if len(self.signal) and old is not None:
            co = self.signal.send(
                Change(owner=self, old=Dict.ChangeItem(key, old)))
            NotifyQueue.put(co)

    def reset(self, d):
        """Make the content equal to mapping ``d``, notifying each change."""
        assert isinstance(d, Mapping)
        for key in self._dict.keys() - d.keys():
            del self[key]
        for key, value in d.items():
            self[key] = value
# Register immutable types.
# NOTE(review): loop body rebuilt from the comment above and from the
# membership test in make_immutable - confirm against version control.
for immutable_type in Immutable.__subclasses__():
    _json_compatible_types.add(immutable_type)
2020-08-25 04:24:23 +08:00
async def get_aioredis_client(redis_address, redis_password,
                              retry_interval_seconds, retry_times):
    """Create an aioredis connection pool, retrying on connection errors.

    :param redis_address: Address passed to ``create_redis_pool``.
    :param redis_password: Password passed to ``create_redis_pool``.
    :param retry_interval_seconds: Sleep between two attempts.
    :param retry_times: Number of attempts whose ``socket.gaierror`` /
        ``ConnectionError`` is logged and retried.
    :return: The pool returned by ``aioredis.create_redis_pool``.
    :raises Exception: whatever the final attempt raises.
    """
    for _ in range(retry_times):
        try:
            return await aioredis.create_redis_pool(
                address=redis_address, password=redis_password)
        except (socket.gaierror, ConnectionError) as ex:
            logger.error("Connect to Redis failed: %s, retry...", ex)
            await asyncio.sleep(retry_interval_seconds)
    # Raise exception from create_redis_pool
    return await aioredis.create_redis_pool(
        address=redis_address, password=redis_password)
2020-08-30 14:09:34 +08:00
2021-07-17 21:59:04 -07:00
def async_loop_forever(interval_seconds, cancellable=False):
    """Decorator that runs a coroutine function in an endless loop.

    :param interval_seconds: Sleep between two iterations.
    :param cancellable: When true, an ``asyncio.CancelledError`` raised by
        the coroutine is logged and re-raised, ending the loop. When false
        it is logged and the loop goes on.
    :return: A decorator producing the looping coroutine function.

    Any other ``Exception`` raised by the coroutine is logged and the loop
    continues.
    """

    def _wrapper(coro):
        @functools.wraps(coro)
        async def _looper(*args, **kwargs):
            while True:
                try:
                    await coro(*args, **kwargs)
                except asyncio.CancelledError:
                    if cancellable:
                        logger.info(f"An async loop forever coroutine "
                                    f"is cancelled {coro}.")
                        raise
                    else:
                        logger.exception(f"Can not cancel the async loop "
                                         f"forever coroutine {coro}.")
                except Exception:
                    logger.exception(f"Error looping coroutine {coro}.")
                await asyncio.sleep(interval_seconds)

        return _looper

    return _wrapper