`

用HttpClient写了一个多线程下载软件

 
阅读更多

   这些天因为脚崴了在家休息,闲着没事就琢磨写个数据抓取的软件,这必然用到HttpClient这类连接远程资源的库,学到它时,抓取资源的时候,很可能要自动下载一些东西,这必须又要用到多线程的下载技术,于是就用它下了一个简单的,功能虽不全,但一般用用还可以。

  • 原理
    原理是模仿迅雷以前的方式,每个线程下载一部分文件内容,写入到一个单独的临时文件,当所有线程都完成下载时,再将这些临时文件,合并成一个。
  • 代码
    核心代码是Downloader这个类:
    package data.scrap;
    
    import com.xdg.util.*;
    import data.scrap.util.GetUtil;
    import data.scrap.util.HeaderUtil;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.http.HttpException;
    import org.apache.http.HttpResponse;
    import org.apache.http.HttpStatus;
    import org.apache.http.util.EntityUtils;
    
    import java.io.*;
    import java.util.ArrayList;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Downloader {
        private int threadNum = 1;
        private String url;
       volatile private long byteSize;
        private String sizeUnit;
        private String unitfiedSize;
        private String fileName;
        private final static Log log = LogFactory.getLog(Downloader.class);
        volatile private boolean stopMonitor;
    
        public Downloader(String url, int threadNum) {
            this.url = url;
            this.threadNum = threadNum;
        }
    
        public Downloader(String url) {
            this.url = url;
        }
    
        /**
         * entry point to download files
         * @param saveDir
         * @param saveName file name to save, use original file name if kept null
         * @throws Exception
         */
        public void download(String saveDir, String saveName) throws Exception {
            if (FileUtil.isNotExist(saveDir)){
                FileUtil.makeDirs(saveDir);
            }
            
            long startTime = System.currentTimeMillis();
            ExeResult result1 = GetUtil.doGet(url);
            if (result1.getResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                throw new HttpException("Can't connect to the remote server at '"+url+"'");
            }
            if (!HeaderUtil.isFile(result1.getResponse())) {
                throw new HttpException("No file found at '" + url + "'");
            }
            byteSize = HeaderUtil.getSize(result1.getResponse());
            fileName = HeaderUtil.getFileName(result1.getResponse(),url);
            convertSizeUnit();
    
            log.debug("the file size is " + unitfiedSize + sizeUnit + "/" + byteSize + "B");
            final ArrayList<DownloadWorker> workers = new ArrayList<DownloadWorker>();
            ExecutorService pool = Executors.newFixedThreadPool(15);
            long sizePerThread = byteSize / threadNum;
            for (int i = 0; i < threadNum; i++) {
                DownloadWorker worker;
                if (i == 0) {
                    worker = new DownloadWorker(result1, (i + 1), 0, sizePerThread);
                } else if (i < threadNum - 1) {
                    worker = new DownloadWorker(url, (i + 1), i * sizePerThread, sizePerThread);
                } else {
                    worker = new DownloadWorker(url, (i + 1), i * sizePerThread, byteSize - sizePerThread * i);
                }
    
                workers.add(worker);
            }
            monitorDownloadState(workers);
            try {
                pool.invokeAll(workers);
            } catch (InterruptedException e) {
                log.error(e);
                throw e;
            } catch (Exception e) {
                log.error(e);
                throw e;
            }
            pool.shutdown();
            stopMonitor = true;
            if (isDownloadOK(workers)) {
                String savePath = null;
                if (saveName == null) {
                    savePath = saveDir + "/" + fileName;
                } else {
                    savePath = saveDir + "/" + saveName;
                }
    
                log.info("partial files are being merged into one last file '"+savePath+"'");
                mergePartialFiles(workers, savePath);
                log.info("merging finished, seconds elapsed is " + TimeUtil.elapsedSecond(startTime));
            }
    
        }
    
        /**
         * merge temporary and partial files into one file
         * @param workers
         * @param savePath 
         */
        private void mergePartialFiles(ArrayList<DownloadWorker> workers, String savePath) {
            ArrayList<File> files = new ArrayList<File>();
            for (DownloadWorker worker : workers) {
                files.add(new File(worker.getFilePath()));
            }
    
            FileUtil.mergePartialFilesLinear(files.toArray(new File[]{}), savePath);
    
        }
    
        private boolean isDownloadOK(ArrayList<DownloadWorker> workers) {
            for (DownloadWorker worker : workers) {
                if (!worker.isSuccessful()) {
                    log.error("Thread "+worker.getId()+" fail to download all bytes, expected bytes : "
                            +worker.getLength()+", actual bytes: "+worker.getReadBytes());
                    return false;
                }
            }
    
            return true;
        }
    
        private void monitorDownloadState(final ArrayList<DownloadWorker> workers) {
            new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        boolean exit = false;
                        for (DownloadWorker worker : workers) {
                            if (worker.isInvoked()) {
                                exit = true;
                                break;
                            }
                        }
    
                        if (!exit) {
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException e) {
                                log.error(e);
                            }
                        } else {
                            break;
                        }
    
                    }
    
                    long startTime = System.currentTimeMillis();
                    long lastDownloadedBytes=0;
                    while (!stopMonitor) {
                        long downloadedBytes = 0;
                        for (DownloadWorker worker : workers) {
                            if (worker.isInvoked()) {
                                downloadedBytes += worker.getReadBytes();
                            }
                        }
    
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            log.error(e);
                        }
    
                        if (lastDownloadedBytes!=downloadedBytes){
                            lastDownloadedBytes=downloadedBytes;
                        } else {
                            continue;
                        }
    
                        log.info(downloadedBytes / 1024 + "KB(" + downloadedBytes * 100 / byteSize + "%) has been downloaded, average speed is " +
                                NumUtil.truncateDecimal(downloadedBytes / 1024.0 / ((System.currentTimeMillis() - startTime) / 1000.0), 2)+ "KB/S");
    
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            log.error(e);
                        }
                    }
                }
            }).start();
        }
    
        private void convertSizeUnit() {
            if (byteSize < SizeUnit.MB) {
                unitfiedSize = NumUtil.truncateDecimal(byteSize / 1024, 0);
                sizeUnit = "KB";
            } else if (byteSize >= SizeUnit.MB && byteSize < SizeUnit.GB) {
                unitfiedSize = NumUtil.truncateDecimal(byteSize / SizeUnit.MB, 2);
                sizeUnit = "MB";
            } else {
                unitfiedSize = NumUtil.truncateDecimal(byteSize / SizeUnit.GB, 2);
                sizeUnit = "GB";
            }
        }
    
    }
    
    class DownloadWorker implements Callable<Object> {
        private String url;
        private long start;
        private long length;
        private ExeResult exeResult;
        private final static Log log = LogFactory.getLog(DownloadWorker.class);
        private int id;
        private long readBytes;
        private boolean invoked;
        private boolean successful;
        private String filePath;
    
        DownloadWorker(String url, int id, long start, long length) {
            this(GetUtil.doGet(url), id, start, length);
            this.url = url;
        }
    
        DownloadWorker(String url, int id, long start) {
            this(GetUtil.doGet(url), id, start);
            this.url = url;
        }
    
        DownloadWorker(ExeResult exeResult, int id, long start, long length) {
            this.exeResult = exeResult;
            this.start = start;
            this.length = length;
            this.id = id;
            this.url = exeResult.getUrl();
        }
    
        DownloadWorker(ExeResult exeResult, int id, long start) {
            this.exeResult = exeResult;
            this.start = start;
            this.id = id;
            this.url = exeResult.getUrl();
        }
    
        public long getReadBytes() {
            return readBytes;
        }
    
        public Object call() throws Exception {
            if (exeResult.getResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                throw new HttpException("Thread " + id + " can't connect to the remote server at '"+url+"'");
            }
    
            if (!HeaderUtil.isFile(exeResult.getResponse())) {
                throw new HttpException("No file found'" + url + "'");
            }
    
            invoked = true;
            filePath = SysPropUtil.getTempDir() + HeaderUtil.getFileName(exeResult.getResponse(),url) + ".part" + id;
    
            log.debug("Thread " + id + " is download bytes of " + start + " to " + (start + length - 1) + ", path : " + filePath);
            HttpResponse response = exeResult.getResponse();
    //        byte[] buffer = new byte[4096];
            InputStream is = null;
            try {
                is = response.getEntity().getContent();
            } catch (IOException e) {
                log.error(e);
            }
    
            FileOutputStream fos = null;
            try {
                fos = new FileOutputStream(filePath);
            } catch (FileNotFoundException e) {
                log.error(e);
            }
    
            try {
                long writtenBytes = FileUtil.writePart(fos, is, HeaderUtil.getSize(exeResult.getResponse()), start, length, new FileWriteProgress() {
                    public void changed(long readByteCnt) {
                        readBytes = readByteCnt;
                    }
                });
    
                if (writtenBytes >= length){
                    successful = true;
                }
                EntityUtils.consume(exeResult.getResponse().getEntity());
            } catch (IOException e) {
                log.error(e);
            } finally {
                exeResult.getHttpClient().getConnectionManager().shutdown();
            }
    
            return null;
        }
    
        public boolean isInvoked() {
            return invoked;
        }
    
        public boolean isSuccessful() {
            return successful;
        }
    
        public String getFilePath() {
            return filePath;
        }
    
        public int getId() {
            return id;
        }
    
        public long getLength() {
            return length;
        }
    }
    
     
    写入临时文件时,调用了FileUtil的下面这个方法:
     public static long writePart(OutputStream os, InputStream is, long size, long start, long len, FileWriteProgress fileWriteProgress) throws IOException {
            int bufferSize = 64*1024;  //buffer size, 64KB
            byte[] buffer = new byte[bufferSize];
            is.skip(start);
            long pos = start;
            long end;
            if (pos + len - 1 > size - 1) {
                end = size - 1;
            } else {
                end = start + len - 1;
            }
            int cnt = 0;
            long total = 0;
            while (true) {
                if (pos + bufferSize - 1 > end - 1) {
                    cnt = is.read(buffer, 0, (int) (end - pos + 1));
                } else {
                    cnt = is.read(buffer);
                }
    
                os.write(buffer, 0, cnt);
    
                total += cnt;
                fileWriteProgress.changed(total);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    log.error(e);
                }
                if (pos + bufferSize - 1 >= end - 1) {
                    break;
                } else {
                    pos += cnt;
                }
            }
    
            os.flush();
            os.close();
    
            return total;
        }
     
  • 实测数据
    找了华军上的一个文件,测试类代码:
    package httpclient.test;
    
    import data.scrap.Downloader;
    
    public class Download {
        public static void main(String[] args) throws Exception {
    //        Downloader downloader=new Downloader("http://dlwt.csdn.net/fd.php?i=883244521434558&s=facc18193fed9034f23d21ee20323358",1);
    //        Downloader downloader=new Downloader("http://dl5.csdn.net/fd.php?i=574444563309987&s=4b836c3db271e6e9189d36937772f4fe",1);
    //        Downloader downloader=new Downloader("http://dlwt.csdn.net/fd.php?i=145444563309889&s=31065ff22be5d0d72ceb9b700503a752",4);
            Downloader downloader=new Downloader("http://dl.newhua.com:86/down/HA_FB_1.1.13%20Final_Asion_B.zip",2);
    //        Downloader downloader=new Downloader("http://dlwt.csdn.net/fd.php?i=676644563305445&s=b59e62e8fc5ade9cc0d5fafc340588b8",10);
    
            downloader.download("f:/temp",null);
        }
    }
     文件地址是
    http://dl.newhua.com:86/down/HA_FB_1.1.13%20Final_Asion_B.zip
    ,我用的是联通的宽带,找的也是针对联通线路的地址,有兴趣的同学可以根据自己的宽带提供商找相应线路的文件以获得最高性能。

    当前面的writePart方法的写入缓冲大小为48KB时,文件大小为5943KB,带宽8M,测试结果如下:
    Thread Number Elapsed Seconds
    Speed
    1 26 228KB/S
    2 17 350KB/S
    3 20 297KB/S
    4 26 229KB/S
    5 33 180KB/S
    6 40 149KB/S

    可以看到2个线程时,下载速度最高,线程数如果为7以上时,总有一些线程没读完数据就中断了,可能是缓冲太大,线程数太多时,连接无法处理过大的数据量,如果把缓冲设为64KB,则最快一次,2个线程用了15秒完成,但只出现了一次。即使缓冲设得小点,线程数设得高点也没用,因为服务器会对线程数有限制,过多会响应得很慢,10以下比较安全,我设过20个,速度会几K几K的那样。

    我的带宽是8M,用迅雷,IDM等下载文件,会稳定在900多KB/S,用它们下载这个文件时,几秒就完事了,用HttpClient看来做下载,能力着实有限,不过如果在自己的程序中内嵌一个抓取数据用,已经足够用了。我也是刚学HttpClient,也许还有一些优化的方法吧。
  • 运行环境
    JDK: 1.7
    OS: Win7旗舰版
    HttpClient: 4.2.1
  • 需要的工程
    Common-Lib和Data-Scrapping-Lib我已经放到github上去了,后者还要引用前者,url分别为:
    https://github.com/fxbird/Common-Lib
    https://github.com/fxbird/Data-Scrapping-Lib
     还有一个测试工程可以在附件中下载。它引用前面两个工程
  • 总结
    下载文件时,缓冲的大小和线程数是成反比的,缓冲设的越大,则可使用的线程数越小,反之越多,但缓冲也不能设得过小,刚开始时我习惯性地设成了4K,结果4个线程下载时,用了86秒,后来才想到提高缓冲。还有一个缺限就是没有对线程数超过服务器限制而自动切断多余的线程时的处理,而一个健壮的多线程下载软件对此会有完善的处理。不足之处欢迎指正,共同进步。
分享到:
评论
2 楼 theoffspring 2014-08-31  
if(i!=我){} 写道
在开玩笑么……
把同个文件下载10遍,然后各取一段拼起来,这不叫多线程下载……

你说的是我么?我是把同一个文件下了10遍么?
1 楼 if(i!=我){} 2014-08-30  
在开玩笑么……
把同个文件下载10遍,然后各取一段拼起来,这不叫多线程下载……

相关推荐

    HTTPClient 的一个封装

    - **线程安全**:确保封装的HttpClient实例在多线程环境中的安全使用。 - **请求/响应缓存**:在本地缓存HTTP请求和响应,减少网络延迟。 文件名为httpClient的压缩包很可能包含了封装HttpClient的相关代码,可能...

    多线程下载支持断点续传

    首先,多线程下载是一种利用网络资源的方式,它将一个大文件分成多个部分,通过创建多个并行的网络连接,同时下载这些部分,从而加快下载速度。这种方式可以充分利用网络带宽,尤其在下载大文件时,可以显著减少下载...

    c#异步多线程http文件分块断点续传下载工具

    本文将详细探讨如何使用C#语言实现一个异步多线程的HTTP文件分块断点续传下载工具,并基于.NET Framework 2.0及以上版本进行开发。 首先,理解断点续传技术。断点续传允许用户在文件下载中断后,从上次中断的位置...

    C#实现http下的多线程下载

    在IT领域,尤其是在软件开发中,多线程技术是一种常用的方法,用于提高程序的执行效率和并发性。本文将深入探讨如何使用C#语言来实现HTTP环境下的多线程下载功能,这对于处理大文件下载或者优化用户体验具有重要意义...

    httpclient4.2.1.zip

    1. **多线程支持**:HttpClient 4.2.1支持多线程并发请求,可以高效地处理大量并发连接,提升了处理能力。 2. **连接管理**:它提供了连接池管理,允许重用已建立的TCP连接,减少网络延迟,提高整体性能。 3. **...

    基于HTTP多线程下载工具源码

    多线程则是并发处理的一种方式,它允许多个任务或数据块同时下载,从而提高下载速度,特别是对于大文件,能显著减少下载时间。 【描述】提到这是作者自己编写的,并愿意分享源码,这为学习者提供了一个实践和研究的...

    C# 多线程下载程序

    【C# 多线程下载程序】是一种使用C#编程语言在Windows桌面应用程序(Winform)环境下实现的下载工具,其特色在于利用多线程技术提高下载效率,并且支持断点续传功能,允许用户在下载中断后从上次停止的位置继续下载...

    基于http协议的多线程多任务下载

    多线程下载则是通过同时开启多个连接来请求文件的不同部分,每个连接负责下载文件的一部分,这样可以充分利用网络带宽,显著提升下载速度。实现多线程下载通常需要对文件进行分割,每个线程负责下载一个分割块。 ...

    Http多线程下载与断点续传分析

    多线程下载利用了计算机的多核处理器能力,将一个大文件分割成多个部分,同时进行下载,从而显著提高了下载速度。而断点续传则允许用户在文件下载中断后从上次停止的地方继续,避免了重新下载整个文件的困扰。 首先...

    Httpclient依赖包

    5. **线程安全**:HttpClient实例不是线程安全的,如果在多线程环境中使用,需要为每个线程创建独立的HttpClient实例,或者使用线程局部变量。 总的来说,HttpClient是Java开发者在进行HTTP通信时的重要工具,它的...

    commons-httpclient-3.1jar包

    4. 并发处理:在多线程环境中,需正确管理和同步HttpClient实例。 总的来说,Apache Commons HttpClient 3.1是Java开发中处理HTTP通信的强大工具,虽然现在已经有了更新的版本,但3.1版本在许多项目中仍被广泛使用...

    多线程断点续传 下载软件 研究 开发 毕业设计

    多线程断点续传下载软件是现代网络下载技术中的一个重要组成部分,尤其在处理大文件或者网络环境不稳定的情况下,它的优势显著。这类软件允许用户在下载过程中暂停、恢复,甚至在多个线程同时进行,提高了下载速度和...

    httpClient组合包.zip

    此外,HttpClient还支持异步操作,可以在多线程环境中高效地处理并发请求。 2. **httpcore-4.4.12.jar**:这是HttpClient的核心库,包含了HTTP协议的基本组件,如连接管理、请求和响应模型、编码器和解码器等。...

    仿迅雷多线程下载

    【正文】 ...通过研究和实践这个"仿迅雷多线程下载"项目,初学者不仅可以掌握Java的多线程编程,还能深入理解网络编程和文件操作,这对于提升软件开发能力,尤其是构建高性能的下载应用非常有帮助。

    多线程下载器

    多线程下载器是一种高效的文件下载工具,它利用了计算机的多核处理器和网络带宽资源,通过将大文件分割成多个小部分并同时下载,从而显著提高下载速度。这种技术在处理大文件,如软件安装包、高清视频或者大型游戏...

    多线程下载网站图片源码

    本项目"多线程下载网站图片源码"提供了一个实现多线程下载的实例,可以帮助开发者理解如何在C#环境下利用多线程技术优化文件下载流程,特别是对于图片这种大体积的文件,可以显著提高下载速度。 首先,我们需要了解...

    HttpClient3.1.jar

    `SingleClientConnManager`是默认的实现,适用于单线程或少量并发的场景,而`MultiThreadedHttpConnectionManager`则更适合多线程环境,可以有效管理多个并发的HTTP连接。 HttpClient还支持处理HTTP状态码和响应头...

    httpclient3.1 javadoc chm版

    HttpClient允许进行多线程并发请求,但需要注意线程安全问题。此外,合理设置连接超时、重试策略、连接池大小等参数也能显著提升性能。 九、与其他库的集成 HttpClient可以方便地与Spring框架、JUnit测试等结合使用...

    httpclient4.3.5

    3. **线程安全**:HttpClient 4.3.5加强了多线程环境下的安全性能,使得在并发环境中使用更加稳定。 4. **异常处理**:改进了错误处理机制,提供了更详细的错误信息,便于开发者定位问题。 5. **性能优化**:通过...

Global site tag (gtag.js) - Google Analytics