`

Thread_大批量数据的分页处理(生产者-消费者)

 
阅读更多

java应用中通常会有处理大批量数据的场景,这里介绍一种分页处理的方式,仅供参考。

大批量数据通常不能一次读取或者写入,因为一次读取会消耗过多的内存,一次写入会长时间占用DB连接,

我的思路是参照分页查询的模式缩小每次操作的数据集,循环执行,直到处理完毕。

 

两种实现方式:

1.采用单线程模式顺序执行。

每次读取固定大小的数据记录,然后插入到存储设备。

 

2.采用多线程模式。

1个线程读取,另1个线程负责处理数据。

读写并发执行,一定程度上提高了功能的吞吐量和CPU的利用率。

 

具体实现:

1. 采用固定大小的队列来存储分页数据;

2. 队列满了,则开始执行入库等操作;

3. 队列空了,则开始向队列中插入数据;

 

 

/**
 * 批量数据分页处理
 * 
 * 场景:大批量数据查询、入库
 * 解决:拆分大批量为小数据集合
 * 
 * 关键点:指定长度的queue/是否可读取read/存、取数据线程间的通讯
 * 
 * @author charles
 *
 */
public class PageDataHandler {
	boolean parseComplete = false;
	DataBuffer dataBuffer = new DataBuffer();
	
	private static String[] data = null;
	private static int rowCount = 0;
	private static int pageCount = 0;
	
	
	// 准备数据
	static {
		data = new String[]{"0-0","1-1","2-2","3-3","4-4","5-5","6-6","7-7","8-8"};
		rowCount = data.length;
		pageCount = rowCount / DataBuffer.ROWS_PER_PAGE + ((rowCount % DataBuffer.ROWS_PER_PAGE) == 0 ? 0 : 1);
	}
	
	
	/**
	 * 分页查询数据
	 * @param page
	 * @return
	 */
	private void findByPage() throws Exception {
		
		Thread t = new Thread() {
			public void run() {
				try {
					for(int i = 0; i < pageCount; i++){
						int begin = i * DataBuffer.ROWS_PER_PAGE;
						//装满queue为止
						for(int j = 0; j < DataBuffer.ROWS_PER_PAGE; j++){
							if(begin+j < data.length)
								dataBuffer.put(data[begin+j]);
						}
					}
					parseComplete = true;
					dataBuffer.triggerRead(true);
					
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};
		
		t.start();
	}
	
	
	/**
	 * 消费queue中的数据
	 */
	private void bizz(){
		Thread t = new Thread() {
			public void run() {
				try {
					int bufferSize = 0;
					while (true) {
						bufferSize = dataBuffer.queue.size();
						if(bufferSize > 0)
							dataBuffer.oper();
						
						if(parseComplete == true)
							break;
					}
					
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		};
		
		t.start();
	}
	
	
	public static void main(String args[]) throws Exception {
		PageDataHandler pageHandler = new PageDataHandler();		
		pageHandler.findByPage();//1.分页获取数据
		pageHandler.bizz();//2.操作分页数据
	}
}



 

 

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class DataBuffer {
	public static final int ROWS_PER_PAGE = 3;
	// 指定长度的queue
	public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(ROWS_PER_PAGE);
	// 是否可读取
	private boolean read = false;
	
	
	public synchronized void put(String data) throws Exception {
		queue.put(data);
		System.out.println("** put : "+ data);
		
		if(queue.size() == ROWS_PER_PAGE)
			triggerRead(true);
		
		while(read){
			try {
				this.wait();
			} catch (InterruptedException e) {
			}
		}
	}
	
	
	public synchronized void triggerRead(boolean read) {
		this.read = read;
		this.notifyAll();
	}
	
	
	public synchronized void oper() throws Exception {
		while(!read){
			try {
				this.wait();
			} catch (InterruptedException e) {
			}
		}
		
		System.out.println("** queue size : "+ queue.size());
		
		for(Iterator it = queue.iterator(); it.hasNext();){
			String data = (String)it.next();
			//操作数据
			System.out.println("** get : "+ queue.take());
		}
		triggerRead(false);
	}
}

 

分享到:
评论

相关推荐

    jchc.rar_tearshmj_生产者_生产者-消费者问题 c++ _生产者和消费者_生产者消费者

    1. 定义一个缓冲区,作为生产者和消费者共享的数据存储区域。 2. 使用互斥锁确保任何时候只有一个线程可以访问缓冲区。 3. 使用条件变量让生产者知道何时可以生产新的产品(当缓冲区未满时),以及让消费者知道何时...

    用线程做一个生产者-消费者同步问题.zip_信号量_信号量c#_生产者 消费者_生产者-消费者_生产者消费者

    用线程做一个生产者-消费者同步问题 其实我是想用线程做一个生产者-消费者同步问题,现在已经成功了,贴出来,大家请指证. 我想让线程thread1和thread2按顺序显示,于是加了个信号量mysem,并且在新建两个线程的两个...

    PC.rar_pv操作_生产者消费者_生产者-消费者问题

    在生产者-消费者问题中,我们可以设置两个信号量:一个是buffer,表示缓冲区的空闲位置,另一个是item,表示缓冲区中的数据个数。当生产者试图往缓冲区放数据时,它会先执行P(buffer)检查是否有空位,如果没有则等待...

    labview 生产者消费者例子

    总之,LabVIEW 生产者-消费者模型是解决多线程同步问题的有效工具,它通过队列、事件结构、条件变量和信号量等手段,协调了数据生成与处理的过程,提高了系统的并发性和效率。通过学习和实践提供的例程,可以进一步...

    JAVA_生产者-消费者

    在这个模式中,"生产者"负责生成数据,而"消费者"则负责处理这些数据。下面将详细介绍这一模式及其在Java中的实现。 生产者-消费者模型的核心是共享资源,通常是某种缓冲区或队列,生产者将产品放入这个共享空间,...

    生产者-消费者.zip

    "生产者-消费者"模式是一种经典的并发设计模式,广泛应用于解决数据处理中的异步问题。本项目提供了一个简单的Java实现,帮助开发者理解如何在实际场景中运用这种模式。 生产者-消费者模式的核心思想是共享资源...

    计算机课程设计 操作系统生产者-消费者问题

    创建“生产者”线程 缓冲区 是否阻塞 “生产者”等待 “消费者”取出缓冲区的数据 创建“消费者”线程 “消费者”阻塞 缓冲区是否为空 输入数据 “生产者”生产产品后被唤醒 NO NO YES 运行结果 1. 截图一: 2. ...

    C++ 多线程通信方式简介并结合生产者-消费者模式代码实现

    生产者-消费者模式是一种典型的多线程同步问题,一个或多个生产者线程生成数据,一个或多个消费者线程消耗这些数据。在此模式中,通常使用队列作为缓冲区,同时利用互斥量和条件变量来同步生产者和消费者的动作。 ...

    Java多线程实现生产者消费者

    通过这样的设计,我们可以有效地控制生产者和消费者的执行顺序,确保了数据的正确性,并且避免了先消费后生产的情况。在实际应用中,可能需要根据具体需求进行优化,例如添加异常处理、使用更高级的同步工具(如`...

    Java多线程 生产者-消费者模式

    在这个模式中,生产者负责生成数据并放入共享的数据结构(如队列),而消费者则从这个数据结构中取出数据并进行处理。这个模式有效地实现了线程间的协作,避免了资源竞争和死锁。 在Java中,我们可以使用`java.util...

    producers-and-consumers.zip_C# 生产者_c# 生产 消费者_c#消费者_c#生产消费_neckq

    生产者负责生成数据,而消费者则负责处理这些数据。在这个模型中,通常会有一个共享的数据缓冲区,生产者将数据放入缓冲区,而消费者从中取出数据。为了确保数据的一致性和防止资源竞争,通常会使用线程同步机制,如...

    pc.zip_生产者 消费者_生产者与消费者_生产者消费者

    在IT领域,生产者-消费者模型(Producer-Consumer Pattern)是一种经典的多线程问题解决方案,主要应用于并发编程中。...在实际应用中,生产者-消费者模型广泛应用于消息队列、数据处理、I/O操作等多种场景。

    并发控制-生产者-消费者问题实验报告

    4. **互斥与同步**:确保生产者与消费者之间能够正确地进行互斥和同步操作,即防止数据竞争并保证消息的有序处理。 5. **实现方式**:通过线程和进程两种方式实现并发控制模型。 6. **测试方案**:设计不同的...

    多线程间通信:多生产者-多消费者实例

    在这个模式中,多个生产者线程生成数据,而多个消费者线程则负责消费这些数据。为了确保数据的一致性和避免资源竞争,我们需要采用有效的同步机制。 首先,我们要理解“生产者-消费者”模型的基础,这是由计算机...

    java生产者-消费者

    生产者-消费者模式是多线程编程中的一种经典模式,它通过共享缓冲区来协调多个线程之间的交互:一组线程(生产者)负责向缓冲区添加数据,而另一组线程(消费者)则从缓冲区读取并处理这些数据。这种模式能够有效地...

    java多线程实现生产者和消费者

    在并发编程中,"生产者-消费者"模式是一种经典的解决问题的范式,用于协调两个或更多线程间的协作,其中一部分线程(生产者)生成数据,另一部分线程(消费者)消费这些数据。 生产者-消费者模型的核心在于共享资源...

    生产者消费者问题C#

    问题的关键是确保生产者不会在数据尚未被消费者处理时过度生产,同时也要避免消费者在无数据可消费时浪费资源。 在C#中,我们通常使用`System.Threading.Thread`类来创建线程,`System.Threading.Monitor`类来实现...

    基于JAVA线程机制研究生产者-消费者问题.pdf

    生产者负责生成数据并放入缓冲区,而消费者则从缓冲区中取出数据进行处理。在这种情况下,需要确保生产者不会在缓冲区满时向其中添加数据,消费者也不会在缓冲区为空时尝试从中取出数据。 在Java中,线程机制是实现...

    comsumerandproducer.rar_生产者 消费者_生产者与消费者_生产者与消费者问题

    这个问题通常出现在一个系统中,其中一个或多个线程(生产者)生成数据,而其他线程(消费者)则负责处理这些数据。在并发环境中,必须确保生产者不会在消费者未准备好接收时生产数据,同时消费者也不会尝试消费尚未...

Global site tag (gtag.js) - Google Analytics