`
fonter
  • 浏览: 869230 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

[精] java 5.0 中的 同步(Concurrent) [转]

    博客分类:
  • J2SE
阅读更多

9.   同步(Concurrent)

    
1.      Executor接口

     Executor接口提供了一个类似于线程池的管理工具。用于只需要往Executor中提交Runnable对象,剩下的启动线程等工作,都会有对应的实现类来完成。ScheduledExecutorService比ExecutorService增加了,时间上的控制,即用户可以在提交的时候额外的定义该任务的启动时机,以及随后的执行间隔和延迟等。

     例子:

     任务:

     public class ETask implements Runnable{

          private int id = 0;

          public ETask(int id){

               this.id = id;

          }

          public void run(){

               try{

                   System.out.println(id+" Start");

                   Thread.sleep(1000);

                   System.out.println(id+" Do");

                   Thread.sleep(1000);

                   System.out.println(id+" Exit");

              }catch(Exception e){

                   e.printStackTrace();

              }

          }

     }

    

     测试类:

     public class ETest{

          public static void main(String[] args){       

              ExecutorService executor = Executors.newFixedThreadPool(2);

              for(int i=0;i<5;i++){

                   Runnable r = new ETask(i);

                   executor.execute(r);

              }

              executor.shutdown();

          }

     }

 

     输出:

     0 Start

     1 Start

     0 Do

     1 Do

     0 Exit

     2 Start

     1 Exit

     3 Start

     2 Do

     3 Do

     2 Exit

     3 Exit

     4 Start

     4 Do

     4 Exit

 

2.      Future和Callable

     Callable是一个类似于Runnable的接口,他与Runnable的区别是,她在执行完毕之后能够返回结果。Future用于获取线程的执行结果,或者取消已向Executor的任务。当我们通过Future提供的get()方法获取任务的执行结果时,如果任务没有完成,则调用get()方法的线程将会被阻塞,知道任务完成为止。一般我们都会使用Future的实现类FutureTask。

     例子:

     Callable对象:

     public class ETask implements Callable{

          private String id = null;

          public ETask(String id){

               this.id = id;

          }

    

          public String call(){

              try{

                   System.out.println(id+" Start");

                   Thread.sleep(1000);

                   System.out.println(id+" Do");

                   Thread.sleep(1000);

                   System.out.println(id+" Exit");          

              }catch(Exception e){

                   e.printStackTrace();

              }

              return id;

          }

     }

 

     测试类:

     public class ETest{

          public static void main(String[] args){       

              ExecutorService executor = Executors.newFixedThreadPool(2);

              for(int i=0;i<5;i++){           

                   try{

                       Callable c = new ETask(String.valueOf(i));

                        FutureTask ft = new FutureTask(c);

                        executor.execute(ft);

                        System.out.println("Finish:" + ft.get());         

                   }catch(Exception e){

                       e.printStackTrace();

                   }

              }

               executor.shutdown();

          }

     }

 

     输出:

     0 Start

     0 Do

     0 Exit

     Finish:0

     1 Start

     1 Do

     1 Exit

     Finish:1

     2 Start

     …

3.      CompletionService和ExecutorCompletionService

     CompletionService类似于一个Executor和Queue的混合。我们可以通过submit()向CompletionService提交任务,然后通过poll()来获取第一个完成的任务,也可以通过take()来阻塞等待下一个完成的任务。ExecutorCompletionService是CompletionService的实现类,他需要提供一个Executor作为构造函数的参数。

     例子:

     Executor executor = …;

     CompletionService cs = new ExecutorCompletionService(executor);

     Future fs = cs.submit(…);

     Future ft = cs.take();

 

4.      Semaphore

     信号量是用于同步和互斥的低级原语。信号量提供的acquire()和release()操作,与操作系统上的p,v操作同。

     例子:

     缓冲区:

     public class Buffer{

          private Semaphore s = null;

          private Semaphore p = null;

          Vector<Integer> v = new Vector<Integer>();

         

          public Buffer(int capacity){

               s = new Semaphore(capacity);

              p = new Semaphore(0);

          }

    

          public void put(int i){

              try{

                   s.acquire();

                   v.add(new Integer(i));

                   p.release();

               }catch(Exception e){

                   e.printStackTrace();

              }

          }

    

          public int get(){ 

               int i = 0;

              try{

                   p.acquire();

                   i = ((Integer)v.remove(0)).intValue();

                   s.release();

              }catch(Exception e){

                   e.printStackTrace();

              }

               return i;

          }   

     }

 

     生产者:

     public class Producer extends Thread{

          private Buffer b;

          private int count;

          private int step;

          private int id;

 

          public Producer(Buffer b,int step,int id){

               this.b =  b;

              this.step = step;

              this.id = id;

               count = 0;

          }

    

          public void run(){

              try{

                   while(true){

                       System.out.println("In put");

                        b.put(count);

                        System.out.println("Producer "+id+":"+count);

                        count++;

                       Thread.sleep(step);

                        System.out.println("Out put");

                   }

               }catch(Exception e){

                   e.printStackTrace();

              }

          }

     }

 

     消费者:

     public class Consumer extends Thread{

          private Buffer b;

          private int step;

          private int id;

    

          public Consumer(Buffer b,int step,int id){

              this.b = b;

               this.step = step;

              this.id = id;

          }

         

          public void run(){

              try{

                   while(true){

                        System.out.println("In get");

                       System.out.println("\t\tConsume "+id+":"+b.get());

                        System.out.println("Out get");

                        Thread.sleep(step);

                   }

               }catch(Exception e){

                   e.printStackTrace();

              }   

          }

     }

 

     测试程序:

     public class CPTest{

          public static void main(String[] args){

               Buffer b = new Buffer(3);

              Consumer c1 = new Consumer(b,1000,1);

              Consumer c2 = new Consumer(b,1000,2);

               Producer p1 = new Producer(b,100,1);

              Producer p2 = new Producer(b,100,2);

        

              c1.start();

               c2.start();

              p1.start();

              p2.start();

          }

     }

 

5.      CyclicBarrier

     CyclicBarrier可以让一组线程在某一个时间点上进行等待,当所有进程都到达该等待点后,再继续往下执行。CyclicBarrier使用完以后,通过调用reset()方法,可以重用该CyclicBarrier。线程通过调用await()来减少计数。

 

CyclicBarrier
 
 

 

 

 

 

 

 

     例子:

     任务:

     public class Task extends Thread{

          private String id;

          private CyclicBarrier c;

          private int time;

    

          public Task(CyclicBarrier c,String id,int time){

               this.c = c;

              this.id = id;

               this.time = time;

          }

    

          public void run(){

               try{

                   System.out.println(id+" Start");

                  Thread.sleep(time);

                   System.out.println(id+" Finish");

                   c.await();

                   System.out.println(id+" Exit");         

               }catch(Exception e){

                   e.printStackTrace();

              }

          }   

     }

 

     测试类:

     public class Test{

          public static void main(String[] args){

              CyclicBarrier c = new CyclicBarrier(3,new Runnable(){

                   public void run(){

                        System.out.println("All Work Done");

                   }

              });

               Task t1 = new Task(c,"1",1000);

              Task t2 = new Task(c,"2",3000);

              Task t3 = new Task(c,"3",5000);

               t1.start();

              t2.start();

              t3.start();       

          }

     }

 

     输出结果:

     1 Start

     2 Start

     3 Start

     1 Finish

     2 Finish

     3 Finish

     All Work Done

     3 Exit

     1 Exit

     2 Exit

 

6.      CountdownLatch

     CountdownLatch具有与CyclicBarrier相似的功能,也能让一组线程在某个点上进行同步。但是与CyclicBarrier不同的是:1.CountdownLatch不能重用,2.线程在CountdownLatch上调用await()操作一定会被阻塞,直到计数值为0时才会被唤醒,而且计数值只能通过conutDown()方法进行减少。

特别的,当CountdownLatch的值为1时,该Latch被称为“启动大门”,所有任务线程都在该Latch上await(),直到某个非任务线程调用countDown()触发,所有任务线程开始同时工作。

 

7.      Exchanger

     Exchanger是一个类似于计数值为2的CyclicBarrier。她允许两个线程在某个点上进行数据交换。

       例子:

     public class FillAndEmpty {

         Exchanger<DataBuffer> exchanger = new Exchanger();

         DataBuffer initialEmptyBuffer = ... a made-up type

         DataBuffer initialFullBuffer = ...

 

         public class FillingLoop implements Runnable {

              public void run() {

                   DataBuffer currentBuffer = initialEmptyBuffer;

                   try {

                       while (currentBuffer != null) {

                            addToBuffer(currentBuffer);

                            if (currentBuffer.full())

                                 currentBuffer = exchanger.exchange(currentBuffer);

                       }

                   }catch(InterruptedException ex) { ... handle ... }

              }

         }

 

         public class EmptyingLoop implements Runnable {

              public void run() {

                   DataBuffer currentBuffer = initialFullBuffer;

                   try {

                       while (currentBuffer != null) {

                            takeFromBuffer(currentBuffer);

                            if (currentBuffer.empty())

                                 currentBuffer = exchanger.exchange(currentBuffer);

                       }

                   } catch (InterruptedException ex) { ... handle ...}

              }

         }

 

         public void start() {

              new Thread(new FillingLoop()).start();

              new Thread(new EmptyingLoop()).start();

         }

     }

Exchange
 
 

 

    

 

 

 

 

 

 

 

 

8.      Lock,Condition

     锁是最基本的同步原语。通过在锁上面调用lock()和unlock()操作,可以达到与synchronized关键字相似的效果,但是有一点要注意的是,锁必须显式释放,如果由于抛出异常,而没有释放锁,将导致死锁出现。Condition提供的await(),signal(),signal()操作,与原来的wai(),notify(),notifyAll()操作具有相似的含义。Lock的两个主要子类是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允许多人读,而一人写。

     例子:

     使用Lock和Condition的生产者,消费者问题

     public class BoundedBuffer {

         final Lock lock = new ReentrantLock();

         final Condition notFull  = lock.newCondition();

         final Condition notEmpty = lock.newCondition();

         final Object[] items = new Object[100];

         int putptr, takeptr, count;

        

         public void put(Object x) throws InterruptedException {

              lock.lock();

              try {

                   while (count == items.length)

                       notFull.await();

                   items[putptr] = x;

                   if (++putptr == items.length)

                        putptr = 0;

                   ++count;

                   notEmpty.signal();

              } finally {

                   lock.unlock();

               }

          }

    

          public Object take() throws InterruptedException {

               lock.lock();

              try {

                   while (count == 0)

                       notEmpty.await();

                   Object x = items[takeptr];

                   if (++takeptr == items.length)

                        takeptr = 0;

                   --count;

                   notFull.signal();

                   return x;

               } finally {

                   lock.unlock();

              }

          }

     }   

 

9.      小结:新的concurrent包提供了一个从低到高的同步操作。

分享到:
评论

相关推荐

    Java5.0 Tiger程序高手秘笈(含源码)

    本书《Java5.0 Tiger程序高手秘笈》正是为了帮助开发者掌握这些新特性而编写,结合源码分析,将有助于深入理解Java 5.0的核心改进。 1. **泛型**:Java 5.0引入了泛型,这是一种类型安全机制,允许在编译时检查类型...

    Java5.0多线程编程实践.pdf

    首先,Java 5.0中引入了`java.util.concurrent.Executors`工厂类,它提供了一系列静态工厂方法来创建不同类型的线程池。其中`newFixedThreadPool`方法可以根据需要创建一个固定大小的线程池,这个线程池中的线程数量...

    Java 5.0多线程编程

    1. **Concurrent 包**:Java 5.0 中新增了 `java.util.concurrent` 包,提供了许多高级的并发工具类,如 `ExecutorService`、`Future`、`Semaphore` 等,这些工具简化了线程管理和同步控制。 2. **原子操作类**:`...

    Java5.0Tiger程序高手秘笈PDF.rar

    10. **并发改进**:Java 5.0对并发编程提供了大量支持,如`java.util.concurrent`包,包含线程池、并发容器和同步工具类等,提高了多线程编程的效率和安全性。 通过《Java 5.0 Tiger 程序高手秘笈》,读者可以深入...

    java.util.concurrent 学习ppt

    Java.util.concurrent是Java 5.0引入的一个重要包,它为多线程编程提供了一组高级并发工具。这个包的设计者是Doug Lea,它的出现是JSR-166的一部分,也被称作Tiger更新。Java.util.concurrent的引入是为了解决传统...

    The java.util.concurrent synchronizer framework.pdf

    文档中提到了AbstractQueuedSynchronizer类,这个类是Java 5.0中java.util.concurrent包大多数同步器的基础,如锁、屏障等同步器。这些同步器以AbstractQueuedSynchronizer为基础框架,提供了原子管理同步状态、阻塞...

    The java. util. concurrent synchronizer framework.pdf

    AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...

    Java2参考大全(jdk5.0)

    10. **并发改进**:包括`java.util.concurrent`包的引入,提供了线程安全的容器和同步工具类,如`ExecutorService`、`Future`、`Callable`等,简化了多线程编程。 这本书详细阐述了这些特性以及如何在实际项目中...

    java concurrent 精简源码

    Java并发库是Java SE 5.0引入的一个重要特性,它提供了很多高级并发工具,如线程池、同步容器、并发集合以及阻塞队列等,极大地简化了并发编程。 2. **阻塞队列(BlockingQueue)** 阻塞队列是一种特殊的队列,当...

    良葛格Java JDK 5.0学习笔记

    10. **并发改进**:JDK 5.0引入了`java.util.concurrent`包,提供了高级并发工具,如线程池、阻塞队列、同步器等,使得多线程编程更加高效和易于管理。 这些只是JDK 5.0中部分重要的新特性。通过深入学习《良葛格...

    java 多线程同步

    Java多线程同步是Java编程中关键的并发概念,它涉及到如何在多个线程访问共享资源时保持数据的一致性和完整性。`java.util.concurrent`包是Java提供的一个强大的并发工具库,它为开发者提供了多种线程安全的工具,...

    良葛格Java JDK 5[1].0学习笔记

    11. **并发编程改进**:`java.util.concurrent`包提供了线程池、并发集合和同步工具类,如`ExecutorService`、`Future`和`Semaphore`,帮助开发者更方便地编写并发程序。 以上是Java JDK 5.0的关键特性,这些改进...

    良葛格Java JDK 5.0学习笔记.rar

    此外,JDK 5.0还改进了内存模型,支持了线程并发的更多原语,如`java.util.concurrent`包中的`Executor`和`Future`接口,以及`CyclicBarrier`、`Semaphore`等同步工具类。这些特性为编写高效的多线程程序提供了便利...

    Java函数包 API

    此外,Java 5.0的API还包括了对并发编程的改进,如`java.util.concurrent`包中的线程池、同步器(如CyclicBarrier和Semaphore)等工具,以及新的日期时间API(java.util.Calendar和java.util.Date的替代者,如java....

    J2SERuntimeEnvironment5.0开发者版

    J2SE 5.0引入了java.util.concurrent包,包含了丰富的线程安全的数据结构和同步工具类,如Executor框架、Semaphore、CyclicBarrier等,这些工具极大地简化了多线程编程。 8. **死锁检测(Deadlock Detection)** ...

    JavaTM 2 Platform Standard Edition 5.0

    9. **并发工具类(Concurrency Utilities)**:Java SE 5.0引入了java.util.concurrent包,包含了线程池、同步容器、并发集合等高级并发工具,极大地改善了多线程编程的效率和安全性。 10. **NIO.2(New I/O 2)**...

    JDK DOC5.0 中文帮助文档

    同时,对于新的并发编程工具,如`java.util.concurrent`包下的线程池、并发容器和同步原语,文档也会有详尽的解释。 此外,文档还包含了异常处理、I/O流、网络编程、国际化、XML处理、安全管理、垃圾回收等方面的...

    jdk5.0文档说明

    新增`java.util.concurrent`包,提供了线程安全的数据结构和并发控制工具,如线程池、并发队列、同步器等,简化了多线程编程。 这些新特性不仅增强了Java的表达力,还提升了开发效率和代码质量。通过深入学习JDK ...

    Java多线程编辑核心技术

    Java的并发包java.util.concurrent是Java多线程编程中的重要组成部分,它提供了比原生的java.lang.Thread类和synchronized关键字更高级的并发工具。比如,它提供了线程池的实现(如ExecutorService和...

Global site tag (gtag.js) - Google Analytics