Практическое руководство. Указание планировщика задач в блоке потока данных
В этом документе приводятся способы привязки определенного планировщика задач при использовании потока данных в приложении. В этом пример используется класс System.Threading.Tasks.ConcurrentExclusiveSchedulerPair в приложении Windows Forms для указания того, когда активна задача чтения и когда активна задача записи. Здесь также используется метод TaskScheduler.FromCurrentSynchronizationContext, чтобы позволить блоку потока данных выполняться в потоке пользовательского интерфейса.
Примечание.
Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не поставляется с .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Проект и выполните поиск пакета System.Threading.Tasks.Dataflow
в Интернете. Вы также можете установить его, выполнив в .NET Core CLI команду dotnet add package System.Threading.Tasks.Dataflow
.
Создание приложения Windows Forms
Создайте проект Приложение Windows Forms на Visual C# или Visual Basic. На следующих этапах проекту дается название
WriterReadersWinForms
.В конструкторе форм главной формы Form1.cs (Form1.vb для Visual Basic) добавьте четыре элемента управления CheckBox. Установите свойству Text значение Reader 1 для
checkBox1
, Reader 2 дляcheckBox2
, Reader 3 дляcheckBox3
и Writer дляcheckBox4
. Задайте свойству Enabled каждого элемента управления значениеFalse
.Добавьте на форму элемент управления Timer. Установите свойство Interval в значение
2500
.
Добавление функциональных возможностей потока данных
В этом разделе описываются способы создания блоков потока данных, участвующих в приложении, и привязки каждого из них к планировщику задач.
Добавление функциональных возможностей потока данных в приложение
В проекте добавьте ссылку на System.Threading.Tasks.Dataflow.dll.
Убедитесь, что Form1.cs (Form1.vb для Visual Basic) содержит следующие
using
директивы (Imports
в Visual Basic).using System; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using System.Windows.Forms;
Imports System.Threading Imports System.Threading.Tasks Imports System.Threading.Tasks.Dataflow
Добавьте данные-член BroadcastBlock<T> в класс
Form1
.// Broadcasts values to an ActionBlock<int> object that is associated // with each check box. BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
' Broadcasts values to an ActionBlock<int> object that is associated ' with each check box. Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
В конструкторе
Form1
после вызоваInitializeComponent
создайте объект ActionBlock<TInput>, который переключает состояние объектов CheckBox.// Create an ActionBlock<CheckBox> object that toggles the state // of CheckBox objects. // Specifying the current synchronization context enables the // action to run on the user-interface thread. var toggleCheckBox = new ActionBlock<CheckBox>(checkBox => { checkBox.Checked = !checkBox.Checked; }, new ExecutionDataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
' Create an ActionBlock<CheckBox> object that toggles the state ' of CheckBox objects. ' Specifying the current synchronization context enables the ' action to run on the user-interface thread. Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
В конструкторе
Form1
создайте объект ConcurrentExclusiveSchedulerPair и четыре объекта ActionBlock<TInput>, по одному объекту ActionBlock<TInput> для каждого объекта CheckBox. Для каждого ActionBlock<TInput> объекта укажите ExecutionDataflowBlockOptions объект, имеющий свойство, TaskScheduler заданное ConcurrentScheduler свойством для читателей, и ExclusiveScheduler свойство для модуля записи.// Create a ConcurrentExclusiveSchedulerPair object. // Readers will run on the concurrent part of the scheduler pair. // The writer will run on the exclusive part of the scheduler pair. var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(); // Create an ActionBlock<int> object for each reader CheckBox object. // Each ActionBlock<int> object represents an action that can read // from a resource in parallel to other readers. // Specifying the concurrent part of the scheduler pair enables the // reader to run in parallel to other actions that are managed by // that scheduler. var readerActions = from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3} select new ActionBlock<int>(milliseconds => { // Toggle the check box to the checked state. toggleCheckBox.Post(checkBox); // Perform the read action. For demonstration, suspend the current // thread to simulate a lengthy read operation. Thread.Sleep(milliseconds); // Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox); }, new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerPair.ConcurrentScheduler }); // Create an ActionBlock<int> object for the writer CheckBox object. // This ActionBlock<int> object represents an action that writes to // a resource, but cannot run in parallel to readers. // Specifying the exclusive part of the scheduler pair enables the // writer to run in exclusively with respect to other actions that are // managed by the scheduler pair. var writerAction = new ActionBlock<int>(milliseconds => { // Toggle the check box to the checked state. toggleCheckBox.Post(checkBox4); // Perform the write action. For demonstration, suspend the current // thread to simulate a lengthy write operation. Thread.Sleep(milliseconds); // Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox4); }, new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerPair.ExclusiveScheduler }); // Link the broadcaster to each reader and writer block. // The BroadcastBlock<T> class propagates values that it // receives to all connected targets. foreach (var readerAction in readerActions) { broadcaster.LinkTo(readerAction); } broadcaster.LinkTo(writerAction);
' Create a ConcurrentExclusiveSchedulerPair object. ' Readers will run on the concurrent part of the scheduler pair. ' The writer will run on the exclusive part of the scheduler pair. Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair() ' Create an ActionBlock<int> object for each reader CheckBox object. ' Each ActionBlock<int> object represents an action that can read ' from a resource in parallel to other readers. ' Specifying the concurrent part of the scheduler pair enables the ' reader to run in parallel to other actions that are managed by ' that scheduler. Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _ Select New ActionBlock(Of Integer)(Sub(milliseconds) ' Toggle the check box to the checked state. ' Perform the read action. For demonstration, suspend the current ' thread to simulate a lengthy read operation. ' Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox) Thread.Sleep(milliseconds) toggleCheckBox.Post(checkBox) End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler}) ' Create an ActionBlock<int> object for the writer CheckBox object. ' This ActionBlock<int> object represents an action that writes to ' a resource, but cannot run in parallel to readers. ' Specifying the exclusive part of the scheduler pair enables the ' writer to run in exclusively with respect to other actions that are ' managed by the scheduler pair. Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds) ' Toggle the check box to the checked state. ' Perform the write action. For demonstration, suspend the current ' thread to simulate a lengthy write operation. ' Toggle the check box to the unchecked state. toggleCheckBox.Post(checkBox4) Thread.Sleep(milliseconds) toggleCheckBox.Post(checkBox4) End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler}) ' Link the broadcaster to each reader and writer block. ' The BroadcastBlock<T> class propagates values that it ' receives to all connected targets. For Each readerAction In readerActions broadcaster.LinkTo(readerAction) Next readerAction broadcaster.LinkTo(writerAction)
В конструкторе
Form1
запустите объект Timer.// Start the timer. timer1.Start();
' Start the timer. timer1.Start()
В конструкторе форм главной формы создайте обработчик событий для события таймера Tick.
Реализуйте событие Tick для таймера.
// Event handler for the timer. private void timer1_Tick(object sender, EventArgs e) { // Post a value to the broadcaster. The broadcaster // sends this message to each target. broadcaster.Post(1000); }
' Event handler for the timer. Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick ' Post a value to the broadcaster. The broadcaster ' sends this message to each target. broadcaster.Post(1000) End Sub
Поскольку блок потока данных toggleCheckBox
работает с интерфейсом пользователя, важно, чтобы это действие происходило в потоке пользовательского интерфейса. Для этого во время строительства этот объект предоставляет ExecutionDataflowBlockOptions объект, которому TaskScheduler присвоено свойство TaskScheduler.FromCurrentSynchronizationContext. Метод FromCurrentSynchronizationContext создает объект TaskScheduler, выполняющий работу в текущем контексте синхронизации. Поскольку конструктор Form1
вызывается из потока пользовательского интерфейса, действие для блока потока данных toggleCheckBox
также выполняется в потоке пользовательского интерфейса.
В этом примере также используется класс ConcurrentExclusiveSchedulerPair, чтобы обеспечить возможность одновременной работы для нескольких блоков потока данных и монопольной работы еще одного блока потока данных в отношении остальных блоков потока данных, выполняемых в этом же объекте ConcurrentExclusiveSchedulerPair. Этот способ полезен, когда несколько блоков потока данных совместно используют ресурс, и некоторым требуется монопольный доступ к этому ресурсу, поскольку это исключают потребность вручную синхронизировать доступ к этому ресурсу. Исключение ручной синхронизации может сделать код более эффективным.
Пример
В следующем примере приведен полный код Form1.cs (Form1.vb для Visual Basic).
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace WriterReadersWinForms
{
public partial class Form1 : Form
{
// Broadcasts values to an ActionBlock<int> object that is associated
// with each check box.
BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
public Form1()
{
InitializeComponent();
// Create an ActionBlock<CheckBox> object that toggles the state
// of CheckBox objects.
// Specifying the current synchronization context enables the
// action to run on the user-interface thread.
var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
{
checkBox.Checked = !checkBox.Checked;
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
// Create a ConcurrentExclusiveSchedulerPair object.
// Readers will run on the concurrent part of the scheduler pair.
// The writer will run on the exclusive part of the scheduler pair.
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
// Create an ActionBlock<int> object for each reader CheckBox object.
// Each ActionBlock<int> object represents an action that can read
// from a resource in parallel to other readers.
// Specifying the concurrent part of the scheduler pair enables the
// reader to run in parallel to other actions that are managed by
// that scheduler.
var readerActions =
from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
select new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox);
// Perform the read action. For demonstration, suspend the current
// thread to simulate a lengthy read operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ConcurrentScheduler
});
// Create an ActionBlock<int> object for the writer CheckBox object.
// This ActionBlock<int> object represents an action that writes to
// a resource, but cannot run in parallel to readers.
// Specifying the exclusive part of the scheduler pair enables the
// writer to run in exclusively with respect to other actions that are
// managed by the scheduler pair.
var writerAction = new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox4);
// Perform the write action. For demonstration, suspend the current
// thread to simulate a lengthy write operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ExclusiveScheduler
});
// Link the broadcaster to each reader and writer block.
// The BroadcastBlock<T> class propagates values that it
// receives to all connected targets.
foreach (var readerAction in readerActions)
{
broadcaster.LinkTo(readerAction);
}
broadcaster.LinkTo(writerAction);
// Start the timer.
timer1.Start();
}
// Event handler for the timer.
private void timer1_Tick(object sender, EventArgs e)
{
// Post a value to the broadcaster. The broadcaster
// sends this message to each target.
broadcaster.Post(1000);
}
}
}
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Namespace WriterReadersWinForms
Partial Public Class Form1
Inherits Form
' Broadcasts values to an ActionBlock<int> object that is associated
' with each check box.
Private broadcaster As New BroadcastBlock(Of Integer)(Nothing)
Public Sub New()
InitializeComponent()
' Create an ActionBlock<CheckBox> object that toggles the state
' of CheckBox objects.
' Specifying the current synchronization context enables the
' action to run on the user-interface thread.
Dim toggleCheckBox = New ActionBlock(Of CheckBox)(Sub(checkBox) checkBox.Checked = Not checkBox.Checked, New ExecutionDataflowBlockOptions With {.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()})
' Create a ConcurrentExclusiveSchedulerPair object.
' Readers will run on the concurrent part of the scheduler pair.
' The writer will run on the exclusive part of the scheduler pair.
Dim taskSchedulerPair = New ConcurrentExclusiveSchedulerPair()
' Create an ActionBlock<int> object for each reader CheckBox object.
' Each ActionBlock<int> object represents an action that can read
' from a resource in parallel to other readers.
' Specifying the concurrent part of the scheduler pair enables the
' reader to run in parallel to other actions that are managed by
' that scheduler.
Dim readerActions = From checkBox In New CheckBox() {checkBox1, checkBox2, checkBox3} _
Select New ActionBlock(Of Integer)(Sub(milliseconds)
' Toggle the check box to the checked state.
' Perform the read action. For demonstration, suspend the current
' thread to simulate a lengthy read operation.
' Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox)
Thread.Sleep(milliseconds)
toggleCheckBox.Post(checkBox)
End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ConcurrentScheduler})
' Create an ActionBlock<int> object for the writer CheckBox object.
' This ActionBlock<int> object represents an action that writes to
' a resource, but cannot run in parallel to readers.
' Specifying the exclusive part of the scheduler pair enables the
' writer to run in exclusively with respect to other actions that are
' managed by the scheduler pair.
Dim writerAction = New ActionBlock(Of Integer)(Sub(milliseconds)
' Toggle the check box to the checked state.
' Perform the write action. For demonstration, suspend the current
' thread to simulate a lengthy write operation.
' Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4)
Thread.Sleep(milliseconds)
toggleCheckBox.Post(checkBox4)
End Sub, New ExecutionDataflowBlockOptions With {.TaskScheduler = taskSchedulerPair.ExclusiveScheduler})
' Link the broadcaster to each reader and writer block.
' The BroadcastBlock<T> class propagates values that it
' receives to all connected targets.
For Each readerAction In readerActions
broadcaster.LinkTo(readerAction)
Next readerAction
broadcaster.LinkTo(writerAction)
' Start the timer.
timer1.Start()
End Sub
' Event handler for the timer.
Private Sub timer1_Tick(ByVal sender As Object, ByVal e As EventArgs) Handles timer1.Tick
' Post a value to the broadcaster. The broadcaster
' sends this message to each target.
broadcaster.Post(1000)
End Sub
End Class
End Namespace