Поделиться через


Перенос данных из Amazon S3 в Azure Data Lake Storage 2-го поколения

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Используйте шаблон, с помощью которого можно перенести из Amazon S3 в Azure Data Lake Storage 2-го поколения сотни миллионов файлов, общий объем которых измеряется петабайтами.

Примечание.

Если вы хотите скопировать небольшое количество информации из AWS S3 в Azure (например, менее 10 ТБ), эффективнее и удобнее будет использовать специальный инструмент копирования в Фабрике данных Azure. Шаблон, описанный в этой статье, предлагает больше, чем вам нужно.

Общая информация о шаблонах решений

При переносе более чем 10 ТБ данных рекомендуется применить секционирование. Чтобы разделить данные, используйте параметр 'prefix' для фильтрации папок и файлов в Amazon S3 по имени, а затем каждое задание копирования в Azure Data Factory может копировать одну секцию за раз. Чтобы повысить пропускную способность, вы можете запускать несколько заданий копирования в Azure Data Factory одновременно.

Обычно требуется однократный перенос исторических данных, а также периодическая синхронизация изменений AWS S3 с Azure. Ниже приведено два шаблона. Один из них покрывает однократный перенос исторических данных, а другой — синхронизацию изменений AWS S3 с Azure.

Шаблон для переноса исторических данных из Amazon S3 в Azure Data Lake Storage 2-го поколения

Этот шаблон для переноса называется Migrate historical data from AWS S3 to Azure Data Lake Storage Gen2 и предполагает, что вы прописали список секций во внешней управляющей таблице в Базе данных SQL Azure. Поэтому он будет использовать действие Lookup для извлечения списка разделов из внешней управляющей таблицы, затем выполнит итерацию для каждого раздела и запустит задание копирования в ADF с одной секцией за раз. Когда любое задание копирования будет завершено, используется действие Stored Procedure для обновления статуса копирования каждой секции в управляющей таблице.

Шаблон состоит из пяти действий.

  • Действие Lookup извлекает из внешней управляющей таблицы разделы, которые не были скопированы в Azure Data Lake Storage второго поколения. Имя таблицы — s3_partition_control_table, а запрос для загрузки из нее данных выглядит следующим образом: "SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0".
  • Действие ForEach получает список секций из действия Lookup и выполняет итерацию для каждой секции для действия TriggerCopy. Вы можете установить параметр batchCount, чтобы одновременно запускать несколько заданий копирования в ADF. В этом шаблоне мы задали значение 2.
  • ExecutePipeline выполняет пайплайн CopyFolderPartitionFromS3. Мы создаем отдельный конвейер для каждого задания копирования секции, потому что это облегчает повторный запуск неудачного задания и загрузку этой секции из AWS S3. Остальные задания копирования, которые загружают другие секции, не будут затронуты.
  • Действие Copy копирует каждую секцию из AWS S3 в Azure Data Lake Storage 2-го поколения.
  • SqlServerStoredProcedure обновляет информацию о состоянии копирования каждого раздела в управляющей таблице.

Шаблон содержит два параметра.

  • AWS_S3_bucketName — это имя контейнера в AWS S3, из которого вам нужно перенести данные. Если вы хотите перенести данные из нескольких контейнеров AWS S3, добавьте еще один столбец во внешнюю управляющую таблицу, в котором будет храниться имя контейнера для каждой секции, и соответствующим образом обновите конвейер, чтобы он получал данные из этого столбца.
  • Azure_Storage_fileSystem — это имя файловой системы на Azure Data Lake Storage 2-го поколения, в которую вам нужно перенести данные.

Шаблон для копирования только измененных файлов из Amazon S3 в Azure Data Lake Storage 2-го поколения

Этот шаблон для переноса называется Copy delta data from AWS S3 to Azure Data Lake Storage Gen2. Он использует данные свойства LastModifiedTime для каждого файла, чтобы копировать из AWS S3 в Azure только новые или обновленные файлы. Обратите внимание, если ваши файлы или папки на AWS S3 уже секционированы по времени, и информация о временном срезе указана в их именах (например, /гггг/мм/дд/файл.csv), вы можете перейти к этому руководству, чтобы узнать более эффективный подход для инкрементальной загрузки новых файлов. Этот шаблон предполагает, что вы прописали список секций во внешней управляющей таблице в Базе данных SQL Azure. Поэтому он будет использовать действие Lookup для извлечения списка разделов из внешней управляющей таблицы, затем выполнит итерацию для каждого раздела и запустит задание копирования в ADF для одного раздела за раз. При запуске каждого задания копирования файлов из AWS S3 используется свойство LastModifiedTime, чтобы обнаруживать и копировать только новые или обновленные файлы. Когда задание копирования завершается, используется действие хранимой процедуры для обновления статуса копирования каждого раздела в управляющей таблице.

