use help proto-init-macro for streaming config (#9272)

This commit is contained in:
Lingxuan Zuo 2020-07-24 17:59:33 +08:00 committed by GitHub
parent d4324a4a8f
commit 58a38e81d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,39 +13,34 @@ uint32_t StreamingConfig::DEFAULT_EMPTY_MESSAGE_TIME_INTERVAL = 20;
// Time to force clean if barrier in queue, default 0ms
const uint32_t StreamingConfig::MESSAGE_BUNDLE_MAX_SIZE = 2048;
#define RESET_IF_INT_CONF(KEY, VALUE) \
if (0 != VALUE) { \
Set##KEY(VALUE); \
}
#define RESET_IF_STR_CONF(KEY, VALUE) \
if (!VALUE.empty()) { \
Set##KEY(VALUE); \
}
#define RESET_IF_NOT_DEFAULT_CONF(KEY, VALUE, DEFAULT) \
if (DEFAULT != VALUE) { \
Set##KEY(VALUE); \
}
void StreamingConfig::FromProto(const uint8_t *data, uint32_t size) {
proto::StreamingConfig config;
STREAMING_CHECK(config.ParseFromArray(data, size)) << "Parse streaming conf failed";
if (!config.job_name().empty()) {
SetJobName(config.job_name());
}
if (!config.worker_name().empty()) {
SetWorkerName(config.worker_name());
}
if (!config.op_name().empty()) {
SetOpName(config.op_name());
}
if (config.role() != proto::NodeType::UNKNOWN) {
SetNodeType(config.role());
}
if (config.ring_buffer_capacity() != 0) {
SetRingBufferCapacity(config.ring_buffer_capacity());
}
if (config.empty_message_interval() != 0) {
SetEmptyMessageTimeInterval(config.empty_message_interval());
}
if (config.flow_control_type() != proto::FlowControlType::UNKNOWN_FLOW_CONTROL_TYPE) {
SetFlowControlType(config.flow_control_type());
}
if (config.writer_consumed_step() != 0) {
SetWriterConsumedStep(config.writer_consumed_step());
}
if (config.reader_consumed_step() != 0) {
SetReaderConsumedStep(config.reader_consumed_step());
}
if (config.event_driven_flow_control_interval()) {
SetReaderConsumedStep(config.event_driven_flow_control_interval());
}
RESET_IF_STR_CONF(JobName, config.job_name())
RESET_IF_STR_CONF(WorkerName, config.worker_name())
RESET_IF_STR_CONF(OpName, config.op_name())
RESET_IF_NOT_DEFAULT_CONF(NodeType, config.role(), proto::NodeType::UNKNOWN)
RESET_IF_INT_CONF(RingBufferCapacity, config.ring_buffer_capacity())
RESET_IF_INT_CONF(EmptyMessageTimeInterval, config.empty_message_interval())
RESET_IF_NOT_DEFAULT_CONF(FlowControlType, config.flow_control_type(),
proto::FlowControlType::UNKNOWN_FLOW_CONTROL_TYPE)
RESET_IF_INT_CONF(WriterConsumedStep, config.writer_consumed_step())
RESET_IF_INT_CONF(ReaderConsumedStep, config.reader_consumed_step())
RESET_IF_INT_CONF(EventDrivenFlowControlInterval,
config.event_driven_flow_control_interval())
STREAMING_CHECK(writer_consumed_step_ >= reader_consumed_step_)
<< "Writer consuemd step " << writer_consumed_step_
<< "can not be smaller then reader consumed step " << reader_consumed_step_;