/**
* Copyright (c) AVIT LTD (2016). All Rights Reserved.
* Welcome to www.avit.com.cn
*/
package com.avit.ipmp.common.utils;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import com.avit.ipmp.nodeManager.beans.NodeInfo;
public class SharedMqttNode<T> {
private TreeMap<Long, T> nodes = new TreeMap<Long, T>(); // 虚拟节点
private List<T> shards; // 真实机器节点
private final int NODE_NUM = 5; // 每个机器节点关联的虚拟节点个数
public SharedMqttNode(List<T> shards) {
super();
this.shards = shards;
init();
}
private void init() { // 初始化一致性hash环
for (int i = 0; i != shards.size(); ++i) { // 每个真实机器节点都需要关联虚拟节点
for (int n = 0; n < NODE_NUM; n++)
// 一个真实机器节点关联NODE_NUM个虚拟节点
nodes.put(hash(shards.get(i).hashCode()+n+""), shards.get(i));
}
}
public void add(T node) {
this.shards.add(node);
for (int i = 0; i < NODE_NUM; i++) {
// 一个真实机器节点关联NODE_NUM个虚拟节点
nodes.put(hash(node.hashCode()+i+""), node);
}
}
public void remove(T node) {
for (int i = 0; i < NODE_NUM; i++) {
nodes .remove(hash(node.hashCode()+i+""));
}
this.shards.remove(node);
}
public T getShardInfo(String key) {
SortedMap<Long, T> tail = nodes.tailMap(hash(key)); // 沿环的顺时针找到一个虚拟节点
if (tail.size() == 0) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey()); // 返回该虚拟节点对应的真实机器节点的信息
// Entry en = nodes.ceilingEntry(hash(key));
// if (en == null) {
// return (T) nodes.firstEntry().getValue();
// }
// return (T) en.getValue();
}
public List<T> getReal() {
return shards;
}
/**
* MurMurHash算法,是非加密HASH算法,性能很高,
* 比传统的CRC32,MD5,SHA-1(这两个算法都是加密HASH算法,复杂度本身就很高,带来的性能上的损害也不可避免)
* 等HASH算法要快很多,而且据说这个算法的碰撞率很低.
* http://murmurhash.googlepages.com/
*/
public Long hash(String key) {
ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
int seed = 0x1234ABCD;
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
long m = 0xc6a4a7935bd1e995L;
int r = 47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining()>=8) {
k = buf.getLong();
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
// for big-endian version, do this first:
// finish.position(8-buf.remaining());
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}
h ^= h >>> r;
h *= m;
h ^= h >>> r;
buf.order(byteOrder);
return h;
}
public static void main(String[] args) throws InterruptedException {
List<NodeInfo> list = new ArrayList<NodeInfo>();
NodeInfo ni =null;;
ni = new NodeInfo();
ni.setNodeIP("192.168.32.17");
ni.setMqttPort(8088);
list.add(ni);
ni = new NodeInfo();
ni.setNodeIP("192.168.32.18");
ni.setMqttPort(8088);
list.add(ni);
ni = new NodeInfo();
ni.setNodeIP("192.168.32.19");
ni.setMqttPort(8088);
list.add(ni);
ni = new NodeInfo();
ni.setNodeIP("192.168.2.20");
ni.setMqttPort(8088);
list.add(ni);
ni.setNodeIP("192.168.33.20");
ni.setMqttPort(8088);
list.add(ni);
ni = new NodeInfo();
ni.setNodeIP("192.168.35.20");
ni.setMqttPort(8088);
list.add(ni);
ni = new NodeInfo();
ni.setNodeIP("192.168.36.23");
ni.setMqttPort(8088);
list.add(ni);
ni = new NodeInfo();
ni.setNodeIP("192.168.3.20");
ni.setMqttPort(8088);
list.add(ni);
final SharedMqttNode<NodeInfo> sn = new SharedMqttNode<NodeInfo>(list);
TimerTask task = new TimerTask() {
@Override
public void run() {
String[] nodes = {"9922000000230198", "9320010864623470", "9922000002481525"};
for (int i = 0; i < nodes.length; i++){
System.out.println("[" + nodes[i] + "]的hash值为" +
sn.hash(nodes[i]) + ", 被路由到结点[" + sn.getShardInfo(nodes[i]).getNodeIP() + "]");
}
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
System.out.println("node size======="+sn.getReal().size());
NodeInfo ni = new NodeInfo();
ni.setNodeIP("192.168.5.17");
ni.setMqttPort(8080);
sn.add(ni);
System.out.println("node size >>>>>>>>>"+sn.getReal().size());
}
};
Timer timer = new Timer();
timer.schedule(task, 1000, 5000);
// final SharedMqttNode<NodeInfo> sn = new SharedMqttNode<NodeInfo>(list);
// ExecutorService es = Executors.newCachedThreadPool();
// final CountDownLatch cdl = new CountDownLatch(1000);
// // 1000个线程
// for (int j = 0; j < 1000; j++) {
// es.execute(new Runnable() {
//
// @Override
// public void run() {
// // Random rd = new Random(1100);
// for (int k = 0; k < 10000; k++) {
// sn.getShardInfo(String.valueOf(Math.random())).inc();
// }
// cdl.countDown();
// }
// });
// }
//
// // 等待所有线程结束
// cdl.await();
// List<NodeInfo> nodeList = sn.getReal();
// for (NodeInfo node : nodeList) {
// System.out.println("node" + node.getNodeIP()+ ":" + node.getCount());
// }
}
}
分享到:
相关推荐
一致性Hash算法旨在解决服务实例增加或移除时对现有服务调用的影响最小化问题。它通过构建虚拟节点来提高负载均衡的效果,保证了即使在服务实例动态变化的情况下也能维持较好的负载均衡。 **算法流程**: 1. **创建...
一致性哈希算法(Consistent Hashing)是一种特殊的哈希算法,它在分布式系统中被广泛应用,尤其是在分布式缓存系统中,用于提高系统的可伸缩性和降低节点变化带来的影响。在本示例中,PHP语言被用来实现这一算法,...
4. **Raft算法**:作为Paxos的一种简化实现,Raft算法更容易理解和实现,同样可以解决分布式一致性问题。它通过选举领导者来管理分布式日志,确保了集群中所有节点的状态同步。 5. **分布式锁**:在分布式系统中,...
- A*算法保证找到最短路径的条件是启发函数必须满足“允许性”(即h(n) ≤ 实际从n到目标的成本)和“一致性”(对于任意两个相邻节点n和m,h(n) ≤ cost(n,m) + h(m))。 - 高效性和可扩展性强,适用于多种应用...
常见的算法有轮询、加权轮询、最少连接数、哈希一致性等。 8. **带头结点的单链表反转**:反转链表的经典问题,可以使用迭代或递归方法,注意处理头结点和边界条件。 9. **ELFHash**:一种简单的散列函数,用于将...
- ConsistentHashLoadBalance:一致性Hash负载均衡,相同的请求参数总是被发送到相同的提供者,当某台提供者宕机时,基于虚拟节点将请求平摊到其它提供者,减少对系统的影响。 3. Dubbo的安全机制 为保证通信安全,...
4. **ConsistentHash LoadBalance**:一致性哈希策略,相同参数请求始终发送到同一提供者,减少服务提供者变动的影响。 【Dubbo是什么】 Dubbo是一个高性能、分布式、透明化的RPC服务框架,支持服务自动注册、发现...
- **Dijkstra算法怎么保证从第二组所有的顶点中选取到源点距离最小?**:Dijkstra算法通过不断更新最短路径并将其加入已知最短路径集合中,确保每次选择的顶点到源点的距离是最小的。 - **第190页倒数第八行应该是...
32. **Dijkstra算法怎么保证从第二组所有的顶点中选取到源点距离最小?** - Dijkstra算法通过逐步扩展最短路径树来保证找到源点到其他所有顶点的最短路径。关键是每次选择距离最小的未访问顶点加入到最短路径树中...
- **ConstantHashLoadBalance**:一致性Hash策略,相同参数请求总是发送到同一个提供者。 ### Dubbo简介 - **定义**:Dubbo是一个分布式、高性能、透明化的RPC服务框架,提供服务自动注册、自动发现等高效服务治理...
- 一致性Hash:通过Hash函数将节点和请求映射到一个圆环上,请求会被路由到顺时针方向最近的节点。 ### 三、数据一致性保证 - **缓冲区与数据库之间的一致性**:通常使用**加锁**机制来保证数据的一致性。例如,...
3. **数据切分**:面对海量数据,内存数据库通常采用数据切分技术,通过Hash算法将数据分散到多个节点,均衡负载,提高并发处理能力。这有助于处理高并发查询请求,确保系统的稳定性。 4. **存储策略**:内存数据库...
例如,对于一个大规模的键值存储系统,可以使用一致性哈希算法来将数据分布到不同的服务器节点上,从而实现高效的读写操作。 #### 8. 大规模数据排序与统计 在处理大规模数据时,如何高效地进行排序和统计是非常...
4. **Hadoop上的数据去重**:具体讨论如何在Hadoop环境下实现数据去重,可能涉及HDFS的特性,比如Block Checksums用于检测数据一致性,以及可能的数据去重算法,如Hash-based、Signature-based等。 5. **系统设计**...
- **数据一致性**:在分布式环境中,如何保证哈希分组后的数据一致性。 通过对"BDMS-main"这个压缩包文件的学习,学生能够深入理解哈希和分组在大数据分析中的应用,并掌握用Java编程语言实现这些概念的实际技巧。...
事务控制语言(Transactional Control Language,TCL),用于维护数据的一致性,包括COMMIT(提交事务)、ROLLBACK(回滚事务)和SAVEPOINT(设置保存点)3条语句 二、 Oracle的数据类型 类型 参数 描述 字符类型...