qpid的client端有可能会由于某种原因与broker断开连接,如网络连接断开,broker的节点删除等等。
公司的项目需要用java编写一个Service,用来监听broker的消息。要求如果与broker断开后,必须尝试重新连接。
那应该完成以下一个步骤:
(1)检测到client与服务器断开。
(2)如果断开,尝试重新连接。
对于(1),javax.jms.Connection对象可以设置一个ExceptionListener对象,用来监听是否发生连接异常。
以下为client端的类
/**
* 该类负责监听MQ的消息。消息的消费者为{@link GCMSender}的一个实例。
*
* @author xiaofei.xu
*/
public class BrokerClient {
/** Logger */
private static final Logger logger = LoggerFactory.getLogger(BrokerClient.class);
/** broker连接 */
private AMQConnection connection;
......
/** 连接异常处理 */
private ExceptionListener exceptionListener;
/**
* 进行初始化
*
* @throws AMQException
* @throws URLSyntaxException
* @throws JMSException
*/
private void init() throws URLSyntaxException, AMQException, JMSException {
logger.info("# init() start");
this.connection = new AMQConnection(this.url);
this.connection.setExceptionListener(this.exceptionListener);
logger.info("# init() end");
}
/**
* 设置连接异常监听
*
* @param exceptionListener
* 异常监听对象
*/
public void setExceptionListener(final ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
}
......
/**
* 判定客户端是否关闭
*
* @return 关闭:true,否则:false
*/
public boolean isStop() {
return this.connection.isClosed();
}
/**
* 关闭服务
*
* @throws JMSException
* @throws NamingException
*/
public void stop() throws JMSException {
logger.info("# stop() start");
if (connection != null) {
connection.close();
}
logger.info("# stop() end");
}
}
如果检测到异常,就关闭客户端。启动Service时启动一个线程,监视连接是否正常,如果被关闭,则重新连接。
public class GCMService {
/** broker客户端名 */
private static final String CN_USER_MANAGER = "USER_MANAGER";
/** 互斥量 */
private Object lock = new Object();
/** 客户端列表 */
private List<BrokerClient> brokerClients;
......
/**
* 增加broker监听客户端
*/
private void addBrokerClient() {
logger.info("# addBrokerClient() start");
......
// 连接异常处理
userManagerBrokerClient.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException ex) {
try {
synchronized (lock) { // 同步锁
for (BrokerClient client : brokerClients) {
if (StringUtils.equals(client.getName(), CN_USER_MANAGER)) {
// 关闭客户端
client.stop();
}
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
brokerClients.add(userManagerBrokerClient);
......
logger.info("# addBrokerClient() end");
}
/**
* 启动GCM服务
*/
public void start() {
......
// 创建一个线程,监听连接是否正常
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
// 连接检查间隔时间
try {
Thread.sleep(config.getConnectionCheckInterval());
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
// 同步锁
for (BrokerClient client : brokerClients) {
if (client.isStop()) {
// 如果连接被关闭,则重新建立连接
logger.info(MessageFormat.format("The {0} client was stoped.",
new Object[] { client.getName() }));
boolean isSucceed = false;
while (!isSucceed) {
try {
logger.info("Six seconds later, try to reconnect.");
// 间隔6秒
Thread.sleep(6000);
// 开始
client.start();
isSucceed = true;
logger.info("Succeed to reconnect.");
} catch (Exception ex) {
logger.error("Failed to reconnect.", ex);
}
}
}
}
}
}
}}, "connection_check").start();
logger.info("Gcm service starts successfully.");
} catch (Exception e) {
logger.error("Gcm service failed to start.", e);
}
logger.info("# start() end");
}
/**
* 服务主入口。
*
* @param args
*/
public static void main(String[] args) {
GCMService gcm = GCMService.getInstance();
gcm.start();
}
}
分享到:
相关推荐
- **断开连接**: 当客户端完成其任务后,会断开与broker的连接。 **4. 使用Moquette** 要使用Moquette,开发者需要: - **下载和安装**: 获取Moquette的jar包,如`moquette-io-moquette-4b1a856.jar`。 - **配置**:...
C# MQTT客户端与服务端(broker)是网络通信领域中的一种常见实现,主要应用于物联网(IoT)场景,因为MQTT(Message Queuing Telemetry Transport)协议设计时考虑了低带宽、高延迟以及不可靠的网络环境。...
7. **断线重连**:M2Mqtt库支持自动重连功能,可以在连接断开后尝试重新连接到Broker。 将这个源码集成到你的工程中,你需要按照以下步骤操作: 1. 解压下载的"paho.mqtt.m2mqtt-master"压缩包。 2. 将解压后的源...
在`onHide`或`onUnload`时,调用`client.disconnect()`断开连接,以节省资源。 ```javascript App({ onLaunch: function () { let client = MQTT.create(...) client.on('connect', () => { console.log('已...
- MQTT客户端还会保存会话状态,确保在重新连接后能够接收到未接收到的消息。 6. **清理连接**: - 当不再需要使用MQTT客户端时,应调用`disconnect()`方法断开连接,释放资源。 在实际应用中,`org.eclipse....
2. `mqtt_service.dart`文件:封装了与MQTT相关的操作,如连接、断开、订阅和发布,方便其他页面调用。 3. `mqtt_connection_bloc.dart`和`mqtt_connection_event.dart`:使用Bloc状态管理库来处理MQTT连接状态的...
6. **断开连接**:当不再需要与Broker通信时,调用`disconnect`方法。 ```javascript client.disconnect(); ``` JavaScript结合MQTT和WebSocket可以创建强大的实时Web应用程序,尤其是在IoT场景中,允许网页直接...
- **断开连接**:完成工作后,调用`disconnect()`方法关闭连接。 3. **MQTTDemo项目可能包含的内容** - `MqttClient`的实例化和配置。 - 连接到MQTT broker的代码。 - 发布消息的示例,可能包括不同QoS级别的...
如果设置为false,则保留会话状态,即使客户端断开连接,当重新连接时,可以继续处理之前未完成的消息。 9. **2FI5, explanationw2s, mqtt, region4vv**:这些标签可能是特定项目、团队代码或关键词,但在这里没有...
7. **离线消息**: 当客户端断开连接时,服务器可以存储订阅主题的未读消息,待客户端重新连接后发送。 **TT3-master** - 这个文件名可能是某个MQTT客户端项目或库的版本标识。具体来说,"TT3"可能代表项目或模块的...
1. **建立连接**:使用MQTTClient提供的API创建与MQTT代理(Broker)的连接。 2. **订阅主题**:连接成功后,可以订阅感兴趣的MQTT主题。 3. **发布消息**:向订阅的主题发布消息。 4. **消息处理**:接收和处理从...
当完成 MQTT 会话后,使用`MQTTDisconnect()`函数安全地断开与broker的连接。 在实际应用中,你还需要考虑以下几点: - **错误处理**:对所有网络操作进行错误检查,并妥善处理异常情况。 - **重连机制**:当连接...
确保在连接断开后能够重新连接,可以设置重连策略。 6. **错误处理** 需要捕获和处理可能出现的连接错误,例如网络问题、认证失败等。可以通过`QMqttClient`的`errorOccurred`信号进行处理。 7. **运行和调试** ...
`paho-mqtt-c`库支持自动重连机制,当客户端与broker的连接中断后,它会尝试重新建立连接,确保服务的连续性。这对于网络不稳定或者设备间歇性断网的场景尤为重要。 SSL/TLS(Secure Sockets Layer/Transport Layer...
在完成所有操作后,记得断开与Broker的连接。 在实际应用中,Eclipse Paho MQTT客户端库常用于远程设备监控、传感器数据收集、实时数据分析等多种物联网场景。其轻量级的特性使得它尤其适合资源有限的嵌入式设备。 ...
- **断开连接**:完成调试后,断开与Broker的连接。 5. **安全考虑** 在使用MQTT时,必须考虑到安全性。这包括使用SSL/TLS加密连接,保护敏感信息,以及对客户端身份进行验证。此外,谨慎设计和管理主题,防止...
当连接中断时,应有重连机制,比如使用心跳包维持连接状态,并在断开后尝试重新连接。 - **主题订阅与发布**:客户端通过订阅主题来筛选接收的消息,主题可以是多级结构,用“/”分隔。发布消息时,客户端指定主题...
3. **连接与断开**:客户端通过TCP/IP连接到MQTT服务器(也称为Broker),进行身份验证和心跳检测,保持连接状态。断开时,客户端可以发送DISCONNECT报文通知Broker。 在“MQTT_Client”源代码中,我们可以期待以下...
为了保持连接状态,开发者需要处理连接断开的情况,例如通过`OnDisconnected`事件,并在适当的时候重新连接。还可以设置心跳间隔(KeepAlive)以确保连接的活性。 ```delphi MQTTClient1.KeepAlive := 60; // 设置...