There will be times when we need to combine multiple streams to obtain the insights we need. Renoir provides a set of operators for this purpose.

Join

The most common operation is to join two streams based on a key. The key is evaluated using a closure (one for each stream), and two elements are joined together if their keys are equal.

This operator is similar to the SQL query SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b).

// s1 is keyed by n % 5 (cast to i32 so both key types match); s2 is keyed by n % 2.
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
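To make the matching rule concrete, the following plain-Rust sketch models the join above on ordinary vectors. It does not use Renoir: `inner_join_model` and `join_example` are hypothetical names introduced here, and the nested loop only illustrates which pairs match, not how Renoir executes the join or in which order it emits results.

```rust
/// Plain-Rust model of an inner join (NOT Renoir's implementation):
/// pair every left element with every right element whose key is equal.
fn inner_join_model<A, B, K>(
    left: &[A],
    right: &[B],
    keyer1: impl Fn(&A) -> K,
    keyer2: impl Fn(&B) -> K,
) -> Vec<(A, B)>
where
    A: Clone,
    B: Clone,
    K: PartialEq,
{
    let mut out = Vec::new();
    for a in left {
        for b in right {
            // Two elements are joined only when their keys are equal.
            if keyer1(a) == keyer2(b) {
                out.push((a.clone(), b.clone()));
            }
        }
    }
    out
}

/// Same inputs and keyers as the Renoir example above.
fn join_example() -> Vec<(u8, i32)> {
    let s1: Vec<u8> = (0..5u8).collect();
    let s2: Vec<i32> = (0..5i32).collect();
    inner_join_model(&s1, &s2, |n: &u8| (*n % 5) as i32, |n: &i32| *n % 2)
}
```

With these inputs the model yields the pairs (0, 0), (0, 2), (0, 4), (1, 1) and (1, 3): the left keys range from 0 to 4 while the right keys are only 0 and 1, so the left elements 2, 3 and 4 find no match.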

Renoir also offers three other variants of the join operator:

  • left_join keeps all the elements of the left stream, paired with the elements of the right stream that have a matching key.
  • outer_join keeps all the elements of both streams, pairing those that have a matching key.
  • interval_join joins two streams based on a time interval. An element of the right stream is joined with an element of the left stream if its timestamp falls inside an interval centered on the timestamp of the left element.
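The difference between these variants is easiest to see on small inputs. The sketch below models them in plain Rust, without Renoir. All function names are hypothetical. Representing a missing side with `Option`, using a single `half_width` parameter and treating the interval bounds as inclusive are assumptions of this sketch; refer to the API documentation for the exact signatures and output types.

```rust
/// Left join model: every left element appears at least once;
/// the right side is `None` when no key matches.
fn left_join_model<A: Clone, B: Clone, K: PartialEq>(
    left: &[A],
    right: &[B],
    keyer1: impl Fn(&A) -> K,
    keyer2: impl Fn(&B) -> K,
) -> Vec<(A, Option<B>)> {
    let mut out = Vec::new();
    for a in left {
        let mut matched = false;
        for b in right {
            if keyer1(a) == keyer2(b) {
                out.push((a.clone(), Some(b.clone())));
                matched = true;
            }
        }
        if !matched {
            out.push((a.clone(), None));
        }
    }
    out
}

/// Outer join model: the left join plus every right element that matched nothing.
fn outer_join_model<A: Clone, B: Clone, K: PartialEq>(
    left: &[A],
    right: &[B],
    keyer1: impl Fn(&A) -> K,
    keyer2: impl Fn(&B) -> K,
) -> Vec<(Option<A>, Option<B>)> {
    let mut out: Vec<(Option<A>, Option<B>)> =
        left_join_model(left, right, &keyer1, &keyer2)
            .into_iter()
            .map(|(a, b)| (Some(a), b))
            .collect();
    for b in right {
        if !left.iter().any(|a| keyer1(a) == keyer2(b)) {
            out.push((None, Some(b.clone())));
        }
    }
    out
}

/// Interval join model on (timestamp, value) pairs: a right element joins a left
/// element when its timestamp is within `half_width` of the left timestamp
/// (inclusive bounds are an assumption of this sketch).
fn interval_join_model(
    left: &[(i64, char)],
    right: &[(i64, u32)],
    half_width: i64,
) -> Vec<(char, u32)> {
    let mut out = Vec::new();
    for &(lt, l) in left {
        for &(rt, r) in right {
            if (rt - lt).abs() <= half_width {
                out.push((l, r));
            }
        }
    }
    out
}
```

For example, joining the left input 1, 2, 3 with the right input 2, 3, 4 on the identity key, the left join model keeps 1 with no partner, while the outer join model additionally keeps 4 with no partner.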

The join operator is also available for KeyedStream; in that case the key is the one used to partition the stream.

A more experienced user may want to use the join_with operator, which allows customizing the behaviour of the join: you can select which ship strategy and which local strategy to use. A complete list of the possible join conditions can be found in the join_with API documentation.

Merge

The merge operator takes two streams and returns a new stream that contains all the elements of both. No ordering of the output elements is guaranteed.

let s1 = env.stream_iter(0..10);
let s2 = env.stream_iter(10..20);
let res = s1.merge(s2).collect_vec();
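Because the merged elements are not ordered, code that checks the result should compare contents rather than positions. The following plain-Rust sketch illustrates the idea without Renoir: `merge_model` and `same_elements` are hypothetical helpers, and the model happens to concatenate its inputs, an order the real operator does not promise.

```rust
/// Merge model (NOT Renoir's implementation): all elements of both inputs.
/// The concatenation order here is incidental and must not be relied upon.
fn merge_model<T>(a: Vec<T>, b: Vec<T>) -> Vec<T> {
    a.into_iter().chain(b).collect()
}

/// Order-insensitive comparison: sort a copy of each side before comparing.
fn same_elements<T: Ord + Clone>(x: &[T], y: &[T]) -> bool {
    let (mut x, mut y) = (x.to_vec(), y.to_vec());
    x.sort();
    y.sort();
    x == y
}
```

Merging 0..10 with 10..20 this way produces 20 elements that `same_elements` reports as equal to 0..20, whatever the order in which they arrive.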

Zip

The zip operator pairs up the elements of two streams. It takes two streams and returns a new stream that contains the elements of the two streams zipped together. The elements are ordered in the same way they are produced by the two streams.

let s1 = env.stream_iter(vec!['A', 'B', 'C', 'D'].into_iter());
let s2 = env.stream_iter(vec![1, 2, 3].into_iter());
let res = s1.zip(s2).collect_vec();

All the elements after the end of the shorter stream will be discarded.
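The truncation rule matches the behaviour of the standard library's `Iterator::zip`, so it can be illustrated in plain Rust without Renoir. In the sketch below, `zip_model` and `zip_example` are hypothetical names, and the inputs are the ones from the example above.

```rust
/// Zip model built on the standard library adaptor (NOT Renoir's implementation):
/// the i-th element of `a` is paired with the i-th element of `b`.
fn zip_model<A, B>(a: Vec<A>, b: Vec<B>) -> Vec<(A, B)> {
    // `Iterator::zip` stops as soon as either input is exhausted.
    a.into_iter().zip(b).collect()
}

/// Same inputs as the example above: four letters, three numbers.
fn zip_example() -> Vec<(char, i32)> {
    zip_model(vec!['A', 'B', 'C', 'D'], vec![1, 2, 3])
}
```

Here the model returns ('A', 1), ('B', 2) and ('C', 3); the element 'D' has no partner and is dropped.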