Master
package master_worker; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { // 任务队列 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); // Worker进程队列 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); // 子任务处理结果 protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 是否子任务都结束了 public boolean isCompelete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } // Master构造,需要一个Worker进程逻辑,和需要Worker进程数量 public Master(Worker worker, int countWorker) { worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) { threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } // 提交一个任务 public void submit(Object job) { workQueue.add(job); } // 开始运行所有的Worker进程,进行处理 public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { entry.getValue().start(); } } // 返回子任务结果集 public Map<String, Thread> getThreadMap() { return threadMap; } }
Worker
package master_worker; import java.util.Map; import java.util.Queue; public abstract class Worker implements Runnable { // 任务队列,用于取得子任务 protected Queue<Object> workQueue; // 子任务的处理结果 protected Map<String, Object> resultMap; @Override public void run() { while (true) { // 获取子任务 Object input = workQueue.poll(); if (input == null) break; // 处理子任务 Object result = handle(input); // 将处理结果放入结果集 resultMap.put(Integer.toString(input.hashCode()), result); } } // 子任务处理逻辑,在子类中实现具体逻辑 public abstract Object handle(Object input); public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } public void setWorkQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } }
PlusWorker
package master_worker; public class PlusWorker extends Worker { public Object handle(Object input){ Integer i = (Integer)input; return i*i*i; } }
Test
package master_worker; import java.util.Map; import java.util.Set; public class Test { public static void main(String[] args) { Master master = new Master(new PlusWorker(), 10); for (int i = 0; i < 1000; i++) { master.submit(i); } master.execute(); int result = 0; Map<String, Object> resultMap = master.getResultMap(); while (resultMap.size() > 0 || !master.isCompelete()) { Set<String> keys = resultMap.keySet(); String key = null; for (String k : keys) { key = k; break; } Integer i = null; if (key != null) { i = (Integer) resultMap.get(key); } if (i != null) { result += i; } if (key != null) { resultMap.remove(key); } } System.out.println(result); } }
相关推荐
Java多线程是一种允许同时执行多个线程的编程技术,它允许利用多核处理器的计算能力,提高应用程序的性能和响应能力。在介绍Java多线程的基本知识之前,首先提到的是Java多线程的实现方式。在Java中,创建线程主要有...
本文将深入探讨“ExecuteFramework:java多线程框架核心源码重写”这一主题,通过分析和解释提供的ExecuteFramework-master源码,我们将理解如何构建一个自定义的多线程执行框架。 首先,我们需要了解Java中实现多...
总的来说,JavaThreadTest所涉及的知识点主要包括:Java多线程、线程池的概念与实现、Master Worker模式、任务提交与结果获取、线程间的通信以及分布式计算的基本原理。理解并掌握这些知识,对于开发高效、可扩展的...
- Java提供了丰富的并发工具类,如`ConcurrentHashMap`、`ReentrantLock`等,源码中会用到这些工具保证多线程环境下的数据安全和同步。 4. **网络通信**: - Hera-Master可能使用了基于Java NIO或Netty的网络库...
* 并行模式:Amino将为应用程序提供一个或几个大家熟知的并行计算模式,如Master-Worker、Map-reduce、Divide and conquer、Pipeline等。 * 并行计算中的一般功能:Amino将为应用程序提供并行计算中常用的方法,如...
在Rails应用中,Sidekiq通过Redis作为后端存储,管理待处理任务的队列,并且使用多线程模型来并行执行任务。 Sidekiq的核心特点包括: 1. **高性能**:Sidekiq使用多线程模型,而不是Rails默认的单线程模型,从而...
- **进程启动**:master 进程启动 Dispatcher 和 ResourceManager 线程,TaskManagers 注册至 ResourceManager。 - **高可用性**:Standalone 集群默认不具备高可用性。若需支持高可用,需进行额外配置。 - **...
这通常涉及设置配置参数,如worker进程数量、执行器线程数等。 6. **容错机制**:Storm具有强大的容错能力,如果某个任务失败,它会自动重新分配并重试,保证数据不丢失。 7. **本地模式**:在开发和测试阶段,...
2. **创建Master和Worker进程**:Nginx启动后会产生一个Master进程和多个Worker进程。其中Master进程负责读取配置文件、维持Worker进程的运行状态;Worker进程则专门负责处理客户端请求。 3. **请求处理机制**:当...
- **多线程设计模式**: - **Future**:支持异步处理,通过`FutureTask`实现。 - **Master-Worker**:适用于可以分解的任务,提高并行处理效率。 - **Guarded Suspension**:在没有足够的资源时暂停执行,避免...
在BIO模式下,每个连接都需要一个单独的线程来处理,当并发连接数量较大时,系统会创建大量线程,消耗过多资源。在手写Tomcat的BIO部分,我们需要实现以下关键组件: 1. **Acceptor线程**:负责监听客户端的连接...
3. **多线程**:Java中的`java.lang.Thread`是处理多线程的基础,而`java.util.concurrent`包提供了高级并发工具,如`Callable`、`Future`、`Executor`和`BlockingQueue`,这些可能在`gdk_workerclient`中被用来实现...
在单机模式下,可以通过命令行启动master和worker,并使用spark-shell来执行任务。在local模式下,Spark会直接在本地机器上运行,而在localcluster模式下,会模拟一个集群环境。 在Spark中,有一个核心概念叫做...
例如,在多线程编程中,我们可以定义一个接口来处理线程结束的回调: ```java interface Callback { void onThreadFinished(); } class WorkerThread extends Thread { private Callback callback; public ...
- **Scheduler**:启动爬虫并初始化TaskQueue,创建监控线程,当TaskQueue为空且所有Worker线程空闲时,程序在10分钟无变化后退出。 - **Task Master**:管理TaskQueue,可采用内存或集中式任务队列如SQLite或...
Nginx采用Master-Worker多进程模型运行,其中Master进程负责读取和评估配置文件,并维持Worker进程的生命周期。Worker进程则实际处理请求。这种架构可以提高服务器的稳定性和可靠性,因为当一个Worker进程因处理请求...