Vincenzo Gulisano

Documentation for the Liebre project.


The basics

A streaming application runs a query, that is, a directed acyclic graph of sources, operators and sinks connected by streams:

  1. Sources produce tuples.
  2. Operators consume input tuples and produce output tuples.
  3. Sinks consume tuples.
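To make the three roles concrete, here is a minimal plain-Java sketch that does not use Liebre at all. It assumes a source can be modeled as a Supplier, an operator as a function from one tuple to a list of output tuples, and a sink as a Consumer. The class MiniPipeline and its method run are hypothetical names for illustration, and the sketch only handles a single linear chain, not a general DAG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of the three roles; this is NOT the Liebre API.
class MiniPipeline {
    // Pulls n tuples from the source, passes each one through the operator
    // (which may emit zero or more tuples) and hands every output to the sink.
    static <I, O> void run(Supplier<I> source,
                           Function<I, List<O>> operator,
                           Consumer<O> sink,
                           int n) {
        for (int i = 0; i < n; i++) {
            I in = source.get();                 // the source produces a tuple
            for (O out : operator.apply(in)) {   // the operator emits 0..k tuples
                sink.accept(out);                // the sink consumes each output
            }
        }
    }
}
```

For example, with a source that counts 1, 2, 3, the operator x -> List.of(x * 2) and seen::add as the sink, three iterations leave seen holding [2, 4, 6].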

When you create your query, you add sources, operators and sinks and connect them with streams.
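Streams are what connect these pieces. The sketch below is again plain Java and not Liebre's implementation: it assumes a stream behaves like a FIFO queue registered under a string id, and that registering it returns a typed key. The hypothetical classes Key and MiniStreams show why a typed key is useful: because Key<T> fixes T, the compiler rejects adding a tuple of the wrong class to a stream.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed stream key; NOT Liebre's StreamKey.
final class Key<T> {
    final String id;
    final Class<T> type;

    Key(String id, Class<T> type) {
        this.id = id;
        this.type = type;
    }
}

// Hypothetical registry of streams, each backed by a FIFO queue.
class MiniStreams {
    private final Map<String, Deque<Object>> queues = new HashMap<>();

    // Registers a stream under an id and returns the key used to reach it.
    // Assumption of this sketch: duplicate ids are rejected.
    <T> Key<T> addStream(String id, Class<T> type) {
        if (queues.putIfAbsent(id, new ArrayDeque<>()) != null) {
            throw new IllegalArgumentException("duplicate stream id: " + id);
        }
        return new Key<>(id, type);
    }

    // Appends a tuple; a Key<T> only accepts tuples of type T.
    <T> void add(Key<T> key, T tuple) {
        queues.get(key.id).addLast(tuple);
    }

    // Removes and returns the oldest tuple, or null if the stream is empty.
    <T> T poll(Key<T> key) {
        return key.type.cast(queues.get(key.id).pollFirst());
    }
}
```

With Key<String> k = s.addStream("in", String.class), a call such as s.add(k, 42) does not compile, which is the kind of mistake a typed key catches early.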

A first example (complete example here)

In this example, a source creates a stream of tuples with attributes <timestamp,key,value> and feeds them to an operator that multiplies the value by 2. This operator feeds its output tuples to a sink that prints them.

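import java.util.LinkedList;
import java.util.List;
import java.util.Random;
// Liebre imports (Query, StreamKey, Tuple, BaseSource, BaseOperator,
// BaseSink, Util) are omitted here; see the complete example.

// Tuple type with attributes <timestamp,key,value>.
class MyTuple implements Tuple {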
	public long timestamp;
	public int key;
	public int value;

	public MyTuple(long timestamp, int key, int value) {
		this.timestamp = timestamp;
		this.key = key;
		this.value = value;
	}
}

Query q = new Query();

StreamKey<MyTuple> inKey = q.addStream("in", MyTuple.class);
StreamKey<MyTuple> outKey = q.addStream("out", MyTuple.class);

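// Source: sleeps 100 ms, then emits a tuple with the current time,
// a random key in [0, 5) and a random value in [0, 100).
q.addSource("inSource", new BaseSource<MyTuple>() {
	Random r = new Random();

	@Override
	protected MyTuple getNextTuple() {
		Util.sleep(100);
		return new MyTuple(System.currentTimeMillis(), r.nextInt(5),
			r.nextInt(100));
	}
}, inKey);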

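// Operator: reads from inKey and writes to outKey, emitting one tuple
// per input tuple with the value doubled.
q.addOperator("multiply", new BaseOperator<MyTuple, MyTuple>() {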
	@Override
	protected List<MyTuple> processTuple(MyTuple tuple) {
		List<MyTuple> result = new LinkedList<MyTuple>();
		result.add(new MyTuple(tuple.timestamp, tuple.key,
			tuple.value * 2));
		return result;
	}
}, inKey, outKey);

// Sink: reads from outKey and prints each tuple as timestamp,key,value.
q.addSink("outSink", new BaseSink<MyTuple>() {
	@Override
	protected void processTuple(MyTuple tuple) {
		System.out.println(tuple.timestamp + "," + tuple.key + ","
			+ tuple.value);
	}
}, outKey);

// Start the query, let it run for 30 seconds, then stop it.
q.activate();
Util.sleep(30000);
q.deActivate();

Please note:

  1. You start by defining a class for your tuples that implements the Tuple interface.
  2. You add streams to your query with the method addStream, providing an id for the stream and the class of the tuples that will be added to and read from it. This method returns a key that you use later when adding sources, operators and sinks.
  3. You can add a source to your query using the method addSource. For this method, you provide:
    • an id for the source.
    • an instance of BaseSource, for which you implement the method getNextTuple.
    • the key of the stream to which the tuples produced by the source will be added.
  4. You can add an operator to your query using the method addOperator. For this method, you provide:
    • an id for the operator.
    • an instance of BaseOperator, for which you implement the method processTuple. Since processTuple returns a list, it can emit more than one tuple for each incoming tuple; if it should emit no tuple, it can return an empty list.
    • the key of the stream from which tuples are read by the operator.
    • the key of the stream to which tuples are added by the operator.
  5. You can add a sink to your query using the method addSink. For this method, you provide:
    • an id for the sink.
    • an instance of BaseSink, for which you implement the method processTuple.
    • the key of the stream from which tuples are read by the sink.
  6. You start and stop the query with the methods activate and deActivate.
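Because processTuple returns a list, the same operator shape covers filters (zero or one output per input) and one-to-many transformations. The sketch below shows both as plain static methods over SimpleTuple, a hypothetical stand-in for MyTuple without Liebre's Tuple interface; OperatorBodies, keepEven and fanOut are also hypothetical names. The body of keepEven, for instance, could be placed inside a BaseOperator's processTuple unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MyTuple, without Liebre's Tuple interface.
class SimpleTuple {
    final long timestamp;
    final int key;
    final int value;

    SimpleTuple(long timestamp, int key, int value) {
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

class OperatorBodies {
    // Filter: forwards tuples with an even value; an empty list drops the tuple.
    static List<SimpleTuple> keepEven(SimpleTuple t) {
        List<SimpleTuple> result = new ArrayList<>();
        if (t.value % 2 == 0) {
            result.add(t);
        }
        return result;
    }

    // One-to-many: emits one copy of the tuple per key in [0, copies).
    static List<SimpleTuple> fanOut(SimpleTuple t, int copies) {
        List<SimpleTuple> result = new ArrayList<>();
        for (int k = 0; k < copies; k++) {
            result.add(new SimpleTuple(t.timestamp, k, t.value));
        }
        return result;
    }
}
```

Returning a fresh mutable list in both cases mirrors the LinkedList used in the multiply operator above.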