- 浏览: 988327 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue解析上-TransferStack:http://donald-draper.iteye.com/blog/2364622
SynchronousQueue解析下-TransferQueue:http://donald-draper.iteye.com/blog/2364842
从上面可以看出DelayQueue内在一个优先级队列,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
再来看相关操作:
add,put,超时offer操作都是委托给offer操作,来看offer操作:
}
从方法来看:
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;
入队列后,检查队头元素,如果队头元素为当前先添加的元素,
则设置leader为null,唤醒等待available条件的线程。
再来看take操作:
从take方法,可以看出,首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;
自旋的过程为获取队头元素,如果队头为null,则等待available条件,
如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,
如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,
以元素的delay为超时时间,超时等待触发available条件,
超时时间过后触发available条件,最后判断当前线程是否为leader
如果当前线程成为leader,超时等待后,take成功,则重置leader为null;
在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。
再看超时poll:
超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,
而超时poll为超时等待。
再看poll操作:
再看peek操作:
再看remove操作:
peek和remove操作直接委托给内部优先级队列。
drainTo操作:
drainTo操作是有peek和poll操作协作完成。
clear:
remainingCapacity:
size:
总结:
DelayQueue内在一个优先级队列PriorityQueue,用于存放元素,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;入队列后,检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程。add,put,超时offer操作都是委托给offer操作。
take操作首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;自旋的过程为获取队头元素,如果队头为null,则等待available条件,如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,以元素的delay为超时时间,超时等待触发available条件,超时时间过后触发available条件,最后判断当前线程是否为leader,如果当前线程成为leader,超时等待后,take成功,则重置leader为null;在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,而超时poll为超时等待。poll操作为如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素。
peek和remove,clear,size操作直接委托给内部优先级队列。drainTo操作是有peek和poll操作协作完成。
附:
//Delayed
有了前面队列文章的分析,LinkedBlockingDeque就没有什么值得看的,贴出源码,一看定义,就知道什么意思了,LinkedBlockingDeque就不在说了。
//LinkedBlockingDeque
Exchanger为双向队列模式同步队列的,这个我们抽时间在研究:
我们把Exchanger的JavaDoc放在这里
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue解析上-TransferStack:http://donald-draper.iteye.com/blog/2364622
SynchronousQueue解析下-TransferQueue:http://donald-draper.iteye.com/blog/2364842
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An unbounded {@linkplain BlockingQueue blocking queue} of * <tt>Delayed</tt> elements, in which an element can only be taken * when its delay has expired. The [i]head[/i] of the queue is that * <tt>Delayed</tt> element whose delay expired furthest in the * past. If no delay has expired there is no head and <tt>poll</tt> * will return <tt>null</tt>. Expiration occurs when an element's * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less * than or equal to zero. Even though unexpired elements cannot be * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise * treated as normal elements. For example, the <tt>size</tt> method * returns the count of both expired and unexpired elements. * This queue does not permit null elements. * DelayQueue是一个延时元素无界非阻塞队列,在延时队列中,如果有一个延时时间耗尽, 则将会被消费take。队列的头部是延时时间即将过期或过期最早的元素。如果队列中没有 过期元素,那么poll操作将会返回null。判断一个元素是否过期的标准是,调用元素的 getDelay方法,如果返回的延时时间等于或小于零,即过期。未过期的元素不能被take或poll操作从队列中移除, 将被做为队列普通元素看待。size返回的队列中的过期与未过期元素。队列不允许为null元素的存在。 * <p>This class and its iterator implement all of the * [i]optional[/i] methods of the {@link Collection} and {@link * Iterator} interfaces. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { //可重入锁 private transient final ReentrantLock lock = new ReentrantLock(); //优先级队列 private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * Thread designated to wait for the element at the head of * the queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with * an element with an earlier expiration time, the leader * field is invalidated by being reset to null, and some * waiting thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. 这个线程用于等待队列头元素是否过期。本设计是 Leader-Follower模式的 变种,用为最小化必须要时间的等待。当一个线程成为leader时,则将会等待 下一个过期的元素,而其他线程等待是不确定性的。leader线程在take或poll操作 返回后,必须唤醒其他线程,除非其他线程在过渡期成为了leader。无论任何时候, 当一个更早过期的元素成为队头时,当前leader将会由于无效被设置为null,其他等待线程 将会被唤醒,而不是当前leader。 */ private Thread leader = null; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. 当一个队列头的元素可以用,或一个新线程成为leader,条件将会触发 */ private final Condition available = lock.newCondition(); /** * Creates a new <tt>DelayQueue</tt> that is initially empty. 构造为空队列 */ public DelayQueue() {} }
从上面可以看出DelayQueue内在一个优先级队列,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
再来看相关操作:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { return offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @throws NullPointerException {@inheritDoc} */ public void put(E e) { offer(e); } /** * Inserts the specified element into this delay queue. As the queue is * unbounded this method will never block. * * @param e the element to add * @param timeout This parameter is ignored as the method never blocks * @param unit This parameter is ignored as the method never blocks * @return <tt>true</tt> * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }
add,put,超时offer操作都是委托给offer操作,来看offer操作:
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { //委托给内部优先级队列的offer操作 q.offer(e); if (q.peek() == e) { //检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程 leader = null; available.signal(); } return true; } finally { lock.unlock(); }
}
从方法来看:
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;
入队列后,检查队头元素,如果队头元素为当前先添加的元素,
则设置leader为null,唤醒等待available条件的线程。
再来看take操作:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * 过期元素出队列,如果需要线程等待直到队列中有一个过期元素可以利用 * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //以可中断方式获取锁 lock.lockInterruptibly(); try { //自旋等待,直到有过期元素可以用 for (;;) { E first = q.peek();//获取队头元素 if (first == null) available.await();//如果队头为null,则等待available条件 else { //如果队头元素不为空,则获取元素延时时间 long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) //如果过期,则出队列 return q.poll(); else if (leader != null) //如果未过期,且leader不为null,则等待available条件 available.await(); else { //如果未过期,且leader为null,则选举当前线程为leader Thread thisThread = Thread.currentThread(); leader = thisThread; try { //以delay为超时时间,超时等待触发available条件 available.awaitNanos(delay); } finally { if (leader == thisThread) //当前线程成为leader,超时等待后,take成功,则重置leader为null leader = null; } } } } } finally { if (leader == null && q.peek() != null) //自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程 available.signal(); lock.unlock(); } }
从take方法,可以看出,首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;
自旋的过程为获取队头元素,如果队头为null,则等待available条件,
如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,
如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,
以元素的delay为超时时间,超时等待触发available条件,
超时时间过后触发available条件,最后判断当前线程是否为leader
如果当前线程成为leader,超时等待后,take成功,则重置leader为null;
在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。
再看超时poll:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or <tt>null</tt> if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else //这一点与take不同,take为等待,poll为超时等待 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; if (nanos < delay || leader != null) //这一点与take不同,take为等待,poll为超时等待,如果nanos小于元素的延时时间,等待时间为超时等待时间nanos nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { //如果nanos大于元素的延时时间,在等待元素时间过期后,再等待nanos-delay+timeLeft(元素剩余等待时间) long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,
而超时poll为超时等待。
再看poll操作:
/** * Retrieves and removes the head of this queue, or returns <tt>null</tt> * if this queue has no elements with an expired delay. * * @return the head of this queue, or <tt>null</tt> if this * queue has no elements with an expired delay */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); //如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }
再看peek操作:
/** * Retrieves, but does not remove, the head of this queue, or * returns <tt>null</tt> if this queue is empty. Unlike * <tt>poll</tt>, if no expired elements are available in the queue, * this method returns the element that will expire next, * if one exists. * 查看队列头元素,不会移除元素,如果队列为空,返回null,如果没有元素过期, 则将会返回下一个即将过期的元素 * @return the head of this queue, or <tt>null</tt> if this * queue is empty. */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //委托给内部优先级队列,返回队头元素 return q.peek(); } finally { lock.unlock(); } }
再看remove操作:
/** * Removes a single instance of the specified element from this * queue, if it is present, whether or not it has expired. 委托给内部优先级队列,无论元素是否过期,只要元素相等,则移除 */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } }
peek和remove操作直接委托给内部优先级队列。
drainTo操作:
/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (;;) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } } /** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException {@inheritDoc} * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; while (n < maxElements) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } }
drainTo操作是有peek和poll操作协作完成。
clear:
/** * Atomically removes all of the elements from this delay queue. * The queue will be empty after this call returns. * Elements with an unexpired delay are not waited for; they are * simply discarded from the queue. */ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } }
remainingCapacity:
/** * Always returns <tt>Integer.MAX_VALUE</tt> because * a <tt>DelayQueue</tt> is not capacity constrained. * 为整数最大值 * @return <tt>Integer.MAX_VALUE</tt> */ public int remainingCapacity() { return Integer.MAX_VALUE; }
size:
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } }
总结:
DelayQueue内在一个优先级队列PriorityQueue,用于存放元素,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;入队列后,检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程。add,put,超时offer操作都是委托给offer操作。
take操作首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;自旋的过程为获取队头元素,如果队头为null,则等待available条件,如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,以元素的delay为超时时间,超时等待触发available条件,超时时间过后触发available条件,最后判断当前线程是否为leader,如果当前线程成为leader,超时等待后,take成功,则重置leader为null;在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,而超时poll为超时等待。poll操作为如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素。
peek和remove,clear,size操作直接委托给内部优先级队列。drainTo操作是有peek和poll操作协作完成。
附:
//Delayed
package java.util.concurrent; import java.util.*; /** * A mix-in style interface for marking objects that should be * acted upon after a given delay. * Delayed是一个用于标记一个线程或动作在多少延时后,被执行的迷你接口。 * <p>An implementation of this interface must define a * <tt>compareTo</tt> method that provides an ordering consistent with * its <tt>getDelay</tt> method. * * @since 1.5 * @author Doug Lea */ public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
有了前面队列文章的分析,LinkedBlockingDeque就没有什么值得看的,贴出源码,一看定义,就知道什么意思了,LinkedBlockingDeque就不在说了。
//LinkedBlockingDeque
package java.util.concurrent; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on * linked nodes. * * <p> The optional capacity bound constructor argument serves as a * way to prevent excessive expansion. The capacity, if unspecified, * is equal to {@link Integer#MAX_VALUE}. Linked nodes are * dynamically created upon each insertion unless this would bring the * deque above capacity. * * <p>Most operations run in constant time (ignoring time spent * blocking). Exceptions include {@link #remove(Object) remove}, * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link * #removeLastOccurrence removeLastOccurrence}, {@link #contains * contains}, {@link #iterator iterator.remove()}, and the bulk * operations, all of which run in linear time. * * <p>This class and its iterator implement all of the * <em>optional</em> methods of the {@link Collection} and {@link * Iterator} interfaces. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.6 * @author Doug Lea * @param <E> the type of elements held in this collection */ public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. * * To implement weakly consistent iterators, it appears we need to * keep all Nodes GC-reachable from a predecessor dequeued Node. * That would cause two problems: * - allow a rogue Iterator to cause unbounded memory retention * - cause cross-generational linking of old Nodes to new Nodes if * a Node was tenured while live, which generational GCs have a * hard time dealing with, causing repeated major collections. * However, only non-deleted Nodes need to be reachable from * dequeued Nodes, and reachability does not necessarily have to * be of the kind understood by the GC. We use the trick of * linking a Node that has just been dequeued to itself. Such a * self-link implicitly means to jump to "first" (for next links) * or "last" (for prev links). */ /* * We have "diamond" multiple interface/abstract class inheritance * here, and that introduces ambiguities. Often we want the * BlockingDeque javadoc combined with the AbstractQueue * implementation, so a lot of method specs are duplicated here. */ private static final long serialVersionUID = -387911632671998426L; /** Doubly-linked list node class */ static final class Node<E> { /** * The item, or null if this node has been removed. */ E item; /** * One of: * - the real predecessor Node * - this Node, meaning the predecessor is tail * - null, meaning there is no predecessor */ Node<E> prev; /** * One of: * - the real successor Node * - this Node, meaning the successor is head * - null, meaning there is no successor */ Node<E> next; Node(E x) { item = x; } } /** * Pointer to first node. * Invariant: (first == null && last == null) || * (first.prev == null && first.item != null) */ transient Node<E> first; /** * Pointer to last node. * Invariant: (first == null && last == null) || * (last.next == null && last.item != null) */ transient Node<E> last; /** Number of items in the deque */ private transient int count; /** Maximum number of items in the deque */ private final int capacity; /** Main lock guarding all access */ final ReentrantLock lock = new ReentrantLock(); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); /** Condition for waiting puts */ private final Condition notFull = lock.newCondition(); /** * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. * * @param capacity the capacity of this deque * @throws IllegalArgumentException if {@code capacity} is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } }
Exchanger为双向队列模式同步队列的,这个我们抽时间在研究:
我们把Exchanger的JavaDoc放在这里
package java.util.concurrent; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.LockSupport; /** * A synchronization point at which threads can pair and swap elements * within pairs. Each thread presents some object on entry to the * {@link #exchange exchange} method, matches with a partner thread, * and receives its partner's object on return. An Exchanger may be * viewed as a bidirectional form of a {@link SynchronousQueue}. * Exchangers may be useful in applications such as genetic algorithms * and pipeline designs. * * <p><b>Sample Usage:</b> * Here are the highlights of a class that uses an {@code Exchanger} * to swap buffers between threads so that the thread filling the * buffer gets a freshly emptied one when it needs it, handing off the * filled one to the thread emptying the buffer. * <pre>{@code * class FillAndEmpty { * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); * DataBuffer initialEmptyBuffer = ... a made-up type * DataBuffer initialFullBuffer = ... * * class FillingLoop implements Runnable { * public void run() { * DataBuffer currentBuffer = initialEmptyBuffer; * try { * while (currentBuffer != null) { * addToBuffer(currentBuffer); * if (currentBuffer.isFull()) * currentBuffer = exchanger.exchange(currentBuffer); * } * } catch (InterruptedException ex) { ... handle ... } * } * } * * class EmptyingLoop implements Runnable { * public void run() { * DataBuffer currentBuffer = initialFullBuffer; * try { * while (currentBuffer != null) { * takeFromBuffer(currentBuffer); * if (currentBuffer.isEmpty()) * currentBuffer = exchanger.exchange(currentBuffer); * } * } catch (InterruptedException ex) { ... handle ...} * } * } * * void start() { * new Thread(new FillingLoop()).start(); * new Thread(new EmptyingLoop()).start(); * } * } * }</pre> * * <p>Memory consistency effects: For each pair of threads that * successfully exchange objects via an {@code Exchanger}, actions * prior to the {@code exchange()} in each thread * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * those subsequent to a return from the corresponding {@code exchange()} * in the other thread. * * @since 1.5 * @author Doug Lea and Bill Scherer and Michael Scott * @param <V> The type of objects that may be exchanged */ public class Exchanger<V>
发表评论
-
Executors解析
2017-04-07 14:38 1266ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4467ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2131ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4996Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9120Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6089Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3048Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20531Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1520Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1091Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1603Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1076Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1338package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1203/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1171Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1688package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2037线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 930Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2148Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析上-TransferStack
2017-03-21 22:08 3084Queue接口定义:http://donald-draper. ...
相关推荐
学习视频,可以丰富java知识。能够获得更多的专业技能
在Java编程中,DelayQueue是一种特殊的并发队列,它遵循先进先出(FIFO)原则,但具有一个独特的特性:元素只有在其指定的延迟时间过去之后才能被获取和处理。这个特性使得DelayQueue成为实现定时任务和延迟操作的...
DelayQueue的使用以及注意事项,这里需要由BlockingQueue的基本知识,一般的Queue的使用方法poll(),take(),drainTo()和offer(),put()这些应该懂。
本文将深入探讨如何利用Java中的`DelayQueue`和Redis来实现这一功能。`DelayQueue`是Java并发库`java.util.concurrent`中的一个数据结构,它是一个基于优先级队列的无界阻塞队列,可以用于存储具有延迟时间的元素。...
**延迟队列(DelayQueue)详解** 在Java的并发编程中,`DelayQueue`是一个非常特殊的队列,它属于并发包`java.util.concurrent`的一部分。`DelayQueue`是一个基于优先级队列(PriorityQueue)实现的无界阻塞队列,...
Spring Boot延时任务之DelayQueue的使用详解 DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。它提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。DelayQueue的元素...
"Java多线程并发开发之DelayQueue使用示例" DelayQueue是Java多线程并发开发中的一种常用的数据结构,它是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象。DelayQueue的主要作用是按照对象的延迟时间...
基于DelayQueue的简单的定时任务队列.zip Quick Start class Main { public static void main(String[] args) { // 初始化任务队列 JobScheduler scheduler = new JobScheduler("default"); // 向队列中提交任务...
local delayQueue implemented by JDK & two kinds of distributed delayQueue based redis 1. 基本介绍 RedisSynDelayQueue 基于redis,并发情况下会加分布式锁,单线程场景(syn=false)性能较好, 并发场景性能较...
该项目是SpringBoot框架下的延迟消息Starter,提供对DelayQueue、Redisson和RabbitMQ三种延迟消息机制的集成支持。项目包含32个文件,涵盖24个Java源文件、4个XML配置文件、1个Git忽略文件、1个Markdown文件、1个...
DelayQueue<Task> queue = new DelayQueue(); ``` 我们可以将 Task 对象添加到 DelayQueue 中,并使用.take()方法来获取队头对象,并执行对应的任务。 ``` queue.put(new Task(30 * 1000, new Runnable() { ...
### Java.util.concurrent_您不知道的5件事 #### 1. Semaphore(信号量) - **定义与作用**:`Semaphore` 类是一种控制多个线程访问共享资源的机制,它通过内部维护一个整数计数器(许可的数量)以及一组等待线程...
### Java企业版中性能调节的最佳实践 #### 一、引言 在当今高度竞争的商业环境中,企业级应用系统的性能优化至关重要。一个高效稳定的系统不仅能提高用户体验,还能降低运营成本,提升企业的竞争力。...
延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢? 一、背景 先看看一下业务场景: 1.会员过期前3天发送召回通知 2.订单支付成功后,5分钟后检测下游环节是否都正常...
标题 "Delayed interface and Delay Queue" 涉及到Java并发编程中的两个重要概念:Delayed接口和DelayQueue。这篇博文可能是作者Tomboxfan在iteye博客上分享关于这两个概念的深入理解和应用实例。 Delayed接口是...
- DelayQueue:用于延迟执行元素的无界阻塞队列。 - LinkedTransferQueue:基于链表的无界阻塞队列。 - LinkedBlockingDeque:基于链表的双端阻塞队列。 使用无界队列可能会导致内存飙升,因为队列会不断增长,如果...
Redis 是一个高性能的键值数据库,它支持多种数据结构,如字符串、哈希、列表、集合和有序集合。在本文中,我们将重点关注 Redis 如何实现简单队列,并通过使用 StackExchange.Redis C# 客户端进行操作。...
在IT行业中,队列是一种非常基础且重要的数据结构,它遵循先进先出(FIFO,First In First Out)的原则。本篇文章将详细讲解如何高效地实现队列,并结合提供的文件"Quene"来探讨可能的实现方式。...
在IT行业中,延迟队列是一种特殊的消息队列,它允许我们设定消息在特定时间后才被消费,这对于处理定时任务、订单超时、通知发送等场景非常有用。本项目是基于Go语言和Redis实现的延迟队列,借鉴了有赞(Zan)的设计...
重点介绍了ScheduledThreadPoolExecutor的内部工作机制,如何使用DelayQueue来存储等待执行的任务。DelayQueue内部实现了一个基于时间优先级的PriorityQueue,保证任务能按计划时间顺序执行。文档还详细描述了任务...