`
dingchd
  • 浏览: 15669 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

并发队列实现练习

    博客分类:
  • java
阅读更多

代码:

package conSet;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 并发单向队列简单实现
 * 
 * @author dingchd
 * 
 * @param <T>
 */
public class NoBlockQueue<T> {
	private Node<T> header;
	private AtomicReference<Node<T>> tail;

	private AtomicInteger size;

	public NoBlockQueue() {
		header = new Node<T>();
		tail = new AtomicReference<Node<T>>(header);
		size = new AtomicInteger(0);
	}

	/**
	 * 存元素的过程分两步骤:原子更新尾节点的next、原子更新尾节点 如果第二部更新失败 则原子还原尾节点的next
	 * 
	 * @return
	 */
	public void add(T t) {
		// 创建一个节点
		Node<T> node = new Node<T>();
		node.value = t;

		Node<T> curTail = null;
		for (;;) {
			curTail = tail.get();

			if (curTail.next.get() == null) {
				if (casNext(curTail, null, node)) {
					if (casTail(curTail, node)) {
						size.incrementAndGet();
						return;
					} else {
						curTail.next.getAndSet(null);
					}
				}
			}
		}
	}

	/**
	 * 取元素分两部:原子更新header的next、第一个元素为尾节点,则将尾节点原子更新到header 如果第二部失败,则原则还原第一步
	 * 
	 * @return
	 */
	public T poll() {
		Node<T> first = null;
		T value = null;
		for (;;) {
			first = header.next.get();
			Node<T> curTail = tail.get();

			// 队列空
			if (curTail == header && first == null) {
				break;
			}

			// 中间状态
			if ((first != null && curTail == header)
					|| (first == null && curTail != header)) {
				continue;
			}

			if (first != null) {
				// 如果tail指向第一个元素,则取队首后将tail更新至header
				if (curTail == first) {
					if (casHeaderNext(first, null)) {
						if (casTail(curTail, header)) {
							value = first.value;
							break;
						} else {
							header.next.getAndSet(first);
						}
					}
				} else {
					Node<T> second = first.next.get();

					// 如果second为null,则说明当前获得的first已经被其他线程取走
					if (second != null) {
						if (casHeaderNext(first, second)) {
							value = first.value;
							break;
						}
					}
				}
			}
		}

		if (value != null) {
			size.decrementAndGet();
		}

		return value;
	}

	public boolean isEmpty() {
		return tail.get().value == null;
	}

	public T top() {
		Node<T> first = header.next.get();
		return first == null ? null : first.value;
	}

	public int size() {
		return size.get();
	}

	private final boolean casHeaderNext(Node<T> before, Node<T> after) {
		return header.next.compareAndSet(before, after);
	}

	private final boolean casTail(Node<T> before, Node<T> after) {
		return tail.compareAndSet(before, after);
	}

	private final boolean casNext(Node<T> node, Node<T> before, Node<T> after) {
		return node.next.compareAndSet(before, after);
	}

	static class Node<T> {
		T value;
		AtomicReference<Node<T>> next = new AtomicReference<Node<T>>();
	}
}

 
测试代码:

package conSet;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NoBlockQueueTest2 {
	public static int SIZE = 10000;
	public static int C_NUM = 10;

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		for (int i = 0; i < 10000; i++) {
			test();
		}
	}

	public static void test() {
		NoBlockQueue<String> queue = new NoBlockQueue<String>();

		Queue<String> input = new ConcurrentLinkedQueue<String>();
		Queue<String> output = new ConcurrentLinkedQueue<String>();

		for (int i = 0; i < C_NUM; i++) {
			Runnable mp = new MP(queue, input);
			new Thread(mp).start();
		}

		List<Thread> list = new ArrayList<Thread>();
		for (int i = 0; i < C_NUM; i++) {
			Runnable mc = new MC(queue, output);
			Thread t = new Thread(mc);
			t.start();
			list.add(t);
		}

		for (Thread t : list) {
			try {
				t.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		ArrayList<String> sort1 = new ArrayList<String>();
		ArrayList<String> sort2 = new ArrayList<String>();
		while (!input.isEmpty()) {
			sort1.add(input.poll());
		}
		while (!output.isEmpty()) {
			sort2.add(output.poll());
		}

		Collections.sort(sort1);
		Collections.sort(sort2);

		if (sort1.size() != sort2.size()) {
			throw new RuntimeException("test error,size not equal");
		}

		for (int i = 0; i < sort1.size(); i++) {
			String left = sort1.get(i);
			String right = sort2.get(i);
			if (!left.equals(right)) {
				throw new RuntimeException("test error,data wrong");
			}
		}

		System.out.println("test ok size=" + queue.size());
	}

	static class MP implements Runnable {
		NoBlockQueue<String> queue;
		Queue<String> input;

		public MP(NoBlockQueue<String> queue, Queue<String> input) {
			super();
			this.queue = queue;
			this.input = input;
		}

		public void run() {
			for (int i = 0; i < NoBlockQueueTest2.SIZE; i++) {
				String s = UUID.randomUUID().toString();
				input.add(s);
				queue.add(s);
			}
		}
	}

	static class MC implements Runnable {
		NoBlockQueue<String> queue;
		Queue<String> output;

		public MC(NoBlockQueue<String> queue, Queue<String> output) {
			super();
			this.queue = queue;
			this.output = output;
		}

		public void run() {
			final int count = NoBlockQueueTest2.C_NUM * NoBlockQueueTest2.SIZE;
			for (;;) {
				String s = queue.poll();
				if (s != null) {
					output.add(s);
				} else {
					if (output.size() == count) {
						break;
					}
				}
			}
		}
	}
}

 
因为没有实现remove和itr功能,因此复杂度甚微,经过10000次的不断测试,尚未发现测试失败

 

分享到:
评论

相关推荐

    Java并发编程进阶练习代码

    `Concurrent`包包含了大量设计精良且线程安全的数据结构,如`ConcurrentHashMap`(线程安全的哈希映射),`ConcurrentLinkedQueue`(无界的并发队列)和`CopyOnWriteArrayList`(写时复制的列表)等。这些数据结构在...

    队列操作实现

    队列操作实现是编程中一个常见的任务,尤其在处理并发和多线程环境时,队列能有效地组织和管理数据流。本篇文章将详细讲解如何自己编写队列操作,包括插入、删除以及清楚销毁队列和清空队列的区别。 首先,我们需要...

    Go-解决Golang并发性练习

    本文将深入探讨如何解决 Golang 并发性问题,并通过实际的练习来提升对并发编程的理解。 首先,Goroutines 是 Go 语言中的轻量级线程,它们比传统的操作系统线程更高效、更易于管理。创建一个 Goroutine 只需在函数...

    操作系统模拟:多级反馈队列

    此外,这也是一个很好的练习,可以帮助提升Java编程和并发控制的理解。 总的来说,这个项目涵盖了操作系统的核心概念、Java多线程编程以及并发控制的实战应用。通过动手实现,不仅能深化理论知识,还能锻炼编程技能...

    java高并发程序设计(原版电子书)

    6. **并发设计模式**:讨论适用于并发编程的设计模式,如生产者-消费者模式、读写锁模式和双端队列模式,以及如何在Java中实现这些模式。 7. **并发编程挑战**:探讨死锁、活锁、饥饿等并发问题及其解决策略,以及...

    quene队列.zip

    在压缩文件“quene.zip”中,可能包含关于队列的各种示例代码、讲解文档或者练习题,用于帮助学习者更好地理解和应用队列数据结构。通过解压并查看文件“quene”,可以深入探究队列的实现和应用场景,进一步提升编程...

    Tesseract OCR多线程并发识别案例

    以下将详细介绍如何利用Tesseract OCR实现多线程并发识别,以及可能涉及的相关技术点。 首先,理解Tesseract OCR的基本工作原理是至关重要的。Tesseract主要分为两个主要步骤:预处理和识别。预处理包括图像校正、...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...

    Concurrent Queue Exerciser:展示并发双端队列是如何工作的。-开源

    练习学习 java 包 java.util.concurrent 并直观地展示并发双端队列的工作原理。

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...

    Scala并发编程程.rar

    学习Scala并发编程,除了阅读指定的《Scala编程》或《Scala编程思想》书籍外,还可以参考Akka官方文档,参与开源项目,进行实战练习,以提升对并发编程的理解和实践能力。同时,理解并熟练使用Scala的`monad`,特别...

    张孝祥Java多线程与并发库高级应用视频教程练习代码

    通过张孝祥的视频教程和对应的练习代码,你可以深入学习如何有效地使用这些并发工具,解决并发编程中的挑战,如死锁、竞态条件、活锁等问题,并提高程序的性能和可扩展性。实践中,你需要关注代码的线程安全性和效率...

    fifo.rar_Visual C++ fifo_系统调用FIFO_队列fifo

    这样的练习对于提升并发编程和系统级编程的能力非常有帮助。 总结起来,"fifo.rar"是一个关于使用Visual C++实现FIFO系统调用和线程间FIFO队列交互的实践案例。它涵盖了操作系统级别的命名管道通信以及C++中的多...

    实战java高并发程序设计 分章 高清 带作业

    5. **实战案例**:书中通过具体的实战项目,如分布式锁实现、高并发下的数据库访问优化等,让读者将理论知识应用到实际场景中。 6. **课程结构**:根据压缩包内的文件名,我们可以推测课程分为多个章节,例如"第一...

    print_simulation_new.rar_ssd5_ssd5 print_ssd5 答案_模拟队列打印

    simulation_new.rar_ssd5_ssd5 print_ssd5 答案_模拟队列打印"的压缩包文件,显然是针对某个课程SSD5(可能是Software System Development 5或者相关课程)的一项练习或作业,其核心目标是使用队列实现一个打印...

    深度学习框架(TensorFlow)基础教程——第8章:队列与线程

    队列管理器(QueueRunner)是 TensorFlow 提供的一种机制,它允许我们在后台线程中加载和预处理数据,同时在主线程中执行模型的训练。 ### 2. TensorFlow 队列 队列是一种数据结构,用于存储张量并控制它们的流动...

    java并发编程实战范例合集new(由浅入深代码范例和详细说明).docx

    1. **并发容器**:Java 提供了线程安全的集合类,如 `ConcurrentHashMap`(线程安全的哈希表)、`ConcurrentLinkedQueue`(无界并发队列)和 `CopyOnWriteArrayList`(读多写少场景下的线程安全列表)。 2. **`...

    基于C++实现的银行业务模拟系统

    C++标准库提供了`std::queue`容器,可以用来实现客户队列。同时,还需要考虑队列的入队、出队操作的线程安全。 4. **银行业务逻辑**:系统应包含模拟银行核心业务的功能,如转账、存款和取款。转账涉及到两个账户...

    并发服务器

    在"3-6 并发服务器"的压缩包文件中,可能包含了实现并发服务器的相关代码示例、讲解文档或练习项目。这些资源可以帮助深入理解并发服务器的工作原理,并提供实践操作的机会。学习这些内容,你将能够熟练地编写和优化...

    Java并发程序设计教程.pdf

    阻塞队列是实现线程间通信的关键组件,它确保生产者和消费者模型的正确运行。`put`和`take`方法用于在队列上执行阻塞操作,而`offer`和`poll`则提供非阻塞的替代方案。`drainTo`方法用于将队列中的所有元素转移到另...

Global site tag (gtag.js) - Google Analytics