Поделиться через


Поток данных (библиотека параллельных задач)

Библиотека параллельных задач (TPL) предоставляет компоненты потока данных, что позволяет повысить надежность приложений с включенным параллелизмом. Эти компоненты потока данных вместе называются Библиотекой потоков данных TPL. Эта модель потоков данных поддерживает программирование на основе субъектов путем обеспечения внутрипроцессной передачи сообщений для недетализированного потока данных и задач по конвейеризации. Компоненты потоков данных строятся на типах и инфраструктуре планирования TPL и интегрированы с языковой поддержкой асинхронного программирования на C#, Visual Basic и F#. Эти компоненты потоков данных полезны при наличии нескольких операций, которые должны асинхронно взаимодействовать друг с другом, или при необходимости обрабатывать данные по мере того, как они становятся доступными. Например, рассмотрим приложение, которое обрабатывает данные, поступающие с веб-камеры. С помощью модели потока данных приложение может обрабатывать кадры, как только они становятся доступными. Если приложение повышает качество изображений на кадрах, например, выполняя коррекцию освещенности или удаление "красных глаз", можно создать конвейер компонентов потока данных. Каждый этап конвейера может использовать функциональность с более грубым параллелизмом, например функцию, предоставляемую библиотекой TPL, для преобразования изображения.

В этом документе содержатся общие сведения о библиотеке потоков данных TPL. Здесь описывается модель программирования, предопределенные типы блоков потоков данных и способы настройки блоков потоков данных для соответствия требованиям вашего приложения.

Примечание.

Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не поставляется с .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Проект и выполните поиск пакета System.Threading.Tasks.Dataflow в Интернете. Вы также можете установить его, выполнив в .NET Core CLI команду dotnet add package System.Threading.Tasks.Dataflow.

Модель программирования

Библиотека потоков данных TPL обеспечивает основу для передачи сообщений и параллелизации приложений, создающих большую нагрузку на ЦПУ и ввод-вывод, которые имеют высокую пропускную способность и низкую задержку. Она также предоставляет явный контроль над тем, как данные буферизуются и перемещаются по системе. Чтобы лучше понять модель программирования потоков данных, рассмотрим приложение, асинхронно загружающее изображения с диска и создающее из них композитное изображение. Традиционные модели программирования обычно требуют использования обратных вызовов и объектов синхронизации, например блокировок, для координации задач и доступа к общим данным. С помощью модели программирования потоков данных можно создавать объекты потоков данных, которые обрабатывают изображения по мере считывания их с диска. В модели потоков данных объявляется, как обрабатываются данные, когда они становятся доступными, а также объявляются любые зависимости между данными. Поскольку среда выполнения управляет зависимостями между данными, часто можно избежать необходимости синхронизировать доступ к общим данным. Кроме того, поскольку планирование в среде выполнения основано на асинхронном прибытии данных, поток данных может увеличить пропускную способность и ускорить время ответа, эффективно управляя лежащими в основе потоками. Пример, в котором используется модель программирования потоков данных для реализации обработки изображений в приложении Windows Forms, см. в разделе Пошаговое руководство. Использование потока данных в приложении Windows Forms.

Источники и целевые блоки

Библиотека потоков данных TPL состоит из блоков потоков данных, которые представляют собой структуры данных, буферизующие и обрабатывающие данные. В TPL определено три типа блоков потоков данных: блоки источника, целевые блоки и блоки передачи. Блок источника выступает в качестве источника данных, из которого можно считать данные. Целевой блок выступает в качестве получателя данных, в который можно писать. Блок передачи действует и как блок источника, и как целевой блок: из него можно читать и в него можно писать. TPL определяет интерфейс System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> для представления источников, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> для представления целевых объектов и System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> для представления передающих. IPropagatorBlock<TInput,TOutput> наследуется и от ISourceBlock<TOutput>, и от ITargetBlock<TInput>.

Библиотека потоков данных TPL предоставляет несколько предопределенных типов блоков потоков данных, которые реализуют интерфейсы ISourceBlock<TOutput>, ITargetBlock<TInput> и IPropagatorBlock<TInput,TOutput>. Эти типы блоков потоков данных описаны в настоящем документе в разделе Предопределенные типы блоков потоков данных.

Соединение блоков

Можно также соединять блоки потоков данных для создания конвейеров, которые являются линейными последовательностями блоков потоков данных, или сетей, являющихся графами блоков потоков данных. Конвейер является одним из видов сетей. Конвейеры или сети асинхронно распространяют исходные данные целевым объектам, когда данные становятся доступны. Метод ISourceBlock<TOutput>.LinkTo связывает блок потока данных источника и целевой блок. Источник может быть связан с несколькими целевыми объектами или не связан ни с одним; целевые объекты могут иметь связь с несколькими источниками или не иметь связей. Можно добавлять или удалять блоки потока данных из конвейера или сети одновременно. Предопределенные типы блоков потоков данных отвечают за все аспекты потокобезопасности установки и удаления связей.

Пример, в котором блоки потоков данных соединяются в базовый конвейер, см. в разделе Пошаговое руководство. Создание конвейера потока данных. Пример, в котором блоки потоков данных формируют более сложную сеть, см. в разделе Пошаговое руководство. Использование потока данных в приложении Windows Forms. Пример, в котором происходит удаление связи между источником и целевым объектом после того, как источник отправляет целевому объекту сообщение, см. в разделе Практическое руководство. Удаление связей с блоками потоков данных.

Фильтрация

При вызове метода ISourceBlock<TOutput>.LinkTo для связывания источника с целевым объектом, можно указать делегат, который определяет, принимает или отклоняет целевой блок сообщение в зависимости от содержания этого сообщения. Механизм фильтрации позволяет гарантировать, что блок потока данных будет получать только определенные значения. Для большинства стандартных типов блока потока данных, если блок источника подключен к нескольким целевым блокам, когда один из целевых блоков отвергает сообщение, это сообщение отправляется следующему целевому объекту. Порядок, в котором источник отправляет сообщения целевым объектам, определяется источником и может различаться в зависимости от типа источника. Большинство типов блоков источников перестают отправлять сообщение после того, как один из целевых объектов его принимает. Единственным исключением из этого правила является класс BroadcastBlock<T>, который предлагает каждое сообщение всем целевым объектам, даже если некоторые из целевых объектов отклоняют это сообщение. Пример, в котором с помощью фильтрации обрабатываются только определенные сообщения, см. в разделе Пошаговое руководство. Использование потока данных в приложении Windows Forms.

Внимание

Поскольку все стандартные типы блоков потоков данных источника гарантируют, что сообщения передаются в том порядке, в каком они поступают, каждое сообщение должно быть считано из блока источника, чтобы блок источника мог обработать следующее сообщение. Поэтому при использовании фильтрации для подключения нескольких целевых объектов к источнику, убедитесь, что хотя бы один целевой блок получит все сообщения. В противном случае в приложении может возникнуть взаимоблокировка.

Передача сообщений

Модель программирования на основе потоков данных связана с понятием передача сообщений, так как в этой модели независимые компоненты программы взаимодействуют друг с другом посредством отправки сообщений. Одним из способов распространения сообщений между компонентами приложения является вызов Post (синхронных) и SendAsync (асинхронных) методов для отправки сообщений в целевые блоки потока данных, а TryReceive также ReceiveReceiveAsyncметодов для получения сообщений из исходных блоков. Эти методы можно объединить с конвейерами потока данных или сетями, отправив входные данные в головной узел (целевой блок) и получив выходные данные из узла терминала конвейера или узлов терминала сети (один или несколько исходных блоков). Можно также использовать метод Choose для чтения данных из первого из имеющихся источников, где доступны данные, и выполнения действий с этими данными.

Блоки источника предлагают данные целевым блокам, вызывая метод ITargetBlock<TInput>.OfferMessage. Целевой блок отвечает на предложенное сообщение одним из трех способов: он может принять сообщение, отклонить сообщение или отложить сообщение. Если целевой объект принимает сообщение, метод OfferMessage возвращает Accepted. Если целевой объект отклоняет сообщение, метод OfferMessage возвращает Declined. Если целевой объект сообщает, что больше не будет получать сообщений от этого источника, OfferMessage возвращает DecliningPermanently. Стандартные типы блоков источника не предлагают связанным целевым объектам сообщения после получения такого значения, и они автоматически удаляют связи с этим целевым объектом.

Когда целевой блок откладывает сообщение для последующего использования, метод OfferMessage возвращает Postponed. Целевой блок, откладывающий сообщение, может позднее вызвать метод ISourceBlock<TOutput>.ReserveMessage, чтобы попытаться зарезервировать предложенное сообщение. На этом этапе сообщение либо по-прежнему доступно и может быть использовано целевым блоком, либо было принято другим целевым объектом. Если целевой блок пытается получить сообщение позже, он вызывает метод ISourceBlock<TOutput>.ConsumeMessage, а когда сообщение больше не нужно — метод ReleaseReservation. Резервирование сообщений обычно используется типами блоков потоков данных, которые работают в нежадном режиме. Нежадный режим описан далее в этом документе. Вместо резервирования отложенного сообщения целевой блок может также использовать метод ISourceBlock<TOutput>.ConsumeMessage, чтобы попытаться напрямую использовать отложенное сообщение.

Завершение блока потока данных

Блоки потоков данных также поддерживают понятие завершения. Блок потока данных, находящийся в состоянии завершения, не будет далее выполнять никакой работы. С каждым блоком потока данных связан объект System.Threading.Tasks.Task, который называется задачей завершения и представляет состояние завершенности блока. Поскольку можно дождаться завершения для объекта Task, с помощью задачи завершения, можно дождаться завершения одного или нескольких терминальных узлов сети потоков данных. Интерфейс IDataflowBlock определяет метод Complete, который уведомляет блок потока данных о запросе завершения, и свойство Completion, возвращающее задачу завершения для блока потока данных. Интерфейсы ISourceBlock<TOutput> и ITargetBlock<TInput> наследуются от IDataflowBlock.

Существует два способа определить, завершился ли блок потока данных без ошибки, с одной или несколькими ошибками или был отменен. Первый способ — вызвать метод Task.Wait для задачи завершения в блоке try-catch (Try-Catch в Visual Basic). В следующем примере создается объект ActionBlock<TInput>, создающий исключение ArgumentOutOfRangeException, если его входное значение меньше нуля. AggregateException возникает, когда в данном примере вызывается Wait для задачи завершения. Доступ к объекту ArgumentOutOfRangeException обеспечивается с помощью свойства InnerExceptions объекта AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

В этом примере показан случай, в котором необработанное исключение передается в делегат блока выполнения потока данных. Рекомендуется обрабатывать исключения в телах таких блоков. Однако, если возможности сделать это нет, блок ведет себя, как если бы он был отменен, и не обрабатывает входящие сообщения.

Если блок потока данных отменяется явно, объект AggregateException содержит OperationCanceledException в свойстве InnerExceptions. Дополнительные сведения об отмене потока данных см. в разделе Выполнение отмены.

Второй способ определить состояние завершения блока потока данных — использовать продолжение задачи завершения или использовать асинхронные функции языка C# и Visual Basic, чтобы асинхронно ожидать завершения задачи. Делегат, который предоставляется методу Task.ContinueWith, принимает объект Task, представляющий предшествующую задачу. В случае со свойством Completion делегат для продолжения принимает саму задачу завершения. Следующий пример похож на предыдущий, но в нем используется метод ContinueWith для создания задачи завершения, которая выводит на печать общее состояние операции потока данных.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Можно также использовать такие свойства, как IsCanceled, в теле задачи продолжения, чтобы определить дополнительные сведения о состоянии выполнения блока потока данных. Дополнительные сведения о задачах продолжения и о том, как они связаны с отменой и обработкой ошибок, см. в разделах Создание цепочки задач с помощью задач продолжения, Отмена задач и Обработка исключений.

Предопределенные типы блоков потоков данных

Библиотека потоков данных TPL предоставляет несколько предопределенных типов блоков потоков данных. Эти типы делятся на три категории: блоки буферизации, блоки выполнения и блоки группировки. В следующих подразделах описаны типы блоков, составляющие эти категории.

Блоки буферизации

Блоки буферизации хранят данные для их использования объектами-потребителями данных. Библиотека потоков данных TPL предоставляет три типа блоков буферизации: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> и System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

Класс BufferBlock<T> представляет структуру общего назначения для асинхронного обмена сообщениями. В этом классе хранится очередь сообщений типа «первым вошел — первым вышел» (FIFO), в которую могут записывать данные несколько источников и из которой могут читать данные несколько целевых объектов. Если целевой объект получает сообщение от объекта BufferBlock<T>, это сообщение удаляется из очереди сообщений. Поэтому, хотя объект BufferBlock<T> может иметь несколько целевых объектов, каждое сообщение может быть получено только одним из них. Класс BufferBlock<T> удобен, если нужно передать несколько сообщений другому компоненту и этот компонент должен принять каждое сообщение.

Следующий пример отправляет несколько значений Int32 объекту BufferBlock<T>, а затем эти значения считываются из этого объекта.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Полный пример, в котором показано, как записывать сообщения в объект BufferBlock<T> и считывать сообщения из него, вы найдете в статье Практическое руководство. Запись и чтение сообщений в блоке потока данных.

Широковещательный блок<T>

Класс BroadcastBlock<T> удобен, если необходимо передать несколько сообщений другому компоненту, но этому компоненту нужно только самое последнее значение. Этот класс также может оказаться полезным при необходимости широковещательной передачи сообщения нескольким компонентам.

В следующем примере значение Double отправляется объекту BroadcastBlock<T>, а затем это значение считывается обратно из объекта несколько раз. Поскольку значения не удаляются из объектов BroadcastBlock<T> после их прочтения, одно и то же значение доступно каждый раз.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Полный пример, в котором показано, как использовать BroadcastBlock<T> для передачи сообщения нескольким целевым блокам, см. в статье Практическое руководство. Указание планировщика задач в блоке потока данных.

WriteOnceBlock<T>

Класс WriteOnceBlock<T> похож на класс BroadcastBlock<T>, но объект WriteOnceBlock<T> допускает только однократную запись. Можно рассматривать WriteOnceBlock<T> как аналог ключевого слова readonly в C# (ReadOnly в Visual Basic), но объект WriteOnceBlock<T> становится неизменяемым после того, как он получает значение, а не в момент создания. Как и в случае с классом BroadcastBlock<T>, когда целевой объект получает сообщение от объекта WriteOnceBlock<T>, это сообщение не удаляется. Поэтому копию сообщения могут получить несколько целевых объектов. Класс WriteOnceBlock<T> полезен, если требуется передать только первое из нескольких сообщений.

В следующем примере несколько значений String отправляется объекту WriteOnceBlock<T>, а затем это значение считывается обратно из этого объекта. Поскольку в объект WriteOnceBlock<T> можно сделать запись только один раз, после того, как объект WriteOnceBlock<T> получает сообщение, он отклоняет последующие сообщения.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Полный пример того, как применять WriteOnceBlock<T>, чтобы получить значение первой завершенной операции, см. в статье Практическое руководство. Удаление связей с блоками потоков данных.

Блоки выполнения

Блоки выполнения вызывают предоставленный пользователем делегат для каждого элемента полученных данных. Библиотека потоков данных TPL предоставляет три типа блоков выполнения: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> и System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

Класс ActionBlock<TInput> — целевой блок, который вызывает делегат при получении данных. Можно рассматривать объект ActionBlock<TInput> как делегат, который выполняется асинхронно, когда данные становятся доступными. Делегат, который предоставляется объекту ActionBlock<TInput>, может быть типа Action<T> или типа System.Func<TInput, Task>. При использовании объекта ActionBlock<TInput> с Action<T> обработка каждого входного элемента считается завершенной, когда возвращается делегат. При использовании объекта ActionBlock<TInput> с System.Func<TInput, Task> обработка каждого входного элемента считается завершенной, только если возвращенный объект Task завершен. С помощью двух этих механизмов можно использовать ActionBlock<TInput> как для синхронной, так и для асинхронной обработки каждого входного элемента.

В следующем примере несколько значений Int32 отправляется объекту ActionBlock<TInput>. Объект ActionBlock<TInput> выводит эти значения на консоль. Затем в этом примере производится установка блока в завершенное состояние, и происходит ожидание завершения всех задач потока данных.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Полные примеры того, как использовать делегат с классом ActionBlock<TInput>, см. в статье Практическое руководство. Выполнение действий при получении данных блоком потоков данных.

TransformBlock<TInput, TOutput>

Класс TransformBlock<TInput,TOutput> похож на класс ActionBlock<TInput> за исключением того, что он работает и как источник, и как целевой объект. Делегат, который передается объекту TransformBlock<TInput,TOutput>, возвращает значение типа TOutput. Делегат, который предоставляется объекту TransformBlock<TInput,TOutput>, может относиться к типу System.Func<TInput, TOutput> или типу System.Func<TInput, Task<TOutput>>. При использовании объекта TransformBlock<TInput,TOutput> с System.Func<TInput, TOutput> обработка каждого входного элемента считается завершенной, когда делегат возвращается. При использовании объекта TransformBlock<TInput,TOutput> с System.Func<TInput, Task<TOutput>> обработка каждого входного элемента считается завершенной, только если возвращаемый объект Task<TResult> завершен. Как и в случае с ActionBlock<TInput>, с помощью двух этих механизмов можно использовать TransformBlock<TInput,TOutput> как для синхронной, так и для асинхронной обработки каждого входного элемента.

В следующем примере создается объект TransformBlock<TInput,TOutput>, который вычисляет квадратный корень введенного значения. Объект TransformBlock<TInput,TOutput> принимает значения Int32 на входе и создает значения Double на выходе.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Полный пример использования TransformBlock<TInput,TOutput> в сети блоков потоков данных, в котором выполняется обработка изображений в приложении Windows Forms, см. в статье Пошаговое руководство. Использование потока данных в приложении Windows Forms.

TransformManyBlock<TInput, TOutput>

Класс TransformManyBlock<TInput,TOutput> похож на класс TransformBlock<TInput,TOutput> за исключением того, что для каждого входного значения TransformManyBlock<TInput,TOutput> создает ноль или более выходных значений вместо только одного значения. Делегат, который предоставляется объекту TransformManyBlock<TInput,TOutput>, может относиться к типу System.Func<TInput, IEnumerable<TOutput>> или типу System.Func<TInput, Task<IEnumerable<TOutput>>>. При использовании объекта TransformManyBlock<TInput,TOutput> с System.Func<TInput, IEnumerable<TOutput>> обработка каждого входного элемента считается завершенной, когда делегат возвращается. При использовании объекта TransformManyBlock<TInput,TOutput> с System.Func<TInput, Task<IEnumerable<TOutput>>> обработка каждого входного элемента считается завершенной, только если возвращаемый объект System.Threading.Tasks.Task<IEnumerable<TOutput>> завершен.

В следующем примере создается объект TransformManyBlock<TInput,TOutput>, который разделяет строки на отдельные цепочки символов. Объект TransformManyBlock<TInput,TOutput> принимает значения String на входе и создает значения Char на выходе.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Полный пример, в котором TransformManyBlock<TInput,TOutput> используется, чтобы создавать несколько независимых выходных значений для каждого входного элемента в конвейере потока данных, см. в статье Пошаговое руководство. Создание конвейера потока данных.

Степень параллелизма

Каждый из объектов ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput> буферизует входные сообщения до тех пор, пока блок не будет готов их обработать. По умолчанию эти классы обрабатывают сообщения в том порядке, в котором они поступают, по одному. Можно также указать степень параллелизма для включения объектов ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput> для одновременной обработки нескольких сообщений. Дополнительные сведения о параллельном выполнении см. в разделе "Определение степени параллелизма" далее в этом документе. Пример, в котором задается степень параллелизма для включения обработки блоком потока данных более одного сообщения одновременно, см. в разделе Практическое руководство. Указание степени параллелизма в блоке потока данных.

Сводка по типам делегатов

В следующей таблице перечислены типы делегатов, которые можно передать объектам ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput>. В этой таблице также указано, работает ли делегат данного типа синхронно или асинхронно.

Тип Синхронный тип делегата Асинхронный тип делегата
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Можно также использовать лямбда-выражения при работе с типами блоков выполнения. Пример, демонстрирующий способ использования лямбда-выражения с блоком выполнения, см. в разделе Практическое руководство. Выполнение действий при получении данных блоком потоков данных.

Группирующие блоки

Группирующие блоки объединяют данные из одного или более источников с различными ограничениями. Библиотека потоков данных TPL предоставляет три типа блоков объединения: BatchBlock<T>, JoinBlock<T1,T2> и BatchedJoinBlock<T1,T2>.

BatchBlock<T>

Класс BatchBlock<T> объединяет наборы входных данных, называемые пакетами, в массивы выходных данных. Укажите размер каждого пакета при создании объекта BatchBlock<T>. Когда объект BatchBlock<T> получает указанное число входных элементов, он асинхронно передает массив, содержащий эти элементы. Если объект BatchBlock<T> переведен в состояние завершения, но не содержит достаточно элементов для формирования пакета, он формирует и передает конечный массив, который содержит оставшиеся входные элементы.

Класс BatchBlock<T> работает либо в жадном, либо в нежадном режиме. В жадном режиме, используемом по умолчанию, объект BatchBlock<T> принимает каждое предлагаемое ему сообщение и передает массив, как только получит указанное количество элементов. В нежадном режиме объект BatchBlock<T> откладывает все входящие сообщения до тех пор, пока достаточное количество источников не предложит сообщения блоку для формирования пакета. Жадный режим обычно работает быстрее, чем нежадный режим, поскольку он требует меньше дополнительной нагрузки для обработки. Однако можно использовать нежадный режим, если необходимо координировать получение от нескольких источников атомарным образом. Определите нежадный режим, установив свойству Greedy значение False в параметре dataflowBlockOptions в конструкторе BatchBlock<T>.

Следующий пример отправляет несколько значений Int32 объекту BatchBlock<T>, содержащему десять элементов в пакете. Для обеспечения передачи всех значений из BatchBlock<T> в этом примере вызывается метод Complete. Метод Complete переводит объект BatchBlock<T> в состояние завершения, и поэтому объект BatchBlock<T> передает все оставшиеся элементы в качестве последнего пакета.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Полный пример использования BatchBlock<T>, чтобы повысить эффективность операций вставки в базу данных, см. в статье Пошаговое руководство. Повышение эффективности с помощью BatchBlock и BatchedJoinBlock.

