`

activemq生产者

阅读更多

java代码  

package com.yanzhi.system;

import com.yanzhi.test.TestObject;
import com.yanzhi.tools.C;
import com.yanzhi.tools.Global;
import com.yanzhi.tools.StringUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.*;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.*;

import java.io.Serializable;
import java.util.Date;

/**
 * Created by xiaoyunlian on 2016/2/24.
 */
public class MQProducer {

    public static Connection connection;

    public Connection getConnection() {
        if (connection == null) {
            connection = getConnectionObject();
        }
        return connection;
    }

    public static Connection getConnectionObject() {
        try {
            //连接
            ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory();
            targetConnectionFactory.setBrokerURL(Global.getBrokerURL());
            targetConnectionFactory.setTrustAllPackages(true);

            SingleConnectionFactory connectionFactory = new SingleConnectionFactory();
            connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            return connection;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public Session session;

    public Session getSession() {
        try {
            if (session == null) {
                Connection connection = getConnection();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            }
            return session;
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return session;
    }

    public ActiveMQQueue getActiveMQQueue() {
        try {
            Session session = getSession();
            return (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3");
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }

    public ActiveMQDestination getActiveMQDestination(String destinationName) {
        return getDestination(destinationName, getActiveMQQueue());
    }

    public MessageProducer messageProducer;

    /**
     * 获取生产者
     *
     * @return
     */
    public MessageProducer getMessageProducer(String destinationName) {
        Session session = getSession();
        ActiveMQDestination activeMQDestination = getActiveMQDestination(destinationName);
        try {
            if (messageProducer == null) {
                messageProducer = session.createProducer(activeMQDestination);
            }
            return messageProducer;
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return messageProducer;
    }

    /**
     * 获取某个队列:默认获取临时队列templateQueue
     *
     * @return
     */
    public static ActiveMQDestination getDestination(String destinationName, Destination yanzhiQueueDestination) {
        ActiveMQDestination destination = (ActiveMQDestination) yanzhiQueueDestination;
        ActiveMQDestination[] destinations = destination.getCompositeDestinations();
        if (StringUtils.isBlank(destinationName)) {
            return null;
        }
        ActiveMQDestination mqDestination = null;
        for (ActiveMQDestination activeMQDestination : destinations) {
            String name = activeMQDestination.getPhysicalName();
            if (destinationName.equals(name)) {
                mqDestination = activeMQDestination;
                break;
            }
        }
        return mqDestination;
    }

    public static void sendMessage(String msgType, Session session, MessageProducer producer) {
        try {
            // 发送文本消息
            if (C.ACTIVEMQ_MSG_TYPE_TEXT.equalsIgnoreCase(msgType)) {
                String textMsg = "~~~~~~~~~~~~~~测试消息 ActiveMQ Text Message!~~~~~~~~~~~~~~" + new Date() + "," + AppicationManager.getServerIP();
                TextMessage msg = session.createTextMessage();
                msg.setText(textMsg);
                producer.send(msg);
            }
            // 发送Map消息
            if (C.ACTIVEMQ_MSG_TYPE_MAP.equalsIgnoreCase(msgType)) {
                MapMessage msg = session.createMapMessage();
                msg.setBoolean("boolean", true);
                msg.setShort("short", (short) 0);
                msg.setLong("long", 123456);
                msg.setString("MapMessage", "ActiveMQ Map Message!");
                producer.send(msg);
            }
            // 发送流消息
            if (C.ACTIVEMQ_MSG_TYPE_STREAM.equalsIgnoreCase(msgType)) {
                String streamValue = "ActiveMQ stream Message!";
                StreamMessage msg = session.createStreamMessage();
                msg.writeString(streamValue);
                msg.writeBoolean(false);
                msg.writeLong(1234567890);
                producer.send(msg);
            }
            // 发送对象消息
            if (C.ACTIVEMQ_MSG_TYPE_OBJECT.equalsIgnoreCase(msgType)) {
                TestObject object = new TestObject();
                object.setName("对象名称");
                object.setType(1);
                object.setFaceValue(45678);
                ObjectMessage msg = session.createObjectMessage();
                msg.setObject(object);
                producer.send(msg);
            }
            // 发送字节消息
            if (C.ACTIVEMQ_MSG_TYPE_BYTES.equalsIgnoreCase(msgType)) {
                String byteValue = "字节消息";
                BytesMessage msg = session.createBytesMessage();
                msg.writeBytes(byteValue.getBytes());
                producer.send(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送对象消息
     *
     * @param session
     * @param producer
     * @param object
     */
    public static void sendObjectMessage(Session session, MessageProducer producer, Object object) {
        try {
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject((Serializable) object);
            producer.send(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 测试代码:

在单元测试中放入如下代码:

 MQProducer mqProducer = new MQProducer();
            MessageProducer producer = mqProducer.getMessageProducer("testQueue1");
            MQProducer.sendObjectMessage(mqProducer.getSession(),producer,recordList);

 其中,Global.getBrokerURL()的值是:tcp://192.168.199.149:61616?wireFormat.maxInactivityDuration=0&connectionTimeout=0&keepAlive=true

0
6
分享到:
评论

相关推荐

    springboot整合activemq 生产者 一对一,一对多

    springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用

    ActiveMQ生产者

    **ActiveMQ生产者详解** ActiveMQ是Apache组织开发的一个开源消息中间件,它遵循Java Message Service(JMS)规范,提供了高效、可靠的异步通信能力。在分布式系统中,ActiveMQ作为消息代理,允许应用程序之间通过...

    基于SpringBoot的ActiveMQ生产者/消费者

    总结起来,这个基于SpringBoot的ActiveMQ生产者/消费者示例展示了如何在SpringBoot应用中利用ActiveMQ实现消息传递。通过这种方式,应用程序可以在不直接互相依赖的情况下交换数据,提高了系统的可扩展性和可靠性。...

    activemq生产者和消费者案例代码.zip

    本案例代码包含了一个基本的ActiveMQ生产者和消费者的应用示例,帮助开发者理解如何使用ActiveMQ进行消息传递。 1. **JMS(Java Message Service)简介** JMS是Java平台上的一个标准API,它定义了生产、发送、接收...

    activeMQ生产者和消费者代码

    首先,我们需要了解如何创建一个ActiveMQ生产者。在Java中,这通常涉及到以下步骤: 1. 添加ActiveMQ的依赖到项目中。这可以通过Maven或Gradle等构建工具完成,确保引入相应的ActiveMQ客户端库。 2. 创建一个...

    spring 整合 activemq 生产者和消费者 案例源码

    Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...

    Activemq入门实例.pdf

    - 在Java中创建ActiveMQ生产者涉及到使用JMS API。 - 首先需要创建ConnectionFactory对象,这里使用ActiveMQ提供的ActiveMQConnectionFactory类。 - 然后通过ConnectionFactory创建连接(Connection)并启动。 -...

    activemq_activemq_doublezoo_源码

    1. **ActiveMQ生产者API**: 生产者是向消息队列发布消息的组件。在ActiveMQ中,生产者使用`ConnectionFactory`创建连接,然后创建一个`Connection`对象。接着,`Connection`被用来创建一个或多个`Session`,在`...

    ActiveMq总结.docx

    在Spring框架中,可以通过依赖注入的方式设置ActiveMQ生产者和消费者的属性,使得配置更加简洁高效。例如: ```java public class MyProducer { private final JmsTemplate jmsTemplate; public MyProducer...

    自己实现的 ActiveMQ 多线程客户端 包含生产消息客户端和消费者消息客户端

    - **Amq_Producer.cpp**:这是单线程消息生产者的实现,可能包含创建连接、创建生产者对象、构建消息和发送消息的代码。 - **Amq_Producer_mt.cpp**:扩展了 Amq_Producer.cpp,增加了多线程支持,每个线程独立...

    activemq实战项目,同ssh框架整合(生产者+消费者)

    - **生产者**:生产者负责创建和发送消息到ActiveMQ。在SSH项目中,生产者可能是一个服务或控制器,它将业务数据包装成消息并发送到队列或主题。 - **消费者**:消费者订阅队列或主题,接收并处理消息。消费者可能...

    Spring平台整合消息队列ActiveMQ实现发布订阅、生产者消费者模型(适合新手或者开发人员了解学习ActiveMQ机制)

    2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;...

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。...配合 producer 生产者demo使用。

    activeMQ 详细教程与源码(包含消费者与生产者)

    `springMQProducer.rar` 可能包含了一个简单的 Spring 生产者配置及示例代码,教你如何创建并发送消息到 ActiveMQ。 **消息消费者**(Consumer)则是接收消息的组件,它从消息队列中读取消息并进行处理。Spring 中...

    ActiveMQ集群及生产者和消费者Java代码.zip

    在这个“ActiveMQ集群及生产者和消费者Java代码”压缩包中,我们可以探讨以下几个关键知识点: 1. **ActiveMQ集群**:ActiveMQ的集群能力允许多个服务器形成一个逻辑单元,提供高可用性和负载均衡。当一个消息代理...

    activemq生产消费的Demo

    java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况

    activeMQ收发工具.rar

    6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费者接收消息。 7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。...

    ActiveMQ整合Spring(多消费者)

    生产者负责将消息放入消息队列。在Spring中,可以使用`JmsTemplate`的`send`方法来发送消息。需要指定目的地和一个消息创建器,消息创建器通常是一个回调方法,用于创建`TextMessage`、`ObjectMessage`等。 4. **...

    动态创建ActiveMQ消费者

    `Session`可以用来创建消费者、生产者,以及发送和接收消息。通常我们使用事务性会话或者非事务性会话,这里以非事务性为例: ```java Session session = connection.createSession(false, Session.AUTO_...

    activeMQ Demo

    在同一个或不同的进程中,分别运行生产者和消费者程序,确保ActiveMQ服务器正在运行(默认端口61616)。生产者发送消息后,消费者将接收到并打印出消息内容。 6. **总结** 通过上述代码示例,我们可以看到C#与...

Global site tag (gtag.js) - Google Analytics