点对点通讯:消息发送-Queue-消息接收,消息发送到队列,消息接收者阻塞式接收消息。
如果没有消息,接收消息方法receive()阻塞。
package mq.p2p;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producter {
//ActiveMq 的默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默认登录密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的链接地址
//private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
AtomicInteger count = new AtomicInteger(0);
//链接工厂
ConnectionFactory connectionFactory;
//链接对象
Connection connection;
//事务管理
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init(){
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//从工厂中创建一个链接
connection = connectionFactory.createConnection();
//开启链接
connection.start();
//创建一个事务(这里通过参数可以设置事务的级别)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void sendMessage(String disname){
try {
//创建一个消息队列
Queue queue = session.createQueue(disname);
//消息生产者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//创建一条消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:我是大帅哥,我现在正在生产东西!,count:"+num);
//发送消息
messageProducer.send(msg);
//提交事务
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package mq.p2p;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
System.out.println("..");
TextMessage msg = (TextMessage) consumer.receive();
System.out.println("....");
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package mq.p2p;
public class ProducterTest {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
ProducterTest testMq = new ProducterTest();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}
private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}
@Override
public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
package mq.p2p;
public class ConsumerTest {
public static void main(String[] args){
Consumer comsumer = new Consumer();
comsumer.init();
ConsumerTest testConsumer = new ConsumerTest();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}
private class ConsumerMq implements Runnable{
Consumer comsumer;
public ConsumerMq(Consumer comsumer){
this.comsumer = comsumer;
}
@Override
public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
相关推荐
1. **消息队列**:ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。消息队列确保消息的可靠传输,即使在发送方和接收方之间发生故障时也能保持数据的完整性。 2. **JMS兼容性**:ActiveMQ完全...
总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...
- **队列(Queues)**:提供点对点的消息传递,消息仅被一个消费者消费。 - **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器...
ActiveMQ的核心功能是作为消息代理,它允许应用程序通过发布/订阅和点对点模式进行异步通信。这种通信方式提高了系统的可扩展性和可靠性,因为消息可以在生产者和消费者之间独立传输,即使它们在不同的时间运行或...
SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386
- **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...
它的核心功能包括发布/订阅和点对点的消息模式,以及事务处理和持久化机制。 **2. 版本5.15.3特性** - **稳定性与性能优化**:此版本在前一版本的基础上进行了大量优化,提高了系统的稳定性和处理消息的性能。 - **...
- **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...
ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...
此外,ActiveMQ支持多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。点对点模式下,消息只被一个消费者接收;而在发布/订阅模式下,消息可以被多个订阅者消费。这些模型适用于不同的应用场景。 消息传输的...
ActiveMQ不仅支持点对点(P2P)和发布/订阅(Pub/Sub)两种消息模型,还支持多种协议,如OpenWire、STOMP、AMQP和MQTT等。 NMS是Apache开发的一个.NET接口,允许.NET开发者可以利用ActiveMQ的功能,包括创建消费者...
"jms-1.1.jar"包含了JMS 1.1规范的实现,这是JMS的第二个主要版本,提供了发布/订阅和点对点两种消息传递模式。 4. **使用场景**: activemq-all-5.15.2.jar和jms-1.1.jar通常在以下场景中使用:大型分布式系统中的...
1. **JMS支持**:ActiveMQ完全遵循JMS 1.1规范,提供了多种消息模式,如点对点、发布/订阅,以及事务和持久化机制。 2. **多协议支持**:ActiveMQ不仅支持OpenWire,还支持STOMP、AMQP、MQTT、WS和HTTP等多种消息...
1. **消息传递**:ActiveMQ支持点对点和发布/订阅两种消息模式,提供可靠的消息传输,确保消息的有序性和持久性。 2. **协议支持**:它不仅支持JMS,还通过多种协议如OpenWire、AMQP、STOMP、MQTT和WS-MQTT提供跨...
4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...
ActiveMQ实现了JMS 1.1规范,提供了多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。 3. **多种协议支持**:ActiveMQ不仅支持JMS,还支持STOMP、AMQP、XMPP、OpenWire等多种消息传输协议,使其能与其他非...
7. **消息类型**:ActiveMQ支持多种消息类型,包括点对点(Queue)和发布/订阅(Topic)模式。Queue用于一对一通信,而Topic用于一对多广播式通信。 8. **高可用性和集群**:ActiveMQ支持集群和故障转移,可以创建...
- **负载均衡**:消息队列可以平衡不同服务器间的负载,避免单点过载,提高系统整体性能。 - **事务支持**:在Java EE环境中,可以利用JTA事务管理,确保消息传递的原子性和一致性。 总的来说,`activemq-ra-4.0-M3...
- **消息模型**:ActiveMQ支持点对点(Queue)和发布/订阅(Topic)两种消息模型。 2. **ActiveMQ的功能特性**: - **高性能**:ActiveMQ采用高效的内存和磁盘存储策略,确保高吞吐量和低延迟。 - **持久化**:...
此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...