Выполнение действий конвейера в Azure Data Factory и Synapse Analytics

ПРИМЕНИМО К: Azure Data Factory Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

Действие Execute Pipeline позволяет конвейеру Фабрики данных или Synapse вызвать другой конвейер.

Создание действия Execute Pipeline с помощью пользовательского интерфейса

Чтобы использовать действие Execute Pipeline в конвейере, выполните следующие действия.

  1. Найдите конвейер в области действий конвейера и перетащите действие Execute Pipeline на холст конвейера.

  2. Выберите новое действие Execute Pipeline на холсте, если оно еще не выбрано, и перейдите на вкладку Параметры, чтобы изменить сведения о нем.

    Отображает интерфейс для активности выполнения конвейера.

  3. Выберите существующий конвейер или создайте новый с помощью кнопки "Создать". Выберите другие параметры и настройте необходимые параметры для конвейера, чтобы завершить настройку.

Синтаксис

{
    "name": "MyPipeline",
    "properties": {
        "activities": [
            {
                "name": "ExecutePipelineActivity",
                "type": "ExecutePipeline",
                "typeProperties": {
                    "parameters": {                        
                        "mySourceDatasetFolderPath": {
                            "value": "@pipeline().parameters.mySourceDatasetFolderPath",
                            "type": "Expression"
                        }
                    },
                    "pipeline": {
                        "referenceName": "<InvokedPipelineName>",
                        "type": "PipelineReference"
                    },
                    "waitOnCompletion": true
                 }
            }
        ],
        "parameters": [
            {
                "mySourceDatasetFolderPath": {
                    "type": "String"
                }
            }
        ]
    }
}

Свойства типа

Свойство Описание Допустимые значения Обязательное поле
имя Имя действия исполнения конвейера. Строка Да
тип Должно иметь значение ExecutePipeline. Строка Да
конвейер Ссылка на зависимый конвейер, который вызывает этот конвейер. Объект ссылки конвейера имеет два свойства: referenceName и type. Свойство referenceName указывает имя эталонного конвейера. Для свойства type необходимо задать значение PipelineReference. PipelineReference Да
параметры Параметры для передачи в вызванный конвейер Объект JSON, сопоставляющий имена параметров со значениями аргументов Нет
waitOnCompletion Определяет, будет ли при выполнении действия ожидаться завершение выполнения зависимого конвейера. Значение по умолчанию — true. Логический Нет

Пример

В этом сценарии есть два конвейера:

  • Главный конвейер. Этот конвейер содержит одно действие выполнения конвейера, вызывающее вызванный конвейер. Главный конвейер принимает два параметра: masterSourceBlobContainer, masterSinkBlobContainer.
  • Вызванный конвейер — в этом конвейере есть одна операция копирования, которая копирует данные из источника Azure Blob в приемник Azure Blob. Вызванный конвейер принимает два параметра: sourceBlobContainer, sinkBlobContainer.

Определение главного конвейера

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ExecutePipeline",
        "typeProperties": {
          "pipeline": {
            "referenceName": "invokedPipeline",
            "type": "PipelineReference"
          },
          "parameters": {
            "sourceBlobContainer": {
              "value": "@pipeline().parameters.masterSourceBlobContainer",
              "type": "Expression"
            },
            "sinkBlobContainer": {
              "value": "@pipeline().parameters.masterSinkBlobContainer",
              "type": "Expression"
            }
          },
          "waitOnCompletion": true
        },
        "name": "MyExecutePipelineActivity"
      }
    ],
    "parameters": {
      "masterSourceBlobContainer": {
        "type": "String"
      },
      "masterSinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

Определение вызываемого конвейера

{
  "name": "invokedPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "name": "CopyBlobtoBlob",
        "inputs": [
          {
            "referenceName": "SourceBlobDataset",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sinkBlobDataset",
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

Связанная служба

{
    "name": "BlobStorageLinkedService",
    "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*****"
    }
  }
}

Исходный набор данных

{
    "name": "SourceBlobDataset",
    "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sourceBlobContainer",
        "type": "Expression"
      },
      "fileName": "salesforce.txt"
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

Набор данных приемника

{
    "name": "sinkBlobDataset",
    "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sinkBlobContainer",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

Запуск конвейера

Чтобы запустить главный конвейер в этом примере, для параметров masterSourceBlobContainer и masterSinkBlobContainer нужно передать следующие значения:

{
  "masterSourceBlobContainer": "executetest",
  "masterSinkBlobContainer": "executesink"
}

Главный конвейер переадресовывает эти значения в вызванный конвейер, как показано в следующем примере:

{
    "type": "ExecutePipeline",
    "typeProperties": {
      "pipeline": {
        "referenceName": "invokedPipeline",
        "type": "PipelineReference"
      },
      "parameters": {
        "sourceBlobContainer": {
          "value": "@pipeline().parameters.masterSourceBlobContainer",
          "type": "Expression"
        },
        "sinkBlobContainer": {
          "value": "@pipeline().parameters.masterSinkBlobContainer",
          "type": "Expression"
        }
      },

      ....
}

Предупреждение

Выполнение действия конвейера передает параметр массива в виде строки дочернему конвейеру. Это связано с тем, что полезные данные передаются из родительского конвейера дочернему >объекту в виде строки. Мы можем увидеть это, когда проверяем входные данные, переданные в дочерний конвейер. Дополнительные сведения см. в этом разделе .

Ознакомьтесь с другими поддерживаемыми действиями потока управления: