Обработка событий с помощью оператора SQL

Оператор SQL, также называемый редактором кода SQL, — это новая возможность преобразования данных в потоках событий Microsoft Fabric. Операторы SQL предоставляют возможность редактирования кода, где можно легко определить собственную логику преобразования данных с помощью простых выражений SQL. В этой статье описывается, как использовать оператор SQL для преобразований данных в потоке событий.

Note

Имена артефактов eventstream, включающие символ подчеркивания (_) или dot (.), несовместимы с операторами SQL. Для оптимального взаимодействия создайте новый поток событий без использования символов подчеркивания или точек в имени артефакта.

Prerequisites

  • Доступ к рабочей области в режиме лицензии емкости Fabric или в режиме пробной лицензии с разрешениями участника или более высокого уровня.

Добавление оператора SQL в поток событий

Чтобы выполнить операции потоковой обработки потоков в потоках данных с помощью оператора SQL, добавьте оператора SQL в поток событий, выполнив следующие инструкции:

  1. Создайте новый поток событий. Затем добавьте в него оператор SQL с помощью одного из следующих параметров:

    • На ленте выберите "Преобразовать события" и выберите SQL.

      Снимок экрана: выбор оператора SQL в меню для преобразования событий.

    • На холсте выберите "Преобразовать события" или "Добавить назначение", а затем выберите SQL Code.

      Снимок экрана: выбор оператора SQL в списке для преобразования событий на холсте.

  2. Новый узел SQL добавляется в поток событий. Щелкните значок карандаша, чтобы продолжить настройку оператора SQL.

    Снимок экрана: выбор значка карандаша на узле оператора SQL.

  3. На панели кода SQL укажите уникальное имя узла оператора SQL в потоке событий.

  4. Измените запрос в области запроса или выберите "Изменить запрос ", чтобы ввести представление полноэкранного редактора кода.

    Снимок экрана: поле для ввода имени операции и кнопки для редактирования запроса на панели кода SQL.

  5. В режиме полноэкранного редактора кода в левой части отображается панель проводника ввода и вывода. Раздел редактора кода настраивается, поэтому его можно изменить в соответствии с вашими предпочтениями. В нижней части раздела предварительного просмотра можно просмотреть как входные данные, так и результат теста запроса.

    Снимок экрана: полный редактор SQL.

  6. Выделите текст в разделе "Выходные данные " и введите имя целевого узла. Оператор SQL поддерживает все назначения Real-Time Intelligence, включая Event House, lakehouse, activator или поток.

    Снимок экрана: область

  7. Укажите псевдоним или имя назначения вывода, в котором данные, обработанные с помощью оператора SQL, записываются.

    Снимок экрана: имя выходных данных.

  8. Добавьте SQL-запрос для требуемого преобразования данных.

    Поток событий основан на Azure Stream Analytics и поддерживает ту же семантику запроса языка запросов Stream Analytics. Дополнительные сведения о синтаксисе и использовании см. в статье Azure Stream Analytics и справочник по языку запросов eventstream.

    Ниже приведена базовая структура запросов:

    SELECT 
    
        column1, column2, ... 
    
    INTO 
    
        [output alias] 
    
    FROM 
    
        [input alias] 
    

    В этом примере запроса показано обнаружение высоких температур в комнате каждую минуту:

    
        SELECT 
        System.Timestamp AS WindowEnd, 
        roomId, 
        AVG(temperature) AS AvgTemp 
    INTO 
        output 
    FROM 
        input 
    GROUP BY 
        roomId, 
        TumblingWindow(minute, 1) 
    HAVING 
        AVG(temperature) > 75 
    

    В этом примере запроса показан CASE оператор для классификации температуры:

    SELECT
        deviceId, 
        temperature, 
        CASE  
            WHEN temperature > 85 THEN 'High' 
            WHEN temperature BETWEEN 60 AND 85 THEN 'Normal' 
            ELSE 'Low' 
        END AS TempCategory 
    INTO 
        CategorizedTempOutput 
    FROM 
        SensorInput 
    
  9. На ленте используйте команду "Тестовый запрос" для проверки логики преобразования. Результаты тестового запроса отображаются на вкладке результатов теста .

    Снимок экрана: результат теста.

  10. После завершения тестирования нажмите кнопку "Сохранить на ленте", чтобы вернуться к холсту потока событий.

    Снимок экрана: лента для запроса, включая команды для тестирования запроса и сохранения.

  11. Если кнопка "Сохранить" включена, на панели кода SQL выберите ее, чтобы сохранить параметры.

    Снимок экрана: панель кода SQL и кнопка

  12. Настройте место назначения.

    Снимок экрана: завершенный поток событий.

Дополнительные примеры

В следующих примерах показаны распространенные сценарии аналитики в режиме реального времени, которые можно реализовать с помощью оператора SQL.

Агрегирование продаж по городам в минуту — используется TumblingWindow для вычисления фиксированной, непереверяющей одноминутной суммы продаж, сгруппированных по городу:

SELECT
    System.Timestamp AS WindowEnd,
    city,
    SUM(salesAmount) AS TotalSales
INTO
    output
FROM
    input
GROUP BY
    city,
    TumblingWindow(minute, 1)

Обнаружение всплесков и ботов - Используйте HoppingWindow для обнаружения пользователей, которые размещают необычно большое количество заказов в течение пятиминутного скользящего окна, вычисляемого каждую минуту:

SELECT
    System.Timestamp AS WindowEnd,
    userId,
    COUNT(*) AS OrderCount
INTO
    output
FROM
    input
GROUP BY
    userId,
    HoppingWindow(minute, 5, 1)
