RDD utilizzando Spark: The Building Block of Apache Spark



Questo blog su RDD che utilizza Spark ti fornirà una conoscenza dettagliata e completa di RDD, che è l'unità fondamentale di Spark e quanto è utile.

, La parola stessa è sufficiente per generare una scintilla nella mente di ogni ingegnere Hadoop. PER n in memoria strumento di elaborazione che è velocissimo nel cluster computing. Rispetto a MapReduce, la condivisione dei dati in memoria rende RDD 10-100x Più veloce rispetto alla condivisione di rete e disco e tutto questo è possibile grazie agli RDD (Resilient Distributed Data set). I punti chiave su cui ci concentriamo oggi in questo articolo RDD utilizzando Spark sono:

Hai bisogno di RDD?

Perché abbiamo bisogno di RDD? -RDD utilizzando Spark





array di oggetti nel programma di esempio Java

Il mondo si sta evolvendo con e Scienza dei dati a causa del progresso in . Algoritmi basato su Regressione , , e che funziona Distribuito Calcolo iterativo azione moda che include il riutilizzo e la condivisione dei dati tra più unità di calcolo.

Il tradizionale le tecniche richiedevano uno stoccaggio intermedio e distribuito stabile come HDFS comprendendo calcoli ripetitivi con repliche di dati e serializzazione dei dati, che ha reso il processo molto più lento. Trovare una soluzione non è mai stato facile.



Qui è dove RDD (Resilient Distributed Datasets) arriva al quadro generale.

RDD Sono facili da usare e facili da creare poiché i dati vengono importati da origini dati e rilasciati negli RDD. Inoltre, le operazioni vengono applicate per elaborarle. Loro sono un raccolta distribuita di memoria con autorizzazioni come Sola lettura e, cosa più importante, lo sono Tollerante agli errori .



Se ce ne sono partizione dati di l'RDD è perduto , può essere rigenerato applicando lo stesso trasformazione operazione su quella partizione persa in lignaggio , piuttosto che elaborare tutti i dati da zero. Questo tipo di approccio in scenari in tempo reale può fare miracoli in situazioni di perdita di dati o quando un sistema non funziona.

Cosa sono gli RDD?

RDD o ( Data set distribuito resiliente ) è fondamentale struttura dati in Spark. Il termine Resiliente definisce la capacità che genera i dati automaticamente o dati rotolando indietro al stato originale quando si verifica una calamità imprevista con una probabilità di perdita di dati.

I dati scritti negli RDD sono partizionato e memorizzato in più nodi eseguibili . Se un nodo in esecuzione non riesce durante il tempo di esecuzione, quindi ottiene immediatamente il backup dal file successivo nodo eseguibile . Questo è il motivo per cui gli RDD sono considerati un tipo avanzato di strutture dati rispetto ad altre strutture dati tradizionali. Gli RDD possono memorizzare dati strutturati, non strutturati e semi-strutturati.

Andiamo avanti con il nostro RDD utilizzando il blog Spark e scopriamo le caratteristiche uniche degli RDD che gli conferiscono un vantaggio rispetto ad altri tipi di strutture dati.

Caratteristiche di RDD

  • In memoria (RAM) Calcoli : Il concetto di calcolo in memoria porta l'elaborazione dei dati a una fase più rapida ed efficiente in cui il file prestazione del sistema è aggiornato.
  • L la sua valutazione : Il termine Valutazione pigra dice il trasformazioni vengono applicati ai dati in RDD, ma l'output non viene generato. Invece, le trasformazioni applicate sono registrato.
  • Persistenza : Gli RDD risultanti sono sempre riutilizzabile.
  • Operazioni a grana grossa : L'utente può applicare trasformazioni a tutti gli elementi nei set di dati tramite carta geografica, filtro o raggruppa per operazioni.
  • Tollerante agli errori : In caso di perdita di dati, il sistema può tornare indietro al suo stato originale utilizzando il file loggato trasformazioni .
  • Immutabilità : I dati definiti, recuperati o creati non possono essere cambiato una volta effettuato l'accesso al sistema. Nel caso in cui sia necessario accedere e modificare l'RDD esistente, è necessario creare un nuovo RDD applicando un set di Trasformazione funzioni sull'RDD corrente o precedente.
  • Partizionamento : È il unità cruciale di parallelismo in Spark RDD. Per impostazione predefinita, il numero di partizioni create è basato sull'origine dati. Puoi anche decidere il numero di partizioni che desideri utilizzare partizione personalizzata funzioni.

