`
ganliang13
  • 浏览: 251019 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MapReduce编程模型之HBase输入、HDFS多路输出

阅读更多
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import com.bfd.util.Const;


/**
 * MapReduce job that scans an HBase table and writes tab-separated text lines to HDFS,
 * split by {@link MultipleOutputs} into the base outputs "active_normal" and
 * "nonactive_normal".
 *
 * <p>NOTE(review): parts of the original logic appear to have been elided from this
 * listing. The identifiers {@code val}, {@code qualifier}, {@code gid} and {@code outVal}
 * (mapper) and {@code isTrue}, {@code gid} and {@code value} (reducer) are never declared,
 * so the class does not compile as shown.
 */
public class IPCount {
	
	/**
	 * Emits {@code (gid, outVal)} once for every cell of a scanned row whose column
	 * qualifier does not contain the substring {@code ">brand"}.
	 */
	static class MyMapper extends TableMapper<Text, Text> {
		@Override
		public void map(ImmutableBytesWritable row, Result value,Context context) throws IOException, InterruptedException {
			  
			// Visit every cell (KeyValue) of the row.
			// NOTE(review): Result.raw() is deprecated from HBase 0.96 onward in favour of
			// rawCells() -- confirm against the cluster's HBase version.
			for (KeyValue kv : value.raw()) {
				// Cell value, decoded explicitly as UTF-8. Not read anywhere in the visible code.
				val = new String(kv.getValue(),"UTF-8");
				// NOTE(review): decoded with the platform default charset, unlike the value
				// above; passing "UTF-8" here too would make it platform-independent.
				qualifier = new String(kv.getQualifier());
				// Skip columns whose qualifier contains ">brand"; every other cell emits one pair.
				if(qualifier.indexOf(">brand")==-1){
					// NOTE(review): how gid and outVal are built was elided -- presumably Text
					// objects derived from the row key and/or val/qualifier; confirm.
					context.write(gid, outVal);
				}
			}
		}
	}
	
	/**
	 * Writes one {@code gid + "\t" + value} text line per record, routing it to the
	 * "active_normal" or "nonactive_normal" base output according to {@code isTrue}.
	 *
	 * <p>All output goes through {@link MultipleOutputs}; nothing is written via
	 * {@code context.write}, so the job's default part-r-NNNNN files are typically
	 * created empty. NOTE(review): configuring LazyOutputFormat in {@code main} would
	 * avoid creating them.
	 */
	static class MyReducer extends Reducer<Text, Text, Text, Text> {
		// Declared raw because the writes below pass a NullWritable key and a String value,
		// which would not type-check against MultipleOutputs<Text, Text>.
		// NOTE(review): declaring the reducer as Reducer<Text, Text, NullWritable, Text>
		// (with job.setOutputKeyClass(NullWritable.class)) would allow a properly typed
		// MultipleOutputs<NullWritable, Text> and make both @SuppressWarnings unnecessary.
		@SuppressWarnings("rawtypes")
		private MultipleOutputs multipleOutputs; 
		/** Creates the per-task MultipleOutputs bound to this reducer's context. */
		protected void setup(Context context) throws IOException, InterruptedException {
			multipleOutputs =new MultipleOutputs<Text,Text>(context);
		}
		
		/** Closes every output opened through MultipleOutputs; without this, output may not be flushed. */
		protected void cleanup(Context context) throws IOException, InterruptedException {
			multipleOutputs.close();
		}
		/**
		 * Writes the grouped record(s) for one key. Each line goes to files named
		 * {@code <baseOutputPath>-r-NNNNN} under the job output directory; because the key
		 * is NullWritable, the job's default TextOutputFormat writes only the value string.
		 *
		 * <p>NOTE(review): {@code isTrue}, {@code gid} and {@code value} are undeclared and
		 * {@code values} is never iterated, so a loop over {@code values} (and the test that
		 * sets {@code isTrue}) appears to have been elided. Since the mapper emits gid as its
		 * output key, {@code gid} here is presumably {@code key}.
		 */
		@SuppressWarnings("unchecked")
		public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
			if(isTrue){
				multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"active_normal");
			}else{
				multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"nonactive_normal");
			}
		}			
    }
	
	/**
	 * Configures the HBase-scan / reduce / HDFS-output job and runs it, exiting the JVM
	 * with 0 on success and 1 on failure.
	 *
	 * @param args {@code args[0]} is the HBase table to scan; {@code args[1]} is the HDFS
	 *             output directory. NOTE(review): not validated -- fewer than two
	 *             arguments fails with ArrayIndexOutOfBoundsException.
	 * @throws Exception if job setup or execution fails
	 */
	public static void main(String[] args) throws Exception {
		// Point the client at the HBase cluster's ZooKeeper ensemble (project constants).
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
		conf.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT);
		// NOTE(review): new Job(Configuration, String) is deprecated in Hadoop 2+;
		// Job.getInstance(conf, "IPCount") is the replacement.
		Job job = new Job(conf, "IPCount");
		job.setJarByClass(IPCount.class);
		// Full-table scan: fetch 500 rows per scanner RPC and bypass the region server
		// block cache, the usual settings for MapReduce scans.
		Scan scan = new Scan();
		scan.setCaching(500);
		scan.setCacheBlocks(false);
		// Mapper output key and value are both Text.
		TableMapReduceUtil.initTableMapperJob(args[0],scan,MyMapper.class,Text.class,Text.class,job);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(10);
		
		// Directory under which MultipleOutputs creates its active_normal / nonactive_normal files.
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics