- 浏览: 29066 次
- 性别:
- 来自: 北京
最新评论
-
cin_ie:
通俗易懂,此文甚好。
JAVA设计模式--建造者模式 -
asleep886:
...
JAVA设计模式--建造者模式 -
asleep886:
JAVA设计模式--建造者模式
一:java并发的开篇
1、在实际应用中,经常用到线程的并发,那为什么需要用到并发呢,不能独自单独的程序处理吗,那很明确的说,多条线程做完成一件事情和一条线程去完成一件事情,那是无法同言而语的。比如在实际的网站访问过程中,世界各地的人都去同事访问一个网站,在服务器端,如果收到一个请求就对它先处理,而其他用户的请求到达只是对 他们先存储,然后在对他们进行处理,可想而知这样的速度会有多慢,如果一天之类用千万级的用户访问,那样的速度无法想象。在现实生活中这样的例子随处可见,做 一件事情,一个人去做和多个人同事做可想这样的速度是多么快,只是在做的过程当中对与大家共用的东西应当控制并发访问问题。
2、同步:当线程并发的时候,随之而来的也带来了一些问题,如果多条线程去同时操作共享而用的数据那将是会使共享的数据很容易就出现错误。在JAVA中提供同防止多条线程同时共享数据的方式是:synchronized,volatile很好的利用这两个关键字就能防止并发而带来的问题,同时在JAVA1.5之后也提供了更多好用的类来解决这个问题
3、线程池:当有许多请求需要去处理的时候,如果只是单独的一个人去处理,可想而知那会让后面在排队的人等多久,这样就需要线程池,有请求过来了就到线程池里 面取出一条线程去处理它,处理完成就把它收回到线程池里面,然而自己实现 一个功能强大的线程池也并非易事,在java1.5之后专门提供了线程池的类库
二、java.util.concurrent包中
1、java.util.concurrent.Executors类
通过这个类可以创建不同的线程池
1.1、创建固定大小的线程池
ExecutorService service = Executors.newFixedThreadPool(3);//创建一个大小为3的线程池
//通过线程池service,可以给线程池进行提交任务等操作
ExecutorService的方法
*提交任务
四个方法:
Submit(Callable<T> task);
submit(Runnable task)
submit(Runnable task, T result)
execute()
说明:前三个方法都是带有返回值的执行任务,最后一个方法只是单纯的执行任务,方法的返回值是Future类型的,它是支持泛型的,返回值要和泛型的类型一致
Future<String> future = service.submit(new Callable<String>() {
Public String call() throws Exception() {
Return “test”;
}
});
//得到返回值
String str = future.get();
当使用submit(Runnable task)的时候,里面执行的是run方法是没有返回值的,
得到的future.get()得到的是Null值,表示线程正常执行完成
线程执行完毕关闭线程池里拥有的线程:
两个方法:
Shutdown():调用的时候会先完成当前正在执行和已经提交的任务
shutdownNow():结束当前正在执行的任务和等待的任务也不会执行,返回待执行
任务列表
1.2、创建可变大小线程池
ExecutorService service = Executors. newCachedThreadPool();
这种方式返回的是一个可变的线程池,只要有新的任务来到,如果没有可用的线程存在就新建一个线程
1.3、创建一个定时器样式的线程池:在单位时间内执行任务
1.3.1、定时器:
public static void main5() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生爆炸");
service.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("蹦............");
return "success";
}
},
5,
TimeUnit.SECONDS);
}
1.3.2:每五秒钟发生一次爆炸
public static void main7() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生连环爆炸");
service.scheduleWithFixedDelay((new Runnable() {
@Override
public void run() {
System.out.println("蹦...........");
}
}), 5, 1, TimeUnit.SECONDS);
}
scheduleWithFixedDelay或者scheduleAtFixedRate方法都是能够产生循环定时器,只是实现方式不一样
说明:ScheduledExecutorService接口实现了ExecuorService所以它一样拥有ExecutoService的功能
1.4:创建单线程的线程池
ExecutorService service = Executors.newSingleThreadExecutor();
2、部分ExecuorService接口方法说明:
invokeAll(Collection<? extends Callable<T>> tasks):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们。返回所有任务执行完成后的future集合
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,当任务都完成或者是超时时,返回所有任务的状态future集合
invokeAny(Collection<? extends Callable<T>> tasks): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,每执行完成一个线程就返回一个future值
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,在指定时间内完成了一个任务就返回Future结果
3、对线程任务完成队列的合理处理类ExecutorCompletionService
利用线程池能很好的执行任务,并且得到任务的执行返回结果,当然可以自己对所有处理的结果进行处理,但是java.util.concurrent提供了一个很简单的类给我们用就是
ExecutorCompletionService 用它来执行任务,所有任务的返回结果都将放置在一个队列上,之后可以通过队列取得任务的执行结果,根据任务执行结果做相应的操作,同时也很方便的对所有的执行结果统一的处理
public static void main8() {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<Integer> cservice = new ExecutorCompletionService<Integer>(service);
Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
for(int i=0; i<10; i++) {
final int squence = i;
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for(int j=1; j<5; j++) { System.out.println(Thread.currentThread().getName() + " task " + squence + " time " + j);
}
return squence;
}
});
}
for(Callable<Integer> task:tasks) {
cservice.submit(task);
}
//统一处理任务结果
int result = 0;
for(int i=0; i<tasks.size(); i++) {
result += cservice.take().get();
}
System.out.println(result);
可知任务完成的时候,处理结果会存放在调用take()方法所获取的队列上面,方便统一处理结果
三、java.util.concurrent.locks
3.1、锁是用来防止线程并发而带来的线程安全问题,在访问线程共享数据的时候,操作它或者读取它的时候,如果不希望别的线程也同时对它就行读取,那应该给它上锁,这样只要一个线程进去了对他上锁了,别的线程就无法在进入了。有时候线程并发不是就是单一并发访问问题,并发访问同时可能也会导致数据不一致
public class LockTest {
public static void main(String[] args) {
final Service s = new Service ();
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<10; i++) {
service.execute(new Runnable() {
public void run() {
s.service();
}
});
}
}
static class Service {
private int count = 0;
Lock lock = new ReentrantLock();
public void service() {
++ count;
try {
Thread.sleep(200l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service :" + count + " time");
}
}
}
对与以上程序,我们希望打印的是
service : 1 time
service : 2 time
……..
Service: 10 time
实际情况确实在多线程的环境里打印的结果确实
Service:10 time
Service:10 tiem
…..
Service :10 time
所以在调用service方法操作count的时候,应该上锁,service上锁后方法如下:
public void service() {
try {
lock.lock();//加锁
++count;
Thread.sleep(200l);
System.out.println("service :" + count + " time");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();//释放锁
}
}
当然在JAVA里面同时还可以使用synchronized关键字来防止同步问题
3.2、读写锁,在JDK5.0以前是没用线程并发库的,如果用synchronized确实能实现同步问题,但是比如对与一个类的属性,希望的效果是在读取它的时候能多线程同时读,在写它的时候不能读,在读它的时候不能写。这样如果只是单单在方法上面加synchronized关键字,它会使所有的线程互斥,不能说多个线程能同时读取对象的值,虽然自己编写代码实现,但是JAVA5之后线程并发库已经给了解决方案,那就是读写锁,如下例子:
public class LockTest {
public static void main(String[] args) {
final ReadWriteLockT s = new ReadWriteLockT();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 4; i++) {
service.execute(new Runnable() {
public void run() {
while(true) {
s.getX();
}
}
});
service.execute(new Runnable() {
public void run() {
while(true) {
s.setX(new Random().nextInt());
}
}
});
}
}
static class ReadWriteLockT{
private int x = 0;
private ReadWriteLock rock = new ReentrantReadWriteLock();
public void setX(int x) {
rock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ".........进入写读锁");
this.x = x;
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("写完毕");
rock.writeLock().unlock();
}
public int getX() {
rock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "----------进入读读锁:");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x);
System.out.println("读完毕");
rock.readLock().unlock();
return x;
}
}
}
说明:从打印的结果中可以看出,在多条线程操作的过程中,在读的过程中别的线程是无法调用写的方法的,在写的过程中也是无法调用读的方法的,只有在读的时候才能同时调用读的方法,当然在写的时候不能同时调用写的方法
3.3、condition的使用
Lock锁得机制是在synchronized关键字上面进行了一些功能的提升,那么condition是在wait(),notify(),notifyall()上面做了对应的提升。对与唤醒线程,是线程等待它能做到更好的实现,即时大部分其实它是更加以一种优雅的形式去表现wait(),notify(),notifyall()的一种实现。其实在某种意义上说,之前的wait这样的方法只是针对一个对象的监视器,然而现在对功能的扩展,就好比能分解成多个监视器在监视一个对象一般,同时每个监视器和对应的锁形成 的组合,能更加有序的对线程进行唤醒和等待。下面结合JDK API帮助文档里的例子谈谈它们之间的区别
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
说明:如上面的例子,线程不断的调用put 和 take方法向队列items里面放入数据,放蛮的情况下就无法在放,唤醒拿的线程。 在拿的时候如果为空唤醒对应的放的线程,协调工作。
在这里如果是之前的wait,notify这样的方式的话,因为程序是有多条put和多条take的线程同时并发访问,wati,notify这样的方式只能唤醒一天线程,比如take发现是空的,就得put了,但同时也有好多其它的take线程在等待锁,所以这时候也有可能锁再一次给了take线程,这样的话显然不是自己的意愿,要的效果是让put线程拿到锁,而condition就做到了这样,上面例子可看出,只要take是空的,就唤醒一条put线程,相反只要put时是满的,就唤醒一条take线程
四、java.util.concurrent.atomic类讲解
类摘要
AtomicBoolean
可以用原子方式更新的 boolean 值。
AtomicInteger
可以用原子方式更新的 int 值。
AtomicIntegerArray
可以用原子方式更新其元素的 int 数组。
AtomicIntegerFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。
AtomicLong
可以用原子方式更新的 long 值。
AtomicLongArray
可以用原子方式更新其元素的 long 数组。
AtomicLongFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
AtomicMarkableReference<V>
AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicReference<V>
可以用原子方式更新的对象引用。
AtomicReferenceArray<E>
可以用原子方式更新其元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V>
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
AtomicStampedReference<V>
AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
讲解:从上面的类中可以看出,它对基本的一些数值的操作做了线程同步的安全,有时候可能对一个数的加,多线线程同时访问加就会出现问题,甚至平时一般的i++,多个线程同事访问都会造成数据混乱。所以对多线程共享时候加一般用它来实现更加好,不过在java1.5之前可以使用volatile关键字来解决问题,然而现在有了这套类库就业可以避免每次为变量申明volatile关键字了。其实这些内部源码都也是基于volatitle的实现。也可以使用synchorzied关键字同步方法,但是用基于volatile的方式有更好的效率
但是volatile关键字其实还有线程安全问题,就是当多个线程对变量进行操作的时候还是可能会造成变量的值不准确性。 然而用这些类却解决了这个问题
用一个例子说明:
Class MyDate {
Private int volatile x;
Public void opera() {
X ++;
}
}
上面的类,当有多个线程同时执行opera的时候,虽然x已经用了volatile关键字同步,但是未能防止x的值出错,虽然出错的概率小
五java.util.concurrent.同步集合类的讲解
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
集合类说明:传统的集合类中,在多线程共享的环境中会出现错误,所以使用这里的安全集合类就不会出现错误了。
六java.util.concurrent.队列的讲解
所有已知子接口:
BlockingDeque<E>
所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
这些队列在线程并发中是安全的,也有阻塞队列,阻塞队列实际应用比较多,所以在这也给一个实现。
阻塞队列:在放的时候要有空为才能放,要不然执行等待,在拿的时候要有数据才能执行拿,要不然执行等待。
在前面例子中,用condition方式实现了类似的功能,用这里的阻塞队列将会发现更加方便。
例子实现:
package com.moom;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingDequeTest {
public static void main(String[] args) {
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(6);//创建一个只能容纳6个容量的阻塞队列
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<3; i++) {
//生产三条拿的线程
service.execute(new Runnable() {
public void run() {
try {
while(true) {//不停的拿
Thread.sleep(200l);
System.out.println(Thread.currentThread().getName() + "即将拿");
String s = queue.take();
System.out.println(Thread.currentThread().getName() + "拿到" + s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while(true) { //不停的放
try {
System.out.println(Thread.currentThread().getName() + "即将放");
String s = String.valueOf(new Random().nextInt(10000));
queue.put(s);
System.out.println(Thread.currentThread().getName() + "放入了" + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
七java.util.concurrent.剩余部分类的讲解
5.1、CyclicBarrier,有些多线程需要在多条线程完成了相应的事情之后然后就继续去完成后续的事情,每次都是这样,就是说当只有一条线程完成了对应的事情时,他是无法继续向前运行的,只有等待。
例子:
package com.moom;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(5);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
// 每条线程要执行的任务
System.out.println("线程" + Thread.currentThread().getName()
+ "将执行完任务");
try {
barrier.await();// 线程执行完就等待,要五条线程都执行完才能做后续操作
// 执行后续任务
System.out.println("线程" + Thread.currentThread().getName() + "后续任务执行完毕");
barrier.await();//等待所有线程执行完后续任务
//执行最后任务,随意执行不等待,结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
注释:每次开启的线程数量应当和CyclicBarrier对象的线程数一样,比如上面例子是开启5,然而如果开六条线程的话会发觉程序先前5条线程正常执行下去,第六条执行完先前任务,一直阻塞等待着其它线程的到来。等于就是五条五条线程为一组。
5.2、Semaphore,对与有些多线程共享的数据,并不是说一条线程访问的时候其它线程就不能访问了,是规定只有多少条线程可以访问,好比一个房间里面只能容纳多少个人,只要没满的时候其它的线程就可以进去
例子:
package com.moom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore se = new Semaphore(5);//创建同时可以五个线程访问控制器
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++) {
service.execute(new Runnable() {
public void run() {
try {
se.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "进来了开始执行任务");
Thread.sleep(100l);
System.out.println("线程" + Thread.currentThread().getName() + "将要执行完毕");
se.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
每次最多只会有5条线程在执行,没条线程执行完毕就释放资源
5.3、CountDownLatch有的多线程应用场景,一个线程唤醒多条线程,多条线程执行完毕有去唤醒这一条线程,就这个循环的唤醒或者单独的唤醒运用用CountDownLatch就比较方便的实现
例子场景:在各类运动比赛当中,裁判线程监视这所有的参赛人线程,参赛人也会监视裁判线程,当裁判说开始参赛人线程开始,参赛人比赛完成,裁判在做回应
实现:
package com.moom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch judje = new CountDownLatch(1);//产生一个裁判线程用
final CountDownLatch members = new CountDownLatch(5);//5个队员
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10l);
System.out.println(Thread.currentThread().getName() + "队员等待下命令");
judje.await();//等待裁判说开始
System.out.println(Thread.currentThread().getName() + "队员开始了");
//执行逻辑
members.countDown();//执行完成
System.out.println(Thread.currentThread().getName() + "结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
System.out.println(Thread.currentThread().getName() + "裁判即将通知开始");
try {
Thread.sleep(2000l);
System.out.println("开始");
judje.countDown();
members.await();//等待所有队员执行完
Thread.sleep(1000l);
System.out.println("裁判统计结果");
service.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.4、Exchanger有些线程同步需要互相交换数据,比如两天线程执行完毕之后需要交换完毕的数据用exchanger的可以方便的实现
例子实现:
package com.moom;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> ex = new Exchanger<String>();
final Exchanger<String> ex2 = new Exchanger<String>();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(100l);
String da1 = "aaaa";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(100l);
String da1 = "bbbbb";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、FAQ
6.1、为什么使用volatile更有效率
同步的代价, 主要由其覆盖范围决定, 如果可以降低同步的覆盖范围, 则可以大幅提升程序性能.而volatile的覆盖范围仅仅变量级别的. 因此它的同步代价很低.
1、在实际应用中,经常用到线程的并发,那为什么需要用到并发呢,不能独自单独的程序处理吗,那很明确的说,多条线程做完成一件事情和一条线程去完成一件事情,那是无法同言而语的。比如在实际的网站访问过程中,世界各地的人都去同事访问一个网站,在服务器端,如果收到一个请求就对它先处理,而其他用户的请求到达只是对 他们先存储,然后在对他们进行处理,可想而知这样的速度会有多慢,如果一天之类用千万级的用户访问,那样的速度无法想象。在现实生活中这样的例子随处可见,做 一件事情,一个人去做和多个人同事做可想这样的速度是多么快,只是在做的过程当中对与大家共用的东西应当控制并发访问问题。
2、同步:当线程并发的时候,随之而来的也带来了一些问题,如果多条线程去同时操作共享而用的数据那将是会使共享的数据很容易就出现错误。在JAVA中提供同防止多条线程同时共享数据的方式是:synchronized,volatile很好的利用这两个关键字就能防止并发而带来的问题,同时在JAVA1.5之后也提供了更多好用的类来解决这个问题
3、线程池:当有许多请求需要去处理的时候,如果只是单独的一个人去处理,可想而知那会让后面在排队的人等多久,这样就需要线程池,有请求过来了就到线程池里 面取出一条线程去处理它,处理完成就把它收回到线程池里面,然而自己实现 一个功能强大的线程池也并非易事,在java1.5之后专门提供了线程池的类库
二、java.util.concurrent包中
1、java.util.concurrent.Executors类
通过这个类可以创建不同的线程池
1.1、创建固定大小的线程池
ExecutorService service = Executors.newFixedThreadPool(3);//创建一个大小为3的线程池
//通过线程池service,可以给线程池进行提交任务等操作
ExecutorService的方法
*提交任务
四个方法:
Submit(Callable<T> task);
submit(Runnable task)
submit(Runnable task, T result)
execute()
说明:前三个方法都是带有返回值的执行任务,最后一个方法只是单纯的执行任务,方法的返回值是Future类型的,它是支持泛型的,返回值要和泛型的类型一致
Future<String> future = service.submit(new Callable<String>() {
Public String call() throws Exception() {
Return “test”;
}
});
//得到返回值
String str = future.get();
当使用submit(Runnable task)的时候,里面执行的是run方法是没有返回值的,
得到的future.get()得到的是Null值,表示线程正常执行完成
线程执行完毕关闭线程池里拥有的线程:
两个方法:
Shutdown():调用的时候会先完成当前正在执行和已经提交的任务
shutdownNow():结束当前正在执行的任务和等待的任务也不会执行,返回待执行
任务列表
1.2、创建可变大小线程池
ExecutorService service = Executors. newCachedThreadPool();
这种方式返回的是一个可变的线程池,只要有新的任务来到,如果没有可用的线程存在就新建一个线程
1.3、创建一个定时器样式的线程池:在单位时间内执行任务
1.3.1、定时器:
public static void main5() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生爆炸");
service.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("蹦............");
return "success";
}
},
5,
TimeUnit.SECONDS);
}
1.3.2:每五秒钟发生一次爆炸
public static void main7() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生连环爆炸");
service.scheduleWithFixedDelay((new Runnable() {
@Override
public void run() {
System.out.println("蹦...........");
}
}), 5, 1, TimeUnit.SECONDS);
}
scheduleWithFixedDelay或者scheduleAtFixedRate方法都是能够产生循环定时器,只是实现方式不一样
说明:ScheduledExecutorService接口实现了ExecuorService所以它一样拥有ExecutoService的功能
1.4:创建单线程的线程池
ExecutorService service = Executors.newSingleThreadExecutor();
2、部分ExecuorService接口方法说明:
invokeAll(Collection<? extends Callable<T>> tasks):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们。返回所有任务执行完成后的future集合
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,当任务都完成或者是超时时,返回所有任务的状态future集合
invokeAny(Collection<? extends Callable<T>> tasks): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,每执行完成一个线程就返回一个future值
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,在指定时间内完成了一个任务就返回Future结果
3、对线程任务完成队列的合理处理类ExecutorCompletionService
利用线程池能很好的执行任务,并且得到任务的执行返回结果,当然可以自己对所有处理的结果进行处理,但是java.util.concurrent提供了一个很简单的类给我们用就是
ExecutorCompletionService 用它来执行任务,所有任务的返回结果都将放置在一个队列上,之后可以通过队列取得任务的执行结果,根据任务执行结果做相应的操作,同时也很方便的对所有的执行结果统一的处理
public static void main8() {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<Integer> cservice = new ExecutorCompletionService<Integer>(service);
Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
for(int i=0; i<10; i++) {
final int squence = i;
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for(int j=1; j<5; j++) { System.out.println(Thread.currentThread().getName() + " task " + squence + " time " + j);
}
return squence;
}
});
}
for(Callable<Integer> task:tasks) {
cservice.submit(task);
}
//统一处理任务结果
int result = 0;
for(int i=0; i<tasks.size(); i++) {
result += cservice.take().get();
}
System.out.println(result);
可知任务完成的时候,处理结果会存放在调用take()方法所获取的队列上面,方便统一处理结果
三、java.util.concurrent.locks
3.1、锁是用来防止线程并发而带来的线程安全问题,在访问线程共享数据的时候,操作它或者读取它的时候,如果不希望别的线程也同时对它就行读取,那应该给它上锁,这样只要一个线程进去了对他上锁了,别的线程就无法在进入了。有时候线程并发不是就是单一并发访问问题,并发访问同时可能也会导致数据不一致
public class LockTest {
public static void main(String[] args) {
final Service s = new Service ();
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<10; i++) {
service.execute(new Runnable() {
public void run() {
s.service();
}
});
}
}
static class Service {
private int count = 0;
Lock lock = new ReentrantLock();
public void service() {
++ count;
try {
Thread.sleep(200l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service :" + count + " time");
}
}
}
对与以上程序,我们希望打印的是
service : 1 time
service : 2 time
……..
Service: 10 time
实际情况确实在多线程的环境里打印的结果确实
Service:10 time
Service:10 tiem
…..
Service :10 time
所以在调用service方法操作count的时候,应该上锁,service上锁后方法如下:
public void service() {
try {
lock.lock();//加锁
++count;
Thread.sleep(200l);
System.out.println("service :" + count + " time");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();//释放锁
}
}
当然在JAVA里面同时还可以使用synchronized关键字来防止同步问题
3.2、读写锁,在JDK5.0以前是没用线程并发库的,如果用synchronized确实能实现同步问题,但是比如对与一个类的属性,希望的效果是在读取它的时候能多线程同时读,在写它的时候不能读,在读它的时候不能写。这样如果只是单单在方法上面加synchronized关键字,它会使所有的线程互斥,不能说多个线程能同时读取对象的值,虽然自己编写代码实现,但是JAVA5之后线程并发库已经给了解决方案,那就是读写锁,如下例子:
public class LockTest {
public static void main(String[] args) {
final ReadWriteLockT s = new ReadWriteLockT();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 4; i++) {
service.execute(new Runnable() {
public void run() {
while(true) {
s.getX();
}
}
});
service.execute(new Runnable() {
public void run() {
while(true) {
s.setX(new Random().nextInt());
}
}
});
}
}
static class ReadWriteLockT{
private int x = 0;
private ReadWriteLock rock = new ReentrantReadWriteLock();
public void setX(int x) {
rock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ".........进入写读锁");
this.x = x;
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("写完毕");
rock.writeLock().unlock();
}
public int getX() {
rock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "----------进入读读锁:");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x);
System.out.println("读完毕");
rock.readLock().unlock();
return x;
}
}
}
说明:从打印的结果中可以看出,在多条线程操作的过程中,在读的过程中别的线程是无法调用写的方法的,在写的过程中也是无法调用读的方法的,只有在读的时候才能同时调用读的方法,当然在写的时候不能同时调用写的方法
3.3、condition的使用
Lock锁得机制是在synchronized关键字上面进行了一些功能的提升,那么condition是在wait(),notify(),notifyall()上面做了对应的提升。对与唤醒线程,是线程等待它能做到更好的实现,即时大部分其实它是更加以一种优雅的形式去表现wait(),notify(),notifyall()的一种实现。其实在某种意义上说,之前的wait这样的方法只是针对一个对象的监视器,然而现在对功能的扩展,就好比能分解成多个监视器在监视一个对象一般,同时每个监视器和对应的锁形成 的组合,能更加有序的对线程进行唤醒和等待。下面结合JDK API帮助文档里的例子谈谈它们之间的区别
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
说明:如上面的例子,线程不断的调用put 和 take方法向队列items里面放入数据,放蛮的情况下就无法在放,唤醒拿的线程。 在拿的时候如果为空唤醒对应的放的线程,协调工作。
在这里如果是之前的wait,notify这样的方式的话,因为程序是有多条put和多条take的线程同时并发访问,wati,notify这样的方式只能唤醒一天线程,比如take发现是空的,就得put了,但同时也有好多其它的take线程在等待锁,所以这时候也有可能锁再一次给了take线程,这样的话显然不是自己的意愿,要的效果是让put线程拿到锁,而condition就做到了这样,上面例子可看出,只要take是空的,就唤醒一条put线程,相反只要put时是满的,就唤醒一条take线程
四、java.util.concurrent.atomic类讲解
类摘要
AtomicBoolean
可以用原子方式更新的 boolean 值。
AtomicInteger
可以用原子方式更新的 int 值。
AtomicIntegerArray
可以用原子方式更新其元素的 int 数组。
AtomicIntegerFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。
AtomicLong
可以用原子方式更新的 long 值。
AtomicLongArray
可以用原子方式更新其元素的 long 数组。
AtomicLongFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
AtomicMarkableReference<V>
AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicReference<V>
可以用原子方式更新的对象引用。
AtomicReferenceArray<E>
可以用原子方式更新其元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V>
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
AtomicStampedReference<V>
AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
讲解:从上面的类中可以看出,它对基本的一些数值的操作做了线程同步的安全,有时候可能对一个数的加,多线线程同时访问加就会出现问题,甚至平时一般的i++,多个线程同事访问都会造成数据混乱。所以对多线程共享时候加一般用它来实现更加好,不过在java1.5之前可以使用volatile关键字来解决问题,然而现在有了这套类库就业可以避免每次为变量申明volatile关键字了。其实这些内部源码都也是基于volatitle的实现。也可以使用synchorzied关键字同步方法,但是用基于volatile的方式有更好的效率
但是volatile关键字其实还有线程安全问题,就是当多个线程对变量进行操作的时候还是可能会造成变量的值不准确性。 然而用这些类却解决了这个问题
用一个例子说明:
Class MyDate {
Private int volatile x;
Public void opera() {
X ++;
}
}
上面的类,当有多个线程同时执行opera的时候,虽然x已经用了volatile关键字同步,但是未能防止x的值出错,虽然出错的概率小
五java.util.concurrent.同步集合类的讲解
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
集合类说明:传统的集合类中,在多线程共享的环境中会出现错误,所以使用这里的安全集合类就不会出现错误了。
六java.util.concurrent.队列的讲解
所有已知子接口:
BlockingDeque<E>
所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
这些队列在线程并发中是安全的,也有阻塞队列,阻塞队列实际应用比较多,所以在这也给一个实现。
阻塞队列:在放的时候要有空为才能放,要不然执行等待,在拿的时候要有数据才能执行拿,要不然执行等待。
在前面例子中,用condition方式实现了类似的功能,用这里的阻塞队列将会发现更加方便。
例子实现:
package com.moom;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingDequeTest {
public static void main(String[] args) {
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(6);//创建一个只能容纳6个容量的阻塞队列
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<3; i++) {
//生产三条拿的线程
service.execute(new Runnable() {
public void run() {
try {
while(true) {//不停的拿
Thread.sleep(200l);
System.out.println(Thread.currentThread().getName() + "即将拿");
String s = queue.take();
System.out.println(Thread.currentThread().getName() + "拿到" + s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while(true) { //不停的放
try {
System.out.println(Thread.currentThread().getName() + "即将放");
String s = String.valueOf(new Random().nextInt(10000));
queue.put(s);
System.out.println(Thread.currentThread().getName() + "放入了" + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
七java.util.concurrent.剩余部分类的讲解
5.1、CyclicBarrier,有些多线程需要在多条线程完成了相应的事情之后然后就继续去完成后续的事情,每次都是这样,就是说当只有一条线程完成了对应的事情时,他是无法继续向前运行的,只有等待。
例子:
package com.moom;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(5);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
// 每条线程要执行的任务
System.out.println("线程" + Thread.currentThread().getName()
+ "将执行完任务");
try {
barrier.await();// 线程执行完就等待,要五条线程都执行完才能做后续操作
// 执行后续任务
System.out.println("线程" + Thread.currentThread().getName() + "后续任务执行完毕");
barrier.await();//等待所有线程执行完后续任务
//执行最后任务,随意执行不等待,结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
注释:每次开启的线程数量应当和CyclicBarrier对象的线程数一样,比如上面例子是开启5,然而如果开六条线程的话会发觉程序先前5条线程正常执行下去,第六条执行完先前任务,一直阻塞等待着其它线程的到来。等于就是五条五条线程为一组。
5.2、Semaphore,对与有些多线程共享的数据,并不是说一条线程访问的时候其它线程就不能访问了,是规定只有多少条线程可以访问,好比一个房间里面只能容纳多少个人,只要没满的时候其它的线程就可以进去
例子:
package com.moom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore se = new Semaphore(5);//创建同时可以五个线程访问控制器
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++) {
service.execute(new Runnable() {
public void run() {
try {
se.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "进来了开始执行任务");
Thread.sleep(100l);
System.out.println("线程" + Thread.currentThread().getName() + "将要执行完毕");
se.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
每次最多只会有5条线程在执行,没条线程执行完毕就释放资源
5.3、CountDownLatch有的多线程应用场景,一个线程唤醒多条线程,多条线程执行完毕有去唤醒这一条线程,就这个循环的唤醒或者单独的唤醒运用用CountDownLatch就比较方便的实现
例子场景:在各类运动比赛当中,裁判线程监视这所有的参赛人线程,参赛人也会监视裁判线程,当裁判说开始参赛人线程开始,参赛人比赛完成,裁判在做回应
实现:
package com.moom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch judje = new CountDownLatch(1);//产生一个裁判线程用
final CountDownLatch members = new CountDownLatch(5);//5个队员
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10l);
System.out.println(Thread.currentThread().getName() + "队员等待下命令");
judje.await();//等待裁判说开始
System.out.println(Thread.currentThread().getName() + "队员开始了");
//执行逻辑
members.countDown();//执行完成
System.out.println(Thread.currentThread().getName() + "结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
System.out.println(Thread.currentThread().getName() + "裁判即将通知开始");
try {
Thread.sleep(2000l);
System.out.println("开始");
judje.countDown();
members.await();//等待所有队员执行完
Thread.sleep(1000l);
System.out.println("裁判统计结果");
service.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.4、Exchanger有些线程同步需要互相交换数据,比如两天线程执行完毕之后需要交换完毕的数据用exchanger的可以方便的实现
例子实现:
package com.moom;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> ex = new Exchanger<String>();
final Exchanger<String> ex2 = new Exchanger<String>();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(100l);
String da1 = "aaaa";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(100l);
String da1 = "bbbbb";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、FAQ
6.1、为什么使用volatile更有效率
同步的代价, 主要由其覆盖范围决定, 如果可以降低同步的覆盖范围, 则可以大幅提升程序性能.而volatile的覆盖范围仅仅变量级别的. 因此它的同步代价很低.
相关推荐
通过对Java多线程机制的研究,我们可以发现Java提供了一整套完善的支持并发编程的工具和技术。从理论层面理解进程和线程的概念,到具体实践中的线程控制、同步机制的设计,再到高级并发结构的实现,Java多线程机制为...
通过以上分析可以看出,Java多线程加队列上传文件的实现不仅涉及基础的网络编程和文件操作知识,还需要掌握高级的多线程管理和并发控制技术。这种技术方案适用于需要高效处理大量文件上传请求的场景,比如云存储服务...
Java多线程网络爬虫是一种高效地从互联网上抓取数据的技术,特别是在处理大量网页时,多线程能显著提高爬虫的效率。本项目以时光网(Mtime)为例,展示如何使用Java实现这样的爬虫。时光网是一个提供电影、电视剧、...
Java多线程技术在爬虫应用中的重要性不言而喻,它能显著提升图片抓取的效率。本文将深入探讨如何使用Java实现多线程爬虫,以及压缩包中的三个示例案例。 首先,我们需要了解Java中的多线程概念。在Java中,通过创建...
下面将详细讲解Java多线程下载的相关知识点: 1. **线程基础**:在Java中,线程是程序中的执行流。`Thread`类是所有线程的基类,我们可以通过继承它或实现`Runnable`接口来创建线程。创建线程主要有两种方式:直接...
Java多线程小游戏是学习Java并发编程的一个经典实践案例,它可以帮助开发者深入理解线程的创建、同步、通信以及管理。在这个课程设计或毕业设计项目中,学生通常会被要求实现一个能够展示多线程特性的游戏,比如经典...
总的来说,《基于Java多线程的HTTP代理服务器的研究与实现》这篇文档会详细阐述如何利用Java的多线程、网络编程和并发特性来构建一个功能完善的HTTP代理服务器,包括设计思路、核心技术、实现细节和最佳实践。...
Java多线程断点下载文件是一种高效的文件下载方式,它允许在下载过程中暂停并从上次停止的地方继续,尤其适用于大...在实际应用中,上述知识点结合具体的代码实现,可以构建一个功能完善的Java多线程断点下载文件系统。
综上所述,这个项目涉及到的知识点包括:分布式系统设计、Java多线程与线程池、Spring框架的多数据源支持、MyBatis的使用以及Spring的事务管理。通过这些技术的组合,可以构建出一个高效、可扩展的分布式应用,以...
标题《Java多线程编程深入详解》所涉及的知识点涵盖了Java多线程编程的核心思想、原理以及在实际开发中可能遇到的问题和解决方案。以下是对这些知识点的详细阐述: 1. 多进程与多线程概念的区分和理解 - 进程是...
Java多线程断点续传下载程序是一种高级的软件实现技术,它结合了Java的并发处理能力和文件传输的策略,以提高下载效率和用户体验。在这个项目中,我们主要关注两个核心概念:多线程和断点续传。 首先,多线程是Java...
Java多线程断点下载是Java编程中一种高级技术,主要应用于大文件的网络传输,如视频、软件安装包等。这种技术结合了多线程和断点续传的概念,提高了下载效率并允许用户在中断后继续从上次停止的位置开始下载。 首先...
本项目名为"java多线程抽奖器",正是利用Java的多线程特性,实现了一个功能完善的抽奖系统。下面我们将深入探讨该项目的核心知识点。 首先,我们来解析项目的组成部分。项目中的主要文件包括`.java`源代码文件、`....
### Java多线程网络传输文件(非同步) #### 概述 在现代软件开发中,尤其是在涉及大量数据传输的应用场景下,高效的文件传输方案尤为重要。Java作为一种广泛应用的编程语言,提供了丰富的工具和技术来实现高性能...
通过以上这些技术的结合,我们可以创建出一个功能完善的Java多线程聊天程序,其GUI界面允许用户直观地进行交互,而Socket通信则保证了数据的准确无误地在网络间传递。这个程序不仅有助于理解Java的并发编程,同时也...
《Java 7并发编程实战手册》是一本深入探讨Java多线程编程的宝贵资源,它为读者提供了丰富的实例,帮助开发者掌握在Java 7环境中进行高效并发编程的关键技能。本书涵盖了从基本概念到高级特性的全方位知识,是提升...
在Java编程中,多线程下载是一种常见的优化...通过以上讲解,你应该对Java多线程下载有了全面的理解,包括其基本概念、创建线程的方法、下载原理以及示例代码。在实际开发中,可以根据项目需求进行适当的优化和调整。
通过这些类的协同工作,Java多线程和Socket技术构建了一个简单的聊天室环境,实现了用户之间的实时交流。开发者可以进一步优化这个应用,比如增加身份验证、消息加密、错误处理等功能,使其更加完善和安全。 总之,...
综上所述,Java TCP线程聊天是一个涉及网络编程、多线程、数据传输等多个方面的实践项目,通过学习和实现这个项目,可以深入理解这些关键概念和技术。在实际开发中,还可以根据需求进一步完善和优化。
《Java 7并发编程实战手册》是一本深入探讨Java并发编程的权威著作,它涵盖了大量实用的技巧和最佳实践,旨在帮助开发者在多线程环境下编写高效、安全的代码。这本书的实例代码提供了丰富的示例,使读者能够直观地...