`
sunwinner
  • 浏览: 203764 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

MapReduce Algorithm - Semi-joins

 
阅读更多

In relational world, semi-join can be defined as a join between two tables returns rows from the first table where one or more matches are found in the second table. The difference between a semi-join and a conventional join is that rows in the first table will be returned at most once. Even if the second table contains two matches for a row in the first table, only one copy of the row will be returned. 

 

Well in MapReduce, if you faced with the challenge of joining two large datasets together, the obvious choice is to go with a repartition join, which leverages the full MapReduce framework to perform the join on the reduce-side. In fact, this may be you only option if you can not filter one of the datasets to a size that can be cached on the map side. However, if you believe that you can pare down one dataset to a manageable size, you can use semi-join to accomplish the work. You can use three MapReduce jobs to join two datasets together to avoid the overhead of a reducer-side join. This technique is useful in situations where you're working with large datasets, but a job can be reduced down to a size that can fit into memory of a task by filtering out records that don't match the other dataset. The three MapReduce jobs that comprise a semi-join can be illustrated as below diagram:

 In semi-join, we can leverage the replicated join introduced before (you can find details of replicated join or map-side join in this blog post).Without further ado, let me show you the code, also you can find them in Hadoop in Practice. The first job:

package com.manning.hip.ch4.joins.semijoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class UniqueHashedKeyJob {
    public static void main(String... args) throws Exception {
        runJob(new Path(args[0]), new Path(args[1]));
    }

    public static void runJob(Path inputPath,
                              Path outputPath)
            throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf);

        job.setJarByClass(UniqueHashedKeyJob.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        outputPath.getFileSystem(conf).delete(outputPath, true);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }

    public static class Map extends Mapper<Text, Text, Text, NullWritable> {
        private Set<String> keys = new HashSet<String>();

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("K[" + key + "]");
            keys.add(key.toString());
        }

        @Override
        protected void cleanup(
                Context context)
                throws IOException, InterruptedException {
            Text outputKey = new Text();
            for (String key : keys) {
                System.out.println("OutK[" + key + "]");
                outputKey.set(key);
                context.write(outputKey, NullWritable.get());
            }
        }
    }

    public static class Reduce
            extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                              Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

 The second one:

package com.manning.hip.ch4.joins.semijoin;

import com.manning.hip.ch4.joins.replicated.GenericReplicatedJoin;
import com.manning.hip.ch4.joins.replicated.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReplicatedFilterJob extends GenericReplicatedJoin {
    public static void main(String... args) throws Exception {
        runJob(new Path(args[0]), new Path(args[1]), new Path(args[2]));
    }

    public static void runJob(Path usersPath,
                              Path uniqueUsersPath,
                              Path outputPath)
            throws Exception {

        Configuration conf = new Configuration();

        FileSystem fs = uniqueUsersPath.getFileSystem(conf);

        FileStatus uniqueUserStatus = fs.getFileStatus(uniqueUsersPath);

        if (uniqueUserStatus.isDir()) {
            for (FileStatus f : fs.listStatus(uniqueUsersPath)) {
                if (f.getPath().getName().startsWith("part")) {
                    DistributedCache.addCacheFile(f.getPath().toUri(), conf);
                }
            }
        } else {
            DistributedCache.addCacheFile(uniqueUsersPath.toUri(), conf);
        }

        Job job = new Job(conf);

        job.setJarByClass(ReplicatedFilterJob.class);
        job.setMapperClass(ReplicatedFilterJob.class);

        job.setNumReduceTasks(0);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        outputPath.getFileSystem(conf).delete(outputPath, true);

        FileInputFormat.setInputPaths(job, usersPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }

    @Override
    public Pair join(Pair inputSplitPair, Pair distCachePair) {
        return inputSplitPair;
    }
}

 And the final one:

package com.manning.hip.ch4.joins.semijoin;

import com.manning.hip.ch4.joins.replicated.GenericReplicatedJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FinalJoinJob {
    public static void main(String... args) throws Exception {
        runJob(new Path(args[0]), new Path(args[1]), new Path(args[2]));
    }

    public static void runJob(Path userLogsPath,
                              Path usersPath,
                              Path outputPath)
            throws Exception {

        Configuration conf = new Configuration();

        FileSystem fs = usersPath.getFileSystem(conf);

        FileStatus usersStatus = fs.getFileStatus(usersPath);

        if (usersStatus.isDir()) {
            for (FileStatus f : fs.listStatus(usersPath)) {
                if (f.getPath().getName().startsWith("part")) {
                    DistributedCache.addCacheFile(f.getPath().toUri(), conf);
                }
            }
        } else {
            DistributedCache.addCacheFile(usersPath.toUri(), conf);
        }

        Job job = new Job(conf);

        job.setJarByClass(FinalJoinJob.class);
        job.setMapperClass(GenericReplicatedJoin.class);

        job.setNumReduceTasks(0);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        outputPath.getFileSystem(conf).delete(outputPath, true);

        FileInputFormat.setInputPaths(job, userLogsPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }
}

 The jobs driver class:

package com.manning.hip.ch4.joins.semijoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Main {
    public static void main(String... args) throws Exception {
        runJob(new Path(args[0]), new Path(args[1]), new Path(args[2]));
    }

    public static void runJob(Path smallFilePath,
                              Path largeFilePath,
                              Path workPath)
            throws Exception {

        Configuration conf = new Configuration();

        FileSystem fs = workPath.getFileSystem(conf);
        fs.delete(workPath, true);

        fs.mkdirs(workPath);


        /////////////////////////////////////////////////////
        // JOB 1 - Produce unique keys from the large file
        /////////////////////////////////////////////////////
        Path uniqueKeyOutputPath = new Path(workPath, "unique");
        UniqueHashedKeyJob.runJob(largeFilePath, uniqueKeyOutputPath);

        /////////////////////////////////////////////////////
        // JOB 2 - Use the unique keys from the large file to
        //         retain the contents of the small file that
        //         match
        /////////////////////////////////////////////////////
        Path filteredSmallOutputPath = new Path(workPath, "filtered");
        ReplicatedFilterJob.runJob(smallFilePath, uniqueKeyOutputPath,
                filteredSmallOutputPath);

        /////////////////////////////////////////////////////
        // JOB 3 - The final join
        /////////////////////////////////////////////////////
        Path resultOutputPath = new Path(workPath, "result");
        FinalJoinJob.runJob(largeFilePath, filteredSmallOutputPath, resultOutputPath);
    }

}

 See the comments inline with the code to find the detailed usage.

  • 大小: 99.1 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics