The “in-mapper combining” design pattern for Map/Reduce programming in Java

 

Introduction

I am reading a book by Lin and Dyer (2010). This book is very informative about designing efficient algorithms under the Map/Reduce (M/R) programming paradigm. Of particular interest is the "in-mapper combining" design pattern that I came across while reading this book. As if engineers and data miners did not have to change their way of thinking enough while adapting to the M/R programming paradigm, our change in thinking and development must also be sensitive to the particular M/R framework. The in-mapper combining design pattern is meant to address some issues with M/R programming, and in particular, M/R programming on the Hadoop platform. In this blog I will discuss the in-mapper combining design pattern and show some examples. This design pattern seems to me an excellent technical screening problem, if you are so (un)fortunate. :) Hereafter, I will refer to the in-mapper combining design pattern by the acronym IMCDP.

Need for the design pattern

IMCDP may result in a more efficient implementation. I would not say it results in a more efficient algorithm, per se, since it does not necessarily change the running-time complexity. In Lin's book, the driving factor for IMCDP is to "substantially reduce both the number and size of key-value pairs that need to be shuffled from the mappers to the reducers."

In the canonical example of word counting, a key-value pair is emitted for every word found. For example, if we had 1,000 words, then 1,000 key-value pairs would be emitted from the mappers to the reducer(s); the line "to be or not to be" alone would emit (to, 1), (be, 1), (or, 1), (not, 1), (to, 1), and (be, 1). In between this handing off of data from the mappers to the reducer(s), a shuffle-and-sort step on the key-value pairs occurs. If the number of "intermediary" key-value pairs (the key-value pairs being sent from the mappers to the reducer(s)) is extremely high, then this volume of data can become the bottleneck for the overall M/R job. With IMCDP, the idea is to reduce the number of intermediary key-value pairs being sent from the mappers to the reducer(s).

Of course, you may be wondering: why not just use a combiner (the mini-reducer)? The reason is that even if we explicitly set one for an M/R job, Hadoop may or may not actually run the combiner; the framework treats it as an optional optimization. With IMCDP, the engineer can explicitly and deterministically control how the number of intermediary key-value pairs is reduced.
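
For reference, registering a combiner takes a single call on the job, shown below. This is a minimal sketch assuming the standard word count setup, where the summing reducer (IntSumReducer in the Hadoop tutorial) doubles as the combiner; it is not code from Lin's book.

// Registering a combiner on a Hadoop job. The framework treats this as an
// optional optimization: it may invoke the combiner zero, one, or many times
// per map task, so correctness must never depend on it running.
job.setCombinerClass(IntSumReducer.class);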

Canonical word count mapper

The canonical word count mapper program is shown below. As stated before, for every word found, a corresponding intermediary key-value pair is emitted from the mapper to the reducer; if we had 1,000,000 words, then 1,000,000 key-value pairs would be emitted. We should try to optimize the implementation of this word count mapper by reducing the amount of data that needs to be passed from the mapper to the reducer.

// Imports needed: java.io.IOException, java.util.StringTokenizer,
// org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text,
// org.apache.hadoop.mapreduce.Mapper
public static class TokenizerMapper
 extends Mapper<Object, Text, Text, IntWritable> {
 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public void map(Object key, Text value, Context context)
  throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
   word.set(itr.nextToken());
   context.write(word, one); // one intermediary pair per token
  }
 }
}
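
For context, the reducer these mappers feed is typically the standard summing reducer from the Hadoop word count example. A minimal sketch follows (my own, assumed for illustration; every mapper variant in this post pairs with it unchanged). It requires org.apache.hadoop.mapreduce.Reducer.

public static class IntSumReducer
 extends Reducer<Text, IntWritable, Text, IntWritable> {
 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) { // sum the partial counts for this word
   sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
 }
}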

A local IMCDP

In Lin's book, he suggests that we use an associative array (in Java, a Map) to store the words and their associated frequencies. After we have counted all the words in the incoming text, we emit each word and its associated frequency. I have modified the canonical word count mapper as shown below.

// Also needs: java.util.Map, java.util.HashMap, java.util.Iterator
public static class TokenizerMapper
 extends Mapper<Object, Text, Text, IntWritable> {

 public void map(Object key, Text value, Context context)
  throws IOException, InterruptedException {
  Map<String, Integer> map = new HashMap<String, Integer>(); // per-call associative array
  StringTokenizer itr = new StringTokenizer(value.toString());

  while (itr.hasMoreTokens()) {
   String token = itr.nextToken();
   if (map.containsKey(token)) {
    int total = map.get(token).intValue() + 1;
    map.put(token, total);
   } else {
    map.put(token, 1);
   }
  }

  // emit one pair per distinct word in this input record
  Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
  while (it.hasNext()) {
   Map.Entry<String, Integer> entry = it.next();
   String sKey = entry.getKey();
   int total = entry.getValue().intValue();
   context.write(new Text(sKey), new IntWritable(total));
  }
 }
}

I refer to this IMCDP approach as a local one because the associative array is local with respect to the method. As you can see, instead of emitting one key-value pair per token, we now emit one key-value pair per distinct word in the input record.

A global IMCDP

Lin suggests we may do even better than the local IMCDP with a global IMCDP approach. Instead of using an associative array per input key-value pair, we use one associative array per mapper. The associative array lives outside of the method, as an instance field, so I refer to this approach as global. However, no matter the strategy or name, both approaches, local and global IMCDP, are still local aggregation techniques with respect to the mapper. The code below shows the global IMCDP approach.

public static class TokenizerMapper
 extends Mapper<Object, Text, Text, IntWritable> {

 private Map<String, Integer> map; // shared across all map() calls in this task

 public void map(Object key, Text value, Context context)
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  StringTokenizer itr = new StringTokenizer(value.toString());

  while (itr.hasMoreTokens()) {
   String token = itr.nextToken();
   if (map.containsKey(token)) {
    int total = map.get(token).intValue() + 1;
    map.put(token, total);
   } else {
    map.put(token, 1);
   }
  }
 }

 protected void cleanup(Context context)
  throws IOException, InterruptedException {
  // called once at the end of the task; emit the aggregated counts
  Map<String, Integer> map = getMap();
  Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
  while (it.hasNext()) {
   Map.Entry<String, Integer> entry = it.next();
   String sKey = entry.getKey();
   int total = entry.getValue().intValue();
   context.write(new Text(sKey), new IntWritable(total));
  }
 }

 public Map<String, Integer> getMap() {
  if (null == map) // lazy loading
   map = new HashMap<String, Integer>();
  return map;
 }
}

As you can see, we override Mapper's cleanup method, which is called only once, at the end of the task before the Mapper is destroyed, to emit each word and its associated frequency. With the global IMCDP, we potentially emit even fewer key-value pairs to the reducer.

The global IMCDP approach may run into a memory limitation. If the associative array grows to the point where memory runs out, the mapper task will crash. Lin suggests "flushing" the associative array every so often. Below, I show one way to flush the associative array.

public static class TokenizerMapper
 extends Mapper<Object, Text, Text, IntWritable> {

 private static final int FLUSH_SIZE = 1000;
 private Map<String, Integer> map;

 public void map(Object key, Text value, Context context)
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  StringTokenizer itr = new StringTokenizer(value.toString());

  while (itr.hasMoreTokens()) {
   String token = itr.nextToken();
   if (map.containsKey(token)) {
    int total = map.get(token).intValue() + 1;
    map.put(token, total);
   } else {
    map.put(token, 1);
   }
  }

  flush(context, false); // flush only if the map has grown past FLUSH_SIZE
 }

 private void flush(Context context, boolean force)
  throws IOException, InterruptedException {
  Map<String, Integer> map = getMap();
  if (!force) {
   int size = map.size();
   if (size < FLUSH_SIZE)
    return;
  }

  Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
  while (it.hasNext()) {
   Map.Entry<String, Integer> entry = it.next();
   String sKey = entry.getKey();
   int total = entry.getValue().intValue();
   context.write(new Text(sKey), new IntWritable(total));
  }

  map.clear(); // make sure to empty the map after flushing
 }

 protected void cleanup(Context context)
  throws IOException, InterruptedException {
  flush(context, true); // force flush no matter what at the end
 }

 public Map<String, Integer> getMap() {
  if (null == map) // lazy loading
   map = new HashMap<String, Integer>();
  return map;
 }
}
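
For completeness, a minimal driver that wires one of these mappers into a job might look like the sketch below. The WordCount class name, the IntSumReducer, and the path arguments are assumptions for illustration, not part of Lin's text.

public class WordCount {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "word count with in-mapper combining");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class); // any of the IMCDP variants above
  job.setReducerClass(IntSumReducer.class);  // standard summing reducer (assumed)
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}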

Summary and conclusion

IMCDP is a way to possibly improve the speed of an M/R job by reducing the number of intermediary key-value pairs emitted from mappers to reducers. Unlike a combiner, which may or may not run at all, IMCDP is under the engineer's control and will always run when implemented. There are several ways to implement IMCDP, what I refer to as local and global IMCDP, but both are local aggregation design patterns with respect to the mapper.

As always, I hope you enjoyed reading and that this blog post helped you. Cheers! See you again! Shalom!

Reference

 

Jimmy Lin and Chris Dyer. Data-Intensive Text Processing with MapReduce. Morgan & Claypool Publishers, 2010.

Original post: https://vangjee.wordpress.com/2012/03/07/the-in-mapper-combining-design-pattern-for-mapreduce-programming/
