Condividi tramite


Abilitazione della sincronizzazione in tempo reale delle modifiche ai dati di MongoDB Atlas in Azure Synapse Analytics

Azure Synapse Analytics

L'analisi in tempo reale consente di prendere decisioni rapide ed eseguire azioni automatizzate in base alle informazioni dettagliate correnti. Può anche aiutare a offrire esperienze avanzate per i clienti. Questa soluzione descrive come mantenere sincronizzati i pool di dati di Azure Synapse Analytics con le modifiche ai dati operativi in MongoDB.

Architettura

Il diagramma seguente illustra come implementare la sincronizzazione in tempo reale da Atlas ad Azure Synapse Analytics. Questo flusso semplice garantisce che tutte le modifiche apportate nella raccolta Atlas di MongoDB vengano replicate nel repository predefinito di Azure Data Lake Storage nell'area di lavoro di Azure Synapse Analytics. Dopo che i dati si trovano in Data Lake Storage, è possibile usare le pipeline di Azure Synapse Analytics per eseguire il push dei dati in pool SQL dedicati, pool di Spark o altre soluzioni, a seconda dei requisiti di analisi.

Diagramma che mostra un'architettura per l'implementazione della sincronizzazione in tempo reale da MongoDB Atlas ad Azure Synapse Analytics.

Scaricare un file di PowerPoint di questa architettura.

Flusso di dati

Le modifiche in tempo reale nell'archivio dati operativo di MongoDB Atlas (ODS) vengono acquisite e rese disponibili a Data Lake Storage in un'area di lavoro di Azure Synapse Analytics per casi d'uso di analisi in tempo reale, report live e dashboard.

  1. Le modifiche ai dati nell'archivio dati operativo/transazionale di MongoDB Atlas vengono acquisite dai trigger Atlas.

  2. Quando un trigger di database Atlas osserva un evento, passa il tipo di modifica e il documento modificato (completo o delta) a una funzione Atlas.

  3. La funzione Atlas attiva una funzione di Azure, passando l'evento di modifica e un documento JSON.

  4. Funzioni di Azure usa la libreria client di Archiviazione di Azure Files Data Lake per scrivere il documento modificato nell'archivio Data Lake configurato nell'area di lavoro di Azure Synapse Analytics.

  5. Dopo che i dati si trovano in Data Lake Storage, possono essere inviati a pool SQL dedicati, pool Spark e altre soluzioni. In alternativa, è possibile convertire i dati da JSON a formati Parquet o Delta usando i flussi di dati di Azure Synapse Analytics o Le pipeline di copia per eseguire report bi aggiuntivi o intelligenza artificiale/Machine Learning sui dati correnti.

Componenti

  • I flussi di modifica atlas di MongoDB consentono di notificare alle applicazioni le modifiche apportate a una raccolta, a un database o a un cluster di distribuzione. I flussi di modifica offrono alle applicazioni l'accesso alle modifiche dei dati in tempo reale e consentono loro di reagire immediatamente alle modifiche. Questa funzionalità è fondamentale nei casi d'uso, ad esempio il rilevamento degli eventi IoT e le modifiche ai dati finanziari, in cui gli allarmi devono essere generati e azioni reattive devono essere eseguite immediatamente. I trigger atlas usano flussi di modifica per monitorare le raccolte per rilevare le modifiche e richiamare automaticamente la funzione Atlas associata in risposta all'evento trigger.
  • I trigger Atlas rispondono a inserimenti, aggiornamenti ed eliminazioni di documenti in una raccolta specifica e possono richiamare automaticamente una funzione Atlas in risposta all'evento di modifica.
  • Le funzioni Atlas sono implementazioni di codice JavaScript lato serverless che possono eseguire azioni in base agli eventi che richiamano un trigger Atlas. La combinazione di trigger Atlas con le funzioni Atlas semplifica l'implementazione di architetture guidate dagli eventi.
  • Funzioni di Azure è una piattaforma di calcolo serverless basata su eventi che è possibile usare per sviluppare applicazioni in modo efficiente con il linguaggio di programmazione preferito. È anche possibile usarlo per connettersi senza problemi ad altri servizi di Azure. In questo scenario, una funzione di Azure acquisisce un evento di modifica e la usa per scrivere un BLOB contenente i dati modificati in Data Lake Storage usando la libreria client di Archiviazione di Azure Files Data Lake.
  • Data Lake Storage è la soluzione di archiviazione predefinita in Azure Synapse Analytics. È possibile usare pool serverless per eseguire query sui dati direttamente.
  • Le pipeline e iflussi di dati in Azure Synapse Analytics possono essere usati per eseguire il push del BLOB che contiene i dati modificati di MongoDB in pool SQL dedicati o pool Spark per un'ulteriore analisi. Le pipeline consentono di agire sui set di dati modificati in Data Lake Storage usando sia trigger di eventi di archiviazione che trigger pianificati per creare soluzioni per casi d'uso in tempo reale e quasi in tempo reale. Questa integrazione accelera l'utilizzo downstream dei set di dati delle modifiche.

