`

zookeeper 分布式任务

 
阅读更多
网上说了一大堆关于zookeeper选举,参差不齐,貌似没有啥代码可以copy,把自己也给绕进去了,这里结合一些网上的看法,自己写了代码,关于zookeeper的。
思路大概是这样:
1、每个节点进来查询是否有任务发布路径,如果没有,创建任务发布路径,自己成为领导;这个过程加锁,所以集群只有一个领导。
2、每个应用都可以发布任务。把命令写在发布路径的data里;所有应用监听到发布路径修改,执行任务类型;每个任务监听到任务后,会失去监听,那么再监听回任务发布路径,实现无限监听;
3、所有节点监听到领导挂了,再选举一个节点做为领导,具有容灾性质;

zookeeper要结合分布式内存存储工具(像memcached、redis、MQ工具等),能发挥很强大的分布式计算功能。有些很强大的工具都用到zookeeper了,像hadoop、storm、阿里dubbo等等。
这里基zookeeper的选举,没有那么复杂,抛砖引玉了
2014-04-27:经过一个项目的熏陶,对zookeeper有了一定的理解,Paxos是zookeeper的核心算法,我们是在zookeeper基础上做开发,算法理解就行了,但别给算法给绑死了,没有哪个程序要求一定要实现什么算法,必须知道zookeeper的作用是协调分布式应用。

/**
*
* @author lyq
*
*/
public class ZkAppCenter implements Watcher {

// private static ZkAppInfo info = null;

public static final String appGroup = "/APP";
private String hosts = BaseCode.getValue("ZK_HOST");
private Integer SESSION_TIMEOUT = Integer.parseInt(BaseCode.getValue("ZK_SESSION_TIMEOUT"));
String taskPath = appGroup + "_leader";

public CountDownLatch lockLatch = null;


private ZooKeeper zk = null;
protected static final Logger log =  Logger.getLogger(ZkAppCenter.class);

private String appName = null;

// public static ZkAppInfo getInstance(){
// if(info == null){
// info = new ZkAppInfo();
// }
// return info;
// }
private ZooKeeper getZK() throws Exception{
if(zk==null){
zk = new ZooKeeper(hosts, SESSION_TIMEOUT,this);
}
return zk;
}
/**
*
* @return
* @throws Exception
*/
public String registerApp() throws Exception{
if(getZK().exists(appGroup, false)==null){
getZK().create(appGroup,appGroup.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
ZkLock lock = new ZkLock(appGroup+"lock",this.getZK()) {//创建应用构建必须是有序的,锁在我上篇文字
@Override
public Object doAction()  {
try {
String addr = InetAddress.getLocalHost().getHostAddress();
appName = zk.create(appGroup+"/"+addr+"-",new byte[0] , Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
if(zk.exists(taskPath, false)==null){
zk.create(taskPath, "leader".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//创建任务发布路径
}
zk.getData(taskPath,true, null);
// listenChain();
// listenLeader();//监听leader节点
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}//每个应用注册进来必须监听leader创建的leader路径
}
};
lockLatch = lock.latch;
lock.execute();


return appName;
}


//发布应用任务时,只需要修改taskPath,因为所有应用都有watch taskPath,所以所有应用都执行了该任务;(任务集合可以放在集群内存队列里面,让每个应用去拿)



public void doTask(String taskName) throws Exception{
ZkTask taskBean = ZkTaskFactory.getInstance().getTask(taskName);
Object obj = taskBean.execute();
if(obj!=null){
this.getZK().setData(taskPath, taskName.getBytes(), -1);
}else{
                       this.getZK().setData(taskPath,taskName+":end",-1);//结束命令
                }
}


public Integer getAppsSize() throws Exception{
List<String> list =  this.getZK().getChildren(appGroup, false);
Integer appsSize = list==null?0:list.size();
if(appsSize==0){
log.error("集群失败,没有'"+appGroup+"'该应用群组里。");
}
return appsSize;
}

public static void main(String[] args) throws Exception {
try{
final CountDownLatch latchtask = new CountDownLatch(10);
for(int i = 0;i<10;i++){
final int ia = i;
new Thread(){
public void run(){
ZkAppCenter reg = new ZkAppCenter();
try {
reg.registerApp();
latchtask.await();
Thread.sleep(4000);
if(ia==9){
List<ZkTaskBean> list = new ArrayList();
for(int i=0;i<8000;i++){
list.add(new DoloadTaskBean());
}
new ZkPlan(BaseCode.RD_DOLOAD_TASKS,list,reg.getAppsSize(), 1000);
reg.doTask("TestTask");

}
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
latchtask.countDown();
}


Thread.sleep(Long.MAX_VALUE);
}finally{

}
}


//2014-04-27 lyq 这里有个问题,如果process执行太久,会导致其他任务发布不了,估计是加锁了,所以process方法中执行任务时,最好开启一个子线程:
new Thread(){
   public void run() {
   try {
   doTask(task);
} catch (Exception e) {
e.printStackTrace();
}
   }
}.start();

这样各种类型任务互不干扰,项目实战经验啊。其实想过为每个任务建zookeeper连接,但是zookeeper默认就10个连接,建太多连接还要抽象出代码去同意管理连接,因为连接是长连接,如果不抽象这块连接的程序,那么当程序挂了要费很大的劲去为每个连接建立容灾的程序;还是单个连接的路线吧。还有一些容灾的方法,跟本博客redis做法是一样的:如果失去连接,让所有任务进入等待,关闭zookeeper,重新在zookeeper注册应用。再让zookeeper任务继续执行,这里在getZk()这个方法放入监听锁~~
多线程和分布式的应用,可以充分体验锁的魅力。

zookeeper真心不错~~~
@Override
public void process(WatchedEvent event) {
try {
String eventPath = event.getPath();
if(eventPath!=null&&eventPath.contains("lock")){
this.lockLatch.countDown();//锁
}else if (event.getType() == EventType.None && event.getState() == KeeperState.SyncConnected) {
String addr = InetAddress.getLocalHost().getHostAddress();
log.info(addr + " register......");
}else if(event.getType() == EventType.NodeDeleted){
log.info(eventPath+"节点删除..........");
if(eventPath.equals(this.taskPath)){
this.registerApp();//监听到父节点删除,那么重新触发注册,防止领导节点挂掉、
}
}else{
String task = new String(this.getZK().getData(eventPath, true, null));//每次监听到任务,继续监听

doTask(task);
// }
this.log.info("this.appName:"+this.appName);
//
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
0
0
分享到:
评论

相关推荐

    从Paxos到Zookeeper 分布式一致性原理与实践 PDF电子书下载 带目录书签 完整版.pdf

    在分布式系统中,多个节点协同工作来完成复杂的任务,而如何确保这些节点之间的一致性则成为了一个重要的问题。本书《从Paxos到Zookeeper:分布式一致性原理与实践》深入浅出地讲解了分布式一致性算法及其应用,并以...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》是一本深入探讨分布式系统中一致性问题的重要书籍。它涵盖了PAXOS算法的基础理论以及ZOOKEEPER在实际应用中的实践经验,是理解分布式一致性机制的宝贵资料。在这个高度...

    ZooKeeper分布式过程协同技术详解

    在分布式系统中,数据一致性、命名服务、配置管理、集群管理等任务常常变得复杂且难以处理,而ZooKeeper则通过提供一系列简单的原语来帮助解决这些问题。 1. **ZooKeeper的基本概念** - **节点(ZNode)**:...

    zookeeper分布式进程协同.pdf

    【Zookeeper:分布式进程协同】 Zookeeper是一款广泛应用于互联网行业的分布式协调服务,它为分布式应用程序提供了高可用性、一致性以及顺序访问控制等关键特性。在没有Zookeeper的世界中,分布式系统的构建将面临...

    zookeeper分布式锁实现和客户端简单实现

    **分布式锁的概念与重要性** 在分布式系统中,由于多台服务器之间可能存在数据不一致的情况,因此需要一种机制来确保在并发...理解并熟练掌握Zookeeper的分布式锁机制,对于构建高可用、高性能的分布式系统至关重要。

    Zookeeper 分布式过程.pdf

    Apache ZooKeeper是一个开源的分布式协调服务,它主要用来协调分布式应用中那些相对独立的服务。ZooKeeper的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,为用户程序提供简单易用的接口。本书...

    ZooKeeper-分布式过程协同技术详解 和从Paxos到Zookeeper

    ZooKeeper是由Apache软件基金会开发的一个开源项目,主要目标是简化分布式环境下的协调任务,为分布式应用程序提供高度可用、一致且有序的服务。 首先,ZooKeeper的核心概念包括节点(Znode)、会话(Session)和...

    从Paxos到Zookeeper 分布式一致性原理与实践 [倪超著]

    在分布式一致性实践部分,作者可能会分享实际案例,展示如何利用Zookeeper解决实际问题,如数据库复制、负载均衡、分布式任务调度等。此外,书中可能还会讨论Zookeeper与其他分布式一致性解决方案,如Raft算法、...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践&zookeeper;-3.4.6总结

    Paxos算法是解决这一问题的经典方案,而ZooKeeper则是Apache基金会开发的一个分布式协调服务,它基于Paxos和其他一致性算法实现,为分布式应用提供可靠的分布式协调服务。 Paxos算法是由Leslie Lamport提出的,它...

    Spring Task+Zookeeper分布式定时任务调度组件-Taroco-Scheduler.zip

    Taroco-Scheduler项目正是利用了Spring Task和Zookeeper的特性,创建了一个分布式任务调度解决方案。在Taroco-Scheduler-master这个压缩包中,我们可以找到项目的源代码,包括服务端(scheduler-server)和客户端...

    zookeeper分布式协调案例

    《Zookeeper分布式协调案例详解》 Zookeeper,作为Apache软件基金会的一个开源项目,是分布式系统中的核心组件,专为管理分布式应用提供高可用性、一致性以及数据同步等服务。其设计目标是简化分布式环境下的复杂...

    从Paxos到Zookeeper分布式一致性原理与实践.pdf

    《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式一致性问题的书籍,其中涵盖了Paxos协议和Zookeeper系统的核心概念和技术。分布式一致性是构建大规模、高可用性、高可靠性的分布式系统的关键,...

    zk分布式锁1

    * 分布式任务锁:ZooKeeper分布式锁可以被用来实现分布式任务的锁机制,避免了任务执行的不确定性。 ZooKeeper分布式锁的缺点 ZooKeeper分布式锁的缺点包括: * 需要单独的ZooKeeper集群:ZooKeeper分布式锁需要...

    C#基于zookeeper分布式锁的实现源码

    ZooKeeper是一个高可用的分布式协调服务,它提供了诸如命名服务、配置管理、分布式同步、组服务等功能。本篇将深入探讨如何在C#中利用ZooKeeper实现分布式锁。 首先,我们需要理解ZooKeeper的基本操作,包括创建...

    ZooKeeper-分布式过程协同技术详解-最新版

    《ZooKeeper:分布式过程协同技术详解》是深入理解Zookeeper这一分布式协调服务的重要参考资料。Zookeeper是由Apache软件基金会开发的一个开源项目,主要用于解决分布式应用中的数据一致性问题,提供诸如命名服务、...

    ZooKeeper-分布式过程协同技术详解 PDF 高清完整版

    第1章介绍ZooKeeper可以做什么,以及其设计如何支撑这些任务。第2章介绍基本概念和基本组成模块,并通过命令行工具的具体操作介绍了ZooKeeper可以做什么。第二部分(第3~8章)阐述开发人员所需要掌握的ZooKeeper库...

Global site tag (gtag.js) - Google Analytics