锁定老帖子 主题:以多线程、断点续传方式下载文件的实现
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (1)
|
|
---|---|
作者 | 正文 |
发表时间:2009-12-22
最后修改:2010-03-17
以多线程、断点续传方式下载文件,经常出现下载下来的文件大小和服务端一致,但是却无法正常打开的现象,搞了很久,贴下我的实现方式,请各位多多指教 思路: 1、将下载文件的处理放在自定义的线程类中,每下载一个文件就新启动一个下载线程。 2、在下载线程中完成对服务端的链接和身份认证,成功后开始下载文件。 3、新建n个子线程,根据下载文件的大小和线程数量得到每个子线程要下载的大小。 4、分别启动子线程,进行分段下载。 5、分段下载完成,合并临时文件。 6、合并文件完成,删除临时文件。
实现: FTP下载线程类 package com.jfc.ftp.tools; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.jfc.ftp.service.FTPService; import com.jfc.ftp.util.Constant; import com.jfc.ftp.util.PropertyUtil; /** * 为断点续传下载文件而启动新的线程 * @author SavageGarden * */ public class FTPThread extends Thread{ /** * 站点URL */ private String host; /** * 站点端口 */ private int port; /** * 用户 */ private String user; /** * 密码 */ private String pswd; /** * 当前线程的FTP操作接口实现类 */ private FTPService ftpService; /** * 第几个下载项 */ private int rowIndex; /** * 要下载的文件路径 */ private String filepath; /** * 要下载的文件大小 */ private long filesize; /** * 要下载的文件保存路径 */ private String savepath; /** * 标记文件已下载量 */ public int hadRead = 0; /** * 下载线程开始时间 */ public long startTime = 0; /** * 下载线程结束时间 */ public long endTime = 0; /** * 当前下载线程的互斥锁 */ public Lock ftpThreadLock; /** * 当前下载线程的状态 */ private int status = Constant.THREAD_STATUS_NEW; public synchronized int getStatus() { return status; } public synchronized void setStatus(int status) { this.status = status; } /** * 是否已经合并文件 */ private boolean hadMerger = false; public synchronized boolean isHadMerger() { return hadMerger; } public synchronized void setHadMerger(boolean hadMerger) { this.hadMerger = hadMerger; } /** * 当前下载线程的状态 */ private int completed = 0; public synchronized int getCompleted() { return completed; } public synchronized void setCompleted(int completed) { this.completed = completed; } /** * 下载线程个构造方法<br> * 根据已经取得连接的FTPTools得到连接信息<br> * 根据参数取得下载信息 * @param rowIndex * @param filepath * @param filesize * @param savepath */ public FTPThread(int rowIndex, String filepath, long filesize, String savepath) { super("FTPThread"); host = FTPTools.host; port = FTPTools.port; user = FTPTools.user; pswd = FTPTools.pswd; this.rowIndex = rowIndex; this.filepath = filepath; this.filesize = filesize; this.savepath = savepath; ftpThreadLock = new ReentrantLock(); setStatus(Constant.THREAD_STATUS_RUNNABLE); start(); } public FTPThread(int rowIndex, String filepath, long filesize, String savepath, int status) { super("FTPThread"); host = FTPTools.host; port = FTPTools.port; user = FTPTools.user; pswd = FTPTools.pswd; this.rowIndex = rowIndex; this.filepath = filepath; this.filesize = filesize; this.savepath = savepath; ftpThreadLock = new ReentrantLock(); setStatus(status); start(); } public void run() { getFTPService(); getFTPConnect(host, port); if(doLoginFTP(user, pswd)) { ResumeBrokenTransferByThread(this, rowIndex, filepath, filesize, savepath); } } /** * 获取FTPService接口实现类<br> * 首先从配置文件中找<br> * 如果没有则加载默认的实现类 * @return * @throws InstantiationException * @throws IllegalAccessException * @throws ClassNotFoundException */ public void getFTPService(){ try { ftpService = (FTPService)Class.forName(PropertyUtil.getProperty("ftp.service.name", FTPService.FTP_SERVICE_NAME)).newInstance(); } catch (Exception e) { e.printStackTrace(); } } /** * 根据服务器地址、端口获得ftp链接 * @param host * @param port * @return */ public String getFTPConnect(String host, int port) { return ftpService.getFTPConnect(host, port); } /** * 执行登录 * @param user * @param pswd * @return */ public boolean doLoginFTP(String user, String pswd) { return ftpService.doLoginFTP(user, pswd); } /** * 以断点续传的方式下载文件 * @param rowIndex * @param filepath * @param filesize * @param savepath */ public void ResumeBrokenTransfer(int rowIndex, String filepath, int filesize, String savepath) { ftpService.ResumeBrokenTransfer(rowIndex, filepath, filesize, savepath); } /** * 以多线程、断点续传的方式下载文件 * @param rowIndex * @param filepath * @param filesize * @param savepath */ public void ResumeBrokenTransferByThread(FTPThread ftpThread, int rowIndex, String filepath, long filesize, String savepath) { ftpService.ResumeBrokenTransferByThread(ftpThread, rowIndex, filepath, filesize, savepath); } /** * 在指定文件路径下查找临时文件合并为一个文件 * @param filepath * @param threadCount */ public void mergerTempFile(String filepath, int threadCount) { System.out.println("开始合并文件"); try { BufferedOutputStream data_output = new BufferedOutputStream(new FileOutputStream(filepath)); byte[] temp = new byte[Constant.TEMP_BYTE_LENGTH]; for(int i = 0; i < threadCount; i ++) { File tempFile = new File(filepath + Constant.DOWNLOAD_TEMP_NAME + i); BufferedInputStream data_input = new BufferedInputStream(new FileInputStream(tempFile)); int read = 0; int hadRead = 0; while((read = data_input.read(temp, 0, temp.length)) != -1) { data_output.write(temp, 0, read); hadRead += read; } data_input.close(); } data_output.close(); } catch (Exception e) { e.printStackTrace(); } setHadMerger(true); System.out.println("合并文件完成"); deleteTempFile(filepath, threadCount); } /** * 合并文件完成后删除临时文件 * @param filepath * @param threadCount */ public void deleteTempFile(String filepath, int threadCount) { if(isHadMerger()) { for(int i = 0; i < threadCount; i ++) { File tempFile = new File(filepath + Constant.DOWNLOAD_TEMP_NAME + i); tempFile.delete(); } } } } FTP接口实现类中的ResumeBrokenTransferByThread方法 /**
* 使用多线程、断点续传方式下载文件<br>
* 首先查找要保存的路径下有无缓存文件(以.wfml为后缀)<br>
* 不存在 重新开始下载<br>
* 存在 继续<br>
* 重新开始下载<br>
* 读取配置文件要分多少个线程来下载文件<br>
* 按照文件大小、线程数量来分配每个线程要下载的文件大小、文件名<br>
* 开始断点续传下载<br>
* 继续下载<br>
* 开始断点续传下载<br>
* @param rowIndex
* @param filepath
* @param filesize
* @param savepath
*/
public void ResumeBrokenTransferByThread(final FTPThread ftpThread, final int rowIndex, final String filepath, final long filesize, final String savepath) {
final String filename = filepath.substring(filepath.lastIndexOf("/") + 1, filepath.length());
final byte[] temp = new byte[Constant.TEMP_BYTE_LENGTH];
//得到要创建的线程数量
final int threadCount = Integer.parseInt(PropertyUtil.getProperty(Constant.RESUME_THREAD_COUNT_PROPNAME, Constant.RESUME_THREAD_COUNT_DEFAULT));
final String[] downloadSizeArray = SystemTools.getDownloadSizeArray(filesize, threadCount);
for(int i = 0; i < threadCount; i ++) {
File temp_file = new File(savepath + File.separator + filename + Constant.DOWNLOAD_TEMP_NAME + i);
System.out.println("文件" + i + "大小为:" + temp_file.length());
ftpThread.hadRead += temp_file.length();
}
System.out.println("ftpThread.hadRead : " + ftpThread.hadRead);
for(int i = 0; i < threadCount; i ++) {
final int index = i;
Thread resumeThread = new Thread(){
//当前线程的缓存文件
File tempFile = new File(savepath + File.separator + filename + Constant.DOWNLOAD_TEMP_NAME + index);
public void run() {
SocketFTPService socketFTPService = new SocketFTPService();
socketFTPService.getFTPConnect(host, port);
if(socketFTPService.doLoginFTP(user, pswd)) {
try {
int read = 0;
int hadRead = 0;
//当前线程要下载的文件大小
String downsize = downloadSizeArray[index].split(":")[1];
//当前线程要下载的文件起始点
String skipsize = downloadSizeArray[index].split(":")[0];
//将hadRead(已读文件大小)置为缓存文件的大小
hadRead = (int)tempFile.length();
//设定文件指针(下载位置)
//socketFTPService.doFTPCommand("REST " + (Integer.parseInt(skipsize) + temp_file.length()));
//readRespond();
socketFTPService.dataSocket = socketFTPService.getDataSocket();
socketFTPService.doFTPCommand("RETR " + filepath);
BufferedInputStream data_input = new BufferedInputStream(socketFTPService.dataSocket.getInputStream());
RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
//跳过当前线程要下载的文件起始点和缓存文件大小之和
data_input.skip(Integer.parseInt(skipsize) + hadRead);
raf.seek(hadRead);
System.out.println("线程" + index + "已下载 " + hadRead);
if(ftpThread.startTime == 0) {
ftpThread.startTime = System.currentTimeMillis();
}
SystemTools.addObserver();
while(hadRead < Integer.parseInt(downsize)) {
if(ftpThread.getStatus() == Constant.THREAD_STATUS_RUNNABLE) {
while((read = data_input.read(temp, 0, temp.length)) != -1) {
int temp_hadRead = hadRead;
if((temp_hadRead += read) > Integer.parseInt(downsize)) {
read = Integer.parseInt(downsize) - hadRead;
}
raf.write(temp, 0, read);
hadRead += read;
ftpThread.ftpThreadLock.lock();
try {
ftpThread.hadRead += read;
} finally {
ftpThread.ftpThreadLock.unlock();
}
SystemTools.getCurrentSpeed(rowIndex, ftpThread.startTime, ftpThread.hadRead);
SystemTools.getPrice(rowIndex, ftpThread.hadRead, filesize);
SwingUtilities.invokeLater(SystemTools.updateProgressBarRunnable);
}
System.out.println("第" + index + "个线程完成下载" + hadRead + ",完成下载" + ftpThread.hadRead);
raf.close();
if(hadRead == tempFile.length()) {
ftpThread.setCompleted(ftpThread.getCompleted() + 1);
System.out.println(ftpThread.getCompleted());
}
if(ftpThread.getCompleted() == threadCount && ftpThread.hadRead == filesize) {
ftpThread.endTime = System.currentTimeMillis();
SystemTools.getFinalSpeed(rowIndex, ftpThread.startTime, ftpThread.endTime, filesize);
ftpThread.mergerTempFile(savepath + File.separator + filename, threadCount);
}
}
}
} catch(Exception e) {
e.printStackTrace();
}
}
}
};
resumeThread.start();
}
}
欢迎拍砖! 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2009-12-23
没耐心看完代码
个人感觉问题应该出现在这几个地方,一是从网络读取的数据是否真实度到了本地;二是文件合并的时候是否数据真的写入了合并后的文件中。 确保BufferedOutputStream中的数据已经处理干净了 确保RandomAccessFile的读取数据正确 一般遇到这样中问题就是使用debug模式跑一边,如果最终合并之后的文件数据的byte数组中,出现连续的000000,肯定是数据没有读过来导致的 |
|
返回顶楼 | |
发表时间:2009-12-23
我想看FtpService
|
|
返回顶楼 | |
发表时间:2009-12-23
我也指几个问题吧:
1. 滥用的synchronized 2. 不能确保关闭的文件流(如果出现异常,就无法关闭文件流) 3. 方法名首字母大写,一看楼主就是做.net转做java的。 4. 基本无效的异常处理。 5. 因为异常而导致的线程泄漏。 6. 子线程还没有执行完,主线程就会退出,这样子线程就丝毫没有任何管理。 这样的程序学习做sample用是可以的,不过离真正的应用还有蛮大的距离。 |
|
返回顶楼 | |
发表时间:2009-12-24
==================================================
3. 方法名首字母大写,一看楼主就是做.net转做java的。 ================================================== 真是很精屁啊 |
|
返回顶楼 | |
发表时间:2009-12-24
方法名首字母一定要大写么 我看 什么equals。。。valueOf 不也是小写么
|
|
返回顶楼 | |
发表时间:2009-12-24
我是指出楼主的问题,不是说方法首字母要大写。
|
|
返回顶楼 | |
发表时间:2009-12-24
貌似.net方面的方法名是大写开头,而java方面方法名首字母小写,常量全大写!
|
|
返回顶楼 | |
发表时间:2009-12-24
最后修改:2009-12-24
首先非常感谢大家的关注! xzqttt
“个人感觉问题应该出现在这几个地方,一是从网络读取的数据是否真实度到了本地;二是文件合并的时候是否数据真的写入了合并后的文件中。
一针见血!偶尔会出现下载下来的文件无法打开或报文件已损坏,一直以为是线程方面的问题,感谢指点解决问题的方向!
prowl
“我想看FtpService”
第二段代码即为FtpService实现类中的ResumeBrokenTransferByThread方法
凤舞凰扬
“1. 滥用的synchronized 其实是还不会用。。。 其实是还没有做异常处理。。。 其它的都小写的,唯独贴出来的这个大写了是因为“ResumeBrokenTransfer”这个词是从网上搜来了,自己不知道“断点续传”咋翻译,也就没改成小写,不过我确实没有碰过.net。。。 同2 线程泄露?请不吝指教,多谢! 确实是第一次写这种程序,感谢您的指教,谢谢!
再次感谢大家的关注和指教,谢谢! |
|
返回顶楼 | |
发表时间:2009-12-24
引用 1. 滥用的synchronized 其实是还不会用。。。 其实对于你的类中setter/getter方法大多不需要使用这个关键字;而对于线程回收以及状态判断时就有可能需要。这里面的问题比较复杂,不是一下子可以说清的。 引用 2. 不能确保关闭的文件流(如果出现异常,就无法关闭文件流) 其实是还没有做异常处理。。。 其实不管异常是否处理,一定要确保流能被正确关闭,所以应该放在finally,而不是正确执行的后面。 引用 3. 方法名首字母大写,一看楼主就是做.net转做java的。 其它的都小写的,唯独贴出来的这个大写了是因为“ResumeBrokenTransfer”这个词是从网上搜来了,自己不知道“断点续传”咋翻译,也就没改成小写,不过我确实没有碰过.net。。。 这倒是我误解了,只是因为我有做.net的同事经常有这种习惯,呵呵。 引用 4. 基本无效的异常处理。 同2 多线程程序中异常处理是其健壮性的重要保证,所以一定要重视。 引用 5. 因为异常而导致的线程泄漏。 线程泄露?请不吝指教,多谢! 对于你for循环中的线程来说,因为你前面如果出现异常,就有可能导致该线程无法正确地处理信息,同时相对应的资源没有被关闭,从而出现问题。 引用 6. 子线程还没有执行完,主线程就会退出,这样子线程就丝毫没有任何管理。
子线程还没有执行完,主线程就会退出?测试时从来没有发生过,请问您怎么得出的这个结论? 因为你的主程序执行完方法ResumeBrokenTransferByThread就已经结束了,而在方法ResumeBrokenTransferByThread中是启动另外的线程去下载文件。而这些线程已经脱离了你程序的管理。你根本就无法控制它们(比如停止等等,错误恢复等等)。对于多线程程序是一定要设计一个控制线程的。 引用 确实是第一次写这种程序,感谢您的指教,谢谢!
第一次已经很不错了,其实很值得鼓励,多写几次,多尝试,就会有经验了。 |
|
返回顶楼 | |