`
liyonghui160com
  • 浏览: 774677 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hdfs Map\Reduce到haase

阅读更多

 

1. 上传数据到hdfs中

2. 写Map\Reduce过程

3. 输出结果到hbase中

Tips:

1. 因为map是从hdfs中取数据,因此没有太大变化;而reduce需要输出结果到hbase中,所以这里继承了 TableReduce<keyin,valuein,keyout>,这里没有valueout,但是规定TableReduce的 valueout必须是Put或者Delete实例。

 

2. 已经确定了输入输出路径,所以不用在eclipse中配置Run Configuration了。

 

    import java.io.IOException;  
    import java.util.StringTokenizer;  
      
    import org.apache.hadoop.conf.Configuration;  
    import org.apache.hadoop.fs.Path;  
    import org.apache.hadoop.hbase.HBaseConfiguration;  
    import org.apache.hadoop.hbase.HColumnDescriptor;  
    import org.apache.hadoop.hbase.HTableDescriptor;  
    import org.apache.hadoop.hbase.client.HBaseAdmin;  
    import org.apache.hadoop.hbase.client.Put;  
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;  
    import org.apache.hadoop.hbase.mapreduce.TableReducer;  
    import org.apache.hadoop.hbase.util.Bytes;  
    import org.apache.hadoop.io.IntWritable;  
    import org.apache.hadoop.io.LongWritable;  
    import org.apache.hadoop.io.NullWritable;  
    import org.apache.hadoop.io.Text;  
    import org.apache.hadoop.mapreduce.Job;  
    import org.apache.hadoop.mapreduce.Mapper;  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
      
    public class WordCountHBase {  
      
        /** 
         * @param args 
         * @throws IOException  
         * @throws ClassNotFoundException  
         * @throws InterruptedException  
         */  
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {  
            // TODO Auto-generated method stub  
            String tablename = "wordcount";  
            Configuration conf = new Configuration();  
            conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);  
            createHBaseTable(tablename);  
            Job job = new Job(conf,"WordCount table");  
            job.setJarByClass(WordCountHBase.class);  
            job.setNumReduceTasks(3);  
            job.setMapperClass(Map.class);  
            job.setReducerClass(Reduce.class);  
            job.setMapOutputKeyClass(Text.class);  
            job.setMapOutputValueClass(IntWritable.class);  
            job.setInputFormatClass(TextInputFormat.class);  
            job.setOutputFormatClass(TableOutputFormat.class);  
            // 设置输入目录   
            FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/input/wordcount/*"));   
            System.exit(job.waitForCompletion(true)?0:1);  
        }  
      
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {  
            private final static IntWritable one = new IntWritable(1);  
            private Text text = new Text();   
              
            @Override  
            public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{  
                String s = value.toString();  
                StringTokenizer st = new StringTokenizer(s);  
                while(st.hasMoreTokens()) {  
                    text.set(st.nextToken());  
                    context.write(text, one);  
                }  
            }  
        }  
          
        public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {  
              
            @Override  
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{  
                int sum = 0;  
                for(IntWritable i:values) {  
                    sum+=i.get();  
                }  
                Put put = new Put(Bytes.toBytes(key.toString()));  
                // row,columnFamily:column,value = word,content:count,sum   
                put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(String.valueOf(sum)));  
                context.write(NullWritable.get(), put);  
            }  
        }  
        /**  
         * create a table  
         * @param tablename  
         * @throws IOException  
         */  
        public static void createHBaseTable(String tablename) throws IOException {  
            HTableDescriptor htd = new HTableDescriptor(tablename);  
            HColumnDescriptor col = new HColumnDescriptor("content");  
            htd.addFamily(col);  
            Configuration cfg = HBaseConfiguration.create();  
            HBaseAdmin admin = new HBaseAdmin(cfg);  
            if(admin.tableExists(tablename)) {  
                System.out.println("table exists,trying recreate table!");  
                admin.disableTable(tablename);  
                admin.deleteTable(tablename);  
                admin.createTable(htd);  
            }  
            else {  
                System.out.println("create new table:"+tablename);  
                admin.createTable(htd);  
            }  
        }  
    }  

 

 

 

分享到:
评论

相关推荐

    hadoop,map,reduce,hdfs

    - `hadoop fs -put &lt;localfile&gt; &lt;hdfsdir&gt;`:将本地文件上传到HDFS。 - `hadoop fs -ls &lt;path&gt;`:列出HDFS路径下的文件和目录列表。 #### HDFS Java API 除了命令行工具之外,HDFS还提供了Java API,允许开发者...

    最高气温 map reduce hadoop 实例

    【标题】:“最高气温 map reduce hadoop 实例” 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它专为分布式存储和处理大量数据而设计。本实例将介绍如何使用Hadoop MapReduce解决一个实际问题——找出给定...

    hadoop map-reduce turorial

    **逐步详解**:从输入文件的读取到最终输出结果的生成,Map-Reduce框架通过Map和Reduce两个阶段实现了数据的高效处理,展现了其强大的数据并行处理能力。 #### Map-Reduce 用户界面 **负载**:用户可以通过配置...

    Hadoop Map Reduce教程

    - 数据分区:中间结果根据键进行分区,确保相同键的值被发送到同一个 Reduce 任务。 4. **Reduce 阶段**: - 合并操作:Reduce 函数接收来自不同 Map 任务的具有相同键的值,并执行合并操作。 - 输出结果:最终...

    现有student.txt和student-score.txt 将两个文件上传到hdfs上 使用Map/Reduce框架完成下面

    在本例中,需要将`student.txt`和`student_score.txt`这两个文件上传到HDFS中。这通常可以通过Hadoop命令行工具`hadoop fs -put`实现。例如: ``` hadoop fs -put student.txt /path/to/directory/ hadoop fs -put ...

    hadoop map reduce 中文教程

    - **Reduce 阶段**:在 Map 阶段之后,所有由 Map 函数产生的中间结果会被按照键进行排序和分组,然后交给 Reduce 函数进行汇总处理。Reduce 函数会对相同键的键值对集合执行聚合操作,产生最终的输出结果。 #### ...

    Hadoop Map-Reduce教程

    3. **Shuffle and Sort**: Map 任务完成后,系统将键值对按照键进行排序,并将其分发到对应的 Reduce 任务中。 4. **Reducer**: 每个 Reduce 任务接收来自多个 Map 任务的键值对,并进一步处理这些键值对,生成最终...

    map-reduce详细

    ### Map-Reduce 实现细节与问题解决 #### 客户端操作流程 Map-Reduce 的启动过程始于客户端向系统提交任务。此过程的核心是通过 `JobClient` 类的 `runJob` 静态方法来实现。具体步骤如下: 1. **JobClient 对象...

    第02节:hadoop精讲之map reduce原理及代码.pdf

    3. Shuffle阶段:Hadoop框架将所有Map任务输出的中间键值对进行排序和分组,以便所有具有相同键的值被发送到同一个Reduce任务。 4. Reduce阶段:Reduce任务对具有相同键的中间值进行归约操作,最终输出结果数据。 5....

    基于Map/Reduce的分布式搜索引擎研究

    2. **数据预处理**:使用Map/Reduce框架对收集到的数据进行清洗、去重等预处理操作。 3. **索引构建**:构建倒排索引,以便快速定位包含特定关键词的文档。 4. **查询处理**:用户发起查询请求后,搜索引擎会根据倒...

    Windows平台下Hadoop的Map/Reduce开发

    在Windows平台上进行Hadoop的Map/Reduce开发可能会比在Linux环境下多一些挑战,但通过详细的步骤和理解Map/Reduce的工作机制,开发者可以有效地克服这些困难。以下是对标题和描述中涉及知识点的详细说明: **Hadoop...

    第02节:hadoop精讲之map reduce原理及代码.rar

    MapReduce的工作机制分为两个主要阶段:Map阶段和Reduce阶段,这两个阶段之间通过中间结果 Shuffle 和 Sort 阶段连接。 1. Map阶段:在这个阶段,原始输入数据被分割成多个小块(Split),每个Split会被分配到集群...

    MR处理HDFS日志样例

    HDFS将大文件分割成多个块,并将这些块复制到多台机器上,以提高数据可用性和容错性。 2. **MapReduce**:MapReduce是Hadoop用于并行处理和分析存储在HDFS中的大型数据集的计算框架。它包含两个主要阶段:Map阶段和...

    基于Hadoop-Map Reduce的算法.zip

    MapReduce是Google提出的一种分布式计算模型,它将复杂的并行计算任务分解为两个主要阶段:Map(映射)和Reduce(规约)。Map阶段负责将原始输入数据分割成多个键值对,并分别处理;Reduce阶段则负责对Map阶段产生的...

    HDFS 通过mapreduce 进行 HBase 导入导出

    - **Reduce阶段**:Reduce 函数将接收到的所有键值对写入到 HDFS 上的新文件中,形成一个或多个输出文件。 在实际操作中,我们还需要考虑数据处理的性能、容错性和数据一致性。例如,可以使用 HBase 的 ...

    3、通过datax同步oracle相关-oracle到hdfs

    《通过DataX同步Oracle到HDFS的详细指南》 在大数据处理中,数据的迁移和同步是不可或缺的一环。DataX作为一个高效、稳定且易用的数据同步工具,被广泛应用于不同数据源之间的数据流动,例如从关系型数据库Oracle到...

    hadoop中map/reduce

    MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce(化简),使得海量数据能够在多台计算机上并行处理,极大地提高了数据处理效率。 Map阶段是数据处理的...

    HDFS文件系统基本文件命令、编程读写HDFS

    2. 下载 HDFS 文件:使用 FileSystem.copyToLocalFile() 方法将 HDFS 文件下载到本地文件系统。 3. 创建 HDFS 目录:使用 FileSystem.mkdirs() 方法创建一个新的 HDFS 目录。 4. 删除 HDFS 文件或目录:使用 ...

    map-reduce实现分布式爬虫

    Map阶段将原始数据切分成多个小块,分别进行处理,而Reduce阶段则负责汇总各个Map阶段的结果,最终得到我们需要的输出。 **Hadoop** 是一个开源的分布式计算框架,它实现了MapReduce模型,并提供了高容错性和可扩展...

Global site tag (gtag.js) - Google Analytics