`

activemq消息队列-点对点通讯

    博客分类:
  • java
阅读更多

 

  点对点通讯:消息发送-Queue-消息接收,消息发送到队列,消息接收者阻塞式接收消息。

                       如果没有消息,接收消息方法receive()阻塞。

 

  package mq.p2p;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    //private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
   
    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

package mq.p2p;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String BROKEN_URL = "failover://tcp://192.168.191.128:61616";
   
    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                System.out.println("..");
                TextMessage msg = (TextMessage) consumer.receive();
                System.out.println("....");
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

package mq.p2p;

public class ProducterTest {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        ProducterTest testMq = new ProducterTest();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

package mq.p2p;

public class ConsumerTest {
    public static void main(String[] args){
        Consumer comsumer = new Consumer();
        comsumer.init();
        ConsumerTest testConsumer = new ConsumerTest();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Consumer comsumer;
        public ConsumerMq(Consumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

分享到:
评论

相关推荐

    apache-activemq-5.9.0-bin

    1. **消息队列**:ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。消息队列确保消息的可靠传输,即使在发送方和接收方之间发生故障时也能保持数据的完整性。 2. **JMS兼容性**:ActiveMQ完全...

    activemq-cpp-library-3.9.5-src.zip

    总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...

    apache-activemq-5.8.0-bin.zip

    - **队列(Queues)**:提供点对点的消息传递,消息仅被一个消费者消费。 - **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器...

    apache-activemq-5.13.2-bin.tar.gz

    ActiveMQ的核心功能是作为消息代理,它允许应用程序通过发布/订阅和点对点模式进行异步通信。这种通信方式提高了系统的可扩展性和可靠性,因为消息可以在生产者和消费者之间独立传输,即使它们在不同的时间运行或...

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅

    SpringBoot整合ActiveMQ消息队列和双向队列、点对点与发布订阅,可以参考我的博客文章进行学习https://blog.csdn.net/sujin_/article/details/82956386

    apache-activemq-5.3.0-bin.zip

    - **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...

    apache-activemq-5.15.3-bin

    它的核心功能包括发布/订阅和点对点的消息模式,以及事务处理和持久化机制。 **2. 版本5.15.3特性** - **稳定性与性能优化**:此版本在前一版本的基础上进行了大量优化,提高了系统的稳定性和处理消息的性能。 - **...

    apache-activemq-5.15.7-bin.tar.gz

    - **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...

    apache-activemq-5.3.1-bin.tar.gz

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...

    apache-activemq-5.9.1-bin.tar.gz

    此外,ActiveMQ支持多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。点对点模式下,消息只被一个消费者接收;而在发布/订阅模式下,消息可以被多个订阅者消费。这些模型适用于不同的应用场景。 消息传输的...

    Apache.NMS.ActiveMQ-1.7.0-bin

    ActiveMQ不仅支持点对点(P2P)和发布/订阅(Pub/Sub)两种消息模型,还支持多种协议,如OpenWire、STOMP、AMQP和MQTT等。 NMS是Apache开发的一个.NET接口,允许.NET开发者可以利用ActiveMQ的功能,包括创建消费者...

    activemq-all-5.15.2.jar 和 jms-1.1.jar

    "jms-1.1.jar"包含了JMS 1.1规范的实现,这是JMS的第二个主要版本,提供了发布/订阅和点对点两种消息传递模式。 4. **使用场景**: activemq-all-5.15.2.jar和jms-1.1.jar通常在以下场景中使用:大型分布式系统中的...

    apache-activemq-5.15.9-bin.zip

    1. **JMS支持**:ActiveMQ完全遵循JMS 1.1规范,提供了多种消息模式,如点对点、发布/订阅,以及事务和持久化机制。 2. **多协议支持**:ActiveMQ不仅支持OpenWire,还支持STOMP、AMQP、MQTT、WS和HTTP等多种消息...

    apache-activemq-5.7.0-bin.tar.gz

    1. **消息传递**:ActiveMQ支持点对点和发布/订阅两种消息模式,提供可靠的消息传输,确保消息的有序性和持久性。 2. **协议支持**:它不仅支持JMS,还通过多种协议如OpenWire、AMQP、STOMP、MQTT和WS-MQTT提供跨...

    activemq-cpp-library-3.6.0-src.tar.gz_C# ActiveMQ_activemq_activ

    4. **消息模式**:ActiveMQ支持多种消息模式,如点对点(Queue)、发布/订阅(Topic)、请求/响应(Request/Reply)等。这些模式适应了不同场景下的通信需求,如可靠的单次传递、广播或者分布式计算。 5. **高级...

    apache-activemq-5.15.1-bin.tar.gz

    ActiveMQ实现了JMS 1.1规范,提供了多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。 3. **多种协议支持**:ActiveMQ不仅支持JMS,还支持STOMP、AMQP、XMPP、OpenWire等多种消息传输协议,使其能与其他非...

    apache-activemq-5.15.2-bin.zip

    7. **消息类型**:ActiveMQ支持多种消息类型,包括点对点(Queue)和发布/订阅(Topic)模式。Queue用于一对一通信,而Topic用于一对多广播式通信。 8. **高可用性和集群**:ActiveMQ支持集群和故障转移,可以创建...

    activemq-ra-4.0-M3.jar.zip

    - **负载均衡**:消息队列可以平衡不同服务器间的负载,避免单点过载,提高系统整体性能。 - **事务支持**:在Java EE环境中,可以利用JTA事务管理,确保消息传递的原子性和一致性。 总的来说,`activemq-ra-4.0-M3...

    apache-activemq-5.14.5-bin.tar.gz

    - **消息模型**:ActiveMQ支持点对点(Queue)和发布/订阅(Topic)两种消息模型。 2. **ActiveMQ的功能特性**: - **高性能**:ActiveMQ采用高效的内存和磁盘存储策略,确保高吞吐量和低延迟。 - **持久化**:...

    apache-activemq-5.15.5-bin.tar.gz

    此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...

Global site tag (gtag.js) - Google Analytics