RDD с использованием Spark: строительный блок Apache Spark



Этот блог о RDD с использованием Spark предоставит вам подробные и всесторонние знания о RDD, который является фундаментальным элементом Spark, и насколько он полезен.

, Самого слова достаточно, чтобы зажечь искру в голове каждого инженера Hadoop. К n в памяти инструмент для обработки который является молниеносным в кластерных вычислениях. По сравнению с MapReduce совместное использование данных в памяти делает RDD 10–100 раз Быстрее чем совместное использование сети и диска, и все это возможно благодаря RDD (устойчивые распределенные наборы данных). Ключевые моменты, которые мы сосредоточим сегодня в этой статье RDD с использованием Spark:

Нужны RDD?

Зачем нам нужен RDD? -RDD с использованием Spark





Мир развивается вместе с и Data Science из-за продвижения в . Алгоритмы на основе Регресс , , и который работает на Распространено Итерационные вычисления действие мода, которая включает повторное использование и совместное использование данных между несколькими вычислительными устройствами.

Традиционный методы требовали стабильного промежуточного и распределенного хранилища, например HDFS состоящий из повторяющихся вычислений с репликацией и сериализацией данных, что значительно замедлило процесс. Найти решение всегда было нелегко.



Это где RDD (Устойчивые распределенные наборы данных) раскрывают общую картину.

RDD просты в использовании и легко создаются, поскольку данные импортируются из источников данных и помещаются в RDD. Далее применяются операции по их обработке. Они распределенный набор памяти с разрешениями как Только чтение и самое главное они Отказоустойчивой .



Если есть раздел данных из СДР потерял , его можно восстановить, применив тот же трансформация операция на том потерянном разделе в родословная , а не обрабатывать все данные с нуля. Такой подход в сценариях реального времени может творить чудеса в ситуациях потери данных или когда система не работает.

Что такое RDD?

RDD или ( Устойчивый распределенный набор данных ) является фундаментальным структура данных в Spark. Период, термин Эластичный определяет возможность автоматического создания данных или данных откат к исходное состояние когда происходит непредвиденная катастрофа с вероятностью потери данных.

Данные, записанные в RDD, разделенный и хранится в несколько исполняемых узлов . Если исполняющий узел терпит неудачу во время выполнения, тогда он мгновенно получает резервную копию от следующий исполняемый узел . Вот почему RDD рассматриваются как расширенный тип структур данных по сравнению с другими традиционными структурами данных. СДР могут хранить структурированные, неструктурированные и полуструктурированные данные.

Давайте продвинемся вперед с нашим RDD с использованием блога Spark и узнаем об уникальных функциях RDD, которые дают ему преимущество перед другими типами структур данных.

Особенности RDD

  • В памяти (ОЗУ) Расчеты : Концепция вычислений в памяти переводит обработку данных на более быстрый и эффективный этап, на котором общая спектакль системы обновлен.
  • L его оценка : Термин 'ленивая оценка' означает трансформации применяются к данным в СДР, но вывод не создается. Вместо этого применяемые преобразования зарегистрирован.
  • Упорство : Результирующие RDD всегда многоразовый.
  • Крупнозернистые операции : Пользователь может применять преобразования ко всем элементам в наборах данных с помощью карта, фильтр или группа по операции.
  • Отказоустойчивой : В случае потери данных система может откатиться к его исходное состояние используя зарегистрированный трансформации .
  • Неизменность : Данные, определенные, полученные или созданные, не могут быть изменено после входа в систему. В случае, если вам нужно получить доступ и изменить существующий RDD, вы должны создать новый RDD, применив набор Трансформация функции для текущего или предыдущего RDD.
  • Разбиение : Это решающий блок параллелизма в Spark RDD. По умолчанию количество созданных разделов зависит от вашего источника данных. Вы даже можете решить, сколько разделов вы хотите сделать, используя пользовательский раздел функции.

Создание RDD с использованием Spark

RDD могут быть созданы в три способа:

  1. Чтение данных из параллельные коллекции
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Применение трансформация по предыдущим СДР
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'очень', 'мощный', 'язык')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Чтение данных из внешнее хранилище или пути к файлам, например HDFS или HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Операции, выполняемые над RDD:

В основном есть два типа операций, которые выполняются над RDD, а именно:

  • Трансформации
  • Действия

Трансформации : В операции мы применяем RDD к фильтр, доступ и модифицировать данные в родительском RDD для создания последовательный RDD называется трансформация . Новый СДР возвращает указатель на предыдущий СДР, обеспечивая зависимость между ними.

Преобразования Ленивые оценки, другими словами, операции, применяемые к RDD, с которым вы работаете, будут регистрироваться, но не выполнен. Система выдает результат или исключение после запуска Действие .

Мы можем разделить преобразования на два типа, как показано ниже:

передать по значению vs передать по ссылке java
  • Узкие преобразования
  • Широкие трансформации

Узкие преобразования Применяем узкие преобразования к один раздел родительского RDD для создания нового RDD, поскольку данные, необходимые для обработки RDD, доступны в одном разделе родительский РАС . Примеры узких преобразований:

  • карта()
  • фильтр()
  • flatMap ()
  • раздел ()
  • mapPartitions ()

Широкие трансформации: Применяем широкую трансформацию на несколько разделов для создания нового RDD. Данные, необходимые для обработки RDD, доступны в нескольких разделах родительский аутизм . Примеры широких преобразований:

  • reduceBy ()
  • союз ()

Действия : Действия предписывают Apache Spark применить вычисление и передать результат или исключение обратно драйверу RDD. Некоторые из действий включают:

  • собирать ()
  • count ()
  • взять ()
  • первый()

