Поделиться через


Руководство. Индексирование больших данных из Apache Spark с помощью SynapseML и поиска ИИ Azure

В этом руководстве по поиску ИИ Azure вы узнаете, как индексировать и запрашивать большие данные, загруженные из кластера Spark. Вы настроили Jupyter Notebook для следующих целей:

  • Загрузка различных форм (счетов) в кадр данных в сеансе Apache Spark
  • Анализ форм для определения их функций
  • Соберите полученный результат в табличную структуру данных
  • Запишите результаты в индекс поиска, размещенный в службе Azure AI Search
  • Исследуйте и запрашивайте контент, созданный вами

В этом руководстве используется зависимость от SynapseML, библиотека с открытым кодом, которая поддерживает массовое параллельное машинное обучение по большим данным. В SynapseML индексирование поиска и машинное обучение предоставляются через преобразователи , выполняющие специализированные задачи. Трансформеры используют широкий спектр возможностей интеллекта. В этом упражнении вы используете API AzureSearchWriter для анализа и обогащения ИИ.

Хотя служба поиска ИИ Azure имеет собственное обогащение ИИ, в этом руководстве показано, как получить доступ к возможностям ИИ за пределами службы "Поиск ИИ Azure". Используя SynapseML вместо индексаторов или навыков, вы не подвергаетесь ограничениям данных или другим ограничениям, связанным с этими объектами.

Совет

Просмотрите короткое видео этой демонстрации. Видео дополняет это руководство новыми шагами и визуальными элементами.

Предварительные условия

Вам нужна synapseml библиотека и несколько ресурсов Azure. По возможности используйте ту же подписку и регион для ресурсов Azure и поместите все в одну группу ресурсов для простой очистки позже. Следующие ссылки предназначены для установки портала. Пример данных импортируется из общедоступного сайта.

1 Эта ссылка ведет к руководству по загрузке пакета.

2 Вы можете использовать уровень "Бесплатный" для индексирования примеров данных, но выберите более высокий уровень , если объемы данных большие. Для платных уровней укажите поисковый ключ API в шаге Настройка зависимостей.

3 В этом руководстве используется Аналитика документов Azure и Azure AI Translator. В приведенных ниже инструкциях укажите ключ мультисервисной учетной записи и регион. Один и тот же ключ работает для обеих служб. В этом руководстве важно использовать многосервисную учетную запись служб ИИ Azure с типом API AIServices. Вы можете проверить тип API на портале Azure в разделе "Обзор" страницы учетной записи служб Azure AI с несколькими службами. Дополнительные сведения о типе API см. в статье "Присоединение ресурса служб ИИ Azure с несколькими службами" в службе "Поиск ИИ Azure".

4 В этом руководстве Azure Databricks предоставляет платформу вычислений Spark. Мы использовали инструкции портала для настройки кластера и рабочей области.

Примечание.

Предыдущие ресурсы Azure поддерживают функции безопасности на платформе удостоверений Майкрософт. Для простоты в этом руководстве предполагается аутентификация на основе ключей с использованием конечных точек и ключей, скопированных с страниц портала Azure каждой службы. Если вы реализуете этот рабочий процесс в рабочей среде или совместно используете решение с другими пользователями, не забудьте заменить жестко закодированные ключи интегрированными ключами безопасности или зашифрованными ключами.

Создание кластера Spark и записной книжки

