Добавить источник CDC базы данных MySQL в поток событий

В этой статье показано, как добавить источник отслеживания измененных данных MySQL в поток событий.

Соединитель Change Data Capture (CDC) Azure MySQL Database позволяет создавать моментальный снимок текущих данных в базе данных Azure MySQL. Вы указываете таблицы для отслеживания и получения оповещений при последующих изменениях на уровне строк в таблицах. После записи изменений в потоке можно обработать эти данные CDC в режиме реального времени и отправить их в разные места назначения в Fabric для дальнейшей обработки или анализа.

В настоящее время база данных MySQL CDC поддерживается в следующих службах, где базы данных могут быть доступны общедоступно.

  • База данных Azure для MySQL
  • Amazon RDS для MySQL
  • Amazon Aurora MySQL
  • Google Cloud SQL для MySQL (GCP).

В этом руководстве в качестве примера используется Azure Database for MySQL CDC.

После добавления источника CDC базы данных MySQL в поток событий он записывает изменения на уровне строк в указанные таблицы. Затем эти изменения можно обрабатывать в режиме реального времени и отправлять в разные места назначения для дальнейшего анализа.

Предварительные условия

  • Доступ к рабочей области в режиме лицензии на емкость Fabric или в режиме пробной лицензии с разрешениями уровня "Contributor" или выше.
  • Доступ к экземпляру базы данных MySQL, например, к базе данных в Azure Database for MySQL — Гибкий сервер.
  • База данных MySQL должна быть общедоступной, а не за брандмауэром или защищенной в виртуальной сети. Если он находится в защищенной сети, подключитесь к нему с помощью инъекции виртуальной сети соединителя Eventstream.
  • Если у вас нет потока событий, создайте поток событий.

Настройка базы данных MySQL

Соединитель использует соединитель Debezium MySQL для записи изменений в базе данных MySQL. Необходимо определить пользователя MySQL с соответствующими привилегиями во всех базах данных, из которых соединитель обмена сообщениями может записывать изменения. Вы можете напрямую использовать пользователя администратора для подключения к базе данных, которая обычно имеет соответствующие привилегии, или выполнить следующие действия, чтобы создать нового пользователя:

Примечание.

Новая учетная запись пользователя или администратора и соответствующий пароль используются для подключения к базе данных позже внутри eventstream.

  1. В командной строке mysql создайте пользователя MySQL:

    mysql> CREATE USER 'user'@'%' IDENTIFIED BY 'password';
    
  2. Предоставьте пользователю необходимые привилегии:

    mysql> GRANT SELECT, SHOW DATABASES, REPLICATION REPLICA, REPLICATION CLIENT ON *.* TO 'user'@'%';
    

    Примечание.

    Если глобальная блокировка чтения недоступна, как это бывает в размещаемых вариантах, таких как Amazon Relational Database Service (RDS) или Aurora, тогда для создания согласованного моментального снимка используются блокировки на уровне таблицы. В этом случае необходимо предоставить пользователю разрешение LOCK TABLES. Кроме того, для поддержки FLUSH операций во время моментального снимка также может потребоваться предоставить RELOAD или FLUSH_TABLES привилегии.

  3. Завершите работу с разрешениями пользователя:

    mysql> FLUSH PRIVILEGES;
    

Чтобы убедиться, что у пользователя или администратора есть необходимые привилегии, выполните следующую команду, а затем необходимо отобразить необходимые привилегии на шаге 2.

SHOW GRANTS FOR user;

Дополнительные сведения о предоставлении необходимых разрешений пользователю см. в документации по Debezium connector for MySQL: Debezium Documentation.

Включение binlog

Необходимо включить двоичное ведение журнала для репликации MySQL. Двоичные журналы записывают обновления транзакций для средств репликации для распространения изменений. В этом разделе используется Azure Database for MySQL CDC в качестве примера для отображения шагов конфигурации.

  1. На странице портала Azure учетной записи Azure Database for MySQL выберите параметры Server в разделе Settings в области навигации слева.

  2. На странице параметров сервера настройте следующие свойства и нажмите кнопку "Сохранить".

    • Для binlog_row_image выберите полный.

    • Для binlog_expire_logs_seconds задайте количество секунд, ожидаемых службой до очистки файла двоичного журнала. Задайте значение, соответствующее потребностям вашей среды, например 86400.

    Снимок экрана: параметры binlog для репликации в параметрах сервера.

Добавление MySQL Database (DB) Change Data Capture (CDC) в качестве источника

Если вы еще не добавили источник в поток событий, выберите плитку "Подключить источники данных ". Вы также можете выбрать Добавить источник>Подключить источники данных на ленте.

Снимок экрана, показывающий выбор плитки для использования внешнего источника.

Если вы добавляете источник в уже опубликованный поток событий, переключитесь в режим редактирования . На ленте выберите Добавить источник>Подключить источники данных.

Снимок экрана: выборы для добавления внешних источников.

На странице "Выбор источника данных" найдите и выберите "Подключиться" на плитке Базы данных MySQL (CDC).

Снимок экрана: выбор базы данных MySQL (CDC) в качестве исходного типа в мастере получения событий.

Настройка и подключение к базе данных MySQL (CDC)

  1. На экране "Подключение" в разделе "Подключение" выберите "Создать подключение", чтобы создать облачное подключение.

    Снимок экрана: страница

  2. Введите следующие параметры подключения и учетные данные подключения для базы данных MySQL и нажмите кнопку "Подключить".

  3. Введите следующую информацию, чтобы настроить источник данных CDC базы данных MySQL, а затем нажмите кнопку Далее.

    • порт: значение по умолчанию — 3306. Если выбранное облачное подключение настроено в Управление подключениями и шлюзами, убедитесь, что номер порта соответствует тому, который указан там. Если они не соответствуют, номер порта в облачном подключении в Управление подключениями и шлюзами имеет приоритет.

    • таблица: выберите Все таблицы или введите имя(я) таблицы. Если выберете последний вариант, укажите таблицы, используя список полных идентификаторов таблиц, разделённых запятыми (databaseName.tableName), или допустимые регулярные выражения. Рассмотрим пример.

      • Используйте databaseName.test.*, чтобы выбрать все таблицы, имена которых начинаются с databaseName.test.
      • Используйте databaseName\.(test1|test2) для выбора databaseName.test1 и databaseName.test2.

      Вы можете смешивать оба формата с помощью запятых. Общее ограничение символов для всей записи составляет 102 400 символов.

    • Идентификатор сервера: введите уникальное значение для каждого сервера и клиента репликации в кластере MySQL. Значение по умолчанию ― 1000.

    Примечание.

    Задайте для каждого средства чтения разные идентификаторы сервера. Каждый клиент базы данных MySQL для чтения двоичных журналов должен иметь уникальный идентификатор, называемый идентификатором сервера. Сервер MySQL использует этот идентификатор для поддержания сетевого подключения и позиции binlog. Разные задачи, использующие один и тот же идентификатор сервера, могут привести к чтению с неправильной позиции бинлога. Поэтому рекомендуется задать для каждого средства чтения разные идентификаторы сервера.

  4. Вы можете развернуть Расширенные настройки, чтобы получить доступ к дополнительным параметрам конфигурации для источника CDC базы данных MySQL.

    • Режим блокировки моментальных снимков: Варианты:
      • Minimal (default): содержит глобальную блокировку чтения только на начальном этапе для записи схемы и метаданных. Остальная часть моментального снимка использует транзакцию REPEATABLE READ, позволяя обновлять данные во время чтения данных.
      • Extended: обеспечивает глобальную блокировку чтения на период создания снимка, блокируя все операции записи. Используйте для полной согласованности, если блокировка записи допустима.
      • None: пропускает установку блокировок таблицы во время создания моментального снимка. Безопасно, только если во время процесса изменения схемы не происходят.
    • Режим обработки десятичных знаков: указывает, как соединитель обрабатывает значения столбцов DECIMAL и NUMERIC.
      • Precise: представляет значения с использованием точных десятичных типов (например, Java BigDecimal), чтобы обеспечить полную точность и точность представления данных.
      • Double: преобразует значения в числа с плавающей запятой двойной точности. Этот параметр повышает удобство использования и производительность, но может привести к потере точности.
      • String: кодирует значения в виде форматированных строк. Этот параметр упрощает их использование в подчиненных системах, но теряет семантические сведения о исходном числовом типе.
    • Режим моментального снимка: укажите условия для выполнения моментального снимка при запуске соединителя:
      • Initial: соединитель запускает моментальный снимок только при отсутствии смещения для имени логического сервера или если он обнаруживает, что предыдущий моментальный снимок не выполнен. После завершения моментального снимка соединитель начинает потоковую передачу записей событий для последующих изменений базы данных.
      • InitialOnly: соединитель запускает моментальный снимок только в том случае, если для имени логического сервера ещё не записаны смещения. После завершения моментального снимка соединитель останавливается. Он не переходит на потоковую передачу для чтения событий изменения из binlog.
      • NoData: соединитель запускает снапшот, который сохраняет только схему, но не данные таблицы. Установите этот параметр, если вам не нужен согласованный моментальный снимок данных, но необходимо только изменения, происходящие с момента запуска соединителя.

Сведения о потоке или источнике

  1. На странице "Подключение" выполните одно из этих действий на основе того, используете ли вы поток событий или концентратор Real-Time.

    • Eventstream:

      В области сведений о источнике справа выполните следующие действия:

      1. Чтобы изменить имя источника, нажмите кнопку Карандаш.

      2. Обратите внимание, что имя потока событий и имя Stream доступны только для чтения.

    • центрReal-Time:

      В разделе сведений о потоке справа выполните следующие действия.

      1. Выберите рабочую область Fabric где нужно создать поток событий.

      2. Для имени eventstream нажмите кнопку «Карандаш» и введите название для eventstream.

      3. Значение имени потока автоматически создается шляхом добавления -stream к имени eventstream. Этот поток отображается на странице "Все потоки данных " концентратора в режиме реального времени, когда мастер завершит работу.

  2. Нажмите кнопку "Далее" в нижней части страницы "Настройка ".

Проверка и подключение

На экране "Обзор и подключение" просмотрите сводку и выберите "Добавить (Eventstream) или "Connect (Real-Time hub).

Просмотр обновленного потока событий

  1. Вы видите источник базы данных MySQL (CDC), который добавлен в ваш поток событий в Режиме редактирования.

    Скриншот добавленного источника CDC базы данных Azure MySQL в режиме редактирования с выделенной кнопкой

  2. Выберите Опубликовать, чтобы опубликовать изменения и начать передачу данных CDC базы данных MySQL в поток событий.

    Снимок экрана добавленного источника CDC базы данных MySQL Azure в режиме реального времени.

Другие соединители: