Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Вы можете загрузить данные из любого источника данных, поддерживаемого Apache Spark в Azure Databricks, с помощью декларативных конвейеров Lakeflow. Наборы данных (таблицы и представления) можно определить в декларативных конвейерах Lakeflow для любого запроса, возвращающего кадр данных Spark, включая потоковую передачу кадров данных и Pandas для кадров данных Spark. Для задач загрузки данных Databricks рекомендует использовать потоковые таблицы для большинства сценариев. Потоковые таблицы подходят для загрузки данных из облачного хранилища объектов с помощью автозагрузчика или из систем обмена сообщениями, таких как Kafka.
Заметка
- Не все источники данных поддерживают SQL. Вы можете смешивать записные книжки SQL и Python в декларативных потоках Lakeflow для использования SQL для всех операций за пределами загрузки.
- Дополнительные сведения о работе с библиотеками, не упакованными в декларативные конвейеры Lakeflow по умолчанию, см. в разделе "Управление зависимостями Python для декларативных конвейеров Lakeflow".
- Общие сведения о поглощении данных в Azure Databricks можно найти в статье "Стандартные соединители" в Lakeflow Connect.
В приведенных ниже примерах показаны некоторые распространенные шаблоны.
Загрузка из существующей таблицы
Загрузите данные из любой существующей таблицы в Azure Databricks. Вы можете преобразовать данные с помощью запроса или загрузить таблицу для дальнейшей обработки в конвейере.
В следующем примере данные считываются из существующей таблицы:
Питон
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
загружать файлы из облачного хранилища объектов
Databricks рекомендует использовать автозагрузчик с Декларативными конвейерами Lakeflow для большинства задач приема данных из облачного хранилища объектов или из файлов в томе каталога Unity. Автозагрузчик и декларативные конвейеры Lakeflow предназначены для добавочной и идемпотентной загрузки постоянно растущих данных по мере поступления в облачное хранилище.
См. раздел "Что такое автозагрузчик" и "Загрузка данных из хранилища объектов".
В следующем примере данные из облачного хранилища считываются с помощью автозагрузчика:
Питон
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://[email protected]/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://[email protected]/analysis/*/*/*.json',
format => "json"
);
В следующих примерах используется автозагрузчик для создания наборов данных из CSV-файлов в томе каталога Unity:
Питон
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Заметка
- Если вы используете автозагрузчик с уведомлениями о файлах и запускаете полное обновление для конвейера или потоковой таблицы, необходимо вручную очистить ресурсы. Для выполнения очистки можно использовать CloudFilesResourceManager в записной книжке.
- Чтобы загрузить файлы с помощью Auto Loader в конвейере с поддержкой каталога Unity, нужно применять внешние расположения . Дополнительные сведения об использовании каталога Unity с декларативными конвейерами Lakeflow см. в разделе "Использование каталога Unity с декларативными конвейерами Lakeflow".
Загрузка данных из шины сообщений
Конвейеры с декларативной конструкцией Lakeflow можно настроить для приема данных из шины сообщений. Databricks рекомендует использовать потоковые таблицы с непрерывным выполнением и улучшенным авто-масштабированием, чтобы обеспечить наиболее эффективную загрузку с низкой задержкой из шины сообщений. См. статью "Оптимизация использования кластеров декларативных конвейеров Lakeflow с помощью автомасштабирования".
Например, следующий код настраивает потоковую таблицу для приема данных из Kafka с помощью функции read_kafka :
Питон
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Чтобы получать данные из других источников шины сообщений, смотрите:
- Kinesis: read_kinesis
- Тема Pub/Sub: read_pubsub
- Пульсар: read_pulsar
Загрузка данных из Центров событий Azure
Центры событий Azure — это служба потоковой передачи данных, которая предоставляет совместимый интерфейс Apache Kafka. Вы можете использовать соединитель Структурированной потоковой передачи Kafka, включенный в среду выполнения Декларативных конвейеров Lakeflow, для загрузки сообщений из Центров событий Azure. Дополнительные сведения о загрузке и обработке сообщений из Центров событий Azure см. в разделе "Использование Центров событий Azure" в качестве источника данных Декларативного конвейера Lakeflow.
Загрузка данных из внешних систем
Декларативные конвейеры Lakeflow поддерживают загрузку данных из любого источника данных, поддерживаемого Azure Databricks. См. статью "Подключение к источникам данных" и внешним службам. Вы также можете загрузить внешние данные, используя Lakehouse Federation, для поддерживаемых источников данных . Так как для федерации Lakehouse требуется среда выполнения Databricks 13.3 LTS или более поздней версии, для использования федерации Lakehouse ваш конвейер должен быть настроен на использование предварительного канала.
Некоторые источники данных не поддерживают эквивалентную поддержку в SQL. Если вы не можете использовать федерацию Lakehouse с одним из этих источников данных, можно использовать записную книжку Python для загрузки данных из источника. Исходный код Python и SQL можно добавить в тот же конвейер. В следующем примере объявляется материализованное представление для доступа к текущему состоянию данных в удаленной таблице PostgreSQL:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Загрузка небольших или статических наборов данных из облачного хранилища объектов
Вы можете загружать небольшие или статические наборы данных с помощью синтаксиса загрузки Apache Spark. Декларативные конвейеры Lakeflow поддерживают все форматы файлов, поддерживаемые Apache Spark в Azure Databricks. Полный список см. в разделе "Параметры формата данных".
В следующих примерах показано, как загрузить JSON для создания таблиц Декларативных конвейеров Lakeflow:
Питон
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Заметка
Функция read_files
SQL распространена во всех средах SQL в Azure Databricks. Рекомендуется использовать шаблон прямого доступа к файлам с помощью SQL с декларативными конвейерами Lakeflow. Дополнительные сведения см. в разделе "Параметры".
Сконфигурируйте потоковую таблицу для игнорирования изменений в исходной потоковой таблице
Заметка
- Флаг
skipChangeCommits
работает только сspark.readStream
при использовании функцииoption()
. Этот флаг нельзя использовать вdlt.read_stream()
функции. - Невозможно использовать
skipChangeCommits
флаг, если исходная потоковая таблица определена как цель функции create_auto_cdc_flow().
По умолчанию для потоковых таблиц требуются источники, которые предназначены для добавления. Если потоковая таблица использует другую потоковую таблицу в качестве источника, и для исходной потоковой таблицы требуются обновления или удаления, например, для обработки в соответствии с GDPR «право на забвение», можно установить флаг skipChangeCommits
при чтении исходной потоковой таблицы, чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе "Игнорировать обновления и удаления".
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Обеспечение безопасного доступа к учетным данным хранилища с секретами в потоке обработки
Секреты Azure Databricks можно использовать для хранения учетных данных, таких как ключи доступа или пароли. Чтобы настроить секрет в конвейере, используйте свойство Spark в конфигурации кластера параметров конвейера. См. Настройка вычислений для декларативных конвейеров Lakeflow.
В следующем примере используется секрет для хранения ключа доступа, необходимого для чтения входных данных из учетной записи хранения Azure Data Lake Storage (ADLS) с помощью автозагрузчика. Этот же метод можно использовать для настройки любого секрета, необходимого для конвейера, например ключей AWS для доступа к S3 или пароля к хранилищу метаданных Apache Hive.
Дополнительные сведения о работе с Azure Data Lake Storage см. в статье "Подключение к Azure Data Lake Storage и хранилищу BLOB-объектов".
Заметка
Необходимо добавить префикс spark.hadoop.
в ключ конфигурации spark_conf
, который задает значение секрета.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/[email protected]/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Замени
-
<storage-account-name>
с именем учетной записи хранения ADLS. -
<scope-name>
с именем области секретных данных Azure Databricks. -
<secret-name>
с именем ключа, содержащего ключ доступа к учетной записи хранения Azure.
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Замени
-
<container-name>
с именем контейнера учетной записи хранения Azure, в которой хранятся входные данные. -
<storage-account-name>
с именем учетной записи хранения ADLS. -
<path-to-input-dataset>
по пути к входному набору данных.