http://zhangjun5965.iteye.com/blog/2375278
概述
hdfs中的文件是以块的形式存储的,每个块默认有三个副本,这些副本又存放在不同的datandoe上,读取文件的过程,就是先获取这些块的地址,然后依次读取各个快的数据
hdfs读写数据通过DataXceiverServer提供一个服务,建立java的socket服务,接受来自客户端的各种请求,每种请求会有不同的操作码,服务端通过这个操作码来判断是哪种请求。每次来一个请求,就新建一个线程来具体处理逻辑,具体的实现我们下面做一些简单的分析
DataXceiverServer介绍
了解DataXceiverServer
DataXceiverServer类位于org.apache.hadoop.hdfs.server.datanode包下,
/**
* Server used for receiving/sending a block of data.
* This is created to listen for requests from clients or
* other DataNodes. This small server does not use the
* Hadoop IPC mechanism.
*/
class DataXceiverServer implements Runnable {
/*
*PeerServer是一个接口,在datanode启动的时候初始化了他的一个实 *现类TcpPeerServer,TcpPeerServer类封装了Java的*ServerSocket功能.通过java的socket功能来提供服务,监听请求,处理请求
*
*/
private final PeerServer peerServer;
//所属datanode
private final DataNode datanode;
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
private boolean closed = false;
/**
* 如名字所示,最大的Xceiver的数量
* Maximal number of concurrent xceivers per node.
* Enforcing the limit is required in order to avoid data-node
* running out of memory.
*/
int maxXceiverCount =
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
}
从注释中了解到DataXceiverServer用于接受和发送数据块,监听客户端和其他的datanode的请求。
初始化工作
既然DataXceiverServer是用来接受和发送数据的,那么就应该是datanode工作的一部分,我们从datanode的初始化的代码中找到了DataXceiverServer的初始化代码.
在datanode的startDataNode方法中,通过 initDataXceiver(conf);来初始化DataXceiverServer,我们进入initDataXceiver方法.
private void initDataXceiver(Configuration conf) throws IOException {
// find free port or use privileged port provided
TcpPeerServer tcpPeerServer;
if (secureResources != null) {
tcpPeerServer = new TcpPeerServer(secureResources);
} else {
int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
DataNode.getStreamingAddr(conf), backlogLength);
}
tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
DomainPeerServer domainPeerServer =
getDomainPeerServer(conf, streamingAddr.getPort());
if (domainPeerServer != null) {
this.localDataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(domainPeerServer, conf, this));
LOG.info("Listening on UNIX domain socket: " +
domainPeerServer.getBindPath());
}
}
this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
}
通过代码,我们看到首先new 了一个PeerServer的子类TcpPeerServer. 然后通过这个peerServer和相应的conf作为参数new了一个DataXceiverServer对象,并且将其加入了一个线程组,设置成守护线程.
这里涉及到了两个重要的概念,一个是线程组,一个是守护线程.在java中可以对线程组中的线程进行操作,比如interrupt操作可以打断一个线程组内的所有的线程,设置成守护线程,这样可以在主线程退出的情况下然所有的守护线程自动退出.
工作原理
我们来看DataXceiverServer的run方法
@Override
public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
//通过accept方法在这里一直阻塞,直到有请求过来,我们通过跟踪代码,看到内部其实是封装了java的serverSocket的accept方法.
peer = peerServer.accept();
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > maxXceiverCount) {
throw new IOException("Xceiver count " + curXceiverCount
+ " exceeds the limit of concurrent xcievers: "
+ maxXceiverCount);
}
//当有请求过来的时候,就通过DataXceiver.create创建了一个守护进程,并将其加到线程组里.
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
...................
}
//省略了异常处理和关闭服务处理
...................
}
通过代码我们看到,每次来了一个请求,DataXceiverServer就创建一个守护进程DataXceiver去处理请求,每个datanode上能创建多少个DataXceiver,就是DataXceiverServer中的变量maxXceiverCount来控制的. 这个变量可以通过配置文件来配置,变量名是dfs.datanode.max.transfer.threads,默认数字是4096,这个可以根据datanode的运行情况和性能来进行配置,也是hdfs优化的一个重要参数.
DataXceiver介绍
Op类介绍
当发送和接受数据的服务DataXceiver创建之后,是通过Op类中的各个操作码来标识各种操作的,Op类具体路径是org.apache.hadoop.hdfs.protocol.datatransfer.Op,在这里定义了一些操作码,用于区分不同的操作,比如读、写、copy等。
public enum Op {
WRITE_BLOCK((byte)80),
READ_BLOCK((byte)81),
READ_METADATA((byte)82),
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
TRANSFER_BLOCK((byte)86),
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89);
.........................
}
处理逻辑
既然DataXceiver是一个线程,那么他的处理逻辑就应该在run方法里,我们来看run方法
/**
* Read/write data from/to the DataXceiverServer.
*/
@Override
public void run() {
int opsProcessed = 0;
Op op = null;
try {
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
try {
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in,
HdfsConstants.SMALL_BUFFER_SIZE);
socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) {
if (imne.isHandshake4Encryption()) {
LOG.info("Failed to read expected encryption handshake from client " +
"at " + peer.getRemoteAddressString() + ". Perhaps the client " +
"is running an older version of Hadoop which does not support " +
"encryption");
} else {
LOG.info("Failed to read expected SASL data transfer protection " +
"handshake from client at " + peer.getRemoteAddressString() +
". Perhaps the client is running an older version of Hadoop " +
"which does not support SASL data transfer protection");
}
return;
}
super.initialize(new DataInputStream(input));
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else {
peer.setReadTimeout(dnConf.socketTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
// Since we optimistically expect the next op, it's quite normal to get EOF here.
if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
incrDatanodeNetworkErrors();
throw err;
}
break;
}
// restore normal timeout
if (opsProcessed != 0) {
peer.setReadTimeout(dnConf.socketTimeout);
}
opStartTime = monotonicNow();
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
........................
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
}
updateCurrentThreadName("Cleaning up");
if (peer != null) {
dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in);
}
}
}
通过 op = readOp();获取具体是什么操作,读、写、copy等,然后processOp(op);方法来处理具体的逻辑
在方法中,通过switch来具体的分发,让不同的方法执行不同的逻辑
/** Process op by the corresponding method. */
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
跟踪代码,最后还是调用了DataXceiver类里面的readBlock方法来做具体的读取数据的操作
@Override
public void readBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
long read = 0;
updateCurrentThreadName("Sending block " + block);
OutputStream baseStream = getOutputStream();
DataOutputStream out = getBufferedOutputStream();
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
..............................
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
true, false, sendChecksum, datanode, clientTraceFmt,
cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(ERROR, msg);
throw e;
}
// send op status
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
long beginRead = Time.monotonicNow();
read = blockSender.sendBlock(out, baseStream, null); // send data
.................................
} catch ( SocketException ignored ) {
.................................
} finally {
IOUtils.closeStream(blockSender);
}
}
主要就是构造了一个BlockSender对象,通过其sendBlock方法来将数据发送到客户端
BlockSender 读取数据
传统方式实现数据传输
传统方式读取数据,首先内核读出全盘数据,然后将数据跨越内核用户推到应用程序,然后应用程序再次跨越内核用户将数据推回,写出到套接字。应用程序实际上在这里担当了一个不怎么高效的中介角色,将磁盘文件的数据转入套接字
零拷贝实现数据传输
原理
Java 类库通过 java.nio.channels.FileChannel 中的 transferTo() 方法来在 Linux 和 UNIX 系统上支持零拷贝。可以使用 transferTo() 方法直接将字节从它被调用的通道上传输到另外一个可写字节通道上,数据无需流经应用程序
具体操作
BlockSender的doSendBlock方法中,通过以下的操作来判断是否可以进行transferTo操作。
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
在经过一系列的检查之后,在sendPacket方法进行具体的操作
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
// normal transfer
out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
其中sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);封装了具体的java底层的操作
客户端读数据流程分析
通过前面的代码我们知道datanode在启动的时候启动了java的socket来监听请求,那么客户端的请求是怎么发送的呢?这个就是接下来我们要研究的问题.
java api读取数据
我们先来一段简单的java api读取hdfs数据的代码
@Test
public void testRead() {
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path p = new Path("hdfs://localhost:9000/a.txt");
FSDataInputStream in = fs.open(p);
BufferedReader buff = new BufferedReader(new InputStreamReader(in));
String str = null;
while ((str = buff.readLine()) != null) {
System.out.println(str);
}
buff.close();
in.close();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
首先通过FileSystem fs = FileSystem.get(conf);来实例化FileSystem的子类,也就是分布式文件系统DistributedFileSystem。(具体是通过传进来的conf里面路径的前缀的配置来决定实例化哪个系统,如hdfs://,就是DistributedFileSystem,具体这里就不讲了)
然后通过 fs.open(p); 来打开一个输入流,用于读取数据
构造DFSInputStream
跟踪代码,我们打开DistributedFileSystem的open方法
@Override
public FSDataInputStream open(Path f, final int bufferSize)
throws IOException {
statistics.incrementReadOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
DFSInputStream是分布式文件用于读数据的输入流,用它来对hdfs的文件进行读操作,通过DFSClient的open方法打开了一个DFSInputStream。
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
获取文件的块信息
在DFSClient的open方法中,new了一个DFSInputStream类,从namnode获取文件的块信息的主要方法是openInfo,我们来分析下。
内部的fetchLocatedBlocksAndGetLastBlockLength方法,我们从名字可以了解到,获取所有的块信息以及最后一个快的长度,获取最后一个块的长度主要是针对并发读写的情况,读数据的时候如果有其他线程在进行追加操作,最后的块的大小会有所改变。
跟踪代码,最后调用了org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(ClientProtocol, String, long, long)方法来通过namenode的代理来发请求,具体的是在ClientProtocol接口的实现类ClientNamenodeProtocolTranslatorPB的getBlockLocations方法中封装了请求的对象GetBlockLocationsRequestProto,通过hadoop rpc发送到namenode来获取数据
@Override
public LocatedBlocks getBlockLocations(String src, long offset, long length)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
.newBuilder()
.setSrc(src)
.setOffset(offset)
.setLength(length)
.build();
try {
GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null,
req);
return resp.hasLocations() ?
PBHelper.convert(resp.getLocations()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
具体的实现代码是在namenode的服务代理org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer的getBlockLocations方法实现。
@Override // ClientProtocol
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
throws IOException {
checkNNStartup();
metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
}
主要就是通过namesystem.getBlockLocations来从命名空间中获取相应的信息,主要就是先获取文件对应的INodeFile,然后再获取文件对应的块信息,返回的块的信息根据datanode距离客户端的距离做了简单的排序,具体的可以跟踪下代码
DFSInputStream read 数据
DFSInputStream的read方法进入readWithStrategy方法,然后进入blockSeekTo方法
接下来在org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(long) 方法,构造出来一个BlockReader对象,用于读取数据
/**
* 打开了一个输入流用于读取数据
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
..................................
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
........................
}
跟踪build方法,看到调用了RemoteBlockReader2.newBlockReader来获取的BlockReader对象。
/**
* 打开了一个输入流用于读取数据
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
..................................
blockReader = new BlockReaderFactory(dfsClient.getConf()).
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
setStartOffset(offsetIntoBlock).
setVerifyChecksum(verifyChecksum).
setClientName(dfsClient.clientName).
setLength(blk.getNumBytes() - offsetIntoBlock).
setCachingStrategy(curCachingStrategy).
setAllowShortCircuitLocalReads(!shortCircuitForbidden).
setClientCacheContext(dfsClient.getClientContext()).
setUserGroupInformation(dfsClient.ugi).
setConfiguration(dfsClient.getConfiguration()).
build();
........................
}
Sender发送数据
在org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader方法中,通过Sender对象的readBlock来读取数据。
public static BlockReader newBlockReader(String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum, cachingStrategy);
................................
}
readBlock方法中,通过发送值为81的状态码org.apache.hadoop.hdfs.protocol.datatransfer.Op.READ_BLOCK 到DataXceiver中的peer服务。
@Override
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.build();
send(out, Op.READ_BLOCK, proto);
}
在这里我们看到把各个参数都set到了OpReadBlockProto对象里,然后发送出去,也就是发送到了初始化的DataXceiverServer服务.
这个时候服务端一直阻塞的socket线程将会收到操作码为81的读请求,然后就进行后续的处理
我们看下,其实其他的一些对于数据的操作,如copyBlock,writeBlock都是在Sender中完成的.
总结
至此,我们分析了hdfs读取数据的全部流程,包括服务端如何初始化、如何为每个过来的请求建立一个线程用于读取数据,如果利用零拷贝技术来减少开销,以及客户端如何发送读取数据的请求。
由于本人目前尚处在学习的阶段,难免有错误或者疏漏,如有问题,请大家多多指教。
相关推荐
《Hadoop_2.X_HDFS源码剖析》是由徐鹏编著的一本深入解析Hadoop 2.x版本中HDFS(Hadoop Distributed File System)源码的专业书籍。这本书旨在帮助读者理解HDFS的核心机制,提升在分布式存储系统方面的专业技能。 ...
在Java编程环境中,Hadoop分布式文件系统(HDFS)提供了丰富的Java API,使得开发者能够方便地与HDFS进行交互,包括文件的上传、下载、读写等操作。本篇文章将详细探讨如何使用HDFS Java API来实现文件上传的功能。 ...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。...因此,无论你是想成为数据工程师、数据科学家还是系统管理员,这个"Hadoop源码分析视频教程"都是不容错过的学习资源。
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
Hadoop 源码解析 - DataNode Hadoop 作为一个大数据处理框架,其核心组件之一是分布式文件系统(HDFS),而 DataNode 是 HDFS 中的重要组件之一。DataNode 负责存储和管理数据块,提供数据访问服务。本文将对 ...
接下来,“Hadoop学习总结之二:HDFS读写过程解析.doc”详细解释了HDFS的数据读写流程。在写入数据时,客户端首先与NameNode通信获取数据块位置,然后将数据分块并发送到各个DataNode。在读取数据时,客户端同样先...
通过阅读和理解Hadoop源码分析-HDFS部分.pdf文档,我们可以更深入地理解这些组件的工作原理,掌握HDFS在处理大数据时的内部机制。这对于我们优化HDFS的性能,解决实际问题,以及开发相关的分布式应用都具有重要的...
Hadoop作为大数据处理领域的一个重要框架,提供了强大的分布式计算...通过深入分析其源码,不仅可以掌握数据处理流程的每一个细节,还可以根据实际需要进行扩展和优化,这对于大数据开发人员来说是一个非常重要的技能。
### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...
本篇文章将围绕"Hadoop源码"这一主题,深度探讨HDFS的底层实现机制,帮助读者从源码层面理解其工作原理。 一、HDFS概述 HDFS是基于Google发表的GFS论文设计的分布式文件系统,旨在处理和存储大量数据。它采用了主从...
- YARN源码解析可以帮助理解资源分配和任务执行的流程。 5. 开发与调试 - 通过阅读源码,开发者可以自定义Hadoop的行为,例如编写自定义InputFormat、OutputFormat或Partitioner。 - 调试工具,如Hadoop的日志...
接着,会接触到Hadoop的基础知识,包括HDFS的架构和MapReduce的工作流程,理解如何将爬取到的数据导入Hadoop环境。 在实际操作中,学生们可能会遇到数据清洗、去重、异常处理等问题,需要运用Python的pandas库进行...
### Hadoop源码分析知识点详解 #### 一、Hadoop及其核心技术背景 Hadoop作为一款开源的分布式计算框架,其核心思想来源于Google发布的几篇重要论文。这些论文详细阐述了Google构建其分布式计算平台的关键技术和...
`Writable`接口定义了对象如何写入和读取数据流,这是Hadoop内部通信的关键。例如,自定义的`MyWritable`类会实现`write`方法,将成员变量`counter`和`timestamp`写入`DataOutput`。 通过深入分析Hadoop的源码,...
《Hadoop云计算和云存储源码实现解析》是针对大数据技术初学者及进阶者的一份宝贵资料,它深入探讨了Hadoop在云计算和云存储领域的应用与源码解析。Hadoop作为开源的大数据处理框架,是理解大数据处理机制的关键。本...
通过这个实战项目,学习者不仅可以掌握Java操作Hadoop MapReduce的基本方法,还能深入了解大数据处理流程,以及如何从海量气象数据中提取有价值的信息。此外,对于提升数据处理能力和分布式计算的理解也大有裨益。...
《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》是一本深入探讨Hadoop核心组件的书籍,其源代码提供了对Hadoop内部工作原理的直观理解。这本书主要关注两个关键部分:Hadoop Common和HDFS...
《深入剖析Hadoop源码》 Hadoop,作为开源大数据处理框架,因其高效、可靠、可扩展的特性,成为大数据领域的基石。本文将对Hadoop的源码进行深入解析,帮助读者理解其内部机制,尤其是核心组件HDFS(Hadoop ...
通过对Hadoop源码的深入分析,我们可以更清晰地理解其工作原理,这有助于优化性能,解决实际问题,以及为未来的分布式系统设计提供灵感。Hadoop的源码不仅是一份技术文档,更是学习分布式计算、云计算和大数据处理的...
- 当用户尝试在IE浏览器中访问存储在Hadoop上的图片时,如果直接访问HDFS的URL,浏览器可能无法正确解析二进制数据,而是将其当作文本显示,即出现“显示源码”的问题。 - 为了解决这个问题,我们通常会在服务器端...