`
chenjingbo
  • 浏览: 460571 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

LSF系列-使用zookeeper实现的简单的集群服务管理

 
阅读更多

      要实现的功能很简单,就是一个集群注册服务.对于某一个服务,都可以有对应的多个服务地址,当某个服务机器开始提供服务的时候,就 把自己的IP地址注册上去,而对应客户端来说,就是获取对应的服务机器IP列表.而zk会知道每个服务机器的服务状态.

 

本代码没有经过线上验证..仅供参考

 

     对应的接口很简单.

package zhenghui.lsf.configserver.service;

/**
 * User: zhenghui
 * Date: 13-12-22
 * Time: 下午4:57
 * 集群注册服务.
 */
public interface AddressService {

    /**
     * 设置目标服务的地址
     * serviceUniqueName 对应接口的标识符
     *
     */
    public void setServiceAddresses(String serviceUniqueName,
                                    String address);

    /**
     * 获取目标服务的地址
     *
     * @param serviceUniqueName
     * @return String 当没有可用的服务地址的时候,将会返回null
     */
    public String getServiceAddress(String serviceUniqueName);

}

 

对应的实现类如下

 

 

AddressComponent.java

 

 

package zhenghui.lsf.configserver.impl;

import org.springframework.beans.factory.InitializingBean;
import zhenghui.lsf.configserver.service.AddressService;

import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * User: zhenghui
 * Date: 13-12-22
 * Time: 下午4:58
 * 基于zk实现.根据简单原则,不做高级路由处理.直接用随机来做路由.
 */
public class AddressComponent extends ZookeeperWatcher implements AddressService, InitializingBean {

    private AtomicBoolean inited = new AtomicBoolean(false);

    private static final int DEFAULT_TIME_OUT = 30000;

    /**
     * 服务地址cache
     */
    private ConcurrentHashMap<String, Future<List<String>>> serviceAddressCache = new ConcurrentHashMap<String, Future<List<String>>>();

    /**
     * zk服务器的地址.
     */
    private String zkAdrress = "10.125.195.174:2181";

    @Override
    public void setServiceAddresses(String serviceUniqueName, String address) {
        String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName;
        createPath(path, address);
    }

    private void init() throws Exception {
        // 避免被初始化多次
        if (!inited.compareAndSet(false, true)) {
            return;
        }
        createConnection(zkAdrress, DEFAULT_TIME_OUT);
    }

    @Override
    public String getServiceAddress(String serviceUniqueName) throws ExecutionException, InterruptedException {
        final String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName;
        List<String> addressList;

        Future<List<String>> future = serviceAddressCache.get(path);
        if(future == null){
            FutureTask<List<String>> futureTask = new FutureTask(new Callable<List<String>>() {
                public List<String> call() {
                    return getChildren(path, true);
                }
            });
            Future<List<String>> old = serviceAddressCache.putIfAbsent(path, futureTask);
            if (old == null) {
                futureTask.run();
                addressList = futureTask.get();
            } else {
                addressList = old.get();
            }
        } else {
            addressList = future.get();
        }

        return addressList.get(new Random().nextInt(addressList.size()));
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }

    public void setZkAdrress(String zkAdrress) {
        this.zkAdrress = zkAdrress;
    }

    @Override
    protected void addressChangeHolder(String path) {
        serviceAddressCache.remove(path);
    }
}
  

 

 

ZookeeperWatcher.java

 

 

 

package zhenghui.lsf.configserver.impl;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zhenghui.lsf.exception.LSFException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * User: zhenghui
 * Date: 13-12-25
 * Time: 下午4:13
 * 一些zk的封装
 * 这里注意,别忘记初始化zk的path.比如创建节点的时候,path是 "/zhenghui/lsf/address/interfacename:1.0.0" 那么请保证 "/zhenghui/lsf/address"节点是存在的,否则会报错.
 */
public abstract class ZookeeperWatcher implements Watcher {

    private Logger logger = LoggerFactory.getLogger(ZookeeperWatcher.class);

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    private ZooKeeper zk;

    protected static final String DEFAULT_SERVER_PATH = "/zhenghui/lsf/address";

    /**
     * 节点path的后缀
     */
    private static final String DEFAULT_PATH_SUFFIX = "zhenghui";

    protected static final String separator = "/";

    private static final String charset_utf8 = "utf-8";

    private Stat stat = new Stat();

    /**
     * 用来记录watch被调用次数
     */
    AtomicInteger seq = new AtomicInteger();

    /**
     * 地址变更,需要做对应的处理.比如缓存清理等
     */
    abstract protected void addressChangeHolder(String path);

    /**
     * 创建zk连接
     *
     */
    protected void createConnection(String connectString, int sessionTimeout) throws LSFException {
        //先关闭连接
        releaseConnection();
        try {
            zk = new ZooKeeper(connectString, sessionTimeout, this);
            logger.info(connectString + "开始连接ZK服务器");
            connectedSemaphore.await();
        } catch (Exception e) {
            logger.error("zhenghui.lsf.configserver.impl.AddressComponent.createConnection error");
            throw new LSFException("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createConnection error", e);
        }
    }

    /**
     * 关闭ZK连接
     */
    protected void releaseConnection() {
        if (zk != null) {
            try {
                this.zk.close();
            } catch (Exception e) {
                logger.error("zhenghui.lsf.configserver.impl.AddressComponent.releaseConnection error");
            }
        }
    }

    /**
     * 创建对应的节点.
     */
    protected boolean createPath(String path, String data) {
        try {
            //先判断path是否存在
            Stat stat = exists(path, true);
            //如果不存在,则创建
            if(stat == null){
                this.zk.create(path,"zhenghui".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                logger.info("父节点创建成功.path= " + path);
            }
            String childPath = path + separator + DEFAULT_PATH_SUFFIX;
            this.zk.create(childPath,data.getBytes(charset_utf8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            logger.info("子节点创建成功.path= " + childPath);
            return true;
        } catch (Exception e) {
            logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createPath",e);
            return false;
        }
    }

    protected Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 获取子节点
     *
     * @param path 节点path
     */
    protected List<String> getChildren(String path, boolean needWatch) {
        try {
            List<String> newServerList = new ArrayList<String>();
            List<String> subList = this.zk.getChildren(path, needWatch);
            if(subList != null && !subList.isEmpty()){
                for (String subNode : subList) {
                    // 获取每个子节点下关联的server地址
                    byte[] data = zk.getData(path + separator + subNode, false, stat);
                    newServerList.add(new String(data, charset_utf8));
                }
            }
            return newServerList;
        } catch (Exception e) {
            logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.getChildren", e);
            return null;
        }
    }

    @Override
    public void process(WatchedEvent event){
//        try {
//            Thread.sleep(300);
//        } catch (Exception e) {}
        if (event == null) return;

        String logPrefix = "Watch-" + seq.incrementAndGet() + ":";
        logger.info(logPrefix + event.toString());
        // 连接状态
        Watcher.Event.KeeperState keeperState = event.getState();
        // 事件类型
        Watcher.Event.EventType eventType = event.getType();
        // 受影响的path
        String path = event.getPath();
        if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
            // 成功连接上ZK服务器
            if (Watcher.Event.EventType.None == eventType) {
                logger.info(logPrefix + "成功连接上ZK服务器");
                connectedSemaphore.countDown();
            }  else if (Watcher.Event.EventType.NodeChildrenChanged == eventType) {
                logger.info(logPrefix + "子节点变更");
                //如果是 DEFAULT_SERVER_PATH下面的接口变动,则说明是新增接口,不需要触发holder
                if(!path.equals(DEFAULT_SERVER_PATH)){
                    addressChangeHolder(path);
                }
            }
        }
        //下面可以做一些重连的工作.
        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
            logger.error(logPrefix + "与ZK服务器断开连接");
        } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
            logger.error(logPrefix + "权限检查失败");
        } else if (Watcher.Event.KeeperState.Expired == keeperState) {
            logger.error(logPrefix + "会话失效");
        }
    }
}

 

     

分享到:
评论

相关推荐

    LSF集群使用命令说明

    本文将基于提供的IBM Platform LSF Command Reference Version 8.3文档,对其中部分关键命令进行详细介绍,以便更好地理解和使用LSF集群。 #### bacct **命令功能:** `bacct`命令用于显示已完成任务的会计统计信息...

    lsf-admin(openlava)

    Spectrum LSF是IBM推出的一款高性能的集群作业调度和管理软件,旨在优化资源的使用,提高计算效率。OpenLava是基于LSF API的一个开源版本,提供类似功能。 ### LSF集群管理 #### 1. 管理集群 - **LSF守护进程启动...

    spectrum-lsf-10.1.0-documentation.pdf

    IBM Spectrum LSF 10.1.0 是一种高性能的作业调度和集群管理软件,旨在帮助用户提高计算资源的利用率和效率。本文档提供了 LSF 10.1.0 的详细文档,包括快速入门指南、发行说明、新增功能、安全性和许可证信息等。 ...

    LSF简易使用手册

    LSF,全称为Load Sharing Facility,是Platform公司开发的一款分布式资源管理系统,主要用于集群环境中的任务调度、监控和负载分析。它的主要目标是对集群资源进行统一的管理和调度,确保高效利用计算资源。 ### ...

    TTP229-LSF 8-16按键触摸 原厂规格书

    TTP229-LSF TonTouchTM IC是一款使用电容感应式原理设计的触摸芯片。此芯片内建稳压电路供触摸传感器使用,稳定的触摸效果可以应用在各种不同应用上,人体触摸面板可以通过非导电性绝缘材料连接,主要应用是以取代...

    HPC+LSF+LSF管理员手册

    《HPC+LSF+LSF管理员手册》是针对IBM Spectrum LSF v10 Release 1的管理员指南,旨在帮助用户有效地管理和监控基于高性能计算(HPC)的集群环境。LSF(Load Sharing Facility)是一种分布式作业调度系统,用于优化...

    lsf-python-api:LSF Python包装器的位置,用于控制LSF的所有内容

    它们可以与各种版本的LSF一起使用,并且由LSF开发来维护,尽管我们从开源社区中获得了贡献。 如果您计划或希望为该库做出贡献,则必须遵循此存储库根目录中随附的中的DCO流程。 从本质上讲,它要求您在拉取请求的...

    LSF License Scheduler 安装管理手册

    LSF(Load Sharing Facility)是IBM开发的一种高性能计算(HPC)集群作业调度系统,用于高效管理和优化资源分配。在IBM Spectrum LSF Version 10 Release 1.0中,"LSF License Scheduler" 是一个专门针对许可证管理...

    LSF指令手册

    - **资源管理**:LSF能够自动发现并监控集群中的计算节点,合理分配计算任务到各个节点上运行,支持动态资源调整。 - **负载均衡**:通过智能的负载均衡算法,LSF能够确保计算任务均匀地分布在集群内的各个节点上,...

    SU(6)lsfÃ-HQSS模型内的α激发态

    我们已经回顾了Romanets等人的统一耦合通道模型中使用的重归一化程序。 (Phys Rev D 85:114032,2012),及其对C = 1,S = -2和I = 0的影响,最近LHCb协作观察到了五个c(ˆ)状态 。 在模型中使用的介子-重子相互...

    论文研究-基于LSF集群技术的网格并行编译服务模型.pdf

    针对目前单机编译环境中编译资源局限、编译作业执行时间过长等问题,通过对网格集群技术的研究,提出了一种基于集群技术的网格并行编译服务模型。该模型中首先对编译作业进行分解,并依据作业调度算法,把分解后的元...

    LSF-260型砂水分离器技术说明.pdf

    LSF-260型砂水分离器技术说明.pdf

    lsf-stats:Web 浏览器中 lsf 使用情况统计的实时图表

    #lsf-stats lsf-stats使用带有实时散点图和历史图的 IBM 平台负载共享设施 (LSF) 显示集群的资源利用率统计信息。 这些实时图表可在网络浏览器中查看。 此应用程序是使用 、 、 和 api 构建的。 安装 make 打开...

    LSF_User_Guide.pdf

    - **资源管理**:LSF能够动态地监控和管理集群中的硬件资源,如CPU、内存等。 - **故障恢复**:LSF具备强大的容错机制,能够在节点出现故障时自动重启任务。 #### 资源概念 - **计算节点**:集群中的物理机器或...

    HPC+LSF+用户使用指南

    - **主机**:集群中的每台机器都是一个主机,可以是提供计算服务的工作节点或提供服务的管理节点。 3. **LSF守护进程** - **LSF守护进程**:LSF运行在集群的各个节点上,由多个守护进程组成,如Master Daemon(主...

    lsf-kubernetes:适用于IBM Spectrum LSF的HPC兼容Kubernetes集成的主页

    在共享的多租户环境中为不同的使用者和工作负载提供改进的服务水平 优化了通用图形处理单元(GPGPU)等昂贵资源的使用,以帮助确保分配给他们最重要的工作 部署选项 LSF作为Kubernetes的调度程序 想要为Kubernetes...

    lsf-perl-api:LSF Perl模块用来操纵所有LSF的位置

    LSF Perl API则是一个专门为Perl编程语言设计的接口,允许开发者通过Perl脚本与LSF系统进行交互,实现对作业提交、监控、控制等一系列操作。 **Perl API的核心功能:** 1. **作业提交**:Perl API提供函数来创建并...

    Platform LSF

    Platform LSF的核心功能包括作业调度、作业管理、集群管理、资源管理、性能监控等。它通过一个集中式的调度器来统一管理集群中的计算资源,调度器根据预设的策略将计算任务分配给集群中的各个计算节点。同时,...

    ADGG-LSF-MSF-v1.0:ADGG-LSF-MSF-v1.0

    建立React材料UI Redux 草稿JS ChartJS 棱镜JS React降价React完整日历快速开始安装依赖项: npm install或yarn 启动服务器: npm run start或yarn start 视图处于打开状态: localhost:3000 推荐最新的...LSF-MSF-v1.0

Global site tag (gtag.js) - Google Analytics