源码
\hadoop-1.0.0\src\contrib\index
主要涉及的类
org.apache.hadoop.contrib.index.main.UpdateIndex
org.apache.hadoop.contrib.index.mapred.IndexUpdater
org.apache.hadoop.contrib.index.mapred.IndexUpdateMapper
org.apache.hadoop.contrib.index.mapred.IndexUpdatePartitioner
org.apache.hadoop.contrib.index.mapred.IndexUpdateCombiner
org.apache.hadoop.contrib.index.mapred.IndexUpdateReducer
org.apache.hadoop.contrib.index.example.LineDocInputFormat
org.apache.hadoop.contrib.index.mapred.IndexUpdateOutputFormat
org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration
解析
1、UpdateIndex类(main实现读取控制台参数)
UpdateIndex是整个程序的入口,提供参数给用户配置。
- -inputPaths -->mapred.input.dir (V)
- -outputPath -->mapred.output.dir (V)
- -shards -->最终每个reduct建立的索引存放hdfs位置 (V)
- -indexPath -->sea.index.path
- -numShards -->sea.num.shards
- -numMapTasks -->mapred.map.tasks (V)
- -conf 添加Configuration xml文件形式的配置方式
shards 通过转换最终保存到Configuration的sea.index.shards
shards参数 与 indexPath和numShards参数 设置一种就可以了。如果没有设置shards,则在main方法中会通过indexPath和numShards生成得到 shards 参数。
从Configuration中获得Updater类: IndexUpdateConfiguration.getIndexUpdaterClass()
public Class<? extends IIndexUpdater> getIndexUpdaterClass() { return conf.getClass("sea.index.updater", IndexUpdater.class, IIndexUpdater.class); }
在UpdateIndex最终调用IndexUpdater
updater.run(conf, inputPaths, outputPath, numMapTasks, shards);
2、IndexUpdater类(配置MapReduce Job)
public void run(Configuration conf, Path[] inputPaths, Path outputPath, int numMapTasks, Shard[] shards) throws IOException { JobConf jobConf = createJob(conf, inputPaths, outputPath, numMapTasks, shards); JobClient.runJob(jobConf); }
JobConf createJob(Configuration conf, Path[] inputPaths, Path outputPath, int numMapTasks, Shard[] shards) throws IOException { setShardGeneration(conf, shards); // iconf.set sets properties in conf IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf); Shard.setIndexShards(iconf, shards); // MapTask.MapOutputBuffer uses "io.sort.mb" to decide its max buffer size // (max buffer size = 1/2 * "io.sort.mb"). // Here we half-en "io.sort.mb" because we use the other half memory to // build an intermediate form/index in Combiner. iconf.setIOSortMB(iconf.getIOSortMB() / 2);
setShardGeneration(conf, shards);方法从shard path路径获得segment_N来获得generation,并更新shard实例的gen属性。
Shard.setIndexShards(iconf, shards);则是序列化shards为String写入到Configuration中。
接下来的代码,就是设置InputFormat,OutputFormat, Map, Reduce,Partitioner, KeyValue Class, Combine等一些列的Job必备的参数。
// create the job configuration JobConf jobConf = new JobConf(conf, IndexUpdater.class); jobConf.setJobName(this.getClass().getName() + "_" + System.currentTimeMillis()); // provided by application FileInputFormat.setInputPaths(jobConf, inputPaths); FileOutputFormat.setOutputPath(jobConf, outputPath); jobConf.setNumMapTasks(numMapTasks); // already set shards jobConf.setNumReduceTasks(shards.length); jobConf.setInputFormat(iconf.getIndexInputFormatClass()); // set by the system jobConf.setMapOutputKeyClass(IndexUpdateMapper.getMapOutputKeyClass()); jobConf.setMapOutputValueClass(IndexUpdateMapper.getMapOutputValueClass()); jobConf.setOutputKeyClass(IndexUpdateReducer.getOutputKeyClass()); jobConf.setOutputValueClass(IndexUpdateReducer.getOutputValueClass()); jobConf.setMapperClass(IndexUpdateMapper.class); jobConf.setPartitionerClass(IndexUpdatePartitioner.class); jobConf.setCombinerClass(IndexUpdateCombiner.class); jobConf.setReducerClass(IndexUpdateReducer.class); jobConf.setOutputFormat(IndexUpdateOutputFormat.class); return jobConf;
3、LineDocInputFormat(数据输入类)
从IndexUpdateConfiguration.getIndexInputFormatClass()可以获得当前默认使用的数据输入类org.apache.hadoop.contrib.index.example.LineDocInputFormat。
这里定义了org.apache.hadoop.contrib.index.example.LineDocRecordReader来解析数据,获取建立索引的命令(Insert, Update, Delete)和对应的数据。
public synchronized boolean next(DocumentID key, LineDocTextAndOp value) throws IOException { if (pos >= end) { return false; } // key is document id, which are bytes until first space if (!readInto(key.getText(), SPACE)) { // 把读到的第一个数据写入Key return false; } // read operation: i/d/u, or ins/del/upd, or insert/delete/update Text opText = new Text(); if (!readInto(opText, SPACE)) { return false; } String opStr = opText.toString(); DocumentAndOp.Op op; if (opStr.equals("i") || opStr.equals("ins") || opStr.equals("insert")) { op = DocumentAndOp.Op.INSERT; } else if (opStr.equals("d") || opStr.equals("del") || opStr.equals("delete")) { op = DocumentAndOp.Op.DELETE; } else if (opStr.equals("u") || opStr.equals("upd") || opStr.equals("update")) { op = DocumentAndOp.Op.UPDATE; } else { // default is insert op = DocumentAndOp.Op.INSERT; } value.setOp(op); if (op == DocumentAndOp.Op.DELETE) { return true; } else { // read rest of the line return readInto(value.getText(), EOL); } }
// 把InputStream offset到分隔符的数据写入到text private boolean readInto(Text text, char delimiter) throws IOException
2 ins apache
3 ins apache
4 ins apache
0 del
1 upd hadoop
2 del
3 upd hadoop
4、IndexUpdateMapper(最终形成IntermediateForm传递给Combiner和Reducer处理)
· ILocalAnalysis -->org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis
把Text组成为Lucene需要的Document。
· DocumentAndOp
public class DocumentAndOp implements Writable { private Op op; // private Document doc; // INSERT UPDATE private Term term; // DELETE UPDATE
· IntermediateForm
public class IntermediateForm implements Writable { private final Collection<Term> deleteList; private RAMDirectory dir; private IndexWriter writer; private int numDocs; public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException { if (doc.getOp() == DocumentAndOp.Op.DELETE || doc.getOp() == DocumentAndOp.Op.UPDATE) { deleteList.add(doc.getTerm()); } if (doc.getOp() == DocumentAndOp.Op.INSERT || doc.getOp() == DocumentAndOp.Op.UPDATE) { if (writer == null) { // analyzer is null because we specify an analyzer with addDocument writer = createWriter(); } writer.addDocument(doc.getDocument(), analyzer); numDocs++; } } public void process(IntermediateForm form) throws IOException { if (form.deleteList.size() > 0) { deleteList.addAll(form.deleteList); } if (form.dir.sizeInBytes() > 0) { if (writer == null) { writer = createWriter(); } writer.addIndexesNoOptimize(new Directory[] { form.dir }); numDocs++; } }
· HashingDistributionPolicy
public class HashingDistributionPolicy implements IDistributionPolicy { private int numShards; public void init(Shard[] shards) { numShards = shards.length; } public int chooseShardForInsert(DocumentID key) { int hashCode = key.hashCode(); return hashCode >= 0 ? hashCode % numShards : (-hashCode) % numShards; } public int chooseShardForDelete(DocumentID key) { int hashCode = key.hashCode(); return hashCode >= 0 ? hashCode % numShards : (-hashCode) % numShards; } }
IndexUpdateMapper.map(K, V, OutputCollector<Shard, IntermediateForm>, Reporter)
根据DocumentAndOp形成IntermediateForm,以及DistributionPolicy选择Shard,传入Combiner。
评注: 这里每条数据都要开启和关闭Writer,消耗应该不小。
5、IndexUpdateCombiner
把该节点的IntermediateForm合并成一个大的IntermediateForm集合。
评注: 感觉作用不是很大,合并减少的数据量有限; 但是却又要初始化和关闭一次Writer。
6、IndexUpdateReducer
把从Mapper传递来的IntermediateForm写入到ShardWriter。最终把Lucene索引写入到IndexPath下。
public ShardWriter(FileSystem fs, Shard shard, String tempDir, IndexUpdateConfiguration iconf) throws IOException
在Reducer最后关闭ShardWriter时,由于要等待比较长的时间。这里使用一个新的线程来发送心跳。 使用hadoop/编写mapreduce程序 注意点(! 不要关闭mapreduce的超时)
new Closeable() { volatile boolean closed = false; public void close() throws IOException { // spawn a thread to give progress heartbeats Thread prog = new Thread() { public void run() { while (!closed) { try { fReporter.setStatus("closing"); Thread.sleep(1000); } catch (InterruptedException e) { continue; } catch (Throwable e) { return; } } } }; try { prog.start(); if (writer != null) { writer.close(); } } finally { closed = true; } } }.close();
· ShardWriter
把索引写到本地,等处理完了数据后,关闭IndexWriter后把索引库转入拷贝到HDFS持久化。
// move the files created in the temp dir into the perm dir // and then delete the temp dir from the local FS private void moveFromTempToPerm() throws IOException { try { FileStatus[] fileStatus = localFs.listStatus(temp, LuceneIndexFileNameFilter.getFilter()); Path segmentsPath = null; Path segmentsGenPath = null; // move the files created in temp dir except segments_N and segments.gen for (int i = 0; i < fileStatus.length; i++) { Path path = fileStatus[i].getPath(); String name = path.getName(); if (LuceneUtil.isSegmentsGenFile(name)) { assert (segmentsGenPath == null); segmentsGenPath = path; } else if (LuceneUtil.isSegmentsFile(name)) { assert (segmentsPath == null); segmentsPath = path; } else { fs.completeLocalOutput(new Path(perm, name), path); } } // move the segments_N file if (segmentsPath != null) { fs.completeLocalOutput(new Path(perm, segmentsPath.getName()), segmentsPath); } // move the segments.gen file if (segmentsGenPath != null) { fs.completeLocalOutput(new Path(perm, segmentsGenPath.getName()), segmentsGenPath); } } finally { // finally delete the temp dir (files should have been deleted) localFs.delete(temp); } }
相关推荐
Hadoop大数据InvertedIndex文档倒排索引程序实验报告 大数据实验报告中,实现了使用Hadoop编程的InvertedIndex文档倒排索引程序。该程序使用Hadoop的MapReduce框架,通过Map、Combine和Reduce三个阶段,实现了文档...
在本实验报告中,我们探讨了如何在Hadoop框架下构建一个单词的反向索引程序,这是一个在大数据处理和搜索引擎领域中常见的任务。Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在大规模数据集上进行...
- Inverted Index:构建文档的倒排索引表。 #### 四、YARN资源管理器 1. **YARN架构**: - ResourceManager:集群资源的管理者。 - NodeManager:单个节点上的资源管理器。 - ApplicationMaster:负责应用程序...
这通常涉及到创建一个清单文件(index.yaml),并更新仓库的索引。Minio提供了命令行工具或REST接口来完成这个步骤。 7. **配置Hadoop实例**:在Hadoop Chart中,你需要定义各种配置参数,例如数据节点数量、HDFS的...
- **API**:Hadoop 提供了丰富的 API 来编写 MapReduce 应用程序,包括 `Mapper` 和 `Reducer` 接口。 - **配置**:通过 `Configuration` 类来设置各种参数,如输入输出路径、Mapper 和 Reducer 类以及文件分割策略...
- **InvertedIndex**: 构建倒排索引,即给出一个词,返回包含该词的所有文档的列表。 #### 五、Hadoop生态系统扩展 - **Lucene**: 高性能全文搜索引擎库,虽然不是Hadoop的一部分,但经常与Hadoop结合使用。 - **...
在构建倒排索引时,文档会以分块的形式存储在HDFS上。 **6. 集群优化** 为了提高性能,可以在Hadoop集群中进行各种优化,如调整Map和Reduce任务的数量、内存设置、数据本地化策略等。 **7. 进阶话题** - **...
而论文[9]提出的Trojan Index和Trojan Join算法,为Hadoop提供了一种高效的索引和JOIN操作,实验结果显示优于原生Hadoop和HadoopDB。 在工业界,百度(Baidu)对Hadoop的关键组件如Map、Shuffler和Reducer进行了C++...
- `-i <index>`:可选参数,用于指定索引文件。 - `<source>`:源文件或目录。 - `<destination>`:目标HAR文件的位置。 **示例**: ``` hadoop archive -archiveName test.har -p test/ -i test.haridx /user/test...
它已经在 Cloudera CDH5.1.0 (Hadoop 2.3.0)、Terrier 4.0 和 StreamCorpus v0.3.0 上运行的 Hadoop 集群上进行了测试。 配置:使用Maven编译任务编译后,需要在Hadoop设置中配置Terrier(见)。 最重要的值是索引...
本文将深入探讨如何利用Java编程语言和Hadoop MapReduce来实现一个反向索引,特别是在一个大型的Hadoop集群上。 反向索引是一种数据结构,广泛应用于搜索引擎和全文检索系统中,它能够快速地根据词项找到包含这些词...
Apache Nutch是一款开源的网络爬虫软件,用于抓取互联网上的网页并进行索引,是大数据领域中搜索引擎构建的重要工具。这份手册涵盖了从环境准备到系统配置的全过程,旨在帮助用户成功搭建一个分布式的Nutch系统。 ...
根据给定的信息,我们可以深入探讨倒排索引(Inverted Index)的概念以及如何利用Hadoop进行倒排索引的构建。倒排索引是一种用于快速文档检索的数据结构,尤其是在处理大规模文本数据时非常有效。 ### 倒排索引概念...
2. **Row Index**: ORC文件在每个Stripe内和Stripe之间都保存了索引,这使得查询能够快速定位到所需的数据,避免了扫描整个文件。 3. **Column Statistics**: 每个列都有对应的统计信息,如最小值、最大值、null值...
本文将通过代码实例展示如何利用Lucene和Java进行基本的数据索引和搜索,以及如何在HDFS(Hadoop分布式文件系统)上创建和搜索索引。开发环境包括Java 1.6、Eclipse 3.4.2、Lucene 2.4.0和运行在Windows XP SP3上的...
转换完成后,原来的SequenceFile数据会被保留,但会增加一个索引文件(通常命名为`index`),这样就形成了一个完整的MapFile。MapFile的索引允许用户快速定位到特定的键值对,提高了数据访问效率,尤其在需要频繁...
* 创建索引:CREATE [UNIQUE] INDEX idxname ON tabname(col…) * 删除索引:DROP INDEX idxname 视图管理 * 创建视图:CREATE VIEW viewname AS SELECT statement * 删除视图:DROP VIEW viewname 查询语句 * ...