Кумулятивное преобразование с отслеживанием состояния в потоковой передаче Apache Spark



В этом сообщении блога обсуждаются преобразования с отслеживанием состояния в Spark Streaming. Узнайте все о совокупном отслеживании и повышении квалификации для карьеры в Hadoop Spark.

Предоставлено Притхвираджем Босом

В моем предыдущем блоге я обсуждал преобразования с отслеживанием состояния с использованием оконной концепции Apache Spark Streaming. Вы можете прочитать это Вот .





какой шеф-повар в DevOps

В этом посте я собираюсь обсудить совокупные операции с отслеживанием состояния в Apache Spark Streaming. Если вы новичок в Spark Streaming, я настоятельно рекомендую вам прочитать мой предыдущий блог, чтобы понять, как работает оконное управление.

Типы преобразования с отслеживанием состояния в потоковой передаче Spark (продолжение…)

> Накопительное отслеживание

Мы использовали reduceByKeyAndWindow (…) API для отслеживания состояний ключей, однако оконное управление накладывает ограничения на определенные варианты использования. Что, если мы хотим накапливать состояния клавиш повсюду, а не ограничивать их временным окном? В этом случае нам нужно будет использовать updateStateByKey (…) ОГОНЬ.



Этот API был представлен в Spark 1.3.0 и был очень популярен. Однако этот API имеет некоторые накладные расходы на производительность, его производительность снижается по мере увеличения размера состояний со временем. Я написал образец, чтобы показать использование этого API. Вы можете найти код Вот .

Spark 1.6.0 представил новый API mapWithState (…) который решает накладные расходы на производительность, связанные с updateStateByKey (…) . В этом блоге я собираюсь обсудить этот конкретный API, используя пример программы, который я написал. Вы можете найти код Вот .

Прежде чем я углублюсь в анализ кода, давайте поговорим о контрольных точках. Для любого преобразования с сохранением состояния установка контрольных точек является обязательной. Контрольные точки - это механизм восстановления состояния ключей в случае сбоя программы драйвера. При перезапуске драйвера состояние ключей восстанавливается из файлов контрольных точек. Местами контрольных точек обычно являются HDFS, Amazon S3 или любое надежное хранилище. При тестировании кода можно также сохранить в локальной файловой системе.



В примере программы мы слушаем текстовый поток сокета на host = localhost и port = 9999. Он токенизирует входящий поток в (слова, количество вхождений) и отслеживает количество слов с помощью API 1.6.0. mapWithState (…) . Дополнительно ключи без обновлений удаляются с помощью StateSpec.timeout API. Мы выполняем контрольные точки в HDFS, и частота контрольных точек - каждые 20 секунд.

Давайте сначала создадим сеанс Spark Streaming,

Spark-streaming-session

Мы создаем checkpointDir в HDFS, а затем вызвать метод объекта getOrCreate (…) . В getOrCreate API проверяет checkpointDir чтобы увидеть, есть ли какие-либо предыдущие состояния для восстановления, если они существуют, затем он воссоздает сеанс Spark Streaming и обновляет состояния ключей на основе данных, хранящихся в файлах, прежде чем перейти к новым данным. В противном случае создается новый сеанс потоковой передачи Spark.

В getOrCreate принимает имя каталога контрольной точки и функцию (которую мы назвали createFunc ) чья подпись должна быть () => StreamingContext .

Давайте изучим код внутри createFunc .

Строка №2: Мы создаем контекст потоковой передачи с именем задания «TestMapWithStateJob» и интервалом пакета = 5 секунд.

Строка № 5: Установите каталог контрольной точки.

Строка № 8: Задайте спецификацию состояния с помощью класса org.apache.streaming.StateSpec объект. Сначала мы устанавливаем функцию, которая будет отслеживать состояние, затем мы устанавливаем количество разделов для результирующих потоков DStream, которые должны быть сгенерированы во время последующих преобразований. Наконец, мы устанавливаем тайм-аут (до 30 секунд), при котором, если какое-либо обновление для ключа не будет получено в течение 30 секунд, состояние ключа будет удалено.

Строка 12 #: настроить поток сокета, сгладить входящие пакетные данные, создать пару ключ-значение, вызвать mapWithState , установите интервал контрольной точки на 20 секунд и, наконец, распечатайте результаты.

Платформа Spark называет th e createFunc для каждого ключа с предыдущим значением и текущим состоянием. Мы вычисляем сумму и обновляем состояние, добавляя кумулятивную сумму, и, наконец, возвращаем сумму для ключа.

как использовать charat в Java

Исходники Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Есть вопрос к нам? Пожалуйста, отметьте это в разделе комментариев, и мы свяжемся с вами.

Похожие сообщения:

Начать работу с Apache Spark и Scala

Преобразования с отслеживанием состояния с использованием окон в потоковой передаче Spark