[runtime env] allow working_dir to be a zipped package (#20826)

Check if working_dir is a zip, unzip it if so.
This commit is contained in:
hckuo 2022-01-10 18:29:01 -06:00 committed by GitHub
parent 6e568d2c02
commit 7955333ffd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 10 deletions

View file

@ -254,7 +254,7 @@ API Reference
The ``runtime_env`` is a Python dictionary including one or more of the following fields:
- ``working_dir`` (str): Specifies the working directory for the Ray workers. This must either be an existing directory on the local machine with total size at most 100 MiB, or a URI to a remotely-stored zip file containing the working directory for your job. See :ref:`remote-uris` for details.
- ``working_dir`` (str): Specifies the working directory for the Ray workers. This must be either (1) an existing local directory with total size at most 100 MiB, (2) an existing local zip file with total unzipped size at most 100 MiB (Note: ``excludes`` has no effect), or (3) a URI to a remotely-stored zip file containing the working directory for your job. See :ref:`remote-uris` for details.
The specified directory will be downloaded to each node on the cluster, and Ray workers will be started in their node's copy of this directory.
- Examples
@ -263,6 +263,8 @@ The ``runtime_env`` is a Python dictionary including one or more of the followin
- ``"/src/my_project"``
- ``"/src/my_project.zip"``
- ``"s3://path/to/my_dir.zip"``
Note: Setting a local directory per-task or per-actor is currently unsupported; it can only be set per-job (i.e., in ``ray.init()``).

View file

@ -286,6 +286,15 @@ def package_exists(pkg_uri: str) -> bool:
raise NotImplementedError(f"Protocol {protocol} is not supported")
def get_uri_for_package(package: Path) -> str:
    """Build a content-addressable URI for a package file.

    The URI embeds the MD5 hex digest of the file's bytes, so two packages
    with identical contents map to the same URI.

    Args:
        package: Path to the package file whose bytes are hashed.

    Returns:
        A string of the form
        ``<Protocol.GCS.value>://<RAY_PKG_PREFIX><md5 hex digest>.zip``.
    """
    digest = hashlib.md5(package.read_bytes()).hexdigest()
    pkg_name = RAY_PKG_PREFIX + digest
    return f"{Protocol.GCS.value}://{pkg_name}.zip"
def get_uri_for_directory(directory: str,
excludes: Optional[List[str]] = None) -> str:
"""Get a content-addressable URI from a directory's contents.

View file

@ -8,7 +8,8 @@ from ray.experimental.internal_kv import _internal_kv_initialized
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.packaging import (
download_and_unpack_package, delete_package, get_uri_for_directory,
parse_uri, Protocol, upload_package_if_needed)
get_uri_for_package, upload_package_to_gcs, parse_uri, Protocol,
upload_package_if_needed)
from ray._private.utils import try_to_create_directory
default_logger = logging.getLogger(__name__)
@ -47,7 +48,19 @@ def upload_working_dir_if_needed(
return runtime_env
excludes = runtime_env.get("excludes", None)
working_dir_uri = get_uri_for_directory(working_dir, excludes=excludes)
try:
working_dir_uri = get_uri_for_directory(working_dir, excludes=excludes)
except ValueError: # working_dir is not a directory
package_path = Path(working_dir)
if (not package_path.exists() or package_path.suffix != ".zip"):
raise ValueError(f"directory {package_path} must be an existing "
"directory or a zip package")
pkg_uri = get_uri_for_package(package_path)
upload_package_to_gcs(pkg_uri, package_path.read_bytes())
runtime_env["working_dir"] = pkg_uri
return runtime_env
upload_package_if_needed(
working_dir_uri,
scratch_dir,

View file

@ -3,6 +3,7 @@ import os
from pathlib import Path
import sys
import tempfile
import shutil
import pytest
@ -21,10 +22,8 @@ S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip"
GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip"
@pytest.mark.parametrize("option", [
"failure", "working_dir", "py_modules", "py_modules_path",
"working_dir_path"
])
@pytest.mark.parametrize(
"option", ["failure", "working_dir", "working_dir_zip", "py_modules"])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
"""Tests the case where we lazily read files or import inside a task/actor.
@ -40,9 +39,14 @@ def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
ray.init(address)
elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "working_dir_path":
ray.init(
address, runtime_env={"working_dir": Path(tmp_working_dir)})
elif option == "working_dir_zip":
# Create a temp dir to place the zipped package
# from tmp_working_dir
with tempfile.TemporaryDirectory() as tmp_dir:
zip_dir = Path(tmp_working_dir)
package = shutil.make_archive(
os.path.join(tmp_dir, "test"), "zip", zip_dir)
ray.init(address, runtime_env={"working_dir": package})
elif option == "py_modules":
ray.init(
address,