[Core][GCS] remove gcs object manager (#19963)

This commit is contained in:
mwtian 2021-11-02 16:20:53 -07:00 committed by GitHub
parent 14d0889fbc
commit ef4b6e4648
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 6 additions and 1774 deletions

View file

@ -1810,23 +1810,6 @@ cc_test(
],
)
cc_test(
name = "gcs_object_manager_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_client_lib",
":gcs_server_lib",
":gcs_server_test_util",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "object_manager",
srcs = glob([

View file

@ -25,7 +25,6 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
c_vector[c_string] GetAllNodeInfo()
c_vector[c_string] GetAllAvailableResources()
c_vector[c_string] GetAllProfileInfo()
c_vector[c_string] GetAllObjectInfo()
unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id)
unique_ptr[c_string] GetAllResourceUsage()
c_vector[c_string] GetAllActorInfo()

View file

@ -67,21 +67,6 @@ cdef class GlobalStateAccessor:
result = self.inner.get().GetAllProfileInfo()
return result
def get_object_table(self):
cdef c_vector[c_string] result
with nogil:
result = self.inner.get().GetAllObjectInfo()
return result
def get_object_info(self, object_id):
cdef unique_ptr[c_string] object_info
cdef CObjectID cobject_id = CObjectID.FromBinary(object_id.binary())
with nogil:
object_info = self.inner.get().GetObjectInfo(cobject_id)
if object_info:
return c_string(object_info.get().data(), object_info.get().size())
return None
def get_all_resource_usage(self):
"""Get newest resource usage of all nodes from GCS service."""
cdef unique_ptr[c_string] result

View file

@ -83,55 +83,6 @@ class GlobalState:
self.redis_address, self.redis_password)
self.global_state_accessor.connect()
def object_table(self, object_ref=None):
"""Fetch and parse the object table info for one or more object refs.
Args:
object_ref: An object ref to fetch information about. If this is
None, then the entire object table is fetched.
Returns:
Information from the object table.
"""
self._check_connected()
if object_ref is not None:
object_ref = ray.ObjectRef(hex_to_binary(object_ref))
object_info = self.global_state_accessor.get_object_info(
object_ref)
if object_info is None:
return {}
else:
object_location_info = gcs_utils.ObjectLocationInfo.FromString(
object_info)
return self._gen_object_info(object_location_info)
else:
object_table = self.global_state_accessor.get_object_table()
results = {}
for i in range(len(object_table)):
object_location_info = gcs_utils.ObjectLocationInfo.FromString(
object_table[i])
results[binary_to_hex(object_location_info.object_id)] = \
self._gen_object_info(object_location_info)
return results
def _gen_object_info(self, object_location_info):
"""Parse object location info.
Returns:
Information from object.
"""
locations = []
for location in object_location_info.locations:
locations.append(
ray._private.utils.binary_to_hex(location.manager))
object_info = {
"ObjectRef": ray._private.utils.binary_to_hex(
object_location_info.object_id),
"Locations": locations,
}
return object_info
def actor_table(self, actor_id):
"""Fetch and parse the actor table information for a single actor ID.

View file

@ -129,45 +129,6 @@ class MockTaskInfoAccessor : public TaskInfoAccessor {
namespace ray {
namespace gcs {
class MockObjectInfoAccessor : public ObjectInfoAccessor {
public:
MOCK_METHOD(Status, AsyncGetLocations,
(const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback),
(override));
MOCK_METHOD(Status, AsyncGetAll,
(const MultiItemCallback<rpc::ObjectLocationInfo> &callback), (override));
MOCK_METHOD(Status, AsyncAddLocation,
(const ObjectID &object_id, const NodeID &node_id, size_t object_size,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncAddSpilledUrl,
(const ObjectID &object_id, const std::string &spilled_url,
const NodeID &spilled_node_id, size_t object_size,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncRemoveLocation,
(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncSubscribeToLocations,
(const ObjectID &object_id,
(const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe),
const StatusCallback &done),
(override));
MOCK_METHOD(Status, AsyncUnsubscribeToLocations, (const ObjectID &object_id),
(override));
MOCK_METHOD(void, AsyncResubscribe, (bool is_pubsub_server_restarted), (override));
MOCK_METHOD(bool, IsObjectUnsubscribed, (const ObjectID &object_id), (override));
};
} // namespace gcs
} // namespace ray
namespace ray {
namespace gcs {
class MockNodeInfoAccessor : public NodeInfoAccessor {
public:
MOCK_METHOD(Status, RegisterSelf,

View file

@ -39,7 +39,6 @@ class MockGcsClient : public GcsClient {
MockGcsClient() {
mock_job_accessor = new MockJobInfoAccessor();
mock_actor_accessor = new MockActorInfoAccessor();
mock_object_accessor = new MockObjectInfoAccessor();
mock_node_accessor = new MockNodeInfoAccessor();
mock_node_resource_accessor = new MockNodeResourceInfoAccessor();
mock_task_accessor = new MockTaskInfoAccessor();
@ -54,7 +53,6 @@ class MockGcsClient : public GcsClient {
GcsClient::node_accessor_.reset(mock_node_accessor);
GcsClient::node_resource_accessor_.reset(mock_node_resource_accessor);
GcsClient::task_accessor_.reset(mock_task_accessor);
GcsClient::object_accessor_.reset(mock_object_accessor);
GcsClient::stats_accessor_.reset(mock_stats_accessor);
GcsClient::error_accessor_.reset(mock_error_accessor);
GcsClient::worker_accessor_.reset(mock_worker_accessor);
@ -62,7 +60,6 @@ class MockGcsClient : public GcsClient {
}
MockActorInfoAccessor *mock_actor_accessor;
MockJobInfoAccessor *mock_job_accessor;
MockObjectInfoAccessor *mock_object_accessor;
MockNodeInfoAccessor *mock_node_accessor;
MockNodeResourceInfoAccessor *mock_node_resource_accessor;
MockTaskInfoAccessor *mock_task_accessor;

View file

@ -1,43 +0,0 @@
// Copyright The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace gcs {
class MockGcsObjectManager : public GcsObjectManager {
public:
MOCK_METHOD(void, HandleGetObjectLocations,
(const rpc::GetObjectLocationsRequest &request,
rpc::GetObjectLocationsReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void, HandleGetAllObjectLocations,
(const rpc::GetAllObjectLocationsRequest &request,
rpc::GetAllObjectLocationsReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void, HandleAddObjectLocation,
(const rpc::AddObjectLocationRequest &request,
rpc::AddObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void, HandleRemoveObjectLocation,
(const rpc::RemoveObjectLocationRequest &request,
rpc::RemoveObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
};
} // namespace gcs
} // namespace ray

View file

@ -1019,230 +1019,6 @@ bool TaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) {
return client_impl_->GetGcsSubscriber().IsTaskLeaseUnsubscribed(task_id);
}
ObjectInfoAccessor::ObjectInfoAccessor(GcsClient *client_impl)
: client_impl_(client_impl) {}
Status ObjectInfoAccessor::AsyncGetLocations(
const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) {
RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::GetObjectLocationsRequest request;
request.set_object_id(object_id.Binary());
client_impl_->GetGcsRpcClient().GetObjectLocations(
request, [object_id, callback](const Status &status,
const rpc::GetObjectLocationsReply &reply) {
callback(status, reply.location_info());
RAY_LOG(DEBUG) << "Finished getting object locations, status = " << status
<< ", object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
});
return Status::OK();
}
Status ObjectInfoAccessor::AsyncGetAll(
const MultiItemCallback<rpc::ObjectLocationInfo> &callback) {
RAY_LOG(DEBUG) << "Getting all object locations.";
rpc::GetAllObjectLocationsRequest request;
client_impl_->GetGcsRpcClient().GetAllObjectLocations(
request,
[callback](const Status &status, const rpc::GetAllObjectLocationsReply &reply) {
std::vector<rpc::ObjectLocationInfo> result;
result.reserve((reply.object_location_info_list_size()));
for (int index = 0; index < reply.object_location_info_list_size(); ++index) {
result.emplace_back(reply.object_location_info_list(index));
}
callback(status, result);
RAY_LOG(DEBUG) << "Finished getting all object locations, status = " << status;
});
return Status::OK();
}
Status ObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_id,
const NodeID &node_id, size_t object_size,
const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id
<< ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_node_id(node_id.Binary());
request.set_size(object_size);
auto operation = [this, request, object_id, node_id,
callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().AddObjectLocation(
request, [object_id, node_id, callback, done_callback](
const Status &status, const rpc::AddObjectLocationReply &reply) {
if (callback) {
callback(status);
}
RAY_LOG(DEBUG) << "Finished adding object location, status = " << status
<< ", object id = " << object_id << ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
done_callback();
});
};
sequencer_.Post(object_id, operation);
return Status::OK();
}
Status ObjectInfoAccessor::AsyncAddSpilledUrl(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &spilled_node_id,
size_t object_size,
const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id
<< ", spilled_url = " << spilled_url
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_spilled_url(spilled_url);
request.set_spilled_node_id(spilled_node_id.Binary());
request.set_size(object_size);
auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().AddObjectLocation(
request, [callback, done_callback](const Status &status,
const rpc::AddObjectLocationReply &reply) {
if (callback) {
callback(status);
}
done_callback();
});
};
sequencer_.Post(object_id, operation);
return Status::OK();
}
Status ObjectInfoAccessor::AsyncRemoveLocation(const ObjectID &object_id,
const NodeID &node_id,
const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id
<< ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::RemoveObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_node_id(node_id.Binary());
auto operation = [this, request, object_id, node_id,
callback](const SequencerDoneCallback &done_callback) {
client_impl_->GetGcsRpcClient().RemoveObjectLocation(
request, [object_id, node_id, callback, done_callback](
const Status &status, const rpc::RemoveObjectLocationReply &reply) {
if (callback) {
callback(status);
}
RAY_LOG(DEBUG) << "Finished removing object location, status = " << status
<< ", object id = " << object_id << ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
done_callback();
});
};
sequencer_.Post(object_id, operation);
return Status::OK();
}
Status ObjectInfoAccessor::AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe object location, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
auto fetch_data_operation = [this, object_id,
subscribe](const StatusCallback &fetch_done) {
auto callback = [object_id, subscribe, fetch_done](
const Status &status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
if (status.ok()) {
std::vector<rpc::ObjectLocationChange> notification;
for (const auto &loc : result->locations()) {
rpc::ObjectLocationChange update;
update.set_is_add(true);
update.set_node_id(loc.manager());
update.set_size(result->size());
notification.push_back(update);
}
if (!result->spilled_url().empty()) {
rpc::ObjectLocationChange update;
update.set_spilled_url(result->spilled_url());
update.set_spilled_node_id(result->spilled_node_id());
update.set_size(result->size());
notification.push_back(update);
}
subscribe(object_id, notification);
}
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGetLocations(object_id, callback));
};
auto subscribe_operation = [this, object_id,
subscribe](const StatusCallback &subscribe_done) {
return client_impl_->GetGcsSubscriber().SubscribeObject(object_id, subscribe,
subscribe_done);
};
{
absl::MutexLock lock(&mutex_);
subscribe_object_operations_[object_id] = subscribe_operation;
fetch_object_data_operations_[object_id] = fetch_data_operation;
}
return subscribe_operation(
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
}
void ObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(DEBUG) << "Reestablishing subscription for object locations.";
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
absl::MutexLock lock(&mutex_);
if (is_pubsub_server_restarted) {
for (auto &item : subscribe_object_operations_) {
RAY_CHECK_OK(item.second([this, item](const Status &status) {
absl::MutexLock lock(&mutex_);
auto fetch_object_data_operation = fetch_object_data_operations_[item.first];
// `fetch_object_data_operation` is called in the callback function of subscribe.
// Before that, if the user calls `AsyncUnsubscribeToLocations` function, the
// corresponding fetch function will be deleted, so we need to check if it's null.
if (fetch_object_data_operation != nullptr) {
fetch_object_data_operation(nullptr);
}
}));
}
} else {
for (auto &item : fetch_object_data_operations_) {
item.second(nullptr);
}
}
}
Status ObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
auto status = client_impl_->GetGcsSubscriber().UnsubscribeObject(object_id);
absl::MutexLock lock(&mutex_);
subscribe_object_operations_.erase(object_id);
fetch_object_data_operations_.erase(object_id);
RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
return status;
}
bool ObjectInfoAccessor::IsObjectUnsubscribed(const ObjectID &object_id) {
return client_impl_->GetGcsSubscriber().IsObjectUnsubscribed(object_id);
}
StatsInfoAccessor::StatsInfoAccessor(GcsClient *client_impl)
: client_impl_(client_impl) {}

View file

@ -341,113 +341,6 @@ class TaskInfoAccessor {
GcsClient *client_impl_;
};
/// `ObjectInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// object information in the GCS.
class ObjectInfoAccessor {
public:
ObjectInfoAccessor() = default;
explicit ObjectInfoAccessor(GcsClient *client_impl);
virtual ~ObjectInfoAccessor() = default;
/// Get object's locations from GCS asynchronously.
///
/// \param object_id The ID of object to lookup in GCS.
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetLocations(
const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback);
/// Get all object's locations from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finished.
/// \return Status
virtual Status AsyncGetAll(const MultiItemCallback<rpc::ObjectLocationInfo> &callback);
/// Add location of object to GCS asynchronously.
///
/// \param object_id The ID of object which location will be added to GCS.
/// \param node_id The location that will be added to GCS.
/// \param callback Callback that will be called after object has been added to GCS.
/// \return Status
virtual Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id,
size_t object_size, const StatusCallback &callback);
/// Add spilled location of object to GCS asynchronously.
///
/// \param object_id The ID of object which location will be added to GCS.
/// \param spilled_url The URL where the object has been spilled.
/// \param spilled_node_id The NodeID where the object has been spilled.
/// \param callback Callback that will be called after object has been added to GCS.
/// \return Status
virtual Status AsyncAddSpilledUrl(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &spilled_node_id, size_t object_size,
const StatusCallback &callback);
/// Remove location of object from GCS asynchronously.
///
/// \param object_id The ID of object which location will be removed from GCS.
/// \param node_id The location that will be removed from GCS.
/// \param callback Callback that will be called after the delete finished.
/// \return Status
virtual Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback);
/// Subscribe to any update of an object's location.
///
/// \param object_id The ID of the object to be subscribed to.
/// \param subscribe Callback that will be called each time when the object's
/// location is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe,
const StatusCallback &done);
/// Cancel subscription to any update of an object's location.
///
/// \param object_id The ID of the object to be unsubscribed to.
/// \return Status
virtual Status AsyncUnsubscribeToLocations(const ObjectID &object_id);
/// Reestablish subscription.
/// This should be called when GCS server restarts from a failure.
/// PubSub server restart will cause GCS server restart. In this case, we need to
/// resubscribe from PubSub server, otherwise we only need to fetch data from GCS
/// server.
///
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual void AsyncResubscribe(bool is_pubsub_server_restarted);
/// Check if the specified object is unsubscribed.
///
/// \param object_id The ID of the object.
/// \return Whether the specified object is unsubscribed.
virtual bool IsObjectUnsubscribed(const ObjectID &object_id);
private:
// Mutex to protect the subscribe_object_operations_ field and
// fetch_object_data_operations_ field.
absl::Mutex mutex_;
/// Save the subscribe operations, so we can call them again when PubSub
/// server restarts from a failure.
std::unordered_map<ObjectID, SubscribeOperation> subscribe_object_operations_
GUARDED_BY(mutex_);
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
std::unordered_map<ObjectID, FetchDataOperation> fetch_object_data_operations_
GUARDED_BY(mutex_);
GcsClient *client_impl_;
Sequencer<ObjectID> sequencer_;
};
/// \class NodeInfoAccessor
/// `NodeInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing

View file

@ -131,7 +131,6 @@ Status GcsClient::Connect(instrumented_io_context &io_service) {
node_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
node_resource_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
task_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
object_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
worker_accessor_->AsyncResubscribe(is_pubsub_server_restarted);
};
@ -174,7 +173,6 @@ Status GcsClient::Connect(instrumented_io_context &io_service) {
node_accessor_ = std::make_unique<NodeInfoAccessor>(this);
node_resource_accessor_ = std::make_unique<NodeResourceInfoAccessor>(this);
task_accessor_ = std::make_unique<TaskInfoAccessor>(this);
object_accessor_ = std::make_unique<ObjectInfoAccessor>(this);
stats_accessor_ = std::make_unique<StatsInfoAccessor>(this);
error_accessor_ = std::make_unique<ErrorInfoAccessor>(this);
worker_accessor_ = std::make_unique<WorkerInfoAccessor>(this);

View file

@ -115,13 +115,6 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
return *job_accessor_;
}
/// Get the sub-interface for accessing object information in GCS.
/// This function is thread safe.
ObjectInfoAccessor &Objects() {
RAY_CHECK(object_accessor_ != nullptr);
return *object_accessor_;
}
/// Get the sub-interface for accessing node information in GCS.
/// This function is thread safe.
NodeInfoAccessor &Nodes() {
@ -187,7 +180,6 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
std::unique_ptr<ActorInfoAccessor> actor_accessor_;
std::unique_ptr<JobInfoAccessor> job_accessor_;
std::unique_ptr<ObjectInfoAccessor> object_accessor_;
std::unique_ptr<NodeInfoAccessor> node_accessor_;
std::unique_ptr<NodeResourceInfoAccessor> node_resource_accessor_;
std::unique_ptr<TaskInfoAccessor> task_accessor_;

View file

@ -121,40 +121,6 @@ std::vector<std::string> GlobalStateAccessor::GetAllProfileInfo() {
return profile_table_data;
}
std::vector<std::string> GlobalStateAccessor::GetAllObjectInfo() {
std::vector<std::string> object_table_data;
std::promise<bool> promise;
{
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Objects().AsyncGetAll(
TransformForMultiItemCallback<rpc::ObjectLocationInfo>(object_table_data,
promise)));
}
promise.get_future().get();
return object_table_data;
}
std::unique_ptr<std::string> GlobalStateAccessor::GetObjectInfo(
const ObjectID &object_id) {
std::unique_ptr<std::string> object_info;
std::promise<bool> promise;
auto on_done = [&object_info, &promise](
const Status &status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
RAY_CHECK_OK(status);
if (result) {
object_info = std::make_unique<std::string>(result->SerializeAsString());
}
promise.set_value(true);
};
{
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Objects().AsyncGetLocations(object_id, on_done));
}
promise.get_future().get();
return object_info;
}
std::string GlobalStateAccessor::GetNodeResourceInfo(const NodeID &node_id) {
rpc::ResourceMap node_resource_map;
std::promise<void> promise;

View file

@ -70,22 +70,6 @@ class GlobalStateAccessor {
/// deserialized with protobuf function.
std::vector<std::string> GetAllProfileInfo() LOCKS_EXCLUDED(mutex_);
/// Get information of all objects from GCS Service.
///
/// \return All object info. To support multi-language, we serialize each
/// ObjectTableData and return the serialized string. Where used, it needs to be
/// deserialized with protobuf function.
std::vector<std::string> GetAllObjectInfo() LOCKS_EXCLUDED(mutex_);
/// Get information of an object from GCS Service.
///
/// \param object_id The ID of object to look up in the GCS Service.
/// \return Object info. To support multi-language, we serialize each ObjectTableData
/// and return the serialized string. Where used, it needs to be deserialized with
/// protobuf function.
std::unique_ptr<std::string> GetObjectInfo(const ObjectID &object_id)
LOCKS_EXCLUDED(mutex_);
/// Get information of a node resource from GCS Service.
///
/// \param node_id The ID of node to look up in the GCS Service.

View file

@ -450,60 +450,6 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool SubscribeToLocations(
const ObjectID &object_id,
const gcs::SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncSubscribeToLocations(
object_id, subscribe,
[&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}
void UnsubscribeToLocations(const ObjectID &object_id) {
RAY_CHECK_OK(gcs_client_->Objects().AsyncUnsubscribeToLocations(object_id));
}
void WaitForObjectUnsubscribed(const ObjectID &object_id) {
auto condition = [this, object_id]() {
return gcs_client_->Objects().IsObjectUnsubscribed(object_id);
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
bool AddLocation(const ObjectID &object_id, const NodeID &node_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncAddLocation(
object_id, node_id, 0,
[&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}
bool RemoveLocation(const ObjectID &object_id, const NodeID &node_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncRemoveLocation(
object_id, node_id,
[&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}
std::vector<rpc::ObjectTableData> GetLocations(const ObjectID &object_id) {
std::promise<bool> promise;
std::vector<rpc::ObjectTableData> locations;
RAY_CHECK_OK(gcs_client_->Objects().AsyncGetLocations(
object_id,
[&locations, &promise](Status status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
for (const auto &loc : result->locations()) {
locations.push_back(loc);
}
promise.set_value(status.ok());
}));
EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_));
return locations;
}
bool AddProfileData(const std::shared_ptr<rpc::ProfileTableData> &profile_table_data) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(
@ -915,52 +861,6 @@ TEST_P(GcsClientTest, TestTaskInfo) {
ASSERT_TRUE(AttemptTaskReconstruction(task_reconstruction_data));
}
TEST_P(GcsClientTest, TestObjectInfo) {
ObjectID object_id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
// Subscribe to any update of an object's location.
std::atomic<int> object_add_count(0);
std::atomic<int> object_remove_count(0);
auto on_subscribe = [&object_add_count, &object_remove_count](
const ObjectID &object_id,
const std::vector<rpc::ObjectLocationChange> &result) {
for (const auto &res : result) {
if (res.is_add()) {
++object_add_count;
} else {
++object_remove_count;
}
}
};
ASSERT_TRUE(SubscribeToLocations(object_id, on_subscribe));
// Add location of object to GCS.
ASSERT_TRUE(AddLocation(object_id, node_id));
WaitForExpectedCount(object_add_count, 1);
// Get object's locations from GCS.
auto locations = GetLocations(object_id);
ASSERT_EQ(locations.size(), 1);
ASSERT_EQ(locations.back().manager(), node_id.Binary());
// Remove location of object from GCS.
ASSERT_TRUE(RemoveLocation(object_id, node_id));
WaitForExpectedCount(object_remove_count, 1);
ASSERT_TRUE(GetLocations(object_id).empty());
// Cancel subscription to any update of an object's location.
UnsubscribeToLocations(object_id);
WaitForObjectUnsubscribed(object_id);
// Add location of object to GCS again.
ASSERT_TRUE(AddLocation(object_id, node_id));
// Assert unsubscribe succeeded.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ASSERT_EQ(object_add_count, 1);
}
TEST_P(GcsClientTest, TestStats) {
// Add profile data to GCS.
NodeID node_id = NodeID::FromRandom();
@ -1114,54 +1014,6 @@ TEST_P(GcsClientTest, TestActorTableResubscribe) {
EXPECT_TRUE(WaitForCondition(condition_subscribe_all_restart, timeout_ms_.count()));
}
TEST_P(GcsClientTest, TestObjectTableResubscribe) {
ObjectID object1_id = ObjectID::FromRandom();
ObjectID object2_id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
// Subscribe to any update of an object's location.
std::atomic<int> object1_change_count(0);
std::atomic<int> object2_change_count(0);
ASSERT_TRUE(SubscribeToLocations(
object1_id,
[&object1_change_count](const ObjectID &object_id,
const std::vector<rpc::ObjectLocationChange> &result) {
if (!result.empty()) {
++object1_change_count;
}
}));
ASSERT_TRUE(SubscribeToLocations(
object2_id,
[&object2_change_count](const ObjectID &object_id,
const std::vector<rpc::ObjectLocationChange> &result) {
if (!result.empty()) {
++object2_change_count;
}
}));
ASSERT_TRUE(AddLocation(object1_id, node_id));
WaitForExpectedCount(object1_change_count, 1);
ASSERT_TRUE(AddLocation(object2_id, node_id));
WaitForExpectedCount(object2_change_count, 1);
// Cancel subscription to any update of an object's location.
UnsubscribeToLocations(object1_id);
WaitForObjectUnsubscribed(object1_id);
// Restart GCS.
RestartGcsServer();
// When GCS client detects that GCS server has restarted, but the pub-sub server
// didn't restart, it will fetch the subscription data again from the GCS server, so
// `object2_change_count` plus 1.
WaitForExpectedCount(object2_change_count, 2);
// Add location of object to GCS again and check if resubscribe works.
ASSERT_TRUE(AddLocation(object1_id, node_id));
WaitForExpectedCount(object1_change_count, 1);
ASSERT_TRUE(AddLocation(object2_id, node_id));
WaitForExpectedCount(object2_change_count, 3);
}
TEST_P(GcsClientTest, TestNodeTableResubscribe) {
// Test that subscription of the node table can still work when GCS server restarts.
// Subscribe to node addition and removal events from GCS and cache those information.
@ -1267,27 +1119,16 @@ TEST_P(GcsClientTest, TestWorkerTableResubscribe) {
}
TEST_P(GcsClientTest, TestGcsTableReload) {
ObjectID object_id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
// Register node to GCS.
auto node_info = Mocker::GenNodeInfo();
ASSERT_TRUE(RegisterNode(*node_info));
// Add location of object to GCS.
ASSERT_TRUE(AddLocation(object_id, node_id));
// Restart GCS.
RestartGcsServer();
// Get information of nodes from GCS.
std::vector<rpc::GcsNodeInfo> node_list = GetNodeInfoList();
EXPECT_EQ(node_list.size(), 1);
// Get object's locations from GCS.
auto locations = GetLocations(object_id);
ASSERT_EQ(locations.size(), 1);
ASSERT_EQ(locations.back().manager(), node_id.Binary());
}
TEST_P(GcsClientTest, TestGcsRedisFailureDetector) {
@ -1324,23 +1165,7 @@ TEST_P(GcsClientTest, TestMultiThreadSubAndUnsub) {
}
}));
}
for (auto &thread : threads) {
thread->join();
thread.reset();
}
// Multithreading subscribe/unsubscribe objects.
for (int index = 0; index < size; ++index) {
threads[index].reset(new std::thread([this, sub_and_unsub_loop_count] {
for (int index = 0; index < sub_and_unsub_loop_count; ++index) {
auto object_id = ObjectID::FromRandom();
ASSERT_TRUE(SubscribeToLocations(
object_id, [](const ObjectID &id,
const std::vector<rpc::ObjectLocationChange> &result) {}));
UnsubscribeToLocations(object_id);
}
}));
}
for (auto &thread : threads) {
thread->join();
thread.reset();

View file

@ -18,7 +18,7 @@ namespace ray {
namespace gcs {
void GcsInitData::AsyncLoad(const EmptyCallback &on_done) {
// There are 6 kinds of table data need to be loaded.
auto count_down = std::make_shared<int>(6);
auto count_down = std::make_shared<int>(5);
auto on_load_finished = [count_down, on_done] {
if (--(*count_down) == 0) {
if (on_done) {
@ -31,8 +31,6 @@ void GcsInitData::AsyncLoad(const EmptyCallback &on_done) {
AsyncLoadNodeTableData(on_load_finished);
AsyncLoadObjectTableData(on_load_finished);
AsyncLoadResourceTableData(on_load_finished);
AsyncLoadActorTableData(on_load_finished);
@ -64,19 +62,6 @@ void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) {
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(load_node_table_data_callback));
}
void GcsInitData::AsyncLoadObjectTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading object table data.";
auto load_object_table_data_callback =
[this,
on_done](const std::unordered_map<ObjectID, rpc::ObjectLocationInfo> &result) {
object_table_data_ = result;
RAY_LOG(INFO) << "Finished loading object table data, size = "
<< object_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(load_object_table_data_callback));
}
void GcsInitData::AsyncLoadResourceTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading cluster resources table data.";
auto load_resource_table_data_callback =

View file

@ -49,11 +49,6 @@ class GcsInitData {
return node_table_data_;
}
/// Get object location metadata.
const std::unordered_map<ObjectID, rpc::ObjectLocationInfo> &Objects() const {
return object_table_data_;
}
/// Get resource metadata.
const std::unordered_map<NodeID, rpc::ResourceMap> &ClusterResources() const {
return resource_table_data_;
@ -81,11 +76,6 @@ class GcsInitData {
/// \param on_done The callback when node metadata is loaded successfully.
void AsyncLoadNodeTableData(const EmptyCallback &on_done);
/// Load object locations metadata from the store into memory asynchronously.
///
/// \param on_done The callback when object location metadata is loaded successfully.
void AsyncLoadObjectTableData(const EmptyCallback &on_done);
/// Load resource metadata from the store into memory asynchronously.
///
/// \param on_done The callback when resource metadata is loaded successfully.
@ -111,9 +101,6 @@ class GcsInitData {
/// Node metadata.
std::unordered_map<NodeID, rpc::GcsNodeInfo> node_table_data_;
/// Object location metadata.
std::unordered_map<ObjectID, rpc::ObjectLocationInfo> object_table_data_;
/// Resource metadata.
std::unordered_map<NodeID, rpc::ResourceMap> resource_table_data_;

View file

@ -15,7 +15,7 @@
#pragma once
#include "ray/common/runtime_env_manager.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"

View file

@ -1,333 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/pb_util.h"
namespace ray {
namespace gcs {
void GcsObjectManager::HandleGetObjectLocations(
const rpc::GetObjectLocationsRequest &request, rpc::GetObjectLocationsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->mutable_location_info()->set_object_id(request.object_id());
ObjectID object_id = ObjectID::FromBinary(request.object_id());
RAY_LOG(DEBUG) << "Getting object locations, job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id;
absl::MutexLock lock(&mutex_);
auto object_data = GenObjectLocationInfo(object_id);
reply->mutable_location_info()->Swap(&object_data);
RAY_LOG(DEBUG) << "Finished getting object locations, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::GET_OBJECT_LOCATIONS_REQUEST];
}
void GcsObjectManager::HandleGetAllObjectLocations(
const rpc::GetAllObjectLocationsRequest &request,
rpc::GetAllObjectLocationsReply *reply, rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(DEBUG) << "Getting all object locations.";
absl::MutexLock lock(&mutex_);
for (auto &item : object_to_locations_) {
rpc::ObjectLocationInfo object_location_info;
object_location_info.set_object_id(item.first.Binary());
for (auto &node_id : item.second.locations) {
rpc::ObjectTableData object_table_data;
object_table_data.set_manager(node_id.Binary());
object_location_info.add_locations()->CopyFrom(object_table_data);
}
object_location_info.set_size(item.second.object_size);
reply->add_object_location_info_list()->CopyFrom(object_location_info);
}
RAY_LOG(DEBUG) << "Finished getting all object locations.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::GET_ALL_OBJECT_LOCATIONS_REQUEST];
}
void GcsObjectManager::HandleAddObjectLocation(
const rpc::AddObjectLocationRequest &request, rpc::AddObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback) {
ObjectID object_id = ObjectID::FromBinary(request.object_id());
NodeID node_id;
std::string spilled_url;
NodeID spilled_node_id;
if (!request.node_id().empty()) {
node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id << ", node id = " << node_id;
AddObjectLocationInCache(object_id, node_id);
} else {
absl::MutexLock lock(&mutex_);
RAY_CHECK(!request.spilled_url().empty());
spilled_url = request.spilled_url();
spilled_node_id = NodeID::FromBinary(request.spilled_node_id());
object_to_locations_[object_id].spilled_url = spilled_url;
object_to_locations_[object_id].spilled_node_id = spilled_node_id;
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id;
}
size_t size = request.size();
auto on_done = [this, object_id, node_id, spilled_url, size, spilled_node_id, reply,
send_reply_callback](const Status &status) {
if (status.ok()) {
rpc::ObjectLocationChange notification;
notification.set_is_add(true);
if (!node_id.IsNil()) {
notification.set_node_id(node_id.Binary());
}
if (!spilled_url.empty()) {
notification.set_spilled_url(spilled_url);
notification.set_spilled_node_id(spilled_node_id.Binary());
}
notification.set_size(size);
RAY_CHECK_OK(gcs_publisher_->PublishObject(object_id, notification, nullptr));
RAY_LOG(DEBUG) << "Finished adding object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id << ", task id = " << object_id.TaskId()
<< ", spilled_url = " << spilled_url
<< ", spilled_node_id = " << spilled_node_id;
} else {
RAY_LOG(ERROR) << "Failed to add object location: " << status.ToString()
<< ", job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id << ", node id = " << node_id;
}
// We should only reply after the update is written to storage.
// So, if GCS server crashes before writing storage, GCS client will retry this
// request.
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
absl::MutexLock lock(&mutex_);
object_to_locations_[object_id].object_size = size;
const auto object_data = GenObjectLocationInfo(object_id);
Status status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done);
if (!status.ok()) {
on_done(status);
}
++counts_[CountType::ADD_OBJECT_LOCATION_REQUEST];
}
void GcsObjectManager::HandleRemoveObjectLocation(
const rpc::RemoveObjectLocationRequest &request,
rpc::RemoveObjectLocationReply *reply, rpc::SendReplyCallback send_reply_callback) {
ObjectID object_id = ObjectID::FromBinary(request.object_id());
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Removing object location, job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id << ", node id = " << node_id;
RemoveObjectLocationInCache(object_id, node_id);
auto on_done = [this, object_id, node_id, reply,
send_reply_callback](const Status &status) {
if (status.ok()) {
RAY_CHECK_OK(gcs_publisher_->PublishObject(
object_id, *gcs::CreateObjectLocationChange(node_id, false), nullptr));
RAY_LOG(DEBUG) << "Finished removing object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id;
} else {
RAY_LOG(ERROR) << "Failed to remove object location: " << status.ToString()
<< ", job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id << ", node id = " << node_id;
}
// We should only reply after the update is written to storage.
// So, if GCS server crashes before writing storage, GCS client will retry this
// request.
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
absl::MutexLock lock(&mutex_);
auto object_location_set =
GetObjectLocationSet(object_id, /* create_if_not_exist */ false);
Status status;
if (object_location_set != nullptr) {
const auto object_data = GenObjectLocationInfo(object_id);
status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done);
} else {
status = gcs_table_storage_->ObjectTable().Delete(object_id, on_done);
}
if (!status.ok()) {
on_done(status);
}
++counts_[CountType::REMOVE_OBJECT_LOCATION_REQUEST];
}
void GcsObjectManager::AddObjectsLocation(
const NodeID &node_id, const absl::flat_hash_set<ObjectID> &object_ids) {
// TODO(micafan) Optimize the lock when necessary.
// Maybe use read/write lock. Or reduce the granularity of the lock.
absl::MutexLock lock(&mutex_);
auto *objects_on_node = GetObjectSetByNode(node_id, /* create_if_not_exist */ true);
objects_on_node->insert(object_ids.begin(), object_ids.end());
for (const auto &object_id : object_ids) {
auto *object_locations =
GetObjectLocationSet(object_id, /* create_if_not_exist */ true);
object_locations->locations.emplace(node_id);
}
}
void GcsObjectManager::AddObjectLocationInCache(const ObjectID &object_id,
const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
auto *objects_on_node = GetObjectSetByNode(node_id, /* create_if_not_exist */ true);
objects_on_node->emplace(object_id);
auto *object_locations =
GetObjectLocationSet(object_id, /* create_if_not_exist */ true);
object_locations->locations.emplace(node_id);
}
absl::flat_hash_set<NodeID> GcsObjectManager::GetObjectLocations(
const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
auto *object_locations = GetObjectLocationSet(object_id);
if (object_locations) {
return object_locations->locations;
}
return absl::flat_hash_set<NodeID>{};
}
void GcsObjectManager::OnNodeRemoved(const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
ObjectSet objects_on_node;
auto it = node_to_objects_.find(node_id);
if (it != node_to_objects_.end()) {
objects_on_node.swap(it->second);
node_to_objects_.erase(it);
}
if (objects_on_node.empty()) {
return;
}
for (const auto &object_id : objects_on_node) {
auto *object_locations = GetObjectLocationSet(object_id);
if (object_locations) {
object_locations->locations.erase(node_id);
if (object_locations->locations.empty() && object_locations->spilled_url.empty()) {
object_to_locations_.erase(object_id);
}
}
}
}
void GcsObjectManager::RemoveObjectLocationInCache(const ObjectID &object_id,
const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
auto *object_locations = GetObjectLocationSet(object_id);
if (object_locations) {
object_locations->locations.erase(node_id);
if (object_locations->locations.empty() && object_locations->spilled_url.empty()) {
object_to_locations_.erase(object_id);
}
}
auto *objects_on_node = GetObjectSetByNode(node_id);
if (objects_on_node) {
objects_on_node->erase(object_id);
if (objects_on_node->empty()) {
node_to_objects_.erase(node_id);
}
}
}
GcsObjectManager::LocationSet *GcsObjectManager::GetObjectLocationSet(
const ObjectID &object_id, bool create_if_not_exist) {
LocationSet *object_locations = nullptr;
auto it = object_to_locations_.find(object_id);
if (it != object_to_locations_.end()) {
object_locations = &it->second;
} else if (create_if_not_exist) {
auto ret = object_to_locations_.emplace(std::make_pair(object_id, LocationSet{}));
RAY_CHECK(ret.second);
object_locations = &(ret.first->second);
}
return object_locations;
}
GcsObjectManager::ObjectSet *GcsObjectManager::GetObjectSetByNode(
const NodeID &node_id, bool create_if_not_exist) {
ObjectSet *objects_on_node = nullptr;
auto it = node_to_objects_.find(node_id);
if (it != node_to_objects_.end()) {
objects_on_node = &it->second;
} else if (create_if_not_exist) {
auto ret = node_to_objects_.emplace(std::make_pair(node_id, ObjectSet{}));
RAY_CHECK(ret.second);
objects_on_node = &(ret.first->second);
}
return objects_on_node;
}
const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo(
const ObjectID &object_id) const {
ObjectLocationInfo object_data;
object_data.set_object_id(object_id.Binary());
auto it = object_to_locations_.find(object_id);
if (it != object_to_locations_.end()) {
for (const auto &node_id : it->second.locations) {
object_data.add_locations()->set_manager(node_id.Binary());
}
object_data.set_spilled_url(it->second.spilled_url);
object_data.set_spilled_node_id(it->second.spilled_node_id.Binary());
object_data.set_size(it->second.object_size);
}
return object_data;
}
void GcsObjectManager::Initialize(const GcsInitData &gcs_init_data) {
absl::flat_hash_map<NodeID, ObjectSet> node_to_objects;
for (const auto &item : gcs_init_data.Objects()) {
for (const auto &loc : item.second.locations()) {
node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first);
}
}
for (auto &item : node_to_objects) {
AddObjectsLocation(item.first, item.second);
}
}
std::string GcsObjectManager::DebugString() const {
absl::MutexLock lock(&mutex_);
std::ostringstream stream;
stream << "GcsObjectManager: {GetObjectLocations request count: "
<< counts_[CountType::GET_OBJECT_LOCATIONS_REQUEST]
<< ", GetAllObjectLocations request count: "
<< counts_[CountType::GET_ALL_OBJECT_LOCATIONS_REQUEST]
<< ", AddObjectLocation request count: "
<< counts_[CountType::ADD_OBJECT_LOCATION_REQUEST]
<< ", RemoveObjectLocation request count: "
<< counts_[CountType::REMOVE_OBJECT_LOCATION_REQUEST]
<< ", Object count: " << object_to_locations_.size() << "}";
return stream.str();
}
} // namespace gcs
} // namespace ray

View file

@ -1,159 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
namespace ray {
namespace gcs {
class GcsObjectManager : public rpc::ObjectInfoHandler {
public:
explicit GcsObjectManager(std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> &gcs_publisher,
gcs::GcsNodeManager &gcs_node_manager)
: gcs_table_storage_(std::move(gcs_table_storage)), gcs_publisher_(gcs_publisher) {
gcs_node_manager.AddNodeRemovedListener(
[this](const std::shared_ptr<rpc::GcsNodeInfo> &node) {
// All of the related actors should be reconstructed when a node is removed from
// the GCS.
OnNodeRemoved(NodeID::FromBinary(node->node_id()));
});
}
void HandleGetObjectLocations(const rpc::GetObjectLocationsRequest &request,
rpc::GetObjectLocationsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetAllObjectLocations(const rpc::GetAllObjectLocationsRequest &request,
rpc::GetAllObjectLocationsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleAddObjectLocation(const rpc::AddObjectLocationRequest &request,
rpc::AddObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleRemoveObjectLocation(const rpc::RemoveObjectLocationRequest &request,
rpc::RemoveObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param gcs_init_data.
void Initialize(const GcsInitData &gcs_init_data);
std::string DebugString() const;
protected:
struct LocationSet {
absl::flat_hash_set<NodeID> locations;
std::string spilled_url = "";
NodeID spilled_node_id = NodeID::Nil();
size_t object_size = 0;
};
/// Add a location of objects.
/// If the GCS server restarts, this function is used to reload data from storage.
///
/// \param node_id The object location that will be added.
/// \param object_ids The ids of objects which location will be added.
void AddObjectsLocation(const NodeID &node_id,
const absl::flat_hash_set<ObjectID> &object_ids)
LOCKS_EXCLUDED(mutex_);
/// Add a new location for the given object in local cache.
///
/// \param object_id The id of object.
/// \param node_id The node id of the new location.
void AddObjectLocationInCache(const ObjectID &object_id, const NodeID &node_id)
LOCKS_EXCLUDED(mutex_);
/// Get all locations of the given object.
///
/// \param object_id The id of object to lookup.
/// \return Object locations.
absl::flat_hash_set<NodeID> GetObjectLocations(const ObjectID &object_id)
LOCKS_EXCLUDED(mutex_);
/// Handler if a node is removed.
///
/// \param node_id The node that will be removed.
void OnNodeRemoved(const NodeID &node_id) LOCKS_EXCLUDED(mutex_);
/// Remove object's location.
///
/// \param object_id The id of the object which location will be removed.
/// \param node_id The location that will be removed.
void RemoveObjectLocationInCache(const ObjectID &object_id, const NodeID &node_id)
LOCKS_EXCLUDED(mutex_);
private:
typedef absl::flat_hash_set<ObjectID> ObjectSet;
const ObjectLocationInfo GenObjectLocationInfo(const ObjectID &object_id) const
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Get object locations by object id from map.
/// Will create it if not exist and the flag create_if_not_exist is set to true.
///
/// \param object_id The id of object to lookup.
/// \param create_if_not_exist Whether to create a new one if not exist.
/// \return LocationSet *
GcsObjectManager::LocationSet *GetObjectLocationSet(const ObjectID &object_id,
bool create_if_not_exist = false)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Get objects by node id from map.
/// Will create it if not exist and the flag create_if_not_exist is set to true.
///
/// \param node_id The id of node to lookup.
/// \param create_if_not_exist Whether to create a new one if not exist.
/// \return ObjectSet *
GcsObjectManager::ObjectSet *GetObjectSetByNode(const NodeID &node_id,
bool create_if_not_exist = false)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
mutable absl::Mutex mutex_;
/// Mapping from object id to object locations.
/// This is the local cache of objects' locations in the storage.
absl::flat_hash_map<ObjectID, LocationSet> object_to_locations_ GUARDED_BY(mutex_);
/// Mapping from node id to objects that held by the node.
/// This is the local cache of nodes' objects in the storage.
absl::flat_hash_map<NodeID, ObjectSet> node_to_objects_ GUARDED_BY(mutex_);
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
// Debug info.
enum CountType {
GET_OBJECT_LOCATIONS_REQUEST = 0,
GET_ALL_OBJECT_LOCATIONS_REQUEST = 1,
ADD_OBJECT_LOCATION_REQUEST = 2,
REMOVE_OBJECT_LOCATION_REQUEST = 3,
CountType_MAX = 4,
};
uint64_t counts_[CountType::CountType_MAX] = {0};
};
} // namespace gcs
} // namespace ray

View file

@ -21,7 +21,6 @@
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_job_manager.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/gcs/gcs_server/stats_handler_impl.h"
@ -120,9 +119,6 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs actor manager.
InitGcsActorManager(gcs_init_data);
// Init object manager.
InitObjectManager(gcs_init_data);
// Init gcs worker manager.
InitGcsWorkerManager();
@ -329,18 +325,6 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
rpc_server_.RegisterService(*placement_group_info_service_);
}
void GcsServer::InitObjectManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_);
gcs_object_manager_.reset(
new GcsObjectManager(gcs_table_storage_, gcs_publisher_, *gcs_node_manager_));
// Initialize by gcs tables data.
gcs_object_manager_->Initialize(gcs_init_data);
// Register service.
object_info_service_.reset(
new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_));
rpc_server_.RegisterService(*object_info_service_);
}
void GcsServer::StoreGcsServerAddressInRedis() {
std::string ip = config_.node_ip_address;
if (ip.empty()) {
@ -525,7 +509,6 @@ void GcsServer::PrintDebugInfo() {
std::ostringstream stream;
stream << gcs_node_manager_->DebugString() << "\n"
<< gcs_actor_manager_->DebugString() << "\n"
<< gcs_object_manager_->DebugString() << "\n"
<< gcs_placement_group_manager_->DebugString() << "\n"
<< gcs_publisher_->DebugString() << "\n"
<< ((rpc::DefaultTaskInfoHandler *)task_info_handler_.get())->DebugString();

View file

@ -19,7 +19,6 @@
#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_kv_manager.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"
@ -107,9 +106,6 @@ class GcsServer {
/// Initialize gcs placement group manager.
void InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data);
/// Initialize gcs object manager.
void InitObjectManager(const GcsInitData &gcs_init_data);
/// Initialize gcs worker manager.
void InitGcsWorkerManager();
@ -192,9 +188,6 @@ class GcsServer {
std::unique_ptr<rpc::NodeResourceInfoGrpcService> node_resource_info_service_;
/// Heartbeat info handler and service.
std::unique_ptr<rpc::HeartbeatInfoGrpcService> heartbeat_info_service_;
/// Object info handler and service.
std::unique_ptr<gcs::GcsObjectManager> gcs_object_manager_;
std::unique_ptr<rpc::ObjectInfoGrpcService> object_info_service_;
/// Task info handler and service.
std::unique_ptr<rpc::TaskInfoHandler> task_info_handler_;
std::unique_ptr<rpc::TaskInfoGrpcService> task_info_service_;

View file

@ -295,7 +295,6 @@ class GcsTableStorage {
task_lease_table_ = std::make_unique<GcsTaskLeaseTable>(store_client_);
task_reconstruction_table_ =
std::make_unique<GcsTaskReconstructionTable>(store_client_);
object_table_ = std::make_unique<GcsObjectTable>(store_client_);
node_table_ = std::make_unique<GcsNodeTable>(store_client_);
node_resource_table_ = std::make_unique<GcsNodeResourceTable>(store_client_);
placement_group_schedule_table_ =
@ -337,11 +336,6 @@ class GcsTableStorage {
return *task_reconstruction_table_;
}
GcsObjectTable &ObjectTable() {
RAY_CHECK(object_table_ != nullptr);
return *object_table_;
}
GcsNodeTable &NodeTable() {
RAY_CHECK(node_table_ != nullptr);
return *node_table_;
@ -390,7 +384,6 @@ class GcsTableStorage {
std::unique_ptr<GcsTaskTable> task_table_;
std::unique_ptr<GcsTaskLeaseTable> task_lease_table_;
std::unique_ptr<GcsTaskReconstructionTable> task_reconstruction_table_;
std::unique_ptr<GcsObjectTable> object_table_;
std::unique_ptr<GcsNodeTable> node_table_;
std::unique_ptr<GcsNodeResourceTable> node_resource_table_;
std::unique_ptr<GcsPlacementGroupScheduleTable> placement_group_schedule_table_;

View file

@ -1,158 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "gtest/gtest.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/gcs/gcs_server/test/gcs_server_test_util.h"
#include "ray/gcs/test/gcs_test_util.h"
namespace ray {
class MockedGcsObjectManager : public gcs::GcsObjectManager {
public:
explicit MockedGcsObjectManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPublisher> &gcs_publisher,
gcs::GcsNodeManager &gcs_node_manager)
: gcs::GcsObjectManager(gcs_table_storage, gcs_publisher, gcs_node_manager) {}
public:
void AddObjectsLocation(const NodeID &node_id,
const absl::flat_hash_set<ObjectID> &object_ids) {
gcs::GcsObjectManager::AddObjectsLocation(node_id, object_ids);
}
void AddObjectLocationInCache(const ObjectID &object_id, const NodeID &node_id) {
gcs::GcsObjectManager::AddObjectLocationInCache(object_id, node_id);
}
absl::flat_hash_set<NodeID> GetObjectLocations(const ObjectID &object_id) {
return gcs::GcsObjectManager::GetObjectLocations(object_id);
}
void OnNodeRemoved(const NodeID &node_id) {
gcs::GcsObjectManager::OnNodeRemoved(node_id);
}
void RemoveObjectLocationInCache(const ObjectID &object_id, const NodeID &node_id) {
gcs::GcsObjectManager::RemoveObjectLocationInCache(object_id, node_id);
}
};
class GcsObjectManagerTest : public ::testing::Test {
public:
void SetUp() override {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletClient>();
raylet_client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &addr) { return raylet_client_; });
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
gcs_publisher_, gcs_table_storage_, raylet_client_pool_);
gcs_object_manager_ = std::make_shared<MockedGcsObjectManager>(
gcs_table_storage_, gcs_publisher_, *gcs_node_manager_);
GenTestData();
}
void GenTestData() {
for (size_t i = 0; i < object_count_; ++i) {
ObjectID object_id = ObjectID::FromRandom();
object_ids_.emplace(object_id);
}
for (size_t i = 0; i < node_count_; ++i) {
NodeID node_id = NodeID::FromRandom();
node_ids_.emplace(node_id);
}
}
void CheckLocations(const absl::flat_hash_set<NodeID> &locations) {
ASSERT_EQ(locations.size(), node_ids_.size());
for (const auto &location : locations) {
auto it = node_ids_.find(location);
ASSERT_TRUE(it != node_ids_.end());
ASSERT_TRUE(location == *it);
}
}
protected:
instrumented_io_context io_service_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<MockedGcsObjectManager> gcs_object_manager_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
size_t object_count_{5};
size_t node_count_{10};
absl::flat_hash_set<ObjectID> object_ids_;
absl::flat_hash_set<NodeID> node_ids_;
};
TEST_F(GcsObjectManagerTest, AddObjectsLocationAndGetLocationTest) {
for (const auto &node_id : node_ids_) {
gcs_object_manager_->AddObjectsLocation(node_id, object_ids_);
}
for (const auto &object_id : object_ids_) {
auto locations = gcs_object_manager_->GetObjectLocations(object_id);
CheckLocations(locations);
}
}
TEST_F(GcsObjectManagerTest, AddObjectLocationInCacheTest) {
for (const auto &object_id : object_ids_) {
for (const auto &node_id : node_ids_) {
gcs_object_manager_->AddObjectLocationInCache(object_id, node_id);
}
}
for (const auto &object_id : object_ids_) {
auto locations = gcs_object_manager_->GetObjectLocations(object_id);
CheckLocations(locations);
}
}
TEST_F(GcsObjectManagerTest, RemoveNodeTest) {
for (const auto &node_id : node_ids_) {
gcs_object_manager_->AddObjectsLocation(node_id, object_ids_);
}
gcs_object_manager_->OnNodeRemoved(*node_ids_.begin());
auto locations = gcs_object_manager_->GetObjectLocations(*object_ids_.begin());
ASSERT_EQ(locations.size() + 1, node_ids_.size());
locations.emplace(*node_ids_.begin());
ASSERT_EQ(locations.size(), node_ids_.size());
}
TEST_F(GcsObjectManagerTest, RemoveObjectLocationTest) {
for (const auto &node_id : node_ids_) {
gcs_object_manager_->AddObjectsLocation(node_id, object_ids_);
}
gcs_object_manager_->RemoveObjectLocationInCache(*object_ids_.begin(),
*node_ids_.begin());
auto locations = gcs_object_manager_->GetObjectLocations(*object_ids_.begin());
ASSERT_EQ(locations.size() + 1, node_ids_.size());
locations.emplace(*node_ids_.begin());
ASSERT_EQ(locations.size(), node_ids_.size());
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View file

@ -189,49 +189,6 @@ class GcsServerTest : public ::testing::Test {
return resources;
}
bool AddObjectLocation(const rpc::AddObjectLocationRequest &request) {
std::promise<bool> promise;
client_->AddObjectLocation(
request,
[&promise](const Status &status, const rpc::AddObjectLocationReply &reply) {
RAY_CHECK_OK(status);
promise.set_value(true);
});
return WaitReady(promise.get_future(), timeout_ms_);
}
bool RemoveObjectLocation(const rpc::RemoveObjectLocationRequest &request) {
std::promise<bool> promise;
client_->RemoveObjectLocation(
request,
[&promise](const Status &status, const rpc::RemoveObjectLocationReply &reply) {
RAY_CHECK_OK(status);
promise.set_value(true);
});
return WaitReady(promise.get_future(), timeout_ms_);
}
std::vector<rpc::ObjectTableData> GetObjectLocations(const std::string &object_id) {
std::vector<rpc::ObjectTableData> object_locations;
rpc::GetObjectLocationsRequest request;
request.set_object_id(object_id);
std::promise<bool> promise;
client_->GetObjectLocations(
request, [&object_locations, &promise](
const Status &status, const rpc::GetObjectLocationsReply &reply) {
RAY_CHECK_OK(status);
for (const auto &loc : reply.location_info().locations()) {
object_locations.push_back(loc);
}
promise.set_value(true);
});
EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_));
return object_locations;
}
bool AddTask(const rpc::AddTaskRequest &request) {
std::promise<bool> promise;
client_->AddTask(request,
@ -503,37 +460,6 @@ TEST_F(GcsServerTest, TestHeartbeatWithNoRegistering) {
rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_DEAD);
}
TEST_F(GcsServerTest, TestObjectInfo) {
// Create object table data
ObjectID object_id = ObjectID::FromRandom();
NodeID node1_id = NodeID::FromRandom();
NodeID node2_id = NodeID::FromRandom();
// Add object location
rpc::AddObjectLocationRequest add_object_location_request;
add_object_location_request.set_object_id(object_id.Binary());
add_object_location_request.set_node_id(node1_id.Binary());
ASSERT_TRUE(AddObjectLocation(add_object_location_request));
std::vector<rpc::ObjectTableData> object_locations =
GetObjectLocations(object_id.Binary());
ASSERT_TRUE(object_locations.size() == 1);
ASSERT_TRUE(object_locations[0].manager() == node1_id.Binary());
add_object_location_request.set_node_id(node2_id.Binary());
ASSERT_TRUE(AddObjectLocation(add_object_location_request));
object_locations = GetObjectLocations(object_id.Binary());
ASSERT_TRUE(object_locations.size() == 2);
// Remove object location
rpc::RemoveObjectLocationRequest remove_object_location_request;
remove_object_location_request.set_object_id(object_id.Binary());
remove_object_location_request.set_node_id(node1_id.Binary());
ASSERT_TRUE(RemoveObjectLocation(remove_object_location_request));
object_locations = GetObjectLocations(object_id.Binary());
ASSERT_TRUE(object_locations.size() == 1);
ASSERT_TRUE(object_locations[0].manager() == node2_id.Binary());
}
TEST_F(GcsServerTest, TestTaskInfo) {
// Create task_table_data
JobID job_id = JobID::FromInt(1);

View file

@ -111,19 +111,6 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
return worker_failure_info_ptr;
}
/// Helper function to produce object location change.
///
/// \param node_id The node ID that this object appeared on or was evicted by.
/// \param is_add Whether the object is appeared on the node.
/// \return The object location change created by this method.
inline std::shared_ptr<ray::rpc::ObjectLocationChange> CreateObjectLocationChange(
const NodeID &node_id, bool is_add) {
auto object_location_change = std::make_shared<ray::rpc::ObjectLocationChange>();
object_location_change->set_is_add(is_add);
object_location_change->set_node_id(node_id.Binary());
return object_location_change;
}
} // namespace gcs
} // namespace ray

View file

@ -205,12 +205,6 @@ std::string GcsPubSub::DebugString() const {
return stream.str();
}
Status GcsPublisher::PublishObject(const ObjectID &id,
const rpc::ObjectLocationChange &message,
const StatusCallback &done) {
return pubsub_->Publish(OBJECT_CHANNEL, id.Hex(), message.SerializeAsString(), done);
}
Status GcsPublisher::PublishActor(const ActorID &id, const rpc::ActorTableData &message,
const StatusCallback &done) {
return pubsub_->Publish(ACTOR_CHANNEL, id.Hex(), message.SerializeAsString(), done);
@ -355,26 +349,6 @@ bool GcsSubscriber::IsTaskLeaseUnsubscribed(const TaskID &id) {
return pubsub_->IsUnsubscribed(TASK_LEASE_CHANNEL, id.Hex());
}
Status GcsSubscriber::SubscribeObject(
const ObjectID &id,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>> &subscribe,
const StatusCallback &done) {
auto on_subscribe = [id, subscribe](const std::string &, const std::string &data) {
rpc::ObjectLocationChange object_location_change;
object_location_change.ParseFromString(data);
subscribe(id, {object_location_change});
};
return pubsub_->Subscribe(OBJECT_CHANNEL, id.Hex(), on_subscribe, done);
}
Status GcsSubscriber::UnsubscribeObject(const ObjectID &id) {
return pubsub_->Unsubscribe(OBJECT_CHANNEL, id.Hex());
}
bool GcsSubscriber::IsObjectUnsubscribed(const ObjectID &id) {
return pubsub_->IsUnsubscribed(OBJECT_CHANNEL, id.Hex());
}
Status GcsSubscriber::SubscribeAllWorkerFailures(
const ItemCallback<rpc::WorkerDeltaData> &subscribe, const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &, const std::string &data) {

View file

@ -38,7 +38,6 @@ inline constexpr std::string_view NODE_CHANNEL = "NODE";
inline constexpr std::string_view NODE_RESOURCE_CHANNEL = "NODE_RESOURCE";
inline constexpr std::string_view ACTOR_CHANNEL = "ACTOR";
inline constexpr std::string_view WORKER_CHANNEL = "WORKER";
inline constexpr std::string_view OBJECT_CHANNEL = "OBJECT";
inline constexpr std::string_view TASK_LEASE_CHANNEL = "TASK_LEASE";
inline constexpr std::string_view RESOURCES_BATCH_CHANNEL = "RESOURCES_BATCH";
inline constexpr std::string_view ERROR_INFO_CHANNEL = "ERROR_INFO";
@ -247,11 +246,6 @@ class GcsPublisher {
Status PublishResourceBatch(const rpc::ResourceUsageBatchData &message,
const StatusCallback &done);
/// TODO: Object publishing is deprecated. Remove this and callsites.
/// Uses Redis pubsub.
Status PublishObject(const ObjectID &id, const rpc::ObjectLocationChange &message,
const StatusCallback &done);
/// Prints debugging info for the publisher.
std::string DebugString() const;
@ -317,16 +311,6 @@ class GcsSubscriber {
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done);
/// TODO: Object subscribing is deprecated. Remove this and callsites.
/// Uses Redis pubsub.
Status SubscribeObject(
const ObjectID &id,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe,
const StatusCallback &done);
Status UnsubscribeObject(const ObjectID &id);
bool IsObjectUnsubscribed(const ObjectID &id);
/// Prints debugging info for the subscriber.
std::string DebugString() const;

View file

@ -247,74 +247,6 @@ service HeartbeatInfoGcsService {
rpc CheckAlive(CheckAliveRequest) returns (CheckAliveReply);
}
message GetObjectLocationsRequest {
// The ID of object to lookup in GCS Service.
bytes object_id = 1;
}
message GetObjectLocationsReply {
GcsStatus status = 1;
// Object location information.
ObjectLocationInfo location_info = 2;
}
message GetAllObjectLocationsRequest {
}
message GetAllObjectLocationsReply {
GcsStatus status = 1;
// Data of object location info.
repeated ObjectLocationInfo object_location_info_list = 2;
}
message AddObjectLocationRequest {
// The ID of object which location will be added to GCS Service.
bytes object_id = 1;
// The location that will be added to GCS Service.
bytes node_id = 2;
// The spilled URL that will be added to GCS Service. Either this or the node
// ID should be set.
string spilled_url = 3;
// The node id that spills the object to the disk.
// It will be Nil if it uses a distributed external storage.
bytes spilled_node_id = 4;
// The size of the object in bytes.
uint64 size = 5;
}
message AddObjectLocationReply {
GcsStatus status = 1;
}
message AddObjectSpilledUrlReply {
GcsStatus status = 1;
}
message RemoveObjectLocationRequest {
// The ID of object which location will be removed from GCS Service.
bytes object_id = 1;
// The location that will be removed from GCS Service.
bytes node_id = 2;
}
message RemoveObjectLocationReply {
GcsStatus status = 1;
}
// Service for object info access.
service ObjectInfoGcsService {
// Get object's locations from GCS Service.
rpc GetObjectLocations(GetObjectLocationsRequest) returns (GetObjectLocationsReply);
// Get all object's locations from GCS Service.
rpc GetAllObjectLocations(GetAllObjectLocationsRequest)
returns (GetAllObjectLocationsReply);
// Add location of object to GCS Service.
rpc AddObjectLocation(AddObjectLocationRequest) returns (AddObjectLocationReply);
// Remove location of object from GCS Service.
rpc RemoveObjectLocation(RemoveObjectLocationRequest)
returns (RemoveObjectLocationReply);
}
message AddTaskRequest {
TaskTableData task_data = 1;
}

View file

@ -39,10 +39,8 @@ class LocalObjectManager {
LocalObjectManager(
const NodeID &node_id, std::string self_node_address, int self_node_port,
size_t free_objects_batch_size, int64_t free_objects_period_ms,
IOWorkerPoolInterface &io_worker_pool,
gcs::ObjectInfoAccessor &object_info_accessor,
rpc::CoreWorkerClientPool &owner_client_pool, int max_io_workers,
int64_t min_spilling_size, bool is_external_storage_type_fs,
IOWorkerPoolInterface &io_worker_pool, rpc::CoreWorkerClientPool &owner_client_pool,
int max_io_workers, int64_t min_spilling_size, bool is_external_storage_type_fs,
int64_t max_fused_object_count,
std::function<void(const std::vector<ObjectID> &)> on_objects_freed,
std::function<bool(const ray::ObjectID &)> is_plasma_object_spillable,
@ -53,7 +51,6 @@ class LocalObjectManager {
free_objects_period_ms_(free_objects_period_ms),
free_objects_batch_size_(free_objects_batch_size),
io_worker_pool_(io_worker_pool),
object_info_accessor_(object_info_accessor),
owner_client_pool_(owner_client_pool),
on_objects_freed_(on_objects_freed),
last_free_objects_at_ms_(current_time_ms()),
@ -197,9 +194,6 @@ class LocalObjectManager {
/// A worker pool, used for spilling and restoring objects.
IOWorkerPoolInterface &io_worker_pool_;
/// A GCS client, used to update locations for spilled objects.
gcs::ObjectInfoAccessor &object_info_accessor_;
/// Cache of gRPC clients to owners of objects pinned on
/// this node.
rpc::CoreWorkerClientPool &owner_client_pool_;

View file

@ -269,7 +269,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
self_node_id_, config.node_manager_address, config.node_manager_port,
RayConfig::instance().free_objects_batch_size(),
RayConfig::instance().free_objects_period_milliseconds(), worker_pool_,
gcs_client_->Objects(), worker_rpc_pool_,
worker_rpc_pool_,
/*max_io_workers*/ config.max_io_workers,
/*min_spilling_size*/ config.min_spilling_size,
/*is_external_storage_type_fs*/

View file

@ -225,58 +225,6 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
std::make_shared<MockIOWorker>(WorkerID::FromRandom(), 1234, io_worker_client);
};
class MockObjectInfoAccessor : public gcs::ObjectInfoAccessor {
public:
MOCK_METHOD2(
AsyncGetLocations,
Status(const ObjectID &object_id,
const gcs::OptionalItemCallback<rpc::ObjectLocationInfo> &callback));
MOCK_METHOD1(AsyncGetAll,
Status(const gcs::MultiItemCallback<rpc::ObjectLocationInfo> &callback));
MOCK_METHOD4(AsyncAddLocation,
Status(const ObjectID &object_id, const NodeID &node_id,
size_t object_size, const gcs::StatusCallback &callback));
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const NodeID &spilled_node_id, size_t object_size,
const gcs::StatusCallback &callback) {
object_urls[object_id] = spilled_url;
callbacks.push_back(callback);
return Status();
}
bool ReplyAsyncAddSpilledUrl() {
if (callbacks.size() == 0) {
return false;
}
auto callback = callbacks.front();
callback(Status::OK());
callbacks.pop_front();
return true;
}
MOCK_METHOD3(AsyncRemoveLocation,
Status(const ObjectID &object_id, const NodeID &node_id,
const gcs::StatusCallback &callback));
MOCK_METHOD3(AsyncSubscribeToLocations,
Status(const ObjectID &object_id,
const gcs::SubscribeCallback<
ObjectID, std::vector<rpc::ObjectLocationChange>> &subscribe,
const gcs::StatusCallback &done));
MOCK_METHOD1(AsyncUnsubscribeToLocations, Status(const ObjectID &object_id));
MOCK_METHOD1(AsyncResubscribe, void(bool is_pubsub_server_restarted));
MOCK_METHOD1(IsObjectUnsubscribed, bool(const ObjectID &object_id));
absl::flat_hash_map<ObjectID, std::string> object_urls;
std::list<gcs::StatusCallback> callbacks;
};
class MockObjectBuffer : public Buffer {
public:
MockObjectBuffer(size_t size, ObjectID object_id,
@ -308,7 +256,7 @@ class LocalObjectManagerTest : public ::testing::Test {
max_fused_object_count_(15),
manager(
manager_node_id_, "address", 1234, free_objects_batch_size,
/*free_objects_period_ms=*/1000, worker_pool, object_table, client_pool,
/*free_objects_period_ms=*/1000, worker_pool, client_pool,
/*max_io_workers=*/2,
/*min_spilling_size=*/0,
/*is_external_storage_type_fs=*/true,
@ -341,7 +289,6 @@ class LocalObjectManagerTest : public ::testing::Test {
std::shared_ptr<MockWorkerClient> owner_client;
rpc::CoreWorkerClientPool client_pool;
MockIOWorkerPool worker_pool;
MockObjectInfoAccessor object_table;
NodeID manager_node_id_;
size_t max_fused_object_count_;
LocalObjectManager manager;

View file

@ -102,8 +102,6 @@ class GcsRpcClient {
client_call_manager);
heartbeat_info_grpc_client_ = std::make_unique<GrpcClient<HeartbeatInfoGcsService>>(
address, port, client_call_manager);
object_info_grpc_client_ = std::make_unique<GrpcClient<ObjectInfoGcsService>>(
address, port, client_call_manager);
task_info_grpc_client_ = std::make_unique<GrpcClient<TaskInfoGcsService>>(
address, port, client_call_manager);
stats_grpc_client_ =
@ -205,22 +203,6 @@ class GcsRpcClient {
VOID_GCS_RPC_CLIENT_METHOD(HeartbeatInfoGcsService, CheckAlive,
heartbeat_info_grpc_client_, )
/// Get object's locations from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations,
object_info_grpc_client_, )
/// Get all object's locations from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetAllObjectLocations,
object_info_grpc_client_, )
/// Add location of object to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, AddObjectLocation,
object_info_grpc_client_, )
/// Remove location of object to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, RemoveObjectLocation,
object_info_grpc_client_, )
/// Add a task to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTask, task_info_grpc_client_, )
@ -308,7 +290,6 @@ class GcsRpcClient {
std::unique_ptr<GrpcClient<NodeInfoGcsService>> node_info_grpc_client_;
std::unique_ptr<GrpcClient<NodeResourceInfoGcsService>> node_resource_info_grpc_client_;
std::unique_ptr<GrpcClient<HeartbeatInfoGcsService>> heartbeat_info_grpc_client_;
std::unique_ptr<GrpcClient<ObjectInfoGcsService>> object_info_grpc_client_;
std::unique_ptr<GrpcClient<TaskInfoGcsService>> task_info_grpc_client_;
std::unique_ptr<GrpcClient<StatsGcsService>> stats_grpc_client_;
std::unique_ptr<GrpcClient<WorkerInfoGcsService>> worker_info_grpc_client_;

View file

@ -351,56 +351,6 @@ class HeartbeatInfoGrpcService : public GrpcService {
HeartbeatInfoGcsServiceHandler &service_handler_;
};
class ObjectInfoGcsServiceHandler {
public:
virtual ~ObjectInfoGcsServiceHandler() = default;
virtual void HandleGetObjectLocations(const GetObjectLocationsRequest &request,
GetObjectLocationsReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetAllObjectLocations(const GetAllObjectLocationsRequest &request,
GetAllObjectLocationsReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleAddObjectLocation(const AddObjectLocationRequest &request,
AddObjectLocationReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleRemoveObjectLocation(const RemoveObjectLocationRequest &request,
RemoveObjectLocationReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `ObjectInfoGcsServiceHandler`.
class ObjectInfoGrpcService : public GrpcService {
public:
/// Constructor.
///
/// \param[in] handler The service handler that actually handle the requests.
explicit ObjectInfoGrpcService(instrumented_io_context &io_service,
ObjectInfoGcsServiceHandler &handler)
: GrpcService(io_service), service_handler_(handler){};
protected:
grpc::Service &GetGrpcService() override { return service_; }
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations);
OBJECT_INFO_SERVICE_RPC_HANDLER(GetAllObjectLocations);
OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation);
OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation);
}
private:
/// The grpc async service object.
ObjectInfoGcsService::AsyncService service_;
/// The service handler that actually handle the requests.
ObjectInfoGcsServiceHandler &service_handler_;
};
class TaskInfoGcsServiceHandler {
public:
virtual ~TaskInfoGcsServiceHandler() = default;
@ -690,7 +640,6 @@ using ActorInfoHandler = ActorInfoGcsServiceHandler;
using NodeInfoHandler = NodeInfoGcsServiceHandler;
using NodeResourceInfoHandler = NodeResourceInfoGcsServiceHandler;
using HeartbeatInfoHandler = HeartbeatInfoGcsServiceHandler;
using ObjectInfoHandler = ObjectInfoGcsServiceHandler;
using TaskInfoHandler = TaskInfoGcsServiceHandler;
using StatsHandler = StatsGcsServiceHandler;
using WorkerInfoHandler = WorkerInfoGcsServiceHandler;