`

HBase中asynchbase的使用方式

 
阅读更多
Hbase的原生java 客户端是完全同步的,当你使用原生API 例如HTableInterface 访问HBase表,每个动作都会有一个短暂的阻塞,这对于一些流程较长的操作看起来比较不利。
HBase在此还提供了另外一种java客户端 asynchbase,它实现了完全异步以及考虑线程安全。
依赖jar:asynchbase.jar
        slf4j-api.jar
        slf4j-simple.jar

实战实例:
   现在Hbase 数据库有100W条存储了用户的数据信息,信息包括用户名以及用户密码 数据表如下
rowkey:username
col family:info
col:password

   要实现的功能为:给所有的用户密码进行加盐处理,并通知用户成功修改了密码,如果修改失败则也通知用户修改失败。
   在关系型数据库要实现这个想必大家都会,例如一个分页查询每次1000条,进行遍历操作顺序执行,也可以并行执行。
    asynchbase效仿了python twisted的异步处理组件,在此我们需要先了解一下python twisted 异步编程
     让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

     在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。
     从上面的讲解中我们可以看到异步模型比多线程模型更简单些,因为异步模型只有一个进程而且任务的停止和运行状态是可控的.但比同步模型相比还是比较复杂,程序员必须把每一个任务分成很多步然后再有序的把他们组合起来,如果一个任务用到了令一个任务的结果,这个任务需要接受另一个任务的输出做为他自己的输入,而且这种接收的数据经常是一段一段的而不是一个整体. 你不禁要问既然异步模型和同步模型都是一个线程,他们执行相同的任务应该花费相同的时间啊,甚至比同步模型花费的时间更多,为什么要才采用异步的模型呢?
    这里最少有两个原因,第一,如果多个任务中的一个任务负责实现一个人机交互接口,在等待用户输入的时候,可以让其他的任务先去执行,等用户输入时再去处理用户的输入.
    Deferreds
    Deferred对象以抽象化的方式表达了一种思想,即结果还尚不存在。它同样能够帮助管理产生这个结果所需要的回调链。当从函数中返回时,Deferred对象承诺在某个时刻函数将产生一个结果。返回的Deferred对象中包含所有注册到事件上的回调引用,因此在函数间只需要传递这一个对象即可,跟踪这个对象比单独管理所有的回调要简单的多。
    Deferred对象包含一对回调链,一个是针对操作成功的回调,一个是针对操作失败的回调。初始状态下Deferred对象的两条链都为空。在事件处理的过程中,每个阶段都为其添加处理成功的回调和处理失败的回调。当一个异步结果到来时,Deferred对象就被“激活”,那么处理成功的回调和处理失败的回调就可以以合适的方式按照它们添加进来的顺序依次得到调用。
   
    用户密码加盐的任务链如下


    在一个deferred实例上加上 Callback将所有的任务链链接到一起 主要代码如下
 public static void main(String[] args) throws Throwable {
    final HBaseClient client = new HBaseClient("localhost");
    Deferred<ArrayList<Object>> d = Deferred.group(doList(client));
    try {
        d.join();
    } catch (DeferredGroupException e) {
        LOG.info(e.getCause().getMessage());
    }

    //线程阻塞直到shutdown完成
    client.shutdown().joinUninterruptibly();
  }

 static List<Deferred<Boolean>> doList(HBaseClient client)
      throws Throwable {
    final Scanner scanner = client.newScanner(TABLE_NAME);
    scanner.setFamily(INFO_FAM);
    scanner.setQualifier(PASSWORD_COL);

    ArrayList<ArrayList<KeyValue>> rows = null;
    ArrayList<Deferred<Boolean>> workers
      = new ArrayList<Deferred<Boolean>>();
    //线程阻塞直到返回所有查询结果
    while ((rows = scanner.nextRows(1).joinUninterruptibly()) != null) {
      LOG.info("received a page of users.");
      for (ArrayList<KeyValue> row : rows) {
        KeyValue kv = row.get(0);
        byte[] expected = kv.value();
        String userId = new String(kv.key());
        PutRequest put = new PutRequest(
            TABLE_NAME, kv.key(), kv.family(),
            kv.qualifier(), mkNewPassword(expected));
        //任务链构建
        Deferred<Boolean> d = client.compareAndSet(put, expected)
          .addCallback(new InterpretResponse(userId))
          .addCallbacks(new ResultToMessage(), new FailureToMessage())
          .addCallback(new SendMessage());
        workers.add(d);
      }
    }
    return workers;
  }
   
   //等待hbase操作结果返回response
   static final class InterpretResponse
      implements Callback<UpdateResult, Boolean> {

    private String userId;

    InterpretResponse(String userId) {
      this.userId = userId;
    }

    public UpdateResult call(Boolean response) throws Exception {
      latency();

      UpdateResult r = new UpdateResult();
      r.userId = this.userId;
      r.success = entropy(response);
      if (!r.success)
        throw new UpdateFailedException(r);

      latency();
      return r;
    }

    @Override
    public String toString() {
      return String.format("InterpretResponse<%s>", userId);
    }
  }

   //密码更新成功
   static final class ResultToMessage
      implements Callback<String, UpdateResult> {

    public String call(UpdateResult r) throws Exception {
      latency();
      String fmt = "password change for user %s successful.";
      latency();
      return String.format(fmt, r.userId);
    }

    @Override
    public String toString() {
      return "ResultToMessage";
    }
  }
  
  //密码更新失败
  static final class FailureToMessage
      implements Callback<String, UpdateFailedException> {

    public String call(UpdateFailedException e) throws Exception {
      latency();
      String fmt = "%s, your password is unchanged!";
      latency();
      return String.format(fmt, e.result.userId);
    }

    @Override
    public String toString() {
      return "FailureToMessage";
    }
  }
  
  ///发送信息
  static final class SendMessage
      implements Callback<Boolean, String> {

    public Boolean call(String s) throws Exception {
      latency();
      if (entropy(null))
        throw new SendMessageFailedException();
      LOG.info(s);
      latency();
      return Boolean.TRUE;
    }

    @Override
    public String toString() {
      return "SendMessage";
    }
  }



 

   
  • 大小: 9.3 KB
  • 大小: 9.6 KB
  • 大小: 20.7 KB
  • 大小: 24.4 KB
  • 大小: 135.7 KB
  • 大小: 50.1 KB
分享到:
评论

相关推荐

    高性能HBase客户端AsynchronousHBase.zip

    asynchbase 是 Java 库使用 HBase 的替代品,要求一个完全异步,非阻塞,线程安全,高性能的 HBase API。这个 HBase 客户端跟 HBase 的客户端 HTable 有着很大的区别,不需要重写所有的 HBase API 交互代码,原生...

    HBase在小米的实践.pdf

    文件提到了为什么要使用异步HBase客户端(AsyncHBaseClient),这表明在分布式系统中,处理方式对于性能和服务可用性有重大影响。传统的阻塞式客户端使用单一线程处理请求,这会导致在高负载、慢RPC响应或网络故障等...

    storm-asynchbase:AsyncHBase 风暴映射器

    使用 AsyncHBase 客户端的 HBase 风暴连接器 这个用于 Apache Storm 的连接器使用 AsyncHBase 客户端将原始数据和 Trident 状态保存到 Apache HBase。 好处 AyncHBase 客户端是 Apache HBase 的完全异步和线程安全的...

    gohbaseHBasego客户端.pdf

    这个例子中使用了Go的context包来控制函数执行的时间限制,以及如何在超时后优雅地取消操作。 3. **使用gohbase客户端的示例**: ```go client := gohbase.NewClient("localhost") ctx, cancel := context.With...

    头目:使用Jinterface和Asynchbase Java客户端查询数据库的ErlangElixir的HBase驱动程序

    潜水员使用和查询数据库的Erlang / Elixir HBase驱动程序。 Diver在启动时将Java服务器创建为,并将GenServer请求直接分派到在Java服务器上运行的HBaseClient 。 这些请求由HBase群集上的客户端异步执行,并且响应...

    java6.0源码-twitbase-async:HBaseinAction的示例异步数据库应用程序

    部署进行交互,则需要编辑源代码中的构造函数并重新编译 jar。 使用 asynchbase TwitBase 客户端 此客户端用于与 HBase 通信。 客户端假定您有一个包含用户数据的现有 TwitBase 架构。 使用以下命令运行应用程序: $...

    asyncbigtable:AsyncHBase的实现,但在Google的Cloud Bigtable服务之上

    它使用链接Google Bigtable库的Apache HBase 1.0 API。 该库最初是asynchbase 1.5.0库的一个分支,因此人们可能会发现乍一看似乎有些奇怪的代码。 我们正在努力清理代码库并删除不相关的依赖项。 基本安装 与原始...

    java6.0源码-HbaseDemo:HbaseDemo

    部署进行交互,则需要编辑源代码中的构造函数并重新编译 jar。 使用 asynchbase TwitBase 客户端 此客户端用于与 HBase 通信。 客户端假定您有一个包含用户数据的现有 TwitBase 架构。 使用以下命令运行应用程序: $...

    基于OpenTSDB和OPC的能耗数据采集存储技术研究

    当TSD服务器收到OPC采集上来的数据后,通过AsyncHBase客户端库将数据写入HBase数据库中。AsyncHBase作为HBase的一个库,支持异步和非阻塞操作,利用较少的线程和内存资源实现高吞吐量的数据写入,特别适合处理大量写...

    storm-opentsdb:OpenTSDB 风暴映射器

    用于 OpenTSDB 的 Storm 连接器这个用于 Apache Storm 的连接器使用 OpenTSDB java 库,使用 AsyncHBase 客户端将原始数据和 Trident 状态直接持久化到 HBase。 因为每个应用程序应该只有一个 AsyncHBase 客户端,...

    OpenTSDB 文档

    - 数据写入:使用AsyncHBase库将数据异步写入HBase,提高写入效率。 - Web UI:提供图形化界面进行数据查询。 - **HBase**:作为底层存储,存储所有时间序列数据。 **流程概述**: 1. **数据收集**:Collector从...

    时序数据库OpenTSDB构建工业大数据存储平台

    OpenTSDB内部使用的asynchbase客户端支持异步请求,确保了高并发实时写入。 在存储机制上,OpenTSDB将一小时内数据整合为一个键值对,利用HBase的Rowkey设计实现数据压缩。通过相同的metric和tag值,多个秒级数据被...

Global site tag (gtag.js) - Google Analytics