背景
前段时间学习了zookeeper后,在新的项目中刚好派上了用场,我在项目中主要负责分布式任务调度模块的开发,对我自己来说是个不小的挑战。
分布式的任务调度,技术上我们选择了zookeeper,具体的整个分布式任务调度的架构选择会另起一篇文章进行介绍。
本文主要是介绍自己在项目中zookeeper的一些扩展使用,希望可以对大家有所帮助。
项目中使用的zookeeper版本3.3.3,对应的文档地址: http://zookeeper.apache.org/doc/trunk/
扩展一:优先集群
先来点背景知识:
1.zookeeper中的server机器之间会组成leader/follower集群,1:n的关系。采用了paxos一致性算法保证了数据的一致性,就是leader/follower会采用通讯的方式进行投票来实现paxns。
2.zookeeper还支持一种observer模式,提供只读服务不参与投票,提升系统,对应文档: http://zookeeper.apache.org/doc/trunk/zookeeperObservers.html
我们项目特性的决定了我们需要进行跨机房操作,比如杭州,美国,香港,青岛等多个机房之间进行数据交互。
跨机房之间对应的网络延迟都比较大,比如中美机房走海底光缆有ping操作200ms的延迟,杭州和青岛机房有70ms的延迟。
为了提升系统的网络性能,我们在部署zookeeper网络时会在每个机房部署节点,多个机房之间再组成一个大的网络保证数据一致性。(zookeeper千万别再搞多个集群)
最后的部署结构就会是:
- 杭州机房 >=3台 (构建leader/follower的zk集群)
- 青岛机房 >=1台 (构建observer的zk集群)
- 美国机房 >=1台 (构建observer的zk集群)
- 香港机房 >=1台 (构建observer的zk集群)

- 先使用美国机房的集群ip初始化一次zk client
- 通过反射方式,强制在初始化后的zk client中的server列表中又加入杭州机房的机器列表
- ZooKeeper zk = null;
- try {
- zk = new ZooKeeper(cluster1, sessionTimeout, new AsyncWatcher() {
- public void asyncProcess(WatchedEvent event) {
- //do nothing
- }
- });
- if (serveraddrs.size() > 1) {
- // 强制的声明accessible
- ReflectionUtils.makeAccessible(clientCnxnField);
- ReflectionUtils.makeAccessible(serverAddrsField);
- // 添加第二组集群列表
- for (int i = 1; i < serveraddrs.size(); i++) {
- String cluster = serveraddrs.get(i);
- // 强制获取zk中的地址信息
- ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
- List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils
- .getField(serverAddrsField, cnxn);
- // 添加第二组集群列表
- serverAddrs.addAll(buildServerAddrs(cluster));
- }
- }
- }
扩展二:异步Watcher处理
最早在看zookeeper的代码时,一直对它的watcher处理比较满意,使用watcher推送数据可以很方便的实现分布式锁的功能。
zookeeper的watcher实现原理也挺简单的,就是在zookeeper client和zookeeper server上都保存一份对应的watcher对象。每个zookeeper机器都会有一份完整的node tree数据和watcher数据,每次leader通知follower/observer数据发生变更后,每个zookeeper server会根据自己节点中的watcher事件推送给响应的zookeeper client,每个zk client收到后再根据内存中的watcher引用,进行回调。
这里会有个问题,就是zk client在处理watcher时,回凋的过程是一个串行的执行过程,所以单个watcher的处理慢会影响整个列表的响应。
可以看一下ClientCnxn类中的EventThread处理,该线程会定时消费一个queue的数据,挨个调用processEvent(Object event) 进行回调处理。
扩展代码:
- public abstract class AsyncWatcher implements Watcher {
- private static final int DEFAULT_POOL_SIZE = 30;
- private static final int DEFAULT_ACCEPT_COUNT = 60;
- private static ExecutorService executor = new ThreadPoolExecutor(
- 1,
- DEFAULT_POOL_SIZE,
- 0L,
- TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(
- DEFAULT_ACCEPT_COUNT),
- new NamedThreadFactory(
- "Arbitrate-Async-Watcher"),
- new ThreadPoolExecutor.CallerRunsPolicy());
- public void process(final WatchedEvent event) {
- executor.execute(new Runnable() {//提交异步处理
- @Override
- public void run() {
- asyncProcess(event);
- }
- });
- }
- public abstract void asyncProcess(WatchedEvent event);
- }
- zookeeper针对watcher的调用是以单线程串行的方式进行处理,容易造成堵塞影响,monitor的数据同步及时性
- AsyncWatcher为采取的一种策略为当不超过acceptCount=60的任务时,会采用异步线程的方式处理。如果超过60任务,会变为原先的单线程串行的模式
扩展三:重试处理
这个也不多说啥,看一下相关文档就清楚了
- http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
- http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A3
- public interface ZooKeeperOperation<T> {
- public T execute() throws KeeperException, InterruptedException;
- }
- /**
- * 包装重试策略
- */
- public <T> T retryOperation(ZooKeeperOperation<T> operation) throws KeeperException,
- InterruptedException {
- KeeperException exception = null;
- for (int i = 0; i < maxRetry; i++) {
- try {
- return (T) operation.execute();
- } catch (KeeperException.SessionExpiredException e) {
- logger.warn("Session expired for: " + this + " so reconnecting due to: " + e, e);
- throw e;
- } catch (KeeperException.ConnectionLossException e) { //特殊处理Connection Loss
- if (exception == null) {
- exception = e;
- }
- logger.warn("Attempt " + i + " failed with connection loss so "
- + "attempting to reconnect: " + e, e);
- retryDelay(i);
- }
- }
- throw exception;
- }
注意点:Watcher原子性
在使用zookeeper的过程中,需要特别注意一点就是注册对应watcher事件时,如果当前的节点已经满足了条件,比如exist的watcher,它不会触发你的watcher,而会等待下一次watcher条件的满足。
它的watcher是一个一次性的监听,而不是一个永久的订阅过程。所以在watcher响应和再次注册watcher过程并不是一个原子操作,编写多线程代码和锁时需要特别注意
总结
zookeepr是一个挺不错的产品,源代码写的也非常不错,大量使用了queue和异步Thread的处理模式,真是一个伟大的产品。
http://agapple.iteye.com/blog/1184023
相关推荐
实现Zookeeper的异地多活面临的主要挑战是如何避免跨机房的循环复制。为此,通常采取特定的架构设计,例如原生集群配合Observer角色,Observer节点只负责写入其他机房的ZK事务,不参与选举,以此避免写操作的循环。...
Observer 主要用于提升读取性能,适用于读多写少的场景,尤其是在客户端数量庞大或跨机房部署的情况下。 Zookeeper 的配置文件 `zoo.cfg` 包含了一些重要的参数,如: - **tickTime**:基础时间单位,通常设置为...
该系统可以实现跨机房的数据库同步,确保数据的一致性和高可用性。 系统架构 Otter 的系统架构主要包括三个部分:Manager 节点、Node 节点和 ZooKeeper 节点。Manager 节点负责同步配置和状态反馈,Node 节点负责...
跨机房复制时,数据处理阶段(S/E/T/L)可能分布在不同节点上,通过Zookeeper协同工作。 Otter 中的关键概念包括Pipeline(描述从源到目标的完整同步过程)、Channel(单向同步的组成部分,在双向同步中由两个...
同样导入并运行,消费者会从Zookeeper中发现服务提供者,并发起远程调用。 4. **配置解析**:在服务提供者和消费者中,都需要配置Dubbo的元数据,包括服务接口、版本、应用名、注册中心地址等。这些信息通常在`...
- **分散服务到多个机房**:通过跨地域复制,可以将服务分布到不同的地理位置,降低单点故障的风险。 - **应对机房级别的故障**:即使某个机房出现故障或中断,其他机房仍然能够接管服务,确保业务连续性不受影响。 ...
- **本地IDC发现:**优先在本地IDC内寻找服务实例,减少跨机房访问延迟。 - **跨IDC发现:**当本地IDC无法满足需求时,尝试从其他IDC获取服务实例。 #### 六、总结 微博的服务发现高可用实践充分考虑了服务化的...
Canal的设计初衷是为了满足阿里巴巴在杭州和美国双机房部署时的跨机房同步需求。 3. 管理系统架构: Otter的典型管理系统架构包括manager(WEB管理)和node(工作节点)。Manager负责推送同步配置到node节点,而...
去哪儿网通过PXC和QMHA的结合使用,有效解决了业务发展中的数据库挑战,实现了高并发交易的实时处理和数据的跨机房高可用复制。这些技术和经验对于其他希望构建高效、稳定数据库架构的公司来说,具有很高的参考价值...
QMHA的多线程复制和跨机房部署使得数据同步效率提高,failover切换时间缩短至8-16s,switchover只需2秒,增强了系统的安全性。 在后续的改进中,QMHA需要处理的问题包括自动补全binlog以确保数据完整性,权重控制以...
小规模系统可以选择CP原则(强一致性和分区容错性)的注册中心,如ZooKeeper,而大规模或跨机房系统则倾向于选择AP原则(可用性和分区容错性)的etcd或Consul。Eureka在Netflix生态系统中用于服务发现,具备良好的高...
QMHA的高可用性体现在其无网络分区设计、跨机房部署能力、主从一致性、零事务丢失、后台配置中心以及集群维护的透明化。它在性能和一致性之间找到了良好的平衡,既不像MMM/MHA那样易受网络影响,也不像PXC那样对网络...
起初,Canal被用来解决跨机房同步问题,通过业务触发器获取增量数据。随着时间推移,它逐渐演变成一个通用的增量数据处理平台,支持多种应用场景,如数据库镜像、实时备份、索引构建、缓存刷新等。目前,Canal支持...
随着业务的发展,微服务可能需要在多个机房部署,这就需要服务注册表支持跨机房的服务发现和通信,以确保高可用性和容错性。 总结来说,微服务架构的组件设计涉及服务注册与发现、服务调用、API网关等多个方面,...
魅族成功进行了机房内的NameNode迁移以及跨机房集群迁移,保证了服务的连续性。监控告警系统,如Ganglia和Nagios,用于实时监控集群状态并及时发出告警。HDFS的存储管理则通过工具化手段进行,包括用户目录空间的...
- 数据同步过程可能会跨越多个机房,每个阶段如选择(Select)、提取(Extract)、转换(Transform)和加载(Load)可能分布在不同的Node节点上。 - 多个Node节点通过Zookeeper进行协同工作,确保数据的一致性和...
随着业务发展,第二代JSF自研了框架,以适应多机房、跨语言的需求,并引入了自研注册中心和更强大的管理平台。 一个完善的服务化框架还需要包含配置中心、接口文档管理、监控中心、分布式跟踪、服务治理和网关等...
该平台旨在提供自助化、自动化和智能化的数据库运维服务,面向所有用户,支持高可用和横向扩展部署,可纳管上万台数据库,并能跨机房/网络分区部署。平台的功能层次包括标准化架构、资源适配、自动化操作和服务目录...
在第八代架构中,使用LVS或F5使用多个Nginx负载均衡一个域名多个IP,当用户访问的时候,通过DNS服务器轮询,来实现多个机房或跨机房的访问,达到千万级别到亿级别,通过增加机房解决,跨机房的一致性问题。...
Eureka通过区域(region)和区域内的具体机房(zone)的概念来进行服务的分区,使得服务发现更加灵活和贴近实际的物理网络拓扑。 Nacos是阿里巴巴开源的动态服务发现、配置管理和服务管理平台,它集成了服务注册与...