В этом разделе описано, как создать кластер, установить synapseml библиотеку и создать записную книжку для запуска кода.

  1. На портале Azure найдите рабочую область Azure Databricks и выберите "Запустить рабочую область".

  2. В меню слева выберите "Вычисления".

  3. Выберите "Создать вычисления".

  4. Примите конфигурацию по умолчанию. Создание кластера занимает несколько минут.

  5. Убедитесь, что кластер включен и работает. Зеленая точка по имени кластера подтверждает его состояние.

    Снимок экрана: страница вычислений Data Bricks с зеленой точкой по имени кластера.

  6. После создания кластера установите библиотеку synapseml :

    1. Выберите библиотеки на вкладках в верхней части страницы кластера.

    2. Нажмите кнопку "Установить новую".

      Снимок экрана: команда

    3. Выберите Maven.

    4. Введите com.microsoft.azure:synapseml_2.12:1.0.9 в координатах.

    5. Выберите Установить.

      Снимок экрана: спецификация пакета Maven.

  7. В меню слева выберите "Создать>записную книжку".

    Снимок экрана: команда

  8. Присвойте записной книжке имя, выберите Python в качестве языка по умолчанию и выберите кластер с библиотекой synapseml .

  9. Создайте семь последовательных ячеек. В следующих разделах вставьте код в эти ячейки.

    Скриншот блокнота с ячейками-заполнителями.

Настройка зависимостей

Вставьте следующий код в первую ячейку записной книжки.

Замените заполнители конечными точками и ключами доступа для каждого из ресурсов. Укажите имя для создания нового индекса поиска. Никаких других изменений не требуется, поэтому запустите код, когда вы будете готовы.

Этот код импортирует несколько пакетов и настраивает доступ к ресурсам Azure, используемым в этом руководстве.

import os
from pyspark.sql.functions import udf, trim, split, explode, col, monotonically_increasing_id, lit
from pyspark.sql.types import StringType
from synapse.ml.core.spark import FluentAPI

cognitive_services_key = "placeholder-azure-ai-services-multi-service-key"
cognitive_services_region = "placeholder-azure-ai-services-region"

search_service = "placeholder-search-service-name"
search_key = "placeholder-search-service-admin-api-key"
search_index = "placeholder-for-new-search-index-name"

Загрузка данных в Spark

Вставьте следующий код во вторую ячейку. Никаких изменений не требуется, поэтому запустите код, когда вы будете готовы.

Этот код загружает несколько внешних файлов из учетной записи хранения Azure. Файлы представляют собой различные счета, которые считываются в структуру данных.

