[gRPC] Improve blocking call Placement group (#21130)

Use Sync methods with timeout for placement group RPCs
This commit is contained in:
SangBin Cho 2021-12-23 10:21:56 +09:00 committed by GitHub
parent 11ab412db1
commit 99693096d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 113 deletions

View file

@ -230,28 +230,23 @@ namespace gcs {
class MockPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor {
public:
MOCK_METHOD(Status, AsyncCreatePlacementGroup,
(const PlacementGroupSpecification &placement_group_spec,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, SyncCreatePlacementGroup,
(const PlacementGroupSpecification &placement_group_spec), (override));
MOCK_METHOD(Status, AsyncGet,
(const PlacementGroupID &placement_group_id,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback),
(override));
MOCK_METHOD(Status, AsyncGetByName,
(const std::string &placement_group_name, const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback),
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback,
int64_t timeout_ms),
(override));
MOCK_METHOD(Status, AsyncGetAll,
(const MultiItemCallback<rpc::PlacementGroupTableData> &callback),
(override));
MOCK_METHOD(Status, AsyncRemovePlacementGroup,
(const PlacementGroupID &placement_group_id,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncWaitUntilReady,
(const PlacementGroupID &placement_group_id,
const StatusCallback &callback),
MOCK_METHOD(Status, SyncRemovePlacementGroup,
(const PlacementGroupID &placement_group_id), (override));
MOCK_METHOD(Status, SyncWaitUntilReady, (const PlacementGroupID &placement_group_id),
(override));
};

View file

@ -1674,8 +1674,6 @@ Status CoreWorker::CreateActor(const RayFunction &function,
Status CoreWorker::CreatePlacementGroup(
const PlacementGroupCreationOptions &placement_group_creation_options,
PlacementGroupID *return_placement_group_id) {
std::shared_ptr<std::promise<Status>> status_promise =
std::make_shared<std::promise<Status>>();
const auto &bundles = placement_group_creation_options.bundles;
for (const auto &bundle : bundles) {
for (const auto &resource : bundle) {
@ -1697,63 +1695,48 @@ Status CoreWorker::CreatePlacementGroup(
PlacementGroupSpecification placement_group_spec = builder.Build();
*return_placement_group_id = placement_group_id;
RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;
RAY_UNUSED(gcs_client_->PlacementGroups().AsyncCreatePlacementGroup(
placement_group_spec,
[status_promise](const Status &status) { status_promise->set_value(status); }));
auto status_future = status_promise->get_future();
if (status_future.wait_for(std::chrono::seconds(
RayConfig::instance().gcs_server_request_timeout_seconds())) !=
std::future_status::ready) {
const auto status =
gcs_client_->PlacementGroups().SyncCreatePlacementGroup(placement_group_spec);
if (status.IsTimedOut()) {
std::ostringstream stream;
stream << "There was timeout in creating the placement group of id "
<< placement_group_id
<< ". It is probably "
"because GCS server is dead or there's a high load there.";
return Status::TimedOut(stream.str());
} else {
return status;
}
return status_future.get();
}
Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_id) {
std::shared_ptr<std::promise<Status>> status_promise =
std::make_shared<std::promise<Status>>();
// Synchronously wait for placement group removal.
RAY_UNUSED(gcs_client_->PlacementGroups().AsyncRemovePlacementGroup(
placement_group_id,
[status_promise](const Status &status) { status_promise->set_value(status); }));
auto status_future = status_promise->get_future();
if (status_future.wait_for(std::chrono::seconds(
RayConfig::instance().gcs_server_request_timeout_seconds())) !=
std::future_status::ready) {
const auto status =
gcs_client_->PlacementGroups().SyncRemovePlacementGroup(placement_group_id);
if (status.IsTimedOut()) {
std::ostringstream stream;
stream << "There was timeout in removing the placement group of id "
<< placement_group_id
<< ". It is probably "
"because GCS server is dead or there's a high load there.";
return Status::TimedOut(stream.str());
} else {
return status;
}
return status_future.get();
}
Status CoreWorker::WaitPlacementGroupReady(const PlacementGroupID &placement_group_id,
int timeout_seconds) {
std::shared_ptr<std::promise<Status>> status_promise =
std::make_shared<std::promise<Status>>();
RAY_CHECK_OK(gcs_client_->PlacementGroups().AsyncWaitUntilReady(
placement_group_id,
[status_promise](const Status &status) { status_promise->set_value(status); }));
auto status_future = status_promise->get_future();
if (status_future.wait_for(std::chrono::seconds(timeout_seconds)) !=
std::future_status::ready) {
const auto status =
gcs_client_->PlacementGroups().SyncWaitUntilReady(placement_group_id);
if (status.IsTimedOut()) {
std::ostringstream stream;
stream << "There was timeout in waiting for placement group " << placement_group_id
<< " creation.";
return Status::TimedOut(stream.str());
} else {
return status;
}
return status_future.get();
}
std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(

View file

@ -1031,46 +1031,31 @@ Status WorkerInfoAccessor::AsyncAdd(const std::shared_ptr<rpc::WorkerTableData>
PlacementGroupInfoAccessor::PlacementGroupInfoAccessor(GcsClient *client_impl)
: client_impl_(client_impl) {}
Status PlacementGroupInfoAccessor::AsyncCreatePlacementGroup(
const ray::PlacementGroupSpecification &placement_group_spec,
const StatusCallback &callback) {
Status PlacementGroupInfoAccessor::SyncCreatePlacementGroup(
const ray::PlacementGroupSpecification &placement_group_spec) {
rpc::CreatePlacementGroupRequest request;
rpc::CreatePlacementGroupReply reply;
request.mutable_placement_group_spec()->CopyFrom(placement_group_spec.GetMessage());
client_impl_->GetGcsRpcClient().CreatePlacementGroup(
request,
[placement_group_spec, callback](const Status & /*unused*/,
const rpc::CreatePlacementGroupReply &reply) {
auto status =
reply.status().code() == (int)StatusCode::OK
? Status()
: Status(StatusCode(reply.status().code()), reply.status().message());
if (status.ok()) {
RAY_LOG(DEBUG) << "Finished registering placement group. placement group id = "
<< placement_group_spec.PlacementGroupId();
} else {
RAY_LOG(ERROR) << "Placement group id = "
<< placement_group_spec.PlacementGroupId()
<< " failed to be registered. " << status;
}
if (callback) {
callback(status);
}
});
return Status::OK();
auto status = client_impl_->GetGcsRpcClient().SyncCreatePlacementGroup(
request, &reply, GetGcsTimeoutMs());
if (status.ok()) {
RAY_LOG(DEBUG) << "Finished registering placement group. placement group id = "
<< placement_group_spec.PlacementGroupId();
} else {
RAY_LOG(ERROR) << "Placement group id = " << placement_group_spec.PlacementGroupId()
<< " failed to be registered. " << status;
}
return status;
}
Status PlacementGroupInfoAccessor::AsyncRemovePlacementGroup(
const ray::PlacementGroupID &placement_group_id, const StatusCallback &callback) {
Status PlacementGroupInfoAccessor::SyncRemovePlacementGroup(
const ray::PlacementGroupID &placement_group_id) {
rpc::RemovePlacementGroupRequest request;
rpc::RemovePlacementGroupReply reply;
request.set_placement_group_id(placement_group_id.Binary());
client_impl_->GetGcsRpcClient().RemovePlacementGroup(
request,
[callback](const Status &status, const rpc::RemovePlacementGroupReply &reply) {
if (callback) {
callback(status);
}
});
return Status::OK();
auto status = client_impl_->GetGcsRpcClient().SyncRemovePlacementGroup(
request, &reply, GetGcsTimeoutMs());
return status;
}
Status PlacementGroupInfoAccessor::AsyncGet(
@ -1096,14 +1081,16 @@ Status PlacementGroupInfoAccessor::AsyncGet(
Status PlacementGroupInfoAccessor::AsyncGetByName(
const std::string &name, const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback) {
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback,
int64_t timeout_ms) {
RAY_LOG(DEBUG) << "Getting named placement group info, name = " << name;
rpc::GetNamedPlacementGroupRequest request;
request.set_name(name);
request.set_ray_namespace(ray_namespace);
client_impl_->GetGcsRpcClient().GetNamedPlacementGroup(
request, [name, callback](const Status &status,
const rpc::GetNamedPlacementGroupReply &reply) {
request,
[name, callback](const Status &status,
const rpc::GetNamedPlacementGroupReply &reply) {
if (reply.has_placement_group_table_data()) {
callback(status, reply.placement_group_table_data());
} else {
@ -1111,7 +1098,8 @@ Status PlacementGroupInfoAccessor::AsyncGetByName(
}
RAY_LOG(DEBUG) << "Finished getting named placement group info, status = "
<< status << ", name = " << name;
});
},
/*timeout_ms*/ timeout_ms);
return Status::OK();
}
@ -1129,22 +1117,16 @@ Status PlacementGroupInfoAccessor::AsyncGetAll(
return Status::OK();
}
Status PlacementGroupInfoAccessor::AsyncWaitUntilReady(
const PlacementGroupID &placement_group_id, const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Waiting for placement group until ready, placement group id = "
<< placement_group_id;
Status PlacementGroupInfoAccessor::SyncWaitUntilReady(
const PlacementGroupID &placement_group_id) {
rpc::WaitPlacementGroupUntilReadyRequest request;
rpc::WaitPlacementGroupUntilReadyReply reply;
request.set_placement_group_id(placement_group_id.Binary());
client_impl_->GetGcsRpcClient().WaitPlacementGroupUntilReady(
request,
[placement_group_id, callback](
const Status &status, const rpc::WaitPlacementGroupUntilReadyReply &reply) {
callback(status);
RAY_LOG(DEBUG)
<< "Finished waiting placement group until ready, placement group id = "
<< placement_group_id;
});
return Status::OK();
auto status = client_impl_->GetGcsRpcClient().SyncWaitPlacementGroupUntilReady(
request, &reply, GetGcsTimeoutMs());
RAY_LOG(DEBUG) << "Finished waiting placement group until ready, placement group id = "
<< placement_group_id;
return status;
}
InternalKVAccessor::InternalKVAccessor(GcsClient *client_impl)

View file

@ -667,15 +667,15 @@ class PlacementGroupInfoAccessor {
explicit PlacementGroupInfoAccessor(GcsClient *client_impl);
virtual ~PlacementGroupInfoAccessor() = default;
/// Create a placement group to GCS asynchronously.
/// Create a placement group to GCS synchronously.
///
/// The RPC will timeout after the default GCS RPC timeout is exceeded.
///
/// \param placement_group_spec The specification for the placement group creation task.
/// \param callback Callback that will be called after the placement group info is
/// written to GCS.
/// \return Status.
virtual Status AsyncCreatePlacementGroup(
const PlacementGroupSpecification &placement_group_spec,
const StatusCallback &callback);
/// \return Status. The status of the RPC. TimedOut if the RPC times out. Invalid if the
/// same name placement group is registered. NotFound if the placement group is removed.
virtual Status SyncCreatePlacementGroup(
const ray::PlacementGroupSpecification &placement_group_spec);
/// Get a placement group data from GCS asynchronously by id.
///
@ -688,10 +688,14 @@ class PlacementGroupInfoAccessor {
/// Get a placement group data from GCS asynchronously by name.
///
/// \param placement_group_name The name of a placement group to obtain from GCS.
/// \param ray_namespace The ray namespace.
/// \param callback The callback that's called when the RPC is replied.
/// \param timeout_ms The RPC timeout in milliseconds. -1 means the default.
/// \return Status.
virtual Status AsyncGetByName(
const std::string &placement_group_name, const std::string &ray_namespace,
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback);
const OptionalItemCallback<rpc::PlacementGroupTableData> &callback,
int64_t timeout_ms = -1);
/// Get all placement group info from GCS asynchronously.
///
@ -700,22 +704,22 @@ class PlacementGroupInfoAccessor {
virtual Status AsyncGetAll(
const MultiItemCallback<rpc::PlacementGroupTableData> &callback);
/// Remove a placement group to GCS asynchronously.
/// Remove a placement group to GCS synchronously.
///
/// The RPC will timeout after the default GCS RPC timeout is exceeded.
///
/// \param placement_group_id The id for the placement group to remove.
/// \param callback Callback that will be called after the placement group is
/// removed from GCS.
/// \return Status
virtual Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id,
const StatusCallback &callback);
virtual Status SyncRemovePlacementGroup(const PlacementGroupID &placement_group_id);
/// Wait for a placement group until ready asynchronously.
///
/// The RPC will timeout after the default GCS RPC timeout is exceeded.
///
/// \param placement_group_id The id for the placement group to wait for until ready.
/// \param callback Callback that will be called after the placement group is created.
/// \return Status
virtual Status AsyncWaitUntilReady(const PlacementGroupID &placement_group_id,
const StatusCallback &callback);
/// \return Status. TimedOut if the RPC times out. NotFound if the placement has already
/// removed.
virtual Status SyncWaitUntilReady(const PlacementGroupID &placement_group_id);
private:
GcsClient *client_impl_;