- 浏览: 1397835 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (346)
- linux (10)
- hbase (50)
- hadoop (23)
- java (52)
- java multi-thread (13)
- Oracle小记 (41)
- 机器学习 (12)
- 数据结构 (10)
- hadoop hive (16)
- java io (4)
- jms (1)
- web css (1)
- kafka (19)
- xml (2)
- j2ee (1)
- spring (6)
- ibatis (2)
- mysql (3)
- ext (3)
- lucene (3)
- hadoop pig (3)
- java nio (3)
- twemproxy (1)
- antlr (2)
- maven (6)
- mina (1)
- 列数据库 (1)
- oozie (2)
- mongodb (0)
- 报错 (0)
- jetty (1)
- neo4j (1)
- zookeeper (2)
- 数据挖掘 (3)
- jvm (1)
- 数据仓库 (4)
- shell (3)
- mahout (1)
- python (9)
- yarn (3)
- storm (6)
- scala (2)
- spark (5)
- tachyon (1)
最新评论
-
guokaiwhu:
赞啊!今晚遇到相同的问题,正追根溯源,就找到了博主!
hbase 报错gc wal.FSHLog: Error while AsyncSyncer sync, request close of hlog YouAr -
喁喁不止:
很清楚,有帮助。
hive常用函数 -
dsxwjhf:
Good job !!
kafka获得最新partition offset -
Locker.Xai:
参考了
freemaker教程 -
maoweiwer:
为啥EPHEMERAL_SEQUENTIAL类型的节点并没有自 ...
zookeeper 入门讲解实例 转
hadoop的join实现,实现符合关键字,多对多连接
key:
public class MultiKey implements WritableComparable<MultiKey> { private Text departId = new Text(); private Text departNo = new Text(); public Text getDepartId() { return departId; } public void setDepartId(String departId) { this.departId = new Text(departId); } public Text getDepartNo() { return departNo; } public void setDepartNo(String departNo) { this.departNo = new Text(departNo); } @Override public void write(DataOutput out) throws IOException { departId.write(out); departNo.write(out); } @Override public void readFields(DataInput in) throws IOException { this.departId.readFields(in); this.departNo.readFields(in); } @Override public int compareTo(MultiKey o) { return (this.departId.compareTo(o.departId) !=0)? this.departId.compareTo(o.departId) : this.departNo.compareTo(o.departNo); } @Override public String toString(){ return this.departId.toString()+" : "+this.departNo.toString(); } @Override public int hashCode(){ return 0; } }
value:
public class Employee implements WritableComparable<Employee> { private String empName=""; private String departId=""; private String departNo=""; private String departName=""; private int flag; public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } public String getEmpName() { return empName; } public void setEmpName(String empName) { this.empName = empName; } public String getDepartId() { return departId; } public void setDepartId(String departId) { this.departId = departId; } public String getDepartNo() { return departNo; } public void setDepartNo(String departNo) { this.departNo = departNo; } public String getDepartName() { return departName; } public void setDepartName(String departName) { this.departName = departName; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.empName); out.writeUTF(this.departId); out.writeUTF(this.departNo); out.writeUTF(this.departName); out.writeInt(this.flag); } @Override public void readFields(DataInput in) throws IOException { this.empName = in.readUTF(); this.departId = in.readUTF(); this.departNo = in.readUTF(); this.departName = in.readUTF(); this.flag = in.readInt(); } public static void writeAllProperties(DataOutput out,Class<? extends WritableComparable<?>> type,Object obj) throws IllegalArgumentException, IllegalAccessException{ Field[] fields = type.getDeclaredFields(); for (Field field : fields) { System.out.println(field.get(obj)); } } @Override public int compareTo(Employee o) { return 0; } @Override public String toString(){ return this.empName+" "+this.departName; } }
maper:
public class MyJoinMapper extends Mapper<LongWritable, Text, MultiKey, Employee>{ @Override public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] array = line.split(","); visit(array,context); } private void visit(String[] array,Context context) throws IOException,InterruptedException{ int i = Integer.valueOf(array[0]); MultiKey key = new MultiKey(); Employee e = new Employee(); switch (i) { case 1://name e.setEmpName(array[1]); e.setFlag(1); break; default://depart e.setDepartName(array[1]); e.setFlag(2); break; } e.setDepartId(array[2]); e.setDepartNo(array[3]); key.setDepartId(e.getDepartId()); key.setDepartNo(e.getDepartNo()); context.write(key, e); } }
reducer:
public class MyJoinReducer extends Reducer<MultiKey, Employee, IntWritable, Text>{ List<emp> empList = new LinkedList<emp>(); List<depart> departList = new LinkedList<MyJoinReducer.depart>(); @Override public void reduce(MultiKey key,Iterable<Employee> values,Context context) throws IOException,InterruptedException{ for (Employee employee : values) { visite(employee); } System.out.println("----------"); System.out.println(key); for (emp em : empList) { for (depart de : departList) { Employee e = new Employee(); e.setDepartId(em.departId); e.setDepartName(de.departName); e.setDepartNo(em.departNo); e.setEmpName(em.empName); context.write(new IntWritable(1), new Text(e.toString())); } } empList = new LinkedList<emp>(); departList = new LinkedList<MyJoinReducer.depart>(); } private void visite(Employee e){ switch (e.getFlag()) { case 1: emp em = new emp(); em.departId = e.getDepartId(); em.departNo = e.getDepartName(); em.empName = e.getEmpName(); empList.add(em); break; default: depart de = new depart(); de.departName = e.getDepartName(); departList.add(de); break; } } private class emp{ public String empName; public String departId; public String departNo; } private class depart{ public String departName; } }
comparator
public class MyJoinComparator extends WritableComparator{ protected MyJoinComparator() { super(MultiKey.class,true); } }
groupcomparator:
public class MyJoinGroupComparator implements RawComparator<MultiKey> { private DataInputBuffer buffer = new DataInputBuffer(); @Override public int compare(MultiKey key1, MultiKey key2) { return key1.compareTo(key2); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return new MyJoinComparator().compare(b1, s1, l1, b2, s2, l2); } }
今天iteye的编辑器好坑爹啊,不断的崩溃
补个测试类
public class MyJoinTest { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { upload("dirk1.txt", "dirk.txt"); upload("dirk2.txt","dirk2.txt"); delete(); Configuration conf = new Configuration(); Job job = new Job(conf, "joinJob"); job.setMapperClass(MyJoinMapper.class); job.setReducerClass(MyJoinReducer.class); job.setMapOutputKeyClass(MultiKey.class); job.setMapOutputValueClass(Employee.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setGroupingComparatorClass(MyJoinGroupComparator.class); FileInputFormat.addInputPath(job, new Path("/user/dirk3/input")); FileOutputFormat.setOutputPath(job, new Path("/user/dirk3/output")); job.waitForCompletion(true); } public static void upload(String local,String remote) throws IOException{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); String l = MyJoinTest.class.getResource("").getPath()+"/"+local; fs.copyFromLocalFile(false, true, new Path(l), new Path("/user/dirk3/input/"+remote)); } public static void delete() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); fs.delete(new Path("/user/dirk3/output"), true); } public static void run(){ JobConf jobConf = new JobConf(); jobConf.setOutputKeyComparatorClass(MyJoinComparator.class); jobConf.setOutputValueGroupingComparator(MyJoinComparator.class); } }
join的主要实现在reducer中
关于comparator,在通过maper向context中添加key value后,通过combine,partition之后,进入reducer阶段,进行groupComparator,决定哪些key同时进入一个reducer
发表评论
-
hadoop报错
2015-05-25 13:36 01.hadoop hdfs启动: Initializ ... -
hadoop 常用配置备忘
2015-04-30 16:04 0job名称 mapred.job.name job队列 ... -
protocal buffers入门实例
2014-09-22 21:08 1654hadoop yarn中新的系列化protocol buf ... -
hadoop MultipleOutputs规定多文件名
2014-09-18 20:58 1356在map或reduce中 1.初始化在configure或 ... -
基于hadoop的推荐算法-mahout版
2014-08-29 17:25 9545基于hadoop的推荐算法,讲其中mahout实现的基于项 ... -
Maven搭建hadoop环境报Missing artifact jdk.tools:jdk.tools:jar:1.6
2014-08-20 16:31 11149转http://blog.csdn.net/honglei9 ... -
hadoop hdfs读写
2014-07-20 14:04 1033hadoop hdfs读写 hdfs读取文件 1 ... -
hadoop namenode报错
2014-06-06 19:40 1017hadoop启动报错 2014-06-06 19:37:1 ... -
hadoop配置文件笔记
2014-05-15 23:13 1057mapred-site.xml n ... -
hadoop join
2014-03-09 23:09 1114转一个牛人的hadoop join博客 转 http:// ... -
hadoop 二次排序
2014-03-09 23:06 1724hadoop的工作流程: http://black ... -
hadoop 工作流程 图
2014-03-09 22:59 3959hadoop工作流程,用两张简单的map, redu ... -
hadoop secondnamenode配置
2014-02-28 20:26 2226一、secondnamenode是做什么的 ... -
hadoop map reduce参数
2014-01-21 21:06 0一个job会使用tasktracker的map任务槽数 ... -
hadoop 报错 org.apache.hadoop.mapred.TaskTracker: Process Thread Dump: lost task
2013-10-13 16:38 2377项目最近报错,形如: org.apache.hadoop. ... -
hadoop 调度器 capacityTaskScheduler
2013-09-27 20:24 1457贴两个不错的链接: http://blog.csdn ... -
hadoop 报错 org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException
2013-09-26 23:26 8796报错: org.apache.hadoop.hdfs.DF ... -
hadoop oozie 报错
2013-09-26 17:38 11981.oozie报异常泄露预警 关闭oozie,需要将tom ... -
hadoop自定义outputformat源码
2013-02-19 11:59 3516hadoop outputformat是reduceTask ... -
hadoop自定义inputformat源码
2013-02-17 18:14 2909hadoop的inputformat包括他 ...
相关推荐
本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...
然而,join操作因其复杂性和数据分布的特点,在Hadoop中实现起来较为困难。具体而言: 1. **数据分布与倾斜问题**:在分布式环境中,数据的不均匀分布会导致join操作性能下降。 2. **MapReduce的局限性**:Hadoop的...
本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...
本课程设计主要围绕如何使用Hadoop的MapReduce实现SQL中的统计、GROUP BY和JOIN操作,这是一次深入理解大数据处理机制的实践过程。 首先,让我们来探讨SQL的统计功能。在SQL中,统计通常涉及到COUNT、SUM、AVG、MAX...
在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...
文件汗有三个java类,两个测试文件txt ReduceClass.java MapClass.java TaggedRecordWritable.java customers.txt ...经过亲自测试,可以将两个文件中的信息以groupby的key关联起来,查出类似数据库的join.
本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...
在使用上,Elasticsearch提供了RESTful接口,使得其可以被轻松集成到各种Web应用中,实现搜索功能。同时,Elasticsearch的聚合功能也非常出色,能够对数据进行高效的统计分析,这一点上Elasticsearch已经超越了传统...
- 这个应用程序很可能是一个示例,演示了如何在Hadoop MapReduce中实现多表关联并处理Job间的依赖和参数传递。它可能包括多个Job,每个Job负责一个或多个表的处理,并通过特定机制将结果传递给后续的Job。 5. **...
同时,Hadoop通过MapReduce实现数据处理的并行化,极大地提升了处理大数据的速度。 Hadoop不仅作为数据存储的补充,还能作为数据湖,用于长期存储大量数据,以便在需要时进行分析。此外,Hadoop优化了大文件存储和...
4. 流处理改进:Spark Streaming引入了新的DStream操作,如join和window,增强了实时流处理的能力。 二、Hadoop 2.7集成 Hadoop 2.7是一个稳定且广泛使用的分布式存储和计算框架,其YARN资源管理系统为Spark提供了...
课程内容还涉及了如何从nginx日志中提取访问量最高的IP地址,使用Unix/Linux的工具链,如awk、grep、sort、join等进行简单的日志分析。 综上所述,本课程深入介绍了Hadoop在Web日志分析中的应用,从基本的日志概念...
MapReduce则是用于并行处理和计算的大数据处理模型,由“Map”阶段和“Reduce”阶段组成,实现了数据的分而治之。 在Hadoop面试中,可能会遇到以下几个关键知识点: 1. Hadoop的架构:理解Hadoop的主节点...
- **不支持记录级别操作**:无法直接更新、插入或删除单条记录,通常通过创建新表或写入文件来实现数据更新。 - **ETL工具**:Hive支持数据提取、转换和加载,适合大规模数据的预处理和分析。 - **类SQL查询语言...
- MapReduce框架下的Join实现方法。 - 外部排序和归并Join算法的应用。 - Hive SQL中的Join操作及其优化策略。 - 实际应用场景中的性能对比和案例分析。 #### 四、配置Hive元数据DB为PostgreSQL - **目的**: 将...
- **解释**: Sqoop是一个用于在Hadoop和关系型数据库之间传输数据的工具,它通过JDBC驱动程序连接到关系型数据库,从而实现数据的导入和导出。因此,正确答案是C:JDBC。 ### 27. Oracle数据导入HDFS的方法 - **...