[core] Update placement group retry implementation (#18842)

* exp backoff

* up

* format

* up

* up

* up

* up

* up

* format

* fix

* up

* format

* adjust ordering

* up

* Revert "[tune] Cache unstaged placement groups for potential re-use (#18706)"

This reverts commit 2e99fb215f.

* up

* update

* format

* up

* format

* fix

* Revert "Revert "[tune] Cache unstaged placement groups for potential re-use (#18706)""

This reverts commit 93425fdb986059e53699623a0fc8590c062e139b.

* up

* format

* fix lint

* up

* up

* up

* up

* check

* add test1

* format

* up

* add test

* up

* up

* up

* fix

* up

* up

* up

* add test

* format

* up

* up

* fix lint

* format

* fix

* format

* fix

* up
This commit is contained in:
Yi Cheng 2021-10-04 21:31:56 -07:00 committed by GitHub
parent beaba4782a
commit 056c3af699
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 502 additions and 245 deletions

View file

@ -177,6 +177,7 @@ build:debug --strip="never"
# Undefined Behavior Sanitizer
build:ubsan --strip=never
build:ubsan --copt -fsanitize=undefined
build:ubsan --copt -fno-sanitize=vptr
build:ubsan --copt -fno-sanitize-recover=all
build:ubsan --copt -g
build:ubsan --linkopt -fsanitize=undefined

View file

@ -414,7 +414,6 @@ cc_library(
],
) + [
"src/ray/raylet/scheduling/cluster_resource_data.cc",
"src/ray/raylet/scheduling/fixed_point.cc",
"src/ray/raylet/scheduling/scheduling_ids.cc",
],
hdrs = glob(
@ -553,6 +552,7 @@ cc_library(
":pubsub_lib",
":raylet_client_lib",
":worker_rpc",
"@com_google_absl//absl/container:btree",
],
)
@ -1181,6 +1181,22 @@ cc_test(
],
)
cc_test(
name = "gcs_placement_group_manager_mock_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "placement_group_resource_manager_test",
size = "small",

View file

@ -1,4 +1,4 @@
// Copyright The Ray Authors.
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -30,8 +30,8 @@ class MockGcsPlacementGroupSchedulerInterface
public:
MOCK_METHOD(void, ScheduleUnplacedBundles,
(std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback),
PGSchedulingFailureCallback failure_callback,
PGSchedulingSuccessfulCallback success_callback),
(override));
MOCK_METHOD((absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>),
GetBundlesOnNode, (const NodeID &node_id), (override));
@ -63,11 +63,12 @@ namespace gcs {
class MockGcsScheduleStrategy : public GcsScheduleStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
MOCK_METHOD(
ScheduleResult, Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
};
} // namespace gcs
@ -78,11 +79,12 @@ namespace gcs {
class MockGcsPackStrategy : public GcsPackStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
MOCK_METHOD(
ScheduleResult, Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
};
} // namespace gcs
@ -93,11 +95,12 @@ namespace gcs {
class MockGcsSpreadStrategy : public GcsSpreadStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
MOCK_METHOD(
ScheduleResult, Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
};
} // namespace gcs
@ -108,11 +111,12 @@ namespace gcs {
class MockGcsStrictPackStrategy : public GcsStrictPackStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
MOCK_METHOD(
ScheduleResult, Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
};
} // namespace gcs
@ -123,11 +127,12 @@ namespace gcs {
class MockGcsStrictSpreadStrategy : public GcsStrictSpreadStrategy {
public:
MOCK_METHOD(ScheduleMap, Schedule,
(std::vector<std::shared_ptr<const ray::BundleSpecification>> & bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
MOCK_METHOD(
ScheduleResult, Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
(override));
};
} // namespace gcs
@ -160,8 +165,8 @@ class MockGcsPlacementGroupScheduler : public GcsPlacementGroupScheduler {
public:
MOCK_METHOD(void, ScheduleUnplacedBundles,
(std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_handler),
PGSchedulingFailureCallback failure_handler,
PGSchedulingSuccessfulCallback success_handler),
(override));
MOCK_METHOD(void, DestroyPlacementGroupBundleResourcesIfExists,
(const PlacementGroupID &placement_group_id), (override));

View file

@ -17,6 +17,7 @@ namespace gcs {
class MockGcsResourceManager : public GcsResourceManager {
public:
using GcsResourceManager::GcsResourceManager;
MOCK_METHOD(void, HandleGetResources,
(const rpc::GetResourcesRequest &request, rpc::GetResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback),

View file

@ -17,6 +17,7 @@ namespace gcs {
class MockRedisStoreClient : public RedisStoreClient {
public:
MockRedisStoreClient() : RedisStoreClient(nullptr) {}
MOCK_METHOD(Status, AsyncPut,
(const std::string &table_name, const std::string &key,
const std::string &data, const StatusCallback &callback),

View file

@ -35,6 +35,12 @@ class MockWorkerLeaseInterface : public WorkerLeaseInterface {
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size),
(override));
MOCK_METHOD(
void, RequestWorkerLease,
(const rpc::TaskSpec &task_spec,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size),
(override));
MOCK_METHOD(ray::Status, ReturnWorker,
(int worker_port, const WorkerID &worker_id, bool disconnect_worker),
(override));

View file

@ -235,8 +235,10 @@ RAY_CONFIG(uint64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200)
/// Duration to wait between retries for creating actor in gcs server.
RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200)
/// Duration to wait between retries for creating placement group in gcs server.
RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_interval_ms, 200)
/// Exponential backoff params for gcs to retry creating a placement group
RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_min_interval_ms, 200)
RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_max_interval_ms, 5000)
RAY_CONFIG(double, gcs_create_placement_group_retry_multiplier, 1.5);
/// Maximum number of destroyed actors in GCS server memory cache.
RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
/// Maximum number of dead nodes in GCS server memory cache.

View file

@ -731,7 +731,7 @@ Status ServiceBasedNodeResourceInfoAccessor::AsyncUpdateResources(
});
};
sequencer_.Post(node_id, operation);
sequencer_.Post(node_id, std::move(operation));
return Status::OK();
}

View file

@ -182,7 +182,7 @@ void GcsPlacementGroupManager::RegisterPlacementGroup(
.emplace_back(std::move(callback));
registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(),
placement_group);
pending_placement_groups_.emplace_back(placement_group);
AddToPendingQueue(placement_group);
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
placement_group_id, placement_group->GetPlacementGroupTableData(),
@ -227,7 +227,8 @@ PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(
}
void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
std::shared_ptr<GcsPlacementGroup> placement_group, bool is_feasible) {
std::shared_ptr<GcsPlacementGroup> placement_group, ExponentialBackOff backoff,
bool is_feasible) {
RAY_LOG(DEBUG) << "Failed to create placement group " << placement_group->GetName()
<< ", id: " << placement_group->GetPlacementGroupID() << ", try again.";
@ -235,7 +236,6 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
// We will attempt to schedule this placement_group once an eligible node is
// registered.
infeasible_placement_groups_.emplace_back(std::move(placement_group));
MarkSchedulingDone();
} else {
auto state = placement_group->GetState();
RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
@ -247,14 +247,13 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
// NOTE: If a node is dead, the placement group scheduler should try to recover the
// group by rescheduling the bundles of the dead node. This should have higher
// priority than trying to place other placement groups.
pending_placement_groups_.emplace_front(std::move(placement_group));
AddToPendingQueue(std::move(placement_group), /* rank */ 0);
} else {
pending_placement_groups_.emplace_back(std::move(placement_group));
AddToPendingQueue(std::move(placement_group), std::nullopt, backoff);
}
MarkSchedulingDone();
RetryCreatingPlacementGroup();
}
io_context_.post([this] { SchedulePendingPlacementGroups(); });
MarkSchedulingDone();
}
void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
@ -262,16 +261,11 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
RAY_LOG(INFO) << "Successfully created placement group " << placement_group->GetName()
<< ", id: " << placement_group->GetPlacementGroupID();
placement_group->UpdateState(rpc::PlacementGroupTableData::CREATED);
// Mark the scheduling done firstly.
MarkSchedulingDone();
auto placement_group_id = placement_group->GetPlacementGroupID();
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
placement_group_id, placement_group->GetPlacementGroupTableData(),
[this, placement_group_id](Status status) {
RAY_CHECK_OK(status);
SchedulePendingPlacementGroups();
// Invoke all callbacks for all `WaitPlacementGroupUntilReady` requests of this
// placement group and remove all of them from
// placement_group_to_create_callbacks_.
@ -284,6 +278,8 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
placement_group_to_create_callbacks_.erase(pg_to_create_iter);
}
}));
io_context_.post([this] { SchedulePendingPlacementGroups(); });
MarkSchedulingDone();
}
void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
@ -300,16 +296,28 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
bool is_new_placement_group_scheduled = false;
while (!pending_placement_groups_.empty() && !is_new_placement_group_scheduled) {
const auto placement_group = pending_placement_groups_.front();
pending_placement_groups_.pop_front();
auto iter = pending_placement_groups_.begin();
if (iter->first > absl::GetCurrentTimeNanos()) {
// Here the rank equals the time to schedule, and it's an ordered tree,
// it means all the other tasks should be scheduled after this one.
// If the first one won't be scheduled, we just skip.
// Tick will cover the next time retry.
break;
}
auto backoff = iter->second.first;
auto placement_group = std::move(iter->second.second);
pending_placement_groups_.erase(iter);
const auto &placement_group_id = placement_group->GetPlacementGroupID();
// Do not reschedule if the placement group has removed already.
if (registered_placement_groups_.contains(placement_group_id)) {
MarkSchedulingStarted(placement_group_id);
gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<GcsPlacementGroup> placement_group, bool is_insfeasble) {
OnPlacementGroupCreationFailed(std::move(placement_group), is_insfeasble);
[this, backoff](std::shared_ptr<GcsPlacementGroup> placement_group,
bool is_insfeasble) {
OnPlacementGroupCreationFailed(std::move(placement_group), backoff,
is_insfeasble);
},
[this](std::shared_ptr<GcsPlacementGroup> placement_group) {
OnPlacementGroupCreationSuccess(std::move(placement_group));
@ -400,18 +408,10 @@ void GcsPlacementGroupManager::RemovePlacementGroup(
}
// Remove a placement group from a pending list if exists.
auto pending_it = std::find_if(
pending_placement_groups_.begin(), pending_placement_groups_.end(),
[placement_group_id](const std::shared_ptr<GcsPlacementGroup> &placement_group) {
return placement_group->GetPlacementGroupID() == placement_group_id;
});
if (pending_it != pending_placement_groups_.end()) {
// The placement group was pending scheduling, remove it from the queue.
pending_placement_groups_.erase(pending_it);
}
RemoveFromPendingQueue(placement_group_id);
// Remove a placement group from infeasible queue if exists.
pending_it = std::find_if(
auto pending_it = std::find_if(
infeasible_placement_groups_.begin(), infeasible_placement_groups_.end(),
[placement_group_id](const std::shared_ptr<GcsPlacementGroup> &placement_group) {
return placement_group->GetPlacementGroupID() == placement_group_id;
@ -580,10 +580,36 @@ void GcsPlacementGroupManager::WaitPlacementGroup(
}
}
void GcsPlacementGroupManager::RetryCreatingPlacementGroup() {
execute_after(
io_context_, [this] { SchedulePendingPlacementGroups(); },
RayConfig::instance().gcs_create_placement_group_retry_interval_ms());
void GcsPlacementGroupManager::AddToPendingQueue(
std::shared_ptr<GcsPlacementGroup> pg, std::optional<int64_t> rank,
std::optional<ExponentialBackOff> exp_backer) {
if (!rank) {
rank = absl::GetCurrentTimeNanos();
}
if (!exp_backer) {
exp_backer = ExponentialBackOff(
1000000 *
RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms(),
RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
1000000 *
RayConfig::instance().gcs_create_placement_group_retry_max_interval_ms());
} else {
*rank += static_cast<int64_t>(exp_backer->Next());
}
auto val = std::make_pair(*exp_backer, std::move(pg));
pending_placement_groups_.emplace(*rank, std::move(val));
}
void GcsPlacementGroupManager::RemoveFromPendingQueue(const PlacementGroupID &pg_id) {
auto it = std::find_if(pending_placement_groups_.begin(),
pending_placement_groups_.end(), [&pg_id](const auto &val) {
return val.second.second->GetPlacementGroupID() == pg_id;
});
// The placement group was pending scheduling, remove it from the queue.
if (it != pending_placement_groups_.end()) {
pending_placement_groups_.erase(it);
}
}
void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
@ -601,7 +627,7 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
// creating until a node with the resources is added. we will solve it in next pr.
if (iter->second->GetState() != rpc::PlacementGroupTableData::RESCHEDULING) {
iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
pending_placement_groups_.emplace_front(iter->second);
AddToPendingQueue(iter->second, 0);
}
}
}
@ -617,9 +643,9 @@ void GcsPlacementGroupManager::OnNodeAdd(const NodeID &node_id) {
// Move all the infeasible placement groups to the pending queue so that we can
// reschedule them.
if (infeasible_placement_groups_.size() > 0) {
auto end_it = pending_placement_groups_.end();
pending_placement_groups_.insert(end_it, infeasible_placement_groups_.cbegin(),
infeasible_placement_groups_.cend());
for (auto &pg : infeasible_placement_groups_) {
AddToPendingQueue(std::move(pg));
}
infeasible_placement_groups_.clear();
}
SchedulePendingPlacementGroups();
@ -683,7 +709,8 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load =
std::make_shared<rpc::PlacementGroupLoad>();
int total_cnt = 0;
for (const auto &pending_pg_spec : pending_placement_groups_) {
for (const auto &elem : pending_placement_groups_) {
const auto pending_pg_spec = elem.second.second;
auto placement_group_data = placement_group_load->add_placement_group_data();
auto placement_group_table_data = pending_pg_spec->GetPlacementGroupTableData();
placement_group_data->Swap(&placement_group_table_data);
@ -719,7 +746,7 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
if (item.second.state() == rpc::PlacementGroupTableData::PENDING ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
pending_placement_groups_.emplace_back(std::move(placement_group));
AddToPendingQueue(std::move(placement_group));
}
if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||

View file

@ -15,8 +15,10 @@
#pragma once
#include <gtest/gtest_prod.h>
#include <optional>
#include <utility>
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/asio/instrumented_io_context.h"
@ -216,7 +218,7 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param placement_group The placement_group whose creation task is infeasible.
/// \param is_feasible whether the scheduler can be retry or not currently.
void OnPlacementGroupCreationFailed(std::shared_ptr<GcsPlacementGroup> placement_group,
bool is_feasible = true);
ExponentialBackOff backoff, bool is_feasible);
/// Handle placement_group creation task success. This should be called when the
/// placement_group creation task has been scheduled successfully.
@ -284,6 +286,19 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
std::string DebugString() const;
private:
/// Push a placement group to pending queue.
///
/// \param pg The placementgroup we are adding
/// \param rank The rank for this placement group. Semantically it's the time
/// this placement group to be scheduled. By default it'll be assigned to be
/// the current time.
/// \param exp_backer The exponential backoff. A default one will be given if
/// it's not set. This will be used to generate the deferred time for this pg.
void AddToPendingQueue(std::shared_ptr<GcsPlacementGroup> pg,
std::optional<int64_t> rank = std::nullopt,
std::optional<ExponentialBackOff> exp_backer = std::nullopt);
void RemoveFromPendingQueue(const PlacementGroupID &pg_id);
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();
@ -329,12 +344,17 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
absl::flat_hash_map<PlacementGroupID, std::shared_ptr<GcsPlacementGroup>>
registered_placement_groups_;
/// The pending placement_groups which will not be scheduled until there's a resource
/// change.
/// NOTE: When we remove placement group, we need to look for
/// `pending_placement_groups_` and delete the specific placement group, so we can't use
/// `std::priority_queue`.
std::deque<std::shared_ptr<GcsPlacementGroup>> pending_placement_groups_;
/// The pending placement_groups which will not be scheduled until there's a
/// resource change. The pending queue is represented as an ordered map, where
/// the key is the time to schedule the pg and value if a pair containing the
/// actual placement group and a exp-backoff.
/// When error happens, we'll retry it later and this can be simply done by
/// inserting an element into the queue with a bigger key. With this, we don't
/// need to post retry job to io context. And when schedule pending placement
/// group, we always start with the one with the smallest key.
absl::btree_multimap<int64_t,
std::pair<ExponentialBackOff, std::shared_ptr<GcsPlacementGroup>>>
pending_placement_groups_;
/// The infeasible placement_groups that can't be scheduled currently.
std::deque<std::shared_ptr<GcsPlacementGroup>> infeasible_placement_groups_;
@ -374,6 +394,10 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
CountType_MAX = 7,
};
uint64_t counts_[CountType::CountType_MAX] = {0};
FRIEND_TEST(GcsPlacementGroupManagerMockTest, PendingQueuePriorityReschedule);
FRIEND_TEST(GcsPlacementGroupManagerMockTest, PendingQueuePriorityFailed);
FRIEND_TEST(GcsPlacementGroupManagerMockTest, PendingQueuePriorityOrder);
};
} // namespace gcs

View file

@ -14,6 +14,7 @@
#pragma once
#include <memory>
#include <utility>
#include "ray/common/asio/instrumented_io_context.h"
@ -285,6 +286,29 @@ class GcsInternalConfigTable : public GcsTable<UniqueID, StoredConfig> {
/// derive from this class and override class member variables.
class GcsTableStorage {
public:
explicit GcsTableStorage(std::shared_ptr<StoreClient> store_client)
: store_client_(std::move(store_client)) {
job_table_ = std::make_unique<GcsJobTable>(store_client_);
actor_table_ = std::make_unique<GcsActorTable>(store_client_);
placement_group_table_ = std::make_unique<GcsPlacementGroupTable>(store_client_);
task_table_ = std::make_unique<GcsTaskTable>(store_client_);
task_lease_table_ = std::make_unique<GcsTaskLeaseTable>(store_client_);
task_reconstruction_table_ =
std::make_unique<GcsTaskReconstructionTable>(store_client_);
object_table_ = std::make_unique<GcsObjectTable>(store_client_);
node_table_ = std::make_unique<GcsNodeTable>(store_client_);
node_resource_table_ = std::make_unique<GcsNodeResourceTable>(store_client_);
placement_group_schedule_table_ =
std::make_unique<GcsPlacementGroupScheduleTable>(store_client_);
placement_group_schedule_table_ =
std::make_unique<GcsPlacementGroupScheduleTable>(store_client_);
resource_usage_batch_table_ =
std::make_unique<GcsResourceUsageBatchTable>(store_client_);
profile_table_ = std::make_unique<GcsProfileTable>(store_client_);
worker_table_ = std::make_unique<GcsWorkerTable>(store_client_);
system_config_table_ = std::make_unique<GcsInternalConfigTable>(store_client_);
}
GcsJobTable &JobTable() {
RAY_CHECK(job_table_ != nullptr);
return *job_table_;
@ -383,26 +407,8 @@ class GcsTableStorage {
/// that uses redis as storage.
class RedisGcsTableStorage : public GcsTableStorage {
public:
explicit RedisGcsTableStorage(std::shared_ptr<RedisClient> redis_client) {
store_client_ = std::make_shared<RedisStoreClient>(redis_client);
job_table_.reset(new GcsJobTable(store_client_));
actor_table_.reset(new GcsActorTable(store_client_));
placement_group_table_.reset(new GcsPlacementGroupTable(store_client_));
task_table_.reset(new GcsTaskTable(store_client_));
task_lease_table_.reset(new GcsTaskLeaseTable(store_client_));
task_reconstruction_table_.reset(new GcsTaskReconstructionTable(store_client_));
object_table_.reset(new GcsObjectTable(store_client_));
node_table_.reset(new GcsNodeTable(store_client_));
node_resource_table_.reset(new GcsNodeResourceTable(store_client_));
placement_group_schedule_table_.reset(
new GcsPlacementGroupScheduleTable(store_client_));
placement_group_schedule_table_.reset(
new GcsPlacementGroupScheduleTable(store_client_));
resource_usage_batch_table_.reset(new GcsResourceUsageBatchTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
system_config_table_.reset(new GcsInternalConfigTable(store_client_));
}
explicit RedisGcsTableStorage(std::shared_ptr<RedisClient> redis_client)
: GcsTableStorage(std::make_shared<RedisStoreClient>(std::move(redis_client))) {}
};
/// \class InMemoryGcsTableStorage
@ -410,24 +416,8 @@ class RedisGcsTableStorage : public GcsTableStorage {
/// that uses memory as storage.
class InMemoryGcsTableStorage : public GcsTableStorage {
public:
explicit InMemoryGcsTableStorage(instrumented_io_context &main_io_service) {
store_client_ = std::make_shared<InMemoryStoreClient>(main_io_service);
job_table_.reset(new GcsJobTable(store_client_));
actor_table_.reset(new GcsActorTable(store_client_));
placement_group_table_.reset(new GcsPlacementGroupTable(store_client_));
task_table_.reset(new GcsTaskTable(store_client_));
task_lease_table_.reset(new GcsTaskLeaseTable(store_client_));
task_reconstruction_table_.reset(new GcsTaskReconstructionTable(store_client_));
object_table_.reset(new GcsObjectTable(store_client_));
node_table_.reset(new GcsNodeTable(store_client_));
node_resource_table_.reset(new GcsNodeResourceTable(store_client_));
placement_group_schedule_table_.reset(
new GcsPlacementGroupScheduleTable(store_client_));
resource_usage_batch_table_.reset(new GcsResourceUsageBatchTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
system_config_table_.reset(new GcsInternalConfigTable(store_client_));
}
explicit InMemoryGcsTableStorage(instrumented_io_context &main_io_service)
: GcsTableStorage(std::make_shared<InMemoryStoreClient>(main_io_service)) {}
};
} // namespace gcs

View file

@ -0,0 +1,173 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// clang-format off
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "mock/ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h"
#include "mock/ray/gcs/gcs_server/gcs_resource_manager.h"
#include "mock/ray/gcs/store_client/store_client.h"
#include "ray/gcs/test/gcs_test_util.h"
// clang-format on
using namespace ::testing;
using namespace ray;
using namespace ray::gcs;
namespace ray {
namespace gcs {
class GcsPlacementGroupManagerMockTest : public Test {
public:
void SetUp() override {
store_client_ = std::make_shared<MockStoreClient>();
gcs_table_storage_ = std::make_shared<GcsTableStorage>(store_client_);
gcs_placement_group_scheduler_ =
std::make_shared<MockGcsPlacementGroupSchedulerInterface>();
resource_manager_ =
std::make_shared<MockGcsResourceManager>(io_context_, nullptr, nullptr, true);
gcs_placement_group_manager_ = std::make_unique<GcsPlacementGroupManager>(
io_context_, gcs_placement_group_scheduler_, gcs_table_storage_,
*resource_manager_, [](auto &) { return ""; });
}
std::unique_ptr<GcsPlacementGroupManager> gcs_placement_group_manager_;
std::shared_ptr<MockGcsPlacementGroupSchedulerInterface> gcs_placement_group_scheduler_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<MockStoreClient> store_client_;
std::shared_ptr<GcsResourceManager> resource_manager_;
instrumented_io_context io_context_;
};
TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityReschedule) {
// Test priority works
// When return with reschedule, it should be given with the highest pri
auto req =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
auto pg = std::make_shared<GcsPlacementGroup>(req, "");
auto cb = [](Status s) {};
PGSchedulingFailureCallback failure_callback;
PGSchedulingSuccessfulCallback success_callback;
StatusCallback put_cb;
EXPECT_CALL(*store_client_, AsyncPut(_, _, _, _))
.WillOnce(DoAll(SaveArg<3>(&put_cb), Return(Status::OK())));
EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_, _, _))
.WillOnce(DoAll(SaveArg<1>(&failure_callback), SaveArg<2>(&success_callback)));
auto now = absl::GetCurrentTimeNanos();
gcs_placement_group_manager_->RegisterPlacementGroup(pg, cb);
auto &pending_queue = gcs_placement_group_manager_->pending_placement_groups_;
ASSERT_EQ(1, pending_queue.size());
ASSERT_LE(now, pending_queue.begin()->first);
ASSERT_GE(absl::GetCurrentTimeNanos(), pending_queue.begin()->first);
put_cb(Status::OK());
pg->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
failure_callback(pg, true);
ASSERT_EQ(1, pending_queue.size());
ASSERT_GE(0, pending_queue.begin()->first);
}
TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityFailed) {
// Test priority works
// When return with a failure, exp backoff should work
auto req =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
auto pg = std::make_shared<GcsPlacementGroup>(req, "");
auto cb = [](Status s) {};
PGSchedulingFailureCallback failure_callback;
PGSchedulingSuccessfulCallback success_callback;
StatusCallback put_cb;
EXPECT_CALL(*store_client_, AsyncPut(_, _, _, _))
.WillOnce(DoAll(SaveArg<3>(&put_cb), Return(Status::OK())));
EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_, _, _))
.Times(2)
.WillRepeatedly(
DoAll(SaveArg<1>(&failure_callback), SaveArg<2>(&success_callback)));
auto now = absl::GetCurrentTimeNanos();
gcs_placement_group_manager_->RegisterPlacementGroup(pg, cb);
auto &pending_queue = gcs_placement_group_manager_->pending_placement_groups_;
ASSERT_EQ(1, pending_queue.size());
ASSERT_LE(now, pending_queue.begin()->first);
ASSERT_GE(absl::GetCurrentTimeNanos(), pending_queue.begin()->first);
put_cb(Status::OK());
pg->UpdateState(rpc::PlacementGroupTableData::PENDING);
now = absl::GetCurrentTimeNanos();
failure_callback(pg, true);
auto exp_backer = ExponentialBackOff(
1000000 * RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms(),
RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
1000000 * RayConfig::instance().gcs_create_placement_group_retry_max_interval_ms());
auto next = exp_backer.Next();
ASSERT_DOUBLE_EQ(
next,
1000000 * RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms());
ASSERT_EQ(1, pending_queue.size());
auto rank = pending_queue.begin()->first;
ASSERT_LE(now + next, rank);
// ScheduleUnplacedBundles is not called here
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(1, pending_queue.size());
ASSERT_EQ(rank, pending_queue.begin()->first);
absl::SleepFor(absl::Nanoseconds(rank - absl::GetCurrentTimeNanos()));
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(0, pending_queue.size());
pg->UpdateState(rpc::PlacementGroupTableData::PENDING);
now = absl::GetCurrentTimeNanos();
failure_callback(pg, true);
next = RayConfig::instance().gcs_create_placement_group_retry_multiplier() * next;
ASSERT_EQ(1, pending_queue.size());
ASSERT_LE(now + next, pending_queue.begin()->first);
}
TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityOrder) {
// Test priority works
// Add two pgs
// Fail one and make sure it's scheduled later
auto req1 =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
auto pg1 = std::make_shared<GcsPlacementGroup>(req1, "");
auto req2 =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
auto pg2 = std::make_shared<GcsPlacementGroup>(req2, "");
auto cb = [](Status s) {};
PGSchedulingFailureCallback failure_callback;
PGSchedulingSuccessfulCallback success_callback;
StatusCallback put_cb;
EXPECT_CALL(*store_client_, AsyncPut(_, _, _, _))
.Times(2)
.WillRepeatedly(DoAll(SaveArg<3>(&put_cb), Return(Status::OK())));
EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_, _, _))
.Times(2)
.WillRepeatedly(
DoAll(SaveArg<1>(&failure_callback), SaveArg<2>(&success_callback)));
gcs_placement_group_manager_->RegisterPlacementGroup(pg1, cb);
gcs_placement_group_manager_->RegisterPlacementGroup(pg2, cb);
auto &pending_queue = gcs_placement_group_manager_->pending_placement_groups_;
ASSERT_EQ(2, pending_queue.size());
put_cb(Status::OK());
ASSERT_EQ(1, pending_queue.size());
// PG1 is scheduled first, so PG2 is in pending queue
ASSERT_EQ(pg2, pending_queue.begin()->second.second);
failure_callback(pg1, true);
ASSERT_EQ(2, pending_queue.size());
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
// PG2 is scheduled for the next, so PG1 is in pending queue
ASSERT_EQ(1, pending_queue.size());
ASSERT_EQ(pg1, pending_queue.begin()->second.second);
}
} // namespace gcs
} // namespace ray

