`

activemq例子代码 发送BytesMessage消息

    博客分类:
  • JMS
阅读更多
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;

//发送TextMessage
public class SendMessage {
    
    private static final String url = "tcp://localhost:61616";;
    private static final String QUEUE_NAME = "choice.queue";
    protected String expectedBody = "<hello>world!</hello>";
    
    public void sendMessage() throws JMSException{

        Connection connection = null;
        
        try{            
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
            
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);          
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage(expectedBody);
            message.setStringProperty("headname", "remoteB");
                producer.send(message);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            connection.close();
        }
    }


***************************************************************************************
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
//发送BytesMessage
public class SendMessage {
    
    private String url = "tcp://localhost:61616";
        
    public void sendMessage() throws JMSException{
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test.queue");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        BytesMessage message = session.createBytesMessage();
        byte[] content = getFileByte("d://test.jar");
        message.writeBytes(content);
        try{
            producer.send(message);
            System.out.println("successful send message");
        }catch(Exception e){
            e.printStackTrace();
            e.getMessage();
        }finally{
            session.close();
            connection.close();
        }                
    }
    
    private byte[] getFileByte(String filename){
        byte[] buffer = null;
        FileInputStream fin = null;
        try {
            File file = new File(filename);
            fin = new FileInputStream(file); 
            buffer = new byte[fin.available()];
            fin.read(buffer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                fin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return buffer;
    }
发送完消息后可以访问 
http://localhost:8161/admin/queues.jsp 
看到相应的queue中是否有消息 

 

 

 

 

 适用收取TextMessage消息 
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;


public class ReceiveMessage {
    
    private static final String url = "tcp://172.16.168.167:61616";
    private static final String QUEUE_NAME = "szf.queue";

    
    public void receiveMessage(){
        Connection connection = null;
        try{
            try{
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
            }catch(Exception e){
//                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//                connection = connectionFactory.createConnection();
            }            
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            consumeMessagesAndClose(connection,session,consumer);
        }catch(Exception e){
            
        }
    }
    
    protected void consumeMessagesAndClose(Connection connection,
            Session session, MessageConsumer consumer) throws JMSException {
        for (int i = 0; i < 1;) {
            Message message = consumer.receive(1000);
            if (message != null) {
                i++;
                onMessage(message);
            }
        }
        System.out.println("Closing connection");
        consumer.close();
        session.close();
        connection.close();
    }
    
    public void onMessage(Message message){
        try{
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage)message;
                String msg = txtMsg.getText();
                System.out.println("Received: " + msg);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
        
    public static void main(String args[]){
        ReceiveMessage rm = new ReceiveMessage();
        rm.receiveMessage();
    }
}

 

分享到:
评论
4 楼 GFJGH 2015-01-08  
怎么收取的byte数组呢
3 楼 hundsun_2008 2012-10-25  
怎么接受BytesMessage呢?BytesMessage肯定不能转换为textMessage 接收的
2 楼 jychenok 2011-08-24  
楼上要设置respose.set编码
1 楼 herry 2009-06-10  
不知道楼主用ActiveMQ发送过中文消息没?
我获取到得中文消息为乱码,不管是用TextMessage,还是BytesMessage,还是ObjectMessage

相关推荐

    一个activeMQ的简单例子

    这个简单的ActiveMQ例子可能是为了演示如何设置和使用基本的生产者和消费者,以及如何通过消息队列实现异步通信。在实际应用中,我们还可以利用ActiveMQ的高级特性,如持久化、优先级、消息筛选等,以满足更复杂的...

    ActiveMQ学习 完整例子

    在"ActiveMQ学习"的完整例子中,你可以通过编写Java代码来创建生产者和消费者,实践发送和接收消息,了解消息的生命周期和状态。同时,通过配置不同的参数,体验ActiveMQ的灵活性和强大功能。例如,你可以创建一个...

    ACTIVEMQ C#下的例子

    6. **创建消息**: 在这个例子中,我们发送的是文件内容,所以消息可能是包含文件二进制数据的BytesMessage。 ```csharp var message = session.CreateBytesMessage(fileBytes); ``` 文件内容需要先读取到字节...

    ActiveMQ 消息队列

    在实际开发过程中,可以通过编写Java代码来发送和接收消息,利用JMS API与ActiveMQ建立连接。以下是一个简单的JMS消息发送和接收的例子: ```java // 发送消息 ConnectionFactory connectionFactory = new ...

    JMS开发例子.rar

    消息本身可以有多种类型,如TextMessage(文本消息)、ObjectMessage(对象消息)、BytesMessage(字节消息)、MapMessage(映射消息)和StreamMessage(流消息)。每种消息类型适用于不同的数据传输场景。 在JMS...

    java-jms小例子

    5. **示例代码**:在“jmstest”文件中,可能包含了一个简单的Java程序,它创建了生产者和消费者,并使用`ActiveMQ`这样的JMS提供商作为消息中间件。生产者会发送一个`TextMessage`到一个队列或主题,而消费者则监听...

    J2EE中的JMS 消息服务

    常见的消息代理有ActiveMQ、RabbitMQ和Apache Kafka等。 4. **队列(Queue)**:提供一对一的消息传递模型,每个消息仅由一个消费者接收,确保消息顺序和至少一次交付。 5. **主题(Topic)**:提供一对多的消息...

Global site tag (gtag.js) - Google Analytics