`
thinkerAndThinker
  • 浏览: 284850 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ几种模式总结

 
阅读更多

原文地址:http://blog.csdn.net/czp11210/article/details/8822070

Queue(点到点)模式

在点对点的传输方式中,消息数据被持久化,每条消息都能被消费,没有监听QUEUE地址也能被消费,数据不会丢失,一对一的发布接受策略,保证数据完整。

创建MAVEN项目

点击下一步,填写grupId为me.czp,artifactId为example-mq,name为exampleMq,点完成

然后在Package Explorer看到example-mq项目,如下:

创建生产者

[java] view plain copy
 
  1. package mq.p2p;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Connection;  
  6.   
  7. import javax.jms.ConnectionFactory;  
  8.   
  9. import javax.jms.Destination;  
  10.   
  11. import javax.jms.JMSException;  
  12.   
  13. import javax.jms.MapMessage;  
  14.   
  15. import javax.jms.MessageProducer;  
  16.   
  17. import javax.jms.Session;  
  18.   
  19. import org.apache.activemq.ActiveMQConnection;  
  20.   
  21. importorg.apache.activemq.ActiveMQConnectionFactory;  
  22.   
  23. public class Producer {  
  24.   
  25.          publicstatic void main(String[] args) {  
  26.   
  27.                    Stringuser = ActiveMQConnection.DEFAULT_USER;  
  28.   
  29.                    Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  30.   
  31.                    Stringurl = ActiveMQConnection.DEFAULT_BROKER_URL;  
  32.   
  33.                    Stringsubject = "test.queue";  
  34.   
  35.                    ConnectionFactorycontectionFactory = new ActiveMQConnectionFactory( user, password, url);  
  36.   
  37.                    try{  
  38.   
  39.                             Connectionconnection = contectionFactory.createConnection();  
  40.   
  41.                             connection.start();  
  42.   
  43.                             Sessionsession = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  44.   
  45.                             Destinationdestination = session.createQueue(subject);  
  46.   
  47.                             MessageProducerproducer = session.createProducer(destination);  
  48.   
  49.                             for(int i = 0; i <= 20; i++) {  
  50.   
  51.                                      MapMessagemessage = session.createMapMessage();  
  52.   
  53.                                      Datedate = new Date();  
  54.   
  55.                                      message.setLong("count",date.getTime());  
  56.   
  57.                                      Thread.sleep(1000);  
  58.   
  59.                                      producer.send(message);  
  60.   
  61.                                      System.out.println("--发送消息:" +date);  
  62.   
  63.                             }  
  64.   
  65.                             Thread.sleep(2000);  
  66.   
  67.                             session.commit();  
  68.   
  69.                             session.close();  
  70.   
  71.                             connection.close();  
  72.   
  73.                    }catch (JMSException e) {  
  74.   
  75.                             e.printStackTrace();  
  76.   
  77.                    }catch (InterruptedException e) {  
  78.   
  79.                             e.printStackTrace();  
  80.   
  81.                    }  
  82.   
  83.          }  
  84.   
  85. }  



创建消费者

[java] view plain copy
 
  1. package mq.p2p;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Connection;  
  6.   
  7. import javax.jms.ConnectionFactory;  
  8.   
  9. import javax.jms.Destination;  
  10.   
  11. import javax.jms.JMSException;  
  12.   
  13. import javax.jms.MapMessage;  
  14.   
  15. import javax.jms.Message;  
  16.   
  17. import javax.jms.MessageConsumer;  
  18.   
  19. import javax.jms.MessageListener;  
  20.   
  21. import javax.jms.Session;  
  22.   
  23. import org.apache.activemq.ActiveMQConnection;  
  24.   
  25. import org.apache.activemq.ActiveMQConnectionFactory;  
  26.   
  27.    
  28.   
  29. public class Customer {  
  30.   
  31.     public static void main(String[] args) {  
  32.   
  33.         Stringuser = ActiveMQConnection.DEFAULT_USER;  
  34.   
  35.         Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  36.   
  37.         Stringurl = ActiveMQConnection.DEFAULT_BROKER_URL;  
  38.   
  39.         Stringsubject = "test.queue";  
  40.   
  41.         ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( user, password, url);  
  42.   
  43.         Connectionconnection;  
  44.   
  45.         try {  
  46.   
  47.             connection= connectionFactory.createConnection();  
  48.   
  49.             connection.start();  
  50.   
  51.             final Session session =connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  52.   
  53.             Destinationdestination = session.createQueue(subject);  
  54.   
  55.             MessageConsumermessage = session.createConsumer(destination);  
  56.   
  57.             message.setMessageListener(new MessageListener() {  
  58.   
  59.                 publicvoid onMessage(Message msg){  
  60.   
  61.                     MapMessagemessage = (MapMessage) msg;  
  62.   
  63.                     try {  
  64.   
  65.                         System.out.println("--收到消息:" +new Date());  
  66.   
  67.                         session.commit();  
  68.   
  69.                     }catch(JMSException e) {  
  70.   
  71.                         e.printStackTrace();  
  72.   
  73.                     }  
  74.   
  75.                 }  
  76.   
  77.             });  
  78.   
  79.             Thread.sleep(30000);  
  80.   
  81.             session.close();  
  82.   
  83.             Thread.sleep(30000);  
  84.   
  85.             connection.close();  
  86.   
  87.             Thread.sleep(30000);  
  88.   
  89.         }catch(JMSException e) {  
  90.   
  91.             e.printStackTrace();  
  92.   
  93.         }catch(InterruptedException e) {  
  94.   
  95.             e.printStackTrace();  
  96.   
  97.         }  
  98.   
  99.     }  
  100.   
  101. }  



 

[java] view plain copy
 
  1. package mq.p2p;  
  2.   
  3.    
  4.   
  5. import java.util.Date;  
  6.   
  7. import javax.jms.Connection;  
  8.   
  9. import javax.jms.ConnectionFactory;  
  10.   
  11. import javax.jms.Destination;  
  12.   
  13. import javax.jms.JMSException;  
  14.   
  15. import javax.jms.MapMessage;  
  16.   
  17. import javax.jms.Message;  
  18.   
  19. import javax.jms.MessageConsumer;  
  20.   
  21. import javax.jms.MessageListener;  
  22.   
  23. import javax.jms.Session;  
  24.   
  25. import org.apache.activemq.ActiveMQConnection;  
  26.   
  27. import org.apache.activemq.ActiveMQConnectionFactory;  
  28.   
  29.    
  30.   
  31. /* 
  32.  
  33.  *第二个消费者 
  34.  
  35.  */  
  36.   
  37. public class Customer2 {  
  38.   
  39.     publicstatic void main(String[] args) {  
  40.   
  41.         Stringuser = ActiveMQConnection.DEFAULT_USER;  
  42.   
  43.         Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  44.   
  45.         Stringurl = ActiveMQConnection.DEFAULT_BROKER_URL;  
  46.   
  47.         Stringsubject = "test.queue";  
  48.   
  49.         ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( user, password, url);  
  50.   
  51.         Connectionconnection;  
  52.   
  53.         try{  
  54.   
  55.             connection= connectionFactory.createConnection();  
  56.   
  57.             connection.start();  
  58.   
  59.             finalSession session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
  60.   
  61.             Destinationdestination = session.createQueue(subject);  
  62.   
  63.             MessageConsumermessage = session.createConsumer(destination);  
  64.   
  65.             message.setMessageListener(newMessageListener() {  
  66.   
  67.                 publicvoid onMessage(Message msg) {  
  68.   
  69.                     MapMessagemessage = (MapMessage) msg;  
  70.   
  71.                     try{  
  72.   
  73.                         System.out.println("--收到消息2:"new Date(message.getLong("count")));  
  74.   
  75.                         session.commit();  
  76.   
  77.                     }catch (JMSException e) {  
  78.   
  79.                         e.printStackTrace();  
  80.   
  81.                     }  
  82.   
  83.                 }  
  84.   
  85.             });  
  86.   
  87.             Thread.sleep(30000);  
  88.   
  89.             session.close();  
  90.   
  91.             Thread.sleep(30000);  
  92.   
  93.             connection.close();  
  94.   
  95.             Thread.sleep(30000);  
  96.   
  97.         }catch (JMSException e) {  
  98.   
  99.             e.printStackTrace();  
  100.   
  101.         }catch (InterruptedException e) {  
  102.   
  103.             e.printStackTrace();  
  104.   
  105.         }  
  106.   
  107.     }  
  108.   
  109. }  

 

 

学习体会

1.      运行结果,一个消息只能由一个消费者消费,不能同时被多个消费者获取

2.      测试表明,消息消费者消费消息时机是队列没有被消息提供者锁住,也就是说只有消息提供者执行了session.close()后消费者才会执行onMessage()方法

3.      启动顺序没有要求,可以先启动消费者,再启动提供者,也可以先启动提供者,然后再启动消费者

Topic(发布/订阅)模式

创建生产者

[java] view plain copy
 
  1. package mq.topic;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Connection;  
  6.   
  7. import javax.jms.Destination;  
  8.   
  9. import javax.jms.MapMessage;  
  10.   
  11. import javax.jms.MessageProducer;  
  12.   
  13. import javax.jms.Session;  
  14.   
  15. import org.apache.activemq.ActiveMQConnection;  
  16.   
  17. import org.apache.activemq.ActiveMQConnectionFactory;  
  18.   
  19. public classPublisher {  
  20.   
  21.     public static void main(String[] arg){  
  22.   
  23.         Stringuser = ActiveMQConnection.DEFAULT_USER;  
  24.   
  25.         Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  26.   
  27.         Stringurl = "tcp://localhost:61616";  
  28.   
  29.         Stringsubject = "mq.topic";  
  30.   
  31.         ActiveMQConnectionFactory amcf = newActiveMQConnectionFactory(user, password, url);  
  32.   
  33.         try {  
  34.   
  35.             Connectionconn = amcf.createConnection();  
  36.   
  37.             conn.start();  
  38.   
  39.             Sessionsession = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
  40.   
  41.             Destinationd = session.createTopic(subject);  
  42.   
  43.             MessageProducerproducer = session.createProducer(d);  
  44.   
  45.             for (int i = 0; i <= 20; i++){  
  46.   
  47.                 MapMessagemessage = session.createMapMessage();  
  48.   
  49.                 Datedate = newDate();  
  50.   
  51.                 message.setLong("count",date.getTime());  
  52.   
  53.                 Thread.sleep(1000);  
  54.   
  55.                 producer.send(message);  
  56.   
  57.                 System.out.println("--发送消息:" + date);  
  58.   
  59.             }  
  60.   
  61.             session.commit();  
  62.   
  63.             session.close();  
  64.   
  65.             conn.close();  
  66.   
  67.         }catch(Exception e) {  
  68.   
  69.             e.printStackTrace();  
  70.   
  71.         }  
  72.   
  73.     }  
  74.   
  75. }  



 

创建消费者

[java] view plain copy
 
  1. package mq.topic;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Connection;  
  6.   
  7. import javax.jms.JMSException;  
  8.   
  9. import javax.jms.MapMessage;  
  10.   
  11. import javax.jms.Message;  
  12.   
  13. import javax.jms.MessageConsumer;  
  14.   
  15. import javax.jms.MessageListener;  
  16.   
  17. import javax.jms.Session;  
  18.   
  19. import javax.jms.Topic;  
  20.   
  21. import org.apache.activemq.ActiveMQConnection;  
  22.   
  23. import org.apache.activemq.ActiveMQConnectionFactory;  
  24.   
  25. public class SubscriberFirst {  
  26.   
  27.     public static void main(String[] args) {  
  28.   
  29.         Stringuser = ActiveMQConnection.DEFAULT_USER;  
  30.   
  31.         Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  32.   
  33.         Stringurl = "tcp://localhost:61616";  
  34.   
  35.         Stringsubject = "mq.topic";  
  36.   
  37.         ActiveMQConnectionFactoryfactory = newActiveMQConnectionFactory(user, password, url);  
  38.   
  39.         Connectionconnection;  
  40.   
  41.         try {  
  42.   
  43.             connection= factory.createConnection();  
  44.   
  45.             connection.start();  
  46.   
  47.             final Session session =connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  48.   
  49.             Topictopic = session.createTopic(subject);  
  50.   
  51.             MessageConsumerconsumer = session.createConsumer(topic);  
  52.   
  53.             consumer.setMessageListener(new MessageListener() {  
  54.   
  55.                 publicvoid onMessage(Message msg){  
  56.   
  57.                     MapMessagemessage = (MapMessage) msg;  
  58.   
  59.                     try {  
  60.   
  61.                         System.out.println("--订阅者一收到消息:" +new Date(message.getLong("count")));  
  62.   
  63.                         session.commit();  
  64.   
  65.                     }catch(JMSException e) {  
  66.   
  67.                         e.printStackTrace();  
  68.   
  69.                     }  
  70.   
  71.                 }  
  72.   
  73.             });  
  74.   
  75.         }catch(JMSException e) {  
  76.   
  77.             e.printStackTrace();  
  78.   
  79.         }  
  80.   
  81.     }  
  82.   
  83. }  

 

[java] view plain copy
 
  1. package mq.topic;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Connection;  
  6.   
  7. import javax.jms.JMSException;  
  8.   
  9. import javax.jms.MapMessage;  
  10.   
  11. import javax.jms.Message;  
  12.   
  13. import javax.jms.MessageConsumer;  
  14.   
  15. import javax.jms.MessageListener;  
  16.   
  17. import javax.jms.Session;  
  18.   
  19. import javax.jms.Topic;  
  20.   
  21. importorg.apache.activemq.ActiveMQConnection;  
  22.   
  23. importorg.apache.activemq.ActiveMQConnectionFactory;  
  24.   
  25. public class SubscriberSecond {  
  26.   
  27.    
  28.   
  29.          publicstatic void main(String[] args) {  
  30.   
  31.                    Stringuser = ActiveMQConnection.DEFAULT_USER;  
  32.   
  33.                    Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  34.   
  35.                    Stringurl = "tcp://localhost:61616";  
  36.   
  37.                    Stringsubject = "mq.topic";  
  38.   
  39.                    ActiveMQConnectionFactoryfactory = new ActiveMQConnectionFactory(user, password, url);  
  40.   
  41.                    Connectionconnection;  
  42.   
  43.                    try{  
  44.   
  45.                             connection= factory.createConnection();  
  46.   
  47.                             connection.start();  
  48.   
  49.                             finalSession session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
  50.   
  51.                             Topictopic = session.createTopic(subject);  
  52.   
  53.                             MessageConsumerconsumer = session.createConsumer(topic);  
  54.   
  55.                             consumer.setMessageListener(newMessageListener() {  
  56.   
  57.                                      publicvoid onMessage(Message msg) {  
  58.   
  59.                                                MapMessagemessage = (MapMessage) msg;  
  60.   
  61.                                                try{  
  62.   
  63.                                                         System.out.println("--订阅者二收到消息:"new Date(message.getLong("count")));  
  64.   
  65.                                                         session.commit();  
  66.   
  67.                                                }catch (JMSException e) {  
  68.   
  69.                                                         e.printStackTrace();  
  70.   
  71.                                                }  
  72.   
  73.                                      }  
  74.   
  75.                             });  
  76.   
  77.                    }catch (JMSException e) {  
  78.   
  79.                             e.printStackTrace();  
  80.   
  81.                    }  
  82.   
  83.          }  
  84.   
  85. }  

 

学习体会

1.生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

2.订阅者一和订阅者二都能收到一样的消息,也就是说在发布订阅模式下,一份消息可以被多个消费者消费

消息相关概念

消息类型

JMS 消息由以下三部分组成:

 消息头。每个消息头字段都有相应的getter 和setter 方法。

 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。

 消息体。消息主题内容,JMS定义消息包括TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage类型

消息确认

JMS 消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通

常包含三个阶段:客户接收消息、客户处理消息和消息被确认。

在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会

话中,消息何时被确认取决于创建会话时的应答模式(acknowledgementmode)。

该参数有以下三个可选值:

Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,

或者从MessageListener.onMessage 方法成功返回的时候,会话自动确认

客户收到的消息。

 Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge 方法确认消

息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被

消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消

费者消费了10 个消息,然后确认第5 个消息,那么所有10 个消息都被确

认。

 Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝的确认消息的提交。如

果JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的

消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置为

true。

消息持久化

AMQ Message Store

AMQ Message Store 是ActiveMQ5.0缺省的持久化存储。Message commands 被

保存到transactional journal(由rolling data logs 组成)。Messages 被保

存到data logs 中,同时被reference store 进行索引以提高存取速度。Date logs

由一些单独的data log 文件组成,缺省的文件大小是32M,如果某个消息的大

小超过了data log 文件的大小,那么可以修改配置以增加data log 文件的大小。

Generated by FoxitPDF Creator © Foxit Software

http://www.foxitsoftware.comFor evaluation only.

如果某个data log 文件中所有的消息都被成功消费了,那么这个data log 文件

将会被标记,以便在下一轮的清理中被删除或者归档。以下是其配置的一个例子:

Xml 代码

[html] view plain copy
 
  1. <brokerbrokerNamebrokerbrokerName="broker" persistent="true"useShutdownHook="  
  2.   
  3. false">  
  4.   
  5. <persistenceAdapter>  
  6.   
  7.  <amqPersistenceAdapterdirectoryamqPersistenceAdapterdirectory="${activemq.base}/data" m  
  8.   
  9. axFileLength="32mb"/>  
  10.   
  11. </persistenceAdapter>  
  12.   
  13. </broker>  



Kaha Persistence

Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息

使用模式进行了优化。在Kaha 中,数据被追加到data logs 中。当不再需要log

文件中的数据的时候,log 文件会被丢弃。以下是其配置的一个例子:

Xml 代码

[html] view plain copy
 
  1. <brokerbrokerNamebrokerbrokerName="broker" persistent="true"useShutdownHook="  
  2.   
  3. false">  
  4.   
  5.  <persistenceAdapter>  
  6.   
  7. <kahaPersistenceAdapterdirectorykahaPersistenceAdapterdirectory="activemq-data" maxDa  
  8.   
  9. taFileLength="33554432"/>  
  10.   
  11.  </persistenceAdapter>  
  12.   
  13.  </broker>  



JDBC Persistence

 

目前支持的数据库有Apache Derby,Axion, DB2, HSQL, Informix, MaxDB,

MySQLOracle,Postgresql, SQLServer, Sybase。

如果你使用的数据库不被支持,那么可以调整StatementProvider来保证使

用正确的SQL 方言(flavour of SQL)。通常绝大多数数据库支持以下adaptor:

org.activemq.store.jdbc.adapter.BlobJDBCAdapter

org.activemq.store.jdbc.adapter.BytesJDBCAdapter

org.activemq.store.jdbc.adapter.DefaultJDBCAdapter

 org.activemq.store.jdbc.adapter.ImageJDBCAdapter

Generated by FoxitPDF Creator © Foxit Software

http://www.foxitsoftware.comFor evaluation only.

也可以在配置文件中直接指定JDBC adaptor,例如:

Xml 代码

 

[html] view plain copy
 
  1. <jdbcPersistenceAdapter adapterClass="org.apache.activemq.store.jdbc.adapter.ImageBasedJDBCAdaptor"/>  



 

以下是其配置的一个例子:

Xml 代码

[html] view plain copy
 
  1.  <persistence>  
  2.   
  3.  <jdbcPersistence dataSourceRef="mysql-ds"/>  
  4.   
  5.  </persistence>  
  6.   
  7.  <bean id="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">  
  8.   
  9.  <property name="driverClassName"value="com.mysql.jdbc.Driver"/>  
  10.   
  11.  <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
  12.   
  13. <propertynamepropertyname="username" value="activemq"/>  
  14.   
  15.  <property name="password"value="activemq"/>  
  16.   
  17.  <propertynamepropertyname="poolPreparedStatements" value="true"/>  
  18.   
  19. </bean>  



需要注意的是,如果使用mysql,那么需要设置relaxAutoCommit 标志为true。

ActiveMQ集群

Broker集群

一个常见的场景是有多个 JMS broker ,有一个客户连接到其中一个 broker 。如果这个 broker 失效,那么客户会自动重新连接到其它的 broker 。在 ActiveMQ 中使用 failover:// 协议来实现这个功能。如果某个网络上有多个 brokers 而且客户使用静态发现(使用 Static Transport  Failover Transport )或动态发现(使用 Discovery Transport ),那么客户可以容易地在某个 broker 失效的情况下切换到其它的 brokers 。当有多个broker 时,如果某个 broker 上没有 consumers ,那么这个 broker 上的消息可能会因得不到处理而积压起来。目前的解决方案是使用 Network of brokers ,以便在 broker 之间存储转发消息。

 

Static-broker1的配置

[html] view plain copy
 
  1. <broker   xmlns="http://activemq.apache.org/schema/core"brokerName="static-broker1"dataDirectory="${activemq.base}/data">   
  2.   
  3.       <networkConnectors>  
  4.   
  5.                     <networkConnectorurinetworkConnectoruri="static:(tcp://localhost:7777)"  
  6.   
  7.                           name="bridge"  
  8.   
  9.                           dynamicOnly="false"  
  10.   
  11.                           conduitSubscriptions="true"  
  12.   
  13.                            decreaseNetworkConsumerPriority="false">  
  14.   
  15.                     </networkConnector>  
  16.   
  17.        </networkConnectors>  
  18.   
  19.       <persistenceAdapter>  
  20.   
  21.           <kahaDBdirectorykahaDBdirectory="${activemq.base}/data/static-broker1/kahadb" />  
  22.   
  23.       </persistenceAdapter>  
  24.   
  25.        <transportConnectors>  
  26.   
  27.           <transportConnector name="openwire" uri="tcp://0.0.0.0:6666"/>  
  28.   
  29.       </transportConnectors>         
  30.   
  31.   </broker>  



Static-broker2的配置

 

[html] view plain copy
 
  1. <broker  xmlns="http://activemq.apache.org/schema/core"  brokerName="static-broker2"dataDirectory="${activemq.base}/data">  
  2.   
  3.          <networkConnectors>  
  4.   
  5.                    <networkConnector uri="static:(tcp://localhost:6666)"  
  6.   
  7.                           name="bridge"  
  8.   
  9.                           dynamicOnly="false"  
  10.   
  11.                           conduitSubscriptions="true"  
  12.   
  13.                           decreaseNetworkConsumerPriority="false">  
  14.   
  15.                    </networkConnector>  
  16.   
  17.       </networkConnectors>  
  18.   
  19.        <persistenceAdapter>  
  20.   
  21.            <kahaDBdirectorykahaDBdirectory="${activemq.base}/data/static-broker2/kahadb" />  
  22.   
  23.        </persistenceAdapter>        
  24.   
  25.        <transportConnectors>  
  26.   
  27.            <transportConnectornametransportConnectorname="openwire" uri="tcp://0.0.0.0:7777"/>  
  28.   
  29.        </transportConnectors>        
  30.   
  31. lt;/broker>  



 

现在分别启动static-broker1与static-broker2两个消息中间实例

然后在在应用层编写一个消息提供者和一个消息消费者,提供者对static-broker1里的队列test.queue写5笔消息,然后消费者从static-broker2里面的队列test.queue读数据,测试证明是可以读到那五笔记录的

具体见sample-mq例子工程代码包mq.bloker.cluster下面的代码

 

学习体会

1.  networkConnectors网络链接并不是把static-broker1里面的队列消息复制给static-broker2,而是static-broker2上的消费者访问test.queue上的消息时,当static-broker2上面没有这个队列时才会去static-broker1里面去找test.queue上面的消息,

2. static-broker1队列里面的消息一旦被static-broker2上面消费者消费后,static-broker1上面的消费者就没有了,说明一个队列在集群里面消息只存在一份,没有任何副本消息

3. 当static-broker1与static-broker2上面都有队列test.queue,而且他们各自都有五条消息,在这时如果static-broker2有一个消费者去消费,它是先消费static-broker2上面五条,然后再消费static-broker1上面的五条

主从集群

Share Nothing Master/Slave

  Master 收到消息后保证先转发给slave后再发给客户端,slave有自己独立存储, 在slave配置文件中需指定Master URI, 这样当Master宕掉后,客户端会Failover到Slave.

配置说明

Masterbloker无需其他配置,Slavebloker配置如下:

[html] view plain copy
 
  1. <services>   
  2.   
  3.   <masterConnector remoteURI="tcp://localhost:62001" userName="user"  
  4.   
  5. password="password"/>  
  6.   
  7. </services>  



其中如果broker无需认证的话,用户名、密码就可以不配置了

缺陷总结

1.Master bloker只能配置一个slaver bloker

2.如果主MQ发生故障,要手动停止所有服务,手动停止slave,复制slave数据文件到master,然后手动启动master

 

注:该特性在5.8版本废弃掉了,masterConnector属性已经无效

Attribute 'masterConnectorURI' is not allowed to appear in element'broker'.

     

Share Database Master/Slave

        和 Share File System Master/Slave 类似,只不过用数据库锁代替了共享文件系统锁。

 

1)文件配置

Activemq-jdbc-master.xml:

[html] view plain copy
 
  1. <brokerxmlnsbrokerxmlns="http://activemq.apache.org/schema/core"brokerName="static-broker1">  
  2.   
  3.          <persistenceAdapter>  
  4.   
  5.        <jdbcPersistenceAdapter dataDirectory="activemq-data"dataSource="#mysql-ds"/>  
  6.   
  7.    </persistenceAdapter>  
  8.   
  9.         <transportConnectors>  
  10.   
  11.            <transportConnector name="openwire"uri="tcp://0.0.0.0:6666"/>  
  12.   
  13.        </transportConnectors>         
  14.   
  15.    </broker>  
  16.   
  17.            <bean id="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">  
  18.   
  19.    <property name="driverClassName"value="com.mysql.jdbc.Driver"/>  
  20.   
  21.    <property name="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
  22.   
  23.      <property name="username" value="root"/>  
  24.   
  25.    <property name="password" value="1229"/>  
  26.   
  27.    <property name="poolPreparedStatements"value="true"/>  
  28.   
  29.  </bean>  

 

Activemq-jdbc-slave.xml:

 

[html] view plain copy
 
  1. <broker  xmlns="http://activemq.apache.org/schema/core"brokerName="static-broker2"  >  
  2.   
  3.                   <persistenceAdapter>  
  4.   
  5.        <jdbcPersistenceAdapterdataDirectoryjdbcPersistenceAdapterdataDirectory="activemq-data" dataSource="#mysql-ds"/>  
  6.   
  7.    </persistenceAdapter>  
  8.   
  9.        <transportConnectors>  
  10.   
  11.            <transportConnectornametransportConnectorname="openwire" uri="tcp://0.0.0.0:7777"/>  
  12.   
  13.        </transportConnectors>        
  14.   
  15.    </broker>  
  16.   
  17.  <bean  id="mysql-ds"  class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">  
  18.   
  19.    <propertynamepropertyname="driverClassName" value="com.mysql.jdbc.Driver"/>  
  20.   
  21.    <property name="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
  22.   
  23.    <property name="username"value="root"/>  
  24.   
  25.    <property name="password"value="1229"/>  
  26.   
  27.    <propertynamepropertyname="poolPreparedStatements" value="true"/>  
  28.   
  29.  </bean>  

 

2)加载数据库驱动

把mssql-jtds-1.2.jar与mysql-connector-Java-5.0.7-bin.jar数据库mysql的驱动包放入F:\apache-activemq-5.8.0\lib文件夹里

3)启动主从两实例MQ

注意:如果是第一次启动时,会在数据库里生成存储消息所需要的三张表

4)编写客户端

编写消息提供者-master节点写消息

[java] view plain copy
 
  1. package mq.broker.master.slave;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import javax.jms.Connection;  
  6.   
  7. import javax.jms.ConnectionFactory;  
  8.   
  9. import javax.jms.Destination;  
  10.   
  11. import javax.jms.JMSException;  
  12.   
  13. import javax.jms.MapMessage;  
  14.   
  15. import javax.jms.MessageProducer;  
  16.   
  17. import javax.jms.Session;  
  18.   
  19. import org.apache.activemq.ActiveMQConnection;  
  20.   
  21. importorg.apache.activemq.ActiveMQConnectionFactory;  
  22.   
  23. public class Producer {  
  24.   
  25.          publicstatic void main(String[] args) {  
  26.   
  27.                    Stringuser = ActiveMQConnection.DEFAULT_USER;  
  28.   
  29.                    Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;  
  30.   
  31.                    String url = "tcp://localhost:6666";  
  32.   
  33.                    Stringsubject = "test.queue";  
  34.   
  35.                    ConnectionFactorycontectionFactory = new ActiveMQConnectionFactory( user, password, url);  
  36.   
  37.                    try{  
  38.   
  39.                             Connectionconnection = contectionFactory.createConnection();  
  40.   
  41.                             connection.start();  
  42.   
  43.                             Sessionsession = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  44.   
  45.                             Destinationdestination = session.createQueue(subject);  
  46.   
  47.                             MessageProducerproducer = session.createProducer(destination);  
  48.   
  49.                             for(int i = 0; i <= 5; i++) {  
  50.   
  51.                                      MapMessagemessage = session.createMapMessage();  
  52.   
  53.                                      Datedate = new Date();  
  54.   
  55.                                      message.setLong("count",date.getTime());  
  56.   
  57.                                      Thread.sleep(1000);  
  58.   
  59.                                      producer.send(message);  
  60.   
  61.                                      System.out.println("--发送消息:" +date);  
  62.   
  63.                             }  
  64.   
  65.                             session.commit();  
  66.   
  67.                             session.close();  
  68.   
  69.                             connection.close();  
  70.   
  71.                    }catch (JMSException e) {  
  72.   
  73.                             e.printStackTrace();  
  74.   
  75.                    }catch (InterruptedException e) {  
  76.   
  77.                             e.printStackTrace();  
  78.   
  79.                    }  
  80.   
  81.          }  
  82.   
  83. }  

 

编写消息消费者-slave节点拿消息进行消费

[java] view plain copy
 
  1. packagemq.broker.master.slave;  
  2.   
  3. importjava.util.Date;  
  4.   
  5. importjavax.jms.Connection;  
  6.   
  7. importjavax.jms.ConnectionFactory;  
  8.   
  9. importjavax.jms.Destination;  
  10.   
  11. importjavax.jms.JMSException;  
  12.   
  13. importjavax.jms.MapMessage;  
  14.   
  15. importjavax.jms.Message;  
  16.   
  17. importjavax.jms.MessageConsumer;  
  18.   
  19. importjavax.jms.MessageListener;  
  20.   
  21. importjavax.jms.Session;  
  22.   
  23. importorg.apache.activemq.ActiveMQConnection;  
  24.   
  25. importorg.apache.activemq.ActiveMQConnectionFactory;  
  26.   
  27. publicclass Customer {  
  28.   
  29.          public static void main(String[] args){  
  30.   
  31.                    String user =ActiveMQConnection.DEFAULT_USER;  
  32.   
  33.                    String password =ActiveMQConnection.DEFAULT_PASSWORD;  
  34.   
  35.                   String url= "failover:(tcp://localhost:7777)";  
  36.   
  37.                    String subject ="test.queue";  
  38.   
  39.                    ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( user, password, url);  
  40.   
  41.                    Connection connection;  
  42.   
  43.                    try {  
  44.   
  45.                             connection =connectionFactory.createConnection();  
  46.   
  47.                             connection.start();  
  48.   
  49.                             final Sessionsession = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  50.   
  51.                             Destinationdestination = session.createQueue(subject);  
  52.   
  53.                             MessageConsumermessage = session.createConsumer(destination);  
  54.   
  55.                             message.setMessageListener(newMessageListener() {  
  56.   
  57.                                      public voidonMessage(Message msg) {  
  58.   
  59.                                                MapMessagemessage = (MapMessage) msg;  
  60.   
  61.                                                try{  
  62.   
  63.                                                         System.out.println("--收到消息:" + newDate(message.getLong("count")));  
  64.   
  65.                                                         session.commit();  
  66.   
  67.                                                }catch (JMSException e) {  
  68.   
  69.                                                         e.printStackTrace();  
  70.   
  71.                                                }  
  72.   
  73.                                      }  
  74.   
  75.                             });  
  76.   
  77.                             Thread.sleep(30000);  
  78.   
  79.                             session.close();  
  80.   
  81.                             Thread.sleep(30000);  
  82.   
  83.                             connection.close();  
  84.   
  85.                             Thread.sleep(30000);  
  86.   
  87.                    } catch (JMSException e) {  
  88.   
  89.                             e.printStackTrace();  
  90.   
  91.                    } catch (InterruptedExceptione) {  
  92.   
  93.                             e.printStackTrace();  
  94.   
  95.                    }  
  96.   
  97.          }  
  98.   
  99. }  



5)学习体会

 1.存储消息的三张数据库表是在MQ实例第一次启动时生成,而非是写消息进去时生成,从这意义上讲,算是个“饿模式”,呵呵

 2.从master节点写消息,然后从slave读消息,是读不到消息的,只有当把master节点给关闭后,slave里面才能读到消息

 Share File System Master/Slave

       没有物理上的Master, Slave之分, 只有逻辑上的Master 和Slave, Master与Slave共享一个存储文件, 谁最先启动就会持有共享文件锁,成为Master, 后启动的由于获取文件锁时会被阻塞,成为Slave,当Master宕掉时,共享文件锁被释放掉,Slave获取的文件锁后就成为Master, 客户端使用failover 协议后会failover到可用的broker上。

      

  后续完成,呵呵

具体见sample-mq例子工程

activeMQ性能优化、大数据量、安全控制后续补充了,现在没有时间,呵呵!

分享到:
评论

相关推荐

    activemq的几种基本通信方式总结

    总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式...

    activeMQ一个demo

    3. Topic和Queue:在ActiveMQ中,有两种主要的消息传输模式——主题(Topic)和队列(Queue)。主题适用于发布/订阅模式,多个消费者可以订阅同一个主题并接收消息;队列则遵循先进先出(FIFO)原则,一条消息只能被...

    ActiveMQ 消息队列

    JMS定义了几种常用的消息格式: - **StreamMessage**:用于发送原始数据流,如整型、浮点型等基本数据类型。 - **MapMessage**:用于发送键值对数据,类似于Map集合。 - **TextMessage**:用于发送文本字符串消息。...

    基于activemq实现Android推送(服务端+客户端)

    我们将重点关注MQTT(Message Queuing Telemetry Transport)协议,这是一种轻量级的发布/订阅模式的消息传输协议,特别适合于移动设备和低带宽、高延迟的网络环境。 首先,我们需要了解ActiveMQ的基本概念。...

    activeMQ 推送之mqtt客户端

    总结来说,了解 MQTT 协议的基本原理和 ActiveMQ 的 MQTT 支持是实现 MQTT 客户端的关键。通过正确配置 ActiveMQ 代理,并利用 MQTT 客户端库,可以有效地实现在物联网设备间或应用程序间的消息通信。

    消息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ

    在SpringBoot中整合ActiveMQ,主要涉及以下几个步骤: 1. **添加依赖**:在`pom.xml`中添加ActiveMQ的Spring Boot Starter依赖。 2. **配置ActiveMQ**:在`application.properties`或`application.yml`中配置...

    JMS之Spring +activeMQ实现消息队列

    总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...

    基于ActiveMQ的消息总线逻辑与物理架构设计详解

    消息总线(Message Bus)是一种设计模式,用于模块间的通信,能够在不同组件间进行解耦合和信息传输。 ### 消息中间件的核心功能 消息中间件技术的核心功能包括异步处理和解耦。异步处理意味着发送者和接收者不...

    MQ:ActiveMQ的使用以及集群的配置等信息

    ActiveMQ的使用主要包括以下几个方面: 1. **安装与启动**:下载ActiveMQ的发行版,解压后通过命令行工具bin目录下的`start.sh`(Linux/Unix)或`start.bat`(Windows)启动服务。默认情况下,ActiveMQ会在8161端口...

    2.2 broker 启动方式1

    根据给定的信息,本文将详细解释 Apache ActiveMQ 的几种启动方式以及如何在 Java 应用程序中内嵌启动 ActiveMQ Broker。ActiveMQ 是一个开源的消息中间件,支持多种消息传递模式,广泛应用于分布式系统中。 ### 一...

    消息队列及中转软件总结汇编.docx

    本文将对几个常见的开源消息队列及中转软件进行概述,包括ZeroMQ、ActiveMQ、Redis、MongoDB和Memcached。 **ZeroMQ**,又称为ZMQ,是一款轻量级的消息队列库,它提供了一种套接字编程模型,允许应用程序通过队列...

    南航移动面试题总结

    **总结:** 设计模式是解决特定问题的最佳实践,除了上述几种外,还有许多其他模式如装饰模式、适配器模式、原型模式等,它们各自适用于不同的场景。 #### SOA(面向服务的架构) SOA是一种架构风格,旨在通过服务...

    MQTT协议通讯,支持JS、JAVA、微信小程序客户端

    此案例中提到了几种不同的客户端实现方式: 1. **JS客户端**: 使用Eclipse Paho提供的JavaScript客户端库。 - **库**: Eclipse Paho MQTT JavaScript Client Library - **官网**: ...

    企业号回调模式

    在企业级应用开发中,回调模式是一种常见的设计模式,它在异步操作或者分布式系统中发挥着关键作用。回调模式允许程序在特定事件发生时执行一段代码,而不是等待该事件完成后再进行下一步操作。这种模式在处理网络...

    西电分布式计算课程(PPT总结版)笔记

    文档重点介绍了几种通信技术: - **底层通信技术:** 包括TCP/UDP这样的点对点通信技术。 - **并发服务技术:** 如多线程和线程池等。 - **上层通信技术:** 比如基于消息中间件的通信技术。 **1.2 TCP/IP 与 ...

    开源项目研究与应用小结.pdf

    文档中提到了几种不同的技术栈,用于构建高并发的WEB应用。 ##### 1. 企业级、重型架构 - Java **技术栈**:Apache + Java/Tomcat + DB **特点**: - **优点**:成熟度高,拥有丰富的开源jar包资源。 - **缺点*...

    相关总结-消息队列_20181009.rar

    消息队列(Message Queue)是一种基于发布/订阅或生产/消费模式的组件,它作为不同系统之间通信的中介,允许服务之间通过发送和接收消息来交换数据,而不是直接调用彼此。这种模式可以有效地缓解高并发下的系统压力...

Global site tag (gtag.js) - Google Analytics