前言:这是一场艰苦的旅行....
请首先参考:http://shift-alt-ctrl.iteye.com/blog/1839142
一.BlockingDeque阻塞双端队列(线程安全):
注意ArrayDeque和LinkedList仅仅扩展了Deque,是非阻塞类型的双端队列。
BlockingQueue单向队列,其内部基于ReentrantLock + Condition来控制同步和"阻塞"/"唤醒"的时机;有如下几个实现类:
- ArrayBlockingQueue: “浮动相对游标”的数组,来实现有界的阻塞队列。
- DelayQueue:支持“可延迟”的队列,DelayQueue还只接受Delayed类型的元素,Delayed接口继承自Compare接口并提供了一个long getDelay(TimeUnit),来获取指定时间到now的时间剩余量。DelayQueue底层就是使用PriorityQueue作为支撑的。
- PriorityBlockingQueue:有权重的队列,此队列时可以根据指定的comparator进行排序的。
- SynchronousQueue://
- LinkedBlockingDeque:有界或者无界阻塞队列
PriorityQueue为非线程安全非阻塞,有权重的队列,其权重需要根据特定的compartor来产生。
二.ConcurrentMap(接口):支持并发的map,支持多线程环境中安全的访问。
其提供了几个独特的方法:
- V putIfAbsent(K,V):如果map中不存在此key,则put,否则返回现有的此key关联的值。此过程有Lock同步:
//等价于: if (!map.containsKey(key)) return map.put(key, value); else return map.get(key);
Map<String,Object> map = new ConcurrentHashMap<String, Object>(); if(map.containsKey("key")){ map.put("key", new Object()); } //注意,concurrentHashMap并不保证contains方法和put方法直接保持"原子性",即有可能contains方法返回false之后,在put之前,可能其他线程已经put成功,即在当前线程put时,此时数据已经不一致了.建议采用putIfAbsent
- boolean remove(Object key,Object value):比较并删除指定的key和value。
- boolean replace(K,V oldValue,V newValue):比较并替换。
目前实现ConcurrentMap的类有ConcurrentHashMap,一种基于锁分段技术(Segement)实现的并发hashMap,锁采取了ReentrantLock(Segement扩展了ReentrantLock)。
备注:
1)HashMap非线程安全,即在并发环境中进行put、remove时,map容器中最终的数据可能并非当前线程预期的结果,比如Thread-1、Thread-2同时对map中相同的key进行put操作,因为并发的问题,可能Thread-1操作之后(尚未返回)值已经被Thread-2给覆盖,即put方法返回的V与map中实际存储的V并不一致;还有另外一个问题,HashMap内部基于数组存储,所以在并发操作时,数据的扩容并不是“同步的”,这仍然会导致一些问题。
2)为了解决HashMap线程安全问题,我们可以手动对hashMap实例进行同步操作,或者使用JAVA中自带的SynchronizedMap,可以通过Collections.synchronizedMap()创建,此map是线程安全的,其内部put/remove方法均是同步的。但是这种同步,尽管保证了数据安全,但是却牺牲了并发性。
3)所以,我们引入了ConcurrentHashMap,既是线程安全的,有可以支持并发操作。
在JDK 1.6中,ConcurrentHashMap基于锁分段手段来提高并发能力,相对于synchronizedMap任何时候只能有一个线程操作(put、remove),ConcurrentHashMap允许多个线程并发操作且可以保证数据操作是安全的;ConcurrentHashMap中有一个“CONCURRENT_LEVEL”(并发级别)的属性,默认为16,即最多允许16个线程并行操作;此外它还有一个Segements[],数组大小跟并发级别一样,对于任何put/remove操作,首先根据key的hashcode对并发级别取模,并将模数作为Segements[]数组的索引,获取segement对象,然后执行lock,并在数据操作完毕后unlock。由此可见,hashcode与并发level模数相同的key,将会被“同步执行”,模数不同的key使用不同的lock,可以并行执行。这个思想非常简单易于理解!
在JDK 1.8中,ConcurrentHashMap的实现原理进行了修正,抛弃了Segements锁的手段和并发级别,转而使用CAS + 同步操作。(具体原理比较复杂,暂不介绍)
ConcurrentHashMap支持并发环境下的遍历 + 删除,无论是基于entrySet()、keySet()、values(),都是安全可靠的,因为这三个方法返回的均为View,其iterator内部持有实际map数据。
三.ConcurrentLinkedQueue:
基于单向链表实现的,线程安全的并发队列,无界非阻塞队列,当队列需要在多线程环境中被使用,可以考虑使用它。记住,这是个非阻塞队列,不过支持阻塞的队列,貌似都是线程安全的。
此队列的size不是时间固定的,它的iterator也会被不断调整。ConcurrentLinkedQueue并没有使用Lock,而是采用了CAS的方式,对tail.next进行赋值操作。因为tail.next永远是null,且队列不接受null的元素。
注意,非并发集合(list,queue,set)的iterator以及forEach循环在并发环境中是不能正常工作的,如果原始集合被外部修改(其他线程的add,remove),将会导致异常。对于并发集合的iterator,没有做相关的size校验。
Lock(锁)是控制操作(action)的,可以让一个操作或者一个子操作被串行的处理。。。CAS其实只是对内存数据的变更时使用,如果想知道数据变更在并发环境中是否符合预期,才会使用到CAS。
四.ConcurrentSkipListMap/ConcurrentSkipListSet
两个基于SkipList(跳跃表)方式实现的、支持并发访问的数据结构。依据跳跃表的思想,可以提高数据访问的效率。其中ConcurrentSkipListSet底层使用ConcurrentSkipListMap支撑。
ConcurrentSkipListMap也是ConcurrentNavigableMap的实现类,对于SkipList,其内部元素,必须是可排序的。
跳跃表是一个很简单的表,(参见跳跃表概念),对底层的线性存储结构,加入类似“多级索引”的概念,“索引”的添加时基于随即化。一个跳跃表,整体设计上(设计思路很多)分为表左端head索引,右端tail索引(边界),底端存储层(排序的线性链表),和一个随机化、散列化的不同高度的多级索引“指针”。head索引是高度最高的索引,它是整个链表中值最小的元素锁产生的索引;右端为边界索引,索引指向null或者任意设计的边界值(bound).
跳跃表的底端是一个和普通的链表没啥区别,单向或者双向的均可,前提是必须是排序的。索引节点,就是一个有向路径的标,每个索引节点,都分别有right、down指向,对于双向跳跃表,就具有left、right、up、down四个方向指针;指针就是为了方便寻路。每个新增元素时,首先会导致底层链表的改动;根据自定义的随即算法,来决定此元素的索引高度,如果高度不为0,则依次建立相应层高的索引,并调整各个层高的所以指向。
跳跃表之所以这么设计,实事上就是在做一件事情:基于简单的设计思路和算法,来实现较为高效的查询策略。相对于二分查找有一定的优势.
五.CopyOnWriteArrayList/CopyOnWriteArraySet:
均是CopyOnWrite思想,在数据修改时(happen-before),对数据进行Copy(),read操作可以在原数据结构上继续进行,待write操作结束后,调整数据结构指针。基于这种设计思路的数据结构,通常是read操作频率远大于write操作,可以在并发环境中,支撑较高的吞吐量;避免了因为同步而带来的瓶颈,同时也能确保数据安全操作。同时需要注意,copy操作将会带来多余的空间消耗。注意,此API时支持并发的,多个线程add操作(即CopyOnWrite)将会被队列化,内部采取了ReentrantLock机制来控制.
- CopyOnWriteArrayList底层基于数组实现,在进行write操作时(add,remove),将会导致Arrays.copy操作,创建一个新的数组;待write操作成功后,将原数组的指针替换成新数组指针.
- CopyOnWriteArraySet底层直接使用CopyOnWriteArrayList作为支撑,只不过在add操作时会遍历整个数组结构并进行equals比较(确保具有Set的特性),只有发现此新元素不存在时才会"替换指针".
java中这两个API,支持并发操作时,仍然可以进行遍历而无需额外的同步;即不会抛出ConcurrentModificationException。事实上,迭代器所持有的数组只是一个"创建iterator时底层数组的引用",所以在遍历期间,即使CopyOnWriteArrayList已经新增或者删除了某些元素,仍不会发生冲突,因为iterator持有的是旧数组的引用,而CopyOnWriteArrayList持有的是Copy操作时创建的新数组引用..由此可见,iterator也无法反应实时的数组变化(遍历期间,实际数组的添加、删除),但是原始数组中对象内容发生改变还是可以在迭代器中反应出来。CopyOnWrite的遍历器的remove/add/set操作不被支持,这区别于ArrayList.
CopyOnWriteArrayList、CopyOnWriteArraySet,底层基于数组实现,采取ReentrantLock来同步add/remove/clear等操作。并采取了snapshot的简单手段:
//例如add: public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //数组copy Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; //修改结束后,指针转换 setArray(newElements); return true; } finally { lock.unlock(); } }
六.CountDownLatch:
同步类,用于多个线程协调工作。共享锁,当锁计数器较少到0时,将释放等待的线程。使用场景是,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。当CountDownLatch的锁计数器为1时,可以作为一种“开关”来使用。计数器无法被重置,如果需要重复计数,可以使用CyclicBarrier。
CountDownLatch内部基于AQS来控制线程访问。这个API很简单,只有2个核心方法:
- void await():如果计数器不为0,则获取锁失败,加入同步队列;即线程阻塞。
- void countDown():释放锁,导致计数器递减,如果此时计数器为0,则表示锁释放成功,AQS会帮助“发射”因为await阻塞的线程(组)。
public class CountDownLatchTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ System.out.println("Begin"); CountDownLatch latch = new CountDownLatch(2); for(int i=0;i<4;i++){ CThread c = new CThread(i,latch); c.start(); //Thread.sleep(500); } latch.await(); System.out.println("End"); } static class CThread extends Thread{ CountDownLatch latch; int count; CThread(int count,CountDownLatch latch){ this.count = count; this.latch = latch; } @Override public void run(){ try{ System.out.println("---"+count); }catch(Exception e){ e.printStackTrace(); } finally { latch.countDown();} } } }
七.CyclicBarrier:
循环屏障,它允许一组线程互相等待,直到到达某个公共屏障点;线程(组)数量固定,线程之间需要不时的互相等待,CyclicBarrier和CountDownLatch相比,它可以在释放等待线程后被再次“重用”,所以称为循环屏障。它提供了类似“关卡”的功能。对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
- CyclicBarrier(int parties):指定参与者个数
- CyclicBarrier(int parties,Runnable barrierAction):指定一个屏障操作,此操作将会有最后一个进入barrier的线程执行。
- int await():在所有的线程达到barrier之前,一直等待。此方法可以抛出InterrutedExeception(此线程被中断),可以抛出BrokenBarrierExeception(当其他参与者在wait期间中断,导致屏障完整性被破坏),在方法被await时,如果抛出上述异常,需要做补救的相应操作。此方法返回当前线程到达屏障时的索引。(第一个到达的,为0,最后一个为getParties() - 1);根据返回值的不同可以做一些操作,比如最先/最后达到的做一些前置、后置操作等。await方法其实就是获取锁的过程---信号量减1,初始信号量为parties,如果信号量减少到0时,将会唤醒之前所有await()的线程,并开始新的一轮(next),不需要执行reset,即此时信号量再次恢复到parties,接下来线程可以继续await。
- boolean isBroken():屏障是否处于损坏状态。
- void reset():重置屏障为其初始状态;如果此时有线程在await,其线程将会抛出BrokenBarrierExeception。对于reset操作,需要线程的执行方法有相应的配合(比如支持操作轮训等),否则重置会带来一些不必要的麻烦。。。如果你需要重置,尚不如重新建一个CyclicBarrier。
底层基于ReentrantLock实现。线程阻塞基于Condition方式(注意Condition底层也是通过AQS框架实现);大概伪代码:
ReentrantLock lock = new ReentrantLock(); Condition trip = lock.newCondition(); ////await方法: if(count!=0){ trip.await();//AQS:当前线程队列化,LockSupport.park count--; }else{ trip.signalAll(); }
//////////////////代码实例 public class CyclicBarrierTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("Barrier action!!"); } }); for(int i=0;i<5;i++){ CThread c = new CThread(barrier); c.start(); } Thread.sleep(1000); } static class CThread extends Thread{ CyclicBarrier barrier; CThread(CyclicBarrier barrier){ this.barrier = barrier; } @Override public void run(){ int count = 0; while(true){ try{ System.out.print("---" + count); int index = barrier.await(); System.out.println("+++" + count); count++; if(index == barrier.getParties() - 1){ //barrier.reset(); } }catch(Exception e){ e.printStackTrace(); break; } } } } }
八.Exchanger
Exchanger:同步交换器,2个互相匹配(协调的对象),互相交换数据。2个线程需要把相同类型的数据,以互相等待的方式交换。比如线程1将数据A交换给B,此时线程1等待直到线程B将数据交换出去。Exchanger有一个方法,就是exchange(V x):其作用就是等待另一个线程到达交换点,然后将数据传递给线程。
如果没有其他线程到达交换点,处于调度的目的,禁用当前线程,直到某个线程到达或者某个线程中断。
伪代码:
void exchange(V item){ //如果有线程已经到达 for(;;){ Node e = get(); if(e != null){ V i = e.getItem(); CAS(e,i,null);//将等待匹配者移除 Thread t = e.waiter; LockSupport.unpark(t); // Node ne = new Node(currentThread,ne); set();//将当前需要交换的数据加入,当其他线程unpart之后,可以get,并获取数据 return i;//返回需要交换的数据 }else{ Node e = new Now(currentThread,item); set(node); LockSupport.park(currentThread); //重新回到顶层for循环,并获取交换数据 } } }
如下的例子是基于一个简单的Productor和Consumer模式,一个线程负责生产数据,当数据满时,交换给consumer消费;当consumer消费完时,也申请交换。
import java.util.ArrayDeque; import java.util.Queue; import java.util.Random; import java.util.concurrent.Exchanger; public class ExchangerTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ Exchanger<Queue<Integer>> exchanger = new Exchanger<Queue<Integer>>(); CThread c = new CThread(exchanger); PThread p = new PThread(exchanger); c.start(); p.start(); Thread.sleep(2000); } static class CThread extends Thread{ Exchanger<Queue<Integer>> exchanger ; Queue<Integer> current; CThread(Exchanger<Queue<Integer>> exchanger){ this.exchanger = exchanger; } @Override public void run(){ if(current == null){ current = new ArrayDeque<Integer>(10); } try{ while(true){ //productor if(current.size() == 0){ current = exchanger.exchange(current);//交换出去fullList,希望获得EmptyList } System.out.println("C:" + current.poll()); } }catch(Exception e){ e.printStackTrace(); return; } } } static class PThread extends Thread{ Exchanger<Queue<Integer>> exchanger ; Queue<Integer> current; PThread(Exchanger<Queue<Integer>> exchanger){ this.exchanger = exchanger; } @Override public void run(){ Random r = new Random(); if(current == null){ current = new ArrayDeque<Integer>(10); } try{ while(true){ //productor if(current.size() >= 10){ current = exchanger.exchange(current);//交换出去fullList,希望获得EmptyList } Integer i = r.nextInt(20); System.out.println("P:" + i); current.add(i); } }catch(Exception e){ e.printStackTrace(); return; } } } }
九.Semaphore:信号量
我们需要把semaphore真的看成“信号量”,它是可以被“增减”的锁引用,“0”是判断信号“过剩”的界限。
我们通常使用semaphore来控制资源访问并发量。它底层使用“共享”模式锁实现,提供了“公平”“非公平”2中策略。当“信号量”大于0时,允许获取锁;否则将阻塞直到信号量恢复。
将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。
- Semaphore(int permits, boolean fair):指定信号量,指定公平策略。
- void acquire():获取一个信号,如果信号量<=0,则阻塞;在非公平模式下,允许闯入。
- void acquire(int permits).
上面2个方法都会抛出InterruptException,即在等待线程被“中断时”,将会抛出异常而返回。底层基于AQS.acquireSharedInterruptibly()
- void acquireUninterruptibly():获取一个信号,不支持中断,当线程被中断时,此线程将继续等待,当线程确实从此方法返回后,将设置其中断状态。底层基于AQS.acquireShared();
- void release():释放一个信号,直接导致信号量++。
- boolean tryAcquire():获取一个信号,如果获取成功,则返回true。
相关推荐
本文将深入探讨Java集合框架的核心概念、数据结构以及相关的算法。 首先,集合框架主要由两个核心部分组成:集合(Collection)和图(Map)。集合是用来存储单一元素的容器,而图则用于存储键值对映射关系。在...
### Java并发编程实践知识点详解 #### 一、Java并发编程基础 ##### 1.1 并发与并行概念区分 在Java并发编程实践中,首先需要理解“并发”与“并行”的区别。“并发”指的是多个任务同时进行,但实际上可能是在多...
#### 八、Java并发工具类详解 第八章关注于Java提供的并发工具类,这些工具类包括CountDownLatch、CyclicBarrier、Semaphore等,它们是Java并发编程库的重要组成部分。本章不仅详细介绍了这些工具类的功能和用途,...
- 采用合适的并发数据结构和算法。 #### 四、理论与实践相结合 - **示例代码:** 书中提供了大量的示例代码,涵盖各种并发编程的典型应用场景,帮助读者更好地理解并发编程的技术细节。 - **案例分析:** 分析真实...
《Nginx高性能Web服务器详解》可能是与Java并发编程实战相关的补充阅读,因为Nginx作为一个高性能的反向代理服务器,经常与Java应用服务器配合使用,优化Web服务的并发处理能力。Nginx的异步非阻塞I/O模型和事件驱动...
} }`,了解Java程序的基本结构和输出方式。 4. **基本数据类型**:Java有八种基本数据类型,包括整型(byte, short, int, long)、浮点型(float, double)、字符型(char)和布尔型(boolean)。理解它们的取值...
《Tomcat与Java Web开发技术详解(第2版)》是孙卫琴先生撰写的一本经典教程,专注于讲解如何使用Apache Tomcat服务器进行Java Web应用的开发。这本书深入浅出地介绍了Tomcat的基础架构、配置、优化以及故障排查,同时...
### Java数据结构面试题知识点详解 #### 一、栈与队列的特点 - **栈**:栈是一种特殊的线性表,只允许在一端(称为栈顶)进行插入和删除操作,具有“后进先出”(Last In First Out, LIFO)的特性。常见的存储结构有...
本书会详细讲解Java的语法基础,包括数据类型、控制结构、类和对象、接口、包等核心概念。对于初学者,理解这些基础知识至关重要,因为它们构成了Java编程的基础。 其次,书中会深入探讨面向对象编程(OOP)的概念...
- **变量可见性问题:** 探讨线程间的可见性问题,理解为何简单的数据结构在多线程环境中也可能出现错误。 - **死锁与活锁:** 分析死锁发生的条件和预防措施,同时讨论活锁的特点及其避免方法。 - **竞态条件与数据...
3. ConcurrentSkipListMap 和 ConcurrentSkipListSet 基于跳表(Skip List)数据结构,提供了一个可并发访问的有序映射和集合。跳表允许快速查找,同时保持插入和删除操作的高效性。 4. ConcurrentLinkedQueue 是一...
"Java面试题答案详解"这份资料旨在帮助求职者准备Java相关的面试,深入理解Java语言的关键点,并提供详尽的答案解析。 一、Java基础 1. 类与对象:Java是一种面向对象的语言,类是对象的模板,对象则是类的实例。...
7. **数据结构理解**:了解这些集合背后的底层数据结构(如数组、链表、树等)对于优化代码性能至关重要。例如,ArrayList的动态数组结构适合频繁的随机访问,而不适合频繁的插入和删除操作。 8. **泛型使用**:在...
* Java 语言的基本概念:包括变量、数据类型、运算符、控制结构、函数等。 * Java 语言的高级特性:包括 static 和 final 关键字、抽象类和接口、多态性等。 Java 虚拟机(JVM) * JVM 的架构:包括类加载器、运行...
10. 并发和多线程中的数据结构:例如,使用ConcurrentHashMap进行线程安全的键值对存储。 通过阅读这本书,你可以逐步掌握这些概念,并通过书中给出的实例加深理解。实战练习是巩固知识的关键,可以尝试自己实现...
《Java2编程详解(Special Edition Using Java)》是一本针对Java初学者和爱好者精心编写的教程,旨在提供全面且深入的Java编程知识。本书详细介绍了Java语言的核心概念、语法和应用,是学习Java 2平台的理想资源。...
│ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...
### Java 数据结构详解 #### 一、引言 在计算机科学领域中,数据结构是组织、管理和存储数据的有效方式,使数据能够高效地被访问和修改。Java 作为一种广泛使用的编程语言,在其标准库中提供了丰富的集合框架...
《Java数据结构基础详解》 数据结构是计算机科学的基础,它是研究非数值计算问题中数据元素之间关系的学科。在程序设计中,数据结构与算法相辅相成,构成计算机科技的两大支柱。数据结构主要关注数据元素之间的逻辑...