`
getthrough
  • 浏览: 9479 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

手动实现简单的线程池

    博客分类:
  • Java
阅读更多

手动实现简单的线程池

写在前面:

本文使用了 BlockingQueue 作为线程池实现的数据结构,利用生产者/消费者思想进行多任务的处理。

实现方式比较简单,并没有完全实现所有方法,本文可作为线程池和同步队列的入门学习参考。

受限于博主的姿势水平,本文中的一些方法肯定存在优化的空间及更好的实现方式,欢迎探讨。

 

基于 spring-boot 编写,测试。

 

1. 自定义线程池接口

package com.getthrough.threadpool.mythreadpool;

/**
 * <p>This interface is a top interface that defined several necessary methods,
 * it imitates {@link java.util.concurrent.ExecutorService},
 * {@link java.util.concurrent.ThreadPoolExecutor}
 * for personal learning.</p>
 * @author: getthrough
 * @date: 2018/5/20
 * @description:
 * @version:
 */
public interface ThreadPool {

    /**
     * to execute the given task in the future,
     * it can be executed by a thread or a thread pool.
     * @param runnable the given task
     */
    void execute(Runnable runnable);

    /**
     * It will close the thread pool after all submitted tasked are executed,
     * and will not accept new tasks.
     */
    void shutdown();

    /**
     * test whether the thread pool has been shut down.
     * @return the boolean result.
     */
    boolean isShutdown();

}

 

2. 线程池的默认实现

package com.getthrough.threadpool.mythreadpool.impl;

import com.getthrough.threadpool.mythreadpool.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author: getthrough
 * @date: 2018/5/20
 * @description:
 * @version:
 */
public class DefaultThreadPool implements ThreadPool {

    public Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class);

    /**
     * Workers queue, get the task from {@code tasks} and run the task.
     */
    private BlockingQueue<Worker> workers = new LinkedBlockingQueue<>(DEFAULT_POOL_SIZE);

    /**
     * The queue to accept the tasks.
     */
    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue(MAX_POOL_SIZE);

    private int corePoolSize = 0;

    private int maxPoolSize = 0;

    /**
     * How long will the worker waits(keep alive) for the task if there is no task in tasks.
     */
    private volatile long aliveTime = 0L;

    /**
     * The default pool size.
     */
    private static final int DEFAULT_POOL_SIZE = 20;
    /**
     * The maximum pool size.
     */
    private static final int MAX_POOL_SIZE = 30;

    private volatile boolean isShutdown = false;

    public DefaultThreadPool() throws InterruptedException {
        this.corePoolSize = DEFAULT_POOL_SIZE;
        this.maxPoolSize = MAX_POOL_SIZE;
        new DefaultThreadPool(DEFAULT_POOL_SIZE, MAX_POOL_SIZE);
    }

    public DefaultThreadPool(int corePoolSize, int maxPoolSize) {
        if (corePoolSize <= 0 || maxPoolSize <= 0 || aliveTime < 0)
            throw new IllegalArgumentException("ERROR:arguments must greater than zero!");
        if (corePoolSize > maxPoolSize)
            throw new IllegalArgumentException("ERROR:corePoolSize can't be greater than maxPoolSize!");

        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;

        for (int i = 0; i < corePoolSize; i ++) {
            Worker worker = new Worker(getTask(0L));
            workers.add(worker);
            worker.start();
        }

    }

    @Override
    public void execute(Runnable runnable) {
        if (isShutdown) {
            logger.info("pool is closed, you should call start method");
            return;
        }

        if (workers.size() < corePoolSize) {
            Worker worker = new Worker(runnable);
            workers.add(worker);
            worker.start();
            logger.info("task is immediately got by work : {}", worker.getName());
        } else if (workers.size() == corePoolSize) {
            try {
                tasks.put(runnable);
                logger.info("task waiting in the task queue...");
            } catch (InterruptedException e) {
                logger.info("application is busy, please try again later!");
            }
        }

    }

    @Override
    public void shutdown() {
        // reject the new task
        isShutdown = true;

        for(;;) {
            if (tasks.size() == 0){

                // clear the work queue
                workers.clear();
                break;

            }
        }

        logger.info("shutting down the pool");

    }

    @Override
    public boolean isShutdown() {
        return workers.size() == 0;
    }


    private Runnable getTask(long timeOut) {

        try {
            return tasks.poll(timeOut, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;

    }

    public void start() {
        isShutdown = false;
    }


    private class Worker extends Thread{

        private Runnable task;

        Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {

            while ((task != null || (task = getTask(60L)) != null)) {
                try {
//                    if (!Thread.interrupted())
                        task.run();
                    logger.info("worker : {} has finished the task.", getName());
                } finally {
                    task = null;
                }

            }

        }
    }

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

}

 

3. 简单的 main 方法测试

package com.getthrough.threadpool;

import com.getthrough.threadpool.mythreadpool.ThreadPool;
import com.getthrough.threadpool.mythreadpool.impl.DefaultThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author: getthrough
 * @date: 2018/5/21
 * @description:
 * @version:
 */
public class TestClass {

    private static Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class);

    public static void main(String[] args) throws InterruptedException {
        ThreadPool threadPool = new DefaultThreadPool();

        for (int i = 0; i < 22; i++) {
            threadPool.execute(()-> {
                logger.info("TASK produced");
            });
            try {
                TimeUnit.MILLISECONDS.sleep(50L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        TimeUnit.SECONDS.sleep(1L);
        threadPool.shutdown();
        logger.info("shutdown : {}", threadPool.isShutdown());
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("task submit after shutdown");
            }
        });
        TimeUnit.SECONDS.sleep(1L);
        ((DefaultThreadPool)threadPool).start();
        logger.info("thread pool restarted ");

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                logger.info("task submit after restart");
            }
        });
        TimeUnit.SECONDS.sleep(1L);
        threadPool.shutdown();


    }

}

 

完整代码获取:https://github.com/Getthrough/my-threadpool/tree/master 

 

 

分享到:
评论

相关推荐

    Java 自己实现线程池

    Java开发,Android开发,自己实现线程池,明白线程池的实现机制

    linux线程池,c语言实现

    在C语言中实现线程池,通常会涉及以下关键知识点: 1. **线程基础**:在Linux环境下,线程是通过POSIX线程(pthread)库来创建和管理的。`pthread_create()`用于创建新线程,`pthread_join()`等待线程结束,`...

    qt 线程池实现多线程下载

    在`httpdownloadtool`这个项目中,可能包含了实现以上功能的源代码,包括`QRunnable`子类、线程池调度、限速策略以及进度显示的UI界面等组件。通过学习和分析这些代码,开发者可以更好地理解和应用Qt的多线程与网络...

    DELPHI的ThreadPool的线程池DEMO

    DELPHI的线程池(ThreadPool)是一种高效管理并发任务的技术,它允许程序在需要时创建线程,而不是每次需要执行任务时都手动创建。线程池通过预先创建一组线程,然后根据需要分配任务,减少了线程创建和销毁的开销,...

    系统线程池,windows操作系统自动的,使用简单

    相比传统的手动创建和管理线程,系统线程池能够减少线程的创建和销毁开销,提高系统的整体性能。 在Windows中,系统线程池(ThreadPool API)主要由以下组件组成: 1. **线程池工作项**(Work Items):这是线程池...

    易语言源码易语言Mysql线程池2.0模块源码.rar

    学习这个模块源码可以帮助易语言开发者了解如何在易语言中实现线程池,以及如何高效地使用MySQL进行并发操作。通过对源码的分析,开发者可以理解线程池的工作原理,提升编程技能,同时也为自定义线程池或扩展现有...

    C#线程池 所有线程运行完毕

    在C#编程中,线程池(ThreadPool)是一种高效的线程管理机制,它允许开发者...通过理解这些概念,开发者可以更好地掌握如何在C#中有效地使用线程池,实现多任务的并发执行,并确保在所有任务完成后正确地停止线程池。

    Delphi mcpage 线程池用法实例.rar

    - 引入库:首先需要将`mcpage`库导入到Delphi项目中,通常通过安装库的`.dproj`文件或手动添加`.pas`文件实现。 - 创建线程池:在代码中,使用`TThreadPool`类创建线程池对象,如`ThreadPool := TThreadPool....

    详细分析JAVA 线程池

    在Java 5以前,开发者必须手动实现自己的线程池。 4. Java 5中的Executors工厂类 从Java 5开始,Java内建支持线程池,引入了Executors工厂类,该工厂类包含多个静态工厂方法来创建线程池。 5. Executors工厂类中的...

    C# 多线程 线程池 线程同步

    4. **Monitor.TryEnter/Exit**:手动控制进入和退出临界区,避免死锁。 5. **ReaderWriterLockSlim**:读写锁,允许多个读取线程并行访问,但写入操作独占资源。 6. **EventWaitHandle**:事件标志,线程通过等待或...

    C++ 线程池

    在C++中,实现线程池通常有两种方式:手动实现和使用第三方库(如Boost.Thread或PPL)。 - **手动实现**:需要自己编写线程池类,包括线程管理、任务队列管理等功能。 - **使用第三方库**:例如Boost.Thread库提供...

    Linux线程池使用.docx

    以下是一个简单的C语言实现线程池的例子: ```c // main.c #include #include #include "thread_pool.h" // 任务处理函数 void *task_test(void *arg) { printf("working on task %d\n", (int)arg); sleep(1); ...

    Node.js 线程池 nPool.zip

    2. **API 简洁易用**:nPool 提供了简单直观的 API,让开发者可以轻松地在 Node.js 应用中创建和管理线程池。 3. **线程管理**:线程池能够自动管理线程的生命周期,包括创建、分配任务、回收等,减少了手动管理...

    易语言-易语言真正的线程池简易实现

    易语言简易线程池的实现 ——V雪落有声V原创 转载请保留 前文: 为了能充分理解本篇文章的内容,需要了解的知识如下: 1.事件对象的使用:http://baike.baidu.com/view/751499.htm 2.信号量的使用:...

    C++基于线程池技术实现大并发网络IO框架,一个基于C++11的轻量级网络框架

    简单易用的线程池,可以异步或同步执行任务,支持functional 和 lambad表达式。 工具库 文件操作。 std::cout风格的日志库,支持颜色高亮、代码定位、异步打印。 INI配置文件的读写。 监听者模式的消息广播器。 基于...

    uThreadPool线程池例程

    在本示例中,我们可以看到一系列与uThreadPool相关的源代码文件,包括`CnThreadPool.pas`、`uThreadPool.pas`等,这些都是实现线程池功能的核心文件。`CnThreadPool.pas`通常包含了线程池类的定义和实现,`...

    异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入

    在运行时,系统可以根据实时性能指标自动或手动调整线程池配置,例如,在面临突然的流量高峰时增加线程数,以处理更多的并发请求,而在低峰期减少线程数以节省资源。这种灵活性有助于提高系统的响应能力和资源利用率...

    最简单的C#线程池创建Demo演示代码

    在.NET框架中,C#语言提供了丰富的多线程支持,其中线程池是实现高效并发处理的一种重要机制。本文将详细解析如何在C#中创建并使用线程池,通过一个简单的Demo演示来阐述相关知识点。 线程池是操作系统提供的一种...

    基于反馈的自适应线程池管理框架.pdf

    2. **自适应线程池**:传统的线程池通常需要手动配置线程数量,并且这个数量在整个应用程序的生命周期中保持不变。自适应线程池则能够在运行时根据负载情况自动调整线程的数量,从而更好地匹配实际需求,提高系统的...

    linux下线程池

    在`wf170513-朱琴英-线程池`这个压缩包中,可能包含了一个关于线程池的示例代码,用于展示如何在C++或者其它编程语言中实现线程池。代码中可能会涉及以下关键部分: 1. **线程池类定义**:定义一个线程池类,包含...

Global site tag (gtag.js) - Google Analytics