`
yanguz123
  • 浏览: 568469 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

\(^_^)/ Java多线程Master-Worker模式

    博客分类:
  • Code
 
阅读更多

Master

package master_worker;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
	// 任务队列
	protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
	// Worker进程队列
	protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
	// 子任务处理结果
	protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

	// 是否子任务都结束了
	public boolean isCompelete() {
		for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
			if (entry.getValue().getState() != Thread.State.TERMINATED) {
				return false;
			}
		}
		return true;
	}

	// Master构造,需要一个Worker进程逻辑,和需要Worker进程数量
	public Master(Worker worker, int countWorker) {
		worker.setWorkQueue(workQueue);
		worker.setResultMap(resultMap);
		for (int i = 0; i < countWorker; i++) {
			threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
		}
	}

	// 提交一个任务
	public void submit(Object job) {
		workQueue.add(job);
	}

	// 开始运行所有的Worker进程,进行处理
	public void execute() {
		for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
			entry.getValue().start();
		}
	}

	// 返回子任务结果集
	public Map<String, Thread> getThreadMap() {
		return threadMap;
	}

}

 

 

Worker

package master_worker;

import java.util.Map;
import java.util.Queue;

public abstract class Worker implements Runnable {
	// 任务队列,用于取得子任务
	protected Queue<Object> workQueue;
	// 子任务的处理结果
	protected Map<String, Object> resultMap;

	@Override
	public void run() {
		while (true) {
			// 获取子任务
			Object input = workQueue.poll();
			if (input == null)
				break;
			// 处理子任务
			Object result = handle(input);
			// 将处理结果放入结果集
			resultMap.put(Integer.toString(input.hashCode()), result);
		}
	}

	// 子任务处理逻辑,在子类中实现具体逻辑
	public abstract Object handle(Object input);

	public void setResultMap(Map<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	public void setWorkQueue(Queue<Object> workQueue) {
		this.workQueue = workQueue;
	}

}

 

 

PlusWorker

package master_worker;

public class PlusWorker extends Worker {

	public Object handle(Object input){
		Integer i = (Integer)input;
		return i*i*i;
	}
}

 

 

Test

package master_worker;

import java.util.Map;
import java.util.Set;

public class Test {
	public static void main(String[] args) {
		Master master = new Master(new PlusWorker(), 10);
		for (int i = 0; i < 1000; i++) {
			master.submit(i);
		}
		master.execute();
		int result = 0;
		Map<String, Object> resultMap = master.getResultMap();
		while (resultMap.size() > 0 || !master.isCompelete()) {
			Set<String> keys = resultMap.keySet();
			String key = null;
			for (String k : keys) {
				key = k;
				break;
			}
			Integer i = null;
			if (key != null) {
				i = (Integer) resultMap.get(key);
			}
			if (i != null) {
				result += i;
			}
			if (key != null) {
				resultMap.remove(key);
			}
		}
		System.out.println(result);
	}
}

 

分享到:
评论
2 楼 fiwrc 2017-06-18  
知道了. 谢谢~!
1 楼 fiwrc 2017-06-18  
求问  getResultMap  是什么方法???? ..  学习下..  谢谢

相关推荐

    java多线程.pdf

    Java多线程是一种允许同时执行多个线程的编程技术,它允许利用多核处理器的计算能力,提高应用程序的性能和响应能力。在介绍Java多线程的基本知识之前,首先提到的是Java多线程的实现方式。在Java中,创建线程主要有...

    java多线程源码-ExecuteFramework:java多线程框架核心源码重写

    本文将深入探讨“ExecuteFramework:java多线程框架核心源码重写”这一主题,通过分析和解释提供的ExecuteFramework-master源码,我们将理解如何构建一个自定义的多线程执行框架。 首先,我们需要了解Java中实现多...

    javaThreadTest

    总的来说,JavaThreadTest所涉及的知识点主要包括:Java多线程、线程池的概念与实现、Master Worker模式、任务提交与结果获取、线程间的通信以及分布式计算的基本原理。理解并掌握这些知识,对于开发高效、可扩展的...

    hera-master分布式任务调度源码 .zip

    - Java提供了丰富的并发工具类,如`ConcurrentHashMap`、`ReentrantLock`等,源码中会用到这些工具保证多线程环境下的数据安全和同步。 4. **网络通信**: - Hera-Master可能使用了基于Java NIO或Netty的网络库...

    Java并发编程实践.pdf

    * 并行模式:Amino将为应用程序提供一个或几个大家熟知的并行计算模式,如Master-Worker、Map-reduce、Divide and conquer、Pipeline等。 * 并行计算中的一般功能:Amino将为应用程序提供并行计算中常用的方法,如...

    sidekiq-master.zip_Java编程_Java_

    在Rails应用中,Sidekiq通过Redis作为后端存储,管理待处理任务的队列,并且使用多线程模型来并行执行任务。 Sidekiq的核心特点包括: 1. **高性能**:Sidekiq使用多线程模型,而不是Rails默认的单线程模型,从而...

    flink介绍及安装部署

    - **进程启动**:master 进程启动 Dispatcher 和 ResourceManager 线程,TaskManagers 注册至 ResourceManager。 - **高可用性**:Standalone 集群默认不具备高可用性。若需支持高可用,需进行额外配置。 - **...

    Frank_storm_java:使用java进行风暴测试

    这通常涉及设置配置参数,如worker进程数量、执行器线程数等。 6. **容错机制**:Storm具有强大的容错能力,如果某个任务失败,它会自动重新分配并重试,保证数据不丢失。 7. **本地模式**:在开发和测试阶段,...

    【Java面试系列】Nginx.pdf

    2. **创建Master和Worker进程**:Nginx启动后会产生一个Master进程和多个Worker进程。其中Master进程负责读取配置文件、维持Worker进程的运行状态;Worker进程则专门负责处理客户端请求。 3. **请求处理机制**:当...

    高性能java系统实现与调优

    - **多线程设计模式**: - **Future**:支持异步处理,通过`FutureTask`实现。 - **Master-Worker**:适用于可以分解的任务,提高并行处理效率。 - **Guarded Suspension**:在没有足够的资源时暂停执行,避免...

    chenfast-simple-tomcat-master.zip

    在BIO模式下,每个连接都需要一个单独的线程来处理,当并发连接数量较大时,系统会创建大量线程,消耗过多资源。在手写Tomcat的BIO部分,我们需要实现以下关键组件: 1. **Acceptor线程**:负责监听客户端的连接...

    gdk_workerclient

    3. **多线程**:Java中的`java.lang.Thread`是处理多线程的基础,而`java.util.concurrent`包提供了高级并发工具,如`Callable`、`Future`、`Executor`和`BlockingQueue`,这些可能在`gdk_workerclient`中被用来实现...

    Apache Spark源码走读之2 -- Job的提交与运行

    在单机模式下,可以通过命令行启动master和worker,并使用spark-shell来执行任务。在local模式下,Spark会直接在本地机器上运行,而在localcluster模式下,会模拟一个集群环境。 在Spark中,有一个核心概念叫做...

    JavaOmok:JavaStudy1-Omok

    例如,在多线程编程中,我们可以定义一个接口来处理线程结束的回调: ```java interface Callback { void onThreadFinished(); } class WorkerThread extends Thread { private Callback callback; public ...

    Java爬虫框架.pdf

    - **Scheduler**:启动爬虫并初始化TaskQueue,创建监控线程,当TaskQueue为空且所有Worker线程空闲时,程序在10分钟无变化后退出。 - **Task Master**:管理TaskQueue,可采用内存或集中式任务队列如SQLite或...

    Nginx面试专题.pdf

    Nginx采用Master-Worker多进程模型运行,其中Master进程负责读取和评估配置文件,并维持Worker进程的生命周期。Worker进程则实际处理请求。这种架构可以提高服务器的稳定性和可靠性,因为当一个Worker进程因处理请求...

Global site tag (gtag.js) - Google Analytics