JoinBlock<T1, T2, ...>

Классы JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> собирают входные элементы и распространяют объекты System.Tuple<T1,T2> или System.Tuple<T1,T2,T3>, содержащие эти элементы. Классы JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> не наследуются от ITargetBlock<TInput>. Вместо этого они предоставляют свойства Target1, Target2 и Target3, которые реализуют ITargetBlock<TInput>.

Как BatchBlock<T>, JoinBlock<T1,T2> и JoinBlock<T1,T2,T3> работают либо в жадном, либо в нежадном режиме. В жадном режиме, используемом по умолчанию, объект JoinBlock<T1,T2> или JoinBlock<T1,T2,T3> принимает каждое предлагаемое ему сообщение и распространяет кортеж после того, как каждый из его целевых объектов получает по меньшей мере одно сообщение. В нежадном режиме объект JoinBlock<T1,T2> или JoinBlock<T1,T2,T3> откладывает все входящие сообщения, пока всем целевым объектам не будут предложены данные, необходимые для создания кортежа. В этот момент блок использует протокол двухфазной фиксации для атомарного извлечения всех необходимых элементов из источника. Эта задержка дает возможность другой сущности в это же время получать эти данные, что позволяет системе в целом продолжать работу.

В следующем примере описан случай, в котором объект JoinBlock<T1,T2,T3> требует нескольких элементов данных для вычисления значения. В этом примере создается объект JoinBlock<T1,T2,T3>, который требует два значения Int32 и Char для выполнения арифметической операции.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Полный пример применения объектов JoinBlock<T1,T2> в нежадном режиме для совместного использования ресурсов см. в статье Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников.

BatchedJoinBlock<T1, T2, ...>

Классы BatchedJoinBlock<T1,T2> и BatchedJoinBlock<T1,T2,T3> собирают пакеты входных элементов и распространяют объекты System.Tuple(IList(T1), IList(T2)) или System.Tuple(IList(T1), IList(T2), IList(T3)), содержащие эти элементы. Класс BatchedJoinBlock<T1,T2> можно рассматривать как сочетание BatchBlock<T> и JoinBlock<T1,T2>. Укажите размер каждого пакета во время создания объекта BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> также предоставляет свойства, Target1 и Target2, которые реализуют ITargetBlock<TInput>. Если указанное число входных элементов получено от всех целевых объектов, объект BatchedJoinBlock<T1,T2> асинхронно распространяет объект System.Tuple(IList(T1), IList(T2)), содержащий эти элементы.

В следующем примере создается объект BatchedJoinBlock<T1,T2>, содержащий результаты, значения Int32 и ошибки, представленные объектами Exception. В этом примере выполняется несколько операций, результаты записываются в свойство Target1, а ошибки — в свойство Target2 объекта BatchedJoinBlock<T1,T2>. Поскольку число успешных и неудачных операций не известно заранее, объекты IList<T> позволяют каждому целевому объекту получать ноль и более значений.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Полный пример использования BatchedJoinBlock<T1,T2> для захвата как результатов, так и всех исключений, которые могут возникнуть при выполнении считывания данных из базы, см. в статье Пошаговое руководство. Повышение эффективности с помощью BatchBlock и BatchedJoinBlock.

Настройка поведения блоков потоков данных

Можно включить дополнительные параметры, предоставив объект System.Threading.Tasks.Dataflow.DataflowBlockOptions конструктору типов блоков потоков данных. Эти параметры управляют таким поведением, как у планировщика, который управляет основной задачей и степенью параллелизма. DataflowBlockOptions также содержит производные типы, которые задают поведение, характерное для некоторых типов блоков потоков данных. В следующей таблице перечислено, какие типы параметров связаны с каждым из типов блоков потоков данных.

Тип блока потока данных тип DataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

В следующих разделах приведены дополнительные сведения о важных типах параметров блоков потоков данных, доступных с помощью классов System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions и System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Определение планировщика заданий

Все стандартные блоки потоков данных используют механизм планирования задач TPL для выполнения таких действий, как передача данных целевому объекту, получение данных из источника и выполнение определенных пользователем делегатов, когда данные становятся доступны. TaskScheduler — абстрактный класс, представляющий планировщик заданий, ставящий задачи в очередь потоков. Планировщик заданий по умолчанию, Default, использует класс ThreadPool для постановки в очередь и выполнения работы. Можно переопределить планировщик заданий по умолчанию, установив свойство TaskScheduler при создании объекта блока потока данных.

