import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class PriorityQueue< T >{ /** Main lock guarding all access */ final ReentrantLock lock = new ReentrantLock( ); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition( ); /** Condition for waiting puts */ @SuppressWarnings ( "unused" ) private final Condition notFull = lock.newCondition( ); private final AtomicInteger count = new AtomicInteger( ); private List< Queue< T > > lbqList; private int capacity; private Priority< T > priority; private boolean queue; public PriorityQueue( int capacity , Priority< T > priority , boolean queue ) { this.capacity = capacity; this.priority = priority; this.queue = queue; init( ); } private void init( ) { lbqList = new ArrayList<>( this.capacity ); for ( int i = 0 ; i < capacity ; i++ ) { if ( queue ) { lbqList.add( new LinkedList<>( ) ); } else { lbqList.add( new LinkedBlockingQueue<>( ) ); } } } public int add( T t ) throws Exception { int pr = priority.getPriority( t ); Queue< T > queue = lbqList.get( pr ); if ( count.get( ) > 0 ) { count.getAndIncrement( ); queue.add( t ); if ( count.get( ) <= 1 ) { final ReentrantLock lock = this.lock; try { lock.lockInterruptibly( ); notEmpty.signal( ); } catch ( Exception e ) { return -1; } finally { lock.unlock( ); } } } else { final ReentrantLock lock = this.lock; try { lock.lockInterruptibly( ); queue.add( t ); count.getAndIncrement( ); notEmpty.signal( ); return pr; } catch ( Exception e ) { throw e; } finally { lock.unlock( ); } } return pr; } public T get( ) throws InterruptedException { T t; if ( count.get( ) > 0 ) { t = getData(); if(t != null){ return t; } } final ReentrantLock lock = this.lock; lock.lock( ); try { while ( ( t = getData( ) ) == null ) notEmpty.await( ); return t; } finally { lock.unlock( ); } } public static interface Priority< T > { int getPriority( T t ); } public T getData(){ int i = 0; T t; for( ; ;){ t = lbqList.get( i++ ).poll( ); if( t != null){ count.getAndDecrement( ); return t; } if( i >= capacity ){ return null; } } } public int size( ) { return count.get( ); } public boolean isEmpty( ) { return count.get( ) == 0; } }
test代码
@Test public void testQ( ) throws InterruptedException { Priority< Notification > priority = new Priority< Notification >( ) { @Override public int getPriority( Notification t ) { return t.getPriority( ); } }; PriorityQueue< Notification > rq = new PriorityQueue<>( 6 , priority , false ); ThreadPoolExecutor te = new ThreadPoolExecutor( 8 , 8 , 100 , TimeUnit.MICROSECONDS , new LinkedBlockingQueue< Runnable >( ) ); ThreadPoolExecutor ted = new ThreadPoolExecutor(4 , 4 , 100 , TimeUnit.MICROSECONDS , new LinkedBlockingQueue< Runnable >( ) ); int sum = 100; int read = sum , write = sum; boolean boo = true; AtomicLong at = new AtomicLong( ); for ( ; ; ) { int i = rr.nextInt( 10 ); if ( boo && read > 0) { for (int j = 0 ; i >= j ; j++ ) { te.execute( new Runnable( ) { @Override public void run( ) { try { Notification nf = new Notification( ); nf.setId( at.getAndIncrement( ) ); //nf.setPriority( r.nextInt( 6 ) ); nf.setPriority( nf.getId( )%30==0?5:1 ); rq.add( nf ); System.out.println( String.format( " read tid : %d data : %s" ,Thread.currentThread( ).getId( ) , nf ) ); } catch ( Exception e ) { e.printStackTrace( ); } } } ); } read = read - i; boo = !boo; } else if( !boo && write > 0){ for ( int j = 0 ; i >= j ; j++ ) { ted.execute( new Runnable( ) { @Override public void run( ) { try { Notification nf = rq.get( ); System.out.println( String.format( " write tid : %d data : %s" ,Thread.currentThread( ).getId( ) , nf ) ); } catch ( InterruptedException e ) { // TODO 自动生成的 catch 块 e.printStackTrace( ); } } } ); } boo = !boo; write = write - i; } if ( read <= 0 && write <= 0 ) { break; } if( read <= 0){ boo = false; } if( write <= 0 ){ boo = true; } } System.out.println( "---------------------------------------------------------------------" ); Thread.sleep( 1000000000 ); }
相关推荐
使用顺序存储实现优先级队列,展示优先级队列和普通队列的区别之处。
优先级队列是一种特殊的数据结构,它在许多高级算法和数据处理中扮演着核心角色。在计算机科学中,我们通常使用优先级队列来解决那些需要处理具有不同优先级任务的问题。与普通队列遵循“先进先出”(FIFO)原则不同...
优先级队列是一种特殊的数据结构,它按照优先级顺序存储元素,最高优先级的元素总是在队列的前端。在计算机科学中,堆通常被用来实现优先级队列。堆是一种完全二叉树,其中每个父节点的值都大于或等于(最大堆)或...
例如,我们可以用优先级队列来实现一个简单的任务调度器,优先处理优先级高的任务。 总之,`PriorityQueue`通过堆数据结构提供了高效、灵活的优先级管理功能。在理解和使用过程中,应充分考虑其特性,以便在需要...
数据结构与算法之优先级队列 优先级队列(Priority Queue)是一种特殊的队列,它不同于一般的先进先出队列,而是每次从队列中取出的是具有最高优先权的元素。优先级队列的实现方式有多种,可以基于有序顺序表、无序...
用c语言实现的,简单易懂,希望对大家有用。
优先级队列cpp文件PriorityQueue.cpp
优先级队列的设计和实现 在计算机科学中,数据结构是一个非常重要的概念,它直接影响着算法的效率和程序的可读性。其中,优先级队列是一种特殊的数据结构,它支持对数据对象的优先级访问和操作。本章将对优先级队列...
算法中优先级队列问题...用堆排序的算法来做的例子
学生成绩管理系统是一个简单的应用示例,它可以使用小大根交替堆实现的双端优先级队列来存储和管理学生的成绩信息。在该系统中,学生的成绩信息可以被插入和删除,而优先级机制则可以根据学生的成绩排名来进行动态...
dheap(插入元素)、pop_max_dheap(删除最大元素)、pop_min_dheap(删除最小元素),is_dheap(堆验证)五个泛型算法,在此基础上实现了一个能在对数时间内获取最大和最小元素的优先级队列,相当于原stl优先级队列...
在毕业设计中,MATLAB 被广泛应用于各种科学计算和数据分析任务,而优先级队列(Priority Queue)是数据结构中的一个重要概念,它在许多算法和应用中扮演着核心角色。优先级队列通常不直接内置在MATLAB中,但可以...
优先级队列是一种特殊的数据结构,它在处理数据时遵循“优先级”原则,即具有较高优先级的元素会先于低优先级的元素被处理。在计算机科学中,优先级队列通常用于调度任务、事件处理和其他需要快速访问最高优先级元素...
在"os.rar_优先级 队列"这个主题中,我们将深入探讨操作系统如何利用优先级队列来管理进程,确保高效和公平地执行任务。 首先,进程是操作系统中运行的程序实例。在多任务环境中,操作系统需要决定哪个进程应该获得...
"优先级队列" 在数据结构中,优先级队列是一种特殊的队列,队列中的每个元素都有一个优先级,队列的出队顺序是按照元素的优先级从高到低的顺序。优先级队列可以用来解决许多实际问题,如任务调度、资源分配等。 本...
优先级队列是一种特殊的数据结构,它按照元素的优先级进行操作,通常最高优先级的元素会被优先处理。在C++标准库中,`<queue>`和`<priority_queue>`头文件提供了对队列和优先级队列的支持。在本讨论中,我们将深入...