Поделиться через


Upsert в таблицу Delta Lake с помощью слияния

Данные из исходной таблицы, представления или DataFrame можно передать с помощью операции upsert в целевую таблицу Delta с помощью операции SQL MERGE. Delta Lake поддерживает вставки, обновления и удаления, MERGEа также поддерживает расширенный синтаксис за пределами стандартов SQL для упрощения расширенных вариантов использования.

Предположим, у вас есть исходная таблица с именем people10mupdates или исходный путь /tmp/delta/people-10m-updates, содержащий новые данные для целевой таблицы с именем people10m или целевым путем /tmp/delta/people-10m. Некоторые из этих новых записей уже могут присутствовать в целевых данных. Чтобы объединить новые данные, необходимо обновить строки, в которых уже присутствует пользователь id , и вставить новые строки, в которых отсутствует сопоставление id . Вы можете выполнить следующий запрос:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Питон

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

язык программирования Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Внимание

Только одна строка из исходной таблицы может соответствовать заданной строке в целевой таблице. В Databricks Runtime 16.0 и более поздних версий вычисляет условия, MERGE указанные в WHEN MATCHED предложениях и ON определяющих повторяющиеся совпадения. В Databricks Runtime 15.4 LTS и ниже MERGE операции рассматривают только условия, указанные в предложении ON .

Дополнительные сведения о синтаксисе Scala и Python см. в документации по API Delta Lake. Сведения о синтаксисе SQL см. в MERGE INTO

Изменение всех несовпаденных строк с помощью слияния

В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE предложение UPDATE для или DELETE записи в целевой таблице, которые не имеют соответствующих записей в исходной таблице. Databricks рекомендует добавить необязательное условное предложение, чтобы избежать полной перезаписи целевой таблицы.

В следующем примере кода показан базовый синтаксис использования для удаления, перезапись целевой таблицы с содержимым исходной таблицы и удаление несовпаденных записей в целевой таблице. Дополнительные масштабируемые шаблоны для таблиц, в которых исходные обновления и удаления привязаны к времени, см. в разделе Добавочная синхронизация таблицы Delta с источником.

Питон

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

язык программирования Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

В следующем примере добавляются условия в WHEN NOT MATCHED BY SOURCE предложение и указываются значения для обновления в несовпаденных целевых строках.

Питон

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

язык программирования Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

Семантика операции слияния

Ниже приведено подробное описание семантики программной merge операции.

  • Может существовать любое количество предложений whenMatched и whenNotMatched.

  • Предложения whenMatched выполняются, когда исходная строка соответствует строке целевой таблицы на основе условия соответствия. Эти предложения имеют следующую семантику.

    • Предложения whenMatched могут иметь не более одного действия update и одного действия delete. Действие update обновляет merge только указанные столбцы (аналогично updateоперации) соответствующей целевой строки. Действие delete удаляет сопоставленную строку.

    • Каждое предложение whenMatched может иметь необязательное условие. Если это условие предложения существует, действие update или delete выполняется для любой соответствующей пары исходной и целевой строк, только если условие предложения истинно.

    • Если существует несколько предложений whenMatched, они вычисляются в том порядке, в котором указаны. Все предложения whenMatched, за исключением последнего, должны иметь условия.

    • Если ни одно из условий whenMatched не вычисляется как истинное для исходной и целевой пары строк, которая соответствует условию слияния, целевая строка остается неизменной.

    • Чтобы обновить все столбцы целевой таблицы Delta с соответствующими столбцами исходного набора данных, используйте whenMatched(...).updateAll(). Это соответствует следующей записи:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      для всех столбцов целевой таблицы Delta. Следовательно, это действие предполагает, что в исходной таблице есть те же столбцы, что и в целевой таблице, иначе запрос выдаст ошибку анализа.

      Примечание.

      Это поведение изменяется при включенной автоматической эволюции схемы. Дополнительные сведения см. в статье об автоматической эволюции схемы.

  • Предложения whenNotMatched выполняются, когда исходная строка не соответствует какой-либо целевой строке на основе условия соответствия. Эти предложения имеют следующую семантику.

    • Предложения whenNotMatched могут иметь только действие insert. Новая строка создается на основе указанного столбца и соответствующих выражений. Нет необходимости указывать все столбцы в целевой таблице. Для неуказанных целевых столбцов вставляется значение NULL.

    • Каждое предложение whenNotMatched может иметь необязательное условие. Если условие предложения присутствует, исходная строка вставляется, только если это условие истинно для этой строки. В противном случае исходный столбец игнорируется.

    • Если существует несколько предложений whenNotMatched, они вычисляются в том порядке, в котором указаны. Все предложения whenNotMatched, за исключением последнего, должны иметь условия.

    • Чтобы заменить все столбцы целевой таблицы Delta соответствующими столбцами исходного набора данных, используйте whenNotMatched(...).insertAll(). Это соответствует следующей записи:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      для всех столбцов целевой таблицы Delta. Следовательно, это действие предполагает, что в исходной таблице есть те же столбцы, что и в целевой таблице, иначе запрос выдаст ошибку анализа.

      Примечание.

      Это поведение изменяется при включенной автоматической эволюции схемы. Дополнительные сведения см. в статье об автоматической эволюции схемы.

  • whenNotMatchedBySource предложения выполняются, если целевая строка не соответствует исходной строке в зависимости от условия слияния. Эти предложения имеют следующую семантику.

    • whenNotMatchedBySource предложения могут указывать delete и update выполнять действия.
    • Каждое предложение whenNotMatchedBySource может иметь необязательное условие. Если условие предложения присутствует, целевая строка изменяется только в том случае, если это условие имеет значение true для этой строки. В противном случае целевая строка остается без изменений.
    • Если существует несколько предложений whenNotMatchedBySource, они вычисляются в том порядке, в котором указаны. Все предложения whenNotMatchedBySource, за исключением последнего, должны иметь условия.
    • По определению whenNotMatchedBySource предложения не имеют исходной строки для извлечения значений столбцов, поэтому на исходные столбцы нельзя ссылаться. Для каждого столбца, который необходимо изменить, можно указать литерал или выполнить действие в целевом столбце, например SET target.deleted_count = target.deleted_count + 1.

