DBInputFormat для передачи данных из SQL в базу данных NoSQL



Цель этого блога - научиться переносить данные из баз данных SQL в HDFS, как передавать данные из баз данных SQL в базы данных NoSQL.

В этом блоге мы исследуем возможности и возможности одного из наиболее важных компонентов технологии Hadoop, то есть MapReduce.

Сегодня компании выбирают платформу Hadoop в качестве первого выбора для хранения данных из-за ее способности эффективно обрабатывать большие объемы данных. Но мы также знаем, что данные универсальны и существуют в различных структурах и форматах. Чтобы управлять таким огромным разнообразием данных и их различными форматами, должен существовать механизм, позволяющий учесть все разновидности и при этом обеспечить эффективный и последовательный результат.





Самый мощный компонент фреймворка Hadoop - MapReduce, который может обеспечивать контроль данных и их структуры лучше, чем другие аналоги. Хотя это требует дополнительных затрат на обучение и сложности программирования, если вы можете справиться с этими сложностями, вы, несомненно, сможете обрабатывать любые данные с помощью Hadoop.

Фреймворк MapReduce разбивает все задачи обработки на два основных этапа: отображение и сокращение.



Подготовка исходных данных к этим этапам требует понимания некоторых основных классов и интерфейсов. Суперкласс для такой обработки - InputFormat.

В InputFormat class - один из основных классов в Hadoop MapReduce API. Этот класс отвечает за определение двух основных вещей:

  • Разделение данных
  • Рекордер

Разделение данных - это фундаментальная концепция в структуре Hadoop MapReduce, которая определяет как размер отдельных задач карты, так и ее потенциальный сервер выполнения. В Читатель Записи отвечает за фактическое чтение записей из входного файла и отправку их (в виде пар ключ / значение) преобразователю.



Количество картографов определяется на основе количества разделений. Задача InputFormat - создавать разбиения. В большинстве случаев размер разделения по времени эквивалентен размеру блока, но не всегда разделение создается на основе размера блока HDFS. Это полностью зависит от того, как был переопределен метод getSplits () вашего InputFormat.

Между разделением MR и блоком HDFS есть принципиальная разница. Блок - это физический фрагмент данных, а разделение - это просто логический фрагмент, который читает преобразователь. Разделение не содержит входных данных, оно просто содержит ссылку или адрес данных. Разделение в основном имеет две вещи: длину в байтах и ​​набор мест хранения, которые представляют собой просто строки.

Чтобы лучше понять это, возьмем один пример: обработка данных, хранящихся в MySQL, с помощью MR. Поскольку в данном случае нет понятия блоков, теория: «разбиения всегда создаются на основе блока HDFS»,терпит неудачу. Одна из возможностей - создать разбиения на основе диапазонов строк в вашей таблице MySQL (и это то, что делает DBInputFormat, формат ввода для чтения данных из реляционных баз данных). У нас может быть k разбиений, состоящих из n строк.

Только для InputFormats, основанного на FileInputFormat (InputFormat для обработки данных, хранящихся в файлах), разделения создаются на основе общего размера входных файлов в байтах. Однако размер блока файловой системы входных файлов рассматривается как верхняя граница для входных разделений. Если ваш файл меньше размера блока HDFS, вы получите только 1 сопоставитель для этого файла. Если вы хотите иметь другое поведение, вы можете использовать mapred.min.split.size. Но это опять же зависит исключительно от getSplits () вашего InputFormat.

У нас есть так много уже существующих форматов ввода, доступных в пакете org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

разница между перегрузкой метода и переопределением

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

По умолчанию используется TextInputFormat.

Точно так же у нас есть так много выходных форматов, которые считывают данные из редукторов и сохраняют их в HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

c ++ идет сортировка

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

По умолчанию TextOutputFormat.

К тому времени, когда вы дочитаете этот блог, вы уже узнали бы:

  • Как написать программу уменьшения карты
  • О различных типах входных форматов, доступных в Mapreduce
  • Зачем нужны InputFormats
  • Как писать собственные форматы ввода
  • Как перенести данные из баз данных SQL в HDFS
  • Как перенести данные из баз данных SQL (здесь MySQL) в базы данных NoSQL (здесь Hbase)
  • Как перенести данные из одной базы данных SQL в другую таблицу в базах данных SQL (Возможно, это не так важно, если мы делаем это в той же базе данных SQL. Однако нет ничего плохого в том, чтобы знать то же самое. Вы никогда не знаете как это можно использовать)

