`
manzhizhen
  • 浏览: 293315 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ActiveMQ中的NetworkConnector(网络连接器)详解

阅读更多

       注:本文以ActiveMQ5.10版本为基础。

       我们知道,ActiveMQ中的TransportConnector(传输连接器)主要用于配置ActiveMQ服务端和客户端之间的通信方式,NetworkConnector(网络连接器)则主要用来配置ActiveMQ服务端与服务端之间的通信。在某些场景,网络拓扑中我们可能会需要大量的生产者和消费者,也就是说我们会有大量的ActiveMQ客户端,这样,我们就需要在网络环境中有多个MQ服务器,服务器之间是可以透传消息的,这是,我们就需要用到NetworkConnector,我们先从简单的分析起,假设我们需要部署两台MQ服务器,如下图:

 

 使用NetworkConnector的简单场景

 如上图所示,服务器S1和S2通过NewworkConnector相连,则生产者P1发送消息,消费者C3和C4都可以接收到,而生产者P3发送的消息,消费者C1和C2同样也可以接收到,要使用NewworkConnector的功能,需要在服务器S1的activemq.xml中的broker节点下添加如下配置(注:10.79.11.172:61617为S2的地址):

<networkConnectors>     
          <networkConnector uri="
static:(tcp://10.79.11.172:61617)"/>   
</networkConnectors>
如果只是这样,S1可以将消息发送到S2,即S2对S1来说仿佛就是S1的消费者,但此时通信只是单向的,发送到S2上的的消息是不能发送到S1上的如果要想S1也收到从S2发来的消息,需要在S2的activemq.xml中的broker节点下也添加如下配置(注:10.79.11.171:61617为S1的地址):
<networkConnectors>     
          <networkConnector uri="
static:(tcp://10.79.11.171:61617)"/>   
</networkConnectors>
这样,S1和S2就可以双向通信了。这时,我们可以访问S1和S2的控制台页面(S1默认是http://10.79.11.171:8161)中的“Network”来看桥接的结果。如果有多个,可以用逗号隔开,如下:
<networkConnectors>     
          <networkConnector uri="
static:(tcp://10.79.11.171:61617,tcp://10.79.11.173:61617,tcp://10.79.11.174:61617)"/>   
</networkConnectors>
       静态连接器有一些常用的属性,如下:
【initialReconnectDelay】默认值1000,每次重连的间隔时间,单位为毫秒,当然得useExponentialBackOff=false才有效。
maxReconnectDelay】:默认值30000,最大重连间隔,单位为毫秒。
useExponentialBackOff】:默认值为true,是否不断增加重连时间间隔。
backOffMultiplier】:默认值为2,使用useExponentialBackOff特性的次数。
例如:uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false"
       有人会想,如果只有两三台MQ服务器,我们可以像上面那样简单配置,如果MQ服务器非常多,我们就不太可能在每个MQ服务器的xml文件中去一个个配置其它服务器地址了,所以,我们希望MQ服务器能在自己的网络中发现其他的MQ服务器并自动建立“桥接”,幸运的是,ActiveMQ提供了多播发现(Multicast Discovery)的功能,我们可以在每台MQ服务器的activemq.xml中的broker节点下添加如下配置:
<networkConnectors>      
       <networkConnector uri="multicast://default"/>   
</networkConnectors>
这样,这些在网络中的MQ服务器,在启动和运行中,将不断的发现其他新的MQ服务器了,并和他们创建连接,注意,如果新的MQ服务器中没有配置networkConnectors,那么这种连接就是单向的,而不是双向。
       当然,如果NetworkConnector的功能就只有如此,那本文也就没有继续下去的必要了,接下来,让我们看看NetworkConnector常用的几个属性配置。个人认为,结合源码看属性,才能理解的更深刻和贴切,因为需要通过好几段话来解释一个功能,还不如直接给读者几行源码(非程序猿除外)来的深刻。
name:默认值是bridge,这无需过多说明,可以给每个MQ服务器起个属于自己的名字,看MQ日志也方便。
dynamicOnly:默认为false,官方文档是这样说的,默认的时候,也就是该值为false的时候,网络中的持久化订阅者是在启动的时候被激活的,如果改值为true,则对应的持久化订阅者被激活时,才激活一个网络的持久化订阅者,笔者看了下源码,调试过,但不了解该值配置true或false带来的实际上的效果是什么,有兴趣的读者可以去看一下DurableConduitBridge#setupStaticDestinations中的源码,笔者弄明白后会后续补充的。

decreaseNetworkConsumerPriority:默认值为false,如果该值为false,网络上的消费者和本地的消费者有相同的优先级,就是说一个消息发送到本地的消费者和其他的网络消费者有相同的概率。如果该值为true,则其他网络消费者的优先级从-5开始算起,根据网络跳数(network hops)来算优先级,当然是越“远”优先级越低。

 

networkTTL:默认值为1,该值表示消息或订阅可以通过的消息提供者(brokers)的数量。拿上图来说,假设P1向S1发了条消息,另外有个ActiveMQ服务器S3(上图未画出)与S2相互桥接,但不直接与S1桥接,因为该值为1,即使S3上有该队列的消费者或主题的订阅者,但对于S1来说S3这些网络消费者和订阅者是不可见的,所以P1发的这条消息就不能被S3上的消费者给消费,如果想要让S3上的消费者也有机会消费,可以让S3和S1桥接,或者将S1的networkTTL参数配置为2.

 

messageTTL:默认为1,从5.9版本开始有效,表示消息能通过的消息提供者数量,该值可以覆盖掉networkTTL中对消息通过数量的限制。

 

consumerTTL:默认为1,从5.9版本开始有效,表示订阅能通过的消息提供者数量,该值可以覆盖掉networkTTL中对订阅通过数量的限制。

 

conduitSubscriptions:默认为true。该值表示多个消费者订阅一个相同的目的地是否在网络上被对待为一个消费者。以上图为例,如果采用默认的分配策略,P1连续发120条消息,结果应该是C1和C2都收到40条,而C3和C4共收到40条(C3和C4平分这40条),为什么不是C1、C2、C3和C4都收到大概30条呢?因为该值为true,C3和C4对于S1来说只是一个消费者,而不是两个。注意,如果你的多个网络提供者中的消费者使用了消息选择器(selector),你又设置了conduitSubscriptions为true,则有可能会导致消息不会被发送到正确的网络消费者从而消息不被消费,因为网络消费者中的消息选择器对生产者的提供者来说是透明的,所以此种情况下,请将conduitSubscriptions设置为false。

 

excludedDestinations:默认为空,用来配置不会在网络中转发的目的地(Destination)。

 

dynamicallyIncludedDestinations:默认为空,用来配置可以在网络中转发的目的地,和上面的excludedDestinations的功能相反。例如:

<networkConnector uri="static:(tcp://host)">
  <dynamicallyIncludedDestinations>
    <queue physicalName="queue1"/>
    <topic physicalName="topic1"/>
  </dynamicallyIncludedDestinations>
</networkConnector>

 

staticallyIncludedDestinations:默认为空,用来配置可以在网络中转发的目的地,但它和dynamicallyIncludedDestinations不同的是dynamicallyIncludedDestinations只在对方有该目的地的消费者时才将消息转发给对方,staticallyIncludedDestinations则不管对方有没有该目的地都将消息转发给对方。

举例配置如下:

  <networkConnectors>     
   <networkConnector uri="static:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61618)" >
    <staticallyIncludedDestinations>
     <queue physicalName="TopicTestQueue1"/>
    </staticallyIncludedDestinations>
   </networkConnector>
  </networkConnectors>

 

duplex:默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接),如果该值设置为true,则一个连接上可以双向流动消息(双工连接)。可以想象,单工连接比双工连接吞吐量要高,但增加了连接数量方面的开销。如果在使用双工连接时为了增加吞吐量,可以建立多个双工连接,但每个连接必须起不同的名字,例如:

 <networkConnectors>
        <networkConnector name="SYSTEM1" duplex="true" uri="static:(tcp://10.79.6.56:61616)">
                <dynamicallyIncludedDestinations>
                        <topic physicalName="outgoing.System1" />
                </dynamicallyIncludedDestinations>
        </networkConnector>
        <networkConnector name="SYSTEM2" duplex="true" uri="static:(tcp://10.79.6.56:61616)">
                <dynamicallyIncludedDestinations>
                        <topic physicalName="outgoing.System2"/>
                </dynamicallyIncludedDestinations>
        </networkConnector>
</networkConnectors>

 

prefetchSize:默认是1000,指网络连接中的消费者的预读数量,该值必须大于0,因为网络消费者不能采用poll来获取消息(轮询消息)。

 

suppressDuplicateQueueSubscriptions:(从5.3版本开始有效)默认为false,如果该参数为true,会防止提供者之间相互桥接时的队列重复订阅的问题,其实该参数为true和false只是ActiveMQ代码中的微妙的性能变化,对实际的集群结果并不会带来什么影响。

 

bridgeTempDestinations:默认值为true,该值用来指定在MQ服务器在网络中创建临时目的地(temp destinations)时是否需要广播公告消息(broadcast advisory messages )。如果设置为false,表示禁用此功能,则当生产者和消费者不是连接到同一个MQ服务器时,可以减少网络开销。

 

alwaysSyncSend:(从5.6版本开始有效)默认为false,表示是否总是异步发送。默认时,非持久化消息被发送到远程的MQ服务器时将采用请求/应答方式而不是oneway方式。此参数对待持久化和非持久化消息相同。

 

staticBridge:(从5.6版本开始有效)默认为false,如果设置为true,MQ服务器将不会动态应答新的消费者,也意味着该提供者将对其他远程提供者的主题不感兴趣。当设置为true是,我们可以使用staticallyIncludedDestinations来创建配置需要订阅的主题,例如:

<networkConnector uri="static:(tcp://host)" staticBridge="true">
        <staticallyIncludedDestinations>
            <queue physicalName="always.include.queue"/>
        </staticallyIncludedDestinations>
</networkConnector>

 

       消息如果一开始就是持久化的化的或者是被持久订阅的,则在网络经纪人中传播时也会保持这种特性,然而,如果消息一开始就不是持久化的或者不是持久订阅的,则在网络的MQ服务器中传播时也一定不会有这些特性。

       总的消息顺序在网络经纪人中是不保证的,当然,即使不是网络经纪人,在单个经纪人环境下,有多个消费者时,消息被消费的顺序也是不保证的。

       如果你希望提供者不受任何远程提供者上消费者的影响,或者希望不管远程提供者上是否有消费者都将所有消息转发到远程提供者,那么你可以考虑使用静态网络(static networks)。 

       就如前面你说看到的,如果你想增加吞吐量,或者想采用不同的通信方式(如tcp或者nio),又或者你想采用更灵活的配置,两个提供者之间可以由多个NetworkConnector相连, 例如,如果使用分布式队列(distributed queues),你可能希望在通过网络对队列接收者能采用等效加权(equivalent weighting),但只针对活跃的接收者,你可以如下配置:

<networkConnectors>
      <networkConnector uri="static:(tcp://localhost:61617)" name="queues_only" conduitSubscriptions="false"   decreaseNetworkConsumerPriority="false">
        <excludedDestinations>
            <topic physicalName=">"/>
        </excludedDestinations>
      </networkConnector>
</networkConnectors>

这里你需要注意,你只能在excludedDestinations和dynamicallyIncludedDestinations上使用通配符。 还有,当你使用持久主题订阅者时,请不要去修改NetworkConnector和提供者的(默认)名字,因为网络中的ActiveMQ是通过提供者名字和NetworkConnector名字加上持久订阅者名字来标志一个持久订阅者的。
         一般情况下,在采用网络提供者时,为了防止消息在提供者之间重复流动,一个消息是不允许再一次发送到同一个网络提供者的,但这样就有一个问题,如果某些消息发送到一个网络提供者时,该网络提供者刚好因为某种原因重启,导致消息消费失败,这些消息需要重新发送到该网络提供者,这样就矛盾了。有一种解决方案就是强制让客户端采用rebalanceClusterClients方式进行重连(http://manzhizhen.iteye.com/blog/2105572 )。再一个,就是让消息回到原来的地方重新发送,因为那个网络提供者已经没有消费者了。有一种目的地策略(destination policy),它通过配置一个replayWhenNoConsumers=true的conditionalNetworkBridgeFilterFactory来提供了这种行为,conditionalNetworkBridgeFilterFactory提供了一个可配置的基于提供者时间(broker-in time)的重发延迟(replayDelay ),配置如下:
 <destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue="TEST.>" enableAudit="false">
        <networkBridgeFilterFactory>
   <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
        </networkBridgeFilterFactory>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
主要,在ActiveMQ5.9之前的版本使用replayWhenNoConsumers="true"需要加enableAudit="false",具体原因请看文章后面提供的官网链接,这里不细说。conditionalNetworkBridgeFilterFactory可以对目的地进行比例限制,可以有效的避免低优先级的网络消费者饿死本地的快消费者。
本文对网络连接器的属性介绍的不是很详细,官网的文章似乎也有些前后矛盾表述不清的地方,还是等以后有时间分析分析源代码再来补充吧!

感兴趣的童鞋可以去看看官方文档:http://activemq.apache.org/networks-of-brokers.html

  • 大小: 37.5 KB
分享到:
评论

相关推荐

    高可用之ActiveMQ集群:网络连接模式(network connector)详解.docx

    配置网络连接器主要涉及在 `activemq.xml` 文件中 `&lt;networkConnectors&gt;` 节点内添加 `&lt;networkConnector&gt;` 元素。例如: ```xml &lt;networkConnector uri="static:(tcp://localhost:62001)"/&gt; ``` uri 属性可以...

    ActiveMQ集群:网络连接模式(network connector)详解.docx

    ActiveMQ 集群网络连接模式(Network Connector)详解 ActiveMQ 集群网络连接模式(Network Connector)是 ActiveMQ 提供的一种集群功能,旨在提高消息服务的横向扩展性和高可用性。通过将多个不同的 broker 实例...

    4.2、静态网络连接 networkConnection单向和双向deplux 1

    在ActiveMQ中,网络连接(networkConnector)是一个关键特性,它允许不同的ActiveMQ Brokers之间进行通信,从而实现集群和消息传递的高可用性。在本文中,我们将深入探讨网络连接器的工作原理,特别是单向和双向连接...

    activemq ssl双向认证连接

    在这个场景中,我们将深入探讨如何在Java工程中使用ActiveMQ实现SSL双向认证连接。 首先,让我们了解SSL双向认证的基本概念。在传统的SSL单向认证中,客户端只需要验证服务器的身份,而服务器无需验证客户端。而在...

    自己实现的ActiveMQ连接池和新版本ActiveMQ自带的连接池,封装好的工具类,可直接使用

    连接池是一种资源管理技术,通过复用已建立的数据库连接或网络连接,减少创建和销毁连接的开销,提高系统的性能和响应速度。 首先,我们来看自定义的ActiveMQ连接池。作者自己实现的连接池通常是为了满足特定场景下...

    ActiveMQ集群

    网络连接器负责在不同的Broker实例之间建立连接,确保消息可以在整个集群中透明地流动。有三种主要的部署拓扑结构: 1. **嵌入式**:在一个应用程序内部运行多个Broker实例,通常用于测试和简单部署。 2. **主从...

    spring配置activemq详解

    在"spring配置activemq详解"这个主题中,我们将探讨如何在Spring项目中配置和使用ActiveMQ。以下是对这个主题的详细说明: 1. **配置ActiveMQ**: - 首先,我们需要在项目中引入ActiveMQ的相关依赖,这通常通过在`...

    ActiveMQ 配置文件详解

    **ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...

    ActiveMQ整合Spring使用连接池

    在Spring的XML配置文件中,你可以定义一个`ConnectionFactory` bean,这将使用ActiveMQ的TCP连接工厂。示例配置如下: ```xml &lt;bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory...

    ActiveMQ连接和使用测试工程

    **ActiveMQ连接与使用测试工程详解** ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息协议,如OpenWire、STOMP、AMQP和MQTT等。在Java开发环境中,ActiveMQ作为中间件广泛...

    ActiveMQ环境搭建及实例详解的源码

    4. **网络连接**:ActiveMQ可以构建多节点集群,实现高可用性和负载均衡。 5. **Web管理界面**:ActiveMQ提供了Web管理界面,可以实时监控消息队列的状态,查看和管理消息。 6. **安全性**:ActiveMQ支持用户认证...

    linux环境下ActiveMQ持久化、集群环境搭建详解

    Linux 环境下 ActiveMQ 持久化、集群环境搭建详解 在 Linux 环境下搭建 ActiveMQ 持久化和集群环境是一种复杂的任务,需要对 Linux 操作系统、Java 环境、ActiveMQ 等方面有深入的了解。以下是搭建 ActiveMQ 持久化...

    Java消息中间件ActiveMQ学习资料

    在高可用性和负载均衡的场景下,ActiveMQ可以通过网络连接器在网络中的不同节点间建立连接,实现消息的同步和复制。网络连接器允许消息在集群中的节点间流动,提高了系统的可靠性和性能。理解网络连接器的工作原理和...

    消息队列中间件ActiveMQ入门到精通视频教程及资料

    001-ActiveMQ基础;002-安全机制+签收模式+发送模式+MessageProducer;...ActiveMQ集群:网络连接模式(network connector)详解.docx;ActiveMQ集群:网络连接模式(network connector)详解.docx;示例;

    ActiveMQ+spring配置方案详解

    在IT行业中,消息队列(Message Queue)是分布式系统...在Spring的配置文件(如`applicationContext.xml`)中,我们可以通过`&lt;bean&gt;`标签创建一个连接工厂(ConnectionFactory),并指定ActiveMQ服务器的URL: ```xml ...

    ActiveMQ连接池完整封装实例工具类

    在企业级应用中,为了提高消息处理的效率和稳定性,通常会使用连接池来管理与ActiveMQ服务器的连接。本文将详细介绍如何实现一个ActiveMQ连接池的完整封装实例工具类,并探讨其背后的设计思想。 首先,我们需要了解...

    ActiveMq-JMS好用实例详解

    ### ActiveMQ-JMS好用实例详解 #### 一、ActiveMQ简介及特点 **ActiveMQ** 是一个非常流行的开源消息中间件,它基于 **Java消息服务(JMS)** 规范,能够提供高度可靠的消息传递机制。ActiveMQ 以其丰富的功能集、...

    SpringBoot集成ActiveMQ实例详解.docx

    在Spring Boot中集成ActiveMQ非常简便,因为Spring Boot提供了`spring-boot-starter-activemq`启动器,该启动器包含了自动配置的支持。首先,我们需要在项目中添加相应的依赖: ```xml &lt;groupId&gt;org.spring...

    mqttjs(activemq测试工具)

    在这个例子中,我们连接到ActiveMQ服务器(将`your.active.mq.server`替换为实际的IP地址或域名),订阅了`test/topic`主题,并向该主题发布了一条消息。当收到消息时,控制台会打印出接收到的主题和消息内容。 在...

Global site tag (gtag.js) - Google Analytics