`
征途2010
  • 浏览: 247926 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

分布式锁解决方案

    博客分类:
  • java
阅读更多

下面说一下分布式实现的几种方式:

一、数据库悲观锁

 所谓的悲观锁:顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次拿数据的时候都会上锁。这样别人拿数据的时候就要等待直到锁的释放。

这里是采用oracle的 select  ......  where id=1 for update 来实现分布式锁,建议加上nowait,或者wait 以及 of

下面是demo:

 

select * from table where id=1 for update nowait;//当锁被占用,不等待直接报错
select * from table where id=1 for update wait 6;//当锁被占用,等待6s
select * from table where id=1 for update of columns nowait;//锁定执行的列,反之则锁所有列。

 该方案,在高并发时显然不适用,依赖于数据库的性能以及锁机制,会造成锁无法释放。

 

 

二、数据库乐观锁

所谓的乐观锁:就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。一般的方案都是加一个版本号字段(version),在查询数据时将版本号带出来,更新后将版本号+1,如果版本号一致才更新,并获取影响行数,如果没更新则报错。

 

三、redis的setnx

由于redis是单线程工作的,所以它存取key-value 的时候是单线程工作的,并且多个线程请求redis并不存在竞争问题。所以可以设置一个标识来作为一把锁,只有获取了该锁之后,才能对共享的资源进行操作,没有拿到锁的线程处于不断去取锁的状态,直到等到上一个线程释放锁(即后一个线程可以取到锁)或者超过规定超时时间不再取锁。

 

import com.test.core.base.utils.ApplicationUtil;
import com.test.redis.api.RedisStringOperationService;

/**
 * redis分布式锁
 * @author LIPENG
 * @date 2017年9月22日 下午4:25:56
 * @version V1.0
 */
public class RedisDistributionLock {
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;
    /**
     * Lock key path.
     */
    private String lockKey;

    /**
     * 锁超时时间,防止线程在入锁以后,无限的执行等待
     */
    private int expireMsecs = 60 * 1000;

    /**
     * 锁等待时间,防止线程饥饿
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    /**
     * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
     *
     * @param lockKey lock key (ex. account:1, ...)
     */
    public RedisDistributionLock(String lockKey) {
        this.lockKey = lockKey + "_lock";
    }

    /**
     * Detailed constructor with default lock expiration of 60000 msecs.
     *
     */
    public RedisDistributionLock(String lockKey, int timeoutMsecs) {
        this(lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * Detailed constructor.
     *
     */
    public RedisDistributionLock(String lockKey, int timeoutMsecs, int expireMsecs) {
        this(lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    /**
     * @return lock key
     */
    public String getLockKey() {
        return lockKey;
    }



    /**
     * 获得 lock.
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            if (getRedisStringOperationService().setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = getRedisStringOperationService().get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = getRedisStringOperationService().getAndSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }


    /**
     * 释放锁
     */
    public synchronized void unlock() {
        if (locked) {
        	getRedisStringOperationService().delete(lockKey);
            locked = false;
        }
    }
    
    private RedisStringOperationService getRedisStringOperationService(){
    	return	ApplicationUtil.getBean(RedisStringOperationService.class);
    }

}

 

 

调用方式:

 

RedisDistributionLock lock = new RedisDistributionLock("key", 10000, 20000);
		try {
			if (lock.lock()) {
				// 需要加锁的代码
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}

 

 

四、使用zookeeper

当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是: 
1.建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT) 
2.每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。 
3.在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。 
4.假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。 
5.当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。 
实现的分布式锁是严格的按照顺序访问的并发锁。

 

 

package cn.wpeace.zktest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
 * @author peace
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//根
    private String lockName;//竞争资源的标志
    private String waitNode;//等待前一个锁
    private String myZnode;//当前锁
    private CountDownLatch latch;//计数器
    private CountDownLatch connectedSignal=new CountDownLatch(1);
    private int sessionTimeout = 30000; 
    /**
     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
     * @param config 192.168.1.127:2181
     * @param lockName 竞争资源标志,lockName中不能包含单词_lock_
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        // 创建一个与服务器的连接
         try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            connectedSignal.await();
            Stat stat = zk.exists(root, false);//此去不执行 Watcher
            if(stat == null){
                // 创建根节点
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            }
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
    /**
     * zookeeper节点的监视器
     */
    public void process(WatchedEvent event) {
        //建立连接用
        if(event.getState()==KeeperState.SyncConnected){
            connectedSignal.countDown();
            return;
        }
        //其他线程放弃锁的标志
        if(this.latch != null) {  
            this.latch.countDown();  
        }
    }

    public void lock() {   
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待锁
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //创建临时子节点
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出所有子节点
            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);

            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的节点,则表示取得锁
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                return true;
            }
            //如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);//同时注册监听。
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁
            this.latch = null;
        }
        return true;
    }
    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }
    public Condition newCondition() {
        return null;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

 

 

 调用方法:

 DistributedLock lock   = new DistributedLock("192.168.1.127:2181","lock");
 lock.lock();
 //共享资源
 if(lock != null)
  lock.unlock();

 

分享到:
评论

相关推荐

    超卖和分布式锁解决方案.docx

    ### 超卖与分布式锁解决方案 #### 一、背景介绍 随着互联网技术的发展,高并发场景变得越来越普遍,尤其是一些大型电商网站在促销活动中的“秒杀”环节更是吸引了大量的用户在同一时间进行访问。这不仅导致了瞬间...

    分布式锁解决方案.docx

    ### 分布式锁解决方案 #### 一、基于数据库实现分布式锁 在分布式系统中,当多个服务实例需要访问同一份共享资源时,就需要通过某种机制来确保这些操作不会发生冲突,这种机制通常被称为“分布式锁”。本文档介绍...

    分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案.zip

    分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案分布式锁与信号量微服务架构中基于MQ的分布式事务解决方案分布式锁与信号量微服务架构中基于MQ的...

    分布式锁,基于Netty长连接实现,自定义协议,高性能锁

    综上所述,这个基于Netty的分布式锁系统融合了高性能网络通信、自定义协议设计、公平的锁竞争策略、服务异常感知以及优雅关闭等多个关键技术,旨在提供一个高效、稳定、可靠的分布式锁解决方案,适用于大规模分布式...

    基于Spring Boot框架的分布式锁服务.zip

    本项目是一个基于Spring Boot框架的分布式锁服务,旨在提供一个高效、可靠的分布式锁解决方案。通过使用Redis作为分布式锁的存储和协调工具,项目实现了对分布式环境下资源访问的并发控制。 项目的主要特性和功能 ...

    用Redis实现分布式锁_redis_分布式_

    分布式锁是一种在分布式系统中协调多个...通过理解并实践这些概念,你可以为你的应用程序构建稳定且可靠的分布式锁解决方案。在提供的PPTX文件中,你将找到更详细的代码解析和示例,帮助你更好地理解和应用这些知识。

    SpringBoot 使用 Redis 分布式锁解决并发问题.docx

    在现代的分布式系统中,服务的高可用性和稳定性通常依赖于多副本部署。然而,这种方式也会引入并发控制问题,特别是当多个...在SpringBoot中,结合RedisTemplate和Lua脚本,我们可以构建高效、可靠的分布式锁解决方案。

    lock4j高性能分布式锁 v2.2.5.zip

    lock4j作为一款优秀的分布式锁解决方案,不仅具备高性能和高可用性,还提供了丰富的锁类型和策略,满足不同场景的需求。理解和掌握lock4j的使用,对于提升分布式系统的并发控制能力具有重要意义。在实际项目中,结合...

    基于Redis的分布式锁的实现方案.pdf

    本文讨论了分布式锁的实现方案,主要基于Redis实现分布式锁,以解决分布式系统中资源访问的同步问题。在分布式系统中,需要协调各个系统或主机之间的资源访问,以避免彼此干扰和保证一致性。分布式锁是控制分布式...

    Redis分布式锁存在的问题及解决方案(值得珍藏)

    本文将详细探讨这些问题及其解决方案。 首先,死锁问题是Redis分布式锁的一个常见挑战。当一个进程获取了锁并在执行业务逻辑时意外终止,锁就无法被释放,其他等待的进程会被永久阻塞。为了防止这种情况,我们需要...

    基于redis和lua脚本的分布式锁的实现

    “基于Redis和Lua脚本的分布式锁的实现” 基于Redis和Lua脚本的分布式锁的实现是使用Redis和Lua脚本来实现分布式锁的技术。...通过这些测试,可以证明Redis和Lua脚本的组合可以提供高性能的分布式锁解决方案。

    redis分布式锁使用实例.rar

    Redis 分布式锁是解决分布式系统中多个服务并发访问共享资源问题的一种常见方案。在分布式环境中,传统的单机锁无法满足需求,因为它们不能确保跨多个...同时,这也为你提供了设计和实现自己的分布式锁解决方案的基础。

    分布式锁原理讲解视频资料

    视频“David_6_13_分布式锁的前世今生.mp4”将详细讲解以上内容,并可能进一步探讨各种实现方式的优缺点,以及在实际项目中如何选择和优化分布式锁的方案。通过学习,你将对分布式锁有更深入的理解,能够灵活地运用...

    maven网约车-分布式锁Java代码

    在这个名为“maven网约车-分布式锁Java代码”的项目中,我们可以推测它提供了一套基于Java实现的分布式锁解决方案,专门针对网约车业务场景。 分布式锁的主要目标是协调分布在不同节点上的服务,使得在高并发环境下...

    基于Spring boot的声明式和编程式分布式锁.zip

    在Spring Boot中,常见的声明式分布式锁解决方案有Redisson、Spring Cache等。 - Redisson:它是一个全面的Redis客户端,提供了分布式锁、信号量、队列等多种功能。通过在方法上添加`@Lock`注解,可以轻松实现...

    zookeeper分布式锁实例源码

    在实践中,ZooKeeper的分布式锁解决方案需要考虑的方面还包括异常处理(如网络分区)、锁的超时机制、公平性策略(确保等待时间最长的线程优先获得锁)以及锁的粒度(细粒度锁可减少锁竞争,但会增加系统复杂性)。...

    redis实现分布式锁,自旋式加锁,lua原子性解锁

    在分布式系统中,为了保证数据的一致性和安全性,分布式锁是一种常见的解决方案。Redis作为一个高性能的键值存储系统,常被用作实现分布式锁的工具。本文将深入探讨如何使用Redis实现分布式锁,以及如何利用自旋式...

    如何实现靠谱的分布式锁技术.docx

    SharkLock作为一种新型的分布式锁解决方案,通过其独特的设计思路,在一定程度上解决了传统方案中存在的问题,提供了一个更加稳定和可靠的分布式锁服务。在实际应用中,开发者应当充分理解每种方案的特点,结合实际...

    Saga分布式事务解决方案与实践.docx

    Saga 分布式事务解决方案与实践 Saga 分布式事务解决方案是微服务架构中的一种解决方案,它可以解决微服务之间的事务一致性问题。在微服务架构中,每个服务都可以独立开发、独立演进、独立部署和独立团队,但这也...

    redis分布式锁.zip

    下面将详细介绍如何使用 Redis 实现分布式锁以及其中可能遇到的问题和解决方案。 首先,我们需要理解分布式锁的基本概念。分布式锁是一种在分布式系统中协调不同节点对共享资源访问的机制。它确保在任何时刻只有一...

Global site tag (gtag.js) - Google Analytics