`
flying.xu77
  • 浏览: 4684 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

client与broker断开后重新连接

阅读更多
qpid的client端有可能会由于某种原因与broker断开连接,如网络连接断开,broker的节点删除等等。
公司的项目需要用java编写一个Service,用来监听broker的消息。要求如果与broker断开后,必须尝试重新连接。

那应该完成以下一个步骤:
(1)检测到client与服务器断开。
(2)如果断开,尝试重新连接。

对于(1),javax.jms.Connection对象可以设置一个ExceptionListener对象,用来监听是否发生连接异常。

以下为client端的类


/**
 * 该类负责监听MQ的消息。消息的消费者为{@link GCMSender}的一个实例。
 * 
 * @author xiaofei.xu
 */
public class BrokerClient {

	/** Logger */
	private static final Logger logger = LoggerFactory.getLogger(BrokerClient.class);

	/** broker连接 */
	private AMQConnection connection;

	......

	/** 连接异常处理 */
	private ExceptionListener exceptionListener;

	/**
	 * 进行初始化
	 * 
	 * @throws AMQException
	 * @throws URLSyntaxException
	 * @throws JMSException
	 */
	private void init() throws URLSyntaxException, AMQException, JMSException {

		logger.info("# init() start");

		this.connection = new AMQConnection(this.url);
		this.connection.setExceptionListener(this.exceptionListener);

		logger.info("# init() end");
	}

	/**
	 * 设置连接异常监听
	 * 
	 * @param exceptionListener
	 *            异常监听对象
	 */
	public void setExceptionListener(final ExceptionListener exceptionListener) {
		this.exceptionListener = exceptionListener;
	}

	......

	/**
	 * 判定客户端是否关闭
	 * 
	 * @return 关闭:true,否则:false
	 */
	public boolean isStop() {
		return this.connection.isClosed();
	}

	/**
	 * 关闭服务
	 * 
	 * @throws JMSException
	 * @throws NamingException
	 */
	public void stop() throws JMSException {

		logger.info("# stop() start");

		if (connection != null) {
			connection.close();
		}

		logger.info("# stop() end");
	}
}


如果检测到异常,就关闭客户端。启动Service时启动一个线程,监视连接是否正常,如果被关闭,则重新连接。

public class GCMService {

	/** broker客户端名 */
	private static final String CN_USER_MANAGER = "USER_MANAGER";

	/** 互斥量 */
	private Object lock = new Object();

	/** 客户端列表 */
	private List<BrokerClient> brokerClients;

	......

	/**
	 * 增加broker监听客户端
	 */
	private void addBrokerClient() {
		logger.info("# addBrokerClient() start");

		......

		// 连接异常处理
		userManagerBrokerClient.setExceptionListener(new ExceptionListener() {
			@Override
			public void onException(JMSException ex) {
				try {
					synchronized (lock) { // 同步锁
						for (BrokerClient client : brokerClients) {
							if (StringUtils.equals(client.getName(), CN_USER_MANAGER)) {
								// 关闭客户端
								client.stop();
							}
						}
					}
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		brokerClients.add(userManagerBrokerClient);

		......

		logger.info("# addBrokerClient() end");
	}

	/**
	 * 启动GCM服务
	 */
	public void start() {

		......

		// 创建一个线程,监听连接是否正常
		new Thread(new Runnable() {

			@Override
			public void run() {

				while (true) {

					// 连接检查间隔时间
					try {
						Thread.sleep(config.getConnectionCheckInterval());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					synchronized (lock) {
					// 同步锁

						for (BrokerClient client : brokerClients) {

							if (client.isStop()) {
							// 如果连接被关闭,则重新建立连接

							logger.info(MessageFormat.format("The {0} client was stoped.",
							    new Object[] { client.getName() }));

							boolean isSucceed = false;
							while (!isSucceed) {

								try {

									logger.info("Six seconds later, try to reconnect.");

									// 间隔6秒
									Thread.sleep(6000);

									// 开始
									client.start();
									isSucceed = true;

									logger.info("Succeed to reconnect.");

								} catch (Exception ex) {
									logger.error("Failed to reconnect.", ex);
								}
							}
						}
					}
				}
			}
		}}, "connection_check").start();
			logger.info("Gcm service starts successfully.");
		} catch (Exception e) {
			logger.error("Gcm service failed to start.", e);
		}

		logger.info("# start() end");
	}

	/**
	 * 服务主入口。
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		GCMService gcm = GCMService.getInstance();
		gcm.start();
	}
}
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Moquette-JavaMQTT轻量级broker

    - **断开连接**: 当客户端完成其任务后,会断开与broker的连接。 **4. 使用Moquette** 要使用Moquette,开发者需要: - **下载和安装**: 获取Moquette的jar包,如`moquette-io-moquette-4b1a856.jar`。 - **配置**:...

    C# MQTT客户端服务端broker

    C# MQTT客户端与服务端(broker)是网络通信领域中的一种常见实现,主要应用于物联网(IoT)场景,因为MQTT(Message Queuing Telemetry Transport)协议设计时考虑了低带宽、高延迟以及不可靠的网络环境。...

    mqttClient最新的源码

    7. **断线重连**:M2Mqtt库支持自动重连功能,可以在连接断开后尝试重新连接到Broker。 将这个源码集成到你的工程中,你需要按照以下步骤操作: 1. 解压下载的"paho.mqtt.m2mqtt-master"压缩包。 2. 将解压后的源...

    微信小程序连接mqtt.rar

    在`onHide`或`onUnload`时,调用`client.disconnect()`断开连接,以节省资源。 ```javascript App({ onLaunch: function () { let client = MQTT.create(...) client.on('connect', () =&gt; { console.log('已...

    使用org.eclipse.paho.client.mqttv3实现mqtt 消息队列

    - MQTT客户端还会保存会话状态,确保在重新连接后能够接收到未接收到的消息。 6. **清理连接**: - 当不再需要使用MQTT客户端时,应调用`disconnect()`方法断开连接,释放资源。 在实际应用中,`org.eclipse....

    flutter mqtt

    2. `mqtt_service.dart`文件:封装了与MQTT相关的操作,如连接、断开、订阅和发布,方便其他页面调用。 3. `mqtt_connection_bloc.dart`和`mqtt_connection_event.dart`:使用Bloc状态管理库来处理MQTT连接状态的...

    javascript client mqtt websocket

    6. **断开连接**:当不再需要与Broker通信时,调用`disconnect`方法。 ```javascript client.disconnect(); ``` JavaScript结合MQTT和WebSocket可以创建强大的实时Web应用程序,尤其是在IoT场景中,允许网页直接...

    MQTTDemo(Java)

    - **断开连接**:完成工作后,调用`disconnect()`方法关闭连接。 3. **MQTTDemo项目可能包含的内容** - `MqttClient`的实例化和配置。 - 连接到MQTT broker的代码。 - 发布消息的示例,可能包括不同QoS级别的...

    MQTTClient.rar_2FI5_MQTTClient_explanationw2s_mqtt_region4vv

    如果设置为false,则保留会话状态,即使客户端断开连接,当重新连接时,可以继续处理之前未完成的消息。 9. **2FI5, explanationw2s, mqtt, region4vv**:这些标签可能是特定项目、团队代码或关键词,但在这里没有...

    MQTT_Client

    7. **离线消息**: 当客户端断开连接时,服务器可以存储订阅主题的未读消息,待客户端重新连接后发送。 **TT3-master** - 这个文件名可能是某个MQTT客户端项目或库的版本标识。具体来说,"TT3"可能代表项目或模块的...

    IOS-MQTT-Client 编译过程与使用

    1. **建立连接**:使用MQTTClient提供的API创建与MQTT代理(Broker)的连接。 2. **订阅主题**:连接成功后,可以订阅感兴趣的MQTT主题。 3. **发布消息**:向订阅的主题发布消息。 4. **消息处理**:接收和处理从...

    mqtt开发C语言基于paho实现MQTT客户端实战案例

    当完成 MQTT 会话后,使用`MQTTDisconnect()`函数安全地断开与broker的连接。 在实际应用中,你还需要考虑以下几点: - **错误处理**:对所有网络操作进行错误检查,并妥善处理异常情况。 - **重连机制**:当连接...

    Qt Mqtt连接阿里云示例

    确保在连接断开后能够重新连接,可以设置重连策略。 6. **错误处理** 需要捕获和处理可能出现的连接错误,例如网络问题、认证失败等。可以通过`QMqttClient`的`errorOccurred`信号进行处理。 7. **运行和调试** ...

    使用paho-mqtt-c做的mqtt通讯

    `paho-mqtt-c`库支持自动重连机制,当客户端与broker的连接中断后,它会尝试重新建立连接,确保服务的连续性。这对于网络不稳定或者设备间歇性断网的场景尤为重要。 SSL/TLS(Secure Sockets Layer/Transport Layer...

    org.eclipse.paho.client.mqttv3-1.2.0.zip

    在完成所有操作后,记得断开与Broker的连接。 在实际应用中,Eclipse Paho MQTT客户端库常用于远程设备监控、传感器数据收集、实时数据分析等多种物联网场景。其轻量级的特性使得它尤其适合资源有限的嵌入式设备。 ...

    MQTT_Client.rar

    - **断开连接**:完成调试后,断开与Broker的连接。 5. **安全考虑** 在使用MQTT时,必须考虑到安全性。这包括使用SSL/TLS加密连接,保护敏感信息,以及对客户端身份进行验证。此外,谨慎设计和管理主题,防止...

    MQTT协议实现的推送功能

    当连接中断时,应有重连机制,比如使用心跳包维持连接状态,并在断开后尝试重新连接。 - **主题订阅与发布**:客户端通过订阅主题来筛选接收的消息,主题可以是多级结构,用“/”分隔。发布消息时,客户端指定主题...

    MQTT_Client源码.rar

    3. **连接与断开**:客户端通过TCP/IP连接到MQTT服务器(也称为Broker),进行身份验证和心跳检测,保持连接状态。断开时,客户端可以发送DISCONNECT报文通知Broker。 在“MQTT_Client”源代码中,我们可以期待以下...

    delphi MQTT客户端demo

    为了保持连接状态,开发者需要处理连接断开的情况,例如通过`OnDisconnected`事件,并在适当的时候重新连接。还可以设置心跳间隔(KeepAlive)以确保连接的活性。 ```delphi MQTTClient1.KeepAlive := 60; // 设置...

Global site tag (gtag.js) - Google Analytics