Работа с историей таблицы

Каждая операция, которая изменяет таблицу, создает новую версию таблицы. Используйте информацию из журнала для аудита операций, отката таблицы или запроса таблицы в определенный момент времени с помощью перемещения во времени.

Примечание.

Databricks не рекомендует использовать журнал таблиц в качестве долгосрочного решения резервного копирования для архивации данных. Используйте только последние 7 дней для операций перемещения по времени, если вы не установили конфигурации хранения данных и журналов в большее значение.

Получение истории таблицы

Извлеките сведения, включая операции, пользователя и метку времени для каждой записи в таблицу, выполнив history команду. Операции возвращаются в обратном хронологическом порядке.

Хранение журнала таблиц определяется параметром logRetentionDurationтаблицы, который составляет 30 дней по умолчанию.

Примечание.

Управление перемещением во времени и историей таблиц осуществляется различными порогами удерживания. См. раздел "Что такое путешествие по времени?".

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Сведения о синтаксисе Spark SQL см. в DESCRIBE HISTORY.

Дополнительные сведения о синтаксисе Scala/Java/Python см. в документации по API Delta Lake.

Обозреватель каталогов предоставляет визуальное представление этой подробной информации о таблице и её истории. Помимо схемы таблицы и выборки данных, можно также щелкнуть вкладку История, чтобы просмотреть журнал таблицы, который отображается с DESCRIBE HISTORY.

Схема истории

Выходные данные операции history имеют указанніе ниже столбцы.

колонна Тип Описание
версия длинный Версия таблицы, созданная операцией.
timestamp отметка времени Когда версия была зафиксирована.
userId строка Идентификатор пользователя, запустившего операцию.
userName строка Имя пользователя, запустившего операцию.
Операция строка Имя операции.
параметры операции карта Параметры операции (например, предикаты).
задача Структура Сведения о задании, которое запустило операцию.
записная книжка Структура Сведения о ноутбуке, из которого выполнялась операция.
идентификатор кластера строка Идентификатор кластера, в котором выполнялась операция.
readVersion длинный Версия таблицы, которая была считана для выполнения операции записи.
Уровень изоляции строка Уровень изоляции, использованный для этой операции.
isBlindAppend булевый Производилось ли добавление данных в рамках этой операции.
operationMetrics карта Метрики операции (например, число измененных строк и файлов).
метаданные пользователя строка Указанные пользователем метаданные коммита, если они были заданы.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Примечание.

Общие сведения partitionBy о параметрах операции

Поле partitionBy имеет смысл только для операций CREATE и OVERWRITE, определяющих или изменяющих схему секционирования таблицы.

Для операций добавления к существующим таблицам (APPEND, INSERT, UPDATEDELETE, MERGE) это поле может отображать пустой массив [] или столбцы секций в зависимости от используемого метода записи (.save() vs .saveAsTable()). Эта несогласованность является ожидаемым состоянием и не должна использоваться для проверки записей.

Внимание

Не полагайтесь на partitionBy историю для проверки операций добавления. Значение зависит от сведений о реализации, но не влияет на то, как данные записываются в разделы.

Example

Рассмотрим таблицу, секционированную по столбцу date :

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

Операция CREATE в журнале показывает:

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

При добавлении данных в эту таблицу:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

Операция APPEND показывает:

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

Ожидается пустое partitionBy значение. Данные по-прежнему записываются в правильные секции на основе существующей схемы секционирования таблицы. Обратите внимание, что .save() в пути могут отображаться столбцы секций в этом поле, но это различие является подробной реализацией и не влияет на поведение записи.

Метрики операций

Операция history возвращает коллекцию метрик операций в карте столбцов operationMetrics.

В следующих таблицах перечислены определения ключей карты по операциям.

