`

ParSeq in Action

阅读更多

近年来,rest架构风格火了起来,源于越来越多的人在SOA的实践中厌倦了SOAP之流,于是rest api终于抬头。LinkedIn 也于半年前开源了其rest + json的架构:rest.li,着实为rest加了把火。

 

什么是ParSeq

rest.li允许资源异步地返回结果。这个机制由LinkedIn的另外一个开源项目ParSeq来支撑。ParSeq是轻量级的Java异步操作框架。

 

ParSeq能做什么

它的主要提供了如下特性:

  • 并行的异步操作机制
  • 顺序执行非阻塞式的任务
  • 任务组合
  • 简单的异常传播和恢复机制
  • 任务执行追踪和可视化

 

ParSeq的架构简介

在ParSeq的架构里,Task是最基础的工作单元,如同Java的Callable,不同之处在于Task的执行结果可以被异步地获取。Task的启停,调度都交由Engine或者Context(你很快能从下文中看到它们)管理。用户是无法直接使用TaskEngine负责启停和调度Task,而Context则负责子任务。

 

之前我们也提到了ParSeq支持异步获取任务的执行结果。类比Java的多线程机制,不难发现,我们缺少一个等同Java中Future功能的组件,而Promise就是这样一个东西。在Java中如果我们使用

future.get()

来等待future的完成,但这样会阻塞当前的线程。而Promise提供一套监听机制,用户可以为Promise添加监听器,Task完成时会通知这些监听器完成回调逻辑。

 

此外,ParSeq可以非常方便地组合任务。ParSeq提供了一对常用的组合策略:如果并行地运行一组Task,你可以使用par,如果想顺序地执行,那么你可以使用seq

 

下载&编译

下载源码

源码维护在GitHub上:https://github.com/linkedin/parseq。你可以直接下载zip包,使用Github客户端下载,使用Git客户端下载等等。

 

编译环境

请先确认环境是否安装JDKAnt。如果需要可视化追踪功能,请下载node.jsnpm。JDK版本1.6和1.7皆可。Ant需要1.8.2及以上版本。JDK,Ant都需要配置环境变量。

 

编译源码

导航至源码根目录,执行如下命令:

ant -Dno.test.tracevis=1 test

 如果要打包,执行如下命令:

ant dist

 

Eclipse导入parseq源码

我使用的是Eclipse Juno。

File->New->Other->选择Java Project from Existing Ant Buildfile->Next->Browse->选择parseq根目录下的build.xml->Finish。 

 

如何使用ParSeq

环境准备好后,给出一个官网上的简单例子,帮助你快速上手。对于一个最基本的ParSeq程序,用户至少需要定义EngineTask这个两个组件。

 

创建Engine

import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// ...

final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor(); 

final Engine engine = new EngineBuilder()
        .setTaskExecutor(taskScheduler)
        .setTimerScheduler(timerScheduler)
        .build();

 

engine由builder构建,这种高可读的显式构造方式使用户对构造engine时所需的参数一目了然。在本例中,我们使用了容量为处理器核数+1的线程池作为任务执行池,并使用另外一个线程来调度计时器。当然这些都可以根据用户的具体用例来定制不同的engine。

 

关闭Engine

engine.shutdown();
engine.awaitTermination(1, TimeUnit.SECOND);
taskScheduler.shutdown();
timerScheduler.shutdown();

自解释的代码,无需多言。值得注意的是Engine并不管理executor的生命周期,用户需要显式地关闭它们。在关闭期间,新的Task不允许执行,而ParSeq允许运行中的Task完成自己的任务。

 

创建任务

用户可以按不同的业务行为定义不同的任务:

<图一>


 各Task实现类的明细功能会在后文中介绍,本例中我们先使用CallableTask。这种Task常用来做一些简单的计算和转换工作。本例的Task用于返回输入字符串的字符数。

import com.linkedin.parseq.Task;
import com.linkedin.parseq.Tasks;

import java.util.concurrent.Callable;

// ...

class GetLengthTask implements Callable<Integer>
{
  private final String _string;
  
  public GetLengthTask(String string) {
    _string = string;
  }
  
  @Override
  public Integer call() throws Exception {
    return _string.length();
  }
}

final Task<Integer> task = Tasks.callable("length of 'test str'", new GetLengthTask("test str"));

CallableTask还常用来转换其他Task的结果,这部分内容我们会在Task组合中更详细地介绍。  

 

运行任务

engine.run(task);

 

 

自定义Task

图一中,我们可以看到名为BaseTask的抽象类。如果用户想自定义自己的Task类型,就可以继承该类。

 

集成第三方异步库

个人觉得ParSeq最赞的功能是可以无缝地集成第三方异步库,这些库把自己托管给ParSeq。一旦集成完毕,所有的异步库摇身一变,跳起ParSeq Style。这对上层的程序员来说,无异于福音,他们无需再了解如何使用这些异步库。此外,第三方库也可以共享ParSeq为任务提供的各种额外功能。

 

为此官方提供了一个例子,这个例子集成的是GitHub上的一个开源异步Http客户端:

https://github.com/ning/async-http-client

https://github.com/sonatype/async-http-client

 

import com.linkedin.parseq.BaseTask;
import com.linkedin.parseq.Context;
import com.linkedin.parseq.promise.Promise;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;

import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Request;
import com.ning.http.client.Response;

// ...

class HttpRequestTask extends BaseTask<Response>
{
  private final Request _request;

  public HttpRequestTask(Request request) {
    super("Http request: " + request);
    _request = request;
  }

  @Override
  protected Promise<Response> run(final Context context) throws Exception {
    // Create a settable promise. We'll use this to signal completion of this
    // task once the response is received from the HTTP client.
    final SettablePromise<Response> promise = Promises.settable();

    // Send the request and register a callback with the client that will
    // set the response on our promise.
    HTTP_CLIENT.prepareRequest(_request).execute(new AsyncCompletionHandler<Response>() {
      @Override
      public Response onCompleted(final Response response) throws Exception {
        // At this point the HTTP client has given us the HTTP response
        // asynchronously. We set the response value on our promise to indicate
        // that the task is complete.
        promise.done(response);
        return response;
      }

      @Override
      public void onThrowable(final Throwable t)
      {
        // If there was an error then we should set it on the promise.
        promise.fail(t);
      }
    });

    // Return the promise to the ParSeq framework. It may or may not be
    // resolved by the time we return this promise.
    return promise;
  }
}

非常方便,用户立刻得到了一个ParSeq Style的异步Http客户端。代码中的HTTP_CLIENT是AsyncHttpClient的一个实例。

 

在代码中,我们创建了SettablePromise用于记录HTTP的相应:


这部分的实现比较精彩。值得一读源码。

 

Task的组合

 To be continued...

  • 大小: 11.8 KB
  • 大小: 8.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics