From 5efb21e1d0d75ee2d4189bd9707a432b8045a036 Mon Sep 17 00:00:00 2001 From: ppeagle Date: Sat, 30 Mar 2019 19:32:05 +0800 Subject: [PATCH] Initial commit for Ray streaming (#4268) --- java/BUILD.bazel | 39 +++++ java/pom.xml | 1 + java/streaming/pom.xml | 42 ++++++ .../streaming/api/collector/Collector.java | 13 ++ .../api/context/StreamingContext.java | 66 +++++++++ .../ray/streaming/api/function/Function.java | 10 ++ .../api/function/impl/AggregateFunction.java | 23 +++ .../api/function/impl/FlatMapFunction.java | 16 +++ .../api/function/impl/JoinFunction.java | 17 +++ .../api/function/impl/KeyFunction.java | 15 ++ .../api/function/impl/MapFunction.java | 15 ++ .../api/function/impl/ProcessFunction.java | 14 ++ .../api/function/impl/ReduceFunction.java | 14 ++ .../api/function/impl/SinkFunction.java | 14 ++ .../api/function/impl/SourceFunction.java | 23 +++ .../internal/CollectionSourceFunction.java | 34 +++++ .../streaming/api/partition/Partition.java | 21 +++ .../partition/impl/BroadcastPartition.java | 17 +++ .../api/partition/impl/KeyPartition.java | 20 +++ .../partition/impl/RoundRobinPartition.java | 24 ++++ .../ray/streaming/api/stream/DataStream.java | 136 ++++++++++++++++++ .../ray/streaming/api/stream/JoinStream.java | 82 +++++++++++ .../streaming/api/stream/KeyDataStream.java | 53 +++++++ .../org/ray/streaming/api/stream/Stream.java | 71 +++++++++ .../ray/streaming/api/stream/StreamSink.java | 21 +++ .../streaming/api/stream/StreamSource.java | 35 +++++ .../ray/streaming/api/stream/UnionStream.java | 25 ++++ .../streaming/cluster/ResourceManager.java | 20 +++ .../ray/streaming/core/command/BatchInfo.java | 20 +++ .../streaming/core/graph/ExecutionEdge.java | 48 +++++++ .../streaming/core/graph/ExecutionGraph.java | 73 ++++++++++ .../streaming/core/graph/ExecutionNode.java | 98 +++++++++++++ .../streaming/core/graph/ExecutionTask.java | 47 ++++++ .../core/processor/MasterProcessor.java | 101 +++++++++++++ .../core/processor/OneInputProcessor.java | 29 ++++ .../core/processor/ProcessBuilder.java | 33 +++++ .../streaming/core/processor/Processor.java | 15 ++ .../core/processor/SourceProcessor.java | 25 ++++ .../core/processor/StreamProcessor.java | 33 +++++ .../core/processor/TwoInputProcessor.java | 37 +++++ .../streaming/core/runtime/StreamWorker.java | 86 +++++++++++ .../collector/CollectionCollector.java | 26 ++++ .../runtime/collector/RayCallCollector.java | 58 ++++++++ .../runtime/context/RayRuntimeContext.java | 58 ++++++++ .../core/runtime/context/RuntimeContext.java | 19 +++ .../core/runtime/context/WorkerContext.java | 41 ++++++ .../org/ray/streaming/message/KeyRecord.java | 20 +++ .../org/ray/streaming/message/Message.java | 64 +++++++++ .../org/ray/streaming/message/Record.java | 50 +++++++ .../streaming/operator/OneInputOperator.java | 13 ++ .../org/ray/streaming/operator/Operator.java | 17 +++ .../ray/streaming/operator/OperatorType.java | 9 ++ .../streaming/operator/StreamOperator.java | 47 ++++++ .../streaming/operator/TwoInputOperator.java | 13 ++ .../operator/impl/FlatMapOperator.java | 31 ++++ .../operator/impl/KeyByOperator.java | 22 +++ .../streaming/operator/impl/MapOperator.java | 20 +++ .../operator/impl/MasterOperator.java | 17 +++ .../operator/impl/ReduceOperator.java | 44 ++++++ .../streaming/operator/impl/SinkOperator.java | 20 +++ .../operator/impl/SourceOperator.java | 61 ++++++++ .../java/org/ray/streaming/plan/Plan.java | 58 ++++++++ .../org/ray/streaming/plan/PlanBuilder.java | 62 ++++++++ .../java/org/ray/streaming/plan/PlanEdge.java | 50 +++++++ .../org/ray/streaming/plan/PlanVertex.java | 49 +++++++ .../org/ray/streaming/plan/VertexType.java | 11 ++ .../ray/streaming/schedule/IJobSchedule.java | 17 +++ .../ray/streaming/schedule/ITaskAssign.java | 20 +++ .../schedule/impl/JobScheduleImpl.java | 93 ++++++++++++ .../schedule/impl/TaskAssignImpl.java | 66 +++++++++ .../org/ray/streaming/util/ConfigKey.java | 10 ++ .../src/main/resources/log4j.properties | 6 + java/streaming/src/main/resources/ray.conf | 5 + .../org/ray/streaming/demo/WordCountTest.java | 75 ++++++++++ .../ray/streaming/plan/PlanBuilderTest.java | 87 +++++++++++ .../schedule/impl/TaskAssignImplTest.java | 58 ++++++++ .../src/test/resources/log4j.properties | 6 + java/streaming/src/test/resources/ray.conf | 3 + java/streaming/testng.xml | 9 ++ java/test.sh | 23 +-- 80 files changed, 2873 insertions(+), 11 deletions(-) create mode 100644 java/streaming/pom.xml create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/collector/Collector.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/context/StreamingContext.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/Function.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/AggregateFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/FlatMapFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/JoinFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/KeyFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/MapFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/ProcessFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/ReduceFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/SinkFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/impl/SourceFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/function/internal/CollectionSourceFunction.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/partition/Partition.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/partition/impl/KeyPartition.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/partition/impl/RoundRobinPartition.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/DataStream.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/JoinStream.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/Stream.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSink.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSource.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/api/stream/UnionStream.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/cluster/ResourceManager.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/command/BatchInfo.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionEdge.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionGraph.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionNode.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionTask.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/MasterProcessor.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/OneInputProcessor.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/ProcessBuilder.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/Processor.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/SourceProcessor.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/StreamProcessor.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/message/Message.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/message/Record.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/Operator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/SinkOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/operator/impl/SourceOperator.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/plan/Plan.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/plan/PlanBuilder.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/plan/PlanEdge.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/plan/PlanVertex.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/plan/VertexType.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/schedule/IJobSchedule.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/schedule/ITaskAssign.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/schedule/impl/JobScheduleImpl.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/schedule/impl/TaskAssignImpl.java create mode 100644 java/streaming/src/main/java/org/ray/streaming/util/ConfigKey.java create mode 100644 java/streaming/src/main/resources/log4j.properties create mode 100644 java/streaming/src/main/resources/ray.conf create mode 100644 java/streaming/src/test/java/org/ray/streaming/demo/WordCountTest.java create mode 100644 java/streaming/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java create mode 100644 java/streaming/src/test/java/org/ray/streaming/schedule/impl/TaskAssignImplTest.java create mode 100644 java/streaming/src/test/resources/log4j.properties create mode 100644 java/streaming/src/test/resources/ray.conf create mode 100644 java/streaming/testng.xml diff --git a/java/BUILD.bazel b/java/BUILD.bazel index 54c55f4fe..d20847272 100644 --- a/java/BUILD.bazel +++ b/java/BUILD.bazel @@ -4,6 +4,7 @@ exports_files([ "testng.xml", "checkstyle.xml", "checkstyle-suppressions.xml", + "streaming/testng.xml", ]) java_binary( @@ -21,14 +22,20 @@ java_import( "liborg_ray_ray_runtime-src.jar", "liborg_ray_ray_tutorial.jar", "liborg_ray_ray_tutorial-src.jar", + "liborg_ray_ray_streaming.jar", + "liborg_ray_ray_streaming-src.jar", "all_tests_deploy.jar", "all_tests_deploy-src.jar", + "streaming_tests_deploy.jar", + "streaming_tests_deploy-src.jar", ], deps = [ ":org_ray_ray_api", ":org_ray_ray_runtime", ":org_ray_ray_tutorial", + ":org_ray_ray_streaming", ":all_tests", + ":streaming_tests", ], ) @@ -103,6 +110,28 @@ define_java_module( ], ) +define_java_module( + name = "streaming", + deps = [ + ":org_ray_ray_api", + ":org_ray_ray_runtime", + "@com_google_guava_guava//jar", + "@org_slf4j_slf4j_api//jar", + "@org_slf4j_slf4j_log4j12//jar", + ], + define_test_lib = True, + test_deps = [ + ":org_ray_ray_api", + ":org_ray_ray_runtime", + ":org_ray_ray_streaming", + "@com_beust_jcommander//jar", + "@com_google_guava_guava//jar", + "@org_slf4j_slf4j_api//jar", + "@org_slf4j_slf4j_log4j12//jar", + "@org_testng_testng//jar", + ], +) + java_binary( name = "all_tests", main_class = "org.testng.TestNG", @@ -114,6 +143,16 @@ java_binary( ], ) +java_binary( + name = "streaming_tests", + main_class = "org.testng.TestNG", + data = ["streaming/testng.xml"], + args = ["java/streaming/testng.xml"], + runtime_deps = [ + ":org_ray_ray_streaming_test", + ], +) + flatbuffers_generated_files = [ "ActorCheckpointData.java", "ActorCheckpointIdData.java", diff --git a/java/pom.xml b/java/pom.xml index fe2c1d71c..ce5ffa2fa 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -12,6 +12,7 @@ api runtime test + streaming tutorial diff --git a/java/streaming/pom.xml b/java/streaming/pom.xml new file mode 100644 index 000000000..c95976373 --- /dev/null +++ b/java/streaming/pom.xml @@ -0,0 +1,42 @@ + + + + org.ray + ray-superpom + 0.1-SNAPSHOT + + 4.0.0 + + streaming + ray streaming + ray streaming + + jar + + + + org.ray + ray-api + ${project.version} + + + org.ray + ray-runtime + ${project.version} + + + org.slf4j + slf4j-log4j12 + + + com.google.guava + guava + + + org.testng + testng + + + diff --git a/java/streaming/src/main/java/org/ray/streaming/api/collector/Collector.java b/java/streaming/src/main/java/org/ray/streaming/api/collector/Collector.java new file mode 100644 index 000000000..481f3e95f --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/collector/Collector.java @@ -0,0 +1,13 @@ +package org.ray.streaming.api.collector; + +/** + * The collector that collects data from an upstream operator, and emits data to downstream + * operators. + * + * @param Type of the data to collect. + */ +public interface Collector { + + void collect(T value); + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/context/StreamingContext.java b/java/streaming/src/main/java/org/ray/streaming/api/context/StreamingContext.java new file mode 100644 index 000000000..aaffa5021 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/context/StreamingContext.java @@ -0,0 +1,66 @@ +package org.ray.streaming.api.context; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.ray.api.Ray; +import org.ray.streaming.api.stream.StreamSink; +import org.ray.streaming.plan.Plan; +import org.ray.streaming.plan.PlanBuilder; +import org.ray.streaming.schedule.IJobSchedule; +import org.ray.streaming.schedule.impl.JobScheduleImpl; + +/** + * Encapsulate the context information of a streaming Job. + */ +public class StreamingContext implements Serializable { + + private transient AtomicInteger idGenerator; + /** + * The sinks of this streaming job. + */ + private List streamSinks; + private Map jobConfig; + /** + * The logic plan. + */ + private Plan plan; + + private StreamingContext() { + this.idGenerator = new AtomicInteger(0); + this.streamSinks = new ArrayList<>(); + this.jobConfig = new HashMap(); + } + + public static StreamingContext buildContext() { + Ray.init(); + return new StreamingContext(); + } + + /** + * Construct job DAG, and execute the job. + */ + public void execute() { + PlanBuilder planBuilder = new PlanBuilder(this.streamSinks); + this.plan = planBuilder.buildPlan(); + plan.printPlan(); + + IJobSchedule jobSchedule = new JobScheduleImpl(jobConfig); + jobSchedule.schedule(plan); + } + + public int generateId() { + return this.idGenerator.incrementAndGet(); + } + + public void addSink(StreamSink streamSink) { + streamSinks.add(streamSink); + } + + public void withConfig(Map jobConfig) { + this.jobConfig = jobConfig; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/Function.java b/java/streaming/src/main/java/org/ray/streaming/api/function/Function.java new file mode 100644 index 000000000..35fe994b7 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/Function.java @@ -0,0 +1,10 @@ +package org.ray.streaming.api.function; + +import java.io.Serializable; + +/** + * Interface of streaming functions. + */ +public interface Function extends Serializable { + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/AggregateFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/AggregateFunction.java new file mode 100644 index 000000000..a7a8032e1 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/AggregateFunction.java @@ -0,0 +1,23 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of aggregate functions. + * + * @param Type of the input data. + * @param Type of the intermediate data. + * @param Type of the output data. + */ +public interface AggregateFunction extends Function { + + A createAccumulator(); + + void add(I value, A accumulator); + + O getResult(A accumulator); + + A merge(A a, A b); + + void retract(A acc, I value); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/FlatMapFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/FlatMapFunction.java new file mode 100644 index 000000000..20b2a91de --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/FlatMapFunction.java @@ -0,0 +1,16 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.api.function.Function; + +/** + * Interface of flat-map functions. + * + * @param Type of the input data. + * @param Type of the output data. + */ +@FunctionalInterface +public interface FlatMapFunction extends Function { + + void flatMap(T value, Collector collector); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/JoinFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/JoinFunction.java new file mode 100644 index 000000000..8dad87a90 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/JoinFunction.java @@ -0,0 +1,17 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of join functions. + * + * @param Type of the left input data. + * @param Type of the right input data. + * @param Type of the output data. + */ +@FunctionalInterface +public interface JoinFunction extends Function { + + R join(T left, O right); + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/KeyFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/KeyFunction.java new file mode 100644 index 000000000..4c3c28904 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/KeyFunction.java @@ -0,0 +1,15 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of key-by functions. + * + * @param Type of the input data. + * @param Type of the key-by field. + */ +@FunctionalInterface +public interface KeyFunction extends Function { + + K keyBy(T value); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/MapFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/MapFunction.java new file mode 100644 index 000000000..88a487fc7 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/MapFunction.java @@ -0,0 +1,15 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of map functions. + * + * @param type of the input data. + * @param type of the output data. + */ +@FunctionalInterface +public interface MapFunction extends Function { + + R map(T value); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/ProcessFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/ProcessFunction.java new file mode 100644 index 000000000..5633783aa --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/ProcessFunction.java @@ -0,0 +1,14 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of process functions. + * + * @param Type of the input data. + */ +@FunctionalInterface +public interface ProcessFunction extends Function { + + void process(T value); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/ReduceFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/ReduceFunction.java new file mode 100644 index 000000000..96d6ab289 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/ReduceFunction.java @@ -0,0 +1,14 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of reduce functions. + * + * @param Type of the input data. + */ +@FunctionalInterface +public interface ReduceFunction extends Function { + + T reduce(T oldValue, T newValue); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/SinkFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/SinkFunction.java new file mode 100644 index 000000000..6a311a30e --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/SinkFunction.java @@ -0,0 +1,14 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of sink functions. + * + * @param Type of the sink data. + */ +@FunctionalInterface +public interface SinkFunction extends Function { + + void sink(T value); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/impl/SourceFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/SourceFunction.java new file mode 100644 index 000000000..64a410172 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/impl/SourceFunction.java @@ -0,0 +1,23 @@ +package org.ray.streaming.api.function.impl; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of Source functions. + * + * @param Type of the data output by the source. + */ +public interface SourceFunction extends Function { + + void init(int parallel, int index); + + void fetch(long batchId, SourceContext ctx) throws Exception; + + void close(); + + interface SourceContext { + + void collect(T element) throws Exception; + + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/function/internal/CollectionSourceFunction.java b/java/streaming/src/main/java/org/ray/streaming/api/function/internal/CollectionSourceFunction.java new file mode 100644 index 000000000..1ad6736f7 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/function/internal/CollectionSourceFunction.java @@ -0,0 +1,34 @@ +package org.ray.streaming.api.function.internal; + +import java.util.Collection; +import org.ray.streaming.api.function.impl.SourceFunction; + +/** + * The SourceFunction that fetch data from a Java Collection object. + * + * @param Type of the data output by the source. + */ +public class CollectionSourceFunction implements SourceFunction { + + private Collection values; + + public CollectionSourceFunction(Collection values) { + this.values = values; + } + + @Override + public void init(int parallel, int index) { + } + + @Override + public void fetch(long batchId, SourceContext ctx) throws Exception { + for (T value : values) { + ctx.collect(value); + } + } + + @Override + public void close() { + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/partition/Partition.java b/java/streaming/src/main/java/org/ray/streaming/api/partition/Partition.java new file mode 100644 index 000000000..46c8b04f3 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/partition/Partition.java @@ -0,0 +1,21 @@ +package org.ray.streaming.api.partition; + +import org.ray.streaming.api.function.Function; + +/** + * Interface of the partitioning strategy. + * @param Type of the input data. + */ +@FunctionalInterface +public interface Partition extends Function { + + /** + * Given a record and downstream tasks, determine which task(s) should receive the record. + * + * @param record The record. + * @param taskIds IDs of all downstream tasks. + * @return IDs of the downstream tasks that should receive the record. + */ + int[] partition(T record, int[] taskIds); + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java b/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java new file mode 100644 index 000000000..2e415ee7e --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/BroadcastPartition.java @@ -0,0 +1,17 @@ +package org.ray.streaming.api.partition.impl; + +import org.ray.streaming.api.partition.Partition; + +/** + * Broadcast the record to all downstream tasks. + */ +public class BroadcastPartition implements Partition { + + public BroadcastPartition() { + } + + @Override + public int[] partition(T value, int[] taskIds) { + return taskIds; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/KeyPartition.java b/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/KeyPartition.java new file mode 100644 index 000000000..9c86def34 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/KeyPartition.java @@ -0,0 +1,20 @@ +package org.ray.streaming.api.partition.impl; + +import org.ray.streaming.api.partition.Partition; +import org.ray.streaming.message.KeyRecord; + +/** + * Partition the record by the key. + * + * @param Type of the partition key. + * @param Type of the input record. + */ +public class KeyPartition implements Partition> { + + @Override + public int[] partition(KeyRecord keyRecord, int[] taskIds) { + int length = taskIds.length; + int taskId = taskIds[Math.abs(keyRecord.getKey().hashCode() % length)]; + return new int[]{taskId}; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/RoundRobinPartition.java b/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/RoundRobinPartition.java new file mode 100644 index 000000000..0c821400b --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/partition/impl/RoundRobinPartition.java @@ -0,0 +1,24 @@ +package org.ray.streaming.api.partition.impl; + +import org.ray.streaming.api.partition.Partition; + +/** + * Partition record to downstream tasks in a round-robin matter. + * + * @param Type of the input record. + */ +public class RoundRobinPartition implements Partition { + + private int seq; + + public RoundRobinPartition() { + this.seq = 0; + } + + @Override + public int[] partition(T value, int[] taskIds) { + int length = taskIds.length; + int taskId = taskIds[seq++ % length]; + return new int[]{taskId}; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/DataStream.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/DataStream.java new file mode 100644 index 000000000..a1989b9f2 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/DataStream.java @@ -0,0 +1,136 @@ +package org.ray.streaming.api.stream; + + +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.function.impl.FlatMapFunction; +import org.ray.streaming.api.function.impl.KeyFunction; +import org.ray.streaming.api.function.impl.MapFunction; +import org.ray.streaming.api.function.impl.SinkFunction; +import org.ray.streaming.api.partition.Partition; +import org.ray.streaming.api.partition.impl.BroadcastPartition; +import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.operator.impl.FlatMapOperator; +import org.ray.streaming.operator.impl.KeyByOperator; +import org.ray.streaming.operator.impl.MapOperator; +import org.ray.streaming.operator.impl.SinkOperator; + +/** + * Represents a stream of data. + * + * This class defines all the streaming operations. + * + * @param Type of data in the stream. + */ +public class DataStream extends Stream { + + public DataStream(StreamingContext streamingContext, StreamOperator streamOperator) { + super(streamingContext, streamOperator); + } + + public DataStream(DataStream input, StreamOperator streamOperator) { + super(input, streamOperator); + } + + /** + * Apply a map function to this stream. + * + * @param mapFunction The map function. + * @param Type of data returned by the map function. + * @return A new DataStream. + */ + public DataStream map(MapFunction mapFunction) { + return new DataStream<>(this, new MapOperator(mapFunction)); + } + + /** + * Apply a flat-map function to this stream. + * + * @param flatMapFunction The FlatMapFunction + * @param Type of data returned by the flatmap function. + * @return A new DataStream + */ + public DataStream flatMap(FlatMapFunction flatMapFunction) { + return new DataStream(this, new FlatMapOperator(flatMapFunction)); + } + + /** + * Apply a union transformation to this stream, with another stream. + * + * @param other Another stream. + * @return A new UnionStream. + */ + public UnionStream union(DataStream other) { + return new UnionStream(this, null, other); + } + + /** + * Apply a join transformation to this stream, with another stream. + * + * @param other Another stream. + * @param The type of the other stream data. + * @param The type of the data in the joined stream. + * @return A new JoinStream. + */ + public JoinStream join(DataStream other) { + return new JoinStream<>(this, other); + } + + public DataStream process() { + // TODO(zhenxuanpan): Need to add processFunction. + return new DataStream(this, null); + } + + /** + * Apply a sink function and get a StreamSink. + * + * @param sinkFunction The sink function. + * @return A new StreamSink. + */ + public StreamSink sink(SinkFunction sinkFunction) { + return new StreamSink<>(this, new SinkOperator(sinkFunction)); + } + + /** + * Apply a key-by function to this stream. + * + * @param keyFunction the key function. + * @param The type of the key. + * @return A new KeyDataStream. + */ + public KeyDataStream keyBy(KeyFunction keyFunction) { + return new KeyDataStream<>(this, new KeyByOperator(keyFunction)); + } + + /** + * Apply broadcast to this stream. + * + * @return This stream. + */ + public DataStream broadcast() { + this.partition = new BroadcastPartition<>(); + return this; + } + + /** + * Apply a partition to this stream. + * + * @param partition The partitioning strategy. + * @return This stream. + */ + public DataStream partitionBy(Partition partition) { + this.partition = partition; + return this; + } + + /** + * Set parallelism to current transformation. + * + * @param parallelism The parallelism to set. + * @return This stream. + */ + public DataStream setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/JoinStream.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/JoinStream.java new file mode 100644 index 000000000..2795feadc --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/JoinStream.java @@ -0,0 +1,82 @@ +package org.ray.streaming.api.stream; + +import java.io.Serializable; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.function.impl.JoinFunction; +import org.ray.streaming.api.function.impl.KeyFunction; +import org.ray.streaming.operator.StreamOperator; + +/** + * Represents a DataStream of two joined DataStream. + * + * @param Lype of the data in the left stream. + * @param Lype of the data in the right stream. + * @param Lype of the data in the joined stream. + */ +public class JoinStream extends DataStream { + + public JoinStream(StreamingContext streamingContext, StreamOperator streamOperator) { + super(streamingContext, streamOperator); + } + + public JoinStream(DataStream leftStream, DataStream rightStream) { + super(leftStream, null); + } + + /** + * Apply key-by to the left join stream. + */ + public Where where(KeyFunction keyFunction) { + return new Where<>(this, keyFunction); + } + + /** + * Where clause of the join transformation. + * + * @param Lype of the data in the left stream. + * @param Lype of the data in the right stream. + * @param Lype of the data in the joined stream. + * @param Lype of the join key. + */ + class Where implements Serializable { + + private JoinStream joinStream; + private KeyFunction leftKeyByFunction; + + public Where(JoinStream joinStream, KeyFunction leftKeyByFunction) { + this.joinStream = joinStream; + this.leftKeyByFunction = leftKeyByFunction; + } + + public Equal equalLo(KeyFunction rightKeyFunction) { + return new Equal<>(joinStream, leftKeyByFunction, rightKeyFunction); + } + } + + /** + * Equal clause of the join transformation. + * + * @param Lype of the data in the left stream. + * @param Lype of the data in the right stream. + * @param Lype of the data in the joined stream. + * @param Lype of the join key. + */ + class Equal implements Serializable { + + private JoinStream joinStream; + private KeyFunction leftKeyByFunction; + private KeyFunction rightKeyByFunction; + + public Equal(JoinStream joinStream, KeyFunction leftKeyByFunction, + KeyFunction rightKeyByFunction) { + this.joinStream = joinStream; + this.leftKeyByFunction = leftKeyByFunction; + this.rightKeyByFunction = rightKeyByFunction; + } + + public DataStream with(JoinFunction joinFunction) { + return (DataStream) joinStream; + } + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java new file mode 100644 index 000000000..16cc3d104 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/KeyDataStream.java @@ -0,0 +1,53 @@ +package org.ray.streaming.api.stream; + +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.function.impl.AggregateFunction; +import org.ray.streaming.api.function.impl.ReduceFunction; +import org.ray.streaming.api.partition.impl.KeyPartition; +import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.operator.impl.ReduceOperator; + +/** + * Represents a DataStream returned by a key-by operation. + * + * @param Type of the key. + * @param Type of the data. + */ +public class KeyDataStream extends DataStream { + + public KeyDataStream(StreamingContext streamingContext, StreamOperator streamOperator) { + super(streamingContext, streamOperator); + } + + public KeyDataStream(DataStream input, StreamOperator streamOperator) { + super(input, streamOperator); + this.partition = new KeyPartition(); + } + + /** + * Apply a reduce function to this stream. + * + * @param reduceFunction The reduce function. + * @return A new DataStream. + */ + public DataStream reduce(ReduceFunction reduceFunction) { + return new DataStream<>(this, new ReduceOperator(reduceFunction)); + } + + /** + * Apply an aggregate Function to this stream. + * + * @param aggregateFunction The aggregate function + * @param The type of aggregated intermediate data. + * @param The type of result data. + * @return A new DataStream. + */ + public DataStream aggregate(AggregateFunction aggregateFunction) { + return new DataStream<>(this, null); + } + + public KeyDataStream setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/Stream.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/Stream.java new file mode 100644 index 000000000..52cb53b1d --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/Stream.java @@ -0,0 +1,71 @@ +package org.ray.streaming.api.stream; + +import java.io.Serializable; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.partition.Partition; +import org.ray.streaming.api.partition.impl.RoundRobinPartition; +import org.ray.streaming.operator.StreamOperator; + +/** + * Abstract base class of all stream types. + * + * @param Type of the data in the stream. + */ +public abstract class Stream implements Serializable { + + protected int id; + protected int parallelism = 1; + protected StreamOperator operator; + protected Stream inputStream; + protected StreamingContext streamingContext; + protected Partition partition; + + public Stream(StreamingContext streamingContext, StreamOperator streamOperator) { + this.streamingContext = streamingContext; + this.operator = streamOperator; + this.id = streamingContext.generateId(); + this.partition = new RoundRobinPartition<>(); + } + + public Stream(Stream inputStream, StreamOperator streamOperator) { + this.inputStream = inputStream; + this.parallelism = inputStream.getParallelism(); + this.streamingContext = this.inputStream.getStreamingContext(); + this.operator = streamOperator; + this.id = streamingContext.generateId(); + this.partition = new RoundRobinPartition<>(); + } + + public Stream getInputStream() { + return inputStream; + } + + public StreamOperator getOperator() { + return operator; + } + + public StreamingContext getStreamingContext() { + return streamingContext; + } + + public int getParallelism() { + return parallelism; + } + + public Stream setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public int getId() { + return id; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSink.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSink.java new file mode 100644 index 000000000..00ec34ef4 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSink.java @@ -0,0 +1,21 @@ +package org.ray.streaming.api.stream; + +import org.ray.streaming.operator.impl.SinkOperator; + +/** + * Represents a sink of the DataStream. + * + * @param Type of the input data of this sink. + */ +public class StreamSink extends Stream { + + public StreamSink(DataStream input, SinkOperator sinkOperator) { + super(input, sinkOperator); + this.streamingContext.addSink(this); + } + + public StreamSink setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSource.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSource.java new file mode 100644 index 000000000..727410d5a --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/StreamSource.java @@ -0,0 +1,35 @@ +package org.ray.streaming.api.stream; + +import java.util.Collection; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.function.internal.CollectionSourceFunction; +import org.ray.streaming.operator.impl.SourceOperator; + +/** + * Represents a source of the DataStream. + * + * @param The type of StreamSource data. + */ +public class StreamSource extends DataStream { + + public StreamSource(StreamingContext streamingContext, SourceOperator sourceOperator) { + super(streamingContext, sourceOperator); + } + + /** + * Build a StreamSource source from a collection. + * + * @param context Stream context. + * @param values A collection of values. + * @param The type of source data. + * @return A StreamSource. + */ + public static StreamSource buildSource(StreamingContext context, Collection values) { + return new StreamSource(context, new SourceOperator(new CollectionSourceFunction(values))); + } + + public StreamSource setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/api/stream/UnionStream.java b/java/streaming/src/main/java/org/ray/streaming/api/stream/UnionStream.java new file mode 100644 index 000000000..0e296526e --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/api/stream/UnionStream.java @@ -0,0 +1,25 @@ +package org.ray.streaming.api.stream; + +import java.util.ArrayList; +import java.util.List; +import org.ray.streaming.operator.StreamOperator; + +/** + * Represents a union DataStream. + * + * @param The type of union data. + */ +public class UnionStream extends DataStream { + + private List unionStreams; + + public UnionStream(DataStream input, StreamOperator streamOperator, DataStream other) { + super(input, streamOperator); + this.unionStreams = new ArrayList<>(); + this.unionStreams.add(other); + } + + public List getUnionStreams() { + return unionStreams; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/cluster/ResourceManager.java b/java/streaming/src/main/java/org/ray/streaming/cluster/ResourceManager.java new file mode 100644 index 000000000..3230abce5 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/cluster/ResourceManager.java @@ -0,0 +1,20 @@ +package org.ray.streaming.cluster; + +import java.util.ArrayList; +import java.util.List; +import org.ray.api.Ray; +import org.ray.api.RayActor; +import org.ray.streaming.core.runtime.StreamWorker; + +public class ResourceManager { + + public List> createWorker(int workerNum) { + List> workers = new ArrayList<>(); + for (int i = 0; i < workerNum; i++) { + RayActor worker = Ray.createActor(StreamWorker::new); + workers.add(worker); + } + return workers; + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/command/BatchInfo.java b/java/streaming/src/main/java/org/ray/streaming/core/command/BatchInfo.java new file mode 100644 index 000000000..359a4c1f2 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/command/BatchInfo.java @@ -0,0 +1,20 @@ +package org.ray.streaming.core.command; + +import java.io.Serializable; + +public class BatchInfo implements Serializable { + + private long batchId; + + public BatchInfo(long batchId) { + this.batchId = batchId; + } + + public long getBatchId() { + return batchId; + } + + public void setBatchId(long batchId) { + this.batchId = batchId; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionEdge.java b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionEdge.java new file mode 100644 index 000000000..42b50ead8 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionEdge.java @@ -0,0 +1,48 @@ +package org.ray.streaming.core.graph; + +import java.io.Serializable; +import org.ray.streaming.api.partition.Partition; + +/** + * An edge in the physical execution graph. + */ +public class ExecutionEdge implements Serializable { + + private int srcNodeId; + private int targetNodeId; + private Partition partition; + + public ExecutionEdge(int srcNodeId, int targetNodeId, Partition partition) { + this.srcNodeId = srcNodeId; + this.targetNodeId = targetNodeId; + this.partition = partition; + } + + public int getSrcNodeId() { + return srcNodeId; + } + + public void setSrcNodeId(int srcNodeId) { + this.srcNodeId = srcNodeId; + } + + public int getTargetNodeId() { + return targetNodeId; + } + + public void setTargetNodeId(int targetNodeId) { + this.targetNodeId = targetNodeId; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public String getStream() { + return "stream:" + srcNodeId + "-" + targetNodeId; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionGraph.java b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionGraph.java new file mode 100644 index 000000000..fc8d2b29e --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionGraph.java @@ -0,0 +1,73 @@ +package org.ray.streaming.core.graph; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.ray.api.RayActor; +import org.ray.streaming.core.runtime.StreamWorker; + +/** + * Physical execution graph. + */ +public class ExecutionGraph implements Serializable { + + private List executionNodeList; + + public ExecutionGraph(List executionNodes) { + this.executionNodeList = executionNodes; + } + + public void addExectionNode(ExecutionNode executionNode) { + this.executionNodeList.add(executionNode); + } + + public List getExecutionNodeList() { + return executionNodeList; + } + + public ExecutionTask getExecutionTaskByTaskId(int taskId) { + for (ExecutionNode executionNode : executionNodeList) { + for (ExecutionTask executionTask : executionNode.getExecutionTaskList()) { + if (executionTask.getTaskId() == taskId) { + return executionTask; + } + } + } + throw new RuntimeException("Task " + taskId + " does not exist!"); + } + + public ExecutionNode getExecutionNodeByNodeId(int nodeId) { + for (ExecutionNode executionNode : executionNodeList) { + if (executionNode.getNodeId() == nodeId) { + return executionNode; + } + } + throw new RuntimeException("Node " + nodeId + " does not exist!"); + } + + public ExecutionNode getExecutionNodeByTaskId(int taskId) { + for (ExecutionNode executionNode : executionNodeList) { + for (ExecutionTask executionTask : executionNode.getExecutionTaskList()) { + if (executionTask.getTaskId() == taskId) { + return executionNode; + } + } + } + throw new RuntimeException("Task " + taskId + " does not exist!"); + } + + public Map> getTaskId2WorkerByNodeId(int nodeId) { + for (ExecutionNode executionNode : executionNodeList) { + if (executionNode.getNodeId() == nodeId) { + Map> taskId2Worker = new HashMap<>(); + for (ExecutionTask executionTask : executionNode.getExecutionTaskList()) { + taskId2Worker.put(executionTask.getTaskId(), executionTask.getWorker()); + } + return taskId2Worker; + } + } + throw new RuntimeException("Node " + nodeId + " does not exist!"); + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionNode.java b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionNode.java new file mode 100644 index 000000000..32756c49c --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionNode.java @@ -0,0 +1,98 @@ +package org.ray.streaming.core.graph; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.ray.streaming.core.processor.StreamProcessor; +import org.ray.streaming.plan.VertexType; + +/** + * A node in the physical execution graph. + */ +public class ExecutionNode implements Serializable { + + private int nodeId; + private int parallelism; + private NodeType nodeType; + private StreamProcessor streamProcessor; + private List executionTaskList; + private List executionEdgeList; + + public ExecutionNode(int nodeId, int parallelism) { + this.nodeId = nodeId; + this.parallelism = parallelism; + this.executionTaskList = new ArrayList<>(); + this.executionEdgeList = new ArrayList<>(); + } + + public int getNodeId() { + return nodeId; + } + + public void setNodeId(int nodeId) { + this.nodeId = nodeId; + } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public List getExecutionTaskList() { + return executionTaskList; + } + + public void setExecutionTaskList(List executionTaskList) { + this.executionTaskList = executionTaskList; + } + + public List getExecutionEdgeList() { + return executionEdgeList; + } + + public void setExecutionEdgeList(List executionEdgeList) { + this.executionEdgeList = executionEdgeList; + } + + public void addExecutionEdge(ExecutionEdge executionEdge) { + this.executionEdgeList.add(executionEdge); + } + + public StreamProcessor getStreamProcessor() { + return streamProcessor; + } + + public void setStreamProcessor(StreamProcessor streamProcessor) { + this.streamProcessor = streamProcessor; + } + + public NodeType getNodeType() { + return nodeType; + } + + public void setNodeType(VertexType vertexType) { + switch (vertexType) { + case MASTER: + this.nodeType = NodeType.MASTER; + break; + case SOURCE: + this.nodeType = NodeType.SOURCE; + break; + case SINK: + this.nodeType = NodeType.SINK; + break; + default: + this.nodeType = NodeType.PROCESS; + } + } + + public enum NodeType { + MASTER, + SOURCE, + PROCESS, + SINK, + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionTask.java b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionTask.java new file mode 100644 index 000000000..72d3eaa6f --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/graph/ExecutionTask.java @@ -0,0 +1,47 @@ +package org.ray.streaming.core.graph; + +import java.io.Serializable; +import org.ray.api.RayActor; +import org.ray.streaming.core.runtime.StreamWorker; + +/** + * ExecutionTask is minimal execution unit. + * + * An ExecutionNode has n ExecutionTasks if parallelism is n. + */ +public class ExecutionTask implements Serializable { + + private int taskId; + private int taskIndex; + private RayActor worker; + + public ExecutionTask(int taskId, int taskIndex, RayActor worker) { + this.taskId = taskId; + this.taskIndex = taskIndex; + this.worker = worker; + } + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public int getTaskIndex() { + return taskIndex; + } + + public void setTaskIndex(int taskIndex) { + this.taskIndex = taskIndex; + } + + public RayActor getWorker() { + return worker; + } + + public void setWorker(RayActor worker) { + this.worker = worker; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/MasterProcessor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/MasterProcessor.java new file mode 100644 index 000000000..3ea081322 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/MasterProcessor.java @@ -0,0 +1,101 @@ +package org.ray.streaming.core.processor; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.core.command.BatchInfo; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.graph.ExecutionNode; +import org.ray.streaming.core.graph.ExecutionNode.NodeType; +import org.ray.streaming.core.graph.ExecutionTask; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.impl.MasterOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * MasterProcessor is responsible for overall control logic. + */ +public class MasterProcessor extends StreamProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(MasterProcessor.class); + + private Thread batchControllerThread; + private long maxBatch; + + public MasterProcessor(MasterOperator masterOperator) { + super(masterOperator); + } + + public void open(List collectors, RuntimeContext runtimeContext, + ExecutionGraph executionGraph) { + super.open(collectors, runtimeContext); + this.maxBatch = runtimeContext.getMaxBatch(); + startBatchController(executionGraph); + + } + + private void startBatchController(ExecutionGraph executionGraph) { + BatchController batchController = new BatchController(maxBatch, collectors); + List sinkTasks = new ArrayList<>(); + for (ExecutionNode executionNode : executionGraph.getExecutionNodeList()) { + if (executionNode.getNodeType() == NodeType.SINK) { + List nodeTasks = executionNode.getExecutionTaskList().stream() + .map(ExecutionTask::getTaskId).collect(Collectors.toList()); + sinkTasks.addAll(nodeTasks); + } + } + + batchControllerThread = new Thread(batchController, "controller-thread"); + batchControllerThread.start(); + } + + @Override + public void process(BatchInfo executionGraph) { + + } + + @Override + public void close() { + + } + + static class BatchController implements Runnable, Serializable { + + private AtomicInteger batchId; + private List collectors; + private Map sinkBatchMap; + private Integer frequency; + private long maxBatch; + + public BatchController(long maxBatch, List collectors) { + this.batchId = new AtomicInteger(0); + this.maxBatch = maxBatch; + this.collectors = collectors; + // TODO(zhenxuanpan): Use config to set. + this.frequency = 1000; + } + + @Override + public void run() { + while (batchId.get() < maxBatch) { + try { + Record record = new Record<>(new BatchInfo(batchId.getAndIncrement())); + for (Collector collector : collectors) { + collector.collect(record); + } + Thread.sleep(frequency); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + } + + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/OneInputProcessor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/OneInputProcessor.java new file mode 100644 index 000000000..3d675aa16 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/OneInputProcessor.java @@ -0,0 +1,29 @@ +package org.ray.streaming.core.processor; + +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OneInputOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OneInputProcessor extends StreamProcessor, OneInputOperator> { + + private static final Logger LOGGER = LoggerFactory.getLogger(OneInputProcessor.class); + + public OneInputProcessor(OneInputOperator operator) { + super(operator); + } + + @Override + public void process(Record record) { + try { + this.operator.processElement(record); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + this.operator.close(); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/ProcessBuilder.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/ProcessBuilder.java new file mode 100644 index 000000000..2a8c1e634 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/ProcessBuilder.java @@ -0,0 +1,33 @@ +package org.ray.streaming.core.processor; + +import org.ray.streaming.operator.OneInputOperator; +import org.ray.streaming.operator.OperatorType; +import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.operator.TwoInputOperator; +import org.ray.streaming.operator.impl.SourceOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ProcessBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessBuilder.class); + + public static StreamProcessor buildProcessor(StreamOperator streamOperator) { + OperatorType type = streamOperator.getOpType(); + LOGGER.info("Building StreamProcessor, operator type = {}, operator = {}.", type, + streamOperator.getClass().getSimpleName().toString()); + switch (type) { + case MASTER: + return new MasterProcessor(null); + case SOURCE: + return new SourceProcessor<>((SourceOperator) streamOperator); + case ONE_INPUT: + return new OneInputProcessor<>((OneInputOperator) streamOperator); + case TWO_INPUT: + return new TwoInputProcessor((TwoInputOperator) streamOperator); + default: + throw new RuntimeException("current operator type is not support"); + } + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/Processor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/Processor.java new file mode 100644 index 000000000..02b38f08a --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/Processor.java @@ -0,0 +1,15 @@ +package org.ray.streaming.core.processor; + +import java.io.Serializable; +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.core.runtime.context.RuntimeContext; + +public interface Processor extends Serializable { + + void open(List collectors, RuntimeContext runtimeContext); + + void process(T t); + + void close(); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/SourceProcessor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/SourceProcessor.java new file mode 100644 index 000000000..759a73d01 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/SourceProcessor.java @@ -0,0 +1,25 @@ +package org.ray.streaming.core.processor; + +import org.ray.streaming.operator.impl.SourceOperator; + +/** + * The processor for the stream sources, containing a SourceOperator. + * + * @param The type of source data. + */ +public class SourceProcessor extends StreamProcessor> { + + public SourceProcessor(SourceOperator operator) { + super(operator); + } + + @Override + public void process(Long batchId) { + this.operator.process(batchId); + } + + @Override + public void close() { + + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/StreamProcessor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/StreamProcessor.java new file mode 100644 index 000000000..3dc307e01 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/StreamProcessor.java @@ -0,0 +1,33 @@ +package org.ray.streaming.core.processor; + +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.operator.Operator; + +/** + * StreamingProcessor is a process unit for a operator. + * + * @param The type of process data. + * @param

Type of the specific operator class. + */ +public abstract class StreamProcessor implements Processor { + + protected List collectors; + protected RuntimeContext runtimeContext; + protected P operator; + + public StreamProcessor(P operator) { + this.operator = operator; + } + + @Override + public void open(List collectors, RuntimeContext runtimeContext) { + this.collectors = collectors; + this.runtimeContext = runtimeContext; + if (operator != null) { + this.operator.open(collectors, runtimeContext); + } + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java b/java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java new file mode 100644 index 000000000..88094b0c8 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/processor/TwoInputProcessor.java @@ -0,0 +1,37 @@ +package org.ray.streaming.core.processor; + +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.TwoInputOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TwoInputProcessor extends StreamProcessor> { + + private static final Logger LOGGER = LoggerFactory.getLogger(TwoInputProcessor.class); + + // TODO(zhenxuanpan): Set leftStream and rightStream. + private String leftStream; + private String rigthStream; + + public TwoInputProcessor(TwoInputOperator operator) { + super(operator); + } + + @Override + public void process(Record record) { + try { + if (record.getStream().equals(leftStream)) { + this.operator.processElement(record, null); + } else { + this.operator.processElement(null, record); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + this.operator.close(); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java new file mode 100644 index 000000000..292d05b5b --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/StreamWorker.java @@ -0,0 +1,86 @@ +package org.ray.streaming.core.runtime; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.ray.api.annotation.RayRemote; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.core.command.BatchInfo; +import org.ray.streaming.core.graph.ExecutionEdge; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.graph.ExecutionNode; +import org.ray.streaming.core.graph.ExecutionNode.NodeType; +import org.ray.streaming.core.graph.ExecutionTask; +import org.ray.streaming.core.processor.MasterProcessor; +import org.ray.streaming.core.processor.StreamProcessor; +import org.ray.streaming.core.runtime.collector.RayCallCollector; +import org.ray.streaming.core.runtime.context.RayRuntimeContext; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.core.runtime.context.WorkerContext; +import org.ray.streaming.message.Message; +import org.ray.streaming.message.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The stream worker, it is a ray actor. + */ +@RayRemote +public class StreamWorker implements Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(StreamWorker.class); + + private int taskId; + private WorkerContext workerContext; + private StreamProcessor streamProcessor; + private NodeType nodeType; + + public StreamWorker() { + } + + public Boolean init(WorkerContext workerContext) { + this.workerContext = workerContext; + this.taskId = workerContext.getTaskId(); + ExecutionGraph executionGraph = this.workerContext.getExecutionGraph(); + ExecutionTask executionTask = executionGraph.getExecutionTaskByTaskId(taskId); + ExecutionNode executionNode = executionGraph.getExecutionNodeByTaskId(taskId); + + this.nodeType = executionNode.getNodeType(); + this.streamProcessor = executionNode.getStreamProcessor(); + LOGGER.debug("Initializing StreamWorker, taskId: {}, operator: {}.", taskId, streamProcessor); + + List executionEdges = executionNode.getExecutionEdgeList(); + + List collectors = new ArrayList<>(); + for (ExecutionEdge executionEdge : executionEdges) { + collectors.add(new RayCallCollector(taskId, executionEdge, executionGraph)); + } + + RuntimeContext runtimeContext = new RayRuntimeContext(executionTask, workerContext.getConfig(), + executionNode.getParallelism()); + if (this.nodeType == NodeType.MASTER) { + ((MasterProcessor) streamProcessor).open(collectors, runtimeContext, executionGraph); + } else { + this.streamProcessor.open(collectors, runtimeContext); + } + return true; + } + + public Boolean process(Message message) { + LOGGER.debug("Processing message, taskId: {}, message: {}.", taskId, message); + if (nodeType == NodeType.SOURCE) { + Record record = message.getRecord(0); + BatchInfo batchInfo = (BatchInfo) record.getValue(); + this.streamProcessor.process(batchInfo.getBatchId()); + } else { + List records = message.getRecordList(); + for (Record record : records) { + record.setBatchId(message.getBatchId()); + record.setStream(message.getStream()); + this.streamProcessor.process(record); + } + } + return true; + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java new file mode 100644 index 000000000..03ef391d2 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/CollectionCollector.java @@ -0,0 +1,26 @@ +package org.ray.streaming.core.runtime.collector; + +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.message.Record; + +/** + * Combination of multiple collectors. + * + * @param The type of output data. + */ +public class CollectionCollector implements Collector { + + private List collectorList; + + public CollectionCollector(List collectorList) { + this.collectorList = collectorList; + } + + @Override + public void collect(T value) { + for (Collector collector : collectorList) { + collector.collect(new Record(value)); + } + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java new file mode 100644 index 000000000..e5331a44e --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/collector/RayCallCollector.java @@ -0,0 +1,58 @@ +package org.ray.streaming.core.runtime.collector; + +import java.util.Arrays; +import java.util.Map; +import org.ray.api.Ray; +import org.ray.api.RayActor; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.api.partition.Partition; +import org.ray.streaming.core.graph.ExecutionEdge; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.runtime.StreamWorker; +import org.ray.streaming.message.Message; +import org.ray.streaming.message.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The collector that emits data via Ray remote calls. + */ +public class RayCallCollector implements Collector { + + private static final Logger LOGGER = LoggerFactory.getLogger(RayCallCollector.class); + + private int taskId; + private String stream; + private Map> taskId2Worker; + private int[] targetTaskIds; + private Partition partition; + + public RayCallCollector(int taskId, ExecutionEdge executionEdge, ExecutionGraph executionGraph) { + this.taskId = taskId; + this.stream = executionEdge.getStream(); + int targetNodeId = executionEdge.getTargetNodeId(); + taskId2Worker = executionGraph + .getTaskId2WorkerByNodeId(targetNodeId); + targetTaskIds = Arrays.stream(taskId2Worker.keySet() + .toArray(new Integer[taskId2Worker.size()])) + .mapToInt(Integer::valueOf).toArray(); + + this.partition = executionEdge.getPartition(); + LOGGER.debug("RayCallCollector constructed, taskId:{}, add stream:{}, partition:{}.", + taskId, stream, this.partition); + } + + @Override + public void collect(Record record) { + int[] taskIds = this.partition.partition(record, targetTaskIds); + LOGGER.debug("Sending data from task {} to remote tasks {}, collector stream:{}, record:{}", + taskId, taskIds, stream, record); + Message message = new Message(taskId, record.getBatchId(), stream, record); + for (int targetTaskId : taskIds) { + RayActor streamWorker = this.taskId2Worker.get(targetTaskId); + // Use ray call to send message to downstream actor. + Ray.call(StreamWorker::process, streamWorker, message); + } + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java new file mode 100644 index 000000000..6d796c30e --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RayRuntimeContext.java @@ -0,0 +1,58 @@ +package org.ray.streaming.core.runtime.context; + +import static org.ray.streaming.util.ConfigKey.STREAMING_MAX_BATCH_COUNT; + +import java.util.Map; +import org.ray.streaming.core.graph.ExecutionTask; + +/** + * Use Ray to implement RuntimeContext. + */ +public class RayRuntimeContext implements RuntimeContext { + + private int taskId; + private int taskIndex; + private int parallelism; + private Long batchId; + private Map config; + + public RayRuntimeContext(ExecutionTask executionTask, Map config, + int parallelism) { + this.taskId = executionTask.getTaskId(); + this.config = config; + this.taskIndex = executionTask.getTaskIndex(); + this.parallelism = parallelism; + } + + @Override + public int getTaskId() { + return taskId; + } + + @Override + public int getTaskIndex() { + return taskIndex; + } + + @Override + public int getParallelism() { + return parallelism; + } + + @Override + public Long getBatchId() { + return batchId; + } + + @Override + public Long getMaxBatch() { + if (config.containsKey(STREAMING_MAX_BATCH_COUNT)) { + return Long.valueOf(String.valueOf(config.get(STREAMING_MAX_BATCH_COUNT))); + } + return Long.MAX_VALUE; + } + + public void setBatchId(Long batchId) { + this.batchId = batchId; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java new file mode 100644 index 000000000..984987e29 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/RuntimeContext.java @@ -0,0 +1,19 @@ +package org.ray.streaming.core.runtime.context; + +/** + * Encapsulate the runtime information of a streaming task. + */ +public interface RuntimeContext { + + int getTaskId(); + + int getTaskIndex(); + + int getParallelism(); + + Long getBatchId(); + + Long getMaxBatch(); + + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java new file mode 100644 index 000000000..39117fd7b --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/core/runtime/context/WorkerContext.java @@ -0,0 +1,41 @@ +package org.ray.streaming.core.runtime.context; + +import java.io.Serializable; +import java.util.Map; +import org.ray.streaming.core.graph.ExecutionGraph; + +/** + * Encapsulate the context information for worker initialization. + */ +public class WorkerContext implements Serializable { + + private int taskId; + private ExecutionGraph executionGraph; + private Map config; + + public WorkerContext(int taskId, ExecutionGraph executionGraph, Map jobConfig) { + this.taskId = taskId; + this.executionGraph = executionGraph; + this.config = jobConfig; + } + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public ExecutionGraph getExecutionGraph() { + return executionGraph; + } + + public void setExecutionGraph(ExecutionGraph executionGraph) { + this.executionGraph = executionGraph; + } + + public Map getConfig() { + return config; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java b/java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java new file mode 100644 index 000000000..de4fb2767 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/message/KeyRecord.java @@ -0,0 +1,20 @@ +package org.ray.streaming.message; + + +public class KeyRecord extends Record { + + private K key; + + public KeyRecord(K key, T value) { + super(value); + this.key = key; + } + + public K getKey() { + return key; + } + + public void setKey(K key) { + this.key = key; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/message/Message.java b/java/streaming/src/main/java/org/ray/streaming/message/Message.java new file mode 100644 index 000000000..164aba7d5 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/message/Message.java @@ -0,0 +1,64 @@ +package org.ray.streaming.message; + +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.List; + +public class Message implements Serializable { + + private int taskId; + private long batchId; + private String stream; + private List recordList; + + public Message(int taskId, long batchId, String stream, List recordList) { + this.taskId = taskId; + this.batchId = batchId; + this.stream = stream; + this.recordList = recordList; + } + + public Message(int taskId, long batchId, String stream, Record record) { + this.taskId = taskId; + this.batchId = batchId; + this.stream = stream; + this.recordList = Lists.newArrayList(record); + } + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public long getBatchId() { + return batchId; + } + + public void setBatchId(long batchId) { + this.batchId = batchId; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public List getRecordList() { + return recordList; + } + + public void setRecordList(List recordList) { + this.recordList = recordList; + } + + public Record getRecord(int index) { + return recordList.get(0); + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/message/Record.java b/java/streaming/src/main/java/org/ray/streaming/message/Record.java new file mode 100644 index 000000000..5898fc63a --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/message/Record.java @@ -0,0 +1,50 @@ +package org.ray.streaming.message; + +import java.io.Serializable; + + +public class Record implements Serializable { + + protected transient String stream; + protected transient long batchId; + protected T value; + + public Record(T value) { + this.value = value; + } + + public Record(long batchId, T value) { + this.batchId = batchId; + this.value = value; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public long getBatchId() { + return batchId; + } + + public void setBatchId(long batchId) { + this.batchId = batchId; + } + + @Override + public String toString() { + return value.toString(); + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java new file mode 100644 index 000000000..fdc620740 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/OneInputOperator.java @@ -0,0 +1,13 @@ +package org.ray.streaming.operator; + +import org.ray.streaming.message.Record; + + +public interface OneInputOperator extends Operator { + + void processElement(Record record) throws Exception; + + default OperatorType getOpType() { + return OperatorType.ONE_INPUT; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/Operator.java b/java/streaming/src/main/java/org/ray/streaming/operator/Operator.java new file mode 100644 index 000000000..46e8f2b25 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/Operator.java @@ -0,0 +1,17 @@ +package org.ray.streaming.operator; + +import java.io.Serializable; +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.core.runtime.context.RuntimeContext; + +public interface Operator extends Serializable { + + void open(List collectors, RuntimeContext runtimeContext); + + void finish(); + + void close(); + + OperatorType getOpType(); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java b/java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java new file mode 100644 index 000000000..cc8f56406 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/OperatorType.java @@ -0,0 +1,9 @@ +package org.ray.streaming.operator; + + +public enum OperatorType { + MASTER, + SOURCE, + ONE_INPUT, + TWO_INPUT, +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java new file mode 100644 index 000000000..5294e9115 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/StreamOperator.java @@ -0,0 +1,47 @@ +package org.ray.streaming.operator; + +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.api.function.Function; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.message.KeyRecord; +import org.ray.streaming.message.Record; + +public abstract class StreamOperator implements Operator { + + protected F function; + protected List collectorList; + protected RuntimeContext runtimeContext; + + + public StreamOperator(F function) { + this.function = function; + } + + public void open(List collectorList, RuntimeContext runtimeContext) { + this.collectorList = collectorList; + this.runtimeContext = runtimeContext; + } + + public void finish() { + + } + + public void close() { + + } + + + protected void collect(Record record) { + for (Collector collector : this.collectorList) { + collector.collect(record); + } + } + + protected void collect(KeyRecord keyRecord) { + for (Collector collector : this.collectorList) { + collector.collect(keyRecord); + } + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java new file mode 100644 index 000000000..5528b0875 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/TwoInputOperator.java @@ -0,0 +1,13 @@ +package org.ray.streaming.operator; + +import org.ray.streaming.message.Record; + + +public interface TwoInputOperator extends Operator { + + void processElement(Record record1, Record record2); + + default OperatorType getOpType() { + return OperatorType.TWO_INPUT; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java new file mode 100644 index 000000000..556c17077 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/FlatMapOperator.java @@ -0,0 +1,31 @@ +package org.ray.streaming.operator.impl; + +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.api.function.impl.FlatMapFunction; +import org.ray.streaming.core.runtime.collector.CollectionCollector; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OneInputOperator; +import org.ray.streaming.operator.StreamOperator; + +public class FlatMapOperator extends StreamOperator> implements + OneInputOperator { + + private CollectionCollector collectionCollector; + + public FlatMapOperator(FlatMapFunction flatMapFunction) { + super(flatMapFunction); + } + + @Override + public void open(List collectorList, RuntimeContext runtimeContext) { + super.open(collectorList, runtimeContext); + this.collectionCollector = new CollectionCollector(collectorList); + } + + @Override + public void processElement(Record record) throws Exception { + this.function.flatMap(record.getValue(), (Collector) collectionCollector); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java new file mode 100644 index 000000000..5ace0b13f --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/KeyByOperator.java @@ -0,0 +1,22 @@ +package org.ray.streaming.operator.impl; + +import org.ray.streaming.api.function.impl.KeyFunction; +import org.ray.streaming.message.KeyRecord; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OneInputOperator; +import org.ray.streaming.operator.StreamOperator; + +public class KeyByOperator extends StreamOperator> implements + OneInputOperator { + + public KeyByOperator(KeyFunction keyFunction) { + super(keyFunction); + } + + @Override + public void processElement(Record record) throws Exception { + K key = this.function.keyBy(record.getValue()); + collect(new KeyRecord<>(key, record.getValue())); + } +} + diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java new file mode 100644 index 000000000..7fc260479 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MapOperator.java @@ -0,0 +1,20 @@ +package org.ray.streaming.operator.impl; + +import org.ray.streaming.api.function.impl.MapFunction; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OneInputOperator; +import org.ray.streaming.operator.StreamOperator; + + +public class MapOperator extends StreamOperator> implements + OneInputOperator { + + public MapOperator(MapFunction mapFunction) { + super(mapFunction); + } + + @Override + public void processElement(Record record) throws Exception { + this.collect(new Record(this.function.map(record.getValue()))); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java new file mode 100644 index 000000000..963a0d11b --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/MasterOperator.java @@ -0,0 +1,17 @@ +package org.ray.streaming.operator.impl; + +import org.ray.streaming.operator.OperatorType; +import org.ray.streaming.operator.StreamOperator; + + +public class MasterOperator extends StreamOperator { + + public MasterOperator() { + super(null); + } + + @Override + public OperatorType getOpType() { + return OperatorType.MASTER; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java new file mode 100644 index 000000000..6aa7ff892 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/ReduceOperator.java @@ -0,0 +1,44 @@ +package org.ray.streaming.operator.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.api.function.impl.ReduceFunction; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.message.KeyRecord; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OneInputOperator; +import org.ray.streaming.operator.StreamOperator; + +public class ReduceOperator extends StreamOperator> implements + OneInputOperator { + + private Map reduceState; + + public ReduceOperator(ReduceFunction reduceFunction) { + super(reduceFunction); + } + + @Override + public void open(List collectorList, RuntimeContext runtimeContext) { + super.open(collectorList, runtimeContext); + this.reduceState = new HashMap<>(); + } + + @Override + public void processElement(Record record) throws Exception { + KeyRecord keyRecord = (KeyRecord) record; + K key = keyRecord.getKey(); + T value = keyRecord.getValue(); + if (reduceState.containsKey(key)) { + T oldValue = reduceState.get(key); + T newValue = this.function.reduce(oldValue, value); + reduceState.put(key, newValue); + collect(new Record(newValue)); + } else { + reduceState.put(key, value); + collect(record); + } + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/SinkOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/SinkOperator.java new file mode 100644 index 000000000..66bd7b9ac --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/SinkOperator.java @@ -0,0 +1,20 @@ +package org.ray.streaming.operator.impl; + +import org.ray.streaming.api.function.impl.SinkFunction; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OneInputOperator; +import org.ray.streaming.operator.StreamOperator; + + +public class SinkOperator extends StreamOperator> implements + OneInputOperator { + + public SinkOperator(SinkFunction sinkFunction) { + super(sinkFunction); + } + + @Override + public void processElement(Record record) throws Exception { + this.function.sink(record.getValue()); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/operator/impl/SourceOperator.java b/java/streaming/src/main/java/org/ray/streaming/operator/impl/SourceOperator.java new file mode 100644 index 000000000..679000abd --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/operator/impl/SourceOperator.java @@ -0,0 +1,61 @@ +package org.ray.streaming.operator.impl; + +import java.util.List; +import org.ray.streaming.api.collector.Collector; +import org.ray.streaming.api.function.impl.SourceFunction; +import org.ray.streaming.api.function.impl.SourceFunction.SourceContext; +import org.ray.streaming.core.runtime.context.RuntimeContext; +import org.ray.streaming.message.Record; +import org.ray.streaming.operator.OperatorType; +import org.ray.streaming.operator.StreamOperator; + +public class SourceOperator extends StreamOperator> { + + private SourceContextImpl sourceContext; + + public SourceOperator(SourceFunction function) { + super(function); + } + + @Override + public void open(List collectorList, RuntimeContext runtimeContext) { + super.open(collectorList, runtimeContext); + this.sourceContext = new SourceContextImpl(collectorList); + } + + public void process(Long batchId) { + try { + this.sourceContext.setBatchId(batchId); + this.function.fetch(batchId, this.sourceContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + @Override + public OperatorType getOpType() { + return OperatorType.SOURCE; + } + + class SourceContextImpl implements SourceContext { + + private long batchId; + private List collectors; + + public SourceContextImpl(List collectors) { + this.collectors = collectors; + } + + @Override + public void collect(T t) throws Exception { + for (Collector collector : collectors) { + collector.collect(new Record(batchId, t)); + } + } + + private void setBatchId(long batchId) { + this.batchId = batchId; + } + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/plan/Plan.java b/java/streaming/src/main/java/org/ray/streaming/plan/Plan.java new file mode 100644 index 000000000..7e86a5645 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/plan/Plan.java @@ -0,0 +1,58 @@ +package org.ray.streaming.plan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The logical execution plan. + */ +public class Plan implements Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(Plan.class); + + private List planVertexList; + private List planEdgeList; + + public Plan() { + this.planVertexList = new ArrayList<>(); + this.planEdgeList = new ArrayList<>(); + } + + public void addVertex(PlanVertex vertex) { + this.planVertexList.add(vertex); + } + + public void addEdge(PlanEdge planEdge) { + this.planEdgeList.add(planEdge); + } + + public List getPlanVertexList() { + return planVertexList; + } + + public List getPlanEdgeList() { + return planEdgeList; + } + + public String getGraphVizPlan() { + return ""; + } + + public void printPlan() { + if (!LOGGER.isInfoEnabled()) { + return; + } + LOGGER.info("Printing logic plan:"); + for (PlanVertex planVertex : planVertexList) { + LOGGER.info(planVertex.toString()); + } + for (PlanEdge planEdge : planEdgeList) { + LOGGER.info(planEdge.toString()); + } + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/plan/PlanBuilder.java b/java/streaming/src/main/java/org/ray/streaming/plan/PlanBuilder.java new file mode 100644 index 000000000..5ae09cdbc --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/plan/PlanBuilder.java @@ -0,0 +1,62 @@ +package org.ray.streaming.plan; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.ray.streaming.api.stream.DataStream; +import org.ray.streaming.api.stream.Stream; +import org.ray.streaming.api.stream.StreamSink; +import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.operator.StreamOperator; + +public class PlanBuilder { + + private Plan plan; + + private AtomicInteger edgeIdGenerator; + private List streamSinkList; + + public PlanBuilder(List streamSinkList) { + this.plan = new Plan(); + this.streamSinkList = streamSinkList; + this.edgeIdGenerator = new AtomicInteger(0); + } + + public Plan buildPlan() { + for (StreamSink streamSink : streamSinkList) { + processStream(streamSink); + } + return this.plan; + } + + private void processStream(Stream stream) { + int vertexId = stream.getId(); + int parallelism = stream.getParallelism(); + + StreamOperator streamOperator = stream.getOperator(); + PlanVertex planVertex = null; + + if (stream instanceof StreamSink) { + planVertex = new PlanVertex(vertexId, parallelism, VertexType.SINK, streamOperator); + Stream parentStream = stream.getInputStream(); + int inputVertexId = parentStream.getId(); + PlanEdge planEdge = new PlanEdge(inputVertexId, vertexId, parentStream.getPartition()); + this.plan.addEdge(planEdge); + processStream(parentStream); + } else if (stream instanceof StreamSource) { + planVertex = new PlanVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator); + } else if (stream instanceof DataStream) { + planVertex = new PlanVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator); + Stream parentStream = stream.getInputStream(); + int inputVertexId = parentStream.getId(); + PlanEdge planEdge = new PlanEdge(inputVertexId, vertexId, parentStream.getPartition()); + this.plan.addEdge(planEdge); + processStream(parentStream); + } + this.plan.addVertex(planVertex); + } + + private int getEdgeId() { + return this.edgeIdGenerator.incrementAndGet(); + } + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/plan/PlanEdge.java b/java/streaming/src/main/java/org/ray/streaming/plan/PlanEdge.java new file mode 100644 index 000000000..aee61ec00 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/plan/PlanEdge.java @@ -0,0 +1,50 @@ +package org.ray.streaming.plan; + +import java.io.Serializable; +import org.ray.streaming.api.partition.Partition; + +/** + * PlanEdge is connection and partition rules of upstream and downstream execution nodes. + */ +public class PlanEdge implements Serializable { + + private int srcVertexId; + private int targetVertexId; + private Partition partition; + + public PlanEdge(int srcVertexId, int targetVertexId, Partition partition) { + this.srcVertexId = srcVertexId; + this.targetVertexId = targetVertexId; + this.partition = partition; + } + + public int getSrcVertexId() { + return srcVertexId; + } + + public void setSrcVertexId(int srcVertexId) { + this.srcVertexId = srcVertexId; + } + + public int getTargetVertexId() { + return targetVertexId; + } + + public void setTargetVertexId(int targetVertexId) { + this.targetVertexId = targetVertexId; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + @Override + public String toString() { + return "Edge(" + "from:" + srcVertexId + "-" + targetVertexId + "-" + this.partition.getClass() + + ")"; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/plan/PlanVertex.java b/java/streaming/src/main/java/org/ray/streaming/plan/PlanVertex.java new file mode 100644 index 000000000..e5c88b227 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/plan/PlanVertex.java @@ -0,0 +1,49 @@ +package org.ray.streaming.plan; + +import java.io.Serializable; +import org.ray.streaming.operator.StreamOperator; + +/** + * PlanVertex is a cell node where logic is executed. + */ +public class PlanVertex implements Serializable { + + private int vertexId; + private int parallelism; + private VertexType vertexType; + private StreamOperator streamOperator; + + public PlanVertex(int vertexId, int parallelism, VertexType vertexType, + StreamOperator streamOperator) { + this.vertexId = vertexId; + this.parallelism = parallelism; + this.vertexType = vertexType; + this.streamOperator = streamOperator; + } + + public int getVertexId() { + return vertexId; + } + + public int getParallelism() { + return parallelism; + } + + public StreamOperator getStreamOperator() { + return streamOperator; + } + + public VertexType getVertexType() { + return vertexType; + } + + @Override + public String toString() { + return "PlanVertex{" + + "vertexId=" + vertexId + + ", parallelism=" + parallelism + + ", vertexType=" + vertexType + + ", streamOperator=" + streamOperator + + '}'; + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/plan/VertexType.java b/java/streaming/src/main/java/org/ray/streaming/plan/VertexType.java new file mode 100644 index 000000000..b52e0d3be --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/plan/VertexType.java @@ -0,0 +1,11 @@ +package org.ray.streaming.plan; + +/** + * Different roles for a node. + */ +public enum VertexType { + MASTER, + SOURCE, + PROCESS, + SINK, +} diff --git a/java/streaming/src/main/java/org/ray/streaming/schedule/IJobSchedule.java b/java/streaming/src/main/java/org/ray/streaming/schedule/IJobSchedule.java new file mode 100644 index 000000000..aa57166c7 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/schedule/IJobSchedule.java @@ -0,0 +1,17 @@ +package org.ray.streaming.schedule; + + +import org.ray.streaming.plan.Plan; + +/** + * Interface of the job scheduler. + */ +public interface IJobSchedule { + + /** + * Assign logical plan to physical execution graph, and schedule job to run. + * + * @param plan The logical plan. + */ + void schedule(Plan plan); +} diff --git a/java/streaming/src/main/java/org/ray/streaming/schedule/ITaskAssign.java b/java/streaming/src/main/java/org/ray/streaming/schedule/ITaskAssign.java new file mode 100644 index 000000000..d9c7cd507 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/schedule/ITaskAssign.java @@ -0,0 +1,20 @@ +package org.ray.streaming.schedule; + +import java.io.Serializable; +import java.util.List; +import org.ray.api.RayActor; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.runtime.StreamWorker; +import org.ray.streaming.plan.Plan; + +/** + * Interface of the task assigning strategy. + */ +public interface ITaskAssign extends Serializable { + + /** + * Assign logical plan to physical execution graph. + */ + ExecutionGraph assign(Plan plan, List> workers); + +} diff --git a/java/streaming/src/main/java/org/ray/streaming/schedule/impl/JobScheduleImpl.java b/java/streaming/src/main/java/org/ray/streaming/schedule/impl/JobScheduleImpl.java new file mode 100644 index 000000000..45a13a107 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/schedule/impl/JobScheduleImpl.java @@ -0,0 +1,93 @@ +package org.ray.streaming.schedule.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.ray.api.Ray; +import org.ray.api.RayActor; +import org.ray.api.RayObject; +import org.ray.streaming.api.partition.impl.BroadcastPartition; +import org.ray.streaming.cluster.ResourceManager; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.graph.ExecutionNode; +import org.ray.streaming.core.graph.ExecutionNode.NodeType; +import org.ray.streaming.core.graph.ExecutionTask; +import org.ray.streaming.core.runtime.StreamWorker; +import org.ray.streaming.core.runtime.context.WorkerContext; +import org.ray.streaming.operator.impl.MasterOperator; +import org.ray.streaming.plan.Plan; +import org.ray.streaming.plan.PlanEdge; +import org.ray.streaming.plan.PlanVertex; +import org.ray.streaming.plan.VertexType; +import org.ray.streaming.schedule.IJobSchedule; +import org.ray.streaming.schedule.ITaskAssign; + + +public class JobScheduleImpl implements IJobSchedule { + + private Plan plan; + private Map jobConfig; + private ResourceManager resourceManager; + private ITaskAssign taskAssign; + + public JobScheduleImpl(Map jobConfig) { + this.resourceManager = new ResourceManager(); + this.taskAssign = new TaskAssignImpl(); + this.jobConfig = jobConfig; + } + + /** + * Schedule physical plan to execution graph, and call streaming worker to init and run. + */ + @Override + public void schedule(Plan plan) { + this.plan = plan; + addJobMaster(plan); + List> workers = this.resourceManager.createWorker(getPlanWorker()); + ExecutionGraph executionGraph = this.taskAssign.assign(this.plan, workers); + + List executionNodes = executionGraph.getExecutionNodeList(); + List> waits = new ArrayList<>(); + ExecutionTask masterTask = null; + for (ExecutionNode executionNode : executionNodes) { + List executionTasks = executionNode.getExecutionTaskList(); + for (ExecutionTask executionTask : executionTasks) { + if (executionNode.getNodeType() != NodeType.MASTER) { + Integer taskId = executionTask.getTaskId(); + RayActor streamWorker = executionTask.getWorker(); + waits.add(Ray.call(StreamWorker::init, streamWorker, + new WorkerContext(taskId, executionGraph, jobConfig))); + } else { + masterTask = executionTask; + } + } + } + Ray.wait(waits); + + Integer masterId = masterTask.getTaskId(); + RayActor masterWorker = masterTask.getWorker(); + Ray.call(StreamWorker::init, masterWorker, + new WorkerContext(masterId, executionGraph, jobConfig)).get(); + } + + private void addJobMaster(Plan plan) { + int masterVertexId = 0; + int masterParallelism = 1; + PlanVertex masterVertex = new PlanVertex(masterVertexId, masterParallelism, VertexType.MASTER, + new MasterOperator()); + plan.getPlanVertexList().add(masterVertex); + List planVertices = plan.getPlanVertexList(); + for (PlanVertex planVertex : planVertices) { + if (planVertex.getVertexType() == VertexType.SOURCE) { + PlanEdge planEdge = new PlanEdge(masterVertexId, planVertex.getVertexId(), + new BroadcastPartition()); + plan.getPlanEdgeList().add(planEdge); + } + } + } + + private int getPlanWorker() { + List planVertexList = plan.getPlanVertexList(); + return planVertexList.stream().map(vertex -> vertex.getParallelism()).reduce(0, Integer::sum); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/schedule/impl/TaskAssignImpl.java b/java/streaming/src/main/java/org/ray/streaming/schedule/impl/TaskAssignImpl.java new file mode 100644 index 000000000..be3f6ae35 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/schedule/impl/TaskAssignImpl.java @@ -0,0 +1,66 @@ +package org.ray.streaming.schedule.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.ray.api.RayActor; +import org.ray.streaming.core.graph.ExecutionEdge; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.graph.ExecutionNode; +import org.ray.streaming.core.graph.ExecutionTask; +import org.ray.streaming.core.processor.ProcessBuilder; +import org.ray.streaming.core.processor.StreamProcessor; +import org.ray.streaming.core.runtime.StreamWorker; +import org.ray.streaming.plan.Plan; +import org.ray.streaming.plan.PlanEdge; +import org.ray.streaming.plan.PlanVertex; +import org.ray.streaming.schedule.ITaskAssign; + +public class TaskAssignImpl implements ITaskAssign { + + /** + * Assign an optimized logical plan to execution graph. + * + * @param plan The logical plan. + * @param workers The worker actors. + * @return The physical execution graph. + */ + @Override + public ExecutionGraph assign(Plan plan, List> workers) { + List planVertices = plan.getPlanVertexList(); + List planEdges = plan.getPlanEdgeList(); + + int taskId = 0; + Map idToExecutionNode = new HashMap<>(); + for (PlanVertex planVertex : planVertices) { + ExecutionNode executionNode = new ExecutionNode(planVertex.getVertexId(), + planVertex.getParallelism()); + executionNode.setNodeType(planVertex.getVertexType()); + List vertexTasks = new ArrayList<>(); + for (int taskIndex = 0; taskIndex < planVertex.getParallelism(); taskIndex++) { + vertexTasks.add(new ExecutionTask(taskId, taskIndex, workers.get(taskId))); + taskId++; + } + StreamProcessor streamProcessor = ProcessBuilder + .buildProcessor(planVertex.getStreamOperator()); + executionNode.setExecutionTaskList(vertexTasks); + executionNode.setStreamProcessor(streamProcessor); + idToExecutionNode.put(executionNode.getNodeId(), executionNode); + } + + for (PlanEdge planEdge : planEdges) { + int srcNodeId = planEdge.getSrcVertexId(); + int targetNodeId = planEdge.getTargetVertexId(); + + ExecutionEdge executionEdge = new ExecutionEdge(srcNodeId, targetNodeId, + planEdge.getPartition()); + idToExecutionNode.get(srcNodeId).addExecutionEdge(executionEdge); + } + + List executionNodes = idToExecutionNode.values().stream() + .collect(Collectors.toList()); + return new ExecutionGraph(executionNodes); + } +} diff --git a/java/streaming/src/main/java/org/ray/streaming/util/ConfigKey.java b/java/streaming/src/main/java/org/ray/streaming/util/ConfigKey.java new file mode 100644 index 000000000..3fed75654 --- /dev/null +++ b/java/streaming/src/main/java/org/ray/streaming/util/ConfigKey.java @@ -0,0 +1,10 @@ +package org.ray.streaming.util; + +public class ConfigKey { + + /** + * Maximum number of batches to run in a streaming job. + */ + public static final String STREAMING_MAX_BATCH_COUNT = "streaming.max.batch.count"; + +} diff --git a/java/streaming/src/main/resources/log4j.properties b/java/streaming/src/main/resources/log4j.properties new file mode 100644 index 000000000..30d876aec --- /dev/null +++ b/java/streaming/src/main/resources/log4j.properties @@ -0,0 +1,6 @@ +log4j.rootLogger=INFO, stdout +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/java/streaming/src/main/resources/ray.conf b/java/streaming/src/main/resources/ray.conf new file mode 100644 index 000000000..4494cff06 --- /dev/null +++ b/java/streaming/src/main/resources/ray.conf @@ -0,0 +1,5 @@ +ray { + run-mode = SINGLE_PROCESS + resources = "CPU:4,GPU:0" + redis.address = "" +} diff --git a/java/streaming/src/test/java/org/ray/streaming/demo/WordCountTest.java b/java/streaming/src/test/java/org/ray/streaming/demo/WordCountTest.java new file mode 100644 index 000000000..d104c8e6d --- /dev/null +++ b/java/streaming/src/test/java/org/ray/streaming/demo/WordCountTest.java @@ -0,0 +1,75 @@ +package org.ray.streaming.demo; + +import com.google.common.collect.ImmutableMap; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.function.impl.FlatMapFunction; +import org.ray.streaming.api.function.impl.ReduceFunction; +import org.ray.streaming.api.function.impl.SinkFunction; +import org.ray.streaming.api.stream.StreamSource; +import org.ray.streaming.util.ConfigKey; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class WordCountTest implements Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(WordCountTest.class); + + // TODO(zhenxuanpan): this test only works in single-process mode, because we put + // results in this in-memory map. + static Map wordCount = new ConcurrentHashMap<>(); + + @Test + public void testWordCount() { + StreamingContext streamingContext = StreamingContext.buildContext(); + Map config = new HashMap<>(); + config.put(ConfigKey.STREAMING_MAX_BATCH_COUNT, 1); + streamingContext.withConfig(config); + List text = new ArrayList<>(); + text.add("hello world eagle eagle eagle"); + StreamSource streamSource = StreamSource.buildSource(streamingContext, text); + streamSource + .flatMap((FlatMapFunction) (value, collector) -> { + String[] records = value.split(" "); + for (String record : records) { + collector.collect(new WordAndCount(record, 1)); + } + }) + .keyBy(pair -> pair.word) + .reduce((ReduceFunction) (oldValue, newValue) -> + new WordAndCount(oldValue.word, oldValue.count + newValue.count)) + .sink((SinkFunction) result -> wordCount.put(result.word, result.count)); + + streamingContext.execute(); + + // Sleep until the count for every word is computed. + while (wordCount.size() < 3) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOGGER.warn("Got an exception while sleeping.", e); + } + } + Assert.assertEquals(wordCount, ImmutableMap.of("eagle", 3, "hello", 1, "world", 1)); + } + + private static class WordAndCount implements Serializable { + + public final String word; + public final Integer count; + + public WordAndCount(String key, Integer count) { + this.word = key; + this.count = count; + } + } + +} diff --git a/java/streaming/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java b/java/streaming/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java new file mode 100644 index 000000000..bf631d61b --- /dev/null +++ b/java/streaming/src/test/java/org/ray/streaming/plan/PlanBuilderTest.java @@ -0,0 +1,87 @@ +package org.ray.streaming.plan; + + +import com.google.common.collect.Lists; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.partition.impl.KeyPartition; +import org.ray.streaming.api.partition.impl.RoundRobinPartition; +import org.ray.streaming.api.stream.DataStream; +import org.ray.streaming.api.stream.StreamSink; +import org.ray.streaming.api.stream.StreamSource; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class PlanBuilderTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(PlanBuilderTest.class); + + @Test + public void testDataSync() { + Plan plan = buildDataSyncPlan(); + List planVertexList = plan.getPlanVertexList(); + List planEdgeList = plan.getPlanEdgeList(); + + Assert.assertEquals(planVertexList.size(), 2); + Assert.assertEquals(planEdgeList.size(), 1); + + PlanEdge planEdge = planEdgeList.get(0); + Assert.assertEquals(planEdge.getPartition().getClass(), RoundRobinPartition.class); + + PlanVertex sinkVertex = planVertexList.get(1); + PlanVertex sourceVertex = planVertexList.get(0); + Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK); + Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE); + + } + + public Plan buildDataSyncPlan() { + StreamingContext streamingContext = StreamingContext.buildContext(); + DataStream dataStream = StreamSource.buildSource(streamingContext, + Lists.newArrayList("a", "b", "c")); + StreamSink streamSink = dataStream.sink(x -> LOGGER.info(x)); + PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); + + Plan plan = planBuilder.buildPlan(); + return plan; + } + + @Test + public void testKeyByPlan() { + Plan plan = buildKeyByPlan(); + List planVertexList = plan.getPlanVertexList(); + List planEdgeList = plan.getPlanEdgeList(); + + Assert.assertEquals(planVertexList.size(), 3); + Assert.assertEquals(planEdgeList.size(), 2); + + PlanVertex source = planVertexList.get(0); + PlanVertex map = planVertexList.get(1); + PlanVertex sink = planVertexList.get(2); + + Assert.assertEquals(source.getVertexType(), VertexType.SOURCE); + Assert.assertEquals(map.getVertexType(), VertexType.PROCESS); + Assert.assertEquals(sink.getVertexType(), VertexType.SINK); + + PlanEdge keyBy2Sink = planEdgeList.get(0); + PlanEdge source2KeyBy = planEdgeList.get(1); + + Assert.assertEquals(keyBy2Sink.getPartition().getClass(), KeyPartition.class); + Assert.assertEquals(source2KeyBy.getPartition().getClass(), RoundRobinPartition.class); + } + + public Plan buildKeyByPlan() { + StreamingContext streamingContext = StreamingContext.buildContext(); + DataStream dataStream = StreamSource.buildSource(streamingContext, + Lists.newArrayList("1", "2", "3", "4")); + StreamSink streamSink = dataStream.keyBy(x -> x) + .sink(x -> LOGGER.info(x)); + PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink)); + + Plan plan = planBuilder.buildPlan(); + return plan; + } + +} \ No newline at end of file diff --git a/java/streaming/src/test/java/org/ray/streaming/schedule/impl/TaskAssignImplTest.java b/java/streaming/src/test/java/org/ray/streaming/schedule/impl/TaskAssignImplTest.java new file mode 100644 index 000000000..90c6bff2a --- /dev/null +++ b/java/streaming/src/test/java/org/ray/streaming/schedule/impl/TaskAssignImplTest.java @@ -0,0 +1,58 @@ +package org.ray.streaming.schedule.impl; + +import org.ray.streaming.api.partition.impl.RoundRobinPartition; +import org.ray.streaming.core.graph.ExecutionEdge; +import org.ray.streaming.core.graph.ExecutionGraph; +import org.ray.streaming.core.graph.ExecutionNode; +import org.ray.streaming.core.graph.ExecutionNode.NodeType; +import org.ray.streaming.core.runtime.StreamWorker; +import org.ray.streaming.plan.Plan; +import org.ray.streaming.plan.PlanBuilderTest; +import org.ray.streaming.schedule.ITaskAssign; +import java.util.ArrayList; +import java.util.List; +import org.ray.api.RayActor; +import org.ray.runtime.RayActorImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TaskAssignImplTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskAssignImplTest.class); + + @Test + public void testTaskAssignImpl() { + PlanBuilderTest planBuilderTest = new PlanBuilderTest(); + Plan plan = planBuilderTest.buildDataSyncPlan(); + + List> workers = new ArrayList<>(); + for(int i = 0; i < plan.getPlanVertexList().size(); i++) { + workers.add(new RayActorImpl<>()); + } + + ITaskAssign taskAssign = new TaskAssignImpl(); + ExecutionGraph executionGraph = taskAssign.assign(plan, workers); + + List executionNodeList = executionGraph.getExecutionNodeList(); + + Assert.assertEquals(executionNodeList.size(), 2); + ExecutionNode sourceNode = executionNodeList.get(0); + Assert.assertEquals(sourceNode.getNodeType(), NodeType.SOURCE); + Assert.assertEquals(sourceNode.getExecutionTaskList().size(), 1); + Assert.assertEquals(sourceNode.getExecutionEdgeList().size(), 1); + + List sourceExecutionEdges = sourceNode.getExecutionEdgeList(); + + Assert.assertEquals(sourceExecutionEdges.size(), 1); + ExecutionEdge source2Sink = sourceExecutionEdges.get(0); + + Assert.assertEquals(source2Sink.getPartition().getClass(), RoundRobinPartition.class); + + ExecutionNode sinkNode = executionNodeList.get(1); + Assert.assertEquals(sinkNode.getNodeType(), NodeType.SINK); + Assert.assertEquals(sinkNode.getExecutionTaskList().size(), 1); + Assert.assertEquals(sinkNode.getExecutionEdgeList().size(), 0); + } +} diff --git a/java/streaming/src/test/resources/log4j.properties b/java/streaming/src/test/resources/log4j.properties new file mode 100644 index 000000000..30d876aec --- /dev/null +++ b/java/streaming/src/test/resources/log4j.properties @@ -0,0 +1,6 @@ +log4j.rootLogger=INFO, stdout +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n diff --git a/java/streaming/src/test/resources/ray.conf b/java/streaming/src/test/resources/ray.conf new file mode 100644 index 000000000..fdc897fa6 --- /dev/null +++ b/java/streaming/src/test/resources/ray.conf @@ -0,0 +1,3 @@ +ray { + run-mode = SINGLE_PROCESS +} diff --git a/java/streaming/testng.xml b/java/streaming/testng.xml new file mode 100644 index 000000000..3ae10ba75 --- /dev/null +++ b/java/streaming/testng.xml @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/java/test.sh b/java/test.sh index d6be89b7f..68e0fc87c 100755 --- a/java/test.sh +++ b/java/test.sh @@ -7,6 +7,14 @@ set -x ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) +run_testng() { + $@ || exit_code=$? + # exit_code == 2 means there are skipped tests. + if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then + exit $exit_code + fi +} + pushd $ROOT_DIR/.. echo "Linting Java code with checkstyle." # NOTE(hchen): The `test_tag_filters` option causes bazel to ignore caches. @@ -17,21 +25,14 @@ echo "Running tests under cluster mode." # TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, # TestNG will exit with code 2. And bazel treats it as test failure. # bazel test //java:all_tests --action_env=ENABLE_MULTI_LANGUAGE_TESTS=1 --test_output="errors" || cluster_exit_code=$? -ENABLE_MULTI_LANGUAGE_TESTS=1 java -jar $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml|| cluster_exit_code=$? - -# exit_code == 2 means there are some tests skiped. -if [ $cluster_exit_code -ne 2 ] && [ $cluster_exit_code -ne 0 ] ; then - exit $cluster_exit_code -fi +ENABLE_MULTI_LANGUAGE_TESTS=1 run_testng java -jar $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml echo "Running tests under single-process mode." # bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --test_output="errors" || single_exit_code=$? -java -jar -Dray.run-mode="SINGLE_PROCESS" $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml || single_exit_code=$? +run_testng java -jar -Dray.run-mode="SINGLE_PROCESS" $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml -# exit_code == 2 means there are some tests skiped. -if [ $single_exit_code -ne 2 ] && [ $single_exit_code -ne 0 ] ; then - exit $single_exit_code -fi +echo "Running streaming tests." +run_testng java -jar ./bazel-bin/java/streaming_tests_deploy.jar java/streaming/testng.xml popd