`
jameswxx
  • 浏览: 777977 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

memcached客户端源码分析

    博客分类:
  • java
 
阅读更多

memcached的java客户端有好几种,http://code.google.com/p/memcached/wiki/Clients 罗列了以下几种

spymemcached

    * http://www.couchbase.org/code/couchbase/java
          o An improved Java API maintained by Matt Ingenthron and others at Couchbase.
          o Aggressively optimised, ability to run async, supports binary protocol, support Membase and Couchbase features, etc. See site for details.

Java memcached client

    * http://www.whalin.com/memcached
          o A Java API is maintained by Greg Whalin from Meetup.com.

More Java memcached clients

    * http://code.google.com/p/javamemcachedclient
    * http://code.google.com/p/memcache-client-forjava
    * http://code.google.com/p/xmemcached

Integrations

    * http://code.google.com/p/simple-spring-memcached
    * http://code.google.com/p/memcached-session-manager 

 

 


我看的是第二个:Java memcached client源码,代码很简洁,一共只有9个类,最主要的有以下三个
MemcachedClient.java     客户端,负责提供外出程序接口,如get/set方法等等

SockIOPool.java          一个自平衡的连接池
NativeHandler.java       负责部分数据类型的序列化

 

它包含以下几个部分
1:key的服务端分布
2:数据序列化和压缩
3:连接池(连接方式和池的动态自动平衡)
4:failover和failback机制
5:和memcached服务器的通讯协议

关于这几个点,我从key的set/get说起,会贯穿上面列举的4个部分。这个文章写下来,本来是作为一个笔记,思维比较跳跃,可能不是很连贯,如有疑问,欢迎站内交流。这个client的代码

很简洁明了,我也没有加过多注释,只是理了一个脉络。

 

 


从客户端自带的测试代码开始

package com.meetup.memcached.test;
import com.meetup.memcached.*;
import org.apache.log4j.*;

public class TestMemcached  { 
    public static void main(String[] args) {
        BasicConfigurator.configure();
        String[] servers = { "127.0.0.1:12000"};
        SockIOPool pool = SockIOPool.getInstance();
        pool.setServers( servers );
        pool.setFailover( true );//故障转移
     pool.setInitConn( 10 ); //初始化连接为10
        pool.setMinConn( 5 );//最小连接为5
        pool.setMaxConn( 250 );//最大连接为250
        pool.setMaintSleep( 30 );//平衡线程休眠时间为30ms
        pool.setNagle( false );//Nagle标志为false
        pool.setSocketTO( 3000 );//响应超时时间为3000ms
        pool.setAliveCheck( true );//需要可用状态检查
     //初始化连接池,默认名称为"default"
        pool.initialize();
        //新建一个memcached客户端,如果没有给名字
     MemcachedClient mcc = new MemcachedClient();

        // turn off most memcached client logging:
        com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN );

        for ( int i = 0; i < 10; i++ ) {
            boolean success = mcc.set( "" + i, "Hello!" );
            String result = (String)mcc.get( "" + i );
            System.out.println( String.format( "set( %d ): %s", i, success ) );
            System.out.println( String.format( "get( %d ): %s", i, result ) );
        }

        System.out.println( "\n\t -- sleeping --\n" );
        try { Thread.sleep( 10000 ); } catch ( Exception ex ) { }

        for ( int i = 0; i < 10; i++ ) {
            boolean success = mcc.set( "" + i, "Hello!" );
            String result = (String)mcc.get( "" + i );
            System.out.println( String.format( "set( %d ): %s", i, success ) );
            System.out.println( String.format( "get( %d ): %s", i, result ) );
        }
    }
}

 


 
以上代码大概做了这几件事情:
初始化一个连接池
新建一个memcached客户端
set一个key/value
get一个key,并且打印出value
这是我们实际应用中很常见的场景。


连接池的创建和初始化
        连接池SockIOPool是非常重要的部分,它的好坏直接决定了客户端的性能。SockIOPool用一个HashMap持有多个连接池对象,连接池以名称作为标识,默认为"default"。看看

SockIOPool的getInstance方法就知道了。

  public static SockIOPool getInstance() {
        return getInstance("default");
    }
   
    public static synchronized SockIOPool getInstance(String poolName) {
        if (pools.containsKey(poolName)) return pools.get(poolName);

        SockIOPool pool = new SockIOPool();
        pools.put(poolName, pool);

        return pool;
    }

 

 


连接池实例化完成后,还需要初始化,看看pool.initialize()做了什么:

 

public void initialize() {
   //这里以自身作为同步锁,防止被多次初始化
   synchronized (this) {
   // 如果已经被初始化了则终止初始化过程
   if (initialized && (buckets != null || consistentBuckets != null) && (availPool != null)&& (busyPool != null)) {
      log.error("++++ trying to initialize an already initialized pool");
      return;
   }  
      // 可用连接集合
      availPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);
     //工作连接集合
     busyPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);  
     // 不可用连接集合         
     deadPool = new IdentityHashMap<SockIO, Integer>();
    hostDeadDur = new HashMap<String, Long>();
   hostDead = new HashMap<String, Date>();
   maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier; 
   if (log.isDebugEnabled()) {
      log.debug("++++ initializing pool with following settings:");
      log.debug("++++ initial size: " + initConn);
      log.debug("++++ min spare   : " + minConn);
      log.debug("++++ max spare   : " + maxConn);
   }
   if (servers == null || servers.length <= 0) {
      log.error("++++ trying to initialize with no servers");
      throw new IllegalStateException("++++ trying to initialize with no servers");
   }
   // initalize our internal hashing structures
   if (this.hashingAlg == CONSISTENT_HASH) populateConsistentBuckets();
   else populateBuckets();
   // mark pool as initialized
   this.initialized = true;
   // start maint thread
   if (this.maintSleep > 0) this.startMaintThread();
  }
}

 
 

 

 

连接池的关闭

很简单,只是重置清空相关参数而已

public void shutDown() {
        synchronized (this) {
            if (log.isDebugEnabled()) log.debug("++++ SockIOPool shutting down...");

            if (maintThread != null && maintThread.isRunning()) {
                // stop the main thread
                stopMaintThread();

                // wait for the thread to finish
                while (maintThread.isRunning()) {
                    if (log.isDebugEnabled()) log.debug("++++ waiting for main thread to finish run +++");
                    try {
                        Thread.sleep(500);
                    } catch (Exception ex) {
                    }
                }
            }

            if (log.isDebugEnabled()) log.debug("++++ closing all internal pools.");
            closePool(availPool);
            closePool(busyPool);
            availPool = null;
            busyPool = null;
            buckets = null;
            consistentBuckets = null;
            hostDeadDur = null;
            hostDead = null;
            maintThread = null;
            initialized = false;
            if (log.isDebugEnabled()) log.debug("++++ SockIOPool finished shutting down.");
        }
    }

 

 

 

 

连接池的自动平衡
SockIOPool的initialize()方法最后有这么一行代码

// start maint thread
if (this.maintSleep > 0) this.startMaintThread();
 

 这是在初始化完成后,启动线程池平衡线程

  

  protected void startMaintThread() {
        if (maintThread != null) {
            if (maintThread.isRunning()) {
                log.error("main thread already running");
            } else {
                maintThread.start();
            }
        } else {
            maintThread = new MaintThread(this);
            maintThread.setInterval(this.maintSleep);
            maintThread.start();
        }
    }

  

 

MaintThread的run方法

 public void run() {
    this.running = true;
    while (!this.stopThread) {
        try {
            Thread.sleep(interval);
            // if pool is initialized, then
            // run the maintenance method on itself
            if (pool.isInitialized()) pool.selfMaint();
        } catch (Exception e) {
            break;
        }
    }
    this.running = false;
}

 
 其实最终的平衡方法是SockIOPool.selfMaint()

  

protected void selfMaint() {
        if (log.isDebugEnabled()) log.debug("++++ Starting self maintenance....");

        // go through avail sockets and create sockets
        // as needed to maintain pool settings
        Map<String, Integer> needSockets = new HashMap<String, Integer>();

        synchronized (this) {
            // 先统计每个服务器实例的可用连接是否小于最小可用连接数
        for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {
                String host = i.next();
                Map<SockIO, Long> sockets = availPool.get(host);

                if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "
                                                    + sockets.size());

                // if pool is too small (n < minSpare)
                if (sockets.size() < minConn) {
                    // need to create new sockets
                    int need = minConn - sockets.size();
                    needSockets.put(host, need);
                }
            }
        }

        // 如果小于最小可用连接数,则要新建增加可用连接
     Map<String, Set<SockIO>> newSockets = new HashMap<String, Set<SockIO>>();

        for (String host : needSockets.keySet()) {
            Integer need = needSockets.get(host);

            if (log.isDebugEnabled()) log.debug("++++ Need to create " + need + " new sockets for pool for host: "
                                                + host);

            Set<SockIO> newSock = new HashSet<SockIO>(need);
            for (int j = 0; j < need; j++) {
                SockIO socket = createSocket(host);
                if (socket == null) break;
                newSock.add(socket);
            }
            newSockets.put(host, newSock);
        }

        // synchronize to add and remove to/from avail pool
        // as well as clean up the busy pool (no point in releasing
        // lock here as should be quick to pool adjust and no
        // blocking ops here)
        synchronized (this) {
            //将新建的连接添加到可用连接集合里
       for (String host : newSockets.keySet()) {
                Set<SockIO> sockets = newSockets.get(host);
                for (SockIO socket : sockets)
                    addSocketToPool(availPool, host, socket);
            }

            for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {
                String host = i.next();
                Map<SockIO, Long> sockets = availPool.get(host);
                if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "
                                                    + sockets.size());
                 
                //如果可用连接超过了最大连接数,则要关闭一些
          if (sockets.size() > maxConn) {
                    // need to close down some sockets
                    int diff = sockets.size() - maxConn;
                    int needToClose = (diff <= poolMultiplier) ? diff : (diff) / poolMultiplier;

                    if (log.isDebugEnabled()) log.debug("++++ need to remove " + needToClose
                                                        + " spare sockets for pool for host: " + host);

                    for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
                        if (needToClose <= 0) break;

                        // remove stale entries
                        SockIO socket = j.next();
                        long expire = sockets.get(socket).longValue();

                        // 这里回收可用连接池的闲置连接,连接设置到可用连接池里时,expire设置为当前时间。如果 (expire + maxIdle) < System.currentTimeMillis()为true,则表
明,该连接在可用连接池呆得太久了,需要回收
               if ((expire + maxIdle) < System.currentTimeMillis()) {
                            if (log.isDebugEnabled()) log.debug("+++ removing stale entry from pool as it is past its idle timeout and pool is over max spare");

                            // remove from the availPool
                            deadPool.put(socket, ZERO);
                            j.remove();
                            needToClose--;
                        }
                    }
                }
            }

            //清理正在工作的连接集合
        for (Iterator<String> i = busyPool.keySet().iterator(); i.hasNext();) {
                String host = i.next();
                Map<SockIO, Long> sockets = busyPool.get(host);
                if (log.isDebugEnabled()) log.debug("++++ Size of busy pool for host (" + host + ")  = "
                                                    + sockets.size());
                // loop through all connections and check to see if we have any hung connections
                for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
                    // remove stale entries
                    SockIO socket = j.next();
                    long hungTime = sockets.get(socket).longValue();
                    //如果工作时间超过maxBusyTime,则也要回收掉,超过maxBusyTime,可能是服务器响应时间过长
             if ((hungTime + maxBusyTime) < System.currentTimeMillis()) {
                        log.error("+++ removing potentially hung connection from busy pool ... socket in pool for "
                                  + (System.currentTimeMillis() - hungTime) + "ms");

                        // remove from the busy pool
                        deadPool.put(socket, ZERO);
                        j.remove();
                    }
                }
            }
        }

        // 最后清理不可用连接集合
     Set<SockIO> toClose;
        synchronized (deadPool) {
            toClose = deadPool.keySet();
            deadPool = new IdentityHashMap<SockIO, Integer>();
        }

        for (SockIO socket : toClose) {
            try {
                socket.trueClose(false);
            } catch (Exception ex) {
                log.error("++++ failed to close SockIO obj from deadPool");
                log.error(ex.getMessage(), ex);
            }

            socket = null;
        }

        if (log.isDebugEnabled()) log.debug("+++ ending self maintenance.");
    }

 
 

 

 

key的服务器端分布

初始化方法其实就是根据每个服务器的权重,建立一个服务器地址集合,如果选择了一致性哈希,则对服务器地址进行一致性哈希分布,一致性哈希算法比较简单,如果不了解的同学,可以

自行google一下,initialize() 方法里有这段代码:

//一致性哈希

if (this.hashingAlg == CONSISTENT_HASH){
  populateConsistentBuckets();
}else populateBuckets();

 
 看看populateConsistentBuckets()方法

// 用一致性哈希算法将服务器分布在一个2的32次方的环里,服务器的分布位置<=servers.length*40*4

private void populateConsistentBuckets() {
    if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");

    // store buckets in tree map
    this.consistentBuckets = new TreeMap<Long, String>();
    MessageDigest md5 = MD5.get();
    if (this.totalWeight <= 0 && this.weights != null) {
        for (int i = 0; i < this.weights.length; i++)
            this.totalWeight += (this.weights[i] == null) ? 1 : this.weights[i];
    } else if (this.weights == null) {
        this.totalWeight = this.servers.length;
    }

    for (int i = 0; i < servers.length; i++) {
       int thisWeight = 1;
       if (this.weights != null && this.weights[i] != null) thisWeight = this.weights[i];

      //这个值永远小于40 * this.servers.length,因为thisWeight/totalWeight永远小于1
     double factor = Math.floor(((double) (40 * this.servers.length * thisWeight)) / (double) this.totalWeight);

      //服务器的分布位置为factor*4,factor<=40*this.servers.length,所以服务器的分布位置& lt;=40*this.servers.length*4。
    for (long j = 0; j < factor; j++) {
          //md5值的二进制数组为16位
       byte[] d = md5.digest((servers[i] + "-" + j).getBytes());
           //16位二进制数组每4位为一组,每组第4个值左移24位,第三个值左移16位,第二个值左移8位,第一个值不移位。进行或运算,得到一个小于2的32 次方的long值。
        for (int h = 0; h < 4; h++) {
                Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) | ((long) (d[2 + h * 4] & 0xFF) << 16)
                             | ((long) (d[1 + h * 4] & 0xFF) << 8) | ((long) (d[0 + h * 4] & 0xFF));
                    consistentBuckets.put(k, servers[i]);
                    if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket");
             }
       }

       // create initial connections
       if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "
                                                + servers[i]);

       //创建连接
     for (int j = 0; j < initConn; j++) {
           SockIO socket = createSocket(servers[i]);
           if (socket == null) {
                log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
                 break;
           }

           //添加到可用连接池
       addSocketToPool(availPool, servers[i], socket);
           if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()
                                                    + " for host " + servers[i]);
       }
    }
}

    

 

   如果不是一致性哈希,则只是普通分布,很简单,只是根据权重将服务器地址放入buckets这个List里

private void populateBuckets() {
        if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");

        // store buckets in tree map
        this.buckets = new ArrayList<String>();

        for (int i = 0; i < servers.length; i++) {
            if (this.weights != null && this.weights.length > i) {
                for (int k = 0; k < this.weights[i].intValue(); k++) {
                    this.buckets.add(servers[i]);
                    if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket");
                }
            } else {
                this.buckets.add(servers[i]);
                if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket");
            }

            // create initial connections
            if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "
                                                + servers[i]);

            for (int j = 0; j < initConn; j++) {
                SockIO socket = createSocket(servers[i]);
                if (socket == null) {
                    log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
                    break;
                }

                //新建连接后,加入到可用连接集合里
           addSocketToPool(availPool, servers[i], socket);
                if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()
                                                    + " for host " + servers[i]);
            }
        }
    }

 
 

 

 

如何创建socket连接

在上面的private void populateBuckets()方法里,createSocket(servers[i])是创建到服务器的连接,看看这个方法

 protected SockIO createSocket(String host) {
 SockIO socket = null;
 //hostDeadLock是一个可重入锁,它的变量声明为


 private final ReentrantLock             hostDeadLock    = new ReentrantLock();
 hostDeadLock.lock();
 try {
   //hostDead.containsKey(host)为true表示曾经连接过该服务器,但没有成功。
   //hostDead是一个HashMap,key为服务器地址,value为当时连接不成功的时间
   //hostDeadDur是一个HashMap,key为服务器地址,value为设置的重试间隔时间

    if (failover && failback && hostDead.containsKey(host) && hostDeadDur.containsKey(host)) {
        Date store = hostDead.get(host);
        long expire = hostDeadDur.get(host).longValue();

       if ((store.getTime() + expire) > System.currentTimeMillis()) return null;
    }
  } finally {
     hostDeadLock.unlock();
   }

 
 try {
      socket = new SockIO(this, host, this.socketTO, this.socketConnectTO, this.nagle);
      if (!socket.isConnected()) {
                log.error("++++ failed to get SockIO obj for: " + host + " -- new socket is not connected");
                deadPool.put(socket, ZERO);
                socket = null;
      }
  } catch (Exception ex) {
            log.error("++++ failed to get SockIO obj for: " + host);
            log.error(ex.getMessage(), ex);
            socket = null;
  }

   // if we failed to get socket, then mark
   // host dead for a duration which falls off
   hostDeadLock.lock();
   try {
          //到了这里,socket仍然为null,说明这个server悲剧了,无法和它创建连接,则要把该server丢到不可用的主机集合里
       if (socket == null) {
                Date now = new Date();
                hostDead.put(host, now);

                //如果上次就不可用了,到期了仍然不可用,就要这次的不可用时间设为上次的2倍,否则初始时长为1000ms
                long expire = (hostDeadDur.containsKey(host)) ? (((Long) hostDeadDur.get(host)).longValue() * 2) : 1000;

                if (expire > MAX_RETRY_DELAY) expire = MAX_RETRY_DELAY;

                hostDeadDur.put(host, new Long(expire));
                if (log.isDebugEnabled()) log.debug("++++ ignoring dead host: " + host + " for " + expire + " ms");

                // 既然这个host都不可用了,那与它的所有连接当然要从可用连接集合"availPool"里删除掉
          clearHostFromPool(availPool, host);
            } else {
                if (log.isDebugEnabled()) log.debug("++++ created socket (" + socket.toString() + ") for host: " + host);
                //连接创建成功,如果上次不成功,那么这次要把该host从不可用主机集合里删除掉
           if (hostDead.containsKey(host) || hostDeadDur.containsKey(host)) {
                    hostDead.remove(host);
                    hostDeadDur.remove(host);
                }
            }
        } finally {
            hostDeadLock.unlock();
        }

        return socket;
    }


  

 

   SockIO构造函数

      public SockIO(SockIOPool pool, String host, int timeout, int connectTimeout, boolean noDelay)
                                                                                                     throws IOException,
                                                                                                     UnknownHostException {
            this.pool = pool;
            String[] ip = host.split(":");
            // get socket: default is to use non-blocking connect
            sock = getSocket(ip[0], Integer.parseInt(ip[1]), connectTimeout);
            if (timeout >= 0) this.sock.setSoTimeout(timeout);
            // testing only
            sock.setTcpNoDelay(noDelay);
            // wrap streams
            in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
            out = new BufferedOutputStream(sock.getOutputStream());
            this.host = host;
        }

 
 

    getSocket方法

   protected static Socket getSocket(String host, int port, int timeout) throws IOException {
            SocketChannel sock = SocketChannel.open();
            sock.socket().connect(new InetSocketAddress(host, port), timeout);
            return sock.socket();
    }

  可以看到,socket连接是用nio方式创建的。

 

 

 

 新建MemcachedClient
MemcachedClient mcc = new MemcachedClient();新建了一个memcached客户端,看看构造函数,没作什么,只是设置参数而已。

   /**
     * Creates a new instance of MemCachedClient.
     */
    public MemcachedClient() {
        init();
    }


    private void init() {
        this.sanitizeKeys       = true;
        this.primitiveAsString  = false;
        this.compressEnable     = true;
        this.compressThreshold  = COMPRESS_THRESH;
        this.defaultEncoding    = "UTF-8";
        this.poolName           = ( this.poolName == null ) ? "default" : this.poolName;

        // get a pool instance to work with for the life of this instance
        this.pool               = SockIOPool.getInstance( poolName );
    }

 

 

 

 

 

 set方法如何工作

到此memcached客户端初始化工作完成。再回到测试类TestMemcached,看看for循环里的

boolean success = mcc.set( ""  + i, "Hello!" );
 String result = (String)mcc.get( "" + i );
 初始化后,就可以set,get了。看看set是怎么工作的。

 

/**
     * Stores data on the server; only the key and the value are specified.
     *
     * @param key key to store data under
     * @param value value to store
     * @return true, if the data was successfully stored
     */
    public boolean set( String key, Object value ) {
        return set( "set", key, value, null, null, primitiveAsString );
    }
 

    //这个set方法比较长 
   private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {
        if ( cmdname == null || cmdname.trim().equals( "" ) || key == null ) {
            log.error( "key is null or cmd is null/empty for set()" );
            return false;
        }

        try {
            key = sanitizeKey( key );
        }
        catch ( UnsupportedEncodingException e ) {
            // if we have an errorHandler, use its hook
            if ( errorHandler != null )
                errorHandler.handleErrorOnSet( this, e, key );
            log.error( "failed to sanitize your key!", e );
            return false;
        }

        if ( value == null ) {
            log.error( "trying to store a null value to cache" );
            return false;
        }

        // get SockIO obj
        SockIOPool.SockIO sock = pool.getSock( key, hashCode );
       
        if ( sock == null ) {
            if ( errorHandler != null )
                errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );
            return false;
        }
       
        if ( expiry == null )
            expiry = new Date(0);

        // store flags
        int flags = 0;
       
        // byte array to hold data
        byte[] val;

    //这些类型自己序列化,否则由java序列化处理
   if ( NativeHandler.isHandled( value ) ) {         
            if ( asString ) {
                //如果是字符串,则直接getBytes  
                try {
                    if ( log.isInfoEnabled() )
                        log.info( "++++ storing data as a string for key: " + key + " for class: " + value.getClass().getName() );
                    val = value.toString().getBytes( defaultEncoding );
                }
                catch ( UnsupportedEncodingException ue ) {
                    // if we have an errorHandler, use its hook
                    if ( errorHandler != null )
                        errorHandler.handleErrorOnSet( this, ue, key );
                    log.error( "invalid encoding type used: " + defaultEncoding, ue );
                    sock.close();
                    sock = null;
                    return false;
                }
            }
            else {
                try {
                    if ( log.isInfoEnabled() )
                        log.info( "Storing with native handler..." );
                    flags |= NativeHandler.getMarkerFlag( value );
                    val    = NativeHandler.encode( value );
                }
                catch ( Exception e ) {
                    // if we have an errorHandler, use its hook
                    if ( errorHandler != null )
                        errorHandler.handleErrorOnSet( this, e, key );
                    log.error( "Failed to native handle obj", e );

                    sock.close();
                    sock = null;
                    return false;
                }
            }
        }
        else {
            // 否则用java的序列化
        try {
                if ( log.isInfoEnabled() )
                    log.info( "++++ serializing for key: " + key + " for class: " + value.getClass().getName() );
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                (new ObjectOutputStream( bos )).writeObject( value );
                val = bos.toByteArray();
                flags |= F_SERIALIZED;
            }
            catch ( IOException e ) {
                // if we have an errorHandler, use its hook
                if ( errorHandler != null )
                    errorHandler.handleErrorOnSet( this, e, key );

                // if we fail to serialize, then
                // we bail
                log.error( "failed to serialize obj", e );
                log.error( value.toString() );

                // return socket to pool and bail
                sock.close();
                sock = null;
                return false;
            }
        }
       
        //压缩内容
     if ( compressEnable && val.length > compressThreshold ) {
            try {
                if ( log.isInfoEnabled() ) {
                    log.info( "++++ trying to compress data" );
                    log.info( "++++ size prior to compression: " + val.length );
                }
                ByteArrayOutputStream bos = new ByteArrayOutputStream( val.length );
                GZIPOutputStream gos = new GZIPOutputStream( bos );
                gos.write( val, 0, val.length );
                gos.finish();
                gos.close();
               
                // store it and set compression flag
                val = bos.toByteArray();
                flags |= F_COMPRESSED;

                if ( log.isInfoEnabled() )
                    log.info( "++++ compression succeeded, size after: " + val.length );
            }
            catch ( IOException e ) {
                // if we have an errorHandler, use its hook
                if ( errorHandler != null )
                    errorHandler.handleErrorOnSet( this, e, key );
                log.error( "IOException while compressing stream: " + e.getMessage() );
                log.error( "storing data uncompressed" );
            }
        }

        // now write the data to the cache server
        try {
             //按照memcached协议组装命令
        String cmd = String.format( "%s %s %d %d %d\r\n", cmdname, key, flags, (expiry.getTime() / 1000), val.length );
            sock.write( cmd.getBytes() );
            sock.write( val );
            sock.write( "\r\n".getBytes() );
            sock.flush();

            // get result code
            String line = sock.readLine();
            if ( log.isInfoEnabled() )
                log.info( "++++ memcache cmd (result code): " + cmd + " (" + line + ")" );

            if ( STORED.equals( line ) ) {
                if ( log.isInfoEnabled() )
                    log.info("++++ data successfully stored for key: " + key );
                sock.close();
                sock = null;
                return true;
            }
            else if ( NOTSTORED.equals( line ) ) {
                if ( log.isInfoEnabled() )
                    log.info( "++++ data not stored in cache for key: " + key );
            }
            else {
                log.error( "++++ error storing data in cache for key: " + key + " -- length: " + val.length );
                log.error( "++++ server response: " + line );
            }
        }
        catch ( IOException e ) {

            // if we have an errorHandler, use its hook
            if ( errorHandler != null )
                errorHandler.handleErrorOnSet( this, e, key );

            // exception thrown
            log.error( "++++ exception thrown while writing bytes to server on set" );
            log.error( e.getMessage(), e );

            try {
                sock.trueClose();
            }
            catch ( IOException ioe ) {
                log.error( "++++ failed to close socket : " + sock.toString() );
            }

            sock = null;
        }

        //用完了,就要回收哦,sock.close()不是真正的关闭,只是放入到可用连接集合里。  
       if ( sock != null ) {
            sock.close();
            sock = null;
        }
        return false;
    }

 通过set方法向服务器设置key和value,涉及到以下几个点
数据的压缩和序列化 (如果是get方法,则和set方法基本是相反的)
为key分配服务器
对于一些常用类型,采用自定义的序列化,具体要看NativeHander.java,这个类比较简单,有兴趣可以自己看看

 

 public static boolean isHandled( Object value ) {
        return (
            value instanceof Byte            ||
            value instanceof Boolean         ||
            value instanceof Integer         ||
            value instanceof Long            ||
            value instanceof Character       ||
            value instanceof String          ||
            value instanceof StringBuffer    ||
            value instanceof Float           ||
            value instanceof Short           ||
            value instanceof Double          ||
            value instanceof Date            ||
            value instanceof StringBuilder   ||
            value instanceof byte[]
            )
        ? true
        : false;
    }

 

 其他类型则用java的默认序列化

 

 

 

为key选择服务器
SockIOPool.SockIO sock = pool.getSock( key, hashCode );就是为key选择服务器

public SockIO getSock(String key, Integer hashCode) {
        if (log.isDebugEnabled()) log.debug("cache socket pick " + key + " " + hashCode);
        if (!this.initialized) {
            log.error("attempting to get SockIO from uninitialized pool!");
            return null;
        }

        // if no servers return null
        if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)
            || (buckets != null && buckets.size() == 0)) return null;

        // if only one server, return it
        if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 1)
            || (buckets != null && buckets.size() == 1)) {
            SockIO sock = (this.hashingAlg == CONSISTENT_HASH) ? getConnection(consistentBuckets.get(consistentBuckets.firstKey())) : getConnection(buckets.get(0));
            if (sock != null && sock.isConnected()) {
                if (aliveCheck) {//健康状态检查

                    if (!sock.isAlive()) {
                        sock.close();
                        try {
                            sock.trueClose();//有问题,真的关闭socket

                        } catch (IOException ioe) {
                            log.error("failed to close dead socket");
                        }
                        sock = null;
                    }
                }
            } else {//连接不正常,放入不可用连接集合里
          if (sock != null) {
                    deadPool.put(sock, ZERO);
                    sock = null;
                }
            }

            return sock;
        }

        Set<String> tryServers = new HashSet<String>(Arrays.asList(servers));
        // get initial bucket
        long bucket = getBucket(key, hashCode);
        String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
      
        while (!tryServers.isEmpty()) {
            // try to get socket from bucket
            SockIO sock = getConnection(server);
            if (log.isDebugEnabled()) log.debug("cache choose " + server + " for " + key);
            if (sock != null && sock.isConnected()) {
                if (aliveCheck) {
                    if (sock.isAlive()) {
                        return sock;
                    } else {
                        sock.close();
                        try {
                            sock.trueClose();
                        } catch (IOException ioe) {
                            log.error("failed to close dead socket");
                        }
                        sock = null;
                    }
                } else {
                    return sock;
                }
            } else {
                if (sock != null) {
                    deadPool.put(sock, ZERO);
                    sock = null;
                }
            }

            // if we do not want to failover, then bail here
            if (!failover) return null;

            // log that we tried
            tryServers.remove(server);

            if (tryServers.isEmpty()) break; 
           //注意哦,下面是failover机制
        int rehashTries = 0;
            while (!tryServers.contains(server)) {
                String newKey = String.format("%s%s", rehashTries, key);
                if (log.isDebugEnabled()) log.debug("rehashing with: " + newKey);

                bucket = getBucket(newKey, null);
                server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
                rehashTries++;
            }
        }
        return null;
    }

 
 

 

   下面这个方法是真正的从服务器获取连接

 

    
   

public SockIO getConnection(String host) {
        if (!this.initialized) {
            log.error("attempting to get SockIO from uninitialized pool!");
            return null;
        }

        if (host == null) return null;

        synchronized (this) {
            // if we have items in the pool
            // then we can return it
            if (availPool != null && !availPool.isEmpty()) {
                // take first connected socket
                Map<SockIO, Long> aSockets = availPool.get(host);
                if (aSockets != null && !aSockets.isEmpty()) {
                    for (Iterator<SockIO> i = aSockets.keySet().iterator(); i.hasNext();) {
                        SockIO socket = i.next();
                        if (socket.isConnected()) {
                            if (log.isDebugEnabled()) log.debug("++++ moving socket for host (" + host
                                                                + ") to busy pool ... socket: " + socket);
                            // remove from avail pool
                            i.remove();
                            // add to busy pool
                            addSocketToPool(busyPool, host, socket);
                            // return socket
                            return socket;
                        } else {
                            // add to deadpool for later reaping
                            deadPool.put(socket, ZERO);
                            // remove from avail pool
                            i.remove();
                        }
                    }
                }
            }
        }

        // create one socket -- let the maint thread take care of creating more
        SockIO socket = createSocket(host);
        if (socket != null) {
            synchronized (this) {
                addSocketToPool(busyPool, host, socket);
            }
        }
        return socket;
    }

 

  

 

 

 failover和failback

这两者都是发生在获取可用连接这个环节。

failover,如果为key选择的服务器不可用,则对key重新哈希选择下一个服务器,详见getSock方法的末尾。

failback,用一个hashmap存储连接失败的服务器和对应的失效持续时间,每次获取连接时,都探测是否到了重试时间。

 

 

 

 

分享到:
评论
1 楼 lixia0417 2014-02-27  
要是把代码放到附件中下载就更好了。

相关推荐

    ibus-table-chinese-erbi-1.4.6-3.el7.x64-86.rpm.tar.gz

    1、文件内容:ibus-table-chinese-erbi-1.4.6-3.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/ibus-table-chinese-erbi-1.4.6-3.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    基于微信小程序的新乡学院自习室预约系统.zip

    选择Java后台技术和MySQL数据库,在前台界面为提升用户体验,使用Jquery、Ajax、CSS等技术进行布局。 系统包括两类用户:学生、管理员。 学生用户只要实现了前台信息的查看,打开首页,查看网站介绍、自习室信息、在线留言、轮播图信息公告等,通过点击首页的菜单跳转到对应的功能页面菜单,包括网站首页、自习室信息、注册登录、个人中心、后台登录。 学生用户通过账户账号登录,登录后具有所有的操作权限,如果没有登录,不能在线预约。学生用户退出系统将注销个人的登录信息。 管理员通过后台的登录页面,选择管理员权限后进行登录,管理员的权限包括轮播公告管理、老师学生信息管理和信息审核管理,管理员管理后点击退出,注销登录信息。 管理员用户具有在线交流的管理,自习室信息管理、自习室预约管理。 在线交流是对前台用户留言内容进行管理,删除留言信息,查看留言信息。

    面向基层就业个性化大学生服务平台(源码+数据库+论文+ppt)java开发springboot框架javaweb,可做计算机毕业设计或课程设计

    面向基层就业个性化大学生服务平台(源码+数据库+论文+ppt)java开发springboot框架javaweb,可做计算机毕业设计或课程设计 【功能需求】 面向基层就业个性化大学生服务平台(源码+数据库+论文+ppt)java开发springboot框架javaweb,可做计算机毕业设计或课程设计 面向基层就业个性化大学生服务平台中的管理员角色主要负责了如下功能操作。 (1)职业分类管理功能需求:对职业进行划分分类管理等。 (2)用户管理功能需求:对用户信息进行维护管理等。 (3)职业信息管理功能需求:对职业信息进行发布等。 (4)问卷信息管理功能需求:可以发布学生的问卷调查操作。 (5)个性化测试管理功能需求:可以发布个性化测试试题。 (6)试题管理功能需求:对测试试题进行增删改查操作。 (7)社区交流管理功能需求:对用户的交流论坛信息进行维护管理。 面向基层就业个性化大学生服务平台中的用户角色主要负责了如下功能操作。 (1)注册登录功能需求:没有账号的用户,可以输入账号,密码,昵称,邮箱等信息进行注册操作,注册后可以输入账号和密码进行登录。 (2)职业信息功能需求:用户可以对职业信息进行查看。 (3)问卷信息功能需求:可以在线进行问卷调查答卷操作。 (4)社区交流功能需求:可以在线进行社区交流。 (5)个性化测试功能需求:可以在线进行个性化测试。 (6)公告资讯功能需求:可以查看浏览系统发布的公告资讯信息。 【环境需要】 1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境:IDEA,Eclipse,Myeclipse都可以。 3.tomcat环境:Tomcat 7.x,8.x,9.x版本均可 4.数据库:MySql 5.7/8.0等版本均可; 【购买须知】 本源码项目经过严格的调试,项目已确保无误,可直接用于课程实训或毕业设计提交。里面都有配套的运行环境软件,讲解视频,部署视频教程,一应俱全,可以自己按照教程导入运行。附有论文参考,使学习者能够快速掌握系统设计和实现的核心技术。

    三菱Fx3u程序:自动检测包装机电机控制模板,PLC脉冲与伺服定位,手自动切换功能,三菱Fx3u程序:自动检测包装机电机控制模板-涵盖伺服定位与手自动切换功能,三菱Fx3u程序,自动检测包装机 该

    三菱Fx3u程序:自动检测包装机电机控制模板,PLC脉冲与伺服定位,手自动切换功能,三菱Fx3u程序:自动检测包装机电机控制模板——涵盖伺服定位与手自动切换功能,三菱Fx3u程序,自动检测包装机。 该程序六个电机,plc本体脉冲控制3个轴,3个1pg控制。 程序内包括伺服定位,手自动切,功能快的使用,可作为模板程序,很适合新手。 ,三菱Fx3u程序; 自动检测包装机; 六个电机; PLC脉冲控制; 伺服定位; 手自动切换; 功能快捷键; 模板程序。,三菱Fx3u PLC控制下的自动包装机程序:六电机伺服定位与手自动切换模板程序

    基于多尺度集成极限学习机回归 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    计及信息间隙决策与多能转换的综合能源系统优化调度模型:实现碳经济最大化与源荷不确定性考量,基于信息间隙决策与多能转换的综合能源系统优化调度模型:源荷不确定性下的高效碳经济调度策略,计及信息间隙决策及多

    计及信息间隙决策与多能转换的综合能源系统优化调度模型:实现碳经济最大化与源荷不确定性考量,基于信息间隙决策与多能转换的综合能源系统优化调度模型:源荷不确定性下的高效碳经济调度策略,计及信息间隙决策及多能转的综合能源系统优化调度 本代码构建了含风电、光伏、光热发电系统、燃气轮机、燃气锅炉、电锅炉、储气、储电、储碳、碳捕集装置的综合能源系统优化调度模型,并考虑P2G装置与碳捕集装置联合运行,从而实现碳经济的最大化,最重要的是本文引入了信息间隙决策理论考虑了源荷的不确定性(本代码的重点)与店铺的47代码形成鲜明的对比,注意擦亮眼睛,认准原创,该代码非常适合修改创新,,提供相关的模型资料 ,计及信息间隙决策; 综合能源系统; 优化调度; 多能转换; 碳经济最大化; 风电; 光伏; 燃气轮机; 储气; 储电; 储碳; 碳捕集装置; P2G装置联合运行; 模型资料,综合能源系统优化调度模型:基于信息间隙决策和多能转换的原创方案

    IPG QCW激光模块电源驱动电路设计与实现:包含安全回路、紧急放电回路及光纤互锁功能的多版本原理图解析,IPG QCW激光模块电源驱动电路设计与实现:含安全回路、紧急放电及光纤互锁等多重保护功能的原

    IPG QCW激光模块电源驱动电路设计与实现:包含安全回路、紧急放电回路及光纤互锁功能的多版本原理图解析,IPG QCW激光模块电源驱动电路设计与实现:含安全回路、紧急放电及光纤互锁等多重保护功能的原理图解析,IPG QCW激光模块电源驱动电路, 包含安全回路,紧急放电回路,光纤互锁回路等, 元件参数请根据实际设计适当调整,此电路仅供参考,不提供pcb文件 原理图提供PDF和KICAD两个版本。 ,IPG激光模块; QCW激光电源驱动; 安全回路; 紧急放电回路; 光纤互锁回路; 原理图PDF和KICAD版本。,IPG激光模块电源驱动电路图解:含安全与紧急放电回路

    基于LSSVM的短期电力负荷预测模型及其性能评估:结果揭露精确度与误差分析,LSSVM在短期电力负荷预测中的结果分析:基于均方根误差、平均绝对误差及平均相对百分误差的评估 ,LSSVM最小二乘支持向量

    基于LSSVM的短期电力负荷预测模型及其性能评估:结果揭露精确度与误差分析,LSSVM在短期电力负荷预测中的结果分析:基于均方根误差、平均绝对误差及平均相对百分误差的评估。,LSSVM最小二乘支持向量机做短期电力负荷预测。 结果分析 均方根误差(RMSE):0.79172 平均绝对误差(MAE):0.4871 平均相对百分误差(MAPE):13.079% ,LSSVM(最小二乘支持向量机);短期电力负荷预测;均方根误差(RMSE);平均绝对误差(MAE);平均相对百分误差(MAPE),LSSVM在电力负荷短期预测中的应用及性能分析

    libmtp-examples-1.1.14-1.el7.x64-86.rpm.tar.gz

    1、文件内容:libmtp-examples-1.1.14-1.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/libmtp-examples-1.1.14-1.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    《基于 Transformer 的光学字符识别模型》(毕业设计,源码,教程)简单部署即可运行。功能完善、操作简单,适合毕设或课程设计.zip

    资源内项目源码是均来自个人的课程设计、毕业设计或者具体项目,代码都测试ok,都是运行成功后才上传资源,答辩评审绝对信服的,拿来就能用。放心下载使用!源码、说明、论文、数据集一站式服务,拿来就能用的绝对好资源!!! 项目备注 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、大作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 4、如有侵权请私信博主,感谢支持

    2023-04-06-项目笔记 - 第四百一十六阶段 - 4.4.2.414全局变量的作用域-414 -2025.02.21

    2023-04-06-项目笔记-第四百一十六阶段-课前小分享_小分享1.坚持提交gitee 小分享2.作业中提交代码 小分享3.写代码注意代码风格 4.3.1变量的使用 4.4变量的作用域与生命周期 4.4.1局部变量的作用域 4.4.2全局变量的作用域 4.4.2.1全局变量的作用域_1 4.4.2.414局变量的作用域_414- 2025-02-21

    MINIST数据集和春风机器学习框架

    MINIST数据集和春风机器学习框架

    ibus-table-chinese-wu-1.4.6-3.el7.x64-86.rpm.tar.gz

    1、文件内容:ibus-table-chinese-wu-1.4.6-3.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/ibus-table-chinese-wu-1.4.6-3.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    宿舍管理系统(源码+数据库+论文+ppt)java开发springboot框架javaweb,可做计算机毕业设计或课程设计

    宿舍管理系统(源码+数据库+论文+ppt)java开发springboot框架javaweb,可做计算机毕业设计或课程设计 【功能需求】 系统拥有管理员和学生两个角色,主要具备系统首页、个人中心、学生管理、宿舍信息管理、宿舍分配管理、水电费管理、进入宿舍管理、出入宿舍管理、维修信息管理、卫生信息管理、考勤信息管理、留言板、交流论坛、系统管理等功能模块。 【环境需要】 1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境:IDEA,Eclipse,Myeclipse都可以。 3.tomcat环境:Tomcat 7.x,8.x,9.x版本均可 4.数据库:MySql 5.7/8.0等版本均可; 【购买须知】 本源码项目经过严格的调试,项目已确保无误,可直接用于课程实训或毕业设计提交。里面都有配套的运行环境软件,讲解视频,部署视频教程,一应俱全,可以自己按照教程导入运行。附有论文参考,使学习者能够快速掌握系统设计和实现的核心技术。

    基于智能算法的无人机路径规划研究 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    人凤飞飞凤飞飞是粉色丰富

    人凤飞飞凤飞飞是粉色丰富

    2024蓝桥杯嵌入式学习资料

    2024蓝桥杯嵌入式学习资料

    image_download_1740129191509.jpg

    image_download_1740129191509.jpg

    基于Multisim仿真的带优先病房呼叫系统设计(仿真图)

    基于Multisim仿真的带优先病房呼叫系统设计(仿真图) 设计一个病房呼叫系统。 功能 (1)当有病人紧急呼叫时,产生声,光提示,并显示病人的编号; (2)根据病人的病情设计优先级别,当有多人呼叫时,病情严重者优先; (3)医护人员处理完当前最高级别的呼叫后,系统按优先级别显示其他呼叫病人的病号。

    基于STM32F103的3.6kW全桥逆变器资料:并网充电放电、智能切换与全方位保护方案,基于STM32F103的3.6kW全桥逆变器资料:并网充电放电、智能控制与全方位保护方案,逆变器光伏逆变器,3

    基于STM32F103的3.6kW全桥逆变器资料:并网充电放电、智能切换与全方位保护方案,基于STM32F103的3.6kW全桥逆变器资料:并网充电放电、智能控制与全方位保护方案,逆变器光伏逆变器,3.6kw储能逆变器全套资料 STM32储能逆变器 BOOST 全桥 基于STM32F103设计,具有并网充电、放电;并网离网自动切;485通讯,在线升级;风扇智能控制,提供过流、过压、短路、过温等全方位保护。 基于arm的方案区别于dsp。 有PCB、原理图及代码ad文件。 ,逆变器; 储能逆变器; STM32F103; 3.6kw; 485通讯; 全方位保护; 智能控制; 方案区别; PCB文件; 原理图文件; ad文件。,基于STM32F103的3.6kw储能逆变器:全方位保护与智能控制

Global site tag (gtag.js) - Google Analytics