- 浏览: 1595469 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
jsrgzhangzhiyong:
关于null值的转换还是感觉不太友好,就像 mapstruct ...
我也造了个轮子:BeanMapping(属性拷贝) -
he037:
a417930422 写道引用使用EPHEMERAL会引出一个 ...
基于zookeeper的分布式lock实现 -
seancheer:
qianshangding 写道首先节点启动后,尝试读取本地的 ...
zookeeper学习记录三(session,watcher,persit机制) -
雪夜归人:
您好,我想咨询一下,开源的canal都能支持mysql的哪些版 ...
Canal BinlogChange(mysql5.6) -
zhoudengyun:
copy 一份做记录,后续学习,请知悉
阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费
背景
最近一个月都在做项目,我主要负责分布式任务的调度的功能,需要实现一个分布式的授权控制。
具体的需求:
1. 首先管理员启动整个任务,并设置执行权限
2. 工作节点收到消息后就会创建对应的线程,并开始执行任务(任务都是由一个管理员进行分配)
3. 运行过程中管理员需要临时中断某个任务,需要设置一个互斥信号,此时对应的工作节点都需要被阻塞,注意不是完全销毁
分析
先抛开分布式通讯这一块,首先从单个jvm如何实现进行分析, 简单点来说:
在单jvm中就是两种线程,一个为manager,另一种为worker。1:n的对应关系,manager可以随时挂起worker的所有线程,而worker线程互不干扰。
咋一看,会觉得是一个比较典型的读写锁的应用场景,读写锁特性:
- 当读写锁是写加锁状态时, 在这个锁被解锁之前, 所有试图对这个锁加锁的线程都会被阻塞.
- 当读写锁在读加锁状态时, 所有试图以读模式对它进行加锁的线程都可以得到访问权, 但是如果线程希望以写模式对此锁进行加锁, 它必须直到知道所有的线程释放锁.
使用读写锁实现这样的功能会存在一个问题,就是对应的写锁是没有抢占权,比如当前有读锁未释放时,写锁一直会被阻塞。而项目的需求是,manager是个领导,它可以不用排队,随时打断你。
除此之外,整个worker线程操作会是一个跨方法,跨类的复杂实现。通过lock方式实现,异常稍微处理不好,很容易造成锁未释放,导致manager一直拿不到对应的锁操作。而且worker中本省会使用一些lock操作,容易造成死锁
总结一下:
- 需要的是一个类似于信号量的PV控制
- 具有的读写锁的,读线程可以不互相影响,写线程拥有最高的抢占权,可以不理会读线程是否在操作
- 支持线程中断 (worker线程需要允许cancel)
因此本文的互斥信号(BooleanMutex)就应运而生,它是信号量(Semaphore)的一个变种,加入了读锁的特性。比如在状态为1时可以一直得到响应,对应的P操作不会消费对应的资源
实现
基于jdk 1.5之后的concurrent的AQS,实现了一个自己的互斥信号控制。 A.Q.S的可以看我的另一篇文章:jdk中cocurrent下的AbstractQueuedSynchronizer理解记录
代码:
public class BooleanMutex { private Sync sync; public BooleanMutex() { sync = new Sync(); set(false); } public BooleanMutex(Boolean mutex) { sync = new Sync(); set(mutex); } /** * 阻塞等待Boolean为true * * @throws InterruptedException */ public void get() throws InterruptedException { sync.innerGet(); } /** * 阻塞等待Boolean为true,允许设置超时时间 * * @param timeout * @param unit * @throws InterruptedException * @throws TimeoutException */ public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { sync.innerGet(unit.toNanos(timeout)); } /** * 重新设置对应的Boolean mutex * * @param mutex */ public void set(Boolean mutex) { if (mutex) { sync.innerSetTrue(); } else { sync.innerSetFalse(); } } public boolean state() { return sync.innerState(); } /** * Synchronization control for BooleanMutex. Uses AQS sync state to * represent run status */ private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -7828117401763700385L; /** State value representing that TRUE */ private static final int TRUE = 1; /** State value representing that FALSE */ private static final int FALSE = 2; private boolean isTrue(int state) { return (state & TRUE) != 0; } /** * 实现AQS的接口,获取共享锁的判断 */ protected int tryAcquireShared(int state) { // 如果为true,直接允许获取锁对象 // 如果为false,进入阻塞队列,等待被唤醒 return isTrue(getState()) ? 1 : -1; } /** * 实现AQS的接口,释放共享锁的判断 */ protected boolean tryReleaseShared(int ignore) { //始终返回true,代表可以release return true; } boolean innerState() { return isTrue(getState()); } void innerGet() throws InterruptedException { acquireSharedInterruptibly(0); } void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); } void innerSetTrue() { for (;;) { int s = getState(); if (s == TRUE) { return; //直接退出 } if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作 releaseShared(0);//释放一下锁对象,唤醒一下阻塞的Thread } } } void innerSetFalse() { for (;;) { int s = getState(); if (s == FALSE) { return; //直接退出 } if (compareAndSetState(s, FALSE)) {//cas更新状态,避免并发更新false操作 setState(FALSE); } } } } }
代码其实还是挺简单的,主要是对AQS的一份扩展实现。 对应的javadoc和使用说明:
简单测试代码:
@Test public void test_init_true() { BooleanMutex mutex = new BooleanMutex(true); try { mutex.get(); //不会被阻塞 } catch (InterruptedException e) { want.fail(); } } @Test public void test_init_false() { final BooleanMutex mutex = new BooleanMutex(false); try { final CountDownLatch count = new CountDownLatch(1); ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(new Callable() { public Object call() throws Exception { Thread.sleep(1000); mutex.set(true); count.countDown(); return null; } }); mutex.get(); //会被阻塞,等异步线程释放锁对象 count.await(); executor.shutdown(); } catch (InterruptedException e) { want.fail(); } } @Test public void test_concurrent_true() { try { final BooleanMutex mutex = new BooleanMutex(true); final CountDownLatch count = new CountDownLatch(10); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executor.submit(new Callable() { public Object call() throws Exception { mutex.get(); count.countDown(); return null; } }); } count.await(); executor.shutdown(); } catch (InterruptedException e) { want.fail(); } } @Test public void test_concurrent_false() { try { final BooleanMutex mutex = new BooleanMutex(false);//初始为false final CountDownLatch count = new CountDownLatch(10); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executor.submit(new Callable() { public Object call() throws Exception { mutex.get();//被阻塞 count.countDown(); return null; } }); } Thread.sleep(1000); mutex.set(true); count.await(); executor.shutdown(); } catch (InterruptedException e) { want.fail(); } }
总结
- jdk中的A.Q.S代码还是非常精悍的,可以多多善于利用
- 单机版的互斥控制只是整个需求的第一步,会另起文章介绍整个分布式任务调度这一块,主要是基于zookeeper
评论
3 楼
happyfish356
2015-09-29
happyfish356 写道
说实话,开始看了你的描述,有点被唬倒了
一看实现,你这个也没实现带优先级的抢占....
我的意思是设置为TRUE的Manager级线程从你代码中并没有看到比Work线程优先级高.因为多线程中的背后都是CAS,本质上是没有优先级的
一看实现,你这个也没实现带优先级的抢占....
我的意思是设置为TRUE的Manager级线程从你代码中并没有看到比Work线程优先级高.因为多线程中的背后都是CAS,本质上是没有优先级的
你现在的这个最多算个NoFair的队列,没有什么优先级先后一说.
2 楼
happyfish356
2015-09-29
说实话,开始看了你的描述,有点被唬倒了
一看实现,你这个也没实现带优先级的抢占....
我的意思是设置为TRUE的Manager级线程从你代码中并没有看到比Work线程优先级高.因为多线程中的背后都是CAS,本质上是没有优先级的
一看实现,你这个也没实现带优先级的抢占....
我的意思是设置为TRUE的Manager级线程从你代码中并没有看到比Work线程优先级高.因为多线程中的背后都是CAS,本质上是没有优先级的
1 楼
beneo
2011-09-30
操,这个代码得好好看下,我先MARK下
发表评论
-
yugong QuickStart
2016-03-05 01:52 0几点说明 a. 数据迁移的方案可参见设计文档,oracl ... -
阿里巴巴开源项目: 阿里巴巴去Oracle数据迁移同步工具
2016-03-05 18:29 6509背景 08年左右,阿里巴巴开始尝试MySQL的相关 ... -
愚公performance
2016-03-02 17:29 0性能测试 全量测试 场景1 (单主键, ... -
yugong AdminGuide
2016-03-02 16:40 0环境要求 操作系统 数据库 迁移方案 部署 ... -
Tddl_hint
2014-01-27 13:52 0背景 工作原理 Hint格式 direct模 ... -
tddl5分库规则
2014-01-26 14:41 0背景 工作原理 构建语法树 元数据 基于 ... -
tddl5优化器
2014-01-22 15:12 0背景 工作原理 构建语法树 元数据 抽象语 ... -
Canal BinlogChange(mariadb5/10)
2014-01-20 17:25 4587背景 先前开源了一个 ... -
asynload quickstart
2013-10-08 22:49 0几点说明: 1. asyncload是做为一个j ... -
映射规则配置
2013-09-26 11:25 0背景 因为alibaba的特殊业务,比如: 同 ... -
网友文档贡献
2013-09-18 15:50 01. Otter源代码解析系列 链接:http://e ... -
Manager配置介绍
2013-09-16 13:00 0通道配置说明 多种同步方式配置 a. 单向同步 ... -
canal&otter FAQ
2013-09-05 17:30 0常见问题 1. canal和 ... -
阿里巴巴开源项目:分布式数据库同步系统otter(解决中美异地机房)
2013-08-22 16:48 40433项目背景 阿里巴巴B2B公司,因为业务的特性 ... -
Otter AdminGuide
2013-08-19 11:06 0几点说明 otter系统自带了manager,所以简化了一 ... -
Otter高可用性
2013-08-17 23:41 0基本需求 网络不可靠,异地机房尤为明显. man ... -
Otter数据一致性
2013-08-17 23:39 0技术选型分析 需要处理一致性的业务场景: 多地修改 ( ... -
Otter扩展性
2013-08-17 22:20 0扩展性定义 按照实现不同,可分为两类: 数据处理自定 ... -
Otter双向回环控制
2013-08-17 21:37 0基本需求 支持mysql/oracle的异构数据库的双 ... -
Otter调度模型
2013-08-17 20:13 0背景 在介绍调度模型之前,首先了解一下otter系统要解 ...
相关推荐
重点在于ReentrantLock的分析,它是基于AQS实现的互斥锁。相比synchronized,它提供了更多特性,如手动加锁解锁,以及公平和非公平锁的选择。公平锁(FairSync)和非公平锁(NonfairSync)的实现是通过AQS的扩展来...
AQS的设计和实现是基于管程技术的,管程是解决同步和互斥问题的有效方法。 管程技术是在操作系统中解决同步和互斥问题的常用方法。信号量机制是解决同步和互斥问题的另外一种方法,但是信号量机制的操作分散在各个...
基于AQS实现高性能连接池。 提供failover和failfast两种高可用策略。 支持同步和异步回调两种机制。 提供接口方法请求时间、tps等监控信息。 提供和自定义服务端过载保护策略。 jmh基准测试结果 运行基准测试步骤: ...
AQS通过维护一个FIFO的等待队列来管理线程的同步状态,它提供了一种抽象的方式来实现独占和共享的资源控制,如ReentrantLock、Semaphore等都是基于AQS构建的。在本文中,我们将深入探讨AQS的工作原理及其在实际项目...
AQS,全称为AbstractQueuedSynchronizer,是一个抽象类,为构建实现阻塞锁和相关同步器(如信号量、事件等)提供了一种基础框架。它内部基于一个FIFO(先进先出)的等待队列来管理线程的同步状态。AQS的设计理念是将...
在Java并发API中,如ReentrantLock、Semaphore、CountDownLatch等类都基于AQS实现,利用其内置的等待队列和状态管理机制,高效地实现了锁、信号量和计数器等功能。 总结,AQS是Java并发编程的精华所在,它通过抽象...
在Java.util.concurrent包中,许多并发工具类如ReentrantLock、Semaphore、CountDownLatch等都基于AQS实现。 AQS的核心概念是基于一个整型的state变量,它表示了资源的状态。当state为0时,表示资源没有被占用;非0...
通过学习AQS,开发者不仅能够理解`ReentrantLock`和`CountDownLatch`的工作方式,还能进一步掌握如`ReentrantReadWriteLock`(读写锁)、`Semaphore`(信号量)等其他同步工具的实现原理。掌握AQS的使用,意味着具备了...
在Java并发编程中,`AbstractQueuedSynchronizer`(简称AQS)是实现锁和其他同步工具的基础框架。AQS位于`java.util.concurrent`包下,通过模板方法设计模式实现了锁的底层机制。本文将详细解析AQS中的关键方法以及...
AQS的实现主要分为两种模式:独占模式和共享模式。在独占模式下,只有一个线程可以执行,比如ReentrantLock。在共享模式下,多个线程可以同时执行,比如Semaphore、CountDownLatch。AQS通过模板方法的设计模式,将...
本文将基于JDK源码解析Java领域中的并发锁,探讨AQS基础同步器、LockSupport、Condition接口、Lock接口、ReadWriteLock接口以及自定义API操作的设计与实现。 一、AQS(AbstractQueuedSynchronizer)基础同步器的...
- **实现**:基于AQS的公平锁FairSync和非公平锁NonfairSync,信号量的值表示可以并发访问的许可证数量。 8. **并发编程设计原则**: - "不要通过内存共享来实现通信,而应该通过通信来实现内存共享"。这一原则...
3. **锁获取与释放**:AQS提供了一套基于CAS(Compare And Swap)的原子操作来实现锁的获取与释放。`tryAcquire()`用于尝试获取锁,如果当前线程能够获得锁,它会更新`state`并返回true,否则返回false。`tryRelease...
java锁AQS基础逻辑
虽然 CyclicBarrier 的实现不是直接基于 AQS,但它通过 ReentrantLock 实现了类似的功能,而 ReentrantLock 的底层实现正是 AQS。CyclicBarrier 内部使用 ReentrantLock 的 Condition 机制来实现线程间的同步。 ###...
ReentrantLock是一种基于AQS框架实现的可重入锁。它可以显式地控制锁的获取和释放,并提供了公平性和非公平性两种获取策略。此外,ReentrantLock还支持尝试获取锁的功能,即在指定时间内尝试获取锁而不阻塞。 #### ...