Хобрук: Ваш путь к мастерству в программировании

Как обрабатывать события Debezium от Kafka с помощью Pyspark?

Я следую этой инструкции https://debezium.io/docs/tutorial-for-0-2/. Мой CDC хорошо работает для события mysql (создание, обновление, удаление).

Я пытаюсь получить это событие kafka из pyspark, используя python, мой код все еще не может получить это событие.

Под кодом:

os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar pyspark-shell'

if __name__ == '__main__':

     conf = SparkConf().setAppName("DBWatcher").setMaster("local[3]")

     sc = SparkContext(conf=conf)
     sc.setLogLevel("INFO")

     ssc = StreamingContext(sc, 20)

     #Already try use :2181
     kafkaStream = KafkaUtils.createStream(ssc, '10.90.29.24:9092', 'spark-streaming', {'mysql-server-1.inventory.customers': 1})
     print('contexts =================== {} {}')
     lines = kafkaStream.map(lambda x: x[1])
     lines.pprint()

     ssc.start()
     ssc.awaitTermination()

Из этого кода я получил следующую ошибку:

14.11.2018, 16:22:39 ОШИБКА TaskSetManager:70 - Задача 0 на этапе 0.0 не удалась 1 раз; aborting job 14.11.2018 16:22:39 ИНФОРМАЦИЯ TaskSchedulerImpl:54 — удален TaskSet 0.0, все задачи которого выполнены, из пула -14 16:22:39 INFO DAGScheduler:54 — ResultStage 0 (начало с NativeMethodAccessorImpl.java:0) завершился ошибкой за 0,438 с из-за того, что задание прервано из-за сбоя этапа: задача 0 на этапе 0.0 завершилась с ошибкой 1 раз, последний сбой: потерян задача 0.0 на этапе 0.0 (TID 0, локальный хост, драйвер исполнителя): java.lang.AbstractMethodError в org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99) в org.apache.spark.streaming. kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream. scala:68) в org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) в org.apache.spark.streaming.kafka.KafkaReceiv er.logInfo(KafkaInputDStream.scala:68) в org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90) в org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala: 149) в org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) на org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590) на org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) на org .apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) в org.apache.spark.scheduler.Task.run (Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) в java.util.concurren t.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) в java.lang.Thread.run(Thread.java:748)

Кто-нибудь подскажет, как получить потоковые данные? С уважением.

Я бы предложил

os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar pyspark-shell'

