[Placement Group] Make PlacementGroupID generate from JobID (#23175)

This commit is contained in:
Larry 2022-03-21 17:09:16 +08:00 committed by GitHub
parent d3159f201b
commit 81dcf9ff35
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 81 additions and 18 deletions

View file

@ -157,8 +157,9 @@ ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name) const {
ray::PlacementGroup LocalModeTaskSubmitter::CreatePlacementGroup( ray::PlacementGroup LocalModeTaskSubmitter::CreatePlacementGroup(
const ray::PlacementGroupCreationOptions &create_options) { const ray::PlacementGroupCreationOptions &create_options) {
ray::PlacementGroup placement_group{ray::PlacementGroupID::FromRandom().Binary(), ray::PlacementGroup placement_group{
create_options}; PlacementGroupID::Of(local_mode_ray_tuntime_.GetCurrentJobID()).Binary(),
create_options};
placement_group.SetWaitCallbak([this](const std::string &id, int timeout_seconds) { placement_group.SetWaitCallbak([this](const std::string &id, int timeout_seconds) {
return WaitPlacementGroupReady(id, timeout_seconds); return WaitPlacementGroupReady(id, timeout_seconds);
}); });

View file

@ -8,7 +8,9 @@ import java.util.Random;
/** Represents the id of a placement group. */ /** Represents the id of a placement group. */
public class PlacementGroupId extends BaseId implements Serializable { public class PlacementGroupId extends BaseId implements Serializable {
public static final int LENGTH = 16; private static final int UNIQUE_BYTES_LENGTH = 14;
public static final int LENGTH = JobId.LENGTH + UNIQUE_BYTES_LENGTH;
public static final PlacementGroupId NIL = nil(); public static final PlacementGroupId NIL = nil();

View file

@ -166,10 +166,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CPlacementGroupID FromBinary(const c_string &binary) CPlacementGroupID FromBinary(const c_string &binary)
@staticmethod @staticmethod
const CActorID Nil() const CPlacementGroupID Nil()
@staticmethod @staticmethod
size_t Size() size_t Size()
@staticmethod @staticmethod
CPlacementGroupID FromRandom() CPlacementGroupID Of(CJobID job_id)

View file

@ -357,7 +357,12 @@ cdef class PlacementGroupID(BaseID):
@classmethod @classmethod
def from_random(cls): def from_random(cls):
return cls(CPlacementGroupID.FromRandom().Binary()) return cls(os.urandom(CPlacementGroupID.Size()))
@classmethod
def of(cls, job_id):
assert isinstance(job_id, JobID)
return cls(CPlacementGroupID.Of(CJobID.FromBinary(job_id.binary())).Binary())
@classmethod @classmethod
def nil(cls): def nil(cls):

View file

@ -305,6 +305,22 @@ uint32_t JobID::ToInt() {
return value; return value;
} }
PlacementGroupID PlacementGroupID::Of(const JobID &job_id) {
// No need to set transport type for a random object id.
// No need to assign put_index/return_index bytes.
std::string data(PlacementGroupID::kUniqueBytesLength, 0);
FillRandom(&data);
std::copy_n(job_id.Data(), JobID::kLength, std::back_inserter(data));
RAY_CHECK(data.size() == kLength);
return PlacementGroupID::FromBinary(data);
}
JobID PlacementGroupID::JobId() const {
RAY_CHECK(!IsNil());
return JobID::FromBinary(std::string(
reinterpret_cast<const char *>(this->Data() + kUniqueBytesLength), JobID::kLength));
}
#define ID_OSTREAM_OPERATOR(id_type) \ #define ID_OSTREAM_OPERATOR(id_type) \
std::ostream &operator<<(std::ostream &os, const id_type &id) { \ std::ostream &operator<<(std::ostream &os, const id_type &id) { \
if (id.IsNil()) { \ if (id.IsNil()) { \

View file

@ -331,17 +331,35 @@ class ObjectID : public BaseID<ObjectID> {
}; };
class PlacementGroupID : public BaseID<PlacementGroupID> { class PlacementGroupID : public BaseID<PlacementGroupID> {
private:
static constexpr size_t kUniqueBytesLength = 14;
public: public:
static constexpr size_t kLength = 16; /// Length of `PlacementGroupID` in bytes.
static constexpr size_t kLength = kUniqueBytesLength + JobID::kLength;
/// Size of `PlacementGroupID` in bytes. /// Size of `PlacementGroupID` in bytes.
/// ///
/// \return Size of `PlacementGroupID` in bytes. /// \return Size of `PlacementGroupID` in bytes.
static constexpr size_t Size() { return kLength; } static constexpr size_t Size() { return kLength; }
/// Creates a `PlacementGroupID` by hashing the given information.
///
/// \param job_id The job id to which this actor belongs.
///
/// \return The random `PlacementGroupID`.
static PlacementGroupID Of(const JobID &job_id);
static PlacementGroupID FromRandom() = delete;
/// Constructor of `PlacementGroupID`. /// Constructor of `PlacementGroupID`.
PlacementGroupID() : BaseID() {} PlacementGroupID() : BaseID() {}
/// Get the job id to which this placement group belongs.
///
/// \return The job id to which this placement group belongs.
JobID JobId() const;
MSGPACK_DEFINE(id_); MSGPACK_DEFINE(id_);
private: private:

View file

@ -137,6 +137,27 @@ TEST(HashTest, TestNilHash) {
ASSERT_NE(id1.Hash(), id2.Hash()); ASSERT_NE(id1.Hash(), id2.Hash());
} }
TEST(PlacementGroupIDTest, TestPlacementGroup) {
{
// test from binary
PlacementGroupID placement_group_id_1 = PlacementGroupID::Of(JobID::FromInt(1));
const auto placement_group_id_1_binary = placement_group_id_1.Binary();
const auto placement_group_id_2 =
PlacementGroupID::FromBinary(placement_group_id_1_binary);
ASSERT_EQ(placement_group_id_1, placement_group_id_2);
const auto placement_group_id_1_hex = placement_group_id_1.Hex();
const auto placement_group_id_3 = PlacementGroupID::FromHex(placement_group_id_1_hex);
ASSERT_EQ(placement_group_id_1, placement_group_id_3);
}
{
// test get job id
auto job_id = JobID::FromInt(1);
const PlacementGroupID placement_group_id = PlacementGroupID::Of(job_id);
ASSERT_EQ(job_id, placement_group_id.JobId());
}
}
} // namespace ray } // namespace ray
int main(int argc, char **argv) { int main(int argc, char **argv) {

View file

@ -1880,7 +1880,7 @@ Status CoreWorker::CreatePlacementGroup(
} }
} }
} }
const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom(); const PlacementGroupID placement_group_id = PlacementGroupID::Of(GetCurrentJobId());
PlacementGroupSpecBuilder builder; PlacementGroupSpecBuilder builder;
builder.SetPlacementGroupSpec(placement_group_id, builder.SetPlacementGroupSpec(placement_group_id,
placement_group_creation_options.name, placement_group_creation_options.name,

View file

@ -154,7 +154,7 @@ struct Mocker {
const ActorID &actor_id) { const ActorID &actor_id) {
PlacementGroupSpecBuilder builder; PlacementGroupSpecBuilder builder;
auto placement_group_id = PlacementGroupID::FromRandom(); auto placement_group_id = PlacementGroupID::Of(job_id);
builder.SetPlacementGroupSpec(placement_group_id, builder.SetPlacementGroupSpec(placement_group_id,
name, name,
bundles, bundles,

View file

@ -77,7 +77,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) {
// 1. create bundle spec. // 1. create bundle spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1);
@ -92,7 +92,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) {
TEST_F(NewPlacementGroupResourceManagerTest, TEST_F(NewPlacementGroupResourceManagerTest,
TestNewPrepareBundleWithInsufficientResource) { TestNewPrepareBundleWithInsufficientResource) {
// 1. create bundle spec. // 1. create bundle spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 2.0}); unit_resource.insert({"CPU", 2.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1);
@ -106,7 +106,7 @@ TEST_F(NewPlacementGroupResourceManagerTest,
TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
// 1. create bundle spec. // 1. create bundle spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1);
@ -137,7 +137,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
// 1. create bundle spec. // 1. create bundle spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource);
@ -161,7 +161,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndReturn) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndReturn) {
// 1. create two bundles spec. // 1. create two bundles spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto first_bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); auto first_bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource);
@ -234,7 +234,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) {
// 1. create one bundle spec. // 1. create one bundle spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1);
@ -263,7 +263,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) {
// 1. create one bundle spec. // 1. create one bundle spec.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource);
@ -324,7 +324,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
// 1. create a placement group spec with 4 bundles and each required 1 CPU. // 1. create a placement group spec with 4 bundles and each required 1 CPU.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4);
@ -383,7 +383,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) {
// 1. create a placement group spec with 4 bundles and each required 1 CPU. // 1. create a placement group spec with 4 bundles and each required 1 CPU.
auto group_id = PlacementGroupID::FromRandom(); auto group_id = PlacementGroupID::Of(JobID::FromInt(1));
absl::flat_hash_map<std::string, double> unit_resource; absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0}); unit_resource.insert({"CPU", 1.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4);