Renoir
REactive Network of Operators In Rust
Renoir (abbreviato: Noir) [/ʁənwaʁ/, /nwaʁ/] è una piattaforma di elaborazione dati distribuita basata sul paradigma del dataflow che fornisce un’interfaccia di programmazione ergonomica, simile a quella di Apache Flink, ma con caratteristiche prestazionali molto migliori.
Renoir converte ogni job in un grafico di dataflow di operatori e li raggruppa in blocchi. I blocchi contengono una sequenza di operatori che elaborano i dati sequenzialmente senza ripartizionarli. Sono l’unità di deployment utilizzata dal sistema e possono essere distribuiti ed eseguiti su più sistemi.
Il layout comune di un programma Renoir inizia con la creazione di un StreamContext, quindi vengono inizializzati una o più Source creando un Stream. Il grafico degli operatori viene composto utilizzando i metodi dell’oggetto Stream, che seguono un approccio simile al trait Iterator di Rust permettendo di definire ergonomicamente un flusso di lavoro di elaborazione attraverso il concatenamento di metodi.
Esempi
Conteggio Parole
use renoir::prelude::*;
fn main() {
// Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
let (config, args) = RuntimeConfig::from_args();
config.spawn_remote_workers();
let env = StreamContext::new(config);
let result = env
// Apri e leggi il file riga per riga in parallelo
.stream_file(&args[1])
// Dividi in parole
.flat_map(|line| tokenize(&line))
// Partizione
.group_by(|word| word.clone())
// Conta le occorrenze
.fold(0, |count, _word| *count += 1)
// Raccogli il risultato
.collect_vec();
env.execute_blocking(); // Inizia l'esecuzione (bloccante)
if let Some(result) = result.get() {
// Stampa i conteggi delle parole
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
// Strategia di tokenizzazione semplice
s.split_whitespace().map(str::to_lowercase).collect()
}
// Esegui su 6 host locali `cargo run -- -l 6 input.txt`
Conteggio Parole Associativo (più veloce)
use renoir::prelude::*;
fn main() {
// Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
let (config, args) = RuntimeConfig::from_args();
let env = StreamContext::new(config);
let result = env
.stream_file(&args[1])
// Il batching adattivo (default) ha latenza prevedibile
// Il batching di dimensione fissa spesso porta a tempi di esecuzione più brevi
// Se i dati sono immediatamente disponibili e la latenza non è critica
.batch_mode(BatchMode::fixed(1024))
.flat_map(move |line| tokenize(&line))
.map(|word| (word, 1))
// Gli operatori associativi dividono l'operazione in un passo locale e uno
// globale per un'esecuzione più veloce
.group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
.unkey()
.collect_vec();
env.execute_blocking(); // Inizia l'esecuzione (bloccante)
if let Some(result) = result.get() {
// Stampa i conteggi delle parole
result.into_iter().for_each(|(_, (word, count))| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
s.split_whitespace().map(str::to_lowercase).collect()
}
// Esegui su più host `cargo run -- -r config.toml input.txt`
Deployment remoto
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
Consulta la directory esempi per un set esteso di esempi funzionanti
Installazione e creazione di un progetto Renoir
Il primo requisito per costruire un progetto Renoir è la toolchain Rust.
Installazione di Rust
- Usando Rustup (Raccomandato): segui le istruzioni su https://rustup.rs/
Linux (Raccomandato):
~$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | shWindows: Scarica ed esegui l’installer
- Usando il tuo package manager: in alternativa puoi usare il pacchetto fornito dal tuo repository
Aggiungi la toolchain rust al PATH
Per poter usare tutti gli strumenti della toolchain Rust dobbiamo aggiungere la cartella “~/.cargo/bin/” nel nostro PATH
- bash:
~$ echo 'export PATH=$PATH:~/.cargo/bin/' >> ~/.bashrc
- fish:
~$ set -xU fish_user_paths $fish_user_paths ~/.cargo/bin/
Crea un nuovo progetto cargo
Dopo aver installato con successo la toolchain Rust siamo pronti a creare meraviglie con Renoir. Per farlo creeremo un nuovo progetto aggiungendo Renoir alle sue dipendenze.
cargo new --bin renoir-test
cd renoir-test
# Aggiungi la dipendenza renoir al `Cargo.toml`
# Attualmente raccomandiamo di usare il repository GitHub direttamente
cargo add renoir --git https://github.com/deib-polimi/renoir
Ora puoi aprire il progetto nel tuo editor preferito e iniziare a scrivere la tua applicazione usando Renoir!
Suggerimento Bonus: Ambiente di Sviluppo
Installazione del Kernel Jupyter (Opzionale)
Prerequisiti
Installa Rust seguendo la guida di installazione
Evcxr
I programmi Rust possono anche essere eseguiti in un ambiente interattivo. Evcxr è un contesto di valutazione per Rust e fornisce un REPL (analogo a ipython), e un Kernel Jupyter.
Installazione del Kernel Jupyter
I passaggi per installare il kernel jupyter sono i seguenti:
- Installa il binario
evcxr_jupyter:cargo install --locked evcxr_jupyter - Installa il kernel:
evcxr_jupyter --install(Nota: assicurati che$HOME/.cargo/binsia nella tua variabilePATH)
Utilizzare il Kernel Jupyter in Visual Studio Code
- Installa il pacchetto
jupyternel tuo ambiente python - Installa l’estensione Jupyter per VS Code
- Crea un nuovo file
.ipynbe aprilo - Seleziona il Kernel Jupyter
- Clicca sul pulsante Select Kernel in alto a destra
- Scegli Jupyter Kernel …
- Scegli il kernel Rust evcxr che hai installato prima
Importare dipendenze
Ora che hai selezionato il kernel puoi iniziare a scrivere codice ed eseguirlo. Per importare dipendenze puoi utilizzare la parola chiave :dep1.
:dep renoir = { git = "https://github.com/deib-polimi/renoir" }
Ora con un’istruzione use possiamo importare quello che ci serve per utilizzare renoir.
#![allow(unused)]
fn main() {
use renoir::prelude::*;
}Prelude raccomandato
Il kernel evcxr può essere configurato usando parole chiave speciali secondo le tue necessità. Elenchiamo alcune delle modifiche più utili che puoi fare (queste possono essere messe in una cella all’inizio del notebook)
:cache SIZEImposta la dimensione della cache in MiB (usa per una compilazione più veloce):opt LEVELImposta il livello di ottimizzazione, il default è nessuna ottimizzazione (per un’esecuzione più veloce usa 1,2 o 3)
Esempio
#![allow(unused)]
fn main() {
:cache 500
:opt 1
:dep renoir = { git = "https://github.com/deib-polimi/renoir" }
use renoir::prelude::*;
}#![allow(unused)]
fn main() {
let ctx = StreamContext::new_local();
let result = ctx.stream_par_iter(0..100)
.map(|x| (x, x * x))
.collect_vec();
ctx.execute_blocking();
let output = result.get()
}#![allow(unused)]
fn main() {
// La parola chiave :vars stamperà le variabili che hai impostato (Nota: le regole di lifetime di Rust si applicano ancora!)
:vars
}| Variabile | Tipo |
| output | Option<Vec<(i32,i32)>> |
#![allow(unused)]
fn main() {
println!("{output:?}");
}Some([(0, 0), (1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, …
segue la stessa sintassi di cargo toml ↩
Dagli Iterator agli Stream
Questa è una guida rapida pensata per persone con una certa familiarità con il linguaggio di programmazione Rust. È pensata come un salto per iniziare a scrivere programmi Renoir velocemente, ma non è completa, quindi fai riferimento al resto della documentazione per maggiori dettagli.
L’introduzione più veloce a Renoir è iniziare pensandolo come iteratori intelligenti (potresti aver visto un approccio simile con Rayon)
Con Iterator hai una sequenza di operatori e applichi trasformazioni come map() o filter() per trasformare la sequenza e alla fine raccoglierai il risultato in una collezione o eseguirai qualche tipo di operazione che consuma l’iteratore, come sum().
Gli Stream di Renoir funzionano allo stesso modo, puoi pensare agli stream in modo simile agli iteratori, ti permettono di iniziare da una Source che genera una sequenza di elementi, trasformarli usando Operator e raccoglierli o consumare lo stream usando un Sink.
La differenza chiave è che gli stream di Renoir sono ottimizzati per computazioni parallele e distribuite e possono essere eseguiti senza soluzione di continuità su una o più macchine connesse.
Contesto
A causa della natura distribuita di Renoir, dobbiamo fare un paio di cose prima di iniziare. (Iniziamo con un esempio con deployment locale e mostriamo come passare facilmente a un deployment distribuito dopo)
// Importiamo i componenti core di renoir
use renoir::prelude::*;
fn main() {
let ctx = StreamContext::new_local();
// ... Stream e operatori
ctx.execute_blocking();
// ...
}Ogni Stream di Renoir vive all’interno di un StreamContext. Il contesto può contenere più stream e operatori ed è l’oggetto che governa l’esecuzione di tutti gli stream al suo interno.
- Un
StreamContextviene creato - Uno o più
Streamsono definiti all’interno del contesto - Il
StreamContextviene eseguito risultando nell’esecuzione di tutti gli stream al suo interno
Per default Renoir fornisce un metodo
execute_blocking()che avvia tutti gli stream e operatori e aspetta finché tutti hanno finito. È possibile eseguire l’esecuzione in background eseguendo il metodoStreamContext::execute_blocking()in un altro thread#![allow(unused)] fn main() { std::thread::spawn(move || ctx.execute_blocking()); }Oppure è anche possibile usare il metodo asincrono
StreamContext::execute()se la featuretokioè abilitata. Nota: per ragioni di performance, solo alcune parti del sistema sono eseguite sullo scheduler asincrono quando la feature è abilitata, mentre la maggior parte degli operatori funziona su thread separati.
Dagli Iterator agli Stream
Ora vedremo quanto è facile passare da un iteratore a uno stream.
Vogliamo creare uno stream che prende un range di numeri da 0 a 200, filtra i numeri che sono divisibili per 3 o 5, li moltiplica per 2 e raccoglie il risultato in un vettore.
// Con iteratori
fn main() {
let input = 0..200;
let output = input
.filter(|x| x % 3 == 0 || x % 5 == 0)
.map(|x| x * 2)
.collect::<Vec<_>>();
println!("{output:?}");
}Con Renoir, dobbiamo solo creare un contesto, creare uno stream dall’iteratore e applicare gli stessi operatori.
// Con renoir
use renoir::prelude::*;
fn main() {
let ctx = StreamContext::new_local();
let input = 0..200;
// Stiamo facendo streaming dell'iteratore dalla nostra macchina
let output = ctx.stream_iter(input)
.filter(|x| x % 3 == 0 || x % 5 == 0)
.map(|x| x * 2)
.collect_vec();
// Raccogliamo l'output di ritorno alla nostra macchina
ctx.execute_blocking();
// Dato che questi stessi stream potrebbero essere eseguiti in un deployment distribuito,
// dobbiamo assicurarci che questo nodo sia quello che ha raccolto l'output.
if let Some(output) = output.get() {
println!("{output:?}");
}
}Con Renoir, possiamo facilmente passare da un iteratore single-thread a uno stream parallelo e distribuito semplicemente cambiando poche righe e riducendo il tempo di esecuzione a una frazione dell’originale.
Distribuire i dati
Nell’esempio precedente, abbiamo usato un deployment su singolo nodo (StreamContext::new_local()) e abbiamo usato IteratorSource, che prende come input un iteratore dal primo nodo nel deployment e alimenta i suoi elementi in uno stream.
E se volessimo eseguire questo in parallelo?
Abbiamo multiple opzioni:
- Partizionare e distribuire i dati dopo la source casualmente
- Partizionare e distribuire i dati dopo la source secondo una logica di raggruppamento
- Usare una source parallela
Mescolamento degli elementi
#![allow(unused)]
fn main() {
let output = ctx.stream_iter(input)
.shuffle()
.filter(|x| x % 3 == 0 || x % 5 == 0)
.map(|x| x * 2)
.collect_vec();
}Aggiungendo un operatore shuffle dopo la nostra source, gli elementi saranno distribuiti uniformemente tra tutte le repliche disponibili per l’operatore successivo. (Dato che siamo ancora in un deployment locale, per default gli operatori che non hanno limiti sulla replicazione saranno replicati un numero di volte uguale ai core disponibili nel sistema)
Raggruppamento degli elementi
Uno degli operatori più versatili nel toolkit di Renoir è l’operatore group_by e i suoi derivati. Questo operatore ti permette di definire una Chiave per ogni elemento, elementi con la stessa Chiave appartengono allo stesso gruppo. Quando gli elementi sono raggruppati, i gruppi sono divisi tra le repliche secondo l’Hash della Chiave.
Dopo aver applicato un operatore di raggruppamento, lo Stream diventerà un KeyedStream che permette di interagire con lo stream usando le informazioni di raggruppamento
#![allow(unused)]
fn main() {
let output = ctx.stream_iter(input)
.group_by(|x| x / 10)
.filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
.map(|(_key, x)| x * 2)
.collect_vec();
// Nota: l'output di questo esempio è diverso dal precedente
}Source Parallela
Un altro modo per distribuire i dati è usare una source parallela. Usando questa source Renoir creerà un numero di repliche della source e le eseguirà in parallelo.
Nell’esempio seguente, distribuiamo il range di numeri tra i diversi core rendendo l’intero calcolo parallelo.
#![allow(unused)]
fn main() {
let n = 200;
ctx.stream_par_iter(
move |id, instances| {
let chunk_size = (n + instances - 1) / instances;
let remaining = n - n.min(chunk_size * id);
let range = remaining.min(chunk_size);
let start = id * chunk_size;
let stop = id * chunk_size + range;
start..stop
})
.group_by(|x| x / 10)
.filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
.map(|(_key, x)| x * 2)
.collect_vec();
}Andare in Parallelo
In questa sezione vedremo quanto è facile eseguire il calcolo di Renoir in parallelo e distribuirlo attraverso più macchine.
Le informazioni sull’ambiente in cui il calcolo verrà eseguito sono memorizzate in un StreamContext. Il contesto può contenere più stream e operatori ed è l’oggetto che governa l’esecuzione di tutti gli stream al suo interno.
Deployment Locale
Per eseguire un calcolo su una singola macchina, puoi creare un StreamContext con il metodo new_local(). Questo metodo crea un contesto che eseguirà lo stream usando tutte le risorse disponibili della macchina.
#![allow(unused)]
fn main() {
let ctx = StreamContext::new_local();
// ... Stream e operatori
}se vuoi specificare il numero di thread da usare, puoi creare un RuntimeConfig personalizzato facilmente usando il metodo local(..).
#![allow(unused)]
fn main() {
let config = RuntimeConfig::local(4).unwrap();
let ctx = StreamContext::new(config);
// ... Stream e operatori
}Deployment Distribuito
Per eseguire un calcolo su più macchine, puoi creare un StreamContext con il metodo remote(..). Questo metodo prende come argomento il percorso a un file di configurazione (toml) che contiene le informazioni sul cluster.
#![allow(unused)]
fn main() {
let config = RuntimeConfig::remote("path/to/config.toml").unwrap();
config.spawn_remote_workers();
let ctx = StreamContext::new(config);
// ... Stream e operatori
}Se vuoi usare un ambiente distribuito devi spawnarli usando
spawn_remote_workersprima di richiedere qualche stream.
il file di configurazione dovrebbe contenere le informazioni necessarie per connettersi alle varie macchine nel cluster, per esempio:
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
E proprio così la tua pipeline sarà automaticamente distribuita attraverso entrambe le macchine.
Opzioni disponibili per il
RuntimeConfigsono:
address: una stringa con l’indirizzo della macchinabase_port: porta di partenza per la comunicazione tra operatori su macchine diversenum_cores: numero di core disponibili sulla macchinassh: oggetto per memorizzare le informazioni di connessione ssh
ssh_port: porta per connettersi alla macchinausername: username per connettersi alla macchinapassword: password per connettersi alla macchinakey_file: percorso al file della chiave privatakey_passphrase: passphrase per il file della chiave privata
Contesto dagli Argomenti
Per decidere l’ambiente in cui il calcolo verrà eseguito ogni volta, puoi passare il contesto come argomento al programma.
#![allow(unused)]
fn main() {
let (config, args) = RuntimeConfig::from_args();
let ctx = StreamContext::new(config);
// ... Stream e operatori
}e quando esegui il programma puoi passare gli argomenti al programma, specificando se vuoi eseguire il calcolo localmente o remotamente.
cargo run -- --local numero_di_thread
cargo run -- --remote percorso/al/config.toml
Da Flink a Renoir
Questa guida presuppone che tu abbia già configurato un ambiente per Renoir e creato un progetto cargo seguendo la guida
Questa introduzione rapida segue un approccio pratico mostrando esempi che confrontano l’API di Flink con l’API di Renoir
Se conosci Apache Flink, troverai facile iniziare ad usare Renoir.
Proprio come in Flink, i calcoli in Renoir sono definiti come un grafo di operatori, dove i dati fluiscono da un operatore all’altro.
Iniziamo: Wordcount
Come primo compito implementeremo un’applicazione per il conteggio delle parole sia in Flink che in Renoir. L’obiettivo è leggere un file e contare le occorrenze di tutte le parole distinte contenute in esso.
Flink
// Imports...
public class WordCount {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
long start = System.nanoTime();
DataSet < String > text = env.readTextFile("text-file.txt");
DataSet < Tuple2 < String, Integer >> counts =
text.flatMap((value, collector) - > {
String[] token = value.split("\\s+");
for (String token: tokens) {
if (token.length() > 0) {
out.collect(new Tuple2 < > (token.toLowerCase(), 1));
}
}
})
// raggruppa per il campo della tupla "0" e somma il campo della tupla "1"
.groupBy(0)
.sum(1);
counts.count();
long stop = System.nanoTime();
System.out.printf("total:%d", stop - start);
counts.sort(Comparator.comparing((t) - > getCount(t)));
for (Tuple2 < String, Integer > tuple: counts) {
System.out.printf("%s %d\n", tuple.f0, tuple.f1);
}
}
}
Renoir
use renoir::prelude::*;
fn main() {
let ctx = StreamContext::new_local();
let result = ctx
.stream_file("/etc/passwd")
.flat_map(|line| {
line.split_whitespace()
.map(|t| t.to_lowercase())
.collect::<Vec<String>>()
})
.group_by_count(|word: &String| word.clone())
.collect_vec();
let start = Instant::now();
ctx.execute_blocking();
let elapsed = start.elapsed();
if let Some(mut res) = result.get() {
res.sort_by_key(|t| t.1);
println!("{res:#?}");
}
println!("{elapsed:?}");
}
WIP: Questa guida è ancora incompleta
Trasformazioni Sequenziali
Renoir offre una moltitudine di operatori per trasformare e manipolare stream di dati. In questa sezione, vedremo come utilizzare gli operatori di base per eseguire trasformazioni sequenziali su uno stream.
Una trasformazione sequenziale è un’operazione che viene applicata a ogni elemento dello stream una volta in sequenza. Questo ci permette di ottenere il massimo livello di parallelismo e di distribuire il carico equamente tra le risorse disponibili.
Map
L’operatore map è usato per applicare una funzione a ogni elemento dello stream. La funzione può essere qualsiasi closure che prende un elemento dello stream come input e ritorna un nuovo elemento.
#![allow(unused)]
fn main() {
// Moltiplica ogni elemento dello stream per 10
let res = s.map(|n| n * 10).collect_vec();
}L’operatore map, dato che è applicato a ogni elemento indipendentemente, può usare tutto il potere del parallelismo.
RichMap
L’operatore rich_map è simile all’operatore map ma permette di mantenere uno stato tra gli elementi dello stream. La funzione passata all’operatore rich_map può essere stateful e mantenere uno stato per ogni replica.
#![allow(unused)]
fn main() {
// Enumera gli elementi dello stream
let res = s.rich_map({
let mut id = 0;
move |x| {
id += 1;
(id - 1, x)
}
}).collect_vec();
}Nota che lo stato è mantenuto per ogni replica dell’operatore, quindi in questo caso, se manteniamo il parallelismo ci saranno elementi multipli con lo stesso ID (uno per ogni replica).
Filter
L’operatore filter è usato per mantenere solo gli elementi dello stream che soddisfano una certa condizione. La funzione passata all’operatore filter deve ritornare un valore booleano.
#![allow(unused)]
fn main() {
// Mantieni solo gli elementi pari dello stream
let res = s.filter(|&n| n % 2 == 0).collect_vec();
}L’operatore filter, dato che è applicato a ogni elemento indipendentemente, può usare tutto il potere del parallelismo.
Flatten
L’operatore flatten è usato per appiattire uno stream di collezioni di elementi. Prende uno stream di container e ritorna uno stream con tutti gli elementi contenuti.
#![allow(unused)]
fn main() {
let s = env.stream_iter((vec![
vec![1, 2, 3],
vec![],
vec![4, 5],
].into_iter()));
let res = s.flatten().collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]);
}Concatenazione di Operatori Sequenziali
Per aiutare l’utente a scrivere codice pulito e leggibile, Renoir offre una serie di concatenazioni degli operatori precedenti in una singola chiamata. Questo permette di scrivere trasformazioni complesse in una singola riga di codice. Da concatenazioni semplici come flat_map dove il risultato di una map viene appiattito, a quelle più complesse come rich_filter_map dove l’utente può eseguire una map e filter stateful in una singola operazione.
Per una lista completa degli operatori vedi la documentazione API.
La maggior parte delle volte gli elementi in un flusso devono essere raggruppati o partizionati in qualche modo per ottenere il risultato desiderato. Renoir fornisce un insieme di operatori che fanno esattamente questo.
Raggruppamento
Group By
Probabilmente l’operazione più comune è quella di raggruppare insieme elementi per qualche chiave comune. La chiave può essere estratta dall’elemento stesso o calcolata da esso. L’operatore group_by prende una closure che restituisce la chiave per ogni elemento e restituisce un KeyedStream dove gli operatori vengono valutati sugli elementi con la stessa chiave.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
// partiziona elementi pari e dispari
let keyed = s.group_by(|&n| n % 2);
}Dopo il partizionamento tutti gli elementi verranno inviati alla rete per bilanciare il carico ma se il risultato desiderato è un’aggregazione in molti casi è consigliabile utilizzare una variante associativa dell’operatore
group_bycomegroup_by_reduceogroup_by_sum, una lista completa delle possibili varianti associative può essere trovata nella documentazione dell’API Group By.
Key By
OPERATORE AVANZATO
Crea un nuovo ‘KeyedStream’ in modo simile all’operatore group_by ma senza rimescolare gli elementi sulla rete. Questo può far comportare male altri operatori. Probabilmente vuoi utilizzare group_by invece.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.key_by(|&n| n % 2).collect_vec();
}Partizionamento
Route
A volte c’è bisogno di inviare elementi a rotte diverse basandosi su qualche condizione. L’operatore route permette di creare una serie di rotte e inviare gli elementi alla rotta corretta basandosi sulla prima condizione soddisfatta.
- Le rotte vengono create con il metodo
add_route, viene creato un nuovo flusso per ogni rotta. - Ogni elemento viene instradato al primo flusso per cui la condizione di instradamento si valuta come true.
- Se nessuna condizione di rotta è soddisfatta, l’elemento viene scartato
#![allow(unused)]
fn main() {
let mut routes = s.route()
.add_route(|&i| i < 5)
.add_route(|&i| i % 2 == 0)
.build()
.into_iter();
assert_eq!(routes.len(), 2);
// 0 1 2 3 4
routes.next().unwrap().for_each(|i| eprintln!("route1: {i}"));
// 6 8
routes.next().unwrap().for_each(|i| eprintln!("route2: {i}"));
// 5 7 9 ignorati
env.execute_blocking();
}Split
Crea più flussi da un singolo flusso dove ogni suddivisione avrà tutti gli elementi del flusso originale. L’operatore split è utile quando hai bisogno di applicare trasformazioni diverse allo stesso flusso.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let mut splits = s.split(3);
let a = splits.pop().unwrap();
let b = splits.pop().unwrap();
let c = splits.pop().unwrap();
}Riduzioni e Fold
Per ottenere informazioni da uno stream di dati è spesso necessario aggregare i dati in qualche modo. Renoir fornisce un set di operatori che ti permettono di eseguire riduzioni e fold sui stream di dati per ottenere le informazioni desiderate.
Reduce
L’operatore reduce aggrega i dati di uno stream seguendo una funzione definita dall’utente ed emette un singolo valore. La funzione dovrebbe modificare l’accumulatore che alla fine sarà il valore emesso.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.reduce(|acc, value| acc + value).collect::<Vec<_>>();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}Nota che il tipo dell’accumulatore è lo stesso del tipo degli elementi dello stream. Se è necessario un tipo diverso considera l’uso di
fold.
Reduce Associativo
L’operatore reduce_assoc è una variante dell’operatore reduce che può essere usato quando la funzione di riduzione è associativa. Questo permette all’operatore di essere eseguito in parallelo e può essere più efficiente dell’operatore reduce.
L’operatore applica la funzione di riduzione in due passi:
- Local: la funzione che sarà eseguita su ogni replica.
- Global: la funzione che aggregherà tutti i risultati parziali ottenuti dalle funzioni locali.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.reduce_assoc(|acc, value| acc + value).collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}Nota che il tipo dell’accumulatore è lo stesso del tipo degli elementi dello stream. Se è necessario un tipo diverso considera l’uso di
fold_assoc.
Fold
L’operatore fold aggrega i dati di uno stream seguendo una funzione definita dall’utente ed emette un singolo valore. La funzione dovrebbe modificare l’accumulatore che alla fine sarà il valore emesso. È simile all’operatore reduce ma permette di specificare un valore iniziale e quindi il tipo per l’accumulatore.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.fold(0, |acc, value| *acc += value).collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}Fold Associativo
L’operatore fold_assoc è una variante dell’operatore fold che può essere usato quando la funzione di riduzione è associativa. Simile al reduce_assoc, questo permette all’operatore di essere eseguito in parallelo e può essere più efficiente dell’operatore fold.
L’operatore richiede due funzioni definite dall’utente:
- Local: la funzione che sarà eseguita su ogni replica.
- Global: la funzione che aggregherà tutti i risultati parziali ottenuti dalle funzioni locali.
#![allow(unused)]
fn main() {
// Esempio
let s = env.stream_iter(0..5);
let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec();
env.execute_blocking();
assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}Finestre
Quando si lavora con flussi di dati illimitati è spesso necessario considerare solo un sottoinsieme dei dati per eseguire alcuni calcoli. Renoir fornisce un insieme di operatori che ti permettono di definire finestre sui flussi di dati per eseguire il calcolo solo sui dati che fanno parte della finestra. In Renoir le finestre sono definite da un descrittore WinDescr che permette all’utente di specificare il tipo e la logica utilizzata per raggruppare i dati all’interno della finestra giusta.
Descrittori di Finestre
Ci sono diversi tipi di descrittori che possono essere utilizzati:
- CountWindow: definisce una finestra basata sul numero di elementi nella finestra. Può essere definita come finestre
tumblingosliding. - EventTimeWindow: definisce una finestra basata sui timestamp degli eventi dei dati. Può essere definita come finestre
tumblingosliding. - ProcessingTimeWindow: definisce una finestra basata sull’orologio di sistema al momento del processamento. Può essere definita come finestre
tumblingosliding. - SessionWindow: definisce una finestra che si divide dopo se nessun elemento viene ricevuto per una durata fissa dell’orologio di sistema.
- TransactionWindow: definisce una finestra basata su una logica definita dall’utente. Un’analisi completa di questo descrittore può essere trovata nella Documentazione dell’API TransactionWindow.
Finestre su un singolo flusso
Se il flusso NON è partizionato in alcun modo come usando gli operatori group_by o key_by, la finestra è definita sull’intero flusso. L’operatore che ti permette di definire una finestra sul flusso è window_all. L’operatore window_all prende un descrittore della finestra come argomento e restituisce un flusso con finestre che può essere utilizzato per eseguire calcoli sulla finestra.
let s = env.stream_iter(0..5usize);
let res = s
.window_all(CountWindow::tumbling(2))
// resto della pipeline
Nota che poiché la finestra è definita sull’intero flusso, questo operatore non può essere parallelizzato. Se possibile partiziona il flusso usando gli operatori
group_byokey_byper permettere l’esecuzione parallela.
Finestre su un flusso partizionato
Se il flusso è partizionato in qualche modo come usando gli operatori group_by o key_by, la finestra è definita su ogni partizione. Possiamo definire le nostre finestre usando l’operatore window con il descrittore che vogliamo.
let s = env.stream_iter(0..9);
let res = s
.group_by(|&n| n % 2)
.window(CountWindow::sliding(3, 2))
// resto della pipeline
se vogliamo creare una finestra dopo aver unito due KeyedStream sulla stessa chiave possiamo usare l’operatore window_join.
let s1 = env.stream_iter(0..9);
let s2 = env.stream_iter(0..9);
let res = s1
.key_by(|&n| n % 2)
.window_join(s2.key_by(|&n| n % 2), CountWindow::tumbling(2))
// resto della pipeline
Operatori sulle finestre
Una volta definita la finestra possiamo eseguire diverse operazioni per ottenere le informazioni desiderate. Alcune delle operazioni possibili sono le standard max, min, sum o count ma sono disponibili anche operazioni più complesse come l’operatore fold.
Per una lista completa degli operatori disponibili controlla la Documentazione dell’API.
let s = env.stream_iter(0..5usize);
let res = s
.window_all(CountWindow::tumbling(2))
.fold(0, |acc, value| acc + value)
.collect_vec();
Operatori Multi-Stream
Ci saranno momenti in cui abbiamo bisogno di unire più flussi insieme per ottenere le intuizioni di cui abbiamo bisogno. Renoir fornisce un insieme di operatori che ti permettono di unire più flussi insieme.
Join
L’operazione più comune è quella di unire due flussi insieme. L’operatore join ti permette di unire due flussi insieme basandoti su una chiave. La chiave viene valutata usando una closure (una per ogni flusso) e due elementi vengono uniti insieme se le chiavi sono uguali.
Questo operatore è simile alla query SQL SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b).
#![allow(unused)]
fn main() {
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
}Renoir offre anche altre due varianti dell’operatore join:
left_joinche mantiene tutti gli elementi del flusso di sinistra e gli elementi del flusso di destra che hanno una chiave corrispondente.outer_joinche mantiene tutti gli elementi di entrambi i flussi e gli elementi del flusso di destra che hanno una chiave corrispondente.interval_joinche permette di unire due flussi basandosi su un intervallo di tempo. Gli elementi del flusso di destra vengono uniti con l’elemento del flusso di sinistra se il loro timestamp è all’interno di un intervallo centrato sul timestamp dell’elemento di sinistra.
L’operatore join è disponibile anche per KeyedStream, in quel caso la chiave è quella utilizzata per partizionare il flusso.
Un utente più esperto potrebbe voler utilizzare l’operatore
join_withche permette di personalizzare il comportamento del join. Puoi selezionare quale strategia di spedizione e quale strategia locale utilizzare. Una lista completa delle possibili condizioni di join può essere trovata nella documentazione dell’API Join_With.
Merge
L’operatore merge ti permette di unire due flussi insieme. L’operatore prende due flussi e restituisce un nuovo flusso che contiene tutti gli elementi dei due flussi. Gli elementi non sono ordinati in alcun modo.
#![allow(unused)]
fn main() {
let s1 = env.stream_iter(0..10);
let s2 = env.stream_iter(10..20);
let res = s1.merge(s2).collect_vec();
}Zip
L’operatore zip ti permette di unire due flussi insieme. L’operatore prende due flussi e restituisce un nuovo flusso che contiene gli elementi dei due flussi zippati insieme. Gli elementi sono ordinati nello stesso modo in cui sono prodotti dai due flussi.
#![allow(unused)]
fn main() {
let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter()));
let s2 = env.stream_iter((vec![1, 2, 3].into_iter()));
let res = s1.zip(s2).collect_vec();
}Tutti gli elementi dopo la fine del flusso più corto verranno scartati
Operatori Iterativi
Potrebbe esserci la possibilità che tu abbia bisogno di iterare su un flusso più volte per ottenere le intuizioni corrette o per eseguire un calcolo complesso. Renoir fornisce un insieme di operatori esattamente per questo scopo.
Iterate
Questo operatore permette di creare un flusso di lavoro iterativo dove i dati ciclano attraverso lo stesso insieme di operatori più volte. Questo operatore ha diverse caratteristiche:
- Iterazioni Massime: Il numero massimo di iterazioni da eseguire. Se viene raggiunto il numero massimo di iterazioni, l’operatore interromperà l’iterazione e produrrà lo stato corrente dei dati.
- Stato dell’Iterazione: Lo stato che tutte le repliche di questo operatore possono leggere. Lo stato può essere aggiornato alla fine di ogni iterazione dalla funzione
global_fold. - Body: L’insieme di operatori che verrà eseguito ad ogni iterazione. L’output del body verrà utilizzato come input dell’iterazione successiva.
- Local Fold: La funzione che verrà eseguita da ogni replica utilizzata per aggiornare lo Stato dell’Iterazione, i risultati verranno aggregati dalla funzione
global_fold. - Global Fold: La funzione che verrà eseguita per aggregare i risultati della funzione
local_folde aggiornare lo Stato dell’Iterazione. - Condizione del Loop: La condizione che verrà valutata alla fine di ogni iterazione per decidere se l’iterazione deve continuare o meno.
let s = env.stream_iter(0..3).shuffle();
let (state, items) = s.iterate(
3, // al massimo 3 iterazioni
0, // lo stato iniziale è zero
|s, state| s.map(|n| n + 10),
|delta: &mut i32, n| *delta += n,
|state, delta| *state += delta,
|_state| true,
);
let state = state.collect_vec();
let items = items.collect_vec();
se vuoi riciclare lo stesso input iniziale e non il risultato dell’iterazione precedente, puoi usare l’operatore
replay. Ilreplayè molto simile all’operatoreiteratecioè richiede gli stessi parametri, ma riprodurrà il flusso di input ad ogni iterazione.
Deployment
Lavori in corso…
Esempi
Esempi
Conteggio Parole
use renoir::prelude::*;
fn main() {
// Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
let (config, args) = RuntimeConfig::from_args();
config.spawn_remote_workers();
let env = StreamContext::new(config);
let result = env
// Apri e leggi il file riga per riga in parallelo
.stream_file(&args[0])
// Dividi in parole
.flat_map(|line| tokenize(&line))
// Partiziona
.group_by(|word| word.clone())
// Conta le occorrenze
.fold(0, |count, _word| *count += 1)
// Raccogli il risultato
.collect_vec();
env.execute_blocking(); // Inizia l'esecuzione (bloccante)
if let Some(result) = result.get() {
// Stampa il conteggio delle parole
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
// Strategia di tokenizzazione semplice
s.split_whitespace().map(str::to_lowercase).collect()
}
// Esegui su 6 host locali `cargo run -- -l 6 input.txt`
Conteggio Parole associativo (più veloce)
use renoir::prelude::*;
fn main() {
// Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
let (config, args) = RuntimeConfig::from_args();
let env = StreamContext::new(config);
let result = env
.stream_file(&args[0])
// Il batching adattivo (default) ha latenza prevedibile
// Il batching di dimensione fissa spesso porta a tempi di esecuzione più brevi
// Se i dati sono immediatamente disponibili e la latenza non è critica
.batch_mode(BatchMode::fixed(1024))
.flat_map(move |line| tokenize(&line))
.map(|word| (word, 1))
// Gli operatori associativi dividono l'operazione in un passo locale e uno
// globale per un'esecuzione più veloce
.group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
.unkey()
.collect_vec();
env.execute_blocking(); // Inizia l'esecuzione (bloccante)
if let Some(result) = result.get() {
// Stampa il conteggio delle parole
result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
}
}
fn tokenize(s: &str) -> Vec<String> {
s.split_whitespace().map(str::to_lowercase).collect()
}
// Esegui su più host `cargo run -- -r config.toml input.txt`
Deployment remoto
# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16
[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }
Fai riferimento alla directory examples per un set esteso di esempi funzionanti
Se hai già configurato un ambiente di sviluppo con cui ti trovi a tuo agio ti consigliamo di rimanere con quello che trovi più produttivo. Altrimenti, se questa è la tua prima esperienza di programmazione con Rust o stai cercando idee per migliorare la tua produttività, questa pagina ti mostrerà come puoi configurare un ambiente di sviluppo efficace per Rust e Renoir.
Configurare un ambiente usando VS Code
Prima di tutto, raccomanderei di lavorare in un sistema UNIX se possibile1, se sei su Windows potresti provare ad usare Windows Subsystem for Linux (WSL) per un’esperienza migliore.
Come IDE, uso VS Code per lavorare con Rust (Nota: c’è anche un’estensione WSL se sei su windows e vuoi lavorare in WSL)
L’estensione rust-analyzer ti porterà quasi tutto quello di cui potresti aver bisogno per lavorare con Rust, c’è anche una guida ufficiale su come configurare vs code per Rust.
Oltre a questo raccomando anche l’estensione Error Lens che si integra bene con rust-analyzer per mostrare errori del compilatore e suggerimenti direttamente inline (Abilita il livello diagnostico hint nelle impostazioni CTRL+, per maggiori dettagli) Le estensioni Even Better TOML e Crates2 per lavorare con il Cargo.toml
Infine per gli strumenti CLI, ricorda di controllare cargo clippy --all --all-targets per buone pratiche di codifica (puoi usare la flag --fix per applicare automaticamente anche le correzioni) cargo fmt --all per formattare il codice e uso anche cargo-edit per il comando cargo upgrade, che insieme a cargo add può essere utilizzato per gestire il Cargo.toml dal terminale
Quando valuti le prestazioni esegui con la flag --release
Se senti che l’estensione rust-analyzer sta sovraccaricando con informazioni mostrando tutti i suggerimenti di tipo, puoi disabilitarne alcuni, nella mia configurazione ho
- Imports>Granularity>Group: module
- Inlay Hints>Chaining Hints>Enable: false
- Inlay Hints>Type Hints>Enable: false
Altre estensioni consigliate
Attualmente sto usando Fedora KDE e ho usato Kubuntu in passato ↩
Questa estensione è attualmente non mantenuta e stiamo cercando raccomandazioni alternative. ↩