接上篇并行计算框架的Java实现--系列一 。
增加对结果的处理:
1、修改Job,实现Callable接口
public abstract class Job implements Callable<Object> {
@Override
public Object call() throws Exception {
Object result = this.execute();//执行子类具体任务
synchronized (Executer.LOCK) {
//处理完业务后,任务结束,递减线程数,同时唤醒主线程
Executer.THREAD_COUNT--;
Executer.LOCK.notifyAll();
}
return result;
}
/**
* 业务处理函数
*/
public abstract Object execute();
}
2、修改Executer,增加对结果的处理
public class Executer {
//计算已经派发的任务数(条件谓词)
public static int THREAD_COUNT = 0;
//存储任务的执行结果
private List<Future<Object>> futres = new ArrayList<Future<Object>>();
//条件队列锁
public static final Object LOCK = new Object();
//线程池
private ExecutorService pool = null;
public Executer() {
this(1);
}
public Executer(int threadPoolSize) {
pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务派发
* @param job
*/
public void fork(Job job){
//将任务派发给线程池去执行
futres.add(pool.submit(job));
//增加线程数
synchronized (LOCK) {
THREAD_COUNT++;
}
}
/**
* 统计任务结果
*/
public List<Object> join(){
synchronized (LOCK) {
while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成
System.out.println("threadCount: "+THREAD_COUNT);
try {
LOCK.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
List<Object> list = new ArrayList<Object>();
//取出每个任务的处理结果,汇总后返回
for (Future<Object> future : futres) {
try {
Object result = future.get();//因为任务都已经完成,这里直接get
list.add(result);
} catch (Exception e) {
e.printStackTrace();
}
}
return list;
}
}
3、测试:
public static void main(String[] args) {
//初始化任务池
Executer exe = new Executer(5);
//初始化任务
long time = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
MyJob job = new MyJob();
exe.fork(job);//派发任务
}
//汇总任务结果
List<Object> list = exe.join();
System.out.println("Result: "+list);
System.out.println("time: "+(System.currentTimeMillis() - time));
}
4、执行结果:
threadCount: 10
running thread id = 9
running thread id = 11
running thread id = 8
running thread id = 10
running thread id = 12
threadCount: 5
running thread id = 9
running thread id = 8
running thread id = 11
running thread id = 12
running thread id = 10
Result: [8, 9, 10, 11, 12, 8, 11, 12, 9, 10]
time: 2000
5、附件是完整代码
分享到:
相关推荐
这个案例展示了如何将一个原本复杂的并行计算框架精简为两个关键类,从而实现更简洁、高效的编程模型。 首先,我们要理解并行计算的基本概念。并行计算是指同时使用多个处理器或计算机来处理任务,以减少总体完成...
2. **Java的并发模型**:Java提供了丰富的并发工具,如`Thread`类、`ExecutorService`、`Future`、`Callable`、`CyclicBarrier`、`Semaphore`等,以及并发集合如`ConcurrentHashMap`、`CopyOnWriteArrayList`等,...
在压缩包文件`parallelTest`中,可能包含了实现上述并行计算的Java代码示例,可以作为学习和实践Java 8并行计算的参考。通过分析和理解这些代码,可以进一步深入理解并行计算的工作机制和优化技巧。总的来说,Java 8...
综上所述,Java并行计算涵盖了从基础的多线程到高级的分布式计算框架,开发者可以通过这些工具和概念,设计和实现高效的并行程序,处理大规模的数据和计算任务。这些论文和资料应该会深入探讨这些主题,并可能包含...
JET(Java Environment for Tasks)平台是一个旨在简化基于Web的并行计算过程的软件框架。它通过以下方式实现了高效的并行计算: - **Java Applet支持**:利用Java Applet作为客户端界面,用户可以通过简单的Web...
在大数据环境下,针对数据型统计分析系统性能劣化明显、不能满足用户使用需求的问题,本文提出了一种轻量级高性能对象化并行计算架构,研制了该架构的对象服务组件、对象管理服务组件和客户端代理组件,并将该架构和...
流可以进行过滤、映射、归约等一系列操作,支持串行和并行计算。例如,`list.stream().filter(x -> x > 10).collect(Collectors.toList())`将筛选出列表中大于10的元素。 4. **默认方法**:接口在Java 8中引入了...
在实际应用中,Amino通常与高级并行编程模型如C++的std::thread库、Java的Fork/Join框架或Actor模型结合使用,以实现更复杂的工作负载调度和任务分解。开发者可以通过阅读Amino的文档、示例代码和相关教程,学习如何...
算法中的并行使用java的Fork / Join框架实现,他会将进程使用ForkJoinPool进行管理,并自动分配到空闲的CPU核心上来运算。由于个人PC的CPU核心数量较少,所以预期至多能产生常数倍的加速 效果。 本次实验使用的实验...
在处理大规模数据时,还可以考虑使用分布式计算框架如 Apache Spark 来提升效率。 综上所述,通过 Java 和相关库,我们可以有效地在 MySQL 数据库中应用 K-Means 算法,进行数据聚类,并将结果存入新表,从而实现对...
2. **并行计算结构**: - 处理器阵列:一种并行计算硬件结构,包含多个简单的处理单元,通过共享内存进行通信。 - 多核处理器:单个芯片上的多个独立处理器核心,可在同一系统上实现并行。 - 分布式系统:由多个...
在Java并发编程中,无锁并行计算框架如Disruptor提供了一种高效且低延迟的方式来处理高并发场景。无锁技术避免了线程之间的竞争条件,从而提升了多线程环境下的性能。本节我们将深入探讨Disruptor框架以及与...
MPJ作为一种将MPI引入Java的并行编程框架,极大地丰富了Java在并行计算领域的应用。通过上述介绍,我们可以看到MPJ不仅在设计上充分考虑了并行计算的特点,还在实现机制和技术特征上做出了诸多创新。对于希望从事...
#### 二、并行计算的关键概念 - **并发**:指的是多个任务可以在同一时间段内运行,但不一定是同时执行。 - **并行**:指的是一段时间内多个任务真正地同时执行。 - **分布式计算**:将任务分散到网络中的多台...
在实际应用中,除了Java,还有其他并行计算框架,如Hadoop和Spark,它们适用于大数据处理和分布式计算。对于更复杂的并行算法,如MapReduce,可以处理更大规模的数据并行计算。 总的来说,这个实验为学习者提供了一...
7. **MapReduce**:Google提出的分布式计算框架,用于大规模数据集的并行处理,由Hadoop等开源项目实现。 8. **Actor模型**:每个Actor是独立的计算实体,通过异步消息传递进行通信,确保并发安全,如Erlang和Akka...
对于大数据集,可以考虑使用多线程或分布式计算框架(如Apache Spark)来并行化K-Means算法,以提升性能。 8. **错误处理和调试**: Java实现应包含适当的异常处理,以防止数据读取错误、无效输入等情况导致程序...
8. **并发改进**:Java 7对并发API进行了一些优化,如Fork/Join框架,用于实现高效的并行计算。此外,`ConcurrentHashMap`的性能也有所提升。 9. **改进的数组初始化**:Java 7允许在数组初始化时使用紧凑的语法,...
- **并行化**:为了提高效率,可以利用多线程或分布式计算框架,如Java并发库或Apache Spark进行并行处理。 - **异常处理**:处理可能的数据清洗问题,如缺失值或异常值。 在实际应用中,K-Means算法有其局限性,如...