Прочитать на английском

Поделиться через


Как добавлять и извлекать элементы по отдельности в BlockingCollection

В этом примере показано, как добавлять и удалять элементы из BlockingCollection<T> как блокирующим, так и неблокирующим образом. Дополнительные сведения о BlockingCollection<T>см. в обзоре BlockingCollection .

Пример перечисления BlockingCollection<T> до тех пор, пока оно не станет пустым и больше элементы не будут добавлены, см. в разделе Как использовать ForEach для удаления элементов в BlockingCollection.

Пример 1

В первом примере показано, как добавлять и извлекать элементы, чтобы операции блокировались, если коллекция временно пуста (при извлечении), достигнута максимальная емкость (при добавлении) или истек указанный период времени ожидания. Обратите внимание, что блокировка максимальной емкости включена только при создании Объекта BlockingCollection с максимальной емкостью, указанной в конструкторе.

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        // Increase or decrease this value as desired.
        int itemsToAdd = 500;

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            int width = Math.Max(Console.BufferWidth, 80);
            int height = Math.Max(Console.BufferHeight, itemsToAdd * 2 + 3);

            // Preserve all the display output for Adds and Takes
            Console.SetBufferSize(width, height);
        }

        // A bounded collection. Increase, decrease, or remove the
        // maximum capacity argument to see how it impacts behavior.
        var numbers = new BlockingCollection<int>(50);

        // A simple blocking consumer with no cancellation.
        Task.Run(() =>
        {
            int i = -1;
            while (!numbers.IsCompleted)
            {
                try
                {
                    i = numbers.Take();
                }
                catch (InvalidOperationException)
                {
                    Console.WriteLine("Adding was completed!");
                    break;
                }
                Console.WriteLine($"Take:{i} ");

                // Simulate a slow consumer. This will cause
                // collection to fill up fast and thus Adds wil block.
                Thread.SpinWait(100000);
            }

            Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
        });

        // A simple blocking producer with no cancellation.
        Task.Run(() =>
        {
            for (int i = 0; i < itemsToAdd; i++)
            {
                numbers.Add(i);
                Console.WriteLine($"Add:{i} Count={numbers.Count}");
            }

            // See documentation for this method.
            numbers.CompleteAdding();
        });

        // Keep the console display open in debug mode.
        Console.ReadLine();
    }
}

Пример 2

Во втором примере показано, как добавлять и принимать элементы, чтобы операции не блокируются. Если элемент отсутствует, достигнута максимальная емкость привязанной коллекции или истек период времени ожидания, то операция TryAdd или TryTake возвращает значение false. Это позволяет потоку выполнять некоторые другие полезные действия в течение некоторого времени, а затем повторить попытку позже, чтобы получить новый элемент или попытаться добавить тот же элемент, который не удалось добавить ранее. Программа также демонстрирует, как осуществить отмену при доступе к BlockingCollection<T>.

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

class ProgramWithCancellation
{
    static int inputs = 2000;

    static void Main()
    {
        // The token source for issuing the cancelation request.
        var cts = new CancellationTokenSource();

        // A blocking collection that can hold no more than 100 items at a time.
        var numberCollection = new BlockingCollection<int>(100);

        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            int width = Math.Max(Console.BufferWidth, 80);
            int height = Math.Max(Console.BufferHeight, 8000);

            // Preserve all the display output for Adds and Takes
            Console.SetBufferSize(width, height);
        }

        // The simplest UI thread ever invented.
        Task.Run(() =>
        {
            if (Console.ReadKey(true).KeyChar == 'c')
            {
                cts.Cancel();
            }
        });

        // Start one producer and one consumer.
        Task t1 = Task.Run(() => NonBlockingConsumer(numberCollection, cts.Token));
        Task t2 = Task.Run(() => NonBlockingProducer(numberCollection, cts.Token));

        // Wait for the tasks to complete execution
        Task.WaitAll(t1, t2);

        cts.Dispose();
        Console.WriteLine("Press the Enter key to exit.");
        Console.ReadLine();
    }

    static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
    {
        // IsCompleted == (IsAddingCompleted && Count == 0)
        while (!bc.IsCompleted)
        {
            int nextItem = 0;
            try
            {
                if (!bc.TryTake(out nextItem, 0, ct))
                {
                    Console.WriteLine(" Take Blocked");
                }
                else
                {
                    Console.WriteLine($" Take:{nextItem}");
                }
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Taking canceled.");
                break;
            }

            // Slow down consumer just a little to cause
            // collection to fill up faster, and lead to "AddBlocked"
            Thread.SpinWait(500000);
        }

        Console.WriteLine("\r\nNo more items to take.");
    }

    static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
    {
        int itemToAdd = 0;
        bool success = false;

        do
        {
            // Cancellation causes OCE. We know how to handle it.
            try
            {
                // A shorter timeout causes more failures.
                success = bc.TryAdd(itemToAdd, 2, ct);
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Add loop canceled.");
                // Let other threads know we're done in case
                // they aren't monitoring the cancellation token.
                bc.CompleteAdding();
                break;
            }

            if (success)
            {
                Console.WriteLine($" Add:{itemToAdd}");
                itemToAdd++;
            }
            else
            {
                Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count);
                // Don't increment nextItem. Try again on next iteration.

                //Do something else useful instead.
                UpdateProgress(itemToAdd);
            }
        } while (itemToAdd < inputs);

        // No lock required here because only one producer.
        bc.CompleteAdding();
    }

    static void UpdateProgress(int i)
    {
        double percent = ((double)i / inputs) * 100;
        Console.WriteLine($"Percent complete: {percent}");
    }
}

См. также


Дополнительные ресурсы

Обучение

Модуль

Реализация типов коллекций - Training

Узнайте, как использовать средства C# для динамического управления группами объектов, обеспечивая безопасность типов и эффективную обработку данных.