`

源码解读--(3)hbase-examples MultiThreadedClientExample

阅读更多
源码解读--(1)hbase客户端源代码 http://aperise.iteye.com/blog/2372350
源码解读--(2)hbase-examples BufferedMutator Example http://aperise.iteye.com/blog/2372505
源码解读--(3)hbase-examples MultiThreadedClientExample http://aperise.iteye.com/blog/2372534

1.轻量级的table,重量级的connection

    hbase-examples里面提供的另外一个例子MultiThreadedClientExample,讲解了另外一种使用hbase客户端的例子,在这个例子里面table是一个轻量级的对象,在线程启动时创建退出时销毁,而table后面的connection从未关闭过,connection是重量级的对象,一直维持着和zookeeper的链接、异步操作和其他状态,我们可以从中学习到另外一种多线程操作hbase客户端的例子

 

2.MultiThreadedClientExample

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * 本例用于展示在多线程中操作hbase客户端
 * 本例中table是一个轻量级的对象,在线程启动时创建退出时销毁,而table后面的connection从未关闭过
 * 本例中connection是重量级的对象,一直维持着和zookeeper的链接、异步操作和其他状态
 * 本例中模拟向hbase服务端提交500000次请求(其中30% 批量写,20%单条写,50%用于scans )
 *
 */
public class MultiThreadedClientExample extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(MultiThreadedClientExample.class);
    private static final int DEFAULT_NUM_OPERATIONS = 500000;

    //默认测试的是hbase的数据表test列簇d
    private static final byte[] FAMILY = Bytes.toBytes("d");
    private static final byte[] QUAL = Bytes.toBytes("test");

    private final ExecutorService internalPool;//线程池
    private final int threads;//线程池大小

    public MultiThreadedClientExample() throws IOException {
        // Runtime.getRuntime().availableProcessors() 为当前机器CPU核数,这里取CPU核数* 4
        this.threads = Runtime.getRuntime().availableProcessors() * 4;

        // 这里调用google的guava-12.0.0.1.jar的ThreadFactoryBuilder,默认创建的是Executors.defaultThreadFactory(),创建的是后台线程工厂类,规范化了线程的名称
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("internal-pol-%d").build();
        //初始化线程池
        this.internalPool = Executors.newFixedThreadPool(threads, threadFactory);
    }

    @Override
    public int run(String[] args) throws Exception {
    	//参数个数只能为2个,第一个为表名,第二个为操作的次数
        if (args.length < 1 || args.length > 2) {
            System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]");
            return -1;
        }

        final TableName tableName = TableName.valueOf(args[0]);//如果传入了表名,就使用传入的hbase表名
        int numOperations = DEFAULT_NUM_OPERATIONS;
        if (args.length == 2) {
            numOperations = Integer.parseInt(args[1]);//如果传入了操作次数,就使用传入的操作次数
        }

        //Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
        //这里ForkJoinPool相继传入org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation、org.apache.hadoop.hbase.client.HTable、org.apache.hadoop.hbase.client.AsyncProcess使用
        ExecutorService service = new ForkJoinPool(threads * 2);

        // 为写操作创建一个单独的链接writeConnection
        final Connection writeConnection = ConnectionFactory.createConnection(getConf(), service);
        // 为读操作创建一个单独的链接readConnection
        final Connection readConnection = ConnectionFactory.createConnection(getConf(), service);

        // hbase 表tableName的region信息加载到cache
        // 这个操作在region个数超过250000个时不要操作
        warmUpConnectionCache(readConnection, tableName);
        warmUpConnectionCache(writeConnection, tableName);

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(numOperations);
        for (int i = 0; i < numOperations; i++) {
        	//生成线程安全的随机浮点数r
            double r = ThreadLocalRandom.current().nextDouble();
            Future<Boolean> f;
            
            if (r < .30) {//30% 批量写
                f = internalPool.submit(new WriteExampleCallable(writeConnection, tableName));
            } else if (r < .50) {//20%单条写
                f = internalPool.submit(new SingleWriteExampleCallable(writeConnection, tableName));
            } else {//50%用于scans
                f = internalPool.submit(new ReadExampleCallable(writeConnection, tableName));
            }
            futures.add(f);
        }

        // 等待每个操作完成,如果没完成,等待10分钟
        for (Future<Boolean> f : futures) {
            f.get(10, TimeUnit.MINUTES);
        }

        // 关闭线程池internalPool和service
        internalPool.shutdownNow();
        service.shutdownNow();
        return 0;
    }
    
    // hbase 表tableName的region信息加载到cache
    // 这个操作在region个数超过250000个时不要操作
    private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
        try (RegionLocator locator = connection.getRegionLocator(tn)) {
            LOG.info("Warmed up region location cache for " + tn + " got " + locator.getAllRegionLocations().size());
        }
    }

    /**
     * 30% 批量写任务
     */
    public static class WriteExampleCallable implements Callable<Boolean> {
        private final Connection connection;
        private final TableName tableName;

        public WriteExampleCallable(Connection connection, TableName tableName) {
            this.connection = connection;
            this.tableName = tableName;
        }

        @Override
        public Boolean call() throws Exception {
            // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
        	//这里利用jdk1.7里的新特性try(必须实现java.io.Closeable的对象){}catch (Exception e) {}  
        	//相当于调用了finally功能,调用(必须实现java.io.Closeable的对象)的close()方法,也即会调用table.close()
            try (Table t = connection.getTable(tableName)) {
                byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
                int rows = 30;

                // Array to put the batch
                ArrayList<Put> puts = new ArrayList<>(rows);
                for (int i = 0; i < 30; i++) {
                    byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
                    Put p = new Put(rk);
                    p.addImmutable(FAMILY, QUAL, value);
                    puts.add(p);
                }

                // 批量提交到hbase服务端
                t.put(puts);
            }
            return true;
        }
    }

    /**
     * 20%单条写任务
     */
    public static class SingleWriteExampleCallable implements Callable<Boolean> {
        private final Connection connection;
        private final TableName tableName;

        public SingleWriteExampleCallable(Connection connection, TableName tableName) {
            this.connection = connection;
            this.tableName = tableName;
        }

        @Override
        public Boolean call() throws Exception {
            // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
        	//这里利用jdk1.7里的新特性try(必须实现java.io.Closeable的对象){}catch (Exception e) {}  
        	//相当于调用了finally功能,调用(必须实现java.io.Closeable的对象)的close()方法,也即会调用table.close()
            try (Table t = connection.getTable(tableName)) {
                byte[] value = Bytes.toBytes(Double.toString(ThreadLocalRandom.current().nextDouble()));
                byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
                Put p = new Put(rk);
                p.addImmutable(FAMILY, QUAL, value);
                t.put(p);
            }
            return true;
        }
    }

    /**
     * 50%用于scans
     */
    public static class ReadExampleCallable implements Callable<Boolean> {
        private final Connection connection;
        private final TableName tableName;

        public ReadExampleCallable(Connection connection, TableName tableName) {
            this.connection = connection;
            this.tableName = tableName;
        }

        @Override
        public Boolean call() throws Exception {
            // total length in bytes of all read rows.
            int result = 0;

            // Number of rows the scan will read before being considered done.
            int toRead = 100;
            try (Table t = connection.getTable(tableName)) {
            	//要朝找的rowkey的起始值
                byte[] rk = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
                Scan s = new Scan(rk);

                //设置scan的filter为KeyOnlyFilter,意思是scan比较的时候只着重比较rowkey
                s.setFilter(new KeyOnlyFilter());

                //每次只取20条数据
                s.setCaching(20);

                //设置hbase不适用缓存,缓存是为了下次取这些数据更快,就把之前的数据放置到hbase服务端的blockcache
                s.setCacheBlocks(false);

                // https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
            	//这里利用jdk1.7里的新特性try(必须实现java.io.Closeable的对象){}catch (Exception e) {}  
            	//相当于调用了finally功能,调用(必须实现java.io.Closeable的对象)的close()方法,也即会调用ResultScanner.close()
                try (ResultScanner rs = t.getScanner(s)) {
                    // 遍历hbase的行
                    for (Result r : rs) {
                        result += r.getRow().length;
                        toRead -= 1;

                        // 只取100条数据,达到100条就退出
                        if (toRead <= 0) {
                            break;
                        }
                    }
                }
            }
            return result > 0;
        }
    }

    public static void main(String[] args) throws Exception {
    	//调用工具类ToolRunner执行实现了接口Tool的对象MultiThreadedClientExample的run方法,同时会把String[] args传入MultiThreadedClientExample的run方法  
        ToolRunner.run(new MultiThreadedClientExample(), args);
    }
}

 

分享到:
评论

相关推荐

    phoenix-core-4.7.0-HBase-1.1-API文档-中文版.zip

    赠送jar包:phoenix-core-4.7.0-HBase-1.1.jar; 赠送原API文档:phoenix-core-4.7.0-HBase-1.1-javadoc.jar; 赠送源代码:phoenix-core-4.7.0-HBase-1.1-sources.jar; 赠送Maven依赖信息文件:phoenix-core-4.7.0...

    apache-phoenix-4.14.3-HBase-1.3-bin.tar.gz

    这个"apache-phoenix-4.14.3-HBase-1.3-bin.tar.gz"文件是Phoenix的特定版本,针对HBase 1.3构建的二进制发行版。 1. **Apache Phoenix架构**:Phoenix主要由四部分组成:SQL解析器、元数据存储、优化器和执行器。...

    hbase-meta-repair-hbase-2.0.2.jar

    HBase 元数据修复工具包。 ①修改 jar 包中的application.properties,重点是 zookeeper.address、zookeeper.nodeParent、hdfs....③开始修复 `java -jar -Drepair.tableName=表名 hbase-meta-repair-hbase-2.0.2.jar`

    spring-boot-starter-hbase自定义的spring-boot的hbasestarter

    《Spring Boot Starter HBase:构建高效HBase操作的利器》 在Java开发中,Spring Boot以其简洁、高效的特性,已经成为Web开发框架的首选之一。它通过自动配置和启动器(starter)来简化应用的搭建与运行。而当我们...

    apache-phoenix-5.0.0-HBase-2.0-bin.tar.gz

    Apache Phoenix是构建在HBase之上的关系型数据库层,作为内嵌的客户端JDBC驱动用以对HBase中的数据进行低延迟访问。Apache Phoenix会将用户编写的sql查询编译为一系列的scan操作,最终产生通用的JDBC结果集返回给...

    apache-phoenix-4.14.0-HBase-1.2-src.tar.gz

    在"apache-phoenix-4.14.0-HBase-1.2-src.tar.gz"这个压缩包中,我们主要会发现以下几个关键的知识点: 1. **Phoenix架构**:Phoenix采用了分层架构,包括客户端驱动、服务器端元数据服务、以及SQL编译器和执行器。...

    apache-phoenix-4.8.1-HBase-1.2-bin.tar.gz

    在标题"apache-phoenix-4.8.1-HBase-1.2-bin.tar.gz"中,我们可以看到这是Apache Phoenix的4.8.1版本,它兼容HBase的1.2版本。这个压缩包是二进制发行版,通常包含了运行Phoenix所需的全部文件,包括JAR包、配置文件...

    apache-kylin-3.0.2-bin-hbase1x.tar.gz

    3. **HBase集成**:此版本是针对HBase 1.x版本的,这意味着Kylin能够与HBase紧密集成,利用HBase的分布式存储和实时访问特性,为大数据提供强大的底层支持。 4. **SQL兼容性**:支持更多的SQL标准,使得Kylin能更好...

    phoenix-core-4.7.0-HBase-1.1-API文档-中英对照版.zip

    赠送jar包:phoenix-core-4.7.0-HBase-1.1.jar; 赠送原API文档:phoenix-core-4.7.0-HBase-1.1-javadoc.jar; 赠送源代码:phoenix-core-4.7.0-HBase-1.1-sources.jar; 赠送Maven依赖信息文件:phoenix-core-4.7.0...

    apache-phoenix-4.8.1-HBase-0.98-bin.tar

    在您提供的压缩包 "apache-phoenix-4.8.1-HBase-0.98-bin.tar" 中,包含了 Apache Phoenix 的特定版本,即 4.8.1,针对 HBase 0.98 版本。 **Apache Phoenix 知识点:** 1. **SQL on HBase**: Phoenix 使得非关系...

    apache-atlas-2.0.0-hbase-hook.tar.gz

    3. **配置HBase**:在HBase的配置文件(如hbase-site.xml)中,也需要进行相应的配置,比如启用HBase的 Coprocessor,这使得Apache Atlas能够通过HBase的API获取和更新元数据。 4. **启动和测试**:在完成上述配置...

    spring-boot-starter-hbase.zip

    3. **模板操作**:通过Spring的Template机制,提供了易于使用的API来执行常见的HBase操作,如增删改查等,简化了对HBase的操作。 4. **数据映射**:支持将Java对象直接映射到HBase的表和行键,使得可以使用面向对象...

    apache-atlas-2.1.0-hbase-hook.tar.gz

    Apache Atlas 2.1.0 版本引入了对HBase的集成支持,也就是我们这里提到的 "apache-atlas-2.1.0-hbase-hook",这是一个针对HBase数据库的Hook组件。这个tar.gz压缩包是为在CDH6.3.1(Cloudera Data Hub 6.3.1)平台上...

    小牛学堂-大数据24期-04-Hadoop Hive Hbase Flume Sqoop-12天适合初学者

    小牛学堂-大数据24期-04-Hadoop Hive Hbase Flume Sqoop-12天适合初学者.txt

    apache-phoenix-4.10.0-HBase-1.1-bin.tar.gz

    这个名为"apache-phoenix-4.10.0-HBase-1.1-bin.tar.gz"的压缩包包含了Phoenix 4.10.0版本,专为运行在HBase 1.1版本上的系统设计。通过使用Phoenix,开发人员可以利用SQL语法来操作存储在HBase中的数据,从而简化了...

    apache-phoenix-4.8.1-HBase-0.98-src.tar.gz

    "apache-phoenix-4.8.1-HBase-0.98-src.tar.gz" 是Phoenix项目的4.8.1版本,针对HBase 0.98的源代码包。 在这款源码包中,开发者和研究人员可以深入理解Phoenix的工作原理,以及如何自定义和优化其功能。以下是这个...

    spring-boot-starter-hbase:Spring Boot Starter HBase的

    自定义的spring-boot的hbase starter,为hbase的query和更新等操作提供简易的api并集成spring-boot的auto configuration 版本 本项目版本 hbase版本 1.0.0 hbase1.1.2 打包 修改相关的maven私服地址 gradle clean ...

    phoenix-core-4.8.1-HBase-1.2.jar

    hbase上面二级索引组件 phoenix jar包 对应版本 hbase1.2 phoenix4.8

    phoenix-5.0.0-HBase-2.0-client

    "phoenix-5.0.0-HBase-2.0-client" 是一个针对Apache HBase数据库的Phoenix客户端库,主要用于通过SQL查询语句与HBase进行交互。这个版本的Phoenix客户端是为HBase 2.0版本设计和优化的,确保了与该版本HBase的兼容...

Global site tag (gtag.js) - Google Analytics