consistent hashing 算法早在 1997 年就在论文 Consistent hashing and random trees 中被提出,目前在cache 系统中应用越来越广泛;
1 基本场景
比如你有 N 个 cache 服务器(后面简称 cache ),那么如何将一个对象 object 映射到 N 个 cache 上呢,你很可能会采用类似下面的通用方法计算 object 的 hash 值,然后均匀的映射到到 N 个 cache ;
1 一个 cache 服务器 m down 掉了(在实际应用中必须要考虑这种情况),这样所有映射到 cache m 的对象都会失效,怎么办,需要把 cache m 从 cache 中移除,这时候 cache 是 N-1 台,映射公式变成了 hash(object)%(N-1);
2 由于访问加重,需要添加 cache ,这时候 cache 是 N+1 台,映射公式变成了 hash(object)%(N+1) ;
1 和 2 意味着什么?这意味着突然之间几乎所有的 cache 都失效了。对于服务器而言,这是一场灾难,洪水般的访问都会直接冲向后台服务器;
再来考虑第三个问题,由于硬件能力越来越强,你可能想让后面添加的节点多做点活,显然上面的 hash 算法也做不到。
有什么方法可以改变这个状况呢,这就是 consistent hashing...
2 hash 算法和单调性
Hash 算法的一个衡量指标是单调性( Monotonicity ),定义如下:
容易看到,上面的简单 hash 算法 hash(object)%N 难以满足单调性要求。
3 consistent hashing 算法的原理
consistent hashing 是一种 hash 算法,简单的说,在移除 / 添加一个 cache 时,它能够尽可能小的改变已存在 key 映射关系,尽可能的满足单调性的要求。
下面就来按照 5 个步骤简单讲讲 consistent hashing 算法的基本原理。
3.1 环形hash 空间
考虑通常的 hash 算法都是将 value 映射到一个 32 为的 key 值,也即是 0~2^32-1 次方的数值空间;我们可以将这个空间想象成一个首( 0 )尾( 2^32-1 )相接的圆环,如下面图 1 所示的那样。
图 1 环形 hash 空间
3.2 把对象映射到hash 空间
接下来考虑 4 个对象 object1~object4 ,通过 hash 函数计算出的 hash 值 key 在环上的分布如图 2 所示。
hash(object1) = key1;
… …
hash(object4) = key4;
图 2 4 个对象的 key 值分布
3.3 把cache 映射到hash 空间
Consistent hashing 的基本思想就是将对象和 cache 都映射到同一个 hash 数值空间中,并且使用相同的 hash算法。
假设当前有 A,B 和 C 共 3 台 cache ,那么其映射结果将如图 3 所示,他们在 hash 空间中,以对应的 hash 值排列。
hash(cache A) = key A;
… …
hash(cache C) = key C;
图 3 cache 和对象的 key 值分布
说到这里,顺便提一下 cache 的 hash 计算,一般的方法可以使用 cache 机器的 IP 地址或者机器名作为 hash输入。
3.4 把对象映射到cache
现在 cache 和对象都已经通过同一个 hash 算法映射到 hash 数值空间中了,接下来要考虑的就是如何将对象映射到 cache 上面了。
在这个环形空间中,如果沿着顺时针方向从对象的 key 值出发,直到遇见一个 cache ,那么就将该对象存储在这个 cache 上,因为对象和 cache 的 hash 值是固定的,因此这个 cache 必然是唯一和确定的。这样不就找到了对象和 cache 的映射方法了吗?!
依然继续上面的例子(参见图 3 ),那么根据上面的方法,对象 object1 将被存储到 cache A 上; object2 和object3 对应到 cache C ; object4 对应到 cache B ;
3.5 考察cache 的变动
前面讲过,通过 hash 然后求余的方法带来的最大问题就在于不能满足单调性,当 cache 有所变动时, cache会失效,进而对后台服务器造成巨大的冲击,现在就来分析分析 consistent hashing 算法。
3.5.1 移除 cache
考虑假设 cache B 挂掉了,根据上面讲到的映射方法,这时受影响的将仅是那些沿 cache B 逆时针遍历直到下一个 cache ( cache C )之间的对象,也即是本来映射到 cache B 上的那些对象。
因此这里仅需要变动对象 object4 ,将其重新映射到 cache C 上即可;参见图 4 。
图 4 Cache B 被移除后的 cache 映射
3.5.2 添加 cache
再考虑添加一台新的 cache D 的情况,假设在这个环形 hash 空间中, cache D 被映射在对象 object2 和object3 之间。这时受影响的将仅是那些沿 cache D 逆时针遍历直到下一个 cache ( cache B )之间的对象(它们是也本来映射到 cache C 上对象的一部分),将这些对象重新映射到 cache D 上即可。
因此这里仅需要变动对象 object2 ,将其重新映射到 cache D 上;参见图 5 。
图 5 添加 cache D 后的映射关系
4 虚拟节点
考量 Hash 算法的另一个指标是平衡性 (Balance) ,定义如下:
hash 算法并不是保证绝对的平衡,如果 cache 较少的话,对象并不能被均匀的映射到 cache 上,比如在上面的例子中,仅部署 cache A 和 cache C 的情况下,在 4 个对象中, cache A 仅存储了 object1 ,而 cache C 则存储了object2 、 object3 和 object4 ;分布是很不均衡的。
为了解决这种情况, consistent hashing 引入了“虚拟节点”的概念,它可以如下定义:
“虚拟节点”( virtual node )是实际节点在 hash 空间的复制品( replica ),一实际个节点对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以 hash 值排列。
仍以仅部署 cache A 和 cache C 的情况为例,在图 4 中我们已经看到, cache 分布并不均匀。现在我们引入虚拟节点,并设置“复制个数”为 2 ,这就意味着一共会存在 4 个“虚拟节点”, cache A1, cache A2 代表了cache A ; cache C1, cache C2 代表了 cache C ;假设一种比较理想的情况,参见图 6 。
图 6 引入“虚拟节点”后的映射关系
objec1->cache A2 ; objec2->cache A1 ; objec3->cache C1 ; objec4->cache C2 ;
因此对象 object1 和 object2 都被映射到了 cache A 上,而 object3 和 object4 映射到了 cache C 上;平衡性有了很大提高。
引入“虚拟节点”后,映射关系就从 { 对象 -> 节点 } 转换到了 { 对象 -> 虚拟节点 } 。查询物体所在 cache 时的映射关系如图 7 所示。
图 7 查询对象所在 cache
“虚拟节点”的 hash 计算可以采用对应节点的 IP 地址加数字后缀的方式。例如假设 cache A 的 IP 地址为202.168.14.241 。
引入“虚拟节点”前,计算 cache A 的 hash 值:
引入“虚拟节点”后,计算“虚拟节”点 cache A1 和 cache A2 的 hash 值:
Hash(“”); // cache A1
Hash(“”); // cache A2
5 小结
Consistent hashing 的基本原理就是这些,具体的分布性等理论分析应该是很复杂的,不过一般也用不到。 上面有一个 Java 版本的例子,可以参考。 转载了一个 PHP 版的实现代码。 C语言版本
import java.util.Collection; import java.util.SortedMap; import java.util.TreeMap; /** * 将Hash算法跟虚拟节点的虚拟方式给抽出来,模版方法设计模式 * * @Author 妳那伊抹微笑 * */ public abstract class AbstractConsistentHash { /** * 复制节点,增加每个节点的复制节点有利于负载均衡 */ protected int numberOfReplicas = 8; /** * 一致性哈希 */ protected SortedMap<Long, String> circle = new TreeMap<Long, String>(); /** * 自定义一致性哈希算法 */ protected abstract long defineHash(String key); /** * 自定义circle key */ protected abstract String defineCircleKey(String node, int i); public AbstractConsistentHash() { } public AbstractConsistentHash(int numberOfReplicas, Collection<String> nodes) { this.numberOfReplicas = numberOfReplicas; for (String node : nodes) { addNode(node); } } /** * 添加节点 */ public void addNode(String node) { for (int i = 0; i < this.numberOfReplicas; i++) { /* * 计算key的哈希算法 */ long key = defineHash(defineCircleKey(node, i)); circle.put(key, node); } } /** * 删除节点 */ public void removeNode(String node) { for (int i = 0; i < this.numberOfReplicas; i++) { /* * 计算key的哈希算法 */ long key = defineHash(defineCircleKey(node, i)); circle.remove(key); } } /** * 查找节点,取得顺时针方向上最近的一个虚拟节点对应的实际节点 */ public String getNode(String node) { if (circle.isEmpty()) { return null; } /* * 计算key的哈希算法 */ long key = defineHash(node); if (!circle.containsKey(key)) { SortedMap<Long, String> tailMap = circle.tailMap(key); key = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } return circle.get(key); } }
import java.util.Set; import; /** * HashCode 一致性哈希,使用CRC32算法计算hash值 * * @Author 妳那伊抹微笑 * */ public class CRC32ConsistentHash extends AbstractConsistentHash { public CRC32ConsistentHash(int numberOfReplicas, Set<String> nodes) { super(numberOfReplicas, nodes); } @Override protected long defineHash(String key) { CRC32 hash = new CRC32(); hash.update(key.getBytes()); return (int)hash.getValue(); } @Override protected String defineCircleKey(String node, int i) { return (node+i); }
import java.util.Set; /** * HashCode 一致性哈希,使用自带的hashCode计算hash值 * * @Author 妳那伊抹微笑 * */ public class HashCodeConsistentHash extends AbstractConsistentHash { public HashCodeConsistentHash(int numberOfReplicas, Set<String> nodes) { super(numberOfReplicas, nodes); } @Override protected long defineHash(String key) { return key.hashCode(); } @Override protected String defineCircleKey(String node, int i) { if(i > 99){ return node + "-" + i; }else if(i > 9){ return node + "-0" + i; }else{ return node + "-00" + i; } } }
import java.nio.charset.Charset; import; import java.util.Set; /** * HashCode 一致性哈希,使用自MD5算法计算hash值 * * @Author 妳那伊抹微笑 * */ public class MD5ConsistentHash extends AbstractConsistentHash{ public MD5ConsistentHash(int numberOfReplicas, Set<String> nodes) { super(numberOfReplicas, nodes); } @Override protected String defineCircleKey(String node, int i) { if(i > 99){ return node + "-" + i; }else if(i > 9){ return node + "-0" + i; }else{ return node + "-00" + i; } } @Override protected long defineHash(String key) { try { MessageDigest md5 = MessageDigest.getInstance("MD5"); md5.update(key.getBytes(Charset.forName("UTF8"))); byte[] digest = md5.digest(); long hash = 0; for (int i = 0; i < 4; i++) { hash += ((long) (digest[i * 4 + 3] & 0xFF) << 24) | ((long) (digest[i * 4 + 2] & 0xFF) << 16) | ((long) (digest[i * 4 + 1] & 0xFF) << 8) | ((long) (digest[i * 4 + 0] & 0xFF)); } System.out.println(key+ "--->" + hash); return hash; } catch (Exception ex) { return -1; } }
import; import; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * 一致性哈希Utils, 实现了redis中hash的一般操作 * * @Author 妳那伊抹微笑 * */ public class ConsistentHashRedisUtils { /** * redis server properties 配置文件的路径 */ private static final String REDIS_SERVERS_HANGZHOU_PROPERTIES = "../../conf/"; /** * 日志对象 */ private Log log = LogFactory.getLog(this.getClass()); /** * 一致性哈希 */ private AbstractConsistentHash hash; /** * Redis连接池 */ private Map<String, JedisPool> jedisPools = new HashMap<String, JedisPool>();; /** * 外网地址到内网地IP映射 */ private Map<String, String> redisNodeMaps = new LinkedHashMap<String, String>(); /** * 是否转换到内网地址创建jedis对象 */ private boolean isConvertIp = false; /** * 虚拟节点个数 */ private static int numberOfReplicas = 8; public ConsistentHashRedisUtils() { this(numberOfReplicas); } public ConsistentHashRedisUtils(int numberOfReplicas) { redisNodeMaps = getRedisConfNodeMaps(); hash = new CRC32ConsistentHash(numberOfReplicas, redisNodeMaps.keySet()); } /** * hset * * @param key * @param field * @param value */ public void hset(String key, String field, String value) { JedisPool pool = null; Jedis jedis = null; try { pool = getJedisPool(key); jedis = pool.getResource(); jedis.hset(key, field, value); } catch (Exception e) { log.error("Redis hset exception : " + key, e); pool.returnBrokenResource(jedis); } finally { pool.returnResource(jedis); } } /** * hincrBy * * @param key * @param field * @param value */ public void hincrBy(String key, String field, long value) { JedisPool pool = null; Jedis jedis = null; try { pool = getJedisPool(key); jedis = pool.getResource(); jedis.hincrBy(key, field, value); } catch (Exception e) { log.error("Redis hincrBy exception : " + key, e); pool.returnBrokenResource(jedis); } finally { pool.returnResource(jedis); } } /** * hget * * @param key * @param field * @return? */ public String hget(String key, String field) { JedisPool pool = null; Jedis jedis = null; String result = null; try { pool = getJedisPool(key); jedis = pool.getResource(); result = jedis.hget(key, field); } catch (Exception e) { log.error("Redis hget exception : " + key, e); pool.returnBrokenResource(jedis); } finally { pool.returnResource(jedis); } return result; } /** * hgetAll * * @param key * @return map */ public Map<String, String> hgetAll(String key) { JedisPool pool = null; Jedis jedis = null; Map<String, String> result = null; try { pool = getJedisPool(key); jedis = pool.getResource(); result = jedis.hgetAll(key); } catch (Exception e) { log.error("Redis hgetAll exception : " + key, e); pool.returnBrokenResource(jedis); } finally { pool.returnResource(jedis); } return result; } /** * hdel * * @param key * @param field */ public void hdel(String key, String field) { JedisPool pool = null; Jedis jedis = null; try { pool = getJedisPool(key); jedis = pool.getResource(); jedis.hdel(key, field); } catch (Exception e) { log.error("Redis hdel exception : " + key, e); pool.returnBrokenResource(jedis); } finally { pool.returnResource(jedis); } } /** * 通过userid获取对应的jedis连接 * * @param userid * @return * @throws Exception */ private JedisPool getJedisPool(String userid) throws Exception { String key = this.hash.getNode(userid); JedisPool pool = this.jedisPools.get(key); if (pool == null) { String outKey = key; /* * 将外网ip转换内网ip */ if (this.isConvertIp) { key = this.redisNodeMaps.get(key); if (key == null) { throw new Exception("not " + key + " in redisNodeMaps"); } } String[] host_port = key.split(":"); pool = getJedisPool(host_port);"-------------------->Redis ip--->port : " + host_port[0] + "--->" + host_port[1]); this.jedisPools.put(outKey, pool); } return pool; } /** * 获得redis的连接池 * * @param host_port * @return */ private JedisPool getJedisPool(String[] host_port) { JedisPoolConfig config = new JedisPoolConfig(); // config.setMaxActive(500); config.setMaxIdle(10); // config.setMaxWait(3000000); config.setTestOnBorrow(true); config.setTestOnReturn(true); return new JedisPool(config, host_port[0], Integer.parseInt(host_port[1])); } /** * 关闭所有redis链接 */ public void closeAllConnection() throws Exception { for (String key : this.jedisPools.keySet()) { JedisPool pool = this.jedisPools.get(key); if (pool != null) { pool.destroy(); } } } /** * 从配置文件中获取Redis集群的IP * * @return */ private Map<String, String> getRedisConfNodeMaps() { Map<String, String> rmaps = null; String line = null; try { BufferedReader reader = new BufferedReader( new InputStreamReader(this.getClass().getResourceAsStream(REDIS_SERVERS_HANGZHOU_PROPERTIES))); rmaps = new LinkedHashMap<String, String>(); String[] splits = null; String outIp = null; String inIp = null; while ((line = reader.readLine()) != null) { if (line.trim().length() < 5 || line.trim().startsWith("#")) continue;"redis server : " + line); splits = line.split(" +|\t+"); outIp = splits[0]; inIp = splits[1]; rmaps.put(outIp, inIp); } "ConsistentHashRedisUtils initialize configure ../../conf/ is successful ..."); } catch (Exception e) { log.error("ConsistentHashRedisUtils initialize error : " + line, e); } return rmaps; } }
以上的代码还没有解决数据转移的问题,比如其中一台Redis服务器挂掉了,应该将数据转移到该服务器顺时针最近的那一台活着的服务器上去,而且以后数据恢复,还需要自己写代码做数据校验,数据找回等等!!!这也是一个麻烦的问题 、、、(至少数据不会丢失了,这点还是不错的,数据校验跟数据找回的代码写好后,也可以一劳永逸了,嘎嘎!)
由于使用了虚拟节点的方式,上面的虚拟节点数量为8,现在新增一台Redis服务器,也会虚拟出8台虚拟节点出来,这里原先假设的是有12台Redis服务器,假如现在的这8台虚拟节点如果刚好分布到了其他8台虚拟节点之间,也就是意味着另外8台Redis中差不多每台Rdis服务器都会有八分之一的数据会跑到现在新增的这台Redis服务器上,这样也会造成数据混乱,这该如何解决 、、、也就是新增节点之后还是破坏了原有的映射关系,当然知道了这样的原理之后也是可以写代码数据校验跟找回的,重新平衡服务器的数据,不过那也太麻烦点了,伤不起啊!假如这里虚拟节点的数据变大了,那以后增加节点是不是要做的工作就更大了,这 、、、
