Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Renoir

Preprint

REactive Network of Operators In Rust

Documentazione API

Renoir (abbreviato: Noir) [/ʁənwaʁ/, /nwaʁ/] è una piattaforma di elaborazione dati distribuita basata sul paradigma del dataflow che fornisce un’interfaccia di programmazione ergonomica, simile a quella di Apache Flink, ma con caratteristiche prestazionali molto migliori.

Renoir converte ogni job in un grafico di dataflow di operatori e li raggruppa in blocchi. I blocchi contengono una sequenza di operatori che elaborano i dati sequenzialmente senza ripartizionarli. Sono l’unità di deployment utilizzata dal sistema e possono essere distribuiti ed eseguiti su più sistemi.

Il layout comune di un programma Renoir inizia con la creazione di un StreamContext, quindi vengono inizializzati una o più Source creando un Stream. Il grafico degli operatori viene composto utilizzando i metodi dell’oggetto Stream, che seguono un approccio simile al trait Iterator di Rust permettendo di definire ergonomicamente un flusso di lavoro di elaborazione attraverso il concatenamento di metodi.

Esempi

Conteggio Parole

use renoir::prelude::*;

fn main() {
    // Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
    let (config, args) = RuntimeConfig::from_args();
    config.spawn_remote_workers();
    let env = StreamContext::new(config);

    let result = env
        // Apri e leggi il file riga per riga in parallelo
        .stream_file(&args[1])
        // Dividi in parole
        .flat_map(|line| tokenize(&line))
        // Partizione
        .group_by(|word| word.clone())
        // Conta le occorrenze
        .fold(0, |count, _word| *count += 1)
        // Raccogli il risultato
        .collect_vec();
        
    env.execute_blocking(); // Inizia l'esecuzione (bloccante)
    if let Some(result) = result.get() {
        // Stampa i conteggi delle parole
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    // Strategia di tokenizzazione semplice
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Esegui su 6 host locali `cargo run -- -l 6 input.txt`

Conteggio Parole Associativo (più veloce)

use renoir::prelude::*;

fn main() {
    // Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
    let (config, args) = RuntimeConfig::from_args();
    let env = StreamContext::new(config);

    let result = env
        .stream_file(&args[1])
        // Il batching adattivo (default) ha latenza prevedibile
        // Il batching di dimensione fissa spesso porta a tempi di esecuzione più brevi
        // Se i dati sono immediatamente disponibili e la latenza non è critica
        .batch_mode(BatchMode::fixed(1024))
        .flat_map(move |line| tokenize(&line))
        .map(|word| (word, 1))
        // Gli operatori associativi dividono l'operazione in un passo locale e uno
        // globale per un'esecuzione più veloce
        .group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
        .unkey()
        .collect_vec();

    env.execute_blocking(); // Inizia l'esecuzione (bloccante)
    if let Some(result) = result.get() {
        // Stampa i conteggi delle parole
        result.into_iter().for_each(|(_, (word, count))| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Esegui su più host `cargo run -- -r config.toml input.txt`

Deployment remoto

# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16

[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }

Consulta la directory esempi per un set esteso di esempi funzionanti

Installazione e creazione di un progetto Renoir

Il primo requisito per costruire un progetto Renoir è la toolchain Rust.

Installazione di Rust

  • Usando Rustup (Raccomandato): segui le istruzioni su https://rustup.rs/
    • Linux (Raccomandato):

      ~$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
      
    • Windows: Scarica ed esegui l’installer

  • Usando il tuo package manager: in alternativa puoi usare il pacchetto fornito dal tuo repository

Aggiungi la toolchain rust al PATH

Per poter usare tutti gli strumenti della toolchain Rust dobbiamo aggiungere la cartella “~/.cargo/bin/” nel nostro PATH

  • bash:
~$ echo 'export PATH=$PATH:~/.cargo/bin/' >> ~/.bashrc
  • fish:
~$ set -xU fish_user_paths $fish_user_paths ~/.cargo/bin/

Crea un nuovo progetto cargo

Dopo aver installato con successo la toolchain Rust siamo pronti a creare meraviglie con Renoir. Per farlo creeremo un nuovo progetto aggiungendo Renoir alle sue dipendenze.

cargo new --bin renoir-test
cd renoir-test
 # Aggiungi la dipendenza renoir al `Cargo.toml`
 # Attualmente raccomandiamo di usare il repository GitHub direttamente
cargo add renoir --git https://github.com/deib-polimi/renoir

Ora puoi aprire il progetto nel tuo editor preferito e iniziare a scrivere la tua applicazione usando Renoir!

Suggerimento Bonus: Ambiente di Sviluppo

Installazione del Kernel Jupyter (Opzionale)

Prerequisiti

Installa Rust seguendo la guida di installazione

Evcxr

I programmi Rust possono anche essere eseguiti in un ambiente interattivo. Evcxr è un contesto di valutazione per Rust e fornisce un REPL (analogo a ipython), e un Kernel Jupyter.

Installazione del Kernel Jupyter

Guida ufficiale

I passaggi per installare il kernel jupyter sono i seguenti:

  1. Installa il binario evcxr_jupyter: cargo install --locked evcxr_jupyter
  2. Installa il kernel: evcxr_jupyter --install (Nota: assicurati che $HOME/.cargo/bin sia nella tua variabile PATH)

Utilizzare il Kernel Jupyter in Visual Studio Code

Guida ufficiale

  1. Installa il pacchetto jupyter nel tuo ambiente python
  2. Installa l’estensione Jupyter per VS Code
  3. Crea un nuovo file .ipynb e aprilo
  4. Seleziona il Kernel Jupyter
    1. Clicca sul pulsante Select Kernel in alto a destra
    2. Scegli Jupyter Kernel …
    3. Scegli il kernel Rust evcxr che hai installato prima

Importare dipendenze

Ora che hai selezionato il kernel puoi iniziare a scrivere codice ed eseguirlo. Per importare dipendenze puoi utilizzare la parola chiave :dep1.

:dep renoir = { git = "https://github.com/deib-polimi/renoir" }

Ora con un’istruzione use possiamo importare quello che ci serve per utilizzare renoir.

#![allow(unused)]
fn main() {
use renoir::prelude::*;
}

Prelude raccomandato

Il kernel evcxr può essere configurato usando parole chiave speciali secondo le tue necessità. Elenchiamo alcune delle modifiche più utili che puoi fare (queste possono essere messe in una cella all’inizio del notebook)

  • :cache SIZE Imposta la dimensione della cache in MiB (usa per una compilazione più veloce)
  • :opt LEVEL Imposta il livello di ottimizzazione, il default è nessuna ottimizzazione (per un’esecuzione più veloce usa 1,2 o 3)

Esempio

#![allow(unused)]
fn main() {
:cache 500
:opt 1
:dep renoir = { git = "https://github.com/deib-polimi/renoir" }
use renoir::prelude::*;
}
#![allow(unused)]
fn main() {
let ctx = StreamContext::new_local();

let result = ctx.stream_par_iter(0..100)
    .map(|x| (x, x * x))
    .collect_vec();

ctx.execute_blocking();
let output = result.get()
}
#![allow(unused)]
fn main() {
// La parola chiave :vars stamperà le variabili che hai impostato (Nota: le regole di lifetime di Rust si applicano ancora!)
:vars
}
VariabileTipo
outputOption<Vec<(i32,i32)>>
#![allow(unused)]
fn main() {
println!("{output:?}");
}

Some([(0, 0), (1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, …


  1. segue la stessa sintassi di cargo toml

Dagli Iterator agli Stream

Questa è una guida rapida pensata per persone con una certa familiarità con il linguaggio di programmazione Rust. È pensata come un salto per iniziare a scrivere programmi Renoir velocemente, ma non è completa, quindi fai riferimento al resto della documentazione per maggiori dettagli.

L’introduzione più veloce a Renoir è iniziare pensandolo come iteratori intelligenti (potresti aver visto un approccio simile con Rayon)

Con Iterator hai una sequenza di operatori e applichi trasformazioni come map() o filter() per trasformare la sequenza e alla fine raccoglierai il risultato in una collezione o eseguirai qualche tipo di operazione che consuma l’iteratore, come sum().

Gli Stream di Renoir funzionano allo stesso modo, puoi pensare agli stream in modo simile agli iteratori, ti permettono di iniziare da una Source che genera una sequenza di elementi, trasformarli usando Operator e raccoglierli o consumare lo stream usando un Sink.

La differenza chiave è che gli stream di Renoir sono ottimizzati per computazioni parallele e distribuite e possono essere eseguiti senza soluzione di continuità su una o più macchine connesse.

Contesto

A causa della natura distribuita di Renoir, dobbiamo fare un paio di cose prima di iniziare. (Iniziamo con un esempio con deployment locale e mostriamo come passare facilmente a un deployment distribuito dopo)

// Importiamo i componenti core di renoir
use renoir::prelude::*;

fn main() {
 let ctx = StreamContext::new_local();
 // ... Stream e operatori
 ctx.execute_blocking();
 // ...
}

Ogni Stream di Renoir vive all’interno di un StreamContext. Il contesto può contenere più stream e operatori ed è l’oggetto che governa l’esecuzione di tutti gli stream al suo interno.

  1. Un StreamContext viene creato
  2. Uno o più Stream sono definiti all’interno del contesto
  3. Il StreamContext viene eseguito risultando nell’esecuzione di tutti gli stream al suo interno

Per default Renoir fornisce un metodo execute_blocking() che avvia tutti gli stream e operatori e aspetta finché tutti hanno finito. È possibile eseguire l’esecuzione in background eseguendo il metodo StreamContext::execute_blocking() in un altro thread

#![allow(unused)]
fn main() {
std::thread::spawn(move || ctx.execute_blocking());
}

Oppure è anche possibile usare il metodo asincrono StreamContext::execute() se la feature tokio è abilitata. Nota: per ragioni di performance, solo alcune parti del sistema sono eseguite sullo scheduler asincrono quando la feature è abilitata, mentre la maggior parte degli operatori funziona su thread separati.

Dagli Iterator agli Stream

Ora vedremo quanto è facile passare da un iteratore a uno stream.

Vogliamo creare uno stream che prende un range di numeri da 0 a 200, filtra i numeri che sono divisibili per 3 o 5, li moltiplica per 2 e raccoglie il risultato in un vettore.

// Con iteratori
fn main() {
 let input = 0..200;
 let output = input
  .filter(|x| x % 3 == 0 || x % 5 == 0)
  .map(|x| x * 2)
  .collect::<Vec<_>>();
 println!("{output:?}");
}

Con Renoir, dobbiamo solo creare un contesto, creare uno stream dall’iteratore e applicare gli stessi operatori.

// Con renoir
use renoir::prelude::*;
fn main() {
 let ctx = StreamContext::new_local();
 let input = 0..200;

 // Stiamo facendo streaming dell'iteratore dalla nostra macchina
 let output = ctx.stream_iter(input)
  .filter(|x| x % 3 == 0 || x % 5 == 0)
  .map(|x| x * 2)
  .collect_vec();
  // Raccogliamo l'output di ritorno alla nostra macchina
 ctx.execute_blocking();

 // Dato che questi stessi stream potrebbero essere eseguiti in un deployment distribuito,
 // dobbiamo assicurarci che questo nodo sia quello che ha raccolto l'output.
 if let Some(output) = output.get() {
  println!("{output:?}");
 }
}

Con Renoir, possiamo facilmente passare da un iteratore single-thread a uno stream parallelo e distribuito semplicemente cambiando poche righe e riducendo il tempo di esecuzione a una frazione dell’originale.

Distribuire i dati

Nell’esempio precedente, abbiamo usato un deployment su singolo nodo (StreamContext::new_local()) e abbiamo usato IteratorSource, che prende come input un iteratore dal primo nodo nel deployment e alimenta i suoi elementi in uno stream.

E se volessimo eseguire questo in parallelo?

Abbiamo multiple opzioni:

  • Partizionare e distribuire i dati dopo la source casualmente
  • Partizionare e distribuire i dati dopo la source secondo una logica di raggruppamento
  • Usare una source parallela

Mescolamento degli elementi

#![allow(unused)]
fn main() {
let output = ctx.stream_iter(input)
 .shuffle()
 .filter(|x| x % 3 == 0 || x % 5 == 0)
 .map(|x| x * 2)
 .collect_vec();
}

Aggiungendo un operatore shuffle dopo la nostra source, gli elementi saranno distribuiti uniformemente tra tutte le repliche disponibili per l’operatore successivo. (Dato che siamo ancora in un deployment locale, per default gli operatori che non hanno limiti sulla replicazione saranno replicati un numero di volte uguale ai core disponibili nel sistema)

Raggruppamento degli elementi

Uno degli operatori più versatili nel toolkit di Renoir è l’operatore group_by e i suoi derivati. Questo operatore ti permette di definire una Chiave per ogni elemento, elementi con la stessa Chiave appartengono allo stesso gruppo. Quando gli elementi sono raggruppati, i gruppi sono divisi tra le repliche secondo l’Hash della Chiave.

Dopo aver applicato un operatore di raggruppamento, lo Stream diventerà un KeyedStream che permette di interagire con lo stream usando le informazioni di raggruppamento

#![allow(unused)]
fn main() {
let output = ctx.stream_iter(input)
 .group_by(|x| x / 10)
 .filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
 .map(|(_key, x)| x * 2)
 .collect_vec();
// Nota: l'output di questo esempio è diverso dal precedente
}

Source Parallela

Un altro modo per distribuire i dati è usare una source parallela. Usando questa source Renoir creerà un numero di repliche della source e le eseguirà in parallelo.

Nell’esempio seguente, distribuiamo il range di numeri tra i diversi core rendendo l’intero calcolo parallelo.

#![allow(unused)]
fn main() {
let n = 200;
ctx.stream_par_iter(
     move |id, instances| {
         let chunk_size = (n + instances - 1) / instances;
        let remaining = n - n.min(chunk_size * id);
        let range = remaining.min(chunk_size);

        let start = id * chunk_size;
        let stop = id * chunk_size + range;
        start..stop
    })
    .group_by(|x| x / 10)
 .filter(|(_key, x)| x % 3 == 0 || x % 5 == 0)
 .map(|(_key, x)| x * 2)
 .collect_vec();
}

Andare in Parallelo

In questa sezione vedremo quanto è facile eseguire il calcolo di Renoir in parallelo e distribuirlo attraverso più macchine.

Le informazioni sull’ambiente in cui il calcolo verrà eseguito sono memorizzate in un StreamContext. Il contesto può contenere più stream e operatori ed è l’oggetto che governa l’esecuzione di tutti gli stream al suo interno.

Deployment Locale

Per eseguire un calcolo su una singola macchina, puoi creare un StreamContext con il metodo new_local(). Questo metodo crea un contesto che eseguirà lo stream usando tutte le risorse disponibili della macchina.

#![allow(unused)]
fn main() {
let ctx = StreamContext::new_local();
// ... Stream e operatori
}

se vuoi specificare il numero di thread da usare, puoi creare un RuntimeConfig personalizzato facilmente usando il metodo local(..).

#![allow(unused)]
fn main() {
let config = RuntimeConfig::local(4).unwrap();
let ctx = StreamContext::new(config);
// ... Stream e operatori
}

Deployment Distribuito

Per eseguire un calcolo su più macchine, puoi creare un StreamContext con il metodo remote(..). Questo metodo prende come argomento il percorso a un file di configurazione (toml) che contiene le informazioni sul cluster.

#![allow(unused)]
fn main() {
let config = RuntimeConfig::remote("path/to/config.toml").unwrap();
config.spawn_remote_workers();
let ctx = StreamContext::new(config);
// ... Stream e operatori
}

Se vuoi usare un ambiente distribuito devi spawnarli usando spawn_remote_workers prima di richiedere qualche stream.

il file di configurazione dovrebbe contenere le informazioni necessarie per connettersi alle varie macchine nel cluster, per esempio:

# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16

[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }

E proprio così la tua pipeline sarà automaticamente distribuita attraverso entrambe le macchine.

Opzioni disponibili per il RuntimeConfig sono:

  • address: una stringa con l’indirizzo della macchina
  • base_port: porta di partenza per la comunicazione tra operatori su macchine diverse
  • num_cores: numero di core disponibili sulla macchina
  • ssh: oggetto per memorizzare le informazioni di connessione ssh
  • ssh_port: porta per connettersi alla macchina
  • username: username per connettersi alla macchina
  • password: password per connettersi alla macchina
  • key_file: percorso al file della chiave privata
  • key_passphrase: passphrase per il file della chiave privata

Contesto dagli Argomenti

Per decidere l’ambiente in cui il calcolo verrà eseguito ogni volta, puoi passare il contesto come argomento al programma.

#![allow(unused)]
fn main() {
let (config, args) = RuntimeConfig::from_args();
let ctx = StreamContext::new(config);
// ... Stream e operatori
}

e quando esegui il programma puoi passare gli argomenti al programma, specificando se vuoi eseguire il calcolo localmente o remotamente.

cargo run -- --local numero_di_thread
cargo run -- --remote percorso/al/config.toml

Da Flink a Renoir

Questa guida presuppone che tu abbia già configurato un ambiente per Renoir e creato un progetto cargo seguendo la guida

Questa introduzione rapida segue un approccio pratico mostrando esempi che confrontano l’API di Flink con l’API di Renoir

Se conosci Apache Flink, troverai facile iniziare ad usare Renoir.

Proprio come in Flink, i calcoli in Renoir sono definiti come un grafo di operatori, dove i dati fluiscono da un operatore all’altro.

Iniziamo: Wordcount

Come primo compito implementeremo un’applicazione per il conteggio delle parole sia in Flink che in Renoir. L’obiettivo è leggere un file e contare le occorrenze di tutte le parole distinte contenute in esso.

// Imports...
public class WordCount {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        long start = System.nanoTime();

        DataSet < String > text = env.readTextFile("text-file.txt");

        DataSet < Tuple2 < String, Integer >> counts =
            text.flatMap((value, collector) - > {
                String[] token = value.split("\\s+");
                for (String token: tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2 < > (token.toLowerCase(), 1));
                    }
                }
            })
            // raggruppa per il campo della tupla "0" e somma il campo della tupla "1"
            .groupBy(0)
            .sum(1);
        counts.count();

        long stop = System.nanoTime();
        System.out.printf("total:%d", stop - start);

        counts.sort(Comparator.comparing((t) - > getCount(t)));
        for (Tuple2 < String, Integer > tuple: counts) {
            System.out.printf("%s %d\n", tuple.f0, tuple.f1);
        }
    }
}

Renoir

use renoir::prelude::*;

fn main() {
    let ctx = StreamContext::new_local();

    let result = ctx
        .stream_file("/etc/passwd")
        .flat_map(|line| {
            line.split_whitespace()
                .map(|t| t.to_lowercase())
                .collect::<Vec<String>>()
        })
        .group_by_count(|word: &String| word.clone())
        .collect_vec();

    let start = Instant::now();
    ctx.execute_blocking();
    let elapsed = start.elapsed();

    if let Some(mut res) = result.get() {
        res.sort_by_key(|t| t.1);
        println!("{res:#?}");
    }
    println!("{elapsed:?}");
}

WIP: Questa guida è ancora incompleta

Trasformazioni Sequenziali

Renoir offre una moltitudine di operatori per trasformare e manipolare stream di dati. In questa sezione, vedremo come utilizzare gli operatori di base per eseguire trasformazioni sequenziali su uno stream.

Una trasformazione sequenziale è un’operazione che viene applicata a ogni elemento dello stream una volta in sequenza. Questo ci permette di ottenere il massimo livello di parallelismo e di distribuire il carico equamente tra le risorse disponibili.

Map

L’operatore map è usato per applicare una funzione a ogni elemento dello stream. La funzione può essere qualsiasi closure che prende un elemento dello stream come input e ritorna un nuovo elemento.

#![allow(unused)]
fn main() {
// Moltiplica ogni elemento dello stream per 10
let res = s.map(|n| n * 10).collect_vec();
}

L’operatore map, dato che è applicato a ogni elemento indipendentemente, può usare tutto il potere del parallelismo.

RichMap

L’operatore rich_map è simile all’operatore map ma permette di mantenere uno stato tra gli elementi dello stream. La funzione passata all’operatore rich_map può essere stateful e mantenere uno stato per ogni replica.

#![allow(unused)]
fn main() {
// Enumera gli elementi dello stream
let res = s.rich_map({
    let mut id = 0;
    move |x| {
        id += 1;
 (id - 1, x)
 }
}).collect_vec();
}

Nota che lo stato è mantenuto per ogni replica dell’operatore, quindi in questo caso, se manteniamo il parallelismo ci saranno elementi multipli con lo stesso ID (uno per ogni replica).

Filter

L’operatore filter è usato per mantenere solo gli elementi dello stream che soddisfano una certa condizione. La funzione passata all’operatore filter deve ritornare un valore booleano.

#![allow(unused)]
fn main() {
// Mantieni solo gli elementi pari dello stream
let res = s.filter(|&n| n % 2 == 0).collect_vec();
}

L’operatore filter, dato che è applicato a ogni elemento indipendentemente, può usare tutto il potere del parallelismo.

Flatten

L’operatore flatten è usato per appiattire uno stream di collezioni di elementi. Prende uno stream di container e ritorna uno stream con tutti gli elementi contenuti.

#![allow(unused)]
fn main() {
let s = env.stream_iter((vec![
    vec![1, 2, 3],
    vec![],
    vec![4, 5],
].into_iter()));
let res = s.flatten().collect_vec();
env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![1, 2, 3, 4, 5]);
}

Concatenazione di Operatori Sequenziali

Per aiutare l’utente a scrivere codice pulito e leggibile, Renoir offre una serie di concatenazioni degli operatori precedenti in una singola chiamata. Questo permette di scrivere trasformazioni complesse in una singola riga di codice. Da concatenazioni semplici come flat_map dove il risultato di una map viene appiattito, a quelle più complesse come rich_filter_map dove l’utente può eseguire una map e filter stateful in una singola operazione.

Per una lista completa degli operatori vedi la documentazione API.

La maggior parte delle volte gli elementi in un flusso devono essere raggruppati o partizionati in qualche modo per ottenere il risultato desiderato. Renoir fornisce un insieme di operatori che fanno esattamente questo.

Raggruppamento

Group By

Probabilmente l’operazione più comune è quella di raggruppare insieme elementi per qualche chiave comune. La chiave può essere estratta dall’elemento stesso o calcolata da esso. L’operatore group_by prende una closure che restituisce la chiave per ogni elemento e restituisce un KeyedStream dove gli operatori vengono valutati sugli elementi con la stessa chiave.

#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
// partiziona elementi pari e dispari
let keyed = s.group_by(|&n| n % 2); 
}

Dopo il partizionamento tutti gli elementi verranno inviati alla rete per bilanciare il carico ma se il risultato desiderato è un’aggregazione in molti casi è consigliabile utilizzare una variante associativa dell’operatore group_by come group_by_reduce o group_by_sum, una lista completa delle possibili varianti associative può essere trovata nella documentazione dell’API Group By.

Key By

OPERATORE AVANZATO

Crea un nuovo ‘KeyedStream’ in modo simile all’operatore group_by ma senza rimescolare gli elementi sulla rete. Questo può far comportare male altri operatori. Probabilmente vuoi utilizzare group_by invece.

#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.key_by(|&n| n % 2).collect_vec();
}

Partizionamento

Route

A volte c’è bisogno di inviare elementi a rotte diverse basandosi su qualche condizione. L’operatore route permette di creare una serie di rotte e inviare gli elementi alla rotta corretta basandosi sulla prima condizione soddisfatta.

  • Le rotte vengono create con il metodo add_route, viene creato un nuovo flusso per ogni rotta.
  • Ogni elemento viene instradato al primo flusso per cui la condizione di instradamento si valuta come true.
  • Se nessuna condizione di rotta è soddisfatta, l’elemento viene scartato
#![allow(unused)]
fn main() {
let mut routes = s.route()
 .add_route(|&i| i < 5)
 .add_route(|&i| i % 2 == 0)
 .build()
 .into_iter();
assert_eq!(routes.len(), 2);
// 0 1 2 3 4
routes.next().unwrap().for_each(|i| eprintln!("route1: {i}"));
// 6 8
routes.next().unwrap().for_each(|i| eprintln!("route2: {i}"));
// 5 7 9 ignorati
env.execute_blocking();
}

Split

Crea più flussi da un singolo flusso dove ogni suddivisione avrà tutti gli elementi del flusso originale. L’operatore split è utile quando hai bisogno di applicare trasformazioni diverse allo stesso flusso.

#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let mut splits = s.split(3);
let a = splits.pop().unwrap();
let b = splits.pop().unwrap();
let c = splits.pop().unwrap();
}

Riduzioni e Fold

Per ottenere informazioni da uno stream di dati è spesso necessario aggregare i dati in qualche modo. Renoir fornisce un set di operatori che ti permettono di eseguire riduzioni e fold sui stream di dati per ottenere le informazioni desiderate.

Reduce

L’operatore reduce aggrega i dati di uno stream seguendo una funzione definita dall’utente ed emette un singolo valore. La funzione dovrebbe modificare l’accumulatore che alla fine sarà il valore emesso.

#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.reduce(|acc, value| acc + value).collect::<Vec<_>>();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}

Nota che il tipo dell’accumulatore è lo stesso del tipo degli elementi dello stream. Se è necessario un tipo diverso considera l’uso di fold.

Reduce Associativo

L’operatore reduce_assoc è una variante dell’operatore reduce che può essere usato quando la funzione di riduzione è associativa. Questo permette all’operatore di essere eseguito in parallelo e può essere più efficiente dell’operatore reduce.

L’operatore applica la funzione di riduzione in due passi:

  • Local: la funzione che sarà eseguita su ogni replica.
  • Global: la funzione che aggregherà tutti i risultati parziali ottenuti dalle funzioni locali.
#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.reduce_assoc(|acc, value| acc + value).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}

Nota che il tipo dell’accumulatore è lo stesso del tipo degli elementi dello stream. Se è necessario un tipo diverso considera l’uso di fold_assoc.

Fold

L’operatore fold aggrega i dati di uno stream seguendo una funzione definita dall’utente ed emette un singolo valore. La funzione dovrebbe modificare l’accumulatore che alla fine sarà il valore emesso. È simile all’operatore reduce ma permette di specificare un valore iniziale e quindi il tipo per l’accumulatore.

#![allow(unused)]
fn main() {
let s = env.stream_iter(0..5);
let res = s.fold(0, |acc, value| *acc += value).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}

Fold Associativo

L’operatore fold_assoc è una variante dell’operatore fold che può essere usato quando la funzione di riduzione è associativa. Simile al reduce_assoc, questo permette all’operatore di essere eseguito in parallelo e può essere più efficiente dell’operatore fold.

L’operatore richiede due funzioni definite dall’utente:

  • Local: la funzione che sarà eseguita su ogni replica.
  • Global: la funzione che aggregherà tutti i risultati parziali ottenuti dalle funzioni locali.
#![allow(unused)]
fn main() {
// Esempio
let s = env.stream_iter(0..5);
let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec();

env.execute_blocking();

assert_eq!(res.get().unwrap(), vec![0 + 1 + 2 + 3 + 4]);
}

Finestre

Quando si lavora con flussi di dati illimitati è spesso necessario considerare solo un sottoinsieme dei dati per eseguire alcuni calcoli. Renoir fornisce un insieme di operatori che ti permettono di definire finestre sui flussi di dati per eseguire il calcolo solo sui dati che fanno parte della finestra. In Renoir le finestre sono definite da un descrittore WinDescr che permette all’utente di specificare il tipo e la logica utilizzata per raggruppare i dati all’interno della finestra giusta.

Descrittori di Finestre

Ci sono diversi tipi di descrittori che possono essere utilizzati:

  • CountWindow: definisce una finestra basata sul numero di elementi nella finestra. Può essere definita come finestre tumbling o sliding.
  • EventTimeWindow: definisce una finestra basata sui timestamp degli eventi dei dati. Può essere definita come finestre tumbling o sliding.
  • ProcessingTimeWindow: definisce una finestra basata sull’orologio di sistema al momento del processamento. Può essere definita come finestre tumbling o sliding.
  • SessionWindow: definisce una finestra che si divide dopo se nessun elemento viene ricevuto per una durata fissa dell’orologio di sistema.
  • TransactionWindow: definisce una finestra basata su una logica definita dall’utente. Un’analisi completa di questo descrittore può essere trovata nella Documentazione dell’API TransactionWindow.

Finestre su un singolo flusso

Se il flusso NON è partizionato in alcun modo come usando gli operatori group_by o key_by, la finestra è definita sull’intero flusso. L’operatore che ti permette di definire una finestra sul flusso è window_all. L’operatore window_all prende un descrittore della finestra come argomento e restituisce un flusso con finestre che può essere utilizzato per eseguire calcoli sulla finestra.

let s = env.stream_iter(0..5usize);
let res = s
    .window_all(CountWindow::tumbling(2))
    // resto della pipeline

Nota che poiché la finestra è definita sull’intero flusso, questo operatore non può essere parallelizzato. Se possibile partiziona il flusso usando gli operatori group_by o key_by per permettere l’esecuzione parallela.

Finestre su un flusso partizionato

Se il flusso è partizionato in qualche modo come usando gli operatori group_by o key_by, la finestra è definita su ogni partizione. Possiamo definire le nostre finestre usando l’operatore window con il descrittore che vogliamo.

let s = env.stream_iter(0..9);
let res = s
    .group_by(|&n| n % 2)
    .window(CountWindow::sliding(3, 2))
    // resto della pipeline

se vogliamo creare una finestra dopo aver unito due KeyedStream sulla stessa chiave possiamo usare l’operatore window_join.

let s1 = env.stream_iter(0..9);
let s2 = env.stream_iter(0..9);

let res = s1
    .key_by(|&n| n % 2)
    .window_join(s2.key_by(|&n| n % 2), CountWindow::tumbling(2))
    // resto della pipeline

Operatori sulle finestre

Una volta definita la finestra possiamo eseguire diverse operazioni per ottenere le informazioni desiderate. Alcune delle operazioni possibili sono le standard max, min, sum o count ma sono disponibili anche operazioni più complesse come l’operatore fold.

Per una lista completa degli operatori disponibili controlla la Documentazione dell’API.

let s = env.stream_iter(0..5usize);
let res = s
    .window_all(CountWindow::tumbling(2))
    .fold(0, |acc, value| acc + value)
    .collect_vec();

Operatori Multi-Stream

Ci saranno momenti in cui abbiamo bisogno di unire più flussi insieme per ottenere le intuizioni di cui abbiamo bisogno. Renoir fornisce un insieme di operatori che ti permettono di unire più flussi insieme.

Join

L’operazione più comune è quella di unire due flussi insieme. L’operatore join ti permette di unire due flussi insieme basandoti su una chiave. La chiave viene valutata usando una closure (una per ogni flusso) e due elementi vengono uniti insieme se le chiavi sono uguali.

Questo operatore è simile alla query SQL SELECT a, b FROM a JOIN b ON keyer1(a) = keyer2(b).

#![allow(unused)]
fn main() {
let s1 = env.stream_iter(0..5u8);
let s2 = env.stream_iter(0..5i32);
let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
}

Renoir offre anche altre due varianti dell’operatore join:

  • left_join che mantiene tutti gli elementi del flusso di sinistra e gli elementi del flusso di destra che hanno una chiave corrispondente.
  • outer_join che mantiene tutti gli elementi di entrambi i flussi e gli elementi del flusso di destra che hanno una chiave corrispondente.
  • interval_join che permette di unire due flussi basandosi su un intervallo di tempo. Gli elementi del flusso di destra vengono uniti con l’elemento del flusso di sinistra se il loro timestamp è all’interno di un intervallo centrato sul timestamp dell’elemento di sinistra.

L’operatore join è disponibile anche per KeyedStream, in quel caso la chiave è quella utilizzata per partizionare il flusso.

Un utente più esperto potrebbe voler utilizzare l’operatore join_with che permette di personalizzare il comportamento del join. Puoi selezionare quale strategia di spedizione e quale strategia locale utilizzare. Una lista completa delle possibili condizioni di join può essere trovata nella documentazione dell’API Join_With.

Merge

L’operatore merge ti permette di unire due flussi insieme. L’operatore prende due flussi e restituisce un nuovo flusso che contiene tutti gli elementi dei due flussi. Gli elementi non sono ordinati in alcun modo.

#![allow(unused)]
fn main() {
let s1 = env.stream_iter(0..10);
let s2 = env.stream_iter(10..20);
let res = s1.merge(s2).collect_vec();
}

Zip

L’operatore zip ti permette di unire due flussi insieme. L’operatore prende due flussi e restituisce un nuovo flusso che contiene gli elementi dei due flussi zippati insieme. Gli elementi sono ordinati nello stesso modo in cui sono prodotti dai due flussi.

#![allow(unused)]
fn main() {
let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter()));
let s2 = env.stream_iter((vec![1, 2, 3].into_iter()));
let res = s1.zip(s2).collect_vec();
}

