`
luweimstr
  • 浏览: 19188 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hbase客户端.准备知识.excutorservice

阅读更多

在hbase客户端htable中批处理操作是通过ExecutorService实现的。ExecutorService类似于线程池,用户提交的put,delete等操作都被响应地创建了线程在ExecutorService中执行,并对各个操作的响应进行返回或异常处理。本文对ExecutorService进行初步介绍,作为hbase客户端代码学习的准备知识。

    通常我们会创建一个ExecutorService对象并向其中丢一些线程,然后就任由之执行。例如下面的例子1。

package java.ExecutorServiceStudy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

class Reader implements Runnable {

	@Override
	public void run() {
		System.err.println(Thread.currentThread().getId());

	}

}

public class ExecutorServiceStudy {
	public static void main(String args[]) {
		int readThreads = 10;

		Reader[] readers = new Reader[readThreads];
		
		ExecutorService readPool = Executors.newFixedThreadPool(
				readThreads,
				new ThreadFactoryBuilder()
						.setNameFormat("ExecutorServiceStudy " + 1)
						.setDaemon(true).build());
		
		for (int i = 0; i < readThreads; ++i) {
			Reader reader = new Reader();
			readers[i] = reader;
			readPool.execute(reader);
		}
	}
}

 

然而,在hbase中我们如果只是将put,delete操作丢到线程池中任他执行是不够的。所以我们常常需要对各个线程的执行情况或者结果做处理。我们还可以向ExecutorService中丢(submit)一些Callable的对象,并且在submit的时候将其返回值Future记录先来,将来再处理。也就是我们要把握住各个线程的“未来”,而不是任由其发展。例如下面的例子2.

package java.ExecutorServiceStudy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * a thread pool where we can throw threads. Anyway, how can we get response/
 * results from each thread?
 * 
 * @author wlu 2012-08-09
 * 
 */
class MyCallable implements Callable<String> {
	String id = "";

	public MyCallable(String s) {
		id = s;
	}

	@Override
	public String call() throws Exception {
		int r = new Random().nextInt();
		r = Math.abs(r) % 10;
		for (int i = 0; i < 10000; i++) {
			for (int j = 0; j < r * 10000; j++)
				;
		}
		return "loop " + r + " X 10^8 times @ id = " + id;
	}

}

public class ExecutorServiceStudy2 {
	static int readThreads = 10;

	static ExecutorService pool = Executors.newFixedThreadPool(
			readThreads,
			new ThreadFactoryBuilder()
					.setNameFormat("IPC Reader %d on port " + 1)
					.setDaemon(true).build());

	static Map<String, Future> futures = new TreeMap<String, Future>();

	public static void main(String args[]) throws InterruptedException,
			ExecutionException {
		for (int i = 0; i < 10; i++) {
			String id = i + "";
			futures.put(id, pool.submit(new MyCallable(id)));
		}

		// tmp the keys in a list
		List<String> keys = new ArrayList<String>();

		for (String s : futures.keySet()) {
			keys.add(s);
		}

		int idx = 0;

		// poll the list and deal with result of finished thread
		while (!keys.isEmpty()) {
			Object ss = null;
			try {
				ss = futures.get(keys.get(idx)).get(5, TimeUnit.MILLISECONDS);
			} catch (TimeoutException e) {
				ss = null;
			}
			// not finished yet
			if (ss == null) {
				idx = (idx + 1) % keys.size();
				continue;
			}
			// finished, remove from the list
			keys.remove(idx);
			if (idx >= keys.size()) {
				idx = 0;
			}
			System.err.println(ss);
		}
	}
}

 

 可以发现,我们将每个线程打上标签,并把各自的标签和Future绑定在一起(存放在Map中)。在Future的get()函数执行时,会阻塞直到线程执行完成。在上面的例子里,我们对各个Future进行轮训。首先将它们存放在一个List中,轮训时对于没有执行完成的线程暂且跳过,对于已完成的线程则处理线程执行结果,并把它的Future从List中删除掉。get函数的参数是阻塞超时时间,也就是说如果在超时时间之内没有完成,则先跳过。

 

  这样我们就掌握了用ExecutorService执行多线程,并异步地处理各个线程的返回结果。上面例2的执行结果形如:

 

loop 0 X 10^8 times @ id = 2
loop 0 X 10^8 times @ id = 3
loop 3 X 10^8 times @ id = 8
loop 4 X 10^8 times @ id = 9
loop 5 X 10^8 times @ id = 0
loop 6 X 10^8 times @ id = 6
loop 9 X 10^8 times @ id = 4
loop 7 X 10^8 times @ id = 7
loop 7 X 10^8 times @ id = 1
loop 8 X 10^8 times @ id = 5

 

id小的线程先与id大的线程执行,从结果可以发现,线程执行时间开销决定了线程结果处理顺序。

0
1
分享到:
评论

相关推荐

    Pure-Go HBase 客户端.zip

    支持的版本HBase &gt;= 1.0安装go get github.com/tsuna/gohbase示例用法创建客户端client := gohbase.NewClient("localhost")插入单元格// Values maps a ColumnFamily -&gt; Qualifiers -&gt; Values.values := map[string]...

    hbase客户端连接工具winutils-2.2.0.zip

    5. **配置HBase配置文件**:创建一个名为`hbase-site.xml`的配置文件,包含HBase集群的地址(如`hbase.zookeeper.quorum`)和其他必要的参数,例如`hbase.master`和`hbase.rootdir`。 6. **编写Java代码**:使用...

    Hbase可视化客户端.rar

    HBase的可视化客户端是与HBase交互的一种工具,它提供了图形化的用户界面,使得用户可以更直观、便捷地进行数据的插入、查询、删除等操作,而无需编写复杂的命令行指令或编程代码。这样的客户端对于非开发人员或者...

    HBase2.x之RIT问题解决.docx

    HBase 2.x之RIT问题解决 HBase 2.x中的Region-In-Transition(RIT)机制是一种Region状态变迁机制,例如merge、split、assign、unassign等操作。在RIT过程中,可能会出现异常情况,从而导致Region的状态一直保持在...

    hbase-2.0.0.3.0.0.0-1634-bin.tar.gz

    1. 环境准备:确保你已安装Java开发环境(JDK)、Maven和Git。Ambari 2.7.x需要与HBase版本兼容,因此需确保所有组件的版本匹配。 2. 源码获取:从Apache官方网站或者通过Git克隆源码仓库,获取HBase 2.0.0.3.0.0.0...

    Hbase1.x可视化客户端工具

    在HBase 1.x版本中,常用的可视化客户端工具有Apache Ambari的HBase组件、HBase Shell增强版如Beeswax或HBase Console、以及第三方工具如HBAdmin、HBase Manager等。这些工具通常会提供详细的使用说明,帮助用户快速...

    基于Spring Boot和HBase的HBase客户端应用.zip

    # 基于Spring Boot和HBase的HBase客户端应用 ## 项目概述 本项目是一个基于Spring Boot框架的HBase客户端应用,旨在提供与HBase数据库的高效交互。通过集成Spring Boot的便捷性和HBase的高性能存储能力,该项目...

    hbase-site.xml.doc

    * hbase.master.logcleaner.plugins:org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner, org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner,这个参数指定了预写日志的清理插件。...

    nosql-实验三HBase管理工具和客户端.docx

    HBase 管理工具和客户端详解 HBase 是一种基于列式存储的 NoSQL 数据库,具有高性能、可扩展性强和灵活的 schema 设计等特点。在 HBase 中,有多种管理工具和客户端可以帮助用户更好地管理和操作 HBase 数据库。 ...

    hbase jar包.zip

    首先,hbase-client-2.2.4.jar是HBase客户端的核心库,它提供了与HBase服务器交互的API,包括数据的读写、扫描、行键操作等。这个版本的HBase客户端已经对HBase 2.2.4进行了优化,确保了与服务端的兼容性和性能。 ...

    hbase用于查询客户端工具

    在HBase环境中,有多种客户端工具可供开发人员和管理员使用来进行查询和其他操作。以下是一些主要的HBase查询客户端工具及其详细说明: 1. **HBase Shell**:这是HBase自带的一个命令行接口,用户可以通过Java REPL...

    第八章(Hadoop大数据处理实战)HBase实战.pdf

    第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase实战.pdf第八章(Hadoop大数据处理实战)HBase...

    hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程

    hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程 hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程 hadoop-2.7.5,hbase1.2.6.zookeeper.3.3.6完全分布式安装教程

    hbase资料_hbase-default.xml.zip

    6. **客户端配置**:`hbase.client.scanner.caching`设置每次从RegionServer获取的行数,影响客户端扫描性能。`hbase.client.retries.number`是客户端重试次数,决定在网络不稳定或服务器繁忙时的容错能力。 7. **...

    高性能HBase客户端AsynchronousHBase.zip

    asynchbase(Asynchronous HBase)是完全异步,非阻塞的,线程安全的,高性能的 HBase 客户端。asynchbase 是 Java 库使用 HBase 的替代品,要求一个完全异步,非阻塞,线程安全,高性能的 HBase API。这个 HBase ...

    hbase-0.90.5.tar.gz与hadoop0.20.2版本匹配

    HBase是Apache软件基金会开发的一个开源分布式数据库,它是基于Google的Bigtable模型设计的,用于存储大规模结构化数据。HBase构建在Hadoop之上,两者都是Apache Hadoop生态系统的重要组成部分。Hadoop是一个分布式...

    hbase-0.94.27.tar.gz

    在HBase 0.94.27中,我们可以探讨以下关键知识点: 1. **分布式架构**:HBase采用Master-Slave架构,由一个HMaster主节点负责区域服务器(RegionServer)的管理、故障检测和负载均衡,多个RegionServer存储实际的...

    hbase培训.ppt

    hbase培训.ppt

    HBaseClient:HBase客户端数据管理软件

    HBase客户端数据管理软件 概要说明 类似PL/SQL,是一个HBase数据库的客户端数据管理软件。是免费开源的软件。 基于XJava,使用xml配置文件绘制可视化界面。 可视化界面操作 表 表的定义、编辑、删除; 数据 数据的...

Global site tag (gtag.js) - Google Analytics