[Core] Revert "[Core] Batch PinObjectIDs requests from Raylet client (#24322)" and "[Core] rename PinObjectIDs to PinObjectID (#24451)" (#24741)

We noticed a performance regression in the nightly test shuffle_1tb_5000_partitions: the test previously took about 1h10m to finish, but now takes more than 2h30m.

After investigation, we believe commit 5a82640 most likely caused the regression.

Here is the run before 5a82640: https://console.anyscale.com/o/anyscale-internal/projects/prj_SVFGM5yBqK6DHCfLtRMryXHM/clusters/ses_1ejykCYq9BnkC5v8ZJjrqc2b?command-history-section=command_history
Here is the run after 5a82640: https://console.anyscale.com/o/anyscale-internal/projects/prj_SVFGM5yBqK6DHCfLtRMryXHM/clusters/ses_Lr5N8jVRdHCWJWYA2SRaUkzZ?command-history-section=command_history
Chen Shen 2022-05-12 16:17:40 -07:00 committed by GitHub
parent 0a0c52e351
commit 02042e1305
17 changed files with 212 additions and 314 deletions
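
For context, the declarations below summarize the client-side interface change this commit reverts, as it appears in the raylet client diffs further down. This is an illustrative sketch only: Status, Address, ObjectID, and the reply structs are simplified stand-ins, not the actual Ray headers.

#include <functional>
#include <string>
#include <vector>

// Simplified stand-in types, for illustration only.
struct Status {};
struct Address {};
struct PinObjectIDReply {};   // reply of the per-object RPC being removed
struct PinObjectIDsReply {};  // reply of the vector-based RPC being restored
using ObjectID = std::string;

// Before this revert: one object per call; a PinBatcher coalesced calls into
// at most one in-flight PinObjectID RPC per raylet.
void PinObjectID(const Address &caller_address,
                 const ObjectID &object_id,
                 std::function<void(const Status &, const PinObjectIDReply &)> callback);

// After this revert (the restored interface): the caller passes a vector of
// object IDs and each call issues its own PinObjectIDs RPC immediately.
void PinObjectIDs(
    const Address &caller_address,
    const std::vector<ObjectID> &object_ids,
    const std::function<void(const Status &, const PinObjectIDsReply &)> &callback);

Call sites such as CoreWorker::PutInLocalPlasmaStore accordingly switch from passing a single object_id to passing {object_id}, as shown in the core_worker diff below.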


@ -85,7 +85,7 @@ of what the event stats look like:
CoreWorkerService.grpc_client.GetObjectLocationsOwner - 51333 total (0 active), CPU time: mean = 25.166 us, total = 1.292 s
ObjectManager.ObjectDeleted - 43188 total (0 active), CPU time: mean = 26.017 us, total = 1.124 s
CoreWorkerService.grpc_client.RemoveObjectLocationOwner - 43177 total (0 active), CPU time: mean = 2.368 us, total = 102.252 ms
NodeManagerService.grpc_server.PinObjectID - 40000 total (0 active), CPU time: mean = 194.860 us, total = 7.794 s
NodeManagerService.grpc_server.PinObjectIDs - 40000 total (0 active), CPU time: mean = 194.860 us, total = 7.794 s
Callback latency injection
--------------------------


@ -104,9 +104,9 @@ class MockNodeManager : public NodeManager {
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandlePinObjectID,
(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
HandlePinObjectIDs,
(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,


@ -17,10 +17,10 @@ namespace ray {
class MockPinObjectsInterface : public PinObjectsInterface {
public:
MOCK_METHOD(void,
PinObjectID,
PinObjectIDs,
(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback),
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback),
(override));
};
@ -189,10 +189,10 @@ class MockRayletClientInterface : public RayletClientInterface {
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback),
(override));
MOCK_METHOD(void,
PinObjectID,
PinObjectIDs,
(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback),
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback),
(override));
MOCK_METHOD(void,
GetSystemConfig,


@ -896,21 +896,15 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object,
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning put object " << object_id;
local_raylet_client_->PinObjectID(
local_raylet_client_->PinObjectIDs(
rpc_address_,
object_id,
[this, object_id](const Status &status, const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the object " << object_id
<< ". This object may get evicted while there are still "
"references to it: "
<< status;
}
{object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (auto s = plasma_store_provider_->Release(object_id); !s.ok()) {
if (!plasma_store_provider_->Release(object_id).ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma: " << s;
<< "), might cause a leak in plasma.";
}
});
} else {
@ -1056,21 +1050,15 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning sealed object " << object_id;
local_raylet_client_->PinObjectID(
local_raylet_client_->PinObjectIDs(
owner_address != nullptr ? *owner_address : rpc_address_,
object_id,
[this, object_id](const Status &status, const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the object " << object_id
<< ". This object may get evicted while there are still "
"references to it: "
<< status;
}
{object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (auto s = plasma_store_provider_->Release(object_id); !s.ok()) {
if (!plasma_store_provider_->Release(object_id).ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma: " << s;
<< "), might cause a leak in plasma.";
}
});
} else {
@ -2450,17 +2438,16 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
// Asynchronously ask the raylet to pin the object. Note that this can fail
// if the raylet fails. We expect the owner of the object to handle that
// case (e.g., by detecting the raylet failure and storing an error).
local_raylet_client_->PinObjectID(
local_raylet_client_->PinObjectIDs(
owner_address,
return_id,
{return_id},
[return_id, pinned_return_object](const Status &status,
const rpc::PinObjectIDReply &reply) {
const rpc::PinObjectIDsReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the task return object "
<< return_id
<< ". This object may get evicted while there are still "
"references to it: "
<< status;
"references to it.";
}
});
return true;


@ -96,7 +96,7 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations) {
// If a copy still exists, pin the object by sending a
// PinObjectID RPC.
// PinObjectIDs RPC.
const auto node_id = NodeID::FromBinary(raylet_address.raylet_id());
RAY_LOG(DEBUG) << "Trying to pin copy of lost object " << object_id << " at node "
<< node_id;
@ -118,23 +118,23 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
client = client_it->second;
}
client->PinObjectID(rpc_address_,
object_id,
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object "
<< object_id << ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
});
client->PinObjectIDs(rpc_address_,
{object_id},
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDsReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object "
<< object_id << ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
});
}
void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {


@ -58,23 +58,24 @@ class MockTaskResubmitter : public TaskResubmissionInterface {
class MockRayletClient : public PinObjectsInterface {
public:
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override {
RAY_LOG(INFO) << "PinObjectID " << object_id.Hex();
callbacks.push_back(std::move(callback));
void PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) override {
RAY_LOG(INFO) << "PinObjectIDs " << object_ids.size();
callbacks.push_back(callback);
}
size_t Flush() {
size_t flushed = callbacks.size();
for (const auto &callback : callbacks) {
callback(Status::OK(), rpc::PinObjectIDReply());
callback(Status::OK(), rpc::PinObjectIDsReply());
}
callbacks.clear();
return flushed;
}
std::list<rpc::ClientCallback<rpc::PinObjectIDReply>> callbacks = {};
std::list<rpc::ClientCallback<rpc::PinObjectIDsReply>> callbacks = {};
};
class MockObjectDirectory {


@ -459,7 +459,7 @@ class NodeResourceInfoAccessor {
/// server.
virtual void AsyncResubscribe();
/// Report resource usage of a node to GCS asynchronously. Only used in tests.
/// Report resource usage of a node to GCS asynchronously.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.


@ -254,9 +254,10 @@ struct GcsServerMocker {
}
/// PinObjectsInterface
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override {}
void PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override {}
/// DependencyWaiterInterface
ray::Status WaitForDirectActorCallArgs(


@ -482,8 +482,8 @@ Status PlasmaClient::Impl::GetBuffers(
// If we get here, then the objects aren't all currently in use by this
// client, so we need to send a request to the plasma store.
RAY_RETURN_NOT_OK(
SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms, is_from_worker));
RAY_RETURN_NOT_OK(SendGetRequest(
store_conn_, &object_ids[0], num_objects, timeout_ms, is_from_worker));
std::vector<uint8_t> buffer;
RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, &buffer));
std::vector<ObjectID> received_object_ids(num_objects);
@ -560,12 +560,8 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID> &object_ids,
};
const size_t num_objects = object_ids.size();
*out = std::vector<ObjectBuffer>(num_objects);
return GetBuffers(object_ids.data(),
num_objects,
timeout_ms,
wrap_buffer,
out->data(),
is_from_worker);
return GetBuffers(
&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0], is_from_worker);
}
Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID &object_id) {


@ -339,7 +339,7 @@ class PullManager {
int64_t num_active_bundles_ = 0;
/// Callback to pin plasma objects.
std::function<std::unique_ptr<RayObject>(const ObjectID &object_id)> pin_object_;
std::function<std::unique_ptr<RayObject>(const ObjectID &object_ids)> pin_object_;
/// The last time OOM was reported. Track this so we don't spam warnings when
/// the object store is full.


@ -162,14 +162,15 @@ message CancelWorkerLeaseReply {
bool success = 1;
}
message PinObjectIDRequest {
message PinObjectIDsRequest {
// Address of the owner to ask when to unpin the objects.
Address owner_address = 1;
// ObjectIDs to pin.
repeated bytes object_ids = 2;
}
message PinObjectIDReply {}
message PinObjectIDsReply {
}
message GetNodeStatsRequest {
// Whether to include memory stats. This could be large since it includes
@ -353,7 +354,7 @@ service NodeManagerService {
// lease request was not yet granted.
rpc CancelWorkerLease(CancelWorkerLeaseRequest) returns (CancelWorkerLeaseReply);
// Pin the provided object IDs.
rpc PinObjectID(PinObjectIDRequest) returns (PinObjectIDReply);
rpc PinObjectIDs(PinObjectIDsRequest) returns (PinObjectIDsReply);
// Get the current node stats.
rpc GetNodeStats(GetNodeStatsRequest) returns (GetNodeStatsReply);
// Trigger garbage collection in all workers across the cluster.


@ -268,7 +268,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
std::vector<ObjectID> object_ids = {object_id};
std::vector<std::unique_ptr<RayObject>> results;
std::unique_ptr<RayObject> result;
if (GetObjectsFromPlasma(object_ids, &results).ok() && results.size() > 0) {
if (GetObjectsFromPlasma(object_ids, &results) && results.size() > 0) {
result = std::move(results[0]);
}
return result;
@ -387,7 +387,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
leased_workers_,
[this](const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
return GetObjectsFromPlasma(object_ids, results).ok();
return GetObjectsFromPlasma(object_ids, results);
},
max_task_args_memory);
cluster_task_manager_ = std::make_shared<ClusterTaskManager>(
@ -2309,21 +2309,23 @@ std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view,
return result.str();
}
Status NodeManager::GetObjectsFromPlasma(
const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
bool NodeManager::GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results) {
// Pin the objects in plasma by getting them and holding a reference to
// the returned buffer.
// NOTE: the caller must ensure that the objects already exist in plasma before
// sending a PinObjectID request.
// sending a PinObjectIDs request.
std::vector<plasma::ObjectBuffer> plasma_results;
// TODO(swang): This `Get` has a timeout of 0, so the plasma store will not
// block when serving the request. However, if the plasma store is under
// heavy load, then this request can still block the NodeManager event loop
// since we must wait for the plasma store's reply. We should consider using
// an `AsyncGet` instead.
RAY_RETURN_NOT_OK(store_client_.Get(
object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false));
if (!store_client_
.Get(object_ids, /*timeout_ms=*/0, &plasma_results, /*is_from_worker=*/false)
.ok()) {
return false;
}
for (const auto &plasma_result : plasma_results) {
if (plasma_result.data == nullptr) {
@ -2333,12 +2335,12 @@ Status NodeManager::GetObjectsFromPlasma(
new RayObject(plasma_result.data, plasma_result.metadata, {})));
}
}
return Status::OK();
return true;
}
void NodeManager::HandlePinObjectID(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
rpc::SendReplyCallback send_reply_callback) {
void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
std::vector<ObjectID> object_ids;
object_ids.reserve(request.object_ids_size());
const auto &owner_address = request.owner_address();
@ -2346,12 +2348,12 @@ void NodeManager::HandlePinObjectID(const rpc::PinObjectIDRequest &request,
object_ids.push_back(ObjectID::FromBinary(object_id_binary));
}
std::vector<std::unique_ptr<RayObject>> results;
if (auto s = GetObjectsFromPlasma(object_ids, &results); !s.ok()) {
if (!GetObjectsFromPlasma(object_ids, &results)) {
RAY_LOG(WARNING)
<< "Failed to get objects that should have been in the object store. These "
"objects may have been evicted while there are still references in scope: "
<< s;
send_reply_callback(s, nullptr, nullptr);
"objects may have been evicted while there are still references in scope.";
// TODO(suquark): Maybe "Status::ObjectNotFound" is more accurate here.
send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr);
return;
}
// Wait for the object to be freed by the owner, which keeps the ref count.


@ -175,7 +175,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Subscribe to the relevant GCS tables and set up handlers.
///
/// \return Status indicating whether this was done successfully or not.
Status RegisterGcs();
ray::Status RegisterGcs();
/// Get initial node manager configuration.
const NodeManagerConfig &GetInitialConfig() const;
@ -226,7 +226,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \param include_task_info If true, it requires every task metadata information
/// from all workers.
void QueryAllWorkerStates(
const std::function<void(const Status &status,
const std::function<void(const ray::Status &status,
const rpc::GetCoreWorkerStatsReply &r)> &on_replied,
rpc::SendReplyCallback &send_reply_callback,
bool include_memory_info,
@ -551,10 +551,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `PinObjectID` request.
void HandlePinObjectID(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `PinObjectIDs` request.
void HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `NodeStats` request.
void HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
@ -632,9 +632,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \param[in] object_ids The objects to get.
/// \param[out] results The pointers to objects stored in
/// plasma.
/// \return Status of the request.
Status GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results);
/// \return Whether the request was successful.
bool GetObjectsFromPlasma(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> *results);
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in


@ -48,12 +48,11 @@ AddressesToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb,
} // namespace
namespace ray {
namespace raylet {
RayletConnection::RayletConnection(instrumented_io_context &io_service,
const std::string &raylet_socket,
int num_retries,
int64_t timeout) {
raylet::RayletConnection::RayletConnection(instrumented_io_context &io_service,
const std::string &raylet_socket,
int num_retries,
int64_t timeout) {
local_stream_socket socket(io_service);
Status s = ConnectSocketRetry(socket, raylet_socket, num_retries, timeout);
// If we could not connect to the socket, exit.
@ -63,8 +62,8 @@ RayletConnection::RayletConnection(instrumented_io_context &io_service,
conn_ = ServerConnection::Create(std::move(socket));
}
Status RayletConnection::WriteMessage(MessageType type,
flatbuffers::FlatBufferBuilder *fbb) {
Status raylet::RayletConnection::WriteMessage(MessageType type,
flatbuffers::FlatBufferBuilder *fbb) {
std::unique_lock<std::mutex> guard(write_mutex_);
int64_t length = fbb ? fbb->GetSize() : 0;
uint8_t *bytes = fbb ? fbb->GetBufferPointer() : nullptr;
@ -73,10 +72,10 @@ Status RayletConnection::WriteMessage(MessageType type,
return status;
}
Status RayletConnection::AtomicRequestReply(MessageType request_type,
MessageType reply_type,
std::vector<uint8_t> *reply_message,
flatbuffers::FlatBufferBuilder *fbb) {
Status raylet::RayletConnection::AtomicRequestReply(MessageType request_type,
MessageType reply_type,
std::vector<uint8_t> *reply_message,
flatbuffers::FlatBufferBuilder *fbb) {
std::unique_lock<std::mutex> guard(mutex_);
RAY_RETURN_NOT_OK(WriteMessage(request_type, fbb));
auto status = conn_->ReadMessage(static_cast<int64_t>(reply_type), reply_message);
@ -84,7 +83,7 @@ Status RayletConnection::AtomicRequestReply(MessageType request_type,
return status;
}
void RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) {
void raylet::RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) {
if (!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) {
RAY_LOG(WARNING) << "The connection is failed because the local raylet has been "
"dead. Terminate the process. Status: "
@ -94,28 +93,27 @@ void RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) {
}
}
RayletClient::RayletClient(std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client)
: grpc_client_(std::move(grpc_client)) {
pin_batcher_ = std::make_unique<PinBatcher>(grpc_client_);
}
raylet::RayletClient::RayletClient(
std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client)
: grpc_client_(std::move(grpc_client)) {}
RayletClient::RayletClient(instrumented_io_context &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket,
const WorkerID &worker_id,
rpc::WorkerType worker_type,
const JobID &job_id,
const int &runtime_env_hash,
const Language &language,
const std::string &ip_address,
Status *status,
NodeID *raylet_id,
int *port,
std::string *serialized_job_config,
StartupToken startup_token)
raylet::RayletClient::RayletClient(
instrumented_io_context &io_service,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket,
const WorkerID &worker_id,
rpc::WorkerType worker_type,
const JobID &job_id,
const int &runtime_env_hash,
const Language &language,
const std::string &ip_address,
Status *status,
NodeID *raylet_id,
int *port,
std::string *serialized_job_config,
StartupToken startup_token)
: grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) {
conn_ = std::make_unique<RayletConnection>(io_service, raylet_socket, -1, -1);
pin_batcher_ = std::make_unique<PinBatcher>(grpc_client_);
conn_ = std::make_unique<raylet::RayletConnection>(io_service, raylet_socket, -1, -1);
flatbuffers::FlatBufferBuilder fbb;
// TODO(suquark): Use `WorkerType` in `common.proto` without converting to int.
@ -158,9 +156,7 @@ RayletClient::RayletClient(instrumented_io_context &io_service,
*serialized_job_config = reply_message->serialized_job_config()->str();
}
RayletClient::~RayletClient() {}
Status RayletClient::Disconnect(
Status raylet::RayletClient::Disconnect(
rpc::WorkerExitType exit_type,
const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
RAY_LOG(INFO) << "RayletClient::Disconnect, exit_type="
@ -194,20 +190,23 @@ Status RayletClient::Disconnect(
return Status::OK();
}
Status RayletClient::AnnounceWorkerPort(int port) {
Status raylet::RayletClient::AnnounceWorkerPort(int port) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateAnnounceWorkerPort(fbb, port);
fbb.Finish(message);
return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb);
}
Status RayletClient::TaskDone() { return conn_->WriteMessage(MessageType::TaskDone); }
Status raylet::RayletClient::TaskDone() {
return conn_->WriteMessage(MessageType::TaskDone);
}
Status RayletClient::FetchOrReconstruct(const std::vector<ObjectID> &object_ids,
const std::vector<rpc::Address> &owner_addresses,
bool fetch_only,
bool mark_worker_blocked,
const TaskID &current_task_id) {
Status raylet::RayletClient::FetchOrReconstruct(
const std::vector<ObjectID> &object_ids,
const std::vector<rpc::Address> &owner_addresses,
bool fetch_only,
bool mark_worker_blocked,
const TaskID &current_task_id) {
RAY_CHECK(object_ids.size() == owner_addresses.size());
flatbuffers::FlatBufferBuilder fbb;
auto object_ids_message = to_flatbuf(fbb, object_ids);
@ -222,34 +221,34 @@ Status RayletClient::FetchOrReconstruct(const std::vector<ObjectID> &object_ids,
return conn_->WriteMessage(MessageType::FetchOrReconstruct, &fbb);
}
Status RayletClient::NotifyUnblocked(const TaskID &current_task_id) {
Status raylet::RayletClient::NotifyUnblocked(const TaskID &current_task_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateNotifyUnblocked(fbb, to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::NotifyUnblocked, &fbb);
}
Status RayletClient::NotifyDirectCallTaskBlocked(bool release_resources) {
Status raylet::RayletClient::NotifyDirectCallTaskBlocked(bool release_resources) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb, release_resources);
fbb.Finish(message);
return conn_->WriteMessage(MessageType::NotifyDirectCallTaskBlocked, &fbb);
}
Status RayletClient::NotifyDirectCallTaskUnblocked() {
Status raylet::RayletClient::NotifyDirectCallTaskUnblocked() {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateNotifyDirectCallTaskUnblocked(fbb);
fbb.Finish(message);
return conn_->WriteMessage(MessageType::NotifyDirectCallTaskUnblocked, &fbb);
}
Status RayletClient::Wait(const std::vector<ObjectID> &object_ids,
const std::vector<rpc::Address> &owner_addresses,
int num_returns,
int64_t timeout_milliseconds,
bool mark_worker_blocked,
const TaskID &current_task_id,
WaitResultPair *result) {
Status raylet::RayletClient::Wait(const std::vector<ObjectID> &object_ids,
const std::vector<rpc::Address> &owner_addresses,
int num_returns,
int64_t timeout_milliseconds,
bool mark_worker_blocked,
const TaskID &current_task_id,
WaitResultPair *result) {
// Write request.
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateWaitRequest(fbb,
@ -278,7 +277,7 @@ Status RayletClient::Wait(const std::vector<ObjectID> &object_ids,
return Status::OK();
}
Status RayletClient::WaitForDirectActorCallArgs(
Status raylet::RayletClient::WaitForDirectActorCallArgs(
const std::vector<rpc::ObjectReference> &references, int64_t tag) {
flatbuffers::FlatBufferBuilder fbb;
std::vector<ObjectID> object_ids;
@ -293,10 +292,10 @@ Status RayletClient::WaitForDirectActorCallArgs(
return conn_->WriteMessage(MessageType::WaitForDirectActorCallArgsRequest, &fbb);
}
Status RayletClient::PushError(const JobID &job_id,
const std::string &type,
const std::string &error_message,
double timestamp) {
Status raylet::RayletClient::PushError(const JobID &job_id,
const std::string &type,
const std::string &error_message,
double timestamp) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreatePushErrorRequest(fbb,
to_flatbuf(fbb, job_id),
@ -307,8 +306,8 @@ Status RayletClient::PushError(const JobID &job_id,
return conn_->WriteMessage(MessageType::PushErrorRequest, &fbb);
}
Status RayletClient::FreeObjects(const std::vector<ObjectID> &object_ids,
bool local_only) {
Status raylet::RayletClient::FreeObjects(const std::vector<ObjectID> &object_ids,
bool local_only) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
protocol::CreateFreeObjectsRequest(fbb, local_only, to_flatbuf(fbb, object_ids));
@ -316,7 +315,7 @@ Status RayletClient::FreeObjects(const std::vector<ObjectID> &object_ids,
return conn_->WriteMessage(MessageType::FreeObjectsInObjectStoreRequest, &fbb);
}
void RayletClient::RequestWorkerLease(
void raylet::RayletClient::RequestWorkerLease(
const rpc::TaskSpec &task_spec,
bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
@ -338,7 +337,7 @@ void RayletClient::RequestWorkerLease(
}
/// Spill objects to external storage.
void RayletClient::RequestObjectSpillage(
void raylet::RayletClient::RequestObjectSpillage(
const ObjectID &object_id,
const rpc::ClientCallback<rpc::RequestObjectSpillageReply> &callback) {
rpc::RequestObjectSpillageRequest request;
@ -346,11 +345,11 @@ void RayletClient::RequestObjectSpillage(
grpc_client_->RequestObjectSpillage(request, callback);
}
std::shared_ptr<grpc::Channel> RayletClient::GetChannel() const {
std::shared_ptr<grpc::Channel> raylet::RayletClient::GetChannel() const {
return grpc_client_->Channel();
}
void RayletClient::ReportWorkerBacklog(
void raylet::RayletClient::ReportWorkerBacklog(
const WorkerID &worker_id,
const std::vector<rpc::WorkerBacklogReport> &backlog_reports) {
rpc::ReportWorkerBacklogRequest request;
@ -364,10 +363,10 @@ void RayletClient::ReportWorkerBacklog(
});
}
Status RayletClient::ReturnWorker(int worker_port,
const WorkerID &worker_id,
bool disconnect_worker,
bool worker_exiting) {
Status raylet::RayletClient::ReturnWorker(int worker_port,
const WorkerID &worker_id,
bool disconnect_worker,
bool worker_exiting) {
rpc::ReturnWorkerRequest request;
request.set_worker_port(worker_port);
request.set_worker_id(worker_id.Binary());
@ -382,7 +381,7 @@ Status RayletClient::ReturnWorker(int worker_port,
return Status::OK();
}
void RayletClient::ReleaseUnusedWorkers(
void raylet::RayletClient::ReleaseUnusedWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) {
rpc::ReleaseUnusedWorkersRequest request;
@ -401,7 +400,7 @@ void RayletClient::ReleaseUnusedWorkers(
});
}
void RayletClient::CancelWorkerLease(
void raylet::RayletClient::CancelWorkerLease(
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) {
rpc::CancelWorkerLeaseRequest request;
@ -409,7 +408,7 @@ void RayletClient::CancelWorkerLease(
grpc_client_->CancelWorkerLease(request, callback);
}
void RayletClient::PrepareBundleResources(
void raylet::RayletClient::PrepareBundleResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback) {
rpc::PrepareBundleResourcesRequest request;
@ -423,7 +422,7 @@ void RayletClient::PrepareBundleResources(
grpc_client_->PrepareBundleResources(request, callback);
}
void RayletClient::CommitBundleResources(
void raylet::RayletClient::CommitBundleResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs,
const ray::rpc::ClientCallback<ray::rpc::CommitBundleResourcesReply> &callback) {
rpc::CommitBundleResourcesRequest request;
@ -437,7 +436,7 @@ void RayletClient::CommitBundleResources(
grpc_client_->CommitBundleResources(request, callback);
}
void RayletClient::CancelResourceReserve(
void raylet::RayletClient::CancelResourceReserve(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback) {
rpc::CancelResourceReserveRequest request;
@ -445,7 +444,7 @@ void RayletClient::CancelResourceReserve(
grpc_client_->CancelResourceReserve(request, callback);
}
void RayletClient::ReleaseUnusedBundles(
void raylet::RayletClient::ReleaseUnusedBundles(
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) {
rpc::ReleaseUnusedBundlesRequest request;
@ -464,13 +463,25 @@ void RayletClient::ReleaseUnusedBundles(
});
}
void RayletClient::PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) {
pin_batcher_->Add(caller_address, object_id, std::move(callback));
void raylet::RayletClient::PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) {
rpc::PinObjectIDsRequest request;
request.mutable_owner_address()->CopyFrom(caller_address);
for (const ObjectID &object_id : object_ids) {
request.add_object_ids(object_id.Binary());
}
pins_in_flight_++;
auto rpc_callback = [this, callback = std::move(callback)](
Status status, const rpc::PinObjectIDsReply &reply) {
pins_in_flight_--;
callback(status, reply);
};
grpc_client_->PinObjectIDs(request, rpc_callback);
}
void RayletClient::ShutdownRaylet(
void raylet::RayletClient::ShutdownRaylet(
const NodeID &node_id,
bool graceful,
const rpc::ClientCallback<rpc::ShutdownRayletReply> &callback) {
@ -479,12 +490,13 @@ void RayletClient::ShutdownRaylet(
grpc_client_->ShutdownRaylet(request, callback);
}
void RayletClient::GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback) {
void raylet::RayletClient::GlobalGC(
const rpc::ClientCallback<rpc::GlobalGCReply> &callback) {
rpc::GlobalGCRequest request;
grpc_client_->GlobalGC(request, callback);
}
void RayletClient::UpdateResourceUsage(
void raylet::RayletClient::UpdateResourceUsage(
std::string &serialized_resource_usage_batch,
const rpc::ClientCallback<rpc::UpdateResourceUsageReply> &callback) {
@ -493,20 +505,20 @@ void RayletClient::UpdateResourceUsage(
grpc_client_->UpdateResourceUsage(request, callback);
}
void RayletClient::RequestResourceReport(
void raylet::RayletClient::RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) {
rpc::RequestResourceReportRequest request;
grpc_client_->RequestResourceReport(request, callback);
}
void RayletClient::GetResourceLoad(
void raylet::RayletClient::GetResourceLoad(
const rpc::ClientCallback<rpc::GetResourceLoadReply> &callback) {
rpc::GetResourceLoadRequest request;
grpc_client_->GetResourceLoad(request, callback);
}
void RayletClient::SubscribeToPlasma(const ObjectID &object_id,
const rpc::Address &owner_address) {
void raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id,
const rpc::Address &owner_address) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateSubscribePlasmaReady(
fbb, to_flatbuf(fbb, object_id), to_flatbuf(fbb, owner_address));
@ -515,74 +527,16 @@ void RayletClient::SubscribeToPlasma(const ObjectID &object_id,
RAY_CHECK_OK(conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb));
}
void RayletClient::GetSystemConfig(
void raylet::RayletClient::GetSystemConfig(
const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback) {
rpc::GetSystemConfigRequest request;
grpc_client_->GetSystemConfig(request, callback);
}
void RayletClient::GetGcsServerAddress(
void raylet::RayletClient::GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) {
rpc::GetGcsServerAddressRequest request;
grpc_client_->GetGcsServerAddress(request, callback);
}
int64_t RayletClient::GetPinsInFlight() const { return pin_batcher_->TotalPending(); }
PinBatcher::PinBatcher(std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client)
: grpc_client_(std::move(grpc_client)) {}
void PinBatcher::Add(const rpc::Address &address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) {
absl::MutexLock lock(&mu_);
total_inflight_pins_++;
RayletDestination &raylet =
raylets_.try_emplace(address.raylet_id(), address).first->second;
raylet.buffered_.emplace_back(object_id, std::move(callback));
Flush(address.raylet_id());
}
int64_t PinBatcher::TotalPending() const {
absl::MutexLock lock(&mu_);
return total_inflight_pins_;
}
bool PinBatcher::Flush(const std::string &raylet_id) {
auto &raylet = raylets_.at(raylet_id);
if (raylet.buffered_.empty() || !raylet.inflight_.empty()) {
return false;
}
raylet.inflight_ = std::move(raylet.buffered_);
raylet.buffered_.clear();
rpc::PinObjectIDRequest request;
request.mutable_owner_address()->CopyFrom(raylet.raylet_address_);
for (const auto &req : raylet.inflight_) {
request.add_object_ids(req.object_id.Binary());
}
auto rpc_callback = [this, raylet_id](Status status,
const rpc::PinObjectIDReply &reply) {
std::vector<Request> inflight;
{
absl::MutexLock lock(&mu_);
auto &raylet = raylets_.at(raylet_id);
inflight = std::move(raylet.inflight_);
raylet.inflight_.clear();
total_inflight_pins_ -= inflight.size();
if (!Flush(raylet_id)) {
// No more buffered requests, so this RayletDestination can be dropped.
raylets_.erase(raylet_id);
}
}
for (auto &req : inflight) {
req.callback(status, reply);
}
};
grpc_client_->PinObjectID(request, std::move(rpc_callback));
return true;
}
} // namespace raylet
} // namespace ray


@ -50,9 +50,10 @@ namespace ray {
class PinObjectsInterface {
public:
/// Request to a raylet to pin a plasma object. The callback will be sent via gRPC.
virtual void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) = 0;
virtual void PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) = 0;
virtual ~PinObjectsInterface(){};
};
@ -232,53 +233,6 @@ class RayletConnection {
std::mutex write_mutex_;
};
/// Batches PinObjectIDRequest so there would be only one outstanding
/// request per Raylet. This reduces the memory and CPU overhead when a
/// large number of objects need to be pinned.
class PinBatcher {
public:
PinBatcher(std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client);
/// Adds objects to be pinned at the address.
void Add(const rpc::Address &address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback);
/// Total number of objects waiting to be pinned.
int64_t TotalPending() const;
private:
// Request from a single Add() call.
struct Request {
Request(ObjectID oid, rpc::ClientCallback<rpc::PinObjectIDReply> cb)
: object_id(oid), callback(std::move(cb)) {}
ObjectID object_id;
rpc::ClientCallback<rpc::PinObjectIDReply> callback;
};
// Collects buffered pin object requests intended for a raylet.
struct RayletDestination {
RayletDestination(const rpc::Address &address) : raylet_address_(address) {}
const rpc::Address raylet_address_;
std::vector<Request> inflight_;
std::vector<Request> buffered_;
};
/// Tries sending out a batched pin request with buffered object IDs.
///
/// \return true if a request is sent out, false otherwise, e.g. when
/// there is already an inflight request, or there is no buffered Object IDs.
bool Flush(const std::string &raylet_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
const std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client_;
mutable absl::Mutex mu_;
// Maps Raylet ID to the address and buffered messages for the Raylet.
absl::flat_hash_map<std::string, RayletDestination> raylets_ ABSL_GUARDED_BY(mu_);
int64_t total_inflight_pins_ ABSL_GUARDED_BY(mu_) = 0;
};
class RayletClient : public RayletClientInterface {
public:
/// Connect to the raylet.
@ -304,7 +258,7 @@ class RayletClient : public RayletClientInterface {
/// \param startup_token The startup token of the process assigned to
/// it during startup as a command line argument.
RayletClient(instrumented_io_context &io_service,
std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket,
const WorkerID &worker_id,
rpc::WorkerType worker_type,
@ -321,9 +275,7 @@ class RayletClient : public RayletClientInterface {
/// Connect to the raylet via grpc only.
///
/// \param grpc_client gRPC client to the raylet.
RayletClient(std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client);
~RayletClient() override;
RayletClient(std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client);
/// Notify the raylet that this client is disconnecting gracefully. This
/// is used by actors to exit gracefully so that the raylet doesn't
@ -488,9 +440,10 @@ class RayletClient : public RayletClientInterface {
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) override;
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override;
void PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override;
void ShutdownRaylet(
const NodeID &node_id,
@ -524,12 +477,12 @@ class RayletClient : public RayletClientInterface {
const ResourceMappingType &GetResourceIDs() const { return resource_ids_; }
int64_t GetPinsInFlight() const;
int64_t GetPinsInFlight() const { return pins_in_flight_.load(); }
private:
/// gRPC client to the raylet. Right now, this is only used for a couple
/// request types.
std::shared_ptr<rpc::NodeManagerWorkerClient> grpc_client_;
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client_;
const WorkerID worker_id_;
const JobID job_id_;
@ -539,9 +492,12 @@ class RayletClient : public RayletClientInterface {
ResourceMappingType resource_ids_;
/// The connection to the raylet server.
std::unique_ptr<RayletConnection> conn_;
/// Batches pin object ID requests to the same raylet. All PinObjectID requests
/// should go through this.
std::unique_ptr<PinBatcher> pin_batcher_;
/// The number of object ID pin RPCs currently in flight.
std::atomic<int64_t> pins_in_flight_{0};
protected:
RayletClient() {}
};
} // namespace raylet
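
The PinBatcher removed above kept at most one outstanding pin request per raylet and buffered everything else until the reply arrived. The following self-contained sketch illustrates that pattern in isolation; the names (Batcher, SendBatchFn, State) are illustrative and this code is not part of Ray or of this commit.

#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

class Batcher {
 public:
  // Transport hook: asynchronously sends `ids` to `dest` and calls `done`
  // when the reply arrives.
  using SendBatchFn = std::function<void(const std::string &dest,
                                         std::vector<std::string> ids,
                                         std::function<void()> done)>;

  explicit Batcher(SendBatchFn send) : send_(std::move(send)) {}

  // Buffers one id for `dest` and flushes if no batch is currently in flight.
  void Add(const std::string &dest, std::string id) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      states_[dest].buffered.push_back(std::move(id));
    }
    Flush(dest);
  }

 private:
  struct State {
    std::vector<std::string> buffered;
    bool inflight = false;
  };

  void Flush(const std::string &dest) {
    std::vector<std::string> batch;
    {
      std::lock_guard<std::mutex> lock(mu_);
      State &state = states_[dest];
      if (state.inflight || state.buffered.empty()) {
        return;  // A batch is already outstanding, or there is nothing to send.
      }
      state.inflight = true;
      batch = std::move(state.buffered);
      state.buffered.clear();
    }
    // Issue the request outside the lock; the completion callback frees the
    // in-flight slot and flushes whatever accumulated in the meantime.
    send_(dest, std::move(batch), [this, dest]() {
      {
        std::lock_guard<std::mutex> lock(mu_);
        states_[dest].inflight = false;
      }
      Flush(dest);
    });
  }

  SendBatchFn send_;
  std::mutex mu_;
  std::unordered_map<std::string, State> states_;
};

The trade-off of this pattern is that the RPC count is bounded by the number of raylets rather than the number of pinned objects, but every pin has to wait for the currently in-flight batch to complete before it is sent.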


@ -153,7 +153,7 @@ class NodeManagerWorkerClient
/// Notify the raylet to pin the provided object IDs.
VOID_RPC_CLIENT_METHOD(NodeManagerService,
PinObjectID,
PinObjectIDs,
grpc_client_,
/*method_timeout_ms*/ -1, )


@ -33,7 +33,7 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedWorkers, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, CancelWorkerLease, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectID, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, FormatGlobalMemoryInfo, -1) \
@ -114,9 +114,9 @@ class NodeManagerServiceHandler {
rpc::CancelResourceReserveReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;
virtual void HandlePinObjectID(const PinObjectIDRequest &request,
PinObjectIDReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandlePinObjectIDs(const PinObjectIDsRequest &request,
PinObjectIDsReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetNodeStats(const GetNodeStatsRequest &request,
GetNodeStatsReply *reply,