Конвейеры и активности в Azure Data Factory и Azure Synapse Analytics

ПРИМЕНИМО К: Azure Data Factory Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

Внимание

Поддержка Azure Machine Learning Studio (классическая версия) завершится 31 августа 2024 г. Мы рекомендуем перейти на Azure Machine Learning по этой дате.

По состоянию на 1 декабря 2021 г. нельзя создавать новые Machine Learning Studio (классические) ресурсы (рабочая область и план веб-службы). До 31 августа 2024 г. можно продолжать использовать существующие Machine Learning Studio (классические) эксперименты и веб-службы. Дополнительные сведения см. в разделе:

Документация Machine Learning Studio (classic) снимается с поддержки и может не обновляться в будущем.

Эта статья поможет понять конвейеры и действия в Azure Data Factory и Azure Synapse Analytics и использовать их для создания комплексных рабочих процессов, управляемых данными, для сценариев перемещения данных и обработки данных.

Обзор

Рабочая область Фабрики данных или Synapse может содержать один или несколько конвейеров. Конвейеры — это логические группы действий, которые вместе отвечают за выполнение задачи. Например, конвейер обработки данных может содержать набор действий, загружающих и очищающих данные журнала, а затем запускающих поток сопоставления данных для их анализа. Потоковая обработка позволяет управлять действиями в виде набора, а не каждым из них в отдельности. Вы развертываете и планируете конвейер, а не действия отдельно.

Действия в конвейере определяют действия, выполняемые с данными. Например, можно использовать действие копирования для копирования данных из SQL Server в Azure Blob Storage. Затем используйте действие потока данных или действие в Databricks Notebook для обработки и преобразования данных из объектного хранилища BLOB в пул Azure Synapse Analytics, на основе которого создаются отчеты бизнес-аналитики.

Azure Data Factory и Azure Synapse Analytics имеют три группировки действий: data movement activities, data transformation activities и control activities. У действия может быть ноль или несколько входных наборов данных, и оно может производить один или несколько выходных наборов данных. На следующей схеме показана связь между конвейером, действием и набором данных:

Связь между набором данных, действием и конвейером

Входной набор данных представляет входные данные для действия в конвейере, а выходной набор данных — выходные данные для действия. Наборы данных представляют данные в разных хранилищах, например в таблицах, файлах, папках и документах. Созданные наборы данных можно использовать для действий в конвейере. Например, набор данных может быть входным/выходным набором данных для действия копирования или действия HDInsightHive. Дополнительные сведения о наборах данных см. в статье Наборы данных в Azure Data Factory.

Примечание.

По умолчанию используется мягкое ограничение в 120 активностей на конвейер, включая внутренние активности для контейнеров.

Действия перемещения данных

Действие копирования в Фабрике данных копирует данные из хранилища-источника в хранилище-приемник. Фабрика данных поддерживает хранилища данных, перечисленные в таблице в этом разделе. Данные из любого источника можно записывать в любой приемник.

Дополнительные сведения см. в статье Обзор — действие копирования.

Выберите хранилище данных, чтобы узнать, как скопировать данные в него и из него.

Категория Хранилище данных Поддерживается в качестве источника Поддерживается в качестве приемника Поддерживается Azure IR Поддерживается самостоятельно размещённым IR
Azure Azure хранилище BLOB-объектов
  Индекс поиска по искусственному интеллекту Azure
  Azure Cosmos DB для NoSQL
  Azure Cosmos DB для MongoDB
  Обозреватель данных Azure
  Azure Data Lake Storage Gen1
  Azure Data Lake Storage Gen2
  База данных Azure для MariaDB
  База данных Azure для MySQL
  База данных Azure для PostgreSQL
  Azure Databricks Delta Lake
  Файлы Azure
  База данных SQL Azure
  Azure SQL Managed Instance
  Azure Synapse Analytics
  Azure хранилище таблиц
