Публикация бизнес-событий с помощью записной книжки Spark и реагирование на них с помощью функции пользовательских данных (UDF) с помощью активатора

В этом руководстве описан полный рабочий процесс реагирования на собственные бизнес-события в Microsoft Fabric. Ниже приведены следующие действия.

  1. Создание бизнес-события и определение схемы
  2. Настройте записную книжку для публикации события с помощью Python.
  3. Проверьте опубликованные события в центре Real-Time.
  4. Создайте правило активатора, которое активирует функцию пользовательских данных при возникновении бизнес-события.

Это важно

Эта функция доступна в предварительной версии.

Создание нового бизнес-события

  1. Перейдите к бизнес-событиям в центре Real-Time.

  2. Выберите +Создать бизнес-событие и нажмите кнопку "Создать схему".

    Снимок экрана: страница

  3. Определите схему бизнес-событий.

    1. Для параметра Имя введите VibrationCriticalDetected.

    2. В правой области для набора схем событий нажмите кнопку "Создать".

      Снимок экрана: страница

    3. Введите ManufacturingEquipmentHealth для имени набора схем.

      Снимок экрана: имя набора схем событий.

    4. Выберите "Добавить строку " в средней плоскости.

      Снимок экрана: выбор кнопки

    5. Выберите строку для типа события и введите MachineIDимя.

      Снимок экрана, на котором показана конфигурация свойства бизнес-события.

    6. Повторите приведенный выше шаг, чтобы добавить следующие свойства: ProductionLineID (строка), MeasuredVibration (строка), ImpactAssessment (строка), RecommendationAction (строка).

      Снимок экрана: конфигурация свойств схемы бизнес-событий.

    7. Для продолжения выберите Далее.

  4. Проверьте и подтвердите конфигурацию, а затем нажмите кнопку "Создать ", чтобы создать бизнес-событие.

    Снимок экрана: страница проверки и подтверждения для создания бизнес-события.

Настройка записной книжки для публикации

  1. После создания бизнес-мероприятия перейдите в свою рабочую область.

  2. Выберите +Создать элемент и выберите "Записная книжка " в разделе "Анализ и обучение данных ".

    Снимок экрана: кнопка

  3. Введите имя записной книжки (например, BusinessEventTutorialOneNotebook), подтвердите расположение (рабочая область), а затем нажмите кнопку "Создать ", чтобы создать записную книжку.

  4. Убедитесь, что вы выбрали Python 3.11 или B) PySpark (Python).

    Снимок экрана: выбор среды выполнения Python 3.11 в записной книжке.

    Снимок экрана: выбор среды выполнения Python PySpark в записной книжке.

  5. Найдите пример кода для публикации бизнес-события с помощью следующей команды. notebookutils.businessEvents.help() Вы должны увидеть выходные данные, аналогичные следующему:

    Help on module notebookutils.businessEvents in notebookutils:
    
    NAME
        notebookutils.businessEvents - [Preview] Utility for Business Events operations in Fabric
    
    FUNCTIONS
        help(methodName: str = '') -> None
            [Preview] Provides help for the notebookutils.businessEvents module or the specified method.
    
            Examples:
            notebookutils.businessEvents.help()
            notebookutils.businessEvents.help("publish")
            :param methodName: The name of the method to get help with.
    
        publish(eventSchemaSetWorkspace: str, eventSchemaSet: str, eventTypeName: str, eventData: Union[Dict[str, Any], List[Dict[str, Any]]], dataVersion: str = 'v1') -> bool
            [Preview] Publish business events data to the specified event type.
    
            Examples:
            notebookutils.businessEvents.publish(
                eventSchemaSetWorkspace="my-workspace-id",
                eventSchemaSet="OrderEvents",
                eventTypeName="OrderDelayed",
                eventData={"orderId": "12345", "status": "delayed", "reason": "weather"},
                dataVersion="v1"
            )
    
            # Batch publish multiple events
            notebookutils.businessEvents.publish(
                eventSchemaSetWorkspace="my-workspace-id",
                eventSchemaSet="OrderEvents",
                eventTypeName="OrderDelayed",
                eventData=[
                    {"orderId": "12345", "status": "delayed", "reason": "weather"},
                    {"orderId": "12346", "status": "delayed", "reason": "traffic"}
                ],
                dataVersion="v1"
            )
    
            :param eventSchemaSetWorkspace: The workspace ID or name where the event schema set is located
            :param eventSchemaSet: The ID or name of the event schema set
            :param eventTypeName: The name of the business events type to publish to
            :param eventData: The event data payload as a dictionary or list of dictionaries for batch publishing
            :param dataVersion: The version of the event type schema (default: "v1")
            :return: True if the event was published successfully
            :raises: Exception if the event could not be published
    
    DATA
        __all__ = ['help', 'publish']
    
    FILE
        /home/trusted-service-user/jupyter-env/python3.11/lib/python3.11/site-packages/notebookutils/businessEvents.py
    
    
  6. Добавьте новую ячейку, введите следующий код и запустите его для публикации бизнес-события.

    Замечание

    Обязательно замените значения eventSchemaSetWorkspace, eventSchemaSet, и eventTypeName на те, которые вы использовали при создании вашего бизнес-события. Свойства eventData также должны соответствовать схеме, определенной для вашего бизнес-события.

    
    notebookutils.businessEvents.publish(
        eventSchemaSetWorkspace="My workspace",
        eventSchemaSet="ManufacturingEquipmentHealth",
        eventTypeName="VibrationCriticalDetected",
        eventData={
            "MachineID": "12345",
            "ProductionLineID": "WestLine01",
            "MeasuredVibration": "1.52",
            "ImpactAssessment": "Production slowdown risk",
            "RecommendationAction": "Schedule maintenance"
        },
        dataVersion="v1"
    )
    

    Замечание

    Свойства eventSchemaSetWorkspace и eventSchemaSetName поддерживают как имена элементов Fabric, так и идентификаторы (ID) элементов Fabric.

    Снимок экрана: ячейка записной книжки Spark с кодом Python.

  7. Сохраните записную книжку, если не задано значение AutoSave.

