最近一个项目要用到ActiveMq,并且需要最大程度的保证消息不丢失。以前对activeMq不是很熟悉,完全是摸着石头过河,目前基本配置都搞定了。只是对于它的自动重连一直找不到好的解决办法,我希望的效果是当一个broker(假设只有这一个,没有备用的)如果异常down掉的话,那么监听程序能够等待broker重启后再自动重新连接。看了它的文档似乎 设置一下failover:(tcp://localhost:61616) 就可以了
Failover Transport
Failover Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。 Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN
Transport Options的可选值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
backup false initialize and hold a second transport connection - to enable fast failover
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
2.2.4 Discovery transport
Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:
discovery:(discoveryAgentURI)?transportOptions
discovery:discoveryAgentURI
Transport Options的可选值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
例如:discovery:(multicast://default)?initialReconnectDelay=100
为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:
Xml代码
1. <broker name="foo">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
4. </transportConnectors>
5. ...
6. </broker>
在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个 broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker 的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限 制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。
在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:
Java代码
1. (ActiveMQConnection)connection).addTransportListener(new TransportListener() {
2. public void onCommand(Object cmd) {
3. }
4.
5. public void onException(IOException exp) {
6. }
7.
8. public void transportInterupted() {
9. // The transport has suffered an interruption from which it hopes to recover.
10. }
11.
12. public void transportResumed() {
13. // The transport has resumed after an interruption.
14. }
15. });
---------------------
我的做法是 不要在服务器端设置,而在本地设置:failover:(tcp://192.168.0.245:61616?wireFormat.maxInactivityDuration=0)
分享到:
相关推荐
本资源提供的内容是关于ActiveMQ的连接池实现,分为两部分:一是作者自己实现的ActiveMQ连接池,二是新版本ActiveMQ自带的连接池。连接池是一种资源管理技术,通过复用已建立的数据库连接或网络连接,减少创建和销毁...
本文将详细介绍如何实现一个ActiveMQ连接池的完整封装实例工具类,并探讨其背后的设计思想。 首先,我们需要了解JMS(Java Message Service)接口,它是Java平台中用于创建、发送、接收和读取消息的标准API。...
ActiveMQ支持多种持久化机制,包括KahaDB和JDBC,可以根据需求选择合适的存储方式。 总之,`mqttjs`作为ActiveMQ的测试工具,可以帮助开发者轻松创建MQTT客户端,进行各种消息交互测试。结合ActiveMQ的丰富功能和可...
接下来,配置ActiveMQ连接。在`application.properties`或`application.yml`中,定义ActiveMQ服务器的URL和其他参数: ```properties spring.jms aktivemq.broker-url=tcp://localhost:61616 spring.jms aktivemq....
在C#中使用ActiveMQ,首先需要创建连接工厂(ConnectionFactory),这是建立到ActiveMQ服务器连接的关键。然后,通过工厂创建一个连接(Connection),并设置相关的连接属性,如主机地址、端口、用户名和密码。接着...
- **连接断开的原因**:在ActiveMQ的Failover模式中,默认情况下,当检测到连接失败时,会触发一个重新连接的任务。如果该任务被标记为守护线程(daemon thread),则当主线程结束时,守护线程也将随之结束,这可能...
这个插件可能扩展了ActiveMQ的内置安全机制,如AccessControl或JAAS(Java Authentication and Authorization Service)配置,以支持IP或特征码的检查。 要使用这样的插件,你需要在ActiveMQ的配置文件(通常是`...
ACK机制是指ActiveMQ对消息的ACK机制,即当消费者消费消息时,需要向ActiveMQ发送ACK确认消息。ACK机制可以确保消息的可靠性和一致性。 场景分析 在使用ACK机制时,需要对系统进行场景分析,以确定系统的需求和...
在提供的Java代码实例中,我们看到如何创建一个连接到ActiveMQ服务器的生产者,并发送持久化消息到一个名为"toov5_queue"的队列。`Session.AUTO_ACKNOWLEDGE`设置意味着消费者在接收到消息后会自动确认,无需显式...
例如,ActiveMQ允许设置WebSocket连接的心跳间隔,以确保连接的活跃性。另外,如果WebSocket连接意外断开,客户端需要有一个重连策略,以恢复与ActiveMQ的通信。 至于压缩包文件"activemq_ws_接收消息",其中可能...
ActiveMQ使用了一种称为“预取策略”的机制来决定向消费者发送多少条消息。默认情况下,每个消费者的预取数量为1000条,这意味着在没有特别配置的情况下,消费者将预取最多1000条消息到本地缓冲区中。当一个消费者...
在ActiveMQ服务器中添加简单的验证机制是非常重要的,因为它可以确保只有授权的客户端能够访问和操作消息队列。默认情况下,ActiveMQ配置并不包含任何安全措施,这意味着如果不进行额外的配置,任何客户端都可能连接...
ActiveMQ支持多种消息存储机制,如KahaDB、JDBC和LevelDB。KahaDB是默认的持久化存储,配置在`<dataDirectory>`标签内: ```xml <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core"> ...
2. 高性能和可扩展性:ActiveMQ采用了高效的存储机制和网络协议,能够处理大量并发连接和高频率的消息交换。通过集群和负载均衡,可以轻松扩展以满足不断增长的需求。 3. 路由和过滤:ActiveMQ提供丰富的消息路由和...
5. **网络连接器**:网络连接器允许在不同的ActiveMQ实例之间创建透明的连接,实现消息路由和代理,提供更灵活的消息传递机制。 6. **管理工具**:ActiveMQ附带了一个基于Web的管理控制台,用户可以通过浏览器直观...
6. ActiveMQ的连接工厂(ConnectionFactory)和连接(Connection)的关闭: - 在生产者或消费者代码的finally块中,可以看到关闭连接的代码。 - 这是为了确保即使在程序异常退出时,也能够释放与ActiveMQ服务器的...
ActiveMQ 通过心跳包检测客户端状态,但长时间发送消息可能导致客户端在等待服务器响应时关闭连接,从而丢失消息。为防止这种情况,可以使用持久化消息、及时处理非持久化消息,或启用事务。事务模式确保消息在...
3. **activemq持久化机制**:activemq通过将消息写入磁盘来实现持久化,即使服务器重启,未消费的消息也能被重新加载。 总结起来,activemq作为强大的消息中间件,提供了一整套解决方案,帮助开发者构建可靠、高效...
在实际应用中,ActiveMQ的配置可能需要根据具体的业务需求进行调整,例如设置消息持久化、网络连接策略、安全性、日志记录等。消息中间件的核心作用在于提供异步处理、解耦系统组件、实现负载均衡和容错机制。...
网络连接器的实现基于ActiveMQ的内部机制,包括消息的路由、复制和消费者管理。消息在网络中传输时,会根据配置的策略决定是否转发,以及如何处理网络故障和恢复。通过适当的配置,可以实现高效且可靠的分布式消息...