Creazione di RDD utilizzando Spark

Gli RDD possono essere creati in tre modi:

  1. Lettura dei dati da collezioni parallelizzate
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Applicazione trasformazione su RDD precedenti
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'potente', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Lettura dei dati da archiviazione esterna o percorsi di file come HDFS o HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operazioni eseguite su RDD:

Esistono principalmente due tipi di operazioni che vengono eseguite sugli RDD, ovvero:

  • Trasformazioni
  • Azioni

Trasformazioni : Il operazioni applichiamo su RDD a filtro, accesso e modificare i dati nella RDD genitore per generare un file successiva RDD è chiamato trasformazione . Il nuovo RDD restituisce un puntatore al precedente RDD assicurando la dipendenza tra di loro.

Le trasformazioni sono Valutazioni pigre, in altre parole, le operazioni applicate sull'RDD su cui stai lavorando verranno registrate ma non eseguito. Il sistema genera un risultato o un'eccezione dopo aver attivato il Azione .

Possiamo dividere le trasformazioni in due tipi come di seguito:

  • Trasformazioni strette
  • Ampie trasformazioni

Trasformazioni strette Applichiamo trasformazioni ristrette a un file singola partizione dell'RDD genitore per generare un nuovo RDD poiché i dati richiesti per elaborare l'RDD sono disponibili su una singola partizione del file genitore ASD . Gli esempi di trasformazioni strette sono:

  • carta geografica()
  • filtro()
  • flatMap ()
  • partizione()
  • mapPartitions ()

Ampie trasformazioni: Applichiamo l'ampia trasformazione partizioni multiple per generare un nuovo RDD. I dati necessari per elaborare l'RDD sono disponibili sulle più partizioni del genitore ASD . Gli esempi di trasformazioni ampie sono:

  • reduceBy ()
  • unione()

Azioni : Le azioni indicano ad Apache Spark di applicare calcolo e restituire il risultato o un'eccezione al driver RDD. Alcune delle azioni includono:

  • raccogliere()
  • contare()
  • prendere()
  • primo()

Applichiamo praticamente le operazioni sugli RDD:

qual è la differenza tra jquery e javascript

IPL (Indian Premier League) è un torneo di cricket con i suoi massimi livelli. Quindi, oggi mettiamo le mani sul set di dati IPL ed eseguiamo il nostro RDD utilizzando Spark.

  • In primo luogo, scarichiamo un dato di corrispondenza CSV dell'IPL. Dopo averlo scaricato, inizia a sembrare un file EXCEL con righe e colonne.

Nel passaggio successivo, attiviamo la scintilla e carichiamo il file match.csv dalla sua posizione, nel mio caso mycsvil percorso del file è '/User/edureka_566977/test/matches.csv'

Ora iniziamo con il Trasformazione prima parte:

  • carta geografica():

Noi usiamo Trasformazione della mappa applicare una specifica operazione di trasformazione su ogni elemento di un RDD. Qui creiamo un RDD per nome CKfile dove memorizzare il nostrocsvfile. Creeremo un altro RDD chiamato States to memorizzare i dettagli della città .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filtro():

Trasformazione del filtro, il nome stesso descrive il suo utilizzo. Usiamo questa operazione di trasformazione per filtrare i dati selettivi da una raccolta di dati forniti. Applichiamo funzionamento del filtro qui per ottenere i record delle partite IPL dell'anno 2017 e memorizzalo in fil RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Applichiamo flatMap è un'operazione di trasformazione a ciascuno degli elementi di un RDD per creare un nuovo RDD. È simile alla trasformazione della mappa. qui ci applichiamoFlatmapper sputare le partite della città di Hyderabad e memorizzare i dati infilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • partizione():

