`
terrencexu
  • 浏览: 122521 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

10个线程同步处理1000行消息

    博客分类:
  • Java
阅读更多

多线程,并发,是经常遇到的问题,平时解决的方案也想过很多,比如说现在有1000行消息,需要开10个线程同时处理。

 

之前想过两个方案:

 

方案一: 一次开10个线程,每个线程处理一条消息,等10个线程全部处理结束之后,再开启下10个线程,直到全部处理完毕

缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程

 

方案二: 将1000行消息分割为10份,每100行用一个线程处理。

优点:无等待

缺陷: 分割不均,无法充分利用所有的线程

 

现在想想,上面两个缺点挺多,就又想了两种方案:

 

方案三:使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.

优点:充分利用所有线程,无等待

缺点:需要将所有的task转移到Queue中,消耗一倍内存

 

方案四:使用java.util.concurrent包,固定数量线程池。

优点:完美解决

缺点:当单个task执行时间很短的时候,线程池中的线程并不会被全部使用,这样很多task就会block在一个线程中,降低执行速率

 

下面贴出每个方案的代码实现,备忘吧,如果有更好的想法,或者更简单的方式,再继续补充~

 

 

public class Task {

	private int id;
	
	public Task(int id) {
		this.id = id;
	}
	
	public void start() {
		System.out.println(Thread.currentThread().getName() + ": start to handle task " + id);

		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
}

 

 

 

import java.util.LinkedList;
import java.util.List;

public class TaskProducer {

	public static List<Task> produce(int count) {
		List<Task> tasks = new LinkedList<Task>();
		
		for(int i = 0; i < count; i ++) {
			tasks.add(new Task(i + 1));
		}
		
		return tasks;
	}
	
}

 

 

 

import java.util.LinkedList;
import java.util.List;

/**
 * 策略1: 每次开启n个线程,等待n个线程全部结束之后,再开启下n个线程,每个线程处理一个task.
 * 缺陷:需要等待其他n - 1个线程结束后,才能同时启动下n个线程
 */
public class Strategy1 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}
	
	public static void handleTasks(List<Task> tasks, int threadCount) {
		int taskCount = tasks.size();
		
		List<Thread> threadHolder = new LinkedList<Thread>();
		for(int i = 0; i < taskCount; i += threadCount) {
			for(int j = 0; j < threadCount && (i + j) < taskCount; j ++) {
				Thread thread = new Thread(new TaskHandler(tasks.get(i + j)));
				threadHolder.add(thread);
				thread.start();
			}
			
			waitToFinish(threadHolder);
			threadHolder.clear();
		}
	}
	
	public static void waitToFinish(List<Thread> threadHolder) {
		while(true) {
			boolean allFinished = true;
			for(Thread thread : threadHolder) {
				allFinished = allFinished && !thread.isAlive();
			}
			
			if(allFinished) {
				break;
			}
		}
	}
	
	public static class TaskHandler implements Runnable {
		private Task task;
		
		public TaskHandler(Task task) {
			this.task = task;
		}

		@Override
		public void run() {
			task.start();
		}
	}
	
}

 

 

 

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * 策略2: 将所有的task分割成n个子task列表,然后开启n个线程,每个线程处理一个子列表
 * 优点:无等待
 * 缺陷: 分割不均,无法充分利用所有的线程
 */
public class Strategy2 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}

	public static void handleTasks(List<Task> tasks, int threadCount) {
		List<List<Task>> splitTasks = splitTasksToNThreads(tasks, threadCount);

		List<Thread> threadHolder = new LinkedList<Thread>();
		for (List<Task> segment : splitTasks) {
			Thread thread = new Thread(new TaskHandler(segment));
			threadHolder.add(thread);
			thread.start();
		}
		
		waitToFinish(threadHolder);
	}
	
	public static void waitToFinish(List<Thread> threadHolder) {
		while(true) {
			boolean allFinished = true;
			for(Thread thread : threadHolder) {
				allFinished = allFinished && !thread.isAlive();
			}
			
			if(allFinished) {
				break;
			}
		}
	}
	
	public static List<List<Task>> splitTasksToNThreads(List<Task> tasks, int threadCount) {
		List<List<Task>> splitTasks = new ArrayList<List<Task>>(threadCount);

		int taskCount = tasks.size();
		int taskPerThread = new BigDecimal(taskCount).divide(new BigDecimal(threadCount), RoundingMode.CEILING).intValue();

		for (int i = 0; i < taskCount; i += taskPerThread) {
			List<Task> segment = new LinkedList<Task>();
			for (int j = 0; j < taskPerThread && (i + j) < taskCount; j++) {
				segment.add(tasks.get(i + j));
			}

			splitTasks.add(segment);
		}

		tasks.clear();
		
		return splitTasks;
	}
	
	public static class TaskHandler implements Runnable {
		private List<Task> tasks;

		public TaskHandler(List<Task> tasks) {
			this.tasks = tasks;
		}
		
		@Override
		public void run() {
			for (Task task : tasks) {
				task.start();
			}
		}
	}

}

 

 

 

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 策略3: 使用ConcurrentLinkedQueue<Task>存储所有的Task,然后同时开启n个线程读取Queue.
 * 优点:充分利用所有线程,无等待
 * 缺点:需要将所有的task转移到Queue中,消耗一倍内存
 */
public class Strategy3 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}
	
	public static void handleTasks(List<Task> tasks, int threadCount) {
		Queue<Task> taskQueue = new ConcurrentLinkedQueue<Task>();
		taskQueue.addAll(tasks);
		
		List<Thread> threadHolder = new LinkedList<Thread>();
		for(int i = 0; i < threadCount; i ++) {
			Thread thread = new Thread(new TaskHandler(taskQueue));
			threadHolder.add(thread);
			thread.start();
		}
		
		waitToFinish(threadHolder);
	}
	
	public static void waitToFinish(List<Thread> threadHolder) {
		while(true) {
			boolean allFinished = true;
			for(Thread thread : threadHolder) {
				allFinished = allFinished && !thread.isAlive();
			}
			
			if(allFinished) {
				break;
			}
		}
	}
	
	public static class TaskHandler implements Runnable {
		
		private final Queue<Task> tasks;
		
		public TaskHandler(Queue<Task> tasks) {
			this.tasks = tasks;
		}
		
		public void run() {
			while(!tasks.isEmpty()) {
				Task task = tasks.poll();
				if(task != null) {
					task.start();
				}
			}
		}
		
	}
	
}

 

 

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 策略4: 使用java.util.concurrent包,线程池。
 * 优点:完美解决。
 */
public class Strategy4 {

	public static void main(String[] args) {
		List<Task> tasks = TaskProducer.produce(1000);
		handleTasks(tasks, 10);
		System.out.println("All finished");
	}
	
	public static void handleTasks(List<Task> tasks, int threadCount) {
		try {
			ExecutorService executor = Executors.newFixedThreadPool(threadCount);
			
			for(Task task : tasks) {
				executor.submit(new TaskHandler(task));
			}
			
			executor.shutdown();
			executor.awaitTermination(60, TimeUnit.SECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static class TaskHandler implements Runnable {

		private Task task;
		
		public TaskHandler(Task task) {
			this.task = task;
		}
		
		public void run() {
			task.start();
		}
		
	}
	
}
6
3
分享到:
评论
9 楼 ultime 2013-01-10  
[b][/b][i][/i][u][/u][/color][size=large][/size][color=green][align=center][/align]   正好用到,谢谢lz了。
8 楼 terrencexu 2010-09-10  
jy1245626 写道
我是这么想的,要处理1000条消息,这的动作应该是一个异步的动作,就是说开一个线程出来让他执行这个动作,至于他什么时候执行完,就不管了。既然不管了就不用waitToFinish(threadHolder);了,然后我就注释掉它,速度就是瞬间了


呃,不是的,这里的意思就是,需要等所有的任务都执行完了,才能继续进行后续的操作,比如说我需要开3个线程,把所有的服务都启动起来了,才能进行后续的测试。
7 楼 jy1245626 2010-09-09  
我是这么想的,要处理1000条消息,这的动作应该是一个异步的动作,就是说开一个线程出来让他执行这个动作,至于他什么时候执行完,就不管了。既然不管了就不用waitToFinish(threadHolder);了,然后我就注释掉它,速度就是瞬间了
6 楼 terrencexu 2010-09-08  
jy1245626 写道
呵呵,我没啥好方法,多线程我也不熟,只是运行了第一种方法后,看第二种方法,有点受不了它的速度

我昨天自己跑了一下这几个case,在10的整数倍的task数的时候,第二个的运行速度是最快的,执行速度从快到慢依次是:2 > 3 > 4 > 1,你能帮忙告诉我你的test case吗?我想试试看看什么地方需要改进改进,非常感谢 
5 楼 jy1245626 2010-09-08  
呵呵,我没啥好方法,多线程我也不熟,只是运行了第一种方法后,看第二种方法,有点受不了它的速度
4 楼 terrencexu 2010-09-07  
jy1245626 写道
第二种方法的效率巨慢,

有没有比较好的改进方法?
3 楼 jy1245626 2010-09-07  
第二种方法的效率巨慢,
2 楼 terrencexu 2010-09-06  
lgd_java2eye 写道
楼主  List<Task> tasks = TaskProducer.produce(11);  
这个是创建11个任务,而不是创建10个任务,是不是写错了啊!!!!!!!!!!!!!!!

应该创建1000个任务~~
1 楼 lgd_java2eye 2010-09-06  
楼主  List<Task> tasks = TaskProducer.produce(11);  
这个是创建11个任务,而不是创建10个任务,是不是写错了啊!!!!!!!!!!!!!!!

相关推荐

    线程同步小例子

    在这个“线程同步小例子”中,可能包含创建和管理线程、使用上述同步机制以及处理线程间的通信等方面的内容。通过分析源代码,我们可以看到如何在实际程序中应用这些同步机制,如何避免数据竞争,以及如何在多线程...

    简单实现多线程同步示例(模拟购票系统)

    本示例“简单实现多线程同步示例(模拟购票系统)”旨在通过一个具体的实例,帮助开发者理解如何在Java中创建并管理多线程以及如何实现线程同步,确保数据的一致性和正确性。 首先,我们要明确多线程的基本概念。在...

    delphi中线程同步问题

    在 Delphi 中,多线程编程常常涉及到线程同步,以确保多个线程安全地访问共享资源或执行特定操作。`Synchronize` 方法是 Delphi 中用于在主线程中安全执行代码的一种机制,尤其适用于 UI 更新。然而,在 DLL 或 ...

    iOS两个线程间嵌套发送同步消息的demo

    线程间的通信,特别是同步消息的发送,是多线程编程中的一个重要概念。本篇将深入探讨“iOS两个线程间嵌套发送同步消息”的相关知识点。 首先,我们要理解线程的概念。线程是操作系统分配CPU时间的基本单位,一个...

    多线程的批量线程同步解决方案

    "多线程的批量线程同步解决方案"这个标题暗示我们探讨的是如何在多线程环境下有效地管理和同步多个任务,确保数据一致性与程序正确性。下面将详细阐述相关知识点。 一、多线程基础 多线程是指在一个进程中同时执行...

    线程同步解决火车站售票问题

    总的来说,通过线程同步,我们可以确保在多线程环境下正确、有序地处理共享资源,避免数据不一致性和资源竞争。在这个具体的火车站售票问题中,我们可以通过合理的设计和编程技术,实现高效且无冲突的售票系统。

    Delphi多线程同步的例子

    本文将深入探讨Delphi中的多线程和线程同步,并以"SortThreads"和"delphi-thread-gui"这两个示例项目为例,讲解如何在实践中应用这些概念。 1. **多线程**:多线程允许应用程序同时执行多个独立的任务,提高程序的...

    11-线程同步

    在IT领域,线程同步是多线程编程中一个至关重要的概念,特别是在处理并发操作时。线程同步是为了确保在多线程环境下,共享资源的访问有序进行,防止数据不一致性和竞态条件的发生。本章节将深入探讨线程同步的基础...

    winform 线程同步源码

    在Windows Forms(Winform)应用程序中,线程同步是一个关键概念,它确保了多线程环境中的数据一致性、避免竞态条件以及防止UI线程被阻塞。在本资源包中,你可能会找到一系列用于理解和实践Winform线程同步的源代码...

    c#线程同步的典型例子

    C#线程同步是多线程编程中的一个重要概念,它涉及到如何控制多个线程对共享资源的访问,以避免数据不一致性和竞态条件。在C#中,线程同步通常用于确保在某一时刻只有一个线程可以访问特定的代码块或资源,从而保证...

    线程同步代码集实例

    线程同步是多线程编程中的一个重要概念,它主要用于解决多个线程并发访问共享资源时可能出现的竞争条件问题,确保数据的一致性和完整性。在Windows API中,提供了多种线程同步机制,如Event、Mutex、Semaphore和...

    IOS线程管理,线程同步

    本文将深入探讨iOS线程管理,特别是线程的创建和线程同步,这些都是开发者需要掌握的基本知识。 首先,我们来理解一下线程的概念。线程是程序执行的最小单位,一个进程可以有多个线程并行执行,这样可以充分利用...

    iOS线程同步方案

    互斥锁是一种最基础的线程同步机制,用于保护共享资源不被多个线程同时访问。在iOS中,我们可以使用`NSLock`类来实现互斥锁。当一个线程获取了锁之后,其他试图获取该锁的线程将会被阻塞,直到持有锁的线程释放锁。...

    操作系统实验多线程同步(含C++源代码)

    操作系统中的多线程同步是一个关键概念,特别是在并发编程中,它涉及到如何协调多个线程以避免数据不一致性和竞态条件。在这个实验中,我们关注的是C++编程语言中的实现,以及操作系统如何处理线程的优先级。 首先...

    MFC中的多线程同步

    在IT行业中,尤其是在Windows开发领域,多线程同步是一个至关重要的概念。MFC(Microsoft Foundation Classes)是微软提供的一种C++库,它为构建Windows应用程序提供了丰富的类和接口。在这个主题中,我们将深入探讨...

    MFC多线程同步类的使用

    【MFC多线程同步类的使用】 在MFC(Microsoft Foundation Classes)中,多线程编程是一项重要的技术,尤其在开发复杂的、并发执行的任务时。多线程允许程序同时执行多个任务,提升效率和响应速度。然而,线程间的...

    线程与消息处理

    在IT行业中,线程与消息处理是Android应用开发中的核心概念,它们对于实现高效、响应式的用户界面至关重要。本文将深入探讨这两个主题,以及它们如何在Android系统中协同工作。 线程是操作系统分配CPU时间的基本...

    某电信项目多线程同步数据实例

    在IT行业中,尤其是在大型系统开发中,多线程同步数据是一个关键的技术环节,尤其是在处理大量实时数据的场景,如电信项目。"某电信项目多线程同步数据实例"的标题揭示了一个具体的应用案例,它表明在该电信项目中,...

    Windows的消息处理与多线程编程

    2. **线程同步**:为了防止多个线程同时访问共享资源导致数据不一致,需要进行线程同步。Windows提供了多种同步机制,如互斥量(Mutex)、信号量(Semaphore)、事件对象(Event)以及临界区(Critical Section)。 ...

Global site tag (gtag.js) - Google Analytics