`

etcd分布式锁实现

 
阅读更多

引入maven依赖:

  <dependency>
	    <groupId>com.coreos</groupId>
	    <artifactId>jetcd-core</artifactId>
	    <version>0.0.2</version>
	</dependency>

 

分布式锁实现:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Lock;
import com.coreos.jetcd.data.ByteSequence;

/**
 * Etcd Java客户端 Jetcd提供的Lock客户端实现分布式锁
 */
public class EtcdDistributedLock {
	
	private static EtcdDistributedLock lock = null;
	private static Object mutex = new Object();
	private Client client; // etcd客户端
	private Lock lockClient; // etcd分布式锁客户端
	private Lease leaseClient; // etcd租约客户端

	private EtcdDistributedLock() {
		super();
		// 创建Etcd客户端,本例中Etcd集群只有一个节点
		this.client = Client.builder().endpoints("http://localhost:2379").build();
		this.lockClient = client.getLockClient();
		this.leaseClient = client.getLeaseClient();
	}

	/**
	 * 单例
	 */
	public static EtcdDistributedLock getInstance() {
		synchronized (mutex) { // 互斥锁
			if (null == lock) {
				lock = new EtcdDistributedLock();
			}
		}
		return lock;
	}

	/**
	 * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。
	 * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
	 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
	 * @return LockResult
	 */
	public LockResult lock(String lockName, long TTL) {
		LockResult lockResult = new LockResult();
		
		/* 1.准备阶段 */
		// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
		// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁
		ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

		// 初始化返回值lockResult
		lockResult.setIsLockSuccess(false);
		lockResult.setService(service);

		// 记录租约ID,初始值设为 0L
		Long leaseId = 0L;

		/* 2.创建租约 */
		try {
			// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定
			leaseId = leaseClient.grant(TTL).get().getID();
			lockResult.setLeaseId(leaseId);

			// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定
			long period = TTL - TTL / 5;
			service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: Create lease failed:" + e);
			return lockResult;
		}
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" start to lock.");

		/* 3.加锁操作 */
		// 执行加锁操作,并为锁对应的key绑定租约
		try {
			lockClient.lock(ByteSequence.fromString(lockName), leaseId).get();
		} catch (InterruptedException | ExecutionException e1) {
			System.err.println("[error]: lock failed:" + e1);
			return lockResult;
		}
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" lock successfully.");

		lockResult.setIsLockSuccess(true);
		return lockResult;
	}

	/**
	 * 解锁操作,释放锁、关闭定时任务、解除租约
	 * 
	 * @param lockName:锁名
	 * @param lockResult:加锁操作返回的结果
	 */
	public void unLock(String lockName, LockResult lockResult) {
		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" start to unlock.");
		try {
			// 释放锁
			lockClient.unlock(ByteSequence.fromString(lockName)).get();
			// 关闭定时任务
			lockResult.getService().shutdown();
			// 删除租约
			if (lockResult.getLeaseId() != 0L) {
				leaseClient.revoke(lockResult.getLeaseId());
			}
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: unlock failed: " + e);
		}

		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" unlock successfully.");
	}

	/**
	 * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效
	 */
	static class KeepAliveTask implements Runnable {
		private Lease leaseClient;
		private long leaseId;

		KeepAliveTask(Lease leaseClient, long leaseId) {
			this.leaseClient = leaseClient;
			this.leaseId = leaseId;
		}

		@Override
		public void run() {
			// 续约一次
			leaseClient.keepAliveOnce(leaseId);
		}
	}

	/**
	 * 该class用于描述加锁的结果,同时携带解锁操作所需参数
	 */
	static class LockResult {
		private boolean isLockSuccess;
		private long leaseId;
		private ScheduledExecutorService service;

		LockResult() {
			super();
		}

		public void setIsLockSuccess(boolean isLockSuccess) {
			this.isLockSuccess = isLockSuccess;
		}

		public void setLeaseId(long leaseId) {
			this.leaseId = leaseId;
		}

		public void setService(ScheduledExecutorService service) {
			this.service = service;
		}

		public boolean getIsLockSuccess() {
			return this.isLockSuccess;
		}

		public long getLeaseId() {
			return this.leaseId;
		}

		public ScheduledExecutorService getService() {
			return this.service;
		}
	}

	/**
	 * 测试分布式锁
	 * @param args
	 */
	public static void main(String[] args) {
		// 模拟分布式场景下,多个进程 “抢锁”
		for (int i = 0; i < 10; i++) {
			new MyThread().start();
		}
	}

	static class MyThread extends Thread {
		@Override
		public void run() {
			String lockName = "/lock/mylock"; // 分布式锁名称
			
			// 1. 加锁
			LockResult lockResult = getInstance().lock(lockName, 30);
			if (lockResult.getIsLockSuccess()) { // 获得了锁
				try {
					Thread.sleep(10000); // sleep 10秒,模拟执行相关业务
				} catch (InterruptedException e) {
					System.out.println("[error]:" + e);
				}
			}

			// 2. 解锁
			getInstance().unLock(lockName, lockResult);
		}
	}
}

 

 

 

分享到:
评论

相关推荐

    分布式锁/信号量的实现方式:基于Redis、Zookeeper 、etcd 的分布式锁 确保在分布式环境中安全地访问共享资源

    下面将详细介绍基于Redis、Zookeeper以及etcd的分布式锁实现方式。 #### 二、基于Redis的分布式锁实现 Redis作为一款高性能的键值存储系统,非常适合用来实现分布式锁。以下是一个简单的示例代码: - **获取锁**...

    分布式锁实现案例.pdf.zip

    具体到案例中的"分布式锁实现案例.pdf"文档,可能会详细分析以上提到的实现方式,包括各自的优缺点、适用场景以及具体代码示例。读者可以通过阅读这份文档,深入理解分布式锁的工作机制,并学会如何在实际项目中应用...

    etcd分布式存储系统 v3.4.30.zip

    etcd是一个分布式的、可靠的键值对存储系统,它被广泛应用于服务发现、配置共享以及分布式锁等场景。在版本v3.4.30中,etcd继续提供高可用性和强一致性,这是其核心特性。这个zip文件包含了etcd v3.4.30的源代码以及...

    【代码】基于etcd的分布式队列(golang版)

    基于go+etcd实现分布式锁 clientv3文档example: 文档example client.go package etcdq import ( context v3 go.etcd.io/etcd/clientv3 time ) type Client interface { ReadItem(string) KV ReadItems(string...

    2021-07-28-.NET 6 秒杀项目---分布式锁落地实战.zip

    2. 分布式锁的实现:学习如何在.NET环境中实现分布式锁,比如使用Redis、Zookeeper或者Etcd等分布式协调服务,以确保并发访问的正确性。 3. 高并发处理:研究秒杀场景下如何处理大量用户请求,包括限流、队列处理和...

    分布式锁-分析产生的原因,推导解决方案的原理及注意事项,适用于redis/hbase/zookpeer/etcd/mysql等

    Redisson是基于Redis的Java客户端,提供了一套完整的分布式锁实现,支持单机、哨兵和集群模式。 3. 基于Zookeeper实现:Zookeeper的临时节点可以作为分布式锁的基础,节点创建即加锁,节点删除即解锁,同时Zookeeper...

    分布式锁与信号量.zip

    4. **基于Etcd的分布式锁**:Etcd是Kubernetes的基础组件,它提供了一种分布式键值存储,可以用于实现分布式锁,其API简洁,易于使用。 **信号量(Semaphore)**则是另一种并发控制机制,主要用于控制同时访问特定...

    分布式锁的实现1

    在本文中,我们将深入探讨基于Redis实现的分布式锁及其关键特性。 首先,Redis提供了`SETNX`命令,用于在键不存在的情况下设置键值,这确保了分布式锁的互斥性。然而,仅仅使用`SETNX`还不足以防止死锁,因此通常会...

    并发-分布式锁质量保障总结

    其次,选择合适的分布式锁实现,根据并发量、性能需求和系统复杂性来决定是MySQL、Redis还是Zookeeper。 **事中保障**主要是通过CodeReview来确保技术实现的正确性。对于Redis分布式锁,特别需要注意: - **Redis ...

    分布式锁与信号量-2023C-mas开发笔记

    常见的分布式锁实现包括基于数据库的锁(如MySQL的行级锁)、基于缓存的锁(如Redis的setnx命令)和基于Zookeeper的锁等。每种实现方式都有其优缺点,例如,数据库锁依赖于数据库事务,性能可能较低;而Redis和...

    分布式锁与信号量.md

    - **实现**:分布式锁的实现通常依赖于外部的服务来进行协调,例如 ZooKeeper、etcd 或 Redis 等,而信号量的实现可以是语言内置的或是基于特定并发库提供的。 在实际应用中,选择使用分布式锁还是信号量主要取决于...

    分布式锁与信号量per-ma开发笔记

    常见的分布式锁实现包括基于Zookeeper、Redis和Etcd等中间件的服务。这些实现通常会提供锁的获取、释放和超时等基本功能,同时考虑了锁的公平性、可重入性和故障恢复等问题。 信号量,另一方面,是一种更通用的并发...

    使用分布式锁出现的问题及解决方案.doc

    基于Redis的分布式锁实现通常简单易用,但并非无懈可击。一个基本的实现方式包括以下两个关键点: 1. 加锁与解锁必须使用相同的标识,通常通过生成唯一的ID来实现。 2. 设置锁的过期时间,以防止资源被永久锁定。 ...

    分布式锁与信号量概述.pdf

    Redis的分布式锁实现通常包括以下几个关键步骤: - 获取锁:使用`SETNX`命令尝试设置键值对,成功则表示获取锁。 - 释放锁:使用`DEL`命令删除相应的键,释放锁。 - 锁超时:通过`EXPIRE`命令设置键的过期时间,...

    分布式锁+id生成器+限流工具

    常见的分布式锁实现包括基于Zookeeper、Redis、Etcd等中间件。分布式锁的应用场景广泛,例如在数据库事务处理、并发控制、分布式事务中,它可以防止多个节点同时修改同一数据,从而保证数据的一致性。在设计分布式锁...

    node-etcd-lock:由etcd v3支持的Node.js分布式锁

    节点等锁 由支持的Node.js分布式锁。安装npm install node-etcd-lock用法'use strict'const assert = require ( 'assert' )const Locker = require ( 'node-etcd-lock' )const locker = new Locker ( { address : '...

    【高并发】高并发分布式锁架构解密,不是所有的锁都是分布式锁!!

    实现分布式锁通常需要一个中心协调者,例如ZooKeeper、Redis或Etcd等,它们提供了原子性的操作,如SETNX(Redis中的“Set if Not Exist”)来实现锁的获取与释放。此外,分布式锁还需要考虑以下关键特性: 1. **...

    分布式架构系统中分布锁的实现

    一种常见的分布式锁实现方式是基于Redis的`setnx`命令。`SETNX`(Set if Not eXists)是Redis中用于设置键值对的一个原子操作,只有当键不存在时,才会设置成功,并返回1;如果键已经存在,则不进行任何操作并返回0...

    云原生分布式存储基石 etcd深入解析

    在分布式协调任务如负载均衡、分布式锁、健康检查等方面,etcd也发挥着重要作用。 3. **etcd的架构设计**:etcd采用集群模式运行,每个节点都是对等的,可以通过选举产生领导者。这种设计确保了系统的高可用性。...

Global site tag (gtag.js) - Google Analytics