Добавление источника CDC из базы данных PostgreSQL в поток событий.

В этой статье рассказывается о том, как добавить источник захвата изменений данных (CDC) базы данных PostgreSQL в поток событий.

Коннектор источника захвата изменений данных базы данных PostgreSQL (CDC) для потоков событий Microsoft Fabric позволяет обеспечивать захват моментального снимка текущих данных в базе данных PostgreSQL. В настоящее время служба PostgreSQL Database Change Data Capture (CDC) поддерживается из следующих служб, где базы данных могут быть доступны для общедоступного доступа:

  • База данных Azure для PostgreSQL
  • Amazon RDS для PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL для PostgreSQL

После добавления источника CDC базы данных PostgreSQL в поток событий он записывает изменения на уровне строк в указанные таблицы. Затем эти изменения можно обрабатывать в режиме реального времени и отправлять в разные места назначения для дальнейшего анализа.

Примечание.

С помощью DeltaFlow (предварительная версия) можно преобразовать необработанные события Debezium CDC в потоки, готовые к аналитике, которые отражают структуру исходной таблицы. DeltaFlow автоматизирует регистрацию схемы, управление таблицами назначения и обработку эволюции схемы. Чтобы использовать DeltaFlow, выберите события, готовые к аналитике, и автоматически обновленную схему на этапе обработки схемы.

Предварительные условия

Включение CDC в базе данных PostgreSQL

В этом разделе в качестве примера используется Azure Database for PostgreSQL.

Чтобы включить CDC в Azure Database for PostgreSQL Flexible Server, выполните следующие действия:

  1. На странице Azure Database for PostgreSQL Flexible Server на портале Azure выберите Server parameters в меню навигации.

  2. На странице параметров сервера:

    • Задайте параметр wal_level значением логический.
    • Обновите max_worker_processes до как минимум 16.

    Снимок экрана: включение CDC для гибкого развертывания сервера.

  3. Сохраните изменения и перезапустите сервер.

  4. Убедитесь, что экземпляр гибкого сервера Azure Database for PostgreSQL разрешает общедоступный сетевой трафик.

  5. Предоставьте администраторам разрешения на репликацию, выполнив следующую инструкцию SQL. Если вы хотите использовать другую учетную запись пользователя для подключения базы данных PostgreSQL (DB) для получения CDC, убедитесь, что пользователь является владельцем таблицы.

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    

Запуск мастера выбора источника данных

Если вы еще не добавили источник в поток событий, выберите плитку "Подключить источники данных ". Вы также можете выбрать Добавить источник>Подключить источники данных на ленте.

Снимок экрана, показывающий выбор плитки для использования внешнего источника.

Если вы добавляете источник в уже опубликованный поток событий, переключитесь в режим редактирования . На ленте выберите Добавить источник>Подключить источники данных.

Снимок экрана: выборы для добавления внешних источников.

На странице Выбор источника данных найдите и выберите Connect на плитке DB PostgreSQL (CDC).

Скриншот, показывающий выбор базы данных Azure (DB) для PostgreSQL (CDC) в качестве исходного типа в мастере получения событий.

Настройка и подключение к Базе данных PostgreSQL CDC

Прием изменений данных из баз данных PostgreSQL с автоматической регистрацией схем таблиц через функцию CDC в Eventstream.

Примечание.

