`
dfwang
  • 浏览: 94402 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ThreadPool

    博客分类:
  • j2ee
阅读更多
最近面试被问到这个问题,正好找个线程池的实现代码,温习一下
/** 线程池类,工作线程作为其内部类 **/

package org.ymcn.util;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

/**
* 线程池
* 创建线程池,销毁线程池,添加新任务
*
* @author obullxl
*/
public final class ThreadPool {
    private static Logger logger = Logger.getLogger(ThreadPool.class);
    private static Logger taskLogger = Logger.getLogger("TaskLogger");

    private static boolean debug = taskLogger.isDebugEnabled();
    // private static boolean debug = taskLogger.isInfoEnabled();
    /* 单例 */
    private static ThreadPool instance = ThreadPool.getInstance();

    public static final int SYSTEM_BUSY_TASK_COUNT = 150;
    /* 默认池中线程数 */
    public static int worker_num = 5;
    /* 已经处理的任务数 */
    private static int taskCounter = 0;

    public static boolean systemIsBusy = false;

    private static List<Task> taskQueue = Collections
            .synchronizedList(new LinkedList<Task>());
    /* 池中的所有线程 */
    public PoolWorker[] workers;

    private ThreadPool() {
        workers = new PoolWorker[5];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }

    private ThreadPool(int pool_worker_num) {
        worker_num = pool_worker_num;
        workers = new PoolWorker[worker_num];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }

    public static synchronized ThreadPool getInstance() {
        if (instance == null)
            return new ThreadPool();
        return instance;
    }
    /**
    * 增加新的任务
    * 每增加一个新任务,都要唤醒任务队列
    * @param newTask
    */
    public void addTask(Task newTask) {
        synchronized (taskQueue) {
            newTask.setTaskId(++taskCounter);
            newTask.setSubmitTime(new Date());
            taskQueue.add(newTask);
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        logger.info("Submit Task<" + newTask.getTaskId() + ">: "
                + newTask.info());
    }
    /**
    * 批量增加新任务
    * @param taskes
    */
    public void batchAddTask(Task[] taskes) {
        if (taskes == null || taskes.length == 0) {
            return;
        }
        synchronized (taskQueue) {
            for (int i = 0; i < taskes.length; i++) {
                if (taskes[i] == null) {
                    continue;
                }
                taskes[i].setTaskId(++taskCounter);
                taskes[i].setSubmitTime(new Date());
                taskQueue.add(taskes[i]);
            }
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        for (int i = 0; i < taskes.length; i++) {
            if (taskes[i] == null) {
                continue;
            }
            logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
                    + taskes[i].info());
        }
    }
    /**
    * 线程池信息
    * @return
    */
    public String getInfo() {
        StringBuffer sb = new StringBuffer();
        sb.append("\nTask Queue Size:" + taskQueue.size());
        for (int i = 0; i < workers.length; i++) {
            sb.append("\nWorker " + i + " is "
                    + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
        }
        return sb.toString();
    }
    /**
    * 销毁线程池
    */
    public synchronized void destroy() {
        for (int i = 0; i < worker_num; i++) {
            workers[i].stopWorker();
            workers[i] = null;
        }
        taskQueue.clear();
    }

    /**
    * 池中工作线程
    *
    * @author obullxl
    */
    private class PoolWorker extends Thread {
        private int index = -1;
        /* 该工作线程是否有效 */
        private boolean isRunning = true;
        /* 该工作线程是否可以执行新任务 */
        private boolean isWaiting = true;

        public PoolWorker(int index) {
            this.index = index;
            start();
        }

        public void stopWorker() {
            this.isRunning = false;
        }

        public boolean isWaiting() {
            return this.isWaiting;
        }
        /**
        * 循环执行任务
        * 这也许是线程池的关键所在
        */
        public void run() {
            while (isRunning) {
                Task r = null;
                synchronized (taskQueue) {
                    while (taskQueue.isEmpty()) {
                        try {
                            /* 任务队列为空,则等待有新任务加入从而被唤醒 */
                            taskQueue.wait(20);
                        } catch (InterruptedException ie) {
                            logger.error(ie);
                        }
                    }
                    /* 取出任务执行 */
                    r = (Task) taskQueue.remove(0);
                }
                if (r != null) {
                    isWaiting = false;
                    try {
                        if (debug) {
                            r.setBeginExceuteTime(new Date());
                            taskLogger.debug("Worker<" + index
                                    + "> start execute Task<" + r.getTaskId() + ">");
                            if (r.getBeginExceuteTime().getTime()
                                    - r.getSubmitTime().getTime() > 1000)
                                taskLogger.debug("longer waiting time. "
                                        + r.info() + ",<" + index + ">,time:"
                                        + (r.getFinishTime().getTime() - r
                                                .getBeginExceuteTime().getTime()));
                        }
                        /* 该任务是否需要立即执行 */
                        if (r.needExecuteImmediate()) {
                            new Thread(r).start();
                        } else {
                            r.run();
                        }
                        if (debug) {
                            r.setFinishTime(new Date());
                            taskLogger.debug("Worker<" + index
                                    + "> finish task<" + r.getTaskId() + ">");
                            if (r.getFinishTime().getTime()
                                    - r.getBeginExceuteTime().getTime() > 1000)
                                taskLogger.debug("longer execution time. "
                                        + r.info() + ",<" + index + ">,time:"
                                        + (r.getFinishTime().getTime() - r
                                                .getBeginExceuteTime().getTime()));
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        logger.error(e);
                    }
                    isWaiting = true;
                    r = null;
                }
            }
        }
    }
}

/** 任务接口类 **/

package org.ymcn.util;

import java.util.Date;

/**
* 所有任务接口
* 其他任务必须继承访类
*
* @author obullxl
*/
public abstract class Task implements Runnable {
    // private static Logger logger = Logger.getLogger(Task.class);
    /* 产生时间 */
    private Date generateTime = null;
    /* 提交执行时间 */
    private Date submitTime = null;
    /* 开始执行时间 */
    private Date beginExceuteTime = null;
    /* 执行完成时间 */
    private Date finishTime = null;

    private long taskId;

    public Task() {
        this.generateTime = new Date();
    }

    /**
    * 任务执行入口
    */
    public void run() {
        /**
        * 相关执行代码
        *
        * beginTransaction();
        *
        * 执行过程中可能产生新的任务 subtask = taskCore();
        *
        * commitTransaction();
        *
        * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
        */
    }

    /**
    * 所有任务的核心 所以特别的业务逻辑执行之处
    *
    * @throws Exception
    */
    public abstract Task[] taskCore() throws Exception;

    /**
    * 是否用到数据库
    *
    * @return
    */
    protected abstract boolean useDb();

    /**
    * 是否需要立即执行
    *
    * @return
    */
    protected abstract boolean needExecuteImmediate();

    /**
    * 任务信息
    *
    * @return String
    */
    public abstract String info();

    public Date getGenerateTime() {
        return generateTime;
    }

    public Date getBeginExceuteTime() {
        return beginExceuteTime;
    }

    public void setBeginExceuteTime(Date beginExceuteTime) {
        this.beginExceuteTime = beginExceuteTime;
    }

    public Date getFinishTime() {
        return finishTime;
    }

    public void setFinishTime(Date finishTime) {
        this.finishTime = finishTime;
    }

    public Date getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(Date submitTime) {
        this.submitTime = submitTime;
    }

    public long getTaskId() {
        return taskId;
    }

    public void setTaskId(long taskId) {
        this.taskId = taskId;
    }

}
分享到:
评论

相关推荐

    threadpool

    线程池(threadpool)是计算机程序中一种有效的多线程处理形式,它预先创建一组线程,待有任务需要执行时,从线程池中取出一个线程来执行任务,任务完成后,线程并不销毁,而是返回线程池等待下一次的任务分配。...

    Boost threadpool优先级实例

    Boost库是C++编程语言中的一个流行开源库,提供了丰富的功能,其中包括线程池(Boost.Threadpool)模块。本文将深入探讨如何使用Boost库中的线程池来处理具有优先级的任务,以及普通任务的执行。 首先,我们需要...

    C++11 线程池 ThreadPool

    线程池(ThreadPool)是一种管理线程资源的有效方式,它在现代并发编程中扮演着至关重要的角色。线程池允许程序预先创建一组线程,而不是每次需要时都创建新的线程,这样可以减少线程的创建和销毁开销,提高系统效率...

    threadpool.tar.gz

    在"threadpool.tar.gz"压缩包中,包含两个文件:threadpool.cc和threadpool.h,它们很可能是实现线程池的源代码文件。这里我们将详细讨论C++11引入的线程库以及如何在Linux/MacOS平台上利用这些新特性构建线程池。 ...

    Pythonpython threadpool python多线程 Python语言基础

    【Python】python threadpool python多线程 Python语言基础 文件清单 └── threadpool-1.2.7 ├── CHANGELOG.txt ├── doc │ ├── api │ │ ├── class-tree.html │ │ ├── epydoc.css │ │...

    C# ThreadPool 多线程 代码示例

    在.NET框架中,`ThreadPool`是一个非常重要的概念,它是一个预先初始化的线程集合,用于高效地执行异步任务。`ThreadPool`管理线程的创建和销毁,优化系统资源的使用,尤其适合处理大量短生命周期的任务。本示例将...

    ThreadPool 线程池管理单元

    ThreadPool 线程池管理单元 带调用例子

    threadpool_src.zip

    标题 "threadpool_src.zip" 暗示了这是一个关于线程池实现的源代码压缩包。线程池是一种多线程编程中的管理机制,它允许高效地管理和调度多个并发任务,通过预先创建并维护一组可重用的工作线程来提高系统资源利用率...

    ThreadPool-master.zip

    `ThreadPool-master.zip`中的项目提供了一个线程池的实现示例,旨在帮助程序员理解线程池的调度和管理机制。 线程池的基本思想是预先创建一定数量的线程,这些线程等待待处理的任务。当有新任务到来时,线程池会将...

    C# Thread、ThreadPool、Task测试

    在C#编程中,线程(Thread)、线程池(ThreadPool)和任务(Task)是并行处理和异步操作的重要组成部分。理解它们的工作原理和使用方法对于优化应用程序的性能至关重要。下面将详细阐述这三个概念及其相关知识点。 ...

    线程池threadpool_src

    通过分析和理解“线程池threadpool_src”的源代码,开发者可以学习如何自定义线程池,如何优化任务调度策略,以及如何在多线程环境下保证程序的稳定性和效率。同时,了解线程池的工作原理对于提升软件的并发处理能力...

    boost threadpool(修复内存泄露后的版本)

    Boost Threadpool库是一个高效、灵活且可定制的线程池实现,它被广泛用于多线程编程中,特别是在C++环境中。线程池允许开发者管理一组预创建的线程,而不是为每个任务创建新的线程,这能显著提高程序性能并减少系统...

    Tomat研究之ThreadPool

    主要类包括`ThreadWithAttributes`、`ControlRunnable`、`ThreadPool`、`MonitorRunnable`和`ThreadPoolListener`等,这些类构成了线程池的主要框架。 1. **`ThreadWithAttributes`**:这个类负责设置和获取线程...

    DELPHI的ThreadPool的线程池DEMO

    DELPHI的线程池(ThreadPool)是一种高效管理并发任务的技术,它允许程序在需要时创建线程,而不是每次需要执行任务时都手动创建。线程池通过预先创建一组线程,然后根据需要分配任务,减少了线程创建和销毁的开销,...

    QT_ThreadPool.rar

    QT_ThreadPool是一个基于QT5框架实现的线程池项目,旨在提供一种高效、灵活的多线程处理方式。线程池是一种线程管理机制,它预先创建一组线程,待有任务需要执行时,从线程池中分配线程来执行任务,而不是每次任务...

    threadPool的实现代码

    threadPool的实现代码

Global site tag (gtag.js) - Google Analytics