Проведение обновления конвейера

В этой статье описываются обновления конвейера и содержатся сведения о том, как активировать обновление.

Что такое обновление конвейера?

После создания конвейера и подготовки к его запуску вы начинаете обновление . Обновление конвейера выполняет следующие действия:

  • Запускает кластер с правильной конфигурацией.
  • Обнаруживает все определенные таблицы и представления и проверяет наличие ошибок анализа, таких как недопустимые имена столбцов, отсутствующие зависимости и синтаксические ошибки.
  • Создает или обновляет таблицы и представления с самыми последними доступными данными.

С помощью сухого запуска можно проверить наличие проблем в исходном коде конвейера, не ожидая создания или обновления таблиц. Эта функция полезна при разработке или тестировании конвейеров, так как она позволяет быстро находить и устранять ошибки в конвейере, например неправильные имена таблиц или столбцов.

Как инициируются обновления конвейера?

Чтобы запустить обновления конвейера, используйте один из следующих параметров:

Триггер обновления Сведения
Руководство Вы можете вручную активировать обновления конвейеров из редактора конвейеров Lakeflow или списка конвейеров. См. раздел "Запуск обновления конвейера вручную".
Scheduled Вы можете запланировать обновления для потоков данных с помощью заданий. См. задачу конвейера для заданий.
Programmatic Обновления можно активировать программным способом с помощью сторонних средств, API и clIs. См. "Запуск конвейеров в рабочем процессе" и конвейерный REST API.

вручную запустить обновление конвейера

Используйте один из следующих параметров, чтобы вручную активировать обновление конвейера:

  • Запустите полный конвейер или подмножество конвейера (один исходный файл или отдельную таблицу) из редактора Pipelines Lakeflow. Дополнительные сведения см. в разделе "Выполнение кода конвейера".
  • Запустите полный конвейер из списка заданий и конвейеров . Щелкните значок воспроизведения. В той же строке, где конвейер в списке.
  • На странице мониторинга конвейера нажмите кнопку ".

Замечание

Поведение по умолчанию для обновлений конвейера вручную заключается в обновлении всех наборов данных, определенных в конвейере.

семантика обновления конвейера

В следующей таблице описываются стандартное обновление, полное обновление и поведение контрольных точек сброса для материализованных представлений и потоковых таблиц.

Тип обновления Материализованное представление Потоковая таблица
Обновление (по умолчанию) Обновляет результаты, отражающие текущие результаты для определяющего запроса. Он проверяет затраты и выполняет добавочное обновление, если это более экономично. См. инкрементальное обновление для материализованных представлений Обрабатывает новые записи с помощью логики, определенной в поточных таблицах и потоках обработки данных.
Полное обновление Обновляет результаты, отражающие текущие результаты для определяющего запроса. Очищает данные из таблиц потоковой передачи, очищает сведения о состоянии (контрольные точки) из потоков и повторно обрабатывает все записи из источника данных. Полный просмотр обновлений для потоковых таблиц
Сброс контрольных точек потока передачи данных Неприменимо к материализованным представлениям. Очищает информацию о состоянии (контрольные точки) из потоков, но не очищает данные из потоковых таблиц и повторно обрабатывает все записи из источника данных.

По умолчанию все материализованные представления и потоковые таблицы в конвейере обновляются при каждом обновлении. Кроме того, вы можете исключить таблицы из обновлений с помощью следующих функций:

Обе эти функции поддерживают семантику обновления по умолчанию или полное обновление. При необходимости можно использовать диалоговое окно Выбор таблиц для обновления, чтобы исключить дополнительные таблицы при запуске обновления для неудачных таблиц.

Для потоковых таблиц вы можете выбрать очистку контрольных точек потоков для выбранных потоков и не очищать данные из соответствующих потоковых таблиц. Чтобы очистить контрольные точки для выбранных потоков, используйте REST API Databricks для запуска обновления. См. Начать обновление конвейера для очистки контрольных точек избирательных потоков трансляции.

Следует ли использовать полное обновление?

