[Core] PushManager for reliable broadcast (#11869)

This commit is contained in:
Eric Liang 2020-11-09 18:01:47 -08:00 committed by GitHub
parent 1999266bba
commit ee2da0cf45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 259 additions and 39 deletions

View file

@ -807,6 +807,18 @@ cc_test(
],
)
cc_test(
name = "push_manager_test",
srcs = [
"src/ray/object_manager/test/push_manager_test.cc",
],
copts = COPTS,
deps = [
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "reconstruction_policy_test",
srcs = ["src/ray/raylet/reconstruction_policy_test.cc"],

View file

@ -157,9 +157,8 @@ def test_actor_broadcast(ray_start_cluster_with_resource):
# Make sure that each object was transferred a reasonable number of times.
for x_id in object_refs:
relevant_events = [
event for event in transfer_events
if event["cat"] == "transfer_send"
and event["args"][0] == x_id.hex() and event["args"][2] == 1
event for event in transfer_events if
event["cat"] == "transfer_send" and event["args"][0] == x_id.hex()
]
# NOTE: Each event currently appears twice because we duplicate the

View file

@ -193,7 +193,12 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000)
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size unless the number of object
/// chunks exceeds the number of available sending threads.
RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000)
/// NOTE(ekl): this has been raised to lower broadcast overheads.
RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 5 * 1024 * 1024)
/// The maximum number of outbound bytes to allow to be outstanding. This avoids
/// excessive memory usage during object broadcast to many receivers.
RAY_CONFIG(uint64_t, object_manager_max_bytes_in_flight, 2L * 1024 * 1024 * 1024)
/// Number of workers per Python worker process
RAY_CONFIG(int, num_workers_per_process_python, 1)

View file

