`
vortexchoo
  • 浏览: 66029 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

线程池+队列

    博客分类:
  • java
阅读更多
笔记:自己实现的线程池+队列。

package org.vic.thread.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public abstract class ThreadPool<T> implements Runnable {

private Integer defaultPoolSize = 10;
private Integer defaultQueueSize = 10;

private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

private Long take_waittingMillis = 0L;
private Long put_warttingMillis = 0L;

private LinkedList<T> queue = new LinkedList<T>();

private boolean start = false;

public void take_waittingMills(Long val) {
this.take_waittingMillis = val;
}
public void put_waittingMills(Long val) {
this.put_warttingMillis = val;
}

/**
* @author Vic.Chu
* @param poolSize
* @param queueSize
* @param takeWaittingMillis
* @param putWaittingMillis
* Constructor
*/
public ThreadPool (Integer poolSize, Integer queueSize, Long takeWaittingMillis, Long putWaittingMillis) {
if (poolSize != null && poolSize > 0) {
defaultPoolSize = poolSize;
}
if (queueSize != null && queueSize > 0) {
defaultQueueSize = queueSize;
}
if (takeWaittingMillis != null && takeWaittingMillis > 0) {
take_waittingMills(takeWaittingMillis);
}
if (putWaittingMillis != null && putWaittingMillis > 0) {
put_waittingMills(putWaittingMillis);
}
}

/**
* @author Vic.Chu
* Constructor
*/
public ThreadPool () {}

/**
* threads will invoke this method after starting.
*/
@Override
public void run() {
try {
while (true) {
T t = take();
if (t == null) continue;
doProcess(t);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* @author Vic.Chu
* @param t
* Programmers have to implement this method for processing their data.
* @return
*/
public abstract T doProcess(T t);

/**
* @author Vic.Chu
* When new this ThreadPool, you should start it!
*/
public void start() {
if (!start) {
for (int i = 0; i < defaultPoolSize; i++) {
Thread thread = new Thread(this);
thread.start();
}
this.start = true;
}
}

/**
* @author Vic.Chu
* @param collection. the data collection which should be processed.
* @throws Exception. If the collection is null or is empty, this exception will be thrown.
*/
public void putCollections (Collection<? extends T> collection) throws Exception {
if (collection == null || collection.size() == 0) {
throw new Exception("Collection is null or is empty!");
}
for (T t : collection) {
try {
this.put(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* @author Vic.Chu
* @param t
* @throws InterruptedException
* Put a data element into the queue, await the thread to process.
*/
private void put(T t) throws InterruptedException {
lock.lock();
try {
if (queue.size() >= defaultQueueSize) {
notFull.await();
} else {
notEmpty.signal();
Thread.sleep(this.put_warttingMillis);
queue.add(t);
}
} finally {
lock.unlock();
}
}

/**
* @author Vic.Chu
* @return
* @throws InterruptedException
* Take a data element from the queue and destroy this data element in the queue.
*/
private T take() throws InterruptedException {
lock.lock();
try {
if (queue.size() == 0) {
notEmpty.await();
return null;
} else {
notFull.signal();
Thread.sleep(this.take_waittingMillis);
T t = queue.getFirst();
queue.removeFirst();
return t;
}
} finally {
lock.unlock();
}
}

/**
* @author Vic.Chu
* @return
* Get data elements count in the queue.
*/
public Integer currentQueueSize () {
return queue.size();
}

/**
* @author Vic.Chu
* @return
* Get this thread pool's size.
*/
public Integer poolSize () {
return this.defaultPoolSize;
}

/**
* @author Vic.Chu
* @return
* Get this queue's size
*/
public Integer queueSize () {
return this.defaultQueueSize;
}

/**
* test
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ThreadPool<String> tp = new ThreadPool<String>(5, 5, 0L, 0L) {
@Override
public String doProcess(String t) {
System.out.println("current process is "
+ Thread.currentThread().getName());
System.out.println(t);
return t;
}
};
tp.start();
tp.putCollections(Arrays.asList(new String[]{"s","t","r","i","n","g"}));
}
}
分享到:
评论

相关推荐

    线程池&&队列各类区别使用场景

    线程池和队列在IT领域中是两个非常重要的概念,尤其在多线程编程和并发处理中扮演着核心角色。它们各自有独特的特性和适用场景,理解它们的区别和使用场景对于优化系统性能至关重要。 首先,让我们从线程池开始。...

    Java实现的线程池、消息队列功能

    标题中的“Java实现的线程池、消息队列功能”是指在Java编程中,如何利用编程技术实现线程池和消息队列这两种重要的并发处理机制。线程池和消息队列是解决多线程环境下资源管理和任务调度的有效手段,它们在高并发、...

    ThreadPool:线程池+任务队列

    4. **任务提交接口**:用户通过这个接口向线程池提交任务,将任务添加到任务队列中。 5. **同步机制**:如互斥量`std::mutex`、条件变量`std::condition_variable`,用于保证线程安全和协调线程间通信。 线程池的...

    Qt线程池+多线程使用

    如果没有,任务会被放入队列等待,直到有线程可用。这种机制减少了线程的创建和销毁时间,提高了系统的整体性能。 要使用Qt线程池,我们需要创建一个继承自`QRunnable`的类,这个类将定义我们的任务逻辑。例如,...

    鱼刺线程池+拨号,失败重试框架源码

    1. **线程池设计**:鱼刺线程池的实现可能包括工作队列的设计,如何实现线程的创建、复用和销毁,以及如何根据任务性质进行调度。开发者应关注线程池的初始化参数,如核心线程数、最大线程数、线程存活时间等,以及...

    C++ 线程池+任务池

    任务池则是一种任务调度机制,它将待处理的任务放入一个队列,由线程池中的线程按顺序或并行地从队列中取出任务进行执行。这种方式使得任务的提交和执行解耦,简化了并发编程的复杂性。 在C++中,虽然标准库并没有...

    并发-线程池和阻塞队列.pdf

    线程池和阻塞队列是实现并发的关键组件之一。本文将详细介绍线程池原理、使用场景及注意事项,以及阻塞队列的相关知识。 首先,线程池是一种基于池化思想管理线程的技术,它可以重用一组线程执行多个任务。线程池的...

    vc线程池+iocp

    线程池的任务调度通常是通过工作队列来实现,当有新任务时,线程池会从空闲线程中选择一个来执行,如果没有空闲线程,则等待,直到有线程完成任务并返回到线程池中。 IOCP是Windows操作系统提供的一种高级I/O模型,...

    Linux 线程池+连接池

    在Linux环境下,可以通过pthread库来实现线程池,通过设置线程的数量、任务队列和线程调度策略来优化性能。 其次,连接池是针对网络通信的一种优化策略,主要应用于数据库连接、HTTP连接等。它预先建立一组连接,...

    c++线程池+任务池

    网上有现成的cthreadpool的代码,是c版的,且没有任务队列功能,不能做到任务的安全退出,根据实际需要写了一个c++版的,本程序实现了c++的封装,并实现了任务池和线程安全退出。弥补了c版的不足 在linux调试通过并...

    简单实用,线程池+socket收发数据+解析字节格式报文

    线程池,另一方面,是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池可以有效控制运行的线程数量,当线程空闲时,会自动回收,避免了频繁创建和销毁线程的开销,提高了...

    Android中的线程池与任务队列

    线程池和任务队列是实现这一目标的关键工具。本文将深入探讨Android中线程池与任务队列的概念、工作原理以及它们如何协同工作。 线程池(ThreadPool)是一种多线程处理形式,处理过程中将任务添加到队列,然后在...

    线程池 工作队列 C++ work Queue 示例

    工作队列是线程池的核心组成部分,它负责存储待执行的任务,然后由线程池中的工作线程来处理这些任务。在Windows操作系统中,系统提供了内置的线程池API,使得开发者可以方便地利用线程池进行任务调度。 线程池的...

    线程池.zip,互斥锁+条件变量+队列,实现线程池,包括线程池的创建,塞任务,和销毁线程池

    在本压缩包"线程池.zip"中,我们可以看到涉及到的核心概念有:线程池、互斥锁、条件变量以及队列。 线程池: 线程池是程序中预创建的一组可重用线程,这些线程在等待执行任务。当有新的任务到来时,线程池会挑选一...

    线程池+socket

    - **Java中的线程池**:Java的`java.util.concurrent`包提供ExecutorService接口及其实现类如ThreadPoolExecutor,可以定制线程池的大小、任务队列、拒绝策略等。 2. **Socket**: - **定义**:Socket,也称为套...

    Linux下使用EPoll+队列+多线程的C++实现

    在这个“Linux下使用EPoll+队列+多线程的C++实现”项目中,开发者使用了`epoll`来监听并管理TCP服务器的连接。`epoll`的工作流程大致如下: 1. **创建epoll句柄**:首先,通过`epoll_create()`函数创建一个epoll...

    并发-线程池和阻塞队列

    在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...

    线程池 和队列-Brian Goetz

    任务队列是线程池中线程用来获取任务的队列。正确使用队列可以帮助线程池更有效地管理任务的执行顺序和依赖关系。队列的类型和容量选择也是实现高性能线程池的关键因素之一。 在Java等编程语言中,已经提供了内置的...

    workquere工作队列 多线程

    1. **线程池**:C#中的`System.Threading.ThreadPool`类是工作队列的基础,它提供了一组线程,用于执行异步任务。线程池能够根据系统负载动态调整线程数量,从而减少创建和销毁线程的开销。 2. **任务调度**:在C#...

Global site tag (gtag.js) - Google Analytics