`
IXHONG
  • 浏览: 449336 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【转】一致性hash算法与server列表维护

阅读更多

  考虑到不用重复造轮子,特此转载好文,出处http://shift-alt-ctrl.iteye.com/blog/1963244

    普通的hash算法有个很大的问题:当hash的"模数"发生变化时,整个hash数据结构就需要重新hash,重新hash之后的数据分布一定会和hash之前的不同;在很多场景下,"模数"的变化时必然的,但是这种"数据分布"的巨大变化却会带来一些麻烦.所以,就有了"一致性hash",当然学术界对"一致性hash"的阐述,还远远不止这些.

    在编程应用方面,最直观的例子就是"分布式缓存",一个cache集群中,有N台物理server,为了提升单台server的支撑能力,可能会考虑将数据通过hash的方式相对均匀的分布在每个server上.

    判定方式: location = hashcode(key) % N;事实上,由于需要,N可能会被增加或者削减,不管程序上是否能够妥善的支持N的变更,单从"数据迁移"的面积而言,也是非常大的.

    一致性Hash很巧妙的简化了这个问题,同时再使用"虚拟节点"的方式来细分数据的分布.



 

F1

    图示中表名,有2个物理server节点,每个物理server节点有多个Snode虚拟节点,server1和server2的虚拟节点互相"穿插"且依次排列,每个snode都有一个code,它表示接受数据的hashcode起始值(或终止值),比如上述图示中第一个snode.code为500,即当一个数据的hashcode值在[0,500]时则会被存储在它上.

    引入虚拟节点之后,事情就会好很多;假如KEY1分布在Snode3上,snode3事实为物理server1,当server1故障后,snode1也将被移除,那么KEY1将会被分布在"临近的"虚拟节点上--snode2(或者snode4,由实现而定);无论是存取,下一次对KEY1的操作将会有snode2(或snode4)来服务.

    1) 一个物理server在逻辑上分成N个虚拟节点(本例中为256个)

    2) 多个物理server的虚拟节点需要散列分布,即互相"穿插".

    3) 所有的虚拟节点,在逻辑上形成一个链表

    4) 每个虚拟节点,负责一定区间的hashcode值.

import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;


public class ServerPool {

    private static final int VIRTUAL_NODES_NUMBER = 256;//物理节点对应的虚拟节点的个数
    private static final String TAG = ".-vtag-.";
    private NavigableMap<Long, SNode> innerPool = new TreeMap<Long, SNode>();
    private Hashing hashing = new Hashing();

    /**
     * 新增物理server地址
     * @param address
     * @param  weight
     * 权重,权重越高,其虚拟节点的个数越高,事实上没命中的概率越高
     * @throws Exception
     */
    public synchronized void addServer(String address,int weight) throws Exception {
        SNode prev = null;
        SNode header = null;
        String[] tmp = address.split(":");
        InnerServer server = new InnerServer(tmp[0], Integer.parseInt(tmp[1]));
        server.init();
        //将一个address下的所有虚拟节点SNode形成链表,可以在removeServer,以及
        //特殊场景下使用
        int max = 1;
        if(weight > 0){
            max = VIRTUAL_NODES_NUMBER * weight;
        }
        for (int i = 0; i < max; i++) {
            long code = hashing.hash(address + TAG + i);
            SNode current = new SNode(server, code);
            if (header == null) {
                header = current;
            }
            current.setPrev(prev);
            innerPool.put(code, current);
            prev = current;
        }
    }
    /**
     * 删除物理server地址,伴随着虚拟节点的删除
     * @param address
     */
    public synchronized void removeServer(String address) {
        long code = hashing.hash(address + TAG + (VIRTUAL_NODES_NUMBER - 1));
        SNode current = innerPool.get(code);
        if(current == null){
            return;
        }
        if(!current.getAddress().equalsIgnoreCase(address)){
            return;
        }
        current.getServer().close();;
        while (current != null) {
            current = innerPool.remove(current.getCode()).getPrev();
        }

    }

    /**
     * 根据指定的key,获取此key应该命中的物理server信息
     * @param key
     * @return
     */
    public InnerServer getServer(String key) {
        long code = hashing.hash(key);
        SNode snode = innerPool.lowerEntry(code).getValue();
        if (snode == null) {
            snode = innerPool.firstEntry().getValue();
        }
        return snode.getServer();
    }


    /**
     * 虚拟节点描述
     */
    class SNode {
        Long code;
        InnerServer server;
        SNode prev;

        SNode(InnerServer server, Long code) {
            this.server = server;
            this.code = code;
        }

        SNode getPrev() {
            return prev;
        }

        void setPrev(SNode prev) {
            this.prev = prev;
        }

        Long getCode() {
            return this.code;
        }

        InnerServer getServer() {
            return server;
        }
        String getAddress(){
            return server.ip + ":" + server.port;
        }
    }

    /**
     * hashcode生成
     */
    class Hashing {
        //少量优化性能
        private ThreadLocal<MessageDigest> md5Holder = new ThreadLocal<MessageDigest>();
        private Charset DEFAULT_CHARSET = Charset.forName("utf-8");

        public long hash(String key) {
            return hash(key.getBytes(DEFAULT_CHARSET));
        }

        public long hash(byte[] key) {
            try {
                if (md5Holder.get() == null) {
                    md5Holder.set(MessageDigest.getInstance("MD5"));
                }
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("no md5 algorythm found");
            }
            MessageDigest md5 = md5Holder.get();

            md5.reset();
            md5.update(key);
            byte[] bKey = md5.digest();
            long res = ((long) (bKey[3] & 0xFF) << 24)
                    | ((long) (bKey[2] & 0xFF) << 16)
                    | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);
            return res;
        }
    }

    /**
     * 与物理server的TCP链接,用于实际的IO操作
     */
    class InnerServer {
        String ip;
        int port;
        Socket socket;

        InnerServer(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }

        synchronized void init() throws Exception {
            SocketAddress socketAddress = new InetSocketAddress(ip, port);
            socket = new Socket();
            socket.connect(socketAddress, 30000);
        }

        public boolean write(byte[] sources) {
            //TODO
            return true;
        }

        public byte[] read() {
            //TODO
            return new byte[]{};
        }

        public void close(){
             if(socket == null || socket.isClosed()){
                 return;
             }
            try{
                socket.close();
            } catch (Exception e){
                //
            }
        }
    }
}

 

分享到:
评论

相关推荐

    PHP一致性hash分布式算法封装类定义与用法示例

    一致性哈希(Consistent Hashing)是一种分布式系统中用于负载均衡的哈希算法。它解决的是分布式系统中如何在服务器节点变更时尽可能均匀地重新分配原有的数据和负载的问题。传统的哈希算法在服务器数量变动时需要重新...

    NGINX配置NGX-HTTP-CONSISTENT-HASH实现一致性哈希负载均衡

    2.NGX_HTTP_CONSISTENT_HASH 是一个用于 Nginx 的模块,可以实现基于一致性哈希的负载均衡策略。下载地址:https://github.com/replay/ngx_http_consistent_hash/tree/master,如果打不开,我将我下载的内容上传,...

    轻松实现Sql Server 2005下的Base64、MD5、SHA1算法函数

    本篇文章将详细介绍如何在SQL Server 2005环境下轻松实现这三种算法的函数,帮助你有效地进行数据处理。 一、Base64编码 Base64是一种用于在网络上传输二进制数据的编码方式,它可以将任意二进制数据转换为可打印的...

    nginx负载均衡中RR和ip_hash策略分析

    - 对于需要维护用户会话状态的应用程序,如购物车、登录状态等,使用ip_hash可以更好地保持用户的会话一致性。 - 在用户访问量较大的情况下,使用ip_hash可以显著提高用户体验。 #### 六、RR与ip_hash对比 - **...

    基于DHT的Kademlia 算法

    5. **一致性与稳定性**:尽管在网络中节点可能频繁加入和离开,Kademlia算法通过不断更新和维护节点之间的联系,确保了系统的稳定性和一致性。 #### 结论 基于DHT的Kademlia算法,以其高效的查询机制和健壮的网络...

    nginx_upstream_hash-0.3.2.tar.gz

    这种策略在处理会话持久化、缓存一致性等问题时非常有用,比如保持用户与特定服务器的连接,或者确保相同的数据始终被同一个服务器处理。 在0.3.2版本中,nginx_upstream_hash模块已经相当成熟,提供了稳定性和效率...

    SQL Server和.NET的一致哈希函数

    但是,由于`GetHashCode()`在不同版本的.NET Framework中可能会有变化,因此,为了确保与SQL Server的一致性,通常需要使用更稳定的哈希算法,如MD5或SHA系列。 在SQL Server中,`CHECKSUM`函数是一个简单的哈希...

    distributed-system-framework-master_c++mysql单例_虚拟机调度_

    (1)Linux下socket编程, 封装TcpServer,TcpClient(2)Libevent网络框架库的使用(3)服务器端线程池的使用以及其负载均衡(4)MySQL数据库C接口的C++类封装(5)单例模式(6)负载均衡算法之一致性hash算法(7)...

    解析nginx负载均衡

    - IP Hash算法的核心在于将客户端IP地址与后端服务器的数量相结合,通过简单的哈希运算确定请求应转发至的服务器。 - 为了确保均衡性,当经过一定次数的哈希仍然找不到可用服务器时,会退化为普通的轮询模式。 - ...

    sqlserver2000优化(必须的)

    - **存储过程**:利用存储过程封装复杂的业务逻辑,减少网络传输的开销,提高数据处理的速度和一致性。 ### 6. 定期维护与监控 - **定期分析与重构**:定期对数据库进行分析,检查表和索引的状态,及时进行重构和...

    asp编写的Md5加密算法.rar_ASP 加密解密_asp 文件md5_加密_加密算法

    ASP(Active Server Pages)是一种微软开发的服务器端脚本语言,常用于构建动态网页和Web应用程序。在ASP中实现MD5(Message-Digest Algorithm 5)加密算法是常见的安全实践,用于保护敏感数据,如用户密码。MD5是一...

    淘宝Oceanbase云存储系统实践.doc

    分布式Hash表通过一致性Hash算法将数据均匀分布在集群中,适合快速的Key-Value操作,但不支持范围查询。而分布式B+树则支持范围查询,但其数据管理更为复杂,需要处理分裂和合并的情况。例如,Amazon的Dynamo和S3...

    MD5加密算法

    MD5(Message-Digest Algorithm 5)是一种广泛使用的密码散列函数,可以产生一个128位(16字节)的散列值,通常用来确保数据的完整性和一致性。MD5算法在.NET框架结合SQL Server的应用场景中,主要用于用户密码的...

    server san 领域中ceph的经典论文集

    文中阐述了其关键特性,包括数据复制、故障检测与恢复、以及如何通过自动修复机制保证数据一致性。 2. **"Ceph: A Scalable, High-Performance Distributed File System" (weil-ceph-osdi06.pdf)** 在这篇论文中,...

    MySQL Server 5.5.rar

    5. **复制功能改进**:在MySQL 5.5中,主从复制支持半同步复制模式,确保了数据的一致性。此外,复制日志格式从Binary Log升级到Row-based Replication,减少了数据冲突的可能性。 6. **安全性和审计**:MySQL 5.5...

    完全与标准算法一致的asp的3des,base64,SHA1源码

    在给定的标题和描述中,我们关注的是三种不同的加密和编码技术:3DES、Base64和SHA1,这些技术都是在ASP环境中实现的,并且已经过测试,确保与C#和Java的标准算法保持一致。下面我们将深入探讨这些技术以及它们在...

    udp p2p的server和client

    5. **数据一致性与可靠性**:由于UDP的不可靠性,P2P程序需要处理数据丢失、重复和乱序问题。这通常通过自定义的确认机制、序列号和重传策略来实现。 6. **资源发现与交换**:P2P网络中的节点需要知道如何找到其他...

Global site tag (gtag.js) - Google Analytics