Condividi tramite


Informazioni su Durable Functions

Durable Functions è una funzionalità di Funzioni di Azure che consente di scrivere funzioni con stato in un ambiente di calcolo serverless. L'estensione permette di definire flussi di lavoro con stato, scrivendo funzioni dell'agente di orchestrazione, ed entità con stato, scrivendo funzioni di entità tramite il modello di programmazione di Funzioni di Azure. Dietro le quinte, l'estensione gestisce automaticamente lo stato, i checkpoint e i riavvii, consentendo di concentrarsi sulla logica di business.

Lingue supportate

Durable Functions è progettato per essere usato con tutti i linguaggi di programmazione di Funzioni di Azure, benché i requisiti minimi possano essere diversi per ogni linguaggio. La tabella seguente illustra le configurazioni minime supportate per l’app:

Stack linguaggio Versioni di runtime di Funzioni di Azure Versione per linguisti Versione minima dei pacchetti
.NET / C# / F# Funzioni 1.0+ In-Process
Out-of-process
n/d
JavaScript/TypeScript (modello prog v3) Funzioni 2.0+ Nodo 8+ Bundle 2.x
JavaScript/TypeScript (modello prog v4) Funzioni 4.25+ Nodo 18+ Bundle 3.15+
Python Funzioni 2.0+ Python 3.7+ Bundle 2.x
Python (modello prog v2) Funzioni 4.0+ Python 3.7+ Bundle 3.15+
PowerShell Funzioni 3.0+ PowerShell 7+ Bundle 2.x
Java Funzioni 4.0+ Java 8+ Bundle 4.x

Importante

Questo articolo usa schede per supportare le versioni diverse del modello di programmazione Node.js. Il modello v4 è disponibile a livello generale ed è progettato per offrire un'esperienza più flessibile e intuitiva per gli sviluppatori JavaScript e TypeScript. Per altre informazioni sul funzionamento del modello v4, vedere la guida per gli sviluppatori di Node.js per Funzioni di Azure. Altre informazioni sulle differenze tra i modelli v3 e v4 sono disponibili nella guida alla migrazione.

Importante

Questo articolo usa le schede per supportare le versioni diverse del modello di programmazione Python. Il modello v2 è disponibile a livello generale ed è progettato per offrire un modo più incentrato sul codice per la creazione di funzioni tramite elementi Decorator. Per altre informazioni sul modello v2, vedere la Guida per sviluppatori Python per Funzioni di Azure.

Come per Funzioni di Azure, sono disponibili modelli che permettono di sviluppare Durable Functions tramite Visual Studio, Visual Studio Code e il portale di Azure.

Modelli di applicazione

Durable Functions viene usato principalmente per semplificare i complessi requisiti di coordinamento con stato nelle applicazioni serverless. Le sezioni seguenti descrivono i tipici modelli di applicazione che possono trarre vantaggio da Durable Functions:

Modello 1: Concatenamento di funzioni

Nel modello di concatenamento delle funzioni una sequenza di funzioni viene eseguita in un ordine specifico. In questo modello l'output di una funzione viene applicato all'input di un'altra funzione. L'uso di code tra ogni funzione garantisce che il sistema rimanga durevole e scalabile, anche se esiste un flusso di controllo da una funzione alla successiva.

Diagramma del modello di concatenamento delle funzioni.

È possibile usare Durable Functions per implementare il modello di concatenamento di funzioni in modo conciso, come illustrato nell'esempio seguente.

In questo esempio, i valori F1, F2, F3 ed F4 sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso Il codice può coinvolgere la semantica esistente del flusso di controllo del linguaggio, come le istruzioni condizionali e i cicli. È possibile includere la logica di gestione degli errori nei in blocchi try/catch/finally.

[FunctionName("Chaining")]
public static async Task<object> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    try
    {
        var x = await context.CallActivityAsync<object>("F1", null);
        var y = await context.CallActivityAsync<object>("F2", x);
        var z = await context.CallActivityAsync<object>("F3", y);
        return  await context.CallActivityAsync<object>("F4", z);
    }
    catch (Exception)
    {
        // Error handling or compensation goes here.
    }
}

È possibile usare il parametro context per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama await, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riavviato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata await precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan-out/fan-in.

In questo esempio, i valori F1, F2, F3 ed F4 sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso Il codice può coinvolgere la semantica esistente del flusso di controllo del linguaggio, come le istruzioni condizionali e i cicli. È possibile includere la logica di gestione degli errori nei in blocchi try/catch/finally.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    try {
        const x = yield context.df.callActivity("F1");
        const y = yield context.df.callActivity("F2", x);
        const z = yield context.df.callActivity("F3", y);
        return    yield context.df.callActivity("F4", z);
    } catch (error) {
        // Error handling or compensation goes here.
    }
});

È possibile usare l'oggetto context.df per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama yield, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata yield precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan-out/fan-in.

Nota

L'oggetto context in JavaScript rappresenta l'intero contesto della funzione. Accedere al contesto di Durable Functions usando la proprietà df nel contesto principale.

In questo esempio, i valori F1, F2, F3 ed F4 sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso Il codice può implicare la semantica del flusso di controllo esistente del linguaggio, come istruzioni condizionali e cicli.

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)

È possibile usare l'oggetto context per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama yield, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata yield precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan-out/fan-in.

Nota

L'oggetto context in Python rappresenta il contesto dell'orchestrazione. Accedere al contesto principale di Funzioni di Azure usando la proprietà function_context nel contesto di orchestrazione.

In questo esempio, i valori F1, F2, F3 ed F4 sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso Il codice può coinvolgere la semantica del flusso di controllo esistente del linguaggio, come istruzioni condizionali e cicli.

param($Context)

$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z

È possibile usare il comando Invoke-DurableActivity per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. Ogni volta che il codice chiama Invoke-DurableActivity senza l'opzione NoWait, il framework di Durable Functions imposta checkpoint sullo stato di avanzamento dell'istanza della funzione corrente. Se la VM o il processo viene riciclato durante l'esecuzione, l'istanza della funzione riprende dalla chiamata Invoke-DurableActivity precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan-out/fan-in.

In questo esempio, i valori F1, F2, F3 ed F4 sono i nomi di altre funzioni nella stessa app per le funzioni. Il flusso di controllo può essere implementato usando normali costrutti di scrittura del codice imperativa. Il codice viene eseguito dall'alto verso il basso Il codice può includere la semantica esistente del flusso di controllo del linguaggio, come le istruzioni condizionali e i cicli.

@FunctionName("Chaining")
public double functionChaining(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    String input = ctx.getInput(String.class);
    int x = ctx.callActivity("F1", input, int.class).await();
    int y = ctx.callActivity("F2", x, int.class).await();
    int z = ctx.callActivity("F3", y, int.class).await();
    return  ctx.callActivity("F4", z, double.class).await();
}

È possibile usare l'oggetto ctx per richiamare altre funzioni tramite il nome, i parametri passati e l'output restituito dalla funzione. L'output di questi metodi è un oggetto Task<V> in cui V è il tipo di dati restituiti dalla funzione richiamata. Ogni volta che il codice chiama Task<V>.await(), il framework di Durable Functions controlla lo stato di avanzamento dell'istanza della funzione corrente tramite checkpoint. Se il processo viene riciclato in modo imprevisto a metà dell'esecuzione, l'istanza della funzione riprende dalla chiamata Task<V>.await() precedente. Per altre informazioni, vedere la sezione successiva Modello 2: Fan-out/fan-in.

Modello 2: Fan-out/fan-in

