java代码
package com.yanzhi.system; import com.yanzhi.test.TestObject; import com.yanzhi.tools.C; import com.yanzhi.tools.Global; import com.yanzhi.tools.StringUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.*; import org.apache.activemq.pool.PooledConnectionFactory; import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.core.JmsTemplate; import javax.jms.*; import java.io.Serializable; import java.util.Date; /** * Created by xiaoyunlian on 2016/2/24. */ public class MQProducer { public static Connection connection; public Connection getConnection() { if (connection == null) { connection = getConnectionObject(); } return connection; } public static Connection getConnectionObject() { try { //连接 ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory(); targetConnectionFactory.setBrokerURL(Global.getBrokerURL()); targetConnectionFactory.setTrustAllPackages(true); SingleConnectionFactory connectionFactory = new SingleConnectionFactory(); connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接 Connection connection = connectionFactory.createConnection(); connection.start(); return connection; } catch (Exception e) { e.printStackTrace(); } return null; } public Session session; public Session getSession() { try { if (session == null) { Connection connection = getConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } return session; } catch (JMSException e) { e.printStackTrace(); } return session; } public ActiveMQQueue getActiveMQQueue() { try { Session session = getSession(); return (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3"); } catch (JMSException e) { e.printStackTrace(); } return null; } public ActiveMQDestination getActiveMQDestination(String destinationName) { return getDestination(destinationName, getActiveMQQueue()); } public MessageProducer messageProducer; /** * 获取生产者 * * @return */ public MessageProducer getMessageProducer(String destinationName) { Session session = getSession(); ActiveMQDestination activeMQDestination = getActiveMQDestination(destinationName); try { if (messageProducer == null) { messageProducer = session.createProducer(activeMQDestination); } return messageProducer; } catch (JMSException e) { e.printStackTrace(); } return messageProducer; } /** * 获取某个队列:默认获取临时队列templateQueue * * @return */ public static ActiveMQDestination getDestination(String destinationName, Destination yanzhiQueueDestination) { ActiveMQDestination destination = (ActiveMQDestination) yanzhiQueueDestination; ActiveMQDestination[] destinations = destination.getCompositeDestinations(); if (StringUtils.isBlank(destinationName)) { return null; } ActiveMQDestination mqDestination = null; for (ActiveMQDestination activeMQDestination : destinations) { String name = activeMQDestination.getPhysicalName(); if (destinationName.equals(name)) { mqDestination = activeMQDestination; break; } } return mqDestination; } public static void sendMessage(String msgType, Session session, MessageProducer producer) { try { // 发送文本消息 if (C.ACTIVEMQ_MSG_TYPE_TEXT.equalsIgnoreCase(msgType)) { String textMsg = "~~~~~~~~~~~~~~测试消息 ActiveMQ Text Message!~~~~~~~~~~~~~~" + new Date() + "," + AppicationManager.getServerIP(); TextMessage msg = session.createTextMessage(); msg.setText(textMsg); producer.send(msg); } // 发送Map消息 if (C.ACTIVEMQ_MSG_TYPE_MAP.equalsIgnoreCase(msgType)) { MapMessage msg = session.createMapMessage(); msg.setBoolean("boolean", true); msg.setShort("short", (short) 0); msg.setLong("long", 123456); msg.setString("MapMessage", "ActiveMQ Map Message!"); producer.send(msg); } // 发送流消息 if (C.ACTIVEMQ_MSG_TYPE_STREAM.equalsIgnoreCase(msgType)) { String streamValue = "ActiveMQ stream Message!"; StreamMessage msg = session.createStreamMessage(); msg.writeString(streamValue); msg.writeBoolean(false); msg.writeLong(1234567890); producer.send(msg); } // 发送对象消息 if (C.ACTIVEMQ_MSG_TYPE_OBJECT.equalsIgnoreCase(msgType)) { TestObject object = new TestObject(); object.setName("对象名称"); object.setType(1); object.setFaceValue(45678); ObjectMessage msg = session.createObjectMessage(); msg.setObject(object); producer.send(msg); } // 发送字节消息 if (C.ACTIVEMQ_MSG_TYPE_BYTES.equalsIgnoreCase(msgType)) { String byteValue = "字节消息"; BytesMessage msg = session.createBytesMessage(); msg.writeBytes(byteValue.getBytes()); producer.send(msg); } } catch (Exception e) { e.printStackTrace(); } } /** * 发送对象消息 * * @param session * @param producer * @param object */ public static void sendObjectMessage(Session session, MessageProducer producer, Object object) { try { ObjectMessage msg = session.createObjectMessage(); msg.setObject((Serializable) object); producer.send(msg); } catch (Exception e) { e.printStackTrace(); } } }
测试代码:
在单元测试中放入如下代码:
MQProducer mqProducer = new MQProducer(); MessageProducer producer = mqProducer.getMessageProducer("testQueue1"); MQProducer.sendObjectMessage(mqProducer.getSession(),producer,recordList);
其中,Global.getBrokerURL()的值是:tcp://192.168.199.149:61616?wireFormat.maxInactivityDuration=0&connectionTimeout=0&keepAlive=true
相关推荐
springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用
**ActiveMQ生产者详解** ActiveMQ是Apache组织开发的一个开源消息中间件,它遵循Java Message Service(JMS)规范,提供了高效、可靠的异步通信能力。在分布式系统中,ActiveMQ作为消息代理,允许应用程序之间通过...
总结起来,这个基于SpringBoot的ActiveMQ生产者/消费者示例展示了如何在SpringBoot应用中利用ActiveMQ实现消息传递。通过这种方式,应用程序可以在不直接互相依赖的情况下交换数据,提高了系统的可扩展性和可靠性。...
本案例代码包含了一个基本的ActiveMQ生产者和消费者的应用示例,帮助开发者理解如何使用ActiveMQ进行消息传递。 1. **JMS(Java Message Service)简介** JMS是Java平台上的一个标准API,它定义了生产、发送、接收...
首先,我们需要了解如何创建一个ActiveMQ生产者。在Java中,这通常涉及到以下步骤: 1. 添加ActiveMQ的依赖到项目中。这可以通过Maven或Gradle等构建工具完成,确保引入相应的ActiveMQ客户端库。 2. 创建一个...
Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...
- 在Java中创建ActiveMQ生产者涉及到使用JMS API。 - 首先需要创建ConnectionFactory对象,这里使用ActiveMQ提供的ActiveMQConnectionFactory类。 - 然后通过ConnectionFactory创建连接(Connection)并启动。 -...
1. **ActiveMQ生产者API**: 生产者是向消息队列发布消息的组件。在ActiveMQ中,生产者使用`ConnectionFactory`创建连接,然后创建一个`Connection`对象。接着,`Connection`被用来创建一个或多个`Session`,在`...
在Spring框架中,可以通过依赖注入的方式设置ActiveMQ生产者和消费者的属性,使得配置更加简洁高效。例如: ```java public class MyProducer { private final JmsTemplate jmsTemplate; public MyProducer...
- **生产者**:生产者负责创建和发送消息到ActiveMQ。在SSH项目中,生产者可能是一个服务或控制器,它将业务数据包装成消息并发送到队列或主题。 - **消费者**:消费者订阅队列或主题,接收并处理消息。消费者可能...
- **Amq_Producer.cpp**:这是单线程消息生产者的实现,可能包含创建连接、创建生产者对象、构建消息和发送消息的代码。 - **Amq_Producer_mt.cpp**:扩展了 Amq_Producer.cpp,增加了多线程支持,每个线程独立...
2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;...
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。...配合 producer 生产者demo使用。
`springMQProducer.rar` 可能包含了一个简单的 Spring 生产者配置及示例代码,教你如何创建并发送消息到 ActiveMQ。 **消息消费者**(Consumer)则是接收消息的组件,它从消息队列中读取消息并进行处理。Spring 中...
在这个“ActiveMQ集群及生产者和消费者Java代码”压缩包中,我们可以探讨以下几个关键知识点: 1. **ActiveMQ集群**:ActiveMQ的集群能力允许多个服务器形成一个逻辑单元,提供高可用性和负载均衡。当一个消息代理...
java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况
6. **消息生产者与消费者**:掌握如何使用ActiveMQ收发工具创建消息生产者发送消息,以及创建消息消费者接收消息。 7. **持久化与非持久化消息**:了解消息的持久性配置,这决定了消息在服务器重启后是否仍然可用。...
生产者负责将消息放入消息队列。在Spring中,可以使用`JmsTemplate`的`send`方法来发送消息。需要指定目的地和一个消息创建器,消息创建器通常是一个回调方法,用于创建`TextMessage`、`ObjectMessage`等。 4. **...
`Session`可以用来创建消费者、生产者,以及发送和接收消息。通常我们使用事务性会话或者非事务性会话,这里以非事务性为例: ```java Session session = connection.createSession(false, Session.AUTO_...
在同一个或不同的进程中,分别运行生产者和消费者程序,确保ActiveMQ服务器正在运行(默认端口61616)。生产者发送消息后,消费者将接收到并打印出消息内容。 6. **总结** 通过上述代码示例,我们可以看到C#与...