def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (spark.read.format("binaryFile")
    .load("wasbs://[email protected]/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache())
    
display(df2)

Добавление интеллекта к документам

Вставьте следующий код в третью ячейку. Никаких изменений не требуется, поэтому запустите код, когда вы будете готовы.

Этот код загружает преобразователь AnalyzeInvoices и передает ссылку на кадр данных, содержащий счета. Она вызывает предварительно настроенную модель обработки счетов Azure AI Document Intelligence для извлечения сведений из счетов.

from synapse.ml.services import AnalyzeInvoices

analyzed_df = (AnalyzeInvoices()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache())

display(analyzed_df)

Выходные данные должны выглядеть примерно так, как показано на следующем снимок экрана. Обратите внимание, что анализ форм упакован в плотно структурированный столбец, с которым трудно работать. Следующее преобразование устраняет эту проблему, анализируя столбец в строки и столбцы.

Снимок экрана: выходные данные AnalyzeInvoices.

Реструктурирование выходных данных аналитики документов

Вставьте следующий код в четвертую ячейку и запустите ее. Никаких изменений не требуется.

Этот код загружает FormOntologyLearner, преобразователь, который анализирует выходные данные преобразователей аналитики документов и выводит табличную структуру данных. Выходные данные AnalyzeInvoices являются динамическими и зависят от функций, обнаруженных в содержимом. Кроме того, преобразователь объединяет выходные данные в один столбец. Поскольку выходные данные являются динамическими и консолидированными, их трудно использовать в последующих преобразованиях, требующих более высокой структурированности.

FormOntologyLearner расширяет программу преобразователя AnalyzeInvoices путем поиска шаблонов, которые можно использовать для создания табличной структуры данных. Упорядочение выходных данных в нескольких столбцах и строках позволяет использовать содержимое в других преобразователях, таких как AzureSearchWriter.

from synapse.ml.cognitive import FormOntologyLearner

itemized_df = (FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*").select("*", explode(col("Items")).alias("Item"))
    .drop("Items").select("Item.*", "*").drop("Item"))

display(itemized_df)

Обратите внимание, как это преобразование переадресует вложенные поля в таблицу, что позволяет выполнять следующие два преобразования. Этот снимок экрана обрезается для краткости. Если вы следуете вместе с собственным блокнотом, у вас есть 19 столбцов и 26 строк.

Снимок экрана: выходные данные FormOntologyLearner.

Добавление переводов

Вставьте следующий код в пятую ячейку. Никаких изменений не требуется, поэтому запустите код, когда вы будете готовы.

Этот код загружает Translate, трансформер, который вызывает службу Azure AI Переводчик в сервисах ИИ Azure. Исходный текст, который находится на английском языке в столбце "Описание", преобразуется на различные языки. Все выходные данные объединяются в массив output.translations.

from synapse.ml.cognitive import Translate

translated_df = (Translate()
    .setSubscriptionKey(cognitive_services_key)
    .setLocation(cognitive_services_region)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache())

display(translated_df)

Совет

Чтобы проверить переведенные строки, прокрутите страницу до конца строк.

Снимок экрана: выходные данные таблицы с столбцом

Добавление индекса поиска с помощью AzureSearchWriter

Вставьте следующий код в шестую ячейку и запустите его. Никаких изменений не требуется.

Этот код загружает AzureSearchWriter. Он использует табличный набор данных и выводит схему индекса поиска, которая определяет одно поле для каждого столбца. Поскольку структура переводов представляет собой массив, он сформулируется в индексе как сложная коллекция с подполями для каждого перевода языка. Созданный индекс содержит ключ документа и использует значения по умолчанию для полей, созданных с помощью REST API создания индекса.

from synapse.ml.cognitive import *

(translated_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    ))

Чтобы изучить определение индекса, созданное AzureSearchWriter, проверьте страницы службы поиска на портале Azure.

Примечание.

Если вы не можете использовать индекс поиска по умолчанию, можно указать внешнее настраиваемое определение в JSON, передав его URI в виде строки в свойстве IndexJson. Сначала создайте индекс по умолчанию, чтобы узнать, какие поля нужно указать, а затем следовать настраиваемым свойствам, если вам нужны определенные анализаторы, например.

Запросить индекс

Вставьте следующий код в седьмую ячейку и запустите его. Никаких изменений не требуется, за исключением того, что вам может потребоваться изменить синтаксис или попробовать дополнительные примеры для дальнейшего изучения содержимого:

Нет преобразователя или модуля, который выдает запросы. Эта ячейка является простым вызовом REST API поиска документов.

В этом конкретном примере выполняется поиск слова "дверь" ("search": "door"). Он также возвращает "число" количества соответствующих документов и выбирает только содержимое полей "Описание" и "Переводы" для результатов. Если вы хотите просмотреть полный список полей, удалите параметр select.

import requests

url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2024-07-01".format(search_service, search_index)
requests.post(url, json={"search": "door", "count": "true", "select": "Description, Translations"}, headers={"api-key": search_key}).json()

На следующем снимке экрана показаны выходные данные ячейки для примера скрипта.

Снимок экрана: результаты запроса, показывающие количество, строку поиска и возвращаемые поля.

Очистка ресурсов

Если вы работаете в своей подписке, после завершения проекта целесообразно удалить ресурсы, которые вам больше не нужны. Ресурсы, которые продолжат работать, могут быть платными. Вы можете удалить ресурсы по отдельности либо удалить всю группу ресурсов.

Ресурсы и управление ими можно найти в портал Azure, используя ссылку "Все ресурсы" или "Группы ресурсов" в области навигации слева.

Следующие шаги

В этом руководстве вы узнали о преобразователе AzureSearchWriter в SynapseML, который является новым способом создания и загрузки индексов поиска в поиске ИИ Azure. Преобразователь принимает структурированный JSON в качестве входных данных. FormOntologyLearner может предоставить необходимую структуру для выходных данных, созданных преобразователями аналитики документов в SynapseML.

На следующем шаге ознакомьтесь с другими руководствами SynapseML, которые создают преобразованное содержимое, которое может потребоваться изучить с помощью поиска ИИ Azure: