Python 进行分布式系统协调
(点击上方公众号,可快速关注)
来源:naughty
链接:my.oschina.net/taogang/blog/410864
随着大数据时代的到来,分布式是解决大数据问题的一个主要手段,随着越来越多的分布式的服务,如何在分布式的系统中对这些服务做协调变成了一个很棘手的问题。今天我们就来看看如何使用Python,利用开源对分布式服务做协调。
在对分布式的应用做协调的时候,主要会碰到以下的应用场景:
-
业务发现(service discovery)找到分布式系统中存在那些可用的服务和节点
-
名字服务 (name service)通过给定的名字知道到对应的资源
-
配置管理 (configuration management)如何在分布式的节点中共享配置文件,保证一致性。
-
故障发现和故障转移 (failure detection and failover)当某一个节点出故障的时候,如何检测到并通知其它节点, 或者把想用的服务转移到其它的可用节点
-
领导选举(leader election)如何在众多的节点中选举一个领导者,来协调所有的节点
-
分布式的锁 (distributed exclusive lock)如何通过锁在分布式的服务中进行同步
-
消息和通知服务 (message queue and notification)如何在分布式的服务中传递消息,以通知的形式对事件作出主动的响应
有许多的开源软件试图解决以上的全部或者部分问题,例如ZooKeeper,consul,doozerd等等,我们现在就看看它们是如何做的。
ZooKeeper
ZooKeeper是使用最广泛,也是最有名的解决分布式服务的协调问题的开源软件了,它最早和Hadoop一起开发,后来成为了Apache的顶级项目,很多开源的项目都在使用ZooKeeper,例如大名鼎鼎的Kafka。
Zookeeper本身是一个分布式的应用,通过对共享的数据的管理来实现对分布式应用的协调。
ZooKeeper使用一个树形目录作为数据模型,这个目录和文件目录类似,目录上的每一个节点被称作ZNodes。
ZooKeeper提供基本的API来操纵和控制Znodes,包括对节点的创建,删除,设置和获取数据,获得子节点等。
除了这些基本的操作,ZooKeeper还提供了一些配方(Recipe),其实就是一些常见的用例,例如锁,两阶段提交,领导选举等等。
ZooKeeper本身是用Java开发的,所以对Java的支持是最自然的。它同时还提供了C语言的绑定。
Kazoo是一个非常成熟的Zookeeper Python客户端,我们这就看看如果使用Python来调用ZooKeeper。(注意,运行以下的例子,需要在本地启动ZooKeeper的服务)
基本操作
以下的例子现实了对Znode的基本操作,首先要创建一个客户端的连接,并启动客户端。然后我们可以利用该客户端对Znode做增删改,取内容的操作。最后推出客户端。
from kazoo.client import KazooClient
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# Ensure a path, create if necessary
zk.ensure_path("/test/zk1")
# Create a node with data
zk.create("/test/zk1/node", b"a test value")
# Determine if a node exists
if zk.exists("/test/zk1"):
print "the node exist"
# Print the version of a node and its data
data, stat = zk.get("/test/zk1")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# List the children
children = zk.get_children("/test/zk1")
print("There are %s children with names %s" % (len(children), children))
zk.stop()
通过对ZNode的操作,我们可以完成一些分布式服务协调的基本需求,包括名字服务,配置服务,分组等等。
故障检测(Failure Detection)
在分布式系统中,一个最基本的需求就是当某一个服务出问题的时候,能够通知其它的节点或者某个管理节点。
ZooKeeper提供ephemeral Node的概念,当创建该Node的服务退出或者异常中止的时候,该Node会被删除,所以我们就可以利用这种行为来监控服务运行状态。
以下是worker的代码
from kazoo.client import KazooClient
import time
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# Ensure a path, create if necessary
zk.ensure_path("/test/failure_detection")
# Create a node with data
zk.create("/test/failure_detection/worker",
value=b"a test value", ephemeral=True)
while True:
print "I am alive!"
time.sleep(3)
zk.stop()
以下的monitor 代码,监控worker服务是否运行。
from kazoo.client import KazooClient
import time
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# Determine if a node exists
while True:
if zk.exists("/test/failure_detection/worker"):
print "the worker is alive!"
else:
print "the worker is dead!"
break
time.sleep(3)
zk.stop()
领导选举
Kazoo直接提供了领导选举的API,使用起来非常方便。
from kazoo.client import KazooClient
import time
import uuid
import logging
logging.basicConfig()
my_id = uuid.uuid4()
def leader_func():
print "I am the leader {}".format(str(my_id))
while True:
print "{} is working! ".format(str(my_id))
time.sleep(3)
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
election = zk.Election("/electionpath")
# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)
zk.stop()
你可以同时运行多个worker,其中一个会获得Leader,当你杀死当前的leader后,会有一个新的leader被选出。
分布式锁
锁的概念大家都熟悉,当我们希望某一件事在同一时间只有一个服务在做,或者某一个资源在同一时间只有一个服务能访问,这个时候,我们就需要用到锁。
from kazoo.client import KazooClient
import time
import uuid
import logging
logging.basicConfig()
my_id = uuid.uuid4()
def work():
print "{} is working! ".format(str(my_id))
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
lock = zk.Lock("/lockpath", str(my_id))
print "I am {}".format(str(my_id))
while True:
with lock:
work()
time.sleep(3)
zk.stop()
当你运行多个worker的时候,不同的worker会试图获取同一个锁,然而只有一个worker会工作,其它的worker必须等待获得锁后才能执行。
监视
ZooKeeper提供了监视(Watch)的功能,当节点的数据被修改的时候,监控的function会被调用。我们可以利用这一点进行配置文件的同步,发消息,或其他需要通知的功能。
from kazoo.client import KazooClient
import time
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
@zk.DataWatch('/path/to/watch')
def my_func(data, stat):
if data:
print "Data is %s" % data
print "Version is %s" % stat.version
else :
print "data is not available"
while True:
time.sleep(10)
zk.stop()
除了我们上面列举的内容外,Kazoo还提供了许多其他的功能,例如:计数,租约,队列等等,大家有兴趣可以参考它的文档
Consul
Consul是用Go开发的分布式服务协调管理的工具,它提供了服务发现,健康检查,Key/Value存储等功能,并且支持跨数据中心的功能。
Consul提供ZooKeeper类似的功能,它的基于HTTP的API可以方便的和各种语言进行绑定。自然Python也在列。
与Zookeeper有所差异的是Consul通过基于Client/Server架构的Agent部署来支持跨Data Center的功能。
Consul在Cluster伤的每一个节点都运行一个Agent,这个Agent可以使Server或者Client模式。Client负责到Server的高效通信,相对为无状态的。 Server负责包括选举领导节点,维护cluster的状态,对所有的查询做响应,跨数据中心的通信等等。
KV基本操作
类似于Zookeeper,Consul支持对KV的增删查改的操作。
import consul
c = consul.Consul()
# set data for key foo
c.kv.put('foo', 'bar')
# poll a key for updates
index = None
while True:
index, data = c.kv.get('foo', index=index)
print data['Value']
c.kv.delete('foo')
这里和ZooKeeper对Znode的操作几乎是一样的。
服务发现(Service Discovery)和健康检查(Health Check)
Consul的另一个主要的功能是用于对分布式的服务做管理,用户可以注册一个服务,同时还提供对服务做健康检测的功能。
首先,用户需要定义一个服务。
{
"service": {
"name": "redis",
"tags": ["master"],
"address": "127.0.0.1",
"port": 8000,
"checks": [
{
"script": "/usr/local/bin/check_redis.py",
"interval": "10s"
}
]
}}
其中,服务的名字是必须的,其它的字段可以自选,包括了服务的地址,端口,相应的健康检查的脚本。当用户注册了一个服务后,就可以通过Consul来查询该服务,获得该服务的状态。
Consul支持三种Check的模式:
-
调用一个外部脚本(Script),在该模式下,consul定时会调用一个外部脚本,通过脚本的返回内容获得对应服务的健康状态。
-
调用HTTP,在该模式下,consul定时会调用一个HTTP请求,返回2XX,则为健康;429 (Too many request)是警告。其它均为不健康
-
主动上报,在该模式下,服务需要主动调用一个consul提供的HTTP PUT请求,上报健康状态。
Python API提供对应的接口,大家可以参考 http://python-consul.readthedocs.org/en/latest/
-
Consul.Agent.Service
-
Consul.Agent.Check
Consul的Health Check和Zookeeper的Failure Detection略有不同,ZooKeeper可以利用ephemeral Node来检测服务的状态,Consul的Health Check,通过调用脚本,HTTP或者主动上报的方式检查服务的状态,更为灵活,可以获得等多的信息,但是也需要做更多的工作。
故障检测(Failure Detection)
Consul提供Session的概念,利用Session可以检查服务是否存活。
对每一个服务我们都可以创建一个session对象,注意这里我们设置了ttl,consul会以ttl的数值为间隔时间,持续的对session的存活做检查。对应的在服务中,我们需要持续的renew session,保证session是合法的。
import consul
import time
c = consul.Consul()
s = c.session.create(name="worker",behavior='delete',ttl=10)
print "session id is {}".format(s)
while True:
c.session.renew(s)
print "I am alive ..."
time.sleep(3)
Moniter代码用于监控worker相关联的session的状态,但发现worker session已经不存在了,就做出响应的处理。
import consul
import time
def is_session_exist(name, sessions):
for s in sessions:
if s['Name'] == name:
return True
return False
c = consul.Consul()
while True:
index, sessions = c.session.list()
if is_session_exist('worker', sessions):
print "worker is alive ..."
else:
print 'worker is dead!'
break
time.sleep(3)
这里注意,因为是基于ttl(最小10秒)的检测,从业务中断到被检测到,至少有10秒的时延,对应需要实时响应的情景,并不适用。Zookeeper使用ephemeral Node的方式时延相对短一点,但也非实时。
领导选举和分布式的锁
无论是Consul本身还是Python客户端,都不直接提供Leader Election的功能,但是这篇文档(http://www.consul.io/docs/guides/leader-election.html)介绍了如何利用Consul的KV存储来实现Leader Election,利用Consul的KV功能,可以很方便的实现领导选举和锁的功能。
当对某一个Key做put操作的时候,可以创建一个session对象,设置一个acquire标志为该 session,这样就获得了一个锁,获得所得客户则是被选举的leader。
代码如下:
import consul
import time
c = consul.Consul()
def request_lead(namespace, session_id):
lock = c.kv.put(leader_namespace,"leader check", acquire=session_id)
return lock
def release_lead(session_id):
c.session.destroy(session_id)
def whois_lead(namespace):
index,value = c.kv.get(namespace)
session = value.get('Session')
if session is None:
print 'No one is leading, maybe in electing'
else:
index, value = c.session.info(session)
print '{} is leading'.format(value['ID'])
def work_non_block():
print "working"
def work_block():
while True:
print "working"
time.sleep(3)
leader_namespace = 'leader/test'
## initialize leader key/value node
leader_index, leader_node = c.kv.get(leader_namespace)
if leader_node is None:
c.kv.put(leader_namespace,"a leader test")
while True:
whois_lead(leader_namespace)
session_id = c.session.create(ttl=10)
if request_lead(leader_namespace,session_id):
print "I am now the leader"
work_block()
release_lead(session_id)
else:
print "wait leader elected!"
time.sleep(3)
利用同样的机制,可以方便的实现锁,信号量等分布式的同步操作。
监视
Consul的Agent提供了Watch的功能,然而Python客户端并没有相应的接口。
etcd
etcd是另一个用GO开发的分布式协调应用,它提供一个分布式的Key/Value存储来进行共享的配置管理和服务发现。
同样的etcd使用基于HTTP的API,可以灵活的进行不同语言的绑定,我们用的是这个客户端https://github.com/jplana/python-etcd
基本操作
import etcd
client = etcd.Client()
client.write('/nodes/n1', 1)
print client.read('/nodes/n1').value
etcd对节点的操作和ZooKeeper类似,不过etcd不支持ZooKeeper的ephemeral Node的概念,要监控服务的状态似乎比较麻烦。
分布式锁
etcd支持分布式锁,以下是一个例子。
import sys
sys.path.append("../../")
import etcd
import uuid
import time
my_id = uuid.uuid4()
def work():
print "I get the lock {}".format(str(my_id))
client = etcd.Client()
lock = etcd.Lock(client, '/customerlock', ttl=60)
with lock as my_lock:
work()
lock.is_locked() # True
lock.renew(60)
lock.is_locked() # False
老版本的etcd支持leader election,但是在最新版该功能被deprecated了,参见https://coreos.com/etcd/docs/0.4.7/etcd-modules/
其它
我们针对分布式协调的功能讨论了三个不同的开源应用,其实还有许多其它的选择,我这里就不一一介绍,大家有兴趣可以访问以下的链接:
-
eureka https://github.com/Netflix/eurekaNetflix开发的定位服务,应用于fail over和load balance的功能
-
curator http://curator.apache.org/基于ZooKeeper的更高层次的封装
-
doozerd https://github.com/ha/doozerd基于GO的高可靠,分布式的数据存储,过去两年已经不活跃
-
openreplica http://openreplica.org/基于Python开发的,面向对象的接口的分布式应用协调的工具
-
serf http://www.serfdom.io/serf提供轻量级的cluster成员管理,故障检测(failure detection)和协调。开发基于GO语言。Consul使用了serf提供的功能
-
noah https://github.com/lusis/Noah基于ruby的ZooKeeper实现,过去三年不活跃
-
copy cat https://github.com/kuujo/copycat基于日志的分布式协调的框架,使用Java开发
总结
ZooKeeper无疑是分布式协调应用的最佳选择,功能全,社区活跃,用户群体很大,对所有典型的用例都有很好的封装,支持不同语言的绑定。缺点是,整个应用比较重,依赖于Java,不支持跨数据中心。
Consul作为使用Go语言开发的分布式协调,对业务发现的管理提供很好的支持,他的HTTP API也能很好的和不同的语言绑定,并支持跨数据中心的应用。缺点是相对较新,适合喜欢尝试新事物的用户。
etcd是一个更轻量级的分布式协调的应用,提供了基本的功能,更适合一些轻量级的应用来使用。
相关推荐
《Python金融大数据风控建模实战》是一本深入探讨如何利用Python进行金融大数据分析与风险控制建模的实践指南。在当今信息化社会,金融行业的风险控制变得尤为重要,而Python以其强大的数据处理能力、丰富的库资源...
【大数据分布式全文检索系统设计与实现】 随着信息技术的迅速进步,大数据已经成为各行各业的关键生产要素。在大数据时代,海量数据的快速增长使得数据挖掘和利用成为关注焦点。为了从这些海量信息中快速有效地获取...
Hadoop使用HDFS分布式文件系统存储数据,并通过MapReduce进行大规模并行计算。Spark则提供了更高效的数据处理能力,尤其在迭代计算和流处理方面表现出色。Python的PySpark库允许我们直接在Python环境中操作Spark集群...
本文主要探讨的是基于Python的大数据反电信诈骗管理系统的开发与实现,该系统旨在利用大数据技术来识别并预防电信诈骗行为。在当前社会,电信诈骗已成为一个严重的社会问题,因此,设计这样一个系统具有重大的现实...
Python基于大数据的图书分析系统的设计与实现(Python 毕业设计,带源码,教程),可作为毕业设计、课程设计、期末大作业等,下载即用,无需修改。 1. 运行教程 在项目文件夹下导入必要的包: pip install -r ...
3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 -------- ----------------------------...
分布式系统是一种由多台计算机通过网络互相连接,协同工作以实现共同目标的系统。...通过学习这个资料,开发者将能够理解如何利用Python构建可扩展、高效且可靠的分布式系统,从而应对日益复杂的计算需求和大数据挑战。
Python基于大数据的学习视频数据分析与个性化推荐系统源码+文档说明Python基于大数据的学习视频数据分析与个性化推荐系统源码+文档说明Python基于大数据的学习视频数据分析与个性化推荐系统源码+文档说明Python基于...
本次分享的知识点将围绕如何基于Spark进行藏汉双语语料大数据的分布式爬取展开,以下内容涵盖了分布式系统、分布式开发、Spark平台应用等关键知识点。 1. 分布式系统 分布式系统是指由多个计算机和外围设备共同组成...
Python金融大数据风控建模实战:基于机器学习源代码+文档说明,含有代码注释,新手也可看懂,个人手打98分项目,导师非常认可的高分项目,毕业设计、期末大作业和课程设计高分必看,下载下来,简单部署,就可以使用...
随着大数据技术的发展,利用Python语言设计分布式共享系统已成为一种趋势。本文将详细介绍如何使用Python语言设计一个分布式文件共享系统,并分析其背后的设计思路。 首先,分布式文件共享系统的设计是为了改善在...
基于python的局域网分布式深度学习计算系统代码.zip基于python的局域网分布式深度学习计算系统代码.zip基于python的局域网分布式深度学习计算系统代码.zip基于python的局域网分布式深度学习计算系统代码.zip基于...
通过PySpark,你可以学习如何创建SparkSession,读取HDFS或其他分布式存储系统中的数据,然后进行MapReduce操作,如数据过滤、聚合和转换。 机器学习库Scikit-learn也是Python大数据中的重要一环。它可以用于构建和...
本项目“Python-分布式系统中常用的的算法python实现”聚焦于将这些算法用Python语言进行实践,同时提供了实用的工具类。下面我们将详细探讨其中涉及的知识点。 1. **一致性哈希算法 (Consistent Hashing)**: 在...
毕业设计:Python基于大数据反电信诈骗管理系统的设计与实现(源码 + 数据库 + 说明文档) 2 开发技术简介 5 2.1 基于B/S结构开发 5 2.2 Django框架 5 2.3 MySQL数据库 5 2.4 python语言简介 6 3.1 可行性分析 6...
Python基于大数据的学习视频数据分析与个性化推荐系统源码+文档说明(高分项目).zip本资源中的源码都是经过本地编译过可运行的,资源项目的难度比较适中,内容都是经过助教老师审定过的能够满足学习、使用需求,...
基于Python和大数据hadoop电影分析系统源码+文档说明.zip,本项目是一套98分毕业设计系统,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业,包含:项目源码、...
本文介绍了一种基于大数据分布式计算下的数据挖掘技术,用于环境保护中的水质监测。该方法主要包括以下几个步骤:利用网络布点技术采集水样图像,然后对图像进行切割,并通过颜色矩来提取图像特征。最后,采用支持...
3. **大数据框架集成**:如Hadoop MapReduce可以通过PyDoop进行Python接口操作,Spark有PySpark接口,使得Python能直接处理大规模数据。 4. **机器学习库**:Scikit-learn提供多种机器学习算法,如线性回归、决策树...
Python基于大数据的学习视频数据分析与个性化推荐系统+源代码+文档说明.zip已获导师指导并通过的97分的高分大作业设计项目,可作为课程设计和期末大作业,下载即用无需修改,项目完整确保可以运行。 Python基于...