`
m635674608
  • 浏览: 5052189 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

JedisCluster 源码分析

 
阅读更多
  BinaryJedisCluster 
  public String set(final byte[] key, final byte[] value) {
    return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value);
      } 
    }.runBinary(key);
  }

   

JedisClusterCommand
 public T runBinary(byte[] key) {
    if (key == null) {
      throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
    }

    return runWithRetries(key, this.redirections, false, false);
  }

 private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {
    if (redirections <= 0) {
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {
        // TODO: Pipeline asking with the original command to make it
        // faster....
        connection = askConnection.get();
        connection.asking();

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
//获取 连接
          connection = connectionHandler.getConnectionFromSlot(
//获取槽 
JedisClusterCRC16.getSlot(key));
        }
      }

      return execute(connection);
    } catch (JedisConnectionException jce) {
      if (tryRandomNode) {
        // maybe all connection is down
        throw jce;
      }

      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      // retry with random connection
      return runWithRetries(key, redirections - 1, true, asking);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache
        // recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      // release current connection before recursion or renewing
      releaseConnection(connection);
      connection = null;

      if (jre instanceof JedisAskDataException) {
        asking = true;
      askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {
      } else {
        throw new JedisClusterException(jre);
      }

      return runWithRetries(key, redirections - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }

 @Override
  public Jedis getConnectionFromSlot(int slot) {
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // It can't guaranteed to get valid connection because of node
      // assignment
      return connectionPool.getResource();
    } else {
      return getConnection();
    }
  }

  

public abstract class JedisClusterConnectionHandler {
  protected final JedisClusterInfoCache cache;

  public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
                                       final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout);
    //通过slot 初始化集群信息
    initializeSlotsCache(nodes, poolConfig);
  }

  abstract Jedis getConnection();

  abstract Jedis getConnectionFromSlot(int slot);

  public Jedis getConnectionFromNode(HostAndPort node) {
    cache.setNodeIfNotExist(node);
    return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();
  }

public class JedisClusterInfoCache {
  private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
  private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();

  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  private final Lock r = rwl.readLock();
  private final Lock w = rwl.writeLock();
  private final GenericObjectPoolConfig poolConfig;

  private int connectionTimeout;
  private int soTimeout;

  private static final int MASTER_NODE_INDEX = 2;

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {
    this(poolConfig, timeout, timeout);
  }

  public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,
      final int connectionTimeout, final int soTimeout) {
    this.poolConfig = poolConfig;
    this.connectionTimeout = connectionTimeout;
    this.soTimeout = soTimeout;
  }

  public void discoverClusterNodesAndSlots(Jedis jedis) {
    w.lock();

    try {
      this.nodes.clear();
      this.slots.clear();

      List<Object> slots = jedis.clusterSlots();

      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= MASTER_NODE_INDEX) {
          continue;
        }

        List<Integer> slotNums = getAssignedSlotArray(slotInfo);

        // hostInfos
        int size = slotInfo.size();
        for (int i = MASTER_NODE_INDEX; i < size; i++) {
          List<Object> hostInfos = (List<Object>) slotInfo.get(i);
          if (hostInfos.size() <= 0) {
            continue;
          }

          HostAndPort targetNode = generateHostAndPort(hostInfos);
          setNodeIfNotExist(targetNode);
          if (i == MASTER_NODE_INDEX) {
            assignSlotsToNode(slotNums, targetNode);
          }
        }
      }
    } finally {
      w.unlock();
    }
  }
  //初始化集群信息 
  public void discoverClusterSlots(Jedis jedis) {
    w.lock();

    try {
      this.slots.clear();
      //通过 slots 命令获取集群信息
      List<Object> slots = jedis.clusterSlots();

      for (Object slotInfoObj : slots) {
        List<Object> slotInfo = (List<Object>) slotInfoObj;

        if (slotInfo.size() <= 2) {
          continue;
        }

        List<Integer> slotNums = getAssignedSlotArray(slotInfo);

        // hostInfos
        List<Object> hostInfos = (List<Object>) slotInfo.get(2);
        if (hostInfos.size() <= 0) {
          continue;
        }

        // at this time, we just use master, discard slave information
        HostAndPort targetNode = generateHostAndPort(hostInfos);

        setNodeIfNotExist(targetNode);
        assignSlotsToNode(slotNums, targetNode);
      }
    } finally {
      w.unlock();
    }
  }

  private HostAndPort generateHostAndPort(List<Object> hostInfos) {
    return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
        ((Long) hostInfos.get(1)).intValue());
  }

  public void setNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      String nodeKey = getNodeKey(node);
      if (nodes.containsKey(nodeKey)) return;

      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
              connectionTimeout, soTimeout, null, 0, null);
      nodes.put(nodeKey, nodePool);
    } finally {
      w.unlock();
    }
  }

  public void assignSlotToNode(int slot, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = nodes.get(getNodeKey(targetNode));

      if (targetPool == null) {
        setNodeIfNotExist(targetNode);
        targetPool = nodes.get(getNodeKey(targetNode));
      }
      slots.put(slot, targetPool);
    } finally {
      w.unlock();
    }
  }

  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    w.lock();
    try {
      JedisPool targetPool = nodes.get(getNodeKey(targetNode));

      if (targetPool == null) {
        setNodeIfNotExist(targetNode);
        targetPool = nodes.get(getNodeKey(targetNode));
      }

      for (Integer slot : targetSlots) {
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }

  public JedisPool getNode(String nodeKey) {
    r.lock();
    try {
      return nodes.get(nodeKey);
    } finally {
      r.unlock();
    }
  }

  public JedisPool getSlotPool(int slot) {
    r.lock();
    try {
      return slots.get(slot);
    } finally {
      r.unlock();
    }
  }

  public Map<String, JedisPool> getNodes() {
    r.lock();
    try {
      return new HashMap<String, JedisPool>(nodes);
    } finally {
      r.unlock();
    }
  }

  public static String getNodeKey(HostAndPort hnp) {
    return hnp.getHost() + ":" + hnp.getPort();
  }

  public static String getNodeKey(Client client) {
    return client.getHost() + ":" + client.getPort();
  }

  public static String getNodeKey(Jedis jedis) {
    return getNodeKey(jedis.getClient());
  }

  private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
        .intValue(); slot++) {
      slotNums.add(slot);
    }
    return slotNums;
  }

}

  
  public Map<String, JedisPool> getNodes() {
    return cache.getNodes();
  }

  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      try {
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }

    for (HostAndPort node : startNodes) {
      cache.setNodeIfNotExist(node);
    }
  }

  public void renewSlotCache() {
    for (JedisPool jp : getShuffledNodesPool()) {
      Jedis jedis = null;
      try {
        jedis = jp.getResource();
        cache.discoverClusterSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }

  public void renewSlotCache(Jedis jedis) {
    try {
      cache.discoverClusterSlots(jedis);
    } catch (JedisConnectionException e) {
      renewSlotCache();
    }
  }

  protected List<JedisPool> getShuffledNodesPool() {
    List<JedisPool> pools = new ArrayList<JedisPool>();
    pools.addAll(cache.getNodes().values());
    Collections.shuffle(pools);
    return pools;
  }
}

   总结:

  1.JedisCluster 会初始化一个 连接获取集群信息通过 solts 命令。(JedisClusterInfoCache 构造方法初始化)

  2.get ,set 的时候。会通过key JedisClusterCRC16.getSlot(key) 定位到solt

  3. 然后根据solt获取 jedis

public Jedis getConnectionFromSlot(int slot) {
    JedisPool connectionPool = cache.getSlotPool(slot);

  4.执行操作

 

  读操作:主库,从库都会读

  写操作:主库写

分享到:
评论

相关推荐

    redis集群环境搭建以及java中jedis客户端集群代码实现

    JedisCluster jedisCluster = new JedisCluster(nodes, poolConfig); // 在集群中执行命令 String result = jedisCluster.get("key"); jedisCluster.set("key", "value"); // 关闭JedisCluster实例 jedisCluster....

    Redis学习实践 - 适合初学者 从0到精通

    redis命令实践 Redis数据结构、原理分析、应用实战 什么是Redis Redis的作用 ...Redis实战及源码分析 分布式锁实战 管道模式 Redis的应用架构 缓存与数据一致性问题 缓存雪崩与缓存穿透 布隆过滤器

    Redis学习实践 - 超实用超详细

    redis基本命令 Redis数据结构、原理分析、应用实战 什么是Redis Redis的作用 ...Redis实战及源码分析 分布式锁实战 管道模式 Redis的应用架构 缓存与数据一致性问题 缓存雪崩与缓存穿透 布隆过滤器

    Redis集群搭建及使用

    JedisCluster jedisCluster = new JedisCluster(new HostAndPort("node1_ip", node1_port), 2000); ``` 3. **编写业务逻辑**: - 使用注入的`StringRedisTemplate`或`JedisCluster`对象进行数据的读写操作。 4....

    jedis-2.1.0-sources.jar

    **Jedis:Redis的Java客户端** Jedis是Java开发者用于操作Redis数据库的开源客户端库。Redis是一个高性能的键值...通过阅读和分析源码,开发者可以更好地定制Jedis以满足特定项目需求,或为社区贡献自己的改进和优化。

    redis单机和集群Java版、缓存及缓存同步

    Redis 是一个基于键值对的高性能内存数据存储系统,由 C 语言编写,常被用于缓存和数据...Java 开发者可以使用 Jedis 或 JedisCluster 进行连接和操作,同时需要关注缓存管理和数据同步策略以确保系统的稳定性和性能。

    2019年 Redis从入门到高可用 分布式实战教程

    10-14 JedisCluster执行源码分析.mp4 10-13 smart客户端实现原理.mp4 10-12 ask重定向.mp4 10-11 moved异常说明和操作.mp4 10-10 客户端路由-目录.mp4 10-1 集群伸缩目录.mp4 1-9 特性5-功能丰富.mp4 1-8 ...

    tomcat-redis-session-manager源码

    通过源码分析,我们可以看到它在设计上的灵活性和实用性,以及在处理高并发场景时的优秀性能。无论是开发者还是运维人员,理解并掌握这套解决方案的内在机制,都能在构建大型Web应用时提供有力支持。

    狂神redis源码笔记.rar

    在“狂神redis源码笔记”中,我们可以期待学习到Redis的源码分析、内部机制以及如何通过Java进行高效操作。源码分析是深入理解Redis工作原理的关键,有助于开发者优化使用方式或进行定制化开发。 Redis的核心知识点...

    spring-data-redis 1.7.6

    深入源码分析,我们可以看到Spring-data-redis如何巧妙地封装了Jedis客户端,提供了抽象的RedisOperations接口,使得操作Redis变得简单。例如,RedisTemplate中的opsForValue()方法返回一个ValueOperations实例,...

    redis学习笔记,redis详解,Java源码.zip

    Redis是一款高性能的键值对数据库,它以内存存储为主,数据持久...通过阅读本学习笔记和源码分析,你可以深入了解Redis的工作原理,掌握如何在实际项目中高效地使用Redis。不断实践和探索,你将成为Redis的熟练驾驭者。

    Redis管理平台Repoll v1.0.zip

    3. 源码分析: Repoll的源代码可以帮助学习者深入了解如何与Redis服务器进行通信,以及如何构建一个完整的管理平台。源代码可能包含以下几个部分: - 客户端库:使用如Jedis(Java)或redis-py(Python)等客户端...

    【redis-7.0.15】

    它以C语言编写,支持网络,是完全开源的,拥有丰富的数据结构,如字符串、哈希、列表、集合、有序集合等,同时还提供了发布/订阅、事务、持久化、主从复制等功能,广泛应用于缓存、消息中间件、实时分析、社交网络等...

    Redis-5.0.0集群配置

    - **Jedis (Java Cluster Client)**: Java应用程序可以通过Jedis客户端库来连接Redis集群。 - **r3c (C++ Cluster Client)**: 对于使用C++开发的应用程序,可以使用r3c作为客户端库。 #### 11. 新增节点 - **添加...

    一个很好的redis例子

    4. 示例代码分析:在TestRedisSerializer-master这个项目中,`TestRedisSerializer`可能是测试类,它演示了如何序列化和反序列化Java对象并存储到Redis。序列化是将Java对象转换为字节流的过程,以便于存储或传输,...

    redis in action, java+python

    《Redis in Action:Java+Python》是一本深入...通过阅读《Redis in Action》并结合提供的Java和Python源码,开发者不仅能掌握Redis的基本操作,还能深入了解其在实际项目中的应用和优化技巧,提升开发效率和系统性能。

    redis的学习手册

    最后,手册可能还会涉及Redis与其他技术的集成,如与Spring框架的整合,使用Jedis或Lettuce等客户端库进行开发,以及在微服务架构中的应用案例。 总之,学习Redis不仅需要理解其核心概念和数据结构,还要掌握持久化...

    Redis-3.pdf

    - **10.3 Java 客户端 Jedis**:使用 Java 编写的 Redis 客户端,支持集群模式。 - **10.4 C++ 客户端 r3c**:适用于 C++ 应用程序的 Redis 集群客户端。 #### 十一、节点管理 - **11.1 添加新主节点**:通过 `...

    有图有真相。windows环境下配置tomat+redis+nginx集群共享session

    - 要实现集群,需配置`server.xml`中的`Cluster`元素,设置`Channel`和`Manager`。 2. **Redis**: - Redis是一个高性能的键值数据库,常被用作缓存服务。 - 在Session共享中,Redis作为Session仓库,存储所有...

Global site tag (gtag.js) - Google Analytics