论坛首页 入门技术论坛

Spring-ActiveMQ的点对点和Topic

浏览 2296 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (1)
作者 正文
   发表时间:2013-09-29  
<pre name="code" class="pom.xml">
&lt;project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"&gt;
  &lt;modelVersion&gt;4.0.0&lt;/modelVersion&gt;

  &lt;groupId&gt;com.example.activemq&lt;/groupId&gt;
  &lt;artifactId&gt;activemq-test&lt;/artifactId&gt;
  &lt;version&gt;1.0-SNAPSHOT&lt;/version&gt;
  &lt;packaging&gt;jar&lt;/packaging&gt;

  &lt;name&gt;activemq-test&lt;/name&gt;
  &lt;url&gt;http://maven.apache.org&lt;/url&gt;

  &lt;properties&gt;
    &lt;project.build.sourceEncoding&gt;UTF-8&lt;/project.build.sourceEncoding&gt;
    &lt;org.springframework.version&gt;3.1.1.RELEASE&lt;/org.springframework.version&gt;
  &lt;/properties&gt;
 
&lt;repositories&gt;
  &lt;repository&gt;
&lt;id&gt;kxcomm-maven&lt;/id&gt;
&lt;name&gt;Maven kxcomm Repository&lt;/name&gt;
&lt;url&gt;http://122.13.0.56:8088/nexus/content/groups/public/&lt;/url&gt;
&lt;releases&gt;
&lt;enabled&gt;true&lt;/enabled&gt;
&lt;/releases&gt;
&lt;snapshots&gt;
&lt;enabled&gt;true&lt;/enabled&gt;
&lt;/snapshots&gt;
&lt;/repository&gt;  
&lt;/repositories&gt;

  &lt;dependencies&gt;
    &lt;dependency&gt;
      &lt;groupId&gt;junit&lt;/groupId&gt;
      &lt;artifactId&gt;junit&lt;/artifactId&gt;
      &lt;version&gt;3.8.1&lt;/version&gt;
      &lt;scope&gt;test&lt;/scope&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
&lt;groupId&gt;commons-collections&lt;/groupId&gt;
&lt;artifactId&gt;commons-collections&lt;/artifactId&gt;
&lt;version&gt;3.2&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;commons-configuration&lt;/groupId&gt;
&lt;artifactId&gt;commons-configuration&lt;/artifactId&gt;
&lt;version&gt;1.6&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;commons-io&lt;/groupId&gt;
&lt;artifactId&gt;commons-io&lt;/artifactId&gt;
&lt;version&gt;1.3.2&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;commons-logging&lt;/groupId&gt;
&lt;artifactId&gt;commons-logging&lt;/artifactId&gt;
&lt;version&gt;1.1.1&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;log4j&lt;/groupId&gt;
&lt;artifactId&gt;log4j&lt;/artifactId&gt;
&lt;version&gt;1.2.17&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;commons-beanutils&lt;/groupId&gt;
&lt;artifactId&gt;commons-beanutils&lt;/artifactId&gt;
&lt;version&gt;1.8.3&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework&lt;/groupId&gt;
&lt;artifactId&gt;spring-asm&lt;/artifactId&gt;
&lt;version&gt;${org.springframework.version}&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework&lt;/groupId&gt;
&lt;artifactId&gt;spring-jms&lt;/artifactId&gt;
&lt;version&gt;${org.springframework.version}&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;com.davidkarlsen.commonstransaction.spring&lt;/groupId&gt;
&lt;artifactId&gt;commons-transaction-spring&lt;/artifactId&gt;
&lt;version&gt;0.9&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.apache.httpcomponents&lt;/groupId&gt;
&lt;artifactId&gt;httpclient&lt;/artifactId&gt;
&lt;version&gt;4.2-beta1&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;fastutil&lt;/groupId&gt;
&lt;artifactId&gt;fastutil&lt;/artifactId&gt;
&lt;version&gt;5.0.9&lt;/version&gt;
&lt;/dependency&gt;

