测试方式:
使用我们之前使用的脚本,在hornetq做failover的环境下,施加很大的压力(50个线程),看failover能否成功(看有没有丢数据,主-副机能不能正常的切换过来)
具体的操作方式是:
Hornetq自带的example有HA这块的测试脚本
/hornetq-2.1.2.Final/examples/jms/non-transaction-failover
/hornetq-2.1.2.Final/examples/jms/transaction-failover
执行[bes@test157 transaction-failover]$./build.sh
带broker起来后启动压力测试脚本
需要修改的配置
hornetq-jms.xml
增加对应的destination(为我们压力脚本设置的目的地)
<topic name="topic1">
<entry name="/my/Topic1"/>
</topic>
hornetq-configuration.xml
在<configuration>节点下增加
<security-enabled>false</security-enabled>
hornetq-beans.xml和client-jndi.properties中对应的localhost替换为本机的ip
增加两个对应的topic的测试类,在原有的用例中没有topic的测试用例
日志数据的比较:
Hornetq日志数据的比较调用堆栈:
ReplicationCompareDataMessage
-decode:PacketDecoder
--bufferReceived:RemotingConnectionImpl
----bufferReceived:DelegatingBufferHandler
--------bufferReceived:DelegatingBufferHandler
日志比较的关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl
public void compareJournalInformation(
final JournalLoadInformation[] journalInformation) throws
HornetQException
{
if (journalLoadInformation == null ||
journalLoadInformation.length != journalInformation.length)
{
throw new HornetQException(
HornetQException.INTERNAL_ERROR,
"Live Node contains more journals than the backup node. Probably a version match error");
}
for (int i = 0; i < journalInformation.length; i++)
{
if (!journalInformation[i].equals
(journalLoadInformation[i])) {
ReplicationEndpointImpl.log
.warn("Journal comparission mismatch:\n" +
journalParametersToString(journalInformation));
}
}
}
这里面重写了org.hornetq.core.journal.JournalLoadInformation的equals方法,在看看它的equals方法
JournalLoadInformation other = (JournalLoadInformation) obj;
if(maxID != other.maxID){
return false;
}
if(numberOfRecords != other.numberOfRecords){
return false;
}
return true;
从上面我们可以看出,它比较的是maxID以及numberOfRecords这两个值。我们在看看其中的一个赋值的地方:
public void decodeRest(final HornetQBuffer buffer) {
int numberOfJournals = buffer.readInt();
journalInformation = new JournalLoadInformation[numberOfJournals];
for (int i = 0; i < numberOfJournals; i++) {
journalInformation[i] = new JournalLoadInformation();
journalInformation[i].setNumberOfRecords(buffer.readInt());
journalInformation[i].setMaxID(buffer.readLong());
}
}
就目前的调查来看,在启动HornetQ和创建session会话的时候会调用到日志比较。下图是比较日志调用的路线
compareJournals(ReplicationManagerImpl)
compareJournals(HornetQServerImpl)
initialisePart2(HornetQServerImpl)
checkActivate(HornetQServerImpl)
handleCreateSession(HornetQPacketHandler)
handleReattachSession(HornetQPacketHandler)
start(HornetQServerImpl)
日志复制通道
RepliactionManagerImpl#start方法用于获取一个与备原机器的连接,创建用于日志复制的会话
:
start(ReplicationManagerImpl)
activated(JMSServerMangerImpl)
createJournal(JMSServerMangerImpl)
initJournal(JMSServerMangerImpl)
activated(JMSServerMangerImpl)
callActivatedCallbacks(HornetQServerImpl)
initialisePart2(HornetQServerImpl)
checkActivate(HornetQServerImpl)
start(HornetQServerImpl)
关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl#start
public synchronized void start() throws Exception {
// 获取和备原机器的连接
replicatingConnection = failoverManager.getConnection();
long channelID = replicatingConnection.generateChannelID();
Channel mainChannel = replicatingConnection.getChannel(1, -1);
replicatingChannel = replicatingConnection.getChannel(channelID, -1);
replicatingChannel.setHandler(responseHandler);
CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(
channelID);
// 发送一个创建拷贝会话的命令(PacketImpl.CREATE_REPLICATION)
mainChannel.sendBlocking(replicationStartPackage);
}
上面发出的消息HorentQPacketHanler#handlePacket(final Packet packet)会处理
case CREATE_REPLICATION: {
//Create queue can also be fielded here in the case of a replicated store and forward queue creation
CreateRelicationSessionMessage request = (CreateRelicationSessionMessage)packet;
handleCreateReplication(request);
break;
}
日志的同步
调用ReplicationManagerImpl#sendReplicatePacket来复制日志;这个方法的调用者很多,如消息的发送,结束发送,创建连接工厂,物理目的地等操作
sendReplicatePacket(ReplicationManagerImpl)
appendAddRecord(ReplicatedJournal)
appendAddRecord(ReplicatedJournal)
storeMessage(JournalStroageManager)
processRoute(PostOfficeImpl)
redistribute(PostOfficeImpl)
route(PostOfficeImpl)
routeQueueInfo(PostOfficeImpl)
关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl#sendReplicationPacket
private void sendReplicatePacket(final Pakcet packet)
{
boolean runItNow = false;
OperationContext repliToken = OperationContextImpl
.getContext(executorFactory);
repliToken.replicationLineUp();
synchronized (replicationLock)
{
if (!enabled)
{
runItNow = true;
}
else
{
pendingTokens.add(repliToken);
replicatingChannel.send(packet);
}
}
if (runItNow)
{
repliToken.relicationDone();
}
}
收到一条消息就发到replicatingChannel,做到了日志同步。
replicatingChannel的建立
org.hornetq.core.replication.impl.ReplicationManagerImpl#start
public synchronized void start() throws Exception {
replicatingConnection = failoverManager.getConnection();
long channelID = replicatingConnection.generateChannelID();
//在这里为replicatingChannel赋值
replicatingChannel = replicatingConnection.getChannel(channelID, -1);
replicatingChanne.setHandler(responseHandler);
}
主备机器切换
HornetQ的失效备原是在客户端层面来做的
,之前会注册一个用于失效备原的监听器,当监听到异常时,就会尝试进行失效备原;下面是在发生失效备原时的调用堆栈:
FailoverManagerImpl#failoverOrReconnect
FailoverManagerImpl#handleConnectionFailure
FailoverManagerImpl$DelegatingFailureListener#connectionFailed
RemotingConnectionImpl#callFailureListeners
RemotingConnectionImpl#fail
FailoverManagerImpl$ChannelOHandler
Failover关键代码org.hornetq.core.client.impl.FailoverManagerImpl#failoverOrReconnect
分享到:
相关推荐
5. **灵活的消息API**: 除了传统的JMS API之外,HornetQ还提供了一套自定义的消息API,允许开发者更加高效地利用HornetQ的功能。 #### 三、HornetQ与JMS的关系 - **JMS兼容性**: HornetQ完全支持JMS 1.1规范,这...
通过以上分析,我们可以看出"HornetqTest"实例旨在帮助开发者理解并实践如何使用HornetQ进行消息传递。这个实例应该包含了创建、配置和操作HornetQ服务器以及发送和接收消息的基础示例,是学习和测试HornetQ功能的...
4. **消息的相关概念**:这是理解 HornetQ 功能的基础,涵盖了消息系统的核心概念。 - **消息相关的概念**:包括消息的发送、接收、存储和传输,以及消息的生命周期。 - **消息的种类**:详细讲解两种主要的消息...
为了确保服务的连续性和稳定性,HornetQ提供了高可用性(HA)特性,如集群和故障转移。 ##### 4.8 集群 集群功能允许消息在多台服务器之间共享,提高了系统的容错能力和扩展能力。 ##### 4.9 桥接(BRIDGE)和路由...
HornetQ 2.3.0 Alpha 发布,这不是一个简单的 Alpha 版本,同时也是一个大的发行版本。该版本对 2.2.0 进行了重构,引入一些原子故障迁移特性和大量企业特性改进。详细的新特性介绍请看发行说明。 HornetQ是一个...
HornetQ是JBoss社区所研发的开放源代码消息中间件;HornetQ是以Java 5 编写,因此只需要操作系统支持Java虚拟机,HornetQ便可运行。 支持Java消息服务 (JMS) 1.1 版本 集群 (Clustering) 支持庞大的消息(Message)...
HornetQ是一个功能强大的消息传递系统,支持JMS(Java Message Service)规范,允许应用程序通过异步通信进行解耦。它具有多种传输协议,如TCP/IP、HTTP、HTTPS以及NIO(非阻塞I/O)和UDP,使得HornetQ可以在各种...
在IT行业中,消息传递系统是分布式应用程序之间进行通信的关键组件,而HornetQ和Hermes都是此类系统的重要组成部分。HornetQ是一个高性能、轻量级且完全开源的消息中间件,它提供了JMS(Java消息服务)接口,允许...
### ActiveMQ与HornetQ性能对比分析 #### 概述 本文旨在通过一系列测试数据对比分析ActiveMQ与HornetQ在不同消息大小及数量下的性能表现。测试环境为相同的硬件配置,确保了测试结果的公正性。通过对比两者的发送...
HornetQ 2.4.0 是一款轻量级且高效的开源消息中间件(Message Queuing,简称MQ),它提供了可靠的消息传递服务,适用于分布式系统中的数据通信。这款MQ解决方案设计目标是高吞吐量、低延迟以及可扩展性,使得在大...
.NET 连接HornetQ是一项关键的技术任务,HornetQ是一款开源的消息中间件,它提供了高效、可扩展和高可用性的消息传递服务。...HornetQ的灵活性和强大的功能使得它成为许多.NET应用程序理想的异步通信解决方案。
HornetQ是JBoss公司开发的一个开源消息中间件,它在Java消息服务(JMS)规范的基础上提供了高效、可扩展且高度可靠的异步通信功能。这个“HornetQ 2.1中文手册”是一个压缩包文件,包含了对HornetQ 2.1版本的详细...
hornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.ziphornetq-2.2.5.Final.zip
HornetQ是JBoss社区开发的一个高性能、可扩展且功能丰富的开源消息传递系统,它被广泛用于企业级应用中的异步通信和事件驱动架构。 【描述】"Sumac.zip,scala中的sumac ext zkargument解析" 提到的Sumac是一个基于...
hornetq安装包, hornetq-2.4.0.Final-bin.tar 消息中间件 供项目中数据交互使用
HornetQ是一款高性能、可伸缩且开源的消息中间件,它被广泛用于构建分布式系统中的消息传递。在HornetQ中,集群配置是一种重要的特性,它允许多个HornetQ服务器形成一个集群,共享资源,提高可用性和可扩展性。本篇...
HornetQ提供自动客户端失效备援(automatic client failover)功能,能保证在服务器故障时没有消息丢失或消息重复。 * 超级灵活的集群方案。可以控制集群进行消息负载均衡的方式。分布在不同地理位置的各个集群间...
hornetq-transports-2.0.0.GA.jar
HornetQ是java开源实现的消息系统框架,性能上比ActiveQ要好一些,被集成到JBoss的消息服务中。 Table of Contents Preface 1 Chapter 1: Getting Started with HornetQ 9 Chapter 2: Setting Up HornetQ 31 ...
hornetQ 2.4.0