Используйте встроенные API управления экземплярами для запуска, запроса, завершения, приостановки, возобновления и очистки экземпляров оркестрации в устойчивых рабочих процессах. В Устойчивые функции привязка клиента orchestration предоставляет эти API. В пакетах SDK устойчивых задач те же операции доступны через DurableTaskClient класс. В этой статье показано, как выполнять каждую операцию управления экземплярами с примерами кода для обеих платформ.
Подсказка
Планировщик Azure Durable Task Scheduler рекомендуется в качестве серверной части как для Устойчивые функции, так и для Durable Task SDKs, предлагая полностью управляемую бессерверную архитектуру для выполнения устойчивых рабочих процессов на масштабном уровне.
Запуск виртуальных экземпляров
Метод start-new (или schedule-new) в клиенте оркестрации запускает новый экземпляр оркестрации. Внутри этого метода записывается сообщение в настроенную серверную часть (например, планировщик устойчивых задач или служба хранилища Azure), а затем возвращается. Это сообщение активирует асинхронное начало оркестрации с указанным именем.
Ниже приведены параметры запуска нового экземпляра оркестрации:
-
Имя: имя функции оркестратора для планирования.
-
Входные данные: все данные, сериализуемые в формате JSON, которые должны передаваться в качестве входных данных в функцию оркестратора.
-
InstanceId: (необязательно) Уникальный идентификатор экземпляра. Если этот параметр не указан, метод использует случайный идентификатор.
Подсказка
По возможности используйте случайный идентификатор для идентификатора экземпляра. Идентификаторы случайных экземпляров помогают обеспечить равное распределение нагрузки при масштабировании функций оркестратора на нескольких виртуальных машинах. Правильное время для использования нерондомизированных идентификаторов экземпляров - это когда идентификатор поступает из внешнего источника или при реализации шаблона синглтон-оркестратора.
-
Имя: имя оркестрации для расписания.
-
Входные данные: любые сериализуемые в формате JSON данные, которые должны передаваться в качестве входных данных в оркестрацию.
-
InstanceId: (необязательно) Уникальный идентификатор экземпляра. Если этот параметр не указан, метод использует случайный идентификатор.
Подсказка
По возможности используйте случайный идентификатор для идентификатора экземпляра. Случайные идентификаторы экземпляров помогают обеспечить равномерное распределение нагрузки при масштабировании оркестраций между несколькими виртуальными машинами. Правильное время для использования нерондомизированных идентификаторов экземпляров - это когда идентификатор поступает из внешнего источника или при реализации шаблона синглтон-оркестратора.
В следующем примере функция запускает новый оркестрационный экземпляр:
[FunctionName("HelloWorldQueueTrigger")]
public static async Task Run(
[QueueTrigger("start-queue")] string input,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
string instanceId = await starter.StartNewAsync("HelloWorld", input);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
Если иное не указано, примеры на этой странице используют триггер HTTP со следующими function.json.
function.json
{
"bindings": [
{
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
Замечание
Этот пример предназначен для Устойчивые функции версии 2.x. В версии 1.x используйте orchestrationClient вместо durableClient.
index.js
const df = require("durable-functions");
module.exports = async function(context, input) {
const client = df.getClient(context);
const instanceId = await client.startNew("HelloWorld", undefined, input);
context.log(`Started orchestration with ID = ${instanceId}.`);
};
Если иное не указано, примеры на этой странице используют триггер HTTP со следующими function.json.
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
Замечание
Этот пример предназначен для Устойчивые функции версии 2.x. В версии 1.x используйте orchestrationClient вместо durableClient.
__init__.py
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new('HelloWorld', None, None)
logging.log(f"Started orchestration with ID = ${instance_id}.")
Если иное не указано, примеры на этой странице используют триггер HTTP со следующими function.json.
function.json
{
"bindings": [
{
"name": "Request",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "Response",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
Замечание
Этот пример предназначен для Устойчивые функции версии 2.x. В версии 1.x используйте orchestrationClient вместо durableClient.
run.ps1
param($Request, $TriggerMetadata)
$InstanceId = Start-DurableOrchestration -FunctionName 'HelloWorld'
Write-Host "Started orchestration with ID = '$InstanceId'"
@FunctionName("HelloWorldQueueTrigger")
public void helloWorldQueueTrigger(
@QueueTrigger(name = "input", queueName = "start-queue", connection = "Storage") String input,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceID = client.scheduleNewOrchestrationInstance("HelloWorld");
context.getLogger().info("Scheduled orchestration with ID = " + instanceID);
}
Чтобы дождаться запуска оркестратора перед возвратом из функции, примените метод waitForInstanceStart().
// wait up to 30 seconds for the scheduled orchestration to enter the "Running" state
client.waitForInstanceStart(instanceID, Duration.ofSeconds(30));
Это важно
В настоящее время пакет SDK для устойчивых задач PowerShell недоступен.
В следующем коде показано, как запустить новый экземпляр оркестрации с помощью пакетов SDK для устойчивых задач:
using Microsoft.DurableTask.Client;
// Schedule a new orchestration instance
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("HelloWorld", input);
Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
// Optionally, wait for the orchestration to start
OrchestrationMetadata metadata = await client.WaitForInstanceStartAsync(instanceId, timeout: TimeSpan.FromSeconds(30));
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(hello_world, input=input_data)
print(f"Started orchestration with ID = '{instance_id}'.")
# Optionally, wait for the orchestration to start
state = client.wait_for_orchestration_start(instance_id, timeout=30)
import com.microsoft.durabletask.DurableTaskClient;
// Schedule a new orchestration instance
String instanceId = client.scheduleNewOrchestrationInstance("HelloWorld", input);
System.out.println("Started orchestration with ID = '" + instanceId + "'.");
// Optionally, wait for the orchestration to start
OrchestrationMetadata metadata = client.waitForInstanceStart(instanceId, Duration.ofSeconds(30), false);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Schedule a new orchestration instance
const instanceId = await client.scheduleNewOrchestration("HelloWorld", input);
console.log(`Started orchestration with ID = '${instanceId}'.`);
// Optionally, wait for the orchestration to start
const state = await client.waitForOrchestrationStart(instanceId, false, 30);
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Экземпляры запросов
После запуска новых экземпляров оркестрации вам, вероятно, потребуется запросить их текущее состояние, чтобы узнать, запущены ли они, полностью завершены или завершились с ошибкой.
Метод get-status в клиенте оркестрации возвращает состояние экземпляра оркестрации.
Он принимает instanceId (обязательно), showHistory (необязательно), showHistoryOutput (необязательно) и showInput (необязательно) в качестве параметров.
-
showHistory: если задано значение true, ответ содержит журнал выполнения.
-
showHistoryOutput: если задано значение true, журнал выполнения содержит выходные данные действий.
-
showInput: если задано значение false, ответ не содержит входных данных функции. Значение по умолчанию — true.
Метод возвращает объект со следующими свойствами:
-
Имя: имя функции оркестратора.
-
InstanceId: идентификатор экземпляра оркестрации (должен совпадать с входными данными
instanceId ).
-
CreatedTime: время запуска функции оркестратора.
-
LastUpdatedTime: время последнего обновления контрольной точки оркестрации.
-
Входные данные: входные данные функции в виде значения JSON. Поле не заполняется, если
showInput равно false.
-
CustomStatus: состояние настраиваемой оркестрации в формате JSON.
-
Выходные данные: выходные данные функции в виде значения JSON (если функция завершается). Если функция оркестратора завершится с ошибкой, это свойство будет содержать информацию об ошибке. Если функция оркестратора приостановлена или прекращена, это свойство включает причину приостановки или завершения (если таковой имеется).
-
RuntimeStatus: одно из следующих значений:
-
Ожидание: экземпляр запланирован, но еще не запущен.
-
В работе: экземпляр работает.
-
Завершено: экземпляр завершился нормально.
-
ПродолжениеКакНовый: экземпляр перезапустился с новой историей. Это временное состояние.
-
Ошибка: Во время выполнения экземпляра произошла ошибка.
-
Прекращено: экземпляр внезапно прекратился.
-
Приостановлено: инстанция приостановлена и может быть возобновлена позднее.
-
История выполнения: история выполнения оркестрации. Это поле заполняется только в том случае, если
showHistory задано значение true.
-
showHistory: если задано значение true, ответ содержит журнал выполнения.
-
showHistoryOutput: если задано значение true, журнал выполнения содержит выходные данные действий.
-
showInput: если указано значение false, ответ не содержит входных данных оркестрационного процесса. Значение по умолчанию — true.
Метод возвращает объект со следующими свойствами:
-
Имя: имя оркестрации.
-
InstanceId: идентификатор экземпляра оркестрации (должен совпадать с входными данными
instanceId ).
-
CreatedTime: время запуска оркестрации.
-
LastUpdatedTime: время последнего обновления контрольной точки оркестрации.
-
Входные данные оркестрации в виде значения в формате JSON. Поле не заполняется, если
showInput равно false.
-
CustomStatus: состояние настраиваемой оркестрации в формате JSON.
-
Выходные данные: выходные данные оркестрации в формате JSON (если оркестрация завершается). Если оркестрация завершится сбоем, это свойство содержит информацию об ошибке. Если оркестрация приостановлена или прекращена, это свойство включает причину приостановки или завершения (если таковые имеются).
-
RuntimeStatus: одно из следующих значений:
-
Ожидание: экземпляр запланирован, но еще не запущен.
-
В работе: экземпляр работает.
-
Завершено: экземпляр завершился нормально.
-
ПродолжениеКакНовый: экземпляр перезапустился с новой историей. Это временное состояние.
-
Ошибка: Во время выполнения экземпляра произошла ошибка.
-
Прекращено: экземпляр внезапно прекратился.
-
Приостановлено: инстанция приостановлена и может быть возобновлена позднее.
-
История выполнения: история выполнения оркестрации. Это поле заполняется только в том случае, если
showHistory задано значение true.
Замечание
Оркестратор не помечается как Completed до завершения всех запланированных задач и возвращения оркестратора. Иными словами, для оркестратора недостаточно, чтобы достичь его return заявления, чтобы он был помечен как Completed. Это особенно важно для случаев, когда используется WhenAny; эти оркестраторы часто начинают return до того, как все запланированные задачи будут выполнены.
Этот метод возвращает null (.NET и Java), undefined (JavaScript) или None (Python), если экземпляр не существует.
[FunctionName("GetStatus")]
public static async Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("check-status-queue")] string instanceId)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
// do something based on the current status.
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const status = await client.getStatus(instanceId);
// do something based on the current status.
// example: if status.runtimeStatus === df.OrchestrationRuntimeStatus.Running: ...
}
Сведения о конфигурации function.json см. в разделе «Запуск экземпляров».
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
status = await client.get_status(instance_id)
# do something based on the current status
# example: if (existing_instance.runtime_status is df.OrchestrationRuntimeStatus.Running) { ...
param($Request, $TriggerMetadata)
# Get instanceid from body
$InstanceId = $Request.Body.InstanceId
$Status = Get-DurableStatus -InstanceId $InstanceId -ShowHistory -ShowHistoryOutput -ShowInput
Write-Host "Status: $($Status | ConvertTo-Json)"
# Do something based on status
# example: if ($Status.runtimeStatus -eq 'Running') { ... }
@FunctionName("GetStatus")
public void getStatus(
@QueueTrigger(name = "instanceID", queueName = "check-status-queue", connection = "Storage") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceID, false);
if (metadata != null) {
OrchestrationRuntimeStatus status = metadata.getRuntimeStatus();
switch (status) {
// do something based on the current status
}
}
}
using Microsoft.DurableTask.Client;
// Get the status of an orchestration instance
OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: true);
if (metadata != null)
{
OrchestrationRuntimeStatus status = metadata.RuntimeStatus;
// do something based on the current status
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Get the status of an orchestration instance
state = client.get_orchestration_state(instance_id, fetch_payloads=True)
if state is not None:
status = state.runtime_status
# do something based on the current status
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
// Get the status of an orchestration instance
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, true);
if (metadata != null) {
OrchestrationRuntimeStatus status = metadata.getRuntimeStatus();
// do something based on the current status
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Get the status of an orchestration instance
const state = await client.getOrchestrationState(instanceId, true);
if (state) {
const status = state.runtimeStatus;
// do something based on the current status
}
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Запросите все экземпляры оркестрации
Вы можете использовать API в пакете SDK для вашего языка программирования, чтобы запрашивать состояния всех экземпляров оркестрации в центре задач. Этот API list-instances или get-status возвращает список объектов, представляющих экземпляры оркестрации, соответствующие параметрам запроса.
[FunctionName("GetAllStatus")]
public static async Task Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var noFilter = new OrchestrationStatusQueryCondition();
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(
noFilter,
CancellationToken.None);
foreach (DurableOrchestrationStatus instance in result.DurableOrchestrationState)
{
log.LogInformation(JsonConvert.SerializeObject(instance));
}
// Note: ListInstancesAsync only returns the first page of results.
// To request additional pages provide the result.ContinuationToken
// to the OrchestrationStatusQueryCondition's ContinuationToken property.
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instances = await client.getStatusAll();
instances.forEach((instance) => {
context.log(JSON.stringify(instance));
});
};
Сведения о конфигурации function.json см. в разделе "Запуск экземпляров".
import logging
import json
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instances = await client.get_status_all()
for instance in instances:
logging.log(json.dumps(instance))
Сведения о конфигурации function.json см. в разделе "Запуск экземпляров".
Замечание
В настоящее время PowerShell не поддерживает эту функцию, но ее можно достичь с помощью API Устойчивые функции HTTP.
@FunctionName("GetAllStatus")
public String getAllStatus(
@HttpTrigger(name = "req", methods = {HttpMethod.GET}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
OrchestrationStatusQuery noFilter = new OrchestrationStatusQuery();
OrchestrationStatusQueryResult result = client.queryInstances(noFilter);
return "Found " + result.getOrchestrationState().size() + " orchestrations.";
}
using Microsoft.DurableTask.Client;
// Query all orchestration instances
AsyncPageable<OrchestrationMetadata> instances = client.GetAllInstancesAsync(new OrchestrationQuery());
await foreach (OrchestrationMetadata instance in instances)
{
Console.WriteLine(instance.InstanceId);
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Query all orchestration instances
instances = client.list_orchestrations()
for instance in instances:
print(instance.instance_id)
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationStatusQuery;
import com.microsoft.durabletask.OrchestrationStatusQueryResult;
// Query all orchestration instances
OrchestrationStatusQuery query = new OrchestrationStatusQuery();
OrchestrationStatusQueryResult result = client.queryInstances(query);
for (OrchestrationMetadata instance : result.getOrchestrationState()) {
System.out.println(instance.getInstanceId());
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Query all orchestration instances
const instances = client.getAllInstances();
for await (const instance of instances) {
console.log(instance.instanceId);
}
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Выполнение запроса экземпляров оркестрации с использованием фильтров
Что если вам не нужны все сведения, которые предоставляет стандартный запрос инстанции? Например, что делать, если вы просто ищете время создания оркестрации или статус времени выполнения оркестрации? Сузите запрос, применяя фильтры.
[FunctionName("QueryStatus")]
public static async Task Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
// Get the first 100 running or pending instances that were created between 7 and 1 days ago
var queryFilter = new OrchestrationStatusQueryCondition
{
RuntimeStatus = new[]
{
OrchestrationRuntimeStatus.Pending,
OrchestrationRuntimeStatus.Running,
},
CreatedTimeFrom = DateTime.UtcNow.Subtract(TimeSpan.FromDays(7)),
CreatedTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(1)),
PageSize = 100,
};
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(
queryFilter,
CancellationToken.None);
foreach (DurableOrchestrationStatus instance in result.DurableOrchestrationState)
{
log.LogInformation(JsonConvert.SerializeObject(instance));
}
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const runtimeStatus = [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Running,
];
const instances = await client.getStatusBy(
new Date(2021, 3, 10, 10, 1, 0),
new Date(2021, 3, 10, 10, 23, 59),
runtimeStatus
);
instances.forEach((instance) => {
context.log(JSON.stringify(instance));
});
};
Ознакомьтесь с начальными экземплярами конфигурации function.json.
import logging
from datetime import datetime
import json
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
runtime_status = [OrchestrationRuntimeStatus.Completed, OrchestrationRuntimeStatus.Running]
instances = await client.get_status_by(
datetime(2021, 3, 10, 10, 1, 0),
datetime(2021, 3, 10, 10, 23, 59),
runtime_status
)
for instance in instances:
logging.log(json.dumps(instance))
Замечание
Эта функция в настоящее время не поддерживается в PowerShell, но ее можно достичь с помощью API Устойчивые функции HTTP.
@FunctionName("GetRunningInstances")
public String getRunningInstances(
@HttpTrigger(name = "req", methods = {HttpMethod.GET}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
OrchestrationStatusQuery filter = new OrchestrationStatusQuery()
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.PENDING, OrchestrationRuntimeStatus.RUNNING))
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(7)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(1)));
OrchestrationStatusQueryResult result = client.queryInstances(filter);
return "Found " + result.getOrchestrationState().size() + " orchestrations.";
}
using Microsoft.DurableTask.Client;
// Get running or pending instances created in the last 7 days
var query = new OrchestrationQuery
{
Statuses = new[] { OrchestrationRuntimeStatus.Running, OrchestrationRuntimeStatus.Pending },
CreatedFrom = DateTime.UtcNow.AddDays(-7),
CreatedTo = DateTime.UtcNow.AddDays(-1),
PageSize = 100
};
AsyncPageable<OrchestrationMetadata> instances = client.GetAllInstancesAsync(query);
await foreach (OrchestrationMetadata instance in instances)
{
Console.WriteLine($"{instance.InstanceId}: {instance.RuntimeStatus}");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from datetime import datetime, timedelta
# Get running or pending instances created in the last 7 days
instances = client.list_orchestrations(
created_time_from=datetime.utcnow() - timedelta(days=7),
created_time_to=datetime.utcnow() - timedelta(days=1),
runtime_status=['RUNNING', 'PENDING']
)
for instance in instances:
print(f"{instance.instance_id}: {instance.runtime_status}")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationStatusQuery;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
// Get running or pending instances created in the last 7 days
OrchestrationStatusQuery filter = new OrchestrationStatusQuery()
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.PENDING, OrchestrationRuntimeStatus.RUNNING))
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(7)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(1)));
OrchestrationStatusQueryResult result = client.queryInstances(filter);
for (OrchestrationMetadata instance : result.getOrchestrationState()) {
System.out.println(instance.getInstanceId() + ": " + instance.getRuntimeStatus());
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Get running or pending instances created in the last 7 days
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
const instances = client.getAllInstances({
statuses: [OrchestrationStatus.RUNNING, OrchestrationStatus.PENDING],
createdFrom: sevenDaysAgo,
createdTo: oneDayAgo,
});
for await (const instance of instances) {
console.log(`${instance.instanceId}: ${instance.runtimeStatus}`);
}
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Завершение экземпляров оркестрации
Если у вас есть процесс оркестрации, который выполняется слишком долго, или если по какой-либо причине его нужно остановить до завершения, вы можете завершить его.
Два параметра для API завершения — это идентификатор экземпляра и строка причины , которая записывается в журналы и состояние экземпляра.
[FunctionName("TerminateInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("terminate-queue")] string instanceId)
{
string reason = "Found a bug";
return client.TerminateAsync(instanceId, reason);
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const reason = "Found a bug";
return client.terminate(instanceId, reason);
};
Ознакомьтесь с начальными экземплярами конфигурации function.json.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
reason = "Found a bug"
return await client.terminate(instance_id, reason)
param($Request, $TriggerMetadata)
# Get instance id from body
$InstanceId = $Request.Body.InstanceId
$Reason = 'Found a bug'
Stop-DurableOrchestration -InstanceId $InstanceId -Reason $Reason
@FunctionName("TerminateInstance")
public void terminateInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
String reason = "Found a bug";
durableContext.getClient().terminate(instanceID, reason);
}
using Microsoft.DurableTask.Client;
string reason = "Found a bug";
await client.TerminateInstanceAsync(instanceId, reason);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
reason = "Found a bug"
client.terminate_orchestration(instance_id, reason=reason)
import com.microsoft.durabletask.DurableTaskClient;
String reason = "Found a bug";
client.terminate(instanceId, reason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const reason = "Found a bug";
await client.terminateOrchestration(instanceId, reason);
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Завершённый экземпляр в конечном итоге переходит в состояние Terminated. Но этот переход не происходит сразу. Фактически, операция завершения помещается в очередь в узле задач вместе с другими операциями для этого экземпляра.
API-интерфейсы запросов экземпляра можно использовать для того, чтобы узнать, когда завершенный экземпляр фактически достиг Terminated состояния.
Замечание
Завершение экземпляра в настоящее время не передается. Функции активности и подоркестрации выполняются до завершения, независимо от завершения вызывающего их экземпляра оркестрации.
Приостановка и возобновление экземпляров оркестрации
Приостановка оркестрации позволяет прекратить её выполнение. В отличие от завершения оркестрации, приостановленный рабочий процесс можно возобновить позже.
Два параметра для API приостановки — это идентификатор экземпляра и строка причины, записываемая в журналы и состояние экземпляра.
[FunctionName("SuspendResumeInstance")]
public static async Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("suspend-resume-queue")] string instanceId)
{
// To suspend an orchestration
string suspendReason = "Need to pause workflow";
await client.SuspendAsync(instanceId, suspendReason);
// To resume an orchestration
string resumeReason = "Continue workflow";
await client.ResumeAsync(instanceId, resumeReason);
}
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
// To suspend an orchestration
const suspendReason = "Need to pause workflow";
await client.suspend(instanceId, suspendReason);
// To resume an orchestration
const resumeReason = "Continue workflow";
await client.resume(instanceId, resumeReason);
};
import azure.functions as func
import azure.durable_functions as df
from datetime import timedelta
async def main(req: func.HttpRequest, starter: str, instance_id: str):
client = df.DurableOrchestrationClient(starter)
# To suspend an orchestration
suspend_reason = "Need to pause workflow"
await client.suspend(instance_id, suspend_reason)
# To resume an orchestration
resume_reason = "Continue workflow"
await client.resume(instance_id, resume_reason)
param($Request, $TriggerMetadata)
$InstanceId = $Request.Body.InstanceId
# To suspend an orchestration
$SuspendReason = 'Need to pause workflow'
Suspend-DurableOrchestration -InstanceId $InstanceId -Reason $SuspendReason
# To resume an orchestration
$ResumeReason = 'Continue workflow'
Resume-DurableOrchestration -InstanceId $InstanceId -Reason $ResumeReason
Замечание
Эта функция доступна только в автономном наборе средств разработки PowerShell для Устойчивые функции. Ознакомьтесь с разницей между автономным пакетом SDK и устаревшим встроенным пакетом SDK вместе с руководством по миграции.
@FunctionName("SuspendResumeInstance")
public void suspendResumeInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
DurableTaskClient client = durableContext.getClient();
// To suspend an orchestration
String suspendReason = "Need to pause workflow";
client.suspendInstance(instanceID, suspendReason);
// To resume an orchestration
String resumeReason = "Continue workflow";
client.resumeInstance(instanceID, resumeReason);
}
using Microsoft.DurableTask.Client;
// To suspend an orchestration
string suspendReason = "Need to pause workflow";
await client.SuspendInstanceAsync(instanceId, suspendReason);
// To resume an orchestration
string resumeReason = "Continue workflow";
await client.ResumeInstanceAsync(instanceId, resumeReason);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# To suspend an orchestration
suspend_reason = "Need to pause workflow"
client.suspend_orchestration(instance_id, reason=suspend_reason)
# To resume an orchestration
resume_reason = "Continue workflow"
client.resume_orchestration(instance_id, reason=resume_reason)
import com.microsoft.durabletask.DurableTaskClient;
// To suspend an orchestration
String suspendReason = "Need to pause workflow";
client.suspendInstance(instanceId, suspendReason);
// To resume an orchestration
String resumeReason = "Continue workflow";
client.resumeInstance(instanceId, resumeReason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// To suspend an orchestration
await client.suspendOrchestration(instanceId);
// To resume an orchestration
await client.resumeOrchestration(instanceId);
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Приостановленный экземпляр в конечном итоге переходит в состояние Suspended. Однако этот переход не происходит сразу. Вместо этого операция приостановки ставится в очередь в концентраторе задач вместе с другими операциями для этого экземпляра. Используйте API-интерфейсы запроса экземпляра, чтобы узнать, когда запущенный экземпляр фактически достиг Suspended состояния.
Когда приостановленный оркестратор восстанавливается, его состояние меняется на Running.
Отправка событий в экземпляры
В некоторых сценариях функции оркестратора должны ожидать и прослушивать внешние события. Примеры, в которых этот подход полезен, включают в себя сценарии мониторинга и взаимодействия с человеком .
В некоторых сценариях оркестрации должны ждать и прослушивать внешние события. Примеры, в которых этот подход полезен, включают в себя сценарии мониторинга и взаимодействия с человеком .
Вы можете отправлять уведомления о событиях в запущенные экземпляры с помощью API инициации события клиента оркестрации. Оркестрации могут прослушивать и реагировать на эти события с помощью API оркестратора ожидания внешнего события.
Параметры инициирования события :
-
Идентификатор экземпляра: уникальный идентификатор экземпляра.
-
Имя события: имя события для отправки.
-
Данные событий: полезные данные, сериализуемые в ФОРМАТЕ JSON для отправки в экземпляр.
[FunctionName("RaiseEvent")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("event-queue")] string instanceId)
{
int[] eventData = new int[] { 1, 2, 3 };
return client.RaiseEventAsync(instanceId, "MyEvent", eventData);
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const eventData = [ 1, 2, 3 ];
return client.raiseEvent(instanceId, "MyEvent", eventData);
};
Ознакомьтесь с начальными экземплярами конфигурации function.json.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
event_data = [1, 2 ,3]
return await client.raise_event(instance_id, 'MyEvent', event_data)
param($Request, $TriggerMetadata)
# Get instance id from body
$InstanceId = $Request.Body.InstanceId
$EventName = 'MyEvent'
$EventData = @(1,2,3)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName $EventName -EventData $EventData
@FunctionName("RaiseEvent")
public void raiseEvent(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
String eventName = "MyEvent";
int[] eventData = { 1, 2, 3 };
durableContext.getClient().raiseEvent(instanceID, eventName, eventData);
}
using Microsoft.DurableTask.Client;
int[] eventData = new int[] { 1, 2, 3 };
await client.RaiseEventAsync(instanceId, "MyEvent", eventData);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
event_data = [1, 2, 3]
client.raise_orchestration_event(instance_id, "MyEvent", event_data)
import com.microsoft.durabletask.DurableTaskClient;
int[] eventData = { 1, 2, 3 };
client.raiseEvent(instanceId, "MyEvent", eventData);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const eventData = [1, 2, 3];
await client.raiseOrchestrationEvent(instanceId, "MyEvent", eventData);
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Замечание
Если экземпляр оркестрации с указанным идентификатором экземпляра отсутствует, сообщение о событии удаляется. Если экземпляр существует, но он еще не ожидает события, событие хранится в состоянии экземпляра, пока не будет готово к получению и обработке.
Дождитесь завершения оркестрации
В длительных оркестрациях может потребоваться ждать и получать результаты оркестрации. В таких случаях также полезно определить период времени ожидания оркестрации. Если превышено время ожидания, то возвращается состояние оркестрации вместо того, чтобы вернуть результаты.
Используйте API "wait for completion or create check status response" для синхронного получения фактического результата экземпляра оркестрации. По умолчанию этот метод имеет время ожидания в течение десяти секунд и интервал опроса в одну секунду.
Ниже приведен пример функции HTTP-триггера, демонстрирующей использование этого API:
import logging
import azure.functions as func
import azure.durable_functions as df
timeout = "timeout"
retry_interval = "retryInterval"
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params['functionName'], None, req.get_body())
logging.log(f"Started orchestration with ID = '${instance_id}'.")
timeout_in_milliseconds = get_time_in_seconds(req, timeout)
timeout_in_milliseconds = timeout_in_milliseconds if timeout_in_milliseconds != None else 30000
retry_interval_in_milliseconds = get_time_in_seconds(req, retry_interval)
retry_interval_in_milliseconds = retry_interval_in_milliseconds if retry_interval_in_milliseconds != None else 1000
return await client.wait_for_completion_or_create_check_status_response(
req,
instance_id,
timeout_in_milliseconds,
retry_interval_in_milliseconds
)
def get_time_in_seconds(req: func.HttpRequest, query_parameter_name: str):
query_value = req.params.get(query_parameter_name)
return query_value if query_value != None else 1000
Замечание
В настоящее время PowerShell не имеет встроенной команды для этого сценария.
Java в настоящее время не имеет одного метода для этого сценария, но его можно реализовать с помощью нескольких дополнительных строк кода.
@FunctionName("HttpStartAndWait")
public HttpResponseMessage httpStartAndWait(
@HttpTrigger(name = "req", route = "orchestrators/{functionName}/wait", methods = {HttpMethod.POST}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
@BindingName("functionName") String functionName,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance(functionName);
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
try {
String timeoutString = req.getQueryParameters().get("timeout");
Integer timeoutInSeconds = Integer.parseInt(timeoutString);
OrchestrationMetadata orchestration = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(timeoutInSeconds),
true /* getInputsAndOutputs */);
return req.createResponseBuilder(HttpStatus.OK)
.body(orchestration.getSerializedOutput())
.header("Content-Type", "application/json")
.build();
} catch (TimeoutException timeoutEx) {
// timeout expired - return a 202 response
return durableContext.createCheckStatusResponse(req, instanceId);
}
}
SDK «Durable Task» предоставляют метод для синхронного ожидания завершения оркестрации.
using Microsoft.DurableTask.Client;
// Wait for orchestration to complete with a timeout
OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(
instanceId,
timeout: TimeSpan.FromSeconds(30),
getInputsAndOutputs: true);
if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
Console.WriteLine($"Output: {metadata.SerializedOutput}");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Wait for orchestration to complete with a timeout
state = client.wait_for_orchestration_completion(
instance_id,
timeout=30,
fetch_payloads=True)
if state.runtime_status == 'COMPLETED':
print(f"Output: {state.serialized_output}")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
// Wait for orchestration to complete with a timeout
try {
OrchestrationMetadata metadata = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true /* getInputsAndOutputs */);
System.out.println("Output: " + metadata.getSerializedOutput());
} catch (TimeoutException e) {
System.out.println("Orchestration did not complete within timeout");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Wait for orchestration to complete with a timeout
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
if (state?.runtimeStatus === OrchestrationStatus.COMPLETED) {
console.log(`Output: ${state.serializedOutput}`);
}
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Вызовите функцию со следующей строкой. Используйте две секунды для времени ожидания и 0,5 секунд для интервала повтора:
curl -X POST "http://localhost:7071/orchestrators/E1_HelloSequence/wait?timeout=2&retryInterval=0.5"
Замечание
В приведенной выше команде cURL предполагается, что в проекте есть функция оркестратора с именем E1_HelloSequence. Поскольку функция триггера HTTP написана таким образом, вы можете заменить её именем любой функции оркестратора в вашем проекте.
В зависимости от времени, необходимого для получения ответа от экземпляра оркестрации, существует два случая:
- Экземпляры оркестрации завершаются в течение определенного времени ожидания (в этом случае два секунды), а ответ — фактические выходные данные экземпляра оркестрации, доставленные синхронно:
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Thu, 14 Dec 2021 06:14:29 GMT
Transfer-Encoding: chunked
[
"Hello Tokyo!",
"Hello Seattle!",
"Hello London!"
]
- Экземпляры оркестрации не могут завершиться в течение определенного времени ожидания, а ответ — это ответ по умолчанию, описанный в обнаружении URL-адресов API HTTP:
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Date: Thu, 14 Dec 2021 06:13:51 GMT
Location: http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177?taskHub={taskHub}&connection={connection}&code={systemKey}
Retry-After: 10
Transfer-Encoding: chunked
{
"id": "d3b72dddefce4e758d92f4d411567177",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/raiseEvent/{eventName}?taskHub={taskHub}&connection={connection}&code={systemKey}",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177?taskHub={taskHub}&connection={connection}&code={systemKey}",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/terminate?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}",
"suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/suspend?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}",
"resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/resume?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}"
}
Замечание
Формат URL-адресов веб-перехватчика может отличаться в зависимости от используемой версии узла Функции Azure. Предыдущий пример предназначен для узла Функции Azure 3.0.
Получение URL-адресов HTTP-вебхуков для управления экземплярами оркестрации
Используйте внешнюю систему для отслеживания или инициирования событий для оркестрации. Внешние системы взаимодействуют с Услугами Устойчивые функции через URL-адреса веб-перехватчиков, которые являются частью ответа по умолчанию, описанного в обнаружение URL-адресов API HTTP. URL-адреса веб-перехватчика также доступны программным способом с помощью привязки клиента оркестрации. В частности, API по созданию полезной нагрузки для управления HTTP получает сериализуемый объект, содержащий эти URL-адреса веб-перехватчика.
В API для создания полезной нагрузки управления HTTP есть один параметр:
-
Идентификатор экземпляра: уникальный идентификатор экземпляра.
Методы возвращают объект со следующими строковыми свойствами:
-
Идентификатор: идентификатор экземпляра оркестрации (должен совпадать с входным значением
InstanceId ).
-
StatusQueryGetUri: URL состояния экземпляра оркестрации.
-
SendEventPostUri: URL-адрес для инициирования события экземпляра оркестрации.
-
TerminatePostUri: URL-адрес "завершения" экземпляра оркестрации.
-
PurgeHistoryDeleteUri: URL удаления истории очистки экземпляра оркестрации.
-
SuspendPostUri: URL-адрес приостановки экземпляра оркестрации.
-
ResumePostUri: URL-адрес "резюме" экземпляра оркестрации.
Функции отправляют экземпляры этих объектов внешним системам для отслеживания или вызова событий в соответствующих оркестрациях, как показано в следующих примерах.
[FunctionName("SendInstanceInfo")]
public static void SendInstanceInfo(
[ActivityTrigger] IDurableActivityContext ctx,
[DurableClient] IDurableOrchestrationClient client,
[CosmosDB(
databaseName: "MonitorDB",
containerName: "HttpManagementPayloads",
Connection = "CosmosDBConnectionSetting")]out dynamic document)
{
HttpManagementPayload payload = client.CreateHttpManagementPayload(ctx.InstanceId);
// send the payload to Azure Cosmos DB
document = new { Payload = payload, id = ctx.InstanceId };
}
Замечание
В предыдущем коде C# используется внутрипроцессная модель с IDurableOrchestrationClient и IDurableActivityContext, которые помечены как устаревшие в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
modules.exports = async function(context, ctx) {
const client = df.getClient(context);
const payload = client.createHttpManagementPayload(ctx.instanceId);
// send the payload to Azure Cosmos DB
context.bindings.document = JSON.stringify({
id: ctx.instanceId,
payload,
});
};
Ознакомьтесь с начальными экземплярами конфигурации function.json.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.cosmosdb.cdb.Document:
client = df.DurableOrchestrationClient(starter)
payload = client.create_check_status_response(req, instance_id).get_body().decode()
return func.cosmosdb.CosmosDBConverter.encode({
id: instance_id,
payload: payload
})
using namespace System.Net
param($Request, $TriggerMetadata)
$InstanceId = $Request.Body.InstanceId
$Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
Замечание
Java в настоящее время не поддерживает эту функцию.
Экземпляры оркестрации перемотка
Если у вас возник сбой оркестрации по неожиданной причине, перемотайте экземпляр в ранее работоспособное состояние, используя API, предназначенный для этой цели.
Замечание
Этот API не предназначен для замены правильной обработки ошибок и политик повторных попыток. Скорее, он предназначен для использования только в случаях, когда экземпляры оркестрации завершаются сбоем по непредвиденным причинам. Оркестрации в состояниях, отличных от Failed (например, Running, Pending, Terminated или Completed) не могут быть "перемотаны". Дополнительные сведения об обработке ошибок и политиках повторных попыток см. в статье об обработке ошибок .
Используйте метод RewindAsync (.NET) или rewind (JavaScript) привязки клиента orchestration, чтобы вернуть оркестрацию в состояние Running. Этот метод также повторно выполняет действия или подоркестрации, сбои которых вызвали сбой оркестрации.
Например, предположим, что у вас есть рабочий процесс, включающий ряд утверждений человека. Предположим, что ряд функций действий уведомляет кого-то о необходимости утверждения и выжидает ответ в режиме реального времени. После того как все действия утверждения получают ответы или время ожидания, предположим, что другое действие завершается ошибкой из-за неправильной настройки приложения, например, недопустимой строки подключения к базе данных. Результатом является сбой оркестрации в глубине рабочего процесса. С помощью API RewindAsync (.NET) или rewind (JavaScript) администратор приложения может исправить ошибку конфигурации и перемотать сбой оркестрации обратно в состояние, предшествующее сбою. Ни один из шагов взаимодействия с человеком не требуется повторно утвердить, и оркестрация теперь может успешно завершиться.
Замечание
Функция перемотка не поддерживает экземпляры оркестрации, использующие долговременные таймеры.
[FunctionName("RewindInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("rewind-queue")] string instanceId)
{
string reason = "Orchestrator failed and needs to be revived.";
return client.RewindAsync(instanceId, reason);
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const reason = "Orchestrator failed and needs to be revived.";
return client.rewind(instanceId, reason);
};
Ознакомьтесь с начальными экземплярами конфигурации function.json.
Замечание
Python в настоящее время не поддерживает эту функцию.
Замечание
В настоящее время PowerShell не поддерживает эту функцию.
Замечание
Java в настоящее время не поддерживает эту функцию.
using Microsoft.DurableTask.Client;
string reason = "Orchestrator failed and needs to be revived.";
await client.RewindInstanceAsync(instanceId, reason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const reason = "Orchestrator failed and needs to be revived.";
await client.rewindInstance(instanceId, reason);
Этот пример показан только для .NET и JavaScript.
Этот пример показан только для .NET и JavaScript.
Этот пример показан только для .NET и JavaScript.
Перезапуск экземпляров оркестрации
Перезапуск оркестрации создает новый экземпляр, используя историю предыдущего запущенного экземпляра. Эта функция полезна, если вы хотите повторно запустить оркестрацию с тем же шаблоном входных данных и идентификатора экземпляра, создавая новый запуск на основе исходного.
[FunctionName("RestartInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("restart-queue")] string instanceId)
{
return client.RestartAsync(instanceId, restartWithNewInstanceId: true);
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
Замечание
JavaScript в настоящее время не поддерживает эту функцию.
Замечание
Python в настоящее время не поддерживает эту функцию.
Замечание
В настоящее время PowerShell не поддерживает эту функцию.
Замечание
Эта функция в настоящее время не поддерживается в Java.
using Microsoft.DurableTask.Client;
// Restart an orchestration with a new instance ID
string newInstanceId = await client.RestartInstanceAsync(instanceId, restartWithNewInstanceId: true);
Console.WriteLine($"Restarted as new instance: {newInstanceId}");
// Restart an orchestration keeping the same instance ID
await client.RestartInstanceAsync(instanceId, restartWithNewInstanceId: false);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Restart an orchestration with a new instance ID
const newInstanceId = await client.restartOrchestration(instanceId, true);
console.log(`Restarted as new instance: ${newInstanceId}`);
// Restart an orchestration keeping the same instance ID
await client.restartOrchestration(instanceId, false);
Этот пример показан только для .NET и JavaScript.
Этот пример показан только для .NET и JavaScript.
Этот пример показан только для .NET и JavaScript.
Очистка истории экземпляров оркестрации
Чтобы удалить все данные, связанные с оркестрацией, очистите историю экземпляра. Например, удалите все ресурсы хранилища, связанные с завершенным экземпляром. Используйте API для удаления экземпляра, используемый клиентом оркестрации.
В следующем примере показано, как очистить один экземпляр модуля оркестрации.
[FunctionName("PurgeInstanceHistory")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("purge-queue")] string instanceId)
{
return client.PurgeInstanceHistoryAsync(instanceId);
}
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
return client.purgeInstanceHistory(instanceId);
};
Ознакомьтесь с начальными экземплярами конфигурации function.json.
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
return await client.purge_instance_history(instance_id)
Замечание
Эта функция в настоящее время не поддерживается в PowerShell, но ее можно достичь с помощью API HTTP Устойчивые функции HTTP.
@FunctionName("PurgeInstance")
public HttpResponseMessage purgeInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, route = "purge/{instanceID}") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
@BindingName("instanceID") String instanceID) {
PurgeResult result = durableContext.getClient().purgeInstance(instanceID);
if (result.getDeletedInstanceCount() == 0) {
return req.createResponseBuilder(HttpStatus.NOT_FOUND)
.body("No completed instance with ID '" + instanceID + "' was found!")
.build();
} else {
return req.createResponseBuilder(HttpStatus.OK)
.body("Successfully purged data for " + instanceID)
.build();
}
}
using Microsoft.DurableTask.Client;
// Purge a single orchestration instance
PurgeResult result = await client.PurgeInstanceAsync(instanceId);
Console.WriteLine($"Purged {result.PurgedInstanceCount} instance(s).");
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Purge a single orchestration instance
result = client.purge_orchestration(instance_id)
print(f"Purged {result.deleted_instance_count} instance(s).")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.PurgeResult;
// Purge a single orchestration instance
PurgeResult result = client.purgeInstance(instanceId);
System.out.println("Purged " + result.getDeletedInstanceCount() + " instance(s).");
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Purge a single orchestration instance
const result = await client.purgeOrchestration(instanceId);
console.log(`Purged ${result?.deletedInstanceCount ?? 0} instance(s).`);
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
В следующем примере показана функция, запускаемая таймером, которая очищает историю всех экземпляров оркестрации, завершившихся после указанного интервала времени. В этом случае он удаляет данные для всех результатов выполнения, завершённых 30 или более дней назад. В этом примере функция запланирована на выполнение один раз в день в 12:00 по времени UTC.
[FunctionName("PurgeInstanceHistory")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[TimerTrigger("0 0 12 * * *")] TimerInfo myTimer)
{
return client.PurgeInstanceHistoryAsync(
DateTime.MinValue,
DateTime.UtcNow.AddDays(-30),
new List<OrchestrationStatus>
{
OrchestrationStatus.Completed
});
}
Замечание
В предыдущем коде C# используется модель внутрипроцессного процесса IDurableOrchestrationClient, которая помечена как устаревшая в более новых версиях расширения устойчивых функций. Для новых проектов .NET рекомендуется использовать изолированную рабочую модель .NET с DurableTaskClient. Дополнительные сведения см. в статье о версиях устойчивых функций .
Этот purgeInstanceHistoryBy метод можно использовать для условной очистки истории экземпляров для нескольких экземпляров.
function.json
{
"bindings": [
{
"schedule": "0 0 12 * * *",
"name": "myTimer",
"type": "timerTrigger",
"direction": "in"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
Замечание
Этот пример предназначен для Устойчивые функции версии 2.x. В версии 1.x используйте orchestrationClient вместо durableClient.
index.js
const df = require("durable-functions");
module.exports = async function (context, myTimer) {
const client = df.getClient(context);
const createdTimeFrom = new Date(0);
const createdTimeTo = new Date().setDate(today.getDate() - 30);
const runtimeStatuses = [ df.OrchestrationRuntimeStatus.Completed ];
return client.purgeInstanceHistoryBy(createdTimeFrom, createdTimeTo, runtimeStatuses);
};
Замечание
Эта функция в настоящее время не поддерживается в PowerShell, но ее можно достичь с помощью API HTTP Устойчивые функции HTTP.
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.DurableOrchestrationStatus import OrchestrationRuntimeStatus
from datetime import datetime, timedelta
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
created_time_from = datetime.min
created_time_to = datetime.today() + timedelta(days = -30)
runtime_statuses = [OrchestrationRuntimeStatus.Completed]
return await client.purge_instance_history_by(created_time_from, created_time_to, runtime_statuses)
@FunctionName("PurgeInstances")
public void purgeInstances(
@TimerTrigger(name = "purgeTimer", schedule = "0 0 12 * * *") String timerInfo,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
ExecutionContext context) throws TimeoutException {
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria()
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(60)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(30)))
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.COMPLETED));
PurgeResult result = durableContext.getClient().purgeInstances(criteria);
context.getLogger().info(String.format("Purged %d instance(s)", result.getDeletedInstanceCount()));
}
using Microsoft.DurableTask.Client;
// Purge completed instances older than 30 days
var filter = new PurgeInstancesFilter(
CreatedFrom: DateTime.MinValue,
CreatedTo: DateTime.UtcNow.AddDays(-30),
Statuses: new[] { OrchestrationRuntimeStatus.Completed });
PurgeResult result = await client.PurgeAllInstancesAsync(filter);
Console.WriteLine($"Purged {result.PurgedInstanceCount} instance(s).");
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from datetime import datetime, timedelta
# Purge completed instances older than 30 days
result = client.purge_orchestrations(
created_time_from=datetime.min,
created_time_to=datetime.utcnow() - timedelta(days=30),
runtime_status=['COMPLETED']
)
print(f"Purged {result.deleted_instance_count} instance(s).")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.PurgeInstanceCriteria;
import com.microsoft.durabletask.PurgeResult;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
// Purge completed instances older than 30 days
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria()
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(30)))
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.COMPLETED));
PurgeResult result = client.purgeInstances(criteria);
System.out.println("Purged " + result.getDeletedInstanceCount() + " instance(s).");
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus, PurgeInstanceCriteria } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Purge completed instances older than 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const criteria = new PurgeInstanceCriteria();
criteria.setCreatedTimeTo(thirtyDaysAgo);
criteria.setRuntimeStatusList([OrchestrationStatus.COMPLETED]);
const result = await client.purgeOrchestration(criteria);
console.log(`Purged ${result?.deletedInstanceCount ?? 0} instance(s).`);
Пакет SDK для устойчивых задач недоступен для PowerShell. Вместо этого используйте Устойчивые функции.
Замечание
Для успешной операции очистки журнала состояние среды выполнения целевого экземпляра должно быть завершено, завершено или завершилось сбоем.
Дальнейшие действия