diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 297d158fb..5e3718d17 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -157,8 +157,9 @@ ActorID LocalModeTaskSubmitter::GetActor(const std::string &actor_name) const { ray::PlacementGroup LocalModeTaskSubmitter::CreatePlacementGroup( const ray::PlacementGroupCreationOptions &create_options) { - ray::PlacementGroup placement_group{ray::PlacementGroupID::FromRandom().Binary(), - create_options}; + ray::PlacementGroup placement_group{ + PlacementGroupID::Of(local_mode_ray_tuntime_.GetCurrentJobID()).Binary(), + create_options}; placement_group.SetWaitCallbak([this](const std::string &id, int timeout_seconds) { return WaitPlacementGroupReady(id, timeout_seconds); }); diff --git a/java/api/src/main/java/io/ray/api/id/PlacementGroupId.java b/java/api/src/main/java/io/ray/api/id/PlacementGroupId.java index a27ec6f54..a9a84ec16 100644 --- a/java/api/src/main/java/io/ray/api/id/PlacementGroupId.java +++ b/java/api/src/main/java/io/ray/api/id/PlacementGroupId.java @@ -8,7 +8,9 @@ import java.util.Random; /** Represents the id of a placement group. */ public class PlacementGroupId extends BaseId implements Serializable { - public static final int LENGTH = 16; + private static final int UNIQUE_BYTES_LENGTH = 14; + + public static final int LENGTH = JobId.LENGTH + UNIQUE_BYTES_LENGTH; public static final PlacementGroupId NIL = nil(); diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 4d839f8ed..cd7890119 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -166,10 +166,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: CPlacementGroupID FromBinary(const c_string &binary) @staticmethod - const CActorID Nil() + const CPlacementGroupID Nil() @staticmethod size_t Size() @staticmethod - CPlacementGroupID FromRandom() + CPlacementGroupID Of(CJobID job_id) diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 47831c3be..2b4f5c78f 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -357,7 +357,12 @@ cdef class PlacementGroupID(BaseID): @classmethod def from_random(cls): - return cls(CPlacementGroupID.FromRandom().Binary()) + return cls(os.urandom(CPlacementGroupID.Size())) + + @classmethod + def of(cls, job_id): + assert isinstance(job_id, JobID) + return cls(CPlacementGroupID.Of(CJobID.FromBinary(job_id.binary())).Binary()) @classmethod def nil(cls): diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 897bced08..a7c51ccce 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -305,6 +305,22 @@ uint32_t JobID::ToInt() { return value; } +PlacementGroupID PlacementGroupID::Of(const JobID &job_id) { + // No need to set transport type for a random object id. + // No need to assign put_index/return_index bytes. + std::string data(PlacementGroupID::kUniqueBytesLength, 0); + FillRandom(&data); + std::copy_n(job_id.Data(), JobID::kLength, std::back_inserter(data)); + RAY_CHECK(data.size() == kLength); + return PlacementGroupID::FromBinary(data); +} + +JobID PlacementGroupID::JobId() const { + RAY_CHECK(!IsNil()); + return JobID::FromBinary(std::string( + reinterpret_cast(this->Data() + kUniqueBytesLength), JobID::kLength)); +} + #define ID_OSTREAM_OPERATOR(id_type) \ std::ostream &operator<<(std::ostream &os, const id_type &id) { \ if (id.IsNil()) { \ diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 5a43885fd..efe6c8ed9 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -331,17 +331,35 @@ class ObjectID : public BaseID { }; class PlacementGroupID : public BaseID { + private: + static constexpr size_t kUniqueBytesLength = 14; + public: - static constexpr size_t kLength = 16; + /// Length of `PlacementGroupID` in bytes. + static constexpr size_t kLength = kUniqueBytesLength + JobID::kLength; /// Size of `PlacementGroupID` in bytes. /// /// \return Size of `PlacementGroupID` in bytes. static constexpr size_t Size() { return kLength; } + /// Creates a `PlacementGroupID` by hashing the given information. + /// + /// \param job_id The job id to which this actor belongs. + /// + /// \return The random `PlacementGroupID`. + static PlacementGroupID Of(const JobID &job_id); + + static PlacementGroupID FromRandom() = delete; + /// Constructor of `PlacementGroupID`. PlacementGroupID() : BaseID() {} + /// Get the job id to which this placement group belongs. + /// + /// \return The job id to which this placement group belongs. + JobID JobId() const; + MSGPACK_DEFINE(id_); private: diff --git a/src/ray/common/id_test.cc b/src/ray/common/id_test.cc index c61191430..c6b9d1c1b 100644 --- a/src/ray/common/id_test.cc +++ b/src/ray/common/id_test.cc @@ -137,6 +137,27 @@ TEST(HashTest, TestNilHash) { ASSERT_NE(id1.Hash(), id2.Hash()); } +TEST(PlacementGroupIDTest, TestPlacementGroup) { + { + // test from binary + PlacementGroupID placement_group_id_1 = PlacementGroupID::Of(JobID::FromInt(1)); + const auto placement_group_id_1_binary = placement_group_id_1.Binary(); + const auto placement_group_id_2 = + PlacementGroupID::FromBinary(placement_group_id_1_binary); + ASSERT_EQ(placement_group_id_1, placement_group_id_2); + const auto placement_group_id_1_hex = placement_group_id_1.Hex(); + const auto placement_group_id_3 = PlacementGroupID::FromHex(placement_group_id_1_hex); + ASSERT_EQ(placement_group_id_1, placement_group_id_3); + } + + { + // test get job id + auto job_id = JobID::FromInt(1); + const PlacementGroupID placement_group_id = PlacementGroupID::Of(job_id); + ASSERT_EQ(job_id, placement_group_id.JobId()); + } +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b487234be..98a267138 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1880,7 +1880,7 @@ Status CoreWorker::CreatePlacementGroup( } } } - const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom(); + const PlacementGroupID placement_group_id = PlacementGroupID::Of(GetCurrentJobId()); PlacementGroupSpecBuilder builder; builder.SetPlacementGroupSpec(placement_group_id, placement_group_creation_options.name, diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index b38d14e91..3cd2adb86 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -154,7 +154,7 @@ struct Mocker { const ActorID &actor_id) { PlacementGroupSpecBuilder builder; - auto placement_group_id = PlacementGroupID::FromRandom(); + auto placement_group_id = PlacementGroupID::Of(job_id); builder.SetPlacementGroupSpec(placement_group_id, name, bundles, diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index 5a3ffc3c3..572e0cc3a 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -77,7 +77,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) { // 1. create bundle spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); @@ -92,7 +92,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleWithInsufficientResource) { // 1. create bundle spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 2.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); @@ -106,7 +106,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { // 1. create bundle spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); @@ -137,7 +137,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { // 1. create bundle spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); @@ -161,7 +161,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndReturn) { // 1. create two bundles spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto first_bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); @@ -234,7 +234,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) { // 1. create one bundle spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 1); @@ -263,7 +263,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) { // 1. create one bundle spec. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); @@ -324,7 +324,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { // 1. create a placement group spec with 4 bundles and each required 1 CPU. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4); @@ -383,7 +383,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) { // 1. create a placement group spec with 4 bundles and each required 1 CPU. - auto group_id = PlacementGroupID::FromRandom(); + auto group_id = PlacementGroupID::Of(JobID::FromInt(1)); absl::flat_hash_map unit_resource; unit_resource.insert({"CPU", 1.0}); auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4);