Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Кластеризация Liquid заменяет секционирование таблиц и ZORDER
, упрощая принятие решений о структуре данных и оптимизируя производительность запросов. Она обеспечивает гибкость для переопределения ключей кластеризации без перезаписи существующих данных, позволяя макету данных развиваться вместе с потребностями аналитики с течением времени. Кластеризация Liquid применяется как к таблицам потоковой передачи, так и к материализованным представлениям.
Внимание
Жидкостная кластеризация общедоступна для Delta Lake и в общедоступной предварительной версии для Apache Iceberg.
Для всех таблиц Delta Lake с включенной динамической кластеризацией Databricks рекомендуется использовать Databricks Runtime 15.2 и более поздние версии. Поддержка общедоступной предварительной версии с ограничениями доступна в Databricks Runtime 13.3 LTS и выше. Параллелизм на уровне строк поддерживается в Databricks Runtime 13.3 LTS и более поздних версиях и общедоступен в Databricks Runtime 14.2 и выше для всех таблиц с включенными векторами удаления. См. информацию об уровнях изоляции и конфликтах записи в Azure Databricks.
Для всех таблиц Iceberg, где включена Liquid Clustering, требуется Databricks Runtime версии 16.4 LTS и выше.
Для чего используется кластеризация жидкости?
Databricks рекомендует жидкостную кластеризацию для всех новых таблиц, включая потоковые таблицы и материализованные представления. Ниже приведены примеры сценариев, в которых удобно использовать кластеризацию:
- Таблицы часто фильтруются по столбцам высокой кратности.
- Таблицы со значительным неравномерным распределением данных.
- Таблицы, размер которых быстро увеличивается и которые нуждаются в настройке и обслуживании.
- Таблицы, в которых требуются параллельные записи.
- Таблицы с меняющимися шаблонами доступа.
- Таблицы, в которых использование типичного ключа раздела может привести к слишком большому или слишком малому числу разделов.
Включение кластеризации жидкости
Вы можете включить кластеризацию жидкости в существующей таблице или во время создания таблицы. Кластеризация несовместима с секционированием или ZORDER
, и требует использования Azure Databricks для управления всеми операциями макета и оптимизации данных в таблице. После включения кластеризации данных, запускайте задания OPTIMIZE
как обычно для постепенной кластеризации данных. Узнайте , как активировать кластеризацию.
Чтобы включить кластеризацию жидкости, добавьте фразу CLUSTER BY
в инструкцию создания таблицы, как показано в приведенных ниже примерах:
Примечание.
В Databricks Runtime 14.2 и более поздних версиях можно использовать API кадра данных и API DeltaTable в Python или Scala, чтобы включить кластеризацию жидкости для таблиц Delta Lake.
SQL
-- Create an empty Delta table
CREATE TABLE table1(col0 INT, col1 string) CLUSTER BY (col0);
-- Using a CTAS statement
CREATE EXTERNAL TABLE table2 CLUSTER BY (col0) -- specify clustering after table name, not in subquery
LOCATION 'table_location'
AS SELECT * FROM table1;
-- Using a LIKE statement to copy configurations
CREATE TABLE table3 LIKE table1;
Для Apache Iceberg необходимо явно отключить векторы удаления и идентификаторы строк при включении кластеризации Liquid в таблице Managed Iceberg.
Питон
# Create an empty Delta table
(DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute())
# Using a CTAS statement
df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
# CTAS using DataFrameWriterV2
df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
язык программирования Scala
// Create an empty Delta table
DeltaTable.create()
.tableName("table1")
.addColumn("col0", dataType = "INT")
.addColumn("col1", dataType = "STRING")
.clusterBy("col0")
.execute()
// Using a CTAS statement
val df = spark.read.table("table1")
df.write.clusterBy("col0").saveAsTable("table2")
// CTAS using DataFrameWriterV2
val df = spark.read.table("table1")
df.writeTo("table1").using("delta").clusterBy("col0").create()
В Databricks Runtime 16.0 и более поздних версиях вы можете создавать таблицы с использованием технологии liquid clustering, применяя записи из структурированных потоков, например:
:::
SQL
CREATE TABLE table1 (
col0 STRING,
col1 DATE,
col2 BIGINT
)
CLUSTER BY (col0, col1)
TBLPROPERTIES (
'clusterByAuto' = 'true'
);
Питон
(spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
)
язык программирования Scala
spark.readStream.table("source_table")
.writeStream
.clusterBy("column_name")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Предупреждение
Delta таблицы, созданные с включенной жидкой кластеризацией, включают множество функций Delta таблиц при создании и использовании Delta writer версии 7 и reader версии 3. Вы можете переопределить включение некоторых из этих функций. См. Переопределение включения функций по умолчанию (необязательно).
Версии протокола таблиц не могут быть понижены, а таблицы с включенной кластеризацией недоступны для чтения клиентами Delta Lake, которые не поддерживают все включенные функции протокола чтения Delta. См. сведения о совместимости функций Delta Lake и протоколах.
Включите кластеризацию жидкости в существующей непартиментной таблице Delta с помощью следующего синтаксиса:
-- Alter an existing table
ALTER TABLE <table_name>
CLUSTER BY (<clustering_columns>)
Для Apache Iceberg необходимо явно отключить векторы удаления и идентификаторы строк при включении кластеризации жидкости в существующей управляемой таблице Iceberg.
Внимание
Поведение по умолчанию не применяет кластеризацию к ранее записанным данным. Чтобы принудительно перекластеризовать все записи, необходимо использовать OPTIMIZE FULL
. См. раздел Принудительная перекластеризация для всех записей.
Чтобы удалить ключи кластеризации, используйте следующий синтаксис:
ALTER TABLE table_name CLUSTER BY NONE;
автоматическая кластеризация жидкости
Примечание.
Автоматическое кластеризация жидкости недоступна для Apache Iceberg.
В Databricks Runtime 15.4 LTS и более поздних версиях можно включить автоматическое жидкое кластеризование для управляемых Unity Catalog таблиц Delta. При включенной автоматической кластеризации данных Azure Databricks интеллектуально определяет ключи кластеризации для оптимизации производительности запросов. Вы включаете автоматическую кластеризацию жидкости при помощи предложения CLUSTER BY AUTO
.
При включении автоматические операции выбора ключей и кластеризации выполняются асинхронно в качестве операции обслуживания и требуют включения прогнозной оптимизации для таблицы. См прогнозную оптимизацию для управляемых таблиц каталога Unity.
Чтобы определить ключи кластеризации, Azure Databricks анализирует историческую рабочую нагрузку запроса таблицы и определяет лучшие столбцы кандидатов. Ключи кластеризации изменяются, когда прогнозируемая экономия затрат от пропуска данных перевешивает затраты на кластеризацию данных.
Если способ запроса ваших данных изменяется со временем или производительность запроса указывает на изменения в распределении данных, автоматическая кластеризация жидкости выбирает новые ключи для оптимизации производительности.
Если ключ не был выбран автоматической кластеризацией жидкости, причина может быть:
- Таблица слишком мала, чтобы извлечь выгоду из объединения жидкостей.
- Таблица уже имеет хорошую схему кластеризации. Например, он имеет хорошие ключи, которые были применены один раз раньше, или порядок вставки уже хорошо работает для заданного шаблона запроса, например данных, вставленных в хронологическом порядке и запрашиваемых по метке времени.
- В таблице нет частых запросов.
- Вы не используете Databricks Runtime 15.4 LTS или более поздней версии.
Автоматическое кластеризация жидкости можно применять для всех управляемых таблиц каталога Unity независимо от характеристик данных и запросов. Эти функции обеспечивают интеллектуальную оптимизацию макета данных на основе шаблонов использования данных, и эвристика решит, полезно ли выбрать ключи кластеризации.
Примечание.
Таблицы можно считывать или записывать с поддержкой автоматического кластеризации из всех версий среды выполнения Databricks, поддерживающих кластеризацию жидкости. Однако выбор интеллектуального ключа зависит от метаданных, представленных в Databricks Runtime 15.4 LTS. Используйте Databricks Runtime 15.4 LTS или более поздней версии, чтобы гарантировать, что автоматически выбранные ключи используют все ваши рабочие нагрузки и что эти рабочие нагрузки учитываются при выборе новых ключей.
Включение или отключение автоматической кластеризации жидкости
Чтобы включить или отключить автоматическое кластеризация жидкости в новой или существующей таблице, используйте следующий синтаксис:
SQL
-- Create an empty table.
CREATE OR REPLACE TABLE table1(column01 int, column02 string) CLUSTER BY AUTO;
-- Enable automatic liquid clustering on an existing table,
-- including tables that previously had manually specified keys.
ALTER TABLE table1 CLUSTER BY AUTO;
-- Disable automatic liquid clustering on an existing table.
ALTER TABLE table1 CLUSTER BY NONE;
-- Disable automatic liquid clustering by setting the clustering keys
-- to chosen clustering columns or new columns.
ALTER TABLE table1 CLUSTER BY (column01, column02);
Примечание.
Если запускаете CREATE OR REPLACE table_name
без указания CLUSTER BY AUTO
, а таблица уже существует и включает автоматическую кластеризацию данных, параметр AUTO
и столбцы для кластеризации таблицы (при применении) сохраняются при замене таблицы. Прогнозная оптимизация также поддерживает историческую рабочую нагрузку запроса для этой таблицы, чтобы определить лучшие ключи кластеризации.
Питон
df = spark.read.table("table1")
df.write
.format("delta")
.option(“clusterByAuto”, “true”)
.saveAsTable(...)
# To set clustering columns and auto, which serves as a way to give a hint
# for the initial selection.
df.write
.format("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option(“clusterByAuto”, “true”)
.saveAsTable(...)
# Using DataFrameWriterV2
df.writeTo(...).using("delta")
.option(“clusterByAuto”, “true”)
.create()
# To set clustering columns and auto, which serves as a way to give a hint
# for the initial selection.
df.writeTo(...).using("delta")
.clusterBy("clusteringColumn1", "clusteringColumn2")
.option(“clusterByAuto”, “true”)
.create()
# Similar syntax can also be used to set clusterByAuto for streaming tables.
spark.readStream.table("source_table")
.writeStream
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
# Or to specify a hint for the clustering columns by specifying both auto and columns together
spark.readStream.table("source_table")
.writeStream
.clusterBy("column1", "column2")
.option("clusterByAuto", "true")
.option("checkpointLocation", checkpointPath)
.toTable("target_table")
Примечание.
API Python доступен в Databricks Runtime 16.4 и выше.
Когда .clusterBy
используется вместе с .option('clusterByAuto', 'true)
, затем:
- Если этот параметр впервые устанавливает автоматическое кластеризирование жидкости, он всегда будет учитывать вручную введённые данные и задавать столбцы кластеризации в
.clusterBy
. - Если это уже таблица с автоматической кластеризацией жидкости, то подсказку с использованием
.clusterBy
можно принять один раз. Например, столбцы, указанные.clusterBy
, задаются только в том случае, если в таблице нет уже установленных столбцов кластеризации, либо вами, либо автоматической кластеризацией.
Вы можете использовать Python только при создании или замене таблицы. Используйте SQL для изменения clusterByAuto
состояния существующей таблицы.
Проверьте, включена ли автоматическая кластеризация
Чтобы проверить, включена ли в таблице автоматическая кластеризация жидкости, используйте DESCRIBE TABLE
или SHOW TBLPROPERTIES
.
Если включена автоматическая кластеризация жидкости, свойству clusterByAuto
задано значение true
. В свойстве clusteringColumns
показаны текущие столбцы кластеризации, которые были автоматически или вручную выбраны.
Переопределить включение функции по умолчанию (необязательно)
Вы можете переопределить поведение по умолчанию, которое включает функции таблицы Delta при активации жидкостной кластеризации. Это предотвращает обновление протоколов чтения и записи, связанных с этими функциями таблицы. Чтобы выполнить следующие действия, необходимо создать существующую таблицу:
Используется
ALTER TABLE
для задания свойства таблицы, которое отключает одну или несколько функций. Например, чтобы отключить векторы удаления, выполните следующие действия:ALTER TABLE table_name SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);
Включите кластеризацию жидкости в таблице, выполнив следующие действия:
ALTER TABLE <table_name> CLUSTER BY (<clustering_columns>)
В следующей таблице приведены сведения о функциях Delta, которые можно переопределить и как включение влияет на совместимость с версиями среды выполнения Databricks.
Функция Delta | Совместимость среды выполнения | Свойство для переопределения активации | Влияние отключения на кластеризацию жидкости |
---|---|---|---|
Векторы удаления | Для чтения и записи требуется Databricks Runtime 12.2 LTS и более поздних версий. | 'delta.enableDeletionVectors' = false |
Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. Конфликты записи при параллелизме на уровне строк.DELETE , MERGE и UPDATE команды могут выполняться медленнее. |
Отслеживание строк данных | Записи требуют Databricks Runtime 13.3 LTS и более поздних версий. Можно считывать из любой версии Databricks Runtime. | 'delta.enableRowTracking' = false |
Параллелизм на уровне строк отключен, что делает транзакции и операции кластеризации более вероятными для конфликта. См. Конфликты записи при параллелизме на уровне строк. |
Контрольные точки версии 2 | Для чтения и записи требуется Databricks Runtime 13.3 LTS и более поздних версий. | 'delta.checkpointPolicy' = 'classic' |
Не влияет на поведение кластеризации жидкости. |
Выбор ключей кластеризации
Databricks рекомендует автоматическую кластеризацию жидкости для поддерживаемых таблиц. См. автоматическая кластеризация жидкости.
Databricks рекомендует выбирать ключи кластеризации на основе столбцов, наиболее часто используемых в фильтрах запросов. Ключи кластеризации можно определить в любом порядке. Если два столбца очень коррелируются, необходимо включить только один из них в качестве ключа кластеризации.
Можно указать до четырех ключей кластеризации. Для небольших таблиц (менее 10 ТБ) использование дополнительных ключей кластеризации (например, четыре) может снизить производительность при фильтрации по одному столбцу по сравнению с использованием меньшего количества ключей кластеризации (например, двух). Однако по мере увеличения размера таблицы разница в производительности с использованием дополнительных ключей кластеризации для запросов с одним столбцом становится незначительной.
Можно указать только столбцы, которые собирают статистику в качестве ключей кластеризации. По умолчанию в таблице Delta сбор статистики производится для первых 32 столбцов. См. раздел "Указание столбцов статистики Delta".
Кластеризация поддерживает следующие типы данных для кластеризации ключей.
- Дата
- Метка времени
- TimestampNTZ (требуется Databricks Runtime 14.3 LTS или более поздняя версия)
- Строка
- Целое число
- длинный
- Коротко
- Плавающий
- Двойной
- Десятичное число
- Байт
Если вы преобразуете существующую таблицу, рассмотрите следующие рекомендации:
Текущий метод оптимизации данных | Рекомендация по кластеризации ключей |
---|---|
Секционирование в стиле Hive | Используйте столбцы секций в качестве ключей кластеризации. |
Индексирование Z-порядка |
ZORDER BY Используйте столбцы в качестве ключей кластеризации. |
Секционирование в стиле Hive и порядок Z | Используйте как столбцы секционирования, так и ZORDER BY столбцы в качестве ключей кластеризации. |
Созданные столбцы для уменьшения кардинальности (например, дата для метки времени) | Используйте исходный столбец в качестве ключа кластеризации и не создавайте созданный столбец. |
Запись данных в кластеризованную таблицу
Для записи в кластерную таблицу Delta необходимо использовать клиент записи Delta, поддерживающий все функции протокола записи Delta, используемые при динамической кластеризации. Для записи в кластеризованную таблицу Iceberg можно использовать Iceberg REST API каталога Unity. В Azure Databricks необходимо использовать Databricks Runtime 13.3 LTS и выше.
Операции, которые концентрируются при записи, включают следующие:
-
INSERT INTO
операции - Выражения
CTAS
иRTAS
-
COPY INTO
из формата Parquet spark.write.mode("append")
Структурированная потоковая обработка данных никогда не активирует кластеризацию данных при записи. Применяются дополнительные ограничения. См. Ограничения.
Кластеризация при записи активируется только в том случае, если данные в транзакции соответствуют пороговой величине. Эти пороговые значения зависят от количества столбцов кластеризации и ниже для управляемых таблиц каталога Unity, чем другие таблицы Delta.
Количество столбцов кластеризации | Пороговое значение для управляемых таблиц каталога Unity | Пороговый размер для других таблиц Delta |
---|---|---|
1 | 64 МБ | 256 МБ |
2 | 256 МБ | 1 ГБ |
3 | 512 МБ | 2 ГБ |
4 | 1 ГБ | 4 ГБ |
Поскольку не все операции применяют динамическую кластеризацию, Databricks рекомендует часто выполнять OPTIMIZE
, чтобы обеспечить эффективную кластеризацию всех данных.
Как запустить кластеризацию
Прогнозная оптимизация автоматически выполняет OPTIMIZE команды для включенных таблиц. См прогнозную оптимизацию для управляемых таблиц каталога Unity. При использовании прогнозной оптимизации Databricks рекомендует отключить все запланированные OPTIMIZE задания.
Чтобы активировать кластеризацию, необходимо использовать Databricks Runtime 13.3 LTS или более поздней версии. Используйте команду OPTIMIZE
на вашей таблице.
OPTIMIZE table_name;
Жидкая кластеризация является инкрементной, что означает, что данные перезаписываются только при необходимости для учёта данных, которые должны быть кластеризованы. Файлы данных с ключами кластеризации, которые не соответствуют кластеризованным данным, не перезаписываются.
Если вы не используете прогнозную оптимизацию, Databricks рекомендует планировать регулярные OPTIMIZE
задания для кластеризации данных. Для таблиц с большим количеством обновлений или вставок Databricks рекомендует планировать задание OPTIMIZE
с интервалом в один или два часа. Так как кластеризация жидкости увеличивается, большинство OPTIMIZE
заданий для кластеризованных таблиц выполняются быстро.
Принудительная перекластеризация для всех записей
В Databricks Runtime 16.0 и более поздних версиях можно принудительно выполнить повторение всех записей в таблице со следующим синтаксисом:
OPTIMIZE table_name FULL;
Внимание
При необходимости выполнение OPTIMIZE FULL
перекластеризует все существующие данные. Для больших таблиц, которые ранее не были кластеризованы по указанным ключам, эта операция может занять несколько часов.
Запустите OPTIMIZE FULL
при первом включении кластеризации или изменении ключей кластеризации. Если вы ранее выполнили OPTIMIZE FULL
и не было изменений в ключах кластеризации, OPTIMIZE FULL
выполняется так же, как и OPTIMIZE
. В этом сценарии OPTIMIZE
использует добавочный подход и перезаписывает только файлы, которые ранее не были сжаты. Всегда используйте OPTIMIZE FULL
, чтобы убедиться, что макет данных отражает текущие ключи кластеризации.
Чтение данных из кластеризованной таблицы
Данные можно считывать в кластеризованной таблице Delta с помощью любого клиента Delta Lake, поддерживающего чтение векторов удаления. С помощью API REST каталога Iceberg можно считывать данные в кластеризованной таблице Iceberg.
SELECT * FROM table_name WHERE cluster_key_column_name = "some_value";
Изменение ключей кластеризации
Ключи кластеризации для таблицы можно изменить в любое время, выполнив ALTER TABLE
команду, как показано в следующем примере:
ALTER TABLE table_name CLUSTER BY (new_column1, new_column2);
При изменении ключей кластеризации последующие OPTIMIZE
операции и операции записи используют новый подход кластеризации, но существующие данные не перезаписываются.
Кроме того, можно отключить кластеризацию, задав ключи NONE
следующим образом:
ALTER TABLE table_name CLUSTER BY NONE;
Настройка ключей NONE
кластера не перезаписывает данные, которые уже кластеризованы, но не позволяет будущим OPTIMIZE
операциям использовать ключи кластеризации.
Использование кластеризации жидкости из внешнего движка
Вы можете включить кластеризацию жидкости на управляемых таблицах Iceberg из внешних двигателей Iceberg. Чтобы включить кластеризацию жидкости, укажите столбцы секций при создании таблицы. Каталог Unity интерпретирует секции как ключи кластеризации. Например, выполните следующую команду в OSS Spark:
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY c1;
Вы можете отключить кластеризацию жидкости:
ALTER TABLE main.schema.icebergTable DROP PARTITION FIELD c2;
Вы можете изменить ключи кластеризации с помощью эволюции разделов Iceberg.
ALTER TABLE main.schema.icebergTable ADD PARTITION FIELD c2;
Если указать секцию с помощью преобразования контейнера, каталог Unity удаляет выражение и использует столбец в качестве ключа кластеризации:
CREATE OR REPLACE TABLE main.schema.icebergTable
PARTITIONED BY (bucket(c1, 10));
Узнайте, как кластеризована таблица
Вы можете использовать команды DESCRIBE
для просмотра ключей кластеризации таблицы, как в следующих примерах:
DESCRIBE TABLE table_name;
DESCRIBE DETAIL table_name;
Совместимость таблиц с гибкой кластеризацией
Таблицы, созданные с помощью технологической кластеризации в Databricks Runtime 14.1 и выше, по умолчанию используют контрольные точки версии 2. Таблицы можно считывать и записывать с помощью контрольных точек версии 2 в Databricks Runtime 13.3 LTS и выше.
Вы можете отключить контрольные точки версии 2 и понизить протоколы таблиц для чтения таблиц с жидкой кластеризацией в Databricks Runtime 12.2 LTS и более поздних версиях. См. Управление функцией удаления таблицы Delta Lake и понижение протокола таблицы.
Ограничения
Применяются следующие ограничения:
- В Databricks Runtime 15.1 и ниже кластеризация при записи данных не поддерживает запросы источника данных, включающие фильтры, объединения или агрегации.
- Нагрузки структурированного потокового режима не поддерживают кластеризацию данных при записи.
- В Databricks Runtime 15.4 LTS и ниже нельзя создать таблицу с активированной возможностью жидкостной кластеризации при использовании записи с помощью Structured Streaming. Структурированная потоковая передача можно использовать для записи данных в существующую таблицу с включенным кластеризированием жидкости.
- Параллелизм на уровне строк не поддерживается в управляемых таблицах Iceberg, поскольку в таблицах Iceberg не поддерживаются векторы удаления и отслеживание строк.