[Core] Remove thread local core worker instance 2/n (#25159)

We removed the thread local core worker instance in this PR, which is the further arch cleaning stuff for removing multiple workers in one process.
It also removes the unnecessary parameter `workerId` from JNI.
This commit is contained in:
Qing Wang 2022-06-03 14:08:44 +08:00 committed by GitHub
parent 60587cf1dc
commit 99429b7a92
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 77 additions and 207 deletions

View file

@ -116,14 +116,12 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab
private static final class NativeActorHandleReference
extends FinalizableWeakReference<NativeActorHandle> {
private final AtomicBoolean removed;
private final byte[] workerId;
private final byte[] actorId;
public NativeActorHandleReference(NativeActorHandle handle) {
super(handle, REFERENCE_QUEUE);
this.actorId = handle.actorId;
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
this.workerId = runtime.getWorkerContext().getCurrentWorkerId().getBytes();
this.removed = new AtomicBoolean(false);
REFERENCES.add(this);
}
@ -134,7 +132,7 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab
REFERENCES.remove(this);
// It's possible that GC is executed after the runtime is shutdown.
if (Ray.isInitialized()) {
nativeRemoveActorHandleReference(workerId, actorId);
nativeRemoveActorHandleReference(actorId);
}
}
}
@ -150,5 +148,5 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab
private static native byte[] nativeDeserialize(byte[] data);
private static native void nativeRemoveActorHandleReference(byte[] workerId, byte[] actorId);
private static native void nativeRemoveActorHandleReference(byte[] actorId);
}

View file

