Создание и обновление определения задания Spark с форматом версии 2 с помощью REST API Microsoft Fabric

Определение задания Spark (SJD) — это тип элемента Fabric, который позволяет пользователям определять и запускать задания Apache Spark в Microsoft Fabric. API определения заданий Spark версии 2 позволяет пользователям создавать и обновлять элементы определения заданий Spark с новым форматом SparkJobDefinitionV2. Основное преимущество использования формата версии 2 заключается в том, что пользователи могут управлять основным исполняемым файлом и другими файлами библиотеки с одним вызовом API, а не использовать API хранилища для отправки файлов отдельно, для управления файлами больше маркера хранения не требуется.

Предпосылки

REST API Microsoft Fabric определяет единую конечную точку для операций CRUD элементов Fabric. Конечная точка https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items.

Общие сведения о формате определения задания Spark версии 2

В данных по управлению элементом определения задания Spark поле definition предназначено для указания подробной настройки элемента определения задания Spark. Поле definition содержит два подфилда: format и parts. Поле format задает формат элемента определения задания Spark, который должен быть SparkJobDefinitionV2 для формата версии 2.

Поле parts представляет собой массив, содержащий подробную настройку элемента определения задания Spark. Каждый элемент в массиве parts представляет часть подробной настройки. Каждая часть содержит три подфилда: path, payloadи payloadType. Поле path указывает путь к части, payload поле указывает содержимое части, закодированной в кодировке Base64, и payloadType поле указывает тип полезных данных, которые должны быть InlineBase64.

Это важно

Этот формат версии 2 поддерживает только определения заданий Spark с форматами файлов .py или scala. Формат файла .jar не поддерживается.

Создание элемента определения задания Spark с основным файлом определения и другими файлами lib

В следующем примере мы создадим элемент определения задания Spark, который:

  1. Имя — SJDHelloWorld.
  2. Основной файл определения — main.py, который считывает CSV-файл из своего стандартного Lakehouse и сохраняет в виде таблицы Delta обратно в тот же Lakehouse.
  3. Другой файл lib — это libs.py, который содержит служебную функцию для возврата имени CSV-файла и таблицы Delta.
  4. По умолчанию Lakehouse имеет определенный идентификатор артефакта Lakehouse.

Ниже приведены подробные полезные данные для создания элемента определения задания Spark.

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib1.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

Для кодирования/декодирования детальной настройки воспользуйтесь следующими вспомогательными функциями в Python. Существуют также другие онлайн-инструменты, например https://www.base64decode.org/ , которые могут выполнять ту же работу.

import base64

def json_to_base64(json_data):
    # Serialize the JSON data to a string
    json_string = json.dumps(json_data)
    
    # Encode the JSON string as bytes
    json_bytes = json_string.encode('utf-8')
    
    # Encode the bytes as Base64
    base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
    
    return base64_encoded

def base64_to_json(base64_data):
    # Decode the Base64-encoded string to bytes
    base64_bytes = base64_data.encode('utf-8')
    
    # Decode the bytes to a JSON string
    json_string = base64.b64decode(base64_bytes).decode('utf-8')
    
    # Deserialize the JSON string to a Python dictionary
    json_data = json.loads(json_string)
    
    return json_data

Ответ HTTP-кода 202 указывает, что элемент определения задания Spark был успешно создан.

Получить определение задания Spark с частями определения в формате v2

В новом формате версии 2, при получении элемента определения задания Spark с частями определения, содержимое основного файла определения и других библиотечных файлов включается в полезную нагрузку ответа, будучи закодированным в формате base64 в поле parts. Ниже приведен пример получения элемента определения задания Spark с частями определения:

  1. Сначала выполните запрос POST к конечной точке https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}/getDefinitionParts?format=SparkJobDefinitionV2. Убедитесь, что значение параметра запроса формата равно SparkJobDefinitionV2.
  2. Затем в заголовках ответа проверьте код состояния HTTP. Код HTTP 202 указывает, что запрос был успешно принят. x-ms-operation-id Скопируйте значение из заголовков ответа.
  3. Наконец, выполните запрос GET к конечной точке https://api.fabric.microsoft.com/v1/operations/{operationId} с скопированным x-ms-operation-id значением, чтобы получить результат операции. В поле definition в полезных данных ответа содержится подробное описание элемента определения задания Spark, включая основной файл определения и другие файлы библиотеки под полем parts.

Обновите элемент определения задания Spark с помощью основного файла определения и других файлов lib в формате версии 2

Чтобы обновить существующий элемент описания задания Spark с главным файлом определения и другими файлами библиотек в формате v2, можно использовать аналогичную структуру полезных данных, как и при операции создания. Ниже приведен пример обновления элемента определения задания Spark, созданного в предыдущем разделе:

{
  "displayName": "SJDHelloWorld",
  "type": "SparkJobDefinition",
  "definition": {
    "format": "SparkJobDefinitionV2",
    "parts": [
      {
        "path": "SparkJobDefinitionV1.json",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Main/main.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      },
      {
        "path": "Libs/lib2.py",
        "payload": "<REDACTED>",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

При использовании приведенного выше пейлоада в файлы вносятся следующие изменения:

  1. Файл main.py обновляется с новым содержимым.
  2. Lib1.py удаляется из этого элемента определения задания Spark, а также удаляется из хранилища OneLake.
  3. Новый lib2.py файл добавляется в этот элемент определения задания Spark и передается в хранилище OneLake.

Чтобы обновить элемент определения задания Spark, выполните запрос POST к конечной точке https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid} с вышеуказанной нагрузкой данных. Ответ HTTP-кода 202 указывает, что элемент определения задания Spark был успешно обновлен.