`

Java多线程高并发高级篇(二)--线程池核心中队列术解密

 
阅读更多

在JDK的并发包java.util.concurrent下,我们可以找找定义的队列有哪些。我们按照字典序排一下:

ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue。

那定义的这些队列是要用在哪?当然是我们上一帖线程池核心构造函数中说的BlockingQueue<Runnable> workQueue(等待任务存放的队列)。所以,这几个队列都无一例外的实现了BlockingQueue接口。


 

下面我们一一介绍下这几个队列。

一、ArrayBlockingQueue

这是一个有界任务队列(从Array估计也能猜出来),我们看下它的有界队列是怎么实现的。上源码。

/**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     *         <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

 

 

在这三个构造函数中,我们应该看第二个(第一个是使用第二个实现的,第三个只是将集合转换为一个队列,没啥说的,核心还是第二个)。在第二个构造函数中,传入两参数int capacity, boolean fair,我们看看人家的设计精妙之处。

①capacity定义了容量大小,这个值用在哪?初始化对象数据大小--this.items = (E[]) new Object[capacity]。这也就是ArrayBlockingQueue前缀Array的由来。

②fair这个值用在哪?我们看构造函数中使用了重入锁lock = new ReentrantLock(fair),fair使用在这里,用来表明重入锁是公平锁还是非公平锁(大家可以自己去看看,这里不说)。

③使用重入锁的好伙伴Condition,定义了两个锁notEmpty和notFull,这两锁用来干啥?我很佩服作者的思路。当需要进行移除操作时,我们需要判断啥?当然是队列不空时候我们才能做移除、取(take)操作--作者使用notEmpty锁。

当我们需要进行插入时,我们需要判断啥?当然是队列是否满了,满了就不能进行插入、放(put)了--作者使用notFull锁。 

我们可以从ArrayBlockingQueue的源码中找put方法和take方法来验证:

 

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

 若count的值没有达到队列中数组的大小(判满),则进行插入操作insert(e),在insert方法中进行放任务,将count值+1,然后通知不空(notEmpty)锁,有任务来了,你可以取了。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

  若count的值不为0(判空),则进行移除操作extract(),在extract方法中进行任务移除,将count值-1,然后通知不满(notFull)锁,队列有位置了,你可以放任务了。

 

线程池中线程数没有达到corePoolSize,会创建新线程进行任务处理。如果达到了定义的corePoolSize,那么就需要将任务放入等待任务队列中进行任务等待。既然是有界队列,总会有达到队列中数据上限的时候。那么达到上限如何处理?

①首先我们要明白,有界队列中定义的任务队列大小是指谁的大小。我们一定不要弄混了,这里的有界队列是用来存放等待任务的。达到上限也就是等待任务放不下了。

②如果新任务到来,线程池中线程数corePoolSize已经达到,就需要看定义的maximumPoolSize,如果线程池中总的线程数小于maximumPoolSize,那么就可以再新创建线程来处理任务队列中放不下的任务。那如果达到了maximumPoolSize,这里就要使用到拒绝策略了(默认的拒绝策略是抛弃策略,下一帖我们详细说)!

 

二、LinkedBlockingQueue

有些书上说它是个无界队列,这是错误的。如果不指定大小,队列默认的大小是Integer.MAX_VALUE。

不信大家可以看它的构造函数实现。

/**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }

 

从Linked前缀也可以大致判断,它类似于链表形式。但是作者设计很巧妙,设计成了一个双端队列,也就是头尾都可以操作。放任务时,从尾部放进去;

private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

 取任务时,从头部取。

/**
     * Removes a node from head of queue,
     * @return the node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

 为了便于理解,需要画个图。写博客是件很费力的事情。

1、初始时,head和last分别指向了一个空节点。


 

 2、当有任务put时,执行insert方法中last = last.next = new Node<E>(x);这条语句要好好理解下。拆分一下,这条语句等于下面两句:

last.next = new Node<E>(x);--将当前last节点的next指向新的任务节点Node(X);

last = last.next--将last节点移位,指向新插入的Node(X);

举例:

 

public static void main(String[] args) throws InterruptedException {
		LinkedBlockingQueue<String> link = new LinkedBlockingQueue<String>(5);
		link.put("a");
		link.put("b");
		link.put("c");
		String str = link.take();
		System.out.println(str);
	}

 运行的过程图是这样子:

 


 

图就变成了这样子:


当执行String str = link.take();语句时,执行的方法核心为:

 

private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

 

所做的操作就是取head指向的下一个任务Node(a),然后将head指向该节点,获取到该节点的item值(a)后,将该节点的item置为null,并返回原item的值(a)。


 

对应的图是变成了这样子:(其他依次类推,不再赘述)


 三、SynchronousQueue

 它是个很特殊的队列。特殊在哪?我们一一道来。根据源码中的注释可以得到以下信息:

1、SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue(A synchronous queue does not have any internal capacity, not even a capacity of one.)。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。(A {@linkplain BlockingQueue blocking queue} in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa)

2、因为没有容量,所以你不能遍历,不能判空,不能获取元素等等。。

3、SynchronousQueue提供可选的公平性策略。默认情况下是不保证公平性的(false)。当设置了公平性策略为true时,TransferQueue会以FIFO的顺序来保证线程的访问顺序。(This class supports an optional fairness policy for ordering waiting producer and consumer threads.  By default, this ordering is not guaranteed. However, a queue constructed with fairness set to <tt>true</tt> grants threads access in FIFO order.)

public SynchronousQueue(boolean fair) {
        transferer = (fair)? new TransferQueue() : new TransferStack();
    }

由构造函数可以看出,SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是通过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着非常重要的作用,SynchronousQueue的put、take操作都是委托这两个类来实现的。 

 

4、SynchronousQueue适合做什么?它跟通道类似,可以用在任务生产者和任务消费者线程做一些数据交换。在Executors中定义的newCachedThreadPool,使用的就是SynchronousQueue队列。

 

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
 其中,corePoolSize大小为0,maximumPoolSize最大Integer.MAX_VALUE。这就意味着,在没有任务时,线程池中是没有线程的,当任务提交时,该线程池会使用空闲的线程执行,若没有空闲线程,则将任务加入

 

 SynchronousQueue队列。但是SynchronousQueue队列没有容量,所以它就直接提交给线程池,迫使线程池增加新的线程执行任务。当任务执行完毕后,因为corePoolSize为0,所以在60(TimeUnit.SECONDS)后,该空闲线程就被回收了。

 

四、PriorityBlockingQueue

Priority,看到这个词不陌生吧,线程设置优先权的时候,是不是见过这个单词?为了可以让某个任务优先执行,JDK提供了这么一个队列,但是它的实现核心却是PriorityQueue。

1、构造函数

使用PriorityBlockingQueue构造函数时,如果不设置它的容量,它会使用默认容量DEFAULT_INITIAL_CAPACITY = 11。那么如何实现让某个任务优先执行?总得有个比较准则吧,所以PriorityQueue的核心构造函数中提供了比较器Comparator<? super E> comparator。

public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

 public PriorityQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

 public PriorityQueue(int initialCapacity,
                         Comparator<? super E> comparator) {
        // Note: This restriction of at least one is not actually needed,
        // but continues for 1.5 compatibility
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.queue = new Object[initialCapacity];
        this.comparator = comparator;
    }

 2、从实例中去剖析原理

队列的操作,离不开两个:取(take)和放(put)。

我们举一例,简单明了。

public static void main(String[] args) throws InterruptedException {
		PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(5,new Comparator<Integer>() {
			@Override
			public int compare(Integer o1, Integer o2) {
				// TODO Auto-generated method stub
				if(o1.intValue()<o2.intValue()){
					return 1;
				}
				return -1;
			}
			
		});
		queue.put(5);
		queue.put(3);
		queue.put(7);
		queue.put(2);
		Integer num= queue.take();
		System.out.println(num);
	}

 我们使用PriorityBlockingQueue构造了一个队列,并且使用比较器Comparator将队列中元素进行排序(从大到小,模拟优先)。

2.1执行put操作

①执行put操作源码:

public void put(E e) {
        offer(e); // never need to block
    }

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

 在真正执行boolean ok = q.offer(e);时,使用的是PriorityQueue<E> q;中的offer方法,我们看下源码:

 

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            //扩容
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            //排序
            siftUp(i, e);
        return true;
    }

private void siftUp(int k, E x) {
        if (comparator != null)
            //使用比较器进行排序
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }

 

 

②对应的程序图(第一个元素插入省略,就一步;其他元素插入方法一样,我们讲最复杂的):

插入元素7时:


 执行完后,得到的队列是[7,3,5,null,null]。

 

继续插入元素2:


 
这样就把优先级最高的7放在了队列的第一个位置,处理任务取的时候当然取到的第一个就是它了。

2.2 执行take操作

take操作源码部分:

 

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

----PriorityQueue中的操作(核心):------
public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }

private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }

private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = queue[child];
            int right = child + 1;
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo((E) c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }
        queue[k] = key;
    }

对应的示例程序图解:

执行take操作:


执行siftDown(0,x)方法中调用的siftDownComparable方法(要注意,这里源码位置中k固定是0;x是队列最末尾的元素)----这个方法的主要目的是,采用二分法进行比较,获取队列中剩余元素的优先级最高的那个元素。这里的移位运算用的很巧妙。half是将size右移一位(相当于size/2),获取队列中间位置的元素,然后使用比较器比较queue[1]和queue[2]位置的元素(当然是size>2的情况下),如果子元素小于子元素右面的元素,然后再比较队列末尾元素x的值和c的值谁大,如果x大,则跳出循环直接将queue[0]=x;

如果是c大,那么需要把c放在queue[0]的位置,把x放在原来c的位置.


 

 

  • 大小: 4.6 KB
  • 大小: 5 KB
  • 大小: 13.2 KB
  • 大小: 21 KB
  • 大小: 14.8 KB
  • 大小: 10.9 KB
  • 大小: 29.7 KB
  • 大小: 40.8 KB
  • 大小: 32.3 KB
  • 大小: 40.5 KB
  • 大小: 17.8 KB
  • 大小: 46.8 KB
分享到:
评论

相关推荐

    java多线程、并发及线程池介绍收藏的几篇文档

    Java多线程、并发以及线程池是Java编程中至关重要的概念,特别是在处理高并发、高性能的系统设计时。以下是对这些主题的详细说明: 1. **Java 程序中的多线程** - 多线程允许一个程序同时执行多个任务,提高程序...

    Java 多线程与并发(17-26)-JUC线程池- FutureTask详解.pdf

    ### Java多线程与并发(17-26)-JUC线程池-FutureTask详解 #### 一、概述 本文将围绕Java多线程与并发中的重要概念——`FutureTask`进行深入探讨。`FutureTask`是Java并发库中的一个关键组件,它实现了`...

    Java多线程编程实战指南-核心篇

    《Java多线程编程实战指南-核心篇》是一本深入探讨Java并发编程的书籍,旨在帮助读者掌握在Java环境中创建、管理和同步线程的核心技术。Java的多线程能力是其强大之处,使得开发者能够在同一时间执行多个任务,提高...

    Java多线程与高并发入门到精通-视频教程网盘链接提取码下载.txt

    综上所述,《Java多线程与高并发入门到精通》是一门全面涵盖Java多线程编程基础及高级技术的课程。无论您是初学者还是有一定基础的开发者,都能够从中获得宝贵的知识和经验。通过学习这门课程,您将能够掌握多线程的...

    Java多线程实战精讲-带你一次搞明白Java多线程高并发

    Java多线程实战精讲是Java开发者必备的技能之一,特别是在处理高并发场景时,它的重要性不言而喻。本文将深入探讨Java多线程的相关知识点,帮助你全面理解并掌握这一核心概念。 1. **线程基础** - **线程定义**:...

    java 多线程高并发相关资料收集

    本文将围绕“Java多线程高并发相关资料收集”这一主题,详细探讨这两个领域的核心知识点。 首先,多线程是指在单个程序中同时执行多个线程。Java提供了一个强大的多线程支持,允许开发者创建、管理和控制多个执行...

    深入理解高并发编程-Java线程池核心技术

    在深入理解高并发编程,尤其是Java线程池核心技术时,我们首先要明白线程与多线程的概念。线程是操作系统中的基本调度单元,它比进程更小,且基本不拥有系统资源,主要由程序计数器、寄存器和栈等组成。在同一个进程...

    java并发库高级应用源码--张孝祥

    在《java并发库高级应用源码--张孝祥》中,我们将会深入探讨Java中的线程管理和并发控制策略,这对于我们理解和优化多线程程序至关重要。 首先,Java中的`Thread`类是实现并发的基础,它代表了一个独立的执行线程。...

    JAVA线程高级-线程按序交替执行

    在Java编程中,多线程是并发编程的重要组成部分,它允许程序同时执行多个任务,从而提高了系统的效率和响应性。然而,在某些场景下,我们可能需要控制线程的执行顺序,确保它们按照特定的顺序交替运行,这在并发编程...

    java 多线程编程实战指南(核心 + 设计模式 完整版)

    《Java多线程编程实战指南》这本书深入浅出地讲解了Java多线程的核心概念和实战技巧,分为核心篇和设计模式篇,旨在帮助开发者掌握并应用多线程技术。 1. **线程基础** - **线程的创建**:Java提供了两种创建线程...

    张孝祥Java多线程与并发库高级应用笔记

    ### 张孝祥Java多线程与并发库高级应用笔记概览 #### 一、Java多线程技术的重要性与挑战 Java线程技术是软件工程领域不可或缺的一部分,尤其在底层编程、Android应用开发以及游戏开发中,其重要性不言而喻。然而,...

    java多线程和并发.pdf

    Java多线程与并发编程是Java语言中用于处理多任务执行的关键技术,它能够帮助开发者设计出能够有效应对高并发请求的应用程序。在现代的线上(Online)和离线(Offline)应用中,合理利用多线程技术可以大幅提高系统...

    java多线程并发

    本篇文章将深入探讨Java中的多线程并发机制,并通过具体的示例来帮助读者更好地理解和掌握这一重要概念。 #### 二、为什么需要多线程? 多线程技术的存在主要解决了计算机系统中资源利用率低下的问题。在没有多...

    java多线程查询数据库

    本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil.java`和`BatchDataRunnable.java`可能涉及的关键知识点。 ### 1. 多线程并发查询 多线程并发查询允许我们将一...

    java多线程分页查询

    #### 二、Java多线程分页查询原理及实现 ##### 1. 分页查询基础概念 分页查询是指在查询数据时,将数据分成多个页面展示,而不是一次性返回所有数据。这种方式能够有效地减少单次查询的数据量,从而提高查询速度和...

    图解java多线程设计模式-结城浩-完整高清带书签版本

    总之,《图解Java多线程设计模式》是一本深入浅出的多线程编程指南,它不仅涵盖了Java多线程的基础知识,还深入探讨了高级主题和最佳实践,对于想要提升并发编程能力的Java开发者来说,是一本不可多得的参考书。...

    Java多线程编程总结

    Java线程:新特征-线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上) Java线程:新特征-锁(下) Java线程:新特征-信号量 Java线程:新特征-阻塞队列 Java线程:新特征-阻塞栈 Java线程:新特征-...

    Java多线程与并发库高级应用

    ### Java多线程与并发库高级应用 #### 一、Java多线程基础 在深入探讨Java多线程与并发库的高级应用之前,我们首先需要回顾一下Java多线程的基础概念和技术要点。 ##### 1.1 线程的概念 在计算机科学中,线程是...

    WHUT-java多线程实验-第二周-异常处理.zip

    在"WHUT-java多线程实验-第二周-异常处理"这个实验中,我们将学习如何在多线程环境下进行异常捕获和处理。实验基于IntelliJ IDEA这个强大的Java集成开发环境进行,IDEA提供了友好的界面和丰富的工具,方便我们进行...

    人工智能-项目实践-多线程-Java多线程高并发实例.zip

    在本项目实践中,我们将深入探讨Java中的多线程与高并发技术,特别是在人工智能领域的应用。Java作为一种广泛应用的编程语言,其强大的并发处理能力是其在大规模数据处理和高性能计算场景中备受青睐的原因之一。本...

Global site tag (gtag.js) - Google Analytics