База данных Amazon RDS для Oracle
  Amazon RDS для SQL Server
  Amazon Redshift
  DB2
  Drill.
  Google BigQuery
  Greenplum
  HBase
  Hive
  Apache Impala
  Информикс
  MariaDB
  Microsoft Access
  MySQL
  Netezza
  Оракул
  Феникс
  PostgreSQL
  Престо
  SAP Business Warehouse через Open Hub
  SAP Business Warehouse через MDX
  SAP HANA Приемник поддерживается только с ODBC Connector и драйвером ODBC SAP HANA ODBC
  Таблица SAP
  Снежинка
  Spark
  SQL Server
  Sybase
  Teradata
  Vertica
NoSQL Кассандра
  Couchbase (предварительная версия)
  MongoDB
  Атлас MongoDB
Файл Amazon S3
  Совместимое хранилище Amazon S3
  Файловая система
  FTP
  Google Cloud Storage
  HDFS
  Хранилище Oracle Cloud
  SFTP
Базовый протокол Базовый HTTP-запрос
  Базовый протокол OData
  Базовый протокол ODBC
  Базовый протокол REST
Службы и приложения Веб-служба Amazon Marketplace (не рекомендуется)
  Concur (предварительная версия)
  Dataverse
  Динамика 365
  Dynamics AX
  Dynamics CRM
  Google AdWords
  Хабспот
  Джира
  Magento (предварительная версия)
  Marketo (предварительная версия)
  Microsoft 365
  Oracle Eloqua (предварительная версия)
  Oracle Responsys (предварительная версия)
  Oracle Service Cloud (предварительная версия)
  PayPal (предварительная версия)
  QuickBooks (предварительная версия)
  Salesforce
  Облако службы Salesforce
  Salesforce Marketing Cloud
  SAP Cloud для заказчика (C4C)
  SAP ECC
  ServiceNow
SharePoint Online список
  Shopify (предварительная версия)
  Square (предварительная версия)
  Веб-таблица (таблица HTML)
  Ксеро
  Zoho (предварительная версия)

Примечание.

Пометка предварительная версия возле соединителя означает, что с ним можно поработать и оставить отзыв. Если вы хотите задействовать зависимость от соединителей предварительной версии в вашем решении, обратитесь к поддержке Azure.

Действия преобразования данных

Azure Data Factory и Azure Synapse Analytics поддерживают следующие действия преобразования, которые можно добавить отдельно или связать с другим действием.

Дополнительные сведения см. в статье Преобразование данных.

Действия по преобразованию данных Вычислительная среда
Data Flow Кластеры Apache Spark, управляемые Azure Data Factory
функция Azure Функции Azure
Hive HDInsight [Hadoop]
Свинья HDInsight [Hadoop]
MapReduce HDInsight [Hadoop]
Hadoop Streaming HDInsight [Hadoop]
Spark HDInsight [Hadoop]
Действия в Azure ML Studio (классическая версия): пакетная обработка и обновление ресурса виртуальная машина Azure
Хранимая процедура Azure SQL, Azure Synapse Analytics или SQL Server
U-SQL Azure Data Lake Analytics
Настраиваемое действие Azure Batch
Ноутбук Databricks Azure Databricks
Активность JAR в Databricks Azure Databricks
действие Databricks Python Azure Databricks
Активность записной книжки Synapse Azure Synapse Analytics

Действия управления потоком

Поддерживаются следующие действия потока управления:

Действие управления Описание
Добавление переменной Добавьте значение в существующую переменную массива.
Запустить конвейер Действие Execute Pipeline позволяет пайплайну Data Factory или Synapse вызвать другой пайплайн.
Фильтр Применение выражения фильтра ко входному массиву
Для каждого Действие ForEach определяет повторяющийся поток управления в конвейере. Это действие используется для выполнения итерации коллекции и выполняет указанные в цикле действия. Реализация цикла этого действия аналогична структуре цикла Foreach на языках программирования.
Получение метаданных Действие GetMetadata можно использовать для получения метаданных для любых данных в конвейере Фабрики данных Azure или Synapse.
Активность для условия If Условие If можно использовать для ветвления на основе условия, которое оценивается как истинное или ложное. Действие условия If предоставляет те же функциональные возможности, что и инструкция if в языках программирования. Оно определяет один набор действий, если условие принимает значение true, и другой набор действий, если условие принимает значение false..
Активность поиска Действие поиска можно использовать для считывания или поиска записи, имени таблицы и значения из внешних источников. На эти выходные данные можно затем ссылаться в последующих действиях.
Установка значения переменной Установка значения существующей переменной.
Действие типа "Until" Реализует цикл Do Until, который аналогичен циклической структуре Do-Until в языках программирования. Оно выполняет набор действий в цикле, пока условие, связанное с действием, не получит значение true. Можно указать значение времени ожидания для действия до тех пор.
Проверочная операция Убедитесь, что конвейер продолжает выполняться только в том случае, если эталонный набор данных существует, соответствует заданным условиям или достигается время ожидания.
Действие ожидания Если в конвейере используется действие Wait, он приостанавливает обработку на указанное время, прежде чем перейти к выполнению последующих действий.
Веб-действие Веб-активность можно использовать для вызова пользовательской конечной точки REST из конвейера. Вы можете передать наборы данных и связанные службы, которые будет использовать это действие и к которым оно будет обращаться.
Активность Webhook С помощью активности веб-перехватчика вызовите конечную точку и передайте URL-адрес для обратного вызова. Выполнение конвейера дожидается обратного вызова, после чего переходит к следующему действию.

Создание конвейера с помощью пользовательского интерфейса

Чтобы создать новый конвейер, перейдите на вкладку "Автор" в Data Factory Studio (представленная значком карандаша), а затем выберите знак плюса и выберите "Конвейер" в меню, а затем снова в подменю конвейера.

Показывает шаги по созданию нового канала с использованием Azure Data Factory Studio.

Фабрика данных отображает редактор конвейера, где можно найти:

  1. все действия, которые могут использоваться в конвейере;
  2. Поле редактора конвейера, где отображаются действия после добавления в конвейер.
  3. область настройки конвейера, включая параметры, переменные, общие параметры и выходные данные;
  4. область свойств конвейера, где можно настроить имя конвейера, необязательное описание и заметки. В этой области также отображаются все элементы, связанные с конвейером, внутри фабрики данных.

Снимок экрана, на котором показана область редактора конвейера в студии Azure Data Factory с выделением каждого из описанных разделов.

JSON-файл конвейера.

Вот как конвейер определен в формате JSON:

{
    "name": "PipelineName",
    "properties":
    {
        "description": "pipeline description",
        "activities":
        [
        ],
        "parameters": {
        },
        "concurrency": <your max pipeline concurrency>,
        "annotations": [
        ]
    }
}
Тег Описание Тип Обязательное поле
имя Имя конвейера. Укажите имя, представляющее действие, которое выполняет конвейер.
  • Максимальное количество знаков: 140
  • Должно начинаться с буквы, цифры или символа подчеркивания (_).
  • Следующие символы не допускаются: ".", "+", "?", "/", "", "", "*", "%", "&"
Строка Да
описание Укажите текст, описывающий, для чего используется конвейер. Строка Нет
мероприятия В разделе activities можно определить одно или несколько действий. Дополнительные сведения об элементе JSON активности см. в разделе Activity JSON. Массив Да
Параметры В разделе Параметры может быть задан один или несколько параметров, определенных в конвейере, которые делают конвейер гибким для повторного использования. Список Нет
конкурентность Максимальное количество одновременных запусков конвейера. По умолчанию нет максимального значения. Если достигнуто ограничение параллелизма, дополнительные запуски конвейера будут помещены в очередь до завершения предыдущих. Число Нет
аннотации Список тегов, связанных с конвейером Массив Нет

