`

使用阿里云消息队列

    博客分类:
  • Java
阅读更多

使用阿里云消息队列

控制台地址:http://ons.console.aliyun.com/#/home/topic

 

(1)生成Producer ID

点击"申请发布"

 示例代码:

package com.alibaba.ons.demo;

import java.util.Properties;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

public class ProducerClient {

    public static void main(String[] args) {
       Properties properties = new Properties();
       properties.put(PropertyKeyConst.ProducerId, "PID_whuang");
       properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
       properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
       Producer producer = ONSFactory.createProducer(properties);
           
       //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
       producer.start();
       Message msg = new Message(
            //Message Topic
            "com_hbjltv", 
            //Message Tag,
            //可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤        
            "TagA",
            //Message Body
            //任何二进制形式的数据,ONS不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式
            "Hello ONS".getBytes()
        );
        
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过ONS Console查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey("ORDERID_100");
        
        //发送消息,只要不抛异常就是成功
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);

        // 在应用退出前,销毁Producer对象
        // 注意:如果不销毁也没有问题
        producer.shutdown();
    }
}

 

 

 

(2)生成Consumer ID

点击"申请订阅"

 示例代码:

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");
        properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
        properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("com_hbjltv", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}   

 

(3) clientId 的限制

阿里云消息队列对clientId的名称有严格限制:

(a)必须以申请的Consumer ID 开头,后面跟@@@,接着跟用于区分客户端的标志,

例如:

CID_tv_mobile@@@86458fd 是合法的

CID_tv_mobile@@86458fd 是非法的,因为只有两个@

(b)总长度不能超过23个字符

例如

CID_tv_mobile@@@86458_A是合法的

CID_tv_mobile@@@86458_Ab是非法的,因为超过了23个字符



 

 

(4)在手机端(客户端)增加订阅逻辑

 

package com.service;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.os.IBinder;

import com.common.util.SystemHWUtil;
import com.dict.Constants3;
import com.jianli.R;
import com.push.PushCallback;
import com.string.widget.util.ValueWidget;
import com.util.MacSignature;
import com.util.ShopUtil;

/**
 * @author Dominik Obermaier
 */
public class MQTTService extends Service {

	// public static final String BROKER_URL =
	// "tcp://broker.mqttdashboard.com:1883";
	// public static String BROKER_URL = "tcp://172.16.15.50:1883";
	public static String BROKER_URL_FORMAT = "tcp://%s:%s";
	// public static final String BROKER_URL = "tcp://test.mosquitto.org:1883";

	/*
	 * In a real application, you should get an Unique Client ID of the device
	 * and use this, see
	 * http://android-developers.blogspot.de/2011/03/identifying
	 * -app-installations.html
	 */
	public static String clientId = null;

	/**
	 * 不能含有英文句点,可以包含下划线
	 */
	public static final String TOPIC = "com_hbjltv";
	private MqttClient mqttClient;
//	private String ip="182.92.80.122";
	/***
	 * 是否连接上activeMQ
	 */
	private boolean online = false;
	boolean isAliyun=false;

	public IBinder onBind(Intent intent) {
		return null;
	}

	@Override
	public void onCreate() {
		super.onCreate();
	}

	private MqttClient createMqttClient(String serverURL, String clientId) throws MqttException{
		return new MqttClient(serverURL, clientId,
				new MemoryPersistence());
	}
	/***
	 * 
	 * @param serverURL
	 * @param clientId
	 *            : 最大长度:23
	 * @param isAllowOffline
	 * @param username
	 * @param password
	 * @throws MqttException
	 */
	private void connectAndSubscribe(String serverURL, String clientId,
	/* String topicFilter, */boolean isAllowOffline, String username,
			String password) throws MqttException {
		
		if(isAliyun){
			if(!ShopUtil.validateClientId(getApplicationContext(), clientId)){
				return;
			}
		}
		mqttClient = createMqttClient(serverURL, clientId);
		MqttConnectOptions options = new MqttConnectOptions();
		options.setCleanSession(!isAllowOffline);// mqtt receive offline message
		if (ValueWidget.isNullOrEmpty(username)) {
			username = null;
		}
		String sign=null;
		if(isAliyun){
        	try {
				sign = MacSignature.macSignature(Constants3.CONSUMER_ID_TV, password);
				password=sign;
			} catch (InvalidKeyException e) {
				e.printStackTrace();
			} catch (NoSuchAlgorithmException e) {
				e.printStackTrace();
			}
        	
        }
		if (ValueWidget.isNullOrEmpty(password)) {
			password = null;
		} else {
			options.setPassword(password.toCharArray());
		}
		options.setUserName(username);
		options.setConnectionTimeout(10);
		options.setKeepAliveInterval(10);
		if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
			mqttClient = createMqttClient(serverURL, clientId);
		}
		mqttClient.setCallback(new PushCallback(this));
		boolean isSuccess=false;
		mqttClient.connect(options);
		isSuccess=true;
		// Subscribe to all subtopics of homeautomation
		// mqttClient.subscribe(topicFilter);
		if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
			mqttClient = createMqttClient(serverURL, clientId);
		}
		if(isAliyun){
    		final String p2ptopic = TOPIC+"/p2p/";
    		//同时订阅两个topic,一个是基于标准mqtt协议的发布订阅模式,一个是扩展的点对点推送模式
    		final String[] topicFilters=new String[]{TOPIC,p2ptopic};
    		mqttClient.subscribe(topicFilters);
    	}else{
    		mqttClient.subscribe(new String[] { TOPIC, clientId });
		}
	}

	@Override
	public void onStart(Intent intent, int startId) {
		final boolean isRestart=intent.getBooleanExtra("isRestart", false);
		ShopUtil.logger2("restart MQTT service:"+isRestart);
		// super.onStart(intent, startId);
//		if (intent == null) {//重启服务时intent 确实为空
		
//			Log.d(Constants.LOG_TAG, "intent is null");
//			return;
//		}
		Context context = getApplicationContext();
		clientId = ShopUtil.getIMEI(context);
		// Bundle bundle=intent.getExtras();
		// String ip=bundle.getString(Constants.ACTIVEMQ_IP);
//		final String ip = context.getString(R.string.pushserver_ip);
		SharedPreferences preferences = getApplicationContext()
				.getSharedPreferences(Constants3.SHAREDPREFERENCES_NAME,
						Context.MODE_PRIVATE);
		final String ip ="mqtt.ons.aliyun.com";// preferences.getString("pushserver_ip", context.getString(R.string.pushserver_ip));
		final String port = preferences.getString("pushserver_port", "1883");
		isAliyun=SystemHWUtil.parse2Boolean(preferences.getString("is_aliyun_mq_ONS", "false"));
		// String topic=bundle.getString(Constants.ACTIVEMQ_TOPIC);
		 System.out.println("push ip:"+ip);
		new Thread(new Runnable() {
			/****
			 * 尝试连接的次数,为什么要尝试连接几次那?
			 * (1)无wifi时启动,则肯定连接失败,所以尝试连接三次,只要在这个期间启动wifi就可以连接上activeMQ;<br />
			 * (2)之前连接上,然后断开wifi,然后又启动wifi,<br />
			 * 这时容易报 "Broker unavailable"异常,暂时不清楚原因,所以也需要继续尝试连接;<br />
			 * 
			 */
			private int tryTime = 5;

			@Override
			public void run() {
				System.out.println(tryTime+","+mqttClient+","+isOnline() );
				while (tryTime > 0
						&& (!isOnline() || mqttClient == null || (!mqttClient
								.isConnected())||isRestart)) {
					try {
						ShopUtil.logger2("start push service");
						ShopUtil.logger2("push server:"+ip);
						String prefix=Constants3.CONSUMER_ID_TV+"@@@";
						int remainingLength=23-prefix.length();
						String suffix=null;
						if(clientId.length()>remainingLength){
							suffix=clientId.substring(0,remainingLength);
						}else{
							suffix=clientId;
						}
						String clientId2=prefix+suffix;
						connectAndSubscribe(String.format(
								MQTTService.BROKER_URL_FORMAT, ip, port),
								clientId2, /* topic, */true, ""/*自己申请的access key*/, ""/*secret*/);
						 ShopUtil.logger2("clientId:" + clientId2);
						ShopUtil.logger2("succeed to connect to activeMQ");
						setOnline(true);
					} catch (MqttException e) {
						setOnline(false);
						mqttClient=null;
						 e.printStackTrace();
						ShopUtil.logger2("抛异常:"+e.getMessage());
						ShopUtil.logger2("ip:" + ip + " ,port:" + port);
						try {
							Thread.sleep(10000);
						} catch (InterruptedException e1) {
							e1.printStackTrace();
						}
					}
					tryTime--;
				}

			}
		}).start();
//		new Thread(new Runnable() {
//			@Override
//			public void run() {
//				System.out.println("start:"+System.currentTimeMillis());
//				try {
//					Thread.sleep(10000);
//				} catch (InterruptedException e) {
//					e.printStackTrace();
//				}
//				while(true){
//					try {
//						Thread.sleep(10000);
//						if(mqttClient!=null&& !mqttClient.isConnected()){
//							System.out.println("disConnected:"+System.currentTimeMillis());
//						}
//					} catch (InterruptedException e) {
//						e.printStackTrace();
//					}
//				}
//			}
//		}).start();
	}

	@Override
	public void onDestroy() {
		setOnline(false);
	     
		try {
			ShopUtil.logger2("MQTTService destory");
			mqttClient.disconnect(0);
		} catch (MqttException e) {
//			Toast.makeText(getApplicationContext(),
//					"Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG)
//					.show();
			e.printStackTrace();
			
		}
		mqttClient = null;
		stopForeground(true);
		Intent intent = new Intent("com.dbjtech.waiqin.destroy");  
	    sendBroadcast(intent); 
	}

	

	public boolean isOnline() {
		return online;
	}

	public void setOnline(boolean online) {
		this.online = online;
	}
	@Override
	public int onStartCommand(Intent intent, int flags, int startId) {
		flags = START_STICKY;
		return super.onStartCommand(intent, flags, startId);
	}
}

 

源代码见githb:

https://github.com/whuanghkl/mqtt_client_swing.git

阿里云消息队列MQ 开发者手册见附件

  • 大小: 28.2 KB
  • 大小: 19 KB
  • 大小: 17.7 KB
  • 大小: 267.3 KB
0
1
分享到:
评论

相关推荐

    C#版发布和订阅阿里云消息队列(Mqtt接入方式)

    在C#环境中,开发者可以使用MQTT库来实现与阿里云消息队列的交互,进行数据的发布和订阅。下面将详细介绍如何使用C#进行阿里云MQTT的接入,以及DXApp_MqttDemo项目可能包含的内容。 1. **注册阿里云账号和创建MQTT...

    阿里云消息队列MQ

    要使用阿里云消息队列MQ,开发者需要在项目中引入相应的Maven依赖,并在阿里云控制台上创建和管理Topic、ProducerId、ConsumerId、AccessKey和SecretKey。此外,确保订阅一致性,即所有属于同一Consumer ID的实例...

    阿里云 专有云企业版 V3.8.1 消息队列 RocketMQ 版 开发指南 20200529

    "阿里云 专有云企业版 V3.8.1 消息队列 RocketMQ 版 开发指南 ...该指南为企业客户提供了关于使用阿里云消息队列服务的详细指南,旨在帮助开发者正确地使用阿里云的消息队列服务,以提高应用程序的稳定性和性能。

    阿里云 专有云企业版 V3.6.0 消息队列MQ专业版 用户指南 - 20180824.pdf

    阿里云专有云企业版V3.6.0消息队列MQ专业版用户指南是阿里云官方发布的一份用户指南,旨在帮助用户快速了解和使用阿里云消息队列MQ专业版产品。该指南涵盖了产品的基本概念、功能特点、使用指南、常见问题解答等多...

    阿里云rocketmq消息队列对接demo

    该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。

    阿里云消息服务(MNS)API文档

    阿里云消息服务(MNS)是阿里云提供的一种分布式消息通知服务,该服务高效、可靠、安全、便捷,并且具备可弹性扩展的特性。MNS旨在帮助应用开发者在分布式组件之间传递数据,从而构建松耦合系统。该服务保证消息至少会...

    阿里云-消息队列-快速入门.pdf

    通过以上步骤,用户可以快速地了解和使用阿里云消息队列服务,实现消息的高效传输和应用间的解耦。对于更复杂的需求和高级特性,如消息分发策略、消息回溯、消息过滤等,用户可以进一步探索阿里云MQ的详细文档和最佳...

    阿里云 专有云企业版 V3.6.0 消息队列MQ专业版 产品简介 - 20180824.pdf

    阿里云专有云企业版V3.6.0消息...本文档旨在为用户提供一个综合的消息队列解决方案,但同时也强调了用户在使用阿里云消息队列MQ专业版时,应当遵守相关的法律规定、使用注意事项和安全规定,以避免可能的风险和损失。

    阿里云-消息队列-API指南-D.docx

    阿里云消息队列API指南是针对开发者如何使用阿里云消息队列服务进行应用程序集成的一份详细文档。消息队列在IT行业中广泛应用于分布式系统、微服务架构和异步任务处理,以实现高并发、解耦合和可靠的通信。 1. **...

    阿里云 专有云企业版 V3.6.0 消息队列MQ铂金版 运维指南 - 20180824.pdf

    本文档是阿里云专有云企业版V3.6.0消息队列MQ铂金版运维指南,旨在为用户提供消息队列MQ铂金版运维指南,帮助用户更好地使用阿里云消息队列MQ铂金版产品。 法律声明 在阅读或使用本文档之前,用户务必仔细阅读、...

    阿里云消息队列(MQS)用户指南

    阿里MQS的官方介绍文档,简单说明了阿里MQS的注册、使用、开发

    阿里云 专有云企业版 V3.8.0 消息队列 MQ 运维指南 20190621.pdf

    这份运维指南是企业用户有效管理和使用阿里云消息队列服务的重要参考资料,它确保用户能在遵守法规的同时,充分利用服务实现高效、可靠的通信。在实际运维过程中,用户应结合指南中的信息和阿里云提供的其他资源,以...

    阿里云 专有云企业版 V3.8.1 消息队列 MQ 产品简介 20190916

    "阿里云 专有云企业版 V3.8.1 消息队列 MQ 产品简介 20190916" 阿里云 专有云企业版 V3.8.1 消息队列 MQ 产品简介 20190916 是一款基于云计算平台的消息队列产品,旨在提供高效、可靠、安全的消息队列服务。该产品...

    C语言调用阿里云消息队列.doc

    阿里云消息队列MQTT协议调用Paho C接口 本文将详细介绍阿里云消息队列MQTT协议的调用Paho C接口,包括MQTT协议的简介、资源类、Parent Topic、Subtopic、Client ID、Username、Password、ServerUrl、QoS、clean...

    阿里云消息队列服务(MQS) ——入门指南

    阿里云消息队列服务(MQS) ——入门指南 带目录 MQS 是一种高效、可靠、安全、便捷、可弹性扩展的分布式消息队列服务。MQS 能够帮助应用开发者在他们应用的分布式组件上自由的传递数据,构建松耦合系统。

    阿里云消息队列kafka demo

    阿里云消息队列Kafka Demo是一个基于Scala编程语言实现的应用示例,旨在帮助开发者了解如何在阿里云环境中使用Kafka进行消息生产和消费。Kafka是一种分布式流处理平台,常用于构建实时数据管道和流应用,它能够高效...

    浅谈使用java实现阿里云消息队列简单封装

    浅谈使用java实现阿里云消息队列简单封装 本文主要介绍了使用Java实现阿里云消息队列的简单封装,包括对阿里云消息队列的介绍、设计方案、消息发送和接收的实现等。 一、阿里云消息队列简介 阿里云提供了两种消息...

    基于51单片机采集环境数据并使用MQTT协议上传至阿里云,阿里云使用AMQP消息队列下发至业务服务器,在前端页面展示数据

    【作品名称】:基于51单片机采集环境数据并使用MQTT协议上传至阿里云,阿里云使用AMQP消息队列下发至业务服务器,在前端页面展示数据 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目...

    阿里云 专有云Enterprise版 消息队列MQ铂金版 V3.3.0 运维指南 20180312.pdf

    7. 阿里云专有云Enterprise版消息队列MQ铂金版V3.3.0的使用限制:该部分内容详细介绍了阿里云专有云Enterprise版消息队列MQ铂金版V3.3.0的使用限制,包括用户使用限制、保密义务、知识产权保护等内容。 8. 阿里云...

Global site tag (gtag.js) - Google Analytics