@ -70,6 +70,10 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_
RAY_CHECK(config_.rpc_service_threads_number > 0);
main_service_ = &main_service;
push_manager_.reset(new PushManager(/* max_chunks_in_flight= */ std::max(
static_cast<int64_t>(1L),
static_cast<int64_t>(config_.max_bytes_in_flight / config_.object_chunk_size))));
if (plasma::plasma_store_runner) {
store_notification_ = std::make_shared<ObjectStoreNotificationManager>(main_service);
plasma::plasma_store_runner->SetNotificationListener(store_notification_);
@ -400,6 +404,56 @@ void ObjectManager::HandleReceiveFinished(const ObjectID &object_id,
profile_events_.push_back(profile_event);
}
void PushManager::StartPush(const UniqueID &push_id, int64_t num_chunks,
std::function<void(int64_t)> send_chunk_fn) {
RAY_LOG(DEBUG) << "Start push for " << push_id << ", num chunks " << num_chunks;
RAY_CHECK(num_chunks > 0);
push_info_[push_id] = std::make_pair(num_chunks, send_chunk_fn);
next_chunk_id_[push_id] = 0;
chunks_remaining_ += num_chunks;
ScheduleRemainingPushes();
RAY_CHECK(push_info_.size() == next_chunk_id_.size());
}
void PushManager::OnChunkComplete() {
chunks_in_flight_ -= 1;
ScheduleRemainingPushes();
}
void PushManager::ScheduleRemainingPushes() {
// Loop over all active pushes for approximate round-robin prioritization.
// TODO(ekl) this isn't the best implementation of round robin, we should
// consider tracking the number of chunks active per-push and balancing those.
while (chunks_in_flight_ < max_chunks_in_flight_ && push_info_.size() > 0) {
// Loop over each active push and try to send another chunk.
auto it = push_info_.begin();
while (it != push_info_.end() && chunks_in_flight_ < max_chunks_in_flight_) {
auto push_id = it->first;
auto max_chunks = it->second.first;
auto send_chunk_fn = it->second.second;
// Send the next chunk for this push.
send_chunk_fn(next_chunk_id_[push_id]);
chunks_in_flight_ += 1;
chunks_remaining_ -= 1;
RAY_LOG(DEBUG) << "Sending chunk " << next_chunk_id_[push_id] << " of "
<< max_chunks << " for push " << push_id << ", chunks in flight "
<< NumChunksInFlight() << " / " << max_chunks_in_flight_
<< " max, remaining chunks: " << NumChunksRemaining();
// It is the last chunk and we don't need to track it any more.
if (++next_chunk_id_[push_id] >= max_chunks) {
next_chunk_id_.erase(push_id);
push_info_.erase(it++);
RAY_LOG(DEBUG) << "Push for " << push_id
<< " completed, remaining: " << NumPushesInFlight();
} else {
it++;
}
}
}
}
void ObjectManager::Push(const ObjectID &object_id, const NodeID &client_id) {
RAY_LOG(DEBUG) << "Push on " << self_node_id_ << " to " << client_id << " of object "
<< object_id;
@ -478,17 +532,11 @@ void ObjectManager::Push(const ObjectID &object_id, const NodeID &client_id) {
<< ", total data size: " << data_size;
UniqueID push_id = UniqueID::FromRandom();
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
rpc_service_.post([this, push_id, object_id, owner_address, client_id, data_size,
metadata_size, chunk_index, rpc_client]() {
auto st = SendObjectChunk(push_id, object_id, owner_address, client_id, data_size,
metadata_size, chunk_index, rpc_client);
if (!st.ok()) {
RAY_LOG(WARNING) << "Send object " << object_id << " chunk failed due to "
<< st.message() << ", chunk index " << chunk_index;
}
push_manager_->StartPush(push_id, num_chunks, [=](int64_t chunk_id) {
SendObjectChunk(push_id, object_id, owner_address, client_id, data_size,
metadata_size, chunk_id, rpc_client,
[=](const Status &status) { push_manager_->OnChunkComplete(); });
});
}
} else {
// Push is best effort, so do nothing here.
RAY_LOG(ERROR)
@ -496,10 +544,12 @@ void ObjectManager::Push(const ObjectID &object_id, const NodeID &client_id) {
}
}
ray::Status ObjectManager::SendObjectChunk(
const UniqueID &push_id, const ObjectID &object_id, const rpc::Address &owner_address,
const NodeID &client_id, uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index, std::shared_ptr<rpc::ObjectManagerClient> rpc_client) {
void ObjectManager::SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id,
const rpc::Address &owner_address,
const NodeID &client_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<rpc::ObjectManagerClient> rpc_client,
std::function<void(const Status &)> on_complete) {
double start_time = absl::GetCurrentTimeNanos() / 1e9;
rpc::PushRequest push_request;
// Set request header
@ -522,30 +572,31 @@ ray::Status ObjectManager::SendObjectChunk(
if (!chunk_status.second.ok()) {
RAY_LOG(WARNING) << "Attempting to push object " << object_id
<< " which is not local. It may have been evicted.";
RAY_RETURN_NOT_OK(status);
on_complete(status);
return;
}
push_request.set_data(chunk_info.data, chunk_info.buffer_length);
// record the time cost between send chunk and receive reply
rpc::ClientCallback<rpc::PushReply> callback = [this, start_time, object_id, client_id,
chunk_index](
const Status &status,
const rpc::PushReply &reply) {
rpc::ClientCallback<rpc::PushReply> callback =
[this, start_time, object_id, client_id, chunk_index, owner_address, rpc_client,
on_complete](const Status &status, const rpc::PushReply &reply) {
// TODO: Just print warning here, should we try to resend this chunk?
if (!status.ok()) {
RAY_LOG(WARNING) << "Send object " << object_id << " chunk to client " << client_id
<< " failed due to" << status.message()
RAY_LOG(WARNING) << "Send object " << object_id << " chunk to client "
<< client_id << " failed due to" << status.message()
<< ", chunk index: " << chunk_index;
}
double end_time = absl::GetCurrentTimeNanos() / 1e9;
HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time, status);
HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
on_complete(status);
};
rpc_client->Push(push_request, callback);
// Do this regardless of whether it failed or succeeded.
buffer_pool_.ReleaseGetChunk(object_id, chunk_info.chunk_index);
return Status::OK();
}
void ObjectManager::CancelPull(const ObjectID &object_id) {
@ -920,6 +971,7 @@ std::string ObjectManager::DebugString() const {
result << "\n- num unfulfilled push requests: " << unfulfilled_push_requests_.size();
result << "\n- num pull requests: " << pull_requests_.size();
result << "\n- num buffered profile events: " << profile_events_.size();
result << "\n" << push_manager_->DebugString();
result << "\n" << object_directory_->DebugString();
result << "\n" << store_notification_->DebugString();
result << "\n" << buffer_pool_.DebugString();

View file

@ -53,6 +53,8 @@ struct ObjectManagerConfig {
unsigned int pull_timeout_ms;
/// Object chunk size, in bytes
uint64_t object_chunk_size;
/// Max object push bytes in flight.
uint64_t max_bytes_in_flight;
/// The store socket name.
std::string store_socket_name;
/// The time in milliseconds to wait until a Push request
@ -97,6 +99,75 @@ class ObjectManagerInterface {
virtual ~ObjectManagerInterface(){};
};
class PushManager {
public:
/// Manages rate limiting of outbound object pushes.
/// Create a push manager.
///
/// \param max_chunks_in_flight Max number of chunks allowed to be in flight
/// from this PushManager (this raylet).
PushManager(int64_t max_chunks_in_flight)
: max_chunks_in_flight_(max_chunks_in_flight) {
RAY_CHECK(max_chunks_in_flight_ > 0) << max_chunks_in_flight_;
};
/// Start pushing an object subject to max chunks in flight limit.
///
/// \param push_id Unique identifier for this push.
/// \param num_chunks The total number of chunks to send.
/// \param send_chunk_fn This function will be called with args 0...{num_chunks-1}.
/// The caller promises to call PushManager::OnChunkComplete()
/// once a call to send_chunk_fn finishes.
void StartPush(const UniqueID &push_id, int64_t num_chunks,
std::function<void(int64_t)> send_chunk_fn);
/// Called every time a chunk completes to trigger additional sends.
/// TODO(ekl) maybe we should cancel the entire push on error.
void OnChunkComplete();
/// Return the number of chunks currently in flight. For testing only.
int64_t NumChunksInFlight() const { return chunks_in_flight_; };
/// Return the number of chunks remaining. For testing only.
int64_t NumChunksRemaining() const { return chunks_remaining_; };
/// Return the number of pushes currently in flight. For testing only.
int64_t NumPushesInFlight() const { return push_info_.size(); };
std::string DebugString() const {
std::stringstream result;
result << "PushManager:";
result << "\n- num pushes in flight: " << NumPushesInFlight();
result << "\n- num chunks in flight: " << NumChunksInFlight();
result << "\n- num chunks remaining: " << NumChunksRemaining();
result << "\n- max chunks allowed: " << max_chunks_in_flight_;
return result.str();
}
private:
/// Called on completion events to trigger additional pushes.
void ScheduleRemainingPushes();
/// Info about the pushed object: (num_chunks total, chunk_send_fn).
typedef std::pair<int64_t, std::function<void(int64_t)>> PushInfo;
/// Max number of chunks in flight allowed.
const int64_t max_chunks_in_flight_;
/// Running count of chunks remaining to send.
int64_t chunks_remaining_ = 0;
/// Running count of chunks in flight, used to limit progress of in_flight_pushes_.
int64_t chunks_in_flight_ = 0;
/// Tracks all pushes with chunk transfers in flight.
absl::flat_hash_map<UniqueID, PushInfo> push_info_;
/// Tracks progress of in flight pushes.
absl::flat_hash_map<UniqueID, int64_t> next_chunk_id_;
};
// TODO(hme): Add success/failure callbacks for push and pull.
class ObjectManager : public ObjectManagerInterface,
public rpc::ObjectManagerServiceHandler {
@ -141,15 +212,17 @@ class ObjectManager : public ObjectManagerInterface,
/// \param push_id Unique push id to indicate this push request
/// \param object_id Object id
/// \param owner_address The address of the object's owner
/// \param client_id The id of the receiver.
/// \param data_size Data size
/// \param metadata_size Metadata size
/// \param chunk_index Chunk index of this object chunk, start with 0
/// \param rpc_client Rpc client used to send message to remote object manager
ray::Status SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id,
/// \param on_complete Callback to run on completion.
void SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id,
const rpc::Address &owner_address, const NodeID &client_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<rpc::ObjectManagerClient> rpc_client);
uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<rpc::ObjectManagerClient> rpc_client,
std::function<void(const Status &)> on_complete);
/// Receive object chunk from remote object manager, small object may contain one chunk
///
@ -474,6 +547,9 @@ class ObjectManager : public ObjectManagerInterface,
const RestoreSpilledObjectCallback restore_spilled_object_;
/// Object push manager.
std::unique_ptr<PushManager> push_manager_;
/// Running sum of the amount of memory used in the object store.
int64_t used_memory_ = 0;
};

View file

@ -0,0 +1,74 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/object_manager/object_manager.h"
#include "gtest/gtest.h"
#include "ray/common/test_util.h"
namespace ray {
TEST(TestPushManager, TestSingleTransfer) {
std::vector<int> results;
results.reserve(10);
UniqueID push_id = UniqueID::FromRandom();
PushManager pm(5);
pm.StartPush(push_id, 10, [&](int64_t chunk_id) { results[chunk_id] = 1; });
ASSERT_EQ(pm.NumChunksInFlight(), 5);
ASSERT_EQ(pm.NumChunksRemaining(), 5);
ASSERT_EQ(pm.NumPushesInFlight(), 1);
for (int i = 0; i < 10; i++) {
pm.OnChunkComplete();
}
ASSERT_EQ(pm.NumChunksInFlight(), 0);
ASSERT_EQ(pm.NumChunksRemaining(), 0);
ASSERT_EQ(pm.NumPushesInFlight(), 0);
for (int i = 0; i < 10; i++) {
ASSERT_EQ(results[i], 1);
}
}
TEST(TestPushManager, TestMultipleTransfers) {
std::vector<int> results1;
results1.reserve(10);
std::vector<int> results2;
results2.reserve(10);
UniqueID push1 = UniqueID::FromRandom();
UniqueID push2 = UniqueID::FromRandom();
PushManager pm(5);
pm.StartPush(push1, 10, [&](int64_t chunk_id) { results1[chunk_id] = 1; });
pm.StartPush(push2, 10, [&](int64_t chunk_id) { results2[chunk_id] = 2; });
ASSERT_EQ(pm.NumChunksInFlight(), 5);
ASSERT_EQ(pm.NumChunksRemaining(), 15);
ASSERT_EQ(pm.NumPushesInFlight(), 2);
for (int i = 0; i < 20; i++) {
pm.OnChunkComplete();
}
ASSERT_EQ(pm.NumChunksInFlight(), 0);
ASSERT_EQ(pm.NumChunksRemaining(), 0);
ASSERT_EQ(pm.NumPushesInFlight(), 0);
for (int i = 0; i < 10; i++) {
ASSERT_EQ(results1[i], 1);
}
for (int i = 0; i < 10; i++) {
ASSERT_EQ(results2[i], 2);
}
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View file

@ -236,6 +236,8 @@ int main(int argc, char *argv[]) {
object_manager_config.push_timeout_ms =
RayConfig::instance().object_manager_push_timeout_ms();
object_manager_config.object_store_memory = object_store_memory;
object_manager_config.max_bytes_in_flight =
RayConfig::instance().object_manager_max_bytes_in_flight();
object_manager_config.plasma_directory = plasma_directory;
object_manager_config.huge_pages = huge_pages;