`
woodding2008
  • 浏览: 289608 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java多线程之ExecutorService.invokeAny()

 
阅读更多

方法描述:

 

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do. Upon normal or exceptional return,
     * tasks that have not completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param <T> the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks or any element task
     *         subject to execution is {@code null}
     * @throws IllegalArgumentException if tasks is empty
     * @throws ExecutionException if no task successfully completes
     * @throws RejectedExecutionException if tasks cannot be scheduled
     *         for execution
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

  方法说明:使用invokeAny()方法,当任意一个线程找到结果之后,立刻终结【中断】所有线程。

 

执行结果可能会有以下三种情况:

  • 任务都执行成功,使用过第一个任务返回的结果。
  • 任务都失败了,抛出Exception,invokeAny方法将抛出ExecutionException。
  • 部分任务失败了,会使用第一个成功的任务返回的结果。

玩具代码:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

//使用invokeAny()方法,当任意一个线程找到结果之后,立刻终结【中断】所有线程
public class UserValidator {
	
	private String name;
	public UserValidator(String name){
		this.name = name;
	}
	
	//Mock验证过程,有可能会抛出异常
	public boolean validate(String name ,String password){
		Random random = new Random();
		try{
			long duration = (long)(Math.random()*10);
			System.out.printf("Validator %s: Validating a user during %d seconds \n", this.name,duration);
			TimeUnit.SECONDS.sleep(duration);
		}catch(InterruptedException e){
			e.printStackTrace();
			return false;
		}
		return random.nextBoolean();
	}
	
	public String getName(){
		return name;
	}
	
	public static class TaskValidator implements Callable<String>{
		
		private UserValidator validator;
		private String name;
		private String password;
		
		public TaskValidator(UserValidator validator,String name,String password){
			this.validator = validator;
			this.name = name;
			this.password = password;
		}

		@Override
		public String call() throws Exception {
			
			if(!validator.validate(name, password)){
				System.out.printf("%s : The user has not been found \n", validator.getName());
				throw new Exception("Error validating user");
			}
			
			System.out.printf("%s: The user has been fount \n", validator.getName());
			
			return validator.getName();
		}
		
	}
	

	public static void main(String[] args) {
		
		String username = "test";
		String password = "test";
		UserValidator ldapValidator = new UserValidator("LDAP");
		UserValidator dbValidator = new UserValidator("DataBase");
		
		TaskValidator ldapTask = new TaskValidator(ldapValidator,username,password);
		TaskValidator dbTask = new TaskValidator(dbValidator,username,password);
		
		List<TaskValidator> taskList = new ArrayList<>();
		taskList.add(ldapTask);
		taskList.add(dbTask);
		
		ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
		String result ;
		try{
			//只执行成功其中任何一个即可,会中断其他线程
			result = executor.invokeAny(taskList);
			System.out.printf("Main: Result : %s \n", result);
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(ExecutionException e){
			e.printStackTrace();
		}
		
		executor.shutdown();
		System.out.printf("Main: End of the Execution \n");
		
	}

}

 

 AbstractExecutorService.invokeAny()实现逻辑:

 

    /**
     * the main mechanics of invokeAny.
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

 

 

分享到:
评论

相关推荐

    java多线程并发

    ### Java多线程并发知识点详解 #### 一、Java多线程并发简介 在现代软件开发中,特别是在Java这样的主流编程语言中,多线程并发技术是提高程序执行效率、优化资源利用的关键手段之一。本篇文章将深入探讨Java中的...

    Executor,Executors,ExecutorService比较.docx

    - **invokeAny**(Collection&lt;Callable&lt;T&gt;&gt; tasks)**:在给定的任务列表中,选择并执行一个任务,返回其结果。 - **invokeAll**(Collection&lt;Callable&lt;T&gt;&gt; tasks)**:执行所有的任务,返回一个包含每个任务结果的`...

    JavaThreaddemo_DEMO_tidecme_线程池Java_源码.zip

    Java线程池是Java并发编程中的重要组成部分,它在多线程编程中扮演着至关重要的角色,有效地管理和调度线程资源,提高系统性能并降低资源消耗。本资料"JavaThreaddemo_DEMO_tidecme_线程池Java_源码.zip"包含了关于...

    Java并发程序设计教程

    - `invokeAny(Collection&lt;Callable&lt;T&gt;&gt; tasks)`:执行一组可返回结果的任务,并返回最先完成的那个任务的结果。 - `shutdown()`:不再接受新任务,但继续执行已提交的任务。 - `isTerminated()`:判断所有任务...

    Java并发编程中使用Executors类创建和管理线程的用法

    - `invokeAny(Collection&lt;Callable&lt;T&gt;&gt; tasks)`:在所有任务中选择第一个完成的任务并返回其结果,其他任务会被取消。 在实际应用中,我们可以创建一个简单的`Runnable` 类实现,如`MyThread`,并使用`...

    Java并发编程实战(中文版带详细目录).pdf

    - **方法**:submit、invokeAll、invokeAny等。 以上内容是基于《Java并发编程实战》中文版所涉及的关键知识点进行的详细总结。通过学习这些概念和技术,可以深入理解Java并发编程的核心原理,并能够在实际开发中...

    Executor框架使用详解

    Executor框架是Java并发编程的核心组件,它在Java 5中被引入,极大地简化了多线程编程。这个框架是基于`java.util.concurrent`包中的接口和类构建的,旨在提供线程池服务、任务调度以及并发执行任务的能力。Executor...

    3_AbstractExecutorService源码阅读1

    `AbstractExecutorService`是Java并发编程中的一个关键抽象类,它是`ExecutorService...总的来说,`AbstractExecutorService`是Java并发编程中一个重要的抽象基础,通过理解和使用它可以更高效地管理和控制多线程任务。

    AbrastractExecutorService

    例如,它默认使用了包内提供的`FutureTask`类来实现`submit`、`invokeAny`和`invokeAll`等方法。 ### 方法详解 #### 构造方法 - **AbstractExecutorService()** - **描述**:构造一个新的`...

Global site tag (gtag.js) - Google Analytics