- theoffspring
- 等级:
- 性别:
- 文章: 218
- 积分: 190
- 来自: 大连
|
这些天因为脚崴了在家休息,闲着没事就琢磨写个数据抓取的软件,这必然用到HttpClient这类连接远程资源的库。学习它时想到,抓取资源的时候很可能要自动下载一些东西,这又要用到多线程下载技术,于是就用它写了一个简单的多线程下载器,功能虽不全,但一般用用还可以。
- 原理
原理是模仿迅雷以前的方式,每个线程下载一部分文件内容,写入到一个单独的临时文件,当所有线程都完成下载时,再将这些临时文件,合并成一个。
- 代码
核心代码是Downloader这个类:
package data.scrap;
import com.xdg.util.*;
import data.scrap.util.GetUtil;
import data.scrap.util.HeaderUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import java.io.*;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Multi-threaded HTTP file downloader.
 *
 * <p>Splits the remote file into {@code threadNum} contiguous byte ranges, downloads each
 * range into its own temporary ".part" file via a {@link DownloadWorker}, then merges the
 * parts into the final file. A background monitor thread logs progress while workers run.
 */
public class Downloader {
    private int threadNum = 1;
    private String url;
    // Total size of the remote file in bytes; written by download(), read by the monitor thread.
    volatile private long byteSize;
    private String sizeUnit;
    private String unitfiedSize;
    private String fileName;
    private final static Log log = LogFactory.getLog(Downloader.class);
    // Signals the monitor thread to exit once all workers have finished (or failed).
    volatile private boolean stopMonitor;

    /**
     * @param url       remote file URL
     * @param threadNum number of concurrent download threads
     */
    public Downloader(String url, int threadNum) {
        this.url = url;
        this.threadNum = threadNum;
    }

    /** Single-threaded download of {@code url}. */
    public Downloader(String url) {
        this.url = url;
    }

    /**
     * Entry point to download files.
     *
     * @param saveDir  directory to save into (created if absent)
     * @param saveName file name to save, use original file name if kept null
     * @throws Exception if the server is unreachable, the URL is not a file,
     *                   or a worker is interrupted
     */
    public void download(String saveDir, String saveName) throws Exception {
        if (FileUtil.isNotExist(saveDir)) {
            FileUtil.makeDirs(saveDir);
        }
        // BUGFIX: reset the flag so a reused Downloader instance gets a working monitor.
        stopMonitor = false;
        long startTime = System.currentTimeMillis();

        // Probe request: fail fast and learn the file's size and name.
        ExeResult result1 = GetUtil.doGet(url);
        if (result1.getResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new HttpException("Can't connect to the remote server at '" + url + "'");
        }
        if (!HeaderUtil.isFile(result1.getResponse())) {
            throw new HttpException("No file found at '" + url + "'");
        }
        byteSize = HeaderUtil.getSize(result1.getResponse());
        fileName = HeaderUtil.getFileName(result1.getResponse(), url);
        convertSizeUnit();
        log.debug("the file size is " + unitfiedSize + sizeUnit + "/" + byteSize + "B");

        final ArrayList<DownloadWorker> workers = new ArrayList<DownloadWorker>();
        // BUGFIX: the pool was hard-coded to 15 threads; size it to the requested count so
        // no worker sits queued while already holding an open HTTP connection.
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        long sizePerThread = byteSize / threadNum;
        for (int i = 0; i < threadNum; i++) {
            DownloadWorker worker;
            if (i == 0) {
                // Reuse the probe connection for the first segment.
                worker = new DownloadWorker(result1, (i + 1), 0, sizePerThread);
            } else if (i < threadNum - 1) {
                worker = new DownloadWorker(url, (i + 1), i * sizePerThread, sizePerThread);
            } else {
                // Last segment absorbs the division remainder.
                worker = new DownloadWorker(url, (i + 1), i * sizePerThread, byteSize - sizePerThread * i);
            }
            workers.add(worker);
        }
        monitorDownloadState(workers);
        try {
            // Blocks until every worker has completed (successfully or not).
            pool.invokeAll(workers);
        } catch (Exception e) {
            log.error(e);
            throw e;
        } finally {
            // BUGFIX: previously the pool and the monitor thread leaked when invokeAll
            // threw; always release both.
            pool.shutdown();
            stopMonitor = true;
        }
        if (isDownloadOK(workers)) {
            String savePath;
            if (saveName == null) {
                savePath = saveDir + "/" + fileName;
            } else {
                savePath = saveDir + "/" + saveName;
            }
            log.info("partial files are being merged into one last file '" + savePath + "'");
            mergePartialFiles(workers, savePath);
            log.info("merging finished, seconds elapsed is " + TimeUtil.elapsedSecond(startTime));
        }
    }

    /**
     * Merge temporary and partial files into one file, in worker order.
     *
     * @param workers  finished workers, each pointing at its ".part" file
     * @param savePath destination path of the merged file
     */
    private void mergePartialFiles(ArrayList<DownloadWorker> workers, String savePath) {
        ArrayList<File> files = new ArrayList<File>();
        for (DownloadWorker worker : workers) {
            files.add(new File(worker.getFilePath()));
        }
        FileUtil.mergePartialFilesLinear(files.toArray(new File[]{}), savePath);
    }

    /** @return true only when every worker wrote its full byte range. */
    private boolean isDownloadOK(ArrayList<DownloadWorker> workers) {
        for (DownloadWorker worker : workers) {
            if (!worker.isSuccessful()) {
                log.error("Thread " + worker.getId() + " fail to download all bytes, expected bytes : "
                        + worker.getLength() + ", actual bytes: " + worker.getReadBytes());
                return false;
            }
        }
        return true;
    }

    /**
     * Starts a daemon-style logger thread: waits until at least one worker is actually
     * transferring, then periodically logs aggregate progress until {@link #stopMonitor}
     * is set.
     */
    private void monitorDownloadState(final ArrayList<DownloadWorker> workers) {
        new Thread(new Runnable() {
            public void run() {
                // Phase 1: wait for the first worker to start transferring.
                while (true) {
                    boolean started = false;
                    for (DownloadWorker worker : workers) {
                        if (worker.isInvoked()) {
                            started = true;
                            break;
                        }
                    }
                    if (started) {
                        break;
                    }
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        // BUGFIX: restore the interrupt status instead of swallowing it.
                        Thread.currentThread().interrupt();
                        log.error(e);
                        return;
                    }
                }
                // Phase 2: report progress roughly once per second while bytes keep moving.
                long startTime = System.currentTimeMillis();
                long lastDownloadedBytes = 0;
                while (!stopMonitor) {
                    long downloadedBytes = 0;
                    for (DownloadWorker worker : workers) {
                        if (worker.isInvoked()) {
                            downloadedBytes += worker.getReadBytes();
                        }
                    }
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.error(e);
                        return;
                    }
                    // Only log when progress actually changed since the last sample.
                    if (lastDownloadedBytes != downloadedBytes) {
                        lastDownloadedBytes = downloadedBytes;
                    } else {
                        continue;
                    }
                    log.info(downloadedBytes / 1024 + "KB(" + downloadedBytes * 100 / byteSize + "%) has been downloaded, average speed is " +
                            NumUtil.truncateDecimal(downloadedBytes / 1024.0 / ((System.currentTimeMillis() - startTime) / 1000.0), 2) + "KB/S");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        log.error(e);
                        return;
                    }
                }
            }
        }).start();
    }

    /** Formats {@link #byteSize} into a human-readable size + unit for logging. */
    private void convertSizeUnit() {
        // NOTE(review): if SizeUnit.MB/GB are integral types these divisions truncate
        // before truncateDecimal sees them — confirm SizeUnit's declared type.
        if (byteSize < SizeUnit.MB) {
            unitfiedSize = NumUtil.truncateDecimal(byteSize / 1024, 0);
            sizeUnit = "KB";
        } else if (byteSize >= SizeUnit.MB && byteSize < SizeUnit.GB) {
            unitfiedSize = NumUtil.truncateDecimal(byteSize / SizeUnit.MB, 2);
            sizeUnit = "MB";
        } else {
            unitfiedSize = NumUtil.truncateDecimal(byteSize / SizeUnit.GB, 2);
            sizeUnit = "GB";
        }
    }
}
/**
 * Downloads one byte range of the remote file into a temporary ".part{id}" file.
 *
 * <p>Thread-safety: {@code readBytes}, {@code invoked} and {@code successful} are written
 * by the worker thread and polled by the Downloader's monitor thread, so they are
 * declared {@code volatile}.
 */
