Trasformazione stateful cumulativa in Apache Spark Streaming



Questo post del blog illustra le trasformazioni stateful in Spark Streaming. Scopri tutto sul monitoraggio cumulativo e sull'up-skill per una carriera in Hadoop Spark.

Contributo di Prithviraj Bose

trasforma double in int java

Nel mio blog precedente ho discusso delle trasformazioni stateful utilizzando il concetto di finestre di Apache Spark Streaming. Puoi leggerlo Qui .





In questo post parlerò delle operazioni con stato cumulative in Apache Spark Streaming. Se sei nuovo in Spark Streaming, ti consiglio vivamente di leggere il mio blog precedente per capire come funziona il windowing.

Tipi di trasformazione stateful in Spark Streaming (continua ...)

> Monitoraggio cumulativo

Avevamo usato il reduceByKeyAndWindow (...) API per tenere traccia degli stati delle chiavi, tuttavia il windowing pone limitazioni per alcuni casi d'uso. E se volessimo accumulare gli stati delle chiavi dappertutto invece di limitarli a una finestra temporale? In tal caso dovremmo usare updateStateByKey (...) FUOCO.



Questa API è stata introdotta in Spark 1.3.0 ed è stata molto popolare. Tuttavia, questa API ha un certo sovraccarico delle prestazioni, le sue prestazioni si degradano con l'aumentare della dimensione degli stati nel tempo. Ho scritto un esempio per mostrare l'utilizzo di questa API. Puoi trovare il codice Qui .

Spark 1.6.0 ha introdotto una nuova API mapWithState (...) che risolve le spese generali di prestazione poste da updateStateByKey (...) . In questo blog discuterò di questa particolare API utilizzando un programma di esempio che ho scritto. Puoi trovare il codice Qui .

Prima di immergermi nella procedura dettagliata del codice, risparmiamo qualche parola sul checkpoint. Per qualsiasi trasformazione stateful, il checkpoint è obbligatorio. Il checkpoint è un meccanismo per ripristinare lo stato delle chiavi nel caso in cui il programma del driver fallisca. Al riavvio del driver, lo stato delle chiavi viene ripristinato dai file di checkpoint. Le posizioni dei checkpoint sono generalmente HDFS o Amazon S3 o qualsiasi archiviazione affidabile. Durante il test del codice, è anche possibile archiviarlo nel file system locale.



Nel programma di esempio, ascoltiamo il flusso di testo del socket su host = localhost e porta = 9999. Tokenizza il flusso in entrata in (parole, numero di occorrenze) e tiene traccia del conteggio delle parole utilizzando l'API 1.6.0 mapWithState (...) . Inoltre, le chiavi senza aggiornamenti vengono rimosse utilizzando StateSpec.timeout API. Stiamo effettuando il checkpoint in HDFS e la frequenza del checkpoint è ogni 20 secondi.

Per prima cosa creiamo una sessione di Spark Streaming,

Spark-streaming-session

Creiamo un file checkpointDir nell'HDFS e quindi chiamare il metodo dell'oggetto getOrCreate (...) . Il getOrCreate L'API controlla il file checkpointDir per vedere se ci sono stati precedenti da ripristinare, se esiste, quindi ricrea la sessione di Spark Streaming e aggiorna gli stati delle chiavi dai dati memorizzati nei file prima di passare a nuovi dati. Altrimenti crea una nuova sessione di Spark Streaming.

Il getOrCreate prende il nome della directory del checkpoint e una funzione (che abbiamo chiamato createFunc ) la cui firma dovrebbe essere () => StreamingContext .

istruzione goto in c ++

Esaminiamo il codice all'interno createFunc .

Riga # 2: creiamo un contesto di streaming con il nome del lavoro su 'TestMapWithStateJob' e intervallo batch = 5 secondi.

Riga 5: imposta la directory del checkpoint.

Riga # 8: imposta la specifica dello stato usando la classe org.apache.streaming.StateSpec oggetto. Per prima cosa impostiamo la funzione che seguirà lo stato, quindi impostiamo il numero di partizioni per i DStream risultanti che devono essere generati durante le trasformazioni successive. Infine impostiamo il timeout (a 30 secondi) in cui se un aggiornamento per una chiave non viene ricevuto entro 30 secondi, lo stato della chiave verrà rimosso.

Riga 12 #: imposta il flusso del socket, appiattisci i dati batch in entrata, crea una coppia chiave-valore, chiama mapWithState , imposta l'intervallo di checkpoint a 20 secondi e infine stampa i risultati.

Il framework Spark chiama th e createFunc per ogni chiave con il valore precedente e lo stato corrente. Calcoliamo la somma e aggiorniamo lo stato con la somma cumulativa e infine restituiamo la somma per la chiave.

cosa fa trim in java

Fonti Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Hai domande per noi? Per favore menzionalo nella sezione commenti e ti risponderemo.

Post correlati:

Inizia con Apache Spark e Scala

Trasformazioni stateful con windowing in Spark Streaming