`
bobboy007
  • 浏览: 31597 次
  • 性别: Icon_minigender_1
  • 来自: 淄博
社区版块
存档分类
最新评论

Hadoop2.5.2 从hdfs mapreduce导出数据到多个hbase表

 
阅读更多

hadoop和hbase配置好正常运行时的进程情况,jps后查看

60559 HRegionServer

7329 Main

20653 Jps

29355 HQuorumPeer

16221 ResourceManager

29417 HMaster

16538 NodeManager

15750 NameNode

15880 DataNode

16046 SecondaryNameNode

 

网上很多例子都是基于hadoop 0.9x 的,新版hadoop函数有变。

例子是从 hadoop hdfs上读取文件,map reduce后写入多个hbase 表

故重新测试例子如下环境:

hadoop 2.5.2

hbase 1.1.4

有一种场景:例如需要分析日志,统计后,存储到hbase 结果集表和索引表:

例子中没用新版hbase函数,若用新版函数请参考修改

http://bobboy007.iteye.com/admin/blogs/2289537

package jyw.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; 
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.io.Writable;
/*
 * 测试reduce写入多个表
 * */
public class HBaseMultiTableOutputReduce {

	// 实现 Map 类
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	/* 实现 Reduce 类
	 * map 的输出类型
	 * map的输出值类型
	 * reduce的输出类型
	 * reduce的输出类型
	 * 查是否有setup,clear方法,测试到 myql
 */
	public static class Reduce extends Reducer<Text, IntWritable, Writable, Put> {

		public void reduce(Text key, Iterable<IntWritable> values, Context context) {
			ImmutableBytesWritable putTable1 = new ImmutableBytesWritable(Bytes.toBytes("wordcount"));
			ImmutableBytesWritable putTable2 = new ImmutableBytesWritable(Bytes.toBytes("wordcount1"));
			int sum = 0;

			Iterator<IntWritable> iterator = values.iterator();
			while (iterator.hasNext()) {
				sum += iterator.next().get();
			}

			// Put 实例化,每个词存一行
			Put put = new Put(Bytes.toBytes(key.toString()));
			// 列族为 content,列修饰符为 count,列值为数目
			put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));

			try {
				context.write(putTable1, put);
				context.write(putTable2, put);
			} catch (Exception e) {
				e.printStackTrace();
			}
			// context.write(NullWritable.get(), put);
		}
	}

	// 创建 HBase 数据表
	public static void createHBaseTable(String tableName) throws IOException {
		// 创建表描述
		HTableDescriptor htd = new HTableDescriptor(tableName);
		// 创建列族描述
		HColumnDescriptor col = new HColumnDescriptor("content");
		htd.addFamily(col);

		// 配置 HBase
		Configuration conf = HBaseConfiguration.create();

		// conf.set("hbase.zookeeper.quorum","127.0.0.1");
		// conf.set("hbase.zookeeper.property.clientPort", "2181");
		HBaseAdmin hAdmin = new HBaseAdmin(conf);

		if (hAdmin.tableExists(tableName)) {
			System.out.println("该数据表已经存在,正在重新创建。");
			// hAdmin.disableTable(tableName);
			// hAdmin.deleteTable(tableName);
		} else {

			System.out.println("创建表:" + tableName);
			hAdmin.createTable(htd);
		}
	}

	public static void main(String[] args) throws Exception {
		String tableName1 = "wordcount";
		String tableName2 = "wordcount1";
		// 第一步:创建数据库表
		HBaseMultiTableOutputReduce.createHBaseTable(tableName1);
		HBaseMultiTableOutputReduce.createHBaseTable(tableName2);
		// 第二步:进行 MapReduce 处理
		// 配置 MapReduce
		Configuration conf = new Configuration();
		// 这几句话很关键
		// conf.set("mapred.job.tracker", "master:9001");
		// conf.set("hbase.zookeeper.quorum","master");
		// conf.set("hbase.zookeeper.property.clientPort", "2181");
		// conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

		Job job = new Job(conf, "multi output Count");
		job.setJarByClass(HBaseMultiTableOutputReduce.class);

		// 设置 Map 和 Reduce 处理类
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		// 设置输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 设置输入和输出格式
		job.setInputFormatClass(TextInputFormat.class);
		// job.setOutputFormatClass(TableOutputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);

		// 设置输入目录
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.42:9000/user/jiayongwei/input/"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}

 

分享到:
评论

相关推荐

    Hadoop介绍,HDFS和MapReduce工作原理

    Hadoop介绍,HDFS和MapReduce工作原理

    Hadoop/HDFS/MapReduce/HBase

    对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看

    HDFS 通过mapreduce 进行 HBase 导入导出

    标题 "HDFS 通过 mapreduce 进行 HBase 导入导出" 涉及的是大数据处理领域中的两个重要组件——Hadoop Distributed File System (HDFS) 和 HBase,以及它们之间的数据交互。HDFS 是 Hadoop 的分布式文件系统,而 ...

    hbase和hadoop数据块损坏处理

    * hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'snap_test' -copyto /data/huang_test:将快照导出到 HDFS * clone_snapshot 'snap_test', 'test':将快照恢复到 HBase 表中 五、手动修复 ...

    eclipse开发hadoop2.5.2所用到都jar

    总结来说,"eclipse开发hadoop2.5.2所用到的jar"指的是在Eclipse中开发Hadoop应用时,需要导入的一系列JAR文件,包括Hadoop的核心组件、依赖库和其他辅助工具,以支持Hadoop MapReduce的编程和调试。这些JAR文件确保...

    详解Hadoop核心架构HDFS+MapReduce+Hbase+Hive

    通过这一阶段的调研总结,从内部机理的角度详细分析,HDFS、MapReduce、Hbase、Hive是如何运行,以及基于Hadoop数据仓库的构建和分布式数据库内部具体实现。如有不足,后续及时修改。整个Hadoop的体系结构主要是通过...

    Hadoop中的HDFS和Mapreduce

    ### Hadoop中的HDFS和MapReduce #### Hadoop核心组件:HDFS与MapReduce **Hadoop** 是一个能够处理海量数据的开源软件框架,它最初由Apache开发,旨在为大规模数据提供分布式处理能力。Hadoop的核心组件包括**HDFS...

    hadoop-2.5.2.tar.gz

    Hadoop生态系统还包括HBase、Hive、Pig、Oozie等工具,它们分别提供了NoSQL数据库服务、SQL查询接口、数据流处理和工作流调度等功能,共同构建了一个强大的大数据处理平台。 总结来说,“hadoop-2.5.2.tar.gz”是...

    《hadoop-HDFS+MapReduce+Hive+Hbase快速入门》

    《hadoop-HDFS+MapReduce+Hive+Hbase快速入门》,一门入门hadoop的经典书籍,相信能够给学习云计算的大家带来帮助。

    hadoop 2.5.2 源码

    Hadoop 2.5.2是Hadoop发展过程中的一个重要版本,它引入了许多改进和优化,旨在提高系统的稳定性和性能。在这个版本中,我们能够深入理解Hadoop的核心组件、工作原理以及如何进行定制和优化。 1. Hadoop的核心组件 ...

    hadoop2.5.2window下eclipse环境搭建

    ### hadoop2.5.2在Windows下的Eclipse环境搭建详解 #### 一、Hadoop简介 Hadoop是由Apache基金会所开发的一个开源分布式计算框架,主要用于处理和存储大规模数据集。它通过分布式文件系统(HDFS)和MapReduce编程...

    [案例]从冷备份的hdfs数据中恢复到原来的hbase表

    ### 从冷备份的HDFS数据中恢复到原来的HBase表 #### 概述 本文档详细介绍了一种从HDFS中的冷备份数据恢复至HBase表的方法。此过程适用于使用了HBase 1.1.x 和 Hadoop 2.7.x版本的环境。通过以下步骤,可以有效地将...

    hadoop基础,hdfs,hive,mapreduce,hbase

    hadoop基础,hdfs,hive,mapreduce,hbase

    Hadoop原理与技术MapReduce实验

    (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 (6)网页上查看MapReduce任务 2.矩阵相乘实验(matrix) (1)写matrix代码并把代码生成jar包 (2)运行命令 (1):把linus下的文件放...

    hadoop的mapreduce把oracle/mysq导入到hbase和hdfs中的程序

    总结来说,这个程序集是关于如何利用Hadoop MapReduce从传统的关系型数据库(Oracle和MySQL)中提取数据,并将其有效地导入到Hadoop的分布式存储组件(HDFS和HBase)中,以实现大数据的存储和处理。这涉及到了数据...

    Hadoop中HDFS和MapReduce框架介绍pdf

    HDFS的一个显著特点是数据复制,通过复制数据块到多个节点,提高数据的容错性。默认情况下,每个数据块有三个副本,这样即使有两个节点故障,数据仍可从第三个节点恢复。 MapReduce是Hadoop的另一个核心组件,它是...

    hdfs2-7_3+hbase1_2_5 HA withQJM环境搭建

    本文将深入探讨如何构建一个基于HDFS 2.7.3和HBase 1.2.5的HA环境,并使用Quorum Journal Manager (QJM)来确保数据的可靠性。QJM是一种在Hadoop中实现NameNode HA的关键组件,它通过协调JournalNodes来持久化HDFS的...

Global site tag (gtag.js) - Google Analytics