`
潜心修炼
  • 浏览: 18960 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HornetQ集群广播解析

阅读更多
HornetQ支持集群方式来支持扩展性,集群中的节点借助JGroup组件来进行节点间的通信。在UTP方式中,节点在间隔时间内不断的像广播地址中发送消息,而客户端的程序就可以通过监听这个广播地址,解析广播的内容就能够知道集群中有哪些主机。对于使用Java语言的开发者来说,HornetQ的JAVA客户端相关类屏蔽了这些底层代码,使得开发变得简单。然而对于其他语言,需要自己手动解析相关内容。
以Hornet2.1.2版本为准,HornetQ每个节点像广播地址广播的信息遵循以下格式:

节点名称的长度+节点名称+唯一标识长度+唯一标识+连接器个数+各个连接器信息

下面是解析的JAVA代码,包含的内部类主要负责控制读取。

客户端的调用只需要启动该类,然后获取hostInfoMap就能够获得集群中的主机和他们的连接信息以及备用连接信息。如果要客户端的主机信息能够实时更新,可以通过修改主机集合向客户端发消息的方式实现



/*
 * Discovery .java
 * 本类非线程安全
 */
package com.socket.multisocket;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

public class Discovery extends Thread {

  private static final String HOST = "host";

  private static final String PORT = "port";

  private static final long timeout = Integer.valueOf(System.getProperty("broadcast_host_timeout", "10000"));

  private static final String spliter = ":";
  
  //存储节点信息
  private static  Map<String,List<Map<String,String>>> hostInfoMap = new HashMap<String,List<Map<String,String>>>();
  
  //存储节点名称,主要是定时刷新
  public final static Map<String, Long> connectors = new HashMap<String, Long>();

  /**
   * 当前读的位置
   */
  private static int currentIndex = 0;

  public static int connectorPairsSize = 0;

 

  private String broadcastAddress;

  private int broadcastPort;

  public Discovery(String broadcastAddress, int broadcastPort) {
    super();
    this.broadcastAddress = broadcastAddress;
    this.broadcastPort = broadcastPort;
  }
  
  
  public static Map<String,List<Map<String,String>>> getHost(){
     return  hostInfoMap;
  }

 

  /**
   * 重置当前读指针
   ** 
   */
  public void putConnector(String key, Long currentTime) {
    connectors.put(key, currentTime);
 
  }

  public void validateConnectors() {
	
    Set<Entry<String, Long>> set = new HashSet<Entry<String,Long>>(connectors.entrySet());
    for (Entry<String, Long> entry : set) {
      if (entry.getValue() + timeout < System.currentTimeMillis()) {
        connectors.remove(entry.getKey());
        hostInfoMap.remove(entry.getKey());
      }
    }
  }

  public void run() {
    InetAddress group = null;
    MulticastSocket server = null;
    try {
      group = InetAddress.getByName(broadcastAddress);
      server = new MulticastSocket(broadcastPort);
      server.joinGroup(group);
      final byte[] data = new byte[65535];
      DatagramPacket recv = new DatagramPacket(data, data.length);
      for (;;) {
        server.receive(recv);
        String uniqueName = ParseRecieveData.parsebytes(recv
            .getData());
        ParseRecieveData.release();
        putConnector(uniqueName, System
            .currentTimeMillis());
        validateConnectors();
      }

    } catch (UnknownHostException e) {
      throw new RuntimeException("can not find this broadcast address:"+broadcastAddress);
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (server != null)
        try {
          server.leaveGroup(group);
        } catch (IOException e) {
        }
    }
  }

  private static class ParseRecieveData {
    private static final byte TYPE_BOOLEAN = 0;

    private static final byte TYPE_INT = 1;

    private static final byte TYPE_LONG = 2;

    private static final byte TYPE_STRING = 3;

    /**
     * 解析报文
     * 
     * @author Jombo
     * @param bytes
     */
    public static String parsebytes(byte[] bytes) {
      int nodeIdLength = getInt(bytes, currentIndex);
      getString(nodeIdLength, bytes, currentIndex);

      int uniqueIdLength = getInt(bytes, currentIndex);
      String uniqueName = getString(uniqueIdLength, bytes, currentIndex);

      connectorPairsSize = getInt(bytes, currentIndex);
      List<Map<String,String>> connectorList = new ArrayList<Map<String,String>>();
      // connectorPairsSize可能有多个
      for(int i =0 ;i<connectorPairsSize;i++){
       Map<String, String> connectorPair = parseConnectorPairInfo(bytes);
       connectorList.add(connectorPair);
     }
      
      hostInfoMap.put(uniqueName, connectorList);
      return uniqueName;
    }

    /**
     * 解析连接器信息
     * 
     * @author Jombo
     * @param bytes
     */
    private static Map<String, String> parseConnectorPairInfo(byte[] bytes) {
      Map<String, String>  ConnectorPairMap = parseConnectorInfo(bytes);
      Map<String, String> mainConnectorMap = parseConnectorInfo(bytes);
      byte existBackup =  getByte(bytes, currentIndex);
      boolean existbackupConnector =  existBackup == 0 ? false : true;
      Map<String, String> backConnectorMap = null;
      if(existbackupConnector){
    	  backConnectorMap = parseConnectorInfo(bytes);
      }
      //在这里我们只想看host+port的形式,有兴趣的朋友可以把全部信息放进去
      //这是为了获取主备连接
      String mainconnector = mainConnectorMap.get(HOST) + spliter + mainConnectorMap.get(PORT);
      String backconnector = backConnectorMap ==null ? null:backConnectorMap.get(HOST) + spliter + backConnectorMap.get(PORT);
      ConnectorPairMap.put(mainconnector, backconnector);
      return ConnectorPairMap;
    }

	private static Map<String, String> parseConnectorInfo(byte[] bytes) {
		Map<String, String> paramMap = new HashMap<String, String>();
		  int namelength = getInt(bytes, currentIndex);
		  getString(namelength, bytes, currentIndex);
		  int factorylength = getInt(bytes, currentIndex);
		  getString(factorylength, bytes, currentIndex);

		  int paramPairsSize = getInt(bytes, currentIndex);
		  
		  for (int j = 0; j < paramPairsSize; j++) {
		    int keylength = getInt(bytes, currentIndex);
		    String key = getString(keylength, bytes, currentIndex);
		    byte valuetype = getByte(bytes, currentIndex);
		    String value = "";
		    if (valuetype == TYPE_STRING) {
		      int valuelength = getInt(bytes, currentIndex);
		      value = getString(valuelength, bytes, currentIndex);
		    } else if (valuetype == TYPE_BOOLEAN) {
		      byte booleanvalue = getByte(bytes, currentIndex);
		      value = booleanvalue == 0 ? "false" : "true";
		    } else if (valuetype == TYPE_INT) {
		      value = String.valueOf(getInt(bytes, currentIndex));
		    } else if (valuetype == TYPE_LONG) {
		      value = String.valueOf(getLong(bytes, currentIndex));
		    } else {
		      throw new RuntimeException("invalid type");
		    }
		    paramMap.put(key, value);
		  }
		return paramMap;
	}

    /**
     * 读取一个Int
     * 
     * @author Jombo
     * @param bytes
     * @param index
     *          begin index
     * @return
     */
    public static int getInt(byte[] bytes, int index) {
      int val = (bytes[index] & 0xff) << 24 | (bytes[index + 1] & 0xff) << 16
          | (bytes[index + 2] & 0xff) << 8 | (bytes[index + 3] & 0xff) << 0;
      setCurrentIndex(index + 4);
      return val;
    }

    /**
     * 读取一个Byte
     * 
     * @author Jombo
     * @param bytes
     * @param index
     * @return
     */
    public static byte getByte(byte[] bytes, int index) {
      byte b = bytes[index];
      setCurrentIndex(index + 1);
      return b;
    }

    /**
     * 读取Short
     * 
     * @author Jombo
     * @param bytes
     * @param index
     * @return
     */
    public static short getShort(byte[] bytes, int index) {
      short indexshort = (short) (bytes[index] << 8 | bytes[index + 1] & 0xFF);
      setCurrentIndex(index + 2);
      return indexshort;
    }

    /**
     * 读取字符串,根据字符串的长度采取不同的格式
     * 
     * @author Jombo
     * @param length
     * @param bytes
     * @param index
     * @return
     */
    public static String getString(int length, byte[] bytes, int index) {
      StringBuffer stringBuffer = new StringBuffer();
      if (length < 9) {
        for (int i = 0; i < length; i++) {
          short indexshort = getShort(bytes, index);
          stringBuffer.append((char) indexshort);
          index = index + 2;
        }
      } else if (length < 0xfff) {
        short utflen = getShort(bytes, index);
        index = index + 2;
        byte[] strbyte = new byte[utflen];
        System.arraycopy(bytes, index, strbyte, 0, utflen);
        stringBuffer.append(new String(strbyte));
        setCurrentIndex(index + utflen);
      } else {
        int longlength = getInt(bytes, index);
        index = index + 4;
        byte[] strbyte = new byte[longlength];
        System.arraycopy(bytes, index, strbyte, 0, longlength);
        stringBuffer.append(new String(strbyte));
        setCurrentIndex(index + longlength);
      }
      return stringBuffer.toString();
    }

    /**
     * 读取Long
     * 
     * @author Jombo
     * @param bytes
     * @param index
     * @return
     */
    public static long getLong(byte[] bytes, int index) {
      setCurrentIndex(index + 8);
      return ((long) bytes[index] & 0xff) << 56
          | ((long) bytes[index + 1] & 0xff) << 48
          | ((long) bytes[index + 2] & 0xff) << 40
          | ((long) bytes[index + 3] & 0xff) << 32
          | ((long) bytes[index + 4] & 0xff) << 24
          | ((long) bytes[index + 5] & 0xff) << 16
          | ((long) bytes[index + 6] & 0xff) << 8
          | ((long) bytes[index + 7] & 0xff) << 0;
    }

    /**
     * 重置当前读指针
     ** 
     */
    public static void release() {
      currentIndex = 0;
    }

    public static void setCurrentIndex(int index) {
      currentIndex = index;
    }
  }
}




0
1
分享到:
评论

相关推荐

    HornetQ集群配置

    本篇文章将深入探讨HornetQ的集群配置,特别是主主集群以及通过UDP广播进行通信的方式。 ### 1. 集群概念 在HornetQ集群中,各个节点通过网络连接相互发现并建立连接。当一个节点故障时,其他节点能够接管其服务,...

    HornetQ官方学习资料

    4. **集群能力**: HornetQ提供了一个强大的集群解决方案,可以在多个服务器之间共享消息,提高了系统的可用性和容错性。 5. **灵活的消息API**: 除了传统的JMS API之外,HornetQ还提供了一套自定义的消息API,允许...

    HornetQ2.1中文手册

    ### HornetQ2.1中文手册关键知识点解析 #### 一、消息的相关概念 HornetQ2.1中文手册中详细介绍了消息处理系统的基本概念和技术细节,这对于理解和使用HornetQ至关重要。 ##### 4.1 消息相关的概念 - **消息**:在...

    HornetQ2.3 API 文档

    HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的...

    HornetQ2.1中文手册.7z

    5. **集群与高可用性**:介绍如何配置HornetQ集群,实现消息的负载均衡和故障转移。 6. **安全性**:详细阐述HornetQ的安全配置,包括用户认证、授权和加密通信。 7. **监控与管理**:讲解如何通过JMX或Web管理界面...

    HornetQ 2.1 中文文档

    集群 (Clustering) 支持庞大的消息(Message)- 如在只有 50 Megabyte 随机存取存储器的主机上,HornetQ 支持数以Terabyte计的消息队列 (Message Queue)[1] 设计理念: 高性能 - 单一主机运行HornetQ,速度可达...

    hornetq-2.3.0.Final-bin.zip

    通过编辑`configuration/hornetq.config.xml`,可以定制HornetQ的行为,如设置持久化存储、集群配置、传输协议、安全策略等。 **5. 使用HornetQ** 开发者可以通过JMS API创建生产者和消费者,与HornetQ服务器进行...

    hornetq 2.4.0免安装

    4. **集群与高可用性**:HornetQ支持集群部署,可以在多台服务器之间共享负载,提高服务的可用性和容错性。如果集群中的某个节点故障,其他节点可以接管其任务,确保服务不间断。 5. **性能优化**:HornetQ 2.4.0 ...

    Hornetq2.1中文手册

    - **消息的广播/订阅模式**(Publish-Subscribe):消息发布到一个主题,多个订阅者可以接收到,适用于一对多的通信。 - **传送的可靠性**:讨论了如何保证消息的可靠传输,包括确认机制、重试策略等。 - **交易...

    hornetq 实例

    - **高可用性**:支持集群和故障转移,确保服务连续性。 - **灵活性**:支持多种传输协议,如TCP、HTTP、HTTPS,以及多种消息模式,如点对点和发布/订阅。 - **轻量级**:设计简洁,无需依赖大量外部库,适合...

    hermes 监听hornetq JMS配置步奏

    在IT行业中,消息传递系统是分布式应用程序之间进行通信的关键组件,而HornetQ和Hermes都是此类系统的重要组成部分。HornetQ是一个高性能、轻量级且完全开源的消息中间件,它提供了JMS(Java消息服务)接口,允许...

    ActiveMQ和HornetQ性能对比

    ### ActiveMQ与HornetQ性能对比分析 #### 概述 本文旨在通过一系列测试数据对比分析ActiveMQ与HornetQ在不同消息大小及数量下的性能表现。测试环境为相同的硬件配置,确保了测试结果的公正性。通过对比两者的发送...

    .net 连接HornetQ,需要的dll

    .NET 连接HornetQ是一项关键的技术任务,HornetQ是一款开源的消息中间件,它提供了高效、可扩展和高可用性的消息传递服务。在.NET环境中与HornetQ进行交互,通常需要借助特定的客户端库,如Apache.NMS.Stomp。下面将...

    hornetq-journal-2.3.19.Final.zip

    ZooKeeper是一个分布式的协调服务,常用于管理配置信息、命名服务、集群同步以及分组服务等。 【标签】"开源项目" 表明这两个压缩包内容都是开放源代码的软件项目,这意味着它们遵循特定的开源许可证,允许用户自由...

    hornetq-2.2.5.Final.zip

    hornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.zip

    HornetQ 2_1用户手册

    什么是HornetQ? * HornetQ 是一个开源的软件项目。它的目标是一个多协议、可嵌入、高性能、可集群的异步消息系统。 * HornetQ 是一个消息中间件(MoM)。有关MoM和其它消息相关的概念解释请参见 Chapter 4, ...

    hornetq-2.4.0.Final-bin.tar

    hornetq安装包, hornetq-2.4.0.Final-bin.tar 消息中间件 供项目中数据交互使用

    hornetq-transports-2.0.0.GA.jar

    hornetq-transports-2.0.0.GA.jar

    HornetQ Messaging Developer's Guide.pdf

    HornetQ是java开源实现的消息系统框架,性能上比ActiveQ要好一些,被集成到JBoss的消息服务中。 Table of Contents Preface 1 Chapter 1: Getting Started with HornetQ 9 Chapter 2: Setting Up HornetQ 31 ...

    HornetQ2.0.0GA

    3. 分布式:HornetQ 支持集群部署,可以轻松地在多台服务器之间扩展,以满足大规模并发和高负载需求。集群内的节点可以共享负载,提供冗余和故障切换能力。 4. 灵活性:HornetQ 具有广泛的配置选项,允许开发者根据...

Global site tag (gtag.js) - Google Analytics