DBInputFormat per trasferire dati da SQL a database NoSQL



Obiettivo di questo blog è imparare come trasferire dati da database SQL a HDFS, come trasferire dati da database SQL a database NoSQL.

In questo blog esploreremo le capacità e le possibilità di uno dei componenti più importanti della tecnologia Hadoop, ovvero MapReduce.

Oggi, le aziende stanno adottando il framework Hadoop come prima scelta per l'archiviazione dei dati grazie alle sue capacità di gestire efficacemente dati di grandi dimensioni. Ma sappiamo anche che i dati sono versatili ed esistono in varie strutture e formati. Per controllare una così grande varietà di dati e dei suoi diversi formati dovrebbe esserci un meccanismo per accogliere tutte le varietà e tuttavia produrre un risultato efficace e coerente.





Il componente più potente nel framework Hadoop è MapReduce che può fornire il controllo sui dati e sulla sua struttura meglio delle sue altre controparti. Sebbene richieda un overhead della curva di apprendimento e della complessità di programmazione, se riesci a gestire queste complessità puoi sicuramente gestire qualsiasi tipo di dati con Hadoop.

Il framework MapReduce suddivide tutte le sue attività di elaborazione fondamentalmente in due fasi: Mappa e Riduci.



La preparazione dei dati grezzi per queste fasi richiede la comprensione di alcune classi e interfacce di base. La super classe per questi ritrattamento è InputFormat.

Il InputFormat class è una delle classi principali nell'API MapReduce di Hadoop. Questa classe è responsabile della definizione di due cose principali:

  • I dati si dividono
  • Lettore di dischi

Divisione dei dati è un concetto fondamentale nel framework Hadoop MapReduce che definisce sia la dimensione delle singole attività della mappa che il suo potenziale server di esecuzione. Il Record Reader è responsabile della lettura effettiva dei record dal file di input e dell'invio (come coppie chiave / valore) al mappatore.



Il numero di mappatori viene deciso in base al numero di divisioni. È compito di InputFormat creare le suddivisioni. La maggior parte delle volte la dimensione della divisione del tempo è equivalente alla dimensione del blocco, ma non è sempre che le suddivisioni vengano create in base alla dimensione del blocco HDFS. Dipende totalmente da come è stato sovrascritto il metodo getSplits () del tuo InputFormat.

C'è una differenza fondamentale tra la divisione MR e il blocco HDFS. Un blocco è un blocco fisico di dati mentre una suddivisione è solo un blocco logico letto da un mappatore. Una divisione non contiene i dati di input, contiene solo un riferimento o un indirizzo dei dati. Una divisione ha fondamentalmente due cose: una lunghezza in byte e un insieme di posizioni di archiviazione, che sono solo stringhe.

Per capirlo meglio, prendiamo un esempio: elaborazione dei dati archiviati in MySQL utilizzando MR. Poiché in questo caso non esiste il concetto di blocchi, la teoria: 'le suddivisioni vengono sempre create in base al blocco HDFS',non riesce. Una possibilità è creare divisioni basate su intervalli di righe nella tabella MySQL (e questo è ciò che fa DBInputFormat, un formato di input per la lettura dei dati da un database relazionale). Possiamo avere k numero di divisioni costituite da n righe.

cos'è un deadlock in java

È solo per InputFormats basato su FileInputFormat (un InputFormat per la gestione dei dati archiviati nei file) che le suddivisioni vengono create in base alla dimensione totale, in byte, dei file di input. Tuttavia, la dimensione del blocco FileSystem dei file di input viene considerata come un limite superiore per le suddivisioni di input. Se hai un file più piccolo della dimensione del blocco HDFS, otterrai solo 1 mappatore per quel file. Se vuoi avere un comportamento diverso, puoi usare mapred.min.split.size. Ma ancora una volta dipende esclusivamente da getSplits () del tuo InputFormat.

Abbiamo così tanti formati di input preesistenti disponibili nel pacchetto org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

L'impostazione predefinita è TextInputFormat.

Allo stesso modo, abbiamo così tanti formati di output che leggono i dati dai riduttori e li memorizzano in HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

L'impostazione predefinita è TextOutputFormat.

