Обнаружение многомерных аномалий

Общие сведения об многовариантном обнаружении аномалий в аналитике в режиме реального времени см. в разделе "Многовариантное обнаружение аномалий" в Microsoft Fabric . Обзор. В этом руководстве вы используете примеры данных для обучения многовариантной модели обнаружения аномалий с помощью обработчика Spark в записной книжке Python. Затем вы прогнозируете аномалии, применяя обученную модель к новым данным с помощью подсистемы Eventhouse. Первые несколько шагов по настройке сред, а также следующие шаги по обучению модели и прогнозированию аномалий.

Предварительные условия

Часть 1. Активация доступности OneLake

Доступность OneLake должна быть включена до того, как вы начнёте загружать данные в Eventhouse. Этот шаг важен, так как он позволяет загруженным данным стать доступными в OneLake. На следующем шаге вы обращаетесь к этим же данным из записной книжки Spark для обучения модели.

  1. В рабочей области выберите хранилище событий, созданное в предварительных требованиях. Выберите базу данных, в которой вы хотите хранить данные.

  2. В области сведений о базе данных переключите кнопку доступности OneLake в положение Вкл.

    Скриншот включения функции OneLake в системе Eventhouse.

Часть 2. Включение KQL Python плагина

На этом шаге вы включите плагин Python в вашем Eventhouse. Этот шаг необходим для запуска кода Python прогнозирования аномалий в наборе запросов KQL. Важно выбрать правильное изображение, содержащее пакет детектора аномалий временных рядов .

  1. На экране Eventhouse выберите Eventhouse>подключаемые модули на ленте.

  2. В области подключаемых модулей переключите расширение языка Python на Вкл.

  3. Выберите Python 3.11.7 DL (предварительная версия).

  4. Нажмите кнопку Готово.

    Скриншот, как включить пакет Python 3.11.7 DL в Eventhouse.

Часть 3. Создание среды Spark

На этом шаге вы создадите среду Spark для запуска записной книжки Python, которая обучает многовариантную модель обнаружения аномалий с помощью двигателя Spark. Дополнительные сведения о создании сред см. в статье "Создание сред и управление ими".

  1. В рабочей области выберите +Создать элемент , а затем среду.

    Снимок экрана плитки

  2. Введите имя MVAD_ENV для среды и нажмите кнопку "Создать".

  3. В разделе "Библиотеки" выберите общедоступные библиотеки.

  4. Выберите 'Добавить из PyPI'.

  5. В поле поиска введите детектор аномалий временных рядов. Поле версии автоматически заполняется последней версией. Измените его на версию 0.3.9, которая является последней версией, совместимой с MLflow 2.19.0 (текущая версия на образе Python)

  6. Выберите Сохранить.

    снимок экрана: добавление пакета PyPI в среду Spark.

  7. Выберите вкладку "Главная " в среде.

  8. Выберите значок публикации на ленте.

  9. Выберите Опубликовать все. Этот шаг может занять несколько минут.

    Снимок экрана публикации среды.

Часть 4. Загрузка данных в Eventhouse

  1. Наведите указатель мыши на базу данных KQL, в которой требуется хранить данные. Выберите меню "Дополнительно" [...]>Получить данные>Локальный файл.

    Снимок экрана: получение данных из локального файла.

  2. Выберите +Создать таблицу и введите demo_stocks_change в качестве имени таблицы.

  3. В диалоговом окне отправки данных выберите "Обзор файлов " и отправьте пример файла данных, скачанный в предварительных требованиях.

  4. Выберите Далее.

  5. В разделе "Просмотр данных" убедитесь, что параметр "Первая строка — заголовок столбца" установлен в положение "Вкл.".

  6. Выберите Готово.

  7. После отправки данных нажмите кнопку "Закрыть".

Часть 5. Копирование пути OneLake в таблицу

Убедитесь, что выбрана таблица demo_stocks_change. В области сведений о таблице выберите папку OneLake, чтобы скопировать путь OneLake в буфер обмена. Сохраните скопированный текст в текстовом редакторе где-то, где будет использоваться на следующем шаге.

Скриншот копирования пути OneLake.

