The type of process data.
+ * @param Type of the specific operator class.
+ */
+public abstract class StreamProcessor implements Processor {
+
+ protected List collectors;
+ protected RuntimeContext runtimeContext;
+ protected P operator;
+
+ public StreamProcessor(P operator) {
+ this.operator = operator;
+ }
+
+ @Override
+ public void open(List collectors, RuntimeContext runtimeContext) {
+ this.collectors = collectors;
+ this.runtimeContext = runtimeContext;
+ if (operator != null) {
+ this.operator.open(collectors, runtimeContext);
+ }
+ }
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java
new file mode 100644
index 000000000..88094b0c8
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java
@@ -0,0 +1,37 @@
+package org.ray.streaming.core.processor;
+
+import org.ray.streaming.message.Record;
+import org.ray.streaming.operator.TwoInputOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwoInputProcessor extends StreamProcessor> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwoInputProcessor.class);
+
+ // TODO(zhenxuanpan): Set leftStream and rightStream.
+ private String leftStream;
+ private String rigthStream;
+
+ public TwoInputProcessor(TwoInputOperator operator) {
+ super(operator);
+ }
+
+ @Override
+ public void process(Record record) {
+ try {
+ if (record.getStream().equals(leftStream)) {
+ this.operator.processElement(record, null);
+ } else {
+ this.operator.processElement(null, record);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ this.operator.close();
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java
new file mode 100644
index 000000000..292d05b5b
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java
@@ -0,0 +1,86 @@
+package org.ray.streaming.core.runtime;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.ray.api.annotation.RayRemote;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.core.command.BatchInfo;
+import org.ray.streaming.core.graph.ExecutionEdge;
+import org.ray.streaming.core.graph.ExecutionGraph;
+import org.ray.streaming.core.graph.ExecutionNode;
+import org.ray.streaming.core.graph.ExecutionNode.NodeType;
+import org.ray.streaming.core.graph.ExecutionTask;
+import org.ray.streaming.core.processor.MasterProcessor;
+import org.ray.streaming.core.processor.StreamProcessor;
+import org.ray.streaming.core.runtime.collector.RayCallCollector;
+import org.ray.streaming.core.runtime.context.RayRuntimeContext;
+import org.ray.streaming.core.runtime.context.RuntimeContext;
+import org.ray.streaming.core.runtime.context.WorkerContext;
+import org.ray.streaming.message.Message;
+import org.ray.streaming.message.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The stream worker, it is a ray actor.
+ */
+@RayRemote
+public class StreamWorker implements Serializable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamWorker.class);
+
+ private int taskId;
+ private WorkerContext workerContext;
+ private StreamProcessor streamProcessor;
+ private NodeType nodeType;
+
+ public StreamWorker() {
+ }
+
+ public Boolean init(WorkerContext workerContext) {
+ this.workerContext = workerContext;
+ this.taskId = workerContext.getTaskId();
+ ExecutionGraph executionGraph = this.workerContext.getExecutionGraph();
+ ExecutionTask executionTask = executionGraph.getExecutionTaskByTaskId(taskId);
+ ExecutionNode executionNode = executionGraph.getExecutionNodeByTaskId(taskId);
+
+ this.nodeType = executionNode.getNodeType();
+ this.streamProcessor = executionNode.getStreamProcessor();
+ LOGGER.debug("Initializing StreamWorker, taskId: {}, operator: {}.", taskId, streamProcessor);
+
+ List executionEdges = executionNode.getExecutionEdgeList();
+
+ List collectors = new ArrayList<>();
+ for (ExecutionEdge executionEdge : executionEdges) {
+ collectors.add(new RayCallCollector(taskId, executionEdge, executionGraph));
+ }
+
+ RuntimeContext runtimeContext = new RayRuntimeContext(executionTask, workerContext.getConfig(),
+ executionNode.getParallelism());
+ if (this.nodeType == NodeType.MASTER) {
+ ((MasterProcessor) streamProcessor).open(collectors, runtimeContext, executionGraph);
+ } else {
+ this.streamProcessor.open(collectors, runtimeContext);
+ }
+ return true;
+ }
+
+ public Boolean process(Message message) {
+ LOGGER.debug("Processing message, taskId: {}, message: {}.", taskId, message);
+ if (nodeType == NodeType.SOURCE) {
+ Record record = message.getRecord(0);
+ BatchInfo batchInfo = (BatchInfo) record.getValue();
+ this.streamProcessor.process(batchInfo.getBatchId());
+ } else {
+ List records = message.getRecordList();
+ for (Record record : records) {
+ record.setBatchId(message.getBatchId());
+ record.setStream(message.getStream());
+ this.streamProcessor.process(record);
+ }
+ }
+ return true;
+ }
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java
new file mode 100644
index 000000000..03ef391d2
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java
@@ -0,0 +1,26 @@
+package org.ray.streaming.core.runtime.collector;
+
+import java.util.List;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.message.Record;
+
+/**
+ * Combination of multiple collectors.
+ *
+ * @param The type of output data.
+ */
+public class CollectionCollector implements Collector {
+
+ private List collectorList;
+
+ public CollectionCollector(List collectorList) {
+ this.collectorList = collectorList;
+ }
+
+ @Override
+ public void collect(T value) {
+ for (Collector collector : collectorList) {
+ collector.collect(new Record(value));
+ }
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java
new file mode 100644
index 000000000..e5331a44e
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java
@@ -0,0 +1,58 @@
+package org.ray.streaming.core.runtime.collector;
+
+import java.util.Arrays;
+import java.util.Map;
+import org.ray.api.Ray;
+import org.ray.api.RayActor;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.api.partition.Partition;
+import org.ray.streaming.core.graph.ExecutionEdge;
+import org.ray.streaming.core.graph.ExecutionGraph;
+import org.ray.streaming.core.runtime.StreamWorker;
+import org.ray.streaming.message.Message;
+import org.ray.streaming.message.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The collector that emits data via Ray remote calls.
+ */
+public class RayCallCollector implements Collector {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RayCallCollector.class);
+
+ private int taskId;
+ private String stream;
+ private Map> taskId2Worker;
+ private int[] targetTaskIds;
+ private Partition partition;
+
+ public RayCallCollector(int taskId, ExecutionEdge executionEdge, ExecutionGraph executionGraph) {
+ this.taskId = taskId;
+ this.stream = executionEdge.getStream();
+ int targetNodeId = executionEdge.getTargetNodeId();
+ taskId2Worker = executionGraph
+ .getTaskId2WorkerByNodeId(targetNodeId);
+ targetTaskIds = Arrays.stream(taskId2Worker.keySet()
+ .toArray(new Integer[taskId2Worker.size()]))
+ .mapToInt(Integer::valueOf).toArray();
+
+ this.partition = executionEdge.getPartition();
+ LOGGER.debug("RayCallCollector constructed, taskId:{}, add stream:{}, partition:{}.",
+ taskId, stream, this.partition);
+ }
+
+ @Override
+ public void collect(Record record) {
+ int[] taskIds = this.partition.partition(record, targetTaskIds);
+ LOGGER.debug("Sending data from task {} to remote tasks {}, collector stream:{}, record:{}",
+ taskId, taskIds, stream, record);
+ Message message = new Message(taskId, record.getBatchId(), stream, record);
+ for (int targetTaskId : taskIds) {
+ RayActor streamWorker = this.taskId2Worker.get(targetTaskId);
+ // Use ray call to send message to downstream actor.
+ Ray.call(StreamWorker::process, streamWorker, message);
+ }
+ }
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java
new file mode 100644
index 000000000..6d796c30e
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java
@@ -0,0 +1,58 @@
+package org.ray.streaming.core.runtime.context;
+
+import static org.ray.streaming.util.ConfigKey.STREAMING_MAX_BATCH_COUNT;
+
+import java.util.Map;
+import org.ray.streaming.core.graph.ExecutionTask;
+
+/**
+ * Use Ray to implement RuntimeContext.
+ */
+public class RayRuntimeContext implements RuntimeContext {
+
+ private int taskId;
+ private int taskIndex;
+ private int parallelism;
+ private Long batchId;
+ private Map config;
+
+ public RayRuntimeContext(ExecutionTask executionTask, Map config,
+ int parallelism) {
+ this.taskId = executionTask.getTaskId();
+ this.config = config;
+ this.taskIndex = executionTask.getTaskIndex();
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public int getTaskId() {
+ return taskId;
+ }
+
+ @Override
+ public int getTaskIndex() {
+ return taskIndex;
+ }
+
+ @Override
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ @Override
+ public Long getBatchId() {
+ return batchId;
+ }
+
+ @Override
+ public Long getMaxBatch() {
+ if (config.containsKey(STREAMING_MAX_BATCH_COUNT)) {
+ return Long.valueOf(String.valueOf(config.get(STREAMING_MAX_BATCH_COUNT)));
+ }
+ return Long.MAX_VALUE;
+ }
+
+ public void setBatchId(Long batchId) {
+ this.batchId = batchId;
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java
new file mode 100644
index 000000000..984987e29
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java
@@ -0,0 +1,19 @@
+package org.ray.streaming.core.runtime.context;
+
+/**
+ * Encapsulate the runtime information of a streaming task.
+ */
+public interface RuntimeContext {
+
+ int getTaskId();
+
+ int getTaskIndex();
+
+ int getParallelism();
+
+ Long getBatchId();
+
+ Long getMaxBatch();
+
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java
new file mode 100644
index 000000000..39117fd7b
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java
@@ -0,0 +1,41 @@
+package org.ray.streaming.core.runtime.context;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.ray.streaming.core.graph.ExecutionGraph;
+
+/**
+ * Encapsulate the context information for worker initialization.
+ */
+public class WorkerContext implements Serializable {
+
+ private int taskId;
+ private ExecutionGraph executionGraph;
+ private Map config;
+
+ public WorkerContext(int taskId, ExecutionGraph executionGraph, Map jobConfig) {
+ this.taskId = taskId;
+ this.executionGraph = executionGraph;
+ this.config = jobConfig;
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+
+ public ExecutionGraph getExecutionGraph() {
+ return executionGraph;
+ }
+
+ public void setExecutionGraph(ExecutionGraph executionGraph) {
+ this.executionGraph = executionGraph;
+ }
+
+ public Map getConfig() {
+ return config;
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java b/java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java
new file mode 100644
index 000000000..de4fb2767
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java
@@ -0,0 +1,20 @@
+package org.ray.streaming.message;
+
+
+public class KeyRecord extends Record {
+
+ private K key;
+
+ public KeyRecord(K key, T value) {
+ super(value);
+ this.key = key;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public void setKey(K key) {
+ this.key = key;
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/message/Message.java b/java/streaming/src/main/java/org/ray/streaming/message/Message.java
new file mode 100644
index 000000000..164aba7d5
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/message/Message.java
@@ -0,0 +1,64 @@
+package org.ray.streaming.message;
+
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.List;
+
+public class Message implements Serializable {
+
+ private int taskId;
+ private long batchId;
+ private String stream;
+ private List recordList;
+
+ public Message(int taskId, long batchId, String stream, List recordList) {
+ this.taskId = taskId;
+ this.batchId = batchId;
+ this.stream = stream;
+ this.recordList = recordList;
+ }
+
+ public Message(int taskId, long batchId, String stream, Record record) {
+ this.taskId = taskId;
+ this.batchId = batchId;
+ this.stream = stream;
+ this.recordList = Lists.newArrayList(record);
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+
+ public long getBatchId() {
+ return batchId;
+ }
+
+ public void setBatchId(long batchId) {
+ this.batchId = batchId;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public List getRecordList() {
+ return recordList;
+ }
+
+ public void setRecordList(List recordList) {
+ this.recordList = recordList;
+ }
+
+ public Record getRecord(int index) {
+ return recordList.get(0);
+ }
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/message/Record.java b/java/streaming/src/main/java/org/ray/streaming/message/Record.java
new file mode 100644
index 000000000..5898fc63a
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/message/Record.java
@@ -0,0 +1,50 @@
+package org.ray.streaming.message;
+
+import java.io.Serializable;
+
+
+public class Record implements Serializable {
+
+ protected transient String stream;
+ protected transient long batchId;
+ protected T value;
+
+ public Record(T value) {
+ this.value = value;
+ }
+
+ public Record(long batchId, T value) {
+ this.batchId = batchId;
+ this.value = value;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public long getBatchId() {
+ return batchId;
+ }
+
+ public void setBatchId(long batchId) {
+ this.batchId = batchId;
+ }
+
+ @Override
+ public String toString() {
+ return value.toString();
+ }
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java
new file mode 100644
index 000000000..fdc620740
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java
@@ -0,0 +1,13 @@
+package org.ray.streaming.operator;
+
+import org.ray.streaming.message.Record;
+
+
+public interface OneInputOperator extends Operator {
+
+ void processElement(Record record) throws Exception;
+
+ default OperatorType getOpType() {
+ return OperatorType.ONE_INPUT;
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/Operator.java b/java/streaming/src/main/java/org/ray/streaming/operator/Operator.java
new file mode 100644
index 000000000..46e8f2b25
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/Operator.java
@@ -0,0 +1,17 @@
+package org.ray.streaming.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.core.runtime.context.RuntimeContext;
+
+public interface Operator extends Serializable {
+
+ void open(List collectors, RuntimeContext runtimeContext);
+
+ void finish();
+
+ void close();
+
+ OperatorType getOpType();
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java b/java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java
new file mode 100644
index 000000000..cc8f56406
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java
@@ -0,0 +1,9 @@
+package org.ray.streaming.operator;
+
+
+public enum OperatorType {
+ MASTER,
+ SOURCE,
+ ONE_INPUT,
+ TWO_INPUT,
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java
new file mode 100644
index 000000000..5294e9115
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java
@@ -0,0 +1,47 @@
+package org.ray.streaming.operator;
+
+import java.util.List;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.api.function.Function;
+import org.ray.streaming.core.runtime.context.RuntimeContext;
+import org.ray.streaming.message.KeyRecord;
+import org.ray.streaming.message.Record;
+
+public abstract class StreamOperator implements Operator {
+
+ protected F function;
+ protected List collectorList;
+ protected RuntimeContext runtimeContext;
+
+
+ public StreamOperator(F function) {
+ this.function = function;
+ }
+
+ public void open(List collectorList, RuntimeContext runtimeContext) {
+ this.collectorList = collectorList;
+ this.runtimeContext = runtimeContext;
+ }
+
+ public void finish() {
+
+ }
+
+ public void close() {
+
+ }
+
+
+ protected void collect(Record record) {
+ for (Collector collector : this.collectorList) {
+ collector.collect(record);
+ }
+ }
+
+ protected void collect(KeyRecord keyRecord) {
+ for (Collector collector : this.collectorList) {
+ collector.collect(keyRecord);
+ }
+ }
+
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java
new file mode 100644
index 000000000..5528b0875
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java
@@ -0,0 +1,13 @@
+package org.ray.streaming.operator;
+
+import org.ray.streaming.message.Record;
+
+
+public interface TwoInputOperator extends Operator {
+
+ void processElement(Record record1, Record record2);
+
+ default OperatorType getOpType() {
+ return OperatorType.TWO_INPUT;
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java
new file mode 100644
index 000000000..556c17077
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java
@@ -0,0 +1,31 @@
+package org.ray.streaming.operator.impl;
+
+import java.util.List;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.api.function.impl.FlatMapFunction;
+import org.ray.streaming.core.runtime.collector.CollectionCollector;
+import org.ray.streaming.core.runtime.context.RuntimeContext;
+import org.ray.streaming.message.Record;
+import org.ray.streaming.operator.OneInputOperator;
+import org.ray.streaming.operator.StreamOperator;
+
+public class FlatMapOperator extends StreamOperator> implements
+ OneInputOperator {
+
+ private CollectionCollector collectionCollector;
+
+ public FlatMapOperator(FlatMapFunction flatMapFunction) {
+ super(flatMapFunction);
+ }
+
+ @Override
+ public void open(List collectorList, RuntimeContext runtimeContext) {
+ super.open(collectorList, runtimeContext);
+ this.collectionCollector = new CollectionCollector(collectorList);
+ }
+
+ @Override
+ public void processElement(Record record) throws Exception {
+ this.function.flatMap(record.getValue(), (Collector) collectionCollector);
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java
new file mode 100644
index 000000000..5ace0b13f
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java
@@ -0,0 +1,22 @@
+package org.ray.streaming.operator.impl;
+
+import org.ray.streaming.api.function.impl.KeyFunction;
+import org.ray.streaming.message.KeyRecord;
+import org.ray.streaming.message.Record;
+import org.ray.streaming.operator.OneInputOperator;
+import org.ray.streaming.operator.StreamOperator;
+
+public class KeyByOperator extends StreamOperator> implements
+ OneInputOperator {
+
+ public KeyByOperator(KeyFunction keyFunction) {
+ super(keyFunction);
+ }
+
+ @Override
+ public void processElement(Record record) throws Exception {
+ K key = this.function.keyBy(record.getValue());
+ collect(new KeyRecord<>(key, record.getValue()));
+ }
+}
+
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java
new file mode 100644
index 000000000..7fc260479
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java
@@ -0,0 +1,20 @@
+package org.ray.streaming.operator.impl;
+
+import org.ray.streaming.api.function.impl.MapFunction;
+import org.ray.streaming.message.Record;
+import org.ray.streaming.operator.OneInputOperator;
+import org.ray.streaming.operator.StreamOperator;
+
+
+public class MapOperator extends StreamOperator> implements
+ OneInputOperator {
+
+ public MapOperator(MapFunction mapFunction) {
+ super(mapFunction);
+ }
+
+ @Override
+ public void processElement(Record record) throws Exception {
+ this.collect(new Record(this.function.map(record.getValue())));
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java
new file mode 100644
index 000000000..963a0d11b
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java
@@ -0,0 +1,17 @@
+package org.ray.streaming.operator.impl;
+
+import org.ray.streaming.operator.OperatorType;
+import org.ray.streaming.operator.StreamOperator;
+
+
+public class MasterOperator extends StreamOperator {
+
+ public MasterOperator() {
+ super(null);
+ }
+
+ @Override
+ public OperatorType getOpType() {
+ return OperatorType.MASTER;
+ }
+}
diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java
new file mode 100644
index 000000000..6aa7ff892
--- /dev/null
+++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java
@@ -0,0 +1,44 @@
+package org.ray.streaming.operator.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.ray.streaming.api.collector.Collector;
+import org.ray.streaming.api.function.impl.ReduceFunction;
+import org.ray.streaming.core.runtime.context.RuntimeContext;
+import org.ray.streaming.message.KeyRecord;
+import org.ray.streaming.message.Record;
+import org.ray.streaming.operator.OneInputOperator;
+import org.ray.streaming.operator.StreamOperator;
+
+public class ReduceOperator extends StreamOperator> implements
+ OneInputOperator {
+
+ private Map