有了Client,有了Server,那整个过程怎么运行起来?
先说一下基本原理:
- 1. 首先客户端和服务器端之间要有一个协议,这里的协议就是以java接口类的方式暴露出来的
- 2. 虽然Client类和Server类之间已经具有通信的能力,也有了协议,那么一个真正的客户端要调用服务器端rpc调用的实现,只需要解决参数及具体的调用实现两个问题即可
- 3. 客户端要做的,就是要将参数(这个一般称为存根)通过网络传递到服务器端。这个自然而然想到使用代理模式,因为Client已经具备网络通信的能力,只要通过代理,实现获取参数进行传输即可,为什么不在Client这里实现参数的获取,如果这样的话,就违反了单一职责的原则,且扩展性不行,总不能一个客户端的调用实现一个特定的Client类吧。因此,将Client的功能单一独立出来,只负责将参数通过网络传递到服务器端
- 4. 服务器要做的工作,只需要进行调用的真正的实现即可,当然了, 最后需要能够返回正确的结果。
上面说的这些,都全部在hadoop的这个RPC里进行了实现。
客户端的主要代理实现方法如下:
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol,conf).getProxy(protocol,
clientVersion, addr, ticket, conf, factory, rpcTimeout);
}
其中是调用RpcEngine的下面这个接口方法来进行实现的:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException
对应的,可以查看一个具体实现的代码,WritableRpcEngine类的实现:
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout)
throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
factory, rpcTimeout));
return new ProtocolProxy<T>(protocol, proxy, true);
}
真正的代理处理在InVoker类里实现(关于JDK的动态代理,可参看
http://jimmee.iteye.com/admin/blogs/776820)
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
ObjectWritable value = (ObjectWritable)
// 这里取得要调用的方法,参数列表,之后通过Client对象传递给服务器端
client.call(new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
服务器端真正的实现,也在RpcEngine的一个具体实现里:
public Writable call(Class<?> protocol, Writable param, long receivedTime)
throws IOException {
….
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
Method method = protocol.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
// Verify rpc version
….
//Verify protocol version.
……
long startTime = System.currentTimeMillis();
// 真正的调用
Object value = method.invoke(instance, call.getParameters());
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime);
}
rpcMetrics.addRpcQueueTime(qTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
processingTime);
if (verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
…..
}
一个简单的例子:
客户端和服务器端的协议及实现:
package cn.edu.jimmee;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* rpc的协议接口
* @author jimmee
*/
public interface RpcProtocol extends VersionedProtocol {
public BooleanWritable printMsg(IntWritable id, Text msg);
}
package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
* rpc的协议实现接口
* @author jimmee
*/
public class RpcProtocolImpl implements RpcProtocol {
@Override
public BooleanWritable printMsg(IntWritable id, Text msg) {
System.out.println("id=" + id.get() + ", msg=" + msg.toString());
if (Math.random() < 0.5) {
return new BooleanWritable(true);
} else {
return new BooleanWritable(false);
}
}
@Override
public long getProtocolVersion(String protocol,
long clientVersion)
throws IOException {
return 0;
}
}
服务器代码:
package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/**
* rpc的server
* @author jimmee
*/
public class RpcServer {
public static void main(String[] args) throws IOException {
RpcProtocol instance = new RpcProtocolImpl();
Configuration conf = new Configuration();
Server server = RPC.getServer(instance, "127.0.0.1", 7777, conf);
server.start();
}
}
客户端的代码:
package cn.edu.jimmee;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
/**
* rpc的client端的实现
* @author jimmee
*/
public class RpcClient {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
RpcProtocol rpcClientImpl = (RpcProtocol) RPC.getProxy(RpcProtocol.class, 0, new InetSocketAddress("127.0.0.1", 7777), conf);
for (int i = 0; i < 10; i++) {
System.out.println(rpcClientImpl.printMsg(new IntWritable(i), new Text("hello" + i)));
}
}
}
分享到:
相关推荐
4. 关闭连接:RPC 服务端关闭与客户端的连接。 Hadoop RPC 的优点 Hadoop RPC 有很多优点,包括: * 高性能:Hadoop RPC 使用异步非阻塞的方式来处理客户端的调用请求,从而提高了系统的性能。 * 可扩展性:...
4. **安全认证**:为了保证通信安全,Hadoop RPC支持多种安全机制,如Kerberos,可以防止未授权的访问。 5. **连接管理**:Hadoop RPC有连接池管理,对客户端的请求进行复用和管理,提高效率并降低资源消耗。 ### ...
其中,Hadoop 的远程过程调用(RPC)机制是其核心组件之一,用于实现不同节点之间的高效通信。本文将详细介绍 Hadoop RPC 的基本概念、工作原理以及其实现细节。 #### 二、Hadoop RPC 基本介绍 ##### 2.1 RPC 概念...
在Java中模拟Hadoop的RPC通讯,主要是为了理解其连接和心跳机制,这是保证Hadoop集群稳定运行的关键部分。 RPC的核心思想是透明性,即客户端可以像调用本地方法一样调用远程服务,由RPC框架负责数据的序列化、网络...
4. **网络**: 由于标签提及“网络”,我们可以推测QuLab_RPC是一个在网络环境中运行的服务,可能处理跨网络的数据交换和通信问题。 5. **分布式**: 分布式标签暗示QuLab_RPC设计用于分布式系统,可能具有分布式处理...
在Hadoop中,远程过程调用(RPC)是核心组件之一,它使得节点间的通信变得高效且可靠。本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在...
Zookeeper是Apache Hadoop的一个子项目,它提供了一个分布式的,开放源码的协调服务,用于解决分布式应用中的命名、配置管理、组服务、分布式同步等问题。虽然在transmission_rpc的描述中没有直接提到Zookeeper,但...
在IT行业中,分布式计算系统的重要性日益凸显,而Hadoop作为其中的佼佼者,其核心组件之一就是远程过程调用(RPC,Remote Procedure Call)。RPC允许一个程序在某个网络中的计算机上执行另一个计算机上的程序,而...
4. **Hadoop RPC核心类**: - `ProtobufRpcEngine`:Hadoop使用Google的Protocol Buffers进行序列化和反序列化,此引擎负责处理这些操作。 - `RPC.Server`:服务器端的实现,负责接收和处理客户端的请求。 - `RPC...
### Hadoop的RPC通信程序详解 #### 一、引言 在分布式系统中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许一台计算机上的程序调用另一台计算机上的子程序,而无需程序员了解底层网络...
在分布式计算领域,Hadoop RPC(Remote Procedure Call)框架是一个至关重要的组件,它允许不同的进程之间进行通信,尤其是在大规模数据处理的场景下。Hadoop RPC是Hadoop生态系统中的基础服务,使得不同模块如HDFS...
1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507
本文将深入探讨Hadoop中的远程过程调用(RPC)协议,这是Hadoop组件间通信的关键技术,也是理解Hadoop生态系统运作的重要一环。 RPC(Remote Procedure Call)允许一个程序在不关心远程服务器细节的情况下,调用...
Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...
【大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第11期_HBase简介及安装_V1.0 共21页.pdf】这篇文档主要介绍了HBase这一大数据处理的重要组件,以及其在Hadoop生态系统中的角色。HBase是一个基于列族的...
HBase是一个分布式列式存储系统,常用于大数据处理,它构建于Hadoop之上,并依赖Zookeeper进行分布式协调。 1、从配置角度优化 1.1 修改Linux配置 为了应对大数据处理中可能遇到的高并发情况,需要调整Linux系统...
**HBase** 是一个构建在 **Hadoop** 分布式文件系统 (HDFS) 之上的分布式、可扩展的大规模数据存储系统。它是针对大数据量场景设计的,特别适用于需要实时读写访问大量稀疏数据的应用场景。HBase 的设计灵感来源于 ...
### RPC架构概述 RPC(Remote Procedure Call Protocol,远程过程调用协议)是一种通过网络请求服务的方式,它允许程序调用另一个地址空间中的函数或方法,就像调用本地进程中的函数或方法一样简单。RPC框架主要...
在Hadoop中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许分布式系统中的组件之间进行高效且便捷的交互。Hadoop的RPC机制基于Java的客户端-服务器模型,允许客户端调用服务器上的方法,...