`

阻塞队列步步升华

    博客分类:
  • Java
 
阅读更多
1、首先我们定义一个简单的队列。这个队列可以不断的往队列中放数据,一个放数据的线程,一个取数据的线程。队列可以无限大,所以这样会有内存泄露的危险。
package com.zte;

import java.util.Random;
import java.util.Vector;


public class SimpleQueue {

	Vector<Integer>  vector = new Vector<Integer>();
	
	public synchronized void put(){
		System.out.println(Thread.currentThread().getName() + ",开始放数据。");
		vector.add(new Random().nextInt(1000));
		System.out.println(Thread.currentThread().getName() + ",已放完数据。");
		System.out.println("队列中还有:" + vector.size());
		this.notify(); //只是通知另外的线程你可以获得锁,如果本线程获取了锁,其他线程也是获取不到锁的
	}
	
	public synchronized Integer get(){
		System.out.println(Thread.currentThread().getName() + ",开始取数据。");
		while(vector.size() == 0) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}					
		}
		
		Integer integer = vector.firstElement();
		vector.remove(integer);
		System.out.println(Thread.currentThread().getName() + ",已取完数据。");
		System.out.println("队列中还有:" + vector.size());
		return integer;	
	}
	

	public static void main(String[] args){
		final SimpleQueue simpleQueue = new SimpleQueue();
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				try {
					while(true){
					simpleQueue.get();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while(true){
			        simpleQueue.put();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
		}).start();
	
	}

}



2、我们定义一个简单的队列。三个放数据的线程,三个取数据的线程。队列可以无限大,如果放数据的线程远多余取数据的线程,这样就会大大增加内存泄露的危险
package com.zte;

import java.util.Random;


public class MitThreadBufferArraySimpleQueue {

	Integer[] array = new Integer[20];
	int putPostion;
	int getPostion;
	int count;
	public synchronized void put(){
		if(count == array.length) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + ",开始放数据。");
		array[putPostion] = new Random().nextInt(1000);
		putPostion ++;
		count ++;
		if(putPostion == array.length) putPostion = 0;
		System.out.println(Thread.currentThread().getName() + ",已放完数据。");
		System.out.println("队列中还有:" + count +"个对象");
		this.notify(); //只是通知另外的线程你可以获得锁,如果本线程获取了锁,其他线程也是获取不到锁的
	}
	
	public synchronized Integer get(){
		if(count == 0) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		System.out.println(Thread.currentThread().getName() + ",开始取数据。");
	    Integer intObject = array[getPostion];
	    getPostion ++ ;
	    count--;
	    if(getPostion == array.length) getPostion = 0;
		
		System.out.println(Thread.currentThread().getName() + ",已取完数据。");
		System.out.println("队列中还有:" + count + "个对象");
		this.notify();
		return intObject;	
	}
	

	public static void main(String[] args){
		final MitThreadBufferArraySimpleQueue bufferArraySimpleQueue= new MitThreadBufferArraySimpleQueue();
		for(int i=0 ; i<3;i++) {
		new Thread(new Runnable() {			
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.get();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		}
		
		for(int j=0 ; j<3;j++) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.put();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
		}).start();
		}
	}

}


3、定义一个带有缓冲区的队列,这样可以避免内存溢出。虽然避免了内存溢出问题,但拥塞现象严重
package com.zte;

import java.util.Random;


public class MitThreadBufferArraySimpleQueue {

	Integer[] array = new Integer[20];
	int putPostion;
	int getPostion;
	int count;
	public synchronized void put(){
		if(count == array.length) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + ",开始放数据。");
		array[putPostion] = new Random().nextInt(1000);
		putPostion ++;
		count ++;
		if(putPostion == array.length) putPostion = 0;
		System.out.println(Thread.currentThread().getName() + ",已放完数据。");
		System.out.println("队列中还有:" + count +"个对象");
		this.notify(); //只是通知另外的线程你可以获得锁,如果本线程获取了锁,其他线程也是获取不到锁的
	}
	
	public synchronized Integer get(){
		if(count == 0) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		System.out.println(Thread.currentThread().getName() + ",开始取数据。");
	    Integer intObject = array[getPostion];
	    getPostion ++ ;
	    count--;
	    if(getPostion == array.length) getPostion = 0;
		
		System.out.println(Thread.currentThread().getName() + ",已取完数据。");
		System.out.println("队列中还有:" + count + "个对象");
		this.notify();
		return intObject;	
	}
	

	public static void main(String[] args){
		final MitThreadBufferArraySimpleQueue bufferArraySimpleQueue= new MitThreadBufferArraySimpleQueue();
		for(int i=0 ; i<3;i++) {
		new Thread(new Runnable() {			
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.get();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		}
		
		for(int j=0 ; j<3;j++) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.put();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
		}).start();
		}
	}

}


4、考虑带缓冲区的队列,启动多个线程存放数据,出现数据脏数据。
package com.zte;

import java.util.Random;


public class ConditionMitThreadBufferArraySimpleQueue {

	Integer[] array = new Integer[20];
	int putPostion;
	int getPostion;
	int count;
	public synchronized void put(){
		if(count == array.length) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + ",开始放数据。");
		array[putPostion] = new Random().nextInt(1000);
		putPostion ++;
		count ++;
		if(putPostion == array.length) putPostion = 0;
		System.out.println(Thread.currentThread().getName() + ",已放完数据。");
		System.out.println("队列中还有:" + count +"个对象");
		this.notify(); //只是通知另外的线程你可以获得锁,如果本线程获取了锁,其他线程也是获取不到锁的
	}
	
	public synchronized Integer get(){
		if(count == 0) {
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		System.out.println(Thread.currentThread().getName() + ",开始取数据。");
	    Integer intObject = array[getPostion];
	    getPostion ++ ;
	    count--;
	    if(getPostion == array.length) getPostion = 0;
		
		System.out.println(Thread.currentThread().getName() + ",已取完数据。");
		System.out.println("队列中还有:" + count + "个对象");
		this.notify();
		return intObject;	
	}
	

	public static void main(String[] args){
		final ConditionMitThreadBufferArraySimpleQueue bufferArraySimpleQueue= new ConditionMitThreadBufferArraySimpleQueue();
		for(int i=0 ; i<3;i++) {
		new Thread(new Runnable() {			
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.get();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		}
		
		for(int j=0 ; j<3;j++) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.put();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
		}).start();
		}
	}

}


运行部分结果:
队列中还有:-2个对象
Thread-0,开始取数据。
Thread-0,已取完数据。
队列中还有:-3个对象
Thread-1,开始取数据。
Thread-1,已取完数据。
队列中还有:-4个对象
Thread-5,开始放数据。
Thread-5,已放完数据。
队列中还有:-3个对象
Thread-4,开始放数据。
Thread-4,已放完数据。
队列中还有:-2个对象
Thread-3,开始放数据。
Thread-3,已放完数据。
队列中还有:-1个对象
Thread-0,开始取数据。
Thread-0,已取完数据。
队列中还有:-2个对象
Thread-2,开始取数据。
Thread-2,已取完数据。
队列中还有:-3个对象
Thread-0,开始取数据。
Thread-0,已取完数据。
队列中还有:-4个对象

5、运用JAVA5中锁与Condition机制,完美解决问题
package com.zte;

import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class BufferArraySimpleQueue {

	Integer[] array = new Integer[20];
	int putPostion;
	int getPostion;
	int count;
	Lock lock = new ReentrantLock();
	Condition readCondition = lock.newCondition();
	Condition writeCondition = lock.newCondition();
	public  void put(){
	 lock.lock();
	  try{	
		if(count == array.length) {
			try {
				writeCondition.await();				
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println(Thread.currentThread().getName() + ",开始放数据。");
		array[putPostion] = new Random().nextInt(1000);
		putPostion ++;
		count ++;
		if(putPostion == array.length) putPostion = 0;
		System.out.println(Thread.currentThread().getName() + ",已放完数据。");
		System.out.println("队列中还有:" + count +"个对象");
		readCondition.signal(); //只是通知另外的线程你可以获得锁,如果本线程获取了锁,其他线程也是获取不到锁的
	  }finally {
		  lock.unlock();
	  }
	  }
	
	public  Integer get(){
		lock.lock();
		try{
		if(count == 0) {
			try {
			   readCondition.await();	
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		System.out.println(Thread.currentThread().getName() + ",开始取数据。");
	    Integer intObject = array[getPostion];
	    getPostion ++ ;
	    count--;
	    if(getPostion == array.length) getPostion = 0;
		
		System.out.println(Thread.currentThread().getName() + ",已取完数据。");
		System.out.println("队列中还有:" + count + "个对象");
		writeCondition.signal();
		return intObject;	
		}finally{
			lock.unlock();
		}
		}
	

	public static void main(String[] args){
		final BufferArraySimpleQueue bufferArraySimpleQueue= new BufferArraySimpleQueue();
	
		new Thread(new Runnable() {			
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.get();
					Thread.sleep(new Random().nextInt(1000));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}).start();
		
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					while(true){
						bufferArraySimpleQueue.put();
					Thread.sleep(new Random().nextInt(500));
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
		}).start();
	
	}

}
分享到:
评论

相关推荐

    支持多线程和泛型的阻塞队列

    阻塞队列是一种在多线程编程中广泛使用的并发数据结构,它在计算机科学和编程领域,特别是Java和C++等面向对象语言中扮演着重要角色。标题中的“支持多线程和泛型的阻塞队列”意味着我们讨论的是一个能够同时处理多...

    阻塞队列阻塞队列阻塞队列

    在Java编程语言中,阻塞队列是一种线程安全的数据结构,它在多线程并发控制中发挥着重要作用。阻塞队列的核心特性是当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;同样,当队列满时,试图插入...

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    java模拟阻塞队列

    阻塞队列结合了队列的数据结构与线程同步机制,使得生产者可以在队列满时被阻塞,而消费者则在队列空时被阻塞,这样可以避免无效的循环检查,提高程序的运行效率。 首先,我们需要了解什么是生产者-消费者模型。在...

    c++11 实现的阻塞队列

    C++11 实现的阻塞队列 C++11 中的阻塞队列是指在多线程环境下,实现生产者消费者模式的队列。阻塞队列的实现需要解决两个问题:线程安全和阻塞机制。在 C++11 中,我们可以使用 std::mutex、std::condition_...

    并发-线程池和阻塞队列.pdf

    阻塞队列的一个重要特点是线程在队列满时加入元素会阻塞,在队列空时取出元素也会阻塞,直到有空间或元素可用。Java中的ArrayBlockingQueue和LinkedBlockingQueue都是典型的阻塞队列实现。 阻塞队列为线程间通信...

    BlockingQueue(阻塞队列)详解

    - **定义**:阻塞队列是一种特殊的队列,除了具有队列的基本特性外,还提供了额外的阻塞行为,即当队列空时,从队列中获取元素的操作将会阻塞,等待队列变得非空;当队列满时,向队列插入元素的操作也会阻塞,等待...

    Java实现简单的阻塞队列2种方式

    在Java编程中,阻塞队列是一种特殊类型的并发数据结构,它在多线程环境中的应用广泛,主要用于线程间的协作通信。阻塞队列在队列满时会阻止生产者线程添加元素,在队列空时会阻止消费者线程取出元素,直到条件满足...

    并发-线程池和阻塞队列

    在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    同时,作为阻塞队列,当生产者尝试向满队列添加元素时,或者消费者尝试从空队列中获取元素时,线程会被阻塞,直到队列有可用空间或数据,这大大简化了多线程同步的问题。 在生产者/消费者模式中,生产者通常通过`...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    ### 10、阻塞队列BlockingQueue 实战及其原理分析 #### 一、阻塞队列概述 阻塞队列(BlockingQueue)是Java语言中`java.util.concurrent`包下提供的一种重要的线程安全队列。它继承自`Queue`接口,并在此基础上...

    剖析Java中阻塞队列的实现原理及应用场景

    Java中的阻塞队列是一种特殊的线程安全的数据结构,它在多线程环境下用于高效地处理生产者-消费者问题。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到队列中有元素可用;同样,当队列满时...

    阻塞队列(Blocking Queue)是一个支持两个附加操作的队列.txt

    阻塞队列的主要特点在于它支持两个额外的条件操作:当队列为空时,尝试从队列中取元素的操作会被阻塞,直到队列中出现新的元素;同样地,当队列已满时,尝试向队列中添加元素的操作也会被阻塞,直到队列中出现可用...

    java线程聊天室(阻塞队列实现)

    【Java线程聊天室(阻塞队列实现)】 在Java编程中,多线程是构建并发应用程序的关键技术。在创建一个线程聊天室时,我们通常会涉及到多个线程之间的交互,例如用户发送消息、接收消息以及处理网络通信等。而阻塞...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    高性能阻塞队列的数据结构创新.pptx

    ### 高性能阻塞队列的数据结构创新 #### 一、阻塞队列的概念与应用 **阻塞队列**是一种特殊的队列,它具备线程安全的特点,并且当队列为空时,从队列中获取元素的操作会被阻塞;同样地,当队列满时,向队列中添加...

    Java并发编程(21)并发新特性-阻塞队列和阻塞栈(含代

    在Java并发编程中,阻塞队列和阻塞栈是两个重要的并发数据结构,它们在多线程环境下的高效通信和资源管理中扮演着至关重要的角色。这些数据结构源自Java的并发包`java.util.concurrent`,是实现并发设计模式如生产者...

    Java并发编程:阻塞队列

     在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。  使用非阻塞队列的时候有一个很大问题是:它不会对当前线程产生阻塞,那么在面对类似...

    java阻塞队列实现原理及实例解析.docx

    Java 阻塞队列(Blocking Queue)是一种特殊类型的并发数据结构,它在多线程编程中扮演着重要的角色。阻塞队列的核心特性在于其在队列为空或满时能够自动阻塞线程,从而实现线程间的同步和通信。这种机制使得生产者...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    2. **队列实现**:接着可能会讲解几种具体的阻塞队列实现,比如`ArrayBlockingQueue`是基于数组的有界队列,`LinkedBlockingQueue`基于链表,以及`PriorityBlockingQueue`是无界的优先级队列,它们各自的特点和适用...

Global site tag (gtag.js) - Google Analytics