[test] add unit test for PR #17634 (#18585)

This commit is contained in:
Yi Cheng 2021-09-22 14:39:30 -07:00 committed by GitHub
parent e6511bcf56
commit 73c3cff18b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 816 additions and 58 deletions

View file

@ -1513,6 +1513,21 @@ cc_test(
],
)
cc_test(
name = "gcs_actor_scheduler_mock_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_server_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "gcs_based_actor_scheduler_test",
size = "small",

View file

@ -17,6 +17,7 @@ namespace gcs {
class MockGcsNodeManager : public GcsNodeManager {
public:
MockGcsNodeManager() : GcsNodeManager(nullptr, nullptr) {}
MOCK_METHOD(void, HandleRegisterNode,
(const rpc::RegisterNodeRequest &request, rpc::RegisterNodeReply *reply,
rpc::SendReplyCallback send_reply_callback),

View file

@ -0,0 +1,27 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace gcs {
class MockGcsPubSub : public GcsPubSub {
public:
MOCK_METHOD(Status, Publish,
(const std::string &channel, const std::string &id, const std::string &data,
const StatusCallback &done),
(override));
};
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,66 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace gcs {
class MockInMemoryStoreClient : public InMemoryStoreClient {
public:
MOCK_METHOD(Status, AsyncPut,
(const std::string &table_name, const std::string &key,
const std::string &data, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncPutWithIndex,
(const std::string &table_name, const std::string &key,
const std::string &index_key, const std::string &data,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncGet,
(const std::string &table_name, const std::string &key,
const OptionalItemCallback<std::string> &callback),
(override));
MOCK_METHOD(Status, AsyncGetByIndex,
(const std::string &table_name, const std::string &index_key,
(const MapCallback<std::string, std::string> &callback)),
(override));
MOCK_METHOD(Status, AsyncGetAll,
(const std::string &table_name,
(const MapCallback<std::string, std::string> &callback)),
(override));
MOCK_METHOD(Status, AsyncDelete,
(const std::string &table_name, const std::string &key,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncDeleteWithIndex,
(const std::string &table_name, const std::string &key,
const std::string &index_key, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncBatchDelete,
(const std::string &table_name, const std::vector<std::string> &keys,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncBatchDeleteWithIndex,
(const std::string &table_name, const std::vector<std::string> &keys,
const std::vector<std::string> &index_keys,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncDeleteByIndex,
(const std::string &table_name, const std::string &index_key,
const StatusCallback &callback),
(override));
MOCK_METHOD(int, GetNextJobID, (), (override));
};
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,66 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace gcs {
class MockRedisStoreClient : public RedisStoreClient {
public:
MOCK_METHOD(Status, AsyncPut,
(const std::string &table_name, const std::string &key,
const std::string &data, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncPutWithIndex,
(const std::string &table_name, const std::string &key,
const std::string &index_key, const std::string &data,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncGet,
(const std::string &table_name, const std::string &key,
const OptionalItemCallback<std::string> &callback),
(override));
MOCK_METHOD(Status, AsyncGetByIndex,
(const std::string &table_name, const std::string &index_key,
(const MapCallback<std::string, std::string> &callback)),
(override));
MOCK_METHOD(Status, AsyncGetAll,
(const std::string &table_name,
(const MapCallback<std::string, std::string> &callback)),
(override));
MOCK_METHOD(Status, AsyncDelete,
(const std::string &table_name, const std::string &key,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncDeleteWithIndex,
(const std::string &table_name, const std::string &key,
const std::string &index_key, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncBatchDelete,
(const std::string &table_name, const std::vector<std::string> &keys,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncBatchDeleteWithIndex,
(const std::string &table_name, const std::vector<std::string> &keys,
const std::vector<std::string> &index_keys,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncDeleteByIndex,
(const std::string &table_name, const std::string &index_key,
const StatusCallback &callback),
(override));
MOCK_METHOD(int, GetNextJobID, (), (override));
};
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,66 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace gcs {
class MockStoreClient : public StoreClient {
public:
MOCK_METHOD(Status, AsyncPut,
(const std::string &table_name, const std::string &key,
const std::string &data, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncPutWithIndex,
(const std::string &table_name, const std::string &key,
const std::string &index_key, const std::string &data,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncGet,
(const std::string &table_name, const std::string &key,
const OptionalItemCallback<std::string> &callback),
(override));
MOCK_METHOD(Status, AsyncGetByIndex,
(const std::string &table_name, const std::string &index_key,
(const MapCallback<std::string, std::string> &callback)),
(override));
MOCK_METHOD(Status, AsyncGetAll,
(const std::string &table_name,
(const MapCallback<std::string, std::string> &callback)),
(override));
MOCK_METHOD(Status, AsyncDelete,
(const std::string &table_name, const std::string &key,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncDeleteWithIndex,
(const std::string &table_name, const std::string &key,
const std::string &index_key, const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncBatchDelete,
(const std::string &table_name, const std::vector<std::string> &keys,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncBatchDeleteWithIndex,
(const std::string &table_name, const std::vector<std::string> &keys,
const std::vector<std::string> &index_keys,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status, AsyncDeleteByIndex,
(const std::string &table_name, const std::string &index_key,
const StatusCallback &callback),
(override));
MOCK_METHOD(int, GetNextJobID, (), (override));
};
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,100 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace pubsub {
namespace pub_internal {
template <typename KeyIdType>
class MockSubscriptionIndex : public SubscriptionIndex<KeyIdType> {
public:
};
} // namespace pub_internal
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
namespace pub_internal {
class MockLongPollConnection : public LongPollConnection {
public:
};
} // namespace pub_internal
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
namespace pub_internal {
class MockSubscriber : public Subscriber {
public:
};
} // namespace pub_internal
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockPublisherInterface : public PublisherInterface {
public:
MOCK_METHOD(bool, RegisterSubscription,
(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id,
const std::string &key_id_binary),
(override));
MOCK_METHOD(void, Publish,
(const rpc::ChannelType channel_type, const rpc::PubMessage &pub_message,
const std::string &key_id_binary),
(override));
MOCK_METHOD(void, PublishFailure,
(const rpc::ChannelType channel_type, const std::string &key_id_binary),
(override));
MOCK_METHOD(bool, UnregisterSubscription,
(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id,
const std::string &key_id_binary),
(override));
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockPublisher : public Publisher {
public:
MOCK_METHOD(bool, RegisterSubscription,
(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id,
const std::string &key_id_binary),
(override));
MOCK_METHOD(void, Publish,
(const rpc::ChannelType channel_type, const rpc::PubMessage &pub_message,
const std::string &key_id_binary),
(override));
MOCK_METHOD(void, PublishFailure,
(const rpc::ChannelType channel_type, const std::string &key_id_binary),
(override));
MOCK_METHOD(bool, UnregisterSubscription,
(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id,
const std::string &key_id_binary),
(override));
};
} // namespace pubsub
} // namespace ray

View file

@ -0,0 +1,155 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace pubsub {
template <typename KeyIdType>
class MockSubscriptionInfo : public SubscriptionInfo<KeyIdType> {
public:
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockSubscribeChannelInterface : public SubscribeChannelInterface {
public:
MOCK_METHOD(void, Subscribe,
(const rpc::Address &publisher_address, const std::string &key_id_binary,
SubscriptionCallback subscription_callback,
SubscriptionFailureCallback subscription_failure_callback),
(override));
MOCK_METHOD(bool, Unsubscribe,
(const rpc::Address &publisher_address, const std::string &key_id_binary),
(override));
MOCK_METHOD(void, HandlePublishedMessage,
(const rpc::Address &publisher_address, const rpc::PubMessage &pub_message),
(const, override));
MOCK_METHOD(void, HandlePublisherFailure, (const rpc::Address &publisher_address),
(override));
MOCK_METHOD(void, HandlePublisherFailure,
(const rpc::Address &publisher_address, const std::string &key_id_binary),
(override));
MOCK_METHOD(bool, SubscriptionExists, (const PublisherID &publisher_id), (override));
MOCK_METHOD(const rpc::ChannelType, GetChannelType, (), (const, override));
MOCK_METHOD(bool, CheckNoLeaks, (), (const, override));
MOCK_METHOD(std::string, DebugString, (), (const, override));
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
template <typename KeyIdType>
class MockSubscriberChannel : public SubscriberChannel<KeyIdType> {
public:
MOCK_METHOD(void, Subscribe,
(const rpc::Address &publisher_address, const std::string &key_id,
SubscriptionCallback subscription_callback,
SubscriptionFailureCallback subscription_failure_callback),
(override));
MOCK_METHOD(bool, Unsubscribe,
(const rpc::Address &publisher_address, const std::string &key_id),
(override));
MOCK_METHOD(bool, CheckNoLeaks, (), (const, override));
MOCK_METHOD(void, HandlePublishedMessage,
(const rpc::Address &publisher_address, const rpc::PubMessage &pub_message),
(const, override));
MOCK_METHOD(void, HandlePublisherFailure, (const rpc::Address &publisher_address),
(override));
MOCK_METHOD(void, HandlePublisherFailure,
(const rpc::Address &publisher_address, const std::string &key_id_binary),
(override));
MOCK_METHOD(bool, SubscriptionExists, (const PublisherID &publisher_id), (override));
MOCK_METHOD(const rpc::ChannelType, GetChannelType, (), (const, override));
MOCK_METHOD(std::string, DebugString, (), (const, override));
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockWaitForObjectEvictionChannel : public WaitForObjectEvictionChannel {
public:
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockWaitForRefRemovedChannel : public WaitForRefRemovedChannel {
public:
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockObjectLocationsChannel : public ObjectLocationsChannel {
public:
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockSubscriberInterface : public SubscriberInterface {
public:
MOCK_METHOD(void, Subscribe,
(std::unique_ptr<rpc::SubMessage> sub_message,
const rpc::ChannelType channel_type, const rpc::Address &publisher_address,
const std::string &key_id_binary,
SubscriptionCallback subscription_callback,
SubscriptionFailureCallback subscription_failure_callback),
(override));
MOCK_METHOD(bool, Unsubscribe,
(const rpc::ChannelType channel_type, const rpc::Address &publisher_address,
const std::string &key_id_binary),
(override));
MOCK_METHOD(std::string, DebugString, (), (const, override));
};
} // namespace pubsub
} // namespace ray
namespace ray {
namespace pubsub {
class MockSubscriberClientInterface : public SubscriberClientInterface {
public:
MOCK_METHOD(void, PubsubLongPolling,
(const rpc::PubsubLongPollingRequest &request,
const rpc::ClientCallback<rpc::PubsubLongPollingReply> &callback),
(override));
MOCK_METHOD(void, PubsubCommandBatch,
(const rpc::PubsubCommandBatchRequest &request,
const rpc::ClientCallback<rpc::PubsubCommandBatchReply> &callback),
(override));
};
} // namespace pubsub
} // namespace ray

View file

@ -106,31 +106,6 @@ class MockResourceTrackingInterface : public ResourceTrackingInterface {
namespace ray {
class MockRayletClientInterface : public RayletClientInterface {
public:
MOCK_METHOD(void, GetSystemConfig,
(const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback),
(override));
MOCK_METHOD(void, GetGcsServerAddress,
(const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback),
(override));
};
} // namespace ray
namespace ray {
namespace raylet {
class MockRayletConnection : public RayletConnection {
public:
};
} // namespace raylet
} // namespace ray
namespace ray {
namespace raylet {
class MockRayletClient : public RayletClient {
public:
MOCK_METHOD(ray::Status, WaitForDirectActorCallArgs,
(const std::vector<rpc::ObjectReference> &references, int64_t tag),
@ -191,5 +166,4 @@ class MockRayletClient : public RayletClient {
(override));
};
} // namespace raylet
} // namespace ray

View file

@ -0,0 +1,123 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace rpc {
class MockWorkerAddress : public WorkerAddress {
public:
};
} // namespace rpc
} // namespace ray
namespace ray {
namespace rpc {
class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientInterface,
public CoreWorkerClientInterface {
public:
MOCK_METHOD(const rpc::Address &, Addr, (), (const, override));
MOCK_METHOD(void, PushActorTask,
(std::unique_ptr<PushTaskRequest> request, bool skip_queue,
const ClientCallback<PushTaskReply> &callback),
(override));
MOCK_METHOD(void, PushNormalTask,
(std::unique_ptr<PushTaskRequest> request,
const ClientCallback<PushTaskReply> &callback),
(override));
MOCK_METHOD(void, StealTasks,
(std::unique_ptr<StealTasksRequest> request,
const ClientCallback<StealTasksReply> &callback),
(override));
MOCK_METHOD(void, DirectActorCallArgWaitComplete,
(const DirectActorCallArgWaitCompleteRequest &request,
const ClientCallback<DirectActorCallArgWaitCompleteReply> &callback),
(override));
MOCK_METHOD(void, GetObjectStatus,
(const GetObjectStatusRequest &request,
const ClientCallback<GetObjectStatusReply> &callback),
(override));
MOCK_METHOD(void, WaitForActorOutOfScope,
(const WaitForActorOutOfScopeRequest &request,
const ClientCallback<WaitForActorOutOfScopeReply> &callback),
(override));
MOCK_METHOD(void, PubsubLongPolling,
(const PubsubLongPollingRequest &request,
const ClientCallback<PubsubLongPollingReply> &callback),
(override));
MOCK_METHOD(void, PubsubCommandBatch,
(const PubsubCommandBatchRequest &request,
const ClientCallback<PubsubCommandBatchReply> &callback),
(override));
MOCK_METHOD(void, UpdateObjectLocationBatch,
(const UpdateObjectLocationBatchRequest &request,
const ClientCallback<UpdateObjectLocationBatchReply> &callback),
(override));
MOCK_METHOD(void, GetObjectLocationsOwner,
(const GetObjectLocationsOwnerRequest &request,
const ClientCallback<GetObjectLocationsOwnerReply> &callback),
(override));
MOCK_METHOD(void, KillActor,
(const KillActorRequest &request,
const ClientCallback<KillActorReply> &callback),
(override));
MOCK_METHOD(void, CancelTask,
(const CancelTaskRequest &request,
const ClientCallback<CancelTaskReply> &callback),
(override));
MOCK_METHOD(void, RemoteCancelTask,
(const RemoteCancelTaskRequest &request,
const ClientCallback<RemoteCancelTaskReply> &callback),
(override));
MOCK_METHOD(void, GetCoreWorkerStats,
(const GetCoreWorkerStatsRequest &request,
const ClientCallback<GetCoreWorkerStatsReply> &callback),
(override));
MOCK_METHOD(void, LocalGC,
(const LocalGCRequest &request,
const ClientCallback<LocalGCReply> &callback),
(override));
MOCK_METHOD(void, SpillObjects,
(const SpillObjectsRequest &request,
const ClientCallback<SpillObjectsReply> &callback),
(override));
MOCK_METHOD(void, RestoreSpilledObjects,
(const RestoreSpilledObjectsRequest &request,
const ClientCallback<RestoreSpilledObjectsReply> &callback),
(override));
MOCK_METHOD(void, DeleteSpilledObjects,
(const DeleteSpilledObjectsRequest &request,
const ClientCallback<DeleteSpilledObjectsReply> &callback),
(override));
MOCK_METHOD(void, AddSpilledUrl,
(const AddSpilledUrlRequest &request,
const ClientCallback<AddSpilledUrlReply> &callback),
(override));
MOCK_METHOD(void, PlasmaObjectReady,
(const PlasmaObjectReadyRequest &request,
const ClientCallback<PlasmaObjectReadyReply> &callback),
(override));
MOCK_METHOD(void, Exit,
(const ExitRequest &request, const ClientCallback<ExitReply> &callback),
(override));
MOCK_METHOD(void, AssignObjectOwner,
(const AssignObjectOwnerRequest &request,
const ClientCallback<AssignObjectOwnerReply> &callback),
(override));
MOCK_METHOD(int64_t, ClientProcessedUpToSeqno, (), (override));
};
} // namespace rpc
} // namespace ray

View file

@ -0,0 +1,23 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace ray {
namespace rpc {
class MockCoreWorkerClientPool : public CoreWorkerClientPool {
public:
};
} // namespace rpc
} // namespace ray

View file

@ -51,8 +51,8 @@ using rpc::WorkerTableData;
template <typename Key, typename Data>
class GcsTable {
public:
explicit GcsTable(std::shared_ptr<StoreClient> &store_client)
: store_client_(store_client) {}
explicit GcsTable(std::shared_ptr<StoreClient> store_client)
: store_client_(std::move(store_client)) {}
virtual ~GcsTable() = default;
@ -106,8 +106,8 @@ class GcsTable {
template <typename Key, typename Data>
class GcsTableWithJobId : public GcsTable<Key, Data> {
public:
explicit GcsTableWithJobId(std::shared_ptr<StoreClient> &store_client)
: GcsTable<Key, Data>(store_client) {}
explicit GcsTableWithJobId(std::shared_ptr<StoreClient> store_client)
: GcsTable<Key, Data>(std::move(store_client)) {}
/// Write data to the table asynchronously.
///
@ -152,16 +152,16 @@ class GcsTableWithJobId : public GcsTable<Key, Data> {
class GcsJobTable : public GcsTable<JobID, JobTableData> {
public:
explicit GcsJobTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsJobTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::JOB);
}
};
class GcsActorTable : public GcsTableWithJobId<ActorID, ActorTableData> {
public:
explicit GcsActorTable(std::shared_ptr<StoreClient> &store_client)
: GcsTableWithJobId(store_client) {
explicit GcsActorTable(std::shared_ptr<StoreClient> store_client)
: GcsTableWithJobId(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::ACTOR);
}
@ -172,16 +172,16 @@ class GcsActorTable : public GcsTableWithJobId<ActorID, ActorTableData> {
class GcsPlacementGroupTable
: public GcsTable<PlacementGroupID, PlacementGroupTableData> {
public:
explicit GcsPlacementGroupTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsPlacementGroupTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::PLACEMENT_GROUP);
}
};
class GcsTaskTable : public GcsTableWithJobId<TaskID, TaskTableData> {
public:
explicit GcsTaskTable(std::shared_ptr<StoreClient> &store_client)
: GcsTableWithJobId(store_client) {
explicit GcsTaskTable(std::shared_ptr<StoreClient> store_client)
: GcsTableWithJobId(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::TASK);
}
@ -191,8 +191,8 @@ class GcsTaskTable : public GcsTableWithJobId<TaskID, TaskTableData> {
class GcsTaskLeaseTable : public GcsTableWithJobId<TaskID, TaskLeaseData> {
public:
explicit GcsTaskLeaseTable(std::shared_ptr<StoreClient> &store_client)
: GcsTableWithJobId(store_client) {
explicit GcsTaskLeaseTable(std::shared_ptr<StoreClient> store_client)
: GcsTableWithJobId(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::TASK_LEASE);
}
@ -203,8 +203,8 @@ class GcsTaskLeaseTable : public GcsTableWithJobId<TaskID, TaskLeaseData> {
class GcsTaskReconstructionTable
: public GcsTableWithJobId<TaskID, TaskReconstructionData> {
public:
explicit GcsTaskReconstructionTable(std::shared_ptr<StoreClient> &store_client)
: GcsTableWithJobId(store_client) {
explicit GcsTaskReconstructionTable(std::shared_ptr<StoreClient> store_client)
: GcsTableWithJobId(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::TASK_RECONSTRUCTION);
}
@ -214,8 +214,8 @@ class GcsTaskReconstructionTable
class GcsObjectTable : public GcsTableWithJobId<ObjectID, ObjectLocationInfo> {
public:
explicit GcsObjectTable(std::shared_ptr<StoreClient> &store_client)
: GcsTableWithJobId(store_client) {
explicit GcsObjectTable(std::shared_ptr<StoreClient> store_client)
: GcsTableWithJobId(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::OBJECT);
}
@ -225,56 +225,56 @@ class GcsObjectTable : public GcsTableWithJobId<ObjectID, ObjectLocationInfo> {
class GcsNodeTable : public GcsTable<NodeID, GcsNodeInfo> {
public:
explicit GcsNodeTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsNodeTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::NODE);
}
};
class GcsNodeResourceTable : public GcsTable<NodeID, ResourceMap> {
public:
explicit GcsNodeResourceTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsNodeResourceTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::NODE_RESOURCE);
}
};
class GcsPlacementGroupScheduleTable : public GcsTable<PlacementGroupID, ScheduleData> {
public:
explicit GcsPlacementGroupScheduleTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsPlacementGroupScheduleTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::PLACEMENT_GROUP_SCHEDULE);
}
};
class GcsResourceUsageBatchTable : public GcsTable<NodeID, ResourceUsageBatchData> {
public:
explicit GcsResourceUsageBatchTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsResourceUsageBatchTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::RESOURCE_USAGE_BATCH);
}
};
class GcsProfileTable : public GcsTable<UniqueID, ProfileTableData> {
public:
explicit GcsProfileTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsProfileTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::PROFILE);
}
};
class GcsWorkerTable : public GcsTable<WorkerID, WorkerTableData> {
public:
explicit GcsWorkerTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsWorkerTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::WORKERS);
}
};
class GcsInternalConfigTable : public GcsTable<UniqueID, StoredConfig> {
public:
explicit GcsInternalConfigTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
explicit GcsInternalConfigTable(std::shared_ptr<StoreClient> store_client)
: GcsTable(std::move(store_client)) {
table_name_ = TablePrefix_Name(TablePrefix::INTERNAL_CONFIG);
}
};

View file

@ -0,0 +1,139 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// clang-format off
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
#include "mock/ray/gcs/store_client/store_client.h"
#include "mock/ray/gcs/gcs_server/gcs_node_manager.h"
#include "mock/ray/raylet_client/raylet_client.h"
#include "mock/ray/pubsub/subscriber.h"
#include "mock/ray/gcs/pubsub/gcs_pub_sub.h"
#include "mock/ray/rpc/worker/core_worker_client.h"
// clang-format on
using namespace ::testing;
namespace ray {
namespace gcs {
struct MockCallback {
MOCK_METHOD(void, Call, ((std::shared_ptr<GcsActor>)));
void operator()(std::shared_ptr<GcsActor> a) { return Call(a); }
};
class GcsActorSchedulerTest : public Test {
public:
void SetUp() override {
store_client = std::make_shared<MockStoreClient>();
actor_table = std::make_unique<GcsActorTable>(store_client);
gcs_node_manager = std::make_unique<MockGcsNodeManager>();
pub_sub = std::make_shared<MockGcsPubSub>();
raylet_client = std::make_shared<MockRayletClientInterface>();
core_worker_client = std::make_shared<rpc::MockCoreWorkerClientInterface>();
client_pool = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &) { return raylet_client; });
actor_scheduler = std::make_unique<RayletBasedActorScheduler>(
io_context, *actor_table, *gcs_node_manager, pub_sub,
[this](auto a) { schedule_failure_handler(a); },
[this](auto a, const rpc::PushTaskReply) { schedule_success_handler(a); },
client_pool, [this](const rpc::Address &) { return core_worker_client; });
auto node_info = std::make_shared<rpc::GcsNodeInfo>();
node_info->set_state(rpc::GcsNodeInfo::ALIVE);
node_id = NodeID::FromRandom();
node_info->set_node_id(node_id.Binary());
worker_id = WorkerID::FromRandom();
gcs_node_manager->AddNode(node_info);
}
std::shared_ptr<MockRayletClientInterface> raylet_client;
instrumented_io_context io_context;
std::shared_ptr<MockStoreClient> store_client;
std::unique_ptr<GcsActorTable> actor_table;
std::unique_ptr<GcsActorScheduler> actor_scheduler;
std::unique_ptr<MockGcsNodeManager> gcs_node_manager;
std::shared_ptr<MockGcsPubSub> pub_sub;
std::shared_ptr<rpc::MockCoreWorkerClientInterface> core_worker_client;
std::shared_ptr<rpc::NodeManagerClientPool> client_pool;
MockCallback schedule_failure_handler;
MockCallback schedule_success_handler;
NodeID node_id;
WorkerID worker_id;
};
TEST_F(GcsActorSchedulerTest, KillWorkerLeak1) {
// Ensure worker is not leak in the following case:
// 1. Gcs start to lease a worker
// 2. Gcs cancel the actor
// 3. Gcs lease reply with a grant
// We'd like to test the worker got released eventually.
// Worker is released with actor killing
auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
rpc::ActorTableData actor_data;
actor_data.set_state(rpc::ActorTableData::PENDING_CREATION);
actor_data.set_actor_id(actor_id.Binary());
auto actor = std::make_shared<GcsActor>(actor_data);
std::function<void(const Status &, const rpc::RequestWorkerLeaseReply &)> cb;
EXPECT_CALL(*raylet_client, RequestWorkerLease(_, _, _))
.WillOnce(testing::SaveArg<1>(&cb));
// Ensure actor is killed
EXPECT_CALL(*core_worker_client, KillActor(_, _));
actor_scheduler->Schedule(actor);
actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::DEAD);
actor_scheduler->CancelOnNode(node_id);
ray::rpc::RequestWorkerLeaseReply reply;
reply.mutable_worker_address()->set_raylet_id(node_id.Binary());
reply.mutable_worker_address()->set_worker_id(worker_id.Binary());
cb(Status::OK(), reply);
}
TEST_F(GcsActorSchedulerTest, KillWorkerLeak2) {
// Ensure worker is not leak in the following case:
// 1. Actor is in pending creation
// 2. Gcs push creation task to run in worker
// 3. Cancel the task
// 4. Task creating reply received
// We'd like to test the worker got released eventually.
// Worker is released with actor killing
auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
rpc::ActorTableData actor_data;
actor_data.set_state(rpc::ActorTableData::PENDING_CREATION);
actor_data.set_actor_id(actor_id.Binary());
auto actor = std::make_shared<GcsActor>(actor_data);
rpc::ClientCallback<rpc::RequestWorkerLeaseReply> request_worker_lease_cb;
// Ensure actor is killed
EXPECT_CALL(*core_worker_client, KillActor(_, _));
EXPECT_CALL(*raylet_client, RequestWorkerLease(_, _, _))
.WillOnce(testing::SaveArg<1>(&request_worker_lease_cb));
std::function<void(ray::Status)> async_put_with_index_cb;
// Leasing successfully
EXPECT_CALL(*store_client, AsyncPutWithIndex(_, _, _, _, _))
.WillOnce(DoAll(SaveArg<4>(&async_put_with_index_cb), Return(Status::OK())));
actor_scheduler->Schedule(actor);
rpc::RequestWorkerLeaseReply reply;
reply.mutable_worker_address()->set_raylet_id(node_id.Binary());
reply.mutable_worker_address()->set_worker_id(worker_id.Binary());
request_worker_lease_cb(Status::OK(), reply);
rpc::ClientCallback<rpc::PushTaskReply> push_normal_task_cb;
// Worker start to run task
EXPECT_CALL(*core_worker_client, PushNormalTask(_, _))
.WillOnce(testing::SaveArg<1>(&push_normal_task_cb));
async_put_with_index_cb(Status::OK());
actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::DEAD);
actor_scheduler->CancelOnWorker(node_id, worker_id);
push_normal_task_cb(Status::OK(), rpc::PushTaskReply());
}
} // namespace gcs
} // namespace ray

View file

@ -96,6 +96,9 @@ class GcsPubSub {
std::string DebugString() const;
protected:
GcsPubSub() : GcsPubSub(nullptr) {}
private:
/// Represents a caller's command to subscribe or unsubscribe to a given
/// channel.