`

消息中间件:ActiveMQ

    博客分类:
  • Java
 
阅读更多

ActiveMQ(Message Queue) 来自apache, 开源的消息总线.

完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,非常快速

官方网站:http://activemq.apache.org/ 

 

一, 特性

  • 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  • 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  • 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  • 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  • 支持通过JDBC和journal提供高速的消息持久化
  • 从设计上保证了高性能的集群,客户端-服务器,点对点
  • 支持Ajax
  • 支持与Axis的整合
  • 可以很容易得调用内嵌JMS provider,进行测试

二, 什么情况使用

  • 多项目之间集成 
  • 降低系统间模块的耦合度,解耦 
  • 系统前后端隔离

三, 启动

   windows: 双击bin目录下activemq.bat脚本

   linux: ./activemq start    ./activemq stop

 

四, 测试

      ActiveMQ默认连接端口是61616,

     可通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find "61616"

 

五, 监控

   ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 

 

   admin:http://127.0.0.1:8161/admin/             user/pwd: admin/admin

 

六, 通信方式

  1, publish-subscribe:  发布订阅模式,一对多的关系。

   2, P2P

       在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。

       和ub-sub的区别在于一个topic有一个发送者和多个接收者,

         而在p2p里一个queue只有一个发送者和一个接收者。

   3, request-response

      需要双方都能给对方发送消息

  备注: 参考: http://shmilyaw-hotmail-com.iteye.com/blog/1897635

                    https://www.cnblogs.com/zhuxiaojie/p/5564187.html

 

七, 安全配置

activemq/conf/jetty.xml:

   <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="admin" />
         <!-- 把这个改为true,当然,高版本的已经改为了true -->
        <property name="authenticate" value="true" />
  </bean>

 找到activemq/conf/activemq.xml,并打开

<plugins>
             <simpleAuthenticationPlugin>
                 <users>
                     <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
                 </users>
             </simpleAuthenticationPlugin>
</plugins>

 然后账号密码的配置在activemq/conf/credentials.properties文件中

#账号
activemq.username=admin
#密码
activemq.password=123456
guest.password=password

 

八, 示例

流程:

     1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

     2. 利用factory构造JMS connection

     3. 启动connection

     4. 通过connection创建JMS session.

     5. 指定JMS destination.

     6. 创建JMS producer或者创建JMS message并提供destination.

     7. 创建JMS consumer或注册JMS message listener.

     8. 发送和接收JMS message.

     9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

 

public class Producter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

public class Comsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

分享到:
评论

相关推荐

    实验三 消息中间件应用开发:ActiveMQ实现单线程多队列

    【标题】:“实验三 消息中间件应用开发:ActiveMQ实现单线程多队列” 在IT领域,消息中间件是一种重要的软件架构组件,它主要用于应用程序之间的异步通信,提高系统的可扩展性和解耦性。本实验主要关注的是如何...

    消息中间件-activeMQ.zip

    ActiveMQ是中国最流行的消息中间件之一,它是一个开源的、基于Java消息服务(JMS)规范的实现,被广泛应用于分布式系统中,用于处理异步通信、解耦系统组件以及提高系统的可扩展性。在本压缩包文件"activeMQ-master...

    分布式通讯面试专题系列:ActiveMQ+RabbitMQ+Kafka.rar

    本面试专题系列涵盖了三个关键的分布式消息中间件:ActiveMQ、RabbitMQ和Kafka,这些都是Java开发者和技术面试者需要深入了解的重要知识点。下面我们将逐一探讨这三个组件的核心概念、使用场景以及在实际开发中的...

    消息中间件-ActiveMQ.zip

    1. **消息(Message)**:消息是数据传输的基本单元,包含了要传递的信息。在 ActiveMQ 中,消息可以通过不同的消息类型,如文本、二进制或对象消息进行传输。 2. **生产者(Producer)**:生产者是创建并发送消息...

    分布式通讯面试专题系列:ActiveMQ+RabbitMQ+Kafka.zip

    本专题将深入探讨三个流行的分布式消息中间件:ActiveMQ、RabbitMQ和Kafka,它们都是实现异步处理、解耦系统以及提供高可用性和可扩展性的关键工具。 首先,ActiveMQ是Apache软件基金会开发的一款开源的消息代理,...

    java中间件之activemq

    1. **异步处理**:消息中间件使得应用可以将耗时操作异步化,提高系统的响应速度。 2. **系统解耦**:通过消息队列,各个组件可以独立工作,降低组件间的耦合度,提高系统的可维护性和扩展性。 3. **流量削峰**:在...

    消息中间件应用开发: ActiveMQ实现单 线程多队列-Java代码类资源

    本资源主要关注如何使用ActiveMQ这一流行的消息中间件来实现单线程多队列的场景,特别适合Java开发者进行应用开发。 ActiveMQ是由Apache软件基金会开发的一款开源消息中间件,它支持多种消息协议,如AMQP、STOMP、...

    2019品优购.txt

    前端:angularJS + Bootstrap 后台:SSM( springmvc+spring+mybatis) 数据库:mysql,使用mycat读写分离 开发模式:SOA 服务中间件:dubbox,需要和zookeeper配合使用 注册中心:zookeeper 消息中间件:Activemq,...

    黑马品优购电商项目全套资源

    消息中间件:Activemq,使用弹簧JMS 负载均衡:nginx的的 搜索:Solr中的集群(solrCloud),配合动物园管理员搭建,使用弹簧-数据-索洛 缓存:Redis的的集群,使用弹簧数据redis的的 图片存储:fastDFS集群 | |...

    黑马49期全系列包括品优购

    zookeeper 消息中间件:Activemq,使用spring-jms 负载均衡:nginx 搜索:solr集群(solrCloud),配合zookeeper搭建, 使用spring-data-solor 缓存:redis集群,使用spring-data-redis 图片存储:fastDFS集群 网页...

    消息中间件之ActiveMQ视频课程

    当前使用较多的消息中间件有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等。本套视频以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,带你从零基础入门到熟练掌握ActiveMQ,能够结合...

    38 4 ActiveMQ消息中间件视频教程

    教程视频:Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件

    activeMQ一个demo

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open Message Broker,JMS)标准,提供高效、可靠的消息传递服务。在本文中,我们将深入探讨ActiveMQ的基本概念、工作原理以及如何...

    消息中间件之ActiveMQ.mmap

    消息中间件是一门极为重要的技术。在项目开发中使用到的频率很高。该技术能够很好的解决我们项目之间数据传输的问题。相比于dubbo等RPC远程调用技术,消息中间件更灵活,能够解耦项目之间的依赖,能够消峰、异步等。...

    ActiveMQ消息中间件.zip

    1. **消息中间件**:消息中间件是软件系统中用于在不同组件之间传输数据的中介,它解耦了发送者和接收者,使得系统更加模块化,提高系统的可扩展性和容错性。ActiveMQ就是这样的一个中间件,支持多种协议,如...

    消息中间件activemq项目demo

    5. **消息中间件的作用**:消息中间件在分布式系统中起到消息传递的作用,可以缓存和路由消息,确保高可用性和容错性,同时解耦发送方和接收方,降低系统间的依赖。 6. **消息生产者和消费者**:在消息传递模型中,...

    java消息中间件教程-activemq

    - **1-2 为什么使用消息中间件**:消息中间件作为一种关键的技术组件,在现代软件架构中扮演着重要的角色。它主要用于解决应用程序之间异步通信的问题,提高了系统的解耦合度、可扩展性和容错能力。 #### 二、消息...

    ActiveMQ消息服务器 v5.18.3.zip

    Apache ActiveMQ是开源的、基于Java的消息传递平台,它作为企业级消息中间件(Message Broker)在IT行业中扮演着关键角色。ActiveMQ提供了高度可靠的消息传递服务,支持多种消息协议,如JMS(Java Message Service)...

Global site tag (gtag.js) - Google Analytics