`
longgangbai
  • 浏览: 7355792 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

阅读更多

使用IBM MQTTv3实现相关的发布订阅功能

MQTTv3的发布消息的实现:

package com.etrip.mqttv3;

import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;
/**
 * MQTTV3的发布消息类
 * 
 * @author longgangbai
 */
public class MQTTPub { 
	public static void doTest(){ 
		try { 
			MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub"); 
			MqttTopic topic = client.getTopic("tokudu/china"); 
			MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes()); 
			message.setQos(1); 
			client.connect();
			while(true){
				MqttDeliveryToken token = topic.publish(message); 
				while (!token.isComplete()){ 
					token.waitForCompletion(1000); 
				} 
			}
		} catch (Exception e) { 
			e.printStackTrace(); 
		} 
	} 
} 




 MQTTV3的订阅消息类

package com.etrip.mqttv3;
import com.ibm.micro.client.mqttv3.MqttClient;
import com.ibm.micro.client.mqttv3.MqttConnectOptions;
/**
 * MQTTV3的订阅消息类
 * 
 * @author longgangbai
 */
public class MQTTSubsribe { 
	public static String doTest() { 
		try { 
			//创建MqttClient
			MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000"); 
			//回调处理类
			CallBack callback = new CallBack(); 
			client.setCallback(callback); 
			//创建连接可选项信息
			MqttConnectOptions conOptions = new MqttConnectOptions(); 
			//
			conOptions.setCleanSession(false); 
			//连接broker
			client.connect(conOptions); 
			//发布相关的订阅
			client.subscribe("tokudu/china", 1); 
			//client.disconnect(); 
		} catch (Exception e) { 
			e.printStackTrace(); 
			return "failed"; 
		} 
		return "success"; 
	} 
} 




 回调处理类处理订阅的消息类

 

package com.etrip.mqttv3;

import com.ibm.micro.client.mqttv3.MqttCallback;
import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
import com.ibm.micro.client.mqttv3.MqttMessage;
import com.ibm.micro.client.mqttv3.MqttTopic;
/**
 * 回调处理类
 * 处理订阅的消息类
 * 
 * @author longgangbai
 */
public class CallBack implements MqttCallback { 
	
	public CallBack() { 
	} 
	/**
	 * 接收到信息的处理
	 */
	public void messageArrived(MqttTopic topic, MqttMessage message) { 
		try { 
			System.out.println(" MQTTSubsribe  message.toString()"+message.toString());
		} catch (Exception e) { 
			e.printStackTrace(); 
		} 
	} 
	public void connectionLost(Throwable cause) {
		
	} 
	public void deliveryComplete(MqttDeliveryToken token) {
		
	} 
} 

 

 

测试类:

package com.etrip.mqttv3;
/**
 * MQTTV3的测试类
 * 
 * @author longgangbai
 */
public class MQTTMain {
	public static void main(String[] args) {
		//订阅消息的方法
		MQTTSubsribe.doTest();
		//发布消息的类
		MQTTPub.doTest();
		
	}
}

 

 

分享到:
评论
4 楼 u012724947 2015-04-16  
我可以连接  为什么发消息就失败
3 楼 ghris 2012-11-26  
longgangbai 写道
ghris 写道
请问mqttv3这个包在哪里能下载,找不到好郁闷。。。
我邮箱是ghris001@163.com你有的话能发给我一份吗

我在文章中刚才添加相关的jar文件,请下载。

谢谢 提供下载
2 楼 longgangbai 2012-11-26  
ghris 写道
请问mqttv3这个包在哪里能下载,找不到好郁闷。。。
我邮箱是ghris001@163.com你有的话能发给我一份吗

我在文章中刚才添加相关的jar文件,请下载。
1 楼 ghris 2012-11-23  
请问mqttv3这个包在哪里能下载,找不到好郁闷。。。
我邮箱是ghris001@163.com你有的话能发给我一份吗

相关推荐

Global site tag (gtag.js) - Google Analytics