`

为异步而生,Dactor

 
阅读更多

 

DActor

Introduction

DActor框架基于协程思想设计,可同时支持同步和异步代码,简化在线异步代码的开发,用同步代码的思维来开发异步代码,兼顾异步代码的高并发、无阻塞和同步代码的易读性,可维护性。
最大程度的降低阻塞,提高单个线程的处理能力,并可有效的降低线程数。

项目地址

GitHub:https://github.com/allon2/dactor
GitEE:https://gitee.com/handyun/dactor

文档

QQ交流群

783580303

接入系统

  • Dpress:基于Dactor构建的多域名博客系统
  • 持续更新中。。欢迎大家自荐

Overview

目前开发过程中的几个常见模型

  • 同步编程
    所有步骤都在一个主线程中完成,调用一个方法,等待其响应返回。一个请求占用一个线程,在有数据库操作、TCP和Http通讯时因为有阻塞情况,会导致占用线程占用而无法及时释放
    ,因此在同步交易中引入了线程池概念,提高系统的吞吐量
  • 异步编程
    所有步骤都可在不同线程中完成,调用一个方法,不等待响应既返回,典型交易如NodeJs。
    目前市面上的异步框架都比较复杂,市面的通用解决方案是CallBack和Promise/Deferred模式模式。

设计思路

  • 为了保留异步的高性能,简化异步的开发模式,同时使得程序更容易被程序员理解,在性能和代码可阅读性中间取得平衡,设计了此框架。
  • 处理步骤:将请求封装为消息,丢入消息队列,寻找合适步骤处理消息,上述过程不断循环,直到所有可用步骤都执行完毕。
    因为是对消息队列进行处理,对于同步交易,处理完毕即可丢入消息队列。对于异步交易,等待回调完毕再丢入消息队列。
    两种情况对于框架来说是无差别的。同时因为通过异步交易避免了阻塞情况的发生,所以可在不大幅度提高线程数的情况下,提高吞吐量,
    同时也可在一定程度避免流量突增的情况发生。
  • 消息队列采用Disruptor的的高性能队列RingBuffer。
  • 以Actor协程并发模型为基础设计框架。

Features

  • 1、集成Netty
  • 2、集成HttpClient
  • 3、集成HttpServlet
  • 4、支持多层父子结构
  • 5、支持责任链模式
  • 6、J2EE支持json,csv,pdf,xml,html格式输出
  • 7、J2EE支持数据流输出,动态文件下载、动态图片输出、跳转和可根据配置动态输出
  • 8、同时支持SpringBoot和Spring

环境要求

  • JDK 1.8
  • Spring FrameWork 5.2.4.RELEASE + 或者 Spring Boot 2.2.7.RELEASE +
  • Servlet 3.0+(因为需要使用Servlet的异步功能)

注意事项

请求的完整逻辑是分散在不同的线程中执行的,所以尽量避免使用ThreadLocal

Getting Started

example是J2EE程序,下载后,可直接运行,其中集成了若干例子
默认使用.do提交相关交易,但如果是.json将会返回json数据
启动后,在浏览器中输入http://localhost:8080/example/randomTxt2.json
输出的是json格式的字符串
randomTxt2:只有一级父子关系
randomTxt1:有二级父子关系
chaintest1:只使用责任链
chaintest2:同时使用责任链和一级父子关系
exceptionTest:子交易抛出错误,框架对错误的处理
randomTxt3为beginBeanId为Actor标签的BeanId例子
httptest演示的是通过httpclient异步方式访问百度网站
访问URL:http://localhost:8080/example/ httptest.do
http://localhost:8080/example/np.randomTxt2.json为使用命名空间的例子,相关配置在conf/namespace.xml中。

启动后,可在控制台看到内部调用结果
1.png

Maven dependency

<dependency>
    <groupId>cn.ymotel</groupId>
    <artifactId>dactor</artifactId>
    <version>1.1.2</version>
</dependency>

Gradle dependency

compile group: 'cn.ymotel', name: 'dactor', version:'1.1.2'

代码简单讲解

执行过程为chain->grandfather->parent->Selft。
依次调用执行责任链中逻辑,grandfather中的逻辑,parent的逻辑和自身逻辑。
chain,grandfather,parent都可为空,不设置
在grandfather和parent中的Steps中至少有一个为placeholderActor交易,以调用子逻辑

整个过程中,需要先设置全局占位符
<actor:global id="actorglobal">
<actor:param name="beginBeanId" value="beginActor"/>
<actor:param name="endBeanId" value="endActor"/>
</actor:global>
交易中如果未填写beginBeanId或者endBeanId时,系统默认使用全局中配置的beginBeanId或者endBeanId

   <actor id="randomTxt" parent="actorhttpcore" beginBeanId="randomTxtActor">
        <steps>
            <step fromBeanId="randomTxtActor" toBeanId="placeholderActor" conditon=""/>
            <step fromBeanId="placeholderActor" toBeanId="endActor" conditon=""/>
        </steps>
    </actor>

condtion可为空,空字符串,或者是ognl表达式
placeholderActor的作用是在暂存当前环境,并调用子交易,待子交易执行完毕后,再恢复当前环境继续执行
如果在Step中未找到toBeanIdActor,会直接调用endBeanId方法,认为自身交易已执行结束。
交易的请求和流转信息都保存在Message中
如果指定handleException=false或者使用默认设置,直接返回父中执行,如果父中也未捕获,则继续返回上一级执行,
一般来说至少有要有一个actor中指定handleException=true


启动框架接收和执行请求

同步异步写法

同步写法

 public class BeginActor implements Actor {

   /* (non-Javadoc)
    * @see com.ymotel.util.actor.Actor#HandleMessage(com.ymotel.util.actor.Message)
    */
   @Override
   public Object HandleMessage(Message message) throws Exception {
       return message;

   }

}

继承Actor接口,在HandlerMessage返回message方法,框架判断返回不为空的情况下,直接将结果丢给框架进行下一步出来

异步写法

以HttpClientActor为例,在HandlerMessage中最后返回null,框架收到请求后等待下一步处理,释放线程,在HttpClient中的CallBack中调用
message.getControlMessage().getMessageDispatcher().sendMessage(message);框架收到后进行下一步处理。

   public Object HandleMessage(final Message message) throws Exception {
       Map context = message.getContext();

//		
       HttpUriRequest request = getHttpBuild(message.getContext()).build();

       if (referer != null) {
           request.addHeader("Referer", referer);
       }


       /**
        * 通过此处可共用会话,进行类似登录后交易
        */
       HttpClientContext tmplocalContext = null;
       if (context.containsKey(HTTPCLIENT_CONTEXT)) {
           tmplocalContext = (HttpClientContext) context.get(HTTPCLIENT_CONTEXT);
       } else {
           tmplocalContext = HttpClientContext.create();
           CookieStore cookieStore = new BasicCookieStore();
           tmplocalContext.setCookieStore(cookieStore);
           context.put(HTTPCLIENT_CONTEXT, tmplocalContext);
       }

       final HttpClientContext localContext = tmplocalContext;
//        CookieStore cookieStore = new BasicCookieStore();
//        localContext.setCookieStore(cookieStore);
       if (logger.isInfoEnabled()) {
           logger.info("HandleMessage(Message) - httpclient----" + request); //$NON-NLS-1$
       }
//		 final HttpGet httpget = new HttpGet(uri);

       final String tmpcontent = content;
       final String tmpcharset = charset != null ? charset : (String) context.get(CHARSET);


       httpClientHelper.getHttpclient().execute(request, localContext, new FutureCallback<HttpResponse>() {

           public void completed(final HttpResponse response) {
               try {
                   /**
                    * 完成后及时清除
                    */
                   message.getContext().remove(tmpcontent);

                   actorHttpClientResponse.handleResponse(response, localContext, tmpcharset, message);
//					String responseString=HandleResponse((String)message.getContext().get(CHARSET),response);
//					message.getContext().put(RESPONSE, responseString);
               } catch (Exception e) {
                   // TODO Auto-generated catch block
                   if (logger.isErrorEnabled()) {
                       logger.error("$FutureCallback<HttpResponse>.completed(HttpResponse)", e); //$NON-NLS-1$
                   }
                   message.setException(e);
                   message.getControlMessage().getMessageDispatcher().sendMessage(message);
               }
//                  System.out.println(httpget.getRequestLine() + "->" + response.getStatusLine());
           }

           public void failed(final Exception ex) {
               if (logger.isErrorEnabled()) {
                   logger.error("$FutureCallback<HttpResponse>.failed(Exception)", ex); //$NON-NLS-1$
               }
               message.setException(ex);
               message.getControlMessage().getMessageDispatcher().sendMessage(message);

//                  System.out.println(httpget.getRequestLine() + "->" + ex);
           }

           public void cancelled() {
               Exception exception = new Exception("已取消");
               message.setException(exception);
               message.getControlMessage().getMessageDispatcher().sendMessage(message);

//                  System.out.println(httpget.getRequestLine() + " cancelled");
           }

       });
       return null;
   }

配置和API说明

配置说明

通过在xml中的Step实现内部Actor之间的流程跳转
在配置文件中包含
Actor、chain、和global配置 。
程序整个执行顺序为根据交易码找到对应的Actor,然后执行按照chain->parent->selft的顺序进行执行。
chain执行到placeholder处,调用parent交易继续执行,在parent交易中执行到placeholder交易后,调用selft自身交易继续执行。
自身交易执行完毕,弹出parent的placeholder处交易继续执行.parent执行完毕,弹出chain中代码继续执行。
global配置如下

  <actor:global id="actorglobal">
         <actor:param name="beginBeanId" value="beginActor"/>
         <actor:param name="endBeanId" value="endActor"/>
   </actor:global>

beginBeanId为默认的开始Actor,value中的值是在Spring中对应的beanName,程序初始化时将会取得此值,对未指定beginBeanId或者endBeanId的Actor初始化全局配置。
beginActor和endActor都需要继承Actor接口。
actor配置如下

    <actor:actor id="actorhttpcore"   parent="chainparent" chain="unLoginChain"  handleException="true"  endBeanId="FinishActor" >

        <actor:steps>
            <actor:step xpoint="" ypont="" fromBeanId="beginActor"  conditon="" toBeanId="placeholderActor"/>
             <actor:step xpoint="" ypont="" fromBeanId="beginActor"  conditon="" async="true"  toBeanId="placeholderActor"/>
           <actor:step xpoint="" ypont=""  fromBeanId="placeholderActor" conditon="context._SUFFIX=='json'"  toBeanId="JsonViewResolverActor"/>
            <actor:step xpoint="" ypont=""  fromBeanId="placeholderActor" conditon="exception==null" toBeanId="ViewResolveActor"/>
            <actor:step xpoint="" ypont=""   fromBeanId="placeholderActor" conditon="exception!=null"  toBeanId="ErrorViewResolveActor"/>
        </actor:steps>
             <results>
                    <result name="success">htmlstream:</result>
             </results>
    </actor:actor>

属性handleException如果不设置的话,遇到异常,程序将会认为子类中已经执行完毕,跳到parent中PlaceHolder处执行。设置为true,将不会直接跳转到parent中,由子类进行自我处理。
parent和chain为调用具体交易前需要调用的公共交易,由于大部分交易都有通用的前置交易和统一的后置交易。通过设置parent或者chain,可提高代码复用度。
fromBeanId和toBeanId配置的是Actor或者实现Actor接口的beanId。
parent和chain中的ref都需要是Actor.
results中可定义返回的state和需要处理的viewActor
async标记是否是旁路交易,默认值为false,为true值时,会将上下文内容设置复制一份,重新生成一份Message,进行执行,不影响主流程。
chain配置

  <actor:chain id="isLoginChain">
         <list>
             <ref bean="actorhttpcore"></ref>
             <ref bean="isLoginActor1"></ref>
 
         </list>
     </actor:chain>

chain可直观展现Actor调用顺序.
在chain中可顺序并列多个parent类。每个parent中的Step都需要有placeHolderActor,以调用子类。
依次执行list中的交易,再执行自身交易。自身交易执行完毕,再依次回溯责任链中的每个交易,直到无可用交易。
命名空间

<actor:actors xmlns="http://www.ymotel.cn/schema/dactor"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:aop="http://www.springframework.org/schema/aop"
             xmlns:actor="http://www.ymotel.cn/schema/dactor"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.ymotel.cn/schema/dactor http://www.ymotel.cn/schema/dactor.xsd"  namespace="np">
   <!-- parent 和beginBeanId为全局name,randomTxt2在Spring中的全程是np.randomTxt2-->
   <actor id="randomTxt2" parent="actorhttpcore" beginBeanId="randomTxtActor">
   </actor>
</actor:actors>

在actor中可增加命名空间,简化代码开发。在actor中配置namespace=np,则实例中的actor的id会自动拼装为np.randomTxt2
http://localhost:8080/example/np.randomTxt2.json为使用命名空间的例子,相关配置在conf/namespace.xml中。

重要类方法说明

cn.ymotel.dactor.core.MessageDispatcher是交易流转的核心接口类
public void startMessage(Message message, ActorTransactionCfg actorcfg, boolean blocked) throws Exception
方法,用于开始整个流程,其中message需要在执行前进行构造,actorcfg可通过spring的getBean方法得到为Actor对象,如下

<actor id="randomTxt1" parent="randomTxt" beginBeanId="randomTxtActor">
    </actor>
通过getBean('randomTxt1')即可得到ActorTransactionCfg对象。  
blocked为是否阻塞,一般在交易初次放入队列是为false,表示如果队列满,则直接扔给客户端进行处理。为true则一般为内部交易,必须提交给队列进行处理。

sendMessage方法内部调用,用于将处理完毕的Message重新放入队列,继续下一步流程。
cn.ymotel.dactor.core.disruptor.MessageRingBufferDispatcher是MessageDispatcher的接口实现类。,在启动Spring是需要在配置中加上

 <bean id="MessageRingBufferDispatcher" class="cn.ymotel.dactor.core.disruptor.MessageRingBufferDispatcher">
   </bean>

MessageRingBufferDispatcher的strategy、bufferSize、threadNumber为三个可设置属性.正常情况下使用默认设置即可。
strategy默认使用ringBuffer的BlockingWaitStrategy策略进行调度,如果交易量比较大,可调整此策略。
bufferSize默认使用1024。
threadNumber默认使用CPU个数的线程数。

其他默认Actor说明

cn.ymotel.dactor.message.Message.Actor,所有需要在执行的交易都必须继承此接口。
public Object HandleMessage(Message message) throws Exception;程序通过调用HandleMessage对象,如果返回的不是message对象或者为NULL,则认为此交易是异步执行,不再自行调度。由异步交易在收到请求后,自己调用将Message再此放入队列中。
cn.ymotel.dactor.action.PlaceholderActor 交易为特殊交易,用来将当前队列暂存,并调用子交易。
cn.ymotel.dactor.action.BeginActor 为Actor中step的默认开始交易。
cn.ymotel.dactor.action.EndActor 为Actor中step的默认结束交易。
cn.ymotel.dactor.action.JsonViewResolverActor为需要返回Json的J2EE view
cn.ymotel.dactor.action.ViewResolveActor为需要返回J2EE view的统一处理Actor
cn.ymotel.dactor.action.httpclient.HttpClientActor 提供的异步调用httpClient的Actor
cn.ymotel.dactor.action.netty.aysnsocket.TcpClientActor 提供的异步调用netty的Actor

交易流程举例说明

  <actor:actor id="actorhttpcore" handleException="true"  endBeanId="FinishActor" >

        <actor:steps>
            <actor:step xpoint="" ypont="" fromBeanId="beginActor"  conditon="" toBeanId="placeholderActor"/>
            <actor:step xpoint="" ypont=""  fromBeanId="placeholderActor" conditon="context._SUFFIX=='json'"  toBeanId="JsonViewResolverActor"/>
            <actor:step xpoint="" ypont=""  fromBeanId="placeholderActor" conditon="exception==null" toBeanId="ViewResolveActor"/>
            <actor:step xpoint="" ypont=""   fromBeanId="placeholderActor" conditon="exception!=null"  toBeanId="ErrorViewResolveActor"/>
        </actor:steps>

    </actor:actor>

 <actor id="randomTxt2" parent="actorhttpcore" beginBeanId="randomTxtActor">
    </actor>

以上交易的交易流程图如下
randomTxt2.png
以上的完整例子都可在example中得到

分享到:
评论

相关推荐

    小程序 单相异步电机设计 7.1(学生必备)

    小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小程序 单相异步电机设计 7.1(学生必备)小...

    C#异步操作 异步查询数据库 异步处理一行一行加载数据

    `async`修饰符标记一个方法为异步,而`await`则用于等待一个异步操作完成,而不阻塞调用线程。这种方法被称为“基于任务的异步模式”(TAP)。 异步查询数据库通常涉及ADO.NET或ORM框架,如Entity Framework。在C#...

    WebSerices异步调用方法总结

    WebServices 异步调用方法总结 WebService 异步调用是指在客户端和服务器端之间进行异步数据交换的过程。异步调用可以提高系统的性能和可扩展性,实现高效的数据交换。下面是 WebService 异步调用的实现方法总结: ...

    Labview异步调用示例

    在LabVIEW中,这通常表现为一个子VI,其连接到启动异步操作的VI的事件结构中。 3. **多线程**:在某些情况下,LabVIEW可能会利用多线程来实现异步操作。尽管LabVIEW默认的执行模型是单线程,但通过引入线程 VI 或...

    Angular异步变同步处理方法

    由于异步操作的特性,它能处理那些按顺序依次执行的接口调用,而不会阻塞主线程,让页面陷入假死状态。在Angular中,主要通过Promise规范来实现异步变同步的处理。 首先,异步编程是JavaScript的核心特征之一,它...

    异步调用流程图

    调用者可以继续执行后续任务,而不会等待该异步操作完成。当异步操作的结果准备就绪时,会通过回调函数、事件或观察者模式来通知调用者。 在异步调用流程图中,通常包含以下几个关键步骤: 1. **发起请求**:调用...

    ProE异步模式类

    在描述中提到的"WF5.0X64的库文件",WF5.0代表的是Pro/Engineer Wildfire 5.0版本,而X64则表示这是为64位操作系统编译的版本。这表明该库文件是为较旧的Creo版本设计的,但已经过测试,适用于Creo 2.0,这显示了...

    异步及异步回调

    这意味着当一个操作开始后,程序可以继续执行其他任务,而无需等待该操作完成。这种非阻塞的特性在处理I/O密集型任务(如网络通信、文件读写)或耗时计算时特别有用,因为它能提高整体的系统效率。 异步操作通常...

    文件同步与异步读写

    而"VS2005下写文件程序"则可能包含异步写入的实现,利用了Windows的异步I/O功能。这些示例可以帮助理解同步与异步I/O在实际编程中的应用。 在实际开发中,选择同步还是异步读写取决于具体需求。如果对性能要求较高...

    asp.net 实现传统方式下异步调用的实现

    异步控制器的方法定义为`async Task&lt;ActionResult&gt;`,`await`关键字用于挂起控制器方法,直到异步操作完成。 7. **优化和注意事项** - 虽然异步调用可以提高性能,但过度使用可能导致线程池资源耗尽。合理评估和...

    C#实现异步连接Sql Server数据库的方法

    `async`修饰符标记一个方法为异步,而`await`关键字则用于挂起异步操作并在操作完成时恢复执行。 以下是一个使用`async`和`await`实现异步连接SQL Server数据库的基本示例: ```csharp using System; using System...

    c#异步经典Demo

    例如,`HttpClient`类的`GetStreamAsync`或`GetByteArrayAsync`方法可以用于异步获取网络资源,而`FileStream`的异步写入方法则用于将下载的数据保存到本地。 "EasyAsync"可能是一个简化异步操作的示例集,它展示了...

    Nodejs让异步变成同步的方法

    Node.js中让异步操作变成同步的方法通常是指将非阻塞的异步调用改写为在逻辑上表现得像同步调用的方式。Node.js中异步操作是通过回调函数、Promises和async/await等技术来实现的。以下我们详细讲解如何通过这些方法...

    C# 同步与异步读写

    `async`关键字定义了一个异步方法,而`await`关键字用于等待异步操作的完成。当`await`操作符遇到时,控制权会返回给调用者,直到异步操作完成才会继续执行后续代码。 此外,C#还提供了`Task.Run`方法,可以在后台...

    Linux异步通信socket

    在异步通信中,通信的任意一方可以在任何时候发送消息,而当消息到达时,系统会通过特定的机制通知接收方进行消息的处理。这种模式避免了进程或线程的阻塞,从而提高了系统的整体效率。 在Linux中,实现异步通信的...

    Springboot启用async异步调用

    在上述代码中,`longRunningTask`方法被标记为异步,这意味着当它被调用时,Spring会将其放入一个单独的线程中执行,而不会阻塞当前请求的处理。`Future`返回类型允许我们在需要时获取异步任务的结果。 为了调用这...

    C#实现异步调用外部程序

    `async`修饰符用于定义一个异步方法,而`await`关键字用于等待一个异步操作完成。这种方式可以让程序在等待外部操作执行时,释放出CPU资源,提高应用程序的响应性和用户体验。 要异步调用外部程序,最常用的方法是...

    异步Socket,主要是简单的异步Socket

    异步Socket编程是网络编程中的一个重要概念,尤其在高并发、大数据传输的场景下,它的优势尤为明显。本文将深入探讨异步Socket的工作原理、如何实现以及它在实际应用中的重要性。 首先,Socket是网络通信的基础,它...

    javaHttp异步请求

    `AsyncClientHttpExchangeStreaming` 类可能是一个自定义的类,用于封装`HttpAsyncClients`的异步请求逻辑,特别是针对流式传输(streaming)的场景。流式传输在处理大文件或持续的数据流时非常有用,因为它允许数据...

    TCP异步通信试验

    在IT领域,网络通信是至关重要的部分,而TCP(传输控制协议)作为其核心协议之一,为应用程序提供了可靠的、基于连接的数据通信服务。本实验“TCP异步通信试验”聚焦于利用MFC(Microsoft Foundation Classes)框架...

Global site tag (gtag.js) - Google Analytics