В моем приложении pyspark я намерен использовать потоковую передачу Spark как метод преобразования сообщений Kafka «на лету». Каждое такое сообщение изначально получено из определенной темы Kafka. Такое сообщение нужно будет подвергнуть некоторым преобразованиям (скажем, заменить одну строку на другую), а преобразованную версию нужно разместить в другой теме Kafka. Первая часть (получение сообщения Kafka) работает нормально:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
...
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
Каков правильный синтаксис, чтобы поместить что-то (скажем, строку) в другую тему Кафки? Должен ли такой метод предоставляться KafkaUtils или он доступен каким-то другим способом?