`

memcached 客户端代码 Java memcached client学习(一致性hash)

阅读更多
昨天写了一篇短文 描述了淘宝面试的一些题。
今天下午 装了在ubuntu中装了 memcached ,装起来还是很简单的。
主要是装一下。
  memcached: wget http://memcached.googlecode.com/files/memcached-1.4.4.tar.gz  /data/
  libevent:wget  http://monkey.org/~provos/libevent-1.4.13-stable.tar.gz   /
  


自己分配了一个端口。那么我想还是从最简单的memcached  --
java memcached client源码,总的代码量还是很少的
主要是如下两个类:
MemcachedClient.java
SockIOPool.java

好 先看推荐的测试代码:
/**
 * Copyright (c) 2008 Greg Whalin
 * All rights reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the BSD license
 *
 * This library is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.
 *
 * You should have received a copy of the BSD License along with this
 * library.
 *
 * @author greg whalin <greg@meetup.com> 
 */
package com.meetup.memcached.test;

import com.meetup.memcached.*;
import org.apache.log4j.*;

public class TestMemcached  {  
	public static void main(String[] args) {
		      // memcached should be running on port 11211 but NOT on 11212

		BasicConfigurator.configure();
		String[] servers = { "localhost:11211"};
		SockIOPool pool = SockIOPool.getInstance();
		pool.setServers( servers );
		pool.setFailover( true );
		pool.setInitConn( 10 ); 
		pool.setMinConn( 5 );
		pool.setMaxConn( 250 );
		pool.setMaintSleep( 30 );
          //这是开启一个nagle 算法。改算法避免网络中充塞小封包,提高网络的利用率
		pool.setNagle( false );
		pool.setSocketTO( 3000 );
		pool.setAliveCheck( true );
		pool.initialize();

		MemcachedClient mcc = new MemcachedClient();

		// turn off most memcached client logging:
		com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN );

		for ( int i = 0; i < 10; i++ ) {
			boolean success = mcc.set( "" + i, "Hello!" );
			String result = (String)mcc.get( "" + i );
			System.out.println( String.format( "set( %d ): %s", i, success ) );
			System.out.println( String.format( "get( %d ): %s", i, result ) );
		}

}


其实 对于我来说 我很想明白的是连接池是如何配置的,关键在于 pool.initialize(); 这个方法如何初始化的。

/** 
	 * Initializes the pool. 
	 */
	public void initialize() {

		synchronized( this ) {

			// check to see if already initialized
			if ( initialized
					&& ( buckets != null || consistentBuckets != null )
					&& ( availPool != null )
					&& ( busyPool != null ) ) {
				log.error( "++++ trying to initialize an already initialized pool" );
				return;
			}

			// pools
			availPool   = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
			busyPool    = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn );
			deadPool    = new IdentityHashMap<SockIO,Integer>();

			hostDeadDur = new HashMap<String,Long>();
			hostDead    = new HashMap<String,Date>();
			maxCreate   = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier;		// only create up to maxCreate connections at once

			if ( log.isDebugEnabled() ) {
				log.debug( "++++ initializing pool with following settings:" );
				log.debug( "++++ initial size: " + initConn );
				log.debug( "++++ min spare   : " + minConn );
				log.debug( "++++ max spare   : " + maxConn );
			}

			// if servers is not set, or it empty, then
			// throw a runtime exception
			if ( servers == null || servers.length <= 0 ) {
				log.error( "++++ trying to initialize with no servers" );
				throw new IllegalStateException( "++++ trying to initialize with no servers" );
			}

			// initalize our internal hashing structures
			if ( this.hashingAlg == CONSISTENT_HASH )
				populateConsistentBuckets();
			else
				populateBuckets();

			// mark pool as initialized
			this.initialized = true;

			// start maint thread
			if ( this.maintSleep > 0 )
				this.startMaintThread();
		}
	}




如上代码流程如下:
1  检测是否已经被初始化
2  定义可用链接 ,繁忙链接池
3  判断是否一致性hash算法 还是普通的算法
4  定义一个后台线程 ,来维护
好 ,首先来分析下一致性hash算法。

从如下代码来分析 :
        if ( 
                  this.hashingAlg == CONSISTENT_HASH )
				populateConsistentBuckets();


/** 将server添加到一致性hash的2的32次 圆环  **/
	private void populateConsistentBuckets() {
		if ( log.isDebugEnabled() )
			log.debug( "++++ initializing internal hashing structure for consistent hashing" );

		// store buckets in tree map 
		this.consistentBuckets = new TreeMap<Long,String>();

		MessageDigest md5 = MD5.get();
		//得到总的权重
		if ( this.totalWeight <= 0 && this.weights !=  null ) {
			for ( int i = 0; i < this.weights.length; i++ )
				this.totalWeight += ( this.weights[i] == null ) ? 1 : this.weights[i];
		}
		else if ( this.weights == null ) {
			this.totalWeight = this.servers.length;
		}
		
	
		for ( int i = 0; i < servers.length; i++ ) {
			//每台服务器的权重
			int thisWeight = 1;
			if ( this.weights != null && this.weights[i] != null ) {
				thisWeight = this.weights[i];
			}
			
			//有兴趣的朋友可以参考平衡Hash 算法的另一个指标是平衡性 (Balance) ,定义如下:	平衡性 平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用
			//了解决这种情况, consistent hashing 引入了“虚拟节点”的概念,它可以如下定义:	“虚拟节点”( virtual node )是实际节点在 hash 空间的复制品( replica ),一实际个节点对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以 hash 值排列。
			double factor = Math.floor(((double)(40 * this.servers.length * thisWeight)) / (double)this.totalWeight);
			
			for ( long j = 0; j < factor; j++ ) { //加密规则类似 127.0.0.1_1  
				byte[] d = md5.digest( ( servers[i] + "-" + j ).getBytes() ); //转化成16位的字节数组
				//16位二进制数组每4位为一组,每组第4个值左移24位,第三个值左移16位,第二个值左移8位,第一个值不移位。进行或运算,得到一个小于2的32 次方的long值
				for ( int h = 0 ; h < 4; h++ ) { //因为是16位 
					Long k =  //实际上每个字节进行了运算
						  ((long)(d[3+h*4]&0xFF) << 24) 
						| ((long)(d[2+h*4]&0xFF) << 16) 
						| ((long)(d[1+h*4]&0xFF) << 8)  
						| ((long)(d[0+h*4]&0xFF));

					consistentBuckets.put( k, servers[i] );
					if ( log.isDebugEnabled() )
						log.debug( "++++ added " + servers[i] + " to server bucket" );
				}				
			}
			
			

			// create initial connections
			if ( log.isDebugEnabled() )
				log.debug( "+++ creating initial connections (" + initConn + ") for host: " + servers[i] );

			//创建链接
			for ( int j = 0; j < initConn; j++ ) {
				SockIO socket = createSocket( servers[i] );
				if ( socket == null ) {
					log.error( "++++ failed to create connection to: " + servers[i] + " -- only " + j + " created." );
					break;
				}
				//加入socket到连接池 这里慢慢谈
				addSocketToPool( availPool, servers[i], socket );
				if ( log.isDebugEnabled() )
					log.debug( "++++ created and added socket: " + socket.toString() + " for host " + servers[i] );
			}
		}
		
	}


好 比如说 我们调用了 如下代码:
MemcachedClient mcc = new MemcachedClient();
mcc.set("6", 1);


这里key 如何定位到一台server呢?我先把一致性hash算法的定位方法说下。
//得到定位server的Socket封装对象
SockIOPool.SockIO sock = pool.getSock( key, hashCode );


//计算出key对应的hash值(md5) ,然后
long bucket = getBucket( key, hashCode );


//得到大于hash的map,因为treemap已经排好序了。调用tailMap可以得到大于等于这个hash的对象 ,然后调用firstKey得到圆环上的hash值
SortedMap<Long,String> tmap =
			this.consistentBuckets.tailMap( hv );
		return ( tmap.isEmpty() ) ? this.consistentBuckets.firstKey() : tmap.firstKey();



明天开始继续看代码 ,还是要坚持。
一致性hash的算法参考
http://xok.la/2010/06/memcache_consistent_hashing.html




分享到:
评论

相关推荐

    memCache源码java客户端

    1. **MemcachedClient**: 这是客户端的主要类,负责建立与memcached服务器的连接,执行各种操作。 2. **Operation**: 包含了各种操作接口,如`get`, `set`, `delete`, `increment`等,它们是异步执行的。 3. **...

    神级memcached源代码分析文档_1.4.0代码分析

    客户端的哈希算法决定了key如何映射到特定的server,通常采用一致性哈希,以保证server增减时,数据迁移的影响最小化。 总结,通过深入分析Memcached的源代码,我们可以了解到其高效运行背后的设计思想和技术实现,...

    Java开发中的Memcache原理及实现

    MemcachedClient client = new MemcachedClient(new InetSocketAddress("localhost", 11211)); // 存储数据 client.set("myKey", 60, "myValue"); // key存活60秒 // 获取数据 String value = (String) ...

    数据平台缓存技术方案Memcached-Redis[汇编].pdf

    一致性哈希算法是解决 Memcached 客户端选择服务器问题的解决方案。该算法可以取代传统的取模操作,解决了上述取模操作无法应对增删服务器所遇到的问题。然而,普通一致性哈希算法有潜在的问题是节点 Hash 后会不...

    Memcached的封装优化及相关操作api

    - `MemcachedClient`:客户端实例,负责与服务器通信。 - `OperationFuture`:异步操作的未来结果,等待操作完成并获取结果。 - `CacheManager`:管理多个缓存实例,支持动态添加或删除服务器。 - `...

    pymemcache

    ### pymemcache:一个全面、快速的纯Python memcached客户端库 #### 一、基本使用方法 `pymemcache` 是一个纯Python编写的memcached客户端库,它提供了高效且简便的方式来与memcached服务器进行交互。下面我们将...

    libmemcached

    - **一致性哈希(Consistent Hashing)**: 通过一致性哈希算法,`libmemcached`可以在添加或删除服务器时最小化数据重新分布的影响。 - **缓存失效策略**: 提供了TTL(Time To Live)和LRU(Least Recently Used)等...

    人人网技术架构介绍(人人网-黄晶)

    在实际应用中,NuclearRose有普通节点(Node)负责处理客户端请求和数据存储,中心节点(Seed)维护整个实例的拓扑关系并执行健康检查,而Nuclear Client则向用户提供CRUD(创建、读取、更新、删除)操作的API接口。...

    beansdb设计与实现

    - **最终一致性**:BeansDB在设计上追求最终一致性,即在短期内可能会存在数据不一致的情况,但最终所有副本都会达到一致的状态。 - **高可用性**:即使某些节点发生故障,整个系统仍然可以正常运行,不会导致服务...

    Redis 35 道面试题及答案.docx

    - Redis的操作具有原子性,确保了并发环境下的数据一致性。 - 提供了持久化选项,如RDB(Redis Database Backup)内存快照和AOF(Append Only File)日志,以在系统崩溃后恢复数据。 - 支持主从复制,可实现高...

Global site tag (gtag.js) - Google Analytics