Проверка опубликованных бизнес-событий

  1. Перейдите к бизнес-событиям в центре Real-Time.

  2. Выберите созданное бизнес-событие (например, VibrationCriticalDetected).

    Снимок экрана: страница

  3. На вкладке "Издатель" подтвердите, что вы видите событие и записная книжка указана в списке издателя.

    Снимок экрана: страница событий VibrationCriticalDetected с примером события и сведениями о издателе.

  4. Перейдите на вкладку "Предварительный просмотр данных ".

  5. В фильтре издателя выберите имя предварительно созданного издателя Ноутбук.

  6. Визуализировать событие в таблице предварительного просмотра.

Настройка пользовательской бизнес-логики с помощью функции пользовательских данных

  1. Перейдите в рабочую область и создайте новую функцию данных пользователя с именем ProcessVibrationCritical.

    Снимок экрана: создание новой функции данных пользователя в рабочей области.

  2. Создайте новую функцию.

    Снимок экрана: диалоговое окно создания новой функции.

  3. Измените логику новой функции и добавьте входные параметры, необходимые для получения бизнес-события.

    import datetime
    import fabric.functions as fn
    import logging
    import json
    
    udf = fn.UserDataFunctions()
    
    @udf.function()
    
    def processVibrationCritical(
        machineID: str,
        productionLine: str,
        measuredVibration: str,
        impactAssessment: str,
        recommendedAction: str
    ) -> str:
    
        logging.info("processVibrationCritical invoked.")
    
        event_data = {
            "machineID": machineID,
            "productionLine": productionLine,
            "measuredVibration": measuredVibration,
            "impactAssessment": impactAssessment,
            "recommendedAction": recommendedAction
        }
    
        # Log as structured JSON for easy searching/filtering in logs
        logging.info("processVibrationCritical payload=%s", json.dumps(event_data))
    
        return (
            f"Processed processVibrationCritical for machineID={machineID} "
            f"on line={productionLine} at {datetime.datetime.now()}."
        )
    
  4. Протестируйте функцию.

    1. В обозревателе функций наведите указатель мыши на созданную функцию, выберите меню ⋯(три точки), а затем выберите "Тест".

      Снимок экрана: обозреватель функций с выбранным меню

    2. В окне "Тест" введите эти примеры значений и выберите "Тест".

      • machineID: 12345
      • линияПроизводства: WestLine01
      • измереннаяВибрация: 1.52
      • Оценка воздействия: риск замедления производства
      • recommendedAction: планирование обслуживания
    3. Проверьте выходные данные и журналы, чтобы убедиться, что функция работает должным образом.

      Снимок экрана: обозреватель функций с окном тестирования с выходными данными.

  5. Выберите "Опубликовать " на панели инструментов, чтобы опубликовать функцию и сделать ее доступной для использования в правиле активации.

    Снимок экрана: кнопка

