Use real callback index in subscribe_callback_index_ (#2473)

This commit is contained in:
Yuhong Guo 2018-08-07 06:29:56 +08:00 committed by Robert Nishihara
parent 9825da7233
commit d35ce7fa63
3 changed files with 18 additions and 7 deletions

View file

@ -303,11 +303,14 @@ Status RedisContext::RunAsync(const std::string &command, const UniqueID &id,
Status RedisContext::SubscribeAsync(const ClientID &client_id, Status RedisContext::SubscribeAsync(const ClientID &client_id,
const TablePubsub pubsub_channel, const TablePubsub pubsub_channel,
const RedisCallback &redisCallback) { const RedisCallback &redisCallback,
int64_t *out_callback_index) {
RAY_CHECK(pubsub_channel != TablePubsub::NO_PUBLISH) RAY_CHECK(pubsub_channel != TablePubsub::NO_PUBLISH)
<< "Client requested subscribe on a table that does not support pubsub"; << "Client requested subscribe on a table that does not support pubsub";
int64_t callback_index = RedisCallbackManager::instance().add(redisCallback); int64_t callback_index = RedisCallbackManager::instance().add(redisCallback);
RAY_CHECK(out_callback_index != nullptr);
*out_callback_index = callback_index;
int status = 0; int status = 0;
if (client_id.is_nil()) { if (client_id.is_nil()) {
// Subscribe to all messages. // Subscribe to all messages.

View file

@ -67,13 +67,21 @@ class RedisContext {
/// \param log_length The RAY.TABLE_APPEND command takes in an optional index /// \param log_length The RAY.TABLE_APPEND command takes in an optional index
/// at which the data must be appended. For all other commands, set to /// at which the data must be appended. For all other commands, set to
/// -1 for unused. If set, then data must be provided. /// -1 for unused. If set, then data must be provided.
/// \return Status.
Status RunAsync(const std::string &command, const UniqueID &id, const uint8_t *data, Status RunAsync(const std::string &command, const UniqueID &id, const uint8_t *data,
int64_t length, const TablePrefix prefix, int64_t length, const TablePrefix prefix,
const TablePubsub pubsub_channel, RedisCallback redisCallback, const TablePubsub pubsub_channel, RedisCallback redisCallback,
int log_length = -1); int log_length = -1);
/// Subscribe to a specific Pub-Sub channel.
///
/// \param client_id The client ID that subscribe this message.
/// \param pubsub_channel The Pub-Sub channel to subscribe to.
/// \param redisCallback The callback function that the notification calls.
/// \param out_callback_index The output pointer to callback index.
/// \return Status.
Status SubscribeAsync(const ClientID &client_id, const TablePubsub pubsub_channel, Status SubscribeAsync(const ClientID &client_id, const TablePubsub pubsub_channel,
const RedisCallback &redisCallback); const RedisCallback &redisCallback, int64_t *out_callback_index);
redisAsyncContext *async_context() { return async_context_; } redisAsyncContext *async_context() { return async_context_; }
redisAsyncContext *subscribe_context() { return subscribe_context_; }; redisAsyncContext *subscribe_context() { return subscribe_context_; };

View file

@ -136,8 +136,8 @@ Status Log<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id,
// more subscription messages. // more subscription messages.
return false; return false;
}; };
subscribe_callback_index_ = 1; return context_->SubscribeAsync(client_id, pubsub_channel_, std::move(callback),
return context_->SubscribeAsync(client_id, pubsub_channel_, std::move(callback)); &subscribe_callback_index_);
} }
template <typename ID, typename Data> template <typename ID, typename Data>