`
truemylife
  • 浏览: 230154 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

一个任务队列的BlockingQueue实现

阅读更多

一、Concurrent简单介绍

Concurrentjdk1.5推出来的对多线程实现的进一步封装,它大大的简化了多线程开发。concurrent包分成了三个部分,分别是java.util.concurrentjava.util.concurrent.atomicjava.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。

       Executor:具体Runnable任务的执行者。

ExecutorService:一个线程池管理者,其实现类有多种,比如普通线程池,定时调度线程池ScheduledExecutorService等,我们能把一个

Runnable,Callable提交到池中让其调度。

Future:是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。

BlockingQueue:阻塞队列。

 

 

 

二、一个任务队列的BlockingQueue实现

 

public int FetchInvertal = 1000;
public Datum newdatum = null;
// Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
// 理论上Integer.MAX_VALUE.的任务排队
final BlockingQueue<Datum> queue = new LinkedBlockingQueue<Datum>();
// 线程池
// 其中一个线程,取出待处理资料,put到queue
// 其余四个线程处理具体业务
final ExecutorService fetchdataservice = Executors
				.newSingleThreadExecutor();
final ExecutorService convertservice = Executors.newFixedThreadPool(4);
//计数
final AtomicInteger wc = new AtomicInteger();
final AtomicReference<String> atomstarttime=new AtomicReference<String>(null);
// FetchList线程
Runnable fetchlist = new Runnable() {
public void run() {
while (true) {
	try {
		List<Datum> tmpls = getLatestDatums("1", tomstarttime.get());
		if (tmpls != null && tmpls.size() > 0) {
		queue.addAll(tmpls);
		atomstarttime.set(tmpls.get(tmpls.size() - 1).getAddtime());
		tmpls=null;
		}
// 每隔一秒钟就检测一下是否有新的待处理的数据
		Thread.sleep(FetchInvertal);
		} catch (InterruptedException ex) {
	}
}
}
};
fetchdataservice.submit(fetchlist);
fetchdataservice.shutdown();

// 四个处理线程
for (int index = 0; index < 4; index++) {
		Runnable exewrite = new Runnable() {
		public void run() {
		int port = 8100 + wc.getAndIncrement();
		while (true) {
		try {
//如果队列里没有数据,会自动退出当前进程,为了防止进程被停掉,先判断是否有任务队列 
//队列里去看有没有任务,一直循环。
			if (queue.size() > 0) {
					Datum datum = queue.take();
					if (datum != null) {
					if (dobiz==true) {//此条件是伪码
										// 若成功,自动审核通过
						updateDatumState(datum.getUuid(), "3");
					} else {
						// 若不成功,自动审核不通过
						updateDatumState(datum.getUuid(), "2");
					}
				}
			} else {
					Thread.sleep(2000);//如果队列里没有任务了,睡眠两秒
			}
		} catch (InterruptedException e) {
	}
}
}
};
convertservice.submit(exewrite);
}
convertservice.shutdown();
}

private List<Datum> getLatestDatums(String status, String starttime) {
		//返回最新产生的待处理的任务列表
	}

private String updateDatumState(String uuid, String status) {
		//更新当前任务状态,打上已处理完成,或处理异常的标志
	}

 

 

 

 

 

分享到:
评论

相关推荐

    BlockingQueue队列自定义超时时间取消线程池任务

    首先,`BlockingQueue`是一个并发容器,它遵循先进先出(FIFO)原则,具有阻塞性质,当队列满时,生产者线程会被阻塞,直到有消费者取走元素;当队列空时,消费者线程会被阻塞,直到生产者放入新的元素。常用实现如`...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    1. **线程池**:线程池中的任务队列通常是一个阻塞队列,当任务数超过线程池的容量时,新提交的任务将被放入任务队列中等待执行。 2. **生产者-消费者模型**:阻塞队列可以有效地解决生产者和消费者之间的并发问题,...

    java中线程队列BlockingQueue的用法

    - `take()`: 从队列中取出并返回第一个元素,如果队列为空,则会阻塞当前线程,直到队列有元素。 - `offer(E e)`: 尝试将元素放入队列,如果队列已满,可能会立即返回失败。 - `poll()`: 从队列中取出并返回第一...

    BlockingQueue(阻塞队列)详解

    - **take()**:从队列中取出一个对象,如果队列为空,则当前线程被阻塞,直到队列中有对象可取。 - **drainTo(Collection c)**:将队列中的所有元素移除并添加到指定集合中,可选参数为移除的最大数量。 **3. ...

    JVM优先级线程池做任务队列的实现方法

    在JVM优先级线程池中,任务队列通常使用BlockingQueue接口来实现。 知识点4:线程池的执行流程 线程池的执行流程主要包括以下步骤: 1. 收到请求后,参数校验后传入线程池排队。 2. 返回结果:“请求成功,正在...

    高效的实现队列

    链表提供更灵活的动态扩展性,每个节点包含元素值和指向下一个节点的指针。链表的头部是队头,尾部是队尾。出队只需改变头部指针,入队则在尾部添加新节点。这种方法不会受限于固定大小,但插入和删除操作需要遍历...

    线程----BlockingQueue

    - **poll(time)**: 从`BlockingQueue`中移除并返回队列的第一个元素。如果在指定时间内无法移除元素,则返回`null`。 - **take()**: 从`BlockingQueue`中移除并返回队列的第一个元素。如果队列为空,则调用该方法的...

    spring-blockingqueue:用Spring Boot阻止队列

    BlockingQueue是Java并发包`java.util.concurrent`中的一个接口,它提供了在队列满时阻塞插入操作和队列空时阻塞删除操作的能力。这种设计模式被称为生产者-消费者模型,它有效地解决了线程间的同步问题,避免了不必...

    BlockingQueue的使用

    BlockingQueue是Java并发编程中非常重要的一个数据结构,它是一个具有阻塞特性的队列,主要用于线程间的协作。在多线程环境下,BlockingQueue能够有效地实现生产者-消费者模式,提高了程序的并发性能和效率。本文将...

    java利用delayedQueue实现本地的延迟队列

    为了使用 DelayQueue,我们需要首先声明一个 Delayed 的对象,例如,我们可以声明一个 Task 对象, Task 对象实现了 Delayed 接口,用于表示一个具有延迟执行的任务。 ``` public class Task&lt;T extends Runnable&gt; ...

    一口气说出Java 6种延时队列的实现方法(面试官也得服)

    DelayQueue 是一个 BlockingQueue,无界阻塞队列,内部使用的是 PriorityQueue,PriorityQueue 使用完全二叉堆来实现队列元素排序。在向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件...

    java 线程池实现多并发队列后进先出

    线程池通过任务队列(工作队列)来管理待执行的任务。在"java 线程池实现多并发队列后进先出"这个主题中,我们关注的是线程池如何利用特定类型的队列来实现后进先出(LIFO,Last-In-First-Out)的行为。通常,线程池...

    阻塞队列阻塞队列阻塞队列

    ArrayBlockingQueue是一个基于固定数组实现的阻塞队列。它的容量在创建时就需要指定,并且不可变。队列内部采用双指针机制,一个指向头部,一个指向尾部,进行元素的入队和出队操作。ArrayBlockingQueue支持公平和非...

    支持多线程和泛型的阻塞队列

    在C++环境中,`BlockingQueue.h`可能是一个自定义实现的阻塞队列头文件。它可能会包含如下内容: 1. **模板类定义**:定义一个模板类`BlockingQueue&lt;T&gt;`,其中`T`代表队列中元素的类型。 2. **数据成员**:使用`std...

    线程池&&队列各类区别使用场景

    2. 任务队列(Task Queue):存储待执行任务的地方,线程从这里获取任务并执行。 3. 控制机制:如线程数量限制,超时策略等,用于控制线程的生命周期和任务调度。 线程池的类型有很多种,常见的有Java的...

    工作队列示例

    2. **工作队列(BlockingQueue)** - 一个阻塞队列,用于存储待处理的任务。当队列满时,生产者会阻塞直到队列有空位;当队列空时,消费者会阻塞直到有新任务可用。 3. **拒绝策略** - 当线程池和工作队列都满载时,...

    android自定义消息队列

    标题“android自定义消息队列”指的是开发者为Android应用创建一个自定义的消息传递机制,这通常涉及到线程间通信和任务调度。描述中的“实现的简单的开始,暂停功能”意味着这个自定义消息队列能够控制任务的执行...

    java 多线程 队列工厂

    例如,可以创建一个`QueueFactory`类,包含一个`createQueue()`方法,该方法根据传入的参数(如队列类型)返回相应类型的队列实例。这样,当需要改变队列实现时,只需修改创建队列的方式,而无需更改使用队列的代码...

    java队列

    另外,`ArrayDeque`类虽然名字中没有“队列”,但它也是一个高效的双端队列实现,可以作为队列使用。 在《Java队列》这篇博文中,可能详细探讨了以下知识点: 1. **Queue接口**:介绍`Queue`接口的基本方法,如`...

Global site tag (gtag.js) - Google Analytics