Часть 6. Подготовка записной книжки

  1. Выберите свою рабочую область.

  2. Выберите Импорт, Блокнот, затем С этого компьютера.

  3. Выберите Загрузить и выберите блокнот, который вы скачали в разделе предварительных требований.

  4. После отправки записной книжки вы можете найти и открыть записную книжку в рабочей области.

  5. На верхней ленте выберите раскрывающийся список "Рабочая область по умолчанию " и выберите среду, созданную на предыдущем шаге.

    Снимок экрана: выбор окружения в блокноте.

Часть 7. Запуск записной книжки

  1. Импорт стандартных пакетов.

    import numpy as np
    import pandas as pd
    
  2. Spark требует ABFSS URI для безопасного подключения к хранилищу OneLake, поэтому следующий шаг определяет эту функцию для преобразования URI OneLake в URI ABFSS.

    def convert_onelake_to_abfss(onelake_uri):
        if not onelake_uri.startswith('https://'):
            raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
        uri_without_scheme = onelake_uri[8:]
        parts = uri_without_scheme.split('/')
        if len(parts) < 3:
            raise ValueError("Invalid OneLake URI format.")
        account_name = parts[0].split('.')[0]
        container_name = parts[1]
        path = '/'.join(parts[2:])
        abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
        return abfss_uri
    
  3. Замените заполнитель OneLakeTableURI на URI OneLake, скопированный из части 5 - Скопируйте путь OneLake в таблицу, чтобы загрузить таблицу demo_stocks_change в Pandas DataFrame.

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. Выполните следующие ячейки, чтобы подготовить фреймы данных для обучения и прогнозирования.

    Примечание.

    Фактическое прогнозирование будет выполняться на данных в Eventhouse в части 9- Predict-anomalies-in-the-kql-queryset. В рабочем сценарии, если данные потоковой передачи передаются в хранилище событий, прогнозы будут сделаны на новых потоковых данных. Для работы с руководством набор данных разделен по дате на два раздела для обучения и прогнозирования. Это позволяет имитировать исторические данные и новые потоковые данные.

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. Запустите ячейки, чтобы обучить модель и сохранить ее в реестре моделей MLflow Fabric.

    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    model_name = "mvad_5_stocks_model"
    
    import mlflow
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name=model_name,
        )
    
  6. Выполните следующую ячейку, чтобы извлечь зарегистрированный путь к модели, который будет использоваться для прогнозирования с использованием песочницы Kusto для Python.

    from mlflow.tracking import MlflowClient
    
    client = MlflowClient()
    mvs = client.search_model_versions(f"name='{model_name}'")
    latest = max(mvs, key=lambda v: v.creation_timestamp)
    model_abfss = latest.source
    print(model_abfss)
    
  7. Скопируйте адрес модели (URI) из последнего вывода ячейки для использования в более позднем шаге.

Часть 8. Настройка набора запросов KQL

Общие сведения см. в разделе "Создание набора запросов KQL".

  1. В рабочей области выберите +Создать элемент>KQL Queryset.
  2. Введите имя MultivariateAnomalyDetectionTutorial, а затем выберите Создать.
  3. В окне концентратора данных OneLake выберите базу данных KQL, в которой хранятся данные.
  4. Нажмите Подключиться.

Часть 9. Прогнозирование аномалий в наборе запросов KQL

  1. Выполните следующий запрос ".create-or-alter function", чтобы определить хранимую функцию predict_fabric_mvad_fl():

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. Выполните следующий прогнозирующий запрос, заменив URI выходной модели на URI, скопированный в конце шага 7.

    Запрос обнаруживает многовариантные аномалии на пяти акциях на основе обученной модели и отображает результаты в виде anomalychart. Аномальные точки отображаются на первой акции (AAPL), хотя они представляют многовариантные аномалии (другими словами, аномалии совместных изменений пяти акций в определенной дате).

    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

Результирующая диаграмма аномалий должна выглядеть следующим образом:

Снимок экрана выходных данных многовариантного анализа аномалий.

Очистка ресурсов

После завершения работы с руководством вы можете удалить созданные ресурсы, чтобы избежать дополнительных затрат. Чтобы удалить ресурсы, выполните следующие действия.

  1. Перейдите на домашнюю страницу рабочей области.
  2. Удалите среду, созданную в этом руководстве.
  3. Удалите ноутбук, созданный в этом уроке.
  4. Удалите хранилище событий или базу данных , используемую в этом руководстве.
  5. Удалите набор запросов KQL, созданный в этом руководстве.