Хобрук: Ваш путь к мастерству в программировании

Установка стратегии разделения в коннекторе Kafka

Я использую специальный коннектор Kafka (написанный на Java с использованием Java API Kafka Connect) для извлечения данных из внешнего источника и сохранения в теме. Мне нужно установить собственную стратегию разделения. Я понимаю, что установка специального разделителя возможно в Kafka Producer, задав свойство partitioner.class. Однако это свойство, похоже, ничего не делает для коннектора Kafka. Как мне настроить Kafka Connect (я использую сценарий connect-standalone для запуска коннектора) для использования написанного мной пользовательского Partitioner?


Ответы:


1

Исходный соединитель может управлять разделом, в который записывается каждая исходная запись, через поле SourceRecord partition. Это самый простой способ, если это ваш собственный коннектор.

Однако, если вы хотите изменить способ разделения каждой записи исходным соединителем, можно использовать преобразование одного сообщения (SMT), которое перезаписывает поле partition исходных записей. Скорее всего, вам придется написать собственный SMT, реализовав org.apache.kafka.connect.transforms.Transformation и используя свою собственную логику разделения, но на самом деле это немного проще, чем писать собственный разделитель Kafka.

Например, вот условное настраиваемое преобразование, которое показывает, как использовать свойства конфигурации и как создать новый SourceRecord экземпляр с желаемым номером раздела. Пример неполный, поскольку в нем действительно нет никакой истинной логики разделения, но он должен быть хорошей отправной точкой.

package io.acme.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class CustomPartitioner implements Transformation {

  private static final String MAX_PARTITIONS_CONFIG = "max.partitions";
  private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions";
  private static final int MAX_PARTITIONS_DEFAULT = 1;

  /** 
   * The definition of the configurations. We just define a single configuration property here,
   * but you can chain multiple "define" methods together. Complex configurations may warrant
   * pulling all the config-related things into a separate class that extends {@link AbstractConfig}
   * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the
   * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}.
   */
  private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC);

  private int maxPartitions;

  @Override
  public void configure(Map configs) {
    // store any configuration parameters as fields ...
    AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
    maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG);
  }

  @Override
  public SourceRecord apply(SourceRecord record) {
    // Compute the desired partition here
    int actualPartition = record.kafkaPartition();
    int desiredPartition = ...
    // Then create the new record with all of the existing fields except with the new partition ...
    return record.newRecord(record.topic(), desiredPartition,
                            record.keySchema(), record.key(),
                            record.valueSchema(), record.value(),
                            record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  @Override
  public void close() {
    // do nothing
  }
}

Функциональные возможности ConfigDef и AbstractConfig очень полезны и могут делать гораздо больше интересных вещей, в том числе использовать настраиваемые валидаторы и рекомендатели, а также иметь свойства конфигурации, которые зависят от других свойств. Если вы хотите узнать об этом больше, ознакомьтесь с некоторыми из существующих коннекторов Kafka Connect, которые также используют эту же платформу.

И последнее. При запуске автономных или распределенных рабочих процессов Kafka Connect обязательно установите переменную среды CLASSPATH, чтобы она указывала на файл JAR, содержащий ваш настраиваемый SMT, а также любые файлы JAR, от которых зависит ваш SMT, кроме предоставляемых Kafka. Команды connect-standalone.sh и connect-distributed.sh автоматически добавят файлы JAR Kafka в путь к классам.

29.06.2017
Новые материалы

Dall-E 2: недавние исследования показывают недостатки в искусстве, созданном искусственным интеллектом
DALL-E 2 — это всеобщее внимание в индустрии искусственного интеллекта. Люди в списке ожидания пытаются заполучить продукт. Что это означает для развития креативной индустрии? О применении ИИ в..

«Очень простой» эволюционный подход к обучению с подкреплением
В прошлом семестре я посетил лекцию по обучению с подкреплением (RL) в моем университете. Честно говоря, я присоединился к нему официально, но я редко ходил на лекции, потому что в целом я нахожу..

Освоение информационного поиска: создание интеллектуальных поисковых систем (глава 1)
Глава 1. Поиск по ключевым словам: основы информационного поиска Справочная глава: «Оценка моделей поиска информации: подробное руководство по показателям производительности » Глава 1: «Поиск..

Фишинг — Упаковано и зашифровано
Будучи старшим ИТ-специалистом в небольшой фирме, я могу делать много разных вещей. Одна из этих вещей: специалист по кибербезопасности. Мне нравится это делать, потому что в настоящее время я..

ВЫ РЕГРЕСС ЭТО?
Чтобы понять, когда использовать регрессионный анализ, мы должны сначала понять, что именно он делает. Вот простой ответ, который появляется, когда вы используете Google: Регрессионный..

Не зря же это называют интеллектом
Стек — C#, Oracle Опыт — 4 года Работа — Разведывательный корпус Мне пора служить Может быть, я немного приукрашиваю себя, но там, где я живу, есть обязательная военная служба на 3..

LeetCode Проблема 41. Первый пропущенный положительный результат
LeetCode Проблема 41. Первый пропущенный положительный результат Учитывая несортированный массив целых чисел, найдите наименьшее пропущенное положительное целое число. Пример 1: Input:..