Quando finirai di leggere questo blog, avresti imparato:

cos'è apache spark vs hadoop
  • Come scrivere un programma di riduzione della mappa
  • Informazioni sui diversi tipi di InputFormats disponibili in Mapreduce
  • Qual è la necessità di InputFormats
  • Come scrivere InputFormats personalizzati
  • Come trasferire dati da database SQL a HDFS
  • Come trasferire dati da database SQL (qui MySQL) a database NoSQL (qui Hbase)
  • Come trasferire i dati da un database SQL a un'altra tabella nei database SQL (Forse questo potrebbe non essere così importante se lo facciamo nello stesso database SQL. Tuttavia, non c'è niente di sbagliato nell'avere una conoscenza dello stesso. Non si sa mai come può entrare in uso)

Prerequisito:

  • Hadoop preinstallato
  • SQL preinstallato
  • Hbase preinstallato
  • Comprensione di base di Java
  • MapReduce la conoscenza
  • Conoscenza di base del framework Hadoop

Comprendiamo la dichiarazione del problema che risolveremo qui:

Abbiamo una tabella dei dipendenti nel database MySQL nel nostro database relazionale Edureka. Ora, secondo i requisiti aziendali, dobbiamo spostare tutti i dati disponibili nel DB relazionale nel file system Hadoop, ovvero HDFS, DB NoSQL noto come Hbase.

Abbiamo molte opzioni per eseguire questa operazione:

  • Sqoop
  • Flume
  • Riduci mappa

Ora non si desidera installare e configurare nessun altro strumento per questa operazione. Ti rimane solo un'opzione che è il framework di elaborazione di Hadoop MapReduce. Il framework MapReduce ti darebbe il pieno controllo sui dati durante il trasferimento. Puoi manipolare le colonne e metterle direttamente in una delle due posizioni di destinazione.

Nota:

  • Dobbiamo scaricare e inserire il connettore MySQL nel classpath di Hadoop per recuperare le tabelle dalla tabella MySQL. Per fare ciò, scarica il connettore com.mysql.jdbc_5.1.5.jar e tienilo nella directory Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Inoltre, metti tutti i jar Hbase sotto il classpath di Hadoop per fare in modo che il tuo programma MR acceda a Hbase. A tale scopo, eseguire il seguente comando :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Le versioni software che ho utilizzato nell'esecuzione di questa attività sono:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Eclipse Moon

Al fine di evitare il programma in qualsiasi problema di compatibilità, prescrivo ai miei lettori di eseguire il comando con un ambiente simile.

DBInput personalizzatoWritable:

pacchetto com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable classe pubblica DBInputWritable implementa Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) genera SQLException // L'oggetto Resultset rappresenta i dati restituiti da un'istruzione SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) genera IOException { } public void write (PreparedStatement ps) genera SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Custom DBOutputWritable:

pacchetto com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable classe pubblica DBOutputWritable implementa Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) genera IOException {} public void readFields (ResultSet rs) genera SQLException {} public void write (DataOutput out) genera IOException {} public void write (PreparedStatement ps) genera SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Tabella di input:

creare database edureka
crea tabella emp (empid int non null, nome varchar (30), dept varchar (20), chiave primaria (empid))
inserire nei valori emp (1, 'abhay', 'developmentement'), (2, 'brundesh', 'test')
seleziona * da emp

Caso 1: trasferimento da MySQL a HDFS

pacchetto com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) genera un'eccezione {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // classe driver' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // nome utente' root ') // password Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.putputPormat) FileOutset new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} / / table colonne) Path p = new Path (args [0]) FileSystem fs = FileSystem.get (new URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Questo pezzo di codice ci permette di preparare o configurare il formato di input per accedere al nostro database SQL sorgente. Il parametro include la classe del driver, l'URL ha l'indirizzo del database SQL, il suo nome utente e la password.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe driver 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // nome utente 'root') //parola d'ordine

Questa parte di codice ci consente di passare i dettagli delle tabelle nel database e di impostarli nell'oggetto job. I parametri includono ovviamente l'istanza di lavoro, la classe scrivibile personalizzata che deve implementare l'interfaccia DBWritable, il nome della tabella di origine, la condizione se qualsiasi altra null, qualsiasi altro parametro di ordinamento null, l'elenco delle colonne della tabella rispettivamente.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // table columns)

Mapper

pacchetto com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map estende Mapper {
mappa del vuoto protetto (chiave LongWritable, valore DBInputWritable, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (nuovo testo (nome + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterructedException e) {e.printStackTrace ()}}}

Reducer: Identity Reducer utilizzato

Comando da eseguire:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Output: tabella MySQL trasferita su HDFS

hadoop dfs -ls / dbtohdfs / *

Caso 2: trasferimento da una tabella in MySQL a un'altra in MySQL

creazione di una tabella di output in MySQL

crea tabella dipendente1 (nome varchar (20), id int, dept varchar (20))

pacchetto com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) genera un'eccezione {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nome utente' root ') // password Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid ',' name ',' dept '} // colonne della tabella) DBOutputFormat.setOutput (job,' employee1 ', // output table name new String [] {' name ',' id ',' dept '} // table colonne) System.exit (job.waitForCompletion (true)? 0: 1)}}

Questa parte di codice ci consente di configurare il nome della tabella di output nel DB SQL. I parametri sono rispettivamente istanza di lavoro, nome della tabella di output e nomi delle colonne di output.

DBOutputFormat.setOutput (job, 'employee1', // output table name new String [] {'name', 'id', 'dept'} // table columns)

Mappatore: come il caso 1

Riduttore:

pacchetto com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (riga [0] .toString (), Integer.parseInt (riga [1] .toString ()), riga [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterructedException e) {e.printStackTrace ()}}}

Comando da eseguire:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Output: dati trasferiti dalla tabella EMP in MySQL a un'altra tabella Employee1 in MySQL

Caso 3: trasferimento dalla tabella in MySQL alla tabella NoSQL (Hbase)

Creazione della tabella Hbase per accogliere l'output dalla tabella SQL:

crea 'dipendente', 'info_ufficiale'

Classe del conducente:

pacchetto Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe driver 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // nome utente 'root') // password Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('dipendente', Reduce.class, lavoro) job.setMapOutputValueClass. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] {'empid', 'name', 'dept'} // table columns) System.exit (job.waitForCompletion (true)? 0: 1)}}

Questa parte di codice ti consente di configurare la classe della chiave di output che nel caso di hbaseè ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Qui stiamo passando il nome della tabella hbase e il riduttore per agire sulla tabella.

cosa è istanza in Python
TableMapReduceUtil.initTableReducerJob ('dipendente', Reduce.class, lavoro)

Mappatore:

pacchetto Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map estende Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {prova {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterructedException e) {e.printStackTrace ()}}}

In questo pezzo di codice stiamo prendendo i valori dai getter della classe DBinputwritable e poi li passiamo
ImmutableBytesWritable in modo che raggiungano il riduttore in forma bytewriatble che Hbase comprende.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Riduttore:

pacchetto Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce estende TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterructedException {String [] cause = null // Loop values for (Text val: values) {cause = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('name'), Bytes.toBytes (causa [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (causa [1 ])) context.write (key, put)}}

Questo pezzo di codice ci consente di decidere la riga esatta e la colonna in cui memorizzare i valori dal riduttore. Qui stiamo memorizzando ogni empid in una riga separata poiché abbiamo creato empid come chiave di riga che sarebbe univoca. In ogni riga vengono archiviate le informazioni ufficiali dei dipendenti nella famiglia di colonne 'info_ufficiale' rispettivamente nelle colonne 'nome' e 'reparto'.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (causa [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (causa [1])) context.write (key, put)

Dati trasferiti in Hbase:

scansiona dipendente

Come si vede, siamo stati in grado di completare con successo l'attività di migrazione dei nostri dati aziendali da un DB SQL relazionale a un DB NoSQL.

Nel prossimo blog impareremo come scrivere ed eseguire codici per altri formati di input e output.

Continua a pubblicare commenti, domande o feedback. Mi farebbe piacere avere tue notizie.

Hai domande per noi? Per favore menzionalo nella sezione commenti e ti risponderemo.

Post correlati: