[Core] Cache workers by runtime_env in worker pool (#15782)

* pass RuntimeEnv in task spec as opaque string

* lint

* set correct empty value for json: "{}" not ""

* add comment for field in proto

* fix worker pool test by checking both "" and "{}"

* add RAY_CHECK todo

* make dict empty if all values null

* remove unnecessary ser/de

* fix

* address comments

* add WorkerCacheKey with hash function

* clean up

* add naive impl., dedicated workers never killed

* put dedicated workers in idle_of_all_languages

* pipe env hash from worker.py -> Worker

* fully pipe through hash, basic cache test passing

* use int type for runtime env hash

* convert Worker env hash type from size_t to int

* fix

* add method to MockWorker to fix cpp tests

* make compatible with java streaming test

* restore old dynamic_options code to fix java test

* address comments

* add comment about sorting before hash

* add comments for private members of WorkerCacheKey
This commit is contained in:
architkulkarni 2021-05-18 00:19:27 -07:00 committed by GitHub
parent 863532af0a
commit 194c5e3a96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 270 additions and 73 deletions

View file

@ -859,7 +859,7 @@ cdef class CoreWorker:
JobID job_id, GcsClientOptions gcs_options, log_dir,
node_ip_address, node_manager_port, raylet_ip_address,
local_mode, driver_name, stdout_file, stderr_file,
serialized_job_config, metrics_agent_port):
serialized_job_config, metrics_agent_port, runtime_env_hash):
self.is_local_mode = local_mode
cdef CCoreWorkerOptions options = CCoreWorkerOptions()
@ -913,6 +913,7 @@ cdef class CoreWorker:
options.serialized_job_config = serialized_job_config
options.metrics_agent_port = metrics_agent_port
options.connect_on_start = False
options.runtime_env_hash = runtime_env_hash
CCoreWorkerProcess.Initialize(options)
def __dealloc__(self):

View file

@ -286,6 +286,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_string serialized_job_config
int metrics_agent_port
c_bool connect_on_start
int runtime_env_hash
cdef cppclass CCoreWorkerProcess "ray::CoreWorkerProcess":
@staticmethod

View file

@ -886,6 +886,88 @@ def test_override_environment_variables_reuse(shutdown_only):
assert ray.get(f.remote()) is None
def test_override_environment_variables_env_caching(shutdown_only):
"""Test that workers with specified envs are cached and reused.
When a new task or actor is created with a new runtime env, a
new worker process is started. If a subsequent task or actor
uses the same runtime env, the same worker process should be
used. This function checks the pid of the worker to test this.
"""
ray.init()
env_var_name = "TEST123"
val1 = "VAL1"
val2 = "VAL2"
assert os.environ.get(env_var_name) is None
def task():
return os.environ.get(env_var_name), os.getpid()
@ray.remote
def f():
return task()
@ray.remote
def g():
return task()
# Empty runtime env does not set our env var.
assert ray.get(f.remote())[0] is None
# Worker pid1 should have an env var set.
ret_val1, pid1 = ray.get(
f.options(override_environment_variables={
env_var_name: val1
}).remote())
assert ret_val1 == val1
# Worker pid2 should have an env var set to something different.
ret_val2, pid2 = ray.get(
g.options(override_environment_variables={
env_var_name: val2
}).remote())
assert ret_val2 == val2
# Because the runtime env is different, it should use a different process.
assert pid1 != pid2
# Call g with an empty runtime env. It shouldn't reuse pid2, because
# pid2 has an env var set.
_, pid3 = ray.get(g.remote())
assert pid2 != pid3
# Call g with the same runtime env as pid2. Check it uses the same process.
_, pid4 = ray.get(
g.options(override_environment_variables={
env_var_name: val2
}).remote())
assert pid4 == pid2
# Call f with a different runtime env from pid1. Check that it uses a new
# process.
_, pid5 = ray.get(
f.options(override_environment_variables={
env_var_name: val2
}).remote())
assert pid5 != pid1
# Call f with the same runtime env as pid1. Check it uses the same
# process.
_, pid6 = ray.get(
f.options(override_environment_variables={
env_var_name: val1
}).remote())
assert pid6 == pid1
# Same as above but with g instead of f. Shouldn't affect the outcome.
_, pid7 = ray.get(
g.options(override_environment_variables={
env_var_name: val1
}).remote())
assert pid7 == pid1
def test_sync_job_config(shutdown_only):
num_java_workers_per_process = 8
worker_env = {

View file

@ -1126,7 +1126,8 @@ def connect(node,
driver_object_store_memory=None,
job_id=None,
namespace=None,
job_config=None):
job_config=None,
runtime_env_hash=0):
"""Connect this worker to the raylet, to Plasma, and to Redis.
Args:
@ -1140,6 +1141,7 @@ def connect(node,
use in the object store when creating objects.
job_id: The ID of job. If it's None, then we will generate one.
job_config (ray.job_config.JobConfig): The job configuration.
runtime_env_hash (int): The hash of the runtime env for this worker.
"""
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
@ -1246,7 +1248,7 @@ def connect(node,
gcs_options, node.get_logs_dir_path(), node.node_ip_address,
node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE),
driver_name, log_stdout_file_path, log_stderr_file_path,
serialized_job_config, node.metrics_agent_port)
serialized_job_config, node.metrics_agent_port, runtime_env_hash)
worker.gcs_client = worker.core_worker.get_gcs_client()
# Create an object for interfacing with the global state.

