This is a quick start guide intended for people with some familiarity with the Rust programming language. It is intended as a jump start to get you writing Renoir programs quickly, but it is not complete, so refer to the rest of the documentation for more details.

The fastest introduction to Renoir is to start by thinking of it as smart iterators (you may have seen a similar approach with Rayon)

With Iterator you have a sequence of operators and you apply transformations such as map() or filter() to transform the sequence and eventually you will either collect the result to a collection or perform some kind of operation that consumes the iterator, like sum().

Renoir’s Stream work the same way, you can think of streams in a similar way to iterators, they allow you to start from a Source that generates a sequence of items, transform them using Operators and collect them or consume the stream using a Sink.

The key difference is that Renoir’s stream are optimized for parallel and distributed computations and can be seamlessly executed on one or multiple connected machines.

Context

Due to the distributed nature of Renoir, we need to do a couple of things before we get started. (We start with an example with a local deployment and show how to easily pass to a distributed deployment later)

// We impor the core components of renoir
use renoir::prelude::*;

fn main() {
 let ctx = StreamContext::new_local();
 // ... Streams and operators
 ctx.execute_blocking();
 // ...
}

Every Renoir Stream lives within a StreamContext. The context can contain multiple streams and operators and is the object that rules the execution of all the streams within it.

  1. A StreamContext is created
  2. One or more Streams are defined within the context
  3. The StreamContext is executed resulting in the execution of all streams within it

By default Renoir provides an execute_blocking() method that starts all the streams and operators and waits until all have finished. It is possible to run the execution in the background by running the StreamContext::execute_blocking() method in another thread

std::thread::spawn(move || ctx.execute_blocking());

Or it is also possible to use the asynchronous StreamContext::execute() method if the tokio feature is enabled. Note: for performance reasons, only some parts of the system are executed on the asynchronous scheduler when the feature is enabled, while most operators run on separate threads.

From Iterators to Streams

Now we will see how easy it is to move from an iterator to a stream.

We want to create a stream that takes a range of numbers from 0 to 200, filters the numbers that are divisible by 3 or 5, multiplies them by 2 and collects the result into a vector.

// With iterators
fn main() {
 let input = 0..200;
 let output = input
  .filter(|x| x % 3 == 0 || x % 5 == 0)
  .map(|x| x * 2)
  .collect::<Vec<_>>();
 println!("{output:?}");
}

With Renoir, we just need to create a context, create a stream from the iterator and apply the same operators.

// With renoir
use renoir::prelude::*;
fn main() {
 let ctx = StreamContext::new_local();
 let input = 0..200;

 // We are streaming the iterator from our machine
 let output = ctx.stream_iter(input)
  .filter(|x| x % 3 == 0 || x % 5 == 0)
  .map(|x| x * 2)
  .collect_vec();
  // We collect the output back to our machine
 ctx.execute_blocking();

 // Since this same streams could be executed in a distributed deployment,
 // we need to make sure that this node is the one that collected the output.
 if let Some(output) = output.get() {
  println!("{output:?}");
 }
}

With Renoir, we can easily move from a single-threaded iterator to a parallel and distributed stream by just changing a few lines and cutting down the execution time to a fraction of the original.

Distributing the data

In the previous example, we used a single node deployment (StreamContext::new_local()) and we used the IteratorSource, which takes as input an iterator from the first node in the deployment and feeds its elements into a stream.

What if we wanted to run this in parallel?

We have multiple options:

  • Partition and distribute the data after the source randomly
  • Partition and distribute the data after the source according to a grouping logic
  • Use a parallel source

Shuffling items

let output = ctx.stream_iter(input)
 .shuffle()
 .filter(|x| x % 3 == 0 || x % 5 == 0)
 .map(|x| x * 2)
 .collect_vec();

By adding a shuffle operator after our source, elements will be distributed uniformly between all the available replicas for the next operator. (As we are still in a local deployment, by default operators that have no limits on replication will be replicated a number of times equal to the available cores in the system)

Grouping items

One of the most versatile operators in Renoir’s toolkit is the group_by operator and its derivatives. This operator allows you to define a Key for each element, elements with the same Key belong to the same group. When items are grouped, the groups are divided between replicas according to the Hash of the Key.

After applying a grouping operator, the Stream will become a KeyedStream that allows to interact with the stream using the grouping information

let output = ctx.stream_iter(input)
 .group_by(|x| x / 10)
 .filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
 .map(|(_key, x)| x * 2)
 .collect_vec();
// Note: the output of this example is different from the previous

Parallel Source

Another way to distribute the data is to use a parallel source. Using this source Renoir will create a numeber of replicas of the source and execute them in parallel.

In the following example, we distribute the range of numbers between the different cores making the whole computation parallel.

let n = 200;
ctx.stream_par_iter(
     move |id, instances| {
         let chunk_size = (n + instances - 1) / instances;
        let remaining = n - n.min(chunk_size * id);
        let range = remaining.min(chunk_size);

        let start = id * chunk_size;
        let stop = id * chunk_size + range;
        start..stop
    })
    .group_by(|x| x / 10)
 .filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
 .map(|(_key, x)| x * 2)
 .collect_vec();