`
manzhizhen
  • 浏览: 293337 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论
阅读更多

 

      本文的ActiveMQ都基于5.10版本,参考了ActiveMQ官方文档:http://activemq.apache.org/failover-transport-reference.html

      集群是个比较广泛的概念,它有多种形式,关于消息服务的集群,大概分为Consumer集群(消费者集群)和Broker集群(消息服务器集群)两种。

      对于消费者集群,对于队列消费者,主要是:1.保证如果某一个消费者死亡了,任何它没有确认完的消息会被重传别的正常的消费者来消费;2.如果一个消费者消费消息过快,就可以比别的消费者得到更多的消息;3.如果一个消费者消费消息过慢,它就会被少得到消息。第1点几乎是所有JMS提供者都有的功能——消息重传机制(可以参考我的其他ActiveMQ博文)。第2点和第3点也是很正常的,因为大多消费者和线程是一一对应的关系,你消费速率快,当然可以自己去服务器拉取更多的消息。当然ActiveMQ在队列上给消费者提供了高性能的负载均衡策略。对于主题订阅者,由于每个订阅者接受到被推送的消息都和其他订阅者无关,所以处理相对简单,JMS也有持久订阅者这一概念,这里不多说。

      对于消息服务器集群,主要是指:1.如果集群中的某一台消息服务器宕机,与该台消息服务器相连接的生产者和消费者能否自动连接到其他正常工作的消息服务器。2.如果集群中的某一台消息服务器宕机,该台服务器上未消费的消息能否在该台服务器恢复正常之前由其他服务器转发。3.集群环境中会不会导致某台消息服务器上只有消费者或者某台消息服务器上只有生产者。对于1,ActiveMQ提供了一种叫做失效转移(也叫故障转移,FailOver)的策略。失效转移提供了在传输层上重新连接到其他任何传输器的功能。使用它很简单,只需要在uri中配置就行了,语法如下:

failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN

例子:

failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false

failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 (

如果这样使用报错你可以试试这个:failover://(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100  (this way works in ActiveMQ 4.1.1 the one above does not)

如果某个ActiveMQ客户端发现uri1地址失效了,它会立即转向uri地址列表中其他可以连接的消息服务器进行重连,以保证继续正常工作,请注意,并不是uri1失效了就会选则uri2重连,这种选择其他地址的方式默认是随机的,以保证负载均衡,如果你想关闭随机,可以transportOptions中加入randomize=false

 

transportOptions有多种参数可以选择,如下:

initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。

maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。

useExponentialBackOff:默认为true,表示重连时是否加入避让指数来避免高并发。

reconnectDelayExponent:默认为2.0,重连时使用的避让指数。

maxReconnectAttempts:5.6版本之前默认为-1,5.6版本及其以后,默认为0,0表示重连的次数无限,配置大于0可以指定最大重连次数。

startupMaxReconnectAttempts:默认为0,如果该值不为0,表示客户端接收到消息服务器发送来的错误消息之前尝试连接服务器的最大次数,一旦成功连接后,maxReconnectAttempts值开始生效,如果该值为0,则默认采用maxReconnectAttempts。详见FailoverTransport.java代码:

    private int calculateReconnectAttemptLimit() {
        int maxReconnectValue = this.maxReconnectAttempts;
        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
            maxReconnectValue = this.startupMaxReconnectAttempts;
        }
        return maxReconnectValue;
    }

randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略,记住,这种随机策略在第一次选择URI列表中的地址时就开始生效,所以,如果为true的话,一个生产者和一个消费者的Failover连接地址都是两个URI的话,有可能生产者连接的是第一个,而消费者连接的是第二个,造成一个服务器上只有生产者,一个服务器上只有消费者的尴尬境地。

backup:默认为false,表示是否在连接初始化时将URI列表中的所有地址都初始化连接,以便快速的失效转移,默认是不开启。

timeout:默认为-1,单位毫秒,是否允许在重连过程中设置超时时间来中断的正在阻塞的发送操作。-1表示不允许,其他表示超时时间。这样说你肯定不是很明白,直接看代码吧,下面给出FailoverTransport.java类中oneway方法中的一段代码给你看你就明白了:

                // Keep trying until the message is sent.
                for (int i = 0; !disposed; i++) {
                    try {

                        // Wait for transport to be connected.
                        Transport transport = connectedTransport.get();
                        long start = System.currentTimeMillis();
                        boolean timedout = false;
                        while (transport == null && !disposed && connectionFailure == null
                                && !Thread.currentThread().isInterrupted()) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Waiting for transport to reconnect..: " + command);
                            }
                            long end = System.currentTimeMillis();
                            if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
                                timedout = true;
                                if (LOG.isInfoEnabled()) {
                                    LOG.info("Failover timed out after " + (end - start) + "ms");
                                }
                                break;
                            }
                            try {
                                reconnectMutex.wait(100);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Interupted: " + e, e);
                                }
                            }
                            transport = connectedTransport.get();
                        }

                       // 其余的代码略

trackMessages:默认值为false,是否缓存在发送中(in-flight messages)的消息,以便重连时让新的Transport继续发送。默认是不开启。

maxCacheSize:默认131072,如果trackMessages为true,该值表示缓存消息的最大尺寸,单位byte。

updateURIsSupported:默认值为true,表示重连时客户端新的连接器(Transport)是否从消息服务接受接受原来的URI列表的更新,5.4及其以后的版本可用。如果关闭的话,会导致重连后连接器没有其他的URI地址可以Failover。

updateURIsURL:默认为null,从5.4及其以后的版本,ActiveMQ支持从文件中加载Failover的URI地址列表,URI还是以逗号分隔,updateURIsURL为文件路径。详见FailoverTransport.java中代码:

    private void doUpdateURIsFromDisk() {
        // If updateURIsURL is specified, read the file and add any new
        // transport URI's to this FailOverTransport.
        // Note: Could track file timestamp to avoid unnecessary reading.
        String fileURL = getUpdateURIsURL();
        if (fileURL != null) {
            BufferedReader in = null;
            String newUris = null;
            StringBuffer buffer = new StringBuffer();

            try {
                in = new BufferedReader(getURLStream(fileURL));
                while (true) {
                    String line = in.readLine();
                    if (line == null) {
                        break;
                    }
                    buffer.append(line);
                }
                newUris = buffer.toString();
            } catch (IOException ioe) {
                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException ioe) {
                        // ignore
                    }
                }
            }

            processNewTransports(isRebalanceUpdateURIs(), newUris);
        }
    }

nested.*:默认为null,5.9及其以后版本可用,表示嵌套的URL添加额外的选项 以前,如果你想检测让死连接速度更快,你必须wireFormat.maxInactivityDuration= 1000选项添加到失效转移列表中的所有嵌套的URL例如

failover:(tcp://host01:61616?wireFormat.maxInactivityDuration=1000,tcp://host02:61616?wireFormat.maxInactivityDuration=1000,tcp://host03:61616?wireFormat.maxInactivityDuration=1000)
而现在,你只需要这样:

failover:(tcp://host01:61616,tcp://host02:61616,tcp://host03:61616)?nested.wireFormat.maxInactivityDuration=1000

warnAfterReconnectAttempts.*:默认为10,5.10及其以后的版本可用,表示每次重连该次数后会打印日志告警,设置<=0的值表示禁用,FailoverTransport.java类的doReconnect()部分相关代码如下

 

            int warnInterval = getWarnAfterReconnectAttempts();
            if (warnInterval > 0 && (connectFailures % warnInterval) == 0) {
                LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.",
                         uris, connectFailures);
            }
 reconnectSupported:默认为true,表示客户端是否应响应经纪人 ConnectionControl事件重新连接(参见rebalanceClusterClients)。

 

       如果你使用Failover失效转移,则消息服务器在死亡的那一刻,你的生产者发送消息时默认将阻塞,但你可以设置发送消息阻塞的超时时间(注:timeout参数前面已经讲过了):

failover:(tcp://primary:61616)?timeout=3000

上面的设置将导致如果3秒后连接还未建立,将导致消息发送失败,但这并不会导致该连接被kill,所以你可以过一阵子后再使用这一个连接来尝试发送消息。

        如果用户希望能追踪到重连过程,可以在ActiveMQConnectionFactory设置一个TransportListener,如下所示:

			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
			factory.setTransportListener(new TransportListener() {
				
				@Override
				public void transportResumed() {
					System.out.println("连接器已经恢复完毕!");
				}
				
				@Override
				public void transportInterupted() {
					System.out.println("连接器被中断了!");
				}
				
				@Override
				public void onException(IOException error) {
					System.out.println(error);
				}
				
				@Override
				public void onCommand(Object command) {
					System.out.println(command);
				}
			});

      

       下面我们看看如何在消息服务器(broker)上配置失效转移Failover。

       在消息服务器这边,有一些选项可以将客户端更新到新的消息服务器,如下所示:

updateClusterClients:默认为false,如果为true,则会将broker集群的拓扑结构的改变信息传递给连接的客户端。

rebalanceClusterClients:默认为false,如果为true,则如果有新的消息服务器加入到消息服务器集群中,则连接的客户端将被要求重新平衡(asked to rebalance)。注意,  priorityBackup=true能覆盖。

updateClusterClientsOnRemove:默认为false,如果为true,则当一个集群从网络中移除的时候将更新客户端。有了这个选项,可以在消息服务器移除时更新客户端,而不是仅仅只是新增消息服务器时更新。(难道官方文档有问题:if true, will update clients when a cluster is removed from the network. Having this as separate option enables clients to be updated when new brokers join, but not when brokers leave.)

updateClusterFilter:默认为null,如果有值,将会是逗号分隔的正则表达式列表,用来过滤掉失效转移时的消息服务器集群中的服务器名称。

举例:

<broker>
  ... 
  <transportConnectors>   
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterFilter="*A*,*B*" /> 
  </<transportConnectors> 
  ...
 </broker>

如果面配置所示,如果updateClusterClients设置为true,则连接到该服务器的客户端的连接器地址Failover的URI列表只需要写这个服务器地址就行,如:failover://tcp://primary:61616,当有新的消息服务器加入时,这些客户端将自动被更新添加该新消息服务器的地址,如果发生网络或消息服务器宕机的事件,就可以重连接到新的消息服务器上。

 

       有时候我们希望客户端能优先选择某些消息服务器地址,比如既有本地服务器,又有远程服务器,我们希望本地的应用程序优先选择本地服务器进行连接,从5.6版本开始,ActiveMQ提供了优先级备份(priority backup )的特性,所以你可以让客户端自动重连到所谓的“优先级”URI,你可以在客户端如下配置URI地址:

failover:(tcp://local:61616,tcp://remote:61616)?randomize=false&priorityBackup=true

如果上面这个地址被用于客户端使用,客户端将尝试并保持连接到本地(上面local表示的地址)的消息服务器,当然,如果本地的服务器故障,将转移到远程(remote代表的地址)服务器。默认情况下,只有URI列表中的第一个被视为优先(本地)URI,在某些情况下,你希望不止一个URI地址优先,则你可以使用priorityURIs参数:

failover:(tcp://local1:61616,tcp://local2:61616,tcp://remote:61616)?randomize=false&priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616

这样,客户端将视local1和local2都为优先URI。

分享到:
评论

相关推荐

    ActiveMQ_使用failover模式进行连接切换时,线程断开

    ### ActiveMQ Failover模式下连接切换与线程断开问题详解 #### 一、问题背景及现象描述 在使用ActiveMQ消息中间件时,我们常常需要考虑如何在多个实例之间实现高可用性(HA)。其中一种常用的方式是采用**Failover...

    ActiveMQ主备自动failover方案

    ActiveMQ主备自动failover方案 ActiveMQ5.8.0版本的主备有两种方式:共享文件系统、共享数据库。性能上共享文件系统要优于共享数据库。 本文档采用共享文件系统的方式实现主备。共享文件系统最好使用分布式文件存储...

    ActiveMQ Failover Transport Options

    ActiveMQ 中的故障处理连接方式(Failover Transport Options)是指在 Message Queue(MQ)连接的 URI 中配置的一系列参数,用于控制连接的重连、超时、缓存等行为。下面详细介绍这些参数的作用和配置。 ...

    activeMQ集群的使用与配置[收集].pdf

    如果某个网络上有多个brokers,而客户使用静态发现(使用Static Transport或Failover Transport)或动态发现(使用Discovery Transport),那么客户可以容易地在某个broker失效的情况下切换到其它的brokers。...

    ActiveMQ集群

    在客户端方面,failover://协议允许客户端在Broker失效时自动进行故障转移。而网络连接器和网络地址转换器的使用,进一步保障了消息在不同Broker节点之间的顺畅传递,以及在集群环境下的高可用性。通过合理的集群...

    java springboot整合activemq工程

    spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,tcp://10.0.1.228:61616,tcp://10.0.1.229:61616,mqtt://10.0.1.227:1883,mqtt://10.0.1.228:1883,mqtt://10.0.1.229:1883) #spring.activemq.broker-...

    activemq activeMq笔记

    Apache ActiveMQ 是一款非常流行的开源消息中间件,它支持 Java 消息服务 (JMS) 标准,并提供了多种高级功能,例如持久化、集群、故障转移等。ActiveMQ 能够帮助开发者实现解耦、可靠的消息传输以及高性能的应用程序...

    activeMQ收发工具.rar

    ActiveMQ是中国最流行的开源消息中间件之一,由Apache软件基金会开发。它基于Java Message Service (JMS) 规范,提供了可靠的消息传递功能,适用于分布式系统中的应用间通信。本压缩包“activeMQ收发工具.rar”包含...

    apache-activemq-5.16.5

    3. **高可用性**:通过集群和故障转移,ActiveMQ可以实现高可用性,确保消息服务的不间断运行。当主服务器故障时,备份服务器可以无缝接管,保证业务连续性。 4. **负载均衡**:在多台服务器组成的集群中,ActiveMQ...

    最新稳定版ActiveMQ5.15.0

    1. **高可用性**:ActiveMQ通过集群和故障转移功能确保服务的连续性和数据的完整性。它支持网络中的多台服务器复制消息,当主服务器出现故障时,备份服务器可以立即接管,保证服务不中断。 2. **性能优化**:5.15.0...

    apache-activemq Linux版本

    8. **网络连接**: 支持集群和故障转移,确保高可用性和负载均衡。 在安装`apache-activemq-5.14.0`这个版本时,首先需要在Linux系统上安装Java运行环境(JRE或JDK)。然后解压下载的压缩包,进入解压后的目录,启动...

    activemqactivemq

    为了确保高可用性和容错性,ActiveMQ支持多种部署模式,如网络集群、故障转移和持久化消息。持久化保证了即使在服务器崩溃后,未被消费的消息也能恢复。 总的来说,"activemqactivemq"这个主题涵盖了使用ActiveMQ...

    ActiveMQ 配置文件详解

    **ActiveMQ配置文件详解** Apache ActiveMQ 是一个开源的消息中间件,它实现了多种消息协议,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),并且广泛应用于分布式系统中,提供可靠的...

    apache-activemq-5.15.9.rar

    为了实现高可用性和容错性,ActiveMQ支持多种部署模式,如网络集群、故障转移和复制。在集群模式下,多个ActiveMQ实例可以共享负载并提供冗余,以确保服务的连续性。 总结一下,Apache ActiveMQ 5.15.9是一个强大的...

    Apache ActiveMQ Artemis.pdf

    15. 高可用性和故障转移:介绍了ActiveMQ Artemis如何支持高可用性和故障转移。 16. 性能调优:包含了对ActiveMQ Artemis进行性能调优的技巧和策略。 17. 常见问题解答:提供了一些关于如何解决在使用ActiveMQ ...

    ActiveMQ路由配置方式

    ActiveMQ路由配置方式 ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 ...

    ActiveMQ高并发处理方案

    ### ActiveMQ高并发处理方案详解 #### 一、引言 在现代分布式系统中,消息队列作为异步通信的核心组件之一,对于提高系统的吞吐量、降低响应时间和实现服务解耦等方面起着至关重要的作用。Apache ActiveMQ作为一款...

    ActiveMQ接受和发送工具.rar

    7. **网络集群**:通过网络集群,ActiveMQ可以实现高可用性和负载均衡,一个节点失效后,其他节点可以接管其服务,确保服务不间断。 8. **安全性**:ActiveMQ支持多种身份验证和授权机制,包括JAAS(Java ...

    Jmeter测试ActiveMQ性能报告

    本报告详细阐述了使用JMeter对ActiveMQ进行性能测试的过程和结果,旨在评估ActiveMQ在JMS(Java消息服务)环境下的性能表现。JMeter作为一个强大的负载和性能测试工具,被广泛用于测试各种应用程序,包括消息中间件...

    ActiveMQ消息服务器 v6.0.1.zip

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递标准(JMS,Java Message Service),用于在分布式系统中实现可靠的消息传递。在本文中,我们将深入探讨ActiveMQ v6.0.1的核心特性、应用...

Global site tag (gtag.js) - Google Analytics