Хобрук: Ваш путь к мастерству в программировании

Spark Streaming: как добавить дополнительные разделы в свой DStream?

У меня есть искровое потоковое приложение, которое выглядит так:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )

И я запускаю его на кластере пряжи, используя

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

Когда я пытаюсь зарегистрировать kafkaDF.rdd.partitions.size, результат оказывается в основном «1» или «5». Я запутался, можно ли контролировать количество разделов моего DataFrame? KafkaUtils.createStream, похоже, не принимает никаких параметров, связанных с количеством разделов, которые я хочу для rdd. Пробовал kafkaDF.rdd.repartition( int ), тоже не работает.

Как я могу добиться большего параллелизма в своем коде? Если мой подход неверен, каков правильный способ его достижения?


  • Вы пробовали решение? Это сработало для вас? 07.02.2016
  • Я добавил больше потребителей и больше разделов в тему Kafka. Теперь производительность лучше. Если у вас есть другие предложения по улучшению производительности, дайте мне знать 08.02.2016

Ответы:


1

В Spark Streaming параллелизм может быть достигнут в двух областях: (а) потребители/получатели (в вашем случае потребители Kafka) и (б) обработка (выполняемая Spark).

По умолчанию искровая потоковая передача назначает одно ядро ​​(он же Thread) каждому потребителю. Поэтому, если вам нужно получить больше данных, вам нужно создать больше потребителей. Каждый потребитель создаст DStream. Затем вы можете объединить DStreams, чтобы получить один большой поток.

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B

val combineStream = messageStream1.union(messageStream2)

В качестве альтернативы , количество получателей/потребителей можно увеличить, переразбив входной поток:

inputStream.repartition(<number of partitions>))

Все оставшиеся ядра, доступные потоковому приложению, будут назначены Spark.

Таким образом, если у вас есть N ядра (определенные с помощью spark.cores.max) и у вас есть C потребителей, у вас остается N-C ядер, доступных для Spark.

#Partitions =~  #Consumers x (batch duration / block interval)

интервал блокировки = время ожидания потребителя перед отправкой данных, созданных им в виде искрового блока (определяется как конфигурация spark.streaming.blockInterval).

Всегда помните, что у Spark Streaming есть две функции, которые постоянно выполняются. Набор потоков, считывающих текущий микропакет (потребители), и набор потоков, обрабатывающих предыдущий микропакет (Spark).

Дополнительные советы по настройке производительности см. здесь , здесь и здесь.

05.02.2016
  • Я запускаю свое приложение в кластере YARN. Я не могу найти свойство spark.cores.max, но я устанавливаю --executor-cores conf с помощью команды spark-submit. Итак, означает ли это, что общее количество доступных ядер равно (количество исполнителей * число исполнителей)? 05.02.2016
  • Кроме того, можете ли вы привести пример или ссылку для создания большего количества потребителей и объединения их? Большое спасибо! 05.02.2016
  • Я не эксперт по конфигурациям YARN, так как в основном работаю с автономными версиями. spark.executor.cores должен определять ядра для исполнителей. Итак, #Executors x spark.executor.cores = Total Cores 05.02.2016
  • Новые материалы

    Создание кнопочного меню с использованием HTML, CSS и JavaScript
    Вы будете создавать кнопочное меню, которое имеет состояние наведения, а также позволяет вам выбирать кнопку при нажатии на нее. Финальный проект можно увидеть в этом Codepen . Шаг 1..

    Внедрите OAuth в свои веб-приложения для повышения безопасности
    OAuth — это широко распространенный стандарт авторизации, который позволяет приложениям получать доступ к ресурсам от имени пользователя, не раскрывая его пароль. Это позволяет пользователям..

    Классы в JavaScript
    class является образцом java Script Object. Конструкция «class» позволяет определять классы на основе прототипов с чистым, красивым синтаксисом. // define class Human class Human {..

    Как свинг-трейдеры могут использовать ИИ для больших выигрышей
    По мере того как все больше и больше профессиональных трейдеров и активных розничных трейдеров узнают о возможностях, которые предоставляет искусственный интеллект и машинное обучение для улучшения..

    Как построить любой стол
    Я разработчик программного обеспечения. Я люблю делать вещи и всегда любил. Для меня программирование всегда было способом создавать вещи, используя только компьютер и мое воображение...

    Обзор: Машинное обучение: классификация
    Только что закончил третий курс курса 4 часть специализации по машинному обучению . Как и второй курс, он был посвящен низкоуровневой работе алгоритмов машинного обучения. Что касается..

    Разработка расширений Qlik Sense с qExt
    Использование современных инструментов веб-разработки для разработки крутых расширений Вы когда-нибудь хотели кнопку для установки переменной в приложении Qlik Sense? Когда-нибудь просили..