[C++ Worker]Add some api of placement group (#18431)

This commit is contained in:
qicosmos 2021-09-13 15:10:54 +08:00 committed by GitHub
parent 3bc5f0501f
commit ac0a153b06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 34 deletions

View file

@ -150,13 +150,6 @@ PlacementGroup CreatePlacementGroup(
/// \param[in] placement_group_id Id of the placement group.
void RemovePlacementGroup(const std::string &placement_group_id);
/// Wait for the placement group to be ready within the specified time.
///
/// \param[in] id Id of the placement group.
/// \param[in] timeout_seconds Timeout in seconds.
/// \return True if the placement group is created. False otherwise.
bool WaitPlacementGroupReady(const std::string &id, int timeout_seconds);
/// Returns true if the current actor was restarted, otherwise false.
bool WasCurrentActorRestarted();
@ -282,10 +275,6 @@ inline void RemovePlacementGroup(const std::string &placement_group_id) {
return ray::internal::GetRayRuntime()->RemovePlacementGroup(placement_group_id);
}
inline bool WaitPlacementGroupReady(const std::string &id, int timeout_seconds) {
return ray::internal::GetRayRuntime()->WaitPlacementGroupReady(id, timeout_seconds);
}
inline bool WasCurrentActorRestarted() {
return ray::internal::GetRayRuntime()->WasCurrentActorRestarted();
}

View file

@ -66,6 +66,12 @@ class ActorCreator {
return *this;
}
ActorCreator &SetPlacementGroup(PlacementGroup group, int bundle_index) {
create_options_.group = group;
create_options_.bundle_index = bundle_index;
return *this;
}
private:
RayRuntime *runtime_;
RemoteFunctionHolder remote_function_holder_;

View file

@ -44,6 +44,12 @@ class TaskCaller {
return *this;
}
TaskCaller &SetPlacementGroup(PlacementGroup group, int bundle_index) {
task_options_.group = group;
task_options_.bundle_index = bundle_index;
return *this;
}
private:
RayRuntime *runtime_;
RemoteFunctionHolder remote_function_holder_{};

View file

@ -37,19 +37,6 @@ inline void CheckTaskOptions(const std::unordered_map<std::string, double> &reso
}
}
struct CallOptions {
std::string name;
std::unordered_map<std::string, double> resources;
};
struct ActorCreationOptions {
bool global;
std::string name;
std::unordered_map<std::string, double> resources;
int max_restarts = 0;
int max_concurrency = 1;
};
enum class PlacementStrategy {
PACK = 0,
SPREAD = 1,
@ -72,7 +59,7 @@ class PlacementGroup {
PlacementGroup() = default;
PlacementGroup(std::string id, internal::PlacementGroupCreationOptions options)
: id_(std::move(id)), options_(std::move(options)) {}
std::string GetID() { return id_; }
std::string GetID() const { return id_; }
std::string GetName() { return options_.name; }
std::vector<std::unordered_map<std::string, double>> GetBundles() {
return options_.bundles;
@ -82,6 +69,7 @@ class PlacementGroup {
void SetWaitCallbak(std::function<bool(const std::string &, int)> callback) {
callback_ = std::move(callback);
}
bool Empty() const { return id_.empty(); }
private:
std::string id_;
@ -89,4 +77,24 @@ class PlacementGroup {
std::function<bool(const std::string &, int)> callback_;
};
namespace internal {
struct CallOptions {
std::string name;
std::unordered_map<std::string, double> resources;
PlacementGroup group;
int bundle_index;
};
struct ActorCreationOptions {
bool global;
std::string name;
std::unordered_map<std::string, double> resources;
int max_restarts = 0;
int max_concurrency = 1;
PlacementGroup group;
int bundle_index;
};
} // namespace internal
} // namespace ray

View file

@ -30,6 +30,16 @@ RayFunction BuildRayFunction(InvocationSpec &invocation) {
return RayFunction(ray::Language::CPP, function_descriptor);
}
template <typename T>
static BundleID GetBundleID(const T &options) {
BundleID bundle_id = std::make_pair(PlacementGroupID::Nil(), -1);
if (!options.group.Empty()) {
PlacementGroupID id = PlacementGroupID::FromBinary(options.group.GetID());
bundle_id = std::make_pair(id, options.bundle_index);
}
return bundle_id;
};
ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
const CallOptions &call_options) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
@ -41,9 +51,10 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
return_refs = core_worker.SubmitActorTask(
invocation.actor_id, BuildRayFunction(invocation), invocation.args, options);
} else {
return_refs = core_worker.SubmitTask(
BuildRayFunction(invocation), invocation.args, options, 1, false,
std::make_pair(PlacementGroupID::Nil(), -1), true, "");
BundleID bundle_id = GetBundleID(call_options);
return_refs =
core_worker.SubmitTask(BuildRayFunction(invocation), invocation.args, options, 1,
false, std::move(bundle_id), true, "");
}
std::vector<ObjectID> return_ids;
for (const auto &ref : return_refs) {
@ -66,6 +77,7 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
name = GetFullName(create_options.global, create_options.name);
}
std::string ray_namespace = "";
BundleID bundle_id = GetBundleID(create_options);
ray::core::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
@ -75,7 +87,8 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false};
/*is_asyncio=*/false,
std::move(bundle_id)};
ActorID actor_id;
auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args,
actor_options, "", &actor_id);

View file

@ -262,7 +262,7 @@ TEST(RayApiTest, CreateAndRemovePlacementGroup) {
ray::internal::PlacementGroupCreationOptions options1{
false, "first_placement_group", bundles, ray::internal::PlacementStrategy::PACK};
auto first_placement_group = ray::CreatePlacementGroup(options1);
EXPECT_TRUE(ray::WaitPlacementGroupReady(first_placement_group.GetID(), 10));
EXPECT_TRUE(first_placement_group.Wait(10));
ray::RemovePlacementGroup(first_placement_group.GetID());
}

View file

@ -222,7 +222,7 @@ TEST(RayClusterModeTest, ResourcesManagementTest) {
EXPECT_EQ(*r1.Get(), 1);
auto actor2 = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetResources({{"CPU", 100.0}})
.SetResources({{"CPU", 10000.0}})
.Remote();
auto r2 = actor2.Task(&Counter::Plus1).Remote();
std::vector<ray::ObjectRef<int>> objects{r2};
@ -276,17 +276,73 @@ TEST(RayClusterModeTest, GetActorTest) {
EXPECT_FALSE(ray::GetActor<Counter>("not_exist_actor"));
}
TEST(RayClusterModeTest, CreateAndRemovePlacementGroup) {
ray::PlacementGroup CreateSimplePlacementGroup(const std::string &name) {
std::vector<std::unordered_map<std::string, double>> bundles{{{"CPU", 1}}};
ray::internal::PlacementGroupCreationOptions options{
false, name, bundles, ray::internal::PlacementStrategy::PACK};
return ray::CreatePlacementGroup(options);
}
TEST(RayClusterModeTest, CreateAndRemovePlacementGroup) {
auto first_placement_group = CreateSimplePlacementGroup("first_placement_group");
EXPECT_TRUE(first_placement_group.Wait(10));
EXPECT_THROW(CreateSimplePlacementGroup("first_placement_group"),
ray::internal::RayException);
ray::RemovePlacementGroup(first_placement_group.GetID());
}
TEST(RayClusterModeTest, CreatePlacementGroupExceedsClusterResource) {
std::vector<std::unordered_map<std::string, double>> bundles{{{"CPU", 10000}}};
ray::internal::PlacementGroupCreationOptions options{
false, "first_placement_group", bundles, ray::internal::PlacementStrategy::PACK};
auto first_placement_group = ray::CreatePlacementGroup(options);
EXPECT_TRUE(ray::WaitPlacementGroupReady(first_placement_group.GetID(), 10));
EXPECT_FALSE(first_placement_group.Wait(3));
ray::RemovePlacementGroup(first_placement_group.GetID());
}
TEST(RayClusterModeTest, CreateActorWithPlacementGroup) {
auto placement_group = CreateSimplePlacementGroup("first_placement_group");
EXPECT_TRUE(placement_group.Wait(10));
auto actor1 = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetResources({{"CPU", 1.0}})
.SetPlacementGroup(placement_group, 0)
.Remote();
auto r1 = actor1.Task(&Counter::Plus1).Remote();
std::vector<ray::ObjectRef<int>> objects{r1};
auto result = ray::Wait(objects, 1, 1000);
EXPECT_EQ(result.ready.size(), 1);
EXPECT_EQ(result.unready.size(), 0);
auto result_vector = ray::Get(objects);
EXPECT_EQ(*(result_vector[0]), 1);
// Exceeds the resources of PlacementGroup.
auto actor2 = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
.SetResources({{"CPU", 2.0}})
.SetPlacementGroup(placement_group, 0)
.Remote();
auto r2 = actor2.Task(&Counter::Plus1).Remote();
std::vector<ray::ObjectRef<int>> objects2{r2};
auto result2 = ray::Wait(objects2, 1, 1000);
EXPECT_EQ(result2.ready.size(), 0);
EXPECT_EQ(result2.unready.size(), 1);
ray::RemovePlacementGroup(placement_group.GetID());
}
TEST(RayClusterModeTest, TaskWithPlacementGroup) {
auto placement_group = CreateSimplePlacementGroup("first_placement_group");
EXPECT_TRUE(placement_group.Wait(10));
auto r = ray::Task(Return1)
.SetResources({{"CPU", 1.0}})
.SetPlacementGroup(placement_group, 0)
.Remote();
EXPECT_EQ(*r.Get(), 1);
ray::RemovePlacementGroup(placement_group.GetID());
}
int main(int argc, char **argv) {
absl::ParseCommandLine(argc, argv);
cmd_argc = argc;