Renoir
REactive Network of Operators In Rust
Renoir (short: Noir) [/ʁənwaʁ/, /nwaʁ/] is a distributed data processing platform based on the dataflow paradigm that provides an ergonomic programming interface, similar to that of Apache Flink, but with much better performance characteristics.
Renoir converts each job into a dataflow graph of operators and groups them into blocks. Blocks contain a sequence of operators which process the data sequentially without repartitioning it. They are the deployment unit used by the system and can be distributed and executed on multiple systems.
The common layout of a Renoir program starts with the creation of a StreamContext, then one or more Sources are initialised, creating a Stream. The graph of operators is composed using the methods of the Stream object, which follow a similar approach to Rust’s Iterator trait, allowing you to ergonomically define a processing workflow through method chaining.
Examples
Wordcount
use renoir::prelude::*;
fn main() {
// Convenience method to parse deployment config from CLI arguments
let (config, args) = RuntimeConfig::from_args();
config.spawn_remote_workers();
let env = StreamContext::new(config);
let result = env
// Open and read file line by line in parallel
.stream_file(&args[1])
// Split into words
.flat_map(|line| tokenize(&line))
// Partition
.group_by(|word| word.clone())
// Count occurrences
.fold(0, |count, _word| *count += 1)
// Collect result
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
if let Some(result) = result.get() {
// Print word counts
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
// Simple tokenisation strategy
s.split_whitespace().map(str::to_lowercase).collect()
}
// Execute locally with 6 workers: `cargo run -- -l 6 input.txt`
Wordcount associative (faster)
use renoir::prelude::*;
fn main() {
// Convenience method to parse deployment config from CLI arguments
let (config, args) = RuntimeConfig::from_args();
let env = StreamContext::new(config);
let result = env
.stream_file(&args[1])
// Adaptive batching (default) has predictable latency.
// Fixed-size batching often leads to shorter execution times
// if data is immediately available and latency is not critical
.batch_mode(BatchMode::fixed(1024))
.flat_map(move |line| tokenize(&line))
.map(|word| (word, 1))
// Associative operators split the operation in a local and a
// global step for faster execution
.group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
.unkey()
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
if let Some(result) = result.get() {
// Print word counts
result.into_iter().for_each(|(_, (word, count))| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
s.split_whitespace().map(str::to_lowercase).collect()
}
// Execute on multiple hosts `cargo run -- -r config.toml input.txt`
Remote deployment
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
Refer to the examples directory for an extended set of working examples.
Installing and creating a Renoir project
The first requirement for building a Renoir project is the Rust toolchain.
Installing Rust
- Using Rustup (Recommended): follow the instructions at https://rustup.rs/
Linux (Recommended):
~$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Windows: Download and run the installer
- Using your package manager: alternatively, you can use the package provided by your distribution's repository
Add rust toolchain to PATH
In order to use all the tools of the Rust toolchain we need to add the “~/.cargo/bin/” folder to our PATH
- bash:
~$ echo 'export PATH=$PATH:~/.cargo/bin/' >> ~/.bashrc
- fish:
~$ set -xU fish_user_paths $fish_user_paths ~/.cargo/bin/
Create a new cargo project
After we successfully installed the Rust toolchain we are ready to create wonders with Renoir. To do that we are going to create a new project adding Renoir to its dependencies.
cargo new --bin renoir-test
cd renoir-test
# Add the renoir dependency to the `Cargo.toml`
# Currently we recommend using the GitHub repository directly
cargo add renoir --git https://github.com/deib-polimi/renoir
You can now open the project in your editor of choice and start writing your application using Renoir!
Bonus Tip: Development Environment
Installing the Jupyter Kernel (Optional)
Prerequisites
Install Rust following the install guide
Evcxr
Rust programs can also be executed in an interactive environment. Evcxr is an evaluation context for Rust that provides a REPL (analogous to IPython) and a Jupyter kernel.
Installing the Jupyter Kernel
The steps to install the jupyter kernel are the following:
- Install the evcxr_jupyter binary: cargo install --locked evcxr_jupyter
- Install the kernel: evcxr_jupyter --install (Note: ensure that $HOME/.cargo/bin is in your PATH variable)
Using the Jupyter Kernel in Visual Studio Code
- Install the jupyter package in your Python environment
- Install the Jupyter extension for VS Code
- Create a new .ipynb file and open it
- Select the Jupyter Kernel:
  - Click on the Select Kernel button in the top right
  - Choose Jupyter Kernel…
  - Choose the Rust evcxr kernel you installed earlier
Importing dependencies
Now that you have selected the kernel you can start writing code and executing it. To import dependencies you can use the :dep keyword.¹
:dep renoir = { git = "https://github.com/deib-polimi/renoir" }
Now, with a use statement, we can import what we need from renoir.
#![allow(unused)]
fn main() {
use renoir::prelude::*;
}
Recommended prelude
The evcxr kernel can be tuned using special keywords according to your needs. We list some of the most useful tweaks you can make (these can be put in a cell at the beginning of the notebook):
- :cache SIZE: set the cache size in MiB (use for faster compilation)
- :opt LEVEL: set the optimization level; the default is no optimization (for faster execution use 1, 2 or 3)
Example
#![allow(unused)]
fn main() {
:cache 500
:opt 1
:dep renoir = { git = "https://github.com/deib-polimi/renoir" }
use renoir::prelude::*;
}
#![allow(unused)]
fn main() {
let ctx = StreamContext::new_local();
let result = ctx.stream_par_iter(0..100)
.map(|x| (x, x * x))
.collect_vec();
ctx.execute_blocking();
let output = result.get()
}
#![allow(unused)]
fn main() {
// The :vars keyword will print the variables you have set (Note: Rust lifetime rules still apply!)
:vars
}
| Variable | Type |
| --- | --- |
| output | Option<Vec<(i32, i32)>> |
#![allow(unused)]
fn main() {
println!("{output:?}");
}
Some([(0, 0), (1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, …
¹ It follows the same syntax as Cargo.toml.
From Iterators to Streams
This is a quick start guide intended for people with some familiarity with the Rust programming language. It is intended as a jump start to get you writing Renoir programs quickly, but it is not complete, so refer to the rest of the documentation for more details.
The fastest introduction to Renoir is to start by thinking of it as smart iterators (you may have seen a similar approach with Rayon).
With Iterator you have a sequence of elements and you apply transformations such as map() or filter() to transform the sequence; eventually you either collect the result into a collection or perform some operation that consumes the iterator, like sum().
Renoir’s Streams work the same way: you can think of streams much like iterators. They allow you to start from a Source that generates a sequence of items, transform them using Operators, and collect them or consume the stream using a Sink.
The key difference is that Renoir’s streams are optimized for parallel and distributed computation and can be seamlessly executed on one or multiple connected machines.
Context
Due to the distributed nature of Renoir, we need to do a couple of things before we get started. (We start with an example with a local deployment and show how to easily pass to a distributed deployment later)
// We import the core components of renoir
use renoir::prelude::*;
fn main() {
let ctx = StreamContext::new_local();
// ... Streams and operators
ctx.execute_blocking();
// ...
}
Every Renoir Stream lives within a StreamContext. The context can contain multiple streams and operators and is the object that rules the execution of all the streams within it.
- A StreamContext is created
- One or more Streams are defined within the context
- The StreamContext is executed, resulting in the execution of all streams within it
By default Renoir provides an execute_blocking() method that starts all the streams and operators and waits until all of them have finished. It is possible to run the execution in the background by calling StreamContext::execute_blocking() in another thread:
#![allow(unused)]
fn main() {
std::thread::spawn(move || ctx.execute_blocking());
}
Alternatively, the asynchronous StreamContext::execute() method can be used if the tokio feature is enabled. Note: for performance reasons, only some parts of the system are executed on the asynchronous scheduler when the feature is enabled, while most operators run on separate threads.
From Iterators to Streams
Now we will see how easy it is to move from an iterator to a stream.
We want to create a stream that takes a range of numbers from 0 to 200, filters the numbers that are divisible by 3 or 5, multiplies them by 2 and collects the result into a vector.
// With iterators
fn main() {
let input = 0..200;
let output = input
.filter(|x| x % 3 == 0 || x % 5 == 0)
.map(|x| x * 2)
.collect::<Vec<_>>();
println!("{output:?}");
}
With Renoir, we just need to create a context, create a stream from the iterator and apply the same operators.
// With renoir
use renoir::prelude::*;
fn main() {
let ctx = StreamContext::new_local();
let input = 0..200;
// We are streaming the iterator from our machine
let output = ctx.stream_iter(input)
.filter(|x| x % 3 == 0 || x % 5 == 0)
.map(|x| x * 2)
.collect_vec();
// We collect the output back to our machine
ctx.execute_blocking();
// Since this same stream could be executed in a distributed deployment,
// we need to make sure that this node is the one that collected the output.
if let Some(output) = output.get() {
println!("{output:?}");
}
}
With Renoir, we can easily move from a single-threaded iterator to a parallel and distributed stream by just changing a few lines and cutting down the execution time to a fraction of the original.
Distributing the data
In the previous example, we used a single node deployment (StreamContext::new_local()) and we used the IteratorSource, which takes as input an iterator from the first node in the deployment and feeds its elements into a stream.
What if we wanted to run this in parallel?
We have multiple options:
- Partition and distribute the data after the source randomly
- Partition and distribute the data after the source according to a grouping logic
- Use a parallel source
Shuffling items
#![allow(unused)]
fn main() {
let output = ctx.stream_iter(input)
.shuffle()
.filter(|x| x % 3 == 0 || x % 5 == 0)
.map(|x| x * 2)
.collect_vec();
}
By adding a shuffle operator after our source, elements will be distributed uniformly among all the available replicas of the next operator. (As we are still in a local deployment, operators that have no limits on replication will by default be replicated as many times as there are available cores in the system)
Grouping items
One of the most versatile operators in Renoir’s toolkit is the group_by operator and its derivatives. This operator allows you to define a Key for each element; elements with the same Key belong to the same group. When items are grouped, the groups are divided between replicas according to the Hash of the Key.
After applying a grouping operator, the Stream becomes a KeyedStream, which allows you to interact with the stream using the grouping information.
#![allow(unused)]
fn main() {
let output = ctx.stream_iter(input)
.group_by(|x| x / 10)
.filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
.map(|(_key, x)| x * 2)
.collect_vec();
// Note: the output of this example is different from the previous
}
Parallel Source
Another way to distribute the data is to use a parallel source. Using this source, Renoir will create a number of replicas of the source and execute them in parallel.
In the following example, we distribute the range of numbers between the different cores making the whole computation parallel.
#![allow(unused)]
fn main() {
let n = 200;
ctx.stream_par_iter(
move |id, instances| {
let chunk_size = (n + instances - 1) / instances;
let remaining = n - n.min(chunk_size * id);
let range = remaining.min(chunk_size);
let start = id * chunk_size;
let stop = id * chunk_size + range;
start..stop
})
.group_by(|x| x / 10)
.filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
.map(|(_key, x)| x * 2)
.collect_vec();
}
Going Parallel
In this section we will see how easy it is to run a Renoir computation in parallel and distribute it across multiple machines.
The information about the environment in which the computation will run is stored in a StreamContext. The context can contain multiple streams and operators and is the object that rules the execution of all the streams within it.
Local Deployment
To run a computation on a single machine, you can create a StreamContext with the new_local() method. This method creates a context that will run the stream using all the available resources of the machine.
#![allow(unused)]
fn main() {
let ctx = StreamContext::new_local();
// ... Streams and operators
}
If you want to specify the number of threads to use, you can easily create a custom RuntimeConfig using the local(..) method.
#![allow(unused)]
fn main() {
let config = RuntimeConfig::local(4).unwrap();
let ctx = StreamContext::new(config);
// ... Streams and operators
}
Distributed Deployment
To run a computation on multiple machines, you can create a StreamContext with the remote(..) method. This method takes as argument the path to a TOML configuration file that contains the information about the cluster.
#![allow(unused)]
fn main() {
let config = RuntimeConfig::remote("path/to/config.toml").unwrap();
config.spawn_remote_workers();
let ctx = StreamContext::new(config);
// ... Streams and operators
If you want to use a distributed environment, you have to spawn the remote workers using spawn_remote_workers before creating any stream.
The configuration file should contain the information necessary to connect to the various machines in the cluster, for example:
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
And just like that your pipeline will be automatically distributed across both machines.
Available options for the RuntimeConfig are:
- address: a string with the address of the machine
- base_port: starting port for the communication between operators on different machines
- num_cores: number of cores available on the machine
- ssh: object to store the SSH connection information
  - ssh_port: port to connect to the machine
  - username: username to connect to the machine
  - password: password to connect to the machine
  - key_file: path to the private key file
  - key_passphrase: passphrase for the private key file
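As a sketch, a host entry exercising the SSH sub-options listed above might look like the following (the hostname, username, and paths are placeholders, and in practice you would use either a password or a key file, not both):

```toml
[[host]]
address = "host3.lan"   # address of the machine
base_port = 9500        # starting port for inter-operator communication
num_cores = 8           # cores available on the machine
# hypothetical values illustrating every ssh option
ssh = { ssh_port = 22, username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519", key_passphrase = "changeme" }
```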
Context from Arguments
To decide the environment in which the computation will run each time, you can pass the context as an argument to the program.
#![allow(unused)]
fn main() {
let (config, args) = RuntimeConfig::from_args();
let ctx = StreamContext::new(config);
// ... Streams and operators
}
When you run the program, you can pass arguments specifying whether to run the computation locally or remotely.
cargo run -- --local num_of_threads
cargo run -- --remote path/to/config.toml
From Flink to Renoir
This guide assumes you have already set up an environment for Renoir and created a cargo project following the guide
This quick introduction follows a hands-on approach showing examples comparing the Flink API to the Renoir API
If you know Apache Flink, you will find it easy to start to use Renoir.
Just like in Flink, computations in Renoir are defined as a graph of operators, where data flows from one operator to another.
Getting started: Wordcount
As a first task we will implement a word counting application in both Flink and Renoir; the objective is to read a file and count the occurrences of all distinct words contained in it.
Flink
// Imports...
public class WordCount {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
long start = System.nanoTime();
DataSet<String> text = env.readTextFile("text-file.txt");
DataSet<Tuple2<String, Integer>> counts =
text.flatMap((value, collector) -> {
String[] tokens = value.split("\\s+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token.toLowerCase(), 1));
}
}
})
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
counts.count();
long stop = System.nanoTime();
System.out.printf("total:%d", stop - start);
counts.sort(Comparator.comparing((t) -> getCount(t)));
for (Tuple2<String, Integer> tuple : counts) {
System.out.printf("%s %d\n", tuple.f0, tuple.f1);
}
}
}
Renoir
use renoir::prelude::*;
use std::time::Instant;
fn main() {
let ctx = StreamContext::new_local();
let result = ctx
.stream_file("/etc/passwd")
.flat_map(|line| {
line.split_whitespace()
.map(|t| t.to_lowercase())
.collect::<Vec<String>>()
})
.group_by_count(|word: &String| word.clone())
.collect_vec();
let start = Instant::now();
ctx.execute_blocking();
let elapsed = start.elapsed();
if let Some(mut res) = result.get() {
res.sort_by_key(|t| t.1);
println!("{res:#?}");
}
println!("{elapsed:?}");
}
WIP: This guide is still incomplete
Sequential Transformations
Renoir offers a multitude of operators to transform and manipulate data streams. In this section, we will see how to use the basic operators to perform sequential transformations on a stream.
A sequential transformation is an operation that is applied to each element of the stream once in sequence. This allows us to obtain the maximum level of parallelism and to distribute the load evenly among the available resources.
Map
The map operator is used to apply a function to each element of the stream. The function can be any closure that takes an element of the stream as input and returns a new element.
#![allow(unused)]
fn main() {
// Multiply each element of the stream by 10
let res = s.map(|n| n * 10).collect_vec();
}
Since the map operator is applied to each element independently, it can exploit the full power of parallelism.
RichMap
The rich_map operator is similar to the map operator, but it allows keeping state across the elements of the stream. The closure passed to the rich_map operator can be stateful and maintain a state for each replica.
#![allow(unused)]
fn main() {
// Enumerate the elements of the stream
let res = s.rich_map({
let mut id = 0;
move |x| {
id += 1;
(id - 1, x)
}
}).collect_vec();
}
Note that the state is kept separately for each replica of the operator, so in this case, if we keep the parallelism, there will be multiple elements with the same ID (one for each replica).
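The per-replica caveat can be illustrated with a plain-Rust sketch (standard Rust, not the Renoir API): if two hypothetical replicas each run their own copy of the enumerating closure on their share of the data, both start counting from zero and emit duplicate IDs.

```rust
// Plain-Rust sketch (not the Renoir API): simulating how per-replica state
// in rich_map produces duplicate IDs when the operator is replicated.
fn main() {
    let data = vec!["a", "b", "c", "d"];
    // Pretend the stream has been split between two replicas.
    let (left, right) = data.split_at(2);

    // Each replica runs its own copy of the stateful closure,
    // so each one starts counting from zero.
    let enumerate_replica = |items: &[&str]| -> Vec<(usize, String)> {
        let mut id = 0;
        items
            .iter()
            .map(|x| {
                id += 1;
                (id - 1, x.to_string())
            })
            .collect()
    };

    let out1 = enumerate_replica(left);
    let out2 = enumerate_replica(right);

    // Both replicas emitted an element with ID 0: IDs are only unique per replica.
    assert_eq!(out1[0].0, 0);
    assert_eq!(out2[0].0, 0);
    println!("{out1:?} {out2:?}");
}
```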
Filter
The filter operator is used to keep only the elements of the stream that satisfy a certain condition. The function passed to the filter operator must return a boolean value.
#![allow(unused)]
fn main() {
// Keep only the even elements of the stream
let res = s.filter(|&n| n % 2 == 0).collect_vec();
}
Since the filter operator is applied to each element independently, it can exploit the full power of parallelism.
Flatten
The flatten operator is used to flatten a stream of collections of elements. It takes a stream of containers and returns a stream with all the contained elements.
#![allow(unused)]
fn main() {
let s = env.stream_iter((vec![
vec![1, 2, 3],
vec![],
vec![4, 5],
].into_iter()));
let res = s.flatten().collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]);
}
Concatenation of Sequential Operators
To help the user write clean and readable code, Renoir offers a series of concatenations of the previous operators in a single call. This allows writing complex transformations in a single line of code: from simple concatenations like flat_map, where the result of a map is flattened, to more complex ones like rich_filter_map, where the user can perform a stateful map and filter in a single operation.
For a complete list of operators see the API documentation.
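Since Renoir streams mirror Rust's iterators, the fused operators behave like compositions of the basic ones. A plain-Rust sketch of the equivalence (standard Iterator combinators, not the Renoir API):

```rust
// Plain-Rust sketch: fused combinators are compositions of simpler ones.
fn main() {
    let words = vec!["hello world", "foo"];

    // flat_map is equivalent to a map followed by a flatten.
    let a: Vec<&str> = words.iter().flat_map(|s| s.split(' ')).collect();
    let b: Vec<&str> = words.iter().map(|s| s.split(' ')).flatten().collect();
    assert_eq!(a, b);
    assert_eq!(a, vec!["hello", "world", "foo"]);

    // filter_map fuses a map producing Option with a filter keeping Some.
    let c: Vec<usize> = words
        .iter()
        .filter_map(|s| if s.len() > 3 { Some(s.len()) } else { None })
        .collect();
    assert_eq!(c, vec![11]);
}
```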
Most of the time elements in a stream need to be grouped or partitioned in some way to retrieve the desired result. Renoir provides a set of operators that do exactly that.
Grouping
Group By
Probably the most common operation is to group together elements by some common key. The key can be extracted from the element itself or computed from it. The group_by operator takes a closure that returns the key for each element and returns a KeyedStream where the operators are evaluated over elements with the same key.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
// partition even and odd elements
let keyed = s.group_by(|&n| n % 2);
After the partitioning, all the elements will be sent over the network to balance the load. However, if the desired result is an aggregation, in many cases it is advisable to use an associative variant of the group_by operator, like group_by_reduce or group_by_sum; a complete list of the associative variants can be found in the Group By API documentation.
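The intuition behind the associative variants can be sketched in plain Rust (standard HashMap, not the Renoir API): each replica pre-aggregates its share locally, so only the small partial results cross the network before being merged globally.

```rust
// Plain-Rust sketch of local pre-aggregation followed by a global merge,
// the pattern that makes associative group_by variants faster.
use std::collections::HashMap;

fn local_count(words: &[&str]) -> HashMap<String, u64> {
    let mut m = HashMap::new();
    for w in words {
        *m.entry(w.to_string()).or_insert(0) += 1;
    }
    m
}

fn main() {
    // Pretend the stream is split between two replicas.
    let replica1 = local_count(&["a", "b", "a"]);
    let replica2 = local_count(&["b", "c"]);

    // Global step: merge the small partial maps.
    let mut total = replica1;
    for (k, v) in replica2 {
        *total.entry(k).or_insert(0) += v;
    }

    assert_eq!(total["a"], 2);
    assert_eq!(total["b"], 2);
    assert_eq!(total["c"], 1);
}
```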
Key By
ADVANCED OPERATOR
Creates a new KeyedStream in a similar way to the group_by operator, but without shuffling the elements over the network. This can make other operators misbehave: you probably want to use group_by instead.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.key_by(|&n| n % 2).collect_vec();
}
Partitioning
Route
Sometimes there is the need to send elements to different routes based on some condition. The route operator allows you to create a series of routes and send each element to the correct route based on the first condition it meets.
- Routes are created with the add_route method; a new stream is created for each route.
- Each element is routed to the first stream for which the routing condition evaluates to true.
- If no route condition is satisfied, the element is dropped.
#![allow(unused)]
fn main() {
let mut routes = s.route()
.add_route(|&i| i < 5)
.add_route(|&i| i % 2 == 0)
.build()
.into_iter();
assert_eq!(routes.len(), 2);
// 0 1 2 3 4
routes.next().unwrap().for_each(|i| eprintln!("route1: {i}"));
// 6 8
routes.next().unwrap().for_each(|i| eprintln!("route2: {i}"));
// 5 7 9 ignored
env.execute_blocking();
}
Split
Create multiple streams from a single stream where each split will have all the elements of the original stream. The split operator is useful when you need to apply different transformations to the same stream.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let mut splits = s.split(3);
let a = splits.pop().unwrap();
let b = splits.pop().unwrap();
let c = splits.pop().unwrap();
}
Reductions and Folds
To obtain insights from a data stream it is often necessary to aggregate the data in some way. Renoir provides a set of operators that allow you to perform reductions and folds over the data stream to obtain the desired information.
Reduce
The reduce operator aggregates the data of a stream following a user-defined function and emits a single value. The function should modify the accumulator, which at the end will be the emitted value.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.reduce(|acc, value| acc + value).collect::<Vec<_>>();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}
Note that the type of the accumulator is the same as the type of the stream elements. If a different type is needed, consider using fold.
Reduce Associative
The reduce_assoc operator is a variant of the reduce operator that can be used when the reduction function is associative. This allows the operator to be executed in parallel and can be more efficient than the reduce operator.
The operator applies the reducing function in two steps:
- Local: the function that will be executed on each replica.
- Global: the function that will aggregate all the partial results obtained from the local functions.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.reduce_assoc(|acc, value| acc + value).collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}
Note that the type of the accumulator is the same as the type of the stream elements. If a different type is needed, consider using fold_assoc.
Fold
The fold operator aggregates the data of a stream following a user-defined function and emits a single value. The function should modify the accumulator, which at the end will be the emitted value. It is similar to the reduce operator, but it allows specifying an initial value and therefore the type of the accumulator.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.fold(0, |acc, value| *acc += value).collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}
Fold Associative
The fold_assoc operator is a variant of the fold operator that can be used when the reduction function is associative. Similar to reduce_assoc, this allows the operator to be executed in parallel and can be more efficient than the fold operator.
The operator requires two user defined functions:
- Local: the function that will be executed on each replica.
- Global: the function that will aggregate all the partial results obtained from the local functions.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}
Windows
When working with unbounded data streams, it is often necessary to consider only a subset of the data to perform some computation. Renoir provides a set of operators that allow you to define windows over the data stream and perform the computation only on the data that is part of the window. In Renoir, windows are defined by a descriptor (WinDescr) that lets the user specify the type of window and the logic used to group the data into the right window.
Windows Descriptors
There are several types of descriptors that can be used:
- CountWindow: defines a window based on the number of elements in the window. It can be defined as a tumbling or sliding window.
- EventTimeWindow: defines a window based on the event timestamps of the data. It can be defined as a tumbling or sliding window.
- ProcessingTimeWindow: defines a window based on the wall clock at the time of processing. It can be defined as a tumbling or sliding window.
- SessionWindow: defines a window that closes if no element is received for a fixed wall-clock duration.
- TransactionWindow: defines a window based on a user-defined logic. A complete analysis of this descriptor can be found in the TransactionWindow API Documentation.
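The tumbling versus sliding distinction for count windows can be sketched in plain Rust over an in-memory vector (standard library only, not the Renoir API): tumbling windows partition the stream into disjoint chunks, while sliding windows may overlap.

```rust
// Plain-Rust sketch of count-window semantics over a finite sequence.
fn main() {
    let data: Vec<i32> = (0..6).collect();

    // Tumbling window of size 2: disjoint chunks [0,1] [2,3] [4,5].
    let tumbling: Vec<Vec<i32>> = data.chunks(2).map(|w| w.to_vec()).collect();
    assert_eq!(tumbling, vec![vec![0, 1], vec![2, 3], vec![4, 5]]);

    // Sliding window of size 3 with step 2: overlapping [0,1,2] [2,3,4] [4,5].
    let sliding: Vec<Vec<i32>> = (0..data.len())
        .step_by(2)
        .map(|i| data[i..data.len().min(i + 3)].to_vec())
        .collect();
    assert_eq!(sliding, vec![vec![0, 1, 2], vec![2, 3, 4], vec![4, 5]]);
}
```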
Windows over a single stream
If the stream is NOT partitioned in any way, for example using the group_by or key_by operators, the window is defined over the whole stream. The operator that allows you to define a window over the stream is window_all. The window_all operator takes a window descriptor as an argument and returns a windowed stream that can be used to perform computation over the window.
let s = env.stream_iter(0..5usize);
let res = s
.window_all(CountWindow::tumbling(2))
// rest of the pipeline
Note that since the window is defined over the whole stream, this operator cannot be parallelized. If possible, partition the stream using the group_by or key_by operators to allow parallel execution.
Windows over a partitioned stream
If the stream is partitioned in some way, for example using the group_by or key_by operators, the window is defined over each partition. We can define our windows using the window operator with the descriptor that we want.
let s = env.stream_iter(0..9);
let res = s
.group_by(|&n| n % 2)
.window(CountWindow::sliding(3, 2))
// rest of the pipeline
If we want to create a window after joining two KeyedStreams over the same key, we can use the window_join operator.
let s1 = env.stream_iter(0..9);
let s2 = env.stream_iter(0..9);
let res = s1
.key_by(|&n| n % 2)
.window_join(s2.key_by(|&n| n % 2), CountWindow::tumbling(2))
// rest of the pipeline
Operators over windows
Once the window is defined, we can perform different operations to obtain the desired information. Some of the possible operations are the standard max, min, sum or count, but more complex operations are also available, like the fold operator.
For a complete list of the available operators check the API Documentation.
let s = env.stream_iter(0..5usize);
let res = s
.window_all(CountWindow::tumbling(2))
.fold(0, |acc, value| acc + value)
.collect_vec();
Multi-Stream Operators
There will be times when we need to merge multiple streams to obtain the insights we are after. Renoir provides a set of operators that allow you to merge multiple streams together.
Join
The most common operation is to join two streams together. The join operator allows you to join two streams based on a key. The key is computed by a closure (one for each stream), and two elements are joined together if their keys are equal.
This operator is similar to the SQL query SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b).
#![allow(unused)]
fn main() {
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
}
Renoir also offers other variants of the join operator:
- left_join: keeps all the elements of the left stream, matched with the elements of the right stream that have the same key.
- outer_join: keeps all the elements of both streams, matched where the keys are equal.
- interval_join: joins two streams based on a time interval; an element of the right stream is joined with an element of the left stream if its timestamp falls inside an interval centered on the timestamp of the left element.
The join operator is also available for KeyedStream; in that case the key is the one used to partition the stream.
A more experienced user may want to use the join_with operator, which allows customizing the behaviour of the join: you can select which ship strategy and which local strategy to use. A complete list of the possible join variants can be found in the join_with API documentation.
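The join semantics described above can be sketched in plain Rust over two small in-memory collections (standard library only, not the Renoir API); the key closures here simply read the first tuple field.

```rust
// Plain-Rust sketch of inner join vs left join over two keyed collections.
fn main() {
    let left = vec![(1, "a"), (2, "b")];
    let right = vec![(2, "x"), (3, "y")];

    // Inner join: only pairs whose keys match on both sides survive.
    let inner: Vec<(i32, (&str, &str))> = left
        .iter()
        .flat_map(|&(k, l)| {
            right
                .iter()
                .filter(move |&&(rk, _)| rk == k)
                .map(move |&(_, r)| (k, (l, r)))
        })
        .collect();
    assert_eq!(inner, vec![(2, ("b", "x"))]);

    // Left join: every left element survives; the right side becomes Option.
    let left_join: Vec<(i32, (&str, Option<&str>))> = left
        .iter()
        .map(|&(k, l)| {
            let m = right.iter().find(|&&(rk, _)| rk == k).map(|&(_, r)| r);
            (k, (l, m))
        })
        .collect();
    assert_eq!(left_join, vec![(1, ("a", None)), (2, ("b", Some("x")))]);
}
```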
Merge
The merge operator allows you to merge two streams together. The operator takes two streams and returns a new stream that contains all the elements of the two streams. The elements are not ordered in any way.
#![allow(unused)]
fn main() {
let s1 = env.stream_iter(0..10);
let s2 = env.stream_iter(10..20);
let res = s1.merge(s2).collect_vec();
}
Zip
The zip operator allows you to merge two streams together. The operator takes two streams and returns a new stream that contains the elements of the two streams zipped together. The elements are ordered in the same way they are produced by the two streams.
#![allow(unused)]
fn main() {
let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter()));
let s2 = env.stream_iter((vec![1, 2, 3].into_iter()));
let res = s1.zip(s2).collect_vec();
}
All the elements after the end of the shortest stream will be discarded.
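The truncation behaviour matches that of the standard library's Iterator::zip, which can serve as a quick mental model (plain Rust, not the Renoir API):

```rust
// Plain-Rust sketch: zip stops at the end of the shorter sequence.
fn main() {
    let letters = vec!['A', 'B', 'C', 'D'];
    let numbers = vec![1, 2, 3];

    let zipped: Vec<(char, i32)> = letters.into_iter().zip(numbers).collect();

    // 'D' has no partner, so it is discarded.
    assert_eq!(zipped, vec![('A', 1), ('B', 2), ('C', 3)]);
}
```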
Iterative Operators
Sometimes you may need to iterate over a stream multiple times to obtain the right insights or to perform a complex computation. Renoir provides a set of operators exactly for this purpose.
Iterate
This operator allows you to create an iterative workflow where the data cycles through the same set of operators multiple times. This operator has several features:
- Max Iterations: the maximum number of iterations to perform. If the maximum number of iterations is reached, the operator stops the iteration and outputs the current state of the data.
- Iteration State: the state that all the replicas of this operator can read. The state can be updated at the end of each iteration by the global_fold function.
- Body: the set of operators executed at each iteration. The output of the body is used as the input of the next iteration.
- Local Fold: the function executed by each replica to compute its contribution to the Iteration State; the results are aggregated by the global_fold function.
- Global Fold: the function that aggregates the results of the local_fold function and updates the Iteration State.
- Loop Condition: the condition evaluated at the end of each iteration to decide whether the iteration should continue.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..3).shuffle();
let (state, items) = s.iterate(
3, // at most 3 iterations
0, // the initial state is zero
|s, state| s.map(|n| n + 10),
|delta: &mut i32, n| *delta += n,
|state, delta| *state += delta,
|_state| true,
);
let state = state.collect_vec();
let items = items.collect_vec();
}
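To see what this call computes, here is a sequential plain-Rust simulation of the same parameters (an illustration of the semantics under the assumption that the folds run once per iteration, not Renoir code):

```rust
// Simulate iterate(3, 0, body, local_fold, global_fold, |_| true) sequentially.
fn iterate_sim(input: Vec<i32>, max_iter: usize, init_state: i32) -> (i32, Vec<i32>) {
    let mut items = input;
    let mut state = init_state;
    for _ in 0..max_iter {
        items = items.into_iter().map(|n| n + 10).collect(); // body: |s, _| s.map(|n| n + 10)
        let delta: i32 = items.iter().sum();                 // local fold: *delta += n
        state += delta;                                      // global fold: *state += delta
        // loop condition |_state| true: keep going until max_iter is reached
    }
    (state, items)
}

fn main() {
    let (state, items) = iterate_sim((0..3).collect(), 3, 0);
    assert_eq!(items, vec![30, 31, 32]);     // each element gained 10 per iteration
    assert_eq!(state, 33 + 63 + 93);         // the deltas of the three iterations
}
```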
If you want to cycle back the same initial input instead of the result of the previous iteration, you can use the replay operator. The replay operator is very similar to the iterate operator, i.e. it requires the same parameters, but it replays the input stream at each iteration.
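The difference from iterate can be seen by adapting the simulation above: with replay, the body is applied to the original input every time, so the items do not accumulate across iterations (again, a plain-Rust sketch of the semantics, not Renoir code):

```rust
// Simulate replay: each iteration starts again from the ORIGINAL input.
fn replay_sim(input: Vec<i32>, max_iter: usize, init_state: i32) -> i32 {
    let mut state = init_state;
    for _ in 0..max_iter {
        // body runs on the original input, not on the previous iteration's output
        let items: Vec<i32> = input.iter().map(|n| n + 10).collect();
        state += items.iter().sum::<i32>(); // local + global fold
    }
    state
}

fn main() {
    // Same parameters as the iterate example: every iteration contributes
    // the same delta (10 + 11 + 12 = 33), so the state ends at 99, not 189.
    assert_eq!(replay_sim((0..3).collect(), 3, 0), 99);
}
```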
Deployment
Work in progress…
Examples
Wordcount
use renoir::prelude::*;
fn main() {
// Convenience method to parse deployment config from CLI arguments
let (config, args) = RuntimeConfig::from_args();
config.spawn_remote_workers();
let env = StreamContext::new(config);
let result = env
// Open and read file line by line in parallel
.stream_file(&args[0])
// Split into words
.flat_map(|line| tokenize(&line))
// Partition
.group_by(|word| word.clone())
// Count occurrences
.fold(0, |count, _word| *count += 1)
// Collect result
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
if let Some(result) = result.get() {
// Print word counts
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
// Simple tokenisation strategy
s.split_whitespace().map(str::to_lowercase).collect()
}
// Execute on 6 local hosts `cargo run -- -l 6 input.txt`
Wordcount associative (faster)
use renoir::prelude::*;
fn main() {
// Convenience method to parse deployment config from CLI arguments
let (config, args) = RuntimeConfig::from_args();
let env = StreamContext::new(config);
let result = env
.stream_file(&args[0])
// Adaptive batching (default) has predictable latency
// Fixed size batching often leads to shorter execution times
// If data is immediately available and latency is not critical
.batch_mode(BatchMode::fixed(1024))
.flat_map(move |line| tokenize(&line))
.map(|word| (word, 1))
// Associative operators split the operation in a local and a
// global step for faster execution
.group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
.unkey()
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
if let Some(result) = result.get() {
// Print word counts
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
s.split_whitespace().map(str::to_lowercase).collect()
}
// Execute on multiple hosts `cargo run -- -r config.toml input.txt`
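The local/global split that makes the associative version faster can be sketched in plain Rust: each replica counts its own shard, and only the small per-word partial counts are exchanged and merged (an illustration of the idea, not Renoir code):

```rust
use std::collections::HashMap;

// Local step: each replica counts occurrences within its own shard.
fn local_count(words: &[&str]) -> HashMap<String, i32> {
    let mut m = HashMap::new();
    for w in words {
        *m.entry(w.to_string()).or_insert(0) += 1;
    }
    m
}

// Global step: merge the per-replica partial counts by key.
fn global_merge(parts: Vec<HashMap<String, i32>>) -> HashMap<String, i32> {
    let mut total = HashMap::new();
    for part in parts {
        for (w, c) in part {
            *total.entry(w).or_insert(0) += c;
        }
    }
    total
}

fn main() {
    let shard1 = local_count(&["a", "b", "a"]);
    let shard2 = local_count(&["b", "a"]);
    let counts = global_merge(vec![shard1, shard2]);
    assert_eq!(counts["a"], 3);
    assert_eq!(counts["b"], 2);
}
```

Because addition of counts is associative, the global step sees one entry per distinct word per replica instead of one record per word occurrence.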
Remote deployment
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
Refer to the examples directory for an extended set of working examples.
If you are already set up with a development environment you are comfortable with, we recommend sticking with what you find to be the most productive. Otherwise, if this is your first experience programming with Rust or you are looking for ideas to enhance your productivity, this page will show you how to set up an effective development environment for Rust and Renoir.
Setting up an environment using VS Code
First, I would recommend working on a UNIX-like system if possible1; if you are on Windows, you may try Windows Subsystem for Linux (WSL) for a better experience.
As for the IDE, I use VS Code for working with Rust. (Note: there is also a WSL extension if you are on Windows and want to work in WSL.)
The rust-analyzer extension will get you most of the way with what you could need to work with Rust; there is also an official guide on how to set up VS Code for Rust.
Other than that, I also recommend the Error Lens extension, which integrates well with rust-analyzer to show compiler errors and hints directly inline (enable the hint diagnostic level in the settings, CTRL+,, for more details), and the Even Better TOML and Crates extensions2 for working with the Cargo.toml.
Finally, for CLI tools: remember to check cargo clippy --all --all-targets for good coding practices (you can use the --fix flag to automatically apply the corrections too) and cargo fmt --all to format the code. I also use cargo-edit for the cargo upgrade command, which together with cargo add can be used to manage the Cargo.toml from the terminal.
When evaluating performance, run with the --release flag.
If you feel that the rust-analyzer extension is overloading you with information by showing all type hints, you can disable some of them; in my config I have:
- Imports>Granularity>Group: module
- Inlay Hints>Chaining Hints>Enable: false
- Inlay Hints>Type Hints>Enable: false
More suggested extensions
I am currently using Fedora KDE and used Kubuntu in the past ↩
This extension is currently unmaintained and we are looking for alternative recommendations. ↩