
Making Hadoop MapReduce Work with a Redis Cluster

 

Redis is a very cool open-source key-value store that can add instant value to your Hadoop installation. Since its values can be strings, hashes, lists, sets, and sorted sets, Redis can be used as a front end to serve data out of Hadoop, caching your ‘hot’ pieces of data in-memory for fast access when they are needed again. Using a Java client called Jedis, you can ingest and retrieve data with Redis. Combining this simple client with the power of MapReduce lets you write and read data to and from Redis in parallel.
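
As a quick orientation, here is a minimal sketch of basic Jedis usage against a single Redis hash. It is not from the original post; the hostname, hash name, and field/value strings are placeholders.

import redis.clients.jedis.Jedis;

public class JedisHashSketch {
    public static void main(String[] args) {
        // Connect to a single Redis instance (hostname is a placeholder)
        Jedis jedis = new Jedis("localhost");
        jedis.connect();

        // Write a field/value pair into a hash named "user-to-value"
        jedis.hset("user-to-value", "alice", "some payload");

        // Read a single field back, then the whole hash
        String one = jedis.hget("user-to-value", "alice");
        System.out.println("alice -> " + one);
        System.out.println("all fields: " + jedis.hgetAll("user-to-value"));

        jedis.disconnect();
    }
}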

 

In the code below, we use MapReduce to push and pull key/value pairs to and from any number of standalone Redis instances. We will be writing to, and reading from, a Redis hash, which maps string fields to string values, much like a Java HashMap. Each hash is uniquely identified by a hash key, similar to a table name. Each input and output format has two core configuration parameters: a CSV list of hostnames running a Redis instance, and the hash key. Similar to Hadoop’s default HashPartitioner, (key.hashCode() % number of Redis instances) is used to determine which Redis instance a key is written to. This hash-based distribution will result in roughly even data placement, so long as your key-space isn’t skewed – but solving that problem is a topic for another post.
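
To make that selection logic concrete, here is a minimal sketch using hypothetical hostnames; it mirrors the modulo scheme the RecordWriter below applies to its Text keys.

public class RedisInstancePicker {
    public static void main(String[] args) {
        // Hypothetical CSV host list, matching the configuration format used by the formats below
        String[] redisHosts = "redis-01,redis-02,redis-03".split(",");
        String key = "some-user-id";

        // Same modulo scheme the RecordWriter uses to choose a target instance
        int instance = Math.abs(key.hashCode()) % redisHosts.length;
        System.out.println(key + " -> " + redisHosts[instance]);
    }
}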


 With that said, let’s take a look at all the code. Pay attention to the comments, as they’ll tell you what is going on. First up is an implementation of OutputFormat. This class defines the key/value data types and behavior for writing to Redis instances via Jedis.

 // This output format class is templated to accept a key and value of type Text
public static class RedisHashOutputFormat extends OutputFormat<Text, Text> {

// These static conf variables and methods are used to modify the job configuration. 
// This is a common pattern for MapReduce related classes to avoid the magic string problem
public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
public static final String REDIS_HASH_KEY_CONF = "mapred.redishashoutputformat.key";

public static void setRedisHosts(Job job, String hosts) {
job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
}

public static void setRedisHashKey(Job job, String hashKey) {
job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
}

// This method returns an instance of a RecordWriter for the task. 
// Note how we are pulling the variables set by the static methods during configuration
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
String csvHosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
return new RedisHashRecordWriter(hashKey, csvHosts);
}

// This method is used on the front-end prior to job submission to ensure everything is configured correctly
public void checkOutputSpecs(JobContext job) throws IOException {
String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
if (hosts == null || hosts.isEmpty()) {
throw new IOException(REDIS_HOSTS_CONF + " is not set in configuration.");
}

String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
if (hashKey == null || hashKey.isEmpty()) {
throw new IOException(REDIS_HASH_KEY_CONF + " is not set in configuration.");
}
}

// The output committer is used on the back-end to, well, commit output.
// Discussion of this class is out of scope, but more info can be found in the Hadoop documentation
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
// Use the no-op committer from NullOutputFormat, since we write directly to Redis rather than to the file system
return (new NullOutputFormat<Text, Text>()).getOutputCommitter(context);
}

public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {
// implementation of this static nested class is shown shortly
}
} // end RedisHashOutputFormat

 

The role of OutputFormat is to properly configure the job, ensuring that the RecordWriter implementation has everything it needs to work correctly. Once configured, the RecordWriter is what actually writes key/value pairs wherever you want them to go. A common practice is to make your RecordWriter (or reader) a static nested class, but that isn’t required. Let’s take a look at an implementation of RecordWriter:
// This class is templated to write only Text keys and Text values
public static class RedisHashRecordWriter extends RecordWriter<Text, Text> {

    // This map is used to map an integer to a Jedis instance
    private HashMap<Integer, Jedis> jedisMap = new HashMap<Integer, Jedis>();

    // This is the name of the Redis hash
    private String hashKey = null;

    public RedisHashRecordWriter(String hashKey, String hosts) {
        this.hashKey = hashKey;

        // Create a connection to Redis for each host
        // Map an integer 0-(numRedisInstances - 1) to the instance
        int i = 0;
        for (String host : hosts.split(",")) {
            Jedis jedis = new Jedis(host);
            jedis.connect();
            jedisMap.put(i++, jedis);
        }
    }

    // The write method is what will actually write the key/value pairs out to Redis
    public void write(Text key, Text value) throws IOException, InterruptedException {
        // Get the Jedis instance that this key/value pair will be written to
        Jedis j = jedisMap.get(Math.abs(key.hashCode()) % jedisMap.size());

        // Write the key/value pair
        j.hset(hashKey, key.toString(), value.toString());
    }

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // For each Jedis instance, disconnect it
        for (Jedis jedis : jedisMap.values()) {
            jedis.disconnect();
        }
    }
} // end RedisHashRecordWriter

 

 

This code demonstrates how simple it is to hook into external hosts for output. Such lightweight interfaces allow for endless possibilities, so long as the custom output formats can handle the parallel load of many map or reduce tasks.

Next up, let’s take a look at the InputFormat code to pull data out of our Redis instances. This is a bit more complex, as we’ll use a custom InputSplit implementation as well.

We create an InputSplit for each Redis host, and a map task is created from each InputSplit. A single map task pulls all of the data from its assigned Redis instance.
// This input format will read all the data from a given set of Redis hosts
public static class RedisHashInputFormat extends InputFormat<Text, Text> {

// Again, the CSV list of hosts and a hash key variables and methods for configuration
public static final String REDIS_HOSTS_CONF = "mapred.redishashinputformat.hosts";
public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";

public static void setRedisHosts(Job job, String hosts) {
job.getConfiguration().set(REDIS_HOSTS_CONF, hosts);
}

public static void setRedisHashKey(Job job, String hashKey) {
job.getConfiguration().set(REDIS_HASH_KEY_CONF, hashKey);
}

// This method will return a list of InputSplit objects.  The framework uses this to create an equivalent number of map tasks
public List<InputSplit> getSplits(JobContext job) throws IOException {

// Get our configuration values and ensure they are set
String hosts = job.getConfiguration().get(REDIS_HOSTS_CONF);
if (hosts == null || hosts.isEmpty()) {
throw new IOException(REDIS_HOSTS_CONF + " is not set in configuration.");
}

String hashKey = job.getConfiguration().get(REDIS_HASH_KEY_CONF);
if (hashKey == null || hashKey.isEmpty()) {
throw new IOException(REDIS_HASH_KEY_CONF + " is not set in configuration.");
}

// Create an input split for each Redis instance
// More on this custom split later, just know that one is created per host
List<InputSplit> splits = new ArrayList<InputSplit>();
for (String host : hosts.split(",")) {
splits.add(new RedisHashInputSplit(host, hashKey));
}

return splits;
}

// This method creates an instance of our RedisHashRecordReader
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new RedisHashRecordReader();
}

public static class RedisHashRecordReader extends RecordReader<Text, Text> {
// implementation of this static nested class is shown shortly
}

public static class RedisHashInputSplit extends InputSplit implements Writable {
// implementation of this static nested class is shown shortly
}

} // end RedisHashInputFormat
 
Only two methods are required to adhere to the InputFormat abstract class: getSplits and createRecordReader. The example above demonstrates how simple it is to hook into external sources for input. The remaining static methods and variables are used to configure the job for the needs of the InputFormat and RecordReader implementations.
// This custom RecordReader will pull in all key/value pairs from a Redis instance for a given hash
public static class RedisHashRecordReader extends RecordReader<Text, Text> {

// A number of member variables to iterate and store key/value pairs from Redis
private Iterator<Entry<String, String>> keyValueMapIter = null;
private Text key = new Text(), value = new Text();
private float processedKVs = 0, totalKVs = 0;
private Entry<String, String> currentEntry = null;

// Initialize is called by the framework and given an InputSplit to process
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {

// Get the host location from the InputSplit
String host = split.getLocations()[0];
String hashKey = ((RedisHashInputSplit) split).getHashKey();

// Create a new connection to Redis
Jedis jedis = new Jedis(host);
jedis.connect();
jedis.getClient().setTimeoutInfinite();

// Get all the key/value pairs from the Redis instance and store them in memory
totalKVs = jedis.hlen(hashKey);
keyValueMapIter = jedis.hgetAll(hashKey).entrySet().iterator();
LOG.info("Got " + totalKVs + " from " + hashKey); jedis.disconnect();
}

// This method is called by Mapper’s run method to ensure all key/value pairs are read
public boolean nextKeyValue() throws IOException, InterruptedException {
if (keyValueMapIter.hasNext()) {
// Get the current entry and set the Text objects to the entry
currentEntry = keyValueMapIter.next();
key.set(currentEntry.getKey());
value.set(currentEntry.getValue());

// Track how many pairs have been consumed so getProgress can report it
++processedKVs;
return true;
} else {
return false;
}
}

// The next two methods are to return the current key/value pairs.  Best practice is to re-use objects rather than create new ones, i.e. don’t use “new”
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

// This method is used to report the progress metric back to the framework.  It is not required to have a true implementation, but it is recommended.
public float getProgress() throws IOException, InterruptedException {
return processedKVs / totalKVs;
}

public void close() throws IOException {
/* nothing to do */
}

} // end RedisHashRecordReader

Now that we’ve implemented a RecordReader, we need to determine what data it reads. That is defined by an InputSplit implementation, which is used to initialize the reader. The number of input splits determines the number of map tasks created by the framework.

This is a pretty simple task with Redis. We will create one map task for each Redis instance, each of which hosts a shard of our total data set. Each mapper will then connect to a single Redis instance and pull all of the data for the hash. This all happens in parallel, similar to how MapReduce reads a file in parallel by reading its blocks. Splitting this way also means we won’t overload a single Redis instance with too many connections. This is where InputSplit comes in.

public static class RedisHashInputSplit extends InputSplit implements Writable {

// Two member variables, the hostname and the hash key (table name)
private String location = null;
private String hashKey = null;

public RedisHashInputSplit() {
// Default constructor required for reflection
}

public RedisHashInputSplit(String redisHost, String hash) {
this.location = redisHost;
this.hashKey = hash;
}

public String getHashKey() {
return this.hashKey;
}

// The following two methods are used to serialize the input information for an individual task
public void readFields(DataInput in) throws IOException {
this.location = in.readUTF();
this.hashKey = in.readUTF();
}

public void write(DataOutput out) throws IOException {
out.writeUTF(location);
out.writeUTF(hashKey);
}

// This gets the size of the split so the framework can sort them by size.  This isn’t that important here, but we could query a Redis instance and get the bytes if we desired
public long getLength() throws IOException, InterruptedException {
return 0;
}

// This method returns hints to the framework of where to launch a task for data locality
public String[] getLocations() throws IOException, InterruptedException {
return new String[] { location };
}

} // end RedisHashInputSplit
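
To show how these pieces could fit together, here is a hedged driver sketch. It is not from the original post: the hostnames, HDFS paths, hash key, and the tab-delimited input assumption are all placeholders of mine, and it assumes RedisHashOutputFormat and RedisHashInputFormat from above are on the classpath.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RedisDriver {

    // Hypothetical mapper: turns tab-delimited "field<TAB>value" lines into Text/Text pairs
    public static class ToRedisMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t", 2);
            if (parts.length == 2) {
                outKey.set(parts[0]);
                outValue.set(parts[1]);
                context.write(outKey, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String hosts = "redis-01,redis-02,redis-03"; // placeholder hostnames

        // Job 1: map-only job that pushes key/value pairs from HDFS into Redis
        Job writeJob = Job.getInstance(conf, "hdfs-to-redis");
        writeJob.setJarByClass(RedisDriver.class);
        writeJob.setMapperClass(ToRedisMapper.class);
        writeJob.setNumReduceTasks(0);
        writeJob.setOutputKeyClass(Text.class);
        writeJob.setOutputValueClass(Text.class);
        writeJob.setOutputFormatClass(RedisHashOutputFormat.class);
        RedisHashOutputFormat.setRedisHosts(writeJob, hosts);
        RedisHashOutputFormat.setRedisHashKey(writeJob, "user-to-value");
        FileInputFormat.addInputPath(writeJob, new Path("/input/path"));
        writeJob.waitForCompletion(true);

        // Job 2: map-only job that reads the hash back out of Redis and writes it to HDFS
        Job readJob = Job.getInstance(conf, "redis-to-hdfs");
        readJob.setJarByClass(RedisDriver.class);
        readJob.setNumReduceTasks(0);
        readJob.setOutputKeyClass(Text.class);
        readJob.setOutputValueClass(Text.class);
        readJob.setInputFormatClass(RedisHashInputFormat.class);
        RedisHashInputFormat.setRedisHosts(readJob, hosts);
        RedisHashInputFormat.setRedisHashKey(readJob, "user-to-value");
        FileOutputFormat.setOutputPath(readJob, new Path("/output/path"));
        readJob.waitForCompletion(true);
    }
}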
 

This demonstrates how to customize input and output in the MapReduce framework for Redis. Though it’s often overlooked, customizing I/O is a useful way to make MapReduce more flexible. If I’ve piqued your interest and you want to know more about customizing MapReduce I/O, check out chapter seven of MapReduce Design Patterns (O’Reilly 2012), “Input and Output Patterns.”

When implementing your own custom formats for other external sources, be mindful of how well those sources can scale and what would happen if a task fails and is tried again. In some cases, such as this one, that doesn’t really matter: data that was already written to Redis is simply overwritten with a fresh copy, and data pulled from Redis is simply pulled again on the next attempt. Other cases, for example writing to a Redis list rather than a hash, would require a little more effort. In that scenario, task retries would add duplicate entries to the list. It would take additional engineering to roll back committed entries, but it would be worth the effort to ensure more fault-tolerant external outputs.
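
To make the contrast concrete, here is a small sketch (hostname and key names are placeholders, not from the original post): re-running hset for the same field simply overwrites the value, while re-running rpush appends a second copy.

import redis.clients.jedis.Jedis;

public class RetrySemanticsSketch {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost"); // placeholder host
        jedis.connect();

        // Hash writes are idempotent: running this twice leaves one field holding the latest value
        jedis.hset("user-to-value", "alice", "v1");
        jedis.hset("user-to-value", "alice", "v1"); // simulated task retry, no duplicate

        // List writes are not: running this twice leaves two identical entries in the list
        jedis.rpush("event-log", "alice:v1");
        jedis.rpush("event-log", "alice:v1"); // simulated task retry, duplicate entry

        System.out.println("hash size: " + jedis.hlen("user-to-value")); // 1
        System.out.println("list size: " + jedis.llen("event-log"));     // 2

        jedis.disconnect();
    }
}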

Now get out there and write some custom formats of your own!

References

http://blog.gopivotal.com/pivotal/products/making-hadoop-mapreduce-work-with-a-redis-cluster

 

