`
wbj0110
  • 浏览: 1602089 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

阅读更多

  如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure)。Spark可以选用ZooKeeper来实现HA。

     ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。

 

1. Master的重启策略

Master在启动时,会根据启动参数来决定不同的Master故障重启策略:

  1. ZOOKEEPER实现HA
  2. FILESYSTEM:实现Master无数据丢失重启,集群的运行时数据会保存到本地/网络文件系统上
  3. 丢弃所有原来的数据重启

Master::preStart()可以看出这三种不同逻辑的实现。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. override def preStart() {  
  2.     logInfo("Starting Spark master at " + masterUrl)  
  3.     ...  
  4.     //persistenceEngine是持久化Worker,Driver和Application信息的,这样在Master重新启动时不会影响  
  5.     //已经提交Job的运行  
  6.     persistenceEngine = RECOVERY_MODE match {  
  7.       case "ZOOKEEPER" =>  
  8.         logInfo("Persisting recovery state to ZooKeeper")  
  9.         new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)  
  10.       case "FILESYSTEM" =>  
  11.         logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)  
  12.         new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))  
  13.       case _ =>  
  14.         new BlackHolePersistenceEngine()  
  15.     }  
  16.     //leaderElectionAgent负责Leader的选取。  
  17.     leaderElectionAgent = RECOVERY_MODE match {  
  18.         case "ZOOKEEPER" =>  
  19.           context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))  
  20.         case _ => // 仅仅有一个Master的集群,那么当前的Master就是Active的  
  21.           context.actorOf(Props(classOf[MonarchyLeaderAgent], self))  
  22.       }  
  23.   }  

 

RECOVERY_MODE是一个字符串,可以从spark-env.sh中去设置。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode""NONE")  

 

如果不设置spark.deploy.recoveryMode的话,那么集群的所有运行数据在Master重启是都会丢失,这个结论是从BlackHolePersistenceEngine的实现得出的。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {  
  2.   override def addApplication(app: ApplicationInfo) {}  
  3.   override def removeApplication(app: ApplicationInfo) {}  
  4.   override def addWorker(worker: WorkerInfo) {}  
  5.   override def removeWorker(worker: WorkerInfo) {}  
  6.   override def addDriver(driver: DriverInfo) {}  
  7.   override def removeDriver(driver: DriverInfo) {}  
  8.   
  9.   override def readPersistedData() = (Nil, Nil, Nil)  
  10. }  

 

它把所有的接口实现为空。PersistenceEngine是一个trait。作为对比,可以看一下ZooKeeper的实现。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)  
  2.   extends PersistenceEngine  
  3.   with Logging  
  4. {  
  5.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/master_status"  
  6.   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)  
  7.   
  8.   SparkCuratorUtil.mkdir(zk, WORKING_DIR)  
  9.   // 将app的信息序列化到文件WORKING_DIR/app_{app.id}中  
  10.   override def addApplication(app: ApplicationInfo) {  
  11.     serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)  
  12.   }  
  13.   
  14.   override def removeApplication(app: ApplicationInfo) {  
  15.     zk.delete().forPath(WORKING_DIR + "/app_" + app.id)  
  16.   }  

 

Spark使用的并不是ZooKeeper的API,而是使用的org.apache.curator.framework.CuratorFramework 和 org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} 。Curator在ZooKeeper上做了一层很友好的封装。

 

2. 集群启动参数的配置

简单总结一下参数的设置,通过上述代码的分析,我们知道为了使用ZooKeeper至少应该设置一下参数(实际上,仅仅需要设置这些参数。通过设置spark-env.sh:

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. spark.deploy.recoveryMode=ZOOKEEPER  
  2. spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181  
  3. spark.deploy.zookeeper.dir=/dir     
  4. // OR 通过一下方式设置  
  5. export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER "  
  6. export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server1:2181,zk_server_2:2181"  

 

 

各个参数的意义:

参数
默认值
含义
spark.deploy.recoveryMode
NONE
恢复模式(Master重新启动的模式),有三种:1, ZooKeeper, 2, FileSystem, 3 NONE
spark.deploy.zookeeper.url
  ZooKeeper的Server地址
spark.deploy.zookeeper.dir
/spark
ZooKeeper 保存集群元数据信息的文件目录,包括Worker,Driver和Application。

 

3. CuratorFramework简介 

CuratorFramework极大的简化了ZooKeeper的使用,它提供了high-level的API,并且基于ZooKeeper添加了很多特性,包括

  • 自动连接管理:连接到ZooKeeper的Client有可能会连接中断,Curator处理了这种情况,对于Client来说自动重连是透明的。
  • 简洁的API:简化了原生态的ZooKeeper的方法,事件等;提供了一个简单易用的接口。
  • Recipe的实现(更多介绍请点击Recipes):
    • Leader的选择
    • 共享锁
    • 缓存和监控
    • 分布式的队列
    • 分布式的优先队列

 

CuratorFrameworks通过CuratorFrameworkFactory来创建线程安全的ZooKeeper的实例。

CuratorFrameworkFactory.newClient()提供了一个简单的方式来创建ZooKeeper的实例,可以传入不同的参数来对实例进行完全的控制。获取实例后,必须通过start()来启动这个实例,在结束时,需要调用close()。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. /** 
  2.      * Create a new client 
  3.      * 
  4.      * 
  5.      * @param connectString list of servers to connect to 
  6.      * @param sessionTimeoutMs session timeout 
  7.      * @param connectionTimeoutMs connection timeout 
  8.      * @param retryPolicy retry policy to use 
  9.      * @return client 
  10.      */  
  11.     public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)  
  12.     {  
  13.         return builder().  
  14.             connectString(connectString).  
  15.             sessionTimeoutMs(sessionTimeoutMs).  
  16.             connectionTimeoutMs(connectionTimeoutMs).  
  17.             retryPolicy(retryPolicy).  
  18.             build();  
  19.     }  


需要关注的还有两个Recipe:org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}。

 

首先看一下LeaderlatchListener,它在LeaderLatch状态变化的时候被通知:

  1. 在该节点被选为Leader的时候,接口isLeader()会被调用
  2. 在节点被剥夺Leader的时候,接口notLeader()会被调用

由于通知是异步的,因此有可能在接口被调用的时候,这个状态是准确的,需要确认一下LeaderLatch的hasLeadership()是否的确是true/false。这一点在接下来Spark的实现中可以得到体现。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. /** 
  2. * LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed. 
  3. * 
  4. * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that 
  5. * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes 
  6. * before these methods get called. The contract is that if that happens, you should see another call to the other 
  7. * method pretty quickly. 
  8. */  
  9. public interface LeaderLatchListener  
  10. {  
  11.   /** 
  12. * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. 
  13. * 
  14. * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If 
  15. * this occurs, you can expect {@link #notLeader()} to also be called. 
  16. */  
  17.   public void isLeader();  
  18.   
  19.   /** 
  20. * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false. 
  21. * 
  22. * Note that it is possible that by the time this method call happens, hasLeadership has become true. If 
  23. * this occurs, you can expect {@link #isLeader()} to also be called. 
  24. */  
  25.   public void notLeader();  
  26. }  

 

LeaderLatch负责在众多连接到ZooKeeper Cluster的竞争者中选择一个Leader。Leader的选择机制可以看ZooKeeper的具体实现,LeaderLatch这是完成了很好的封装。我们只需要要知道在初始化它的实例后,需要通过

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. public class LeaderLatch implements Closeable  
  2. {  
  3.     private final Logger log = LoggerFactory.getLogger(getClass());  
  4.     private final CuratorFramework client;  
  5.     private final String latchPath;  
  6.     private final String id;  
  7.     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);  
  8.     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);  
  9.     private final AtomicReference<String> ourPath = new AtomicReference<String>();  
  10.     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();  
  11.     private final CloseMode closeMode;  
  12.     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();  
  13. .  
  14. .  
  15. .  
  16.     /** 
  17.      * Attaches a listener to this LeaderLatch 
  18.      * <p/> 
  19.      * Attaching the same listener multiple times is a noop from the second time on. 
  20.      * <p/> 
  21.      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded 
  22.      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with 
  23.      * them being called out of order you are welcome to use multiple threads. 
  24.      * 
  25.      * @param listener the listener to attach 
  26.      */  
  27.     public void addListener(LeaderLatchListener listener)  
  28.     {  
  29.         listeners.addListener(listener);  
  30.     }  

 

 

通过addListener可以将我们实现的Listener添加到LeaderLatch。在Listener里,我们在两个接口里实现了被选为Leader或者被剥夺Leader角色时的逻辑即可。

 

 

4. ZooKeeperLeaderElectionAgent的实现

实际上因为有Curator的存在,Spark实现Master的HA就变得非常简单了,ZooKeeperLeaderElectionAgent实现了接口LeaderLatchListener,在isLeader()确认所属的Master被选为Leader后,向Master发送消息ElectedLeader,Master会将自己的状态改为ALIVE。当noLeader()被调用时,它会向Master发送消息RevokedLeadership时,Master会关闭。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,  
  2.     masterUrl: String, conf: SparkConf)  
  3.   extends LeaderElectionAgent with LeaderLatchListener with Logging  {  
  4.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/leader_election"  
  5.   // zk是通过CuratorFrameworkFactory创建的ZooKeeper实例  
  6.   private var zk: CuratorFramework = _  
  7.   // leaderLatch:Curator负责选出Leader。  
  8.   private var leaderLatch: LeaderLatch = _  
  9.   private var status = LeadershipStatus.NOT_LEADER  
  10.   
  11.   override def preStart() {  
  12.   
  13.     logInfo("Starting ZooKeeper LeaderElection agent")  
  14.     zk = SparkCuratorUtil.newClient(conf)  
  15.     leaderLatch = new LeaderLatch(zk, WORKING_DIR)  
  16.     leaderLatch.addListener(this)  
  17.   
  18.     leaderLatch.start()  
  19.   }  

 

 

在prestart中,启动了leaderLatch来处理选举ZK中的Leader。就如在上节分析的,主要的逻辑在isLeader和noLeader中。

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. override def isLeader() {  
  2.   synchronized {  
  3.     // could have lost leadership by now.  
  4.     //现在leadership可能已经被剥夺了。。详情参见Curator的实现。  
  5.     if (!leaderLatch.hasLeadership) {  
  6.       return  
  7.     }  
  8.   
  9.     logInfo("We have gained leadership")  
  10.     updateLeadershipStatus(true)  
  11.   }  
  12. }  
  13.   
  14. override def notLeader() {  
  15.   synchronized {  
  16.     // 现在可能赋予leadership了。详情参见Curator的实现。  
  17.     if (leaderLatch.hasLeadership) {  
  18.       return  
  19.     }  
  20.   
  21.     logInfo("We have lost leadership")  
  22.     updateLeadershipStatus(false)  
  23.   }  
  24. }  

 

updateLeadershipStatus的逻辑很简单,就是向Master发送消息。

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. def updateLeadershipStatus(isLeader: Boolean) {  
  2.     if (isLeader && status == LeadershipStatus.NOT_LEADER) {  
  3.       status = LeadershipStatus.LEADER  
  4.       masterActor ! ElectedLeader  
  5.     } else if (!isLeader && status == LeadershipStatus.LEADER) {  
  6.       status = LeadershipStatus.NOT_LEADER  
  7.       masterActor ! RevokedLeadership  
  8.     }  
  9.   }  

 

 

5. 设计理念

为了解决Standalone模式下的Master的SPOF,Spark采用了ZooKeeper提供的选举功能。Spark并没有采用ZooKeeper原生的Java API,而是采用了Curator,一个对ZooKeeper进行了封装的框架。采用了Curator后,Spark不用管理与ZooKeeper的连接,这些对于Spark来说都是透明的。Spark仅仅使用了100行代码,就实现了Master的HA。当然了,Spark是站在的巨人的肩膀上。谁又会去重复发明轮子呢?

http://blog.csdn.net/anzhsoft/article/details/33740737

分享到:
评论

相关推荐

    Master基于ZooKeeper的HighAvailability源码实现

    Spark可以选用ZooKeeper来实现HA。ZooKeeper提供了一个LeaderElection机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个...

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    大数据处理利器:Spark+ZooKeeper+Kafka Scala源码示例

    项目核心采用Spark进行批处理与流处理,整合了ZooKeeper和Kafka以增强分布式计算和数据流管理能力。文件类型多样,包括175个class文件,109个crc校验文件,82个Parquet数据文件,以及67个Scala源码文件等。 项目...

    基于zookeeper的hadoop ha集群安装过程

    基于ZooKeeper的Hadoop HA集群安装过程 在大数据时代,Hadoop作为大数据处理的核心技术,高可用性(HA)是企业对其进行部署和维护的必备条件。基于ZooKeeper的Hadoop HA集群可以提供高可用性和可扩展性,满足企业对大...

    深入解析ZooKeeper分布式环境搭建+编程知识+技术开发

    zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析ZooKeeper分布式环境搭建+编程知识+技术开发; zookeeper之分布式环境搭建:深入解析...

    Kafka技术内幕:图文详解Kafka源码设计与实现 高清带书签

    《Kafka技术内幕:图文详解Kafka源码设计与实现》这本书深入剖析了Apache Kafka这一分布式消息系统的内在机制,旨在帮助读者理解Kafka的核心设计理念,掌握其实现方式,并能运用到实际项目中。以下是该书可能涵盖的...

    Kafka技术内幕:图文详解Kafka源码设计与实现

    《Kafka技术内幕:图文详解Kafka源码设计与实现》这本书由郑奇煌撰写,主要探讨了Apache Kafka这一分布式流处理平台的核心技术与实现原理。Kafka作为大数据处理领域的重要工具,它在实时数据传输、消息队列、日志...

    Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdf

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是郑奇煌在2017年11月出版的一本深入解析Apache Kafka的技术专著。这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流...

    基于ZooKeeper的分布式Session实现

    标题 "基于ZooKeeper的分布式Session实现" 涉及的是在分布式系统中如何利用Apache ZooKeeper来管理和共享Session信息。ZooKeeper是一款开源的分布式协调服务,它为分布式应用程序提供了一个简单一致的接口,用于处理...

    zookeeper 3.6.3 源码下载

    通过阅读源码,我们可以深入了解ZooKeeper如何处理并发、如何进行数据同步、如何实现Watcher机制,以及如何保证分布式一致性。这对于我们优化ZooKeeper性能、解决实际问题或开发类似服务具有极高的价值。 在...

    消息队列:Kafka:Kafka与Zookeeper集成教程.docx

    消息队列:Kafka:Kafka与Zookeeper集成教程.docx

    C#基于zookeeper分布式锁的实现源码

    总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...

    吊打面试官之基于zookeeper实现分布式锁源码

    通过配置`zookeeper:`节点,指定ZooKeeper服务器地址,然后在代码中使用`ZookeeperClient`进行操作。 **注意事项** 1. **锁超时机制**:为了避免死锁,需要设置锁的超时时间,超过这个时间未释放锁的客户端应该被...

    基于zookeeper的分布式锁简单实现

    分布式锁是一种在分布式系统中实现同步的技术,它允许多个节点在同一时刻访问共享资源。在大型分布式环境中,由于网络延迟和并发操作,简单的本地锁可能无法有效解决数据一致性问题。这时,Zookeeper,一个高可用的...

    ZooInspector:一款ZooKeeper的可视化工具

    如果您想深入了解其内部实现或进行二次开发,可以从源码入手。同时,`build`目录可能包含了编译和打包的相关脚本或工件,用于构建和部署ZooInspector。如果需要自定义配置或进行扩展,这部分资源将非常有用。 **...

    twill-zookeeper-0.6.0-incubating-API文档-中文版.zip

    Maven坐标:org.apache.twill:twill-zookeeper:0.6.0-incubating; 标签:apache、zookeeper、twill、jar包、java、中文文档; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。...

    zookeeper_chinese:基于官方zookeeper3.7分支源码的翻译

    Apache ZooKeeper For the latest information about Apache ZooKeeper, please visit our website at:and our wiki, at:基于Zookeeper3.7版本提供整套代码的注释和运行流程的解读

    twill-zookeeper-0.6.0-incubating-API文档-中英对照版.zip

    Maven坐标:org.apache.twill:twill-zookeeper:0.6.0-incubating; 标签:apache、zookeeper、twill、jar包、java、API文档、中英对照版; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可...

    第四课:zookeeper ZAB协议实现源码分析1

    在本课程中,我们将深入探讨Zookeeper的ZAB协议实现,并通过源码分析来理解其启动流程、快照与事务日志的存储结构。Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,如Hadoop、HBase等。本节主要关注...

Global site tag (gtag.js) - Google Analytics