`

spring boot 利用redisson实现redis的分布式锁(二)

 
阅读更多

 接上篇 http://liaoke0123.iteye.com/blog/2375469

 

有的朋友可能会问,如果一个获得一个锁,执行一个耗时任务,耗时任务耗时大于锁默认的放锁时间,那么会怎么样。

 

redisson其实已经自动放锁了,避免饿死锁。

 

在超时上,我们业务也能不允许,所以我添加了fallback策略。

 

同样的分布式锁回调接口

 

 

package com.example.demo.redis2;

import javax.persistence.Transient;

/**
 * 分布式锁回调接口
 *
 * @author lk
 */
public interface DistributedLockCallback<T> {

    /**
     * 调用者必须在此方法中实现需要加分布式锁的业务逻辑
     *
     * @return
     */
    public T process();

    /**
     * 调用者业务
     * 当超时的时候业务降级
     * 与process一同使用
     */
    public T fallback();

    /**
     * 得到分布式锁名称
     *
     * @return
     */
    public String getLockName();
}

 可以看见我新建了fallback接口

 

 

package com.example.demo.redis2.service;

import com.example.demo.redis2.DistributedLockCallback;
import com.example.demo.redis2.DistributedLockTemplate;
import com.example.demo.redis2.dao.TestEntityRepository;
import com.example.demo.redis2.entity.TestEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.transaction.Transactional;

/**
 * Created by LiaoKe on 2017/5/22.
 */
@Service
public class AsyncService {

    @Resource
    TestEntityRepository ts;


    @Resource
    DistributedLockTemplate distributedLockTemplate;

    /**
     * 加锁
     */
    @Async
    @Transactional
    public void addAsync(){
        distributedLockTemplate.lock(new DistributedLockCallback<Object>(){
            @Override
            public Object process() {
                add();
                return null;
            }

            @Override
            public Object fallback() {
                reduce();
                return null;
            }

            @Override
            public String getLockName() {
                return "MyLock";
            }
        });
    }

    /**
     * 未加锁
     */
    @Async
    public void addNoAsync(){
        add();
    }

    /**
     * 测试异步方法
     * 在不加分布式锁的情况下
     * num数目会混乱
     */
    @Async
    public void add(){
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if(ts.findAll().size()==0){
            TestEntity  t  =  new TestEntity();
            t.setNum(1);
            ts.saveAndFlush(t);
        }else{
            TestEntity dbt = ts.findAll().get(0);
            dbt.setNum(dbt.getNum()+1);
            ts.saveAndFlush(dbt);
        }
    }

    /**
     * fallback
     */
    @Async
    private void reduce(){
        if(ts.findAll().size()==0){
            TestEntity  t  =  new TestEntity();
            t.setNum(1);
            ts.saveAndFlush(t);
        }else{
            TestEntity dbt = ts.findAll().get(0);
            dbt.setNum(dbt.getNum()-1);
            ts.saveAndFlush(dbt);
        }
    }


}
 

 fallback实现了对业务超时的进行的回退,数目减1,并且在方法上加上了@Transactional 事物注解,以防止在fallback发生异常,但是数目缺+1

 

由于我们使用的默认锁设置,超时是5秒,为了模拟超时,我在add()方法中让线程暂停了6秒。我们来看效果。

 

package com.example.demo.redis2;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

import javax.persistence.Transient;
import java.util.concurrent.TimeUnit;

/**
 * Single Instance mode 分布式锁模板
 *
 * @author lk
 */
public class SingleDistributedLockTemplate implements DistributedLockTemplate {
    private static final long     DEFAULT_TIMEOUT   = 5;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

    private RedissonClient    redisson;

    public SingleDistributedLockTemplate() {
    }

    public SingleDistributedLockTemplate(RedissonClient redisson) {
        this.redisson = redisson;
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback) {
        return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT);
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit) {
        RLock lock = null;
        try {
            System.out.println("获取锁.......");
            lock = redisson.getLock(callback.getLockName());
            lock.lock(leaseTime, timeUnit);
            T d = callback.process();
            return d;
        } finally {
            if (lock != null) {
                if(!lock.isHeldByCurrentThread()){
                    System.out.println("超时自动放锁.......");
                    callback.fallback();
                }else{
                    System.out.println("释放锁.......");
                    lock.unlock();
                }
            }

        }
    }

    public void setRedisson(RedissonClient redisson) {
        this.redisson = redisson;
    }

}

 

 

同样的url(见上篇文章)访问结果如下

 

 可见数据库并未+1 ,fallback策略成功。

 

事物是必须加的,我们现在来模拟fallback失败

 

  /**
     * fallback
     */
    @Async
    private void reduce(){
        int i = 5 /0 ;
        if(ts.findAll().size()==0){
            TestEntity  t  =  new TestEntity();
            t.setNum(1);
            ts.saveAndFlush(t);
        }else{
            TestEntity dbt = ts.findAll().get(0);
            dbt.setNum(dbt.getNum()-1);
            ts.saveAndFlush(dbt);
        }
        
    }

 加上了一个runtimeException

 

并且去掉@Trabsactional

我们来看结果



 

 



 但是数据库中数据却增加了,显示是不行的。

现在我们加上事务注解


可见在事务下,连第一次+1都没有提交,我们事务策略成功

 

下面附上demo
 

 

  • 大小: 12.1 KB
  • 大小: 11.2 KB
  • 大小: 17.3 KB
  • 大小: 11.6 KB
  • 大小: 11.9 KB
分享到:
评论
1 楼 CodeMonkey_Xiong 2017-06-20  
      

相关推荐

Global site tag (gtag.js) - Google Analytics