У меня есть искровое потоковое приложение, которое выглядит так:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
И я запускаю его на кластере пряжи, используя
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
Когда я пытаюсь зарегистрировать kafkaDF.rdd.partitions.size
, результат оказывается в основном «1» или «5». Я запутался, можно ли контролировать количество разделов моего DataFrame? KafkaUtils.createStream
, похоже, не принимает никаких параметров, связанных с количеством разделов, которые я хочу для rdd. Пробовал kafkaDF.rdd.repartition( int )
, тоже не работает.
Как я могу добиться большего параллелизма в своем коде? Если мой подход неверен, каков правильный способ его достижения?
spark.executor.cores
должен определять ядра для исполнителей. Итак,#Executors x spark.executor.cores = Total Cores
05.02.2016