Хобрук: Ваш путь к мастерству в программировании

как записать данные, преобразованные с помощью искры, обратно в брокер kafka с помощью pyspark?

В моем приложении pyspark я намерен использовать потоковую передачу Spark как метод преобразования сообщений Kafka «на лету». Каждое такое сообщение изначально получено из определенной темы Kafka. Такое сообщение нужно будет подвергнуть некоторым преобразованиям (скажем, заменить одну строку на другую), а преобразованную версию нужно разместить в другой теме Kafka. Первая часть (получение сообщения Kafka) работает нормально:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    ...

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

Каков правильный синтаксис, чтобы поместить что-то (скажем, строку) в другую тему Кафки? Должен ли такой метод предоставляться KafkaUtils или он доступен каким-то другим способом?


Ответы:


1

Внутри функции-обработчика мы можем делать что угодно с каждой записью, а затем отправлять эту запись в другую тему kafka:

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

Чтобы запустить это:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test
21.05.2016
  • Имейте в виду, что соединение не должно устанавливаться на драйвере, а затем распределяться между исполнителями (что является ужасной идеей, потому что а) оно может не работать и б) если это произойдет, это добавит серьезные накладные расходы), а создано на каждом разделе. Пара релевантных ссылок майкл -noll.com/blog/2014/10/01/ Шаблоны проектирования раздела для использования foreachRDD в руководстве по потоковой передаче Spark spark.apache.org/docs/1.1.0/ 22.09.2016

  • 2

    Правильный способ сделать согласно документации SPARK https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

    def kafka_sender(messages):
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
    
        for message in messages:
            producer.send('alerts', bytes(message[0].encode('utf-8')))
            # For faster push
            # producer.flush()  
    
        producer.flush()
    
    
    
    # On your Dstream
    sentiment_data.foreachRDD(lambda rdd: rdd.foreachPartition(kafka_sender))
    
    08.05.2018
    Новые материалы

    Создание кнопочного меню с использованием HTML, CSS и JavaScript
    Вы будете создавать кнопочное меню, которое имеет состояние наведения, а также позволяет вам выбирать кнопку при нажатии на нее. Финальный проект можно увидеть в этом Codepen . Шаг 1..

    Внедрите OAuth в свои веб-приложения для повышения безопасности
    OAuth — это широко распространенный стандарт авторизации, который позволяет приложениям получать доступ к ресурсам от имени пользователя, не раскрывая его пароль. Это позволяет пользователям..

    Классы в JavaScript
    class является образцом java Script Object. Конструкция «class» позволяет определять классы на основе прототипов с чистым, красивым синтаксисом. // define class Human class Human {..

    Как свинг-трейдеры могут использовать ИИ для больших выигрышей
    По мере того как все больше и больше профессиональных трейдеров и активных розничных трейдеров узнают о возможностях, которые предоставляет искусственный интеллект и машинное обучение для улучшения..

    Как построить любой стол
    Я разработчик программного обеспечения. Я люблю делать вещи и всегда любил. Для меня программирование всегда было способом создавать вещи, используя только компьютер и мое воображение...

    Обзор: Машинное обучение: классификация
    Только что закончил третий курс курса 4 часть специализации по машинному обучению . Как и второй курс, он был посвящен низкоуровневой работе алгоритмов машинного обучения. Что касается..

    Разработка расширений Qlik Sense с qExt
    Использование современных инструментов веб-разработки для разработки крутых расширений Вы когда-нибудь хотели кнопку для установки переменной в приложении Qlik Sense? Когда-нибудь просили..