[Streaming] DataWriter use event driven model. (#7043)

* streaming writer use event driven model.

* minor changes according reviewer comments

* Fix according to reviewer's comments

* fix bazel lint

* code polished

* Add more comments

* rename Stop & Start of EventQueue to Freeze and Unfreeze.
This commit is contained in:
Lingxuan Zuo 2020-02-11 22:24:45 +08:00 committed by GitHub
parent 58c94f6381
commit 3d9bd64591
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 651 additions and 63 deletions

View file

@ -213,6 +213,14 @@ cc_test(
deps = test_common_deps, deps = test_common_deps,
) )
cc_test(
name = "event_service_tests",
srcs = [
"src/test/event_service_tests.cc",
],
deps = test_common_deps,
)
python_proto_compile( python_proto_compile(
name = "streaming_py_proto", name = "streaming_py_proto",
deps = ["//streaming:streaming_proto"], deps = ["//streaming:streaming_proto"],

View file

@ -207,9 +207,9 @@ struct MockQueueItem {
class MockQueue { class MockQueue {
public: public:
std::unordered_map<ObjectID, std::shared_ptr<AbstractRingBufferImpl<MockQueueItem>>> std::unordered_map<ObjectID, std::shared_ptr<AbstractRingBuffer<MockQueueItem>>>
message_buffer_; message_buffer_;
std::unordered_map<ObjectID, std::shared_ptr<AbstractRingBufferImpl<MockQueueItem>>> std::unordered_map<ObjectID, std::shared_ptr<AbstractRingBuffer<MockQueueItem>>>
consumed_buffer_; consumed_buffer_;
static std::mutex mutex; static std::mutex mutex;
static MockQueue &GetMockQueue() { static MockQueue &GetMockQueue() {

View file

@ -29,6 +29,17 @@ struct ProducerChannelInfo {
uint32_t queue_size; uint32_t queue_size;
int64_t message_pass_by_ts; int64_t message_pass_by_ts;
ActorID actor_id; ActorID actor_id;
/// The following parameters are used for event driven to record different
/// input events.
uint64_t sent_empty_cnt = 0;
uint64_t flow_control_cnt = 0;
uint64_t user_event_cnt = 0;
uint64_t rb_full_cnt = 0;
uint64_t queue_full_cnt = 0;
uint64_t in_event_queue_cnt = 0;
bool in_event_queue = false;
bool flow_control = false;
}; };
struct ConsumerChannelInfo { struct ConsumerChannelInfo {

View file

@ -95,8 +95,11 @@ StreamingStatus DataWriter::WriteBufferToChannel(ProducerChannelInfo &channel_in
} }
void DataWriter::Run() { void DataWriter::Run() {
STREAMING_LOG(INFO) << "WriterLoopForward start"; STREAMING_LOG(INFO) << "Event server start";
loop_thread_ = std::make_shared<std::thread>(&DataWriter::WriterLoopForward, this); event_service_->Run();
// Enable empty message timer after writer running.
empty_message_thread_ =
std::make_shared<std::thread>(&DataWriter::EmptyMessageTimerCallback, this);
} }
/// Since every memory ring buffer's size is limited, when the writing buffer is /// Since every memory ring buffer's size is limited, when the writing buffer is
@ -126,6 +129,18 @@ uint64_t DataWriter::WriteMessageToBufferRing(const ObjectID &q_id, uint8_t *dat
ring_buffer_ptr->Push(std::make_shared<StreamingMessage>( ring_buffer_ptr->Push(std::make_shared<StreamingMessage>(
data, data_size, write_message_id, message_type)); data, data_size, write_message_id, message_type));
if (ring_buffer_ptr->Size() == 1) {
if (channel_info.in_event_queue) {
++channel_info.in_event_queue_cnt;
STREAMING_LOG(DEBUG) << "user_event had been in event_queue";
} else if (!channel_info.flow_control) {
channel_info.in_event_queue = true;
Event event{&channel_info, EventType::UserEvent, false};
event_service_->Push(event);
++channel_info.user_event_cnt;
}
}
return write_message_id; return write_message_id;
} }
@ -177,6 +192,14 @@ StreamingStatus DataWriter::Init(const std::vector<ObjectID> &queue_id_vec,
return status; return status;
} }
} }
// Register empty event and user event to event server.
event_service_ = std::make_shared<EventService>();
event_service_->Register(
EventType::EmptyEvent,
std::bind(&DataWriter::SendEmptyToChannel, this, std::placeholders::_1));
event_service_->Register(EventType::UserEvent, std::bind(&DataWriter::WriteAllToChannel,
this, std::placeholders::_1));
runtime_context_->SetRuntimeStatus(RuntimeStatus::Running); runtime_context_->SetRuntimeStatus(RuntimeStatus::Running);
return StreamingStatus::OK; return StreamingStatus::OK;
} }
@ -190,9 +213,30 @@ DataWriter::~DataWriter() {
return; return;
} }
runtime_context_->SetRuntimeStatus(RuntimeStatus::Interrupted); runtime_context_->SetRuntimeStatus(RuntimeStatus::Interrupted);
if (loop_thread_->joinable()) { if (event_service_) {
STREAMING_LOG(INFO) << "Writer loop thread waiting for join"; event_service_->Stop();
loop_thread_->join(); if (empty_message_thread_->joinable()) {
STREAMING_LOG(INFO) << "Empty message thread waiting for join";
empty_message_thread_->join();
}
int user_event_count = 0;
int empty_event_count = 0;
int flow_control_event_count = 0;
int in_event_queue_cnt = 0;
int queue_full_cnt = 0;
for (auto &output_queue : output_queue_ids_) {
ProducerChannelInfo &channel_info = channel_info_map_[output_queue];
user_event_count += channel_info.user_event_cnt;
empty_event_count += channel_info.sent_empty_cnt;
flow_control_event_count += channel_info.flow_control_cnt;
in_event_queue_cnt += channel_info.in_event_queue_cnt;
queue_full_cnt += channel_info.queue_full_cnt;
}
STREAMING_LOG(WARNING) << "User event nums: " << user_event_count
<< ", empty event nums: " << empty_event_count
<< ", flow control event nums: " << flow_control_event_count
<< ", queue full nums: " << queue_full_cnt
<< ", in event queue: " << in_event_queue_cnt;
} }
STREAMING_LOG(INFO) << "Writer client queue disconnect."; STREAMING_LOG(INFO) << "Writer client queue disconnect.";
} }
@ -306,5 +350,102 @@ void DataWriter::Stop() {
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
runtime_context_->SetRuntimeStatus(RuntimeStatus::Interrupted); runtime_context_->SetRuntimeStatus(RuntimeStatus::Interrupted);
} }
bool DataWriter::WriteAllToChannel(ProducerChannelInfo *info) {
ProducerChannelInfo &channel_info = *info;
channel_info.in_event_queue = false;
while (true) {
if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) {
return false;
}
uint64_t ring_buffer_remain = channel_info.writer_ring_buffer->Size();
StreamingStatus write_status = WriteBufferToChannel(channel_info, ring_buffer_remain);
int64_t current_ts = current_time_ms();
if (StreamingStatus::OK == write_status) {
channel_info.message_pass_by_ts = current_ts;
} else if (StreamingStatus::FullChannel == write_status ||
StreamingStatus::OutOfMemory == write_status) {
++channel_info.queue_full_cnt;
STREAMING_LOG(DEBUG) << "FullChannel after writing to channel, queue_full_cnt:"
<< channel_info.queue_full_cnt;
// TODO(lingxuan.zlx): we should notify consumed status to channel when
// flow control is supported.
} else if (StreamingStatus::EmptyRingBuffer != write_status) {
STREAMING_LOG(INFO) << channel_info.channel_id
<< ":something wrong when WriteToQueue "
<< "write buffer status => "
<< static_cast<uint32_t>(write_status);
break;
}
if (ring_buffer_remain == 0 &&
!channel_info.writer_ring_buffer->IsTransientAvaliable()) {
break;
}
}
return true;
}
bool DataWriter::SendEmptyToChannel(ProducerChannelInfo *channel_info) {
WriteEmptyMessage(*channel_info);
return true;
}
void DataWriter::EmptyMessageTimerCallback() {
while (true) {
if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) {
return;
}
int64_t current_ts = current_time_ms();
int64_t min_passby_message_ts = current_ts;
int count = 0;
for (auto output_queue : output_queue_ids_) {
if (RuntimeStatus::Running != runtime_context_->GetRuntimeStatus()) {
return;
}
ProducerChannelInfo &channel_info = channel_info_map_[output_queue];
if (channel_info.flow_control || channel_info.writer_ring_buffer->Size() ||
current_ts < channel_info.message_pass_by_ts) {
continue;
}
if (current_ts - channel_info.message_pass_by_ts >=
runtime_context_->GetConfig().GetEmptyMessageTimeInterval()) {
Event event{&channel_info, EventType::EmptyEvent, true};
event_service_->Push(event);
++channel_info.sent_empty_cnt;
++count;
continue;
}
if (min_passby_message_ts > channel_info.message_pass_by_ts) {
min_passby_message_ts = channel_info.message_pass_by_ts;
}
}
STREAMING_LOG(DEBUG) << "EmptyThd:produce empty_events:" << count
<< " eventqueue size:" << event_service_->EventNums()
<< " next_sleep_time:"
<< runtime_context_->GetConfig().GetEmptyMessageTimeInterval() -
current_ts + min_passby_message_ts;
for (const auto &output_queue : output_queue_ids_) {
ProducerChannelInfo &channel_info = channel_info_map_[output_queue];
STREAMING_LOG(DEBUG) << output_queue << "==ring_buffer size:"
<< channel_info.writer_ring_buffer->Size()
<< " transient_buffer size:"
<< channel_info.writer_ring_buffer->GetTransientBufferSize()
<< " in_event_queue:" << channel_info.in_event_queue
<< " flow_control:" << channel_info.flow_control
<< " user_event_cnt:" << channel_info.user_event_cnt
<< " flow_control_event:" << channel_info.flow_control_cnt
<< " empty_event_cnt:" << channel_info.sent_empty_cnt
<< " rb_full_cnt:" << channel_info.rb_full_cnt
<< " queue_full_cnt:" << channel_info.queue_full_cnt;
}
std::this_thread::sleep_for(std::chrono::milliseconds(
runtime_context_->GetConfig().GetEmptyMessageTimeInterval() - current_ts +
min_passby_message_ts));
}
}
} // namespace streaming } // namespace streaming
} // namespace ray } // namespace ray

View file

@ -9,6 +9,7 @@
#include "channel.h" #include "channel.h"
#include "config/streaming_config.h" #include "config/streaming_config.h"
#include "event_service.h"
#include "message/message_bundle.h" #include "message/message_bundle.h"
#include "runtime_context.h" #include "runtime_context.h"
@ -29,18 +30,6 @@ namespace streaming {
/// accordingly. It will sleep for a short interval to save cpu if all ring /// accordingly. It will sleep for a short interval to save cpu if all ring
/// buffers have no data in that moment. /// buffers have no data in that moment.
class DataWriter { class DataWriter {
private:
std::shared_ptr<std::thread> loop_thread_;
// One channel have unique identity.
std::vector<ObjectID> output_queue_ids_;
protected:
// ProducerTransfer is middle broker for data transporting.
std::unordered_map<ObjectID, ProducerChannelInfo> channel_info_map_;
std::unordered_map<ObjectID, std::shared_ptr<ProducerChannel>> channel_map_;
std::shared_ptr<Config> transfer_config_;
std::shared_ptr<RuntimeContext> runtime_context_;
private: private:
bool IsMessageAvailableInBuffer(ProducerChannelInfo &channel_info); bool IsMessageAvailableInBuffer(ProducerChannelInfo &channel_info);
@ -80,6 +69,16 @@ class DataWriter {
StreamingStatus InitChannel(const ObjectID &q_id, const ActorID &actor_id, StreamingStatus InitChannel(const ObjectID &q_id, const ActorID &actor_id,
uint64_t channel_message_id, uint64_t queue_size); uint64_t channel_message_id, uint64_t queue_size);
/// Write all messages to channel util ringbuffer is empty.
/// \param channel_info
bool WriteAllToChannel(ProducerChannelInfo *channel_info);
/// Trigger an empty message for channel with no valid data.
/// \param channel_info
bool SendEmptyToChannel(ProducerChannelInfo *channel_info);
void EmptyMessageTimerCallback();
public: public:
explicit DataWriter(std::shared_ptr<RuntimeContext> &runtime_context); explicit DataWriter(std::shared_ptr<RuntimeContext> &runtime_context);
virtual ~DataWriter(); virtual ~DataWriter();
@ -109,6 +108,21 @@ class DataWriter {
void Run(); void Run();
void Stop(); void Stop();
private:
std::shared_ptr<EventService> event_service_;
std::shared_ptr<std::thread> empty_message_thread_;
// One channel have unique identity.
std::vector<ObjectID> output_queue_ids_;
protected:
std::unordered_map<ObjectID, ProducerChannelInfo> channel_info_map_;
/// ProducerChannel is middle broker for data transporting and all downstream
/// producer channels will be channel_map_.
std::unordered_map<ObjectID, std::shared_ptr<ProducerChannel>> channel_map_;
std::shared_ptr<Config> transfer_config_;
std::shared_ptr<RuntimeContext> runtime_context_;
}; };
} // namespace streaming } // namespace streaming
} // namespace ray } // namespace ray

View file

@ -0,0 +1,197 @@
#include <unordered_set>
#include "event_service.h"
namespace ray {
namespace streaming {
EventQueue::~EventQueue() {
is_freezed_ = false;
no_full_cv_.notify_all();
no_empty_cv_.notify_all();
};
void EventQueue::Unfreeze() { is_freezed_ = true; }
void EventQueue::Freeze() {
is_freezed_ = false;
no_empty_cv_.notify_all();
no_full_cv_.notify_all();
}
void EventQueue::Push(const Event &t) {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
while (Size() >= capacity_ && is_freezed_) {
STREAMING_LOG(WARNING) << " EventQueue is full, its size:" << Size()
<< " capacity:" << capacity_
<< " buffer size:" << buffer_.size()
<< " urgent_buffer size:" << urgent_buffer_.size();
no_full_cv_.wait(lock);
STREAMING_LOG(WARNING) << "Event server is full_sleep be notified";
}
if (!is_freezed_) {
return;
}
if (t.urgent) {
buffer_.push(t);
} else {
urgent_buffer_.push(t);
}
if (1 == Size()) {
no_empty_cv_.notify_one();
}
}
void EventQueue::Pop() {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
if (Size() >= capacity_) {
STREAMING_LOG(WARNING) << "Pop should notify"
<< " size : " << Size();
}
if (urgent_) {
urgent_buffer_.pop();
} else {
buffer_.pop();
}
no_full_cv_.notify_all();
}
bool EventQueue::Get(Event &evt) {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
while (Empty() && is_freezed_) {
no_empty_cv_.wait(lock);
}
if (!is_freezed_) {
return false;
}
if (!urgent_buffer_.empty()) {
urgent_ = true;
evt = urgent_buffer_.front();
} else {
urgent_ = false;
evt = buffer_.front();
}
return true;
}
Event EventQueue::PopAndGet() {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
while (Empty() && is_freezed_) {
no_empty_cv_.wait(lock);
}
if (!is_freezed_) {
// Return error event if queue is freezed.
return Event({nullptr, EventType::ErrorEvent, false});
}
if (!urgent_buffer_.empty()) {
Event res = urgent_buffer_.front();
urgent_buffer_.pop();
if (Full()) {
no_full_cv_.notify_one();
}
return res;
}
Event res = buffer_.front();
buffer_.pop();
if (Size() + 1 == capacity_) no_full_cv_.notify_one();
return res;
}
Event &EventQueue::Front() {
std::unique_lock<std::mutex> lock(ring_buffer_mutex_);
if (urgent_buffer_.size()) {
return urgent_buffer_.front();
}
return buffer_.front();
}
EventService::EventService(uint32_t event_size)
: event_queue_(std::make_shared<EventQueue>(event_size)), stop_flag_(false) {}
EventService::~EventService() {
stop_flag_ = true;
// No need to join if loop thread has never been created.
if (loop_thread_ && loop_thread_->joinable()) {
STREAMING_LOG(WARNING) << "Loop Thread Stopped";
loop_thread_->join();
}
}
void EventService::Run() {
stop_flag_ = false;
event_queue_->Unfreeze();
loop_thread_ = std::make_shared<std::thread>(&EventService::LoopThreadHandler, this);
STREAMING_LOG(WARNING) << "event_server run";
}
void EventService::Stop() {
stop_flag_ = true;
event_queue_->Freeze();
if (loop_thread_->joinable()) {
loop_thread_->join();
}
STREAMING_LOG(WARNING) << "event_server stop";
}
bool EventService::Register(const EventType &type, const Handle &handle) {
if (event_handle_map_.find(type) != event_handle_map_.end()) {
STREAMING_LOG(WARNING) << "EventType had been registered!";
}
event_handle_map_[type] = handle;
return true;
}
void EventService::Push(const Event &event) { event_queue_->Push(event); }
void EventService::Execute(Event &event) {
if (event_handle_map_.find(event.type) == event_handle_map_.end()) {
STREAMING_LOG(WARNING) << "Handle has never been registered yet, type => "
<< static_cast<int>(event.type);
return;
}
Handle &handle = event_handle_map_[event.type];
if (handle(event.channel_info)) {
event_queue_->Pop();
}
}
void EventService::LoopThreadHandler() {
while (true) {
if (stop_flag_) {
break;
}
Event event;
if (event_queue_->Get(event)) {
Execute(event);
}
}
}
void EventService::RemoveDestroyedChannelEvent(const std::vector<ObjectID> &removed_ids) {
// NOTE(lingxuan.zlx): To prevent producing invalid event for removed
// channels, we pop out all invalid channel related events(push it to
// original queue if it has no connection with removed channels).
std::unordered_set<ObjectID> removed_set(removed_ids.begin(), removed_ids.end());
size_t total_event_nums = EventNums();
STREAMING_LOG(INFO) << "Remove Destroyed channel event, removed_ids size "
<< removed_ids.size() << ", total event size " << total_event_nums;
size_t removed_related_num = 0;
event_queue_->Freeze();
for (size_t i = 0; i < total_event_nums; ++i) {
Event event;
if (!event_queue_->Get(event) || !event.channel_info) {
STREAMING_LOG(WARNING) << "Fail to get event or channel_info is null, i = " << i;
continue;
}
if (removed_set.find(event.channel_info->channel_id) != removed_set.end()) {
removed_related_num++;
} else {
event_queue_->Push(event);
}
event_queue_->Pop();
}
event_queue_->Unfreeze();
STREAMING_LOG(INFO) << "Total event num => " << total_event_nums
<< ", removed related num => " << removed_related_num;
}
} // namespace streaming
} // namespace ray

View file

@ -0,0 +1,140 @@
#ifndef RAY_STREAMING_EVENT_SERVER_H
#define RAY_STREAMING_EVENT_SERVER_H
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include "channel.h"
#include "ring_buffer.h"
#include "util/streaming_util.h"
namespace ray {
namespace streaming {
enum class EventType : uint8_t {
// A message created by user writing.
UserEvent = 0,
// Unblock upstream writing when it's under flowcontrol.
FlowEvent = 1,
// Trigger an empty message by timer.
EmptyEvent = 2,
FullChannel = 3,
// Recovery at the beginning.
Reload = 4,
// Error event if event queue is freezed.
ErrorEvent = 5
};
struct EnumTypeHash {
template <typename T>
std::size_t operator()(const T &t) const {
return static_cast<std::size_t>(t);
}
};
struct Event {
ProducerChannelInfo *channel_info;
EventType type;
bool urgent;
};
/// Data writer utilizes what's called an event-driven programming model
/// that includes two important components: event service and event
/// queue. In the process of data transmission, the writer will first define
/// the processing method of corresponding events. However, by triggering
/// different events in actual operation, these events will be put into the event
/// queue, and finally the event server will schedule the previously registered
/// processing functions ordered by its priority.
class EventQueue {
public:
EventQueue(size_t size) : urgent_(false), capacity_(size), is_freezed_(true) {}
virtual ~EventQueue();
/// Resume event queue to normal model.
void Unfreeze();
/// Push is prohibited when event queue is freezed.
void Freeze();
void Push(const Event &t);
void Pop();
bool Get(Event &evt);
Event PopAndGet();
Event &Front();
inline size_t Capacity() const { return capacity_; }
/// It mainly divides event into two different levels: normal event and urgent
/// event, and the total size of the queue is the sum of them.
inline size_t Size() const { return buffer_.size() + urgent_buffer_.size(); }
private:
/// (NOTE:lingxuan.zlx) There is no strict thread-safe when query empty or full,
/// but it can reduce lock contention. In fact, these functions are thread-safe
/// when invoked via Push/Pop where buffer size will only be changed in whole process.
inline bool Empty() const { return buffer_.empty() && urgent_buffer_.empty(); }
inline bool Full() const { return buffer_.size() + urgent_buffer_.size() == capacity_; }
private:
std::mutex ring_buffer_mutex_;
std::condition_variable no_empty_cv_;
std::condition_variable no_full_cv_;
// Normal events wil be pushed into buffer_.
std::queue<Event> buffer_;
// This field urgent_buffer_ is used for serving urgent event.
std::queue<Event> urgent_buffer_;
// Urgent event will be poped out first if urgent_ flag is true.
bool urgent_;
size_t capacity_;
bool is_freezed_;
};
class EventService {
public:
/// User-define event handle for different types.
typedef std::function<bool(ProducerChannelInfo *info)> Handle;
EventService(uint32_t event_size = kEventQueueCapacity);
~EventService();
void Run();
void Stop();
bool Register(const EventType &type, const Handle &handle);
void Push(const Event &event);
inline size_t EventNums() const { return event_queue_->Size(); }
void RemoveDestroyedChannelEvent(const std::vector<ObjectID> &removed_ids);
private:
void Execute(Event &event);
/// A single thread should be invoked to run this loop function, so that
/// event server can poll and execute registered callback function event
/// one by one.
void LoopThreadHandler();
private:
std::unordered_map<EventType, Handle, EnumTypeHash> event_handle_map_;
std::shared_ptr<EventQueue> event_queue_;
std::shared_ptr<std::thread> loop_thread_;
static constexpr int kEventQueueCapacity = 1000;
bool stop_flag_;
};
} // namespace streaming
} // namespace ray
#endif // RAY_STREAMING_EVENT_SERVER_H

View file

@ -23,11 +23,6 @@ bool StreamingRingBuffer::Push(const StreamingMessagePtr &msg) {
return true; return true;
} }
bool StreamingRingBuffer::Push(StreamingMessagePtr &&msg) {
message_buffer_->Push(std::forward<StreamingMessagePtr>(msg));
return true;
}
StreamingMessagePtr &StreamingRingBuffer::Front() { StreamingMessagePtr &StreamingRingBuffer::Front() {
STREAMING_CHECK(!message_buffer_->Empty()); STREAMING_CHECK(!message_buffer_->Empty());
return message_buffer_->Front(); return message_buffer_->Front();
@ -38,11 +33,11 @@ void StreamingRingBuffer::Pop() {
message_buffer_->Pop(); message_buffer_->Pop();
} }
bool StreamingRingBuffer::IsFull() { return message_buffer_->Full(); } bool StreamingRingBuffer::IsFull() const { return message_buffer_->Full(); }
bool StreamingRingBuffer::IsEmpty() { return message_buffer_->Empty(); } bool StreamingRingBuffer::IsEmpty() const { return message_buffer_->Empty(); }
size_t StreamingRingBuffer::Size() { return message_buffer_->Size(); }; size_t StreamingRingBuffer::Size() const { return message_buffer_->Size(); }
size_t StreamingRingBuffer::Capacity() const { return message_buffer_->Capacity(); } size_t StreamingRingBuffer::Capacity() const { return message_buffer_->Capacity(); }

View file

@ -76,31 +76,22 @@ class StreamingTransientBuffer {
}; };
template <class T> template <class T>
class AbstractRingBufferImpl { class AbstractRingBuffer {
public: public:
virtual void Push(T &&) = 0;
virtual void Push(const T &) = 0; virtual void Push(const T &) = 0;
virtual void Pop() = 0; virtual void Pop() = 0;
virtual T &Front() = 0; virtual T &Front() = 0;
virtual bool Empty() = 0; virtual bool Empty() const = 0;
virtual bool Full() = 0; virtual bool Full() const = 0;
virtual size_t Size() = 0; virtual size_t Size() const = 0;
virtual size_t Capacity() = 0; virtual size_t Capacity() const = 0;
}; };
template <class T> template <class T>
class RingBufferImplThreadSafe : public AbstractRingBufferImpl<T> { class RingBufferImplThreadSafe : public AbstractRingBuffer<T> {
private:
boost::shared_mutex ring_buffer_mutex_;
boost::circular_buffer<T> buffer_;
public: public:
RingBufferImplThreadSafe(size_t size) : buffer_(size) {} RingBufferImplThreadSafe(size_t size) : buffer_(size) {}
virtual ~RingBufferImplThreadSafe() = default; virtual ~RingBufferImplThreadSafe() = default;
void Push(T &&t) {
boost::unique_lock<boost::shared_mutex> lock(ring_buffer_mutex_);
buffer_.push_back(t);
}
void Push(const T &t) { void Push(const T &t) {
boost::unique_lock<boost::shared_mutex> lock(ring_buffer_mutex_); boost::unique_lock<boost::shared_mutex> lock(ring_buffer_mutex_);
buffer_.push_back(t); buffer_.push_back(t);
@ -113,23 +104,27 @@ class RingBufferImplThreadSafe : public AbstractRingBufferImpl<T> {
boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_); boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_);
return buffer_.front(); return buffer_.front();
} }
bool Empty() { bool Empty() const {
boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_); boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_);
return buffer_.empty(); return buffer_.empty();
} }
bool Full() { bool Full() const {
boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_); boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_);
return buffer_.full(); return buffer_.full();
} }
size_t Size() { size_t Size() const {
boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_); boost::shared_lock<boost::shared_mutex> lock(ring_buffer_mutex_);
return buffer_.size(); return buffer_.size();
} }
size_t Capacity() { return buffer_.capacity(); } size_t Capacity() const { return buffer_.capacity(); }
private:
mutable boost::shared_mutex ring_buffer_mutex_;
boost::circular_buffer<T> buffer_;
}; };
template <class T> template <class T>
class RingBufferImplLockFree : public AbstractRingBufferImpl<T> { class RingBufferImplLockFree : public AbstractRingBuffer<T> {
private: private:
std::vector<T> buffer_; std::vector<T> buffer_;
std::atomic<size_t> capacity_; std::atomic<size_t> capacity_;
@ -141,12 +136,6 @@ class RingBufferImplLockFree : public AbstractRingBufferImpl<T> {
: buffer_(size, nullptr), capacity_(size), read_index_(0), write_index_(0) {} : buffer_(size, nullptr), capacity_(size), read_index_(0), write_index_(0) {}
virtual ~RingBufferImplLockFree() = default; virtual ~RingBufferImplLockFree() = default;
void Push(T &&t) {
STREAMING_CHECK(!Full());
buffer_[write_index_] = t;
write_index_ = IncreaseIndex(write_index_);
}
void Push(const T &t) { void Push(const T &t) {
STREAMING_CHECK(!Full()); STREAMING_CHECK(!Full());
buffer_[write_index_] = t; buffer_[write_index_] = t;
@ -163,13 +152,13 @@ class RingBufferImplLockFree : public AbstractRingBufferImpl<T> {
return buffer_[read_index_]; return buffer_[read_index_];
} }
bool Empty() { return write_index_ == read_index_; } bool Empty() const { return write_index_ == read_index_; }
bool Full() { return IncreaseIndex(write_index_) == read_index_; } bool Full() const { return IncreaseIndex(write_index_) == read_index_; }
size_t Size() { return (write_index_ + capacity_ - read_index_) % capacity_; } size_t Size() const { return (write_index_ + capacity_ - read_index_) % capacity_; }
size_t Capacity() { return capacity_; } size_t Capacity() const { return capacity_; }
private: private:
size_t IncreaseIndex(size_t index) const { return (index + 1) % capacity_; } size_t IncreaseIndex(size_t index) const { return (index + 1) % capacity_; }
@ -185,7 +174,7 @@ enum class StreamingRingBufferType : uint8_t { SPSC_LOCK, SPSC };
/// it cann't be removed currently. /// it cann't be removed currently.
class StreamingRingBuffer { class StreamingRingBuffer {
private: private:
std::shared_ptr<AbstractRingBufferImpl<StreamingMessagePtr>> message_buffer_; std::shared_ptr<AbstractRingBuffer<StreamingMessagePtr>> message_buffer_;
StreamingTransientBuffer transient_buffer_; StreamingTransientBuffer transient_buffer_;
@ -193,19 +182,17 @@ class StreamingRingBuffer {
explicit StreamingRingBuffer(size_t buf_size, StreamingRingBufferType buffer_type = explicit StreamingRingBuffer(size_t buf_size, StreamingRingBufferType buffer_type =
StreamingRingBufferType::SPSC_LOCK); StreamingRingBufferType::SPSC_LOCK);
bool Push(StreamingMessagePtr &&msg);
bool Push(const StreamingMessagePtr &msg); bool Push(const StreamingMessagePtr &msg);
StreamingMessagePtr &Front(); StreamingMessagePtr &Front();
void Pop(); void Pop();
bool IsFull(); bool IsFull() const;
bool IsEmpty(); bool IsEmpty() const;
size_t Size(); size_t Size() const;
size_t Capacity() const; size_t Capacity() const;

View file

@ -0,0 +1,95 @@
#include <thread>
#include "event_service.h"
#include "gtest/gtest.h"
using namespace ray::streaming;
/// Mock function for send empty message.
bool SendEmptyToChannel(ProducerChannelInfo *info) { return true; }
/// Mock function for write all messages to channel.
bool WriteAllToChannel(ProducerChannelInfo *info) { return true; }
bool stop = false;
TEST(EventServiceTest, Test1) {
std::shared_ptr<EventService> server = std::make_shared<EventService>();
ProducerChannelInfo mock_channel_info;
server->Register(EventType::EmptyEvent, SendEmptyToChannel);
server->Register(EventType::UserEvent, WriteAllToChannel);
server->Register(EventType::FlowEvent, WriteAllToChannel);
std::thread thread_empty([server, &mock_channel_info] {
std::chrono::milliseconds MockTimer(20);
while (true) {
if (stop) break;
Event event{&mock_channel_info, EventType::EmptyEvent, true};
server->Push(event);
std::this_thread::sleep_for(MockTimer);
}
});
std::thread thread_flow([server, &mock_channel_info] {
std::chrono::milliseconds MockTimer(2);
while (true) {
if (stop) break;
Event event{&mock_channel_info, EventType::FlowEvent, true};
server->Push(event);
std::this_thread::sleep_for(MockTimer);
}
});
std::thread thread_user([server, &mock_channel_info] {
std::chrono::milliseconds MockTimer(2);
while (true) {
if (stop) break;
Event event{&mock_channel_info, EventType::UserEvent, true};
server->Push(event);
std::this_thread::sleep_for(MockTimer);
}
});
server->Run();
std::this_thread::sleep_for(std::chrono::milliseconds(5 * 1000));
STREAMING_LOG(INFO) << "5 seconds passed.";
STREAMING_LOG(INFO) << "EventNums: " << server->EventNums();
stop = true;
STREAMING_LOG(INFO) << "Stop";
server->Stop();
thread_empty.join();
thread_flow.join();
thread_user.join();
}
TEST(EventServiceTest, remove_delete_channel_event) {
std::shared_ptr<EventService> server = std::make_shared<EventService>();
std::vector<ObjectID> channel_vec;
std::vector<ProducerChannelInfo> mock_channel_info_vec;
channel_vec.push_back(ObjectID::FromRandom());
ProducerChannelInfo mock_channel_info1;
mock_channel_info1.channel_id = channel_vec.back();
mock_channel_info_vec.push_back(mock_channel_info1);
ProducerChannelInfo mock_channel_info2;
channel_vec.push_back(ObjectID::FromRandom());
mock_channel_info2.channel_id = channel_vec.back();
mock_channel_info_vec.push_back(mock_channel_info2);
for (auto &id : mock_channel_info_vec) {
Event empty_event{&id, EventType::EmptyEvent, true};
Event user_event{&id, EventType::UserEvent, true};
Event flow_event{&id, EventType::FlowEvent, true};
server->Push(empty_event);
server->Push(user_event);
server->Push(flow_event);
}
std::vector<ObjectID> removed_vec(channel_vec.begin(), channel_vec.begin() + 1);
server->RemoveDestroyedChannelEvent(removed_vec);
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}