activemq点对点消息传送
消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送
ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现
ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。
bin/activemq start
AcitveMQ的数据传送流程:
proceder->broker->consumer
ActiveMQ的两种消息传递类型:
(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,消费者获取该条队列里的数据。
(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producter {
//ActiveMq 的默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默认登录密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的链接地址
//private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String BROKEN_URL = "failover://tcp://192.168.1.128:61616";
AtomicInteger count = new AtomicInteger(0);
//链接工厂
ConnectionFactory connectionFactory;
//链接对象
Connection connection;
//事务管理
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init(){
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//从工厂中创建一个链接
connection = connectionFactory.createConnection();
//开启链接
connection.start();
//创建一个事务(这里通过参数可以设置事务的级别)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void sendMessage(String disname){
try {
//创建一个消息队列
Queue queue = session.createQueue(disname);
//消息生产者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//创建一条消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
//发送消息
messageProducer.send(msg);
//提交事务
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Consumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String BROKEN_URL = "failover://tcp://192.168.1.128:61616";
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
System.out.println("..");
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("....");
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ProducterTest {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
ProducterTest testMq = new ProducterTest();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}
private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}
@Override
public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
//第一种消费消息的方式
public class ConsumerTest {
public static void main(String[] args){
Consumer comsumer = new Consumer();
comsumer.init();
ConsumerTest testConsumer = new ConsumerTest();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}
private class ConsumerMq implements Runnable{
Consumer comsumer;
public ConsumerMq(Consumer comsumer){
this.comsumer = comsumer;
}
@Override
public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
//第二种消费消息的方式
//点对点-使用监听器接收消息
public class ConsumerListener {
// tcp 地址 服务器器端地址
//public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值为 "tcp://localhost:61616";
public static final String BROKER_URL = "tcp://192.168.1.128:61616";
// 目标地址,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp中可以查询到发送的mq消息
public static final String DESTINATION = "Jaycekon-MQ";
//测试连接使用默认的用户名
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//默认为null
//测试连接使用默认的密码
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认为null
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 1、创建链接工厂
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ConsumerListener.DEFAULT_USER, ConsumerListener.DEFAULT_PASSWORD, ConsumerListener.BROKER_URL);
// 2、通过工厂创建一个连接
connection = factory.createQueueConnection();
// 3、启动连接
connection.start();
// 4、创建一个session会话
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、创建一个消息队列
Queue queue = session.createQueue(DESTINATION);
// 创建消息接收者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
//使用内部类为消息接收者加载相应的Listener监听
receiver.setMessageListener(new MessageListener() {
//重写onMessage方法
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠10s再关闭 接收生产者发送的全部的10条消息
// 需要注意的是这里使用sleep会使当前正在执行的线程进入休眠状态
// 也就是QueueReceiver_Listener这个类进入休眠状态了,而接收者的监听器仍然会继续执行的哦。
Thread.sleep(1000 * 10);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
ConsumerListener.run();
}
}
分享到:
相关推荐
与点对点模型不同,发布/订阅模式中的消费者可以是多个,每个订阅者都能接收到所有发布的消息。 C#中使用ActiveMQ的NMS库来进行消息操作。NMS提供了一组API,使得.NET开发者能够轻松地与ActiveMQ交互。以下是一些...
3. **jms规范教程.pdf** - 这是一个关于JMS规范的教程,可能详细讲解了JMS接口、消息类型(如点对点和发布/订阅模型)、消息队列和主题等核心概念,为理解JMS和ActiveMQ的工作原理提供了基础。 综上所述,这个主题...
JMS,作为SUN公司提出的旨在统一不同MOM系统的规范,为开发人员提供了两套消息模型:点对点(PTP)和发布/订阅(Pub/Sub)。JMS定义了Java中访问消息中间件的标准接口,但本身并不提供实现,实现这些接口的消息...
通过本文的介绍,我们可以了解到ActiveMQ作为一种高性能、可靠的MOM系统,不仅支持传统的点对点消息传递模式,还支持更为灵活的发布/订阅模式。此外,通过JMS这一标准化接口,ActiveMQ能够与其他Java应用无缝集成,...
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。...⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点 ⒏ 支持Ajax ⒐ 支持与Axis的整合 ⒑ 可以很容易得调用内嵌JMS provider,进行测试.
4. **创建目的地**: 目的地可以是队列或主题,取决于你想要实现的通信模式(点对点还是发布/订阅)。 ```csharp var destination = session.CreateQueue("myQueue"); ``` 5. **创建生产者**: 生产者负责发送消息...
1. **点对点模型 (PTP)**:消息从生产者传送到消费者,每个消息只能被一个消费者消费。在PTP模型中,Destination是一个Queue(队列),消息先发送到队列中,然后由队列根据策略分发给注册的消费者。这种模型适合于...
ActiveMQ采用了一种灵活且强大的消息传递模型,它支持多种消息传递模式,如点对点(PTP)和发布/订阅(pub/sub)等。在ActiveMQ中,消息可以通过队列(Queue)或主题(Topic)进行传递。 - **队列(Queue)**:在队列中,每个...
最后,ActiveMQ还提供了一些高级代理器特性和客户端选项,这些高级特性允许用户对消息代理进行更细粒度的控制,以适应各种复杂的业务场景。 总结来说,ActiveMQ是一个功能丰富、高度可靠且易于使用的消息中间件,它...
消息传递有两种主要模型:点对点和发布/订阅。P2P模型中,消息直接从一个进程发送到另一个进程。而在Pub/Sub模型中,发送者(发布者)广播消息,而接收者(订阅者)选择他们想要接收的消息类型。 4. **协议与标准*...
作者深入探讨了如何配置高可用性ActiveMQ、在代理网络间传送消息以及为高并发程序部署ActiveMQ。这些高级部署技巧对于保证消息系统的稳定和可靠性至关重要。 代理功能实战部分详细介绍了ActiveMQ的一些高级功能,如...
ActiveMQ是Apache出品的开源消息总线。ActiveMQ是一个完全支持JMS1.1和...7、从设计上保证了高性能的集群,客户端-服务器,点对点 8、支持Ajax 9、支持与Axis的整合 10、可以很容易得调用内嵌JMS provider,进行测试
1. **Queue**:队列是一种点对点的消息模式,其中每个消息只能被一个Receiver消费。 2. **Session**:会话是发送和接收消息的工作单元,它处理事务和消息确认。 3. **Connection**:连接是应用程序与JMS提供者的通信...
消息队列是一种在消息传输过程中保存消息的容器,它允许在两台计算机之间传送消息,这些消息可以很简单,比如只包含文本字符串,也可以很复杂,如包含嵌入对象。消息队列的作用主要是异步处理、提高系统性能、降低...
- ActiveMQ支持多种消息传递模式,包括点对点和发布订阅。 - 它还支持持久化存储、集群等功能,能够满足大型企业级应用的需求。 综上所述,掌握JMS的基本概念和核心API对于开发可靠的分布式系统至关重要。了解JMS...
1. 点对点模型(P2P):消息生产者将消息发送到队列中,然后消息消费者从队列中取出并消费消息。队列的特点是消息只能被消费一次,之后将不再存储,且一个消息只会被一个消费者消费。 2. 发布/订阅模型(Pub/Sub)...
Queue用于点对点的消息传输,而Topic用于发布/订阅模型。在ActiveMQ中,每个Queue或Topic可以视作一个消息服务通道,客户端可以连接到这些通道上进行消息的发送和接收。 ActiveMQ的监控通常涉及监控其消息的发送...
- **点对点(Point-to-Point, PTP)**:消息仅能被一个消费者接收。目的地称为队列(Queue)。 - 特点:每个消息只能有一个消费者;生产者和消费者之间不存在时间相关性。 - **发布/订阅(Publish/Subscribe, Pub/...