`

JMS Api Demo

    博客分类:
  • j2ee
阅读更多
jms.JMSFactory
package jms;
import javax.jms.TopicConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSFactory {
	private static ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
	
	public static TopicConnectionFactory getActiveMQConnectionFactory(){
		return activeMQConnectionFactory;
	}
}

jms.JMSMessageActor
package jms;
import javax.jms.*;
public abstract class JMSMessageActor {
	protected String name=null;
	protected String defaultQueueName="defalut-queue";
	protected String defaultTopicName="defalut-topic";
	protected final int  DESTIONATION_TYPE_TOPIC=1;
	protected final int DESTIONATION_TYPE_QUEUE=2;
	public JMSMessageActor(String name) {
		this.name = name;
	}
	public abstract Destination  getDestination();
	public Destination createDefaultDestination(int type){
		Destination dest=null;
		switch(type){
			case DESTIONATION_TYPE_TOPIC:
				dest=new Topic(){
					@Override
					public String getTopicName() throws JMSException {
						return defaultTopicName;
					}};break;
			case DESTIONATION_TYPE_QUEUE:
				dest=new Queue(){
					@Override
					public String getQueueName() throws JMSException {
						return defaultQueueName;
					}};
				break;
		}
		return dest;
	}
}

jms.JMSMessageConsumer
package jms;
import javax.jms.*;
public abstract class JMSMessageConsumer extends JMSMessageActor implements Runnable,MessageListener{
	public JMSMessageConsumer(String name){
		super(name);
	}
	@Override
	public void onMessage(Message message) {
		synchronized(JMSMessageConsumer.class){
			System.out.println("##### consumer "+ name +" receive message. #####");
			System.out.println(JMSUtil.formatMessage(message));
		}
	}
	@Override
	public void run() {
		try{			
			// get topic connect factory
			ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory();
			// create connection
			Connection connection = factory.createConnection();
			// create unique client id for the connection
			connection.setClientID("consumer_connection_"+name);
			// if the connection start method is not invoked , the consumer may be not receive the message
			connection.start();
			// create session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination=getDestination();
			// if the destination is an instance of Queue , 
			// it will receive the message from the queue,
			// in other words the message can be consumed one time by one consumer 
			// and the message is durable.
			// if the destination is an instance of Topic , 
			// the subscribers of the Topic can receive the message,
			// but the message is non durable.
			MessageConsumer consumer  =session.createConsumer(destination,null,true);
			// if the destination is an instance of Topic, 
			// specify the clientID of the connection
			// and create MessageConsumer like this,
			// the subscribers of the Topic can receive the message
			// and the message is durable.
			//consumer =session.createDurableSubscriber((Topic)destination, "durable topic", null, true);
			consumer.setMessageListener(this);
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
}

jms.JMSMessageProducer
package jms;
import javax.jms.*;
public abstract class JMSMessageProducer extends JMSMessageActor implements Runnable{
	public JMSMessageProducer(String name){
		super(name);
	}
	@Override
	public void run() {
		try{
			// get topic connect factory
			ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory(); 
			// create connection
			Connection connection = factory.createConnection();
			// create session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination=getDestination();
			// create message producer
			MessageProducer producer = session.createProducer(destination);
			// create message
			MapMessage mapMessage=session.createMapMessage();
			mapMessage.setObjectProperty("lock", "key");
			mapMessage.setObjectProperty("vegetables", "greens");
			mapMessage.setObjectProperty("fruit", "apple");
			mapMessage.setObjectProperty("meat", "pork");
			
			// producer.setTimeToLive(1000);// set the message expiration time
			
			producer.send(mapMessage);// send message
			connection.close();
			System.out.println(name+" send message success!");
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
}

jms.JMSMessageQueueReceiver
package jms;
import javax.jms.Destination;
public class JMSMessageQueueReceiver extends JMSMessageConsumer {
	public JMSMessageQueueReceiver(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_QUEUE);
	}
}

jms.JMSMessageQueueSender
package jms;
import javax.jms.Destination;
public class JMSMessageQueueSender extends JMSMessageProducer {
	public JMSMessageQueueSender(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_QUEUE);
	}
}

jms.JMSMessageTopicPublisher
package jms;
import javax.jms.Destination;
public class JMSMessageTopicPublisher extends JMSMessageProducer {
	public JMSMessageTopicPublisher(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_TOPIC);
	}
}

jms.JMSMessageTopicSubscriber
package jms;
import javax.jms.Destination;
public class JMSMessageTopicSubscriber extends JMSMessageConsumer {
	public JMSMessageTopicSubscriber(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_TOPIC);
	}
}

jms.JMSUtil
package jms;
import java.text.SimpleDateFormat;
import java.util.*;
import javax.jms.*;
public class JMSUtil {
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	@SuppressWarnings("unchecked")
	public static String formatMessage(Message message) {
		StringBuffer result=new StringBuffer();
		try {
			String correlationId=message.getJMSCorrelationID();
			int deliveryMode=message.getJMSDeliveryMode();
			long expiration=message.getJMSExpiration();
			String messageId=message.getJMSMessageID();
			int priority=message.getJMSPriority();
			long timestamp=message.getJMSTimestamp();
			String type=message.getJMSType();
			result.append("##### message property #####\n")
				  .append("correlationId : "+correlationId+"\n")
				  .append("deliveryMode : "+deliveryMode+"\n")
				  .append("expiration : "+expiration+"\n")
				  .append("messageId : "+messageId+"\n")
				  .append("priority : "+priority+"\n")
				  .append("timestamp : "+sdf.format(new Date(timestamp))+"\n")
				  .append("type : "+type+"\n");
			Enumeration<String> names=message.getPropertyNames();
			result.append("##### message content #####\n");
			while(names.hasMoreElements()){
				String name=names.nextElement();
				String value=message.getStringProperty(name);
				result.append(name +" : "+value+"\n");
			}
		} catch (JMSException e) {
			throw new RuntimeException(e);
		}
		return result.toString();
	}
}

jms.Main
package jms;
public class Main {
	public static void main(String[] args) throws InterruptedException {
		sendMesasge();
		Thread.sleep(2000);
		createMessageReceiver();
		
		Thread.sleep(3000);
		System.exit(0);
	}
	/* create message receiver */
	private static void createMessageReceiver(){
		// create queue message receiver
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageQueueReceiver("queue_receiver_"+i));
			t.start();
		}
		// create topic message subscriber
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageTopicSubscriber("topic_subscriber_"+i));
			t.start();
		}
	}
	/* send message */
	private static void sendMesasge(){
		// create queue message sender
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageQueueSender("queue_sender_"+i));
			t.start();
		}
		// create topic publisher
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageTopicPublisher("topic_publisher_"+i));
			t.start();
		}
	}
}
  • jms.zip (5.1 KB)
  • 下载次数: 139
分享到:
评论
1 楼 陈碧滔 2013-08-12  
Apache ActiveMQ

相关推荐

    JMS demo 及 资料

    在这个"JMS demo 及 资料"的压缩包中,我们可能找到了一些关于JMS的基础教程和示例代码,帮助初学者了解和掌握JMS的基本用法。 JMS的核心概念主要包括以下几个部分: 1. **消息**: 消息是JMS中的基本数据单元,它...

    Jms做的一些的demo

    在Maven项目结构下,通常会在`test`目录下存放单元测试或集成测试代码,这些测试代码可能演示了如何使用JMS API和ActiveMQ进行消息生产和消费。 在Java中,使用JMS通常涉及以下步骤: 1. **创建ConnectionFactory*...

    Weblogic提供JMS服务Demo

    2. **编写JMS客户端代码**:使用Java API,如`javax.jms.*`,编写发送和接收消息的代码。`MessageProducer`负责发送消息,而`MessageConsumer`负责接收消息。消息可以通过多种方式传递,例如文本消息、对象消息等。 ...

    JMS 简单demo

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的...

    jms简单demo,集成spring和不集成

    JMS(Java Message Service)是Java平台中用于创建、发送、接收和读取消息的标准API。它提供了一种可靠和可扩展的方式来进行异步通信,是企业级应用中常见的组件。在本示例中,"jms简单demo"涵盖了两个方面:与...

    spring-jms-demo

    这个"spring-jms-demo"项目显然是一个示例应用,它展示了如何在Spring环境中使用JMS来实现消息传递。让我们深入探讨Spring JMS的核心概念、功能以及如何在实际开发中应用。 首先,JMS是Java平台的一个标准接口,...

    esper和jms做的小demo

    在这个名为"esper和jms做的小demo"的项目中,我们将会探讨这两个技术如何结合在一起,以模拟一个温度控制系统。 Esper,全称为Enterprise Semantic Processing,是一种复杂的事件处理引擎(CEP,Complex Event ...

    JMS.zip_DEMO_activemq

    总结来说,`JMS.zip_DEMO_activemq` 是一个关于 ActiveMQ 的演示示例,它通过 JMS API 展示了消息中间件的基本用法,包括生产者和消费者的创建,以及如何与 ActiveMQ 服务器进行交互。通过学习和实践这个示例,你...

    JMS消息队列机制及案例

    它通过JMS API将消息放入消息队列或发布到主题。 3. **消费者**:消费者是从队列中接收消息或者订阅主题的应用程序或组件。它们同样通过JMS API来监听和获取消息。 4. **队列(Queue)**:队列是一种点对点的通信...

    ActiveMQ+Camel+Spring+jms Demo(一)

    2. **Java Message Service (JMS)**:JMS是一个API,定义了Java应用程序如何创建、发送、接收和读取消息。它允许应用程序之间进行解耦通信,提高系统的灵活性和可扩展性。JMS接口包括MessageProducer和...

    SpringMVC+JMS(ActiveMQ)整合的Demo

    本项目结合这三者,提供了一个整合的Demo,旨在帮助开发者理解如何在SpringMVC应用中集成JMS和ActiveMQ,实现消息的发布与订阅。 首先,SpringMVC是Spring框架的一部分,专门用于构建Web应用程序的模型-视图-控制器...

    SpringJMS示例代码

    通过使用SpringJMS,开发者可以避免直接处理JMS API的复杂性,而是利用Spring的依赖注入和模板方法设计模式来创建消息驱动的应用程序。 2. **Java消息服务(JMS)** JMS是一种标准API,允许应用程序创建、发送、...

    activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。

    在这个测试中,你可以看到如何使用Spring JMS API来发送和接收消息,验证ActiveMQ配置的正确性,以及消息传递的有效性。 总的来说,这个示例涵盖了以下关键知识点: 1. ActiveMQ的安装和运行。 2. Spring框架中JMS...

    ActiveMQ+Camel+Spring+jms Demo(二)

    在本项目"ActiveMQ+Camel+Spring+jms Demo(二)"中,我们将探讨一个集成ActiveMQ消息中间件、Apache Camel路由引擎、Spring框架以及Java消息服务(JMS)的示例应用。这个组合提供了强大的企业级消息处理能力,是...

    移动代理服务器MAS短信API2.2开发手册及DEMO

    - **JAVA版开发手册**:为JAVA开发者提供了详细的API使用指南,涵盖了JDBC、JMS等多种方式与MAS网关交互。 - **C&C++版开发手册**:适用于C++和C语言的开发者,介绍如何在C/C++环境中调用MAS API,实现短信服务...

Global site tag (gtag.js) - Google Analytics