Databricks рекомендует выполнять полные обновления только при необходимости. Полное обновление всегда повторно обрабатывает все записи из указанных источников данных с помощью логики, которая определяет набор данных. Время и ресурсы для завершения полного обновления коррелируются с размером исходных данных.

Материализованные представления возвращают те же результаты, применяется ли обновление по умолчанию или полное обновление. Использование полного обновления с потоковыми таблицами сбрасывает все данные о состоянии и информацию контрольных точек и может привести к потере записей, если входные данные больше не доступны. Полный просмотр обновлений для потоковых таблиц

Databricks рекомендует только полное обновление, если входные источники данных содержат данные, необходимые для воссоздания требуемого состояния таблицы или представления. Рассмотрим следующие сценарии, в которых входные исходные данные больше не доступны и результат выполнения полного обновления:

Источник данных Причина отсутствия входных данных Результат полного обновления
Kafka Краткий порог хранения Записи, которые больше не присутствуют в источнике Kafka, удаляются из целевой таблицы.
Файлы в хранилище объектов Политика жизненного цикла Файлы данных больше не присутствуют в исходном каталоге, удаляются из целевой таблицы.
Записи в таблице Удалено для соответствия требованиям Обрабатываются только записи, присутствующих в исходной таблице.

Чтобы предотвратить выполнение полных обновлений в таблице или представлении, задайте для свойства таблицы pipelines.reset.allowed значение false. См. свойства таблицы конвейера. Кроме того, можно использовать поток добавления для добавления данных в существующую потоковую таблицу без полного обновления.

Запуск обновления конвейера для выбранных таблиц

При необходимости можно повторно обрабатывать данные только для выбранных таблиц в конвейере. Например, во время разработки вы изменяете только одну таблицу и хотите сократить время тестирования, или обновление конвейера завершается ошибкой, и вы хотите обновить только таблицы, в которых произошла ошибка .

Редактор Конвейеров Lakeflow имеет параметры для повторной обработки исходного файла, выбранных таблиц или одной таблицы. Дополнительные сведения см. в разделе Выполнение кода конвейера.

Запустите обновление конвейера для неудачных таблиц.

Если обновление конвейера завершается сбоем из-за ошибок в одной или нескольких таблицах в графе конвейера, можно запустить обновление только неудачных таблиц и всех подчиненных зависимостей.

Замечание

Исключенные таблицы не обновляются, даже если они зависят от неудачной таблицы.

Чтобы обновить неудачные таблицы, на странице мониторинга конвейера нажмите кнопку "Обновить неудачные таблицы".

Чтобы обновить только выбранные таблицы с ошибками на странице мониторинга конвейера:

  1. Нажмите кнопку рядом с кнопкой Обновить неудачные таблицы и нажмите Выбрать таблицы для обновления. Появляется диалоговое окно Выберите таблицы для обновления.

  2. Чтобы выбрать таблицы для обновления, щелкните каждую таблицу. Выбранные таблицы выделены и помечены. Чтобы удалить таблицу из обновления, снова щелкните таблицу.

  3. Нажмите Обновить выбор.

    Замечание

    Кнопка Обновить выбор отображает количество выбранных таблиц в скобках.

Чтобы повторно обработать данные, уже загруженные для выбранных таблиц, нажмите Blue Down Caret рядом с кнопкой Обновить выбор и выберите Полное обновление выбора.

Запустите обновление конвейера, чтобы сбросить контрольные точки выбранных потоков.

Вы можете повторно обработать данные для выбранных потоков данных в вашей конвейере без удаления уже полученных данных.

Замечание

Потоки, которые не выбраны, запускаются с помощью обновления REFRESH. Вы также можете указать full_refresh_selection или refresh_selection, чтобы выборочно обновить другие таблицы.

Чтобы запустить обновление для обновления выбранных контрольных точек потоковой передачи, используйте запрос updates в REST API Декларативных конвейеров Spark Lakeflow.

