[Streaming]Format java code using IDEA (#10440)

This commit is contained in:
Lixin Wei 2020-08-31 19:45:00 +08:00 committed by GitHub
parent afde3db4f0
commit 6bde6b493e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
125 changed files with 548 additions and 433 deletions

View file

@ -41,6 +41,11 @@
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>

View file

@ -1,14 +1,14 @@
<!DOCTYPE suppressions PUBLIC
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
<suppress checks="OperatorWrap" files=".*" />
<suppress checks="JavadocParagraph" files=".*" />
<suppress checks="SummaryJavadoc" files=".*" />
<suppress checks="OperatorWrap" files=".*"/>
<suppress checks="JavadocParagraph" files=".*"/>
<suppress checks="SummaryJavadoc" files=".*"/>
<suppress checks="AbbreviationAsWordInNameCheck" files=".*"/>
<suppress checks="ClassTypeParameterName" files="OneInputStreamTask.java"/>
<suppress checks="ClassTypeParameterName" files="StreamTask.java"/>
<!-- suppress check for flatbuffer-generated files. -->
<suppress checks=".*" files="io[\\/]ray[\\/]streaming[\\/]runtime[\\/]generated[\\/]" />
<suppress checks=".*" files="io[\\/]ray[\\/]streaming[\\/]runtime[\\/]generated[\\/]"/>
</suppressions>

View file

@ -17,6 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ClusterStarter {
private static final Logger LOG = LoggerFactory.getLogger(ClusterStarter.class);
private static final String PLASMA_STORE_SOCKET_NAME = "/tmp/ray/plasma_store_socket";
private static final String RAYLET_SOCKET_NAME = "/tmp/ray/raylet_socket";

View file

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
* Encapsulate the context information of a streaming Job.
*/
public class StreamingContext implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StreamingContext.class);
private transient AtomicInteger idGenerator;

View file

@ -3,21 +3,21 @@ package io.ray.streaming.api.function;
import io.ray.streaming.api.context.RuntimeContext;
/**
* An interface for all user-defined functions to define the life cycle methods of the
* functions, and access the task context where the functions get executed.
* An interface for all user-defined functions to define the life cycle methods of the functions,
* and access the task context where the functions get executed.
*/
public interface RichFunction extends Function {
/**
* Initialization method for user function which called before the first call to the user
* function.
*
* @param runtimeContext runtime context
*/
void open(RuntimeContext runtimeContext);
/**
* Tear-down method for the user function which called after the last call to
* the user function.
* Tear-down method for the user function which called after the last call to the user function.
*/
void close();

View file

@ -3,8 +3,8 @@ package io.ray.streaming.api.function.impl;
import io.ray.streaming.api.function.Function;
/**
* A filter function is a predicate applied individually to each record.
* The predicate decides whether to keep the element, or to discard it.
* A filter function is a predicate applied individually to each record. The predicate decides
* whether to keep the element, or to discard it.
*
* @param <T> type of the input data.
*/

View file

@ -10,6 +10,7 @@ import io.ray.streaming.api.function.RichFunction;
public class Functions {
private static class DefaultRichFunction implements RichFunction {
private final Function function;
private DefaultRichFunction(Function function) {

View file

@ -14,7 +14,7 @@ public interface Partition<T> extends Function {
* Given a record and downstream partitions, determine which partition(s) should receive the
* record.
*
* @param record The record.
* @param record The record.
* @param numPartition num of partitions
* @return IDs of the downstream partitions that should receive the record.
*/

View file

@ -7,6 +7,7 @@ import java.util.stream.IntStream;
* Broadcast the record to all downstream partitions.
*/
public class BroadcastPartition<T> implements Partition<T> {
private int[] partitions = new int[0];
public BroadcastPartition() {

View file

@ -10,6 +10,7 @@ import io.ray.streaming.api.partition.Partition;
* @param <T> Type of the input record.
*/
public class ForwardPartition<T> implements Partition<T> {
private int[] partitions = new int[] {0};
@Override

View file

@ -10,6 +10,7 @@ import io.ray.streaming.message.KeyRecord;
* @param <T> Type of the input record.
*/
public class KeyPartition<K, T> implements Partition<KeyRecord<K, T>> {
private int[] partitions = new int[1];
@Override

View file

@ -8,6 +8,7 @@ import io.ray.streaming.api.partition.Partition;
* @param <T> Type of the input record.
*/
public class RoundRobinPartition<T> implements Partition<T> {
private int seq;
private int[] partitions = new int[1];

View file

@ -22,7 +22,6 @@ import java.util.List;
/**
* Represents a stream of data.
*
* <p>This class defines all the streaming operations.
*
* @param <T> Type of data in the stream.
@ -33,9 +32,10 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
super(streamingContext, streamOperator);
}
public DataStream(StreamingContext streamingContext,
StreamOperator streamOperator,
Partition<T> partition) {
public DataStream(
StreamingContext streamingContext,
StreamOperator streamOperator,
Partition<T> partition) {
super(streamingContext, streamOperator, partition);
}
@ -43,15 +43,16 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
super(input, streamOperator);
}
public <R> DataStream(DataStream<R> input,
StreamOperator streamOperator,
Partition<T> partition) {
public <R> DataStream(
DataStream<R> input,
StreamOperator streamOperator,
Partition<T> partition) {
super(input, streamOperator, partition);
}
/**
* Create a java stream that reference passed python stream.
* Changes in new stream will be reflected in referenced stream and vice versa
* Create a java stream that reference passed python stream. Changes in new stream will be
* reflected in referenced stream and vice versa
*/
public DataStream(PythonDataStream referencedStream) {
super(referencedStream);
@ -84,8 +85,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
}
/**
* Apply union transformations to this stream by merging {@link DataStream} outputs of
* the same type with each other.
* Apply union transformations to this stream by merging {@link DataStream} outputs of the same
* type with each other.
*
* @param stream The DataStream to union output with.
* @param others The other DataStreams to union output with.
@ -100,8 +101,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
}
/**
* Apply union transformations to this stream by merging {@link DataStream} outputs of
* the same type with each other.
* Apply union transformations to this stream by merging {@link DataStream} outputs of the same
* type with each other.
*
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
@ -177,8 +178,8 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
}
/**
* If parent stream is a python stream, we can't call partition related methods
* in the java stream.
* If parent stream is a python stream, we can't call partition related methods in the java
* stream.
*/
private void checkPartitionCall() {
if (getInputStream() != null && getInputStream().getLanguage() == Language.PYTHON) {
@ -188,9 +189,9 @@ public class DataStream<T> extends Stream<DataStream<T>, T> {
}
/**
* Convert this stream as a python stream.
* The converted stream and this stream are the same logical stream, which has same stream id.
* Changes in converted stream will be reflected in this stream and vice versa.
* Convert this stream as a python stream. The converted stream and this stream are the same
* logical stream, which has same stream id. Changes in converted stream will be reflected in this
* stream and vice versa.
*/
public PythonDataStream asPythonStream() {
return new PythonDataStream(this);

View file

@ -13,6 +13,7 @@ import java.io.Serializable;
* @param <O> Type of the data in the joined stream.
*/
public class JoinStream<L, R, O> extends DataStream<L> {
private final DataStream<R> rightStream;
public JoinStream(DataStream<L> leftStream, DataStream<R> rightStream) {
@ -37,6 +38,7 @@ public class JoinStream<L, R, O> extends DataStream<L> {
* @param <K> Type of the join key.
*/
class Where<K> implements Serializable {
private JoinStream<L, R, O> joinStream;
private KeyFunction<L, K> leftKeyByFunction;
@ -56,12 +58,14 @@ public class JoinStream<L, R, O> extends DataStream<L> {
* @param <K> Type of the join key.
*/
class Equal<K> implements Serializable {
private JoinStream<L, R, O> joinStream;
private KeyFunction<L, K> leftKeyByFunction;
private KeyFunction<R, K> rightKeyByFunction;
Equal(JoinStream<L, R, O> joinStream, KeyFunction<L, K> leftKeyByFunction,
KeyFunction<R, K> rightKeyByFunction) {
Equal(
JoinStream<L, R, O> joinStream, KeyFunction<L, K> leftKeyByFunction,
KeyFunction<R, K> rightKeyByFunction) {
this.joinStream = joinStream;
this.leftKeyByFunction = leftKeyByFunction;
this.rightKeyByFunction = rightKeyByFunction;

View file

@ -23,8 +23,8 @@ public class KeyDataStream<K, T> extends DataStream<T> {
}
/**
* Create a java stream that reference passed python stream.
* Changes in new stream will be reflected in referenced stream and vice versa
* Create a java stream that reference passed python stream. Changes in new stream will be
* reflected in referenced stream and vice versa
*/
public KeyDataStream(PythonDataStream referencedStream) {
super(referencedStream);
@ -53,9 +53,9 @@ public class KeyDataStream<K, T> extends DataStream<T> {
}
/**
* Convert this stream as a python stream.
* The converted stream and this stream are the same logical stream, which has same stream id.
* Changes in converted stream will be reflected in this stream and vice versa.
* Convert this stream as a python stream. The converted stream and this stream are the same
* logical stream, which has same stream id. Changes in converted stream will be reflected in this
* stream and vice versa.
*/
public PythonKeyDataStream asPythonStream() {
return new PythonKeyDataStream(this);

View file

@ -21,6 +21,7 @@ import java.util.Map;
*/
public abstract class Stream<S extends Stream<S, T>, T>
implements Serializable {
private final int id;
private final StreamingContext streamingContext;
private final Stream inputStream;
@ -34,9 +35,10 @@ public abstract class Stream<S extends Stream<S, T>, T>
this(streamingContext, null, streamOperator, getForwardPartition(streamOperator));
}
public Stream(StreamingContext streamingContext,
StreamOperator streamOperator,
Partition<T> partition) {
public Stream(
StreamingContext streamingContext,
StreamOperator streamOperator,
Partition<T> partition) {
this(streamingContext, null, streamOperator, partition);
}
@ -49,10 +51,11 @@ public abstract class Stream<S extends Stream<S, T>, T>
this(inputStream.getStreamingContext(), inputStream, streamOperator, partition);
}
protected Stream(StreamingContext streamingContext,
Stream inputStream,
StreamOperator streamOperator,
Partition<T> partition) {
protected Stream(
StreamingContext streamingContext,
Stream inputStream,
StreamOperator streamOperator,
Partition<T> partition) {
this.streamingContext = streamingContext;
this.inputStream = inputStream;
this.operator = streamOperator;
@ -64,8 +67,8 @@ public abstract class Stream<S extends Stream<S, T>, T>
}
/**
* Create a proxy stream of original stream.
* Changes in new stream will be reflected in original stream and vice versa
* Create a proxy stream of original stream. Changes in new stream will be reflected in original
* stream and vice versa
*/
protected Stream(Stream originalStream) {
this.originalStream = originalStream;
@ -183,8 +186,8 @@ public abstract class Stream<S extends Stream<S, T>, T>
}
/**
* Set the partition function of this {@link Stream} so that output elements are forwarded to
* next operator locally.
* Set the partition function of this {@link Stream} so that output elements are forwarded to next
* operator locally.
*/
public S forward() {
return setPartition(getForwardPartition(operator));

View file

@ -8,6 +8,7 @@ import io.ray.streaming.operator.StreamOperator;
* @param <T> Type of the input data of this sink.
*/
public abstract class StreamSink<T> extends Stream<StreamSink<T>, T> {
public StreamSink(Stream inputStream, StreamOperator streamOperator) {
super(inputStream, streamOperator);
}

View file

@ -6,4 +6,5 @@ package io.ray.streaming.api.stream;
* @param <T> The type of StreamSource data.
*/
public interface StreamSource<T> {
}

View file

@ -6,13 +6,13 @@ import java.util.List;
/**
* Represents a union DataStream.
*
* <p>This stream does not create a physical operation, it only affects how upstream data are
* connected to downstream data.
* connected to downstream data.
*
* @param <T> The type of union data.
*/
public class UnionStream<T> extends DataStream<T> {
private List<DataStream<T>> unionStreams;
public UnionStream(DataStream<T> input, List<DataStream<T>> streams) {

View file

@ -30,8 +30,9 @@ public class JobGraph implements Serializable {
this.jobEdges = new ArrayList<>();
}
public JobGraph(String jobName, Map<String, String> jobConfig,
List<JobVertex> jobVertices, List<JobEdge> jobEdges) {
public JobGraph(
String jobName, Map<String, String> jobConfig,
List<JobVertex> jobVertices, List<JobEdge> jobEdges) {
this.jobName = jobName;
this.jobConfig = jobConfig;
this.jobVertices = jobVertices;
@ -40,8 +41,8 @@ public class JobGraph implements Serializable {
}
/**
* Generate direct-graph(made up of a set of vertices and connected by edges)
* by current job graph for simple log printing.
* Generate direct-graph(made up of a set of vertices and connected by edges) by current job graph
* for simple log printing.
*
* @return Digraph in string type.
*/

View file

@ -19,6 +19,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobGraphBuilder {
private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class);
private JobGraph jobGraph;
@ -34,8 +35,9 @@ public class JobGraphBuilder {
this(streamSinkList, jobName, new HashMap<>());
}
public JobGraphBuilder(List<StreamSink> streamSinkList, String jobName,
Map<String, String> jobConfig) {
public JobGraphBuilder(
List<StreamSink> streamSinkList, String jobName,
Map<String, String> jobConfig) {
this.jobGraph = new JobGraph(jobName, jobConfig);
this.streamSinkList = streamSinkList;
this.edgeIdGenerator = new AtomicInteger(0);
@ -98,7 +100,7 @@ public class JobGraphBuilder {
// process join stream
if (stream instanceof JoinStream) {
DataStream rightStream = ((JoinStream) stream).getRightStream();
DataStream rightStream = ((JoinStream) stream).getRightStream();
this.jobGraph.addEdge(
new JobEdge(rightStream.getId(), vertexId, rightStream.getPartition()));
processStream(rightStream);

View file

@ -21,10 +21,11 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
/**
* Optimize job graph by chaining some operators so that some operators can be run in the
* same thread.
* Optimize job graph by chaining some operators so that some operators can be run in the same
* thread.
*/
public class JobGraphOptimizer {
private final JobGraph jobGraph;
private Set<JobVertex> visited = new HashSet<>();
// vertex id -> vertex
@ -89,7 +90,8 @@ public class JobGraphOptimizer {
mergedVertex = headVertex;
} else {
List<StreamOperator> operators = verticesToMerge.stream()
.map(v -> vertexMap.get(v.getVertexId()).getStreamOperator())
.map(v -> vertexMap.get(v.getVertexId())
.getStreamOperator())
.collect(Collectors.toList());
List<Map<String, String>> configs = verticesToMerge.stream()
.map(v -> vertexMap.get(v.getVertexId()).getConfig())
@ -99,7 +101,8 @@ public class JobGraphOptimizer {
operator = ChainedOperator.newChainedOperator(operators, configs);
} else {
List<PythonOperator> pythonOperators = operators.stream()
.map(o -> (PythonOperator) o).collect(Collectors.toList());
.map(o -> (PythonOperator) o)
.collect(Collectors.toList());
operator = new ChainedPythonOperator(pythonOperators, configs);
}
// chained operator config is placed into `ChainedOperator`.
@ -156,9 +159,10 @@ public class JobGraphOptimizer {
}
}
private boolean canBeChained(JobVertex precedingVertex,
JobVertex succeedingVertex,
JobEdge edge) {
private boolean canBeChained(
JobVertex precedingVertex,
JobVertex succeedingVertex,
JobEdge edge) {
if (jobGraph.getVertexOutputEdges(precedingVertex.getVertexId()).size() > 1 ||
jobGraph.getVertexInputEdges(succeedingVertex.getVertexId()).size() > 1) {
return false;

View file

@ -18,11 +18,12 @@ public class JobVertex implements Serializable {
private StreamOperator streamOperator;
private Map<String, String> config;
public JobVertex(int vertexId,
int parallelism,
VertexType vertexType,
StreamOperator streamOperator,
Map<String, String> config) {
public JobVertex(
int vertexId,
int parallelism,
VertexType vertexType,
StreamOperator streamOperator,
Map<String, String> config) {
this.vertexId = vertexId;
this.parallelism = parallelism;
this.vertexType = vertexType;

View file

@ -4,6 +4,7 @@ import java.io.Serializable;
import java.util.Objects;
public class Record<T> implements Serializable {
protected transient String stream;
protected T value;

View file

@ -11,6 +11,7 @@ import io.ray.streaming.message.Record;
import java.util.List;
public abstract class StreamOperator<F extends Function> implements Operator {
protected final String name;
protected F function;
protected RichFunction richFunction;

View file

@ -23,6 +23,7 @@ import java.util.stream.Collectors;
* Abstract base class for chained operators.
*/
public abstract class ChainedOperator extends StreamOperator<Function> {
protected final List<StreamOperator> operators;
protected final Operator headOperator;
protected final Operator tailOperator;
@ -43,7 +44,8 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
// Dont' call super.open() as we `open` every operator separately.
List<ForwardCollector> succeedingCollectors = operators.stream().skip(1)
.map(operator -> new ForwardCollector((OneInputOperator) operator))
.map(operator -> new ForwardCollector(
(OneInputOperator) operator))
.collect(Collectors.toList());
for (int i = 0; i < operators.size() - 1; i++) {
StreamOperator operator = operators.get(i);
@ -113,6 +115,7 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
static class ChainedSourceOperator<T> extends ChainedOperator
implements SourceOperator<T> {
private final SourceOperator<T> sourceOperator;
@SuppressWarnings("unchecked")
@ -135,6 +138,7 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
static class ChainedOneInputOperator<T> extends ChainedOperator
implements OneInputOperator<T> {
private final OneInputOperator<T> inputOperator;
@SuppressWarnings("unchecked")
@ -152,6 +156,7 @@ public abstract class ChainedOperator extends StreamOperator<Function> {
static class ChainedTwoInputOperator<L, R> extends ChainedOperator
implements TwoInputOperator<L, R> {
private final TwoInputOperator<L, R> inputOperator;
@SuppressWarnings("unchecked")

View file

@ -5,6 +5,7 @@ import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
class ForwardCollector implements Collector<Record> {
private final OneInputOperator succeedingOperator;
ForwardCollector(OneInputOperator succeedingOperator) {

View file

@ -17,6 +17,7 @@ import io.ray.streaming.operator.TwoInputOperator;
*/
public class JoinOperator<L, R, K, O> extends StreamOperator<JoinFunction<L, R, O>> implements
TwoInputOperator<L, R> {
public JoinOperator() {
}

View file

@ -13,6 +13,7 @@ import java.util.List;
public class SourceOperatorImpl<T> extends StreamOperator<SourceFunction<T>>
implements SourceOperator {
private SourceContextImpl sourceContext;
public SourceOperatorImpl(SourceFunction<T> function) {
@ -47,6 +48,7 @@ public class SourceOperatorImpl<T> extends StreamOperator<SourceFunction<T>>
}
class SourceContextImpl implements SourceContext<T> {
private List<Collector> collectors;
public SourceContextImpl(List<Collector> collectors) {

View file

@ -7,20 +7,18 @@ import org.apache.commons.lang3.StringUtils;
/**
* Represents a user defined python function.
*
* <p>Python worker can use information in this class to create a function object.</p>
*
* <p>If this object is constructed from serialized python function,
* python worker can deserialize it to create python function directly.
* If this object is constructed from moduleName and className/functionName,
* python worker will use `importlib` to load python function.</p>
*
* python worker can deserialize it to create python function directly. If this object is
* constructed from moduleName and className/functionName, python worker will use `importlib` to
* load python function.</p>
* <p>If the python data stream api is invoked from python, `function` will be not null.</p>
* <p>If the python data stream api is invoked from java, `moduleName` and
* `functionName` will be not null.</p>
* <p>
*/
public class PythonFunction implements Function {
public enum FunctionInterface {
SOURCE_FUNCTION("SourceFunction"),
MAP_FUNCTION("MapFunction"),
@ -47,8 +45,8 @@ public class PythonFunction implements Function {
// null if this function is constructed from serialized python function.
private final String functionName;
/**
* FunctionInterface can be used to validate python function,
* and look up operator class from FunctionInterface.
* FunctionInterface can be used to validate python function, and look up operator class from
* FunctionInterface.
*/
private String functionInterface;
@ -69,10 +67,12 @@ public class PythonFunction implements Function {
*
* @param moduleName module name of streaming function.
* @param functionName function name of streaming function. {@code functionName} is the name
* of a python function, or class name of subclass of `ray.streaming.function.`
* of a
* python function, or class name of subclass of `ray.streaming.function.`
*/
public PythonFunction(String moduleName,
String functionName) {
public PythonFunction(
String moduleName,
String functionName) {
Preconditions.checkArgument(StringUtils.isNotBlank(moduleName));
Preconditions.checkArgument(StringUtils.isNotBlank(functionName));
this.function = null;

View file

@ -17,6 +17,7 @@ import java.util.stream.Collectors;
*/
@SuppressWarnings("unchecked")
public class PythonOperator extends StreamOperator {
private final String moduleName;
private final String className;
@ -80,7 +81,7 @@ public class PythonOperator extends StreamOperator {
StringBuilder builder = new StringBuilder();
builder.append(PythonOperator.class.getSimpleName()).append("[");
if (function != null) {
builder.append(((PythonFunction)function).toSimpleString());
builder.append(((PythonFunction) function).toSimpleString());
} else {
builder.append(moduleName).append(".").append(className);
}
@ -101,6 +102,7 @@ public class PythonOperator extends StreamOperator {
}
public static class ChainedPythonOperator extends PythonOperator {
private final List<PythonOperator> operators;
private final PythonOperator headOperator;
private final PythonOperator tailOperator;

View file

@ -8,16 +8,15 @@ import org.apache.commons.lang3.StringUtils;
/**
* Represents a python partition function.
* <p>
* Python worker can create a partition object using information in this
* PythonPartition.
* Python worker can create a partition object using information in this PythonPartition.
* <p>
* If this object is constructed from serialized python partition,
* python worker can deserialize it to create python partition directly.
* If this object is constructed from moduleName and className/functionName,
* python worker will use `importlib` to load python partition function.
* If this object is constructed from serialized python partition, python worker can deserialize it
* to create python partition directly. If this object is constructed from moduleName and
* className/functionName, python worker will use `importlib` to load python partition function.
* <p>
*/
public class PythonPartition implements Partition<Object> {
public static final PythonPartition BroadcastPartition = new PythonPartition(
"ray.streaming.partition", "BroadcastPartition");
public static final PythonPartition KeyPartition = new PythonPartition(

View file

@ -18,14 +18,16 @@ import java.util.List;
*/
public class PythonDataStream extends Stream<PythonDataStream, Object> implements PythonStream {
protected PythonDataStream(StreamingContext streamingContext,
PythonOperator pythonOperator) {
protected PythonDataStream(
StreamingContext streamingContext,
PythonOperator pythonOperator) {
super(streamingContext, pythonOperator);
}
protected PythonDataStream(StreamingContext streamingContext,
PythonOperator pythonOperator,
Partition<Object> partition) {
protected PythonDataStream(
StreamingContext streamingContext,
PythonOperator pythonOperator,
Partition<Object> partition) {
super(streamingContext, pythonOperator, partition);
}
@ -33,15 +35,16 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
super(input, pythonOperator);
}
public PythonDataStream(PythonDataStream input,
PythonOperator pythonOperator,
Partition<Object> partition) {
public PythonDataStream(
PythonDataStream input,
PythonOperator pythonOperator,
Partition<Object> partition) {
super(input, pythonOperator, partition);
}
/**
* Create a python stream that reference passed java stream.
* Changes in new stream will be reflected in referenced stream and vice versa
* Create a python stream that reference passed java stream. Changes in new stream will be
* reflected in referenced stream and vice versa
*/
public PythonDataStream(DataStream referencedStream) {
super(referencedStream);
@ -85,8 +88,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
* Apply a filter function to this stream.
*
* @param func The python FilterFunction.
* @return A new PythonDataStream that contains only the elements satisfying
* the given filter predicate.
* @return A new PythonDataStream that contains only the elements satisfying the given filter
* predicate.
*/
public PythonDataStream filter(PythonFunction func) {
func.setFunctionInterface(FunctionInterface.FILTER_FUNCTION);
@ -94,8 +97,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
}
/**
* Apply union transformations to this stream by merging {@link PythonDataStream} outputs of
* the same type with each other.
* Apply union transformations to this stream by merging {@link PythonDataStream} outputs of the
* same type with each other.
*
* @param stream The DataStream to union output with.
* @param others The other DataStreams to union output with.
@ -109,8 +112,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
}
/**
* Apply union transformations to this stream by merging {@link PythonDataStream} outputs of
* the same type with each other.
* Apply union transformations to this stream by merging {@link PythonDataStream} outputs of the
* same type with each other.
*
* @param streams The DataStreams to union output with.
* @return A new UnionStream.
@ -178,8 +181,8 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
}
/**
* If parent stream is a python stream, we can't call partition related methods
* in the java stream.
* If parent stream is a python stream, we can't call partition related methods in the java
* stream.
*/
private void checkPartitionCall() {
if (getInputStream() != null && getInputStream().getLanguage() == Language.JAVA) {
@ -189,9 +192,9 @@ public class PythonDataStream extends Stream<PythonDataStream, Object> implement
}
/**
* Convert this stream as a java stream.
* The converted stream and this stream are the same logical stream, which has same stream id.
* Changes in converted stream will be reflected in this stream and vice versa.
* Convert this stream as a java stream. The converted stream and this stream are the same logical
* stream, which has same stream id. Changes in converted stream will be reflected in this stream
* and vice versa.
*/
public DataStream<Object> asJavaStream() {
return new DataStream<>(this);

View file

@ -19,8 +19,8 @@ public class PythonKeyDataStream extends PythonDataStream implements PythonStrea
}
/**
* Create a python stream that reference passed python stream.
* Changes in new stream will be reflected in referenced stream and vice versa
* Create a python stream that reference passed python stream. Changes in new stream will be
* reflected in referenced stream and vice versa
*/
public PythonKeyDataStream(DataStream referencedStream) {
super(referencedStream);
@ -44,9 +44,9 @@ public class PythonKeyDataStream extends PythonDataStream implements PythonStrea
}
/**
* Convert this stream as a java stream.
* The converted stream and this stream are the same logical stream, which has same stream id.
* Changes in converted stream will be reflected in this stream and vice versa.
* Convert this stream as a java stream. The converted stream and this stream are the same logical
* stream, which has same stream id. Changes in converted stream will be reflected in this stream
* and vice versa.
*/
public KeyDataStream<Object, Object> asJavaStream() {
return new KeyDataStream(this);

View file

@ -4,4 +4,5 @@ package io.ray.streaming.python.stream;
* A marker interface used to identify all python streams.
*/
public interface PythonStream {
}

View file

@ -8,6 +8,7 @@ import io.ray.streaming.python.PythonOperator;
* Represents a sink of the PythonStream.
*/
public class PythonStreamSink extends StreamSink implements PythonStream {
public PythonStreamSink(PythonDataStream input, PythonOperator sinkOperator) {
super(input, sinkOperator);
getStreamingContext().addSink(this);

View file

@ -17,8 +17,9 @@ public class PythonStreamSource extends PythonDataStream implements StreamSource
withChainStrategy(ChainStrategy.HEAD);
}
public static PythonStreamSource from(StreamingContext streamingContext,
PythonFunction sourceFunction) {
public static PythonStreamSource from(
StreamingContext streamingContext,
PythonFunction sourceFunction) {
sourceFunction.setFunctionInterface(FunctionInterface.SOURCE_FUNCTION);
return new PythonStreamSource(streamingContext, sourceFunction);
}

View file

@ -6,11 +6,11 @@ import java.util.List;
/**
* Represents a union DataStream.
*
* <p>This stream does not create a physical operation, it only affects how upstream data are
* connected to downstream data.
* connected to downstream data.
*/
public class PythonUnionStream extends PythonDataStream {
private List<PythonDataStream> unionStreams;
public PythonUnionStream(PythonDataStream input, List<PythonDataStream> others) {

View file

@ -11,7 +11,7 @@ public class Config {
public static final String MEMORY_CHANNEL = "memory_channel";
public static final String NATIVE_CHANNEL = "native_channel";
public static final String CHANNEL_SIZE = "channel_size";
public static final String CHANNEL_SIZE_DEFAULT = String.valueOf((long)Math.pow(10, 8));
public static final String CHANNEL_SIZE_DEFAULT = String.valueOf((long) Math.pow(10, 8));
public static final String IS_RECREATE = "streaming.is_recreate";
// return from DataReader.getBundle if only empty message read in this interval.
public static final String TIMER_INTERVAL_MS = "timer_interval_ms";

View file

@ -2,6 +2,7 @@ package io.ray.streaming.api.stream;
import static org.testng.Assert.assertEquals;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.operator.impl.MapOperator;
import io.ray.streaming.python.stream.PythonDataStream;

View file

@ -2,6 +2,7 @@ package io.ray.streaming.jobgraph;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Lists;
import io.ray.streaming.api.context.StreamingContext;
import io.ray.streaming.api.stream.DataStream;
@ -12,13 +13,14 @@ import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
public class JobGraphOptimizerTest {
private static final Logger LOG = LoggerFactory.getLogger( JobGraphOptimizerTest.class );
private static final Logger LOG = LoggerFactory.getLogger(JobGraphOptimizerTest.class);
@Test
public void testOptimize() {
StreamingContext context = StreamingContext.buildContext();
DataStream<Integer> source1 = DataStreamSource.fromCollection(context,
Lists.newArrayList(1 ,2 ,3));
Lists.newArrayList(1, 2, 3));
DataStream<String> source2 = DataStreamSource.fromCollection(context,
Lists.newArrayList("1", "2", "3"));
DataStream<String> source3 = DataStreamSource.fromCollection(context,
@ -43,7 +45,7 @@ public class JobGraphOptimizerTest {
public void testOptimizeHybridStream() {
StreamingContext context = StreamingContext.buildContext();
DataStream<Integer> source1 = DataStreamSource.fromCollection(context,
Lists.newArrayList(1 ,2 ,3));
Lists.newArrayList(1, 2, 3));
DataStream<String> source2 = DataStreamSource.fromCollection(context,
Lists.newArrayList("1", "2", "3"));
source1.asPythonStream()

View file

@ -12,6 +12,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job id. Non-custom.
*
* @return Job id with string type.
*/
@DefaultValue(value = "default-job-id")
@ -20,6 +21,7 @@ public interface CommonConfig extends Config {
/**
* Ray streaming job name. Non-custom.
*
* @return Job name with string type.
*/
@DefaultValue(value = "default-job-name")

View file

@ -11,8 +11,7 @@ public interface SchedulerConfig extends Config {
String WORKER_STARTING_WAIT_TIMEOUT_MS = "streaming.scheduler.worker.starting.timeout.ms";
/**
* The timeout ms of worker initiation.
* Default is: 10000ms(10s).
* The timeout ms of worker initiation. Default is: 10000ms(10s).
*
* @return timeout ms
*/
@ -21,8 +20,7 @@ public interface SchedulerConfig extends Config {
int workerInitiationWaitTimeoutMs();
/**
* The timeout ms of worker starting.
* Default is: 10000ms(10s).
* The timeout ms of worker starting. Default is: 10000ms(10s).
*
* @return timeout ms
*/

View file

@ -17,6 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OutputCollector implements Collector<Record> {
private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);
private final DataWriter writer;
@ -27,15 +28,17 @@ public class OutputCollector implements Collector<Record> {
private final Serializer javaSerializer = new JavaSerializer();
private final Serializer crossLangSerializer = new CrossLangSerializer();
public OutputCollector(DataWriter writer,
Collection<String> outputChannelIds,
Collection<BaseActorHandle> targetActors,
Partition partition) {
public OutputCollector(
DataWriter writer,
Collection<String> outputChannelIds,
Collection<BaseActorHandle> targetActors,
Partition partition) {
this.writer = writer;
this.outputQueues = outputChannelIds.stream().map(ChannelId::from).toArray(ChannelId[]::new);
this.targetActors = targetActors;
this.targetLanguages = targetActors.stream()
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA)
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON :
Language.JAVA)
.toArray(Language[]::new);
this.partition = partition;
LOGGER.debug("OutputCollector constructed, outputChannelIds:{}, partition:{}.",

View file

@ -6,10 +6,10 @@ import java.io.Serializable;
import java.util.UUID;
/**
* Streaming system unique identity base class.
* For example, ${@link ContainerID }
* Streaming system unique identity base class. For example, ${@link ContainerID }
*/
public class AbstractID implements Serializable {
private UUID id;
public AbstractID() {
@ -18,7 +18,7 @@ public class AbstractID implements Serializable {
@Override
public boolean equals(Object obj) {
return id.equals(((AbstractID)obj).getId());
return id.equals(((AbstractID) obj).getId());
}
public UUID getId() {

View file

@ -75,10 +75,10 @@ public class ExecutionEdge implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("source", sourceExecutionVertex)
.add("target", targetExecutionVertex)
.add("partition", partition)
.add("index", executionEdgeIndex)
.toString();
.add("source", sourceExecutionVertex)
.add("target", targetExecutionVertex)
.add("partition", partition)
.add("index", executionEdgeIndex)
.toString();
}
}

View file

@ -25,9 +25,7 @@ public class ExecutionGraph implements Serializable {
private Map<String, String> jobConfig;
/**
* Data map for execution job vertex.
* key: job vertex id.
* value: execution job vertex.
* Data map for execution job vertex. key: job vertex id. value: execution job vertex.
*/
private Map<Integer, ExecutionJobVertex> executionJobVertexMap;
@ -166,8 +164,11 @@ public class ExecutionGraph implements Serializable {
*/
public List<BaseActorHandle> getNonSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(executionJobVertex -> executionJobVertex.isTransformationVertex()
|| executionJobVertex.isSinkVertex())
.filter(executionJobVertex ->
executionJobVertex
.isTransformationVertex()
|| executionJobVertex
.isSinkVertex())
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);

View file

@ -17,7 +17,6 @@ import org.aeonbits.owner.ConfigFactory;
/**
* Physical job vertex.
*
* <p>Execution job vertex is the physical form of {@link JobVertex} and
* every execution job vertex is corresponding to a group of {@link ExecutionVertex}.
*/
@ -29,8 +28,8 @@ public class ExecutionJobVertex {
private final int executionJobVertexId;
/**
* Use jobVertex id and operator(use {@link StreamOperator}'s name) as name.
* e.g. 1-SourceOperator
* Use jobVertex id and operator(use {@link StreamOperator}'s name) as name. e.g.
* 1-SourceOperator
*/
private final String executionJobVertexName;
private final StreamOperator streamOperator;

View file

@ -48,8 +48,8 @@ public class ExecutionVertex implements Serializable {
private int parallelism;
/**
* Ordered sub index for execution vertex in a execution job vertex.
* Might be changed in dynamic scheduling.
* Ordered sub index for execution vertex in a execution job vertex. Might be changed in dynamic
* scheduling.
*/
private int executionVertexIndex;
@ -102,8 +102,8 @@ public class ExecutionVertex implements Serializable {
}
/**
* Unique name generated by execution job vertex name and index of current execution vertex.
* e.g. 1-SourceOperator-3 (vertex index is 3)
* Unique name generated by execution job vertex name and index of current execution vertex. e.g.
* 1-SourceOperator-3 (vertex index is 3)
*/
public String getExecutionVertexName() {
return executionJobVertexName + "-" + executionVertexIndex;
@ -239,7 +239,7 @@ public class ExecutionVertex implements Serializable {
@Override
public boolean equals(Object obj) {
if (obj instanceof ExecutionVertex) {
return this.executionVertexId == ((ExecutionVertex)obj).getExecutionVertexId();
return this.executionVertexId == ((ExecutionVertex) obj).getExecutionVertexId();
}
return false;
}

View file

@ -6,6 +6,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TwoInputProcessor<T, O> extends StreamProcessor<Record, TwoInputOperator<T, O>> {
private static final Logger LOGGER = LoggerFactory.getLogger(TwoInputProcessor.class);
private String leftStream;

View file

@ -47,8 +47,7 @@ public class Container implements Serializable {
private Map<String, Double> availableResources = new HashMap<>();
/**
* List of {@link ExecutionVertex} ids
* belong to the container.
* List of {@link ExecutionVertex} ids belong to the container.
*/
private List<Integer> executionVertexIds = new ArrayList<>();

View file

@ -6,4 +6,5 @@ import io.ray.streaming.runtime.core.common.AbstractID;
* Container unique identifier.
*/
public class ContainerID extends AbstractID {
}

View file

@ -6,12 +6,12 @@ package io.ray.streaming.runtime.core.resource;
public enum ResourceType {
/**
*Cpu resource key.
* Cpu resource key.
*/
CPU("CPU"),
/**
*Gpu resource key.
* Gpu resource key.
*/
GPU("GPU"),

View file

@ -28,6 +28,7 @@ public class Resources implements Serializable {
/**
* Get registered containers, the container list is read-only.
*
* @return container list.
*/
public ImmutableList<Container> getRegisteredContainers() {
@ -52,7 +53,8 @@ public class Resources implements Serializable {
public ImmutableMap<UniqueId, Container> getRegisteredContainerMap() {
return ImmutableMap.copyOf(registerContainers.stream()
.collect(java.util.stream.Collectors.toMap(Container::getNodeId, c -> c)));
.collect(java.util.stream.Collectors
.toMap(Container::getNodeId, c -> c)));
}
@Override

View file

@ -8,7 +8,6 @@ import java.io.Serializable;
/**
* Runtime context for job master.
*
* <p>Including: graph, resource, checkpoint info, etc.
*/
public class JobRuntimeContext implements Serializable {

View file

@ -5,7 +5,6 @@ import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
/**
* Graph manager is one of the important roles of JobMaster. It mainly focuses on graph management.
*
* <p>
* Such as:
* <ol>

View file

@ -77,7 +77,7 @@ public class ResourceManagerImpl implements ResourceManager {
ResourceAssignStrategyType resourceAssignStrategyType =
ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY;
this.resourceAssignStrategy = ResourceAssignStrategyFactory.getStrategy(
resourceAssignStrategyType);
resourceAssignStrategyType);
LOG.info("Slot assign strategy: {}.", resourceAssignStrategy.getName());
//Init resource
@ -89,7 +89,8 @@ public class ResourceManagerImpl implements ResourceManager {
}
@Override
public ResourceAssignmentView assignResource(List<Container> containers,
public ResourceAssignmentView assignResource(
List<Container> containers,
ExecutionGraph executionGraph) {
return resourceAssignStrategy.assignResource(containers, executionGraph);
}
@ -106,8 +107,8 @@ public class ResourceManagerImpl implements ResourceManager {
}
/**
* Check the status of ray cluster node and update the internal resource information of
* streaming system.
* Check the status of ray cluster node and update the internal resource information of streaming
* system.
*/
private void checkAndUpdateResource() {
//Get add&del nodes(node -> container)
@ -117,7 +118,8 @@ public class ResourceManagerImpl implements ResourceManager {
.filter(this::isAddedNode).collect(Collectors.toList());
List<UniqueId> deleteNodes = resources.getRegisteredContainerMap().keySet().stream()
.filter(nodeId -> !latestNodeInfos.containsKey(nodeId)).collect(Collectors.toList());
.filter(nodeId -> !latestNodeInfos.containsKey(nodeId))
.collect(Collectors.toList());
LOG.info("Latest node infos: {}, current containers: {}, add nodes: {}, delete nodes: {}.",
latestNodeInfos, resources.getRegisteredContainers(), addNodes, deleteNodes);
@ -156,7 +158,6 @@ public class ResourceManagerImpl implements ResourceManager {
// failover case: container has already allocated actors
double availableCapacity = actorNumPerContainer - container.getAllocatedActorNum();
//Create ray resource.
Ray.setResource(container.getNodeId(), container.getName(), availableCapacity);
//Mark container is already registered.
@ -164,7 +165,7 @@ public class ResourceManagerImpl implements ResourceManager {
// update container's available dynamic resources
container.getAvailableResources()
.put(container.getName(), availableCapacity);
.put(container.getName(), availableCapacity);
// update register container list
resources.registerContainer(container);

View file

@ -15,8 +15,8 @@ public class ViewBuilder {
}
public static ResourceAssignmentView buildResourceAssignmentView(List<Container> containers) {
Map<ContainerID, List<Integer>> assignmentView = containers.stream()
.collect(java.util.stream.Collectors.toMap(Container::getId,
Map<ContainerID, List<Integer>> assignmentView =
containers.stream().collect(java.util.stream.Collectors.toMap(Container::getId,
Container::getExecutionVertexIds));
return ResourceAssignmentView.of(assignmentView);

View file

@ -12,8 +12,7 @@ import java.util.List;
public interface ResourceAssignStrategy {
/**
* Assign {@link Container} for
* {@link ExecutionVertex}
* Assign {@link Container} for {@link ExecutionVertex}
*
* @param containers registered container
* @param executionGraph execution graph

View file

@ -17,11 +17,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Based on Ray dynamic resource function, resource details(by ray gcs get) and
* execution logic diagram, PipelineFirstStrategy provides a actor scheduling
* strategies to make the cluster load balanced and controllable scheduling.
* Assume that we have 2 containers and have a DAG graph composed of a source node with parallelism
* of 2 and a sink node with parallelism of 2, the structure will be like:
* Based on Ray dynamic resource function, resource details(by ray gcs get) and execution logic
* diagram, PipelineFirstStrategy provides a actor scheduling strategies to make the cluster load
* balanced and controllable scheduling. Assume that we have 2 containers and have a DAG graph
* composed of a source node with parallelism of 2 and a sink node with parallelism of 2, the
* structure will be like:
* <pre>
* container_0
* |- source_1
@ -38,7 +38,7 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
private int currentContainerIndex = 0;
/**
* Assign resource to each execution vertex in the given execution graph.
* Assign resource to each execution vertex in the given execution graph.
*
* @param containers registered containers
* @param executionGraph execution graph
@ -125,6 +125,7 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Find a container which matches required resource
*
* @param requiredResource required resource
* @param containers registered containers
* @return container that matches the required resource
@ -151,6 +152,7 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Check if current container has enough resource
*
* @param requiredResource required resource
* @param container container
* @return true if matches, false else
@ -198,6 +200,7 @@ public class PipelineFirstStrategy implements ResourceAssignStrategy {
/**
* Get current container
*
* @param containers registered container
* @return current container to allocate actor
*/

View file

@ -9,6 +9,7 @@ public interface JobScheduler {
/**
* Schedule streaming job using the physical plan.
*
* @param executionGraph physical plan
* @return scheduling result
*/

View file

@ -18,7 +18,8 @@ public class ScheduleException extends RuntimeException {
super(cause);
}
protected ScheduleException(String message, Throwable cause, boolean enableSuppression,
protected ScheduleException(
String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

View file

@ -171,8 +171,8 @@ public class WorkerLifecycleController {
List<ExecutionVertex> executionVertices) {
final Object asyncContext = Ray.getAsyncContext();
List<CompletableFuture<Boolean>> futureResults = executionVertices.stream()
.map(vertex -> CompletableFuture.supplyAsync(() -> {
List<CompletableFuture<Boolean>> futureResults =
executionVertices.stream().map(vertex -> CompletableFuture.supplyAsync(() -> {
Ray.setAsyncContext(asyncContext);
return operation.apply(vertex);
})).collect(Collectors.toList());

View file

@ -34,8 +34,8 @@ public class GraphPbBuilder {
List<ExecutionVertex> upstreamVertices = executionVertex.getInputVertices();
List<RemoteCall.ExecutionVertexContext.ExecutionVertex> upstreamVertexPbs =
upstreamVertices.stream()
.map(this::buildVertex)
.collect(Collectors.toList());
.map(this::buildVertex)
.collect(Collectors.toList());
builder.addAllUpstreamExecutionVertices(upstreamVertexPbs);
// build downstream vertices
@ -127,7 +127,8 @@ public class GraphPbBuilder {
private byte[] serializePythonChainedOperator(ChainedPythonOperator operator) {
List<byte[]> serializedOperators = operator.getOperators().stream()
.map(this::serializeOperator).collect(Collectors.toList());
.map(this::serializeOperator)
.collect(Collectors.toList());
return serializer.serialize(Arrays.asList(
serializedOperators,
operator.getConfigs()

View file

@ -23,15 +23,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Gateway for streaming python api.
* All calls on DataStream in python will be mapped to DataStream call in java by this
* PythonGateway using ray calls.
* <p>
* Note: this class needs to be in sync with `GatewayClient` in
* `streaming/python/runtime/gateway_client.py`
* Gateway for streaming python api. All calls on DataStream in python will be mapped to DataStream
* call in java by this PythonGateway using ray calls. this class needs to be in sync with
* GatewayClient in `streaming/python/runtime/gateway_client.py`
*/
@SuppressWarnings("unchecked")
public class PythonGateway {
private static final Logger LOG = LoggerFactory.getLogger(PythonGateway.class);
private static final String REFERENCE_ID_PREFIX = "__gateway_reference_id__";
private static MsgPackSerializer serializer = new MsgPackSerializer();
@ -169,8 +167,9 @@ public class PythonGateway {
.toArray(Class[]::new);
Optional<Method> any = methods.stream()
.filter(m -> {
boolean exactMatch = Arrays.equals(m.getParameterTypes(), paramsTypes) ||
Arrays.equals(m.getParameterTypes(), unwrappedTypes);
boolean exactMatch =
Arrays.equals(m.getParameterTypes(), paramsTypes) ||
Arrays.equals(m.getParameterTypes(), unwrappedTypes);
if (exactMatch) {
return true;
} else if (paramsTypes.length == m.getParameterTypes().length) {

View file

@ -12,8 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ray call worker.
* It takes the communication job from {@link JobMaster} to {@link JobWorker}.
* Ray call worker. It takes the communication job from {@link JobMaster} to {@link JobWorker}.
*/
public class RemoteCallWorker {

View file

@ -6,10 +6,11 @@ import java.util.Arrays;
import java.util.List;
/**
* A serializer for cross-lang serialization between java/python.
* TODO implements a more sophisticated serialization framework
* A serializer for cross-lang serialization between java/python. TODO implements a more
* sophisticated serialization framework
*/
public class CrossLangSerializer implements Serializer {
private static final byte RECORD_TYPE_ID = 0;
private static final byte KEY_RECORD_TYPE_ID = 1;

View file

@ -3,6 +3,7 @@ package io.ray.streaming.runtime.serialization;
import io.ray.runtime.serializer.FstSerializer;
public class JavaSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
return FstSerializer.encode(object);

View file

@ -1,6 +1,7 @@
package io.ray.streaming.runtime.serialization;
public interface Serializer {
byte CROSS_LANG_TYPE_ID = 0;
byte JAVA_TYPE_ID = 1;
byte PYTHON_TYPE_ID = 2;

View file

@ -42,7 +42,7 @@ public class ChannelCreationParametersBuilder {
String language =
asyncFunctionDescriptor instanceof JavaFunctionDescriptor ? "Java" : "Python";
return "Language: " + language + " Desc: " + asyncFunctionDescriptor.toList() + " "
+ syncFunctionDescriptor.toList();
+ syncFunctionDescriptor.toList();
}
// Get actor id in bytes, called from jni.
@ -93,14 +93,16 @@ public class ChannelCreationParametersBuilder {
public ChannelCreationParametersBuilder() {
}
public static void setJavaReaderFunctionDesc(JavaFunctionDescriptor asyncFunc,
JavaFunctionDescriptor syncFunc) {
public static void setJavaReaderFunctionDesc(
JavaFunctionDescriptor asyncFunc,
JavaFunctionDescriptor syncFunc) {
javaReaderAsyncFuncDesc = asyncFunc;
javaReaderSyncFuncDesc = syncFunc;
}
public static void setJavaWriterFunctionDesc(JavaFunctionDescriptor asyncFunc,
JavaFunctionDescriptor syncFunc) {
public static void setJavaWriterFunctionDesc(
JavaFunctionDescriptor asyncFunc,
JavaFunctionDescriptor syncFunc) {
javaWriterAsyncFuncDesc = asyncFunc;
javaWriterSyncFuncDesc = syncFunc;
}
@ -109,19 +111,23 @@ public class ChannelCreationParametersBuilder {
List<String> queues,
List<BaseActorHandle> actors) {
return buildParameters(queues, actors, javaWriterAsyncFuncDesc, javaWriterSyncFuncDesc,
pyWriterAsyncFunctionDesc, pyWriterSyncFunctionDesc);
pyWriterAsyncFunctionDesc, pyWriterSyncFunctionDesc);
}
public ChannelCreationParametersBuilder buildOutputQueueParameters(List<String> queues,
List<BaseActorHandle> actors) {
public ChannelCreationParametersBuilder buildOutputQueueParameters(
List<String> queues,
List<BaseActorHandle> actors) {
return buildParameters(queues, actors, javaReaderAsyncFuncDesc, javaReaderSyncFuncDesc,
pyReaderAsyncFunctionDesc, pyReaderSyncFunctionDesc);
pyReaderAsyncFunctionDesc, pyReaderSyncFunctionDesc);
}
private ChannelCreationParametersBuilder buildParameters(List<String> queues,
private ChannelCreationParametersBuilder buildParameters(
List<String> queues,
List<BaseActorHandle> actors,
JavaFunctionDescriptor javaAsyncFunctionDesc, JavaFunctionDescriptor javaSyncFunctionDesc,
PyFunctionDescriptor pyAsyncFunctionDesc, PyFunctionDescriptor pySyncFunctionDesc
JavaFunctionDescriptor javaAsyncFunctionDesc,
JavaFunctionDescriptor javaSyncFunctionDesc,
PyFunctionDescriptor pyAsyncFunctionDesc,
PyFunctionDescriptor pySyncFunctionDesc
) {
parameters = new ArrayList<>(queues.size());

View file

@ -12,10 +12,11 @@ import java.util.Set;
import sun.nio.ch.DirectBuffer;
/**
* ChannelID is used to identify a transfer channel between a upstream worker
* and downstream worker.
* ChannelID is used to identify a transfer channel between a upstream worker and downstream
* worker.
*/
public class ChannelId {
public static final int ID_LENGTH = 20;
private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
// This ensures that the FinalizablePhantomReference itself is not garbage-collected.
@ -132,7 +133,7 @@ public class ChannelId {
* Generate channel name, which will be 20 character
*
* @param fromTaskId upstream task id
* @param toTaskId downstream task id
* @param toTaskId downstream task id
* @return channel name
*/
public static String genIdStr(int fromTaskId, int toTaskId, long ts) {

View file

@ -7,6 +7,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ChannelUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class);
static byte[] toNativeConf(StreamingWorkerConfig workerConfig) {

View file

@ -6,6 +6,7 @@ import java.nio.ByteBuffer;
* DataMessage represents data between upstream and downstream operator
*/
public class DataMessage implements Message {
private final ByteBuffer body;
private final long msgId;
private final long timestamp;

View file

@ -14,10 +14,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataReader is wrapper of streaming c++ DataReader, which read data
* from channels of upstream workers
* DataReader is wrapper of streaming c++ DataReader, which read data from channels of upstream
* workers
*/
public class DataReader {
private static final Logger LOG = LoggerFactory.getLogger(DataReader.class);
private long nativeReaderPtr;
@ -25,12 +26,13 @@ public class DataReader {
/**
* @param inputChannels input channels ids
* @param fromActors upstream input actors
* @param workerConfig configuration
* @param fromActors upstream input actors
* @param workerConfig configuration
*/
public DataReader(List<String> inputChannels,
List<BaseActorHandle> fromActors,
StreamingWorkerConfig workerConfig) {
public DataReader(
List<String> inputChannels,
List<BaseActorHandle> fromActors,
StreamingWorkerConfig workerConfig) {
Preconditions.checkArgument(inputChannels.size() > 0);
Preconditions.checkArgument(inputChannels.size() == fromActors.size());
ChannelCreationParametersBuilder initialParameters =
@ -169,10 +171,11 @@ public class DataReader {
byte[] configBytes,
boolean isMock);
private native void getBundleNative(long nativeReaderPtr,
long timeoutMillis,
long params,
long metaAddress);
private native void getBundleNative(
long nativeReaderPtr,
long timeoutMillis,
long params,
long metaAddress);
private native void stopReaderNative(long nativeReaderPtr);
@ -191,6 +194,7 @@ public class DataReader {
}
static class BundleMeta {
// kMessageBundleHeaderSize + kUniqueIDSize:
// magicNum(4b) + bundleTs(8b) + lastMessageId(8b) + messageListSize(4b)
// + bundleType(4b) + rawBundleSize(4b) + channelID(20b)

View file

@ -13,10 +13,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataWriter is a wrapper of streaming c++ DataWriter, which sends data
* to downstream workers
* DataWriter is a wrapper of streaming c++ DataWriter, which sends data to downstream workers
*/
public class DataWriter {
private static final Logger LOG = LoggerFactory.getLogger(DataWriter.class);
private long nativeWriterPtr;
@ -29,12 +29,13 @@ public class DataWriter {
/**
* @param outputChannels output channels ids
* @param toActors downstream output actors
* @param workerConfig configuration
* @param toActors downstream output actors
* @param workerConfig configuration
*/
public DataWriter(List<String> outputChannels,
List<BaseActorHandle> toActors,
StreamingWorkerConfig workerConfig) {
public DataWriter(
List<String> outputChannels,
List<BaseActorHandle> toActors,
StreamingWorkerConfig workerConfig) {
Preconditions.checkArgument(!outputChannels.isEmpty());
Preconditions.checkArgument(outputChannels.size() == toActors.size());
ChannelCreationParametersBuilder initialParameters =
@ -66,7 +67,7 @@ public class DataWriter {
/**
* Write msg into the specified channel
*
* @param id channel id
* @param id channel id
* @param item message item data section is specified by [position, limit).
*/
public void write(ChannelId id, ByteBuffer item) {
@ -80,9 +81,10 @@ public class DataWriter {
/**
* Write msg into the specified channels
*
* @param ids channel ids
* @param item message item data section is specified by [position, limit).
* item doesn't have to be a direct buffer.
* @param ids channel ids
* @param item message item data section is specified by [position, limit). item doesn't have
* to
* be a direct buffer.
*/
public void write(Set<ChannelId> ids, ByteBuffer item) {
int size = item.remaining();

View file

@ -6,7 +6,7 @@ public interface Message {
/**
* Message data
*
* <p>
* Message body is a direct byte buffer, which may be invalid after call next
* <code>DataReader#getBundleNative</code>. Please consume this buffer fully
* before next call <code>getBundleNative</code>.

View file

@ -8,7 +8,7 @@ import java.util.Map;
public class CommonUtils {
public static Map<String, Object> strMapToObjectMap(Map<String, String> srcMap) {
Map<String,Object> destMap = (Map) srcMap;
Map<String, Object> destMap = (Map) srcMap;
return destMap;
}
}

View file

@ -34,8 +34,8 @@ public class RayUtils {
*/
public static Map<UniqueId, NodeInfo> getAliveNodeInfoMap() {
return getAllNodeInfo().stream()
.filter(nodeInfo -> nodeInfo.isAlive)
.collect(Collectors.toMap(nodeInfo -> nodeInfo.nodeId, nodeInfo -> nodeInfo));
.filter(nodeInfo -> nodeInfo.isAlive)
.collect(Collectors.toMap(nodeInfo -> nodeInfo.nodeId, nodeInfo -> nodeInfo));
}
private static List<NodeInfo> mockContainerResources() {

View file

@ -20,8 +20,7 @@ import org.slf4j.LoggerFactory;
/**
* The streaming worker implementation class, it is ray actor. JobWorker is created by
* {@link JobMaster} through ray api, and JobMaster communicates
* with JobWorker through Ray.call().
* {@link JobMaster} through ray api, and JobMaster communicates with JobWorker through Ray.call().
*
* <p>The JobWorker is responsible for creating tasks and defines the methods of communication
* between workers.
@ -147,8 +146,7 @@ public class JobWorker implements Serializable {
}
/**
* Used by upstream streaming queue to send data to this actor
* and receive result from this actor
* Used by upstream streaming queue to send data to this actor and receive result from this actor
*/
public byte[] onReaderMessageSync(byte[] buffer) {
if (transferHandler == null) {
@ -165,8 +163,8 @@ public class JobWorker implements Serializable {
}
/**
* Used by downstream streaming queue to send data to this actor
* and receive result from this actor
* Used by downstream streaming queue to send data to this actor and receive result from this
* actor
*/
public byte[] onWriterMessageSync(byte[] buffer) {
if (transferHandler == null) {

View file

@ -56,10 +56,10 @@ public class JobWorkerContext implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("workerId", getWorkerId())
.add("workerName", getWorkerName())
.add("config", getConfig())
.toString();
.add("workerId", getWorkerId())
.add("workerName", getWorkerName())
.add("config", getConfig())
.toString();
}
public byte[] getPythonWorkerContextBytes() {
@ -68,10 +68,11 @@ public class JobWorkerContext implements Serializable {
new GraphPbBuilder().buildExecutionVertexContext(executionVertex);
byte[] contextBytes = RemoteCall.PythonJobWorkerContext.newBuilder()
.setMasterActor(ByteString.copyFrom((((NativeActorHandle) (master)).toBytes())))
.setExecutionVertexContext(executionVertexContext)
.build()
.toByteArray();
.setMasterActor(
ByteString.copyFrom((((NativeActorHandle) (master)).toBytes())))
.setExecutionVertexContext(executionVertexContext)
.build()
.toByteArray();
return contextBytes;
}

View file

@ -19,6 +19,7 @@ import java.util.Map;
* Use Ray to implement RuntimeContext.
*/
public class StreamingRuntimeContext implements RuntimeContext {
/**
* Backend for keyed state. This might be empty if we're not on a keyed stream.
*/
@ -33,7 +34,8 @@ public class StreamingRuntimeContext implements RuntimeContext {
private Long checkpointId;
private Map<String, String> config;
public StreamingRuntimeContext(ExecutionVertex executionVertex, Map<String, String> config,
public StreamingRuntimeContext(
ExecutionVertex executionVertex, Map<String, String> config,
int parallelism) {
this.taskId = executionVertex.getExecutionVertexId();
this.config = config;
@ -115,8 +117,9 @@ public class StreamingRuntimeContext implements RuntimeContext {
return this.keyStateBackend.getMapState(stateDescriptor);
}
protected void stateSanityCheck(AbstractStateDescriptor stateDescriptor,
AbstractKeyStateBackend backend) {
protected void stateSanityCheck(
AbstractStateDescriptor stateDescriptor,
AbstractKeyStateBackend backend) {
Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
Preconditions.checkNotNull(backend, "backend must not be null");
}

View file

@ -9,6 +9,7 @@ import io.ray.streaming.runtime.transfer.Message;
import io.ray.streaming.runtime.worker.JobWorker;
public abstract class InputStreamTask extends StreamTask {
private volatile boolean running = true;
private volatile boolean stopped = false;
private long readTimeoutMillis;
@ -56,8 +57,8 @@ public abstract class InputStreamTask extends StreamTask {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("taskId", taskId)
.add("processor", processor)
.toString();
.add("taskId", taskId)
.add("processor", processor)
.toString();
}
}

View file

@ -14,8 +14,8 @@ public class SourceStreamTask extends StreamTask {
private final SourceProcessor sourceProcessor;
/**
* SourceStreamTask for executing a {@link SourceOperator}.
* It is responsible for running the corresponding source operator.
* SourceStreamTask for executing a {@link SourceOperator}. It is responsible for running the
* corresponding source operator.
*/
public SourceStreamTask(int taskId, Processor sourceProcessor, JobWorker jobWorker) {
super(taskId, sourceProcessor, jobWorker);

View file

@ -19,11 +19,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class StreamTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
protected int taskId;
@ -44,7 +44,7 @@ public abstract class StreamTask implements Runnable {
prepareTask();
this.thread = new Thread(Ray.wrapRunnable(this),
this.getClass().getName() + "-" + System.currentTimeMillis());
this.getClass().getName() + "-" + System.currentTimeMillis());
this.thread.setDaemon(true);
}

View file

@ -16,8 +16,8 @@ public class TwoInputStreamTask extends InputStreamTask {
String leftStream,
String rightStream) {
super(taskId, processor, jobWorker);
((TwoInputProcessor)(super.processor)).setLeftStream(leftStream);
((TwoInputProcessor)(super.processor)).setRightStream(rightStream);
((TwoInputProcessor) (super.processor)).setLeftStream(leftStream);
((TwoInputProcessor) (super.processor)).setRightStream(rightStream);
}
}

View file

@ -25,12 +25,12 @@ public abstract class BaseUnitTest {
@BeforeMethod
public void testBegin(Method method) {
LOG.info(">>>>>>>>>>>>>>>>>>>> Test case: {}.{} began >>>>>>>>>>>>>>>>>>>>",
method.getDeclaringClass(), method.getName());
method.getDeclaringClass(), method.getName());
}
@AfterMethod
public void testEnd(Method method) {
LOG.info(">>>>>>>>>>>>>>>>>>>> Test case: {}.{} end >>>>>>>>>>>>>>>>>>>>",
method.getDeclaringClass(), method.getName());
method.getDeclaringClass(), method.getName());
}
}

View file

@ -50,11 +50,11 @@ public class ExecutionGraphTest extends BaseUnitTest {
executionGraph.getExecutionVertexIdGenerator().get());
executionGraph.getAllExecutionVertices().forEach(vertex -> {
Assert.assertNotNull(vertex.getStreamOperator());
Assert.assertNotNull(vertex.getExecutionJobVertexName());
Assert.assertNotNull(vertex.getVertexType());
Assert.assertNotNull(vertex.getLanguage());
Assert.assertEquals(vertex.getExecutionVertexName(),
Assert.assertNotNull(vertex.getStreamOperator());
Assert.assertNotNull(vertex.getExecutionJobVertexName());
Assert.assertNotNull(vertex.getVertexType());
Assert.assertNotNull(vertex.getLanguage());
Assert.assertEquals(vertex.getExecutionVertexName(),
vertex.getExecutionJobVertexName() + "-" + vertex.getExecutionVertexIndex());
});
@ -66,10 +66,11 @@ public class ExecutionGraphTest extends BaseUnitTest {
List<ExecutionVertex> upStreamVertices = upStream.getExecutionVertices();
List<ExecutionVertex> downStreamVertices = downStream.getExecutionVertices();
upStreamVertices.forEach(vertex -> {
Assert.assertEquals((double) vertex.getResource().get(ResourceType.CPU.name()), 2.0);
vertex.getOutputEdges().forEach(upStreamOutPutEdge -> {
Assert.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetExecutionVertex()));
});
Assert.assertEquals((double) vertex.getResource().get(ResourceType.CPU.name()), 2.0);
vertex.getOutputEdges().forEach(upStreamOutPutEdge -> {
Assert
.assertTrue(downStreamVertices.contains(upStreamOutPutEdge.getTargetExecutionVertex()));
});
});
}

View file

@ -18,6 +18,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class HybridStreamTest {
private static final Logger LOG = LoggerFactory.getLogger(HybridStreamTest.class);
public static class Mapper1 implements MapFunction<Object, Object> {

View file

@ -17,7 +17,8 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class UnionStreamTest {
private static final Logger LOG = LoggerFactory.getLogger( UnionStreamTest.class );
private static final Logger LOG = LoggerFactory.getLogger(UnionStreamTest.class);
@Test(timeOut = 60000)
public void testUnionStream() throws Exception {

View file

@ -46,7 +46,8 @@ public class WordCountTest extends BaseUnitTest implements Serializable {
.filter(pair -> !pair.word.contains("world"))
.keyBy(pair -> pair.word)
.reduce((ReduceFunction<WordAndCount>) (oldValue, newValue) ->
new WordAndCount(oldValue.word, oldValue.count + newValue.count))
new WordAndCount(oldValue.word,
oldValue.count + newValue.count))
.sink((SinkFunction<WordAndCount>)
result -> wordCount.put(result.word, result.count));

View file

@ -2,6 +2,7 @@ package io.ray.streaming.runtime.python;
import static org.testng.Assert.assertEquals;
import io.ray.streaming.api.stream.StreamSink;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.jobgraph.JobGraphBuilder;

View file

@ -3,6 +3,7 @@ package io.ray.streaming.runtime.serialization;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import io.ray.streaming.message.KeyRecord;
import io.ray.streaming.message.Record;
import org.apache.commons.lang3.builder.EqualsBuilder;
@ -17,10 +18,10 @@ public class CrossLangSerializerTest {
Record record = new Record("value");
record.setStream("stream1");
assertTrue(EqualsBuilder.reflectionEquals(record,
serializer.deserialize(serializer.serialize(record))));
serializer.deserialize(serializer.serialize(record))));
KeyRecord keyRecord = new KeyRecord("key", "value");
keyRecord.setStream("stream2");
assertEquals(keyRecord,
serializer.deserialize(serializer.serialize(keyRecord)));
serializer.deserialize(serializer.serialize(keyRecord)));
}
}

View file

@ -3,6 +3,7 @@ package io.ray.streaming.runtime.serialization;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -18,7 +19,7 @@ public class MsgPackSerializerTest {
MsgPackSerializer serializer = new MsgPackSerializer();
assertEquals(serializer.deserialize(
serializer.serialize((byte)1)), (byte)1);
serializer.serialize((byte) 1)), (byte) 1);
}
@Test

View file

@ -1,12 +1,12 @@
package io.ray.streaming.runtime.streamingqueue;
import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.api.ActorHandle;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.transfer.ChannelId;
import io.ray.streaming.runtime.transfer.ChannelCreationParametersBuilder;
import io.ray.streaming.runtime.transfer.ChannelId;
import io.ray.streaming.runtime.transfer.DataMessage;
import io.ray.streaming.runtime.transfer.DataReader;
import io.ray.streaming.runtime.transfer.DataWriter;
@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
public class Worker {
private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
protected TransferHandler transferHandler = null;
@ -50,6 +51,7 @@ public class Worker {
}
class ReaderWorker extends Worker {
private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWorker.class);
private String name = null;
@ -170,6 +172,7 @@ class ReaderWorker extends Worker {
}
class WriterWorker extends Worker {
private static final Logger LOGGER = LoggerFactory.getLogger(WriterWorker.class);
private String name = null;

View file

@ -2,6 +2,7 @@ package io.ray.streaming.runtime.transfer;
import static org.testng.Assert.assertEquals;
import io.ray.streaming.runtime.BaseUnitTest;
import io.ray.streaming.runtime.util.EnvUtil;
import org.testng.annotations.Test;

View file

@ -21,12 +21,11 @@ public class Mockitools {
public static void mockGscApi() {
PowerMockito.mockStatic(RayUtils.class);
PowerMockito.when(RayUtils.getAliveNodeInfoMap())
.thenReturn(mockGetNodeInfoMap(mockGetAllNodeInfo()));
.thenReturn(mockGetNodeInfoMap(mockGetAllNodeInfo()));
}
/**
* Mock get all node info from GCS
* @return
*/
public static List<NodeInfo> mockGetAllNodeInfo() {
List<NodeInfo> nodeInfos = new LinkedList<>();
@ -55,21 +54,22 @@ public class Mockitools {
/**
* Mock get node info map
*
* @param nodeInfos all node infos fetched from GCS
* @return node info map, key is node unique id, value is node info
*/
public static Map<UniqueId, NodeInfo> mockGetNodeInfoMap(List<NodeInfo> nodeInfos) {
return nodeInfos.stream().filter(nodeInfo -> nodeInfo.isAlive).collect(
Collectors.toMap(nodeInfo -> nodeInfo.nodeId, nodeInfo -> nodeInfo));
Collectors.toMap(nodeInfo -> nodeInfo.nodeId, nodeInfo -> nodeInfo));
}
private static NodeInfo mockNodeInfo(int i, Map<String, Double> resources) {
return new NodeInfo(
createNodeId(i),
"localhost" + i,
"localhost" + i,
true,
resources);
createNodeId(i),
"localhost" + i,
"localhost" + i,
true,
resources);
}
private static UniqueId createNodeId(int id) {

View file

@ -2,6 +2,7 @@ package io.ray.streaming.runtime.util;
import static org.testng.Assert.assertEquals;
import java.io.Serializable;
import java.util.Collections;
import org.testng.annotations.Test;
@ -9,6 +10,7 @@ import org.testng.annotations.Test;
public class ReflectionUtilsTest {
static class Foo implements Serializable {
public void f1() {
}

View file

@ -2,95 +2,95 @@
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>ray-streaming</artifactId>
<groupId>io.ray</groupId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>ray-streaming</artifactId>
<groupId>io.ray</groupId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>streaming-state</artifactId>
<name>ray streaming state</name>
<description>ray streaming state</description>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testng.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<artifactId>streaming-state</artifactId>
<name>ray streaming state</name>
<description>ray streaming state</description>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>${fst.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.27</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testng.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>${fst.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.27</version>
</dependency>
</dependencies>
</project>

View file

@ -21,21 +21,20 @@ package io.ray.streaming.state;
/**
* TransactionState interface.
* <p>
* Streaming State should implement transaction in case of failure,
* which in our case is four default method, finish, commit, ackCommit, rollback.
* Streaming State should implement transaction in case of failure, which in our case is four
* default method, finish, commit, ackCommit, rollback.
*/
public interface StateStoreManager {
/**
* The finish method is used when the batched data is all saved in state.
* Normally, serialization job is done here.
* The finish method is used when the batched data is all saved in state. Normally, serialization
* job is done here.
*/
void finish(long checkpointId);
/**
* The commit method is used for persistent, and can be used in another thread to reach async
* state commit.
* Normally, data persistent is done here.
* state commit. Normally, data persistent is done here.
*/
void commit(long checkpointId);

View file

@ -20,6 +20,7 @@ package io.ray.streaming.state.backend;
import static io.ray.streaming.state.config.ConfigKey.DELIMITER;
import io.ray.streaming.state.config.ConfigKey;
import io.ray.streaming.state.keystate.desc.AbstractStateDescriptor;
import io.ray.streaming.state.serialization.KeyMapStoreSerializer;

Some files were not shown because too many files have changed in this diff Show more