1、核心工具类
package junit; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.NumberUtils; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.queryParser.MultiFieldQueryParser; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.highlight.Formatter; import org.apache.lucene.search.highlight.Fragmenter; import org.apache.lucene.search.highlight.Highlighter; import org.apache.lucene.search.highlight.QueryScorer; import org.apache.lucene.search.highlight.SimpleFragmenter; import org.apache.lucene.search.highlight.SimpleHTMLFormatter; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; import org.junit.Test; import org.wltea.analyzer.lucene.IKAnalyzer; import com.ljq.entity.Person; import com.ljq.utils.HBaseAPI; import com.ljq.utils.LuceneUtil; import com.ljq.utils.XMLPropertyConfig; /** * 模拟功能:假如从互联网上采集数据,采集到的数据实时存储到HBase数据库上,<br/> * 现需把这些数据从HBase读取出来,并在本地创建lucene索引,<br/>方便前端通过索引进行查询,把查询取得的结果在界面上展现出来。<br/><br/> * * 要求该功能要7*24不间断执行,实时性要求非常高,故采用Strom实时计算框架为载体,实现上面的模拟功能。 * * @author 林计钦 * @version 1.0 2013-6-5 下午05:36:09 */ public class IndexCreationTest { private static StringBuffer ids=new StringBuffer(); private static IndexWriter indexWriter = LuceneUtil.getIndexWriter(); // 计算时间评估参数 private static long _initStartTime = 0; // 获取CAS consumer被实例化时的系统时间 private static long mTimeForCommit = 60; // 经过多久时间提交 // 计数器 private static int mIndexCommitBatchNum = 100; // 批量更新索引 private static int mDocCount = 0; /** * 从HBase实时读取数据,并在本地创建lucene索引 */ @Test public void createIndexFromHBase(){ readTable("1370422328578"); } /** * 查询索引 * @throws IOException */ @Test public void searchIndex() throws IOException{ for(int i=0;i<500;i++){ search(new String[]{"id", "name", "age", "rowKey"}, i+""); } // search(new String[]{"id", "name", "age", "rowKey"}, "0"); // search(new String[]{"id", "name", "age", "rowKey"}, "998"); //数据写入txt文件 BufferedWriter writer = new BufferedWriter(new FileWriter(new File("E:\\123.txt"))); writer.write(ids.toString()); writer.close(); } /** * 从HBase实时读取数据,并在本地创建lucene索引 * * @param startRow 指定开始的行(时间戳) */ public static void readTable(String startRow) { int i = 0; try { while (true) { // [{"timestamp":"1370422360734","id":"950","name":"lin950","age":"950","row":"1370422507578"}] System.out.println("startRow=" + startRow); List<Map<String, String>> datas = HBaseAPI.scan("tb_stu", null, String .valueOf(startRow), 100); i += datas.size(); System.out.println(i); if (datas != null && datas.size() > 0) { for (Map<String, String> data : datas) { String row = data.get(HBaseAPI.HBASE_ROW); startRow = row; // System.out.println(String.format("id:%s, name:%s, // age:%s, rowKey:%s.", // data.get("id"), data.get("name"), data.get("age"), // row)); createIndex(data.get("id"), data.get("name"), data.get("age"), row); } } if(isCommitTime()){ System.out.println("indexCommit"); try { indexWriter.commit(); // 批量提交 indexWriter.forceMerge(1); // forceMerge代替optimize System.out.println("indexWriter.commit();"); } catch (Exception e) { e.printStackTrace(); } System.out.println("indexCommit fin"); } } }catch(Exception e){ e.printStackTrace(); } } private static void createIndex(String id, String name, String age, String rowKey){ if (StringUtils.isBlank(id) || StringUtils.isBlank(name) || StringUtils.isBlank(age) || StringUtils.isBlank(rowKey)) { System.out.println(String.format("id:%s, name:%s, age:%s, rowKey:%s.", id, name, age, rowKey)); return; } try { Document doc = new Document(); doc.add(new Field("id", id, Field.Store.YES, Field.Index.NOT_ANALYZED)); doc.add(new Field("name", name, Field.Store.YES, Field.Index.ANALYZED)); doc.add(new Field("age", age, Field.Store.YES, Field.Index.NOT_ANALYZED)); doc.add(new Field("rowKey", rowKey, Field.Store.YES, Field.Index.NOT_ANALYZED)); //更新索引 if(LuceneUtil.existsIndex()){ System.out.println("---update index---"); indexWriter.updateDocument(new Term("id", id), doc); }else { //第一次创建索引 System.out.println("---create index---"); indexWriter.addDocument(doc); } } catch (Exception e) { e.printStackTrace(); } } /** * 判断是否可以提交索引 * @return */ private static synchronized boolean isCommitTime(){ //每隔1分钟提交一次 if((System.currentTimeMillis() - _initStartTime) >= (mTimeForCommit*1000)){ _initStartTime = System.currentTimeMillis(); return true; } //累加到100条提交一次 else if(mDocCount % mIndexCommitBatchNum == 0){ _initStartTime = System.currentTimeMillis(); return true ; } else return false; } /** * 搜索、高亮显示 * * @param fields * @param keyword */ private void search(String[] fields, String keyword) { IndexSearcher indexSearcher = null; try { // 创建索引搜索器,且只读 IndexReader indexReader = IndexReader.open(FSDirectory.open(new File(XMLPropertyConfig.getConfigXML() .getString("index_path"))), true); indexSearcher = new IndexSearcher(indexReader); MultiFieldQueryParser queryParser = new MultiFieldQueryParser(Version.LUCENE_35, fields, new IKAnalyzer()); Query query = queryParser.parse(keyword); // 返回前number条记录 TopDocs topDocs = indexSearcher.search(query, 1000); // 信息展示 int totalCount = topDocs.totalHits; //System.out.println("共检索出 " + totalCount + " 条记录"); // 高亮显示 /* * 创建高亮器,使搜索的结果高亮显示 SimpleHTMLFormatter:用来控制你要加亮的关键字的高亮方式 此类有2个构造方法 * :SimpleHTMLFormatter()默认的构造方法.加亮方式:<B>关键字</B> * :SimpleHTMLFormatter(String preTag, String * postTag).加亮方式:preTag关键字postTag */ Formatter formatter = new SimpleHTMLFormatter("<font color='red'>", "</font>"); /* * QueryScorer QueryScorer * 是内置的计分器。计分器的工作首先是将片段排序。QueryScorer使用的项是从用户输入的查询中得到的; * 它会从原始输入的单词、词组和布尔查询中提取项,并且基于相应的加权因子(boost factor)给它们加权。 * 为了便于QueryScoere使用,还必须对查询的原始形式进行重写。 比如,带通配符查询、模糊查询、前缀查询以及范围查询 * 等,都被重写为BoolenaQuery中所使用的项。 * 在将Query实例传递到QueryScorer之前,可以调用Query.rewrite * (IndexReader)方法来重写Query对象 */ QueryScorer fragmentScorer = new QueryScorer(query); Highlighter highlighter = new Highlighter(formatter, fragmentScorer); Fragmenter fragmenter = new SimpleFragmenter(100); /* * Highlighter利用Fragmenter将原始文本分割成多个片段。 * 内置的SimpleFragmenter将原始文本分割成相同大小的片段,片段默认的大小为100个字符。这个大小是可控制的。 */ highlighter.setTextFragmenter(fragmenter); ScoreDoc[] scoreDocs = topDocs.scoreDocs; for (ScoreDoc scDoc : scoreDocs) { Document document = indexSearcher.doc(scDoc.doc); String id = document.get("id"); String name = document.get("name"); String age = document.get("age"); String rowKey = document.get("rowKey"); float score = scDoc.score; //相似度 //高亮显示 String lighterName = highlighter.getBestFragment(new IKAnalyzer(), "name", name); if (null == lighterName) { lighterName = name; } String lighterAge = highlighter.getBestFragment(new IKAnalyzer(), "age", age); if (null == lighterAge) { lighterAge = age; } Person person = new Person(); person.setId(NumberUtils.toLong(id)); person.setName(lighterName); person.setAge(NumberUtils.toInt(age)); ids.append(id).append("\n\r"); System.out.println(String.format("id:%s, name:%s, age:%s, rowKey:%s, 相似度:%s.", id, lighterName, lighterAge, rowKey, score)); } } catch (Exception e) { e.printStackTrace(); } finally { try { indexSearcher.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2、LuceneUtil类->Lucene工具类
/** * lucene工具类,采用IKAnalyzer中文分词器 * * @author 林计钦 * @version 1.0 2013-6-3 下午03:51:29 */ public class LuceneUtil { /** 索引库路径 */ private static final String indexPath = XMLPropertyConfig.getConfigXML() .getString("index_path"); private static final Logger log=Logger.getLogger(LuceneUtil.class); public static IndexWriter getIndexWriter(){ try { //索引库路径不存在则新建一个 File indexFile=new File(indexPath); if(!indexFile.exists()) indexFile.mkdir(); Directory fsDirectory = FSDirectory.open(indexFile); IndexWriterConfig confIndex = new IndexWriterConfig(Version.LUCENE_35, new IKAnalyzer()); confIndex.setOpenMode(OpenMode.CREATE_OR_APPEND); if (IndexWriter.isLocked(fsDirectory)) { IndexWriter.unlock(fsDirectory); } return new IndexWriter(fsDirectory, confIndex); } catch (Exception e) { e.printStackTrace(); } return null; } /** * 判断索引库是否已创建 * * @return true:存在,false:不存在 * @throws Exception */ public static boolean existsIndex() throws Exception { File file = new File(indexPath); if (!file.exists()) { file.mkdirs(); } String indexSufix = "/segments.gen"; // 根据索引文件segments.gen是否存在判断是否是第一次创建索引 File indexFile = new File(indexPath + indexSufix); return indexFile.exists(); } }
3、HBaseAPI类->HBase数据库封装类
/** * HBase数据库封装类 * * @author 林计钦 * @version 1.0 2013-6-4 上午11:02:17 */ public class HBaseAPI { /**主键*/ public static String HBASE_ROW = "row"; /**列镞*/ public static String HBASE_FAMILY = "family"; /**列名*/ public static String HBASE_QUALIFIER = "qualifier"; /**列值*/ public static String HBASE_QUALIFIERVALUE = "qualifiervalue"; /**时间戳*/ public static String HBASE_TIMESTAMP = "timestamp"; /** 访问HBase线程池大小 */ public static int poolSize = 1000; public static Configuration conf; private static HTablePool tablePool = null; private static final Logger log=Logger.getLogger(HBaseAPI.class); static { //来自$HBase/conf/hbase-site.xml配置文件 conf = new Configuration(); conf.set("hbase.master.port", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_master_port")); conf.set("hbase.zookeeper.quorum", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_zookeeper_quorum")); conf.set("hbase.zookeeper.property.clientPort", XMLPropertyConfig.getConfigXML().getString("hbase.hbase_zookeeper_property_clientPort")); } /** * HTablePool对HBase表进行CRUD操作,不推荐用HTable对HBase表进行CRUD操作。<br/><br/> * * HTablePool可以解决HTable存在的线程不安全问题,同时通过维护固定数量的HTable对象,能够在程序运行期间复用这些HTable资源对象。 * * @return */ public static HTablePool getHTablePool() { if (tablePool == null) { tablePool = new HTablePool(conf, poolSize); } return tablePool; } /** * 从startRow开始查询,查询maxCount条记录 * * @param tableName 表名 * @param startRow 指定开始的行(时间戳) * @param maxCount 从startRow开始查询,查询maxCount条记录,最高阀值为10000 * @return [{"timestamp":"1370412537880","id":"1","name":"zhangsan","age":"20","row":"quanzhou"}] */ public static List<Map<String, String>> scan(String tableName, FilterList filterList, String startRow, int maxCount) { List<Map<String, String>> datas = new ArrayList<Map<String, String>>(); ResultScanner rs = null; try { HTable table = (HTable) getHTablePool().getTable(tableName); Scan scan = new Scan(); if(filterList!=null){ scan.setFilter(filterList); } if (startRow != null && !"".equals(startRow.trim())) { scan.setStartRow(Bytes.toBytes(startRow)); } if(maxCount<=0){ maxCount = 10000; } if (maxCount > 10000) { maxCount = 10000; } scan.setCaching(maxCount + 1); rs = table.getScanner(scan); //Result类提供了raw()、list()、getValue(byte[] family, byte[] qualifier)三种方法来遍历数据 for (Result r : rs) { HashMap<String, String> map = new HashMap<String, String>(); long timestamp = 0; for (KeyValue kv : r.list()) { timestamp = kv.getTimestamp(); String qualifier = Bytes.toString(kv.getQualifier()); //列名 String value = Bytes.toString(kv.getValue()); //列值 map.put(qualifier, value); } map.put(HBASE_ROW, Bytes.toString(r.getRow())); map.put(HBASE_TIMESTAMP, "" + timestamp); datas.add(map); // 假如到了指定条数就跳出 if (datas.size() >= maxCount) { break; } } } catch (Exception e) { e.printStackTrace(); }finally{ if(rs!=null){ rs.close(); } //table.close(); } return datas; } }
相关推荐
Solr是Apache Lucene的一个子项目,是一个高性能、全文本搜索引擎服务器。它可以接收来自HBase的实时数据,并创建基于这些数据的索引,从而支持复杂的查询操作。在这个场景下,Solr被用作二级索引的存储和查询平台。...
5. **Elasticsearch**: Elasticsearch 是一个基于 Lucene 的分布式搜索引擎,用于实时、高效地搜索和分析大量数据。在本项目中,Elasticsearch 可能用于对用户轨迹数据进行全文检索和分析,支持快速的实时查询,便于...
Elasticsearch是一个基于Lucene构建的开源搜索引擎,它能提供全文检索功能,并以分布式方式运行。Hbase则是一个开源的非关系型数据库(NoSQL),它建立在Hadoop文件系统(HDFS)之上,是Hadoop的一个子项目,用于...
2. **Elasticsearch**: Elasticsearch是一款基于Lucene的搜索服务器,提供实时、分布式、全文检索功能。随着版本的升级,它增加了更多高级特性,如更强大的分析能力、更优化的性能以及对大数据处理的支持。 3. **...
Elasticsearch(简称ES)是一款基于Lucene的开源分布式全文检索引擎。它具备高扩展性、可分布式的特性,能够支持大规模数据的存储与检索。ES适用于多种场景,如日志分析、网站搜索等,具有实时性好、易于扩展等特点。...
Elasticsearch是一个基于Lucene的分布式搜索服务器,其设计目的是提供一个分布式的、可扩展的全文搜索引擎,同时也具备数据的聚合和统计分析能力。而Hadoop是一个由Apache软件基金会开发的开源框架,它允许使用简单...
利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 ## 基本介绍 - InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 - FetchDriver 负责抓取url对应的网页内容 - ParserUrlDriver 解析所抓取...
Solr 是 Lucene 项目的一部分,提供了一个高效、可扩展的全文检索、分析和分布式搜索平台。它支持多种数据源,包括 XML、JSON、CSV 等,并且提供了丰富的查询语法,可以实现复杂的搜索需求。Solr 的主要特性包括: ...
利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 基本介绍 InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 FetchDriver 负责抓取url对应的网页内容 ParserUrlDriver 解析所抓取网页内容...
首先,Hadoop是一个由Apache软件基金会支持的开源分布式存储与计算框架,其发展起源于Apache Lucene、Apache Nutch以及Google的三大论文:MapReduce、GFS和BigTable。Hadoop生态系统包括Hadoop核心、Hadoop Common、...
一、ElasticSearch和Hbase ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,...
Solr 是一个基于 Lucene 的搜索平台,具有强大的查询和索引能力。通过创建 Solr 的分片,我们可以实现 HBase 的二级索引。我们可以使用 solrctl 命令来创建 Solr 的分片。例如,我们可以使用以下命令创建一个名为 ll...
特别是在Java、Java Enterprise Edition、SOA、Spring、Hibernate、Hadoop、Hive、Flume、Sqoop、Oozie、Spark、Shark、YARN、Impala、Kafka、Storm、Solr/Lucene以及NoSQL数据库如HBase、Cassandra、MongoDB、MPP...
9. **查询优化**: 结合Elasticsearch的查询语言(如Lucene Query Parser或JSON Query DSL),可以实现复杂和高性能的搜索查询,同时充分利用Elasticsearch的全文搜索和聚合功能。 总之,Elasticsearch-HBase ...
Solr是Apache Lucene项目的一部分,是一个全文搜索引擎服务器。它允许用户快速地搜索大量文本数据,提供高级的索引和查询功能,通常用于构建企业级的搜索解决方案。 在压缩包子文件的文件名称列表中,我们看到"solr...