Практическое руководство. Реализация шаблона потока данных производителя-потребителя
Из этой статьи вы узнаете, как использовать библиотеки потоков данных TPL для реализации шаблона "производитель-получатель". В этом шаблоне производитель отправляет сообщения в блок сообщений, а потребитель считывает сообщения из этого блока.
Примечание.
Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не поставляется с .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Проект и выполните поиск пакета System.Threading.Tasks.Dataflow
в Интернете. Вы также можете установить его, выполнив в .NET Core CLI команду dotnet add package System.Threading.Tasks.Dataflow
.
Пример
В следующем примере показана базовая модель производителя-потребителя, использующая поток данных. Метод Produce
записывает массивы, содержащие случайные байты данных, в объект System.Threading.Tasks.Dataflow.ITargetBlock<TInput>, а метод Consume
выполняет чтение байтов из объекта System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Используя интерфейсы ISourceBlock<TOutput> и ITargetBlock<TInput> вместо их производных типов, можно создавать пригодный для повторного использования код, который может работать с различными типами блоков потока данных. В этом примере используется класс BufferBlock<T>. Поскольку класс BufferBlock<T> действует и как блок источника, и как целевой блок, потребитель и производитель могут использовать общий объект для передачи данных.
Метод Produce
вызывает метод Post в цикле для синхронной записи данных в целевой блок. После того, как метод Produce
записывает все данные в целевой блок, он вызывает метод Complete, чтобы указать, что у этого блока никогда не будет дополнительных доступных данных. Метод Consume
использует операторы async и await (Async и Await в Visual Basic) для асинхронного вычисления общего числа байтов, полученных от объекта ISourceBlock<TOutput>. Для асинхронной работы метод Consume
вызывает метод OutputAvailableAsync, чтобы получать уведомления, если блок источника получит доступные данные и если у блока источника никогда не будет дополнительных доступных данных.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Отказоустойчивость
В предыдущем примере используется только один получатель для обработки исходных данных. При наличии нескольких получателей в приложении следует использовать метод TryReceive для чтения данных из блока источника, как показано в следующем примере.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Метод TryReceive возвращает False
, когда нет доступных данных. Когда несколько потребителей должны использовать блок источника параллельно, этот механизм гарантирует, что данные все еще будут доступны после вызова OutputAvailableAsync.