实现解耦为:任务队列管理和线程管理
文分三部分:任务控制、线程池控制及Demo
类图:
类变量:
private int capacity; // 任务队列容量 private long timeout; // 任务超时时间(ms) private HashMap tasks;// task容器 private ArrayList readyTasks; // 准备就绪的待处理任务
1、任务控制部分
createTaskManager:创建任务管理器(静态)
/** * 创建TaskManager * * @param name * Task名称 * @param capacity * Task队列容量 * @param timeout * 任务超时时间 * @param processor * 任务处理器 * @param processorNum * 处理线程数 * @return TaskManager实例 */ public static TaskManager createTaskManager(String name, int capacity, long timeout, TaskProcessor processor, int processorNum) { TaskManager mgt = null; if (capacity >=0 && timeout >=0 && processor != null && processorNum > 0) { // 初始化管理器 mgt = new TaskManager(capacity, timeout); mgt.start(name, processor, processorNum); } return mgt; }
构造Task管理器:
注:包含一个Timer 定时延时任务处理器,用于任务超时处理
protected TaskManager(int capacity, long timeout ) { this.capacity = capacity; this.timeout = timeout; tasks = new HashMap(); readyTasks = new ArrayList(); // 任务定时器(超时任务处理) timer = new Timer(true); }
Task管理器启动mgt.start() :初始化线程控制器并启动分配线程处理任务
private void start(String name, TaskProcessor processor, int processorNum) { ProcessorControllor control = new ProcessorControllor(name, processorNum, processor, this); control.start(); }
线程控制器的构造:
public ProcessorControllor(String name, int processorNum, TaskProcessor processor, TaskManager manager){ super(name); this.taskManager = manager; this.processorNum = processorNum; this.processor = processor; this.setDaemon(true); // 线程池 创建指定数目的Thread、调度空闲线程 pool = new ThreadPool(name, this.processorNum); // 负责处理任务 threadPoolTask = new ThreadPoolTask(this.processor); }
2、线程池控制部分
线程池初始化:
public ThreadPool(String name, int maxNum) { if (maxNum > 0) { threads = new WorkThread[maxNum]; for(int i = 0; i < maxNum; i++) { threads[i] = new WorkThread(name, i); threads[i].start(); } statistic_start_time = System.currentTimeMillis(); TraceManager.TrDebug(null, "ThreadPool:" + name + " " + maxNum + " threads created."); } }
工作线程:WorkThread
做初始化,线程启动后处于阻塞状态,等待线程池抓取,空闲时交由任务处理
private class WorkThread extends Thread { // 工作线程在处理的任务task private Task task = null; private TaskParameter parameter = null; public WorkThread(String name, int index) { super("ThreadPool_" + name + "_" + index); } public void run() { TraceManager.TrDebug(null, Thread.currentThread().getName() + " started!"); while(true) { try { synchronized(this) { // 阻塞 直到线程空闲时将任务加入到task while(task == null) { this.wait(); } } toWork(); synchronized(this) { task = null; parameter = null; } workThreadDone(); } catch(InterruptedException ex) { TraceManager.TrException(null, ex); } catch(Exception ex) { TraceManager.TrException(null, ex); } } } }
线程Processor任务器:
class ThreadPoolTask implements ThreadTask { TaskProcessor processor; public ThreadPoolTask(TaskProcessor processor) { this.processor = processor; } public void run(TaskParameter parameter) { try { Task task = (Task)parameter; if (task.isTimeout()) { processor.timeout(task); }else { processor.process(task); } }catch(Exception ex){ CAP.trException(ex); } } }
线程池分配线程处理任务:
ProcessorControllor.run()
public void run() { CAP.trDebug(this.getName() + " started."); Task task = null; while (true) { // 抓取taskManager中准备就绪的任务 task = taskManager.processorControllorWait(); if (task != null) { // 获取空闲工作线程,唤醒线程 pool.getThread(threadPoolTask, task, true); } } }
从Task管理器中移除就绪任务
TaskManager.processorControllorWait():
protected Task processorControllorWait() { Task task = null; Object key = null; synchronized(readyTasks) { while (readyTasks.size() == 0) { try { readyTasks.wait(); }catch(Exception ex){ CAP.trException(ex); } } key = readyTasks.remove(0); } task = this.removeTask(key); return task; }
将就绪任务交由空闲WorkThread处理:
ThreadPool.getThread()
public int getThread(Task task, TaskParameter parameter, boolean blocked) { if(task == null) { return -1; } if (!blocked) { return getThread(task, parameter); } synchronized(this) { boolean over = false; long startTime = System.currentTimeMillis(); long elapsedTime = 0; while (!over) { for(int i = 0; i < threads.length; i++) { // 获得空闲线程 if (threads[i].isIdle(task, parameter)) { return i; } } try { // to block the calling thread. elapsedTime = System.currentTimeMillis() - startTime; if (elapsedTime < MAX_WAIT_TIME) { TraceManager.TrDebug(null, Thread.currentThread().getName() + " to wait."); this.wait(MAX_WAIT_TIME - elapsedTime); } else { over = true; TraceManager.TrDebug(null, Thread.currentThread().getName() + " waiting too long and will give up waiting."); } TraceManager.TrDebug(null, Thread.currentThread().getName() + " waked up!"); } catch(InterruptedException ex) { over = true; TraceManager.TrException(null, ex); } } } return -1; }
WorkThread唤醒,由Porcessor处理任务:
WorkThread.toWork()
private void toWork() { TraceManager.TrDebug(null, Thread.currentThread().getName() + ": to run user task."); try { this.task.run(parameter); } catch(Throwable ex) { TraceManager.TrException(null, ex);; } TraceManager.TrDebug(null, Thread.currentThread().getName() + ": user task done."); }
以上完成了Task队列、线程池的构造,现在可以添加任务到Task队列:
TaskManager.addTask
public int addTask(Task task, int priority, int taskStatus) { int rst = ERROR_INVALID_PARAMETER; if (task == null) { return rst; } Object taskKey = task.getPrimaryKey(); if (taskKey == null) { return rst; } if (priority != HIGH_PRIORITY && priority != LOW_PRIORITY) { return rst; } synchronized(tasks) { if (tasks.get(taskKey) != null) { CAP.trError("Queue key is duplicated."); return ERROR_KEY_DUPLICATED; } if (capacity > 0) { int size = tasks.size(); if (size >= capacity) { CAP.trError("Queue capacity exceeded and one task rejected!"); return ERROR_CAPACITY_EXCEEDED; } CAP.trDebug("Total tasks:" + size + "/" + capacity); } TaskContainer container = new TaskContainer(task); tasks.put(taskKey, container); timer.schedule(container, timeout); } if (taskStatus == Task.STATUS_READY) { synchronized(readyTasks) { if (priority == HIGH_PRIORITY ) { readyTasks.add(0, taskKey); }else { readyTasks.add(taskKey); } readyTasks.notifyAll(); } } rst = OK; return rst; }
3、Demo
自定义Task,实现抽象方法getPrimaryKey()
PrimaryKey为task容器的唯一主键
public class MyTask extends Task { @Override public Object getPrimaryKey() { return System.currentTimeMillis(); } }
自定义task 处理类,实现TaskProcessor接口
public class MyTaskProcessor implements TaskProcessor { public void process(Task task) { System.out.println(task.getPrimaryKey() + " processing..."); } public void timeout(Task task) { System.out.println(task.getPrimaryKey() + " timeout..."); }
创建任务管理器和添加任务:
public class MyManager { public static void main(String[] args) { TaskManager mgt = TaskManager.createTaskManager("MyManager", 5, 10 * 1000, new MyTaskProcessor(), 2); mgt.addTask(new MyTask(), TaskManager.HIGH_PRIORITY, Task.STATUS_READY); } }
相关推荐
理解线程的基本概念、状态以及如何管理和控制线程对于编写高效可靠的多线程应用程序至关重要。通过对同步机制、线程控制技术以及线程间的通信方式的掌握,开发者能够更好地利用多线程的优势解决实际问题。
文档指出,多线程通过并行处理可以在同一时间内完成多项任务,减少单个任务占用CPU时间,但同时也提到在单核CPU的计算机上,多线程实际上是通过时间分片(时间片轮转)来实现的。因此,如果线程频繁地被切换,反而...
文档中的代码示例进一步加深了对异步和多线程概念的理解,尤其是展示了在实际代码中如何将异步操作和多线程结合使用来实现更高效的任务处理。在多线程场景中,线程池被广泛使用,因为线程池可以管理和重用线程,减少...
Java多线程知识点梳理: 1. Java线程基础知识 - 线程是程序中独立的、并发的执行路径。每个线程都有自己的堆栈、程序计数器和局部变量,但与分隔的进程不同,线程之间的隔离程度较小,它们共享内存、文件句柄等...
与进程相比,线程具有更轻量级的特性,创建和销毁线程的成本更低,使得多线程成为实现并发执行任务的有效手段。 在Java等编程语言中,多线程的实现主要有两种方式:继承Thread类和实现Runnable接口。继承Thread类...
在阅读了文档“多线程实例MultiThread7在VS2005中实现.pdf”的部分内容后,我们可以从文档的描述中提取出与多线程编程相关的一系列知识点。文档中提到了Visual Studio 2005和MFC(Microsoft Foundation Classes)...
- 不断实践并深入了解多线程和并发的具体应用场景,比如数据库连接池管理、异步任务处理等。 - 关注最新的并发模型和技术趋势,如Reactor模式、CompletableFuture等。 通过以上详细的知识点梳理,我们可以看到多...
2. **如何实现多线程** - **题1**:给出一个例子展示如何使用`Thread`类创建线程。 - 创建`Thread`子类,重写`run()`方法,然后创建对象并调用`start()`方法。 - **题2**:给出一个例子展示如何使用`Runnable`...
Java多线程是指在Java语言中同时运行多个线程,从而实现对任务的并行处理。这是Java中一个非常重要的概念和技能,尤其在需要高并发处理和优化性能的场景中显得尤为重要。以下将详细梳理Java多线程编程中的一些关键...
- **目标读者**: 本教程主要面向具备丰富Java基础知识但缺乏多线程编程经验的学习者。 - **学习成果**: 学习者能够掌握编写简单的多线程程序的能力,并能够理解和分析使用线程的基本程序。 #### 二、线程基础知识 -...
4. **关键技术分析**:重点分析Java语言特性,如面向对象编程、异常处理、多线程、网络编程等,以及可能用到的框架,如Spring Boot、MyBatis等。 5. **编程实践**:通过实际编码,掌握Java编程技巧,实现功能模块,...
多线程指的是在单个进程内,允许同时存在两个或两个以上的线程执行任务,使得程序能够并发运行。 #### 线程与进程 线程和进程是操作系统中的基本概念。进程是系统进行资源分配和调度的一个独立单位,拥有独立的...
线程管理是多任务并发执行的关键。 2. **特性(Attribute)**:特性提供了一种在运行时传递元数据的方式,可以应用于类、方法、属性等各种程序元素。例如,`[AttributeUsage]`定义了特性如何被使用,`[Conditional]...
内核层提供了实时性保证、多任务管理、中断管理、定时器管理、信号量、互斥量等基本服务。组件层则是一些可选组件,如文件系统、网络协议栈等,用户可以根据实际需要选择性地加入。服务层则包括一些额外的服务和功能...
通过使用如Servlet、JavaBean、JDBC等技术以及HTTP、WebService、多线程、数据分包、断点续传等实现文件传输和管理。 6. 系统安全与认证 文中还提到了系统安全和用户认证问题。通过与全行统一AD服务集成,实现用户...
本篇文档对后端开发中的关键知识点进行了全面而深入的梳理,涵盖了Java语言基础、JVM、操作系统、网络技术、数据库、缓存、多线程、Spring框架等方面的核心概念和技术要点。以下是针对文档标题、描述以及部分内容中...
- **多线程技术**:在处理大量数据或并发任务时,多线程能提高系统性能,VB提供了Threading模坓支持多线程编程。 - **设计模式**:如工厂模式、单例模式等,这些设计模式可以提高代码的可读性和可维护性。 通过这个...