`

hadoop 二次排序 插入数据库

 
阅读更多

     二次排序:根据自定义对象的compareTo 方法排序

    由下面的代码实现可以看出 二次排序的实质是 先根据第一个字段排完序后再排第二个字段

若还有第三个字段参与进来是否可以叫作三次排序呢   (?_ ?)

 

     另:根据程序断点初步判断 

设置job的sort   会在mapper 至combiner阶段执行

设置job的group会在combiner至reduce 阶段执行

不过在从combiner到reduce的时候若传递的key为自定义的对象即使重写了hashcode 和equals 方法也不会当成相同的key来处理 不得已在本程序中传输key为一个空Text()

   不知是否有别的方法可以实现  ?

 

插入数据库的操作在 附件中有详细的实现.

 

package hdfs.demo2.final_indb;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Demo2_3Mapp { 
 
	/**
	 * 用户自定义对象 保存
	 * @author Administrator
	 *
	 */
	public static class TopTenPair implements WritableComparable<TopTenPair>, DBWritable, Writable  {
		int prodid; //商品编码
		int price;  //商品价格
		int count;  //商品销售数量
		@Override
		public void write(PreparedStatement statement) throws SQLException {
			statement.setInt(1, prodid);
			statement.setInt(2, price);
			statement.setInt(3, count);
		}

		@Override
		public void readFields(ResultSet resultSet) throws SQLException {
			this.prodid = resultSet.getInt(1);
			this.price = resultSet.getInt(2);
			this.count = resultSet.getInt(3);
		}
		
		/**
		 * Set the prodId and price and count  values.
		 */
		public void set(int prodid, int price, int count) {
			this.prodid = prodid;
			this.price = price;
			this.count = count;
		}

		public int getProdid() {
			return prodid;
		}

		public int getPrice() {
			return price;
		}	
		public int getCount() {
			return count;
		}

		@Override
		// 反序列化,从流中的二进制转换成IntPair
		public void readFields(DataInput in) throws IOException {
			prodid = in.readInt();
			price = in.readInt();
			count  = in.readInt();
		}

		@Override
		// 序列化,将IntPair转化成使用流传送的二进制
		public void write(DataOutput out) throws IOException {
			out.writeInt(prodid);
			out.writeInt(price);
			out.writeInt(count);
		}

		@Override
		// key的比较
		public int compareTo(TopTenPair o) {
			if ( o.count ==count) {
				if( o.count==0){
					return  o.prodid - prodid;
				}
				return  o.price-price;
			}
			return o.count-count;
		}

		// 新定义类应该重写的两个方法
		@Override
		public int hashCode() {
			return count+prodid*3 ;
		}

		@Override
		public boolean equals(Object right) {
			if (right == null)
				return false;
			if (this == right)
				return true;
			if (right instanceof TopTenPair) {
				TopTenPair r = (TopTenPair) right;
				return r.prodid == prodid && r.price == price&& r.count == count;
			} else {
				return false;
			}
		}
		@Override
		public String toString(){
			return getProdid()+"\t"+getPrice()+"\t"+getCount();
		}
	}

	public static class TopTenPairS extends   TopTenPair{
		public TopTenPairS(){
		}
		// key的比较
		@Override
		public int compareTo(TopTenPair o) {
			return  o.price-price;
		}
	}
	/**
	 * 分区函数类。根据first确定Partition。
	 */
	public static class FirstPartitioner extends
			Partitioner<TopTenPair, Text> {
		@Override
		public int getPartition(TopTenPair key, Text value,
				int numPartitions) {
			return Math.abs(key.getProdid()  ) % numPartitions;
		}
	}

	/**
	 * 分组函数类。只要first相同就属于同一个组。
	 */
	public static class GroupingComparator extends WritableComparator {
		protected GroupingComparator() {
			super(TopTenPair.class, true);
		}
		@Override
		// Compare two WritableComparables.
		public int compare(WritableComparable w1, WritableComparable w2) {
			TopTenPair ip1 = (TopTenPair) w1;
			TopTenPair ip2 = (TopTenPair) w2;
			
			if (ip1.count == ip2.count) {
				if(ip1.count==0){
					return  ip1.prodid - ip2.prodid;
				}
				return  ip1.price - ip2.price ;
			}
			return ip1.count-ip2.count;
		}
	}
	public static class Map extends
		Mapper<LongWritable, Text, Text, Text> {
		private final Text intkey= new Text();
		private final Text intvalue = new Text();
		
		//商品ID  售价    数量
		public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);
			int prodid = 0;
			int price = 0;
			if (tokenizer.hasMoreTokens())
				prodid = Integer.parseInt(tokenizer.nextToken());
			if (tokenizer.hasMoreTokens())
				price = Integer.parseInt(tokenizer.nextToken());
			intkey.set(prodid+"");
			intvalue.set(price+"");
//			intvalue.set(0, price, 0);
			context.write(intkey, intvalue);
		}
	}

	public static class Demo2_3Combiner extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			int count=0;
			int maxPrice=0;
			for (Text value : values) {
				int v=Integer.parseInt(value.toString());
				maxPrice=v<maxPrice?maxPrice:v;
				count++;
			}
			//key :prodId  
			context.write(new Text(),new Text(key+"-"+maxPrice+"-"+count));
		}
	}
	public static class Reduce extends
			Reducer<Text, Text, TopTenPairS, Text> {
	 TopTenPair pair = new TopTenPair();


		public void reduce(Text key, Iterable<Text> values,
				Context context) throws IOException, InterruptedException {
			String [] strs=null;	
			TopTenPair pair ;
			List<TopTenPair> list=new ArrayList<Demo2_3Mapp.TopTenPair>();
			for (Text val : values) {
				pair = new TopTenPair();
				strs=val.toString().split("-");
				pair.set(Integer.parseInt(strs[0]), 
						Integer.parseInt(strs[1]),
						Integer.parseInt(strs[2]));
				list.add(pair);
			}
			//按 count属性排序
			Collections.sort(list);
			

			List<TopTenPairS> lists=new ArrayList<Demo2_3Mapp.TopTenPairS>();
			//取前4个对象
			for(int i =0;i<4&& i<list.size();i++){
				TopTenPair ttp=list.get(i);
				TopTenPairS ttps=new TopTenPairS();
				ttps.set(ttp.getProdid(), ttp.getPrice(), ttp.getCount());
				lists.add(ttps);
			}
			//按 price 属性排序
			Collections.sort(lists);
			for(TopTenPairS ttps:lists){
				System.out.println(ttps);
				//参考 DBRecordWriter 
				//key 为数据类型, value:无用
				context.write( ttps , new Text()); //输出到数据中
			}
		}
	}

}

 

 

 

分享到:
评论

相关推荐

    Hadoop基础面试题(附答案)

    包括结构化数据(如关系型数据库中的数据)、半结构化数据(如XML、JSON等)和非结构化数据(如文本、图像、视频等)。 - **快速化**:数据生成速度快,同时需要高效的处理速度来确保及时分析并获取有用信息。 - **...

    第二次试验-HBase1

    在这个实验中,我们将深入学习HBase,一个分布式、高性能的NoSQL数据库,它是Apache Hadoop生态系统的一部分。HBase基于Google的Bigtable设计,专为处理大规模数据而设计,支持随机读写操作。在这个实验中,我们主要...

    MongoDB 是一个基于分布式文件存储的数据库.pdf

    - **插入文档**:除了 `insert()` 方法之外,还可以使用 `insertMany()` 一次插入多个文档。例如: ```javascript db.mycollection.insertMany([ {name: "Alice", age: 25, city: "Los Angeles"}, {name: "Bob",...

    HBase技术介绍.docx

    **HBase**,全称为Hadoop Database,是一款构建在Hadoop之上、面向列的分布式数据库系统。它具备高可靠性、高性能和可扩展性等特点,能够在成本相对低廉的硬件设备上构建大规模的数据存储集群。 #### 二、HBase的...

    实验手册:搜狗搜索日志分析系统实现-Hadoop2.0-v1.2-noted

    - **实现方法**:通过HBase Shell命令,进行表的创建、数据的插入和查询等操作。 #### 十一、使用Sqoop将数据导入HBase - **需求描述**:将HDFS中的数据导入HBase数据库中,利用HBase的列式存储特点进行高效的数据...

    二期认证笔试

    - **排序与搜索算法**:冒泡、选择、插入排序,快速排序,归并排序,哈希查找,二分查找。 - **动态规划**和**贪心算法**:解决最优化问题的方法。 6. **云计算与大数据**: - **云计算模型**:IaaS、PaaS、SaaS...

    教你如何迅速秒杀掉 海量数据处理面试题.pdf

    第一轮划分按照某个特征将数据分布到一组桶中,第二轮再在每个桶内进行划分,以进一步细分数据集。 Bloom Filter是一种空间效率很高的概率型数据结构,用于判断一个元素是否在一个集合中。它通过多个哈希函数映射到...

    2021年最新总结,腾讯、阿里、美团、百度、字节跳动、京东等技术面试题,以及答案,专家出题人分析汇总 .zip

    - **排序算法**:快速排序、归并排序、堆排序、冒泡排序、插入排序等的原理及复杂度分析。 - **查找算法**:二分查找、哈希查找等。 2. **操作系统** - **进程与线程**:进程的概念、状态转换、线程同步(互斥锁...

    面试题.zip

    3. **算法**:排序(快速排序、归并排序、堆排序等)、查找(二分查找、哈希查找)、图算法(Dijkstra、Floyd、Prim)、动态规划、回溯法、贪心策略等,这些都是衡量程序员逻辑思维能力的重要标准。 4. **操作系统*...

    阿里巴巴面试题

    - 排序算法:快速排序、归并排序、冒泡排序、插入排序、希尔排序、堆排序等。 - 搜索算法:深度优先搜索(DFS)、广度优先搜索(BFS)、二分查找、KMP字符串匹配等。 - 动态规划和贪心策略在解决实际问题中的应用。 ...

    IT企业面试试题

    - 掌握排序算法(快速排序、归并排序、冒泡排序、插入排序等)和查找算法(二分查找、哈希查找)。 - 能够解决实际问题,例如动态规划、回溯法、贪心策略等。 3. **操作系统原理**: - 进程与线程的概念,理解...

    面试题收集

    - 排序算法:快速排序、归并排序、堆排序、冒泡排序、选择排序等的实现及复杂度分析。 - 查找算法:二分查找、哈希表的查找效率分析。 2. 编程语言基础: - Java:面向对象特性(封装、继承、多态)、异常处理、...

    IT名企面试资料

    - 排序:快速排序、归并排序、堆排序等常见排序算法的原理和复杂度分析。 - 图:图的遍历(深度优先搜索、广度优先搜索)和最短路径算法(Dijkstra、Floyd)。 - 查找:哈希表、二分查找等高效查找方法。 2. ...

    2017最新大数据架构师精英课程

    94_job二次排序5 t3 Z2 R- ]( a: s* c0 Z 95_从db输入数据进行mr计算: L. M4 I6 y, R2 l/ u/ L 96_输出数据到db中 97_NLineInputFormat& u( k1 T& z( O# P, S* y1 Y 98_KeyValueTextInputFormat* p$ O1 z- h, n" e( ...

    Java开发者或者大数据开发者面试知识点整理.zip

    8. **算法与数据结构**:基础算法如排序(快速、归并、冒泡、插入等)、查找(二分查找、哈希查找等),复杂度分析。了解链表、树(二叉树、平衡树、B树、B+树等)、图、堆、队列、栈等数据结构的实现和应用。 9. *...

    中国电信IT研发中心2016校招笔试

    2. 数据结构与算法:笔试中可能会考察常见的数据结构(如数组、链表、树、图、堆、队列、栈等)及其操作,以及排序和搜索算法(如冒泡排序、快速排序、二分查找等)。 3. 操作系统:基础的进程管理、内存管理、文件...

    数据海量分页

    "数据海量分页"就是一种有效的解决策略,它允许用户逐步加载和查看大规模数据集,而不是一次性加载所有数据,从而避免了性能瓶颈和用户体验的下降。 在数据库系统中,分页是通过SQL查询实现的,通常结合`LIMIT`和`...

    整理的面试题.rar

    - **排序和查找算法**:快速排序、归并排序、插入排序、二分查找等,了解它们的时间复杂性和适用场景。 2. **计算机网络**: - **TCP/IP五层模型或OSI七层模型**:理解每一层的功能和协议,如TCP、UDP、HTTP、FTP...

    大厂面试系列二.pdf

    在海量日志数据中提取出某日访问百度次数最多的IP问题,可以使用Hadoop等分布式处理工具,或者使用内存数据库如Redis进行高速处理。 在有10个文件,每个文件1G,每个文件的每一行都存放的是用户的query,要按照...

Global site tag (gtag.js) - Google Analytics