`
brianf
  • 浏览: 37423 次
  • 来自: 杭州
社区版块
存档分类
最新评论

HBase rpc调用

阅读更多
HBase rpc 0.94中


例如在client put数据时,会调用htable的flushCommits,再调HConnectionImplementationr的processBatch,再调processBatchCallback
中,在这里异步调用线程,并使用future取得结果,最终执行的是call方法。
        // step 2: make the requests

        Map<HRegionLocation, Future<MultiResponse>> futures =
            new HashMap<HRegionLocation, Future<MultiResponse>>(
                actionsByServer.size());

        for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
          futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
        }

        // step 3: collect the failures and successes and prepare for retry

        for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
             : futures.entrySet()) {
          HRegionLocation loc = responsePerServer.getKey();


    private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
        final MultiAction<R> multi, final byte [] tableName) {
      // TODO: This does not belong in here!!! St.Ack  HConnections should
      // not be dealing in Callables; Callables have HConnections, not other
      // way around.
      final HConnection connection = this;
      return new Callable<MultiResponse>() {
       public MultiResponse call() throws IOException {
         ServerCallable<MultiResponse> callable =
           new ServerCallable<MultiResponse>(connection, tableName, null) {
             public MultiResponse call() throws IOException {
               return server.multi(multi);
             }
             @Override
             public void connect(boolean reload) throws IOException {
               server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
             }
           };
         return callable.withoutRetries();
       }
     };
   }



其中的异步调用的线程池
    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
    if (maxThreads == 0) {
      maxThreads = 1; // is there a better default?
    }
    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);

    // Using the "direct handoff" approach, new threads will only be created
    // if it is necessary and will grow unbounded. This could be bad but in HCM
    // we only create as many Runnables as there are region servers. It means
    // it also scales when new region servers are added.
    this.pool = new ThreadPoolExecutor(1, maxThreads,
        keepAliveTime, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new DaemonThreadFactory());
    ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);

    this.finishSetup();


HMaster or HRegionServer初始化创建HBaseServer调用HBaseRPC.getServer->

    this.rpcServer = HBaseRPC.getServer(this,
      new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
        initialIsa.getHostName(), // BindAddress is IP we got for this server.
        initialIsa.getPort(),
        numHandlers,
        0, // we dont use high priority handlers in master
        conf.getBoolean("hbase.rpc.verbose", false), conf,
        0); // this is a DNC w/o high priority handlers

HBase 0.94 HRegionServer默认启动10个handler线程用于处理rpc请求(hbase.regionserver.handler.count),

    this.rpcServer = HBaseRPC.getServer(this,
      new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
        OnlineRegions.class},
        initialIsa.getHostName(), // BindAddress is IP we got for this server.
        initialIsa.getPort(),
        conf.getInt("hbase.regionserver.handler.count", 10),
        conf.getInt("hbase.regionserver.metahandler.count", 10),
        conf.getBoolean("hbase.rpc.verbose", false),
        conf, QOS_THRESHOLD);
    // Set our address.
    this.isa = this.rpcServer.getListenerAddress();

    this.rpcServer.setErrorHandler(this);
    this.rpcServer.setQosFunction(new QosFunction());



HBaseServer的Listerner的reader线程接到RPC请求后,会丢到queue中,
其中的reader线程数是由一个线程池决定
     this.readThreads = conf.getInt(
        "ipc.server.read.threadpool.size",
        10);

    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();

      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads,
        new ThreadFactoryBuilder().setNameFormat(
          "IPC Reader %d on port " + port).setDaemon(true).build());
      for (int i = 0; i < readThreads; ++i) {
        Reader reader = new Reader();
        readers[i] = reader;
        readPool.execute(reader);
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }


1. HBaseServer创建后有几个重要的角色.
1.1 Listener deamon线程,负责接收HMaster,HRegionServer,HBase Client的http请求.
1.2 Responder demon线程,负责将处理完的请求,发送回调用者.
1.3 Connection listener接收到的每个Socket请求都会创建一个Connection 实例.
1.4 Call 每一个客户端的发送的请求由Connection读取到有效数据后都会生成一个Call实例
1.5 LinkedBlockingQueue callQueue 每个由新生成的call都会放入到callQueue这个队列里.
1.6 Handler 从callQueue中取出call,并对call进行反射的调用,生成的结果value,交由responder处理.
1.7 LinkedList Connection.responseQueue ,用来存放已经由handler处理过的属于同一个Connection的call.
HBaseServer->listener = new Listener();->Reader.run->doRunLoop->doRead->Connection.readAndProcess->processData中将rpc请求生成call并加入到queue中。
      Call call = new Call(id, param, this, responder, callSize);
      callQueueSize.add(callSize);

HBaseServer的下面方法决定了接收RPC请求的queue大小maxQueueLength ,默认是100

    String oldMaxQueueSize = this.conf.get("ipc.server.max.queue.size");
    if (oldMaxQueueSize == null) {
      this.maxQueueLength =
        this.conf.getInt("ipc.server.max.callqueue.length",
          handlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
    } else {
      LOG.warn("ipc.server.max.queue.size was renamed " +
               "ipc.server.max.callqueue.length, " +
               "please update your configuration");
      this.maxQueueLength = Integer.getInteger(oldMaxQueueSize);
    }

    this.maxQueueSize =
      this.conf.getInt("ipc.server.max.callqueue.size",
        DEFAULT_MAX_CALLQUEUE_SIZE);
     this.readThreads = conf.getInt(
        "ipc.server.read.threadpool.size",
        10);
    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueLength);



当regionserver启动时调用startServiceThreads内会调用HBaseServer的下面方法启动10个hander线程。

  public synchronized void startThreads() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];

    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(callQueue, i);
      handlers[i].start();
    }

    if (priorityHandlerCount > 0) {
      priorityHandlers = new Handler[priorityHandlerCount];
      for (int i = 0 ; i < priorityHandlerCount; i++) {
        priorityHandlers[i] = new Handler(priorityCallQueue, i);
        priorityHandlers[i].start();
      }
    }
  }


handler线程最终会调用到run的call方法,再此方法中会射出相应的方法并最终调用,也就是会调用相应的regionserver的方法。

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        //Verify protocol version.
        //Bypass the version check for VersionedProtocol
        if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
          long clientVersion = call.getProtocolVersion();
          ProtocolSignature serverInfo = ((VersionedProtocol) instance)
              .getProtocolSignature(protocol.getCanonicalName(), call
                  .getProtocolVersion(), call.getClientMethodsHash());
          long serverVersion = serverInfo.getVersion();
          if (serverVersion != clientVersion) {
            LOG.warn("Version mismatch: client version=" + clientVersion
                + ", server version=" + serverVersion);
            throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
                serverVersion);
          }
        }
        Object impl = null;
        if (protocol.isAssignableFrom(this.implementation)) {
          impl = this.instance;
        }
        else {
          throw new HBaseRPC.UnknownProtocolException(protocol);
        }

        long startTime = System.currentTimeMillis();
        Object[] params = call.getParameters();
        Object value = method.invoke(impl, params);
分享到:
评论

相关推荐

    藏经阁-HBase Coprocessor-22.pdf

    4.Endpoint:终端是动态 RPC 插件的接口,它的实现代码被安装在服务器端,能够通过 HBase RPC 调用唤醒提供接口。 HBase Coprocessor 的应用场景: 1.二级索引:HBase Coprocessor 可以实现二级索引的创建和维护,...

    2-6+HBase+Coprocessor.pdf

    - Endpoint:作为动态RPC插件的接口,其服务端实现可以被HBase RPC调用触发,提供自定义功能。 【Endpoint服务端实现】 Endpoint在服务端的实现涉及到RPC通信,因此客户端和服务端需定义一致的接口。HBase使用...

    HBaseCoprocessor的实现与应用.pdf

    - **Endpoint**:动态 RPC 插件接口,实现代码部署在服务器端,通过 HBase RPC 调用触发。 #### 二、Endpoint服务端实现 Endpoint 作为一种特殊的 Coprocessor,允许在服务器端直接处理请求,无需将所有数据返回给...

    HBase源码分析

    HBase中的RPC机制与Java的RMI(远程方法调用)不同,它采用了一种轻量级的RPC机制,这种方式不依赖于Java的RMI机制,使得它能够更好地适应分布式环境下的复杂通信需求。在HBase中,RPC机制是其主要的通信手段,它...

    hbase安装与hbase架构说明

    在HBase的架构中,Client是用户与系统交互的接口,它通过远程过程调用(RPC)机制与HMaster和HRegionServer通信。对于数据读写操作,Client直接与HRegionServer交互,而对于表管理和元数据操作,Client则与HMaster...

    [HBase]源码级强力分析hadoop的RPC机制

    这些天一直奔波于长沙和武汉之间,忙着腾讯的笔试、面试,以至于对hadoopRPC(RemoteProcedureCallProtocol,远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。...

    HBase与hive整合 和 hive与hdfs结合的效率对比1

    1. 全表扫描:在无缓存情况下,Hive on HBase的查询速度远低于Hive on HDFS,因为HBase的全表扫描涉及到大量RPC调用。然而,通过设置`hbase.client.scanner.caching`参数,可以显著提高Hive on HBase的性能,尽管...

    hbase- java开发连接工具类

    3. **RPC机制**:HBase使用远程过程调用(RPC)与RegionServer进行通信,处理数据请求。这个JAR包包含了相关的RPC实现。 4. **行键(RowKey)索引**:HBase是一种列族式数据库,行键是其主要的索引方式。`hbase-...

    hbase性能优化

    增加RPC调用数量,通过修改hbase-site.xml中的hbase.regionserver.handler.count属性来实现,以提高并发处理能力。 在HBase的使用中,如果直接将时间戳作为行健,会导致写入单个region时出现热点问题。因为HBase的...

    php-hbase-thrift

    Thrift的核心在于它的序列化机制和RPC(远程过程调用)框架,使得开发者可以轻松地在各种编程语言间构建和消费服务。 在PHP访问HBase时,由于HBase本身是用Java实现的,因此需要一个中间层来桥接PHP和HBase。这就是...

    hbase-client

    这些客户端封装了HBase的RPC协议,简化了在不同语言环境下的开发工作。以Python的happybase为例,它提供了一种面向对象的方式来操作HBase,使得代码更加简洁易懂。 五、连接管理和安全性 HBase客户端需要配置正确...

    大数据HBASE考题材料

    HRegionServer与HMaster及客户端之间的通信采用RPC协议,即远程过程调用协议,这是一种用于不同计算机系统间的进程间通信的方式。 8. **HFile中的KeyValue结构** 在HFile数据格式中,KeyValue数据结构的Value...

    企业中应用HBase

    - **解决方案**:为了实现这一目标,HBase采用了一种新的RPC引擎,即Protobuf RPC引擎来替代原有的Writable RPC引擎。这种改变不仅提高了RPC请求/响应消息的效率,还通过使用Protobuf格式实现了更灵活的数据序列化。...

    thrift1 查询hbase

    1. **Thrift接口**:Thrift提供了一种序列化和RPC(远程过程调用)机制,允许开发者定义服务接口,并在多种语言之间实现这些接口。Thrift1是早期版本,虽然现在已更新到Thrift2,但对某些场景仍然适用。它通过生成...

    Hbase运维手册.pdf

    《HBase运维手册》主要涵盖了HBase数据库的运维核心要点,包括Region管理、缓存机制、读写性能、压缩操作、内存使用以及RPC调用等多个方面。以下是对这些知识点的详细解析: 1. **Region管理**: - Region数量:...

    hbase 源码包

    4. **RPC通信**:`org.apache.hadoop.hbase.ipc`包下的RpcServer实现了客户端与服务器之间的远程过程调用,处理客户端请求。 5. **版本控制与并发控制**:每个Cell都有时间戳,用于版本控制;`org.apache.hadoop....

Global site tag (gtag.js) - Google Analytics