Diagramma che illustra come le pipeline di Azure Synapse Analytics possono eseguire il push dei dati nei pool.

Alternative

Questa soluzione usa i trigger Atlas per eseguire il wrapping del codice per l'ascolto dei flussi di modifiche di Atlas e l'attivazione di Funzioni di Azure in risposta all'evento di modifica. È quindi molto più semplice implementare la soluzione alternativa fornita in precedenza. Per questa soluzione, è necessario scrivere codice per ascoltare i flussi di modifica in un'app Web del servizio app di Azure.

Un'altra alternativa consiste nell'usare il connettore Spark MongoDB per leggere i dati di flusso di MongoDB e scriverli nelle tabelle Delta. Il codice viene eseguito continuamente in un notebook Spark che fa parte di una pipeline in Azure Synapse Analytics. Per altre informazioni sull'implementazione di questa soluzione, vedere Sincronizzare da Atlas ad Azure Synapse Analytics usando lo streaming Spark.

Tuttavia, l'uso di trigger Atlas con Funzioni di Azure offre una soluzione completamente serverless. Poiché è serverless, la soluzione offre scalabilità e ottimizzazione dei costi affidabili. I prezzi si basano su un modello di costo con pagamento in base al consumo. È possibile risparmiare più denaro usando la funzione Atlas per combinare alcuni eventi di modifica prima di richiamare l'endpoint Funzioni di Azure. Questa strategia può essere utile in scenari con traffico elevato.

Inoltre, Microsoft Fabric unifica il patrimonio di dati e semplifica l'esecuzione di analisi e intelligenza artificiale sui dati, in modo da ottenere rapidamente informazioni dettagliate. Azure Synapse Analytics data engineering, data science, data warehousing e analisi in tempo reale in Fabric possono ora usare meglio i dati MongoDB di cui viene eseguito il push in OneLake. È possibile usare entrambi i connettori dataflow Gen2 e pipeline di dati per Atlas per caricare i dati Atlas direttamente in OneLake. Questo meccanismo senza codice consente di inserire dati da Atlas a OneLake.

Diagramma che mostra come Microsoft Fabric esegue il push dei dati in OneLake.

In Fabric è possibile fare riferimento direttamente ai dati di cui viene eseguito il push in Data Lake Storage usando i collegamenti OneLake, senza estrarre, trasformare, caricare (ETL).

È possibile eseguire il push dei dati in Power BI per creare report e visualizzazioni per la creazione di report bi.

Dettagli dello scenario

MongoDB Atlas, il livello dati operativo di molte applicazioni aziendali, archivia i dati dalle applicazioni interne, dai servizi rivolti ai clienti e dalle API di terze parti da più canali. È possibile usare le pipeline di dati in Azure Synapse Analytics per combinare questi dati con dati relazionali di altre applicazioni tradizionali e con dati non strutturati da origini come log, archivi oggetti e clickstream.

