[Java] Dynamic resource API in Java (#4824)

This commit is contained in:
Qing Wang 2019-05-21 17:13:48 +08:00 committed by Si-Yuan
parent 02583a8598
commit 081708bdef
9 changed files with 117 additions and 0 deletions

View file

@ -123,6 +123,21 @@ public final class Ray extends RayCall {
return runtime;
}
/**
* Update the resource for the specified client.
* Set the resource for the specific node.
*/
public static void setResource(UniqueId nodeId, String resourceName, double capacity) {
runtime.setResource(resourceName, capacity, nodeId);
}
/**
* Set the resource for local node.
*/
public static void setResource(String resourceName, double capacity) {
runtime.setResource(resourceName, capacity, UniqueId.NIL);
}
/**
* Get the runtime context.
*/

View file

@ -65,6 +65,15 @@ public interface RayRuntime {
*/
void free(List<UniqueId> objectIds, boolean localOnly, boolean deleteCreatingTasks);
/**
* Set the resource for the specific node.
*
* @param resourceName The name of resource.
* @param capacity The capacity of the resource.
* @param nodeId The node that we want to set its resource.
*/
void setResource(String resourceName, double capacity, UniqueId nodeId);
/**
* Invoke a remote function.
*

View file

@ -210,6 +210,15 @@ public abstract class AbstractRayRuntime implements RayRuntime {
rayletClient.freePlasmaObjects(objectIds, localOnly, deleteCreatingTasks);
}
@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
Preconditions.checkArgument(Double.compare(capacity, 0) >= 0);
if (nodeId == null) {
nodeId = UniqueId.NIL;
}
rayletClient.setResource(resourceName, capacity, nodeId);
}
private List<List<UniqueId>> splitIntoBatches(List<UniqueId> objectIds) {
List<List<UniqueId>> batches = new ArrayList<>();
int objectsSize = objectIds.size();

View file

@ -209,6 +209,11 @@ public class MockRayletClient implements RayletClient {
throw new NotImplementedException("Not implemented.");
}
@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
LOGGER.error("Not implemented under SINGLE_PROCESS mode.");
}
@Override
public void destroy() {
exec.shutdown();

View file

@ -30,5 +30,7 @@ public interface RayletClient {
void notifyActorResumedFromCheckpoint(UniqueId actorId, UniqueId checkpointId);
void setResource(String resourceName, double capacity, UniqueId nodeId);
void destroy();
}

View file

@ -308,6 +308,10 @@ public class RayletClientImpl implements RayletClient {
return buffer;
}
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
nativeSetResource(client, resourceName, capacity, nodeId.getBytes());
}
public void destroy() {
nativeDestroy(client);
}
@ -357,4 +361,7 @@ public class RayletClientImpl implements RayletClient {
private static native void nativeNotifyActorResumedFromCheckpoint(long conn, byte[] actorId,
byte[] checkpointId);
private static native void nativeSetResource(long conn, String resourceName, double capacity,
byte[] nodeId) throws RayException;
}

View file

@ -0,0 +1,44 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.WaitResult;
import org.ray.api.annotation.RayRemote;
import org.ray.api.options.CallOptions;
import org.ray.api.runtimecontext.NodeInfo;
import org.testng.Assert;
import org.testng.annotations.Test;
public class DynamicResourceTest extends BaseTest {
@RayRemote
public static String sayHi() {
return "hi";
}
@Test
public void testSetResource() {
TestUtils.skipTestUnderSingleProcess();
CallOptions op1 = new CallOptions(ImmutableMap.of("A", 10.0));
RayObject<String> obj = Ray.call(DynamicResourceTest::sayHi, op1);
WaitResult<String> result = Ray.wait(ImmutableList.of(obj), 1, 1000);
Assert.assertEquals(result.getReady().size(), 0);
Ray.setResource("A", 10.0);
// Assert node info.
List<NodeInfo> nodes = Ray.getRuntimeContext().getAllNodeInfo();
Assert.assertEquals(nodes.size(), 1);
Assert.assertEquals(nodes.get(0).resources.get("A"), 10.0);
// Assert ray call result.
result = Ray.wait(ImmutableList.of(obj), 1, 1000);
Assert.assertEquals(result.getReady().size(), 1);
Assert.assertEquals(Ray.get(obj.getId()), "hi");
}
}

View file

@ -302,6 +302,24 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyActorResumedFromCheckpo
ThrowRayExceptionIfNotOK(env, status);
}
/*
* Class: org_ray_runtime_raylet_RayletClientImpl
* Method: nativeSetResource
* Signature: (JLjava/lang/String;D[B)V
*/
JNIEXPORT void JNICALL
Java_org_ray_runtime_raylet_RayletClientImpl_nativeSetResource(JNIEnv *env, jclass,
jlong client, jstring resourceName, jdouble capacity, jbyteArray nodeId) {
auto raylet_client = reinterpret_cast<RayletClient *>(client);
UniqueIdFromJByteArray<ClientID> node_id(env, nodeId);
const char *native_resource_name = env->GetStringUTFChars(resourceName, JNI_FALSE);
auto status = raylet_client->SetResource(native_resource_name,
static_cast<double>(capacity), node_id.GetId());
env->ReleaseStringUTFChars(resourceName, native_resource_name);
ThrowRayExceptionIfNotOK(env, status);
}
#ifdef __cplusplus
}
#endif

View file

@ -116,6 +116,14 @@ JNIEXPORT void JNICALL
Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyActorResumedFromCheckpoint(
JNIEnv *, jclass, jlong, jbyteArray, jbyteArray);
/*
* Class: org_ray_runtime_raylet_RayletClientImpl
* Method: nativeSetResource
* Signature: (JLjava/lang/String;D[B)V
*/
JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSetResource(
JNIEnv *, jclass, jlong, jstring, jdouble, jbyteArray);
#ifdef __cplusplus
}
#endif