Tutti gli elementi dopo la fine del flusso più corto verranno scartati

Operatori Iterativi

Potrebbe esserci la possibilità che tu abbia bisogno di iterare su un flusso più volte per ottenere le intuizioni corrette o per eseguire un calcolo complesso. Renoir fornisce un insieme di operatori esattamente per questo scopo.

Iterate

Questo operatore permette di creare un flusso di lavoro iterativo dove i dati ciclano attraverso lo stesso insieme di operatori più volte. Questo operatore ha diverse caratteristiche:

  • Iterazioni Massime: Il numero massimo di iterazioni da eseguire. Se viene raggiunto il numero massimo di iterazioni, l’operatore interromperà l’iterazione e produrrà lo stato corrente dei dati.
  • Stato dell’Iterazione: Lo stato che tutte le repliche di questo operatore possono leggere. Lo stato può essere aggiornato alla fine di ogni iterazione dalla funzione global_fold.
  • Body: L’insieme di operatori che verrà eseguito ad ogni iterazione. L’output del body verrà utilizzato come input dell’iterazione successiva.
  • Local Fold: La funzione che verrà eseguita da ogni replica utilizzata per aggiornare lo Stato dell’Iterazione, i risultati verranno aggregati dalla funzione global_fold.
  • Global Fold: La funzione che verrà eseguita per aggregare i risultati della funzione local_fold e aggiornare lo Stato dell’Iterazione.
  • Condizione del Loop: La condizione che verrà valutata alla fine di ogni iterazione per decidere se l’iterazione deve continuare o meno.
