Поделиться через


Что такое "Устойчивые функции"?

Устойчивые функции — это функция Функции Azure, которая позволяет создавать функции с отслеживанием состояния в бессерверной вычислительной среде. Расширение позволяет задавать рабочие процессы с отслеживанием состояния, используя функции оркестратора, и сущности с отслеживанием состояния, используя функции сущностей на основе модели программирования Azure Functions. В фоновом режиме расширение автоматически управляет состоянием, создает контрольные точки и перезагружается, позволяя вам сосредоточиться на бизнес-логике.

Поддерживаемые языки

Durable Functions предназначены для работы со всеми языками программирования Azure Functions, но могут иметь разные минимальные требования для каждого языка. В следующей таблице показаны минимальные поддерживаемые конфигурации приложений:

Языковой стек Версии среды выполнения Функций Azure Версия для языковых специалистов Минимальная версия пакетов
.NET / C# / F# Функции 1.0 и более поздней версии В процессе
Внепроцессный
Н/Д
JavaScript/TypeScript (модель версии 3) Функции 2.0 и более поздней версии Node 8 и более поздней версии Пакеты 2.x
JavaScript/TypeScript (четвертая версия программной модели) Функции 4.25+ Node 18+ 3.15+ пакеты
Python Функции 2.0 и более поздней версии Python 3.7 и более поздней версии Пакеты 2.x
Python (программная модель версии 2) Функции 4.0+ Python 3.7 и более поздней версии 3.15+ пакеты
PowerShell Функции 3.0 и более поздней версии PowerShell 7 и более поздней версии Пакеты 2.x
Java Функции 4.0+ Java 8 и более поздней версии Пакеты 4.х

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Node.js. Модель версии 4 общедоступна и предназначена для более гибкого и интуитивно понятного интерфейса для разработчиков JavaScript и TypeScript. Дополнительные сведения о том, как работает модель версии 4, см. в руководстве разработчика Azure Functions для Node.js. Дополнительные сведения о различиях между версиями 3 и 4 см. в руководстве по миграции.

Внимание

В этой статье используются вкладки для поддержки нескольких версий модели программирования Python. Модель версии 2 общедоступна и предназначена для более ориентированного на код способа разработки функций с помощью декораторов. Дополнительные сведения о модели версии 2 см. в руководстве разработчика для функций Azure.

Как и в случае с Функциями Azure, существуют шаблоны для разработки Устойчивых функций с помощью Visual Studio, Visual Studio Code и портала Azure.

Шаблоны приложений

Основной вариант использования Устойчивых функций — это упрощение комплексных требований координации с отслеживанием состояния в безсерверных приложениях. В следующих разделах описываются типичные шаблоны приложений, в которых можно эффективно использовать Устойчивые функции.

Шаблон 1. Цепочка функций

В шаблоне цепочки функций последовательность функций выполняется в определенном порядке. В этом шаблоне выходные данные одной функции применяются к входным данным другой. Использование очередей между каждой функцией гарантирует, что система остается устойчивой и масштабируемой, даже если есть поток управления от одной функции к следующей.

Схема шаблона цепочки функций.

Устойчивые функции можно использовать для реализации шаблона цепочки функций, как показано в следующем примере.

В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую ​​как условные обозначения и циклы. Вы можете включить логику обработки ошибок в блоках try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

Вы можете использовать параметр context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает await, платформа Durable Functions контролирует выполнение текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова await. Дополнительные сведения см. в следующем разделе: Шаблон #2: Фан-out/fan-in.

В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую ​​как условные обозначения и циклы. Вы можете включить логику обработки ошибок в блоках try/catch/finally.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

Вы можете использовать объект context.df для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова yield. Дополнительные сведения см. в следующем разделе: Шаблон #2: Фан-out/fan-in.

Примечание.

Объект context в JavaScript представляет весь контекст функции. Доступ к контексту Устойчивых функций с помощью свойства df в основном контексте.

В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую ​​как условные обозначения и циклы.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

Вы можете использовать объект context для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает yield, платформа Durable Functions фиксирует прогресс текущего экземпляра функции. Если процесс или виртуальная машина повторно запускается в середине выполнения, экземпляр функции возобновляется с предыдущего вызова yield. Дополнительные сведения см. в следующем разделе: Шаблон #2: Фан-out/fan-in.

Примечание.

Объект context в Python представляет контекст оркестрации. Получите доступ к основному контексту Функций Azure, используя свойство function_context в контексте оркестрации.

В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую ​​как условные обозначения и циклы.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

Вы можете использовать команду Invoke-DurableActivity для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Каждый раз, когда код вызывает Invoke-DurableActivity без переключателя NoWait, среда Устойчивых функций фиксирует прогресс выполнения текущего экземпляра функции. Если процесс или виртуальная машина перезапускается во время выполнения, экземпляр функции возобновляется из предыдущего вызова Invoke-DurableActivity. Дополнительную информацию см. в следующем разделе: Шаблон №2: Fan-out/Fan-in.

В этом примере значения F1, F2, F3 и F4 являются именами других функций в том же приложении-функции. Поток управления можно реализовать с помощью обычных принудительных конструкций программирования. Код выполняется сверху вниз. Код может включать в себя имеющуюся семантику языка потока управления, такую ​​как условные обозначения и циклы.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

Вы можете использовать объект ctx для вызова других функций по имени, передачи параметров и возврата выходных данных функции. Выходные данные этих методов — это Task<V> объект, в котором V тип данных, возвращаемых вызываемой функцией. Каждый раз, когда вызывается Task<V>.await(), платформа Устойчивых функций создает контрольные точки выполнения текущего экземпляра функции. Если процесс неожиданно перезапускается во время выполнения, экземпляр функции возобновляется с момента предыдущего вызова Task<V>.await(). Дополнительные сведения см. в следующем разделе: Шаблон #2: Фан-out/fan-in.

Шаблон #2: разветвление/сведение

В шаблоне распараллеливания и объединения выполняется несколько функций параллельно, и затем ожидаем завершения всех функций. Часто выполняется некоторая работа по агрегированию над результатами, которые возвращают функции.

Схема шаблона fan-out/fan-in.

При использовании обычных функций вы можете расширить операции, отправив функцией несколько сообщений в очередь. Вернуться обратно гораздо сложнее. Чтобы выполнить объединение в обычной функции, напишите код для отслеживания момента, когда функции, активируемые очередью, завершаются, а затем их выходные значения сохраняются.

Расширение "Устойчивые функции" обрабатывает этот шаблон с помощью относительно простого кода:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

Процесс развертывания распределяется по нескольким экземплярам функции F2. Работа отслеживается с помощью динамического списка задач. Task.WhenAll вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове await к Task.WhenAll, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

Работа по расщеплению распределяется по нескольким экземплярам функции F2. Работа отслеживается с использованием динамического списка задач. context.df.Task.all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове yield к context.df.Task.all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

Процесс развертывания распределяется по нескольким экземплярам функции F2. Работа отслеживается с использованием динамического списка задач. context.task_all API вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове yield к context.task_all, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

Процесс развертывания распределяется по нескольким экземплярам функции F2. Обратите внимание, что параметр NoWait при вызове функции F2 позволяет оркестратору продолжать вызов F2 без ожидания завершения действия. Работа отслеживается с использованием динамического списка задач. Команда Wait-ActivityFunction вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из списка динамических задач и передаются функции F3.

Автоматическое создание контрольных точек, которое происходит при вызове Wait-ActivityFunction, гарантирует, что возможный сбой или перезагрузка во время выполнения не потребуют перезапуска уже завершенной задачи.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

Процесс развертывания распределяется по нескольким экземплярам функции F2. Работа отслеживается с использованием динамического списка задач. ctx.allOf(parallelTasks).await() вызывается для ожидания завершения всех вызванных функций. Затем выходные данные функции F2 агрегируются из динамического списка задач и возвращаются в виде выходных данных функции оркестратора.

Автоматическое создание контрольных точек, которое происходит при вызове .await() на ctx.allOf(parallelTasks), гарантирует, что неожиданная перезагрузка процесса не потребует перезапуска уже завершенных задач.

Примечание.

В редких случаях может произойти сбой в окне после завершения функции действия, но до того, как ее выполнение будет сохранено в журнале оркестрации. Если это произойдет, функция действия будет повторно запускаться с самого начала после восстановления процесса.

Шаблон 3. Асинхронные API-интерфейсы HTTP

Шаблон асинхронного API HTTP решает проблему координации состояния длительных операций с внешними клиентами. Распространенный способ реализации этого шаблона заключается в том, что конечная точка HTTP запускает длительное действие. Затем перенаправьте клиент на конечную точку состояния, которую клиент опрашивает для получения сведений о завершении операции.

Схема, показывающая шаблон API HTTP.

Устойчивые функции предоставляют встроенную поддержку для этого шаблона, упрощая или даже удаляя код, который вам нужно написать для взаимодействия с длительными выполнениями функций. Например, примеры краткого руководства по Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell и Java) показывают простую команду REST, которую можно использовать для запуска новых экземпляров функций оркестратора. После запуска экземпляра расширение предоставляет HTTP API веб-перехватчика, которые запрашивают состояние функции оркестратора.

В следующем примере показаны команды REST, которые запускают оркестратор и запрашивают его состояние. Для ясности в примере опущены некоторые данные протокола.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Так как управление состоянием осуществляется средой выполнения Устойчивых функций, вам не нужно реализовывать собственный механизм отслеживания состояния.

Расширение "Устойчивые функции" предоставляет встроенные API HTTP, которые управляют длительными оркестрациями. Кроме того, этот шаблон можно реализовать самостоятельно с помощью собственных триггеров функций (например, HTTP, очереди или Центры событий Azure) и устойчивой привязки клиента. Например, для активации действия прекращения можно использовать сообщение очереди. Кроме того, можно использовать триггер HTTP, защищенный политикой проверки подлинности Microsoft Entra, вместо встроенных API HTTP, использующих созданный ключ для проверки подлинности.

Дополнительные сведения см. в статье о функциях HTTP, в которой объясняется, как можно предоставлять асинхронные длительные процессы по протоколу HTTP с помощью расширения "Устойчивые функции".

Шаблон #4. Монитор

Шаблон мониторинга представляет собой гибкий, повторяющийся процесс в рабочем процессе. Например, повторение опроса, пока не будут выполнены определенные условия. Вы можете использовать регулярный триггер таймера для решения базового сценария, например периодической очистки, но его интервал является статическим, что усложняет управление временем жизни экземпляра. Вы можете использовать Устойчивые функции, чтобы создать гибкие интервалы повторения, управлять временем существования задачи и создавать несколько процессов мониторинга в рамках одной оркестрации.

Примером шаблона мониторинга является измененный сценарий асинхронных API-интерфейсов HTTP, приведенный ранее. Вместо предоставления конечной точки внешнему клиенту для мониторинга длительной операции, мониторинг использует внешнюю конечную точку, а затем ожидает изменение состояния.

Схема, показывющая шаблон монитора.

Благодаря нескольким строкам кода можно использовать Устойчивые функции, чтобы создать несколько мониторингов, которые наблюдают за произвольными конечными точками. Мониторинг может завершиться, если выполнено определенное условие, или другая функция может использовать надежный клиент оркестрации для остановки мониторинга. Можно изменить интервал мониторинга wait в зависимости от определенного условия (например, экспоненциальной задержки).

В следующем коде реализуется простой мониторинг:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

При получении запроса создается новый экземпляр оркестрации для указанного идентификатора задания. Экземпляр опрашивает состояние до тех пор, пока не будет выполнено условие или не истекает время ожидания. Для управления интервалом опроса используется устойчивый таймер. После этого можно перейти к другим задачам или завершить оркестрацию.

Шаблон 5: Взаимодействие с человеком

Многие автоматизированные процессы требуют некоторого участия пользователя. Трудность вовлечения людей в автоматизированный процесс заключается в том, что пользователи не так доступны и не так быстро реагируют, как облачные службы. Автоматизированный процесс может осуществить это взаимодействие, используя тайм-ауты и логику компенсации.

Процесс утверждения является примером бизнес-процесса, который предполагает взаимодействие с пользователями. Утверждение от менеджера может потребоваться для отчета о расходах, превышающих определенную денежную сумму. Если менеджер не утверждает отчет о расходах в течение 72 часов (возможно, менеджер ушёл в отпуск), процесс эскалации вступает в силу, чтобы получить утверждение от кого-то другого (возможно, руководителя менеджера).

Схема шаблона взаимодействия с человеком.

Этот шаблон в примере можно реализовать с помощью функции оркестратора. Оркестратор использует устойчивый таймер, чтобы запросить одобрение. Оркестратор увеличивает масштаб действий при истечении времени ожидания. Оркестратор ожидает внешнее событие, такое как уведомление, созданное в результате взаимодействия человека.

В этих примерах создается процесс утверждения для демонстрации шаблона участия пользователя:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Для создания устойчивого таймера вызовите context.CreateTimer. Уведомление получает context.WaitForExternalEvent. Затем Task.WhenAny решает, следует ли выполнять эскалацию (если сначала происходит тайм-аут) или обрабатывать утверждение (если утверждение получено до истечения тайм-аута).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Для создания устойчивого таймера вызовите context.df.createTimer. Уведомление получает context.df.waitForExternalEvent. Затем context.df.Task.any вызывается, чтобы решить, следует ли выполнять эскалацию, если сначала происходит истечение времени ожидания, либо обрабатывать одобрение, если оно получено до истечения времени ожидания.

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Для создания устойчивого таймера вызовите context.create_timer. Уведомление получает context.wait_for_external_event. Затем вызывается context.task_any для решения, следует ли выполнять эскалацию (в случае, если время ожидания истекло первым) или обрабатывать утверждение (если утверждение получено до истечения времени ожидания).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Для создания устойчивого таймера вызовите Start-DurableTimer. Уведомление получает Start-DurableExternalEventListener. Затем вызывается функция Wait-DurableTask для определения, следует ли проводить эскалацию (тайм-аут происходит сначала) или обрабатывать утверждение (утверждение получено до истечения тайм-аута).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

Вызов метода ctx.waitForExternalEvent(...).await() приостанавливает оркестрацию, пока он не получит событие с именем ApprovalEvent, содержащее полезные данные boolean. При получении события вызывается функция выполнения задачи для обработки результата проверки. Однако, если такое событие не получено до истечения срока в timeout (72 часа), возникает TaskCanceledException и вызывается функция Escalate активности.

Примечание.

Плата за время ожидания внешних событий при выполнении плана потребления не взимается.

Внешний клиент может доставить уведомление о событии в функцию оркестратора, находящуюся в состоянии ожидания, через встроенные интерфейсы API HTTP:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Событие также можно вызвать с помощью клиента устойчивой оркестрации из другой функции в том же приложении функций.

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Шаблон #6: Агрегатор (объекты с отслеживанием состояния)

Шестой шаблон заключается в статистической обработке данных событий за определенный период времени в одну доступную для адресации сущность. В этом шаблоне агрегированные данные могут поступать из нескольких источников, доставляться в пакеты или быть разбросаны в течение длительного периода времени. Агрегатор может потребоваться принять меры по обработке данных событий по мере поступления, а внешним клиентам может потребоваться запрашивать агрегированные данные.

Схема, показывющая агрегатор.

Сложности при реализации этого шаблона с обычными функциями без отслеживания состояния заключаются в том, что управление параллелизмом крайне усложняется. Вы должны не только беспокоиться о нескольких потоках, изменяющих одни и те же данные одновременно, но также убедиться в том, что агрегатор выполняется только на одной виртуальной машине одновременно.

Устойчивые сущности можно использовать, чтобы легко реализовать этот шаблон как отдельную функцию.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Устойчивые сущности можно также моделировать как классы в .NET. Эта модель может быть полезной, если список операций является фиксированным и становится большим. Следующий пример представляет собой эквивалентную реализацию сущности Counter с помощью классов и методов .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Примечание.

Устойчивые сущности в настоящее время не поддерживаются в PowerShell.

Примечание.

В настоящее время в Java не поддерживаются устойчивые сущности.

Клиенты могут ставить в очередь операции для функции сущности (также известное как сигнализирование), используя привязку клиента сущности.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Примечание.

Динамически создаваемые прокси-серверы также доступны в .NET для сигнализации сущностям типобезопасным способом. Кроме сигнализации, клиенты также могут запрашивать состояние функции сущности с помощью типобезопасных методов в привязке клиента оркестрации.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Функции сущностей доступны в Durable Functions 2.0 и более поздних версиях для C#, JavaScript и Python.

Технология

Расширение "Устойчивые функции" создано на платформе устойчивых задач, библиотеки с открытым кодом на GitHub для создания рабочих процессов в коде. Подобно тому, как Функции Azure являются бессерверным развитием веб-заданий Azure, Устойчивые функции — это бессерверное развитие платформы устойчивых задач. Платформа устойчивых задач широко используется как корпорацией Майкрософт, так и другими организациями для автоматизации критически важных процессов. Это естественный выбор для бессерверной среды функций Azure.

Ограничения кода

Чтобы обеспечить надежность и гарантировать длительное выполнение, в функциях оркестратора предусмотрен набор правил кода, которым нужно следовать. Дополнительные сведения см. в разделе "Ограничения кода функции Orchestrator".

Выставление счетов

Устойчивые функции оплачиваются так же, как и функции Azure. Дополнительные сведения см. на странице цен на Функции Azure. При выполнении функций оркестратора в плане потребления функций Azure существуют некоторые варианты выставления счетов, которые следует учитывать. Дополнительные сведения об этом см. в статье Durable Functions Billing (Выставление счетов за Устойчивые функции).

Погружайтесь прямо сейчас

Вы можете начать работу с Устойчивыми функциями менее чем за 10 минут, просмотрев одно из следующих кратких руководств по конкретным языкам.

В этих кратких руководствах вы локально создаете и тестируете долговечную функцию Hello world. Затем вы опубликуете код функции в Azure. Функция, которую вы создаете, организовывает и объединяет в цепочку вызовы других функций.

Публикации

"Durable Functions" разработано в сотрудничестве с Microsoft Research. Команда, работающая над Устойчивыми функциями, активно создает исследовательские статьи и артефакты, включая следующее:

Демонстрация видео

В следующем видео показаны преимущества Устойчивых функций.

Другие параметры оркестрации

Устойчивые функции — это расширенное расширение для функций Azure и может не соответствовать всем приложениям. Сравнение с другими технологиями оркестрации Azure см. в разделе Сравнение служб "Функции Azure" и Azure Logic Apps.