Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
ПРИМЕНИМО К:
Azure Data Factory
Azure Synapse Analytics
Tip
Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.
В этой статье описывается использование Copy activity в конвейерах Azure Data Factory и Azure Synapse для копирования данных из и в Snowflake, а также использование Data Flow для преобразования данных в Snowflake. Дополнительные сведения см. в вводной статье Data Factory или Azure Synapse Analytics.
Important
Соединитель Snowflake версии 1 находится на этапе удаления. Рекомендуется обновить соединитель Snowflake с версии 1 до версии 2.
Поддерживаемые возможности
Этот соединитель Snowflake поддерживается для возможностей, указанных ниже:
| Поддерживаемые возможности | IR |
|---|---|
| Копирование данных (источник/приемник) | (1) (2) |
| Сопоставление потока данных (источник или приемник) | ① |
| Операция поиска | (1) (2) |
| Действие скрипта (применение версии 1.1 при использовании параметра скрипта) | (1) (2) |
(1) Azure среды выполнения интеграции (2) локальная среда выполнения интеграции
Для Copy activity этот соединитель Snowflake поддерживает следующие функции:
- Скопируйте данные из Snowflake, используя команду COPY into [location], чтобы добиться оптимальной производительности.
- Скопируйте данные в Snowflake, который использует команду COPY INTO [table] для максимальной производительности. Он поддерживает Snowflake на Azure.
- Если прокси-сервер необходим для подключения к Snowflake из локального Integration Runtime, необходимо настроить переменные среды для HTTP_PROXY и HTTPS_PROXY на узле Integration Runtime.
Prerequisites
Если хранилище данных находится в локальной сети, виртуальной сети Azure или Amazon Virtual Private Cloud, необходимо настроить самостоятельно размещаемую среду выполнения интеграции для подключения к нему. Обязательно добавьте IP-адреса, которые использует локальная среда выполнения интеграции, в список разрешенных.
Если хранилище данных является управляемой облачной службой данных, можно использовать Azure Integration Runtime. Если доступ ограничен ip-адресами, утвержденными в правилах брандмауэра, можно добавить в список разрешенных ip-адресов Azure Integration Runtime/c0.
Учетная запись Snowflake, используемая для источника или приемника, должна иметь необходимый USAGE доступ к базе данных и доступ на чтение и запись в схеме и таблицах и представлениях под ним. Кроме того, на схеме должно быть CREATE STAGE, чтобы создавать внешнюю стадию с URI SAS.
Необходимо задать приведенные ниже значения свойств учетной записи.
| Property | Description | Required | Default |
|---|---|---|---|
| REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Указывает, требуется ли использование объекта интеграции хранилища в качестве облачных учетных данных при создании именованного внешнего этапа (с помощью CREATE STAGE) для доступа к пункту хранения в частном облаке. | FALSE | FALSE |
| REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Указывает, требуется ли использовать именованный внешний этап, который ссылается на объект интеграции хранилища в качестве облачных учетных данных при загрузке данных из частного облачного хранилища или выгрузке данных в него. | FALSE | FALSE |
Дополнительные сведения о механизмах и вариантах безопасности сети, поддерживаемых фабрикой данных, см. в стратегиях доступа к данным.
Get started
Для выполнения действия копирования с конвейером можно использовать один из следующих средств или пакетов SDK:
- Средство копирования данных
- портал Azure
- SDK .NET
- пакет SDK Python
- Azure PowerShell
- REST API
- шаблон Azure Resource Manager
Создание связанной службы со Snowflake с помощью пользовательского интерфейса
Выполните следующие действия, чтобы создать связанную службу с Snowflake в пользовательском интерфейсе портала Azure.
Перейдите на вкладку "Управление" в рабочей области Azure Data Factory или Synapse и выберите "Связанные службы", а затем нажмите кнопку "Создать".
Выполните поиск Snowflake и выберите соединитель Snowflake.
Настройте сведения о службе, проверьте подключение и создайте связанную службу.
Сведения о конфигурации соединителя
Следующие разделы содержат сведения о свойствах, используемых для определения сущностей, относящихся к соединителю Snowflake.
Свойства связанной службы
Эти универсальные свойства поддерживаются для связанной службы Snowflake:
| Property | Description | Required |
|---|---|---|
| type | Свойство type должно иметь значение SnowflakeV2. | Yes |
| version | Версия, которую вы указали. Рекомендуется обновить до последней версии, чтобы воспользоваться новыми улучшениями. | Да для версии 1.1 |
| accountIdentifier | Имя учетной записи и название организации. Например, myorg-account123. | Yes |
| database | База данных по умолчанию, используемая для сеанса после подключения. | Yes |
| warehouse | Виртуальное хранилище по умолчанию, используемое для сеанса после подключения. | Yes |
| authenticationType | Тип проверки подлинности, используемый для подключения к службе Snowflake. Допустимые значения: "Базовый " (по умолчанию) и KeyPair. Обратитесь к соответствующим разделам ниже, в которых описываются дополнительные свойства и примеры. | No |
| role | Роль безопасности по умолчанию, используемая для сеанса после подключения. | No |
| host | Имя хоста учетной записи Snowflake. Например: contoso.snowflakecomputing.com.
.cn также поддерживается. |
No |
| connectVia | Среда выполнения интеграции , используемая для подключения к хранилищу данных. Можно использовать среду выполнения интеграции Azure или локальную среду выполнения интеграции (если хранилище данных находится в частной сети). Если он не указан, он использует среду выполнения интеграции по умолчанию Azure. | No |
В зависимости от вашего случая можно задать следующие дополнительные свойства подключения в связанной службе.
| Property | Description | Required | Значение по умолчанию |
|---|---|---|---|
| UseUtcTimestamps | Укажите false, чтобы возвращать тип TIMESTAMP_LTZ и тип TIMESTAMP_TZ в корректной временной зоне, а также тип TIMESTAMP_NTZ без информации о временной зоне. Установите значение true, чтобы возвращать все типы временных меток Snowflake в формате UTC. |
No | false |
| schema | Указывает схему сеанса запроса после подключения. | No | / |
Этот соединитель Snowflake поддерживает следующие типы проверки подлинности. Дополнительные сведения см. в соответствующих разделах.
Basic authentication
Чтобы использовать обычную проверку подлинности, в дополнение к универсальным свойствам, описанным в предыдущем разделе, укажите следующие свойства:
| Property | Description | Required |
|---|---|---|
| user | Имя входа для пользователя Snowflake. | Yes |
| password | Пароль для пользователя Snowflake. Пометьте это поле как тип SecureString для безопасного хранения. Вы также можете сослаться на секрет, хранящийся в Azure Key Vault. | Yes |
Example:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Password в Azure Key Vault:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Проверка подлинности пары ключей
Чтобы использовать аутентификацию с помощью пары ключей, необходимо настроить и создать пользователя с аутентификацией по паре ключей в Snowflake, ссылаясь на Аутентификация по паре ключей и ротация ключей. Затем запишите закрытый ключ и парольную фразу (необязательно), которая используется для определения связанной службы.
В дополнение к общим свойствам, описанных в предыдущих разделах, укажите следующие свойства:
| Property | Description | Required |
|---|---|---|
| user | Имя входа для пользователя Snowflake. | Yes |
| privateKey | Закрытый ключ, используемый для проверки подлинности пары ключей. Чтобы обеспечить допустимость закрытого ключа при отправке в Azure Data Factory, и учитывая, что файл privateKey содержит новые символы (\n), важно правильно отформатировать содержимое privateKey в строковой литеральной форме. Этот процесс включает добавление \n явным образом в каждую новую линию. |
Yes |
| privateKeyPassphrase | Парольная фраза, используемая для расшифровки закрытого ключа, если она зашифрована. | No |
Example:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Note
Для сопоставления потоков данных рекомендуется создать новый закрытый ключ RSA с помощью стандарта PKCS#8 в формате PEM (P8-файл).
Свойства набора данных
Полный список разделов и свойств, доступных для определения наборов данных, см. в статье " Наборы данных ".
Для набора данных Snowflake поддерживаются свойства, указанные ниже.
| Property | Description | Required |
|---|---|---|
| type | Свойство типа набора данных должно иметь значение SnowflakeV2Table. | Yes |
| schema | Имя схемы. Обратите внимание, что имя схемы чувствительно к регистру. | "Нет" для источника, "Да" для приемника |
| table | Имя таблицы или представления. Обратите внимание, что имя таблицы чувствительно к регистру. | "Нет" для источника, "Да" для приемника |
Example:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Свойства Copy activity
Полный список разделов и свойств, доступных для определения действий, см. в статье "Конвейеры ". В этом разделе приведен список свойств, поддерживаемых источником и приемником Snowflake.
Snowflake в качестве источника
Соединитель Snowflake использует команду Snowflake COPY INTO [расположение], чтобы обеспечить лучшую производительность.
Если хранилище-назначение и его формат изначально поддерживаются командой Snowflake COPY, вы можете использовать действие "Копирование" для непосредственного копирования из Snowflake в хранилище-назначение. Для получения подробностей см. раздел "Прямая копия из Snowflake". В противном случае используйте встроенную функцию «Staged copy from Snowflake».
Чтобы скопировать данные из Snowflake, в разделе Copy activity source поддерживаются следующие свойства.
| Property | Description | Required |
|---|---|---|
| type | Свойство type источника Copy activity должно иметь значение SnowflakeV2Source. | Yes |
| query | Определяет SQL-запрос для чтения данных из Snowflake. Если имена схемы, таблицы и столбцов содержат строчные буквы, заключайте идентификатор объекта в кавычки, например select * from "schema"."myTable".Выполнение хранимой процедуры не поддерживается. |
No |
| exportSettings | Дополнительные параметры, используемые для получения данных из Snowflake. Можно настроить параметры, поддерживаемые командой COPY, в команде, которую будет передавать служба при вызове оператора. | Yes |
| treatDecimalAsString | Укажите, чтобы обрабатывать десятичный тип как строковый в деятельности поиска и скрипта. Значение по умолчанию — false.Это свойство поддерживается только в версии 1.1. |
No |
В разделе exportSettings: |
||
| type | Тип команды экспорта, для которой задано значение SnowflakeExportCopyCommand. | Yes |
| storageIntegration | Укажите имя интеграции хранилища, созданной в Snowflake. Инструкции по использованию интеграции хранилища см. в разделе "Настройка интеграции хранилища Snowflake". | No |
| additionalCopyOptions | Дополнительные параметры копирования, предоставляемые в виде словаря пар "ключ-значение". Примеры: MAX_FILE_SIZE, OVERWRITE. Дополнительные сведения см. в разделе "Параметры копирования Snowflake". | No |
| additionalFormatOptions | Дополнительные параметры формата файла, предоставляемые для команды COPY в виде словаря пар "ключ-значение". Примеры: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, NULL_IF. Дополнительные сведения см. в разделе "Параметры типа формата Snowflake". При использовании NULL_IF значение NULL в Snowflake преобразуется в указанное значение (которое должно быть заключено в одинарные кавычки) при записи в делимитированный текстовый файл в промежуточном хранилище. Указанное значение рассматривается как NULL при чтении из промежуточного файла в приемное хранилище. Значение по умолчанию — 'NULL'. |
No |
Note
Убедитесь, что у вас есть разрешение на выполнение следующей команды и доступ к схеме INFORMATION_SCHEMA и столбцам таблицы.
COPY INTO <location>
Прямое копирование из Snowflake
Если хранилище данных приемника и формат соответствуют критериям, описанным в этом разделе, можно использовать действие копирования для прямого копирования из Snowflake в приемник. Служба проверяет параметры и завершает работу Copy activity, если следующие критерии не выполнены:
При указании
storageIntegrationв источнике:Хранилище данных приемника — это Azure Blob Storage, которое вы указали на внешнем уровне в Snowflake. Перед копированием данных необходимо выполнить следующие действия:
Создайте связанную службу Azure Blob Storage для приемника Azure Blob Storage с любыми поддерживаемыми типами проверки подлинности.
Предоставьте как минимум роль Участник данных BLOB-хранилища служебному принципалу Snowflake в пункте назначения Azure Blob Storage Управление доступом (IAM).
Если вы не указываете
storageIntegrationв источнике:Служба sink, связанная с хранилищем Azure BLOB, с аутентификацией по подписанному общему доступу. Если вы хотите напрямую скопировать данные в Azure Data Lake Storage Gen2 в следующих поддерживаемых форматах, можно создать связанную службу Azure Blob Storage с аутентификацией SAS для учетной записи Azure Data Lake Storage Gen2, чтобы избежать использования промежуточного копирования из Snowflake.
Формат данных приемника — Parquet, разделенный текст или JSON со следующими конфигурациями:
- Для формата Parquet кодек сжатия — None, Snappy или Lzo.
- Для формата текста с разделителями :
-
rowDelimiter— это \r\n, или любой одиночный символ. -
compressionможет быть без сжатия, gzip, bzip2 или deflate. -
encodingNameоставлено по умолчанию или установлено значение utf-8. -
quoteChar— двойнаякавычка, одна кавычка или пустая строка (без символа кавычки).
-
- В формате JSON прямая копия поддерживает только тот случай, когда исходная таблица Snowflake или результат запроса содержит только один столбец, а тип данных этого столбца — VARIANT, OBJECT или ARRAY.
-
compressionможет быть без сжатия, gzip, bzip2 или deflate. -
encodingNameоставлено по умолчанию или установлено значение utf-8. -
filePatternВ приемнике действий копирования используется значение по умолчанию либо задано значение setOfObjects.
-
В источнике действия копирования
additionalColumnsне указан.Сопоставление столбцов не указано.
Example:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Промежуточное копирование из Snowflake
Если хранилище данных приемника или формат не совместимы с командой Snowflake COPY, как упоминалось в последнем разделе, включите встроенное поэтапное копирование с помощью интерфейса промежуточного Azure Blob Storage. Функция поэтапного копирования также обеспечивает более высокую пропускную способность. Служба экспортирует данные из Snowflake в промежуточное хранилище, затем копирует данные в приемник и, наконец, очищает промежуточное хранилище от временных данных. Сведения о копировании данных с помощью промежуточного копирования см. в разделе "Поэтапное копирование ".
Чтобы использовать эту функцию, создайте связанную службу Azure Blob Storage, которая ссылается на учетную запись хранилища Azure в качестве промежуточного хранилища. Затем укажите свойства enableStaging и stagingSettings в Copy activity.
При указании
storageIntegrationв источнике временное хранилище Azure Blob Storage должно быть тем же, что и вы указали во внешнем этапе в Snowflake. Убедитесь, что вы создаете связанную службу Azure Blob Storage с любой поддерживаемой проверкой подлинности при использовании среды выполнения интеграции Azure или с анонимным ключом учетной записи, подписанным url-адресом или проверкой подлинности субъекта-службы при использовании локальной среды выполнения интеграции. Кроме того, предоставьте субъекту службы Snowflake роль "Данные хранилища BLOB - участник" в промежуточном Azure Blob Storage в разделе "Управление доступом (IAM)".Если в источнике не указано
storageIntegration, служба, связанная с промежуточным хранилищем Azure Blob, должна использовать аутентификацию с использованием общего ключа доступа, как требуется командой Snowflake COPY. Убедитесь, что вы предоставляете Snowflake надлежащее разрешение на доступ к промежуточному хранилищу Azure Blob. Дополнительные сведения об этом см. в этой статье.
Example:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
При выполнении поэтапной копии из Snowflake важно задать поведение копирования приемника для слияния файлов. Этот параметр гарантирует правильность обработки и объединения всех секционированных файлов, предотвращая проблему, из-за которой копируется только последний секционированный файл.
Пример конфигурации
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Note
Если не задать поведение копирования приемника как слияние файлов, это может привести к копированию только последнего секционированного файла.
Snowflake в качестве приемника
Соединитель Snowflake использует команду Snowflake COPY into [table], чтобы добиться оптимальной производительности. Он поддерживает запись данных в Snowflake на Azure.
Если исходное хранилище данных и формат нативно поддерживаются командой Snowflake COPY, можно использовать операцию копирования для непосредственного копирования из источника в Snowflake. Дополнительные сведения см. в разделе "Прямая копия в Snowflake". В противном случае используйте встроенную функцию Staged Copy to Snowflake.
Чтобы скопировать данные в Snowflake, в разделе Copy activity sink поддерживаются следующие свойства.
| Property | Description | Required |
|---|---|---|
| type | Для свойства type приемника Copy activity задано значение SnowflakeV2Sink. | Yes |
| preCopyScript | Укажите SQL-запрос для запуска Copy activity перед записью данных в Snowflake в каждом запуске. Это свойство используется для очистки предварительно загруженных данных. | No |
| importSettings | Дополнительные параметры, используемые для записи данных в Snowflake. Можно настроить параметры, поддерживаемые командой COPY, в команде, которую будет передавать служба при вызове оператора. | Yes |
В разделе importSettings: |
||
| type | Тип команды импорта, для которой задано значение SnowflakeImportCopyCommand. | Yes |
| storageIntegration | Укажите имя интеграции хранилища, созданной в Snowflake. Инструкции по использованию интеграции хранилища см. в разделе "Настройка интеграции хранилища Snowflake". | No |
| additionalCopyOptions | Дополнительные параметры копирования, предоставляемые в виде словаря пар "ключ-значение". Примеры: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Дополнительные сведения см. в разделе "Параметры копирования Snowflake". | No |
| additionalFormatOptions | Дополнительные параметры формата файла, передаваемые команде COPY, указаны в виде словаря пар "ключ-значение". Примеры: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Дополнительные сведения см. в разделе "Параметры типа формата Snowflake". | No |
Note
Убедитесь, что у вас есть разрешение на выполнение следующей команды и доступ к схеме INFORMATION_SCHEMA и столбцам таблицы.
SELECT CURRENT_REGION()COPY INTO <table>SHOW REGIONSCREATE OR REPLACE STAGEDROP STAGE
Прямое копирование в Snowflake
Если исходное хранилище данных и формат соответствуют критериям, описанным в этом разделе, можно использовать Copy activity для непосредственного копирования из источника в Snowflake. Служба проверяет параметры и завершает работу Copy activity, если следующие критерии не выполнены:
При указании
storageIntegrationв приемнике:Исходное хранилище данных — это Azure Blob Storage, которое вы указали в этапе внешнего хранения в Snowflake. Перед копированием данных необходимо выполнить следующие действия:
Создайте связанную службу Azure Blob Storage для исходного Azure Blob Storage с любыми поддерживаемыми типами проверки подлинности.
Предоставьте субъекту-службе Snowflake роль Storage Blob Data Reader в разделе Access Control (IAM) исходного хранилища Azure Blob Storage.
Если не указать
storageIntegrationв приемнике:Связанная служба исходная — это хранилище Azure Blob с аутентификацией с помощью общей подписи доступа. Если вы хотите копировать данные из Azure Data Lake Storage Gen2 напрямую в следующем поддерживаемом формате, можно создать связанную службу Azure Blob Storage с aутентификацией SAS, чтобы работать с аккаунтом Azure Data Lake Storage Gen2 и избежать использования промежуточного копирования в Snowflake.
Исходный формат данных — Parquet, текст с разделителями или JSON со следующими конфигурациями:
Для формата Parquet кодек сжатия имеет значение None или Snappy.
Для формата текста с разделителями :
-
rowDelimiter— это \r\n, или любой одиночный символ. Если разделитель строк не является "\r\n",firstRowAsHeaderдолжен быть false, иskipLineCountне указывается. -
compressionможет быть без сжатия, gzip, bzip2 или deflate. - Для
encodingNameостается значение по умолчанию или устанавливается как "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255". -
quoteChar— двойнаякавычка, одна кавычка или пустая строка (без символа кавычки).
-
Для формата JSON прямая копия поддерживает только тот случай, когда таблица Snowflake приемника имеет только один столбец, а тип данных этого столбца — VARIANT, OBJECT или ARRAY.
-
compressionможет быть без сжатия, gzip, bzip2 или deflate. -
encodingNameоставлено по умолчанию или установлено значение utf-8. - Сопоставление столбцов не указано.
-
В источнике действия "Копирование":
-
additionalColumnsне указан. - Если источником является папка, для
recursiveустановлено значение true. -
prefix,modifiedDateTimeStart,modifiedDateTimeEndиenablePartitionDiscoveryне указаны.
-
Example:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Поэтапное копирование в Snowflake
Если исходное хранилище данных или формат не совместимы с командой Snowflake COPY, как упоминалось в последнем разделе, включите встроенное поэтапное копирование с помощью промежуточного экземпляра хранилища BLOB-объектов Azure. Функция поэтапного копирования также обеспечивает более высокую пропускную способность. Служба автоматически преобразует данные, чтобы они соответствовали требованиям к формату данных Snowflake. Затем она вызывает команду КОПИРОВАТЬ для загрузки данных в Snowflake. Наконец, она очищает ваши временные данные из хранилища блобов. Сведения о копировании данных с помощью промежуточного копирования см. в разделе "Поэтапное копирование ".
Чтобы использовать эту функцию, создайте связанную службу Azure Blob Storage, которая ссылается на учетную запись хранилища Azure в качестве промежуточного хранилища. Затем укажите свойства enableStaging и stagingSettings в Copy activity.
При указании
storageIntegrationв приемнике промежуточный Azure Blob Storage должен быть тем, который вы указали во внешней стадии в Snowflake. Убедитесь, что вы создаете связанную службу Azure Blob Storage с любой поддерживаемой проверкой подлинности при использовании среды выполнения интеграции Azure или с анонимным ключом учетной записи, подписанным url-адресом или проверкой подлинности субъекта-службы при использовании локальной среды выполнения интеграции. Кроме того, предоставьте роль как минимум Storage Blob Data Reader основному объекту службы Snowflake в стейджинговом Azure Blob Storage Access Control (IAM).Если в приемнике не указано
storageIntegration, промежуточной Azure Blob Storage связанной службе необходимо использовать проверку подлинности подписи общего доступа, как это требуется команде Snowflake COPY.
Example:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Сопоставление свойств потока данных
При преобразовании данных в сопоставительном потоке данных можно выполнять операции чтения и записи в таблицах Snowflake. Дополнительные сведения см. в преобразовании источника и преобразовании приемника в потоках сопоставления данных. Вы можете использовать набор данных Snowflake или встроенный набор данных в качестве источника и типа приемника.
Преобразование источника
В таблице, приведенной ниже, указаны свойства, поддерживаемые источником Snowflake. Эти свойства можно изменить на вкладке "Параметры источника ". Соединитель использует внутреннюю передачу данных Snowflake.
| Name | Description | Required | Допустимые значения | Свойство скрипта потока данных |
|---|---|---|---|---|
| Table | Если в качестве входных данных выбрать вариант "Таблица", поток данных будет получать все данные из таблицы, указанной в наборе данных Snowflake, или в параметрах источника при использовании встроенного набора данных. | No | String |
(только для встроенного набора данных) tableName schemaName |
| Query | Если в качестве входных данных выбрать "Запрос", введите запрос для получения данных из Snowflake. Этот параметр переопределяет любую таблицу, выбранную в наборе данных. Если имена схемы, таблицы и столбцов содержат строчные буквы, заключайте идентификатор объекта в кавычки, например select * from "schema"."myTable". |
No | String | query |
| Включение добавочного извлечения (предварительная версия) | Используйте этот параметр, чтобы указать ADF, что нужно обработать только те строки, которые изменились с момента последнего выполнения конвейера. | No | Boolean | enableCdc |
| Добавочный столбец | При использовании функции инкрементного извлечения необходимо выбрать столбец с датой/временем или числовой столбец, который вы хотите использовать в качестве метки в вашей исходной таблице. | No | String | waterMarkColumn |
| Включить отслеживание изменений в Snowflake (предварительная версия) | Этот параметр позволяет ADF использовать технологию фиксации изменений данных Snowflake для обработки только дельта-данных с момента выполнения предыдущего потока. Этот параметр автоматически загружает разностные данные с операциями вставки строк, обновления и удаления без необходимости добавлять добавочный столбец. | No | Boolean | enableNativeCdc |
| Чистые изменения | При использовании отслеживания изменений snowflake можно использовать этот параметр, чтобы получить дедупликированные измененные строки или исчерпывающие изменения. Дедупликированные измененные строки будут отображать только последние версии строк, которые изменились с заданной точки во времени, в то время как исчерпывающие изменения будут отображать все версии каждой строки, которая изменилась, включая те, которые были удалены или обновлены. Например, при обновлении строки вы увидите версию удаления и версию вставки в исчерпывающих изменениях, но только версию вставки в дедуппированных измененных строках. В зависимости от варианта использования можно выбрать вариант, соответствующий вашим потребностям. Параметр по умолчанию имеет значение false, что означает исчерпывающие изменения. | No | Boolean | netChanges |
| Включите системные столбцы | При использовании отслеживания изменений snowflake можно использовать параметр systemColumns, чтобы контролировать, включены ли или исключены столбцы потока метаданных, предоставляемые Snowflake, в выходные данные отслеживания изменений. По умолчанию systemColumns имеет значение true, что означает, что столбцы потока метаданных включены. Вы можете установить значение false для systemColumns, если хотите их исключить. | No | Boolean | systemColumns |
| Начать читать с начала | Установка этого параметра с добавочным извлечением и отслеживанием изменений приведет к тому, что ADF будет считывать все строки при первом выполнении конвейера с включенным добавочным извлечением. | No | Boolean | skipInitialLoad |
Примеры сценариев для источника Snowflake
При использовании набора данных Snowflake в качестве типа источника связанный сценарий потока данных будет следующим:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Если используется встроенный набор данных, связанный сценарий потока данных будет следующим:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Встроенное отслеживание изменений
Azure Data Factory теперь поддерживает встроенную функцию в Snowflake, которая называется отслеживанием изменений, которая включает отслеживание изменений в виде журналов. Эта функция Snowflake позволяет следить за изменениями данных со временем, что делает её полезной для инкрементальной загрузки и аудита данных. Чтобы воспользоваться этой функцией, при включении Change data capture и выборе Snowflake Change Tracking мы создаём объект Stream для исходной таблицы, который позволяет отслеживать изменения на исходной таблице snowflake. Затем мы используем предложение CHANGES в нашем запросе для получения только новых или обновленных данных из исходной таблицы. Кроме того, рекомендуется запланировать конвейер так, чтобы изменения потреблялись в течение интервала времени хранения данных, установленного для исходной таблицы Snowflake, иначе пользователь может наблюдать несогласованное поведение в захваченных изменениях.
Преобразование приемника
В таблице, приведенной ниже, указаны свойства, поддерживаемые приемником Snowflake. Эти свойства можно изменить на вкладке "Параметры ". При использовании встроенного набора данных вы увидите дополнительные параметры, которые совпадают с свойствами, описанными в разделе свойств набора данных . Соединитель использует внутреннюю передачу данных Snowflake.
| Name | Description | Required | Допустимые значения | Свойство скрипта потока данных |
|---|---|---|---|---|
| Метод обновления | Укажите, какие операции разрешены в пункте назначения Snowflake. Для обновления, объединенного обновления и вставки или удаления строк требуется преобразование Alter row для пометки строк для этих действий. |
Yes |
true или false |
deletable insertable updateable upsertable |
| Ключевые столбцы | Для выполнения обновления (update), обновления или вставки (upsert) или удаления (delete) должен быть установлен ключевой столбец (или столбцы), позволяющий определить строки для изменения. | No | Array | keys |
| Действие таблицы | Определяет, следует ли повторно создавать или удалять все строки в целевой таблице перед записью. - Нет: Никакие действия с таблицей выполнены не будут. - Повторное создание: таблица будет удалена и создана заново. Это действие необходимо, если новая таблица создается динамически. - Усечение: все строки из целевой таблицы будут удалены. |
No |
true или false |
recreate truncate |
Примеры сценариев для приемника Snowflake
При использовании набора данных Snowflake в качестве типа приемника связанный сценарий потока данных будет следующим:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Если используется встроенный набор данных, связанный сценарий потока данных будет следующим:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Оптимизация pushdown запроса
При установке уровня ведения журнала конвейера в значение None мы исключаем передачу промежуточных метрик преобразования, предотвращая потенциальные помехи для оптимизации Spark и обеспечивая оптимизацию выполнения запросов, доступную благодаря Snowflake. Эта оптимизация снижения нагрузки позволяет существенно ускорить обработку больших таблиц Snowflake с обширными наборами данных.
Note
Мы не поддерживаем временные таблицы в Snowflake, так как они являются локальными для сеанса или пользователя, который создает их, что делает их недоступными для других сеансов и подвержены перезаписи как обычные таблицы Snowflake. Хотя Snowflake предлагает временные таблицы в качестве альтернативы, которые доступны глобально, они требуют ручного удаления, что противоречит нашей основной цели использования временных таблиц, что позволяет избежать любых операций удаления в исходной схеме.
Сопоставление типов данных для Snowflake версии 2
При копировании данных из Snowflake используются следующие сопоставления типов данных Snowflake с промежуточными типами данных, применяемыми внутри службы. Чтобы узнать, как действие копирования сопоставляет исходную схему и типы данных с приемником, см. раздел Сопоставление схем и типов данных.
| Тип данных Snowflake | Промежуточный служебный тип данных |
|---|---|
| НОМЕР (p,0) | Decimal |
| NUMBER (p,s, где s>0) | Decimal |
| FLOAT | Double |
| VARCHAR | String |
| CHAR | String |
| BINARY | Byte[] |
| BOOLEAN | Boolean |
| DATE | DateTime |
| TIME | TimeSpan |
| TIMESTAMP_LTZ | DateTimeOffset |
| TIMESTAMP_NTZ | DateTimeOffset |
| TIMESTAMP_TZ | DateTimeOffset |
| VARIANT | String |
| OBJECT | String |
| ARRAY | String |
Свойства действия поиска
Дополнительные сведения о свойствах см. в действии поиска.
Жизненный цикл и обновление соединителя Snowflake
В следующей таблице показаны этап выпуска и журналы изменений для различных версий соединителя Snowflake:
| Version | Этап выпуска | Журнал изменений |
|---|---|---|
| Snowflake V1 | Removed | Неприменимо. |
| Snowflake V2 (версия 1.0) | GA версия доступна | • Добавьте поддержку проверки подлинности пары ключей. • Добавьте поддержку storageIntegration в операции копирования. accountIdentifier, warehouse, database, schema и role свойства используются для установления соединения вместо свойства connectionstring.• Добавьте поддержку типа данных Decimal в операции Lookup. Тип NUMBER, определенный в Snowflake, будет отображаться как строка в операции поиска. Если вы хотите преобразовать его в числовой тип в V2, можно использовать параметр pipeline с функцией int или функцией float. Например, int(activity('lookup').output.firstRow.VALUE), float(activity('lookup').output.firstRow.VALUE).• тип данных метки времени в Snowflake трактуется как тип данных DateTimeOffset в действиях "Поиск" и "Скрипт". Если после обновления до версии 2 вам все еще нужно использовать значение DateTime в качестве параметра в вашем конвейере, вы можете преобразовать тип DateTimeOffset в тип DateTime с помощью функции formatDateTime (рекомендуется) или функции concat. Например: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE), concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z') • ЧИСЛО (p,0) считывается как десятичный тип данных. • TIMESTAMP_LTZ, TIMESTAMP_NTZ и TIMESTAMP_TZ считываются как тип данных DateTimeOffset. • Параметры скрипта не поддерживаются в действии скрипта. В качестве альтернативы используйте динамические выражения для параметров скрипта. Дополнительные сведения см. в разделе выражения и функции в Azure Data Factory и Azure Synapse Analytics. • Выполнение нескольких инструкций SQL в действии скрипта не поддерживается. |
| Snowflake V2 (версия 1.1) | GA версия доступна | • Добавьте поддержку параметров скрипта. • Добавлена поддержка выполнения нескольких инструкций в действии скрипта. • Добавление свойства treatDecimalAsString в активности Lookup и Script. • Добавьте дополнительное свойство подключения UseUtcTimestamps. |
Обновление соединителя Snowflake с версии 1 до версии 2
Чтобы обновить соединитель Snowflake с версии 1 до версии 2, можно выполнить параллельное обновление или обновление на месте.
Параллельное обновление
Чтобы выполнить параллельное обновление, выполните следующие действия.
- Создайте связанную службу Snowflake и настройте ее, ссылаясь на свойства связанной службы версии 2.
- Создайте набор данных на основе созданной связанной службы Snowflake.
- Замените новую связанную службу и набор данных существующими в конвейерах, предназначенными для объектов версии 1.
Обновление на месте
Чтобы выполнить модернизацию на месте, необходимо изменить существующую полезную нагрузку связанной службы и обновить набор данных для использования новой связанной службы.
Обновите тип от Snowflake до SnowflakeV2.
Измените нагрузку связанной службы с формата версии 1 на версию 2. Вы можете заполнить каждое поле через пользовательский интерфейс после того, как измените упомянутый выше тип, или обновить payload непосредственно через редактор JSON. Сведения о поддерживаемых свойствах подключения см. в разделе свойств связанной службы в этой статье. В следующих примерах показаны различия полезной нагрузки для связанных служб V1 и V2 Snowflake.
Полезные данные JSON связанной службы Snowflake версии 1:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }JSON полезная нагрузка подключенной службы Snowflake V2:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }Обновите набор данных, чтобы использовать новую связанную службу. Можно создать новый набор данных на основе только что созданной связанной службы или обновить свойство типа существующего набора данных из SnowflakeTable на SnowflakeV2Table.
Note
При переходе связанных служб раздел параметров шаблона переопределения может отображать только свойства базы данных. Это можно устранить, изменив параметры вручную. После этого раздел Переопределить параметры шаблона отобразит строки подключения.
Обновление соединителя Snowflake версии 2 с версии 1.0 до версии 1.1
На странице "Изменить связанную службу " выберите 1.1 для версии. Дополнительные сведения см. в разделе "Свойства связанной службы".
Связанный контент
Список хранилищ данных, поддерживаемых в качестве источников и приемников Copy activity, см. в разделе поддерживаемые хранилища данных и форматы.