`
DavyJones2010
  • 浏览: 154240 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Java Concurrency: ExecutorService Introduction

阅读更多

1) Difference between Callable and Runnable

     The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

 

2) ExecutorService Intro

    "Executors are the Java 5 name for the concept of thread pools. The 'Executor' naming is due to the fact that there is no guarantee that the underlying implementation is actually a pool; an executor may be single-threaded or even synchronous." (From the official Spring documentation)

    Example:

package edu.xmu.thread;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Demonstrates submitting {@link CalculationCallable} tasks to a fixed-size
 * thread pool and summing their results through the returned futures.
 */
public class ExecutorTest
{
    /** Number of worker threads in the demo pool. */
    private static final int POOL_SIZE = 5;

    /**
     * Submits every task to the pool, waits for the results in submission
     * order, and prints the total together with timestamps for each phase.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        List<CalculationCallable> subThreadList = initSubThreadList();
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(
                subThreadList.size());
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);

        try
        {
            System.out.println("Start submit callable. Timestamp: "
                    + System.currentTimeMillis());

            for (CalculationCallable calThread : subThreadList)
            {
                futures.add(pool.submit(calThread));
            }

            System.out
                    .println("Finished submit callable. Start getFutures. Timestamp: "
                            + System.currentTimeMillis());

            int sum = sumResults(futures);

            System.out.println("Finished getFutures. sum: " + sum
                    + ", Timestamp: " + System.currentTimeMillis());
        }
        finally
        {
            // Release the pool even if something above throws unexpectedly.
            pool.shutdown();
        }
    }

    /**
     * Waits for each future in list order and adds up the results.
     * A task that failed is reported on stderr and contributes nothing to
     * the total. If the calling thread is interrupted while waiting, the
     * interrupt flag is restored and the partial total is returned.
     *
     * @param futures futures to wait for
     * @return sum of the results that could be obtained
     */
    private static int sumResults(List<Future<Integer>> futures)
    {
        int sum = 0;
        for (Future<Integer> future : futures)
        {
            try
            {
                sum += future.get();
            }
            catch (InterruptedException e)
            {
                // Keep the interruption visible to the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }
        return sum;
    }

    /**
     * Builds the five demo tasks: one summing four 200s and four summing
     * four 300s each (grand total 5600).
     *
     * @return the tasks in submission order
     */
    private static List<CalculationCallable> initSubThreadList()
    {
        List<CalculationCallable> subThreadList = new ArrayList<CalculationCallable>();
        subThreadList.add(new CalculationCallable(Arrays.asList(
                200, 200, 200, 200)));
        for (int i = 0; i < 4; i++)
        {
            subThreadList.add(new CalculationCallable(Arrays.asList(
                    300, 300, 300, 300)));
        }
        return subThreadList;
    }
}

/**
 * A task that pauses for a fixed delay and then returns the sum of a list
 * of integers. The delay simulates a long-running computation.
 */
class CalculationCallable implements Callable<Integer>
{
    /** Delay applied by the single-argument constructor, in milliseconds. */
    private static final long DEFAULT_DELAY_MILLIS = 2000L;

    private final List<Integer> numList;
    private final long delayMillis;

    /**
     * Creates a task that sleeps for 2000 ms before summing.
     *
     * @param numList numbers to add up
     */
    public CalculationCallable(List<Integer> numList)
    {
        this(numList, DEFAULT_DELAY_MILLIS);
    }

    /**
     * Creates a task with a caller-chosen delay.
     *
     * @param numList     numbers to add up
     * @param delayMillis pause before summing, in milliseconds; must not be negative
     * @throws IllegalArgumentException if {@code delayMillis} is negative
     */
    public CalculationCallable(List<Integer> numList, long delayMillis)
    {
        super();
        if (delayMillis < 0)
        {
            throw new IllegalArgumentException(
                    "delayMillis must not be negative: " + delayMillis);
        }
        this.numList = numList;
        this.delayMillis = delayMillis;
    }

    /**
     * Sleeps for the configured delay, then sums the numbers.
     *
     * @return the sum of all elements of the list
     * @throws InterruptedException if the executing thread is interrupted
     *                              during the delay; no result is produced
     */
    @Override
    public Integer call() throws Exception
    {
        if (delayMillis > 0)
        {
            // Let an interruption abort the task instead of hiding it and
            // returning a result as if nothing had happened.
            Thread.sleep(delayMillis);
        }

        int sum = 0;
        for (int value : numList)
        {
            sum += value;
        }
        return sum;
    }
}

    Output:

Start submit callable. Timestamp: 1400053633686
Finished submit callable. Start getFutures. Timestamp: 1400053633687
Finished getFutures. sum: 5600, Timestamp: 1400053635687

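// submit(Callable) returns immediately: it hands the task to the pool, which starts running it on a worker thread.
// That is why submitting all five tasks takes only about 1 ms.
// future.get() does not start anything; it blocks the calling thread until that task's result is available.
// The five tasks run in parallel on the five threads of the ExecutorService, so the whole get() loop finishes after about 2 seconds.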

    Attention:

        1> There is a disadvantage: If the first task takes a long time to compute and all the other tasks end before the first, the current thread cannot compute sum before the first task ends.

        2> Java has the solution for this, CompletionService.

    

    Extensions:

    /**
     * Runs all tasks through {@code invokeAll}, then sums their results and
     * prints the total together with timestamps for each phase.
     *
     * @param args ignored
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the tasks
     */
    public static void main(String[] args) throws InterruptedException
    {
        List<CalculationCallable> subThreadList = initSubThreadList();
        ExecutorService pool = Executors.newFixedThreadPool(5);

        try
        {
            System.out.println("Start submit callable. Timestamp: "
                    + System.currentTimeMillis());

            List<Future<Integer>> futures = pool.invokeAll(subThreadList);

            System.out
                    .println("Finished submit callable. Start getFutures. Timestamp: "
                            + System.currentTimeMillis());

            int sum = 0;
            for (Future<Integer> future : futures)
            {
                try
                {
                    sum += future.get();
                }
                catch (ExecutionException e)
                {
                    // A failed task is reported and contributes nothing to the total.
                    e.printStackTrace();
                }
            }

            System.out.println("Finished getFutures. sum: " + sum
                    + ", Timestamp: " + System.currentTimeMillis());
        }
        finally
        {
            // Also runs when invokeAll() is interrupted, so the pool is always released.
            pool.shutdown();
        }
    }

    Output:

Start submit callable. Timestamp: 1400661124891
Finished submit callable. Start getFutures. Timestamp: 1400661126892
Finished getFutures. sum: 5600, Timestamp: 1400661126892
// invokeAll(Collection<? extends Callable<T>>) submits all tasks and blocks until every one of them has completed.
// The 2-second wait therefore happens inside invokeAll(), and the subsequent future.get() calls return immediately.

    The source code snippet for invokeAll(Collection<? extends Callable<T>>)

// Excerpt: walk over every future and wait for those that are not done yet.
for (Future<T> f : futures) {
  if (!f.isDone()) {
    try {
      // The returned value is discarded; get() is called only to wait.
      f.get();
    } catch (CancellationException ignore) {
      // Deliberately empty: a cancelled task does not abort the loop.
    } catch (ExecutionException ignore) {
      // Deliberately empty: a failed task does not abort the loop.
    }
  }
}

 

 

3) CompletionService Intro

    Example:

/**
 * Submits all tasks through a {@code CompletionService}, collects one result
 * per submitted task as each becomes available, and prints the total
 * together with timestamps for each phase.
 *
 * @param args ignored
 */
public static void main(String[] args)
{
    List<CalculationCallable> subThreadList = initSubThreadList();

    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(
            threadPool);

    try
    {
        System.out.println("Start submit callable. Timestamp: "
                + System.currentTimeMillis());

        for (CalculationCallable calThread : subThreadList)
        {
            pool.submit(calThread);
        }

        System.out
                .println("Finished submit callable. Start getFutures. Timestamp: "
                        + System.currentTimeMillis());

        // Take exactly as many results as tasks were submitted: taking more
        // would block forever, taking fewer would drop results.
        int pending = subThreadList.size();
        int sum = 0;
        for (int i = 0; i < pending; i++)
        {
            try
            {
                sum += pool.take().get();
            }
            catch (InterruptedException e)
            {
                // Keep the interruption visible to the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            catch (ExecutionException e)
            {
                // A failed task is reported and contributes nothing to the total.
                e.printStackTrace();
            }
        }

        System.out.println("Finished getFutures. sum: " + sum
                + ", Timestamp: " + System.currentTimeMillis());
    }
    finally
    {
        // Release the pool even if something above throws unexpectedly.
        threadPool.shutdown();
    }
}

    Output:

Start submit callable. Timestamp: 1400054835960
Finished submit callable. Start getFutures. Timestamp: 1400054835961
Finished getFutures. sum: 5600, Timestamp: 1400054837961

// A CompletionService is built on top of an ExecutorService.
// It hands back results in the order in which the tasks complete, so you don't have to keep your own collection of Futures.

 

    

 

Reference Links:

1) http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-7-executors-and-thread-pools.html

2) http://docs.spring.io/spring/docs/3.0.x/spring-framework-reference/html/scheduling.html

3) http://stackoverflow.com/questions/141284/the-difference-between-the-runnable-and-callable-interfaces-in-java

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics