`

读PriorityBlocking源码

阅读更多
//一个基于而为堆的优先级队列,它是无界的。
//先看构造函数:
 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }


public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }


 public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
	//是否需要堆化 
        boolean heapify = true; // true if not known to be in heap order
        //是否需要检测null
	boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) {
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
	    //生成堆
            heapify();
    }
//添加元素
     public boolean add(E e) {
        return offer(e);
    }
//添加元素不阻塞
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
	//扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
	   //长度+1
            size = n + 1;
	    //当前队列已经不为空了唤醒在notEmpty上等待的线程。
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

//扩容。这里处理的很精巧利用CAS进行扩容。
     private void tryGrow(Object[] array, int oldCap) {
       //这里释放了锁
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
	/**利用CAS设置allocationSpinLock状态成功则扩容。注意看这段代码虽然采用CAS机制但是还是有并发问题.
          关键在于下面的一句判断&& queue == array关键就在这里,这里判断了相等才会复制 */
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
	      //扩容完后置为0
                allocationSpinLock = 0;
            }
        }
	//其他线程扩容了交给其他线程执行
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
	// queue == array这个判断很重要这个判断判断了只有一个线程扩容成功,其他扩容的线程都失败。
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
 //添加元素不阻塞
  public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

//只获取头元素不阻塞
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (size == 0) ? null : (E) queue[0];
        } finally {
            lock.unlock();
        }
    }

//获取头元素并移除
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

//在一定时间内获取并移除某个元素
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
	   //当队列中有元素并且没有超时的情况下,在notEmpty条件上等待队列变为非空。
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }

//可以阻塞的获取元素直接队列非空。
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

//获取队列大小
public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }

//无界队列所以返回最大值。
public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

//返回元素的比较器
 public Comparator<? super E> comparator() {
        return comparator;
    }

//返回是否包含某元素
 public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return indexOf(o) != -1;
        } finally {
            lock.unlock();
        }
    }

private int indexOf(Object o) {
        if (o != null) {
            Object[] array = queue;
            int n = size;
            for (int i = 0; i < n; i++)
                if (o.equals(array[i]))
                    return i;
        }
        return -1;
    }

//删除某个元素
     public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = indexOf(o);
            if (i == -1)
                return false;
            removeAt(i);
            return true;
        } finally {
            lock.unlock();
        }
    }

private void removeAt(int i) {
        Object[] array = queue;
        int n = size - 1;
        if (n == i) // removed last element
            array[i] = null;
        else {
            E moved = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(i, moved, array, n);
            else
                siftDownUsingComparator(i, moved, array, n, cmp);
            if (array[i] == moved) {
                if (cmp == null)
                    siftUpComparable(i, moved, array);
                else
                    siftUpUsingComparator(i, moved, array, cmp);
            }
        }
        size = n;
    }

//转化为数组
   public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return Arrays.copyOf(queue, size);
        } finally {
            lock.unlock();
        }
    }


//转化为带类型的数组
 public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = size;
	    //如果数组长度小于队列长度重新生成一个队列长度的数组
            if (a.length < n)
                // Make a new array of a's runtime type, but my contents:
                return (T[]) Arrays.copyOf(queue, size, a.getClass());
            System.arraycopy(queue, 0, a, 0, n);
            if (a.length > n)
                a[n] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }
//清空队列
public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] array = queue;
            int n = size;
            size = 0;
            for (int i = 0; i < n; i++)
                array[i] = null;
        } finally {
            lock.unlock();
        }
    }


//将队列中的所有元素移除封装到集合中
public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(size, maxElements);
            for (int i = 0; i < n; i++) {
                c.add((E) queue[0]); // In this order, in case add() throws.
                dequeue();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

分享到:
评论

相关推荐

    微信小程序 语音跟读 (源码)

    微信小程序 语音跟读 (源码)微信小程序 语音跟读 (源码)微信小程序 语音跟读 (源码)微信小程序 语音跟读 (源码)微信小程序 语音跟读 (源码)微信小程序 语音跟读 (源码)微信小程序 语音跟读 (源码)微信小程序 语音跟...

    微信小程序源码 语音跟读(学习版)

    微信小程序源码 语音跟读(学习版)微信小程序源码 语音跟读(学习版)微信小程序源码 语音跟读(学习版)微信小程序源码 语音跟读(学习版)微信小程序源码 语音跟读(学习版)微信小程序源码 语音跟读(学习版)微信小程序源码...

    小程序源码 语音跟读 (代码+截图)

    小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 语音跟读 (代码+截图)小程序源码 ...

    微信小程序源码(含截图)语音跟读

    微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小程序源码(含截图)语音跟读微信小...

    易语言源码易语言对象读网页源码.rar

    易语言源码易语言对象读网页源码.rar 易语言源码易语言对象读网页源码.rar 易语言源码易语言对象读网页源码.rar 易语言源码易语言对象读网页源码.rar 易语言源码易语言对象读网页源码.rar 易语言源码易语言对象...

    微信小程序开发-绘本跟读案例源码.zip

    在这个“微信小程序开发-绘本跟读案例源码.zip”压缩包中,包含了用于创建一个绘本跟读功能的小程序的全部源代码,非常适合开发者学习和参考。 首先,我们要理解绘本跟读的功能。绘本跟读是教育类应用中常见的一种...

    易语言源码易语言端口读rs232源码.rar

    易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码易语言端口读rs232源码.rar 易语言源码...

    易语言源码易语言读内存例程源码.rar

    易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读内存例程源码.rar 易语言源码易语言读...

    微信小程序源码 语音跟读(源码+截图).rar

    免责声明:资料部分来源于合法的互联网渠道收集和整理,部分自己学习积累成果,供大家学习参考与交流。收取的费用仅用于收集和整理资料耗费时间的酬劳。 本人尊重原创作者或出版方,资料版权归原作者或出版方所有,...

    (微信小程序毕业设计)悦读神器(源码+截图).zip

    (微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信小程序毕业设计)悦读神器(源码+截图)(微信...

    易语言源码易语言汇编读字节集源码.rar

    易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码易语言汇编读字节集源码.rar 易语言源码...

    微信小程序——语音跟读(截图+源码).zip

    微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip 微信小程序——语音跟读(截图+源码).zip ...

    易语言源码易语言语音报读源码.rar

    易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码.rar 易语言源码易语言语音报读源码....

    基于YOLOv5的水表读数系统源码+模型+使用说明.zip

    基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表读数系统源码+模型+使用说明.zip基于YOLOv5的水表...

    易语言源码易语言读HEX文件源码.rar

    易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件源码.rar 易语言源码易语言读HEX文件...

    易语言源码易语言读网页框架源码.rar

    易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读网页框架源码.rar 易语言源码易语言读...

    易语言源码易语言读网页文件源码.rar

    易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读网页文件源码.rar 易语言源码易语言读...

    语音跟读微信小程序源码(含截图).rar

    本项目是基于&lt;微信小程序&gt;做的一套语音跟读, 分为【用户/登陆系统、查看教材、查看课程安排、参与跟读(录音/上传/合成)、结果展示】等功能 ##开发/调试环境 微信版本:6.3.30 IOS版本:IOS_10.0.2 微信开发...

    微信小程序源码-悦读神器.zip

    微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序源码-悦读神器.zip微信小程序...

    (微信小程序毕业设计)悦读神器(附源码+截图).zip

    (微信小程序毕业设计)悦读神器(附源码+截图).zip(微信小程序毕业设计)悦读神器(附源码+截图).zip(微信小程序毕业设计)悦读神器(附源码+截图).zip(微信小程序毕业设计)悦读神器(附源码+截图).zip(微信小程序毕业设计)...

Global site tag (gtag.js) - Google Analytics