`
顽石
  • 浏览: 167223 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

etcd事件监听

 
阅读更多

    etcd是CoreOS开发的一个高可用的键值存储系统,主要用于共享配置和服务发现。它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强一致性。etcd目前的最新版本是2.2.0。

    和zookeeper的二进制接口不同,它提供了HTTP/JSON的rest api接口,所以对使用它的客户端来说是很友好的,几乎每种编程语言都有较成熟的http client开发包,基于这些开发包很容易编写etcd客户端。在java中已经有个比较好的客户端etcd4j(https://github.com/jurmous/etcd4j),类似zookeeper有好用的curator库。

   

   etcd事件

    etcd中和数据变化(包括目录&值key、value变化)相关的事件类型有:set, delete, update, create, compareAndSwap、compareAndDelete。在etcd http response (json格式)action属性就是事件类型。

     etcd的事件watch监听接口也是使用http访问,它提供两种监听模式,一种是一次性监听,类似zookeeper的事件watch,监听到一次事件后,需要客户端重新发起监听,比较繁琐。另一种是持久监听(stream),当有事件时,会连续触发,不需要客户端重新发起监听。

     和zookeeper一样也存在客户端丢失监听事件的可能,它不保证每个事件客户端都能监听到,不过我们在实际使用过程中,通常是在应用启动时,在开始监听之后或之前先查询etcd,将需要的数据全量加载到内存中,后续才是根据监听事件来增量更新内存中的数据或全量刷新数据。另外目前版本的etcd最多只保存1千条事件历史,满1千条后,如果有新的事件产生,事件历史库中最老的事件将被丟弃,这个可以理解为etcd服务端事件丢失。由于有1千条的限制,在有事件浪涌时(在单位时间内例如1秒内产生2千条事件),事件监听处理较慢的时候或未监听时也会发生客户端事件丢失。   

 

 

    一次性watch监听

   可用curl发送http请求来进行演示,假设在本机安装有etcd,客户端访问端口为2379(旧版本监听端口为4001),监听key为/application。一次性监听命令如下:

    curl "http://127.0.0.1:2379/v2/keys/application?wait=true"

    查询字符串中的wait=true表示监听,还可加查询参数recursive=true表示附加监听/application的子目录和后辈目录,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true" 。

    启动监听后etcd如有变化事件发生,则返回json格式的事件,结果类似如下:

    HTTP/1.1 200 OK

    Content-Type: application/json

    X-Etcd-Cluster-Id: e88d54f6225f06ad

    X-Etcd-Index: 271

    X-Raft-Index: 872202

    X-Raft-Term: 5

    Date: Sat, 26 Sep 2015 08:43:17 GMT

    Transfer-Encoding: chunked

 

    {"action":"set","node":{"key":"/application/store","value":"        {...}","modifiedIndex":271,"createdIndex":271},"prevNode":{...}}

   上面的curl返回结果中显示了http头,是加了curl的-i选项。

   从结果可看出action属性为set事件,modifiedIndex是该事件的index,为271。X-Etcd-Index头表示启动监听时etcd的当前index。etcd的index是单调递增的正整数,该整数取值空间是etcd中的所有key共享的。

    由于是一次性监听,所以curl会退出,继续监听要重新运行curl。

    和zookeeper事件监听一样,上面的http请求只能监听到命令发出之后产生的事件(更精确的说是在etcd服务器收到监听请求之后),之前发生的事件是监听不到的。另外在前一个监听得到触发开始(中间包括事件处理耗时)到启动下一次监听之间(并且下一次监听是etcd服务器收到监听请求才真正开始生效,这中间还有网络延)肯定存在时间间隔,如果在这个时间间隔(区间)内有事件发生,客户端是监听不到这些事件的,这个也和zookeeper类似。也就是如果两个事件并行发生或在发生时间上相距很近(例如update一个key后立即delete),后一事件的发生刚好落在这个间隔内,那这个事件在采用一次性监听模式的客户端中会丢失,这些事件对客户端监听不可见。

    

 

    那是不是就一定不能获取先前的事件了?这个要看情况,只要事件还在etcd的事件历史中,并且知道事件的index或起始index,还是可以取到单个事件的。在查询字符串中加waitIndex参数,其值是该事件的index或起始index。例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&waitIndex=269" ,表示查询/application目录或后辈目录中index等于或大于269的事件,如果有则返回一个最接近269的事件,否则会挂起curl,直到有满足条件的事件发生。这个其实是查询事件,已经不是正规的事件监听了。所以如果知道起始index,可以每次index加1来遍历查询事件,遍历查询时可以根据返回事件中的index来调整后续查询条件中使用的index。 

    如果waitIndex对应的事件已被etcd丢弃(见前面提到的etcd事件历史库),此时如果获取该事件,etcd将返回http状态码400( Bad Request),http body类似如下:

    {"errorCode":401,"message":"The event in requested index is outdated and cleared","cause":"the requested history has been cleared [3305/1]","index":4304} 

    另外可以监听根目录(根key),例如curl "http://127.0.0.1:2379/v2/keys?wait=true&recursive=true" 。

 

    持久监听(流式watch)

    要在查询字符串中加stream=true,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&stream=true"

    发出上述命令后,会和etcd服务器之间建立一个http长连接(此处的长连接不是指http keep alive,虽然也需要keep alive),其应答http头中的Transfer-Encoding为chunked,类似server push和comet中的multipart/x-mixed-replace,其http response body不结束,后续的每个事件作为http body的一部分,在该长连接中以一个或多个body chunk块的形式推送给客户端。可以看到在流式持久监听中curl即使收到事件也不会退出,它一直在等待后续将要发生的事件。一次性监听http头中也是chunked,但收到一个事件后,etcd会发送一个结束chunk(大小为0,表示http response结束),因此收到该chunk后curl会退出。

     相比一次性监听,除了更简便,持久监听也比一次性监听可靠性高,不会出现上面提到的在时间间隔内监听不到事件的情况。

      持久监听也是监听从命令发出之后发生的事件,先前的事件是监听不到的,不像JMS的持久订阅可以收到启动之前的jms消息。

      如果在持久监听中加waitIndex参数,分两种情况:一种是waitIndex的值小于或等于启动监听时etcd的当前index(这个值在X-Etcd-Index http头中可看到),则curl接收到满足条件的事件后挂起(不退出),后续再也收不到其他事件,即最多只能收到一个事件。第二种是waitIndex的值大于启动监听时etcd的当前index,则waitIndex参数无效,持久监听的行为同没有waitIndex参数一样。

 

 

   使用etcd4j进行事件监听

   在使用etcd4j进行事件监听时,有个注意事项:如果在同一个jvm虚拟机中既有修改查询etcd数据的操作,也有监听etcd事件,监听事件使用的EtcdClient实例和修改&查询操作使用的EtcdClient实例应分开,不能共用同一个实例,否则目前的etcd4j(2.7.0版本)会出现问题。多个key可以共用一个EtcdClient实例来进行监听,也可每个key使用一个单独的实例来进行监听。

  • 一次性监听

  //创建etcd客户端实例

  EtcdClient   etcdClient = new EtcdClient (new URI[]{new URI("127.0.0.1:2379")});

  /*setRetryPolicy设置重试策略

  * recursive()监听/application目录和其后辈目录中的事件,设置查询字符串recursive=true.

  *  waitForChange()监听,设置查询字符串wait=true.

 * -1表示永不超时,send()发送http请求进行监听

  */

  EtcdResponsePromise<EtcdKeysResponse> promise = etcd.get("/application").setRetryPolicy(

                        new RetryNTimes(300,Integer.MAX_VALUE)).recursive().waitForChange().

                        timeout(-1, TimeUnit.SECONDS).send();

    //增加listener

    promise.addListener(new IsSimplePromiseResponseHandler<EtcdKeysResponse>()

    {

         @Override

         public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {

                   //从promise中取响应,检查是否有异常

                   EtcdKeysResponse response = promise.getNow();

                   Throwable  exception = promise.getException();

                    ... 处理事件

                    //再次发起http请求进行监听

                    ...

         }   

     }

   );

 

     还可使用waitForChange(long waitIndex)来监听指定的事件。

 

  • 持久监听

     etcd4j的当前版本2.7.0不支持持久监听,可以使用

Java异步async httpclient(https://github.com/AsyncHttpClient/async-http-client)来实现持久监听,最挫最原始的持久监听实验代码如下:

     AsyncHttpClient asyncHttpClient = ...;

      //发送http get异步请求,不超时,且提供一个匿名的异步请求完成handler

      asyncHttpClient.prepareGet("http://127.0.0.1:2379/v2/keys/application?   wait=true&recursive=true&stream=true").setRequestTimeout(-1).execute(

       new AsyncHandler<String>() {

           //这个异常回调方法可类比为zookeeper的会话过期

           @Override

           public void onThrowable(Throwable t) {

                  // TODO Auto-generated method stub

                  //todo:异常,重新重试发起监听

            }

             

             //在会话期,有事件时会回调onBodyPartReceived方法

           @Override

           public State  onBodyPartReceived(HttpResponseBodyPart  bodyPart)

throws Exception {

                 // TODO Auto-generated method stub

                // bodyPart对应一个http chunk块,一个事件的数据可能由多个chunk块组成

                System.out.println("event="+new String(bodyPart.getBodyByteBuffer().array(),"utf-8"));

                //todo:将chunk中的数据加入缓冲区,待收到完整的事件数据后将json格式

                //的数据解析成java对象

               ...

               return State.CONTINUE;

         }

  

        @Override

        public  State onStatusReceived(HttpResponseStatus  responseStatus)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

         }

            

            //一个会话期只调用一次,这个回调方法可类比为zookeeper会话建立

         @Override

         public  State onHeadersReceived(HttpResponseHeaders headers)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

          }

 

         @Override

          public String onCompleted() throws Exception {

                    // TODO Auto-generated method stub

                   return "endWatch";

          }

   

    });

   

     

    

 

    

    

    

   

    

 

 

    

  • 大小: 23.6 KB
分享到:
评论

相关推荐

    etcdWatcher:ETCD密钥监听器实现

    1. **Watch API**:ETCD提供了Watch API,允许客户端订阅并监听特定key或key前缀的变更事件。etcdWatcher就是基于这个API实现的,它会创建一个长连接,持续监听目标键的增删改事件。 2. **事件处理**:当ETCD中的键...

    etcd-v3.5.9

    应用可以通过监听etcd中的特定键来获取或接收配置变更的实时通知,实现动态配置更新。 ### 6. 安全性与认证 etcd支持SSL/TLS加密通信,以保护数据在传输过程中的安全。同时,它还提供了身份验证和授权功能,通过...

    etcd实战及其原理分析.pdf

    - 分布式通知和协调:etcd的watch机制可以实时监听并通知键值变化,用于事件协调。 - 主备选举:etcd的raft算法可以用于选举集群中的领导者。 1.3 为什么选择etcd 与ZooKeeper相比,etcd具有以下优势: - 更简单的...

    k8s 二进制安装 etcd 3.5.1

    3. **配置etcd**:创建一个配置文件,例如`/etc/etcd/etcd.conf`,配置监听地址、初始集群成员等信息。例如: ``` [etcd] name = node1 data-dir = /var/lib/etcd initial-advertise-peer-urls=...

    将有问题的etcd节点重新加入集群1

    - `ETCD_LISTEN_CLIENT_URLS`: 设置etcd监听的客户端URL。 - `ETCD_LISTEN_PEER_URLS`: 设置etcd监听的对等节点URL。 - `ETCD_INITIAL_CLUSTER_TOKEN`: 用于集群初始化的安全令牌。 - `ETCD_NAME`: 节点的名称。 - `...

    springboot整合etcd配置中心-etcd-config-spring-boot.zip

    5. **动态刷新配置**:为了实现实时更新配置,我们可以利用Spring Boot的`@RefreshScope`注解和监听`/refresh`端点。当Etcd中的配置发生变化时,通过调用`/refresh`接口,服务会重新加载配置,无需重启服务。 6. **...

    etcd官网3.5.0离线安装包及自编服务脚本.rar

    4. 配置etcd,包括设置初始集群成员、监听地址、数据存储路径等。这可以通过修改etcd的配置文件完成。 5. 使用自编的服务脚本将etcd注册为系统服务,这样可以通过`systemctl start/stop/restart etcd`来控制etcd的...

    windows64位etcd:etcd-v3.0.17-windows-amd64.zip

    要启动etcd服务,只需在命令行中执行`etcd.exe`,可以加上一些命令行参数来配置etcd的行为,例如设置数据存储路径、监听端口、集群成员等。 **etcd的基本操作** 1. **设置键值对**:使用`etcdctl put key value`...

    etcd压缩包安装下载

    6. **观察者模式**: 支持Watch功能,可以实时监听键值的变更,及时做出响应。 7. **版本控制**: 每个键值都有版本信息,可以追踪历史记录和进行回滚操作。 **安装etcd** 1. **下载**: 针对Windows平台,你可以...

    微服务etcd配置中心.zip

    4. **事件通知**:Etcd支持Watch功能,客户端可以监听特定键值的变化,一旦发生变化,Etcd会主动推送给客户端,这对于实时更新配置非常有用。 5. **版本控制**:Etcd为每个键记录版本历史,可以查看或回滚到之前的...

    Etcd官网文档中文版.pdf

    - 实现Watch功能,可以监听键值变化并实时通知客户端。 4. **安装与配置**: - 在不同的操作系统上安装Etcd,包括Linux、macOS和Windows。 - 配置Etcd服务器参数,如端口、数据存储路径、日志级别等。 - 创建...

    etcd manager

    2. **Watch机制**: etcd 支持对键值的变化进行实时监听,当某个键的值发生变化时,可以触发客户端的回调函数,实现快速响应和事件驱动的编程模型。 3. **Raft一致性算法**: etcd 使用 Raft 算法来保证集群中的数据...

    windows系统下etcd的安装包

    4. **配置etcd**: 在启动etcd之前,你可能需要配置一些参数,如数据存储路径、监听地址、集群成员等。这些可以通过命令行参数或配置文件(如`etcd.conf`)来设置。例如,你可以设置`--data-dir`参数来指定数据存储的...

    etcd-windows安装包

    1. **服务发现**:etcd可以作为服务注册与发现的中间件,允许应用通过key-value接口来查询和监听其他服务的状态,实现微服务间的动态连接。 2. **配置共享**:它可以用来共享和分发配置信息,使得多个实例或节点...

    etcd安装包etcdlinux

    在运行etcd之前,需要配置etcd的运行参数,如监听地址、初始集群配置等。通常创建一个配置文件,例如`/etc/etcd/etcd.conf`。 5. **启动etcd** 使用命令启动etcd服务: ``` etcd --config-file=/etc/etcd/etcd....

    windows版本 etcd-v3.5.0-windows-amd64

    默认情况下,etcd将在本地主机的2379端口上监听HTTP API,并在2380端口上用于集群内部通信。 **配置etcd** etcd可以通过命令行参数或者配置文件进行设置。在Windows上,可以创建一个配置文件,如`etcd.conf`,然后...

    etcd-v3.5.0-linux-amd64.tar.gz

    5. **watch 功能**:etcd 提供了 watch 功能,客户端可以监听某个键的变化,一旦键值发生变化,etcd 会主动推送通知给客户端,实现实时更新。 6. **多版本控制**:etcd 实现了多版本控制,每个键可以有多个历史版本...

    etcd-v3.5.11

    通常,etcd可以通过命令行参数配置,如设置初始集群成员、监听地址、数据目录等。此外,还可以通过配置文件(如`etcd.conf`)来定义更复杂的设置。 **etcd集群** etcd集群由多个节点组成,每个节点都运行着etcd...

    etcd-main依赖包

    - `WATCH`:可以监听特定键值的变化,实现事件通知。 **4. 配置管理** etcd常用于动态配置管理,比如服务的地址、端口、配置文件等。服务启动时可以从etcd获取最新配置,当配置变更时,通过WATCH接口实时感知并...

    etcd-v3.0.9-linux-amd64.tar.gz

    2. 修改配置文件(如`etcd.conf`),指定数据目录、监听端口、初始集群配置等参数。 3. 使用`etcd --config-file=etcd.conf`启动etcd服务。 4. 通过`etcdctl`命令行工具与etcd进行交互,例如:`etcdctl put key ...

Global site tag (gtag.js) - Google Analytics