`

ActiveMQ启动与实现

    博客分类:
  • MQ
 
阅读更多
ActiveMQ服务器启动与java简单实现发送与接收

1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ
windows:
解压缩apache-activemq-5.14.2-bin.zip,然后双击apache-activemq-5.14.2\bin\win64\activemq.bat运行ActiveMQ程序。


centos:
下载activemq:
wget http://archive.apache.org/dist/activemq/5.13.5/apache-activemq-5.13.5-bin.tar.gz  

启动activemq:
[root@centos64 *] cd  /home/hailong/activemq/apache-activemq-5.13.5/bin  
[root@centos64 bin]./activemq start 



启动ActiveMQ以后,登陆:http://localhost:8161/admin/,用户名密码在apache-activemq-5.14.2\conf\jetty-realm.properties文件里。

什么情况下使用ActiveMQ?
多个项目之间集成
(1) 跨平台 pts->wms
(2) 多语言
(3) 多项目
降低系统间模块的耦合度,解耦
(1) 软件扩展性
系统前后端隔离
(1) 前后端隔离,屏蔽高安全区
举例:电商中,添加商品库存,商品下单后到OMS推送

3.在ActiveMQ服务器创建一个Queue


4.ActiviteMQ消息有3中形式



(1)、点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
(2)、发布/订阅 方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

当采用点对点模型时,消息将发送到一个队列,该队列的消息只能被一个消费者消费。而采用发布订阅模型时,消息可以被多个消费者消费,且时间上的依赖,一个接收端只能接收他创建以后客户端发送的信息。也就是说在发布订阅模型中,生产者和消费者完全独立,不需要感知对方的存在。

5.ActiviteMQ接收和发送消息基本流程


发送消息的基本步骤:
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)、使用消息生产者MessageSender发送消息

消息接收者从JMS接受消息的步骤
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

6.简单代码实现
发送方:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        /* Connection :JMS 客户端到JMS Provider 的连接 */
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("SecondQueue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化,此处学习,实际根据项目决定
            // PERSISTENT:保存到磁盘,consumer消费之后,message被删除。
            // NON_PERSISTENT:保存到内存,消费之后message被清除。
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
}


接收方:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.DeliveryMode;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里谁定为1s
                TextMessage message = (TextMessage) consumer.receive(10000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}


7.JMS的主要特性
1.面向Java平台的标准消息传递API
2.在Java或JVM语言比如Scala、Groovy中具有互用性
3.无需担心底层协议
4.有queues和topics两种消息传递模型
5.支持事务
6.能够定义消息格式(消息头、属性和内容)

JMS或者NMS都没有标准的底层协议。它们可以在任何底层协议上运行,但是API是与编程语言绑定的。AMQP(RabbitMQ是一个开源的AMQP实现)解决了这个问题,它使用了一套标准的底层协议,加入了许多其他特征来支持互用性,为现代应用丰富了消息传递需求。

  • 大小: 77.6 KB
  • 大小: 95.1 KB
  • 大小: 288.9 KB
  • 大小: 15.6 KB
  • 大小: 16.3 KB
分享到:
评论

相关推荐

    windows版activemq启动包

    总的来说,这个"windows版activemq启动包"提供了一站式的解决方案,让用户能在Windows环境中轻松部署和运行ActiveMQ 5.15.2,实现高效的消息传递和系统集成。只需简单的步骤,你就可以开始探索和利用ActiveMQ的强大...

    activemq自启动并设置用户名密码

    创建一个指向ActiveMQ启动脚本的软链接,并将其放置在`/etc/init.d/`目录下: ```bash ln -s /usr/local/apache-activemq/bin/activemq /etc/init.d/ chmod +x /etc/init.d/activemq chkconfig --add activemq ...

    Linux下activeMQ的启动和停止.docx

    在Linux环境下,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,能够处理大量的并发消息传递。ActiveMQ提供了高可用性、可扩展性和稳定性,使得它成为分布式...

    activeMq启动步骤

    如果你的项目已经集成了Spring框架,你可以将ActiveMQ与Spring结合,实现更便捷的消息管理。以下是一般步骤: 1. 添加ActiveMQ和Spring的依赖到你的项目中,如在Maven的pom.xml文件中添加相应的依赖项。 2. 配置...

    ActiveMQ实现

    关于代码实现,ActiveMQ提供了多种API和客户端库,如Java、C++、Python等,用于与消息代理进行交互。使用JMS API,你可以创建生产者发送消息,以及创建消费者接收消息。例如,创建一个简单的Java应用,你需要导入`...

    ActiveMQ学习笔记之四--启动嵌入式Broker(纯代码方式)

    在本篇ActiveMQ学习笔记中,...总之,启动ActiveMQ的嵌入式Broker是通过创建和配置`BrokerService`实例来实现的,这种方式适合于快速开发和测试环境。理解并掌握这一技巧,能帮助开发者更灵活地利用ActiveMQ的功能。

    ActiveMQ+zookeeper实现高可用和负载均衡(代码和测试)

    ### ActiveMQ+zookeeper实现高可用和负载均衡 #### 一、背景与目标 在现代分布式系统中,消息中间件如ActiveMQ扮演着重要的角色,它能够帮助应用之间进行可靠的消息传递。为了保证系统的稳定性和可靠性,通常需要...

    activemq负载均衡实现.doc

    在高可用性和高并发场景下,为了实现负载均衡和故障转移,ActiveMQ 提供了 Broker-Cluster(经纪人集群)部署模式。这种部署方式允许多个 ActiveMQ 经纪人实例通过网络互相连接,共享队列,从而实现消息的负载均衡。...

    ActiveMq+SpringMVC实现邮件异步发送

    在本项目中,ActiveMQ与SpringMVC框架结合,实现了邮件的异步发送功能。 首先,我们需要理解ActiveMQ的基本概念。ActiveMQ是Apache软件基金会的产品,遵循JMS(Java消息服务)规范,支持多种协议,并且可以跨平台...

    ActiveMQ5.13 安装与配置

    "ActiveMQ5.13 安装与配置" ActiveMQ 是 Apache 软件基金会提供的一个开源message broker,能够实现点对点(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的消息传递。ActiveMQ 5.13 是 ActiveMQ 的一个...

    springboot集成activemq实现消息接收demo

    在IT行业中,Spring Boot是一个非常流行的微服务框架,它简化了Spring应用的开发过程。而ActiveMQ是Apache出品的一款...在实际生产环境中,ActiveMQ还可以与其他中间件、微服务架构结合,提供更强大的消息处理能力。

    ActiveMQ的安装与使用

    接下来我们将详细阐述ActiveMQ的安装与使用过程,包括安装前提条件、安装步骤、配置和启动以及安全配置等知识点。 首先,安装ActiveMQ之前需要有一个Java运行环境。这是因为ActiveMQ是使用Java开发的。JDK(Java ...

    采用Spring整合activeMQ与quartz的JMS数据同步实例

    在这个实例中,我们将探讨如何利用Spring框架整合ActiveMQ(一个流行的开源消息代理)和Quartz(一个广泛使用的作业调度库)来实现JMS(Java消息服务)数据同步。这个方案尤其适用于大型分布式系统,它能够确保即使...

    activeMq的简易实现和应用

    然后,在bin目录下运行相应的启动脚本(对于Windows是start ActiveMQ.cmd,对于Linux是bin/activemq start)。默认情况下,ActiveMQ会监听61616端口进行TCP连接,且管理界面可在8161端口的...

    ActiveMQ使用Ajax实现多人聊天室

    实现ActiveMQ与Ajax结合的多人聊天室步骤如下: 1. **环境准备**:首先,需要在服务器上安装和配置ActiveMQ。下载ActiveMQ的最新版本,解压并启动服务。确保ActiveMQ的Web控制台可用,这样可以通过HTTP接口与之交互...

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    CentOS安装Activemq图文教程

    本文将详细介绍如何在CentOS系统中安装和配置Activemq,并实现开机启动的设置。 一、下载和安装Activemq 首先,我们需要从Apache官方网站下载Activemq的安装包。在浏览器中访问...

    ActiveMQ实现的消息收发案例

    本案例将探讨如何使用ActiveMQ实现简单消息的发送与接收。 首先,我们要理解JMS的基本概念。JMS是Java平台上的标准接口,用于与消息中间件进行交互。它定义了生产、存储、消费和检索消息的标准API。在JMS中,有两种...

    activemq实现信息分发实例

    - **启动ActiveMQ**:首先需要在本地安装并启动ActiveMQ服务器,可以通过bin目录下的start.bat或start.sh脚本进行启动。 - **配置连接**:在项目中配置ActiveMQ的连接参数,包括URL、用户名、密码等,确保应用程序...

    Spring+ActiveMQ实现,基于Maven

    在企业级应用开发中,Spring框架与ActiveMQ的结合使用是一种常见的消息中间件解决方案,用于实现应用程序间的异步通信和解耦。本项目基于Maven构建,提供了对Topic的实现,同时也支持轻松切换到Queue模式。 **...

Global site tag (gtag.js) - Google Analytics