Le aziende usano funzionalità mongoDB come aggregazioni, nodi analitici, Atlas Search, Ricerca vettoriale, Atlas Data Lake, Atlas SQL Interface, Data Federation e Grafici per abilitare l'intelligence basata sulle applicazioni. I dati transazionali in MongoDB, tuttavia, vengono estratti, trasformati e caricati in pool SQL dedicati di Azure Synapse Analytics o pool spark per batch, intelligenza artificiale/machine learning e analisi e intelligence bi del data warehouse.

Esistono due scenari per lo spostamento dei dati tra Atlas e Azure Synapse Analytics: integrazione batch e sincronizzazione in tempo reale.

Integrazione di batch

È possibile usare l'integrazione batch e micro-batch per spostare i dati da Atlas a Data Lake Storage in Azure Synapse Analytics. È possibile recuperare l'intero dato cronologico contemporaneamente o recuperare i dati incrementali in base ai criteri di filtro.

Le istanze locali di MongoDB e MongoDB Atlas possono essere integrate come risorsa di origine o sink in Azure Synapse Analytics. Per informazioni sui connettori, vedere Copiare dati da o in MongoDB o Copiare dati da o in MongoDB Atlas.

Il connettore di origine semplifica l'esecuzione di Azure Synapse Analytics sui dati operativi archiviati in MongoDB locale o in Atlas. È possibile recuperare i dati da Atlas usando il connettore di origine e caricare i dati in Data Lake Storage in formati Parquet, Avro, JSON e testo o come archiviazione BLOB CSV. Questi file possono quindi essere trasformati o uniti con altri file da altre origini dati in scenari cloud multi-database, multicloud o ibridi. Questo caso d'uso è comune negli scenari EDW (Enterprise Data Warehouse) e analisi su larga scala. È anche possibile usare il connettore sink per archiviare i risultati dell'analisi in Atlas. Per altre informazioni sull'integrazione batch, vedere Analizzare i dati operativi in MongoDB Atlas con Azure Synapse Analytics.

Sincronizzazione in tempo reale

L'architettura descritta in questo articolo consente di implementare la sincronizzazione in tempo reale per mantenere aggiornata l'archiviazione di Azure Synapse Analytics con i dati operativi di MongoDB.

Questa soluzione è costituita da due funzioni principali:

  • Acquisizione delle modifiche in Atlas
  • Attivazione della funzione di Azure per propagare le modifiche ad Azure Synapse Analytics

Acquisizione delle modifiche in Atlas

È possibile acquisire le modifiche usando un trigger Atlas, che è possibile configurare nell'interfaccia utente aggiungi trigger o usando l'API di amministrazione di Atlas App Services. I trigger sono in ascolto delle modifiche del database causate da eventi di database come inserimenti, aggiornamenti ed eliminazioni. I trigger Atlas attivano anche una funzione Atlas quando viene rilevato un evento di modifica. È possibile usare l'interfaccia utente aggiungi trigger per aggiungere la funzione. È anche possibile creare una funzione Atlas e associarla come endpoint di chiamata del trigger usando l'API Di amministrazione di Atlas.

Lo screenshot seguente mostra il modulo che è possibile usare per creare e modificare un trigger Atlas. Nella sezione Trigger Source Details (Dettagli origine trigger ) specificare la raccolta che il trigger controlla gli eventi di modifica e gli eventi del database che controlla (inserimento, aggiornamento, eliminazione e/o sostituzione).

Screenshot che mostra il modulo per la creazione di un trigger Atlas.

Il trigger può richiamare una funzione Atlas in risposta all'evento per cui è abilitato. Lo screenshot seguente mostra il semplice codice JavaScript, aggiunto come funzione Atlas, per richiamare in risposta al trigger del database. La funzione Atlas richiama una funzione di Azure, passando i metadati dell'evento di modifica insieme al documento per cui è stato inserito, aggiornato, eliminato o sostituito, a seconda del trigger abilitato.