Операция Имя метрики Описание
WRITE, ASCREATE TABLE, SELECT REPLACE TABLE AS SELECT,COPY INTO
количествоФайлов Число записанных файлов.
количество выходных байтов Размер записанного содержимого в байтах.
КоличествоВыходныхСтрок Число записанных строк.
ПОТОК UPDATE
количествоДобавленныхФайлов Число добавленных файлов
количествоУдаленныхФайлов Число удаленных файлов.
КоличествоВыходныхСтрок Число записанных строк.
количество выходных байтов Размер записанных данных в байтах.
Удалить
количествоДобавленныхФайлов Число добавленных файлов Не указывается при удалении секций таблицы.
количествоУдаленныхФайлов Число удаленных файлов.
numDeletedRows (числоУдаленныхСтрок) Число удаленных строк. Не указывается при удалении секций таблицы.
количествоСкопированныхСтрок Число строк, скопированных в процессе удаления файлов.
executionTimeMs (время выполнения в мс) Время, затраченное на выполнение всей операции.
scanTimeMs Время, затраченное на сканирование файлов на соответствие.
перезаписьTimeMs Время, затраченное на перезапись сопоставленных файлов.
УСЕКАТЬ
количествоУдаленныхФайлов Число удаленных файлов.
executionTimeMs (время выполнения в мс) Время, затраченное на выполнение всей операции.
Слияние
ЧислоИсходныхСтрок Число строк в исходном DataFrame.
numTargetRowsInserted Число строк, вставленных в целевую таблицу.
количествоОбновленныхСтрокЦели Число строк, обновленных в целевой таблице.
numTargetRowsDeleted Число строк, удаленных в целевой таблице.
КоличествоСкопированныхЦелевыхСтрок Число скопированных целевых строк.
КоличествоВыходныхСтрок Число выведенных строк.
КоличествоДобавленныхЦелевыхФайлов Число файлов, добавленных в приемник (целевой объект).
numTargetFilesRemoved Число файлов, удаленных из приемника (целевого объекта).
executionTimeMs (время выполнения в мс) Время, затраченное на выполнение всей операции.
scanTimeMs Время, затраченное на сканирование файлов на соответствие.
перезаписьTimeMs Время, затраченное на перезапись сопоставленных файлов.
UPDATE
количествоДобавленныхФайлов Число добавленных файлов
количествоУдаленныхФайлов Число удаленных файлов.
количествоОбновленныхСтрок Число обновленных строк.
количествоСкопированныхСтрок Число строк, только что скопированных в процессе обновления файлов.
executionTimeMs (время выполнения в мс) Время, затраченное на выполнение всей операции.
scanTimeMs Время, затраченное на сканирование файлов на соответствие.
перезаписьTimeMs Время, затраченное на перезапись сопоставленных файлов.
FSCK (проверка целостности файловой системы) количествоУдаленныхФайлов Число удаленных файлов.
Конвертировать numConvertedFiles Число преобразованных файлов Parquet.
OPTIMIZE
количествоДобавленныхФайлов Число добавленных файлов
количествоУдаленныхФайлов Число оптимизированных файлов.
КоличествоДобавленныхБайтов Число байтов, добавленных после оптимизации таблицы.
числоУдалённыхБайт Число удаленніх байтов.
Мин.размерФайла Размер наименьшего файла после оптимизации таблицы.
p25FileSize Размер файла 25-го процентиля после оптимизации таблицы.
p50FileSize Медианный размер файла после оптимизации таблицы.
p75FileSize Размер файла 75-го процентиля после оптимизации таблицы.
максимальный размер файла Размер наибольшего файла после оптимизации таблицы.
CLONE;
размерИсходнойТаблицы Размер исходной таблицы в байтах на момент клонирования версии.
источникЧислоФайлов Количество файлов в исходной таблице в клонированных версиях.
количествоУдаленныхФайлов Количество файлов, удаленных из целевой таблицы, если предыдущая таблица была заменена.
размерУдалённыхФайлов Общий размер в байтах файлов, удаленных из целевой таблицы, если предыдущая таблица была заменена.
Количество скопированных файлов Число файлов, которые были скопированы в новое расположение. 0 для поверхностных клонов.
размерСкопированныхФайлов Общий размер файлов в байтах, которые были скопированы в новое расположение. 0 для поверхностных клонов.
RESTORE
размерТаблицыПослеВосстановления Размер таблицы в байтах после восстановления.
КоличествоФайловПослеВосстановления Число файлов в таблице после восстановления.
количествоУдаленныхФайлов Число файлов, удаленных операцией восстановления.
количество восстановленных файлов Число файлов, добавленных в результате восстановления.
размерУдалённыхФайлов Размер в байтах файлов, удаленных при восстановлении.
размер восстановленных файлов Размер в байтах файлов, добавленных при восстановлении.
VACUUM
количество удаленных файлов Число удаленных файлов.
numVacuumedDirectories Число вакуумированных каталогов.
КоличествоФайловНаУдаление numFilesToDelete Число удаляемых файлов.

