Поделиться через


Быстрый старт: отправка и получение событий в центры событий с использованием Python

Из этого краткого руководства вы узнаете, как отправлять события и получать события из концентратора событий с помощью пакета Python azure-eventhub .

Предварительные требования

Если вы впервые используете Центры событий Azure, ознакомьтесь с общими сведениями о Центрах событий, прежде чем приступить к работе с этим руководством.

Чтобы завершить работу с этим кратким руководством, убедитесь, что у вас есть следующие предварительные требования:

  • Подписка Microsoft Azure. Зарегистрируйте бесплатную пробную версию , если у вас нет.
  • Python 3.8 или более поздней версии: убедитесь, что pip установлен и обновлен.
  • Visual Studio Code (рекомендуется): или используйте любую другую интегрированную среду разработки.
  • Пространство имен Центров событий и концентратор событий: воспользуйтесь этим руководством, чтобы создать их на портале Azure.

Установка пакетов для отправки событий

Чтобы установить пакеты Python для Центров событий, откройте командную строку с python в пути. Измените каталог на папку, в которой вы хотите сохранить примеры.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Проверка подлинности приложения в Azure

В этом кратком руководстве показано два способа подключения к Центрам событий Azure:

  • Без пароля. Используйте принципал безопасности в Microsoft Entra ID и управление доступом на основе ролей (RBAC) для подключения к пространству имен Центров событий. Вам не нужно беспокоиться о наличии жестко закодированных строк подключения в коде, в файле конфигурации или безопасном хранилище, например Azure Key Vault.
  • Строка подключения. Используйте строку подключения для подключения к пространству имен Event Hubs. Если вы не знакомы с Azure, вы можете найти вариант строка подключения проще следовать.

Мы рекомендуем использовать параметр без пароля в реальных приложениях и рабочих средах. Дополнительные сведения см. в статье "Проверка подлинности и авторизация служебной шины" иподключения без пароля для служб Azure.

Назначение ролей пользователю Microsoft Entra

При локальной разработке убедитесь, что учетная запись пользователя, которая подключается к Центрам событий Azure, имеет правильные разрешения. Для отправки и получения сообщений требуется роль владельца данных Центров событий Azure . Чтобы назначить себе эту роль, вам потребуется роль администратора доступа пользователей или другая роль, которая включает Microsoft.Authorization/roleAssignments/write действие. Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения см. на странице "Общие сведения об области применения Azure RBAC".

Следующий пример назначает роль Azure Event Hubs Data Owner учетной записи вашего пользователя, предоставляющую полный доступ к ресурсам событийных концентраторов Azure. В реальном сценарии следуйте принципу наименьших привилегий , чтобы предоставить пользователям только минимальные разрешения, необходимые для более безопасной рабочей среды.

Встроенные роли Azure для Центров событий Azure

Для Центров событий Azure управление пространствами имен и всеми связанными ресурсами с помощью портала Azure и API управления ресурсами Azure уже защищено с помощью модели управления доступом на основе ролей Azure (RBAC). Azure предоставляет следующие встроенные роли для авторизации доступа к пространству имен Центров событий:

Если вы хотите создать пользовательскую роль, см. раздел "Права", необходимые для операций Центров событий.

Внимание

В большинстве случаев для распространения назначения ролей в Azure требуется минута или две. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. В портале Azure находите ваше пространство имен Event Hubs с помощью основной строки поиска или навигации слева.

  2. На странице обзора выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите +Добавить в верхнем меню. Затем выберите "Добавить назначение роли".

    Снимок экрана: назначение роли.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Azure Event Hubs Data Owner и выберите соответствующий результат. Теперь щелкните Далее.

  6. В разделе "Назначение доступа" выберите "Пользователь", "Группа" или "Субъект-служба". Затем нажмите кнопку +Выбрать участников.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно ваш user@domain адрес электронной почты). Нажмите Выбрать в нижней части диалогового окна.

  8. Выберите Рецензирование + назначение, чтобы перейти на окончательную страницу. Нажмите кнопку "Проверить и назначить " еще раз, чтобы завершить процесс.

Отправка событий