Параметр reset_checkpoint_selection принимает список имен потоков. Необходимо использовать имя потока так, как оно указано в графе конвейера.

  • Если вы определили поток с явным именем (например, с помощью flow_name параметра в create_auto_cdc_flow), используйте это имя.
  • Если не задано явное имя потока, имя потока по умолчанию — это полное имя таблицы в catalog.schema.table формате. Использование только имени таблицы (например, gold вместо my_catalog.my_schema.gold) приводит к сбою обновления конвейера с IllegalArgumentException.

Имена потоков можно найти в пользовательском интерфейсе конвейера или в журналах событий конвейера.

Следующий пример использует команду curl, чтобы вызвать запрос updates для запуска обновления конвейера.

curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": ["my_catalog.my_schema.my_streaming_table"]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates

В следующем примере сбрасывается контрольная точка для потока с пользовательским именем:

curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": ["my_custom_flow_name"]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates

Проверить конвейер ошибок без ожидания обновления таблиц

Это важно

Функция конвейера Dry run доступна в общедоступной предварительной версии.

Чтобы проверить, является ли исходный код конвейера допустимым без выполнения полного обновления, используйте сухой запуск. Сухой запуск разрешает определения наборов данных и потоков, определенных в конвейере, но не материализует или не публикует наборы данных. Ошибки, обнаруженные во время сухого запуска, например неправильные имена таблиц или столбцов, сообщаются в пользовательском интерфейсе.

Чтобы начать сухой запуск, щелкните синий вниз указывающий значок на странице сведений о конвейере рядом с "Пуск" и выберите "Сухой запуск".

После завершения тестового запуска все ошибки отображаются в панели событий в нижней части экрана. Щелкнув область событий, вы увидите все проблемы, обнаруженные на нижней панели. Кроме того, в журнале событий отображаются события, связанные только с тестовым запуском, и в DAG не отображаются показатели. Если обнаружены ошибки, сведения доступны в журнале событий.

Результаты можно просмотреть только для последнего сухого запуска. Если сухой запуск был последним обновлением, вы можете просмотреть результаты, выбрав его в журнале обновлений. Результаты больше не доступны в пользовательском интерфейсе, если после тестового запуска выполняется другое обновление.

Обновление поведения выполнения

Поведение обновления конвейера определяется тем, как вы его запускаете:

  • Обновления, активированные из пользовательского интерфейса мониторинга конвейера с помощью Запустить сейчас быстро запускаются с прицелом на отладку.
  • Обновления, активированные из заданий, API конвейеров или непрерывных конвейеров , используют автоматическое повторение и поведение перезапуска.

Для триггерных конвейеров можно переопределить поведение по умолчанию для определенного запуска, выбрав команду "Выполнить сейчас" с разными параметрами в раскрывающемся списке в редакторе конвейеров Lakeflow или на странице мониторинга конвейера.

Быстрое начало, поведение, ориентированное на отладку

Используется для немедленного запуска и произвольных обновлений. Эти запуски оптимизированы для быстрой итерации:

  • Повторно использует кластер, чтобы избежать затрат на перезапуски. По умолчанию кластеры выполняются в течение двух часов. Это можно изменить с помощью параметра pipelines.clusterShutdown.delay в Настройке классической вычислительной системы для потоков.
  • Отключает повторные попытки выполнения конвейера, чтобы можно было немедленно обнаружить и устранить ошибки.

Автоматическое повторение и перезапуск поведения

Используется для заданий, обновлений с триггерами API и непрерывных конвейеров. Эти запуски определяют приоритет надежности и экономичности:

  • Перезапускает кластер для определенных ошибок, которые можно восстановить, включая утечки памяти и устаревшие учетные данные.
  • Повторно выполняет команду в случае возникновения специфических ошибок, таких как сбой при запуске кластера.
  • Кластер завершает работу сразу после завершения выполнения.

Замечание

Параметры запуска управляют исключительно выполнением кластера и конвейера. Расположения хранилища и целевые схемы в каталоге для таблиц публикации должны быть настроены как часть параметров конвейера и не влияют на поведение выполнения.