`

infinispan5的MapReduce使用

    博客分类:
  • J2EE
 
阅读更多

参考

https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-MapReducemodel

 

 

首先下载我们的infinispan5的运行环境

https://github.com/infinispan/infinispan-quickstart.git

 

 

中org.infinispan.quickstart.clusteredcache.distribution.NodeClient类为

 

 

package org.infinispan.quickstart.clusteredcache.distribution;

import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.Map.Entry;

import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.config.Configuration.CacheMode;
import org.infinispan.distexec.mapreduce.Collator;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

/**
 * @author :xiaofancn
 * @version :2011-11-30 下午10:15:44
 * 
 */

public class NodeClient {

	public static EmbeddedCacheManager createCacheManagerProgramatically() {

		return new DefaultCacheManager(GlobalConfiguration
				.getClusteredDefault().fluent().transport()
				.addProperty("configurationFile", "jgroups.xml").build(),
				new Configuration().fluent().mode(CacheMode.DIST_ASYNC).hash()
						.numOwners(2).build());
	}

	static class WordCountMapper implements
			Mapper<String, String, String, Integer> {
		private static final long serialVersionUID = 1L;

		@Override
		public void map(String key, String value, Collector<String, Integer> c) {
			StringTokenizer tokens = new StringTokenizer(value);
			while (tokens.hasMoreElements()) {
				System.out.println(key + "=================" + value);
				String s = (String) tokens.nextElement();
				c.emit(s, 1);
			}
		}
	}

	static class WordCountReducer implements Reducer<String, Integer> {
		/** The serialVersionUID */
		private static final long serialVersionUID = 1901016598354633256L;

		@Override
		public Integer reduce(String key, Iterator<Integer> iter) {
			int sum = 0;
			while (iter.hasNext()) {
				Integer i = (Integer) iter.next();
				sum += i;
			}
			System.out.println(key+":"+sum);
			return sum;

		}
	}

	public static void main(String[] args) {

		Cache<String, String> cache = createCacheManagerProgramatically()
				.getCache("Demo");
		
		
		int count = 1000000000;
		for (int i = 0; i < count; i++) {
			cache.put("" + i, "" + i);
		}
		cache.put("" + count, "" + 4);

		System.out.println(new Date());
		Long start = System.currentTimeMillis();

		MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(
				cache);
		task.mappedWith(new WordCountMapper()).reducedWith(
				new WordCountReducer());
		String mostFrequentWord = task
				.execute(new Collator<String, Integer, String>() {
					@Override
					public String collate(Map<String, Integer> reducedResults) {
						String mostFrequent = "";
						int maxCount = 0;
						for (Entry<String, Integer> e : reducedResults
								.entrySet()) {
							Integer count = e.getValue();
							if (count > maxCount) {
								maxCount = count;
								mostFrequent = e.getKey();
							}
						}
						return mostFrequent;
					}

				});
		System.out.println(new Date());
		System.out.println(System.currentTimeMillis() - start);
		System.out.println("The most frequent word is " + mostFrequentWord);

	}

}
 

分别运行

org.infinispan.quickstart.clusteredcache.distribution.Node0

org.infinispan.quickstart.clusteredcache.distribution.Node1

org.infinispan.quickstart.clusteredcache.distribution.Node2

  org.infinispan.quickstart.clusteredcache.distribution.NodeClient  

分享到:
评论

相关推荐

    castmapr:CastMapR - 用于 Hazelcast 的 MapReduce - 停产!

    CastMapR 的 API 深受 Infinispan 的 MapReduce 实现的启发(感谢 RedHat 的人!),它完全透明地可用于 HazelcastClient 连接。 CastMapR 支持在 IMap、IList 和 MultiMap 上执行 MapReduce 任务,而

    基于Java的分布式计算的研究.zip

    例如,分散的集合可以在节点间共享,通过分布式缓存(如Hazelcast或Infinispan)实现高效的数据交换。 5. **ZooKeeper** ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着...

    基于Java技术的分布式系统的研究与设计.pdf

    6. **Java CORBA (Common Object Request Broker Architecture)**: 虽然现在较少使用,但CORBA曾是Java实现分布式计算的重要工具,提供了一种标准化的方式来使不同系统间的对象相互通信。 7. **Spring Framework**:...

    Java网络编程与分布式计算

    5. 分布式缓存:如Apache Ignite、Hazelcast和Infinispan等,这些Java库提供分布式缓存功能,可以跨多个节点存储和检索数据,提高性能。 6. 分布式数据库:例如Cassandra、MongoDB和HBase等,它们提供跨多台机器的...

    SSD8 Manning - Distributed Programming with JAVA

    《SSD8 Manning - Distributed Programming with JAVA》是关于使用Java进行分布式编程的参考资料,主要针对想要深入理解并实践Java在分布式系统中的应用的学习者。分布式编程是现代软件开发中的一个重要领域,它允许...

Global site tag (gtag.js) - Google Analytics