0 0

关于一个简单的负载均衡实现的问题(多线程相关)5

最近做一个Demo,里面涉及到一个客户端负载均衡的问题。基本问题如下:
1. A 服务依赖 B,C等服务(均为Rest风格服务,JSON作为数据协议)。
2. B,C等服务分别有N个节点,忽略权重(所有节点权重一样)。
3. A可以通过Zookeeper拿到每个依赖服务的节点列表。比如,A - [10.10.10.10, 10.10.10.11, 10.10.10.12], B - [10.10.10.20, 10.10.10.21, 10.10.10.22]
4. 现在我要做的是通过简单的统一包装类,封装请求的执行过程。负载均衡使用简单Polling模式。

问题:
Endpoints的next 方法是否需要加锁?为什么
目前的实现有什么问题?

下面是我的实现代码(基于JDK1.8):
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author: nealmi
 * Date: 4/9/14
 * Time: 10:24 PM
 */
public class PollingEndpointManagerImpl implements EndpointManager {
    private static final Log logger = LogFactory.getLog(PollingEndpointManagerImpl.class);
    private final Map<String, Endpoints> endpointsMapping = new ConcurrentHashMap<>(3, .75f, 16);
    private final RestTemplate restTemplate;
    @Autowired(required = false)
    private int maxRetries = 3;

    public PollingEndpointManagerImpl() {
        this.restTemplate = new RestTemplate();
    }

    public PollingEndpointManagerImpl(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    public PollingEndpointManagerImpl(RestTemplate restTemplate, int maxRetries) {
        this.restTemplate = restTemplate;
        this.maxRetries = maxRetries;
    }

    public int getMaxRetries() {
        return maxRetries;
    }

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    @Override
    public void refresh(String key, List<String> endpoints) {
        endpointsMapping.put(key, Endpoints.create(key, endpoints));
    }

    private String next(Endpoints endpoints) {
        return endpoints.next();
    }

    @Override
    public String next(String key) {
        if (!endpointsMapping.containsKey(key)) {
            throw new RuntimeException("No endpoints for key:" + key);
        }
        return next(endpointsMapping.get(key));
    }

    @Override
    public <T> T get(String key, String uri, Class<T> t) {
        return doIt(() -> {
            String url = getFullUrl(key, uri);
            if (logger.isInfoEnabled()) {
                logger.info("Retrieving data from " + url);
            }
            ResponseEntity<T> responseEntity = restTemplate.getForEntity(url, t);
            return responseEntity.getBody();
        });
    }

    @Override
    public <T> T get(String key, String uri, ParameterizedTypeReference<T> t) {
        return doIt(() -> {
            String url = getFullUrl(key, uri);
            if (logger.isInfoEnabled()) {
                logger.info("Retrieving data from " + url);
            }
            ResponseEntity<T> responseEntity = restTemplate.exchange(url, HttpMethod.GET, null, t);
            return responseEntity.getBody();
        });
    }

    private <T> T doIt(RequestExecutor<T> requestExecutor) {
        int counter = 0;
        while (true) {
            try {
                return requestExecutor.execute();
            } catch (Exception e) {
                if (counter < maxRetries) {
                    counter++;
                    if (logger.isInfoEnabled()) {
                        logger.info("Request failure, retrying... counter= " + counter + ", error:" + e.getMessage());
                    }
                } else {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    private static interface RequestExecutor<T> {
        T execute();
    }

    private String getFullUrl(String key, String uri) {
        return "http://" + next(key) + uri;
    }

    private static class Endpoints {
        private String key;
        private int counter = 0;
        private List<String> endpoints = Lists.newArrayList();
        private ReentrantLock lock = new ReentrantLock();

        private Endpoints() {
        }

        private Endpoints(String key, List<String> endpoints) {
            this.key = key;
            this.endpoints = endpoints;
        }

        private static Endpoints create(String key, List<String> endpoints) {
            return new Endpoints(key, endpoints);
        }

        public String getKey() {
            return key;
        }

        public List<String> getEndpoints() {
            return endpoints;
        }

        public String next() {
            try {
                lock.tryLock(10, TimeUnit.SECONDS);
                if (counter >= endpoints.size()) {
                    counter = (0);
                }
                String endpoint = endpoints.get(counter);
                counter++;
                return endpoint;
            } catch (Exception e) {
                logger.error("Error when getting next endpoint. counter=" + counter + "  endpoints.size=" + endpoints.size(), e);
            } finally {
                lock.unlock();
            }

            return null;
        }
    }
}


2014年4月21日 16:45

1个答案 按时间排序 按投票排序

0 0

Endpoints的next需要加锁,它负责轮训,如果多个线程同一时间执行到
if (counter >= endpoints.size()) { 
    counter = (0); 

有可能连续二次都是从endpoints.get(0)的服务执行,也就没法保证轮询的正确性,或者count++也是多线程同时读的0,有个可能线程1, 0++,现场 0++。

你用到了最新的语法,还没具体看新的语法,不会。不过看程序大致是一个前端的一个
负载均衡,

2014年4月22日 08:02

相关推荐

    java 简易负载均衡例子

    本示例是一个基于Java的简易负载均衡实现,利用了Java的Selector机制。Selector在Java NIO(非阻塞I/O)中扮演着核心角色,它允许单线程监控多个通道的事件,如连接请求、数据读写等。 首先,我们来看`lbs.json`...

    基于OpenMP多线程动态负载均衡技术研究.pdf

    综合以上内容,可以看出基于OpenMP的多线程动态负载均衡技术研究是一个涉及多个方面的复杂领域,包括并行编程模型、性能优化、编译时成本模型、调度策略以及在多核和分布式系统中的实际应用等。这些研究对于优化并行...

    负载均衡模拟

    在这个名为“负载均衡v1.1”的压缩包中,可能包含了源代码文件、配置文件以及相关的文档,展示了如何在Linux环境下实现多线程负载均衡器。通过分析这些文件,我们可以深入理解如何利用C语言或C++来设计和实现这样一...

    负载均衡系统

    在这个负载均衡系统中,多线程技术用于并发地处理来自客户端的请求,提高系统的并行处理能力。线程间需要协调通信,避免数据冲突,这就需要用到线程同步机制。 3. **条件变量与临界区**:条件变量是一种线程同步...

    c#多线程socket开发(一个服务器对多个客户端)

    C#语言开发多线程Socket服务器端程序,实现一个服务器同时与多个客户端连接对话。这里,我们将详细讲解如何使用C#语言开发多线程Socket服务器端程序,实现一个服务器同时与多个客户端连接对话。 多线程Socket服务器...

    负载均衡调度器程序

    总之,这个负载均衡调度器程序通过多线程实现了高效的数据分发,并通过配置文件灵活调整参数。通过对配置文件的解析,程序能够动态适应不同的服务器环境和负载需求。这种设计模式在大型分布式系统中具有广泛的应用...

    多线程编程实现矩阵乘法

    在计算机科学中,多线程编程是一种技术,...但实现过程中需要注意线程同步、通信和负载均衡等问题,以确保正确性和性能。通过持续的优化,我们可以利用现代多核处理器的全部潜力,使得复杂计算任务在更短的时间内完成。

    多线程实现的五种不同排序

    在本资源中,"多线程实现的五种不同排序"提供了VC++(Visual C++)源码,这对于学习多线程编程以及数据结构排序的初学者来说是一个很好的实践材料。下面将详细介绍这五个不同的排序算法及其在多线程环境下的实现。 ...

    用asio自封装负载均衡网络库、定时器、线程池

    2. **负载均衡**:在多线程环境中,负载均衡是确保系统资源有效利用的关键。可以实现一个策略接口,如轮询、最少连接数等,然后在接收到新的连接请求时,根据策略选择合适的线程或工作单元来处理。 3. **线程池**:...

    HAProxy+Nginx实现负载均衡

    以下是一个简单的部署案例: - **环境配置**: - **HAProxy服务器**:CentOS 6.5 x86_64,IP地址为192.168.25.5,使用HAProxy版本1.4.24。 - **Nginx服务器1**:CentOS 6.5 x86_64,IP地址为192.168.25.3,使用...

    Nginx负载均衡案例

    在这个案例中,我们将探讨如何利用Nginx配置实现简单的负载均衡策略,并通过多线程测试来验证其性能。 首先,Nginx的负载均衡功能主要通过其`upstream`模块实现。在配置文件中,我们需要定义一个`upstream`块,列出...

    Memcached负载均衡Jar包

    Xmemcached是一个功能强大的Java客户端,提供了线程安全、高性能的连接池管理,以及基于一致性哈希的负载均衡算法。一致性哈希是一种分布式哈希表算法,它的主要优点在于当新的服务器加入或离开集群时,只有较少的...

    IIS负载均衡详解

    Application Request Routing是IIS的一项功能,它作为一个反向代理,可以接收HTTP请求并智能地将其分发到后端服务器群,以实现负载均衡。ARR不仅能够提供故障转移和负载分散,还能帮助实现服务器间的会话持久化,...

    完美解决多应用服务器负载均衡环境下spring quartz同一定时任务重复执行问题

    在多应用服务器负载均衡环境下,Spring Quartz定时任务的重复执行问题是一个常见的挑战。Spring Quartz是一个强大的、开源的作业调度框架,允许开发者定义和执行复杂的定时任务。然而,当多个服务器实例并行运行时,...

    libevent 多线程 HTTP post服务器

    6. **服务器架构设计**:设计一个高效的多线程HTTP POST服务器要考虑的因素包括请求调度、负载均衡、错误处理、性能优化等。 7. **测试与调试**:"libeventTest"可能包含的测试代码用于验证服务器的正确性和性能,...

    向量在同一个块中用多个线程执行

    多线程编程的关键在于合理地分配任务,确保负载均衡,避免出现某些线程过早完成任务而其他线程还在忙碌的情况,这被称为线程阻塞。同时,还需要注意线程间的通信和同步问题,防止数据竞争和死锁的发生。在单线程块中...

    java简单分布式架构,多个数据源,线程池多线程访问

    本项目围绕“Java简单分布式架构,多个数据源,线程池多线程访问”这一主题展开,旨在通过利用Java技术栈实现一个高效的分布式系统。 首先,我们关注的是“分布式”这一概念。分布式系统是由多台计算机通过网络连接...

    低开销的分布式数据库负载均衡技术.pdf

    一个好的负载均衡策略能够确保系统的各个组件都能得到高效利用,避免过载或空闲的情况发生,尤其是在多个服务器之间分发用户请求的场景中至关重要。 文章进一步阐述了提出的两层负载均衡策略。第一层是基于数据访问...

Global site tag (gtag.js) - Google Analytics