mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[Placement Group] Support named placement group java api & Refactor construct method (#13821)
This commit is contained in:
parent
3d20d58c90
commit
7647d60fa9
18 changed files with 439 additions and 79 deletions
|
@ -2,6 +2,7 @@ package io.ray.api;
|
|||
|
||||
import io.ray.api.id.PlacementGroupId;
|
||||
import io.ray.api.id.UniqueId;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.api.runtime.RayRuntime;
|
||||
|
@ -256,22 +257,50 @@ public final class Ray extends RayCall {
|
|||
* Create a placement group. A placement group is used to place actors according to a specific
|
||||
* strategy and resource constraints. It will sends a request to GCS to preallocate the specified
|
||||
* resources, which is asynchronous. If the specified resource cannot be allocated, it will wait
|
||||
* for the resource to be updated and rescheduled. This function only works when gcs actor manager
|
||||
* is turned on.
|
||||
* for the resource to be updated and rescheduled.
|
||||
*
|
||||
* @deprecated This method is no longer recommended to create a new placement group, use {@link
|
||||
* Ray#createPlacementGroup(PlacementGroupCreationOptions)} instead.
|
||||
* @param name Name of the placement group.
|
||||
* @param bundles Pre-allocated resource list.
|
||||
* @param strategy Actor placement strategy.
|
||||
* @return A handle to the created placement group.
|
||||
*/
|
||||
@Deprecated
|
||||
public static PlacementGroup createPlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
return internal().createPlacementGroup(name, bundles, strategy);
|
||||
PlacementGroupCreationOptions creationOptions =
|
||||
new PlacementGroupCreationOptions.Builder()
|
||||
.setName(name)
|
||||
.setBundles(bundles)
|
||||
.setStrategy(strategy)
|
||||
.build();
|
||||
return createPlacementGroup(creationOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a placement group with an empty name.
|
||||
*
|
||||
* @deprecated This method is no longer recommended to create a new placement group, use {@link
|
||||
* Ray#createPlacementGroup(PlacementGroupCreationOptions)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static PlacementGroup createPlacementGroup(
|
||||
List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
return internal().createPlacementGroup(bundles, strategy);
|
||||
return createPlacementGroup(null, bundles, strategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a placement group. A placement group is used to place actors according to a specific
|
||||
* strategy and resource constraints. It will sends a request to GCS to preallocate the specified
|
||||
* resources, which is asynchronous. If the specified resource cannot be allocated, it will wait
|
||||
* for the resource to be updated and rescheduled.
|
||||
*
|
||||
* @param creationOptions Creation options of the placement group.
|
||||
* @return A handle to the created placement group.
|
||||
*/
|
||||
public static PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions) {
|
||||
return internal().createPlacementGroup(creationOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -296,6 +325,26 @@ public final class Ray extends RayCall {
|
|||
return internal().getPlacementGroup(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a placement group by placement group name from current job.
|
||||
*
|
||||
* @param name The placement group name.
|
||||
* @return The placement group.
|
||||
*/
|
||||
public static PlacementGroup getPlacementGroup(String name) {
|
||||
return internal().getPlacementGroup(name, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a placement group by placement group name from all jobs.
|
||||
*
|
||||
* @param name The placement group name.
|
||||
* @return The placement group.
|
||||
*/
|
||||
public static PlacementGroup getGlobalPlacementGroup(String name) {
|
||||
return internal().getPlacementGroup(name, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all placement groups in this cluster.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
package io.ray.api.options;
|
||||
|
||||
import io.ray.api.Ray;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/** The options for creating placement group. */
|
||||
public class PlacementGroupCreationOptions {
|
||||
public final boolean global;
|
||||
public final String name;
|
||||
public final List<Map<String, Double>> bundles;
|
||||
public final PlacementStrategy strategy;
|
||||
|
||||
public PlacementGroupCreationOptions(
|
||||
boolean global, String name, List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
if (bundles == null || bundles.isEmpty()) {
|
||||
throw new IllegalArgumentException(
|
||||
"`Bundles` must be specified when creating a new placement group.");
|
||||
}
|
||||
boolean bundleResourceValid =
|
||||
bundles.stream()
|
||||
.allMatch(bundle -> bundle.values().stream().allMatch(resource -> resource > 0));
|
||||
|
||||
if (!bundleResourceValid) {
|
||||
throw new IllegalArgumentException(
|
||||
"Bundles cannot be empty or bundle's resource must be positive.");
|
||||
}
|
||||
if (strategy == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"`PlacementStrategy` must be specified when creating a new placement group.");
|
||||
}
|
||||
this.global = global;
|
||||
this.name = name;
|
||||
this.bundles = bundles;
|
||||
this.strategy = strategy;
|
||||
}
|
||||
|
||||
/** The inner class for building PlacementGroupCreationOptions. */
|
||||
public static class Builder {
|
||||
private boolean global;
|
||||
private String name;
|
||||
private List<Map<String, Double>> bundles;
|
||||
private PlacementStrategy strategy;
|
||||
|
||||
/**
|
||||
* Set the name of a named placement group. This named placement group is only accessible from
|
||||
* this job by this name via {@link Ray#getPlacementGroup(java.lang.String)}. If you want to
|
||||
* create a named placement group that is accessible from all jobs, use {@link
|
||||
* Builder#setGlobalName(java.lang.String)} instead.
|
||||
*
|
||||
* @param name The name of the named placement group.
|
||||
* @return self
|
||||
*/
|
||||
public Builder setName(String name) {
|
||||
if (this.name != null) {
|
||||
throw new IllegalArgumentException("Repeated assignment of the name is not allowed!");
|
||||
}
|
||||
this.name = name;
|
||||
this.global = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the name of a named placement group. This placement group can be accessed by all jobs
|
||||
* with this name via {@link Ray#getGlobalPlacementGroup(java.lang.String)}. If you want to
|
||||
* create a named placement group that is only accessible from this job, use {@link
|
||||
* Builder#setName(java.lang.String)} instead.
|
||||
*
|
||||
* @param name The name of the named placement group.
|
||||
* @return self
|
||||
*/
|
||||
public Builder setGlobalName(String name) {
|
||||
if (this.name != null) {
|
||||
throw new IllegalArgumentException("Repeated assignment of the name is not allowed!");
|
||||
}
|
||||
this.name = name;
|
||||
this.global = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the Pre-allocated resource list. Bundle is a collection of resources used to reserve
|
||||
* resources on the raylet side.
|
||||
*
|
||||
* @param bundles The Pre-allocated resource list.
|
||||
* @return self
|
||||
*/
|
||||
public Builder setBundles(List<Map<String, Double>> bundles) {
|
||||
this.bundles = bundles;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the placement strategy used to control the placement relationship between bundles. More
|
||||
* details refer to {@link PlacementStrategy}
|
||||
*
|
||||
* @param strategy The placement strategy between bundles.
|
||||
* @return self
|
||||
*/
|
||||
public Builder setStrategy(PlacementStrategy strategy) {
|
||||
this.strategy = strategy;
|
||||
return this;
|
||||
}
|
||||
|
||||
public PlacementGroupCreationOptions build() {
|
||||
return new PlacementGroupCreationOptions(global, name, bundles, strategy);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,11 +14,10 @@ import io.ray.api.id.PlacementGroupId;
|
|||
import io.ray.api.id.UniqueId;
|
||||
import io.ray.api.options.ActorCreationOptions;
|
||||
import io.ray.api.options.CallOptions;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.api.runtimecontext.RuntimeContext;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
|
@ -169,11 +168,13 @@ public interface RayRuntime {
|
|||
*/
|
||||
PyActorHandle createActor(PyActorClass pyActorClass, Object[] args, ActorCreationOptions options);
|
||||
|
||||
PlacementGroup createPlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, PlacementStrategy strategy);
|
||||
|
||||
PlacementGroup createPlacementGroup(
|
||||
List<Map<String, Double>> bundles, PlacementStrategy strategy);
|
||||
/**
|
||||
* Create a placement group on remote nodes.
|
||||
*
|
||||
* @param creationOptions Creation options of the placement group.
|
||||
* @return A handle to the created placement group.
|
||||
*/
|
||||
PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions);
|
||||
|
||||
RuntimeContext getRuntimeContext();
|
||||
|
||||
|
@ -208,6 +209,15 @@ public interface RayRuntime {
|
|||
*/
|
||||
PlacementGroup getPlacementGroup(PlacementGroupId id);
|
||||
|
||||
/**
|
||||
* Get a placement group by name.
|
||||
*
|
||||
* @param name The name of the placement group.
|
||||
* @param global Whether the named placement group is global.
|
||||
* @return The placement group.
|
||||
*/
|
||||
PlacementGroup getPlacementGroup(String name, boolean global);
|
||||
|
||||
/**
|
||||
* Get all placement groups in this cluster.
|
||||
*
|
||||
|
|
|
@ -17,8 +17,8 @@ import io.ray.api.id.ObjectId;
|
|||
import io.ray.api.id.PlacementGroupId;
|
||||
import io.ray.api.options.ActorCreationOptions;
|
||||
import io.ray.api.options.CallOptions;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.api.runtimecontext.RuntimeContext;
|
||||
import io.ray.runtime.config.RayConfig;
|
||||
import io.ray.runtime.context.RuntimeContextImpl;
|
||||
|
@ -38,7 +38,6 @@ import io.ray.runtime.task.TaskExecutor;
|
|||
import io.ray.runtime.task.TaskSubmitter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -50,7 +49,6 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRayRuntime.class);
|
||||
public static final String PYTHON_INIT_METHOD_NAME = "__init__";
|
||||
private static final String DEFAULT_PLACEMENT_GROUP_NAME = "unnamed_group";
|
||||
protected RayConfig rayConfig;
|
||||
protected TaskExecutor taskExecutor;
|
||||
protected FunctionManager functionManager;
|
||||
|
@ -165,23 +163,11 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PlacementGroup createPlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
boolean bundleResourceValid =
|
||||
bundles.stream()
|
||||
.allMatch(bundle -> bundle.values().stream().allMatch(resource -> resource > 0));
|
||||
|
||||
if (bundles.isEmpty() || !bundleResourceValid) {
|
||||
throw new IllegalArgumentException(
|
||||
"Bundles cannot be empty or bundle's resource must be positive.");
|
||||
}
|
||||
return taskSubmitter.createPlacementGroup(name, bundles, strategy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlacementGroup createPlacementGroup(
|
||||
List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
return createPlacementGroup(DEFAULT_PLACEMENT_GROUP_NAME, bundles, strategy);
|
||||
public PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions) {
|
||||
Preconditions.checkNotNull(
|
||||
creationOptions,
|
||||
"`PlacementGroupCreationOptions` must be specified when creating a new placement group.");
|
||||
return taskSubmitter.createPlacementGroup(creationOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -194,6 +180,11 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
|
|||
return gcsClient.getPlacementGroupInfo(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlacementGroup getPlacementGroup(String name, boolean global) {
|
||||
return gcsClient.getPlacementGroupInfo(name, global);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PlacementGroup> getAllPlacementGroups() {
|
||||
return gcsClient.getAllPlacementGroupInfo();
|
||||
|
|
|
@ -43,6 +43,18 @@ public class GcsClient {
|
|||
return PlacementGroupUtils.generatePlacementGroupFromByteArray(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a placement group by name.
|
||||
*
|
||||
* @param name Name of the placement group.
|
||||
* @param global Whether the named placement group is global.
|
||||
* @return The placement group.
|
||||
*/
|
||||
public PlacementGroup getPlacementGroupInfo(String name, boolean global) {
|
||||
byte[] result = globalStateAccessor.getPlacementGroupInfo(name, global);
|
||||
return result == null ? null : PlacementGroupUtils.generatePlacementGroupFromByteArray(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all placement groups in this cluster.
|
||||
*
|
||||
|
|
|
@ -83,6 +83,13 @@ public class GlobalStateAccessor {
|
|||
}
|
||||
}
|
||||
|
||||
public byte[] getPlacementGroupInfo(String name, boolean global) {
|
||||
synchronized (GlobalStateAccessor.class) {
|
||||
validateGlobalStateAccessorPointer();
|
||||
return nativeGetPlacementGroupInfoByName(globalStateAccessorNativePointer, name, global);
|
||||
}
|
||||
}
|
||||
|
||||
public List<byte[]> getAllPlacementGroupInfo() {
|
||||
synchronized (GlobalStateAccessor.class) {
|
||||
validateGlobalStateAccessorPointer();
|
||||
|
@ -136,5 +143,8 @@ public class GlobalStateAccessor {
|
|||
|
||||
private native byte[] nativeGetPlacementGroupInfo(long nativePtr, byte[] placementGroupId);
|
||||
|
||||
private native byte[] nativeGetPlacementGroupInfoByName(
|
||||
long nativePtr, String name, boolean global);
|
||||
|
||||
private native List<byte[]> nativeGetAllPlacementGroupInfo(long nativePtr);
|
||||
}
|
||||
|
|
|
@ -13,8 +13,8 @@ import io.ray.api.id.TaskId;
|
|||
import io.ray.api.id.UniqueId;
|
||||
import io.ray.api.options.ActorCreationOptions;
|
||||
import io.ray.api.options.CallOptions;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.runtime.RayRuntimeInternal;
|
||||
import io.ray.runtime.actor.LocalModeActorHandle;
|
||||
import io.ray.runtime.context.LocalModeWorkerContext;
|
||||
|
@ -250,14 +250,13 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PlacementGroup createPlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
public PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions) {
|
||||
PlacementGroupImpl placementGroup =
|
||||
new PlacementGroupImpl.Builder()
|
||||
.setId(PlacementGroupId.fromRandom())
|
||||
.setName(name)
|
||||
.setBundles(bundles)
|
||||
.setStrategy(strategy)
|
||||
.setName(creationOptions.name)
|
||||
.setBundles(creationOptions.bundles)
|
||||
.setStrategy(creationOptions.strategy)
|
||||
.build();
|
||||
placementGroups.put(placementGroup.getId(), placementGroup);
|
||||
return placementGroup;
|
||||
|
|
|
@ -9,13 +9,12 @@ import io.ray.api.id.ObjectId;
|
|||
import io.ray.api.id.PlacementGroupId;
|
||||
import io.ray.api.options.ActorCreationOptions;
|
||||
import io.ray.api.options.CallOptions;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.runtime.actor.NativeActorHandle;
|
||||
import io.ray.runtime.functionmanager.FunctionDescriptor;
|
||||
import io.ray.runtime.placementgroup.PlacementGroupImpl;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -90,14 +89,23 @@ public class NativeTaskSubmitter implements TaskSubmitter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PlacementGroup createPlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, PlacementStrategy strategy) {
|
||||
byte[] bytes = nativeCreatePlacementGroup(name, bundles, strategy.value());
|
||||
public PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions) {
|
||||
if (StringUtils.isNotBlank(creationOptions.name)) {
|
||||
PlacementGroup placementGroup =
|
||||
creationOptions.global
|
||||
? Ray.getGlobalPlacementGroup(creationOptions.name)
|
||||
: Ray.getPlacementGroup(creationOptions.name);
|
||||
|
||||
Preconditions.checkArgument(
|
||||
placementGroup == null,
|
||||
String.format("Placement group with name %s exists!", creationOptions.name));
|
||||
}
|
||||
byte[] bytes = nativeCreatePlacementGroup(creationOptions);
|
||||
return new PlacementGroupImpl.Builder()
|
||||
.setId(PlacementGroupId.fromBytes(bytes))
|
||||
.setName(name)
|
||||
.setBundles(bundles)
|
||||
.setStrategy(strategy)
|
||||
.setName(creationOptions.name)
|
||||
.setBundles(creationOptions.bundles)
|
||||
.setStrategy(creationOptions.strategy)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -133,7 +141,7 @@ public class NativeTaskSubmitter implements TaskSubmitter {
|
|||
CallOptions callOptions);
|
||||
|
||||
private static native byte[] nativeCreatePlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, int strategy);
|
||||
PlacementGroupCreationOptions creationOptions);
|
||||
|
||||
private static native void nativeRemovePlacementGroup(byte[] placementGroupId);
|
||||
|
||||
|
|
|
@ -6,11 +6,10 @@ import io.ray.api.id.ObjectId;
|
|||
import io.ray.api.id.PlacementGroupId;
|
||||
import io.ray.api.options.ActorCreationOptions;
|
||||
import io.ray.api.options.CallOptions;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.runtime.functionmanager.FunctionDescriptor;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/** A set of methods to submit tasks and create actors. */
|
||||
public interface TaskSubmitter {
|
||||
|
@ -63,13 +62,10 @@ public interface TaskSubmitter {
|
|||
/**
|
||||
* Create a placement group.
|
||||
*
|
||||
* @param name Name of the placement group.
|
||||
* @param bundles Pre-allocated resource list.
|
||||
* @param strategy Actor placement strategy.
|
||||
* @param creationOptions Creation options of the placement group.
|
||||
* @return A handle to the created placement group.
|
||||
*/
|
||||
PlacementGroup createPlacementGroup(
|
||||
String name, List<Map<String, Double>> bundles, PlacementStrategy strategy);
|
||||
PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions);
|
||||
|
||||
/**
|
||||
* Remove a placement group by id.
|
||||
|
|
|
@ -7,7 +7,10 @@ import io.ray.api.placementgroup.PlacementGroup;
|
|||
import io.ray.api.placementgroup.PlacementGroupState;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import io.ray.runtime.exception.RayException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
|
@ -37,7 +40,10 @@ public class PlacementGroupTest extends BaseTest {
|
|||
|
||||
// Test creating an actor from a constructor.
|
||||
ActorHandle<Counter> actor =
|
||||
Ray.actor(Counter::new, 1).setPlacementGroup(placementGroup, 0).remote();
|
||||
Ray.actor(Counter::new, 1)
|
||||
.setResource("CPU", 1.0)
|
||||
.setPlacementGroup(placementGroup, 0)
|
||||
.remote();
|
||||
Assert.assertNotEquals(actor.getId(), ActorId.NIL);
|
||||
|
||||
// Test calling an actor.
|
||||
|
@ -48,11 +54,11 @@ public class PlacementGroupTest extends BaseTest {
|
|||
public void testGetPlacementGroup() {
|
||||
PlacementGroup firstPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "first_placement_group");
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "first_placement_group", false);
|
||||
|
||||
PlacementGroup secondPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "second_placement_group");
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "second_placement_group", false);
|
||||
Assert.assertTrue(firstPlacementGroup.wait(60));
|
||||
Assert.assertTrue(secondPlacementGroup.wait(60));
|
||||
|
||||
|
@ -63,7 +69,6 @@ public class PlacementGroupTest extends BaseTest {
|
|||
Assert.assertNotNull(secondPlacementGroupRes);
|
||||
|
||||
Assert.assertEquals(firstPlacementGroup.getId(), firstPlacementGroupRes.getId());
|
||||
Assert.assertEquals(firstPlacementGroup.getName(), firstPlacementGroupRes.getName());
|
||||
Assert.assertEquals(firstPlacementGroupRes.getBundles().size(), 1);
|
||||
Assert.assertEquals(firstPlacementGroupRes.getStrategy(), PlacementStrategy.PACK);
|
||||
|
||||
|
@ -77,7 +82,6 @@ public class PlacementGroupTest extends BaseTest {
|
|||
? firstPlacementGroup
|
||||
: secondPlacementGroup;
|
||||
|
||||
Assert.assertEquals(placementGroupRes.getName(), expectPlacementGroup.getName());
|
||||
Assert.assertEquals(
|
||||
placementGroupRes.getBundles().size(), expectPlacementGroup.getBundles().size());
|
||||
Assert.assertEquals(placementGroupRes.getStrategy(), expectPlacementGroup.getStrategy());
|
||||
|
@ -87,11 +91,11 @@ public class PlacementGroupTest extends BaseTest {
|
|||
public void testRemovePlacementGroup() {
|
||||
PlacementGroup firstPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "first_placement_group");
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "first_placement_group", false);
|
||||
|
||||
PlacementGroup secondPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "second_placement_group");
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, "second_placement_group", false);
|
||||
Assert.assertTrue(firstPlacementGroup.wait(60));
|
||||
Assert.assertTrue(secondPlacementGroup.wait(60));
|
||||
|
||||
|
@ -113,6 +117,7 @@ public class PlacementGroupTest extends BaseTest {
|
|||
Assert.assertEquals(exceptionCount, 1);
|
||||
}
|
||||
|
||||
@Test(groups = {"cluster"})
|
||||
public void testCheckBundleIndex() {
|
||||
PlacementGroup placementGroup = PlacementGroupTestUtils.createSimpleGroup();
|
||||
Assert.assertTrue(placementGroup.wait(60));
|
||||
|
@ -142,4 +147,81 @@ public class PlacementGroupTest extends BaseTest {
|
|||
public void testBundleResourceValidCheckWhenCreate() {
|
||||
PlacementGroupTestUtils.createBundleResourceInvalidGroup();
|
||||
}
|
||||
|
||||
@Test(groups = {"cluster"})
|
||||
public void testNamedPlacementGroup() {
|
||||
// Test Non-Global placement group.
|
||||
String pgName = "named_placement_group";
|
||||
PlacementGroup firstPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, pgName, false);
|
||||
Assert.assertTrue(firstPlacementGroup.wait(60));
|
||||
// Make sure we can get it by name successfully.
|
||||
PlacementGroup placementGroup = Ray.getPlacementGroup(pgName);
|
||||
Assert.assertNotNull(placementGroup);
|
||||
Assert.assertEquals(placementGroup.getBundles().size(), 1);
|
||||
|
||||
// Test global placement group.
|
||||
String pgGlobalName = "global_placement_group";
|
||||
PlacementGroup secondPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, pgGlobalName, true);
|
||||
Assert.assertTrue(secondPlacementGroup.wait(60));
|
||||
// Make sure we can get it by name successfully.
|
||||
placementGroup = Ray.getGlobalPlacementGroup(pgGlobalName);
|
||||
Assert.assertNotNull(placementGroup);
|
||||
Assert.assertEquals(placementGroup.getBundles().size(), 1);
|
||||
}
|
||||
|
||||
@Test(groups = {"cluster"})
|
||||
public void testCreatePlacementGroupWithSameName() {
|
||||
String pgName = "named_placement_group";
|
||||
PlacementGroup firstPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, pgName, false);
|
||||
Assert.assertTrue(firstPlacementGroup.wait(60));
|
||||
int exceptionCount = 0;
|
||||
try {
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, pgName, false);
|
||||
} catch (IllegalArgumentException e) {
|
||||
++exceptionCount;
|
||||
}
|
||||
Assert.assertEquals(exceptionCount, 1);
|
||||
}
|
||||
|
||||
@Test(groups = {"cluster"})
|
||||
public void testCreateGlobalPlacementGroupWithSameName() {
|
||||
String pgGlobalName = "global_placement_group";
|
||||
PlacementGroup firstPlacementGroup =
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, pgGlobalName, true);
|
||||
Assert.assertTrue(firstPlacementGroup.wait(60));
|
||||
int exceptionCount = 0;
|
||||
try {
|
||||
PlacementGroupTestUtils.createNameSpecifiedSimpleGroup(
|
||||
"CPU", 1, PlacementStrategy.PACK, 1.0, pgGlobalName, true);
|
||||
} catch (IllegalArgumentException e) {
|
||||
++exceptionCount;
|
||||
}
|
||||
Assert.assertEquals(exceptionCount, 1);
|
||||
}
|
||||
|
||||
@Test(groups = {"cluster"})
|
||||
public void testCompatibleForPreviousApi() {
|
||||
String pgName = "named_placement_group";
|
||||
List<Map<String, Double>> bundles = new ArrayList<>();
|
||||
for (int i = 0; i < 1; i++) {
|
||||
Map<String, Double> bundle = new HashMap<>();
|
||||
bundle.put("CPU", 1.0);
|
||||
bundles.add(bundle);
|
||||
}
|
||||
PlacementGroup placementGroup =
|
||||
Ray.createPlacementGroup(pgName, bundles, PlacementStrategy.PACK);
|
||||
Assert.assertTrue(placementGroup.wait(60));
|
||||
// Make sure we can get it by name successfully.
|
||||
PlacementGroup resPlacementGroup = Ray.getPlacementGroup(pgName);
|
||||
Assert.assertNotNull(resPlacementGroup);
|
||||
Assert.assertEquals(resPlacementGroup.getBundles().size(), 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package io.ray.test;
|
||||
|
||||
import io.ray.api.Ray;
|
||||
import io.ray.api.options.PlacementGroupCreationOptions;
|
||||
import io.ray.api.placementgroup.PlacementGroup;
|
||||
import io.ray.api.placementgroup.PlacementStrategy;
|
||||
import java.util.ArrayList;
|
||||
|
@ -16,7 +17,8 @@ public class PlacementGroupTestUtils {
|
|||
int bundleSize,
|
||||
PlacementStrategy strategy,
|
||||
Double resourceSize,
|
||||
String groupName) {
|
||||
String groupName,
|
||||
boolean isGlobal) {
|
||||
List<Map<String, Double>> bundles = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < bundleSize; i++) {
|
||||
|
@ -24,25 +26,36 @@ public class PlacementGroupTestUtils {
|
|||
bundle.put(resourceName, resourceSize);
|
||||
bundles.add(bundle);
|
||||
}
|
||||
PlacementGroupCreationOptions.Builder builder =
|
||||
new PlacementGroupCreationOptions.Builder().setBundles(bundles).setStrategy(strategy);
|
||||
if (isGlobal) {
|
||||
builder.setGlobalName(groupName);
|
||||
} else {
|
||||
builder.setName(groupName);
|
||||
}
|
||||
|
||||
return Ray.createPlacementGroup(groupName, bundles, strategy);
|
||||
return Ray.createPlacementGroup(builder.build());
|
||||
}
|
||||
|
||||
public static PlacementGroup createSpecifiedSimpleGroup(
|
||||
String resourceName, int bundleSize, PlacementStrategy strategy, Double resourceSize) {
|
||||
String resourceName,
|
||||
int bundleSize,
|
||||
PlacementStrategy strategy,
|
||||
Double resourceSize,
|
||||
boolean isGlobal) {
|
||||
return createNameSpecifiedSimpleGroup(
|
||||
resourceName, bundleSize, strategy, resourceSize, "unnamed_group");
|
||||
resourceName, bundleSize, strategy, resourceSize, "unnamed_group", isGlobal);
|
||||
}
|
||||
|
||||
public static PlacementGroup createSimpleGroup() {
|
||||
return createSpecifiedSimpleGroup("CPU", 1, PlacementStrategy.PACK, 1.0);
|
||||
return createSpecifiedSimpleGroup("CPU", 1, PlacementStrategy.PACK, 1.0, false);
|
||||
}
|
||||
|
||||
public static void createBundleSizeInvalidGroup() {
|
||||
createSpecifiedSimpleGroup("CPU", 0, PlacementStrategy.PACK, 1.0);
|
||||
createSpecifiedSimpleGroup("CPU", 0, PlacementStrategy.PACK, 1.0, false);
|
||||
}
|
||||
|
||||
public static void createBundleResourceInvalidGroup() {
|
||||
createSpecifiedSimpleGroup("CPU", 1, PlacementStrategy.PACK, 0.0);
|
||||
createSpecifiedSimpleGroup("CPU", 1, PlacementStrategy.PACK, 0.0, false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -268,7 +268,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetActorIdOfNamedActor(JNIEnv *env, j
|
|||
jstring actor_name,
|
||||
jboolean global) {
|
||||
const char *native_actor_name = env->GetStringUTFChars(actor_name, JNI_FALSE);
|
||||
auto full_name = GetActorFullName(global, native_actor_name);
|
||||
auto full_name = GetFullName(global, native_actor_name);
|
||||
|
||||
const auto actor_handle =
|
||||
ray::CoreWorkerProcess::GetCoreWorker().GetNamedActorHandle(full_name).first;
|
||||
|
|
|
@ -121,6 +121,20 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfo(
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
JNIEXPORT jbyteArray JNICALL
|
||||
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfoByName(
|
||||
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jstring name, jboolean global) {
|
||||
std::string placement_group_name = JavaStringToNativeString(env, name);
|
||||
auto full_name = GetFullName(global, placement_group_name);
|
||||
auto *gcs_accessor =
|
||||
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
|
||||
auto placement_group = gcs_accessor->GetPlacementGroupByName(full_name);
|
||||
if (placement_group) {
|
||||
return NativeStringToJavaByteArray(env, *placement_group);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
JNIEXPORT jobject JNICALL
|
||||
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllPlacementGroupInfo(
|
||||
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
|
||||
|
|
|
@ -104,6 +104,15 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfo(JNIEnv *
|
|||
jlong,
|
||||
jbyteArray);
|
||||
|
||||
/*
|
||||
* Class: io_ray_runtime_gcs_GlobalStateAccessor
|
||||
* Method: nativeGetPlacementGroupInfoByName
|
||||
* Signature: (JLjava/lang/String;Z)[B
|
||||
*/
|
||||
JNIEXPORT jbyteArray JNICALL
|
||||
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetPlacementGroupInfoByName(
|
||||
JNIEnv *, jobject, jlong, jstring, jboolean);
|
||||
|
||||
/*
|
||||
* Class: io_ray_runtime_gcs_GlobalStateAccessor
|
||||
* Method: nativeGetAllPlacementGroupInfo
|
||||
|
|
|
@ -156,7 +156,7 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
|
|||
}
|
||||
}
|
||||
|
||||
auto full_name = GetActorFullName(global, name);
|
||||
auto full_name = GetFullName(global, name);
|
||||
ray::ActorCreationOptions actor_creation_options{
|
||||
max_restarts,
|
||||
0, // TODO: Allow setting max_task_retries from Java.
|
||||
|
@ -185,7 +185,22 @@ inline ray::PlacementStrategy ConvertStrategy(jint java_strategy) {
|
|||
}
|
||||
|
||||
inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
|
||||
JNIEnv *env, jstring name, jobject java_bundles, jint java_strategy) {
|
||||
JNIEnv *env, jobject placementGroupCreationOptions) {
|
||||
// We have make sure the placementGroupCreationOptions is not null in java api.
|
||||
bool global = env->GetBooleanField(placementGroupCreationOptions,
|
||||
java_placement_group_creation_options_global);
|
||||
std::string name = "";
|
||||
jstring java_name = (jstring)env->GetObjectField(
|
||||
placementGroupCreationOptions, java_placement_group_creation_options_name);
|
||||
if (java_name) {
|
||||
name = JavaStringToNativeString(env, java_name);
|
||||
}
|
||||
jobject java_obj_strategy = env->GetObjectField(
|
||||
placementGroupCreationOptions, java_placement_group_creation_options_strategy);
|
||||
jint java_strategy = env->CallIntMethod(
|
||||
java_obj_strategy, java_placement_group_creation_options_strategy_value);
|
||||
jobject java_bundles = env->GetObjectField(
|
||||
placementGroupCreationOptions, java_placement_group_creation_options_bundles);
|
||||
std::vector<std::unordered_map<std::string, double>> bundles;
|
||||
JavaListToNativeVector<std::unordered_map<std::string, double>>(
|
||||
env, java_bundles, &bundles, [](JNIEnv *env, jobject java_bundle) {
|
||||
|
@ -200,8 +215,9 @@ inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions(
|
|||
return value;
|
||||
});
|
||||
});
|
||||
return ray::PlacementGroupCreationOptions(JavaStringToNativeString(env, name),
|
||||
ConvertStrategy(java_strategy), bundles,
|
||||
auto full_name = GetFullName(global, name);
|
||||
return ray::PlacementGroupCreationOptions(full_name, ConvertStrategy(java_strategy),
|
||||
bundles,
|
||||
/*is_detached=*/false);
|
||||
}
|
||||
|
||||
|
@ -277,8 +293,8 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(
|
|||
|
||||
JNIEXPORT jbyteArray JNICALL
|
||||
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeCreatePlacementGroup(
|
||||
JNIEnv *env, jclass, jstring name, jobject bundles, jint strategy) {
|
||||
auto options = ToPlacementGroupCreationOptions(env, name, bundles, strategy);
|
||||
JNIEnv *env, jclass, jobject placementGroupCreationOptions) {
|
||||
auto options = ToPlacementGroupCreationOptions(env, placementGroupCreationOptions);
|
||||
ray::PlacementGroupID placement_group_id;
|
||||
auto status = ray::CoreWorkerProcess::GetCoreWorker().CreatePlacementGroup(
|
||||
options, &placement_group_id);
|
||||
|
|
|
@ -55,12 +55,11 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(JNIEnv *, jcl
|
|||
/*
|
||||
* Class: io_ray_runtime_task_NativeTaskSubmitter
|
||||
* Method: nativeCreatePlacementGroup
|
||||
* Signature: (Ljava/lang/String;Ljava/util/List;I)[B
|
||||
* Signature: (Lio/ray/api/options/PlacementGroupCreationOptions;)[B
|
||||
*/
|
||||
JNIEXPORT jbyteArray JNICALL
|
||||
Java_io_ray_runtime_task_NativeTaskSubmitter_nativeCreatePlacementGroup(JNIEnv *, jclass,
|
||||
jstring, jobject,
|
||||
jint);
|
||||
jobject);
|
||||
|
||||
/*
|
||||
* Class: io_ray_runtime_task_NativeTaskSubmitter
|
||||
|
|
|
@ -92,6 +92,14 @@ jfieldID java_actor_creation_options_max_concurrency;
|
|||
jfieldID java_actor_creation_options_group;
|
||||
jfieldID java_actor_creation_options_bundle_index;
|
||||
|
||||
jclass java_placement_group_creation_options_class;
|
||||
jclass java_placement_group_creation_options_strategy_class;
|
||||
jfieldID java_placement_group_creation_options_global;
|
||||
jfieldID java_placement_group_creation_options_name;
|
||||
jfieldID java_placement_group_creation_options_bundles;
|
||||
jfieldID java_placement_group_creation_options_strategy;
|
||||
jmethodID java_placement_group_creation_options_strategy_value;
|
||||
|
||||
jclass java_gcs_client_options_class;
|
||||
jfieldID java_gcs_client_options_ip;
|
||||
jfieldID java_gcs_client_options_port;
|
||||
|
@ -228,6 +236,22 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
|
|||
java_placement_group_id = env->GetFieldID(java_placement_group_class, "id",
|
||||
"Lio/ray/api/id/PlacementGroupId;");
|
||||
|
||||
java_placement_group_creation_options_class =
|
||||
LoadClass(env, "io/ray/api/options/PlacementGroupCreationOptions");
|
||||
java_placement_group_creation_options_strategy_class =
|
||||
LoadClass(env, "io/ray/api/placementgroup/PlacementStrategy");
|
||||
java_placement_group_creation_options_global =
|
||||
env->GetFieldID(java_placement_group_creation_options_class, "global", "Z");
|
||||
java_placement_group_creation_options_name = env->GetFieldID(
|
||||
java_placement_group_creation_options_class, "name", "Ljava/lang/String;");
|
||||
java_placement_group_creation_options_bundles = env->GetFieldID(
|
||||
java_placement_group_creation_options_class, "bundles", "Ljava/util/List;");
|
||||
java_placement_group_creation_options_strategy =
|
||||
env->GetFieldID(java_placement_group_creation_options_class, "strategy",
|
||||
"Lio/ray/api/placementgroup/PlacementStrategy;");
|
||||
java_placement_group_creation_options_strategy_value = env->GetMethodID(
|
||||
java_placement_group_creation_options_strategy_class, "value", "()I");
|
||||
|
||||
java_actor_creation_options_class =
|
||||
LoadClass(env, "io/ray/api/options/ActorCreationOptions");
|
||||
java_actor_creation_options_global =
|
||||
|
@ -302,6 +326,8 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) {
|
|||
env->DeleteGlobalRef(java_function_arg_class);
|
||||
env->DeleteGlobalRef(java_base_task_options_class);
|
||||
env->DeleteGlobalRef(java_actor_creation_options_class);
|
||||
env->DeleteGlobalRef(java_placement_group_creation_options_class);
|
||||
env->DeleteGlobalRef(java_placement_group_creation_options_strategy_class);
|
||||
env->DeleteGlobalRef(java_native_ray_object_class);
|
||||
env->DeleteGlobalRef(java_task_executor_class);
|
||||
env->DeleteGlobalRef(java_native_task_executor_class);
|
||||
|
|
|
@ -160,6 +160,21 @@ extern jfieldID java_actor_creation_options_group;
|
|||
/// bundleIndex field of ActorCreationOptions class
|
||||
extern jfieldID java_actor_creation_options_bundle_index;
|
||||
|
||||
/// PlacementGroupCreationOptions class
|
||||
extern jclass java_placement_group_creation_options_class;
|
||||
/// PlacementStrategy class
|
||||
extern jclass java_placement_group_creation_options_strategy_class;
|
||||
/// global field of PlacementGroupCreationOptions class
|
||||
extern jfieldID java_placement_group_creation_options_global;
|
||||
/// name field of PlacementGroupCreationOptions class
|
||||
extern jfieldID java_placement_group_creation_options_name;
|
||||
/// bundles field of PlacementGroupCreationOptions class
|
||||
extern jfieldID java_placement_group_creation_options_bundles;
|
||||
/// strategy field of PlacementGroupCreationOptions class
|
||||
extern jfieldID java_placement_group_creation_options_strategy;
|
||||
/// value method of PlacementStrategy class
|
||||
extern jmethodID java_placement_group_creation_options_strategy_value;
|
||||
|
||||
/// GcsClientOptions class
|
||||
extern jclass java_gcs_client_options_class;
|
||||
/// ip field of GcsClientOptions class
|
||||
|
@ -561,8 +576,9 @@ inline NativeT JavaProtobufObjectToNativeProtobufObject(JNIEnv *env, jobject jav
|
|||
return native_obj;
|
||||
}
|
||||
|
||||
// Return an actor fullname with job id prepended if this tis a global actor.
|
||||
inline std::string GetActorFullName(bool global, std::string name) {
|
||||
// Return an actor or a placement group fullname with job id prepended if this is a global
|
||||
// actor or placement group.
|
||||
inline std::string GetFullName(bool global, std::string name) {
|
||||
if (name.empty()) {
|
||||
return "";
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue