[Java] Use gcs client instead of redis client to get session dir (#16773)

* Use gcs client instead of redis client to get session dir

* fix compile and add comments

* fix compile

* lint

* fix

* lint

* lint

* Update src/ray/gcs/gcs_client/global_state_accessor.h

Co-authored-by: Qing Wang <kingchin1218@126.com>

* Update java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java

Co-authored-by: Qing Wang <kingchin1218@126.com>

* per comment

Co-authored-by: Qing Wang <kingchin1218@126.com>
This commit is contained in:
Tao Wang 2021-07-13 14:01:22 +08:00 committed by GitHub
parent e7350ff828
commit 5b7e76770d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 76 additions and 18 deletions

3
.gitignore vendored
View file

@ -158,6 +158,9 @@ java/**/.classpath
java/**/.project
java/runtime/native_dependencies/
# Cpp
cpp/example/thirdparty/
# streaming/python
streaming/python/generated/
streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/generated/

View file

@ -10,7 +10,6 @@ import io.ray.runtime.context.NativeWorkerContext;
import io.ray.runtime.exception.RayIntentionalSystemExitException;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.gcs.GcsClientOptions;
import io.ray.runtime.gcs.RedisClient;
import io.ray.runtime.generated.Common.WorkerType;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
import io.ray.runtime.generated.Gcs.JobConfig;
@ -47,14 +46,11 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
super(rayConfig);
}
private void updateSessionDir() {
if (rayConfig.workerMode == WorkerType.DRIVER) {
// Fetch session dir from GCS if this is a driver.
RedisClient client = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
final String sessionDir = client.get("session_dir", "value");
Preconditions.checkNotNull(sessionDir);
rayConfig.setSessionDir(sessionDir);
}
private void updateSessionDir(GcsClient gcsClient) {
// Fetch session dir from GCS.
final String sessionDir = gcsClient.getInternalKV("session_dir");
Preconditions.checkNotNull(sessionDir);
rayConfig.setSessionDir(sessionDir);
}
@Override
@ -68,13 +64,22 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
}
Preconditions.checkNotNull(rayConfig.getRedisAddress());
updateSessionDir();
// Expose ray ABI symbols which may be depended by other shared
// libraries such as libstreaming_java.so.
// See BUILD.bazel:libcore_worker_library_java.so
Preconditions.checkNotNull(rayConfig.sessionDir);
JniUtils.loadLibrary(rayConfig.sessionDir, BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
// In order to remove redis dependency in Java lang, we use a temp dir to load library
// instead of getting session dir from redis.
if (rayConfig.workerMode == WorkerType.DRIVER) {
String tmpDir = "/tmp/ray/".concat(String.valueOf(System.currentTimeMillis()));
JniUtils.loadLibrary(tmpDir, BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
gcsClient = new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
updateSessionDir(gcsClient);
Preconditions.checkNotNull(rayConfig.sessionDir);
} else {
// Expose ray ABI symbols which may be depended by other shared
// libraries such as libstreaming_java.so.
// See BUILD.bazel:libcore_worker_library_java.so
Preconditions.checkNotNull(rayConfig.sessionDir);
JniUtils.loadLibrary(rayConfig.sessionDir, BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
gcsClient = new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
}
gcsClient = new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);

View file

@ -66,6 +66,11 @@ public class GcsClient {
return placementGroups;
}
public String getInternalKV(String key) {
byte[] value = globalStateAccessor.getInternalKV(key);
return value == null ? null : new String(value);
}
public List<NodeInfo> getAllNodeInfo() {
List<byte[]> results = globalStateAccessor.getAllNodeInfo();

View file

@ -106,6 +106,13 @@ public class GlobalStateAccessor {
}
}
public byte[] getInternalKV(String k) {
synchronized (GlobalStateAccessor.class) {
validateGlobalStateAccessorPointer();
return this.nativeGetInternalKV(globalStateAccessorNativePointer, k);
}
}
/** Returns A list of actor info with ActorInfo protobuf schema. */
public List<byte[]> getAllActorInfo() {
// Fetch a actor list with protobuf bytes format from GCS.
@ -168,5 +175,7 @@ public class GlobalStateAccessor {
private native List<byte[]> nativeGetAllPlacementGroupInfo(long nativePtr);
private native byte[] nativeGetInternalKV(long nativePtr, String k);
private native byte[] nativeGetNodeToConnectForDriver(long nativePtr, String nodeIpAddress);
}

View file

@ -61,7 +61,7 @@ public class JniUtils {
public static synchronized void loadLibrary(
String destDir, String libraryName, boolean exportSymbols) {
if (!loadedLibs.contains(libraryName)) {
LOGGER.debug("Loading native library {}.", libraryName);
LOGGER.debug("Loading native library {} in {}.", libraryName, destDir);
// Load native library.
String fileName = System.mapLibraryName(libraryName);
final File file = BinaryFileUtil.getNativeFile(destDir, fileName);

View file

@ -157,6 +157,20 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllPlacementGroupInfo(
});
}
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalKV(JNIEnv *env, jobject o,
jlong gcs_accessor_ptr,
jstring k) {
std::string key = JavaStringToNativeString(env, k);
auto *gcs_accessor =
reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto value = gcs_accessor->GetInternalKV(key);
if (value) {
return NativeStringToJavaByteArray(env, *value);
}
return nullptr;
}
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeToConnectForDriver(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jstring nodeIpAddress) {

View file

@ -131,6 +131,15 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllPlacementGroupInfo(JNIEn
jobject,
jlong);
/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetInternalKV
* Signature: (JLjava/lang/String;)[B
*/
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetInternalKV(JNIEnv *, jobject, jlong,
jstring);
/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetNodeToConnectForDriver

View file

@ -262,6 +262,12 @@ std::unique_ptr<std::string> GlobalStateAccessor::GetPlacementGroupByName(
return placement_group_table_data;
}
std::unique_ptr<std::string> GlobalStateAccessor::GetInternalKV(const std::string &key) {
std::string value;
Status status = gcs_client_->InternalKV().Get(key, value);
return status.ok() ? std::make_unique<std::string>(value) : nullptr;
}
std::string GlobalStateAccessor::GetSystemConfig() {
std::promise<std::string> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetInternalConfig(

View file

@ -162,12 +162,19 @@ class GlobalStateAccessor {
/// Get information of a placement group from GCS Service by name.
///
/// \param placement_group_name The name of placement group to look up in the GCS
/// Service. \return Placement group info. To support multi-language, we serialize each
/// Service.
/// \return Placement group info. To support multi-language, we serialize each
/// PlacementGroupTableData and return the serialized string. Where used, it needs to be
/// deserialized with protobuf function.
std::unique_ptr<std::string> GetPlacementGroupByName(
const std::string &placement_group_name, const std::string &ray_namespace);
/// Get value of the key from GCS Service.
///
/// \param key key to get.
/// \return Value of the key.
std::unique_ptr<std::string> GetInternalKV(const std::string &key);
/// Get the serialized system config from GCS.
///
/// \return The serialized system config.