Я использую специальный коннектор Kafka (написанный на Java с использованием Java API Kafka Connect) для извлечения данных из внешнего источника и сохранения в теме. Мне нужно установить собственную стратегию разделения. Я понимаю, что установка специального разделителя возможно в Kafka Producer, задав свойство partitioner.class
. Однако это свойство, похоже, ничего не делает для коннектора Kafka. Как мне настроить Kafka Connect (я использую сценарий connect-standalone
для запуска коннектора) для использования написанного мной пользовательского Partitioner
?
Установка стратегии разделения в коннекторе Kafka
Ответы:
Исходный соединитель может управлять разделом, в который записывается каждая исходная запись, через поле SourceRecord
partition
. Это самый простой способ, если это ваш собственный коннектор.
Однако, если вы хотите изменить способ разделения каждой записи исходным соединителем, можно использовать преобразование одного сообщения (SMT), которое перезаписывает поле partition
исходных записей. Скорее всего, вам придется написать собственный SMT, реализовав org.apache.kafka.connect.transforms.Transformation
и используя свою собственную логику разделения, но на самом деле это немного проще, чем писать собственный разделитель Kafka.
Например, вот условное настраиваемое преобразование, которое показывает, как использовать свойства конфигурации и как создать новый SourceRecord
экземпляр с желаемым номером раздела. Пример неполный, поскольку в нем действительно нет никакой истинной логики разделения, но он должен быть хорошей отправной точкой.
package io.acme.example; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.Transformation; import java.util.Map; public class CustomPartitioner implements Transformation { private static final String MAX_PARTITIONS_CONFIG = "max.partitions"; private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions"; private static final int MAX_PARTITIONS_DEFAULT = 1; /** * The definition of the configurations. We just define a single configuration property here, * but you can chain multiple "define" methods together. Complex configurations may warrant * pulling all the config-related things into a separate class that extends {@link AbstractConfig} * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}. */ private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC); private int maxPartitions; @Override public void configure(Map configs) { // store any configuration parameters as fields ... AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG); } @Override public SourceRecord apply(SourceRecord record) { // Compute the desired partition here int actualPartition = record.kafkaPartition(); int desiredPartition = ... // Then create the new record with all of the existing fields except with the new partition ... return record.newRecord(record.topic(), desiredPartition, record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); } @Override public ConfigDef config() { return CONFIG_DEF; } @Override public void close() { // do nothing } }
Функциональные возможности ConfigDef
и AbstractConfig
очень полезны и могут делать гораздо больше интересных вещей, в том числе использовать настраиваемые валидаторы и рекомендатели, а также иметь свойства конфигурации, которые зависят от других свойств. Если вы хотите узнать об этом больше, ознакомьтесь с некоторыми из существующих коннекторов Kafka Connect, которые также используют эту же платформу.
И последнее. При запуске автономных или распределенных рабочих процессов Kafka Connect обязательно установите переменную среды CLASSPATH, чтобы она указывала на файл JAR, содержащий ваш настраиваемый SMT, а также любые файлы JAR, от которых зависит ваш SMT, кроме предоставляемых Kafka. Команды connect-standalone.sh
и connect-distributed.sh
автоматически добавят файлы JAR Kafka в путь к классам.