Nel modello fan-out/fan-in si eseguono più funzioni in parallelo e quindi si attende il completamento di tutte le funzioni. Alcune operazioni di aggregazione vengono spesso eseguite sui risultati restituiti dalle funzioni.

Diagramma del modello fan-out fan-in.

Con le normali funzioni il fan-out può essere effettuato facendo in modo che la funzione invii più messaggi a una coda. L'operazione di fan-in è molto più complessa. In una normale funzione si scrive codice per rilevare il completamento delle funzioni attivate dalla coda e quindi archiviarne l'output.

L'estensione Durable Functions gestisce questo modello con codice relativamente semplice:

[FunctionName("FanOutFanIn")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var parallelTasks = new List<Task<int>>();

    // Get a list of N work items to process in parallel.
    object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
        parallelTasks.Add(task);
    }

    await Task.WhenAll(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    int sum = parallelTasks.Sum(t => t.Result);
    await context.CallActivityAsync("F3", sum);
}

L'operazione di fan-out viene distribuita a più istanze della funzione F2. Il lavoro viene monitorato tramite un elenco dinamico di attività. Viene effettuata una chiamata a Task.WhenAll per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di await su Task.WhenAll assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

const df = require("durable-functions");

module.exports = df.orchestrator(function*(context) {
    const parallelTasks = [];

    // Get a list of N work items to process in parallel.
    const workBatch = yield context.df.callActivity("F1");
    for (let i = 0; i < workBatch.length; i++) {
        parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
    }

    yield context.df.Task.all(parallelTasks);

    // Aggregate all N outputs and send the result to F3.
    const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
    yield context.df.callActivity("F3", sum);
});

L'operazione di fan-out viene distribuita a più istanze della funzione F2. Il lavoro viene monitorato mediante un elenco dinamico di attività. Viene chiamata l'API context.df.Task.all per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di yield su context.df.Task.all assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Get a list of N work items to process in parallel.
    work_batch = yield context.call_activity("F1", None)

    parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]

    outputs = yield context.task_all(parallel_tasks)

    # Aggregate all N outputs and send the result to F3.
    total = sum(outputs)
    yield context.call_activity("F3", total)


main = df.Orchestrator.create(orchestrator_function)

L'operazione di fan-out viene distribuita a più istanze della funzione F2. Il lavoro viene monitorato utilizzando un elenco dinamico di attività. Viene chiamata l'API context.task_all per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di yield su context.task_all assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

param($Context)

# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'

$ParallelTasks =
    foreach ($WorkItem in $WorkBatch) {
        Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
    }

$Outputs = Wait-ActivityFunction -Task $ParallelTasks

# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total

L'operazione di fan-out viene distribuita a più istanze della funzione F2. Si noti l'utilizzo dell'opzione NoWait per la chiamata della funzione F2: questa opzione consente all'agente di orchestrazione di continuare a richiamare F2 senza attendere il completamento dell'attività. Il lavoro viene monitorato tramite un elenco dinamico di attività. Il comando Wait-ActivityFunction viene chiamato per attendere il completamento di tutte le funzioni chiamate. Quindi, gli output della funzione F2 vengono aggregati dall'elenco dinamico di attività e passati alla funzione F3.

L'impostazione automatica di checkpoint che avviene alla chiamata di Wait-ActivityFunction assicura che qualsiasi potenziale riavvio o arresto anomalo del sistema durante l'esecuzione non richieda il riavvio di un'attività già completata.

@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    // Get the list of work-items to process in parallel
    List<?> batch = ctx.callActivity("F1", List.class).await();

    // Schedule each task to run in parallel
    List<Task<Integer>> parallelTasks = batch.stream()
            .map(item -> ctx.callActivity("F2", item, Integer.class))
            .collect(Collectors.toList());

    // Wait for all tasks to complete, then return the aggregated sum of the results
    List<Integer> results = ctx.allOf(parallelTasks).await();
    return results.stream().reduce(0, Integer::sum);
}

