Spring 擅长对组件的封装和集成, Spring-context对JDK的并发包做了功能增强。
<bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="10" />
<property name="keepAliveSeconds" value="60" />
<property name="queueCapacity" value="100" />
<property name="allowCoreThreadTimeOut" value="false" />
</bean>
以上asyncTaskExecutor,你可以注入到任何一个bean去执行,底层使用JDK的ThreadPoolTaskExecutor来管理线程,默认使用的是JDK的线程池.
@Autowired
@Qualifier("asyncTaskExecutor")
private ThreadPoolTaskExecutor asyncTaskExecutor;
FutureTask<String> f1 = (FutureTask<String>)asyncTaskExecutor.submit(c1);
if (f1.isDone()) {
String result = future.get();
}
以上只是简单的应用,非常方便的开发,我们都不用去处理线程池的初始化,以及线程的管理。
Spring 还增加了对线程的监听,这样当现场成功的时候做一些处理,线程失败的时候做一些处理。
ListenableFutureTask<String> f1 = (ListenableFutureTask<String>) asyncTaskExecutor.submitListenable(c1);
f1.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
//TODO
}
@Override
public void onFailure(Throwable t) {
// TODO Auto-generated method stub
t.printStackTrace();
}
});
这里Spring非常机智的扩展FutureTask的protected方法
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* Extension of {@link FutureTask} that implements {@link ListenableFuture}.
*
* @author Arjen Poutsma
* @since 4.0
*/
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> {
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();
/**
* Create a new {@code ListenableFutureTask} that will, upon running,
* execute the given {@link Callable}.
* @param callable the callable task
*/
public ListenableFutureTask(Callable<T> callable) {
super(callable);
}
/**
* Create a {@code ListenableFutureTask} that will, upon running,
* execute the given {@link Runnable}, and arrange that {@link #get()}
* will return the given result on successful completion.
* @param runnable the runnable task
* @param result the result to return on successful completion
*/
public ListenableFutureTask(Runnable runnable, T result) {
super(runnable, result);
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}
@Override
protected final void done() {
Throwable cause;
try {
T result = get();
this.callbacks.success(result);
return;
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
catch (ExecutionException ex) {
cause = ex.getCause();
if (cause == null) {
cause = ex;
}
}
catch (Throwable ex) {
cause = ex;
}
this.callbacks.failure(cause);
}
}
ListenableFutureCallbackRegistry做了一个简单的策略, 可以添加CallBack, 有两方法success 和 failure.
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util.concurrent;
import java.util.LinkedList;
import java.util.Queue;
import org.springframework.util.Assert;
/**
* Registry for {@link ListenableFutureCallback} instances.
*
* <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
*
* @author Arjen Poutsma
* @since 4.0
*/
public class ListenableFutureCallbackRegistry<T> {
private final Queue<ListenableFutureCallback<? super T>> callbacks =
new LinkedList<ListenableFutureCallback<? super T>>();
private State state = State.NEW;
private Object result = null;
private final Object mutex = new Object();
/**
* Adds the given callback to this registry.
* @param callback the callback to add
*/
@SuppressWarnings("unchecked")
public void addCallback(ListenableFutureCallback<? super T> callback) {
Assert.notNull(callback, "'callback' must not be null");
synchronized (mutex) {
switch (state) {
case NEW:
callbacks.add(callback);
break;
case SUCCESS:
callback.onSuccess((T)result);
break;
case FAILURE:
callback.onFailure((Throwable) result);
break;
}
}
}
/**
* Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added
* callbacks with the given result
* @param result the result to trigger the callbacks with
*/
public void success(T result) {
synchronized (mutex) {
state = State.SUCCESS;
this.result = result;
while (!callbacks.isEmpty()) {
callbacks.poll().onSuccess(result);
}
}
}
/**
* Triggers a {@link ListenableFutureCallback#onFailure(Throwable)} call on all added
* callbacks with the given {@code Throwable}.
* @param t the exception to trigger the callbacks with
*/
public void failure(Throwable t) {
synchronized (mutex) {
state = State.FAILURE;
this.result = t;
while (!callbacks.isEmpty()) {
callbacks.poll().onFailure(t);
}
}
}
private enum State {NEW, SUCCESS, FAILURE}
}
这里使用LinkedQueue 来维护Callbacks. 使用add()添加,使用pool()方法来获取Queue的first head去执行success/failure,然后删除,使用LinkedQueue对于
经常insert/delete的队列可是非常快的,很多东西可以学习的(Spring source code).
其实我是不大喜欢自己去实现一套,Spring封装的挺好,跑题了,呵呵.
ListenableFutureTask 重写了JDK FutureTask.done().
/**
* Protected method invoked when this task transitions to state
* {@code isDone} (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }
当任务结束的时候,FutureTask会调用这个方法。所以这个方法还能异步调用。
比如
task1 execute 10s
task2 execute 20s
在同时执行的情况下,task1's callback 方法会先返回, 在实际中,如果有10个任务,这个功能先返回执行完成的任务给客户,对于实时性要求比较高的应用,这是个不错的解决方案。
分享到:
相关推荐
8. **异步处理与任务调度**:Spring的TaskExecutor和ThreadPoolTaskExecutor接口提供了异步处理能力,而TaskScheduler则用于计划和执行周期性任务。 9. **WebFlux**:虽然在4.3.2版本中WebFlux尚未引入,但后续版本...
9. **任务调度**:Spring的Task模块提供了异步任务和定时任务的支持,包括SimpleAsyncTaskExecutor、ThreadPoolTaskExecutor以及ScheduledTaskRegistrar等。 10. **RESTful支持**:Spring MVC的Controller可以通过@...
3. **TaskExecutor**:Spring Batch支持多种类型的`TaskExecutor`,如SimpleAsyncTaskExecutor(基于线程池的异步执行)或ThreadPoolTaskExecutor(自定义线程池),用于并行执行子任务。 4. **Step实现**:每个...
它主要通过Java的`java.util.concurrent`包以及Spring的`ThreadPoolTaskExecutor`或`AsyncConfigurer`接口来实现。`ThreadPoolTaskExecutor`是一个基于线程池的异步任务执行器,而`AsyncConfigurer`接口则用来配置...
Spring线程池ThreadPoolTaskExecutor配置详情 Spring线程池ThreadPoolTaskExecutor是Spring Framework提供的一种线程池实现,用于管理和执行异步任务。本文将详细介绍ThreadPoolTaskExecutor的配置详情,并提供一...
Spring还提供了大量的其他模块,如数据访问/集成(包括JDBC、ORM和OXM支持)、消息(例如JMS支持)、任务调度(如`ThreadPoolTaskExecutor`)和测试工具。这些模块在源码中都有清晰的结构和实现。 通过分析`spring-...
在Spring中,你可以通过配置`ThreadPoolTaskExecutor`或`SimpleAsyncTaskExecutor`来实现异步任务的执行。 2. **Spring TaskScheduler**: `TaskScheduler`接口是Spring提供的另一个核心组件,用于计划周期性的任务...
5. **任务调度器(Scheduler)**:Spring for Android包含`TaskExecutor`接口和`ThreadPoolTaskExecutor`实现,提供了一种线程池管理的方式,帮助开发者优雅地执行异步任务,改善UI的响应性能,同时避免资源浪费。...
3. `ThreadPoolTaskExecutor`:这是Spring最常用的线程池实现,它包装了`java.util.concurrent.ThreadPoolExecutor`,支持线程池配置,并且是异步执行任务的。 4. `ConcurrentTaskExecutor`:作为`Executor`接口的...
线程池是用于管理并发任务的核心组件,`ThreadPoolTaskExecutor` 是 Spring 提供的一个基于线程池的任务执行器。在配置中,`corePoolSize` 表示核心线程数(最小线程数),`maxPoolSize` 是最大线程数,`...
- 为了性能考虑,可以使用Spring的`ThreadPoolTaskExecutor`进行SQL批处理。 - 使用Spring的`@Scope("prototype")`注解避免单例模式下Mapper接口的线程安全问题。 总的来说,Spring3和MyBatis3的集成使得开发者...
Spring提供了一个实现类ThreadPoolTaskExecutor,它基于Java的ThreadPoolExecutor,提供了线程池管理的功能。ThreadPoolTaskExecutor允许我们配置核心线程数、最大线程数、队列大小、超时时间等参数,以满足不同...
该解决方案的逻辑非常简单,该应用程序使用Spring的@EnableAsync来配置ThreadPoolTaskExecutor ,该ThreadPoolTaskExecutor将用于运行异步方法。 在此应用程序中,使用提供的此ThreadPoolTaskExecutor获取用户及其...
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 配置线程池的核心线程数、最大线程数、线程存活时间等 --> <bean id="taskScheduler" class="org....
在Spring Framework 3.2.10中,`org.springframework.scheduling`包是关于Spring调度功能的核心组件。这个包提供了全面的异步任务处理和定时任务管理能力,使得开发者能够轻松地实现定时任务和后台任务的执行。本文...
1. **Spring与Quartz的整合**:Spring 提供了对Quartz的集成支持,通过`org.springframework.scheduling.quartz`包中的类,如`SchedulerFactoryBean`和`ThreadPoolTaskExecutor`,可以轻松地将Quartz纳入Spring的...
Spring提供了多种实现,如`ThreadPoolTaskScheduler`和`ThreadPoolTaskExecutor`,它们都基于线程池来提高性能和资源管理。 ### 2. 基于注解的定时任务 #### 2.1 `@Scheduled` Spring的`@Scheduled`注解可以用于...
你可以实现这个接口或者使用Spring提供的实现,如SimpleAsyncTaskExecutor、ThreadPoolTaskExecutor等。TaskExecutor通常用于立即启动一个任务,而不关心何时或如何执行。 2. TaskScheduler:这是另一个接口,用于...
在Spring 3.1中,可以通过`ThreadPoolTaskExecutor`和`ThreadPoolTaskScheduler`实现这两个接口,以创建线程池来处理定时任务。 为了配置Spring 3.1的定时器,我们需要添加相关的依赖到项目的Maven pom.xml文件中。...
线程池的实现可以通过实现这个接口来完成,Spring提供了ThreadPoolTaskExecutor作为线程池的实现。通过配置ThreadPoolTaskExecutor,我们可以定制线程池的参数,如核心线程数、最大线程数、线程存活时间、工作队列...