浏览 7979 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2014-08-12
散仙在前几篇博客中,已经写了如何用单机程序使用hadoop构建lucene索引,本篇呢,我们来看下如何使用MapReduce来构建索引,代码如下:
<pre name="code" class="java">package com.mapreduceindex; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.commons.io.output.NullWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.util.Version; import org.apache.solr.store.hdfs.HdfsDirectory; import org.mortbay.log.Log; import org.wltea.analyzer.lucene.IKAnalyzer; import com.qin.wordcount.MyWordCount; /** * * 使用MapReduce构建索引 * @author qindongliang * 大数据技术交流群: 376932160 * 搜索技术一号群: 324714439 * 搜索技术一号群: 206247899 * Hadoop版本2.2.0 * Lucene版本4.8.0 * Solr版本4.8.0 * * **/ public class BuildIndexMapReduce { /** * 获取一个IndexWriter * @param outDir 索引的输出目录 * @return IndexWriter 获取一个IndexWriter * */ public static IndexWriter getIndexWriter(String outDir) throws Exception{ Analyzer analyzer=new IKAnalyzer(true);//IK分词 IndexWriterConfig config=new IndexWriterConfig(Version.LUCENE_48, analyzer); Configuration conf=new Configuration(); conf.set("fs.defaultFS","hdfs://192.168.46.32:9000/");//HDFS目录 Path path=new Path("hdfs://192.168.46.32:9000/qin/"+outDir);//索引目录 
HdfsDirectory directory=new HdfsDirectory(path, conf); long heapSize = Runtime.getRuntime().totalMemory()/ 1024L / 1024L;//总内存 long heapMaxSize = Runtime.getRuntime().maxMemory()/ 1024L / 1024L;//使用的最大内存 config.setRAMBufferSizeMB(((heapMaxSize-heapSize)*0.7));//空闲内存的70%作为合并因子 IndexWriter writer=new IndexWriter(directory, config);// return writer; } /** * 索引的工具类 * * **/ public static class LuceneDocumentUtil{ public static Document getDoc(String filed,String value){ Document d=new Document(); //模拟载入schemal文件,根据solr的scheml文件来灵活的坐一些索引, d.add(new TextField("content", value, Store.YES)); return d; } } /** * @author qindongliang * */ private static class BuildIndexMapper extends Mapper&lt;LongWritable, Text, NullWritable, NullWritable&gt; { IndexWriter iw; List&lt;Document&gt; documenst=new ArrayList&lt;&gt;(); @Override protected void setup(Context context)throws IOException, InterruptedException { Random rd=new Random(); int i=rd.nextInt(99999999);//此处的索引目录名可以使用UUID来使它唯一 try{ iw=getIndexWriter(i+"");//初始化IndexWriter }catch(Exception e){ e.printStackTrace(); } } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { Log.info(" 记录的日志信息: "+value.toString()); String values[]=value.toString().split("\1");//此处读入被索引的文件每一行 String fieldName=values[0]; String fieldValue=values[1]; Document d=LuceneDocumentUtil.getDoc(fieldName, fieldValue); if(d==null){ return; } documenst.add(d); if(documenst.size()&gt;5000){//使用批处理提交 iw.addDocuments(documenst); documenst.clear(); } // context.write(null, null); } /*** * 在Map结束时,做一些事,提交索引 * * */ @Override protected void cleanup(Context context)throws IOException, InterruptedException { if(documenst.size()&gt;0){ iw.addDocuments(documenst); } if(iw!=null){ iw.close(true);//关闭至合并完成 } } } public static void main(String[] args)throws Exception { Configuration conf=new Configuration(); conf.set("mapreduce.job.jar", "myjob.jar"); conf.set("fs.defaultFS","hdfs://192.168.46.32:9000"); 
conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.address", "192.168.46.32:8032"); /**Job任务**/ //Job job=new Job(conf, "testwordcount");//废弃此API Job job=Job.getInstance(conf, "build index "); job.setJarByClass(BuildIndexMapReduce.class); System.out.println("模式: "+conf.get("yarn.resourcemanager.address"));; // job.setCombinerClass(PCombine.class); job.setNumReduceTasks(0);//设置为3 job.setMapperClass(BuildIndexMapper.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(NullWritable.class); String path="hdfs://192.168.46.32:9000/qin/output"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.46.32:9000/qin/indexinput"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } } </pre> 控制台生成的信息如下: <pre name="code" class="java">模式: 192.168.46.32:8032 INFO - RMProxy.createRMProxy(56) | Connecting to ResourceManager at /192.168.46.32:8032 WARN - JobSubmitter.copyAndConfigureFiles(149) | Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. INFO - FileInputFormat.listStatus(287) | Total input paths to process : 3 INFO - JobSubmitter.submitJobInternal(394) | number of splits:3 INFO - Configuration.warnOnceIfDeprecated(840) | user.name is deprecated. Instead, use mapreduce.job.user.name INFO - Configuration.warnOnceIfDeprecated(840) | mapred.jar is deprecated. Instead, use mapreduce.job.jar INFO - Configuration.warnOnceIfDeprecated(840) | fs.default.name is deprecated. Instead, use fs.defaultFS INFO - Configuration.warnOnceIfDeprecated(840) | mapred.reduce.tasks is deprecated. 
Instead, use mapreduce.job.reduces INFO - Configuration.warnOnceIfDeprecated(840) | mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class INFO - Configuration.warnOnceIfDeprecated(840) | mapred.job.name is deprecated. Instead, use mapreduce.job.name INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class INFO - Configuration.warnOnceIfDeprecated(840) | mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir INFO - Configuration.warnOnceIfDeprecated(840) | mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir INFO - Configuration.warnOnceIfDeprecated(840) | mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class INFO - Configuration.warnOnceIfDeprecated(840) | mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps INFO - Configuration.warnOnceIfDeprecated(840) | mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class INFO - Configuration.warnOnceIfDeprecated(840) | mapred.working.dir is deprecated. 
Instead, use mapreduce.job.working.dir INFO - JobSubmitter.printTokens(477) | Submitting tokens for job: job_1407866786826_0001 INFO - YarnClientImpl.submitApplication(174) | Submitted application application_1407866786826_0001 to ResourceManager at /192.168.46.32:8032 INFO - Job.submit(1272) | The url to track the job: http://h1:8088/proxy/application_1407866786826_0001/ INFO - Job.monitorAndPrintJob(1317) | Running job: job_1407866786826_0001 INFO - Job.monitorAndPrintJob(1338) | Job job_1407866786826_0001 running in uber mode : false INFO - Job.monitorAndPrintJob(1345) | map 0% reduce 0% INFO - Job.monitorAndPrintJob(1345) | map 33% reduce 0% INFO - Job.monitorAndPrintJob(1345) | map 100% reduce 0% INFO - Job.monitorAndPrintJob(1356) | Job job_1407866786826_0001 completed successfully INFO - Job.monitorAndPrintJob(1363) | Counters: 27 File System Counters FILE: Number of bytes read=0 FILE: Number of bytes written=238179 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=67091 HDFS: Number of bytes written=9708 HDFS: Number of read operations=147 HDFS: Number of large read operations=0 HDFS: Number of write operations=75 Job Counters Launched map tasks=3 Data-local map tasks=3 Total time spent by all maps in occupied slots (ms)=81736 Total time spent by all reduces in occupied slots (ms)=0 Map-Reduce Framework Map input records=166 Map output records=0 Input split bytes=326 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=11308 CPU time spent (ms)=9200 Physical memory (bytes) snapshot=469209088 Virtual memory (bytes) snapshot=2544439296 Total committed heap usage (bytes)=245399552 File Input Format Counters Bytes Read=62970 File Output Format Counters Bytes Written=0 </pre> 本次,散仙测试使用的数据源有3个文件,当然散仙这里用的是小文件,在实际生产中,尽量避免有小文件存放在HDFS上,应该提前合并小文件为大文件,散仙用了3个测试文件,所以会起3个map进程,最后生成的索引有3份,如果需要,我们还可以用一个reduce作业,来完成对生成的多份索引的合并。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |