`
liudunxu2
  • 浏览: 31998 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
文章分类
社区版块
存档分类
最新评论

xmemcached中的一致性hash实现

 
阅读更多
1.内部使用TreeMap<Long, List<Session>>实现session存储和定位,其中key为hash值,value为这个hash值对应的session存储集合。其中并不是有多少session就有几个键值,而是会有NUM_REPS(默认为160)*session.size个键值。还需要与session的权重相乘
2.使用treemap的tailMap方法高效定位session位置(为的是与jdk1.5兼容,jdk6使用的是ceilingKey,metaq做消费者负载均衡默认使用的就是ceilingKey)。
3.性能与hash算法密切相关,因为使用的是sessionList.get(this.random.nextInt(size))方式,如果在buildmap时,多个session的hash映射到同一个key,使用get方式获取时,可能得不到正确的缓存位置。
4.使用虚拟节点,默认个数为session的个数的160倍,也跟session的权重有关,具体一致性hash的虚拟节点介绍,可以参考http://blog.csdn.net/x15594/article/details/6270242
/**
 *Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)]
 *Licensed under the Apache License, Version 2.0 (the "License");
 *you may not use this file except in compliance with the License.
 *You may obtain a copy of the License at
 *             http://www.apache.org/licenses/LICENSE-2.0
 *Unless required by applicable law or agreed to in writing,
 *software distributed under the License is distributed on an "AS IS" BASIS,
 *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 *either express or implied. See the License for the specific language governing permissions and limitations under the License
 */
package net.rubyeye.xmemcached.impl;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;

import net.rubyeye.xmemcached.HashAlgorithm;
import net.rubyeye.xmemcached.networking.MemcachedSession;

import com.google.code.yanf4j.core.Session;

/**
 * ConnectionFactory instance that sets up a ketama compatible connection.
 *
 * <p>
 * This implementation piggy-backs on the functionality of the
 * <code>DefaultConnectionFactory</code> in terms of connections and queue
 * handling. Where it differs is that it uses both the <code>
 * KetamaNodeLocator</code>
 * and the <code>HashAlgorithm.KETAMA_HASH</code> to provide consistent node
 * hashing.
 *
 * @see http://www.last.fm/user/RJ/journal/2007/04/10/392555/
 *
 * </p>
 */
/**
 * Consistent Hash Algorithm implementation,based on TreeMap.tailMap(hash)
 * method.
 * 
 * @author dennis
 * 
 */
public class KetamaMemcachedSessionLocator extends
AbstractMemcachedSessionLocator {

	static final int NUM_REPS = 160;
	private transient volatile TreeMap<Long, List<Session>> ketamaSessions = new TreeMap<Long, List<Session>>();
	private final HashAlgorithm hashAlg;
	private volatile int maxTries;
	private final Random random = new Random();

	/**
	 * compatible with nginx-upstream-consistent,patched by wolfg1969
	 */
	static final int DEFAULT_PORT = 11211;
	private final boolean cwNginxUpstreamConsistent;

	public KetamaMemcachedSessionLocator() {
		this.hashAlg = HashAlgorithm.KETAMA_HASH;
		this.cwNginxUpstreamConsistent = false;
	}

	public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
		this.hashAlg = HashAlgorithm.KETAMA_HASH;
		this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
	}

	public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
		this.hashAlg = alg;
		this.cwNginxUpstreamConsistent = false;
	}

	public KetamaMemcachedSessionLocator(HashAlgorithm alg,
			boolean cwNginxUpstreamConsistent) {
		this.hashAlg = alg;
		this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
	}

	public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
		super();
		this.hashAlg = alg;
		this.cwNginxUpstreamConsistent = false;
		this.buildMap(list, alg);
	}

	private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
		TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>();

		for (Session session : list) {
			String sockStr = null;
			if (this.cwNginxUpstreamConsistent) {
				InetSocketAddress serverAddress = session
						.getRemoteSocketAddress();
				sockStr = serverAddress.getAddress().getHostAddress();
				if (serverAddress.getPort() != DEFAULT_PORT) {
					sockStr = sockStr + ":" + serverAddress.getPort();
				}
			} else {
				if (session instanceof MemcachedTCPSession) {
					// Always use the first time resolved address.
					sockStr = ((MemcachedTCPSession) session)
							.getInetSocketAddressWrapper()
							.getRemoteAddressStr();
				}
				if (sockStr == null) {
					sockStr = String.valueOf(session.getRemoteSocketAddress());
				}
			}
			/**
			 * Duplicate 160 X weight references
			 */
			int numReps = NUM_REPS;
			if (session instanceof MemcachedTCPSession) {
				numReps *= ((MemcachedSession) session).getWeight();
			}
			if (alg == HashAlgorithm.KETAMA_HASH) {
				for (int i = 0; i < numReps / 4; i++) {
					byte[] digest = HashAlgorithm.computeMd5(sockStr + "-" + i);
					for (int h = 0; h < 4; h++) {
						long k = (long) (digest[3 + h * 4] & 0xFF) << 24
								| (long) (digest[2 + h * 4] & 0xFF) << 16
								| (long) (digest[1 + h * 4] & 0xFF) << 8
								| digest[h * 4] & 0xFF;
						this.getSessionList(sessionMap, k).add(session);
					}

				}
			} else {
				for (int i = 0; i < numReps; i++) {
					long key = alg.hash(sockStr + "-" + i);
					this.getSessionList(sessionMap, key).add(session);
				}
			}
		}
		this.ketamaSessions = sessionMap;
		this.maxTries = list.size();
	}

	private List<Session> getSessionList(
			TreeMap<Long, List<Session>> sessionMap, long k) {
		List<Session> sessionList = sessionMap.get(k);
		if (sessionList == null) {
			sessionList = new ArrayList<Session>();
			sessionMap.put(k, sessionList);
		}
		return sessionList;
	}

	public final Session getSessionByKey(final String key) {
		if (this.ketamaSessions == null || this.ketamaSessions.size() == 0) {
			return null;
		}
		long hash = this.hashAlg.hash(key);
		Session rv = this.getSessionByHash(hash);
		int tries = 0;
		while (!this.failureMode && (rv == null || rv.isClosed())
				&& tries++ < this.maxTries) {
			hash = this.nextHash(hash, key, tries);
			rv = this.getSessionByHash(hash);
		}
		return rv;
	}

	public final Session getSessionByHash(final long hash) {
		TreeMap<Long, List<Session>> sessionMap = this.ketamaSessions;
		if (sessionMap.size() == 0) {
			return null;
		}
		Long resultHash = hash;
		if (!sessionMap.containsKey(hash)) {
			// Java 1.6 adds a ceilingKey method, but xmemcached is compatible
			// with jdk5,So use tailMap method to do this.
			SortedMap<Long, List<Session>> tailMap = sessionMap.tailMap(hash);
			if (tailMap.isEmpty()) {
				resultHash = sessionMap.firstKey();
			} else {
				resultHash = tailMap.firstKey();
			}
		}
		//
		// if (!sessionMap.containsKey(resultHash)) {
		// resultHash = sessionMap.ceilingKey(resultHash);
		// if (resultHash == null && sessionMap.size() > 0) {
		// resultHash = sessionMap.firstKey();
		// }
		// }
		List<Session> sessionList = sessionMap.get(resultHash);
		if (sessionList == null || sessionList.size() == 0) {
			return null;
		}
		int size = sessionList.size();
		return sessionList.get(this.random.nextInt(size));
	}

	public final long nextHash(long hashVal, String key, int tries) {
		long tmpKey = this.hashAlg.hash(tries + key);
		hashVal += (int) (tmpKey ^ tmpKey >>> 32);
		hashVal &= 0xffffffffL; /* truncate to 32-bits */
		return hashVal;
	}

	public final void updateSessions(final Collection<Session> list) {
		this.buildMap(list, this.hashAlg);
	}
}


分享到:
评论

相关推荐

    xmemcached 中文开发手册

    - 实现了一致性哈希算法,允许开发者在多个memcached服务器间智能地分配数据。 4. **允许设置节点权重**: - 可以根据服务器的能力和当前负载情况,手动调整每个memcached节点接收数据的比例,从而实现更合理的...

    xmemcached

    在使用这个版本时,开发者可以期待一个更稳定、更健壮的Memcached客户端,以便在实际项目中实现高效的数据缓存。 总结来说,Xmemcached作为Java环境下Memcached的优秀客户端,通过提供强大的功能和优化的性能,使得...

    Xmemcached官方中文手册

    手册中会详细介绍如何配置Spring XML文件以启用Xmemcached,以及如何在代码中使用Spring托管的Memcached客户端。 **5. 开发实例** 手册中的开发实例部分,通过具体的代码示例展示了如何使用Xmemcached进行缓存操作...

    xmemcached 2.4.6.rar

    在2.4.6这个版本中,xmemcached继续优化了性能和稳定性,为开发分布式缓存应用提供了强大支持。本文将深入探讨xmemcached 2.4.6的关键特性和应用场景,以及如何有效地利用这个库来提升系统的性能。 一、xmemcached...

    Xmemcached一个java实现的分布式缓存

    - **一致性哈希**: 通过一致性哈希算法实现分布式环境中数据的均匀分布。 6. **优化与最佳实践** - **合理设置缓存大小**: 根据服务器资源和应用需求调整缓存大小,避免内存溢出。 - **合理设置过期时间**: 根据...

    xmemcached api doc

    2. **命令编码与解码**:xmemcached 内部实现了高效的命令编码和解码机制,确保数据在传输过程中的准确性和效率。 3. **异步操作支持**:除了同步 API,xmemcached 还提供异步操作接口,以非阻塞的方式处理请求,...

    xmemcached-1.4.3.jar

    2. 负载均衡:在分布式环境中,xmemcached能够实现键值对的智能分布,通过一致性哈希算法,将数据均匀分配到多个Memcached节点,减少数据迁移和热点问题。 3. 错误处理:xmemcached具有健全的错误处理机制,对于...

    xmemcached jar包,源文件,api

    - **支持ketama一致性哈希**:通过ketama一致性哈希策略,xmemcached能够在动态添加或删除服务器时保持数据的一致性,减少数据迁移。 - **扩展性**:支持自定义序列化和反序列化策略,方便处理各种复杂的数据类型。 ...

    xmemcached-1.2.4源码

    xmemcached-1.2.4的官方源码。 xmemcached XMemcached is a high performance, easy to use blocking multithreaded memcached client in java. It's nio based (using my opensource nio framework :yanf4j), ...

    xmemcached源码

    java的memcached客户端,支持一致性hash,支持动态增删服务器,客户端源码

    Xmemcached用户指南

    - **一致性哈希算法**:XMemcached实现了客户端侧的分布机制,并且内置了一致性哈希算法,使得数据可以在多个Memcached实例间均匀分布。 **4. 节点权重设置** - **灵活调节负载**:用户可以通过设置不同节点的权重...

    xmemcached-1.2.6.2

    xmemcached-1.2.6.2

    xmemcached.chm文档

    xmemcached.chm帮助文档

    memcache.spymemcached,和xmemcached 三种缓存实例

    - 基于一致性哈希的分布式策略 - 可配置的压缩和过期策略 2. **xmemcached**: xmemcached由Kenshoo公司开发,也是一个高性能、高可用性的Memcached Java客户端。与spymemcached相比,xmemcached提供了更全面的...

    Xmemcached用户指南 后端 - Java.zip

    - **一致性哈希**:为了在分布式环境中保持负载均衡,Xmemcached采用一致性哈希策略,减少数据迁移的影响。 5. **异常处理与性能调优** 在开发过程中,需要关注网络异常、超时异常等,合理处理这些异常以保证服务...

    Xmemcached测试实例

    资源中包括测试类以及依赖的jar包,绝对实用。 测试类包括Xmemcached客户端与memcached client for java两者,可运行比较性能。 XMemcached简介: XMemcached是基于 java nio的Memcached客户端,java nio相比于...

    Xmemcached memcached 实例

    类包括Xmemcached客户端实现和builder实现以及memcached client for java实现,对初学者有借鉴作用,特别是在开发简单例子时出现的超时情况的可以看看是否是同本事例相同。 xmemcached time out 5000 1000

    xmemcached 2.4.5 2.3.2

    4. API一致性:为了保持与Java编程规范的一致性,2.4.5版本可能对部分API进行了调整,使其更加符合Java开发者的使用习惯。 5. 日志系统更新:新的版本可能改进了日志记录系统,提供了更丰富的日志级别和更易读的...

Global site tag (gtag.js) - Google Analytics