Documentation for the Liebre project.
A streaming application runs a query: a directed acyclic graph of sources, operators, and sinks connected by streams.
When you create your query, you add sources, operators and sinks and connect them with streams.
In this example, a source creates a stream of tuples with attributes <timestamp,key,value> and feeds them to an operator that multiplies the value by 2. This operator feeds its output tuples to a sink that prints them.
/**
 * Minimal example query: one source, one operator and one sink chained together.
 *
 * <p>The source emits {@code MyTuple}s carrying the current time, a random key in [0, 5) and a
 * random value in [0, 100). Operator "M" doubles the value, and the sink prints every tuple as
 * {@code timestamp,key,value}.
 */
public class SimpleQuery {

  /** Argument handed to {@code Util.sleep} before each tuple is produced (presumably ms; confirm against Util). */
  private static final int SOURCE_PAUSE = 50;

  /** Argument handed to {@code Util.sleep} between activation and deactivation (presumably ms; confirm against Util). */
  private static final int RUN_DURATION = 30000;

  /** Exclusive upper bound of the random key. */
  private static final int KEY_BOUND = 5;

  /** Exclusive upper bound of the random value. */
  private static final int VALUE_BOUND = 100;

  /**
   * Builds the query, activates it, waits, and deactivates it again.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    Query query = new Query();

    // Source "I1": pauses, then produces a tuple stamped with the current time.
    Source<MyTuple> tupleSource = query.addBaseSource("I1", new SourceFunction<MyTuple>() {
      private final Random random = new Random();

      @Override
      public MyTuple get() {
        Util.sleep(SOURCE_PAUSE);
        long now = System.currentTimeMillis();
        int key = random.nextInt(KEY_BOUND);
        int value = random.nextInt(VALUE_BOUND);
        return new MyTuple(now, key, value);
      }
    });

    // Operator "M": emits exactly one output tuple per input, with the value doubled.
    Operator<MyTuple, MyTuple> doubler = query
        .addOperator(new BaseOperator1In<MyTuple, MyTuple>("M") {
          @Override
          public List<MyTuple> processTupleIn1(MyTuple input) {
            int doubled = input.value * 2;
            List<MyTuple> output = new LinkedList<MyTuple>();
            output.add(new MyTuple(input.timestamp, input.key, doubled));
            return output;
          }
        });

    // Sink "O1": prints each tuple as a comma-separated line.
    Sink<MyTuple> printer = query.addBaseSink("O1", t -> {
      String line = t.timestamp + "," + t.key + "," + t.value;
      System.out.println(line);
    });

    // Wire source -> operator -> sink.
    query.connect(tupleSource, doubler).connect(doubler, printer);

    query.activate();
    Util.sleep(RUN_DURATION);
    query.deActivate();
  }

  /** Tuple type used by this example: a timestamp, a key and a value. */
  private static class MyTuple {
    public long timestamp;
    public int key;
    public int value;

    /**
     * @param timestamp value stored in {@link #timestamp}
     * @param key value stored in {@link #key}
     * @param value value stored in {@link #value}
     */
    public MyTuple(long timestamp, int key, int value) {
      this.timestamp = timestamp;
      this.key = key;
      this.value = value;
    }
  }
}
Please note: