Поделиться через


Выведение и развитие схемы с помощью from_json в декларативных конвейерах Lakeflow

Это важно

Эта функция предоставляется в режиме общедоступной предварительной версии.

В этой статье описывается, как выводить и развивать схему JSON-блоков с помощью функции SQL в Декларативных конвейерах Lakeflow.

Обзор

Функция from_json SQL анализирует строковый столбец JSON и возвращает значение структуры. При использовании вне декларативного конвейера Lakeflow необходимо явно указать схему возвращаемого значения с помощью аргумента schema . При использовании с декларативными конвейерами Lakeflow можно включить определение и эволюцию схемы, которые автоматически управляют схемой возвращаемого значения. Эта функция упрощает начальную настройку (особенно если схема неизвестна) и текущие операции при частом изменении схемы. Она позволяет легко обрабатывать произвольные большие двоичные объекты JSON из потоковых источников данных, таких как автозагрузчик, Kafka или Kinesis.

В частности, при использовании в декларативных конвейерах Lakeflow, функция SQL from_json может выполнять вывод и эволюцию схем.

  • Обнаружение новых полей в входящих записях JSON (включая вложенные объекты JSON)
  • Вывод типов полей и сопоставление их с соответствующими типами данных Spark
  • Автоматическое развитие схемы для размещения новых полей
  • Автоматическая обработка данных, которые не соответствуют текущей схеме

Синтаксис: автоматически вывод и развитие схемы

Если вы используете from_json с декларативными конвейерами Lakeflow, онa может автоматически определять и развивать схему. Чтобы включить эту функцию, задайте для схемы значение NULL и укажите schemaLocationKey параметр. Это позволяет ему выводить и отслеживать схему.

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

Питон

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

Запрос может иметь несколько from_json выражений, но каждое выражение должно иметь уникальный schemaLocationKey. Элемент schemaLocationKey также должен быть уникальным для каждого конвейера.

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Питон

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

Синтаксис: фиксированная схема

Если вы хотите применить определенную схему вместо этого, можно использовать следующий from_json синтаксис, чтобы проанализировать строку JSON с помощью этой схемы:

from_json(jsonStr, schema, [, options])

Этот синтаксис можно использовать в любой среде Azure Databricks, включая декларативные конвейеры Lakeflow. Дополнительные сведения см. здесь.

Вывод схемы

from_json выводит схему из первого пакета столбцов данных JSON и внутренне индексирует ее ( schemaLocationKey обязательно).

Если строка JSON является одним объектом (например, {"id": 123, "name": "John"}), from_json определяет схему типа STRUCT и добавляет rescuedDataColumn в список полей.

STRUCT<id LONG, name STRING, _rescued_data STRING>

Однако если строка JSON содержит массив верхнего уровня (например ["id": 123, "name": "John"]), то from_json оборачивает этот массив в структуру. Этот подход позволяет получить данные, несовместимые с выводимой схемой. У вас есть возможность разделить значения массива на отдельные строки в последующих строках.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Переопределение процесса определения схемы с помощью подсказок схемы

Вы можете опционально указать schemaHints, чтобы повлиять на то, как from_json определяет тип столбца. Это полезно, если вы знаете, что столбец имеет определенный тип данных или вы хотите выбрать более общий тип данных (например, двойной вместо целого числа). Можно указать произвольное количество подсказок для типов данных столбцов с помощью синтаксиса спецификации схемы SQL. Семантика для подсказок схемы такая же, как для подсказок схемы автозагрузчика. Рассмотрим пример.

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

Если строка JSON содержит массив верхнего уровня, он оборачивается в структуру. В этих случаях указания схемы применяются к схеме ARRAY вместо оболочки структуры. Например, рассмотрим строку JSON с массивом верхнего уровня, например:

[{"id": 123, "name": "John"}]

Схема выводимого МАССИВа упаковывается в структуру STRUCT:

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

Чтобы изменить тип idданных, укажите указание схемы в виде element.id STRING. Чтобы добавить новый столбец типа DOUBLE, укажите element.new_col DOUBLE. Из-за этих подсказок схема для массива JSON верхнего уровня становится следующей:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

Развивайте схему, используя schemaEvolutionMode

from_json обнаруживает добавление новых столбцов при обработке данных. При обнаружении нового поля from_json обновляет выводимую схему, добавляя новые столбцы в конец последней версии схемы. Типы данных существующих столбцов останутся неизменными. После обновления схемы конвейер перезапускается автоматически с обновленной схемой.

from_json поддерживает следующие режимы для эволюции схемы, которые задаются с помощью необязательного schemaEvolutionMode параметра. Эти режимы согласованы с автозагрузчиком.

schemaEvolutionMode Поведение при чтении нового столбца
addNewColumns (по умолчанию) Поток не работает. Новые столбцы будут добавлены в схему. Существующие столбцы не развивают типы данных.
rescue Схема никогда не эволюционирует, и поток не завершается ошибкой из-за изменений схемы. Все новые столбцы записываются в спасаемом столбце данных.
failOnNewColumns Поток не работает. Поток не перезапускается, если schemaHints не будут обновлены или проблемные данные не будут удалены.
none Не развивается схема, новые столбцы игнорируются, и данные не спасаются, если rescuedDataColumn параметр не задан. Поток не дает сбой из-за изменений схемы.

Рассмотрим пример.

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

столбец данных спасенный

В схему _rescued_dataавтоматически добавляется столбец спасенных данных. Можно переименовать столбец, задав rescuedDataColumn параметр. Рассмотрим пример.

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

При выборе использования спасаемого столбца данных все столбцы, которые не соответствуют выводимой схеме, будут спасены вместо удаления. Это может произойти из-за несоответствия типов данных, отсутствия столбца в схеме или различия в регистре имени столбца.

