`

kazoo--zookeeper的python驱动

阅读更多

     

核心类讲解

kazoo.client.KazooClient(hosts='127.0.0.1:2181', 
                         timeout=10.0, 
                         client_id=None, 
                         handler=None, 
                         default_acl=None, 
                         auth_data=None, 
                         read_only=None, 
                         randomize_hosts=True, 
                         connection_retry=None, 
                         command_retry=None, 
                         logger=None, 
                         **kwargs)          

该类是kazoo模块的最主要的一个类,用于连接zookeeper服务器,参数:

  • `hosts`:指定ZooKeeper的ip和端口,可以是以逗号分隔的多个ZooKeeper服务器IP和端口,客户端会随机选择一个来连接。
  • `timeout`:会话超时时间,在连接断开后就开始计算,如果在此会话时间内重新连接上的话,该连接创建的临时节点就不会移除。默认会话超时最小是2倍的tickTime(在zk服务器配置文件中设置),最大是20倍的tickTime。会话过期由ZooKeeper集群,而不是客户端来管理。客户端与集群建立会话时提供该超时值,集群使用这个值来确定客户端会话何时过期,集群在指定的超时时间内没有得到客户端的消息时发生会话过期,会话过期时集群将删除会话的所有临时节点,立即通知所有(观察节点)客户端。
  • `client_id`:传递一个双元素数组:[会话id, 密码]。客户端取得ZooKeeper服务句柄时,ZooKeeper创建一个会话,由一个64位数标识,这个数将返回给客户端。如果连接到其他服务器,客户端将在连接握手时发送会话ID。出于安全考虑,服务器会为会话ID创建一个密码,ZooKeeper服务器可以校验这个密码。这个密码将在创建会话时与会话ID一同发送给客户端。与新的服务器重新建立会话的时候,客户端会和会话ID一同发送这个密码。
  • `read_only`:创建一个只读的连接。
  • `randomize_hosts`:随机选择zk服务器连接。

类实例属性及方法:

zk.start(timeout=15)
初始化到zookeeper服务器的连接,超过timeout时间没连接到zk服务器则会产生timeout_exception异常

In [1]: from kazoo.client import KazooClient
In [2]: zk = KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
In [3]: zk.start()       #到这一步没生成异常就说明正常连接到zk服务器了

zk.stop()
一旦连接上,客户端会尽力保持连接,不管间歇性的连接丢失。如果要主动丢弃连接,就使用该方法,该方法会断开连接和关闭该连接的session,此时该连接创建的所有临时节点都会立即移除,并触发这些临时节点的DataWatch和这些临时节点的父节点的ChildrenWatch

zk.restart() 
重启连接会话

zk.state 
当前连接状态,值为如下三个之一:LOST、CONNECTED、SUSPENDED。当实例化一个KazooClient连接时处于LOST状态;然后使用start()真正建立连接后处于CONNECTED状态;如果此时连接出现问题或客户端切换到另一台zk服务器,此时将处于SUSPENDED状态;在会话有效期内重新连接上又变回CONNECTED状态,如果重连上但是会话过期,则变为LOST状态。

In [57]: zk1 = KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
In [58]: zk1.state
Out[58]: 'LOST'
In [59]: zk1.start()
In [60]: zk1.state
Out[60]: 'CONNECTED'
In [62]: zk1.stop()
In [63]: zk1.state
Out[63]: 'LOST'
In [64]: zk1.close()
In [65]: zk1.state
Out[65]: 'LOST'

zk.connected 
客户端是否已连接到zk服务器,已连接上返回True

zk.add_listener(listener) 
添加一个函数对象作为回调函数,当连接状态改变时,就会自动调用该回调函数,具体看后面的“监听连接事件”。

zk.remove_listener(listener) 
移除一个listener

zk.state_listeners 
listener状态

zk.create(path, value=’’, acl=None, ephemeral=False, sequence=False, makepath=False) 
创建一个节点,ephemeral表示改节点是临时节点,sequence表示该节点为顺序节点,默认当节点的父节点或祖先节点不存在时,创建该节点会失败,可以使用makepath设置为True来自动创建缺少的祖先节点。
zk节点(znode)可以分为如下四类:

  • `PERSISTENT`:持续的,相比于EPHEMERAL,不会随着client session的close/expire而消失
  • `PERSISTENT_SEQUENTIAL`:顺序的,会自动在节点名后面添加一个自增计数,格式为%010d
  • `EPHEMERAL`:临时节点,生命周期依赖于client session,对应session close/expire后其znode也会消失,临时节点不能有子节点
  • `EPHEMERAL_SEQUENTIAL`

该方法可能触发如下异常:

  • `NodeExistsError`:当要创建的节点已经存在时
  • `NoNodeError`:当makepath为False且祖先节点不存在时
  • `NoChildrenForEphemeralsError`:父节点为临时节点,在一个临时节点下面创建子节点会报该异常
  • `ZookeeperError`:节点值太大,zk默认节点值限制为1M
  • `ZookeeperError`:服务器返回一个非0状态码

zk.get_children(path, watch=None, include_data=False)
获取指定节点的所有子节点,以列表返回。如果include_data为True,则还会返回该节点的ZnodeStat状态

zk.get(path, watch=None) 
获取指定节点的值,节点不存在触发NoNodeError异常

In [12]: zk.get("/xj")
Out[12]:
('2222',          #节点的值
  ZnodeStat(
    czxid=4294967304,             #创建该节点的zxid
    mzxid=17179869186,            #最近一次修改该节点的zxid
    ctime=1386060984217,          #秒数表示的znode创建时间,这里最后3位数是毫秒数,如1386060984217,准确应该为1386060984.217
    mtime=1386296567754,          #该节点的最近一次修改时间
    version=7,           #该节点数据修改次数
    cversion=4,          #改节点的子节点修改次数   
    aversion=0,          #该节点的ACL修改次数
    ephemeralOwner=0,    #如果znode是临时节点,则指示节点所有者的会话ID;如果不是临时节点,则为零
    dataLength=4,        #该节点的数据长度
    numChildren=4,       #子节点个数
    pzxid=8589934670    
  )
)

zk.set(path, value, version=-1) 
设置节点的值,返回该节点的ZnodeStat信息。版本不匹配产生BadVersionError异常、节点不存在产生NoNodeError、提供的值value太大产生ZookeeperError异常、如果zk返回一个非零错误状态码则产生ZookeeperError

In [15]: zk.set('/xj', "new_value")
Out[15]: ZnodeStat(czxid=4294967304, mzxid=21474836483, ctime=1386060984217, mtime=1386642770131, version=8, cversion=4, aversion=0, ephemeralOwner=0, dataLength=9, numChildren=4, pzxid=8589934670)

zk.delete(path, version=-1, recursive=False) 
删除节点,recursive为True表示递归删除节点及其子节点,如果有子节点且recursive为False,则会产生NotEmptyError异常,表示该节点有子节点不能删除;版本不匹配产生BadVersionError异常;节点不存在产生NoNodeError;如果zk返回一个非零错误状态码则产生ZookeeperError

zk.exists(path, watch=None) 
检查节点是否存在,存在返回节点的ZnodeStat信息,否则返回None

zk.ensure_path(path, acl=None) 
自动创建节点的祖先节点,如想创建一个节点”/a/b/c”,但是”/a/b”不存在,这时候使用该方法就可以自动把不存在的祖先节点一起创建了,create()makepath参数也能实现该功能

zk.sync(path) 
阻塞并等待指定节点同步到所有zk服务器,返回同步的节点

zk.command(cmd=’ruok’) 
用于执行zk服务器提供的四字命令,这些四字命令如下:

conf               获取zk服务器的配置信息
cons               输出指定server上所有客户端连接的详细信息,包括客户端IP,会话ID等
crst               功能性命令。重置所有连接的统计信息
dump               这个命令针对Leader执行,用于输出所有等待队列中的会话和临时节点的信息
envi               用于输出server的环境变量。包括操作系统环境和Java环境
ruok               用于测试server是否处于无错状态。如果正常,则返回“imok”,否则没有任何响应。注意:ruok不是一个特别有用的命令,它不能反映一个server是否处于正常工作。“stat”命令更靠谱
stat               输出server简要状态和连接的客户端信息
srvr               和stat类似
srst               重置server的统计信息
wchs               列出所有watcher信息概要信息,数量等
wchc               列出所有watcher信息,以watcher的session为归组单元排列,列出该会话订阅了哪些path
wchp               列出所有watcher信息,以watcher的path为归组单元排列,列出该path被哪些会话订阅着,意,wchc和wchp这两个命令执行的输出结果都是针对session的,对于运维人员来说可视化效果并不理想,可以尝试将cons命令执行输出的信息整合起来,就可以用客户端IP来代替会话ID了,具体可以看这个实现:http://rdc.taobao.com/team/jm/archives/1450
mntr               输出一些ZK运行时信息,通过对这些返回结果的解析,可以达到监控的效果

zk.hosts 
一个迭代器,显示该客户端随机选择的zk服务器

In [31]: list(zk.hosts)
Out[31]: [('127.0.0.1', 2181), ('127.0.0.1', 2182), ('127.0.0.1', 2183)]

zk.last_zxid 
获取zk服务器最新的一个zxid

In [37]: zk.last_zxid
Out[37]: 21474836491

zk.client_id 
返回连接的session_id和密码

zk.chroot 
查看当前连接根节点

In [30]: zk.chroot
Out[30]: '/xj'

 

监听连接事件:

用于监控连接是否断开、恢复或者是会话过期,kazoo通过kazoo.client.KazooState类来实现该功能,该类有三个值如下:

KazooState.CONNECTED    已正常连接上或已重新连接上zk服务器的连接状态
KazooState.SUSPENDED    连接被中断,但是会话时间还没过期,该连接创建的临时节点也都还在   
KazooState.LOST         该连接已确认死亡(连接中断且会话过期),此时该连接创建的临时节点都已被移除

涉及KazooClient类的方法:
zk.state 
当前连接状态,值为如下三个之一:LOST、CONNECTED、SUSPENDED。当实例化一个KazooClient连接时处于LOST状态;然后使用start()真正建立连接后处于CONNECTED状态;如果此时连接出现问题或客户端切换到另一台zk服务器,此时将处于SUSPENDED状态;在会话有效期内重新连接上又变回CONNECTED状态,如果重连上但是会话过期,则变为LOST状态。

zk.connected 
客户端是否已连接到zk服务器,已连接返回True

zk.add_listener(listener) 
添加一个函数对象作为回调函数,当连接状态改变时,就会自动调用该函数

zk.remove_listener(listener) 
移除一个listener

zk.state_listeners 
listener状态

例子:

In [45]: from kazoo.client import KazooClient
In [46]: from kazoo.client import KazooState
In [47]: zk = KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")

In [48]: def my_listener(state):
   ....:     if state == KazooState.LOST:
   ....:         print "trigger LOST state"
   ....:     elif state == KazooState.SUSPENDED:
   ....:         print "trigger SUPENDED state"
   ....:     else:
   ....:         print "connected or reconnected"

In [51]: zk.add_listener(my_listener)

In [52]: zk.start()         
connected or reconnected               #触发CONNECTED状态

In [53]: zk.stop()
trigger LOST state               #触发LOST状态

 

重连保持session_id不变:

客户端与zookeeper服务器的连接断开了,重新连接后zookeeper是如何知道这是之前一个连接的重新连接呢?这是靠KazooClient类的client_id参数(是一个双元素元组)来保证的,如果该参数为空表示是一个新连接,此时zookeeper服务器为该连接分配一个session_id和对应秘钥(元组形式);如果该参数不为空,则zookeeper就知道这个连接是之前一个连接断开后重新连接上来的,然后就会去查该连接的session是否过期,如果还没过期就继续保存这个连接的临时节点。
当前想到的做法就是,将zk.client_id序列化到一个文件中,如pickle.dump(zk.client_id, fileobj),然后下次连接就通过pickle.load(fileobj)来获取这个session_id,然后传入KazooClient类的client_id参数中。

应用举例:
如zookeeper做游戏服务器的负载均衡,每一个游戏服都在zookeeper服务器上注册一个临时节点,游戏服务器断开后,如果在session会话期内通过上一个连接的client_id重新连接上来,则临时节点不会被删除,表示该游戏服依然正常,没有挂掉;如果超过session会话有效期还没连接上来,则临时节点被移除,判定为该游戏服已经挂掉,不再转发客户端请求过来。

 

watcher

分为两种:

  • `dataWatch`:针对节点的创建、修改、删除,都会触发该watch(同时创建、删除节点也会触发该节点的父节点的childrenWatch)
  • `childrenWatch`:针对子节点的创建、删除,才会触发该watch

kazoo.client.KazooClient类提供了两个装饰器方法来实现这两类watcher:

@zk.ChildrenWatch
@zk.DataWatch

不同于默认的watch规则,使用该装饰器定义的watch会一直存在,而不是默认的一次性,也就是说只要对一个路径定义了watch,该watch就会一直存在,监控该路径的任何变动。

DataWatch用法:

@zk.DataWatch("/xj")          #对该节点create、set、delete操作都会触发该watch
def changed(data, stat, event):          #data是该节点的值;stat是节点的ZnodeState状态信息;event是WatchedEvent类实例,有三个值:type表示触发该watch的操作类型(如CREATED表示是一个创建节点的操作触发了该watch)、state表示当前连接状态、path表示操作的路径。这三个参数也不是必须提供,提供一个或两个也行
    ...

例子:

from kazoo.client import KazooClient
import time

zk = KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
zk.start()

@zk.DataWatch("/xj")
def changed(data, stat, event):
    print "--------------DataWatch---------------"
    print "data:", data
    print "stat:", stat
    print "event:", event

zk.create("/xj", "value1")
time.sleep(2)
zk.set("/xj", "value2")
time.sleep(2)
zk.delete("/xj")
time.sleep(2)

执行上面代码:

[root@centos6 ~]# python kazoo_watcher.py
--------------DataWatch---------------               #谨记,watch函数定义完的就会调用一次
data: None
stat: None
event: None
--------------DataWatch---------------
data: value1
stat: ZnodeStat(czxid=21474836501, mzxid=21474836501, ctime=1386646806240, mtime=1386646806240, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=6, numChildren=0, pzxid=21474836501)
event: WatchedEvent(type='CREATED', state='CONNECTED', path=u'/xj')          #这里可以通过event.type这种方式来访问
--------------DataWatch---------------
data: value2
stat: ZnodeStat(czxid=21474836501, mzxid=21474836502, ctime=1386646806240, mtime=1386646808268, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=6, numChildren=0, pzxid=21474836501)
event: WatchedEvent(type='CHANGED', state='CONNECTED', path=u'/xj')
--------------DataWatch---------------
data: None
stat: None
event: WatchedEvent(type='DELETED', state='CONNECTED', path=u'/xj')

ChildrenWatch用法:

@zk.ChildrenWatch("/xj")          #如果/xj节点的子节点有变动(添加、删除),触发该watch
def childWatch(children):               #这种watch函数只有一个参数,children以列表形式保存了该节点的所有子节点
    ....

例子:

from kazoo.client import KazooClient
import time

zk = KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
zk.start()

@zk.ChildrenWatch("/xj")
def childWatch(children):
        print '---------------ChildWatch----------------'
        print "children:", children

zk.create("/xj/a", "value1")
time.sleep(2)
zk.set("/xj/a", "value2")
time.sleep(2)
zk.delete("/xj/a")
time.sleep(2)

执行上面代码:

[root@centos6 ~]# python kazoo_watcher.py
---------------ChildWatch----------------
children: []          #定义完watch函数后自动执行一次
---------------ChildWatch----------------
children: [u'a']
---------------ChildWatch----------------
children: []          #这里是zk.delete("/xj/a")触发的,对子节点set不会触发父节点的ChilrenWatch

 

事务:

zookeeper3.4开始支持事务操作,在一个事务中可以执行多个操作,如果有一个操作未执行成功,则回滚到事务开始之前的状态

transaction = zk.transaction()
transaction.check('/node/a', version=3)
transaction.create('/node/b', b"a value")
results = transaction.commit()

 

chroot(改变连接根节点):

可以改变新连接的根节点,这样做有很多好处,比如不同应用只能访问zk服务器的不同节点,不用担心看到或修改了其他应用的节点信息

In[24]:zk.get_children("/")#默认根节点Out[24]:[u'xj',u'sapser',u'user0000000030',u'user0000000031',u'user0000000032',u'id-0000000038',u'user0000000033',u'user0000000034',u'sapserr']In[25]:zk.get("/xj")Out[25]:('',ZnodeStat(czxid=21474836510,mzxid=21474836510,ctime=1386647457406,mtime=1386647457406,version=0,cversion=5,aversion=0,ephemeralOwner=0,dataLength=0,numChildren=1,pzxid=25769803779))#从numChildren看出/xj节点有一个子节点In[26]:zk1=KazooClient(hosts="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/xj")#改变zk1连接的根目录为/xj节点,该连接只能看到/xj节点之下的东西。注意/xj必须位于hosts参数末尾位置,位于其他位置都会出错In[27]:zk1.start()In[28]:zk1.get_children("/")#这是根节点"/"其实就是/xj节点,可以看到只有一个子节点Out[28]:[u'a']

reference:
http://sapser.github.io/python/zookeeper/2014/07/24/zookeeper-kazoo
分享到:
评论

相关推荐

    Python库 | kazoo-api-0.1.0.tar.gz

    `kazoo-api-0.1.0`是Python与Zookeeper交互的接口,它为Python开发者提供了一套易于使用的API,以便在Python应用程序中操作Zookeeper。这个库的特点在于其稳定性和强大的功能集,包括节点的创建、删除、读取和更新,...

    PyPI 官网下载 | kazoo-2.6.1.tar.gz

    《PyPI官网下载 | kazoo-2.6.1.tar.gz》 在Python的世界中,PyPI(Python Package Index)是官方的软件仓库,它为开发者提供了上传和分享Python库的平台,用户则可以通过它来下载和安装各种Python模块。今天我们要...

    Python库 | types-kazoo-0.1.3.tar.gz

    总之,`types-kazoo-0.1.3`是Python与Zookeeper通信的重要工具,它通过提供简洁的API和强大的功能,降低了使用Zookeeper进行分布式系统开发的复杂度。无论你是初学者还是经验丰富的开发者,Kazoo都是构建可靠、高...

    Python库 | kazoo-2.3.1-py2.py3-none-any.whl

    《Python库 kazoo-2.3.1-py2.py3-none-any.whl:深入解析与应用》 在Python的世界里,丰富的库是其强大功能的重要支撑。今天我们要讨论的是名为“kazoo”的库,它的一个特定版本——2.3.1。这个库以`.whl`格式提供...

    python-kazoo-2.2.1-1.el7.noarch.rpm

    官方离线安装包,亲测可用。使用rpm -ivh [rpm完整包名] 进行安装

    Python库 | asana-kazoo-2.0.1dev.tar.gz

    资源分类:Python库 所属语言:Python 资源全名:asana-kazoo-2.0.1dev.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    python-kazoo-doc-2.5.0-7.el8.noarch.rpm

    官方离线安装包,亲测可用

    python3-kazoo-2.5.0-7.el8.noarch.rpm

    离线安装包,亲测可用

    zookeeper python接口实例详解

    Python 中访问 Zookeeper 的接口主要有两个库:zkpython 和 kazoo。本实例主要介绍的是 zkpython。 **zkpython 安装** 在安装 zkpython 之前,需要确保 Zookeeper 服务已经正确安装在 `/usr/local/zookeeper` 目录...

    Kazoo安装和使用文档

    Kazoo 是一个 Python 库,用于连接和操作 ZooKeeper。ZooKeeper 是一个分布式应用程序协调服务,提供高可用性和高性能的数据存储和管理功能。Kazoo 库提供了一个简单的接口来使用 ZooKeeper,允许开发者轻松地连接和...

    kazoo-braintree:Braintree簿记员

    【kazoo-braintree:Braintree簿记员】是一个基于Erlang语言的开源项目,主要用于集成和管理Braintree支付平台的服务。Braintree是一家提供全球支付解决方案的公司,其服务包括信用卡处理、付款网关、移动支付以及...

    kazoo-cccp

    "Kazoo-CCCP" 是一个基于Erlang编程语言构建的项目,它可能是一个分布式系统或服务,尤其在通信、并发处理或高可用性领域。Erlang是一种为了解决电信行业中的大规模并发问题而设计的面向过程的语言,其特点包括轻量...

    Kazoo-CallRecordingS3:使用 S3 存储的 Kazoo 通话录音

    Kazoo-CallRecordingS3 使用 S3 存储的 Kazoo 通话录音如何使用它您将需要 S3 API 密钥和秘密创建“my-recordings”存储桶,然后在此存储桶中创建 accountID 文件夹。 TODO:如果不存在则添加自动创建文件夹例如在...

    Zookeeper接口kazoo实例解析

    而Kazoo是Python中用于操作Zookeeper的一个强大库,它提供了丰富的API来简化与Zookeeper交互。 在Python中使用Kazoo,首先需要进行安装。通过`yum install python-pip`确保pip已经安装,接着使用`pip install kazoo...

    python+zk笔记,xmind

    1. **安装zkpython库**:Python与Zookeeper通信通常依赖于`zkpython`库(也称为` kazoo`),它是Python的一个Zookeeper客户端。可以通过`pip install kazoo`来安装。 2. **建立连接**:使用`KazooClient`类初始化一...

    kazoo:Kazoo是一个高级Python库,可以更轻松地使用Apache Zookeeper

    kazoo为Python客户端的实现了更高级别的API。 有关更多信息,请参见。 执照 kazoo是根据Apache License 2.0提供的。 作者 kazoo从开始的,并通过与开源社区的合作已与和代码合并。 从那以后,它聚集了一个活跃的...

    ZooKeeper 分布式协调服务的基本安装与Python实践

    此外,展示了利用 Python 和 kazoo 库连接 ZooKeeper 执行相似的基本操作的实例代码,便于开发者理解和使用。 适合人群:对分布式系统感兴趣的技术开发者,尤其是希望学习分布式服务协调机制的初学者。 使用场景及...

    Kazoo Embedded PBX:该项目是关于为嵌入式计算机安装和设置kazoo-开源

    该项目旨在提供 Kazoo 平台以安装在嵌入式和/或通用 x86-64 计算机上。

Global site tag (gtag.js) - Google Analytics