第七章:小朱笔记hadoop之源码分析-hdfs分析
第四节:namenode分析
4.1 namenode启动过程分析
org.apache.hadoop.hdfs.server.namenode.main 方法是系统的入口,它会调用 createNameNode 创建 NameNode 实例。 createNameNode 分析命令行参数,如果是 FORMAT 戒 FINALIZE,调用对应的方法后退出,如果是其他的参数,将创建NameNode 对象。NameNode的构造函数会调initialize,初始化NameNode的成员发量,包括创建 RPC 服务器,初始化FSNamesystem,初始化RPC服务器和回收站线程。
创建的服务如下:
服务 类 server ipc.RPC.Server serviceRpcServer ipc.RPC.Server HttpServer http.HttpServer Trash Emptier fs.Trash.Trash.Emptier hbthread hdfs.server.namenode.FSNamesystem.HeartbeatMonitor lmthread hdfs.server.namenode.LeaseManager.Monitor replthread hdfs.server.namenode.FSNamesystem.ReplicationMonitor dnthread hdfs.server.namenode.DecommissionManager.Monitor
初始化 name-node 入口:
/** * Initialize name-node. * * @param conf the configuration */ private void initialize(Configuration conf) throws IOException { InetSocketAddress socAddr = NameNode.getAddress(conf); //从配置conf中获取到Namenode服务器所使用的Socket地址 读取fs.default.name的值,获取hdfs集群的地址 UserGroupInformation.setConfiguration(conf); //设置用户权限信息 //如果dfs.namenode.keytab.file存在,并且kerberos已经开启,则调用UserGroupInformation.loginUserFromKeytab进行登陆 //登陆使用dfs.namenode.kerberos.principal作为用户名,否则使用当前linux的user作为用户。 SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName()); //获取namenode同时处理请求的数量配置,默认为10 服务器上处理器Handler线程的数量 int handlerCount = conf.getInt("dfs.namenode.handler.count", 10); // set service-level authorization security policy //如果授权配置(hadoop.security.authorization)开启了,则刷新授权策略 if (serviceAuthEnabled = conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) { //默认会重新加载hadoop-policy.xml中的acl配置,加载完成后acl配置保存在 ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider()); } //创建服务指标,用于观察namenode服务状态 myMetrics = NameNodeInstrumentation.create(conf);// 初始化NameNodeMetrics //启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责 //1.init FSNamesystem this.namesystem = new FSNamesystem(this, conf); //如果安全机制开启 if (UserGroupInformation.isSecurityEnabled()) { //启动守护线程每5秒执行一次ExpiredTokenRemover namesystem.activateSecretManager(); } //2.init namenode-datanode,namenode-snn RPC Server //创建rpc服务器,如果dfs.namenode.servicerpc-address配置项存在,则用来作为服务器地址 InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { int serviceHandlerCount = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); setRpcServiceServerAddress(conf); } //3.init namenode-client RPC Server this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); // The rpc-server port can be ephemeral... ensure we have the correct info this.serverAddress = this.server.getListenerAddress(); FileSystem.setDefaultUri(conf, getUri(serverAddress)); LOG.info("Namenode up at: " + this.serverAddress); //4. start Http Server startHttpServer(conf); //5. start namenode-client RPC Server this.server.start(); if (serviceRpcServer != null) { serviceRpcServer.start(); //6. start namenode-datanode,namenode-snn RPC Server } //7.start TrashEmptier //启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次 startTrashEmptier(conf); }
(1)初始化FSNamesystem
启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责。FSNamesystem 的构造函数会调用 initialize 方法,去初始化上面我们分析过的一堆成员发量。几个重要的步骤包括加载 FSImage editlog 设置系统为安全模式,初始化各个工作线程和 HTTP 服务器。
FSNamesystem重要属性:
// Default initial capacity and load factor of map public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16; public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f; private boolean isPermissionEnabled;//是否打开权限检查,可以通过配置项dfs.permissions来设置。 //本地文件的用户文件属主和文件组,可以通过hadoop.job.ugi设置,如果没有设置,那么将使用启动HDFS的用户(通过whoami获得)和该用户所在的组(通过groups获得)作为值 private UserGroupInformation fsOwner; private String supergroup;//对应配置项dfs.permissions.supergroup,应用在defaultPermission中,是系统的超级组。 //缺省权限,缺省用户为fsOwner,缺省用户组为supergroup,缺省权限为0777,可以通过dfs.upgrade.permission修改。 private PermissionStatus defaultPermission; // FSNamesystemMetrics counter variables //系统总容量/已使用容量/剩余容量 private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L; //系统总连接数,根据DataNode心跳信息跟新 private int totalLoad = 0; boolean isAccessTokenEnabled; BlockTokenSecretManager accessTokenHandler; private long accessKeyUpdateInterval; private long accessTokenLifetime; // Scan interval is not configurable. private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); private DelegationTokenSecretManager dtSecretManager; //分别是成员变量pendingReplications(正在复制的数据块), //neededReplications(需要复制的数据块)的大小, //scheduledReplicationBlocksCount是当前正在处理的复制工作数目 volatile long pendingReplicationBlocksCount = 0L; volatile long corruptReplicaBlocksCount = 0L; volatile long underReplicatedBlocksCount = 0L; volatile long scheduledReplicationBlocksCount = 0L; volatile long excessBlocksCount = 0L; volatile long pendingDeletionBlocksCount = 0L; // // Stores the correct file name hierarchy //指向系统使用的FSDirectory对象。 public FSDirectory dir; // // Mapping: Block -> { INode, datanodes, self ref } // Updated only in response to client-sent information. // final BlocksMap blocksMap = new BlocksMap(DEFAULT_INITIAL_MAP_CAPACITY, DEFAULT_MAP_LOAD_FACTOR); // // Store blocks-->datanodedescriptor(s) map of corrupt replicas // //保存损坏(如:校验没通过)的数据块到对应DataNode的关系,CorruptReplicasMap类图如下,类只有一个成员变量, //保存Block到一个DatanodeDescriptor的集合的映射和这个映射上的一系列操作 // public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); /** * Stores the datanode -> block map. * <p> * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by * storage id. In order to keep the storage map consistent it tracks * all storages ever registered with the namenode. * A descriptor corresponding to a specific storage id can be * <ul> * <li>added to the map if it is a new storage id;</li> * <li>updated with a new datanode started as a replacement for the old one * with the same storage id; and </li> * <li>removed if and only if an existing datanode is restarted to serve a * different storage id.</li> * </ul> <br> * The list of the {@link DatanodeDescriptor}s in the map is checkpointed * in the namespace image file. Only the {@link DatanodeInfo} part is * persistent, the list of blocks is restored from the datanode block * reports. * <p> * Mapping: StorageID -> DatanodeDescriptor * * 保存了StorageID >> DatanodeDescriptor的映射,用于保证DataNode使用的Storage的一致性。 */ NavigableMap<String, DatanodeDescriptor> datanodeMap = new TreeMap<String, DatanodeDescriptor>(); // // Keeps a Collection for every named machine containing // blocks that have recently been invalidated and are thought to live // on the machine in question. // Mapping: StorageID -> ArrayList<Block> // // 保存了每个DataNode上无效但还存在的数据块(StorageID >> ArrayList<Block>)。 // 保存了每个DataNode上有效,但需要删除的数据块(StorageID >> TreeSet<Block>),这种情况可能发生在一个DataNode故障后恢复后, // 上面的数据块在系统中副本数太多,需要删除一些数据块。 private Map<String, Collection<Block>> recentInvalidateSets = new TreeMap<String, Collection<Block>>(); // // Keeps a TreeSet for every named node. Each treeset contains // a list of the blocks that are "extra" at that location. We'll // eventually remove these extras. // Mapping: StorageID -> TreeSet<Block> //保存Datanode上有效但需要删除的数据块(StorageID -> TreeSet<Block>)比如一个Datanode故障恢复后,上面的数据块在系统中副本数太多,需要删除一些数据块。 // Map<String, Collection<Block>> excessReplicateMap = new TreeMap<String, Collection<Block>>(); Random r = new Random(); /** * Stores a set of DatanodeDescriptor objects. * This is a subset of {@link #datanodeMap}, containing nodes that are * considered alive. * The {@link HeartbeatMonitor} periodically checks for outdated entries, * and removes them from the list. * 所有目前活着的DataNode,线程HeartbeatMonitor会定期检查 */ ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>(); /** * Store set of Blocks that need to be replicated 1 or more times. * Set of: Block * * 需要进行复制的数据块。UnderReplicatedBlocks的类图如下,它其实是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先级是0), * 数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。 * */ private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks(); // We also store pending replication-orders.保存正在复制的数据块的相关信息 private PendingReplicationBlocks pendingReplications; public LeaseManager leaseManager = new LeaseManager(this); // // Threaded object that checks to see if we have been // getting heartbeats from all clients. // Daemon hbthread = null; // HeartbeatMonitor thread 对应DataNode心跳检查 public Daemon lmthread = null; // LeaseMonitor thread 租约检查 Daemon smmthread = null; // SafeModeMonitor thread 安全模式检查 public Daemon replthread = null; // Replication thread 数据块复制 private ReplicationMonitor replmon = null; // Replication metrics private volatile boolean fsRunning = true; //系统运行标志 long systemStart = 0;//系统启动时间 // The maximum number of replicates we should allow for a single block private int maxReplication; // How many outgoing replication streams a given node should have at one time private int maxReplicationStreams; // MIN_REPLICATION is how many copies we need in place or else we disallow the write private int minReplication; // Default replication private int defaultReplication; // Variable to stall new replication checks for testing purposes private volatile boolean stallReplicationWork = false; // heartbeatRecheckInterval is how often namenode checks for expired datanodes private long heartbeatRecheckInterval; // heartbeatExpireInterval is how long namenode waits for datanode to report // heartbeat private long heartbeatExpireInterval; //replicationRecheckInterval is how often namenode checks for new replication work private long replicationRecheckInterval; // default block size of a file private long defaultBlockSize = 0; // allow appending to hdfs files private boolean supportAppends = true; /** * Last block index used for replication work. */ private int replIndex = 0; ///和neededReplications配合,记录下一个进行复制的数据块位置。 private long missingBlocksInCurIter = 0; private long missingBlocksInPrevIter = 0; public static FSNamesystem fsNamesystemObject; /** NameNode RPC address */ private InetSocketAddress nameNodeAddress = null; // TODO: name-node has this field, it should be removed here //安全模式是这样一种状态,系统处于这个状态时,不接受任何对名字空间的修改,同时也不会对数据块进行复制或删除数据块。 //NameNode启动的时候会自动进入安全模式,同时也可以手工进入(不会自动离开)。系统启动以后,DataNode会报告目前它拥有的数据块的信息, //当系统接收到的Block信息到达一定门槛,同时每个Block都有dfs.replication.min个副本后,系统等待一段时间后就离开安全模式。这个门槛定义的参数包括: //dfs.safemode.threshold.pct:接受到的Block的比例,缺省为95%,就是说,必须DataNode报告的数据块数目占总数的95%,才到达门槛; //dfs.replication.min:缺省为1,即每个副本都存在系统中; //dfs.replication.min:等待时间,缺省为0,单位秒。 private SafeModeInfo safeMode; // safe mode information //保存了主机名(String)到DatanodeDescriptor数组的映射(Host2NodesMap唯一的成员变量为HashMap<String,DatanodeDescriptor[]> map,它的方法都是对这个map进行操作)。 private Host2NodesMap host2DataNodeMap = new Host2NodesMap(); // datanode networktoplogy // 定义了HDFS的网络拓扑,网络拓扑对应选择数据块副本的位置很重要。如在一个层次型的网络中,接到同一个交换机的两个节点间的网络速度, // 会比跨越多个交换机的两个节点间的速度快,但是,如果某交换机故障,那么它对接到它上面的两个节点会同时有影响,但跨越多个交换机的两个节点,这种影响会小得多 NetworkTopology clusterMap = new NetworkTopology(); private DNSToSwitchMapping dnsToSwitchMapping; // for block replicas placement //用于为数据块备份选择目标,例如,用户写文件时,需要选择一些DataNode,作为数据块的存放位置,这时候就利用它来选择目标地址。 //chooseTarget是ReplicationTargetChooser中最重要的方法, //它通过内部的一个NetworkTopology对象,计算出一个DatanodeDescriptor数组,该数组就是选定的DataNode,同时,顺序就是最佳的数据流顺序 ReplicationTargetChooser replicator; //保存了系统中允许/不允许连接到NameNode的机器列表 private HostsFileReader hostsReader; // 线程句柄,该线程用于检测DataNode上的Decommission进程。例如,某节点被列入到不允许连接到NameNode的机器列表中(HostsFileReader) // 那么,该节点会进入Decommission状态,它上面的数据块会被复制到其它节点,复制结束后机器进入DatanodeInfo.AdminStates.DECOMMISSIONED,这台机器就可以从HDFS中撤掉。 private Daemon dnthread = null; //系统能拥有的INode最大数(配置项dfs.max.objects,0为无限制)。 private long maxFsObjects = 0; // maximum number of fs objects /** * The global generation stamp for this file system. */ private final GenerationStamp generationStamp = new GenerationStamp(); // Ask Datanode only up to this many blocks to delete. // 发送给DataNode删除数据块消息中,能包含的最大数据块数。比方说,如果某DataNode上有250个Block需要被删除,而这个参数是100, // 那么一共会有3条删除数据块消息消息,前面两条包含了100个数据块,最后一条是50个。 int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT; // precision of access times. //用于控制文件的access时间的精度,也就是说,小于这个精度的两次对文件访问,后面的那次就不做记录了。 private long accessTimePrecision = 0; private String nameNodeHostName;
FSNamesystem初始化方法:
private void initialize(NameNode nn, Configuration conf) throws IOException { this.systemStart = now(); setConfigurationParameters(conf); //读取配置文件 dtSecretManager = createDelegationTokenSecretManager(conf); //读取dfs.namenode.delegation.token的相关配置 this.nameNodeAddress = nn.getNameNodeAddress(); this.registerMBean(conf); // register the MBean for the FSNamesystemStutus this.dir = new FSDirectory(this, conf); StartupOption startOpt = NameNode.getStartupOption(conf); this.dir.loadFSImage(getNamespaceDirs(conf), getNamespaceEditsDirs(conf), startOpt); //加载namenode持久化在硬盘的信息 long timeTakenToLoadFSImage = now() - systemStart; LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs"); NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage); //加载安全模式信息,进入安全模式 this.safeMode = new SafeModeInfo(conf); //将block的总数设置给safemode setBlockTotal(); pendingReplications = new PendingReplicationBlocks( conf.getInt("dfs.replication.pending.timeout.sec", -1) * 1000L); if (isAccessTokenEnabled) { accessTokenHandler = new BlockTokenSecretManager(true, accessKeyUpdateInterval, accessTokenLifetime); } //启动心跳监控的线程 this.hbthread = new Daemon(new HeartbeatMonitor()); //启动文件租约管理监控的线程 this.lmthread = new Daemon(leaseManager.new Monitor()); //启动副本监控的线程 this.replmon = new ReplicationMonitor(); this.replthread = new Daemon(replmon); hbthread.start(); lmthread.start(); replthread.start(); //读取主机信息黑白名单 this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""), conf.get("dfs.hosts.exclude","")); //启动退役节点监控的线程 this.dnthread = new Daemon(new DecommissionManager(this).new Monitor( conf.getInt("dfs.namenode.decommission.interval", 30), conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5))); dnthread.start(); this.dnsToSwitchMapping = ReflectionUtils.newInstance( conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class, DNSToSwitchMapping.class), conf); /* If the dns to swith mapping supports cache, resolve network * locations of those hosts in the include list, * and store the mapping in the cache; so future calls to resolve * will be fast. */ if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts())); } InetSocketAddress socAddr = NameNode.getAddress(conf); this.nameNodeHostName = socAddr.getHostName(); //将这个类注册到监控系统 registerWith(DefaultMetricsSystem.INSTANCE); }
从代码中可以看到,FSNamesystem的initialize方法主要的工作是:通过读取配置文件设置成员变量;创建FSDirectory并 loadFSImage;设置系统安全模式;启动一系列的后台线程monitorDaemon。其中对 loadFSImage非常重要。 Namenode会将HDFS的文件和目录元数据存储在一个叫fsimage的二进制文 件中,每次保存fsimage之后到下次保存之间的所有hdfs操作,将会记录在editlog文件中,当editlog达到一定的大小(bytes,由 fs.checkpoint.size参数定义)或从上次保存过后一定时间段过后(sec,由fs.checkpoint.period参数定 义),namenode会重新将内存中对整个HDFS的目录树和文件元数据刷到fsimage文件中。Namenode就是通过这种方式来保证HDFS中 元数据信息的安全性。当namenode重启加载fsimage时,就是按照如下格式协议从文件流中加载元数据信息。
从fsimag的存储格式可以看出,fsimage保存有如下信息:
写道
1. image head,其中包含:
a) imgVersion(int):当前image的版本信息
b) namespaceID(int):用来确保别的HDFS instance中的datanode不会误连上当前NN。
c) numFiles(long):整个文件系统中包含有多少文件和目录
d) genStamp(long):生成该image时的时间戳信息。
2.文件或目录的源数据信息,如果是目录,则包含以下信息:
a)path(String):该目录的路径,如”/user/zhuhui/data”
b)replications(short):副本数(目录虽然没有副本,但这里记录的目录副本数也为3)
c)mtime(long):该目录的修改时间的时间戳信息
d)atime(long):该目录的访问时间的时间戳信息
e)blocksize(long):目录的blocksize都为0
f)numBlocks(int):实际有多少个文件块,目录的该值都为-1,表示该item为目录
g)nsQuota(long):namespace Quota值,若没加Quota限制则为-1
h)dsQuota(long):disk Quota值,若没加限制则也为-1
i)username(String):该目录的所属用户名
j)group(String):该目录的所属组
k)permission(short):该目录的permission信息,如644等,有一个short来记录。
3.如果是文件,则还会额外包含如下信息:
a)blockid(long):属于该文件的block的blockid,
b)numBytes(long):该block的大小
c)genStamp(long):该block的时间戳
a) imgVersion(int):当前image的版本信息
b) namespaceID(int):用来确保别的HDFS instance中的datanode不会误连上当前NN。
c) numFiles(long):整个文件系统中包含有多少文件和目录
d) genStamp(long):生成该image时的时间戳信息。
2.文件或目录的源数据信息,如果是目录,则包含以下信息:
a)path(String):该目录的路径,如”/user/zhuhui/data”
b)replications(short):副本数(目录虽然没有副本,但这里记录的目录副本数也为3)
c)mtime(long):该目录的修改时间的时间戳信息
d)atime(long):该目录的访问时间的时间戳信息
e)blocksize(long):目录的blocksize都为0
f)numBlocks(int):实际有多少个文件块,目录的该值都为-1,表示该item为目录
g)nsQuota(long):namespace Quota值,若没加Quota限制则为-1
h)dsQuota(long):disk Quota值,若没加限制则也为-1
i)username(String):该目录的所属用户名
j)group(String):该目录的所属组
k)permission(short):该目录的permission信息,如644等,有一个short来记录。
3.如果是文件,则还会额外包含如下信息:
a)blockid(long):属于该文件的block的blockid,
b)numBytes(long):该block的大小
c)genStamp(long):该block的时间戳
当该文件对应的numBlocks数不为1,而是大于1时,表示该文件对应有多个block信息,此时紧接在该fsimage之后的就会有多个 blockid,numBytes和genStamp信息。因此,在namenode启动时,就需要对fsimage按照如下格式进行顺序的加载,以将 fsimage中记录的HDFS元数据信息加载到内存中。
namenode在加载fsimage过程其实非常简单,就是从fsimage中不停的顺序读取文件和目录的元数据信息,并在内存中构建整个 namespace,此时BlocksMap中每个block对应的datanodes 列表暂时为空。当fsimage加载完毕后,整个HDFS的目录结构在内存中就已经初始化完毕,所缺的就是每个文件对应的block对应的 datanode列表信息。这些信息需要从datanode的blockReport中获取,所以加载fsimage完毕后,namenode进程进入 rpc等待状态,等待所有的datanodes发送blockReports。
HDFS将fsimage和edits文件内容读入内存,进行合并,填充与INode相关的元数据结构,并将新的元数据内存镜像导出,在磁盘上形成新的 fsimage和edits文件。HDFS同时还接收DataNode的心跳信息,填充与Block和Data Node相关的元数据结构。
saveNameSpace将元数据写入到磁盘,具体操作步骤:首先将current目录重命名为lastcheckpoint.tmp;然后在创建新的 current目录,并保存文件;最后将lastcheckpoint.tmp重命名为privios.checkpoint。
namenode在加载fsimage过程其实非常简单,就是从fsimage中不停的顺序读取文件和目录的元数据信息,并在内存中构建整个 namespace,此时BlocksMap中每个block对应的datanodes 列表暂时为空。当fsimage加载完毕后,整个HDFS的目录结构在内存中就已经初始化完毕,所缺的就是每个文件对应的block对应的 datanode列表信息。这些信息需要从datanode的blockReport中获取,所以加载fsimage完毕后,namenode进程进入 rpc等待状态,等待所有的datanodes发送blockReports。
HDFS将fsimage和edits文件内容读入内存,进行合并,填充与INode相关的元数据结构,并将新的元数据内存镜像导出,在磁盘上形成新的 fsimage和edits文件。HDFS同时还接收DataNode的心跳信息,填充与Block和Data Node相关的元数据结构。
saveNameSpace将元数据写入到磁盘,具体操作步骤:首先将current目录重命名为lastcheckpoint.tmp;然后在创建新的 current目录,并保存文件;最后将lastcheckpoint.tmp重命名为privios.checkpoint。
在加载完成时,初始化了一下四个守护线程:
//启动心跳监控的线程 this.hbthread = new Daemon(new HeartbeatMonitor()); //启动文件租约管理监控的线程 this.lmthread = new Daemon(leaseManager.new Monitor()); //启动副本监控的线程 this.replmon = new ReplicationMonitor(); this.replthread = new Daemon(replmon); hbthread.start(); lmthread.start(); replthread.start(); //启动退役节点监控的线程 this.dnthread = new Daemon(new DecommissionManager(this).new Monitor( conf.getInt("dfs.namenode.decommission.interval", 30), conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5))); dnthread.start();(2)初始化RPC服务器
//init namenode-client RPC Server this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }server启动了三个(假设handlercount为1)线程,这三个线程分别为listener,responder,handler ,这三个线程之间是有职责关系的:Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取 socket上的数据。在Server中,只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是 Listener。请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用 Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。对于处理完的请求,需要将结果写回去,同样,利用NIO,只需 要一个线程,相关的逻辑在Responder里。
(3)启动Trash Emptier 线程
//启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次 startTrashEmptier(conf); private void startTrashEmptier(Configuration conf) throws IOException { this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier"); this.emptier.setDaemon(true); this.emptier.start(); }
相关推荐
在Hadoop这个分布式计算框架中,HDFS(Hadoop Distributed File System)和MapReduce是两个核心组件,它们共同构建了大数据处理的基础架构。HDFS提供了高容错性的分布式存储,而MapReduce则提供了大规模数据集的并行...
### Hadoop-HDFS环境下文件上传与下载操作指南 #### 一、Windows环境下配置Hadoop环境 **1.1 下载Hadoop** 为了在Windows环境下配置Hadoop环境,首先需要下载Hadoop软件包。推荐下载Hadoop 2.7.7版本,可以从清华...
《IDEA中的Hadoop HDFS插件:提升大数据开发效率》 在大数据处理领域,Apache Hadoop是一个不可或缺的工具,其分布式文件系统(HDFS)为海量数据存储提供了可靠的解决方案。而对于开发人员来说,拥有一个良好的集成...
- **第一阶段:NameNode启动** - 加载编辑日志和映像文件到内存。 - 当客户端对元数据进行增删改操作时,这些操作被记录到编辑日志中,并在内存中执行。 - **第二阶段:Secondary NameNode工作** - 向主NameNode...
《Idea Hadoop-HDFS插件详解与应用》 在大数据开发领域,Hadoop作为分布式计算框架的重要代表,其HDFS(Hadoop Distributed File System)是数据存储的核心组件。为了方便开发者在IDEA(IntelliJ IDEA)环境中更加...
自己的笔记,仅供参考,包含HDFS的启动停止,HDFS基本原理(上传文件、连接校验,下载文件,数据存储位置,通信机制,namenode和DataNode职责,元数据工作机制),java端操作HDFS的基本方法
4. **启动Hadoop**:运行`winutils.exe`初始化HDFS目录,然后分别启动NameNode、DataNode和YARN的Resource Manager及Node Manager。你可以通过命令行或者脚本自动化这个过程。 5. **测试Hadoop**:启动Hadoop后,...
安装Ranger-HDFS插件涉及以下几个步骤:首先,需要将"ranger-2.0.0-SNAPSHOT-hdfs-plugin"解压并按照官方文档的指导部署到HDFS集群中的各个NameNode节点;接着,配置Ranger Admin服务,导入HDFS插件,并为HDFS创建...
5. **启动Hadoop服务**:通过`sbin`目录下的脚本启动Hadoop的各个服务,如`start-dfs.sh`启动HDFS,`start-yarn.sh`启动YARN。 6. **Hadoop命令**:现在可以通过`hadoop fs`或`hadoop dfs`命令与HDFS交互,进行文件...
### Hadoop-HDFS知识点解析 #### 一、HDFS概述 **1.1 HDFS产出背景及定义** HDFS(Hadoop Distributed File System)是一种分布式文件系统,它为Hadoop的大数据处理提供了高效的存储能力。随着互联网的发展,数据...
### 大数据、Hadoop与HDFS详解 随着信息技术的快速发展和互联网的普及,数据量呈爆炸性增长态势。传统的数据处理工具和技术已无法满足如此大规模数据的存储、管理和分析需求。为此,Apache Hadoop应运而生,它提供...
- **错误日志分析**:当遇到问题时,检查Hadoop的日志文件,如`logs/hadoop-root-namenode-localhost.out`和`logs/hadoop-root-datanode-localhost.out`,它们会提供错误信息帮助解决问题。 - **防火墙配置**:...
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
Hadoop-Eclipse-Plugin-3.1.1是一款专为Eclipse集成开发环境设计的插件,用于方便地在Hadoop分布式文件系统(HDFS)上进行开发和调试MapReduce程序。这款插件是Hadoop生态系统的组成部分,它使得Java开发者能够更加...
Hadoop技术-HDFS元数据是Hadoop技术中非常重要的一部分,HDFS(Hadoop Distributed File System)是Hadoop技术的核心组件之一,负责存储和管理大规模数据。HDFS元数据是Hadoop技术中用于维护整个文件系统的数据,...
HDFS(Hadoop Distributed File System)则是Hadoop的核心组件之一,负责数据的分布式存储。本篇将深入探讨Hadoop平台上的HDFS,以及如何在该平台上进行文件操作。 一、Hadoop平台基础 Hadoop是基于Java开发的,它...
配置完成后,你可以通过命令行运行Hadoop的各种命令,如启动HDFS服务 (`hadoop dfsadmin -report`) 或执行MapReduce任务。在Windows上,可能还需要安装Java开发套件(JDK)并配置Java环境变量,因为Hadoop依赖Java...