Начните использование COPY INTO для загрузки данных

С помощью COPY INTO команды SQL можно загрузить данные из расположения файла в таблицу Delta. COPY INTO поддерживает возможность повторения и является идемпотентным — файлы в исходном расположении, которые уже были загружены, пропускаются при последующих запусках.

COPY INTO предлагает следующие возможности:

  • Легко настраиваемые фильтры для файлов или папок из облачного хранилища, включая S3, ADLS, ABFS, GCS и тома Unity Catalog.
  • Поддержка нескольких форматов исходных файлов: CSV, JSON, XML, Avro, ORC, Parquet, text и двоичных файлов.
  • Обработка файлов с гарантией "точно один раз" (идемпотентная) по умолчанию.
  • Интерпретация целевой схемы таблицы, сопоставление, слияние и эволюция.

Примечание.

Для более масштабируемого и надежного приема файлов Databricks рекомендует пользователям SQL использовать потоковые таблицы. Дополнительные сведения см. в таблицах потоковой передачи.

Предупреждение

COPY INTO учитывает параметр рабочей области для векторов удаления. Если этот параметр включен, векторы удаления активируются в целевой таблице, когда COPY INTO выполняется в хранилище SQL или вычислительных ресурсах, работающих под управлением Databricks Runtime 14.0 или более поздних версий. После включения векторов удаления они блокируют запросы к таблице в Databricks Runtime 11.3 LTS и ниже. См. Векторы удаления в Databricks и Автоматическое включение векторов удаления.

Перед тем как начать

Администратор учетной записи должен выполнить действия, описанные в разделе "Настройка доступа к данным для приема данных", чтобы настроить доступ к данным в облачном хранилище объектов, прежде чем пользователи смогут загружать данные с помощью COPY INTO.

Загрузка данных в таблицу Delta Lake без схемы

В Databricks Runtime версии 11.3 LTS и выше можно создать пустые таблицы Delta, чтобы схема определялась во время выполнения команды, установив COPY INTO в mergeSchema значение true в COPY_OPTIONS. В следующем примере используется набор данных Wanderbricks . Замените <catalog>, <schema>а также <volume> каталогом, схемой и томом, где у вас есть CREATE TABLE разрешения.

SQL

CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;

COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Python

table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Р

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
  " COPY_OPTIONS ('mergeSchema' = 'true')",
  sep = ""
))

язык программирования Scala

val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
  " COPY_OPTIONS ('mergeSchema' = 'true')"
)

Эта SQL-заявка является идемпотентной. Это означает, что вы можете запланировать его многократное выполнение, и он будет загружать только новые данные в таблицу Delta.

Примечание.

Пустая таблица Delta недоступна для использования за пределами COPY INTO. INSERT INTO и MERGE INTO не поддерживаются для записи данных в таблицы delta без схемы. После вставки данных в таблицу с COPY INTOтаблица становится запрашиваемой.

См. раздел Создание целевых таблиц для COPY INTO.

Настройка схемы и загрузка данных в таблицу Delta Lake

В следующем примере создается таблица Delta и используется COPY INTO команда SQL для загрузки примеров данных из набора данных Wanderbricks в таблицу. Исходные файлы — это JSON-файлы, хранящиеся в томе каталога Unity. Вы можете запустить пример Python, R, Scala или SQL-код из записной книжки, подключенной к кластеру Azure Databricks. Вы также можете запустить код SQL из запроса, связанного с хранилищем SQL в Databricks SQL. Замените <catalog>, <schema>а также <volume> каталогом, схемой и томом, где у вас есть CREATE TABLE разрешения.

SQL

DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;

CREATE TABLE <catalog>.<schema>.booking_updates_upload (
  booking_id BIGINT,
  user_id BIGINT,
  status STRING,
  total_amount DOUBLE
);

COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');

SELECT * FROM <catalog>.<schema>.booking_updates_upload;

Python

table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "booking_id BIGINT, " + \
  "user_id BIGINT, " + \
  "status STRING, " + \
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format + \
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)

display(booking_updates_upload_data)

Р

library(SparkR)
sparkR.session()

table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "booking_id BIGINT, ",
  "user_id BIGINT, ",
  "status STRING, ",
  "total_amount DOUBLE)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  " FORMAT_OPTIONS ('multiLine' = 'true')",
  sep = ""
))

booking_updates_upload_data = tableToDF(table_name)

display(booking_updates_upload_data)

язык программирования Scala

val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "booking_id BIGINT, " +
  "user_id BIGINT, " +
  "status STRING, " +
  "total_amount DOUBLE)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format +
  " FORMAT_OPTIONS ('multiLine' = 'true')"
)

val booking_updates_upload_data = spark.table(table_name)

display(booking_updates_upload_data)

Чтобы очистить, выполните следующий код, чтобы удалить пример таблицы.

SQL

DROP TABLE <catalog>.<schema>.booking_updates_upload

Python

spark.sql("DROP TABLE " + table_name)

Р

sql(paste("DROP TABLE ", table_name, sep = ""))

язык программирования Scala

spark.sql("DROP TABLE " + table_name)

Очистка файлов метаданных

Вы можете запустить VACUUM для очистки файлов метаданных, созданных COPY INTO в Databricks Runtime 15.2 и выше.

Дополнительные ресурсы

  • Databricks Runtime 7.x и выше: COPY INTO