В этом разделе описано, как создать скрипт Python для отправки событий в созданный ранее концентратор событий.

  1. Откройте предпочитаемый редактор Python, например Visual Studio Code.

  2. Создайте сценарий с именем send.py. Этот сценарий отправляет пакет событий в концентратор событий, созданный ранее.

  3. Скопируйте приведенный ниже код в файл send.py.

    В коде используйте реальные значения для замены следующих заполнителей:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE — Полностью определенное имя можно найти на странице Обзор пространства имен. Он должен быть в формате: <NAMESPACENAME>>.servicebus.windows.net
    • EVENT_HUB_NAME — Имя концентратора событий.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Примечание.

    Примеры других вариантов отправки событий в концентратор событий асинхронно с помощью строки подключения см. на странице send_async.py GitHub. Приведенные шаблоны также применимы к отправке событий без пароля.

Получение событий

В этом кратком руководстве хранилище объектов Azure Blob используется в качестве хранилища контрольных точек. Хранилище контрольных точек используется для сохранения контрольных точек (т. е. последних позиций чтения).

Следуйте этим рекомендациям при использовании хранилища BLOB-объектов Azure в качестве хранилища контрольных точек:

  • Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
  • Не используйте учетную запись хранения для других действий.
  • Не используйте контейнер для ничего другого.
  • Создайте учетную запись хранения в том же регионе, что и развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.

На странице учетной записи хранения в портале Azure, в разделе службы BLOB, убедитесь, что следующие параметры отключены.

  • Иерархическое пространство имен
  • Обратимое удаление BLOB-объекта
  • Управление версиями

Создать учетную запись хранения Azure и контейнер Blob

Создайте учетную запись хранения Azure и блоб-контейнер в ней, выполнив следующие действия.

  1. Создайте учетную запись хранения Azure
  2. Создайте контейнер BLOB.
  3. Аутентификация входа в контейнер BLOB.

Обязательно запишите строку подключения и имя контейнера для последующего использования в коде получения.

При локальной разработке убедитесь, что учетная запись пользователя, которая обращается к данным больших двоичных объектов, имеет правильные разрешения. Необходим участник данных BLOB-объектов хранилища для чтения и записи данных BLOB-объектов. Чтобы назначить себе эту роль, необходимо назначить роль администратора доступа пользователей или другую роль, которая включает действие Microsoft.Authorization/roleAssignments/write . Роли Azure RBAC можно назначить пользователю с помощью портала Azure, Azure CLI или Azure PowerShell. Дополнительные сведения см. в Описание области действия для Azure RBAC.

В этом сценарии вы назначаете разрешения своей учетной записи пользователя, ограниченные учетной записью хранения, чтобы следовать принципу наименьших привилегий. В рамках этой практики пользователям предоставляются только минимальные необходимые разрешения, что позволяет создавать более защищенные рабочие среды.

В следующем примере роль участника данных BLOB-объектов хранилища назначается учетной записи пользователя, которая предоставляет доступ как для чтения, так и записи к данным BLOB-объектов в вашей учетной записи хранения.

Внимание

В большинстве случаев для распространения назначения ролей в Azure требуется минута или две. В редких случаях может потребоваться до восьми минут. Если при первом запуске кода возникают ошибки аутентификации, подождите несколько минут и повторите попытку.

  1. На портале Azure найдите свою учетную запись хранения, воспользовавшись основной панелью поиска или областью навигации слева.

  2. На странице учетной записи хранения выберите элемент управления доступом (IAM) в меню слева.

  3. На странице Контроль доступа (IAM) откройте вкладку Назначения ролей.

  4. Выберите +Добавить в верхнем меню. Затем выберите "Добавить назначение роли".

    Снимок экрана: назначение роли учетной записи хранения.

  5. Используйте поле поиска, чтобы отфильтровать результаты для отображения нужной роли. В этом примере найдите Storage Blob Data Contributor. Выберите соответствующий результат, а затем нажмите кнопку Далее.

  6. В разделе Назначение доступа для выберите Пользователь, группа или служебный принципал и + Выбрать участников.

  7. В диалоговом окне найдите имя пользователя Microsoft Entra (обычно это ваш адрес электронной почты user@domain), а затем выберите пункт Select в нижней части диалогового окна.

  8. Выберите Рецензирование + назначение, чтобы перейти на окончательную страницу. Нажмите кнопку "Проверить и назначить " еще раз, чтобы завершить процесс.