Применим практически операции над RDD:

IPL (Индийская премьер-лига) это турнир по крикету, который идет на пике популярности. Итак, давайте сегодня приступим к набору данных IPL и выполним наш RDD с помощью Spark.

  • В первую очередь, давайте загрузим CSV-данные соответствия IPL. После загрузки он начинает выглядеть как файл EXCEL со строками и столбцами.

На следующем шаге мы зажигаем искру и загружаем файл match.csv из его местоположения, в моем случаеcsvрасположение файла «/User/edureka_566977/test/matches.csv»

Теперь давайте начнем с Трансформация часть первая:

  • карта():

Мы используем Преобразование карты чтобы применить определенную операцию преобразования к каждому элементу СДР. Здесь мы создаем RDD по имени CKfile, в котором храним нашиcsvфайл. Мы создадим еще один RDD под названием States to хранить детали города .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val состояния = CKfile.map (_. split (',') (2)) состояния.collect (). foreach (println)

  • фильтр():

Преобразование фильтра, само название описывает его использование. Мы используем эту операцию преобразования, чтобы отфильтровать выборочные данные из набора данных. Мы применяем работа фильтра здесь, чтобы получить рекорды матчей IPL года 2017 г. и сохраните его в fil RDD.

val fil = CKfile.filter (строка => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Мы применяем flatMap - операцию преобразования к каждому из элементов RDD для создания newRDD. Это похоже на преобразование карты. здесь мы применяемFlatmapк выплюнуть матчи города Хайдарабад и сохраните данные вfilRDDRDD.

val filRDD = fil.flatMap (строка => line.split ('Хайдарабад')). collect ()

  • раздел ():

Все данные, которые мы записываем в RDD, разбиты на определенное количество разделов. Мы используем это преобразование, чтобы найти количество разделов данные фактически разбиты на.

val fil = CKfile.filter (строка => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Мы рассматриваем MapPatitions как альтернативу Map () идля каждого() все вместе. Здесь мы используем mapPartitions, чтобы найти количество рядов у нас в fil RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

Мы используемReduceBy() на Пары ключ-значение . Мы использовали это преобразование на нашемcsvфайл, чтобы найти игрока с лучший игрок матчей .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (ложь) ManOTH.take (10) .foreach (println)

  • union ():

Название объясняет все, мы используем преобразование объединения, чтобы объединить два RDD вместе . Здесь мы создаем два RDD, а именно fil и fil2. fil RDD содержит записи о совпадениях IPL за 2017 год, а fil2 RDD содержит запись о совпадениях IPL за 2016 год.

val fil = CKfile.filter (строка => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Начнем с Действие часть, где мы показываем фактический результат:

  • collect ():

Сбор - это действие, которое мы используем для отображать содержимое в СДР.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • count ():

Считатьэто действие, которое мы используем для подсчета количество записей присутствует в СДР.Вотмы используем эту операцию для подсчета общего количества записей в нашем файле match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

java когда использовать это
  • take ():

Take - это операция Action, аналогичная collect, но с той лишь разницей, что она может печатать любые выборочное количество строк согласно запросу пользователя. Здесь мы применяем следующий код для печати десятка ведущих отчетов.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. возьмите (10) .foreach (println)

  • первый():

First () - это операция действия, аналогичная collect () и take ()Этоиспользуется для печати самого верхнего отчета. Здесь мы используем операцию first (), чтобы найти максимальное количество матчей, сыгранных в конкретном городе и на выходе мы получаем Мумбаи.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val состояния = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Чтобы сделать наш процесс изучения RDD с использованием Spark еще более интересным, я придумал интересный вариант использования.

RDD с использованием Spark: пример использования Pokemon

  • В первую очередь, Давайте загрузим файл Pokemon.csv и загрузим его в искровую оболочку, как мы это сделали с файлом Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Покемоны на самом деле доступны в большом количестве, давайте найдем несколько разновидностей.

  • Удаление схемы из файла Pokemon.csv

Нам может не понадобиться Схема файла Pokemon.csv. Следовательно, мы его удаляем.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Нахождение количества перегородки наш pokemon.csv распространяется в.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Водный покемон

Нахождение количество водных покемонов

val WaterRDD = PokemonDataRDD1.filter (строка => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Огненный покемон

Нахождение количество огненных покемонов

val FireRDD = PokemonDataRDD1.filter (строка => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Мы также можем обнаружить Население покемонов другого типа с помощью функции подсчета
WaterRDD.count () FireRDD.count ()

  • Поскольку мне нравится игра защитная стратегия давай найдем покемона с максимальная защита.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Мы знаем максимум значение силы защиты но мы не знаем, что это за покемон. Итак, давайте найдем, что это покемон.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Заказ [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Теперь давайте разберемся с покемонами с помощью наименьшая защита
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Теперь давайте посмотрим на покемона с менее оборонительная стратегия.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals2 (Head)) val .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Итак, на этом мы подошли к концу этого RDD, использующего статью Spark. Я надеюсь, что мы пролили немного света на ваши знания о RDD, их функциях и различных типах операций, которые можно выполнять с ними.

Эта статья основана на разработан, чтобы подготовить вас к сертификационному экзамену разработчика Cloudera Hadoop и Spark (CCA175). Вы получите глубокие знания об Apache Spark и экосистеме Spark, которая включает Spark RDD, Spark SQL, Spark MLlib и Spark Streaming. Вы получите исчерпывающие знания о языке программирования Scala, HDFS, Sqoop, Flume, Spark GraphX ​​и системе обмена сообщениями, такой как Kafka.