if __name__ == '__main__':

     conf = SparkConf().setAppName("DBWatcher").setMaster("local[3]")

     sc = SparkContext(conf=conf)
     sc.setLogLevel("INFO")

     ssc = StreamingContext(sc, 20)

     #Already try use :2181
     kafkaStream = KafkaUtils.createStream(ssc, '10.90.29.24:9092', 'spark-streaming', {'mysql-server-1.inventory.customers': 1})
     print('contexts =================== {} {}')
     lines = kafkaStream.map(lambda x: x[1])
     lines.pprint()

     ssc.start()
     ssc.awaitTermination()


  • В целом похоже, что вы используете неверную версию stackoverflow.com/questions /49180931/ 14.11.2018
  • @cricket_007 Я пытаюсь изменить версию ‹br/›os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-10_2.11-2.0.0. jar pyspark-shell' ‹br/›все еще получает ту же ошибку. Также попробуйте изменить ‹br/›os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,org.apache.spark:spark -streaming-kafka-0-10_2.11:2.3.2, org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.3.2 pyspark-shell' 14.11.2018
  • не используйте одновременно _1_ и _2_... Также не используйте _3_, если вы не используете Spark _4_. Вы должны указать, какие версии Spark у вас установлены. 15.11.2018
  • Привет @cricket_007, спасибо за быстрый ответ, на самом деле моя версия spark 2.3.2. Я пытаюсь использовать эти 3 способа: 1. Используйте: os.environ['PYSPARK_SUBMIT_ARGS'] = --packages org.apache.spark:spark- streaming-kafka-0-10_2.11:2.3.2 pyspark-shell 2. Использование: os.environ['PYSPARK_SUBMIT_ARGS'] = --packages org.apache.spark:spark-sql-kafka-0-10_2.11: 2.3.2 pyspark-shell 3. Используйте оба варианта: os.environ['PYSPARK_SUBMIT_ARGS'] = --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2,org.apache.spark :spark-sql-kafka-0-10_2.11:2.3.2 pyspark-shellК сожалению, по-прежнему возникает та же ошибка. 15.11.2018
  • Вы пробовали структурированную потоковую передачу вместо получения DStream? Согласно spark.apache.org/docs/2.3. 2/ А при использовании Spark2 вы обычно начинаете с объекта SparkSession и получаете от него контекст (незначительные подробности здесь) 15.11.2018
  • 2018-11-14 16:22:39 ОШИБКА Исполнитель: 91 - Исключение в задаче 0.0 на этапе 0.0 (TID 0) java.lang.AbstractMethodError at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala: 99) в org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark .streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) в org.apache.spark.streaming.kafka.KafkaReceiver.logInfo (KafkaInputDStream.scala:68) в org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90) в org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) в org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) в org.apache.spark.streaming.scheduler.ReceiverTracker$Rec eiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590) в org.apache.spark.SparkContext$ $anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run(Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) в java.util.concurrent.ThreadPoolExecutor .runWorker(ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) в java.lang.Thread.run(Thread.java:748) 14.11.2018 16: 22:39 WARN TaskSetManager:66 — Потерянная задача 0.0 на этапе 0.0 (TID 0, локальный хост, драйвер исполнителя): java.lang.AbstractMethodError at org.apache.spark.internal.Logging$class.initializeLogIfNec essary(Logging.scala:99) в org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) в org.apache.spark.streaming .kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68) в org.apache.spark.streaming.kafka. KafkaReceiver.onStart(KafkaInputDStream.scala:90) в org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) в org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala: 131) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply( ReceiverTracker.scala:590) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org. .apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) в org.apache.spark.scheduler.Task.run(Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run (Executor.scala:345) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) в java.util.c oncurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) на java.lang.Thread.run(Thread.java:748) 15.11.2018

Новые материалы

Создание кнопочного меню с использованием HTML, CSS и JavaScript
Вы будете создавать кнопочное меню, которое имеет состояние наведения, а также позволяет вам выбирать кнопку при нажатии на нее. Финальный проект можно увидеть в этом Codepen . Шаг 1..

Внедрите OAuth в свои веб-приложения для повышения безопасности
OAuth — это широко распространенный стандарт авторизации, который позволяет приложениям получать доступ к ресурсам от имени пользователя, не раскрывая его пароль. Это позволяет пользователям..

Классы в JavaScript
class является образцом java Script Object. Конструкция «class» позволяет определять классы на основе прототипов с чистым, красивым синтаксисом. // define class Human class Human {..

Как свинг-трейдеры могут использовать ИИ для больших выигрышей
По мере того как все больше и больше профессиональных трейдеров и активных розничных трейдеров узнают о возможностях, которые предоставляет искусственный интеллект и машинное обучение для улучшения..

Как построить любой стол
Я разработчик программного обеспечения. Я люблю делать вещи и всегда любил. Для меня программирование всегда было способом создавать вещи, используя только компьютер и мое воображение...

Обзор: Машинное обучение: классификация
Только что закончил третий курс курса 4 часть специализации по машинному обучению . Как и второй курс, он был посвящен низкоуровневой работе алгоритмов машинного обучения. Что касается..

Разработка расширений Qlik Sense с qExt
Использование современных инструментов веб-разработки для разработки крутых расширений Вы когда-нибудь хотели кнопку для установки переменной в приложении Qlik Sense? Когда-нибудь просили..