[runtime_env] Unify rpc::RuntimeEnv with serialized_runtime_env field (#18641)

This commit is contained in:
Edward Oakes 2021-09-28 15:13:15 -05:00 committed by GitHub
parent 4af07a8917
commit 73b8936aa8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 140 additions and 139 deletions

View file

@ -77,7 +77,7 @@ class APIHead(dashboard_utils.DashboardHeadModule):
"namespace": job_table_entry.config.ray_namespace,
"metadata": dict(job_table_entry.config.metadata),
"runtime_env": json.loads(
job_table_entry.config.serialized_runtime_env),
job_table_entry.config.runtime_env.serialized_runtime_env),
}
entry = {
"is_dead": job_table_entry.is_dead,

View file

@ -114,13 +114,14 @@ class RuntimeEnvDict:
"dict")
self._dict["pip"] = None
if "pip" in runtime_env_json:
pip = runtime_env_json.get("pip")
if pip is not None:
if sys.platform == "win32":
raise NotImplementedError("The 'pip' field in runtime_env "
"is not currently supported on "
"Windows.")
if ("conda" in runtime_env_json
and runtime_env_json["conda"] is not None):
conda = runtime_env_json.get("conda")
if runtime_env_json.get("conda") is not None:
raise ValueError(
"The 'pip' field and 'conda' field of "
"runtime_env cannot both be specified.\n"
@ -132,7 +133,6 @@ class RuntimeEnvDict:
"https://conda.io/projects/conda/en/latest/"
"user-guide/tasks/manage-environments.html"
"#create-env-file-manually")
pip = runtime_env_json["pip"]
if isinstance(pip, str):
# We have been given a path to a requirements.txt file.
pip_file = Path(pip)
@ -156,8 +156,8 @@ class RuntimeEnvDict:
self._dict["container"] = runtime_env_json["container"]
self._dict["env_vars"] = None
if "env_vars" in runtime_env_json:
env_vars = runtime_env_json["env_vars"]
env_vars = runtime_env_json.get("env_vars")
if env_vars is not None:
self._dict["env_vars"] = env_vars
if not (isinstance(env_vars, dict) and all(
isinstance(k, str) and isinstance(v, str)

View file

@ -1354,6 +1354,7 @@ cdef class CoreWorker:
c_bool placement_group_capture_child_tasks,
c_string debugger_breakpoint,
runtime_env_dict,
runtime_env_uris,
override_environment_variables
):
cdef:
@ -1363,6 +1364,7 @@ cdef class CoreWorker:
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
c_string c_serialized_runtime_env
c_vector[c_string] c_runtime_env_uris = runtime_env_uris
unordered_map[c_string, c_string] \
c_override_environment_variables = \
override_environment_variables
@ -1384,6 +1386,7 @@ cdef class CoreWorker:
name, num_returns, c_resources,
b"",
c_serialized_runtime_env,
c_runtime_env_uris,
c_override_environment_variables),
max_retries, retry_exceptions,
c_pair[CPlacementGroupID, int64_t](
@ -1411,6 +1414,7 @@ cdef class CoreWorker:
c_bool placement_group_capture_child_tasks,
c_string extension_data,
runtime_env_dict,
runtime_env_uris,
override_environment_variables
):
cdef:
@ -1423,6 +1427,7 @@ cdef class CoreWorker:
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
c_string c_serialized_runtime_env
c_vector[c_string] c_runtime_env_uris = runtime_env_uris
unordered_map[c_string, c_string] \
c_override_environment_variables = \
override_environment_variables
@ -1450,6 +1455,7 @@ cdef class CoreWorker:
placement_group_bundle_index),
placement_group_capture_child_tasks,
c_serialized_runtime_env,
c_runtime_env_uris,
c_override_environment_variables),
extension_data,
&c_actor_id))
@ -1865,8 +1871,8 @@ cdef class CoreWorker:
# This should never change, so we can safely cache it to avoid ser/de
if self.current_runtime_env_dict is None:
if self.is_driver:
self.current_runtime_env_dict = \
json.loads(self.get_job_config().serialized_runtime_env)
self.current_runtime_env_dict = json.loads(
self.get_job_config().runtime_env.serialized_runtime_env)
else:
self.current_runtime_env_dict = json.loads(
CCoreWorkerProcess.GetCoreWorker()

View file

@ -755,6 +755,7 @@ class ActorClass:
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu),
runtime_env_dict=runtime_env_dict,
runtime_env_uris=runtime_env_dict.get("uris") or [],
override_environment_variables=override_environment_variables
or dict())

View file

@ -260,6 +260,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
unordered_map[c_string, double] &resources,
c_string concurrency_group_name,
c_string serialized_runtime_env,
c_vector[c_string] runtime_env_uris,
const unordered_map[c_string, c_string]
&override_environment_variables)
@ -277,6 +278,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
c_pair[CPlacementGroupID, int64_t] placement_options,
c_bool placement_group_capture_child_tasks,
c_string serialized_runtime_env,
c_vector[c_string] runtime_env_uris,
const unordered_map[c_string, c_string]
&override_environment_variables)

View file

@ -1,9 +1,7 @@
from typing import Any, Dict, Optional
import uuid
import json
import ray._private.gcs_utils as gcs_utils
from ray.core.generated.common_pb2 import RuntimeEnv as RuntimeEnvPB
class JobConfig:
@ -54,8 +52,7 @@ class JobConfig:
def serialize(self):
"""Serialize the struct into protobuf string"""
job_config = self.get_proto_job_config()
return job_config.SerializeToString()
return self.get_proto_job_config().SerializeToString()
def set_runtime_env(self, runtime_env: Optional[Dict[str, Any]]) -> None:
# Lazily import this to avoid circular dependencies.
@ -90,9 +87,9 @@ class JobConfig:
self.num_java_workers_per_process)
self._cached_pb.jvm_options.extend(self.jvm_options)
self._cached_pb.code_search_path.extend(self.code_search_path)
self._cached_pb.runtime_env.CopyFrom(self._get_proto_runtime())
self._cached_pb.serialized_runtime_env = \
self.get_serialized_runtime_env()
self._cached_pb.runtime_env.uris[:] = self.get_runtime_env_uris()
serialized_env = self.get_serialized_runtime_env()
self._cached_pb.runtime_env.serialized_runtime_env = serialized_env
for k, v in self.metadata.items():
self._cached_pb.metadata[k] = v
return self._cached_pb
@ -103,16 +100,10 @@ class JobConfig:
return self.runtime_env.get("uris")
return []
def set_runtime_env_uris(self, uris):
self.runtime_env["uris"] = uris
self._parsed_runtime_env.set_uris(uris)
def get_serialized_runtime_env(self) -> str:
"""Return the JSON-serialized parsed runtime env dict"""
return self._parsed_runtime_env.serialize()
def _get_proto_runtime(self) -> RuntimeEnvPB:
runtime_env = RuntimeEnvPB()
runtime_env.uris[:] = self.get_runtime_env_uris()
runtime_env.raw_json = json.dumps(self.runtime_env)
return runtime_env
def set_runtime_env_uris(self, uris):
self.runtime_env["uris"] = uris
self._parsed_runtime_env.set_uris(uris)

View file

@ -328,6 +328,7 @@ class RemoteFunction:
placement_group_capture_child_tasks,
worker.debugger_breakpoint,
runtime_env_dict,
runtime_env_dict.get("uris") or [],
override_environment_variables=override_environment_variables
or dict())
# Reset worker's debug context from the last "remote" command

View file

@ -152,7 +152,7 @@ class RuntimeContext(object):
@property
def runtime_env(self):
"""Get the runtime env passed to job_config
"""Get the runtime env used for the current driver or worker.
Returns:
The runtime env currently using by this worker.

View file

@ -774,9 +774,8 @@ def test_container_option_serialize():
job_config = ray.job_config.JobConfig(runtime_env=runtime_env)
job_config_serialized = job_config.serialize()
# job_config_serialized is JobConfig protobuf serialized string,
# job_config.runtime_env.raw_json has container_option info
# job_config.serialized_runtime_env also has container_option info
assert job_config_serialized.count(b"image") == 2
# job_config.runtime_env.serialized_runtime_env has container_option info
assert job_config_serialized.count(b"image") == 1
def test_working_dir_override_failure(shutdown_only):

View file