Screenshot che mostra il codice JavaScript aggiunto al trigger.

Codice della funzione Atlas

Il codice della funzione Atlas attiva la funzione di Azure associata all'endpoint della funzione di Azure passando l'intero changeEvent nel corpo della richiesta alla funzione di Azure.

È necessario sostituire il segnaposto <Azure function URL endpoint> con l'endpoint effettivo dell'URL della funzione di Azure.

exports =  function(changeEvent) {

    // Invoke Azure function that inserts the change stream into Data Lake Storage.
    console.log(typeof fullDocument);
    const response =  context.http.post({
        url: "<Azure function URL endpoint>",
        body: changeEvent,
        encodeBodyAsJSON: true
    });
    return response;
};

Attivare la funzione di Azure per propagare le modifiche ad Azure Synapse Analytics

La funzione Atlas viene codificata per richiamare una funzione di Azure che scrive il documento di modifica in Data Lake Storage in Azure Synapse Analytics. La funzione di Azure usa la libreria client di Azure Data Lake Storage per Python SDK per creare un'istanza della classe che rappresenta l'account DataLakeServiceClient di archiviazione.

La funzione di Azure usa una chiave di archiviazione per l'autenticazione. È anche possibile usare le implementazioni OAuth di Microsoft Entra ID. Gli attributi storage_account_key e gli altri attributi correlati a Dake Lake Storage vengono recuperati dalle variabili di ambiente del sistema operativo configurate. Dopo la decodifica del corpo della richiesta, fullDocument (l'intero documento inserito o aggiornato) viene analizzato dal corpo della richiesta e quindi scritto in Data Lake Storage dalle funzioni client di Data Lake append_data e flush_data.

Per un'operazione di eliminazione, viene usato fullDocumentBeforeChange anziché fullDocument. fullDocument non ha alcun valore in un'operazione di eliminazione, quindi il codice recupera il documento eliminato, che viene acquisito in fullDocumentBeforeChange. Si noti che fullDocumentBeforeChange viene popolato solo quando l'impostazione Preimage documento è impostata su attivato, come illustrato nello screenshot precedente.

import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a new request.')
    logging.info(req)
    storage_account_name = os.environ["storage_account_name"]
    storage_account_key = os.environ["storage_account_key"]
    storage_container = os.environ["storage_container"]
    storage_directory = os.environ["storage_directory"]
    storage_file_name = os.environ["storage_file_name"]
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
            "https", storage_account_name), credential=storage_account_key)
    json_data = req.get_body()
    logging.info(json_data)
    object_id = "test"
    try:
        json_string = json_data.decode("utf-8")
        json_object = json.loads(json_string)

        if json_object["operationType"] == "delete":
            object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
        else:
            object_id = json_object["fullDocument"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}

        logging.info(object_id)
        encoded_data = json.dumps(data)
    except Exception as e:
        logging.info("Exception occurred : "+ str(e))

    file_system_client = service_client.get_file_system_client(file_system=storage_container)
    directory_client = file_system_client.get_directory_client(storage_directory)
    file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
    file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
    file_client.flush_data(len(encoded_data))
    return func.HttpResponse(f"This HTTP triggered function executed successfully.")

Finora si è visto come il trigger Atlas acquisisce qualsiasi modifica che si verifica e la passa a una funzione di Azure tramite una funzione Atlas e che la funzione di Azure scrive il documento di modifica come nuovo file in Data Lake Storage nell'area di lavoro di Azure Synapse Analytics.

