`
不平凡的人
  • 浏览: 35244 次
  • 性别: Icon_minigender_1
  • 来自: 嘉峪关
社区版块
存档分类
最新评论

Master-worker模式

 
阅读更多

1、Master-worker模式:

      常用的并行计算模式。

      核心思想为:系统中有两个进程协同工作:Master进程,worker进程。

      Master:负责接收与分配任务,当worker(线程任务)执行完毕后,将返回的结果告知Master,Master对接收的任务进行汇总处理。

      Worker:为一个线程任务对象,处理Master分配的任务,并将任务保存,用于Master后续的处理。

 

2、流程图:



 

3、Master,worker实现说明

(1)Master中使用到的容器

  ConcurrentLinkedQueue:用户保存用户提交的任务,该容器通过无锁的方式实现高并发情况下高性能;

  HashMap:Master指定的线程对象放入该容器中,因为由Master初始化时自己决定线程个数,不存在线程安全问题;

   ConcurrentHashMap:存放worker线程任务执行完毕后的结果集,会存在多个线程共享数据问题,因此选择高并发情况下的高性能的容器;

 

(2)Worker:

 ①实现Runnable接口,其实质为一个线程任务对象;

②持有Master中存放任务队列(ConcurrentLinkedQueue)的引用;

③持有Master中,线程执行完毕存放任务数据的Map(ConcurrentHashMap)的引用;

 

4、代码示例

(1)Master类

package net.oschina.tkj.mulitcoding.mastrerworkmodel;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Master-work模式
 * 
 * 主类的功能:1,接收任务,2,将任务分配给worker 3,当前worker任务执行完成进行结果汇总处理
 * 
 * @author Freedom
 * 
 */
public class Master {

	// 用来存放任务的队列
	// ConcurrentLinkedList 并发时,队列的性能高
	private final ConcurrentLinkedQueue<Task> jobQueue = new ConcurrentLinkedQueue<>();

	// 存放worker的map
	// 此处,选择HashMap是因为wokers的数量是有Master决定,在Master初始化时已经设定好了worker数量,不会产生线程安全问题
	private final Map<String, Thread> workers = new HashMap<String, Thread>();

	// 存放任务的的map,会存在并发问题 ,因此要使用并发安全的map
	private final ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

	/**
	 * 初始化Master指定worker
	 * 
	 * @param w
	 *            worker
	 * @param jobCount
	 *            任务数量
	 */
	public Master(Worker w, int jobCount) {
		// worker 设置保存任务的queue引用以及任务执行完成的结果集map引用
		w.setQueue(jobQueue);
		w.setResultMap(resultMap);
		// 分配任务的线程
		for (int i = 1; i <= jobCount; i++) {
			workers.put(String.valueOf(i), new Thread(w));
		}
	}

	// 接受任务,Master中每次来一个任务就保存起来
	public void submit(Task t) {
		jobQueue.add(t);
	}

	// 分配,执行任务
	public void executer() {
		for (Map.Entry<String, Thread> map : workers.entrySet()) {
			map.getValue().start();
		}
	}

	// 线程执行完成
	public boolean isComplete() {

		for (Map.Entry<String, Thread> map : workers.entrySet()) {
			if (map.getValue().getState() != Thread.State.TERMINATED) {
				return false;
			}
		}

		return true;
	}

	// Master汇总,每个worker线程传来的结果数据
	public int getResutlts() {
		int price = 0;
		for (Map.Entry<String, Object> map : resultMap.entrySet()) {
			price += (int) map.getValue();
		}
		return price;
	}

}

 

(2)worker类

package net.oschina.tkj.mulitcoding.mastrerworkmodel;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * 线程任务
 * 
 * 1.持有Master中装任务的ConcurrentLinkedQueue的引用
 * 
 * 2.持有Master中返回结果的ConcurrentHashMap的引用
 * 
 * @author Freedom
 * 
 */
public class Worker implements Runnable {

	private ConcurrentLinkedQueue<Task> queue;

	private ConcurrentHashMap<String, Object> resultMap;

	public ConcurrentLinkedQueue<Task> getQueue() {
		return queue;
	}

	public void setQueue(ConcurrentLinkedQueue<Task> queue) {
		this.queue = queue;
	}

	public ConcurrentHashMap<String, Object> getResultMap() {
		return resultMap;
	}

	public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	@Override
	public void run() {

		Task t = null;
		Object o = null;
		while (true) {
			if (null == queue.poll()) {
				break;
			}
			t = queue.poll();
			// 处理任务的业务逻辑,很耗时
			o = handler(t);
			// worker处理好的任务放回到结果map中
			resultMap.put(String.valueOf(t.getId()), o);
		}

	}

	/*
	 * worker处理每一个任务的业务逻辑
	 */
	private Object handler(Task t) {

		Object o = t.getPrice();
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return o;
	}

}

(3)数据实体类

package net.oschina.tkj.mulitcoding.mastrerworkmodel;

public class Task {

	private int id;
	private int price;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public int getPrice() {
		return price;
	}

	public void setPrice(int price) {
		this.price = price;
	}

}

  

(4)Main类

package net.oschina.tkj.mulitcoding.mastrerworkmodel;

import java.util.Random;

public class Main {

	public static void main(String[] args) {

		Master master = new Master(new Worker(), 100);
		Random r = new Random();
		Task t = null;
		for (int i = 1; i <= 100; i++) {
			t = new Task();
			t.setId(i);
			t.setPrice(r.nextInt(300));
			master.submit(t);
		}

		master.executer();
		long start = System.currentTimeMillis();
		while (true) {
			if (master.isComplete()) {
				long end = System.currentTimeMillis();
				int priceR = master.getResutlts();
				System.out.println("最终执行结果price:" + priceR + " 执行时间:"
						+ (end - start));
				break;
			}
		}
	}

}

 

 

 

 

  • 大小: 84.1 KB
分享到:
评论

相关推荐

    结合 Etcd 与 MongoDB 实现一个基于 Master-Worker 分布式 架构的 crontab 系统.zip

    在构建分布式系统时,Master-Worker架构是一种常见的设计模式,特别是在需要执行定时任务的场景下。本项目结合Etcd和MongoDB实现了一个基于这种架构的crontab系统,旨在提供可扩展、高可用的定时任务调度解决方案。...

    php实现 master-worker 守护多进程模式的实例代码

    这是master-worker模式自我恢复能力的关键部分。 4. **进程管理**: - 通过`ps -aux`命令可以看到运行的进程状态。在示例中,master_process和多个worker_process清晰可见。 - 当手动杀死一个worker进程(如kill ...

    Laravel开发-elegant-laravel-worker

    8. **代码结构与设计模式**:"elegant-laravel-worker"可能采用了设计模式如观察者模式(Observer Pattern)来实现信号监听,以及策略模式(Strategy Pattern)来处理不同类型的停止逻辑。 9. **配置与部署**:部署...

    nodejs 内部资料 进程管理

    本篇内部资料主要围绕Node.js的进程管理,特别是Master-Worker模式和进程间通信(IPC)进行详细讲解。 首先,进程管理服务模型经历了多个阶段。从同步到复制进程,再到多线程,直至现在的事件驱动多进程架构。Node....

    master_worker_pattern:主工人模式演示

    Master-Worker是一个很好用的设计模式, 它可以把一个大工作分成很多小工作去执行, 等所有小工作都回来以后, 将所有的小工作结果做一个统整, 得到最终结果, 就很像专题小组有五个人, 把系统分成四等分, 每个人负责一...

    开源项目-dwlnetnl-worker.zip

    【描述】"dwlnetnl-worker.zip"中的worker-master组件,很可能是项目的主要入口或控制器,它负责创建、管理和协调工作进程。在分布式系统或多线程编程中,工作池是一种常见的设计模式,通过预先创建一组可重用的工作...

    前端开源库-worker-client-server

    在前端开发领域,为了应对日益复杂的业务需求和性能挑战,开发者们不断探索新的技术与架构模式。"前端开源库-worker-client-server" 提供了一个这样的解决方案,它涉及到的主要知识点包括Web Worker、Client(客户端...

    详解通过源码解析Node.js中cluster模块的主要功能实现

    为了应对这个问题,Node.js内置了一个cluster模块,它允许我们以master-worker模式启动多个Node.js应用实例,从而充分利用多核CPU资源,并实现高可用性和负载均衡。 cluster模块的核心功能是创建一个master进程和多...

    java多线程.pdf

    Master-Worker模式中,Master线程负责分配任务,Worker线程负责执行任务;Producer-Consumer模式中,Producer负责生产数据,而Consumer负责消费数据。这些设计模式可以用来解决实际开发中遇到的多线程协同工作的常见...

    nginx.pptx

    在生产环境中,通常使用master-worker模式,因为它具有更高的稳定性和性能优化。在该模式下,一个master进程负责管理多个worker进程,接收和分发来自客户端的请求。worker进程执行实际的HTTP请求处理,提供服务。...

    Laravel开发-laravel-elasticbeanstalk-queue-worker

    在本文中,我们将深入探讨如何将Laravel应用程序与Amazon Web ...在laravel-elasticbeanstalk-queue-worker-master这个项目中,你将找到实现上述流程的具体示例和配置,帮助你在实践中更好地理解和运用这一技术。

    nginx-0.9.2.zip

    Nginx 主要有两种工作模式:Master-Worker 模式和 Event 模式。 1. **Master-Worker 模式**:主进程(Master)负责管理和调度工作进程(Worker),工作进程则处理实际的网络连接和请求。这种模式保证了 Nginx 在...

    tachyon 介绍

    Tachyon系统架构采用了Master-Worker模式,这种模式下,Tachyon系统由一个Master节点和多个Worker节点组成。Tachyon Master节点负责管理元数据和进行调度,Worker节点则负责实际的文件存储工作。由于Tachyon使用Java...

    cannon乘法mpi实现

    Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完后,将结果...

    dms-worker:数据管理系统-slave worker controller

    "dms-worker-master"这一文件名可能表示的是项目的主分支或主代码库。通常,这样的命名包含了项目的源码、配置文件、测试脚本等资源,用于构建和部署整个系统。深入研究这个代码库,我们可以了解到具体的设计模式、...

    hadoop-hdfs-1.docx

    3. HDFS架构:HDFS的架构采用master-worker模式,NameNode是master节点,DataNode是worker节点。NameNode负责维护文件系统的元数据,DataNode负责存储用户数据。 4. Secondary NameNode:Secondary NameNode是HDFS...

    nginx-1.7.10.tar.gz

    Nginx的工作模型是事件驱动的,主要分为两种工作模式:master-worker模式。Master进程负责管理和分发请求,Worker进程实际处理请求。这种设计使得Nginx能高效地处理高并发请求。 总结来说,"nginx-1.7.10.tar.gz"是...

Global site tag (gtag.js) - Google Analytics