`
duan_cloud
  • 浏览: 8818 次
  • 性别: Icon_minigender_2
  • 来自: 自己输入城市...
最近访客 更多访客>>
社区版块
存档分类
最新评论
阅读更多
1 Jms是什么
Java message service:是用于创建、发送、读取消息的api,它完全尊从j2ee规范
The JavaMessage Service is a Java API that allows applications to create, send, receive, and read messages
Jsm使用于什么情况:
1 不依赖于其它组件,能被随时替换。
2 程序运行不依赖于其它组件,当其它组件没有运行时,程序依然能够正常运行
3 异步
Jms体系结构
jms 应用组成:jms provider是实现jms api的消息系统,它提供管理和控制功能。
               Jms client是生产和消费消息的java程序
               Message是在客户端传递的对象。
              受管对象(administered object)destinations and connection factories,


2 jms的两种模式
2.1 Point-to-Point Messaging Domain

特点:每个消息只有一个消费者
      生产者和消费者没有时间依赖关系(不管生产者是否在线,都不影响消费者消费信息)
      消费者确认成功处理消息

2.2 Publish/Subscribe Messaging Domain

特点:一个消息可以有多个消费者
      发布者和订阅者有时间依赖关系(订阅者只能消费订阅过的消息,它必须持续在线,durable subscription,允许订阅者离线)

3 Message Consumption

Synchronously:receive(同步获得消息),消费者不停的调用receive()方法获得消息
Asynchronously:messageListerner(异步获得消息)由监听器监听消息到来。



4 可靠性机制(获得或影响可靠性)
1.控制消息确认方式,通过设置消息确认,生产者可以知道消费者是否消费了消息。
2.设置消息持久化存储,可将消息存储在文件或数据库中确保断电后消息不丢失。
3.设置消息优先级,会影响消息的传输次序,优先级高的先传输。
4.设置消息生存时间,允许消息过期,过期的消息不传输,比如新闻过期后再获得没什么处了,就可以设置该属性。
5.创建临时目的地:临时目的地存在的时间是连接所保持的时间。
■ Controlling message acknowledgment: You can specify various levels of control over
message acknowledgment.
■ Specifying message persistence: You can specify that messages are persistent, meaning that they must not be lost in the event of a provider failure.
■ Setting message priority levels: You can set various priority levels for messages, which can affect the order in which the messages are delivered.
■ Allowing messages to expire: You can specify an expiration time for messages so that they will not be delivered if they are obsolete.
■ Creating temporary destinations: You can create temporary destinations that last only for the duration of the connection in which they are created.


4.1消息确认机制
步骤:1接收消息,2消费消息,3确认
消息能被jms provider或client确认,被哪种确认需要看session设置的
Session acknowledgment 方式:
Session.AUTO_ACKNOWLEDGE,session自动确认,不管客户端是否成功从receiver或onmessage方法中返回。
注意:在自动确认方式中,同步调用receiver方法,对消息确认步骤是个例外。在确认动作发生在第一步后,然后才是第二步。
Session.CLIENT_ACKNOWLEDGE session级,客户端确认
Session.DUPS_OK_ACKNOWLEDGE 批量确认

4.2 持久化存储

Jms支持两种传输方式来指定如果jms provider失败后消息是否丢弃,默认是持久化方式(PERSISTENT e),设置方式有两种producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 10000);最后两个参数的含义优先级、超期时间
4.3 优先级
优先级设置:优先级从0到9,默认是4
4.4到期时间
默认消息永远不超期。producer.setTimeToLive(60000); producer.send(message, DeliveryMode.NON_PERSISTENT, 3, 10000);
4.5临时目标地址:
只能消费同一个connection创建的消息(The only message consumers that can consume from a temporary destination are those createdby the same connection that created the destination)
高级可靠性机制
        创建持久化订阅:删除持久订阅先关闭订阅,topicSubscriber.close();
session.unsubscribe("MySub");
5 项目引入activemq
引入activemq-all-5.4.0.jar即可或引入

如果与第三方整合需要引入第三方程序依赖jar包。
6 Activemq

Activemq:jms provider
下面根据短信平台对activemq的需求介绍activemq的配置和使用
activemq 配置
1. 下载activemq  http://activemq.apache.org/download.html
     目录结构
    
2. 配置transport connectors (client、broker进行通讯时配置,用于接收和监听来自客户端的连接)
通过默认配置文件配置:在Conf/activemq.xml中配置如下

其它配置途径::

注意:
Broker名字最好唯一,确定后不要更改,便于查询log
删除transportconnectors的discoveryuri属性配置,为了防止发现其它的broker,引起一些莫名奇妙的问题
其它:关于uri,http://activemq.apache.org/broker-uri.html

完成以上配置后activemq可以正常启动,编写客户端程序后可以正常收发信息。
3. 持久化配置
A.
四种消息存储方式如下:
Activemq默认消息存储 Amq消息存储
KahaDB消息存储
Jdbc消息存储
基于内存的消息存储

配置以amq方式为例:
  Directory为文件存储路径,maxfilelength设置的数据文件的最大值,如果大于这个数目就新建一个数据文件。

可配置的其它属性

注意:在配置activemq时请注意networkconnectors、transportconnectors、persistence config的顺序
    
实例
客户端程序编写实例
生产者:
public class PublisherMsg {
//    private static String user = ActiveMQConnection.DEFAULT_USER;

    //    private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

    public static void publishMessage() {
        String url = "tcp://localhost:61616";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
//创建连接工厂
        Connection connection = null;
        Session session = null;
        try {
            connection = factory.createConnection();//创建连接
            connection.start();//必不可少
            session = connection.createSession(false, Session. CLIENT_ACKNOWLEDGE);
//使用非transactions方式,消息确认采用客户端确认
            Destination queue = session.createQueue("topictest.messages");
//创建队列topictest.messages
            Destination queue2 = session.createQueue("topictest2.messages");
            MessageProducer publisher = session.createProducer(queue);
//创建生产者
            MessageProducer publisher2 = session.createProducer(queue2);
            publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
//设置传输方式 持久化传输(已在配置文件中配置的初九化方式)
            for (int i = 0; i < 200; i++) {
                TextMessage message = session.createTextMessage(i + "ceshi xiaox");
//创建消息
                TextMessage message2 = session.createTextMessage(i + "ceshi xiaox2");
                System.out.println("发送");
                publisher.send(message);
//发送
                publisher2.send(message2);
            }
        } catch (JMSException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        } finally {
            try {
                connection.close();
                session.close();
            } catch (Throwable ignore) {
            }
        }

    }

    public static void main(String[] args) {
        PublisherMsg.publishMessage();
    }
}

消费者:
public class Consumer {
    public static void consumerMessage() {
        String url = "tcp://localhost:61616";
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//设置失败重转规则
        RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(2);//重传次数
        redeliveryPolicy.setInitialRedeliveryDelay(2);//重发间隔
        redeliveryPolicy.setUseExponentialBackOff(true);//增加每次重发后的间隔时间
        redeliveryPolicy.setBackOffMultiplier((short) 1);//增加指数
//
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("*.messages");//使用了activemq匹配符,意思是该消费者可以消费所有以messages结尾的队列的信息
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        System.out.println("............" + ((TextMessage) message).getText());
                        //消息处理 处理成功 发确认消息,不成功则捕获异常做回滚处理
                        message.acknowledge();//确认
                    } catch (JMSException e) {
                        try {
                            System.out.println("出错回复");
                            session.recover();//恢复
                        } catch (JMSException e1) {
                        }
                    } catch (RuntimeException e2) {
                        try {
                            System.out.println("出错回复2");
                            session.recover();
                        } catch (JMSException e1) {
                        }
                    }

                }
            });
        } catch (JMSException e) {
            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
        }
    }

    public static void main(String[] args) {
        Consumer.consumerMessage();
    }
}
7 Jms  Message
   Jms消息由三部分组成:消息头部、属性、消息体
消息头部属性通过客户端调用send()方法自动set值。
Jms定义了6种消息体:
Message:基本消息类型,没有消息体,只有消息头和属性,被用于事件通知
TextMessage:消息体为字符串的消息。用于发送简单的文本和xml数据
MapMessage: 消息体为一堆键值对,名称是字符串,值为java基本数据类型
ByteMessage:消息体为一堆字节数组。
StreamMessage:消息体为java基本类型数据,这些数据通过标准流按顺序读取
8 其它特性
1. 能与第三方框架整合例如:spring、ajax、webservice
2. 客户端支持多种语言c++、.net等
3. 经过配置networkconnector broker之间可以进行通信
高级特性
1. 支持地址(destination)层级机构,例如A.B.C ,目标地址可使用通配符
.用于分割地址名称的每个元素。
*匹配一个元素
>匹配一个或所有的尾部元素

2. 咨询消息:系统消息,由activemq broker生成用于通知系统的变化,例如一个新的受管对象连接或离开broker
3. 虚拟会话:如果希望一堆消费者能从队列获得消息消费,可以使用虚拟会话。


Virtual topic are a conventient way of combining queue semantics with a topic ,and have persistent messages consumed by a pool of consumers,each taking one message at time from the topic


4. 消息重传和死信队列

分享到:
评论

相关推荐

    Apache Active MQ 简单的示例

    **Apache Active MQ 简介** Apache Active MQ 是一个开源的消息中间件,它遵循Java消息服务(JMS)规范,提供了可靠的消息传递功能。作为Apache软件基金会的一部分,Active MQ 支持多种协议,如OpenWire、AMQP、...

    Active MQ in action 教程

    《Active MQ in action 教程》是一本深入探讨Apache ActiveMQ消息中间件的实战指南,由Bruce Snyder、Dejan Bosanac与Rob Davies三位作者共同撰写。本书旨在为读者提供全面且深入的理解ActiveMQ及其在实际项目中的...

    Active MQ C++实现通讯 X86 librariy

    Active MQ C++实现通讯 X86 librariy:CMS (stands for C++ Messaging Service)类似于JMS API用于同Message Brokers通讯(例如Active MQ)。 APR(Apache portable Run-time libraries,Apache可移植运行库)的目的如...

    Active MQ in Action

    《Active MQ in Action》是关于Apache ActiveMQ的权威指南,由Manning出版社出版。这本书深入浅出地介绍了ActiveMQ这一开源消息代理系统,是理解、配置、管理和优化ActiveMQ的关键资源。ActiveMQ是Apache软件基金会...

    active MQ 通信程序全套代码

    在“Active MQ 通信程序全套代码”中,你可能会找到一系列用于实现不同场景下消息通信的示例代码,包括Windows和Linux平台的实现。 首先,ActiveMQ的核心功能是作为一个消息代理,它接收和转发消息,允许应用程序...

    Jfinal -active mq

    Jfinal -active mq: 消费:extends JFinalQueueConsumer 生产:JFinalQueue.sendMessage(queueName, message);

    Active mq jdbc持久化所需要的包.rar

    针对"Active mq jdbc持久化所需要的包.rar"这个压缩文件,我们可以推断它包含了实现ActiveMQ使用JDBC持久化的相关依赖库。通常,这些库可能包括: 1. ActiveMQ的主库:包含了ActiveMQ的核心功能,如消息队列管理、...

    集成Websphere Application Server 和Active MQ

    标题中的“集成Websphere Application Server 和Active MQ”意味着我们将探讨如何将IBM的Websphere Application Server(WAS)与Apache ActiveMQ结合使用,以实现企业级的消息传递和队列管理功能。Websphere ...

    active mq思维导图

    Active MQ 基础知识思维导图。主要是JMS总结介绍。用于学习和复习

    camel-wmq-amq:Camel IBM Websphere MQ 到 Active MQ 桥接路由

    Camel IBM Websphere MQ 到 Active MQ 桥接路由 先决条件 IBM 为安装在 Fuse 上的 MQ 客户端提供了 OSGi jar 文件IBM_MQ_INSTALL_DIR/java/lib/OSGi 运行 AMQ 代理 带有填充属性的 JBOSS_FUSE_INSTALL_DIR/etc/ 中...

    zis.rar_active MQ_activemq_java activeMQ_java 转发

    在这个"zis.rar_active MQ_activemq_java _activeMQ_java 转发"的压缩包中,我们可以推测其主要内容可能涉及如何使用ActiveMQ在Java环境中实现消息的转发功能。 首先,我们需要理解ActiveMQ的基本概念。ActiveMQ...

    Active MQ教程+配置

    2. **商业JMS Providers**:例如IBM WebSphere MQ、BEA WebLogic JMS、Oracle AQ、NonStop Server for Java Message Service (JMS)、Sun Java System Message Queue、Sonic jms、TIBCO Enterprise For JMS和iLinkMQ...

    Active MQ 与 IBM WebSphere MQ 可用性和管理分析

    在当今的企业计算环境中,消息队列中间件(MQ)发挥着核心作用,它保证了系统之间通信的可靠性和异步性质。Apache ActiveMQ和IBM WebSphere MQ(简称IBMMQ)是市场上广泛使用的两种消息队列产品。ActiveMQ是开源社区中...

    active MQ ,消息队列

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open Message Broker Protocol,即AMQP)和Java消息服务(Java Message Service,JMS)规范。消息队列是分布式系统中的一个重要概念...

    active MQ maven POM方式

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open Message ...通过实际操作和调试mq-demo02项目,初学者可以快速掌握ActiveMQ的基本使用,并为后续更复杂的消息传递场景打下基础。

    active_mq_mvc源码

    在本项目"active_mq_mvc源码"中,我们主要关注的是如何在MVC(Model-View-Controller)架构中有效地集成ActiveMQ,一个流行的Java消息代理和消息中间件。源码分析将揭示设计思想、设计模式以及如何巧妙地运用反射...

    Spring Boot基于Active MQ实现整合JMS

    Spring Boot 基于 Active MQ 实现整合 JMS Spring Boot 是一个基于 Java 的框架,Active MQ 是一个开源的消息队列系统,JMS(Java Message Service)是 Java 平台上的一种消息服务规范。今天,我们将介绍如何使用 ...

    active mq 学习笔记

    作为消息队列(Message Queue,简称MQ)的一种,它主要用来在分布式系统之间进行消息传递。 **引入中间件的优势:** - **减少服务器间的依赖**:在没有引入消息中间件的情况下,各个服务之间可能存在大量的相互依赖...

    Active MQ的配置和使用

    ActiveMQ是一款开源的消息代理中间件,它实现了Java消息服务(JMS)规范,允许跨平台的编程语言进行消息通信。...ActiveMQ特点包括: 1. 开源:用户可以自由使用、修改和分发ActiveMQ的源代码。 2. 遵循JMS规范:JMS...

    消息队列active mq

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...

Global site tag (gtag.js) - Google Analytics