`
wyf
  • 浏览: 437939 次
  • 性别: Icon_minigender_1
  • 来自: 唐山
社区版块
存档分类
最新评论

向集合添加优先级限制和阻塞功能

    博客分类:
  • C#
 
阅读更多

自定义集合类是一个基本优先级别队列,优先级别在其中表示为 System.Collections.Concurrent.ConcurrentQueue<T> 对象的数组。在每个队列中不进行其他排序。通过实现类中的 System.Collections.Concurrent.IProducerConsumerCollection<T> 接口,然后将类实例用作 System.Collections.Concurrent.BlockingCollection<T> 的内部存储机制,来向自定义集合类添加限制和阻塞功能。

namespace ProdConsumerCS
{
    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    // 实现具有边界和阻止功能的优先级队列。
    public class SimplePriorityQueue<TPriority, TValue> : IProducerConsumerCollection<KeyValuePair<int, TValue>>
    {
        // 数组中每一个队列表示同一个优先级. 
        // 队列中的元素具有相同的优先级
        private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues = null;

        // 内部存储的队列个数.
        private int priorityCount = 0;
        private int m_count = 0;

        public SimplePriorityQueue(int priCount)
        {
            this.priorityCount = priCount;
            _queues = new ConcurrentQueue<KeyValuePair<int, TValue>>[priorityCount];
            for (int i = 0; i < priorityCount; i++)
                _queues[i] = new ConcurrentQueue<KeyValuePair<int, TValue>>();
        }

        // IProducerConsumerCollection members
        public bool TryAdd(KeyValuePair<int, TValue> item)
        {
            _queues[item.Key].Enqueue(item);
            Interlocked.Increment(ref m_count);
            return true;
        }

        public bool TryTake(out KeyValuePair<int, TValue> item)
        {
            bool success = false;

            // Loop through the queues in priority order
            // looking for an item to dequeue.
            for (int i = 0; i < priorityCount; i++)
            {
                // Lock the internal data so that the Dequeue
                // operation and the updating of m_count are atomic.
                lock (_queues)
                {
                    success = _queues[i].TryDequeue(out item);
                    if (success)
                    {
                        Interlocked.Decrement(ref m_count);
                        return true;
                    }
                }
            }

            // If we get here, we found nothing. 
            // Assign the out parameter to its default value and return false.
            item = new KeyValuePair<int, TValue>(0, default(TValue));
            return false;
        }

        public int Count
        {
            get { return m_count; }
        }

        // Required for ICollection
        void ICollection.CopyTo(Array array, int index)
        {
            CopyTo(array as KeyValuePair<int, TValue>[], index);
        }

        // CopyTo is problematic in a producer-consumer.
        // The destination array might be shorter or longer than what 
        // we get from ToArray due to adds or takes after the destination array was allocated.
        // Therefore, all we try to do here is fill up destination with as much
        // data as we have without running off the end.                
        public void CopyTo(KeyValuePair<int, TValue>[] destination, int destStartingIndex)
        {
            if (destination == null) throw new ArgumentNullException();
            if (destStartingIndex < 0) throw new ArgumentOutOfRangeException();

            int remaining = destination.Length;
            KeyValuePair<int, TValue>[] temp = this.ToArray();
            for (int i = 0; i < destination.Length && i < temp.Length; i++)
                destination[i] = temp[i];
        }

        public KeyValuePair<int, TValue>[] ToArray()
        {
            KeyValuePair<int, TValue>[] result;

            lock (_queues)
            {
                result = new KeyValuePair<int, TValue>[this.Count];
                int index = 0;
                foreach (var q in _queues)
                {
                    if (q.Count > 0)
                    {
                        q.CopyTo(result, index);
                        index += q.Count;
                    }
                }
                return result;
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator()
        {
            for (int i = 0; i < priorityCount; i++)
            {
                foreach (var item in _queues[i])
                    yield return item;
            }
        }

        public bool IsSynchronized
        {
            get
            {
                throw new NotSupportedException();
            }
        }

        public object SyncRoot
        {
            get { throw new NotSupportedException(); }
        }
    }

    public class TestBlockingCollection
    {
        static void Main()
        {

            int priorityCount = 7;
            SimplePriorityQueue<int, int> queue = new SimplePriorityQueue<int, int>(priorityCount);
            var bc = new BlockingCollection<KeyValuePair<int, int>>(queue, 50);


            CancellationTokenSource cts = new CancellationTokenSource();

            Task.Run(() =>
                {
                    if (Console.ReadKey(true).KeyChar == 'c')
                        cts.Cancel();
                });

            // Create a Task array so that we can Wait on it
            // and catch any exceptions, including user cancellation.
            Task[] tasks = new Task[2];

            // Create a producer thread. You can change the code to 
            // make the wait time a bit slower than the consumer 
            // thread to demonstrate the blocking capability.
            tasks[0] = Task.Run(() =>
            {
                // We randomize the wait time, and use that value
                // to determine the priority level (Key) of the item.
                Random r = new Random();

                int itemsToAdd = 40;
                int count = 0;
                while (!cts.Token.IsCancellationRequested && itemsToAdd-- > 0)
                {
                    int waitTime = r.Next(2000);
                    int priority = waitTime % priorityCount;
                    var item = new KeyValuePair<int, int>(priority, count++);

                    bc.Add(item);
                    Console.WriteLine("added pri {0}, data={1}", item.Key, item.Value);
                }
                Console.WriteLine("Producer is done adding.");
                bc.CompleteAdding();
            },
             cts.Token);

            //Give the producer a chance to add some items.
            Thread.SpinWait(1000000);

            // Create a consumer thread. The wait time is
            // a bit slower than the producer thread to demonstrate
            // the bounding capability at the high end. Change this value to see
            // the consumer run faster to demonstrate the blocking functionality
            // at the low end.

            tasks[1] = Task.Run(() =>
                {
                    while (!bc.IsCompleted && !cts.Token.IsCancellationRequested)
                    {
                        Random r = new Random();
                        int waitTime = r.Next(2000);
                        Thread.SpinWait(waitTime * 70);

                        // KeyValuePair is a value type. Initialize to avoid compile error in if(success)
                        KeyValuePair<int, int> item = new KeyValuePair<int, int>();
                        bool success = false;
                        success = bc.TryTake(out item);
                        if (success)
                        {
                            // Do something useful with the data.
                            Console.WriteLine("removed Pri = {0} data = {1} collCount= {2}", item.Key, item.Value, bc.Count);
                        }
                        else
                            Console.WriteLine("No items to retrieve. count = {0}", bc.Count);
                    }
                    Console.WriteLine("Exited consumer loop");
                },
                cts.Token);

            try {
                Task.WaitAll(tasks, cts.Token);
            }
            catch (OperationCanceledException e) {
                if (e.CancellationToken == cts.Token)
                    Console.WriteLine("Operation was canceled by user. Press any key to exit");
            }
            catch (AggregateException ae) {
                foreach (var v in ae.InnerExceptions)
                    Console.WriteLine(v.Message);
            }
            finally {
                cts.Dispose();
            }

            Console.ReadKey(true);

        }
    }

}

 默认情况下,System.Collections.Concurrent.BlockingCollection<T> 的存储为 System.Collections.Concurrent.ConcurrentQueue<T>

 

分享到:
评论

相关推荐

    java 面试题 集合

    - 泛型用于限制集合中元素类型,提高代码安全性和可读性。 - 通配符如"? extends T"用于限制操作的子类型。 7. **迭代器(Iterator)**: - 迭代器是遍历集合的标准方式,提供了`hasNext()`和`next()`方法。 - ...

    Java多线程运算集合

    当队列满时,往队列里添加元素的操作将会阻塞。 - **阻塞栈**: - `BlockingStack` 是一种支持阻塞操作的栈结构。 - 它的行为类似于阻塞队列,但在某些情况下可能更符合栈的使用场景。 #### 十五、Java线程:新...

    commons-collections4-4.0.rar

    `QueueDecorator`则提供了对标准Java Queue接口的增强,如添加阻塞、优先级等功能。 此外,该库还支持迭代器的工厂和策略,使得在处理迭代器时可以更加灵活。例如,`IteratorUtils`类提供了多种生成迭代器的方法,...

    数据结构知识梳理汇总文档

    常见的堆有最小堆和最大堆,常用于优先级队列和部分排序算法。 8. **图**是由节点和边构成的网络结构,用于表示对象之间的复杂关系,如社交网络、道路网络等。 在并发编程中,Java提供了多种线程安全的集合类,如...

    windows线程池,使用Windows自带的线程池api功能,比你写的线程池性能好得多

    线程池是多线程编程中的一个重要概念,它是一种线程使用模式,通过预先创建一组线程并维护一个线程集合来处理并发任务。在Windows操作系统中,内建的线程池API(Thread Pool API)提供了高效且灵活的线程管理机制,...

    C#多线程详细解答.pdf

    3. Mutex、Semaphore和SemaphoreSlim:用于限制同时访问资源的线程数量,实现线程同步。 四、线程间通信 1. WaitHandle集合:包括Mutex、Semaphore、EventWaitHandle等,通过WaitOne、SignalOne等方法实现线程间的...

    .net版本简单线程调用源码2019

    - **Concurrent collections**:如`ConcurrentBag`, `ConcurrentDictionary`, `ConcurrentQueue` 和 `ConcurrentStack`,这些集合类在多线程环境中提供线程安全的添加和删除操作。 10. **线程本地存储**: - **...

    Core Java 2 Volume II 7th Edition

    Java集合框架提供了一套统一的设计模式来实现集合,使得集合的使用更加方便和高效。 #### 2.4 算法 集合框架还包括了对集合操作的一些算法支持,如排序、查找等。 #### 2.5 遗留集合 遗留集合是指Java早期版本中...

    C++实现线程池源文件

    - 工作队列管理:考虑使用优先级队列或阻塞队列以优化任务调度。 总的来说,"C++实现线程池源文件"是一个实践性很强的学习资料,通过阅读和理解代码,可以深入学习C++多线程编程、线程池的设计和实现,以及相关...

    C#多线程编程实战 源代码

    - `QueueUserWorkItem`和`Execute`方法:将工作项添加到线程池中。 6. **并发集合**: - `ConcurrentDictionary, TValue&gt;`:线程安全的字典,允许多个线程同时读写。 - `ConcurrentQueue&lt;T&gt;`和`ConcurrentStack...

    c#,多线程详解,对于初学者有一定用途

    `Thread.CurrentThread.Priority`可以设置线程的优先级,但应谨慎使用,避免优先级反转和饥饿问题。 七、线程安全与并发问题 线程安全是指在多线程环境下,对共享资源的操作不会导致数据不一致。例如,`System....

    C++API事件选择模型

    - **优先级策略**:根据事件的重要性和紧急程度进行处理,提高系统响应速度。 - **工作队列策略**:将就绪事件放入工作队列,由工作线程池进行处理,平衡CPU使用。 6. **事件驱动编程** - 在事件选择模型中,...

    JAVA 面试 问题和答案

    1.1 封装(Encapsulation):封装是面向对象编程的一个核心概念,它涉及到将数据(属性)和代码(方法)捆绑在一起,限制对内部状态的直接访问,仅通过公开的接口与对象交互。 1.2 多态(Polymorphism):多态是面向对象...

    自己收集的超全的java笔试题、面试题

    - 运算符:包括算术运算符、关系运算符、逻辑运算符等,以及它们的优先级和结合性。 - 控制流程:if语句、switch语句、for循环、while循环和do-while循环是控制程序执行流程的关键。 2. **面向对象**: - 类与...

    多线程编程.docx

    同样地,当队列满时,向队列中添加元素的操作也会被阻塞,直到队列中有空余空间。 - **ArrayBlockingQueue**:一个有界的阻塞队列,基于数组实现,一旦初始化后容量固定不变。 - **DelayQueue**:一个特殊类型的...

    J2ME考试试题集

    - **线程状态**:线程的状态包括新建、就绪、运行、阻塞和死亡等。如果一个线程处于死亡状态,则表明该线程已经结束运行,并且不能再被重新启动。因此,它不再是一个`Runnable`线程。 - **线程调度**:在Java中,较...

    大数据基础复习

    BlockQueue是Java并发编程中的重要工具,它是一种特殊的队列,当队列满时,生产者尝试添加元素会阻塞,直到队列有空位;当队列为空时,消费者尝试取出元素也会阻塞,直到队列中有元素可用。BlockQueue提供了一种线程...

    C#多线程教程,经典清析教程

    C#提供了一些线程安全的集合类,如`ConcurrentQueue`和`ConcurrentStack`,以及线程安全的`Interlocked`类,它们提供了原子操作,确保在多线程环境下的安全性。 8. **线程异常处理**:线程中抛出的异常不会自动传播...

    c#+多线程学习

    线程池是系统管理的一组线程集合,用于执行可重用或短生命周期的任务,避免频繁创建和销毁线程的开销。`ThreadPool.QueueUserWorkItem`方法可以将任务添加到线程池。 ```csharp ThreadPool.QueueUserWorkItem(new ...

Global site tag (gtag.js) - Google Analytics