View file

@ -136,6 +136,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
EXPECT_TRUE(WaitForCondition(condition, 10 * 1000));
}
ExponentialBackOff GetExpBackOff() { return ExponentialBackOff(0, 1); }
std::shared_ptr<MockPlacementGroupScheduler> mock_placement_group_scheduler_;
std::unique_ptr<gcs::GcsPlacementGroupManager> gcs_placement_group_manager_;
std::unordered_map<JobID, std::string> job_namespace_table_;
@ -197,7 +199,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group,
GetExpBackOff(), true);
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
mock_placement_group_scheduler_->placement_groups_.clear();
@ -261,7 +264,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) {
mock_placement_group_scheduler_->placement_groups_.pop_back();
// If the creation of placement group fails, it will be rescheduled after a short time.
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group,
GetExpBackOff(), true);
WaitForExpectedPgCount(1);
}
@ -276,7 +280,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group,
GetExpBackOff(), true);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING);
const auto &placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id,
@ -312,7 +317,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id,
[](const Status &status) {});
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group,
GetExpBackOff(), true);
// Make sure it is not rescheduled
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
@ -375,7 +381,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary());
placement_group->GetMutableBundle(1)->set_node_id(NodeID::FromRandom().Binary());
mock_placement_group_scheduler_->placement_groups_.pop_back();
// If a node dies, we will set the bundles above it to be unplaced and reschedule the
// placement group. The placement group state is set to `RESCHEDULING` and will be
// scheduled first.
@ -401,7 +406,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group,
GetExpBackOff(), true);
WaitForExpectedPgCount(1);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(),
placement_group->GetPlacementGroupID());
@ -547,7 +553,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingCanceledWhenPgIsInfeasible) {
mock_placement_group_scheduler_->placement_groups_.clear();
// Mark it non-retryable.
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group, false);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group,
GetExpBackOff(), false);
// Schedule twice to make sure it will not be scheduled afterward.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();

View file

@ -1,96 +0,0 @@
// Copyright 2020-2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/scheduling/fixed_point.h"
#include <cmath>
FixedPoint::FixedPoint(double d) { i_ = (uint64_t)(d * RESOURCE_UNIT_SCALING); }
FixedPoint::FixedPoint(int i) { i_ = (i * RESOURCE_UNIT_SCALING); }
FixedPoint::FixedPoint(uint32_t i) { i_ = (i * RESOURCE_UNIT_SCALING); }
FixedPoint::FixedPoint(int64_t i) : FixedPoint((double)i) {}
FixedPoint::FixedPoint(uint64_t i) : FixedPoint((double)i) {}
FixedPoint FixedPoint::operator+(FixedPoint const &ru) const {
FixedPoint res;
res.i_ = i_ + ru.i_;
return res;
}
FixedPoint FixedPoint::operator+=(FixedPoint const &ru) {
i_ += ru.i_;
return *this;
}
FixedPoint FixedPoint::operator-(FixedPoint const &ru) const {
FixedPoint res;
res.i_ = i_ - ru.i_;
return res;
}
FixedPoint FixedPoint::operator-=(FixedPoint const &ru) {
i_ -= ru.i_;
return *this;
}
FixedPoint FixedPoint::operator-() const {
FixedPoint res;
res.i_ = -i_;
return res;
}
FixedPoint FixedPoint::operator+(double const d) const {
FixedPoint res;
res.i_ = i_ + (int64_t)(d * RESOURCE_UNIT_SCALING);
return res;
}
FixedPoint FixedPoint::operator-(double const d) const {
FixedPoint res;
res.i_ = i_ - (int64_t)(d * RESOURCE_UNIT_SCALING);
return res;
}
FixedPoint FixedPoint::operator=(double const d) {
i_ = (int64_t)(d * RESOURCE_UNIT_SCALING);
return *this;
}
FixedPoint FixedPoint::operator+=(double const d) {
i_ += (int64_t)(d * RESOURCE_UNIT_SCALING);
return *this;
}
FixedPoint FixedPoint::operator+=(int64_t const ru) {
*this += (double)ru;
return *this;
}
bool FixedPoint::operator<(FixedPoint const &ru1) const { return (i_ < ru1.i_); };
bool FixedPoint::operator>(FixedPoint const &ru1) const { return (i_ > ru1.i_); };
bool FixedPoint::operator<=(FixedPoint const &ru1) const { return (i_ <= ru1.i_); };
bool FixedPoint::operator>=(FixedPoint const &ru1) const { return (i_ >= ru1.i_); };
bool FixedPoint::operator==(FixedPoint const &ru1) const { return (i_ == ru1.i_); };
bool FixedPoint::operator!=(FixedPoint const &ru1) const { return (i_ != ru1.i_); };
std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1) {
out << ru1.i_;
return out;
}
double FixedPoint::Double() const { return round(i_) / RESOURCE_UNIT_SCALING; };

