mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[JAVA] [Doc] Improve java doc for PG (#14671)
This commit is contained in:
parent
dcf41d868c
commit
374d166f6d
2 changed files with 512 additions and 95 deletions
|
@ -5,6 +5,8 @@ Placement Groups
|
|||
|
||||
Placement groups allow users to atomically reserve groups of resources across multiple nodes (i.e., gang scheduling). They can then be used to schedule Ray tasks and actors to be packed as close as possible for locality (PACK), or spread apart (SPREAD).
|
||||
|
||||
The Java demo code used in this documentation can be found `here <https://github.com/ray-project/ray/blob/master/java/test/src/main/java/io/ray/docdemo/PlacementGroupDemo.java>`__.
|
||||
|
||||
Here are some use cases:
|
||||
|
||||
- **Gang Scheduling**: Your application requires all tasks/actors to be scheduled and start at the same time.
|
||||
|
@ -35,40 +37,80 @@ A **placement group strategy** is an algorithm for selecting nodes for bundle pl
|
|||
Starting a placement group
|
||||
--------------------------
|
||||
|
||||
Ray placement group can be created via the ``ray.util.placement_group`` API. Placement groups take in a list of bundles and a :ref:`placement strategy <pgroup-strategy>`:
|
||||
A Ray placement group can be created via the ``ray.util.placement_group`` (Python) or ``PlacementGroups.createPlacementGroup`` (Java) API. Placement groups take in a list of bundles and a :ref:`placement strategy <pgroup-strategy>`:
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
# Import placement group APIs.
|
||||
from ray.util.placement_group import (
|
||||
placement_group,
|
||||
placement_group_table,
|
||||
remove_placement_group
|
||||
)
|
||||
.. code-block:: python
|
||||
|
||||
# Initialize Ray.
|
||||
import ray
|
||||
ray.init(num_gpus=2, resources={"extra_resource": 2})
|
||||
# Import placement group APIs.
|
||||
from ray.util.placement_group import (
|
||||
placement_group,
|
||||
placement_group_table,
|
||||
remove_placement_group
|
||||
)
|
||||
|
||||
bundle1 = {"GPU": 2}
|
||||
bundle2 = {"extra_resource": 2}
|
||||
# Initialize Ray.
|
||||
import ray
|
||||
ray.init(num_gpus=2, resources={"extra_resource": 2})
|
||||
|
||||
pg = placement_group([bundle1, bundle2], strategy="STRICT_PACK")
|
||||
bundle1 = {"GPU": 2}
|
||||
bundle2 = {"extra_resource": 2}
|
||||
|
||||
pg = placement_group([bundle1, bundle2], strategy="STRICT_PACK")
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
// Initialize Ray.
|
||||
Ray.init();
|
||||
|
||||
// Construct a list of bundles.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
// Make a creation option with bundles and strategy.
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
|
||||
.. important:: Each bundle must be able to fit on a single node on the Ray cluster.
|
||||
|
||||
Placement groups are atomically created - meaning that if there exists a bundle that cannot fit in any of the current nodes, then the entire placement group will not be ready.
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
# Wait until placement group is created.
|
||||
ray.get(pg.ready())
|
||||
.. code-block:: python
|
||||
|
||||
# You can also use ray.wait.
|
||||
ready, unready = ray.wait([pg.ready()], timeout=0)
|
||||
# Wait until placement group is created.
|
||||
ray.get(pg.ready())
|
||||
|
||||
# You can look at placement group states using this API.
|
||||
print(placement_group_table(pg))
|
||||
# You can also use ray.wait.
|
||||
ready, unready = ray.wait([pg.ready()], timeout=0)
|
||||
|
||||
# You can look at placement group states using this API.
|
||||
print(placement_group_table(pg))
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
// Wait for the placement group to be ready within the specified time(unit is seconds).
|
||||
boolean ready = pg.wait(60);
|
||||
Assert.assertTrue(ready);
|
||||
|
||||
// You can look at placement group states using this API.
|
||||
List<PlacementGroup> allPlacementGroup = PlacementGroups.getAllPlacementGroups();
|
||||
for (PlacementGroup group: allPlacementGroup) {
|
||||
System.out.println(group);
|
||||
}
|
||||
|
||||
Infeasible placement groups will be pending until resources are available. The Ray Autoscaler will be aware of placement groups, and auto-scale the cluster to ensure pending groups can be placed as needed.
|
||||
|
||||
|
@ -128,25 +170,72 @@ Let's create a placement group. Recall that each bundle is a collection of resou
|
|||
|
||||
Once the placement group reserves resources, original resources are unavailable until the placement group is removed. For example:
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
# Two "CPU"s are available.
|
||||
ray.init(num_cpus=2)
|
||||
.. code-block:: python
|
||||
|
||||
# Create a placement group.
|
||||
pg = placement_group([{"CPU": 2}])
|
||||
ray.get(pg.ready())
|
||||
# Two "CPU"s are available.
|
||||
ray.init(num_cpus=2)
|
||||
|
||||
# Now, 2 CPUs are not available anymore because they are pre-reserved by the placement group.
|
||||
@ray.remote(num_cpus=2)
|
||||
def f():
|
||||
return True
|
||||
# Create a placement group.
|
||||
pg = placement_group([{"CPU": 2}])
|
||||
ray.get(pg.ready())
|
||||
|
||||
# Won't be scheduled because there are no 2 cpus.
|
||||
f.remote()
|
||||
# Now, 2 CPUs are not available anymore because they are pre-reserved by the placement group.
|
||||
@ray.remote(num_cpus=2)
|
||||
def f():
|
||||
return True
|
||||
|
||||
# Will be scheduled because 2 cpus are reserved by the placement group.
|
||||
f.options(placement_group=pg).remote()
|
||||
# Won't be scheduled because there are no 2 cpus.
|
||||
f.remote()
|
||||
|
||||
# Will be scheduled because 2 cpus are reserved by the placement group.
|
||||
f.options(placement_group=pg).remote()
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
System.setProperty("ray.head-args.0", "--num-cpus=2");
|
||||
Ray.init();
|
||||
|
||||
public static class Counter {
|
||||
public static String ping() {
|
||||
return "pong";
|
||||
}
|
||||
}
|
||||
|
||||
// Construct a list of bundles.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 2.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
// Create a placement group and make sure its creation is successful.
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
boolean isCreated = pg.wait(60);
|
||||
Assert.assertTrue(isCreated);
|
||||
|
||||
// Won't be scheduled because there are no 2 cpus now.
|
||||
ObjectRef<String> obj = Ray.task(Counter::ping)
|
||||
.setResource("CPU", 2.0)
|
||||
.remote();
|
||||
|
||||
List<ObjectRef<String>> waitList = ImmutableList.of(obj);
|
||||
WaitResult<String> waitResult = Ray.wait(waitList, 1, 5 * 1000);
|
||||
Assert.assertEquals(1, waitResult.getUnready().size());
|
||||
|
||||
// Will be scheduled because 2 cpus are reserved by the placement group.
|
||||
obj = Ray.task(Counter::ping)
|
||||
.setPlacementGroup(pg, 0)
|
||||
.setResource("CPU", 2.0)
|
||||
.remote();
|
||||
Assert.assertEquals(obj.get(), "pong");
|
||||
|
||||
.. note::
|
||||
|
||||
|
@ -154,51 +243,118 @@ Let's create a placement group. Recall that each bundle is a collection of resou
|
|||
and have the proper resources. Ray assumes that the placement group will be properly created and does *not*
|
||||
print a warning about infeasible tasks.
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
gpu_bundle = {"GPU": 2}
|
||||
extra_resource_bundle = {"extra_resource": 2}
|
||||
.. code-block:: python
|
||||
|
||||
# Reserve bundles with strict pack strategy.
|
||||
# It means Ray will reserve 2 "GPU" and 2 "extra_resource" on the same node (strict pack) within a Ray cluster.
|
||||
# Using this placement group for scheduling actors or tasks will guarantee that they will
|
||||
# be colocated on the same node.
|
||||
pg = placement_group([gpu_bundle, extra_resource_bundle], strategy="STRICT_PACK")
|
||||
gpu_bundle = {"GPU": 2}
|
||||
extra_resource_bundle = {"extra_resource": 2}
|
||||
|
||||
# Wait until placement group is created.
|
||||
ray.get(pg.ready())
|
||||
# Reserve bundles with strict pack strategy.
|
||||
# It means Ray will reserve 2 "GPU" and 2 "extra_resource" on the same node (strict pack) within a Ray cluster.
|
||||
# Using this placement group for scheduling actors or tasks will guarantee that they will
|
||||
# be colocated on the same node.
|
||||
pg = placement_group([gpu_bundle, extra_resource_bundle], strategy="STRICT_PACK")
|
||||
|
||||
# Wait until placement group is created.
|
||||
ray.get(pg.ready())
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
Map<String, Double> bundle1 = ImmutableMap.of("GPU", 2.0);
|
||||
Map<String, Double> bundle2 = ImmutableMap.of("extra_resource", 2.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle1, bundle2);
|
||||
|
||||
/**
|
||||
* Reserve bundles with strict pack strategy.
|
||||
* It means Ray will reserve 2 "GPU" and 2 "extra_resource" on the same node (strict pack) within a Ray cluster.
|
||||
* Using this placement group for scheduling actors or tasks will guarantee that they will
|
||||
* be colocated on the same node.
|
||||
*/
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_PACK)
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
boolean isCreated = pg.wait(60);
|
||||
Assert.assertTrue(isCreated);
|
||||
|
||||
Now let's define an actor that uses GPU. We'll also define a task that uses ``extra_resource``.
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
@ray.remote(num_gpus=1)
|
||||
class GPUActor:
|
||||
def __init__(self):
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote(num_gpus=1)
|
||||
class GPUActor:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@ray.remote(resources={"extra_resource": 1})
|
||||
def extra_resource_task():
|
||||
import time
|
||||
# simulate long-running task.
|
||||
time.sleep(10)
|
||||
@ray.remote(resources={"extra_resource": 1})
|
||||
def extra_resource_task():
|
||||
import time
|
||||
# simulate long-running task.
|
||||
time.sleep(10)
|
||||
|
||||
# Create GPU actors on a gpu bundle.
|
||||
gpu_actors = [GPUActor.options(
|
||||
# Create GPU actors on a gpu bundle.
|
||||
gpu_actors = [GPUActor.options(
|
||||
placement_group=pg,
|
||||
# This is the index from the original list.
|
||||
# This index is set to -1 by default, which means any available bundle.
|
||||
placement_group_bundle_index=0) # Index of gpu_bundle is 0.
|
||||
.remote() for _ in range(2)]
|
||||
|
||||
# Create extra_resource actors on a extra_resource bundle.
|
||||
extra_resource_actors = [extra_resource_task.options(
|
||||
# Create extra_resource actors on a extra_resource bundle.
|
||||
extra_resource_actors = [extra_resource_task.options(
|
||||
placement_group=pg,
|
||||
# This is the index from the original list.
|
||||
# This index is set to -1 by default, which means any available bundle.
|
||||
placement_group_bundle_index=1) # Index of extra_resource_bundle is 1.
|
||||
.remote() for _ in range(2)]
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
public static class Counter {
|
||||
private int value;
|
||||
|
||||
public Counter(int initValue) {
|
||||
this.value = initValue;
|
||||
}
|
||||
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public static String ping() {
|
||||
return "pong";
|
||||
}
|
||||
}
|
||||
|
||||
// Create GPU actors on a gpu bundle.
|
||||
for (int index = 0; index < 2; index++) {
|
||||
Ray.actor(Counter::new, 1)
|
||||
.setResource("GPU", 1.0)
|
||||
.setPlacementGroup(pg, 0)
|
||||
.remote();
|
||||
}
|
||||
|
||||
// Create extra_resource actors on a extra_resource bundle.
|
||||
for (int index = 0; index < 2; index++) {
|
||||
Ray.task(Counter::ping)
|
||||
.setPlacementGroup(pg, 1)
|
||||
.setResource("extra_resource", 1.0)
|
||||
.remote().get();
|
||||
}
|
||||
|
||||
|
||||
Now, you can guarantee all gpu actors and extra_resource tasks are located on the same node
|
||||
because they are scheduled on a placement group with the STRICT_PACK strategy.
|
||||
|
||||
|
@ -207,56 +363,76 @@ because they are scheduled on a placement group with the STRICT_PACK strategy.
|
|||
In order to fully utilize resources pre-reserved by the placement group,
|
||||
Ray automatically schedules child tasks/actors to the same placement group as their parent.
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
# Create a placement group with the STRICT_SPREAD strategy.
|
||||
pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="STRICT_SPREAD")
|
||||
ray.get(pg.ready())
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
def child():
|
||||
pass
|
||||
# Create a placement group with the STRICT_SPREAD strategy.
|
||||
pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="STRICT_SPREAD")
|
||||
ray.get(pg.ready())
|
||||
|
||||
@ray.remote
|
||||
def parent():
|
||||
# The child task is scheduled with the same placement group as its parent
|
||||
# although child.options(placement_group=pg).remote() wasn't called.
|
||||
ray.get(child.remote())
|
||||
@ray.remote
|
||||
def child():
|
||||
pass
|
||||
|
||||
ray.get(parent.options(placement_group=pg).remote())
|
||||
@ray.remote
|
||||
def parent():
|
||||
# The child task is scheduled with the same placement group as its parent
|
||||
# although child.options(placement_group=pg).remote() wasn't called.
|
||||
ray.get(child.remote())
|
||||
|
||||
To avoid this behavior, specify ``options(placement_group=None)`` in the child task/actor remote call.
|
||||
ray.get(parent.options(placement_group=pg).remote())
|
||||
|
||||
.. code-block:: python
|
||||
To avoid this behavior, specify ``options(placement_group=None)`` in the child task/actor remote call.
|
||||
|
||||
@ray.remote
|
||||
def parent():
|
||||
# In this case, the child task won't be
|
||||
# scheduled with the parent's placement group.
|
||||
ray.get(child.options(placement_group=None).remote())
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
def parent():
|
||||
# In this case, the child task won't be
|
||||
# scheduled with the parent's placement group.
|
||||
ray.get(child.options(placement_group=None).remote())
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
Automatically scheduling child tasks/actors to the parent's placement group is not implemented for the Java API yet.
|
||||
|
||||
You can remove a placement group at any time to free its allocated resources.
|
||||
|
||||
.. code-block:: python
|
||||
.. tabs::
|
||||
.. group-tab:: Python
|
||||
|
||||
# This API is asynchronous.
|
||||
remove_placement_group(pg)
|
||||
.. code-block:: python
|
||||
|
||||
# Wait until placement group is killed.
|
||||
import time
|
||||
time.sleep(1)
|
||||
# Check the placement group has died.
|
||||
pprint(placement_group_table(pg))
|
||||
# This API is asynchronous.
|
||||
remove_placement_group(pg)
|
||||
|
||||
"""
|
||||
{'bundles': {0: {'GPU': 2.0}, 1: {'extra_resource': 2.0}},
|
||||
'name': 'unnamed_group',
|
||||
'placement_group_id': '40816b6ad474a6942b0edb45809b39c3',
|
||||
'state': 'REMOVED',
|
||||
'strategy': 'STRICT_PACK'}
|
||||
"""
|
||||
# Wait until placement group is killed.
|
||||
import time
|
||||
time.sleep(1)
|
||||
# Check the placement group has died.
|
||||
pprint(placement_group_table(pg))
|
||||
|
||||
"""
|
||||
{'bundles': {0: {'GPU': 2.0}, 1: {'extra_resource': 2.0}},
|
||||
'name': 'unnamed_group',
|
||||
'placement_group_id': '40816b6ad474a6942b0edb45809b39c3',
|
||||
'state': 'REMOVED',
|
||||
'strategy': 'STRICT_PACK'}
|
||||
"""
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
.. group-tab:: Java
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
PlacementGroups.removePlacementGroup(placementGroup.getId());
|
||||
|
||||
PlacementGroup removedPlacementGroup = PlacementGroups.getPlacementGroup(placementGroup.getId());
|
||||
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
Named Placement Groups
|
||||
----------------------
|
||||
|
@ -289,7 +465,52 @@ See :ref:`placement-group-lifetimes` for more details.
|
|||
|
||||
.. group-tab:: Java
|
||||
|
||||
The named placement group is not implemented for Java APIs yet.
|
||||
.. code-block:: java
|
||||
|
||||
// Create a placement group with a globally unique name.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.setGlobalName("global_name")
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
pg.wait(60);
|
||||
|
||||
...
|
||||
|
||||
// Retrieve the placement group later somewhere.
|
||||
PlacementGroup group = PlacementGroups.getGlobalPlacementGroup("global_name");
|
||||
Assert.assertNotNull(group);
|
||||
|
||||
We also support non-global named placement groups in Java, which means that the placement group name is only valid within the job and cannot be accessed from another job.
|
||||
|
||||
.. code-block:: java
|
||||
|
||||
// Create a placement group with a job-scope-unique name.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.setName("non_global_name")
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
pg.wait(60);
|
||||
|
||||
...
|
||||
|
||||
// Retrieve the placement group later somewhere in the same job.
|
||||
PlacementGroup group = PlacementGroups.getPlacementGroup("non_global_name");
|
||||
Assert.assertNotNull(group);
|
||||
|
||||
|
||||
.. _placement-group-lifetimes:
|
||||
|
||||
|
|
196
java/test/src/main/java/io/ray/docdemo/PlacementGroupDemo.java
Normal file
196
java/test/src/main/java/io/ray/docdemo/PlacementGroupDemo.java
Normal file
|
@ -0,0 +1,196 @@
|
|||
package io.ray.docdemo;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.ray.api.ObjectRef;
|
||||
import io.ray.api.PlacementGroups;
|
||||
import io.ray.api.Ray;
|
||||
import io.ray.api.WaitResult;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementGroupState;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.testng.Assert;
|
||||
|
||||
public class PlacementGroupDemo {
|
||||
|
||||
public static class Counter {
|
||||
private int value;
|
||||
|
||||
public Counter(int initValue) {
|
||||
this.value = initValue;
|
||||
}
|
||||
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public static String ping() {
|
||||
return "pong";
|
||||
}
|
||||
}
|
||||
|
||||
public static void createAndRemovePlacementGroup() {
|
||||
// Construct a list of bundles.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
// Make a creation option with bundles and strategy.
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
|
||||
// Wait for the placement group to be ready within the specified time(unit is seconds).
|
||||
boolean ready = pg.wait(60);
|
||||
Assert.assertTrue(ready);
|
||||
|
||||
// You can look at placement group states using this API.
|
||||
List<PlacementGroup> allPlacementGroup = PlacementGroups.getAllPlacementGroups();
|
||||
for (PlacementGroup group : allPlacementGroup) {
|
||||
System.out.println(group);
|
||||
}
|
||||
|
||||
PlacementGroups.removePlacementGroup(pg.getId());
|
||||
|
||||
PlacementGroup removedPlacementGroup = PlacementGroups.getPlacementGroup(pg.getId());
|
||||
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
|
||||
}
|
||||
|
||||
public static void runNormalTaskWithPlacementGroup() {
|
||||
// Construct a list of bundles.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 2.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
// Create a placement group and make sure its creation is successful.
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
boolean isCreated = pg.wait(60);
|
||||
Assert.assertTrue(isCreated);
|
||||
|
||||
// Won't be scheduled because there are no 2 cpus now.
|
||||
ObjectRef<String> obj = Ray.task(Counter::ping).setResource("CPU", 2.0).remote();
|
||||
|
||||
List<ObjectRef<String>> waitList = ImmutableList.of(obj);
|
||||
WaitResult<String> waitResult = Ray.wait(waitList, 1, 5 * 1000);
|
||||
Assert.assertEquals(1, waitResult.getUnready().size());
|
||||
|
||||
// Will be scheduled because 2 cpus are reserved by the placement group.
|
||||
obj = Ray.task(Counter::ping).setPlacementGroup(pg, 0).setResource("CPU", 2.0).remote();
|
||||
Assert.assertEquals(obj.get(), "pong");
|
||||
|
||||
PlacementGroups.removePlacementGroup(pg.getId());
|
||||
|
||||
PlacementGroup removedPlacementGroup = PlacementGroups.getPlacementGroup(pg.getId());
|
||||
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
|
||||
}
|
||||
|
||||
public static void createGlobalNamedPlacementGroup() {
|
||||
// Create a placement group with a globally unique name.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.setGlobalName("global_name")
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
pg.wait(60);
|
||||
|
||||
// Retrieve the placement group later somewhere.
|
||||
PlacementGroup group = PlacementGroups.getGlobalPlacementGroup("global_name");
|
||||
Assert.assertNotNull(group);
|
||||
|
||||
PlacementGroups.removePlacementGroup(pg.getId());
|
||||
|
||||
PlacementGroup removedPlacementGroup = PlacementGroups.getPlacementGroup(pg.getId());
|
||||
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
|
||||
}
|
||||
|
||||
public static void createNonGlobalNamedPlacementGroup() {
|
||||
// Create a placement group with a job-scope-unique name.
|
||||
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
|
||||
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_SPREAD)
|
||||
.setName("non_global_name")
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
pg.wait(60);
|
||||
|
||||
// Retrieve the placement group later somewhere in the same job.
|
||||
PlacementGroup group = PlacementGroups.getPlacementGroup("non_global_name");
|
||||
Assert.assertNotNull(group);
|
||||
|
||||
PlacementGroups.removePlacementGroup(pg.getId());
|
||||
|
||||
PlacementGroup removedPlacementGroup = PlacementGroups.getPlacementGroup(pg.getId());
|
||||
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
|
||||
}
|
||||
|
||||
public static void strictPackExample() {
|
||||
Map<String, Double> bundle1 = ImmutableMap.of("GPU", 2.0);
|
||||
Map<String, Double> bundle2 = ImmutableMap.of("extra_resource", 2.0);
|
||||
List<Map<String, Double>> bundles = ImmutableList.of(bundle1, bundle2);
|
||||
|
||||
PlacementGroupCreationOptions options =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setBundles(bundles)
|
||||
.setStrategy(PlacementStrategy.STRICT_PACK)
|
||||
.build();
|
||||
|
||||
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
|
||||
boolean isCreated = pg.wait(60);
|
||||
Assert.assertTrue(isCreated);
|
||||
|
||||
// Create GPU actors on a gpu bundle.
|
||||
for (int index = 0; index < 2; index++) {
|
||||
Ray.actor(Counter::new, 1).setResource("GPU", 1.0).setPlacementGroup(pg, 0).remote();
|
||||
}
|
||||
|
||||
// Create extra_resource actors on a extra_resource bundle.
|
||||
for (int index = 0; index < 2; index++) {
|
||||
Ray.task(Counter::ping)
|
||||
.setPlacementGroup(pg, 1)
|
||||
.setResource("extra_resource", 1.0)
|
||||
.remote()
|
||||
.get();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
// Start Ray runtime. If you're connecting to an existing cluster, you can set
|
||||
// the `-Dray.address=<cluster-address>` java system property.
|
||||
System.setProperty("ray.head-args.0", "--resources={\"extra_resource\":2.0}");
|
||||
System.setProperty("ray.head-args.1", "--num-cpus=2");
|
||||
System.setProperty("ray.head-args.2", "--num-gpus=2");
|
||||
Ray.init();
|
||||
|
||||
createAndRemovePlacementGroup();
|
||||
|
||||
runNormalTaskWithPlacementGroup();
|
||||
|
||||
createGlobalNamedPlacementGroup();
|
||||
|
||||
createNonGlobalNamedPlacementGroup();
|
||||
|
||||
strictPackExample();
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue