`

ActiveMQ实战1:ActiveMQ Java

阅读更多

1.准备工作

  1) 下载安装,启动activemq

  2) 下载activemq   jar包导入项目

2.消息生产者

package com.activemq.demo1;

import javax.jms.*;
import org.apache.activemq.*;
/**
 * 消息生产者,用于生成并发送消息
 */
public class ProducerTool {
	
    private String user = ActiveMQConnection.DEFAULT_USER;    
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;    
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;    
    private String subject = "TOOL.DEFAULT";    
    private Destination destination = null;    
    private Connection connection = null;    
    private Session session = null;    
    private MessageProducer producer = null;    
   
    /**
     * 初始化    
     * @throws Exception
     */
    private void initialize() throws Exception {    
        ActiveMQConnectionFactory connectionFactory = 
        		new ActiveMQConnectionFactory(user, password, url);    
        connection = connectionFactory.createConnection();
        /* 创建Session,参数解释:            
                            第一个参数     是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,
        	没有回应则抛出异常,消息发送程序负责处理这个错误。            
                            第二个参数      消息的确认模式:            
              AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,
        		但传输过程中可能因为错误而丢失消息。           
         	  CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法
         	  	(会通知消息提供者收到了消息)            
              DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息
              	(这种确认模式不在乎接收者收到重复的消息)。*/
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
        destination = session.createQueue(subject);    
        producer = session.createProducer(destination);  
        //设置是否持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);    
    }    
   
    /**
     * 发送消息    
     * @param message
     * @throws Exception
     */
    public void produceMessage(String message) throws Exception {    
        initialize(); 
        //发送TextMessage,还可发送MapMessage,ObjectMessage,StreamMessage
        TextMessage msg = session.createTextMessage(message);
        connection.start(); 
        System.out.println("Producer:-> send start.");
        producer.send(msg); 
        System.out.println("Producer:-> send complete.");
        close();
    }    
   
    /**
     * 关闭连接    
     * @throws JMSException
     */
    public void close() throws JMSException { 
    	System.out.println("Producer:->Closing Connection.");
        if (producer != null)    
            producer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
}    

3.消息消费者

package com.activemq.demo1;

import javax.jms.*;
import javax.jms.Message;

import org.apache.activemq.*;
/**
 * 消息消费者,用于接收消息
 */
public class ConsumerTool implements MessageListener {  
	
    private String user = ActiveMQConnection.DEFAULT_USER;    
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;    
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;    
    private String subject = "TOOL.DEFAULT";    
    private Destination destination = null;    
    private Connection connection = null;    
    private Session session = null;    
    private MessageConsumer consumer = null;    
   
    /**
     * 初始化    
     * @throws JMSException
     * @throws Exception
     */
    private void initialize() throws Exception {    
        ActiveMQConnectionFactory connectionFactory = 
        		new ActiveMQConnectionFactory(user, password, url);    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
    }    
   
    /**
     * 消费消息    
     * @throws Exception
     */
    public void consumeMessage() throws Exception {    
        initialize();    
        connection.start();    
        System.out.println("Consumer:->Begin listening...");    
        // 开始监听    
        consumer.setMessageListener(this); 
    }    
   
    /**
     * 关闭连接    
     * @throws JMSException
     */
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    /**
     *  消息处理函数    
     */
    public void onMessage(Message message) { 
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received textMessage: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            } 
            close();
        } catch (JMSException e) {    
            e.printStackTrace();    
        }    
    }

}  

4.测试类

package com.activemq.demo1;

import javax.jms.*;

public class Test {    
   
    /**   
     * @param args   
     */   
    public static void main(String[] args) throws JMSException, Exception {    
        ConsumerTool consumer = new ConsumerTool();    
        ProducerTool producer = new ProducerTool();    
        // 开始监听    
        consumer.consumeMessage();    
            
        // 延时500毫秒之后发送消息    
        Thread.sleep(500);    
        producer.produceMessage("Hello, world!");    
            
    }    
}    

 

分享到:
评论

相关推荐

    activeMq 实战

    ### ActiveMQ 实战 #### JMS 基本构件概览 **ActiveMQ** 是一个高性能、功能丰富的开源消息中间件,它实现了 **Java Message Service (JMS)** 规范。JMS 规范定义了一组接口,这些接口提供了一个标准的方式来进行...

    ActiveMQ实战(英文版)

    **ActiveMQ实战(英文版)** ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循Java消息服务(JMS)规范,支持多种协议,并且跨平台运行。这本书《ActiveMQ实战(英文版)》旨在深入探讨如何在实际环境中...

    activemq实战

    #### 三、ActiveMQ实战案例介绍 - **案例概述**:本书通过一系列实例来展示如何使用ActiveMQ构建各种应用场景下的消息传递系统。 - **目的**:通过实际案例帮助读者深入了解ActiveMQ的核心概念和技术细节,并学会...

    ActiveMQ实战资料

    **ActiveMQ实战资料详解** Apache ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它是Java消息服务(Java Message Service,简称JMS)的一个实现。在分布式系统中,ActiveMQ扮演着至关重要的角色,它允许...

    ActiveMQ 实战

    ActiveMQ实战手册以介绍JMS和ActiveMQ的操作及配置为主,JMS(Java Message Service)是Java平台中对于面向消息中间件(MOM)的一种标准的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。...

    activeMQ实战demo

    在这个“ActiveMQ实战demo”中,我们将深入探讨如何使用ActiveMQ进行消息发送和接收,并了解其工作原理。 首先,让我们了解一下JMS。JMS是Java平台上的一个标准接口,定义了生产、消费、管理和消息队列的标准API。...

    ACTIVEMQ实战 部分翻译(1——4.3)

    ActiveMQ是一种广泛使用的企业级消息中间件,它遵循JMS(Java Message Service)规范,为分布式系统中的不同应用提供可靠的消息传递服务。在本篇内容中,我们会深入探讨ActiveMQ的特性、安装与运行,以及它如何与...

    ActiveMQ-Java P2P模式MQ实战

    我下载的时候是 ActiveMQ 5.14.4 Release版 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是...

    ActiveMQ实战——实现一个简易版的聊天室

    在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...

    activemq-samples:Apache ActiveMQ 示例

    1. **Apache ActiveMQ 简介**: Apache ActiveMQ 是一个功能强大的、高性能的、多语言支持的消息代理。它遵循 JMS 规范,提供多种协议支持,如 OpenWire、STOMP、AMQP 和 MQTT,可以用于分布式系统中的异步通信。...

    spring-activeMQ-demo:spring-activeMQ-演示

    《Spring与ActiveMQ整合实战详解》 在Java开发领域,消息队列(Message Queue)作为解耦、异步处理和提高系统吞吐量的重要工具,被广泛应用。Spring框架以其强大的集成能力,使得与各种消息中间件如ActiveMQ的整合...

    activemq实战项目,同ssh框架整合(生产者+消费者)

    通过这个实战项目,开发者可以深入理解如何在Java Web环境中使用ActiveMQ,以及如何解决在实际应用中可能出现的问题,如消息的可靠传输和幂等问题。同时,它也提供了一个良好的学习平台,帮助初学者掌握SSH与...

    ActiveMQ 入门实战(3)--SpringBoot 整合 ActiveMQ(csdn)————程序.pdf

    - **ActiveMQ**:Apache ActiveMQ 是一个开源的消息中间件,它实现了 Java Message Service (JMS) 规范,提供可靠的消息传递和队列管理。 - **JMS**:Java Message Service 是一个标准接口,用于在分布式环境中交换...

    2019实战ActiveMQ集群与应用实战视频教程

    本视频教程通过实战的方式介绍了 ActiveMQ 的集群搭建与应用,涵盖了从基础概念到实际部署的全过程。通过学习这些知识点,不仅可以帮助开发者深入了解 ActiveMQ 的工作原理,还能够掌握如何在实际项目中有效地利用 ...

    spring boot ActiveMQ学习练习demo项目源码

    Spring Boot ActiveMQ学习练习demo项目源码是一个针对Java开发者的学习资源,它涵盖了使用Spring Boot集成ActiveMQ进行消息队列操作的基本实践。ActiveMQ是Apache软件基金会的一个开源项目,它是Java消息服务(JMS)...

    使用ActiveMQ示例.pdf

    ActiveMQ 是 Apache 开源项目提供的一款高效、可靠的开源消息中间件,它实现了 Java 消息服务(Java Message Service,简称 JMS),为分布式系统中的应用提供了异步通信的能力。JMS 是一个工业标准,它定义了一组...

    ActiveMQDemo实战.pdf

    在ActiveMQ中,生产者和消费者都是通过JMS API(Java Message Service Application Programming Interface)来与消息代理进行交互的。 生产者核心代码逻辑部分,主要介绍了如何使用JMS API来构建消息生产者,并通过...

    ActiveMQ 使用Ajax 收发消息实战

    标题中的"ActiveMQ 使用Ajax 收发消息实战"指出我们将探讨如何使用ActiveMQ消息中间件与Ajax技术结合,实现Web应用程序中的异步消息传递。ActiveMQ是Apache软件基金会的一个项目,提供了一个开源的消息代理,支持...

    消息队列activemq.zip

    四、ActiveMQ实战应用 在实际应用中,ActiveMQ常用于以下场景: - 微服务间的异步通信:微服务架构中,通过消息队列可避免直接调用导致的阻塞,提高系统响应速度。 - 数据同步:在分布式数据库或主从复制环境中,...

Global site tag (gtag.js) - Google Analytics