Что такое путешествие по времени?

Путешествие во времени поддерживает выполнение запросов к предыдущим версиям таблиц на основе временной метки или версии таблицы (как это записано в журнале транзакций). Для таких приложений можно использовать поездку по времени:

  • Воссоздание анализа, отчетов или выходных данных (например, выходных данных модели машинного обучения). Это может быть полезно для отладки или аудита, особенно в регулируемых отраслях.
  • Написание сложных темпоральных запросов.
  • Устранение ошибок в данных.
  • Обеспечение изоляции моментальных снимков для набора запросов при работе с таблицами с частыми изменениями.

Внимание

В Databricks Runtime 18.0 и более поздних версиях запросы с временной привязкой блокируются, если запрашивается версия, которая раньше deletedFileRetentionDuration свойства таблицы (по умолчанию 7 дней). Для управляемых таблиц каталога Unity это относится к Databricks Runtime 12.2 и выше.

Синтаксис перемещения по времени

Чтобы запросить таблицу со временем, добавьте предложение после спецификации имени таблицы.

  • timestamp_expression может быть одним из следующих вариантов:
    • '2018-10-18T22:15:12.013Z', то есть строка, которая может приводиться к метке времени
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', то есть строка даты.
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Любое другое выражение, которое является меткой времени или может быть приведено к ней
  • version — это длинное значение, которое можно получить из выходных данных DESCRIBE HISTORY table_spec.

Ни timestamp_expression, ни version не может быть подзапросом.

Принимаются только строки метки даты или времени. Например, "2019-01-01" и "2019-01-01T00:00:00.000Z". См. следующий код для примера синтаксиса:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Питон

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Можно также использовать @ синтаксис, чтобы указать метку времени или версию в составе имени таблицы. Метка времени должна быть указана в формате yyyyMMddHHmmssSSS. Вы можете указать версию после @, добавив к версии v. Посмотрите следующий код, чтобы ознакомиться с примером синтаксиса:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Питон

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Что такое контрольные точки журнала транзакций?

Версии таблиц записываются в виде JSON-файлов в каталоге журнала транзакций, который хранится вместе с данными таблицы. Для оптимизации запросов к проверочным точкам версии таблиц объединяются в файлы контрольных точек Parquet, что предотвращает необходимость считывания всех версий истории таблиц в формате JSON. Azure Databricks оптимизирует частоту контрольных точек для размера данных и рабочей нагрузки. Пользователям не нужно взаимодействовать с контрольными точками напрямую. Частота контрольных точек подлежит изменению без уведомления.

Настройка хранения данных для запросов на поездки по времени

Чтобы запросить предыдущую версию таблицы, необходимо сохранить журнал и файлы данных для этой версии.

Файлы данных удаляются, когда VACUUM запускается для таблицы. Удаление файла журнала управляется автоматически после контрольных версий таблицы.

Так как большинство таблиц регулярно обрабатываются с использованием VACUUM, запросы на определенный момент времени должны учитывать порог хранения данных для VACUUM, который по умолчанию составляет 7 дней.

Чтобы увеличить порог хранения данных для таблиц, необходимо настроить следующие свойства таблицы:

  • delta.logRetentionDuration = "interval <interval>": управляет продолжительностью хранения истории для таблицы. Значение по умолчанию — interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": определяет пороговое значение VACUUM , используемое для удаления файлов данных, на которые больше не ссылается текущая версия таблицы. Значение по умолчанию — interval 7 days.

Вы можете указать свойства таблицы во время создания таблицы или задать их с помощью инструкции ALTER TABLE . См. справочник по свойствам таблицы.

Примечание.

