一、目的
了解hbase的都知道,由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想要实现关系型数据库那样可以随意组合的多条件查询、查询总记录数、分页等就比较麻烦了。想要实现这样的功能,我们可以采用两种方法:
- 使用hbase提供的filter,
- 自己实现二级索引,通过二级索引查询多符合条件的行健,然后再查询hbase。
第一种方法不多说了,使用起来很方便,但是局限性也很大,hbase的filter是直接扫记录的,如果数据范围很大,会导致查询速度很慢。所以如果能先使用行健把记录缩小到一个较小范围,那么就比较适合,否则就不适用了。此外该方法不能解决获取总数的为。
第二种是适用范围就比较广泛了,不过根据实现二级索引的方式解决的问题也不同。这里我们选择solr主要是因为solr可以很轻松实现各种查询(本来就是全文检索引擎)。
二、实现方法
其实hbase结合solr实现方法还是比较简单的,重点在于一些实现细节上。将hbase记录写入solr的关键就在于hbase提供的 Coprocessor,Coprocessor提供了两个实现:endpoint和observer,endpoint相当于关系型数据库的存储过程, 而observer则相当于触发器。说到这相信大家应该就明白了,我们要利用的就是observer。observer允许我们在记录put前后做一些处 理,而我们就是通过postPut将记录同步写入solr(关于Coprocessor具体内容请自行查资料)。
而写入solr这块就比较简单了,如果是单机就使用ConcurrentUpdateSolrServer,如果是集群就是用 CloudSolrServer。不过这里需要注意的是由于CloudSolrServer不像ConcurrentUpdateSolrServer那 样内置缓存,默认情况下hbase没写一条数据就会向solr提交一次,这样速度会非常慢(很可能hbase写完很久solr这边还在提交),因此要自己 实现一个缓存池,根据hbase的写入速度动态调整,并批量向solr提交。
三、实现代码
实现方法弄清处置后代码就很容易写了。首先看下Coprocessor的代码:
import com.uboxol.model.VmMoney;
import com.uboxol.solr.SolrWriter;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* User: guojing
* Date: 14-10-24
* Time: 上午11:08
* To change this template use File | Settings | File Templates.
*/
public class SolrIndexCoprocessorObserver extends BaseRegionObserver {
private static Logger log = Logger.getLogger(SolrIndexCoprocessorObserver.class);
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
String rowKey = Bytes.toString(put.getRow());
try {
Cell cellInnerCode = put.get(Bytes.toBytes("data"), Bytes.toBytes("inner_code")).get(0);
String innerCode = new String(CellUtil.cloneValue(cellInnerCode));
Cell cellNodeId = put.get(Bytes.toBytes("data"), Bytes.toBytes("node_id")).get(0);
String nodeId = new String(CellUtil.cloneValue(cellNodeId));
Cell cellPayType = put.get(Bytes.toBytes("data"), Bytes.toBytes("pay_type")).get(0);
String payType = new String(CellUtil.cloneValue(cellPayType));
Cell cellCts = put.get(Bytes.toBytes("data"), Bytes.toBytes("cts")).get(0);
String cts = new String(CellUtil.cloneValue(cellCts));
Cell cellTraSeq = put.get(Bytes.toBytes("data"), Bytes.toBytes("tra_seq")).get(0);
String traSeq = new String(CellUtil.cloneValue(cellTraSeq));
cts=cts.replace("-","");
cts=cts.replace(" ","");
cts=cts.replace(":","");
VmMoney vm = new VmMoney();
vm.setCts(cts);
vm.setId(new Integer(id));
vm.setInnerCode(innerCode);
vm.setNodeId(new Integer(nodeId));
vm.setPayType(new Integer(payType));
vm.setRowKey(rowKey);
vm.setTraSeq(traSeq);
SolrWriter so = new SolrWriter();
so.addDocToCache(vm);
} catch (Exception ex){
log.info("write "+rowKey+" to solr fail:"+ex.getMessage());
ex.printStackTrace();
}
}
@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
String rowKey = Bytes.toString(delete.getRow());
try {
SolrWriter so = new SolrWriter();
so.deleteDoc(rowKey);
} catch (Exception ex){
log.info("delete "+rowKey+" from solr fail:"+ex.getMessage());
ex.printStackTrace();
}
}
}
里边代码很简单,就是在hbase记录写入后和删除后调用SolrWriter进行处理。下边看下SolrWriter的实现:
import com.uboxol.model.VmMoney;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SolrWriter {
private static Logger log = Logger.getLogger(SolrWriter.class);
public static String urlSolr = ""; //solr地址
private static String defaultCollection = ""; //默认collection
private static int zkClientTimeOut =0 ;//zk客户端请求超时间
private static int zkConnectTimeOut =0;//zk客户端连接超时间
private static CloudSolrServer solrserver = null;
private static int maxCacheCount = 0; //缓存大小,当达到该上限时提交
private static Vector<VmMoney> cache = null; //缓存
public static Lock commitLock =new ReentrantLock(); //在添加缓存或进行提交时加锁
private static int maxCommitTime = 60; //最大提交时间,s
static {
Configuration conf = HBaseConfiguration.create();
urlSolr = conf.get("hbase.solr.zklist", "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181");
defaultCollection = conf.get("hbase.solr.collection","collection1");
zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 10000);
maxCommitTime = conf.getInt("hbase.solr.maxCommitTime", 60*5);
log.info("solr init param"+urlSolr+" "+defaultCollection+" "+zkClientTimeOut+" "+zkConnectTimeOut+" "+maxCacheCount+" "+maxCommitTime);
try {
cache=new Vector<VmMoney>(maxCacheCount);
solrserver = new CloudSolrServer(urlSolr);
solrserver.setDefaultCollection(defaultCollection);
solrserver.setZkClientTimeout(zkClientTimeOut);
solrserver.setZkConnectTimeout(zkConnectTimeOut);
//启动定时任务,第一次延迟10执行,之后每隔指定时间执行一次
Timer timer=new Timer();
timer.schedule(new CommitTimer(),10*1000,maxCommitTime*1000);
} catch (Exception ex){
ex.printStackTrace();
}
}
/**
* 批量提交
*/
public void inputDoc(List<VmMoney> vmMoneyList) throws IOException, SolrServerException {
if (vmMoneyList == null || vmMoneyList.size() == 0) {
return;
}
List<SolrInputDocument> doclist= new ArrayList<SolrInputDocument>(vmMoneyList.size());
for (VmMoney vm : vmMoneyList) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", vm.getId());
doc.addField("node_id", vm.getNodeId());
doc.addField("inner_code", vm.getInnerCode());
doc.addField("pay_type", vm.getPayType());
doc.addField("rowkey", vm.getRowKey());
doc.addField("cts", vm.getCts());
doc.addField("tra_seq", vm.getTraSeq());
doclist.add(doc);
}
solrserver.add(doclist);
}
/**
* 单条提交
*/
public void inputDoc(VmMoney vmMoney) throws IOException, SolrServerException {
if (vmMoney == null) {
return;
}
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", vmMoney.getId());
doc.addField("node_id", vmMoney.getNodeId());
doc.addField("inner_code", vmMoney.getInnerCode());
doc.addField("pay_type", vmMoney.getPayType());
doc.addField("rowkey", vmMoney.getRowKey());
doc.addField("cts", vmMoney.getCts());
doc.addField("tra_seq", vmMoney.getTraSeq());
solrserver.add(doc);
}
public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
if (rowkeys == null || rowkeys.size() == 0) {
return;
}
solrserver.deleteById(rowkeys);
}
public void deleteDoc(String rowkey) throws IOException, SolrServerException {
solrserver.deleteById(rowkey);
}
/**
* 添加记录到cache,如果cache达到maxCacheCount,则提交
*/
public static void addDocToCache(VmMoney vmMoney) {
commitLock.lock();
try {
cache.add(vmMoney);
log.info("cache commit maxCacheCount:"+maxCacheCount);
if (cache.size() >= maxCacheCount) {
log.info("cache commit count:"+cache.size());
new SolrWriter().inputDoc(cache);
cache.clear();
}
} catch (Exception ex) {
log.info(ex.getMessage());
} finally {
commitLock.unlock();
}
}
/**
* 提交定时器
*/
static class CommitTimer extends TimerTask {
@Override
public void run() {
commitLock.lock();
try {
if (cache.size() > 0) { //大于0则提交
log.info("timer commit count:"+cache.size());
new SolrWriter().inputDoc(cache);
cache.clear();
}
} catch (Exception ex) {
log.info(ex.getMessage());
} finally {
commitLock.unlock();
}
}
}
}
SolrWriter 的重点就在于addDocToCache方法和定时器CommitTimer,addDocToCache会在hbase每次插入数据时将记录插入缓存, 并且判断是否达到上限,如果达到则将缓存内所用数据提交到solr,此外CommitTimer 则会每隔一段时间提交一次,以保证缓存内所有数据最终写入solr。
其他一些辅助代码就不贴了,可自行到github查看: hbase-solr-coprocessor (代码仅作参考,由于业务不同不能直接运行)
四、部署
这里重点说下hbase的Coprocessor部署的一些问题。部署步骤如下:
- 将Coprocessor代码打成jar包,拷贝到所有hbase的region server上,注意jdk一定要1.6,高版本可能会导致无法加载
- 将hbase的hbase.coprocessor.abortonerror设置成true,待确定Coprocessor运行正常后在改为false。此步骤非必要,但是如果Coprocessor有问题会导致所有region无法启动
- 由于我们实现的Coprocessor是region级的,所以不需要启动,直接通过hbase shell即可加载: disable 'tablename'
alter 'tablename',METHOD => 'table_att','coprocessor'=>'jar包路径,本地使用file:///开头,hdfs上的则用hdfs:///开头|1001|参数,多个逗号隔开'
enable 'tablename'
五、总结
这次hbase+solr的部署前后花了不少时间,其实理论方面都很简单,让人感觉轻而易举,但是实际实现的过程中就会遇到不少问题,就比如写入缓存之类的,如果不去测试,就很容易被忽略。
装载:http://itindex.net/detail/51766-solr-hbase-%E7%B4%A2%E5%BC%95
相关推荐
CDH 使用 Solr 实现 HBase 二级索引 在大数据处理中,HBase 是一种流行的 NoSQL 数据库,用于存储大量的数据。然而,在查询和检索数据时,HBase 的性能可能不太理想。这是因为 HBase 是基于 Key-Value 的存储方式,...
### hbase+solr创建二级索引完整操作 #### 一、概述 本文档详细介绍了如何利用HBase和Solr创建二级索引的过程...通过以上步骤,您可以成功地在HBase与Solr之间创建二级索引,提升数据检索效率,实现更高效的查询体验。
测试代码,目的是借助solr实现hbase二级索引,以使hbase支持高效的多条件查询。主要通过hbase的coprocessor的Observer实现,通过coprocessor在记录插入hbase时向solr中创建索引。 项目核心为...
描述中提到的"hbase-transactional-tableindexed-master"可能是一个包含实现HBase二级索引和事务处理的项目或代码库。在HBase中,事务支持是相对有限的,主要是因为其分布式和无中心的设计。为了实现事务,通常需要...
为了解决这个问题,我们可以结合HBase的Coprocessor机制和Solr来构建二级索引,实现更高效的查询性能。 首先,我们来理解一下“二级索引”。在HBase中,一级索引是基于行键(Row Key)的,它是默认的、快速的访问...
Solr 是一个开源的全文搜索引擎,而 HBase 是基于 Hadoop 的分布式数据库系统。这两者结合使用,可以构建强大的实时搜索和分析平台,尤其适合处理海量非结构化数据。 一、Apache Solr Solr 是 Lucene 项目的一部分...
通过引入Solr作为二级索引,我们可以利用Solr的全文搜索和多字段索引能力,实现对HBase数据的快速查询。 以下是构建二级索引的基本步骤: 1. **安装与配置**:在CDH集群中安装Solr和HBase,并进行相应的配置,确保...
hbaseSecondaryIndex是一个hbase二级索引的实现,基于solr+hbase coprocessor框架为hbase提供索引表支持。 此工程可打包为一个hbase的插件,通用方便。 这篇文档的目的是为了向您介绍hbaseSecondaryIndex的基本使用...
文章指出,部署这一通用方案后,实验结果表明,该方案能够很好地满足创建和维护HBase二级索引的要求,并且能够保证索引与记录的一致性。这表明,该方案在实践中具有很好的可行性和效果,对于进一步研究分布式存储中...
3. 内存索引机制的实现:基于协处理器的HBase内存索引机制的实现,包括如何快速构建二级索引和持久化操作。 4. 性能优化:基于协处理器的HBase内存索引机制的性能优化,包括如何提高HBase的查询效率和可用性。 应用...
课时31:数据层设计与实现之二级索引开发(续) 课时32:Spring集成Solrj之入门操作 课时33:Spring集成Solrj之高级操作 课时34:高亮查询功能开发之一 课时35:高亮查询功能开发之二 课时36:课程总结
标题中的"solr-8.6.3.tgz+hbase-2.3.3-bin.tar.gz"表明我们有两个重要的开源软件版本:Apache Solr 8.6.3和HBase 2.3.3。Solr是Apache软件基金会的一个项目,主要用于全文搜索、企业级搜索和大数据分析。而HBase则是...
#### 四、HBase二级索引设计 在HBase中,数据是以RowKey为主键进行排序的,这导致按非RowKey字段查询数据时效率较低。为了提高这类查询的性能,HBase引入了二级索引的概念。二级索引通常采用以下几种方式实现: 1....
当我们在HBase上构建二级索引时,通常会利用Solr来提升查询性能,特别是对于那些需要进行复杂查询和全文搜索的应用场景。 标题提到的"morphlines.conf"和"morphline-hbase-mapper.xml"是这两个关键步骤中的配置文件...
- **Phoenix**:Phoenix是在HBase上构建的一个SQL层,它可以在HBase之上建立二级索引,从而支持更复杂的查询需求。 2. **自建HBase索引**:除了引入外部的索引服务之外,还可以通过自建HBase索引来增强检索能力。...
HBase与Solr的集成主要基于SolrCloud模式,通过HBase的RegionServer与Solr的Zookeeper进行通信,实现数据的实时同步。在这样的架构下,HBase负责存储大量结构化数据,Solr则负责提供丰富的搜索功能。hbase-solr-rest...