//一个基于而为堆的优先级队列,它是无界的。
//先看构造函数:
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
//是否需要堆化
boolean heapify = true; // true if not known to be in heap order
//是否需要检测null
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
//生成堆
heapify();
}
//添加元素
public boolean add(E e) {
return offer(e);
}
//添加元素不阻塞
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
//长度+1
size = n + 1;
//当前队列已经不为空了唤醒在notEmpty上等待的线程。
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
//扩容。这里处理的很精巧利用CAS进行扩容。
private void tryGrow(Object[] array, int oldCap) {
//这里释放了锁
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
/**利用CAS设置allocationSpinLock状态成功则扩容。注意看这段代码虽然采用CAS机制但是还是有并发问题.
关键在于下面的一句判断&& queue == array关键就在这里,这里判断了相等才会复制 */
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
//扩容完后置为0
allocationSpinLock = 0;
}
}
//其他线程扩容了交给其他线程执行
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
// queue == array这个判断很重要这个判断判断了只有一个线程扩容成功,其他扩容的线程都失败。
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
//添加元素不阻塞
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
//只获取头元素不阻塞
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
//获取头元素并移除
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
//在一定时间内获取并移除某个元素
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//当队列中有元素并且没有超时的情况下,在notEmpty条件上等待队列变为非空。
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
//可以阻塞的获取元素直接队列非空。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
//获取队列大小
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
//无界队列所以返回最大值。
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
//返回元素的比较器
public Comparator<? super E> comparator() {
return comparator;
}
//返回是否包含某元素
public boolean contains(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return indexOf(o) != -1;
} finally {
lock.unlock();
}
}
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -1;
}
//删除某个元素
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);
if (i == -1)
return false;
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // removed last element
array[i] = null;
else {
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
//转化为数组
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return Arrays.copyOf(queue, size);
} finally {
lock.unlock();
}
}
//转化为带类型的数组
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = size;
//如果数组长度小于队列长度重新生成一个队列长度的数组
if (a.length < n)
// Make a new array of a's runtime type, but my contents:
return (T[]) Arrays.copyOf(queue, size, a.getClass());
System.arraycopy(queue, 0, a, 0, n);
if (a.length > n)
a[n] = null;
return a;
} finally {
lock.unlock();
}
}
//清空队列
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] array = queue;
int n = size;
size = 0;
for (int i = 0; i < n; i++)
array[i] = null;
} finally {
lock.unlock();
}
}
//将队列中的所有元素移除封装到集合中
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(size, maxElements);
for (int i = 0; i < n; i++) {
c.add((E) queue[0]); // In this order, in case add() throws.
dequeue();
}
return n;
} finally {
lock.unlock();
}
}
分享到:
相关推荐
小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 ...
在这个“微信小程序开发-绘本跟读案例源码.zip”压缩包中,包含了用于创建一个绘本跟读功能的小程序的全部源代码,非常适合开发者学习和参考。 首先,我们要理解绘本跟读的功能。绘本跟读是教育类应用中常见的一种...
微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小...
【标题】"英语绘本听跟读小程序源码"所涉及的知识点主要集中在移动应用开发、语音识别技术以及教育软件设计上。这个项目是一款专为英语学习者设计的小程序,其核心功能是听读英语绘本,并且能与智能评分系统对接,...
易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码...
小程序&教育培训&悦读神器(源码+截图+源码导入教程和视频) 小程序&教育培训&悦读神器(源码+截图+源码导入教程和视频) 小程序&教育培训&悦读神器(源码+截图+源码导入教程和视频)小程序&教育培训&悦读神器(源码...
易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读...
免责声明:资料部分来源于合法的互联网渠道收集和整理,部分自己学习积累成果,供大家学习参考与交流。收取的费用仅用于收集和整理资料耗费时间的酬劳。 本人尊重原创作者或出版方,资料版权归原作者或出版方所有,...
(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信...
易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码...
微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip ...
易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码....
【微信小程序-毕设期末大作业】微信小程序源码 【微信小程序-毕设期末大作业】微信小程序源码 【微信小程序-毕设期末大作业】微信小程序源码 【微信小程序-毕设期末大作业】微信小程序源码 【微信小程序-毕设期末大...
基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表...
易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件...
易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读...
易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读...
微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序...
Python 使用Pandas实现数据库的读、写操作 Python源码Python 使用Pandas实现数据库的读、写操作 Python源码Python 使用Pandas实现数据库的读、写操作 Python源码Python 使用Pandas实现数据库的读、写操作 Python源码...
易语言读网页框架源码.rar