Remove local/global_scheduler from code and doc. (#4549)

Yuhong Guo 2019-04-04 08:05:09 +08:00 committed by Philipp Moritz
parent 51dae23d5c
commit c2349cf12d
29 changed files with 177 additions and 204 deletions

View file

@ -46,7 +46,7 @@ To actually create an actor, we can instantiate this class by calling
When an actor is instantiated, the following events happen.
1. A node in the cluster is chosen and a worker process is created on that node
(by the local scheduler on that node) for the purpose of running methods
(by the raylet on that node) for the purpose of running methods
called on the actor.
2. A ``Counter`` object is created on that worker and the ``Counter``
constructor is run.
@ -64,8 +64,8 @@ We can schedule tasks on the actor by calling its methods.
When ``a1.increment.remote()`` is called, the following events happen.
1. A task is created.
2. The task is assigned directly to the local scheduler responsible for the
actor by the driver's local scheduler.
2. The task is assigned directly to the raylet responsible for the
actor by the driver's raylet.
3. An object ID is returned.
We can then call ``ray.get`` on the object ID to retrieve the actual value.
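A minimal sketch of this flow, assuming the ``Counter`` class from the surrounding documentation and a Ray version of this era:

.. code-block:: python

    import ray

    ray.init()

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    # Instantiating the actor picks a node and starts a dedicated worker
    # process there (managed by that node's raylet).
    a1 = Counter.remote()

    # Calling a method creates a task that is sent to the raylet hosting
    # the actor; an object ID is returned immediately.
    obj_id = a1.increment.remote()
    print(ray.get(obj_id))  # 1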

View file

@ -15,7 +15,7 @@ Running Ray standalone
Ray can be used standalone by calling ``ray.init()`` within a script. When the
call to ``ray.init()`` happens, all of the relevant processes are started.
These include a local scheduler, an object store and manager, a Redis server,
These include a raylet, an object store and manager, a Redis server,
and a number of worker processes.
When the script exits, these processes will be killed.
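As a sketch of standalone usage (assuming a Ray version of this era):

.. code-block:: python

    import ray

    # Starts a raylet, an object store and manager, a Redis server, and
    # some worker processes; they are killed when the script exits.
    ray.init()

    @ray.remote
    def f(x):
        return x + 1

    print(ray.get(f.remote(1)))  # 2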
@ -109,31 +109,26 @@ When a driver or worker invokes a remote function, a number of things happen.
- The ID of the task. This is generated uniquely from the above content.
- The IDs for the return values of the task. These are generated uniquely
from the above content.
- The task object is then sent to the local scheduler on the same node as the
driver or worker.
- The local scheduler makes a decision to either schedule the task locally or to
pass the task on to another local scheduler.
- The task object is then sent to the raylet on the same node as the driver
or worker.
- The raylet makes a decision to either schedule the task locally or to
pass the task on to another raylet.
- If all of the task's object dependencies are present in the local object
store and there are enough CPU and GPU resources available to execute the
task, then the local scheduler will assign the task to one of its
available workers.
- If those conditions are not met, the task will be passed on to a global
scheduler. This is done by adding the task to the **task table**, which is
part of the centralized control state.
task, then the raylet will assign the task to one of its available workers.
- If those conditions are not met, the task will be forwarded to another
raylet. This is done by peer-to-peer connection between raylets.
The task table can be inspected as follows.
.. code-block:: python
TODO: Fill this in.
A global scheduler will be notified of the update and will assign the task
to a local scheduler by updating the task's state in the task table. The
local scheduler will be notified and pull the task object.
- Once a task has been scheduled to a local scheduler, whether by itself or by
a global scheduler, the local scheduler queues the task for execution. A task
is assigned to a worker when enough resources become available and the object
dependencies are available locally, in first-in, first-out order.
- Once a task has been scheduled to a raylet, the raylet queues
the task for execution. A task is assigned to a worker when enough resources
become available and the object dependencies are available locally,
in first-in, first-out order.
- When the task has been assigned to a worker, the worker executes the task and
puts the task's return values into the object store. The object store will
then update the **object table**, which is part of the centralized control
@ -157,7 +152,7 @@ Notes and limitations
- When an object store on a particular node fills up, it will begin evicting
objects in a least-recently-used manner. If an object that is needed later is
evicted, then the call to ``ray.get`` for that object will initiate the
reconstruction of the object. The local scheduler will attempt to reconstruct
reconstruction of the object. The raylet will attempt to reconstruct
the object by replaying its task lineage.
TODO: Limitations on reconstruction.
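A sketch of the submission flow described above: calling a remote function returns object IDs immediately, the task is handed to the local raylet (and possibly forwarded peer-to-peer to another raylet), and ``ray.get`` blocks until the result is in the local object store.

.. code-block:: python

    import ray

    ray.init()

    @ray.remote
    def add(x, y):
        return x + y

    # Submission returns an ObjectID right away; execution happens whenever
    # a raylet assigns the task to a worker with the dependencies local.
    x_id = add.remote(1, 2)
    y_id = add.remote(x_id, 3)  # the dependency on x_id is tracked for us

    # ray.get blocks until the value is available in the local object store.
    assert ray.get(y_id) == 6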
@ -183,7 +178,7 @@ Several things happen when a driver or worker calls ``ray.get`` on an object ID.
state will notify the requesting manager when the object is created. If the
object doesn't exist anywhere because it has been evicted from all object
stores, the worker will also request reconstruction of the object from the
local scheduler. These checks repeat periodically until the object is
raylet. These checks repeat periodically until the object is
available in the local object store, whether through reconstruction or
through object transfer.
- Once the object is available in the local object store, the driver or worker

View file

@ -20,12 +20,12 @@ When using Ray, several processes are involved.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **local scheduler** per node assigns tasks to workers on the same node.
- One **raylet** per node assigns tasks to workers on the same node.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its local scheduler and get objects from the object
store, but it is different in that the local scheduler will not assign tasks to
that it can submit tasks to its raylet and get objects from the object
store, but it is different in that the raylet will not assign tasks to
the driver to be executed.
- A **Redis server** maintains much of the system's state. For example, it keeps
track of which objects live on which machines and of the task specifications
@ -167,7 +167,7 @@ to parallelize computation.
There is a sharp distinction between *submitting a task* and *executing the
task*. When a remote function is called, the task of executing that function is
submitted to a local scheduler, and object IDs for the outputs of the task are
submitted to a raylet, and object IDs for the outputs of the task are
immediately returned. However, the task will not be executed until the system
actually schedules the task on a worker. Task execution is **not** done lazily.
The system moves the input data to the task, and the task will execute as soon

View file

@ -49,7 +49,7 @@ Now we've started all of the Ray processes on each node. This includes
- Some worker processes on each machine.
- An object store on each machine.
- A local scheduler on each machine.
- A raylet on each machine.
- Multiple Redis servers (on the head node).
To run some commands, start up Python on one of the nodes in the cluster, and do
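The documentation presumably continues here with connecting to the cluster; a sketch, assuming the ``redis_address`` form of ``ray.init`` used elsewhere in this diff (the address itself is hypothetical):

.. code-block:: python

    import ray

    # Connect to the existing cluster instead of starting a new one.
    ray.init(redis_address="<head-node-ip>:6379")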

View file

@ -38,7 +38,7 @@ public class RayletClientImpl implements RayletClient {
);
/**
* The pointer to c++'s local scheduler client.
* The pointer to c++'s raylet client.
*/
private long client = 0;

View file

@ -708,9 +708,9 @@ def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions):
def __ray_terminate__(self):
worker = ray.worker.get_global_worker()
if worker.mode != ray.LOCAL_MODE:
# Disconnect the worker from the local scheduler. The point of
# this is so that when the worker kills itself below, the local
# scheduler won't push an error message to the driver.
# Disconnect the worker from the raylet. The point of
# this is so that when the worker kills itself below, the
# raylet won't push an error message to the driver.
worker.raylet_client.disconnect()
sys.exit(0)
assert False, "This process should have terminated."
@ -719,7 +719,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, max_reconstructions):
"""Save a checkpoint.
This task saves the current state of the actor, the current task
frontier according to the local scheduler, and the checkpoint index
frontier according to the raylet, and the checkpoint index
(number of tasks executed so far).
"""
worker = ray.worker.global_worker

View file

@ -133,7 +133,7 @@ CLUSTER_CONFIG_SCHEMA = {
class LoadMetrics(object):
"""Container for cluster load metrics.
Metrics here are updated from local scheduler heartbeats. The autoscaler
Metrics here are updated from raylet heartbeats. The autoscaler
queries these metrics to determine when to scale up, and which nodes
can be removed.
"""

View file

@ -725,8 +725,7 @@ class GlobalState(object):
actor_info[binary_to_hex(actor_id)] = {
"class_id": binary_to_hex(info[b"class_id"]),
"driver_id": binary_to_hex(info[b"driver_id"]),
"local_scheduler_id": binary_to_hex(
info[b"local_scheduler_id"]),
"raylet_id": binary_to_hex(info[b"raylet_id"]),
"num_gpus": int(info[b"num_gpus"]),
"removed": decode(info[b"removed"]) == "True"
}

View file

@ -36,13 +36,13 @@ cdef extern from "ray/ray_config.h" nogil:
int64_t connect_timeout_milliseconds() const
int64_t local_scheduler_fetch_timeout_milliseconds() const
int64_t raylet_fetch_timeout_milliseconds() const
int64_t local_scheduler_reconstruction_timeout_milliseconds() const
int64_t raylet_reconstruction_timeout_milliseconds() const
int64_t max_num_to_reconstruct() const
int64_t local_scheduler_fetch_request_size() const
int64_t raylet_fetch_request_size() const
int64_t kill_worker_timeout_milliseconds() const

View file

@ -59,22 +59,22 @@ cdef class Config:
return RayConfig.instance().connect_timeout_milliseconds()
@staticmethod
def local_scheduler_fetch_timeout_milliseconds():
def raylet_fetch_timeout_milliseconds():
return (RayConfig.instance()
.local_scheduler_fetch_timeout_milliseconds())
.raylet_fetch_timeout_milliseconds())
@staticmethod
def local_scheduler_reconstruction_timeout_milliseconds():
def raylet_reconstruction_timeout_milliseconds():
return (RayConfig.instance()
.local_scheduler_reconstruction_timeout_milliseconds())
.raylet_reconstruction_timeout_milliseconds())
@staticmethod
def max_num_to_reconstruct():
return RayConfig.instance().max_num_to_reconstruct()
@staticmethod
def local_scheduler_fetch_request_size():
return RayConfig.instance().local_scheduler_fetch_request_size()
def raylet_fetch_request_size():
return RayConfig.instance().raylet_fetch_request_size()
@staticmethod
def kill_worker_timeout_milliseconds():

View file

@ -48,9 +48,9 @@ class Monitor(object):
# Setup subscriptions to the primary Redis server and the Redis shards.
self.primary_subscribe_client = self.redis.pubsub(
ignore_subscribe_messages=True)
# Keep a mapping from local scheduler client ID to IP address to use
# Keep a mapping from raylet client ID to IP address to use
# for updating the load metrics.
self.local_scheduler_id_to_ip_map = {}
self.raylet_id_to_ip_map = {}
self.load_metrics = LoadMetrics()
if autoscaling_config:
self.autoscaler = StandardAutoscaler(autoscaling_config,
@ -126,9 +126,9 @@ class Monitor(object):
static_resources[static] = (
heartbeat_message.ResourcesTotalCapacity(i))
# Update the load metrics for this local scheduler.
# Update the load metrics for this raylet.
client_id = ray.utils.binary_to_hex(heartbeat_message.ClientId())
ip = self.local_scheduler_id_to_ip_map.get(client_id)
ip = self.raylet_id_to_ip_map.get(client_id)
if ip:
self.load_metrics.update(ip, static_resources,
dynamic_resources)
@ -243,7 +243,7 @@ class Monitor(object):
# Determine the appropriate message handler.
if channel == ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL:
# Similar functionality as local scheduler info channel
# Similar functionality as raylet info channel
message_handler = self.xray_heartbeat_batch_handler
elif channel == ray.gcs_utils.XRAY_DRIVER_CHANNEL:
# Handles driver death.
@ -254,16 +254,15 @@ class Monitor(object):
# Call the handler.
message_handler(channel, data)
def update_local_scheduler_map(self):
local_schedulers = self.state.client_table()
self.local_scheduler_id_to_ip_map = {}
for local_scheduler_info in local_schedulers:
client_id = local_scheduler_info.get("DBClientID") or \
local_scheduler_info["ClientID"]
ip_address = (
local_scheduler_info.get("AuxAddress")
or local_scheduler_info["NodeManagerAddress"]).split(":")[0]
self.local_scheduler_id_to_ip_map[client_id] = ip_address
def update_raylet_map(self):
all_raylet_nodes = self.state.client_table()
self.raylet_id_to_ip_map = {}
for raylet_info in all_raylet_nodes:
client_id = (raylet_info.get("DBClientID")
or raylet_info["ClientID"])
ip_address = (raylet_info.get("AuxAddress")
or raylet_info["NodeManagerAddress"]).split(":")[0]
self.raylet_id_to_ip_map[client_id] = ip_address
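A self-contained sketch of the mapping logic above, assuming ``client_table()`` returns dictionaries with the keys this code reads (``ClientID``/``NodeManagerAddress``, with legacy ``DBClientID``/``AuxAddress`` fallbacks):

.. code-block:: python

    def build_raylet_id_to_ip_map(raylet_infos):
        # Mirrors Monitor.update_raylet_map: prefer the legacy keys when
        # present, then strip the port from the address.
        id_to_ip = {}
        for info in raylet_infos:
            client_id = info.get("DBClientID") or info["ClientID"]
            address = info.get("AuxAddress") or info["NodeManagerAddress"]
            id_to_ip[client_id] = address.split(":")[0]
        return id_to_ip

    # Hypothetical entry:
    build_raylet_id_to_ip_map(
        [{"ClientID": "abc", "NodeManagerAddress": "10.0.0.5:12345"}])
    # -> {'abc': '10.0.0.5'}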
def _maybe_flush_gcs(self):
"""Experimental: issue a flush request to the GCS.
@ -311,9 +310,9 @@ class Monitor(object):
# Handle messages from the subscription channels.
while True:
# Update the mapping from local scheduler client ID to IP address.
# Update the mapping from raylet client ID to IP address.
# This is only used to update the load metrics for the autoscaler.
self.update_local_scheduler_map()
self.update_raylet_map()
# Process autoscaling actions
if self.autoscaler:

View file

@ -13,9 +13,8 @@ class RayParams(object):
Attributes:
redis_address (str): The address of the Redis server to connect to. If
this address is not provided, then this command will start Redis, a
global scheduler, a local scheduler, a plasma store, a plasma
manager, and some workers. It will also kill these processes when
Python exits.
raylet, a plasma store, a plasma manager, and some workers.
It will also kill these processes when Python exits.
redis_port (int): The port that the primary Redis shard should listen
to. If None, then a random port will be chosen.
redis_shard_ports: A list of the ports to use for the non-primary Redis

View file

@ -95,8 +95,8 @@ class Profiler(object):
"""Drivers run this as a thread to flush profile data in the
background."""
# Note(rkn): This is run on a background thread in the driver. It uses
# the local scheduler client. This should be ok because it doesn't read
# from the local scheduler client and we have the GIL here. However,
# the raylet client. This should be ok because it doesn't read
# from the raylet client and we have the GIL here. However,
# if either of those things changes, then we could run into issues.
while True:
# Sleep for 1 second. This will be interrupted if

View file

@ -970,11 +970,11 @@ def check_and_update_resources(num_cpus, num_gpus, resources):
# See if CUDA_VISIBLE_DEVICES has already been set.
gpu_ids = ray.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the local scheduler wants doesn't
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if ("GPU" in resources and gpu_ids is not None
and resources["GPU"] > len(gpu_ids)):
raise Exception("Attempting to start local scheduler with {} GPUs, "
raise Exception("Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(
resources["GPU"], gpu_ids))

View file

@ -873,7 +873,7 @@ def test_actor_load_balancing(ray_start_cluster):
num_attempts = 20
minimum_count = 5
# Make sure that actors are spread between the local schedulers.
# Make sure that actors are spread between the raylets.
attempts = 0
while attempts < num_attempts:
actors = [Actor1.remote() for _ in range(num_actors)]
@ -1363,7 +1363,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
self.x += 1
return self.x
# Create an actor that is not on the local scheduler.
# Create an actor that is not on the raylet.
actor = Counter.remote()
while (ray.get(actor.local_plasma.remote()) !=
remote_node.plasma_store_socket_name):
@ -1496,7 +1496,7 @@ def setup_counter_actor(test_checkpoint=False,
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
# Create an actor that is not on the local scheduler.
# Create an actor that is not on the raylet.
actor = Counter.remote(save_exception)
while ray.get(actor.local_plasma.remote()) == local_plasma:
actor = Counter.remote(save_exception)
@ -1531,7 +1531,7 @@ def test_distributed_handle(ray_start_cluster_2_nodes):
count += num_incs * num_iters
# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding local scheduler to exit.
# trigger the corresponding raylet to exit.
cluster.list_all_nodes()[1].kill_plasma_store(wait=True)
# Check that the actor did not restore from a checkpoint.
@ -1570,7 +1570,7 @@ def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
count += num_incs * num_iters
# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding local scheduler to exit.
# trigger the corresponding raylet to exit.
cluster.list_all_nodes()[1].kill_plasma_store(wait=True)
# Check that the actor restored from a checkpoint.
@ -1610,7 +1610,7 @@ def test_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
count += num_incs * num_iters
# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding local scheduler to exit.
# trigger the corresponding raylet to exit.
cluster.list_all_nodes()[1].kill_plasma_store(wait=True)
# Check that the actor restored from a checkpoint.
@ -1638,7 +1638,7 @@ def _test_nondeterministic_reconstruction(
def read(self):
return self.queue
# Schedule the shared queue onto the remote local scheduler.
# Schedule the shared queue onto the remote raylet.
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
actor = Queue.remote()
while ray.get(actor.local_plasma.remote()) == local_plasma:
@ -1673,7 +1673,7 @@ def _test_nondeterministic_reconstruction(
queue = ray.get(actor.read.remote())
# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding local scheduler to exit.
# trigger the corresponding raylet to exit.
cluster.list_all_nodes()[1].kill_plasma_store(wait=True)
# Read the queue again and check for deterministic reconstruction.
@ -2267,7 +2267,7 @@ def test_multiple_actor_reconstruction(ray_start_cluster_head):
result_ids = collections.defaultdict(lambda: [])
# In a loop we are going to create some actors, run some methods, kill
# a local scheduler, and run some more methods.
# a raylet, and run some more methods.
for node in worker_nodes:
# Create some actors.
actors.extend(

View file

@ -1406,9 +1406,8 @@ def test_free_objects_multi_node(ray_start_cluster):
# This test will do following:
# 1. Create 3 raylets that each hold an actor.
# 2. Each actor creates an object which is the deletion target.
# 3. Invoke 64 methods on each actor to flush plasma client.
# 4. After flushing, the plasma client releases the targets.
# 5. Check that the deletion targets have been deleted.
# 3. Wait 0.1 second for the objects to be deleted.
# 4. Check that the deletion targets have been deleted.
# Caution: if remote functions are used instead of actor methods,
# one raylet may create more than one worker to execute the
# tasks, so the flushing operations may be executed in different
@ -1423,20 +1422,13 @@ def test_free_objects_multi_node(ray_start_cluster):
_internal_config=config)
ray.init(redis_address=cluster.redis_address)
@ray.remote(resources={"Custom0": 1})
class ActorOnNode0(object):
class RawActor(object):
def get(self):
return ray.worker.global_worker.plasma_client.store_socket_name
@ray.remote(resources={"Custom1": 1})
class ActorOnNode1(object):
def get(self):
return ray.worker.global_worker.plasma_client.store_socket_name
@ray.remote(resources={"Custom2": 1})
class ActorOnNode2(object):
def get(self):
return ray.worker.global_worker.plasma_client.store_socket_name
ActorOnNode0 = ray.remote(resources={"Custom0": 1})(RawActor)
ActorOnNode1 = ray.remote(resources={"Custom1": 1})(RawActor)
ActorOnNode2 = ray.remote(resources={"Custom2": 1})(RawActor)
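Aside: the rewritten test applies ``ray.remote`` as a plain function to a single class instead of decorating three copies; a standalone sketch of that pattern:

.. code-block:: python

    import ray

    class RawActor(object):
        def get(self):
            return ray.worker.global_worker.plasma_client.store_socket_name

    # One class definition, reused with different resource requirements so
    # each handle is pinned to a different node.
    ActorOnNode0 = ray.remote(resources={"Custom0": 1})(RawActor)
    ActorOnNode1 = ray.remote(resources={"Custom1": 1})(RawActor)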
def create(actors):
a = actors[0].get.remote()
@ -1447,15 +1439,6 @@ def test_free_objects_multi_node(ray_start_cluster):
assert len(l2) == 0
return (a, b, c)
def flush(actors):
# Flush the Release History.
# Current Plasma Client Cache will maintain 64-item list.
# If the number changed, this will fail.
logger.info("Start Flush!")
for i in range(64):
ray.get([actor.get.remote() for actor in actors])
logger.info("Flush finished!")
def run_one_test(actors, local_only):
(a, b, c) = create(actors)
# The three objects should be generated on different object stores.
@ -1463,7 +1446,8 @@ def test_free_objects_multi_node(ray_start_cluster):
assert ray.get(a) != ray.get(c)
assert ray.get(c) != ray.get(b)
ray.internal.free([a, b, c], local_only=local_only)
flush(actors)
# Wait for the objects to be deleted.
time.sleep(0.1)
return (a, b, c)
actors = [
@ -1819,7 +1803,7 @@ def test_zero_cpus_actor(ray_start_cluster):
def method(self):
return ray.worker.global_worker.plasma_client.store_socket_name
# Make sure tasks and actors run on the remote local scheduler.
# Make sure tasks and actors run on the remote raylet.
a = Foo.remote()
assert ray.get(a.method.remote()) != local_plasma
@ -1875,10 +1859,10 @@ def test_fractional_resources(shutdown_only):
Foo2._remote([], {}, resources={"Custom": 1.5})
def test_multiple_local_schedulers(ray_start_cluster):
def test_multiple_raylets(ray_start_cluster):
# This test will define a bunch of tasks that can only be assigned to
# specific local schedulers, and we will check that they are assigned
# to the correct local schedulers.
# specific raylets, and we will check that they are assigned
# to the correct raylets.
cluster = ray_start_cluster
cluster.add_node(num_cpus=11, num_gpus=0)
cluster.add_node(num_cpus=5, num_gpus=5)
@ -1888,20 +1872,20 @@ def test_multiple_local_schedulers(ray_start_cluster):
# Define a bunch of remote functions that all return the socket name of
# the plasma store. Since there is a one-to-one correspondence between
# plasma stores and local schedulers (at least right now), this can be
# used to identify which local scheduler the task was assigned to.
# plasma stores and raylets (at least right now), this can be
# used to identify which raylet the task was assigned to.
# This must be run on the zeroth local scheduler.
# This must be run on the zeroth raylet.
@ray.remote(num_cpus=11)
def run_on_0():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the first local scheduler.
# This must be run on the first raylet.
@ray.remote(num_gpus=2)
def run_on_1():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the second local scheduler.
# This must be run on the second raylet.
@ray.remote(num_cpus=6, num_gpus=1)
def run_on_2():
return ray.worker.global_worker.plasma_client.store_socket_name
@ -1911,12 +1895,12 @@ def test_multiple_local_schedulers(ray_start_cluster):
def run_on_0_1_2():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the first or second local scheduler.
# This must be run on the first or second raylet.
@ray.remote(num_gpus=1)
def run_on_1_2():
return ray.worker.global_worker.plasma_client.store_socket_name
# This must be run on the zeroth or second local scheduler.
# This must be run on the zeroth or second raylet.
@ray.remote(num_cpus=8)
def run_on_0_2():
return ray.worker.global_worker.plasma_client.store_socket_name
@ -2022,15 +2006,15 @@ def test_custom_resources(ray_start_cluster):
ray.get([f.remote() for _ in range(5)])
return ray.worker.global_worker.plasma_client.store_socket_name
# The f tasks should be scheduled on both local schedulers.
# The f tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(50)]))) == 2
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
# The g tasks should be scheduled only on the second local scheduler.
local_scheduler_ids = set(ray.get([g.remote() for _ in range(50)]))
assert len(local_scheduler_ids) == 1
assert list(local_scheduler_ids)[0] != local_plasma
# The g tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([g.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
assert list(raylet_ids)[0] != local_plasma
# Make sure that resource bookkeeping works when a task that uses a
# custom resources gets blocked.
@ -2076,16 +2060,16 @@ def test_two_custom_resources(ray_start_cluster):
time.sleep(0.001)
return ray.worker.global_worker.plasma_client.store_socket_name
# The f and g tasks should be scheduled on both local schedulers.
# The f and g tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(50)]))) == 2
assert len(set(ray.get([g.remote() for _ in range(50)]))) == 2
local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
# The h tasks should be scheduled only on the second local scheduler.
local_scheduler_ids = set(ray.get([h.remote() for _ in range(50)]))
assert len(local_scheduler_ids) == 1
assert list(local_scheduler_ids)[0] != local_plasma
# The h tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([h.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
assert list(raylet_ids)[0] != local_plasma
# Make sure that tasks with unsatisfied custom resource requirements do
# not get scheduled.
@ -2242,8 +2226,8 @@ def attempt_to_load_balance(remote_function,
def test_load_balancing(ray_start_cluster):
# This test ensures that tasks are being assigned to all local
# schedulers in a roughly equal manner.
# This test ensures that tasks are being assigned to all raylets
# in a roughly equal manner.
cluster = ray_start_cluster
num_nodes = 3
num_cpus = 7
@ -2261,9 +2245,8 @@ def test_load_balancing(ray_start_cluster):
def test_load_balancing_with_dependencies(ray_start_cluster):
# This test ensures that tasks are being assigned to all local
# schedulers in a roughly equal manner even when the tasks have
# dependencies.
# This test ensures that tasks are being assigned to all raylets in a
# roughly equal manner even when the tasks have dependencies.
cluster = ray_start_cluster
num_nodes = 3
for _ in range(num_nodes):
@ -2275,9 +2258,8 @@ def test_load_balancing_with_dependencies(ray_start_cluster):
time.sleep(0.010)
return ray.worker.global_worker.plasma_client.store_socket_name
# This object will be local to one of the local schedulers. Make sure
# this doesn't prevent tasks from being scheduled on other local
# schedulers.
# This object will be local to one of the raylets. Make sure
# this doesn't prevent tasks from being scheduled on other raylets.
x = ray.put(np.zeros(1000000))
attempt_to_load_balance(f, [x], 100, num_nodes, 25)

View file

@ -315,7 +315,7 @@ def check_components_alive(cluster, component_type, check_component_alive):
}], indirect=True)
def test_raylet_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all local schedulers on worker nodes.
# Kill all raylets on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_RAYLET)
# The plasma stores should still be alive on the worker nodes.

View file

@ -278,7 +278,7 @@ def test_incorrect_method_calls(ray_start_regular):
def test_worker_raising_exception(ray_start_regular):
@ray.remote
def f():
ray.worker.global_worker._get_next_task_from_local_scheduler = None
ray.worker.global_worker._get_next_task_from_raylet = None
# Running this task should cause the worker to raise an exception after
# the task has successfully completed.

View file

@ -75,7 +75,7 @@ def push_error_to_driver_through_redis(redis_client,
"""Push an error message to the driver to be printed in the background.
Normally the push_error_to_driver function should be used. However, in some
instances, the local scheduler client is not available, e.g., because the
instances, the raylet client is not available, e.g., because the
error happens in Python before the driver or worker has connected to the
backend processes.

View file

@ -538,7 +538,7 @@ class Worker(object):
unready_ids.pop(object_id)
# If there were objects that we weren't able to get locally,
# let the local scheduler know that we're now unblocked.
# let the raylet know that we're now unblocked.
self.raylet_client.notify_unblocked(self.current_task_id)
assert len(final_results) == len(object_ids)
@ -609,14 +609,14 @@ class Worker(object):
# Put large or complex arguments that are passed by value in the
# object store first.
args_for_local_scheduler = []
args_for_raylet = []
for arg in args:
if isinstance(arg, ObjectID):
args_for_local_scheduler.append(arg)
args_for_raylet.append(arg)
elif ray._raylet.check_simple_value(arg):
args_for_local_scheduler.append(arg)
args_for_raylet.append(arg)
else:
args_for_local_scheduler.append(put(arg))
args_for_raylet.append(put(arg))
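A hypothetical standalone helper mirroring the loop above: ObjectIDs and simple values are passed to the raylet inline, while everything else goes through the object store first (``check_simple_value`` is the worker-internal helper visible in this hunk):

.. code-block:: python

    import ray

    def marshal_args_for_raylet(args):
        args_for_raylet = []
        for arg in args:
            if isinstance(arg, ray.ObjectID):
                # Already an object store reference; pass the ID through.
                args_for_raylet.append(arg)
            elif ray._raylet.check_simple_value(arg):
                # Small, simple values are inlined in the task specification.
                args_for_raylet.append(arg)
            else:
                # Large or complex values are put in the object store first.
                args_for_raylet.append(ray.put(arg))
        return args_for_raylet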
# By default, there are no execution dependencies.
if execution_dependencies is None:
@ -651,14 +651,14 @@ class Worker(object):
# Current driver id must not be nil when submitting a task.
# Because every task must belong to a driver.
assert not self.task_driver_id.is_nil()
# Submit the task to local scheduler.
# Submit the task to the raylet.
function_descriptor_list = (
function_descriptor.get_function_descriptor_list())
assert isinstance(driver_id, DriverID)
task = ray._raylet.Task(
driver_id,
function_descriptor_list,
args_for_local_scheduler,
args_for_raylet,
num_return_vals,
self.current_task_id,
self.task_context.task_index,
@ -998,11 +998,11 @@ class Worker(object):
self.raylet_client.disconnect()
sys.exit(0)
def _get_next_task_from_local_scheduler(self):
"""Get the next task from the local scheduler.
def _get_next_task_from_raylet(self):
"""Get the next task from the raylet.
Returns:
A task from the local scheduler.
A task from the raylet.
"""
with profiling.profile("worker_idle"):
task = self.raylet_client.get_task()
@ -1022,7 +1022,7 @@ class Worker(object):
signal.signal(signal.SIGTERM, exit)
while True:
task = self._get_next_task_from_local_scheduler()
task = self._get_next_task_from_raylet()
self._wait_for_and_process_task(task)
@ -1319,12 +1319,11 @@ def init(redis_address=None,
Args:
redis_address (str): The address of the Redis server to connect to. If
this address is not provided, then this command will start Redis, a
global scheduler, a local scheduler, a plasma store, a plasma
manager, and some workers. It will also kill these processes when
Python exits.
num_cpus (int): Number of cpus the user wishes all local schedulers to
raylet, a plasma store, a plasma manager, and some workers.
It will also kill these processes when Python exits.
num_cpus (int): Number of cpus the user wishes all raylets to
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
num_gpus (int): Number of gpus the user wishes all raylets to
be configured with.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
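For reference, a sketch of calling ``ray.init`` with the renamed per-raylet resource arguments documented above (values are illustrative):

.. code-block:: python

    import ray

    ray.init(num_cpus=8, num_gpus=1, resources={"Custom": 2})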
@ -1791,7 +1790,7 @@ def connect(info,
worker=global_worker,
driver_id=None,
load_code_from_local=False):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
"""Connect this worker to the raylet, to Plasma, and to Redis.
Args:
info (dict): A dictionary with address of the Redis server and the

View file

@ -559,13 +559,13 @@ TEST_F(TestGcsWithAsio, TestDeleteKey) {
void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState::SCHEDULED);
ASSERT_EQ(data.scheduler_id, kRandomId);
ASSERT_EQ(data.raylet_id, kRandomId);
}
void TaskLookupHelper(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data, bool do_stop) {
ASSERT_EQ(data.scheduling_state, SchedulingState::SCHEDULED);
ASSERT_EQ(data.scheduler_id, kRandomId);
ASSERT_EQ(data.raylet_id, kRandomId);
if (do_stop) {
test->Stop();
}

View file

@ -151,11 +151,11 @@ enum SchedulingState:int {
table TaskTableData {
// The state of the task.
scheduling_state: SchedulingState;
// A local scheduler ID.
scheduler_id: string;
// A raylet ID.
raylet_id: string;
// A string of bytes representing the task's TaskExecutionDependencies.
execution_dependencies: string;
// The number of times the task was spilled back by local schedulers.
// The number of times the task was spilled back by raylets.
spillback_count: long;
// A string of bytes representing the task specification.
task_info: string;
@ -164,7 +164,7 @@ table TaskTableData {
}
table TaskTableTestAndUpdate {
test_scheduler_id: string;
test_raylet_id: string;
test_state_bitmask: SchedulingState;
update_state: SchedulingState;
}

View file

@ -834,10 +834,9 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
static_cast<int>(update->test_state_bitmask());
bool is_nil_result;
REPLY_AND_RETURN_IF_NOT_OK(is_nil(&is_nil_result, update->test_scheduler_id()->str()));
REPLY_AND_RETURN_IF_NOT_OK(is_nil(&is_nil_result, update->test_raylet_id()->str()));
if (!is_nil_result) {
do_update =
do_update && update->test_scheduler_id()->str() == data->scheduler_id()->str();
do_update = do_update && update->test_raylet_id()->str() == data->raylet_id()->str();
}
if (do_update) {

View file

@ -23,7 +23,7 @@ RAY_CONFIG(int64_t, handler_warning_timeout_ms, 100);
/// The duration between heartbeats. These are sent by the raylet.
RAY_CONFIG(int64_t, heartbeat_timeout_milliseconds, 100);
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
/// heartbeat intervals, the global scheduler or monitor process will report
/// heartbeat intervals, the raylet monitor process will report
/// it as dead to the db_client table.
RAY_CONFIG(int64_t, num_heartbeats_timeout, 300);
/// For a raylet, if the last heartbeat was sent more than this many
@ -59,29 +59,32 @@ RAY_CONFIG(int64_t, actor_max_dummy_objects, 1000);
RAY_CONFIG(int64_t, num_connect_attempts, 5);
RAY_CONFIG(int64_t, connect_timeout_milliseconds, 500);
/// The duration that the local scheduler will wait before reinitiating a
/// The duration that the raylet will wait before reinitiating a
/// fetch request for a missing task dependency. This time may adapt based on
/// the number of missing task dependencies.
RAY_CONFIG(int64_t, local_scheduler_fetch_timeout_milliseconds, 1000);
/// The duration that the local scheduler will wait between initiating
RAY_CONFIG(int64_t, raylet_fetch_timeout_milliseconds, 1000);
/// The duration that the raylet will wait between initiating
/// reconstruction calls for missing task dependencies. If there are many
/// missing task dependencies, we will only initiate reconstruction calls for
/// some of them each time.
RAY_CONFIG(int64_t, local_scheduler_reconstruction_timeout_milliseconds, 1000);
/// The maximum number of objects that the local scheduler will issue
RAY_CONFIG(int64_t, raylet_reconstruction_timeout_milliseconds, 1000);
/// The maximum number of objects that the raylet will issue
/// reconstruct calls for in a single pass through the reconstruct object
/// timeout handler.
RAY_CONFIG(int64_t, max_num_to_reconstruct, 10000);
/// The maximum number of objects to include in a single fetch request in the
/// regular local scheduler fetch timeout handler.
RAY_CONFIG(int64_t, local_scheduler_fetch_request_size, 10000);
/// regular raylet fetch timeout handler.
RAY_CONFIG(int64_t, raylet_fetch_request_size, 10000);
/// The duration that we wait after sending a worker SIGTERM before sending
/// the worker SIGKILL.
RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 100);
/// This is a timeout used to cause failures in the plasma manager and local
/// scheduler when certain event loop handlers take too long.
/// This is a timeout used to cause failures in the plasma manager and raylet
/// when certain event loop handlers take too long.
RAY_CONFIG(int64_t, max_time_for_handler_milliseconds, 1000);
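These renamed values can be overridden from Python; a sketch, assuming the ``_internal_config`` JSON mechanism used by the tests elsewhere in this diff also applies to ``ray.init``:

.. code-block:: python

    import json
    import ray

    config = json.dumps({
        # Renamed from local_scheduler_fetch_timeout_milliseconds.
        "raylet_fetch_timeout_milliseconds": 500,
        "num_heartbeats_timeout": 20,
    })
    ray.init(_internal_config=config)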
/// This is used by the Python extension when serializing objects as part of

View file

@ -1,4 +1,4 @@
// Local scheduler protocol specification
// raylet protocol specification
include "gcs.fbs";
@ -8,50 +8,48 @@ include "gcs.fbs";
namespace ray.protocol;
enum MessageType:int {
// Task is submitted to the local scheduler. This is sent from a worker to a
// local scheduler.
// Task is submitted to the raylet. This is sent from a worker to a
// raylet.
SubmitTask = 1,
// Notify the local scheduler that a task has finished. This is sent from a
// worker to a local scheduler.
// Notify the raylet that a task has finished. This is sent from a
// worker to a raylet.
TaskDone,
// Log a message to the event table. This is sent from a worker to a local
// scheduler.
// Log a message to the event table. This is sent from a worker to a raylet.
EventLogMessage,
// Send an initial connection message to the local scheduler. This is sent
// from a worker or driver to a local scheduler.
// Send an initial connection message to the raylet. This is sent
// from a worker or driver to a raylet.
RegisterClientRequest,
// Send a reply confirming the successful registration of a worker or driver.
// This is sent from the local scheduler to a worker or driver.
// This is sent from the raylet to a worker or driver.
RegisterClientReply,
// Notify the local scheduler that this client is disconnecting unexpectedly.
// This is sent from a worker to a local scheduler.
// Notify the raylet that this client is disconnecting unexpectedly.
// This is sent from a worker to a raylet.
DisconnectClient,
// Notify the local scheduler that this client is disconnecting gracefully.
// This is sent from a worker to a local scheduler.
// Notify the raylet that this client is disconnecting gracefully.
// This is sent from a worker to a raylet.
IntentionalDisconnectClient,
// Get a new task from the local scheduler. This is sent from a worker to a
// local scheduler.
// Get a new task from the raylet. This is sent from a worker to a
// raylet.
GetTask,
// Tell a worker to execute a task. This is sent from a local scheduler to a
// Tell a worker to execute a task. This is sent from a raylet to a
// worker.
ExecuteTask,
// Reconstruct or fetch possibly lost objects. This is sent from a worker to
// a local scheduler.
// a raylet.
FetchOrReconstruct,
// For a worker that was blocked on some object(s), tell the local scheduler
// that the worker is now unblocked. This is sent from a worker to a local
// scheduler.
// For a worker that was blocked on some object(s), tell the raylet
// that the worker is now unblocked. This is sent from a worker to a raylet.
NotifyUnblocked,
// A request to get the task frontier for an actor, called by the actor when
// saving a checkpoint.
GetActorFrontierRequest,
// The ActorFrontier response to a GetActorFrontierRequest. The local
// scheduler returns the actor's per-handle task counts and execution
// dependencies, which can later be used as the argument to SetActorFrontier
// The ActorFrontier response to a GetActorFrontierRequest. The raylet
// returns the actor's per-handle task counts and execution dependencies,
// which can later be used as the argument to SetActorFrontier
// when resuming from the checkpoint.
GetActorFrontierReply,
// A request to set the task frontier for an actor, called when resuming from
// a checkpoint. The local scheduler will update the actor's per-handle task
// a checkpoint. The raylet will update the actor's per-handle task
// counts and execution dependencies, discard any tasks that already executed
// before the checkpoint, and make any tasks on the frontier runnable by
// making their execution dependencies available.
@ -87,7 +85,7 @@ table TaskExecutionSpecification {
dependencies: [string];
// The last time this task was received for scheduling.
last_timestamp: double;
// The number of times this task was spilled back by local schedulers.
// The number of times this task was spilled back by raylets.
num_forwards: int;
}
@ -117,7 +115,7 @@ table ResourceIdSetInfo {
table DisconnectClient {
}
// This message is sent from the local scheduler to a worker.
// This message is sent from the raylet to a worker.
table GetTaskReply {
// A string of bytes representing the task specification.
task_spec: string;
@ -125,7 +123,7 @@ table GetTaskReply {
fractional_resource_ids: [ResourceIdSetInfo];
}
// This struct is used to register a new worker with the local scheduler.
// This struct is used to register a new worker with the raylet.
// It is shipped as part of raylet_connect.
table RegisterClientRequest {
// True if the client is a worker and false if the client is a driver.

View file

@ -455,7 +455,7 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
remote_resources.SetAvailableResources(std::move(remote_available));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
// Extract decision for this local scheduler.
// Extract decision for this raylet.
auto decision = scheduling_policy_.SpillOver(remote_resources);
std::unordered_set<TaskID> local_task_ids;
for (const auto &task_id : decision) {
@ -935,7 +935,7 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
from_flatbuf<ObjectID>(*message->execution_dependencies()));
TaskSpecification task_spec(*message->task_spec());
Task task(task_execution_spec, task_spec);
// Submit the task to the local scheduler. Since the task was submitted
// Submit the task to the raylet. Since the task was submitted
// locally, there is no uncommitted lineage.
SubmitTask(task, Lineage());
}
@ -1154,7 +1154,7 @@ void NodeManager::ScheduleTasks(
}
#endif
// Extract decision for this local scheduler.
// Extract decision for this raylet.
std::unordered_set<TaskID> local_task_ids;
// Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node.
for (const auto &task_client_pair : policy_decision) {

View file

@ -468,7 +468,7 @@ class NodeManager {
WorkerPool worker_pool_;
/// A set of queues to maintain tasks.
SchedulingQueue local_queues_;
/// The scheduling policy in effect for this local scheduler.
/// The scheduling policy in effect for this raylet.
SchedulingPolicy scheduling_policy_;
/// The reconstruction policy for deciding when to re-execute a task.
ReconstructionPolicy reconstruction_policy_;

View file

@ -235,8 +235,8 @@ ray::Status RayletClient::SubmitTask(const std::vector<ObjectID> &execution_depe
ray::Status RayletClient::GetTask(
std::unique_ptr<ray::raylet::TaskSpecification> *task_spec) {
std::unique_ptr<uint8_t[]> reply;
// Receive a task from the raylet. This will block until the local
// scheduler gives this client a task.
// Receive a task from the raylet. This will block until the raylet
// gives this client a task.
auto status =
conn_->AtomicRequestReply(MessageType::GetTask, MessageType::ExecuteTask, reply);
if (!status.ok()) return status;

View file

@ -77,7 +77,7 @@ class TaskExecutionSpecification {
/// Set the task's last timestamp to the specified value.
///
/// \param new_timestamp The new timestamp in millisecond to set the task's
/// time stamp to. Tracks the last time this task entered a local scheduler.
/// time stamp to. Tracks the last time this task entered a raylet.
void SetLastTimestamp(int64_t new_timestamp);
private: