hadoop.ipc和RPC简介
hadoop 和hbase中的大部分服务都是通过hadoop.ipt.RPC这个类来实现的。
hadoop.ipc.RPC 实现了一种远程过程调用的框架,应用可以直接定义过程调用的协议接口和协议的server端实现,就可以直接通过RPC框架获得RPC server和client端的接口代理。
hadoop.ipc.RPC 的实现利用了 hadoop.ipc.Server 和 hadoop.ipc.Client这两个类, 这两个类实现了网络中非常典型的Request-Response模式服务器和客户端框架。用户可以通过定义一个协议接口并实现出Request和Response类,以及Server端的抽象处理接口(Server.call()) 就可以实现出完整的服务器程序,而客户端程序只需要在创建hadoop.ipc.Client实体时,指定协议接口和网络相关参数,然后调用 call() 就可以发送请求并获取响应。
Hadoop.ipc.RPC作为Hadoop的底层核心组件,在hadoop
HDFS,MapReduce以及HBase中都有广泛的使用。
HDFS中NameNode,DataNode等都是通过实现对应协议的接口,然后利用hadoop.ipc.RPC获取服务器实体的。
HBase中的HBaseRPC采用的也是与hadoop.ipc.RPC类似的实现,其中的Region
Server, Master Server 都是通过实现对应的协议接口直接获取服务器实体的。
hadoop.ipc将应用逻辑与网络消息的处理分离开,并且使得逻辑对象在不同的进程或组件之间有同样的语言接口,无需区分远程对象和本地对象,使得开发者可以关注于应用的处理逻辑。
hadoop.ipc.RPC类中有两个重要的函数getServer和getProxy,getServer通过接口协议实现的实体来获取真正的server,getProxy获取远程访问的本地代理。
class RPC {
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory)
......
}
实例(远程执行Shell命令)
定义客户端与服务器的协议接口ExecProtocol
import java.lang.String;
import org.apache.hadoop.ipc.VersionedProtocol;
/* 这里不扩展VersionedProtocol 也是可以的 */
public interface ExecProtocol extends VersionedProtocol
{
public static final long versionID = 1L;
public String exec(String[] cmd);
}
ExecServer 实现了ExecProtocol,并通过RPC.getServer 获取RPCserver
import java.lang.String;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.io.BufferedInputStream;
import java.io.InputStream;
public class ExecServer implements ExecProtocol
{
private String host;
private int port;
private RPC.Server server;
public ExecServer (String host, int port) throws IOException
{
this.host = host;
this.port = port;
/* 获取server实体 */
this.server = RPC.getServer (this, host, port, new Configuration ());
}
public void run () throws IOException
{
/* 运行 */
this.server.start ();
}
/* 实现 VersionedProtocol 接口 */
public long getProtocolVersion (String s, long v)
{
return versionID;
}
/* 实现 ExecProtocol.exec接口 */
public String exec (String[] cmd)
{
try {
Process process = Runtime.getRuntime ().exec (cmd);
process.waitFor();
return loadStream (process.getInputStream ())
+ loadStream (process.getErrorStream ());
} catch (Exception e) {
return e.getMessage ();
}
}
/* 用于实现具体的exec接口 */
private static String loadStream (InputStream stream) throws IOException
{
if (stream == null) {
throw new java.io.IOException ("null stream");
}
stream = new java.io.BufferedInputStream (stream);
int avail = stream.available ();
byte[]data = new byte[avail];
int numRead = 0;
int pos = 0;
do {
if (pos + avail > data.length) {
byte[]newData = new byte[pos + avail];
System.arraycopy (data, 0, newData, 0, pos);
data = newData;
}
numRead = stream.read (data, pos, avail);
if (numRead >= 0) {
pos += numRead;
}
avail = stream.available ();
} while (avail > 0 && numRead >= 0);
return new String (data, 0, pos, "US-ASCII");
}
public static void main (String[]args) throws IOException
{
ExecServer s = new ExecServer ("localhost", 1600);
s.run ();
}
}
client 端通过RPC.getProxy获取本地代理
import java.lang.String;
import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.conf.Configuration;
import java.net.InetSocketAddress;
public class ExecClient
{
public static void main (String[] args) throws IOException
{
/* 服务器地址 */
InetSocketAddress addr = new InetSocketAddress ("localhost", 1600);
/* 通过RPC.getProxy 获取客户端代理类的实体 */
ExecProtocol proxy = (ExecProtocol) RPC.getProxy(ExecProtocol.class,
ExecProtocol.versionID, addr,
new Configuration ());
/* 输出 */
System.out.print (proxy.exec(args));
}
}
Hadoop RPC的实现
Hadoop.ipc.RPC的实现依赖于ipc.Server 和ipc.Client。Hadoop.ipc.RPC 将过程调用封装成具体的Invocation类,该类封装了调用的方法和参数,并用一个ObjectWritable类来定义返回的数据类型。ipc.Server 和 ipc.Client 负责网络数据的收发。
ipc.Server 的实现包含以下几个部分
Listener 监听网络端口,接受网络请求,然后交给Reader处理
Reader 非阻塞收取网络数据,并解析
Handler 调用抽象方法Server.call生成相应数据
Responder 非阻塞发送数据
Server.call() 由具体RPC.Server利用反射和代理实现,处理请求
这是一种典型的流水线结构,采用流水线的原因是Handler的操作可能导致阻塞,必须要有独立的线程或线程组处理Hander,线程之间的切换必不可少。
Listener独立为单独的线程大概是为了Reader之间负载的均衡,新加入的连接按照round robin在Reader之间进行负载均衡(实际上可能并不均衡,每条连接处理的请求以及持续的时间是不确定的)。同步时需要注意的一点是,Listener在向Reader的Selector中添加链接时,需要设置一个adding标记,并打断Selecter,这样做的目的是避免Reader的select() 操作和register()操作产生竞态。
分享到:
相关推荐
3. **序列化与反序列化**:Hadoop RPC使用Writables接口进行数据序列化,将对象转化为字节流在网络中传输,到达目的地后再反序列化为对象。这确保了数据在网络间的有效传输。 4. **安全认证**:为了保证通信安全,...
Hadoop RPC(Remote Procedure Call,远程过程调用)是 Hadoop 项目中的一个重要组件,用于实现分布式系统中的通信和数据交换。下面是对 Hadoop RPC 的详细分析。 RPCInterface Hadoop RPC 的核心是 RPCInterface...
Hadoop的RPC实现主要集中在`org.apache.hadoop.ipc`包下。`ProtobufRpcEngine`和`ReflectionUtils`是关键类,前者负责protobuf协议的序列化和反序列化,后者用于创建服务器实例。在`RPC.Server`中,可以看到对请求的...
1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507
2. **Hadoop RPC实现**: - Hadoop的RPC基于Java的Socket和Serialization实现,它定义了一套自有的协议,用于序列化和反序列化消息,以及处理请求和响应。 - Hadoop RPC提供了一种标准接口,使得客户端可以透明地...
Hadoop的RPC框架就是基于这个理念构建的,它实现了客户端与服务器端之间的高效、安全的通信机制。 要使用Hadoop的RPC框架,你需要完成以下步骤: 1. **定义协议**:创建一个接口,声明客户端和服务器端需要交互的...
### Hadoop架构设计与源码分析 #### Hadoop简介 Hadoop是一个能够对大量数据进行分布式处理的软件框架,由Apache基金会开发。它能够提供高可靠性、高扩展性以及高效的数据处理能力,被广泛应用于大数据处理领域。 ...
RPCServer实现了一种抽象的RPC服务,同时提供Call队列。RPCServer作为服务提供者由两个部分组成:接收Call调用和处理Call调用。接收Call调用负责接收来自RPCClient的调用请求,编码成Call对象后放入到Call队列中。这...
Hadoop作为一个广泛使用的分布式计算框架,其内部大量依赖于RPC机制来实现节点间的通信。 本文将详细介绍如何使用Hadoop的RPC机制创建一个简单的协议接口、通信服务端和通信客户端程序。这不仅有助于理解分布式系统...
Hadoop 使用了一种特殊的 RPC 实现来支持其分布式环境中的各种服务。 ##### 2.2 Hadoop RPC 架构 Hadoop 的 RPC 系统主要包括以下几个关键组成部分: - **Client Proxy**:客户端代理,负责向服务器发送请求并...
通过这个小测试,我们可以更直观地了解Hadoop中的RPC工作流程,学习如何在自己的应用中实现类似的分布式通信。此外,理解RPC对于开发Hadoop生态系统的其他组件,如MapReduce任务调度或自定义服务,都是非常重要的...
Java的RMI(Remote Method Invocation)是官方提供的RPC实现,但Hadoop并不直接使用RMI,而是采用了自定义的RPC框架。这个框架允许自定义协议,如protobuf或thrift,用于数据序列化和反序列化,以及定义服务接口和...
Hadoop的RPC机制中,服务器通过监视队列(如calllist和responselist)来实现高效的并发处理和响应。这种方法避免了handler线程因等待客户端确认而阻塞,提升了系统整体的吞吐量。 总结来说,Hadoop的RPC机制是一个...
在这个过程中,`hadoop-spring`可能是一个示例项目,包含了实现Hadoop与Spring结合的代码。它可能包含了Spring配置文件、Hadoop服务接口和实现、以及如何在Spring应用中使用这些服务的示例代码。 总结来说,Hadoop...
1. **网络通信**:Hadoop使用Socket通信进行节点间的通信,如RPC(Remote Procedure Call)协议,使得Hadoop组件能远程调用其他节点上的服务。 2. **安全**:Hadoop支持Kerberos认证,确保在分布式环境中的数据安全...
《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...
Hadoop中的RPC机制是基于Java的IPC(Inter-Process Communication)实现的,它在设计时考虑了性能、效率和可控制性,因此与RMI(Remote Method Invocation)等其他RPC方案有所不同。 1. **RPC原理**: Hadoop的RPC...
《Hadoop技术内幕:深入解析HADOOP COMMON和HDFS架构设计与实现原理》这本书是Hadoop技术领域的一本深入解析之作,它详尽地探讨了Hadoop的两大核心组件——HADOOP COMMON和HDFS(Hadoop Distributed File System)的...