Azure OpenAI для больших данных

Вы можете использовать службу Azure OpenAI для решения многих задач естественного языка, запрашивая API завершения. Чтобы упростить масштабирование рабочих процессов подбора подсказок от нескольких примеров до крупных наборов данных, служба Azure OpenAI интегрируется с распределенной библиотекой машинного обучения SynapseML. С помощью этой интеграции можно использовать платформу распределенных вычислений Apache Spark для обработки миллионов запросов с помощью службы OpenAI. В этом руководстве показано, как применять большие языковые модели в распределенном масштабе с помощью Azure OpenAI и Microsoft Fabric.

Предварительные требования

Основные предварительные требования для этого краткого руководства включают действующий ресурс Azure OpenAI и кластер Apache Spark с установленным SynapseML.

  • Получите подписку Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.

  • Войдите в Microsoft Fabric.

  • Перейдите в Fabric с помощью переключателя интерфейса в левой нижней части домашней страницы.

    Снимок экрана с выбором Fabric в меню переключателя интерфейса.

Импортируйте это руководство как записную книжку

Следующим шагом станет добавление этого кода в кластер Spark. Вы можете создать записную книжку на платформе Spark и скопировать код в эту записную книжку, чтобы запустить демонстрацию. Или скачайте записную книжку и импортируйте ее в Synapse Analytics.

  1. Скачайте эту демонстрацию в виде записной книжки (выберите "Необработанный", а затем сохраните файл)
  2. Импортируйте записную книжку в рабочую область Synapse или, если используете Fabric, импортируйте в рабочую область Fabric
  3. Установите SynapseML в кластере. См. инструкции по установке Synapse в нижней части веб-сайта SynapseML. Если вы используете Fabric, ознакомьтесь с руководством по установке. Для этого шага требуется вставка дополнительной ячейки в верхней части импортированной записной книжки.
  4. Подключите записную книжку к кластеру и следуйте процессу редактирования и запуска ячеек.

Заполнение сведений о службе

Затем измените ячейку в блокноте, чтобы она указывала на вашу службу. Установите переменные service_name, deployment_name, location и key, соответствующие вашей службе OpenAI:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-4.1-mini"
deployment_name_embeddings = "text-embedding-3-small"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

Создавайте набор данных с подсказками

Далее создайте кадр данных, состоящий из ряда строк, с одним запросом для каждой строки.

Вы также можете загружать данные непосредственно из ADLS или других баз данных. Дополнительные сведения о загрузке и подготовке кадров данных Spark см. в руководстве по загрузке данных Apache Spark.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code that's",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

Создание клиента OpenAIPrompt Apache Spark

Чтобы применить службу OpenAI Azure к кадру данных, создайте объект OpenAIPrompt, который выступает в качестве распределенного клиента. Задайте параметры службы, используя одно значение или столбец фрейма данных, с помощью соответствующих методов задания объекта OpenAIPrompt. В этом примере задайте maxTokens значение 200. Токен состоит примерно из четырех символов, и это ограничение распространяется на сумму запроса и результата. promptCol Задайте параметр с именем столбца запроса в кадре данных.

from synapse.ml.services.openai import OpenAIPrompt

completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Преобразование фрейма данных с помощью клиента OpenAIPrompt

После создания фрейма данных и клиента команд преобразуйте входной набор данных и добавьте столбец completions, содержащий всю информацию, которую добавляет служба. Выделите просто текст для простоты.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

Выходные данные должны выглядеть примерно так. Текст завершения отличается от примера.

prompt error text
Привет, меня зовут null Меня зовут Макавали, мне 18 лет. Когда я вырасту, я хочу быть рэпером. Я люблю писать и делать музыку. Я из Лос-Анджелеса, штат Калифорния.
Лучший код — это код, который null Это понятно: это субъективное утверждение, и нет окончательного ответа.
SynapseML — это null Алгоритм машинного обучения, который может научиться прогнозировать будущие результаты событий.

Дополнительные примеры использования

Генерация текстовых эмбеддингов

Помимо завершения текста, можно также внедрить текст для использования в последующих алгоритмах или векторных архитектурах извлечения. Создавая внедрения, можно выполнять поиск и получение документов из больших коллекций. Используйте этот подход, если разработка подсказок недостаточна для выполнения задачи. Дополнительные сведения об использовании OpenAIEmbeddingсм. в руководстве по внедрению.

from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

Завершение чата

Модели, такие как GPT-4o и GPT-4.1, понимают чаты вместо отдельных запросов. Преобразователь OpenAIChatCompletion предоставляет эту функцию в масштабах.

from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "What's your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Повышение пропускной способности с помощью пакетной обработки запросов

В этом примере выполняется несколько запросов к службе, по одному для каждой подсказки. Чтобы выполнить сразу несколько запросов, используйте пакетный режим. Во-первых, в объекте OpenAIPrompt вместо указания столбца "Запрос" задайте "batchPrompt" для столбца BatchPrompt. Для этого создайте кадр данных, в котором в каждой строке будет список запросов.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

Затем создайте OpenAIPrompt объект. Вместо задания столбца запроса задайте столбец batchPrompt, если он имеет тип Array[String].

batch_completion = (
    OpenAIPrompt()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

При вызове преобразования запрос создаётся для каждой строки. Так как каждая строка содержит несколько запросов, каждый запрос отправляет все запросы в этой строке. Результаты содержат строку для каждой строки в запросе.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Использование автоматического минибатчера

Если ваши данные в формате столбцов, их можно транспонировать в формат строк с помощью SynapseML FixedMiniBatcherTransformer.

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Программирование подсказок для перевода

Служба OpenAI Azure может решать множество различных задач естественного языка с помощью prompt engineering. В этом примере показано запрос на перевод на язык:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

Запрос на получение ответов на вопросы

В этом примере модель запрашивает ответы на вопросы общего знания:

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))