Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Соединитель Spark для хранилища данных Fabric позволяет разработчикам Spark и специалистам по обработке и анализу данных получать доступ к данным из хранилища и конечной точки аналитики SQL в лейкхаусе. Соединитель предлагает следующие возможности:
- Вы можете работать с данными из хранилища данных или SQL аналитики в одной рабочей области или в нескольких рабочих областях.
- Конечная точка аналитики SQL в Lakehouse автоматически обнаруживается в зависимости от контекста рабочей области.
- Соединитель имеет упрощенный API Spark, абстрагирует базовую сложность и работает только с одной строкой кода.
- При доступе к таблице или представлению соединитель поддерживает модели безопасности, определенные на уровне ядра SQL. К этим моделям относятся безопасность на уровне объектов (OLS), безопасность на уровне строк (RLS) и безопасность на уровне столбцов (CLS).
- Соединитель предварительно установлен в среде выполнения Fabric, что устраняет необходимость отдельной установки.
Примечание.
Соединитель в данный момент находится на этапе предварительного просмотра. Для получения дополнительной информации смотрите текущие соображения в этой статье.
Проверка подлинности
Проверка подлинности Microsoft Entra — это интегрированный подход к проверке подлинности. Пользователи входят в рабочую область Microsoft Fabric, и их учетные данные автоматически передаются в движок SQL для проверки подлинности и авторизации. Учетные данные автоматически сопоставляются, и от пользователей не требуется предоставление определенных параметров конфигурации.
Разрешения
Чтобы подключиться к подсистеме SQL, пользователям требуется по крайней мере разрешение на чтение (аналогично разрешение CONNECT в SQL Server) на конечной точке хранилища или аналитики SQL (уровень элемента). Пользователям также требуются подробные разрешения на уровень объекта для чтения данных из определенных таблиц или представлений. Дополнительные сведения см. в статье "Безопасность хранения данных" в Microsoft Fabric.
Шаблоны кода и примеры
Использование сигнатуры метода
Следующая команда показывает сигнатуру synapsesql
метода для запроса на чтение. Для доступа к таблицам или представлениям из хранилища и к конечной точке SQL-аналитики в лейкхаусе требуется аргумент, состоящий из трех частей tableName
. Обновите аргумент со следующими именами на основе вашего сценария:
- Часть 1. Имя склада или озера.
- Часть 2: Имя схемы.
- Часть 3. Имя таблицы или представления.
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame
Помимо чтения из таблицы или представления напрямую, этот соединитель также позволяет указать настраиваемый или сквозной запрос, который передается подсистеме SQL и результат возвращается обратно в Spark.
spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame
Хотя этот соединитель автоматически обнаруживает конечную точку для указанного хранилища или озера, если вы хотите явно указать ее, это можно сделать.
//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>")
Чтение данных в одной рабочей области
Внимание
Выполните следующие инструкции импорта в начале записной книжки или перед началом использования соединителя:
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
Следующий код — это пример чтения данных из таблицы или представления в DataFrame Spark.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
Следующий код является примером для чтения данных из таблицы или представления в кадре данных Spark с ограничением количества строк 10:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)
Следующий код служит примером считывания данных из таблицы или представления в Spark DataFrame после применения фильтра.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")
Следующий код является примером чтения данных из таблицы или представления в DataFrame Spark, включая только выбранные столбцы.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")
Чтение данных между рабочими областями
Для доступа к данным из хранилища или озера в рабочих областях можно указать идентификатор рабочей области, в котором существует хранилище или озеро, а затем идентификатор элемента lakehouse или хранилища. В следующей строке приведен пример чтения данных из таблицы или представления в кадре данных Spark из хранилища или озера с указанным идентификатором рабочей области и идентификатором lakehouse или хранилища:
# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
Примечание.
При запуске записной книжки соединитель по умолчанию ищет указанный склад или lakehouse в рабочей области lakehouse, подключенной к записной книжке. Чтобы ссылаться на склад или lakehouse из другой рабочей области, укажите идентификатор рабочей области и идентификатор элемента lakehouse или склада, как описано выше.
Создание таблицы Lakehouse на основе данных из хранилища
Эти строки кода предоставляют пример чтения данных из таблицы или представления в Spark DataFrame и использования этих данных для создания lakehouse таблицы.
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")
Запись данных кадра данных Spark в таблицу хранилища
Этот соединитель использует двухфазный процесс записи в таблицу DW Fabric. Изначально он выполняет этапы данных кадра данных Spark в промежуточное хранилище, а затем с помощью COPY INTO
команды для приема данных в таблицу DW Fabric. Такой подход обеспечивает масштабируемость с увеличением объема данных.
Поддерживаемые режимы сохранения DataFrame
При записи исходных данных кадра данных в целевую таблицу в хранилище поддерживаются следующие режимы сохранения:
- ErrorIfExists (режим сохранения по умолчанию): если целевая таблица существует, то запись прерывается немедленно с исключением, возвращаемым вызывающей стороне. В противном случае создается новая таблица с данными.
- Будет проигнорировано: если целевая таблица существует, запрос на запись будет проигнорирован без возврата ошибки. В противном случае создается новая таблица с данными.
- Перезапись: Если целевая таблица существует, то существующие данные в ней заменяются новыми данными. В противном случае создается новая таблица с данными.
- Добавление. Если целевая таблица существует, к ней добавляются новые данные. В противном случае создается новая таблица с данными.
В следующем коде показаны примеры записи данных кадра данных Spark в таблицу DW Fabric:
df.write.synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>") # this uses default mode - errorifexists
df.write.mode("errorifexists").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("ignore").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("append").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("overwrite").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
Примечание.
Соединитель поддерживает запись в таблицу DW Fabric, поскольку конечная точка аналитики SQL в системе Lakehouse доступна только для чтения.
Устранение неполадок
После завершения прочитанный ответ отображается в выходных данных ячейки. Сбой в текущей ячейке также отменяет последующие выполнения ячеек записной книжки. Подробные сведения об ошибке доступны в журналах приложений Spark.
Рекомендации по использованию этого соединителя
В настоящее время соединитель:
- Поддерживает извлечение или чтение данных из хранилищ Fabric и конечных точек аналитики SQL для элементов Lakehouse.
- Поддерживает запись данных в таблицу хранилища в разных режимах сохранения — это доступно только в последней среде выполнения GA версии, т. е. Среда выполнения 1.3. Кроме того, в настоящее время операция записи не работает, если
Private Link
включена иPublic Access
заблокирована. - Структура DW теперь поддерживает
Time Travel
, однако этот соединитель не работает для запроса с синтаксисом путешествия во времени. - Сохраняет характер использования, аналогичный тому, который поставляется с Apache Spark для Azure Synapse Analytics для консистентности. Однако не обеспечена обратная совместимость для подключения и работы с выделенным пулом SQL в Azure Synapse Analytics.
- Имена столбцов со специальными символами будут обрабатываться путем добавления экранирующего символа перед отправкой запроса, основанного на имени таблицы/представления из трех частей. В случае чтения на основе пользовательского или сквозного запроса пользователи должны экранировать имена столбцов, которые будут содержать специальные символы.