DeferredResult – asynchronous processing in Spring MVC


DeferredResult is a container for a computation that may not have finished yet and whose result will become available in the future. Spring MVC uses it to represent asynchronous computation and to take advantage of Servlet 3.0 AsyncContext asynchronous request handling. Here is a quick impression of how it works:

@RequestMapping("/")
@ResponseBody
public DeferredResult<String> square() throws JMSException {
    final DeferredResult<String> deferredResult = new DeferredResult<>();
    runInOtherThread(deferredResult);
    return deferredResult;
}
 
private void runInOtherThread(DeferredResult<String> deferredResult) {
    // Placeholder: imagine this line running seconds later, on another thread
    deferredResult.setResult("HTTP response is: 42");
}

Normally, request processing is done once you leave the controller handler method. Not so with DeferredResult. Spring MVC (using Servlet 3.0 capabilities) holds the response back and keeps the HTTP connection idle. The HTTP worker thread is no longer used, but the HTTP connection is still open. Later some other thread resolves the DeferredResult by assigning a value to it. Spring MVC picks up this event immediately and sends the response (“HTTP response is: 42” in this example) to the browser, finishing request processing.

You might see some conceptual similarity between Future<V> and DeferredResult: both represent a computation whose result becomes available some time in the future. You might wonder why Spring MVC doesn’t simply let us return Future<V> and instead introduces a new, proprietary abstraction. The reason is simple, and once again it exposes the deficiencies of Future<V>. The whole point of asynchronous processing is to avoid blocking threads. The standard java.util.concurrent.Future does not allow registering callbacks for when the computation is done, so you either need to devote one thread to block until the future is done or use one thread to poll several futures periodically. The latter option, however, consumes more CPU and introduces latency. Wouldn’t the superior ListenableFuture<V> from Guava be a good fit? It would, but Spring doesn’t have a dependency on Guava. Thankfully, bridging these two APIs is pretty straightforward.
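To make the difference concrete, here is a minimal sketch that uses only the JDK. It is an illustration under assumptions, not code from this series: the class CallbackVersusBlocking and its methods are hypothetical, and CompletableFuture (Java 8+) merely stands in for Guava’s ListenableFuture as "a future that accepts callbacks".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; CompletableFuture stands in for a callback-capable future.
final class CallbackVersusBlocking {

    // Plain Future: the calling thread is parked inside get() until the result arrives.
    static int blockingStyle(ExecutorService pool) throws Exception {
        Future<Integer> future = pool.submit(() -> 6 * 7);
        return future.get();
    }

    // Callback style: the caller registers a listener and returns without waiting.
    // The listener runs once the value is available.
    static CompletableFuture<Void> callbackStyle(ExecutorService pool,
                                                 AtomicReference<String> sink) {
        return CompletableFuture
                .supplyAsync(() -> 6 * 7, pool)
                .thenAccept(value -> sink.set("HTTP response is: " + value));
    }
}
```

In the first method the thread that asks for the value is the one that waits for it. In the second nobody waits: the returned future exists only so that a caller can observe when the callback has finished.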

But first have a look at the previous part on implementing a custom java.util.concurrent.Future<V>. Admittedly it wasn’t as simple as one might expect: clean-up, handling interruptions, locking and synchronization, maintaining state. That is a lot of boilerplate when all we need is to receive a message and return it from get(). Let us try to retrofit the previous implementation of JmsReplyFuture to also implement the more powerful ListenableFuture, so we can use it later in Spring MVC.

ListenableFuture simply extends the standard Future, adding the possibility to register callbacks (listeners). So an eager developer would simply sit down and add a list of Runnable listeners to the existing implementation:

public class JmsReplyFuture<T extends Serializable> implements ListenableFuture<T>, MessageListener {
 
    private final List<Runnable> listeners = new ArrayList<Runnable>();
 
    @Override
    public void addListener(Runnable listener, Executor executor) {
        listeners.add(listener);
    }
 
    //...

But it’s greatly oversimplified. Of course we must iterate over all listeners when the future is done or an exception occurs. If the future is already resolved when we add a listener, we must call that listener immediately. Moreover, we ignore the executor: according to the API each listener may use a different thread pool supplied to addListener(), so we must store pairs of Runnable + Executor. Last but not least, addListener() is not thread safe. An eager developer would fix all this in a matter of an hour or two. And spend two more hours fixing the bugs introduced in the meantime. And a few more hours, weeks later, when another “impossible” bug pops up in production. I am not eager. As a matter of fact, I am too lazy to write even the simplest implementation above. But I am desperate enough to hit Ctrl + H (Subtypes view in IntelliJ IDEA) on ListenableFuture and scan through the tree of available skeletal implementations. AbstractFuture<V>: bingo!

public class JmsReplyListenableFuture<T extends Serializable> extends AbstractFuture<T> implements MessageListener {
 
    private final Connection connection;
    private final Session session;
    private final MessageConsumer replyConsumer;
 
    public JmsReplyListenableFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {
        this.connection = connection;
        this.session = session;
        this.replyConsumer = session.createConsumer(replyQueue);
        this.replyConsumer.setMessageListener(this);
    }
 
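    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        try {
            final ObjectMessage objectMessage = (ObjectMessage) message;
            final Serializable object = objectMessage.getObject();
            // Resolve the future; AbstractFuture notifies all registered listeners
            set((T) object);
            cleanUp();
        } catch (Exception e) {
            // Surfaces from get() wrapped in an ExecutionException
            setException(e);
        }
    }
 
    // Called by AbstractFuture when the future is cancelled with cancel(true)
    @Override
    protected void interruptTask() {
        cleanUp();
    }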
 
    private void cleanUp() {
        try {
            replyConsumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            Throwables.propagate(e);
        }
    }
}

That’s it, everything, compile and run. It is roughly half as much code as the initial implementation, and we get a much more powerful ListenableFuture. Most of the code is set-up and clean-up. AbstractFuture already implements addListener(), locking and state handling for us. All we have to do is call the set() method when the future is resolved (when the JMS reply arrives, in our case). Moreover, we finally support exceptions properly. Previously we simply ignored or rethrew them, while now they are correctly wrapped and thrown from get() when accessed. Even if we weren’t interested in ListenableFuture capabilities, AbstractFuture would still help us a lot. And we get ListenableFuture for free.
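For a sense of the bookkeeping AbstractFuture takes off our hands, here is a rough sketch of the listener handling alone, written against the JDK only. It is not Guava’s actual implementation; ListenerRegistry, Entry and complete() are hypothetical names chosen for this illustration. It stores Runnable + Executor pairs, runs late listeners immediately, and guards its state with a lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-in for the listener handling of a listenable future.
final class ListenerRegistry {

    // A listener together with the executor it asked to run on.
    private static final class Entry {
        final Runnable listener;
        final Executor executor;

        Entry(Runnable listener, Executor executor) {
            this.listener = listener;
            this.executor = executor;
        }
    }

    private final Object lock = new Object();
    private List<Entry> pending = new ArrayList<>();
    private boolean done;

    // Queues the listener, or runs it right away if the future is already resolved.
    void addListener(Runnable listener, Executor executor) {
        synchronized (lock) {
            if (!done) {
                pending.add(new Entry(listener, executor));
                return;
            }
        }
        executor.execute(listener); // run outside the lock
    }

    // Called when the result or exception is set; repeated calls are ignored.
    void complete() {
        List<Entry> toRun;
        synchronized (lock) {
            if (done) {
                return;
            }
            done = true;
            toRun = pending;
            pending = null; // never touched again because done is checked first
        }
        for (Entry entry : toRun) {
            entry.executor.execute(entry.listener);
        }
    }
}
```

Even this small version needs a lock, a state flag and care about where listeners execute, and it still says nothing about cancellation, the result value or exceptions thrown by an executor.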

Good programmers love writing code. Better ones love deleting it. Less to maintain, less to test, less to break. I am sometimes amazed how helpful Guava can be. Last time I was working with an iterator-heavy piece of code. Data was generated dynamically and the iterators could easily produce millions of items, so I had no choice. A limited iterator API together with quite complex business logic is a recipe for an endless amount of plumbing code. And then I found the Iterators utility class and it saved my life. I suggest you open the JavaDoc of Guava and go through all the packages, class by class. You’ll thank me later.

Once we have our custom ListenableFuture in place (obviously you can use any implementation) we can try integrating it with Spring MVC. Here is what we want to achieve:

  1. HTTP request comes in
  2. We send a request to JMS queue
  3. HTTP worker thread is no longer used, it can serve other requests
  4. JMS listener asynchronously waits for a reply in temporary queue
  5. Once the reply arrives we push it immediately as an HTTP response and the connection is done.

First, a naive implementation using a blocking Future:

@Controller
public class JmsController {
 
    private final ConnectionFactory connectionFactory;
 
    public JmsController(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }
 
    @RequestMapping("/square/{value}")
    @ResponseBody
    public String square(@PathVariable double value) throws JMSException, ExecutionException, InterruptedException {
        final ListenableFuture<Double> responseFuture = request(value);
        return responseFuture.get().toString();
    }
 
    //JMS API boilerplate
    private <T extends Serializable> ListenableFuture<T> request(Serializable request) throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue tempReplyQueue = session.createTemporaryQueue();
        final ObjectMessage requestMsg = session.createObjectMessage(request);
        requestMsg.setJMSReplyTo(tempReplyQueue);
        sendRequest(session.createQueue("square"), session, requestMsg);
        return new JmsReplyListenableFuture<T>(connection, session, tempReplyQueue);
    }
 
    private void sendRequest(Queue queue, Session session, ObjectMessage requestMsg) throws JMSException {
        final MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(requestMsg);
        producer.close();
    }
 
}

This implementation is not a very fortunate one. As a matter of fact, we don’t need a Future at all, as we are merely blocking on get(), synchronously waiting for a response. Let’s try with DeferredResult:

@RequestMapping("/square/{value}")
@ResponseBody
public DeferredResult<String> square(@PathVariable double value) throws JMSException {
    final DeferredResult<String> deferredResult = new DeferredResult<>();
    final ListenableFuture<Double> responseFuture = request(value);
    Futures.addCallback(responseFuture, new FutureCallback<Double>() {
        @Override
        public void onSuccess(Double result) {
            deferredResult.setResult(result.toString());
        }
 
        @Override
        public void onFailure(Throwable t) {
            deferredResult.setErrorResult(t);
        }
    });
    return deferredResult;
}

Much more complex, but also much more scalable. This method takes almost no time to execute, and the HTTP worker thread is ready to handle another request shortly afterwards. The most important thing to notice is that onSuccess() and onFailure() are executed by another thread, seconds or even minutes later. Meanwhile the HTTP worker thread pool is not exhausted and the application remains responsive.
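The thread hand-off can be observed in a small JDK-only model. This is a sketch under assumptions rather than Spring or JMS code: ThreadHandOff is a hypothetical class, CompletableFuture stands in for the Guava future, and a single thread named "jms-reply" plays the role of the JMS listener thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: who runs the handler and who runs the callback?
final class ThreadHandOff {

    // Returns {thread that ran the handler, thread that ran the callback}.
    static String[] run() throws Exception {
        final String[] threads = new String[2];
        ExecutorService replyThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "jms-reply"));
        try {
            CompletableFuture<Double> reply = new CompletableFuture<>();

            // "Handler method": registers the callback and goes on without waiting.
            threads[0] = Thread.currentThread().getName();
            CompletableFuture<Void> responded =
                    reply.thenAccept(v -> threads[1] = Thread.currentThread().getName());

            // Later the reply arrives on a different thread, which also runs the callback.
            replyThread.execute(() -> reply.complete(16.0));

            responded.get(); // the demo waits here only so that it can report the outcome
            return threads;
        } finally {
            replyThread.shutdown();
        }
    }
}
```

Calling ThreadHandOff.run() yields two different thread names, the second being "jms-reply": the thread that accepted the request is not the one that produces the response.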

This was a textbook example, but can we do better? A first attempt is to write a generic adapter from ListenableFuture to DeferredResult. These two abstractions represent exactly the same thing, but with different APIs. It’s quite straightforward:

public class ListenableFutureAdapter<T> extends DeferredResult<String> {
 
    public ListenableFutureAdapter(final ListenableFuture<T> target) {
        Futures.addCallback(target, new FutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                setResult(result.toString());
            }
 
            @Override
            public void onFailure(Throwable t) {
                setErrorResult(t);
            }
        });
    }
}

We simply extend DeferredResult and notify it using ListenableFuture callbacks. Usage is simple:

@RequestMapping("/square/{value}")
@ResponseBody
public DeferredResult<String> square(@PathVariable double value) throws JMSException {
    final ListenableFuture<Double> responseFuture = request(value);
    return new ListenableFutureAdapter<>(responseFuture);
}

But we can do even better! If ListenableFuture and DeferredResult are so similar, why not simply return ListenableFuture from the controller handler method?

@RequestMapping("/square/{value}")
@ResponseBody
public ListenableFuture<Double> square2(@PathVariable double value) throws JMSException {
    final ListenableFuture<Double> responseFuture = request(value);
    return responseFuture;
}

Well, it won’t work, because Spring doesn’t understand ListenableFuture and will just blow up. Fortunately Spring MVC is very flexible and lets us easily register a new so-called HandlerMethodReturnValueHandler. There are 12 such built-in handlers, and every time we return some object from a controller, Spring MVC examines them in a predefined order and chooses the first one that can handle the given type. One such handler is DeferredResultMethodReturnValueHandler (the name says it all), which we will use as a reference:

public class ListenableFutureReturnValueHandler implements HandlerMethodReturnValueHandler {
 
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> paramType = returnType.getParameterType();
        return ListenableFuture.class.isAssignableFrom(paramType);
    }
 
    public void handleReturnValue(Object returnValue,
                                  MethodParameter returnType, ModelAndViewContainer mavContainer,
                                  NativeWebRequest webRequest) throws Exception {
 
        if (returnValue == null) {
            mavContainer.setRequestHandled(true);
            return;
        }
 
        final DeferredResult<Object> deferredResult = new DeferredResult<>();
        Futures.addCallback((ListenableFuture<?>) returnValue, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                deferredResult.setResult(result.toString());
            }
 
            @Override
            public void onFailure(Throwable t) {
                deferredResult.setErrorResult(t);
            }
        });
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
    }
 
}

I must be running out of karma: installing this handler is not as straightforward as I had hoped. Technically there is WebMvcConfigurerAdapter.addReturnValueHandlers(), which we can easily override when using Java configuration for Spring MVC. But this method adds the custom return value handler at the end of the handler chain, and for reasons beyond the scope of this article we need to add it at the beginning (higher priority). Fortunately, with a little bit of hacking we can achieve that as well:

@Configuration
@EnableWebMvc
public class SpringConfig extends WebMvcConfigurerAdapter {
 
    @Resource
    private RequestMappingHandlerAdapter requestMappingHandlerAdapter;
 
    @PostConstruct
    public void init() {
        final List<HandlerMethodReturnValueHandler> originalHandlers = new ArrayList<>(requestMappingHandlerAdapter.getReturnValueHandlers().getHandlers());
        originalHandlers.add(0, listenableFutureReturnValueHandler());
        requestMappingHandlerAdapter.setReturnValueHandlers(originalHandlers);
    }
 
    @Bean
    public HandlerMethodReturnValueHandler listenableFutureReturnValueHandler() {
        return new ListenableFutureReturnValueHandler();
    }
 
}
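The general effect of a handler’s position in a first-match chain can be shown with a toy model. None of the types below are Spring’s; ReturnHandler and HandlerChain are hypothetical, and the "catch-all" handler is an assumption made for the illustration, since the article leaves the actual reason out of scope.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a return value handler chain.
interface ReturnHandler {
    boolean supports(Class<?> returnType);
    String name();
}

final class HandlerChain {
    private final List<ReturnHandler> handlers = new ArrayList<>();

    void addLast(ReturnHandler handler) {
        handlers.add(handler);
    }

    void addFirst(ReturnHandler handler) {
        handlers.add(0, handler);
    }

    // The first handler that claims the type wins; later ones are never asked.
    String select(Class<?> returnType) {
        for (ReturnHandler handler : handlers) {
            if (handler.supports(returnType)) {
                return handler.name();
            }
        }
        throw new IllegalArgumentException("No handler for " + returnType.getName());
    }

    // Builds a handler that accepts the given type and all of its subtypes.
    static ReturnHandler handler(final String name, final Class<?> supported) {
        return new ReturnHandler() {
            @Override
            public boolean supports(Class<?> returnType) {
                return supported.isAssignableFrom(returnType);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}
```

If a broad handler (say, one accepting Object) is already in the chain, a more specific handler added with addLast() is never selected, while the same handler added with addFirst() is. That is the kind of shadowing that placing a handler at the front of the chain avoids.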

Summary

In this article we familiarized ourselves with another incarnation of the future/promise abstraction, called DeferredResult. It is used to postpone handling of an HTTP request until some asynchronous task finishes. That makes DeferredResult a great fit for web GUIs built on top of event-driven systems, message brokers, etc. It is not as powerful as the raw Servlet 3.0 API, though. For example, we cannot stream multiple events as they arrive (e.g. new tweets) over a long-running HTTP connection; Spring MVC is designed more toward the request-response pattern.

We also tweaked Spring MVC to allow returning Guava’s ListenableFuture directly from a controller method. It makes our code much cleaner and more expressive.
 

Reference: DeferredResult – asynchronous processing in Spring MVC from our JCG partner Tomasz Nurkiewicz at the NoBlogDefFound blog.
