Vincenzo Gulisano

Documentation for the Liebre project.


The basics

A streaming application runs a query, a directed acyclic graph of sources, operators and sinks connected by streams:

  1. Sources produce tuples.
  2. Operators consume input tuples and produce output tuples.
  3. Sinks consume tuples.

When you create your query, you add sources, operators and sinks and connect them with streams.
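To make the three roles concrete, here is a minimal plain-Java sketch of the source, operator, sink shape. It does not use Liebre at all: `Supplier`, `Function` and `Consumer` from `java.util.function` stand in for a source, an operator and a sink, and the hypothetical `run` helper plays the part of the streams that connect them. It only illustrates the data flow, not how Liebre itself connects or schedules components.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class PipelineSketch {

  // Hypothetical helper (not part of Liebre): pulls `count` tuples from the source,
  // passes each one through the operator, and hands every output tuple to the sink.
  static <I, O> void run(Supplier<I> source, Function<I, List<O>> operator,
      Consumer<O> sink, int count) {
    for (int i = 0; i < count; i++) {
      I tuple = source.get();
      for (O out : operator.apply(tuple)) {
        sink.accept(out);
      }
    }
  }

  public static void main(String[] args) {
    int[] next = {0}; // counter backing the source

    // Source: produces 0, 1, 2, ...
    Supplier<Integer> source = () -> next[0]++;
    // Operator: consumes one tuple and produces a list of output tuples.
    Function<Integer, List<Integer>> doubler = v -> Collections.singletonList(v * 2);
    // Sink: consumes tuples.
    Consumer<Integer> sink = v -> System.out.println(v);

    run(source, doubler, sink, 3);
  }
}
```

Running `main` prints 0, 2 and 4 on separate lines: three tuples leave the source, each is doubled by the operator, and the sink prints the results.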

A first example (complete example here)

In this example, a source creates a stream of tuples with attributes <timestamp,key,value> and feeds them to an operator that multiplies the value by 2. This operator feeds its output tuples to a sink that prints them.

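import java.util.LinkedList;
import java.util.List;
import java.util.Random;
// The Liebre classes used below (Query, Source, SourceFunction, Operator,
// BaseOperator1In, Sink, Util) must be imported as well; their packages
// depend on the Liebre version you use.

public class SimpleQuery {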

  public static void main(String[] args) {

    Query q = new Query();
    Source<MyTuple> source = q.addBaseSource("I1", new SourceFunction<MyTuple>() {
      private final Random r = new Random();

      @Override
      public MyTuple get() {
        Util.sleep(50);
        return new MyTuple(System.currentTimeMillis(), r.nextInt(5), r.nextInt(100));
      }
    });

    Operator<MyTuple, MyTuple> multiply = q
        .addOperator(new BaseOperator1In<MyTuple, MyTuple>("M") {
          @Override
          public List<MyTuple> processTupleIn1(MyTuple tuple) {
            List<MyTuple> result = new LinkedList<MyTuple>();
            result.add(new MyTuple(tuple.timestamp, tuple.key, tuple.value * 2));
            return result;
          }
        });

    Sink<MyTuple> sink = q.addBaseSink("O1",
        tuple -> System.out.println(tuple.timestamp + "," + tuple.key + "," + tuple.value));

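    // Wire the query: source -> multiply -> sink.
    q.connect(source, multiply).connect(multiply, sink);

    // Start the query, let it run for a while, then stop it.
    q.activate();
    Util.sleep(30000);
    q.deActivate();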

  }

  private static class MyTuple {

    public long timestamp;
    public int key;
    public int value;

    public MyTuple(long timestamp, int key, int value) {
      this.timestamp = timestamp;
      this.key = key;
      this.value = value;
    }
  }
}

Please notice:

  1. You can define a class for your tuples (MyTuple in the example).
  2. You can add a source to your query using the method addBaseSource. For this method, you provide:
    • an id for the source.
    • an instance of SourceFunction, for which you specify the method get.
  3. You can add an operator to your query using the method addOperator. For this method, you provide:
    • an id for the operator (in the example, "M" is passed to the BaseOperator1In constructor).
    • an instance of BaseOperator1In, for which you specify the method processTupleIn1. BaseOperator1In allows you to return more than one tuple for each incoming tuple. To produce no output for an incoming tuple, return an empty list.
  4. You can add a sink to your query using the method addBaseSink. For this method, you provide:
    • an id for the sink.
    • an instance of SinkFunction, for which you specify the method accept. Notice that, as shown in the example, a lambda expression can be used instead for a more compact notation.
  5. Finally, you connect the source to the operator and the operator to the sink with the method connect.
  6. You can activate and deactivate the query with the methods activate and deActivate.
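
Point 3 says that an operator may return zero, one or several tuples for each incoming tuple. The sketch below illustrates that contract in plain Java, without any Liebre types: the hypothetical `process` method plays the role of a processTupleIn1 body, with integers standing in for tuples. The rule it applies (drop negatives, duplicate even values) is made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OperatorOutputSketch {

  // Hypothetical stand-in for a processTupleIn1 body (plain Java, no Liebre types):
  // negative values are dropped, odd values are forwarded once, even values twice.
  static List<Integer> process(int value) {
    if (value < 0) {
      return Collections.emptyList(); // no output tuple for this input
    }
    List<Integer> result = new ArrayList<>();
    result.add(value); // first output tuple
    if (value % 2 == 0) {
      result.add(value); // second output tuple, for even inputs only
    }
    return result;
  }

  public static void main(String[] args) {
    for (int v : new int[] {-1, 3, 4}) {
      System.out.println(v + " -> " + process(v));
    }
  }
}
```

Running `main` prints `-1 -> []`, `3 -> [3]` and `4 -> [4, 4]`, one case each for zero, one and two output tuples.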