Программный мониторинг Azure Data Factory

ПРИМЕНИМО К: Azure Data Factory Azure Synapse Analytics

Совет

Data Factory в Microsoft Fabric — это следующее поколение Azure Data Factory с более простой архитектурой, встроенным ИИ и новыми функциями. Если вы не знакомы с интеграцией данных, начните с Fabric Data Factory. Существующие рабочие нагрузки ADF могут обновляться до Fabric для доступа к новым возможностям в области обработки и анализа данных, аналитики в режиме реального времени и отчетов.

В этой статье описывается как отслеживать конвейер в фабрике данных с помощью различных пакетов средств разработки программного обеспечения (пакетов SDK).

Примечание.

Мы рекомендуем использовать модуль Az PowerShell Azure для взаимодействия с Azure. Сведения о начале работы см. в разделе Install Azure PowerShell. Сведения о миграции в модуль Az PowerShell см. в статье Migrate Azure PowerShell из AzureRM в Az.

Диапазон данных

В фабрике данных данные запуска конвейеров сохраняются только в течение 45 дней. При программном запросе сведений о запусках конвейеров фабрики данных - например, с помощью команды PowerShell Get-AzDataFactoryV2PipelineRun - максимальные даты для необязательных параметров LastUpdatedAfter и LastUpdatedBefore отсутствуют. Однако если вы запросите данные за прошлый год, например, вы получите не ошибку, а лишь данные о запуске конвейера за последние 45 дней.

Если вы хотите сохранить данные запуска конвейера более 45 дней, настройте собственное ведение журнала диагностики с помощью Azure Monitor.

Сведения о запуске конвейера

Для свойств запуска конвейера данных см. справочник по API PipelineRun. Конвейер имеет разные статусы в течение своего жизненного цикла, возможные значения статусов запуска перечислены ниже:

  • В очереди
  • InProgress
  • Выполнено успешно
  • Неудачно
  • Отмена
  • Отменено

.NET

Полное пошаговое руководство по созданию и мониторингу конвейера с использованием .NET SDK см. в статье Создание фабрики данных и конвейера с использованием .NET.

  1. Добавьте следующий код, чтобы постоянно проверять состояние выполнения конвейера до завершения копирования данных.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId);
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Добавьте следующий код, извлекающий сведения о выполнении действия копирования, например размер записанных и прочитанных данных.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10));
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams);
    if (pipelineRun.Status == "Succeeded")
        Console.WriteLine(queryResponse.Value.First().Output);
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Полную документацию по .NET SDK см. в справочнике Data Factory .NET SDK reference.

Python

Полное пошаговое руководство по созданию и мониторингу конвейера с помощью пакета SDK Python см. в разделе Создание фабрики данных и конвейера с помощью Python.

Для отслеживания работы конвейера добавьте следующий код.

# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
    rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
    last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
    rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])

Полную документацию по пакету SDK Python см. в справочнике по SDK Python Data Factory.

REST API

Полное пошаговое руководство по созданию и мониторингу конвейера с помощью REST API см. в разделе Создание фабрики данных и конвейера с помощью REST API.

  1. Запустите следующий скрипт, чтобы непрерывно проверять состояние выполнения работы до тех пор, пока не закончится копирование данных.

    $request = "https://management.azure.com/subscriptions/${subsId}/resourceGroups/${resourceGroup}/providers/Microsoft.DataFactory/factories/${dataFactoryName}/pipelineruns/${runId}?api-version=${apiVersion}"
    while ($True) {
        $response = Invoke-RestMethod -Method GET -Uri $request -Header $authHeader
        Write-Host  "Pipeline run status: " $response.Status -foregroundcolor "Yellow"
    
        if ( ($response.Status -eq "InProgress") -or ($response.Status -eq "Queued") ) {
            Start-Sleep -Seconds 15
        }
        else {
            $response | ConvertTo-Json
            break
        }
    }
    
  2. Запустите следующий скрипт, извлекающий сведения о выполнении действия копирования, например размер записанных и прочитанных данных.

    $request = "https://management.azure.com/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${factoryName}/pipelineruns/${runId}/queryActivityruns?api-version=${apiVersion}&startTime="+(Get-Date).ToString('yyyy-MM-dd')+"&endTime="+(Get-Date).AddDays(1).ToString('yyyy-MM-dd')+"&pipelineName=Adfv2QuickStartPipeline"
    $response = Invoke-RestMethod -Method POST -Uri $request -Header $authHeader
    $response | ConvertTo-Json
    

Полная документация по REST API приведена в справочнике по REST API фабрики данных.

PowerShell

Полное пошаговое руководство по созданию и мониторингу конвейера с помощью PowerShell см. в разделе Создание фабрики данных и конвейера с помощью PowerShell.

  1. Запустите следующий скрипт, чтобы непрерывно проверять состояние выполнения работы до тех пор, пока не закончится копирование данных.

    while ($True) {
        $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId
    
        if ($run) {
            if ( ($run.Status -ne "InProgress") -and ($run.Status -ne "Queued") ) {
                Write-Output ("Pipeline run finished. The status is: " +  $run.Status)
                $run
                break
            }
            Write-Output ("Pipeline is running...status: " + $run.Status)
        }
    
        Start-Sleep -Seconds 30
    }
    
  2. Запустите следующий скрипт, извлекающий сведения о выполнении действия копирования, например размер записанных и прочитанных данных.

    Write-Host "Activity run details:" -foregroundcolor "Yellow"
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    $result
    
    Write-Host "Activity 'Output' section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "\nActivity 'Error' section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n"
    

Полная документация по командлетам PowerShell приведена в справочнике по командлетам PowerShell для фабрики данных.

См. статью мониторинг конвейеров с помощью Azure Monitor, чтобы узнать об использовании Azure Monitor для мониторинга конвееров в Data Factory.