`
裴小星
  • 浏览: 265315 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
8ccf5db2-0d60-335f-a337-3c30d2feabdb
Java NIO翻译
浏览量:27829
F3e939f0-dc16-3d6e-8c0b-3315c810fb91
PureJS开发过程详解
浏览量:74157
07a6d496-dc19-3c71-92cf-92edb5203cef
MongoDB Java ...
浏览量:62960
社区版块
存档分类
最新评论

MongoDB Java Driver 源码分析(8):com.mongodb.RelicaSetStatus

阅读更多
  RelicaSetStatus 用于读取集群中的服务器节点的信息。

  getMaster 方法和 getASecondary 方法分别可用于以获取 master 节点和 secondary 节点。
  内部类 ReplicaSetStatus.Node 包含了节点的状态信息,内部类 ReplicaSetStatus.Node.Updater 用于实例化一个定时更新节点状态的线程。

  getMaster 方法和 getASecondary 方法分析如下:
    // 获取 master 服务器地址
    ServerAddress getMaster(){
        // 获取 master 服务器节点
        Node n = getMasterNode();
        if ( n == null )
            return null;

        // 返回节点地址
        return n._addr;
    }

    // 获取 mater 服务器节点
    Node getMasterNode(){
        // 检查数据连是否已经关闭接
        _checkClosed();

        // 遍历所有节点,找到 master 服务器节点
        for ( int i=0; i<_all.size(); i++ ){
            Node n = _all.get(i);
            if ( n.master() )
                return n;
        }
        return null;
    }

    // 获取一个最佳的 secondary 服务器地址
    ServerAddress getASecondary(){
        // 检查数据连是否已经关闭接
        _checkClosed();

        Node best = null;
        double badBeforeBest = 0;

        // 随机选取起点
        int start = _random.nextInt( _all.size() );

        double mybad = 0;

        for ( int i=0; i<_all.size(); i++ ){
            Node n = _all.get( ( start + i ) % _all.size() );

            // 不是 secondary 节点,跳过
            if ( ! n.secondary() ){
                mybad++;
                continue;
            }

            // 找到第一个  secondary 节点
            // 设置 best,继续查找
            if ( best == null )
                best = n;
                badBeforeBest = mybad;
                mybad = 0;
                continue;
            }

            // 第 n 个 secondary 节点
            // 与之前找到的节点比较,选用最好的
            // 比较 ping 值
            long diff = best._pingTime - n._pingTime;
            if ( diff > slaveAcceptableLatencyMS ||
                 // 一种保证随机分布的算法
                 ( ( badBeforeBest - mybad ) / ( _all.size() - 1 ) ) > _random.nextDouble() )
                {
                best = n;
                badBeforeBest = mybad;
                mybad = 0;
            }

        }

        if ( best == null )
            return null;

        // 返回 best 的地址
        return best._addr;
    }

包含节点状态信息的内部类 ReplicaSetStatus.Node

  ReplicaSetStatus.Node 包含了节点的信息:
final ServerAddress _addr; // 地址
final Set<String> _names = Collections.synchronizedSet( new HashSet<String>() ); // 节点名称
DBPort _port; // 数据库端口

boolean _ok = false;  // 状态是否正常
long _lastCheck = 0;  // 上次检查时间
long _pingTime = 0;  // ping 延时

boolean _isMaster = false;  // 是否为 master 节点
boolean _isSecondary = false;  // 是否为 secondary 节点

double _priority = 0; // 优先级


  另外,它也提供了更新节点的方法 upadate 和 updateAll:
        // 更新节点状态
        synchronized void update(Set<Node> seenNodes){
            try {

                // 发送 admin 请求,检查状态 
                long start = System.currentTimeMillis();
                CommandResult res = _port.runCommand( _mongo.getDB("admin") , _isMasterCmd );
                _lastCheck = System.currentTimeMillis();
                _pingTime = _lastCheck - start;

                // 状态异常
                if ( res == null ){
                    _ok = false;
                    return;
                }

                // 状态正常
                _ok = true;
 
               // 是 mater 节点
                _isMaster = res.getBoolean( "ismaster" , false );

                // 是 secondary 节点 
                _isSecondary = res.getBoolean( "secondary" , false );

                // 是 primary 节点
                _lastPrimarySignal = res.getString( "primary" );

                // 获取 hosts 信息
                if ( res.containsField( "hosts" ) ){
                    for ( Object x : (List)res.get("hosts") ){
                        String host = x.toString();
                        Node node = _addIfNotHere(host);
                        if (node != null && seenNodes != null)
                            seenNodes.add(node);
                    }
                }

                // 获取 passives 信息
                if ( res.containsField( "passives" ) ){
                    for ( Object x : (List)res.get("passives") ){
                        String host = x.toString();
                        Node node = _addIfNotHere(host);
                        if (node != null && seenNodes != null)
                            seenNodes.add(node);
                    }
                }

                // 获取 maxBsonObjectSize
                if (_isMaster ) {
                    if (res.containsField("maxBsonObjectSize"))
                        maxBsonObjectSize = ((Integer)res.get( "maxBsonObjectSize" )).intValue();
                    else
                        maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
                }

                // 获取 setName
                if (res.containsField("setName")) {
	                String setName = res.get( "setName" ).toString();
	                if ( _setName == null ){
	                    _setName = setName;
	                    _logger = Logger.getLogger( _rootLogger.getName() + "." + setName );
	                }
	                else if ( !_setName.equals( setName ) ){
	                    _logger.log( Level.SEVERE , "mis match set name old: " + _setName + " new: " + setName );
	                    return;
	                }
                }

            }
            catch ( ... ){
                // ...
            }
        }

    // 更新所有节点状态
    synchronized void updateAll(){
        HashSet<Node> seenNodes = new HashSet<Node>();

       // 遍历更新所有节点
       for ( int i=0; i<_all.size(); i++ ){
            Node n = _all.get(i);
            n.update(seenNodes);
        }

        // 移除已经不存在的节点
        if (!seenNodes.isEmpty()) {
            // not empty, means that at least 1 server gave node list
            // remove unused hosts
            Iterator<Node> it = _all.iterator();
            while (it.hasNext()) {
                if (!seenNodes.contains(it.next()))
                    it.remove();
            }
        }
    }

定时更新节点状态的内部类  ReplicaSetStatus.Node.Updater

  ReplicaSetStatus.Node.Updater 继承了 Thread,可以实例化一个定时更新节点状态的线程。
        // 覆写 Thread 类的 run 方法
        public void run(){
            while ( ! _closed ){
                try {
                    // 更新所有节点状态
                    updateAll();

                    // 如果当前时间大于 _nextResolveTime
                    // 则更新所有节点并设置 _nextResolveTime
                    long now = System.currentTimeMillis();
                    if (inetAddrCacheMS > 0 && _nextResolveTime < now) {
                        _nextResolveTime = now + inetAddrCacheMS;
                        for (Node node : _all) {
                            node.updateAddr();
                        }
                    }

                    // 检查 master ,以避免更新带来的不同步
                    _mongo.getConnector().checkMaster(true, false);
                }
                catch ( Exception e ){
                    _logger.log( Level.WARNING , "couldn't do update pass" , e );
                }

                // sleep一段时间,等待下次更新
                try {
                    Thread.sleep( updaterIntervalMS );
                }
                catch ( InterruptedException ie ){
                }

            }
        }
1
1
分享到:
评论

相关推荐

    数据库驱动常见错误"java.lang.ClassNotFoundException:解决了jsp连接Error establishing socket.

    "java.lang.ClassNotFoundException: com.microsoft.jdbc.sqlserver.SQLServerDriver" 解决方案 [Microsoft][SQLServer 2000 Driver for JDBC]Error establishing socket. 解决了jsp连接 sql server 2000的问题

    MongoDB Java Driver 源码分析(1):Package 概述

    本篇文章将聚焦于MongoDB Java Driver的源码分析,首先从Package概述的角度进行深入探讨。 MongoDB Java Driver的源码主要分为以下几个核心包: 1. **com.mongodb**: 这是最顶层的包,包含了驱动的核心组件。`...

    mongodb c#驱动最新驱动mongodb.driver.dll 版本2.12.0-beta1

    标题提到的是 MongoDB 的 C# 驱动的最新版本——mongodb.driver.dll,具体为 2.12.0-beta1 版本。 MongoDB.Driver.dll 是 C# 驱动的核心组件,它包含了连接、查询、更新和操作 MongoDB 数据库所需的所有功能。这个...

    mongodb-java-driver-4.4.0.jar

    mongodb-java-driver-4.4.0.jar

    mongodb-driver-sync-4.2.3-API文档-中英对照版.zip

    标签:mongodb、driver、sync、中英对照文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,...

    mongodb-driver-core-4.2.3-API文档-中文版.zip

    标签:mongodb、driver、core、中文文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请...

    mongodb-driver-sync-4.2.3-API文档-中文版.zip

    标签:mongodb、driver、sync、中文文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请...

    mongodb-driver-core-3.5.0.jar

    `mongodb-driver-3.5.0.jar`是完整版的MongoDB Java驱动,它扩展了`mongodb-driver-core`,提供了更高级别的操作接口,如`MongoClient`和`MongoDatabase`,方便开发者进行数据库操作。这个驱动程序使开发者能够方便...

    MongoDB Java Driver 简单操作

    ### MongoDB Java Driver 简单操作详解 #### 一、简介 MongoDB 是一款非常流行的文档型数据库系统,因其灵活性和高性能而被广泛应用于多种场景之中。为了方便开发者使用 Java 进行开发,MongoDB 提供了官方的 Java ...

    mongo-java-driver-3.12.7.jar

    mongo-java-driver-3.12.7 最新版本,java连接MongoDB最新驱动,有需要的可以自行下载

    Java连接mongoDB需要的jar包(3.9.1)

    这里提到的"Java连接mongoDB需要的jar包(3.9.1)"是指Java开发者用于连接MongoDB数据库的一组关键库文件,包括`bson-3.9.1.jar`、`mongodb-driver-3.9.1.jar`和`mongodb-driver-core-3.9.1.jar`。这些JAR文件是...

    mongo-java-driver-3.2.2.jar.zip

    MongoDB是一个流行的开源、文档型数据库系统,而`mongo-java-driver`是官方提供的Java API,允许开发者在Java应用程序中执行各种数据库操作,如读取、写入、查询等。 在本例中,我们讨论的是`mongo-java-driver`的...

    MongoDB_3.8.2驱动jar包及其同版本依赖包bson和mongodb-driver-core

    java和mongodb连接,需要mongodb-driver,您还必须下载其依赖项: bson和 mongodb-driver-core》》3个包: mongodb-driver-3.8.2.jar; bson-3.8.2.jar; mongodb-driver-core-3.8.2.jar

    mongodb-driver-core-4.2.3-API文档-中英对照版.zip

    标签:mongodb、driver、core、中英对照文档、jar包、java; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,...

    MongoDB c#驱动 dll

    MongoDB.Driver.dll构建在MongoDB.Driver.Core.dll之上,提供了一套更高级别的、易于使用的接口。 在C#中使用这些DLL,开发者首先需要安装MongoDB.CSharpDriver NuGet包,这将自动引入所有必需的依赖。然后,可以...

    【解决方案】pyspark 初次连接mongo 时报错Class not found exception:com.mongodb.spark.sql.DefaultSource

     df = spark.read.format(com.mongodb.spark.sql.DefaultSource).load()  File /home/cisco/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py, line 165, in load  

    mongodb-java-driver-3.5.0.jar最新驱动包

    亲测可用,解压包含三个jar包,引用时sources和doc包根据需要添加。 mongo-java-driver-3.5.0.jar; mongo-java-driver-3.5.0-javadoc.jar; mongo-java-driver-3.5.0-sources.jar;

    mongodb-async-driver-2.0.1 jar包

    MongoDB异步驱动程序(mongodb-async-driver)是为Java开发者设计的一个库,它允许应用程序以非阻塞的方式与MongoDB服务器进行通信,提高了处理大量并发请求的能力。 在"mongodb-async-driver-2.0.1.jar"这个特定...

    mongoDB java driver api

    8. **聚合框架**:MongoDB的聚合框架允许你对数据进行处理和分析,类似于SQL的GROUP BY和JOIN操作。Java驱动提供了`aggregate()`方法来执行聚合操作,包括阶段(如 `$match`, `$group`, `$sort`, `$project` 等)。 ...

    mongodb-java-driver源码依赖库

    10. **`com.mongodb.async.*`**:对于异步编程,MongoDB Java Driver提供了异步版本的客户端和集合操作,以配合Java 8的 CompletableFuture 或其他异步库。 了解这些核心概念后,开发者可以更有效地使用MongoDB ...

Global site tag (gtag.js) - Google Analytics