Initial commit for Ray streaming (#4268)

ppeagle 2019-03-30 19:32:05 +08:00 committed by Hao Chen
parent e5bcae52f5
commit 5efb21e1d0
80 changed files with 2873 additions and 11 deletions

View file

@ -4,6 +4,7 @@ exports_files([
"testng.xml",
"checkstyle.xml",
"checkstyle-suppressions.xml",
"streaming/testng.xml",
])
java_binary(
@ -21,14 +22,20 @@ java_import(
"liborg_ray_ray_runtime-src.jar",
"liborg_ray_ray_tutorial.jar",
"liborg_ray_ray_tutorial-src.jar",
"liborg_ray_ray_streaming.jar",
"liborg_ray_ray_streaming-src.jar",
"all_tests_deploy.jar",
"all_tests_deploy-src.jar",
"streaming_tests_deploy.jar",
"streaming_tests_deploy-src.jar",
],
deps = [
":org_ray_ray_api",
":org_ray_ray_runtime",
":org_ray_ray_tutorial",
":org_ray_ray_streaming",
":all_tests",
":streaming_tests",
],
)
@ -103,6 +110,28 @@ define_java_module(
],
)
define_java_module(
name = "streaming",
deps = [
":org_ray_ray_api",
":org_ray_ray_runtime",
"@com_google_guava_guava//jar",
"@org_slf4j_slf4j_api//jar",
"@org_slf4j_slf4j_log4j12//jar",
],
define_test_lib = True,
test_deps = [
":org_ray_ray_api",
":org_ray_ray_runtime",
":org_ray_ray_streaming",
"@com_beust_jcommander//jar",
"@com_google_guava_guava//jar",
"@org_slf4j_slf4j_api//jar",
"@org_slf4j_slf4j_log4j12//jar",
"@org_testng_testng//jar",
],
)
java_binary(
name = "all_tests",
main_class = "org.testng.TestNG",
@ -114,6 +143,16 @@ java_binary(
],
)
java_binary(
name = "streaming_tests",
main_class = "org.testng.TestNG",
data = ["streaming/testng.xml"],
args = ["java/streaming/testng.xml"],
runtime_deps = [
":org_ray_ray_streaming_test",
],
)
flatbuffers_generated_files = [
"ActorCheckpointData.java",
"ActorCheckpointIdData.java",

View file

@ -12,6 +12,7 @@
<module>api</module>
<module>runtime</module>
<module>test</module>
<module>streaming</module>
<module>tutorial</module>
</modules>

java/streaming/pom.xml Normal file
View file

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.ray</groupId>
<artifactId>ray-superpom</artifactId>
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>streaming</artifactId>
<name>ray streaming</name>
<description>ray streaming</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ray</groupId>
<artifactId>ray-runtime</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
</dependencies>
</project>

View file

@ -0,0 +1,13 @@
package org.ray.streaming.api.collector;
/**
* The collector that collects data from an upstream operator and emits it to downstream
* operators.
*
* @param <T> Type of the data to collect.
*/
public interface Collector<T> {
void collect(T value);
}

View file

@ -0,0 +1,66 @@
package org.ray.streaming.api.context;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.ray.api.Ray;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanBuilder;
import org.ray.streaming.schedule.IJobSchedule;
import org.ray.streaming.schedule.impl.JobScheduleImpl;
/**
* Encapsulates the context information of a streaming job.
*/
public class StreamingContext implements Serializable {
private transient AtomicInteger idGenerator;
/**
* The sinks of this streaming job.
*/
private List<StreamSink> streamSinks;
private Map<String, Object> jobConfig;
/**
* The logical plan.
*/
private Plan plan;
private StreamingContext() {
this.idGenerator = new AtomicInteger(0);
this.streamSinks = new ArrayList<>();
this.jobConfig = new HashMap<>();
}
public static StreamingContext buildContext() {
Ray.init();
return new StreamingContext();
}
/**
* Construct the job DAG and execute the job.
*/
public void execute() {
PlanBuilder planBuilder = new PlanBuilder(this.streamSinks);
this.plan = planBuilder.buildPlan();
plan.printPlan();
IJobSchedule jobSchedule = new JobScheduleImpl(jobConfig);
jobSchedule.schedule(plan);
}
public int generateId() {
return this.idGenerator.incrementAndGet();
}
public void addSink(StreamSink streamSink) {
streamSinks.add(streamSink);
}
public void withConfig(Map<String, Object> jobConfig) {
this.jobConfig = jobConfig;
}
}

View file

@ -0,0 +1,10 @@
package org.ray.streaming.api.function;
import java.io.Serializable;
/**
* Interface of streaming functions.
*/
public interface Function extends Serializable {
}

View file

@ -0,0 +1,23 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of aggregate functions.
*
* @param <I> Type of the input data.
* @param <A> Type of the intermediate data.
* @param <O> Type of the output data.
*/
public interface AggregateFunction<I, A, O> extends Function {
A createAccumulator();
void add(I value, A accumulator);
O getResult(A accumulator);
A merge(A a, A b);
void retract(A acc, I value);
}
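
For illustration, a minimal sketch of an average aggregate written against this interface; AvgFunction and its long[] {sum, count} accumulator are assumptions for the example, not part of this commit:

import org.ray.streaming.api.function.impl.AggregateFunction;

// Hypothetical average aggregate; the accumulator holds {sum, count}.
public class AvgFunction implements AggregateFunction<Integer, long[], Double> {
  @Override
  public long[] createAccumulator() {
    return new long[] {0L, 0L};
  }

  @Override
  public void add(Integer value, long[] acc) {
    acc[0] += value;
    acc[1] += 1;
  }

  @Override
  public Double getResult(long[] acc) {
    return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
  }

  @Override
  public long[] merge(long[] a, long[] b) {
    return new long[] {a[0] + b[0], a[1] + b[1]};
  }

  @Override
  public void retract(long[] acc, Integer value) {
    acc[0] -= value;
    acc[1] -= 1;
  }
}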

View file

@ -0,0 +1,16 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.function.Function;
/**
* Interface of flat-map functions.
*
* @param <T> Type of the input data.
* @param <R> Type of the output data.
*/
@FunctionalInterface
public interface FlatMapFunction<T, R> extends Function {
void flatMap(T value, Collector<R> collector);
}

View file

@ -0,0 +1,17 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of join functions.
*
* @param <T> Type of the left input data.
* @param <O> Type of the right input data.
* @param <R> Type of the output data.
*/
@FunctionalInterface
public interface JoinFunction<T, O, R> extends Function {
R join(T left, O right);
}

View file

@ -0,0 +1,15 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of key-by functions.
*
* @param <T> Type of the input data.
* @param <K> Type of the key-by field.
*/
@FunctionalInterface
public interface KeyFunction<T, K> extends Function {
K keyBy(T value);
}

View file

@ -0,0 +1,15 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of map functions.
*
* @param <T> type of the input data.
* @param <R> type of the output data.
*/
@FunctionalInterface
public interface MapFunction<T, R> extends Function {
R map(T value);
}

View file

@ -0,0 +1,14 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of process functions.
*
* @param <T> Type of the input data.
*/
@FunctionalInterface
public interface ProcessFunction<T> extends Function {
void process(T value);
}

View file

@ -0,0 +1,14 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of reduce functions.
*
* @param <T> Type of the input data.
*/
@FunctionalInterface
public interface ReduceFunction<T> extends Function {
T reduce(T oldValue, T newValue);
}

View file

@ -0,0 +1,14 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of sink functions.
*
* @param <T> Type of the sink data.
*/
@FunctionalInterface
public interface SinkFunction<T> extends Function {
void sink(T value);
}

View file

@ -0,0 +1,23 @@
package org.ray.streaming.api.function.impl;
import org.ray.streaming.api.function.Function;
/**
* Interface of source functions.
*
* @param <T> Type of the data output by the source.
*/
public interface SourceFunction<T> extends Function {
void init(int parallel, int index);
void fetch(long batchId, SourceContext<T> ctx) throws Exception;
void close();
interface SourceContext<T> {
void collect(T element) throws Exception;
}
}

View file

@ -0,0 +1,34 @@
package org.ray.streaming.api.function.internal;
import java.util.Collection;
import org.ray.streaming.api.function.impl.SourceFunction;
/**
* A SourceFunction that fetches data from a Java Collection.
*
* @param <T> Type of the data output by the source.
*/
public class CollectionSourceFunction<T> implements SourceFunction<T> {
private Collection<T> values;
public CollectionSourceFunction(Collection<T> values) {
this.values = values;
}
@Override
public void init(int parallel, int index) {
}
@Override
public void fetch(long batchId, SourceContext<T> ctx) throws Exception {
for (T value : values) {
ctx.collect(value);
}
}
@Override
public void close() {
}
}
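
A custom source follows the same three-step lifecycle; init receives the operator's parallelism and this task's index, which lets each instance emit a disjoint sequence. A hypothetical counter source, sketched for illustration (not part of this commit):

import org.ray.streaming.api.function.impl.SourceFunction;

// Hypothetical source: emits one number per batch, strided by parallelism.
public class CounterSourceFunction implements SourceFunction<Long> {
  private long next;
  private int stride;

  @Override
  public void init(int parallel, int index) {
    this.next = index;
    this.stride = parallel;
  }

  @Override
  public void fetch(long batchId, SourceContext<Long> ctx) throws Exception {
    ctx.collect(next);
    next += stride;
  }

  @Override
  public void close() {
  }
}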

View file

@ -0,0 +1,21 @@
package org.ray.streaming.api.partition;
import org.ray.streaming.api.function.Function;
/**
* Interface of the partitioning strategy.
*
* @param <T> Type of the input data.
*/
@FunctionalInterface
public interface Partition<T> extends Function {
/**
* Given a record and downstream tasks, determine which task(s) should receive the record.
*
* @param record The record.
* @param taskIds IDs of all downstream tasks.
* @return IDs of the downstream tasks that should receive the record.
*/
int[] partition(T record, int[] taskIds);
}

View file

@ -0,0 +1,17 @@
package org.ray.streaming.api.partition.impl;
import org.ray.streaming.api.partition.Partition;
/**
* Broadcast the record to all downstream tasks.
*/
public class BroadcastPartition<T> implements Partition<T> {
public BroadcastPartition() {
}
@Override
public int[] partition(T value, int[] taskIds) {
return taskIds;
}
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.api.partition.impl;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.message.KeyRecord;
/**
* Partition the record by the key.
*
* @param <K> Type of the partition key.
* @param <T> Type of the input record.
*/
public class KeyPartition<K, T> implements Partition<KeyRecord<K, T>> {
@Override
public int[] partition(KeyRecord<K, T> keyRecord, int[] taskIds) {
int length = taskIds.length;
int taskId = taskIds[Math.abs(keyRecord.getKey().hashCode() % length)];
return new int[]{taskId};
}
}
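
A quick standalone check of the routing rule; the task IDs below are made up for the example:

import org.ray.streaming.api.partition.impl.KeyPartition;
import org.ray.streaming.message.KeyRecord;

public class KeyPartitionExample {
  public static void main(String[] args) {
    KeyPartition<String, Integer> partition = new KeyPartition<>();
    int[] taskIds = {101, 102, 103};
    // Records with equal keys are always routed to the same downstream task.
    int[] target = partition.partition(new KeyRecord<>("user-1", 42), taskIds);
    System.out.println(target[0]); // taskIds[Math.abs("user-1".hashCode() % 3)]
  }
}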

View file

@ -0,0 +1,24 @@
package org.ray.streaming.api.partition.impl;
import org.ray.streaming.api.partition.Partition;
/**
* Partition records to downstream tasks in a round-robin manner.
*
* @param <T> Type of the input record.
*/
public class RoundRobinPartition<T> implements Partition<T> {
private int seq;
public RoundRobinPartition() {
this.seq = 0;
}
@Override
public int[] partition(T value, int[] taskIds) {
int length = taskIds.length;
int taskId = taskIds[seq++ % length];
return new int[]{taskId};
}
}

View file

@ -0,0 +1,136 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.api.function.impl.MapFunction;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.api.partition.impl.BroadcastPartition;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.operator.impl.FlatMapOperator;
import org.ray.streaming.operator.impl.KeyByOperator;
import org.ray.streaming.operator.impl.MapOperator;
import org.ray.streaming.operator.impl.SinkOperator;
/**
* Represents a stream of data.
*
* This class defines all the streaming operations.
*
* @param <T> Type of data in the stream.
*/
public class DataStream<T> extends Stream<T> {
public DataStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public DataStream(DataStream input, StreamOperator streamOperator) {
super(input, streamOperator);
}
/**
* Apply a map function to this stream.
*
* @param mapFunction The map function.
* @param <R> Type of data returned by the map function.
* @return A new DataStream.
*/
public <R> DataStream<R> map(MapFunction<T, R> mapFunction) {
return new DataStream<>(this, new MapOperator(mapFunction));
}
/**
* Apply a flat-map function to this stream.
*
* @param flatMapFunction The flat-map function.
* @param <R> Type of data returned by the flat-map function.
* @return A new DataStream.
*/
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapFunction) {
return new DataStream<>(this, new FlatMapOperator<>(flatMapFunction));
}
/**
* Apply a union transformation to this stream, with another stream.
*
* @param other Another stream.
* @return A new UnionStream.
*/
public UnionStream<T> union(DataStream<T> other) {
return new UnionStream<>(this, null, other);
}
/**
* Apply a join transformation to this stream, with another stream.
*
* @param other Another stream.
* @param <O> The type of the other stream data.
* @param <R> The type of the data in the joined stream.
* @return A new JoinStream.
*/
public <O, R> JoinStream<T, O, R> join(DataStream<O> other) {
return new JoinStream<>(this, other);
}
public <R> DataStream<R> process() {
// TODO(zhenxuanpan): Need to add processFunction.
return new DataStream<>(this, null);
}
/**
* Apply a sink function and get a StreamSink.
*
* @param sinkFunction The sink function.
* @return A new StreamSink.
*/
public StreamSink<T> sink(SinkFunction<T> sinkFunction) {
return new StreamSink<>(this, new SinkOperator(sinkFunction));
}
/**
* Apply a key-by function to this stream.
*
* @param keyFunction The key function.
* @param <K> The type of the key.
* @return A new KeyDataStream.
*/
public <K> KeyDataStream<K, T> keyBy(KeyFunction<T, K> keyFunction) {
return new KeyDataStream<>(this, new KeyByOperator(keyFunction));
}
/**
* Apply broadcast to this stream.
*
* @return This stream.
*/
public DataStream<T> broadcast() {
this.partition = new BroadcastPartition<>();
return this;
}
/**
* Apply a partition to this stream.
*
* @param partition The partitioning strategy.
* @return This stream.
*/
public DataStream<T> partitionBy(Partition<T> partition) {
this.partition = partition;
return this;
}
/**
* Set the parallelism of this transformation.
*
* @param parallelism The parallelism to set.
* @return This stream.
*/
public DataStream<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
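
Putting a few of these operations together, a minimal end-to-end sketch using the StreamingContext and StreamSource.buildSource defined elsewhere in this commit; the input values and class name are illustrative:

import java.util.Arrays;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.StreamSource;

public class PipelineExample {
  public static void main(String[] args) {
    StreamingContext context = StreamingContext.buildContext();
    // source -> map -> sink; registering the sink is what lets execute() build the plan.
    StreamSource.buildSource(context, Arrays.asList("hello", "ray", "streaming"))
        .map(word -> word.toUpperCase())
        .sink(value -> System.out.println("sink: " + value));
    context.execute();
  }
}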

View file

@ -0,0 +1,82 @@
package org.ray.streaming.api.stream;
import java.io.Serializable;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.JoinFunction;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.operator.StreamOperator;
/**
* Represents a DataStream created by joining two DataStreams.
*
* @param <L> Type of the data in the left stream.
* @param <R> Type of the data in the right stream.
* @param <J> Type of the data in the joined stream.
*/
public class JoinStream<L, R, J> extends DataStream<L> {
public JoinStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public JoinStream(DataStream<L> leftStream, DataStream<R> rightStream) {
super(leftStream, null);
}
/**
* Apply a key-by function to the left stream of the join.
*/
public <K> Where<L, R, J, K> where(KeyFunction<L, K> keyFunction) {
return new Where<>(this, keyFunction);
}
/**
* Where clause of the join transformation.
*
* @param <L> Type of the data in the left stream.
* @param <R> Type of the data in the right stream.
* @param <J> Type of the data in the joined stream.
* @param <K> Type of the join key.
*/
class Where<L, R, J, K> implements Serializable {
private JoinStream<L, R, J> joinStream;
private KeyFunction<L, K> leftKeyByFunction;
public Where(JoinStream<L, R, J> joinStream, KeyFunction<L, K> leftKeyByFunction) {
this.joinStream = joinStream;
this.leftKeyByFunction = leftKeyByFunction;
}
public Equal<L, R, J, K> equalTo(KeyFunction<R, K> rightKeyFunction) {
return new Equal<>(joinStream, leftKeyByFunction, rightKeyFunction);
}
}
/**
* Equal clause of the join transformation.
*
* @param <L> Type of the data in the left stream.
* @param <R> Type of the data in the right stream.
* @param <J> Type of the data in the joined stream.
* @param <K> Type of the join key.
*/
class Equal<L, R, J, K> implements Serializable {
private JoinStream<L, R, J> joinStream;
private KeyFunction<L, K> leftKeyByFunction;
private KeyFunction<R, K> rightKeyByFunction;
public Equal(JoinStream<L, R, J> joinStream, KeyFunction<L, K> leftKeyByFunction,
KeyFunction<R, K> rightKeyByFunction) {
this.joinStream = joinStream;
this.leftKeyByFunction = leftKeyByFunction;
this.rightKeyByFunction = rightKeyByFunction;
}
public DataStream<J> with(JoinFunction<L, R, J> joinFunction) {
return (DataStream<J>) joinStream;
}
}
}
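
The intended call chain reads where(...).equalTo(...).with(...), though in this commit with() simply casts the stream and no join is actually wired up. A compile-level sketch under that caveat; the streams and keys are illustrative:

import java.util.Arrays;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSource;

public class JoinExample {
  public static void main(String[] args) {
    StreamingContext context = StreamingContext.buildContext();
    DataStream<Integer> left = StreamSource.buildSource(context, Arrays.asList(1, 2, 3));
    DataStream<Integer> right = StreamSource.buildSource(context, Arrays.asList(2, 3, 4));
    // Explicit type witness: the joined type J cannot be inferred from the arguments.
    DataStream<String> joined = left.<Integer, String>join(right)
        .where(l -> l)
        .equalTo(r -> r)
        .with((l, r) -> l + ":" + r);
  }
}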

View file

@ -0,0 +1,53 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.AggregateFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.partition.impl.KeyPartition;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.operator.impl.ReduceOperator;
/**
* Represents a DataStream returned by a key-by operation.
*
* @param <K> Type of the key.
* @param <T> Type of the data.
*/
public class KeyDataStream<K, T> extends DataStream<T> {
public KeyDataStream(StreamingContext streamingContext, StreamOperator streamOperator) {
super(streamingContext, streamOperator);
}
public KeyDataStream(DataStream<T> input, StreamOperator streamOperator) {
super(input, streamOperator);
this.partition = new KeyPartition<>();
}
/**
* Apply a reduce function to this stream.
*
* @param reduceFunction The reduce function.
* @return A new DataStream.
*/
public DataStream<T> reduce(ReduceFunction reduceFunction) {
return new DataStream<>(this, new ReduceOperator(reduceFunction));
}
/**
* Apply an aggregate function to this stream.
*
* @param aggregateFunction The aggregate function.
* @param <A> The type of aggregated intermediate data.
* @param <O> The type of result data.
* @return A new DataStream.
*/
public <A, O> DataStream<O> aggregate(AggregateFunction<T, A, O> aggregateFunction) {
return new DataStream<>(this, null);
}
public KeyDataStream<K, T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}
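
A sketch of keyBy plus reduce that keeps a running sum per key; since reduce takes a raw ReduceFunction in this commit, the lambda needs an explicit cast. The inputs are illustrative:

import java.util.Arrays;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.stream.StreamSource;

public class ReduceExample {
  public static void main(String[] args) {
    StreamingContext context = StreamingContext.buildContext();
    StreamSource.buildSource(context, Arrays.asList(1, 2, 1, 3, 2))
        .keyBy(value -> value)
        // The cast pins the element type, since the parameter is a raw ReduceFunction.
        .reduce((ReduceFunction<Integer>) (oldValue, newValue) -> oldValue + newValue)
        .sink(value -> System.out.println("sum: " + value));
    context.execute();
  }
}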

View file

@ -0,0 +1,71 @@
package org.ray.streaming.api.stream;
import java.io.Serializable;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.operator.StreamOperator;
/**
* Abstract base class of all stream types.
*
* @param <T> Type of the data in the stream.
*/
public abstract class Stream<T> implements Serializable {
protected int id;
protected int parallelism = 1;
protected StreamOperator operator;
protected Stream<T> inputStream;
protected StreamingContext streamingContext;
protected Partition<T> partition;
public Stream(StreamingContext streamingContext, StreamOperator streamOperator) {
this.streamingContext = streamingContext;
this.operator = streamOperator;
this.id = streamingContext.generateId();
this.partition = new RoundRobinPartition<>();
}
public Stream(Stream<T> inputStream, StreamOperator streamOperator) {
this.inputStream = inputStream;
this.parallelism = inputStream.getParallelism();
this.streamingContext = this.inputStream.getStreamingContext();
this.operator = streamOperator;
this.id = streamingContext.generateId();
this.partition = new RoundRobinPartition<>();
}
public Stream<T> getInputStream() {
return inputStream;
}
public StreamOperator getOperator() {
return operator;
}
public StreamingContext getStreamingContext() {
return streamingContext;
}
public int getParallelism() {
return parallelism;
}
public Stream<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
public int getId() {
return id;
}
public Partition<T> getPartition() {
return partition;
}
public void setPartition(Partition<T> partition) {
this.partition = partition;
}
}

View file

@ -0,0 +1,21 @@
package org.ray.streaming.api.stream;
import org.ray.streaming.operator.impl.SinkOperator;
/**
* Represents a sink of the DataStream.
*
* @param <T> Type of the input data of this sink.
*/
public class StreamSink<T> extends Stream<T> {
public StreamSink(DataStream<T> input, SinkOperator sinkOperator) {
super(input, sinkOperator);
this.streamingContext.addSink(this);
}
public StreamSink<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}

View file

@ -0,0 +1,35 @@
package org.ray.streaming.api.stream;
import java.util.Collection;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.internal.CollectionSourceFunction;
import org.ray.streaming.operator.impl.SourceOperator;
/**
* Represents a source of the DataStream.
*
* @param <T> The type of StreamSource data.
*/
public class StreamSource<T> extends DataStream<T> {
public StreamSource(StreamingContext streamingContext, SourceOperator sourceOperator) {
super(streamingContext, sourceOperator);
}
/**
* Build a StreamSource from a collection.
*
* @param context Stream context.
* @param values A collection of values.
* @param <T> The type of source data.
* @return A StreamSource.
*/
public static <T> StreamSource<T> buildSource(StreamingContext context, Collection<T> values) {
return new StreamSource<>(context, new SourceOperator<>(new CollectionSourceFunction<>(values)));
}
public StreamSource<T> setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
}

View file

@ -0,0 +1,25 @@
package org.ray.streaming.api.stream;
import java.util.ArrayList;
import java.util.List;
import org.ray.streaming.operator.StreamOperator;
/**
* Represents a union DataStream.
*
* @param <T> The type of union data.
*/
public class UnionStream<T> extends DataStream<T> {
private List<DataStream> unionStreams;
public UnionStream(DataStream input, StreamOperator streamOperator, DataStream<T> other) {
super(input, streamOperator);
this.unionStreams = new ArrayList<>();
this.unionStreams.add(other);
}
public List<DataStream> getUnionStreams() {
return unionStreams;
}
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.cluster;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.streaming.core.runtime.StreamWorker;
public class ResourceManager {
public List<RayActor<StreamWorker>> createWorker(int workerNum) {
List<RayActor<StreamWorker>> workers = new ArrayList<>();
for (int i = 0; i < workerNum; i++) {
RayActor<StreamWorker> worker = Ray.createActor(StreamWorker::new);
workers.add(worker);
}
return workers;
}
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.core.command;
import java.io.Serializable;
public class BatchInfo implements Serializable {
private long batchId;
public BatchInfo(long batchId) {
this.batchId = batchId;
}
public long getBatchId() {
return batchId;
}
public void setBatchId(long batchId) {
this.batchId = batchId;
}
}

View file

@ -0,0 +1,48 @@
package org.ray.streaming.core.graph;
import java.io.Serializable;
import org.ray.streaming.api.partition.Partition;
/**
* An edge in the physical execution graph.
*/
public class ExecutionEdge implements Serializable {
private int srcNodeId;
private int targetNodeId;
private Partition partition;
public ExecutionEdge(int srcNodeId, int targetNodeId, Partition partition) {
this.srcNodeId = srcNodeId;
this.targetNodeId = targetNodeId;
this.partition = partition;
}
public int getSrcNodeId() {
return srcNodeId;
}
public void setSrcNodeId(int srcNodeId) {
this.srcNodeId = srcNodeId;
}
public int getTargetNodeId() {
return targetNodeId;
}
public void setTargetNodeId(int targetNodeId) {
this.targetNodeId = targetNodeId;
}
public Partition getPartition() {
return partition;
}
public void setPartition(Partition partition) {
this.partition = partition;
}
public String getStream() {
return "stream:" + srcNodeId + "-" + targetNodeId;
}
}

View file

@ -0,0 +1,73 @@
package org.ray.streaming.core.graph;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.api.RayActor;
import org.ray.streaming.core.runtime.StreamWorker;
/**
* Physical execution graph.
*/
public class ExecutionGraph implements Serializable {
private List<ExecutionNode> executionNodeList;
public ExecutionGraph(List<ExecutionNode> executionNodes) {
this.executionNodeList = executionNodes;
}
public void addExecutionNode(ExecutionNode executionNode) {
this.executionNodeList.add(executionNode);
}
public List<ExecutionNode> getExecutionNodeList() {
return executionNodeList;
}
public ExecutionTask getExecutionTaskByTaskId(int taskId) {
for (ExecutionNode executionNode : executionNodeList) {
for (ExecutionTask executionTask : executionNode.getExecutionTaskList()) {
if (executionTask.getTaskId() == taskId) {
return executionTask;
}
}
}
throw new RuntimeException("Task " + taskId + " does not exist!");
}
public ExecutionNode getExecutionNodeByNodeId(int nodeId) {
for (ExecutionNode executionNode : executionNodeList) {
if (executionNode.getNodeId() == nodeId) {
return executionNode;
}
}
throw new RuntimeException("Node " + nodeId + " does not exist!");
}
public ExecutionNode getExecutionNodeByTaskId(int taskId) {
for (ExecutionNode executionNode : executionNodeList) {
for (ExecutionTask executionTask : executionNode.getExecutionTaskList()) {
if (executionTask.getTaskId() == taskId) {
return executionNode;
}
}
}
throw new RuntimeException("Task " + taskId + " does not exist!");
}
public Map<Integer, RayActor<StreamWorker>> getTaskId2WorkerByNodeId(int nodeId) {
for (ExecutionNode executionNode : executionNodeList) {
if (executionNode.getNodeId() == nodeId) {
Map<Integer, RayActor<StreamWorker>> taskId2Worker = new HashMap<>();
for (ExecutionTask executionTask : executionNode.getExecutionTaskList()) {
taskId2Worker.put(executionTask.getTaskId(), executionTask.getWorker());
}
return taskId2Worker;
}
}
throw new RuntimeException("Node " + nodeId + " does not exist!");
}
}

View file

@ -0,0 +1,98 @@
package org.ray.streaming.core.graph;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.ray.streaming.core.processor.StreamProcessor;
import org.ray.streaming.plan.VertexType;
/**
* A node in the physical execution graph.
*/
public class ExecutionNode implements Serializable {
private int nodeId;
private int parallelism;
private NodeType nodeType;
private StreamProcessor streamProcessor;
private List<ExecutionTask> executionTaskList;
private List<ExecutionEdge> executionEdgeList;
public ExecutionNode(int nodeId, int parallelism) {
this.nodeId = nodeId;
this.parallelism = parallelism;
this.executionTaskList = new ArrayList<>();
this.executionEdgeList = new ArrayList<>();
}
public int getNodeId() {
return nodeId;
}
public void setNodeId(int nodeId) {
this.nodeId = nodeId;
}
public int getParallelism() {
return parallelism;
}
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public List<ExecutionTask> getExecutionTaskList() {
return executionTaskList;
}
public void setExecutionTaskList(List<ExecutionTask> executionTaskList) {
this.executionTaskList = executionTaskList;
}
public List<ExecutionEdge> getExecutionEdgeList() {
return executionEdgeList;
}
public void setExecutionEdgeList(List<ExecutionEdge> executionEdgeList) {
this.executionEdgeList = executionEdgeList;
}
public void addExecutionEdge(ExecutionEdge executionEdge) {
this.executionEdgeList.add(executionEdge);
}
public StreamProcessor getStreamProcessor() {
return streamProcessor;
}
public void setStreamProcessor(StreamProcessor streamProcessor) {
this.streamProcessor = streamProcessor;
}
public NodeType getNodeType() {
return nodeType;
}
public void setNodeType(VertexType vertexType) {
switch (vertexType) {
case MASTER:
this.nodeType = NodeType.MASTER;
break;
case SOURCE:
this.nodeType = NodeType.SOURCE;
break;
case SINK:
this.nodeType = NodeType.SINK;
break;
default:
this.nodeType = NodeType.PROCESS;
}
}
public enum NodeType {
MASTER,
SOURCE,
PROCESS,
SINK,
}
}

View file

@ -0,0 +1,47 @@
package org.ray.streaming.core.graph;
import java.io.Serializable;
import org.ray.api.RayActor;
import org.ray.streaming.core.runtime.StreamWorker;
/**
* ExecutionTask is the minimal execution unit.
*
* An ExecutionNode has n ExecutionTasks if parallelism is n.
*/
public class ExecutionTask implements Serializable {
private int taskId;
private int taskIndex;
private RayActor<StreamWorker> worker;
public ExecutionTask(int taskId, int taskIndex, RayActor<StreamWorker> worker) {
this.taskId = taskId;
this.taskIndex = taskIndex;
this.worker = worker;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public int getTaskIndex() {
return taskIndex;
}
public void setTaskIndex(int taskIndex) {
this.taskIndex = taskIndex;
}
public RayActor<StreamWorker> getWorker() {
return worker;
}
public void setWorker(RayActor<StreamWorker> worker) {
this.worker = worker;
}
}

View file

@ -0,0 +1,101 @@
package org.ray.streaming.core.processor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.core.command.BatchInfo;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.graph.ExecutionNode;
import org.ray.streaming.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.core.graph.ExecutionTask;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.impl.MasterOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MasterProcessor is responsible for overall control logic.
*/
public class MasterProcessor extends StreamProcessor<BatchInfo, MasterOperator> {
private static final Logger LOGGER = LoggerFactory.getLogger(MasterProcessor.class);
private Thread batchControllerThread;
private long maxBatch;
public MasterProcessor(MasterOperator masterOperator) {
super(masterOperator);
}
public void open(List<Collector> collectors, RuntimeContext runtimeContext,
ExecutionGraph executionGraph) {
super.open(collectors, runtimeContext);
this.maxBatch = runtimeContext.getMaxBatch();
startBatchController(executionGraph);
}
private void startBatchController(ExecutionGraph executionGraph) {
BatchController batchController = new BatchController(maxBatch, collectors);
List<Integer> sinkTasks = new ArrayList<>();
for (ExecutionNode executionNode : executionGraph.getExecutionNodeList()) {
if (executionNode.getNodeType() == NodeType.SINK) {
List<Integer> nodeTasks = executionNode.getExecutionTaskList().stream()
.map(ExecutionTask::getTaskId).collect(Collectors.toList());
sinkTasks.addAll(nodeTasks);
}
}
batchControllerThread = new Thread(batchController, "controller-thread");
batchControllerThread.start();
}
@Override
public void process(BatchInfo batchInfo) {
}
@Override
public void close() {
}
static class BatchController implements Runnable, Serializable {
private AtomicInteger batchId;
private List<Collector> collectors;
private Map<Integer, Integer> sinkBatchMap;
private Integer frequency;
private long maxBatch;
public BatchController(long maxBatch, List<Collector> collectors) {
this.batchId = new AtomicInteger(0);
this.maxBatch = maxBatch;
this.collectors = collectors;
// TODO(zhenxuanpan): Use config to set.
this.frequency = 1000;
}
@Override
public void run() {
while (batchId.get() < maxBatch) {
try {
Record record = new Record<>(new BatchInfo(batchId.getAndIncrement()));
for (Collector collector : collectors) {
collector.collect(record);
}
Thread.sleep(frequency);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
}
}
}

View file

@ -0,0 +1,29 @@
package org.ray.streaming.core.processor;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OneInputProcessor<T> extends StreamProcessor<Record<T>, OneInputOperator<T>> {
private static final Logger LOGGER = LoggerFactory.getLogger(OneInputProcessor.class);
public OneInputProcessor(OneInputOperator<T> operator) {
super(operator);
}
@Override
public void process(Record<T> record) {
try {
this.operator.processElement(record);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
this.operator.close();
}
}

View file

@ -0,0 +1,33 @@
package org.ray.streaming.core.processor;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.OperatorType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.operator.TwoInputOperator;
import org.ray.streaming.operator.impl.SourceOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProcessBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessBuilder.class);
public static StreamProcessor buildProcessor(StreamOperator streamOperator) {
OperatorType type = streamOperator.getOpType();
LOGGER.info("Building StreamProcessor, operator type = {}, operator = {}.", type,
streamOperator.getClass().getSimpleName());
switch (type) {
case MASTER:
return new MasterProcessor(null);
case SOURCE:
return new SourceProcessor<>((SourceOperator) streamOperator);
case ONE_INPUT:
return new OneInputProcessor<>((OneInputOperator) streamOperator);
case TWO_INPUT:
return new TwoInputProcessor((TwoInputOperator) streamOperator);
default:
throw new RuntimeException("current operator type is not support");
}
}
}

View file

@ -0,0 +1,15 @@
package org.ray.streaming.core.processor;
import java.io.Serializable;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.core.runtime.context.RuntimeContext;
public interface Processor<T> extends Serializable {
void open(List<Collector> collectors, RuntimeContext runtimeContext);
void process(T t);
void close();
}

View file

@ -0,0 +1,25 @@
package org.ray.streaming.core.processor;
import org.ray.streaming.operator.impl.SourceOperator;
/**
* The processor for the stream sources, containing a SourceOperator.
*
* @param <T> The type of source data.
*/
public class SourceProcessor<T> extends StreamProcessor<Long, SourceOperator<T>> {
public SourceProcessor(SourceOperator<T> operator) {
super(operator);
}
@Override
public void process(Long batchId) {
this.operator.process(batchId);
}
@Override
public void close() {
}
}

View file

@ -0,0 +1,33 @@
package org.ray.streaming.core.processor;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.operator.Operator;
/**
* StreamProcessor is the processing unit for an operator.
*
* @param <T> The type of process data.
* @param <P> Type of the specific operator class.
*/
public abstract class StreamProcessor<T, P extends Operator> implements Processor<T> {
protected List<Collector> collectors;
protected RuntimeContext runtimeContext;
protected P operator;
public StreamProcessor(P operator) {
this.operator = operator;
}
@Override
public void open(List<Collector> collectors, RuntimeContext runtimeContext) {
this.collectors = collectors;
this.runtimeContext = runtimeContext;
if (operator != null) {
this.operator.open(collectors, runtimeContext);
}
}
}

View file

@ -0,0 +1,37 @@
package org.ray.streaming.core.processor;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.TwoInputOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TwoInputProcessor<T, O> extends StreamProcessor<Record, TwoInputOperator<T, O>> {
private static final Logger LOGGER = LoggerFactory.getLogger(TwoInputProcessor.class);
// TODO(zhenxuanpan): Set leftStream and rightStream.
private String leftStream;
private String rightStream;
public TwoInputProcessor(TwoInputOperator<T, O> operator) {
super(operator);
}
@Override
public void process(Record record) {
try {
if (record.getStream().equals(leftStream)) {
this.operator.processElement(record, null);
} else {
this.operator.processElement(null, record);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
this.operator.close();
}
}

View file

@ -0,0 +1,86 @@
package org.ray.streaming.core.runtime;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.annotation.RayRemote;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.core.command.BatchInfo;
import org.ray.streaming.core.graph.ExecutionEdge;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.graph.ExecutionNode;
import org.ray.streaming.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.core.graph.ExecutionTask;
import org.ray.streaming.core.processor.MasterProcessor;
import org.ray.streaming.core.processor.StreamProcessor;
import org.ray.streaming.core.runtime.collector.RayCallCollector;
import org.ray.streaming.core.runtime.context.RayRuntimeContext;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.core.runtime.context.WorkerContext;
import org.ray.streaming.message.Message;
import org.ray.streaming.message.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The stream worker, which is a Ray actor.
*/
@RayRemote
public class StreamWorker implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamWorker.class);
private int taskId;
private WorkerContext workerContext;
private StreamProcessor streamProcessor;
private NodeType nodeType;
public StreamWorker() {
}
public Boolean init(WorkerContext workerContext) {
this.workerContext = workerContext;
this.taskId = workerContext.getTaskId();
ExecutionGraph executionGraph = this.workerContext.getExecutionGraph();
ExecutionTask executionTask = executionGraph.getExecutionTaskByTaskId(taskId);
ExecutionNode executionNode = executionGraph.getExecutionNodeByTaskId(taskId);
this.nodeType = executionNode.getNodeType();
this.streamProcessor = executionNode.getStreamProcessor();
LOGGER.debug("Initializing StreamWorker, taskId: {}, operator: {}.", taskId, streamProcessor);
List<ExecutionEdge> executionEdges = executionNode.getExecutionEdgeList();
List<Collector> collectors = new ArrayList<>();
for (ExecutionEdge executionEdge : executionEdges) {
collectors.add(new RayCallCollector(taskId, executionEdge, executionGraph));
}
RuntimeContext runtimeContext = new RayRuntimeContext(executionTask, workerContext.getConfig(),
executionNode.getParallelism());
if (this.nodeType == NodeType.MASTER) {
((MasterProcessor) streamProcessor).open(collectors, runtimeContext, executionGraph);
} else {
this.streamProcessor.open(collectors, runtimeContext);
}
return true;
}
public Boolean process(Message message) {
LOGGER.debug("Processing message, taskId: {}, message: {}.", taskId, message);
if (nodeType == NodeType.SOURCE) {
Record record = message.getRecord(0);
BatchInfo batchInfo = (BatchInfo) record.getValue();
this.streamProcessor.process(batchInfo.getBatchId());
} else {
List<Record> records = message.getRecordList();
for (Record record : records) {
record.setBatchId(message.getBatchId());
record.setStream(message.getStream());
this.streamProcessor.process(record);
}
}
return true;
}
}

View file

@ -0,0 +1,26 @@
package org.ray.streaming.core.runtime.collector;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.message.Record;
/**
* Combines multiple collectors: each collected value is wrapped in a Record and forwarded to all of them.
*
* @param <T> The type of output data.
*/
public class CollectionCollector<T> implements Collector<T> {
private List<Collector> collectorList;
public CollectionCollector(List<Collector> collectorList) {
this.collectorList = collectorList;
}
@Override
public void collect(T value) {
for (Collector collector : collectorList) {
collector.collect(new Record(value));
}
}
}

View file

@ -0,0 +1,58 @@
package org.ray.streaming.core.runtime.collector;
import java.util.Arrays;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.partition.Partition;
import org.ray.streaming.core.graph.ExecutionEdge;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.runtime.StreamWorker;
import org.ray.streaming.message.Message;
import org.ray.streaming.message.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The collector that emits data via Ray remote calls.
*/
public class RayCallCollector implements Collector<Record> {
private static final Logger LOGGER = LoggerFactory.getLogger(RayCallCollector.class);
private int taskId;
private String stream;
private Map<Integer, RayActor<StreamWorker>> taskId2Worker;
private int[] targetTaskIds;
private Partition partition;
public RayCallCollector(int taskId, ExecutionEdge executionEdge, ExecutionGraph executionGraph) {
this.taskId = taskId;
this.stream = executionEdge.getStream();
int targetNodeId = executionEdge.getTargetNodeId();
taskId2Worker = executionGraph
.getTaskId2WorkerByNodeId(targetNodeId);
targetTaskIds = Arrays.stream(taskId2Worker.keySet()
.toArray(new Integer[taskId2Worker.size()]))
.mapToInt(Integer::valueOf).toArray();
this.partition = executionEdge.getPartition();
LOGGER.debug("RayCallCollector constructed, taskId:{}, add stream:{}, partition:{}.",
taskId, stream, this.partition);
}
@Override
public void collect(Record record) {
int[] taskIds = this.partition.partition(record, targetTaskIds);
LOGGER.debug("Sending data from task {} to remote tasks {}, collector stream:{}, record:{}",
taskId, taskIds, stream, record);
Message message = new Message(taskId, record.getBatchId(), stream, record);
for (int targetTaskId : taskIds) {
RayActor<StreamWorker> streamWorker = this.taskId2Worker.get(targetTaskId);
// Use ray call to send message to downstream actor.
Ray.call(StreamWorker::process, streamWorker, message);
}
}
}

View file

@ -0,0 +1,58 @@
package org.ray.streaming.core.runtime.context;
import static org.ray.streaming.util.ConfigKey.STREAMING_MAX_BATCH_COUNT;
import java.util.Map;
import org.ray.streaming.core.graph.ExecutionTask;
/**
* Ray-based implementation of RuntimeContext.
*/
public class RayRuntimeContext implements RuntimeContext {
private int taskId;
private int taskIndex;
private int parallelism;
private Long batchId;
private Map<String, Object> config;
public RayRuntimeContext(ExecutionTask executionTask, Map<String, Object> config,
int parallelism) {
this.taskId = executionTask.getTaskId();
this.config = config;
this.taskIndex = executionTask.getTaskIndex();
this.parallelism = parallelism;
}
@Override
public int getTaskId() {
return taskId;
}
@Override
public int getTaskIndex() {
return taskIndex;
}
@Override
public int getParallelism() {
return parallelism;
}
@Override
public Long getBatchId() {
return batchId;
}
@Override
public Long getMaxBatch() {
if (config.containsKey(STREAMING_MAX_BATCH_COUNT)) {
return Long.valueOf(String.valueOf(config.get(STREAMING_MAX_BATCH_COUNT)));
}
return Long.MAX_VALUE;
}
public void setBatchId(Long batchId) {
this.batchId = batchId;
}
}

View file

@ -0,0 +1,19 @@
package org.ray.streaming.core.runtime.context;
/**
* Encapsulate the runtime information of a streaming task.
*/
public interface RuntimeContext {
int getTaskId();
int getTaskIndex();
int getParallelism();
Long getBatchId();
Long getMaxBatch();
}

View file

@ -0,0 +1,41 @@
package org.ray.streaming.core.runtime.context;
import java.io.Serializable;
import java.util.Map;
import org.ray.streaming.core.graph.ExecutionGraph;
/**
* Encapsulate the context information for worker initialization.
*/
public class WorkerContext implements Serializable {
private int taskId;
private ExecutionGraph executionGraph;
private Map<String, Object> config;
public WorkerContext(int taskId, ExecutionGraph executionGraph, Map<String, Object> jobConfig) {
this.taskId = taskId;
this.executionGraph = executionGraph;
this.config = jobConfig;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public ExecutionGraph getExecutionGraph() {
return executionGraph;
}
public void setExecutionGraph(ExecutionGraph executionGraph) {
this.executionGraph = executionGraph;
}
public Map<String, Object> getConfig() {
return config;
}
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.message;
public class KeyRecord<K, T> extends Record<T> {
private K key;
public KeyRecord(K key, T value) {
super(value);
this.key = key;
}
public K getKey() {
return key;
}
public void setKey(K key) {
this.key = key;
}
}

View file

@ -0,0 +1,64 @@
package org.ray.streaming.message;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
public class Message implements Serializable {
private int taskId;
private long batchId;
private String stream;
private List<Record> recordList;
public Message(int taskId, long batchId, String stream, List<Record> recordList) {
this.taskId = taskId;
this.batchId = batchId;
this.stream = stream;
this.recordList = recordList;
}
public Message(int taskId, long batchId, String stream, Record record) {
this.taskId = taskId;
this.batchId = batchId;
this.stream = stream;
this.recordList = Lists.newArrayList(record);
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public long getBatchId() {
return batchId;
}
public void setBatchId(long batchId) {
this.batchId = batchId;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public List<Record> getRecordList() {
return recordList;
}
public void setRecordList(List<Record> recordList) {
this.recordList = recordList;
}
public Record getRecord(int index) {
return recordList.get(index);
}
}

View file

@ -0,0 +1,50 @@
package org.ray.streaming.message;
import java.io.Serializable;
public class Record<T> implements Serializable {
protected transient String stream;
protected transient long batchId;
protected T value;
public Record(T value) {
this.value = value;
}
public Record(long batchId, T value) {
this.batchId = batchId;
this.value = value;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public long getBatchId() {
return batchId;
}
public void setBatchId(long batchId) {
this.batchId = batchId;
}
@Override
public String toString() {
return value.toString();
}
}

View file

@ -0,0 +1,13 @@
package org.ray.streaming.operator;
import org.ray.streaming.message.Record;
public interface OneInputOperator<T> extends Operator {
void processElement(Record<T> record) throws Exception;
default OperatorType getOpType() {
return OperatorType.ONE_INPUT;
}
}

View file

@ -0,0 +1,17 @@
package org.ray.streaming.operator;
import java.io.Serializable;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.core.runtime.context.RuntimeContext;
public interface Operator extends Serializable {
void open(List<Collector> collectors, RuntimeContext runtimeContext);
void finish();
void close();
OperatorType getOpType();
}

View file

@ -0,0 +1,9 @@
package org.ray.streaming.operator;
public enum OperatorType {
MASTER,
SOURCE,
ONE_INPUT,
TWO_INPUT,
}

View file

@ -0,0 +1,47 @@
package org.ray.streaming.operator;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.function.Function;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.message.KeyRecord;
import org.ray.streaming.message.Record;
public abstract class StreamOperator<F extends Function> implements Operator {
protected F function;
protected List<Collector> collectorList;
protected RuntimeContext runtimeContext;
public StreamOperator(F function) {
this.function = function;
}
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
this.collectorList = collectorList;
this.runtimeContext = runtimeContext;
}
public void finish() {
}
public void close() {
}
protected void collect(Record record) {
for (Collector collector : this.collectorList) {
collector.collect(record);
}
}
protected void collect(KeyRecord keyRecord) {
for (Collector collector : this.collectorList) {
collector.collect(keyRecord);
}
}
}

View file

@ -0,0 +1,13 @@
package org.ray.streaming.operator;
import org.ray.streaming.message.Record;
public interface TwoInputOperator<T, O> extends Operator {
void processElement(Record<T> record1, Record<O> record2);
default OperatorType getOpType() {
return OperatorType.TWO_INPUT;
}
}

View file

@ -0,0 +1,31 @@
package org.ray.streaming.operator.impl;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.core.runtime.collector.CollectionCollector;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
public class FlatMapOperator<T, R> extends StreamOperator<FlatMapFunction<T, R>> implements
OneInputOperator<T> {
private CollectionCollector collectionCollector;
public FlatMapOperator(FlatMapFunction<T, R> flatMapFunction) {
super(flatMapFunction);
}
@Override
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
super.open(collectorList, runtimeContext);
this.collectionCollector = new CollectionCollector(collectorList);
}
@Override
public void processElement(Record<T> record) throws Exception {
this.function.flatMap(record.getValue(), (Collector<R>) collectionCollector);
}
}

View file

@ -0,0 +1,22 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.api.function.impl.KeyFunction;
import org.ray.streaming.message.KeyRecord;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
public class KeyByOperator<T, K> extends StreamOperator<KeyFunction<T, K>> implements
OneInputOperator<T> {
public KeyByOperator(KeyFunction<T, K> keyFunction) {
super(keyFunction);
}
@Override
public void processElement(Record<T> record) throws Exception {
K key = this.function.keyBy(record.getValue());
collect(new KeyRecord<>(key, record.getValue()));
}
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.api.function.impl.MapFunction;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
public class MapOperator<T, R> extends StreamOperator<MapFunction<T, R>> implements
OneInputOperator<T> {
public MapOperator(MapFunction<T, R> mapFunction) {
super(mapFunction);
}
@Override
public void processElement(Record<T> record) throws Exception {
this.collect(new Record<R>(this.function.map(record.getValue())));
}
}

View file

@ -0,0 +1,17 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.operator.OperatorType;
import org.ray.streaming.operator.StreamOperator;
public class MasterOperator extends StreamOperator {
public MasterOperator() {
super(null);
}
@Override
public OperatorType getOpType() {
return OperatorType.MASTER;
}
}

View file

@ -0,0 +1,44 @@
package org.ray.streaming.operator.impl;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.message.KeyRecord;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
public class ReduceOperator<K, T> extends StreamOperator<ReduceFunction<T>> implements
OneInputOperator<T> {
private Map<K, T> reduceState;
public ReduceOperator(ReduceFunction<T> reduceFunction) {
super(reduceFunction);
}
@Override
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
super.open(collectorList, runtimeContext);
this.reduceState = new HashMap<>();
}
@Override
public void processElement(Record<T> record) throws Exception {
KeyRecord<K, T> keyRecord = (KeyRecord<K, T>) record;
K key = keyRecord.getKey();
T value = keyRecord.getValue();
if (reduceState.containsKey(key)) {
T oldValue = reduceState.get(key);
T newValue = this.function.reduce(oldValue, value);
reduceState.put(key, newValue);
collect(new Record(newValue));
} else {
reduceState.put(key, value);
collect(record);
}
}
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.operator.impl;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OneInputOperator;
import org.ray.streaming.operator.StreamOperator;
public class SinkOperator<T> extends StreamOperator<SinkFunction<T>> implements
OneInputOperator<T> {
public SinkOperator(SinkFunction<T> sinkFunction) {
super(sinkFunction);
}
@Override
public void processElement(Record<T> record) throws Exception {
this.function.sink(record.getValue());
}
}

View file

@ -0,0 +1,61 @@
package org.ray.streaming.operator.impl;
import java.util.List;
import org.ray.streaming.api.collector.Collector;
import org.ray.streaming.api.function.impl.SourceFunction;
import org.ray.streaming.api.function.impl.SourceFunction.SourceContext;
import org.ray.streaming.core.runtime.context.RuntimeContext;
import org.ray.streaming.message.Record;
import org.ray.streaming.operator.OperatorType;
import org.ray.streaming.operator.StreamOperator;
public class SourceOperator<T> extends StreamOperator<SourceFunction<T>> {
private SourceContextImpl sourceContext;
public SourceOperator(SourceFunction<T> function) {
super(function);
}
@Override
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
super.open(collectorList, runtimeContext);
this.sourceContext = new SourceContextImpl(collectorList);
}
public void process(Long batchId) {
try {
this.sourceContext.setBatchId(batchId);
this.function.fetch(batchId, this.sourceContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public OperatorType getOpType() {
return OperatorType.SOURCE;
}
class SourceContextImpl implements SourceContext<T> {
private long batchId;
private List<Collector> collectors;
public SourceContextImpl(List<Collector> collectors) {
this.collectors = collectors;
}
@Override
public void collect(T t) throws Exception {
for (Collector collector : collectors) {
collector.collect(new Record(batchId, t));
}
}
private void setBatchId(long batchId) {
this.batchId = batchId;
}
}
}

View file

@ -0,0 +1,58 @@
package org.ray.streaming.plan;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The logical execution plan.
*/
public class Plan implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(Plan.class);
private List<PlanVertex> planVertexList;
private List<PlanEdge> planEdgeList;
public Plan() {
this.planVertexList = new ArrayList<>();
this.planEdgeList = new ArrayList<>();
}
public void addVertex(PlanVertex vertex) {
this.planVertexList.add(vertex);
}
public void addEdge(PlanEdge planEdge) {
this.planEdgeList.add(planEdge);
}
public List<PlanVertex> getPlanVertexList() {
return planVertexList;
}
public List<PlanEdge> getPlanEdgeList() {
return planEdgeList;
}
public String getGraphVizPlan() {
return "";
}
public void printPlan() {
if (!LOGGER.isInfoEnabled()) {
return;
}
LOGGER.info("Printing logic plan:");
for (PlanVertex planVertex : planVertexList) {
LOGGER.info(planVertex.toString());
}
for (PlanEdge planEdge : planEdgeList) {
LOGGER.info(planEdge.toString());
}
}
}

View file

@ -0,0 +1,62 @@
package org.ray.streaming.plan;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.Stream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.operator.StreamOperator;
public class PlanBuilder {
private Plan plan;
private AtomicInteger edgeIdGenerator;
private List<StreamSink> streamSinkList;
public PlanBuilder(List<StreamSink> streamSinkList) {
this.plan = new Plan();
this.streamSinkList = streamSinkList;
this.edgeIdGenerator = new AtomicInteger(0);
}
public Plan buildPlan() {
for (StreamSink streamSink : streamSinkList) {
processStream(streamSink);
}
return this.plan;
}
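/**
* Depth-first walk from a sink back to its sources: create one plan vertex per
* stream, add an edge from each parent stream, then recurse into the parent.
*/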
private void processStream(Stream stream) {
int vertexId = stream.getId();
int parallelism = stream.getParallelism();
StreamOperator streamOperator = stream.getOperator();
PlanVertex planVertex = null;
if (stream instanceof StreamSink) {
planVertex = new PlanVertex(vertexId, parallelism, VertexType.SINK, streamOperator);
Stream parentStream = stream.getInputStream();
int inputVertexId = parentStream.getId();
PlanEdge planEdge = new PlanEdge(inputVertexId, vertexId, parentStream.getPartition());
this.plan.addEdge(planEdge);
processStream(parentStream);
} else if (stream instanceof StreamSource) {
planVertex = new PlanVertex(vertexId, parallelism, VertexType.SOURCE, streamOperator);
} else if (stream instanceof DataStream) {
planVertex = new PlanVertex(vertexId, parallelism, VertexType.PROCESS, streamOperator);
Stream parentStream = stream.getInputStream();
int inputVertexId = parentStream.getId();
PlanEdge planEdge = new PlanEdge(inputVertexId, vertexId, parentStream.getPartition());
this.plan.addEdge(planEdge);
processStream(parentStream);
}
this.plan.addVertex(planVertex);
}
private int getEdgeId() {
return this.edgeIdGenerator.incrementAndGet();
}
}

View file

@ -0,0 +1,50 @@
package org.ray.streaming.plan;
import java.io.Serializable;
import org.ray.streaming.api.partition.Partition;
/**
* PlanEdge represents the connection, and the partitioning rule, between an
* upstream and a downstream execution vertex.
*/
public class PlanEdge implements Serializable {
private int srcVertexId;
private int targetVertexId;
private Partition partition;
public PlanEdge(int srcVertexId, int targetVertexId, Partition partition) {
this.srcVertexId = srcVertexId;
this.targetVertexId = targetVertexId;
this.partition = partition;
}
public int getSrcVertexId() {
return srcVertexId;
}
public void setSrcVertexId(int srcVertexId) {
this.srcVertexId = srcVertexId;
}
public int getTargetVertexId() {
return targetVertexId;
}
public void setTargetVertexId(int targetVertexId) {
this.targetVertexId = targetVertexId;
}
public Partition getPartition() {
return partition;
}
public void setPartition(Partition partition) {
this.partition = partition;
}
@Override
public String toString() {
return "Edge(" + "from:" + srcVertexId + "-" + targetVertexId + "-" + this.partition.getClass()
+ ")";
}
}

View file

@ -0,0 +1,49 @@
package org.ray.streaming.plan;
import java.io.Serializable;
import org.ray.streaming.operator.StreamOperator;
/**
* PlanVertex is a node in the logical plan where an operator's logic is executed.
*/
public class PlanVertex implements Serializable {
private int vertexId;
private int parallelism;
private VertexType vertexType;
private StreamOperator streamOperator;
public PlanVertex(int vertexId, int parallelism, VertexType vertexType,
StreamOperator streamOperator) {
this.vertexId = vertexId;
this.parallelism = parallelism;
this.vertexType = vertexType;
this.streamOperator = streamOperator;
}
public int getVertexId() {
return vertexId;
}
public int getParallelism() {
return parallelism;
}
public StreamOperator getStreamOperator() {
return streamOperator;
}
public VertexType getVertexType() {
return vertexType;
}
@Override
public String toString() {
return "PlanVertex{" +
"vertexId=" + vertexId +
", parallelism=" + parallelism +
", vertexType=" + vertexType +
", streamOperator=" + streamOperator +
'}';
}
}

View file

@ -0,0 +1,11 @@
package org.ray.streaming.plan;
/**
* The different roles a plan vertex can play.
*/
public enum VertexType {
MASTER,
SOURCE,
PROCESS,
SINK,
}

View file

@ -0,0 +1,17 @@
package org.ray.streaming.schedule;
import org.ray.streaming.plan.Plan;
/**
* Interface of the job scheduler.
*/
public interface IJobSchedule {
/**
* Assign the logical plan to a physical execution graph, and schedule the job to run.
*
* @param plan The logical plan.
*/
void schedule(Plan plan);
}

View file

@ -0,0 +1,20 @@
package org.ray.streaming.schedule;
import java.io.Serializable;
import java.util.List;
import org.ray.api.RayActor;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.runtime.StreamWorker;
import org.ray.streaming.plan.Plan;
/**
* Interface of the task assigning strategy.
*/
public interface ITaskAssign extends Serializable {
/**
* Assign the logical plan to a physical execution graph.
*/
ExecutionGraph assign(Plan plan, List<RayActor<StreamWorker>> workers);
}

View file

@ -0,0 +1,93 @@
package org.ray.streaming.schedule.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.streaming.api.partition.impl.BroadcastPartition;
import org.ray.streaming.cluster.ResourceManager;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.graph.ExecutionNode;
import org.ray.streaming.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.core.graph.ExecutionTask;
import org.ray.streaming.core.runtime.StreamWorker;
import org.ray.streaming.core.runtime.context.WorkerContext;
import org.ray.streaming.operator.impl.MasterOperator;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanEdge;
import org.ray.streaming.plan.PlanVertex;
import org.ray.streaming.plan.VertexType;
import org.ray.streaming.schedule.IJobSchedule;
import org.ray.streaming.schedule.ITaskAssign;
public class JobScheduleImpl implements IJobSchedule {
private Plan plan;
private Map<String, Object> jobConfig;
private ResourceManager resourceManager;
private ITaskAssign taskAssign;
public JobScheduleImpl(Map<String, Object> jobConfig) {
this.resourceManager = new ResourceManager();
this.taskAssign = new TaskAssignImpl();
this.jobConfig = jobConfig;
}
/**
* Schedule the logical plan onto a physical execution graph, then ask each stream
* worker to initialize and run.
*/
@Override
public void schedule(Plan plan) {
this.plan = plan;
addJobMaster(plan);
List<RayActor<StreamWorker>> workers = this.resourceManager.createWorker(getPlanWorker());
ExecutionGraph executionGraph = this.taskAssign.assign(this.plan, workers);
List<ExecutionNode> executionNodes = executionGraph.getExecutionNodeList();
List<RayObject<Boolean>> waits = new ArrayList<>();
ExecutionTask masterTask = null;
for (ExecutionNode executionNode : executionNodes) {
List<ExecutionTask> executionTasks = executionNode.getExecutionTaskList();
for (ExecutionTask executionTask : executionTasks) {
if (executionNode.getNodeType() != NodeType.MASTER) {
Integer taskId = executionTask.getTaskId();
RayActor<StreamWorker> streamWorker = executionTask.getWorker();
waits.add(Ray.call(StreamWorker::init, streamWorker,
new WorkerContext(taskId, executionGraph, jobConfig)));
} else {
masterTask = executionTask;
}
}
}
Ray.wait(waits);
Integer masterId = masterTask.getTaskId();
RayActor<StreamWorker> masterWorker = masterTask.getWorker();
Ray.call(StreamWorker::init, masterWorker,
new WorkerContext(masterId, executionGraph, jobConfig)).get();
}
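/**
* Add a job-master vertex to the plan, with a broadcast edge from the master to
* every source vertex.
*/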
private void addJobMaster(Plan plan) {
int masterVertexId = 0;
int masterParallelism = 1;
PlanVertex masterVertex = new PlanVertex(masterVertexId, masterParallelism, VertexType.MASTER,
new MasterOperator());
plan.getPlanVertexList().add(masterVertex);
List<PlanVertex> planVertices = plan.getPlanVertexList();
for (PlanVertex planVertex : planVertices) {
if (planVertex.getVertexType() == VertexType.SOURCE) {
PlanEdge planEdge = new PlanEdge(masterVertexId, planVertex.getVertexId(),
new BroadcastPartition());
plan.getPlanEdgeList().add(planEdge);
}
}
}
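/**
* The total number of workers to create: the sum of parallelism over all plan
* vertices.
*/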
private int getPlanWorker() {
List<PlanVertex> planVertexList = plan.getPlanVertexList();
return planVertexList.stream().map(PlanVertex::getParallelism).reduce(0, Integer::sum);
}
}

View file

@ -0,0 +1,66 @@
package org.ray.streaming.schedule.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.ray.api.RayActor;
import org.ray.streaming.core.graph.ExecutionEdge;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.graph.ExecutionNode;
import org.ray.streaming.core.graph.ExecutionTask;
import org.ray.streaming.core.processor.ProcessBuilder;
import org.ray.streaming.core.processor.StreamProcessor;
import org.ray.streaming.core.runtime.StreamWorker;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanEdge;
import org.ray.streaming.plan.PlanVertex;
import org.ray.streaming.schedule.ITaskAssign;
public class TaskAssignImpl implements ITaskAssign {
/**
* Assign an optimized logical plan to a physical execution graph.
*
* @param plan The logical plan.
* @param workers The worker actors.
* @return The physical execution graph.
*/
@Override
public ExecutionGraph assign(Plan plan, List<RayActor<StreamWorker>> workers) {
List<PlanVertex> planVertices = plan.getPlanVertexList();
List<PlanEdge> planEdges = plan.getPlanEdgeList();
int taskId = 0;
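// Task ids are assigned consecutively in plan order, one per worker: e.g. a
// source with parallelism 2 followed by a sink with parallelism 3 gets task
// ids 0-1 and 2-4.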
Map<Integer, ExecutionNode> idToExecutionNode = new HashMap<>();
for (PlanVertex planVertex : planVertices) {
ExecutionNode executionNode = new ExecutionNode(planVertex.getVertexId(),
planVertex.getParallelism());
executionNode.setNodeType(planVertex.getVertexType());
List<ExecutionTask> vertexTasks = new ArrayList<>();
for (int taskIndex = 0; taskIndex < planVertex.getParallelism(); taskIndex++) {
vertexTasks.add(new ExecutionTask(taskId, taskIndex, workers.get(taskId)));
taskId++;
}
StreamProcessor streamProcessor = ProcessBuilder
.buildProcessor(planVertex.getStreamOperator());
executionNode.setExecutionTaskList(vertexTasks);
executionNode.setStreamProcessor(streamProcessor);
idToExecutionNode.put(executionNode.getNodeId(), executionNode);
}
for (PlanEdge planEdge : planEdges) {
int srcNodeId = planEdge.getSrcVertexId();
int targetNodeId = planEdge.getTargetVertexId();
ExecutionEdge executionEdge = new ExecutionEdge(srcNodeId, targetNodeId,
planEdge.getPartition());
idToExecutionNode.get(srcNodeId).addExecutionEdge(executionEdge);
}
List<ExecutionNode> executionNodes = idToExecutionNode.values().stream()
.collect(Collectors.toList());
return new ExecutionGraph(executionNodes);
}
}

View file

@ -0,0 +1,10 @@
package org.ray.streaming.util;
public class ConfigKey {
/**
* Maximum number of batches to run in a streaming job.
*/
public static final String STREAMING_MAX_BATCH_COUNT = "streaming.max.batch.count";
}

View file

@ -0,0 +1,6 @@
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

View file

@ -0,0 +1,5 @@
ray {
run-mode = SINGLE_PROCESS
resources = "CPU:4,GPU:0"
redis.address = ""
}

View file

@ -0,0 +1,75 @@
package org.ray.streaming.demo;
import com.google.common.collect.ImmutableMap;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.function.impl.FlatMapFunction;
import org.ray.streaming.api.function.impl.ReduceFunction;
import org.ray.streaming.api.function.impl.SinkFunction;
import org.ray.streaming.api.stream.StreamSource;
import org.ray.streaming.util.ConfigKey;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class WordCountTest implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountTest.class);
// TODO(zhenxuanpan): this test only works in single-process mode, because we put
// results in this in-memory map.
static Map<String, Integer> wordCount = new ConcurrentHashMap<>();
@Test
public void testWordCount() {
StreamingContext streamingContext = StreamingContext.buildContext();
Map<String, Object> config = new HashMap<>();
config.put(ConfigKey.STREAMING_MAX_BATCH_COUNT, 1);
streamingContext.withConfig(config);
List<String> text = new ArrayList<>();
text.add("hello world eagle eagle eagle");
StreamSource<String> streamSource = StreamSource.buildSource(streamingContext, text);
streamSource
.flatMap((FlatMapFunction<String, WordAndCount>) (value, collector) -> {
String[] records = value.split(" ");
for (String record : records) {
collector.collect(new WordAndCount(record, 1));
}
})
.keyBy(pair -> pair.word)
.reduce((ReduceFunction<WordAndCount>) (oldValue, newValue) ->
new WordAndCount(oldValue.word, oldValue.count + newValue.count))
.sink((SinkFunction<WordAndCount>) result -> wordCount.put(result.word, result.count));
streamingContext.execute();
// Sleep until the count for every word is computed.
while (wordCount.size() < 3) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.warn("Got an exception while sleeping.", e);
}
}
Assert.assertEquals(wordCount, ImmutableMap.of("eagle", 3, "hello", 1, "world", 1));
}
private static class WordAndCount implements Serializable {
public final String word;
public final Integer count;
public WordAndCount(String key, Integer count) {
this.word = key;
this.count = count;
}
}
}

View file

@ -0,0 +1,87 @@
package org.ray.streaming.plan;
import com.google.common.collect.Lists;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.partition.impl.KeyPartition;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.api.stream.StreamSource;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class PlanBuilderTest {
private static final Logger LOGGER = LoggerFactory.getLogger(PlanBuilderTest.class);
@Test
public void testDataSync() {
Plan plan = buildDataSyncPlan();
List<PlanVertex> planVertexList = plan.getPlanVertexList();
List<PlanEdge> planEdgeList = plan.getPlanEdgeList();
Assert.assertEquals(planVertexList.size(), 2);
Assert.assertEquals(planEdgeList.size(), 1);
PlanEdge planEdge = planEdgeList.get(0);
Assert.assertEquals(planEdge.getPartition().getClass(), RoundRobinPartition.class);
PlanVertex sinkVertex = planVertexList.get(1);
PlanVertex sourceVertex = planVertexList.get(0);
Assert.assertEquals(sinkVertex.getVertexType(), VertexType.SINK);
Assert.assertEquals(sourceVertex.getVertexType(), VertexType.SOURCE);
}
public Plan buildDataSyncPlan() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = StreamSource.buildSource(streamingContext,
Lists.newArrayList("a", "b", "c"));
StreamSink streamSink = dataStream.sink(x -> LOGGER.info(x));
PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink));
Plan plan = planBuilder.buildPlan();
return plan;
}
@Test
public void testKeyByPlan() {
Plan plan = buildKeyByPlan();
List<PlanVertex> planVertexList = plan.getPlanVertexList();
List<PlanEdge> planEdgeList = plan.getPlanEdgeList();
Assert.assertEquals(planVertexList.size(), 3);
Assert.assertEquals(planEdgeList.size(), 2);
PlanVertex source = planVertexList.get(0);
PlanVertex map = planVertexList.get(1);
PlanVertex sink = planVertexList.get(2);
Assert.assertEquals(source.getVertexType(), VertexType.SOURCE);
Assert.assertEquals(map.getVertexType(), VertexType.PROCESS);
Assert.assertEquals(sink.getVertexType(), VertexType.SINK);
PlanEdge keyBy2Sink = planEdgeList.get(0);
PlanEdge source2KeyBy = planEdgeList.get(1);
Assert.assertEquals(keyBy2Sink.getPartition().getClass(), KeyPartition.class);
Assert.assertEquals(source2KeyBy.getPartition().getClass(), RoundRobinPartition.class);
}
public Plan buildKeyByPlan() {
StreamingContext streamingContext = StreamingContext.buildContext();
DataStream<String> dataStream = StreamSource.buildSource(streamingContext,
Lists.newArrayList("1", "2", "3", "4"));
StreamSink streamSink = dataStream.keyBy(x -> x)
.sink(x -> LOGGER.info(x));
PlanBuilder planBuilder = new PlanBuilder(Lists.newArrayList(streamSink));
Plan plan = planBuilder.buildPlan();
return plan;
}
}

View file

@ -0,0 +1,58 @@
package org.ray.streaming.schedule.impl;
import org.ray.streaming.api.partition.impl.RoundRobinPartition;
import org.ray.streaming.core.graph.ExecutionEdge;
import org.ray.streaming.core.graph.ExecutionGraph;
import org.ray.streaming.core.graph.ExecutionNode;
import org.ray.streaming.core.graph.ExecutionNode.NodeType;
import org.ray.streaming.core.runtime.StreamWorker;
import org.ray.streaming.plan.Plan;
import org.ray.streaming.plan.PlanBuilderTest;
import org.ray.streaming.schedule.ITaskAssign;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.RayActor;
import org.ray.runtime.RayActorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TaskAssignImplTest {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskAssignImplTest.class);
@Test
public void testTaskAssignImpl() {
PlanBuilderTest planBuilderTest = new PlanBuilderTest();
Plan plan = planBuilderTest.buildDataSyncPlan();
List<RayActor<StreamWorker>> workers = new ArrayList<>();
for (int i = 0; i < plan.getPlanVertexList().size(); i++) {
workers.add(new RayActorImpl<>());
}
ITaskAssign taskAssign = new TaskAssignImpl();
ExecutionGraph executionGraph = taskAssign.assign(plan, workers);
List<ExecutionNode> executionNodeList = executionGraph.getExecutionNodeList();
Assert.assertEquals(executionNodeList.size(), 2);
ExecutionNode sourceNode = executionNodeList.get(0);
Assert.assertEquals(sourceNode.getNodeType(), NodeType.SOURCE);
Assert.assertEquals(sourceNode.getExecutionTaskList().size(), 1);
Assert.assertEquals(sourceNode.getExecutionEdgeList().size(), 1);
List<ExecutionEdge> sourceExecutionEdges = sourceNode.getExecutionEdgeList();
Assert.assertEquals(sourceExecutionEdges.size(), 1);
ExecutionEdge source2Sink = sourceExecutionEdges.get(0);
Assert.assertEquals(source2Sink.getPartition().getClass(), RoundRobinPartition.class);
ExecutionNode sinkNode = executionNodeList.get(1);
Assert.assertEquals(sinkNode.getNodeType(), NodeType.SINK);
Assert.assertEquals(sinkNode.getExecutionTaskList().size(), 1);
Assert.assertEquals(sinkNode.getExecutionEdgeList().size(), 0);
}
}

View file

@ -0,0 +1,6 @@
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

View file

@ -0,0 +1,3 @@
ray {
run-mode = SINGLE_PROCESS
}

View file

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
<suite name="Ray streaming test suite">
<test name = "Ray streaming test" >
<packages>
<package name = "org.ray.streaming.*" />
</packages>
</test>
</suite>

View file

@ -7,6 +7,14 @@ set -x
ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
run_testng() {
  local exit_code=0
  "$@" || exit_code=$?
  # exit_code == 2 means there are skipped tests.
  if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ]; then
    exit $exit_code
  fi
}
pushd $ROOT_DIR/..
echo "Linting Java code with checkstyle."
# NOTE(hchen): The `test_tag_filters` option causes bazel to ignore caches.
@ -17,21 +25,14 @@ echo "Running tests under cluster mode."
# TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests,
# TestNG will exit with code 2. And bazel treats it as test failure.
# bazel test //java:all_tests --action_env=ENABLE_MULTI_LANGUAGE_TESTS=1 --test_output="errors" || cluster_exit_code=$?
ENABLE_MULTI_LANGUAGE_TESTS=1 java -jar $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml || cluster_exit_code=$?
# exit_code == 2 means there are some tests skipped.
if [ $cluster_exit_code -ne 2 ] && [ $cluster_exit_code -ne 0 ] ; then
exit $cluster_exit_code
fi
ENABLE_MULTI_LANGUAGE_TESTS=1 run_testng java -jar $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml
echo "Running tests under single-process mode."
# bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --test_output="errors" || single_exit_code=$?
java -jar -Dray.run-mode="SINGLE_PROCESS" $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml || single_exit_code=$?
run_testng java -jar -Dray.run-mode="SINGLE_PROCESS" $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar $ROOT_DIR/testng.xml
# exit_code == 2 means there are some tests skipped.
if [ $single_exit_code -ne 2 ] && [ $single_exit_code -ne 0 ] ; then
exit $single_exit_code
fi
echo "Running streaming tests."
run_testng java -jar ./bazel-bin/java/streaming_tests_deploy.jar java/streaming/testng.xml
popd