@ -4,7 +4,6 @@ import com.google.common.base.Preconditions;
import io.ray.api.exception.RayTimeoutException;
import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.context.WorkerContext;
import io.ray.runtime.generated.Common.Address;
import java.util.ArrayList;
@ -110,10 +109,10 @@ public class LocalModeObjectStore extends ObjectStore {
}
@Override
public void addLocalReference(UniqueId workerId, ObjectId objectId) {}
public void addLocalReference(ObjectId objectId) {}
@Override
public void removeLocalReference(UniqueId workerId, ObjectId objectId) {}
public void removeLocalReference(ObjectId objectId) {}
@Override
public Address getOwnerAddress(ObjectId id) {

View file

@ -5,7 +5,6 @@ import io.ray.api.Ray;
import io.ray.api.id.ActorId;
import io.ray.api.id.BaseId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.context.WorkerContext;
import io.ray.runtime.generated.Common.Address;
@ -66,16 +65,16 @@ public class NativeObjectStore extends ObjectStore {
}
@Override
public void addLocalReference(UniqueId workerId, ObjectId objectId) {
nativeAddLocalReference(workerId.getBytes(), objectId.getBytes());
public void addLocalReference(ObjectId objectId) {
nativeAddLocalReference(objectId.getBytes());
}
@Override
public void removeLocalReference(UniqueId workerId, ObjectId objectId) {
public void removeLocalReference(ObjectId objectId) {
Lock readLock = shutdownLock.readLock();
readLock.lock();
try {
nativeRemoveLocalReference(workerId.getBytes(), objectId.getBytes());
nativeRemoveLocalReference(objectId.getBytes());
} finally {
readLock.unlock();
}
@ -128,9 +127,9 @@ public class NativeObjectStore extends ObjectStore {
private static native void nativeDelete(List<byte[]> objectIds, boolean localOnly);
private static native void nativeAddLocalReference(byte[] workerId, byte[] objectId);
private static native void nativeAddLocalReference(byte[] objectId);
private static native void nativeRemoveLocalReference(byte[] workerId, byte[] objectId);
private static native void nativeRemoveLocalReference(byte[] objectId);
private static native Map<byte[], long[]> nativeGetAllReferenceCounts();

View file

@ -7,7 +7,6 @@ import com.google.common.collect.Sets;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.id.ObjectId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.AbstractRayRuntime;
import java.io.Externalizable;
import java.io.IOException;
@ -38,10 +37,6 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
private ObjectId id;
// In GC thread, we don't know which worker this object binds to, so we need to
// store the worker ID for later uses.
private transient UniqueId workerId;
private Class<T> type;
// Raw data of this object.
@ -61,11 +56,9 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
this.id = id;
this.type = (Class<T>) type;
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
Preconditions.checkState(workerId == null);
workerId = runtime.getWorkerContext().getCurrentWorkerId();
if (!skipAddingLocalRef) {
runtime.getObjectStore().addLocalReference(workerId, id);
runtime.getObjectStore().addLocalReference(id);
}
// We still add the reference so that the local ref count will be properly
// decremented once this object is GCed.
@ -122,9 +115,7 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
in.readFully(ownerAddress);
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
Preconditions.checkState(workerId == null);
workerId = runtime.getWorkerContext().getCurrentWorkerId();
runtime.getObjectStore().addLocalReference(workerId, id);
runtime.getObjectStore().addLocalReference(id);
new ObjectRefImplReference(this);
runtime
@ -136,13 +127,11 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
private static final class ObjectRefImplReference
extends FinalizableWeakReference<ObjectRefImpl<?>> {
private final UniqueId workerId;
private final ObjectId objectId;
private final AtomicBoolean removed;
public ObjectRefImplReference(ObjectRefImpl<?> obj) {
super(obj, REFERENCE_QUEUE);
this.workerId = obj.workerId;
this.objectId = obj.id;
this.removed = new AtomicBoolean(false);
REFERENCES.add(this);
@ -151,14 +140,12 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
@Override
public void finalizeReferent() {
// This method may be invoked multiple times on the same instance (due to explicit invoking in
// unit tests). So if `workerId` is null, it means this method has been invoked.
// unit tests).
if (!removed.getAndSet(true)) {
REFERENCES.remove(this);
// It's possible that GC is executed after the runtime is shutdown.
if (Ray.isInitialized()) {
((AbstractRayRuntime) (Ray.internal()))
.getObjectStore()
.removeLocalReference(workerId, objectId);
((AbstractRayRuntime) (Ray.internal())).getObjectStore().removeLocalReference(objectId);
allObjects.remove(objectId);
LOG.debug("Object {} is finalized.", objectId);
}

View file

@ -6,7 +6,6 @@ import io.ray.api.WaitResult;
import io.ray.api.exception.RayException;
import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.context.WorkerContext;
import io.ray.runtime.generated.Common.Address;
import java.util.ArrayList;
@ -222,18 +221,16 @@ public abstract class ObjectStore {
/**
* Increase the local reference count for this object ID.
*
* @param workerId The ID of the worker to increase on.
* @param objectId The object ID to increase the reference count for.
*/
public abstract void addLocalReference(UniqueId workerId, ObjectId objectId);
public abstract void addLocalReference(ObjectId objectId);
/**
* Decrease the reference count for this object ID.
*
* @param workerId The ID of the worker to decrease on.
* @param objectId The object ID to decrease the reference count for.
*/
public abstract void removeLocalReference(UniqueId workerId, ObjectId objectId);
public abstract void removeLocalReference(ObjectId objectId);
public abstract Address getOwnerAddress(ObjectId id);

View file

@ -2277,10 +2277,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
task_type = TaskType::ACTOR_TASK;
}
// Because we support concurrent actor calls, we need to update the
// worker ID for the current thread.
CoreWorkerProcess::SetCurrentThreadWorkerId(GetWorkerID());
std::shared_ptr<LocalMemoryBuffer> creation_task_exception_pb_bytes = nullptr;
std::vector<ConcurrencyGroup> defined_concurrency_groups = {};

View file

@ -57,12 +57,7 @@ void CoreWorkerProcess::HandleAtExit() { core_worker_process.reset(); }
CoreWorker &CoreWorkerProcess::GetCoreWorker() {
EnsureInitialized(/*quick_exit*/ true);
return core_worker_process->GetCoreWorkerForCurrentThread();
}
void CoreWorkerProcess::SetCurrentThreadWorkerId(const WorkerID &worker_id) {
EnsureInitialized(/*quick_exit*/ false);
core_worker_process->SetThreadLocalWorkerById(worker_id);
return *core_worker_process->GetCoreWorker();
}
void CoreWorkerProcess::RunTaskExecutionLoop() {
@ -71,26 +66,24 @@ void CoreWorkerProcess::RunTaskExecutionLoop() {
core_worker_process.reset();
}
std::shared_ptr<CoreWorker> CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) {
std::shared_ptr<CoreWorker> CoreWorkerProcess::TryGetWorker() {
if (!core_worker_process) {
return nullptr;
}
return core_worker_process->GetWorker(worker_id);
return core_worker_process->TryGetCoreWorker();
}
thread_local std::weak_ptr<CoreWorker> CoreWorkerProcessImpl::thread_local_core_worker_;
CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
: options_(options),
global_worker_id_(options.worker_type == WorkerType::DRIVER
? ComputeDriverIdFromJob(options_.job_id)
: WorkerID::FromRandom()) {
worker_id_(options.worker_type == WorkerType::DRIVER
? ComputeDriverIdFromJob(options_.job_id)
: WorkerID::FromRandom()) {
if (options_.enable_logging) {
std::stringstream app_name;
app_name << LanguageString(options_.language) << "-core-"
<< WorkerTypeString(options_.worker_type);
if (!global_worker_id_.IsNil()) {
app_name << "-" << global_worker_id_;
if (!worker_id_.IsNil()) {
app_name << "-" << worker_id_;
}
RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, options_.log_dir);
if (options_.install_failure_signal_handler) {
@ -115,8 +108,11 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options)
// NOTE(kfstorm): any initialization depending on RayConfig must happen after this line.
InitializeSystemConfig();
if (ShouldCreateGlobalWorkerOnConstruction()) {
CreateWorker();
{
// Initialize global worker instance.
auto worker = std::make_shared<CoreWorker>(options_, worker_id_);
absl::WriterMutexLock lock(&mutex_);
core_worker_ = worker;
}
// Assume stats module will be initialized exactly once in once process.
@ -235,103 +231,57 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
RayConfig::instance().initialize(promise.get_future().get());
}
bool CoreWorkerProcessImpl::ShouldCreateGlobalWorkerOnConstruction() const {
// We need to create the worker instance here if:
// 1. This is a driver process. In this case, the driver is ready to use right after
// the CoreWorkerProcess::Initialize.
// 2. This is a Python worker process. In this case, Python will invoke some core
// worker APIs before `CoreWorkerProcess::RunTaskExecutionLoop` is called. So we need
// to create the worker instance here. One example of invocations is
// https://github.com/ray-project/ray/blob/45ce40e5d44801193220d2c546be8de0feeef988/python/ray/worker.py#L1281.
return (options_.worker_type == WorkerType::DRIVER ||
options_.language == Language::PYTHON);
}
std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::GetWorker(
const WorkerID &worker_id) const {
absl::ReaderMutexLock lock(&mutex_);
return global_worker_;
}
std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::GetGlobalWorker() {
absl::ReaderMutexLock lock(&mutex_);
return global_worker_;
}
std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateWorker() {
auto worker = std::make_shared<CoreWorker>(
options_,
global_worker_id_ != WorkerID::Nil() ? global_worker_id_ : WorkerID::FromRandom());
RAY_LOG(DEBUG) << "Worker " << worker->GetWorkerID() << " is created.";
absl::WriterMutexLock lock(&mutex_);
global_worker_ = worker;
thread_local_core_worker_ = worker;
return worker;
}
void CoreWorkerProcessImpl::RemoveWorker(std::shared_ptr<CoreWorker> worker) {
absl::WriterMutexLock lock(&mutex_);
if (global_worker_) {
RAY_CHECK(global_worker_ == worker);
} else {
RAY_CHECK(thread_local_core_worker_.lock() == worker);
}
thread_local_core_worker_.reset();
RAY_LOG(INFO) << "Removed worker " << worker->GetWorkerID();
if (global_worker_ == worker) {
global_worker_ = nullptr;
}
}
void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
RAY_CHECK(options_.worker_type == WorkerType::WORKER);
// Run the task loop in the current thread only if the number of workers is 1.
auto worker = GetGlobalWorker();
if (!worker) {
worker = CreateWorker();
}
worker->RunTaskExecutionLoop();
auto core_worker = GetCoreWorker();
RAY_CHECK(core_worker != nullptr);
core_worker->RunTaskExecutionLoop();
RAY_LOG(INFO) << "Task execution loop terminated. Removing the global worker.";
RemoveWorker(worker);
{
absl::WriterMutexLock lock(&mutex_);
core_worker_.reset();
}
}
void CoreWorkerProcessImpl::ShutdownDriver() {
RAY_CHECK(options_.worker_type == WorkerType::DRIVER)
<< "The `Shutdown` interface is for driver only.";
auto global_worker = GetGlobalWorker();
auto global_worker = GetCoreWorker();
RAY_CHECK(global_worker);
global_worker->Disconnect(/*exit_type*/ rpc::WorkerExitType::INTENDED_USER_EXIT,
/*exit_detail*/ "Shutdown by ray.shutdown().");
global_worker->Shutdown();
RemoveWorker(global_worker);
{
absl::WriterMutexLock lock(&mutex_);
core_worker_.reset();
}
}
CoreWorker &CoreWorkerProcessImpl::GetCoreWorkerForCurrentThread() {
auto global_worker = GetGlobalWorker();
if (ShouldCreateGlobalWorkerOnConstruction() && !global_worker) {
std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::TryGetCoreWorker() const {
absl::ReaderMutexLock lock(&mutex_);
return core_worker_;
}
std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::GetCoreWorker() const {
absl::ReaderMutexLock lock(&mutex_);
if (!core_worker_) {
// This could only happen when the worker has already been shutdown.
// In this case, we should exit without crashing.
// TODO (scv119): A better solution could be returning error code
// and handling it at language frontend.
if (options_.worker_type == WorkerType::DRIVER) {
RAY_LOG(ERROR) << "The global worker has already been shutdown. This happens when "
RAY_LOG(ERROR) << "The core worker has already been shutdown. This happens when "
"the language frontend accesses the Ray's worker after it is "
"shutdown. The process will exit";
} else {
RAY_LOG(INFO) << "The global worker has already been shutdown. This happens when "
RAY_LOG(INFO) << "The core worker has already been shutdown. This happens when "
"the language frontend accesses the Ray's worker after it is "
"shutdown. The process will exit";
}
QuickExit();
}
RAY_CHECK(global_worker) << "global_worker_ must not be NULL";
return *global_worker;
}
void CoreWorkerProcessImpl::SetThreadLocalWorkerById(const WorkerID &worker_id) {
RAY_CHECK(GetGlobalWorker()->GetWorkerID() == worker_id);
return;
RAY_CHECK(core_worker_) << "core_worker_ must not be NULL";
return core_worker_;
}
} // namespace core

View file

@ -19,7 +19,7 @@ namespace core {
class CoreWorker;
/// Lifecycle management of one or more `CoreWorker` instances in a process.
/// Lifecycle management of the `CoreWorker` instance in a process.
///
/// To start a driver in the current process:
/// CoreWorkerOptions options = {
@ -31,7 +31,7 @@ class CoreWorker;
/// To shutdown a driver in the current process:
/// CoreWorkerProcess::Shutdown();
///
/// To start one or more workers in the current process:
/// To start a worker in the current process:
/// CoreWorkerOptions options = {
/// WorkerType::WORKER, // worker_type
/// ..., // other arguments
@ -44,14 +44,6 @@ class CoreWorker;
/// code `IntentionalSystemExit` or `UnexpectedSystemExit`) in the task execution
/// callback.
///
/// If more than 1 worker is started, only the threads which invoke the
/// `task_execution_callback` will be automatically associated with the corresponding
/// worker. If you started your own threads and you want to use core worker APIs in these
/// threads, remember to call `CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id)`
/// once in the new thread before calling core worker APIs, to associate the current
/// thread with a worker. You can obtain the worker ID via
/// `CoreWorkerProcess::GetCoreWorker()->GetWorkerID()`.
///
/// How does core worker process dealloation work?
///
/// For an individual core worker thread's shutdown process, please check core_worker.h.
@ -77,7 +69,7 @@ class CoreWorkerProcess {
/// \param[in] options The various initialization options.
static void Initialize(const CoreWorkerOptions &options);
/// Get the core worker associated with the current thread.
/// Get the core worker.
/// NOTE (kfstorm): Here we return a reference instead of a `shared_ptr` to make sure
/// `CoreWorkerProcess` has full control of the destruction timing of `CoreWorker`.
static CoreWorker &GetCoreWorker();
@ -87,13 +79,7 @@ class CoreWorkerProcess {
///
/// \param[in] workerId The worker ID.
/// \return The `CoreWorker` instance.
static std::shared_ptr<CoreWorker> TryGetWorker(const WorkerID &worker_id);
/// Set the core worker associated with the current thread by worker ID.
/// Currently used by Java worker only.
///
/// \param worker_id The worker ID of the core worker instance.
static void SetCurrentThreadWorkerId(const WorkerID &worker_id);
static std::shared_ptr<CoreWorker> TryGetWorker();
/// Whether the current process has been initialized for core worker.
static bool IsInitialized();
@ -129,35 +115,20 @@ class CoreWorkerProcessImpl {
/// Create an `CoreWorkerProcessImpl` with proper options.
///
/// \param[in] options The various initialization options.
CoreWorkerProcessImpl(const CoreWorkerOptions &options);
explicit CoreWorkerProcessImpl(const CoreWorkerOptions &options);
~CoreWorkerProcessImpl();
void InitializeSystemConfig();
/// Check that if the global worker should be created on construction.
bool ShouldCreateGlobalWorkerOnConstruction() const;
/// Try to get core worker. Returns nullptr if core worker doesn't exist.
std::shared_ptr<CoreWorker> TryGetCoreWorker() const;
/// Get the `CoreWorker` instance by worker ID.
/// Get the `CoreWorker` instance. The process will be exited if
/// the core worker is nullptr.
///
/// \param[in] workerId The worker ID.
/// \return The `CoreWorker` instance.
std::shared_ptr<CoreWorker> GetWorker(const WorkerID &worker_id) const
LOCKS_EXCLUDED(mutex_);
/// Create a new `CoreWorker` instance.
///
/// \return The newly created `CoreWorker` instance.
std::shared_ptr<CoreWorker> CreateWorker() LOCKS_EXCLUDED(mutex_);
/// Remove an existing `CoreWorker` instance.
///
/// \param[in] The existing `CoreWorker` instance.
/// \return Void.
void RemoveWorker(std::shared_ptr<CoreWorker> worker) LOCKS_EXCLUDED(mutex_);
/// Get the `GlobalWorker` instance, if the number of workers is 1.
std::shared_ptr<CoreWorker> GetGlobalWorker() LOCKS_EXCLUDED(mutex_);
std::shared_ptr<CoreWorker> GetCoreWorker() const;
/// Run worker execution loop.
void RunWorkerTaskExecutionLoop();
@ -165,28 +136,17 @@ class CoreWorkerProcessImpl {
/// Shutdown the driver completely at the process level.
void ShutdownDriver();
/// Return the CoreWorker for current thread.
CoreWorker &GetCoreWorkerForCurrentThread();
/// Set the core worker associated with the current thread by worker ID.
/// Currently used by Java worker only.
void SetThreadLocalWorkerById(const WorkerID &worker_id);
private:
/// The various options.
const CoreWorkerOptions options_;
/// The only core worker instance, if the number of workers is 1.
std::shared_ptr<CoreWorker> global_worker_ GUARDED_BY(mutex_);
/// The core worker instance of this worker process.
std::shared_ptr<CoreWorker> core_worker_ GUARDED_BY(mutex_);
/// The core worker instance associated with the current thread.
/// Use weak_ptr here to avoid memory leak due to multi-threading.
static thread_local std::weak_ptr<CoreWorker> thread_local_core_worker_;
/// The worker ID of this worker.
const WorkerID worker_id_;
/// The worker ID of the global worker, if the number of workers is 1.
const WorkerID global_worker_id_;
/// To protect access to workers_ and global_worker_
/// To protect access to core_worker_
mutable absl::Mutex mutex_;
};
} // namespace core

View file

@ -75,12 +75,11 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *env,
JNIEXPORT void JNICALL
Java_io_ray_runtime_actor_NativeActorHandle_nativeRemoveActorHandleReference(
JNIEnv *env, jclass clz, jbyteArray workerId, jbyteArray actorId) {
JNIEnv *env, jclass clz, jbyteArray actorId) {
// We can't control the timing of Java GC, so it's normal that this method is called but
// core worker is shutting down (or already shut down). If we can't get a core worker
// instance here, skip calling the `RemoveLocalReference` method.
const auto worker_id = JavaByteArrayToId<ray::WorkerID>(env, workerId);
auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id);
auto core_worker = CoreWorkerProcess::TryGetWorker();
if (core_worker != nullptr) {
const auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
core_worker->RemoveActorHandleReference(actor_id);

View file

@ -59,12 +59,11 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *,
/*
* Class: io_ray_runtime_actor_NativeActorHandle
* Method: nativeRemoveActorHandleReference
* Signature: ([B[B)V
* Signature: ([B)V
*/
JNIEXPORT void JNICALL
Java_io_ray_runtime_actor_NativeActorHandle_nativeRemoveActorHandleReference(JNIEnv *,
jclass,
jbyteArray,
jbyteArray);
#ifdef __cplusplus

View file

@ -168,23 +168,21 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(
JNIEnv *env, jclass, jbyteArray workerId, jbyteArray objectId) {
auto worker_id = JavaByteArrayToId<WorkerID>(env, workerId);
JNIEnv *env, jclass, jbyteArray objectId) {
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id);
auto core_worker = CoreWorkerProcess::TryGetWorker();
RAY_CHECK(core_worker);
core_worker->AddLocalReference(object_id);
}
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference(
JNIEnv *env, jclass, jbyteArray workerId, jbyteArray objectId) {
auto worker_id = JavaByteArrayToId<WorkerID>(env, workerId);
JNIEnv *env, jclass, jbyteArray objectId) {
auto object_id = JavaByteArrayToId<ObjectID>(env, objectId);
// We can't control the timing of Java GC, so it's normal that this method is called but
// core worker is shutting down (or already shut down). If we can't get a core worker
// instance here, skip calling the `RemoveLocalReference` method.
auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id);
auto core_worker = CoreWorkerProcess::TryGetWorker();
if (core_worker) {
core_worker->RemoveLocalReference(object_id);
}

View file

@ -68,23 +68,21 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete
/*
* Class: io_ray_runtime_object_NativeObjectStore
* Method: nativeAddLocalReference
* Signature: ([B[B)V
* Signature: ([B)V
*/
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(JNIEnv *,
jclass,
jbyteArray,
jbyteArray);
/*
* Class: io_ray_runtime_object_NativeObjectStore
* Method: nativeRemoveLocalReference
* Signature: ([B[B)V
* Signature: ([B)V
*/
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference(JNIEnv *,
jclass,
jbyteArray,
jbyteArray);
/*

View file

@ -64,9 +64,5 @@ const ActorID &GetCurrentActorID() {
bool IsInitialized() { return CoreWorkerProcess::IsInitialized(); }
void SetCurrentThreadWorker(const WorkerID &worker_id) {
CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id);
}
} // namespace internal
} // namespace ray

View file

@ -40,12 +40,6 @@ const stats::TagKeyType TagRegister(const std::string tag_name);
/// Get current actor id via internal.
const ActorID &GetCurrentActorID();
/// Set the core worker associated with the current thread by worker ID.
/// Currently used by Java worker only.
///
/// \param worker_id The worker ID of the core worker instance.
void SetCurrentThreadWorker(const WorkerID &worker_id);
/// Get core worker initialization flag via internal.
bool IsInitialized();
} // namespace internal