`
dannyhz
  • 浏览: 388261 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

ActiveMq 的queue和Topic的例子

阅读更多
package Client;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class DeliverQueue {
	
	public static void main(String[] args) {
		String brokerUrl = "tcp://127.0.0.1:61616";
    	String userName = "admin";
    	String pswd = "admin";
    	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    	
    	((ActiveMQConnectionFactory) connectionFactory).setBrokerURL(brokerUrl);
    	((ActiveMQConnectionFactory) connectionFactory)
		.setUserName(userName);
    	
    	((ActiveMQConnectionFactory) connectionFactory)
		.setPassword(pswd);
    	
    	try {
    		Connection conn = ((ActiveMQConnectionFactory) connectionFactory).createConnection();
    		
    		conn.start();
			
    		Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		
    		Destination destination = jmsSession.createQueue("MONKEY_Q");
    		MessageProducer producer = jmsSession.createProducer(destination);
    		
    		Message msg = jmsSession.createTextMessage("Monkey king."); 
    		producer.send(msg);
    		
    		producer.close();
    		jmsSession.close();
    		conn.close();
    		
    		System.out.println("sent msg :" + msg);
    		
    		
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    	
	}

}



package Client;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Hello world!
 *
 */
public class ReceiveQueue 
{
    public static void main( String[] args )
    {
    	String brokerUrl = "tcp://127.0.0.1:61616";
    	String userName = "admin";
    	String pswd = "admin";
    	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    	
    	((ActiveMQConnectionFactory) connectionFactory).setBrokerURL(brokerUrl);
    	((ActiveMQConnectionFactory) connectionFactory)
		.setUserName(userName);
    	
    	((ActiveMQConnectionFactory) connectionFactory)
		.setPassword(pswd);
    	
    	try {
    		Connection conn = ((ActiveMQConnectionFactory) connectionFactory).createConnection();
    		
    		conn.start();
			
    		Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		
    		Destination destination = jmsSession.createQueue("MONKEY_Q");
    		MessageConsumer comsumer = jmsSession.createConsumer(destination);
    		
    		Message msg = comsumer.receive(1000L);
    		
    		comsumer.close();
    		jmsSession.close();
    		conn.close();
    		
    		System.out.println(msg);
    		
    		
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    	
 
    }
}




package Client;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveTopic implements Runnable{
	private static final String brokerUrl = "tcp://127.0.0.1:61616";
	private static final String userName = "admin";
	private static final String pswd = "admin";
	private String threadName;
	
	ReceiveTopic(String name){
		threadName = name; 
	}
	
	@Override
	public void run() {
		
		ConnectionFactory connFactory;
		Connection conn = null;
		Session session;
		
		Destination dest;
		
		MessageConsumer consumer;
		
		connFactory = new ActiveMQConnectionFactory(userName,pswd,brokerUrl);
		try{
			
			conn = connFactory.createConnection();
			conn.start();
			session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
			dest = session.createTopic("AllWorld");
			
			consumer = session.createConsumer(dest);
			
			while(true){
				TextMessage message = (TextMessage)consumer.receive(100*1000);
				if(null != message){
					System.out.println("Thread " + threadName + " has reveived the message . " + message.getText());
				}else{
					continue;
				}
			}
			
		}catch(Exception ex){
			ex.printStackTrace();
		}finally{
			try{
				if(null != conn){
					conn.close();
				}
			}catch(Throwable ignore){
				
			}
		}
		
	}

	public static void main(String[] args) {
		
		ReceiveTopic r1 = new ReceiveTopic("Thread 1 ");
		ReceiveTopic r2 = new ReceiveTopic("Thread 2 ");
		ReceiveTopic r3 = new ReceiveTopic("Thread 3 ");
		
		Thread thread1 = new Thread(r1);
		Thread thread2 = new Thread(r2);
		Thread thread3 = new Thread(r3);
		
		thread1.start();
		thread2.start();
		thread3.start();
		
	} 
	
	
}






package Client;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendToTopic {
	private static final String brokerUrl = "tcp://127.0.0.1:61616";
	private static final String userName = "admin";
	private static final String pswd = "admin";
	private static final int SEND_NUMBER = 5;
	
	public static void sendMessage(Session session, MessageProducer producer)
	throws Exception
	{
		for(int i=1; i <= SEND_NUMBER ; i++){
			TextMessage message = session.createTextMessage("I am new a message, number " + i + ".");
			
			System.out.println("Send message " + i );
			producer.send(message);
			
		}
	}
	
	public static void main(String[] args) {
		ActiveMQConnectionFactory connFactory ;
		Connection conn = null;
		Session session;
		
		Destination dest;
		
		MessageProducer producer;
		connFactory = new ActiveMQConnectionFactory(
								userName,
								pswd,
								brokerUrl);
		
		
		try{
			conn = connFactory.createConnection();
			conn.start();

			session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
			
			dest = session.createTopic("AllWorld");
			
			producer = session.createProducer(dest);
			
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			
			sendMessage(session, producer);
			
			session.commit();
			
			
		}catch(Exception ex){
			ex.printStackTrace();
		}finally{
			try{
				if(null != conn){
					conn.close();
				}
			}catch(Throwable ignore){
				
			}
		}
		
		
	}
	
}




引用

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>

<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
</dependency>

<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</dependency>

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>

<dependency>
<groupId>opensymphony</groupId>
<artifactId>quartz</artifactId>
</dependency>

<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</dependency>

<dependency>
<groupId>com.****.****</groupId>
<artifactId>****-framework-core</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>com.****.****</groupId>
<artifactId>****-framework-activemq</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>

<dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.5.2</version>
    </dependency>
</dependencies>

分享到:
评论

相关推荐

    springboot2整合activemq的demo内含queue消息和topic消息

    - demo项目中应该包含生产者和消费者的相关代码,演示如何创建消息、发送到Queue或Topic,以及如何接收和处理消息。 - 可能包含`@JmsListener`注解用于定义消息监听器,以及`JmsTemplate`用于发送消息。 8. **...

    Apache ActiveMQ 入门最简单例子

    Session可以创建一个Destination(可能是Queue或Topic),接着创建一个Producer,并通过Producer发送消息。 ```java import javax.jms.*; public class MessageProducer { public static void main(String[] args...

    ActiveMQ例子

    ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。 2. **队列(Queue)**:在点对点模型中,队列是一种FIFO(先进先出)结构,每个消息只有一个消费者。发送到队列的消息只能被一个消费者接收并...

    ActiveMQ学习 完整例子

    在"ActiveMQ学习"的完整例子中,你可以通过编写Java代码来创建生产者和消费者,实践发送和接收消息,了解消息的生命周期和状态。同时,通过配置不同的参数,体验ActiveMQ的灵活性和强大功能。例如,你可以创建一个...

    activeMQ JMS WEB 例子

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它实现了Java消息服务(JMS)标准,用于在分布式系统中高效地传递消息。JMS是一种API,它定义了...这个例子对理解和实践ActiveMQ在Web环境中的应用非常有帮助。

    一个activeMQ的简单例子

    这个简单的ActiveMQ例子可能是为了演示如何设置和使用基本的生产者和消费者,以及如何通过消息队列实现异步通信。在实际应用中,我们还可以利用ActiveMQ的高级特性,如持久化、优先级、消息筛选等,以满足更复杂的...

    springMVC+activeMQ例子

    3. **配置Spring**:在Spring的配置文件(如applicationContext.xml)中,配置JMS监听容器,指定消息目的地(如topic或queue)和消息监听器。 4. **编写消息生产者**:在Spring MVC的控制器或服务层,创建一个方法...

    jms之activeMQ 队列和广播模式例子(主要给初学者提供入门知识)

    这篇博客"jms之activeMQ 队列和广播模式例子"主要面向初学者,旨在提供ActiveMQ入门级的知识,通过实例解释队列(Queue)和主题(Topic)这两种基本的消息模式。 首先,我们要理解JMS中的队列和主题的区别。队列...

    ActiveMQ的简单例子

    本教程将通过一个简单的例子介绍ActiveMQ的两个核心模式:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。我们将使用IntelliJ IDEA作为集成开发环境来实现这些示例。 首先,我们需要...

    activemq的小例子

    4. **队列(Queue)与主题(Topic)**:队列遵循“一对一”模型,每个消息仅被一个消费者接收;主题遵循“一对多”模型,消息可以被多个订阅者接收。 5. **持久化**:ActiveMQ支持消息的持久化,即使服务器重启,未...

    jms+activeMq+spring学习简单例子

    标题“jms+activeMq+spring学习简单例子”表明这个压缩包包含了一些示例代码,用于演示如何在Spring框架中集成JMS和ActiveMQ,以便于理解和学习。通过这个例子,开发者可以了解如何在实际应用中实现基于消息的通信。...

    ActiveMQ入门及深入使用的例子

    在压缩包"ActiveMQ-5.1"中,可能包含了示例代码和配置文件,你可以根据这些资料动手实践,通过运行例子来加深对ActiveMQ的理解。这些例子涵盖了基本的发送和接收消息,以及一些高级特性,如消息选择器、事务管理等。...

    自己写的ActiveMQ的Demo例子

    ActiveMQ 支持多种协议,如 OpenWire、STOMP、AMQP 和 MQTT,允许不同平台的应用程序进行通信。它提供了消息的可靠传递、高可用性以及可伸缩性,是企业级应用程序中广泛使用的组件。 **ActiveMQ 的核心概念** 1. *...

    java ActiveMQ的例子

    在ActiveMQ中,消息通过生产者(Producer)发送到一个或多个主题(Topic)或队列(Queue)。消费者(Consumer)则订阅这些主题或从队列中获取消息。这种模式允许应用程序之间进行解耦通信,因为生产者和消费者不必...

    JMS之ActiveMQ工具类+使用例子.zip

    2. **高性能**:ActiveMQ采用了高效的缓存策略和内存管理,确保高吞吐量和低延迟。 3. **高可用性**:通过集群和故障转移功能,ActiveMQ可以提供高可用的服务,确保消息不会丢失。 4. **多种协议支持**:除了JMS,...

    activeMQ与spring整合开发的例子程序

    1. **Spring集成ActiveMQ的基本配置**:在Spring应用中使用ActiveMQ,首先需要在Spring配置文件中添加JMS相关的bean,如ConnectionFactory、Destination(Queue或Topic)、MessageListenerContainer等。这些配置将...

    activemq-web-console-5.11.2

    1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置...

    ActiveMQ的安装与使用

    2. **创建队列和主题**:在控制台中可以创建消息队列(Queue)和主题(Topic),用于接收和发送消息。 3. **发送和接收消息**:通过JMS API或各种语言的客户端库(如Java、Python、C#等)可以编写程序来发送和接收...

    ActiveMQ的入门例子

    2. **主题(Topic)与队列(Queue)**:主题适用于广播消息,所有订阅者都能收到消息;队列则是一对一的通信,消息由一个消费者接收并处理。 3. **生产者(Producer)**:发布消息到消息队列或主题的组件。 4. **...

Global site tag (gtag.js) - Google Analytics