`
IXHONG
  • 浏览: 449328 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【转】聊聊java高并发系统之异步非阻塞

阅读更多

[京东技术]声明:本文转载自微信公众号“开涛的博客”,转载务必声明。

 

在做电商系统时,流量入口如首页、活动页、商品详情页等系统承载了网站的大部分流量,而这些系统的主要职责包括聚合数据拼装模板、热点统计、缓存、下游功能降级开关、托底数据等等。其中聚合数据需要调用其它多个系统服务获取数据、拼装数据/模板然后返回给前端,聚合数据来源主要有依赖系统/服务、缓存、数据库等;而系统之间的调用可以通过如http接口调用(如HttpClient)、SOA服务调用(如dubbo、thrift)等等。

 

在Java中,如使用Tomcat,一个请求会分配一个线程进行请求处理,该线程负责获取数据、拼装数据或模板然后返回给前端;在同步调用获取数据接口的情况下(等待依赖系统返回数据),整个线程是一直被占用并阻塞的。如果有大量的这种请求,每个请求占用一个线程,但线程一直处于阻塞,降低了系统的吞吐量,这将导致应用的吞吐量下降;我们希望在调用依赖的服务响应比较慢,此时应该让出线程和CPU来处理下一个请求,当依赖的服务返回了再分配相应的线程来继续处理。而这应该有更好的解决方案:异步/协程。而Java是不支持协程的(虽然有些Java框架说支持,但还是高层API的封装),因此在Java中我们还可以使用异步来提升吞吐量。目前java一些开源框架(HttpClient\HttpAsyncClient、dubbo、thrift等等)大部分都支持。

 

几种调用方式

同步阻塞调用

即串行调用,响应时间为所有服务的响应时间总和;

 

半异步(异步Future)

线程池,异步Future,使用场景:并发请求多服务,总耗时为最长响应时间;提升总响应时间,但是阻塞主请求线程,高并发时依然会造成线程数过多,CPU上下文切换;

 

全异步(Callback)

Callback方式调用,使用场景:不考虑回调时间且只能对结果做简单处理,如果依赖服务是两个或两个以上服务,则不能合并两个服务的处理结果;不阻塞主请求线程,但使用场景有限。

 

异步回调链式编排

异步回调链式编排(JDK8 CompletableFuture),使用场景:其实不是异步调用方式,只是对依赖多服务的Callback调用结果处理做结果编排,来弥补Callback的不足,从而实现全异步链式调用。

 

接下来看看如何设计利用全异步Callback调用和异步回调链式编排处理结果来实现全异步系统设计。

 

同步阻塞调用

public class Test {

   public static void main(String[] args) throws Exception {

       RpcService rpcService = new RpcService();

       HttpService httpService = new HttpService();

       //耗时10ms

       Map<String, String> result1 = rpcService.getRpcResult();

       //耗时20ms

       Integer result2 = httpService.getHttpResult();

       //总耗时30ms

    }

   static class RpcService {

       Map<String, String> getRpcResult() throws Exception {

           //调用远程方法(远程方法耗时约10ms,可以使用Thread.sleep模拟)

       }

    }

   static class HttpService {

       Integer getHttpResult() throws Exception {

           //调用远程方法(远程方法耗时约20ms,可以使用Thread.sleep模拟)

           Thread.sleep(20);

           return 0;

       }

    }

}

 

半异步(异步Future)

public class Test {

   final static ExecutorService executor = Executors.newFixedThreadPool(2);

   public static void main(String[] args) {

       RpcService rpcService = new RpcService();

       HttpService httpService = new HttpService();

       Future<Map<String, String>> future1 = null;

       Future<Integer> future2 = null;

       try {

           future1 = executor.submit(() -> rpcService.getRpcResult());

           future2 = executor.submit(() -> httpService.getHttpResult());

           //耗时10ms

           Map<String, String> result1 = future1.get(300, TimeUnit.MILLISECONDS);

           //耗时20ms

           Integer result2 = future2.get(300, TimeUnit.MILLISECONDS);

           //总耗时20ms

       } catch (Exception e) {

           if (future1 != null) {

                future1.cancel(true);

           }

           if (future2 != null) {

                future2.cancel(true);

           }

           throw new RuntimeException(e);

       }

    }

   static class RpcService {

       Map<String, String> getRpcResult() throws Exception {

           //调用远程方法(远程方法耗时约10ms,可以使用Thread.sleep模拟)

       }

    }

   static class HttpService {

       Integer getHttpResult() throws Exception {

           //调用远程方法(远程方法耗时约20ms,可以使用Thread.sleep模拟)

       }

    }

}

 

全异步(Callback)

public class AsyncTest {

public staticHttpAsyncClient httpAsyncClient;

   public static CompletableFuture<String> getHttpData(String url) {

       CompletableFuture asyncFuture = new CompletableFuture();

       HttpPost post = new HttpPost(url);

       HttpAsyncRequestProducer producer = HttpAsyncMethods.create(post);

       AsyncCharConsumer<HttpResponse> consumer = newAsyncCharConsumer<HttpResponse>() {

            HttpResponse response;

           protected HttpResponse buildResult(final HttpContext context) {

                return response;

           }

…...

       };

       FutureCallback callback = new FutureCallback<HttpResponse>() {

           public void completed(HttpResponse response) {

               asyncFuture.complete(EntityUtils.toString(response.getEntity()));

           }

…...

       };

       httpAsyncClient.execute(producer, consumer, callback);

       return asyncFuture;

    }

 

   public static void main(String[] args) throws Exception {

       AsyncTest.getHttpData("网页链接);

       Thread.sleep(1000000);

    }

}

 

本示例使用HttpAsyncClient演示。

 

异步回调链式编排

CompletableFuture提供了50多个API,可以满足所需的各种场景的异步处理的编排,在此列举三个场景:

 

场景1:三个服务并发异步调用,返回CompletableFuture,不阻塞主线程;


方法test1:

   public static void test1() throws Exception {

       HelloClientDemoTest service = new HelloClientDemoTest();

       /**

        * 场景1 两个以上服务并发异步调用,返回CompletableFuture,不阻塞主线程

        * 并且两个服务也是异步非阻塞调用

        */

       CompletableFuture future1 = service.getHttpData("网页链接);

       CompletableFuture future2 = service.getHttpData("网页链接);

       CompletableFuture future3 =service.getHttpData("网页链接);

       List<CompletableFuture> futureList = Lists.newArrayList(future1,future2, future3);

       CompletableFuture<Void> allDoneFuture =CompletableFuture.allOf(futureList.toArray(newCompletableFuture[futureList.size()]));

       CompletableFuture<String> future4 =allDoneFuture.thenApply(v -> {

            List<Object> result =futureList.stream().map(CompletableFuture::join)

                   .collect(Collectors.toList());

            //注意顺序

            String result1 = (String)result.get(0);

            String result2 = (String)result.get(1);

            String result3 = (String)result.get(2);

            //处理业务....

            return result1 + result2 + result3;

        }).exceptionally(e -> {

            //e.printStackTrace();

            return "";

        });

       //返回

    }

 

场景2、两个服务并发异步调用,返回CompletableFuture,不阻塞主线程;


方法test2:

   public void test2() throws Exception {

       HelloClientDemoTest service = new HelloClientDemoTest();

       /**

        * 场景2 两个接口并发异步调用,返回CompletableFuture,不阻塞主线程

        * 并且两个服务也是异步非阻塞调用

        */

       CompletableFuture future1 = service.getHttpData("网页链接);

       CompletableFuture future2 =service.getHttpData("网页链接);

       CompletableFuture future3 =future1.thenCombine(future2, (f1, f2) -> {

            //理业务....

            return f1 + "," + f2;

        }).exceptionally(e -> {

            return "";

        });

       //返回

    }

 

场景3、两个服务,并发异步调用两个服务,并且一个服务的结果返回后再次调用另一服务,然后将三个结果后并处理,返回CompletableFuture,整个处理过程中不阻塞任何线程;

方法test3:

    publicvoid test3() throws Exception {

       HelloClientDemoTest service = new HelloClientDemoTest();

       /**

        * 场景3 两请求依赖调用,然后与另一服务结果组合处理,返回CompletableFuture,不阻塞主线程

        * 并且两个服务也是异步非阻塞调用

        */

        CompletableFuture future1 = service.getHttpData("网页链接);

        CompletableFuture future2 = service.getHttpData("网页链接);

        CompletableFuture<String> future3= future1.thenApply((param) -> {

            CompletableFuture future4 =service.getHttpData("网页链接);

            return future4;

        });

        CompletableFuture future5 =future2.thenCombine(future3, (f2, f3) -> {

            //....处理业务

            return f2 + "," + f3;

        }).exceptionally(e -> {

            return "";

        });

        //返回future5

    }

 

全异步Web系统设计

主要技术:servlet3,JDK8 CompletableFuture,支持异步Callback调用的RPC框架。

 

先看一下处理流程图:

servlet3:Servlet 接收到请求之后,可能首先需要对请求携带的数据进行一些预处理;接着,Servlet 线程将请求转交给一个异步线程来执行业务处理,线程本身返回至容器。针对业务处理较耗时的情况,这将大大减少服务器资源的占用,并且提高并发处理速度。servlet3可参考商品详情页系统的Servlet3异步化实践,结合其中讲解的servlet3整合:

public void submitFuture(finalHttpServletRequest req, final Callable<CompletableFuture> task) throwsException{

       final String uri = req.getRequestURI();

       final Map<String, String[]> params = req.getParameterMap();

       final AsyncContext asyncContext = req.startAsync();

       asyncContext.getRequest().setAttribute("uri", uri);

       asyncContext.getRequest().setAttribute("params", params);

       asyncContext.setTimeout(asyncTimeoutInSeconds * 1000);

       if(asyncListener != null) {

           asyncContext.addListener(asyncListener);

       }

       CompletableFuture future = task.call();

       future.thenAccept(result -> {

           HttpServletResponse resp = (HttpServletResponse)asyncContext.getResponse();

           try {

                if(result instanceof String) {

                    byte[] bytes = new byte[0];

                    if (StringUtils.isBlank(result)){

                       resp.setContentType("text/html;charset=gbk");

                       resp.setContentLength(0);

                    } else {

                        bytes =result.getBytes("GBK");

                    }

                   //resp.setBufferSize(bytes.length);

                   resp.setContentType("text/html;charset=gbk");

                   if(StringUtils.isNotBlank(localIp)) {

                       resp.setHeader("t.ser", localIp);

                    }

                   resp.setContentLength(bytes.length);

                   resp.getOutputStream().write(bytes);

                } else {

                    write(resp,JSONUtils.toJSON(result));

                }

           } catch (Throwable e) {

               resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); //程序内部错误

                try {

                    LOG.error("get infoerror, uri : {},  params : {}", uri,JSONUtils.toJSON(params), e);

                } catch (Exception ex) {

                }

           } finally {

                asyncContext.complete();

           }

       }).exceptionally(e -> {

           asyncContext.complete();

           return null;

       });

}

 

另外还有Java中协程库Quasar,可参考《Java的纤程库 - Quasar》,目前没有在应用中使用并在测试FiberHttpServlet的时候遇到很多坑,日后把Quasar自如运用后形成日记,希望能结实更多的朋友一起研究,踩坑。

 

作者介绍:孙伟,目前负责京东商品详情页统一服务系统,写过java,写过ngx_lua,还写过storm等,喜欢学习研究新事物。

0
1
分享到:
评论

相关推荐

    java 中同步、异步、阻塞和非阻塞区别详解

    在Java编程中,同步、异步、阻塞和非阻塞是四...同步和阻塞适合处理简单的任务,而异步和非阻塞更适合需要高并发和低延迟的场景。理解这些概念,并结合Java提供的并发工具,能够帮助开发者编写出更加高效和可靠的程序。

    20_来聊聊redis的线程模型吧?为啥单线程还能有很高的效率?.zip

    这些客户端库同样采用了异步非阻塞I/O模型,以适应Redis的单线程模型。它们会将请求打包成队列,然后在一个后台线程中发送到Redis,避免阻塞主线程。 PPT.pptx可能包含更深入的Redis线程模型讲解,例如如何利用事件...

    聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇).doc

    Netty 是一个高性能、异步事件驱动的网络应用框架,常用于开发高并发、低延迟的网络服务。在 Netty 中,Reactor 模型是其处理 I/O 事件的关键设计,它允许服务端高效地处理大量并发连接。在本文中,我们将探讨 ...

    javaniofile:java.nio.file 上的谷歌环聊会话支持类

    Java NIO(非阻塞I/O)是Java标准库中的一部分,从Java 1.4版本开始引入,为处理I/O操作提供了全新的机制。相比于传统的Java IO,NIO提供了更高效、灵活的I/O操作方式,特别适用于高并发、大数据量的场景。`java.nio...

    毕设项目:基于netty+websocket+springboot的实时聊天系统.zip

    Netty的非阻塞I/O模型有助于在高并发场景下保持低延迟和高吞吐量。 **项目结构与组件** 1. **用户模块**:实现用户注册、登录功能,通常会涉及到数据库操作,如MySQL,存储用户信息。 2. **聊天模块**:通过...

    AsyncDemo.zip

    当用户访问`/async`端点时,`asyncTask()`方法将在后台线程中执行,而控制器立即返回"异步任务已启动",这样就实现了非阻塞的异步处理。 然而,`@Async`还有一些高级用法,比如支持返回值、异常处理和异步方法的回...

    一文聊透 Netty 核心引擎 Reactor 的运转架构.doc

    当需要执行非I/O任务时,Reactor会将任务提交到队列,由专门的工作线程进行处理,确保了I/O线程不会被阻塞,从而维持高并发性能。 总的来说,Netty的Reactor模型通过高效地处理I/O事件和调度任务执行,实现了网络...

    基于Netty+Redis+protobuf开发的即时通讯服务器.zip

    Netty的NIO(非阻塞I/O)模型使得服务器能够同时处理大量连接,显著提升了系统的并行处理能力。在即时通讯服务器中,Netty用于建立客户端与服务器之间的连接,处理各种网络事件,如连接建立、数据传输和断开连接等。...

    并发:并发任务1/2

    为了避免这些问题,我们需要合理设计同步策略,如使用公平锁或非阻塞算法。 在Java 5之后引入的并发集合,如`ConcurrentHashMap`, `CopyOnWriteArrayList`等,为并发编程提供了线程安全的数据结构,减少了对显式...

    Netty大纲-同步netty专栏

    - **EventLoop剖析**:深入EventLoop的工作原理,它是Netty高并发性能的关键。 - **accept和read流程**:解析接受新连接和处理读取数据的具体步骤,揭示了Netty如何高效处理网络I/O。 通过以上内容,我们可以全面...

    使用netty做底层架构,搭建聊天室

    考虑到并发性,服务器端可能需要多线程或者EventLoopGroup来处理多个连接,确保系统的高并发能力。 **安全性**也不能忽视。虽然这是一个学习项目,但实际应用中必须考虑用户认证、数据加密以及防止恶意攻击。例如,...

    Springwebflux测试devdojo

    在Java领域,Spring WebFlux提供了一种处理HTTP请求的新方式,与传统的基于Servlet的Spring MVC不同,它支持异步、事件驱动的编程模型,旨在优化高并发场景下的性能。 在Spring WebFlux中,核心概念包括: 1. **...

    spring-racing-tipping-backend:spring-racing-tipping-app的后端数据库

    Node.js利用JavaScript运行时环境,提供了异步、非阻塞I/O模型,使得处理高并发请求变得高效。在这个项目中,开发者可能利用了Express.js,一个流行的Node.js框架,来构建RESTful API,以便前端与后端进行通信。 ...

Global site tag (gtag.js) - Google Analytics