&lt;dependency&gt;
&lt;groupId&gt;ch.qos.logback&lt;/groupId&gt;
&lt;artifactId&gt;logback-classic&lt;/artifactId&gt;
&lt;version&gt;0.9.27&lt;/version&gt;
&lt;scope&gt;compile&lt;/scope&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;ch.qos.logback&lt;/groupId&gt;
&lt;artifactId&gt;logback-core&lt;/artifactId&gt;
&lt;version&gt;0.9.27&lt;/version&gt;
&lt;scope&gt;compile&lt;/scope&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.slf4j&lt;/groupId&gt;
&lt;artifactId&gt;slf4j-api&lt;/artifactId&gt;
&lt;version&gt;1.6.1&lt;/version&gt;
&lt;scope&gt;compile&lt;/scope&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.slf4j&lt;/groupId&gt;
&lt;artifactId&gt;jcl-over-slf4j&lt;/artifactId&gt;
&lt;version&gt;1.6.1&lt;/version&gt;
&lt;scope&gt;runtime&lt;/scope&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.slf4j&lt;/groupId&gt;
&lt;artifactId&gt;slf4j-log4j12&lt;/artifactId&gt;
&lt;version&gt;1.7.5&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.apache.activemq&lt;/groupId&gt;
&lt;artifactId&gt;activemq-all&lt;/artifactId&gt;
&lt;version&gt;5.8.0&lt;/version&gt;
&lt;/dependency&gt;
  &lt;/dependencies&gt;
&lt;/project&gt;
</pre>

<pre name="code" class="applicationContext.xml">
&lt;?xml version="1.0" encoding="UTF-8"?&gt;
&lt;beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/aop
                http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
                http://www.springframework.org/schema/tx
                http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
                http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context-2.5.xsd"
                 default-autowire="byName" default-lazy-init="true"&gt;

&lt;import resource="activemq-test.xml"/&gt;

&lt;/beans&gt;
</pre>

<pre name="code" class="activemq-test.xml">
&lt;?xml version="1.0" encoding="UTF-8"?&gt;

&lt;beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"&gt;

&lt;!-- 创建工厂连接 --&gt;
&lt;bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&gt;
&lt;property name="brokerURL" value="tcp://localhost:61616" /&gt;
&lt;/bean&gt;

&lt;bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"&gt;
&lt;property name="connectionFactory" ref="connectionFactory" /&gt;
&lt;property name="defaultDestination" ref="rantzDestination" /&gt; 
&lt;/bean&gt;

&lt;!-- Point-to-Point --&gt;
&lt;!-- activeMQ消息目标 队列 --&gt;
&lt;bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQQueue"&gt;
&lt;constructor-arg index="0" value="rantz.marketing.queue"&gt;&lt;/constructor-arg&gt;
&lt;/bean&gt;

&lt;!-- activeMQ消息目标 主题--&gt;
        &lt;!-- &lt;bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQTopic"&gt;--&gt;
        &lt;!-- &lt;constructor-arg index="0" value="rantz.marketing.queue"&gt;&lt;/constructor-arg&gt;--&gt;
        &lt;!-- &lt;/bean&gt;--&gt;
       
&lt;bean id="producer" class="activemq.test.p2p.producer.RantzMarketingGatewayImpl"&gt;
&lt;property name="jmsTemplate" ref="jmsTemplate" /&gt;
&lt;property name="destination" ref="rantzDestination" /&gt;
&lt;/bean&gt;
&lt;bean id="consumer" class="activemq.test.p2p.consumer.MarketingReceiverGatewayImpl"&gt; 
    &lt;property name="jmsTemplate" ref="jmsTemplate" /&gt; 
&lt;/bean&gt; 
&lt;!-- Point-to-Point End--&gt;


&lt;!-- Topic --&gt;
&lt;bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"&gt;
        &lt;constructor-arg index="0" value="kxcomm.mms.topic" /&gt;
    &lt;/bean&gt;
    &lt;bean id="control" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"&gt;
        &lt;constructor-arg index="0" value="kxcomm.mms.control" /&gt;
    &lt;/bean&gt;