Dopo l'aggiunta del file a Data Lake Storage, è possibile configurare un trigger di eventi di archiviazione per attivare una pipeline in grado di scrivere il documento di modifica in un pool SQL dedicato o in una tabella del pool Spark. La pipeline può usare l'attività Di copia e trasformare i dati usando un flusso di dati. In alternativa, se la destinazione finale è un pool SQL dedicato, è possibile modificare la funzione di Azure per scrivere direttamente nel pool SQL dedicato in Azure Synapse Analytics. Per un pool SQL, ottenere la stringa di connessione ODBC per la connessione al pool SQL. Vedere Usare Python per eseguire query su un database per un esempio di codice Python che è possibile usare per eseguire query sulla tabella del pool SQL usando la stringa di connessione. È possibile modificare questo codice per usare una query Di inserimento per scrivere in un pool SQL dedicato. Sono disponibili impostazioni di configurazione e ruoli che devono essere assegnati per consentire alla funzione di scrivere in un pool SQL dedicato. Le informazioni su queste impostazioni e ruoli non rientrano nell'ambito di questo articolo.

Se si vuole una soluzione near real-time e non è necessario sincronizzare i dati in tempo reale, l'uso delle esecuzioni pianificate della pipeline potrebbe essere un'opzione valida. È possibile configurare i trigger pianificati per attivare una pipeline con l'attività di copia o un flusso di dati, a una frequenza quasi in tempo reale che l'azienda può permettere, di usare il connettore MongoDB per recuperare i dati da MongoDB inseriti, aggiornati o eliminati tra l'ultima esecuzione pianificata e l'esecuzione corrente. La pipeline usa il connettore MongoDB come connettore di origine per recuperare i dati differenziali da MongoDB Atlas ed eseguirne il push in Data Lake Storage o nei pool SQL dedicati di Azure Synapse Analytics, usando questi come connessioni sink. Questa soluzione usa un meccanismo di pull (anziché la soluzione principale descritta in questo articolo, che è un meccanismo di push) di MongoDB Atlas man mano che si verificano modifiche nella raccolta Atlas di MongoDB a cui il trigger Atlas è in ascolto.

Potenziali casi d'uso

MongoDB e i servizi analitici ed EDW di Azure Synapse Analytics possono soddisfare numerosi casi d'uso:

Vendita al dettaglio

  • Creazione di intelligenza nella creazione di bundle di prodotti e promozione dei prodotti
  • Implementazione di clienti 360 e iper-personalizzazione
  • Stima dell'esaurimento delle scorte e ottimizzazione degli ordini della supply chain
  • Implementazione di prezzi di sconto dinamici e ricerca intelligente nell'e-commerce

Servizi bancari e finanziari

  • Personalizzazione dei servizi finanziari dei clienti
  • Rilevamento e blocco di transazioni fraudolente

Telecomunicazioni

  • Ottimizzazione delle reti di nuova generazione
  • Ottimizzazione del valore delle reti perimetrali

Automobili

  • Ottimizzazione della parametrizzazione dei veicoli connessi
  • Rilevamento di anomalie nelle comunicazioni IoT nei veicoli connessi

Produzione

  • Fornitura di manutenzione predittiva per macchinari
  • Ottimizzazione della gestione dell'archiviazione e dell'inventario

Considerazioni

Queste considerazioni implementano i pilastri di Azure Well-Architected Framework, che è un set di principi guida che possono essere usati per migliorare la qualità di un carico di lavoro. Per altre informazioni, vedere Well-Architected Framework.

Sicurezza

La sicurezza offre garanzie contro attacchi intenzionali e l'uso improprio dei dati e dei sistemi preziosi. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per la sicurezza.

Funzioni di Azure è un servizio gestito serverless, quindi le risorse dell'app e i componenti della piattaforma sono protetti da sicurezza avanzata. È tuttavia consigliabile usare il protocollo HTTPS e le versioni più recenti di TLS. È anche consigliabile convalidare l'input per assicurarsi che si tratti di un documento di modifica di MongoDB. Vedere Protezione di Funzioni di Azure per considerazioni sulla sicurezza per Funzioni di Azure.

