[2/3][kv] Add delete by prefix support for internal kv (#21442)

Delete by prefix for internal kv is necessary to cleanup the function table. This will be used to fix the issue #8822
This commit is contained in:
Yi Cheng 2022-01-07 22:54:24 -08:00 committed by GitHub
parent 4a34233a90
commit bdfba88082
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 92 additions and 35 deletions

View file

@ -241,10 +241,11 @@ class GcsClient:
f"due to error {reply.status.message}")
@_auto_reconnect
def internal_kv_del(self, key: bytes, namespace: Optional[bytes]) -> int:
logger.debug(f"internal_kv_del {key} {namespace}")
def internal_kv_del(self, key: bytes, del_by_prefix: bool,
namespace: Optional[bytes]) -> int:
logger.debug(f"internal_kv_del {key} {del_by_prefix} {namespace}")
req = gcs_service_pb2.InternalKVDelRequest(
namespace=namespace, key=key)
namespace=namespace, key=key, del_by_prefix=del_by_prefix)
reply = self._kv_stub.InternalKVDel(req)
if reply.status.code == GcsCode.OK:
return reply.deleted_num

View file

@ -88,13 +88,14 @@ def _internal_kv_put(key: Union[str, bytes],
@client_mode_hook(auto_init=False)
def _internal_kv_del(key: Union[str, bytes],
*,
del_by_prefix: bool = False,
namespace: Optional[Union[str, bytes]] = None) -> int:
if isinstance(key, str):
key = key.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
assert isinstance(key, bytes)
return global_gcs_client.internal_kv_del(key, namespace)
return global_gcs_client.internal_kv_del(key, del_by_prefix, namespace)
@client_mode_hook(auto_init=False)

View file

@ -104,6 +104,11 @@ def test_internal_kv(ray_start_regular):
assert set(kv._internal_kv_list("k",
namespace="n")) == {b"k1", b"k2", b"k3"}
assert kv._internal_kv_del("k", del_by_prefix=True, namespace="n") == 3
assert kv._internal_kv_get("k1", namespace="n") is None
assert kv._internal_kv_get("k2", namespace="n") is None
assert kv._internal_kv_get("k3", namespace="n") is None
with pytest.raises(RuntimeError):
kv._internal_kv_put("@namespace_", "x", True)
with pytest.raises(RuntimeError):

View file

@ -275,7 +275,7 @@ class MockInternalKVAccessor : public InternalKVAccessor {
const OptionalItemCallback<bool> &callback),
(override));
MOCK_METHOD(Status, AsyncInternalKVDel,
(const std::string &ns, const std::string &key,
(const std::string &ns, const std::string &key, bool del_by_prefix,
const StatusCallback &callback),
(override));
};

View file

