diff --git a/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java b/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java index f5ee9deff..0c6e3a3fd 100644 --- a/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java +++ b/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java @@ -116,14 +116,12 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab private static final class NativeActorHandleReference extends FinalizableWeakReference { private final AtomicBoolean removed; - private final byte[] workerId; private final byte[] actorId; public NativeActorHandleReference(NativeActorHandle handle) { super(handle, REFERENCE_QUEUE); this.actorId = handle.actorId; AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); - this.workerId = runtime.getWorkerContext().getCurrentWorkerId().getBytes(); this.removed = new AtomicBoolean(false); REFERENCES.add(this); } @@ -134,7 +132,7 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab REFERENCES.remove(this); // It's possible that GC is executed after the runtime is shutdown. if (Ray.isInitialized()) { - nativeRemoveActorHandleReference(workerId, actorId); + nativeRemoveActorHandleReference(actorId); } } } @@ -150,5 +148,5 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab private static native byte[] nativeDeserialize(byte[] data); - private static native void nativeRemoveActorHandleReference(byte[] workerId, byte[] actorId); + private static native void nativeRemoveActorHandleReference(byte[] actorId); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java index 164daef18..92f814739 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java @@ -4,7 +4,6 @@ import com.google.common.base.Preconditions; import io.ray.api.exception.RayTimeoutException; import io.ray.api.id.ActorId; import io.ray.api.id.ObjectId; -import io.ray.api.id.UniqueId; import io.ray.runtime.context.WorkerContext; import io.ray.runtime.generated.Common.Address; import java.util.ArrayList; @@ -110,10 +109,10 @@ public class LocalModeObjectStore extends ObjectStore { } @Override - public void addLocalReference(UniqueId workerId, ObjectId objectId) {} + public void addLocalReference(ObjectId objectId) {} @Override - public void removeLocalReference(UniqueId workerId, ObjectId objectId) {} + public void removeLocalReference(ObjectId objectId) {} @Override public Address getOwnerAddress(ObjectId id) { diff --git a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java index ef48447d5..2ebbd50d9 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java @@ -5,7 +5,6 @@ import io.ray.api.Ray; import io.ray.api.id.ActorId; import io.ray.api.id.BaseId; import io.ray.api.id.ObjectId; -import io.ray.api.id.UniqueId; import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.context.WorkerContext; import io.ray.runtime.generated.Common.Address; @@ -66,16 +65,16 @@ public class NativeObjectStore extends ObjectStore { } @Override - public void addLocalReference(UniqueId workerId, ObjectId objectId) { - nativeAddLocalReference(workerId.getBytes(), objectId.getBytes()); + public void addLocalReference(ObjectId objectId) { + nativeAddLocalReference(objectId.getBytes()); } @Override - public void removeLocalReference(UniqueId workerId, ObjectId objectId) { + public void removeLocalReference(ObjectId objectId) { Lock readLock = shutdownLock.readLock(); readLock.lock(); try { - nativeRemoveLocalReference(workerId.getBytes(), objectId.getBytes()); + nativeRemoveLocalReference(objectId.getBytes()); } finally { readLock.unlock(); } @@ -128,9 +127,9 @@ public class NativeObjectStore extends ObjectStore { private static native void nativeDelete(List objectIds, boolean localOnly); - private static native void nativeAddLocalReference(byte[] workerId, byte[] objectId); + private static native void nativeAddLocalReference(byte[] objectId); - private static native void nativeRemoveLocalReference(byte[] workerId, byte[] objectId); + private static native void nativeRemoveLocalReference(byte[] objectId); private static native Map nativeGetAllReferenceCounts(); diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java index 51bf3c20d..87e92508f 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java @@ -7,7 +7,6 @@ import com.google.common.collect.Sets; import io.ray.api.ObjectRef; import io.ray.api.Ray; import io.ray.api.id.ObjectId; -import io.ray.api.id.UniqueId; import io.ray.runtime.AbstractRayRuntime; import java.io.Externalizable; import java.io.IOException; @@ -38,10 +37,6 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { private ObjectId id; - // In GC thread, we don't know which worker this object binds to, so we need to - // store the worker ID for later uses. - private transient UniqueId workerId; - private Class type; // Raw data of this object. @@ -61,11 +56,9 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { this.id = id; this.type = (Class) type; AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); - Preconditions.checkState(workerId == null); - workerId = runtime.getWorkerContext().getCurrentWorkerId(); if (!skipAddingLocalRef) { - runtime.getObjectStore().addLocalReference(workerId, id); + runtime.getObjectStore().addLocalReference(id); } // We still add the reference so that the local ref count will be properly // decremented once this object is GCed. @@ -122,9 +115,7 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { in.readFully(ownerAddress); AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); - Preconditions.checkState(workerId == null); - workerId = runtime.getWorkerContext().getCurrentWorkerId(); - runtime.getObjectStore().addLocalReference(workerId, id); + runtime.getObjectStore().addLocalReference(id); new ObjectRefImplReference(this); runtime @@ -136,13 +127,11 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { private static final class ObjectRefImplReference extends FinalizableWeakReference> { - private final UniqueId workerId; private final ObjectId objectId; private final AtomicBoolean removed; public ObjectRefImplReference(ObjectRefImpl obj) { super(obj, REFERENCE_QUEUE); - this.workerId = obj.workerId; this.objectId = obj.id; this.removed = new AtomicBoolean(false); REFERENCES.add(this); @@ -151,14 +140,12 @@ public final class ObjectRefImpl implements ObjectRef, Externalizable { @Override public void finalizeReferent() { // This method may be invoked multiple times on the same instance (due to explicit invoking in - // unit tests). So if `workerId` is null, it means this method has been invoked. + // unit tests). if (!removed.getAndSet(true)) { REFERENCES.remove(this); // It's possible that GC is executed after the runtime is shutdown. if (Ray.isInitialized()) { - ((AbstractRayRuntime) (Ray.internal())) - .getObjectStore() - .removeLocalReference(workerId, objectId); + ((AbstractRayRuntime) (Ray.internal())).getObjectStore().removeLocalReference(objectId); allObjects.remove(objectId); LOG.debug("Object {} is finalized.", objectId); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java index a2b18b3a1..1121e06bf 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java @@ -6,7 +6,6 @@ import io.ray.api.WaitResult; import io.ray.api.exception.RayException; import io.ray.api.id.ActorId; import io.ray.api.id.ObjectId; -import io.ray.api.id.UniqueId; import io.ray.runtime.context.WorkerContext; import io.ray.runtime.generated.Common.Address; import java.util.ArrayList; @@ -222,18 +221,16 @@ public abstract class ObjectStore { /** * Increase the local reference count for this object ID. * - * @param workerId The ID of the worker to increase on. * @param objectId The object ID to increase the reference count for. */ - public abstract void addLocalReference(UniqueId workerId, ObjectId objectId); + public abstract void addLocalReference(ObjectId objectId); /** * Decrease the reference count for this object ID. * - * @param workerId The ID of the worker to decrease on. * @param objectId The object ID to decrease the reference count for. */ - public abstract void removeLocalReference(UniqueId workerId, ObjectId objectId); + public abstract void removeLocalReference(ObjectId objectId); public abstract Address getOwnerAddress(ObjectId id); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 58cb56c9c..5795a061c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2277,10 +2277,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, task_type = TaskType::ACTOR_TASK; } - // Because we support concurrent actor calls, we need to update the - // worker ID for the current thread. - CoreWorkerProcess::SetCurrentThreadWorkerId(GetWorkerID()); - std::shared_ptr creation_task_exception_pb_bytes = nullptr; std::vector defined_concurrency_groups = {}; diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index 9a58414b3..9026bfa0e 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -57,12 +57,7 @@ void CoreWorkerProcess::HandleAtExit() { core_worker_process.reset(); } CoreWorker &CoreWorkerProcess::GetCoreWorker() { EnsureInitialized(/*quick_exit*/ true); - return core_worker_process->GetCoreWorkerForCurrentThread(); -} - -void CoreWorkerProcess::SetCurrentThreadWorkerId(const WorkerID &worker_id) { - EnsureInitialized(/*quick_exit*/ false); - core_worker_process->SetThreadLocalWorkerById(worker_id); + return *core_worker_process->GetCoreWorker(); } void CoreWorkerProcess::RunTaskExecutionLoop() { @@ -71,26 +66,24 @@ void CoreWorkerProcess::RunTaskExecutionLoop() { core_worker_process.reset(); } -std::shared_ptr CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) { +std::shared_ptr CoreWorkerProcess::TryGetWorker() { if (!core_worker_process) { return nullptr; } - return core_worker_process->GetWorker(worker_id); + return core_worker_process->TryGetCoreWorker(); } -thread_local std::weak_ptr CoreWorkerProcessImpl::thread_local_core_worker_; - CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) : options_(options), - global_worker_id_(options.worker_type == WorkerType::DRIVER - ? ComputeDriverIdFromJob(options_.job_id) - : WorkerID::FromRandom()) { + worker_id_(options.worker_type == WorkerType::DRIVER + ? ComputeDriverIdFromJob(options_.job_id) + : WorkerID::FromRandom()) { if (options_.enable_logging) { std::stringstream app_name; app_name << LanguageString(options_.language) << "-core-" << WorkerTypeString(options_.worker_type); - if (!global_worker_id_.IsNil()) { - app_name << "-" << global_worker_id_; + if (!worker_id_.IsNil()) { + app_name << "-" << worker_id_; } RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, options_.log_dir); if (options_.install_failure_signal_handler) { @@ -115,8 +108,11 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) // NOTE(kfstorm): any initialization depending on RayConfig must happen after this line. InitializeSystemConfig(); - if (ShouldCreateGlobalWorkerOnConstruction()) { - CreateWorker(); + { + // Initialize global worker instance. + auto worker = std::make_shared(options_, worker_id_); + absl::WriterMutexLock lock(&mutex_); + core_worker_ = worker; } // Assume stats module will be initialized exactly once in once process. @@ -235,103 +231,57 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() { RayConfig::instance().initialize(promise.get_future().get()); } -bool CoreWorkerProcessImpl::ShouldCreateGlobalWorkerOnConstruction() const { - // We need to create the worker instance here if: - // 1. This is a driver process. In this case, the driver is ready to use right after - // the CoreWorkerProcess::Initialize. - // 2. This is a Python worker process. In this case, Python will invoke some core - // worker APIs before `CoreWorkerProcess::RunTaskExecutionLoop` is called. So we need - // to create the worker instance here. One example of invocations is - // https://github.com/ray-project/ray/blob/45ce40e5d44801193220d2c546be8de0feeef988/python/ray/worker.py#L1281. - return (options_.worker_type == WorkerType::DRIVER || - options_.language == Language::PYTHON); -} - -std::shared_ptr CoreWorkerProcessImpl::GetWorker( - const WorkerID &worker_id) const { - absl::ReaderMutexLock lock(&mutex_); - return global_worker_; -} - -std::shared_ptr CoreWorkerProcessImpl::GetGlobalWorker() { - absl::ReaderMutexLock lock(&mutex_); - return global_worker_; -} - -std::shared_ptr CoreWorkerProcessImpl::CreateWorker() { - auto worker = std::make_shared( - options_, - global_worker_id_ != WorkerID::Nil() ? global_worker_id_ : WorkerID::FromRandom()); - RAY_LOG(DEBUG) << "Worker " << worker->GetWorkerID() << " is created."; - absl::WriterMutexLock lock(&mutex_); - global_worker_ = worker; - thread_local_core_worker_ = worker; - - return worker; -} - -void CoreWorkerProcessImpl::RemoveWorker(std::shared_ptr worker) { - absl::WriterMutexLock lock(&mutex_); - if (global_worker_) { - RAY_CHECK(global_worker_ == worker); - } else { - RAY_CHECK(thread_local_core_worker_.lock() == worker); - } - thread_local_core_worker_.reset(); - RAY_LOG(INFO) << "Removed worker " << worker->GetWorkerID(); - if (global_worker_ == worker) { - global_worker_ = nullptr; - } -} - void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() { RAY_CHECK(options_.worker_type == WorkerType::WORKER); - // Run the task loop in the current thread only if the number of workers is 1. - auto worker = GetGlobalWorker(); - if (!worker) { - worker = CreateWorker(); - } - worker->RunTaskExecutionLoop(); + auto core_worker = GetCoreWorker(); + RAY_CHECK(core_worker != nullptr); + core_worker->RunTaskExecutionLoop(); RAY_LOG(INFO) << "Task execution loop terminated. Removing the global worker."; - RemoveWorker(worker); + { + absl::WriterMutexLock lock(&mutex_); + core_worker_.reset(); + } } void CoreWorkerProcessImpl::ShutdownDriver() { RAY_CHECK(options_.worker_type == WorkerType::DRIVER) << "The `Shutdown` interface is for driver only."; - auto global_worker = GetGlobalWorker(); + auto global_worker = GetCoreWorker(); RAY_CHECK(global_worker); global_worker->Disconnect(/*exit_type*/ rpc::WorkerExitType::INTENDED_USER_EXIT, /*exit_detail*/ "Shutdown by ray.shutdown()."); global_worker->Shutdown(); - RemoveWorker(global_worker); + { + absl::WriterMutexLock lock(&mutex_); + core_worker_.reset(); + } } -CoreWorker &CoreWorkerProcessImpl::GetCoreWorkerForCurrentThread() { - auto global_worker = GetGlobalWorker(); - if (ShouldCreateGlobalWorkerOnConstruction() && !global_worker) { +std::shared_ptr CoreWorkerProcessImpl::TryGetCoreWorker() const { + absl::ReaderMutexLock lock(&mutex_); + return core_worker_; +} + +std::shared_ptr CoreWorkerProcessImpl::GetCoreWorker() const { + absl::ReaderMutexLock lock(&mutex_); + if (!core_worker_) { // This could only happen when the worker has already been shutdown. // In this case, we should exit without crashing. // TODO (scv119): A better solution could be returning error code // and handling it at language frontend. if (options_.worker_type == WorkerType::DRIVER) { - RAY_LOG(ERROR) << "The global worker has already been shutdown. This happens when " + RAY_LOG(ERROR) << "The core worker has already been shutdown. This happens when " "the language frontend accesses the Ray's worker after it is " "shutdown. The process will exit"; } else { - RAY_LOG(INFO) << "The global worker has already been shutdown. This happens when " + RAY_LOG(INFO) << "The core worker has already been shutdown. This happens when " "the language frontend accesses the Ray's worker after it is " "shutdown. The process will exit"; } QuickExit(); } - RAY_CHECK(global_worker) << "global_worker_ must not be NULL"; - return *global_worker; -} - -void CoreWorkerProcessImpl::SetThreadLocalWorkerById(const WorkerID &worker_id) { - RAY_CHECK(GetGlobalWorker()->GetWorkerID() == worker_id); - return; + RAY_CHECK(core_worker_) << "core_worker_ must not be NULL"; + return core_worker_; } } // namespace core diff --git a/src/ray/core_worker/core_worker_process.h b/src/ray/core_worker/core_worker_process.h index 15067be3a..09f3853d3 100644 --- a/src/ray/core_worker/core_worker_process.h +++ b/src/ray/core_worker/core_worker_process.h @@ -19,7 +19,7 @@ namespace core { class CoreWorker; -/// Lifecycle management of one or more `CoreWorker` instances in a process. +/// Lifecycle management of the `CoreWorker` instance in a process. /// /// To start a driver in the current process: /// CoreWorkerOptions options = { @@ -31,7 +31,7 @@ class CoreWorker; /// To shutdown a driver in the current process: /// CoreWorkerProcess::Shutdown(); /// -/// To start one or more workers in the current process: +/// To start a worker in the current process: /// CoreWorkerOptions options = { /// WorkerType::WORKER, // worker_type /// ..., // other arguments @@ -44,14 +44,6 @@ class CoreWorker; /// code `IntentionalSystemExit` or `UnexpectedSystemExit`) in the task execution /// callback. /// -/// If more than 1 worker is started, only the threads which invoke the -/// `task_execution_callback` will be automatically associated with the corresponding -/// worker. If you started your own threads and you want to use core worker APIs in these -/// threads, remember to call `CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id)` -/// once in the new thread before calling core worker APIs, to associate the current -/// thread with a worker. You can obtain the worker ID via -/// `CoreWorkerProcess::GetCoreWorker()->GetWorkerID()`. -/// /// How does core worker process dealloation work? /// /// For an individual core worker thread's shutdown process, please check core_worker.h. @@ -77,7 +69,7 @@ class CoreWorkerProcess { /// \param[in] options The various initialization options. static void Initialize(const CoreWorkerOptions &options); - /// Get the core worker associated with the current thread. + /// Get the core worker. /// NOTE (kfstorm): Here we return a reference instead of a `shared_ptr` to make sure /// `CoreWorkerProcess` has full control of the destruction timing of `CoreWorker`. static CoreWorker &GetCoreWorker(); @@ -87,13 +79,7 @@ class CoreWorkerProcess { /// /// \param[in] workerId The worker ID. /// \return The `CoreWorker` instance. - static std::shared_ptr TryGetWorker(const WorkerID &worker_id); - - /// Set the core worker associated with the current thread by worker ID. - /// Currently used by Java worker only. - /// - /// \param worker_id The worker ID of the core worker instance. - static void SetCurrentThreadWorkerId(const WorkerID &worker_id); + static std::shared_ptr TryGetWorker(); /// Whether the current process has been initialized for core worker. static bool IsInitialized(); @@ -129,35 +115,20 @@ class CoreWorkerProcessImpl { /// Create an `CoreWorkerProcessImpl` with proper options. /// /// \param[in] options The various initialization options. - CoreWorkerProcessImpl(const CoreWorkerOptions &options); + explicit CoreWorkerProcessImpl(const CoreWorkerOptions &options); ~CoreWorkerProcessImpl(); void InitializeSystemConfig(); - /// Check that if the global worker should be created on construction. - bool ShouldCreateGlobalWorkerOnConstruction() const; + /// Try to get core worker. Returns nullptr if core worker doesn't exist. + std::shared_ptr TryGetCoreWorker() const; - /// Get the `CoreWorker` instance by worker ID. + /// Get the `CoreWorker` instance. The process will be exited if + /// the core worker is nullptr. /// - /// \param[in] workerId The worker ID. /// \return The `CoreWorker` instance. - std::shared_ptr GetWorker(const WorkerID &worker_id) const - LOCKS_EXCLUDED(mutex_); - - /// Create a new `CoreWorker` instance. - /// - /// \return The newly created `CoreWorker` instance. - std::shared_ptr CreateWorker() LOCKS_EXCLUDED(mutex_); - - /// Remove an existing `CoreWorker` instance. - /// - /// \param[in] The existing `CoreWorker` instance. - /// \return Void. - void RemoveWorker(std::shared_ptr worker) LOCKS_EXCLUDED(mutex_); - - /// Get the `GlobalWorker` instance, if the number of workers is 1. - std::shared_ptr GetGlobalWorker() LOCKS_EXCLUDED(mutex_); + std::shared_ptr GetCoreWorker() const; /// Run worker execution loop. void RunWorkerTaskExecutionLoop(); @@ -165,28 +136,17 @@ class CoreWorkerProcessImpl { /// Shutdown the driver completely at the process level. void ShutdownDriver(); - /// Return the CoreWorker for current thread. - CoreWorker &GetCoreWorkerForCurrentThread(); - - /// Set the core worker associated with the current thread by worker ID. - /// Currently used by Java worker only. - void SetThreadLocalWorkerById(const WorkerID &worker_id); - private: /// The various options. const CoreWorkerOptions options_; - /// The only core worker instance, if the number of workers is 1. - std::shared_ptr global_worker_ GUARDED_BY(mutex_); + /// The core worker instance of this worker process. + std::shared_ptr core_worker_ GUARDED_BY(mutex_); - /// The core worker instance associated with the current thread. - /// Use weak_ptr here to avoid memory leak due to multi-threading. - static thread_local std::weak_ptr thread_local_core_worker_; + /// The worker ID of this worker. + const WorkerID worker_id_; - /// The worker ID of the global worker, if the number of workers is 1. - const WorkerID global_worker_id_; - - /// To protect access to workers_ and global_worker_ + /// To protect access to core_worker_ mutable absl::Mutex mutex_; }; } // namespace core diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc index ae02431f5..97a0ca407 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc @@ -75,12 +75,11 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *env, JNIEXPORT void JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeRemoveActorHandleReference( - JNIEnv *env, jclass clz, jbyteArray workerId, jbyteArray actorId) { + JNIEnv *env, jclass clz, jbyteArray actorId) { // We can't control the timing of Java GC, so it's normal that this method is called but // core worker is shutting down (or already shut down). If we can't get a core worker // instance here, skip calling the `RemoveLocalReference` method. - const auto worker_id = JavaByteArrayToId(env, workerId); - auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id); + auto core_worker = CoreWorkerProcess::TryGetWorker(); if (core_worker != nullptr) { const auto actor_id = JavaByteArrayToId(env, actorId); core_worker->RemoveActorHandleReference(actor_id); diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.h b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.h index ed3f14227..0c8a3efdb 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.h @@ -59,12 +59,11 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *, /* * Class: io_ray_runtime_actor_NativeActorHandle * Method: nativeRemoveActorHandleReference - * Signature: ([B[B)V + * Signature: ([B)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeRemoveActorHandleReference(JNIEnv *, jclass, - jbyteArray, jbyteArray); #ifdef __cplusplus diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index 26c8d9029..c14cfe186 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -168,23 +168,21 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference( - JNIEnv *env, jclass, jbyteArray workerId, jbyteArray objectId) { - auto worker_id = JavaByteArrayToId(env, workerId); + JNIEnv *env, jclass, jbyteArray objectId) { auto object_id = JavaByteArrayToId(env, objectId); - auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id); + auto core_worker = CoreWorkerProcess::TryGetWorker(); RAY_CHECK(core_worker); core_worker->AddLocalReference(object_id); } JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference( - JNIEnv *env, jclass, jbyteArray workerId, jbyteArray objectId) { - auto worker_id = JavaByteArrayToId(env, workerId); + JNIEnv *env, jclass, jbyteArray objectId) { auto object_id = JavaByteArrayToId(env, objectId); // We can't control the timing of Java GC, so it's normal that this method is called but // core worker is shutting down (or already shut down). If we can't get a core worker // instance here, skip calling the `RemoveLocalReference` method. - auto core_worker = CoreWorkerProcess::TryGetWorker(worker_id); + auto core_worker = CoreWorkerProcess::TryGetWorker(); if (core_worker) { core_worker->RemoveLocalReference(object_id); } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h index de8bf810e..8c7ad817f 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h @@ -68,23 +68,21 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativeAddLocalReference - * Signature: ([B[B)V + * Signature: ([B)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(JNIEnv *, jclass, - jbyteArray, jbyteArray); /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativeRemoveLocalReference - * Signature: ([B[B)V + * Signature: ([B)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference(JNIEnv *, jclass, - jbyteArray, jbyteArray); /* diff --git a/src/ray/internal/internal.cc b/src/ray/internal/internal.cc index 03f8786e8..7821c2b1f 100644 --- a/src/ray/internal/internal.cc +++ b/src/ray/internal/internal.cc @@ -64,9 +64,5 @@ const ActorID &GetCurrentActorID() { bool IsInitialized() { return CoreWorkerProcess::IsInitialized(); } -void SetCurrentThreadWorker(const WorkerID &worker_id) { - CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id); -} - } // namespace internal } // namespace ray diff --git a/src/ray/internal/internal.h b/src/ray/internal/internal.h index 92b97e0d0..0eb58062c 100644 --- a/src/ray/internal/internal.h +++ b/src/ray/internal/internal.h @@ -40,12 +40,6 @@ const stats::TagKeyType TagRegister(const std::string tag_name); /// Get current actor id via internal. const ActorID &GetCurrentActorID(); -/// Set the core worker associated with the current thread by worker ID. -/// Currently used by Java worker only. -/// -/// \param worker_id The worker ID of the core worker instance. -void SetCurrentThreadWorker(const WorkerID &worker_id); - /// Get core worker initialization flag via internal. bool IsInitialized(); } // namespace internal