
MQTT Study Notes (7): Publishing Messages with HTTP POST (Server-Side Usage)


See the official documentation:

 

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21220_.htm

 

 

HTTP POST puts a message to a queue, or a publication to a topic. The HTTPPOST Java sample is an example of an HTTP POST request that puts a message to a queue. Instead of using Java, you could create the HTTP POST request with a browser form or an AJAX toolkit.

Figure 1 shows an HTTP request to put a message on a queue called myQueue. This request contains the HTTP header x-msg-correlId to set the correlation ID of the WebSphere MQ message.

Figure 1. Example of an HTTP POST request to a queue
POST /msg/queue/myQueue/ HTTP/1.1
Host: www.example.org
Content-Type: text/plain
x-msg-correlID: 1234567890
Content-Length: 50

Here's my message body that will appear on the queue.

Figure 2 shows the response sent back to the client. There is no response content.

Figure 2. Example of an HTTP POST response
HTTP/1.1 200 OK
Date: Wed, 2 Jan 2007 22:38:34 GMT
Server: Apache-Coyote/1.1 WMQ-HTTP/1.1 JEE-Bridge/1.1
Content-Length: 0
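
The request in Figure 1 can also be assembled by hand. The following is a minimal sketch, not part of the IBM sample: the class name `RawPostRequest` and its `build` method are hypothetical, and the body is assumed to be UTF-8 text. It computes Content-Length from the body bytes instead of hard-coding it.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that renders an HTTP POST request like the one in Figure 1. */
final class RawPostRequest {
    private RawPostRequest() {}

    static String build(String path, String host, String correlId, String body) {
        // Content-Length counts bytes, not characters (UTF-8 is an assumption here).
        int length = body.getBytes(StandardCharsets.UTF_8).length;
        return "POST " + path + " HTTP/1.1\r\n"
                + "Host: " + host + "\r\n"
                + "Content-Type: text/plain\r\n"
                + "x-msg-correlId: " + correlId + "\r\n"
                + "Content-Length: " + length + "\r\n"
                + "\r\n"   // the empty line separates the headers from the body
                + body;
    }
}
```

Calling `RawPostRequest.build("/msg/queue/myQueue/", "www.example.org", "1234567890", body)` returns the request text, which could then be written to a plain socket.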

 

Request and response protocol formats

 

The HTTP POST operation puts a message on a WebSphere® MQ queue, or publishes a message to a topic.

Syntax


Request

>>-POST-- --| Path |-- --HTTP version--CRLF--------------------->

   .-CRLF---------------.  .-CRLF---------------.   
   V                    |  V                    |   
>----+----------------+-+----+----------------+-+--------------->
     '-general-header-'      '-request-header-'     

   .-CRLF----------------------------.        .-CRLF----.   
   V                                 |        V         |   
>----+-----------------------------+-+--CRLF----Message-+------><
     '-| entity header (Request) |-'                        

Path

|--/--contextRoot--/-------------------------------------------->

>--msg/--+-queue/--queueName--+-------------+-+--/--------------|
         |                    '-@--qMgrName-' |      
         '-topic/--topicName------------------'      

entity-header (Request)

|--+----------------------------------------------+-------------|
   +-standard entity-header-- --entity-value------+   
   +-x-msg-class-- --message type-----------------+   
   +-x-msg-correlId-- --correlation ID------------+   
   +-x-msg-encoding-- --encoding type-------------+   
   +-x-msg-expiry-- --duration--------------------+   
   +-x-msg-format-- --message format--------------+   
   +-x-msg-msgId-- --message ID-------------------+   
   +-x-msg-persistence-- --persistence------------+   
   +-x-msg-priority-- --priority class------------+   
   +-x-msg-replyTo-- --reply-to queue-------------+   
   +-x-msg-require-headers-- --entity header name-+   
   '-x-msg-usr-- --user properties----------------'   

Note:
  1. If a question mark (?) is used it must be substituted with %3f. For example, orange?topic should be specified as orange%3ftopic.
  2. @qMgrName is only valid on an HTTP POST
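
The Path diagram and the two notes above can be sketched as a small path builder. This is an illustration only: the class `MqHttpPaths` and its method names are hypothetical. It escapes only the question mark, as note 1 requires, and does no other URL encoding.

```java
/** Hypothetical helper that builds request paths following the Path syntax diagram. */
final class MqHttpPaths {
    private MqHttpPaths() {}

    /** Note 1: a literal '?' in a name must be written as %3f. */
    static String escapeQuestionMark(String name) {
        return name.replace("?", "%3f");
    }

    /** Builds /contextRoot/msg/queue/queueName[@qMgrName]/ (qMgrName may be null). */
    static String queuePath(String contextRoot, String queueName, String qMgrName) {
        StringBuilder path = new StringBuilder("/")
                .append(contextRoot)
                .append("/msg/queue/")
                .append(escapeQuestionMark(queueName));
        // Note 2: @qMgrName is only valid on an HTTP POST.
        if (qMgrName != null && !qMgrName.isEmpty()) {
            path.append('@').append(qMgrName);
        }
        return path.append('/').toString();
    }

    /** Builds /contextRoot/msg/topic/topicName/ */
    static String topicPath(String contextRoot, String topicName) {
        return "/" + contextRoot + "/msg/topic/" + escapeQuestionMark(topicName) + "/";
    }
}
```

For example, `MqHttpPaths.topicPath("mq", "orange?topic")` yields `/mq/msg/topic/orange%3ftopic/`.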

Response

>>-HTTP version-- --HTTP Status-Code-- --HTTP Reason-Phrase--CRLF-->

   .-CRLF---------------.  .-CRLF----------------.   
   V                    |  V                     |   
>----+----------------+-+----+-----------------+-+-------------->
     '-general-header-'      '-response-header-'     

   .-CRLF-----------------------------.   
   V                                  |   
>----+------------------------------+-+------------------------><
     '-| entity-header (Response) |-'     

entity-header (Response)

|--+-----------------------------------------+------------------|
   +-standard entity-header-- --entity-value-+   
   +-x-msg-class-- --message type------------+   
   +-x-msg-correlId-- --correlation ID-------+   
   +-x-msg-encoding-- --encoding type--------+   
   +-x-msg-expiry-- --duration---------------+   
   +-x-msg-format-- --message format---------+   
   +-x-msg-msgId-- --message ID--------------+   
   +-x-msg-persistence-- --persistence-------+   
   +-x-msg-priority-- --priority class-------+   
   +-x-msg-replyTo-- --reply-to queue--------+   
   +-x-msg-timestamp-- --HTTP-date-----------+   
   '-x-msg-usr-- --user properties-----------'   
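
On the client side, the x-msg-* entity headers of a response can be picked out of the header map that `HttpURLConnection.getHeaderFields()` returns. The following is a rough sketch, assuming only the first value of each header matters; the class `MqResponseHeaders` is a hypothetical name.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that keeps only the x-msg-* headers of a response. */
final class MqResponseHeaders {
    private MqResponseHeaders() {}

    static Map<String, String> extract(Map<String, List<String>> headers) {
        // HTTP header names are case-insensitive, so the result map is too.
        Map<String, String> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
            String name = entry.getKey();
            if (name == null) {
                continue; // getHeaderFields() stores the status line under a null key
            }
            List<String> values = entry.getValue();
            if (name.toLowerCase(Locale.ROOT).startsWith("x-msg-")
                    && values != null && !values.isEmpty()) {
                result.put(name, values.get(0));
            }
        }
        return result;
    }
}
```

With an open connection, `MqResponseHeaders.extract(connection.getHeaderFields())` returns the message headers that the server sent back, if any.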

The HTTP POST implementation is as follows:
package com.etrip.mqttv3.http;



/**
 * This sample shows how to post a message. It has the same behaviour as the
 * amqsput command in that it will read in lines from the command line and put
 * them to the queue. It will put non-persistent String messages on to the queue
 * with UNLIMITED expiry and LOW (0) priority. The program is terminated by
 * either EOF being put into the entry line (^Z on windows) or a blank line.
 * usage: java HTTPPOST <Queue (default=SYSTEM.DEFAULT.LOCAL.QUEUE)> <host:port
 * (default localhost:8080)> <context-root (the MQ Bridge for HTTP's
 * context-root)>
 */

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
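/**
 * Publishes messages using HTTP POST.
 *     The HTTP POST operation puts a message on a WebSphere® MQ queue, or publishes
 *  a message to a topic.
 *
 *  Paths for putting a message to a queue or publishing to a topic:
 *    /contextRoot/msg/queue/queueName
 *    /contextRoot/msg/topic/topicName
 *
 * @author longgangbai
 */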
public class HTTPPOST
{
   private static final String DEFAULT_HOST = "localhost";
   private static final String DEFAULT_PORT = "8080";
   private static final String DEFAULT_QUEUE = "SYSTEM.DEFAULT.LOCAL.QUEUE";

   private static final String DEFAULT_CONTEXT_ROOT = "mq";

   private static final String CRLF = "\r\n";

   public static int MALFORMED_URL_EXCEPTION_RC = -1;

   public static int END_IOEXCEPTION_RC = -2;

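   /**
    * Returns true if the value is null or blank.
    */
	private static boolean isEmpty(String value) {
		return value == null || value.trim().isEmpty();
	}

   /**
    * Builds the URL used to post a message to a queue.
    * Any argument that is null or blank falls back to its default.
    * 
    * @param host      host of the MQ Bridge for HTTP
    * @param port      port of the MQ Bridge for HTTP
    * @param context   context root of the MQ Bridge for HTTP
    * @param queueName name of the target queue
    * @return the URL string, e.g. http://localhost:8080/mq/msg/queue/SYSTEM.DEFAULT.LOCAL.QUEUE
    */
	private static String getPublishQueueURL(String host, String port,
			String context, String queueName) {
		StringBuilder urlString = new StringBuilder("http://");
		   if(isEmpty(host)){
			   host=DEFAULT_HOST;
		   }
		   
		   if(isEmpty(port)){
			   port=DEFAULT_PORT;
		   }
		   urlString.append(host).append(":").append(port);
		   if(isEmpty(context)){
			   context=DEFAULT_CONTEXT_ROOT;
		   }
		   urlString.append("/");
		   urlString.append(context);
		   urlString.append("/msg/queue/");
		   // fall back to the default queue only when no queue name was given
		   if(isEmpty(queueName)){
			   queueName=DEFAULT_QUEUE;
		   }
		   urlString.append(queueName);
		   System.out.println("urlString="+urlString);
		   return urlString.toString();
	}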
   
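	/**
	 * Posts a text message to a queue through the MQ Bridge for HTTP.
	 * Despite its name, this method builds a /msg/queue/ URL, not a topic URL.
	 *
	 * @param host      host of the MQ Bridge for HTTP
	 * @param port      port of the MQ Bridge for HTTP
	 * @param context   context root of the MQ Bridge for HTTP
	 * @param queueName name of the target queue
	 * @param message   message body to send
	 * @return true if the server answered 200 OK, false otherwise
	 */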
	   public static boolean publishTopic(String host,String port,String context,String queueName,String message ){
		   boolean response = true;   
		   HttpURLConnection connection=null;
		   try {
				    String publishURL=getPublishQueueURL(host, port, context, queueName);
				      URL url=new URL(publishURL);
				      connection = (HttpURLConnection) url.openConnection();

				      /* Build the headers */
				      // the verb first
				      connection.setRequestMethod("POST");

				      // Content type is a string message
				      connection.setRequestProperty("content-type", "text/plain");

				      // set the message priority to low
				      connection.setRequestProperty("x-msg-priority", "LOW");

				      // Ensure we can get the output stream from the connection
				      connection.setDoOutput(true);

				      OutputStream outputStream = connection.getOutputStream();
				      // wrapper the outputstream in a writer
				      BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
				            outputStream));

				      // Now write the actual content.
				      // Make sure the CRLF is there in case some HTTP servers don't understand
				      // that it's the end of the message
				      writer.write(message + CRLF + CRLF);
				      writer.flush();

				      // now actually send the message
				      connection.connect();
				     
				      // check the response for errors
				      int responseCode = connection.getResponseCode();
				      if (responseCode != 200)
				      {
				    	  String responseMessage = connection.getResponseMessage();
				    	  System.out.println("response code = " + responseCode + ", response message = " + responseMessage);
				    	  System.out.println("response content:");
				    	  // getErrorStream() returns null when the server sent no error body
				    	  java.io.InputStream errorStream = connection.getErrorStream();
				    	  if (errorStream != null)
				    	  {
				    	     BufferedReader reader = new BufferedReader(new InputStreamReader(
				    	           errorStream));
				    	     String line = null;
				    	     while ((line = reader.readLine()) != null)
				    	     {
				    	        System.out.println(line);
				    	     }
				    	  }
				    	  response = false;
				      }else{
				    	  // read the relevant response header
				    	  String responseQueueName=connection.getHeaderField("x-msg-replyTo");
				    	  System.out.println("responseQueueName="+responseQueueName);
				    	  System.out.println("response successful, reason phrase: "+connection.getResponseMessage());
				      }
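			} catch (MalformedURLException e) {
				// the URL built from host, port, context and queue name is not valid
				response = false;
				e.printStackTrace();
			} catch (IOException e) {
				// connecting, writing the body, or reading the response failed
				response = false;
				e.printStackTrace();
			}finally{
				// connection is still null if the URL was malformed or openConnection() failed
				if (connection != null) {
					connection.disconnect();
				}
			}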
		    return response;
	   }
	   
	   public static void main(String[] args) {
		   HTTPPOST.publishTopic("192.168.208.46", "8080", "mq", "java_lover", "this is a message ");
	   }
}
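
The listing above only posts to a queue. According to the Path syntax, a publication to a topic uses /msg/topic/topicName instead. The following sketch shows how such a request might be prepared with `HttpURLConnection`; the class `TopicPostSketch`, the method `prepare`, and any topic name passed to it are hypothetical. Nothing is sent until the caller writes the body and reads the response.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Map;

/** Hypothetical sketch: prepares, but does not send, a POST to a topic URL. */
final class TopicPostSketch {
    private TopicPostSketch() {}

    static HttpURLConnection prepare(String topicUrl, Map<String, String> xMsgHeaders) {
        try {
            // openConnection() only creates the object; no network traffic happens yet
            HttpURLConnection connection =
                    (HttpURLConnection) URI.create(topicUrl).toURL().openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "text/plain");
            // optional x-msg-* entity headers, e.g. x-msg-priority
            for (Map.Entry<String, String> header : xMsgHeaders.entrySet()) {
                connection.setRequestProperty(header.getKey(), header.getValue());
            }
            // allow the caller to write the message body
            connection.setDoOutput(true);
            return connection;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would then write the message to `connection.getOutputStream()` and check `connection.getResponseCode()`, as the queue example does.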
 