`
can_do
  • 浏览: 266036 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于EMQX以切分消息方式发送文件

阅读更多
1、引入jar包如下:
1> mqtt-client-1.16-SNAPSHOT.jar
2> hawtbuf-1.11.jar
3> hawtdispatch-1.22.jar
4> hawtdispatch-transport-1.22.jar


2、代码如下:

package com.cnd.poc;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;

import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

public class TestMqttClientFuture {

	private static String EMQX_Cluster_URL = "mqttpoc.yourcompany.com";

	private static int EMQX_Cluster_Broker_Port = 1883;

	private static String EMQX_Cluster_Access_UserName = "can_do";

	private static String EMQX_Cluster_Access_Password = "passw0rd";

	private static String EMQX_Cluster_Topic_Name = "YourTm/01012345678";

	public static void main(String[] args) {
		// TODO Auto-generated method stub

		MQTT mqtt = new MQTT();
		try {
			mqtt.setHost(EMQX_Cluster_URL, EMQX_Cluster_Broker_Port);
			mqtt.setUserName(EMQX_Cluster_Access_UserName);
			mqtt.setPassword(EMQX_Cluster_Access_Password);

		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		FutureConnection connection = mqtt.futureConnection();

		boolean isFuture = true;

		Future<Void> f1 = connection.connect();
		try {
			f1.await();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		Future<byte[]> f2 = connection.subscribe(new Topic[] { new Topic(EMQX_Cluster_Topic_Name, QoS.AT_LEAST_ONCE) });
		try {
			byte[] qoses = f2.await();

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		// We can start future receive..
		Future<Message> receive = connection.receive();
		// how to retrieve message from future object

		// send the message..
		// send file with byte[] in splitted message
		boolean isSendFileMsg = false;
		if (isSendFileMsg) {
			Future<Void> f3 = connection.publish(EMQX_Cluster_Topic_Name, "Hello can_do 24".getBytes(),
					QoS.EXACTLY_ONCE, false);

			// Then the receive will get the message.
			Message message = null;
			try {
				message = receive.await();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			message.ack();

			System.out.print("=message is:=" + new String(message.getPayload()) + "=end=");
		}

		String fileName = "E:\\to_removed\\20190610\\test_mqtt_file_transfer_del.txt";

		int permFragmentSize = 64;

		boolean isDelFileAfterSent = false;

		sendFileInByteArray(connection, fileName, permFragmentSize, isDelFileAfterSent);

		if (!isFuture) {
			Future<Void> f4 = connection.disconnect();
			try {
				f4.await();
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	/**
	 * send file with byte array
	 */
	public static boolean sendFileInByteArray(FutureConnection paramConn, String paramFileAbsPath,
			int paramPermFragmentSize, boolean delWhenFinished) {
		boolean isSendSuccessful = false;

		File file = new File(paramFileAbsPath);

		InputStream in = null;

		int PER_FRAGMENT_SIZE = paramPermFragmentSize;

		try {
			String encodedName = URLEncoder.encode(file.getName(), "UTF-8");
		} catch (UnsupportedEncodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		try {
			byte[] block = new byte[PER_FRAGMENT_SIZE];
			int byteread = 0;
			in = new FileInputStream(paramFileAbsPath);

			int i = 0;
			while ((byteread = in.read(block)) != -1) {
				i++;
				paramConn.publish(EMQX_Cluster_Topic_Name, block, QoS.EXACTLY_ONCE, false);
				// Note:you need to clear byte[] after every time sent
				// This statement is vip
				block = new byte[PER_FRAGMENT_SIZE];
			}
			System.out.println("=i is:=" + i + "=end=");

			isSendSuccessful = true;
		} catch (Exception e1) {
			e1.printStackTrace();

			if (in != null)
				try {
					in.close();
				} catch (IOException localIOException) {
				}
		} finally {
			if (in != null)
				try {
					in.close();
				} catch (IOException localIOException1) {
				}
		}
		if (isSendSuccessful) {
			if (delWhenFinished) {
				String strFileReadme = file.getName().split("\\[_]")[0];
				File tmpFile = new File(file.getParent() + File.separator + strFileReadme);
				if (tmpFile.exists()) {
					tmpFile.delete();
				}
				file.delete();
			}
		}
		return isSendSuccessful;
	}

}
分享到:
评论

相关推荐

    切分大的文件以及合并切分的文件

    1. **获取所有子文件**:首先,我们需要知道所有切分后的子文件名,这通常可以通过某种方式(如命名规则)预先确定或从日志中获取。 2. **创建临时文件**:为了合并,创建一个新的临时文件,这将成为合并后的大文件...

    切分大的文件以及合并切分的文件2

    文件切分是将一个大的文件拆分成多个较小的文件,以便于管理、传输或适应特定系统限制。在Java中,这通常通过读取大文件的每个部分并将其写入新的文件来实现。`SplitAndCombine` 类可能有一个方法,如 `splitFile()...

    基于JAVA的PDFBOX实现文件切分切割

    基于JAVA的PDFBOX实现文件切分切割,里面有所用JAR包,commons-logging-1.2.jar,commons-logging-1.2-javadoc.jar,jbig2-imageio-3.0.4.jar,pdfbox-2.0.26.jar,另外附有以实现的JAVA代码,下载即可用

    基于Matlab的图像切分

    在图像处理领域,"基于Matlab的图像切分"是一个重要的技术,主要用于将图像分割成多个有意义的部分,以便于分析、识别或者进一步处理。Matlab作为一个强大的数学计算和数据分析平台,提供了丰富的工具箱来支持这样的...

    C# 如何 切分组合大尺寸文件 使用WinRAR压缩文件 将文件转换成网页等等...,有8个类似的示例啊 (源码)

    - 文件切分:当面对大尺寸文件时,可以使用`FileStream`和`BinaryReader`或`BinaryWriter`类将文件读取或写入到多个小文件中。通过设定每个部分的大小,可以控制切分的粒度。 - 文件组合:相反,使用`FileStream`...

    c#文件操作,切分组合,多线程的,代码简单优化过,保留原文件,

    文件切分是指将一个大文件分割成多个小文件,以便于存储、传输或处理。在C#中,我们可以使用`System.IO`命名空间中的类来实现这一功能。以下是一个简单的文件切分示例: ```csharp using System; using System.IO; ...

    基于MyBatis的数据库切分框架,可实现数据的水平切分和垂直切分

    数据库切分是应对大数据场景下性能瓶颈的一种常见解决方案。它通过将数据水平或垂直拆分到多个数据库节点上,从而提高系统的并发处理能力和存储能力。您提供的 demo 使用了 MyBatis 这一流行的 ORM 框架,实现了数据的...

    基于opencv的像中位切分算法

    **基于OpenCV的中位切分算法详解** 中位切分算法是一种经典的图像分割方法,尤其适用于处理二值图像或灰度图像。该算法通过反复将像素集合按灰度值进行排序并切分,达到分割图像的目的。在计算机视觉和图像处理领域...

    C#源码,分割文件,C#切分组合文件

    在处理大量数据或者需要高效传输时,我们可能会遇到需要将大文件分割成小块,或者将这些小块重新组合成原始文件的情况。本主题主要探讨的是如何使用C#来实现文件的分割与组合功能。 首先,文件分割是将一个大文件...

    基于vc++的图像切分系统源程序

    本文将以一个基于VC++编写的图像切分系统源程序为例,探讨其核心原理和实现方法。 首先,我们要理解VC++(Visual C++)是微软公司推出的一种面向对象的编程环境,它提供了强大的图形用户界面设计工具以及对C++语言...

    切分对话框实例,实现对话框切分

    在本文中,我们将深入探讨如何使用VC6.0开发环境实现“切分对话框实例”,这一功能主要用于在对话框界面中动态地分割显示区域,允许用户以横行或纵向方式组织内容。我们将介绍核心的编程概念和技术,以及涉及到的...

    c#文件操作切分与组合,多线程,用进度条显示进度,代码简单

    文件切分是将一个大文件分割成多个小文件的过程,以便于存储、传输或处理。在C#中,我们可以使用FileStream和BinaryReader/Writer类来实现。首先,我们需要读取原始文件的内容,然后按照指定大小分块写入新的文件。...

    实例25 如何切分组合文件

    在IT领域,文件操作是日常工作中不可或缺的一部分,无论是开发、数据传输还是备份恢复,都可能涉及到文件的切分与组合。本实例将详细介绍如何进行这项操作,尤其适用于处理大文件时,为了便于传输或存储,我们可能会...

    C# 文件切分和组合源码

    本话题聚焦于一个基于C#的实用工具,该工具能够执行文件的切割和合并操作。这个工具利用多线程技术来提升性能,并通过进度条显示分割进度,为用户提供友好的交互体验。 文件切割是将一个大文件拆分成多个小块的过程...

    切分组合文件(2.0)

    "切分组合文件(2.0)" 提供了一种高效的方法来处理大文件,特别是在有限的系统资源或者网络传输限制下。这个工具可能采用了C#编程语言,并通过Windows Forms(WinForm)界面提供用户友好的操作体验。下面我们将详细...

    文件切分合并工具(C#)

    "文件切分合并工具(C#)"是一款基于C#编程语言开发的应用程序,主要用于处理大文件,包括文件的切分、合并以及批量重命名操作。这些功能在很多场景下都非常实用,例如上传限制、备份策略和数据传输优化等。 首先,让...

    基于垂直投影的文字切分

    一个简单通过二值化并垂直投影来进行背景噪声较小的图片文本行的文字切分程序

    python实现根据文件关键字进行切分为多个文件的示例

    通过这种方式,我们可以高效地将一个大文件拆分成多个小文件,每个文件包含一个或多个连续的关键字段落,方便进一步的分析和处理。 总结来说,这个Python示例提供了一种实用的方法,用于根据文件中的关键字将大文件...

    文件切分工具

    - **电子邮件附件**:许多邮件系统对附件大小有限制,切分大文件可以确保它们能通过电子邮件发送。 - **网络传输**:在网络带宽有限的情况下,分割大文件可以加速上传和下载速度。 - **备份和存储**:切分大文件...

    基于Mycat的数据库切分方案探索

    传统关系型数据库由于缺乏扩展性,在面对大数据时存在巨大的缺陷,但是关系模型、事务机制对于大部分系统又不必不可少,目前业界主流的做法就是将传统数据库进行切分(包括垂直切分、水平切分等),提高数据库的可...

Global site tag (gtag.js) - Google Analytics