MongoDB Atlas è un database gestito come servizio, quindi MongoDB offre una maggiore sicurezza della piattaforma. MongoDB offre più meccanismi per garantire la sicurezza a 360 gradi per i dati archiviati, tra cui l'accesso al database, la sicurezza di rete, la crittografia dei dati inattivi e in transito e la sovranità dei dati. Vedere MongoDB Atlas Security for the MongoDB Atlas security white paper (White paper sulla sicurezza di MongoDB) e altri articoli che consentono di assicurarsi che i dati in MongoDB siano protetti durante tutto il ciclo di vita dei dati.

Ottimizzazione dei costi

L'ottimizzazione dei costi è incentrata sui modi per ridurre le spese non necessarie e migliorare l'efficienza operativa. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per Ottimizzazione costi.

Per stimare il costo dei prodotti e delle configurazioni di Azure, usare il calcolatore prezzi di Azure . Azure consente di evitare costi superflui determinando il numero corretto di risorse da usare tramite l'analisi della spesa nel tempo e il dimensionamento in base alle esigenze aziendali senza spese in eccesso. Funzioni di Azure comporta costi solo quando vengono richiamati. Tuttavia, a seconda del volume di modifiche in MongoDB Atlas, è possibile valutare l'uso di un meccanismo di invio in batch nella funzione Atlas per archiviare le modifiche in un'altra raccolta temporanea e attivare la funzione di Azure solo se il batch supera un determinato limite.

Per informazioni sui cluster Atlas, vedere 5 modi per ridurre i costi con MongoDB Atlas e i costi di configurazione del cluster. La pagina dei prezzi di MongoDB consente di comprendere le opzioni di determinazione dei prezzi per i cluster Atlas mongoDB e altre offerte della piattaforma di dati per sviluppatori Di MongoDB Atlas. Atlas Data Federation può essere distribuito in Azure e supporta Archiviazione BLOB di Azure (in anteprima). Se si sta valutando l'uso dell'invio in batch per ottimizzare i costi, è consigliabile scrivere nell'archivio BLOB anziché in una raccolta temporanea di MongoDB.

Efficienza delle prestazioni

L'efficienza delle prestazioni si riferisce alla capacità del carico di lavoro di ridimensionarsi per soddisfare in modo efficiente le esigenze degli utenti. Per altre informazioni, vedere Elenco di controllo per la revisione della progettazione per l'efficienza delle prestazioni.

I trigger Atlas e le Funzioni di Azure vengono testati in tempo per ottenere prestazioni e scalabilità. Per informazioni sulle prestazioni e sulla scalabilità per Funzioni di Azure, vedere Prestazioni e scalabilità in Funzioni di Azure. Per alcune considerazioni su come migliorare le prestazioni delle istanze di MongoDB Atlas, vedere Scalabilità su richiesta . Vedere La Guida alle procedure consigliate per le prestazioni di MongoDB per le procedure consigliate per la configurazione di MongoDB Atlas.

Conclusione

MongoDB Atlas si integra perfettamente con Azure Synapse Analytics, consentendo ai clienti atlas di usare facilmente Atlas come origine o sink per Azure Synapse Analytics. Questa soluzione consente di usare i dati operativi di MongoDB in tempo reale da Azure Synapse Analytics per l'analisi complessa e l'inferenza dell'intelligenza artificiale.

Distribuire lo scenario

Real-Time sincronizzazione da MongoDB Atlas ad Azure Synapse Analytics

Collaboratori

Questo articolo viene gestito da Microsoft. Originariamente è stato scritto dai seguenti contributori.

Autori principali:

Altri contributori:

  • Sunil Sabat | Principal Program Manager - Team di Azure Data Factory
  • Wee Hyong Tok | Direttore principale del PM - Team di Azure Data Factory

Per visualizzare i profili LinkedIn non pubblici, accedere a LinkedIn.

Passaggi successivi