Хобрук: Ваш путь к мастерству в программировании

Конфигурация Spring Integration DSL KafkaProducerContext

Я пытаюсь адаптировать следующий пример: https://github.com/joshlong/spring-and-kafka

с последними стабильными версиями следующих библиотек:

org.apache.kafka > kafka_2.10 > 0.8.2.2
org.springframework.integration > spring-integration-kafka > 1.2.1.RELEASE
org.springframework.integration > spring-integration-java-dsl > 1.1.0.RELEASE

Библиотека интеграции dsl, похоже, подверглась рефакторингу, вероятно, вызванному введением нового KafkaProducer.

Вот код моей конфигурации Producer:

@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
    log.info("starting producer flow..");

    return flowDefinition -> {
        ProducerMetadata<String, String> getProducerMetadata = new ProducerMetadata<>(this.kafkaConfig.getTopic(),
                    String.class, String.class, new StringSerializer(), new StringSerializer());


        KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
                props.put("timeout.ms", "35000"))
                .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
                .get();

        flowDefinition
                .handle(kafkaProducerMessageHandler);
    };
}

И код для генерации сообщения:

@Bean
@DependsOn(OUTBOUND_ID)
CommandLineRunner kickOff(@Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
    return args -> {              
        for (int i = 0; i < 1000; i++) {
            in.send(MessageBuilder.withPayload("#" + i).setHeader(KafkaHeaders.TOPIC, this.kafkaConfig.getTopic()).build());
                log.info("sending message #" + i);
        }
    };
}

Это исключение, которое я получаю:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at jc.DemoApplication$ProducerConfiguration.lambda$kickOff$0(DemoApplication.java:104)
at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:673)
... 10 more
Caused by: java.lang.NullPointerException
at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:67)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:201)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 18 more

ОБНОВЛЕНИЕ:
Полный рабочий исходный код можно найти в моем форке:
https://github.com/magiccrafter/spring-and-kafka


Ответы:


1

Извините за задержку.

Ваша проблема связана с ранним созданием экземпляра IntegrationComponentSpec:

KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
          props.put("timeout.ms", "35000"))
          .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
          .addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
          .get();

вам не следует звонить .get() самостоятельно. KafkaProducerMessageHandlerSpec - это ComponentsRegistration, и только SI Java DSL может правильно его разрешить. Код там выглядит так:

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

Поскольку этот код не вызывается, this.producerConfigurations не заполняется this.kafkaProducerContext. Хотя последний все равно должен быть зарегистрирован как бин.

Итак, чтобы решить вашу проблему, вы должны иметь дело только с IntegrationComponentSpec в определении DSL.

Просто получите KafkaProducerMessageHandlerSpec и используйте его для .handle() ниже. Не уверен, есть ли смысл извлекать этот объект, если мы можем использовать Kafka.outboundChannelAdapter() непосредственно из файла .handle().

20.10.2015
  • Я уже решил это, но я не знал, как все работает под капотом. Большое спасибо за SDL, ответ и полное объяснение. 21.10.2015
  • Новые материалы

    Dall-E 2: недавние исследования показывают недостатки в искусстве, созданном искусственным интеллектом
    DALL-E 2 — это всеобщее внимание в индустрии искусственного интеллекта. Люди в списке ожидания пытаются заполучить продукт. Что это означает для развития креативной индустрии? О применении ИИ в..

    «Очень простой» эволюционный подход к обучению с подкреплением
    В прошлом семестре я посетил лекцию по обучению с подкреплением (RL) в моем университете. Честно говоря, я присоединился к нему официально, но я редко ходил на лекции, потому что в целом я нахожу..

    Освоение информационного поиска: создание интеллектуальных поисковых систем (глава 1)
    Глава 1. Поиск по ключевым словам: основы информационного поиска Справочная глава: «Оценка моделей поиска информации: подробное руководство по показателям производительности » Глава 1: «Поиск..

    Фишинг — Упаковано и зашифровано
    Будучи старшим ИТ-специалистом в небольшой фирме, я могу делать много разных вещей. Одна из этих вещей: специалист по кибербезопасности. Мне нравится это делать, потому что в настоящее время я..

    ВЫ РЕГРЕСС ЭТО?
    Чтобы понять, когда использовать регрессионный анализ, мы должны сначала понять, что именно он делает. Вот простой ответ, который появляется, когда вы используете Google: Регрессионный..

    Не зря же это называют интеллектом
    Стек — C#, Oracle Опыт — 4 года Работа — Разведывательный корпус Мне пора служить Может быть, я немного приукрашиваю себя, но там, где я живу, есть обязательная военная служба на 3..

    LeetCode Проблема 41. Первый пропущенный положительный результат
    LeetCode Проблема 41. Первый пропущенный положительный результат Учитывая несортированный массив целых чисел, найдите наименьшее пропущенное положительное целое число. Пример 1: Input:..