[runtime env] Download runtime env(conda) in agent instead of setup_worker (#16525)
parent 2e3771cc29
commit e74d9d3ded
16 changed files with 223 additions and 58 deletions
@@ -45,7 +45,9 @@ class DashboardAgent(object):
                  dashboard_agent_port,
                  redis_password=None,
                  temp_dir=None,
+                 session_dir=None,
                  runtime_env_dir=None,
+                 runtime_env_setup_hook=None,
                  log_dir=None,
                  metrics_export_port=None,
                  node_manager_port=None,
@@ -57,7 +59,9 @@ class DashboardAgent(object):
         self.redis_address = dashboard_utils.address_tuple(redis_address)
         self.redis_password = redis_password
         self.temp_dir = temp_dir
+        self.session_dir = session_dir
         self.runtime_env_dir = runtime_env_dir
+        self.runtime_env_setup_hook = runtime_env_setup_hook
         self.log_dir = log_dir
         self.dashboard_agent_port = dashboard_agent_port
         self.metrics_export_port = metrics_export_port
@@ -291,12 +295,25 @@ if __name__ == "__main__":
         type=str,
         default=None,
         help="Specify the path of the temporary directory use by Ray process.")
+    parser.add_argument(
+        "--session-dir",
+        required=True,
+        type=str,
+        default=None,
+        help="Specify the path of this session.")
     parser.add_argument(
         "--runtime-env-dir",
         required=True,
         type=str,
         default=None,
         help="Specify the path of the resource directory used by runtime_env.")
+    parser.add_argument(
+        "--runtime-env-setup-hook",
+        required=True,
+        type=str,
+        default=None,
+        help="The module path to a Python function that"
+        "will be imported and run to set up the runtime env.")

     args = parser.parse_args()
     try:
@@ -324,7 +341,9 @@ if __name__ == "__main__":
             args.dashboard_agent_port,
             redis_password=args.redis_password,
             temp_dir=args.temp_dir,
+            session_dir=args.session_dir,
             runtime_env_dir=args.runtime_env_dir,
+            runtime_env_setup_hook=args.runtime_env_setup_hook,
             log_dir=args.log_dir,
             metrics_export_port=args.metrics_export_port,
             node_manager_port=args.node_manager_port,

@@ -1,3 +1,4 @@
+import asyncio
 import json
 import logging

@@ -5,7 +6,11 @@ from ray.core.generated import runtime_env_agent_pb2
 from ray.core.generated import runtime_env_agent_pb2_grpc
 from ray.core.generated import agent_manager_pb2
 import ray.new_dashboard.utils as dashboard_utils
+import ray.new_dashboard.modules.runtime_env.runtime_env_consts \
+    as runtime_env_consts
 import ray._private.runtime_env as runtime_env
+from ray._private.utils import import_attr
+from ray.workers.pluggable_runtime_env import RuntimeEnvContext

 logger = logging.getLogger(__name__)

@@ -20,21 +25,54 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,

     def __init__(self, dashboard_agent):
         super().__init__(dashboard_agent)
+        self._session_dir = dashboard_agent.session_dir
         self._runtime_env_dir = dashboard_agent.runtime_env_dir
+        self._setup = import_attr(dashboard_agent.runtime_env_setup_hook)
         runtime_env.PKG_DIR = dashboard_agent.runtime_env_dir

     async def CreateRuntimeEnv(self, request, context):
+        async def _setup_runtime_env(serialized_runtime_env, session_dir):
+            loop = asyncio.get_event_loop()
+            runtime_env: dict = json.loads(serialized_runtime_env or "{}")
+            return await loop.run_in_executor(None, self._setup, runtime_env,
+                                              session_dir)
+
+        logger.info("Creating runtime env: %s.",
+                    request.serialized_runtime_env)
         runtime_env_dict = json.loads(request.serialized_runtime_env or "{}")
         uris = runtime_env_dict.get("uris")
-        if uris:
-            logger.info("Creating runtime env with uris %s", repr(uris))
-            # TODO(guyang.sgy): Try `ensure_runtime_env_setup(uris)`
-            # to download packages.
-            # But we don't initailize internal kv in agent now.
-            pass
+        runtime_env_context: RuntimeEnvContext = None
+        error_message = None
+        for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
+            try:
+                if uris:
+                    # TODO(guyang.sgy): Try `ensure_runtime_env_setup(uris)`
+                    # to download packages.
+                    # But we don't initailize internal kv in agent now.
+                    pass
+                runtime_env_context = await _setup_runtime_env(
+                    request.serialized_runtime_env, self._session_dir)
+                break
+            except Exception as ex:
+                logger.exception("Runtime env creation failed.")
+                error_message = str(ex)
+                await asyncio.sleep(
+                    runtime_env_consts.RUNTIME_ENV_RETRY_INTERVAL_MS / 1000)
+        if error_message:
+            logger.error(
+                "Runtime env creation failed for %d times, "
+                "don't retry any more.",
+                runtime_env_consts.RUNTIME_ENV_RETRY_TIMES)
+            return runtime_env_agent_pb2.CreateRuntimeEnvReply(
+                status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
+                error_message=error_message)
+
+        serialized_context = runtime_env_context.serialize()
+        logger.info("Successfully created runtime env: %s, the context: %s",
+                    request.serialized_runtime_env, serialized_context)
         return runtime_env_agent_pb2.CreateRuntimeEnvReply(
-            status=agent_manager_pb2.AGENT_RPC_STATUS_OK)
+            status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
+            serialized_runtime_env_context=serialized_context)

     async def DeleteRuntimeEnv(self, request, context):
         # TODO(guyang.sgy): Delete runtime env local files.

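Note: the agent offloads the blocking setup hook to the event loop's default executor and retries on failure. Below is a minimal, self-contained sketch of that offloading pattern only; slow_setup_hook is a toy stand-in for the real hook, not Ray code.

# Sketch of the run_in_executor pattern used by CreateRuntimeEnv above.
import asyncio
import json


def slow_setup_hook(runtime_env: dict, session_dir: str) -> str:
    # Pretend to build a conda env and return a serializable context.
    return json.dumps({"conda_env_name": f"env-for-{session_dir}"})


async def create_runtime_env(serialized_runtime_env: str, session_dir: str) -> str:
    loop = asyncio.get_event_loop()
    runtime_env = json.loads(serialized_runtime_env or "{}")
    # Run the blocking hook in a worker thread so the agent stays responsive.
    return await loop.run_in_executor(None, slow_setup_hook, runtime_env,
                                      session_dir)


print(asyncio.run(create_runtime_env('{"pip": ["requests"]}', "session_2021")))
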
dashboard/modules/runtime_env/runtime_env_consts.py (new file, 7 lines)
@@ -0,0 +1,7 @@
+import ray.ray_constants as ray_constants
+
+RUNTIME_ENV_RETRY_TIMES = ray_constants.env_integer("RUNTIME_ENV_RETRY_TIMES",
+                                                    3)
+
+RUNTIME_ENV_RETRY_INTERVAL_MS = ray_constants.env_integer(
+    "RUNTIME_ENV_RETRY_INTERVAL_MS", 1000)

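Both constants go through ray_constants.env_integer, so (assuming that helper reads the process environment, as its name suggests) the retry behavior can be tuned without code changes. A hedged sketch; the override values are arbitrary examples:

# Override the retry knobs before the agent imports runtime_env_consts.
import os

os.environ["RUNTIME_ENV_RETRY_TIMES"] = "5"           # default 3
os.environ["RUNTIME_ENV_RETRY_INTERVAL_MS"] = "2000"  # default 1000

import ray.new_dashboard.modules.runtime_env.runtime_env_consts as consts
print(consts.RUNTIME_ENV_RETRY_TIMES, consts.RUNTIME_ENV_RETRY_INTERVAL_MS)
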
@@ -72,6 +72,8 @@ class RayParams:
             worker_setup_hook to set up the environment for the worker process.
         worker_setup_hook (str): The module path to a Python function that will
             be imported and run to set up the environment for the worker.
+        runtime_env_setup_hook (str): The module path to a Python function that
+            will be imported and run to set up the runtime env in agent.
         huge_pages: Boolean flag indicating whether to start the Object
             Store with hugetlbfs support. Requires plasma_directory.
         include_dashboard: Boolean flag indicating whether to start the web
@@ -141,6 +143,8 @@ class RayParams:
                  worker_path=None,
                  setup_worker_path=None,
                  worker_setup_hook=ray_constants.DEFAULT_WORKER_SETUP_HOOK,
+                 runtime_env_setup_hook=ray_constants.
+                 DEFAULT_RUNTIME_ENV_SETUP_HOOK,
                  huge_pages=False,
                  include_dashboard=None,
                  dashboard_host=ray_constants.DEFAULT_DASHBOARD_IP,
@@ -189,6 +193,7 @@ class RayParams:
         self.worker_path = worker_path
         self.setup_worker_path = setup_worker_path
         self.worker_setup_hook = worker_setup_hook
+        self.runtime_env_setup_hook = runtime_env_setup_hook
         self.huge_pages = huge_pages
         self.include_dashboard = include_dashboard
         self.dashboard_host = dashboard_host

@@ -1384,6 +1384,7 @@ def start_raylet(redis_address,
                  worker_path,
                  setup_worker_path,
                  worker_setup_hook,
+                 runtime_env_setup_hook,
                  temp_dir,
                  session_dir,
                  resource_dir,
@@ -1425,6 +1426,8 @@ def start_raylet(redis_address,
             worker_setup_hook to set up the environment for the worker process.
         worker_setup_hook (str): The module path to a Python function that will
             be imported and run to set up the environment for the worker.
+        runtime_env_setup_hook (str): The module path to a Python function that
+            will be imported and run to set up the runtime env in agent.
         temp_dir (str): The path of the temporary directory Ray will use.
         session_dir (str): The path of this session.
         resource_dir(str): The path of resource of this session .
@@ -1557,7 +1560,9 @@ def start_raylet(redis_address,
         f"--object-store-name={plasma_store_name}",
         f"--raylet-name={raylet_name}",
         f"--temp-dir={temp_dir}",
+        f"--session-dir={session_dir}",
         f"--runtime-env-dir={resource_dir}",
+        f"--runtime-env-setup-hook={runtime_env_setup_hook}",
         f"--log-dir={log_dir}",
         f"--logging-rotate-bytes={max_bytes}",
         f"--logging-rotate-backup-count={backup_count}",

@@ -789,6 +789,7 @@ class Node:
             self._ray_params.worker_path,
             self._ray_params.setup_worker_path,
             self._ray_params.worker_setup_hook,
+            self._ray_params.runtime_env_setup_hook,
             self._temp_dir,
             self._session_dir,
             self._resource_dir,

@@ -233,7 +233,11 @@ AUTOSCALER_RESOURCE_REQUEST_CHANNEL = b"autoscaler_resource_request"
 REDIS_DEFAULT_PASSWORD = "5241590000000000"

 # The default module path to a Python function that sets up the worker env.
-DEFAULT_WORKER_SETUP_HOOK = "ray.workers.setup_runtime_env.setup"
+DEFAULT_WORKER_SETUP_HOOK = "ray.workers.setup_runtime_env.setup_worker"
+
+# The default module path to a Python function that sets up runtime envs.
+DEFAULT_RUNTIME_ENV_SETUP_HOOK = \
+    "ray.workers.setup_runtime_env.setup_runtime_env"

 # The default ip address to bind to.
 NODE_DEFAULT_IP = "127.0.0.1"

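The two defaults now point at the two functions split out of the old setup() in this patch. A small sketch of how they resolve; import_attr is the same helper the agent uses above, and this assumes a Ray build containing this change:

# Resolve the default hooks defined above with import_attr.
import ray.ray_constants as ray_constants
from ray._private.utils import import_attr

worker_hook = import_attr(ray_constants.DEFAULT_WORKER_SETUP_HOOK)
runtime_env_hook = import_attr(ray_constants.DEFAULT_RUNTIME_ENV_SETUP_HOOK)
print(worker_hook.__name__)       # setup_worker
print(runtime_env_hook.__name__)  # setup_runtime_env
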
@@ -451,6 +451,13 @@ def debug(address):
     type=str,
     help="Module path to the Python function that will be used to set up the "
     "environment for the worker process.")
+@click.option(
+    "--runtime-env-setup-hook",
+    hidden=True,
+    default=ray_constants.DEFAULT_RUNTIME_ENV_SETUP_HOOK,
+    type=str,
+    help="Module path to the Python function that will be used to set up the "
+    "runtime env in agent.")
 @add_click_options(logging_options)
 def start(node_ip_address, address, port, redis_password, redis_shard_ports,
           object_manager_port, node_manager_port, gcs_server_port,
@@ -462,7 +469,8 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
           no_redirect_output, plasma_store_socket_name, raylet_socket_name,
           temp_dir, system_config, lru_evict, enable_object_reconstruction,
           metrics_export_port, no_monitor, tracing_startup_hook,
-          worker_setup_hook, log_style, log_color, verbose):
+          worker_setup_hook, runtime_env_setup_hook, log_style, log_color,
+          verbose):
     """Start Ray processes manually on the local machine."""
     cli_logger.configure(log_style, log_color, verbose)
     if gcs_server_port and not head:
@@ -525,7 +533,8 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
         metrics_export_port=metrics_export_port,
         no_monitor=no_monitor,
         tracing_startup_hook=tracing_startup_hook,
-        worker_setup_hook=worker_setup_hook)
+        worker_setup_hook=worker_setup_hook,
+        runtime_env_setup_hook=runtime_env_setup_hook)
     if head:
         # Use default if port is none, allocate an available port if port is 0
         if port is None:

@@ -16,6 +16,11 @@ parser.add_argument(
     type=str,
     help="the serialized parsed runtime env dict")

+parser.add_argument(
+    "--serialized-runtime-env-context",
+    type=str,
+    help="the serialized runtime env context")
+
 # The worker is not set up yet, so we can't get session_dir from the worker.
 parser.add_argument(
     "--session-dir", type=str, help="the directory for the current session")

python/ray/workers/pluggable_runtime_env.py (new file, 16 lines)
@@ -0,0 +1,16 @@
+import json
+
+
+class RuntimeEnvContext:
+    """A context used to describe the created runtime env."""
+
+    def __init__(self, conda_env_name=None):
+        self.conda_env_name = conda_env_name
+
+    def serialize(self) -> str:
+        # serialize the context to json string.
+        return json.dumps(self.__dict__)
+
+    @staticmethod
+    def deserialize(json_string):
+        return RuntimeEnvContext(**json.loads(json_string))

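A quick usage sketch of the new context class; the env name is an example, and the JSON round trip is the only behavior the class defines:

from ray.workers.pluggable_runtime_env import RuntimeEnvContext

ctx = RuntimeEnvContext(conda_env_name="conda-generated-abc123")
payload = ctx.serialize()  # '{"conda_env_name": "conda-generated-abc123"}'
restored = RuntimeEnvContext.deserialize(payload)
assert restored.conda_env_name == ctx.conda_env_name
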
@@ -16,6 +16,7 @@ from ray._private.conda import (get_conda_activate_commands,
 from ray._private.utils import try_to_create_directory
 from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
                                 get_release_wheel_url)
+from ray.workers.pluggable_runtime_env import RuntimeEnvContext
 logger = logging.getLogger(__name__)

 parser = argparse.ArgumentParser()
@@ -25,27 +26,19 @@ parser.add_argument(
     type=str,
     help="the serialized parsed runtime env dict")

+parser.add_argument(
+    "--serialized-runtime-env-context",
+    type=str,
+    help="the serialized runtime env context")
+
 # The worker is not set up yet, so we can't get session_dir from the worker.
 parser.add_argument(
     "--session-dir", type=str, help="the directory for the current session")


-def setup(input_args):
-    # remaining_args contains the arguments to the original worker command,
-    # minus the python executable, e.g. default_worker.py --node-ip-address=...
-    args, remaining_args = parser.parse_known_args(args=input_args)
-
-    # add worker-shim-pid argument
-    remaining_args.append("--worker-shim-pid={}".format(os.getpid()))
-
-    commands = []
-    runtime_env: dict = json.loads(args.serialized_runtime_env or "{}")
-
-    py_executable: str = sys.executable
-
+def setup_runtime_env(runtime_env: dict, session_dir):
     if runtime_env.get("conda") or runtime_env.get("pip"):
-        conda_dict = get_conda_dict(runtime_env, args.session_dir)
-        py_executable = "python"
+        conda_dict = get_conda_dict(runtime_env, session_dir)
         if isinstance(runtime_env.get("conda"), str):
             conda_env_name = runtime_env["conda"]
         else:
@@ -65,8 +58,8 @@ def setup(input_args):
                 sort_keys=True).encode("utf-8")).hexdigest()
             conda_hash_str = f"conda-generated-{conda_hash}"
             file_lock_name = f"ray-{conda_hash_str}.lock"
-            with FileLock(os.path.join(args.session_dir, file_lock_name)):
-                conda_dir = os.path.join(args.session_dir, "runtime_resources",
+            with FileLock(os.path.join(session_dir, file_lock_name)):
+                conda_dir = os.path.join(session_dir, "runtime_resources",
                                          "conda")
                 try_to_create_directory(conda_dir)
                 conda_yaml_path = os.path.join(conda_dir, "environment.yml")
@@ -78,20 +71,59 @@ def setup(input_args):
             conda_env_name = get_or_create_conda_env(
                 conda_yaml_path, conda_dir)

-        commands += get_conda_activate_commands(conda_env_name)
+        return RuntimeEnvContext(conda_env_name)
+
+    return RuntimeEnvContext()
+
+
+def setup_worker(input_args):
+    # remaining_args contains the arguments to the original worker command,
+    # minus the python executable, e.g. default_worker.py --node-ip-address=...
+    args, remaining_args = parser.parse_known_args(args=input_args)
+
+    # add worker-shim-pid argument
+    remaining_args.append("--worker-shim-pid={}".format(os.getpid()))
+
+    commands = []
+    py_executable: str = sys.executable
+    runtime_env: dict = json.loads(args.serialized_runtime_env or "{}")
+    runtime_env_context: RuntimeEnvContext = None
+
+    # Ray client server setups runtime env by itself instead of agent.
+    if runtime_env.get("conda") or runtime_env.get("pip"):
+        if not args.serialized_runtime_env_context:
+            runtime_env_context = setup_runtime_env(runtime_env,
+                                                    args.session_dir)
+        else:
+            runtime_env_context = RuntimeEnvContext.deserialize(
+                args.serialized_runtime_env_context)
+
+    # activate conda
+    if runtime_env_context and runtime_env_context.conda_env_name:
+        py_executable = "python"
+        conda_activate_commands = get_conda_activate_commands(
+            runtime_env_context.conda_env_name)
+        if (conda_activate_commands):
+            commands += conda_activate_commands
+    elif runtime_env.get("conda"):
+        logger.warning(
+            "Conda env name is not found in context, "
+            "but conda exists in runtime env. The runtime env %s, "
+            "the context %s.", args.serialized_runtime_env,
+            args.serialized_runtime_env_context)

     commands += [" ".join([f"exec {py_executable}"] + remaining_args)]
     command_separator = " && "
     command_str = command_separator.join(commands)
+
+    # update env vars
     if runtime_env.get("env_vars"):
         env_vars = runtime_env["env_vars"]
         os.environ.update(env_vars)

     os.execvp("bash", ["bash", "-c", command_str])


-def get_conda_dict(runtime_env, session_dir) -> Optional[Dict[Any, Any]]:
+def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]:
     """ Construct a conda dependencies dict from a runtime env.

     This function does not inject Ray or Python into the conda dict.
@@ -111,7 +143,7 @@ def get_conda_dict(runtime_env, session_dir) -> Optional[Dict[Any, Any]]:
         pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
         pip_hash_str = f"pip-generated-{pip_hash}"

-        conda_dir = os.path.join(session_dir, "runtime_resources", "conda")
+        conda_dir = os.path.join(runtime_env_dir, "conda")
         requirements_txt_path = os.path.join(
             conda_dir, f"requirements-{pip_hash_str}.txt")
         conda_dict = {
@@ -121,7 +153,7 @@ def get_conda_dict(runtime_env, session_dir) -> Optional[Dict[Any, Any]]:
             }]
         }
         file_lock_name = f"ray-{pip_hash_str}.lock"
-        with FileLock(os.path.join(session_dir, file_lock_name)):
+        with FileLock(os.path.join(runtime_env_dir, file_lock_name)):
             try_to_create_directory(conda_dir)
             with open(requirements_txt_path, "w") as file:
                 file.write(requirements_txt)

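To make the worker-side half concrete: setup_worker either builds the context locally (the Ray client path) or deserializes the one the raylet passed via --serialized-runtime-env-context, then execs the worker behind the conda activation commands. A hedged sketch of the command assembly; the activation text and worker arguments are invented placeholders, only the join-and-exec shape comes from the code above:

# Sketch of the command string setup_worker ultimately hands to `bash -c`.
conda_activate_commands = ["conda activate conda-generated-abc123"]
remaining_args = ["default_worker.py", "--node-ip-address=127.0.0.1",
                  "--worker-shim-pid=12345"]

commands = list(conda_activate_commands)
commands += [" ".join(["exec python"] + remaining_args)]
command_str = " && ".join(commands)
print(command_str)
# In setup_worker this is followed by:
#   os.execvp("bash", ["bash", "-c", command_str])
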
@@ -28,6 +28,8 @@ message CreateRuntimeEnvReply {
   AgentRpcStatus status = 1;
   // The error message.
   string error_message = 2;
+  // Runtime env context.
+  string serialized_runtime_env_context = 3;
 }

 message DeleteRuntimeEnvRequest {

@@ -117,11 +117,11 @@ void AgentManager::CreateRuntimeEnv(const std::string &serialized_runtime_env,
          Status status, const rpc::CreateRuntimeEnvReply &reply) {
         if (status.ok()) {
           if (reply.status() == rpc::AGENT_RPC_STATUS_OK) {
-            callback(true);
+            callback(true, reply.serialized_runtime_env_context());
           } else {
             RAY_LOG(ERROR) << "Failed to create runtime env: " << serialized_runtime_env
                            << ", error message: " << reply.error_message();
-            callback(false);
+            callback(false, reply.serialized_runtime_env_context());
           }

         } else {

@@ -35,7 +35,9 @@ typedef std::function<std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
     const std::string &ip_address, int port)>
     RuntimeEnvAgentClientFactoryFn;

-typedef std::function<void(bool successful)> CreateRuntimeEnvCallback;
+typedef std::function<void(bool successful,
+                           const std::string &serialized_runtime_env_context)>
+    CreateRuntimeEnvCallback;
 typedef std::function<void()> DeleteRuntimeEnvCallback;

 class AgentManager : public rpc::AgentManagerServiceHandler {

@@ -152,9 +152,10 @@ void WorkerPool::SetAgentManager(std::shared_ptr<AgentManager> agent_manager) {

 Process WorkerPool::StartWorkerProcess(
     const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
-    const std::vector<std::string> &dynamic_options,
+    const std::vector<std::string> &dynamic_options, const int runtime_env_hash,
     const std::string &serialized_runtime_env,
-    std::unordered_map<std::string, std::string> override_environment_variables) {
+    std::unordered_map<std::string, std::string> override_environment_variables,
+    const std::string &serialized_runtime_env_context) {
   rpc::JobConfig *job_config = nullptr;
   if (!IsIOWorkerType(worker_type)) {
     RAY_CHECK(!job_id.IsNil());
@@ -303,9 +304,13 @@ Process WorkerPool::StartWorkerProcess(
       }
     }

-    WorkerCacheKey env = {override_environment_variables, serialized_runtime_env};
-    const std::string runtime_env_hash_str = std::to_string(env.IntHash());
-    worker_command_args.push_back("--runtime-env-hash=" + runtime_env_hash_str);
+    worker_command_args.push_back("--runtime-env-hash=" +
+                                  std::to_string(runtime_env_hash));
+
+    if (serialized_runtime_env_context != "{}" && serialized_runtime_env_context != "") {
+      worker_command_args.push_back("--serialized-runtime-env-context=" +
+                                    serialized_runtime_env_context);
+    }
   }

   // We use setproctitle to change python worker process title,
@@ -834,19 +839,27 @@ void WorkerPool::TryKillingIdleWorkers() {
   RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size());
 }

+int GetRuntimeEnvHash(const TaskSpecification &task_spec) {
+  const WorkerCacheKey env = {task_spec.OverrideEnvironmentVariables(),
+                              task_spec.SerializedRuntimeEnv()};
+  return env.IntHash();
+}
+
 std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
     const TaskSpecification &task_spec) {
   auto &state = GetStateForLanguage(task_spec.GetLanguage());

   std::shared_ptr<WorkerInterface> worker = nullptr;
   Process proc;
-  auto start_worker_process_fn = [this](const TaskSpecification &task_spec, State &state,
-                                        std::vector<std::string> dynamic_options,
-                                        bool dedicated) -> Process {
-    Process proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
-                                      task_spec.JobId(), dynamic_options,
-                                      task_spec.SerializedRuntimeEnv(),
-                                      task_spec.OverrideEnvironmentVariables());
+  auto start_worker_process_fn =
+      [this](const TaskSpecification &task_spec, State &state,
+             std::vector<std::string> dynamic_options, bool dedicated,
+             const int runtime_env_hash, const std::string &serialized_runtime_env,
+             const std::string &serialized_runtime_env_context) -> Process {
+    Process proc = StartWorkerProcess(
+        task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(),
+        dynamic_options, runtime_env_hash, serialized_runtime_env,
+        task_spec.OverrideEnvironmentVariables(), serialized_runtime_env_context);
     if (proc.IsValid()) {
       WarnAboutSize();
       if (dedicated) {
@@ -887,7 +900,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
       state.tasks_with_pending_runtime_envs.emplace(task_spec.TaskId());
       agent_manager_->CreateRuntimeEnv(
           task_spec.SerializedRuntimeEnv(),
-          [start_worker_process_fn, &state, task_spec, dynamic_options](bool done) {
+          [start_worker_process_fn, &state, task_spec, dynamic_options](
+              bool done, const std::string &serialized_runtime_env_context) {
             state.tasks_with_pending_runtime_envs.erase(task_spec.TaskId());
             if (!done) {
               // TODO(guyang.sgy): Reschedule to other nodes when create runtime env
@@ -896,19 +910,20 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
                      "Wait for next time to retry or reschedule.";
               return;
             }
-            start_worker_process_fn(task_spec, state, dynamic_options, true);
+            start_worker_process_fn(
+                task_spec, state, dynamic_options, true, GetRuntimeEnvHash(task_spec),
+                task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context);
           });
     } else {
-      proc = start_worker_process_fn(task_spec, state, dynamic_options, true);
+      proc =
+          start_worker_process_fn(task_spec, state, dynamic_options, true, 0, "", "");
     }
   } else {
     // Find an available worker which is already assigned to this job and which has
     // the specified runtime env.
     // Try to pop the most recently pushed worker.
-    const WorkerCacheKey env = {task_spec.OverrideEnvironmentVariables(),
-                                task_spec.SerializedRuntimeEnv()};
-    const int runtime_env_hash = env.IntHash();
+    const int runtime_env_hash = GetRuntimeEnvHash(task_spec);
     for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
          it++) {
       if (task_spec.GetLanguage() != it->first->GetLanguage() ||
@@ -944,16 +959,20 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
         // create runtime env.
         agent_manager_->CreateRuntimeEnv(
             task_spec.SerializedRuntimeEnv(),
-            [start_worker_process_fn, &state, task_spec](bool successful) {
+            [start_worker_process_fn, &state, task_spec, runtime_env_hash](
+                bool successful, const std::string &serialized_runtime_env_context) {
              if (!successful) {
                // TODO(guyang.sgy): Reschedule to other nodes when create runtime env
                // failed.
                return;
              }
-             start_worker_process_fn(task_spec, state, {}, false);
+             start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash,
+                                     task_spec.SerializedRuntimeEnv(),
+                                     serialized_runtime_env_context);
            });
       } else {
-        proc = start_worker_process_fn(task_spec, state, {}, false);
+        proc = start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, "",
+                                       "");
       }
     }
   }

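The C++ changes above re-order worker startup: PopWorker asks the agent to create the runtime env first and only launches the worker once the serialized context comes back, passing a precomputed runtime-env hash along. A deliberately simplified, Python-shifted sketch of that flow; every name below is an illustrative stand-in, not a Ray API:

# Illustrative stand-in for the raylet-side flow, not real Ray code.
def pop_worker(task_spec, agent_manager, start_worker_process):
    # Mirrors GetRuntimeEnvHash(): hash of env var overrides + serialized runtime env.
    runtime_env_hash = hash((task_spec["serialized_runtime_env"],
                             frozenset(task_spec["override_env_vars"].items())))

    def on_runtime_env_created(successful, serialized_runtime_env_context):
        if not successful:
            # The patch leaves rescheduling to another node as a TODO.
            return
        start_worker_process(
            runtime_env_hash=runtime_env_hash,
            serialized_runtime_env=task_spec["serialized_runtime_env"],
            serialized_runtime_env_context=serialized_runtime_env_context)

    agent_manager.create_runtime_env(task_spec["serialized_runtime_env"],
                                     on_runtime_env_created)
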
@@ -403,8 +403,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
   Process StartWorkerProcess(
       const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
       const std::vector<std::string> &dynamic_options = {},
-      const std::string &serialized_runtime_env = "{}",
-      std::unordered_map<std::string, std::string> override_environment_variables = {});
+      const int runtime_env_hash = 0, const std::string &serialized_runtime_env = "{}",
+      std::unordered_map<std::string, std::string> override_environment_variables = {},
+      const std::string &serialized_runtime_env_context = "{}");

   /// The implementation of how to start a new worker process with command arguments.
   /// The lifetime of the process is tied to that of the returned object,