Создайте триггер активатора для использования событий

  1. Выберите значок центра в реальном времени в области навигации слева на портале Fabric.

  2. В центре Real-Time выберите "Бизнес-события " в категории "Подписка на ".

  3. В списке бизнес-событий найдите VibrationCriticalDetected событие. Выберите значок молнии или меню ⋯(три точки) рядом ⚡ с событием, а затем нажмите кнопку "Задать оповещение".

    Снимок экрана: выбор параметра

  4. На странице "Добавление правила " в разделе "Сведения " введите имя правила. Например, VibrationCriticalDetected_Rule.

  5. В разделе «Условие» для Проверка выберите «На каждом событии».

  6. В разделе "Действие " выберите одно из следующих действий. Чтобы настроить оповещение для активации функции при выполнении условия, выполните следующие действия.

    1. Для Выбор действия, выберите Выполнить функцию.

      Снимок экрана: выбор функции запуска в качестве действия.

    2. Выберите элемент Fabric, который вы хотите запустить, и нажмите кнопку "Добавить для продолжения" (например, функция пользовательских ProcessVibrationCritical данных).

      Снимок экрана: выбор элемента Fabric для запуска.

    3. Выберите функцию, которую нужно использовать для обработки действия (например, processVibrationCritical функции).

      Снимок экрана: выбор функции для обработки действия.

    4. Сопоставляйте каждый входной параметр, определенный в функции, с помощью свойства бизнес-события, которое вы ранее определили. Введите @ и выберите свойство бизнес-события из раскрывающегося списка, чтобы сопоставить его с параметром функции.

      Снимок экрана: сопоставление входных параметров с свойствами бизнес-событий.

      Повторите этот процесс для каждого входного параметра, определенного в функции.

      Снимок экрана: все входные параметры, сопоставленные с свойствами бизнес-событий.

  7. В разделе "Сохранить расположение " для рабочей области выберите рабочую область, в которой нужно создать элемент активатора Fabric.

  8. Для элемента выберите раскрывающийся список и нажмите кнопку "Создать элемент".

    Снимок экрана: выбор параметра

  9. В диалоговом окне введите имя нового элемента активатора Fabric (например, VibrationCriticalDetected_Activator), а затем нажмите кнопку "Создать".

    Снимок экрана: указание имени элемента активатора.

  10. Вы увидите страницу создания оповещения со ссылкой, чтобы открыть правило в пользовательском интерфейсе активатора Fabric на отдельной вкладке.

    Снимок экрана: страница создания оповещения.

  11. Выберите Открыть, чтобы открыть правило в пользовательском интерфейсе Fabric Activator.

Тестирование решения

Публикация другого события

В записной книжке Spark снова запустите ячейку, чтобы опубликовать новое VibrationCriticalDetected бизнес-событие.

Просмотр бизнес-события в центре Real-Time

Вкладка "Издатели"

  1. В центре Real-Time выберите бизнес-события в меню слева.

  2. Выберите VibrationCriticalDetected из списка бизнес-событий.

  3. На вкладке "Издатель" убедитесь, что вы видите недавно опубликованное событие и старое событие, опубликованное в начале руководства. Если новое событие не отображается, нажмите кнопку обновления, чтобы обновить список событий. Чтобы новое событие отображалось в списке, может потребоваться несколько секунд.

  4. В списке издателей убедитесь, что записная книжка, используемая для публикации события, указана в качестве издателя.

    Снимок экрана: вкладка

Вкладка "Потребители"

  1. Перейдите на вкладку "Потребители".

  2. Проверьте, что вы видите, что событие было доставлено в течение последнего часа.

  3. Убедитесь, что созданный активатор указан как потребитель.

    Снимок экрана: вкладка

Вкладка предварительного просмотра данных

  1. Перейдите на вкладку предварительного просмотра данных .

  2. Убедитесь, что вы видите потребительский фильтр с активатором, который вы создали.

    Снимок экрана: фильтр потребителя на вкладке

Просмотр бизнес-события в журнале выполнения активатора

  1. Перейдите в рабочую область, в которой вы создали элемент активатора, и откройте элемент активатора (например, VibrationCriticalDetected_Activator), если он еще не открыт.

  2. В элементе активатора выберите вкладку «Журнал».

  3. Убедитесь, что вы видите одну активацию.

    Снимок экрана: вкладка истории для правила Активатора.

Просмотр журналов для функции данных пользователя

  1. В рабочей области откройте созданную вами функцию ProcessVibrationCritical данных пользователя.

  2. Переключитесь на режим запуска только с помощью переключателя режима.

    Снимок экрана, на котором показано, как переключить функцию данных пользователя для запуска только в режиме.

  3. Наведите указатель мыши на имя функции в списке функций, выберите значок многоточия (...), а затем выберите "Просмотреть исторический журнал".

    Снимок экрана: параметр

  4. Вы увидите таблицу с историческими запусками функции. Выберите последний запуск, чтобы просмотреть журналы для этого запуска.

    Снимок экрана: исторические запуски для функции пользовательских данных.

  5. Убедитесь, что в журналах отображаются полезные данные события в формате JSON, как определено в логике функции, что означает, что функция была активирована бизнес-событием и правильно обрабатывала данные события.

    Снимок экрана логов для определенного запуска функции пользовательских данных.