Активность JSON

В разделе activities можно определить одно или несколько действий. Существует два основных типа действий: действия выполнения и действия управления.

Операции выполнения

Действия выполнения включают в себя действия перемещения данных и преобразования данных. Они имеют следующую структуру верхнего уровня:

{
    "name": "Execution Activity Name",
    "description": "description",
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "linkedServiceName": "MyLinkedService",
    "policy":
    {
    },
    "dependsOn":
    {
    }
}

В приведенной ниже таблице описаны свойства в JSON-определении действий:

Тег Описание Обязательное поле
имя Название действия. Укажите имя, представляющее операцию, которую выполняет действие.
  • Максимальное количество знаков: 55
  • Должно начинаться с буквенно-цифрового символа или знака подчеркивания (_).
  • Следующие символы не допускаются: ".", "+", "?", "/", "", "", "*", "%", "&", ":", " "
Да
описание Текст, описывающий, для чего используется действие Да
тип Тип действия. Разные типы действий описаны в разделах, посвященных действиям перемещения данных, действиям преобразования данных и действиям управления. Да
ИмяСвязанногоСервиса Имя связанной службы, используемой действием.

Для действия может потребоваться указать связанную службу, которая связывается с требуемой вычислительной средой.
Обязательно для действия HDInsight, действия пакетной проверки в классической версии Студии машинного обучения и действия хранимой процедуры.

Нет — для всех остальных
свойстваТипа Свойства в разделе typeProperties зависят от каждого типа действия. Чтобы просмотреть свойства типа для действия, выберите ссылки на действие в предыдущем разделе. Нет
политика Политики, которые влияют на поведение во время выполнения действия. Это свойство включает в себя время ожидания и алгоритм повторных попыток. Если оно не указано, используются значения по умолчанию. Дополнительные сведения см. в разделе Политика действий. Нет
Свойство dependsOn Это свойство используется для определения зависимостей между последующими и предыдущими действиями. Дополнительные сведения см. в разделе Зависимости действий. Нет

Политика действий

Политики влияют на поведение действия во время его выполнения, предоставляя возможность настройки параметров. Политики действий доступны только для выполнения действий.

Определение JSON политики активности (Activity policy JSON definition)

{
    "name": "MyPipelineName",
    "properties": {
      "activities": [
        {
          "name": "MyCopyBlobtoSqlActivity",
          "type": "Copy",
          "typeProperties": {
            ...
          },
         "policy": {
            "timeout": "00:10:00",
            "retry": 1,
            "retryIntervalInSeconds": 60,
            "secureOutput": true
         }
        }
      ],
        "parameters": {
           ...
        }
    }
}
Имя JSON Описание Допустимые значения Обязательное поле
таймаут Определяет таймаут для выполнения действия. Временной диапазон № Время ожидания по умолчанию — 12 часов, минимум 10 минут.
повторить попытку Максимальное количество повторных попыток Целое число № Значение по умолчанию — 0.
интервалПовторнойПопыткиВСекундах Задержка между повторными попытками (в секундах) Целое число № Значение по умолчанию: 30 секунд
безопасный вывод Если задано значение true, выходные данные действия будут рассматриваться как безопасные. Они не будут регистрироваться для мониторинга. Логический № По умолчанию — false.

Действие управления

Действие управления имеет следующую структуру верхнего уровня:

{
    "name": "Control Activity Name",
    "description": "description",
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "dependsOn":
    {
    }
}
Тег Описание Обязательное поле
имя Название действия. Укажите имя, представляющее операцию, которую выполняет действие.
  • Максимальное количество знаков: 55
  • Должно начинаться с буквы, цифры или символа подчеркивания (_).
  • Следующие символы не допускаются: ".", "+", "?", "/", "", "", "*", "%", "&", ":", " "
