From 65eab8f3764018d657a43009c015627ba1652f12 Mon Sep 17 00:00:00 2001 From: architkulkarni Date: Thu, 27 May 2021 08:16:33 -0700 Subject: [PATCH] Revert "Revert "[Core] Add "env_vars" field to runtime_env"" (#16107) --- ci/travis/ci.sh | 1 + python/ray/_private/runtime_env.py | 32 +- python/ray/actor.py | 14 +- python/ray/job_config.py | 5 +- python/ray/remote_function.py | 13 +- python/ray/tests/BUILD | 1 + python/ray/tests/test_advanced_3.py | 235 ----------- python/ray/tests/test_runtime_env_env_vars.py | 364 ++++++++++++++++++ python/ray/workers/setup_runtime_env.py | 5 + 9 files changed, 409 insertions(+), 261 deletions(-) create mode 100644 python/ray/tests/test_runtime_env_env_vars.py diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index 8e11f63ea..6b4b93e12 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -176,6 +176,7 @@ test_python() { -python/ray/tests:test_placement_group # timeout and OOM -python/ray/tests:test_ray_init # test_redis_port() seems to fail here, but pass in isolation -python/ray/tests:test_resource_demand_scheduler + -python/ray/tests:test_runtime_env_env_vars # runtime_env not supported on Windows -python/ray/tests:test_stress # timeout -python/ray/tests:test_stress_sharded # timeout -python/ray/tests:test_k8s_operator_unit_tests diff --git a/python/ray/_private/runtime_env.py b/python/ray/_private/runtime_env.py index 6e1af892d..6ec5ba0cb 100644 --- a/python/ray/_private/runtime_env.py +++ b/python/ray/_private/runtime_env.py @@ -124,9 +124,26 @@ class RuntimeEnvDict: if "_ray_release" in runtime_env_json: self._dict["_ray_release"] = runtime_env_json["_ray_release"] + self._dict["env_vars"] = None + if "env_vars" in runtime_env_json: + env_vars = runtime_env_json["env_vars"] + self._dict["env_vars"] = env_vars + if not (isinstance(env_vars, dict) and all( + isinstance(k, str) and isinstance(v, str) + for (k, v) in env_vars.items())): + raise TypeError("runtime_env['env_vars'] must be of type" + "Dict[str, str]") + + if self._dict.get("working_dir"): + if self._dict["env_vars"] is None: + self._dict["env_vars"] = {} + # TODO(ekl): env vars is probably not the right long term impl. + self._dict["env_vars"].update( + RAY_RUNTIME_ENV_FILES=self._dict["working_dir"]) + # TODO(ekl) we should have better schema validation here. # TODO(ekl) support py_modules - # TODO(architkulkarni) support env_vars, docker + # TODO(architkulkarni) support docker # TODO(architkulkarni) This is to make it easy for the worker caching # code in C++ to check if the env is empty without deserializing and @@ -134,19 +151,6 @@ class RuntimeEnvDict: if all(val is None for val in self._dict.values()): self._dict = {} - def to_worker_env_vars(self, override_environment_variables: dict) -> dict: - """Given existing worker env vars, return an updated dict. - - This sets any necessary env vars to setup the runtime env. - TODO(ekl): env vars is probably not the right long term impl. - """ - if override_environment_variables is None: - override_environment_variables = {} - if self._dict.get("working_dir"): - override_environment_variables.update( - RAY_RUNTIME_ENV_FILES=self._dict["working_dir"]) - return override_environment_variables - def get_parsed_dict(self) -> dict: return self._dict diff --git a/python/ray/actor.py b/python/ray/actor.py index 2b9a67833..a54d3d0f2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -701,13 +701,17 @@ class ActorClass: creation_args = signature.flatten_args(function_signature, args, kwargs) if runtime_env: - parsed_runtime_env = runtime_support.RuntimeEnvDict(runtime_env) - override_environment_variables = ( - parsed_runtime_env.to_worker_env_vars( - override_environment_variables)) - runtime_env_dict = parsed_runtime_env.get_parsed_dict() + runtime_env_dict = runtime_support.RuntimeEnvDict( + runtime_env).get_parsed_dict() else: runtime_env_dict = {} + + if override_environment_variables: + logger.warning("override_environment_variables is deprecated and " + "will be removed in Ray 1.5. Please use " + ".options(runtime_env={'env_vars': {...}}).remote()" + "instead.") + actor_id = worker.core_worker.create_actor( meta.language, meta.actor_creation_function_descriptor, diff --git a/python/ray/job_config.py b/python/ray/job_config.py index 228a8383d..7cbc74035 100644 --- a/python/ray/job_config.py +++ b/python/ray/job_config.py @@ -68,8 +68,9 @@ class JobConfig: del without_dir["working_dir"] self._parsed_runtime_env = runtime_support.RuntimeEnvDict( without_dir) - self.worker_env = self._parsed_runtime_env.to_worker_env_vars( - self.worker_env) + self.worker_env.update( + self._parsed_runtime_env.get_parsed_dict().get("env_vars") + or {}) else: self._parsed_runtime_env = runtime_support.RuntimeEnvDict({}) self.runtime_env = runtime_env or dict() diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index a93e41206..0d88b141e 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -270,14 +270,17 @@ class RemoteFunction: accelerator_type) if runtime_env: - parsed_runtime_env = runtime_support.RuntimeEnvDict(runtime_env) - override_environment_variables = ( - parsed_runtime_env.to_worker_env_vars( - override_environment_variables)) - runtime_env_dict = parsed_runtime_env.get_parsed_dict() + runtime_env_dict = runtime_support.RuntimeEnvDict( + runtime_env).get_parsed_dict() else: runtime_env_dict = {} + if override_environment_variables: + logger.warning("override_environment_variables is deprecated and " + "will be removed in Ray 1.5. Please use " + ".options(runtime_env={'env_vars': {...}}).remote()" + "instead.") + def invocation(args, kwargs): if self._is_cross_language: list_args = cross_language.format_args(worker, args, kwargs) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index d807328ca..cfe33ad0b 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -73,6 +73,7 @@ py_test_module_list( "test_reconstruction.py", "test_reference_counting.py", "test_resource_demand_scheduler.py", + "test_runtime_env_env_vars.py", "test_scheduling.py", "test_serialization.py", "test_stress.py", diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 8eddabede..45df5ef9a 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -733,241 +733,6 @@ def test_k8s_cpu(): assert 50 < k8s_utils.cpu_percent() < 60 -def test_override_environment_variables_task(ray_start_regular): - @ray.remote - def get_env(key): - return os.environ.get(key) - - assert (ray.get( - get_env.options(override_environment_variables={ - "a": "b", - }).remote("a")) == "b") - - -def test_override_environment_variables_actor(ray_start_regular): - @ray.remote - class EnvGetter: - def get(self, key): - return os.environ.get(key) - - a = EnvGetter.options(override_environment_variables={ - "a": "b", - "c": "d", - }).remote() - assert (ray.get(a.get.remote("a")) == "b") - assert (ray.get(a.get.remote("c")) == "d") - - -def test_override_environment_variables_nested_task(ray_start_regular): - @ray.remote - def get_env(key): - return os.environ.get(key) - - @ray.remote - def get_env_wrapper(key): - return ray.get(get_env.remote(key)) - - assert (ray.get( - get_env_wrapper.options(override_environment_variables={ - "a": "b", - }).remote("a")) == "b") - - -def test_override_environment_variables_multitenancy(shutdown_only): - ray.init( - job_config=ray.job_config.JobConfig(worker_env={ - "foo1": "bar1", - "foo2": "bar2", - })) - - @ray.remote - def get_env(key): - return os.environ.get(key) - - assert ray.get(get_env.remote("foo1")) == "bar1" - assert ray.get(get_env.remote("foo2")) == "bar2" - assert ray.get( - get_env.options(override_environment_variables={ - "foo1": "baz1", - }).remote("foo1")) == "baz1" - assert ray.get( - get_env.options(override_environment_variables={ - "foo1": "baz1", - }).remote("foo2")) == "bar2" - - -def test_override_environment_variables_complex(shutdown_only): - ray.init( - job_config=ray.job_config.JobConfig(worker_env={ - "a": "job_a", - "b": "job_b", - "z": "job_z", - })) - - @ray.remote - def get_env(key): - return os.environ.get(key) - - @ray.remote - class NestedEnvGetter: - def get(self, key): - return os.environ.get(key) - - def get_task(self, key): - return ray.get(get_env.remote(key)) - - @ray.remote - class EnvGetter: - def get(self, key): - return os.environ.get(key) - - def get_task(self, key): - return ray.get(get_env.remote(key)) - - def nested_get(self, key): - aa = NestedEnvGetter.options(override_environment_variables={ - "c": "e", - "d": "dd", - }).remote() - return ray.get(aa.get.remote(key)) - - a = EnvGetter.options(override_environment_variables={ - "a": "b", - "c": "d", - }).remote() - assert (ray.get(a.get.remote("a")) == "b") - assert (ray.get(a.get_task.remote("a")) == "b") - assert (ray.get(a.nested_get.remote("a")) == "b") - assert (ray.get(a.nested_get.remote("c")) == "e") - assert (ray.get(a.nested_get.remote("d")) == "dd") - assert (ray.get( - get_env.options(override_environment_variables={ - "a": "b", - }).remote("a")) == "b") - - assert (ray.get(a.get.remote("z")) == "job_z") - assert (ray.get(a.get_task.remote("z")) == "job_z") - assert (ray.get(a.nested_get.remote("z")) == "job_z") - assert (ray.get( - get_env.options(override_environment_variables={ - "a": "b", - }).remote("z")) == "job_z") - - -def test_override_environment_variables_reuse(shutdown_only): - """Test that previously set env vars don't pollute newer calls.""" - ray.init() - - env_var_name = "TEST123" - val1 = "VAL1" - val2 = "VAL2" - assert os.environ.get(env_var_name) is None - - @ray.remote - def f(): - return os.environ.get(env_var_name) - - @ray.remote - def g(): - return os.environ.get(env_var_name) - - assert ray.get(f.remote()) is None - assert ray.get( - f.options(override_environment_variables={ - env_var_name: val1 - }).remote()) == val1 - assert ray.get(f.remote()) is None - assert ray.get(g.remote()) is None - assert ray.get( - f.options(override_environment_variables={ - env_var_name: val2 - }).remote()) == val2 - assert ray.get(g.remote()) is None - assert ray.get(f.remote()) is None - - -def test_override_environment_variables_env_caching(shutdown_only): - """Test that workers with specified envs are cached and reused. - - When a new task or actor is created with a new runtime env, a - new worker process is started. If a subsequent task or actor - uses the same runtime env, the same worker process should be - used. This function checks the pid of the worker to test this. - """ - ray.init() - - env_var_name = "TEST123" - val1 = "VAL1" - val2 = "VAL2" - assert os.environ.get(env_var_name) is None - - def task(): - return os.environ.get(env_var_name), os.getpid() - - @ray.remote - def f(): - return task() - - @ray.remote - def g(): - return task() - - # Empty runtime env does not set our env var. - assert ray.get(f.remote())[0] is None - - # Worker pid1 should have an env var set. - ret_val1, pid1 = ray.get( - f.options(override_environment_variables={ - env_var_name: val1 - }).remote()) - assert ret_val1 == val1 - - # Worker pid2 should have an env var set to something different. - ret_val2, pid2 = ray.get( - g.options(override_environment_variables={ - env_var_name: val2 - }).remote()) - assert ret_val2 == val2 - - # Because the runtime env is different, it should use a different process. - assert pid1 != pid2 - - # Call g with an empty runtime env. It shouldn't reuse pid2, because - # pid2 has an env var set. - _, pid3 = ray.get(g.remote()) - assert pid2 != pid3 - - # Call g with the same runtime env as pid2. Check it uses the same process. - _, pid4 = ray.get( - g.options(override_environment_variables={ - env_var_name: val2 - }).remote()) - assert pid4 == pid2 - - # Call f with a different runtime env from pid1. Check that it uses a new - # process. - _, pid5 = ray.get( - f.options(override_environment_variables={ - env_var_name: val2 - }).remote()) - assert pid5 != pid1 - - # Call f with the same runtime env as pid1. Check it uses the same - # process. - _, pid6 = ray.get( - f.options(override_environment_variables={ - env_var_name: val1 - }).remote()) - assert pid6 == pid1 - - # Same as above but with g instead of f. Shouldn't affect the outcome. - _, pid7 = ray.get( - g.options(override_environment_variables={ - env_var_name: val1 - }).remote()) - assert pid7 == pid1 - - def test_sync_job_config(shutdown_only): num_java_workers_per_process = 8 worker_env = { diff --git a/python/ray/tests/test_runtime_env_env_vars.py b/python/ray/tests/test_runtime_env_env_vars.py new file mode 100644 index 000000000..80009ff83 --- /dev/null +++ b/python/ray/tests/test_runtime_env_env_vars.py @@ -0,0 +1,364 @@ +# coding: utf-8 +import os +import sys + +import pytest + +import ray + + +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_task(ray_start_regular, + use_runtime_env): + @ray.remote + def get_env(key): + return os.environ.get(key) + + if use_runtime_env: + assert (ray.get( + get_env.options(runtime_env={ + "env_vars": { + "a": "b", + } + }).remote("a")) == "b") + else: + assert (ray.get( + get_env.options(override_environment_variables={ + "a": "b", + }).remote("a")) == "b") + + +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_actor(ray_start_regular, + use_runtime_env): + @ray.remote + class EnvGetter: + def get(self, key): + return os.environ.get(key) + + if use_runtime_env: + a = EnvGetter.options(runtime_env={ + "env_vars": { + "a": "b", + "c": "d", + } + }).remote() + else: + a = EnvGetter.options(override_environment_variables={ + "a": "b", + "c": "d", + }).remote() + assert (ray.get(a.get.remote("a")) == "b") + assert (ray.get(a.get.remote("c")) == "d") + + +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_nested_task(ray_start_regular, + use_runtime_env): + @ray.remote + def get_env(key): + return os.environ.get(key) + + @ray.remote + def get_env_wrapper(key): + return ray.get(get_env.remote(key)) + + if use_runtime_env: + assert (ray.get( + get_env_wrapper.options(runtime_env={ + "env_vars": { + "a": "b", + } + }).remote("a")) == "b") + else: + assert (ray.get( + get_env_wrapper.options(override_environment_variables={ + "a": "b", + }).remote("a")) == "b") + + +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_multitenancy(shutdown_only, + use_runtime_env): + if use_runtime_env: + ray.init( + job_config=ray.job_config.JobConfig( + runtime_env={"env_vars": { + "foo1": "bar1", + "foo2": "bar2", + }})) + else: + ray.init( + job_config=ray.job_config.JobConfig(worker_env={ + "foo1": "bar1", + "foo2": "bar2", + })) + + @ray.remote + def get_env(key): + return os.environ.get(key) + + assert ray.get(get_env.remote("foo1")) == "bar1" + assert ray.get(get_env.remote("foo2")) == "bar2" + if use_runtime_env: + assert ray.get( + get_env.options(runtime_env={ + "env_vars": { + "foo1": "baz1", + } + }).remote("foo1")) == "baz1" + assert ray.get( + get_env.options(runtime_env={ + "env_vars": { + "foo1": "baz1", + } + }).remote("foo2")) == "bar2" + else: + assert ray.get( + get_env.options(override_environment_variables={ + "foo1": "baz1", + }).remote("foo1")) == "baz1" + assert ray.get( + get_env.options(override_environment_variables={ + "foo1": "baz1", + }).remote("foo2")) == "bar2" + + +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_complex(shutdown_only, + use_runtime_env): + if use_runtime_env: + ray.init( + job_config=ray.job_config.JobConfig(runtime_env={ + "env_vars": { + "a": "job_a", + "b": "job_b", + "z": "job_z", + } + })) + else: + ray.init( + job_config=ray.job_config.JobConfig(worker_env={ + "a": "job_a", + "b": "job_b", + "z": "job_z", + })) + + @ray.remote + def get_env(key): + return os.environ.get(key) + + @ray.remote + class NestedEnvGetter: + def get(self, key): + return os.environ.get(key) + + def get_task(self, key): + return ray.get(get_env.remote(key)) + + @ray.remote + class EnvGetter: + def get(self, key): + return os.environ.get(key) + + def get_task(self, key): + return ray.get(get_env.remote(key)) + + def nested_get(self, key): + # There is a discrepancy here between the semantics of + # override_environment_variables and runtime_env["env_vars"]. + # override_environment_variables are merged with the parent's + # override_environment_variables, but runtime_env["env_vars"] + # overwrites the parent's runtime_env["env_vars"]. So for this + # test, we include the parent's env_vars {"a": "b"} + # explicitly in the child's env_vars in the case use_runtime_env. + if use_runtime_env: + aa = NestedEnvGetter.options(runtime_env={ + "env_vars": { + "a": "b", + "c": "e", + "d": "dd", + } + }).remote() + else: + aa = NestedEnvGetter.options(override_environment_variables={ + "c": "e", + "d": "dd", + }).remote() + return ray.get(aa.get.remote(key)) + + if use_runtime_env: + a = EnvGetter.options(runtime_env={ + "env_vars": { + "a": "b", + "c": "d", + } + }).remote() + else: + a = EnvGetter.options(override_environment_variables={ + "a": "b", + "c": "d", + }).remote() + assert (ray.get(a.get.remote("a")) == "b") + assert (ray.get(a.get_task.remote("a")) == "b") + assert (ray.get(a.nested_get.remote("a")) == "b") + assert (ray.get(a.nested_get.remote("c")) == "e") + assert (ray.get(a.nested_get.remote("d")) == "dd") + if use_runtime_env: + assert (ray.get( + get_env.options(runtime_env={ + "env_vars": { + "a": "b", + } + }).remote("a")) == "b") + else: + assert (ray.get( + get_env.options(override_environment_variables={ + "a": "b", + }).remote("a")) == "b") + + assert (ray.get(a.get.remote("z")) == "job_z") + assert (ray.get(a.get_task.remote("z")) == "job_z") + assert (ray.get(a.nested_get.remote("z")) == "job_z") + if use_runtime_env: + assert (ray.get( + get_env.options(runtime_env={ + "env_vars": { + "a": "b", + } + }).remote("z")) == "job_z") + else: + assert (ray.get( + get_env.options(override_environment_variables={ + "a": "b", + }).remote("z")) == "job_z") + + +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_reuse(shutdown_only, use_runtime_env): + """Test that previously set env vars don't pollute newer calls.""" + ray.init() + + env_var_name = "TEST123" + val1 = "VAL1" + val2 = "VAL2" + assert os.environ.get(env_var_name) is None + + @ray.remote + def f(): + return os.environ.get(env_var_name) + + @ray.remote + def g(): + return os.environ.get(env_var_name) + + assert ray.get(f.remote()) is None + if use_runtime_env: + assert ray.get( + f.options(runtime_env={ + "env_vars": { + env_var_name: val1 + } + }).remote()) == val1 + else: + assert ray.get( + f.options(override_environment_variables={ + env_var_name: val1 + }).remote()) == val1 + assert ray.get(f.remote()) is None + assert ray.get(g.remote()) is None + if use_runtime_env: + assert ray.get( + f.options(runtime_env={ + "env_vars": { + env_var_name: val2 + } + }).remote()) == val2 + else: + assert ray.get( + f.options(override_environment_variables={ + env_var_name: val2 + }).remote()) == val2 + assert ray.get(g.remote()) is None + assert ray.get(f.remote()) is None + + +# TODO(architkulkarni): Investigate flakiness on Travis CI. It may be that +# there aren't enough CPUs (2-4 on Travis CI vs. likely 8 on Buildkite) and +# worker processes are being killed to adhere to the soft limit. +@pytest.mark.skipif(sys.platform == "darwin", reason="Flaky on Travis CI.") +@pytest.mark.parametrize("use_runtime_env", [True, False]) +def test_override_environment_variables_env_caching(shutdown_only, + use_runtime_env): + """Test that workers with specified envs are cached and reused. + + When a new task or actor is created with a new runtime env, a + new worker process is started. If a subsequent task or actor + uses the same runtime env, the same worker process should be + used. This function checks the pid of the worker to test this. + """ + ray.init() + + env_var_name = "TEST123" + val1 = "VAL1" + val2 = "VAL2" + assert os.environ.get(env_var_name) is None + + def task(): + return os.environ.get(env_var_name), os.getpid() + + @ray.remote + def f(): + return task() + + @ray.remote + def g(): + return task() + + def get_options(val): + if use_runtime_env: + return {"override_environment_variables": {env_var_name: val}} + else: + return {"runtime_env": {"env_vars": {env_var_name: val}}} + + # Empty runtime env does not set our env var. + assert ray.get(f.remote())[0] is None + + # Worker pid1 should have an env var set. + ret_val1, pid1 = ray.get(f.options(**get_options(val1)).remote()) + assert ret_val1 == val1 + + # Worker pid2 should have an env var set to something different. + ret_val2, pid2 = ray.get(g.options(**get_options(val2)).remote()) + assert ret_val2 == val2 + + # Because the runtime env is different, it should use a different process. + assert pid1 != pid2 + + # Call g with an empty runtime env. It shouldn't reuse pid2, because + # pid2 has an env var set. + _, pid3 = ray.get(g.remote()) + assert pid2 != pid3 + + # Call g with the same runtime env as pid2. Check it uses the same process. + _, pid4 = ray.get(g.options(**get_options(val2)).remote()) + assert pid4 == pid2 + + # Call f with a different runtime env from pid1. Check that it uses a new + # process. + _, pid5 = ray.get(f.options(**get_options(val2)).remote()) + assert pid5 != pid1 + + # Call f with the same runtime env as pid1. Check it uses the same + # process. + _, pid6 = ray.get(f.options(**get_options(val1)).remote()) + assert pid6 == pid1 + + # Same as above but with g instead of f. Shouldn't affect the outcome. + _, pid7 = ray.get(g.options(**get_options(val1)).remote()) + assert pid7 == pid1 + + +if __name__ == "__main__": + import pytest + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/workers/setup_runtime_env.py b/python/ray/workers/setup_runtime_env.py index df9c1f9fc..6b6e33ef7 100644 --- a/python/ray/workers/setup_runtime_env.py +++ b/python/ray/workers/setup_runtime_env.py @@ -111,6 +111,11 @@ def setup(input_args): commands += [" ".join([f"exec {py_executable}"] + remaining_args)] command_separator = " && " command_str = command_separator.join(commands) + + if runtime_env.get("env_vars"): + env_vars = runtime_env["env_vars"] + os.environ.update(env_vars) + os.execvp("bash", ["bash", "-c", command_str])