In the relational world, a semi-join is a join between two tables that returns rows from the first table for which one or more matches exist in the second table. The difference between a semi-join and a conventional join is that each row in the first table is returned at most once: even if the second table contains two matches for a row in the first table, only one copy of that row is returned.
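To make those semantics concrete, here is a minimal plain-Java sketch (not part of the MapReduce code below; the record layout, with the join key in the first column, is an assumption made only for illustration). A row from the first table is kept if its key appears in the second table's key set, and it is kept exactly once no matter how many matches exist:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SemiJoinSketch {
  // Return each row of `left` at most once if its key appears in `right`.
  static List<String[]> semiJoin(List<String[]> left, List<String[]> right) {
    Set<String> rightKeys = new HashSet<String>();
    for (String[] row : right) {
      rightKeys.add(row[0]);            // assume the key is the first column
    }
    List<String[]> result = new ArrayList<String[]>();
    for (String[] row : left) {
      if (rightKeys.contains(row[0])) { // emit the row once, however many matches exist
        result.add(row);
      }
    }
    return result;
  }
}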
In MapReduce, if you are faced with the challenge of joining two large datasets, the obvious choice is a repartition join, which uses the full MapReduce framework to perform the join on the reduce side. In fact, this may be your only option if you cannot filter one of the datasets down to a size that can be cached on the map side. However, if you believe one dataset can be pared down to a manageable size, you can use a semi-join instead: three MapReduce jobs that together join the two datasets while avoiding the overhead of a reduce-side join. The technique is useful when both datasets are large, but one of them can be shrunk to fit in a task's memory by filtering out the records that don't match the other dataset. The three jobs that make up the semi-join are: (1) extract the unique join keys from the large dataset, (2) use those keys in a replicated join to retain only the records of the small dataset that match, and (3) perform a replicated join of the large dataset against the filtered small dataset.
The semi-join leverages the replicated (map-side) join introduced before (you can find the details of the replicated join in this blog post). Without further ado, here is the code; you can also find it in Hadoop in Practice. The first job:
package com.manning.hip.ch4.joins.semijoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class UniqueHashedKeyJob {

  public static void main(String... args) throws Exception {
    runJob(new Path(args[0]), new Path(args[1]));
  }

  public static void runJob(Path inputPath, Path outputPath) throws Exception {
    Configuration conf = new Configuration();

    Job job = new Job(conf);
    job.setJarByClass(UniqueHashedKeyJob.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    outputPath.getFileSystem(conf).delete(outputPath, true);

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    if (!job.waitForCompletion(true)) {
      throw new Exception("Job failed");
    }
  }

  public static class Map extends Mapper<Text, Text, Text, NullWritable> {
    // Keys seen by this map task; the HashSet removes duplicates within the task.
    private Set<String> keys = new HashSet<String>();

    @Override
    protected void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
      System.out.println("K[" + key + "]");
      keys.add(key.toString());
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // Emit the de-duplicated keys once the task has seen all of its input.
      Text outputKey = new Text();
      for (String key : keys) {
        System.out.println("OutK[" + key + "]");
        outputKey.set(key);
        context.write(outputKey, NullWritable.get());
      }
    }
  }

  public static class Reduce
      extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException {
      // Each key reaches a single reducer, so writing it once
      // de-duplicates across all the map tasks.
      context.write(key, NullWritable.get());
    }
  }
}
The second one:
package com.manning.hip.ch4.joins.semijoin;

import com.manning.hip.ch4.joins.replicated.GenericReplicatedJoin;
import com.manning.hip.ch4.joins.replicated.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReplicatedFilterJob extends GenericReplicatedJoin {

  public static void main(String... args) throws Exception {
    runJob(new Path(args[0]), new Path(args[1]), new Path(args[2]));
  }

  public static void runJob(Path usersPath, Path uniqueUsersPath, Path outputPath)
      throws Exception {
    Configuration conf = new Configuration();

    // Put the unique keys produced by job 1 into the distributed cache.
    FileSystem fs = uniqueUsersPath.getFileSystem(conf);
    FileStatus uniqueUserStatus = fs.getFileStatus(uniqueUsersPath);
    if (uniqueUserStatus.isDir()) {
      for (FileStatus f : fs.listStatus(uniqueUsersPath)) {
        if (f.getPath().getName().startsWith("part")) {
          DistributedCache.addCacheFile(f.getPath().toUri(), conf);
        }
      }
    } else {
      DistributedCache.addCacheFile(uniqueUsersPath.toUri(), conf);
    }

    Job job = new Job(conf);
    job.setJarByClass(ReplicatedFilterJob.class);
    job.setMapperClass(ReplicatedFilterJob.class);
    job.setNumReduceTasks(0);   // map-only job
    job.setInputFormatClass(KeyValueTextInputFormat.class);

    outputPath.getFileSystem(conf).delete(outputPath, true);

    FileInputFormat.setInputPaths(job, usersPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    if (!job.waitForCompletion(true)) {
      throw new Exception("Job failed");
    }
  }

  @Override
  public Pair join(Pair inputSplitPair, Pair distCachePair) {
    // Return the input-split record unchanged: the replicated join degenerates
    // into a filter that keeps only records whose key was found in the cache.
    return inputSplitPair;
  }
}
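Both this filter job and the final join below extend GenericReplicatedJoin from the replicated-join post. If you don't have that code at hand, the following is only a rough sketch of the contract these two jobs rely on, not the book's actual implementation; the shape of the Pair class, the tab-separated parsing of the cached files, and the inner-join handling of misses are all assumptions:

// Pair.java (hypothetical stand-in: a join key plus the rest of the record)
package com.manning.hip.ch4.joins.replicated;

public class Pair {
  private final String key;
  private final String data;

  public Pair(String key, String data) {
    this.key = key;
    this.data = data;
  }

  public String getKey()  { return key; }
  public String getData() { return data; }
}

// GenericReplicatedJoin.java (hypothetical stand-in: a map-side join that caches
// the small dataset from the distributed cache and streams the large dataset
// through map())
package com.manning.hip.ch4.joins.replicated;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GenericReplicatedJoin extends Mapper<Text, Text, Text, Text> {

  // Join key -> records loaded from the distributed cache (the small dataset).
  private final Map<String, List<Pair>> cache = new HashMap<String, List<Pair>>();

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    if (files == null) {
      return;
    }
    for (Path file : files) {
      BufferedReader reader = new BufferedReader(new FileReader(file.toString()));
      try {
        String line;
        while ((line = reader.readLine()) != null) {
          String[] parts = line.split("\t", 2);   // assume tab-separated key/value
          Pair pair = new Pair(parts[0], parts.length > 1 ? parts[1] : "");
          List<Pair> pairs = cache.get(pair.getKey());
          if (pairs == null) {
            pairs = new ArrayList<Pair>();
            cache.put(pair.getKey(), pairs);
          }
          pairs.add(pair);
        }
      } finally {
        reader.close();
      }
    }
  }

  // Subclasses override this to decide what a joined record looks like;
  // ReplicatedFilterJob returns inputSplitPair to turn the join into a filter.
  public Pair join(Pair inputSplitPair, Pair distCachePair) {
    return new Pair(inputSplitPair.getKey(),
        inputSplitPair.getData() + "\t" + distCachePair.getData());
  }

  @Override
  protected void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    List<Pair> matches = cache.get(key.toString());
    if (matches == null) {
      return;   // inner-join semantics: records with no cached match are dropped
    }
    Pair input = new Pair(key.toString(), value.toString());
    for (Pair cached : matches) {
      Pair joined = join(input, cached);
      if (joined != null) {
        context.write(new Text(joined.getKey()), new Text(joined.getData()));
      }
    }
  }
}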
And the final one:
package com.manning.hip.ch4.joins.semijoin;

import com.manning.hip.ch4.joins.replicated.GenericReplicatedJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FinalJoinJob {

  public static void main(String... args) throws Exception {
    runJob(new Path(args[0]), new Path(args[1]), new Path(args[2]));
  }

  public static void runJob(Path userLogsPath, Path usersPath, Path outputPath)
      throws Exception {
    Configuration conf = new Configuration();

    // Put the filtered users produced by job 2 into the distributed cache.
    FileSystem fs = usersPath.getFileSystem(conf);
    FileStatus usersStatus = fs.getFileStatus(usersPath);
    if (usersStatus.isDir()) {
      for (FileStatus f : fs.listStatus(usersPath)) {
        if (f.getPath().getName().startsWith("part")) {
          DistributedCache.addCacheFile(f.getPath().toUri(), conf);
        }
      }
    } else {
      DistributedCache.addCacheFile(usersPath.toUri(), conf);
    }

    Job job = new Job(conf);
    job.setJarByClass(FinalJoinJob.class);
    // Use the stock replicated join to combine the user logs with the filtered users.
    job.setMapperClass(GenericReplicatedJoin.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(KeyValueTextInputFormat.class);

    outputPath.getFileSystem(conf).delete(outputPath, true);

    FileInputFormat.setInputPaths(job, userLogsPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    if (!job.waitForCompletion(true)) {
      throw new Exception("Job failed");
    }
  }
}
The driver class that chains the three jobs:
package com.manning.hip.ch4.joins.semijoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Main {

  public static void main(String... args) throws Exception {
    runJob(new Path(args[0]), new Path(args[1]), new Path(args[2]));
  }

  public static void runJob(Path smallFilePath, Path largeFilePath, Path workPath)
      throws Exception {
    Configuration conf = new Configuration();

    FileSystem fs = workPath.getFileSystem(conf);
    fs.delete(workPath, true);
    fs.mkdirs(workPath);

    /////////////////////////////////////////////////////
    // JOB 1 - Produce unique keys from the large file
    /////////////////////////////////////////////////////
    Path uniqueKeyOutputPath = new Path(workPath, "unique");
    UniqueHashedKeyJob.runJob(largeFilePath, uniqueKeyOutputPath);

    /////////////////////////////////////////////////////
    // JOB 2 - Use the unique keys from the large file to
    //         retain the contents of the small file that
    //         match
    /////////////////////////////////////////////////////
    Path filteredSmallOutputPath = new Path(workPath, "filtered");
    ReplicatedFilterJob.runJob(smallFilePath, uniqueKeyOutputPath,
        filteredSmallOutputPath);

    /////////////////////////////////////////////////////
    // JOB 3 - The final join
    /////////////////////////////////////////////////////
    Path resultOutputPath = new Path(workPath, "result");
    FinalJoinJob.runJob(largeFilePath, filteredSmallOutputPath, resultOutputPath);
  }
}
See the inline comments in the code above for the details of how each job is used.
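For reference, invoking the driver looks like the following. The paths and the wrapper class name are hypothetical and only illustrate which argument is which; note that the smaller dataset comes first and the larger one second.

import org.apache.hadoop.fs.Path;
import com.manning.hip.ch4.joins.semijoin.Main;

public class SemiJoinDriverExample {
  public static void main(String[] args) throws Exception {
    Main.runJob(
        new Path("/data/users.txt"),      // small dataset (hypothetical path)
        new Path("/data/user-logs.txt"),  // large dataset (hypothetical path)
        new Path("/tmp/semijoin-work"));  // work dir; gets unique/, filtered/ and result/ subdirectories
  }
}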