`
javahacker2
  • 浏览: 44145 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ZooKeeper实现分布式队列Queue

阅读更多

ZooKeeper实现分布式队列Queue

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

转载请注明出处:
http://blog.fens.me/zookeeper-queue
zookeeper-queue

前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。通过这篇文章的分布式队列的案例,你将了解到ZooKeeper的强大。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式队列
  2. 设计思路
  3. 程序实现

1. 分布式队列

队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。

本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。

2. 设计思路

创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是 x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。

产品流程图
zookeeper-queue

应用实例
zookeeper-queue-dataprocess

图标解释

  1. app1,app2,app3,app4是4个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue,是znode的队列,假设队列长度为3
  4. /queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
  5. /queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
  6. /queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
  7. /queue/start,当znode队列中满了,触发创建开始节点
  8. 当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束

注:

  • 1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
  • 2). app1可以通过zk2提交,app2也可通过zk3提交
  • 3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
  • 4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,提交3个请求


   public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监听/queue/start创建的事件
            public void process(WatchedEvent event) {
                if (event.getPath() != null && event.getPath().equals("/queue/start") && event.getType() == Event.EventType.NodeCreated) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

出始化队列


    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

增加队列节点


    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

检查队列是否完整


    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

启动函数main


public static void main(String[] args) throws Exception {
    doOne();
}

运行结果:


WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3
create /queue/x2 x2
Queue Complete:2/3
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

完全符合我的们预期。接下来我们看分布式环境

2). 分布式模拟实验

模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3


    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

注:

  • 1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
  • 2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
  • 3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3

zk-run1

执行app1–>zk1


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
[x10000000011]

执行app2–>zk2


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x2 x2
Queue Complete:2/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[x20000000012, x10000000011]

执行app3–>zk3


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[x30000000016, x10000000014, start, x20000000015]

/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class QueueZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

}
分享到:
评论

相关推荐

    分布式队列1

    ZooKeeper 实现分布式队列的核心在于其提供的节点创建、监控以及顺序节点特性。 在ZooKeeper中创建分布式队列时,首先需要指定一个根节点(root node),我们将其称为队列节点(queue node)。例如,可以创建一个名...

    JAVA面试题(Zookeeper、消息队列、分布式等最新的也有)

    本篇将详细探讨Java面试中涉及的一些关键概念,包括Zookeeper、消息队列以及分布式系统。 首先,Zookeeper是Apache Hadoop的一个子项目,它是一个分布式的,开放源码的分布式应用程序协调服务。Zookeeper提供了诸如...

    基于Go实现的分布式MQ消息队列

    消息队列(Message Queue, MQ)作为一种重要的中间件技术,在现代软件开发中扮演着关键角色,尤其在高并发、分布式系统环境中。本文将探讨一种基于Go语言实现的分布式消息队列——KiteQ,并深入剖析其设计理念和技术...

    zookeeper-3.4Windows版本

    4. **MQ(Message Queue)**:消息队列如RabbitMQ或Kafka本身,可以利用ZooKeeper作为分布式锁、集群管理和节点状态同步的工具,提升整个消息系统的稳定性和可靠性。 安装Zookeeper-3.4.9步骤大致如下: 1. 下载并...

    java6.0源码-zookeeper-mq:基于ZooKeeper的可靠高可用消息队列(原型)

    Zookeeper-MQ是一个基于Apache ZooKeeper构建的消息队列系统,它利用了ZooKeeper的分布式协调能力,实现了可靠的、高可用的消息传递功能。在深入探讨其源码之前,我们先来了解一下Zookeeper和消息队列的基本概念。 ...

    zookeeper应用场景实现demo及ppt资料

    1、master选举:mastersel 2、数据的发布和订阅:subscribe 3、负载均衡:balance 4、分布式锁:lock 5、分布式队列:queue 6、命名服务:nameservice 资料来自极客学院

    PyPI 官网下载 | queue_bqsr_status-0.16.tar.gz

    总结来说,`queue_bqsr_status`是一个针对生物信息学任务,特别是BQSR流程的分布式协调工具,它利用ZooKeeper实现分布式一致性,遵循云原生设计理念,提供高效可靠的任务管理能力。对于开发者来说,研究这个库可以...

    zookeeper常见应用场景简单实现及ppt

    1、master选举:mastersel 2、数据的发布和订阅:subscribe 3、负载均衡:balance 4、分布式锁:lock 5、分布式队列:queue 6、命名服务:nameservice 演示代码下载(代码来自极客学院演示demo):

    PyPI 官网下载 | AMQPQueue-0.3.5.tar.gz

    总结来说,AMQPQueue是一个Python库,用于与AMQP服务器交互,可能是为了实现分布式系统中的异步通信。它的特性可能包括与Zookeeper的集成,适应云环境的需求,并提供了丰富的功能来支持消息队列的管理和操作。使用这...

    PyPI 官网下载 | blueque-0.2.3.tar.gz

    本文将深入探讨从PyPI官网下载的"blueque-0.2.3.tar.gz"这一资源,它是一个包含blueque库的压缩包,主要用于实现分布式系统中的任务队列管理。通过分析其标签,我们可以了解到blueque与Zookeeper、分布式计算、云...

    PyPI 官网下载 | sqs_queue-0.1.1.tar.gz

    在使用`sqs_queue`时,如果项目中也引入了ZooKeeper,那么这个库可能提供了与ZooKeeper交互的接口,以实现更高级别的分布式一致性策略,例如选举主节点、分布式锁等。 "云原生"(Cloud Native)的标签暗示了`sqs_...

    Java分布式应用学习笔记

    在分布式系统中,消息队列(Message Queue, MQ)和发布/订阅(Publish/Subscribe, Pub/Sub)模型是关键组件。例如,Apache ActiveMQ和RabbitMQ是流行的Java消息中间件,它们负责在分布式组件间传递消息,确保异步...

    PyPI 官网下载 | mq_client_abstraction-0.0.32.tar.gz

    消息队列(Message Queue)在分布式系统中起着至关重要的作用,它允许系统组件之间通过异步通信进行解耦。mq_client_abstraction库的目标就是将这种复杂性隐藏起来,提供一个统一的接口,使得开发者可以轻松地切换...

    分布式系统原理与范型(第二版)

    5. 消息队列(Message Queue, MQ):通过消息传递来解耦分布式系统中的组件,提高系统的并行性和容错性。 6. 分布式数据库:如ACID(原子性、一致性、隔离性、持久性)和BASE(基本可用、软状态、最终一致性)模型...

    PyPI 官网下载 | mq_client_abstraction-0.0.38.tar.gz

    这个库是实现消息队列客户端抽象化的工具,对于理解和应用分布式系统、云原生技术具有重要价值。 首先,我们要理解什么是消息队列(Message Queue,MQ)。MQ是一种异步通信机制,它允许应用程序之间通过消息进行...

    PyPI 官网下载 | cdk-tweet-queue-1.0.153.tar.gz

    总的来说,cdk-tweet-queue-1.0.153.tar.gz是一个与云原生、分布式系统和Zookeeper集成的Python库,它提供了处理推文消息队列的能力,可能是为了简化在大规模分布式环境中的数据管理和处理。对这个库的深入理解和...

    PyPI 官网下载 | iterable-queue-1.2.0.tar.gz

    如果`iterable-queue`与ZooKeeper集成,那么它可能支持在分布式环境中实现高可用性和一致性。"云原生"则意味着这个库可能设计时考虑了微服务架构、容器化和持续部署等现代云技术的最佳实践。 至于"Python库",这是...

Global site tag (gtag.js) - Google Analytics