本文使用了 BlockingQueue 作为线程池实现的数据结构,利用生产者/消费者思想进行多任务的处理。
基于 spring-boot 编写,测试。
1. 自定义线程池接口
package com.getthrough.threadpool.mythreadpool; /** * <p>This interface is a top interface that defined several necessary methods, * it imitates {@link java.util.concurrent.ExecutorService}, * {@link java.util.concurrent.ThreadPoolExecutor} * for personal learning.</p> * @author: getthrough * @date: 2018/5/20 * @description: * @version: */ public interface ThreadPool { /** * to execute the given task in the future, * it can be executed by a thread or a thread pool. * @param runnable the given task */ void execute(Runnable runnable); /** * It will close the thread pool after all submitted tasked are executed, * and will not accept new tasks. */ void shutdown(); /** * test whether the thread pool has been shut down. * @return the boolean result. */ boolean isShutdown(); }
2. 线程池的默认实现
package com.getthrough.threadpool.mythreadpool.impl; import com.getthrough.threadpool.mythreadpool.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @author: getthrough * @date: 2018/5/20 * @description: * @version: */ public class DefaultThreadPool implements ThreadPool { public Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class); /** * Workers queue, get the task from {@code tasks} and run the task. */ private BlockingQueue<Worker> workers = new LinkedBlockingQueue<>(DEFAULT_POOL_SIZE); /** * The queue to accept the tasks. */ private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue(MAX_POOL_SIZE); private int corePoolSize = 0; private int maxPoolSize = 0; /** * How long will the worker waits(keep alive) for the task if there is no task in tasks. */ private volatile long aliveTime = 0L; /** * The default pool size. */ private static final int DEFAULT_POOL_SIZE = 20; /** * The maximum pool size. */ private static final int MAX_POOL_SIZE = 30; private volatile boolean isShutdown = false; public DefaultThreadPool() throws InterruptedException { this.corePoolSize = DEFAULT_POOL_SIZE; this.maxPoolSize = MAX_POOL_SIZE; new DefaultThreadPool(DEFAULT_POOL_SIZE, MAX_POOL_SIZE); } public DefaultThreadPool(int corePoolSize, int maxPoolSize) { if (corePoolSize <= 0 || maxPoolSize <= 0 || aliveTime < 0) throw new IllegalArgumentException("ERROR:arguments must greater than zero!"); if (corePoolSize > maxPoolSize) throw new IllegalArgumentException("ERROR:corePoolSize can't be greater than maxPoolSize!"); this.corePoolSize = corePoolSize; this.maxPoolSize = maxPoolSize; for (int i = 0; i < corePoolSize; i ++) { Worker worker = new Worker(getTask(0L)); workers.add(worker); worker.start(); } } @Override public void execute(Runnable runnable) { if (isShutdown) { logger.info("pool is closed, you should call start method"); return; } if (workers.size() < corePoolSize) { Worker worker = new Worker(runnable); workers.add(worker); worker.start(); logger.info("task is immediately got by work : {}", worker.getName()); } else if (workers.size() == corePoolSize) { try { tasks.put(runnable); logger.info("task waiting in the task queue..."); } catch (InterruptedException e) { logger.info("application is busy, please try again later!"); } } } @Override public void shutdown() { // reject the new task isShutdown = true; for(;;) { if (tasks.size() == 0){ // clear the work queue workers.clear(); break; } } logger.info("shutting down the pool"); } @Override public boolean isShutdown() { return workers.size() == 0; } private Runnable getTask(long timeOut) { try { return tasks.poll(timeOut, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return null; } public void start() { isShutdown = false; } private class Worker extends Thread{ private Runnable task; Worker(Runnable task) { this.task = task; } @Override public void run() { while ((task != null || (task = getTask(60L)) != null)) { try { // if (!Thread.interrupted()) task.run(); logger.info("worker : {} has finished the task.", getName()); } finally { task = null; } } } } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } }
3. 简单的 main 方法测试
package com.getthrough.threadpool; import com.getthrough.threadpool.mythreadpool.ThreadPool; import com.getthrough.threadpool.mythreadpool.impl.DefaultThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** * @author: getthrough * @date: 2018/5/21 * @description: * @version: */ public class TestClass { private static Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class); public static void main(String[] args) throws InterruptedException { ThreadPool threadPool = new DefaultThreadPool(); for (int i = 0; i < 22; i++) { threadPool.execute(()-> { logger.info("TASK produced"); }); try { TimeUnit.MILLISECONDS.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } } TimeUnit.SECONDS.sleep(1L); threadPool.shutdown(); logger.info("shutdown : {}", threadPool.isShutdown()); threadPool.execute(new Runnable() { @Override public void run() { logger.info("task submit after shutdown"); } }); TimeUnit.SECONDS.sleep(1L); ((DefaultThreadPool)threadPool).start(); logger.info("thread pool restarted "); threadPool.execute(new Runnable() { @Override public void run() { logger.info("task submit after restart"); } }); TimeUnit.SECONDS.sleep(1L); threadPool.shutdown(); } }
在C语言中实现线程池,通常会涉及以下关键知识点: 1. **线程基础**:在Linux环境下,线程是通过POSIX线程(pthread)库来创建和管理的。`pthread_create()`用于创建新线程,`pthread_join()`等待线程结束,`...
相比传统的手动创建和管理线程,系统线程池能够减少线程的创建和销毁开销,提高系统的整体性能。 在Windows中,系统线程池(ThreadPool API)主要由以下组件组成: 1. **线程池工作项**(Work Items):这是线程池...
- 引入库:首先需要将`mcpage`库导入到Delphi项目中,通常通过安装库的`.dproj`文件或手动添加`.pas`文件实现。 - 创建线程池:在代码中,使用`TThreadPool`类创建线程池对象,如`ThreadPool := TThreadPool....
在Java 5以前,开发者必须手动实现自己的线程池。 4. Java 5中的Executors工厂类 从Java 5开始,Java内建支持线程池,引入了Executors工厂类,该工厂类包含多个静态工厂方法来创建线程池。 5. Executors工厂类中的...
4. **Monitor.TryEnter/Exit**:手动控制进入和退出临界区,避免死锁。 5. **ReaderWriterLockSlim**:读写锁,允许多个读取线程并行访问,但写入操作独占资源。 6. **EventWaitHandle**:事件标志,线程通过等待或...
在C++中,实现线程池通常有两种方式:手动实现和使用第三方库(如Boost.Thread或PPL)。 - **手动实现**:需要自己编写线程池类,包括线程管理、任务队列管理等功能。 - **使用第三方库**:例如Boost.Thread库提供...
以下是一个简单的C语言实现线程池的例子: ```c // main.c #include #include #include "thread_pool.h" // 任务处理函数 void *task_test(void *arg) { printf("working on task %d\n", (int)arg); sleep(1); ...
2. **API 简洁易用**:nPool 提供了简单直观的 API,让开发者可以轻松地在 Node.js 应用中创建和管理线程池。 3. **线程管理**:线程池能够自动管理线程的生命周期,包括创建、分配任务、回收等,减少了手动管理...
易语言简易线程池的实现 ——V雪落有声V原创 转载请保留 前文: 为了能充分理解本篇文章的内容,需要了解的知识如下: 1.事件对象的使用:http://baike.baidu.com/view/751499.htm 2.信号量的使用:...
简单易用的线程池,可以异步或同步执行任务,支持functional 和 lambad表达式。 工具库 文件操作。 std::cout风格的日志库,支持颜色高亮、代码定位、异步打印。 INI配置文件的读写。 监听者模式的消息广播器。 基于...
tcp/udp服务器,使用非常简单,只要实现具体的tcp/udp会话(Session类)逻辑,使用模板的方式可以快速的构建高性能的服务器。 对套接字多种操作的封装。 线程库 使用线程实现的简单易用的定时器。 信号量。 线程组。 ...
在.NET框架中,C#语言提供了丰富的多线程支持,其中线程池是实现高效并发处理的一种重要机制。本文将详细解析如何在C#中创建并使用线程池,通过一个简单的Demo演示来阐述相关知识点。 线程池是操作系统提供的一种...
2. **自适应线程池**:传统的线程池通常需要手动配置线程数量,并且这个数量在整个应用程序的生命周期中保持不变。自适应线程池则能够在运行时根据负载情况自动调整线程的数量,从而更好地匹配实际需求,提高系统的...