@ -5,7 +5,6 @@ from ray._private.client_mode_hook import (_explicitly_disable_client_mode,
import os
import sys
import logging
import json
import threading
import grpc
@ -66,7 +65,7 @@ class _ClientContext:
job_config = job_config or JobConfig()
job_config.set_ray_namespace(namespace)
if job_config is not None:
runtime_env = json.loads(job_config.get_serialized_runtime_env())
runtime_env = job_config.runtime_env
if runtime_env.get("pip") or runtime_env.get("conda"):
logger.warning("The 'pip' or 'conda' field was specified in "
"the runtime env, so it may take some time to "

View file

@ -263,7 +263,7 @@ class ProxyManager():
f"ray_client_server_{specific_server.port}", unique=True)
serialized_runtime_env = job_config.get_serialized_runtime_env()
if serialized_runtime_env == "{}":
if not serialized_runtime_env or serialized_runtime_env == "{}":
serialized_runtime_env_context = RuntimeEnvContext().serialize()
else:
serialized_runtime_env_context = self._create_runtime_env(

View file

@ -191,8 +191,8 @@ class Worker:
@property
def runtime_env(self):
"""Get the runtime env in json format"""
return json.loads(
self.core_worker.get_job_config().runtime_env.raw_json)
return json.loads(self.core_worker.get_job_config()
.runtime_env.serialized_runtime_env)
def get_serialization_context(self, job_id=None):
"""Get the SerializationContext of the job that this worker is processing.

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "ray/common/runtime_env_manager.h"
#include "ray/util/logging.h"
namespace ray {
@ -20,19 +21,14 @@ void RuntimeEnvManager::AddURIReference(const std::string &hex_id,
const rpc::RuntimeEnv &runtime_env) {
const auto &uris = runtime_env.uris();
for (const auto &uri : uris) {
AddURIReference(hex_id, uri);
if (unused_uris_.count(uri)) {
unused_uris_.erase(uri);
}
uri_reference_[uri]++;
id_to_uris_[hex_id].push_back(uri);
}
}
void RuntimeEnvManager::AddURIReference(const std::string &hex_id,
const std::string &uri) {
if (unused_uris_.count(uri)) {
unused_uris_.erase(uri);
}
uri_reference_[uri]++;
id_to_uris_[hex_id].push_back(uri);
}
const std::vector<std::string> &RuntimeEnvManager::GetReferences(
const std::string &hex_id) const {
static const std::vector<std::string> _default;

View file

@ -13,6 +13,7 @@
// limitations under the License.
#pragma once
#include <functional>
#include "ray/common/id.h"
#include "src/ray/protobuf/common.pb.h"
@ -37,12 +38,6 @@ class RuntimeEnvManager {
/// \param[in] runtime_env The runtime env used by the id.
void AddURIReference(const std::string &hex_id, const rpc::RuntimeEnv &runtime_env);
/// Increase the reference of URI by URI and runtime_env.
///
/// \param[in] hex_id The id of the runtime env. It can be an actor or job id.
/// \param[in] uri The URI referenced by the id.
void AddURIReference(const std::string &hex_id, const std::string &uri);
/// Get the reference of URIs by id.
///
/// \param[in] hex_id The id of to look.

View file

@ -132,8 +132,10 @@ ray::FunctionDescriptor TaskSpecification::FunctionDescriptor() const {
return ray::FunctionDescriptorBuilder::FromProto(message_->function_descriptor());
}
rpc::RuntimeEnv TaskSpecification::RuntimeEnv() const { return message_->runtime_env(); }
std::string TaskSpecification::SerializedRuntimeEnv() const {
return message_->serialized_runtime_env();
return message_->runtime_env().serialized_runtime_env();
}
bool TaskSpecification::HasRuntimeEnv() const {

View file

@ -100,6 +100,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
ray::FunctionDescriptor FunctionDescriptor() const;
[[nodiscard]] rpc::RuntimeEnv RuntimeEnv() const;
std::string SerializedRuntimeEnv() const;
bool HasRuntimeEnv() const;

View file

@ -106,6 +106,7 @@ class TaskSpecBuilder {
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint,
const std::string &serialized_runtime_env = "{}",
const std::vector<std::string> &runtime_env_uris = {},
const std::unordered_map<std::string, std::string> &override_environment_variables =
{},
const std::string &concurrency_group_name = "") {
@ -129,7 +130,10 @@ class TaskSpecBuilder {
message_->set_placement_group_capture_child_tasks(
placement_group_capture_child_tasks);
message_->set_debugger_breakpoint(debugger_breakpoint);
message_->set_serialized_runtime_env(serialized_runtime_env);
message_->mutable_runtime_env()->set_serialized_runtime_env(serialized_runtime_env);
for (const std::string &uri : runtime_env_uris) {
message_->mutable_runtime_env()->add_uris(uri);
}
message_->set_concurrency_group_name(concurrency_group_name);
for (const auto &env : override_environment_variables) {
(*message_->mutable_override_environment_variables())[env.first] = env.second;

View file

@ -60,6 +60,7 @@ struct TaskOptions {
std::unordered_map<std::string, double> &resources,
const std::string &concurrency_group_name = "",
const std::string &serialized_runtime_env = "{}",
const std::vector<std::string> &runtime_env_uris = {},
const std::unordered_map<std::string, std::string>
&override_environment_variables = {})
: name(name),
@ -67,6 +68,7 @@ struct TaskOptions {
resources(resources),
concurrency_group_name(concurrency_group_name),
serialized_runtime_env(serialized_runtime_env),
runtime_env_uris(runtime_env_uris),
override_environment_variables(override_environment_variables) {}
/// The name of this task.
@ -77,8 +79,10 @@ struct TaskOptions {
std::unordered_map<std::string, double> resources;
/// The name of the concurrency group in which this task will be executed.
std::string concurrency_group_name;
// Runtime Env used by this task. Propagated to child actors and tasks.
// Runtime Env used by this task. Propagated to child actors and tasks.
std::string serialized_runtime_env;
// URIs contained in the runtime_env.
std::vector<std::string> runtime_env_uris;
/// Environment variables to update for this task. Maps a variable name to its
/// value. Can override existing environment variables and introduce new ones.
/// Propagated to child actors and/or tasks.
@ -97,6 +101,7 @@ struct ActorCreationOptions {
BundleID placement_options = std::make_pair(PlacementGroupID::Nil(), -1),
bool placement_group_capture_child_tasks = true,
const std::string &serialized_runtime_env = "{}",
const std::vector<std::string> &runtime_env_uris = {},
const std::unordered_map<std::string, std::string> &override_environment_variables =
{},
const std::vector<ConcurrencyGroup> &concurrency_groups = {})
@ -113,6 +118,7 @@ struct ActorCreationOptions {
placement_options(placement_options),
placement_group_capture_child_tasks(placement_group_capture_child_tasks),
serialized_runtime_env(serialized_runtime_env),
runtime_env_uris(runtime_env_uris),
override_environment_variables(override_environment_variables),
concurrency_groups(concurrency_groups.begin(), concurrency_groups.end()){};
@ -155,6 +161,8 @@ struct ActorCreationOptions {
bool placement_group_capture_child_tasks = true;
// Runtime Env used by this actor. Propagated to child actors and tasks.
std::string serialized_runtime_env;
// URIs contained in the runtime_env.
std::vector<std::string> runtime_env_uris;
/// Environment variables to update for this actor. Maps a variable name to its
/// value. Can override existing environment variables and introduce new ones.
/// Propagated to child actors and/or tasks.

View file

@ -168,7 +168,7 @@ bool WorkerContext::ShouldCaptureChildTasksInPlacementGroup() const {
}
const std::string &WorkerContext::GetCurrentSerializedRuntimeEnv() const {
return serialized_runtime_env_;
return runtime_env_.serialized_runtime_env();
}
const std::unordered_map<std::string, std::string>
@ -186,9 +186,9 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
if (task_spec.IsNormalTask()) {
current_task_is_direct_call_ = true;
// TODO(architkulkarni): Once workers are cached by runtime env, we should
// only set serialized_runtime_env_ once and then RAY_CHECK that we
// only set runtime_env_ once and then RAY_CHECK that we
// never see a new one.
serialized_runtime_env_ = task_spec.SerializedRuntimeEnv();
runtime_env_ = task_spec.RuntimeEnv();
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
} else if (task_spec.IsActorCreationTask()) {
RAY_CHECK(current_actor_id_.IsNil());
@ -199,7 +199,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
is_detached_actor_ = task_spec.IsDetachedActor();
current_actor_placement_group_id_ = task_spec.PlacementGroupBundleId().first;
placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks();
serialized_runtime_env_ = task_spec.SerializedRuntimeEnv();
runtime_env_ = task_spec.RuntimeEnv();
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(current_actor_id_ == task_spec.ActorId());

View file

@ -98,8 +98,8 @@ class WorkerContext {
PlacementGroupID current_actor_placement_group_id_;
// Whether or not we should implicitly capture parent's placement group.
bool placement_group_capture_child_tasks_;
// The JSON-serialized runtime env for the current actor or task.
std::string serialized_runtime_env_ = "{}";
// The runtime env for the current actor or task.
rpc::RuntimeEnv runtime_env_;
// The environment variable overrides for the current actor or task.
std::unordered_map<std::string, std::string> override_environment_variables_;
/// The id of the (main) thread that constructed this worker context.

View file

@ -40,6 +40,7 @@ void BuildCommonTaskSpec(
const std::unordered_map<std::string, double> &required_placement_resources,
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string debugger_breakpoint, const std::string &serialized_runtime_env,
const std::vector<std::string> &runtime_env_uris,
const std::unordered_map<std::string, std::string> &override_environment_variables,
const std::string &concurrency_group_name = "") {
// Build common task spec.
@ -47,8 +48,8 @@ void BuildCommonTaskSpec(
task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
current_task_id, task_index, caller_id, address, num_returns, required_resources,
required_placement_resources, bundle_id, placement_group_capture_child_tasks,
debugger_breakpoint, serialized_runtime_env, override_environment_variables,
concurrency_group_name);
debugger_breakpoint, serialized_runtime_env, runtime_env_uris,
override_environment_variables, concurrency_group_name);
// Set task arguments.
for (const auto &arg : args) {
builder.AddArg(*arg);
@ -1669,12 +1670,13 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
override_environment_variables.insert(current_override_environment_variables.begin(),
current_override_environment_variables.end());
// TODO(ekl) offload task building onto a thread pool for performance
BuildCommonTaskSpec(
builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_,
function, args, task_options.num_returns, constrained_resources, required_resources,
placement_options, placement_group_capture_child_tasks, debugger_breakpoint,
task_options.serialized_runtime_env, override_environment_variables);
BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, task_options.num_returns,
constrained_resources, required_resources, placement_options,
placement_group_capture_child_tasks, debugger_breakpoint,
task_options.serialized_runtime_env, task_options.runtime_env_uris,
override_environment_variables);
builder.SetNormalTaskSpec(max_retries, retry_exceptions);
TaskSpecification task_spec = builder.Build();
RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString();
@ -1736,6 +1738,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.placement_group_capture_child_tasks,
"", /* debugger_breakpoint */
actor_creation_options.serialized_runtime_env,
actor_creation_options.runtime_env_uris,
override_environment_variables);
auto actor_handle = std::make_unique<ActorHandle>(
@ -1921,6 +1924,7 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitActorTask(
true, /* placement_group_capture_child_tasks */
"", /* debugger_breakpoint */
"{}", /* serialized_runtime_env */
{}, /* runtime_env_uris */
override_environment_variables,
task_options.concurrency_group_name);
// NOTE: placement_group_capture_child_tasks and runtime_env will

View file

@ -235,9 +235,10 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
ray_namespace,
/*is_asyncio=*/false,
placement_options,
true,
"{}",
{},
/*placement_group_capture_child_tasks=*/true,
/*serialized_runtime_env=*/"{}",
/*runtime_env_uris=*/{},
/*override_environment_variables=*/{},
concurrency_groups};
return actor_creation_options;
}

View file

@ -385,13 +385,9 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
// owner to determine when the actor should be removed.
PollOwnerForActorOutOfScope(actor);
} else {
// If it's a detached actor, we need to register the runtime env it used to GC
auto job_id = JobID::FromBinary(request.task_spec().job_id());
const auto &uris = runtime_env_manager_.GetReferences(job_id.Hex());
auto actor_id_hex = actor->GetActorID().Hex();
for (const auto &uri : uris) {
runtime_env_manager_.AddURIReference(actor_id_hex, uri);
}
// If it's a detached actor, we need to register the runtime env it used to GC.
runtime_env_manager_.AddURIReference(actor->GetActorID().Hex(),
request.task_spec().runtime_env());
}
// The backend storage is supposed to be reliable, so the status must be ok.

View file

@ -86,7 +86,8 @@ class GcsActor {
break;
}
actor_table_data_.set_serialized_runtime_env(task_spec.serialized_runtime_env());
actor_table_data_.set_serialized_runtime_env(
task_spec.runtime_env().serialized_runtime_env());
}
/// Get the node id on which this actor is created.

View file

@ -156,9 +156,9 @@ message RayException {
/// The runtime environment describes all the runtime packages needed to
/// run some task or actor.
message RuntimeEnv {
/// The raw json passed from user
string raw_json = 1;
/// Uris used in this runtime env
/// The serialized runtime env passed from the user.
string serialized_runtime_env = 1;
/// URIs used in this runtime env. These will be used for reference counting.
repeated string uris = 2;
}
@ -219,8 +219,8 @@ message TaskSpec {
// Breakpoint if this task should drop into the debugger when it starts executing
// and "" if the task should not drop into the debugger.
bytes debugger_breakpoint = 23;
// Serialized JSON string of the parsed runtime environment dict for this task.
string serialized_runtime_env = 24;
// Runtime environment for this task.
RuntimeEnv runtime_env = 24;
// The concurrency group name in which this task will be performed.
string concurrency_group_name = 25;
// Whether application-level errors (exceptions) should be retried.

View file

@ -150,19 +150,17 @@ message ActorTableData {
RayException creation_task_exception = 18;
// The actor's namespace. Named `ray_namespace` to avoid confusions when invoked in c++.
string ray_namespace = 19;
// Runtime required to run this actor
// It'll only be set if it's a detached actor and the original job has this field
RuntimeEnv runtime_env = 20;
// The unix ms timestamp the actor was started at.
uint64 start_time = 21;
uint64 start_time = 20;
// The unix ms timestamp the actor was ended at.
uint64 end_time = 22;
uint64 end_time = 21;
// Serialized runtime_env used to report in the dashboard snapshot. We need to populate
// it here instead of grabbing it from the task spec because the task spec is cleared
// for deleted actors: https://github.com/ray-project/ray/pull/11149.
string serialized_runtime_env = 22;
// The actor's class name. This is necessary because the task spec's lifetime
// is shorter than the ActorTableData.
string class_name = 23;
// The actor's serialized runtime environment. This is necessary because the
// task spec's lifetime is shorter than the ActorTableData.
string serialized_runtime_env = 24;
}
message ErrorTableData {
@ -293,10 +291,8 @@ message JobConfig {
RuntimeEnv runtime_env = 5;
// The job's namespace. Named `ray_namespace` to avoid confusions when invoked in c++.
string ray_namespace = 6;
// Serialized JSON string of the parsed runtime environment dict for this job.
string serialized_runtime_env = 7;
// An opaque kv store for job related metadata.
map<string, string> metadata = 8;
map<string, string> metadata = 7;
}
message JobTableData {

View file

@ -1868,7 +1868,8 @@ void NodeManager::FinishAssignedActorCreationTask(WorkerInterface &worker,
auto job_id = task.GetTaskSpecification().JobId();
auto job_config = worker_pool_.GetJobConfig(job_id);
RAY_CHECK(job_config);
runtime_env_manager_.AddURIReference(actor_id.Hex(), job_config->runtime_env());
runtime_env_manager_.AddURIReference(actor_id.Hex(),
task.GetTaskSpecification().RuntimeEnv());
}
}

View file

@ -48,7 +48,6 @@ namespace ray {
namespace raylet {
using rpc::ActorTableData;
using rpc::ErrorType;
using rpc::GcsNodeInfo;
using rpc::HeartbeatTableData;
@ -273,13 +272,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// returned to idle.
bool FinishAssignedTask(const std::shared_ptr<WorkerInterface> &worker_ptr);
/// Helper function to produce actor table data for a newly created actor.
///
/// \param task_spec RayTask specification of the actor creation task that created the
/// actor.
/// \param worker The port that the actor is listening on.
std::shared_ptr<ActorTableData> CreateActorTableDataFromCreationTask(
const TaskSpecification &task_spec, int port, const WorkerID &worker_id);
/// Handle a worker finishing an assigned actor creation task.
/// \param worker The worker that finished the task.
/// \param task The actor task or actor creation task.

View file

@ -116,16 +116,18 @@ std::shared_ptr<ClusterResourceScheduler> CreateSingleNodeScheduler(
RayTask CreateTask(const std::unordered_map<std::string, double> &required_resources,
int num_args = 0, std::vector<ObjectID> args = {},
std::string serialized_runtime_env = "{}") {
const std::string &serialized_runtime_env = "{}",
const std::vector<std::string> &runtime_env_uris = {}) {
TaskSpecBuilder spec_builder;
TaskID id = RandomTaskId();
JobID job_id = RandomJobId();
rpc::Address address;
spec_builder.SetCommonTaskSpec(
id, "dummy_task", Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""), job_id, TaskID::Nil(), 0,
TaskID::Nil(), address, 0, required_resources, {},
std::make_pair(PlacementGroupID::Nil(), -1), true, "", serialized_runtime_env);
spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("", "", "", ""),
job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0,
required_resources, {},
std::make_pair(PlacementGroupID::Nil(), -1), true, "",
serialized_runtime_env, runtime_env_uris);
if (!args.empty()) {
for (auto &arg : args) {
@ -184,32 +186,33 @@ class ClusterTaskManagerTest : public ::testing::Test {
node_info_calls_(0),
announce_infeasible_task_calls_(0),
dependency_manager_(missing_objects_),
task_manager_(id_, scheduler_, dependency_manager_,
/* is_owner_alive= */
[this](const WorkerID &worker_id, const NodeID &node_id) {
return is_owner_alive_;
},
/* get_node_info= */
[this](const NodeID &node_id) {
node_info_calls_++;
return node_info_[node_id];
},
/* announce_infeasible_task= */
[this](const RayTask &task) { announce_infeasible_task_calls_++; },
pool_, leased_workers_,
/* get_task_arguments= */
[this](const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
for (auto &obj_id : object_ids) {
if (missing_objects_.count(obj_id) == 0) {
results->emplace_back(MakeDummyArg());
} else {
results->emplace_back(nullptr);
}
}
return true;
},
/*max_pinned_task_arguments_bytes=*/1000) {}
task_manager_(
id_, scheduler_, dependency_manager_,
/* is_owner_alive= */
[this](const WorkerID &worker_id, const NodeID &node_id) {
return is_owner_alive_;
},
/* get_node_info= */
[this](const NodeID &node_id) {
node_info_calls_++;
return node_info_[node_id];
},
/* announce_infeasible_task= */
[this](const RayTask &task) { announce_infeasible_task_calls_++; }, pool_,
leased_workers_,
/* get_task_arguments= */
[this](const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
for (auto &obj_id : object_ids) {
if (missing_objects_.count(obj_id) == 0) {
results->emplace_back(MakeDummyArg());
} else {
results->emplace_back(nullptr);
}
}
return true;
},
/*max_pinned_task_arguments_bytes=*/1000) {}
RayObject *MakeDummyArg() {
std::vector<uint8_t> data;

View file

@ -103,9 +103,10 @@ class WorkerPoolMock : public WorkerPool {
const WorkerCommandMap &worker_commands,
absl::flat_hash_map<WorkerID, std::shared_ptr<MockWorkerClient>>
&mock_worker_rpc_clients)
: WorkerPool(io_service, NodeID::FromRandom(), "", POOL_SIZE_SOFT_LIMIT, 0,
MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands,
[]() {}, 0, [this]() { return current_time_ms_; }),
: WorkerPool(
io_service, NodeID::FromRandom(), "", POOL_SIZE_SOFT_LIMIT, 0,
MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands, []() {}, 0,
[this]() { return current_time_ms_; }),
last_worker_process_(),
instrumented_io_service_(io_service),
error_message_type_(1),
@ -458,7 +459,7 @@ static inline TaskSpecification ExampleTaskSpec(
} else {
message.set_type(TaskType::NORMAL_TASK);
}
message.set_serialized_runtime_env(serialized_runtime_env);
message.mutable_runtime_env()->set_serialized_runtime_env(serialized_runtime_env);
return TaskSpecification(std::move(message));
}