`
y806839048
  • 浏览: 1120519 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

测试ActiveMQ主从复制

阅读更多

前天搭建了一个避免单点故障的基于zokeeper和leveldb的store。昨天测过了,kill掉Master,客户端确实是可以自动切换到被新推举出来的broker。但是消息是否也被复制过去了。会不会造成消息的丢失还需要测试一下。 测试流程

1.编写Sender和Receiver代码,通过客户端配置failover 进行一次正常的消息的发送和收,确保消息的传递是正常的。 
2.通过Sender发送一段正常的消息到broker,但是不接受,然后kill 掉当前主broker,然后在运行Receiver代码,查看消息是否接受到。 步骤一

 

 

package com.shiquanjiumei.test; 

import javax.jms.Connection; 

import javax.jms.ConnectionFactory; 

import javax.jms.DeliveryMode; 

import javax.jms.Destination; 

import javax.jms.MessageProducer; 

import javax.jms.Session; 

import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnection; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class TestMQSender { 

private static final int SEND_NUMBER = 5; 

public static void main(String[] args) 

{ // ConnectionFactory :连接工厂,JMS 用它创建连接 

ConnectionFactory connectionFactory; 

// Connection :JMS 客户端到JMS Provider 的连接 

Connection connection = null; 

// Session: 一个发送或接收消息的线程 

Session session; 

// Destination :消息的目的地;消息发送给谁. 

Destination destination; 

// MessageProducer:消息发送者 

MessageProducer producer; 

// TextMessage message;

// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现

jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,

"failover:(tcp://IP:61616,tcp://IP:61617,tcp://IP:61618)"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); 

// 启动 

connection.start(); 

// 获取操作连接 

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 

// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 

destination = session.createQueue("FirstQueue"); 

// 得到消息生成者【发送者】 

producer = session.createProducer(destination); 

// 设置不持久化,此处学习,实际根据项目决定

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

// 构造消息,此处写死,项目就是参数,或者方法获取 

sendMessage(session, producer); session.commit(); 

} catch (Exception e) { e.printStackTrace(); } finally { 

try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } 

public static void sendMessage(Session session, MessageProducer producer) throws Exception {

// for (int i = 1; i <= SEND_NUMBER; i++) { 

TextMessage message = session .createTextMessage("ActiveMq 发送的消息:yivan" /*+ i*/); 

// 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息:yivan" /*+ i*/); 

producer.send(message); 

// } 

} }

 

 

接受端代码

 

 

 

package com.shiquanjiumei.test; 

import javax.jms.Connection; 

import javax.jms.ConnectionFactory; 

import javax.jms.Destination; 

import javax.jms.MessageConsumer; 

import javax.jms.Session; 

import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnection; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class TestMQReceiver {

public static void main(String[] args) {

// ConnectionFactory :连接工厂,JMS 用它创建连接 

ConnectionFactory connectionFactory; 

// Connection :JMS 客户端到JMS Provider 的连接 

Connection connection = null; // Session: 一个发送或接收消息的线程

Session session; 

// Destination :消息的目的地;消息发送给谁. 

Destination destination;

// 消费者,消息接收者 

MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, 

"failover:(tcp://IP:61616,tcp://IP:61617,tcp://IP:61618)"); try {

// 构造从工厂得到连接对象 

connection = connectionFactory.createConnection();

// 启动 connection.start(); 

// 获取操作连接 

session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 

// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 

destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) {

//设置接收者接收消息的时间,为了便于测试,这里谁定为100s 

TextMessage message = (TextMessage) consumer.receive(100000); 

if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } 

catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }

 

 

步骤一比较简单,结果表明现阶段运行时正常的

发送消息:ActiveMq 发送的消息:yivan

步骤二

先运行Sender,向队列中发送消息,可以看到有一条待消费信息,但是没有消费者。 
测试ActiveMQ主从复制

接下来,登录linux kill掉主Master,我现在的Master是61617.通过观察日志 
在杀死主进程的一刹那是拒绝链接的,最后当61618被推出来当主的时候,又正常了。

DEBUG 2015-12-09 15:07:12,718 org.apache.activemq.util.ThreadPoolUtils: Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@288c2c89[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] DEBUG 2015-12-09 15:07:12,718 org.apache.activemq.transport.failover.FailoverTransport: Attempting 7th connect to: tcp://127.0.0.1:61616 DEBUG 2015-12-09 15:07:12,719 org.apache.activemq.transport.failover.FailoverTransport: Connect fail to: tcp://127.0.0.1:61616, reason: java.net.ConnectException: 拒绝连接 DEBUG 2015-12-09 15:07:12,719 org.apache.activemq.transport.tcp.TcpTransport: Stopping transport tcp://127.0.0.1:61616 DEBUG 2015-12-09 15:07:12,719 org.apache.activemq.thread.TaskRunnerFactory: Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@61501cb0[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] DEBUG 2015-12-09 15:07:12,722 org.apache.activemq.transport.tcp.TcpTransport$1: Closed socket Socket[unconnected] DEBUG 2015-12-09 15:07:12,722 org.apache.activemq.util.ThreadPoolUtils: Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@61501cb0[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] DEBUG 2015-12-09 15:07:12,722 org.apache.activemq.transport.failover.FailoverTransport: Attempting 7th connect to: tcp://127.0.0.1:61617 DEBUG 2015-12-09 15:07:12,723 org.apache.activemq.transport.failover.FailoverTransport: Connect fail to: tcp://127.0.0.1:61617, reason: java.net.ConnectException: 拒绝连接 DEBUG 2015-12-09 15:07:12,724 org.apache.activemq.transport.tcp.TcpTransport: Stopping transport tcp://127.0.0.1:61617 DEBUG 2015-12-09 15:07:12,724 org.apache.activemq.thread.TaskRunnerFactory: Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@4a66875b[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] DEBUG 2015-12-09 15:07:12,724 org.apache.activemq.transport.tcp.TcpTransport$1: Closed socket Socket[unconnected] DEBUG 2015-12-09 15:07:12,724 org.apache.activemq.util.ThreadPoolUtils: Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@4a66875b[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] DEBUG 2015-12-09 15:07:12,725 org.apache.activemq.transport.failover.FailoverTransport: Waiting 1280 ms before attempting connection DEBUG 2015-12-09 15:07:14,006 org.apache.activemq.transport.failover.FailoverTransport: urlList connectionList:[tcp://127.0.0.1:61618, tcp://127.0.0.1:61616, tcp://127.0.0.1:61617], from: [tcp://127.0.0.1:61616, tcp://127.0.0.1:61617, tcp://127.0.0.1:61618] DEBUG 2015-12-09 15:07:14,006 org.apache.activemq.transport.failover.FailoverTransport: Waiting 2560 ms before attempting connection. DEBUG 2015-12-09 15:07:16,196 org.springframework.web.servlet.DispatcherServlet: DispatcherServlet with name 'front' processing POST request for [/clouds/customer/getcustomerdata.shtml] DEBUG 2015-12-09 15:07:16,196 org.springframework.web.servlet.handler.AbstractUrlHandlerMapping: Mapping [/customer/getcustomerdata.shtml] to HandlerExecutionChain with handler [com.shiquanjiumei.controller.interfaces.CustomerDataController@58c8b978] and 1 interceptor DEBUG 2015-12-09 15:07:16,197 org.springframework.web.bind.annotation.support.HandlerMethodInvoker: Invoking request handler method: public void com.shiquanjiumei.controller.interfaces.CustomerDataController.getCustomerMessage(java.lang.String,java.lang.String,javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) throws java.net.UnknownHostException DEBUG 2015-12-09 15:07:16,197 org.springframework.beans.factory.support.AbstractBeanFactory: Returning cached instance of singleton bean 'transactionManager' DEBUG 2015-12-09 15:07:16,197 org.springframework.transaction.support.AbstractPlatformTransactionManager: Creating new transaction with name [com.shiquanjiumei.service.impl.BaseSetingServiceImpl.getCustomerMessage]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; '' DEBUG 2015-12-09 15:07:16,197 com.mchange.v2.resourcepool.BasicResourcePool: trace com.mchange.v2.resourcepool.BasicResourcePool@1db6ed76 [managed: 6, unused: 5, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@20f78840) DEBUG 2015-12-09 15:07:16,197 org.springframework.jdbc.datasource.DataSourceTransactionManager: Acquired Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@1fc9a6ad] for JDBC transaction DEBUG 2015-12-09 15:07:16,197 org.springframework.jdbc.datasource.DataSourceTransactionManager: Switching JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@1fc9a6ad] to manual commit DEBUG 2015-12-09 15:07:16,198 org.mybatis.spring.SqlSessionUtils: Creating a new SqlSession DEBUG 2015-12-09 15:07:16,198 org.mybatis.spring.SqlSessionUtils: Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6fddb828] DEBUG 2015-12-09 15:07:16,198 org.mybatis.spring.transaction.SpringManagedTransaction: JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@1fc9a6ad] will be managed by Spring DEBUG 2015-12-09 15:07:16,199 org.apache.ibatis.logging.jdbc.BaseJdbcLogger: ==> Preparing: select * from t_customer where registerTime between ? and ? DEBUG 2015-12-09 15:07:16,199 org.apache.ibatis.logging.jdbc.BaseJdbcLogger: ==> Parameters: 2015-12-09 03:07:30(String), 2015-12-09 03:07:40(String) DEBUG 2015-12-09 15:07:16,200 org.apache.ibatis.logging.jdbc.BaseJdbcLogger: <== Total: 0 DEBUG 2015-12-09 15:07:16,200 org.mybatis.spring.SqlSessionUtils: Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6fddb828] DEBUG 2015-12-09 15:07:16,200 org.mybatis.spring.SqlSessionUtils$SqlSessionSynchronization: Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6fddb828] DEBUG 2015-12-09 15:07:16,200 org.mybatis.spring.SqlSessionUtils$SqlSessionSynchronization: Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6fddb828] DEBUG 2015-12-09 15:07:16,200 org.mybatis.spring.SqlSessionUtils$SqlSessionSynchronization: Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6fddb828] DEBUG 2015-12-09 15:07:16,200 org.springframework.transaction.support.AbstractPlatformTransactionManager: Initiating transaction commit DEBUG 2015-12-09 15:07:16,200 org.springframework.jdbc.datasource.DataSourceTransactionManager: Committing JDBC transaction on Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@1fc9a6ad] DEBUG 2015-12-09 15:07:16,201 org.springframework.jdbc.datasource.DataSourceTransactionManager: Releasing JDBC Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@1fc9a6ad] after transaction DEBUG 2015-12-09 15:07:16,201 org.springframework.jdbc.datasource.DataSourceUtils: Returning JDBC Connection to DataSource DEBUG 2015-12-09 15:07:16,201 com.mchange.v2.resourcepool.BasicResourcePool: trace com.mchange.v2.resourcepool.BasicResourcePool@1db6ed76 [managed: 6, unused: 5, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@20f78840) DEBUG 2015-12-09 15:07:16,201 org.springframework.web.servlet.DispatcherServlet: Null ModelAndView returned to DispatcherServlet with name 'front': assuming HandlerAdapter completed request handling DEBUG 2015-12-09 15:07:16,202 org.springframework.web.servlet.FrameworkServlet: Successfully completed request DEBUG 2015-12-09 15:07:16,566 org.apache.activemq.transport.failover.FailoverTransport: Attempting 8th connect to: tcp://127.0.0.1:61618

但是当我查看后台或者运行接受Receiveder代码的时候,原来的消息丢失了。这怎么可以,这还怎么玩。 
测试ActiveMQ主从复制

其实问题在Sender的代码中 
将NON_PERSISTENT 改为PERSISTENT

// 设置不持久化,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.PERSISTENT);

在运行一遍测试流程2,发现及时当时收到消息的是server1 ,你讲server1 kill 掉后,用Recevieder的代码从新的broker中接受依旧能接受到,原因就是他进行了LevelDB本地存贮,这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性。

以上是测试ActiveMQ主从复制的全部内容,在云栖社区的博客、问答、公众号、人物、课程等栏目也有测试ActiveMQ主从复制的相关内容,欢迎继续使用右上角搜索按钮进行搜索activemq ,以便于您获取更多的相关知识。

 

分享到:
评论

相关推荐

    Zookeeper+ActiveMQ测试.rar

    同时,"ZooInspector-zookeeper+activemq主从查看.rar"可能是ZooKeeper的可视化工具ZooInspector,它可以帮助我们更直观地观察和理解ZooKeeper的数据结构和节点状态,对于调试和监控集群状态非常有帮助。

    activeMQ static broker测试

    这可能涉及到`master-slave`配置或`shared store`模式,前者通过主从复制实现,后者则允许所有Broker共享同一存储,实现数据同步。 3. **故障检测与恢复**:在静态Broker集群中,ActiveMQ会持续监控集群中其他...

    ActiveMQ高可用+负载均衡集群的安装、配置、高可用测试

    2. **配置broker**:修改`conf/activemq.xml`配置文件,设置网络复制或主从模式。 3. **启动服务**:执行bin目录下的启动脚本,启动ActiveMQ服务。 4. **监控与管理**:通过Web Console(默认8161端口)查看和管理...

    ActiveMQ集群

    2. **主从复制**:一个主Broker接收和处理所有消息,备份Broker在主Broker故障时接管服务。这种模式提供了高可用性,但并不适合横向扩展。 3. **网络连接**:多个Broker通过网络连接器相互连接,形成一个集群,共同...

    activemq master-slave搭建的NFSV4文档

    为了实现高可用性和容错性,ActiveMQ 支持主从(master-slave)架构,这种架构可以通过复制数据来确保即使主节点故障,服务也能无缝切换到备用节点。在 ActiveMQ 5.8 版本中,使用 NFSV4(Network File System ...

    activemq集群方案.doc

    通过上述集群配置和测试,可以评估不同集群方案在性能、可靠性和资源利用率方面的表现,为实际生产环境选择最佳的ActiveMQ部署策略提供依据。同时,根据测试结果,可能需要调整配置参数,如内存限制、持久化策略等,...

    4.1 ActiveMQ的三种集群模式1

    主从切换测试包括检查当主节点故障时,slave节点能否自动接管并继续提供服务,以及在主节点恢复后,数据的一致性是否得到保持。 总结来说,ActiveMQ的基于共享文件的集群模式提供了高可用性的解决方案,通过共享...

    高可用之ActiveMQ集群:网络连接模式(network connector)详解.docx

    2. **主从复制**:一个主 broker 处理所有消息,而其他从 broker 实时复制主 broker 的状态,以便在主 broker 故障时接管服务。 3. **网络连接模式**:多个 broker 实例通过 network connector 连接,形成动态的、...

    Java高级技术路线1

    Redis支持持久化(AOF和RDB),主从复制,以及通过Jedis等客户端进行操作。 - **MongoDB**:是一个文档型数据库,以JSON-like文档存储数据,支持动态查询和索引。通过Robomongo等客户端进行操作,实现主从复制和分...

    阿里p9高并发系统设计手册20211108.zip

    2. 数据库读写分离:实现主从复制,减轻主库压力。 3. 分库分表:ShardingSphere、MyCat等分库分表中间件的使用与实践。 四、缓存技术 1. Redis与Memcached:对比两者的特性和应用场景,讲解缓存穿透、缓存雪崩和...

    Java开发工程师简历模板(三十六)

    - 熟练使用MySQL数据库,具备SQL语句优化能力,并了解MySQL的主从复制和读写分离,这有助于提升数据库性能和高可用性。 - 熟悉Lucene/Solr全文检索技术,能够实现高效的搜索功能,还掌握了ActiveMQ消息中间件,可...

    大型网站系统java控件

    这涉及到分布式缓存(如Redis、Memcached)、分布式数据库(如MySQL的主从复制、分片集群)、分布式文件系统(如Hadoop HDFS)以及分布式任务调度(如Quartz、Celery)。 4. **Java并发编程**:在处理大量用户请求...

    java2年工作经验简历_java简历.doc

    4. **数据库管理**:他对MySQL数据库有深入实践,能够编写和优化SQL语句,同时了解MySQL的主从复制和读写分离策略,这有助于提高数据库系统的可用性和扩展性。 5. **中间件与搜索引擎**:高亚林使用过Lucene/Solr...

    针对java、mysql、redis、消息队列开发的一款面经、题型的应试网站

    面试时会测试对键值对数据结构的理解(如字符串、哈希、列表、集合、有序集合),以及Redis持久化机制(RDB和AOF)、事务、发布订阅、主从复制和哨兵系统等知识。 4. **消息队列(MQ)**: 消息队列用于解耦系统组件...

    各个大厂面试题汇总.zip

    - 数据库设计:范式理论、数据库优化、读写分离、主从复制。 8. **框架与中间件**: - Spring全家桶:Spring Core、AOP、IOC、MVC、Boot、Cloud。 - MyBatis:动态SQL、Mapper接口、事务管理。 - Redis、...

    java简历模版1.docx

    - **MySQL、Oracle**:熟练掌握关系型数据库,包括SQL语句优化,MySQL的主从复制和读写分离。 - **MongoDB、Redis**:非关系型数据库的使用,特别是Redis的持久化策略和集群搭建,以及解决缓存问题的方法。 **...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    ActiveMQ 常用开源框架 Spring Spring MVC Spring WebFlow spring tx aop ioc Struts ibatis Mybatis CAS Dubbo 工作能力 软实力 应急能力 创新能力 管理能力 分享能力 学习能力 沟通能力 ...

    13期RocketMQ讲义(1).pdf

    - **高可用机制**:涉及主从复制、消息投递的高可用性。 - **消息投递机制**:详细解释了生产者和消费者的投递策略。 - **消息重试**:处理消息消费失败的策略,包括顺序和无序消息。 - **死信队列**:处理无法...

    miaosha

    2. **分布式系统**:大型秒杀系统往往采用分布式架构,包括分布式缓存(如Redis)、分布式数据库(如MySQL的主从复制)和负载均衡器(如Nginx)。Java的`RMI`(远程方法调用)和`JMS`(Java消息服务)可以支持跨节点...

Global site tag (gtag.js) - Google Analytics