`
wh0426
  • 浏览: 56177 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
Group-logo
架构师的知识与实践
浏览量:56177
社区版块
存档分类
最新评论

redis分布式内存锁:余量扣除示例

阅读更多

余量扣除,即在高并发,大用户下,每个用户的余量数据频繁发生变化。例如:12306的某车次票的余量,商品库存,短信余量账本等。

针对,此类频繁发生修改的原子类余量对象,采用mysql,oracle等数据,一定会存在操作瓶颈。本文拟采用内存的办法实现,使用redis+Redisson客户端完成。当然,或许可以采用mangodb这类no-sql数据库。

Redisson客户端

https://github.com/mrniko/redisson/wiki

实现redis分布锁的客户端开源项目,redission支持4中连接redis方式,分别为单机,主从, Sentinel , Cluster 集群,并提供以下类库

1.AtomicLong原子操作

2.分布式List

3.分布式Set

4.分布式Map

5.分布式Queue,

6.分布式SortedSet,

7.分布式ConcureentMap

8.分布式Lock

9.分布式CountDownLatch

10. 分布式Publish / Subscribe, HyperLogLog等

 



余量扣除代码片段

resources/redis.properties

 

#redis部署模式 SingleHost 1 MasterSlave 2 Sentinel 3  Cluster 4
redis.deploymentModel=2
redis.hosts=192.168.161.73:6379,192.168.161.129:6379
redis.masterName=mymaster
redis.masteAddress=192.168.161.73:6379



 

RedissonClient

 

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.redisson.Config;
import org.redisson.Redisson;
import org.redisson.connection.RandomLoadBalancer;
import org.redisson.core.RAtomicLong;
import org.redisson.core.RMap;


/**
 *分布式锁客户端
 * @author 
 *
 */

public class RedissonClient {
	private static RedissonClient instance;
	private RedissonClient(String filename) throws FileNotFoundException, IOException{
		init(filename);
 	}
	public static synchronized RedissonClient getInstance(String filename){
		if(instance==null){
			try {
				 instance=new  RedissonClient(filename);
			 } catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
  				e.printStackTrace();
			} catch (IOException e) {
				 // TODO Auto-generated catch block
				 e.printStackTrace();
			}
 		}
 		return instance;	
	 }
	 public static void main(String[] args){
		RedissonClient client= RedissonClient.getInstance("resources/redis.properties");
 		Redisson redisson=client.getSingleClient("ip:6379");
		/*
		RMap<String, String> map = redisson.getMap("anyMap");
		String prevObject = map.put("123", new String());
		String currentObject = map.putIfAbsent("323", new String());
		String obj = map.remove("123");

		map.fastPut("321", new String());
		map.fastRemove("321");

		Future<String> putAsyncFuture = map.putAsync("321");
		Future<Void> fastPutAsyncFuture = map.fastPutAsync("321");

		map.fastPutAsync("321", new String());
		map.fastRemoveAsync("321");
		redisson.shutdown();
		*/
		
		/**
		 * Distributed Object storage example
		 * Redisson redisson = Redisson.create();

			RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
			bucket.set(new AnyObject());
			bucket.setAsync(new AnyObject());
			AnyObject obj = bucket.get();
			
			redisson.shutdown();
		 */
		
		/**Distributed Set example
		 Redisson redisson = Redisson.create();

 		RSet<SomeObject> set = redisson.getSet("anySet");
		set.add(new SomeObject());
		set.remove(new SomeObject());

		set.addAsync(new SomeObject());

		redisson.shutdown();
		**/
                final RAtomicLong atomicLong = redisson.getAtomicLong("anyAtomicLong");
		//atomicLong.set(1000);//初始化余量为1000,可以通过redis linux客户端设置1000
		for(int i=0;i<2000;i++){//开始扣减余量,每次扣1.共计扣2000次。
			new Runnable(){
	
				@Override
				public void run() {
					
					try {
						Thread.currentThread().sleep(5);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					if(atomicLong.get()==0) {//当余量为0时,系统提示余量不够并退出
						System.out.println(" error less than 0");
						return ;
					}
					long r=atomicLong.decrementAndGet();
					System.out.println("get "+r);
					
				}
				
			}.run();;
	    }
		
	
	}
	private static Redisson redisson=null;
	public Redisson  getClient(){
		return  redisson;
	}
	public  void  init(String filename) throws FileNotFoundException, IOException
	{
		//RedissonClient client=new RedissonClient();
		PropertyReader propReader=PropertyReader.getInstance(filename);
		Properties props=propReader.getProperties(filename);
		String masterName=props.getProperty("redis.masterName");
		String masteAddress=props.getProperty("redis.masteAddress");
		int deployment_model=Integer.valueOf(props.getProperty("redis.deploymentModel")).intValue();
		String hosts=props.getProperty("redis.hosts");
		//redis部署模式 SingleHost 1 MasterSlave 2 Sentinel 3  Cluster 4 
		switch(deployment_model){
			case 1:
				redisson=getSingleClient(hosts);//单机
				break;
			case 2:
				redisson=getMasterSlaveClient(masteAddress,hosts);
				break;
			case 3:
				redisson=getSentinelClient(masterName,hosts);
				break;
			case 4:
				redisson=getClusterClient(hosts);
				break;
		}
		
	}
	public Redisson getSingleClient(String host){
		//Single server connection:
		// connects to default Redis server 127.0.0.1:6379
		//edisson redisson = Redisson.create();
		
		// connects to single Redis server via Config
		Config config = new Config();
		    config.useSingleServer()
		          .setAddress(host)
		          .setConnectionPoolSize(1000)
		          ;

		Redisson redisson = Redisson.create(config);
		return redisson;
	}
	//Master/Slave servers connection:
	public Redisson getMasterSlaveClient(String add,String hosts){
		Config config = new Config();
		String[] hostarr=hosts.split(",");
		config.useMasterSlaveConnection()
		    .setMasterAddress(add)
		    .setLoadBalancer(new RandomLoadBalancer()) // RoundRobinLoadBalancer used by default
		   
		    .addSlaveAddress(hostarr)
		    .setMasterConnectionPoolSize(10000)
		    .setSlaveConnectionPoolSize(10000);

		Redisson redisson = Redisson.create(config);
		return redisson;
	}
	//Sentinel servers connection:
	public Redisson getSentinelClient(String masterName,String hosts){
		String[] hostarr=hosts.split(",");
		Config config = new Config();
		config.useSentinelConnection()
		    .setMasterName(masterName)
		    .addSentinelAddress(hostarr)
		    .setMasterConnectionPoolSize(10000)
		    .setSlaveConnectionPoolSize(10000);

		Redisson redisson = Redisson.create(config);
		return redisson;
	}
	//Cluster nodes connections:
	public Redisson getClusterClient(String hosts){
		Config config = new Config();
		config.useClusterServers()
		    .setScanInterval(2000) // sets cluster state scan interval
		    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
		    .setMasterConnectionPoolSize(10000)
		    .setSlaveConnectionPoolSize(10000);

		Redisson redisson = Redisson.create(config);
		return redisson;
	}
}

 

 

余量扣除代码片段2,扣除任意数值。

 

 

/**
	 * 扣取现金账本
	 * @param actid  账户id
	  * @return
	  * @throws FileNotFoundException
	 * @throws IOException
	  * @throws InterruptedException 
	 */
	private boolean deductCashAccount(String actid,int amount) throws FileNotFoundException, IOException, InterruptedException{
		/**扣余额直接操作redis缓存数据库,key由账户ID,-字符,字符串balance组成*/
		 long start=System.currentTimeMillis();
		 if(redisson==null) {
			 logger.error("Redisson is NULL");
			 return false;
		}
		String key="CASH_"+actid;
		String lock_point="LOCK_CASH_"+actid;
		RLock lock=redisson.getLock(lock_point);//获取账户锁对象
		logger.info("get lock "+lock_point);
		boolean locked=lock.tryLock(10,60, TimeUnit.SECONDS);//尝试锁住账户对象,waitTime第一个参数获取锁超时时间30毫秒,leaseTime第二参数,锁自动释放时间
		if(!locked) {
			logger.info("cann't get lock ,id="+actid);
			return false;
		}
		//lock.lock();
		logger.info("get lock "+lock_point+" ok");
		RBucket<Integer> atomicbalance = redisson.getBucket(key);//获取原子余量
		boolean result_flag=true;
		if(atomicbalance.get()==0) {
			logger.error(" error ,balance less than  or equal to 0");
			
			result_flag=false;
		}
		else{
			atomicbalance.set(atomicbalance.get().intValue()-amount);//扣除余量
			logger.info("balance is  "+atomicbalance.get());
			result_flag=true;
		}
		lock.unlock();//解锁
		 logger.info("debut cash , cost time:"+(System.currentTimeMillis()-start));
		return result_flag;
	}



 

实验结论:

本人在一项目中,使用以上两个分布式锁对象,进行了ab压力测试。设置初始余量,将以前代码片,发布为http api服务,ab多线程压力测试扣除余量操作,最终扣除的余量正确。

分享到:
评论
1 楼 newboy2004 2015-07-30  
这样搞的话,你redis里的数据和数据库里的数据 什么时候做同步呢?而且还有你这个扣除余量的这个数据,是都从数据库加载到这个redis缓存里吗?还是直接就不存在数据库,一开始所有的这些东西都存在于redis中,请指点,谢谢

相关推荐

    C#实操控制并发之Lock和Redis分布式锁

    本文将深入探讨C#中如何使用Lock和Redis分布式锁来解决并发问题,以秒杀系统为例进行阐述。 首先,让我们理解什么是并发控制。并发控制是指在多线程环境下确保数据的一致性和完整性,防止多个线程同时访问同一资源...

    redis分布式锁工具类

    现在很多项目单机版已经不满足了,分布式变得越受欢迎,同时也带来很多问题,分布式锁也变得没那么容易实现,分享一个redis分布式锁工具类,里面的加锁采用lua脚本(脚本比较简单,采用java代码实现,无须外部调用...

    记录redisson实现redis分布式事务锁

    首先,Redis作为一个内存数据库,其高速读写性能使其成为实现分布式锁的理想选择。分布式锁的主要作用是在多节点环境下保证同一时刻只有一个节点可以执行特定操作,避免并发问题。Redisson的分布式锁通过`RLock`接口...

    springboot基于redis分布式锁

    以下是一个简单的Spring Data Redis的分布式锁实现示例: ```java import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component public class...

    Java基于redis实现分布式锁代码实例

    Java基于Redis实现分布式锁代码实例 分布式锁的必要性 在多线程环境中,资源竞争是一个常见的问题。例如,在一个简单的用户操作中,一个线程修改用户状态,首先在内存中读取用户状态,然后在内存中进行修改,然后...

    redis分布式锁.zip

    Redis 分布式锁是分布式系统中解决并发控制和数据一致性问题的一种常见机制。在大型分布式应用中,单机锁无法满足需求,因为它们局限于单个服务器。Redis 的高可用性和低延迟特性使其成为实现分布式锁的理想选择。...

    C++基于redis的分布式锁redisAPI

    client.connect("localhost", 6379, [/* callback */](std::string host, std::string port, cpp_redis::client::connect_state status) { if (status == cpp_redis::client::connect_state::connected) client....

    Spring Boot+Redis 分布式锁:模拟抢单.docx

    Spring Boot+Redis 分布式锁:模拟抢单

    redis实现分布式锁,自旋式加锁,lua原子性解锁

    Redis作为一个高性能的键值存储系统,常被用作实现分布式锁的工具。本文将深入探讨如何使用Redis实现分布式锁,以及如何利用自旋式加锁和Lua脚本实现原子性解锁。 首先,我们来理解分布式锁的基本概念。分布式锁是...

    redis分布式加锁解锁代码

    redis分布式锁,包含单服务器上锁解锁情况,和分布式上锁解锁情况,全部封装在类里,有需要可以下载,希望可以帮助到你。

    用Redis实现分布式锁_redis_分布式_

    1. 响应速度:Redis是内存数据库,读写速度快,非常适合处理高并发的锁请求。 2. 原子性:Redis的操作是原子性的,如`SETNX`(设置如果不存在)、`EXPIRE`(设置过期时间),确保了锁的获取与释放不会出现竞态条件。 3. ...

    提供一些通用,可用组件_#redis实现分布式锁:_1、不支持可重入_2、配置集群模式_2.1集群模_services.zip

    提供一些通用,可用组件_#redis实现分布式锁:_1、不支持可重入_2、配置集群模式_2.1集群模_services

    redis分布式锁工具包,提供纯Java方式调用,支持传统Spring工程.zip

    Redis作为一个内存数据库,因其高性能和丰富的数据结构,常被用作实现分布式锁的中间件。 该工具包基于Redis的`SETNX`(设置如果不存在)命令和`EXPIRE`(设置过期时间)命令来创建和释放锁,确保了锁的互斥性和...

    Spring Boot+Redis 分布式锁:模拟抢单.pdf

    本次文档将围绕Spring Boot结合Redis实现分布式锁的机制进行详细解读,并通过模拟抢单的场景来展示其实际运用。 首先,了解Redis的分布式锁是如何运作的至关重要。在Redis的诸多操作中,`set`命令的`nx`选项(即`...

    java开发基于SpringBoot+WebSocket+Redis分布式即时通讯群聊系统.zip

    Java开发基于SpringBoot+WebSocket+Redis分布式即时通讯群聊系统。一个基于Spring Boot + WebSocket + Redis,可快速开发的分布式即时通讯群聊系统。适用于直播间聊天、游戏内聊天、客服聊天等临时性群聊场景。 ...

    redis实现分布式锁(java/jedis)

    redis实现分布式锁(java/jedis),其中包含工具方法以及使用demo 本资源是利用java的jedis实现 redis实现分布式锁(java/jedis),其中包含工具方法以及使用demo 本资源是利用java的jedis实现

    zk:redis分布式锁.zip

    本压缩包“zk:redis分布式锁.zip”提供了基于Zookeeper(zk)和Redis两种分布式锁实现的示例和相关资料。 首先,我们来看Zookeeper(zk)的分布式锁。Zookeeper是Apache的一个开源项目,提供了一个高可用的、高性能...

    Redis分布式锁实现Redisson 15问.doc

    Redis分布式锁实现Redisson 15问 Redis分布式锁是指在分布式系统中,多个服务实例之间对同一个资源加锁的机制,以保证数据的一致性和安全性。Redisson是一个基于Redis的分布式锁实现,它提供了一个高效、可靠的加锁...

    C#.net Redis分布式锁源码实现

    Redis作为一个内存数据存储系统,因其高性能、高可用性以及丰富的数据结构,常被用作分布式锁的实现平台。C#.NET开发人员可以通过StackExchange.Redis库与Redis进行交互。 在C#.NET中使用Redis实现分布式锁,主要...

Global site tag (gtag.js) - Google Analytics