`
tom_seed
  • 浏览: 322108 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

多线程写文件之同步锁(synchronized)与原子量(AtomicXXX)

 
阅读更多

业务需求:将数据库表中的大数据以文本方式保存到本地磁盘,即通过线程写入文件。

业务实现

  • 主线程开启创建文件缓冲流,启动多条子线程,并将文件缓冲流提供给每个子线程
  • 每个子线程调用DAO分页查询接口获取到的数据,组装拼接写入到文件缓冲流中

在这个简单的业务里面最需要注意的应该是每个子线程分页查询时的页码数,需要通过同步的方式来控制。

 

一、同步锁(synchronized)的方式

同步页码类:

/**
 * 同步对象,提供页码
 */
public class SyncObj {
	private int pageNo = 0;
	public synchronized int getPageNo() {
		pageNo ++;
		return pageNo;
	}
}

 

子线程类:

public class WriteFileThread implements Runnable {

	protected final Log log = LogFactory.getLog(this.getClass());
	private String name;
	private ItemMapper itemMapper;
	private SyncObj obj;
	private BufferedWriter bufferwriter = null;

	public WriteFileThread(String name, ItemMapper itemMapper, SyncObj obj, BufferedWriter bufferwriter){
		this.name = name;
		this.itemMapper = itemMapper;
		this.obj = obj;
		this.bufferwriter = bufferwriter;
	}

	@Override
	public void run() {
		int pageNoCopy = 0;
		List<Item> itemList = null;
		
		StringBuilder sb = new StringBuilder();
		try {
			Map<String, Object> param = new HashMap<String, Object>();
			while(true){
				
				pageNoCopy = obj.getPageNo();
				log.info("线程["+name+"]获取到的当前页码为:"+pageNoCopy);
				param.put("index", (pageNoCopy-1)*10000);
				param.put("pageSize", 10000);//一万条读一次
				
				itemList = itemMapper.queryPagination(param);
				if(itemList == null || itemList.size() == 0){
					log.info("线程["+name+"]在第"+pageNoCopy+"页退出了");
					break;
				}
				for(Item item : itemList){
					sb.append(item.getItemNum()).append(item.getItemName()).append("\n");
					bufferwriter.write(sb.toString());
					sb.delete(0, sb.length());
				}
				bufferwriter.flush();//刷新流
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

 

主线程(这里采用Http Request)代码:

@RequestMapping(value="/generate_file")
public void generateFile(){
	File file = new File("D://"+System.currentTimeMillis()+".txt");
	if(!file.exists()){
		try {
			file.createNewFile();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	SyncObj obj = new SyncObj();
	ExecutorService pool = Executors.newFixedThreadPool(20);

	FileWriter filewriter = null;
	BufferedWriter bufferwriter = null;
	try {
		filewriter = new FileWriter(file, true);
		bufferwriter = new BufferedWriter(filewriter);
		for(int i=0; i<20; i++)
			pool.execute(new WriteFileThread("线程"+(i+1), itemMapper, obj, bufferwriter));
		Thread.sleep(1000*60);
	} catch (Exception e) {
		e.printStackTrace();
	}finally{
		try {
			bufferwriter.close();
			filewriter.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

二、原子类(AtomicXXX)及同步量计数器(CountDownLatch)的应用

同步页码类:

这里使用了JDK自带的原子类,能够更高效的提供数据同步,所以同步页码类就不需要了。

 

子线程类:

public class WriteFileThread2 implements Runnable {

	protected final Log log = LogFactory.getLog(this.getClass());
	private String name;
	private ItemMapper itemMapper;
	private AtomicInteger pageNo;
	private CountDownLatch countDown;
	private BufferedWriter bufferwriter = null;

	public WriteFileThread2(String name, ItemMapper itemMapper, AtomicInteger pageNo, CountDownLatch countDown, BufferedWriter bufferwriter){
		this.name = name;
		this.itemMapper = itemMapper;
		this.pageNo = pageNo;
		this.bufferwriter = bufferwriter;
		this.countDown = countDown;
	}

	@Override
	public void run() {
		int pageNoCopy = 0;
		List<Item> itemList = null;
		
		StringBuilder sb = new StringBuilder();
		try {
			Map<String, Object> param = new HashMap<String, Object>();
			while(true){
				
				pageNoCopy = pageNo.getAndIncrement();//原子量中获取页码,并且自增1
				log.error("线程["+name+"]获取到的当前页码为:"+pageNoCopy);
				param.put("index", (pageNoCopy-1)*10000);
				param.put("pageSize", 10000);//每页获取一万条记录
				
				itemList = itemMapper.queryPagination(param);
				if(itemList == null || itemList.size() == 0){
					log.info("线程["+name+"]在第"+pageNoCopy+"页退出了");
					break;
				}
				for(Item item : itemList){
					sb.append(item.getItemNum()).append(item.getItemName()).append("\n");
					bufferwriter.write(sb.toString());
					sb.delete(0, sb.length());
				}
				bufferwriter.flush();//刷新流
			}
			countDown.countDown();//计数器自减1
		} catch (IOException e) {
			countDown.countDown();//计数器自减1
			e.printStackTrace();
		}
	}
}

 

主线程代码:

@RequestMapping(value="/generate_file2")
public void generateFile2(){
	final int threadNum = 20;//子线程数
	File file = new File("D://"+System.currentTimeMillis()+".txt");
	if(!file.exists()){
		try {
			file.createNewFile();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	AtomicInteger pageNo = new AtomicInteger(1);//页码
	CountDownLatch countDown = new CountDownLatch(threadNum);//计数器
	ExecutorService pool = Executors.newFixedThreadPool(threadNum);//固定大小线程池
	
	FileWriter filewriter = null;
	BufferedWriter bufferwriter = null;
	try {
		filewriter = new FileWriter(file, true);
		bufferwriter = new BufferedWriter(filewriter);
		for(int i=0; i<threadNum; i++)
			pool.execute(new WriteFileThread2("线程"+(i+1), itemMapper, pageNo, countDown, bufferwriter));
		countDown.await();//阻塞主线程
	} catch (Exception e) {
		e.printStackTrace();
	}finally{
		try {
			bufferwriter.close();
			filewriter.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

 

注意:

  • 每个子线程必须公用主线程中的文件缓冲流;若子线程各自使用自己的文件缓冲流,在线程刷出缓冲流数据时出现了碰撞,会导致写入的数据内容窜了。
  • 在我的程序中用了C3P0的数据源,需要注意设置最大可用连接数(maxPoolSize)及获取连接时等待超时时间(checkoutTimeout)以确保请求时不会出现超时异常。
  • 因主线程感知不到子线程出现异常的情况,所以子线程出现异常时也需要减计数器,否则主线程会一直被阻塞。

    按照正常的业务,当子线程中出现异常时,首先需要进行自我修复(例如:出现数据库连接异常时,可重新获取连接,重新进行查询操作);

    若修复不成功(例如:数据本身存在问题),需要立即通知主线程,并且终止掉其他子线程。在这里可以简单实现成:在主线程中增加原子布尔值(AtomicBoolean)作为是否异常的状态标志位,每个子线程在循环时进行检查;若出现异常,计数器减一并跳出当前线程即可。

分享到:
评论

相关推荐

    正确使用多线程同步锁@synchronized()1

    在iOS开发中,多线程同步是保证代码安全的关键技术之一。`@synchronized`关键字是Apple提供的一种简便的同步机制,它可以帮助开发者确保在多线程环境下对共享资源进行原子性操作,防止数据竞争问题。本文将深入探讨`...

    多线程通信读写文件

    通过选择合适的同步机制,合理调度线程,以及正确处理异常和资源管理,我们可以构建出高效且可靠的多线程文件操作程序。学习和实践这些概念对于任何IT专业人员来说都是必要的,无论是在服务器端开发、大数据处理还是...

    Android多线程之同步锁的使用

    本文主要介绍了Android多线程之同步锁的使用,分享给大家,具体如下: 一、同步机制关键字synchronized 对于Java来说,最常用的同步机制就是synchronized关键字,他是一种基于语言的粗略锁,能够作用于对象、函数...

    java 多线程synchronized互斥锁demo

    标题中的"java 多线程synchronized互斥锁demo"指的是一个示例,展示了如何在多线程环境下使用`synchronized`关键字创建互斥锁,确保同一时间只有一个线程可以访问特定的代码块或方法。 描述中的"一个多线程访问的同...

    多线程对文件读写操作(java)

    Java提供了一些同步机制,如`synchronized`关键字、`Lock`接口(如`ReentrantLock`)以及`java.util.concurrent`包下的工具类,这些都可以用来确保多线程访问文件时的数据一致性。 3. **使用BufferedReader和...

    Java 同步锁(synchronized)详解及实例

    Java中的同步锁,即`synchronized`关键字,是Java多线程编程中用于解决并发问题的重要机制。它确保了对共享资源的互斥访问,防止数据的不一致性。当我们有多线程环境并涉及到共享数据时,可能会出现竞态条件,就像...

    java多线程文件传输

    Java多线程文件传输是Java编程中一个重要的实践领域,特别是在大数据处理、网络通信和分布式系统中。在Java中,多线程可以提高程序的执行效率,尤其在处理并发任务时,如大文件的上传、下载和传输。下面将详细探讨...

    基于Java synchronized同步锁实现线程交互.pdf

    Java synchronized同步锁可以解决多线程带来的问题,保证同一时刻只有一个线程操作同一资源。使用wait()和notify()方法可以切换线程状态,实现线程交互。因此,在使用Java多线程的场景中,请充分理解Java线程各状态...

    一个多线程同步读写的小程序

    在这个“一个多线程同步读写的小程序”中,我们看到开发者尝试通过创建读线程和写线程来同时进行数据的读取和写入,以优化程序的执行流程。 首先,让我们深入理解多线程的概念。线程是操作系统分配处理器时间的基本...

    java实现多线程文件传输

    10. **安全性**:在多线程环境下,文件操作需注意文件锁,避免同一时刻多个线程修改同一文件,造成数据混乱。 通过以上知识点的综合运用,我们可以构建一个高效、安全的多线程文件传输系统。实际编程中,还需要根据...

    Java 同步锁 wait notify 学习心得

    标题和描述概述的知识点主要集中在Java的多线程机制中,特别是`wait`和`notify`方法在同步锁中的应用。这些方法对于控制线程之间的交互至关重要,尤其是在资源有限或需要确保数据一致性的情况下。 ### Java同步锁...

    支持10000同步锁,Spring Boot,Java

    在Java编程语言中,同步锁(Synchronized)是多线程环境下确保数据一致性的重要机制。在Spring Boot框架中,我们同样可以有效地利用同步锁来处理并发问题。标题和描述提到的"支持10000同步锁"可能是指在特定场景下,...

    java多线程之并发锁

    Java 多线程之并发锁 Java 中的多线程编程是指在一个程序中同时运行多个线程,以提高程序的执行效率和响应速度。在多线程编程中,线程间的同步是非常重要的,因为不同的线程可能会同时访问同一个共享资源,导致数据...

    Java 多线程同步 锁机制与synchronized深入解析

    总结来说,Java的synchronized关键字提供了锁机制,用于实现多线程环境下的同步。同步方法和同步代码块是两种不同级别的同步控制,同步代码块提供了更灵活的控制和更高的效率。理解并合理使用这些机制对于编写高效、...

    多线程接收文件例子

    2. **同步与互斥**:在多线程环境中,为了防止数据竞争,可能需要使用锁或其他同步机制。例如,在Java中,可以使用`synchronized`关键字,Python中可以使用`Lock`对象。这样确保在任何时候只有一个线程访问特定的...

    12丨多线程之锁优化(上):深入了解Synchronized同步锁的优化方法.html

    12丨多线程之锁优化(上):深入了解Synchronized同步锁的优化方法.html

    java锁机制Synchronizedjava锁机制Synchronized

    Java 锁机制 Synchronized 是 Java 语言中的一种同步机制,用于解决多线程并发访问共享资源时可能出现的一些问题。 Java 锁机制 Synchronized 的概念 在 Java 中,每个对象都可以被看作是一个大房子,其中有多个...

    Java、Android多线程、线程安全、线程同步

    NoHttp核心架构之多线程通信、线程安全、线程同步;synchronized锁,Lock锁;具体讲解请移步博客:http://blog.csdn.net/yanzhenjie1003/article/details/50992468

Global site tag (gtag.js) - Google Analytics