论坛首页 Java企业应用论坛

ForkJoinPool VS ExecutorService 实例分析

浏览 8560 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-11-05   最后修改:2011-11-05

ps:这是一篇本人翻译自javaworld上的一篇文章,感觉写的很好,不敢独享。而且网络上目前很少有将ForkJoinPool和ExecutorService对比方面的文章,遂花了两个晚上翻译了一下,深感翻译较之阅读很是困难,翻译不妥之处欢迎大家多多指正,同时也向很多默默无闻的翻译者致敬!

 

When to use ForkJoinPool VS ExecutorService?

 

By Madalin Ilie, JavaWorld.com, 10/04/11  译者:qiaoxueshi

原文:http://www.javaworld.com/javaworld/jw-10-2011/111004-jtip-recursion-in-java-7.html

 

       Java7带来的Fork/Join库扩展了现有的Java并发包,对多核系统上的硬件并行提供了支持。在这篇文章中,作者llie演示了在一个网络爬虫应用中,用Java7ForkJoinPool代替Java6ExecutorService在性能上带来的影响。

       网络爬虫,又叫“网络蜘蛛”,是搜索引擎的核心。这些程序先在网络上收集大量的页面数据,并保存到搜索引擎的数据库中,然后这些数据被编上索引,并经过一些算法的处理,最终形成更快,更精确的搜索结果。网络爬虫除了被大量的用在搜索优化,还用在诸如在网页上检验链接的存在或者查找并返回特定的数据的自动化任务上,比如说从网页上获得感兴趣的email地址。

       从体系结构上来说,爬虫虽然从功能和需求上来说相对简单,但大多数的的爬虫都是高性能,多线程的程序。而且编写爬虫不仅是拿来练手的有趣对象,也是来衡量多线程或者并发编程技术的一种手段。

       在这篇文章中,我会用两种方式实现一个爬虫:一种使用Java6ExecutorService,另一种就是使用Java7里的ForkJoinPool了。想实现下面的例子的话,需要安装(在本文撰写时最新)Java7 Update2,同样还需要第三方的解析HtmlHtmlParser包。

      

Java并发编程的两种方法

       Java5革命性的引入了并发包java.util.concurrent,ExecutorService就是其中一份子,它大大简化了Java平台上的线程处理。ExecutorService是一个提供了管理过程跟踪以及异步任务终止方法的Executor。在引入java.util.concurrent包之前,Java开发者在处理自己程序中的并发时要么依赖第三方包要么干脆自己写。

       Java7引入了Fork/Join,并是不为了替换现有的并发工具类或者与其比个高低,相反是对旧有的更新和完善。Fork/Join主要是为了满足在Java程序中实现分而治之算法和处理递归任务的需要。

       Fork/Join的逻辑非常简单:(1)把大的任务快分为多个小任务,成为Fork;(2)在独自的线程里处理每个任务,如果需要的话,可以把这些任务细分为更小的任务;(3)合并(Join)结果。