&lt;bean id="myListener" class="activemq.test.topic.MyListener"&gt;
    &lt;property name="connectionFactory" ref="connectionFactory" /&gt;
    &lt;property name="topic" ref="topic" /&gt;
    &lt;property name="control" ref="control" /&gt;
    &lt;/bean&gt;
  
   &lt;bean id="myPublisher" class="activemq.test.topic.MyPublisher"&gt;
    &lt;property name="connectionFactory" ref="connectionFactory" /&gt;
    &lt;property name="topic" ref="topic" /&gt;
    &lt;property name="control" ref="control" /&gt;
    &lt;/bean&gt;
&lt;!-- Topic End--&gt;

&lt;/beans&gt;
</pre>

<pre name="code" class="java">
package activemq.test.model;

import java.io.Serializable;

/**
 * Serializable bean holding a user's name, sex and age.
 */
public class User implements Serializable {

    private static final long serialVersionUID = -3098636047897519268L;

    /** Name of the user. */
    private String name;

    /** Sex of the user, kept as free text. */
    private String sex;

    /** Age of the user. */
    private int age;

    /** Returns the name. */
    public String getName() {
        return this.name;
    }

    /** Returns the sex. */
    public String getSex() {
        return this.sex;
    }

    /** Returns the age. */
    public int getAge() {
        return this.age;
    }

    /** Replaces the name. */
    public void setName(String name) {
        this.name = name;
    }

    /** Replaces the sex. */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** Replaces the age. */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders all three fields in the form {@code User [name=..., sex=..., age=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [name=");
        text.append(this.name);
        text.append(", sex=").append(this.sex);
        text.append(", age=").append(this.age);
        return text.append("]").toString();
    }
}
</pre>

PTP模型
PTP(Point-to-Point)模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具管理队列的创建、删除。JMS PTP 模型定义了客户端如何向队列发送消息,从队列接收消息,浏览队列中的消息。
<pre name="code" class="java">
package activemq.test.p2p.consumer;

import org.springframework.jms.core.JmsTemplate;

import activemq.test.model.User;
public class MarketingReceiverGatewayImpl {

private JmsTemplate jmsTemplate;

public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

public MarketingReceiverGatewayImpl() {
}

public void receiveMotorist() throws Exception{
User message  = (User)jmsTemplate.receiveAndConvert();
System.out.println("reviced msg is:" + message.toString());
}

}
</pre>
<pre name="code" class="java">
package activemq.test.p2p.consumer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartConsumer {
public static void main(String[] args) {
/*开始加载spring配置文件*/
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
MarketingReceiverGatewayImpl rantzMarketingGateway= (MarketingReceiverGatewayImpl) context.getBean("consumer");
System.out.println("Receive Start ...");
try {
while(true){
rantzMarketingGateway.receiveMotorist();
}

} catch (Exception e) {
e.printStackTrace();
}
    }
}
</pre>
<pre name="code" class="java">
package activemq.test.p2p.producer;

/**
 * Contract of the point-to-point message producer.
 */
public interface IRantzMarketingGateway {

    /**
     * Sends a text message.
     *
     * @author zhangjh (added 2013-9-20)
     * @since smsc-gateway
     */
    void sendMotoristInfo();

    /**
     * Sends an object message.
     *
     * @author zhangjh (added 2013-9-20)
     * @since smsc-gateway
     */
    void sendObjectInfo();
}
</pre>
<pre name="code" class="java">
package activemq.test.p2p.producer;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import activemq.test.model.User;

public class RantzMarketingGatewayImpl implements IRantzMarketingGateway {

private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
return destination;
}

public void setDestination(Destination destination) {
this.destination = destination;
}

public void sendMotoristInfo() {
MessageCreator msg = new MessageCreator(){
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("这是一个测试,"+System.currentTimeMillis());
}
};
jmsTemplate.send(destination, msg);
}


public void sendObjectInfo() {
User u = new User();
u.setAge(17);
u.setName("yuky"+System.currentTimeMillis());
u.setSex("女");
jmsTemplate.convertAndSend(u);

}
}
</pre>
<pre name="code" class="java">
package activemq.test.p2p.producer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartProducer {
public static void main(String[] args) {
/*开始加载spring配置文件*/
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
IRantzMarketingGateway rantzMarketingGateway= (RantzMarketingGatewayImpl) context.getBean("producer");
for(int i=0;i&lt;10;i++){
rantzMarketingGateway.sendObjectInfo();
System.out.println("Start ...");
}

}
}
</pre>

