`

Java之 java.util.concurrent 包之概述

阅读更多
一、概述:java.util.concurrent (Java 并发编程工具包 ) Ref- jenkov

从 JDK1.5 开始新增了一个包:java.util.concurrent,这个包包含了一组Java类,它们使多线程开发变得容易。在新增这个包之前,你需要自己写这些工具类。

在本文中,我(用JDK1.6)将与你介绍一下这个包(java.util.concurrent)里面的类,挨个介绍。
本文不会涉及并讨论【Java并发编程】的原理。如果你对此感兴趣,请看这里:Ref- jenkov

1、 java.util.concurrent.atomic.AtomicInteger

在Java语言中,++i 和 i++ 操作并不是线程安全的,在多线程并发情况下,不可避免的会用到synchronized关键字。而AtomicInteger则提供了一种线程安全的加减操作。

/**
来看看AtomicInteger提供的接口。

- public final int get()                    //获取当前的值
- public final int getAndIncrement()        //获取当前的值,并自增
- public final int getAndDecrement()        //获取当前的值,并自减
- public final int getAndAdd(int delta)     //获取当前的值,并加上一个(正负)值
- public final int getAndSet(int newValue)  //取当前的值,并设置新的值

*/


2、java.util.concurrent.BlockingQueue 接口 Ref- jenkov



BlockingQueue 的典型应用场景是:一个线程生产对象,另一个线程则消费这个对象。

生产线程保持生产对象到队列中,直到队列满时,生产线程被阻塞。
消费线程保持消费队列中的对象,直到队列空时,消费线程被阻塞。

方法介绍:
Throws ExceptionSpecial ValueBlocksTimes Out
Insertadd(o)offer(o)put(o)offer(o, timeout, timeunit)
Removeremove(o)poll()take()poll(timeout, timeunit)
Examineelement()peek()


实现类:
        - ArrayBlockingQueue
        - DelayQueue
        - LinkedBlockingQueue
        - PriorityBlockingQueue
        - SynchronousQueue

例子:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockedQueueTest {  
	  
    public static void main(String[] args) throws Exception { 
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);  
  
        Producer producer = new Producer(queue);  
        Consumer consumer = new Consumer(queue);  
  
        new Thread(producer).start();  
        new Thread(consumer).start();  
  
        System.out.println("done!");
         
    }  
}  
  
class Producer implements Runnable{
    protected BlockingQueue<String> queue ;
    public Producer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }
    public void run() {  
        try {  
            queue.put("1");  
            Thread.sleep(1000);  
            queue.put("2");  
            Thread.sleep(1000);  
            queue.put("3");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
  
  
class Consumer implements Runnable{
    protected BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }
    public void run() {  
        try {  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
            System.out.println(queue.take());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  





3、java.util.concurrent.ConcurrentMap 接口 Ref- jenkov

java.util.concurrent.ConcurrentMap 接口 extends java.util.Map 接口 ,具有处理并发 put 和 get 的能力。

实现类:java.util.concurrent.ConcurrentHashMap
ConcurrentHashMap 与 java.util.HashTable 比较相似,但性能更高:读取数据时不锁Map;写数据时不会锁整个Map,只锁写的那部分。

另一个不同是:ConcurrentHashMap 不会抛 ConcurrentModificationException(当使用iterator 且 Map 数据发生改变时)。因为 Iterator 被设计为只允许一个线程使用。

例子:
      ConcurrentMap concurrentMap = new ConcurrentHashMap();
      concurrentMap.put("key", "value");
      Object value = concurrentMap.get("key");



4、Executors 框架 Ref- journaldev

JDK1.5以前,创建过多的线程可能会引起内存耗尽。所以使用线程池(ThreadPool)是一个很好的解决方案。

Executors 框架,可以创建线程池线程池(ThreadPool),并提供了对异步线程的 调用(invocation),调度(scheduling),执行(execution)的功能。

问题:如何重复使用一个线程?
解决:线程对象一旦运行结束,不可以再被从新执行。但使用阻塞消息队列,可以不断接收和处理 Runnable 或 Callable 对象。


java.util.concurrent.ExecutorService

java.util.concurrent.ExecutorService 接口是一个线程池(Thread Pool),可以异步执行线程。


import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;


public class TestExecutors {
	
	static class MyRunnable implements Runnable{
		@Override
		public void run() {
			System.out.println("Run, run, run!");
		}
		
	}
	
	static class MyCallable implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("Call, call, call!");
			return null;
		}
		
	}
	
	static class MyBlockedRunnale implements Runnable{
		@Override
		public void run() {
			try {
				for(int i = 1; i <= 5; i++){
					System.out.println("Sleeping " + i + "s.");
					Thread.sleep(1000);
					
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	
	
	
    /**
     *  test01_createServices
     *============================================================
     *  This ExecutorService contains the follow 5 methods 
     *  for passing it task for execution:
     *  
     * 
     *      - execute(Runnable)
     *      
     *      - submit(Runnable)
     *      - submit(Callable)
     *      
     *      - invokeAny(...)
     *      - invokeAll(...)
     *      
     *      
     */
	public void test01_createServices() throws Exception{
		
	    ExecutorService singlService    = Executors.newSingleThreadExecutor();
	    ExecutorService fixedService    = Executors.newFixedThreadPool(10);
	    ExecutorService schduService    = Executors.newScheduledThreadPool(10);
	    ExecutorService cacheService    = Executors.newCachedThreadPool();
	    
	    singlService.shutdown();
	    fixedService.shutdownNow();
	    schduService.awaitTermination(10, TimeUnit.DAYS);
	    cacheService.awaitTermination(10, TimeUnit.SECONDS);
	}
    
	
    /**
     *  test02_Runnable_ExeuteSubmit
     *  ===========================================================
     */
    public void test02_Runnable_ExeuteSubmit() throws Exception{
        
    	ExecutorService singleService    = Executors.newSingleThreadExecutor();
    
    	/*
	     * service.execute() - void
	     */
	    singleService.execute(new MyRunnable());
	    
	    /*
	     * service.submit(Runnable) - Future
	     *  - future.get(): this method is blocked, until result is returned.
	     *  - returns null if the task has finished correctly - for Runnable.
	     */
	    Future<?> future =  singleService.submit(new MyRunnable());
	    future.get(); 
	    
    }
	    
    
    /**
     *  test03_Callable_Submit
     *  ===========================================================
     */
	public void test03_Callable_Submit() throws Exception{  
	    
		ExecutorService singleService    = Executors.newSingleThreadExecutor();
		
		/*
		 * service.submit(Callable) - Future
		 * - future.get(): this method is blocked, until result is returned.
		 * - returns result if successful completion.
		 */
		Future<String> future = singleService.submit(new MyCallable());
		System.out.println("future.get() = " + future.get());
    
	}
	
	

    /**
     * test04_invokeAny()
     * ================================================================
        The invokeAny() method takes a collection of Callable objects
        
        Invoking this method does not return a Future, 
        but returns the result of one of the Callable objects. 
        
        You have no guarantee about which of the Callable's results you get. 
        Just one of the ones that finish.

        If one of the tasks complete (or throws an exception), 
        the rest of the Callable's are cancelled.
     */
	public void test04_invokeAny() throws Exception{  
    
		ExecutorService cacheService    = Executors.newCachedThreadPool();
    
		Set<Callable<String>> callables = new HashSet<Callable<String>>();
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 1";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 2";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 3";
	        }
	    });

	    String result = cacheService.invokeAny(callables);
	    System.out.println("result = " + result);
    
	}
	    

    /**
     * test05_invokeAll
     ===========================================================================
     The invokeAll() method invokes all of the Callable objects you pass to it 
     in the collection passed as parameter. 
     
     The invokeAll() returns a list of Future objects via which you can obtain 
     the results of the executions of each Callable.


     Keep in mind that a task might finish due to an exception, 
     so it may not have "succeeded". 
     
     There is no way on a Future to tell the difference.
     
     */
	public void test05_invokeAll() throws Exception{
		ExecutorService cacheService    = Executors.newCachedThreadPool();
    
		Set<Callable<String>> callables = new HashSet<Callable<String>>();
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 1";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 2";
	        }
	    });
	    callables.add(new Callable<String>() {
	        public String call() throws Exception {
	            return "Task 3";
	        }
	    });

	    List<Future<String>> futures = cacheService.invokeAll(callables);
	    for(Future<String> future : futures){
	        System.out.println("future.get = " + future.get());
	    }
    
	}


	
	public void test06_Shutdown_AwaitTermination() throws Exception{
	
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		
		/*
	     * This method does not wait for previously 
	     * submitted tasks to complete execution,
	     * and it will no longer accept new tasks.
	     * 
	     * The ExecutorService will shut down immediately.
		 */
		cacheService.shutdown();
		
		
		/*
		 * Blocks until: 
		 * 
		 * - all tasks have completed execution after a shutdown request, or 
		 * - the timeout occurs, or 
		 * - the current thread is interrupted, 
		 * 
		 * whichever happens first.
		 */
		cacheService.awaitTermination(50, TimeUnit.MINUTES);
    
    }
	
	
	

	
	@Test
	public void testShutdown(){
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		cacheService.execute(new MyBlockedRunnale());
		cacheService.shutdown();
		System.out.println("shut down...");
	}
	
	@Test
	public void testAwaitTermination(){
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		cacheService.execute(new MyBlockedRunnale());
		try {
			cacheService.awaitTermination(50, TimeUnit.MINUTES);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("shut down...");
		
	}
	
	/**
	 * Best Practice
	 */
	@Test
	public void testShutdownAndAwaitTermination(){
		ExecutorService cacheService    = Executors.newCachedThreadPool();
		cacheService.execute(new MyBlockedRunnale());
		try {
			cacheService.shutdown();
			cacheService.awaitTermination(50, TimeUnit.MINUTES);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("shut down...");
		
	}

}







5、java.util.concurrent.locks 包

java.util.concurrent.locks 包提供了线性同步机制,跟 synchronized 关键字一样,但是功能更多。


5.1 java.util.concurrent.locks.Lock 接口 Ref- jenkov
方法概述:

      - lock()
      - lockInterruptibly()
      - tryLock()
      - tryLock(long timeout, TimeUnit timeUnit)
      - unlock()

实现类:
java.util.concurrent.locks.ReentrantLock




5.2 java.util.concurrent.locks.ReadWriteLock 接口 Ref- jenkov

提供了更高级的锁机制:可以有多个线程同时读,但只能有一个线程写。

Read Lock
取得读锁的条件:
- 没有线程在(或已经)获取写锁。

Write Lock 
取得写锁的条件:
- 没有线程在读
- 没有线程在写

实现类:
java.util.concurrent.locks.ReentrantReadWriteLock

例子:


import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestConcurrent_Lock {
    
    public void test(){
        
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();


        readWriteLock.readLock().lock();

            // multiple readers can enter this section,
            // if not locked for writing, and no writers waiting
            // to lock for writing.

        readWriteLock.readLock().unlock();


        readWriteLock.writeLock().lock();

            // only one writer can enter this section,
            // and only if no threads are currently reading.

        readWriteLock.writeLock().unlock();
    }

}

/*


readLock.lock();

This means that if any other thread is writing (i.e. holds a write lock) 
then stop here until no other thread is writing.




writeLock.lock();

This means that if any other thread is reading or writing, 
then stop here and wait until no other thread is reading or writing.



*/







java.util.concurrent包之Execuotor系列文章

00_Java之 java.util.concurrent 包之概述

01_Java之java.util.concurrent包之Executor与ExecutorService

02_Java之 java.util.concurrent 包之ExecutorService之submit () 之 Future

03_Java之多线程之Callable与Future

04_Java之多线程之Lock




转载请注明,
原文出处:http://lixh1986.iteye.com/blog/2341898












-


  • 大小: 9 KB
分享到:
评论

相关推荐

    The java.util.concurrent Synchronizer Framework

    Java平台在J2SE 1.5版本中引入了`java.util.concurrent`包,这是一系列中等层次的并发支持类集合,通过Java社区过程(Java Community Process, JCP)的Java规范请求(Java Specification Request, JSR)166创建而成...

    [Java参考文档].JDK_API_1_6_zh_CN

    4. **多线程**:如`java.lang.Thread`和`java.util.concurrent`,支持并发编程,包括线程的创建、同步和管理。 5. **网络编程**:如`java.net`,提供了处理套接字(Socket)和服务器套接字(ServerSocket)的类,...

    28个java常用的工具类

    23. **`java.util.concurrent.locks.Lock`** 和 **`java.util.concurrent.locks.ReentrantLock`**: 锁机制,用于线程同步。 24. **`java.util.ArrayList`**: 用于创建堆栈、队列和双端队列的实现,如`ArrayDeque`。...

    JAVA_API1.6文档(中文)

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    基于线程池的Java多线程应用技术.pdf

    Java提供了三种方式用于创建线程,分别是继承java.lang.Thread类、实现java.lang.Runnable接口、应用java.util.concurrent.Callable接口与java.util.concurrent.Future接口。每种方式都有其优缺,需要根据实际工程...

    java工具类

    11. **`java.util.concurrent`** 包:包含并发和多线程工具,如`ExecutorService`、`Future`和`Callable`。 12. **`java.util.logging.Logger`**:日志记录工具,用于输出程序运行时的信息。 13. **`java.lang....

    [Java参考文档].JDK_API 1.6

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR ...

    Java2_类库.rar

    4. **多线程**:在`java.lang`和`java.util.concurrent`包中,Java提供了Thread类和Runnable接口,以及各种并发工具类,如Semaphore、ExecutorService等,支持多任务并行执行,提升了程序的并发性能。 5. **网络...

    Java虚拟机并发编程.pdf

    Java并发API在不断地演进,最新版本的Java提供了更为强大的并发工具,例如java.util.concurrent包下的各类工具类和执行器框架。 了解和掌握这些知识点,对于开发高效、稳定、可扩展的Java应用程序至关重要。尤其是...

    Java 1.6 API 中文 New

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR ...

    JavaAPI1.6中文chm文档 part1

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    Java api html完整中文版

    Java API中的`java.lang.Thread`和`java.util.concurrent`包提供了多线程支持。开发者可以通过继承`Thread`类或者实现`Runnable`接口来创建并管理线程,`java.util.concurrent`包则提供了高级并发工具,如`...

    javaAPI1.6

    `java.lang.Thread`类允许创建和管理线程,而`java.util.concurrent`包则提供了高级并发工具,如`ExecutorService`、`Future`和`Semaphore`,帮助开发者更好地控制并发执行的线程。 4. **异常处理**:Java的异常...

    java随机数

    如果需要更复杂的功能或在并发环境下工作,则应考虑使用 `java.util.Random` 或 `java.util.concurrent.ThreadLocalRandom`。理解这些方法的差异可以帮助开发者选择最合适的技术方案,以满足具体项目的需求。

    java多线程编程总结

    - `java.util.concurrent.locks`包提供了多种类型的锁,如`ReentrantLock`等。 ##### 4. 信号量 - `java.util.concurrent.Semaphore`用于控制对有限资源的访问。 ##### 5. 阻塞队列 - `java.util.concurrent....

    java api最新7.0

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 JAR ...

    java API文档

    ### Java API文档重要知识点概述 #### 一、Java API概览 Java API是Java应用程序接口(Application Programming Interface)的简称,它提供了一系列预定义的类和接口,支持开发者构建复杂的应用程序。Java API覆盖...

    java jdk-api-1.6 中文 chmd

    java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。 java.util.jar 提供读写 ...

    Master concurrency programming java 9 2nd.pdf

    - **`java.util.concurrent`包**:包含了大量用于实现并发编程的类,如`ThreadPoolExecutor`、`ScheduledThreadPoolExecutor`、`CountDownLatch`、`CyclicBarrier`等。 - **`CompletableFuture`**:Java 8引入并在...

Global site tag (gtag.js) - Google Analytics