View file

@ -14,6 +14,7 @@
#pragma once
#include <cmath>
#include <cstdint>
#include <iostream>
@ -25,41 +26,85 @@ class FixedPoint {
int64_t i_ = 0;
public:
FixedPoint() = default;
FixedPoint(double d);
FixedPoint(int i);
FixedPoint(uint32_t i);
FixedPoint(int64_t i);
FixedPoint(uint64_t i);
FixedPoint() : FixedPoint(0.0) {}
FixedPoint(double d) { i_ = (uint64_t)(d * RESOURCE_UNIT_SCALING); } // NOLINT
FixedPoint operator+(FixedPoint const &ru) const;
FixedPoint(int i) { i_ = (i * RESOURCE_UNIT_SCALING); } // NOLINT
FixedPoint operator+=(FixedPoint const &ru);
FixedPoint(uint32_t i) { i_ = (i * RESOURCE_UNIT_SCALING); } // NOLINT
FixedPoint operator-(FixedPoint const &ru) const;
FixedPoint(int64_t i) : FixedPoint((double)i) {} // NOLINT
FixedPoint operator-=(FixedPoint const &ru);
FixedPoint(uint64_t i) : FixedPoint((double)i) {} // NOLINT
FixedPoint operator-() const;
FixedPoint operator+(FixedPoint const &ru) const {
FixedPoint res;
res.i_ = i_ + ru.i_;
return res;
}
FixedPoint operator+(double const d) const;
FixedPoint &operator+=(FixedPoint const &ru) {
i_ += ru.i_;
return *this;
}
FixedPoint operator-(double const d) const;
FixedPoint operator-(FixedPoint const &ru) const {
FixedPoint res;
res.i_ = i_ - ru.i_;
return res;
}
FixedPoint operator=(double const d);
FixedPoint &operator-=(FixedPoint const &ru) {
i_ -= ru.i_;
return *this;
}
FixedPoint operator+=(double const d);
FixedPoint operator-() const {
FixedPoint res;
res.i_ = -i_;
return res;
}
FixedPoint operator+=(int64_t const ru);
FixedPoint operator+(double const d) const {
FixedPoint res;
res.i_ = i_ + static_cast<int64_t>(d * RESOURCE_UNIT_SCALING);
return res;
}
bool operator<(FixedPoint const &ru1) const;
bool operator>(FixedPoint const &ru1) const;
bool operator<=(FixedPoint const &ru1) const;
bool operator>=(FixedPoint const &ru1) const;
bool operator==(FixedPoint const &ru1) const;
bool operator!=(FixedPoint const &ru1) const;
FixedPoint operator-(double const d) const {
FixedPoint res;
res.i_ = i_ + static_cast<int64_t>(d * RESOURCE_UNIT_SCALING);
return res;
}
double Double() const;
FixedPoint operator=(double const d) {
i_ = static_cast<int64_t>(d * RESOURCE_UNIT_SCALING);
return *this;
}
FixedPoint operator+=(double const d) {
i_ += static_cast<int64_t>(d * RESOURCE_UNIT_SCALING);
return *this;
}
FixedPoint operator+=(int64_t const ru) {
*this += static_cast<double>(ru);
return *this;
}
bool operator<(FixedPoint const &ru1) const { return (i_ < ru1.i_); };
bool operator>(FixedPoint const &ru1) const { return (i_ > ru1.i_); };
bool operator<=(FixedPoint const &ru1) const { return (i_ <= ru1.i_); };
bool operator>=(FixedPoint const &ru1) const { return (i_ >= ru1.i_); };
bool operator==(FixedPoint const &ru1) const { return (i_ == ru1.i_); };
bool operator!=(FixedPoint const &ru1) const { return (i_ != ru1.i_); };
[[nodiscard]] double Double() const { return round(i_) / RESOURCE_UNIT_SCALING; };
friend std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1);
};
inline std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1) {
out << ru1.i_;
return out;
}