Внимание

  • Операция merge может завершиться ошибкой, если несколько строк исходного набора данных совпадают и операция слияния пытается обновить одни и те же строки целевой таблицы Delta. В соответствии с семантикой слияния SQL такая операция обновления неоднозначна, так как неясно, какую исходную строку следует использовать для обновления соответствующей целевой строки. Можно предварительно обработать исходную таблицу, чтобы исключить возможность нескольких совпадений.
  • Вы можете применить операцию SQL MERGE к представлению SQL, только если представление определено как CREATE VIEW viewName AS SELECT * FROM deltaTable.

Дедупликация данных при записи в таблицы Delta

Распространенным вариантом использования извлечения. преобразования и загрузки является сбору журналов в таблице Delta путем добавления их в таблицу. Однако часто источники могут создавать дублирующиеся записи журнала, и тогда требуются нижестоящие шаги по дедупликации. С помощью merge можно избежать вставки дублирующихся записей.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Питон

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

язык программирования Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Ява

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Примечание.

Набор данных, содержащий новые журналы, нужно дедуплицировать внутри самого себя. По семантике SQL слияния оно сопоставляет новые данные с существующими данными в таблице и выполняет дедупликацию, но если дублирующиеся данные есть в новом наборе данных, они вставляются. Таким образом, перед слиянием в таблицу новых данных следует провести их дедупликацию.

Если вы знаете, что вы можете получить повторяющиеся записи только в течение нескольких дней, вы можете оптимизировать запрос дальше, секционируя таблицу по дате, а затем указывая диапазон дат целевой таблицы для сопоставления.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Питон

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

язык программирования Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Ява

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

Это более эффективно, чем предыдущая команда, так как она ищет дубликаты только за последние семь дней журналов, а не во всей таблице. Кроме того, вы можете использовать слияние только со вставкой со структурированной потоковой передачей для выполнения непрерывной дедупликации журналов.

  • В запросах потоковой передачи можно использовать операцию слияния в foreachBatch для непрерывной записи потоковых данных в таблицу Delta с дедупликацией. Дополнительные сведения о см. в следующем foreachBatch.
  • В другом запросе потоковой передачи вы можете постоянно считывать дедуплицированные данные из этой таблицы Delta. Это возможно, поскольку слияние только со вставкой добавляет новые данные в таблицу Delta.

Медленно меняющиеся данные (SCD) и запись измененных данных (CDC) с Delta Lake

Декларативные конвейеры Lakeflow имеют встроенную поддержку отслеживания и применения SCD Type 1 и Type 2. Используйте AUTO CDC ... INTO с деклаpативными конвейерами Lakeflow, чтобы обеспечить правильную обработку записей, не по порядку, при обработке потоков данных CDC. См. API AUTO CDC: упрощение захвата данных об изменениях с помощью декларативных конвейеров Lakeflow.

Добавочная синхронизация таблицы Delta с источником

В Databricks SQL и Databricks Runtime 12.2 LTS и более поздних версиях можно использовать WHEN NOT MATCHED BY SOURCE для создания произвольных условий для атомарного удаления и замены части таблицы. Это может быть особенно полезно, если у вас есть исходная таблица, в которой записи могут изменяться или удаляться в течение нескольких дней после начальной записи данных, но в конечном итоге урегулируются до окончательного состояния.

Следующий запрос показывает использование этого шаблона для выбора 5 дней записей из источника, обновления соответствующих записей в целевом объекте, вставки новых записей из источника в целевой объект и удаления всех несовпаденных записей за последние 5 дней в целевом объекте.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

Предоставляя тот же логический фильтр в исходных и целевых таблицах, вы можете динамически распространять изменения из источника в целевые таблицы, включая удаления.

Примечание.

Хотя этот шаблон можно использовать без каких-либо условных предложений, это приведет к полной перезаписи целевой таблицы, которая может быть дорогой.