L'operazione di fan-out viene distribuita a più istanze della funzione F2. Il lavoro viene tracciato utilizzando un elenco dinamico di attività. Viene effettuata una chiamata a ctx.allOf(parallelTasks).await() per attendere il completamento di tutte le funzioni chiamate. Gli output della funzione F2 vengono quindi aggregati dall'elenco di attività dinamico e restituiti come output della funzione di orchestrazione.

Il controllo tramite checkpoint automatico che si verifica al momento della chiamata .await() a ctx.allOf(parallelTasks) garantisce che un riciclo imprevisto del processo non richieda il riavvio delle attività già completate.

Nota

In rari casi, è possibile che si verifichi un arresto anomalo nella finestra dopo che una funzione di attività è stata completata, ma prima che il completamento venga salvato nella cronologia dell'orchestrazione. In questo caso, la funzione di attività verrà rieseguita dall'inizio dopo il ripristino del processo.

Modello 3: API HTTP asincrone

Il modello API HTTP asincrone risolve il problema di coordinare lo stato di operazioni a esecuzione prolungata con client esterni. Un modo comune per implementare questo modello è fare in modo che un endpoint HTTP attivi l'azione a esecuzione prolungata. Successivamente, reindirizza il client a un endpoint di stato che il client interroga per sapere quando l'operazione è terminata.

Diagramma che mostra il modello api HTTP.

Durable Functions offre supporto incorporato per questo modello, semplificando o anche rimuovendo il codice che è necessario scrivere per interagire con le funzioni a esecuzione prolungata. Gli esempi riportati nella guida introduttiva di Durable Functions (C#, JavaScript, TypeScript, Python, PowerShell e Java) mostrano un semplice comando REST che è possibile usare per avviare nuove istanze delle funzioni dell’agente di orchestrazione. Dopo l'avvio di un'istanza, l'estensione espone le API HTTP del webhook che eseguono query sullo stato della funzione di orchestrazione.

L'esempio seguente mostra i comandi REST per avviare un agente di orchestrazione e per eseguire query sul relativo stato. Per maggiore chiarezza, alcuni dettagli sono stati omessi dall'esempio.

> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec

{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}

> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json

{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}

Poiché lo stato è gestito automaticamente dal runtime di Durable Functions, non è necessario implementare un meccanismo personalizzato di monitoraggio.

L'estensione Durable Functions espone le API HTTP predefinite che gestiscono le orchestrazioni a esecuzione prolungata. In alternativa, è possibile implementare questo modello direttamente usando trigger di funzione personalizzati (ad esempio HTTP, una coda o Hub eventi di Azure) e il binding del client durevole. Ad esempio, è possibile usare un messaggio di coda per attivare la terminazione. In alternativa, è possibile usare un trigger HTTP protetto da un criterio di autenticazione di Microsoft Entra anziché le API HTTP predefinite che usano una chiave generata per l'autenticazione.

Per altre informazioni, vedere l'articolo sulle funzionalità HTTP, che spiega come esporre i processi asincroni a esecuzione prolungata su HTTP usando l'estensione Durable Functions.

Modello 4: Monitoraggio

Il modello di monitoraggio indica un processo flessibile e ricorrente in un workflow. Un esempio è il polling eseguito finché non vengono soddisfatte determinate condizioni. È possibile usare un normale trigger timer per gestire un semplice scenario, ad esempio un processo di pulizia periodico, ma l'intervallo è statico e la gestione delle durate delle istanze diventa complessa. È possibile usare Durable Functions per creare intervalli di ricorrenza flessibili, gestire la durata delle attività e creare più processi di monitoraggio da una singola orchestrazione.

Un esempio di modello di monitoraggio consiste nell'invertire lo scenario precedente delle API HTTP asincrone. Invece di esporre un endpoint per un client esterno per monitorare un'operazione a lunga durata, il monitor di lunga durata utilizza un endpoint esterno e poi attende un cambiamento di stato.

Diagramma che mostra il modello di monitoraggio.

In poche righe di codice, è possibile usare Durable Functions per creare più monitoraggi che osservano endpoint arbitrari. I monitoraggi possono terminare l'esecuzione quando viene soddisfatta una condizione oppure un'altra funzione può usare il cliente di orchestrazione durevole per terminarli. È possibile cambiare l'intervallo wait di un monitoraggio in base a una condizione specifica, ad esempio un backoff esponenziale.

Il codice seguente implementa un monitoraggio di base:

[FunctionName("MonitorJobStatus")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    int jobId = context.GetInput<int>();
    int pollingInterval = GetPollingInterval();
    DateTime expiryTime = GetExpiryTime();

    while (context.CurrentUtcDateTime < expiryTime)
    {
        var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
        if (jobStatus == "Completed")
        {
            // Perform an action when a condition is met.
            await context.CallActivityAsync("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
        await context.CreateTimer(nextCheck, CancellationToken.None);
    }

    // Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");

module.exports = df.orchestrator(function*(context) {
    const jobId = context.df.getInput();
    const pollingInterval = getPollingInterval();
    const expiryTime = getExpiryTime();

    while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
        const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
        if (jobStatus === "Completed") {
            // Perform an action when a condition is met.
            yield context.df.callActivity("SendAlert", jobId);
            break;
        }

        // Orchestration sleeps until this time.
        const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
        yield context.df.createTimer(nextCheck.toDate());
    }

    // Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    job = json.loads(context.get_input())
    job_id = job["jobId"]
    polling_interval = job["pollingInterval"]
    expiry_time = job["expiryTime"]

    while context.current_utc_datetime < expiry_time:
        job_status = yield context.call_activity("GetJobStatus", job_id)
        if job_status == "Completed":
            # Perform an action when a condition is met.
            yield context.call_activity("SendAlert", job_id)
            break

        # Orchestration sleeps until this time.
        next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
        yield context.create_timer(next_check)

    # Perform more work here, or let the orchestration end.


main = df.Orchestrator.create(orchestrator_function)
param($Context)

$output = @()

$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime

while ($Context.CurrentUtcDateTime -lt $expiryTime) {
    $jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
    if ($jobStatus -eq "Completed") {
        # Perform an action when a condition is met.
        $output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
        break
    }

    # Orchestration sleeps until this time.
    Start-DurableTimer -Duration $pollingInterval
}

# Perform more work here, or let the orchestration end.

$output
@FunctionName("Monitor")
public String monitorOrchestrator(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    JobInfo jobInfo = ctx.getInput(JobInfo.class);
    String jobId = jobInfo.getJobId();
    Instant expiryTime = jobInfo.getExpirationTime();

    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
        String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();

        // Perform an action when a condition is met
        if (status.equals("Completed")) {
            // send an alert and exit
            ctx.callActivity("SendAlert", jobId).await();
            break;
        }

        // wait N minutes before doing the next poll
        Duration pollingDelay = jobInfo.getPollingDelay();
        ctx.createTimer(pollingDelay).await();
    }

    return "done";
}

Quando viene ricevuta una richiesta, viene creata una nuova istanza di orchestrazione per quell'ID del lavoro. L'istanza esegue il polling di uno stato fino a quando non viene soddisfatta una condizione o fino alla scadenza di un timeout. Un timer durevole controlla l'intervallo di polling. È quindi possibile eseguire altre operazioni o terminare l'orchestrazione.

Modello 5: Interazione umana

Molti processi automatizzati implicano una qualche interazione umana. Il problema in un processo automatizzato con interazione umana è che le persone non sono sempre disponibili e reattive quanto i servizi cloud. Un processo automatizzato potrebbe consentire questa interazione usando timeout e logica di compensazione.

Un processo di approvazione è un esempio di processo aziendale che implica interazione umana. Potrebbe essere richiesta l'approvazione di un manager per una nota spese che supera un determinato importo. Se il responsabile non approva la nota spese entro 72 ore (ad esempio perché è in ferie), viene avviato un processo di escalation per ottenere l'approvazione da parte di qualcun altro, magari il suo diretto superiore.

Diagramma del modello di interazione umana.

È possibile implementare il modello in questo esempio usando una funzione di orchestrazione. L'orchestratore usa un timer durevole per richiedere l'approvazione. L'agente di orchestrazione esegue l'escalation in caso di timeout. L'agente di orchestrazione attende che si verifichi un evento esterno, ad esempio una notifica generata tramite interazione umana.

Questi esempi creano un processo di approvazione per illustrare il modello di interazione umana:

[FunctionName("ApprovalWorkflow")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    await context.CallActivityAsync("RequestApproval", null);
    using (var timeoutCts = new CancellationTokenSource())
    {
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);

        Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
        if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
        {
            timeoutCts.Cancel();
            await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}

Per creare il timer durevole, chiamare context.CreateTimer. La notifica viene ricevuta da context.WaitForExternalEvent. Viene quindi effettuata una chiamata a Task.WhenAny per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

const df = require("durable-functions");
const moment = require('moment');

module.exports = df.orchestrator(function*(context) {
    yield context.df.callActivity("RequestApproval");

    const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
    const durableTimeout = context.df.createTimer(dueTime.toDate());

    const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
    const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
    if (winningEvent === approvalEvent) {
        durableTimeout.cancel();
        yield context.df.callActivity("ProcessApproval", approvalEvent.result);
    } else {
        yield context.df.callActivity("Escalate");
    }
});

Per creare il timer durevole, chiamare context.df.createTimer. La notifica viene ricevuta da context.df.waitForExternalEvent. Viene quindi effettuata una chiamata a context.df.Task.any per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

import azure.durable_functions as df
import json
from datetime import timedelta 


def orchestrator_function(context: df.DurableOrchestrationContext):
    yield context.call_activity("RequestApproval", None)

    due_time = context.current_utc_datetime + timedelta(hours=72)
    durable_timeout_task = context.create_timer(due_time)
    approval_event_task = context.wait_for_external_event("ApprovalEvent")

    winning_task = yield context.task_any([approval_event_task, durable_timeout_task])

    if approval_event_task == winning_task:
        durable_timeout_task.cancel()
        yield context.call_activity("ProcessApproval", approval_event_task.result)
    else:
        yield context.call_activity("Escalate", None)


main = df.Orchestrator.create(orchestrator_function)

Per creare il timer durevole, chiamare context.create_timer. La notifica viene ricevuta da context.wait_for_external_event. Viene quindi effettuata una chiamata a context.task_any per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

param($Context)

$output = @()

$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId

$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId

$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait

$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any

if ($approvalEvent -eq $firstEvent) {
    Stop-DurableTimerTask -Task $durableTimeoutEvent
    $output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
    $output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}

$output

Per creare il timer durevole, chiamare Start-DurableTimer. La notifica viene ricevuta da Start-DurableExternalEventListener. Viene quindi effettuata una chiamata a Wait-DurableTask per stabilire se attivare l'escalation (si verifica prima il timeout) o elaborare l'approvazione (l'approvazione viene ricevuta prima del timeout).

@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
        @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
    ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
    ctx.callActivity("RequestApproval", approvalInfo).await();

    Duration timeout = Duration.ofHours(72);
    try {
        // Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
        boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
        approvalInfo.setApproved(approved);

        ctx.callActivity("ProcessApproval", approvalInfo).await();
    } catch (TaskCanceledException timeoutEx) {
        ctx.callActivity("Escalate", approvalInfo).await();
    }
}

La chiamata al metodo ctx.waitForExternalEvent(...).await() sospende l'orchestrazione fino a quando non riceve un evento denominato ApprovalEvent, che ha un payload boolean. Se l'evento viene ricevuto, viene chiamata una funzione di attività per elaborare il risultato dell'approvazione. Tuttavia, se non viene ricevuto alcun evento di questo tipo prima della timeout scadenza (72 ore), viene segnalato un TaskCanceledException e viene chiamata la funzione di attività Escalate.

Nota

Non è previsto alcun addebito per il tempo impiegato per l'attesa di eventi esterni durante l'esecuzione nel piano a consumo.

Un client esterno può recapitare la notifica degli eventi a una funzione dell'agente di orchestrazione in attesa usando le API HTTP predefinite:

curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"

Un evento può essere generato anche usando il client di orchestrazione durevole di un'altra funzione nella stessa app per le funzioni:

[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
    [HttpTrigger] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    bool isApproved = true;
    await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
const df = require("durable-functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const isApproved = true;
    await client.raiseEvent(instanceId, "ApprovalEvent", isApproved);
};
import azure.durable_functions as df


async def main(client: str):
    durable_client = df.DurableOrchestrationClient(client)
    is_approved = True
    await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"

@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
        @HttpTrigger(name = "instanceId") String instanceId,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    DurableTaskClient client = durableContext.getClient();
    client.raiseEvent(instanceId, "ApprovalEvent", true);
}

Modello 6: Aggregatore (entità con stato)

Il sesto modello riguarda l'aggregazione in una singola entità indirizzabile dei dati degli eventi relativi a un periodo di tempo. In questo modello, i dati aggregati possono provenire da più origini, possono essere recapitati in batch o possono essere sparsi per lunghi periodi di tempo. È possibile che l'aggregatore debba intervenire sui dati degli eventi non appena arrivano, mentre i client esterni potrebbero dover eseguire query sui dati aggregati.

Diagramma che mostra un aggregatore.

Se si prova a implementare questo modello con normali funzioni senza stato, l'aspetto più difficile è che il controllo della concorrenza diventa un problema enorme. Non solo è necessario preoccuparsi di più thread che modificano contemporaneamente gli stessi dati, ma è anche necessario preoccuparsi di garantire che l'aggregatore venga eseguito solo in una singola macchina virtuale alla volta.

È possibile usare entità durevoli per implementare facilmente questo modello come una singola funzione.

[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    int currentValue = ctx.GetState<int>();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            int amount = ctx.GetInput<int>();
            ctx.SetState(currentValue + amount);
            break;
        case "reset":
            ctx.SetState(0);
            break;
        case "get":
            ctx.Return(currentValue);
            break;
    }
}

Le entità durevoli possono anche essere modellate come classi in .NET. Questo modello può rivelarsi utile se l'elenco delle operazioni è fisso e diventa grande. L'esempio seguente è un'implementazione equivalente dell'entità Counter tramite classi e metodi .NET.

public class Counter
{
    [JsonProperty("value")]
    public int CurrentValue { get; set; }

    public void Add(int amount) => this.CurrentValue += amount;

    public void Reset() => this.CurrentValue = 0;

    public int Get() => this.CurrentValue;

    [FunctionName(nameof(Counter))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
        => ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");

module.exports = df.entity(function(context) {
    const currentValue = context.df.getState(() => 0);
    switch (context.df.operationName) {
        case "add":
            const amount = context.df.getInput();
            context.df.setState(currentValue + amount);
            break;
        case "reset":
            context.df.setState(0);
            break;
        case "get":
            context.df.return(currentValue);
            break;
    }
});
import azure.functions as func
import azure.durable_functions as df


def entity_function(context: df.DurableOrchestrationContext):

    current_value = context.get_state(lambda: 0)
    operation = context.operation_name
    if operation == "add":
        amount = context.get_input()
        current_value += amount
        context.set_result(current_value)
    elif operation == "reset":
        current_value = 0
    elif operation == "get":
        context.set_result(current_value)

    context.set_state(current_value)

main = df.Entity.create(entity_function)

Nota

Le entità durevoli non sono attualmente supportate in PowerShell.

Nota

Le entità durevoli non sono attualmente supportate in Java.

I client possono accodare operazioni (anche nota come segnalazione) per una funzione di entità tramite il binding del client di entità.

[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
    [EventHubTrigger("device-sensor-events")] EventData eventData,
    [DurableClient] IDurableEntityClient entityClient)
{
    var metricType = (string)eventData.Properties["metric"];
    var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);

    // The "Counter/{metricType}" entity is created on-demand.
    var entityId = new EntityId("Counter", metricType);
    await entityClient.SignalEntityAsync(entityId, "add", delta);
}

Nota

In .NET sono anche disponibili proxy generati dinamicamente per la segnalazione di entità in modo indipendente dai tipi. Oltre alla segnalazione, i client possono anche eseguire query per ottenere lo stato di una funzione di entità tramite metodi indipendenti dai tipi sul binding del client di orchestrazione.

const df = require("durable-functions");
const { app } = require("@azure/functions");

module.exports = async function (context) {
    const client = df.getClient(context);
    const entityId = new df.EntityId("Counter", "myCounter");
    await client.signalEntity(entityId, "add", 1);
};
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("Counter", "myCounter")
    instance_id = await client.signal_entity(entity_id, "add", 1)
    return func.HttpResponse("Entity signaled")

Le funzioni dell’entità sono disponibili in Durable Functions 2.0 e versioni successive per C#, JavaScript e Python.

La tecnologia

L'estensione Durable Functions è basata su Durable Task Framework, una libreria open source disponibile in GitHub usata per creare flussi di lavoro nel codice. Allo stesso modo in cui le Funzioni di Azure rappresentano l'evoluzione serverless dei WebJobs di Azure, le Durable Functions rappresentano l'evoluzione serverless del Durable Task Framework. In Microsoft e in altre organizzazioni il sistema Durable Task Framework viene ampiamente usato per automatizzare i processi cruciali. È una scelta ideale per l'ambiente Azure Functions senza server.

Vincoli di codice

Per offrire garanzie di esecuzione affidabili e a esecuzione prolungata, le funzioni dell'agente di orchestrazione prevedono un set di regole di codifica che devono essere rispettate. Per altre informazioni, vedere Vincoli di codice della funzione di Orchestrator.

Fatturazione

Le Durable Functions vengono fatturate allo stesso modo delle Funzioni di Azure. Per ulteriori informazioni, vedere Prezzi di Azure Functions. Quando si eseguono funzioni dell'agente di orchestrazione nel piano a consumo di Funzioni di Azure, è necessario tenere presenti alcuni comportamenti di fatturazione. Per ulteriori informazioni su questi comportamenti, vedere l'articolo sulla fatturazione di Durable Functions.

Per iniziare immediatamente

È possibile iniziare a usare Durable Functions in meno di 10 minuti completando una di queste esercitazioni introduttive specifiche del linguaggio:

In questi argomenti di avvio rapido viene creata e testata in locale una funzione durable Hello world. Il codice della funzione verrà quindi pubblicato in Azure. La funzione creata orchestra e concatena le chiamate ad altre funzioni.

Pubblicazioni

Durable Functions è sviluppato in collaborazione con Microsoft Research. Di conseguenza, il team di Durable Functions produce attivamente documenti di ricerca e artefatti; tra cui:

Video dimostrativo

Il video seguente illustra i vantaggi di Durable Functions:

Altre opzioni di orchestrazione

Durable Functions è un'estensione avanzata per Funzioni di Azure e potrebbe non essere appropriata per tutte le applicazioni. Per un confronto con altre tecnologie di orchestrazione di Azure, vedere Confrontare Funzioni di Azure e App per la logica di Azure.