package example2; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; //Administrator public class ImportFromFileExample { public static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable,Text >{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(new ImmutableBytesWritable(Bytes.toBytes(key.get())), value); } } public static class Reducer1 extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> { private byte[] family=null; private byte[]qualifier=null; @Override protected void setup(Context context) throws IOException, InterruptedException { String column=context.getConfiguration().get("conf.column"); byte[][]colkey=KeyValue.parseColumn(Bytes.toBytes(column)); family=colkey[0]; if(colkey.length>1){ qualifier=colkey[1]; } } public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String valueCon=null; for(Text text:values){ valueCon+=text.toString(); } Put put = new Put(key.get()); put.add(family, qualifier, Bytes.toBytes(valueCon)); context.write(key, put); } } /** * @param args */ public static void main(String[] args)throws Exception { Configuration conf=HBaseConfiguration.create(); String []argArray=new GenericOptionsParser(conf, args).getRemainingArgs(); if(argArray.length!=1){ System.exit(1); } conf.set("conf.column", "family1:text"); Job job=new Job(conf,"import from hdfs to hbase"); job.setJarByClass(ImportFromFileExample.class); job.setMapperClass(ImportMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "testtable"); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); TableMapReduceUtil.initTableReducerJob("testtable", Reducer1.class, job); FileInputFormat.addInputPaths(job, argArray[0]); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
相关推荐
- Sink:从Channel取出数据并将其发送到目标存储,如HDFS、HBase等。 6.7.1 Flume简介 Flume具有灵活的架构,可以通过配置文件定义Source、Channel和Sink,支持数据的高可靠性传输。其逻辑结构包括Source读取数据,...
接收器则将数据从通道中取出并发送到目标位置,如 HDFS、HBase 或其他数据存储系统。 在 `flume-ng-1.6.0-cdh5.14.2` 中,你可以找到以下组件和配置: 1. **源(Sources)**:包括简单的文件读取(File Source)、...
4. **数据存储**:最后通过Sink(如HDFS Sink)将处理好的数据批量写入到Hadoop分布式文件系统中,供后续的数据分析和挖掘使用。 ### 总结 Apache Flume作为一款成熟稳定的日志采集工具,在大数据领域有着广泛的...
接收器则负责将数据从通道取出并写入目标存储,如 HDFS 或 Elasticsearch。 在 Flume 的配置文件中,用户定义了这些组件以及它们之间的连接方式。例如,一个简单的配置可能包括一个从日志目录读取数据的 FileSource...
它将数据从产生点高效传输到数据存储中心,例如HDFS(Hadoop分布式文件系统)或HBase。Flume之所以重要,是因为它在大数据生态系统中承担了日志数据收集的关键角色,特别是在处理由多个服务器产生的数据时,可以迅速...
小文件问题可以通过缓存层来改善读取性能,但对高频写入缺乏好的解决方案。数据不一致性问题也是无中心存储设计中需要关注的,例如在覆盖key的过程中,可能只覆盖了部分副本导致读取到错误的数据。 有中心存储设计...
例如,你可以设置一个 Source 为 Taildir Source(监听指定目录下的新文件),Channel 为 Memory Channel(临时存储在内存中),Sink 为 HDFS Sink(将数据写入 Hadoop 文件系统)。 **Flume 在大数据运维中的应用...
接收器负责将数据从通道中取出并发送到目的地,如HDFS、HBase或外部系统。 在CDH 5.5.2环境下,Flume NG 1.6.0的特性包括: 1. **改进的性能**:通过对代码的优化和对并发处理的支持,Flume NG在数据采集速度上有...
Flume 的设计目标是实现分布式、可靠且高可用的解决方案,它允许用户自定义数据发送源(Sources),并将收集到的数据进行简单处理,然后发送到各种数据接收端,如HDFS(Hadoop Distributed File System)、HBase等。...
- **Sink**:将数据从Channel中取出并写入到最终的目的地,如HDFS、HBase等。 #### 2.2 Event - **Event**:Flume数据传输的基本单元,由Header和Body两部分组成。Header用于存储Event的相关属性,采用键值对的形式...
4. **Sink**:Sink 从 Channel 中取出事件,进行处理或转发到其他系统,如 HDFS、HBase 或其他 Source。 5. **Channel**:Channel 作为一个中间缓存,存储事件直到它们被成功处理。常见的 Channel 类型包括 ...
- **Sink**:Sink是数据输出端,负责从Channel中批量取出事件并将其写入存储系统、索引服务或转发给其他Flume Agent。Sink具有事务性,保证数据的完整传输。 - **Event**:Event是Flume数据传输的基本单位,以事件...
例如,一个简单的配置可能包括一个从日志文件中读取数据的 Taildir Source,一个内存 Channel 用于临时存储数据,以及一个将数据写入 HDFS 的 HDFS Sink。 总结来说,Flume 提供了一个强大的日志采集和传输框架,...