From 9a4da1951eb19b2b36041442ff8b436bcdebf454 Mon Sep 17 00:00:00 2001 From: Tianyi Chen Date: Wed, 15 Jan 2020 13:12:03 +0800 Subject: [PATCH] [Streaming] Streaming scheduler - part1-1: job graph (#6712) --- .../api/collector/CollectionCollector.java | 1 - .../api/context/StreamingContext.java | 26 +++-- .../partition/impl/BroadcastPartition.java | 1 - .../PlanEdge.java => jobgraph/JobEdge.java} | 8 +- .../org/ray/streaming/jobgraph/JobGraph.java | 103 ++++++++++++++++++ .../JobGraphBuilder.java} | 43 +++++--- .../JobVertex.java} | 21 ++-- .../{plan => jobgraph}/VertexType.java | 2 +- .../streaming/operator/StreamOperator.java | 9 +- .../java/org/ray/streaming/plan/Plan.java | 58 ---------- .../ray/streaming/schedule/JobScheduler.java | 7 +- .../jobgraph/JobGraphBuilderTest.java | 96 ++++++++++++++++ .../ray/streaming/plan/PlanBuilderTest.java | 87 --------------- .../runtime/cluster/ResourceManager.java | 1 - .../runtime/core/graph/ExecutionEdge.java | 1 - .../runtime/core/graph/ExecutionGraph.java | 1 - .../runtime/core/graph/ExecutionNode.java | 3 +- .../runtime/core/graph/ExecutionTask.java | 1 - .../runtime/schedule/JobSchedulerImpl.java | 18 +-- .../runtime/schedule/TaskAssigner.java | 4 +- .../runtime/schedule/TaskAssignerImpl.java | 34 +++--- .../streaming/runtime/worker/JobWorker.java | 4 +- .../worker/context/RayRuntimeContext.java | 7 +- .../runtime/worker/context/WorkerContext.java | 6 +- .../streaming/runtime/demo/WordCountTest.java | 6 +- .../schedule/TaskAssignerImplTest.java | 18 +-- .../streamingqueue/StreamingQueueTest.java | 6 +- 27 files changed, 322 insertions(+), 250 deletions(-) rename streaming/java/streaming-api/src/main/java/org/ray/streaming/{plan/PlanEdge.java => jobgraph/JobEdge.java} (80%) create mode 100644 streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java rename streaming/java/streaming-api/src/main/java/org/ray/streaming/{plan/PlanBuilder.java => jobgraph/JobGraphBuilder.java} (52%) rename streaming/java/streaming-api/src/main/java/org/ray/streaming/{plan/PlanVertex.java => jobgraph/JobVertex.java} (59%) rename streaming/java/streaming-api/src/main/java/org/ray/streaming/{plan => jobgraph}/VertexType.java (74%) delete mode 100644 streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/Plan.java create mode 100644 streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java delete mode 100644 streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/collector/CollectionCollector.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/collector/CollectionCollector.java index 536d33e05..33b0fcd4c 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/collector/CollectionCollector.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/collector/CollectionCollector.java @@ -1,7 +1,6 @@ package org.ray.streaming.api.collector; import java.util.List; -import org.ray.streaming.api.collector.Collector; import org.ray.streaming.message.Record; /** diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/context/StreamingContext.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/context/StreamingContext.java index 24c526e69..20aa57d60 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/context/StreamingContext.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/context/StreamingContext.java @@ -10,8 +10,8 @@ import java.util.Map; import java.util.ServiceLoader; import java.util.concurrent.atomic.AtomicInteger; import org.ray.streaming.api.stream.StreamSink; -import org.ray.streaming.plan.Plan; -import org.ray.streaming.plan.PlanBuilder; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobGraphBuilder; import org.ray.streaming.schedule.JobScheduler; /** @@ -20,15 +20,21 @@ import org.ray.streaming.schedule.JobScheduler; public class StreamingContext implements Serializable { private transient AtomicInteger idGenerator; + /** * The sinks of this streaming job. */ private List streamSinks; - private Map jobConfig; + + /** + * The user custom streaming job configuration. + */ + private Map jobConfig; + /** * The logic plan. */ - private Plan plan; + private JobGraph jobGraph; private StreamingContext() { this.idGenerator = new AtomicInteger(0); @@ -43,17 +49,17 @@ public class StreamingContext implements Serializable { /** * Construct job DAG, and execute the job. */ - public void execute() { - PlanBuilder planBuilder = new PlanBuilder(this.streamSinks); - this.plan = planBuilder.buildPlan(); - plan.printPlan(); + public void execute(String jobName) { + JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName); + this.jobGraph = jobGraphBuilder.build(); + jobGraph.printJobGraph(); ServiceLoader serviceLoader = ServiceLoader.load(JobScheduler.class); Iterator iterator = serviceLoader.iterator(); Preconditions.checkArgument(iterator.hasNext(), "No JobScheduler implementation has been provided."); JobScheduler jobSchedule = iterator.next(); - jobSchedule.schedule(plan, jobConfig); + jobSchedule.schedule(jobGraph, jobConfig); } public int generateId() { @@ -64,7 +70,7 @@ public class StreamingContext implements Serializable { streamSinks.add(streamSink); } - public void withConfig(Map jobConfig) { + public void withConfig(Map jobConfig) { this.jobConfig = jobConfig; } } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java index a08ab0d9d..c148c4ec5 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java @@ -1,7 +1,6 @@ package org.ray.streaming.api.partition.impl; import java.util.stream.IntStream; - import org.ray.streaming.api.partition.Partition; /** diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanEdge.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobEdge.java similarity index 80% rename from streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanEdge.java rename to streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobEdge.java index aee61ec00..7aa0e1dc7 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanEdge.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobEdge.java @@ -1,18 +1,18 @@ -package org.ray.streaming.plan; +package org.ray.streaming.jobgraph; import java.io.Serializable; import org.ray.streaming.api.partition.Partition; /** - * PlanEdge is connection and partition rules of upstream and downstream execution nodes. + * Job edge is connection and partition rules of upstream and downstream execution nodes. */ -public class PlanEdge implements Serializable { +public class JobEdge implements Serializable { private int srcVertexId; private int targetVertexId; private Partition partition; - public PlanEdge(int srcVertexId, int targetVertexId, Partition partition) { + public JobEdge(int srcVertexId, int targetVertexId, Partition partition) { this.srcVertexId = srcVertexId; this.targetVertexId = targetVertexId; this.partition = partition; diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java new file mode 100644 index 000000000..920c98a63 --- /dev/null +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraph.java @@ -0,0 +1,103 @@ +package org.ray.streaming.jobgraph; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job graph, the logical plan of streaming job. + */ +public class JobGraph implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(JobGraph.class); + + private final String jobName; + private final Map jobConfig; + private List jobVertexList; + private List jobEdgeList; + private String digraph; + + public JobGraph(String jobName, Map jobConfig) { + this.jobName = jobName; + this.jobConfig = jobConfig; + this.jobVertexList = new ArrayList<>(); + this.jobEdgeList = new ArrayList<>(); + } + + /** + * Generate direct-graph(made up of a set of vertices and connected by edges) + * by current job graph for simple log printing. + * @return Digraph in string type. + * + * Notice: + * This is temporarily implemented in hard code. + * May use 'guru.nidi:graphviz-java' as 3rd dependency in the future if needed. + */ + public String generateDigraph() { + StringBuilder digraph = new StringBuilder(); + digraph.append("digraph ").append(jobName + " ").append(" {"); + + for (JobEdge jobEdge: jobEdgeList) { + String srcNode = null; + String targetNode = null; + for (JobVertex jobVertex : jobVertexList) { + if (jobEdge.getSrcVertexId() == jobVertex.getVertexId()) { + srcNode = jobVertex.getVertexId() + "-" + jobVertex.getStreamOperator().getName(); + } else if (jobEdge.getTargetVertexId() == jobVertex.getVertexId()) { + targetNode = jobVertex.getVertexId() + "-" + jobVertex.getStreamOperator().getName(); + } + } + digraph.append(System.getProperty("line.separator")); + digraph.append(srcNode).append(" -> ").append(targetNode); + } + digraph.append(System.getProperty("line.separator")).append("}"); + + this.digraph = digraph.toString(); + return this.digraph; + } + + public void addVertex(JobVertex vertex) { + this.jobVertexList.add(vertex); + } + + public void addEdge(JobEdge jobEdge) { + this.jobEdgeList.add(jobEdge); + } + + public List getJobVertexList() { + return jobVertexList; + } + + public List getJobEdgeList() { + return jobEdgeList; + } + + public String getDigraph() { + return digraph; + } + + public String getJobName() { + return jobName; + } + + public Map getJobConfig() { + return jobConfig; + } + + public void printJobGraph() { + if (!LOG.isInfoEnabled()) { + return; + } + LOG.info("Printing job graph:"); + for (JobVertex jobVertex : jobVertexList) { + LOG.info(jobVertex.toString()); + } + for (JobEdge jobEdge : jobEdgeList) { + LOG.info(jobEdge.toString()); + } + } + +} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java similarity index 52% rename from streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java rename to streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java index 3b12cca46..c0e5c5b3a 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanBuilder.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobGraphBuilder.java @@ -1,6 +1,8 @@ -package org.ray.streaming.plan; +package org.ray.streaming.jobgraph; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.ray.streaming.api.stream.DataStream; import org.ray.streaming.api.stream.Stream; @@ -9,24 +11,33 @@ import org.ray.streaming.api.stream.StreamSource; import org.ray.streaming.operator.StreamOperator; import org.ray.streaming.python.stream.PythonDataStream; -public class PlanBuilder { +public class JobGraphBuilder { - private Plan plan; + private JobGraph jobGraph; private AtomicInteger edgeIdGenerator; private List streamSinkList; - public PlanBuilder(List streamSinkList) { - this.plan = new Plan(); + public JobGraphBuilder(List streamSinkList) { + this(streamSinkList, "job-" + System.currentTimeMillis()); + } + + public JobGraphBuilder(List streamSinkList, String jobName) { + this(streamSinkList, jobName, new HashMap<>()); + } + + public JobGraphBuilder(List streamSinkList, String jobName, + Map jobConfig) { + this.jobGraph = new JobGraph(jobName, jobConfig); this.streamSinkList = streamSinkList; this.edgeIdGenerator = new AtomicInteger(0); } - public Plan buildPlan() { + public JobGraph build() { for (StreamSink streamSink : streamSinkList) { processStream(streamSink); } - return this.plan; + return this.jobGraph; } private void processStream(Stream stream) { @@ -34,26 +45,26 @@ public class PlanBuilder { int parallelism = stream.getParallelism(); StreamOperator streamOperator = stream.getOperator(); - PlanVertex planVertex = null; + JobVertex jobVertex = null; if (stream instanceof StreamSink) { - planVertex = new PlanVertex(vertexId, parallelism, VertexType.SINK, streamOperator); + jobVertex = new JobVertex(vertexId, parallelism, VertexType.SINK, streamOperator); Stream parentStream = stream.getInputStream(); int inputVertexId = parentStream.getId(); - PlanEdge planEdge = new PlanEdge(inputVertexId, vertexId, parentStream.getPartition()); - this.plan.addEdge(planEdge); + JobEdge jobEdge = new JobEdge(inputVertexId, vertexId, parentStream.getPartition()); + this.jobGraph.addEdge(jobEdge); processStream(parentStream); } else if (stream instanceof StreamSource) { - planVertex = new PlanVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator); + jobVertex = new JobVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator); } else if (stream instanceof DataStream || stream instanceof PythonDataStream) { - planVertex = new PlanVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator); + jobVertex = new JobVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator); Stream parentStream = stream.getInputStream(); int inputVertexId = parentStream.getId(); - PlanEdge planEdge = new PlanEdge(inputVertexId, vertexId, parentStream.getPartition()); - this.plan.addEdge(planEdge); + JobEdge jobEdge = new JobEdge(inputVertexId, vertexId, parentStream.getPartition()); + this.jobGraph.addEdge(jobEdge); processStream(parentStream); } - this.plan.addVertex(planVertex); + this.jobGraph.addVertex(jobVertex); } private int getEdgeId() { diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanVertex.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java similarity index 59% rename from streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanVertex.java rename to streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java index e5c88b227..ebb736ecb 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/PlanVertex.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/JobVertex.java @@ -1,19 +1,20 @@ -package org.ray.streaming.plan; +package org.ray.streaming.jobgraph; +import com.google.common.base.MoreObjects; import java.io.Serializable; import org.ray.streaming.operator.StreamOperator; /** - * PlanVertex is a cell node where logic is executed. + * Job vertex is a cell node where logic is executed. */ -public class PlanVertex implements Serializable { +public class JobVertex implements Serializable { private int vertexId; private int parallelism; private VertexType vertexType; private StreamOperator streamOperator; - public PlanVertex(int vertexId, int parallelism, VertexType vertexType, + public JobVertex(int vertexId, int parallelism, VertexType vertexType, StreamOperator streamOperator) { this.vertexId = vertexId; this.parallelism = parallelism; @@ -39,11 +40,11 @@ public class PlanVertex implements Serializable { @Override public String toString() { - return "PlanVertex{" + - "vertexId=" + vertexId + - ", parallelism=" + parallelism + - ", vertexType=" + vertexType + - ", streamOperator=" + streamOperator + - '}'; + return MoreObjects.toStringHelper(this) + .add("vertexId", vertexId) + .add("parallelism", parallelism) + .add("vertexType", vertexType) + .add("streamOperator", streamOperator) + .toString(); } } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/VertexType.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java similarity index 74% rename from streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/VertexType.java rename to streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java index b52e0d3be..fabc0acf0 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/VertexType.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/jobgraph/VertexType.java @@ -1,4 +1,4 @@ -package org.ray.streaming.plan; +package org.ray.streaming.jobgraph; /** * Different roles for a node. diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java index 77e4bb2f7..74894dfa5 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/operator/StreamOperator.java @@ -9,29 +9,33 @@ import org.ray.streaming.message.Record; public abstract class StreamOperator implements Operator { + protected String name; protected F function; protected List collectorList; protected RuntimeContext runtimeContext; public StreamOperator(F function) { + this.name = getClass().getSimpleName(); this.function = function; } + @Override public void open(List collectorList, RuntimeContext runtimeContext) { this.collectorList = collectorList; this.runtimeContext = runtimeContext; } + @Override public void finish() { } + @Override public void close() { } - protected void collect(Record record) { for (Collector collector : this.collectorList) { collector.collect(record); @@ -44,4 +48,7 @@ public abstract class StreamOperator implements Operator { } } + public String getName() { + return name; + } } diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/Plan.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/Plan.java deleted file mode 100644 index 7e86a5645..000000000 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/plan/Plan.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.ray.streaming.plan; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The logical execution plan. - */ -public class Plan implements Serializable { - - private static final Logger LOGGER = LoggerFactory.getLogger(Plan.class); - - private List planVertexList; - private List planEdgeList; - - public Plan() { - this.planVertexList = new ArrayList<>(); - this.planEdgeList = new ArrayList<>(); - } - - public void addVertex(PlanVertex vertex) { - this.planVertexList.add(vertex); - } - - public void addEdge(PlanEdge planEdge) { - this.planEdgeList.add(planEdge); - } - - public List getPlanVertexList() { - return planVertexList; - } - - public List getPlanEdgeList() { - return planEdgeList; - } - - public String getGraphVizPlan() { - return ""; - } - - public void printPlan() { - if (!LOGGER.isInfoEnabled()) { - return; - } - LOGGER.info("Printing logic plan:"); - for (PlanVertex planVertex : planVertexList) { - LOGGER.info(planVertex.toString()); - } - for (PlanEdge planEdge : planEdgeList) { - LOGGER.info(planEdge.toString()); - } - } - -} diff --git a/streaming/java/streaming-api/src/main/java/org/ray/streaming/schedule/JobScheduler.java b/streaming/java/streaming-api/src/main/java/org/ray/streaming/schedule/JobScheduler.java index 86539b432..5b7846af8 100644 --- a/streaming/java/streaming-api/src/main/java/org/ray/streaming/schedule/JobScheduler.java +++ b/streaming/java/streaming-api/src/main/java/org/ray/streaming/schedule/JobScheduler.java @@ -2,8 +2,7 @@ package org.ray.streaming.schedule; import java.util.Map; - -import org.ray.streaming.plan.Plan; +import org.ray.streaming.jobgraph.JobGraph; /** * Interface of the job scheduler. @@ -13,7 +12,7 @@ public interface JobScheduler { /** * Assign logical plan to physical execution graph, and schedule job to run. * - * @param plan The logical plan. + * @param jobGraph The logical plan. */ - void schedule(Plan plan, Map conf); + void schedule(JobGraph jobGraph, Map conf); } diff --git a/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java b/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java new file mode 100644 index 000000000..0a3951a9c --- /dev/null +++ b/streaming/java/streaming-api/src/test/java/org/ray/streaming/jobgraph/JobGraphBuilderTest.java @@ -0,0 +1,96 @@ +package org.ray.streaming.jobgraph; + +import com.google.common.collect.Lists; +import java.util.List; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.partition.impl.KeyPartition; +import org.ray.streaming.api.partition.impl.RoundRobinPartition; +import org.ray.streaming.api.stream.DataStream; +import org.ray.streaming.api.stream.DataStreamSource; +import org.ray.streaming.api.stream.StreamSink; +import org.ray.streaming.api.stream.StreamSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class JobGraphBuilderTest { + + private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilderTest.class); + + @Test + public void testDataSync() { + JobGraph jobGraph = buildDataSyncJobGraph(); + List jobVertexList = jobGraph.getJobVertexList(); + List jobEdgeList = jobGraph.getJobEdgeList(); + + Assert.assertEquals(jobVertexList.size(), 2); + Assert.assertEquals(jobEdgeList.size(), 1); + + JobEdge jobEdge = jobEdgeList.get(0); + Assert.assertEquals(jobEdge.getPartition().getClass(), RoundRobinPartition.class); + + JobVertex sinkVertex = jobVertexList.get(1); + JobVertex sourceVertex = jobVertexList.get(0); + Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK); + Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE); + + } + + public JobGraph buildDataSyncJobGraph() { + StreamingContext streamingContext = StreamingContext.buildContext(); + DataStream dataStream = DataStreamSource.buildSource(streamingContext, + Lists.newArrayList("a", "b", "c")); + StreamSink streamSink = dataStream.sink(x -> LOG.info(x)); + JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink)); + + JobGraph jobGraph = jobGraphBuilder.build(); + return jobGraph; + } + + @Test + public void testKeyByJobGraph() { + JobGraph jobGraph = buildKeyByJobGraph(); + List jobVertexList = jobGraph.getJobVertexList(); + List jobEdgeList = jobGraph.getJobEdgeList(); + + Assert.assertEquals(jobVertexList.size(), 3); + Assert.assertEquals(jobEdgeList.size(), 2); + + JobVertex source = jobVertexList.get(0); + JobVertex map = jobVertexList.get(1); + JobVertex sink = jobVertexList.get(2); + + Assert.assertEquals(source.getVertexType(), VertexType.SOURCE); + Assert.assertEquals(map.getVertexType(), VertexType.PROCESS); + Assert.assertEquals(sink.getVertexType(), VertexType.SINK); + + JobEdge keyBy2Sink = jobEdgeList.get(0); + JobEdge source2KeyBy = jobEdgeList.get(1); + + Assert.assertEquals(keyBy2Sink.getPartition().getClass(), KeyPartition.class); + Assert.assertEquals(source2KeyBy.getPartition().getClass(), RoundRobinPartition.class); + } + + public JobGraph buildKeyByJobGraph() { + StreamingContext streamingContext = StreamingContext.buildContext(); + DataStream dataStream = DataStreamSource.buildSource(streamingContext, + Lists.newArrayList("1", "2", "3", "4")); + StreamSink streamSink = dataStream.keyBy(x -> x) + .sink(x -> LOG.info(x)); + JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink)); + + JobGraph jobGraph = jobGraphBuilder.build(); + return jobGraph; + } + + @Test + public void testJobGraphViz() { + JobGraph jobGraph = buildKeyByJobGraph(); + jobGraph.generateDigraph(); + String diGraph = jobGraph.getDigraph(); + System.out.println(diGraph); + Assert.assertTrue(diGraph.contains("1-SourceOperator -> 2-KeyByOperator")); + Assert.assertTrue(diGraph.contains("2-KeyByOperator -> 3-SinkOperator")); + } +} \ No newline at end of file diff --git a/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java b/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java deleted file mode 100644 index 0e02f8c7b..000000000 --- a/streaming/java/streaming-api/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java +++ /dev/null @@ -1,87 +0,0 @@ -package org.ray.streaming.plan; - - -import com.google.common.collect.Lists; -import org.ray.streaming.api.context.StreamingContext; -import org.ray.streaming.api.partition.impl.KeyPartition; -import org.ray.streaming.api.partition.impl.RoundRobinPartition; -import org.ray.streaming.api.stream.DataStream; -import org.ray.streaming.api.stream.DataStreamSink; -import org.ray.streaming.api.stream.DataStreamSource; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.Test; - -public class PlanBuilderTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(PlanBuilderTest.class); - - @Test - public void testDataSync() { - Plan plan = buildDataSyncPlan(); - List planVertexList = plan.getPlanVertexList(); - List planEdgeList = plan.getPlanEdgeList(); - - Assert.assertEquals(planVertexList.size(), 2); - Assert.assertEquals(planEdgeList.size(), 1); - - PlanEdge planEdge = planEdgeList.get(0); - Assert.assertEquals(planEdge.getPartition().getClass(), RoundRobinPartition.class); - - PlanVertex sinkVertex = planVertexList.get(1); - PlanVertex sourceVertex = planVertexList.get(0); - Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK); - Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE); - - } - - public Plan buildDataSyncPlan() { - StreamingContext streamingContext = StreamingContext.buildContext(); - DataStream dataStream = DataStreamSource.buildSource(streamingContext, - Lists.newArrayList("a", "b", "c")); - DataStreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); - PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); - - Plan plan = planBuilder.buildPlan(); - return plan; - } - - @Test - public void testKeyByPlan() { - Plan plan = buildKeyByPlan(); - List planVertexList = plan.getPlanVertexList(); - List planEdgeList = plan.getPlanEdgeList(); - - Assert.assertEquals(planVertexList.size(), 3); - Assert.assertEquals(planEdgeList.size(), 2); - - PlanVertex source = planVertexList.get(0); - PlanVertex map = planVertexList.get(1); - PlanVertex sink = planVertexList.get(2); - - Assert.assertEquals(source.getVertexType(), VertexType.SOURCE); - Assert.assertEquals(map.getVertexType(), VertexType.PROCESS); - Assert.assertEquals(sink.getVertexType(), VertexType.SINK); - - PlanEdge keyBy2Sink = planEdgeList.get(0); - PlanEdge source2KeyBy = planEdgeList.get(1); - - Assert.assertEquals(keyBy2Sink.getPartition().getClass(), KeyPartition.class); - Assert.assertEquals(source2KeyBy.getPartition().getClass(), RoundRobinPartition.class); - } - - public Plan buildKeyByPlan() { - StreamingContext streamingContext = StreamingContext.buildContext(); - DataStream dataStream = DataStreamSource.buildSource(streamingContext, - Lists.newArrayList("1", "2", "3", "4")); - DataStreamSink streamSink = dataStream.keyBy(x -> x) - .sink(x -> LOGGER.info(x)); - PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); - - Plan plan = planBuilder.buildPlan(); - return plan; - } - -} \ No newline at end of file diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/cluster/ResourceManager.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/cluster/ResourceManager.java index 0a113da76..73fe0b621 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/cluster/ResourceManager.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/cluster/ResourceManager.java @@ -2,7 +2,6 @@ package org.ray.streaming.runtime.cluster; import java.util.ArrayList; import java.util.List; - import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.streaming.runtime.worker.JobWorker; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java index 966834d4d..c74e94b22 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionEdge.java @@ -1,7 +1,6 @@ package org.ray.streaming.runtime.core.graph; import java.io.Serializable; - import org.ray.streaming.api.partition.Partition; /** diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java index a6f850361..56a3ecdb1 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionGraph.java @@ -6,7 +6,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; - import org.ray.api.RayActor; import org.ray.streaming.runtime.worker.JobWorker; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java index 019583810..7c550d885 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionNode.java @@ -3,13 +3,14 @@ package org.ray.streaming.runtime.core.graph; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.ray.streaming.jobgraph.VertexType; import org.ray.streaming.operator.StreamOperator; -import org.ray.streaming.plan.VertexType; /** * A node in the physical execution graph. */ public class ExecutionNode implements Serializable { + private int nodeId; private int parallelism; private NodeType nodeType; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java index d56c30532..afc831841 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/ExecutionTask.java @@ -1,7 +1,6 @@ package org.ray.streaming.runtime.core.graph; import java.io.Serializable; - import org.ray.api.RayActor; import org.ray.streaming.runtime.worker.JobWorker; diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java index b93d2bf46..a6fb38bb9 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/JobSchedulerImpl.java @@ -6,8 +6,8 @@ import java.util.Map; import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; -import org.ray.streaming.plan.Plan; -import org.ray.streaming.plan.PlanVertex; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobVertex; import org.ray.streaming.runtime.cluster.ResourceManager; import org.ray.streaming.runtime.core.graph.ExecutionGraph; import org.ray.streaming.runtime.core.graph.ExecutionNode; @@ -21,8 +21,8 @@ import org.ray.streaming.schedule.JobScheduler; * from ResourceManager. */ public class JobSchedulerImpl implements JobScheduler { - private Plan plan; - private Map jobConfig; + private JobGraph jobGraph; + private Map jobConfig; private ResourceManager resourceManager; private TaskAssigner taskAssigner; @@ -35,14 +35,14 @@ public class JobSchedulerImpl implements JobScheduler { * Schedule physical plan to execution graph, and call streaming worker to init and run. */ @Override - public void schedule(Plan plan, Map jobConfig) { + public void schedule(JobGraph jobGraph, Map jobConfig) { this.jobConfig = jobConfig; - this.plan = plan; + this.jobGraph = jobGraph; System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); Ray.init(); List> workers = this.resourceManager.createWorkers(getPlanWorker()); - ExecutionGraph executionGraph = this.taskAssigner.assign(this.plan, workers); + ExecutionGraph executionGraph = this.taskAssigner.assign(this.jobGraph, workers); List executionNodes = executionGraph.getExecutionNodeList(); List> waits = new ArrayList<>(); @@ -59,7 +59,7 @@ public class JobSchedulerImpl implements JobScheduler { } private int getPlanWorker() { - List planVertexList = plan.getPlanVertexList(); - return planVertexList.stream().map(PlanVertex::getParallelism).reduce(0, Integer::sum); + List jobVertexList = jobGraph.getJobVertexList(); + return jobVertexList.stream().map(JobVertex::getParallelism).reduce(0, Integer::sum); } } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssigner.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssigner.java index f826bdd4b..9927b6ae6 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssigner.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssigner.java @@ -3,7 +3,7 @@ package org.ray.streaming.runtime.schedule; import java.io.Serializable; import java.util.List; import org.ray.api.RayActor; -import org.ray.streaming.plan.Plan; +import org.ray.streaming.jobgraph.JobGraph; import org.ray.streaming.runtime.core.graph.ExecutionGraph; import org.ray.streaming.runtime.worker.JobWorker; @@ -15,6 +15,6 @@ public interface TaskAssigner extends Serializable { /** * Assign logical plan to physical execution graph. */ - ExecutionGraph assign(Plan plan, List> workers); + ExecutionGraph assign(JobGraph jobGraph, List> workers); } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java index 2c6b70a9c..4e0d2e31d 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/schedule/TaskAssignerImpl.java @@ -6,9 +6,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.ray.api.RayActor; -import org.ray.streaming.plan.Plan; -import org.ray.streaming.plan.PlanEdge; -import org.ray.streaming.plan.PlanVertex; +import org.ray.streaming.jobgraph.JobEdge; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobVertex; import org.ray.streaming.runtime.core.graph.ExecutionEdge; import org.ray.streaming.runtime.core.graph.ExecutionGraph; import org.ray.streaming.runtime.core.graph.ExecutionNode; @@ -20,37 +20,37 @@ public class TaskAssignerImpl implements TaskAssigner { /** * Assign an optimized logical plan to execution graph. * - * @param plan The logical plan. + * @param jobGraph The logical plan. * @param workers The worker actors. * @return The physical execution graph. */ @Override - public ExecutionGraph assign(Plan plan, List> workers) { - List planVertices = plan.getPlanVertexList(); - List planEdges = plan.getPlanEdgeList(); + public ExecutionGraph assign(JobGraph jobGraph, List> workers) { + List jobVertices = jobGraph.getJobVertexList(); + List jobEdges = jobGraph.getJobEdgeList(); int taskId = 0; Map idToExecutionNode = new HashMap<>(); - for (PlanVertex planVertex : planVertices) { - ExecutionNode executionNode = new ExecutionNode(planVertex.getVertexId(), - planVertex.getParallelism()); - executionNode.setNodeType(planVertex.getVertexType()); + for (JobVertex jobVertex : jobVertices) { + ExecutionNode executionNode = new ExecutionNode(jobVertex.getVertexId(), + jobVertex.getParallelism()); + executionNode.setNodeType(jobVertex.getVertexType()); List vertexTasks = new ArrayList<>(); - for (int taskIndex = 0; taskIndex < planVertex.getParallelism(); taskIndex++) { + for (int taskIndex = 0; taskIndex < jobVertex.getParallelism(); taskIndex++) { vertexTasks.add(new ExecutionTask(taskId, taskIndex, workers.get(taskId))); taskId++; } executionNode.setExecutionTasks(vertexTasks); - executionNode.setStreamOperator(planVertex.getStreamOperator()); + executionNode.setStreamOperator(jobVertex.getStreamOperator()); idToExecutionNode.put(executionNode.getNodeId(), executionNode); } - for (PlanEdge planEdge : planEdges) { - int srcNodeId = planEdge.getSrcVertexId(); - int targetNodeId = planEdge.getTargetVertexId(); + for (JobEdge jobEdge : jobEdges) { + int srcNodeId = jobEdge.getSrcVertexId(); + int targetNodeId = jobEdge.getTargetVertexId(); ExecutionEdge executionEdge = new ExecutionEdge(srcNodeId, targetNodeId, - planEdge.getPartition()); + jobEdge.getPartition()); idToExecutionNode.get(srcNodeId).addExecutionEdge(executionEdge); idToExecutionNode.get(targetNodeId).addInputEdge(executionEdge); } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java index 47950b0cf..6fabfc4e3 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/JobWorker.java @@ -36,7 +36,7 @@ public class JobWorker implements Serializable { } private int taskId; - private Map config; + private Map config; private WorkerContext workerContext; private ExecutionNode executionNode; private ExecutionTask executionTask; @@ -88,7 +88,7 @@ public class JobWorker implements Serializable { return taskId; } - public Map getConfig() { + public Map getConfig() { return config; } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/RayRuntimeContext.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/RayRuntimeContext.java index e6779733c..b4d89c3ae 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/RayRuntimeContext.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/RayRuntimeContext.java @@ -3,7 +3,6 @@ package org.ray.streaming.runtime.worker.context; import static org.ray.streaming.util.Config.STREAMING_BATCH_MAX_COUNT; import java.util.Map; - import org.ray.streaming.api.context.RuntimeContext; import org.ray.streaming.runtime.core.graph.ExecutionTask; @@ -16,16 +15,16 @@ public class RayRuntimeContext implements RuntimeContext { private int parallelism; private Long batchId; private final Long maxBatch; - private Map config; + private Map config; - public RayRuntimeContext(ExecutionTask executionTask, Map config, + public RayRuntimeContext(ExecutionTask executionTask, Map config, int parallelism) { this.taskId = executionTask.getTaskId(); this.config = config; this.taskIndex = executionTask.getTaskIndex(); this.parallelism = parallelism; if (config.containsKey(STREAMING_BATCH_MAX_COUNT)) { - this.maxBatch = Long.valueOf(String.valueOf(config.get(STREAMING_BATCH_MAX_COUNT))); + this.maxBatch = Long.valueOf(config.get(STREAMING_BATCH_MAX_COUNT)); } else { this.maxBatch = Long.MAX_VALUE; } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/WorkerContext.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/WorkerContext.java index 567909f81..8ef3e0d24 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/WorkerContext.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/worker/context/WorkerContext.java @@ -11,9 +11,9 @@ public class WorkerContext implements Serializable { private int taskId; private ExecutionGraph executionGraph; - private Map config; + private Map config; - public WorkerContext(int taskId, ExecutionGraph executionGraph, Map jobConfig) { + public WorkerContext(int taskId, ExecutionGraph executionGraph, Map jobConfig) { this.taskId = taskId; this.executionGraph = executionGraph; this.config = jobConfig; @@ -35,7 +35,7 @@ public class WorkerContext implements Serializable { this.executionGraph = executionGraph; } - public Map getConfig() { + public Map getConfig() { return config; } } diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java index 40246abdc..76acd657b 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/demo/WordCountTest.java @@ -30,8 +30,8 @@ public class WordCountTest extends BaseUnitTest implements Serializable { @Test public void testWordCount() { StreamingContext streamingContext = StreamingContext.buildContext(); - Map config = new HashMap<>(); - config.put(Config.STREAMING_BATCH_MAX_COUNT, 1); + Map config = new HashMap<>(); + config.put(Config.STREAMING_BATCH_MAX_COUNT, "1"); config.put(Config.CHANNEL_TYPE, Config.MEMORY_CHANNEL); streamingContext.withConfig(config); List text = new ArrayList<>(); @@ -50,7 +50,7 @@ public class WordCountTest extends BaseUnitTest implements Serializable { .sink((SinkFunction) result -> wordCount.put(result.word, result.count)); - streamingContext.execute(); + streamingContext.execute("testWordCount"); // Sleep until the count for every word is computed. while (wordCount.size() < 3) { diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java index c9f108d8b..94ef3c80a 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/TaskAssignerImplTest.java @@ -19,8 +19,8 @@ import org.ray.streaming.runtime.core.graph.ExecutionGraph; import org.ray.streaming.runtime.core.graph.ExecutionNode; import org.ray.streaming.runtime.core.graph.ExecutionNode.NodeType; import org.ray.streaming.runtime.worker.JobWorker; -import org.ray.streaming.plan.Plan; -import org.ray.streaming.plan.PlanBuilder; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobGraphBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -32,15 +32,15 @@ public class TaskAssignerImplTest extends BaseUnitTest { @Test public void testTaskAssignImpl() { - Plan plan = buildDataSyncPlan(); + JobGraph jobGraph = buildDataSyncPlan(); List> workers = new ArrayList<>(); - for(int i = 0; i < plan.getPlanVertexList().size(); i++) { + for(int i = 0; i < jobGraph.getJobVertexList().size(); i++) { workers.add(new LocalModeRayActor(ActorId.fromRandom(), ObjectId.fromRandom())); } TaskAssigner taskAssigner = new TaskAssignerImpl(); - ExecutionGraph executionGraph = taskAssigner.assign(plan, workers); + ExecutionGraph executionGraph = taskAssigner.assign(jobGraph, workers); List executionNodeList = executionGraph.getExecutionNodeList(); @@ -63,14 +63,14 @@ public class TaskAssignerImplTest extends BaseUnitTest { Assert.assertEquals(sinkNode.getOutputEdges().size(), 0); } - public Plan buildDataSyncPlan() { + public JobGraph buildDataSyncPlan() { StreamingContext streamingContext = StreamingContext.buildContext(); DataStream dataStream = DataStreamSource.buildSource(streamingContext, Lists.newArrayList("a", "b", "c")); DataStreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); - PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); + JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(Lists.newArrayList(streamSink)); - Plan plan = planBuilder.buildPlan(); - return plan; + JobGraph jobGraph = jobGraphBuilder.build(); + return jobGraph; } } diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java index be4c1399a..e65f9d8e2 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/streamingqueue/StreamingQueueTest.java @@ -151,8 +151,8 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { Map wordCount = new ConcurrentHashMap<>(); StreamingContext streamingContext = StreamingContext.buildContext(); - Map config = new HashMap<>(); - config.put(Config.STREAMING_BATCH_MAX_COUNT, 1); + Map config = new HashMap<>(); + config.put(Config.STREAMING_BATCH_MAX_COUNT, "1"); config.put(Config.CHANNEL_TYPE, Config.NATIVE_CHANNEL); config.put(Config.CHANNEL_SIZE, "100000"); streamingContext.withConfig(config); @@ -177,7 +177,7 @@ public class StreamingQueueTest extends BaseUnitTest implements Serializable { serializeResultToFile(resultFile, wordCount); }); - streamingContext.execute(); + streamingContext.execute("testWordCount"); Map checkWordCount = (Map) deserializeResultFromFile(resultFile);