Хобрук: Ваш путь к мастерству в программировании

Функция Azure не принимает события Eventhub

Я начал экспериментировать с Функциями Azure и столкнулся с проблемой, что моя функция не запускается событиями, входящими в мой концентратор событий.

Это код моей функции:

host.json:

  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

function.json:

  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "eventhub",
      "connection": "eventhub_connection",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "stream"
    }
  ]
}

init.py:

import logging

import azure.functions as func


def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.info('Python EventHub trigger processed an event: %s',
                        event.get_body().decode('utf-8'))
        logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
        logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
        logging.info(f'  SequenceNumber = {event.sequence_number}')
        logging.info(f'  Offset = {event.offset}')

# def main(event: func.EventHubEvent):
#     logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
#     logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
#     logging.info(f'  SequenceNumber = {event.sequence_number}')
#     logging.info(f'  Offset = {event.offset}')

#     # Metadata
#     for key in event.metadata:
#         logging.info(f'Metadata: {key} = {event.metadata[key]}')
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=storageaccount;AccountKey=storageacciuntaccesskey=;EndpointSuffix=core.windows.net",
    "eventhub_connection": "Endpoint=sb://eventhub01.servicebus.windows.net/;SharedAccessKeyName=function;SharedAccessKey=0omitted;EntityPath=eventhub"
  }
}

Я начал с базового кода Python для концентраторов событий, предоставляемого инструментами Azure Function Core. И тестировали различные фрагменты кода, найденные в онлайн-примерах из блогов людей и документации Microsoft.

При переключении на мощность: один - ›Я переключаюсь на код, который в данный момент закомментирован. Я не знаю, должно ли это происходить так, мне просто кажется, что это правильно.

В любом случае, независимо от настройки количества элементов или типа данных, изменяемого между двоичным, потоковым или строковым. Моя функция просто не срабатывает.

Я могу запросить свой хаб событий и просмотреть / прочитать события. Итак, я знаю свою политику, и общий ключ и тому подобное, работают нормально. Я также использую только группу потребителей $ Default.

Я также попытался настроить функцию запуска HTTP, и эта функция запускается из Azure Monitor. Я вижу в журналах каждый запрос, входящий в функцию.

Я что-то не так делаю в коде моей функции eventhub? Возможно, мне не хватает какой-то другой настройки конфигурации? Я уже проверил правила доступа для этой функции, но это действительно не имеет значения, не так ли? Функция извлекает событие из концентратора событий. Это не отправляет данные инициатором.

Изменить: добавлена ​​конфигурация файла local.settings.json и обновлен function.json Edit 2: решение моей конкретной проблемы находится в комментариях к ответу.


Ответы:


1

Обновление:

__init__.py функции:

from typing import List
import logging

import azure.functions as func


def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.info('Python EventHub trigger processed an event: %s',
                        event.get_body().decode('utf-8'))

Отправить сообщение в концентратор событий:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx;EntityPath=test", eventhub_name="test")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

И убедитесь, что вы указали правильное имя концентратора событий:

введите описание изображения здесь


Кажется, у вашего function.json есть проблема, строка подключения не должна напрямую вставляться в элемент привязки.

Должно получиться так:

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "test",
      "connection": "testbowman_RootManageSharedAccessKey_EVENTHUB",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "binary"
    }
  ]
}

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "testbowman_RootManageSharedAccessKey_EVENTHUB": "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx;EntityPath=test"
  }
}
26.04.2021
  • Привет, Боуман, спасибо за ответ. Я буду проверять это. Где мне найти эту информацию? Я не нахожу его в документации MS. Также как узнать, какими должны быть значения для DefaultEndpointsProtocol, AccountName, AccountKey и EndpointSuffix? Я предполагаю, что это изменится при создании триггера HTTP, функции EventGrid или другого типа триггера для функции. 26.04.2021
  • Прямой поиск этого параметра конфигурации приводит меня сюда: docs.microsoft.com/en-us/azure/azure-functions/ - ›Это не объясняет мне, почему настройки были неправильными или почему это, к сожалению, необходимо для триггера концентратора событий. Но я понимаю, какими должны быть ценности. 26.04.2021
  • @Marco Большинство триггеров должны предоставлять хранилище AzureWebJobsStorage. Вы можете предоставить this, вы также можете предоставить UseDevelopmentStorage=true (это моделируется с помощью базы данных sql. ). 26.04.2021
  • @Marco Привет, но я имею в виду строку подключения концентратора событий. 26.04.2021
  • :) Большое спасибо за размышления и предложения. Я обновил свою функцию вашими предложениями - ›Я обновил разделы кода в исходном сообщении. Я ждал около 15 минут, чтобы журналы попали в концентратор событий и посмотрели, изменятся ли исходящие сообщения и исходящие байты. Без изменений. Я проверил на мониторе функцию, и не было никаких журналов, указывающих на то, что функция зафиксировала какое-либо из новых событий. Я проверил соединение как с помощью rootmanagedaccesskey, так и с помощью клавиши доступа к функции. Пока успеха нет. 26.04.2021
  • Хорошо, я понял это благодаря тому, что ты зажег связи в моем мозгу. Я не знал, что local.settings.json содержит переменные, используемые function.json. Таким образом, я никогда не проверял настройки приложения после загрузки обновленного local.settings.json, и при проверке казалось, что новая переменная никогда не добавлялась в настройки приложения. После добавления вручную триггер eventhub автоматически заработал. 26.04.2021
  • Новые материалы

    Освоение информационного поиска: создание интеллектуальных поисковых систем (глава 1)
    Глава 1. Поиск по ключевым словам: основы информационного поиска Справочная глава: «Оценка моделей поиска информации: подробное руководство по показателям производительности » Глава 1: «Поиск..

    Фишинг — Упаковано и зашифровано
    Будучи старшим ИТ-специалистом в небольшой фирме, я могу делать много разных вещей. Одна из этих вещей: специалист по кибербезопасности. Мне нравится это делать, потому что в настоящее время я..

    ВЫ РЕГРЕСС ЭТО?
    Чтобы понять, когда использовать регрессионный анализ, мы должны сначала понять, что именно он делает. Вот простой ответ, который появляется, когда вы используете Google: Регрессионный..

    Не зря же это называют интеллектом
    Стек — C#, Oracle Опыт — 4 года Работа — Разведывательный корпус Мне пора служить Может быть, я немного приукрашиваю себя, но там, где я живу, есть обязательная военная служба на 3..

    LeetCode Проблема 41. Первый пропущенный положительный результат
    LeetCode Проблема 41. Первый пропущенный положительный результат Учитывая несортированный массив целых чисел, найдите наименьшее пропущенное положительное целое число. Пример 1: Input:..

    Расистский и сексистский робот, обученный в Интернете
    Его ИИ основан на предвзятых данных, которые создают предрассудки. Он словно переходит из одного эпизода в другой из серии Черное зеркало , а вместо этого представляет собой хронику..

    Управление состоянием в микрофронтендах
    Стратегии бесперебойного сотрудничества Микро-фронтенды — это быстро растущая тенденция в сфере фронтенда, гарантирующая, что удовольствие не ограничивается исключительно бэкэнд-системами..