let s = env.stream_iter(0..3).shuffle();
let (state, items) = s.iterate(
    3, // al massimo 3 iterazioni
    0, // lo stato iniziale è zero
    |s, state| s.map(|n| n + 10),
    |delta: &mut i32, n| *delta += n,
    |state, delta| *state += delta,
    |_state| true,
);
let state = state.collect_vec();
let items = items.collect_vec();

se vuoi riciclare lo stesso input iniziale e non il risultato dell’iterazione precedente, puoi usare l’operatore replay. Il replay è molto simile all’operatore iterate cioè richiede gli stessi parametri, ma riprodurrà il flusso di input ad ogni iterazione.

Deployment

Lavori in corso…

Esempi

Esempi

Conteggio Parole

use renoir::prelude::*;

fn main() {
    // Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
    let (config, args) = RuntimeConfig::from_args();
    config.spawn_remote_workers();
    let env = StreamContext::new(config);

    let result = env
        // Apri e leggi il file riga per riga in parallelo
        .stream_file(&args[0])
        // Dividi in parole
        .flat_map(|line| tokenize(&line))
        // Partiziona
        .group_by(|word| word.clone())
        // Conta le occorrenze
        .fold(0, |count, _word| *count += 1)
        // Raccogli il risultato
        .collect_vec();
        
    env.execute_blocking(); // Inizia l'esecuzione (bloccante)
    if let Some(result) = result.get() {
        // Stampa il conteggio delle parole
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    // Strategia di tokenizzazione semplice
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Esegui su 6 host locali `cargo run -- -l 6 input.txt`

Conteggio Parole associativo (più veloce)

use renoir::prelude::*;

fn main() {
    // Metodo di convenienza per analizzare la configurazione di deployment dagli argomenti CLI
    let (config, args) = RuntimeConfig::from_args();
    let env = StreamContext::new(config);

    let result = env
        .stream_file(&args[0])
        // Il batching adattivo (default) ha latenza prevedibile
        // Il batching di dimensione fissa spesso porta a tempi di esecuzione più brevi
        // Se i dati sono immediatamente disponibili e la latenza non è critica
        .batch_mode(BatchMode::fixed(1024))
        .flat_map(move |line| tokenize(&line))
        .map(|word| (word, 1))
        // Gli operatori associativi dividono l'operazione in un passo locale e uno
        // globale per un'esecuzione più veloce
        .group_by_reduce(|w| w.clone(), |(_w1, c1), (_w2, c2)| *c1 += c2)
        .unkey()
        .collect_vec();

    env.execute_blocking(); // Inizia l'esecuzione (bloccante)
    if let Some(result) = result.get() {
        // Stampa il conteggio delle parole
        result.into_iter().for_each(|(word, count)| println!("{word}: {count}"));
    }
}

fn tokenize(s: &str) -> Vec<String> {
    s.split_whitespace().map(str::to_lowercase).collect()
}

// Esegui su più host `cargo run -- -r config.toml input.txt`

Deployment remoto

# config.toml
[[host]]
address = "host1.lan"
base_port = 9500
num_cores = 16

[[host]]
address = "host2.lan"
base_port = 9500
num_cores = 24
ssh = { username = "renoir", key_file = "/home/renoir/.ssh/id_ed25519" }

Fai riferimento alla directory examples per un set esteso di esempi funzionanti

Se hai già configurato un ambiente di sviluppo con cui ti trovi a tuo agio ti consigliamo di rimanere con quello che trovi più produttivo. Altrimenti, se questa è la tua prima esperienza di programmazione con Rust o stai cercando idee per migliorare la tua produttività, questa pagina ti mostrerà come puoi configurare un ambiente di sviluppo efficace per Rust e Renoir.

Configurare un ambiente usando VS Code

Prima di tutto, raccomanderei di lavorare in un sistema UNIX se possibile1, se sei su Windows potresti provare ad usare Windows Subsystem for Linux (WSL) per un’esperienza migliore.

Come IDE, uso VS Code per lavorare con Rust (Nota: c’è anche un’estensione WSL se sei su windows e vuoi lavorare in WSL)

L’estensione rust-analyzer ti porterà quasi tutto quello di cui potresti aver bisogno per lavorare con Rust, c’è anche una guida ufficiale su come configurare vs code per Rust.

Oltre a questo raccomando anche l’estensione Error Lens che si integra bene con rust-analyzer per mostrare errori del compilatore e suggerimenti direttamente inline (Abilita il livello diagnostico hint nelle impostazioni CTRL+, per maggiori dettagli) Le estensioni Even Better TOML e Crates2 per lavorare con il Cargo.toml

Infine per gli strumenti CLI, ricorda di controllare cargo clippy --all --all-targets per buone pratiche di codifica (puoi usare la flag --fix per applicare automaticamente anche le correzioni) cargo fmt --all per formattare il codice e uso anche cargo-edit per il comando cargo upgrade, che insieme a cargo add può essere utilizzato per gestire il Cargo.toml dal terminale

Quando valuti le prestazioni esegui con la flag --release

Se senti che l’estensione rust-analyzer sta sovraccaricando con informazioni mostrando tutti i suggerimenti di tipo, puoi disabilitarne alcuni, nella mia configurazione ho

  • Imports>Granularity>Group: module
  • Inlay Hints>Chaining Hints>Enable: false
  • Inlay Hints>Type Hints>Enable: false

Altre estensioni consigliate


  1. Attualmente sto usando Fedora KDE e ho usato Kubuntu in passato

  2. Questa estensione è attualmente non mantenuta e stiamo cercando raccomandazioni alternative.