`
378629846
  • 浏览: 215029 次
  • 性别: Icon_minigender_1
  • 来自: 哈尔滨
社区版块
存档分类
最新评论

并行计算框架的Java实现--系列一

    博客分类:
  • java
阅读更多

最近的工作需要统计一些复杂的报表,为了提高效率,想用多线程去实现,但要在所有线程完成统计任务后,将结果汇总。所以在思考有没有什么办法解决,之所以是“系列一”是因为我想记录下我的思考过程。

1、首先设计一个Executer,负责任务的执行和汇总:

public class Executer {
	//计算已经派发的任务数(条件谓词)
	public static int THREAD_COUNT = 0;
	//线程池
	private Executor pool = null;
	public Executer() {
		this(1);
	}
	public Executer(int threadPoolSize) {
		pool = Executors.newFixedThreadPool(threadPoolSize);
	}
	/**
	 * 任务派发
	 * @param job
	 */
	public void fork(Job job){
		//将任务派发给线程池去执行
		pool.execute(job);
		THREAD_COUNT++;
	}
	/**
	 * 统计任务结果
	 */
	public void join(){
		while(THREAD_COUNT > 0){
			System.out.println("threadCount: "+THREAD_COUNT);
			try {
				wait();//如果任务没有全部完成,则挂起
			} catch (Exception e) {}//这里总是抛异常,不知道为什么,好吧!先不管它
		}
	}
}

 2、写一个抽象的Job类,负责执行具体的任务

public abstract class Job implements Runnable {

	@Override
	public void run() {
		this.execute();//执行子类具体任务
		Executer.THREAD_COUNT--;
		try{
			notifyAll();//这里总是抛异常,不知道为什么,好吧!先不管它
		}catch(Exception e){}
	}
	/**
	 * 业务处理函数
	 */
	public abstract void execute();

}

 

3、测试,先来一个具体的任务实现。

public class MyJob extends Job {

	@Override
	public void execute() {
		//模拟业务需要处理1秒.
		try {Thread.sleep(1000);} catch (InterruptedException e) {}
		System.out.println("running thread id = "+Thread.currentThread().getId());
	}

}

 

4、测试。

public class Test {
	public static void main(String[] args) {
		//初始化任务池
		Executer exe = new Executer(5);
		//初始化任务
		long time = System.currentTimeMillis();
		for (int i = 0; i < 10; i++) {
			MyJob job = new MyJob();
			exe.fork(job);//派发任务
		}
		//汇总任务结果
		exe.join();
		System.out.println("time: "+(System.currentTimeMillis() - time));
	}

}

 

 5、好吧,看一下结果

 

threadCount: 10
......(表示有N多个)
threadCount: 10
running thread id = 8
running thread id = 9
running thread id = 11
running thread id = 10
running thread id = 12
threadCount: 5
......(表示有N多个)
threadCount: 5
running thread id = 9
running thread id = 10
running thread id = 12
running thread id = 8
running thread id = 11
threadCount: 3
time: 2032

 哈哈,看来是可以了,最后汇总任务的处理时间是2032毫秒,看来是比单个任务顺序执行来的快。但是有几个问题:

1)如果没有catch那个超级Exception的话,就会抛下面的异常:

java.lang.IllegalMonitorStateException
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at com.one.Executer.join(Executer.java:38)
	at com.test.Test.main(Test.java:21)

 

2)为啥会打印N多个同样值threadCount呢?

于是和同事(河东)沟通,他说wait要放在synchronized里面才行,好吧,试一下,改进一下Executer和Job

 

public class Executer {
	//计算已经派发的任务数(条件谓词)
	public static int THREAD_COUNT = 0;
	//条件队列锁
	public static final Object LOCK = new Object();
	//线程池
	private Executor pool = null;
	public Executer() {
		this(1);
	}
	public Executer(int threadPoolSize) {
		pool = Executors.newFixedThreadPool(threadPoolSize);
	}
	/**
	 * 任务派发
	 * @param job
	 */
	public void fork(Job job){
		//将任务派发给线程池去执行
		pool.execute(job);
		//增加线程数
		synchronized (LOCK) {
			THREAD_COUNT++;
		}
	}
	/**
	 * 统计任务结果
	 */
	public void join(){
		synchronized (LOCK) {
			while(THREAD_COUNT > 0){
				System.out.println("threadCount: "+THREAD_COUNT);
				try {
					LOCK.wait();//如果任务没有全部完成,则挂起
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

 

public abstract class Job implements Runnable {

	@Override
	public void run() {
		this.execute();//执行子类具体任务
		synchronized (Executer.LOCK) {
			//处理完业务后,任务结束,递减线程数,同时唤醒主线程
			Executer.THREAD_COUNT--;
			Executer.LOCK.notifyAll();
		}
	}
	/**
	 * 业务处理函数
	 */
	public abstract void execute();

}

 6、测试一下:

threadCount: 10
running thread id = 8
running thread id = 11
running thread id = 9
threadCount: 7
running thread id = 10
threadCount: 6
running thread id = 12
threadCount: 5
running thread id = 11
running thread id = 12
running thread id = 10
threadCount: 2
running thread id = 9
running thread id = 8
threadCount: 1
time: 2016

 还真的行,谢谢河东哈!

但是原因是什么呢?回去查了查书《Java并发编程实践》,见附件!

第14.2.1节这样说:

在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。

...

由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。

14.2.4节:

由于在调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程应该尽快地释放,从而确保正在等待的线程尽可能尽快的解除阻塞。

 

看来之前是不会用wait和notify,哈哈~!

 

感谢河东,和你交流收获很大!

 

顺便测试一下java多线程情况下,多核CPU的利用率,修改上面的线程池大小和任务数(2个线程处理1000000个任务,去掉MyJob的sleep(这样可以多抢些CPU时间),结果如下:

 

看来window下是可以利用多核的,虽然是一个JVM进程。之前和斯亮讨论的结论是错误的。

分享到:
评论
3 楼 378629846 2014-04-18  
xinke0802 写道
将多线程的结果写入同一个文件的时候,遇到这样的问题~
java.io.IOException: Stream closed;
不知道楼主遇到没有啊~~

多线程写入同一个文件,要加锁的。可以贴代码上来看看么?
2 楼 xinke0802 2014-03-21  
将多线程的结果写入同一个文件的时候,遇到这样的问题~
java.io.IOException: Stream closed;
不知道楼主遇到没有啊~~
1 楼 Ramls 2013-02-28  
为什么执行完毕后,程序还是挂起的呢?

相关推荐

    并行计算框架的Java实现--系列二

    并行计算框架的Java实现是现代软件开发中的一个重要领域,特别是在大数据处理、机器学习和高性能计算等场景下。本系列的第二部分将深入探讨如何利用Java语言构建并行计算框架,以提高程序的运行效率。在本文中,我们...

    java8并行计算示例--可动态配置--简化框架结构--并行计算框架从五个类简化到两个类

    这个案例展示了如何将一个原本复杂的并行计算框架精简为两个关键类,从而实现更简洁、高效的编程模型。 首先,我们要理解并行计算的基本概念。并行计算是指同时使用多个处理器或计算机来处理任务,以减少总体完成...

    JAVAOpenMP并行计算框架

    OpenMP通常与C++、Fortran等语言关联,用于实现多线程并行计算,但Java作为广泛使用的跨平台语言,也有其并行计算的需求。然而,Java本身并没有直接支持OpenMP的标准库,因为Java的并发模型主要基于JVM(Java虚拟机...

    java8并行计算示例

    Java 8 是一个重要的 Java 发行版本,引入了许多新特性,其中并行计算是其一大亮点。本示例通过利用Java 8的并行流(Parallel Stream)特性,显著提升了计算效率,具体体现在对1到400亿数列求和的问题上。传统顺序...

    JAVA并行计算的一些资料 论文

    综上所述,Java并行计算涵盖了从基础的多线程到高级的分布式计算框架,开发者可以通过这些工具和概念,设计和实现高效的并行程序,处理大规模的数据和计算任务。这些论文和资料应该会深入探讨这些主题,并可能包含...

    一种对象化并行计算框架

    在大数据环境下,针对数据型统计分析系统性能劣化明显、不能满足用户使用需求的问题,本文提出了一种轻量级高性能对象化并行计算架构,研制了该架构的对象服务组件、对象管理服务组件和客户端代理组件,并将该架构和...

    基于Web的Java并行计算

    JET(Java Environment for Tasks)平台是一个旨在简化基于Web的并行计算过程的软件框架。它通过以下方式实现了高效的并行计算: - **Java Applet支持**:利用Java Applet作为客户端界面,用户可以通过简单的Web...

    OnJava8-Examples-3.0_soucecode_java_

    流可以进行过滤、映射、归约等一系列操作,支持串行和并行计算。例如,`list.stream().filter(x -&gt; x &gt; 10).collect(Collectors.toList())`将筛选出列表中大于10的元素。 4. **默认方法**:接口在Java 8中引入了...

    JAVA并发编程深度学习-无锁并行计算框架1

    在Java并发编程中,无锁并行计算框架如Disruptor提供了一种高效且低延迟的方式来处理高并发场景。无锁技术避免了线程之间的竞争条件,从而提升了多线程环境下的性能。本节我们将深入探讨Disruptor框架以及与...

    无锁并行框架Amino -- Concurrent Building Blocks

    在实际应用中,Amino通常与高级并行编程模型如C++的std::thread库、Java的Fork/Join框架或Actor模型结合使用,以实现更复杂的工作负载调度和任务分解。开发者可以通过阅读Amino的文档、示例代码和相关教程,学习如何...

    分布式与并行计算—Java实现并向算法.ZIP

    算法中的并行使用java的Fork / Join框架实现,他会将进程使用ForkJoinPool进行管理,并自动分配到空闲的CPU核心上来运算。由于个人PC的CPU核心数量较少,所以预期至多能产生常数倍的加速 效果。 本次实验使用的实验...

    java实现k-means算法

    Java 实现 K-Means 算法是一个在数据挖掘领域常见的任务,它主要用于聚类分析,即将数据分组成不同的类别或簇。K-Means 是一种迭代算法,旨在找到数据点的最佳分配,使得每个簇内的数据点尽可能相似,而不同簇之间的...

    [并行计算——结构·算法·编程]

    并行计算是现代计算机科学中的一个关键领域,特别是在大数据处理和高性能计算中起着至关重要的作用。《并行计算——结构·算法·编程》一书由陈国良教授撰写,旨在深入探讨并行计算的基本原理、算法设计以及编程实践...

    MPJ并行编程框架的实现及安装配置.pdf

    MPJ作为一种将MPI引入Java的并行编程框架,极大地丰富了Java在并行计算领域的应用。通过上述介绍,我们可以看到MPJ不仅在设计上充分考虑了并行计算的特点,还在实现机制和技术特征上做出了诸多创新。对于希望从事...

    pi.rar_PI_java 并行计算_并行计算 pi

    在实际应用中,除了Java,还有其他并行计算框架,如Hadoop和Spark,它们适用于大数据处理和分布式计算。对于更复杂的并行算法,如MapReduce,可以处理更大规模的数据并行计算。 总的来说,这个实验为学习者提供了一...

    并行计算的介绍

    #### 一、并行计算概览 并行计算是一种将计算任务分割成多个部分,并在多个处理器上同时执行的方法。这种方法可以显著提高处理大规模数据集或复杂问题的速度。并行计算是现代高性能计算的基础,广泛应用于科学计算...

    并行计算--并发构造纵览

    并行计算是现代计算机科学中的一个重要领域,它涉及到在多个处理器、核心或计算机之间同时处理任务,以提高计算效率和性能。并发构造是实现并行计算的关键技术,它允许程序的不同部分在同一时间运行,从而充分利用...

    K-Means算法java实现

    对于大数据集,可以考虑使用多线程或分布式计算框架(如Apache Spark)来并行化K-Means算法,以提升性能。 8. **错误处理和调试**: Java实现应包含适当的异常处理,以防止数据读取错误、无效输入等情况导致程序...

    java72-java-advance.zip

    8. **并发改进**:Java 7对并发API进行了一些优化,如Fork/Join框架,用于实现高效的并行计算。此外,`ConcurrentHashMap`的性能也有所提升。 9. **改进的数组初始化**:Java 7允许在数组初始化时使用紧凑的语法,...

    java-1.7.0-openjdk

    - **多线程并发工具的增强**:包括Fork/Join框架,用于并行计算,以及新的并发集合类。 - **类型推断**:在泛型中引入类型推断,简化代码编写。 - **字符串内联**:对字符串操作进行优化,提高性能。 - **动态...

Global site tag (gtag.js) - Google Analytics