Fork/Join结构图(译者添加,来自这里

下面两个爬虫的简单实现,演示了Java6ExecutorServiceJava7里的ForkJoinPool的特性和功能。

 

       创建并考量网络爬虫

       我们的爬虫的任务是查找和跟踪链接。你可以用它来验证链接或者收集一些感兴趣的数据,比如说你可能会写个爬虫在网络上搜索安吉丽娜*朱莉或者布拉德·皮特的照片。

       下面是这个程序的主体架构:

l 一个暴露了与链接(links)简单交互的接口。

比如获得访问过链接的数量,增加待访问的链接到队列中,标记一个已访问的链接。

l 一个实现了上面接口的具体实现,同时也作为应用程序的入口。

l  一个线程/递归动作,它包含了检查一个链接是否被访问过的一个业务逻辑。如果没有被访问过,它将收集在该链接对应的页面中所有的连接,然后新建一个线程/递归任务,并提交到ExecutorService或者ForkjoinPool

l  一个ExectorService或者ForkjoinPool来处理等待中的任务。

(注:一个链接被标记为“已访问过”是指它对应的页面中的所有的链接都已经返回了。)

      

      除了比较在Java6Java7中使用并发工具开发的简单易用之外,我们还根据下面两个因素来考量程序的性能:

l 搜索范围:访问1500个独立的(不重复的)(distinct)链接花费的时间

l 处理强度:访问3000个非独立的(可以重复的)的(non-distinct)链接需要的秒数,类似说你的网络连接每秒能处理多少kb的数据。

虽然比较简单,这些指标至少一定程度上提供了在一个具体应用需求下,Java6Java7在并发程序上的性能对比。

 

Java6ExecutorService构建的爬虫

       Java6爬虫实现中,我们使用了容量为64的固定线程池(fixed-thread pool),它是通过调用Executors.newFixedThreadPool(int)这个工厂方法创建的。List1是具体的实现代码。

List 1WebCrawler6

 

package insidecoding.webcrawler;
 
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import insidecoding.webcrawler.net.LinkFinder;
import java.util.HashSet;
 
/**
 *
 * @author Madalin Ilie
 */
public class WebCrawler6 implements LinkHandler {
 
    private final Collection<String> visitedLinks = Collections.synchronizedSet(new HashSet<String>());
//    private final Collection<String> visitedLinks = Collections.synchronizedList(new ArrayList<String>());    
    private String url;
    private ExecutorService execService;
 
    public WebCrawler6(String startingURL, int maxThreads) {
        this.url = startingURL;
        execService = Executors.newFixedThreadPool(maxThreads);
    }
 
    @Override
    public void queueLink(String link) throws Exception {
        startNewThread(link);
    }
 
    @Override
    public int size() {
        return visitedLinks.size();
    }
 
    @Override
    public void addVisited(String s) {
        visitedLinks.add(s);
    }
 
    @Override
    public boolean visited(String s) {
        return visitedLinks.contains(s);
    }
 
    private void startNewThread(String link) throws Exception {
        execService.execute(new LinkFinder(link, this));
    }
 
    private void startCrawling() throws Exception {
        startNewThread(this.url);
    }
 
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Exception {
        new WebCrawler("http://www.javaworld.com", 64).startCrawling();
    }
}
 
 

 

       在上面WebCrawler6的构造方法中,我们创建了一个拥有64个线程的线程池。接着调用startCrawling方法开始程序,这个方法创建了首个线程并把它提交到了ExecutorService

       接下来,我们创建一个LinkHandler接口,它暴露了和URL打交道的工具方法。具体需求是这样的:(1)通过addVisited()方法把一个URL标记为“已访问”;(2)通过size()方法获知已经访问过URL的数目;(3)通过visited()方法确定某个URL是否已经被访问过;(4)通过queueLink()方法把一个新的URL添加到处理队列中。

 

       List 2 .LinkHandler接口

 package insidecoding.webcrawler;

 
/**
 *
 * @author Madalin Ilie
 */
public interface LinkHandler {
 
    /**
     * Places the link in the queue
     * @param link
     * @throws Exception
     */
    void queueLink(String link) throws Exception;
 
    /**
     * Returns the number of visited links
     * @return
     */
    int size();
 
    /**
     * Checks if the link was already visited
     * @param link
     * @return
     */
    boolean visited(String link);
 
    /**
     * Marks this link as visited
     * @param link
     */
    void addVisited(String link);
}
  

现在,在爬页面时,我们需要通过LinkFinder接口来启动剩余的线程,如List3所示,注意linkHandler.ququeLink(l)这行。

(译者注: 注意LinkFinder实现的是Runnable接口,因为上面程序中ExecutorServiceexecute()方法的参数是Runnable类型)

List 3 LinkFinder

 

package insidecoding.webcrawler.net;
 
import java.net.URL;
import org.htmlparser.Parser;
import org.htmlparser.filters.NodeClassFilter;
import org.htmlparser.tags.LinkTag;
import org.htmlparser.util.NodeList;
import insidecoding.webcrawler.LinkHandler;
 
/**
 *
 * @author Madalin Ilie
 */
public class LinkFinder implements Runnable {
 
    private String url;
    private LinkHandler linkHandler;
    /**
     * Used fot statistics
     */
    private static final long t0 = System.nanoTime();
 
    public LinkFinder(String url, LinkHandler handler) {
        this.url = url;
        this.linkHandler = handler;
    }
 
    @Override
    public void run() {
        getSimpleLinks(url);
    }
 
    private void getSimpleLinks(String url) {
        //if not already visited
        if (!linkHandler.visited(url)) {
            try {
                URL uriLink = new URL(url);
                Parser parser = new Parser(uriLink.openConnection());
                NodeList list = parser.extractAllNodesThatMatch(new NodeClassFilter(LinkTag.class));
                List<String> urls = new ArrayList<String>();
 
                 for (int i = 0; i < list.size(); i++) {
                    LinkTag extracted = (LinkTag) list.elementAt(i);
 
                    if (!extracted.getLink().isEmpty()
                            && !linkHandler.visited(extracted.getLink())) {
 
                        urls.add(extracted.getLink());
                    }
 
                }
                //we visited this url
                linkHandler.addVisited(url);
 
                if (linkHandler.size() == 1500) {
                    System.out.println("Time to visit 1500 distinct links = " + (System.nanoTime() - t0));                   
                }
 
                for (String l : urls) {
                    linkHandler.queueLink(l);
                }
 
             } catch (Exception e) {
                //ignore all errors for now
            }
        }
    }
}
 

 

 

       LinkFinder的逻辑很简单:(1)首先开始解析URL;(2)当得到某个链接对应页面中所有链接时,我们标记该链接为“已访问”(3)然后,我们通过调用queueLink()方法把得到的每一个新链接都入队,该方法实际上会创建一个新线程并提交到ExecutorService 如果在ExecutorService的线程池中有空闲的线程可用,将执行该线程;否则将其置入等待队列。当达到了访问1500个链接的时候,我们打印出统计数据,程序继续执行。

 

Java7ForkJoinPool构建的爬虫

Java7引入的Fork/Join框架实际上是“分而治之(Divide and Conquer)”算法的一个实现,其核心的ForkJoinPool执行分支的ForkJoinTask。在这个例子中,我们使用了由64个线程“支持”的ForkJoinPool。之所以用“支持”这个词,是因为ForkJoinPool比线程还要轻量。在Fork/Join框架中,少数的Thread可以存储大量的tasks

Java6的实现类似,我们首先用由64个线程支持的ForkJoinPool对象来初始化WebCrawler7

Listing 4. Java 7 LinkHandler 实现

 

package insidecoding.webcrawler7;
 
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ForkJoinPool;
import insidecoding.webcrawler7.net.LinkFinderAction;
import java.util.HashSet;
 
/**
 *
 * @author Madalin Ilie
 */
public class WebCrawler7 implements LinkHandler {
 
    private final Collection<String> visitedLinks = Collections.synchronizedSet(new HashSet<String>());
//    private final Collection<String> visitedLinks = Collections.synchronizedList(new ArrayList<>());
    private String url;
    private ForkJoinPool mainPool;
 
    public WebCrawler7(String startingURL, int maxThreads) {
        this.url = startingURL;
        mainPool = new ForkJoinPool(maxThreads);
    }
 
    private void startCrawling() {
        mainPool.invoke(new LinkFinderAction(this.url, this));
    }
 
    @Override
    public int size() {
        return visitedLinks.size();
    }
 
    @Override
    public void addVisited(String s) {
        visitedLinks.add(s);
    }
 
    @Override
    public boolean visited(String s) {
        return visitedLinks.contains(s);
    }
 
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Exception {
        new WebCrawler7("http://www.javaworld.com", 64).startCrawling();
    }
}

            

 

注意List4中的LinkHandler接口和List2中的Java6的实现几乎一样,只是少了queueLink()方法。最重要的方式是构造器和startCrawling(),在构造器中,我们创建了一个64个线程的ForkJoinPool(之所以是64,是因为在ForkjoinPoolJavaDoc中说明了线程的个数必须是2N次方)。Pool调用了一个新的LinkFinderActionaction又递归的调用更深层次的ForkJoinPool

(译者注:LinkFinderAction继承了RecursiveActionRecursiveActionForkJoinTask的子类,而且上面程序中,ForkJoinPoolinvoke()方法的参数是一个ForkJoinTask类型的变量。)

List5.LinkFinderAction

 

 

package insidecoding.webcrawler7.net;
 
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import org.htmlparser.Parser;
import org.htmlparser.filters.NodeClassFilter;
import org.htmlparser.tags.LinkTag;
import org.htmlparser.util.NodeList;
import insidecoding.webcrawler7.LinkHandler;
 
/**
 *
 * @author Madalin Ilie
 */
public class LinkFinderAction extends RecursiveAction {
 
    private String url;
    private LinkHandler cr;
    /**
     * Used for statistics
     */
    private static final long t0 = System.nanoTime();
 
    public LinkFinderAction(String url, LinkHandler cr) {
        this.url = url;
        this.cr = cr;
    }
 
    @Override
    public void compute() {
        if (!cr.visited(url)) {
            try {
                List<RecursiveAction> actions = new ArrayList<RecursiveAction>();
                URL uriLink = new URL(url);
                Parser parser = new Parser(uriLink.openConnection());
                NodeList list = parser.extractAllNodesThatMatch(new NodeClassFilter(LinkTag.class));
 
                for (int i = 0; i < list.size(); i++) {
                    LinkTag extracted = (LinkTag) list.elementAt(i);
 
                    if (!extracted.extractLink().isEmpty()
                            && !cr.visited(extracted.extractLink())) {
 
                        actions.add(new LinkFinderAction(extracted.extractLink(), cr));
                    }
                }
                cr.addVisited(url);
 
                if (cr.size() == 1500) {
                    System.out.println("Time for visit 1500 distinct links= " + (System.nanoTime() - t0));                   
                }
 
                //invoke recursively
                invokeAll(actions);
            } catch (Exception e) {
                //ignore 404, unknown protocol or other server errors
            }
        }
    }
}
 

 

应用程序的逻辑目前来看和Java6的实现是一只的,不同的是我们通过invokeAll()静态方法把链接submitForkJoinPool中,Java6中的实现是手动的把新链接通过LinkHandler类加入到ExecutorService的处理队列中。ForkJoinPool将会用最合理的方法使用这64个线程来调度任务。如果被提交的连接已经访问过了,递归的action就结束了(参见 if(!cr.visited(url) )。

比较搜索覆盖率:1500个单独的链接

       现在我们来比较一下吧。考虑到在计算时间时的相对准确性,先对JVM的预热一下:首先每个程序跑10遍,不计算结果,然后再跑10遍计算平均耗时。在运行代码时,我也调用了数次System.gc()手动的激活垃圾收集器。在运行程序时使用了JVM参数:-d64 –Xmx1512M,意思是设置为64位的平台,并且把最大对内存设置为1512M

       我运行的环境是Windows7 SP1 64位,Intel Core I5 @2.67 GHz with 4 G内存,64位的JDK 7 update 2

       Java 6版耗时 (10次的平均值):

              平均耗时:45,404,628,454 nonoseconds (纳秒)
              最快:43,989,514,242 nanoseconds
              最慢:47,077,714,098 nanoseconds
       Java7版耗时:
              平均耗时:45,269,306,013 nanoseconds
              最快:42,365,714,625 nanoseconds
              最慢:59,042,391,887 nanoseconds
 

       从数据可见,在搜索覆盖情况上,两种实现区别并不大。

比较处理强度:3000个不明显(non-distinct)的连接

为了测试第二种方案,我必须对两种实现做一些改动。在WebCrawler6WebCrawler7类中,我取消注释synchronized List,并注释掉synchronized Set。因为在不明显的连接的比较不需要Set,需要List

 

// private final Collection<String> visitedLinks = Collections.synchronizedSet(new HashSet<String>());
 
private final Collection<String> visitedLinks = Collections.synchronizedList(new ArrayList<String>());

        我还修改了visited()方法,让它总是返回false,因为我们这个测试方法不关心一个链接是不是已经访问过了。

@Override

    public boolean visited(String s) {
        return false;//visitedLinks.contains(s);
    }

        最后我修改了在LinkFinderLinkFinderAction类中的条件,把1500改为了3000

if (cr.size() == 3000) {

   System.out.println("Time for visit 3000 non-distinct links= " + (System.nanoTime() - t0));
}
 

       结果显示在衡量处理强度(这里是每秒钟程序处理的链接数)上Fork/Join运行的更好一些。

       Java6运行10次的平均耗时:

              平均耗时:48,510,285,967 nanoseconds

              最快:44,189,380,355 nanoseconds

              最慢:52,132,053,413 nanoseconds
 

       等于说是每秒处理61.8425个链接

      下面是Java7的表现:              

平均耗时: 31,343,446,584 nanoseconds
最快:30,533,600,312 nanoseconds 
最慢:33,308,851,937 nanoseconds

 

 相当于每秒处理95.7137个链接

 

从次可以看出基于ForkJoinPoolJava7版爬虫比Java61.5倍多,是一个很重要的性能提升。

基于ForkJoinPoolJava7版爬虫比Java61.5倍多,是一个很重要的性能提升。

下面的两张图显示了每个实现的CPU记录,可以看出几乎是一样的,即便是ForkJoinPool实现跑的更快一些。

1Java6ExecutorService实现的CPU使用图

2Java7ForkJoinPool实现的CPU使用图

结论:Fork/Join适合于递归程序编程

       虽然测试相对说比较简单,但是可以看出Fork/Join在解决涉及递归的问题时带来了不错的性能提升。因为在多核平台上递归是并行编程的基本,Fork/JoinJava并发一个重要增加。之前说过,它不会取代原来的java.util.concurrency 包,就像我刚才的演示,ExecutorService对于很多并发编程任务来说依然是很不错的解决方案,当高效的递归是处理强度的关键时,比如上面演示的,Fork/Join是更有效的方案。

 

 

资源:

1.Fork/Join官方API文档

 

 

  • 大小: 23.5 KB
  • 大小: 21.1 KB
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics