[Java]More efficient getAllNodeInfo() (#26872)

Like https://github.com/ray-project/ray/pull/26760, we remove the unnecessary rpc calls for each node to get their resources, instead we use the existing fields.
This commit is contained in:
Tao Wang 2022-07-25 16:53:03 +08:00 committed by GitHub
parent abde2a5f97
commit 4d6cbb0fd4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 3 additions and 58 deletions

View file

@ -98,34 +98,15 @@ public class GcsClient {
data.getRayletSocketName(),
data.getState() == GcsNodeInfo.GcsNodeState.ALIVE,
new HashMap<>());
if (nodeInfo.isAlive) {
nodeInfo.resources.putAll(data.getResourcesTotalMap());
}
nodes.put(nodeId, nodeInfo);
}
// Fill resources.
for (Map.Entry<UniqueId, NodeInfo> node : nodes.entrySet()) {
if (node.getValue().isAlive) {
node.getValue().resources.putAll(getResourcesForClient(node.getKey()));
}
}
return new ArrayList<>(nodes.values());
}
private Map<String, Double> getResourcesForClient(UniqueId clientId) {
byte[] resourceMapBytes = globalStateAccessor.getNodeResourceInfo(clientId);
Gcs.ResourceMap resourceMap;
try {
resourceMap = Gcs.ResourceMap.parseFrom(resourceMapBytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}
HashMap<String, Double> resources = new HashMap<>();
for (Map.Entry<String, Gcs.ResourceTableData> entry : resourceMap.getItemsMap().entrySet()) {
resources.put(entry.getKey(), entry.getValue().getResourceCapacity());
}
return resources;
}
/** If the actor exists in GCS. */
public boolean actorExists(ActorId actorId) {
byte[] result = globalStateAccessor.getActorInfo(actorId);

View file

@ -3,7 +3,6 @@ package io.ray.runtime.gcs;
import com.google.common.base.Preconditions;
import io.ray.api.id.ActorId;
import io.ray.api.id.PlacementGroupId;
import io.ray.api.id.UniqueId;
import java.util.List;
/** `GlobalStateAccessor` is used for accessing information from GCS. */
@ -72,19 +71,6 @@ public class GlobalStateAccessor {
}
}
/**
* Get node resource info.
*
* @param nodeId node unique id.
* @return A map of node resource info in protobuf schema.
*/
public byte[] getNodeResourceInfo(UniqueId nodeId) {
synchronized (GlobalStateAccessor.class) {
validateGlobalStateAccessorPointer();
return nativeGetNodeResourceInfo(globalStateAccessorNativePointer, nodeId.getBytes());
}
}
public byte[] getPlacementGroupInfo(PlacementGroupId placementGroupId) {
synchronized (GlobalStateAccessor.class) {
validateGlobalStateAccessorPointer();
@ -163,8 +149,6 @@ public class GlobalStateAccessor {
private native List<byte[]> nativeGetAllNodeInfo(long nativePtr);
private native byte[] nativeGetNodeResourceInfo(long nativePtr, byte[] nodeId);
private native List<byte[]> nativeGetAllActorInfo(long nativePtr);
private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId);

View file

@ -80,15 +80,6 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env,
});
}
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray node_id_bytes) {
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto node_id = JavaByteArrayToId<NodeID>(env, node_id_bytes);
auto node_resource_info = gcs_accessor->GetNodeResourceInfo(node_id);
return static_cast<jbyteArray>(NativeStringToJavaByteArray(env, node_resource_info));
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {

View file

@ -76,17 +76,6 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *,
jobject,
jlong);
/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetNodeResourceInfo
* Signature: (J[B)[B
*/
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(JNIEnv *,
jobject,
jlong,
jbyteArray);
/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetAllActorInfo