Add the Delete function in GCS (#4081)

* Add the Delete function in GCS

* Unify BatchDelete and Delete

* Fix comment

* Lint

* Refine according to comments

* Unify test.

* Address comment

* C++ lint

* Update ray_redis_module.cc
This commit is contained in:
Yuhong Guo 2019-02-21 13:33:37 +08:00 committed by GitHub
parent 1f864a02bc
commit 3549cd8195
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 281 additions and 1 deletions

View file

@ -82,4 +82,6 @@ cdef extern from "ray/ray_config.h" nogil:
uint32_t num_actor_checkpoints_to_keep() const
uint32_t maximum_gcs_deletion_batch_size() const
void initialize(const unordered_map[c_string, int] &config_map)

View file

@ -148,3 +148,7 @@ cdef class Config:
@staticmethod
def num_actor_checkpoints_to_keep():
return RayConfig.instance().num_actor_checkpoints_to_keep()
@staticmethod
def maximum_gcs_deletion_batch_size():
return RayConfig.instance().maximum_gcs_deletion_batch_size()

View file

@ -7,6 +7,7 @@ extern "C" {
#include "ray/gcs/client.h"
#include "ray/gcs/tables.h"
#include "ray/ray_config.h"
namespace ray {
@ -255,6 +256,160 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) {
TestLogAppendAt(job_id_, client_);
}
void TestDeleteKeysFromLog(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client,
std::vector<std::shared_ptr<ObjectTableDataT>> &data_vector) {
std::vector<ObjectID> ids;
ObjectID object_id;
for (auto &data : data_vector) {
object_id = ObjectID::from_random();
ids.push_back(object_id);
// Check that we added the correct object entries.
auto add_callback = [object_id, data](gcs::AsyncGcsClient *client, const UniqueID &id,
const ObjectTableDataT &d) {
ASSERT_EQ(id, object_id);
ASSERT_EQ(data->manager, d.manager);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, add_callback));
}
for (const auto &object_id : ids) {
// Check that lookup returns the added object entries.
auto lookup_callback = [object_id, data_vector](
gcs::AsyncGcsClient *client, const ObjectID &id,
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(id, object_id);
ASSERT_EQ(data.size(), 1);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->object_table().Lookup(job_id, object_id, lookup_callback));
}
if (ids.size() == 1) {
client->object_table().Delete(job_id, ids[0]);
} else {
client->object_table().Delete(job_id, ids);
}
for (const auto &object_id : ids) {
auto lookup_callback = [object_id](gcs::AsyncGcsClient *client, const ObjectID &id,
const std::vector<ObjectTableDataT> &data) {
ASSERT_EQ(id, object_id);
ASSERT_TRUE(data.size() == 0);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->object_table().Lookup(job_id, object_id, lookup_callback));
}
}
void TestDeleteKeysFromTable(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client,
std::vector<std::shared_ptr<protocol::TaskT>> &data_vector,
bool stop_at_end) {
std::vector<TaskID> ids;
TaskID task_id;
for (auto &data : data_vector) {
task_id = TaskID::from_random();
ids.push_back(task_id);
// Check that we added the correct object entries.
auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const UniqueID &id,
const protocol::TaskT &d) {
ASSERT_EQ(id, task_id);
ASSERT_EQ(data->task_specification, d.task_specification);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
}
for (const auto &task_id : ids) {
auto task_lookup_callback = [task_id](gcs::AsyncGcsClient *client, const TaskID &id,
const protocol::TaskT &data) {
ASSERT_EQ(id, task_id);
test->IncrementNumCallbacks();
};
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, task_lookup_callback,
nullptr));
}
if (ids.size() == 1) {
client->raylet_task_table().Delete(job_id, ids[0]);
} else {
client->raylet_task_table().Delete(job_id, ids);
}
auto expected_failure_callback = [](AsyncGcsClient *client, const TaskID &id) {
ASSERT_TRUE(true);
test->IncrementNumCallbacks();
};
auto undesired_callback = [](gcs::AsyncGcsClient *client, const TaskID &id,
const protocol::TaskT &data) { ASSERT_TRUE(false); };
for (size_t i = 0; i < ids.size(); ++i) {
RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, undesired_callback,
expected_failure_callback));
}
if (stop_at_end) {
auto stop_callback = [](AsyncGcsClient *client, const TaskID &id) { test->Stop(); };
RAY_CHECK_OK(
client->raylet_task_table().Lookup(job_id, ids[0], nullptr, stop_callback));
}
}
// Test delete function for keys of Log or Table.
void TestDeleteKeys(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Test delete function for keys of Log.
std::vector<std::shared_ptr<ObjectTableDataT>> object_vector;
auto AppendObjectData = [&object_vector](size_t add_count) {
for (size_t i = 0; i < add_count; ++i) {
auto data = std::make_shared<ObjectTableDataT>();
data->manager = ObjectID::from_random().hex();
object_vector.push_back(data);
}
};
// Test one element case.
AppendObjectData(1);
ASSERT_EQ(object_vector.size(), 1);
TestDeleteKeysFromLog(job_id, client, object_vector);
// Test the case for more than one elements and less than
// maximum_gcs_deletion_batch_size.
AppendObjectData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
ASSERT_GT(object_vector.size(), 1);
ASSERT_LT(object_vector.size(),
RayConfig::instance().maximum_gcs_deletion_batch_size());
TestDeleteKeysFromLog(job_id, client, object_vector);
// Test the case for more than maximum_gcs_deletion_batch_size.
// The Delete function will split the data into two commands.
AppendObjectData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
ASSERT_GT(object_vector.size(),
RayConfig::instance().maximum_gcs_deletion_batch_size());
TestDeleteKeysFromLog(job_id, client, object_vector);
// Test delete function for keys of Table.
std::vector<std::shared_ptr<protocol::TaskT>> task_vector;
auto AppendTaskData = [&task_vector](size_t add_count) {
for (size_t i = 0; i < add_count; ++i) {
auto task_data = std::make_shared<protocol::TaskT>();
task_data->task_specification = ObjectID::from_random().hex();
task_vector.push_back(task_data);
}
};
AppendTaskData(1);
ASSERT_EQ(task_vector.size(), 1);
TestDeleteKeysFromTable(job_id, client, task_vector, false);
AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
ASSERT_GT(task_vector.size(), 1);
ASSERT_LT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size());
TestDeleteKeysFromTable(job_id, client, task_vector, false);
AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
ASSERT_GT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size());
TestDeleteKeysFromTable(job_id, client, task_vector, true);
test->Start();
ASSERT_GT(test->NumCallbacks(),
9 * RayConfig::instance().maximum_gcs_deletion_batch_size());
}
TEST_F(TestGcsWithAsio, TestDeleteKey) {
test = this;
TestDeleteKeys(job_id_, client_);
}
// Task table callbacks.
void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {

View file

@ -505,6 +505,67 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
return REDISMODULE_OK;
}
// The deleting helper function.
static Status DeleteKeyHelper(RedisModuleCtx *ctx, RedisModuleString *prefix_str,
RedisModuleString *id_data) {
RedisModuleKey *delete_key = nullptr;
RAY_RETURN_NOT_OK(
OpenPrefixedKey(&delete_key, ctx, prefix_str, id_data, REDISMODULE_READ));
if (delete_key == nullptr) {
return Status::RedisError("Key does not exist.");
}
auto key_type = RedisModule_KeyType(delete_key);
if (key_type == REDISMODULE_KEYTYPE_STRING || key_type == REDISMODULE_KEYTYPE_LIST) {
// Current Table or Log only has this two types of entries.
RAY_RETURN_NOT_OK(
OpenPrefixedKey(&delete_key, ctx, prefix_str, id_data, REDISMODULE_WRITE));
RedisModule_DeleteKey(delete_key);
} else {
std::ostringstream ostream;
size_t redis_string_size;
const char *redis_string_str = RedisModule_StringPtrLen(id_data, &redis_string_size);
auto id_binary = std::string(redis_string_str, redis_string_size);
ostream << "Undesired type for RAY.TableDelete: " << key_type
<< " id:" << ray::UniqueID::from_binary(id_binary);
RAY_LOG(ERROR) << ostream.str();
return Status::RedisError(ostream.str());
}
return Status::OK();
}
/// Delete a list of redis keys in batch mode.
///
/// This is called from a client with the command:
//
/// RAY.TABLE_DELETE <table_prefix> <pubsub_channel> <id> <data>
///
/// \param table_prefix The prefix string for keys in this table.
/// \param pubsub_channel Unused but follow the interface.
/// \param id This id will be ignored but follow the interface.
/// \param data The list of Unique Ids, kUniqueIDSize bytes for each.
/// \return Always return OK unless the arguments are invalid.
int TableDelete_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 5) {
return RedisModule_WrongArity(ctx);
}
RedisModuleString *prefix_str = argv[1];
RedisModuleString *data = argv[4];
size_t len = 0;
const char *data_ptr = nullptr;
data_ptr = RedisModule_StringPtrLen(data, &len);
REPLY_AND_RETURN_IF_FALSE(
len % kUniqueIDSize == 0,
"The deletion data length must be a multiple of the UniqueID size.");
size_t ids_to_delete = len / kUniqueIDSize;
for (size_t i = 0; i < ids_to_delete; ++i) {
RedisModuleString *id_data =
RedisModule_CreateString(ctx, data_ptr + i * kUniqueIDSize, kUniqueIDSize);
RAY_IGNORE_EXPR(DeleteKeyHelper(ctx, prefix_str, id_data));
}
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/// Request notifications for changes to a key. Returns the current value or
/// values at the key. Notifications will be sent to the requesting client for
/// every subsequent TABLE_ADD to the key.
@ -692,6 +753,7 @@ AUTO_MEMORY(TableAdd_RedisCommand);
AUTO_MEMORY(TableAppend_RedisCommand);
AUTO_MEMORY(TableLookup_RedisCommand);
AUTO_MEMORY(TableRequestNotifications_RedisCommand);
AUTO_MEMORY(TableDelete_RedisCommand);
AUTO_MEMORY(TableCancelNotifications_RedisCommand);
AUTO_MEMORY(TableTestAndUpdate_RedisCommand);
AUTO_MEMORY(DebugString_RedisCommand);
@ -727,6 +789,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "ray.table_delete", TableDelete_RedisCommand,
"write", 0, 0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "ray.table_request_notifications",
TableRequestNotifications_RedisCommand, "write pubsub", 0,
0, 0) == REDISMODULE_ERR) {

View file

@ -172,6 +172,35 @@ Status Log<ID, Data>::CancelNotifications(const JobID &job_id, const ID &id,
pubsub_channel_, nullptr);
}
template <typename ID, typename Data>
void Log<ID, Data>::Delete(const JobID &job_id, const std::vector<ID> &ids) {
if (ids.empty()) {
return;
}
std::unordered_map<RedisContext *, std::ostringstream> sharded_data;
for (const auto &id : ids) {
sharded_data[GetRedisContext(id).get()] << id.binary();
}
// Breaking really large deletion commands into batches of smaller size.
const size_t batch_size =
RayConfig::instance().maximum_gcs_deletion_batch_size() * kUniqueIDSize;
for (const auto &pair : sharded_data) {
std::string current_data = pair.second.str();
for (size_t cur = 0; cur < pair.second.str().size(); cur += batch_size) {
RAY_IGNORE_EXPR(pair.first->RunAsync(
"RAY.TABLE_DELETE", UniqueID::nil(),
reinterpret_cast<const uint8_t *>(current_data.c_str() + cur),
std::min(batch_size, current_data.size() - cur), prefix_, pubsub_channel_,
/*redisCallback=*/nullptr));
}
}
}
template <typename ID, typename Data>
void Log<ID, Data>::Delete(const JobID &job_id, const ID &id) {
Delete(job_id, std::vector<ID>({id}));
}
template <typename ID, typename Data>
std::string Log<ID, Data>::DebugString() const {
std::stringstream result;
@ -462,7 +491,7 @@ Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id,
<< actor_id;
copy->timestamps.erase(copy->timestamps.begin());
copy->checkpoint_ids.erase(0, kUniqueIDSize);
// TODO(hchen): also delete checkpoint data from GCS.
client_->actor_checkpoint_table().Delete(job_id, checkpoint_id);
}
RAY_CHECK_OK(Add(job_id, actor_id, copy, nullptr));
};

View file

@ -184,6 +184,20 @@ class Log : public LogInterface<ID, Data>, virtual public PubsubInterface<ID> {
Status CancelNotifications(const JobID &job_id, const ID &id,
const ClientID &client_id);
/// Delete an entire key from redis.
///
/// \param job_id The ID of the job (= driver).
/// \param id The ID of the data to delete from the GCS.
/// \return Void.
void Delete(const JobID &job_id, const ID &id);
/// Delete several keys from redis.
///
/// \param job_id The ID of the job (= driver).
/// \param ids The vector of IDs to delete from the GCS.
/// \return Void.
void Delete(const JobID &job_id, const std::vector<ID> &ids);
/// Returns debug string for class.
///
/// \return string.
@ -304,6 +318,12 @@ class Table : private Log<ID, Data>,
const Callback &subscribe, const FailureCallback &failure,
const SubscriptionCallback &done);
void Delete(const JobID &job_id, const ID &id) { Log<ID, Data>::Delete(job_id, id); }
void Delete(const JobID &job_id, const std::vector<ID> &ids) {
Log<ID, Data>::Delete(job_id, ids);
}
/// Returns debug string for class.
///
/// \return string.

View file

@ -150,3 +150,6 @@ RAY_CONFIG(int64_t, max_task_lease_timeout_ms, 60000);
/// checkpoint isn't atomic with saving the backend checkpoint, and it will break
/// if this number is set to 1 and users save application checkpoints in place.
RAY_CONFIG(uint32_t, num_actor_checkpoints_to_keep, 20);
/// Maximum number of ids in one batch to send to GCS to delete keys.
RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000);