@ -1185,7 +1185,7 @@ Status InternalKVAccessor::AsyncInternalKVExists(
}
Status InternalKVAccessor::AsyncInternalKVDel(const std::string &ns,
const std::string &key,
const std::string &key, bool del_by_prefix,
const StatusCallback &callback) {
rpc::InternalKVDelRequest req;
req.set_namespace_(ns);
@ -1254,10 +1254,12 @@ Status InternalKVAccessor::Get(const std::string &ns, const std::string &key,
return ret_promise.get_future().get();
}
Status InternalKVAccessor::Del(const std::string &ns, const std::string &key) {
Status InternalKVAccessor::Del(const std::string &ns, const std::string &key,
bool del_by_prefix) {
std::promise<Status> ret_promise;
RAY_CHECK_OK(AsyncInternalKVDel(
ns, key, [&ret_promise](Status status) { ret_promise.set_value(status); }));
RAY_CHECK_OK(AsyncInternalKVDel(ns, key, del_by_prefix, [&ret_promise](Status status) {
ret_promise.set_value(status);
}));
return ret_promise.get_future().get();
}

View file

@ -772,10 +772,11 @@ class InternalKVAccessor {
///
/// \param ns The namespace to delete from.
/// \param key The key to delete.
/// \param del_by_prefix If set to be true, delete all keys with prefix as `key`.
/// \param callback Callback that will be called after the operation.
/// \return Status
virtual Status AsyncInternalKVDel(const std::string &ns, const std::string &key,
const StatusCallback &callback);
bool del_by_prefix, const StatusCallback &callback);
// These are sync functions of the async above
@ -821,8 +822,9 @@ class InternalKVAccessor {
///
/// \param ns The namespace to delete from.
/// \param key The key to delete
/// \param del_by_prefix If set to be true, delete all keys with prefix as `key`.
/// \return Status
virtual Status Del(const std::string &ns, const std::string &key);
virtual Status Del(const std::string &ns, const std::string &key, bool del_by_prefix);
/// Check existence of a key in the store
///

View file

@ -89,13 +89,30 @@ void RedisInternalKV::Put(const std::string &ns, const std::string &key,
}
void RedisInternalKV::Del(const std::string &ns, const std::string &key,
std::function<void(int64_t)> callback) {
bool del_by_prefix, std::function<void(int64_t)> callback) {
auto true_key = MakeKey(ns, key);
std::vector<std::string> cmd = {"HDEL", true_key, "value"};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [callback = std::move(callback)](auto redis_reply) {
callback(redis_reply->ReadAsInteger() != 0);
}));
if (del_by_prefix) {
std::vector<std::string> cmd = {"KEYS", true_key + "*"};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [this, callback = std::move(callback)](auto redis_reply) {
const auto &reply = redis_reply->ReadAsStringArray();
std::vector<std::string> del_cmd = {"DEL"};
for (const auto &r : reply) {
RAY_CHECK(r.has_value());
del_cmd.emplace_back(*r);
}
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
del_cmd, [callback = std::move(callback)](auto redis_reply) {
callback(redis_reply->ReadAsInteger());
}));
}));
} else {
std::vector<std::string> cmd = {"DEL", true_key};
RAY_CHECK_OK(redis_client_->GetPrimaryContext()->RunArgvAsync(
cmd, [callback = std::move(callback)](auto redis_reply) {
callback(redis_reply->ReadAsInteger());
}));
}
}
void RedisInternalKV::Exists(const std::string &ns, const std::string &key,
@ -157,15 +174,28 @@ void MemoryInternalKV::Put(const std::string &ns, const std::string &key,
}
void MemoryInternalKV::Del(const std::string &ns, const std::string &key,
std::function<void(int64_t)> callback) {
bool del_by_prefix, std::function<void(int64_t)> callback) {
absl::WriterMutexLock _(&mu_);
auto true_key = MakeKey(ns, key);
auto it = map_.find(true_key);
auto it = map_.lower_bound(true_key);
int64_t del_num = 0;
if (it != map_.end()) {
map_.erase(it);
del_num += 1;
while (it != map_.end()) {
if (!del_by_prefix) {
if (it->first == true_key) {
map_.erase(it);
++del_num;
}
break;
}
if (absl::StartsWith(it->first, true_key)) {
it = map_.erase(it);
++del_num;
} else {
break;
}
}
if (callback != nullptr) {
io_context_.post(std::bind(std::move(callback), del_num));
}
@ -243,7 +273,8 @@ void GcsInternalKVManager::HandleInternalKVDel(
reply->set_deleted_num(del_num);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
};
kv_instance_->Del(request.namespace_(), request.key(), std::move(callback));
kv_instance_->Del(request.namespace_(), request.key(), request.del_by_prefix(),
std::move(callback));
}
}

View file

@ -54,8 +54,10 @@ class InternalKVInterface {
///
/// \param ns The namespace of the key.
/// \param key The key to be deleted.
/// \param del_by_prefix Whether to treat the key as prefix. If true, it'll
/// delete all keys with `key` as the prefix.
/// \param callback Callback function.
virtual void Del(const std::string &ns, const std::string &key,
virtual void Del(const std::string &ns, const std::string &key, bool del_by_prefix,
std::function<void(int64_t)> callback) = 0;
/// Check whether the key exists in the store.
@ -98,7 +100,7 @@ class RedisInternalKV : public InternalKVInterface {
void Put(const std::string &ns, const std::string &key, const std::string &value,
bool overwrite, std::function<void(bool)> callback) override;
void Del(const std::string &ns, const std::string &key,
void Del(const std::string &ns, const std::string &key, bool del_by_prefix,
std::function<void(int64_t)> callback) override;
void Exists(const std::string &ns, const std::string &key,
@ -127,7 +129,7 @@ class MemoryInternalKV : public InternalKVInterface {
void Put(const std::string &ns, const std::string &key, const std::string &value,
bool overwrite, std::function<void(bool)> callback) override;
void Del(const std::string &ns, const std::string &key,
void Del(const std::string &ns, const std::string &key, bool del_by_prefix,
std::function<void(int64_t)> callback) override;
void Exists(const std::string &ns, const std::string &key,

View file

@ -465,7 +465,8 @@ void GcsServer::InitRuntimeEnvManager() {
} else {
auto uri = plugin_uri.substr(protocol_pos);
this->kv_manager_->GetInstance().Del(
"", uri, [callback = std::move(callback)](int64_t) { callback(false); });
"" /* namespace */, uri /* key */, false /* del_by_prefix*/,
[callback = std::move(callback)](int64_t) { callback(false); });
}
}
});

View file

@ -66,14 +66,25 @@ TEST_P(GcsKVManagerTest, TestInternalKV) {
ASSERT_EQ(expected, std::set<std::string>(keys.begin(), keys.end()));
});
kv_instance->Get("N2", "A_1", [](auto b) { ASSERT_FALSE(b.has_value()); });
// Make sure the last callback is called before end.
std::promise<void> p;
auto f = p.get_future();
kv_instance->Get("N1", "A_1", [&p](auto b) {
ASSERT_TRUE(b.has_value());
p.set_value();
});
f.get();
kv_instance->Get("N1", "A_1", [](auto b) { ASSERT_TRUE(b.has_value()); });
{
// Delete by prefix are two steps in redis mode, so we need sync here
std::promise<void> p;
kv_instance->Del("N1", "A_", true, [&p](auto b) {
ASSERT_EQ(3, b);
p.set_value();
});
p.get_future().get();
}
{
// Make sure the last cb is called
std::promise<void> p;
kv_instance->Get("N1", "A_1", [&p](auto b) {
ASSERT_FALSE(b.has_value());
p.set_value();
});
p.get_future().get();
}
}
INSTANTIATE_TEST_SUITE_P(GcsKVManagerTestFixture, GcsKVManagerTest,

View file

@ -475,6 +475,7 @@ message InternalKVPutReply {
message InternalKVDelRequest {
bytes key = 1;
bytes namespace = 2;
bool del_by_prefix = 3;
}
message InternalKVDelReply {