Если один и тот же планировщик заданий управляет несколькими блоками потока данных, он может применять к ним определенные политики. Например, если каждый из нескольких блоков потока данных настроен для отдельного планировщика заданий одного объекта ConcurrentExclusiveSchedulerPair, вся работа, выполняемая в этих блоках, сериализуется. Аналогично, если эти блоки настроены для параллельного планировщика заданий одного объекта ConcurrentExclusiveSchedulerPair и этот планировщик настроен так, чтобы иметь максимальный уровень параллелизма, вся работа из этих блоков ограничивается заданным числом одновременных операций. Пример, в котором класс ConcurrentExclusiveSchedulerPair допускает параллельное выполнение операций чтения, а операции записи приостанавливают выполнение любых других операций, см. в статье Практическое руководство. Указание планировщика задач в блоке потока данных. Дополнительные сведения о планировщиках задач в TPL см. в разделе TaskScheduler.

Определение степени параллелизма

По умолчанию библиотека потоков данных TPL предоставляет три типа блоков выполнения: ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput>, которые обрабатывают одно сообщение за раз. Эти типы блоков потоков данных также обрабатывают сообщения в порядке их поступления. Чтобы обеспечить возможность обработки одновременно нескольких сообщений этими блоками потоков данных, задайте свойство ExecutionDataflowBlockOptions.MaxDegreeOfParallelism при создании объекта блока потока данных.

Значение по умолчанию у MaxDegreeOfParallelism — 1, оно гарантирует, что блок потока данных единовременно обрабатывает одно сообщение. Присвоение этому свойству значения более 1 активирует обработку блоком потока данных нескольких сообщений одновременно. Присвоение этому свойству значения DataflowBlockOptions.Unbounded позволяет основному планировщику задач управлять максимальной степенью параллелизма.

Внимание

При максимальной степени параллелизма, превышающей 1, несколько сообщений обрабатываются одновременно, поэтому сообщения не могут обрабатываться в порядке их получения. Однако порядок, в котором сообщения выводятся из блока, соответствует порядку, в котором они поступают в блок.

Поскольку свойство MaxDegreeOfParallelism представляет максимальную степень параллелизма, блок потока данных может выполняться с меньшей степенью параллелизма, чем задано. Блок потока данных может использовать меньшую степень параллелизма для удовлетворения функциональных требований или при нехватке доступных системных ресурсов. Блок потока данных никогда не выбирает степень параллелизма больше заданной.

Значение свойства MaxDegreeOfParallelism уникально для каждого объекта блока потока данных. Например, если для четырех объектов блока потока данных задана 1 как максимальная степень параллелизма, все четыре объекта блока потока данных потенциально могут выполняться параллельно.

Пример, в котором задается максимальная степень параллелизма для предоставления возможности параллельного выполнения продолжительных операций, см. в разделе Практическое руководство. Указание степени параллелизма в блоке потока данных.

Определение количества сообщений на задачу

Предопределенные типы блоков потока данных используют задачи для обработки нескольких входных элементов. Это позволяет минимизировать число объектов задачи, необходимых для обработки данных, из-за чего приложения выполняются более эффективно. Однако если задачи из одного набора блоков потока данных обрабатывают данные, задачи из других блоков потока данных быть вынуждены ожидать в течение времени, необходимого для обработки данных, добавляя сообщения в очередь. Чтобы обеспечить улучшенное распределение ресурсов между задачами потока данных, задайте значение для свойства MaxMessagesPerTask. Если свойству MaxMessagesPerTask задано значение DataflowBlockOptions.Unbounded, которое используется по умолчанию, задача, используемая блоком потока данных, обрабатывает столько сообщений, сколько их доступно. Если свойству MaxMessagesPerTask задано значение, отличное от Unbounded, блок потока данных обрабатывает не больше этого количества сообщений на объект Task. Хотя настройка свойства MaxMessagesPerTask может улучшить распределение ресурсов между задачами, это может привести к созданию в системе количества задач, превышающего необходимое количество, что может снизить производительность.

Включение отмены

TPL предоставляет механизм, который позволяет задачам координировать отмены согласованным образом. Чтобы позволить блокам потока данных участвовать в этом механизме отмены, задайте значение для свойства CancellationToken. Если объект CancellationToken переводится в состояние отмены, все блоки потока данных, которые отслеживают выполнение этого токена, завершат выполнение текущего элемента, но не начнут обрабатывать последующие. Эти блоки потока данных также очищают все буферизованные сообщения, прекращают подключения ко всем блокам источников и целевых объектов, и переходят в состояние отмены. Путем перехода в состояние отмены у свойства Completion свойству Status присваивается значение Canceled, если во время обработки не возникает исключения. В таком случае свойству Status присваивается значение Faulted.

