`
agapple
  • 浏览: 1597814 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于AQS实现互斥信号(BooleanMutex)

 
阅读更多

背景

  最近一个月都在做项目,我主要负责分布式任务的调度的功能,需要实现一个分布式的授权控制。

  具体的需求:

  1.  首先管理员启动整个任务,并设置执行权限

  2.  工作节点收到消息后就会创建对应的线程,并开始执行任务(任务都是由一个管理员进行分配)

  3.  运行过程中管理员需要临时中断某个任务,需要设置一个互斥信号,此时对应的工作节点都需要被阻塞,注意不是完全销毁

分析

 先抛开分布式通讯这一块,首先从单个jvm如何实现进行分析, 简单点来说:

 在单jvm中就是两种线程,一个为manager,另一种为worker。1:n的对应关系,manager可以随时挂起worker的所有线程,而worker线程互不干扰。

 

 咋一看,会觉得是一个比较典型的读写锁的应用场景,读写锁特性:

  • 当读写锁是写加锁状态时, 在这个锁被解锁之前, 所有试图对这个锁加锁的线程都会被阻塞.
  • 当读写锁在读加锁状态时, 所有试图以读模式对它进行加锁的线程都可以得到访问权, 但是如果线程希望以写模式对此锁进行加锁, 它必须直到知道所有的线程释放锁.

使用读写锁实现这样的功能会存在一个问题,就是对应的写锁是没有抢占权,比如当前有读锁未释放时,写锁一直会被阻塞。而项目的需求是,manager是个领导,它可以不用排队,随时打断你。

除此之外,整个worker线程操作会是一个跨方法,跨类的复杂实现。通过lock方式实现,异常稍微处理不好,很容易造成锁未释放,导致manager一直拿不到对应的锁操作。而且worker中本省会使用一些lock操作,容易造成死锁

 

总结一下: 

  1. 需要的是一个类似于信号量的PV控制
  2. 具有的读写锁的,读线程可以不互相影响,写线程拥有最高的抢占权,可以不理会读线程是否在操作
  3. 支持线程中断 (worker线程需要允许cancel)
因此本文的互斥信号(BooleanMutex)就应运而生,它是信号量(Semaphore)的一个变种,加入了读锁的特性。比如在状态为1时可以一直得到响应,对应的P操作不会消费对应的资源

实现

  基于jdk 1.5之后的concurrent的AQS,实现了一个自己的互斥信号控制。 A.Q.S的可以看我的另一篇文章:jdk中cocurrent下的AbstractQueuedSynchronizer理解记录

 

  代码:

 

public class BooleanMutex {

    private Sync sync;

    public BooleanMutex() {
        sync = new Sync();
        set(false);
    }

    public BooleanMutex(Boolean mutex) {
        sync = new Sync();
        set(mutex);
    }

    /**
     * 阻塞等待Boolean为true
     * 
     * @throws InterruptedException
     */
    public void get() throws InterruptedException {
        sync.innerGet();
    }

    /**
     * 阻塞等待Boolean为true,允许设置超时时间
     * 
     * @param timeout
     * @param unit
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        sync.innerGet(unit.toNanos(timeout));
    }

    /**
     * 重新设置对应的Boolean mutex
     * 
     * @param mutex
     */
    public void set(Boolean mutex) {
        if (mutex) {
            sync.innerSetTrue();
        } else {
            sync.innerSetFalse();
        }
    }

    public boolean state() {
        return sync.innerState();
    }

    /**
     * Synchronization control for BooleanMutex. Uses AQS sync state to
     * represent run status
     */
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;

        /** State value representing that TRUE */
        private static final int  TRUE             = 1;
        /** State value representing that FALSE */
        private static final int  FALSE            = 2;

        private boolean isTrue(int state) {
            return (state & TRUE) != 0;
        }

        /**
         * 实现AQS的接口,获取共享锁的判断
         */
        protected int tryAcquireShared(int state) {
            // 如果为true,直接允许获取锁对象
            // 如果为false,进入阻塞队列,等待被唤醒
            return isTrue(getState()) ? 1 : -1;
        }

        /**
         * 实现AQS的接口,释放共享锁的判断
         */
        protected boolean tryReleaseShared(int ignore) {
            //始终返回true,代表可以release
            return true;
        }

        boolean innerState() {
            return isTrue(getState());
        }

        void innerGet() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
        }

        void innerSetTrue() {
            for (;;) {
                int s = getState();
                if (s == TRUE) {
                    return; //直接退出
                }
                if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作
                    releaseShared(0);//释放一下锁对象,唤醒一下阻塞的Thread
                }
            }
        }

        void innerSetFalse() {
            for (;;) {
                int s = getState();
                if (s == FALSE) {
                    return; //直接退出
                }
                if (compareAndSetState(s, FALSE)) {//cas更新状态,避免并发更新false操作
                    setState(FALSE);
                }
            }
        }

    }
}

 

 

代码其实还是挺简单的,主要是对AQS的一份扩展实现。 对应的javadoc和使用说明:


 

 

简单测试代码:

 

@Test
    public void test_init_true() {
        BooleanMutex mutex = new BooleanMutex(true);
        try {
            mutex.get(); //不会被阻塞
        } catch (InterruptedException e) {
            want.fail();
        }
    }

    @Test
    public void test_init_false() {
        final BooleanMutex mutex = new BooleanMutex(false);
        try {
            final CountDownLatch count = new CountDownLatch(1);
            ExecutorService executor = Executors.newCachedThreadPool();

            executor.submit(new Callable() {

                public Object call() throws Exception {
                    Thread.sleep(1000);
                    mutex.set(true);
                    count.countDown();
                    return null;
                }
            });

            mutex.get(); //会被阻塞,等异步线程释放锁对象
            count.await();
            executor.shutdown();
        } catch (InterruptedException e) {
            want.fail();
        }
    }

    @Test
    public void test_concurrent_true() {
        try {
            final BooleanMutex mutex = new BooleanMutex(true);
            final CountDownLatch count = new CountDownLatch(10);
            ExecutorService executor = Executors.newCachedThreadPool();

            for (int i = 0; i < 10; i++) {
                executor.submit(new Callable() {

                    public Object call() throws Exception {
                        mutex.get();
                        count.countDown();
                        return null;
                    }
                });
            }
            count.await();
            executor.shutdown();
        } catch (InterruptedException e) {
            want.fail();
        }
    }

    @Test
    public void test_concurrent_false() {
        try {
            final BooleanMutex mutex = new BooleanMutex(false);//初始为false
            final CountDownLatch count = new CountDownLatch(10);
            ExecutorService executor = Executors.newCachedThreadPool();

            for (int i = 0; i < 10; i++) {
                executor.submit(new Callable() {

                    public Object call() throws Exception {
                        mutex.get();//被阻塞
                        count.countDown();
                        return null;
                    }
                });
            }
            Thread.sleep(1000);
            mutex.set(true);
            count.await();
            executor.shutdown();
        } catch (InterruptedException e) {
            want.fail();
        }
    }

总结

 

  1. jdk中的A.Q.S代码还是非常精悍的,可以多多善于利用
  2. 单机版的互斥控制只是整个需求的第一步,会另起文章介绍整个分布式任务调度这一块,主要是基于zookeeper

 

  • 大小: 55.6 KB
分享到:
评论
3 楼 happyfish356 2015-09-29  
happyfish356 写道
说实话,开始看了你的描述,有点被唬倒了
一看实现,你这个也没实现带优先级的抢占....
我的意思是设置为TRUE的Manager级线程从你代码中并没有看到比Work线程优先级高.因为多线程中的背后都是CAS,本质上是没有优先级的



你现在的这个最多算个NoFair的队列,没有什么优先级先后一说.
2 楼 happyfish356 2015-09-29  
说实话,开始看了你的描述,有点被唬倒了
一看实现,你这个也没实现带优先级的抢占....
我的意思是设置为TRUE的Manager级线程从你代码中并没有看到比Work线程优先级高.因为多线程中的背后都是CAS,本质上是没有优先级的

1 楼 beneo 2011-09-30  
操,这个代码得好好看下,我先MARK下

相关推荐

    Java并发编程:深入解析抽象队列同步器(AQS)及其在Lock中的应用

    重点在于ReentrantLock的分析,它是基于AQS实现的互斥锁。相比synchronized,它提供了更多特性,如手动加锁解锁,以及公平和非公平锁的选择。公平锁(FairSync)和非公平锁(NonfairSync)的实现是通过AQS的扩展来...

    Java并发编程解析 | 解析AQS基础同步器的设计与实现

    AQS的设计和实现是基于管程技术的,管程是解决同步和互斥问题的有效方法。 管程技术是在操作系统中解决同步和互斥问题的常用方法。信号量机制是解决同步和互斥问题的另外一种方法,但是信号量机制的操作分散在各个...

    eagle:Eagle分布式rpc调用,借助Zookeeper实现服务注册和发现,基于AQS实现高性能连接池,支持分布式追踪、监控、过载保护等配置。提供Spring和SpringBoot插件,方便与Spring和SpringBoot集成

    基于AQS实现高性能连接池。 提供failover和failfast两种高可用策略。 支持同步和异步回调两种机制。 提供接口方法请求时间、tps等监控信息。 提供和自定义服务端过载保护策略。 jmh基准测试结果 运行基准测试步骤: ...

    aqs_demo.rar

    AQS通过维护一个FIFO的等待队列来管理线程的同步状态,它提供了一种抽象的方式来实现独占和共享的资源控制,如ReentrantLock、Semaphore等都是基于AQS构建的。在本文中,我们将深入探讨AQS的工作原理及其在实际项目...

    aqs_java_

    AQS,全称为AbstractQueuedSynchronizer,是一个抽象类,为构建实现阻塞锁和相关同步器(如信号量、事件等)提供了一种基础框架。它内部基于一个FIFO(先进先出)的等待队列来管理线程的同步状态。AQS的设计理念是将...

    Java并发之AQS详解.pdf

    Java并发之AQS详解 ...AQS 提供了一套灵活的同步器框架,用户可以通过实现 tryAcquire-tryRelease 和 tryAcquireShared-tryReleaseShared 方法来定义自己的同步器,从而实现多线程访问共享资源的同步控制。

    AQS的底层原理.zip

    在Java并发API中,如ReentrantLock、Semaphore、CountDownLatch等类都基于AQS实现,利用其内置的等待队列和状态管理机制,高效地实现了锁、信号量和计数器等功能。 总结,AQS是Java并发编程的精华所在,它通过抽象...

    3.1.4.AQS底层原理分析1

    在Java.util.concurrent包中,许多并发工具类如ReentrantLock、Semaphore、CountDownLatch等都基于AQS实现。 AQS的核心概念是基于一个整型的state变量,它表示了资源的状态。当state为0时,表示资源没有被占用;非0...

    深入学习Java同步机制中的底层实现

    通过学习AQS,开发者不仅能够理解`ReentrantLock`和`CountDownLatch`的工作方式,还能进一步掌握如`ReentrantReadWriteLock`(读写锁)、`Semaphore`(信号量)等其他同步工具的实现原理。掌握AQS的使用,意味着具备了...

    AQS源码分析 (1).pdf

    AQS的实现主要分为两种模式:独占模式和共享模式。在独占模式下,只有一个线程可以执行,比如ReentrantLock。在共享模式下,多个线程可以同时执行,比如Semaphore、CountDownLatch。AQS通过模板方法的设计模式,将...

    JDK_AQS解析

    在Java并发编程中,`AbstractQueuedSynchronizer`(简称AQS)是实现锁和其他同步工具的基础框架。AQS位于`java.util.concurrent`包下,通过模板方法设计模式实现了锁的底层机制。本文将详细解析AQS中的关键方法以及...

    基于JDK源码解析Java领域中的并发锁之设计与实现.pdf

    本文将基于JDK源码解析Java领域中的并发锁,探讨AQS基础同步器、LockSupport、Condition接口、Lock接口、ReadWriteLock接口以及自定义API操作的设计与实现。 一、AQS(AbstractQueuedSynchronizer)基础同步器的...

    基于JDK源码解析Java领域中的并发锁,我们需要特别关注哪些内容?

    - **实现**:基于AQS的公平锁FairSync和非公平锁NonfairSync,信号量的值表示可以并发访问的许可证数量。 8. **并发编程设计原则**: - "不要通过内存共享来实现通信,而应该通过通信来实现内存共享"。这一原则...

    一文带你看懂Java中的Lock锁底层AQS到底是如何实现的.doc

    3. **锁获取与释放**:AQS提供了一套基于CAS(Compare And Swap)的原子操作来实现锁的获取与释放。`tryAcquire()`用于尝试获取锁,如果当前线程能够获得锁,它会更新`state`并返回true,否则返回false。`tryRelease...

    AQS流程图.html

    java锁AQS基础逻辑

    CountDownLatch 和 CyclicBarrier 的运用(含AQS详解)

    虽然 CyclicBarrier 的实现不是直接基于 AQS,但它通过 ReentrantLock 实现了类似的功能,而 ReentrantLock 的底层实现正是 AQS。CyclicBarrier 内部使用 ReentrantLock 的 Condition 机制来实现线程间的同步。 ###...

    7、深入理解AQS独占锁之ReentrantLock源码分析(1).pdf

    ReentrantLock是一种基于AQS框架实现的可重入锁。它可以显式地控制锁的获取和释放,并提供了公平性和非公平性两种获取策略。此外,ReentrantLock还支持尝试获取锁的功能,即在指定时间内尝试获取锁而不阻塞。 #### ...

Global site tag (gtag.js) - Google Analytics