- 浏览: 111885 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
引用
原文:http://www.cnblogs.com/haippy/archive/2012/07/20/2600077.html
简介
Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目。Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。 Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问题上,你可以使用 Zookeeper 提供的现成(off-the-shelf)服务来实现分布式系统的配置管理,组管理,Leader 选举等功能。
英文原文地址:
引用
http://zookeeper.apache.org/doc/current/javaExample.html
一个简单的 Zookeeper Watch 客户端
为了介绍 Zookeeper Java API 的基本用法,本文将带你如何一步一步实现一个功能简单的 Zookeeper 客户端。该 Zookeeper 客户端会监视一个你指定 Zookeeper 节点 Znode, 当被监视的节点发生变化时,客户端会启动或者停止某一程序。
基本要求
该客户端具备四个基本要求:
客户端所带参数:
Zookeeper 服务地址。
被监视的 Znode 节点名称。
可执行程序及其所带的参数
客户端会获取被监视 Znode 节点的数据并启动你所指定的可执行程序。
如果被监视的 Znode 节点发生改变,客户端重新获取其内容并再次启动你所指定的可执行程序。
如果被监视的 Znode 节点消失,客户端会杀死可执行程序。
程序设计
一般而言,Zookeeper 应用程序分为两部分,其中一部分维护与服务器端的连接,另外一部分监视 Znode 节点的数据。在本程序中,Executor 类负责维护 Zookeeper 连接,DataMonitor 类监视 Zookeeper 目录树中的数据, 同时,Executor 包含了主线程和程序主要的执行逻辑,它负责少量的用户交互,以及与可执行程序的交互,该可执行程序接受你向它传入的参数,并且会根据被监视的 Znode 节点的状态变化停止或重启。
Executor类
Executor 对象是本例程最基本的“容器”,它包括Zookeeper 对象和DataMonitor对象。
public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } }
回忆一下 Executor 的任务是根据 Zookeeper 中 Znode 节点状态改变所触发的事件来启动和停止你在命令行指定的可执行程序, 在上面的代码你可以看到,Executor 类在其构造函数中实例化 Zookeeper 对象时,将其自身的引用作为 Watch 参数传递给 Zookeeper 的构造函数,同时它也将其自身的引用作为 DataMonitorListener 参数传递给 DataMonitor 的构造函数。Executor 本身实现了以下接口:
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { ...
DataMonitorListener 接口本身不是Zookeeper API 的一部分,它完全是一个自定义的接口,可以说是专门为本程序设计的。DataMonitor 对象使用该接口和“容器”(即 Executor 类)进行通信,DataMonitorListener 接口如下:
public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); }
该接口在 DataMonitor 中定义,Executor 类实现该接口,当 Executor.exists() 被调用的时候,Executor 决定是否启动或停止事先指定的应用程序(回忆一下前文所说的,当 Znode 消失时 Zookeeper 客户端会杀死该可执行程序)。
当 Executor.closing() 被调用的时候,Executor 会根据 Zookeeper 连接永久性地消失来决定是否关闭自己。
你或许已经猜到,DataMonitor 对象根据 Zookeeper 状态变化来调用这些方法吧?
以下是 Executor 类中实现 DataMonitorListener.exists() 和 DataMonitorListener.closing()的代码:
public void exists( byte[] data ) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } public void closing(int rc) { synchronized (this) { notifyAll(); } }
DataMonitor 类
DataMonitor 类是本程序 Zookeeper 逻辑的核心, 它差不多是异步的,并由事件驱动的。DataMonitor 构造函数如下:
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }
调用 ZooKeeper.exists() 检查指定的 Znode 是否存在,并设置监视,传递自身引用作为回调对象,在某种意义上,在 watch 触发时就会引起真实的处理流程。
当 ZooKeeper.exists() 操作在服务器端完成时,ZooKeeper API 会在客户端调用 completion callback:
public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } }
上述代码首先检查 Znode 是否存在,以及其他重大的不可恢复的错误。如果文件(或者Znode)存在,它将从 Znode 获取数据,如果状态发生变化再调用 Executor 的 exists() 回调函数。注意,getData 函数本省必须要做任何的异常处理,因为本身就有监视可以处理任何错误:如果节点在调用 ZooKeeper.getData() 之前被删除,ZooKeeper.exists() 就会触发回调函数,如果存在通信错误,在连接上的监视会在该连接重建之前触发相应的事件,同时引发相应的处理。
最后,DataMonitor 处理监视事件的代码如下:
public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }
如果客户端 Zookeeper 程序在会话失效时(Expired event)重新建立了通信信道(SyncConnected event) ,所有的会话监视会自动和服务器进行重连, (Zookeeper 3.0.0以上版本会重置之前设置的监视). 更多编程指南请参见 ZooKeeper Watches 。 当 DataMonitor 获得了指定 Znode 的事件后,它将调用 ZooKeeper.exists() 来决定究竟发生了什么。
完整的程序:
Executor.java:
/** * A simple example program to use DataMonitor to start and * stop executables based on a znode. The program watches the * specified znode and saves the data that corresponds to the * znode in the filesystem. It also starts the specified program * with the specified arguments when the znode exists and kills * the program if the znode goes away. */ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode; DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } }
DataMonitor.java:
/** * A simple class that monitors the data and existence of a ZooKeeper * node. It uses asynchronous ZooKeeper APIs. */ import java.util.Arrays; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }
发表评论
-
ZooKeeper 分布式锁实现
2015-01-14 10:23 999场景描述 在分布式应 ... -
Zookeeper 分布式配置管理
2015-01-13 17:27 1147原创 配置中心代码: import java.io.IOE ... -
Zookeeper 进阶之——Zookeeper编程示例(如何使用Zookeeper实现屏障Barriers和队列Queues)
2015-01-13 09:36 499引用原文:http://www.cnblogs.com/hai ... -
ZooKeeper 典型的应用场景(二)
2015-01-12 17:26 417原文:http://www.cnblogs.com ... -
ZooKeeper 典型的应用场景
2015-01-12 17:23 526引用原文:http://www.cnblo ... -
zookeeper java-api
2015-01-12 17:14 563原文:http://www.cnblogs.com ... -
zookeeper的伪分布式搭建
2015-01-12 16:36 630引用原文:http://www.cnblo ...
相关推荐
开发者通常会使用Zookeeper的Java客户端API或者命令行工具来与Zookeeper交互。 在进行分布式开发时,理解和熟练使用Zookeeper能够极大地提升系统的稳定性和可扩展性。它作为一个可靠的中间件,可以帮助我们解决...
在 ZooKeeper 的 Java API 中,开发人员可以使用 Executor 类来维护与服务器端的连接,DataMonitor 类来监视 Znode 节点的数据。Executor 类包括 ZooKeeper 对象和 DataMonitor 对象,负责维护 ZooKeeper 连接,监视...
**ZooKeeper Java API**是Apache ZooKeeper项目的一部分,它为Java开发者提供了便捷的方式来与ZooKeeper集群进行交互。ZooKeeper是一个分布式的、开放源码的服务,主要用于维护配置信息、命名服务、分布式同步和组...
它包含了`org.apache.zookeeper`包下的各种类和接口,如`ZooKeeper`客户端实例、`WatchedEvent`事件模型、` KeeperException`异常处理等,是Java程序连接Zookeeper的基础。 2. **log4j.jar**:Log4j是Apache的一个...
【标题】"Dubbo+zookeeper 入门简单实例"揭示了如何在Java环境中结合Dubbo和Zookeeper构建一个基础的服务治理框架。Dubbo是阿里巴巴开源的一款高性能、轻量级的Java服务治理框架,而Zookeeper则是一个分布式协调服务...
Dubbo是阿里巴巴开源的一款高性能、轻量级的Java分布式服务框架,而Zookeeper则是一个分布式服务协调系统,常用于服务注册与发现。在这个实例中,我们将了解如何设置和运行一个简单的Dubbo应用,利用Zookeeper作为...
Java是开发Zookeeper客户端的主要编程语言之一,因此了解如何在Java中进行Zookeeper的增删改查操作对于开发分布式应用至关重要。 首先,我们需要导入Apache Zookeeper的Java客户端库。这通常通过在项目构建文件(如...
3. **连接管理**:使用 `CuratorFrameworkFactory` 工厂类,你可以创建一个 ZooKeeper 客户端实例。例如: ```java CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ...
在Java环境中,Curator是一个优秀的Zookeeper客户端库,简化了与Zookeeper的交互,包括服务注册与发现。本文将深入探讨如何利用Curator实现这一功能。 首先,Curator提供了一套完整的API来抽象服务注册与发现,包括...
ZooKeeper Java API编程实例分析 ZooKeeper Java API编程是 ZooKeeper 的一个重要组件,它提供了 Java 语言的 API 接口,让开发者可以使用 Java 语言编写 ZooKeeper 应用程序。下面我们将详细分析 ZooKeeper Java ...
【Dubbo+Zookeeper入门实例】深入解析 Dubbo是一个由阿里巴巴开源的高性能、轻量级的Java分布式服务框架,它的主要目标是为开发者提供一套简单易用的微服务治理方案。Dubbo的核心功能包括服务注册与发现、服务调用...
Dubbo是中国阿里巴巴开源的一款高性能、轻量级的Java服务治理框架,而Zookeeper是Apache的一个分布式协调服务,常用于分布式环境中的配置管理、命名服务、集群同步等。 【描述】提到的" dubbo+zookeeper+springmvc...
标题中的“一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架”揭示了这个项目的核心技术栈,它整合了三个在分布式系统中广泛使用的开源库:Netty、Zookeeper和Spring。让我们逐一深入探讨这三个技术以及它们...
其次,Dubbo是一款高性能、轻量级的Java RPC框架,它致力于服务治理,包括服务注册、发现、调用、负载均衡、容错等。Dubbo的核心功能包括服务提供者(Provider)、服务消费者(Consumer)、注册中心(Registry)和...
在本实例中,我们将深入探讨ZooKeeper的基本概念、工作原理以及如何使用其基本API来实现节点的增删改查操作。 **ZooKeeper基本概念** 1. **节点(ZNode)**:ZooKeeper的数据存储是以节点形式存在,每个节点都有一...
然后,你可以通过`ZooKeeper`类的构造函数创建一个ZooKeeper实例,提供服务器地址列表、会话超时时间和Watcher。例如: ```java ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 3000, new Watcher() { @...
创建一个`ZooKeeper`实例需要提供服务器地址列表、会话超时时间以及一个Watcher对象。Watcher是Zookeeper中的事件监听器,用于接收来自服务器的事件通知,例如节点数据变化或创建等。 `ZooKeeper`类提供了多种操作...
在Dubbo中,Zookeeper常被用作服务注册中心,服务提供者会在Zookeeper上注册自己的服务,服务消费者通过查询Zookeeper找到可调用的服务实例。 SpringMVC是Spring框架的一部分,用于构建Web应用程序的模型-视图-控制...
### ZooKeeper实例与Solr(tomcat)集群部署详解 #### 一、部署背景与目的 在大数据处理和搜索引擎技术领域,ZooKeeper与Solr是两个不可或缺的技术组件。ZooKeeper作为分布式协调服务,提供了高可用性以及一致性保证...
在压缩包`zookeeperCase`中,可能包含了实现这些功能的Java代码示例,包括创建Zookeeper客户端连接、创建和删除ZNode、监听节点变化、实现分布式锁逻辑以及服务注册和发现的相关类。通过学习和分析这些代码,可以...