`
QING____
  • 浏览: 2262939 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

"相对平均"分布算法备忘

    博客分类:
  • JAVA
 
阅读更多

    有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.

   其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

    1) "相对平均"怎么设计

    2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..

    3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

 

例如:

    sid1: w1 w2 w3 w4

    sid2: w5

    sid3:w6

期望迁移之后:

    sid1:w1 w2

    sid2:w5 w3

    sid3:w4 w6

 

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

    sid1:w6 w5

    sid2:w2 w3

    sid3:w1 w4

   

经过提取,"相对平均"的设计代码如下,仅作备忘:

package com.test.demo.zookeeper;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.*;

public class WorkersBalanceMain {
    private List<String> servers = new ArrayList<String>();
    private Map<String, List<String>> current = new HashMap<String, List<String>>();
    private Set<String> workers = new HashSet<String>();

    public static void main(String[] args) {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        String line;
        Set<String> servers = new HashSet<String>();
        WorkersBalanceMain balancer = new WorkersBalanceMain();
        try {
            while ((line = br.readLine()) != null) {
                if (line.startsWith("addWorker")) {
                    balancer.addWorkers(line);
                } else if (line.startsWith("addServer")) {
                    balancer.addServers(line);
                } else {
                    System.out.println("???");
                    continue;
                }
                balancer.rebalance();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("--END---");
    }

    public void addServers(String source) {
        int index = source.indexOf(" ");
        if (index == -1) {
            return;
        }
        String[] values = source.substring(index + 1).split(" ");
        if (values == null || values.length == 0) {
            return;
        }
        for (String server : values) {
            servers.add(server);
            if(current.get(server) == null){
                current.put(server,new ArrayList<String>());
            }
        }
    }

    public void addWorkers(String source) {
        int index = source.indexOf(" ");
        if (index == -1) {
            return;
        }
        String[] values = source.substring(index + 1).split(" ");
        if (values == null || values.length == 0) {
            return;
        }
        //当有新的worker提交时,将咱有一台机器接管
        String sid = servers.get(0);
        List<String> sw = current.get(sid);
        if(sw == null){
            current.put(sid,new ArrayList<String>());
        }
        for (String worker : values) {
            workers.add(worker);
            sw.add(worker);
        }

    }

    public void rebalance() {
        try {
            if (workers.isEmpty()) {
                return;
            }
            for (String sid : servers) {
                if (current.get(sid) == null) {
                    current.put(sid, new ArrayList<String>());
                }
            }
            //根据每个sid上的worker个数,整理成一个排序的map
            TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>();
            for (Map.Entry<String, List<String>> entry : current.entrySet()) {
                int total = entry.getValue().size();
                List<String> sl = counterMap.get(total);
                if (sl == null) {
                    sl = new ArrayList<String>();
                    counterMap.put(total, sl);
                }
                sl.add(entry.getKey());//sid
            }
            int totalWorkers = workers.size();
            int totalServers = current.keySet().size();
            int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数
            while (true) {
                Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg);  //大于平均数的列表, >= avg + 1
                Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg  - 1
                //允许任务个数与avg上线浮动1各个,不是绝对的平均

                if (gt == null || lt == null) {
                    break;
                }
                Integer gtKey = gt.getKey();
                Integer ltKey = lt.getKey();
                if (gtKey - ltKey < 2) {
                    break;
                }
                if (gt.getValue().size() == 0) {
                    counterMap.remove(gt.getKey());
                }
                if (lt.getValue().size() == 0) {
                    counterMap.remove(lt.getKey());
                }
                Iterator<String> it = gt.getValue().iterator(); //sid列表
                while (it.hasNext()) {
                    String _fromSid = it.next();
                    List<String> _currentWorkers = current.get(_fromSid);
                    if (_currentWorkers == null || _currentWorkers.isEmpty()) {
                        it.remove();
                        current.remove(_fromSid);
                        continue;
                    }
                    List<String> _ltServers = lt.getValue();
                    if (_ltServers.isEmpty()) {
                        counterMap.remove(ltKey);
                        break;
                    }
                    //取出需要交换出去的任务id
                    int _currentWorkersSize = _currentWorkers.size();
                    String _wid = _currentWorkers.get(_currentWorkersSize - 1);
                    String _toSid = _ltServers.get(0);
                    //从_fromSid的worker列表中移除低workerId
                    //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部
                    //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"
                    _currentWorkers.remove(_currentWorkersSize - 1);
                    it.remove();
                    _ltServers.remove(0);
                    //将此workerId添加到_toSid的worker列表中
                    List<String> _ltWorkers = current.get(_toSid);
                    if (_ltWorkers == null) {
                        _ltWorkers = new ArrayList<String>();
                        current.put(_toSid, _ltWorkers);
                    }
                    _ltWorkers.add(_wid);
                    //将gt的key降低一个数字
                    List<String> _next = counterMap.get(gtKey - 1);
                    if (_next == null) {
                        _next = new ArrayList<String>();
                        counterMap.put(gtKey - 1, _next);
                    }
                    _next.add(_fromSid);
                    //将lt的key提升一个数字
                    List<String> _prev = counterMap.get(ltKey + 1);
                    //从lt的countMap中移除,因为它将被放置在key + 1的新位置
                    Iterator<String> _ltIt = _ltServers.iterator();
                    while (_ltIt.hasNext()) {
                        if (_ltIt.next().equalsIgnoreCase(_toSid)) {
                            _ltIt.remove();
                            break;
                        }
                    }
                    if (_prev == null) {
                        _prev = new ArrayList<String>();
                        counterMap.put(ltKey + 1, _prev);
                    }
                    _prev.add(_toSid);
                }
            }
            //dump info
            for (Map.Entry<String, List<String>> entry : current.entrySet()) {
                System.out.println("Sid:" + entry.getKey());
                System.out.println(entry.getValue().toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

 

1
1
分享到:
评论

相关推荐

    分布估计算法(Estimate Distributation Algorithms:EDAs)

    分布估计算法(Estimate Distribution Algorithms,简称EDAs)是一种基于概率模型的全局优化方法,主要应用于解决复杂的连续和离散优化问题。这些算法模仿自然选择和遗传进化过程,通过构建概率模型来模拟种群的遗传...

    webgis卡口摄像机点位圆形分布算法脚本

    卡口摄像机 圆形分布算法 根据一个中心点和半径输入摄像机数进行圆形显示

    分布估计算法讲解及matlab代码

    讲解了分布估计算法的基本原理,附带一个基于matlab实现的例子代码

    分布估计算法.ppt

    分布估计算法PPT,讲解了分布估计算法的基本原理

    最佳适应算法 分布式数据库 数据分布算法 数据集合

    在分布式数据库中,数据分布算法扮演着至关重要的角色,它们决定了数据如何在各个节点之间分布,以优化性能和平衡负载。"最佳适应算法"是一种特定的数据分布策略,尤其在内存管理或磁盘空间分配中常见,但同样可以...

    【智能优化算法】基于广义正态分布优化算法求解单目标优化问题附matlab代码.zip

    本文将深入探讨基于广义正态分布优化算法(Generalized Normal Distribution Optimization Algorithm, GNDOA)在解决单目标优化问题中的应用,并结合MATLAB代码进行详细解析。 广义正态分布优化算法是一种全局优化...

    分布估计算法综述

    分布估计算法(Estimation of Distribution Algorithms, EDA)是进化计算领域的一种新兴随机优化算法。这类算法在近年来迅速崛起,成为了进化计算领域的研究热点之一。与传统的遗传算法相比,EDA 不依赖于交叉和变异...

    备忘录算法

    备忘录算法是一种优化策略,通常用于解决重复子问题,以提高递归计算的效率。在上述代码中,我们看到一个名为"Distance"的函数,它应用了备忘录技术来计算两个字符串`str1`和`str2`的编辑距离。编辑距离,也称为...

    代码 复杂网络中度分布优化算法程序

    代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序代码 复杂网络中度分布优化算法程序...

    最大熵分布算法及其应用.pdf

    最大熵分布算法的核心是利用最大熵原理,这一原理最初由物理学家Edwin Jaynes提出,目的是为了在信息不完全的情况下,基于已知信息来估计概率分布,确保分布的均匀性和最大不确定性。基于种群进化的最大熵分布算法,...

    P39_排序截短平均算法_水声目标识别_

    排序截断平均算法(Order-Truncate-Average Algorithm,简称OTA)是一种针对性的噪声处理方法,尤其适用于存在多个信号线谱(如多个声源或噪声成分)的情况。这个算法的提出是为了更准确地估计“背景噪声”的均值,...

    MSA-master_连续平均法MSA_MSA-master_msa_MSA分配算法_连续平均算法_

    连续平均法(Multiple-Stage Assignment,简称MSA)是一种在交通网络分析中常用于模拟交通流量分配的算法。它是一种迭代方法,通过对交通需求和路网供给之间的动态调整,逐渐接近最优的交通分配状态。MSA在城市规划...

    加权平均 图像融合MATLAB算法 含融合源图像

    MATLAB提供了丰富的图像处理工具箱,包括函数和类,使得实现加权平均融合算法变得相对简单。而“加权平均”和“融合源图像”则强调了融合过程的核心技术和输入数据。 综上所述,理解并实现加权平均图像融合MATLAB...

    ml-cheatsheets:一些机器学习算法备忘单

    毫升备忘单我写的一些机器学习算法备忘单可能有错误

    分布估计算法研究

    分布估计算法是一种优化技术,起源于遗传算法,主要解决了遗传算法中因交叉和变异操作可能导致的建筑块(代表优秀解决方案的部分)破坏问题。分布估计算法的核心思想是通过对优势群体的概率分布进行估计和采样来生成...

    时域同步平均算法

    时域同步平均,A为振动数据,B为方位角数据,新手上路请大家多多指教。

    基于k-平均算法的数据聚类实验

    《基于k-平均算法的数据聚类实验》 在数据分析领域,数据聚类是一种无监督学习方法,用于将数据集中的对象或样本分组到不同的类别,使得同一类内的对象相似度较高,而不同类间的对象相似度较低。k-平均算法(k-...

    智能算法:Exponential Distribution Optimizer (EDO)指数分布优化器

    Exponential Distribution Optimizer (EDO)是一种用于解决优化问题的智能算法,其核心思想是利用指数分布的特性来探索和优化复杂多模态函数。在机器学习和工程领域,优化算法是寻找最优解的关键工具,尤其对于那些...

    PBIL分布估计EDA算法

    matlab程序,PBIL 分布估计 EDA算法

Global site tag (gtag.js) - Google Analytics