Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описывается использование встроенных функций мониторинга и наблюдаемости для декларативных конвейеров Lakeflow. Такие функции поддерживают такие задачи, как:
- Наблюдение за ходом и состоянием обновлений в конвейере. См. Какие сведения о конвейере доступны в пользовательском интерфейсе?.
- Оповещение о конвейерных событиях, таких как успех или сбой обновлений конвейера. См. Добавьте уведомления по электронной почте для событий работы конвейера.
- Просмотр метрик для источников потоковой передачи, таких как Apache Kafka и автозагрузчик (общедоступная предварительная версия). См. просмотр метрик потоковой передачи.
- Извлечение подробной информации об обновлениях в конвейерах, таких как происхождение данных, метрики качества данных и использование ресурсов. См. "Что такое журнал событий для Lakeflow Declarative Pipelines?".
- Определение пользовательских действий, выполняемых при возникновении определенных событий. См. раздел "Определение пользовательского мониторинга декларативных конвейеров Lakeflow с помощью перехватчиков событий".
Сведения о проверке и диагностике производительности запросов см. в журнале запросов Access для декларативных конвейеров Lakeflow. Эта функция доступна в общедоступной предварительной версии.
Добавление уведомлений по электронной почте для событий конвейера
Вы можете настроить один или несколько адресов электронной почты для получения уведомлений, когда происходит следующее:
- Обновление конвейера успешно завершено.
- Обновление конвейера завершается сбоем либо с ошибкой, допускающей повторные попытки, либо с ошибкой, которую нельзя повторить. Выберите этот параметр, чтобы получить уведомление обо всех сбоях конвейера.
- Обновление конвейера завершается неустранимой (фатальной) ошибкой. Выберите этот параметр, чтобы получить уведомление только в том случае, если возникает ошибка без повторных попыток.
- Сбой одного потока данных.
Чтобы настроить уведомления по электронной почте при создании или изменении конвейера:
- Щелкните Добавить уведомление.
- Введите один или несколько адресов электронной почты для получения уведомлений.
- Установите флажок для каждого типа уведомления, чтобы отправить на настроенные адреса электронной почты.
- Щелкните Добавить уведомление.
Просмотр конвейеров в пользовательском интерфейсе
Декларативные конвейеры Lakeflow можно найти в параметрах конвейеров или рабочих процессов на боковой панели рабочей области. Откроется страница "Рабочие процессы " на вкладке "Задания и конвейеры" , где можно просматривать сведения о каждом задании и конвейере, к чему у вас есть доступ. Щелкните имя конвейера, чтобы открыть страницу сведений о конвейере.
Использование списка заданий и конвейеров
Чтобы просмотреть список конвейеров, к которым у вас есть доступ, кликните Рабочие процессы или
Конвейеры на боковой панели. Вкладка "Задания и конвейеры " в пользовательском интерфейсе заданий Lakeflow содержит сведения обо всех доступных заданиях и конвейерах, таких как создатель рабочего процесса, триггер рабочего процесса, если он есть, и результат последних пяти запусков.
Чтобы изменить столбцы, отображаемые в списке, щелкните и выберите или отмените выбор столбцов.
Важный
Список объединенных заданий и конвейеров находится в общедоступной предварительной версии. Вы можете отключить функцию и вернуться к интерфейсу по умолчанию, отключив задания и конвейеры: унифицированное управление, поиск и фильтрацию. Дополнительные сведения см. в статье "Управление предварительными версиями Azure Databricks ".
Вы можете фильтровать задания в списке заданий и конвейеров , как показано на следующем снимке экрана.
-
Поиск текста: поиск ключевых слов поддерживается для полей "Имя и идентификатор ". Чтобы найти тег, созданный с ключом и значением, можно выполнить поиск по ключу, по значению или по ключу и значению. Например, для тега с ключом
department
и значениемfinance
соответствующие задания можно найти поdepartment
илиfinance
. Чтобы выполнить поиск по ключу и значению, введите ключ и значение, разделенные двоеточием (например,department:finance
). - Тип: фильтрация по заданиям, конвейерам или всем. Если выбрать потоки данных, можно также фильтровать по типу потока данных, который включает потоки данных ETL и ввода данных.
- Владелец: отображение только собственных заданий.
- Избранное: отображение заданий, помеченных как избранное.
- Теги: использование тегов. Для поиска по тегу можно использовать раскрывающееся меню тегов, чтобы отфильтровать до пяти тегов одновременно или напрямую использовать поиск по ключевым словам.
-
Запуск от имени: фильтрация по двум
run as
значениям.
Чтобы запустить задание или конвейер, нажмите значок . Чтобы остановить задание или поток, нажмите кнопку с иконкой
. Чтобы получить доступ к другим действиям, щелкните значок меню
. Например, можно удалить задание или конвейер или параметры доступа для конвейера из этого меню.
Какие сведения о конвейере доступны в пользовательском интерфейсе?
График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.
Сведения включают идентификатор конвейера, исходный код, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.
Чтобы просмотреть табличное представление наборов данных, перейдите на вкладку List. Представление List позволяет просматривать все наборы данных в вашем конвейере, представленные в виде строки в таблице, и полезно, если DAG конвейера слишком велик для визуализации в представлении Graph. Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.
Пользователь запуска от имени является владельцем потока данных, и обновления потока выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as
, сначала щелкните Разрешения, а затем измените владельца конвейера.
Как просмотреть сведения о наборе данных?
Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.
История обновлений
Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.
Выберите обновление в раскрывающемся меню, чтобы увидеть его график, подробности и события. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.
Просмотр метрик потокового вещания
Важный
Наблюдаемость потоковой передачи для декларативных конвейеров Lakeflow доступна в общедоступной предварительной версии.
Вы можете просматривать метрики потоковой передачи из источников данных, поддерживаемых Spark Structured Streaming, таких как Apache Kafka, Amazon Kinesis, Auto Loader и Delta tables, для каждого потока в ваших декларативных конвейерах Lakeflow. Метрики отображаются в виде диаграмм в правой области пользовательского интерфейса Lakeflow Pipelines и включают в себя секунды невыполненной работы, байты невыполненной работы, записи невыполненной работы и файлы невыполненной работы. На диаграммах отображается максимальное значение, агрегированное по минуте, а подсказка отображает максимальные значения при наведении указателя мыши на диаграмму. Данные ограничены последними 48 часами с текущего времени.
Таблицы в вашем конвейере, для которых доступны стриминговые метрики, отображают значок при просмотре структуры DAG конвейера в графическом режиме в пользовательском интерфейсе. Чтобы просмотреть метрики потоковой передачи, щелкните значок
, чтобы отобразить диаграмму метрик потоковой передачи на вкладке потоков на правой панели. Вы также можете применить фильтр для просмотра только таблиц с метриками потоковой передачи, щелкнув List, а затем щелкнув Имеет метрики потоковой передачи.
Каждый источник потоковой передачи поддерживает только определенные метрики. Метрики, не поддерживаемые источником потоковой передачи, недоступны для просмотра в пользовательском интерфейсе. В следующей таблице показаны метрики, доступные для поддерживаемых источников потоковой передачи:
источник | невыполненные байты | записи невыполненной работы | секунды невыполненной работы | файлы невыполненной работы |
---|---|---|---|---|
Кафка | ✓ | ✓ | ||
кинетика | ✓ | ✓ | ||
Дельта | ✓ | ✓ | ||
Автозагрузчик | ✓ | ✓ | ||
Google Pub/Sub | ✓ | ✓ |
Что такое журнал событий декларативных конвейеров Lakeflow?
Журнал событий декларативных конвейеров Lakeflow содержит всю информацию, связанную с конвейером, включая аудиторские журналы, проверки качества данных, ход выполнения конвейера и происхождение данных. Журнал событий можно использовать для отслеживания, понимания и мониторинга состояния конвейеров данных.
Вы можете просматривать записи журнала событий в пользовательском интерфейсе Декларативных конвейеров Lakeflow, API декларативных конвейеров Lakeflow или напрямую запрашивая журнал событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.
Можно также определить пользовательские действия для выполнения при регистрации событий, например, отправки оповещений, используя перехватчики событий.
Важный
Не удаляйте журнал событий или родительский каталог или схему, в которой публикуется журнал событий. Удаление журнала событий может привести к сбою обновления конвейера во время будущих запусков.
схема журнала событий
В следующей таблице описывается схема журнала событий. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как поле details
. Azure Databricks поддерживает оператор :
для анализа полей JSON. См. :
(знак двоеточия) оператор.
Поле | Описание |
---|---|
id |
Уникальный идентификатор записи журнала событий. |
sequence |
Документ JSON, содержащий метаданные для идентификации и упорядочивания событий. |
origin |
Документ JSON, содержащий метаданные для источника события, например поставщик облачных служб, регион поставщика облачных служб, user_id , pipeline_id или pipeline_type , чтобы показать, где был создан конвейер, либо DBSQL или WORKSPACE . |
timestamp |
Время записи события. |
message |
Сообщение, доступное для чтения человеком, описывающее событие. |
level |
Тип события, например INFO , WARN , ERROR или METRICS . |
maturity_level |
Стабильность схемы событий. Возможные значения:
|
error |
Если произошла ошибка, будут предоставлены её подробности. |
details |
Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий. |
event_type |
Тип события. |
запрос журнала событий
Заметка
В этом разделе описывается поведение и синтаксис по умолчанию для работы с журналами событий для конвейеров, настроенных с каталогом Unity и режимом публикации по умолчанию.
- Сведения о поведении конвейеров каталога Unity, использующих устаревший режим публикации, см. в статье Работа с журналом событий для конвейеров устаревшей публикации каталога Unity.
- Сведения о поведении и синтаксисе конвейеров хранилища метаданных Hive см. в статье Работа с журналом событий для конвейеров хранилища метаданных Hive.
По умолчанию декларативный конвейер Lakeflow записывает журнал событий в скрытую таблицу Delta в каталоге и схеме по умолчанию, настроенных для конвейера. Несмотря на скрытие, таблица по-прежнему может запрашиваться всеми достаточно привилегированными пользователями. По умолчанию только владелец конвейера может запрашивать таблицу журнала событий.
По умолчанию имя скрытого журнала событий отформатировано как event_log_{pipeline_id}
, где идентификатор конвейера — это UUID, назначенный системой, в котором дефисы заменены на подчеркивания.
Вы можете взаимодействовать с конфигурацией JSON для публикации журнала событий. При публикации журнала событий укажите имя журнала событий и при необходимости укажите каталог и схему, как показано в следующем примере:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
Расположение журнала событий также служит местоположением схемы для любых запросов Auto Loader в потоке данных. Databricks рекомендует создать представление по таблице журнала событий перед изменением привилегий, так как некоторые параметры вычислений могут позволить пользователям получить доступ к метаданным схемы, если таблица журнала событий предоставляется напрямую. В следующем примере синтаксиса создается представление таблицы журнала событий и используется в примерах запросов журнала событий, включенных в эту статью.
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
Каждый экземпляр запуска конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор для самого последнего обновления и сохранить его во временном представлении под именем latest_update
. Это представление используется в примерах запросов журнала событий, включенных в эту статью:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
В каталоге Unity представления поддерживают потоковые запросы. В следующем примере используется структурированная потоковая передача для запроса представления, определенного в верхней части таблицы журнала событий:
df = spark.readStream.table("event_log_raw")
Владелец конвейера может опубликовать журнал событий в виде общедоступной таблицы Delta, переключив параметр Publish event log to metastore
в разделе Advanced конфигурации конвейера. При необходимости можно указать новое имя таблицы, каталог и схему для журнала событий.
Запросить информацию о родословной из журнала событий
События, содержащие сведения о происхождении, имеют тип события flow_definition
. Объект details:flow_definition
содержит output_dataset
и input_datasets
, определяющие каждую связь в графе.
Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
Запросите качество данных из журнала событий
Если вы определяете ожидания для наборов данных в конвейере, метрики для количества переданных и неудачных записей хранятся в объекте details:flow_progress.data_quality.expectations
. Метрика для количества удаленных записей хранится в объекте details:flow_progress.data_quality
. События, содержащие сведения о качестве данных, имеют тип события flow_progress
.
Метрики качества данных могут быть недоступны для некоторых наборов данных. См. ограничения ожидания.
Доступны следующие метрики качества данных:
Метрика | Описание |
---|---|
dropped_records |
Количество записей, которые были удалены из-за сбоя одного или нескольких ожиданий. |
passed_records |
Количество записей, которые прошли критерии ожидания. |
failed_records |
Количество записей, которые не выполнили критерии ожидания. |
В следующем примере запрашивается метрика качества данных для последнего обновления конвейера:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
Запрос событий автозагрузки из журнала событий
Декларативные конвейеры Lakeflow создают события при обработке файлов автозагрузчика. Для событий загрузчика, event_type
— operation_progress
, а details:operation_progress:type
— это либо AUTO_LOADER_LISTING
, либо AUTO_LOADER_BACKFILL
. Объект details:operation_progress
также включает поля status
, duration_ms
, auto_loader_details:source_path
и auto_loader_details:num_files_listed
.
В следующем примере выполняется запрос событий автозагрузчика для последнего обновления:
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')
Отслеживание очереди данных с помощью запроса к журналу событий
Декларативные конвейеры Lakeflow отслеживают количество данных, присутствующих в невыполненной работе в объекте details:flow_progress.metrics.backlog_bytes
. События, содержащие метрики невыполненной работы, имеют тип события flow_progress
. В следующем примере запрашиваются метрики накопившихся задач для последнего обновления конвейера.
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
Заметка
Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии среды выполнения Databricks.
Мониторинг улучшенных событий автомасштабирования из журнала событий для конвейеров без включённых функций безсерверности
Для декларативных конвейеров Lakeflow, которые не используют бессерверные вычисления, журнал событий записывает изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale
. Сведения о изменении размера кластера хранятся в объекте details:autoscale
. В следующем примере выполняется запрос на изменение размера расширенного кластера с автоматическим масштабированием для последнего обновления конвейера.
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
мониторинг использования вычислительных ресурсов
cluster_resources
события предоставляют метрики по количеству слотов задач в кластере, уровню их использования и количеству задач, которые ожидают постановки в расписание.
Если включен расширенный автомасштабирование, события cluster_resources
также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executors
и optimal_num_executors
. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE
, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
и BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
.
Эти сведения можно просматривать вместе с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.
В следующем примере выполняется запрос истории размера очереди задач для последнего обновления конвейера.
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере запрашивается история использования за последнее обновление конвейера.
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для расширенных конвейеров автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
Аудит декларативных конвейеров Lakeflow
Вы можете использовать журналы событий Lakeflow Declarative Pipelines и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в Lakeflow Declarative Pipelines.
Декларативные конвейеры Lakeflow используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Декларативные конвейеры Lakeflow регистрируют пользователя для выполнения действий в конвейере, включая создание конвейера, изменение конфигурации и инициацию обновлений.
См. события каталога Unity для справки по событиям аудита каталога Unity.
Запрос действий пользователей в журнале событий
Журнал событий можно использовать для аудита событий, например действий пользователей. События, содержащие сведения о действиях пользователя, имеют тип события user_action
.
Сведения о действии хранятся в объекте user_action
в поле details
. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления event_log_raw
, используемого в этом запросе, см. в статье Запрос журнала событий.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
[email protected] |
2021-05-20T19:35:59.913+0000 | CREATE |
[email protected] |
2021-05-27T00:35:51.971+0000 | START |
[email protected] |
информация о среде выполнения
Вы можете просмотреть информацию о среде выполнения для обновления конвейера, например, версию Databricks Runtime для обновления:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11,0 |