`

Master/Worker 模式

 
阅读更多
package test;

import java.util.Random;

public class Main {

	public static void main(String[] args) {
		System.out.println("我的机器可用Processor数量:" + Runtime.getRuntime().availableProcessors());
		Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
		Random r = new Random();
		for(int i = 1; i<= 100; i++){
			Task t = new Task();
			t.setId(i);
			t.setName("任务"+i);
			t.setPrice(r.nextInt(1000));
			master.submit(t);
		}
		master.execute();
		long start = System.currentTimeMillis();
		while(true){
			if(master.isComplete()){
				long end = System.currentTimeMillis() - start;
				int ret = master.getResult();
				System.out.println("最终结果:" + ret + ", 执行耗时:" + end);
				break;
			}
		}
		
	}
}

 

package test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

	//1 应该有一个承装任务的集合
	private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
	
	//2 使用HashMap去承装所有的worker对象
	private HashMap<String, Thread> workers = new HashMap<String, Thread>();
	
	//3 使用一个容器承装每一个worker并非执行任务的结果集
	private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
	
	//4 构造方法
	public Master(Worker worker, int workerCount){
		// 每一个worker对象都需要有Master的引用 workQueue用于任务的领取,resultMap用于任务的提交
		worker.setWorkerQueue(this.workQueue);
		worker.setResultMap(this.resultMap);
		for(int i = 0 ; i < workerCount; i++){
			//key表示每一个worker的名字, value表示线程执行对象
			workers.put("子节点" + Integer.toString(i), new Thread(worker));
		}
	}
	
	//5 提交方法
	public void submit(Task task){
		this.workQueue.add(task);
	}
	
	//6 需要有一个执行的方法(启动应用程序 让所有的worker工作)
	public void execute(){
		for(Map.Entry<String, Thread> me : workers.entrySet()){
			me.getValue().start();
		}
	}

	//8 判断线程是否执行完毕
	public boolean isComplete() {
		for(Map.Entry<String, Thread> me : workers.entrySet()){
			if(me.getValue().getState() != Thread.State.TERMINATED){
				return false;
			}
		}		
		return true;
	}

	//9 返回结果集数据
	public int getResult() {
		int ret = 0;
		for(Map.Entry<String, Object> me : resultMap.entrySet()){
			//汇总的逻辑..
			ret += Integer.parseInt((String)me.getValue());
		}
		return ret;
	}
}

 

package test;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {

	private ConcurrentLinkedQueue<Task> workQueue;
	private ConcurrentHashMap<String, Object> resultMap;
	
	public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
		this.workQueue = workQueue;
	}

	public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
		this.resultMap = resultMap;
	}
	
	
	@Override
	public void run() {
		while(true){
			Task input = this.workQueue.poll();
			if(input == null) break;
			//真正的去做业务处理
			Object output = MyWorker.handle(input);
			this.resultMap.put(Integer.toString(input.getId()), output);
		}
	}
	
	public static Object handle(Task input) {
		return null;
	}


}

 

package test;

public class Task {
	private int id ;
	private String name;
	private int price;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getPrice() {
		return price;
	}
	public void setPrice(int price) {
		this.price = price;
	}
	
	
}

 

package test;

public class MyWorker extends Worker {
	
	public static Object handle(Task input) {
		Object output = null;
		try {
			//表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库...
			Thread.sleep(500);
			output = input.getPrice();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return output;
	}
}

 

分享到:
评论

相关推荐

    结合 Etcd 与 MongoDB 实现一个基于 Master-Worker 分布式 架构的 crontab 系统.zip

    在构建分布式系统时,Master-Worker架构是一种常见的设计模式,特别是在需要执行定时任务的场景下。本项目结合Etcd和MongoDB实现了一个基于这种架构的crontab系统,旨在提供可扩展、高可用的定时任务调度解决方案。...

    php实现 master-worker 守护多进程模式的实例代码

    这是master-worker模式自我恢复能力的关键部分。 4. **进程管理**: - 通过`ps -aux`命令可以看到运行的进程状态。在示例中,master_process和多个worker_process清晰可见。 - 当手动杀死一个worker进程(如kill ...

    Spark2.3.0-Hadoop2.7.4集群部署

    - **Master/Worker模式**: Spark集群通常采用Master/Worker架构,其中Master节点负责任务调度,Worker节点执行具体计算任务。 - **部署方式**: 可选择Standalone模式或者YARN模式。本文重点介绍YARN模式下的部署。 ...

    spark入门教程

    1. **Spark架构和核心组件**:了解Spark的运行架构以及Master/Worker模式。 2. **RDD(弹性分布式数据集)**:掌握RDD的基本操作,如转换(Transformation)和行动(Action)。 3. **Spark SQL**:介绍如何使用Spark...

    Spark2.x+Python大数据机器学习实战视频课程

    - **Master/Worker模式**:在Spark集群中,有一个主节点(Master)负责调度任务并监控集群状态,多个工作节点(Worker)执行具体的计算任务。 - **容错机制**:通过数据冗余存储以及任务重试等方式确保分布式环境...

    master_worker_pattern:主工人模式演示

    Master-Worker是一个很好用的设计模式, 它可以把一个大工作分成很多小工作去执行, 等所有小工作都回来以后, 将所有的小工作结果做一个统整, 得到最终结果, 就很像专题小组有五个人, 把系统分成四等分, 每个人负责一...

    深港澳金融科技师考试.pdf

    - **Hadoop MapReduce架构**:通过Master/Worker模式实现大规模数据并行处理。 - **应用场景**:数据清洗、统计分析等。 **4.2 Spark** - **Apache Spark生态系统**:包含多个项目,如Spark SQL、MLlib等。 - **...

    基于MPI的分布式数据处理系统.pdf

    2. Master/Worker架构的应用:在分布式系统中,Master/Worker是常见的并行计算架构,用于管理计算作业的生命周期。Master节点负责将计算任务拆分、分配计算资源、以及监控任务执行等管理工作,而Worker节点则负责...

    GatewayWorker扩展压缩包

    GatewayWorker的工作模式有四种:'worker'(普通工作进程)、'gateway'(网关进程)、'business'(业务逻辑进程)和'register'(注册服务器)。网关进程负责与浏览器建立长连接,业务进程则处理业务逻辑,而注册...

    javaThreadTest

    Master Worker模式的核心思想是将一个大任务分解为许多小任务,由一个主进程(Master)分配给多个工作进程(Worker)。Master负责任务的调度、监控和协调,而Worker则专注于执行具体的任务。在Java中,我们可以使用...

    spark1.2.1常用模式部署运行

    ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://Master.hadoop:7077 ``` 可以通过 Web 界面 http://masterIP:8080 查看 Master 和 Worker 节点的状态。 Spark Component Spark 组件包括 ...

    Spark-master

    `org.apache.spark.deploy.master.Master`是Spark Master的主入口点,负责启动和监听网络接口,处理来自Worker和Driver的连接请求。此外,`org.apache.spark.rpc.RpcEnv`是Spark的远程过程调用(RPC)框架,用于...

    前端开源库-worker-client-server

    在前端开发领域,为了应对日益复杂的业务需求和性能挑战,开发者们不断探索新的技术与架构模式。"前端开源库-worker-client-server" 提供了一个这样的解决方案,它涉及到的主要知识点包括Web Worker、Client(客户端...

    基于通用计算的GPU-CPU协作计算模式研究.pdf

    【主从模型(Master/Worker)】是描述GPU-CPU协作计算的一种常见模型。在这个模型中,CPU作为“主”(Master),负责整体任务的管理和调度,而GPU作为“从”(Worker),执行由CPU分配的并行计算任务。这种模型有效...

    Laravel开发-elegant-laravel-worker

    8. **代码结构与设计模式**:"elegant-laravel-worker"可能采用了设计模式如观察者模式(Observer Pattern)来实现信号监听,以及策略模式(Strategy Pattern)来处理不同类型的停止逻辑。 9. **配置与部署**:部署...

    thinkphp5-Gatewayworker-web.zip

    2. **代码部署**:将“thinkphp5-Gatewayworker-web--master”文件夹上传至服务器,并根据项目需求配置相应的环境变量和配置文件。 3. **数据库配置**:创建数据库并导入项目的数据库脚本,根据实际设置调整数据库...

    基于Akka模拟实现Spark Standalone.pdf

    4. **心跳机制**:为了确保Worker节点的活性,Master和Worker之间需定期发送心跳消息。 5. **资源分配**:Master根据任务需求和Worker可用资源进行任务调度。 6. **故障恢复**:模拟实现中还需要考虑如何在Worker...

Global site tag (gtag.js) - Google Analytics