View file

@ -112,6 +112,12 @@ parser.add_argument(
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
help="Specify the backup count of rotated log file, default is "
f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.")
parser.add_argument(
"--runtime-env-hash",
required=False,
type=int,
default=0,
help="The computed hash of the runtime env for this worker.")
if __name__ == "__main__":
# NOTE(sang): For some reason, if we move the code below
# to a separate function, tensorflow will capture that method
@ -168,7 +174,7 @@ if __name__ == "__main__":
spawn_reaper=False,
connect_only=True)
ray.worker._global_node = node
ray.worker.connect(node, mode=mode)
ray.worker.connect(node, mode=mode, runtime_env_hash=args.runtime_env_hash)
# Add code search path to sys.path, set load_code_from_local.
core_worker = ray.worker.global_worker.core_worker

View file

@ -386,9 +386,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::string serialized_job_config = options_.serialized_job_config;
local_raylet_client_ = std::make_shared<raylet::RayletClient>(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
options_.node_ip_address, &raylet_client_status, &local_raylet_id, &assigned_port,
&serialized_job_config);
options_.worker_type, worker_context_.GetCurrentJobID(), options_.runtime_env_hash,
options_.language, options_.node_ip_address, &raylet_client_status,
&local_raylet_id, &assigned_port, &serialized_job_config);
if (!raylet_client_status.ok()) {
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.

View file

@ -98,7 +98,8 @@ struct CoreWorkerOptions {
terminate_asyncio_thread(nullptr),
serialized_job_config(""),
metrics_agent_port(-1),
connect_on_start(true) {}
connect_on_start(true),
runtime_env_hash(0) {}
/// Type of this worker (i.e., DRIVER or WORKER).
WorkerType worker_type;
@ -182,6 +183,8 @@ struct CoreWorkerOptions {
/// ready. It should be explicitly startd by a caller using CoreWorker::Start.
/// TODO(sang): Use this method for Java and cpp frontend too.
bool connect_on_start;
/// The hash of the runtime env for this worker.
int runtime_env_hash;
};
/// Lifecycle management of one or more `CoreWorker` instances in a process.

View file

@ -130,6 +130,8 @@ table RegisterClientRequest {
worker_pid: long;
// The job ID if the client is a driver, otherwise it should be NIL.
job_id: string;
// The hash of the runtime env for this worker.
runtime_env_hash: int;
// Language of this worker.
// TODO(hchen): Use `Language` in `common.proto`.
language: int;

View file

@ -1011,6 +1011,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
Language language = static_cast<Language>(message->language());
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
const int runtime_env_hash = static_cast<int>(message->runtime_env_hash());
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
pid_t pid = message->worker_pid();
std::string worker_ip_address = string_from_flatbuf(*message->ip_address());
@ -1025,7 +1026,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
RAY_CHECK(job_id.IsNil());
}
auto worker = std::dynamic_pointer_cast<WorkerInterface>(
std::make_shared<Worker>(job_id, worker_id, language, worker_type,
std::make_shared<Worker>(job_id, runtime_env_hash, worker_id, language, worker_type,
worker_ip_address, client, client_call_manager_));
auto send_reply_callback = [this, client, job_id](Status status, int assigned_port) {

View file

@ -102,6 +102,7 @@ class MockWorker : public WorkerInterface {
RAY_CHECK(false) << "Method unused";
return JobID::Nil();
}
int GetRuntimeEnvHash() const { return 0; }
void AssignActorId(const ActorID &actor_id) { RAY_CHECK(false) << "Method unused"; }
const ActorID &GetActorId() const {
RAY_CHECK(false) << "Method unused";

View file

@ -26,8 +26,9 @@ namespace ray {
namespace raylet {
/// A constructor responsible for initializing the state of a worker.
Worker::Worker(const JobID &job_id, const WorkerID &worker_id, const Language &language,
rpc::WorkerType worker_type, const std::string &ip_address,
Worker::Worker(const JobID &job_id, const int runtime_env_hash, const WorkerID &worker_id,
const Language &language, rpc::WorkerType worker_type,
const std::string &ip_address,
std::shared_ptr<ClientConnection> connection,
rpc::ClientCallManager &client_call_manager)
: worker_id_(worker_id),
@ -38,6 +39,7 @@ Worker::Worker(const JobID &job_id, const WorkerID &worker_id, const Language &l
port_(-1),
connection_(connection),
assigned_job_id_(job_id),
runtime_env_hash_(runtime_env_hash),
bundle_id_(std::make_pair(PlacementGroupID::Nil(), -1)),
dead_(false),
blocked_(false),
@ -116,6 +118,8 @@ const std::unordered_set<TaskID> &Worker::GetBlockedTaskIds() const {
const JobID &Worker::GetAssignedJobId() const { return assigned_job_id_; }
int Worker::GetRuntimeEnvHash() const { return runtime_env_hash_; }
void Worker::AssignActorId(const ActorID &actor_id) {
RAY_CHECK(actor_id_.IsNil())
<< "A worker that is already an actor cannot be assigned an actor ID again.";

View file

@ -64,6 +64,7 @@ class WorkerInterface {
virtual bool RemoveBlockedTaskId(const TaskID &task_id) = 0;
virtual const std::unordered_set<TaskID> &GetBlockedTaskIds() const = 0;
virtual const JobID &GetAssignedJobId() const = 0;
virtual int GetRuntimeEnvHash() const = 0;
virtual void AssignActorId(const ActorID &actor_id) = 0;
virtual const ActorID &GetActorId() const = 0;
virtual void MarkDetachedActor() = 0;
@ -117,9 +118,9 @@ class Worker : public WorkerInterface {
public:
/// A constructor that initializes a worker object.
/// NOTE: You MUST manually set the worker process.
Worker(const JobID &job_id, const WorkerID &worker_id, const Language &language,
rpc::WorkerType worker_type, const std::string &ip_address,
std::shared_ptr<ClientConnection> connection,
Worker(const JobID &job_id, const int runtime_env_hash, const WorkerID &worker_id,
const Language &language, rpc::WorkerType worker_type,
const std::string &ip_address, std::shared_ptr<ClientConnection> connection,
rpc::ClientCallManager &client_call_manager);
/// A destructor responsible for freeing all worker state.
~Worker() {}
@ -149,6 +150,7 @@ class Worker : public WorkerInterface {
bool RemoveBlockedTaskId(const TaskID &task_id);
const std::unordered_set<TaskID> &GetBlockedTaskIds() const;
const JobID &GetAssignedJobId() const;
int GetRuntimeEnvHash() const;
void AssignActorId(const ActorID &actor_id);
const ActorID &GetActorId() const;
void MarkDetachedActor();
@ -230,6 +232,10 @@ class Worker : public WorkerInterface {
TaskID assigned_task_id_;
/// Job ID for the worker's current assigned task.
JobID assigned_job_id_;
/// The hash of the worker's assigned runtime env. We use this in the worker
/// pool to cache and reuse workers with the same runtime env, because
/// installing runtime envs from scratch can be slow.
const int runtime_env_hash_;
/// The worker's actor ID. If this is nil, then the worker is not an actor.
ActorID actor_id_;
/// The worker's placement group bundle. It is used to detect if the worker is

View file

@ -295,6 +295,10 @@ Process WorkerPool::StartWorkerProcess(
worker_command_args.begin() + 3);
}
}
WorkerCacheKey env = {override_environment_variables, serialized_runtime_env};
const std::string runtime_env_hash_str = std::to_string(env.IntHash());
worker_command_args.push_back("--runtime-env-hash=" + runtime_env_hash_str);
}
// We use setproctitle to change python worker process title,
@ -664,23 +668,15 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
RAY_CHECK(worker->GetAssignedTaskId().IsNil())
<< "Idle workers cannot have an assigned task ID";
auto &state = GetStateForLanguage(worker->GetLanguage());
auto it_p = state.pending_dedicated_workers_to_tasks.find(worker->GetProcess());
auto it_r = state.registered_dedicated_workers_to_tasks.find(worker->GetProcess());
if (it_p != state.pending_dedicated_workers_to_tasks.end()) {
// The worker is used for a task which needs a dedicated worker process.
// Put it into the idle dedicated worker pool.
const auto task_id = it_p->second;
auto it = state.dedicated_workers_to_tasks.find(worker->GetProcess());
if (it != state.dedicated_workers_to_tasks.end()) {
// The worker is used for the actor creation task with dynamic options.
// Put it into idle dedicated worker pool.
const auto task_id = it->second;
state.idle_dedicated_workers[task_id] = worker;
} else if (it_r != state.registered_dedicated_workers_to_tasks.end()) {
// The dedicated worker has been used before and should be added again
// to the idle dedicated worker pool.
const auto task_id = it_r->second;
state.idle_dedicated_workers[task_id] = worker;
// The worker has been assigned to the pool, so we can remove it from this map.
state.registered_dedicated_workers_to_tasks.erase(it_r);
} else {
// The worker is not used for a task which needs a dedicated worker process.
// Put the worker to the general idle pool.
// The worker is not used for the actor creation task with dynamic options.
// Put the worker to the idle pool.
state.idle.insert(worker);
int64_t now = get_time_();
idle_of_all_languages_.emplace_back(worker, now);
@ -835,13 +831,13 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
std::shared_ptr<WorkerInterface> worker = nullptr;
Process proc;
if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) ||
task_spec.OverrideEnvironmentVariables().size() > 0 ||
!(task_spec.SerializedRuntimeEnv() == "{}" ||
task_spec.SerializedRuntimeEnv() == "")) {
if (task_spec.IsActorTask()) {
// Code path of actor task.
RAY_CHECK(false) << "Direct call shouldn't reach here.";
} else if ((task_spec.IsActorCreationTask() &&
!task_spec.DynamicWorkerOptions().empty())) {
// Code path of task that needs a dedicated worker: an actor creation task with
// dynamic worker options, or any task with environment variable overrides, or
// any task with a specified RuntimeEnv.
// dynamic worker options.
// Try to pop it from idle dedicated pool.
auto it = state.idle_dedicated_workers.find(task_spec.TaskId());
if (it != state.idle_dedicated_workers.end()) {
@ -849,13 +845,9 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
worker = std::move(it->second);
state.idle_dedicated_workers.erase(it);
// Because we found a worker that can perform this task,
// we can remove it from pending_dedicated_workers_to_tasks.
state.pending_dedicated_workers_to_tasks.erase(worker->GetProcess());
state.tasks_to_pending_dedicated_workers.erase(task_spec.TaskId());
// We don't want this dedicated worker to end up in the general idle pool because
// it has state from its environment, so keep track of it in this map.
state.registered_dedicated_workers_to_tasks[worker->GetProcess()] =
task_spec.TaskId();
// we can remove it from dedicated_workers_to_tasks.
state.dedicated_workers_to_tasks.erase(worker->GetProcess());
state.tasks_to_dedicated_workers.erase(task_spec.TaskId());
} else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) {
// We are not pending a registration from a worker for this task,
// so start a new worker process for this task.
@ -864,21 +856,19 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
dynamic_options = task_spec.DynamicWorkerOptions();
}
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId(), dynamic_options,
task_spec.SerializedRuntimeEnv(),
task_spec.OverrideEnvironmentVariables());
task_spec.JobId(), dynamic_options);
if (proc.IsValid()) {
state.pending_dedicated_workers_to_tasks[proc] = task_spec.TaskId();
state.tasks_to_pending_dedicated_workers[task_spec.TaskId()] = proc;
state.dedicated_workers_to_tasks[proc] = task_spec.TaskId();
state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc;
}
}
} else if (task_spec.IsActorTask()) {
// Code path of actor task.
RAY_CHECK(false) << "Direct call shouldn't reach here.";
} else {
// Code path of normal task or actor creation task without dynamic worker options.
// Find an available worker which is already assigned to this job.
// Find an available worker which is already assigned to this job and which has
// the specified runtime env.
// Try to pop the most recently pushed worker.
const WorkerCacheKey env = {task_spec.OverrideEnvironmentVariables(),
task_spec.SerializedRuntimeEnv()};
const int runtime_env_hash = env.IntHash();
for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
it++) {
if (task_spec.GetLanguage() != it->first->GetLanguage() ||
@ -891,6 +881,11 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
if (pending_exit_idle_workers_.count(it->first->WorkerId())) {
continue;
}
// Skip if the runtime env doesn't match.
if (runtime_env_hash != it->first->GetRuntimeEnvHash()) {
continue;
}
state.idle.erase(it->first);
// We can't erase a reverse_iterator.
auto lit = it.base();
@ -905,7 +900,9 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
// There are no more non-actor workers available to execute this task.
// Start a new worker process.
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId());
task_spec.JobId(), {}, /* dynamic_options */
task_spec.SerializedRuntimeEnv(),
task_spec.OverrideEnvironmentVariables());
}
}
@ -921,11 +918,14 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
int64_t backlog_size) {
// Code path of task that needs a dedicated worker: an actor creation task with
// dynamic worker options, or any task with environment variable overrides.
// Code path of task that needs a dedicated worker.
if ((task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) ||
task_spec.OverrideEnvironmentVariables().size() > 0) {
task_spec.OverrideEnvironmentVariables().size() > 0 ||
!(task_spec.SerializedRuntimeEnv() == "{}" ||
task_spec.SerializedRuntimeEnv() == "")) {
return; // Not handled.
// TODO(architkulkarni): We'd eventually like to prestart workers with the same
// runtime env to improve initial startup performance.
}
auto &state = GetStateForLanguage(task_spec.GetLanguage());
@ -1100,8 +1100,8 @@ void WorkerPool::WarnAboutSize() {
bool WorkerPool::HasPendingWorkerForTask(const Language &language,
const TaskID &task_id) {
auto &state = GetStateForLanguage(language);
auto it = state.tasks_to_pending_dedicated_workers.find(task_id);
return it != state.tasks_to_pending_dedicated_workers.end();
auto it = state.tasks_to_dedicated_workers.find(task_id);
return it != state.tasks_to_dedicated_workers.end();
}
void WorkerPool::TryStartIOWorkers(const Language &language) {
@ -1189,6 +1189,47 @@ WorkerPool::IOWorkerState &WorkerPool::GetIOWorkerStateFromWorkerType(
UNREACHABLE;
}
WorkerCacheKey::WorkerCacheKey(
const std::unordered_map<std::string, std::string> override_environment_variables,
const std::string serialized_runtime_env)
: override_environment_variables(override_environment_variables),
serialized_runtime_env(serialized_runtime_env) {}
bool WorkerCacheKey::operator==(const WorkerCacheKey &k) const {
return Hash() == k.Hash();
}
bool WorkerCacheKey::EnvIsEmpty() const {
return override_environment_variables.size() == 0 &&
(serialized_runtime_env == "" || serialized_runtime_env == "{}");
}
std::size_t WorkerCacheKey::Hash() const {
// Cache the hash value.
if (!hash_) {
if (EnvIsEmpty()) {
// It's useful to have the same predetermined value for both unspecified and empty
// runtime envs.
hash_ = 0;
} else {
std::vector<std::pair<std::string, std::string>> env_vars(
override_environment_variables.begin(), override_environment_variables.end());
// The environment doesn't depend the order of the variables, so the hash should not
// either. Sort the variables so different permutations yield the same hash.
std::sort(env_vars.begin(), env_vars.end());
for (auto &pair : env_vars) {
boost::hash_combine(hash_, pair.first);
boost::hash_combine(hash_, pair.second);
}
boost::hash_combine(hash_, serialized_runtime_env);
}
}
return hash_;
}
int WorkerCacheKey::IntHash() const { return (int)Hash(); }
} // namespace raylet
} // namespace ray

View file

@ -18,6 +18,7 @@
#include <algorithm>
#include <boost/asio/io_service.hpp>
#include <boost/functional/hash.hpp>
#include <queue>
#include <unordered_map>
#include <unordered_set>
@ -39,6 +40,51 @@ namespace raylet {
using WorkerCommandMap =
std::unordered_map<Language, std::vector<std::string>, std::hash<int>>;
/// \class WorkerCacheKey
///
/// Class used to cache workers, keyed by runtime_env.
class WorkerCacheKey {
public:
/// Create a cache key with the given environment variable overrides and serialized
/// runtime_env.
///
/// \param override_environment_variables The environment variable overrides set in this
/// worker. \param serialized_runtime_env The JSON-serialized runtime env for this
/// worker.
WorkerCacheKey(
const std::unordered_map<std::string, std::string> override_environment_variables,
const std::string serialized_runtime_env);
bool operator==(const WorkerCacheKey &k) const;
/// Check if this worker's environment is empty (the default).
///
/// \return true if there are no environment variables set and the runtime env is the
/// empty string (protobuf default) or a JSON-serialized empty dict.
bool EnvIsEmpty() const;
/// Get the hash for this worker's environment.
///
/// \return The hash of the override_environment_variables and the serialized
/// runtime_env.
std::size_t Hash() const;
/// Get the int-valued hash for this worker's environment, useful for portability in
/// flatbuffers.
///
/// \return The hash truncated to an int.
int IntHash() const;
private:
/// The environment variable overrides for this worker.
const std::unordered_map<std::string, std::string> override_environment_variables;
/// The JSON-serialized runtime env for this worker.
const std::string serialized_runtime_env;
/// The cached hash of the worker's environment. This is set to 0
/// for unspecified or empty environments.
mutable std::size_t hash_ = 0;
};
/// \class WorkerPoolInterface
///
/// Used for new scheduler unit tests.
@ -385,7 +431,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// The commands and arguments used to start the worker process
std::vector<std::string> worker_command;
/// The pool of dedicated workers for actor creation tasks
/// with prefix or suffix worker command.
/// with dynamic worker options (prefix or suffix worker command.)
std::unordered_map<TaskID, std::shared_ptr<WorkerInterface>> idle_dedicated_workers;
/// The pool of idle non-actor workers.
std::unordered_set<std::shared_ptr<WorkerInterface>> idle;
@ -406,14 +452,11 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// A map from the pids of starting worker processes
/// to the number of their unregistered workers.
std::unordered_map<Process, int> starting_worker_processes;
/// A map for looking up the task with dynamic options by the pid of the pending
/// A map for looking up the task with dynamic options by the pid of
/// worker. Note that this is used for the dedicated worker processes.
std::unordered_map<Process, TaskID> pending_dedicated_workers_to_tasks;
std::unordered_map<Process, TaskID> dedicated_workers_to_tasks;
/// A map for speeding up looking up the pending worker for the given task.
std::unordered_map<TaskID, Process> tasks_to_pending_dedicated_workers;
/// A map for looking up tasks with existing dedicated worker processes (processes
/// with a specially installed environment) so the processes can be reused.
std::unordered_map<Process, TaskID> registered_dedicated_workers_to_tasks;
std::unordered_map<TaskID, Process> tasks_to_dedicated_workers;
/// We'll push a warning to the user every time a multiple of this many
/// worker processes has been started.
int multiple_for_warning;

View file

@ -182,7 +182,7 @@ class WorkerPoolTest : public ::testing::Test {
ClientConnection::Create(client_handler, message_handler, std::move(socket),
"worker", {}, error_message_type_);
std::shared_ptr<Worker> worker_ =
std::make_shared<Worker>(job_id, WorkerID::FromRandom(), language, worker_type,
std::make_shared<Worker>(job_id, 0, WorkerID::FromRandom(), language, worker_type,
"127.0.0.1", client, client_call_manager_);
std::shared_ptr<WorkerInterface> worker =
std::dynamic_pointer_cast<WorkerInterface>(worker_);

View file

@ -84,18 +84,19 @@ raylet::RayletClient::RayletClient(
instrumented_io_context &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, Status *status, NodeID *raylet_id, int *port,
std::string *serialized_job_config)
rpc::WorkerType worker_type, const JobID &job_id, const int &runtime_env_hash,
const Language &language, const std::string &ip_address, Status *status,
NodeID *raylet_id, int *port, std::string *serialized_job_config)
: grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) {
conn_ = std::make_unique<raylet::RayletConnection>(io_service, raylet_socket, -1, -1);
flatbuffers::FlatBufferBuilder fbb;
// TODO(suquark): Use `WorkerType` in `common.proto` without converting to int.
// TODO(architkulkarni) this creates the message
auto message = protocol::CreateRegisterClientRequest(
fbb, static_cast<int>(worker_type), to_flatbuf(fbb, worker_id), getpid(),
to_flatbuf(fbb, job_id), language, fbb.CreateString(ip_address), /*port=*/0,
fbb.CreateString(*serialized_job_config));
to_flatbuf(fbb, job_id), runtime_env_hash, language, fbb.CreateString(ip_address),
/*port=*/0, fbb.CreateString(*serialized_job_config));
fbb.Finish(message);
// Register the process ID with the raylet.
// NOTE(swang): If raylet exits and we are registered as a worker, we will get killed.

View file

@ -213,6 +213,7 @@ class RayletClient : public RayletClientInterface {
/// \param worker_type The type of the worker. If it is a certain worker type, an
/// additional message will be sent to register as one.
/// \param job_id The job ID of the driver or worker.
/// \param runtime_env_hash The hash of the runtime env of the worker.
/// \param language Language of the worker.
/// \param ip_address The IP address of the worker.
/// \param status This will be populated with the result of connection attempt.
@ -227,7 +228,8 @@ class RayletClient : public RayletClientInterface {
RayletClient(instrumented_io_context &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
rpc::WorkerType worker_type, const JobID &job_id,
const int &runtime_env_hash, const Language &language,
const std::string &ip_address, Status *status, NodeID *raylet_id,
int *port, std::string *serialized_job_config);

View file

@ -639,10 +639,11 @@ class StreamingWorker {
} // namespace ray
int main(int argc, char **argv) {
RAY_CHECK(argc == 4);
RAY_CHECK(argc == 5);
auto store_socket = std::string(argv[1]);
auto raylet_socket = std::string(argv[2]);
auto node_manager_port = std::stoi(std::string(argv[3]));
// auto runtime_env_hash = std::string(argv[4]); // Unused in this test
ray::gcs::GcsClientOptions gcs_options("127.0.0.1", 6379, "");
ray::streaming::StreamingWorker worker(store_socket, raylet_socket, node_manager_port,