`

使用Zookeeper

 
阅读更多

  

一.部署与运行

      1.系统环境

         zk支持绝大多少操作系统.不过需要注意的是,FreeBsd系统的JVM对JAVA的NIO支持的不是很好,不建议在该系统上部署生产环境.

      2.java环境

        建议使用jdk1.6或者以上的版本.

      3.集群或者单机

         

          3.1集群模式需要至少三台机器, jdk和zk都下载配置好之后.修改$ZK_HOME/conf下面的zoo_sample.cfg

  1. tickTime=2000    
  2. initLimit=5    
  3. syncLimit=2    
  4. dataDir=/var/lib/zk/  
  5. dataLogDir=/var/lib/zk/logs    
  6. clientPort=2181  
  7. #0代表server id,在datadir下创建一个myid文件,里面的内容就是这个id
  8. server.0=192.168.1.10:8880:7770    
  9. server.1=192.168.1.11:8881:7771    
  10. server.2=192.168.1.12:8882:7772  

       接着在datadir目录下创建myid文件,内容就是server.id

       接着使用$ZK_HOME/bin里面的zkServer.sh启动,sh zkServer.sh start

       用命令echo stat | nc localhost 2181测试是否启动成功(失败有可能是防火墙的原因)

       3.1平常测试的时候只要使用单机模式即可

  1. tickTime=2000    
  2. initLimit=5    
  3. syncLimit=2    
  4. dataDir=/var/lib/zk/  
  5. dataLogDir=/var/lib/zk/logs    
  6. clientPort=2181  
  7. #0代表server id,在datadir下创建一个myid文件,里面的内容就是这个id
  8. server.1=127.0.0.1:8880:7770   

        

二.zk的客户端

          客户端的启动命令: sh zkCli -server ip:port

          创建节点:create [-s] [-e]  path  data acl s代表持久节点,e代表临时节点,acl代表权限.默认创建持久节点.例如: create /zknode 123 digest:  username:password

          列出路径下的子节点: ls [path]  .path代表路径 。 ls   /zknode

          获取节点的属性: get [path]

          更新节点属性: set [path] data [version]  

          删除节点: delete path [version]

          设置节点acl:  setAcl path acl .例如: setAcl path acl.

         

 三.zk java客户端

    1.连接客户端

     下面的代码是一个基本的zk  java客户端的代码

 

public class ZkTest implements Watcher{
    private static CountDownLatch cdl  = new CountDownLatch(1);
	public static void main(String[] args) throws IOException {
		ZooKeeper zk = new ZooKeeper("192.168.43.191:2181", 5000, new ZkTest());
		System.out.println(zk.getState());
		try {
			cdl.await();
		} catch (InterruptedException e) {
		}
		System.out.println("connect");
	}

	@Override
	public void process(WatchedEvent arg0) {
		System.out.println("事件:"+arg0+";"+arg0.getState());
		if(KeeperState.SyncConnected == arg0.getState()){
			System.out.println(arg0.getState());
			cdl.countDown();
		}
		
	}
	
}
        下面的代码是zk  session的复用

 

    

public class Zk_ID_PWD implements Watcher{
    private static CountDownLatch cdl  = new CountDownLatch(1);
	public static void main(String[] args) throws IOException, InterruptedException {
		ZooKeeper zk = new ZooKeeper("192.168.43.191:2181", 
				5000, new Zk_ID_PWD());
		cdl.await();
		long id = zk.getSessionId();
		byte[]  pwd = zk.getSessionPasswd();
		//连接失败
		zk = new ZooKeeper("192.168.43.191:2181", 
				5000, new Zk_ID_PWD(),1l,"test".getBytes());
		//连接成功
		zk = new ZooKeeper("192.168.43.191:2181",
				5000, new Zk_ID_PWD(),id,pwd);
		Thread.sleep(Integer.MAX_VALUE);
	}

	@Override
	public void process(WatchedEvent arg0) {
		System.out.println("事件:"+arg0+";"+arg0.getState());
		if(KeeperState.SyncConnected == arg0.getState()){
			System.out.println(arg0.getState());
			cdl.countDown();
		}
		
	}
	
}
 

 

      2.创建节点

         

public class Zk_Create implements Watcher {
	private static CountDownLatch cdl = new CountDownLatch(1);
	private static ZooKeeper zk = null;
	static {
		try {
			zk = new ZooKeeper("192.168.43.191:2181", 5000, new Zk_Create());
			System.out.println(zk.getState());
			
		} catch (IOException e1) {
			// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
			e1.printStackTrace();
		}

	}

	public static void main(String[] args) throws Exception {
           Zk_Create zc = new Zk_Create();
           try {
				cdl.await();
			} catch (InterruptedException e) {
			}
			System.out.println("connect");
           // zc.createTempNode();
            zc.createTempNodeAsync();
	}

	// 创建临时节点
	public void createTempNode() throws KeeperException, InterruptedException {
		String path = zk.create("/zcf", "zcf".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		System.out.println("成功创建节点:" + path);
		String path1 = zk.create("/zcf", "zcf".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println("成功创建节点:" + path1);
    }
    
	public void createTempNodeAsync() throws Exception{
		zk.create("/zcf", "zcf".getBytes(), 
				Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL
				,new IStringBack(),"async");
		Thread.sleep(Integer.MAX_VALUE);
    }
	
	public class IStringBack implements AsyncCallback.StringCallback{

		@Override
		public void processResult(int arg0, String arg1, Object arg2, String arg3) {

			System.out.println("create path result:"+ arg0+","+arg1+","+arg2+","+arg3);
			
		}
		
	}
	
	@Override
	public void process(WatchedEvent arg0) {
		System.out.println("事件:" + arg0 + ";" + arg0.getState());
		if (KeeperState.SyncConnected == arg0.getState()) {
			System.out.println(arg0.getState());
			cdl.countDown();
		}

	}
}
     3.权限设置以及授权

 

  

public class Zk_Auth implements Watcher{
    final static String path = "/zcf_auth";
    final static String path1 = path + "/child";
    private static CountDownLatch cdl = new CountDownLatch(3);
    public static void main(String[] args) throws Exception {
		Zk_Auth za = new Zk_Auth();
		//za.testAuth();
		za.testDelete();
	}
    //删除权限的设置,不影响删除没有子节点的节点
    public void testAuth() throws Exception{
    	ZooKeeper zk = ZKUtil.getInstance(null, new Zk_Auth());
    	ZooKeeper zk1 = ZKUtil.getInstance(null, new Zk_Auth());
    	ZooKeeper zk2 = ZKUtil.getInstance(null, new Zk_Auth());
    	cdl.await();
    	System.out.println("connect");
    	//第二个参数类似username:pwd
    	zk.addAuthInfo("digest", "zcf:true".getBytes());
    	zk.create(path, "zcf".getBytes(),
    			Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    	zk.create(path, "zcf".getBytes(),
    			Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    	
    	zk1.addAuthInfo("digest", "zcf:true".getBytes());
    	System.out.println(zk1.getData(path, false, null));
    	
    	zk2.addAuthInfo("digest", "zcf:false".getBytes());
    	//会爆没权限错误
    	System.out.println(zk2.getData(path, false, null));
    }
    public void testDelete() throws IOException, Exception{
    	ZooKeeper zk = ZKUtil.getInstance(null, new Zk_Auth());
    	ZooKeeper zk1 = ZKUtil.getInstance(null, new Zk_Auth());
    	ZooKeeper zk2 = ZKUtil.getInstance(null, new Zk_Auth());
    	cdl.await();
    	System.out.println("connect");
    	//第二个参数类似username:pwd
    	zk.addAuthInfo("digest", "zcf:true".getBytes());
    	zk.create(path, "zcf".getBytes(),
    			Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    	zk.create(path1, "zcf".getBytes(),
    			Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    	try {
    		zk1.delete(path1, -1);
		} catch (Exception e) {
			e.printStackTrace();
		}
    	zk2.addAuthInfo("digest", "zcf:true".getBytes());
    	//会爆没权限错误
    	zk2.delete(path1, -1);
    	System.out.println("成功删除节点:"+path1);
    	zk1.delete(path, -1);
    	System.out.println("成功删除节点:"+path);
    	
    }
    @Override
	public void process(WatchedEvent arg0) {
		System.out.println("事件:" + arg0 + ";" + arg0.getState());
		if (KeeperState.SyncConnected == arg0.getState()) {
			System.out.println(arg0.getState());
			cdl.countDown();
		}

	}
}
 

 

二.curator

    1.master选举

      应用场景:定时任务只需要在一台机器上执行,通过master选择功能,选择其中一台执行.

      原理:每个zk客户端会在指定path下生产一个类似这样的path.节点序号递增,每次节点最小的成为leader.然后每次执行完任务之后,抢到资源的节点会自己删除,然后重新生产一个节点,继续和其他节点竞争.

 

    [_c_54dd51db-0e4a-4fdf-9251-f39e88102a87-lock-0000000121, 
    _c_af3197ec-da19-4812-b20d-bb3a010be549-lock-0000000118,
    _c_9a0edc16-5489-4d95-b24b-e1610c133bbe-lock-0000000120]
 

 

      反思:

        我们都知道,通过Leader选举可以选举出一台机器来执行定时任务.每次到执行Job的时候重新进行一次竞选,成为Leader者进行执行。

         这里就需要考虑一下问题:

        1.  第二种方案如果服务器的时间不一致如何处理?如果每台机器Job执行的时间不一致如何处理?
        2.  定时任务的幂等性保证.(token)

 

public class Zk_Curator_Master {

    public static void main(String[] args) throws InterruptedException {

    for(int i = 0;i< 6;i++){

    CuratorFramework  cf = CuratorFrameworkFactory.newClient("192.168.43.191:2181",

        new ExponentialBackoffRetry(1000, 3));

        cf.start();

        String name ="client #"+i;

        LeaderSelector s = new LeaderSelector(cf,

        "/zcf/master_test", new LeaderSelectorListener() {

    @Override

    public void stateChanged(CuratorFramework client, ConnectionState newState) {

    

    if(newState == ConnectionState.SUSPENDED||ConnectionState.LOST == newState){

    throw new CancellationException();

    }
    }

    @Override

    public void takeLeadership(CuratorFramework arg0) throws Exception {

    System.out.println(name+"成为master角色");

     Thread.sleep(3000);

    System.out.println(name+"完成master任务");

    

    }

    });

        s.autoRequeue();

        s.start();

Thread.sleep(Integer.MAX_VALUE);

    }

}

 

  2.分布式锁以及分布式计数器

   

public class ZK_Distributed_Lock {
	
	private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
		 normalLockTest();
    	DistributedLock();
	}
    
    public static void DistributedLock(){
    	CuratorFramework  cf = 	CuratorFrameworkFactory.newClient("192.168.43.191:2181",
    			new ExponentialBackoffRetry(1000, 3));
    	cf.start();
    	final InterProcessMutex lock = new InterProcessMutex(
    			cf, "/zcf/distributeLock");
    	CountDownLatch cdl = new CountDownLatch(1);
    	for(int i = 0 ;i< 30 ;i++){
    		new Thread(new Runnable() {
				
				@Override
				public void run() {
					try {
						cdl.await();
						lock.acquire();
						SimpleDateFormat  sdf = new SimpleDateFormat("HH:mm:ss/SSS");
						System.out.println(sdf.format(new Date()));
					} catch (InterruptedException e) {
						// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
						e.printStackTrace();
					} catch (Exception e) {
						// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
						e.printStackTrace();
					}finally{
						try {
							lock.release();
						} catch (Exception e) {
							// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
							e.printStackTrace();
						}
					}
					
				}
			}).start();
    	}
    	cdl.countDown();
    }
    
    public static void normalLockTest(){
    	 CountDownLatch cdl = new CountDownLatch(1);
    	for(int i = 0 ;i< 10;i++){
			new Thread(new Runnable() {
				
				@Override
				public void run() {
					try {
						cdl.await();
						lock.lock();
						SimpleDateFormat  sdf = new SimpleDateFormat("HH:mm:ss/SSS");
						System.out.println(sdf.format(new Date()));
					} catch (InterruptedException e) {
						// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
						e.printStackTrace();
					}finally {
						lock.unlock();
					}
					
				}
			}).start();
		}
		cdl.countDown();
    }
}

 

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

public class ZK_Distributed_Count {
	
    public static void main(String[] args) {
    	DistributedCount();
	}
    
    public static void DistributedCount(){
    	CuratorFramework  cf = 	CuratorFrameworkFactory.newClient("192.168.43.191:2181",
    			new ExponentialBackoffRetry(1000, 3));
    	cf.start();
    	   	CountDownLatch cdl = new CountDownLatch(1);
    	for(int i = 0 ;i< 30 ;i++){
    		final DistributedAtomicInteger d = new DistributedAtomicInteger(
        			cf, "/zcf/distributeCount",new RetryNTimes(3, 1000));
     
    		new Thread(new Runnable() {
				
				@Override
				public void run() {
					try {
						cdl.await();
						AtomicValue<Integer> value = d.add(1);
						while(!value.succeeded()){
							value = d.add(1);
						}
						System.out.println(value.postValue());
					} catch (InterruptedException e) {
						// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
						e.printStackTrace();
					} catch (Exception e) {
						// TODO 请把关键信息(包括堆栈)记入日志,并删除下面一行。
						e.printStackTrace();
					}
					
				}
			}).start();
    	}
    	cdl.countDown();
    }
    
   
}
  

 

分享到:
评论

相关推荐

    dubbo使用zookeeper注册最小依赖jar包

    dubbo使用zookeeper注册最小依赖jar包,包含dubbo核心包一个,Commons包一个,log4j包一个,netty核心包一个,slf4j包两个,spring核心包一个,zookeeper包两个,javassist包一个。本地搭建可以成功调用服务。欢迎来...

    使用zookeeper实现分布式共享锁

    本篇文章将深入探讨如何使用Zookeeper实现分布式共享锁。 分布式锁是一种在多节点之间共享资源的机制,它允许在同一时间只有一个节点对资源进行操作。在Java环境中,我们可以利用Zookeeper的API来创建和管理这种锁...

    使用Zookeeper来为你的程序加上Leader Election的功能。

    在 Leader Election 场景中,ZooKeeper 使用一种称为“选举算法”的机制来决定哪个节点成为领导者。 选举算法通常基于节点的优先级或状态信息。在 ZooKeeper 中,每个参与选举的节点都会创建一个临时顺序节点...

    为什么不使用ZooKeeper构建云平台发现服务.pdf

    【描述】:本文探讨了为何在云环境中不建议使用ZooKeeper作为服务发现服务的原因,通过对比ZooKeeper与Eureka,从云平台特点、CAP原理及运维角度分析了两者的优缺点。 【标签】:技术 【正文】: 在构建云平台...

    dotnet-封装了常用的功能以更方便net开发者更好的使用zookeeper

    标题中的“dotnet-封装了常用的功能以更方便net开发者更好的使用zookeeper”指的是一个.NET框架下的Zookeeper客户端库,这个库特别针对.NET开发者进行了优化,提供了丰富的功能,旨在简化Zookeeper在.NET环境中的...

    精品课程推荐 大数据与云计算教程课件 优质大数据课程 24.使用Zookeeper构建应用(共34页).pptx

    【大数据与云计算教程】课程涵盖了从基础到高级的大数据处理技术,其中重点讲解了Hadoop、Zookeeper等关键组件的使用。Hadoop是大数据处理的核心框架,包括HDFS(分布式文件系统)、MapReduce(分布式计算模型)以及...

    Hbase与zookeeper笔记备份.rar

    4. 联合使用Zookeeper:Zookeeper在Hbase中扮演关键角色,负责节点注册、Region Server的选举、客户端的负载均衡以及监控等任务,保证了Hbase的稳定运行。 三、Hbase与Zookeeper的协同工作 1. Zookeeper在Hbase中...

    使用ZooKeeper实现分布式锁

    这里,我们将深入探讨如何利用ZooKeeper这一强大的分布式协调服务来实现分布式锁,以解决订单编号的唯一性问题。 ZooKeeper是由Apache Hadoop项目孵化的开源项目,它提供了一个高可用、高性能的分布式协调服务。其...

    自己动手写基于动态代理,使用ZooKeeper作为注册中心,以Netty进行网络通信的RPC框架

    自己动手写基于动态代理,使用ZooKeeper作为注册中心,以Netty进行网络通信的RPC框架。更多详情请查看相关博客:https://blog.csdn.net/qq_31142553/article/details/86316654

    C#使用Zookeeper示例

    1. 使用前先Nuget搜索ZooKeeper.Net安装 2. 安装ZooKeeper,百度自行搜索“Windows下安装ZooKeeper” 3. 下载路径:https://mirrors.cnnic.cn/apache/zookeeper/

    Java-zookeeper实践代码(分布式锁/注册中心)

    在IT行业中,Zookeeper是Apache...通过学习和分析这些代码,可以加深对Zookeeper在Java应用中的实际使用的理解,提升在分布式系统开发中的技能。同时,也可以借鉴这些代码设计模式,应用到自己的项目中,解决类似问题。

    apache zookeeper使用方法实例详解

    Apache ZooKeeper 使用方法实例详解 Apache ZooKeeper 是一个高效、可靠、易于使用的分布式协同服务,它可以为分布式应用提供统一命名服务、配置管理、状态同步和组服务等。 ZooKeeper 的主要特点是提供了一个简洁...

    使用ZooKeeper实现软负载均衡示例

    ZooKeeper,由Apache开发的一个分布式的,开放源码的协调服务,常被用于实现这种软负载均衡。本示例将深入探讨如何利用ZooKeeper实现软负载均衡。 首先,我们需要理解ZooKeeper的基本概念。ZooKeeper是一个高可用的...

    zookeeper的开发使用技巧和常用命令

    ZooKeeper 的开发使用技巧和常用命令 ZooKeeper 是一个开源的分布式协调服务,广泛应用于大型分布式系统中。下面是 ZooKeeper 的开发使用技巧和常用命令,适合初学者和服务器开发人员。 一、ZooKeeper 的安装和...

    zookeeper-3.9.1.zip

    然后,可以通过`bin/zkServer.sh`脚本来启动或停止Zookeeper服务,并使用`bin/zkCli.sh`连接到服务器进行操作,如创建、删除和查看ZNode(Zookeeper中的数据节点)。 总的来说,Apache ZooKeeper是分布式系统中不可...

    Zookeeper双机房容灾方案.pdf

    Zookeeper双机房容灾方案是指在分布式系统中使用Zookeeper来实现高可用性和容灾的方案。本方案使用5个Zookeeper实例来实现高可用性和容灾。 Zookeeper选举机制是指Zookeeper集群中leader的选举机制。Zookeeper...

    zookeeper 3.8.4

    **ZooKeeper 3.8.4:分布式协调服务详解** Apache ZooKeeper 是一个高度可靠的分布式协调服务,广泛应用于...正确理解和使用 ZooKeeper 可以帮助开发者解决许多分布式环境下的难题,提高系统的整体性能和可靠性。

    Zookeeper使用场景及详解

    ### Zookeeper使用场景及详解 #### 一、概述 Zookeeper是一个分布式的、开放源码的数据管理和协调服务框架。它最初是由雅虎研究院开发并开源的,后来成为了Apache的一个顶级项目。Zookeeper的设计目的是为了简化...

    zookeeper可视化工具

    使用Zookeeper可视化工具的步骤 - **安装和配置**:下载并安装相应的Zookeeper可视化工具,根据工具文档配置连接到Zookeeper集群的参数,如服务器地址、端口等。 - **连接Zookeeper**:启动工具,输入配置好的连接...

    ZooKeeper3.4.9 windos和linux

    7.1 监控工具:使用JMX(Java Management Extensions)监控ZooKeeper的运行状态,例如CPU使用率、内存使用等。 7.2 日志分析:定期检查日志文件,排查可能的错误和异常。 总结,ZooKeeper 3.4.9在Windows和Linux上...

Global site tag (gtag.js) - Google Analytics