Шаблон состоит из семи действий.

  • Lookup извлекает разделы из внешней управляющей таблицы. Имя таблицы — s3_partition_delta_control_table, а запрос для загрузки данных из этой таблицы следующий: "select distinct PartitionPrefix from s3_partition_delta_control_table".
  • Действие ForEach получает список секций из действия Lookup и выполняет итерацию для каждой секции для действия TriggerDeltaCopy. Можно задать параметр batchCount, чтобы одновременно запускать несколько заданий копирования ADF. В этом шаблоне мы задали значение 2.
  • Команда ExecutePipeline запускает конвейер DeltaCopyFolderPartitionFromS3. Мы создаем отдельный конвейер для каждого задания копирования секции, потому что это облегчает повторный запуск неудачного задания и загрузку этой секции из AWS S3. Остальные задания копирования, которые загружают другие секции, не будут затронуты.
  • Действие Lookup извлекает из внешней управляющей таблицы информацию о времени выполнения последнего задания копирования, чтобы новые или обновленные файлы можно было обнаружить с помощью свойства LastModifiedTime. Имя таблицы — s3_partition_delta_control_table, а запрос для загрузки из нее данных выглядит следующим образом: "select max(JobRunTime) as LastModifiedTime from s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr}' and SuccessOrFailure = 1".
  • Действие Copy копирует из AWS S3 в Azure Data Lake Storage 2-го поколения только новые или измененные файлы из каждой секции. Для свойства modifiedDatetimeStart задано значение времени выполнения последнего задания копирования. Для свойства modifiedDatetimeEnd задано значение времени выполнения текущего задания копирования. Имейте в виду, что используется время часового пояса UTC.
  • Процедура SqlServerStoredProcedure обновляет в управляющей таблице статус копирования каждой секции и время копирования, когда операция завершается успешно. Для столбца SuccessOrFailure задано значение 1.
  • Действие SqlServerStoredProcedure обновляет в управляющей таблице статус копирования каждого раздела и время выполнения копии, когда она терпит неудачу. Для столбца SuccessOrFailure задано значение 0.

Шаблон содержит два параметра.

  • AWS_S3_bucketName — это имя контейнера в AWS S3, из которого вам нужно перенести данные. Если вы хотите перенести данные из нескольких контейнеров AWS S3, добавьте еще один столбец во внешнюю управляющую таблицу, в котором будет храниться имя контейнера для каждой секции, и соответствующим образом обновите конвейер, чтобы он получал данные из этого столбца.
  • Azure_Storage_fileSystem — это имя файловой системы на Azure Data Lake Storage 2-го поколения, в которую вам нужно перенести данные.

Как использовать эти два шаблона решений

Шаблон для переноса исторических данных из Amazon S3 в Azure Data Lake Storage 2-го поколения

  1. Создайте управляющую таблицу в Базе данных SQL Azure для хранения списка секций AWS S3.

    Примечание.

    Имя таблицы — s3_partition_control_table. Схема управляющей таблицы — PartitionPrefix и SuccessOrFailure. PartitionPrefix — это параметр префикса в S3 для фильтрации папок и файлов по имени в Amazon S3. SuccessOrFailure указывает на состояние процедуры копирования для каждой секции, где 0 означает, что секция не скопирована в Azure, а 1 — успешное копирование. В управляющей таблице определены 5 секций, и по умолчанию для состояния процедуры копирования для каждой секции установлено значение 0.

    CREATE TABLE [dbo].[s3_partition_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure)
    VALUES
    ('a', 0),
    ('b', 0),
    ('c', 0),
    ('d', 0),
    ('e', 0);
    
  2. Создайте хранимую процедуру в той же Базе данных SQL Azure для управляющей таблицы.

    Примечание.

    Имя хранимой процедуры — sp_update_partition_success. Эта процедура будет вызываться активностью SqlServerStoredProcedure в вашем конвейере ADF.

    CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255)
    AS
    BEGIN
    
        UPDATE s3_partition_control_table
        SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix
    END
    GO
    
  3. Перейдите к шаблону Migrate historical data from AWS S3 to Azure Data Lake Storage Gen2. Установите соединение с внешней управляющей таблицей, укажите AWS S3 в качестве хранилища источников данных и Azure Data Lake Storage 2-го поколения в качестве целевого хранилища. Имейте в виду, что внешняя управляющая таблица и хранимая процедура ссылаются на одно и то же соединение.

    Снимок экрана, на котором изображен шаблон для переноса исторических данных из AWS S3 в Azure Data Lake Storage 2-го поколения.

  4. Выберите Использовать этот шаблон.

    Снимок экрана, на котором выделена кнопка

  5. Вы увидите два созданных конвейера и три набора данных, как показано в примере ниже.

    Снимок экрана, на котором показаны два конвейера и три набора данных, созданные с помощью шаблона.

  6. Перейдите к конвейеру BulkCopyFromS3 и выберите Debug (Отладка), а затем введите значения в разделе Parameters (Параметры). Щелкните Готово.

    Скриншот, на котором показано, где выбрать отладку и ввести параметры, перед тем как нажать

  7. Вы увидите результат, аналогичный приведенному ниже:

    Снимок экрана, на котором показаны возвращенные результаты.

Шаблон для копирования только измененных файлов из Amazon S3 в Azure Data Lake Storage 2-го поколения

  1. Создайте управляющую таблицу в Базе данных SQL Azure для хранения списка секций AWS S3.

    Примечание.

    Имя таблицы — s3_partition_delta_control_table. Схема управляющей таблицы — PartitionPrefix, JobRunTime и SuccessOrFailure. PartitionPrefix — это параметр префикса в S3 для фильтрации папок и файлов по имени в Amazon S3. JobRunTime — это значение даты и времени при запуске заданий копирования. SuccessOrFailure указывает на состояние процедуры копирования для каждой секции, где 0 означает, что секция не скопирована в Azure, а 1 — успешное копирование. В управляющей таблице определены 5 разделов. Значением по умолчанию для JobRunTime может быть время, когда начинается однократный перенос исторических данных. Действие копирования в ADF скопирует файлы на AWS S3, которые были изменены в последний раз после этого времени. По умолчанию для состояния процедуры копирования для каждой секции установлено значение 1.

    CREATE TABLE [dbo].[s3_partition_delta_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [JobRunTime] [datetime] NULL,
        [SuccessOrFailure] [bit] NULL
        )
    
    INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
    VALUES
    ('a','1/1/2019 12:00:00 AM',1),
    ('b','1/1/2019 12:00:00 AM',1),
    ('c','1/1/2019 12:00:00 AM',1),
    ('d','1/1/2019 12:00:00 AM',1),
    ('e','1/1/2019 12:00:00 AM',1);
    
  2. Создайте хранимую процедуру в той же Базе данных SQL Azure для управляющей таблицы.

    Примечание.

    Имя хранимой процедуры — sp_insert_partition_JobRunTime_success. Действием SqlServerStoredProcedure в конвейере ADF будет вызываться эта процедура.

    CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit
    AS
    BEGIN
        INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
        VALUES
            (@PartPrefix,@JobRunTime,@SuccessOrFailure)
    END
    GO
    
  3. Перейдите к шаблону Copy delta data from AWS S3 to Azure Data Lake Storage Gen2. Установите соединение с внешней управляющей таблицей, укажите AWS S3 в качестве хранилища источников данных и Azure Data Lake Storage 2-го поколения в качестве целевого хранилища. Имейте в виду, что внешняя управляющая таблица и хранимая процедура ссылаются на одно и то же соединение.

    Создание нового подключения

  4. Выберите Использовать этот шаблон.

    Использовать этот шаблон

  5. Вы увидите два созданных конвейера и три набора данных, как показано в примере ниже.

    Проверка конвейера

  6. Перейдите к конвейеру DeltaCopyFromS3 и выберите Debug (Отладка), а затем введите значения в разделе Parameters (Параметры). Щелкните Готово.

    Щелкните **Отладка**

  7. Вы увидите результат, аналогичный приведенному ниже:

    Просмотр результатов

  8. Вы также можете проверить результаты из управляющей таблицы с помощью запроса "select * from s3_partition_delta_control_table". Вы получите результат, как на приведенном ниже изображении.

    Снимок экрана, на котором показаны результаты из управляющей таблицы после выполнения запроса.