[C++ worker] Refine worker context and more (#26281)

* Avoid depending on `CoreWorkerProcess::GetCoreWorker()` in local mode.
* Fix bug in `LocalModeObjectStore::PutRaw`.
* Remove unused `TaskExecutor::Execute` method.
* Use `Process::Wait` instead of sleep when invoking `ray start` and `ray stop`.
This commit is contained in:
Kai Yang 2022-07-05 13:47:28 +08:00 committed by GitHub
parent 6f7efa69d5
commit 7ea9d91e1a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 20 additions and 28 deletions

View file

@ -87,7 +87,7 @@ void AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data,
}
std::string AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {
ObjectID object_id{};
ObjectID object_id;
object_store_->Put(data, &object_id);
return object_id.Binary();
}
@ -224,8 +224,8 @@ const JobID &AbstractRayRuntime::GetCurrentJobID() {
return GetWorkerContext().GetCurrentJobID();
}
const WorkerContext &AbstractRayRuntime::GetWorkerContext() {
return CoreWorkerProcess::GetCoreWorker().GetWorkerContext();
const ActorID &AbstractRayRuntime::GetCurrentActorID() {
return GetWorkerContext().GetCurrentActorID();
}
void AbstractRayRuntime::AddLocalReference(const std::string &id) {

View file

@ -90,7 +90,9 @@ class AbstractRayRuntime : public RayRuntime {
const JobID &GetCurrentJobID();
virtual const WorkerContext &GetWorkerContext();
const ActorID &GetCurrentActorID();
virtual const WorkerContext &GetWorkerContext() = 0;
static std::shared_ptr<AbstractRayRuntime> GetInstance();
static std::shared_ptr<AbstractRayRuntime> DoInit();
@ -101,8 +103,6 @@ class AbstractRayRuntime : public RayRuntime {
bool WasCurrentActorRestarted();
virtual ActorID GetCurrentActorID() { return ActorID::Nil(); }
virtual std::vector<PlacementGroup> GetAllPlacementGroups();
virtual PlacementGroup GetPlacementGroupById(const std::string &id);
virtual PlacementGroup GetPlacementGroup(const std::string &name);

View file

@ -38,8 +38,6 @@ ActorID LocalModeRayRuntime::GetNextActorID() {
return actor_id;
}
ActorID LocalModeRayRuntime::GetCurrentActorID() { return worker_.GetCurrentActorID(); }
const WorkerContext &LocalModeRayRuntime::GetWorkerContext() { return worker_; }
std::string LocalModeRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {

View file

@ -26,7 +26,6 @@ class LocalModeRayRuntime : public AbstractRayRuntime {
LocalModeRayRuntime();
ActorID GetNextActorID();
ActorID GetCurrentActorID();
std::string Put(std::shared_ptr<msgpack::sbuffer> data);
const WorkerContext &GetWorkerContext();
bool IsLocalMode() { return true; }
@ -36,4 +35,4 @@ class LocalModeRayRuntime : public AbstractRayRuntime {
};
} // namespace internal
} // namespace ray
} // namespace ray

View file

@ -39,8 +39,8 @@ NativeRayRuntime::NativeRayRuntime() {
ProcessHelper::GetInstance().CreateGlobalStateAccessor(bootstrap_address);
}
ActorID NativeRayRuntime::GetCurrentActorID() {
return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
const WorkerContext &NativeRayRuntime::GetWorkerContext() {
return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext();
}
} // namespace internal

View file

@ -24,8 +24,9 @@ namespace internal {
class NativeRayRuntime : public AbstractRayRuntime {
public:
NativeRayRuntime();
ActorID GetCurrentActorID();
const WorkerContext &GetWorkerContext();
};
} // namespace internal
} // namespace ray
} // namespace ray

View file

@ -32,7 +32,8 @@ LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_t
void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
ObjectID *object_id) {
PutRaw(data, (const ObjectID)(*object_id));
*object_id = ObjectID::FromRandom();
PutRaw(data, *object_id);
}
void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,

View file

@ -76,12 +76,6 @@ using ray::core::CoreWorkerProcess;
std::shared_ptr<msgpack::sbuffer> TaskExecutor::current_actor_ = nullptr;
// TODO(SongGuyang): Make a common task execution function used for both local mode and
// cluster mode.
std::unique_ptr<ObjectID> TaskExecutor::Execute(InvocationSpec &invocation) {
return std::make_unique<ObjectID>();
};
/// TODO(qicosmos): Need to add more details of the error messages, such as object id,
/// task id etc.
std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(

View file

@ -67,9 +67,6 @@ class TaskExecutor {
public:
TaskExecutor() = default;
/// TODO(SongGuyang): support multiple tasks execution
std::unique_ptr<ObjectID> Execute(InvocationSpec &invocation);
static void Invoke(
const TaskSpecification &task_spec,
std::shared_ptr<msgpack::sbuffer> actor,

View file

@ -43,16 +43,18 @@ void ProcessHelper::StartRayNode(const int port,
cmdargs.insert(cmdargs.end(), head_args.begin(), head_args.end());
}
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
std::this_thread::sleep_for(std::chrono::seconds(5));
auto spawn_result = Process::Spawn(cmdargs, true);
RAY_CHECK(!spawn_result.second);
spawn_result.first.Wait();
return;
}
void ProcessHelper::StopRayNode() {
std::vector<std::string> cmdargs({"ray", "stop"});
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
std::this_thread::sleep_for(std::chrono::seconds(3));
auto spawn_result = Process::Spawn(cmdargs, true);
RAY_CHECK(!spawn_result.second);
spawn_result.first.Wait();
return;
}