Настройка PostgreSQL для импорта в Azure Databricks

Это важно

Соединитель PostgreSQL для Lakeflow Connect находится в общедоступной предварительной версии. Обратитесь к группе учетной записи Databricks, чтобы зарегистрироваться в общедоступной предварительной версии.

На этой странице описаны задачи по настройке источника для передачи данных из PostgreSQL в Azure Databricks с помощью Lakeflow Connect.

Логическая репликация для отслеживания измененных данных

Соединитель PostgreSQL использует логическую репликацию для отслеживания изменений в исходных таблицах. Логическая репликация позволяет соединителю записывать изменения данных (вставки, обновления и удаления), не требуя триггеров или значительных затрат на исходную базу данных.

Для логической репликации Lakeflow PostgreSQL требуется следующее:

  1. Lakeflow Connect поддерживает репликацию данных из PostgreSQL версии 13 и более поздних версий.

  2. Настройте базу данных для логической репликации:

    Параметр wal_level PostgreSQL должен иметь значение logical.

  3. Создайте публикации, содержащие все таблицы, которые необходимо реплицировать.

  4. Создайте слоты репликации для каждого каталога, который будет реплицирован.

Замечание

Перед созданием слотов репликации необходимо создать публикации.

Дополнительные сведения о логической репликации см. в документации по логической репликации на веб-сайте PostgreSQL.

Общие сведения о задачах установки источника

Перед приемом данных в Azure Databricks выполните следующие задачи в PostgreSQL:

  1. Проверка PostgreSQL 13 или более поздней версии

  2. Настройка сетевого доступа (группы безопасности, правила брандмауэра или VPN)

  3. Настройка логической репликации:

    • Включение логической репликации (wal_level = logical)
  4. Необязательно. Настройте встроенное отслеживание DDL для автоматического обнаружения изменений схемы. Если вы хотите выбрать встроенное отслеживание DDL, обратитесь в службу поддержки Databricks.

Это важно

Если вы планируете реплицировать из нескольких баз данных PostgreSQL, необходимо создать отдельный слот публикации и репликации для каждой базы данных. Встроенный скрипт отслеживания DDL (если используется) также должен выполняться в каждой базе данных.

Настройка логической репликации

Чтобы включить логическую репликацию в PostgreSQL, настройте параметры базы данных и настройте необходимые объекты.

Установите логический уровень WAL

Для логической репликации необходимо настроить журнал Write-Ahead (WAL). Обычно для этого параметра требуется перезапуск базы данных.

  1. Проверьте текущий wal_level параметр:

    SHOW wal_level;
    
  2. Если значение не logical, установите wal_level = logical в конфигурации сервера и перезапустите службу PostgreSQL.

Создание пользователя репликации

Создайте выделенного пользователя для приема Databricks с правами репликации:

CREATE USER databricks_replication WITH PASSWORD 'your_secure_password';
GRANT CONNECT ON DATABASE your_database TO databricks_replication;
GRANT USAGE ON SCHEMA schema_name TO databricks_replication;
GRANT SELECT ON TABLE schema_name.table_name TO databricks_replication;
ALTER USER databricks_replication WITH REPLICATION;

Подробные требования к привилегиям см. в разделе "Требования к пользователю базы данных PostgreSQL".

Установить идентификатор реплики для таблиц

Для каждой таблицы, которую требуется реплицировать, настройте идентификатор реплики. Правильный параметр зависит от структуры таблицы:

Структура таблицы Обязательная ИДЕНТИЧНОСТЬ РЕПЛИКИ Command
Таблица имеет первичный ключ и не содержит столбцы TOASTable (например, TEXTBYTEAVARCHAR(n) с большими значениями) DEFAULT ALTER TABLE schema_name.table_name REPLICA IDENTITY DEFAULT;
Таблица имеет первичный ключ, но включает в себя большие столбцы переменной длины (TOASTable) FULL ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;
Таблица не имеет первичного ключа FULL ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;

Дополнительные сведения о параметрах идентификации реплики см. в документации PostgreSQL.

Создание публикации

Создайте публикацию в каждой базе данных, включающую таблицы, которые необходимо реплицировать. Выполните следующую команду как владелец таблицы или суперпользователь:

-- Create a publication for specific tables
CREATE PUBLICATION databricks_publication FOR TABLE schema_name.table1, schema_name.table2;

-- Or create a publication for all tables in a database
CREATE PUBLICATION databricks_publication FOR ALL TABLES;

Замечание

  • Необходимо создать отдельную публикацию в каждой базе данных PostgreSQL, которую требуется реплицировать.
  • CREATE PUBLICATION ... FOR TABLE требуется владение перечисленными таблицами. FOR ALL TABLES требует привилегий суперпользователя. Выполните эту команду как владелец таблицы или суперпользователь базы данных, а не как пользователь репликации.
  • Избегайте добавления таблиц в публикацию, которые не требуются для репликации, чтобы уменьшить ненужный сетевой трафик.

Настройка параметров слота репликации

Перед созданием слотов репликации настройте следующие параметры сервера:

Ограничение хранения WAL в слотах репликации

Параметр: max_slot_wal_keep_size

Рекомендуется не устанавливатьmax_slot_wal_keep_size-1 (значение по умолчанию), так как это приводит к неограниченному разрастанию WAL из-за задержки на неактивные слоты репликации. В зависимости от рабочей нагрузки задайте для этого параметра конечное значение.

Дополнительные сведения о параметре max_slot_wal_keep_size см. в официальной документации PostgreSQL.

Замечание

Некоторые управляемые поставщики облачных служб не допускают изменения этого параметра и вместо этого используют встроенный мониторинг слотов и автоматическую очистку. Проверьте поведение платформы перед настройкой операционных оповещений.

Дополнительные сведения можно найти здесь

Настройка емкости слота репликации

Параметр: max_replication_slots

Для каждой реплицируемой базы данных PostgreSQL требуется один логический слот репликации. Задайте для этого параметра по крайней мере количество реплицируемых баз данных, а также все существующие потребности репликации.

Настройка отправителей WAL

Параметр: max_wal_senders

Этот параметр определяет максимальное количество параллельных процессов отправителя WAL, которые передают данные WAL подписчикам. В большинстве случаев для каждого слота репликации должен быть один процесс отправителя WAL, чтобы обеспечить эффективную и согласованную репликацию данных.

Настройте max_wal_senders по крайней мере равное количеству используемых слотов репликации, учитывая любое другое существующее использование. Рекомендуется установить его немного выше, чтобы обеспечить гибкость работы.

Создание слота репликации

Создайте слот репликации в каждой базе данных, которую шлюз приема Databricks будет использовать для отслеживания изменений. Слот репликации должен быть создан пользователем с привилегией REPLICATION . Если вы подключены как суперпользователь или администратор, сначала перейдите к пользователю репликации:

SET ROLE databricks_replication;

-- Databricks supports only the pgoutput plugin for replication slots
SELECT pg_create_logical_replication_slot('databricks_slot', 'pgoutput');

-- Switch back to the admin or table owner role for subsequent steps
RESET ROLE;

Это важно

  • Слоты репликации удерживают данные WAL до тех пор, пока они не будут использованы соединителем. max_slot_wal_keep_size Настройте параметр, чтобы ограничить хранение WAL и предотвратить несвязанный рост WAL. Дополнительные сведения см. в разделе "Настройка параметров слота репликации ".
  • При удалении конвейера приема данных необходимо вручную удалить связанный слот репликации. См. раздел "Очистка слотов репликации".

Необязательно: Конфигурация встроенного отслеживания DDL

Встроенное отслеживание DDL — это необязательная функция, которая позволяет соединителю автоматически обнаруживать и применять изменения в схеме из исходной базы данных. По умолчанию эта функция выключена.

Предупреждение

Отслеживание встроенного DDL в настоящее время находится в предварительной версии и требует связывания с службой поддержки Databricks для его включения в вашей рабочей области.

Сведения о том, какие изменения схемы обрабатываются автоматически, и которые требуют полного обновления, см. в разделе "Как управляемые соединители обрабатывают эволюцию схемы" и "Эволюция схемы".

Настройте встроенную трассировку DDL

Если в вашей рабочей области включено встроенное отслеживание DDL, выполните следующие действия в каждой базе данных PostgreSQL:

  1. Скачайте и запустите скрипт lakeflow_pg_ddl_change_tracking.sql :

    \i lakeflow_pg_ddl_change_tracking.sql
    
  2. Убедитесь, что триггеры и таблица аудита были успешно созданы:

    -- Check for the DDL audit table
    SELECT * FROM pg_tables WHERE tablename LIKE 'lakeflow_ddl_audit_table%';
    
    -- Check for the event triggers
    SELECT * FROM pg_event_trigger WHERE evtname LIKE 'lakeflow%';
    
  3. Добавьте таблицу аудита DDL в публикацию. Эта команда должна выполняться как владелец публикации, а не как пользователь репликации:

    ALTER PUBLICATION databricks_publication ADD TABLE public.lakeflow_ddl_audit_table_1_0;
    

Заметки о конфигурации для конкретного облака

AWS RDS и Аврора

  • Убедитесь, что параметр rds.logical_replication установлен на 1 в группе параметров.

  • Настройте группы безопасности, чтобы разрешить подключения из рабочей области Databricks.

  • Пользователю репликации требуется роль rds_replication.

    GRANT rds_replication TO databricks_replication;
    

База данных Azure для PostgreSQL

  • Включите логическую репликацию в параметрах сервера через портал Azure или CLI.
  • Настройте правила брандмауэра, чтобы разрешить подключения из рабочей области Databricks.
  • Для гибкого сервера поддерживается логическая репликация. Для одного сервера убедитесь, что вы используете поддерживаемый уровень.

GCP Cloud SQL для PostgreSQL

  • Включите флаг cloudsql.logical_decoding в параметрах экземпляра.
  • Настройте авторизованные сети, чтобы разрешить подключения из рабочей области Databricks.
  • Убедитесь, что cloudsql.enable_pglogical флаг задан on, если используются расширения pglogical.

Проверка конфигурации

После выполнения задач установки убедитесь, что логическая репликация настроена правильно:

  1. Убедитесь, что wal_level задано значение logical:

    SHOW wal_level;
    
  2. Убедитесь, что у пользователя репликации есть replication права:

    SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'databricks_replication';
    
  3. Убедитесь, что у пользователя репликации есть SELECT права доступа к таблицам. Замените schema_name.table_name на схему и таблицу, которые вы реплицируете (например, public.my_table):

    SELECT has_table_privilege('databricks_replication', 'schema_name.table_name', 'SELECT');
    
  4. Убедитесь, что публикация существует:

    SELECT * FROM pg_publication WHERE pubname = 'databricks_publication';
    
  5. Убедитесь, что слот репликации существует:

    SELECT slot_name, slot_type, active, restart_lsn
    FROM pg_replication_slots
    WHERE slot_name = 'databricks_slot';
    
  6. Проверьте единтичность реплики для ваших таблиц:

    SELECT schemaname, tablename, relreplident
    FROM pg_tables t
    JOIN pg_class c ON t.tablename = c.relname
    WHERE schemaname = 'your_schema';
    

    Столбец relreplident должен показывать d для идентичности реплики DEFAULT (использует первичный ключ) или f для полной идентичности реплики FULL (требуется для таблиц без первичных ключей или с TOAST-столбцами).

Дальнейшие шаги

После завершения настройки источника можно создать шлюз приема и конвейер для приема данных из PostgreSQL. См. Поглощение данных из PostgreSQL.