- 浏览: 577065 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
题外话: 请关注http://code.google.com/p/redpoll
如果有人可以提供10台左右普通机器测就好了,学校实验室不给这么多,俺已经写了一篇paper, 程序啥都好了,就差数据, 真郁闷。
进展比较慢, 走了些弯路, 不过最终还是理清了。开始考虑文档聚类后,要能通过文档的id定位到该文档的内容。而且这个id最好是整型,或者long型,而搜狐新闻给的docno是占32字节的GUID。如果不考虑这点, hadoop在算词频, 势必会生成很庞大的中间文件。
term1 docId1:tf, docId2:tf, ....
term2 docId1:tf, docId2:tf, ....
...
为了图个简便,考虑用数据库, select content from table where documentid = x 就可以直接实现这功能。然而MySql与hadoop的结合尚在初步, 没有作sharding。这样Mapper在读取数据时,就不是分布式的了。捣鼓了几天hbase, 写了些代码, 发现只存了17万条数据后,就再也存不下去了,原因不明。而且 我发现这个bigtable的仿制品还只是刚刚起步,有很多不稳定性。
没办法,看来只能靠自己了。定位其实很简单, 只要知道这篇文档在文件中的偏移量,然后用这个偏移量seek就可以了。在java中,一个long型占8字节,而且把hadoop的hdfs一般操作64m以上的文件比较有利。 把3.4G+的搜狗语料(http://www.sogou.com/labs/dl/cs.html) 全部cat在一起,然后用偏移量做文档ID是比较合理的。要定义mapper接受的key-value不是<LongWritable, Text>对的话,那得自定义InputFormat。 我针对搜狗语料做了一个SogouInputFormat,然后还有对应的RecordReader, Writable实现。结果,学校网络有问题,googlecode的svn commit不了。
搜狗的语料是采用类xml形式存储文本文件。因为处理大规模数据要求速度快,用DOM不现实。开始尝试用sax解析,结果有问题。因为有些格式出错。 于是我花了两个晚上,手写了两个状态机用来解析,终于可以读取正确,而且速度比较快。单机读取语料的速度平均51m/s,也就是说单机读取3.4G的搜狗语料一分钟多一点就可以完成。而且这种作法可以跑在mapreduce模型上了。
接下来,就是处理分词, tf, df及计算tf-idf得出VSM。
贴些代码片段:
package redpoll.examples; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; /** * Input format for sogou corpus. * @author Jeremy Chow(coderplay@gmail.com) */ public class SogouInputFormat extends FileInputFormat<LongWritable, DocumentWritable> implements JobConfigurable { private CompressionCodecFactory compressionCodecs = null; public void configure(JobConf conf) { compressionCodecs = new CompressionCodecFactory(conf); } protected boolean isSplitable(FileSystem fs, Path file) { return compressionCodecs.getCodec(file) == null; } public RecordReader<LongWritable, DocumentWritable> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new SogouRecordReader(job, (FileSplit) genericSplit); } }
package redpoll.examples; import java.io.EOFException; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; /** * A class that provides a sogou document reader from an input stream. * @author Jeremy Chow(coderplay@gmail.com) */ public class SogouCorpusReader { private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; private int bufferSize = DEFAULT_BUFFER_SIZE; /* input stream which we will get documents from */ private InputStream in; /* a buffer stores temporary bytes readed from input stream */ private byte[] buffer; /* the number of bytes of real data in the buffer */ private int bufferLength = 0; /* the current position in the buffer */ private int bufferPosn = 0; /* the buffer position in input stream */ private long bufferCurrentPosn = 0; private long currentDocPosn = 0; /* xml-like mark tags used in sogou corpus */ private byte[] docTag; private byte[] urlTag; private byte[] docnoTag; private byte[] titleTag; private byte[] contentTag; /* parser status */ enum STATUS { PREPARE, START_ELEMENT, END_ELEMENT, TEXT }; /* used for defining current parsing node */ enum NODE { NULL, DOC, URL, DOC_NO, TITLE, CONTENT, FAILED, SUCCEED }; private STATUS currentSatus; private NODE currentNode; public SogouCorpusReader(InputStream in) throws IOException { this(in, DEFAULT_BUFFER_SIZE); } public SogouCorpusReader(InputStream in, int bufferSize) throws IOException { this(in, bufferSize, "doc", "url", "docno", "contenttitle", "content"); } public SogouCorpusReader(InputStream in, int bufferSize, String doc, String url, String docno, String title, String content) throws IOException { this.in = in; this.bufferSize = bufferSize; this.buffer = new byte[this.bufferSize]; docTag = doc.getBytes("UTF-8"); urlTag = url.getBytes("UTF-8"); docnoTag = docno.getBytes("UTF-8"); titleTag = title.getBytes("UTF-8"); contentTag = content.getBytes("UTF-8"); } public SogouCorpusReader(InputStream in, Configuration conf) throws IOException { this(in, conf.getInt("redpoll.sogou.doc.buffersize", DEFAULT_BUFFER_SIZE), conf.get("redpoll.sogou.doc", "doc"), conf.get("redpoll.sogou.doc.url","url"), conf.get("redpoll.sogou.doc.docno", "docno"), conf.get("redpoll.sogou.doc.contenttitle", "contenttitle"), conf.get("redpoll.sogou.doc.content", "content")); } /** * Gets a {@link redpoll.examples.Document} instance from sogou text file. If it reached EOF, return null. * @param a {@link redpoll.examples.Document} instance getting from sogou text file. * @return the position of this document, -1 if it reached EOF. * @throws IOException */ public long nextDoc(SogouDocument doc) throws IOException { currentSatus = STATUS.PREPARE; currentNode = NODE.NULL; try { while (currentNode != NODE.SUCCEED) { adjustBuffer(); if (currentSatus == STATUS.PREPARE) { if (buffer[bufferPosn] == '<') currentSatus = STATUS.START_ELEMENT; } else if (currentSatus == STATUS.START_ELEMENT) { if (buffer[bufferPosn] == '/') { // e.g. </node> currentSatus = STATUS.END_ELEMENT; } else { int start = bufferPosn; byte[] name = null; while (buffer[bufferPosn] != '>' && buffer[bufferPosn] != '\n') { bufferPosn++; if(bufferPosn >= bufferLength) { name = new byte[bufferLength - start]; System.arraycopy(buffer, start, name, 0, bufferLength - start); start = 0; } adjustBuffer(); } // if a element ends with '\n', we consider it as a wrong element if (buffer[bufferPosn] == '\n') failed(); // FAILED else if (buffer[bufferPosn] == '>') { int len = bufferPosn - start; if (len > 0) { if (name != null) { byte[] newname = new byte[name.length + len]; System.arraycopy(name, 0, newname, 0, name.length); System.arraycopy(buffer, start, newname, name.length, len); name = newname; } else { name = new byte[len]; System.arraycopy(buffer, start, name, 0, len); } startElement(name); } ignoreWhite(); currentSatus = STATUS.TEXT; } } } else if (currentSatus == STATUS.TEXT) { int start = bufferPosn; byte[] text = null; while (buffer[bufferPosn] != '<' && buffer[bufferPosn] != '\n') { bufferPosn++; if(bufferPosn >= bufferLength) { // FIXME: if the content of a document passes through more than two buffers, it will get wrong! text = new byte[bufferLength - start]; System.arraycopy(buffer, start, text, 0, bufferLength - start); start = 0; } adjustBuffer(); } if (buffer[bufferPosn] == '<') { int len = bufferPosn - start; if (len > 0) { if (text != null) { byte[] newtext = new byte[text.length + len]; System.arraycopy(text, 0, newtext, 0, text.length); System.arraycopy(buffer, start, newtext, text.length, len); text = newtext; } else { text = new byte[len]; System.arraycopy(buffer, start, text, 0, len); } characters(text, doc); } currentSatus = STATUS.START_ELEMENT; } else if (buffer[bufferPosn] == '\n') failed(); // FAILED } else if (currentSatus == STATUS.END_ELEMENT) { int start = bufferPosn; byte[] name = null; while (buffer[bufferPosn] != '>' && buffer[bufferPosn] != '\n') { bufferPosn++; if(bufferPosn >= bufferLength) { name = new byte[bufferLength - start]; System.arraycopy(buffer, start, name, 0, bufferLength - start); start = 0; } adjustBuffer(); } if (buffer[bufferPosn] == '>') { int len = bufferPosn - start; if (len > 0) { if (name != null) { byte[] newname = new byte[name.length + len]; System.arraycopy(name, 0, newname, 0, name.length); System.arraycopy(buffer, start, newname, name.length, len); name = newname; } else { name = new byte[len]; System.arraycopy(buffer, start, name, 0, len); } endElement(name); } ignoreWhite(); currentSatus = STATUS.PREPARE; } else if (buffer[bufferPosn] != '\n') failed(); // FAILED } bufferPosn++; } } catch (EOFException eofe) { return -1; } return currentDocPosn; } /** * Close the underlying stream. * @throws IOException */ public void close() throws IOException { in.close(); } private void ignoreWhite() throws IOException, EOFException { do { bufferPosn++; adjustBuffer(); } while (buffer[bufferPosn] == '\n' || buffer[bufferPosn] == '\r' || buffer[bufferPosn] == '\t' || buffer[bufferPosn] == ' '); bufferPosn--; } private void adjustBuffer() throws IOException, EOFException { if (bufferPosn >= bufferLength) { bufferCurrentPosn += bufferLength; bufferPosn = 0; bufferLength = in.read(buffer); if (bufferLength <= 0) throw new EOFException(); } } private void startElement(byte[] name) { if ((currentNode == NODE.NULL || currentNode == NODE.FAILED) && equals(docTag, name)) { currentDocPosn = bufferCurrentPosn + bufferPosn - docTag.length - 1; currentNode = NODE.DOC; } else if (currentNode == NODE.DOC && equals(urlTag, name)) { currentNode = NODE.URL; } else if (currentNode == NODE.URL && equals(docnoTag, name)) { currentNode = NODE.DOC_NO; } else if (currentNode == NODE.DOC_NO && equals(titleTag, name)) { currentNode = NODE.TITLE; } else if (currentNode == NODE.TITLE && equals(contentTag, name)) { currentNode = NODE.CONTENT; } else { currentNode = NODE.FAILED; } } private void endElement(byte[] name) { if (currentNode == NODE.CONTENT && equals(contentTag, name)) { currentNode = NODE.SUCCEED; } } private void characters(byte[] text, SogouDocument doc) { if (currentNode == NODE.URL) { doc.setPathBytes(text); } else if (currentNode == NODE.DOC_NO) { doc.setIdBytes(text); } else if (currentNode == NODE.TITLE) { doc.setTitleBytes(text); } else if (currentNode == NODE.CONTENT) { doc.setContentBytes(text); } } private void failed() { currentNode = NODE.FAILED; } private boolean equals(final byte [] left, final byte [] right) { return left == null && right == null? true: left == null && right != null? false: left != null && right == null? false: left.length != right.length? false: WritableComparator.compareBytes(left, 0, left.length, right, 0, right.length) == 0; } }
评论
hbase现在是在解决稳定性的问题,性能/空间的优化要下个大版本。 楼主搞的这个和apache mahout有什么区别,有空我们交流下。
hbase的主要贡献者都被M$收了,这事不靠谱. 目前hive算是最好的了.
SogouCorpusReader不用实现RecordReader吗?
要的, 只是没贴出来..我的原因是这东西还可以放在其它地方用, 实现RecordReader那就被hadoop框架局限了. 近期我对代码结构做了些调整
package redpoll.examples.sogou; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.RecordReader; import redpoll.text.Document; /** * Treats keys as offset in file and value as an document. * @author Jeremy Chow(coderplay@gmail.com) */ public class SogouRecordReader implements RecordReader<LongWritable, Document>{ private static final Log LOG = LogFactory.getLog(SogouRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long end; private long pos; private SogouCorpusReader in; public SogouRecordReader(Configuration job, FileSplit split) throws IOException { start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); if (codec != null) { in = new SogouCorpusReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) fileIn.seek(start); in = new SogouCorpusReader(fileIn, job); } this.pos = start; } public LongWritable createKey() { return new LongWritable(); } public Document createValue() { return new SogouDocument(); } public long getPos() throws IOException { return pos; } /** * Get the progress within the split */ public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float)(end - start)); } } /** * Close the input stream */ public void close() throws IOException { if (in != null) { in.close(); } } public synchronized boolean next(LongWritable key, Document value) throws IOException { if(pos < end) { long docPos = in.nextDoc((SogouDocument)value); if(docPos < 0) return false; key.set(start + docPos); pos = start + in.getPosition(); return true; } return false; } }
最近上网有些问题,google的svn上不了, 所以一直没有commit不了
发表评论
-
抛砖引玉, 淘宝统一离线数据分析平台设计
2011-11-03 22:58 8203把这个拿出来的目的, 是想得到更多的反馈意见, 请邮件至zho ... -
NameNode优化笔记 (二)
2011-01-13 15:03 0事情发生在11月初至11月中旬, 云梯用户不断反映作业运行得慢 ... -
NameNode优化笔记 (一)
2011-01-12 10:32 6192很久没有发博客了, 最 ... -
我在Hadoop云计算会议的演讲
2010-10-26 14:59 9092点击下载演讲稿 由中科院计算所主办的“Hadoop ... -
分布式online与offline设计 slides
2010-08-25 00:24 4201花了两个小时简单了做了一个ppt,给兄弟公司相关人员讲解off ... -
演讲: Hadoop与数据分析
2010-05-29 20:35 7446前些天受金山软件公司西山居朋友的邀请, 去了趟珠海与金山的朋友 ... -
Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的
2010-05-29 11:46 7973LineRecordReader.next(LongWri ... -
Anthill: 一种基于MapReduce的分布式DBMS
2010-05-11 22:47 3770MapReduce is a parallel computi ... -
HDFS的追加/刷新/读设计
2010-01-26 00:26 3274hdfs将在0.21版(尚未发布),把DFSOutputStr ... -
TFile, SequenceFile与gz,lzo压缩的测试
2010-01-07 22:47 5022先记一记,以后解释 :) $hadoop jar tf ... -
hive权限控制
2009-09-07 14:35 5098对hive的元数据表结构要作以下调整: hive用户不与表 ... -
avro编译
2009-07-04 00:36 3766avro是doug cutting主持的rpc ... -
Hive的一些问题
2009-06-01 16:51 3838偏激了一点. 总体来说H ... -
hive的编译模块设计
2009-05-22 15:39 3741很少在博客里写翻译的东西, 这次例外. 原文在这儿 . 译文 ... -
HIVE问答, 某天的hadoop群聊天记录
2009-05-07 17:10 10899某天晚上在hadoop群里一时兴起, 回答了一些hive相关的 ... -
暨南大学并行计算实验室MapReduce研究现状
2009-05-04 21:20 51254月份在学校花了半小时做的一个ppt, 内容是我们在应用ha ... -
hadoop上最多到底能放多少个文件?
2009-02-11 18:25 4332这主要取决于NameNode的内存。因为DFS集群运行时,文件 ... -
hadoop改进方面的胡思乱想
2009-02-04 10:57 44331. 我做数据挖掘的时候, 经常需要只对key分组,不必排序。 ... -
hadoop源码分析之MapReduce(二)
2009-01-18 22:14 8601任务的申请、派发与执行 TaskTracker.run() ... -
hadoop源码分析
2008-12-26 15:37 4941blog挺难贴图的, 我已经建了一个开源的项目, 用来存放文 ...
相关推荐
Hadoop 自定义 Partitioner 实现
在Hadoop MapReduce框架中,InputFormat是处理输入数据的核心组件。它负责将原始数据分割成逻辑上的键值对(key-value pairs),然后为每个分区分配一个或多个这些键值对给Mapper。默认情况下,Hadoop支持如...
自定义inputFormat&&outputFormat1
Hadoop 自定义 Partitioner 实现
在标题“Hadoop 自定义 Partitioner 源代码”中,我们可以理解为讨论的是如何创建和理解 Partitioner 的源代码,以便于开发者可以更好地控制 MapReduce job 中的数据分片过程。自定义 Partitioner 可能涉及到以下...
在Hadoop生态系统中,自定义类型编程是开发者经常会遇到的需求,尤其当处理的数据类型不局限于Hadoop默认支持的基本类型(如IntWritable、Text等)时。本教程将深入探讨如何在MapReduce作业中创建和使用自定义数据...
Hadoop 代码使用方式 ...hadoop jar hadoop-mapreduce-custom-inputformat-1.0-SNAPSHOT.jar org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10
《Hadoop系统搭建及项目实践》课件02Hadoop安装与配置管理.pdf《Hadoop系统搭建及项目实践》课件02Hadoop安装与配置管理.pdf《Hadoop系统搭建及项目实践》课件02Hadoop安装与配置管理.pdf《Hadoop系统搭建及项目实践...
总的来说,这个压缩包中的Hadoop实例将为初学者提供宝贵的实践经验,帮助他们理解如何在实际项目中运用Hadoop处理大数据问题。通过深入研究这些案例,开发者可以更好地掌握Hadoop的核心原理,并具备解决实际问题的...
Hive的InputFormat允许我们灵活地处理各种数据源和格式,通过自定义InputFormat,我们可以按需定制数据读取逻辑,比如这里演示的按照空格拆分日志文件。理解并掌握InputFormat对于优化Hive查询性能和处理复杂数据...
hadoop最新研究重点和进展,值得大家关注和学习
Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...
在本项目中,我们主要探讨的是如何利用SpringBoot与Hadoop进行数据操作,以及如何解决在IE浏览器中通过Servlet访问Hadoop存储的图片时出现显示源码的问题。下面将详细阐述这两个关键知识点。 首先,SpringBoot是...
在这个"**HadoopDemo-master**"项目中,你将有机会实践这些概念,通过实际操作加深理解。MapReduce的编程模型、HDFS的文件操作、Zookeeper的集群管理以及Hive的数据分析都将是你探索的重点。这不仅有助于提升你的...
7. **扩展性与插件开发**:学习如何为Hadoop开发自定义InputFormat、OutputFormat、Partitioner、Combiner等组件。 8. **实战项目**:结合实际案例,运用所学知识解决大数据处理问题,如日志分析、推荐系统等。 ...
### Hadoop的最新进展:深度解析与未来展望 Hadoop,作为大数据处理领域的核心框架,其每一次技术迭代都引领着行业的发展方向。在SACC2011(Storage and Cloud Computing Conference)上,百度的Hadoop技术领导者马...
《Hadoop系统搭建及项目实践》课件05Hadoop IO操作.pdf《Hadoop系统搭建及项目实践》课件05Hadoop IO操作.pdf《Hadoop系统搭建及项目实践》课件05Hadoop IO操作.pdf《Hadoop系统搭建及项目实践》课件05Hadoop IO操作...
Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两个组件共同构成了Hadoop分布式文件系统和数据处理模型的基础。 在分布式计算领域,Google是领先的公司之一,它的计算平台成为了后来类似开源项目...
例如,使用Hadoop的InputFormat和OutputFormat接口,开发者可以定义自定义的数据输入和输出格式。同时,Hadoop的Configuration类使得配置参数变得简单,而FileSystem API则允许开发者操作HDFS上的文件。 在实际开发...