Documentation for the Liebre project.
Liebre lets you fully define the semantics of sources, operators and sinks. At the same time, it offers a set of common sources, operators and sinks, which are described here.
If you are reading tuples from a text file, the TextSource minimizes the information you need to provide to the query in order to instantiate such a source. In the following example, we read lines from a text file and convert each of them into a tuple of type MyTuple (defined in the Map example below) with the attributes <timestamp,key,value>.
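Source<String> i1 = q.addTextFileSource("I1", inputFile);
Operator<String, MyTuple> inputReader =
    q.addMapOperator(
        "map",
        line -> {
          // Optional: throttles the example by sleeping before each line is processed
          Util.sleep(100);
          // Each line is expected to have the form "timestamp,key,value"
          String[] tokens = line.split(",");
          return new MyTuple(
              Long.valueOf(tokens[0]), Integer.valueOf(tokens[1]), Integer.valueOf(tokens[2]));
        });
// Feed the lines read by the source into the Map operator
q.connect(i1, inputReader);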
Please notice: as in the example in the basics, you need to specify an id for each source or operator you are adding (in this case, "I1" for the source and "map" for the Map operator).
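To make the parsing step concrete, the following plain-Java sketch reproduces what the Map lambda above does to each line, without depending on Liebre. ParsedTuple, LineParsing, parse and parseAll are hypothetical names introduced only for illustration, and the sketch assumes that every non-empty line has exactly the form timestamp,key,value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in for the MyTuple class used in this document.
final class ParsedTuple {
  final long timestamp;
  final int key;
  final int value;

  ParsedTuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }
}

final class LineParsing {
  // Parses one "timestamp,key,value" line, like the lambda passed to addMapOperator.
  static ParsedTuple parse(String line) {
    String[] tokens = line.split(",");
    if (tokens.length != 3) {
      throw new IllegalArgumentException("expected timestamp,key,value but got: " + line);
    }
    return new ParsedTuple(
        Long.parseLong(tokens[0].trim()),
        Integer.parseInt(tokens[1].trim()),
        Integer.parseInt(tokens[2].trim()));
  }

  // Parses every non-empty line of a text, producing one tuple per line.
  static List<ParsedTuple> parseAll(String text) {
    List<ParsedTuple> result = new ArrayList<>();
    for (String line : text.split("\n")) {
      if (!line.trim().isEmpty()) {
        result.add(parse(line));
      }
    }
    return result;
  }
}
```

Unlike the lambda above, parse rejects lines with a wrong number of fields instead of failing with an ArrayIndexOutOfBoundsException (or silently ignoring extra fields); whether to reject, skip or log malformed lines is a design choice left to your application.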
The Map operator allows you to transform each input tuple into exactly one output tuple, possibly of a different type. In the following example, we transform each input tuple of type MyTuple into a tuple of type OutputTuple:
class MyTuple {
  public long timestamp;
  public int key;
  public int value;

  public MyTuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }
}

class OutputTuple {
  public long timestamp;
  public int key;
  public int valueA;
  public int valueB;
  public int valueC;

  public OutputTuple(MyTuple t) {
    this.timestamp = t.timestamp;
    this.key = t.key;
    this.valueA = t.value * 2;
    this.valueB = t.value / 2;
    this.valueC = t.value + 10;
  }
}
Operator<MyTuple, OutputTuple> transform =
    q.addMapOperator("transform", tuple -> new OutputTuple(tuple));
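The contract of the Map operator (exactly one output tuple per input tuple, with a possibly different type) can be illustrated with a small plain-Java sketch that does not use Liebre. Tuple and MapSketch.map are hypothetical names; the sketch processes a finite list, whereas a Liebre operator processes an unbounded stream one tuple at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for MyTuple: same three fields, no Liebre dependency.
final class Tuple {
  final long timestamp;
  final int key;
  final int value;

  Tuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }
}

final class MapSketch {
  // Map semantics: exactly one output per input, in input order; the output type may differ.
  static <I, O> List<O> map(List<I> inputs, Function<I, O> fn) {
    List<O> outputs = new ArrayList<>(inputs.size());
    for (I input : inputs) {
      outputs.add(fn.apply(input));
    }
    return outputs;
  }
}
```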
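Please notice: the output type of a Map can also be the same as its input type. In the following example, the Map keeps the type MyTuple and doubles the field value: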
Operator<MyTuple, MyTuple> multiply =
    q.addMapOperator(
        "multiply", tuple -> new MyTuple(tuple.timestamp, tuple.key, tuple.value * 2));
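The FlatMap operator is similar to the Map operator, but it allows you to transform each input tuple into zero, one or more output tuples. Continuing the previous example, the following FlatMap produces, for each input tuple, three output tuples whose value is the input value multiplied by 2, 3 and 4, respectively: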
import java.util.LinkedList;
import java.util.List;

Operator<MyTuple, MyTuple> multiplyFlat =
    q.addFlatMapOperator(
        "multiplyFlat",
        tuple -> {
          // One input tuple produces three output tuples
          List<MyTuple> result = new LinkedList<>();
          result.add(new MyTuple(tuple.timestamp, tuple.key, tuple.value * 2));
          result.add(new MyTuple(tuple.timestamp, tuple.key, tuple.value * 3));
          result.add(new MyTuple(tuple.timestamp, tuple.key, tuple.value * 4));
          return result;
        });
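The FlatMap contract can be sketched in plain Java, outside Liebre, as a function from each input to a list of outputs whose results are concatenated in input order; returning an empty list drops the input. FlatMapSketch and Tuple are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for MyTuple.
final class Tuple {
  final long timestamp;
  final int key;
  final int value;

  Tuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }
}

final class FlatMapSketch {
  // FlatMap semantics: each input yields zero, one or more outputs,
  // and the per-input lists are concatenated in input order.
  static <I, O> List<O> flatMap(List<I> inputs, Function<I, List<O>> fn) {
    List<O> outputs = new ArrayList<>();
    for (I input : inputs) {
      outputs.addAll(fn.apply(input));
    }
    return outputs;
  }
}
```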
The Filter operator allows you to decide whether each input tuple should be forwarded or discarded. In the following example, each input tuple of type MyTuple is forwarded if its value is greater than or equal to 150.
q.addFilterOperator("filter", tuple -> tuple.value >= 150);
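As a plain-Java illustration of the Filter contract (tuples are either forwarded unchanged or dropped, so input and output types coincide), the sketch below applies a predicate to a finite list without using Liebre. Tuple and FilterSketch are hypothetical names. Note the boundary: with the predicate tuple.value >= 150, a tuple whose value is exactly 150 is forwarded, while with tuple.value > 150 it would be dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for MyTuple.
final class Tuple {
  final long timestamp;
  final int key;
  final int value;

  Tuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }
}

final class FilterSketch {
  // Filter semantics: forward the same tuple object when the predicate holds, drop it otherwise.
  static <T> List<T> filter(List<T> inputs, Predicate<T> predicate) {
    List<T> outputs = new ArrayList<>();
    for (T input : inputs) {
      if (predicate.test(input)) {
        outputs.add(input);
      }
    }
    return outputs;
  }
}
```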
The Router operator allows you to forward each input tuple to one or multiple output streams. In the following example, the Router forwards each input tuple of type MyTuple to its two output streams, each connected to a Filter.
Operator<MyTuple, MyTuple> router = q.addRouterOperator("router");
// fHigh keeps the tuples with key greater than 4, fLow those with key smaller than 5
Operator<MyTuple, MyTuple> filterHigh = q.addFilterOperator("fHigh", t -> t.key > 4);
Operator<MyTuple, MyTuple> filterLow = q.addFilterOperator("fLow", t -> t.key < 5);
// Each output stream of the router feeds one of the two filters
q.connect(router, filterHigh).connect(router, filterLow);
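Assuming, as the example suggests, that the Router sends a copy of each tuple to every connected output stream, the combination of a Router and per-branch Filters can be sketched in plain Java as follows. RouterSketch and route are hypothetical names, and integer keys stand in for MyTuple; this is an illustration of the idea, not Liebre's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RouterSketch {
  // Sends every input to each branch (assumed Router behavior); each branch then applies
  // its own filter, so a tuple can reach zero, one or several downstream operators.
  static <T> List<List<T>> route(List<T> inputs, List<Predicate<T>> branchFilters) {
    List<List<T>> outputs = new ArrayList<>();
    for (Predicate<T> filter : branchFilters) {
      List<T> branch = new ArrayList<>();
      for (T input : inputs) {
        if (filter.test(input)) {
          branch.add(input);
        }
      }
      outputs.add(branch);
    }
    return outputs;
  }
}
```

With complementary predicates such as key > 4 and key < 5, every tuple reaches exactly one branch; with overlapping predicates, a tuple is delivered to several branches.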
Unlike the Map and the Filter, which are stateless, the Aggregate is stateful. Stateless means that each output tuple depends on exactly one input tuple (more formally, the operator does not maintain a state that evolves with the tuples it processes). Stateful, on the other hand, means that each output tuple can depend on multiple input tuples.
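The distinction can be illustrated in plain Java, independently of Liebre: doubleValue behaves like a Map, always returning the same output for the same input, while RunningAverage keeps a state that evolves with each input, so the same input can produce different outputs. Both names are hypothetical, and unlike Liebre's Aggregate, RunningAverage never forgets old inputs because it has no window.

```java
final class StatelessVsStateful {
  // Stateless: the output depends only on the current input, like a Map or a Filter.
  static int doubleValue(int value) {
    return value * 2;
  }

  // Stateful: count and sum evolve with every input, so each output depends on all inputs so far.
  static final class RunningAverage {
    private long count = 0;
    private double sum = 0;

    double update(int value) {
      count++;
      sum += value;
      return sum / count;
    }
  }
}
```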
The Aggregate operator aggregates multiple tuples with functions such as sum, max, min or any other user-defined function. Since streams are unbounded, the aggregation is performed over windows. The Aggregate operator allows such functions to be computed either over all the incoming tuples or separately for each group-by value.
The semantics of a streaming aggregation depend on the type and behavior of its window, while its memory footprint and processing cost depend on its internal implementation. Many variations have been discussed in the literature. The Aggregate currently provided by Liebre supports time-based sliding windows and is implemented by maintaining a single window for each distinct group-by value. In the following, we build an example step by step.
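As a reference point for the example built below, the following plain-Java sketch computes by brute force what a time-based sliding window aggregate produces when it counts the tuples and averages their value for each group-by key. It is not Liebre's implementation, which maintains one incremental window per group-by value. The window boundaries are assumptions made for illustration: each window covers [start, start + size), every start is a non-negative multiple of the slide, and timestamps are non-negative. Event, WindowResult and SlidingWindowAggregate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical input tuple: timestamp, group-by key and value.
final class Event {
  final long timestamp;
  final int key;
  final int value;

  Event(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }
}

// Hypothetical output tuple: one per (window, key) pair containing at least one event.
final class WindowResult {
  final long windowStart;
  final int key;
  final int count;
  final double average;

  WindowResult(long windowStart, int key, int count, double average) {
    this.windowStart = windowStart;
    this.key = key;
    this.count = count;
    this.average = average;
  }
}

final class SlidingWindowAggregate {
  // Brute-force reference; results are ordered by window start, then by key.
  static List<WindowResult> aggregate(List<Event> events, long size, long slide) {
    if (size <= 0 || slide <= 0) {
      throw new IllegalArgumentException("size and slide must be positive");
    }
    // window start -> key -> {count, sum}
    TreeMap<Long, TreeMap<Integer, long[]>> acc = new TreeMap<>();
    for (Event e : events) {
      // First window start: smallest non-negative multiple of slide greater than timestamp - size.
      long lowest = Math.max(0, e.timestamp - size + 1);
      long firstStart = ((lowest + slide - 1) / slide) * slide;
      for (long start = firstStart; start <= e.timestamp; start += slide) {
        long[] countAndSum =
            acc.computeIfAbsent(start, s -> new TreeMap<>()).computeIfAbsent(e.key, k -> new long[2]);
        countAndSum[0]++;
        countAndSum[1] += e.value;
      }
    }
    List<WindowResult> results = new ArrayList<>();
    for (Map.Entry<Long, TreeMap<Integer, long[]>> window : acc.entrySet()) {
      for (Map.Entry<Integer, long[]> group : window.getValue().entrySet()) {
        long[] cs = group.getValue();
        results.add(
            new WindowResult(window.getKey(), group.getKey(), (int) cs[0], (double) cs[1] / cs[0]));
      }
    }
    return results;
  }
}
```

A streaming implementation cannot see all events in advance, so it has to produce each window's result once the window can no longer change; keeping an incremental state per window and per key, as the AverageWindow defined below does, avoids rescanning the window content.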
For an Aggregate operator, the input and output tuple types must implement the RichTuple interface, which exposes the timestamp and the key of each tuple:
class InputTuple implements RichTuple {
  public long timestamp;
  public int key;
  public int value;

  public InputTuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }

  @Override
  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public String getKey() {
    return key + "";
  }
}
class OutputTuple implements RichTuple {
  public long timestamp;
  public int key;
  public int count;
  public double average;

  public OutputTuple(long timestamp, int key, int count, double average) {
    this.timestamp = timestamp;
    this.key = key;
    this.count = count;
    this.average = average;
  }

  @Override
  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public String getKey() {
    return key + "";
  }
}
Please notice: for your convenience, you can also extend the BaseRichTuple class, which defines the fields and methods that keep the timestamp and the key:
class InputTuple extends BaseRichTuple {
  public int value;

  public InputTuple(long timestamp, int key, int value) {
    super(timestamp, key + "");
    this.value = value;
  }
}

class OutputTuple extends BaseRichTuple {
  public int count;
  public double average;

  public OutputTuple(long timestamp, int key, int count, double average) {
    super(timestamp, key + "");
    this.count = count;
    this.average = average;
  }
}
Once the input and output tuple types are defined, you can specify how the data is aggregated by defining a window. In the following example, the window counts the tuples it contains and computes the average of their field value:
class AverageWindow extends BaseTimeBasedSingleWindow<InputTuple, OutputTuple> {
  private double count = 0;
  private double sum = 0;

  // Updates the state when a tuple enters the window
  @Override
  public void add(InputTuple t) {
    count++;
    sum += t.value;
  }

  // Updates the state when a tuple leaves the window
  @Override
  public void remove(InputTuple t) {
    count--;
    sum -= t.value;
  }

  // Builds the output tuple from the current state; an empty window reports an average of 0
  @Override
  public OutputTuple getAggregatedResult() {
    double average = count > 0 ? sum / count : 0;
    return new OutputTuple(startTimestamp, Integer.valueOf(key), (int) count, average);
  }

  // Returns a new, empty window of the same class
  @Override
  public TimeBasedSingleWindow<InputTuple, OutputTuple> factory() {
    return new AverageWindow();
  }
}
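Please notice: startTimestamp and key are not declared in AverageWindow; they are provided by BaseTimeBasedSingleWindow. The factory() method must return a new, empty instance of the window class (here, AverageWindow), as the Aggregate maintains a separate window for each distinct group-by value.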
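Because add and remove update count and sum incrementally, the window never needs to rescan its content. The following plain-Java sketch mirrors this logic outside Liebre: AverageState reproduces the state kept by AverageWindow, and the hypothetical SlidingAverage driver decides when tuples leave. Its eviction rule (keep the tuples whose timestamp lies in (latest - size, latest], with tuples arriving in timestamp order) is an assumption chosen for illustration and is not how Liebre schedules its add and remove calls.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Mirrors the state of AverageWindow: a count and a sum updated by add and remove.
final class AverageState {
  private int count = 0;
  private double sum = 0;

  void add(int value) {
    count++;
    sum += value;
  }

  void remove(int value) {
    count--;
    sum -= value;
  }

  int count() {
    return count;
  }

  // Same guard as getAggregatedResult: an empty window reports 0 rather than dividing by zero.
  double average() {
    return count > 0 ? sum / count : 0;
  }
}

// Hypothetical driver: calls remove for every tuple that falls out of (latest - size, latest].
final class SlidingAverage {
  private final long size;
  private final Deque<long[]> contents = new ArrayDeque<>(); // each entry is {timestamp, value}
  private final AverageState state = new AverageState();

  SlidingAverage(long size) {
    this.size = size;
  }

  void insert(long timestamp, int value) {
    contents.addLast(new long[] {timestamp, value});
    state.add(value);
    // Tuples arrive in timestamp order, so the expired ones are at the head of the deque.
    while (!contents.isEmpty() && contents.peekFirst()[0] <= timestamp - size) {
      long[] expired = contents.removeFirst();
      state.remove((int) expired[1]);
    }
  }

  AverageState state() {
    return state;
  }
}
```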
Once you define the types for the input and output tuples and the window, you can then add the aggregate to the query:
q.addAggregateOperator("aggOp", new AverageWindow(), WINDOW_SIZE, WINDOW_SLIDE);
As shown, aside from the id of the operator and an instance of the window, you also specify the window size and the window advance (slide), expressed in the same unit as the timestamps of your tuples. For instance, if the timestamps of the tuples are in seconds, the following aggregate defines a window of size 4 weeks and advance 1 week:
q.addAggregateOperator("aggOp", new AverageWindow(),
    4 * 7 * 24 * 3600, // window size: 4 weeks, in seconds
    7 * 24 * 3600);    // window advance: 1 week, in seconds
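The arithmetic behind these two arguments can be checked with a few lines of plain Java. WindowUnits and its helper methods are hypothetical names, and latestWindowStart assumes non-negative timestamps and windows that start at multiples of the advance, which this page does not state.

```java
// Window size and advance of the example above, with timestamps in seconds.
final class WindowUnits {
  static final long SECONDS_PER_DAY = 24L * 3600; // 86400
  static final long WEEK = 7 * SECONDS_PER_DAY;    // 604800
  static final long WINDOW_SIZE = 4 * WEEK;        // 2419200 (4 weeks)
  static final long WINDOW_ADVANCE = WEEK;         // 604800 (1 week)

  // When size is a multiple of advance, each tuple (away from the start of the stream)
  // falls into size / advance overlapping windows.
  static long windowsPerTuple(long size, long advance) {
    return size / advance;
  }

  // Start of the most recent window containing ts, assuming windows start at multiples of advance.
  static long latestWindowStart(long ts, long advance) {
    return (ts / advance) * advance;
  }
}
```

With these values, each tuple contributes to 4 windows, which is the price paid for getting a fresh 4-week result every week.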
Similarly to the Aggregate, the Join is a stateful operator. It compares tuples from two streams with a given predicate and forwards an output tuple every time the predicate holds. For this operator, you can specify different types for the two input tuples and for the output tuple. Since the Join also operates on time-based sliding windows, these types should implement RichTuple or extend BaseRichTuple:
class InputTuple1 extends BaseRichTuple {
  public int a;

  public InputTuple1(long timestamp, int a) {
    super(timestamp, "");
    this.a = a;
  }
}

class InputTuple2 extends BaseRichTuple {
  public int b;

  public InputTuple2(long timestamp, int b) {
    super(timestamp, "");
    this.b = b;
  }
}

class OutputTuple extends BaseRichTuple {
  public InputTuple1 t1;
  public InputTuple2 t2;

  public OutputTuple(long timestamp, InputTuple1 t1, InputTuple2 t2) {
    super(timestamp, "");
    this.t1 = t1;
    this.t2 = t2;
  }
}
Please notice: since the key is not used in this example, we simply set it to an empty string.
Once the types of the input and output tuples are defined, we can add the Join operator to the query. In the following example, we produce an output tuple carrying each pair of input tuples for which a < b:
q.addJoinOperator(
    "join",
    new JoinFunction<InputTuple1, InputTuple2, OutputTuple>() {
      @Override
      public OutputTuple apply(InputTuple1 t1, InputTuple2 t2) {
        // Produce an output tuple only for pairs with a < b; null means no output
        if (t1.a < t2.b) {
          return new OutputTuple(t1.getTimestamp(), t1, t2);
        }
        return null;
      }
    },
    10000);
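Please notice: returning null means that no output tuple is produced for that pair of input tuples. The last argument (10000) is the window size, expressed in the same unit as the timestamps of the tuples.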
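The following plain-Java sketch spells out the pairing logic of a windowed join by brute force, outside Liebre. The window condition, |t1.timestamp - t2.timestamp| < windowSize, is an assumption about how the window size is interpreted (whether the bound is strict, and whether Liebre compares timestamps exactly this way, is not stated on this page). Left, Right, Joined and WindowJoin are hypothetical names; as in the JoinFunction above, a null result means that no output tuple is produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-ins for InputTuple1, InputTuple2 and OutputTuple.
final class Left {
  final long timestamp;
  final int a;

  Left(long timestamp, int a) {
    this.timestamp = timestamp;
    this.a = a;
  }
}

final class Right {
  final long timestamp;
  final int b;

  Right(long timestamp, int b) {
    this.timestamp = timestamp;
    this.b = b;
  }
}

final class Joined {
  final long timestamp;
  final Left t1;
  final Right t2;

  Joined(long timestamp, Left t1, Right t2) {
    this.timestamp = timestamp;
    this.t1 = t1;
    this.t2 = t2;
  }
}

final class WindowJoin {
  // Applies fn to every pair whose timestamps are less than windowSize apart
  // and keeps the non-null results.
  static List<Joined> join(
      List<Left> lefts, List<Right> rights, long windowSize, BiFunction<Left, Right, Joined> fn) {
    List<Joined> outputs = new ArrayList<>();
    for (Left l : lefts) {
      for (Right r : rights) {
        if (Math.abs(l.timestamp - r.timestamp) < windowSize) {
          Joined j = fn.apply(l, r);
          if (j != null) {
            outputs.add(j);
          }
        }
      }
    }
    return outputs;
  }
}
```

The nested loops make the cost of a join explicit: every tuple is compared with every tuple of the other stream that is still in the window, so the window size directly bounds both memory and work.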
If you are writing tuples to a text file, the TextSink minimizes the information you need to provide to the query in order to instantiate such a sink. In the following example, we write tuples composed of the attributes <timestamp,key,value> to a file (we reuse the MyTuple class used for the TextSource).
q.addTextFileSink("o1", outputFile, true);
Please notice: as in the example in the basics, you need to specify an id for the sink you are adding (in this case, "o1").
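MyTuple, as defined in this document, does not override toString(). If the sink writes each tuple's string representation (an assumption; this page does not say how the TextSink formats tuples), the file would contain lines of the form MyTuple@<hash code> rather than <timestamp,key,value>. The following plain-Java sketch shows a tuple class whose toString() produces the comma-separated format and a helper that writes one line per tuple; CsvTuple and TextSinkSketch are hypothetical names, not part of Liebre.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;

// Hypothetical stand-in for MyTuple with a toString() producing "timestamp,key,value".
final class CsvTuple {
  final long timestamp;
  final int key;
  final int value;

  CsvTuple(long timestamp, int key, int value) {
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }

  // Inverse of the "timestamp,key,value" parsing used with the TextSource.
  @Override
  public String toString() {
    return timestamp + "," + key + "," + value;
  }
}

final class TextSinkSketch {
  // Writes one line per tuple, then flushes so that the lines reach the underlying writer.
  static void writeAll(List<CsvTuple> tuples, PrintWriter out) {
    for (CsvTuple t : tuples) {
      out.println(t);
    }
    out.flush();
  }
}
```

Keeping the output format symmetric with the input format means a file written by the sink can be read back by the TextSource and the Map shown at the beginning of this page.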