Предпосылка:

  • Предустановлен Hadoop
  • Предварительно установленный SQL
  • Hbase предустановлен
  • Базовое понимание Java
  • MapReduce знаний
  • Базовые знания фреймворка Hadoop

Давайте разберемся с постановкой задачи, которую мы собираемся решить здесь:

У нас есть таблица сотрудников в базе данных MySQL в нашей реляционной базе данных Edureka. Теперь согласно бизнес-требованиям мы должны перенести все данные, доступные в реляционной БД, в файловую систему Hadoop, то есть HDFS, базу данных NoSQL, известную как Hbase.

У нас есть много вариантов для выполнения этой задачи:

  • Sqoop
  • Лоток
  • Уменьшение карты

Теперь вы не хотите устанавливать и настраивать какой-либо другой инструмент для этой операции. Остается только один вариант - среда обработки Hadoop MapReduce. Платформа MapReduce предоставит вам полный контроль над данными во время передачи. Вы можете манипулировать столбцами и размещать их прямо в любом из двух целевых мест.

Заметка:

  • Нам нужно загрузить и поместить коннектор MySQL в путь к классам Hadoop для выборки таблиц из таблицы MySQL. Для этого загрузите коннектор com.mysql.jdbc_5.1.5.jar и храните его в каталоге Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Кроме того, поместите все jar-файлы Hbase в путь к классам Hadoop, чтобы ваша программа MR имела доступ к Hbase. Для этого выполните следующую команду :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Версии программного обеспечения, которые я использовал при выполнении этой задачи:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Затмение Луны

Чтобы избежать проблем с совместимостью программы, я рекомендую своим читателям запускать команду в аналогичной среде.

Пользовательский DBInputWritable:

пакет com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable открытый класс DBInputWritable реализует Writable, DBWritable {private int id, частное строковое имя, dept public void readFields (DataInput in) выбрасывает IOException {} public void readFields (ResultSet rs) throws SQLException // Объект Resultset представляет данные, возвращенные оператором SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) выбрасывает IOException { } public void write (PreparedStatement ps) генерирует SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Пользовательский DBOutputWritable:

пакет com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable открытый класс DBOutputWritable реализует Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) выбрасывает IOException {} public void readFields (ResultSet rs) выбрасывает SQLException {} public void write (DataOutput out) выбрасывает IOException {} public void write (PreparedStatement ps) выдает SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

Таблица ввода:

создать базу данных edureka
создать таблицу emp (empid int not null, name varchar (30), dept varchar (20), первичный ключ (empid))
вставить в значения emp (1, 'abhay', 'developement'), (2, 'brundesh', 'test')
выберите * из emp

Случай 1: переход из MySQL в HDFS

пакет com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable открытый класс MainDbtohdfs {public static void main (String [] args) выдает исключение {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // класс драйвера' jdbc: mysql: // localhost: 3306 / edureka ', // URL-адрес базы данных' root ', // имя пользователя' root ') // пароль Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormatput.class) FileOutputFormatClass (DBInputFormatput. new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // имя входной таблицы null, null, new String [] {'empid', 'name', 'dept'} / / table columns) Путь p = новый Путь (args [0]) FileSystem fs = FileSystem.get (новый URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Этот фрагмент кода позволяет нам подготовить или настроить формат ввода для доступа к нашей исходной базе данных SQL. Параметр включает класс драйвера, URL-адрес содержит адрес базы данных SQL, ее имя пользователя и пароль.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // класс драйвера 'jdbc: mysql: // localhost: 3306 / edureka', // URL-адрес базы данных 'root', // имя пользователя 'root') //пароль

Этот фрагмент кода позволяет нам передавать сведения о таблицах в базе данных и устанавливать их в объекте задания. Параметры включают, конечно, экземпляр задания, настраиваемый записываемый класс, который должен реализовывать интерфейс DBWritable, имя исходной таблицы, условие, если любое другое значение null, любые параметры сортировки еще null, список столбцов таблицы соответственно.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // имя входной таблицы null, null, new String [] {'empid', 'name', 'dept'} // столбцы таблицы)

Картограф

пакет com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable открытый класс Map расширяет Mapper {
защищенная карта void (ключ LongWritable, значение DBInputWritable, контекст ctx) {попробуйте {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (новый текст (имя + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Редуктор: Используемый редуктор идентичности

Команда для запуска:

Hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Вывод: таблица MySQL перенесена в HDFS

hadoop dfs -ls / dbtohdfs / *

Случай 2: переход от одной таблицы в MySQL к другой в MySQL

создание выходной таблицы в MySQL

создать таблицу employee1 (имя varchar (20), id int, dept varchar (20))

package com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable общедоступный класс Mainonetable_to_other_table {public static void main (String [] args) выдает исключение {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // класс драйвера 'jdbc: mysql: // localhost : 3306 / edureka ', // url базы данных' root ', // имя пользователя' root ') // пароль Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // имя входной таблицы null, null, new String [] {'empid ',' name ',' dept '} // столбцы таблицы) DBOutputFormat.setOutput (job,' employee1 ', // имя выходной таблицы new String [] {' name ',' id ',' dept '} // таблица столбцы) System.exit (job.waitForCompletion (true)? 0: 1)}}

Этот фрагмент кода позволяет нам настроить имя выходной таблицы в базе данных SQL. Параметрами являются экземпляр задания, имя выходной таблицы и имена выходных столбцов соответственно.

DBOutputFormat.setOutput (job, 'employee1', // имя выходной таблицы new String [] {'name', 'id', 'dept'} // столбцы таблицы)

Картограф: как в случае 1

Редуктор:

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable открытый класс Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (строка [0] .toString (), Integer.parseInt (строка [1] .toString ()), строка [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Команда для запуска:

хадуп jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Вывод: перенесенные данные из таблицы EMP в MySQL в другую таблицу Employee1 в MySQL

Случай 3: переход из таблицы в MySQL в таблицу NoSQL (Hbase)

Создание таблицы Hbase для размещения вывода из таблицы SQL:

создать 'сотрудник', 'официальная информация'

Класс водителя:

пакет Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org. .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws = Exception {Configuration conf HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // класс драйвера 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // имя пользователя 'root') // пароль Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('служащий', Reduce.class, jobClassput. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // имя входной таблицы null, null, new String [] {'empid', 'name', 'dept'} // столбцы таблицы) System.exit (job.waitForCompletion (правда)? 0: 1)}}

Этот фрагмент кода позволяет настроить класс выходных ключей, который в случае hbase имеет значение ImmutableBytesWritable.

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Здесь мы передаем имя таблицы hbase и редуктор для работы с таблицей.

TableMapReduceUtil.initTableReducerJob ('сотрудник', Reduce.class, job)

Картограф:

пакет Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extends Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {попробуйте {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), новый текст (строка + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

В этом фрагменте кода мы берем значения из получателей класса DBinputwritable и затем передаем их в
ImmutableBytesWritable, так что они достигают редуктора в форме с байтовой записью, которую понимает Hbase.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), новый текст (строка + '' + dept ))

Редуктор:

пакет Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.ha .hbase.util.Bytes import org.apache.hadoop.io.Text открытый класс Reduce extends TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) выбрасывает IOException, InterruptedException {String [] cause = null // Значения цикла for (Text val: values) {cause = val.toString (). split ('')} // Поместить в HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('имя'), Bytes.toBytes (причина [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('отдел'), Bytes.toBytes (причина [1 ])) context.write (ключ, положить)}}

Этот фрагмент кода позволяет нам определить точную строку и столбец, в которых мы будем хранить значения из редуктора. Здесь мы храним каждый empid в отдельной строке, поскольку мы сделали empid как ключ строки, который будет уникальным. В каждой строке мы храним официальную информацию о сотрудниках в семействе столбцов «official_info», в столбцах «имя» и «отдел» соответственно.

Положить put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (cause [0])) put.add (Bytes. toBytes ('официальная_инфо'), Bytes.toBytes ('отдел'), Bytes.toBytes (причина [1])) context.write (ключ, положить)

Перенесенные данные в Hbase:

сканировать сотрудника

Как мы видим, нам удалось успешно выполнить задачу переноса наших бизнес-данных из реляционной базы данных SQL в базу данных NoSQL.

В следующем блоге мы узнаем, как писать и выполнять коды для других форматов ввода и вывода.

Продолжайте публиковать свои комментарии, вопросы или отзывы. Я хотел бы получить известие от вас.

Есть вопрос к нам? Пожалуйста, укажите это в комментариях, и мы свяжемся с вами.

Похожие сообщения: