让我们从创建表开始探索hbase内部机制。假设hbase.root目录为/new,测试表名为t1。
client使用HBaseAdmin的createTable接口,过程如下
1. 建立HMasterRPC连接,并调用之,由于hmaster端创建table是异步的,所以这里是一个异步操作。如果不指定split规则,默认会创建一个空region。
getMaster().createTable(desc, splitKeys);
2. client线程全表扫描meta表,检查t1表的region是否都分配好。默认重试100次,每次失败sleep。
MetaScannerVisitor visitor = new MetaScannerVisitorBase() { @Override public boolean processRow(Result rowResult) throws IOException { HRegionInfo info = Writables.getHRegionInfoOrNull( rowResult.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); ...... //拿'server'列,如果有值,则认为分配成功 byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); // Make sure that regions are assigned to server if (value != null && value.length > 0) { hostAndPort = Bytes.toString(value); } if (!(info.isOffline() || info.isSplit()) && hostAndPort != null) { actualRegCount.incrementAndGet(); } return true; } }; MetaScanner.metaScan(conf, visitor, desc.getName());
来看HMaster的create table RCP接口
1.构造CreateTableHandler
1.1 等待META表就位,如果就位,则获取META表第一个region的location,并建立RPC连接
public ServerName waitForMeta(long timeout) throws InterruptedException, IOException, NotAllMetaRegionsOnlineException { long stop = System.currentTimeMillis() + timeout; long waitTime = Math.min(50, timeout); synchronized (metaAvailable) { while(!stopped && (timeout == 0 || System.currentTimeMillis() < stop)) { if (getMetaServerConnection() != null) { return metaLocation; } // perhaps -ROOT- region isn't available, let us wait a bit and retry. metaAvailable.wait(waitTime); } if (getMetaServerConnection() == null) { throw new NotAllMetaRegionsOnlineException("Timed out (" + timeout + "ms)"); } return metaLocation; } }
1.2 判断t1表是否已存在
1.3 创建并设置t1表在zk中的节点状态为‘enabling’,节点路径/hbase/table/t1
private void setTableState(final String tableName, final TableState state) throws KeeperException { String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName); if (ZKUtil.checkExists(this.watcher, znode) == -1) { ZKUtil.createAndFailSilent(this.watcher, znode); } synchronized (this.cache) { ZKUtil.setData(this.watcher, znode, Bytes.toBytes(state.toString())); this.cache.put(tableName, state); } }
2.异步提交CreateTableHandler
this.executorService.submit(new CreateTableHandler(this, this.fileSystemManager, this.serverManager, hTableDescriptor, conf, newRegions, catalogTracker, assignmentManager));
3.CreateTableHandler运行
3.1 将table的元信息写入HDFS下的.tableinfo文件中,文件目录/new/t1/.tableinfo.0000000001。
private static Path writeTableDescriptor(final FileSystem fs, final HTableDescriptor hTableDescriptor, final Path tableDir, final FileStatus status) throws IOException { // Get temporary dir into which we'll first write a file to avoid // half-written file phenomeon. //先写tmp目录 Path tmpTableDir = new Path(tableDir, ".tmp"); //顺序号,从0开始 int currentSequenceid = status == null? 0: getTableInfoSequenceid(status.getPath()); int sequenceid = currentSequenceid; // Put arbitrary upperbound on how often we retry int retries = 10; int retrymax = currentSequenceid + retries; Path tableInfoPath = null; do { sequenceid += 1; //HDFS文件名,类是.tableinfo.0000000001 Path p = getTableInfoFileName(tmpTableDir, sequenceid); if (fs.exists(p)) { LOG.debug(p + " exists; retrying up to " + retries + " times"); continue; } try { //写内容 writeHTD(fs, p, hTableDescriptor); tableInfoPath = getTableInfoFileName(tableDir, sequenceid); //重命名成最终文件 if (!fs.rename(p, tableInfoPath)) { throw new IOException("Failed rename of " + p + " to " + tableInfoPath); } } ....... break; } while (sequenceid < retrymax); return tableInfoPath; }
3.2 创建region
HRegion region = HRegion.createHRegion(newRegion, this.fileSystemManager.getRootDir(), this.conf, this.hTableDescriptor, null, false, true);
3.3 META表新增记录,写入regioninfo列信息
private static Put makePutFromRegionInfo(HRegionInfo regionInfo) throws IOException { Put put = new Put(regionInfo.getRegionName()); put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(regionInfo)); return put; }
3.4 close region
3.5 从zk获取活着的region server
//从/hbase/rs下获取并过滤掉那些dead的机器 List<ServerName> servers = serverManager.getOnlineServersList(); // Remove the deadNotExpired servers from the server list. assignmentManager.removeDeadNotExpiredServers(servers);
3.6 region分配,默认随机均匀分配,使用多线程批量分配,业务线程等待直到所有region都分配成功,详细的分配过程将在下一篇介绍
this.assignmentManager.assignUserRegions(Arrays.asList(newRegions), servers);
3.7 设置t1表在zk中的节点状态为‘enabled’,节点路径/hbase/table/t1
小节
create table主要涉及,table元数据写入,region分配,zk状态信息修改,meta表修改和检查。
相关推荐
了解这些基本概念和操作后,我们可以通过阅读给定的“HBase Create Table.html”文件获取更具体的实现细节和示例。这个文件可能会包含如何使用HBase Shell或Java API创建表的步骤,以及一些创建表时的注意事项和最佳...
hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName)); } secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName); } catch (Exception ...
CREATE TABLE hbase_hfile_table(key int, name string, age String) STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat' ...
Table hbaseTable = hbaseConn.getTable(TableName.valueOf("user_data")); while (rs.next()) { int id = rs.getInt("id"); String name = rs.getString("name"); String email = rs.getString("email"); ...
4. `--hbase-create-table`:如果HBase中的目标表和列族不存在,加上此参数, Sqoop会在运行时自动创建。若未指定,任务会因找不到表而失败,因此在导入前需要确保HBase表和列族已创建。 在执行导入步骤前,先创建...
import org.apache.hadoop.hbase.client.Table import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.client.Get import org.apache.hadoop.hbase.util.Bytes ``` 2. 配置HBase连接: ...
CREATE TABLE hive_hbase_table(key int, value string, name string) STORED BY 'org.apache.hadoop.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val,cf2:val") ...
"zookeeper_server_ip"应替换为实际的Zookeeper服务器IP地址,"your_table_name"应替换为你要操作的HBase表名,"row_key"是你想要读取的行键。 接下来,我们要将读取到的HBase数据写入HDFS。Hadoop提供了`...
首先需要创建一个HBaseConfiguration对象,然后使用HBaseAdmin类的createTable方法创建表。 ```java public static void create(String tablename, String columnFamily) throws Exception { HBaseAdmin admin = ...
table = connection.create_table('my_table', {'cf': dict()}) ``` 这里 `'cf'` 是列族(Column Family)的名称,你可以根据需要添加多个列族。 4. **写入数据**:向表中插入数据: ```python row_key = 'row...
第二步,结合索引定义和 DataTable Region 的 StartKey 信息,调用 HBaseAdmin 的 createTable(final HTableDescriptor desc, byte[][] splitKeys) 方法创建索引表。 知识点三:IndexTable RowKey 的设计 Index...
hbase(main):001:0> create 'my_table', 'cf1' hbase(main):002:0> put 'my_table', 'row1', 'cf1:col1', 'value1' hbase(main):003:0> get 'my_table', 'row1' ``` 9. **监控与维护**:可以使用HBase提供的...
1. 表(Table):HBase中的表是由行和列族组成的二维表格,类似于关系型数据库的表,但结构更为灵活。 2. 行(Row):行由行键(Row Key)唯一标识,是数据的访问入口。行键是字节序列,可以自定义排序规则。 3. ...
使用`ConnectionFactory.createConnection(Configuration config)`创建与HBase的连接。配置对象`Configuration`需要设置HBase的地址、Zookeeper节点等信息。 2. **打开表**: 获取表实例:`Table table = ...
conn.create_table(table_name, families) # 创建表,families是列族名 def put_data(conn, table_name, row_key, column_family, column, value): table = conn.table(table_name) table.put(row_key, {(column...
在本文中,作者使用了HBaseConfiguration.create()方法来创建配置对象,并设置了"hbase.zookeeper.quorum"和"hbase.zookeeper.property.clientPort"两个参数。 知识点2:HTablePool的使用 HTablePool是HBase中的一...
admin.createTable(tableDesc); } ``` 读取本地文件是通过Java的`FileInputStream`和`BufferedReader`实现的。这里假设文件是文本文件,每行代表一个记录: ```java File file = new File("path_to_your_file.txt...
1. **连接HBase**:在Java代码中,我们使用`HBaseConfiguration.create()`来创建一个配置对象,然后可以设置各种配置参数,如Zookeeper的地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)...
这通常通过`HBaseConfiguration.create()`方法创建一个配置对象,然后设置相关配置,如Zookeeper地址(`HBASE_ZOOKEEPER_QUORUM`),端口(`HBASE_ZOOKEEPER_CLIENT_PORT`)等。接着,使用`ConnectionFactory.create...
`HBaseConfiguration.create()`创建配置对象,设置HBase的根目录,然后通过`ConnectionFactory.createConnection(configuration)`建立到HBase的连接。之后,调用`Admin`接口的`listTables()`方法获取表描述符数组,...