Ускорение анализа больших данных в реальном времени с помощью соединителя Spark

Применимо к: База данных SQL Azure Управляемый экземпляр SQL Azure

Примечание.

По состоянию на сентябрь 2020 г. этот соединитель уже не поддерживается на должном уровне. Эта статья сохраняется только для архивных целей.

Соединитель Spark позволяет базам данных в Базе данных SQL Azure, Управляемом экземпляре SQL Azure и SQL Server действовать в качестве источника входных данных или приемника выходных данных для заданий Spark. Это позволяет использовать данные о транзакциях в реальном времени при анализе больших данных и сохранять результаты для нерегламентированных запросов или отчетов. По сравнению со встроенным соединителем JDBC этот соединитель обеспечивает возможность массовой вставки данных в базу данных. Он может в 10–20 раз превосходить по производительности вставку по строкам. Соединитель Spark поддерживает проверку подлинности с помощью Microsoft Entra ID (ранее Azure Active Directory) для подключения к базе данных SQL Azure и управляемому экземпляру SQL Azure, что позволяет подключаться к базе данных из Azure Databricks с помощью учетной записи Microsoft Entra. Она предоставляет аналогичные интерфейсы со встроенным соединителем JDBC. Имеющиеся задания Spark можно легко перенести, чтобы использовать этот коннектор.

Примечание.

Microsoft Entra ID ранее назывался Azure Active Directory (Azure AD).

Скачивание и создание соединителя Spark

Репозиторий GitHub для старого соединителя, ранее связанного с этой страницей, уже не поддерживается на должном уровне. Настоятельно рекомендуется оценить новый соединитель и использовать его вместо прежнего.

Официальные поддерживаемые версии

Компонент Версия
Apache Spark 2.0.2 или более поздние версии
язык программирования Scala 2.10 или более поздние версии
Драйвер Microsoft JDBC для SQL Server 6.2 или более поздние версии
Microsoft SQL Server SQL Server 2008 или более поздняя версия
База данных SQL Azure Поддерживается
Управляемый экземпляр SQL Azure Поддерживается

Соединитель Spark использует Microsoft JDBC Driver для SQL Server, чтобы перемещать данные между рабочими узлами Spark и базами данных.

Поток данных выглядит следующим образом:

  1. Главный узел Spark подключается к базам данных в Базе данных SQL или SQL Server и загружает данные из определенной таблицы или использует специальный SQL-запрос.
  2. Главный узел Spark распределяет данные в рабочие узлы для преобразования.
  3. Рабочий узел подключается к базам данных, которые подключаются к Базе данных SQL и SQL Server и записывает данные в базу данных. Пользователь может использовать вставку по строкам или массовую вставку.

На схеме ниже показан поток данных.

На схеме показан описанный поток с главным узлом, который подключается непосредственно к базе данных и подключается к трем рабочим узлам, которые подключаются к базе данных.

Создание соединителя Spark

Сейчас в проекте соединителя используется maven. Чтобы создать соединитель без зависимостей, выполните следующую команду:

  • mvn clean package;
  • скачать последние версии JAR-файла из папки release;
  • включить JAR-файл Базы данных SQL Spark.

Подключение и чтение данных с помощью соединителя Spark

Из задания Spark можно подключаться к базам данных в SQL Database и SQL Server для чтения или записи данных. Кроме того, можно выполнить запрос DML или DDL в базах данных в Базе данных SQL и SQL Server.

Чтение данных из Azure SQL и SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Чтение данных из Azure SQL и SQL Server с помощью указанного SQL-запроса

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

Запись данных в Azure SQL или SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Acquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

Выполнение запроса DML или DDL в Azure SQL и SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)

Подключение из Spark с помощью проверки подлинности Microsoft Entra

Вы можете подключиться к базе данных SQL и управляемому экземпляру SQL с помощью проверки подлинности Microsoft Entra. Используйте проверку подлинности Microsoft Entra для централизованного управления удостоверениями пользователей базы данных и в качестве альтернативы проверке подлинности SQL.

Подключение с помощью режима проверки подлинности ActiveDirectoryPassword

Требования к настройке

Если вы используете режим проверки подлинности ActiveDirectoryPassword, скачайте microsoft-authentication-library-for-java и его зависимости и включите их в путь сборки Java.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Подключение с помощью токена доступа

Требования к настройке

Если вы используете режим проверки подлинности на основе маркеров доступа, необходимо скачать microsoft-authentication-library-for-java и его зависимости и включить их в путь сборки Java.

Ознакомьтесь с проверкой подлинности Microsoft Entra для SQL Azure , чтобы узнать, как получить маркер доступа к базе данных в Базе данных SQL Azure или Управляемом экземпляре SQL Azure.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Запись данных с помощью массовой вставки

Традиционный соединитель JDBC записывает данные в базу данных путем построчной вставки. Соединитель Spark можно использовать для массовой вставки данных в Azure SQL и SQL Server. Это значительно повышает производительность записи при загрузке больших наборов данных или загрузке данных в таблицы, где используется индекс columnstore.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.