`
阅读更多
说明:本篇文章是在阅读《Java 并发编程艺术》过程中的一些笔记和分析,由于本人能力有限,如果有书写错误的地方,欢迎各位大佬批评指正!我们互相交流,学习,共同进步!

该项目的地址:https://github.com/xiaoheng1/concurrent-programming

欢迎有兴趣的小伙伴加入,一起讨论、分析,共同进步!


1.ConcurrentHashMap 的实现原理和使用

ConcurrentHashMap 是线程安全 Map. 为啥不用 HashTable 了?因为 HashTable 效率太低了,比如说在进行 put 操作的时候,read 操作就
用不了,所以一个很正常的思路就是:将数据分段,你加锁就加锁一个段的数据,这样对其他段就没有影响.

可能有的小伙伴会问?为啥不使用 HashMap 了?HashMap 在并发编程中,可能导致程序死循环. 在多线程环境下,使用 HashMap 进行 put 操作会
引起死循环,导致 CPU 利用率接近 100%.

为啥会引起死循环了?

发生死循环的原因在于 transfer 方法.

比如说原来 HashMap 中桶1 的位置存放的是这样的数据:3 -> 7 -> 5.

现在 A、B 两个线程都要向这个 HashMap 中插入数据,由于 HashMap 的容量达到阈值,需要进行扩容,在扩容的时候,是不是要将原 HashMap 中
的值转移到新数组中去?

现在假设线程A执行 transfer 方法的这段代码时挂起了:

Entry<K,V> next = e.next; // 此时 e = 3, next = 7.

现在由线程B进行执行:

线程B在执行的过程中,需要重新定位各个元素在新数组中的位置,假设桶1的位置存放5,桶3的位置存放 7-> 3.

这里的顺序是倒过来的,怎么理解了?

do {
  Entry<K,V> next = e.next; //假设线程一执行至此被挂起,执行线程二
  int i = indexFor(e.hash, newCapacity);
  e.next = newTable[i];
  newTable[i] = e;
   e = next;
} while (e != null);

看上面的那段代码,你会发现遍历链表的时候是从桶的位置开始遍历的,所以越靠近桶位置的元素,在转换到新数组中时,会离桶越远.

假设线程B执行完后,各个元素的位置如上面所描述的那样. 现在轮到 A 执行了,e = 3,next = 7. e = next(7), next = e.next(3)

这不就造成了死循环吗?3 -> 7 -> 3 -> 7 ...

当产生死循环后,如果调用 get 方法,将会陷入死循环,CPU 占用率高达 100%.


现在说下 ConcurrentHashMap 为啥高效了?因为 ConcurrentHashMap 采用了锁分段技术,相比较 HashTable 而言,HashTable 对整个数据
集进行了加锁,当一个线程A进行 put 操作时,其他线程进行读操作(但是被墙在外面,要等到线程A操作结束后,才能竞争锁访问),这样其效率大大
降低了.

那 ConcurrentHashMap 是如何避免 HashTable 的弊端的了?

下面说下 ConcurrentHashMap 的数据结构.

ConcurrentHashMap 是由 Segment 数组和 HashEntry 数组组成. Segment 是一种可重入锁,HashEntry 则用于存储键值对. 一个
ConcurrentHashMap 包含一个 Segment 数组,Segment 的结构和 HashMap 的结构很类似,是一种数组 + 链表的结构(即:一个 Segment
结构包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素),每个 Segment 守护着一个 HashEntry 数组里的元素,当对
HashEntry 数组的数据进行修改时,必须首先获得与它对应的 Segment 锁.

现在再说的直白点,以前 HashTable 相当于一个 Segment 的结构. 现在 ConcurrentHashMap 相当于多个 HashTable. 在操作不同的 Segment
的时候,不影响其他 Segment.

通过阅读 ConcurrentHashMap 的构造方法,总结如下:
(1)segment 数组的长度 ssize 是通过 concurrencyLevel 计算来的. 为了能通过按位 & 来定位 segments 数组的索引,必须保证 ssize 的
长度为 2 的 N 次方. concurrencyLevel 的最大值为 65535.
(2)sshift 为 1 左移多少位到 ssize.
(3)segmentShift 等于 32 - sshift
(4)segmentMask 是散列运算的掩码,为 ssize - 1


2.下面来看下 ConcurrentHashMap 的操作

(1)get 操作,get 操作实现非常简单,先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,在通过算列算法定位到元素. 整个
过程高效,为啥高效了?因为 Segment 中 HashEntry 数组是使用 volatile 修饰的,它能够在线程间保持可见性,能够被多线程同时读,并且
保证不会读到过期的值.


3.ConcurrentLinkedQueue 分析

ConcurrentLinkedQueue 的特点是它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取
一个元素的时候,它会返回队列头部的元素. 它采用了 wait-free 的策略来实现的.

ConcurrentLinkedQueue 中有 head 和 tail 节点,初始时 head = tail = new Node(null).

入队:入队过程就是将节点添加到队列的尾部.

具体是如何做的了?

tail 并不是实时更新的,因为为了提高并发效率,对 volatile 变量的读快与对 volatile 变量的写操作.

其实是不是有朋友会问,可不可以写成如下形式:

public boolean offer(E e){
    if(e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    for(;;){
        Node<E> t = tail;
        if(t.casNext(null, n) && casTail(t, n)){
            return true;
        }
    }
}

答案是肯定的,可以这么写,但是这有个缺点,每次循环的时候,都需要使用循环更新 tail 节点,如果能减少更新 tail 节点,那么并发效率
就能提高.

我们来看下是如何做的:

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

(1) 初始时:head = tail = new Node<E>(null)
(2) 最开始插入的时候,tail.next = null, 直接进队. 此时 tail 节点没有移动,还是指向 head0 那个节点,但是 head0.next=newNode.
(3) 再新加一个节点,那么需要更新 tail 节点指向真正的尾节点了.
    p = (p != t && t != (t = tail)) ? t : q;
先分析下如下代码:如果在单线程的环境中,p = q. 然后就和 (2) 相同了. 现在看下在多线程中的情况,检查 (p != t && t != (t = tail))
是看有没有其他线程修改了 tail 节点,如果修改了,那么 t 就是最新的尾节点了.
(4) p == q 是一种什么场景了?比如说进行了出队操作,此时队列中只有一个元素,对最后一个元素执行出队操作,然后执行一个入队操作.

怎么理解了?

当是哨兵节点时,例如:head = tail -> Node(null) -> Node(C)
此时执行删除操作,p = head,q = head->next = Node(C), 执行出队操作后,更新 head 为 head -> NULL.
此时 tail.next 指向了自身. 所以在入队的时候,会有 p == q 这样的判断.

我们再看下 poll 方法:

public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

(1) 值得我们注意的是,head 节点也是两个两个更新的.
例如 head(Node(null)) -> node1 -> node2 -> node3 -> node4 -> node5.
(2) 执行 q = p.next != null,直接到 p = q.
(3) 现在 h -> node2
(4) 当在出队一个节点时,head 节点是没有发生变化的,只是将该节点指向的值置为 null.


4.Java 中的阻塞队列

(1)阻塞队列是支持两个附加操作的队列. 这两个附加的操作时支持阻塞和插入和删除的方法. 阻塞的插入是说当队列满了的时候,就不再插入了.
当队列空出来了,继续插入. 阻塞删除是说,当队列中没有元素了,则阻塞,当队列中还有元素的时候,则可以继续删除.
它的适用场景为生产者和消费者模式.

JDK 中提供了多种阻塞队列的实现,例如:
ArrayBlockingQueue: 由数组结构组成的有界阻塞队列
LinkedBlockingQueue: 由链表结构组成的有界队列
PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
DelayQueue: 使用优先级队列实现的无界阻塞队列
SynchronousQueue: 不存储元素的阻塞队列
LinkedTransferQueue: 由链表组成的无界阻塞队列
LinkedBlockingDeque: 由链表结构组成的双向阻塞队列.

ArrayBlockingQueue 底层是使用数组实现的,入队和出队的时候,采用一把锁,效率不高.

LinkedBlockingQueue 底层是使用链表实现的一个有界阻塞队列,此队列的默认和最大长度为 Integer.MAX_VALUE. 它使用了两把锁,
takeLock 和 putLock, 效率更高.

PriorityBlockingQueue 支持优先级的无界阻塞队列. 它的特点是使用数组实现了二叉堆这种数据结构(参考堆排序),在 put 和 take 操作
的时候,使用锁,保证线程安全.

DelayQueue 是一个支持延时获取元素的无界阻塞队列,它采用 PriorityQueue 来实现. 值得注意的是,实现中的 leader 是一个等待获取
队列头部元素的线程. 如果 leader 不为空,则表示已经有线程在等待获取队列的头部. 所以使用 await 方法让当前等待信号.

SynchronousQueue **是一个不存储元素的阻塞队列**. 每一个 put 操作必须等待一个 take 操作, 否则不能继续添加元素. SynchronousQueue
可以看成是一个传球手,负责把生成者线程处理的数据直接传递给消费者线程, 队列本身并不存储任何元素,非常适合传递性场景.
SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue.

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列. 相比较其他阻塞队列,LinkedTransferQueue 多了
tryTransfer 和 transfer 方法.
(1) transfer 表示如果当前有消费者正在等待接收元素,transfer 方法可以把生产者传入的元素立刻 transfer 给消费者. 如果没有消费者在
等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回.
(2) tryTransfer 方法是用来试探性生产者传入的元素是否能直接传给消费者. 如果没有消费者等待接收元素,则返回 false. 和 transfer
方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法是必须等待消费者消费才返回.

LinkedBlockingDeque 是一个由双向链表组成的双向阻塞队列. 所谓双向阻塞队列指的是可以从队列的两端插入和移出元素. 双向队列多了一个
操作队列的入口,在多线程同时入队时,也就减少了一半的竞争. LinkedBlockingDeque 使用了一把锁,那它是如何提升效率的了?**我查阅了大量
资料,并没有发现有介绍说 LinkedBlockingDeque 的效率高于 LinkedBlockingQueue. 相反我认为 LinkedBlockingQueue 的效率更高.
因为 LinkedBlockingQueue 使用了两把锁,一把锁用于控制入队,一把锁用于控制出队. 而 LinkedBlockingDeque 只有一把锁控制入队、
出队操作,虽然你多了一个入队、出队的入口,但是性能还是受限于对锁的竞争,但是 LinkedBlockingQueue 入队和出队是没有竞争的,所以
我认为 LinkedBlockingQueue 的效率更高**.


5.阻塞队列的实现原理
(1) 使用了通知模式. 当生产者往满的队列里添加元素时会阻塞生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用.

6.Fork/Join 框架

Fork/Join 是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务
结果的框架.

Fork 是把一个大任务切分为若干个子任务并行执行,Join 是合并这些子任务的执行结果,最后得到这个大任务的结果.

(2) 工作窃取算法. 工作窃取算法是指某个线程从其他队列里窃取任务来执行. 因为存在这么一个场景,我们把一个大任务拆分为多个子任务,为了
减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行. 但是 A 线程很乖的执行完了,而其他线程
还没有干完活,那么 A 线程是不是可以帮其他线程干下活?

所以优点就是充分的利用线程的并行计算能力,缺点是消耗更多的系统资源.

所以设计 Fork/Join 框架时需要考虑的问题是:
(1) 将一个大任务分割为多个子任务.
(2) 将子任务的执行结果进行合并

ForkJoinTask 我们使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务. 它提供在任务中执行 fork() 和 join() 操作的机制. 通常
情况下,我们不需要直接继承 ForkJoinTask 类,只需要继承它的子类.
(1) RecursiveAction 用于没有返回结果的任务
(2) RecursiveTask 用于有返回结构的任务

ForkJoinTask 通过 ForkJoinPool 来执行. 任何分割出来的子任务都会添加到当前你工作线程锁维护的双端队列中,进入队列的头部. 当工作
线程的队列暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务.

example:

class CountTask extends RecursiveTask{
    private static final int THRESHOLD = 2;
    private int start;
    private int end;
   
    public CountTask(int start, int end){
        this.start = start;
        this.end = end;
    }
   
    protected Integer compute(){
        int sum = 0;
        boolean canCompute = end - start <= THRESHOLD;
        if(canCompute){
            for(int i=start; i<=end; i++){
                sum += i;
            }
        }else{
            int middle = (end - start) / 2;
            CountTask taskLeft = new CountTask(start, middle);
            CountTask taskRight = new CountTask(middle+1, end);
           
            leftTask.fork();
            rightTask.fork();
           
            int leftTaskResult = leftTask.join();
            int rightTaskResult = rightTask.join();
           
            sum = leftTaskResult + rightTaskResult;
        }
        return sum;
    }
}

1.fork 执行的时候是异步的还是同步的?
    异步的.
2.如果窃取其他工作队列中的任务?

实现原理:

ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放程序提交给 ForkJoinPool 的任务.
而 ForkJOinWorkerThread 数组负责执行这些任务.

当我们调用 ForkJoinTask 的 fork 方法时,程序会调用 ForkJoinWorkerThread 的 pushTask 方**法异步地**执行这个任务,然后立即返回
结果.

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join 方法的主要作用是阻塞当前线程并等待获取结果.

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

核心点在于 ForkJoinPool, 它的每个工作线程都维护者一个工作队列,这是一个双端队列,里面存放的是任务(ForkJoinTask). 每个工作线程
在运行中产生新的任务(通常由于调用了 fork 放入工作队列的队尾.) 工作线程在处理任务的时候,会从对头拿任务,其他线程窃取任务的时候,
会从队尾窃取任务执行.

参考:① jianshu.com/p/4930801e23c8
     ② https://segmentfault.com/a/1190000016248143
     ③ https://www.cnblogs.com/yaowen/p/10708249.html
     ④ https://www.imooc.com/article/24822

1
0
分享到:
评论

相关推荐

    并发容器的原理,7大并发容器详解、及使用场景

    总的来说,理解和熟练运用并发容器是提升 Java 多线程程序性能的关键。开发者应根据业务场景选择合适的容器,避免全局锁带来的性能损失,充分利用并发容器提供的高级并发控制机制,确保在多线程环境下数据的一致性和...

    《Java并发编程的艺术》

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    java并发编程内部分享PPT

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。这份“java并发编程内部分享PPT”显然是一个深入探讨这一主题的资料,旨在帮助开发者...

    13-Java并发编程学习宝典.zip

    —深入解析ConcurrentHashMap-慕课专栏.html" 和 "25 经典并发容器,多线程面试必备—深入解析ConcurrentHashMap下-慕课专栏.html":这两篇文章深入探讨了Java并发容器中的`ConcurrentHashMap`,它是线程安全的哈希...

    Java并发编程的艺术

    , 《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,...

    Java并发编程实践高清pdf及源码

    4. **线程池**:`ExecutorService`是Java并发框架的核心,它管理一组可重用线程,有效地调度和执行任务。`ThreadPoolExecutor`是其最常见的实现,允许自定义线程池参数。 5. **并发工具类**:如`CountDownLatch`、`...

    java并发编程书籍

    Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...

    JAVA并发编程艺术pdf版

    - **并发容器框架**:如BlockingQueue(阻塞队列),常用于生产者-消费者模式。 6. **死锁与活锁**: - **死锁**:两个或多个线程互相等待对方释放资源,导致无法继续执行。 - **活锁**:线程不断地尝试获取资源...

    Java并发编程之——Amino框架

    Java并发编程是一个复杂而重要的主题,它涉及到多线程、同步机制、线程池和并发容器等关键概念。Amino框架是Java并发编程领域的一个工具,它旨在简化并发编程,提高程序的性能和可维护性。这篇博客文章可能详细探讨...

    ( Java并发程序设计教程.zip )高清版 PDF

    此外,书中可能还会涉及线程优先级、守护线程、线程中断等特性,以及Java 5及以后版本引入的并发改进,如 Fork/Join 框架和 Parallel Streams。 最后,实际应用中的并发最佳实践和调试技巧也是必不可少的。例如,...

    Java并发编程从入门到精通(pdf)(附源码)

    接着,书中将深入探讨Java并发工具类,如Executor框架、Semaphore信号量、CyclicBarrier和CountDownLatch等,这些工具在实际项目中有着广泛的应用,学习它们能帮助开发者更好地控制和协调并发任务。 此外,书中的...

    Java并发编程Xmind思维导图

    Java并发编程Xmind思维导图,思路更清晰。内容来自《Java并发编程的艺术》,包括并发机制底层原理、Java内存模型、Java并发编程基础、锁机制、线程池、并发工具类、原子操作类、并发容器和框架。纯手打,非诚勿扰。

    Java并发编程实战

    5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 CopyOnWriteArrayList 5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 ...

    Java并发编程实践(Java Concurrency in Practice) (中英版)

    3. **并发集合与并发容器**:涵盖了Java并发集合框架,包括线程安全的ArrayList、LinkedList、HashMap等,并介绍了ConcurrentHashMap、CopyOnWriteArrayList等高效率的并发容器。 4. **并发工具**:讨论了Executor...

    实战Java高并发程序设计(高清版)

    5. **J.U.C框架**:Java并发 utilities (J.U.C) 框架是Java并发编程的重要组成部分,书中会介绍如何利用这个框架来提升并发性能和代码的可读性。 6. **性能调优**:在高并发场景下,性能优化是必不可少的。可能涵盖...

    java 并发编程实践

    本篇文章将深入探讨Java并发编程的相关知识点,主要基于提供的两个文件——"Java并发编程实战(中文版).pdf"和"Java Concurrency in Practice.pdf"。 1. **线程与并发** - **线程基础**:Java中的线程是并发执行...

    《Java并发编程的艺术》源代码

    第2章介绍Java并发编程的底层实现原理,介绍在CPU和JVM这个层面是如何帮助Java实现并发编程的。 第3章介绍深入介绍了Java的内存模型。Java线程之间的通信对程序员完全透明,内存可见性问题很容易困扰Java程序员,本...

Global site tag (gtag.js) - Google Analytics