`

activemq例子代码 发送BytesMessage消息

    博客分类:
  • JMS
阅读更多
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;

//发送TextMessage
public class SendMessage {
    
    private static final String url = "tcp://localhost:61616";;
    private static final String QUEUE_NAME = "choice.queue";
    protected String expectedBody = "<hello>world!</hello>";
    
    public void sendMessage() throws JMSException{

        Connection connection = null;
        
        try{            
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
            
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage(expectedBody);
            message.setStringProperty("headname", "remoteB");
                producer.send(message);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            connection.close();
        }
    }


***************************************************************************************
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
//发送BytesMessage
public class SendMessage {
    
    private String url = "tcp://localhost:61616";
        
    public void sendMessage() throws JMSException{
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.queue");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        BytesMessage message = session.createBytesMessage();
        byte[] content = getFileByte("d://test.jar");
        message.writeBytes(content);
        try{
            producer.send(message);
            System.out.println("successful send message");
        }catch(Exception e){
            e.printStackTrace();
            e.getMessage();
        }finally{
            session.close();
            connection.close();
        }                
    }
    
    private byte[] getFileByte(String filename){
        byte[] buffer = null;
        FileInputStream fin = null;
        try {
            File file = new File(filename);
            fin = new FileInputStream(file); 
            buffer = new byte[fin.available()];
            fin.read(buffer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                fin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return buffer;
    }
发送完消息后可以访问 
http://localhost:8161/admin/queues.jsp 
看到相应的queue中是否有消息 

 

 

 

 

 适用收取TextMessage消息 
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;


public class ReceiveMessage {
    
    private static final String url = "tcp://172.16.168.167:61616";
    private static final String QUEUE_NAME = "szf.queue";

    
    public void receiveMessage(){
        Connection connection = null;
        try{
            try{
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
            }catch(Exception e){
//                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//                connection = connectionFactory.createConnection();
            }            
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            consumeMessagesAndClose(connection,session,consumer);
        }catch(Exception e){
            
        }
    }
    
    protected void consumeMessagesAndClose(Connection connection,
            Session session, MessageConsumer consumer) throws JMSException {
        for (int i = 0; i < 1;) {
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
    }
    
    public void onMessage(Message message){
        try{
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage)message;
                String msg = txtMsg.getText();
                System.out.println("Received: " + msg);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
        
    public static void main(String args[]){
        ReceiveMessage rm = new ReceiveMessage();
        rm.receiveMessage();
    }
}

 

分享到:
评论
4 楼 GFJGH 2015-01-08  
怎么收取的byte数组呢
3 楼 hundsun_2008 2012-10-25  
怎么接受BytesMessage呢?BytesMessage肯定不能转换为textMessage 接收的
2 楼 jychenok 2011-08-24  
楼上要设置respose.set编码
1 楼 herry 2009-06-10  
不知道楼主用ActiveMQ发送过中文消息没?
我获取到得中文消息为乱码,不管是用TextMessage,还是BytesMessage,还是ObjectMessage

相关推荐

    activemq 通过ajax发送接收消息简单例子

    发送消息的示例代码可能如下: ```javascript $.ajax({ url: 'http://localhost:8161/api/message/myQueue?type=send', type: 'POST', contentType: 'application/json', data: JSON.stringify({ message: $('#...

    ActiveMQ例子

    3. **连接与发送**:建立与ActiveMQ服务器的连接,通过生产者发送消息。 4. **接收消息**:作为消费者订阅队列或主题,接收并处理消息。 5. **使用Web控制台**:访问ActiveMQ的Web界面,监控队列和主题的状态,管理...

    springMVC+activeMQ例子

    4. **编写消息生产者**:在Spring MVC的控制器或服务层,创建一个方法来发送消息到ActiveMQ。可以使用`JmsTemplate`或者`DefaultMessageListenerContainer`来实现。 5. **编写消息消费者**:创建一个实现`Message...

    ActiveMQ学习笔记之九--发送消息到队列中

    这篇"ActiveMQ学习笔记之九--发送消息到队列中"主要探讨的是如何通过编程方式向ActiveMQ队列发送消息,这对于理解和应用消息中间件至关重要。 首先,我们要理解ActiveMQ中的队列(Queue)概念。队列是一种先进先出...

    activemq-cpp发送接收消息,消息过滤器

    发送消息的基本步骤包括: 1. **初始化连接**:创建一个`ConnectionFactory`实例,然后通过它建立到ActiveMQ服务器的连接。这通常涉及到设置URL、用户名和密码等参数。 2. **创建会话**:连接建立后,需要创建一个...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    6. 发送消息业务类:在业务层,调用上述接口发送消息。这通常发生在某些业务操作完成后,需要通知其他系统或组件的时候。 7. 测试与调试:编写测试用例,确保消息能正确发送和接收,同时监控ActiveMQ服务器以查看...

    activeMQ 例子 真实环境下测试过

    这些示例代码通常会展示如何创建生产者发送消息、创建消费者接收消息、设置消息属性、使用事务、处理异常等常见操作。通过学习和运行这些示例,开发者可以快速理解和掌握ActiveMQ的基本用法和高级特性。 在实际环境...

    ActiveMQ接受和发送工具.rar

    它们通过连接到Broker并使用JMS API创建和发送消息。 - **Consumer**:消费者是接收消息的应用程序。它们同样通过JMS API订阅主题或队列来接收消息。 - **Queue**:FIFO(先进先出)结构,每个消息仅被一个消费者...

    activeMQ发送消息返回消息

    在ActiveMQ中发送消息,通常涉及以下步骤: 1. **创建ConnectionFactory**:ConnectionFactory是创建Connection的工厂类,它是JMS规范的一部分。在ActiveMQ中,你需要通过配置文件或者编程方式来创建...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    一个activeMQ的简单例子

    在代码中,我们通过创建`MessageProducer`对象并调用其`send()`方法来发送消息。 3. **消费者(Consumer)**:消费者是接收消息的组件。我们通过创建`MessageConsumer`对象来接收消息,通常是在`MessageListener`...

    activeMQ收发工具.rar

    6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费者接收消息。 7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。...

    activemq消息的发送与接受封装的工具类

    activemq消息的发送与接受封装的工具类,只要你导入jar包

    SpringBoot整合ActiveMQ(消息中间件)实现邮件发送功能

    4. **发送消息**:通过`JmsTemplate`或者`JMSTemplate`的`convertAndSend`方法可以向ActiveMQ发送消息。 **二、邮件发送功能** 邮件发送功能通常涉及以下步骤: 1. **配置邮件服务器**:在配置文件中设置SMTP...

    apache-activemq-5.0.0-src.zip_ActiveMQ 源代码_activemq_activemq.src

    通过深入学习和研究ActiveMQ的源代码,开发者不仅可以掌握消息中间件的基本原理,还能学习到高级特性如事务处理、消息优先级、延迟消息、死信队列等。同时,对于Java NIO、多线程编程、网络编程等领域也有很好的实践...

    ActiveMQ代码用例

    2. **生产者**:发送消息的组件。 3. **消费者**:接收并处理消息的组件。 4. **队列(Queue)**:FIFO(先进先出)结构,每个消息仅被一个消费者接收。 5. **主题(Topic)**:多播模式,消息可以被多个订阅者接收...

    ActiveMQ实现例子

    在上述代码中,我们创建了一个名为"MyQueue"的消息队列,生产者发送消息到这个队列,消费者则从队列中接收消息。你可以根据需要创建多个生产者和消费者,以实现更复杂的消息传递场景。 ActiveMQ还支持多种高级特性...

    activemqactivemq

    消息类型在ActiveMQ中包括TextMessage(文本消息)、ObjectMessage(对象消息)、BytesMessage(字节消息)和MapMessage(映射消息),每种类型都有其特定的应用场景。 为了确保高可用性和容错性,ActiveMQ支持多种...

Global site tag (gtag.js) - Google Analytics