`
hekuilove
  • 浏览: 158902 次
  • 性别: Icon_minigender_1
  • 来自: 魔都
社区版块
存档分类
最新评论

ActiveMQ发送接收TextMessage、BytesMessage

阅读更多
1、TextMessage

发送部分
package org.quinn.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendMessage {

	public static void main(String[] args) {
		ConnectionFactory factory; //连接工厂
		Connection connection;//jms连接
		Session session;//发送、接收线程
		Destination destination;//消息目的地
		MessageProducer producer;//消息发送者

		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");

		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  //是否为持久化 ,NON_PERSISTENT非持久化 ,PERSISTENT持久化

			for (int i = 11; i < 13; i++) {
				String msg = "第" + i + "次发送消息";

				TextMessage textMessage = session.createTextMessage(msg);
				producer.send(textMessage);
			}
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}

	}
}



接收部分
package org.quinn.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

public class ReceivedMessage {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		ConnectionFactory factory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer consumer;
		factory = new org.apache.activemq.ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				TextMessage textMessage = (TextMessage) consumer.receive();
				if (textMessage != null){
					System.out.println(textMessage.getText());
				}
				Thread.sleep(1000);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

}



2、BytesMessage

发送部分
package org.quinn.activemq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendBytesMessage {

	public static void main(String[] args) {
		ConnectionFactory factory; //连接工厂
		Connection connection;//jms连接
		Session session;//发送、接收线程
		Destination destination;//消息目的地
		MessageProducer producer;//消息发送者

		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");

		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			BytesMessage bytesMessage = session.createBytesMessage();

			File fil = new File("C:\\Users\\WS-SH-L1051\\Desktop\\a.txt");

			InputStream is = new FileInputStream(fil);

			byte by[] = new byte[is.available()];

			is.read(by);
			
			bytesMessage.writeBytes(by);

			producer.send(destination, bytesMessage);
			
			is.close();
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}

	}
}


接收部分
package org.quinn.activemq;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;

public class ReceivedBytesMessage {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		ConnectionFactory factory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer consumer;
		factory = new org.apache.activemq.ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = factory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				
				Message msg = consumer.receive();
				if(msg instanceof ActiveMQBytesMessage){
					ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) msg;
					if (bytesMessage != null){
						byte []bt = new byte[(int) bytesMessage.getBodyLength()];
						bytesMessage.readBytes(bt);
						File fil =new File("C:\\Users\\WS-SH-L1051\\Desktop\\b.txt");
						if(!fil.exists()){
							fil.createNewFile();
						}
						OutputStream os = new FileOutputStream(fil);
						os.write(bt);
						os.close();
					}
				}
				Thread.sleep(1000);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

}

分享到:
评论

相关推荐

    activemqactivemq

    消息类型在ActiveMQ中包括TextMessage(文本消息)、ObjectMessage(对象消息)、BytesMessage(字节消息)和MapMessage(映射消息),每种类型都有其特定的应用场景。 为了确保高可用性和容错性,ActiveMQ支持多种...

    AMQ接收与发送

    发送者可以使用MessageProducer接口来创建和发送消息,包括TextMessage、ObjectMessage、BytesMessage等不同类型的消息格式。发送端可以设置消息的优先级、延迟发送、持久化等属性,以满足不同业务需求。 ### 3. ...

    Activemq原理文档

    * Message:消息体,分为TextMessage、MapMessage、BytesMessage等等 消息传输流程 Activemq的消息传输流程可以分为发送消息和接收消息两个步骤: 发送消息的基本步骤: 1. 创建连接使用的工厂类JMS ...

    ActiveMQ客户端

    4. **消息类型**:JMS定义了五种消息类型:TextMessage(文本消息)、ObjectMessage(Java对象)、BytesMessage(字节流)、MapMessage(键值对)和StreamMessage(连续的数据流)。 5. **持久化**:ActiveMQ支持...

    activemq 配置说明与activemq入门讲解

    5. **消息类型**:ActiveMQ支持TextMessage、ObjectMessage、BytesMessage和MapMessage四种基本消息类型。 6. **消息持久化**:通过设置Message的持久化属性,可以在服务器重启后仍保留消息。这对于实现高可用性和...

    C#,activemq,mq

    ActiveMQ提供了多种消息类型,包括TextMessage(文本消息)、ObjectMessage(序列化对象消息)、BytesMessage(二进制消息)等。每种消息类型都有其适用的场景,例如,TextMessage适合传输文本数据,而ObjectMessage...

    ActiveMQ(中文)参考手册

    JMS 定义的消息类型有 TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。 JMS 可靠性机制 JMS 提供了以下几种可靠性机制: * 确认机制:JMS 消息只有在被确认之后,才认为已经被成功地...

    ActiveMQ常见消息类型.docx

    在Spring JMS中,使用`JmsTemplate.convertAndSend()`方法可以发送TextMessage,而`MessageListener`接口的`onMessage(Message msg)`方法则用于接收。 2. **MapMessage**: MapMessage允许你以键值对的形式存储和...

    ActiveMQ学习 完整例子

    - **消息类型**:包括TextMessage、ObjectMessage、MapMessage和BytesMessage,根据数据类型选择合适的。 4. **消息持久化** - **消息持久化机制**:当服务器重启时,确保未被消费的消息不会丢失。 - **消息策略...

    ActiveMq总结.docx

    - **发布/订阅**:在这种模式下,消息被发送到一个主题,所有订阅了该主题的接收者都将接收到消息。这种方式适用于广播式的消息传输场景。 - **点对点**:在这种模式下,消息被发送到一个队列,每个消息只能被一个...

    ActiveMQ 5.7源码API

    在ActiveMQ中,Message支持多种类型,如TextMessage、ObjectMessage、BytesMessage等,以满足不同数据格式的需求。同时,Message还包含头信息(Header)、属性(Property)和身体(Body),这些元素共同构成了完整...

    activeMQ-API.rar

    - `Message`: 包含要传输的数据,可以是TextMessage、ObjectMessage、BytesMessage等不同类型,支持不同的数据格式。 4. **事务处理** - 在ActiveMQ API中,事务管理是通过Session实现的。如果会话被配置为事务性...

    一个activeMQ的简单例子

    JMS支持多种消息类型,如TextMessage、ObjectMessage、BytesMessage等。 2. **生产者(Producer)**:生产者是创建并发送消息的组件。在代码中,我们通过创建`MessageProducer`对象并调用其`send()`方法来发送消息...

    ActiveMQ开发实例-6

    - **BytesMessage**: 用于发送未经解释的字节流数据。 #### 五、JMS应用程序接口 - **ConnectionFactory**: 提供创建到JMS提供者的连接的功能,通过它创建的Connection对象是与特定JMS提供者之间的会话。 - **...

    spring boot ActiveMQ学习练习demo项目源码

    在这个项目中,我们将深入探讨如何在Spring Boot应用中配置、发送和接收五种不同类型的消息:text、map、byte、stream以及object。 首先,我们需要理解Spring Boot和ActiveMQ的基本概念。Spring Boot是Spring框架的...

    ActiveMQ 实战

    消息体则定义了消息内容的具体类型,JMS定义了文本消息(TextMessage)、映射消息(MapMessage)、字节消息(BytesMessage)、流消息(StreamMessage)和对象消息(ObjectMessage)。 JMS规范还定义了可靠性机制,...

    ActiveMQ 消息队列

    在实际开发过程中,可以通过编写Java代码来发送和接收消息,利用JMS API与ActiveMQ建立连接。以下是一个简单的JMS消息发送和接收的例子: ```java // 发送消息 ConnectionFactory connectionFactory = new ...

    java操作activeMQ(java项目代码及jar包可运行,队列和订阅模式)

    - `MessageProducer`和`MessageConsumer`:用于发送和接收消息的对象,通过`createProducer()`和`createConsumer()`方法创建。 4. **消息类型**: - `TextMessage`:包含纯文本消息。 - `ObjectMessage`:序列化...

Global site tag (gtag.js) - Google Analytics