1. 上传数据到hdfs中
2. 写Map\Reduce过程
3. 输出结果到hbase中
Tips:
1. 因为map是从hdfs中取数据,因此没有太大变化;而reduce需要输出结果到hbase中,所以这里继承了 TableReduce<keyin,valuein,keyout>,这里没有valueout,但是规定TableReduce的 valueout必须是Put或者Delete实例。
2. 已经确定了输入输出路径,所以不用在eclipse中配置Run Configuration了。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class WordCountHBase { /** * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // TODO Auto-generated method stub String tablename = "wordcount"; Configuration conf = new Configuration(); conf.set(TableOutputFormat.OUTPUT_TABLE, tablename); createHBaseTable(tablename); Job job = new Job(conf,"WordCount table"); job.setJarByClass(WordCountHBase.class); job.setNumReduceTasks(3); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TableOutputFormat.class); // 设置输入目录 FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/input/wordcount/*")); System.exit(job.waitForCompletion(true)?0:1); } public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text text = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{ String s = value.toString(); StringTokenizer st = new StringTokenizer(s); while(st.hasMoreTokens()) { text.set(st.nextToken()); context.write(text, one); } } } public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{ int sum = 0; for(IntWritable i:values) { sum+=i.get(); } Put put = new Put(Bytes.toBytes(key.toString())); // row,columnFamily:column,value = word,content:count,sum put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(String.valueOf(sum))); context.write(NullWritable.get(), put); } } /** * create a table * @param tablename * @throws IOException */ public static void createHBaseTable(String tablename) throws IOException { HTableDescriptor htd = new HTableDescriptor(tablename); HColumnDescriptor col = new HColumnDescriptor("content"); htd.addFamily(col); Configuration cfg = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(cfg); if(admin.tableExists(tablename)) { System.out.println("table exists,trying recreate table!"); admin.disableTable(tablename); admin.deleteTable(tablename); admin.createTable(htd); } else { System.out.println("create new table:"+tablename); admin.createTable(htd); } } }
相关推荐
- `hadoop fs -put <localfile> <hdfsdir>`:将本地文件上传到HDFS。 - `hadoop fs -ls <path>`:列出HDFS路径下的文件和目录列表。 #### HDFS Java API 除了命令行工具之外,HDFS还提供了Java API,允许开发者...
【标题】:“最高气温 map reduce hadoop 实例” 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它专为分布式存储和处理大量数据而设计。本实例将介绍如何使用Hadoop MapReduce解决一个实际问题——找出给定...
**逐步详解**:从输入文件的读取到最终输出结果的生成,Map-Reduce框架通过Map和Reduce两个阶段实现了数据的高效处理,展现了其强大的数据并行处理能力。 #### Map-Reduce 用户界面 **负载**:用户可以通过配置...
- 数据分区:中间结果根据键进行分区,确保相同键的值被发送到同一个 Reduce 任务。 4. **Reduce 阶段**: - 合并操作:Reduce 函数接收来自不同 Map 任务的具有相同键的值,并执行合并操作。 - 输出结果:最终...
在本例中,需要将`student.txt`和`student_score.txt`这两个文件上传到HDFS中。这通常可以通过Hadoop命令行工具`hadoop fs -put`实现。例如: ``` hadoop fs -put student.txt /path/to/directory/ hadoop fs -put ...
- **Reduce 阶段**:在 Map 阶段之后,所有由 Map 函数产生的中间结果会被按照键进行排序和分组,然后交给 Reduce 函数进行汇总处理。Reduce 函数会对相同键的键值对集合执行聚合操作,产生最终的输出结果。 #### ...
3. **Shuffle and Sort**: Map 任务完成后,系统将键值对按照键进行排序,并将其分发到对应的 Reduce 任务中。 4. **Reducer**: 每个 Reduce 任务接收来自多个 Map 任务的键值对,并进一步处理这些键值对,生成最终...
### Map-Reduce 实现细节与问题解决 #### 客户端操作流程 Map-Reduce 的启动过程始于客户端向系统提交任务。此过程的核心是通过 `JobClient` 类的 `runJob` 静态方法来实现。具体步骤如下: 1. **JobClient 对象...
3. Shuffle阶段:Hadoop框架将所有Map任务输出的中间键值对进行排序和分组,以便所有具有相同键的值被发送到同一个Reduce任务。 4. Reduce阶段:Reduce任务对具有相同键的中间值进行归约操作,最终输出结果数据。 5....
2. **数据预处理**:使用Map/Reduce框架对收集到的数据进行清洗、去重等预处理操作。 3. **索引构建**:构建倒排索引,以便快速定位包含特定关键词的文档。 4. **查询处理**:用户发起查询请求后,搜索引擎会根据倒...
在Windows平台上进行Hadoop的Map/Reduce开发可能会比在Linux环境下多一些挑战,但通过详细的步骤和理解Map/Reduce的工作机制,开发者可以有效地克服这些困难。以下是对标题和描述中涉及知识点的详细说明: **Hadoop...
MapReduce的工作机制分为两个主要阶段:Map阶段和Reduce阶段,这两个阶段之间通过中间结果 Shuffle 和 Sort 阶段连接。 1. Map阶段:在这个阶段,原始输入数据被分割成多个小块(Split),每个Split会被分配到集群...
HDFS将大文件分割成多个块,并将这些块复制到多台机器上,以提高数据可用性和容错性。 2. **MapReduce**:MapReduce是Hadoop用于并行处理和分析存储在HDFS中的大型数据集的计算框架。它包含两个主要阶段:Map阶段和...
MapReduce是Google提出的一种分布式计算模型,它将复杂的并行计算任务分解为两个主要阶段:Map(映射)和Reduce(规约)。Map阶段负责将原始输入数据分割成多个键值对,并分别处理;Reduce阶段则负责对Map阶段产生的...
- **Reduce阶段**:Reduce 函数将接收到的所有键值对写入到 HDFS 上的新文件中,形成一个或多个输出文件。 在实际操作中,我们还需要考虑数据处理的性能、容错性和数据一致性。例如,可以使用 HBase 的 ...
《通过DataX同步Oracle到HDFS的详细指南》 在大数据处理中,数据的迁移和同步是不可或缺的一环。DataX作为一个高效、稳定且易用的数据同步工具,被广泛应用于不同数据源之间的数据流动,例如从关系型数据库Oracle到...
MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce(化简),使得海量数据能够在多台计算机上并行处理,极大地提高了数据处理效率。 Map阶段是数据处理的...
2. 下载 HDFS 文件:使用 FileSystem.copyToLocalFile() 方法将 HDFS 文件下载到本地文件系统。 3. 创建 HDFS 目录:使用 FileSystem.mkdirs() 方法创建一个新的 HDFS 目录。 4. 删除 HDFS 文件或目录:使用 ...
Map阶段将原始数据切分成多个小块,分别进行处理,而Reduce阶段则负责汇总各个Map阶段的结果,最终得到我们需要的输出。 **Hadoop** 是一个开源的分布式计算框架,它实现了MapReduce模型,并提供了高容错性和可扩展...