`
benx
  • 浏览: 276224 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java concurrent包理解

    博客分类:
  • java
 
阅读更多

Java并发处理

<!--[if !supportLists]-->1、  <!--[endif]-->简介

了解java并发之前先了解java内存模型,java内存有主内存和工作内存,比有个对象Person,有实例变量name,那么Person的实例中name属性就是在主内存中,如果多个线程同时操作Person,那么每个线程会有Person属性name的副本放在每个线程的工作内存中,每个工作内存修改后会同步到主内存中,但是这里就有问题:一致性问题和可见性问题,导致数据丢失或脏数据。

为了解决这个问题,引入了同步机制synchronized,是多个线程同时只有一个线程可以操作共享变量(主内存对象)

 

<!--[if !supportLists]-->2、  <!--[endif]-->java5sun引入了concurrent包的一些同步机制,要了解这个首先了解AbstractQueuedSynchronizer

 

<!--[if !supportLists]-->3、  <!--[endif]-->AbstractQueuedSynchronizer了解

实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架,此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制

以上是java API的描述 ,简单就是提供线程阻塞和同步的对象,子类需要实现tryAcquiretryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively等方法

 

下面是常用方法介绍

public final void acquire(int arg)

以独占模式获取对象,忽略中断。通过至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。可以使用此方法来实现 Lock.lock() 方法。

参数:

arg - acquire 参数。此值被传送给 tryAcquire(int),但它是不间断的,并且可以表示任何内容。

 

protected boolean tryAcquire(int arg)

试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。

此方法总是由执行 acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。可以用此方法来实现 Lock.tryLock() 方法。

默认实现将抛出 UnsupportedOperationException

参数:

arg - acquire 参数。该值总是传递给 acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。

返回:

如果成功,则返回 true。在成功的时候,此对象已经被获取。

抛出:

IllegalMonitorStateException - 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。

UnsupportedOperationException - 如果不支持独占模式

 

public final boolean release(int arg)

以独占模式释放对象。如果 tryRelease(int) 返回 true,则通过消除一个或多个线程的阻塞来实现此方法。可以使用此方法来实现 Lock.unlock() 方法

参数:

arg - release 参数。此值被传送给 tryRelease(int),但它是不间断的,并且可以表示任何内容。

返回:

tryRelease(int) 返回的值

 

protected boolean tryRelease(int arg)

试图设置状态来反映独占模式下的一个释放。

此方法总是由正在执行释放的线程调用。

默认实现将抛出 UnsupportedOperationException

参数:

arg - release 参数。该值总是传递给 release 方法的那个值,或者是因某个条件等待而保存在条目上的当前状态值。该值是不间断的,并且可以表示任何内容。

返回:

如果此对象现在处于完全释放状态,从而使等待的线程都可以试图获得此对象,则返回 true;否则返回 false

 

public final void acquireShared(int arg)

以共享模式获取对象,忽略中断。通过至少先调用一次 tryAcquireShared(int) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquireShared(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。

参数:

arg - acquire 参数。此值被传送给 tryAcquireShared(int),但它是不间断的,并且可以表示任何内容。

 

 

public final boolean releaseShared(int arg)

以共享模式释放对象。如果 tryReleaseShared(int) 返回 true,则通过消除一个或多个线程的阻塞来实现该方法。

参数:

arg - release 参数。此值被传送给 tryReleaseShared(int),但它是不间断的,并且可以表示任何内容。

 

protected int tryAcquireShared(int arg)

试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。

此方法总是由执行 acquire 线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。

默认实现将抛出 UnsupportedOperationException

参数:

arg - acquire 参数。该值总是传递给 acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。

返回:

在失败时返回负值;如果共享模式下的获取成功但其后续共享模式下的获取不能成功,则返回 0;如果共享模式下的获取成功并且其后续共享模式下的获取可能够成功,则返回正值,在这种情况下,后续等待线程必须检查可用性。(对三种返回值的支持使得此方法可以在只是有时候以独占方式获取对象的上下文中使用。)在成功的时候,此对象已被获取。

抛出:

IllegalMonitorStateException - 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。

UnsupportedOperationException - 如果不支持共享模式

 

上面几个方法总结就是获取对象,当可以获取时不阻塞,否则会阻塞,使用LockSupport.park()---unsafe.park(),释放对象

 

<!--[if !supportLists]-->4、  <!--[endif]-->CountDownLatch

 

/**

 * 原理是内部维护一个大小i的信号量,

 * 使用await方法会一直等待直到信号量为0

 * 使用countDown方法会时信号量-1,当信号量为0await取消阻塞

 * @author jin.xiong

 *

 */

public class CountDownLatch

{

    private static final class Sync extends AbstractQueuedSynchronizer

    {

 

    /**

     * 获取状态

     * @return

     */

        int getCount()

        {

            return getState();

        }

 

        /**

         * state0是才可以获取锁,否则一直等待

         */

        public int tryAcquireShared(int i)

        {

            return getState() != 0 ? -1 : 1;

        }

 

        /**

         * stage通过CAS减一

         */

        public boolean tryReleaseShared(int i)

        {

            int j;

            int k;

            do

            {

                j = getState();

                if(j == 0)

                    return false;

                k = j - 1;

            } while(!compareAndSetState(j, k));

            return k == 0;

        }

 

        Sync(int i)

        {

            setState(i);

        }

    }

 

 

    public CountDownLatch(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException("count < 0");

        } else

        {

            sync = new Sync(i);

            return;

        }

    }

 

    public void await()

        throws InterruptedException

    {

        sync.acquireSharedInterruptibly(1);

    }

 

    public boolean await(long l, TimeUnit timeunit)

        throws InterruptedException

    {

        return sync.tryAcquireSharedNanos(1, timeunit.toNanos(l));

    }

 

    public void countDown()

    {

        sync.releaseShared(1);

    }

 

    public long getCount()

    {

        return (long)sync.getCount();

    }

 

    public String toString()

    {

        return (new StringBuilder()).append(super.toString()).append("[Count = ").append(sync.getCount()).append("]").toString();

    }

   

   

    public  static void main(String[] args) throws InterruptedException{

    final CountDownLatch down = new CountDownLatch(10);

    System.out.println(down.getCount());

   

    new Thread(){

        public void run(){

            try {

                  down.await();

              } catch (InterruptedException e) {

              }

            System.out.println("CountDownLatch Stage 0   " +down.getCount() );

        }

    }.start();

    for(int i=0;i<10;i++){

        Thread.sleep(1000);

        System.out.println(i);

        down.countDown();

    }

    System.out.println(down.getCount());

    }

 

    private final Sync sync;

}

 

 

5Semaphore

/**

 * 计数信号量,原理内部维护一个大小为i的许可数量

 * acquire(s)方法把当前许可号-s

 * release(s)方法吧当前许可号+s

 * @author jin.xiong

 *

 */

public class Semaphore

    implements Serializable

{

    /**

     * 公平的Sync

     * 有两个判断条件,如果当前的线程不在等待FIFO线程队列的首部,将继续等待,且stage必须大于0

     * @author jin.xiong

     *

     */

    static final class FairSync extends Sync

    {

 

        protected int tryAcquireShared(int i)

        {

            Thread thread = Thread.currentThread();

            int j;

            int k;

            do

            {

                Thread thread1 = getFirstQueuedThread();

                if(thread1 != null && thread1 != thread)

                    return -1;

                j = getState();

                k = j - i;

            } while(k >= 0 && !compareAndSetState(j, k));

            return k;

        }

 

        private static final long serialVersionUID = 2014338818796000944L;

 

        FairSync(int i)

        {

            super(i);

        }

    }

 

    /**

     * 非公平的Sync

     * @author jin.xiong

     *

     */

    static final class NonfairSync extends Sync

    {

 

        protected int tryAcquireShared(int i)

        {

            return nonfairTryAcquireShared(i);

        }

 

        private static final long serialVersionUID = -2694183684443567898L;

 

        NonfairSync(int i)

        {

            super(i);

        }

    }

 

    static abstract class Sync extends AbstractQueuedSynchronizer

    {

 

    /**

     * 获取状态

     * @return

     */

        final int getPermits()

        {

            return getState();

        }

 

        /**

         * 把状态-i

         * @param i

         * @return

         */

        final int nonfairTryAcquireShared(int i)

        {

            int j;

            int k;

            do

            {

                j = getState();

                k = j - i;

            } while(k >= 0 && !compareAndSetState(j, k));

            return k;

        }

 

        /**

         * 把状态+i

         */

        protected final boolean tryReleaseShared(int i)

        {

            int j;

            do

                j = getState();

            while(!compareAndSetState(j, j + i));

            return true;

        }

 

        /**

         * 状态-i nonfairTryAcquireShared作用一样

         * @param i

         */

        final void reducePermits(int i)

        {

            int j;

            int k;

            do

            {

                j = getState();

                k = j - i;

            } while(!compareAndSetState(j, k));

        }

 

        /**

         * 把状态设为0

         * @return

         */

        final int drainPermits()

        {

            int i;

            do

                i = getState();

            while(i != 0 && !compareAndSetState(i, 0));

            return i;

        }

 

        private static final long serialVersionUID = 1192457210091910933L;

 

        Sync(int i)

        {

            setState(i);

        }

    }

 

 

    /**

     * 初始化容量为i的信号量

     * @param i

     */

    public Semaphore(int i)

    {

        sync = new NonfairSync(i);

    }

 

    public Semaphore(int i, boolean flag)

    {

        sync = ((Sync) (flag ? ((Sync) (new FairSync(i))) : ((Sync) (new NonfairSync(i)))));

    }

 

    public void acquire()

        throws InterruptedException

    {

        sync.acquireSharedInterruptibly(1);

    }

 

    public void acquireUninterruptibly()

    {

        sync.acquireShared(1);

    }

 

    public boolean tryAcquire()

    {

        return sync.nonfairTryAcquireShared(1) >= 0;

    }

 

    public boolean tryAcquire(long l, TimeUnit timeunit)

        throws InterruptedException

    {

        return sync.tryAcquireSharedNanos(1, timeunit.toNanos(l));

    }

 

    public void release()

    {

        sync.releaseShared(1);

    }

 

    public void acquire(int i)

        throws InterruptedException

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.acquireSharedInterruptibly(i);

            return;

        }

    }

 

    public void acquireUninterruptibly(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.acquireShared(i);

            return;

        }

    }

 

    public boolean tryAcquire(int i)

    {

        if(i < 0)

            throw new IllegalArgumentException();

        else

            return sync.nonfairTryAcquireShared(i) >= 0;

    }

 

    /**

     * 获取信号量,如果没有则等待时间l

     * @param i

     * @param l

     * @param timeunit

     * @return

     * @throws InterruptedException

     */

    public boolean tryAcquire(int i, long l, TimeUnit timeunit)

        throws InterruptedException

    {

        if(i < 0)

            throw new IllegalArgumentException();

        else

            return sync.tryAcquireSharedNanos(i, timeunit.toNanos(l));

    }

 

   /**

    * 释放i个信号量

    * @param i

    */

    public void release(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.releaseShared(i);

            return;

        }

    }

 

    public int availablePermits()

    {

        return sync.getPermits();

    }

 

    /**

     * 释放所有信号量

     * @return

     */

    public int drainPermits()

    {

        return sync.drainPermits();

    }

 

    protected void reducePermits(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.reducePermits(i);

            return;

        }

    }

 

    public boolean isFair()

    {

        return sync instanceof FairSync;

    }

 

    public final boolean hasQueuedThreads()

    {

        return sync.hasQueuedThreads();

    }

 

    public final int getQueueLength()

    {

        return sync.getQueueLength();

    }

 

    protected Collection getQueuedThreads()

    {

        return sync.getQueuedThreads();

    }

 

    public String toString()

    {

        return (new StringBuilder()).append(super.toString()).append("[Permits = ").append(sync.getPermits()).append("]").toString();

    }

 

    private static final long serialVersionUID = -3222578661600680210L;

    private final Sync sync;

}

 

 

分享到:
评论

相关推荐

    java concurrent 包 详细解析

    Java并发包(java.concurrent)是Java平台中处理多线程编程的核心工具包,它提供了丰富的类和接口,使得开发者能够高效、安全地编写多线程程序。这个包的设计目标是提高并发性能,减少同步代码的复杂性,并提供高级...

    java concurrent 精简源码

    这个“java concurrent 精简源码”资源很可能会包含上述概念的实际应用示例,通过学习和分析这些代码,你可以深入理解Java并发编程的精髓,并能更好地应用于实际项目中。在研究时,建议结合Java官方文档和相关的书籍...

    java.concurrent包的应用

    Java标准库中的`java.util.concurrent`包提供了线程池的实现,主要由`ExecutorService`接口和`ThreadPoolExecutor`类组成。`ExecutorService`定义了执行任务的接口,而`ThreadPoolExecutor`则是具体的线程池实现,...

    Java Concurrent Programming

    为了简化多线程编程,Java提供了一系列工具和API,如`java.util.Timer`和`java.util.concurrent`包,这些工具可以帮助开发者更高效地管理线程间的同步问题。 ##### 1.2 synchronized关键字 `synchronized`关键字是...

    java并发工具包 java.util.concurrent中文版pdf

    ### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...

    使用java concurrent调用xmlp api生成pdf

    首先,`java.concurrent`包是Java标准库的一部分,提供了并发编程的支持,包括线程池、同步机制、并发容器等。在生成PDF的过程中,如果数据处理或转换工作量较大,可以利用并发处理来提高性能。例如,通过...

    Java的concurrent包动画演示

    Java的`concurrent`包是Java多线程编程的核心组件,它包含了一系列高效、线程安全的类和接口,使得开发者能够更容易地处理并发问题。这个包中的工具和类是Java平台对并行编程的强大支持,它极大地提升了程序在多...

    java并发工具包 java.util.concurrent中文版-带书签版

    通过这份中文版的`java.util.concurrent`用户指南,读者可以深入理解这些并发工具的使用方法和原理,提升在多线程环境下的编程能力。书签功能使得学习更加便捷,能够快速定位到感兴趣的章节或知识点,对Java并发编程...

    JAVA Concurrent Programming

    Java 1.5引入了`java.util.concurrent`包,包含了一系列的并发工具类,如线程池、阻塞队列、并发集合等。这些工具旨在提高并发性能并简化编程模型。例如,`ExecutorService`和`ThreadPoolExecutor`提供了线程池管理...

    java concurrent in practive

    另外,`java.util.concurrent.locks`包提供了可重入锁`ReentrantLock`和读写锁`ReentrantReadWriteLock`,它们比`synchronized`更灵活,允许更细粒度的锁控制。 死锁、活锁和饥饿是并发编程中常见的问题,书中会...

    Concurrent Programming in Java

    Java的并发库提供了一系列工具和API,如`java.util.concurrent`包,帮助开发者有效地管理并发任务。本书主要涵盖以下几个方面: 1. **线程基础**:书中首先介绍了线程的基本概念,包括如何创建和管理线程,以及线程...

    java5 并发包 (concurrent)思维导图

    Java 5并发包(`java.util.concurrent`,简称`Concurrent`包)是Java平台中用于多线程编程的重要组成部分,它提供了丰富的并发工具类,极大地简化了在多线程环境下的编程工作。这个包的设计目标是提高并发性能,减少...

    JDK concurrent

    标题 "JDK concurrent" 指的是Java开发工具包(JDK)中的并发编程相关知识。并发编程是在多线程环境中同时执行多个任务的技术,它在现代计算机系统中至关重要,尤其是在多核处理器和高并发应用中。Java JDK提供了一...

    面试-Java一些常见面试题+题解之多线程开发-JavaConcurrent.zip

    在Java编程领域,多线程开发是不可...JavaConcurrent的深入理解和应用,不仅能提升代码质量,也是成为资深Java开发者的必经之路。在面试过程中,结合具体案例和实践经验,能够更全面地展示自己的技能和解决问题的能力。

    The java.util.concurrent Synchronizer Framework

    为了更好地支持并发编程,Java平台在J2SE 1.5版本中引入了`java.util.concurrent`包,这是一个包含了许多中级并发支持类的集合,通过Java社区过程(Java Community Process, JCP)的Java规范请求(Java ...

    java_util_concurrent中文版pdf

    综上所述,《Java Util Concurrent中文版》详尽解读了这些关键概念和工具,帮助开发者深入理解Java并发编程,提升程序的并发性能和稳定性。通过学习这本书,开发者可以更好地应对多线程环境下的挑战,写出更加高效、...

    java类库 java包

    Java 类库和包是Java编程语言的核心组成部分,它们提供了丰富的功能和工具,使得开发者能够构建复杂的应用程序。本文将深入探讨Java类库和包的...理解和掌握这些类库及包的用法,是成为一位合格Java程序员的关键步骤。

    Java_Concurrent_java并发工具包.pdf

    总的来说,深入学习Java并发工具包不仅可以提升对Java体系的理解,也有助于在实际工作中编写更高效、更可靠的并发代码。通过掌握J.U.C的API、理解其背后的硬件原理和软件思想,以及避开常见的并发陷阱,开发者能够更...

    Doug Lea, Concurrent Programming in Java Design Principles and Patterns

    在Java并发编程领域,Doug Lea是一位杰出的专家,他的工作对Java内存模型(JMM)和并发工具类(如java.util.concurrent包)的开发产生了深远影响。本书主要围绕以下几个关键知识点展开: 1. **并发基础**:首先,书...

Global site tag (gtag.js) - Google Analytics