Reduce-side joining / repartitioned sort-merge join
Note: DataJoinReducerBase, on the other hand, is the workhorse of the datajoin package, and it simplifies our programming by performing a full outer join for us. Our reducer subclass only has to implement the combine() method to filter out unwanted combinations to get the desired join operation (inner join, left outer join, etc.). It's also in the combine() method that we format the combination into the appropriate output format.
When you run the sample code from Hadoop in Action, chapter 5, several errors come up; see
http://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin
The fix is to give TaggedWritable a no-argument constructor and to serialize the class name of its data field, so that readFields() can instantiate data before deserializing it instead of failing on the reduce side. The corrected code looks like this:
package com.test.datamine.topic;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        // Tag each record with its data source, derived from the input file name.
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        // The join key is the first comma-separated field of the record.
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // Inner join: drop groups that do not have a record from every source.
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2)
                return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                // Drop the join key from each record; the group key is written back
                // once by TextOutputFormat (separator set to "," in run()).
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }

        public void write(DataOutput out) throws IOException {
            // original version from the book:
            // this.tag.write(out);
            // this.data.write(out);
            this.tag.write(out);
            // Also write the concrete class of data so readFields() can re-create it.
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            // original version from the book:
            // this.tag.readFields(in);
            // this.data.readFields(in);
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            // data is null when this instance came from the no-arg constructor,
            // so instantiate it from the class name written by write().
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils.newInstance(
                            Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");

        // Remove the output directory if it already exists, then run the job.
        FileSystem fs = in.getFileSystem(conf);
        fs.delete(out);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}
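A minimal way to run it (the jar name and HDFS paths below are placeholders, not from the book): package the class into a jar, make the datajoin contrib classes available to the job (for example the hadoop-*-datajoin.jar via -libjars or by bundling it), and point the job at a directory containing both input files. DataJoinMapperBase derives each record's tag from the name of the file it came from, so the input files should keep their data-source names (customers, orders):

hadoop jar myjob.jar com.test.datamine.topic.DataJoin /user/hadoop/join/input /user/hadoop/join/output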
The test data consists of two files.
customers
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
orders
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
The joined result is:
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
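Note that customer 4 (David Stork) has no order, so the check if (tags.length < 2) return null; in combine() drops that group entirely; that check is what makes this an inner join. As a rough sketch (not from the book) of how combine() controls the join type, a left outer join that keeps customers without orders could instead require only the customers side to be present; this assumes the customers file is named "customers" so its tag is the string "customers":

        // Hypothetical left-outer-join variant of combine() for the Reduce class above.
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            // Only the orders side exists for this key: nothing to emit.
            if (tags.length < 2 && !tags[0].toString().equals("customers"))
                return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0)
                    joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                // Drop the join key from each record, as in the inner-join version.
                joinedStr += line.split(",", 2)[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }

With this variant, customer 4 would appear in the output with only the name and phone fields, since there is no order record to append.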