Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Вы можете настроить автозагрузчик для автоматического обнаружения схемы загруженных данных, позволяя инициализировать таблицы без явного объявления схемы данных и изменять схему таблицы по мере введения новых столбцов. Это устраняет необходимость вручную отслеживать и применять изменения схемы с течением времени.
Автозагрузчик также может "спасти" данные, которые оказались неожиданными (например, отличающиеся типы данных) в столбце типа JSON, к которым можно получить доступ позже с помощью API полуструктурированного доступа к данным.
Для вывода и развития схемы поддерживаются следующие форматы:
Формат файла | Поддерживаемые версии |
---|---|
JSON |
Все версии |
CSV |
Все версии |
XML |
Databricks Runtime 14.3 LTS и выше |
Avro |
Databricks Runtime 10.4 LTS и выше |
Parquet |
Databricks Runtime 11.3 LTS и более поздних версий |
ORC |
Не поддерживается |
Text |
Неприменимо (фиксированная схема) |
Binaryfile |
Неприменимо (фиксированная схема) |
Синтаксис для вывода и эволюции схемы
Указание целевого каталога для параметра cloudFiles.schemaLocation
позволяет выводить и развивать схему. Вы можете использовать тот же каталог, который указали для checkpointLocation
. Если вы используете Декларативные конвейеры Lakeflow, Azure Databricks управляет расположением схемы и другими сведениями о контрольной точке автоматически.
Примечание.
Если имеется более одного места расположения исходных данных, загружаемых в целевую таблицу, для каждой рабочей нагрузки приема Auto Loader требуется отдельная контрольная точка потоковой передачи.
В следующем примере используется parquet
для cloudFiles.format
. Используйте csv
, avro
или json
для других источников файлов. Все остальные параметры чтения и записи остаются неизменными для поведения по умолчанию для каждого формата.
Питон
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
язык программирования Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Как работает вывод схемы автозагрузчика?
Чтобы определить схему при первом чтении данных, автозагрузчик выбирает образцы из первых 50 ГБ или 1000 файлов, которые он обнаруживает, в зависимости от того, какой предел достигнут сначала. Автозагрузчик сохраняет сведения о схеме в каталоге _schemas
, настроенном на cloudFiles.schemaLocation
, для отслеживания изменений схемы входных данных с течением времени.
Примечание.
Чтобы изменить размер используемого примера, можно задать конфигурации SQL:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(байтовая строка, например 10gb
)
и
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(целое число)
По умолчанию обнаружение схемы автозагрузчиком стремится избежать проблем с эволюцией схем из-за несоответствия типов. Для форматов, которые не кодируют типы данных (JSON, CSV и XML), автозагрузчик выводит все столбцы в виде строк (включая вложенные поля в JSON-файлах). Для форматов с типизированной схемой (Parquet и Avro) Auto Loader выполняет выборку подмножества файлов и объединяет схемы отдельных файлов. Это поведение приводится в следующей таблице:
Формат файла | Тип данных, определяемый автоматически по умолчанию |
---|---|
JSON |
Строка |
CSV |
Строка |
XML |
Строка |
Avro |
Типы, закодированные в схеме Avro |
Parquet |
Типы, кодируемые в схеме Parquet |
Apache Spark DataFrameReader использует другое поведение для вывода схемы, выбор типов данных для столбцов в json, CSV и XML-источниках на основе примеров данных. Чтобы включить это поведение с помощью автозагрузчика, задайте для параметра значение cloudFiles.inferColumnTypes
true
.
Примечание.
При выводе схемы для данных CSV автозагрузчик предполагает, что файлы содержат заголовки. Если CSV-файлы не содержат заголовков, укажите параметр .option("header", "false")
. Кроме того, Auto Loader объединяет схемы всех файлов в примере, чтобы получить глобальную схему. После этого Auto Loader может считывать каждый файл в соответствии с заголовком и правильно анализировать данные в формате CSV.
Примечание.
Если столбец имеет разные типы данных в двух файлах Parquet, автозагрузчик выбирает самый широкий тип. Для переопределения этого варианта можно использовать schemaHints . При указании схемных подсказок Auto Loader не приводит столбец к указанному типу, а указывает средству чтения Parquet обрабатывать столбец в соответствии с указанным типом. В случае несоответствия столбец переносится в столбец сохраненных данных.
Как работает эволюция схемы автозагрузчика?
Автозагрузчик обнаруживает добавление новых столбцов при обработке данных. Когда автозагрузчик обнаруживает новый столбец, поток останавливается с UnknownFieldException
. Прежде чем поток выдает эту ошибку, Auto Loader выполняет инференцию схемы на последнем микропакете данных и обновляет расположение схемы, добавляя новые столбцы в конец схемы. Типы данных существующих столбцов останутся неизменными.
Databricks рекомендует настраивать потоки Auto Loader с помощью заданий Lakeflow для автоматического перезапуска в случае таких изменений схемы.
Автозагрузчик поддерживает следующие режимы для развития схем, которые задаются в параметре cloudFiles.schemaEvolutionMode
:
Примечание.
режим addNewColumns
используется по умолчанию, если схема не указана, но none
используется по умолчанию при предоставлении схемы.
Как работают разделы с Auto Loader?
Автозагрузчик пытается определить столбцы секций из базовой структуры каталогов данных, если данные размещены в стиле секционирования Hive. Например, путь к base_path/event=click/date=2021-04-01/f0.json
файлу приводит к выводу date
и event
как столбцам секционирования. Если базовая структура каталога содержит конфликтующие секции Hive или не содержит секционирование стилей Hive, столбцы секционирования игнорируются.
Двоичные файлы (binaryFile
) и text
форматы файлов имеют фиксированные схемы данных, но поддерживают вывод столбцов секций. Databricks рекомендует задать cloudFiles.schemaLocation
для этих форматов файлов. Это позволяет избежать возможных ошибок или потери информации и предотвращает определение столбцов разделов при каждом запуске механизма автоматической загрузки.
Столбцы секционирования не учитываются при развитии схемы. Если у вас была начальная структура каталога, например base_path/event=click/date=2021-04-01/f0.json
, а затем начали получать новые файлы в качестве base_path/event=click/date=2021-04-01/hour=01/f1.json
, автозагрузчик игнорирует столбец часа. Для записи сведений о новых столбцах секционирования задайте для cloudFiles.partitionColumns
значение event,date,hour
.
Примечание.
cloudFiles.partitionColumns
Параметр принимает разделенный запятыми список имен столбцов. Анализируются только те столбцы, которые существуют в виде пар key=value
в структуре каталогов.
Что такое столбец спасенных данных?
При автоматическом определении схемы в вашу схему автоматически добавляется столбец данных с именем _rescued_data
. Можно переименовать столбец или включить его в тех случаях, когда вы предоставляете схему, указав опцию rescuedDataColumn
.
Спасаемый столбец данных гарантирует, что столбцы, которые не соответствуют схеме, будут спасены вместо удаления. Столбец спасенных данных содержит любые данные, которые не анализируются по следующим причинам:
- Столбец отсутствует в схеме.
- Несоответствия типов.
- Несоответствия регистра.
Столбец спасенных данных содержит JSON, содержащий спасаемые столбцы и путь к исходному файлу записи.
Примечание.
Синтаксические анализаторы JSON и CSV поддерживают три режима при анализе записей: PERMISSIVE
, DROPMALFORMED
и FAILFAST
. При использовании вместе с rescuedDataColumn
несоответствие типов данных не приводит к удалению записей в режиме DROPMALFORMED
или возникновению ошибки в режиме FAILFAST
. Будут удалены или вызовут ошибку только поврежденные записи, т. е. неполные или неправильные данные JSON или CSV. Если вы используете badRecordsPath
при синтаксическом анализе JSON или CSV, несоответствия типов данных не будут считаться неправильными записями при использовании rescuedDataColumn
. В badRecordsPath
хранятся только неполные и неправильные записи JSON или CSV.
Изменение поведения с учетом регистра
Если чувствительность к регистру не включена, столбцы abc
, Abc
и ABC
считаются одним столбцом для целей определения схемы. Выбор случая является произвольным и зависит от данных выборки. Вы можете использовать подсказки схемы, чтобы задать, какой падеж следует использовать. После того как был сделан выбор и схема была выведена, Автозагрузчик не будет учитывать варианты регистра, которые не были выбраны в соответствии со схемой.
Если столбец спасенных данных включен, поля, именованные в случае, отличном от схемы, загружаются в _rescued_data
столбец. Измените это поведение, установив для параметра readerCaseSensitive
значение false. В этом случае автозагрузчик будет считывать данные без учета регистра.
Переопределение вывода схемы с помощью подсказок для схемы
Вы можете использовать подсказки схемы для принудительного применения сведений о схеме, которые вы знаете и ожидаете в выводимых схемах. Если вы знаете, что столбец имеет определенный тип данных или вы хотите выбрать более общий тип данных (например, double
вместо него integer
), можно указать произвольное количество подсказок для типов данных столбцов в виде строки с помощью синтаксиса спецификации схемы SQL, например следующего:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
См. документацию по типам данных для списка поддерживаемых типов данных.
Если столбец отсутствует в начале потока, можно также использовать указания схемы, чтобы добавить этот столбец в выведенную схему.
Вот пример выведенной схемы, чтобы увидеть поведение с подсказками схемы.
Выведенная схема:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
Задав следующие указания схемы:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
вы получите:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Примечание.
Поддержка подсказок схемы массива и карт доступна в Databricks Runtime 9.1 LTS и более поздних версиях.
Ниже приведен пример выведенной схемы со сложными типами, чтобы увидеть поведение с подсказками схемы.
Выведенная схема:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
Задав следующие указания схемы:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
вы получите:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Примечание.
Подсказки схемы используются только в том случае, если вы не предоставляете схему автозагрузчику. Указания схемы можно использовать независимо от того, включен ли параметр cloudFiles.inferColumnTypes
или отключен.