DeltaFlow (предварительная версия): при выборе событий, готовых к аналитике, и автоматического обновления схемы на этапе обработки схемы, DeltaFlow преобразует необработанные события Debezium CDC в потоки, готовые к аналитике, которые отражают структуру исходной таблицы. DeltaFlow также автоматизирует создание целевой таблицы и обработку эволюции схемы.

  1. На странице "Подключение" выберите "Создать подключение".

    Снимок экрана: страница

  2. В разделе "Параметры подключения" введите следующие сведения.

    • Сервер: адрес сервера базы данных PostgreSQL, например my-pgsql-server.postgres.database.azure.com.

    • База данных: имя базы данных, например my_database.

      Снимок экрана: раздел

    • Имя подключения: введите имя подключения.

    • Тип проверки подлинности, выберите "Базовый" и введите имя пользователя и пароль для базы данных.

      Примечание.

      В настоящее время потоки событий Fabric поддерживают проверку подлинности только Basic.

    • Выберите "Подключиться" , чтобы завершить параметры подключения. Снимок экрана: раздел учетных данных подключения для соединителя базы данных PostgreSQL.

  3. Порт. Введите номер порта сервера. Значение по умолчанию — 5432. Если выбранное облачное подключение настроено в Управление подключениями и шлюзами, убедитесь, что номер порта соответствует тому, который указан там. Если они не совпадают, то номер порта облачного подключения в Управление подключениями и шлюзами имеет приоритет.

  4. При записи изменений из таблиц базы данных можно выбрать один из двух вариантов:

    • Все таблицы: запись изменений из каждой таблицы в базе данных.
    • Введите имена таблиц: позволяет указать подмножество таблиц с помощью списка, разделенного запятыми. Можно использовать полные идентификаторы таблицы в формате schemaName.tableName или допустимых регулярных выражениях. Примеры.
    • dbo.test.*: выберите все таблицы из схемы test, имена которых начинаются с dbo.
    • dbo\.(test1|test2): выберите dbo.test1 и dbo.test2.

    В списке можно объединить оба формата. Общее ограничение символов для всей записи составляет 102 400 символов.

  5. Имя слота (необязательно): Введите имя слота логического декодирования PostgreSQL, который был создан для потоковой передачи изменений из определенного подключаемого модуля для конкретной базы данных или схемы. Сервер использует этот слот для потоковой передачи событий в соединитель потоковой передачи Eventstream. Он должен содержать только строчные буквы, цифры и знаки подчеркивания.

    • Если это не указано, для создания слота используется GUID, требующий соответствующих разрешений базы данных.
    • Если указанное имя слота существует, соединитель использует его напрямую.
  6. Разверните расширенные настройки, чтобы получить доступ к дополнительным вариантам настройки источника CDC базы данных PostgreSQL.

    • Имя публикации: указывает имя публикации логической репликации PostgreSQL, используемой. Это значение должно соответствовать существующей публикации в базе данных или автоматически создаваться в зависимости от режима автоматического создания. Значение по умолчанию: dbz_publication.

      Примечание.

      Пользователь соединителя должен иметь разрешения суперпользователя для создания публикации. Рекомендуется вручную создать публикацию перед первым запуском коннектора, чтобы избежать проблем, связанных с разрешениями.

    • Режим автоматического создания публикации: определяет, создается ли публикация и как она создается автоматически. Возможные варианты:

      • Filtered (по умолчанию): если указанная публикация не существует, соединитель создает публикацию, которая включает только выбранные таблицы (как указано в списке включаемых таблиц).
      • AllTables: Если указанная публикация существует, соединитель использует её. Если он не существует, коннектор создаёт объект, который включает в себя все таблицы в базе данных.
      • Disabled: соединитель не создает публикацию. Если указанная публикация отсутствует, соединитель создает исключение и останавливается. В этом случае публикация должна быть создана вручную в базе данных.

      Дополнительные сведения см. в документации Debezium по режиму автоматического создания публикации

    • Режим десятичной обработки: указывает, как соединитель обрабатывает значения PostgreSQL DECIMAL и NUMERIC столбцов:

      • Precise: представляет значения с использованием точных десятичных типов (например, Java BigDecimal), чтобы обеспечить полную точность и точность представления данных.
      • Double: преобразует значения в числа с плавающей запятой двойной точности. Этот параметр повышает удобство использования и производительность, но может привести к потере точности.
      • String: кодирует значения в виде форматированных строк. Этот параметр упрощает их использование в подчиненных системах, но теряет семантические сведения о исходном числовом типе.
    • Режим моментального снимка: укажите условия для выполнения моментального снимка при запуске соединителя:

      • Initial: Соединитель запускает снимок только если не записаны смещения для имени логического сервера или если обнаруживается, что предыдущий снимок не завершился. После завершения моментального снимка соединитель начинает потоковую передачу записей событий для последующих изменений базы данных.
      • InitialOnly: Соединитель запускает моментальный снимок только в том случае, если не зафиксированы сдвиги для имени логического сервера. После завершения моментального снимка соединитель останавливается. Он не переходит на потоковую передачу для чтения событий изменения из binlog.
      • NoData: соединитель запускает снапшот, который сохраняет только схему, но не данные таблицы. Установите этот параметр, если вам не нужен согласованный моментальный снимок данных, но необходимо только изменения, происходящие с момента запуска соединителя.
    • Запрос действия Heartbeat: указывает запрос, который соединитель выполняет в исходной базе данных, когда соединитель отправляет сообщение пульса.

    • Переопределение инструкции выбора снимка: указывает строки таблицы, которые необходимо включить в снимок. Используйте свойство, если требуется, чтобы в снимок вошло только подмножество строк таблицы. Это свойство влияет только на моментальные снимки. Он не применяется к событиям, которые соединитель считывает из журнала.

Сведения о потоке или источнике

  1. На странице "Подключение" выполните одно из этих действий на основе того, используете ли вы поток событий или концентратор Real-Time.

    • Eventstream:

      В области сведений о источнике справа выполните следующие действия:

      1. Чтобы изменить имя источника, нажмите кнопку Карандаш.

      2. Обратите внимание, что имя потока событий и имя Stream доступны только для чтения.

    • центрReal-Time:

      В разделе сведений о потоке справа выполните следующие действия.

      1. Выберите рабочую область Fabric где нужно создать поток событий.

      2. Для имени eventstream нажмите кнопку «Карандаш» и введите название для eventstream.

      3. Значение имени потока автоматически создается шляхом добавления -stream к имени eventstream. Этот поток отображается на странице "Все потоки данных " концентратора в режиме реального времени, когда мастер завершит работу.

  2. Нажмите кнопку "Далее" в нижней части страницы "Настройка ".

Проверка и подключение

На экране "Обзор и подключение" просмотрите сводку и выберите "Добавить (Eventstream) или "Connect (Real-Time hub).

Страница обработки схемы

  1. На шаге обработки схемы выберите один из следующих вариантов:

    • События, готовые к аналитике и автоматически обновляемая схема (предварительная версия DeltaFlow): соединитель преобразует необработанные события CDC в потоки, готовые к аналитике, которые отражают структуру исходной таблицы. DeltaFlow дополняет события метаданными, такими как тип изменения (вставка, обновление или удаление) и метки времени, а также автоматически управляет целевыми таблицами и эволюцией схемы.
    • Необработанные события CDC: Соединитель принимает и делает доступными необработанные события CDC. При необходимости соединитель может автоматически обнаружить схемы таблиц и зарегистрировать их в реестре схем. Используйте этот параметр, если требуется осведомленность о схеме без преобразования DeltaFlow.

    Примечание.

    На следующем снимке экрана показана Azure SQL Database CDC. Параметры обработки схем одинаковы для всех поддерживаемых соединителей источника CDC.

    Снимок экрана: шаг обработки схемы с параметрами событий DeltaFlow и Raw CDC для соединителя источника CDC.

  2. Включите ассоциацию схемы событий.

  3. Для Workspace выберите рабочую область Fabric для набора схем.

  4. Для набора схем по умолчанию выбран параметр +Create , который создает новый набор схем. Его можно изменить, чтобы выбрать существующий набор схем событий.

  5. Если вы выбрали параметр +Создать на предыдущем шаге, введите имя набора схем.

  6. На странице «Проверка + подключение» просмотрите сводку, а затем выберите «Добавить» (Eventstream) или «Подключить» (Real-Time hub).

    Снимок экрана: страница проверки и создания соединителя базы данных PostgreSQL с расширенными функциями.

    Для всех таблиц или выбранных таблиц в базе данных PostgreSQL соединитель автообнаружает и создает схемы и регистрирует их в реестре схем.

DeltaFlow: готовое для аналитики преобразование событий (предварительная версия)

При включении событий, готовых для аналитики, и автообновляемой схемы (DeltaFlow) коннектор предоставляет следующие возможности:

  • Фигура события, готовая к аналитике: Сырые события Debezium CDC преобразуются в табличный формат, который отражает структуру исходной таблицы. События обогащены столбцами метаданных, включая тип изменения (insertилиupdatedelete) и метку времени события.
  • Автоматическое управление таблицами назначения: при маршрутизации потоков с поддержкой DeltaFlow в поддерживаемое назначение, например в хранилище событий, целевые таблицы автоматически создаются для сопоставления схемы исходной таблицы. Вам не нужно вручную создавать или настраивать целевые таблицы.
  • Обработка эволюции схемы. При изменении исходных таблиц баз данных (например, добавляются новые столбцы или создаются таблицы), DeltaFlow автоматически обнаруживает изменения, обновляет зарегистрированные схемы и корректирует целевые таблицы соответствующим образом. Это поведение сводит к минимуму вмешательство вручную, вызванное изменениями схемы.

Примечание.

DeltaFlow (предварительная версия) в настоящее время поддерживается для источников CDC: Azure SQL Database, Azure SQL Managed Instance, SQL Server на виртуальной машине и PostgreSQL.

Дополнительные сведения о том, как DeltaFlow преобразует необработанные события CDC в готовые к аналитике выходные данные, включая типы операций и столбцы метаданных, см. в разделе "Преобразование выходных данных DeltaFlow".

Просмотр обновленного потока событий

  1. Вы можете увидеть источник базы данных PostgreSQL CDC, добавленный в поток событий, в режиме редактирования.

    Снимок экрана: источник CDC для потоковой передачи PostgreSQL DB в режиме редактирования с расширенными функциями.

  2. Чтобы реализовать новый добавленный источник CDC базы данных PostgreSQL, выберите "Опубликовать". После выполнения этих действий источник CDC базы данных PostgreSQL доступен для визуализации в режиме реального времени.

    Снимок экрана потока источника CDC базы данных PostgreSQL в режиме реального времени с расширенными функциями.

Настройте направления Eventstream для использования схем

В настоящее время для потоков событий со связанными схемами поддерживаются только Eventhouse, настраиваемая конечная точка и назначения производных потоков. В этом разделе показано, как добавить и настроить назначение Eventhouse, если для потока событий включены расширенные функции (например, поддержка схемы).

Примечание.

При использовании DeltaFlow (предварительная версия) с поддерживаемым источником отслеживания измененных данных (CDC) целевые таблицы в хранилище событий автоматически создаются и управляются для сопоставления структуры исходной таблицы. Вам не нужно вручную настраивать схему целевой таблицы. DeltaFlow также обрабатывает эволюцию схемы автоматически при изменении исходных таблиц.

Настройте схему для назначения пользовательской конечной точки.

  1. Выберите "Преобразовать события" или "Добавить назначение", а затем выберите CustomEndpoint.

  2. На панели настраиваемых конечных точек укажите имя назначения.

  3. Для схемы входных данных выберите схему для событий. При включении поддержки схемы для потока событий вы можете выбрать этот флажок.

Снимок экрана: панель настройки пользовательской конечной точки.

Подробные инструкции по настройке назначения настраиваемой конечной точки см. в разделе "Добавление настраиваемой конечной точки" или назначения пользовательского приложения в поток событий.

Настройте схемы для места назначения eventhouse

  1. Выберите "Преобразовать события" или "Добавить назначение", а затем выберите Eventhouse.

  2. На панели eventhouse настройте следующие параметры, связанные со схемой:

    1. Для входной схемы выберите одну или несколько схем из раскрывающегося списка.

      Снимок экрана: область конфигурации дома событий с выбранной входной схемой.

      Примечание.

      Если вы выбрали опцию динамическая схема через заголовки при настройке источника Центров событий, возможно, вы сконфигурировали несколько схем для источника и сопоставили их с различными свойствами и значениями.

    2. Для метода создания таблицы выберите одну таблицу со всеми схемами, объединенными или отдельными таблицамидля каждой схемы в зависимости от ваших требований.

      Снимок экрана: область конфигурации дома событий с методами создания таблицы.

    3. Для записи данных с помощью выберите один из следующих вариантов:

      • Только полезная нагрузка: запишите извлеченные полезные данные в таблицу. Если существует несколько входных схем, данные отправляются в несколько таблиц.
      • Метаданные и полезные данные: запись метаданных и полезных данных в одну таблицу. Примеры столбцов: source , , subjecttypeи data.

      Снимок экрана: область конфигурации дома событий с параметрами записи данных.

Подробные инструкции по настройке назначения eventhouse см. в разделе "Добавление хранилища событий в поток событий" .

Просмотр данных DeltaFlow, подготовленных для аналитики (предварительная версия)

Если вы включили анализ-подготовленные события и автоматически обновляемую схему (DeltaFlow), целевые таблицы автоматически создаются в структуре, которая отражает таблицы вашей исходной базы данных. Каждая таблица содержит исходные столбцы, а также столбцы метаданных для типа изменения и метки времени.

Примечание.

На следующем снимке экрана показана Azure SQL Database CDC. Выходные данные целевой таблицы DeltaFlow одинаковы для всех поддерживаемых соединителей источника CDC.

Снимок экрана: таблицы назначения Eventhouse, созданные DeltaFlow в форме, готовой к аналитике.

Эти таблицы можно запрашивать с помощью языка запросов Kusto (KQL) или других средств аналитики без необходимости анализировать необработанные полезные данные Debezium CDC.

Другие соединители: