`

"相对平均"分布算法

阅读更多

  有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.

   其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

    1) "相对平均"怎么设计

    2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..

    3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

 

例如:

    sid1: w1 w2 w3 w4

    sid2: w5

    sid3:w6

期望迁移之后:

    sid1:w1 w2

    sid2:w5 w3

    sid3:w4 w6

 

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

    sid1:w6 w5

    sid2:w2 w3

    sid3:w1 w4

   

经过提取,"相对平均"的设计代码如下,仅作备忘:

Java代码  收藏代码
  1. package com.test.demo.zookeeper;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.InputStreamReader;  
  5. import java.util.*;  
  6.   
  7. public class WorkersBalanceMain {  
  8.     private List<String> servers = new ArrayList<String>();  
  9.     private Map<String, List<String>> current = new HashMap<String, List<String>>();  
  10.     private Set<String> workers = new HashSet<String>();  
  11.   
  12.     public static void main(String[] args) {  
  13.         BufferedReader br = new BufferedReader(new InputStreamReader(System.in));  
  14.         String line;  
  15.         Set<String> servers = new HashSet<String>();  
  16.         WorkersBalanceMain balancer = new WorkersBalanceMain();  
  17.         try {  
  18.             while ((line = br.readLine()) != null) {  
  19.                 if (line.startsWith("addWorker")) {  
  20.                     balancer.addWorkers(line);  
  21.                 } else if (line.startsWith("addServer")) {  
  22.                     balancer.addServers(line);  
  23.                 } else {  
  24.                     System.out.println("???");  
  25.                     continue;  
  26.                 }  
  27.                 balancer.rebalance();  
  28.             }  
  29.         } catch (Exception e) {  
  30.             e.printStackTrace();  
  31.         }  
  32.         System.out.println("--END---");  
  33.     }  
  34.   
  35.     public void addServers(String source) {  
  36.         int index = source.indexOf(" ");  
  37.         if (index == -1) {  
  38.             return;  
  39.         }  
  40.         String[] values = source.substring(index + 1).split(" ");  
  41.         if (values == null || values.length == 0) {  
  42.             return;  
  43.         }  
  44.         for (String server : values) {  
  45.             servers.add(server);  
  46.             if(current.get(server) == null){  
  47.                 current.put(server,new ArrayList<String>());  
  48.             }  
  49.         }  
  50.     }  
  51.   
  52.     public void addWorkers(String source) {  
  53.         int index = source.indexOf(" ");  
  54.         if (index == -1) {  
  55.             return;  
  56.         }  
  57.         String[] values = source.substring(index + 1).split(" ");  
  58.         if (values == null || values.length == 0) {  
  59.             return;  
  60.         }  
  61.         //当有新的worker提交时,将咱有一台机器接管  
  62.         String sid = servers.get(0);  
  63.         List<String> sw = current.get(sid);  
  64.         if(sw == null){  
  65.             current.put(sid,new ArrayList<String>());  
  66.         }  
  67.         for (String worker : values) {  
  68.             workers.add(worker);  
  69.             sw.add(worker);  
  70.         }  
  71.   
  72.     }  
  73.   
  74.     public void rebalance() {  
  75.         try {  
  76.             if (workers.isEmpty()) {  
  77.                 return;  
  78.             }  
  79.             for (String sid : servers) {  
  80.                 if (current.get(sid) == null) {  
  81.                     current.put(sid, new ArrayList<String>());  
  82.                 }  
  83.             }  
  84.             //根据每个sid上的worker个数,整理成一个排序的map  
  85.             TreeMap<Integer, List<String>> counterMap = new TreeMap<Integer, List<String>>();  
  86.             for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
  87.                 int total = entry.getValue().size();  
  88.                 List<String> sl = counterMap.get(total);  
  89.                 if (sl == null) {  
  90.                     sl = new ArrayList<String>();  
  91.                     counterMap.put(total, sl);  
  92.                 }  
  93.                 sl.add(entry.getKey());//sid  
  94.             }  
  95.             int totalWorkers = workers.size();  
  96.             int totalServers = current.keySet().size();  
  97.             int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数  
  98.             while (true) {  
  99.                 Map.Entry<Integer, List<String>> gt = counterMap.higherEntry(avg);  //大于平均数的列表, >= avg + 1  
  100.                 Map.Entry<Integer, List<String>> lt = counterMap.lowerEntry(avg); //与平均数差值为2的 <= arg  - 1  
  101.                 //允许任务个数与avg上线浮动1各个,不是绝对的平均  
  102.   
  103.                 if (gt == null || lt == null) {  
  104.                     break;  
  105.                 }  
  106.                 Integer gtKey = gt.getKey();  
  107.                 Integer ltKey = lt.getKey();  
  108.                 if (gt.getKey() - lt.getKey() < 2) {  
  109.                     break;  
  110.                 }  
  111.                 if (gt.getValue().size() == 0) {  
  112.                     counterMap.remove(gt.getKey());  
  113.                 }  
  114.                 if (lt.getValue().size() == 0) {  
  115.                     counterMap.remove(lt.getKey());  
  116.                 }  
  117.                 Iterator<String> it = gt.getValue().iterator(); //sid列表  
  118.                 while (it.hasNext()) {  
  119.                     String _fromSid = it.next();  
  120.                     List<String> _currentWorkers = current.get(_fromSid);  
  121.                     if (_currentWorkers == null || _currentWorkers.isEmpty()) {  
  122.                         it.remove();  
  123.                         current.remove(_fromSid);  
  124.                         continue;  
  125.                     }  
  126.                     List<String> _ltServers = lt.getValue();  
  127.                     if (_ltServers.isEmpty()) {  
  128.                         counterMap.remove(ltKey);  
  129.                         break;  
  130.                     }  
  131.                     //取出需要交换出去的任务id  
  132.                     int _currentWorkersSize = _currentWorkers.size();  
  133.                     String _wid = _currentWorkers.get(_currentWorkersSize - 1);  
  134.                     String _toSid = _ltServers.get(0);  
  135.                     //从_fromSid的worker列表中移除低workerId  
  136.                     //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部  
  137.                     //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"  
  138.                     _currentWorkers.remove(_currentWorkersSize - 1);  
  139.                     it.remove();  
  140.                     _ltServers.remove(0);  
  141.                     //将此workerId添加到_toSid的worker列表中  
  142.                     List<String> _ltWorkers = current.get(_toSid);  
  143.                     if (_ltWorkers == null) {  
  144.                         _ltWorkers = new ArrayList<String>();  
  145.                         current.put(_toSid, _ltWorkers);  
  146.                     }  
  147.                     _ltWorkers.add(_wid);  
  148.                     //将gt的key降低一个数字  
  149.                     List<String> _next = counterMap.get(gtKey - 1);  
  150.                     if (_next == null) {  
  151.                         _next = new ArrayList<String>();  
  152.                         counterMap.put(gtKey - 1, _next);  
  153.                     }  
  154.                     _next.add(_fromSid);  
  155.                     //将lt的key提升一个数字  
  156.                     List<String> _prev = counterMap.get(ltKey + 1);  
  157.                     //从lt的countMap中移除,因为它将被放置在key + 1的新位置  
  158.                     Iterator<String> _ltIt = _ltServers.iterator();  
  159.                     while (_ltIt.hasNext()) {  
  160.                         if (_ltIt.next().equalsIgnoreCase(_toSid)) {  
  161.                             _ltIt.remove();  
  162.                             break;  
  163.                         }  
  164.                     }  
  165.                     if (_prev == null) {  
  166.                         _prev = new ArrayList<String>();  
  167.                         counterMap.put(ltKey + 1, _prev);  
  168.                     }  
  169.                     _prev.add(_toSid);  
  170.                 }  
  171.             }  
  172.             //dump info  
  173.             for (Map.Entry<String, List<String>> entry : current.entrySet()) {  
  174.                 System.out.println("Sid:" + entry.getKey());  
  175.                 System.out.println(entry.getValue().toString());  
  176.             }  
  177.         } catch (Exception e) {  
  178.             e.printStackTrace();  
  179.         }  
  180.   
  181.     }  
  182.   
  183. }  

转自:http://shift-alt-ctrl.iteye.com/blog/1961598

分享到:
评论

相关推荐

    操作系统磁盘调度算法实验报告

    如果对磁道的访问请求是均匀分布的,当磁头到达磁盘的一端,并反向运动时落在磁头之后的访问请求相对较少。这是由于磁头的移动规律是圆形的,所以当磁头到达磁盘的一端时,磁头将反向运动,而不是继续向前移动。这时...

    字符串匹配算法之Horspool算法

    当模式串较短或模式串中字符分布较为均匀时,其搜索效率可能不如其他算法如KMP算法或Rabin-Karp算法。此外,对于包含大量随机字符的文本,Horspool算法的平均搜索速度可能接近于O(n),但在最坏情况下仍可能退化为O...

    数字加网图像处理调频加网抖动算法误差扩散算法C++实现

    Jarvis等人提出,也称为Jarvis-Judice-Ninke(JJN)算法,误差扩散权重分配更复杂,通常会提高图像质量,但计算量相对较大。 5. **Floyd-Steinberg算法**:由Alfred M. Floyd和Steven S. Steinberg开发,是最著名...

    算法分析期末复习

    - **稳定性**:插入排序是稳定的排序算法,即相同元素的相对位置不会改变。 - **时间复杂度**:平均情况下的时间复杂度为 O(n^2),其中 n 是待排序的元素数量。 - **快速排序** - **定义**:快速排序是一种高效的...

    图像MASK和Wallis匀光算法

    MATLAB作为一款强大的数值计算和数据可视化软件,拥有丰富的图像处理函数库,使得编写和调试这类算法变得相对简单。通过编写相应的MATLAB代码,可以方便地对图像进行预处理,实现光照的均衡化,从而提高后续分析的...

    AD采集滤波算法程序代码

    滑动平均滤波算法是在一个固定窗口内,对连续的数据进行平均,窗口随着数据的更新而移动。适用于需要快速响应的实时系统。 **优点**:计算简便,响应速度较快。 **缺点**:窗口大小的选择直接影响滤波效果,可能...

    NLM算法详解

    NLM算法的主要原理是将图像中的每个像素灰度值与周边区域的灰度分布进行整体对比,根据灰度分布的相似性来决定相应的权重,以此来加权平均周边像素的灰度值,达到去噪的目的。 NLM算法在处理图像时,首先确定一个...

    关于DILOF算法论文阅读详解1

    - **距离基**:广泛使用,适用于数据分布未知的情况,计算点之间的相对距离。 7. **应用场景**: DILOF特别适用于数据流环境,如物联网传感器数据、实时网络流量分析、金融交易监测等,这些场景中内存有限且数据...

    vwap算法交易详解

    VWAP(Volume Weighted Average Price)算法是一种经典的算法交易策略,它旨在通过分散执行大额订单来最小化市场冲击成本,同时尽可能地接近市场成交量加权平均价格进行交易。这种策略在金融工程领域尤其受到重视,...

    两个K均值算法+K中心点算法

    在提供的文件中,"K_CenterPoint_PAM.cpp"很可能是实现PAM算法的代码,而"K_means.cpp"和"K_AVG.cpp"可能分别实现了标准的K均值算法和某种变体,比如使用均值(K-Means++)或加权平均(K-AVG)来确定簇中心。...

    算法设计与分析2014.pdf

    4. **概率分析与随机化算法:** 概率分析和随机化算法考虑输入数据的概率分布或算法的随机选择,使得算法在平均意义下表现更好。 #### 考试和复习 由于描述中提到的文件是中科大研究生课程《算法设计与分析》的...

    极小化误差平方和算法聚类

    - **假设簇为凸形**: SSE算法假设数据分布是凸形的,对于非凸或不规则形状的簇可能无法得到理想的聚类结果。 - **固定簇数**: K值需要预先设定,如果实际数据的簇数量未知,可能需要多次尝试不同K值。 为了克服这些...

    鲁棒性时间序列算法Time-Series Processing using Google Earth Engine

    在计算加权平均时,该算法采用了一种自适应权重的策略,以便更好地适应不同的数据分布情况。 具体来说,鲁棒时间序列平滑算法的实现过程如下: 1. 首先,需要对数据进行预处理,包括选择合适的窗口大小和计算每个...

    内部排序算法分析

    - **适应性**:某些算法在特定的数据分布下表现更好,比如快速排序在近乎有序的数据集上效率降低,而插入排序则相对稳定。 6. **实际应用**: 内部排序算法广泛应用于数据库管理系统、数据分析、机器学习等领域,...

    LBG算法K-means

    另外,算法的复杂性相对较高,对大规模数据处理时可能效率较低。 总的来说,LBG算法和K-means都是聚类和矢量量化的重要工具,它们各自在特定场景下有其独特优势。理解并掌握这些算法,对于解决实际问题和优化数据...

    MIT 算法分析讲义

    4. **稳定性分析**:分析上述算法是否稳定,即相同值的相对顺序是否保持不变。 #### 九、第八章:中位数与序统计 **标题**:“Medians and Order Statistics” **知识点**: 1. **选择算法**:介绍如何有效地找到一...

    java 算法分析

    - **平均情况**:指算法在随机分布的输入情况下的平均表现。 通过这三种情况的分析,可以更全面地了解算法的性能特点。 ##### 2.3 渐近分析 渐近分析关注的是随着问题规模增大,算法效率的变化趋势。它主要关注的...

    5种摄像头算法,手机摄像头算法,C/C++

    双峰法是一种基于直方图的图像分割算法,其原理是寻找图像亮度直方图中的两个峰值,这两个峰值分别代表背景和前景的像素分布。通过找到这两个峰值并设定阈值,可以有效地将图像分为两部分。在智能车竞赛中,双峰法...

    非局部平均算法用于图像去噪.rar

    非局部平均算法(Non-Local Means,简称NL-means)是一种高效的图像去噪技术,尤其在保留图像细节方面表现出色。这种算法的核心思想是利用图像中的自相似性,即图像的某个像素点不仅受到其周围邻近像素的影响,还受...

Global site tag (gtag.js) - Google Analytics