В Databricks Runtime 18.0 и более поздних версиях logRetentionDuration должно быть больше или равно deletedFileRetentionDuration. Для управляемых таблиц каталога Unity это относится к Databricks Runtime 12.2 и выше.

Чтобы получить доступ к 30 дням исторических данных, установите delta.deletedFileRetentionDuration = "interval 30 days", который соответствует параметру по умолчанию для delta.logRetentionDuration.

Увеличение порогового значения хранения данных может привести к увеличению затрат на хранение, так как сохраняются дополнительные файлы данных.

Восстановление таблицы в более раннее состояние

Вы можете восстановить таблицу в более раннее RESTORE состояние с помощью команды. Таблицы внутренне поддерживают исторические версии, позволяющие восстановить их в более раннем состоянии. В качестве параметров команды RESTORE поддерживаются версия, соответствующая предыдущему состоянию, или метка времени, когда было создано предыдущее состояние.

Внимание

  • Вы можете восстановить уже восстановленную таблицу.
  • Вы можете восстановить клонированную таблицу.
  • Требуется разрешение MODIFY для восстанавливаемой таблицы.
  • Нельзя восстановить таблицу до более старой версии, в которой файлы данных были удалены вручную или с помощью vacuum. Восстановление до этой версии по-прежнему возможно, если для spark.sql.files.ignoreMissingFiles задано значение true.
  • Формат метки времени для восстановления в более раннем состоянии — yyyy-MM-dd HH:mm:ss. Также поддерживается предоставление только строки даты (yyyy-MM-dd).
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Сведения о синтаксисе см. в RESTORE.

Внимание

Восстановление считается операцией изменения данных. Записи журнала, добавленные командой RESTORE, содержат dataChange, установленный в значение true. Если существует нижестоящее приложение, например задание структурированной потоковой передачи , которое обрабатывает обновления таблицы, записи журнала изменений данных, добавленные операцией восстановления, считаются новыми обновлениями данных, а обработка их может привести к дублированию данных.

Например:

Версия таблицы Операция Обновления журналов Записи в обновлениях журнала изменений данных
0 INSERT AddFile(/path/to/file-1, dataChange = true) (имя = Виктор, возраст = 29, (имя = Джордж, возраст = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (имя = Джордж, возраст = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (Нет записей, так как оптимизация сжатия не изменяет данные в таблице.)
3 RESTORE(версия=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (имя = Виктор, возраст = 29), (имя = Джордж, возраст = 55), (имя = Джордж, возраст = 39)

В предыдущем примере RESTORE команда приводит к обновлениям, которые уже были замечены при чтении таблицы версии 0 и 1. Если запрос потоковой передачи выполнял чтение этой таблицы, эти файлы будут считаться новыми добавленными данными и будут обработаны снова.

Восстановить метрики

RESTORE передает следующие метрики в виде DataFrame из одной строки после завершения операции:

  • table_size_after_restore — размер таблицы после восстановления;

  • num_of_files_after_restore — число файлов в таблице после восстановления;

  • num_removed_files — число файлов, удаленных (логически удаленных) из таблицы;

  • num_restored_files — число файлов, восстановленных из-за отката;

  • removed_files_size — общий размер в байтах файлов, удаленных из таблицы;

  • restored_files_size — общий размер восстановленных файлов в байтах.

    Пример восстановления метрик

Примеры использования путешествия по времени

  • Исправление случайных удалений в таблице для пользователя 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Исправление случайного неправильного обновления таблицы:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Запросите количество новых клиентов, добавленных за последнюю неделю.

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

Как найти номер версии последнего коммита в сеансе Spark?

Чтобы получить номер версии последней фиксации, записанной текущим SparkSession, для всех потоков и всех таблиц, выполните запрос конфигурации SQL spark.databricks.delta.lastCommitVersionInSession.

Примечание.

Для таблиц Apache Iceberg используйте spark.databricks.iceberg.lastCommitVersionInSession вместо spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Питон

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

язык программирования Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Если SparkSession не сделал коммиты, запрос ключа возвращает пустое значение.

Примечание.

При совместном использовании одного и того же SparkSession между несколькими потоками, это похоже на совместное использование переменной между несколькими потоками; вы можете столкнуться с условиями гонки, так как значение конфигурации обновляется одновременно.