From 58a38e81d18093dc06dd2e414a3a2b2b8a84d2c0 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Fri, 24 Jul 2020 17:59:33 +0800 Subject: [PATCH] use help proto-init-macro for streaming config (#9272) --- streaming/src/config/streaming_config.cc | 55 +++++++++++------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/streaming/src/config/streaming_config.cc b/streaming/src/config/streaming_config.cc index da39eb960..8a89ad673 100644 --- a/streaming/src/config/streaming_config.cc +++ b/streaming/src/config/streaming_config.cc @@ -13,39 +13,34 @@ uint32_t StreamingConfig::DEFAULT_EMPTY_MESSAGE_TIME_INTERVAL = 20; // Time to force clean if barrier in queue, default 0ms const uint32_t StreamingConfig::MESSAGE_BUNDLE_MAX_SIZE = 2048; +#define RESET_IF_INT_CONF(KEY, VALUE) \ + if (0 != VALUE) { \ + Set##KEY(VALUE); \ + } +#define RESET_IF_STR_CONF(KEY, VALUE) \ + if (!VALUE.empty()) { \ + Set##KEY(VALUE); \ + } +#define RESET_IF_NOT_DEFAULT_CONF(KEY, VALUE, DEFAULT) \ + if (DEFAULT != VALUE) { \ + Set##KEY(VALUE); \ + } + void StreamingConfig::FromProto(const uint8_t *data, uint32_t size) { proto::StreamingConfig config; STREAMING_CHECK(config.ParseFromArray(data, size)) << "Parse streaming conf failed"; - if (!config.job_name().empty()) { - SetJobName(config.job_name()); - } - if (!config.worker_name().empty()) { - SetWorkerName(config.worker_name()); - } - if (!config.op_name().empty()) { - SetOpName(config.op_name()); - } - if (config.role() != proto::NodeType::UNKNOWN) { - SetNodeType(config.role()); - } - if (config.ring_buffer_capacity() != 0) { - SetRingBufferCapacity(config.ring_buffer_capacity()); - } - if (config.empty_message_interval() != 0) { - SetEmptyMessageTimeInterval(config.empty_message_interval()); - } - if (config.flow_control_type() != proto::FlowControlType::UNKNOWN_FLOW_CONTROL_TYPE) { - SetFlowControlType(config.flow_control_type()); - } - if (config.writer_consumed_step() != 0) { - SetWriterConsumedStep(config.writer_consumed_step()); - } - if (config.reader_consumed_step() != 0) { - SetReaderConsumedStep(config.reader_consumed_step()); - } - if (config.event_driven_flow_control_interval()) { - SetReaderConsumedStep(config.event_driven_flow_control_interval()); - } + RESET_IF_STR_CONF(JobName, config.job_name()) + RESET_IF_STR_CONF(WorkerName, config.worker_name()) + RESET_IF_STR_CONF(OpName, config.op_name()) + RESET_IF_NOT_DEFAULT_CONF(NodeType, config.role(), proto::NodeType::UNKNOWN) + RESET_IF_INT_CONF(RingBufferCapacity, config.ring_buffer_capacity()) + RESET_IF_INT_CONF(EmptyMessageTimeInterval, config.empty_message_interval()) + RESET_IF_NOT_DEFAULT_CONF(FlowControlType, config.flow_control_type(), + proto::FlowControlType::UNKNOWN_FLOW_CONTROL_TYPE) + RESET_IF_INT_CONF(WriterConsumedStep, config.writer_consumed_step()) + RESET_IF_INT_CONF(ReaderConsumedStep, config.reader_consumed_step()) + RESET_IF_INT_CONF(EventDrivenFlowControlInterval, + config.event_driven_flow_control_interval()) STREAMING_CHECK(writer_consumed_step_ >= reader_consumed_step_) << "Writer consuemd step " << writer_consumed_step_ << "can not be smaller then reader consumed step " << reader_consumed_step_;