Включение синхронизации данных MongoDB Atlas в Azure Synapse Analytics в режиме реального времени
Аналитика в режиме реального времени помогает принимать быстрые решения и выполнять автоматизированные действия на основе текущей аналитики. Он также может помочь вам обеспечить расширенный интерфейс клиентов. Это решение описывает, как обеспечить синхронизацию пулов данных Azure Synapse Analytics с изменениями операционных данных в MongoDB.
Архитектура
На следующей схеме показано, как реализовать синхронизацию в режиме реального времени из Atlas в Azure Synapse Analytics. Этот простой поток гарантирует, что все изменения, происходящие в коллекции MongoDB Atlas, реплицируются в репозиторий Azure Data Lake Storage по умолчанию в рабочей области Azure Synapse Analytics. После того как данные будут доступны в Data Lake Storage, вы можете использовать конвейеры Azure Synapse Analytics для отправки данных в выделенные пулы SQL, пулы Spark или другие решения в зависимости от требований аналитики.
Скачайте файл PowerPoint этой архитектуры.
Поток данных
Изменения в хранилище операционных данных MongoDB Atlas (ODS) записываются и становятся доступными для Data Lake Storage в рабочей области Azure Synapse Analytics для вариантов использования аналитики в режиме реального времени, динамических отчетов и панелей мониторинга.
Изменения данных в оперативном или транзакционном хранилище данных MongoDB записываются триггерами Atlas.
Когда триггер базы данных Atlas наблюдает за событием, он передает тип изменения и документ, измененный (полный или разностный) функции Atlas.
Функция Atlas активирует функцию Azure, передав событие изменения и документ JSON.
Функции Azure использует клиентская библиотека Data Lake файлов служба хранилища Azure для записи измененного документа в настроенную службу Data Lake Storage в рабочей области Azure Synapse Analytics.
После того как данные будут доступны в Data Lake Storage, их можно отправлять в выделенные пулы SQL, пулы Spark и другие решения. Кроме того, можно преобразовать данные из JSON в формат Parquet или Delta с помощью потоков данных Azure Synapse Analytics или конвейеров копирования для выполнения дополнительных отчетов бизнес-аналитики или машинного обучения на текущих данных.
Компоненты
- Потоки изменений MongoDB Atlas позволяют уведомлять приложения об изменениях в коллекции, базе данных или кластере развертывания. Потоки изменений предоставляют приложениям доступ к изменениям в режиме реального времени и позволяют им немедленно реагировать на изменения. Эта функция имеет решающее значение в случаях использования, таких как отслеживание событий Интернета вещей и изменения финансовых данных, в которых необходимо немедленно вызывать оповещения и реагировать на действия. Триггеры Atlas используют потоки изменений для отслеживания коллекций изменений и автоматического вызова связанной функции Atlas в ответ на событие триггера.
- Триггеры Atlas реагируют на вставки, обновления и удаления документов в определенной коллекции и могут автоматически вызывать функцию Atlas в ответ на событие изменения.
- Функции Atlas — это бессерверные реализации кода JavaScript на стороне сервера, которые могут выполнять действия на основе событий, вызывающих триггер Atlas. Объединение триггеров Atlas с функциями Atlas упрощает реализацию архитектур на основе событий.
- Функции Azure — это бессерверная платформа вычислений, которая позволяет эффективно разрабатывать приложения с помощью выбранного языка программирования. Его также можно использовать для эффективного подключения к другим службам Azure. В этом сценарии функция Azure записывает событие изменения и использует его для записи большого двоичного объекта, содержащего измененные данные в Data Lake Storage, с помощью клиентской библиотеки служба хранилища Azure Files Data Lake.
- Data Lake Storage — это решение по умолчанию для хранения в Azure Synapse Analytics. Бессерверные пулы можно использовать для запроса данных напрямую.
- Конвейеры и потоки данных в Azure Synapse Analytics можно использовать для отправки большого двоичного объекта, содержащего измененные данные MongoDB на выделенные пулы SQL или пулы Spark для дальнейшего анализа. Конвейеры позволяют работать с измененными наборами данных в Data Lake Storage с помощью триггеров событий хранилища и запланированных триггеров для создания решений как в режиме реального времени, так и практически в режиме реального времени. Эта интеграция ускоряет потребление нижестоящих наборов данных.
Альтернативные варианты
Это решение использует триггеры Atlas для упаковки кода для прослушивания потоков изменений Atlas и активации Функции Azure в ответ на событие изменения. Поэтому гораздо проще реализовать, чем ранее предоставленное альтернативное решение. Для этого решения необходимо написать код для прослушивания потоков изменений в веб-приложении Службы приложений Azure .
Другой альтернативой является использование соединителя MongoDB Spark для чтения потоков данных MongoDB и записи их в таблицы Delta. Код выполняется непрерывно в записной книжке Spark, которая входит в конвейер в Azure Synapse Analytics. Дополнительные сведения о реализации этого решения см. в статье "Синхронизация" из Atlas в Azure Synapse Analytics с помощью потоковой передачи Spark.
Однако использование триггеров Atlas с Функции Azure обеспечивает полностью бессерверное решение. Так как это бессерверное решение обеспечивает надежную масштабируемость и оптимизацию затрат. Цены основаны на модели затрат по мере использования. Вы можете сэкономить больше денег с помощью функции Atlas для объединения нескольких событий изменений перед вызовом конечной точки Функции Azure. Эта стратегия может быть полезна в сценариях с большим трафиком.
Кроме того, Microsoft Fabric объединяет ресурсы данных и упрощает выполнение аналитики и искусственного интеллекта по данным, чтобы быстро получить аналитические сведения. Проектирование данных Azure Synapse Analytics, обработка и хранение данных и аналитика в режиме реального времени в Fabric теперь могут лучше использовать данные MongoDB, которые были отправлены в OneLake. Соединители конвейера потоков данных 2-го поколения можно использовать для загрузки данных Atlas непосредственно в OneLake. Этот механизм без кода предоставляет эффективный способ приема данных из Atlas в OneLake.
В Fabric можно напрямую ссылаться на данные, отправленные в Data Lake Storage с помощью сочетаний клавиш OneLake, без извлечения, преобразования, загрузки (ETL).
Вы можете отправить данные в Power BI, чтобы создавать отчеты и визуализации для отчетов бизнес-аналитики.
Подробности сценария
MongoDB Atlas, операционный уровень данных многих корпоративных приложений, хранит данные из внутренних приложений, служб, подключенных к клиентам, и сторонних API из нескольких каналов. Конвейеры данных в Azure Synapse Analytics можно использовать для объединения этих данных с реляционными данными из других традиционных приложений и с неструктурированными данными из таких источников, как журналы, хранилища объектов и потоки щелчков.
Предприятия используют возможности MongoDB, такие как агрегирование, аналитические узлы, atlas search, Vector Search, Atlas Data Lake, Atlas Data Interface, Atlas SQL Interface, Data Federation и Chart , чтобы обеспечить аналитику на основе приложений. Однако данные транзакций в MongoDB извлекаются, преобразуются и загружаются в выделенные пулы SQL Azure Synapse Analytics или пулы Spark для пакетной службы, искусственного интеллекта и машинного обучения и аналитики бизнес-аналитики хранилища данных.
Существует два сценария перемещения данных между Atlas и Azure Synapse Analytics: пакетная интеграция и синхронизация в режиме реального времени.
Пакетная интеграция
Вы можете использовать пакетную и микро пакетную интеграцию для перемещения данных из Atlas в Data Lake Storage в Azure Synapse Analytics. Вы можете одновременно получить все исторические данные или получить добавочные данные на основе критериев фильтра.
Локальные экземпляры MongoDB и MongoDB Atlas можно интегрировать как источник или ресурс приемника в Azure Synapse Analytics. Сведения о соединителях см. в статье "Копирование данных из MongoDB" или "Копирование данных из MongoDB" или в MongoDB Atlas.
Исходный соединитель упрощает запуск Azure Synapse Analytics на операционных данных, хранящихся в локальной среде MongoDB или Atlas. Вы можете получить данные из Atlas с помощью исходного соединителя и загрузить данные в Data Lake Storage в Parquet, Avro, JSON и текстовых форматах или в виде хранилища BLOB-объектов CSV. Затем эти файлы можно преобразовать или объединить с другими файлами из других источников данных в сценариях с несколькими базами данных, мультиоблачными или гибридными облачными сценариями. Этот вариант использования распространен в корпоративных сценариях хранилища данных (EDW) и аналитики в масштабе. Соединитель приемника можно также использовать для хранения результатов аналитики в Atlas. Дополнительные сведения об интеграции пакетной службы см. в статье "Анализ операционных данных в MongoDB Atlas" с помощью Azure Synapse Analytics.
Синхронизация в режиме реального времени
Архитектура, описанная в этой статье, поможет реализовать синхронизацию в режиме реального времени для поддержания текущего хранилища Azure Synapse Analytics с операционными данными MongoDB.
Это решение состоит из двух основных функций:
- Запись изменений в Atlas
- Активация функции Azure для распространения изменений в Azure Synapse Analytics
Запись изменений в Atlas
Изменения можно записать с помощью триггера Atlas, который можно настроить в пользовательском интерфейсе добавления триггера или с помощью API администрирования служб приложений Atlas. Триггеры прослушивают изменения базы данных, вызванные событиями базы данных, такими как вставки, обновления и удаления. Триггеры Atlas также активируют функцию Atlas при обнаружении события изменения. Для добавления функции можно использовать пользовательский интерфейс добавления триггера . Вы также можете создать функцию Atlas и связать ее в качестве конечной точки вызова триггера с помощью API администрирования Atlas.
На следующем снимку экрана показана форма, которую можно использовать для создания и редактирования триггера Atlas. В разделе "Сведения о источнике триггера " укажите коллекцию, в которую триггер следит за событиями изменений и событиями базы данных( вставка, обновление, удаление и/или замена).
Триггер может вызвать функцию Atlas в ответ на событие, для которое оно включено. На следующем снимке экрана показан простой код JavaScript, добавленный в качестве функции Atlas, для вызова в ответ на триггер базы данных. Функция Atlas вызывает функцию Azure, передавая метаданные события изменения вместе с документом, который был вставлен, обновлен, удален или заменен в зависимости от того, для чего включен триггер.
Код функции Atlas
Код функции Atlas активирует функцию Azure, связанную с конечной точкой функции Azure, передав весь changeEvent
текст запроса в функцию Azure.
Заполнитель необходимо заменить фактической <Azure function URL endpoint>
конечной точкой URL-адреса функции Azure.
exports = function(changeEvent) {
// Invoke Azure function that inserts the change stream into Data Lake Storage.
console.log(typeof fullDocument);
const response = context.http.post({
url: "<Azure function URL endpoint>",
body: changeEvent,
encodeBodyAsJSON: true
});
return response;
};
Активация функции Azure для распространения изменений в Azure Synapse Analytics
Функция Atlas закодирована для вызова функции Azure, которая записывает документ изменений в Data Lake Storage в Azure Synapse Analytics. Функция Azure использует клиентская библиотека Azure Data Lake Storage для пакета SDK для Python для создания экземпляра DataLakeServiceClient
класса, представляющего учетную запись хранения.
Функция Azure использует ключ хранилища для проверки подлинности. Также можно использовать реализации OAuth идентификатора Microsoft Entra. Другие storage_account_key
атрибуты, связанные с Dake Lake Storage, извлекаются из настроенных переменных среды ОС. После декодирования fullDocument
текста запроса (весь вставленный или обновленный документ) анализируется из текста запроса, а затем записывается в Data Lake Storage с помощью клиентских функций append_data
flush_data
Data Lake.
Для операции fullDocumentBeforeChange
удаления используется вместо fullDocument
.
fullDocument
не имеет никакого значения в операции удаления, поэтому код извлекает документ, который был удален, который был записан в fullDocumentBeforeChange
. Обратите внимание, что заполнено только в том случае, fullDocumentBeforeChange
если для параметра предварительного просмотра документа задано значение, как показано на предыдущем снимке экрана.
import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a new request.')
logging.info(req)
storage_account_name = os.environ["storage_account_name"]
storage_account_key = os.environ["storage_account_key"]
storage_container = os.environ["storage_container"]
storage_directory = os.environ["storage_directory"]
storage_file_name = os.environ["storage_file_name"]
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", storage_account_name), credential=storage_account_key)
json_data = req.get_body()
logging.info(json_data)
object_id = "test"
try:
json_string = json_data.decode("utf-8")
json_object = json.loads(json_string)
if json_object["operationType"] == "delete":
object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
else:
object_id = json_object["fullDocument"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
logging.info(object_id)
encoded_data = json.dumps(data)
except Exception as e:
logging.info("Exception occurred : "+ str(e))
file_system_client = service_client.get_file_system_client(file_system=storage_container)
directory_client = file_system_client.get_directory_client(storage_directory)
file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
file_client.flush_data(len(encoded_data))
return func.HttpResponse(f"This HTTP triggered function executed successfully.")
До сих пор вы узнали, как триггер Atlas фиксирует любые изменения, которые происходят и передает его в функцию Azure через функцию Atlas, и что функция Azure записывает документ изменений в виде нового файла в Data Lake Storage в рабочей области Azure Synapse Analytics.
После добавления файла в Data Lake Storage можно настроить триггер события хранилища для активации конвейера, который затем может записать документ изменений в выделенный пул SQL или в таблицу пула Spark. Конвейер может использовать действие копирования и преобразовать данные с помощью потока данных. Кроме того, если конечный целевой объект является выделенным пулом SQL, можно изменить функцию Azure для записи непосредственно в выделенный пул SQL в Azure Synapse Analytics. Для пула SQL получите строку подключения ODBC для подключения к пулу SQL. Сведения об использовании Python для запроса базы данных для примера кода Python, который можно использовать для запроса таблицы пула SQL с помощью строки подключения. Этот код можно изменить, чтобы использовать запрос Insert для записи в выделенный пул SQL. Существуют параметры конфигурации и роли, которым необходимо назначить функцию для записи в выделенный пул SQL. Сведения об этих параметрах и ролях находятся вне области этой статьи.
Если вы хотите, чтобы решение почти в режиме реального времени и не требуется синхронизация данных в режиме реального времени, использование запланированных запусков конвейера может быть хорошим вариантом. Вы можете настроить запланированные триггеры для активации конвейера с действием копирования или потоком данных с частотой почти в реальном времени, которую может позволить ваш бизнес, использовать соединитель MongoDB для получения данных из MongoDB, которые были вставлены, обновлены или удалены между последним запланированным запуском и текущим запуском. Конвейер использует соединитель MongoDB в качестве исходного соединителя, чтобы получить разностные данные из MongoDB Atlas и отправить его в Data Lake Storage или Azure Synapse Analytics выделенные пулы SQL, используя их в качестве подключений приемника. Это решение использует механизм извлечения (в отличие от основного решения, описанного в этой статье, который является механизмом принудительной передачи) из MongoDB Atlas, как изменения происходят в коллекции MongoDB Atlas, которую триггер Atlas прослушивает.
Потенциальные варианты использования
MongoDB и Azure Synapse Analytics EDW и аналитические службы могут служить многочисленными вариантами использования:
Розничная торговля
- Создание аналитики в области разработки продуктов и продвижения продуктов
- Реализация клиентом 360 и гипер-персонализации
- Прогнозирование истощения запасов и оптимизации заказов в цепочке поставок
- Реализация динамических цен на скидку и смарт-поиск в электронной коммерции
Банковское дело и финансы
- Настройка финансовых услуг клиентов
- Обнаружение и блокировка мошеннических транзакций
Телекоммуникации
- Оптимизация сетей следующего поколения
- Максимизация значения пограничных сетей
Автомобильная промышленность
- Оптимизация параметризации подключенных транспортных средств
- Обнаружение аномалий в обмене данными Интернета вещей в подключенных транспортных средствах
Производство
- Обеспечение прогнозного обслуживания для оборудования
- Оптимизация управления хранилищем и инвентаризацией
Рекомендации
Эти рекомендации реализуют основные принципы платформы Azure Well-Architected Framework, которая представляет собой набор руководящих принципов, которые можно использовать для улучшения качества рабочей нагрузки. Дополнительные сведения см. вWell-Architected Framework.
Безопасность
Безопасность обеспечивает гарантии от преднамеренного нападения и неправильного использования ценных данных и систем. Дополнительные сведения см. в контрольном списке проверки конструктора для безопасности.
Функции Azure — это бессерверная управляемая служба, поэтому ресурсы приложения и компоненты платформы защищены повышенной безопасностью. Однако рекомендуется использовать протокол HTTPS и последние версии TLS. Кроме того, рекомендуется проверить входные данные, чтобы убедиться, что это документ изменения MongoDB. Сведения о безопасности функций Azure см. в статье "Защита функций Azure ".
MongoDB Atlas — это управляемая база данных как услуга, поэтому MongoDB обеспечивает расширенную безопасность платформы. MongoDB предоставляет несколько механизмов для обеспечения безопасности 360-градусных данных для хранимых данных, включая доступ к базам данных, безопасность сети, шифрование неактивных данных и транзитив, а также суверенитет данных. См. технический документ по безопасности MongoDB Atlas и другие статьи, которые помогут обеспечить безопасность данных в MongoDB на протяжении всего жизненного цикла данных.
Оптимизация затрат
Оптимизация затрат фокусируется на способах сокращения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в контрольном списке проверки конструктора для оптимизации затрат.
Чтобы оценить стоимость продуктов и конфигураций Azure, используйте калькулятор цен Azure. Azure помогает избежать ненужных затрат, определяя правильное количество ресурсов для использования, анализ расходов с течением времени и масштабирование в соответствии с бизнес-потребностями без перерасхода ресурсов. Функции Azure нести расходы только при вызове. Однако в зависимости от объема изменений в MongoDB Atlas можно оценить с помощью механизма пакетной обработки в функции Atlas для хранения изменений в другой временной коллекции и активации функции Azure только в том случае, если пакет превышает определенное ограничение.
Сведения о кластерах Atlas см. в разделе "5 Способов сокращения затрат с помощью MongoDB Atlas и конфигурации кластера". Страница ценообразования MongoDB поможет вам понять варианты ценообразования для кластеров MongoDB Atlas и другие предложения платформы данных разработчика MongoDB Atlas. Федерация данных Atlas может быть развернута в Azure и поддерживает хранилище BLOB-объектов Azure (в предварительной версии). Если вы планируете использовать пакетную обработку для оптимизации затрат, рассмотрите возможность записи в хранилище BLOB-объектов вместо временной коллекции MongoDB.
Эффективность производительности
Эффективность производительности — это способность рабочей нагрузки эффективно масштабироваться в соответствии с требованиями пользователей. Дополнительные сведения см. в контрольном списке проверки конструктора для повышения эффективности.
Триггеры Atlas и Функции Azure проверяются на производительность и масштабируемость. Сведения о производительности и масштабируемости функций Azure см. в статье "Производительность и масштабируемость в устойчивых функциях Azure" (Функции Azure). Дополнительные сведения о повышении производительности экземпляров MongoDB Atlas см. в статье "Масштабирование по запросу ". Ознакомьтесь с руководством по рекомендациям по производительности MongoDB для настройки MongoDB Atlas.
Заключение
MongoDB Atlas легко интегрируется с Azure Synapse Analytics, что позволяет клиентам Atlas легко использовать Atlas в качестве источника или приемника для Azure Synapse Analytics. Это решение позволяет использовать операционные данные MongoDB в режиме реального времени из Azure Synapse Analytics для сложной аналитики и вывода искусственного интеллекта.
Развертывание этого сценария
синхронизацияReal-Time из MongoDB Atlas в Azure Synapse Analytics
Соавторы
Эта статья поддерживается корпорацией Майкрософт. Первоначально он был написан следующими участниками.
Основные авторы:
- Диана Энни Дженош | Старший архитектор решений — команда партнеров MongoDB
- Венкатеш Шанбаг | Старший архитектор решений — команда партнеров MongoDB
Другие участники:
- Сунил Сабат | Главный руководитель программы — команда ADF
- Wee Hyong Tok | Главный директор PM - команда ADF
Чтобы просмотреть недоступные профили LinkedIn, войдите в LinkedIn.