Mirror of https://github.com/vale981/ray (synced 2025-03-04 17:41:43 -05:00)
[runtime env] runtime env inheritance refactor (#24538)
* [runtime env] runtime env inheritance refactor (#22244)

Runtime Environments is already GA as of Ray 1.6.0. The latest doc is [here](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#runtime-environments). It currently supports an [inheritance](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#inheritance) behavior as follows (copied from the doc):

- The runtime_env["env_vars"] field will be merged with the runtime_env["env_vars"] field of the parent. This allows for environment variables set in the parent’s runtime environment to be automatically propagated to the child, even if new environment variables are set in the child’s runtime environment.
- Every other field in the runtime_env will be overridden by the child, not merged. For example, if runtime_env["py_modules"] is specified, it will replace the runtime_env["py_modules"] field of the parent.

We think this merging logic is complex and confusing, because users can't know the final runtime env before their jobs run.

This PR refactors Runtime Environments inheritance and changes its behavior. The new behavior is:

- **If no runtime env option is given when an actor is created, inherit the parent runtime env.**
- **Otherwise, use the specified runtime env directly, without merging.**

It also adds a new API named `ray.runtime_env.get_current_runtime_env()` that returns the parent runtime env, so that you can modify the dict yourself. For example:

```Actor.options(runtime_env=ray.runtime_env.get_current_runtime_env().update({"X": "Y"}))```

The new API can also be used in Ray Client.
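The difference between the old and the new rule can be sketched with plain dicts. This is only a simplified model, not Ray's implementation: `resolve_old` and `resolve_new` are hypothetical names, and the real logic lives in the C++ core worker and operates on serialized runtime envs.

```python
from typing import Optional


def resolve_old(parent: dict, child: Optional[dict]) -> dict:
    """Old rule (simplified): merge env_vars, override every other field."""
    if not child:
        return dict(parent)
    # Fields set by the child replace the parent's fields.
    result = {**parent, **child}
    # env_vars is the exception: parent and child entries are merged.
    env_vars = {**parent.get("env_vars", {}), **child.get("env_vars", {})}
    if env_vars:
        result["env_vars"] = env_vars
    return result


def resolve_new(parent: dict, child: Optional[dict]) -> dict:
    """New rule (simplified): inherit only when the child specifies nothing."""
    return dict(child) if child else dict(parent)


parent = {"pip": ["requests", "chess"], "env_vars": {"A": "a"}}
child = {"env_vars": {"B": "b"}}

print(resolve_old(parent, child))
print(resolve_new(parent, child))
print(resolve_new(parent, None))
```

Under the new rule the result is always either the parent's env or the child's env, never a mix of the two, which is what makes it predictable before the job runs.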
parent d89c8aa9f9
commit eb2692cb32
12 changed files with 215 additions and 265 deletions
|
@ -378,30 +378,48 @@ To disable all deletion behavior (for example, for debugging purposes) you may s
|
|||
Inheritance
|
||||
"""""""""""
|
||||
|
||||
The runtime environment is inheritable, so it will apply to all tasks/actors within a job and all child tasks/actors of a task or actor once set, unless it is overridden.
|
||||
The runtime environment is inheritable, so it will apply to all tasks/actors within a job and all child tasks/actors of a task or actor once set, unless it is overridden by explicitly specifying a runtime environment for the child task/actor.
|
||||
|
||||
If an actor or task specifies a new ``runtime_env``, it will override the parent’s ``runtime_env`` (i.e., the parent actor/task's ``runtime_env``, or the job's ``runtime_env`` if there is no parent actor or task) as follows:
|
||||
|
||||
* The ``runtime_env["env_vars"]`` field will be merged with the ``runtime_env["env_vars"]`` field of the parent.
|
||||
This allows for environment variables set in the parent's runtime environment to be automatically propagated to the child, even if new environment variables are set in the child's runtime environment.
|
||||
* Every other field in the ``runtime_env`` will be *overridden* by the child, not merged. For example, if ``runtime_env["py_modules"]`` is specified, it will replace the ``runtime_env["py_modules"]`` field of the parent.
|
||||
|
||||
Example:
|
||||
1. By default, all actors and tasks inherit the current runtime env.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Parent's `runtime_env`
|
||||
# Current `runtime_env`
|
||||
ray.init(runtime_env={"pip": ["requests", "chess"]})
|
||||
|
||||
# Create child actor
|
||||
ChildActor.remote()
|
||||
|
||||
# ChildActor's actual `runtime_env` (inherited from the current runtime env)
|
||||
{"pip": ["requests", "chess"]}
|
||||
|
||||
2. However, if you specify a ``runtime_env`` for a task or actor, it overrides the current runtime env.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Current `runtime_env`
|
||||
ray.init(runtime_env={"pip": ["requests", "chess"]})
|
||||
|
||||
# Create child actor
|
||||
ChildActor.options(runtime_env={"env_vars": {"A": "a", "B": "b"}}).remote()
|
||||
|
||||
# ChildActor's actual `runtime_env` (the specified `runtime_env` overrides the current one)
|
||||
{"env_vars": {"A": "a", "B": "b"}}
|
||||
|
||||
3. If you still want to use the current runtime env, call :ref:`ray.get_current_runtime_env() <runtime-env-apis>` to get it and modify it yourself.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Current `runtime_env`
|
||||
ray.init(runtime_env={"pip": ["requests", "chess"]})
|
||||
|
||||
# Child updates `runtime_env`
|
||||
Actor.options(runtime_env=ray.get_runtime_context().runtime_env.update({"env_vars": {"A": "a", "B": "b"}}))
|
||||
|
||||
# Child's actual `runtime_env` (merged with current runtime env)
|
||||
{"pip": ["requests", "chess"],
|
||||
"env_vars": {"A": "a", "B": "b"}}
|
||||
|
||||
# Child's specified `runtime_env`
|
||||
{"pip": ["torch", "ray[serve]"],
"env_vars": {"B": "new", "C": "c"}}
|
||||
|
||||
# Child's actual `runtime_env` (merged with parent's)
|
||||
{"pip": ["torch", "ray[serve]"],
"env_vars": {"A": "a", "B": "new", "C": "c"}}
|
||||
|
||||
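One way to build such a merged env by hand is sketched below. It assumes the current runtime env behaves like a plain ``dict`` (the test added in this commit asserts ``isinstance(current_runtime_env, dict)``). ``with_env_vars`` is a hypothetical helper, not a Ray API, and the ``current`` dict stands in for the value of ``ray.get_runtime_context().runtime_env``. Note that the built-in ``dict.update`` mutates in place and returns ``None``, so the sketch returns the merged copy explicitly rather than the result of ``update``.

```python
import copy


def with_env_vars(current: dict, extra_env_vars: dict) -> dict:
    """Return a copy of `current` whose env_vars also contain `extra_env_vars`."""
    merged = copy.deepcopy(current)  # leave the caller's dict untouched
    merged.setdefault("env_vars", {}).update(extra_env_vars)
    return merged


# Stand-in for ray.get_runtime_context().runtime_env
current = {"pip": ["requests", "chess"]}

child_env = with_env_vars(current, {"A": "a", "B": "b"})
print(child_env)
# {'pip': ['requests', 'chess'], 'env_vars': {'A': 'a', 'B': 'b'}}
```

The resulting dict would then be passed as ``Actor.options(runtime_env=child_env)``.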
.. _runtime-env-faq:
|
||||
|
||||
Frequently Asked Questions
|
||||
|
|
|
@ -55,6 +55,69 @@ public class RuntimeEnvTest {
|
|||
}
|
||||
}
|
||||
|
||||
public void testPerActorEnvVars() {
|
||||
try {
|
||||
Ray.init();
|
||||
{
|
||||
RuntimeEnv runtimeEnv =
|
||||
new RuntimeEnv.Builder()
|
||||
.addEnvVar("KEY1", "A")
|
||||
.addEnvVar("KEY2", "B")
|
||||
.addEnvVar("KEY1", "C")
|
||||
.build();
|
||||
|
||||
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
|
||||
String val = actor1.task(A::getEnv, "KEY1").remote().get();
|
||||
Assert.assertEquals(val, "C");
|
||||
val = actor1.task(A::getEnv, "KEY2").remote().get();
|
||||
Assert.assertEquals(val, "B");
|
||||
}
|
||||
|
||||
{
|
||||
/// Because we didn't set them for actor2, all should be null.
|
||||
ActorHandle<A> actor2 = Ray.actor(A::new).remote();
|
||||
String val = actor2.task(A::getEnv, "KEY1").remote().get();
|
||||
Assert.assertNull(val);
|
||||
val = actor2.task(A::getEnv, "KEY2").remote().get();
|
||||
Assert.assertNull(val);
|
||||
}
|
||||
|
||||
} finally {
|
||||
Ray.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testPerActorEnvVarsOverwritePerJobEnvVars() {
|
||||
System.setProperty("ray.job.runtime-env.env-vars.KEY1", "A");
|
||||
System.setProperty("ray.job.runtime-env.env-vars.KEY2", "B");
|
||||
|
||||
try {
|
||||
Ray.init();
|
||||
{
|
||||
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().addEnvVar("KEY1", "C").build();
|
||||
|
||||
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
|
||||
String val = actor1.task(A::getEnv, "KEY1").remote().get();
|
||||
Assert.assertEquals(val, "C");
|
||||
val = actor1.task(A::getEnv, "KEY2").remote().get();
|
||||
Assert.assertNull(val);
|
||||
}
|
||||
|
||||
{
|
||||
/// Because we didn't set them for actor2 explicitly, it should use the per job
|
||||
/// runtime env.
|
||||
ActorHandle<A> actor2 = Ray.actor(A::new).remote();
|
||||
String val = actor2.task(A::getEnv, "KEY1").remote().get();
|
||||
Assert.assertEquals(val, "A");
|
||||
val = actor2.task(A::getEnv, "KEY2").remote().get();
|
||||
Assert.assertEquals(val, "B");
|
||||
}
|
||||
|
||||
} finally {
|
||||
Ray.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private static String getEnvVar(String key) {
|
||||
return System.getenv(key);
|
||||
}
|
||||
|
@ -92,7 +155,7 @@ public class RuntimeEnvTest {
|
|||
Ray.task(RuntimeEnvTest::getEnvVar, "KEY1").setRuntimeEnv(runtimeEnv).remote().get();
|
||||
Assert.assertEquals(val, "C");
|
||||
val = Ray.task(RuntimeEnvTest::getEnvVar, "KEY2").setRuntimeEnv(runtimeEnv).remote().get();
|
||||
Assert.assertEquals(val, "B");
|
||||
Assert.assertNull(val);
|
||||
} finally {
|
||||
Ray.shutdown();
|
||||
}
|
||||
|
|
|
@ -159,7 +159,7 @@ class RuntimeContext(object):
|
|||
"""
|
||||
return self.worker.should_capture_child_tasks_in_placement_group
|
||||
|
||||
def get_runtime_env_string(self):
|
||||
def _get_runtime_env_string(self):
|
||||
"""Get the runtime env string used for the current driver or worker.
|
||||
|
||||
Returns:
|
||||
|
@ -169,14 +169,27 @@ class RuntimeContext(object):
|
|||
|
||||
@property
|
||||
def runtime_env(self):
|
||||
"""Get the runtime env used for the current driver or worker.
|
||||
"""Get the runtime env of the current job/worker.
|
||||
|
||||
If this API is called in driver or ray client, returns the job level runtime
|
||||
env.
|
||||
If this API is called in workers/actors, returns the worker level runtime env.
|
||||
|
||||
Returns:
|
||||
The runtime env currently using by this worker. The type of
|
||||
return value is ray.runtime_env.RuntimeEnv.
|
||||
A new ray.runtime_env.RuntimeEnv instance.
|
||||
|
||||
To merge with the current runtime env in specific cases, get the current
runtime env through this API and modify it yourself.
|
||||
|
||||
Example:
|
||||
>>> # Inherit current runtime env, except `env_vars`
|
||||
>>> Actor.options( # doctest: +SKIP
|
||||
... runtime_env=ray.get_runtime_context().runtime_env.update(
|
||||
... {"env_vars": {"A": "a", "B": "b"}})
|
||||
... ) # doctest: +SKIP
|
||||
"""
|
||||
|
||||
return RuntimeEnv.deserialize(self.get_runtime_env_string())
|
||||
return RuntimeEnv.deserialize(self._get_runtime_env_string())
|
||||
|
||||
@property
|
||||
def current_actor(self):
|
||||
|
|
|
@ -742,7 +742,7 @@ def test_wrapped_actor_creation(call_ray_start):
|
|||
def test_init_requires_no_resources(call_ray_start, use_client):
|
||||
import ray
|
||||
|
||||
if use_client:
|
||||
if not use_client:
|
||||
address = call_ray_start
|
||||
ray.init(address)
|
||||
else:
|
||||
|
|
|
@ -124,6 +124,41 @@ def test_runtime_env_config(start_cluster):
|
|||
run(runtime_env)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"call_ray_start",
|
||||
["ray start --head --ray-client-server-port 25553"],
|
||||
indirect=True,
|
||||
)
|
||||
@pytest.mark.parametrize("use_client", [False, True])
|
||||
def test_get_current_runtime_env(call_ray_start, use_client):
|
||||
job_runtime_env = {"env_vars": {"a": "b"}}
|
||||
|
||||
if not use_client:
|
||||
address = call_ray_start
|
||||
ray.init(address, runtime_env=job_runtime_env)
|
||||
else:
|
||||
ray.init("ray://localhost:25553", runtime_env=job_runtime_env)
|
||||
|
||||
current_runtime_env = ray.get_runtime_context().runtime_env
|
||||
current_runtime_env_2 = ray.get_runtime_context().runtime_env
|
||||
# Ensure we can get a new instance for update.
|
||||
assert current_runtime_env is not current_runtime_env_2
|
||||
assert isinstance(current_runtime_env, dict)
|
||||
assert current_runtime_env == job_runtime_env
|
||||
|
||||
@ray.remote
|
||||
def get_runtime_env():
|
||||
return ray.get_runtime_context().runtime_env
|
||||
|
||||
assert ray.get(get_runtime_env.remote()) == job_runtime_env
|
||||
|
||||
task_runtime_env = {"env_vars": {"a": "c"}}
|
||||
assert (
|
||||
ray.get(get_runtime_env.options(runtime_env=task_runtime_env).remote())
|
||||
== task_runtime_env
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
|
|
|
@ -887,16 +887,16 @@ def test_e2e_complex(call_ray_start, tmp_path):
|
|||
|
||||
return Path("./test").read_text()
|
||||
|
||||
a = TestActor.options(runtime_env={"pip": str(requirement_path)}).remote()
|
||||
a = TestActor.remote()
|
||||
assert ray.get(a.test.remote()) == "Hello"
|
||||
|
||||
# Check that per-task pip specification works and that the job's
|
||||
# working_dir is still inherited.
|
||||
# working_dir is not inherited.
|
||||
@ray.remote
|
||||
def test_pip():
|
||||
import pip_install_test # noqa
|
||||
|
||||
return Path("./test").read_text()
|
||||
return "Hello"
|
||||
|
||||
assert (
|
||||
ray.get(
|
||||
|
@ -905,22 +905,44 @@ def test_e2e_complex(call_ray_start, tmp_path):
|
|||
== "Hello"
|
||||
)
|
||||
|
||||
@ray.remote
|
||||
def test_working_dir():
|
||||
import pip_install_test # noqa
|
||||
|
||||
return Path("./test").read_text()
|
||||
|
||||
with pytest.raises(ray.exceptions.RayTaskError) as excinfo:
|
||||
ray.get(
|
||||
test_working_dir.options(
|
||||
runtime_env={"pip": ["pip-install-test"]}
|
||||
).remote()
|
||||
)
|
||||
assert "FileNotFoundError" in str(excinfo.value)
|
||||
|
||||
# Check that pip_install_test is not in the job's pip requirements.
|
||||
with pytest.raises(ray.exceptions.RayTaskError) as excinfo:
|
||||
ray.get(test_pip.remote())
|
||||
assert "ModuleNotFoundError" in str(excinfo.value)
|
||||
|
||||
# Check that per-actor pip specification works and that the job's
|
||||
# working_dir is still inherited.
|
||||
# working_dir is not inherited.
|
||||
@ray.remote
|
||||
class TestActor:
|
||||
def test(self):
|
||||
import pip_install_test # noqa
|
||||
|
||||
return "Hello"
|
||||
|
||||
def test_working_dir(self):
|
||||
import pip_install_test # noqa
|
||||
|
||||
return Path("./test").read_text()
|
||||
|
||||
a = TestActor.options(runtime_env={"pip": ["pip-install-test"]}).remote()
|
||||
assert ray.get(a.test.remote()) == "Hello"
|
||||
with pytest.raises(ray.exceptions.RayTaskError) as excinfo:
|
||||
ray.get(a.test_working_dir.remote())
|
||||
assert "FileNotFoundError" in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.skipif(_WIN32, reason="Fails on windows")
|
||||
|
|
|
@ -107,7 +107,7 @@ def test_environment_variables_multitenancy(shutdown_only):
|
|||
}
|
||||
).remote("foo2")
|
||||
)
|
||||
== "bar2"
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
|
@ -164,7 +164,7 @@ def test_environment_variables_complex(shutdown_only):
|
|||
|
||||
assert ray.get(a.get.remote("a")) == "b"
|
||||
assert ray.get(a.get_task.remote("a")) == "b"
|
||||
assert ray.get(a.nested_get.remote("a")) == "b"
|
||||
assert ray.get(a.nested_get.remote("a")) is None
|
||||
assert ray.get(a.nested_get.remote("c")) == "e"
|
||||
assert ray.get(a.nested_get.remote("d")) == "dd"
|
||||
assert (
|
||||
|
@ -180,9 +180,9 @@ def test_environment_variables_complex(shutdown_only):
|
|||
== "b"
|
||||
)
|
||||
|
||||
assert ray.get(a.get.remote("z")) == "job_z"
|
||||
assert ray.get(a.get_task.remote("z")) == "job_z"
|
||||
assert ray.get(a.nested_get.remote("z")) == "job_z"
|
||||
assert ray.get(a.get.remote("z")) is None
|
||||
assert ray.get(a.get_task.remote("z")) is None
|
||||
assert ray.get(a.nested_get.remote("z")) is None
|
||||
assert (
|
||||
ray.get(
|
||||
get_env.options(
|
||||
|
@ -193,7 +193,7 @@ def test_environment_variables_complex(shutdown_only):
|
|||
}
|
||||
).remote("z")
|
||||
)
|
||||
== "job_z"
|
||||
is None
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -236,7 +236,7 @@ class RayletServicer(ray_client_pb2_grpc.RayletDriverServicer):
|
|||
ctx.capture_client_tasks = (
|
||||
rtc.should_capture_child_tasks_in_placement_group
|
||||
)
|
||||
ctx.runtime_env = rtc.get_runtime_env_string()
|
||||
ctx.runtime_env = rtc._get_runtime_env_string()
|
||||
resp.runtime_context.CopyFrom(ctx)
|
||||
else:
|
||||
with disable_client_hook():
|
||||
|
|
|
@ -215,7 +215,7 @@ const std::string &WorkerContext::GetCurrentSerializedRuntimeEnv() const {
|
|||
return runtime_env_info_.serialized_runtime_env();
|
||||
}
|
||||
|
||||
std::shared_ptr<rpc::RuntimeEnv> WorkerContext::GetCurrentRuntimeEnv() const {
|
||||
std::shared_ptr<const rpc::RuntimeEnv> WorkerContext::GetCurrentRuntimeEnv() const {
|
||||
absl::ReaderMutexLock lock(&mutex_);
|
||||
return runtime_env_;
|
||||
}
|
||||
|
|
|
@ -44,7 +44,8 @@ class WorkerContext {
|
|||
|
||||
const std::string &GetCurrentSerializedRuntimeEnv() const LOCKS_EXCLUDED(mutex_);
|
||||
|
||||
std::shared_ptr<rpc::RuntimeEnv> GetCurrentRuntimeEnv() const LOCKS_EXCLUDED(mutex_);
|
||||
std::shared_ptr<const rpc::RuntimeEnv> GetCurrentRuntimeEnv() const
|
||||
LOCKS_EXCLUDED(mutex_);
|
||||
|
||||
// TODO(edoakes): remove this once Python core worker uses the task interfaces.
|
||||
void SetCurrentTaskId(const TaskID &task_id, uint64_t attempt_number);
|
||||
|
|
|
@ -1471,43 +1471,6 @@ std::unordered_map<std::string, double> AddPlacementGroupConstraint(
|
|||
return resources;
|
||||
}
|
||||
|
||||
rpc::RuntimeEnv CoreWorker::OverrideRuntimeEnv(
|
||||
const rpc::RuntimeEnv &child, const std::shared_ptr<rpc::RuntimeEnv> parent) {
|
||||
// By default, the child runtime env inherits non-specified options from the
|
||||
// parent. There is one exception to this:
|
||||
// - The env_vars dictionaries are merged, so environment variables
|
||||
// not specified by the child are still inherited from the parent.
|
||||
|
||||
// Override environment variables.
|
||||
google::protobuf::Map<std::string, std::string> result_env_vars(parent->env_vars());
|
||||
result_env_vars.insert(child.env_vars().begin(), child.env_vars().end());
|
||||
// Inherit all other non-specified options from the parent.
|
||||
rpc::RuntimeEnv result_runtime_env(*parent);
|
||||
// TODO(SongGuyang): avoid dupliacated fields.
|
||||
result_runtime_env.MergeFrom(child);
|
||||
if (child.python_runtime_env().py_modules().size() > 0 &&
|
||||
parent->python_runtime_env().py_modules().size() > 0) {
|
||||
result_runtime_env.mutable_python_runtime_env()->clear_py_modules();
|
||||
for (auto &module : child.python_runtime_env().py_modules()) {
|
||||
result_runtime_env.mutable_python_runtime_env()->add_py_modules(module);
|
||||
}
|
||||
result_runtime_env.mutable_uris()->clear_py_modules_uris();
|
||||
result_runtime_env.mutable_uris()->mutable_py_modules_uris()->CopyFrom(
|
||||
child.uris().py_modules_uris());
|
||||
}
|
||||
if (child.python_runtime_env().has_pip_runtime_env() &&
|
||||
parent->python_runtime_env().has_pip_runtime_env()) {
|
||||
result_runtime_env.mutable_python_runtime_env()->clear_pip_runtime_env();
|
||||
result_runtime_env.mutable_python_runtime_env()->mutable_pip_runtime_env()->CopyFrom(
|
||||
child.python_runtime_env().pip_runtime_env());
|
||||
}
|
||||
if (!result_env_vars.empty()) {
|
||||
result_runtime_env.mutable_env_vars()->insert(result_env_vars.begin(),
|
||||
result_env_vars.end());
|
||||
}
|
||||
return result_runtime_env;
|
||||
}
|
||||
|
||||
static std::vector<std::string> GetUrisFromRuntimeEnv(
|
||||
const rpc::RuntimeEnv *runtime_env) {
|
||||
std::vector<std::string> result;
|
||||
|
@ -1539,69 +1502,39 @@ std::shared_ptr<rpc::RuntimeEnvInfo> CoreWorker::OverrideTaskOrActorRuntimeEnvIn
|
|||
const std::string &serialized_runtime_env_info) {
|
||||
// TODO(Catch-Bull,SongGuyang): the task runtime env does not support the field
// eager_install yet; we will overwrite it once it does.
|
||||
std::shared_ptr<rpc::RuntimeEnv> parent = nullptr;
|
||||
std::shared_ptr<rpc::RuntimeEnvInfo> runtime_env_info = nullptr;
|
||||
runtime_env_info.reset(new rpc::RuntimeEnvInfo());
|
||||
|
||||
if (!IsRuntimeEnvInfoEmpty(serialized_runtime_env_info)) {
|
||||
RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info,
|
||||
runtime_env_info.get())
|
||||
.ok());
|
||||
}
|
||||
|
||||
auto runtime_env_info = std::make_shared<rpc::RuntimeEnvInfo>();
|
||||
std::shared_ptr<const rpc::RuntimeEnv> parent_runtime_env;
|
||||
std::string parent_serialized_runtime_env;
|
||||
if (options_.worker_type == WorkerType::DRIVER) {
|
||||
if (IsRuntimeEnvEmpty(runtime_env_info->serialized_runtime_env())) {
|
||||
runtime_env_info->set_serialized_runtime_env(
|
||||
job_config_->runtime_env_info().serialized_runtime_env());
|
||||
runtime_env_info->clear_uris();
|
||||
for (const std::string &uri : GetUrisFromRuntimeEnv(job_runtime_env_.get())) {
|
||||
runtime_env_info->add_uris(uri);
|
||||
}
|
||||
|
||||
return runtime_env_info;
|
||||
}
|
||||
parent = job_runtime_env_;
|
||||
parent_runtime_env = job_runtime_env_;
|
||||
parent_serialized_runtime_env =
|
||||
job_config_->runtime_env_info().serialized_runtime_env();
|
||||
} else {
|
||||
if (IsRuntimeEnvEmpty(runtime_env_info->serialized_runtime_env())) {
|
||||
runtime_env_info->set_serialized_runtime_env(
|
||||
worker_context_.GetCurrentSerializedRuntimeEnv());
|
||||
runtime_env_info->clear_uris();
|
||||
for (const std::string &uri :
|
||||
GetUrisFromRuntimeEnv(worker_context_.GetCurrentRuntimeEnv().get())) {
|
||||
runtime_env_info->add_uris(uri);
|
||||
}
|
||||
|
||||
return runtime_env_info;
|
||||
}
|
||||
parent = worker_context_.GetCurrentRuntimeEnv();
|
||||
parent_runtime_env = worker_context_.GetCurrentRuntimeEnv();
|
||||
parent_serialized_runtime_env = worker_context_.GetCurrentSerializedRuntimeEnv();
|
||||
}
|
||||
if (parent) {
|
||||
std::string serialized_runtime_env = runtime_env_info->serialized_runtime_env();
|
||||
rpc::RuntimeEnv child_runtime_env;
|
||||
if (!google::protobuf::util::JsonStringToMessage(serialized_runtime_env,
|
||||
&child_runtime_env)
|
||||
.ok()) {
|
||||
RAY_LOG(WARNING) << "Parse runtime env failed for " << serialized_runtime_env
|
||||
<< ". serialized runtime env info: "
|
||||
<< serialized_runtime_env_info;
|
||||
// TODO(SongGuyang): We pass the raw string here and the task will fail after an
|
||||
// exception raised in runtime env agent. Actually, we can fail the task here.
|
||||
return runtime_env_info;
|
||||
}
|
||||
auto override_runtime_env = OverrideRuntimeEnv(child_runtime_env, parent);
|
||||
std::string serialized_override_runtime_env;
|
||||
RAY_CHECK(google::protobuf::util::MessageToJsonString(
|
||||
override_runtime_env, &serialized_override_runtime_env)
|
||||
.ok());
|
||||
runtime_env_info->set_serialized_runtime_env(serialized_override_runtime_env);
|
||||
runtime_env_info->clear_uris();
|
||||
for (const std::string &uri : GetUrisFromRuntimeEnv(&override_runtime_env)) {
|
||||
if (IsRuntimeEnvInfoEmpty(serialized_runtime_env_info)) {
|
||||
// Inherit runtime env from job or worker.
|
||||
runtime_env_info->set_serialized_runtime_env(parent_serialized_runtime_env);
|
||||
for (const std::string &uri : GetUrisFromRuntimeEnv(parent_runtime_env.get())) {
|
||||
runtime_env_info->add_uris(uri);
|
||||
}
|
||||
return runtime_env_info;
|
||||
} else {
|
||||
return runtime_env_info;
|
||||
}
|
||||
|
||||
if (!IsRuntimeEnvEmpty(parent_serialized_runtime_env)) {
|
||||
// TODO(SongGuyang): We add this warning log because of the change of API behavior.
|
||||
// Refer to https://github.com/ray-project/ray/issues/21818.
|
||||
// Modify this log level to `INFO` or `DEBUG` after a few release versions.
|
||||
RAY_LOG(WARNING) << "Runtime env already exists and the parent runtime env is "
|
||||
<< parent_serialized_runtime_env << ". It will be overridden by "
|
||||
<< serialized_runtime_env_info << ".";
|
||||
}
|
||||
|
||||
RAY_CHECK(google::protobuf::util::JsonStringToMessage(serialized_runtime_env_info,
|
||||
runtime_env_info.get())
|
||||
.ok());
|
||||
return runtime_env_info;
|
||||
}
|
||||
|
||||
void CoreWorker::BuildCommonTaskSpec(
|
||||
|
|
|
@ -960,141 +960,6 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodesFailure) {
|
|||
resources.emplace("resource1", 1);
|
||||
TestActorFailure(resources);
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestOverrideEnvVars) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
// child {"a": "b"}, parent {}, expected {"a": "b"}
|
||||
(*child.mutable_env_vars())["a"] = "b";
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.env_vars().size(), 1);
|
||||
ASSERT_EQ(result.env_vars().count("a"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("a"), "b");
|
||||
child.clear_env_vars();
|
||||
parent->clear_env_vars();
|
||||
// child {}, parent {"a": "b"}, expected {"a": "b"}
|
||||
(*(parent->mutable_env_vars()))["a"] = "b";
|
||||
result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.env_vars().size(), 1);
|
||||
ASSERT_EQ(result.env_vars().count("a"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("a"), "b");
|
||||
child.clear_env_vars();
|
||||
parent->clear_env_vars();
|
||||
// child {"a": "b"}, parent {"a": "d"}, expected {"a": "b"}
|
||||
(*child.mutable_env_vars())["a"] = "b";
|
||||
(*(parent->mutable_env_vars()))["a"] = "d";
|
||||
result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.env_vars().size(), 1);
|
||||
ASSERT_EQ(result.env_vars().count("a"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("a"), "b");
|
||||
child.clear_env_vars();
|
||||
parent->clear_env_vars();
|
||||
// child {"a": "b"}, parent {"c": "d"}, expected {"a": "b", "c": "d"}
|
||||
(*child.mutable_env_vars())["a"] = "b";
|
||||
(*(parent->mutable_env_vars()))["c"] = "d";
|
||||
result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.env_vars().size(), 2);
|
||||
ASSERT_EQ(result.env_vars().count("a"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("a"), "b");
|
||||
ASSERT_EQ(result.env_vars().count("c"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("c"), "d");
|
||||
child.clear_env_vars();
|
||||
parent->clear_env_vars();
|
||||
// child {"a": "b"}, parent {"a": "e", "c": "d"}, expected {"a": "b", "c": "d"}
|
||||
(*child.mutable_env_vars())["a"] = "b";
|
||||
(*(parent->mutable_env_vars()))["a"] = "e";
|
||||
(*(parent->mutable_env_vars()))["c"] = "d";
|
||||
result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.env_vars().size(), 2);
|
||||
ASSERT_EQ(result.env_vars().count("a"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("a"), "b");
|
||||
ASSERT_EQ(result.env_vars().count("c"), 1);
|
||||
ASSERT_EQ(result.env_vars().at("c"), "d");
|
||||
child.clear_env_vars();
|
||||
parent->clear_env_vars();
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestPyModulesInherit) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
parent->mutable_python_runtime_env()->mutable_py_modules()->Add("s3://456");
|
||||
parent->mutable_uris()->mutable_py_modules_uris()->Add("s3://456");
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.python_runtime_env().py_modules().size(), 1);
|
||||
ASSERT_EQ(result.python_runtime_env().py_modules()[0], "s3://456");
|
||||
ASSERT_EQ(result.uris().py_modules_uris().size(), 1);
|
||||
ASSERT_EQ(result.uris().py_modules_uris()[0], "s3://456");
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestOverridePyModules) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
child.mutable_python_runtime_env()->mutable_py_modules()->Add("s3://123");
|
||||
child.mutable_uris()->mutable_py_modules_uris()->Add("s3://123");
|
||||
parent->mutable_python_runtime_env()->mutable_py_modules()->Add("s3://456");
|
||||
parent->mutable_python_runtime_env()->mutable_py_modules()->Add("s3://789");
|
||||
parent->mutable_uris()->mutable_py_modules_uris()->Add("s3://456");
|
||||
parent->mutable_uris()->mutable_py_modules_uris()->Add("s3://789");
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.python_runtime_env().py_modules().size(), 1);
|
||||
ASSERT_EQ(result.python_runtime_env().py_modules()[0], "s3://123");
|
||||
ASSERT_EQ(result.uris().py_modules_uris().size(), 1);
|
||||
ASSERT_EQ(result.uris().py_modules_uris()[0], "s3://123");
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestWorkingDirInherit) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
parent->set_working_dir("uri://abc");
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.working_dir(), "uri://abc");
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestWorkingDirOverride) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
child.set_working_dir("uri://abc");
|
||||
parent->set_working_dir("uri://def");
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.working_dir(), "uri://abc");
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestCondaInherit) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
child.mutable_uris()->set_working_dir_uri("gcs://abc");
|
||||
parent->mutable_uris()->set_working_dir_uri("gcs://def");
|
||||
parent->mutable_uris()->set_conda_uri("conda://456");
|
||||
parent->mutable_python_runtime_env()->mutable_conda_runtime_env()->set_conda_env_name(
|
||||
"my-env-name");
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.uris().working_dir_uri(), "gcs://abc");
|
||||
ASSERT_EQ(result.uris().conda_uri(), "conda://456");
|
||||
ASSERT_TRUE(result.python_runtime_env().has_conda_runtime_env());
|
||||
ASSERT_TRUE(result.python_runtime_env().conda_runtime_env().has_conda_env_name());
|
||||
ASSERT_EQ(result.python_runtime_env().conda_runtime_env().conda_env_name(),
|
||||
"my-env-name");
|
||||
}
|
||||
|
||||
TEST(TestOverrideRuntimeEnv, TestCondaOverride) {
|
||||
rpc::RuntimeEnv child;
|
||||
auto parent = std::make_shared<rpc::RuntimeEnv>();
|
||||
child.mutable_uris()->set_conda_uri("conda://123");
|
||||
child.mutable_python_runtime_env()->mutable_conda_runtime_env()->set_conda_env_name(
|
||||
"my-env-name-123");
|
||||
parent->mutable_uris()->set_conda_uri("conda://456");
|
||||
parent->mutable_python_runtime_env()->mutable_conda_runtime_env()->set_conda_env_name(
|
||||
"my-env-name-456");
|
||||
parent->mutable_uris()->set_working_dir_uri("gcs://def");
|
||||
auto result = CoreWorker::OverrideRuntimeEnv(child, parent);
|
||||
ASSERT_EQ(result.uris().conda_uri(), "conda://123");
|
||||
ASSERT_TRUE(result.python_runtime_env().has_conda_runtime_env());
|
||||
ASSERT_TRUE(result.python_runtime_env().conda_runtime_env().has_conda_env_name());
|
||||
ASSERT_EQ(result.python_runtime_env().conda_runtime_env().conda_env_name(),
|
||||
"my-env-name-123");
|
||||
ASSERT_EQ(result.uris().working_dir_uri(), "gcs://def");
|
||||
}
|
||||
|
||||
} // namespace core
|
||||
} // namespace ray
|
||||
|
||||
|
|