`

Hadoop简单的Map/Reduce

 
阅读更多
/**
 * 需求:
 *     统计Hbase数据库中某个字符串的使用人数
 *     字符串存储格式 1002;1003,2003,1443;1232,232
 *	   最后统计 模块   使用人数
 *       1    1002    X
 *       ...
 *     把最后的统计信息存储到结果表中.
 */
public class CountUse {

	public static Logger loger = Wloger.loger;

	/**
	 * Internal Mapper to be run by Hadoop.
	 */
	public static class MapTask extends TableMapper<UdcSellHTable, IntWritable> {
		private UdcSellHTable udcSell = new UdcSellHTable();
		private String tableName;
		private byte[] family;
		private HashMap<String, byte[]> indexes;
		private UdcSellHTable word = new UdcSellHTable();
		private final static IntWritable one = new IntWritable(1);

		/**
		 * Map 统计使用人数
		 */
		@SuppressWarnings("static-access")
		@Override
		protected void map(ImmutableBytesWritable rowKey, Result result,
				Context context) throws IOException, InterruptedException {
			// 搜索数据库设置到对象当中
			for (Map.Entry<String, byte[]> entity : indexes.entrySet()) {
				String column = entity.getKey();
				PropertyName property = new PropertyName(family, entity
						.getValue());
				Object type = udcSell.types.get(column);
				Object value = CountUseUtil.changeValue(new Entity(tableName,
						result), property, type);// 数据库中的值
				try {
					loger.info("debug:"+udcSell);
					loger.info("debug:"+UdcSellHTable.filter(column));
					loger.info("debug:"+value);
					BeanUtils.setProperty(udcSell, UdcSellHTable.filter(column),
							value);
				} catch (Exception e) {
					loger.info("对象属性定义有问题,请check[" + udcSell + ":" + column
							+ "[" + value + "]]");
				}
			}
			// filter 属性过滤,过滤掉不符合规则的记录
			String widget = udcSell.getWidget();// widget数据,//674324;321,321,312;3321
			word.setWidget(widget);//设置widget
			loger.debug("debug:"+widget);
			List<String> keys = CountUseUtil.getKeys(widget);

			for (String key : keys) {
				word.setUserId(udcSell.getUserId());
				word.setDate(udcSell.getDate());
				word.setModuleId(Long.parseLong(key));
				word.setUseCount(1);
				loger.info("Map <word>" + word.toString() + "</word>");
				context.write(word, one);
			}
		}

		@SuppressWarnings("static-access")
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			Configuration configuration = context.getConfiguration();
			tableName = configuration.get("input.tablename");
			String familyName = configuration.get("input.familyname");
			udcSell.setDate(CountUseUtil.yestoday());
			String[] fields = new String[] { udcSell.COLUMN_USERID,
					udcSell.getWidgetDate() };
			indexes = new HashMap<String, byte[]>();
			for (String field : fields) {
				indexes.put(field, Bytes.toBytes(field));
			}
			family = Bytes.toBytes(familyName);
		}
	}

	/**
	 * Reduce进行合计
	 *
	 */
	public static class CombinerTask extends
			Reducer<UdcSellHTable, IntWritable, UdcSellHTable, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(UdcSellHTable key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {

			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			loger.info("combiner <key>" + key + "</key>");
			loger.info("combiner <sum>" + sum + "</sum>");
			context.write(key, result);
		}
	}

	/**
	 * Reduce合计入库
	 *
	 */
	public static class Reduce extends
			TableReducer<UdcSellHTable, IntWritable, ImmutableBytesWritable> {
		private UdcSellHTable udcSell = new UdcSellHTable();
		private String tableName;
		private IntWritable result = new IntWritable();
		private HashMap<String, byte[]> indexes;
		private byte[] family;

		public void reduce(UdcSellHTable key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			key.setUseCount(result.get());
			loger.info("Reduce <key>" + key + "</key>");
			loger.info("Reduce <sum>" + sum + "</sum>");
			String keyStr = "1_" + key.getModuleId();
			Put put = new Put(Bytes.toBytes(keyStr));// 以模块id为key
			for (Map.Entry<String, byte[]> entity : indexes.entrySet()) {
				try {
					put.add(family, entity.getValue(), Bytes.toBytes(BeanUtils
							.getProperty(key, entity.getKey())));
				} catch (Exception e) {
					loger.info("对象属性定义有问题,请check[" + udcSell + ":"
							+ entity.getKey() + "[" + entity.getValue() + "]]");
				}
			}
			ImmutableBytesWritable imw = new ImmutableBytesWritable(Bytes
					.toBytes(tableName));
			context.write(imw, put);
		}

		@SuppressWarnings("static-access")
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			Configuration configuration = context.getConfiguration();
			tableName = configuration.get("output.tablename");
			String familyName = configuration.get("output.familyname");
			family = Bytes.toBytes(familyName);
			String[] fields = new String[] { udcSell.COLUMN_USERID,udcSell.COLUMN_MODULEID,
					udcSell.COLUMN_USE };
			 indexes = new HashMap<String, byte[]>();
			for (String field : fields) {
				indexes.put(field, Bytes.toBytes(field));
			}
		}
	}

	/**
	 *
	 *
	 * Job configuration.
	 */
	public static Job configureJob(Configuration conf, String[] args)
			throws IOException {
		String inputTable = "udc_sell";
		String inputFamily = "s_year";
		String outputTable = "job_result";
		String outputFamily = "s_base";
		conf.set("input.tablename", inputTable);
		conf.set("input.familyname", inputFamily);
		conf.set(TableInputFormat.INPUT_TABLE, inputTable);

		conf.set("output.tablename", outputTable);
		conf.set("output.familyname", outputFamily);
		conf.set(TableOutputFormat.OUTPUT_TABLE, outputTable);
		Job job = new Job(conf, inputTable);
		job.setNumReduceTasks(1);
		job.setJarByClass(CountUse.class);
		job.setMapperClass(MapTask.class);
		job.setMapOutputKeyClass(UdcSellHTable.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setCombinerClass(CombinerTask.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormatClass(TableInputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);
		return job;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		loger.info("开始 job");
		Job job = configureJob(conf, otherArgs);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


分享到:
评论
2 楼 a123159521 2013-11-05  
jianxin 写道
请问UdcSellHTable这个类哪里来的啊?

i found your name is very fimaliar, you recognized me?
1 楼 jianxin 2013-11-05  
请问UdcSellHTable这个类哪里来的啊?

相关推荐

    Windows平台下Hadoop的Map/Reduce开发

    在Windows平台上进行Hadoop的Map/Reduce开发可能会比在Linux环境下多一些挑战,但通过详细的步骤和理解Map/Reduce的工作机制,开发者可以有效地克服这些困难。以下是对标题和描述中涉及知识点的详细说明: **Hadoop...

    hadoop中map/reduce

    MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce(化简),使得海量数据能够在多台计算机上并行处理,极大地提高了数据处理效率。 Map阶段是数据处理的...

    远程调用执行Hadoop Map/Reduce

    本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...

    基于Map/Reduce的分布式搜索引擎研究

    例如,对于一个简单的计数任务,Map阶段可以将文本文件中的每一行转换为键值对(单词,1),而Reduce阶段则根据相同的键(即单词)对这些值进行累加,以得到每个单词出现的次数。 ##### 2.2 Google的Map/Reduce ...

    hadoop教程

    Hadoop Map/Reduce 框架是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上 T 级别的数据集。该框架由一个单独的 master JobTracker ...

    hadoop map-reduce turorial

    ### Hadoop Map-Reduce 教程详析 #### 目标与作用 Hadoop Map-Reduce框架是设计用于处理大规模数据集(多太字节级)的软件框架,它允许在大量廉价硬件集群上(可达数千节点)进行并行处理,确保了数据处理的可靠性...

    基于Eclipse的Hadoop应用开发环境配置

    在 Eclipse 中配置 Hadoop Installation Directory,打开 Window--&gt;Preferences,发现 Hadoop Map/Reduce 选项,在这个选项里需要配置 Hadoop Installation Directory。 5. 配置 Map/Reduce Locations 在 Window--...

    hadoop伪分布式环境搭建

    本文将介绍如何在Windows和Linux平台上搭建Hadoop伪分布式环境,包括下载安装Hadoop、配置Eclipse、搭建Map/Reduce环境、编写Java代码等步骤。 一、下载安装Hadoop 下载Hadoop插件jar包“hadoop-eclipse-plugin-...

    map/reduce template

    标题中的“map/reduce template”指的是MapReduce编程模型的一个模板或框架,它是Apache Hadoop项目的核心部分,用于处理和生成大数据集。MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段,它允许程序员...

    在solr文献检索中用map/reduce

    标题中的“在solr文献检索中用map/reduce”指的是使用Apache Solr,一个流行的开源搜索引擎,结合Hadoop的MapReduce框架来处理大规模的分布式搜索任务。MapReduce是一种编程模型,用于处理和生成大型数据集,它将...

    大数据与云计算技术 Hadoop概论和快速入门 共40页.ppt

    Hadoop, Apache开源的分布式框架。源自Google GFS,BigTable,MapReduce...JobTracker,hadoop的Map/Reduce调度器,负责与TackTracker通信分配计算任务并跟踪任务进度。 TaskTracker,启动和管理Map和Reduce子任务的节点。

    hadoop,map,reduce,hdfs

    它通过两个主要阶段实现:**Map阶段**和**Reduce阶段**。MapReduce的工作流程如下: 1. **Splitting**:输入数据被分成小块,每个块称为一个split。 2. **Mapping**:每个split被传递给映射函数,映射函数对输入数据...

    Hadoop Map-Reduce教程

    ### Hadoop Map-Reduce 教程 #### 一、Hadoop Map-Reduce 概述 Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-...

    Hadoop教程.pdf

    2. Map/Reduce是一个使用简易的软件框架,能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 3. Hadoop可以将计算任务拆分,通过Map/Reduce统一指挥多台机器,让它们...

    基于Map_Reduce的分布式搜索引擎研究

    在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题

    Hadoop Map-Reduce

    Hadoop Map-Reduce Map-Reduce 是 Hadoop 框架中的一种核心组件,用于处理大规模数据。Map-Reduce 依靠两大步骤来完成数据处理:Map 和 Reduce。 Map 阶段的主要任务是将输入数据拆分成小块,并将其转换成 key-...

    Hadoop安全分布式的Eclipse开发环境配置

    在Eclipse的Map/Reduce Locations面板中,右键选择New Hadoop Location,填写必要的信息,如Location Name、Map/Reduce Master Host(Master节点IP)、Port、DFS Master等,确保与实际的Hadoop集群配置相匹配。...

    Analysis-of-Stock-Market-using-Hadoop-Map-Reduce:使用Hadoop Map Reduce分析股票市场

    使用Hadoop Map Reduce分析股票市场 如何运行程序? 首先在您的系统中安装Hadoop。 请按照以下步骤进行安装 然后开始执行给定的命令 cd hadoop-3.2.2 / sbin ./start-dfs.sh ./start-yarn.sh jps 导出HADOOP_...

    Map-Reduce原理体系架构和工作机制,eclipse与Hadoop集群连接

    此外,通过Eclipse与Hadoop集群的有效连接,开发者可以在本地环境中轻松地编写、测试和调试Map-Reduce程序,进而提高开发效率。在实际应用中,Map-Reduce已经被广泛应用于搜索引擎索引构建、社交网络数据分析、金融...

Global site tag (gtag.js) - Google Analytics