`
lfp001
  • 浏览: 100624 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JAVA实现环形缓冲多线程读取远程文件

    博客分类:
  • JAVA
阅读更多

       如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。多线程中的同步、HttpURLConnection的超时阻塞等因素都会使代码看起来异常复杂。

      问题模型 数组buf[],N个“写”线程从文件顺序读入固定长度数据写入buf[];一个“读”线程按照文件内容顺序从缓冲区读取,一次读任意长度。一个写线程发生错误或文件读完,这个读线程和所有写线程退出。

      针对上述问题,简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;每次读小于32K的任意字节。线程同步:各个写线程互斥等待空闲块,用信号量机制分配空闲块;各写线程并发填写buf[];读线程和各写线程并发使用buf[]。

      为突出主题略去了一些和本文无关的代码。

 

一、HttpReader类功能:HTTP协议从指定URL读取数据。

package instream;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public final class HttpReader {
	public static final int MAX_RETRY = 10;
	private URL url;
	private HttpURLConnection httpConnection;
	private InputStream in_stream;
	private long cur_pos;			//决定seek方法中是否执行文件读取定位
	private int connect_timeout;
	private int read_timeout;
	
	public HttpReader(URL u) {
		this(u, 5000, 5000);
	}
	
	public HttpReader(URL u, int connect_timeout, int read_timeout) {
		this.connect_timeout = connect_timeout;
		this.read_timeout = read_timeout;
		url = u;
	}

	public int read(byte[] b, int off, int len) throws IOException {
		int r = in_stream.read(b, off, len);
		cur_pos += r;
		return r;
	}
			
	/*
	 * 抛出异常通知重试.
	 * 例如响应码503可能是由某种暂时的原因引起的,同一IP频繁的连接请求会遭服务器拒绝.
	 */
	public void seek(long start_pos) throws IOException {
		if (start_pos == cur_pos && in_stream != null)
			return;
		if (httpConnection != null) {
			httpConnection.disconnect();
			httpConnection = null;
		}
		if (in_stream != null) {
			in_stream.close();
			in_stream = null;
		}
		httpConnection = (HttpURLConnection) url.openConnection();
		httpConnection.setConnectTimeout(connect_timeout);
		httpConnection.setReadTimeout(read_timeout);
		String sProperty = "bytes=" + start_pos + "-";
		httpConnection.setRequestProperty("Range", sProperty);
		//httpConnection.setRequestProperty("Connection", "Keep-Alive");
		int responseCode = httpConnection.getResponseCode();
		if (responseCode < 200 || responseCode >= 300) {
			try {
				Thread.sleep(200);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			throw new IOException("HTTP responseCode="+responseCode);
		}

		in_stream = httpConnection.getInputStream();
		cur_pos = start_pos;
	}
}

  

二、IWriterCallBack接口

package instream;

/*
 * 读/写通信接口.类似于C++的回调函数
 * 
 * 例:
 * class BuffRandAcceURL 内实现本接口的方法tryWriting()
 * class BuffRandAcceURL 内new Writer(this, ...)传值到Writer.icb
 * class Writer 内调用icb.tryWriting()
 */
public interface IWriterCallBack {
	public int tryWriting() throws InterruptedException;
	public void updateBuffer(int i, int len);
	public void updateWriterCount();
	public int getWriterCount();
	public void terminateWriters();
}

  

三、Writer类:下载线程,负责向buf[]写数据。

package instream;
import java.net.URL;

public final class Writer implements Runnable {
	private static boolean isalive = true;	// 一个线程超时其它线程也能退出
	private static byte[] buf;
	private static IWriterCallBack icb;
	private HttpReader hr;
	
	public Writer(IWriterCallBack cb, URL u, byte[] b, int i) {
		hr = new HttpReader(u);
		icb = cb;
		buf = b;
		Thread t = new Thread(this,"dt_"+i);
		t.setPriority(Thread.NORM_PRIORITY + 1);
		t.start();
	}
	
	public void run() {
		int wbytes=0, wpos=0, rema = 0, retry = 0;
		int idxmask = BuffRandAcceURL.UNIT_COUNT - 1;
		boolean cont = true;
		int index = 0;		//buf[]内"块"索引号
		int startpos = 0;	//index对应的文件位置(相对于文件首的偏移量)
		long time0 = 0;
		while (cont) {
			try {
				// 1.等待空闲块
				if(retry == 0) {
					if ((startpos = icb.tryWriting()) == -1)
						break;
					index = (startpos >> BuffRandAcceURL.UNIT_LENGTH_BITS) & idxmask;
					wpos = startpos & BuffRandAcceURL.BUF_LENGTH_MASK;
					wbytes = 0;
					rema = BuffRandAcceURL.UNIT_LENGTH;
					time0 = System.currentTimeMillis();
				}
				
				// 2.定位
				hr.seek(startpos);

				// 3.下载"一块"
				int w;
				while (rema > 0 && isalive) {
					w = (rema < 2048) ? rema : 2048; //每次读几K合适?
					if ((w = hr.read(buf, wpos, w)) == -1) {
						cont = false;
						break;
					}
					rema -= w;
					wpos += w;
					startpos += w;	// 能断点续传
					wbytes += w;
				}
				
				// 下载速度足够快就结束本线程
				long t = System.currentTimeMillis() - time0;
				if(icb.getWriterCount() > 1 && t < 500)
					cont = false;
				
				//4.通知"读"线程
				retry = 0;
				icb.updateBuffer(index, wbytes);
			} catch (Exception e) {
				if(++retry == HttpReader.MAX_RETRY) {
					isalive = false;
					icb.terminateWriters();
					break;
				}
			}
		}
		icb.updateWriterCount();
		try {
			hr.close();
		} catch (Exception e) {}
		hr = null;
	}
}

 

四、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。关键是如何简单有效防止死锁?

package instream;
import java.net.URL;

public final class BuffRandAcceURL implements IWriterCallBack {
	public static final int UNIT_LENGTH_BITS = 15;
	public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; //2^16=32K
	public static final int BUF_LENGTH = UNIT_LENGTH << 4;
	public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; //16块
	public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);
	private static final int MAX_WRITER = 5;
	private static long file_pointer;
	private static int read_pos;
	private static int fill_bytes;
	private static byte[] buf;			//同时作写线程同步锁
	private static int[] unit_bytes;	//同时作读线程互斥锁
	private static int alloc_pos;
	private static URL url;
	private static boolean isalive = true;
	private static int writer_count;
	private static long file_length;
	private static long frame_bytes;
	private static int free_unit = UNIT_COUNT;	// "信号量"计数器
	
	public BuffRandAcceURL(String sURL) throws Exception {
		this(sURL,MAX_WRITER);
	}
	
	public BuffRandAcceURL(String sURL, int download_threads) throws Exception {
		buf = new byte[BUF_LENGTH];
		unit_bytes = new int[UNIT_COUNT];
		url = new URL(sURL);

		// 创建"写"线程
		writer_count = download_threads;
		for (int i = 0; i < download_threads; i++) {
			new Writer(this, url, buf, i + 1);
			Thread.sleep(200);
		}

		try_cache();
	}
	
	/*
	 * 缓冲
	 */
	private void try_cache() throws InterruptedException {
		int cache_size = BUF_LENGTH;
		int bi = unit_bytes[read_pos >> UNIT_LENGTH_BITS];
		if(bi != 0)
			cache_size -= UNIT_LENGTH - bi;
		while (fill_bytes < cache_size) {
			if (writer_count == 0 || isalive == false)
				return;
			synchronized (unit_bytes) {
				unit_bytes.wait(200);	//wait(200)错过通知也可结束循环?
			}
		}
	}

	private int try_reading(int i, int len) throws Exception {
		int n = (i + 1) & (UNIT_COUNT - 1);
		int r = (unit_bytes[i] > 0) ? (unit_bytes[i] + unit_bytes[n]) : unit_bytes[i];
		if (r < len) {
			if (writer_count == 0 || isalive == false)
				return r;
			try_cache();
		}
		
		return len;
	}
	
	/*
	 * 各个"写"线程互斥等待空闲块. 空闲块按由小到大的顺序分配.
	 */
	public int tryWriting() throws InterruptedException {
		int ret = -1;
		synchronized (buf) {
			while (free_unit == 0 && isalive)
				buf.wait();
			
			if(alloc_pos >= file_length || isalive == false)
				return -1;
			ret = alloc_pos;
			alloc_pos += UNIT_LENGTH;
			free_unit--;
		}
		return ret;
	}
	
	/*
	 * "写"线程向buf[]写完数据后调用,通知"读"线程
	 */
	public void updateBuffer(int i, int len) {
		synchronized (unit_bytes) {
			unit_bytes[i] = len;
			fill_bytes += len;
			unit_bytes.notify();
		}
	}
	
	/*
	 * "写"线程准备退出时调用
	 */
	public void updateWriterCount() {
		synchronized (unit_bytes) {
			writer_count--;
			unit_bytes.notify();
		}
	}
	
	public int getWriterCount() {
		return writer_count;
	}
	
	/*
	 * read方法内调用
	 */
	public void notifyWriter() {
		synchronized (buf) {
			buf.notifyAll();
		}
	}
	
	/*
	 * 被某个"写"线程调用,用于终止其它"写"线程;isalive也影响"读"线程流程
	 */
	public void terminateWriters() {
		synchronized (unit_bytes) {	
			if (isalive) {
				isalive = false;
				System.out.println("\n读取文件超时。重试 " + HttpReader.MAX_RETRY
						+ " 次后放弃,请您稍后再试。");
			}
			unit_bytes.notify();
		}
		notifyWriter();
	}
	
	public int read(byte[] b, int off, int len) throws Exception {
		int i = read_pos >> UNIT_LENGTH_BITS;
		
		// 1.等待有足够内容可读
		if(try_reading(i, len) < len || isalive == false)
			return -1;

		// 2.读取
		int tail = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH
		if (tail < len) {
			System.arraycopy(buf, read_pos, b, off, tail);
			System.arraycopy(buf, 0, b, off + tail, len - tail);
		} else
			System.arraycopy(buf, read_pos, b, off, len);

		fill_bytes -= len;
		file_pointer += len;
		read_pos += len;
		read_pos &= BUF_LENGTH_MASK;
		unit_bytes[i] -= len;
		if (unit_bytes[i] < 0) {
			int ni = read_pos >> UNIT_LENGTH_BITS;
			unit_bytes[ni] += unit_bytes[i];
			unit_bytes[i] = 0;
			free_unit++;
			notifyWriter();
		} else if (unit_bytes[i] == 0) {
			free_unit++;	// 空闲块"信号量"计数加1
			notifyWriter(); 	// 3.通知
		}
		// 如果下一块未填满,意味着文件读完,第1步已处理一次读空两块的情况

		return len;
	}

}

 

本文是JAVA开源项目jmp123源代码的一部分,单独成文旨在与朋友们交流文中提出的问题模型的解决方法。

【jmp123下载地址】http://jmp123.sourceforge.net/

分享到:
评论
7 楼 qn_lf 2010-08-18  
不用被评为隐藏帖吧 楼主贴这么多 没功劳也有苦劳
6 楼 lfp001 2010-08-18  
<div class="quote_title">ahuango 写道</div>
<div class="quote_div">最好能贴一下使用代码,否则真的很难有耐心看下去。 有无从入手的感觉 </div>
<p> </p>
<div class="quote_title">xiayh1002 写道</div>
<div class="quote_div">代码最好能下载运行一下,多线程光这么看代码,还比较难理解</div>
<p> </p>
<div class="quote_title">beginLi 写道</div>
<div class="quote_div">楼主 开源是有好处的..你的播放器就舍不得拿出来? </div>
<p> </p>
<p>这儿中介对以缓冲方式读取远程文件的方法的一介简要介绍。更详细的讲解在本站论坛和本站我的博客上有,如果你有兴趣,请看《(四)用JAVA编写MP3解码器——读取文件》</p>
<p>本站论坛 <a href="http://www.iteye.com/topic/740090">http://www.iteye.com/topic/740090</a></p>
<p>本站博客 <a href="http://lfp001.iteye.com/blog/740090">http://lfp001.iteye.com/blog/740090</a></p>
5 楼 beginLi 2010-08-18  
楼主 开源是有好处的..你的播放器就舍不得拿出来?
4 楼 xiayh1002 2010-08-18  
代码最好能下载运行一下,多线程光这么看代码,还比较难理解
3 楼 yangguo 2010-08-18  
bigkai_13 写道
这哥们最先研究肯定是C/C++ 或者是PHP。

+1.变量命名不符java的规范。
2 楼 bigkai_13 2010-08-17  
这哥们最先研究肯定是C/C++ 或者是PHP。
1 楼 ahuango 2010-08-17  
最好能贴一下使用代码,否则真的很难有耐心看下去。 有无从入手的感觉

相关推荐

    用JAVA实现缓冲多线程无阻塞读取远程文件.doc

    总结来说,这个Java实现的多线程无阻塞读取远程文件系统利用了环形缓冲区来提升效率,通过多线程并行下载数据,并通过同步机制确保数据的正确读写。`HttpReader`类作为基础工具,负责HTTP连接和数据读取,同时具备...

    arrayBuffer(环形缓冲区)

    环形缓冲区(Circular Buffer),又称为循环缓冲区或环形队列,是一种常见..."arrayBuffer(环形缓冲区)"这个类就是这样的一个实现,它提供了一种在并发环境中安全使用环形缓冲区的方式,确保了多线程同步访问的正确性。

    环形缓冲区实现原理

    在多线程环境下,对环形缓冲区的读写操作需要进行互斥控制,以避免竞态条件。常见的同步机制包括互斥锁(mutexes)、读写锁(read-write locks)以及原子操作(atomic operations)等。 环形缓冲区的优势在于其简单...

    基于Java的环形缓冲数组实现.zip

    4. **同步控制**:由于环形缓冲区可能在多线程环境下使用,因此需要确保读写操作的原子性和一致性。在Java中,可以使用`synchronized`关键字、`java.util.concurrent.locks.ReentrantLock`或`java.util.concurrent....

    C语言环形缓冲区

    在这个示例中,你可以学习如何创建和管理线程,以及如何利用这些同步机制来实现环形缓冲区的多线程读写操作。 总之,C语言实现的环形缓冲区是解决多线程环境下的数据通信问题的有效工具。通过理解和实践这样的设计...

    labview 环形缓冲区组件

    例如,它可以支持多线程读写,以及自定义的溢出策略等高级特性。 在实际应用中,环形缓冲区广泛应用于信号处理、数据记录、图像处理和通信系统等领域。例如,在一个实时信号监测系统中,环形缓冲区可以持续收集...

    Delphi实现的环形缓冲区(全文件)

    环形缓冲区是一种在计算机编程中常见的数据结构,特别是在实时系统和并发编程中。它以一个固定大小的数组为基础,通过巧妙地管理读写...在实际应用中,环形缓冲区常被用于网络通信、多线程同步、音频视频流处理等领域。

    java数组-基于java实现的环形缓冲数组.zip

    在实际应用中,环形缓冲数组常用于实现高效的线程间通信,例如在多线程环境中的生产者-消费者模型。生产者线程负责向缓冲区添加数据,而消费者线程负责消费这些数据。通过使用环形缓冲数组,可以减少锁的使用,提高...

    Delphi实现的环形缓冲区

    为了提高效率,可以在多线程环境下使用锁或其他同步机制来保护环形缓冲区的读写操作。此外,还可以考虑使用异步I/O或事件驱动模型来进一步提升数据处理能力。 描述中提到的"缓冲区测试"文件可能是包含实际代码实现...

    无锁 环形缓冲区

    无锁环形缓冲区是一种在多线程编程中常见的数据结构,它被广泛应用于高效并发环境中的数据交换。无锁(Lock-Free)意味着在访问共享数据时,无需使用传统的锁机制来确保线程安全,而是依赖于原子操作(Atomic ...

    环形缓冲区读写操作的分析与实现

    ### 环形缓冲区读写操作的分析与实现 #### 引言 环形缓冲区作为嵌入式系统中的重要数据结构,在多任务环境中扮演着关键角色。它能够有效地管理数据流,尤其是在存在单一读任务和单一写任务的情况下。通过采取适当...

    环形缓冲区建立源码下载

    2. 并发友好:在多线程环境下,环形缓冲区可以通过适当的同步机制(如互斥锁或信号量)实现高效的数据交换,减少阻塞和等待。 3. 空间利用率高:充分利用缓冲区空间,避免数据溢出或浪费。 在实际应用中,环形缓冲...

    环形缓冲区源代码

    7. roundbuffer.txt文件可能包含的就是一个具体的环形缓冲区实现的源代码,你可以通过阅读和分析这个文件来学习如何在实际项目中应用环形缓冲区。 环形缓冲区在串口通讯中的应用,能够有效地解决数据接收和处理之间...

    一个c++环形队列缓冲区

    - 函数 `readbuf` 实现了从环形缓冲区读取数据的功能。 - 当队列非空时,从数组中读取一个数据元素,并更新读指针。 - 使用模运算确保读指针不会超出数组的边界。 2. **写操作**: - 函数 `writebuf` 实现了向...

    xunhuan.rar_visual c_xunhuan _环形缓冲_环形缓冲区

    在Visual C++环境中,开发者可以利用其强大的库函数和工具实现自己的环形缓冲区,同时需要关注线程安全问题,确保在多线程环境下的正确运行。通过阅读提供的文档和资料,可以深入理解环形缓冲区的原理和应用,提升在...

    环形缓冲区实现

    环形缓冲区(Ring Buffer)是一种在计算机编程中常见的数据结构,尤其在处理实时系统、多线程通信、数据采集和存储等领域应用广泛。它在内存中创建一个固定大小的缓冲区,通过巧妙的索引管理,使得数据的读写操作...

    环形缓冲区_环形buffer_

    5. 安全性:在多线程或中断驱动的环境中,环形缓冲区的访问需要同步控制,以防止数据竞争。可以使用互斥锁(mutex)、信号量(semaphore)等机制来保证数据的一致性。 6. 清空缓冲区:有时候我们可能需要清空缓冲区...

    电子-stm32f10xdemo环形缓冲区.rar

    7. **示例代码**:`stm32f10x_demo_环形缓冲区`文件很可能是包含具体实现环形缓冲区的代码示例,可能包含了初始化、读写操作以及同步机制的实现,是理解STM32F10X环形缓冲区应用的实用参考。 理解并掌握这些知识点...

Global site tag (gtag.js) - Google Analytics