一.并发工具概述
1.传统的多线程并没有提供高级特性,例如:信号量、线程池和执行管理器等,而这些特性恰恰有助于创建强大的并发程序。
2.新的Fork/Join框架针对当前的多核系统,也提供了并行编程的可行性。
3.并发工具包处理于java.util.concurrent包,主要包括同步器、执行顺、并发集合、Fork/Join框架、atomic包、locks包。
4.同步器:为每种特定的同步问题提供了解决方案
5.执行器:用来管理线程的执行
6.并发集合:提供了集合框架中集合的并发版本
7.Fork/Join框架:提供了对并行编程的支持
8.atomic包:提供了不需要锁即可完成并发环境变量使用的原子性操作
9.locks包:使用Lock接口为并发编程提供了同步的另外一种替代方案
二.同步器-Semaphore和CountDownLatch
1.Semaphore同步器
a.经典的信号量,通过计数器控制对共享资源的访问。
b.Semaphore(int count):创建拥有count个许可证的信号量
c.acquire()/acquire(int num):获取1/num个许可证
d.release()/release(int num):释放1/num个许可证
package com.bijian.concurrent.study; import java.util.concurrent.Semaphore; /** * 银行营业部有两个柜台给三个人提供服务 * @author bijian */ public class SemaphoreDemo { public static void main(String[] args) { //最多允许多少个并发线程来进入这个区域 Semaphore semaphore = new Semaphore(2); Person p1 = new Person(semaphore, "P1"); p1.start(); Person p2 = new Person(semaphore, "P2"); p2.start(); Person p3 = new Person(semaphore, "P3"); p3.start(); } } class Person extends Thread { private Semaphore semaphore; public Person(Semaphore semaphore, String name) { setName(name); this.semaphore = semaphore; } public void run() { System.out.println(getName() + " is waiting..."); try { //每个线程过来,首先要获取许可证 semaphore.acquire(); System.out.println(getName() + " is servicing..."); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName() + " is done!"); //操作结束,要释放许可证 semaphore.release(); } }
运行结果:
P1 is waiting... P3 is waiting... P2 is waiting... P1 is servicing... P3 is servicing... P1 is done! P3 is done! P2 is servicing... P2 is done!
2.CountDownLatch同步器
a.必须发生指定数量的事件后才可以继续运行,如赛跑比赛的倒计时后开始
b.CountDownLatch(int count):必须发生count个数量才可以打开锁存器
c.await():等待锁存器
d.countDown():触发事件
package com.bijian.concurrent.study; import java.util.concurrent.CountDownLatch; /** * 赛跑比赛倒计时 * @author bijian * */ public class CountDownLatchDemo { public static void main(String[] args) { //创建一个需要多少个事件发生才可以指定线程执行的计数器,这里的3表示三个事件发生才可以执行 CountDownLatch countDownLatch = new CountDownLatch(3); new Racer(countDownLatch, "A").start(); new Racer(countDownLatch, "A").start(); new Racer(countDownLatch, "A").start(); for(int i=0;i<3;i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(3 - i); countDownLatch.countDown(); if(i == 2) { System.out.println("Start"); } } } } class Racer extends Thread { private CountDownLatch countDownLatch; public Racer(CountDownLatch countDownLatch, String name) { setName(name); this.countDownLatch = countDownLatch; } public void run() { try { countDownLatch.await(); for(int i=0;i<3;i++) { System.out.println(getName() + " : " + i); } } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果:
3 2 1 Start C : 0 A : 0 B : 0 A : 1 C : 1 A : 2 B : 1 C : 2 B : 2
三.同步器-CylicBarrier、Exchanger和Phaser
1.CylicBarrier同步器
a.适用于只有多个线程都到达预定点时才可以继续执行。
b.CyclicBarrier(int num):等待线程的数量
c.CyclicBarrier(int num, Runnable action):等待线程的数量以及所有线程到达后的操作
d.await():到达临界点后暂停线程
package com.bijian.concurrent.study; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 模拟斗地主 * @author bijian */ public class CyclicBarrierDemo { public static void main(String[] args) { //斗地主需要三个人,所以这里为3 CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() { //主线程一旦通过循环屏障,就可以执行某个动作,如通过Runnable实现的动作 @Override public void run() { System.out.println("Game start"); } }); new Player(cyclicBarrier, "A").start(); new Player(cyclicBarrier, "B").start(); new Player(cyclicBarrier, "C").start(); } } class Player extends Thread { private CyclicBarrier cyclicBarrier; public Player(CyclicBarrier cyclicBarrier, String name) { setName(name); this.cyclicBarrier = cyclicBarrier; } public void run() { System.out.println(getName() + " is waiting other players..."); try { //每个线程在循环屏障处等待 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
运行结果:
C is waiting other players... B is waiting other players... A is waiting other players... Game start
2.Exchanger同步器
a.简化两个线程间数据的交换
b.Exchanger<V>:指定进行交换的数据类型
c.V exchange(V object):等待线程到达,交换数据
package com.bijian.concurrent.study; import java.util.concurrent.Exchanger; public class ExchangerDemo { public static void main(String[] args) { Exchanger<String> ex = new Exchanger<String>(); new A(ex).start(); new B(ex).start(); } } class A extends Thread { private Exchanger<String> ex; public A(Exchanger<String> ex) { this.ex = ex; } public void run() { String str = null; try { str = ex.exchange("Hello"); System.out.println(str); str = ex.exchange("A"); System.out.println(str); str = ex.exchange("B"); System.out.println(str); }catch(InterruptedException e) { e.printStackTrace(); } } } class B extends Thread { private Exchanger<String> ex; public B(Exchanger<String> ex) { this.ex = ex; } public void run() { String str = null; try { str = ex.exchange("Hi!"); System.out.println(str); str = ex.exchange("1"); System.out.println(str); str = ex.exchange("2"); System.out.println(str); }catch(InterruptedException e) { e.printStackTrace(); } } }
运行结果:
Hello Hi! 1 A B 2
3.Phaser同步器
a.工作方式与CyclicBarrier类似,但是可以定义多个阶段
b.Phaser()/Phaser(int num):使用指定0/num个party创建Phaser
c.register():注册party
d.arriveAndAdvance():到达时等待到所有party到达
e.arriveAndDeregister():到达时注销线程自已
package com.bijian.concurrent.study; import java.util.concurrent.Phaser; public class PhaserDemo { public static void main(String[] args) { Phaser phaser = new Phaser(); System.out.println("starting ..."); //在Worker中只是执行、等待 new Worker(phaser, "Fuwuyuan").start(); new Worker(phaser, "Chushi").start(); new Worker(phaser, "Shangcaiyuan").start(); //表示一个有三个订单,对于每一个订单,都需要所有人处理完毕后,才能继续执行 for(int i=1; i<=3; i++) { phaser.arriveAndAwaitAdvance();//自已处理完了,等待其它线程处理完才能继续进行 System.out.println("Order " + i + " finished!"); } //所有订单执行完毕后,解除所有注册的线程 phaser.arriveAndDeregister(); System.out.println("All done!"); } } class Worker extends Thread { private Phaser phaser; public Worker(Phaser phaser, String name) { this.setName(name); this.phaser = phaser; //把当前线程注册到phaser中 phaser.register(); } public void run() { for(int i=1;i<= 3;i++) { System.out.println("current order is :" + i + ":" + getName()); if(i == 3) { //如果三个订单都处理完成,则解除注销 phaser.arriveAndDeregister(); }else { //如果还有其它订单未处理完,则等待其它订单处理完毕 phaser.arriveAndAwaitAdvance(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果:
starting ... current order is :1:Fuwuyuan current order is :1:Shangcaiyuan current order is :1:Chushi Order 1 finished! current order is :2:Shangcaiyuan Order 2 finished! current order is :2:Fuwuyuan current order is :2:Chushi current order is :3:Shangcaiyuan Order 3 finished! All done! current order is :3:Chushi current order is :3:Fuwuyuan
四.执行器
1.执行器
a.用于启动并控制线程的执行
b.核心接口为Executor,包含一个execute(Runnable)用于指定被执行的线程
c.ExecutorService接口用于控制线程执行和管理线程
d.预定义了如下执行器:ThreadPoolExecutor/ScheduledThreadPoolExecutor/ForkJoinPool
2.Callable与Future
a.Callable<V>:表示具有返回值的线程,V:表示返回值类型
b.call():执行任务
c.Future<V>:表示Callable的返回值,V:返回值类型
d.get():获取返回值
实例:
package com.bijian.concurrent.study; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorDemo { public static void main(String[] args) throws Exception { ExecutorService es = Executors.newFixedThreadPool(2); //将Callable提交到线程池中 Future<Integer> r1 = es.submit(new MC(1,100)); Future<Integer> r2 = es.submit(new MC(100,10000)); System.out.println(r1.get() + ":" + r2.get()); es.shutdown(); } } class MC implements Callable<Integer> { private int begin, end; public MC(int begin, int end) { this.begin = begin; this.end = end; } @Override public Integer call() throws Exception { int sum = 0; for(int i=begin;i<end;i++) { sum += i; } return sum; } }
运行结果:
4950:49990050
五.锁与原子操作
1.锁
a.java.util.concurrent.lock包中提供了对锁的支持
b.为使用synchronized控制对资源访问提供了替代机制
c.基本操作模型:访问资源之前申请锁,访问完毕后释放锁
d.lock/tryLock:申请锁
e.unlock:释放锁
f.具体锁类ReentrantLock实现了Lock接口
实例:
package com.bijian.concurrent.study; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockDemo { public static void main(String[] args) { new NT().start(); new NT().start(); new NT().start(); new NT().start(); } } class Data { static int i=0; static Lock lock = new ReentrantLock(); //static synchronized void operate() { static void operate() { //操作之前申诅锁 lock.lock(); i++; System.out.println(i); //操作完毕释放锁 lock.unlock(); } } class NT extends Thread { public void run() { while(true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Data.operate(); } } }
运行结果:
1 2 3 4 5 6 7 8 ...
2.原子操作
a.java.util.concurrent.atom包中提供了对原子操作的支持
b.提供了不需要锁以及其他同步机制就可以进行的一些不可中断操作
c.主要操作为:获取、设置、比较等
实例:
package com.bijian.concurrent.study; import java.util.concurrent.atomic.AtomicInteger; public class AtomDemo { public static void main(String[] args) { new ANT().start(); new ANT().start(); new ANT().start(); new ANT().start(); } } class AData { //用原子操作代替锁的机制 static AtomicInteger ai = new AtomicInteger(0); static void operate() { System.out.println(ai.incrementAndGet()); } } class ANT extends Thread { public void run() { while(true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } AData.operate(); } } }
运行结果:
1 4 3 2 5 8 6 7 ...
六.流编程
1.流的基本知识
a.表示数据移动,移动过程中可能会对数据进行处理
b.不同于IO流,表示流对象
c.操作分为中间操作和终端操作
d.中间操作会产生一个新流
e.终端操作会消费流
2.流的编程模型
a.获取流:stream/parallelSteam(获取串行流/并行流)
b.操作:sort/max/min/...
3.流的基本操作
过滤、排序、缩减、映射、收集、迭代
实例:
package com.bijian.concurrent.study; import java.util.ArrayList; import java.util.List; import java.util.Optional; public class StreamDemo { public static void main(String[] args) { List<String> ls = new ArrayList<>(); ls.add("abc"); ls.add("def"); ls.add("ddd"); ls.add("eee"); ls.add("def"); ls.add("cha"); //max属于终端操作 Optional<String> max = ls.stream().max(String::compareTo); System.out.println("max:" + max.get());//max:eee //forEach属于终端操作,但sorted则是中间操作 ls.stream().sorted().forEach(e -> System.out.println(e)); //不重复的元属的个数 System.out.println(ls.stream().distinct().count());//5 } }
运行结果:
max:eee abc cha ddd def def eee 5
七.Fork/Join框架
1. Fork/Join框架中的主要类
a.ForkJoinTask<V>:描述任务的抽象类
b.ForkJoinPool:管理ForkJoinTask的线程池
c.RecursiveAction:ForkJoinTask子类,描述无返回值的任务
d.RecursiveTask<V>:ForkJoinTask子类,描述有返回值的任务
2.分而治之策略
a.将任务递归划分成更小的子任务,直到子任务足够小,从而能够被连续地处理掉为止
b.优势是处理过程可以使用并行发生,这种情况特别适合基于多核处理器的并行编程
c.根据Java API中定义,分而治之的建议临界点定义在100-1000个操作中的某个位置
3.Fork/Join框架案例
计算1-100000的和
package com.bijian.concurrent.study; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class ForkJoinDemo { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(); Future<Long> result = forkJoinPool.submit(new NTask(0, 1000001)); System.out.println(result.get()); forkJoinPool.shutdown(); } } class NTask extends RecursiveTask<Long> { static final int THRESHOLD = 1000; private int begin, end; public NTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Long compute() { long sum = 0; if((end - begin) <= THRESHOLD) { for(int i=begin;i<end;i++) { sum += i; } }else { int mid = (begin + end) / 2; NTask left = new NTask(begin, mid); left.fork(); NTask right = new NTask(mid + 1, end); right.fork(); Long lr = left.join(); System.out.println(begin + "-" + mid + ":" + lr); Long rr = right.join(); System.out.println(mid + "-" + end + ":" + rr); sum = lr + rr; } return sum; } }
运行结果:
499488998835
相关推荐
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
Java并发工具包是Java平台中的一个关键特性,它位于`java.util.concurrent`包下,为开发者提供了高效、安全的多线程编程支持。这个工具包的设计目标是简化并发编程,提高程序的性能和可维护性,同时避免了线程同步的...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
Java并发工具包(J.U.C)是Java编程语言中用于并发编程的一系列工具包的统称,它包含了一系列方便实现多线程编程的类和接口,使得开发者可以更加方便地编写高效、线程安全的程序。本文将深入浅出地探讨J.U.C的原理和...
Java并发工具包是Java平台中一个非常重要的组成部分,它为开发者提供了高级的并发和多线程编程工具,极大地简化了并发编程的复杂性。在Java 5及更高版本中,Java并发工具包(java.util.concurrent)引入了一系列新的...
Java并发工具包(java.util.concurrent)是Java平台上用于高效、安全地处理多线程编程的重要组件。这个包包含了丰富的并发工具类,旨在帮助开发者构建高度并发的程序,提高程序的性能和可伸缩性。本资源是该工具包的...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
Java并发工具包(Java Concurrency Utilities,简称J.U.C)是Java编程语言中的一个核心组件,它提供了丰富的类和接口,用于高效地处理多线程环境中的并发问题。这个工具包在Java 5.0版本中引入,极大地提升了开发者...
Java并发工具包java.util.concurrent是Java平台在Java 5版本中引入的一组新的并发编程类库,旨在帮助Java开发者更容易地实现复杂的并发程序。这一包的出现,极大地简化了开发者在处理线程和数据同步时所遇到的难题,...
Java并发工具包对并发编程的优化.pdf
### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...
阻塞队列(BlockingQueue)是Java并发工具包中的一个重要组成部分,它能够保证在队列为空时取元素的操作会等待队列变为非空,而在队列满时插入元素的操作会等待队列中有空余空间。常见的阻塞队列实现有...
Java开发工具包(Java Development Kit,简称JDK)是Java编程语言的核心组件,它为开发者提供了编译、调试和运行Java应用程序所需的所有工具。JDK1.8.0_66是Oracle公司发布的一个特定版本,它包含了Java运行时环境...
并发集合(Concurrent Collections):Java并发工具包提供了一套特殊的集合类,如ConcurrentHashMap、CopyOnWriteArrayList等,它们在内部实现了高效线程安全的并发操作。 线程池(Thread Pool):ExecutorService...
Java并发工具包(java.util.concurrent)是Java编程中不可或缺的一部分,它为多线程环境提供了高效、安全且易用的工具。这个包包含了各种类和接口,帮助开发者编写高效的并发程序,避免了直接操作线程所带来的复杂性...
5. **Java并发工具包(JUC)**:包括原子类、并发容器(如`ConcurrentHashMap`)、阻塞队列(如`ArrayBlockingQueue`)以及锁机制。 6. **Lock和AQS**:Lock接口提供了比`synchronized`更灵活的锁机制,而...
`CountDownlatch`和`Barrier`是Java并发工具包中用于协调多个线程完成特定任务的工具。`CountDownlatch`允许一个或多个线程等待其他线程完成一系列操作,而`Barrier`则确保一组线程在到达某个点之前不会继续执行。 ...