2021-06-25 19:39:05 +08:00
|
|
|
import asyncio
|
2021-06-09 20:22:25 +08:00
|
|
|
import json
|
|
|
|
import logging
|
2021-07-26 12:37:40 -07:00
|
|
|
from typing import Dict
|
2021-06-09 20:22:25 +08:00
|
|
|
|
|
|
|
from ray.core.generated import runtime_env_agent_pb2
|
|
|
|
from ray.core.generated import runtime_env_agent_pb2_grpc
|
|
|
|
from ray.core.generated import agent_manager_pb2
|
|
|
|
import ray.new_dashboard.utils as dashboard_utils
|
2021-06-25 19:39:05 +08:00
|
|
|
import ray.new_dashboard.modules.runtime_env.runtime_env_consts \
|
|
|
|
as runtime_env_consts
|
2021-06-09 20:22:25 +08:00
|
|
|
import ray._private.runtime_env as runtime_env
|
2021-06-25 19:39:05 +08:00
|
|
|
from ray._private.utils import import_attr
|
|
|
|
from ray.workers.pluggable_runtime_env import RuntimeEnvContext
|
2021-06-09 20:22:25 +08:00
|
|
|
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
                      runtime_env_agent_pb2_grpc.RuntimeEnvServiceServicer):
    """An RPC server to create or delete runtime envs.

    Attributes:
        dashboard_agent: The DashboardAgent object contains global config.
    """

    def __init__(self, dashboard_agent):
        """Read the agent's runtime env settings and set up the env cache.

        Args:
            dashboard_agent: Object providing `session_dir`,
                `runtime_env_dir` and `runtime_env_setup_hook`.
        """
        super().__init__(dashboard_agent)
        self._session_dir = dashboard_agent.session_dir
        self._runtime_env_dir = dashboard_agent.runtime_env_dir
        # `runtime_env_setup_hook` is resolved to a callable via
        # `import_attr`; it is invoked as `hook(runtime_env_dict,
        # session_dir)` when an env is created.
        self._setup = import_attr(dashboard_agent.runtime_env_setup_hook)
        # NOTE: this overwrites a module-level attribute of
        # `ray._private.runtime_env`, so it affects the whole process.
        runtime_env.PKG_DIR = dashboard_agent.runtime_env_dir
        # Maps a serialized runtime env dict to the serialized
        # RuntimeEnvContext arising from the creation of the env.
        self._created_env_cache: Dict[str, str] = dict()

    async def CreateRuntimeEnv(self, request, context):
        """Create the runtime env described by the request, with retries.

        A previously created env (keyed by the exact serialized string) is
        answered from the cache. Otherwise the setup hook is attempted up
        to `RUNTIME_ENV_RETRY_TIMES` times, sleeping
        `RUNTIME_ENV_RETRY_INTERVAL_MS` after each failed attempt.

        Args:
            request: Request carrying `serialized_runtime_env`, a JSON
                string (an empty value is treated as `{}`).
            context: The gRPC context (unused).

        Returns:
            A `CreateRuntimeEnvReply` with status OK and the serialized
            context on success, or status FAILED and the last error
            message once every attempt has failed.

        Raises:
            json.JSONDecodeError: If `serialized_runtime_env` is not valid
                JSON; it is parsed before the retry loop starts.
        """

        async def _setup_runtime_env(serialized_runtime_env, session_dir):
            # Run the setup hook in the loop's default executor so the
            # (presumably blocking) hook does not stall the event loop.
            loop = asyncio.get_event_loop()
            env_dict: dict = json.loads(serialized_runtime_env or "{}")
            return await loop.run_in_executor(None, self._setup, env_dict,
                                              session_dir)

        serialized_env = request.serialized_runtime_env
        if serialized_env in self._created_env_cache:
            serialized_context = self._created_env_cache[serialized_env]
            logger.info("Runtime env already created. Env: %s, context: %s",
                        serialized_env, serialized_context)
            return runtime_env_agent_pb2.CreateRuntimeEnvReply(
                status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
                serialized_runtime_env_context=serialized_context)

        logger.info("Creating runtime env: %s.", serialized_env)
        runtime_env_dict = json.loads(serialized_env or "{}")
        uris = runtime_env_dict.get("uris")
        runtime_env_context: RuntimeEnvContext = None
        error_message = None
        # Success is tracked explicitly. Relying on `error_message` alone
        # reported FAILED when an early attempt failed but a later retry
        # succeeded, and crashed when the exception text was empty.
        created = False
        for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
            try:
                if uris:
                    # TODO(guyang.sgy): Try `ensure_runtime_env_setup(uris)`
                    # to download packages.
                    # But we don't initialize internal kv in agent now.
                    pass
                runtime_env_context = await _setup_runtime_env(
                    serialized_env, self._session_dir)
            except Exception as ex:
                logger.exception("Runtime env creation failed.")
                # Some exceptions stringify to ""; fall back to repr so the
                # caller always receives a non-empty reason.
                error_message = str(ex) or repr(ex)
                await asyncio.sleep(
                    runtime_env_consts.RUNTIME_ENV_RETRY_INTERVAL_MS / 1000)
            else:
                created = True
                break

        if not created:
            logger.error(
                "Runtime env creation failed for %d times, "
                "don't retry any more.",
                runtime_env_consts.RUNTIME_ENV_RETRY_TIMES)
            return runtime_env_agent_pb2.CreateRuntimeEnvReply(
                status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
                error_message=error_message
                or "Runtime env creation failed.")

        serialized_context = runtime_env_context.serialize()
        self._created_env_cache[serialized_env] = serialized_context
        logger.info("Successfully created runtime env: %s, the context: %s",
                    serialized_env, serialized_context)
        return runtime_env_agent_pb2.CreateRuntimeEnvReply(
            status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
            serialized_runtime_env_context=serialized_context)

    async def DeleteRuntimeEnv(self, request, context):
        """Reply that deletion is not implemented; always returns FAILED."""
        # TODO(guyang.sgy): Delete runtime env local files.
        return runtime_env_agent_pb2.DeleteRuntimeEnvReply(
            status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
            error_message="Not implemented.")

    async def run(self, server):
        """Register this servicer's RPC handlers on the given gRPC server."""
        runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server(
            self, server)
|