Пример использования отмены в приложении Windows Forms см. в разделе Практическое руководство. Отмена блока потока данных. Подробнее об отмене в TPL читайте в разделе Отмена задач.

Определение жадного и нежадного поведения

Несколько группирующих типов блоков потоков данных могут работать либо в жадном, либо в нежадном режиме. По умолчанию стандартные типы блоков потоков данных работают в жадном режиме.

Для типов блоков соединения, таких как JoinBlock<T1,T2>, жадный режим означает, что блок сразу принимает данные, даже если соответствующие данные, с которыми нужно выполнить объединение, еще недоступны. Нежадный режим означает, что блок откладывает все входящие сообщения до тех пор, пока сообщение не будет доступно на каждом из его целевых объектов для завершения объединения. Если любое из отложенных сообщений больше недоступно, блоки соединения освобождают все отложенные сообщения и перезапускают процесс. Для класса BatchBlock<T> жадный и нежадный режим работают одинаково за исключением того, что в нежадном режиме объект BatchBlock<T> откладывает все входящие сообщения до тех пор, пока достаточное их количество не будет доступно из различных источников для заполнения пакета.

Для определения нежадного режима для блока потока данных задайте свойству Greedy значение False. Пример использования нежадного режима для включения нескольких блоков соединения для более эффективного совместного использования источника данных см. в разделе Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников.

Пользовательские блоки потоков данных

Хотя библиотека потоков данных TPL предоставляет множество стандартных типов блоков, можно создавать дополнительные типы блоков, которые выполняют устанавливаемые пользователем функции. ISourceBlock<TOutput> Реализуйте или ITargetBlock<TInput> интерфейсы напрямую или используйте Encapsulate метод для создания сложного блока, который инкапсулирует поведение существующих типов блоков. Примеры, в которых демонстрируются способы реализации пользовательской функциональности блоков потока данных, см. в разделе Пошаговое руководство. Создание пользовательского типа блока потока данных.

Заголовок Description
Практическое руководство. Запись и чтение сообщений в блоке потока данных Здесь показано, как писать и считывать сообщения из объекта BufferBlock<T>.
Практическое руководство. Реализация шаблона потока данных "производитель-получатель" Здесь приводится описания способов использования модели потока данных для реализации шаблона производитель-потребитель, где производитель отправляет сообщения в блок потока данных, а потребитель считывает сообщения из этого блока.
Практическое руководство. Выполнение действий при получении данных блоком потоков данных Здесь приводится описание способов предоставления делегатов типам блоков выполнения потока данных, ActionBlock<TInput>, TransformBlock<TInput,TOutput> и TransformManyBlock<TInput,TOutput>.
Пошаговое руководство. Создание конвейера потока данных Здесь приводится способ создания конвейера потока данных, который загружает текст из Интернета и выполняет над ним операции.
Практическое руководство. Удаление связей с блоками потоков данных Здесь показано, как с помощью метода LinkTo удалить связь между целевым блоком и его источником после того, как источник отправил сообщение целевому объекту.
Пошаговое руководство. Использование потока данных в приложении Windows Forms Здесь демонстрируется, как создавать сеть блоков потока данных, которые выполняют обработку изображений в приложении Windows Forms.
Практическое руководство. Отмена блока потока данных Здесь демонстрируется способ использования отмены в приложении Windows Forms.
Практическое руководство. Использование JoinBlock для чтения данных из нескольких источников Здесь описывается способ использования класса JoinBlock<T1,T2> для выполнения операции, когда данные доступны из нескольких источников, и использования нежадного режима, чтобы позволить нескольким блокам объединения использовать общий источник данных более эффективно.
Практическое руководство. Указание степени параллелизма в блоке потока данных Здесь описывается, как задать свойство MaxDegreeOfParallelism, чтобы позволить блоку выполнения потока данных обрабатывать более одного сообщения единовременно.
Практическое руководство. Указание планировщика задач в блоке потока данных Здесь демонстрируется, как связать определенный планировщик задач при использовании потока данных в приложении.
Пошаговое руководство. Повышение эффективности с помощью BatchBlock и BatchedJoinBlock Здесь описывается, как использовать класс BatchBlock<T> для увеличения эффективности операций вставки в базу данных, а также как использовать класс BatchedJoinBlock<T1,T2> для получения как результата, так и всех исключений, возникающих при чтении данных из базы.
Пошаговое руководство. Создание пользовательского типа блока потока данных Здесь приводятся два способа создания типа блока потока данных, который реализует пользовательские функции.
Библиотека параллельных задач (TPL) Здесь приводится описание TPL — библиотеки, упрощающей параллельное и одновременное программирование в приложениях .NET Framework.