HornetQ支持集群方式来支持扩展性,集群中的节点借助JGroup组件来进行节点间的通信。在UTP方式中,节点在间隔时间内不断的像广播地址中发送消息,而客户端的程序就可以通过监听这个广播地址,解析广播的内容就能够知道集群中有哪些主机。对于使用Java语言的开发者来说,HornetQ的JAVA客户端相关类屏蔽了这些底层代码,使得开发变得简单。然而对于其他语言,需要自己手动解析相关内容。
以Hornet2.1.2版本为准,HornetQ每个节点像广播地址广播的信息遵循以下格式:
节点名称的长度+节点名称+唯一标识长度+唯一标识+连接器个数+各个连接器信息
下面是解析的JAVA代码,包含的内部类主要负责控制读取。
客户端的调用只需要启动该类,然后获取hostInfoMap就能够获得集群中的主机和他们的连接信息以及备用连接信息。如果要客户端的主机信息能够实时更新,可以通过修改主机集合向客户端发消息的方式实现
/*
* Discovery .java
* 本类非线程安全
*/
package com.socket.multisocket;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
public class Discovery extends Thread {
private static final String HOST = "host";
private static final String PORT = "port";
private static final long timeout = Integer.valueOf(System.getProperty("broadcast_host_timeout", "10000"));
private static final String spliter = ":";
//存储节点信息
private static Map<String,List<Map<String,String>>> hostInfoMap = new HashMap<String,List<Map<String,String>>>();
//存储节点名称,主要是定时刷新
public final static Map<String, Long> connectors = new HashMap<String, Long>();
/**
* 当前读的位置
*/
private static int currentIndex = 0;
public static int connectorPairsSize = 0;
private String broadcastAddress;
private int broadcastPort;
public Discovery(String broadcastAddress, int broadcastPort) {
super();
this.broadcastAddress = broadcastAddress;
this.broadcastPort = broadcastPort;
}
public static Map<String,List<Map<String,String>>> getHost(){
return hostInfoMap;
}
/**
* 重置当前读指针
**
*/
public void putConnector(String key, Long currentTime) {
connectors.put(key, currentTime);
}
public void validateConnectors() {
Set<Entry<String, Long>> set = new HashSet<Entry<String,Long>>(connectors.entrySet());
for (Entry<String, Long> entry : set) {
if (entry.getValue() + timeout < System.currentTimeMillis()) {
connectors.remove(entry.getKey());
hostInfoMap.remove(entry.getKey());
}
}
}
public void run() {
InetAddress group = null;
MulticastSocket server = null;
try {
group = InetAddress.getByName(broadcastAddress);
server = new MulticastSocket(broadcastPort);
server.joinGroup(group);
final byte[] data = new byte[65535];
DatagramPacket recv = new DatagramPacket(data, data.length);
for (;;) {
server.receive(recv);
String uniqueName = ParseRecieveData.parsebytes(recv
.getData());
ParseRecieveData.release();
putConnector(uniqueName, System
.currentTimeMillis());
validateConnectors();
}
} catch (UnknownHostException e) {
throw new RuntimeException("can not find this broadcast address:"+broadcastAddress);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (server != null)
try {
server.leaveGroup(group);
} catch (IOException e) {
}
}
}
private static class ParseRecieveData {
private static final byte TYPE_BOOLEAN = 0;
private static final byte TYPE_INT = 1;
private static final byte TYPE_LONG = 2;
private static final byte TYPE_STRING = 3;
/**
* 解析报文
*
* @author Jombo
* @param bytes
*/
public static String parsebytes(byte[] bytes) {
int nodeIdLength = getInt(bytes, currentIndex);
getString(nodeIdLength, bytes, currentIndex);
int uniqueIdLength = getInt(bytes, currentIndex);
String uniqueName = getString(uniqueIdLength, bytes, currentIndex);
connectorPairsSize = getInt(bytes, currentIndex);
List<Map<String,String>> connectorList = new ArrayList<Map<String,String>>();
// connectorPairsSize可能有多个
for(int i =0 ;i<connectorPairsSize;i++){
Map<String, String> connectorPair = parseConnectorPairInfo(bytes);
connectorList.add(connectorPair);
}
hostInfoMap.put(uniqueName, connectorList);
return uniqueName;
}
/**
* 解析连接器信息
*
* @author Jombo
* @param bytes
*/
private static Map<String, String> parseConnectorPairInfo(byte[] bytes) {
Map<String, String> ConnectorPairMap = parseConnectorInfo(bytes);
Map<String, String> mainConnectorMap = parseConnectorInfo(bytes);
byte existBackup = getByte(bytes, currentIndex);
boolean existbackupConnector = existBackup == 0 ? false : true;
Map<String, String> backConnectorMap = null;
if(existbackupConnector){
backConnectorMap = parseConnectorInfo(bytes);
}
//在这里我们只想看host+port的形式,有兴趣的朋友可以把全部信息放进去
//这是为了获取主备连接
String mainconnector = mainConnectorMap.get(HOST) + spliter + mainConnectorMap.get(PORT);
String backconnector = backConnectorMap ==null ? null:backConnectorMap.get(HOST) + spliter + backConnectorMap.get(PORT);
ConnectorPairMap.put(mainconnector, backconnector);
return ConnectorPairMap;
}
private static Map<String, String> parseConnectorInfo(byte[] bytes) {
Map<String, String> paramMap = new HashMap<String, String>();
int namelength = getInt(bytes, currentIndex);
getString(namelength, bytes, currentIndex);
int factorylength = getInt(bytes, currentIndex);
getString(factorylength, bytes, currentIndex);
int paramPairsSize = getInt(bytes, currentIndex);
for (int j = 0; j < paramPairsSize; j++) {
int keylength = getInt(bytes, currentIndex);
String key = getString(keylength, bytes, currentIndex);
byte valuetype = getByte(bytes, currentIndex);
String value = "";
if (valuetype == TYPE_STRING) {
int valuelength = getInt(bytes, currentIndex);
value = getString(valuelength, bytes, currentIndex);
} else if (valuetype == TYPE_BOOLEAN) {
byte booleanvalue = getByte(bytes, currentIndex);
value = booleanvalue == 0 ? "false" : "true";
} else if (valuetype == TYPE_INT) {
value = String.valueOf(getInt(bytes, currentIndex));
} else if (valuetype == TYPE_LONG) {
value = String.valueOf(getLong(bytes, currentIndex));
} else {
throw new RuntimeException("invalid type");
}
paramMap.put(key, value);
}
return paramMap;
}
/**
* 读取一个Int
*
* @author Jombo
* @param bytes
* @param index
* begin index
* @return
*/
public static int getInt(byte[] bytes, int index) {
int val = (bytes[index] & 0xff) << 24 | (bytes[index + 1] & 0xff) << 16
| (bytes[index + 2] & 0xff) << 8 | (bytes[index + 3] & 0xff) << 0;
setCurrentIndex(index + 4);
return val;
}
/**
* 读取一个Byte
*
* @author Jombo
* @param bytes
* @param index
* @return
*/
public static byte getByte(byte[] bytes, int index) {
byte b = bytes[index];
setCurrentIndex(index + 1);
return b;
}
/**
* 读取Short
*
* @author Jombo
* @param bytes
* @param index
* @return
*/
public static short getShort(byte[] bytes, int index) {
short indexshort = (short) (bytes[index] << 8 | bytes[index + 1] & 0xFF);
setCurrentIndex(index + 2);
return indexshort;
}
/**
* 读取字符串,根据字符串的长度采取不同的格式
*
* @author Jombo
* @param length
* @param bytes
* @param index
* @return
*/
public static String getString(int length, byte[] bytes, int index) {
StringBuffer stringBuffer = new StringBuffer();
if (length < 9) {
for (int i = 0; i < length; i++) {
short indexshort = getShort(bytes, index);
stringBuffer.append((char) indexshort);
index = index + 2;
}
} else if (length < 0xfff) {
short utflen = getShort(bytes, index);
index = index + 2;
byte[] strbyte = new byte[utflen];
System.arraycopy(bytes, index, strbyte, 0, utflen);
stringBuffer.append(new String(strbyte));
setCurrentIndex(index + utflen);
} else {
int longlength = getInt(bytes, index);
index = index + 4;
byte[] strbyte = new byte[longlength];
System.arraycopy(bytes, index, strbyte, 0, longlength);
stringBuffer.append(new String(strbyte));
setCurrentIndex(index + longlength);
}
return stringBuffer.toString();
}
/**
* 读取Long
*
* @author Jombo
* @param bytes
* @param index
* @return
*/
public static long getLong(byte[] bytes, int index) {
setCurrentIndex(index + 8);
return ((long) bytes[index] & 0xff) << 56
| ((long) bytes[index + 1] & 0xff) << 48
| ((long) bytes[index + 2] & 0xff) << 40
| ((long) bytes[index + 3] & 0xff) << 32
| ((long) bytes[index + 4] & 0xff) << 24
| ((long) bytes[index + 5] & 0xff) << 16
| ((long) bytes[index + 6] & 0xff) << 8
| ((long) bytes[index + 7] & 0xff) << 0;
}
/**
* 重置当前读指针
**
*/
public static void release() {
currentIndex = 0;
}
public static void setCurrentIndex(int index) {
currentIndex = index;
}
}
}
分享到:
相关推荐
本篇文章将深入探讨HornetQ的集群配置,特别是主主集群以及通过UDP广播进行通信的方式。 ### 1. 集群概念 在HornetQ集群中,各个节点通过网络连接相互发现并建立连接。当一个节点故障时,其他节点能够接管其服务,...
4. **集群能力**: HornetQ提供了一个强大的集群解决方案,可以在多个服务器之间共享消息,提高了系统的可用性和容错性。 5. **灵活的消息API**: 除了传统的JMS API之外,HornetQ还提供了一套自定义的消息API,允许...
### HornetQ2.1中文手册关键知识点解析 #### 一、消息的相关概念 HornetQ2.1中文手册中详细介绍了消息处理系统的基本概念和技术细节,这对于理解和使用HornetQ至关重要。 ##### 4.1 消息相关的概念 - **消息**:在...
HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的...
5. **集群与高可用性**:介绍如何配置HornetQ集群,实现消息的负载均衡和故障转移。 6. **安全性**:详细阐述HornetQ的安全配置,包括用户认证、授权和加密通信。 7. **监控与管理**:讲解如何通过JMX或Web管理界面...
集群 (Clustering) 支持庞大的消息(Message)- 如在只有 50 Megabyte 随机存取存储器的主机上,HornetQ 支持数以Terabyte计的消息队列 (Message Queue)[1] 设计理念: 高性能 - 单一主机运行HornetQ,速度可达...
通过编辑`configuration/hornetq.config.xml`,可以定制HornetQ的行为,如设置持久化存储、集群配置、传输协议、安全策略等。 **5. 使用HornetQ** 开发者可以通过JMS API创建生产者和消费者,与HornetQ服务器进行...
4. **集群与高可用性**:HornetQ支持集群部署,可以在多台服务器之间共享负载,提高服务的可用性和容错性。如果集群中的某个节点故障,其他节点可以接管其任务,确保服务不间断。 5. **性能优化**:HornetQ 2.4.0 ...
- **消息的广播/订阅模式**(Publish-Subscribe):消息发布到一个主题,多个订阅者可以接收到,适用于一对多的通信。 - **传送的可靠性**:讨论了如何保证消息的可靠传输,包括确认机制、重试策略等。 - **交易...
- **高可用性**:支持集群和故障转移,确保服务连续性。 - **灵活性**:支持多种传输协议,如TCP、HTTP、HTTPS,以及多种消息模式,如点对点和发布/订阅。 - **轻量级**:设计简洁,无需依赖大量外部库,适合...
在IT行业中,消息传递系统是分布式应用程序之间进行通信的关键组件,而HornetQ和Hermes都是此类系统的重要组成部分。HornetQ是一个高性能、轻量级且完全开源的消息中间件,它提供了JMS(Java消息服务)接口,允许...
### ActiveMQ与HornetQ性能对比分析 #### 概述 本文旨在通过一系列测试数据对比分析ActiveMQ与HornetQ在不同消息大小及数量下的性能表现。测试环境为相同的硬件配置,确保了测试结果的公正性。通过对比两者的发送...
.NET 连接HornetQ是一项关键的技术任务,HornetQ是一款开源的消息中间件,它提供了高效、可扩展和高可用性的消息传递服务。在.NET环境中与HornetQ进行交互,通常需要借助特定的客户端库,如Apache.NMS.Stomp。下面将...
ZooKeeper是一个分布式的协调服务,常用于管理配置信息、命名服务、集群同步以及分组服务等。 【标签】"开源项目" 表明这两个压缩包内容都是开放源代码的软件项目,这意味着它们遵循特定的开源许可证,允许用户自由...
hornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.zip
什么是HornetQ? * HornetQ 是一个开源的软件项目。它的目标是一个多协议、可嵌入、高性能、可集群的异步消息系统。 * HornetQ 是一个消息中间件(MoM)。有关MoM和其它消息相关的概念解释请参见 Chapter 4, ...
hornetq安装包, hornetq-2.4.0.Final-bin.tar 消息中间件 供项目中数据交互使用
hornetq-transports-2.0.0.GA.jar
HornetQ是java开源实现的消息系统框架,性能上比ActiveQ要好一些,被集成到JBoss的消息服务中。 Table of Contents Preface 1 Chapter 1: Getting Started with HornetQ 9 Chapter 2: Setting Up HornetQ 31 ...
3. 分布式:HornetQ 支持集群部署,可以轻松地在多台服务器之间扩展,以满足大规模并发和高负载需求。集群内的节点可以共享负载,提供冗余和故障切换能力。 4. 灵活性:HornetQ 具有广泛的配置选项,允许开发者根据...