HAVING
    COUNT(*) > 10

Флагирование аномалий на основе скользящей базовой линии - Используйте HoppingWindow для вычисления скользящего среднего и для обозначения устройств, максимальные значения метрик которых превышают дважды среднее значение в пределах окна, что указывает на потенциальную аномалию.

SELECT
    System.Timestamp AS WindowEnd,
    deviceId,
    AVG(metricValue) AS RollingAvg,
    MAX(metricValue) AS CurrentMax
INTO
    output
FROM
    input
GROUP BY
    deviceId,
    HoppingWindow(minute, 10, 1)
HAVING
    MAX(metricValue) > 2 * AVG(metricValue)

Запись в несколько пунктов назначения из одного оператора SQL

С помощью оператора SQL можно отправлять данные в несколько приемников выходных данных или назначений, добавив несколько INTO предложений в запрос SQL и определив несколько выходных данных.

Определение нескольких выходных данных в редакторе запросов

  1. Нажмите Изменить (значок карандаша) на узле оператора SQL, чтобы открыть панель SQL-кода.

  2. В области КОДА SQL выберите "Изменить запрос ", чтобы открыть полноэкранный редактор кода.

    Снимок экрана: панель кода SQL.

  3. В полноэкранном редакторе кода выберите + в разделе "Выходные данные", чтобы добавить новый результат. Выберите нужный тип вывода. Он создает псевдоним выходных данных, которые можно использовать в запросе. Выберите имя созданного вывода и введите имя по вашему выбору.

    Снимок экрана: кнопка добавления выходных данных в полный редактор SQL.

Использование нескольких инструкций SELECT ... INTO

Каждая SELECT инструкция может записывать данные в разные выходные данные. Добавьте запрос для записи выходных данных в несколько назначений.

В следующем примере запроса инструкция SELECT записывает в выходные данные с именем RawArchive (тип: Lakehouse), а инструкция SELECT записывает в выходные данные с именем AggregationResults (тип: Eventhouse).


-- Query 1: Archive all data to Lakehouse
SELECT *
INTO [RawArchive]
FROM [SQLDemoES-stream]

-- Query 2: Aggregate and filter data to create a real time dashboard to an Eventhouse
SELECT System.Timestamp() AS EventTime, COUNT(*) AS EventCount
INTO [AggregationResults]
FROM [SQLDemoES-stream]
GROUP BY TumblingWindow(minute, 1)
HAVING COUNT(*) > 100

Повторное использование промежуточной логики (наилучшая практика)

Если вы хотите избежать дублирования логики, используйте предложение WITH и распределите несколько выходных данных. В следующем примере InputStream общее табличное выражение (CTE) определяется так, чтобы читаться из входного потока один раз, а затем две SELECT инструкции ссылаются на InputStream CTE для записи в различные выходные потоки. Этот подход более эффективен, так как он избегает считывания из входного потока несколько раз.

  1. Введите следующий запрос в редактор кода SQL для чтения из входного потока один раз и записи в несколько выходных потоков.

    
    --Base query:  Reading input stream once
    With InputStream AS(
    SELECT * 
    FROM [SQLDemoES-stream] )
    
    -- Query 1: Archive all data to Lakehouse
    SELECT *
    INTO [RawArchive]
    FROM InputStream
    
    -- Query 2: Aggregate and filter data to create a real time dashboard to an Eventhouse
    SELECT System.Timestamp() AS EventTime, COUNT(*) AS EventCount
    INTO [AggregationResults]
    FROM InputStream
    GROUP BY TumblingWindow(minute, 1)
    HAVING COUNT(*) > 100
    
    
  2. Выберите "Тестовый запрос" , чтобы проверить результат запроса. Каждый вывод, определенный в запросе, имеет отдельную вкладку на панели результатов теста .

    Снимок экрана: пример добавления нескольких целевых запросов в полный редактор SQL.

  3. Нажмите кнопку "Сохранить", чтобы сохранить запрос и выйти из редактора.

    Снимок экрана: кнопка

  4. Снова нажмите кнопку "Сохранить " в области редактора SQL.

  5. Выберите каждый конечный узел, созданный из оператора SQL, а затем настройте параметры назначения для каждого из них.

    Снимок экрана: ссылки конфигурации для каждого конечного узла.

  6. После завершения настройки поток событий должен выглядеть следующим образом, где узел оператора SQL имеет два назначения выходных данных.

    Снимок экрана: пример оператора SQL с несколькими выходными данными.

Настройка политик упорядочения событий в операторе SQL

С помощью оператора SQL можно обрабатывать данные с помощью времени события или приложения. По умолчанию Eventstream использует время прибытия. Для обработки по времени события необходимо явно настроить его с помощью TIMESTAMP BY в запросе.

Пример ввода

{
    "deviceId": "device123",
    "temperature": 72,
    "eventTime": "2024-01-01T12:00:00Z"
}

Пример запроса с использованием времени события


SELECT
    deviceId,
    temperature,
    System.Timestamp() AS EventTimestamp
INTO
    Output
FROM
    Input
TIMESTAMP BY eventTime;

Вы также можете добавить пороговые значения для событий позднего прибытия и событий, нарушающих порядок, в расширенных настройках оператора SQL.

Снимок экрана: дополнительные параметры оператора SQL.

Limitations

  • Оператор SQL предназначен для централизации всей логики преобразования. В результате его нельзя использовать вместе с другими встроенными операторами в одном пути обработки. Цепочка нескольких операторов SQL в одном пути также не поддерживается.

  • При добавлении оператора SQL в топологию необходимо создать новые конечные узлы. Существующие конечные узлы нельзя повторно использовать с помощью оператора SQL.