[runtime_env] Remove global logger from working_dir code (#18605)

Authored by Edward Oakes on 2021-09-16 10:37:45 -05:00, committed by GitHub
parent 187e4a86ca
commit e7ea1f9a82
2 changed files with 94 additions and 59 deletions
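
The whole change applies one pattern: each module keeps a `default_logger`, and every function that logs accepts an optional `logger` argument instead of writing to (or monkey-patching) a module-global `_logger`. A minimal sketch of the before/after, with hypothetical function names rather than the actual Ray code:

import logging
from typing import Optional

# Before: one module-global logger; per-job logging required mutating it.
_logger = logging.getLogger(__name__)

def process_before(path: str):
    _logger.info(f"processing {path}")

# After: a module-level default that callers can override per call.
default_logger = logging.getLogger(__name__)

def process_after(path: str,
                  logger: Optional[logging.Logger] = default_logger):
    logger.info(f"processing {path}")

process_after("/tmp/pkg")  # logs via default_logger
process_after("/tmp/pkg", logger=logging.getLogger("job-123"))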

python/ray/_private/runtime_env/conda.py

@@ -15,9 +15,10 @@ import ray
 from ray._private.runtime_env import RuntimeEnvContext
 from ray._private.runtime_env.conda_utils import (get_conda_activate_commands,
                                                   get_or_create_conda_env)
-from ray._private.utils import try_to_create_directory
 from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
-                                get_release_wheel_url)
+                                get_release_wheel_url, try_to_create_directory)
+
+default_logger = logging.getLogger(__name__)

 def _resolve_current_ray_path():
@@ -37,11 +38,9 @@ def _resolve_install_from_source_ray_dependencies():
     return ray_install_requires


-def _inject_ray_to_conda_site(conda_path,
-                              logger: Optional[logging.Logger] = None):
+def _inject_ray_to_conda_site(
+        conda_path, logger: Optional[logging.Logger] = default_logger):
     """Write the current Ray site package directory to a new site"""
-    if logger is None:
-        logger = logging.getLogger(__name__)
     python_binary = os.path.join(conda_path, "bin/python")
     site_packages_path = subprocess.check_output(
         [python_binary, "-c",
@@ -105,7 +104,7 @@ def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]:
 def current_ray_pip_specifier(
-        logger: Optional[logging.Logger] = None) -> Optional[str]:
+        logger: Optional[logging.Logger] = default_logger) -> Optional[str]:
     """The pip requirement specifier for the running version of Ray.

     Returns:
@@ -117,8 +116,6 @@ def current_ray_pip_specifier(
         Returns "https://s3-us-west-2.amazonaws.com/ray-wheels/[..].whl"
             if running a stable release, a nightly or a specific commit
     """
-    if logger is None:
-        logger = logging.getLogger(__name__)
     if os.environ.get("RAY_CI_POST_WHEEL_TESTS"):
         # Running in Buildkite CI after the wheel has been built.
         # Wheels are in the ray/.whl directory, but use relative path to
@@ -199,10 +196,7 @@ class CondaManager:
     def setup(self,
               runtime_env: dict,
               context: RuntimeEnvContext,
-              logger: Optional[logging.Logger] = None):
-        if logger is None:
-            logger = logging.getLogger(__name__)
-
+              logger: Optional[logging.Logger] = default_logger):
         if not runtime_env.get("conda") and not runtime_env.get("pip"):
             return
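
With `CondaManager.setup` taking a logger argument, the runtime env agent can route conda setup logs to a per-job logger rather than the module default. A hedged usage sketch; the constructor arguments and runtime_env contents below are illustrative, not taken from the diff:

import logging

job_logger = logging.getLogger("runtime_env.job_1234")
job_logger.setLevel(logging.DEBUG)

# Hypothetical wiring; the manager and context are created by the agent:
# manager = CondaManager(resources_dir)
# context = RuntimeEnvContext()
# manager.setup({"pip": ["requests"]}, context, logger=job_logger)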

python/ray/_private/runtime_env/working_dir.py

@@ -17,7 +17,7 @@ from ray.job_config import JobConfig
 from ray._private.thirdparty.pathspec import PathSpec
 from ray._private.runtime_env import RuntimeEnvContext

-_logger = logging.getLogger(__name__)
+default_logger = logging.getLogger(__name__)

 FILE_SIZE_WARNING = 10 * 1024 * 1024  # 10MiB
 # NOTE(edoakes): we should be able to support up to 512 MiB based on the GCS'
@@ -50,6 +50,7 @@ def _dir_travel(
         path: Path,
         excludes: List[Callable],
         handler: Callable,
+        logger: Optional[logging.Logger] = default_logger,
 ):
     e = _get_gitignore(path)
     if e is not None:
@@ -59,17 +60,20 @@ def _dir_travel(
     try:
         handler(path)
     except Exception as e:
-        _logger.error(f"Issue with path: {path}")
+        logger.error(f"Issue with path: {path}")
         raise e
     if path.is_dir():
         for sub_path in path.iterdir():
-            _dir_travel(sub_path, excludes, handler)
+            _dir_travel(sub_path, excludes, handler, logger=logger)
     if e is not None:
         excludes.pop()


-def _zip_module(root: Path, relative_path: Path, excludes: Optional[Callable],
-                zip_handler: ZipFile) -> None:
+def _zip_module(root: Path,
+                relative_path: Path,
+                excludes: Optional[Callable],
+                zip_handler: ZipFile,
+                logger: Optional[logging.Logger] = default_logger) -> None:
     """Go through all files and zip them into a zip file"""

     def handler(path: Path):
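
Because `_dir_travel` now forwards the logger on every recursive call (and `_zip_module` passes its own through), the caller's logger follows the whole traversal without any global state. The same threading pattern in a self-contained sketch, using a hypothetical `walk` helper:

import logging
from pathlib import Path
from typing import Optional

default_logger = logging.getLogger(__name__)

def walk(path: Path, logger: Optional[logging.Logger] = default_logger):
    # Passing the logger down explicitly keeps recursive calls on
    # whichever logger the original caller supplied.
    logger.debug(f"visiting {path}")
    if path.is_dir():
        for sub_path in path.iterdir():
            walk(sub_path, logger=logger)

walk(Path("."), logger=logging.getLogger("job-123"))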
@@ -78,20 +82,21 @@ def _zip_module(root: Path, relative_path: Path, excludes: Optional[Callable],
                 None) is None or path.is_file():
             file_size = path.stat().st_size
             if file_size >= FILE_SIZE_WARNING:
-                _logger.warning(
+                logger.warning(
                     f"File {path} is very large ({file_size} bytes). "
                     "Consider excluding this file from the working directory.")
             to_path = path.relative_to(relative_path)
             zip_handler.write(path, to_path)

     excludes = [] if excludes is None else [excludes]
-    _dir_travel(root, excludes, handler)
+    _dir_travel(root, excludes, handler, logger=logger)


 def _hash_modules(
         root: Path,
         relative_path: Path,
         excludes: Optional[Callable],
+        logger: Optional[logging.Logger] = default_logger,
 ) -> bytes:
     """Helper function to create hash of a directory.
@@ -114,7 +119,7 @@ def _hash_modules(
             hash_val = _xor_bytes(hash_val, md5.digest())

     excludes = [] if excludes is None else [excludes]
-    _dir_travel(root, excludes, handler)
+    _dir_travel(root, excludes, handler, logger=logger)
     return hash_val
@@ -166,8 +171,11 @@ def _store_package_in_gcs(gcs_key: str, data: bytes) -> int:

 # TODO(yic): Fix this later to handle big directories in a better way
-def get_project_package_name(working_dir: str, py_modules: List[str],
-                             excludes: List[str]) -> str:
+def get_project_package_name(
+        working_dir: str,
+        py_modules: List[str],
+        excludes: List[str],
+        logger: Optional[logging.Logger] = default_logger) -> str:
     """Get the name of the package by working dir and modules.

     This function will generate the name of the package by the working
@@ -204,8 +212,11 @@ def get_project_package_name(working_dir: str, py_modules: List[str],
                              " directory")
         hash_val = _xor_bytes(
             hash_val,
-            _hash_modules(working_dir, working_dir,
-                          _get_excludes(working_dir, excludes)))
+            _hash_modules(
+                working_dir,
+                working_dir,
+                _get_excludes(working_dir, excludes),
+                logger=logger))
     for py_module in py_modules or []:
         if not isinstance(py_module, str):
             raise TypeError("`py_module` must be a string.")
@@ -214,7 +225,8 @@ def get_project_package_name(working_dir: str, py_modules: List[str],
             raise ValueError(f"py_module {py_module} must be an existing"
                              " directory")
         hash_val = _xor_bytes(
-            hash_val, _hash_modules(module_dir, module_dir.parent, None))
+            hash_val,
+            _hash_modules(module_dir, module_dir.parent, None, logger=logger))
     return RAY_PKG_PREFIX + hash_val.hex() + ".zip" if hash_val else None
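
The hex digest of the folded hash becomes the package file name, so identical directory contents always map to the same key and an existing upload can be reused. Illustratively, assuming RAY_PKG_PREFIX is "_ray_pkg_" (the prefix value is an assumption):

hash_val = bytes.fromhex("0123456789abcdef0123456789abcdef")
pkg_name = "_ray_pkg_" + hash_val.hex() + ".zip"  # prefix value assumed
print(pkg_name)  # _ray_pkg_0123456789abcdef0123456789abcdef.zip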
@@ -243,8 +255,12 @@ def rewrite_runtime_env_uris(job_config: JobConfig) -> None:
         [Protocol.GCS.value + "://" + pkg_name])


-def create_project_package(working_dir: str, py_modules: List[str],
-                           excludes: List[str], output_path: str) -> None:
+def create_project_package(
+        working_dir: str,
+        py_modules: List[str],
+        excludes: List[str],
+        output_path: str,
+        logger: Optional[logging.Logger] = default_logger) -> None:
     """Create a package that will be used by workers.

     This function is used to create a package file based on working
@@ -262,11 +278,20 @@ def create_project_package(working_dir: str, py_modules: List[str],
     if working_dir:
         # put all files in /path/working_dir into zip
         working_path = Path(working_dir).absolute()
-        _zip_module(working_path, working_path,
-                    _get_excludes(working_path, excludes), zip_handler)
+        _zip_module(
+            working_path,
+            working_path,
+            _get_excludes(working_path, excludes),
+            zip_handler,
+            logger=logger)

     for py_module in py_modules or []:
         module_path = Path(py_module).absolute()
-        _zip_module(module_path, module_path.parent, None, zip_handler)
+        _zip_module(
+            module_path,
+            module_path.parent,
+            None,
+            zip_handler,
+            logger=logger)


 def push_package(pkg_uri: str, pkg_path: str) -> int:
@@ -315,7 +340,10 @@ class WorkingDirManager:
         _, pkg_name = _parse_uri(pkg_uri)
         return os.path.join(self._resources_dir, pkg_name)

-    def fetch_package(self, pkg_uri: str) -> int:
+    def fetch_package(self,
+                      pkg_uri: str,
+                      logger: Optional[logging.Logger] = default_logger
+                      ) -> int:
         """Fetch a package from a given uri if it doesn't exist locally.

         This function is used to fetch a package from the given uri and unpack
@@ -327,13 +355,18 @@ class WorkingDirManager:
         Returns:
             The directory containing this package
         """
+        if logger is None:
+            logger = default_logger
+
+        logger.debug(f"Fetching package for uri: {pkg_uri}")
+
         pkg_file = Path(self._get_local_path(pkg_uri))
         local_dir = pkg_file.with_suffix("")
         assert local_dir != pkg_file, "Invalid pkg_file!"
         if local_dir.exists():
             assert local_dir.is_dir(), f"{local_dir} is not a directory"
             return local_dir

-        _logger.debug("Fetch packge")
         protocol, pkg_name = _parse_uri(pkg_uri)
         if protocol in (Protocol.GCS, Protocol.PIN_GCS):
             code = _internal_kv_get(pkg_uri)
@@ -344,13 +377,17 @@ class WorkingDirManager:
         else:
             raise NotImplementedError(f"Protocol {protocol} is not supported")

-        _logger.debug(f"Unpack {pkg_file} to {local_dir}")
+        logger.debug(f"Unpacking {pkg_file} to {local_dir}")
         with ZipFile(str(pkg_file), "r") as zip_ref:
             zip_ref.extractall(local_dir)
         pkg_file.unlink()
         return local_dir

-    def upload_runtime_env_package_if_needed(self, job_config: JobConfig):
+    def upload_runtime_env_package_if_needed(
+            self,
+            job_config: JobConfig,
+            logger: Optional[logging.Logger] = default_logger):
         """Upload runtime env if it's not there.

         It'll check whether the runtime environment exists in the cluster or
@@ -361,6 +398,9 @@ class WorkingDirManager:
         Args:
             job_config (JobConfig): The job config of driver.
         """
+        if logger is None:
+            logger = default_logger
+
         pkg_uris = job_config.get_runtime_env_uris()
         if len(pkg_uris) == 0:
             return  # Return early to avoid internal kv check in this case.
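
Note the `if logger is None` guard added here (and likewise in `fetch_package` and `delete_uri`): a default argument alone doesn't protect against a caller passing `logger=None` explicitly, since an explicit None bypasses the default. A minimal sketch of the pitfall:

import logging
from typing import Optional

default_logger = logging.getLogger(__name__)

def upload(logger: Optional[logging.Logger] = default_logger):
    # An explicit upload(logger=None) bypasses the default, so
    # normalize before use to avoid AttributeError on None.
    if logger is None:
        logger = default_logger
    logger.info("uploading")

upload()             # uses default_logger
upload(logger=None)  # still safe thanks to the guard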
@@ -371,18 +411,24 @@ class WorkingDirManager:
             working_dir = job_config.runtime_env.get("working_dir")
             py_modules = job_config.runtime_env.get("py_modules")
             excludes = job_config.runtime_env.get("excludes") or []
-            _logger.info(
-                f"{pkg_uri} doesn't exist. Create new package with"
-                f" {working_dir} and {py_modules}")
+            logger.info(f"{pkg_uri} doesn't exist. Create new package with"
+                        f" {working_dir} and {py_modules}")
             if not pkg_file.exists():
-                create_project_package(working_dir, py_modules, excludes,
-                                       file_path)
+                create_project_package(
+                    working_dir,
+                    py_modules,
+                    excludes,
+                    file_path,
+                    logger=logger)
             # Push the data to remote storage
             pkg_size = push_package(pkg_uri, pkg_file)
-            _logger.info(
-                f"{pkg_uri} has been pushed with {pkg_size} bytes")
+            logger.info(f"{pkg_uri} has been pushed with {pkg_size} bytes")

-    def ensure_runtime_env_setup(self, pkg_uris: List[str]) -> Optional[str]:
+    def ensure_runtime_env_setup(
+            self,
+            pkg_uris: List[str],
+            logger: Optional[logging.Logger] = default_logger,
+    ) -> Optional[str]:
         """Make sure all required packages are downloaded locally.

         Necessary packages required to run the job will be downloaded
@@ -401,14 +447,16 @@ class WorkingDirManager:
             # Locking to avoid multiple processes downloading concurrently
             pkg_file = Path(self._get_local_path(pkg_uri))
             with FileLock(str(pkg_file) + ".lock"):
-                pkg_dir = self.fetch_package(pkg_uri)
+                pkg_dir = self.fetch_package(pkg_uri, logger=logger)
             sys.path.insert(0, str(pkg_dir))

         # Right now, multiple pkg_uris are not supported correctly.
         # We return the last one as working directory
         return str(pkg_dir) if pkg_dir else None

-    def delete_uri(self, uri: str) -> bool:
+    def delete_uri(self,
+                   uri: str,
+                   logger: Optional[logging.Logger] = default_logger) -> bool:
         """Deletes a specific URI from the local filesystem.

         Args:
@@ -417,6 +465,9 @@ class WorkingDirManager:
         Returns:
             True if the URI was successfully deleted, else False.
         """
+        if logger is None:
+            logger = default_logger
+
         deleted = False
         path = Path(self._get_local_path(uri))
         with FileLock(str(path) + ".lock"):
@@ -429,26 +480,19 @@ class WorkingDirManager:
                 deleted = True

         if not deleted:
-            _logger.warning(f"Tried to delete nonexistent path: {path}")
+            logger.warning(f"Tried to delete nonexistent path: {path}")

         return deleted

     def setup(self,
               runtime_env: dict,
               context: RuntimeEnvContext,
-              logger: Optional[logging.Logger] = None):
+              logger: Optional[logging.Logger] = default_logger):
         if not runtime_env.get("uris"):
             return

-        # Overwrite the module-wide logger temporarily.
-        # TODO(edoakes): we should be able to remove this by refactoring the
-        # working_dir setup code into a class instead of using global vars.
-        global _logger
-        if logger:
-            prev_logger = _logger
-            _logger = logger
-
-        working_dir = self.ensure_runtime_env_setup(runtime_env["uris"])
+        working_dir = self.ensure_runtime_env_setup(
+            runtime_env["uris"], logger=logger)
         context.command_prefix += [f"cd {working_dir}"]

         # Insert the working_dir as the first entry in PYTHONPATH. This is
@@ -457,6 +501,3 @@ class WorkingDirManager:
         if "PYTHONPATH" in context.env_vars:
             python_path += os.pathsep + context.env_vars["PYTHONPATH"]
         context.env_vars["PYTHONPATH"] = python_path
-
-        if logger:
-            _logger = prev_logger
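
Net effect of the last two hunks: the temporary `global _logger` save/restore is gone, and the logger flows through the call chain as an ordinary argument, so concurrent setups can no longer race on module state. End-to-end, with a hypothetical per-job logger:

import logging

job_logger = logging.getLogger("runtime_env.job_abc")

# manager.setup(runtime_env, context, logger=job_logger) now implies:
#   setup -> ensure_runtime_env_setup(uris, logger=job_logger)
#        -> fetch_package(pkg_uri, logger=job_logger)
# with no module-global state mutated along the way.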