代码:
package conSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** * 并发单向队列简单实现 * * @author dingchd * * @param <T> */ public class NoBlockQueue<T> { private Node<T> header; private AtomicReference<Node<T>> tail; private AtomicInteger size; public NoBlockQueue() { header = new Node<T>(); tail = new AtomicReference<Node<T>>(header); size = new AtomicInteger(0); } /** * 存元素的过程分两步骤:原子更新尾节点的next、原子更新尾节点 如果第二部更新失败 则原子还原尾节点的next * * @return */ public void add(T t) { // 创建一个节点 Node<T> node = new Node<T>(); node.value = t; Node<T> curTail = null; for (;;) { curTail = tail.get(); if (curTail.next.get() == null) { if (casNext(curTail, null, node)) { if (casTail(curTail, node)) { size.incrementAndGet(); return; } else { curTail.next.getAndSet(null); } } } } } /** * 取元素分两部:原子更新header的next、第一个元素为尾节点,则将尾节点原子更新到header 如果第二部失败,则原则还原第一步 * * @return */ public T poll() { Node<T> first = null; T value = null; for (;;) { first = header.next.get(); Node<T> curTail = tail.get(); // 队列空 if (curTail == header && first == null) { break; } // 中间状态 if ((first != null && curTail == header) || (first == null && curTail != header)) { continue; } if (first != null) { // 如果tail指向第一个元素,则取队首后将tail更新至header if (curTail == first) { if (casHeaderNext(first, null)) { if (casTail(curTail, header)) { value = first.value; break; } else { header.next.getAndSet(first); } } } else { Node<T> second = first.next.get(); // 如果second为null,则说明当前获得的first已经被其他线程取走 if (second != null) { if (casHeaderNext(first, second)) { value = first.value; break; } } } } } if (value != null) { size.decrementAndGet(); } return value; } public boolean isEmpty() { return tail.get().value == null; } public T top() { Node<T> first = header.next.get(); return first == null ? null : first.value; } public int size() { return size.get(); } private final boolean casHeaderNext(Node<T> before, Node<T> after) { return header.next.compareAndSet(before, after); } private final boolean casTail(Node<T> before, Node<T> after) { return tail.compareAndSet(before, after); } private final boolean casNext(Node<T> node, Node<T> before, Node<T> after) { return node.next.compareAndSet(before, after); } static class Node<T> { T value; AtomicReference<Node<T>> next = new AtomicReference<Node<T>>(); } }
测试代码:
package conSet; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; public class NoBlockQueueTest2 { public static int SIZE = 10000; public static int C_NUM = 10; /** * @param args */ public static void main(String[] args) { for (int i = 0; i < 10000; i++) { test(); } } public static void test() { NoBlockQueue<String> queue = new NoBlockQueue<String>(); Queue<String> input = new ConcurrentLinkedQueue<String>(); Queue<String> output = new ConcurrentLinkedQueue<String>(); for (int i = 0; i < C_NUM; i++) { Runnable mp = new MP(queue, input); new Thread(mp).start(); } List<Thread> list = new ArrayList<Thread>(); for (int i = 0; i < C_NUM; i++) { Runnable mc = new MC(queue, output); Thread t = new Thread(mc); t.start(); list.add(t); } for (Thread t : list) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } ArrayList<String> sort1 = new ArrayList<String>(); ArrayList<String> sort2 = new ArrayList<String>(); while (!input.isEmpty()) { sort1.add(input.poll()); } while (!output.isEmpty()) { sort2.add(output.poll()); } Collections.sort(sort1); Collections.sort(sort2); if (sort1.size() != sort2.size()) { throw new RuntimeException("test error,size not equal"); } for (int i = 0; i < sort1.size(); i++) { String left = sort1.get(i); String right = sort2.get(i); if (!left.equals(right)) { throw new RuntimeException("test error,data wrong"); } } System.out.println("test ok size=" + queue.size()); } static class MP implements Runnable { NoBlockQueue<String> queue; Queue<String> input; public MP(NoBlockQueue<String> queue, Queue<String> input) { super(); this.queue = queue; this.input = input; } public void run() { for (int i = 0; i < NoBlockQueueTest2.SIZE; i++) { String s = UUID.randomUUID().toString(); input.add(s); queue.add(s); } } } static class MC implements Runnable { NoBlockQueue<String> queue; Queue<String> output; public MC(NoBlockQueue<String> queue, Queue<String> output) { super(); this.queue = queue; this.output = output; } public void run() { final int count = NoBlockQueueTest2.C_NUM * NoBlockQueueTest2.SIZE; for (;;) { String s = queue.poll(); if (s != null) { output.add(s); } else { if (output.size() == count) { break; } } } } } }
因为没有实现remove和itr功能,因此复杂度甚微,经过10000次的不断测试,尚未发现测试失败
相关推荐
`Concurrent`包包含了大量设计精良且线程安全的数据结构,如`ConcurrentHashMap`(线程安全的哈希映射),`ConcurrentLinkedQueue`(无界的并发队列)和`CopyOnWriteArrayList`(写时复制的列表)等。这些数据结构在...
队列操作实现是编程中一个常见的任务,尤其在处理并发和多线程环境时,队列能有效地组织和管理数据流。本篇文章将详细讲解如何自己编写队列操作,包括插入、删除以及清楚销毁队列和清空队列的区别。 首先,我们需要...
本文将深入探讨如何解决 Golang 并发性问题,并通过实际的练习来提升对并发编程的理解。 首先,Goroutines 是 Go 语言中的轻量级线程,它们比传统的操作系统线程更高效、更易于管理。创建一个 Goroutine 只需在函数...
此外,这也是一个很好的练习,可以帮助提升Java编程和并发控制的理解。 总的来说,这个项目涵盖了操作系统的核心概念、Java多线程编程以及并发控制的实战应用。通过动手实现,不仅能深化理论知识,还能锻炼编程技能...
6. **并发设计模式**:讨论适用于并发编程的设计模式,如生产者-消费者模式、读写锁模式和双端队列模式,以及如何在Java中实现这些模式。 7. **并发编程挑战**:探讨死锁、活锁、饥饿等并发问题及其解决策略,以及...
在压缩文件“quene.zip”中,可能包含关于队列的各种示例代码、讲解文档或者练习题,用于帮助学习者更好地理解和应用队列数据结构。通过解压并查看文件“quene”,可以深入探究队列的实现和应用场景,进一步提升编程...
以下将详细介绍如何利用Tesseract OCR实现多线程并发识别,以及可能涉及的相关技术点。 首先,理解Tesseract OCR的基本工作原理是至关重要的。Tesseract主要分为两个主要步骤:预处理和识别。预处理包括图像校正、...
│ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...
练习学习 java 包 java.util.concurrent 并直观地展示并发双端队列的工作原理。
│ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...
学习Scala并发编程,除了阅读指定的《Scala编程》或《Scala编程思想》书籍外,还可以参考Akka官方文档,参与开源项目,进行实战练习,以提升对并发编程的理解和实践能力。同时,理解并熟练使用Scala的`monad`,特别...
通过张孝祥的视频教程和对应的练习代码,你可以深入学习如何有效地使用这些并发工具,解决并发编程中的挑战,如死锁、竞态条件、活锁等问题,并提高程序的性能和可扩展性。实践中,你需要关注代码的线程安全性和效率...
这样的练习对于提升并发编程和系统级编程的能力非常有帮助。 总结起来,"fifo.rar"是一个关于使用Visual C++实现FIFO系统调用和线程间FIFO队列交互的实践案例。它涵盖了操作系统级别的命名管道通信以及C++中的多...
5. **实战案例**:书中通过具体的实战项目,如分布式锁实现、高并发下的数据库访问优化等,让读者将理论知识应用到实际场景中。 6. **课程结构**:根据压缩包内的文件名,我们可以推测课程分为多个章节,例如"第一...
simulation_new.rar_ssd5_ssd5 print_ssd5 答案_模拟队列打印"的压缩包文件,显然是针对某个课程SSD5(可能是Software System Development 5或者相关课程)的一项练习或作业,其核心目标是使用队列实现一个打印...
队列管理器(QueueRunner)是 TensorFlow 提供的一种机制,它允许我们在后台线程中加载和预处理数据,同时在主线程中执行模型的训练。 ### 2. TensorFlow 队列 队列是一种数据结构,用于存储张量并控制它们的流动...
1. **并发容器**:Java 提供了线程安全的集合类,如 `ConcurrentHashMap`(线程安全的哈希表)、`ConcurrentLinkedQueue`(无界并发队列)和 `CopyOnWriteArrayList`(读多写少场景下的线程安全列表)。 2. **`...
C++标准库提供了`std::queue`容器,可以用来实现客户队列。同时,还需要考虑队列的入队、出队操作的线程安全。 4. **银行业务逻辑**:系统应包含模拟银行核心业务的功能,如转账、存款和取款。转账涉及到两个账户...
在"3-6 并发服务器"的压缩包文件中,可能包含了实现并发服务器的相关代码示例、讲解文档或练习项目。这些资源可以帮助深入理解并发服务器的工作原理,并提供实践操作的机会。学习这些内容,你将能够熟练地编写和优化...
阻塞队列是实现线程间通信的关键组件,它确保生产者和消费者模型的正确运行。`put`和`take`方法用于在队列上执行阻塞操作,而`offer`和`poll`则提供非阻塞的替代方案。`drainTo`方法用于将队列中的所有元素转移到另...