package cn.luxh.app; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; /** * @author Luxh * */ public class WordStat { /** * TableMapper<Text,IntWritable> Text:输出的key类型,IntWritable:输出的value类型 */ public static class MyMapper extends TableMapper<Text,IntWritable>{ private static IntWritable one = new IntWritable(1); private static Text word = new Text(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //表里面只有一个列族,所以我就直接获取每一行的值 String words = Bytes.toString(value.list().get(0).getValue()); StringTokenizer st = new StringTokenizer(words); while (st.hasMoreTokens()) { String s = st.nextToken(); word.set(s); context.write(word, one); } } } /** * TableReducer<Text,IntWritable> Text:输入的key类型,IntWritable:输入的value类型,ImmutableBytesWritable:输出类型 */ public static class MyReducer extends TableReducer<Text,IntWritable,ImmutableBytesWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val:values) { sum+=val.get(); } //添加一行记录,每一个单词作为行键 Put put = new Put(Bytes.toBytes(key.toString())); //在列族result中添加一个标识符num,赋值为每个单词出现的次数 //String.valueOf(sum)先将数字转化为字符串,否则存到数据库后会变成\x00\x00\x00\x这种形式 //然后再转二进制存到hbase。 put.add(Bytes.toBytes("result"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(sum))); context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf,"wordstat"); job.setJarByClass(Blog.class); Scan scan = new Scan(); //指定要查询的列族 scan.addColumn(Bytes.toBytes("content"),null); //指定Mapper读取的表为word TableMapReduceUtil.initTableMapperJob("word", scan, MyMapper.class, Text.class, IntWritable.class, job); //指定Reducer写入的表为stat TableMapReduceUtil.initTableReducerJob("stat", MyReducer.class, job); System.exit(job.waitForCompletion(true)?0:1); } }
相关推荐
MapReduce是一种分布式计算模型,常用于处理大规模数据集,而HBase是一个分布式、列式存储的NoSQL数据库,适合处理大规模结构化数据。通过MapReduce,我们可以高效地在不同HBase表之间移动大量数据。 首先,我们...
13. 使用 MapReduce 实现 Join 操作:使用 MapReduce 来实现数据的 Join 操作,以便将多个数据源合并成一个结果。 14. 使用 MapReduce 实现排序:使用 MapReduce 来实现数据的排序,以便对数据进行排序处理。 15. ...
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。它将复杂的并行计算任务分解为两个主要阶段:Map(映射)和Reduce(化简),使得大规模数据处理变得简单易懂。C语言实现...
4. **输出(Output)**:完成化简操作后,结果会被写入到HDFS(Hadoop Distributed File System)中,作为新的数据源供后续任务使用。 本书涵盖了多个关键知识点,包括但不限于: 1. **Hadoop环境搭建**:讲解如何...
在压缩包中的代码文件,可能是各个章节的示例程序,涵盖了Hadoop的安装配置、数据输入输出、MapReduce编程模型、Hadoop流(Hadoop Streaming)、Hive数据仓库、Pig脚本语言、HBase分布式数据库、Oozie工作流管理、...
这个教程涵盖了多个关键领域,包括HBase数据库、MapReduce编程模型、多语言MapReduce实现、Chukwa监控系统、Greenplum架构以及Flume日志收集系统。下面,我们将详细探讨这些知识点。 首先,HBase是一个分布式、面向...
5. **数据输入与输出**:书中详细讲解了Hadoop如何处理各种类型的数据源,如文本文件、Avro数据、Parquet列式存储格式,以及如何通过不同的OutputFormat输出结果。 6. **MapReduce编程**:通过实例展示了如何编写...
第四章《MapReduce入门编程》深入MapReduce编程模型,解析Map和Reduce阶段的工作原理。这一章通过实例演示如何编写Java MapReduce程序,讲解输入输出格式、分区器、Combiner等相关概念,帮助理解MapReduce的执行流程...
3. **MapReduce编程模型**:MapReduce是Hadoop处理大规模数据的核心计算模型,由“Map”和“Reduce”两个阶段组成。Map将输入数据分割并转换,Reduce则对Map的输出进行聚合和总结。 4. **YARN**:YARN是Hadoop的...
3. **Chap 3 - MapReduce编程**:涵盖了MapReduce的编程模型,包括Mapper和Reducer类的实现,以及Combiner和Partitioner的作用。Combiner可以局部聚合数据,减少网络传输;Partitioner则决定数据在Reducer间的分布。...
MapReduce则是一种编程模型,用于大规模数据集的并行计算。 在Hadoop源代码中,我们可以深入理解其内部工作原理,包括数据块的分发、容错机制、任务调度和数据本地化等关键功能。以下是Hadoop源代码中的一些关键...
总的来说,Java-org.apache.hadoop涉及到的知识点广泛且深入,包括分布式系统基础、HDFS的架构和操作、MapReduce编程模型、集群管理和资源调度等。掌握这些知识对于开发分布式应用、大数据处理和分析至关重要。通过...
MapReduce则是一种编程模型,用于大规模数据集的并行计算。 二、Hadoop的应用范围 Hadoop广泛应用于数据挖掘、日志分析、互联网搜索、推荐系统等领域。通过其分布式处理能力,可以处理PB级别的数据,帮助企业快速...
6. **Hadoop数据输入与输出**:学习如何使用Hadoop的InputFormat和OutputFormat接口自定义数据格式,以及如何处理各种类型的数据源,如文本、CSV或自定义二进制格式。 7. **Hadoop实战案例**:通过实际案例,比如...
源代码中,`MapTask`和`ReduceTask`分别对应这两个阶段,`InputFormat`和`OutputFormat`定义了数据的输入和输出格式。 3. **YARN**:作为Hadoop的资源管理系统,YARN负责调度计算资源,确保任务高效执行。`...
- 数据输入输出:掌握多种数据源(如CSV、JSON、XML)的读取和写入,以及使用InputFormat和OutputFormat接口定制数据处理。 3. 进阶技术: - Pig和Hive:学习使用Pig Latin和HQL(Hive查询语言)进行数据处理,...
- **数据输入与输出**:掌握多种数据源接入Hadoop的方式,如CSV、JSON等,并熟悉数据导出到其他系统(如数据库或数据分析工具)的方法。 - **故障恢复与容错机制**:了解Hadoop如何处理节点故障,理解NameNode、...
在"Pig编程指南"中,会介绍如何使用`LOAD`命令从各种数据源加载数据,如HDFS、Cassandra或Amazon S3。同时,`STORE`命令用于将处理后的数据保存回存储系统。源码中的示例可能包括不同格式的数据输入输出,如CSV、...
3. **MapReduce编程模型** - **Map阶段**: 输入数据被分割成多个块,每个块在节点上并行执行map函数。 - **Shuffle与Sort**: map输出的数据经过排序和分区后,传递给reduce任务。 - **Reduce阶段**: 将接收到的...
5. **数据输入与输出**:了解多种数据源的接入方式,如SequenceFile、TextFile等,以及如何通过InputFormat和OutputFormat自定义处理。 6. **Hadoop的监控与调试**:学习如何使用Nagios、Ganglia等工具监控集群状态...