- 浏览: 985294 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
小节:
LinkedBlockingQueue是一个线程安全的阻塞并发队列,队列的顺序为FIFO,队列的中
节点包装者原始元素E,有一个后继链接,所以队列是单向的,队列的队头head和队尾节点last是傀儡节点,元素为null。队列有两把锁一个是takeLock,一个为putLock;消费者锁takeLock可以被take,poll等操作持有;生产者锁putLock,可以被put,offer等操作持有;同时有两个条件notEmpty和notFull,notEmpty是takeLock锁的条件,当队列为null,消费者等待的队列非空条件notEmpty;notFull为putLock的条件,为当队列满时,生产者等待队列条件,即队列非满条件notFull。队列中有一个AtomicInteger类型count,用于记录当前队列中元素的个数。
下面我们先来看put操作:
put操作首先以可中断方式获取锁,如果成功,则判断队列是否已满,
如果队列已满,则等待队列非满条件notFull,否则,添加元素节点到队列,
再次判断判断队列是否已满,如果没满,则唤醒一个等待notFull条件的put线程;
释放putLock。如果添加元素成功,则唤醒一个等待队列为notEmpty的take线程。
上面有两点需要关注:
1.
2.
从signalNotEmpty方法来看,先获取takeLock锁,再唤醒等待take的线程;为什么先获取
takeLock,而不是直接唤醒呢?这是为了在通知队列非空信息时,避免其他take线程的进入,
进行不必要的等待。
再来看Offer操作:
从上面来看put操作和offer操作的区别时,put是先获取putLock锁,再判断队列是否已满,已满则等待notFull条件;而offer是先判断队列是否已满,如果已满,则返回false,未满则获取putLock,后续操作相同。从分析来看,在我们向队列中添加元素时,如果使用offer,当队列已满的情况下,我们需要重新将元素放入队列,而put不需要我们再次这样操作,当队列满时,等待队列nullFull条件。
具体选哪一种,根据具体的场景去选择。
从上面来看,offer(E e, long timeout, TimeUnit unit)与put(E e,)更像,区别在于
当超时offer获取putLock锁成功后,如果队列已满,则超时等待notFull条件。
再来看take操作:
take操作,首先获取当前队列容量计数器,并以可中断方式获取takeLock,获取锁成功,则
判断队列是否为空,如果为空,则等待notEmpty条件,否则从队头取出元素,容量计数器减1,
如果队列中还有元素,则唤醒一个等待非空条件的take线程;如果在take前,队列容量已满,
则成功take后,唤醒等待notFull条件的put线程。
这里有两点要关注:
1.
2.
从signalNotFull可以看出,是先获取putLock,再唤醒等待put的线程,以防止
再唤醒的过程之前,有其他put线程进入,进行不必要的等待。
再看超时等待poll
从超时poll方法来看,与take方法的区别在于,当队列为null,进行超时等待。
再看poll方法:
从poll方法来看,与take方法的最大区别为先检查先检查队列是否为空,为空,则返回null;
不为空,剩下的操作与take相同。
再看peek检查元素;
peek操作首先检查队列是否为空,为空,则返回null,
否则获取takeLock锁,取出队头元素,并返回。
再看移除元素remove操作:
以上remove方法有3点要看:
1.
2.
3.
从remove方法来看,需要获取takeLock和putLock锁,遍历队列,比较元素是否相等,
相等则移除,则唤醒一个等待put的线程,最后释放takeLock和putLock锁。为什么要获取
两把锁呢,主要防止在移除的过程中,有线程消费元素,或生产元素,带来的不缺定性结果。
再来看包含contain操作:
contain操作与remove思路一样。
clear操作:
再来看将元素移到另一个集合的操作
//获取当前队列容量
//获取队列当前的剩余空间
迭代器:
从上来看,迭代器在构造时,需要两把锁put和take;获取next,也需要两把锁,
移除,则直接从队列中,移除。
序列化:
反序列化:
总结:
LinkedBlockingQueue是一个线程安全的阻塞并发队列,队列的顺序为FIFO,队列的中节点包装者原始元素E,有一个后继链接,所以队列是单向的,队列的队头head和队尾节点last是傀儡节点,元素为null。队列有两把锁一个是takeLock,一个为putLock;消费者锁takeLock可以被take,poll等操作持有;生产者锁putLock,可以被put,offer等操作持有;同时有两个条件notEmpty和notFull,notEmpty是takeLock锁的条件,当队列为null,消费者等待的队列非空条件notEmpty;notFull为putLock的条件,为当队列满时,生产者等待队列条件,即队列非满条件notFull。队列中有一个AtomicInteger类型count,用于记录当前队列中元素的个数。
put操作首先以可中断方式获取锁,如果成功,则判断队列是否已满,如果队列已满,则等待队列非满条件notFull,否则,添加元素节点到队列,再次判断判断队列是否已满,如果没满,则唤醒一个等待notFull条件的put线程;释放putLock。如果添加元素成功,获取takeLock锁,成功,则唤醒一个等待队列为notEmpty的take线程。put操作和offer操作的区别时,put是先获取putLock锁,再判断队列是否已满,已满则等待notFull条件;而offer是先判断队列是否已满,如果已满,则返回false,未满则获取putLock,后续操作相同。从分析来看,在我们向队列中添加元素时,如果使用offer,当队列已满的情况下,我们需要重新将元素放入队列,而put不需要我们再次这样操作,当队列满时,等待队列nullFull条件。具体选哪一种,根据具体的场景去选择。offer(E e, long timeout, TimeUnit unit)与put(E e,)更像,区别在于当超时offer获取putLock锁成功后,如果队列已满,则超时等待notFull条件。
take操作,首先获取当前队列容量计数器,并以可中断方式获取takeLock,获取锁成功,则
判断队列是否为空,如果为空,则等待notEmpty条件,否则从队头取出元素,容量计数器减1,如果队列中还有元素,则唤醒一个等待非空条件的take线程;如果在take前,队列容量已满,则成功take后,唤醒等待notFull条件的put线程。超时poll方法,与take方法的区别在于,当队列为null,进行超时等待。poll方法,与taket方法的最大区别为先检查先检查队列是否为空,为空,则返回null;不为空,剩下的操作与take相同。具体选哪一种,根据具体的场景去选择。
peek操作首先检查队列是否为空,为空,则返回null,否则获取takeLock锁,取出队头元素,并返回。
remove方法需要获取takeLock和putLock锁,遍历队列,比较元素是否相等,相等则移除,则唤醒一个等待put的线程,最后释放takeLock和putLock锁。为什么要获取两把锁呢,主要防止在移除的过程中,有线程消费元素,或生产元素,带来的不缺定性结果。contain操作与remove思路一样。
drainTo操作首先获取takeLock锁,从队头take,n个元素,并添加集合中,
如果成功移除元素,则唤醒等待put的线程。
在上面所有的操作中,我们看所有的唤醒都是signal而不是signalAll,那么为什么不总是使用signalAll替换signal呢? 假设有N个线程在条件队列中等待,调用signalAll会唤醒所有线程,然后这N个线程竞争同一个锁,最多只有一个线程能够得到锁,于是其它线程又回到挂起状态。这意味每一次唤醒操作可能带来大量的上下文切换(如果N比较大的话),同时有大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。如果说总是只有一个线程被唤醒后能够拿到锁,那么为什么不使用signal呢?所以某些情况下使用signal的性能是要高于signalAll的。如果满足下面的条件,可以使用单一的signal取代signalAll操作: 相同的等待者,也就是说等待条件变量的线程操作相同,每一个从wait条件发生时,执行相同的逻辑,同时一个条件变量的通知至多只能唤醒一个线程。
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
package java.util.concurrent; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; /** * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on * linked nodes. LinkedBlockingQueue是一个基于节点链接的可选是否有界的阻塞队列。 * This queue orders elements FIFO (first-in-first-out). * The [i]head[/i] of the queue is that element that has been on the * queue the longest time. * The [i]tail[/i] of the queue is that element that has been on the * queue the shortest time. New elements * are inserted at the tail of the queue, and the queue retrieval * operations obtain elements at the head of the queue. * Linked queues typically have higher throughput than array-based queues but * less predictable performance in most concurrent applications. * 队列元素的顺序是FIFO,head是待在队列中,最久的元素;tail则是最短的元素。新元素 插入时放在队列尾,消费时,则从head获取。相对基于数组的队列,链接队列有一个高效的吞吐量, 但是在大多数的并发应用中,性能是不可预测的。 * <p> The optional capacity bound constructor argument serves as a * way to prevent excessive queue expansion. The capacity, if unspecified, * is equal to {@link Integer#MAX_VALUE}. Linked nodes are * dynamically created upon each insertion unless this would bring the * queue above capacity. * LinkedBlockingQueue有一个待容量参数的构造函数,以防止扩展。如果没有明确指定容量参数, 则容量最大值为nteger#MAX_VALUE,在容量可用的情况下,每次插入队列元素时,动态创建新链接节点。 * <p>This class and its iterator implement all of the * [i]optional[/i] methods of the {@link Collection} and {@link * Iterator} interfaces. * LinkedBlockingQueue实现了所有Collection和Iterator接口 * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this collection * */ public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /* * A variant of the "two lock queue" algorithm. The putLock gates * entry to put (and offer), and has an associated condition for * waiting puts. Similarly for the takeLock. The "count" field * that they both rely on is maintained as an atomic to avoid * needing to get both locks in most cases. Also, to minimize need * for puts to get takeLock and vice-versa, cascading notifies are * used. When a put notices that it has enabled at least one take, * it signals taker. That taker in turn signals others if more * items have been entered since the signal. And symmetrically for * takes signalling puts. Operations such as remove(Object) and * iterators acquire both locks. * LinkedBlockingQueue所使用的算法为two lock queue的变种。一把锁为putLock用于 put&offer,同时关联一个等待put等待条件。另一把锁为takeLock。为了避免在大多数的 可能需要两种锁的情况,用count属性来维持原子性。为了最小化这种需要,比如puts操作 需要takeLock,反之亦然,我们使用了cascading notifies(级联通知)。当有一个线程take 等待时,put通知,竟会在操作完时,唤醒take线程。take线程将会唤醒需要put的等待线程。 同样批量takes将会唤醒puts,比如remove操作和iterators,将获取两种锁。 * Visibility between writers and readers is provided as follows: * 在读写线程之间的可见性如下: * Whenever an element is enqueued, the putLock is acquired and * count updated. A subsequent reader guarantees visibility to the * enqueued Node by either acquiring the putLock (via fullyLock) * or by acquiring the takeLock, and then reading n = count.get(); * this gives visibility to the first n items. * 当一个元素进入队列时,将会获取putLock,并更新Count。通过fullyLock获取putLock或 获取takeLock保证读线程可见进入队列的元素节点,然后读取当前count值;这种机制保证了 元素的可见性。 * To implement weakly consistent iterators, it appears we need to * keep all Nodes GC-reachable from a predecessor dequeued Node. * That would cause two problems: 为了实现弱一致性的iterators,当前驱节点出队列时,我们需要保证所有的节点GC可达。 这样会出现两种问题: * - allow a rogue Iterator to cause unbounded memory retention * - cause cross-generational linking of old Nodes to new Nodes if * a Node was tenured while live, which generational GCs have a * hard time dealing with, causing repeated major collections. * However, only non-deleted Nodes need to be reachable from * dequeued Nodes, and reachability does not necessarily have to * be of the kind understood by the GC. We use the trick of * linking a Node that has just been dequeued to itself. Such a * self-link implicitly means to advance to head.next. */ 1.允许rogue(游手好闲,无用)Iterator引起内存泄漏 2.当节点存活在老年代,可能存在旧节点到新节点的交叉代连接,新生和老年的垃圾回收器 很难处理,这样就会引起重复的FULL GC。然而,已删除的节点,可以从出队列 节点达到,可达性不需要到达GC可以理解的那种。为了避免这种情况的发生, 我们出队列的节点只会连接到它自己。自连接意味着促使队列头元素前进。 /** * Linked list node class,节点 */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) 实际的后继节点链接,null意味为最后一个节点,无后继。 */ Node<E> next; Node(E x) { item = x; } } /** The capacity bound, or Integer.MAX_VALUE if none */ 队列容量,最大为Integer.MAX_VALUE private final int capacity; /** Current number of elements */ 当前元素的数量 private final AtomicInteger count = new AtomicInteger(0); /** * Head of linked list. * Invariant: head.item == null 头结点,不变的是头结点元素为null */ private transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null 尾结点,不变的是尾结点元素为null */ private transient Node<E> last; /** Lock held by take, poll, etc */ 消费者锁takeLock可以被take,poll等操作持有 private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ 当队列为null,消费者等待的条件,即队列非空条件notEmpty private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ 生产者锁putLock,可以被put,offer等操作持有 private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ 当队列满时,生产者等待队列条件,即队列非满条件notFull private final Condition notFull = putLock.newCondition(); /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * 待容量参数的构造,初始化队列头节点与尾节点 * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } }
小节:
LinkedBlockingQueue是一个线程安全的阻塞并发队列,队列的顺序为FIFO,队列的中
节点包装者原始元素E,有一个后继链接,所以队列是单向的,队列的队头head和队尾节点last是傀儡节点,元素为null。队列有两把锁一个是takeLock,一个为putLock;消费者锁takeLock可以被take,poll等操作持有;生产者锁putLock,可以被put,offer等操作持有;同时有两个条件notEmpty和notFull,notEmpty是takeLock锁的条件,当队列为null,消费者等待的队列非空条件notEmpty;notFull为putLock的条件,为当队列满时,生产者等待队列条件,即队列非满条件notFull。队列中有一个AtomicInteger类型count,用于记录当前队列中元素的个数。
下面我们先来看put操作:
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * 插入元素到队尾,如果需要,等待队列空间可利用,即非满 * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { //元素为null,抛出异常 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; //包装元素为节点,获取当前元素数量,同时以可中断方式获取putLock锁 Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. count计数器是条件等待的依据,不被lock锁保护。当其他puts线程 释放锁时,消费者可以消费元素,减少count,当其他线程等待put时, 如果容量空间可利用,则唤醒put等待线程。对于用count的等待线程,同样。 */ while (count.get() == capacity) { //如果队列已满,则等待队列非满条件notFull notFull.await(); } //将节点添加到队列 enqueue(node); //容量自增1 c = count.getAndIncrement(); if (c + 1 < capacity) //如果队列未满,则唤醒一个等待notFull条件的put线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) //队列放入元素成功,则唤醒一个等待队列为notEmpty的take线程; signalNotEmpty(); }
put操作首先以可中断方式获取锁,如果成功,则判断队列是否已满,
如果队列已满,则等待队列非满条件notFull,否则,添加元素节点到队列,
再次判断判断队列是否已满,如果没满,则唤醒一个等待notFull条件的put线程;
释放putLock。如果添加元素成功,则唤醒一个等待队列为notEmpty的take线程。
上面有两点需要关注:
1.
//将节点添加到队列 enqueue(node);
/** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
2.
if (c == 0) //队列放入元素成功,则唤醒一个等待队列为notEmpty的take线程; signalNotEmpty();
/** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ //唤醒一个等待take的线程,put/offer会调用此方法 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
从signalNotEmpty方法来看,先获取takeLock锁,再唤醒等待take的线程;为什么先获取
takeLock,而不是直接唤醒呢?这是为了在通知队列非空信息时,避免其他take线程的进入,
进行不必要的等待。
再来看Offer操作:
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to * insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //如果队列已满则返回false if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
从上面来看put操作和offer操作的区别时,put是先获取putLock锁,再判断队列是否已满,已满则等待notFull条件;而offer是先判断队列是否已满,如果已满,则返回false,未满则获取putLock,后续操作相同。从分析来看,在我们向队列中添加元素时,如果使用offer,当队列已满的情况下,我们需要重新将元素放入队列,而put不需要我们再次这样操作,当队列满时,等待队列nullFull条件。
具体选哪一种,根据具体的场景去选择。
/** * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available. * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; //超时等待 nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
从上面来看,offer(E e, long timeout, TimeUnit unit)与put(E e,)更像,区别在于
当超时offer获取putLock锁成功后,如果队列已满,则超时等待notFull条件。
再来看take操作:
public E take() throws InterruptedException { E x; int c = -1; //获取当前队列容量计数器,并以可中断方式获取takeLock, final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { //如果队列为空,等待notEmpty notEmpty.await(); } //从队列头获取元素 x = dequeue(); //计数器减1 c = count.getAndDecrement(); if (c > 1) //如果队列中还有元素,则唤醒一个等待非空条件的take线程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) //如果在take前,队列容量已满,则成功take后,唤醒等待notFull条件的put线程。 signalNotFull(); return x; }
take操作,首先获取当前队列容量计数器,并以可中断方式获取takeLock,获取锁成功,则
判断队列是否为空,如果为空,则等待notEmpty条件,否则从队头取出元素,容量计数器减1,
如果队列中还有元素,则唤醒一个等待非空条件的take线程;如果在take前,队列容量已满,
则成功take后,唤醒等待notFull条件的put线程。
这里有两点要关注:
1.
//从队列头获取元素 x = dequeue();
/** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
2.
if (c == capacity) //如果在take前,队列容量已满,则成功take后,唤醒等待notFull条件的put线程。 signalNotFull();
/** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; //先获取putLock putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
从signalNotFull可以看出,是先获取putLock,再唤醒等待put的线程,以防止
再唤醒的过程之前,有其他put线程进入,进行不必要的等待。
再看超时等待poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
从超时poll方法来看,与take方法的区别在于,当队列为null,进行超时等待。
再看poll方法:
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) //先检查队列是否为空,为空,则返回null return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
从poll方法来看,与take方法的最大区别为先检查先检查队列是否为空,为空,则返回null;
不为空,剩下的操作与take相同。
再看peek检查元素;
public E peek() { //首先检查队列是否为空,为空,则返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //否则获取takeLock锁,取出队头元素 Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
peek操作首先检查队列是否为空,为空,则返回null,
否则获取takeLock锁,取出队头元素,并返回。
再看移除元素remove操作:
/** * Removes a single instance of the specified element from this queue, * if it is present. More formally, removes an element {@code e} such * that {@code o.equals(e)}, if this queue contains one or more such * elements. * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { //元素为null,返回false if (o == null) return false; //获取takeLock和putLock锁 fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //遍历队列,比较元素是否相等,相等则移除 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { //释放takeLock和putLock锁 fullyUnlock(); } }
以上remove方法有3点要看:
1.
//获取takeLock和putLock锁 fullyLock(); /** * Lock to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); }
2.
//遍历队列,比较元素是否相等,相等则移除 if (o.equals(p.item)) { unlink(p, trail); return true; }
/** * Unlinks interior Node p with predecessor trail. */ void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) //在移除元素的过程中,移除成功,则唤醒一个等待put的线程 notFull.signal(); }
3.
//释放takeLock和putLock锁 fullyUnlock();
/** * Unlock to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
从remove方法来看,需要获取takeLock和putLock锁,遍历队列,比较元素是否相等,
相等则移除,则唤醒一个等待put的线程,最后释放takeLock和putLock锁。为什么要获取
两把锁呢,主要防止在移除的过程中,有线程消费元素,或生产元素,带来的不缺定性结果。
再来看包含contain操作:
public boolean contains(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } }
contain操作与remove思路一样。
clear操作:
public void clear() { fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) notFull.signal(); } finally { fullyUnlock(); } }
再来看将元素移到另一个集合的操作
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { //委托给drainTo(c, Integer.MAX_VALUE) return drainTo(c, Integer.MAX_VALUE); } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; //获取takeLock锁 takeLock.lock(); try { int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { //从队头take,n个元素,并添加集合中 while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) //如果成功移除元素,则唤醒等待put的线程 signalNotFull(); } }
//获取当前队列容量
public int size() { return count.get(); }
//获取队列当前的剩余空间
public int remainingCapacity() { return capacity - count.get(); }
迭代器:
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { /* * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ private Node<E> current; private Node<E> lastRet; private E currentElement; Itr() { fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { fullyUnlock(); } } public boolean hasNext() { return current != null; } /** * Returns the next live successor of p, or null if no such. * * Unlike other traversal methods, iterators need to handle both: * - dequeued nodes (p.next == p) * - (possibly multiple) interior removed nodes (p.item == null) */ private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; if (s == p) return head.next; if (s == null || s.item != null) return s; p = s; } } public E next() { fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; lastRet = current; current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } }
从上来看,迭代器在构造时,需要两把锁put和take;获取next,也需要两把锁,
移除,则直接从队列中,移除。
序列化:
private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { fullyLock(); try { // Write out any hidden stuff, plus capacity s.defaultWriteObject(); // Write out all elements in the proper order. for (Node<E> p = head.next; p != null; p = p.next) s.writeObject(p.item); // Use trailing null as sentinel s.writeObject(null); } finally { fullyUnlock(); } }
反序列化:
/** * Reconstitute this queue instance from a stream (that is, * deserialize it). * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { // Read in capacity, and any hidden stuff s.defaultReadObject(); count.set(0); last = head = new Node<E>(null); // Read in all elements and place in queue for (;;) { @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break; add(item); } }
总结:
LinkedBlockingQueue是一个线程安全的阻塞并发队列,队列的顺序为FIFO,队列的中节点包装者原始元素E,有一个后继链接,所以队列是单向的,队列的队头head和队尾节点last是傀儡节点,元素为null。队列有两把锁一个是takeLock,一个为putLock;消费者锁takeLock可以被take,poll等操作持有;生产者锁putLock,可以被put,offer等操作持有;同时有两个条件notEmpty和notFull,notEmpty是takeLock锁的条件,当队列为null,消费者等待的队列非空条件notEmpty;notFull为putLock的条件,为当队列满时,生产者等待队列条件,即队列非满条件notFull。队列中有一个AtomicInteger类型count,用于记录当前队列中元素的个数。
put操作首先以可中断方式获取锁,如果成功,则判断队列是否已满,如果队列已满,则等待队列非满条件notFull,否则,添加元素节点到队列,再次判断判断队列是否已满,如果没满,则唤醒一个等待notFull条件的put线程;释放putLock。如果添加元素成功,获取takeLock锁,成功,则唤醒一个等待队列为notEmpty的take线程。put操作和offer操作的区别时,put是先获取putLock锁,再判断队列是否已满,已满则等待notFull条件;而offer是先判断队列是否已满,如果已满,则返回false,未满则获取putLock,后续操作相同。从分析来看,在我们向队列中添加元素时,如果使用offer,当队列已满的情况下,我们需要重新将元素放入队列,而put不需要我们再次这样操作,当队列满时,等待队列nullFull条件。具体选哪一种,根据具体的场景去选择。offer(E e, long timeout, TimeUnit unit)与put(E e,)更像,区别在于当超时offer获取putLock锁成功后,如果队列已满,则超时等待notFull条件。
take操作,首先获取当前队列容量计数器,并以可中断方式获取takeLock,获取锁成功,则
判断队列是否为空,如果为空,则等待notEmpty条件,否则从队头取出元素,容量计数器减1,如果队列中还有元素,则唤醒一个等待非空条件的take线程;如果在take前,队列容量已满,则成功take后,唤醒等待notFull条件的put线程。超时poll方法,与take方法的区别在于,当队列为null,进行超时等待。poll方法,与taket方法的最大区别为先检查先检查队列是否为空,为空,则返回null;不为空,剩下的操作与take相同。具体选哪一种,根据具体的场景去选择。
peek操作首先检查队列是否为空,为空,则返回null,否则获取takeLock锁,取出队头元素,并返回。
remove方法需要获取takeLock和putLock锁,遍历队列,比较元素是否相等,相等则移除,则唤醒一个等待put的线程,最后释放takeLock和putLock锁。为什么要获取两把锁呢,主要防止在移除的过程中,有线程消费元素,或生产元素,带来的不缺定性结果。contain操作与remove思路一样。
drainTo操作首先获取takeLock锁,从队头take,n个元素,并添加集合中,
如果成功移除元素,则唤醒等待put的线程。
在上面所有的操作中,我们看所有的唤醒都是signal而不是signalAll,那么为什么不总是使用signalAll替换signal呢? 假设有N个线程在条件队列中等待,调用signalAll会唤醒所有线程,然后这N个线程竞争同一个锁,最多只有一个线程能够得到锁,于是其它线程又回到挂起状态。这意味每一次唤醒操作可能带来大量的上下文切换(如果N比较大的话),同时有大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。如果说总是只有一个线程被唤醒后能够拿到锁,那么为什么不使用signal呢?所以某些情况下使用signal的性能是要高于signalAll的。如果满足下面的条件,可以使用单一的signal取代signalAll操作: 相同的等待者,也就是说等待条件变量的线程操作相同,每一个从wait条件发生时,执行相同的逻辑,同时一个条件变量的通知至多只能唤醒一个线程。
发表评论
-
Executors解析
2017-04-07 14:38 1256ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4460ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2128ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4992Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9113Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6088Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3043Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20523Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1511Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1081Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1597Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1069Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1330package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1199/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1164Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1678package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2032线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 926Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1739Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2141Queue接口定义:http://donald-draper. ...
相关推荐
《LinkedBlockingQueue深度解析与应用实践》 在Java并发编程领域,`LinkedBlockingQueue`是一个不可或缺的工具。作为`java.util.concurrent`包中的一个线程安全的队列,它基于链表结构实现,具备先进先出(FIFO)的...
接下来,我们详细解析`LinkedBlockingQueue`的阻塞实现原理: 1. 入队操作`put(E e)`:如果队列已满(`count.get() == capacity`),入队操作会获取`putLock`并进入等待状态,直到`notFull`条件被满足(即队列中有...
熟悉CopyOnWriteArrayList、LinkedBlockingQueue等并发容器的使用场景。 3. **IO与NIO**:Java的IO流模型包括字节流、字符流、对象流和缓冲流。了解NIO(New IO)的非阻塞特性,Channel、Buffer、Selector等概念,...
以下是对JUC库的详细解析: 1. **线程池(ExecutorService)**: - `ExecutorService`是线程池的核心接口,它负责管理线程的创建和执行。通过`Executors`类提供的静态工厂方法,可以创建不同类型的线程池,如`...
# 掌握并发的钥匙:Java Executor框架深度解析 Java作为一种广泛应用的编程语言,自1995年由Sun Microsystems公司(现属Oracle公司)首次发布以来,已经发展成为软件开发领域的重要工具。Java的设计目标包括跨平台...
5. **阻塞队列**:如`ArrayBlockingQueue`、`LinkedBlockingQueue`,它们是线程安全的队列,用于存储待处理的任务,线程池会从队列中取出任务进行执行。 6. **并发集合**:Java并发包提供了线程安全的集合,如`...
3. **解析页面**:对下载的页面内容进行解析,提取出链接、文本等有用信息。这一步通常需要用到HTML解析库,如Jsoup。 4. **发现新链接**:在解析过程中找到新的URL,添加到待爬取队列。 5. **存储数据**:将抓取到...
这时,可以通过调整公平锁、容量大小、以及使用其他的并发数据结构如LinkedBlockingQueue来优化性能。 6. 应用场景:ArrayBlockingQueue常用于生产者-消费者模型,如多线程环境中的任务调度、消息传递等。它的固定...
本篇文章将深入探讨Java多线程以及分布式爬虫架构的原理,并通过示例代码进行详细解析。 首先,单线程爬虫在面对大规模网页抓取时,其效率低下和资源利用率低的问题尤为突出。为了解决这些问题,我们可以采用多线程...
在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...
- LinkedBlockingQueue:基于链表的可选有界阻塞队列。 - SynchronousQueue:不存储元素的阻塞队列,提交任务必须等待另一个线程取走。 - PriorityBlockingQueue:具有优先级的无界阻塞队列。 - DelayQueue:用于...
new LinkedBlockingQueue(100000)); public static void main(String[] args) throws Exception { for (int i = 0; i ; i++) { es.execute(() -> { System.out.print(1); try { Thread.sleep(1000); } catch...
常见的有`ArrayBlockingQueue`、`LinkedBlockingQueue`和`SynchronousQueue`等。 6. `threadFactory`: 用于创建新线程的工厂,可以自定义线程的命名、优先级等。 7. `handler`: 当线程池无法接受新任务时,例如达到...
Java提供了多种实现阻塞队列的方式,例如使用`ArrayBlockingQueue`、`LinkedBlockingQueue`等预定义类,也可以使用`wait`/`notify`机制自定义实现。 #### 5. 生产者-消费者问题 - **题目**: 用Java写代码来解决...
5. **工作队列** (workQueue):用于存放待执行任务的队列,如ArrayBlockingQueue、LinkedBlockingQueue等。 6. **线程工厂** (threadFactory):创建新线程的工厂,用于定制线程的创建行为。 线程池的工作逻辑如下:...
private static final ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue(capacity), namedThreadFactory, new ...
这可以通过实现队列(如`java.util.concurrent.LinkedBlockingQueue`)或者使用布隆过滤器来实现。 5. **数据存储**:爬取到的数据通常需要保存到本地文件、数据库或云端。Java的标准库提供了文件I/O操作,而连接...
3. 工作队列:存放待执行任务的队列,可以是无界队列(如LinkedBlockingQueue)或有界队列(如ArrayBlockingQueue)。 4. 线程存活时间:当线程池中线程数超过核心线程数,且没有任务执行时,线程存活的最长时间。 ...
为了防止无限循环或者重复抓取同一个页面,我们可以使用队列(如`java.util.concurrent.LinkedBlockingQueue`)存储待抓取的URL,并使用集合(如`HashSet`)记录已访问的URL。 此外,为了避免对目标网站造成过大的...
- **LinkedBlockingQueue**:基于链表的阻塞队列。 - **ConcurrentHashMap**:线程安全的哈希表。 #### 三、Java虚拟机(JVM) - **对象的创建、内存布局和访问定位**:了解对象如何在JVM中创建、分配内存及如何被...