ActiveMQ的客户端支持失效重连。昨天在配置ActiveMQ的主从结构后,客户端使用失效重连协议连接到代理,如下所示:
failover:(tcp://master:61616,tcp://slave:61616)
测试的时候客户端有时候连接到master上,有时候连接到slave上。这就尴尬了,正常理解的话,配在前面的broker url应该先尝试连接,连接不上才连接后面的url,即从前到后的顺序,但是测试结果来看,是不一定的。
使用logback打开了ActiveMQ的日志
<logger name="org.apache.activemq" level="debug" additivity="false"> <appender-ref ref="STDOUT"/> </logger>
日志中输出的确有的时候只直接连了slave,而不是每次都先连master,而且master是正常的,那就说明的确客户端是做过处理了,并不是按从前到后的顺序依次连接。
日志中会输出“Successfully connected to slave...”,logback配置中输出了代码行数,因此很快定位到程序,org.apache.activemq.transport.failover.FailoverTransport类的876行(客户端使用的是activemq-all-5.4.3.jar),代码如下
final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
// If updateURIsURL is specified, read the file and add any new
// transport URI's to this FailOverTransport.
// Note: Could track file timestamp to avoid unnecessary reading.
String fileURL = getUpdateURIsURL();
if (fileURL != null) {
BufferedReader in = null;
String newUris = null;
StringBuffer buffer = new StringBuffer();
try {
in = new BufferedReader(getURLStream(fileURL));
while (true) {
String line = in.readLine();
if (line == null) {
break;
}
buffer.append(line);
}
newUris = buffer.toString();
} catch (IOException ioe) {
LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException ioe) {
// ignore
}
}
}
processNewTransports(isRebalanceUpdateURIs(), newUris);
}
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if (connectedTransport.get() != null || disposed || connectionFailure != null) {
return false;
} else {
List<URI> connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
if (!useExponentialBackOff || reconnectDelay == -1) {
reconnectDelay = initialReconnectDelay;
}
synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
BackupTransport bt = backups.remove(0);
Transport t = bt.getTransport();
URI uri = bt.getUri();
t.setTransportListener(myTransportListener);
try {
if (started) {
restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
failedConnectTransportURI = null;
connectedTransportURI = uri;
connectedTransport.set(t);
reconnectMutex.notifyAll();
connectFailures = 0;
LOG.info("Successfully reconnected to backup " + uri);
return false;
} catch (Exception e) {
LOG.debug("Backup transport failed", e);
}
}
}
Iterator<URI> iter = connectList.iterator();
while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
URI uri = iter.next();
Transport t = null;
try {
LOG.debug("Attempting connect to: " + uri);
SslContext.setCurrentSslContext(brokerSslContext);
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
if (started) {
restoreTransport(t);
}
LOG.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(t);
reconnectMutex.notifyAll();
connectFailures = 0;
// Make sure on initial startup, that the
// transportListener
// has been initialized for this instance.
synchronized (listenerMutex) {
if (transportListener == null) {
try {
// if it isn't set after 2secs - it
// probably never will be
listenerMutex.wait(2000);
} catch (InterruptedException ex) {
}
}
}
if (transportListener != null) {
transportListener.transportResumed();
} else {
LOG.debug("transport resumed by transport listener not set");
}
if (firstConnection) {
firstConnection = false;
LOG.info("Successfully connected to " + uri);
} else {
LOG.info("Successfully reconnected to " + uri);
}
connected = true;
return false;
} catch (Exception e) {
failure = e;
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
if (t != null) {
try {
t.stop();
} catch (Exception ee) {
LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
}
}
} finally {
SslContext.setCurrentSslContext(null);
}
}
}
}
int reconnectAttempts = 0;
if (firstConnection) {
if (this.startupMaxReconnectAttempts != 0) {
reconnectAttempts = this.startupMaxReconnectAttempts;
}
}
if (reconnectAttempts == 0) {
reconnectAttempts = this.maxReconnectAttempts;
}
if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has
// been initialized
// for this instance.
synchronized (listenerMutex) {
if (transportListener == null) {
try {
listenerMutex.wait(2000);
} catch (InterruptedException ex) {
}
}
}
if (transportListener != null) {
if (connectionFailure instanceof IOException) {
transportListener.onException((IOException) connectionFailure);
} else {
transportListener.onException(IOExceptionSupport.create(connectionFailure));
}
}
reconnectMutex.notifyAll();
return false;
}
}
if (!disposed) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
synchronized (sleepMutex) {
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
}
}
}
return !disposed;
}
代码List<URI> connectList = getConnectList();获取配置的broker url,打了断点后debug发现,这边获取的List中的URI有时候已经不是配置中的顺序了,进入getConnectList方法
private List<URI> getConnectList() { ArrayList<URI> l = new ArrayList<URI>(uris); boolean removed = false; if (failedConnectTransportURI != null) { removed = l.remove(failedConnectTransportURI); } if (randomize) { // Randomly, reorder the list by random swapping for (int i = 0; i < l.size(); i++) { int p = (int) (Math.random() * 100 % l.size()); URI t = l.get(p); l.set(p, l.get(i)); l.set(i, t); } } if (removed) { l.add(failedConnectTransportURI); } LOG.debug("urlList connectionList:" + l); return l; }
这边有个randomize,默认值为true,即默认这边会对配置的url重新排序,所以导致连接初始化是可能不是按照想象的从前到后的顺序连接,解决也简单,在连接后价格参数,将randomize设为false
failover:(tcp://master:61616,tcp://slave:61616)?randomize=false
这样每次都会先连接master,master连不上才会连slave。
相关推荐
ActiveMQ 中的故障处理连接方式(Failover Transport Options)是指在 Message Queue(MQ)连接的 URI 中配置的一系列参数,用于控制连接的重连、超时、缓存等行为。下面详细介绍这些参数的作用和配置。 ...
spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,tcp://10.0.1.228:61616,tcp://10.0.1.229:61616,mqtt://10.0.1.227:1883,mqtt://10.0.1.228:1883,mqtt://10.0.1.229:1883) #spring.activemq.broker-...
`brokerURL`属性指定了连接到ActiveMQ Broker的URL,`brokerName`标识了Broker的名称,`useEmbeddedBroker`设置为`false`表示不使用内嵌的Broker。 3. **配置JNDI资源**:`<Resource>`标签中的`name`属性定义了...
<property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://192.168.1.145:61616)"/> ``` ### 总结 Apache ActiveMQ 是一款功能强大的消息中间件,支持 JMS 标准并提供了丰富的特性和配置选项。...
- **定义:** 正确配置客户端连接到ActiveMQ Broker的URL。 - **注意事项:** 确保URL中的协议、主机名和端口号正确无误。 以上知识点全面覆盖了ActiveMQ的核心概念、配置细节、集群方式以及监控和使用注意事项等方面...
8. **Spring Boot与ActiveMQ**:在Spring Boot应用中,可以利用 starters 来简化配置,通过`spring.activemq.broker-url`和`spring.activemq.user`等属性快速设置ActiveMQ连接。 9. **消息确认**:ActiveMQ支持两种...
对于客户端来说,ActiveMQ提供了failover://协议,通过该协议可以实现客户端自动重新连接到可用的Broker。客户端不需要了解底层集群的实现细节,只需知道消息代理的地址即可。ActiveMQ还支持静态发现和动态发现机制...
【ActiveMQ-JMS简单实例使用Tomcat】是一个关于如何在...brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="localhost" useEmbeddedBroker="false"/> ...
ActiveMQ集群支持多种不同的方面,包括Queue consumer clusters、Broker clusters和Network of brokers等。 Queue Consumer Clusters ActiveMQ支持订阅同一个queue的consumers上的集群。如果一个consumer失效,...
当一个broker失败时,客户端能够自动连接到其他存活的broker,这通过使用`failover://`协议实现。与`reliable://`协议(在ActiveMQ 3.x版本中使用)相比,`failover://`提供了更稳定的服务恢复。不过,standalone ...
brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="localhost" useEmbeddedBroker="false"/> name="jms/NormalConnectionFactory" auth=...
ActiveMQ 通过 network connector 实现了分布式队列的目的, broker 实例之间可以共享队列和消费者列表。Network Connector 的配置主要包括两个方面:URI 配置和 networkConnector 配置。 URI 配置:URI 是 ...
ActiveMQ的组件包括代理(Broker)和连接器(Connectors)。代理是ActiveMQ的核心组件,负责接收、路由和分发消息。连接器则是代理与其他系统交互的接口。连接器可以分为低级别连接器和高级别连接器。低级别连接器...
在集群配置中,ActiveMQ 提供了两种主要的集群模式:Queue Consumer Clusters 和 Broker Clusters,这两种模式旨在提高系统的可靠性和性能。 1. **Queue Consumer Clusters** 在 Queue Consumer Clusters 中,多个...