class DownloadWorker implements Callable<Object> {
    private String url;
    // First byte of this worker's range within the remote file.
    private long start;
    // Number of bytes this worker is responsible for.
    private long length;
    private ExeResult exeResult;
    private final static Log log = LogFactory.getLog(DownloadWorker.class);
    private int id;
    // BUGFIX: these three fields are read from the monitor thread; without volatile
    // their updates may never become visible to it.
    private volatile long readBytes;
    private volatile boolean invoked;
    private volatile boolean successful;
    private String filePath;

    DownloadWorker(String url, int id, long start, long length) {
        this(GetUtil.doGet(url), id, start, length);
        this.url = url;
    }

    DownloadWorker(String url, int id, long start) {
        this(GetUtil.doGet(url), id, start);
        this.url = url;
    }

    DownloadWorker(ExeResult exeResult, int id, long start, long length) {
        this.exeResult = exeResult;
        this.start = start;
        this.length = length;
        this.id = id;
        this.url = exeResult.getUrl();
    }

    DownloadWorker(ExeResult exeResult, int id, long start) {
        this.exeResult = exeResult;
        this.start = start;
        this.id = id;
        this.url = exeResult.getUrl();
    }

    /** @return bytes written to the part file so far (live progress counter). */
    public long getReadBytes() {
        return readBytes;
    }

    /**
     * Streams this worker's byte range into its temporary part file.
     *
     * @return always null; success is reported via {@link #isSuccessful()}
     * @throws Exception if the server refuses the connection or returns a non-file
     */
    public Object call() throws Exception {
        if (exeResult.getResponse().getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
            throw new HttpException("Thread " + id + " can't connect to the remote server at '" + url + "'");
        }
        if (!HeaderUtil.isFile(exeResult.getResponse())) {
            throw new HttpException("No file found'" + url + "'");
        }
        invoked = true;
        filePath = SysPropUtil.getTempDir() + HeaderUtil.getFileName(exeResult.getResponse(), url) + ".part" + id;
        log.debug("Thread " + id + " is download bytes of " + start + " to " + (start + length - 1) + ", path : " + filePath);
        HttpResponse response = exeResult.getResponse();
        try {
            // BUGFIX: try-with-resources replaces the old unclosed is/fos pair, which
            // leaked (and NPE'd inside writePart when stream opening had failed).
            try (InputStream is = response.getEntity().getContent();
                 FileOutputStream fos = new FileOutputStream(filePath)) {
                long writtenBytes = FileUtil.writePart(fos, is, HeaderUtil.getSize(exeResult.getResponse()), start, length, new FileWriteProgress() {
                    public void changed(long readByteCnt) {
                        readBytes = readByteCnt;
                    }
                });
                if (writtenBytes >= length) {
                    successful = true;
                }
                // Drain the entity so the underlying connection can be reused/released.
                EntityUtils.consume(exeResult.getResponse().getEntity());
            }
        } catch (IOException e) {
            // Best-effort: a failed worker leaves successful == false and the
            // Downloader reports the shortfall; don't abort sibling workers.
            log.error(e);
        } finally {
            exeResult.getHttpClient().getConnectionManager().shutdown();
        }
        return null;
    }

    /** @return true once the worker has passed validation and begun transferring. */
    public boolean isInvoked() {
        return invoked;
    }

    /** @return true when the full byte range was written to the part file. */
    public boolean isSuccessful() {
        return successful;
    }

    /** @return path of this worker's temporary part file (set during call()). */
    public String getFilePath() {
        return filePath;
    }

    public int getId() {
        return id;
    }

    public long getLength() {
        return length;
    }
}
写入临时文件时,调用了FileUtil的下面这个方法:
/**
 * Copies the byte range starting at {@code start}, of at most {@code len} bytes
 * (clamped to {@code size}, the total stream length), from {@code is} to {@code os}.
 * Reports cumulative progress through {@code fileWriteProgress} after each chunk.
 *
 * <p>Note: closes {@code os} before returning (kept for compatibility — callers
 * rely on this), but leaves {@code is} open for the caller to release.
 *
 * @return number of bytes actually written (may be less than {@code len} on early EOF)
 * @throws IOException on any read/write failure
 */
