Я следую этой инструкции https://debezium.io/docs/tutorial-for-0-2/. Мой CDC хорошо работает для события mysql (создание, обновление, удаление).
Я пытаюсь получить это событие kafka из pyspark, используя python, мой код все еще не может получить это событие.
Под кодом:
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar pyspark-shell'
if __name__ == '__main__':
conf = SparkConf().setAppName("DBWatcher").setMaster("local[3]")
sc = SparkContext(conf=conf)
sc.setLogLevel("INFO")
ssc = StreamingContext(sc, 20)
#Already try use :2181
kafkaStream = KafkaUtils.createStream(ssc, '10.90.29.24:9092', 'spark-streaming', {'mysql-server-1.inventory.customers': 1})
print('contexts =================== {} {}')
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
Из этого кода я получил следующую ошибку:
14.11.2018, 16:22:39 ОШИБКА TaskSetManager:70 - Задача 0 на этапе 0.0 не удалась 1 раз; aborting job 14.11.2018 16:22:39 ИНФОРМАЦИЯ TaskSchedulerImpl:54 — удален TaskSet 0.0, все задачи которого выполнены, из пула -14 16:22:39 INFO DAGScheduler:54 — ResultStage 0 (начало с NativeMethodAccessorImpl.java:0) завершился ошибкой за 0,438 с из-за того, что задание прервано из-за сбоя этапа: задача 0 на этапе 0.0 завершилась с ошибкой 1 раз, последний сбой: потерян задача 0.0 на этапе 0.0 (TID 0, локальный хост, драйвер исполнителя): java.lang.AbstractMethodError в org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99) в org.apache.spark.streaming. kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream. scala:68) в org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) в org.apache.spark.streaming.kafka.KafkaReceiv er.logInfo(KafkaInputDStream.scala:68) в org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90) в org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala: 149) в org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) на org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590) на org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) на org .apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) в org.apache.spark.scheduler.Task.run (Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) в java.util.concurren t.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) в java.lang.Thread.run(Thread.java:748)
Кто-нибудь подскажет, как получить потоковые данные? С уважением.
Я бы предложил
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar pyspark-shell'
if __name__ == '__main__':
conf = SparkConf().setAppName("DBWatcher").setMaster("local[3]")
sc = SparkContext(conf=conf)
sc.setLogLevel("INFO")
ssc = StreamingContext(sc, 20)
#Already try use :2181
kafkaStream = KafkaUtils.createStream(ssc, '10.90.29.24:9092', 'spark-streaming', {'mysql-server-1.inventory.customers': 1})
print('contexts =================== {} {}')
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()