Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra come interagire con Azure Cosmos DB usando Synapse Apache Spark 3. I clienti possono usare Scala, Python, SparkSQL e C#, per scenari di analisi, ingegneria dei dati, data science ed esplorazione dei dati in Collegamento ad Azure Synapse per Azure Cosmos DB.
Le funzionalità seguenti sono supportate durante l'interazione con Azure Cosmos DB:
- Apache Spark 3 per Synapse consente di analizzare i dati nei contenitori Azure Cosmos DB abilitati con Collegamento ad Azure Synapse quasi in tempo reale senza influire sulle prestazioni dei carichi di lavoro transazionali. Per eseguire una query sull'archivio analitico di Azure Cosmos DB da Spark sono disponibili le due opzioni seguenti:
- Creazione di un frame di dati Spark
- Creazione di una tabella Spark
- Apache Spark per Synapse consente inoltre di inserire dati in Azure Cosmos DB. È importante notare che i dati vengono sempre inseriti in contenitori di Azure Cosmos DB tramite l'archivio transazionale. Quando il collegamento a Synapse è abilitato, eventuali nuovi inserimenti, aggiornamenti ed eliminazioni vengono sincronizzati automaticamente con l'archivio analitico.
- Synapse Apache Spark supporta anche lo streaming strutturato Spark con Azure Cosmos DB come origine e sink.
Le sezioni seguenti illustrano la sintassi. È anche possibile eseguire il checkout del modulo Learn su come eseguire query su Azure Cosmos DB con Apache Spark per Azure Synapse Analytics. I movimenti nell'area di lavoro di Azure Synapse Analytics sono progettati per offrire una semplice esperienza predefinita per iniziare. I movimenti sono visibili quando si fa clic con il pulsante destro del mouse su un contenitore Azure Cosmos DB nella scheda Dati dell'area di lavoro di Synapse. Con i movimenti è possibile generare rapidamente il codice e modificarlo in base alle esigenze. Sono inoltre perfetti per l'individuazione dei dati con un singolo clic.
Important
È necessario tenere presente alcuni vincoli nello schema analitico che potrebbero causare un comportamento imprevisto nelle operazioni di caricamento dei dati. Ad esempio, solo le prime 1.000 proprietà dello schema transazionale sono disponibili nello schema analitico, le proprietà con spazi non sono disponibili e così via. Se si verificano risultati imprevisti, controllare i vincoli dello schema dell'archivio analitico per altri dettagli.
Eseguire query sull'archivio analitico di Azure Cosmos DB
I clienti possono caricare i dati dell'archivio analitico in dataframe Spark o creare tabelle Spark.
La differenza nell'esperienza si verifica se le modifiche ai dati nel contenitore di Azure Cosmos DB devono essere riflesse automaticamente nell'analisi eseguita in Spark. Quando vengono registrati i dataframe Spark o viene creata una tabella Spark, Spark recupera i metadati dell'archivio analitico per un pushdown efficiente. È importante tenere presente che, poiché Spark segue un criterio di valutazione lazy, È necessario intervenire per recuperare l'ultimo snapshot dei dati nelle query Spark DataFrame o SparkSQL.
In caso di caricamento nel dataframe Spark, i metadati recuperati vengono memorizzati nella cache per tutta la durata della sessione Spark e quindi le azioni successive richiamate nel dataframe vengono valutate rispetto allo snapshot dell'archivio analitico al momento della creazione del dataframe.
D'altra parte, in aso di creazione di una tabella Spark, i metadati dello stato dell'archivio analitico non vengono memorizzati nella cache in Spark e vengono ricaricati a ogni esecuzione della query SparkSQL sulla tabella Spark.
Per concludere, è possibile scegliere tra il caricamento di uno snapshot nel dataframe Spark o l'esecuzione di query su una tabella Spark per lo snapshot più recente.
Note
Per eseguire una query sugli account Azure Cosmos DB for MongoDB, è possibile ottenere altre informazioni sulla rappresentazione dello schema con fedeltà completa nell'archivio analitico e sui nomi di proprietà estese da usare.
Note
Tutti gli oggetti options
fanno distinzione tra maiuscole e minuscole.
Authentication
Ora i clienti Spark 3.x possono effettuare l'autenticazione all'archivio analitico di Azure Cosmos DB utilizzando i token di accesso con identità fidate o le chiavi dell'account del database. I token sono più sicuri poiché hanno una vita breve e sono assegnati alle autorizzazioni necessarie tramite il RBAC di Cosmos DB.
Il connettore ora supporta due tipi di autenticazione, MasterKey
e AccessToken
, per la proprietà spark.cosmos.auth.type
.
Autenticazione della chiave master
Usare la chiave per leggere un dataframe usando spark:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.accountKey" -> "<key>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Autenticazione del token di accesso
La nuova autenticazione senza chiave introduce il supporto per i token di accesso:
val config = Map(
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.auth.type" -> "AccessToken",
"spark.cosmos.auth.accessToken" -> "<accessToken>",
"spark.cosmos.database" -> "<db>",
"spark.cosmos.container" -> "<container>"
)
val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)
Note
Il connettore Synapse Link Spark di Azure Cosmos DB non supporta l'identità gestita.
L'autenticazione del token di accesso richiede l'assegnazione di ruolo
Per usare l'approccio al token di accesso, è necessario generare token di accesso. Poiché i token di accesso sono associati alle identità di Azure, è necessario assegnare all'identità il controllo degli accessi in base al ruolo corretto. L'assegnazione di ruolo è a livello di piano dati ed è necessario disporre delle autorizzazioni minime del piano di controllo per eseguire l'assegnazione di ruolo.
Le assegnazioni di ruolo IAM (Identity Access Management) dal portale di Azure sono a livello di piano di controllo e non influiscono sulle assegnazioni di ruolo nel piano dati. Le assegnazioni di ruolo del piano dati sono disponibili solo tramite l'interfaccia della riga di comando di Azure. L'azione readAnalytics
è necessaria per leggere i dati dall'archivio analitico in Cosmos DB e non fa parte di alcun ruolo predefinito. Di conseguenza, è necessario creare una definizione di ruolo personalizzata. Oltre all'azione readAnalytics
, aggiungere anche le azioni necessarie per lettore dati. Creare un file JSON con il contenuto seguente e denominarlo role_definition.json
{
"RoleName": "CosmosAnalyticsRole",
"Type": "CustomRole",
"AssignableScopes": ["/"],
"Permissions": [{
"DataActions": [
"Microsoft.DocumentDB/databaseAccounts/readAnalytics",
"Microsoft.DocumentDB/databaseAccounts/readMetadata",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
"Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
]
}]
}
L'autenticazione del token di accesso richiede l'interfaccia della riga di comando di Azure
- Accedere all'interfaccia della riga di comando di Azure:
az login
- Impostare la sottoscrizione predefinita che ha il tuo account Cosmos DB:
az account set --subscription <name or id>
- Creare la definizione del ruolo nell'account Cosmos DB desiderato:
az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json
- Copiare l'oggetto
definition id
del ruolo restituito:/subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid>
- Ottenere l'ID dell'identità di sicurezza a cui si desidera assegnare il ruolo. L'identità può essere una registrazione dell'app di Azure, una macchina virtuale o qualsiasi altra risorsa di Azure supportata. Assegnare il ruolo all'entità di sicurezza usando:
az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"
Note
Quando si usa una registrazione dell'app di Azure, usare Object Id
come ID principale del servizio. Inoltre, l'ID principale e l'account Cosmos DB devono trovarsi nello stesso tenant.
Generazione del token di accesso - Synapse Notebook
Il metodo consigliato per Synapse Notebooks è utilizzare un'entità servizio con un certificato per generare token di accesso. Per altre informazioni, fare clic qui.
The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
"https://<cosmos-account-name>.documents.azure.com/.default",
"https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)
A questo punto è possibile usare il token di accesso generato in questo passaggio per leggere i dati dall'archivio analitico quando il tipo di autenticazione è impostato per il token di accesso.
Note
Quando si usa una registrazione dell'app di Azure, usare l'applicazione (ID del client).
Note
Attualmente Synapse non supporta la generazione di token di accesso usando il pacchetto azure-identity nei notebook. Inoltre, i VHD di Synapse non includono il pacchetto azure-identity e le relative dipendenze. Per altre informazioni, fare clic qui.
Creazione di un frame di dati Spark
In questo esempio viene creato un dataframe Spark che punta all'archivio analitico di Azure Cosmos DB. È quindi possibile eseguire più analisi richiamando azioni Spark sul dataframe. Questa operazione non ha alcun impatto sull'archivio transazionale.
La sintassi in Python è la seguente:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
df = spark.read.format("cosmos.olap")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.load()
La sintassi equivalente in Scala sarà la seguente:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
load()
Creazione di una tabella Spark
In questo esempio si crea una tabella Spark che punta all'archivio analitico di Azure Cosmos DB. È quindi possibile eseguire un'analisi aggiuntiva richiamando le query SparkSQL sulla tabella. Questa operazione non influisce sull'archivio transazionale o non comporta lo spostamento dei dati. Se si decide di eliminare la tabella Spark, il contenitore Azure Cosmos DB sottostante e l'archivio analitico corrispondente non saranno interessati.
Questo scenario è utile per riusare le tabelle Spark tramite strumenti di terze parti e fornire accessibilità ai dati sottostanti per il runtime.
La sintassi per creare una tabella Spark è la seguente:
%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options
create table call_center using cosmos.olap options (
spark.synapse.linkedService '<enter linked service name>',
spark.cosmos.container '<enter container name>'
)
Note
Se si hanno scenari in cui lo schema del contenitore Azure Cosmos DB sottostante cambia nel tempo e se si vuole che lo schema aggiornato rifletta automaticamente le query sulla tabella Spark, è possibile ottenere questo risultato impostando l'opzione spark.cosmos.autoSchemaMerge
su true
nelle opzioni della tabella Spark.
Scrivere un dataframe Spark sul contenitore Azure Cosmos DB
In questo esempio si scrive un dataframe Spark in un contenitore di Azure Cosmos DB. Questa operazione influisce sulle prestazioni dei carichi di lavoro transazionali e sull'utilizzo delle unità richiesta di cui è stato effettuato il provisioning nel contenitore di Azure Cosmos DB o nel database condiviso.
La sintassi in Python è la seguente:
# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
YOURDATAFRAME.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.mode('append')\
.save()
La sintassi equivalente in Scala sarà la seguente:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
import org.apache.spark.sql.SaveMode
df.write.format("cosmos.oltp").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
mode(SaveMode.Append).
save()
Caricamento di dataframe in streaming da un contenitore
In questo movimento si usa la funzionalità Spark Streaming per caricare i dati da un contenitore in un dataframe. I dati vengono archiviati nell'account del data lake primario (e nel file system) che hai connesso all'area di lavoro.
Note
Altre informazioni su come fare riferimento alle librerie esterne in Synapse Apache Spark, sono disponibili qui. Ad esempio, se si vuole inserire un dataframe Spark in un contenitore di Azure Cosmos DB per MongoDB, è possibile usare il connettore MongoDB per Spark qui.
Caricamento di dataframe in streaming da un contenitore Azure Cosmos DB
In questo esempio si usa lo streaming strutturato di Spark per caricare i dati da un contenitore di Azure Cosmos DB in un dataframe di streaming Spark, usando la funzionalità del feed di modifiche in Azure Cosmos DB. I dati del checkpoint usati da Spark verranno archiviati nell'account data lake primario (e nel file system) connesso all'area di lavoro.
La sintassi in Python è la seguente:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
dfStream = spark.readStream\
.format("cosmos.oltp.changeFeed")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("spark.cosmos.changeFeed.startFrom", "Beginning")\
.option("spark.cosmos.changeFeed.mode", "Incremental")\
.load()
La sintassi equivalente in Scala sarà la seguente:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val dfStream = spark.readStream.
format("cosmos.oltp.changeFeed").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("spark.cosmos.changeFeed.startFrom", "Beginning").
option("spark.cosmos.changeFeed.mode", "Incremental").
load()
Scrittura di dataframe in streaming su un contenitore Azure Cosmos DB
In questo esempio si scrive un dataframe di streaming in un contenitore di Azure Cosmos DB. Questa operazione influisce sulle prestazioni dei carichi di lavoro transazionali e sull'utilizzo delle unità richiesta di cui è stato effettuato il provisioning nel contenitore o nel database condiviso di Azure Cosmos DB. Se la cartella /localWriteCheckpointFolder non viene creata (nell'esempio seguente), viene creata automaticamente.
La sintassi in Python è la seguente:
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
streamQuery = dfStream\
.writeStream\
.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "<enter linked service name>")\
.option("spark.cosmos.container", "<enter container name>")\
.option("checkpointLocation", "/tmp/myRunId/")\
.outputMode("append")\
.start()
streamQuery.awaitTermination()
La sintassi equivalente in Scala sarà la seguente:
// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")
val query = dfStream.
writeStream.
format("cosmos.oltp").
outputMode("append").
option("spark.synapse.linkedService", "<enter linked service name>").
option("spark.cosmos.container", "<enter container name>").
option("checkpointLocation", "/tmp/myRunId/").
start()
query.awaitTermination()
Next steps
- Esempi per iniziare a usare Collegamento ad Azure Synapse in GitHub
- Altre informazioni sulle funzionalità supportate da Collegamento ad Azure Synapse per Azure Cosmos DB
- Connettersi a Collegamento a Synapse per Azure Cosmos DB
- Consulta il modulo Learn per l'esecuzione di query su Azure Cosmos DB con Apache Spark per Azure Synapse Analytics.