PUB/SUB模型
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscription),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS Provider 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。

<pre name="code" class="java">
package activemq.test.topic;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import activemq.test.model.User;

public class MyListener implements MessageListener {
private ActiveMQConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private MessageProducer producer;
private Topic topic;
    private Topic control;

public Topic getTopic() {
return topic;
}

public void setTopic(Topic topic) {
this.topic = topic;
}

public Topic getControl() {
return control;
}

public void setControl(Topic control) {
this.control = control;
}

public ActiveMQConnectionFactory getConnectionFactory() {
return connectionFactory;
}

public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public void onMessage(Message message) {
try{
if (checkText(message, "SHUTDOWN")) {
            try {
                connection.close();
                System.out.println("退出监听消息");
            } catch (Exception e) {
                e.printStackTrace(System.out);
            }

        } else if (checkText(message, "REPORT")) {
            // send a report:
            try {
            System.out.println("MyListener-&gt;收到 a report");
                long time = System.currentTimeMillis();
                String msg = "MyListener-&gt;返回 a report :" + time + "ms";
                System.out.println(msg);
                producer.send(session.createTextMessage(msg));
            } catch (Exception e) {
                e.printStackTrace(System.out);
            }
        } else {
        ObjectMessage obj = (ObjectMessage)message;
        User u = (User) obj.getObject();
        System.out.println("Received  messages."+ u.toString());
        }
}catch(Exception e){

}
}

public void run() throws JMSException {
if(connectionFactory!=null){
System.out.println("connectionFactory is ok");
connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);
        connection.start();
       
        producer = session.createProducer(control);
        System.out.println("Waiting for messages...");
}
}

private static boolean checkText(Message m, String s) {
        try {
            return m instanceof TextMessage &amp;&amp; ((TextMessage)m).getText().equals(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return false;
        }
    }

}
</pre>
<pre name="code" class="java">
package activemq.test.topic;



import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartListener {

public static void main(String[] args) {
/*开始加载spring配置文件*/
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
MyListener myListener= (MyListener) context.getBean("myListener");

try {
if(myListener!=null){
System.out.println("success...");
}
myListener.run();

} catch (Exception e) {
e.printStackTrace();
}
    }

}
</pre>
<pre name="code" class="java">
package activemq.test.topic;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import activemq.test.model.User;

public class MyPublisher implements MessageListener {
private ActiveMQConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private MessageProducer publisher;
private Topic topic;
private Topic control;
private final Object mutex = new Object();

public ActiveMQConnectionFactory getConnectionFactory() {
return connectionFactory;
}

public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public Topic getTopic() {
return topic;
}

public void setTopic(Topic topic) {
this.topic = topic;
}

public Topic getControl() {
return control;
}

public void setControl(Topic control) {
this.control = control;
}


public void onMessage(Message message) {
        synchronized (mutex) {
            System.out.println("Received report " + getReport(message) );
           
        }
    }

Object getReport(Message m) {
        try {
            return ((TextMessage)m).getText();
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return e.toString();
        }
    }

public void publish() throws Exception {
User u = new User();
u.setAge(17);
u.setName("yuky"+System.currentTimeMillis());
u.setSex("女");
        // send events
ObjectMessage obj = session.createObjectMessage();
obj.setObject(u);
        for (int i = 0; i &lt; 10; i++) {
            publisher.send(obj);
            publisher.send(session.createTextMessage("REPORT"));
        }
    }

public void run() throws Exception {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
publisher = session.createProducer(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

session.createConsumer(control).setMessageListener(this);
        connection.start();
}

public void stop() throws JMSException{
publisher.send(session.createTextMessage("SHUTDOWN"));
connection.stop();
        connection.close();
}
}
</pre>
<pre name="code" class="java">
package activemq.test.topic;

import javax.jms.JMSException;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartPublisher {
public static void main(String[] args) throws InterruptedException {
/*开始加载spring配置文件*/
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
MyPublisher publisher= (MyPublisher) context.getBean("myPublisher");
try {
publisher.run();
publisher.publish();
} catch (Exception e) {
try {
publisher.stop();
} catch (JMSException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}

    }
}
</pre>
论坛首页 入门技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics