
Chapter 7: Xiaozhu's Notes on Hadoop Source Code Analysis - HDFS

Section 5: Datanode Analysis

5.1 The Datanode Startup Process

(1) Starting the DataNode from the shell scripts

start-dfs.sh invokes hadoop-daemons.sh, which starts a DataNode on each host listed in the slaves file:

start-dfs.sh  
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt 

 

(2) Startup analysis of the main() entry point

The main thread calls createDataNode() to create the DataNode, then blocks in join(), letting the DataNode's service loop run until the DataNode thread finishes.

    // the main thread blocks in join() so the DataNode's service loop keeps running
     public static void secureMain(String [] args, SecureResources resources) {  
       try {  
         LOG.info("start up datanode...");  
         if(null!=args){   
           for(String arg:args) {  
             LOG.info("arg:"+arg);  
           }  
         }  
         StringUtils.startupShutdownMessage(DataNode.class, args, LOG);  
         DataNode datanode = createDataNode(args, null, resources);  
         if (datanode != null)  
           datanode.join();  
       } catch (Throwable e) {  
         LOG.error(StringUtils.stringifyException(e));  
         System.exit(-1);  
       } finally {  
         // We need to add System.exit here because either shutdown was called or  
         // some disk related conditions like volumes tolerated or volumes required  
         // condition was not met. Also, In secure mode, control will go to Jsvc and  
         // the process hangs without System.exit.  
         LOG.info("Exiting Datanode");  
         System.exit(0);  
       }  
     }  
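
For completeness: the non-secure entry point is just a thin wrapper. In the 1.x code line, DataNode.main() simply delegates to secureMain() with no privileged resources (a minimal sketch):

    public static void main(String args[]) {
      // non-secure startup: no privileged resources are injected
      secureMain(args, null);
    }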

 

(3) Creating the DataNode instance and starting its thread

  createDataNode() calls instantiateDataNode() to initialize the DataNode, then runDatanodeDaemon() to register it with the NameNode and start its thread.

    /** Instantiate & Start a single datanode daemon and wait for it to finish. 
     *  If this thread is specifically interrupted, it will stop waiting. 
     *  LimitedPrivate for creating secure datanodes 
     */  
    public static DataNode createDataNode(String args[],  
              Configuration conf, SecureResources resources) throws IOException {  
       
       // initialize the DataNode
      DataNode dn = instantiateDataNode(args, conf, resources);  
      
      // register the DataNode with the NameNode, create its thread,
      // mark it as a daemon, and start it
      runDatanodeDaemon(dn);
      return dn;
    }  

 

(4) Instantiating the DataNode (instantiateDataNode)

  instantiateDataNode() parses the startup arguments, then:
  If the rack script ${dfs.network.script} is configured, it exits: rack resolution is handled by the NameNode, not the DataNode.
  It reads the DataNode storage directories from ${dfs.data.dir}.
  It calls makeInstance() to create the instance; makeInstance() validates the data directories and constructs the DataNode object.
  The DataNode constructor calls startDataNode(), which performs the detailed initialization from the configuration.
  If startup fails, shutdown() is called to close the DataNode.
  A condensed sketch of these two methods follows below.
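
What follows is a simplified sketch of instantiateDataNode()/makeInstance(), condensed from the 1.x source (logging and some error handling trimmed; treat it as an approximation, not the verbatim code):

    static DataNode instantiateDataNode(String args[], Configuration conf,
        SecureResources resources) throws IOException {
      if (conf == null)
        conf = new Configuration();
      if (!parseArguments(args, conf)) {
        printUsage();
        return null;
      }
      // rack resolution now belongs to the NameNode; refuse to start if the
      // deprecated rack script is configured
      if (conf.get("dfs.network.script") != null) {
        LOG.error("This configuration for rack identification is not supported" +
            " anymore. RackID resolution is handled by the NameNode.");
        System.exit(-1);
      }
      // the storage directories come from ${dfs.data.dir}
      String[] dataDirs = conf.getStrings(DATA_DIR_KEY);
      return makeInstance(dataDirs, conf, resources);
    }

    /** Keep only the usable data directories, then construct the DataNode. */
    public static DataNode makeInstance(String[] dataDirs, Configuration conf,
        SecureResources resources) throws IOException {
      LocalFileSystem localFS = FileSystem.getLocal(conf);
      ArrayList<File> dirs = new ArrayList<File>();
      FsPermission dataDirPermission = new FsPermission(
          conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION));
      for (String dir : dataDirs) {
        try {
          // verify the directory exists (or can be created) and is writable
          DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
          dirs.add(new File(dir));
        } catch (IOException e) {
          LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
        }
      }
      if (dirs.size() > 0)
        return new DataNode(conf, dirs, resources);
      LOG.error("All directories in dfs.data.dir are invalid.");
      return null;
    }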

 

    /** 
      * Start a Datanode with specified server sockets for secure environments 
      * where they are run with privileged ports and injected from a higher 
      * level of capability 
      */  
     DataNode(final Configuration conf,  
              final AbstractList<File> dataDirs, SecureResources resources) throws IOException {  
       super(conf);  
       SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,   
           DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);  
      
       datanodeObject = this;  
       supportAppends = conf.getBoolean("dfs.support.append", false);  
       this.userWithLocalPathAccess = conf  
           .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);  
       try {  
         startDataNode(conf, dataDirs, resources);  
       } catch (IOException ie) {  
         shutdown();  
         throw ie;  
       }     
     }  

 

(5) Starting the DataNode services (startDataNode)

(a) Obtain the local hostname and the NameNode address.
(b) Connect to the NameNode; the local DataNode registers under the name "machineName:port".
(c) Obtain version and ID information from the NameNode.
(d) Initialize the storage directory structure, formatting any directory that has not been formatted yet.
(e) Open the DataNode's streaming listen socket ss; the default port is 50010.
(f) Initialize the DataXceiverServer background thread, which accepts requests on ss.
(g) Initialize the DataBlockScanner; block verification is supported only for FSDataset.
(h) Initialize and start the info HTTP server infoServer; the default address is http://0.0.0.0:50075, and if HTTPS is enabled the default HTTPS port is 50475. infoServer also registers the DataBlockScanner servlet at http://0.0.0.0:50075/blockScannerReport.
(i) Initialize and start the IPC server used for RPC calls; the default port is 50020.

 

    /** 
      * This method starts the data node with the specified conf. 
      *  
      * @param conf - the configuration 
      *  if conf's CONFIG_PROPERTY_SIMULATED property is set 
      *  then a simulated storage based data node is created. 
      *  
      * @param dataDirs - only for a non-simulated storage data node 
      * @throws IOException 
      * @throws MalformedObjectNameException  
      * @throws MBeanRegistrationException  
      * @throws InstanceAlreadyExistsException  
      */  
     void startDataNode(Configuration conf,   
                        AbstractList<File> dataDirs, SecureResources resources  
                        ) throws IOException {  
       if(UserGroupInformation.isSecurityEnabled() && resources == null)  
         throw new RuntimeException("Cannot start secure cluster without " +  
                "privileged resources.");  
         
       this.secureResources = resources;  
       // use configured nameserver & interface to get local hostname  
       // determine machineName
       if (conf.get("slave.host.name") != null) {  
         machineName = conf.get("slave.host.name");     
       }  
       if (machineName == null) {  
         machineName = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface","default"),conf.get("dfs.datanode.dns.nameserver","default"));  
       }  
         
       // resolve the NameNode's service address
       InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);  
         
       // socket read/write timeouts
       this.socketTimeout =  conf.getInt("dfs.socket.timeout",HdfsConstants.READ_TIMEOUT);  
       this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",HdfsConstants.WRITE_TIMEOUT);  
         
       /* Based on results on different platforms, we might need set the default  
        * to false on some of them. */  
       this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", true);  
         
       // write packet size, 64 KB by default
       this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);  
      
       // the local streaming (data transfer) socket address
       InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);  
       int tmpPort = socAddr.getPort();  
         
       // DataStorage holds the storage-related state
       storage = new DataStorage(); // data storage information file
         
       // construct the registration object
       this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);  
      
       // connect to the NameNode through an RPC dynamic proxy
       this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr, conf);  
         
       // get version and id info from the name-node: mainly buildVersion and
       // distributedUpgradeVersion, used for version checking
       NamespaceInfo nsInfo = handshake();  
         
         
       StartupOption startOpt = getStartupOption(conf);  
       assert startOpt != null : "Startup option must be set.";  
         
       boolean simulatedFSDataset = conf.getBoolean("dfs.datanode.simulateddatastorage", false);  
         
       // a simulated FSDataset is used only for testing; the normal path below uses real storage
       if (simulatedFSDataset) {  
           setNewStorageID(dnRegistration);  
           dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;  
           dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;  
           // it would have been better to pass storage as a parameter to  
           // constructor below - need to augment ReflectionUtils used below.  
           conf.set("StorageId", dnRegistration.getStorageID());  
           try {  
             //Equivalent of following (can't do because Simulated is in test dir)  
             //  this.data = new SimulatedFSDataset(conf);  
             this.data = (FSDatasetInterface) ReflectionUtils.newInstance(  
                 Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);  
           } catch (ClassNotFoundException e) {  
             throw new IOException(StringUtils.stringifyException(e));  
           }  
       } else { // real storage  
         // read storage info, lock data dirs and transition fs state if necessary  
         storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);  
         // record the storage info in the registration
         this.dnRegistration.setStorageInfo(storage);  
           
         // initialize data node internal structure  
         // build the FSDataset from storage and conf; it performs the actual block operations
         this.data = new FSDataset(storage, conf);  
       }  
           
       // register datanode MXBean  
       this.registerMXBean(conf); // register the MXBean for DataNode  
         
       // Allow configuration to delay block reports to find bugs  
       artificialBlockReceivedDelay = conf.getInt(  
           "dfs.datanode.artificialBlockReceivedDelay", 0);  
      
       // find a free port or use the privileged port provided
       // initialize the server socket (NIO channel when write timeouts are used, otherwise blocking IO)
       ServerSocket ss;  
       if(secureResources == null) {  
         ss = (socketWriteTimeout > 0) ?   
           ServerSocketChannel.open().socket() : new ServerSocket();  
         Server.bind(ss, socAddr, 0);  
       } else {  
         ss = resources.getStreamingSocket();  
       }  
       // set the receive buffer size, 64 KB by default
       ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);   
         
       // adjust machine name with the actual port  
       tmpPort = ss.getLocalPort();  
       selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),  
                                        tmpPort);  
       this.dnRegistration.setName(machineName + ":" + tmpPort);  
       LOG.info("Opened info server at " + tmpPort);  
         
       // server used to receive/send data blocks; it listens for requests from
       // clients or other DataNodes. This small server does not use the Hadoop IPC mechanism.
       // initialize the DataXceiverServer handler
       this.threadGroup = new ThreadGroup("dataXceiverServer");  
       this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));  
       this.threadGroup.setDaemon(true); // auto destroy when empty  
      
       // block report interval and heartbeat interval
       this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);  
       this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",BLOCKREPORT_INITIAL_DELAY)* 1000L;   
       if (this.initialBlockReportDelay >= blockReportInterval) {  
         this.initialBlockReportDelay = 0;  
         LOG.info("dfs.blockreport.initialDelay is greater than " +  
           "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");  
       }  
       this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;  
         
         
       DataNode.nameNodeAddr = nameNodeAddr;  
      
       //initialize periodic block scanner  
       String reason = null;  
       if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {  
         reason = "verification is turned off by configuration";  
       } else if ( !(data instanceof FSDataset) ) {  
         reason = "verifcation is supported only with FSDataset";  
       }   
       if ( reason == null ) {  
         // the blockScanner periodically scans and verifies stored blocks
         blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);  
       } else {  
         LOG.info("Periodic Block Verification is disabled because " +  
                  reason + ".");  
       }  
      
       //create a servlet to serve full-file content  
       // create the HttpServer (implemented with Jetty) for web monitoring
       InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);  
       String infoHost = infoSocAddr.getHostName();  
       int tmpInfoPort = infoSocAddr.getPort();  
       this.infoServer = (secureResources == null)   
          ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,   
              conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))  
          : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,  
              conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN),  
              secureResources.getListener());  
       if (conf.getBoolean("dfs.https.enable", false)) {  
         boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);  
         InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(  
             "dfs.datanode.https.address", infoHost + ":" + 0));  
         Configuration sslConf = new Configuration(false);  
         sslConf.addResource(conf.get("dfs.https.server.keystore.resource",  
             "ssl-server.xml"));  
         this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);  
       }  
       this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);  
       this.infoServer.addInternalServlet(null, "/getFileChecksum/*",  
           FileChecksumServlets.GetServlet.class);  
      
       this.infoServer.setAttribute("datanode", this);  
       this.infoServer.setAttribute("datanode.blockScanner", blockScanner);  
       this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);  
       this.infoServer.addServlet(null, "/blockScannerReport",   
                                  DataBlockScanner.Servlet.class);  
      
       if (WebHdfsFileSystem.isEnabled(conf, LOG)) {  
         infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class  
             .getPackage().getName() + ";" + Param.class.getPackage().getName(),  
             WebHdfsFileSystem.PATH_PREFIX + "/*");  
       }  
       this.infoServer.start();  
         
         
       // adjust info port  
       this.dnRegistration.setInfoPort(this.infoServer.getPort());  
       myMetrics = DataNodeInstrumentation.create(conf,  
                                                  dnRegistration.getStorageID());  
         
       // set service-level authorization security policy  
       if (conf.getBoolean(  
             ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {  
         ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());  
       }  
      
       // BlockTokenSecretManager is created here, but it shouldn't be  
       // used until it is initialized in register().  
       this.blockTokenSecretManager = new BlockTokenSecretManager(false,0, 0);  
         
       // init the IPC server: it listens for RPC requests from clients and other DataNodes
       InetSocketAddress ipcAddr = NetUtils.createSocketAddr(conf.get("dfs.datanode.ipc.address"));  
       ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),   
           conf.getInt("dfs.datanode.handler.count", 3), false, conf,  
           blockTokenSecretManager);  
         
       dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());  
      
       LOG.info("dnRegistration = " + dnRegistration);  
     }  
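
The handshake() call above is essentially a retry loop around DatanodeProtocol.versionRequest() followed by a build-version check. A simplified sketch of its logic (the real method also reports the mismatch back to the NameNode and asserts the layout version):

    private NamespaceInfo handshake() throws IOException {
      NamespaceInfo nsInfo = null;
      while (shouldRun) {
        try {
          // ask the NameNode for its namespace info (buildVersion,
          // layoutVersion, namespaceID, distributedUpgradeVersion)
          nsInfo = namenode.versionRequest();
          break;
        } catch (SocketTimeoutException e) { // NameNode is busy; retry
          LOG.info("Problem connecting to server: " + getNameNodeAddr());
          try { Thread.sleep(1000); } catch (InterruptedException ie) {}
        }
      }
      // refuse to run against a NameNode built from different sources
      if (!nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
        String errorMsg = "Incompatible build versions: namenode BV = "
            + nsInfo.getBuildVersion() + "; datanode BV = "
            + Storage.getBuildVersion();
        LOG.fatal(errorMsg);
        throw new IOException(errorMsg);
      }
      return nsInfo;
    }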

 

 

(6) Running the DataNode

runDatanodeDaemon() registers the DataNode with the NameNode, creates its thread, marks it as a daemon thread, and starts it.

    /** Start a single datanode daemon and wait for it to finish. 
     *  If this thread is specifically interrupted, it will stop waiting. 
     */  
    public static void runDatanodeDaemon(DataNode dn) throws IOException {  
      if (dn != null) {  
        //register datanode  
        dn.register();  
        dn.dataNodeThread = new Thread(dn, dnThreadName);  
        dn.dataNodeThread.setDaemon(true); // needed for JUnit testing  
        dn.dataNodeThread.start();  
      }  
    }  
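
register() itself is another retry loop: it sends the DatanodeRegistration to the NameNode, adopts the storageID the NameNode assigns on first registration, and schedules the initial block report. A simplified sketch (version checks and block-token setup omitted):

    private void register() throws IOException {
      while (shouldRun) {
        try {
          // the NameNode confirms and completes the registration fields
          dnRegistration = namenode.register(dnRegistration);
          break;
        } catch (SocketTimeoutException e) { // NameNode is busy; retry
          LOG.info("Problem connecting to server: " + getNameNodeAddr());
          try { Thread.sleep(1000); } catch (InterruptedException ie) {}
        }
      }
      // a brand-new DataNode has no storageID yet; persist the assigned one
      if (storage.getStorageID().equals("")) {
        storage.setStorageID(dnRegistration.getStorageID());
        storage.writeAll();
      }
      // send the first block report soon after registering
      scheduleBlockReport(initialBlockReportDelay);
    }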

 

 run() starts the DataXceiverServer and the IPC server, then enters the DataNode's normal operation: it checks whether a distributed upgrade is needed and calls offerService() to provide service.

    public void run() {  
       LOG.info(dnRegistration + "In DataNode.run, data = " + data);  
       // start the dataXceiverServer, the streaming block read/write server
       dataXceiverServer.start();  
       // the internal Hadoop IPC server
       ipcServer.start();  
             
       while (shouldRun) {  
         try {  
           // check whether a distributed upgrade of the filesystem is needed
           startDistributedUpgradeIfNeeded();  
           // provide service: send periodic heartbeats to the NameNode and execute the commands it returns
           offerService();  
         } catch (Exception ex) {  
           LOG.error("Exception: " + StringUtils.stringifyException(ex));  
           if (shouldRun) {  
             try {  
               Thread.sleep(5000);  
             } catch (InterruptedException ie) {  
             }  
           }  
         }  
       }  
             
       LOG.info(dnRegistration + ":Finishing DataNode in: "+data);  
       shutdown();  
     }  
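
offerService() is the heart of that loop: on each pass it sends a heartbeat carrying capacity and load statistics once heartBeatInterval has elapsed, executes whatever DatanodeCommands come back, and periodically sends a full block report. A trimmed sketch of the 1.x loop (blockReceived notification, metrics, and error handling omitted):

    public void offerService() throws Exception {
      while (shouldRun) {
        long startTime = now();
        if (startTime - lastHeartbeat > heartBeatInterval) {
          lastHeartbeat = startTime;
          // heartbeat: usage statistics go up, commands come back
          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
              data.getCapacity(), data.getDfsUsed(), data.getRemaining(),
              xmitsInProgress.get(), getXceiverCount());
          if (!processCommand(cmds))
            continue; // e.g. the NameNode asked us to re-register
        }
        if (startTime - lastBlockReport > blockReportInterval) {
          // full block report: tell the NameNode every block stored locally
          DatanodeCommand cmd = namenode.blockReport(dnRegistration,
              BlockListAsLongs.convertToArrayLongs(data.getBlockReport()));
          lastBlockReport = startTime;
          processCommand(cmd);
        }
        // sleep until the next heartbeat is due; a newly received block
        // notification wakes the loop up early
        long waitTime = heartBeatInterval - (now() - lastHeartbeat);
        synchronized (receivedBlockList) {
          if (waitTime > 0 && receivedBlockList.size() == 0) {
            try {
              receivedBlockList.wait(waitTime);
            } catch (InterruptedException ie) {
            }
          }
        }
      }
    }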

 

 
