`
ywu
  • 浏览: 457626 次
  • 性别: Icon_minigender_1
  • 来自: 无锡
社区版块
存档分类
最新评论

ActiveMQ Failover broker url顺序

阅读更多

ActiveMQ的客户端支持失效重连。昨天在配置ActiveMQ的主从结构后,客户端使用失效重连协议连接到代理,如下所示:

failover:(tcp://master:61616,tcp://slave:61616)

 测试的时候客户端有时候连接到master上,有时候连接到slave上。这就尴尬了,正常理解的话,配在前面的broker url应该先尝试连接,连接不上才连接后面的url,即从前到后的顺序,但是测试结果来看,是不一定的。

使用logback打开了ActiveMQ的日志

<logger name="org.apache.activemq" level="debug" additivity="false">
    <appender-ref ref="STDOUT"/>
</logger>

 日志中输出的确有的时候只直接连了slave,而不是每次都先连master,而且master是正常的,那就说明的确客户端是做过处理了,并不是按从前到后的顺序依次连接。

日志中会输出“Successfully connected to slave...”,logback配置中输出了代码行数,因此很快定位到程序,org.apache.activemq.transport.failover.FailoverTransport类的876行(客户端使用的是activemq-all-5.4.3.jar),代码如下

final boolean doReconnect() {
        Exception failure = null;
        synchronized (reconnectMutex) {

            // If updateURIsURL is specified, read the file and add any new
            // transport URI's to this FailOverTransport. 
            // Note: Could track file timestamp to avoid unnecessary reading.
            String fileURL = getUpdateURIsURL();
            if (fileURL != null) {
                BufferedReader in = null;
                String newUris = null;
                StringBuffer buffer = new StringBuffer();

                try {
                    in = new BufferedReader(getURLStream(fileURL));
                    while (true) {
                        String line = in.readLine();
                        if (line == null) {
                            break;
                        }
                        buffer.append(line);
                    }
                    newUris = buffer.toString();
                } catch (IOException ioe) {
                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (IOException ioe) {
                            // ignore
                        }
                    }
                }
                
                processNewTransports(isRebalanceUpdateURIs(), newUris);
            }

            if (disposed || connectionFailure != null) {
                reconnectMutex.notifyAll();
            }

            if (connectedTransport.get() != null || disposed || connectionFailure != null) {
                return false;
            } else {
                List<URI> connectList = getConnectList();
                if (connectList.isEmpty()) {
                    failure = new IOException("No uris available to connect to.");
                } else {
                    if (!useExponentialBackOff || reconnectDelay == -1) {
                        reconnectDelay = initialReconnectDelay;
                    }
                    synchronized (backupMutex) {
                        if (backup && !backups.isEmpty()) {
                            BackupTransport bt = backups.remove(0);
                            Transport t = bt.getTransport();
                            URI uri = bt.getUri();
                            t.setTransportListener(myTransportListener);
                            try {
                                if (started) {
                                    restoreTransport(t);
                                }
                                reconnectDelay = initialReconnectDelay;
                                failedConnectTransportURI = null;
                                connectedTransportURI = uri;
                                connectedTransport.set(t);
                                reconnectMutex.notifyAll();
                                connectFailures = 0;
                                LOG.info("Successfully reconnected to backup " + uri);
                                return false;
                            } catch (Exception e) {
                                LOG.debug("Backup transport failed", e);
                            }
                        }
                    }

                    Iterator<URI> iter = connectList.iterator();
                    while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
                        URI uri = iter.next();
                        Transport t = null;
                        try {
                            LOG.debug("Attempting connect to: " + uri);
                            SslContext.setCurrentSslContext(brokerSslContext);
                            t = TransportFactory.compositeConnect(uri);
                            t.setTransportListener(myTransportListener);
                            t.start();

                            if (started) {
                                restoreTransport(t);
                            }

                            LOG.debug("Connection established");
                            reconnectDelay = initialReconnectDelay;
                            connectedTransportURI = uri;
                            connectedTransport.set(t);
                            reconnectMutex.notifyAll();
                            connectFailures = 0;
                            // Make sure on initial startup, that the
                            // transportListener
                            // has been initialized for this instance.
                            synchronized (listenerMutex) {
                                if (transportListener == null) {
                                    try {
                                        // if it isn't set after 2secs - it
                                        // probably never will be
                                        listenerMutex.wait(2000);
                                    } catch (InterruptedException ex) {
                                    }
                                }
                            }
                            if (transportListener != null) {
                                transportListener.transportResumed();
                            } else {
                                LOG.debug("transport resumed by transport listener not set");
                            }
                            if (firstConnection) {
                                firstConnection = false;
                                LOG.info("Successfully connected to " + uri);
                            } else {
                                LOG.info("Successfully reconnected to " + uri);
                            }
                            connected = true;
                            return false;
                        } catch (Exception e) {
                            failure = e;
                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
                            if (t != null) {
                                try {
                                    t.stop();
                                } catch (Exception ee) {
                                    LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
                                }
                            }
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }
            int reconnectAttempts = 0;
            if (firstConnection) {
                if (this.startupMaxReconnectAttempts != 0) {
                    reconnectAttempts = this.startupMaxReconnectAttempts;
                }
            }
            if (reconnectAttempts == 0) {
                reconnectAttempts = this.maxReconnectAttempts;
            }
            if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
                LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                connectionFailure = failure;

                // Make sure on initial startup, that the transportListener has
                // been initialized
                // for this instance.
                synchronized (listenerMutex) {
                    if (transportListener == null) {
                        try {
                            listenerMutex.wait(2000);
                        } catch (InterruptedException ex) {
                        }
                    }
                }

                if (transportListener != null) {
                    if (connectionFailure instanceof IOException) {
                        transportListener.onException((IOException) connectionFailure);
                    } else {
                        transportListener.onException(IOExceptionSupport.create(connectionFailure));
                    }
                }
                reconnectMutex.notifyAll();
                return false;
            }
        }
        if (!disposed) {

            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
            synchronized (sleepMutex) {
                try {
                    sleepMutex.wait(reconnectDelay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            if (useExponentialBackOff) {
                // Exponential increment of reconnect delay.
                reconnectDelay *= backOffMultiplier;
                if (reconnectDelay > maxReconnectDelay) {
                    reconnectDelay = maxReconnectDelay;
                }
            }
        }
        return !disposed;
    }

 代码List<URI> connectList = getConnectList();获取配置的broker url,打了断点后debug发现,这边获取的List中的URI有时候已经不是配置中的顺序了,进入getConnectList方法

private List<URI> getConnectList() {
        ArrayList<URI> l = new ArrayList<URI>(uris);
        boolean removed = false;
        if (failedConnectTransportURI != null) {
            removed = l.remove(failedConnectTransportURI);
        }
        if (randomize) {
            // Randomly, reorder the list by random swapping
            for (int i = 0; i < l.size(); i++) {
                int p = (int) (Math.random() * 100 % l.size());
                URI t = l.get(p);
                l.set(p, l.get(i));
                l.set(i, t);
            }
        }
        if (removed) {
            l.add(failedConnectTransportURI);
        }
        LOG.debug("urlList connectionList:" + l);
        return l;
    }

 这边有个randomize,默认值为true,即默认这边会对配置的url重新排序,所以导致连接初始化是可能不是按照想象的从前到后的顺序连接,解决也简单,在连接后价格参数,将randomize设为false

failover:(tcp://master:61616,tcp://slave:61616)?randomize=false

 这样每次都会先连接master,master连不上才会连slave。

 

 

1
1
分享到:
评论
2 楼 ywu 2016-08-17  
Tyrion 写道
activemq的官方文档给出了这个配置的说明:
http://activemq.apache.org/failover-transport-reference.html

The Failover transport chooses a URI at random by default. This effectively load-balances clients over multiple brokers. However, to have a client connect to a primary first and only connect to a secondary backup broker when the primary is unavailable, set randomize=false.

不过给你这种翻源码的精神点赞一下。

谢谢 文档读过又忘了,还是代码来的暴力
1 楼 Tyrion 2016-08-17  
activemq的官方文档给出了这个配置的说明:
http://activemq.apache.org/failover-transport-reference.html

The Failover transport chooses a URI at random by default. This effectively load-balances clients over multiple brokers. However, to have a client connect to a primary first and only connect to a secondary backup broker when the primary is unavailable, set randomize=false.

不过给你这种翻源码的精神点赞一下。

相关推荐

    ActiveMQ Failover Transport Options

    ActiveMQ 中的故障处理连接方式(Failover Transport Options)是指在 Message Queue(MQ)连接的 URI 中配置的一系列参数,用于控制连接的重连、超时、缓存等行为。下面详细介绍这些参数的作用和配置。 ...

    java springboot整合activemq工程

    spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,tcp://10.0.1.228:61616,tcp://10.0.1.229:61616,mqtt://10.0.1.227:1883,mqtt://10.0.1.228:1883,mqtt://10.0.1.229:1883) #spring.activemq.broker-...

    activemq与tomcat整合

    `brokerURL`属性指定了连接到ActiveMQ Broker的URL,`brokerName`标识了Broker的名称,`useEmbeddedBroker`设置为`false`表示不使用内嵌的Broker。 3. **配置JNDI资源**:`&lt;Resource&gt;`标签中的`name`属性定义了...

    ActiveMQ使用手册(中文版)

    - **定义:** 正确配置客户端连接到ActiveMQ Broker的URL。 - **注意事项:** 确保URL中的协议、主机名和端口号正确无误。 以上知识点全面覆盖了ActiveMQ的核心概念、配置细节、集群方式以及监控和使用注意事项等方面...

    SpringActiveMQ.rar

    8. **Spring Boot与ActiveMQ**:在Spring Boot应用中,可以利用 starters 来简化配置,通过`spring.activemq.broker-url`和`spring.activemq.user`等属性快速设置ActiveMQ连接。 9. **消息确认**:ActiveMQ支持两种...

    ActiveMQ集群

    对于客户端来说,ActiveMQ提供了failover://协议,通过该协议可以实现客户端自动重新连接到可用的Broker。客户端不需要了解底层集群的实现细节,只需知道消息代理的地址即可。ActiveMQ还支持静态发现和动态发现机制...

    ActiveMq-JMS简单实例使用tomcat.pdf

    【ActiveMQ-JMS简单实例使用Tomcat】是一个关于如何在...brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="localhost" useEmbeddedBroker="false"/&gt; ...

    activeMQ集群的使用与配置[收集].pdf

    ActiveMQ集群支持多种不同的方面,包括Queue consumer clusters、Broker clusters和Network of brokers等。 Queue Consumer Clusters ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,...

    ActiveMQ 图文介绍 英文版

    ActiveMQ的Broker架构设计灵活,支持多种类型的连接器和拓扑结构,可以适应不同的网络环境和业务需求。例如: - **集群(Clustering)**:通过主备(Master/Slave)、共享文件系统或共享数据库等方式实现高可用和负载...

    activeMQ集群的使用与配置[归类].pdf

    当一个broker失败时,客户端能够自动连接到其他存活的broker,这通过使用`failover://`协议实现。与`reliable://`协议(在ActiveMQ 3.x版本中使用)相比,`failover://`提供了更稳定的服务恢复。不过,standalone ...

    activemq activeMq笔记

    &lt;property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://192.168.1.145:61616)"/&gt; ``` ### 总结 Apache ActiveMQ 是一款功能强大的消息中间件,支持 JMS 标准并提供了丰富的特性和配置选项。...

    ActiveMq-JMS好用实例详解

    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="localhost" useEmbeddedBroker="false"/&gt; name="jms/NormalConnectionFactory" auth=...

    ActiveMQ集群:网络连接模式(network connector)详解.docx

    ActiveMQ 通过 network connector 实现了分布式队列的目的, broker 实例之间可以共享队列和消费者列表。Network Connector 的配置主要包括两个方面:URI 配置和 networkConnector 配置。 URI 配置:URI 是 ...

    ActiveMQ 5.2指导手册

    ActiveMQ的组件包括代理(Broker)和连接器(Connectors)。代理是ActiveMQ的核心组件,负责接收、路由和分发消息。连接器则是代理与其他系统交互的接口。连接器可以分为低级别连接器和高级别连接器。低级别连接器...

    activeMQ集群的使用与配置.pdf

    在集群配置中,ActiveMQ 提供了两种主要的集群模式:Queue Consumer Clusters 和 Broker Clusters,这两种模式旨在提高系统的可靠性和性能。 1. **Queue Consumer Clusters** 在 Queue Consumer Clusters 中,多个...

Global site tag (gtag.js) - Google Analytics