TaskScheduler Класс
Определение
Важно!
Некоторые сведения относятся к предварительной версии продукта, в которую до выпуска могут быть внесены существенные изменения. Майкрософт не предоставляет никаких гарантий, явных или подразумеваемых, относительно приведенных здесь сведений.
Представляет объект, обрабатывающий низкоуровневую работу задач очереди на потоки.
public ref class TaskScheduler abstract
public abstract class TaskScheduler
type TaskScheduler = class
Public MustInherit Class TaskScheduler
- Наследование
-
TaskScheduler
Примеры
В следующем примере создается настраиваемый планировщик задач, ограничивающий количество потоков, используемых приложением. Затем он запускает два набора задач и отображает сведения о задаче и потоке, на котором выполняется задача.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Example
{
static void Main()
{
// Create a scheduler that uses two threads.
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(2);
List<Task> tasks = new List<Task>();
// Create a TaskFactory and pass it our custom scheduler.
TaskFactory factory = new TaskFactory(lcts);
CancellationTokenSource cts = new CancellationTokenSource();
// Use our factory to run a set of tasks.
Object lockObj = new Object();
int outputItem = 0;
for (int tCtr = 0; tCtr <= 4; tCtr++) {
int iteration = tCtr;
Task t = factory.StartNew(() => {
for (int i = 0; i < 1000; i++) {
lock (lockObj) {
Console.Write("{0} in task t-{1} on thread {2} ",
i, iteration, Thread.CurrentThread.ManagedThreadId);
outputItem++;
if (outputItem % 3 == 0)
Console.WriteLine();
}
}
}, cts.Token);
tasks.Add(t);
}
// Use it to run a second set of tasks.
for (int tCtr = 0; tCtr <= 4; tCtr++) {
int iteration = tCtr;
Task t1 = factory.StartNew(() => {
for (int outer = 0; outer <= 10; outer++) {
for (int i = 0x21; i <= 0x7E; i++) {
lock (lockObj) {
Console.Write("'{0}' in task t1-{1} on thread {2} ",
Convert.ToChar(i), iteration, Thread.CurrentThread.ManagedThreadId);
outputItem++;
if (outputItem % 3 == 0)
Console.WriteLine();
}
}
}
}, cts.Token);
tasks.Add(t1);
}
// Wait for the tasks to complete before displaying a completion message.
Task.WaitAll(tasks.ToArray());
cts.Dispose();
Console.WriteLine("\n\nSuccessful completion.");
}
}
// Provides a task scheduler that ensures a maximum concurrency level while
// running on top of the thread pool.
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
// Indicates whether the current thread is processing work items.
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
// The list of tasks to be executed
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
// The maximum concurrency level allowed by this scheduler.
private readonly int _maxDegreeOfParallelism;
// Indicates whether the scheduler is currently processing work items.
private int _delegatesQueuedOrRunning = 0;
// Creates a new instance with the specified degree of parallelism.
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
// Queues a task to the scheduler.
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
// Inform the ThreadPool that there's work to be executed for this scheduler.
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
// Attempts to execute the specified task on the current thread.
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued)
// Try to run the task.
if (TryDequeue(task))
return base.TryExecuteTask(task);
else
return false;
else
return base.TryExecuteTask(task);
}
// Attempt to remove a previously scheduled task from the scheduler.
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
// Gets the maximum concurrency level supported by this scheduler.
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
// Gets an enumerable of the tasks currently scheduled on this scheduler.
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks;
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
// The following is a portion of the output from a single run of the example:
// 'T' in task t1-4 on thread 3 'U' in task t1-4 on thread 3 'V' in task t1-4 on thread 3
// 'W' in task t1-4 on thread 3 'X' in task t1-4 on thread 3 'Y' in task t1-4 on thread 3
// 'Z' in task t1-4 on thread 3 '[' in task t1-4 on thread 3 '\' in task t1-4 on thread 3
// ']' in task t1-4 on thread 3 '^' in task t1-4 on thread 3 '_' in task t1-4 on thread 3
// '`' in task t1-4 on thread 3 'a' in task t1-4 on thread 3 'b' in task t1-4 on thread 3
// 'c' in task t1-4 on thread 3 'd' in task t1-4 on thread 3 'e' in task t1-4 on thread 3
// 'f' in task t1-4 on thread 3 'g' in task t1-4 on thread 3 'h' in task t1-4 on thread 3
// 'i' in task t1-4 on thread 3 'j' in task t1-4 on thread 3 'k' in task t1-4 on thread 3
// 'l' in task t1-4 on thread 3 'm' in task t1-4 on thread 3 'n' in task t1-4 on thread 3
// 'o' in task t1-4 on thread 3 'p' in task t1-4 on thread 3 ']' in task t1-2 on thread 4
// '^' in task t1-2 on thread 4 '_' in task t1-2 on thread 4 '`' in task t1-2 on thread 4
// 'a' in task t1-2 on thread 4 'b' in task t1-2 on thread 4 'c' in task t1-2 on thread 4
// 'd' in task t1-2 on thread 4 'e' in task t1-2 on thread 4 'f' in task t1-2 on thread 4
// 'g' in task t1-2 on thread 4 'h' in task t1-2 on thread 4 'i' in task t1-2 on thread 4
// 'j' in task t1-2 on thread 4 'k' in task t1-2 on thread 4 'l' in task t1-2 on thread 4
// 'm' in task t1-2 on thread 4 'n' in task t1-2 on thread 4 'o' in task t1-2 on thread 4
// 'p' in task t1-2 on thread 4 'q' in task t1-2 on thread 4 'r' in task t1-2 on thread 4
// 's' in task t1-2 on thread 4 't' in task t1-2 on thread 4 'u' in task t1-2 on thread 4
// 'v' in task t1-2 on thread 4 'w' in task t1-2 on thread 4 'x' in task t1-2 on thread 4
// 'y' in task t1-2 on thread 4 'z' in task t1-2 on thread 4 '{' in task t1-2 on thread 4
// '|' in task t1-2 on thread 4 '}' in task t1-2 on thread 4 '~' in task t1-2 on thread 4
// 'q' in task t1-4 on thread 3 'r' in task t1-4 on thread 3 's' in task t1-4 on thread 3
// 't' in task t1-4 on thread 3 'u' in task t1-4 on thread 3 'v' in task t1-4 on thread 3
// 'w' in task t1-4 on thread 3 'x' in task t1-4 on thread 3 'y' in task t1-4 on thread 3
// 'z' in task t1-4 on thread 3 '{' in task t1-4 on thread 3 '|' in task t1-4 on thread 3
Комментарии
Класс TaskScheduler представляет планировщик задач. Планировщик задач гарантирует, что работа задачи в конечном итоге выполняется.
Планировщик задач по умолчанию обеспечивает кражу работы для балансировки нагрузки, инъекцию и завершение потоков для максимальной пропускной способности и высокой общей производительности. Это должно быть достаточно для большинства сценариев.
Класс TaskScheduler также служит точкой расширения для всей настраиваемой логики планирования. К ним относятся такие механизмы, как планирование задачи для выполнения, а также способ предоставления запланированных задач отладчикам. Если вам требуются специальные функциональные возможности, можно создать настраиваемый планировщик и включить его для определенных задач или запросов.
Планировщик задач по умолчанию и пул потоков
Планировщик по умолчанию для библиотеки параллельных задач и PLINQ использует пул потоков .NET, представленный ThreadPool классом, для очереди и выполнения работы. Пул потоков использует информацию, предоставляемую типом Task, для эффективной поддержки мелкозернистого параллелизма (кратковременные единицы работы), которым часто представлены параллельные задачи и запросы.
Глобальная очередь и локальные очереди
Пул потоков поддерживает глобальную рабочую очередь FIFO (first-in, first-out) для потоков в каждом домене приложений. Всякий раз, когда программа вызывает метод ThreadPool.QueueUserWorkItem (или ThreadPool.UnsafeQueueUserWorkItem), работа помещается в эту общую очередь и в конечном итоге извлекается из очереди и выполняется на следующем потоке, который становится доступным. Начиная с .NET Framework 4, эта очередь использует алгоритм без блокировки, который напоминает ConcurrentQueue<T> класс. Используя эту неблокирующую реализацию, пул потоков тратит меньше времени на постановку в очередь и удаление из очереди рабочих элементов. Это преимущество производительности доступно для всех программ, использующих пул потоков.
Задачи верхнего уровня, которые не создаются в контексте другой задачи, помещаются в глобальную очередь так же, как и любой другой рабочий элемент. Однако вложенные или дочерние задачи, созданные в контексте другой задачи, обрабатываются совершенно иначе. Дочерняя или вложенная задача помещается в локальную очередь, относящуюся к потоку, в котором выполняется родительская задача. Родительская задача может быть задачей верхнего уровня или может быть дочерним элементом другой задачи. Когда этот поток готов выполнять новые задачи, он сначала проверяет локальную очередь. Если рабочие элементы находятся в ожидании, к ним можно быстро получить доступ. К локальным очередям обращаются в порядке захода последним, выход первым (LIFO), чтобы сохранить локальность кэша и уменьшить конкуренцию. Дополнительные сведения о дочерних задачах и вложенных задачах см. в разделе "Присоединенные и отсоединяемые дочерние задачи".
Использование локальных очередей не только снижает давление на глобальную очередь, но и использует преимущества локальной среды данных. Рабочие элементы в локальной очереди часто ссылаются на структуры данных, которые физически находятся рядом друг с другом в памяти. В таких случаях данные уже находится в кэше после выполнения первой задачи и могут быть доступны быстро. Параллельные LINQ (PLINQ) и Parallel класс используют вложенные задачи и дочерние задачи широко и обеспечивают значительные скорости с помощью локальных рабочих очередей.
Воровство работы
Начиная с .NET Framework 4, пул потоков также имеет алгоритм кражи работы, чтобы убедиться, что потоки не простаивают, а другие по-прежнему имеют задания в своих очередях. Когда поток пула потоков готов к дополнительной работе, он сначала смотрит на начало своей локальной очереди, затем в глобальной очереди и затем в локальные очереди других потоков. Если он находит рабочий элемент в локальной очереди другого потока, сначала применяет эвристики, чтобы убедиться, что он может эффективно выполнять работу. Если система может, она извлекает рабочий элемент из хвоста (в порядке FIFO). Это сокращает конкуренцию в каждой локальной очереди и сохраняет локальность данных. Эта архитектура помогает эффективно работать с балансировкой нагрузки пула потоков, чем предыдущие версии.
Длительные задачи
Возможно, вы хотите явно запретить помещать задачу в локальную очередь. Например, вы можете знать, что определенный рабочий элемент будет выполняться относительно долго и, скорее всего, будет блокировать все остальные рабочие элементы в локальной очереди. В этом случае можно указать параметр System.Threading.Tasks.TaskCreationOptions, который предоставляет планировщику намёк, что для задачи может потребоваться дополнительный поток. Это необходимо, чтобы не блокировать прогресс выполнения других потоков или рабочих элементов в локальной очереди. Используя этот параметр, вы полностью избегаете пула потоков, включая глобальные и локальные очереди.
Встраивание задач
В некоторых случаях, когда Task ожидается, он может выполняться синхронно в потоке, выполняющем операцию ожидания. Это повышает производительность, предотвращая необходимость дополнительного потока и вместо этого используя существующий поток, который был бы заблокирован в противном случае. Чтобы предотвратить ошибки из-за повторного входа, инлайнинг задач происходит только при обнаружении цели ожидания в локальной очереди данного потока.
Указание контекста синхронизации
Вы можете использовать метод TaskScheduler.FromCurrentSynchronizationContext для того, чтобы назначить выполнение задачи на определённом потоке. Это полезно в платформах, таких как Windows Forms и Windows Presentation Foundation, где доступ к объектам пользовательского интерфейса часто ограничен кодом, выполняющимся в том же потоке, на котором был создан объект пользовательского интерфейса.
В следующем примере метод TaskScheduler.FromCurrentSynchronizationContext используется в приложении Windows Presentation Foundation (WPF) для планирования задачи в том же потоке, на котором был создан элемент управления UI. В примере создается мозаика изображений, которые случайным образом выбираются из указанного каталога. Объекты WPF используются для загрузки и изменения размера изображений. Затем необработанные пиксели передаются в задачу, которая использует For цикл для записи данных пикселей в большой однобайтовый массив. Синхронизация не требуется, так как две плитки не занимают одни и те же элементы массива. Плитки также можно записать в любом порядке, так как их положение вычисляется независимо от любой другой плитки. Затем большой массив передается задаче, которая выполняется в потоке пользовательского интерфейса, где пиксельные данные загружаются в элемент управления изображением.
Пример перемещает данные из потока пользовательского интерфейса, изменяет его с помощью параллельных циклов и Task объектов, а затем передает его обратно в задачу, которая выполняется в потоке пользовательского интерфейса. Этот подход полезен при использовании параллельной библиотеки задач для выполнения операций, которые либо не поддерживаются API WPF, либо недостаточно быстро. Еще одним способом создания мозаики изображений в WPF является использование System.Windows.Controls.WrapPanel элемента управления и добавление изображений в него. Компонент WrapPanel выполняет работу по размещению плиток. Однако эта работа может выполняться только в потоке пользовательского интерфейса.
using System;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Media;
using System.Windows.Media.Imaging;
namespace WPF_CS1
{
/// <summary>
/// Interaction logic for MainWindow.xaml
/// </summary>
public partial class MainWindow : Window
{
private int fileCount;
int colCount;
int rowCount;
private int tilePixelHeight;
private int tilePixelWidth;
private int largeImagePixelHeight;
private int largeImagePixelWidth;
private int largeImageStride;
PixelFormat format;
BitmapPalette palette = null;
public MainWindow()
{
InitializeComponent();
// For this example, values are hard-coded to a mosaic of 8x8 tiles.
// Each tile is 50 pixels high and 66 pixels wide and 32 bits per pixel.
colCount = 12;
rowCount = 8;
tilePixelHeight = 50;
tilePixelWidth = 66;
largeImagePixelHeight = tilePixelHeight * rowCount;
largeImagePixelWidth = tilePixelWidth * colCount;
largeImageStride = largeImagePixelWidth * (32 / 8);
this.Width = largeImagePixelWidth + 40;
image.Width = largeImagePixelWidth;
image.Height = largeImagePixelHeight;
}
private void button_Click(object sender, RoutedEventArgs e)
{
// For best results use 1024 x 768 jpg files at 32bpp.
string[] files = System.IO.Directory.GetFiles(@"C:\Users\Public\Pictures\Sample Pictures\", "*.jpg");
fileCount = files.Length;
Task<byte[]>[] images = new Task<byte[]>[fileCount];
for (int i = 0; i < fileCount; i++)
{
int x = i;
images[x] = Task.Factory.StartNew(() => LoadImage(files[x]));
}
// When they've all been loaded, tile them into a single byte array.
var tiledImage = Task.Factory.ContinueWhenAll(
images, (i) => TileImages(i));
// We are currently on the UI thread. Save the sync context and pass it to
// the next task so that it can access the UI control "image".
var UISyncContext = TaskScheduler.FromCurrentSynchronizationContext();
// On the UI thread, put the bytes into a bitmap and
// display it in the Image control.
var t3 = tiledImage.ContinueWith((antecedent) =>
{
// Get System DPI.
Matrix m = PresentationSource.FromVisual(Application.Current.MainWindow)
.CompositionTarget.TransformToDevice;
double dpiX = m.M11;
double dpiY = m.M22;
BitmapSource bms = BitmapSource.Create(largeImagePixelWidth,
largeImagePixelHeight,
dpiX,
dpiY,
format,
palette, //use default palette
antecedent.Result,
largeImageStride);
image.Source = bms;
}, UISyncContext);
}
byte[] LoadImage(string filename)
{
// Use the WPF BitmapImage class to load and
// resize the bitmap. NOTE: Only 32bpp formats are supported correctly.
// Support for additional color formats is left as an exercise
// for the reader. For more information, see documentation for ColorConvertedBitmap.
BitmapImage bitmapImage = new BitmapImage();
bitmapImage.BeginInit();
bitmapImage.UriSource = new Uri(filename);
bitmapImage.DecodePixelHeight = tilePixelHeight;
bitmapImage.DecodePixelWidth = tilePixelWidth;
bitmapImage.EndInit();
format = bitmapImage.Format;
int size = (int)(bitmapImage.Height * bitmapImage.Width);
int stride = (int)bitmapImage.Width * 4;
byte[] dest = new byte[stride * tilePixelHeight];
bitmapImage.CopyPixels(dest, stride, 0);
return dest;
}
int Stride(int pixelWidth, int bitsPerPixel)
{
return (((pixelWidth * bitsPerPixel + 31) / 32) * 4);
}
// Map the individual image tiles to the large image
// in parallel. Any kind of raw image manipulation can be
// done here because we are not attempting to access any
// WPF controls from multiple threads.
byte[] TileImages(Task<byte[]>[] sourceImages)
{
byte[] largeImage = new byte[largeImagePixelHeight * largeImageStride];
int tileImageStride = tilePixelWidth * 4; // hard coded to 32bpp
Random rand = new Random();
Parallel.For(0, rowCount * colCount, (i) =>
{
// Pick one of the images at random for this tile.
int cur = rand.Next(0, sourceImages.Length);
byte[] pixels = sourceImages[cur].Result;
// Get the starting index for this tile.
int row = i / colCount;
int col = (int)(i % colCount);
int idx = ((row * (largeImageStride * tilePixelHeight)) + (col * tileImageStride));
// Write the pixels for the current tile. The pixels are not contiguous
// in the array, therefore we have to advance the index by the image stride
// (minus the stride of the tile) for each scanline of the tile.
int tileImageIndex = 0;
for (int j = 0; j < tilePixelHeight; j++)
{
// Write the next scanline for this tile.
for (int k = 0; k < tileImageStride; k++)
{
largeImage[idx++] = pixels[tileImageIndex++];
}
// Advance to the beginning of the next scanline.
idx += largeImageStride - tileImageStride;
}
});
return largeImage;
}
}
}
Чтобы создать пример, создайте проект приложения WPF в Visual Studio и назовите его WPF_CS1 (для проекта WPF C#) или WPF_VB1 (для проекта WPF Visual Basic). После этого выполните описанные ниже действия.
В режиме конструктора перетащите Image элемент управления из панели элементов управления в левый верхний угол области конструктора. В текстовом поле "Имя " окна "Свойства " присвойте элементу управления "изображение".
Перетащите Button элемент управления из панели элементов в левую часть окна приложения. В представлении XAML укажите свойство кнопки как "Создать мозаику" и укажите Content его Width свойство как "100". Подключите событие Click с обработчиком событий
button_Click, который определен в коде примера, добавивClick="button_Click"в элемент<Button>. В текстовом поле "Имя " окна "Свойства " назовите элемент управления "Button".Замените все содержимое файла MainWindow.xaml.cs или MainWindow.xaml.vb кодом из этого примера. Для проекта WPF C# убедитесь, что имя рабочей области соответствует имени проекта.
В примере считываются изображения JPEG из каталога C:\Users\Public\Pictures\Sample Pictures. Создайте каталог и поместите в него некоторые изображения или измените путь для ссылки на другой каталог, содержащий изображения.
В этом примере есть некоторые ограничения. Например, поддерживаются только 32-разрядные изображения на пиксель; изображения в других форматах повреждены BitmapImage объектом во время операции изменения размера. Кроме того, исходные изображения должны быть больше размера плитки. В качестве дальнейшего упражнения можно добавить функциональные возможности для обработки нескольких форматов пикселей и размеров файлов.
Конструкторы
| Имя | Описание |
|---|---|
| TaskScheduler() |
Инициализирует объект TaskScheduler. |
Свойства
| Имя | Описание |
|---|---|
| Current |
Возвращает связанный TaskScheduler с текущей задачей. |
| Default |
Возвращает экземпляр TaskScheduler по умолчанию, предоставляемый .NET. |
| Id |
Возвращает уникальный идентификатор для этого TaskScheduler. |
| MaximumConcurrencyLevel |
Указывает максимальный уровень параллелизма, TaskScheduler который может поддерживаться. |
Методы
| Имя | Описание |
|---|---|
| Equals(Object) |
Определяет, равен ли указанный объект текущему объекту. (Унаследовано от Object) |
| Finalize() |
Освобождает все ресурсы, связанные с этим планировщиком. |
| FromCurrentSynchronizationContext() |
Создает связанный с текущим TaskSchedulerSynchronizationContext. |
| GetHashCode() |
Служит хэш-функцией по умолчанию. (Унаследовано от Object) |
| GetScheduledTasks() |
Для поддержки отладчика создается только перечисление Task экземпляров, которые в настоящее время помещаются в очередь планировщику, ожидающего выполнения. |
| GetType() |
Возвращает Type текущего экземпляра. (Унаследовано от Object) |
| MemberwiseClone() |
Создает неглубокую копию текущей Object. (Унаследовано от Object) |
| QueueTask(Task) |
Task Очереди планировщика. |
| ToString() |
Возвращает строку, представляющую текущий объект. (Унаследовано от Object) |
| TryDequeue(Task) |
Пытается вывести объект Task , который ранее был помещен в очередь для этого планировщика. |
| TryExecuteTask(Task) |
Пытается выполнить предоставленный Task планировщик. |
| TryExecuteTaskInline(Task, Boolean) |
Определяет, может ли предоставленный Task объект выполняться синхронно в этом вызове и, если он может, выполняет его. |
События
| Имя | Описание |
|---|---|
| UnobservedTaskException |
Возникает, когда необработано исключение неисправной задачи запускает политику эскалации исключений, которая по умолчанию завершит процесс. |
Применяется к
Потокобезопасность
Все члены абстрактного TaskScheduler типа являются потокобезопасными и могут использоваться одновременно из нескольких потоков.