`
liyonghui160com
  • 浏览: 774628 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

从hadoop取出文件写入hbase表中

阅读更多

 

 

package example2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

//Administrator
public class ImportFromFileExample {
	
	public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable,Text >{
		
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			context.write(new ImmutableBytesWritable(Bytes.toBytes(key.get())), value);
		}

		
	}
	
	public static class Reducer1 extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
		private byte[] family=null;
		private byte[]qualifier=null;
		

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			String column=context.getConfiguration().get("conf.column");
			byte[][]colkey=KeyValue.parseColumn(Bytes.toBytes(column));
			family=colkey[0];
			if(colkey.length>1){
				qualifier=colkey[1];
			}
		}
		
        public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
        	String valueCon=null;
        	for(Text text:values){
        		valueCon+=text.toString();
        	}
            Put put = new Put(key.get());
            put.add(family, qualifier, Bytes.toBytes(valueCon));
            context.write(key, put);
        }
    }

	/**
	 * @param args
	 */
	public static void main(String[] args)throws Exception {
		Configuration conf=HBaseConfiguration.create();
		String []argArray=new GenericOptionsParser(conf, args).getRemainingArgs();
		if(argArray.length!=1){
			System.exit(1);
		}
		conf.set("conf.column", "family1:text");
		Job job=new Job(conf,"import from hdfs to hbase");
		job.setJarByClass(ImportFromFileExample.class);
		job.setMapperClass(ImportMapper.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "testtable");
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Put.class);
		TableMapReduceUtil.initTableReducerJob("testtable", Reducer1.class, job);
		FileInputFormat.addInputPaths(job, argArray[0]);
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}

 

分享到:
评论

相关推荐

    云计算第三版精品课程配套PPT课件含习题(33页)第6章 Hadoop 2.0 大家族(四).pptx

    - Sink:从Channel取出数据并将其发送到目标存储,如HDFS、HBase等。 6.7.1 Flume简介 Flume具有灵活的架构,可以通过配置文件定义Source、Channel和Sink,支持数据的高可靠性传输。其逻辑结构包括Source读取数据,...

    flume-ng-1.6.0-cdh5.14.2.rar

    接收器则将数据从通道中取出并发送到目标位置,如 HDFS、HBase 或其他数据存储系统。 在 `flume-ng-1.6.0-cdh5.14.2` 中,你可以找到以下组件和配置: 1. **源(Sources)**:包括简单的文件读取(File Source)、...

    Flume 构建高可用、可扩展的海量日志采集系统

    4. **数据存储**:最后通过Sink(如HDFS Sink)将处理好的数据批量写入到Hadoop分布式文件系统中,供后续的数据分析和挖掘使用。 ### 总结 Apache Flume作为一款成熟稳定的日志采集工具,在大数据领域有着广泛的...

    flume-ng-1.6.0-cdh5.5.0.tar.gz

    接收器则负责将数据从通道取出并写入目标存储,如 HDFS 或 Elasticsearch。 在 Flume 的配置文件中,用户定义了这些组件以及它们之间的连接方式。例如,一个简单的配置可能包括一个从日志目录读取数据的 FileSource...

    巴豆大数据团队讲师课件Flume.pdf

    它将数据从产生点高效传输到数据存储中心,例如HDFS(Hadoop分布式文件系统)或HBase。Flume之所以重要,是因为它在大数据生态系统中承担了日志数据收集的关键角色,特别是在处理由多个服务器产生的数据时,可以迅速...

    分布式存储的元数据设计

    小文件问题可以通过缓存层来改善读取性能,但对高频写入缺乏好的解决方案。数据不一致性问题也是无中心存储设计中需要关注的,例如在覆盖key的过程中,可能只覆盖了部分副本导致读取到错误的数据。 有中心存储设计...

    日志服务器 Apache Flume.tar

    例如,你可以设置一个 Source 为 Taildir Source(监听指定目录下的新文件),Channel 为 Memory Channel(临时存储在内存中),Sink 为 HDFS Sink(将数据写入 Hadoop 文件系统)。 **Flume 在大数据运维中的应用...

    flume-ng-1.6.0-cdh5.5.2-src.tar.gz

    接收器负责将数据从通道中取出并发送到目的地,如HDFS、HBase或外部系统。 在CDH 5.5.2环境下,Flume NG 1.6.0的特性包括: 1. **改进的性能**:通过对代码的优化和对并发处理的支持,Flume NG在数据采集速度上有...

    Flume环境部署和配置详解及案例大全1

    Flume 的设计目标是实现分布式、可靠且高可用的解决方案,它允许用户自定义数据发送源(Sources),并将收集到的数据进行简单处理,然后发送到各种数据接收端,如HDFS(Hadoop Distributed File System)、HBase等。...

    Flume学习文档(1){Flume基本概念、Flume事件概念与原理}.docx

    - **Sink**:将数据从Channel中取出并写入到最终的目的地,如HDFS、HBase等。 #### 2.2 Event - **Event**:Flume数据传输的基本单元,由Header和Body两部分组成。Header用于存储Event的相关属性,采用键值对的形式...

    Flume1.5.0入门:安装、部署、及flume的案例

    4. **Sink**:Sink 从 Channel 中取出事件,进行处理或转发到其他系统,如 HDFS、HBase 或其他 Source。 5. **Channel**:Channel 作为一个中间缓存,存储事件直到它们被成功处理。常见的 Channel 类型包括 ...

    大数据技术之Flume.docx

    - **Sink**:Sink是数据输出端,负责从Channel中批量取出事件并将其写入存储系统、索引服务或转发给其他Flume Agent。Sink具有事务性,保证数据的完整传输。 - **Event**:Event是Flume数据传输的基本单位,以事件...

    Flume架构以及应用介绍

    例如,一个简单的配置可能包括一个从日志文件中读取数据的 Taildir Source,一个内存 Channel 用于临时存储数据,以及一个将数据写入 HDFS 的 HDFS Sink。 总结来说,Flume 提供了一个强大的日志采集和传输框架,...

Global site tag (gtag.js) - Google Analytics