diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd
index 101d39c71..9edd9cb84 100644
--- a/python/ray/includes/ray_config.pxd
+++ b/python/ray/includes/ray_config.pxd
@@ -82,4 +82,6 @@ cdef extern from "ray/ray_config.h" nogil:
 
         uint32_t num_actor_checkpoints_to_keep() const
 
+        uint32_t maximum_gcs_deletion_batch_size() const
+
         void initialize(const unordered_map[c_string, int] &config_map)
diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi
index 3c7ad1e5f..585cd32a6 100644
--- a/python/ray/includes/ray_config.pxi
+++ b/python/ray/includes/ray_config.pxi
@@ -148,3 +148,7 @@ cdef class Config:
     @staticmethod
     def num_actor_checkpoints_to_keep():
         return RayConfig.instance().num_actor_checkpoints_to_keep()
+
+    @staticmethod
+    def maximum_gcs_deletion_batch_size():
+        return RayConfig.instance().maximum_gcs_deletion_batch_size()
diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc
index a99e4ceb0..6bf2a5315 100644
--- a/src/ray/gcs/client_test.cc
+++ b/src/ray/gcs/client_test.cc
@@ -7,6 +7,7 @@ extern "C" {
 
 #include "ray/gcs/client.h"
 #include "ray/gcs/tables.h"
+#include "ray/ray_config.h"
 
 namespace ray {
 
@@ -255,6 +256,169 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) {
   TestLogAppendAt(job_id_, client_);
 }
 
+// Appends one object-table entry per element of data_vector under a fresh random
+// ObjectID, checks that each id can be looked up, deletes all ids (single-id
+// overload when there is exactly one, vector overload otherwise), and then checks
+// that looking up each id returns no entries.
+void TestDeleteKeysFromLog(const JobID &job_id,
+                           std::shared_ptr<gcs::AsyncGcsClient> client,
+                           std::vector<std::shared_ptr<ObjectTableDataT>> &data_vector) {
+  std::vector<ObjectID> ids;
+  ObjectID object_id;
+  for (auto &data : data_vector) {
+    object_id = ObjectID::from_random();
+    ids.push_back(object_id);
+    // Check that we added the correct object entries.
+    auto add_callback = [object_id, data](gcs::AsyncGcsClient *client, const UniqueID &id,
+                                          const ObjectTableDataT &d) {
+      ASSERT_EQ(id, object_id);
+      ASSERT_EQ(data->manager, d.manager);
+      test->IncrementNumCallbacks();
+    };
+    RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, add_callback));
+  }
+  for (const auto &object_id : ids) {
+    // Check that lookup returns the added object entries.
+    auto lookup_callback = [object_id, data_vector](
+        gcs::AsyncGcsClient *client, const ObjectID &id,
+        const std::vector<ObjectTableDataT> &data) {
+      ASSERT_EQ(id, object_id);
+      ASSERT_EQ(data.size(), 1);
+      test->IncrementNumCallbacks();
+    };
+    RAY_CHECK_OK(client->object_table().Lookup(job_id, object_id, lookup_callback));
+  }
+  if (ids.size() == 1) {
+    client->object_table().Delete(job_id, ids[0]);
+  } else {
+    client->object_table().Delete(job_id, ids);
+  }
+  for (const auto &object_id : ids) {
+    auto lookup_callback = [object_id](gcs::AsyncGcsClient *client, const ObjectID &id,
+                                       const std::vector<ObjectTableDataT> &data) {
+      ASSERT_EQ(id, object_id);
+      ASSERT_TRUE(data.size() == 0);
+      test->IncrementNumCallbacks();
+    };
+    RAY_CHECK_OK(client->object_table().Lookup(job_id, object_id, lookup_callback));
+  }
+}
+
+// Adds one task-table entry per element of data_vector under a fresh random
+// TaskID, looks each id up, deletes all ids, and then expects the lookup of every
+// deleted id to invoke the failure callback. If stop_at_end is true, issues one
+// more lookup whose failure callback calls test->Stop().
+void TestDeleteKeysFromTable(const JobID &job_id,
+                             std::shared_ptr<gcs::AsyncGcsClient> client,
+                             std::vector<std::shared_ptr<protocol::TaskT>> &data_vector,
+                             bool stop_at_end) {
+  std::vector<TaskID> ids;
+  TaskID task_id;
+  for (auto &data : data_vector) {
+    task_id = TaskID::from_random();
+    ids.push_back(task_id);
+    // Check that we added the correct task entries.
+    auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const UniqueID &id,
+                                        const protocol::TaskT &d) {
+      ASSERT_EQ(id, task_id);
+      ASSERT_EQ(data->task_specification, d.task_specification);
+      test->IncrementNumCallbacks();
+    };
+    RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback));
+  }
+  for (const auto &task_id : ids) {
+    auto task_lookup_callback = [task_id](gcs::AsyncGcsClient *client, const TaskID &id,
+                                          const protocol::TaskT &data) {
+      ASSERT_EQ(id, task_id);
+      test->IncrementNumCallbacks();
+    };
+    RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, task_lookup_callback,
+                                                    nullptr));
+  }
+  if (ids.size() == 1) {
+    client->raylet_task_table().Delete(job_id, ids[0]);
+  } else {
+    client->raylet_task_table().Delete(job_id, ids);
+  }
+  auto expected_failure_callback = [](AsyncGcsClient *client, const TaskID &id) {
+    ASSERT_TRUE(true);
+    test->IncrementNumCallbacks();
+  };
+  auto undesired_callback = [](gcs::AsyncGcsClient *client, const TaskID &id,
+                               const protocol::TaskT &data) { ASSERT_TRUE(false); };
+  // Every deleted id, not only the last one generated, must now fail to look up.
+  for (const auto &task_id : ids) {
+    RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, undesired_callback,
+                                                    expected_failure_callback));
+  }
+  if (stop_at_end) {
+    auto stop_callback = [](AsyncGcsClient *client, const TaskID &id) { test->Stop(); };
+    RAY_CHECK_OK(
+        client->raylet_task_table().Lookup(job_id, ids[0], nullptr, stop_callback));
+  }
+}
+
+// Test delete function for keys of Log or Table.
+void TestDeleteKeys(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
+  // Test delete function for keys of Log.
+  std::vector<std::shared_ptr<ObjectTableDataT>> object_vector;
+  auto AppendObjectData = [&object_vector](size_t add_count) {
+    for (size_t i = 0; i < add_count; ++i) {
+      auto data = std::make_shared<ObjectTableDataT>();
+      data->manager = ObjectID::from_random().hex();
+      object_vector.push_back(data);
+    }
+  };
+  // Test one element case.
+  AppendObjectData(1);
+  ASSERT_EQ(object_vector.size(), 1);
+  TestDeleteKeysFromLog(job_id, client, object_vector);
+  // Test the case for more than one elements and less than
+  // maximum_gcs_deletion_batch_size.
+  AppendObjectData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
+  ASSERT_GT(object_vector.size(), 1);
+  ASSERT_LT(object_vector.size(),
+            RayConfig::instance().maximum_gcs_deletion_batch_size());
+  TestDeleteKeysFromLog(job_id, client, object_vector);
+  // Test the case for more than maximum_gcs_deletion_batch_size.
+  // The Delete function will split the data into two commands.
+  AppendObjectData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
+  ASSERT_GT(object_vector.size(),
+            RayConfig::instance().maximum_gcs_deletion_batch_size());
+  TestDeleteKeysFromLog(job_id, client, object_vector);
+
+  // Test delete function for keys of Table.
+  std::vector<std::shared_ptr<protocol::TaskT>> task_vector;
+  auto AppendTaskData = [&task_vector](size_t add_count) {
+    for (size_t i = 0; i < add_count; ++i) {
+      auto task_data = std::make_shared<protocol::TaskT>();
+      task_data->task_specification = ObjectID::from_random().hex();
+      task_vector.push_back(task_data);
+    }
+  };
+  AppendTaskData(1);
+  ASSERT_EQ(task_vector.size(), 1);
+  TestDeleteKeysFromTable(job_id, client, task_vector, false);
+
+  AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
+  ASSERT_GT(task_vector.size(), 1);
+  ASSERT_LT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size());
+  TestDeleteKeysFromTable(job_id, client, task_vector, false);
+
+  AppendTaskData(RayConfig::instance().maximum_gcs_deletion_batch_size() / 2);
+  ASSERT_GT(task_vector.size(), RayConfig::instance().maximum_gcs_deletion_batch_size());
+  TestDeleteKeysFromTable(job_id, client, task_vector, true);
+
+  test->Start();
+  ASSERT_GT(test->NumCallbacks(),
+            9 * RayConfig::instance().maximum_gcs_deletion_batch_size());
+}
+
+TEST_F(TestGcsWithAsio, TestDeleteKey) {
+  test = this;
+  TestDeleteKeys(job_id_, client_);
+}
+
 // Task table callbacks.
 void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
                const TaskTableDataT &data) {
diff --git a/src/ray/gcs/redis_module/ray_redis_module.cc b/src/ray/gcs/redis_module/ray_redis_module.cc
index 6307afd50..7d0bc63ce 100644
--- a/src/ray/gcs/redis_module/ray_redis_module.cc
+++ b/src/ray/gcs/redis_module/ray_redis_module.cc
@@ -505,6 +505,67 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
   return REDISMODULE_OK;
 }
 
+// Delete one key; fails if the key is missing or is neither a string nor a list.
+static Status DeleteKeyHelper(RedisModuleCtx *ctx, RedisModuleString *prefix_str,
+                              RedisModuleString *id_data) {
+  RedisModuleKey *delete_key = nullptr;
+  RAY_RETURN_NOT_OK(
+      OpenPrefixedKey(&delete_key, ctx, prefix_str, id_data, REDISMODULE_READ));
+  if (delete_key == nullptr) {
+    return Status::RedisError("Key does not exist.");
+  }
+  auto key_type = RedisModule_KeyType(delete_key);
+  if (key_type == REDISMODULE_KEYTYPE_STRING || key_type == REDISMODULE_KEYTYPE_LIST) {
+    // Current Table or Log only has these two types of entries.
+    RAY_RETURN_NOT_OK(
+        OpenPrefixedKey(&delete_key, ctx, prefix_str, id_data, REDISMODULE_WRITE));
+    RedisModule_DeleteKey(delete_key);
+  } else {
+    std::ostringstream ostream;
+    size_t redis_string_size;
+    const char *redis_string_str = RedisModule_StringPtrLen(id_data, &redis_string_size);
+    auto id_binary = std::string(redis_string_str, redis_string_size);
+    ostream << "Undesired type for RAY.TableDelete: " << key_type
+            << " id:" << ray::UniqueID::from_binary(id_binary);
+    RAY_LOG(ERROR) << ostream.str();
+    return Status::RedisError(ostream.str());
+  }
+  return Status::OK();
+}
+
+/// Delete a list of redis keys in batch mode.
+///
+/// This is called from a client with the command:
+///
+///    RAY.TABLE_DELETE <table_prefix> <pubsub_channel> <id> <data>
+///
+/// \param table_prefix The prefix string for keys in this table.
+/// \param pubsub_channel Unused but follow the interface.
+/// \param id This id will be ignored but follow the interface.
+/// \param data The list of Unique Ids, kUniqueIDSize bytes for each.
+/// \return Always return OK unless the arguments are invalid.
+int TableDelete_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+  if (argc != 5) {
+    return RedisModule_WrongArity(ctx);
+  }
+  RedisModuleString *prefix_str = argv[1];
+  RedisModuleString *data = argv[4];
+
+  size_t len = 0;
+  const char *data_ptr = nullptr;
+  data_ptr = RedisModule_StringPtrLen(data, &len);
+  REPLY_AND_RETURN_IF_FALSE(
+      len % kUniqueIDSize == 0,
+      "The deletion data length must be a multiple of the UniqueID size.");
+  size_t ids_to_delete = len / kUniqueIDSize;
+  for (size_t i = 0; i < ids_to_delete; ++i) {
+    RedisModuleString *id_data =
+        RedisModule_CreateString(ctx, data_ptr + i * kUniqueIDSize, kUniqueIDSize);
+    RAY_IGNORE_EXPR(DeleteKeyHelper(ctx, prefix_str, id_data));
+  }
+  return RedisModule_ReplyWithSimpleString(ctx, "OK");
+}
+
 /// Request notifications for changes to a key. Returns the current value or
 /// values at the key. Notifications will be sent to the requesting client for
 /// every subsequent TABLE_ADD to the key.
@@ -692,6 +753,7 @@ AUTO_MEMORY(TableAdd_RedisCommand);
 AUTO_MEMORY(TableAppend_RedisCommand);
 AUTO_MEMORY(TableLookup_RedisCommand);
 AUTO_MEMORY(TableRequestNotifications_RedisCommand);
+AUTO_MEMORY(TableDelete_RedisCommand);
 AUTO_MEMORY(TableCancelNotifications_RedisCommand);
 AUTO_MEMORY(TableTestAndUpdate_RedisCommand);
 AUTO_MEMORY(DebugString_RedisCommand);
@@ -727,6 +789,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
     return REDISMODULE_ERR;
   }
 
+  if (RedisModule_CreateCommand(ctx, "ray.table_delete", TableDelete_RedisCommand,
+                                "write", 0, 0, 0) == REDISMODULE_ERR) {
+    return REDISMODULE_ERR;
+  }
+
   if (RedisModule_CreateCommand(ctx, "ray.table_request_notifications",
                                 TableRequestNotifications_RedisCommand, "write pubsub",
                                 0, 0, 0) == REDISMODULE_ERR) {
diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc
index c1342c159..e10048827 100644
--- a/src/ray/gcs/tables.cc
+++ b/src/ray/gcs/tables.cc
@@ -172,6 +172,36 @@ Status Log<ID, Data>::CancelNotifications(const JobID &job_id, const ID &id,
                                        pubsub_channel_, nullptr);
 }
 
+template <typename ID, typename Data>
+void Log<ID, Data>::Delete(const JobID &job_id, const std::vector<ID> &ids) {
+  if (ids.empty()) {
+    return;
+  }
+  // Group the binary ids by the Redis context that GetRedisContext picks for them.
+  std::unordered_map<RedisContext *, std::ostringstream> sharded_data;
+  for (const auto &id : ids) {
+    sharded_data[GetRedisContext(id).get()] << id.binary();
+  }
+  // Breaking really large deletion commands into batches of smaller size.
+  const size_t batch_size =
+      RayConfig::instance().maximum_gcs_deletion_batch_size() * kUniqueIDSize;
+  for (const auto &pair : sharded_data) {
+    const std::string current_data = pair.second.str();
+    for (size_t cur = 0; cur < current_data.size(); cur += batch_size) {
+      RAY_IGNORE_EXPR(pair.first->RunAsync(
+          "RAY.TABLE_DELETE", UniqueID::nil(),
+          reinterpret_cast<const uint8_t *>(current_data.c_str() + cur),
+          std::min(batch_size, current_data.size() - cur), prefix_, pubsub_channel_,
+          /*redisCallback=*/nullptr));
+    }
+  }
+}
+
+template <typename ID, typename Data>
+void Log<ID, Data>::Delete(const JobID &job_id, const ID &id) {
+  Delete(job_id, std::vector<ID>({id}));
+}
+
 template <typename ID, typename Data>
 std::string Log<ID, Data>::DebugString() const {
   std::stringstream result;
@@ -462,7 +492,7 @@ Status ActorCheckpointIdTable::AddCheckpointId(const JobID &job_id,
                        << actor_id;
         copy->timestamps.erase(copy->timestamps.begin());
         copy->checkpoint_ids.erase(0, kUniqueIDSize);
-        // TODO(hchen): also delete checkpoint data from GCS.
+        client_->actor_checkpoint_table().Delete(job_id, checkpoint_id);
       }
       RAY_CHECK_OK(Add(job_id, actor_id, copy, nullptr));
     };
diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h
index 49b19114d..cfe280b97 100644
--- a/src/ray/gcs/tables.h
+++ b/src/ray/gcs/tables.h
@@ -184,6 +184,20 @@ class Log : public LogInterface<ID, Data>, virtual public PubsubInterface<ID> {
   Status CancelNotifications(const JobID &job_id, const ID &id,
                              const ClientID &client_id);
 
+  /// Delete an entire key from redis.
+  ///
+  /// \param job_id The ID of the job (= driver).
+  /// \param id The ID of the data to delete from the GCS.
+  /// \return Void.
+  void Delete(const JobID &job_id, const ID &id);
+
+  /// Delete several keys from redis.
+  ///
+  /// \param job_id The ID of the job (= driver).
+  /// \param ids The vector of IDs to delete from the GCS.
+  /// \return Void.
+  void Delete(const JobID &job_id, const std::vector<ID> &ids);
+
   /// Returns debug string for class.
   ///
   /// \return string.
@@ -304,6 +318,22 @@ class Table : private Log<ID, Data>,
                    const Callback &subscribe, const FailureCallback &failure,
                    const SubscriptionCallback &done);
 
+  /// Delete an entire key from redis. Forwards to the Log implementation.
+  ///
+  /// \param job_id The ID of the job (= driver).
+  /// \param id The ID of the data to delete from the GCS.
+  /// \return Void.
+  void Delete(const JobID &job_id, const ID &id) { Log<ID, Data>::Delete(job_id, id); }
+
+  /// Delete several keys from redis. Forwards to the Log implementation.
+  ///
+  /// \param job_id The ID of the job (= driver).
+  /// \param ids The vector of IDs to delete from the GCS.
+  /// \return Void.
+  void Delete(const JobID &job_id, const std::vector<ID> &ids) {
+    Log<ID, Data>::Delete(job_id, ids);
+  }
+
   /// Returns debug string for class.
   ///
   /// \return string.
diff --git a/src/ray/ray_config_def.h b/src/ray/ray_config_def.h
index 895668332..f995a1d68 100644
--- a/src/ray/ray_config_def.h
+++ b/src/ray/ray_config_def.h
@@ -150,3 +150,6 @@ RAY_CONFIG(int64_t, max_task_lease_timeout_ms, 60000);
 /// checkpoint isn't atomic with saving the backend checkpoint, and it will break
 /// if this number is set to 1 and users save application checkpoints in place.
 RAY_CONFIG(uint32_t, num_actor_checkpoints_to_keep, 20);
+
+/// Maximum number of ids in one batch to send to GCS to delete keys.
+RAY_CONFIG(uint32_t, maximum_gcs_deletion_batch_size, 1000);