`

Spring - ThreadPoolTaskExecutor

 
阅读更多

Spring 擅长对组件的封装和集成, Spring-context对JDK的并发包做了功能增强。

    <bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    	<property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="10" />
        <property name="keepAliveSeconds" value="60" />
        <property name="queueCapacity" value="100" />
        <property name="allowCoreThreadTimeOut" value="false" />
    </bean>


以上asyncTaskExecutor,你可以注入到任何一个bean去执行,底层使用JDK的ThreadPoolTaskExecutor来管理线程,默认使用的是JDK的线程池.

	@Autowired
	@Qualifier("asyncTaskExecutor")
	private ThreadPoolTaskExecutor asyncTaskExecutor;

         FutureTask<String> f1 = (FutureTask<String>)asyncTaskExecutor.submit(c1);
         if (f1.isDone()) {
              String result = future.get();
         }

以上只是简单的应用,非常方便的开发,我们都不用去处理线程池的初始化,以及线程的管理。

Spring 还增加了对线程的监听,这样当现场成功的时候做一些处理,线程失败的时候做一些处理。

 ListenableFutureTask<String> f1 = (ListenableFutureTask<String>) asyncTaskExecutor.submitListenable(c1);
 f1.addCallback(new ListenableFutureCallback<String>() {

				@Override
				public void onSuccess(String result) {
					//TODO
				}

				@Override
				public void onFailure(Throwable t) {
					// TODO Auto-generated method stub
					t.printStackTrace();
				}
            	
            }); 


这里Spring非常机智的扩展FutureTask的protected方法

/*
 * Copyright 2002-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.util.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Extension of {@link FutureTask} that implements {@link ListenableFuture}.
 *
 * @author Arjen Poutsma
 * @since 4.0
 */
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {

	private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();


	/**
	 * Create a new {@code ListenableFutureTask} that will, upon running,
	 * execute the given {@link Callable}.
	 * @param callable the callable task
	 */
	public ListenableFutureTask(Callable<T> callable) {
		super(callable);
	}

	/**
	 * Create a {@code ListenableFutureTask} that will, upon running,
	 * execute the given {@link Runnable}, and arrange that {@link #get()}
	 * will return the given result on successful completion.
	 * @param runnable the runnable task
	 * @param result the result to return on successful completion
	 */
	public ListenableFutureTask(Runnable runnable, T result) {
		super(runnable, result);
	}


	@Override
	public void addCallback(ListenableFutureCallback<? super T> callback) {
		this.callbacks.addCallback(callback);
	}

	@Override
	protected final void done() {
		Throwable cause;
		try {
			T result = get();
			this.callbacks.success(result);
			return;
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return;
		}
		catch (ExecutionException ex) {
			cause = ex.getCause();
			if (cause == null) {
				cause = ex;
			}
		}
		catch (Throwable ex) {
			cause = ex;
		}
		this.callbacks.failure(cause);
	}

}



ListenableFutureCallbackRegistry做了一个简单的策略, 可以添加CallBack, 有两方法success 和 failure.

/*
 * Copyright 2002-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.util.concurrent;

import java.util.LinkedList;
import java.util.Queue;

import org.springframework.util.Assert;

/**
 * Registry for {@link ListenableFutureCallback} instances.
 *
 * <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
 *
 * @author Arjen Poutsma
 * @since 4.0
 */
public class ListenableFutureCallbackRegistry<T> {

	private final Queue<ListenableFutureCallback<? super T>> callbacks =
			new LinkedList<ListenableFutureCallback<? super T>>();

	private State state = State.NEW;

	private Object result = null;

	private final Object mutex = new Object();


	/**
	 * Adds the given callback to this registry.
	 * @param callback the callback to add
	 */
	@SuppressWarnings("unchecked")
	public void addCallback(ListenableFutureCallback<? super T> callback) {
		Assert.notNull(callback, "'callback' must not be null");

		synchronized (mutex) {
			switch (state) {
				case NEW:
					callbacks.add(callback);
					break;
				case SUCCESS:
					callback.onSuccess((T)result);
					break;
				case FAILURE:
					callback.onFailure((Throwable) result);
					break;
			}
		}
	}

	/**
	 * Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added
	 * callbacks with the given result
	 * @param result the result to trigger the callbacks with
	 */
	public void success(T result) {
		synchronized (mutex) {
			state = State.SUCCESS;
			this.result = result;

			while (!callbacks.isEmpty()) {
				callbacks.poll().onSuccess(result);
			}
		}
	}

	/**
	 * Triggers a {@link ListenableFutureCallback#onFailure(Throwable)} call on all added
	 * callbacks with the given {@code Throwable}.
	 * @param t the exception to trigger the callbacks with
	 */
	public void failure(Throwable t) {
		synchronized (mutex) {
			state = State.FAILURE;
			this.result = t;

			while (!callbacks.isEmpty()) {
				callbacks.poll().onFailure(t);
			}
		}
	}

	private enum State {NEW, SUCCESS, FAILURE}

}




这里使用LinkedQueue 来维护Callbacks. 使用add()添加,使用pool()方法来获取Queue的first head去执行success/failure,然后删除,使用LinkedQueue对于
经常insert/delete的队列可是非常快的,很多东西可以学习的(Spring source code).

其实我是不大喜欢自己去实现一套,Spring封装的挺好,跑题了,呵呵.
ListenableFutureTask 重写了JDK FutureTask.done().

   /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }


当任务结束的时候,FutureTask会调用这个方法。所以这个方法还能异步调用。
比如
   task1  execute 10s
  task2 execute 20s
在同时执行的情况下,task1's callback 方法会先返回, 在实际中,如果有10个任务,这个功能先返回执行完成的任务给客户,对于实时性要求比较高的应用,这是个不错的解决方案。

分享到:
评论

相关推荐

    spring-framework-4.3.2.RELEASE-dist

    8. **异步处理与任务调度**:Spring的TaskExecutor和ThreadPoolTaskExecutor接口提供了异步处理能力,而TaskScheduler则用于计划和执行周期性任务。 9. **WebFlux**:虽然在4.3.2版本中WebFlux尚未引入,但后续版本...

    spring-spring-framework-4.3.24.RELEASE.zip

    9. **任务调度**:Spring的Task模块提供了异步任务和定时任务的支持,包括SimpleAsyncTaskExecutor、ThreadPoolTaskExecutor以及ScheduledTaskRegistrar等。 10. **RESTful支持**:Spring MVC的Controller可以通过@...

    spring-batch分区处理示例

    3. **TaskExecutor**:Spring Batch支持多种类型的`TaskExecutor`,如SimpleAsyncTaskExecutor(基于线程池的异步执行)或ThreadPoolTaskExecutor(自定义线程池),用于并行执行子任务。 4. **Step实现**:每个...

    spring-boot-multithreading.zip_spring boot_多线程

    它主要通过Java的`java.util.concurrent`包以及Spring的`ThreadPoolTaskExecutor`或`AsyncConfigurer`接口来实现。`ThreadPoolTaskExecutor`是一个基于线程池的异步任务执行器,而`AsyncConfigurer`接口则用来配置...

    Spring线程池ThreadPoolTaskExecutor配置详情

    Spring线程池ThreadPoolTaskExecutor配置详情 Spring线程池ThreadPoolTaskExecutor是Spring Framework提供的一种线程池实现,用于管理和执行异步任务。本文将详细介绍ThreadPoolTaskExecutor的配置详情,并提供一...

    spring-framework-3.2.11.RELEASE 源码

    Spring还提供了大量的其他模块,如数据访问/集成(包括JDBC、ORM和OXM支持)、消息(例如JMS支持)、任务调度(如`ThreadPoolTaskExecutor`)和测试工具。这些模块在源码中都有清晰的结构和实现。 通过分析`spring-...

    spring定时任务关键jar包(齐全)

    在Spring中,你可以通过配置`ThreadPoolTaskExecutor`或`SimpleAsyncTaskExecutor`来实现异步任务的执行。 2. **Spring TaskScheduler**: `TaskScheduler`接口是Spring提供的另一个核心组件,用于计划周期性的任务...

    spring-android-1.0.0.M3

    5. **任务调度器(Scheduler)**:Spring for Android包含`TaskExecutor`接口和`ThreadPoolTaskExecutor`实现,提供了一种线程池管理的方式,帮助开发者优雅地执行异步任务,改善UI的响应性能,同时避免资源浪费。...

    spring线程池(同步、异步).docx

    3. `ThreadPoolTaskExecutor`:这是Spring最常用的线程池实现,它包装了`java.util.concurrent.ThreadPoolExecutor`,支持线程池配置,并且是异步执行任务的。 4. `ConcurrentTaskExecutor`:作为`Executor`接口的...

    Spring-quartz实现定时器(含代码).doc

    线程池是用于管理并发任务的核心组件,`ThreadPoolTaskExecutor` 是 Spring 提供的一个基于线程池的任务执行器。在配置中,`corePoolSize` 表示核心线程数(最小线程数),`maxPoolSize` 是最大线程数,`...

    spring3和mybatis3的初始探索

    - 为了性能考虑,可以使用Spring的`ThreadPoolTaskExecutor`进行SQL批处理。 - 使用Spring的`@Scope("prototype")`注解避免单例模式下Mapper接口的线程安全问题。 总的来说,Spring3和MyBatis3的集成使得开发者...

    spring-task:Spring TaskExecutor演示

    Spring提供了一个实现类ThreadPoolTaskExecutor,它基于Java的ThreadPoolExecutor,提供了线程池管理的功能。ThreadPoolTaskExecutor允许我们配置核心线程数、最大线程数、队列大小、超时时间等参数,以满足不同...

    spring-async-example:从2个REST端点异步获取数据并合并响应

    该解决方案的逻辑非常简单,该应用程序使用Spring的@EnableAsync来配置ThreadPoolTaskExecutor ,该ThreadPoolTaskExecutor将用于运行异步方法。 在此应用程序中,使用提供的此ThreadPoolTaskExecutor获取用户及其...

    maven spring 任务调度实列

    &lt;bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"&gt; &lt;!-- 配置线程池的核心线程数、最大线程数、线程存活时间等 --&gt; &lt;bean id="taskScheduler" class="org....

    spring-scheduling-sr:org.springframework.scheduling包源码阅读笔记,学习如何用好Java执行器(高级并发对象)-Spring Framework 3.2.10-源码包

    在Spring Framework 3.2.10中,`org.springframework.scheduling`包是关于Spring调度功能的核心组件。这个包提供了全面的异步任务处理和定时任务管理能力,使得开发者能够轻松地实现定时任务和后台任务的执行。本文...

    spring+quartz使用jar包

    1. **Spring与Quartz的整合**:Spring 提供了对Quartz的集成支持,通过`org.springframework.scheduling.quartz`包中的类,如`SchedulerFactoryBean`和`ThreadPoolTaskExecutor`,可以轻松地将Quartz纳入Spring的...

    spring的自带定时任务

    Spring提供了多种实现,如`ThreadPoolTaskScheduler`和`ThreadPoolTaskExecutor`,它们都基于线程池来提高性能和资源管理。 ### 2. 基于注解的定时任务 #### 2.1 `@Scheduled` Spring的`@Scheduled`注解可以用于...

    spring调度器用到的jar

    你可以实现这个接口或者使用Spring提供的实现,如SimpleAsyncTaskExecutor、ThreadPoolTaskExecutor等。TaskExecutor通常用于立即启动一个任务,而不关心何时或如何执行。 2. TaskScheduler:这是另一个接口,用于...

    Spring3.1 定时器配置所需jar包-文档-xml配置-class类-maven-IDEA

    在Spring 3.1中,可以通过`ThreadPoolTaskExecutor`和`ThreadPoolTaskScheduler`实现这两个接口,以创建线程池来处理定时任务。 为了配置Spring 3.1的定时器,我们需要添加相关的依赖到项目的Maven pom.xml文件中。...

    Spring基于线程池的定时任务线挰异常实践

    线程池的实现可以通过实现这个接口来完成,Spring提供了ThreadPoolTaskExecutor作为线程池的实现。通过配置ThreadPoolTaskExecutor,我们可以定制线程池的参数,如核心线程数、最大线程数、线程存活时间、工作队列...

Global site tag (gtag.js) - Google Analytics