一、来自 hadoop in action 上的实例,我在这里做了一个总结。文件内容如下:
17:16:20 http://blackproof.iteye.com/blog/1806263 17:16:21 http://blackproof.iteye.com/blog/1806264 17:16:56 http://blackproof.iteye.com/blog/1806265 17:16:30 http://blackproof.iteye.com/blog/1806266 17:16:45 http://blackproof.iteye.com/blog/1806267 17:16:23 http://blackproof.iteye.com/blog/1806268
需求是:把后面的URLString 封装成 URL类型。代码如下:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import org.apache.hadoop.io.Writable; public class URLWritable implements Writable { protected URL url; public URLWritable() { } public URLWritable(URL url) { this.url = url; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(url.toString()); } @Override public void readFields(DataInput in) throws IOException { this.url = new URL(in.readUTF()); } public void set(String string) { try { this.url = new URL(string); } catch (MalformedURLException e) { throw new RuntimeException("Should not have happened " + e.toString()); } } }
import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class TimeUrlLineRecordReader extends RecordReader<Text, URLWritable> { public static final String Time_URL_SEPERATOR = "mapreduce.input.keyvaluelinerecordreader.key.value.separator"; private final LineRecordReader lineRecordReader; private byte separator = (byte) '\t'; private Text innerValue; private Text key; private URLWritable value; public static int findSeparator(byte[] utf, int start, int length, byte sep) { for (int i = start; i < (start + length); i++) { if (utf[i] == sep) { return i; } } return -1; } public static void setKeyValue(Text key, URLWritable value, byte[] line, int lineLen, int pos) { if (pos == -1) { key.set(line, 0, lineLen); value.set(StringUtils.EMPTY); } else { key.set(line, 0, pos); String url = null; System.arraycopy(line, pos + 1,url , 0, lineLen - pos - 1); value.set(url); } } public TimeUrlLineRecordReader(Configuration conf) throws IOException { lineRecordReader = new LineRecordReader(); String sepStr = conf.get(Time_URL_SEPERATOR, "\t"); this.separator = (byte) sepStr.charAt(0); } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { lineRecordReader.initialize(split, context); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { byte[] line = null; int lineLen = -1; if (lineRecordReader.nextKeyValue()) { innerValue = lineRecordReader.getCurrentValue(); line = innerValue.getBytes(); lineLen = innerValue.getLength(); } else { return false; } if (line == null) { return false; } if (key == null) { key = new Text(); } if (value == null) { value = new URLWritable(); } int pos = findSeparator(line, 0, lineLen, this.separator); setKeyValue(key, value, line, lineLen, pos); return true; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } @Override public URLWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return lineRecordReader.getProgress(); } @Override public void close() throws IOException { lineRecordReader.close(); } }
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable>{ @Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = new CompressionCodecFactory( context.getConfiguration()).getCodec(file); return codec == null; } @Override public RecordReader<Text, URLWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { context.setStatus(split.toString()); return new TimeUrlLineRecordReader(context.getConfiguration()); } }
import java.io.IOException; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class CustomTimeUrl extends Configured implements Tool { public static class CustomTimeUrlMapper extends Mapper<Text, URLWritable, Text, URLWritable> { @Override protected void map(Text key, URLWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); } } public static class CustomTimeUrlReducer extends Reducer<Text, URLWritable, Text, URLWritable> { @Override protected void reduce(Text key, Iterable<URLWritable> values,Context context)throws IOException, InterruptedException { for (URLWritable value : values) { context.write(key, value); } } } @Override public int run(String[] args) throws Exception { Job job = new Job(getConf()); job.setJarByClass(getClass()); job.setJobName("CustomTimeUrl"); job.setInputFormatClass(TimeUrlTextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(URLWritable.class); job.setMapperClass(CustomTimeUrlMapper.class); job.setReducerClass(CustomTimeUrlReducer.class); FileInputFormat.setInputPaths(job, new Path("/timeurl/input/")); FileOutputFormat.setOutputPath(job, new Path("/timeurl/output")); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } public static void main(String[] args) throws Exception { int result = ToolRunner.run(new TimeUrl(), args); System.exit(result); } }
相关推荐
DAPlant可能是本文压缩包中的一个数据分析项目实例,它可能包含了实现上述功能的源代码和文档,帮助读者更好地理解和实践基于Hadoop的数据分析系统。通过研究这个项目,我们可以学习到如何在实际环境中应用Hadoop...
通过这个实验,学生将深入理解Hadoop MapReduce的工作原理,掌握如何处理自定义数据类型,使用Combiner优化性能,以及如何通过Eclipse提交和管理MapReduce任务。这些都是大数据处理和分布式计算中的核心技能,对于...
1.5.1 数据类型 1.5.2 操作和函数 1.6 表 1.6.1 托管表(Managed Tables)和外部表(External Tables) 1.6.2 分区(Partitions)和桶(Buckets) 1.6.3 存储格式 1.6.4 导入数据 1.6.5 表的修改 1.6.6 表...
ong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow);...在统计手机号码的流量数据案例中,`FlowBean`就是这样一个关键的自定义数据类型,使得我们可以高效地处理和分析用户流量信息。
在《实战Hadoop》的源代码中,你可以找到关于Hadoop配置、集群搭建、数据输入输出、MapReduce编程模型等多方面的实例。例如,可能会包含以下内容: 1. **Hadoop环境搭建**:源代码可能包含如何在本地或虚拟机上安装...
1.5.1 数据类型 1.5.2 操作和函数 1.6 表 1.6.1 托管表(Managed Tables)和外部表(External Tables) 1.6.2 分区(Partitions)和桶(Buckets) 1.6.3 存储格式 1.6.4 导入数据 1.6.5 表的修改 1.6.6 表的丢弃 1.7...
7. **数据输入与输出**:理解Hadoop如何处理不同类型的数据源,如文本文件、CSV、JSON等,并学习使用InputFormat和OutputFormat自定义数据格式。 8. **Hadoop应用实例**:通过具体的案例,如网页日志分析、推荐系统...
用户可以通过自定义InputFormat和OutputFormat来处理特定类型的数据。 8. **故障恢复与容错机制**:Hadoop具有内置的故障检测和恢复机制,如心跳检测、数据块的冗余复制等,以确保系统的稳定运行。如果DataNode或...
总结来说,Hadoop的序列化机制主要基于`Writable`接口,通过自定义的序列化和反序列化方法实现数据的转换。`ObjectWritable`作为通用的载体,适应了RPC通信中不同类型的对象传输。`WritableFactories`则是保证`...
这本书的示例代码是理解和学习Hadoop的重要资源,它提供了实践操作的实例,帮助读者掌握Hadoop的使用和开发技巧。 在Hadoop中,主要涉及以下几个关键知识点: 1. **Hadoop分布式文件系统(HDFS)**:HDFS是Hadoop的...
- **新API**:支持更广泛的数据类型,包括自定义类型。 #### 三、实例分析:使用Hadoop旧API统计文件行数 以下是一个使用Hadoop旧API统计文件行数的例子: ```java package bookCount; import java.io....
3. **数据输入与输出**:探讨InputFormat和OutputFormat接口,理解如何自定义输入输出格式以适应不同类型的数据源。 4. **错误处理与容错机制**:讲解Hadoop的检查点、重试和故障恢复策略,以确保任务的可靠性。 5...
5. **Block Size调整**:Hadoop 2.x允许用户自定义Block大小,以适应不同规模的数据处理需求。 总之,Hadoop 2.x是一个强大且灵活的大数据处理平台,它的改进使得在处理大规模数据时更为高效,同时也为各种大数据...
例如,结合AI和机器学习技术来处理更加复杂的数据类型和应用场景,以及与云计算平台的深度融合等。 通过上述对《Hadoop权威指南》第三版的深入解析,我们可以看到Hadoop作为一个成熟的大数据处理平台,在数据存储、...
Hadoop是一个开源的分布式计算框架,它能够有效地处理和分析大数据。Hadoop系列教程详细讲解了Hadoop的安装和实例应用,...通过Hadoop教程的学习,用户可以掌握如何处理大规模数据集,并从大规模数据集中获得洞察力。
* Reducer的输入数据类型对应Mapper的输出数据类型,也是KV。 * Reducer的业务逻辑写在reduce()方法中。 * Reducetask进程对每一组相同k的,v>组调用一次reduce()方法。 3. Driver阶段: * 整个程序需要一个...
- **实例**:提供了几个使用Pig处理数据的实例。 - **与数据库比较**:讨论了Pig与传统数据库管理系统之间的异同。 - **PigLatin**:讲解了PigLatin脚本语言的特点和用法。 - **用户定义函数**:介绍了如何在Pig中...
- **自定义类型**:如何创建和使用自定义数据类型来更好地处理特定的数据。 - **Combiner类**:一种特殊的Reducer,可以在Map端进行局部聚合,减少网络传输开销。 #### 知识点六:Hadoop编程实践 - **最佳实践**:...
6. **Hadoop数据输入与输出**:学习如何使用Hadoop的InputFormat和OutputFormat接口自定义数据格式,以及如何处理各种类型的数据源,如文本、CSV或自定义二进制格式。 7. **Hadoop实战案例**:通过实际案例,比如...