`

java并发工具包

阅读更多

一.并发工具概述

        1.传统的多线程并没有提供高级特性,例如:信号量、线程池和执行管理器等,而这些特性恰恰有助于创建强大的并发程序。

        2.新的Fork/Join框架针对当前的多核系统,也提供了并行编程的可行性。

        3.并发工具包处理于java.util.concurrent包,主要包括同步器、执行顺、并发集合、Fork/Join框架、atomic包、locks包。

        4.同步器:为每种特定的同步问题提供了解决方案

        5.执行器:用来管理线程的执行

        6.并发集合:提供了集合框架中集合的并发版本

        7.Fork/Join框架:提供了对并行编程的支持

        8.atomic包:提供了不需要锁即可完成并发环境变量使用的原子性操作

        9.locks包:使用Lock接口为并发编程提供了同步的另外一种替代方案

 

二.同步器-Semaphore和CountDownLatch

1.Semaphore同步器

        a.经典的信号量,通过计数器控制对共享资源的访问。

        b.Semaphore(int count):创建拥有count个许可证的信号量

        c.acquire()/acquire(int num):获取1/num个许可证

        d.release()/release(int num):释放1/num个许可证

package com.bijian.concurrent.study;

import java.util.concurrent.Semaphore;

/**
 * 银行营业部有两个柜台给三个人提供服务
 * @author bijian
 */
public class SemaphoreDemo {

	public static void main(String[] args) {
		//最多允许多少个并发线程来进入这个区域
		Semaphore semaphore = new Semaphore(2);
		
		Person p1 = new Person(semaphore, "P1");
		p1.start();
		
		Person p2 = new Person(semaphore, "P2");
		p2.start();
		
		Person p3 = new Person(semaphore, "P3");
		p3.start();
	}
}

class Person extends Thread {
	
	private Semaphore semaphore;
	
	public Person(Semaphore semaphore, String name) {
		setName(name);
		this.semaphore = semaphore;
	}
	
	public void run() {
		System.out.println(getName() + " is waiting...");
		try {
			//每个线程过来,首先要获取许可证
			semaphore.acquire();
			System.out.println(getName() + " is servicing...");
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(getName() + " is done!");
		//操作结束,要释放许可证
		semaphore.release();
	}
}

运行结果:

P1 is waiting...
P3 is waiting...
P2 is waiting...
P1 is servicing...
P3 is servicing...
P1 is done!
P3 is done!
P2 is servicing...
P2 is done!

2.CountDownLatch同步器

        a.必须发生指定数量的事件后才可以继续运行,如赛跑比赛的倒计时后开始

        b.CountDownLatch(int count):必须发生count个数量才可以打开锁存器

        c.await():等待锁存器

        d.countDown():触发事件

package com.bijian.concurrent.study;

import java.util.concurrent.CountDownLatch;

/**
 * 赛跑比赛倒计时
 * @author bijian
 *
 */
public class CountDownLatchDemo {

	public static void main(String[] args) {
		//创建一个需要多少个事件发生才可以指定线程执行的计数器,这里的3表示三个事件发生才可以执行
		CountDownLatch countDownLatch = new CountDownLatch(3);
		
		new Racer(countDownLatch, "A").start();
		new Racer(countDownLatch, "A").start();
		new Racer(countDownLatch, "A").start();
		
		for(int i=0;i<3;i++) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(3 - i);
			countDownLatch.countDown();
			if(i == 2) {
				System.out.println("Start");
			}
		}
	}
}

class Racer extends Thread {
	
	private CountDownLatch countDownLatch;
	
	public Racer(CountDownLatch countDownLatch, String name) {
		setName(name);
		this.countDownLatch = countDownLatch;
	}
	
	public void run() {
		try {
			countDownLatch.await();
			for(int i=0;i<3;i++) {
				System.out.println(getName() + " : " + i);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}
}

运行结果:

3
2
1
Start
C : 0
A : 0
B : 0
A : 1
C : 1
A : 2
B : 1
C : 2
B : 2

 

三.同步器-CylicBarrier、Exchanger和Phaser

1.CylicBarrier同步器

        a.适用于只有多个线程都到达预定点时才可以继续执行。

        b.CyclicBarrier(int num):等待线程的数量

        c.CyclicBarrier(int num, Runnable action):等待线程的数量以及所有线程到达后的操作

        d.await():到达临界点后暂停线程

package com.bijian.concurrent.study;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 模拟斗地主
 * @author bijian
 */
public class CyclicBarrierDemo {

	public static void main(String[] args) {
		//斗地主需要三个人,所以这里为3
		CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
			
			//主线程一旦通过循环屏障,就可以执行某个动作,如通过Runnable实现的动作
			@Override
			public void run() {
				System.out.println("Game start");
			}
		});
		new Player(cyclicBarrier, "A").start();
		new Player(cyclicBarrier, "B").start();
		new Player(cyclicBarrier, "C").start();
	}
}

class Player extends Thread {
	
	private CyclicBarrier cyclicBarrier;
	
	public Player(CyclicBarrier cyclicBarrier, String name) {
		setName(name);
		this.cyclicBarrier = cyclicBarrier;
	}
	
	public void run() {
		System.out.println(getName() + " is waiting other players...");
		try {
			//每个线程在循环屏障处等待
			cyclicBarrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

运行结果:

C is waiting other players...
B is waiting other players...
A is waiting other players...
Game start

2.Exchanger同步器

        a.简化两个线程间数据的交换

        b.Exchanger<V>:指定进行交换的数据类型

        c.V exchange(V object):等待线程到达,交换数据

package com.bijian.concurrent.study;

import java.util.concurrent.Exchanger;

public class ExchangerDemo {

	public static void main(String[] args) {
		Exchanger<String> ex = new Exchanger<String>();
		new A(ex).start();
		new B(ex).start();
	}
}

class A extends Thread {
	
	private Exchanger<String> ex;
	
	public A(Exchanger<String> ex) {
		this.ex = ex;
	}
	
	public void run() {
		String str = null;
		try {
			str = ex.exchange("Hello");
			System.out.println(str);
			
			str = ex.exchange("A");
			System.out.println(str);
			
			str = ex.exchange("B");
			System.out.println(str);
		}catch(InterruptedException e) {
			e.printStackTrace();
		}
	}
}

class B extends Thread {
	
	private Exchanger<String> ex;
	
	public B(Exchanger<String> ex) {
		this.ex = ex;
	}
	
	public void run() {
		String str = null;
		try {
			str = ex.exchange("Hi!");
			System.out.println(str);
			
			str = ex.exchange("1");
			System.out.println(str);
			
			str = ex.exchange("2");
			System.out.println(str);
		}catch(InterruptedException e) {
			e.printStackTrace();
		}
	}
}

运行结果:

Hello
Hi!
1
A
B
2

3.Phaser同步器

        a.工作方式与CyclicBarrier类似,但是可以定义多个阶段

        b.Phaser()/Phaser(int num):使用指定0/num个party创建Phaser

        c.register():注册party

        d.arriveAndAdvance():到达时等待到所有party到达

        e.arriveAndDeregister():到达时注销线程自已

package com.bijian.concurrent.study;

import java.util.concurrent.Phaser;

public class PhaserDemo {

	public static void main(String[] args) {
		
		Phaser phaser = new Phaser();
		
		System.out.println("starting ...");
		
		//在Worker中只是执行、等待
		new Worker(phaser, "Fuwuyuan").start();
		new Worker(phaser, "Chushi").start();
		new Worker(phaser, "Shangcaiyuan").start();
		
		//表示一个有三个订单,对于每一个订单,都需要所有人处理完毕后,才能继续执行
		for(int i=1; i<=3; i++) {
			phaser.arriveAndAwaitAdvance();//自已处理完了,等待其它线程处理完才能继续进行
			System.out.println("Order " + i + " finished!");
		}
		
		//所有订单执行完毕后,解除所有注册的线程
		phaser.arriveAndDeregister();
		System.out.println("All done!");
	}
}

class Worker extends Thread {
	
	private Phaser phaser;
	
	public Worker(Phaser phaser, String name) {
		this.setName(name);
		this.phaser = phaser;
		//把当前线程注册到phaser中
		phaser.register();
	}
	
	public void run() {
		for(int i=1;i<= 3;i++) {
			System.out.println("current order is :" + i + ":" + getName());
			if(i == 3) {
				//如果三个订单都处理完成,则解除注销
				phaser.arriveAndDeregister();
			}else {
				//如果还有其它订单未处理完,则等待其它订单处理完毕
				phaser.arriveAndAwaitAdvance();
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

运行结果:

starting ...
current order is :1:Fuwuyuan
current order is :1:Shangcaiyuan
current order is :1:Chushi
Order 1 finished!
current order is :2:Shangcaiyuan
Order 2 finished!
current order is :2:Fuwuyuan
current order is :2:Chushi
current order is :3:Shangcaiyuan
Order 3 finished!
All done!
current order is :3:Chushi
current order is :3:Fuwuyuan

 

四.执行器

1.执行器

        a.用于启动并控制线程的执行

        b.核心接口为Executor,包含一个execute(Runnable)用于指定被执行的线程

        c.ExecutorService接口用于控制线程执行和管理线程

        d.预定义了如下执行器:ThreadPoolExecutor/ScheduledThreadPoolExecutor/ForkJoinPool

2.Callable与Future

        a.Callable<V>:表示具有返回值的线程,V:表示返回值类型

        b.call():执行任务

        c.Future<V>:表示Callable的返回值,V:返回值类型

        d.get():获取返回值

实例:

package com.bijian.concurrent.study;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {

	public static void main(String[] args) throws Exception {
		ExecutorService es = Executors.newFixedThreadPool(2);
		
		//将Callable提交到线程池中
		Future<Integer> r1 = es.submit(new MC(1,100));
		Future<Integer> r2 = es.submit(new MC(100,10000));
		
		System.out.println(r1.get() + ":" + r2.get());
		
		es.shutdown();
	}
}

class MC implements Callable<Integer> {

	private int begin, end;
	
	public MC(int begin, int end) {
		this.begin = begin;
		this.end = end;
	}
	
	@Override
	public Integer call() throws Exception {
		int sum = 0;
		for(int i=begin;i<end;i++) {
			sum += i;
		}
		return sum;
	}
}

运行结果:

4950:49990050

 

五.锁与原子操作

1.锁

        a.java.util.concurrent.lock包中提供了对锁的支持

        b.为使用synchronized控制对资源访问提供了替代机制

        c.基本操作模型:访问资源之前申请锁,访问完毕后释放锁

        d.lock/tryLock:申请锁

        e.unlock:释放锁

        f.具体锁类ReentrantLock实现了Lock接口

实例:

package com.bijian.concurrent.study;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {

	public static void main(String[] args) {
		
		new NT().start();
		new NT().start();
		new NT().start();
		new NT().start();
	}
}

class Data {
	static int i=0;
	static Lock lock = new ReentrantLock();
	//static synchronized void operate() {
	static void operate() {
		//操作之前申诅锁
		lock.lock();
		i++;
		System.out.println(i);
		//操作完毕释放锁
		lock.unlock();
	}
}

class NT extends Thread {
	public void run() {
		while(true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			Data.operate();
		}
	}
}

运行结果:

1
2
3
4
5
6
7
8
...

2.原子操作

        a.java.util.concurrent.atom包中提供了对原子操作的支持

        b.提供了不需要锁以及其他同步机制就可以进行的一些不可中断操作

        c.主要操作为:获取、设置、比较等

实例:

package com.bijian.concurrent.study;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomDemo {

	public static void main(String[] args) {
		
		new ANT().start();
		new ANT().start();
		new ANT().start();
		new ANT().start();
	}
}

class AData {
	//用原子操作代替锁的机制
	static AtomicInteger ai = new AtomicInteger(0);
	
	static void operate() {
		System.out.println(ai.incrementAndGet());
	}
}

class ANT extends Thread {
	public void run() {
		while(true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			AData.operate();
		}
	}
}

运行结果:

1
4
3
2
5
8
6
7
...

 

六.流编程

1.流的基本知识

        a.表示数据移动,移动过程中可能会对数据进行处理

        b.不同于IO流,表示流对象

        c.操作分为中间操作和终端操作

        d.中间操作会产生一个新流

        e.终端操作会消费流

2.流的编程模型

        a.获取流:stream/parallelSteam(获取串行流/并行流)

        b.操作:sort/max/min/...

3.流的基本操作

        过滤、排序、缩减、映射、收集、迭代

实例:

package com.bijian.concurrent.study;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class StreamDemo {

	public static void main(String[] args) {
		List<String> ls = new ArrayList<>();
		ls.add("abc");
		ls.add("def");
		ls.add("ddd");
		ls.add("eee");
		ls.add("def");
		ls.add("cha");
		
		//max属于终端操作
		Optional<String> max = ls.stream().max(String::compareTo);
		System.out.println("max:" + max.get());//max:eee
		
		//forEach属于终端操作,但sorted则是中间操作
		ls.stream().sorted().forEach(e -> System.out.println(e));
		
		//不重复的元属的个数
		System.out.println(ls.stream().distinct().count());//5
	}
}

运行结果:

max:eee
abc
cha
ddd
def
def
eee
5

 

七.Fork/Join框架

1. Fork/Join框架中的主要类

        a.ForkJoinTask<V>:描述任务的抽象类

        b.ForkJoinPool:管理ForkJoinTask的线程池

        c.RecursiveAction:ForkJoinTask子类,描述无返回值的任务

        d.RecursiveTask<V>:ForkJoinTask子类,描述有返回值的任务

 

2.分而治之策略

        a.将任务递归划分成更小的子任务,直到子任务足够小,从而能够被连续地处理掉为止

        b.优势是处理过程可以使用并行发生,这种情况特别适合基于多核处理器的并行编程

        c.根据Java API中定义,分而治之的建议临界点定义在100-1000个操作中的某个位置

 

3.Fork/Join框架案例

        计算1-100000的和

package com.bijian.concurrent.study;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo {

	public static void main(String[] args) throws Exception {
		
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		
		Future<Long> result = forkJoinPool.submit(new NTask(0, 1000001));
		System.out.println(result.get());
		
		forkJoinPool.shutdown();
	}
}

class NTask extends RecursiveTask<Long> {

	static final int THRESHOLD = 1000;
	
	private int begin, end;
	
	public NTask(int begin, int end) {
		this.begin = begin;
		this.end = end;
	}
	
	@Override
	protected Long compute() {
		
		long sum = 0;
		if((end - begin) <= THRESHOLD) {
			for(int i=begin;i<end;i++) {
				sum += i;
			}
		}else {
			int mid = (begin + end) / 2;
			NTask left = new NTask(begin, mid);
			left.fork();
			NTask right = new NTask(mid + 1, end);
			right.fork();
			
			Long lr = left.join();
			System.out.println(begin + "-" + mid + ":" + lr);
			Long rr = right.join();
			System.out.println(mid + "-" + end + ":" + rr);
			
			sum = lr + rr;
		}
		return sum;
	}
}

运行结果:

499488998835

 

视频学习地址:http://www.jikexueyuan.com/course/2091.html

分享到:
评论

相关推荐

    java并发工具包详解

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发工具包

    Java并发工具包是Java平台中的一个关键特性,它位于`java.util.concurrent`包下,为开发者提供了高效、安全的多线程编程支持。这个工具包的设计目标是简化并发编程,提高程序的性能和可维护性,同时避免了线程同步的...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    深入浅出_Java并发工具包原理讲解

    Java并发工具包(J.U.C)是Java编程语言中用于并发编程的一系列工具包的统称,它包含了一系列方便实现多线程编程的类和接口,使得开发者可以更加方便地编写高效、线程安全的程序。本文将深入浅出地探讨J.U.C的原理和...

    Java并发工具包对并发编程的优化.zip

    Java并发工具包是Java平台中一个非常重要的组成部分,它为开发者提供了高级的并发和多线程编程工具,极大地简化了并发编程的复杂性。在Java 5及更高版本中,Java并发工具包(java.util.concurrent)引入了一系列新的...

    java并发工具包 java.util.concurrent中文版-带书签版

    Java并发工具包(java.util.concurrent)是Java平台上用于高效、安全地处理多线程编程的重要组件。这个包包含了丰富的并发工具类,旨在帮助开发者构建高度并发的程序,提高程序的性能和可伸缩性。本资源是该工具包的...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java_Concurrent_java并发工具包.pdf

    Java并发工具包(Java Concurrency Utilities,简称J.U.C)是Java编程语言中的一个核心组件,它提供了丰富的类和接口,用于高效地处理多线程环境中的并发问题。这个工具包在Java 5.0版本中引入,极大地提升了开发者...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版pdf

    Java并发工具包java.util.concurrent是Java平台在Java 5版本中引入的一组新的并发编程类库,旨在帮助Java开发者更容易地实现复杂的并发程序。这一包的出现,极大地简化了开发者在处理线程和数据同步时所遇到的难题,...

    Java并发工具包对并发编程的优化.pdf

    Java并发工具包对并发编程的优化.pdf

    java并发工具包 java.util.concurrent中文版pdf

    ### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...

    Java并发编程实践.pdf

    阻塞队列(BlockingQueue)是Java并发工具包中的一个重要组成部分,它能够保证在队列为空时取元素的操作会等待队列变为非空,而在队列满时插入元素的操作会等待队列中有空余空间。常见的阻塞队列实现有...

    Java开发工具包

    Java开发工具包(Java Development Kit,简称JDK)是Java编程语言的核心组件,它为开发者提供了编译、调试和运行Java应用程序所需的所有工具。JDK1.8.0_66是Oracle公司发布的一个特定版本,它包含了Java运行时环境...

    Java并发编程(一)

    并发集合(Concurrent Collections):Java并发工具包提供了一套特殊的集合类,如ConcurrentHashMap、CopyOnWriteArrayList等,它们在内部实现了高效线程安全的并发操作。 线程池(Thread Pool):ExecutorService...

    java_util_concurrent_user_guide并发工具包详解

    Java并发工具包(java.util.concurrent)是Java编程中不可或缺的一部分,它为多线程环境提供了高效、安全且易用的工具。这个包包含了各种类和接口,帮助开发者编写高效的并发程序,避免了直接操作线程所带来的复杂性...

    Java-并发(Concurrent)编程

    5. **Java并发工具包(JUC)**:包括原子类、并发容器(如`ConcurrentHashMap`)、阻塞队列(如`ArrayBlockingQueue`)以及锁机制。 6. **Lock和AQS**:Lock接口提供了比`synchronized`更灵活的锁机制,而...

    Java并发程序设计教程.pdf

    `CountDownlatch`和`Barrier`是Java并发工具包中用于协调多个线程完成特定任务的工具。`CountDownlatch`允许一个或多个线程等待其他线程完成一系列操作,而`Barrier`则确保一组线程在到达某个点之前不会继续执行。 ...

Global site tag (gtag.js) - Google Analytics