- 浏览: 27312 次
- 性别:
- 来自: 北京
最新评论
-
pinkmoon:
HBase 0.96配置 snappy(绝对有效哦亲) -
pinkmoon:
记一次痛苦的 hadoop 2编译过程 -
半点玻璃心:
dsx1013 写道你好,我有snappy 源码安装,没有指定 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
dsx1013:
你好,我有snappy 源码安装,没有指定安装目录,默认安装路 ...
HBase 0.96配置 snappy(绝对有效哦亲) -
半点玻璃心:
推文7 写道你好,我也遇到了这个问题,能否麻烦把您编译的had ...
HBase 0.96配置 snappy(绝对有效哦亲)
终于把RS的定位问题搞清楚了些些,时间不等人,马上看看connection.processBatch中,step2是如何把任务提交到服务端的吧
之前已经看到,首先创建了一个Callable<MuiltyResponse>对象,而该对象的call方法实际上又创建了一个ServerCallable<MultiResponse> 对象,然后调用了它的withoutRetries方法。
这个方法很简单,调用了connect方法和multi方法
一个个开始啃吧,先看看connect,其中server是ServerCallable的成员,HRegionInterface类型
HRegionInterface
这段代码在regionlocation的博文中已经帖过,主要是利用servername string做了一个hash缓存,如果已经存在则返回之,否则创建并缓存之
HBaseRPC.waitForProxy,参数比较多
后面5个就不说了,常见参数,先看看前三个吧
RpcEngine:注释说的很清楚,RPC实现。具体是什么后续跟进
protocol:HRegionInterface的子类
clientVersion:HRegionInterface.VERSION,当前为常量29。不多说了,啃代码。主代码目测也太简单了,直接循环尝试返回rpcClient.getProxy,然后处理异常。看来还得往下挖
好吧,原来rpcClient.getProxy才是重头戏,一次次的浇熄哥接近终点的热情
RPCEgine:WritableRpcEngine,你也可以通过hbase.rpc.engine配置一个全限定的类名来覆盖它
这就拿到代理类了,Handler实现为Invoker,WritableRpcEngine的一个内部类
接着继续看Invoker吧,真的是代理哦,太简单了就是记录了时间,然后就没有然后了。几乎没有业务,实现放在了Invocation类中,由HBaseClient调用
Invocation类其实主要是将要执行的类,方法,以及方法参数做了一层基于writable的封装,依赖HbaseObjectWritable类序列化和反序列化参数。这个比较漫长,也比较枯燥,专门拿一天写一篇来解析吧。
现在执行序列回到getHRegionConnection,我们拿到并缓存了一个HRegionInterface的一个代理,这个代理在执行的时候实际上是调用HbaseClient的Call方法。至于Call方法里面都做了些神马,后续再看。
既然连接准备好了,就该调用call方法了,call方法实际上是调用了server.multi方法,也就是HRegionInterface的multi方法。这下终于轮到HbaseClient出场了,隐藏得好深。
之前已经看到,首先创建了一个Callable<MuiltyResponse>对象,而该对象的call方法实际上又创建了一个ServerCallable<MultiResponse> 对象,然后调用了它的withoutRetries方法。
这个方法很简单,调用了connect方法和multi方法
一个个开始啃吧,先看看connect,其中server是ServerCallable的成员,HRegionInterface类型
// 备注【1】:这是ServerCallable默认的connect方法 public void connect(final boolean reload) throws IOException { this.location = connection.getRegionLocation(tableName, row, reload); this.server = connection.getHRegionConnection(location.getHostname(), location.getPort()); } // 备注【1】:这是createCallable的时候重写ServerCallable connect方法 // 此前的代码中已经在step1获取到了rowkey对应的region信息,所以这里就不再重复获取了,这毕竟是一个重量级的操作 public void connect(boolean reload) throws IOException { server = connection.getHRegionConnection(loc.getHostname(), loc.getPort()); }
HRegionInterface
public HRegionInterface getHRegionConnection(final String hostname, final int port) throws IOException { return getHRegionConnection(hostname, port, false); } @Override public HRegionInterface getHRegionConnection(final String hostname,final int port, final boolean master) throws IOException { return getHRegionConnection(hostname, port, null, master); } HRegionInterface getHRegionConnection(final String hostname, final int port, final InetSocketAddress isa, final boolean master) throws IOException { if (master) getMaster();//如果是链接master,就去找吧,这里先不深究了 HRegionInterface server; String rsName = null; if (isa != null) { rsName = Addressing.createHostAndPortStr(isa.getHostName(), isa.getPort()); } else { rsName = Addressing.createHostAndPortStr(hostname, port); } ensureZookeeperTrackers(); // See if we already have a connection (common case) server = this.servers.get(rsName); if (server == null) { // create a unique lock for this RS (if necessary) this.connectionLock.putIfAbsent(rsName, rsName); // get the RS lock synchronized (this.connectionLock.get(rsName)) { // do one more lookup in case we were stalled above server = this.servers.get(rsName); if (server == null) { try { // Only create isa when we need to. InetSocketAddress address = isa != null ? isa : new InetSocketAddress(hostname, port); // definitely a cache miss. establish an RPC for this RS // 前面都是构建地址,缓存判断、操作之类,这里是核心代码 server = HBaseRPC.waitForProxy(this.rpcEngine, serverInterfaceClass, HRegionInterface.VERSION, address, this.conf, this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); this.servers.put(Addressing.createHostAndPortStr( address.getHostName(), address.getPort()), server); } catch (RemoteException e) { LOG.warn("RemoteException connecting to RS", e); // Throw what the RemoteException was carrying. throw e.unwrapRemoteException(); } } } } return server; }
这段代码在regionlocation的博文中已经帖过,主要是利用servername string做了一个hash缓存,如果已经存在则返回之,否则创建并缓存之
HBaseRPC.waitForProxy,参数比较多
后面5个就不说了,常见参数,先看看前三个吧
RpcEngine:注释说的很清楚,RPC实现。具体是什么后续跟进
protocol:HRegionInterface的子类
clientVersion:HRegionInterface.VERSION,当前为常量29。不多说了,啃代码。主代码目测也太简单了,直接循环尝试返回rpcClient.getProxy,然后处理异常。看来还得往下挖
public static <T extends VersionedProtocol> T waitForProxy(RpcEngine rpcClient, Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int maxAttempts,//hbase.client.rpc.maxattempts,默认1,这个坑货配置文件里还没有 int rpcTimeout, long timeout//hbase.rpc.timeout,默认60秒,亲,你们的客户端能忍受这么长的超时等待吗 ) throws IOException { // HBase does limited number of reconnects which is different from hadoop. long startTime = System.currentTimeMillis(); IOException ioe; int reconnectAttempts = 0; while (true) { try { return rpcClient.getProxy(protocol, clientVersion, addr, conf, rpcTimeout); } catch (SocketTimeoutException te) { // namenode is busy LOG.info("Problem connecting to server: " + addr); ioe = te; } catch (IOException ioex) { // We only handle the ConnectException. ConnectException ce = null; if (ioex instanceof ConnectException) { ce = (ConnectException) ioex; ioe = ce; } else if (ioex.getCause() != null && ioex.getCause() instanceof ConnectException) { ce = (ConnectException) ioex.getCause(); ioe = ce; } else if (ioex.getMessage().toLowerCase() .contains("connection refused")) { ce = new ConnectException(ioex.getMessage()); ioe = ce; } else { // This is the exception we can't handle. ioe = ioex; } if (ce != null) { handleConnectionException(++reconnectAttempts, maxAttempts, protocol, addr, ce); } } // check if timed out if (System.currentTimeMillis() - timeout >= startTime) { throw ioe; } // wait for retry try { Thread.sleep(1000); } catch (InterruptedException ie) { // IGNORE } } }
好吧,原来rpcClient.getProxy才是重头戏,一次次的浇熄哥接近终点的热情
if (rpcEngine == null) { this.rpcEngine = HBaseRPC.getProtocolEngine(conf); } public static synchronized RpcEngine getProtocolEngine(Configuration conf) { // check for a configured default engine Class<?> impl = conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class); LOG.debug("Using RpcEngine: " + impl.getName()); RpcEngine engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf); return engine; }
RPCEgine:WritableRpcEngine,你也可以通过hbase.rpc.engine配置一个全限定的类名来覆盖它
public <T extends VersionedProtocol> T getProxy( Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout) throws IOException { if (this.client == null) { // client是一个HBaseClient实例,RPCEngine初始化的时候在setconf方法中注入// MY TODO throw new IOException("Client must be initialized by calling setConf(Configuration)"); } // 真的是创建了一个代理呢 T proxy = (T) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, new Invoker(client, protocol, addr, User.getCurrent(), conf, HBaseRPC.getRpcTimeout(rpcTimeout))); /* * TODO: checking protocol version only needs to be done once when we setup a new * HBaseClient.Connection. Doing it every time we retrieve a proxy instance is resulting * in unnecessary RPC traffic. */ //检查是否服务端版本号与客户端版本号是否一致,否则只能说再见了 long serverVersion = ((VersionedProtocol) proxy) .getProtocolVersion(protocol.getName(), clientVersion); if (serverVersion != clientVersion) { throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion, serverVersion); } return proxy; }
这就拿到代理类了,Handler实现为Invoker,WritableRpcEngine的一个内部类
接着继续看Invoker吧,真的是代理哦,太简单了就是记录了时间,然后就没有然后了。几乎没有业务,实现放在了Invocation类中,由HBaseClient调用
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } HbaseObjectWritable value = (HbaseObjectWritable) client.call(new Invocation(method, protocol, args), address, protocol, ticket, rpcTimeout); if (logDebug) { // FIGURE HOW TO TURN THIS OFF! long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } public Invocation(Method method, Class<? extends VersionedProtocol> declaringClass, Object[] parameters) { this.methodName = method.getName(); this.parameterClasses = method.getParameterTypes(); this.parameters = parameters; if (declaringClass.equals(VersionedProtocol.class)) { //VersionedProtocol is exempted from version check. clientVersion = 0; clientMethodsHash = 0; } else { try { Field versionField = declaringClass.getField("VERSION"); versionField.setAccessible(true); this.clientVersion = versionField.getLong(declaringClass); } catch (NoSuchFieldException ex) { throw new RuntimeException("The " + declaringClass, ex); } catch (IllegalAccessException ex) { throw new RuntimeException(ex); } this.clientMethodsHash = ProtocolSignature.getFingerprint( declaringClass.getMethods()); } }
Invocation类其实主要是将要执行的类,方法,以及方法参数做了一层基于writable的封装,依赖HbaseObjectWritable类序列化和反序列化参数。这个比较漫长,也比较枯燥,专门拿一天写一篇来解析吧。
现在执行序列回到getHRegionConnection,我们拿到并缓存了一个HRegionInterface的一个代理,这个代理在执行的时候实际上是调用HbaseClient的Call方法。至于Call方法里面都做了些神马,后续再看。
既然连接准备好了,就该调用call方法了,call方法实际上是调用了server.multi方法,也就是HRegionInterface的multi方法。这下终于轮到HbaseClient出场了,隐藏得好深。
public Writable call(Writable param, InetSocketAddress addr, Class<? extends VersionedProtocol> protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { Call call = new Call(param);//param,这里就是封装好类名、方法名以及参数的invocation对象 Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);//这里开始连接,亲,你真的要连接了吗,不要骗人啊。 connection.sendParam(call); // MS真的连接了,还把执行代理发了出去 boolean interrupted = false; //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (call) { //如果请求没完成,连接没中断,线程没中断,没超时没错误,等吧。 while (!call.done) { if (connection.shouldCloseConnection.get()) { throw new IOException("Unexpected closed connection"); } try { call.wait(1000); // wait for the result } catch (InterruptedException ignored) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } // local exception throw wrapException(addr, call.error); } return call.value; } }
发表评论
-
hbase MemStoreLAB代码浅析-1
2014-09-30 17:21 1147本文基于 hbase 0.98x,如果发现源码与你的副本不符合 ... -
HBase 0.96 服务端写流程代码阅读笔记
2014-02-24 15:36 0private long doMiniBatchMutati ... -
HBase 0.96配置 snappy(绝对有效哦亲)
2014-02-12 14:10 4055通常情况下,snappy压缩算法无非是hbase 最好的伴侣, ... -
HBase Memstore flush代码阅读笔记-2 -由 XXX 触发的 flush
2014-01-22 18:44 0之前看到在执行 mutate 操作之前,RS 会检查 mems ... -
HBase Memstore flush代码阅读笔记-1 -由 lowerlimit 和 upperlimit 触发的 flush
2014-01-22 18:34 1824在写请求(put,delete)到达服务端时,服务端(HReg ... -
HBase Memstore flush代码阅读笔记-2-由单个 memstore大小超过限制触发的 flush
2014-01-23 15:15 3560本代码基于0.96.1.1:http://svn.apache ... -
HBase Memstore配置
2014-01-21 11:09 486... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.96-hadoop2)
2013-11-11 19:13 879看看MultiServerCallable的核心方法,call ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)
2013-11-08 19:54 2049按照94的阅读进度,这里该看如何定位RS和Region了 先回 ... -
HBASE 代码阅读笔记-1 - PUT操作客户端主流程(基于0.96.0-hadoop2)
2013-11-08 19:23 4490又回来了,还是看put,不过版本号变了,希望看0.94的童靴移 ... -
HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.94.12)
2013-11-08 12:55 163上一篇把提交任务的主流程整理了下,遗留了连接、发送请求、处理响 ... -
HBase Memstore配置
2013-11-07 15:47 2915HBase Memstore配置 本文为翻译,原英文地址:ht ... -
HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
2013-11-07 17:01 193上一篇http://dennis-lee-gammy.itey ... -
HBASE 代码阅读笔记-1 - PUT-1-主流程(基于0.94.12)
2013-11-06 19:57 342最近闲来无事看看hbase ...
相关推荐
赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-1.2.12.pom; 包含翻译后的API文档:hbase-hadoop2-compat-1.2.12-javadoc-API文档-中文(简体)版.zip; Maven...
hbase-hbck2-1.1.0-SNAPSHOT.jar
赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-1.1.3.pom; 包含翻译后的API文档:hbase-hadoop2-compat-1.1.3-javadoc-API文档-中文(简体)版.zip; Maven...
HBase(hbase-2.4.9-bin.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System...
HBCK是HBase1.x中的命令,到了HBase2.x中,HBCK命令不适用,且它的写功能(-fix)已删除; HBCK2已经被剥离出HBase成为了一个单独的项目,如果你想要使用这个工具,需要根据自己HBase的版本,编译源码。其GitHub地址...
1. **兼容性增强**:此版本的绑定确保了与HBase 1.4的完全兼容性,这意味着它可以充分利用HBase 1.4的新特性和性能优化。 2. **工作负载多样性**:YCSB提供了多种预定义的工作负载模型,如读多写少、写多读少、读写...
HBCK2 jar包是这个工具的可执行文件,通常在HBase的lib目录下可以找到,名为`hbase-hbck2-x.x.x.jar`,其中`x.x.x`表示具体的HBase版本号。这个jar包包含了所有执行HBCK2命令所需的功能和类。你可以通过Hadoop的`...
赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-1.1.3.pom; 包含翻译后的API文档:hbase-hadoop2-compat-1.1.3-javadoc-API文档-中文(简体)-英语-对照版.zip...
赠送源代码:hbase-prefix-tree-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-prefix-tree-1.1.3.pom; 包含翻译后的API文档:hbase-prefix-tree-1.1.3-javadoc-API文档-中文(简体)版.zip; Maven坐标:org....
本文将深入探讨这两个技术及其结合体`phoenix-hbase-2.2-5.1.2-bin.tar.gz`的详细内容。 首先,HBase(Hadoop Database)是Apache软件基金会的一个开源项目,它构建于Hadoop之上,是一款面向列的分布式数据库。...
hbase-client-2.1.0-cdh6.3.0.jar
赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3.pom; 包含翻译后的API文档:hbase-hadoop-compat-1.1.3-javadoc-API文档-中文(简体)版.zip; Maven坐标:...
赠送源代码:hbase-hadoop2-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-1.4.3.pom; 包含翻译后的API文档:hbase-hadoop2-compat-1.4.3-javadoc-API文档-中文(简体)版.zip; Maven...
赠送jar包:flink-hbase_2.11-1.10.0.jar; 赠送原API文档:flink-hbase_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-hbase_2.11-1.10.0-sources.jar; 赠送Maven依赖信息文件:flink-hbase_2.11-1.10.0.pom; ...
Hbase修复工具 示例情景: Q:缺失hbase.version文件 A:加上选项 -fixVersionFile 解决 Q:如果一个region即不在META表中,又不在hdfs上面,但是在regionserver的online region集合中 A:加上选项 -...
赠送源代码:hbase-metrics-api-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-metrics-api-1.4.3.pom; 包含翻译后的API文档:hbase-metrics-api-1.4.3-javadoc-API文档-中文(简体)版.zip; Maven坐标:org....
赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-1.2.12.pom; 包含翻译后的API文档:hbase-hadoop2-compat-1.2.12-javadoc-API文档-中文(简体)-英语-对照版....
3. **配置HBase**:编辑`conf/hbase-site.xml`,设置HBase的主配置,如`hbase.rootdir`(HDFS中的HBase目录)和`hbase.zookeeper.quorum`(Zookeeper集群地址)。 4. **启动与停止**:使用`start-hbase.sh`启动HBase...
《Phoenix与HBase的深度解析:基于phoenix-hbase-1.4-4.16.1-bin的探讨》 Phoenix是一种开源的SQL层,它为Apache HBase提供了高性能的关系型数据库查询能力。在大数据领域,HBase因其分布式、列式存储的特性,常被...
赠送源代码:hbase-hadoop-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop-compat-1.1.3.pom; 包含翻译后的API文档:hbase-hadoop-compat-1.1.3-javadoc-API文档-中文(简体)-英语-对照版.zip; ...