`
he_wen
  • 浏览: 238631 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

利用Executor框架执行并行任务---之应用篇

阅读更多

本文章主要是通过基础篇应用Executor框架

 

一、如何寻找可并行的任务

 

为了使用Executor框架,你一定要把任务描述成Runnable,所以要设计这样的框架,主要是要做好确定任务的边界:一个好的任务边界是非常困难,因为涉及到任务的划分,每个任务有特定的运行时间,如果你划分的任务运行的时间不一致(运行长的时间的任务远远大于运行短时间的任务)时候,那么就会严重影响你应用程序的并发性,下面有具体的例子演示。

 

 下面的例子主要是说明应用程序如何进行并行化

 

二、举例子

 

下面是一个用户请求页面的例子:

     也就是页面的渲染:页面包含了文本文字和图片,下面有几个版本都是对这个需求进行并行化使得应用程序达到更好的性能。

 

版本一、使用单线程的思想开发这个应用程序如:

 

public class SingleThreadRenderer {
    void renderPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageData = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanForImageInfo(source))
            imageData.add(imageInfo.downloadImage());
        for (ImageData data : imageData)
            renderImage(data);
    }
}

 

出现的问题:由于下载文本文字十分的快,但是下载图片涉及到I/O瓶颈,在下载图片时候cup做了很少的事情。如此这样的设计就会导致用户等待很长的时候才能看到请求的页面。

 

解决方案:充分利用并发性来解决出现的问题

 

版本二:

 

用Future和ExecutorService解决这个问题,把下载图片的任务全部提交给Executor框架,返回一个Future,然后调用future.get()方法(等待call方法中返回的结果集)。

 

public class FutureRenderer {
    private final ExecutorService executor = ...;

    void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task =
                new Callable<List<ImageData>>() {
                    public List<ImageData> call() {
                        List<ImageData> result
                                = new ArrayList<ImageData>();
                        for (ImageInfo imageInfo : imageInfos)
                            result.add(imageInfo.downloadImage());
                        return result;
                    }
                };

        Future<List<ImageData>> future =  executor.submit(task);
        renderText(source);

        try {
            List<ImageData> imageData =  future.get();
            for (ImageData data : imageData)
                renderImage(data);
        } catch (InterruptedException e) {
            // Re-assert the thread's interrupted status
            Thread.currentThread().interrupt();
            // We don't need the result, so cancel the task too
            future.cancel(true);
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

 

 该类是使用了两个任务:一个渲染text,另外一个是下载图片,如果渲染text比下载图片快很多,那么这个划分的并发程序与单线程程序差不多,那么如何进一步寻找应用程序的并发呢?请看下一个版本

 

该版本的优点:用户请求能够很快响应出页面文本信息

缺点:    图片的下载,要全部下载完成后才能够渲染

 

记住一条原则:

 

   真正高性能并发的程序是多个独立的相同任务并发的执行也就是说多个任务处理的时间差不多

 

版本三:

  

  基于上个版本的劣势,那么最理想的做法应该是下载一张图片应该上马响应给客户。接下来该版本说明如何解决这个问题,讲这个问题之前要说明一下一个类和里一个接口。

 

CompletionService接口:

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。也就是说不需要等待全部的任务的结果完成后才返回,只要有任务的结果就会存放在一个队列中;如果需要任务的结果只需要调用take方法即可。

 

Future<V> poll()
          获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null
 Future<V> poll(long timeout, TimeUnit unit)
          获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则将等待指定的时间(如果有必要)。
 Future<V> submit(Callable<V> task)
          提交要执行的值返回任务,并返回表示挂起的任务结果的 Future。
 Future<V> submit(Runnable task, V result)
          提交要执行的 Runnable 任务,并返回一个表示任务完成的 Future,可以提取或轮询此任务。
 Future<V> take()
          获取并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。

 

下面讲一下实现该接口的ExecutorCompletionService类:

 

大概源码如:

 

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
/**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
 public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

 

 

由源码可以看出该类借助Future和一个阻塞队列LinkedBlockingQueue

 

从submit方法,传了一个QueueingFuture对象,该类的父类是FutureTask(它实现了Runnable接口),并且还是实现了done方法,表示任务完成后就把结果存放在阻塞队列中。

 

从源码中可以看出Executor使用了多个ExecutorCompletionService,即提交了多个任务。这个实现可以在submit中体会得到。

 

由于ExecutorCompletionService类可以对多个任务提交,不必要等待整个任务返回结果才能得到结果,只需要某个任务完成就可以利用take方法,获取结果。

 

本版本我了充分利用并发性能,我们为每张图片下载使用一个线程,放在Executor框架中执行即线程池。

 

代码:

 

public class Renderer {
    private final ExecutorService executor;

    Renderer(ExecutorService executor) { this.executor = executor; }

    void renderPage(CharSequence source) {
        final List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService =
            new ExecutorCompletionService<ImageData>(executor);
        for (final ImageInfo imageInfo : info)
            completionService.submit(new Callable<ImageData>() {
                 public ImageData call() {
                     return imageInfo.downloadImage();
                 }
            });

        renderText(source);

        try {
            for (int t = 0, n =  info.size(); t < n;  t++) {
take方法相当于在阻塞队列拿去任务执行完的结果
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

 从上面的代码可以看出多个ExecutorCompletionService共享一个Executor。

 

三、另外一个例子---旅游公司标价

 

需求说明:

 

  想象一下一个旅游的网站,用户输入旅游数据和条件,要显示标价---不同的航空公司、旅店和租赁公司。得到公司的标价可能需要调用一个Web服务、访问数据库、执行EDI事务或者一些其他的机制。所以用户要得到相应的数据,就需要设计好应用程序才能得到及时的响应。应该对于那些不能及时响应的公司(设定一定的时间),那么页面应该忽略或者用一个默认的数据代替。

 

下面是其代码:

Page renderPageWithAd() throws InterruptedException {
    long endNanos = System.nanoTime() + TIME_BUDGET;
    Future<Ad> f = exec.submit(new FetchAdTask());
    // Render the page while waiting for the ad
    Page page = renderPageBody();
    Ad ad;
    try {
        // Only wait for the remaining time budget
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, NANOSECONDS);
    } catch (ExecutionException e) {
        ad = DEFAULT_AD;
    } catch (TimeoutException e) {
        ad = DEFAULT_AD;
        f.cancel(true);
    }
    page.setAd(ad);
    return page;
}


 

 

从一个公司获取标价与从另外一个公司获取标价是互不影响,所以为了使该应用程序得到更好的性能,则获取所有公司的标价放在Executor池中批量执行(获取每个公司的标价就使用一个线程)。

 

 

也就是说很容易的创建n个任务,把他们提交给线程池,获得Futures,从Future获取结果时加一个时间限制。

 

下面使用ExecutorService中的invokeAll方法(带有时间限制),

private class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;
    ...
    public TravelQuote call() throws Exception {
        return company.solicitQuote(travelInfo);
    }
}

public List<TravelQuote> getRankedTravelQuotes(
        TravelInfo travelInfo, Set<TravelCompany> companies,
        Comparator<TravelQuote> ranking, long time, TimeUnit unit)
        throws InterruptedException {
    List<QuoteTask> tasks = new ArrayList<QuoteTask>();
    for (TravelCompany company : companies)
        tasks.add(new QuoteTask(company, travelInfo));

    List<Future<TravelQuote>> futures =
        exec.invokeAll(tasks, time, unit);

    List<TravelQuote> quotes =
        new ArrayList<TravelQuote>(tasks.size());
    Iterator<QuoteTask> taskIter = tasks.iterator();
    for (Future<TravelQuote> f : futures) {
        QuoteTask task = taskIter.next();
        try {
            quotes.add(f.get());
        } catch (ExecutionException e) {
            quotes.add(task.getFailureQuote(e.getCause()));
        } catch (CancellationException e) {
            quotes.add(task.getTimeoutQuote(e));
        }
    }

    Collections.sort(quotes, ranking);
    return quotes;
}


 

0
8
分享到:
评论

相关推荐

    Python库 | qcg_pilotjob_executor_api-0.12.3-py3-none-any.whl

    QCG(Quantum Computing Group)Pilotjob是一种灵活的、基于工作流的计算模型,特别适合处理大规模并行任务和复杂的计算流程。它允许用户提交大量的短期任务(称为"pilot jobs")到集群或网格环境,这些任务可以动态...

    掌握并发的钥匙:Java Executor框架深度解析

    Executor框架提供了一种执行异步任务的方法,它允许开发者将任务提交给线程池,而无需直接管理线程的生命周期。这一框架的核心组件包括: - **Executor**:一个接口,定义了执行提交的Runnable任务的方法。 - **...

    go-xxl-executor-master

    "Go-XXL Executor Master" 是一个基于 Go 语言实现的分布式任务执行框架,主要针对大规模并发任务处理场景。从项目名称我们可以推测,它可能是借鉴了 Java 的 XXL-Job 框架,并且进行了 Go 语言的移植或改造,以提供...

    Go-cr是一个以最大并发运行您任务的jobexecutor

    Go-cr 是一个专门为实现最大并发执行任务而设计的 Job Executor,它基于 Go 语言构建,充分利用了 Go 的并发特性,如 Goroutines 和 Channels,来高效地处理并行任务。在 IT 领域,特别是在服务器端应用开发中,对...

    【Spark调优篇01】Spark之常规性能调优1

    例如,4个Executor、每个Executor有2个CPU核心时,可并行执行8个任务。若增加Executor数量到8个,可并行执行的任务数翻倍。 2. 增加每个Executor的CPU核心数同样能提升并行度。例如,4个Executor、每个Executor有2个...

    TymeacAND:适用于 Android 的任务并行引擎-开源

    - **执行任务**:使用TymeacAND提供的Executor服务,可以方便地启动并行任务,系统会自动处理任务的拆分和合并。 4. **文档资源** - `Index.html`:这可能是项目提供的官方文档或用户指南,包含详细的使用说明、...

    分布式任务调度平台XXL-JOB应用.rar

    分布式任务调度平台XXL-JOB是一款广泛应用于企业级服务中的高效、易用、稳定的任务调度框架,它能够帮助开发者实现复杂的工作流控制,为大型分布式系统提供强大的定时任务管理能力。XXL-JOB以其轻量级的设计、丰富的...

    第20章 Part5 并发工具执行器与线程池.pdf

    而ForkJoinPool是专为并行计算而设计,它采用了一种分而治之的策略,可以高效地利用CPU资源,适合执行可以分解为更小任务的复杂任务。 在Java 5引入的Executor框架的基础上,Executors类提供了一些便捷的工厂方法,...

    解决任意的多线程并行、串行、阻塞、依赖、回调的并行框架

    然而,如何有效地管理和协调这些并行任务,以确保系统的稳定性和性能,就成为了一个挑战。 阻塞和非阻塞是线程交互的两种模式。阻塞线程意味着一个线程在等待某个资源或事件发生时会暂停执行,直到该资源可用或事件...

    Apache Spark常见面试题

    ### Apache Spark 常见面试题解析 #### 一、Spark代码执行位置分析 ...每个Executor可以并行执行多个任务,并具有自己的内存空间。 这种设计确保了Spark能够高效地利用集群资源,并支持大规模数据处理的需求。

    xxl-job-admin-2.3.0-SNAPSHOT

    5. **任务执行器(Executor)**:XXL-JOB的核心组件之一,负责实际的任务执行,可以部署在多个节点上,实现任务的分布式执行。 6. **调度中心(Admin)**:XXL-JOB的另一个核心组件,负责任务的调度,包括任务的...

    spark个人总结.doc

    Spark是大数据处理领域的一款高效、快速且通用的计算框架,其强大的并行处理能力使得它在数据处理中占据重要地位。以下是对Spark性能调优和个人总结的一些关键知识点: 1. **资源分配**: - 在Spark应用中,通过`...

    02-Spark性能调优与故障处理.doc

    - **增加Executor数量**:增加Executor数量能提高task的并行执行度,从而加速计算。 - **增加Executor核心数**:同样,增加每个Executor的核心数也能提高并行度,但需注意避免资源浪费。 3. **Executor内存调整**...

    Spring3.0 mvc 定时器及多线程任务demo

    在Spring 3.0 MVC框架中,定时器和多线程任务是两个关键概念,用于构建高效、自动化的Web应用程序。下面将详细讲解这两个概念及其在实际应用中的使用。 一、Spring 3.0 MVC定时器 在Spring 3.0中,我们可以使用...

    Quartz如何实现判断某个任务是否正在运行,在项目中用到的,已经测试过了

    Quartz是一款广泛应用于Java开发中的开源任务调度框架,它提供了强大的定时任务管理功能,支持复杂的调度策略和分布式部署。在实际项目中,有时我们需要判断一个Quartz任务是否正在运行,以便进行相应的操作,如避免...

    XXL-JOB XXL-JOB XXL-JOB

    2. 执行器(Executor):执行器是任务的实际运行载体,每个服务实例都可以注册为一个执行器,负责接收调度中心的任务指令并执行。执行器通过心跳机制与调度中心保持通信,报告状态。 二、任务调度机制 XXL-JOB采用...

    springboot集成xxl-job #资源达人分享计划#

    利用任务分片进行并行处理,提高效率;确保执行器的健康检查,防止任务执行失败。 总结来说,`springboot集成xxl-job`是实现分布式定时任务的一种高效方案,它结合了`SpringBoot`的简洁易用和`xxl-job`的调度能力,...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    - 运行应用:使用Spark Shell或提交Spark应用程序到集群执行。 6. 开发与交互: - 使用Scala、Java、Python或R语言编写Spark应用。 - 使用SparkSubmit工具提交应用程序到集群。 - Spark Shell提供了一个交互式...

    Spark部署中的关键问题解决之道--许鹏.pdf

    在同一个Executor中,并行执行的任务数取决于Worker上报给Master的CPU核心数。合理配置任务和线程的映射,能够有效提升资源利用率和任务执行效率。 在实际部署中,可能会遇到在同一台机器上启动多个Worker的需求。...

    elastic-job-lite-console-2.1.4.tar.gz

    - **任务拆分**:支持将大任务拆分为多个小任务并行执行,提高执行效率。 - **简单易用的API**:提供Java API,易于集成到现有的Spring或Spring Boot项目中。 2. **Elastic-Job Lite架构** - **作业服务器(Job ...

Global site tag (gtag.js) - Google Analytics