View file

@ -21,7 +21,6 @@
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include "ray/util/logging.h"
@ -167,7 +166,7 @@ class InitShutdownRAII {
/// \param shutdown_func The shutdown function.
/// \param args The arguments for the init function.
template <class InitFunc, class... Args>
InitShutdownRAII(InitFunc init_func, ShutdownFunc shutdown_func, Args &&... args)
InitShutdownRAII(InitFunc init_func, ShutdownFunc shutdown_func, Args &&...args)
: shutdown_(shutdown_func) {
init_func(args...);
}
@ -259,7 +258,7 @@ template <typename T>
class ThreadPrivate {
public:
template <typename... Ts>
ThreadPrivate(Ts &&... ts) : t_(std::forward<Ts>(ts)...) {}
explicit ThreadPrivate(Ts &&...ts) : t_(std::forward<Ts>(ts)...) {}
T &operator*() {
ThreadCheck();
@ -312,4 +311,43 @@ class ThreadPrivate {
mutable std::mutex mutex_;
};
class ExponentialBackOff {
public:
ExponentialBackOff() = default;
ExponentialBackOff(const ExponentialBackOff &) = default;
ExponentialBackOff(ExponentialBackOff &&) = default;
ExponentialBackOff &operator=(const ExponentialBackOff &) = default;
ExponentialBackOff &operator=(ExponentialBackOff &&) = default;
/// Construct an exponential back off counter.
///
/// \param[in] initial_value The start value for this counter
/// \param[in] multiplier The multiplier for this counter.
/// \param[in] max_value The maximum value for this counter. By default it's
/// infinite double.
ExponentialBackOff(uint64_t initial_value, double multiplier,
uint64_t max_value = std::numeric_limits<uint64_t>::max())
: curr_value_(initial_value),
initial_value_(initial_value),
max_value_(max_value),
multiplier_(multiplier) {
RAY_CHECK(multiplier > 0.0) << "Multiplier must be greater than 0";
}
uint64_t Next() {
auto ret = curr_value_;
curr_value_ = curr_value_ * multiplier_;
curr_value_ = std::min(curr_value_, max_value_);
return ret;
}
void Reset() { curr_value_ = initial_value_; }
private:
uint64_t curr_value_;
uint64_t initial_value_;
uint64_t max_value_;
double multiplier_;
};
} // namespace ray

View file

@ -102,6 +102,23 @@ TEST(UtilTest, ParseCommandLineTest) {
ASSERT_EQ(ParseCommandLine(R"(x' a \b')", win32), ArgList({R"(x')", R"(a)", R"(\b')"}));
}
TEST(UtilTest, ExponentialBackOffTest) {
auto exp = ExponentialBackOff(1, 2, 9);
ASSERT_EQ(1, exp.Next());
ASSERT_EQ(2, exp.Next());
ASSERT_EQ(4, exp.Next());
ASSERT_EQ(8, exp.Next());
ASSERT_EQ(9, exp.Next());
ASSERT_EQ(9, exp.Next());
exp.Reset();
ASSERT_EQ(1, exp.Next());
ASSERT_EQ(2, exp.Next());
ASSERT_EQ(4, exp.Next());
ASSERT_EQ(8, exp.Next());
ASSERT_EQ(9, exp.Next());
ASSERT_EQ(9, exp.Next());
}
TEST(UtilTest, ParseURLTest) {
const std::string url = "http://abc?num_objects=9&offset=8388878&size=8388878";
auto parsed_url = *ParseURL(url);