HBase Code Reading Notes 1 - PUT - 3 - Submitting the Task, Part 1 (based on 0.94.12)

Having finally gotten the RS lookup question more or less sorted out, and since time waits for no one, let's jump straight into how step 2 of connection.processBatch submits the tasks to the server.

As we saw earlier, a Callable<MultiResponse> is created first; its call method in turn builds a ServerCallable<MultiResponse> and invokes that object's withoutRetries method.
withoutRetries itself is simple: it calls connect and then multi (sketched just below).
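For orientation, here is a rough paraphrase of what withoutRetries does on this path. It is based on the description above, not the verbatim 0.94.12 source; the real method also translates exceptions and tracks the call timeout.

    // Paraphrased sketch of ServerCallable.withoutRetries (not verbatim 0.94.12 code).
    public T withoutRetries() throws IOException, RuntimeException {
        try {
            beforeCall();        // arm the RPC timeout for this single attempt
            connect(false);      // the overridden connect below; no second region lookup
            return call();       // for processBatch this ends up calling server.multi(...)
        } catch (Throwable t) {
            throw new RuntimeException(t);  // simplified; the real code unwraps/translates first
        } finally {
            afterCall();         // reset the timeout bookkeeping
        }
    }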
Let's chew through these one at a time, starting with connect. Here server is a field of ServerCallable, of type HRegionInterface.
    // Note [1]: this is ServerCallable's default connect method
    public void connect(final boolean reload) throws IOException {
        this.location = connection.getRegionLocation(tableName, row, reload);
        this.server = connection.getHRegionConnection(location.getHostname(),
                location.getPort());
    }
    // Note [2]: this is the connect method overridden when the callable is built in createCallable.
    // Step 1 already looked up the region for the rowkey, so it is not fetched again here;
    // that lookup is, after all, a heavyweight operation.
    public void connect(boolean reload) throws IOException {
        server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
    }


Next, connection.getHRegionConnection, which hands back the HRegionInterface:
        public HRegionInterface getHRegionConnection(final String hostname,
                                                     final int port)
                throws IOException {
            return getHRegionConnection(hostname, port, false);
        }
        @Override
        public HRegionInterface getHRegionConnection(final String hostname,final int port, final boolean master)
                throws IOException {
            return getHRegionConnection(hostname, port, null, master);
        }


        HRegionInterface getHRegionConnection(final String hostname, final int port,
                                              final InetSocketAddress isa, final boolean master)
                throws IOException {
            if (master) getMaster(); // if we're connecting to the master, go look it up; not digging into that here
            HRegionInterface server;
            String rsName = null;
            if (isa != null) {
                rsName = Addressing.createHostAndPortStr(isa.getHostName(),
                        isa.getPort());
            } else {
                rsName = Addressing.createHostAndPortStr(hostname, port);
            }
            ensureZookeeperTrackers();
            // See if we already have a connection (common case)
            server = this.servers.get(rsName);
            if (server == null) {
                // create a unique lock for this RS (if necessary)
                this.connectionLock.putIfAbsent(rsName, rsName);
                // get the RS lock
                synchronized (this.connectionLock.get(rsName)) {
                    // do one more lookup in case we were stalled above
                    server = this.servers.get(rsName);
                    if (server == null) {
                        try {
                            // Only create isa when we need to.
                            InetSocketAddress address = isa != null ? isa :
                                    new InetSocketAddress(hostname, port);
                            // definitely a cache miss. establish an RPC for this RS
                            // everything above was building the address and checking the cache; this is the core
                            server = HBaseRPC.waitForProxy(this.rpcEngine,
                                    serverInterfaceClass, HRegionInterface.VERSION,
                                    address, this.conf,
                                    this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
                            this.servers.put(Addressing.createHostAndPortStr(
                                    address.getHostName(), address.getPort()), server);
                        } catch (RemoteException e) {
                            LOG.warn("RemoteException connecting to RS", e);
                            // Throw what the RemoteException was carrying.
                            throw e.unwrapRemoteException();
                        }
                    }
                }
            }
            return server;
        }


This code was already posted in the entry on region location. In essence it keys a hash-based cache by the server name string (host:port): if a connection for that RS already exists it is returned, otherwise one is created and cached.
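For reference, the same caching pattern in isolation: a minimal, generic sketch with made-up names (this is not HBase code). A ConcurrentHashMap holds the cached connections, and putIfAbsent on a second map gives each host:port key its own lock object, so only the first caller for a given RS pays the cost of building the proxy.

    import java.util.concurrent.ConcurrentHashMap;

    // Generic double-checked, per-key-locked cache (illustrative names only).
    class ConnectionCache<V> {
        interface Factory<T> { T create(String key) throws Exception; }

        private final ConcurrentHashMap<String, V> cache = new ConcurrentHashMap<String, V>();
        private final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<String, String>();

        V get(String key, Factory<V> factory) throws Exception {
            V value = cache.get(key);                // common case: already cached
            if (value == null) {
                locks.putIfAbsent(key, key);         // one lock object per key
                synchronized (locks.get(key)) {
                    value = cache.get(key);          // re-check: another thread may have beaten us
                    if (value == null) {
                        value = factory.create(key); // e.g. HBaseRPC.waitForProxy(...)
                        cache.put(key, value);
                    }
                }
            }
            return value;
        }
    }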

HBaseRPC.waitForProxy takes quite a few parameters.
The last five are the usual suspects, so let's just look at the first three:
RpcEngine: as its javadoc says, the RPC implementation. Which concrete class that is, we'll follow up on below.
protocol: HRegionInterface (or a subclass of it).
clientVersion: HRegionInterface.VERSION, currently the constant 29. Enough said, on to the code. The main body looks almost too simple: loop, try rpcClient.getProxy, handle the exceptions. Looks like we still have to dig further down.
    public static <T extends VersionedProtocol> T waitForProxy(RpcEngine rpcClient,
             Class<T> protocol,
             long clientVersion,
             InetSocketAddress addr,
             Configuration conf,
             int maxAttempts, // hbase.client.rpc.maxattempts, default 1; annoyingly, this one isn't even in the sample config file
             int rpcTimeout,
             long timeout // hbase.rpc.timeout, default 60 seconds; seriously, can your clients put up with waiting that long?
    ) throws IOException {
        // HBase does limited number of reconnects which is different from hadoop.
        long startTime = System.currentTimeMillis();
        IOException ioe;
        int reconnectAttempts = 0;
        while (true) {
            try {
                return rpcClient.getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
            } catch (SocketTimeoutException te) {  // namenode is busy
                LOG.info("Problem connecting to server: " + addr);
                ioe = te;
            } catch (IOException ioex) {
                // We only handle the ConnectException.
                ConnectException ce = null;
                if (ioex instanceof ConnectException) {
                    ce = (ConnectException) ioex;
                    ioe = ce;
                } else if (ioex.getCause() != null
                        && ioex.getCause() instanceof ConnectException) {
                    ce = (ConnectException) ioex.getCause();
                    ioe = ce;
                } else if (ioex.getMessage().toLowerCase()
                        .contains("connection refused")) {
                    ce = new ConnectException(ioex.getMessage());
                    ioe = ce;
                } else {
                    // This is the exception we can't handle.
                    ioe = ioex;
                }
                if (ce != null) {
                    handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
                            addr, ce);
                }
            }
            // check if timed out
            if (System.currentTimeMillis() - timeout >= startTime) {
                throw ioe;
            }

            // wait for retry
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // IGNORE
            }
        }
    }
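The two knobs called out in the comments above can be tuned on the client. A minimal sketch; the property names are the ones mentioned in the comments, the values are only examples:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class RpcTimeoutTuning {
        public static Configuration tunedConf() {
            Configuration conf = HBaseConfiguration.create();
            // Example values only; pick numbers that fit your latency budget.
            conf.setInt("hbase.client.rpc.maxattempts", 3); // reconnect attempts in waitForProxy (default 1)
            conf.setInt("hbase.rpc.timeout", 10000);        // RPC timeout in milliseconds (default 60000)
            return conf;
        }
    }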


Alright, so rpcClient.getProxy is where the real action is; one more layer pouring cold water on my hope of being near the finish line. But first, where does the rpcEngine come from? It is created lazily via HBaseRPC.getProtocolEngine:
    if (rpcEngine == null) {
        this.rpcEngine = HBaseRPC.getProtocolEngine(conf);
    }

    public static synchronized RpcEngine getProtocolEngine(Configuration conf) {
        // check for a configured default engine
        Class<?> impl =
                conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);

        LOG.debug("Using RpcEngine: " + impl.getName());
        RpcEngine engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
        return engine;
    }

RpcEngine: the default is WritableRpcEngine, and you can override it by setting hbase.rpc.engine to a fully qualified class name.
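For example, to plug in a different engine you would do something like the following; com.example.MyRpcEngine is purely hypothetical and would have to implement org.apache.hadoop.hbase.ipc.RpcEngine:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CustomRpcEngineConfig {
        public static Configuration withCustomEngine() {
            Configuration conf = HBaseConfiguration.create();
            // Hypothetical class name; getProtocolEngine falls back to WritableRpcEngine if unset.
            conf.set("hbase.rpc.engine", "com.example.MyRpcEngine");
            return conf;
        }
    }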
public <T extends VersionedProtocol> T getProxy(
            Class<T> protocol, long clientVersion,
            InetSocketAddress addr, Configuration conf, int rpcTimeout)
            throws IOException {
        if (this.client == null) { // client is an HBaseClient instance, injected via setConf when the RpcEngine is initialized // MY TODO
            throw new IOException("Client must be initialized by calling setConf(Configuration)");
        }
        // it really does create a dynamic proxy
        T proxy =
                (T) Proxy.newProxyInstance(
                        protocol.getClassLoader(), new Class[]{protocol},
                        new Invoker(client, protocol, addr, User.getCurrent(), conf,
                                HBaseRPC.getRpcTimeout(rpcTimeout)));

    /*
     * TODO: checking protocol version only needs to be done once when we setup a new
     * HBaseClient.Connection.  Doing it every time we retrieve a proxy instance is resulting
     * in unnecessary RPC traffic.
     */ // check whether the server's protocol version matches the client's; if not, it's goodbye
        long serverVersion = ((VersionedProtocol) proxy)
                .getProtocolVersion(protocol.getName(), clientVersion);
        if (serverVersion != clientVersion) {
            throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
                    serverVersion);
        }

        return proxy;
    }


So now we have the proxy class; its handler is Invoker, an inner class of WritableRpcEngine.
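To make the mechanism concrete, here is a stripped-down toy version of the same JDK dynamic-proxy trick (not HBase code; the names are made up). Every method invoked on the returned interface instance is funneled into a single invoke method, which is exactly where Invoker wraps the call into an Invocation and hands it to HBaseClient.

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    // Toy version of the dynamic-proxy pattern used by WritableRpcEngine.getProxy.
    public class ProxyDemo {
        interface RemoteService {           // stands in for HRegionInterface
            String multi(String request);
        }

        public static void main(String[] args) {
            InvocationHandler handler = new InvocationHandler() {   // stands in for Invoker
                public Object invoke(Object proxy, Method method, Object[] a) {
                    // In the real Invoker this is where client.call(new Invocation(...), ...) happens.
                    return "handled " + method.getName() + "(" + a[0] + ")";
                }
            };
            RemoteService service = (RemoteService) Proxy.newProxyInstance(
                    RemoteService.class.getClassLoader(),
                    new Class<?>[]{RemoteService.class},
                    handler);
            System.out.println(service.multi("put batch"));  // prints: handled multi(put batch)
        }
    }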

Next up, Invoker. It really is just a proxy, and an almost trivially thin one: it records the elapsed time and that's about it. There is barely any logic of its own; the real work is packaged into an Invocation object and carried out through HBaseClient.

      public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            final boolean logDebug = LOG.isDebugEnabled();
            long startTime = 0;
            if (logDebug) {
                startTime = System.currentTimeMillis();
            }

            HbaseObjectWritable value = (HbaseObjectWritable)
                    client.call(new Invocation(method, protocol, args), address,
                            protocol, ticket, rpcTimeout);
            if (logDebug) {
                // FIGURE HOW TO TURN THIS OFF!
                long callTime = System.currentTimeMillis() - startTime;
                LOG.debug("Call: " + method.getName() + " " + callTime);
            }
            return value.get();
        }

        public Invocation(Method method,
                      Class<? extends VersionedProtocol> declaringClass, Object[] parameters) {
        this.methodName = method.getName();
        this.parameterClasses = method.getParameterTypes();
        this.parameters = parameters;
        if (declaringClass.equals(VersionedProtocol.class)) {
            //VersionedProtocol is exempted from version check.
            clientVersion = 0;
            clientMethodsHash = 0;
        } else {
            try {
                Field versionField = declaringClass.getField("VERSION");
                versionField.setAccessible(true);
                this.clientVersion = versionField.getLong(declaringClass);
            } catch (NoSuchFieldException ex) {
                throw new RuntimeException("The " + declaringClass, ex);
            } catch (IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
            this.clientMethodsHash = ProtocolSignature.getFingerprint(
                    declaringClass.getMethods());
        }
    }

The Invocation class essentially wraps the target class, method, and method arguments in a Writable-based envelope, relying on HbaseObjectWritable to serialize and deserialize the parameters. That part is long and fairly dry, so I'll set aside a day and cover it in a post of its own.
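Until that post exists, here is a quick reminder of the Writable contract everything above rests on. This is a generic Hadoop example using Text, not HbaseObjectWritable itself: each object writes itself to a DataOutput and reads itself back from a DataInput, and Invocation does exactly that for the method name, parameter classes and parameter values.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.io.Text;

    // Generic Writable round trip; HbaseObjectWritable applies the same idea to arbitrary parameters.
    public class WritableRoundTrip {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            new Text("multi").write(out);            // serialize to the byte stream

            Text back = new Text();
            back.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));  // deserialize
            System.out.println(back);                // prints: multi
        }
    }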

Execution now returns to getHRegionConnection: we have obtained and cached a proxy for HRegionInterface, and every call made through that proxy actually goes through HBaseClient's call method. What exactly call does inside, we'll look at later.

With the connection in place it's time to invoke the call method, which simply calls server.multi, i.e. HRegionInterface's multi method. And with that, HBaseClient, which has kept itself hidden this deep, finally takes the stage:

public Writable call(Writable param, InetSocketAddress addr,
                         Class<? extends VersionedProtocol> protocol,
                         User ticket, int rpcTimeout)
            throws InterruptedException, IOException {
        Call call = new Call(param); // param here is the Invocation that wraps the class name, method name and arguments
        Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); // this is where the connection happens; are we really, truly connecting this time?
        connection.sendParam(call);                 // looks like we really are connected, and the wrapped call is shipped out
        boolean interrupted = false;
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (call) {
            // while the call isn't done, the connection isn't closed, the thread isn't interrupted, and there's no timeout or error: we wait.
            while (!call.done) {
                if (connection.shouldCloseConnection.get()) {
                    throw new IOException("Unexpected closed connection");
                }
                try {
                    call.wait(1000);                       // wait for the result
                } catch (InterruptedException ignored) {
                    // save the fact that we were interrupted
                    interrupted = true;
                }
            }

            if (interrupted) {
                // set the interrupt flag now that we are done waiting
                Thread.currentThread().interrupt();
            }

            if (call.error != null) {
                if (call.error instanceof RemoteException) {
                    call.error.fillInStackTrace();
                    throw call.error;
                }
                // local exception
                throw wrapException(addr, call.error);
            }
            return call.value;
        }
    }
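The wait loop above only makes sense together with its other half: when the connection's reader thread receives the response it marks the Call as done and notifies the waiter. Here is a minimal, generic sketch of that wait/notify handshake (not the actual Call class):

    // Generic sketch of the handshake behind HBaseClient.call: the calling thread
    // waits on the Call object, the reader thread fills in the value and wakes it up.
    public class CallHandshake {
        static class Call {
            private boolean done;
            private Object value;

            synchronized void complete(Object result) {   // reader-thread side
                this.value = result;
                this.done = true;
                notify();                                 // wake the waiting caller
            }

            synchronized Object await() throws InterruptedException {  // caller side
                while (!done) {
                    wait(1000);                           // mirrors call.wait(1000) above
                }
                return value;
            }
        }

        public static void main(String[] args) throws Exception {
            final Call call = new Call();
            new Thread(new Runnable() {
                public void run() { call.complete("multi response"); }
            }).start();
            System.out.println(call.await());             // prints: multi response
        }
    }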

