`
想起要学好java
  • 浏览: 8764 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

java 队列

    博客分类:
  • java
阅读更多

本文参考文章链接:

https://www.cnblogs.com/lemon-flm/p/7877898.html

 

Queue:一个队列就是一个先入先出(FIFO)的数据结构

Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。

 

队列分为阻塞队列和非阻塞队列

非阻塞队列有PriorityQueue和ConcurrentLinkedQueue

PriorityQueue:维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。从队列里取元素时,会按排的序取出元素,而不是先进先出的默认结构。

ConcurrentLinkedQueue:是基于链接节点的、线程安全的队列。并发访问不需要同步。ConcurrentLinkedQueue 收集关于队列大小的信息会很慢,需要遍历队列。需要高并发访问并且不要求计算队列大小的,可以使用ConcurrentLinkedQueue这个队列。

 

阻塞队列实现了BlockingQueue这个接口,下面有五个实现类,分别为:

ArrayBlockingQueue :一个由数组支持的有界队列。

LinkedBlockingQueue :一个由链接节点支持的可选有界队列。

PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。

DelayQueue :一个由优先级堆支持的、基于时间的调度队列。

SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。

 

关于ArrayBlockingQueue和LinkedBlockingQueue的一点区别

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,而LinkedBlockingQueue之所以能够高效的处理并发数据,

还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据。

ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。

这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别,所以你一般自己测试的时候,可能ArrayBlockingQueue的并发性能可能比LinkedBlockingQueue还高点。

 

队列里的基本方法:

add        增加一个元索                     如果队列已满,则抛出一个IIIegaISlabEepeplian异常

remove   移除并返回队列头部的元素    如果队列为空,则抛出一个NoSuchElementException异常

element  返回队列头部的元素             如果队列为空,则抛出一个NoSuchElementException异常

offer       添加一个元素并返回true       如果队列已满,则返回false

poll         移除并返问队列头部的元素    如果队列为空,则返回null

peek       返回队列头部的元素             如果队列为空,则返回null

put         添加一个元素                      如果队列满,则阻塞

take        移除并返回队列头部的元素     如果队列为空,则阻塞

 

队列的类结构:



 

 

测试代码演示:

public class QueueFactory<T> {

private Queue<T> queue = null;

 

/**

* ConcurrentLinkedQueue --- 非阻塞队列,

* ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。

* 从队列的尾部添加元素,从头部删除元素。

* 可以多个生产者线程和多个消费者线程同时操作队列,因为队列是线程安全的。

* ConcurrentLinkedQueue 收集队列大小的信息会很慢,需要遍历队列。

* 对不需要获取队列大小并且多线程操作的队列可以使用ConcurrentLinkedQueue

* @return

*/

public ConcurrentLinkedQueue<T> createConcurrentLinkedQueueInstance() {

queue = new ConcurrentLinkedQueue<>();

return (ConcurrentLinkedQueue<T>) queue;

}

 

/**

* 优先队列PriorityQueue --- 非阻塞队列

* 维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位

* @return

*/

public PriorityQueue<T> createPriorityQueueInstance() {

queue = new PriorityQueue<>(new Comparator<T>() {

 

@Override

public int compare(T o1, T o2) {

String s1 = o1.toString();

String s2 = o2.toString();

return s1.compareTo(s2);

}

});

 

return (PriorityQueue<T>)queue;

}

 

/**

* 阻塞队列ArrayBlockingQueue,底层由数组支持

* ArrayBlockingQueue是一个有界队列,队列为空时取元素会阻塞,队列为满时放元素会阻塞。

* @param queueSize 队列大小

* @return

*/

public ArrayBlockingQueue<T> createArrayBlockingQueueInstance(int queueSize) {

queue = new ArrayBlockingQueue<>(queueSize);

return (ArrayBlockingQueue<T>)queue;

}

 

/**

* 阻塞队列LinkedBlockingQueue,底层由链表支持

* LinkedBlockingQueue每次往队列里新增和删除元素时会生产和销毁一个Node对象,在长时间高并发处理大量数据时会对GC有一定的影响

* @return

*/

public LinkedBlockingQueue<T> createLinkedBlockingQueueInstance() {

queue = new LinkedBlockingQueue<>();

return (LinkedBlockingQueue<T>)queue;

}

 

/**

* @return

*/

public <M extends Delayed> DelayQueue<M> createDepayQueueInstance() {

Queue<M> delayQueue = new DelayQueue<>();

return (DelayQueue<M>)delayQueue;

}

}

 

/**

 * 生产者线程

 * 负责把任务加入到队列中

 *

 * @param <T>

 */

public class ProduceThread<T> implements Runnable {

private Queue<T> queue;

private boolean isBlockingQueue;

private T t;

private BlockingQueue<T> blockingQueue;

 

public ProduceThread(Queue<T> queue, boolean isBlockQueue, T t) {

super();

this.queue = queue;

this.isBlockingQueue = isBlockQueue;

this.t = t;

if(isBlockQueue) {

blockingQueue = (BlockingQueue<T>)queue;

}

}

 

@Override

public void run() {

while(true) {

System.out.println("threadname = " + Thread.currentThread().getName() + " 生产任务开始");

try {

if(isBlockingQueue) {

blockingQueue.put(t);

} else {

queue.offer(t);

}

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("threadname = " + Thread.currentThread().getName() + " 生产 " + t.toString() + " 结束");

 

if(!(queue instanceof ConcurrentLinkedQueue)) {

System.out.println("threadname = " + Thread.currentThread().getName() + " 队列长度为 " + queue.size());

}

 

try {

Thread.sleep(200);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

 

}

 

/**

 * 消费者线程

 * 负责从队列里取出任务消费

 *

 * @param <T>

 */

public class ConsumeThread<T> implements Runnable {

private Queue<T> queue;

private boolean isBlockingQueue;

private BlockingQueue<T> blockingQueue;

 

public ConsumeThread(Queue<T> queue, boolean isBlockingQueue) {

super();

this.queue = queue;

this.isBlockingQueue = isBlockingQueue;

if(isBlockingQueue) {

blockingQueue = (BlockingQueue<T>)queue;

}

}

 

@Override

public void run() {

while(true) {

try {

System.out.println("threadname = " + Thread.currentThread().getName() + " 消费任务开始");

T t = null;

if(isBlockingQueue) {

t = blockingQueue.take();

if(t instanceof DelayTask) {

DelayTask d = (DelayTask)t;

System.out.println("threadname = " + Thread.currentThread().getName()

+ DateUtil.dateTimeToString(new Date(), DateUtil.pattern2) + " content = "

+ d.getContent());

}

} else {

t = queue.poll();

}

System.out.println("threadname = " + Thread.currentThread().getName() + " 消费 t = " + t + " 结束");

} catch (InterruptedException e) {

e.printStackTrace();

}

 

if(!(queue instanceof ConcurrentLinkedQueue)) {

System.out.println("threadname = " + Thread.currentThread().getName() + " 队列长度为 " + queue.size());

}

 

try {

Thread.sleep(700);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

 

}

 

public class DelayTask implements Delayed {

private long delayTime;

private String content;

private long removeTime;

 

public DelayTask(long delayTime, String content) {

super();

this.delayTime = delayTime;

this.content = content;

this.removeTime = TimeUnit.MILLISECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();

}

 

public long getDelayTime() {

return delayTime;

}

 

public void setDelayTime(long delayTime) {

this.delayTime = delayTime;

}

 

public String getContent() {

return content;

}

 

public void setContent(String content) {

this.content = content;

}

 

public long getRemoveTime() {

return removeTime;

}

 

public void setRemoveTime(long removeTime) {

this.removeTime = removeTime;

}

 

 

@Override

public int compareTo(Delayed o) {

long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);

return diff > 0 ? 1 : (diff == 0 ? 0 : -1);

}

 

@Override

public long getDelay(TimeUnit unit) {

return unit.convert(removeTime - System.currentTimeMillis(), unit);

}

 

}

 

public class QueueDemoTest {

private static QueueFactory<String> factory = new QueueFactory<>();

private static ExecutorService pool = ThreadPoolFactory.createThreadPoolInstance();

 

/**

* 测试ConcurrentLinkedQueue

* 多线程操作下安全的队列

*/

@Test

public void testConcurrentLinkedQueue() {

ConcurrentLinkedQueue<String> queue = factory.createConcurrentLinkedQueueInstance();

 

ProduceThread<String> produce = new ProduceThread<String>(queue, false, "hello concurrentLinkedQueue");

ConsumeThread<String> consume = new ConsumeThread<>(queue, false);

 

pool.execute(produce);

pool.execute(produce);

pool.execute(consume);

pool.execute(consume);

pool.execute(consume);

pool.execute(consume);

pool.execute(consume);

 

try {

Thread.sleep(30000);

} catch (InterruptedException e) {

e.printStackTrace();

}

 

pool.shutdown();

}

 

/**

* 测试优先队列PriorityQueue,

* 添加到队列里的元素会按照天然顺序或构造函数的排序规则进行排序

*/

@Test

public void testPriorityQueue() {

PriorityQueue<String> queue = factory.createPriorityQueueInstance();

 

queue.offer("baobao");

queue.offer("kezhi");

queue.offer("haoqilai");

queue.offer("shuihaojiao");

queue.offer("yuping");

 

ConsumeThread<String> consume = new ConsumeThread<>(queue, false);

pool.execute(consume);

 

try {

Thread.sleep(4000);

} catch (InterruptedException e) {

e.printStackTrace();

}

 

pool.shutdown();

 

}

 

/**

* 测试阻塞队列ArrayBlockingQueue,因为生产任务比消费任务快些,

* 所以队列满后,生产的线程是会阻塞的,等待消费的线程去消费掉队列里的消息

*/

@Test

public void testArrayBlockingQueue() {

ArrayBlockingQueue<String> queue = factory.createArrayBlockingQueueInstance(5);

 

ProduceThread<String> produce = new ProduceThread<String>(queue, true, "Hello ArrayBlockingQueue");

ConsumeThread<String> consume = new ConsumeThread<>(queue, true);

 

pool.execute(produce);

pool.execute(produce);

pool.execute(consume);

pool.execute(consume);

 

try {

Thread.sleep(20000);

} catch (InterruptedException e) {

e.printStackTrace();

}

 

pool.shutdown();

}

 

/**

* 测试阻塞队列LinkedBlockingQueue

*/

@Test

public void testLinkedBlockingQueue() {

LinkedBlockingQueue<String> queue = factory.createLinkedBlockingQueueInstance();

 

ProduceThread<String> produce = new ProduceThread<>(queue, true, "Hello LinkedBlockingQueue");

ConsumeThread<String> consume = new ConsumeThread<>(queue, true);

 

for(int i = 0; i < 2; i++) {

pool.execute(produce);

}

for(int i = 0; i < 8; i++) {

pool.execute(consume);

}

 

try {

Thread.sleep(30000);

} catch (InterruptedException e) {

e.printStackTrace();

}

 

pool.shutdown();

}

 

@Test

public void testDelayQueue() {

System.out.println(DateUtil.dateTimeToString(new Date(), DateUtil.pattern2));

 

DelayQueue<DelayTask> queue = factory.createDepayQueueInstance();

 

DelayTask d1 = new DelayTask(5000, "5秒到了任务执行");

DelayTask d2 = new DelayTask(10000, "10秒到了任务执行");

DelayTask d3 = new DelayTask(15000, "15秒到了任务执行");

DelayTask d4 = new DelayTask(20000, "20秒到了任务执行");

queue.put(d1);

queue.put(d2);

queue.put(d3);

queue.put(d4);

 

ConsumeThread<DelayTask> consume = new ConsumeThread<>(queue, true);

 

pool.execute(consume);

 

try {

Thread.sleep(25000);

} catch (InterruptedException e) {

e.printStackTrace();

}

 

pool.shutdown();

}

 

}

 

/**

 * 自定义线程池工厂

 *

 */

public class ThreadPoolFactory {

private static ExecutorService pool = null;

 

/**

* 创建自定义线程池

* @return

*/

public static ExecutorService createThreadPoolInstance() {

if (pool == null) {

pool = new ThreadPoolExecutor(20, 50, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(200),

new ThreadRejectedHandler());

}

return pool;

}

 

}

 

/**

 * 这种拒绝策略是不抛出异常也不丢弃新任务,只是加任务时处于阻塞状态

 *

 */

class ThreadRejectedHandler implements RejectedExecutionHandler {

 

@Override

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

if(!executor.isShutdown()) {

try {

executor.getQueue().put(r);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

 

}

 

  • 大小: 9.7 KB
分享到:
评论

相关推荐

    java队列

    在《Java队列》这篇博文中,可能详细探讨了以下知识点: 1. **Queue接口**:介绍`Queue`接口的基本方法,如`enqueue()`(通常通过`add()`或`offer()`实现)和`dequeue()`(通常通过`remove()`或`poll()`实现)。还...

    java队列模拟实现

    Java队列模拟实现是一个典型的计算机科学中的数据结构应用,它主要涉及了Java编程语言和队列数据结构。在这个工程中,开发者已经创建了一个基于图形用户界面(GUI)的应用程序,用于演示和操作队列的各种功能。以下...

    Java队列实现,数据结构

    在这个Java队列实现的数据结构作业练习中,我们将会探讨如何使用Java来创建一个简单的队列,并分析`Queue.java`和`Node.java`这两个文件可能包含的内容。 首先,`Queue.java`很可能是实现队列接口或类的文件。在...

    java定时器\多线程(池)\java队列Demo

    Java编程语言在处理并发任务和时间调度方面提供了丰富的工具,其中`java定时器`、`多线程(池)`和`java队列`是核心概念。让我们深入探讨这些主题。 ### 1. Java定时器(java.util.Timer) `java.util.Timer` 类允许...

    java队列源码

    在这个"java队列源码"中,我们可以看到如何利用Java来实现多线程环境下的安全队列,特别适用于抢购等高并发场景。以下将详细讨论队列、多线程以及源码实现的关键知识点。 1. **Java 队列接口与实现** - Java 提供...

    java队列实现(顺序队列、链式队列、循环队列)

    Java中的队列是一种数据结构,它遵循先进先出(FIFO)原则,即最先插入的元素将是最先被删除的。在Java中,队列的实现主要有三种:顺序队列、链式队列和循环队列。下面我们将详细探讨这三种队列的实现方式。 1. **...

    Using_Java_Queue.zip_java队列

    Java队列是Java集合框架中的一个关键组成部分,主要用于在多个线程之间同步数据传输或实现异步处理。队列遵循先进先出(FIFO)的原则,即最早添加到队列中的元素将首先被处理。本教程将深入探讨如何在Java中使用队列...

    java 队列使用

    java 队列使用,次例子是一个模拟网络爬虫工作大致流程的小例子,里面没有具体的爬取的实现,只是对爬取的流程的模拟,使用到了java 的 ArrayBlockingQueue、ConcurrentHashMap、 这2个类和java 的 volatile 关键字...

    JAVA 队列 实现卡车运输过程模拟

    下面我们将详细讨论如何使用Java队列来实现这个模拟。 首先,我们可以创建一个`Truck`类,表示每辆卡车,包含卡车编号、装载状态、当前重量等属性。当卡车到达装煤设备时,它会被添加到一个装载队列中。Java中可以...

    java定时器+多线程(池)+java队列Demo

    在“java定时器+多线程(池)+java队列Demo”这个项目中,我们将深入探讨这三个核心概念。 1. **Java定时器(java.util.Timer)**:`Timer`类用于调度周期性的任务执行。它可以安排一个任务在未来某个时间点或定期...

    DuiLie.rar_java 队列

    在IT行业中,队列是一种...以上就是使用LinkedList实现Java队列的全部操作及其背后的原理。理解这些知识点对于进行高效编程和解决复杂问题至关重要。在实际项目中,队列常用于任务调度、消息传递和并发控制等多个场景。

    java队列实现方法(顺序队列,链式队列,循环队列)

    Java 队列实现方法 Java 队列是一种常用的数据结构,用于实现先进先出(FIFO)或后进先出(LIFO)的操作。Java 中提供了多种实现队列的方法,包括顺序队列、链式队列和循环队列等。下面我们将详细介绍每种队列的...

    java队列实现

    在Java编程中,队列是一种重要的数据结构,用于存储和管理元素,特别是在处理并发和多线程场景下。本篇文章将深入探讨如何使用Java来实现队列,特别是应用于限制线程执行数量的任务调度。 首先,我们需要理解队列的...

    java 队列 链表 栈

    本篇文章将详细讲解Java中的队列、链表和栈,这些概念是许多初学者和专业人士都需要掌握的基础知识。 首先,我们来谈谈队列。队列是一种先进先出(First In First Out,简称FIFO)的数据结构,类似于现实生活中的...

    Java 队列实现原理及简单实现代码

    以下是一个简单的Java队列实现,使用数组作为底层数据结构: ```java package 栈和队列; public class Queue { private int maxSize; private long[] queArray; private int front; private int rear; ...

    Java 的 HTTP 文件队列下载.rar_java http_java 下载_java 队列

    在Java编程中,HTTP文件队列下载是一种常见的网络操作,特别是在大数据量或者多文件下载的场景下。HTTP协议是互联网上应用最为广泛的一种网络传输协议,而Java作为一门跨平台的编程语言,提供了多种方式来处理HTTP...

    Java Design Demo -简单的队列

    在IT领域,尤其是在Java编程中,队列是一种基础且重要的数据结构。队列遵循“先进先出”(First In First Out, FIFO)原则,它的主要功能是存储和管理元素序列,使得新添加的元素(入队)总是在序列末尾,而移除元素...

    java-Using-Array-for-Queue.zip_java队列实现

    在Java编程中,队列是一种线性数据结构,它遵循先进先出(FIFO)的原则。在这个场景中,我们探讨了两种使用Java实现队列的方法:一种是通过自定义的数组实现,另一种是利用内置的Vector类。接下来,我们将详细讨论这...

    java队列Java系列2021.pdf

    Java中的队列是一种数据结构,遵循“先入先出”(FIFO)的原则,主要用于实现生产者消费者模式。队列在并发编程中的应用尤为广泛,因为在多线程环境下,多个生产者或消费者可能同时对队列进行操作,因此线程安全的...

    java多线程加队列上传文件_后台处理

    ### Java多线程加队列上传文件_后台处理 #### 概述 本文将详细介绍一个基于Java实现的多线程文件上传系统,并结合队列管理技术来优化后台处理流程。该系统通过创建多个线程来并行处理客户端的文件上传请求,同时...

Global site tag (gtag.js) - Google Analytics