Установка пакетов для получения событий

Для принимающей стороны необходимо установить один или несколько пакетов. В этом кратком руководстве вы используете Azure Blob-хранилище для сохранения контрольных точек, чтобы программа не считывала уже прочитанные события. Оно регулярно выполняет фиксацию метаданных для полученных сообщений в большом двоичном объекте. Благодаря такому подходу позже можно легко продолжить получать сообщения с того места, где вы остановились.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Создание сценария Python для получения событий

В этом разделе вы создадите скрипт Python для получения событий из концентратора событий.

  1. Откройте предпочитаемый редактор Python, например Visual Studio Code.

  2. Создайте сценарий с именем recv.py.

  3. Вставьте в recv.py следующий код.

    В коде используйте реальные значения для замены следующих заполнителей:

    • BLOB_STORAGE_ACCOUNT_URL — Это значение должно быть в формате: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME — Название контейнера BLOB в аккаунте хранения Azure.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE — Полностью определенное имя можно найти на странице Обзор пространства имен. Он должен быть в формате: <NAMESPACENAME>>.servicebus.windows.net
    • EVENT_HUB_NAME — Имя концентратора событий.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Примечание.

    Примеры других вариантов получения событий из концентратора событий асинхронно с помощью строки подключения см. на странице recv_with_checkpoint_store_async.py GitHub. Отображаемые шаблоны также применимы к получению событий без пароля.

Запуск приложения получателя

  1. Запустите командную строку.

  2. Выполните следующую команду и войдите с помощью учетной записи, которая была добавлена в роль владельца данных Центров событий Azure в пространстве имен Центров событий и роли участника данных BLOB-объектов хранилища в учетной записи хранения Azure.

    az login
    
  3. Перейдите в папку с файлом receive.py и выполните следующую команду:

    python recv.py
    

Запуск приложения отправителя

  1. Запустите командную строку.

  2. Выполните следующую команду и войдите с помощью учетной записи, которая была добавлена в роль владельца данных Центров событий Azure в пространстве имен Центров событий и роли участника данных BLOB-объектов хранилища в учетной записи хранения Azure.

    az login
    
  3. Перейдите в папку с send.py, а затем выполните следующую команду:

    python send.py
    

В окне получателя должны отобразиться сообщения, отправленные в концентратор событий.

Устранение неполадок

Если события не отображаются в окне приемника или код сообщает об ошибке, попробуйте выполнить следующие советы по устранению неполадок:

  • Если результаты не отображаются из recy.py, выполните send.py несколько раз.

  • Если при использовании кода без пароля (с учетными данными) возникают ошибки, связанные с "coroutine", убедитесь, что вы импортируете из azure.identity.aio.

  • Если отображается "Незакрытый сеанс клиента" с кодом без пароля (с учетными данными), убедитесь, что вы закройте учетные данные после завершения. Дополнительные сведения см. в разделе "Асинхронные учетные данные".

  • Если при доступе к хранилищу возникают ошибки авторизации с recv.py, убедитесь, что следовали шагам из Создание учетной записи хранения Azure и контейнера для блобов и назначили роль Участник данных блоба хранилища основному приложению.

  • Если вы получаете события с различными идентификаторами секций, ожидается этот результат. Разделы — это механизм организации данных, связанный с необходимым для потребляющих приложений степенью параллелизма. Количество разделов в концентраторе событий непосредственно связано с числом параллельных читателей, которые вы ожидаете иметь. Дополнительные сведения см. в разделе "Дополнительные сведения о секциях".

Следующие шаги

В этом кратком руководстве вы асинхронно отправляли и получали события. Чтобы узнать, как отправлять и получать события синхронно, перейдите на страницу GitHub sync_samples.

Ознакомьтесь с дополнительными примерами и дополнительными сценариями в клиентской библиотеке Центров событий Azure для примеров Python.