package test; import java.util.Random; public class Main { public static void main(String[] args) { System.out.println("我的机器可用Processor数量:" + Runtime.getRuntime().availableProcessors()); Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors()); Random r = new Random(); for(int i = 1; i<= 100; i++){ Task t = new Task(); t.setId(i); t.setName("任务"+i); t.setPrice(r.nextInt(1000)); master.submit(t); } master.execute(); long start = System.currentTimeMillis(); while(true){ if(master.isComplete()){ long end = System.currentTimeMillis() - start; int ret = master.getResult(); System.out.println("最终结果:" + ret + ", 执行耗时:" + end); break; } } } }
package test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //1 应该有一个承装任务的集合 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); //2 使用HashMap去承装所有的worker对象 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //3 使用一个容器承装每一个worker并非执行任务的结果集 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //4 构造方法 public Master(Worker worker, int workerCount){ // 每一个worker对象都需要有Master的引用 workQueue用于任务的领取,resultMap用于任务的提交 worker.setWorkerQueue(this.workQueue); worker.setResultMap(this.resultMap); for(int i = 0 ; i < workerCount; i++){ //key表示每一个worker的名字, value表示线程执行对象 workers.put("子节点" + Integer.toString(i), new Thread(worker)); } } //5 提交方法 public void submit(Task task){ this.workQueue.add(task); } //6 需要有一个执行的方法(启动应用程序 让所有的worker工作) public void execute(){ for(Map.Entry<String, Thread> me : workers.entrySet()){ me.getValue().start(); } } //8 判断线程是否执行完毕 public boolean isComplete() { for(Map.Entry<String, Thread> me : workers.entrySet()){ if(me.getValue().getState() != Thread.State.TERMINATED){ return false; } } return true; } //9 返回结果集数据 public int getResult() { int ret = 0; for(Map.Entry<String, Object> me : resultMap.entrySet()){ //汇总的逻辑.. ret += Integer.parseInt((String)me.getValue()); } return ret; } }
package test; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue; private ConcurrentHashMap<String, Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true){ Task input = this.workQueue.poll(); if(input == null) break; //真正的去做业务处理 Object output = MyWorker.handle(input); this.resultMap.put(Integer.toString(input.getId()), output); } } public static Object handle(Task input) { return null; } }
package test; public class Task { private int id ; private String name; private int price; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } }
package test; public class MyWorker extends Worker { public static Object handle(Task input) { Object output = null; try { //表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库... Thread.sleep(500); output = input.getPrice(); } catch (InterruptedException e) { e.printStackTrace(); } return output; } }
相关推荐
在构建分布式系统时,Master-Worker架构是一种常见的设计模式,特别是在需要执行定时任务的场景下。本项目结合Etcd和MongoDB实现了一个基于这种架构的crontab系统,旨在提供可扩展、高可用的定时任务调度解决方案。...
这是master-worker模式自我恢复能力的关键部分。 4. **进程管理**: - 通过`ps -aux`命令可以看到运行的进程状态。在示例中,master_process和多个worker_process清晰可见。 - 当手动杀死一个worker进程(如kill ...
- **Master/Worker模式**: Spark集群通常采用Master/Worker架构,其中Master节点负责任务调度,Worker节点执行具体计算任务。 - **部署方式**: 可选择Standalone模式或者YARN模式。本文重点介绍YARN模式下的部署。 ...
1. **Spark架构和核心组件**:了解Spark的运行架构以及Master/Worker模式。 2. **RDD(弹性分布式数据集)**:掌握RDD的基本操作,如转换(Transformation)和行动(Action)。 3. **Spark SQL**:介绍如何使用Spark...
- **Master/Worker模式**:在Spark集群中,有一个主节点(Master)负责调度任务并监控集群状态,多个工作节点(Worker)执行具体的计算任务。 - **容错机制**:通过数据冗余存储以及任务重试等方式确保分布式环境...
Master-Worker是一个很好用的设计模式, 它可以把一个大工作分成很多小工作去执行, 等所有小工作都回来以后, 将所有的小工作结果做一个统整, 得到最终结果, 就很像专题小组有五个人, 把系统分成四等分, 每个人负责一...
- **Hadoop MapReduce架构**:通过Master/Worker模式实现大规模数据并行处理。 - **应用场景**:数据清洗、统计分析等。 **4.2 Spark** - **Apache Spark生态系统**:包含多个项目,如Spark SQL、MLlib等。 - **...
2. Master/Worker架构的应用:在分布式系统中,Master/Worker是常见的并行计算架构,用于管理计算作业的生命周期。Master节点负责将计算任务拆分、分配计算资源、以及监控任务执行等管理工作,而Worker节点则负责...
GatewayWorker的工作模式有四种:'worker'(普通工作进程)、'gateway'(网关进程)、'business'(业务逻辑进程)和'register'(注册服务器)。网关进程负责与浏览器建立长连接,业务进程则处理业务逻辑,而注册...
Master Worker模式的核心思想是将一个大任务分解为许多小任务,由一个主进程(Master)分配给多个工作进程(Worker)。Master负责任务的调度、监控和协调,而Worker则专注于执行具体的任务。在Java中,我们可以使用...
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://Master.hadoop:7077 ``` 可以通过 Web 界面 http://masterIP:8080 查看 Master 和 Worker 节点的状态。 Spark Component Spark 组件包括 ...
`org.apache.spark.deploy.master.Master`是Spark Master的主入口点,负责启动和监听网络接口,处理来自Worker和Driver的连接请求。此外,`org.apache.spark.rpc.RpcEnv`是Spark的远程过程调用(RPC)框架,用于...
在前端开发领域,为了应对日益复杂的业务需求和性能挑战,开发者们不断探索新的技术与架构模式。"前端开源库-worker-client-server" 提供了一个这样的解决方案,它涉及到的主要知识点包括Web Worker、Client(客户端...
【主从模型(Master/Worker)】是描述GPU-CPU协作计算的一种常见模型。在这个模型中,CPU作为“主”(Master),负责整体任务的管理和调度,而GPU作为“从”(Worker),执行由CPU分配的并行计算任务。这种模型有效...
8. **代码结构与设计模式**:"elegant-laravel-worker"可能采用了设计模式如观察者模式(Observer Pattern)来实现信号监听,以及策略模式(Strategy Pattern)来处理不同类型的停止逻辑。 9. **配置与部署**:部署...
2. **代码部署**:将“thinkphp5-Gatewayworker-web--master”文件夹上传至服务器,并根据项目需求配置相应的环境变量和配置文件。 3. **数据库配置**:创建数据库并导入项目的数据库脚本,根据实际设置调整数据库...
4. **心跳机制**:为了确保Worker节点的活性,Master和Worker之间需定期发送心跳消息。 5. **资源分配**:Master根据任务需求和Worker可用资源进行任务调度。 6. **故障恢复**:模拟实现中还需要考虑如何在Worker...