`
jamie.wang
  • 浏览: 347399 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Java Concurrency 之 Executor Framework

阅读更多

1. Executor接口

public interface Executor {
void execute(Runnable command);
}

2. 线程池

Executors的工厂方法可以返回多种线程池:

i. newFixedThreadPool(int nThread),固定大小的线程池,一直维持nThread个线程;

ii. newCachedThreadPool(),根据负载,自动调整线程池线程个数;

iii. newSingleThreadExecutor(),单线程的Executor;

iv. newScheduledThreadPool(int nThread),固定大小的线程池,可以延迟和周期性的执行任务,类似于Timer()(java1.5之后就应该很少用它了);

 

3. 具有生命周期的executor

ExecutorService一般为执行一组任务,它在没有shutdown前会一直等待。

awaitTermination会阻塞当前调用线程,等待线程池中的线程执行完成,或者超时,注意之前必须先调用shutdown。

public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ... additional convenience methods for task submission
}

4. Callable和Future

Executor的基本执行单元是Runner,但Runner不能返回值,也不好取消等。

接口Callable可以返回计算的值:

public interface Callable<V> {
V call() throws Exception;
}

 而Future则可以管理任务的生命周期:

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException,
CancellationException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
CancellationException, TimeoutException;
} 

ExecutorService.submit()方法会返回一个Future,可以用来获取返回值,或者取消任务。

也可以实例化FutureTask。

5. CompletionService,BlockingQueue和Executor的组合

你可以提交(submit)一组Callable任务,且像队列一样在可用时获取(poll或者taken)计算结果,结果都被包装在Future里。

引用书上的例子:

浏览器渲染HTML页面,先从HTML中获取图片的地址,加入到任务队列中,然后渲染文字,图片到了后,就立即显示。

 

package org.jamie.demo;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ImageData {
    private final String data;

    public String getData() {
        return data;
    }

    public ImageData(String data) {
        super();
        this.data = data;
    }
}

class ImageLoader {
    private Random ran = new Random();

    public ImageData load(String src) throws InterruptedException {
        long loadTime = 0L;
        do {loadTime = ran.nextInt(10000);} //load time between 1 secs and 10 secs.
        while (1000 > loadTime);

        Thread.sleep(loadTime);
        return new ImageData("image data for " + src);
    }
    
}

public class ExecutorServiceDemo {
    private ImageLoader imageLoader = new ImageLoader();

    public void renderHtmlPage() throws Throwable {
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        List<String> imagesSources = scanImageSources();
        CompletionService<ImageData> cs = new ExecutorCompletionService<ImageData>(executor);
        for (final String src : imagesSources) {
            cs.submit(new Callable<ImageData> () {
                @Override
                public ImageData call() throws Exception {
                    return imageLoader.load(src);
                }
                
            });
        }
        randerText();

        for (int i = 0; i < imagesSources.size(); ++i) {
            try {
                Future<ImageData> future = cs.poll(2, TimeUnit.SECONDS);
                if (null != future) {
                    ImageData data = future.get();
                    randerImage(data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                throw e.getCause();
            }
        }
        executor.shutdown();
    }

    private void randerImage(ImageData data) {
        System.out.println("image of HTML " + data.getData());
    }

    private void randerText() {
        System.out.println("text content of HTML");
    }

    private List<String> scanImageSources() {
        return Arrays.asList(
                "/foo.png",
                "/bar.png",
                "/beauty.jpg",
                "/food.png",
                "/sex.png",
                "/ml.png",
                "/boring.png");
    }

    public static void main(String[] args) throws Throwable {
        new ExecutorServiceDemo().renderHtmlPage();
    }
}

  输出:

 写道
text content of HTML
image of HTML image data for /beauty.jpg
image of HTML image data for /foo.png
image of HTML image data for /food.png
image of HTML image data for /bar.png
 
分享到:
评论

相关推荐

    Java Concurrency Framework 的介绍

    ### Java Concurrency Framework 的介绍 #### 一、概述 本文档由 David Holmes 撰写,旨在为初学者提供一个关于 Java Concurrency Framework 的简单介绍。对于那些希望快速掌握 Java 并发编程基础概念的学习者来说...

    Java 9 Concurrency Cookbook - Second Edition

    Separate the thread management from the rest of the application with the Executor framework Solve problems using a parallelized version of the divide and conquer paradigm with the Fork / Join ...

    Java 7 Concurrency Cookbook

    useful mechanisms included in Version 7 of the Java concurrency API, so you will be able to use them directly in your applications, which are as follows: f f t e n . d u o l Basic thread management w....

    Java.Concurrency.in.Practice.pdf

    如何在多线程中执行任务,使用线程池(Thread Pools)和执行器框架(Executor Framework)等高级工具。 7. **取消和关闭(Cancellation and Shutdown)** 处理任务的取消机制,以及在服务停止时的线程安全问题。...

    Programming Concurrency on the JVM

    - **Executor Framework**:用于管理和控制线程池的高级API,有助于优化资源使用。 - **Locks and Conditions**:提供了比内置同步更灵活的锁定机制。 - **Concurrent Collections**:专门设计用于支持并发访问的...

    java7帮助文档

    The fork/join framework, which is based on the ForkJoinPool class, is an implementation of the Executor interface. It is designed to efficiently run a large number of tasks using a pool of worker ...

    jdk1.7API文档(2)

    执行器框架(Executor Framework Enhancements) `java.util.concurrent`包中的`ExecutorService`接口新增了`invokeAll()`和`invokeAny()`方法,用于执行多个任务并等待任意一个或所有任务完成。 以上只是Java ...

    python3.6.5参考手册 chm

    Policy Framework Provisional Policy with New Header API Other API Changes ftplib functools gc hmac http html imaplib inspect io itertools logging math mmap multiprocessing nntplib os...

Global site tag (gtag.js) - Google Analytics