public static long writePart(OutputStream os, InputStream is, long size, long start, long len, FileWriteProgress fileWriteProgress) throws IOException {
    int bufferSize = 64 * 1024; //buffer size, 64KB
    byte[] buffer = new byte[bufferSize];

    // BUGFIX: InputStream.skip() may skip fewer bytes than requested — loop until the
    // offset is reached (the old single call could silently start at the wrong byte).
    long toSkip = start;
    while (toSkip > 0) {
        long skipped = is.skip(toSkip);
        if (skipped <= 0) {
            // skip() made no progress; fall back to reading one byte, stop at EOF.
            if (is.read() == -1) {
                break;
            }
            skipped = 1;
        }
        toSkip -= skipped;
    }

    // Clamp the requested range to the end of the stream.
    long end = (start + len - 1 > size - 1) ? size - 1 : start + len - 1;
    long remaining = end - start + 1;
    long total = 0;
    while (remaining > 0) {
        // BUGFIX: the old loop ignored read()'s -1 at EOF (writing a negative count and
        // corrupting the total) and terminated on bufferSize rather than bytes actually
        // read, truncating data after any short read.
        int want = (int) Math.min((long) bufferSize, remaining);
        int cnt = is.read(buffer, 0, want);
        if (cnt == -1) {
            break; // premature EOF — return what we got; caller detects the shortfall
        }
        os.write(buffer, 0, cnt);
        total += cnt;
        remaining -= cnt;
        fileWriteProgress.changed(total);
        // BUGFIX: removed the unconditional Thread.sleep(200) per chunk, which throttled
        // every download thread to ~320KB/s.
    }
    os.flush();
    os.close();
    return total;
}
- 实测数据
找了华军上的一个文件,测试类代码:
package httpclient.test;
import data.scrap.Downloader;
/**
 * Manual smoke test for {@link Downloader}: fetches one zip file with two
 * download threads and saves it under its original remote name.
 */
public class Download {
    public static void main(String[] args) throws Exception {
        // Mirror chosen for China Unicom lines; second argument is the thread count.
        String url = "http://dl.newhua.com:86/down/HA_FB_1.1.13%20Final_Asion_B.zip";
        Downloader downloader = new Downloader(url, 2);
        // null save name => keep the file name reported by the server.
        downloader.download("f:/temp", null);
    }
}
文件地址是
http://dl.newhua.com:86/down/HA_FB_1.1.13%20Final_Asion_B.zip
,我用的是联通的宽带,找的也是针对联通线路的地址,有兴趣的同学可以根据自己的宽带提供商找相应线路的文件以获得最高性能。
当前面的writePart方法的写入缓冲大小为48KB时,文件大小为5943KB,带宽8M,测试结果如下:
Thread Number |
Elapsed Seconds
|
Speed |
1 |
26 |
228KB/S |
2 |
17 |
350KB/S |
3 |
20 |
297KB/S |
4 |
26 |
229KB/S |
5 |
33 |
180KB/S |
6 |
40 |
149KB/S |
可以看到2个线程时,下载速度最高,线程数如果为7以上时,总有一些线程没读完数据就中断了,可能是缓冲太大,线程数太多时,连接无法处理过大的数据量,如果把缓冲设为64KB,则最快一次,2个线程用了15秒完成,但只出现了一次。即使缓冲设得小点,线程数设得高点也没用,因为服务器会对线程数有限制,过多会响应得很慢,10以下比较安全,我设过20个,速度会几K几K的那样。
我的带宽是8M,用迅雷,IDM等下载文件,会稳定在900多KB/S,用它们下载这个文件时,几秒就完事了,用HttpClient看来做下载,能力着实有限,不过如果在自己的程序中内嵌一个抓取数据用,已经足够用了。我也是刚学HttpClient,也许还有一些优化的方法吧。
- 运行环境
JDK: 1.7
OS: Win7旗舰版
HttpClient: 4.2.1
- 需要的工程
Common-Lib和Data-Scrapping-Lib我已经放到github上去了,后者还要引用前者,url分别为:
https://github.com/fxbird/Common-Lib
https://github.com/fxbird/Data-Scrapping-Lib
还有一个测试工程可以在附件中下载。它引用前面两个工程
- 总结
下载文件时,缓冲的大小和线程数是成反比的,缓冲设得越大,则可使用的线程数越小,反之越多,但缓冲也不能设得过小,刚开始时我习惯性地设成了4K,结果4个线程下载时,用了86秒,后来才想到提高缓冲。还有一个缺陷就是没有对线程数超过服务器限制而被自动切断多余线程的情况做处理,而一个健壮的多线程下载软件对此会有完善的处理。不足之处欢迎指正,共同进步。
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
|