Обработка поврежденных записей

Чтобы сохранить записи, которые неправильно сформированы и не могут быть проанализированы, добавьте столбец, задав _corrupt_record указания схемы, как показано в следующем примере:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Чтобы переименовать столбец поврежденной записи, задайте columnNameOfCorruptRecord параметр.

Средство синтаксического анализа JSON поддерживает три режима обработки поврежденных записей:

Режим Описание
PERMISSIVE Для поврежденных записей помещает неправильно сформированную строку в поле, настроенное с помощью columnNameOfCorruptRecord, и задаёт неправильно сформированные поля в null. Чтобы сохранить поврежденные записи, можно задать поле типа строки с именем columnNameOfCorruptRecord в определяемой пользователем схеме. Если в схеме нет поля, во время синтаксического анализа удаляются поврежденные записи. При выведении схемы средство синтаксического анализа неявно добавляет columnNameOfCorruptRecord поле в выходную схему.
DROPMALFORMED Игнорирует поврежденные записи.
При использовании DROPMALFORMED режима с rescuedDataColumnнесоответствиями типов данных не удаляются записи. Удаляются только поврежденные записи, такие как неполный или неправильный формат JSON.
FAILFAST Вызывает исключение при обнаружении поврежденных записей парсером.
При использовании режима FAILFAST с rescuedDataColumn типом данных, несоответствия типов данных не вызывают ошибки. Только поврежденные записи вызывают ошибки, такие как неполный или неправильный формат JSON.

См. поле в выходных данных from_json

from_json определяет схему во время выполнения конвейера. Если последующий запрос ссылается на from_json поле, прежде чем from_json функция успешно выполнится по крайней мере один раз, поле не может быть обработано, и запрос пропускается. В следующем примере анализ запроса серебряной таблицы будет пропущен до тех пор, пока в бронзовом запросе не выполнится функция from_json и не будет определена схема.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

from_json Если функция и поля, которые она выводит, упоминаются в том же запросе, анализ может завершиться ошибкой, как в следующем примере:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Это можно исправить, переместив ссылку на from_json поле в подчиненный запрос (например, приведенный выше пример бронзы или серебра).) Кроме того, можно указать schemaHints , что содержит указанные from_json поля. Рассмотрим пример.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

Примеры. Автоматическое вывод и развитие схемы

В этом разделе приведен пример кода для включения автоматической инференции и развития схемы с помощью from_json в декларативных конвейерах Lakeflow.

Создание потоковой таблицы из облачного хранилища

В следующем примере используется read_files синтаксис для создания потоковой таблицы из облачного хранилища объектов.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Питон

@dlt.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Создайте потоковую таблицу на основе Kafka

В следующем примере используется read_kafka синтаксис для создания потоковой таблицы из Kafka.

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Питон

@dlt.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

Примеры: фиксированная схема

Пример кода с использованием фиксированной схемы см. в функции .

Вопросы и ответы

В этом разделе приведены ответы на часто задаваемые вопросы о поддержке вывода схемы и эволюции в from_json функции.

Что такое разница между from_json и parse_json?

Функция parse_json возвращает VARIANT значение из строки JSON.

VARIANT предоставляет гибкий и эффективный способ хранения полуструктурированных данных. Это позволяет обойти вывод схемы и её эволюцию, полностью отказавшись от строгих типов. Однако, если вы хотите применить схему во время записи (например, так как у вас есть относительно строгая схема), from_json может оказаться лучшим вариантом.

В следующей таблице описываются различия между from_json и parse_json:

Функция Случаи использования Доступность
from_json Эволюция схемы с from_json сохранением схемы. Это полезно, когда:
  • Вы хотите обеспечить выполнение вашей схемы данных (например, проверяя каждое изменение схемы перед его сохранением).
  • Вы хотите оптимизировать хранилище и нуждаетесь в низкой задержке обработки запросов и низких затратах.
  • Вы хотите получать ошибку на данных с несовпадением типов.
  • Вы хотите извлечь частичные результаты из поврежденных записей JSON и сохранить неправильно сформированную запись в столбце _corrupt_record . Наоборот, обработка VARIANT возвращает ошибку для некорректного JSON.
Доступно с выводом схемы и эволюцией только в декларативных конвейерах Lakeflow
parse_json VARIANT особенно хорошо подходит для хранения данных, которые не должны быть схематизированы. Рассмотрим пример.
  • Вы хотите сохранить полуструктурированные данные, так как они гибки.
  • Схема изменяется слишком быстро, чтобы привести ее в схему без частых сбоев потока и перезапусков.
  • Вы не хотите допустить сбой из-за данных с несоответствующими типами. (Прием VARIANT всегда будет успешным для допустимых записей JSON, даже если существуют несоответствия типов.)
  • Пользователи не хотят иметь дело с колонкой восстановленных данных, содержащей поля, несоответствующие схеме.
Доступно с декларативными конвейерами Lakeflow и без них

Можно ли использовать from_json синтаксис вывода схемы и эволюции за пределами декларативных конвейеров Lakeflow?

Нет, нельзя использовать синтаксис определения схемы и эволюции from_json за пределами декларативных конвейеров Lakeflow.

Как получить доступ к схеме, выводимой с помощью from_json?

Просмотрите схему целевой потоковой таблицы.

Можно ли передать from_json схему, а также сделать эволюцию?

Нет, вы не можете передать from_json схему и одновременно осуществлять эволюцию. Однако, можно указать подсказки схемы для переопределения некоторых или всех полей, выводимых from_json.

Что происходит со схемой, если таблица полностью обновлена?

Расположения схемы, связанные с таблицей, очищаются, и схема повторно выводится с нуля.