`
qindongliang1922
  • 浏览: 2193230 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117793
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126217
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60161
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71507
社区版块
存档分类
最新评论

使用MapReduce并行构建Lucene索引

阅读更多
散仙前几篇博客上,已经写了单机程序使用使用hadoop的构建lucene索引,本篇呢,我们里看下如何使用MapReduce来构建索引,代码如下:

package com.mapreduceindex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.util.Version;
import org.apache.solr.store.hdfs.HdfsDirectory;
import org.mortbay.log.Log;
import org.wltea.analyzer.lucene.IKAnalyzer;

import com.qin.wordcount.MyWordCount;

/**
 * 
 * 使用MapReduce构建索引
 * @author qindongliang
 * 大数据技术交流群: 376932160
 *  搜索技术一号群:  324714439
 *  搜索技术一号群:  206247899
 * Hadoop版本2.2.0
 * Lucene版本4.8.0
 *   Solr版本4.8.0
 * 
 * **/
public class BuildIndexMapReduce {

	/**
	 * 获取一个IndexWriter
	 * @param outDir 索引的输出目录
	 * @return IndexWriter 获取一个IndexWriter
	 * */
	public static IndexWriter  getIndexWriter(String outDir) throws Exception{
		Analyzer  analyzer=new IKAnalyzer(true);//IK分词
 		IndexWriterConfig    config=new IndexWriterConfig(Version.LUCENE_48, analyzer);
 		Configuration conf=new Configuration();
 		conf.set("fs.defaultFS","hdfs://192.168.46.32:9000/");//HDFS目录
 		Path path=new Path("hdfs://192.168.46.32:9000/qin/"+outDir);//索引目录
 		HdfsDirectory directory=new HdfsDirectory(path, conf);
 		long heapSize = Runtime.getRuntime().totalMemory()/ 1024L / 1024L;//总内存
		long heapMaxSize = Runtime.getRuntime().maxMemory()/ 1024L / 1024L;//使用的最大内存
		config.setRAMBufferSizeMB(((heapMaxSize-heapSize)*0.7));//空闲内存的70%作为合并因子
 		IndexWriter writer=new IndexWriter(directory, config);//
 		return writer;
		
	}
	
	/**
	 * 索引的工具类
	 * 
	 * **/
	public static class LuceneDocumentUtil{
		public static Document getDoc(String filed,String value){
			    Document d=new Document();
			    //模拟载入schemal文件,根据solr的scheml文件来灵活的坐一些索引,
				d.add(new TextField("content", value, Store.YES));
			return d;
		}
		
	}
	/**
	 * @author qindongliang
	 *
	 */
	private static class BuildIndexMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
		
		IndexWriter iw;
		List<Document> documenst=new ArrayList<>();
		
		
	@Override
	protected void setup(Context context)throws IOException, InterruptedException {
	    Random rd=new Random();
		int i=rd.nextInt(99999999);//此处的索引目录名可以使用UUID来使它唯一
		try{
		iw=getIndexWriter(i+"");//初始化IndexWriter
		}catch(Exception e){
			e.printStackTrace();
		}
		
		
	 
	}
		
	
	@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
		Log.info("  记录的日志信息: "+value.toString());
		String values[]=value.toString().split("\1");//此处读入被索引的文件每一行
		String fieldName=values[0];
		String fieldValue=values[1];
		Document d=LuceneDocumentUtil.getDoc(fieldName, fieldValue);
		if(d==null){
			return;
		}
		documenst.add(d);
		if(documenst.size()>5000){//使用批处理提交
			iw.addDocuments(documenst);
			documenst.clear();
		}
		
		// context.write(null, null);
		}
	/***
	 * 在Map结束时,做一些事,提交索引
	 * 
	 * */
		@Override
		protected void cleanup(Context context)throws IOException, InterruptedException {
			if(documenst.size()>0){
				iw.addDocuments(documenst);
			}
			if(iw!=null){
			iw.close(true);//关闭至合并完成
			}
			
		}
	}
public static void main(String[] args)throws Exception {
	
	Configuration conf=new Configuration();
	
    conf.set("mapreduce.job.jar", "myjob.jar");
	conf.set("fs.defaultFS","hdfs://192.168.46.32:9000");
	conf.set("mapreduce.framework.name", "yarn");  
	conf.set("yarn.resourcemanager.address", "192.168.46.32:8032"); 
	/**Job任务**/
   //Job job=new Job(conf, "testwordcount");//废弃此API
   Job job=Job.getInstance(conf, "build index ");
	job.setJarByClass(BuildIndexMapReduce.class);
 	System.out.println("模式:  "+conf.get("yarn.resourcemanager.address"));;
	// job.setCombinerClass(PCombine.class);
	 job.setNumReduceTasks(0);//设置为3
	 job.setMapperClass(BuildIndexMapper.class);
	 job.setInputFormatClass(TextInputFormat.class);
	 job.setOutputFormatClass(TextOutputFormat.class);

 
	
	 job.setMapOutputKeyClass(NullWritable.class);
	 job.setMapOutputValueClass(NullWritable.class);
 

		String path="hdfs://192.168.46.32:9000/qin/output";
		FileSystem fs=FileSystem.get(conf);
		Path p=new Path(path);
		if(fs.exists(p)){
			fs.delete(p, true);
			System.out.println("输出路径存在,已删除!");
		}
	FileInputFormat.setInputPaths(job, "hdfs://192.168.46.32:9000/qin/indexinput");
	FileOutputFormat.setOutputPath(job,p );
	System.exit(job.waitForCompletion(true) ? 0 : 1);  
}

	
	
	
}

控制台生成的信息如下:
模式:  192.168.46.32:8032
INFO - RMProxy.createRMProxy(56) | Connecting to ResourceManager at /192.168.46.32:8032
WARN - JobSubmitter.copyAndConfigureFiles(149) | Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
INFO - FileInputFormat.listStatus(287) | Total input paths to process : 3
INFO - JobSubmitter.submitJobInternal(394) | number of splits:3
INFO - Configuration.warnOnceIfDeprecated(840) | user.name is deprecated. Instead, use mapreduce.job.user.name
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.jar is deprecated. Instead, use mapreduce.job.jar
INFO - Configuration.warnOnceIfDeprecated(840) | fs.default.name is deprecated. Instead, use fs.defaultFS
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.job.name is deprecated. Instead, use mapreduce.job.name
INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
INFO - Configuration.warnOnceIfDeprecated(840) | mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
INFO - JobSubmitter.printTokens(477) | Submitting tokens for job: job_1407866786826_0001
INFO - YarnClientImpl.submitApplication(174) | Submitted application application_1407866786826_0001 to ResourceManager at /192.168.46.32:8032
INFO - Job.submit(1272) | The url to track the job: http://h1:8088/proxy/application_1407866786826_0001/
INFO - Job.monitorAndPrintJob(1317) | Running job: job_1407866786826_0001
INFO - Job.monitorAndPrintJob(1338) | Job job_1407866786826_0001 running in uber mode : false
INFO - Job.monitorAndPrintJob(1345) |  map 0% reduce 0%
INFO - Job.monitorAndPrintJob(1345) |  map 33% reduce 0%
INFO - Job.monitorAndPrintJob(1345) |  map 100% reduce 0%
INFO - Job.monitorAndPrintJob(1356) | Job job_1407866786826_0001 completed successfully
INFO - Job.monitorAndPrintJob(1363) | Counters: 27
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=238179
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=67091
		HDFS: Number of bytes written=9708
		HDFS: Number of read operations=147
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=75
	Job Counters 
		Launched map tasks=3
		Data-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=81736
		Total time spent by all reduces in occupied slots (ms)=0
	Map-Reduce Framework
		Map input records=166
		Map output records=0
		Input split bytes=326
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=11308
		CPU time spent (ms)=9200
		Physical memory (bytes) snapshot=469209088
		Virtual memory (bytes) snapshot=2544439296
		Total committed heap usage (bytes)=245399552
	File Input Format Counters 
		Bytes Read=62970
	File Output Format Counters 
		Bytes Written=0

本次,散仙测试的使用的数据源有3个文件,当然散仙在这里是小文件,在实际生产中,尽量避免有小文件存放在HDFS上,应该提前合并小文件为大文文件,散仙用了3个测试文件,所以会起了3个map进程,最后生成的索引,有3份,如果需要,我们还可以用生成的多份索引使用一个reduce作业,来完成合并。








  • 大小: 86.8 KB
分享到:
评论
3 楼 RRobinson 2014-12-04  
散仙,并行索引构建可以了,那查询呢
2 楼 zhuhongming123 2014-11-26  
散仙,请问myjob.jar文件是如何产生的……按照你的思路,报了个File myjob.jar not exist异常
1 楼 RRobinson 2014-08-12  
终于等到此篇文章了。。。。

相关推荐

    如何将Lucene索引写入Hadoop?

    标题 "如何将Lucene索引写入Hadoop" 指涉的是在大数据处理场景下,如何利用Apache Lucene的全文检索功能与Apache Hadoop的分布式计算能力相结合,实现高效的数据检索。Apache Lucene是一个高性能、全文本搜索库,而...

    PigExtend:Apache Pig+MapReduce给LuceneSolrElasticSearch构建索引

    主要是利用了Pig框架简化了自己写Hadoop MapReduce程序来构建大规模并行索引的问题,里面封装了主流的全文检索框架,如Lucene,Solr和ElasticSearch 并且支持SolrCloud集群和ElasticSearch集群的分布式索引构建。 这...

    基于Lucene和HDFS的PB级数据索引、搜索、存储系统.zip

    3. **Lucene索引分布**:每个HDFS节点上的数据都会被本地化的Lucene实例进行索引,确保索引与数据的物理位置相对应,减少网络传输开销。 4. **Shard和Replication**:Lucene的索引可能会被分成多个碎片(shards),...

    hadoop.contrib/lucene源码

    在Hadoop MapReduce中使用Lucene构建倒排索引的过程大致如下: 1. **数据预处理**:首先,我们需要将原始数据(如文本文件)转化为Hadoop支持的键值对形式。键通常是文件名或文档ID,值是文档内容。 2. **Mapper...

    lucene-6.6.3.zip

    例如,使用 HDFS 存储索引,利用 MapReduce 或 Spark 并行处理多个索引段,提升整体效率。 总结,Apache Lucene 6.6.3 是一个强大而灵活的全文检索工具,它为开发者提供了构建高效搜索引擎所需的所有组件。通过深入...

    在HDFS上使用Lucene的SourceCode

    为了提高效率,我们通常会在内存中使用`RAMDirectory`来构建索引,然后再将其写入磁盘。 ```java Directory dir = new RAMDirectory(); Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_24); IndexWriter...

    电信设备-海量数据信息索引系统和索引构建方法.zip

    4. **并行计算与MapReduce**:阐述如何利用Hadoop MapReduce或其他并行计算框架进行索引构建,提高处理速度。 5. **实时索引与流处理**:介绍如何结合Apache Kafka、Spark Streaming等实时处理工具,实现实时数据的...

    Hadoop倒排索引程序

    此外,还可以考虑使用更高级的分布式索引结构,如Bloom Filter或Lucene等,以提高索引质量和查询效率。 总的来说,“Hadoop倒排索引程序”是Hadoop并行框架在文本处理和信息检索领域的成功实践,它展示了大数据处理...

    Mdrill项目在lucene的改进上的10点心得1

    首先,Mdrill改变了Lucene创建索引的方式,使得索引能够在HDFS(Hadoop分布式文件系统)上构建。原本Lucene由于依赖本地硬盘的随机写操作,无法直接在HDFS上创建索引。Mdrill通过分析源码,发现Lucene的随机写主要用...

    MapReduce在分布式搜索引擎中的应用.pdf

    在分布式搜索引擎的场景下,MapReduce的使用可以有效地解决传统搜索引擎在处理大数据量时出现的性能瓶颈。例如,Apache的Lucene搜索引擎是一个非常流行的全文搜索引擎库,它具有很高的搜索效率和灵活性。然而,当...

    索引文件去重

    5. **分布式去重**:在大数据环境中,可以使用分布式算法,如MapReduce,将去重任务分散到多台机器上并行处理。 **LuceneClear与索引去重** 提到的“LucenceClear”可能是指Apache Lucene库的一个功能或工具,用于...

    并行隐窝倒排索引的研究。 计算机工程与应用

    对于大型文档集合的处理,研究者采用MapReduce技术在Hadoop集群上并行构建Crypt-Lucene索引。在由四台节点构成的集群上,并行构建8个Crypt-Lucene索引,结果表明,这一过程可以将构建时间降低83.4%。 在技术和工具...

    Lucence和Hadoop学习资料

    在《Lucence in Action》这本书中,读者可以深入理解Lucene的工作原理,学习如何构建和优化索引,掌握查询语法,以及如何在实际项目中部署和维护Lucene搜索引擎。 Hadoop,另一方面,是Apache软件基金会下的一个...

    三个关于搜索引擎的硕士论文

    - **索引构建**:Lucene首先对网页内容进行分词,然后创建倒排索引,以便快速定位包含特定关键词的文档。 - **文本处理**:包括停用词过滤、词干化和词形还原等,以提高搜索效果。 - **搜索算法**:如TF-IDF...

    基于Hadoop平台的分布式搜索引擎.zip

    3. 建立索引:使用Lucene或Solr等开源搜索引擎库,通过MapReduce在分布式环境中构建倒排索引。倒排索引是搜索引擎的核心,它将每个词对应到包含该词的文档集合。 4. 查询处理:用户输入查询后,搜索引擎根据倒排...

    在solr文献检索中用map/reduce

    4. **索引构建过程**:如何使用MapReduce将文献数据转化为Solr索引,包括Map阶段的数据预处理和Reduce阶段的索引创建。 5. **查询处理**:MapReduce在查询执行中的角色,如何通过分布式查询提高响应速度。 6. **...

Global site tag (gtag.js) - Google Analytics