Blocking Queue is something that we might demand constantly for various kinds of applications, where one thread might take data from the queue, and other threads might try to put data into the queue. Unlike other thread-safe queue, the blocking queue should have one unique requiremnt in that whether a thread tries to take item from the queue or a thread try to put an item into the queue, the calling thread might be blocked until certain condition is met, for the put thread, the condition is that the queue is not full, and for the take thrad, the condition being the queue is not empty! They will be unblock immediately once the blocking condition is lifted.
there are many way that you can implement the blocking queue. you can use WaitHandle with the test on the number of items in the concurrent collections, then you will be able to know when to block and when to notify the threads.
But the DotNet framework has provided a very neat means for you to implement the Blocking queue with the off-the-shelf System.Collection.Concurrent namespace.
From the above mentioned namespace, there are BlockingCollection<> and ConcurrentQueue<>, but there is no such ConcurrentQueue<>?? Why Microsoft forget to provide such an important FIFO class, which can be applied in many a place?
Ah, Microsoft has actually inventted a very smart trick here because if you take a look at the signature of the BlockingCollection, it is somethig like this:
[DebuggerDisplay("Count = {Count}, Type = {m_collection}")] [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))] [ComVisible(false)] public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable { public BlockingCollection(); public BlockingCollection(int boundedCapacity); public BlockingCollection(IProducerConsumerCollection<T> collection); public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity); // ... }
as you can see some overload of the constructors takes a IProducerConsumerCollection<T> as it parameter, and what are those IProducerConsumerCollection<T> are coming from?
Ah, if you take a look at the signature of the ConcurrentQueue<T>
public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IEnumerable<T>, ICollection, IEnumerable { //... }
Ah, so ConcurrentQueue is an instance of IProducerConsumerCollection<T>, For those of you who are interested to find ou wha the IProducerConsumerCollection<T> is, here it is the signature.
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable { void CopyTo(T[] array, int index); T[] ToArray(); bool TryAdd(T item); bool TryTake(out T item); }
So, basically IProducerConsumerCollction provides a TryAdd/TryTake pair with additional methods to supports copy to/from array.
So, back to our question, it sounds like that BlockingCollection is something that if you don't provide a underlying ProducerConsumer collection it will use a default one (not sure if that one is a FIFO, or FILO - you simply cannot make the assumption), but if you did, it will use the underlying collection that you provided (which means it will use the take/put characteristic of underlying collection, which means if you pass in a FIFO collection, then take/put will work in a FIFO manner ... The BlockingQueue serves as a container/wrapper of a ConcurrencyCollection with additional blocking ability.
Knowing that the work sounds almost trivial, you simply put a ConcurrentQueue<T> inside a BlockingQueue then you willget a blocking queue. But let's make a class for this.
public class BlockingQueue<T> : BlockingCollection<T> { #region ctor(s) public BlockingQueue() : base(new ConcurrentQueue<T>()) { } public BlockingQueue (int maxSize) : base (new ConcurrentQueue<T>(), maxSize) { } #endregion ctor(s) #region Methods /// <summary> /// Enqueue an Item /// </summary> /// <param name="item">Item to enqueue</param> /// <remarks>blocks if the blocking queue is full</remarks> public void Enqueue(T item) { Add(item); } /// <summary> /// Dequeue an item /// </summary> /// <param name="Item"></param> /// <returns>Item dequeued</returns> /// <remarks>blocks if the blocking queue is empty</remarks> public T Dequeue() { return Take(); } #endregion Methods }
Ah, we explicitly add Enquee and Dequeue to mock up a Queue operation (you can use directly the Take or Add methods as you like)
Let's put this into a multiple thread environment and give it a spin.
First, we declare a Blocking Collection.
public List<HashSet<ITabularPushCallback>> callbackChannelLists = new List<HashSet<ITabularPushCallback>>(); // this is the Blocking Queue List. public List<BlockingQueue<string[][]>> messageQueues = new List<BlockingQueue<string[][]>>();
and If there is data coming, one background thread will send a background thread to do Enqueue.
public void NotifyMesasge(string[][] messages, int tableId) { if (IsChannelRegistered(tableId)) { HashSet<ITabularPushCallback> callbackChannelList = null; BlockingQueue<string[][]> queue = null; lock (SyncObj) { callbackChannelList = new HashSet<ITabularPushCallback>(callbackChannelLists[tableId]); queue = messageQueues[tableId]; } if (callbackChannelList.Count > 0 && queue != null) { ThreadPool.QueueUserWorkItem((o => { if (queue != null) queue.Enqueue(messages); }), null); } } else { // throw or swallow? //throw new ArgumentOutOfRangeException("tableId", tableId, "Invalid callback channel"); } }
and there a dedicated Background thread on parameterized on each "tableId" , which peeks data from the Queue and do processing, here is the main code that does the Dequeue and processing.
private void NotifyMessageThunk(int tableId) { HashSet<ITabularPushCallback> callbackChannelList = null; BlockingQueue<string[][]> queue = null; lock (SyncObj) { if (tableId < 0 || tableId > callbackChannelLists.Count) throw new ArgumentOutOfRangeException("tableId", tableId, "Expected nonnegative number and existing tableId!"); if (!IsChannelRegistered(tableId)) { Thread.Sleep(100); // CPU effecient means. return; } callbackChannelList = GetCallbackChannel(tableId); queue = messageQueues[tableId]; if (queue == null) { Thread.Sleep(100); // CPU effecient boosts return; } } string[][] message = queue.Dequeue(); if (message != null) { HashSet<ITabularPushCallback> channelCopy = null; channelCopy = new HashSet<ITabularPushCallback>(callbackChannelList); foreach (var channel in channelCopy) { try { channel.NotifyMessage(message, tableId); } catch { // swallow? if (!TryRemoveCallbackChannel(channel, tableId)) { // Logs } } } } }
Pretty easy, isn't it?
Actually you can make you own blocking queue with primitives of WaitHandles , something as shown in the references list [0] where you can do something simiar to
class SizeQueue<T> { private readonly Queue<T> queue = new Queue<T>(); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } }
The author, Marc Gravell, actually proposed a improved version which enales exiting cleanly. Below is his idea.
bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } }
And someone has geniously knocked up with Reactive extension, something such as this:
public class BlockingQueue<T> { private readonly Subject<T> _queue; private readonly IEnumerator<T> _enumerator; private readonly object _sync = new object(); public BlockingQueue() { _queue = new Subject<T>(); _enumerator = _queue.GetEnumerator(); } public void Enqueue(T item) { lock (_sync) { _queue.OnNext(item); } } public T Dequeue() { _enumerator.MoveNext(); return _enumerator.Current; } }
However, we can work more on the synchronization.
References:
相关推荐
在现代网络编程中,非阻塞套接字(non-blocking socket)和多路复用(multiplexing)是处理高并发连接的关键技术之一。这些技术能够帮助服务器高效地管理多个客户端连接,避免因等待某个操作完成而浪费资源。本文将...
《Fast Portable non-blocking network programming with Libevent》是一本关于Libevent库的书籍,它专注于指导读者如何使用Libevent 2.0或更高版本来编写快速、可移植且支持异步I/O的网络程序。本书的作者是Nick ...
异步阻塞队列npm install --save async-blocking-queueAsyncBlockingQueue import AsyncBlockingQueue from 'async-blocking-queue' ;var queue = new AsyncBlockingQueue ( ) ;// wait for next enqueue() ...
"let-prove-blocking-queue"项目显然旨在通过多种方法分析和证明阻塞队列的死锁状态。 死锁是并发编程中的一个重要问题,它通常涉及到四个条件:互斥、占有并等待、无剥夺和循环等待。在阻塞队列中,这四个条件可能...
const setBlocking = require ( 'set-blocking' )setBlocking ( true )console . log ( someLargeStringToOutput )历史背景/警告语创建它的目的是为了解决讨论的错误。 此错误出现在较新版本的Node.js( 0.12+ )上...
标题中的"Asynchronous, non-blocking SAP NW RFC SDK bindings for Node"是指为Node.js开发的一个库,它提供了异步、非阻塞的方式与SAP NetWeaver RFC SDK进行交互。SAP NetWeaver RFC SDK是SAP提供的一套软件开发...
《Blocking Queue 使用详解》 在Java并发编程中,Blocking Queue(阻塞队列)是一个非常重要的工具类,它结合了线程同步与队列数据结构,有效地实现了生产者消费者模式。 Blocking Queue接口位于java.util....
Tomcat8新版本特性: 1.支持servlet3.1, jsp 2.3, el表达式3.0 and Java WebSocket 1.0. 2.默认http与ajp请求实现non-blocking技术,即NIO技术。...6.新增AJP 连接采用了Servlet3.1的non-blocking IO。
标题中的"Asynchronous, non-blocking SAP NW RFC SDK bindings for Pyt" 指的是一种Python库,它提供了异步、非阻塞的接口来与SAP NetWeaver RFC(远程功能调用)SDK交互。SAP NW RFC SDK是SAP提供的一套开发工具,...
在csapp(《深入理解计算机系统》)这本书中,详细探讨了Cache的工作原理和优化技术,其中分块技术(Blocking)是提高Cache命中率的重要方法之一。分块技术的目的是通过组织数据结构为大块,即所谓的blocks(应用...
阻塞方式下的MPI实现laplace矩阵变化,对初学MPI的人还是很有帮助的
The switch fabric delivers strictly non-blocking connectivity while completely canceling the first-order crosstalk. The 4×4 switching circuit consists of eight silicon microring-based spatial (de-)...
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
官方离线安装包,亲测可用。使用rpm -ivh [rpm完整包名] 进行安装
jar包,亲测可用
jar包,亲测可用
jar包,亲测可用
jar包,亲测可用
jar包,亲测可用
experiments 里面包含运行实验的运行文件,先将其他八个程序打开之后最后打开 main client new 然后让其自动运行 结果会保存在一个.txt里面。 源代码 有main client new 和 一个关于随机数生成的头文件 和 子文件