Да
    описание Текст, описывающий, для чего используется действие Да
    тип Тип действия. Разные типы действий описаны в разделах, посвященных действиям перемещения данных, действиям преобразования данных и действиям управления. Да
    свойстваТипа Свойства в разделе typeProperties зависят от каждого типа действия. Чтобы просмотреть свойства типа для действия, выберите ссылки на действие в предыдущем разделе. Нет
    Свойство dependsOn Это свойство используется для определения зависимости активности и того, как последующие действия зависят от предыдущих действий. Дополнительные сведения см. в разделе Зависимости действий. Нет

    Зависимость активностей

    Зависимость действий определяет, как последующие действия зависят от предыдущих, указывая, следует ли продолжать выполнение следующей задачи. Действие может зависеть от одного или нескольких предыдущих действий с разными условиями зависимости.

    Различные условия зависимости: "Выполнено", "Сбой", "Пропущено", "Завершено".

    Например, если в потоке данных есть активность A -> активность B, возможны различные развития событий:

    • Действие B имеет условие зависимости от действия А с условием Выполнено: действие B выполняется, только если действие А имеет конечное состояние "Выполнено".
    • Действие B имеет условие зависимости от действия А с условием Сбой: действие B выполняется, только если действие А имеет конечное состояние "Сбой".
    • Действие Б имеет условие зависимости от действия А с условием Завершено: действие B выполняется, если действие А имеет конечное состояние "Выполнено" или "Сбой".
    • Для действия Б задано условие зависимости от действия А с условием Пропущено: действие B выполняется, если действие А находится в окончательном состоянии "Пропущено". Состояние "Пропущено" возникает в сценарии, когда действие X -> действие Y -> действие Z, и каждое действие выполняется только в случае успешного завершения предыдущего действия. При сбое действия X действие Y остается в состоянии "Пропущено", так как оно никогда не выполнится. Аналогично, действие Z находится в состоянии "Пропущено".

    Пример. Действие 2 зависит от успешного завершения действия 1

    {
        "name": "PipelineName",
        "properties":
        {
            "description": "pipeline description",
            "activities": [
             {
                "name": "MyFirstActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                }
            },
            {
                "name": "MySecondActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                },
                "dependsOn": [
                {
                    "activity": "MyFirstActivity",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
              ]
            }
          ],
          "parameters": {
           }
        }
    }
    
    

    Пример конвейера копирования

    В следующем примере конвейера в разделе activities есть одно действие типа Copy. В этом примере действие copy копирует данные из хранилища BLOB-объектов Azure в базу данных в Azure SQL Database.

    {
      "name": "CopyPipeline",
      "properties": {
        "description": "Copy data from a blob to Azure SQL table",
        "activities": [
          {
            "name": "CopyFromBlobToSQL",
            "type": "Copy",
            "inputs": [
              {
                "name": "InputDataset"
              }
            ],
            "outputs": [
              {
                "name": "OutputDataset"
              }
            ],
            "typeProperties": {
              "source": {
                "type": "BlobSource"
              },
              "sink": {
                "type": "SqlSink",
                "writeBatchSize": 10000,
                "writeBatchTimeout": "60:00:00"
              }
            },
            "policy": {
              "retry": 2,
              "timeout": "01:00:00"
            }
          }
        ]
      }
    }
    

    Обратите внимание на следующие аспекты:

    • В разделе действий есть только одно действие, тип которого имеет значение Copy.
    • Для этого действия параметру input присвоено значение InputDataset, а параметру output — значение OutputDataset. Сведения об определении наборов данных в JSON см. в статье Наборы данных.
    • В разделе typeProperties в качестве типа источника указано BlobSource, а в качестве типа приемника — SqlSink. В разделе действий по перемещению данных выберите хранилище данных, которое вы хотите использовать в качестве источника или приемника, чтобы узнать больше о перемещении данных в это хранилище данных или из нее.

    Полное пошаговое руководство по созданию этого конвейера см. в статье Краткое руководство: создание фабрики данных.

    Пример конвейера преобразования

    В следующем примере конвейера в разделе activities есть одно действие типа HDInsightHive. В этом примере действие HDInsight Hive преобразует данные из хранилища BLOB-объектов Azure путём выполнения скрипта Hive в кластере Azure HDInsight Hadoop.

    {
        "name": "TransformPipeline",
        "properties": {
            "description": "My first Azure Data Factory pipeline",
            "activities": [
                {
                    "type": "HDInsightHive",
                    "typeProperties": {
                        "scriptPath": "adfgetstarted/script/partitionweblogs.hql",
                        "scriptLinkedService": "AzureStorageLinkedService",
                        "defines": {
                            "inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
                            "partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
                        }
                    },
                    "inputs": [
                        {
                            "name": "AzureBlobInput"
                        }
                    ],
                    "outputs": [
                        {
                            "name": "AzureBlobOutput"
                        }
                    ],
                    "policy": {
                        "retry": 3
                    },
                    "name": "RunSampleHiveActivity",
                    "linkedServiceName": "HDInsightOnDemandLinkedService"
                }
            ]
        }
    }
    

    Обратите внимание на следующие аспекты:

    • В разделе действий есть только одно действие, тип которого имеет значение HDInsightHive.
    • Файл скрипта Hive, partitionweblogs.hql, хранится в учетной записи Azure Storage (указанной скриптомLinkedService с именем AzureStorageLinkedService) и в папке скрипта в контейнере adfgetstarted.
    • Раздел defines используется, чтобы указать параметры среды выполнения, передаваемые скрипту Hive в качестве значений конфигурации Hive (например, {hiveconf:inputtable}, $${hiveconf:partitionedtable}).

    Разделы typeProperties для каждого действия преобразования отличаются. Чтобы узнать о свойствах типов, поддерживаемых для действия преобразования, выберите действие преобразования в действиях преобразования данных.

    Полное пошаговое руководство по созданию этого конвейера см. в статье Учебник: преобразование данных с использованием Spark.

    Несколько действий в конвейере

    В двух предыдущих примерах конвейеров содержится только одно действие. Конвейер может включать несколько задач. Если в конвейере есть несколько действий, а последующие действия не зависят от предыдущих действий, эти действия могут выполняться параллельно.

    Два действия можно соединить с помощью зависимости действий, определяющей зависимость последующих действий от предыдущих, а также то, следует ли продолжать выполнение следующей задачи. Действие может зависеть от одного или нескольких предыдущих действий с разными условиями зависимости.

    Планирование конвейеров

    Конвейеры планируются триггерами. Существуют различные типы триггеров (триггер планировщика, который позволяет запускать конвейеры по расписанию, а также ручной триггер, который активирует конвейеры по запросу). Дополнительные сведения см. в статье Pipeline execution and triggers in Azure Data Factory (Выполнение конвейера и триггеры в фабрике данных Azure).

    Чтобы триггер запускал выполнение конвейера, необходимо включить в определение триггера ссылку на конкретный конвейер. Конвейеры и триггеры имеют связь n-m. Один триггер может запускать несколько конвейеров, и несколько триггеров могут запускать один конвейер. После определения триггера необходимо активировать его, чтобы он начал запуск конвейера. Дополнительные сведения см. в статье Pipeline execution and triggers in Azure Data Factory (Выполнение конвейера и триггеры в фабрике данных Azure).

    Предположим, у вас есть триггер по расписанию («Триггер A»), который я хочу использовать для запуска конвейера «MyCopyPipeline». Вы определяете триггер, как показано в следующем примере:

    Определение триггера А

    {
      "name": "TriggerA",
      "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
          ...
          }
        },
        "pipeline": {
          "pipelineReference": {
            "type": "PipelineReference",
            "referenceName": "MyCopyPipeline"
          },
          "parameters": {
            "copySourceName": "FileSource"
          }
        }
      }
    }