Ogni dato che scriviamo in un RDD viene suddiviso in un certo numero di partizioni. Usiamo questa trasformazione per trovare il file numero di partizioni i dati vengono effettivamente suddivisi in.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Consideriamo MapPatitions come un'alternativa a Map () eper ciascuno() insieme. Usiamo mapPartitions qui per trovare il file numero di righe abbiamo nel nostro fil RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

Noi usiamoReduceBy() su Coppie chiave-valore . Abbiamo usato questa trasformazione sul nostrocsvfile per trovare il lettore con l'estensione miglior uomo in campo .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • unione():

Il nome spiega tutto, usiamo la trasformazione sindacale per club due RDD insieme . Qui stiamo creando due RDD: fil e fil2. fil RDD contiene i record delle corrispondenze IPL 2017 e fil2 RDD contiene i record delle partite IPL 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Cominciamo con il Azione parte in cui mostriamo l'output effettivo:

  • raccogliere():

Raccogliere è l'azione che usiamo per visualizzare i contenuti nella RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • contare():

Contareè un'azione che usiamo per contare il numero di record presente nella RDD.Quistiamo usando questa operazione per contare il numero totale di record nel nostro file match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

mongodb crea utente per database
  • prendere():

Take è un'operazione di azione simile a collect ma l'unica differenza è che può stamparne qualsiasi numero selettivo di righe come da richiesta dell'utente. Qui applichiamo il seguente codice per stampare il file primi dieci rapporti principali.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. prendere (10) .foreach (println)

  • primo():

First () è un'operazione di azione simile a collect () e take ()essousato per stampare il report più in alto s l'output Qui usiamo la prima operazione () per trovare il file numero massimo di partite giocate in una determinata città e otteniamo Mumbai come output.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Per rendere il nostro processo di apprendimento RDD utilizzando Spark, ancora più interessante, ho escogitato un caso d'uso interessante.

RDD utilizzando Spark: Pokemon Use Case

  • In primo luogo, Scarichiamo un file Pokemon.csv e lo carichiamo nella spark-shell come abbiamo fatto per il file Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

I Pokemon sono attualmente disponibili in una grande varietà, cerchiamo di trovare alcune varietà.

  • Rimozione dello schema dal file Pokemon.csv

Potremmo non aver bisogno del file Schema del file Pokemon.csv. Quindi, lo rimuoviamo.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Trovare il numero di partizioni il nostro pokemon.csv è distribuito in.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokemon d'acqua

Trovare il file numero di pokemon acquatici

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Pokemon di fuoco

Trovare il file numero di pokemon di fuoco

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Possiamo anche rilevare il file popolazione di un diverso tipo di pokemon utilizzando la funzione count
WaterRDD.count () FireRDD.count ()

  • Dato che mi piace il gioco di strategia difensiva cerchiamo di trovare il pokemon con massima difesa.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Conosciamo il massimo valore della forza di difesa ma non sappiamo quale pokemon sia. quindi, cerchiamo di trovare qual è quello Pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Ordinando [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Ora cerchiamo di risolvere il pokemon con minima difesa
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Vediamo ora i Pokemon con estensione strategia meno difensiva.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head )HemonNeaderNome2Pokemon .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Quindi, con questo, arriviamo alla fine di questo RDD usando l'articolo di Spark. Spero di aver fatto luce sulla tua conoscenza degli RDD, delle loro caratteristiche e dei vari tipi di operazioni che possono essere eseguite su di essi.

Questo articolo basato su è progettato per prepararti all'esame di certificazione per sviluppatori Cloudera Hadoop e Spark (CCA175). Otterrai una conoscenza approfondita di Apache Spark e dell'ecosistema Spark, che include Spark RDD, Spark SQL, Spark MLlib e Spark Streaming. Otterrai una conoscenza completa del linguaggio di programmazione Scala, HDFS, Sqoop, Flume, Spark GraphX ​​e del sistema di messaggistica come Kafka.