`
he_wen
  • 浏览: 238735 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

利用Executor框架执行并行任务---之基础篇

阅读更多

    本文章的主题主要是在应用程序中寻找可以并行的任务,也就是把问题分成几个子问题。

 

一、讲解Executor和Executors以及ThreadPoolExecutor

 

public interface Executor{

void execute(Runnable command) ;
 
}

 

    执行已提交的 Runnable 任务的对象。

 

  Executors是一个工厂类,它创建不同类型的线程池,主要是根据不同的参数配置。

 

下面还是主要说明ThreadPoolExecutor类中的构造方法,如:

 

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler)  

 

具体流程如下:

1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

 

ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。

下面分析一下创建几个经典的线程池是怎么做到的:

1、一个固定大小的线程池

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 2、

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。

二、讲解ExecutorService

由于创建了线程池说明是一个非常复杂的程序,所以就需要对它进行管理,也就是说他们的生命周期。

ExecutorService是Executor的父类,它定义了一个灵活的执行策略以及对任务的批量处理,讲解这个接口之前,需要讲解一下Future接口和Callable接口。

public interface Future<V> 

 

 

 

boolean cancel(boolean mayInterruptIfRunning)
          试图取消对此任务的执行。
 V get()
          如有必要,等待计算完成,然后获取其结果。
 V get(long timeout, TimeUnit unit)
          如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
 boolean isCancelled()
          如果在任务正常完成前将其取消,则返回 true
 boolean isDone()
          如果任务已完成,则返回 true

 

 

 

 

 

 

 

由于是异步获得结果,重点说明get()方法。FutureTask实现了Future接口并且实现了Runnable接口,所以在执行任务的时候可以调用run方法,如:

   

     Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                public V call() throws InterruptedException {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = ft;
            cache.put(arg, ft);
            ft.run(); // call to c.compute happens here
        }
        try {
            return f.get();

 执行任务还有另外一个方法就是委托给线程:new Thread(ft).start()

 

下面讲解ExecutorService接口:

 

由于Executor接口一次只能执行一个任务对多个任务不支持,而且对任务执行中参数的异常不可以管理以及取消提交的任务都不支持,对于以上的需求,就扩展了ExecutorService接口。

 

 

 

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();    
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

      <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

自从Executor引进了Service后,该应用就能提供一种优雅的方式关闭应用程序,并且提供一些影响关闭状态的消息。

 

ExecutorService生命周期有三个状态:运行状态、正在关闭状态、终止状态,他是刚刚创建的时候就是运行状态。

 shutdow方法允许已经被提交的任务被执行但是不接收新的任务;shutdownNow方法是立即关闭:试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

 

invokeAll方法时批量的提交任务返回一个future集合,这个方法在下面的文章能应用。。。

0
8
分享到:
评论

相关推荐

    java concurrenty in practice pdf

    - **执行框架**:深入探讨了Java中的执行框架,包括`Executor`接口的使用。 - **发现可利用的并行性**:讲解了如何识别并利用程序中的并行性。 #### 第七章:取消与关闭 - **任务取消**:探讨了取消任务的方法和...

    Java concurrency in practice

    - **第6章:任务执行**:探讨了在线程中执行任务的方法,重点介绍了Executor框架,以及如何识别可利用的并行性。 - **第7章:取消与关闭**:讲解了任务取消机制和停止基于线程的服务的最佳实践。 《Java并发实践》...

    Java语言程序设计-进阶篇(原书第8版)

    - **线程池**:介绍线程池的概念、工作原理以及使用Executor框架管理线程。 #### 并行流 - **Stream API**:了解Java 8引入的Stream API如何简化集合操作。 - **并行流**:利用并行流加速大数据集的处理过程,同时...

    java_concurrency_in_practice

    首先介绍了如何在独立线程中执行任务,然后深入剖析了Executor框架,最后讨论了如何识别可利用的并行性。 - **第7章:任务取消与系统关闭** 本章讲解了如何取消正在执行的任务,并讨论了如何停止基于线程的服务。...

    JavaSE常见面试题-线程篇.pdf

    2. **线程与进程的区别**:线程是进程的子集,一个进程中可以有多个线程并行执行不同任务。进程间各自拥有独立的内存空间,而线程共享同一进程的内存空间,但每个线程有独立的栈内存用于存储本地变量。 3. **Java中...

    Java-jdk10-最新最全多线程编程实战指南-核心篇

    3. **并发工具类**:如Executor框架,ThreadPoolExecutor的配置与管理,Future和Callable接口,CyclicBarrier和Semaphore等同步工具类的使用。 4. **并发集合**:讲解ConcurrentHashMap、CopyOnWriteArrayList、...

    Spark(1.6.13)源码

    - **Executor**:运行在工作节点上的进程,负责执行Task任务并存储数据。 3. **源码解析** - **src**目录: - **core**:包含Spark核心模块,如RDD、调度器、存储系统和网络通信。 - **sql**:提供DataFrame和...

    Apache Spark源码剖析

    在源码中,我们能看到DAGScheduler负责将任务拆分成任务集(TaskSet),然后通过TaskScheduler接口将其分发到Executor上执行。Executor是运行在Worker节点上的进程,负责执行任务并管理内存。内存管理在Spark中至关...

    storm基础框架分析

    前期收到的问题:1、在Topology中我们可以指定spout、...本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架,并部分回答第一个问题。worker、executor、task的关系worker是一个进程.executor

    基于Spark框架的大数据局部频繁项集挖掘算法设计.zip

    总之,基于Spark框架的大数据局部频繁项集挖掘算法设计是一项复杂但极具价值的任务。通过合理利用Spark的并行计算能力,我们可以有效地挖掘大规模数据中的局部模式,为业务决策提供有价值的信息。这一领域的研究和...

    大数据处理平台Spark基础实践研究.pdf

    DAG调度器(DAGScheduler)将RDD图转换为一系列阶段(Stage)的有向无环图,并将这些阶段提交给任务调度器(TaskScheduler),最终由TaskScheduler将任务分配给Executor执行。 Spark提供了丰富的数据处理操作。例如...

    JDK_API_1_6

    - **Executor框架**:提供了一个高级抽象来管理线程池和执行任务。 - **原子变量类**:如`AtomicInteger`,`AtomicLong`等,用于实现无锁编程。 - **并发容器**:如`ConcurrentHashMap`,`ConcurrentLinkedQueue`等...

    SparkCore快速入门详解

    SparkCore是Apache Spark的核心组件,它是大数据处理框架Spark的基础,主要负责分布式计算任务的调度、内存管理和集群资源的协调。本篇文章将详细讲解SparkCore的基本概念、架构、核心功能以及如何进行快速入门。 ...

    光环大数据培训spark体系学习文档

    2. 并行度调整:合理设置Executor数量和核心数,平衡资源利用率和任务并发度。 3. 数据倾斜:识别和处理数据不均匀分布问题,如使用自定义分区器。 4. SQL性能调优:优化查询计划,使用广播变量和Join优化等方法。 ...

    SpringBoot版本的多线程下载文件,分段下载文件

    本篇将深入探讨如何利用SpringBoot实现多线程下载文件以及分段下载文件的技术。 首先,多线程下载文件是一种提高下载速度的方法,通过将大文件分成多个小部分,每个部分由一个单独的线程负责下载,从而充分利用多核...

    spark快速大数据分析

    Stage是DAG(有向无环图)任务分解后的执行阶段,每个Stage由一系列Task组成,Tasks在Executor上运行。理解Stage和Task的关系有助于优化作业执行。 此外,Spark提供了两种主要的编程接口:Scala和Java API,以及...

    JUNIT多线程测试

    然而,随着并发编程的普及,多线程测试成为了一个不可或缺的部分,因为我们需要确保我们的程序在并行执行时能够正常工作。本篇将深入探讨如何在JUnit中进行多线程测试,以及它的重要性。 首先,理解为什么需要进行...

    java线程入门

    Java线程是Java编程中的重要概念,它是多任务并行执行的基础。在现代计算机系统中,多线程已经成为实现高效程序设计的关键技术。本篇主要针对Java线程的入门知识进行详细阐述,帮助初学者理解并掌握Java线程的基本...

    spark考试练习题含答案.rar

    3. **Executor配置**:调整executor的数量、内存大小和CPU核心数,平衡资源利用率和任务并发度。 六、Spark的其他高级特性 1. **Spark MLlib**:提供机器学习库,包含多种算法,如分类、回归、聚类等。 2. **...

Global site tag (gtag.js) - Google Analytics