diff --git a/doc/source/ray-core/actors.rst b/doc/source/ray-core/actors.rst
index fa6afb34f..a7a1df4bd 100644
--- a/doc/source/ray-core/actors.rst
+++ b/doc/source/ray-core/actors.rst
@@ -349,4 +349,5 @@ More about Ray Actors
     actors/concurrency_group_api.rst
     actors/actor-utils.rst
     actors/fault-tolerance.rst
+    actors/scheduling.rst
     actors/patterns/index.rst
diff --git a/doc/source/ray-core/actors/scheduling.rst b/doc/source/ray-core/actors/scheduling.rst
new file mode 100644
index 000000000..cd1a17394
--- /dev/null
+++ b/doc/source/ray-core/actors/scheduling.rst
@@ -0,0 +1,47 @@
+.. _ray-actor-scheduling:
+
+Scheduling
+==========
+
+For each actor, Ray chooses a node to run it on. The scheduling decision is based on the following factors, in order.
+
+Resources
+---------
+Each actor has its :ref:`specified resource requirements `.
+By default, actors require 1 CPU for scheduling and 0 CPUs for running.
+This means they cannot be scheduled on a zero-CPU node, but an infinite number of them
+can run on any non-zero-CPU node. If resources are specified explicitly, they are required
+for both scheduling and running.
+Given the specified resource requirements, a node is available (it has the resources to run the actor now),
+feasible (it has the resources, but they are not available now),
+or infeasible (it doesn't have the resources). If there are available nodes, Ray chooses one based on the other factors discussed below.
+If there are no available nodes, only feasible ones, Ray waits until resources are freed up and a node becomes available.
+If all nodes are infeasible, the actor cannot be scheduled until feasible nodes are added to the cluster.
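+
+For example, here is a minimal sketch contrasting the default behavior with explicitly specified resources (``DefaultActor``, ``ExplicitActor``, and the resource values are illustrative):
+
+.. tabbed:: Python
+
+    .. code-block:: python
+
+        import ray
+
+        @ray.remote
+        class DefaultActor:
+            pass
+
+        @ray.remote(num_cpus=1)
+        class ExplicitActor:
+            pass
+
+        # Requires 1 CPU to be scheduled, but holds 0 CPUs while running,
+        # so any number of these can run on a non-zero-CPU node.
+        default_actor = DefaultActor.remote()
+
+        # Requires 1 CPU for both scheduling and running,
+        # so the CPU is held for the actor's whole lifetime.
+        explicit_actor = ExplicitActor.remote()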
+
+Placement Group
+---------------
+If the ``placement_group`` option is set, the actor is scheduled where the placement group is located.
+See :ref:`Placement Group ` for more details.
+
+Scheduling Strategy
+-------------------
+Actors support the ``scheduling_strategy`` option, which specifies the strategy used to pick the best node among the available ones.
+Currently, the only supported strategy for actors is "DEFAULT".
+"DEFAULT" is the default strategy used by Ray. With the current implementation, Ray tries to pack actors onto nodes
+until the resource utilization exceeds a certain threshold, and spreads actors afterwards.
+Currently, Ray handles actors that don't require any resources (i.e., ``num_cpus=0`` with no other resources) specially: it randomly chooses a node in the cluster without considering resource utilization.
+Since nodes are chosen randomly, actors that don't require any resources are effectively SPREAD across the cluster.
+
+.. tabbed:: Python
+
+    .. code-block:: python
+
+        import ray
+
+        @ray.remote
+        class Actor:
+            pass
+
+        # The "DEFAULT" scheduling strategy is used: actors are packed onto
+        # nodes until a utilization threshold is reached, then spread.
+        a1 = Actor.remote()
+
+        # Zero-CPU (and no other resources) actors are randomly assigned to nodes.
+        a2 = Actor.options(num_cpus=0).remote()
diff --git a/doc/source/ray-core/tasks.rst b/doc/source/ray-core/tasks.rst
index 85bf715c8..6d6651d79 100644
--- a/doc/source/ray-core/tasks.rst
+++ b/doc/source/ray-core/tasks.rst
@@ -230,4 +230,5 @@ More about Ray Tasks
     tasks/using-ray-with-gpus.rst
     tasks/nested-tasks.rst
     tasks/fault-tolerance.rst
+    tasks/scheduling.rst
     tasks/patterns/index.rst
diff --git a/doc/source/ray-core/tasks/scheduling.rst b/doc/source/ray-core/tasks/scheduling.rst
new file mode 100644
index 000000000..cec0cccb9
--- /dev/null
+++ b/doc/source/ray-core/tasks/scheduling.rst
@@ -0,0 +1,84 @@
+.. _ray-task-scheduling:
+
+Scheduling
+==========
+
+For each task, Ray chooses a node to run it on. The scheduling decision is based on the following factors, in order.
+
+Resources
+---------
+Each task has its :ref:`specified resource requirements ` and requires 1 CPU by default.
+Given the specified resource requirements, a node is available (it has the resources to run the task now),
+feasible (it has the resources, but they are not available now),
+or infeasible (it doesn't have the resources). If there are available nodes, Ray chooses one based on the other factors discussed below.
+If there are no available nodes, only feasible ones, Ray waits until resources are freed up and a node becomes available.
+If all nodes are infeasible, the task cannot be scheduled until feasible nodes are added to the cluster.
+
+Placement Group
+---------------
+If the ``placement_group`` option is set, the task is scheduled where the placement group is located.
+See :ref:`Placement Group ` for more details.
+
+Scheduling Strategy
+-------------------
+Tasks support the ``scheduling_strategy`` option, which specifies the strategy used to pick the best node among the available ones.
+Currently, the supported strategies are "DEFAULT" and "SPREAD".
+"DEFAULT" is the default strategy used by Ray. With the current implementation, Ray tries to pack tasks onto nodes
+until the resource utilization exceeds a certain threshold, and spreads tasks afterwards.
+The "SPREAD" strategy tries to spread tasks among the available nodes.
+
+.. tabbed:: Python
+
+    .. code-block:: python
+
+        import ray
+
+        @ray.remote
+        def default_function():
+            return 1
+
+        # If unspecified, the "DEFAULT" scheduling strategy is used.
+        default_function.remote()
+
+        # Explicitly set the scheduling strategy to "DEFAULT".
+        default_function.options(scheduling_strategy="DEFAULT").remote()
+
+        @ray.remote(scheduling_strategy="SPREAD")
+        def spread_function():
+            return 2
+
+        # Spread tasks across the cluster.
+        [spread_function.remote() for i in range(100)]
+
+Locality-Aware Scheduling
+-------------------------
+When the scheduling strategy is "DEFAULT", Ray also prefers nodes that already hold large task arguments locally,
+to avoid transferring data over the network.
+If there are multiple large task arguments, the node with the most object bytes local is preferred.
+Note: locality-aware scheduling applies only to tasks, not to actors.
+
+.. tabbed:: Python
+
+    .. code-block:: python
+
+        import ray
+
+        @ray.remote
+        def large_object_function():
+            # The large object is stored in the local object store
+            # and made available in distributed memory,
+            # instead of being returned inline directly to the caller.
+            return [1] * (1024 * 1024)
+
+        @ray.remote
+        def small_object_function():
+            # The small object is returned inline directly to the caller,
+            # instead of being stored in distributed memory.
+            return [1]
+
+        @ray.remote
+        def consume_function(data):
+            return len(data)
+
+        # Ray will try to run consume_function on the same node
+        # where large_object_function runs.
+        consume_function.remote(large_object_function.remote())
+
+        # Ray won't consider locality when scheduling consume_function here,
+        # since the argument is small and is sent to the worker inline.
+        consume_function.remote(small_object_function.remote())
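+
+As noted above, when a task has several large arguments, the node holding the most argument bytes is preferred. A minimal sketch of that multi-argument case (``large_producer`` and ``combine_function`` are hypothetical names used for illustration):
+
+.. tabbed:: Python
+
+    .. code-block:: python
+
+        import ray
+
+        @ray.remote
+        def large_producer():
+            # Large object, stored in the object store rather than inlined.
+            return [1] * (1024 * 1024)
+
+        @ray.remote
+        def combine_function(a, b):
+            return len(a) + len(b)
+
+        # Both arguments are large, so Ray prefers the node that already
+        # holds the most total argument bytes locally.
+        combine_function.remote(large_producer.remote(),
+                                large_producer.remote())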