`
jimmee
  • 浏览: 538818 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hadoop_rpc之RPC(4)

 
阅读更多
有了Client,有了Server,那整个过程怎么运行起来?
先说一下基本原理:
  • 1. 首先客户端和服务器端之间要有一个协议,这里的协议就是以java接口类的方式暴露出来的
  • 2. 虽然Client类和Server类之间已经具有通信的能力,也有了协议,那么一个真正的客户端要调用服务器端rpc调用的实现,只需要解决参数及具体的调用实现两个问题即可
  • 3. 客户端要做的,就是要将参数(这个一般称为存根)通过网络传递到服务器端。这个自然而然想到使用代理模式,因为Client已经具备网络通信的能力,只要通过代理,实现获取参数进行传输即可,为什么不在Client这里实现参数的获取,如果这样的话,就违反了单一职责的原则,且扩展性不行,总不能一个客户端的调用实现一个特定的Client类吧。因此,将Client的功能单一独立出来,只负责将参数通过网络传递到服务器端
  • 4. 服务器要做的工作,只需要进行调用的真正的实现即可,当然了, 最后需要能够返回正确的结果。


上面说的这些,都全部在hadoop的这个RPC里进行了实现。
客户端的主要代理实现方法如下:
 
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout) throws IOException {    
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    return getProtocolEngine(protocol,conf).getProxy(protocol,
        clientVersion, addr, ticket, conf, factory, rpcTimeout);
  }


其中是调用RpcEngine的下面这个接口方法来进行实现的:
 
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout)
    throws IOException

对应的,可以查看一个具体实现的代码,WritableRpcEngine类的实现:
 public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout)
    throws IOException {    

    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout));
    return new ProtocolProxy<T>(protocol, proxy, true);
}


真正的代理处理在InVoker类里实现(关于JDK的动态代理,可参看http://jimmee.iteye.com/admin/blogs/776820
  
public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      long startTime = 0;
      if (LOG.isDebugEnabled()) {
        startTime = System.currentTimeMillis();
      }
		
      ObjectWritable value = (ObjectWritable)
      // 这里取得要调用的方法,参数列表,之后通过Client对象传递给服务器端
client.call(new Invocation(method, args), remoteId);
      if (LOG.isDebugEnabled()) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }


服务器端真正的实现,也在RpcEngine的一个具体实现里:
public Writable call(Class<?> protocol, Writable param, long receivedTime) 
    throws IOException {
		….
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method = protocol.getMethod(call.getMethodName(),
                                           call.getParameterClasses());
        method.setAccessible(true);

        // Verify rpc version
        ….
        
        //Verify protocol version.
       ……

        long startTime = System.currentTimeMillis();
		  // 真正的调用
        Object value = method.invoke(instance, call.getParameters());
        int processingTime = (int) (System.currentTimeMillis() - startTime);
        int qTime = (int) (startTime-receivedTime);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Served: " + call.getMethodName() +
                    " queueTime= " + qTime +
                    " procesingTime= " + processingTime);
        }
        rpcMetrics.addRpcQueueTime(qTime);
        rpcMetrics.addRpcProcessingTime(processingTime);
        rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
                                             processingTime);
        if (verbose) log("Return: "+value);

        return new ObjectWritable(method.getReturnType(), value);
		…..     
  }

一个简单的例子:
客户端和服务器端的协议及实现:
package cn.edu.jimmee;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
 * rpc的协议接口
 * @author jimmee
 */
public interface RpcProtocol extends VersionedProtocol {
	public BooleanWritable printMsg(IntWritable id, Text msg);
}

package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
 * rpc的协议实现接口
 * @author jimmee
 */
public class RpcProtocolImpl implements RpcProtocol {
	@Override
	public BooleanWritable printMsg(IntWritable id, Text msg) {
		System.out.println("id=" + id.get() + ", msg=" + msg.toString());	
		if (Math.random() < 0.5) {
			return new BooleanWritable(true);
		} else {
			return new BooleanWritable(false);
		}
	}

	@Override
	public long getProtocolVersion(String protocol,
            long clientVersion)
			throws IOException {
		return 0;
	}
}


服务器代码:
package cn.edu.jimmee;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/**
 * rpc的server
 * @author jimmee
 */
public class RpcServer {
	public static void main(String[] args) throws IOException {
		RpcProtocol instance = new RpcProtocolImpl();
		Configuration conf = new Configuration();
		Server server = RPC.getServer(instance, "127.0.0.1", 7777, conf);
		server.start();
	}
}

客户端的代码:
package cn.edu.jimmee;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
/**
 * rpc的client端的实现
 * @author jimmee
 */
public class RpcClient {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		RpcProtocol rpcClientImpl = (RpcProtocol) RPC.getProxy(RpcProtocol.class, 0, new InetSocketAddress("127.0.0.1", 7777), conf);
		for (int i = 0; i < 10; i++) {
			System.out.println(rpcClientImpl.printMsg(new IntWritable(i), new Text("hello" + i)));
		}
	}
}

分享到:
评论

相关推荐

    Hadoop_RPC详细分析.doc

    4. 关闭连接:RPC 服务端关闭与客户端的连接。 Hadoop RPC 的优点 Hadoop RPC 有很多优点,包括: * 高性能:Hadoop RPC 使用异步非阻塞的方式来处理客户端的调用请求,从而提高了系统的性能。 * 可扩展性:...

    hadoop rpc实例

    4. **安全认证**:为了保证通信安全,Hadoop RPC支持多种安全机制,如Kerberos,可以防止未授权的访问。 5. **连接管理**:Hadoop RPC有连接池管理,对客户端的请求进行复用和管理,提高效率并降低资源消耗。 ### ...

    学习hadoop_源代码,RPC_部分

    其中,Hadoop 的远程过程调用(RPC)机制是其核心组件之一,用于实现不同节点之间的高效通信。本文将详细介绍 Hadoop RPC 的基本概念、工作原理以及其实现细节。 #### 二、Hadoop RPC 基本介绍 ##### 2.1 RPC 概念...

    java_RPC_hadoop.zip

    在Java中模拟Hadoop的RPC通讯,主要是为了理解其连接和心跳机制,这是保证Hadoop集群稳定运行的关键部分。 RPC的核心思想是透明性,即客户端可以像调用本地方法一样调用远程服务,由RPC框架负责数据的序列化、网络...

    PyPI 官网下载 | QuLab_RPC-1.3.3.tar.gz

    4. **网络**: 由于标签提及“网络”,我们可以推测QuLab_RPC是一个在网络环境中运行的服务,可能处理跨网络的数据交换和通信问题。 5. **分布式**: 分布式标签暗示QuLab_RPC设计用于分布式系统,可能具有分布式处理...

    Hadoop RPC机制分析

    在Hadoop中,远程过程调用(RPC)是核心组件之一,它使得节点间的通信变得高效且可靠。本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在...

    PyPI 官网下载 | transmission_rpc-0.0.8.tar.gz

    Zookeeper是Apache Hadoop的一个子项目,它提供了一个分布式的,开放源码的协调服务,用于解决分布式应用中的命名、配置管理、组服务、分布式同步等问题。虽然在transmission_rpc的描述中没有直接提到Zookeeper,但...

    hadoop中RPC协议的小测试例子(吴超老师)

    在IT行业中,分布式计算系统的重要性日益凸显,而Hadoop作为其中的佼佼者,其核心组件之一就是远程过程调用(RPC,Remote Procedure Call)。RPC允许一个程序在某个网络中的计算机上执行另一个计算机上的程序,而...

    java操作hadoop的RPC,源码

    4. **Hadoop RPC核心类**: - `ProtobufRpcEngine`:Hadoop使用Google的Protocol Buffers进行序列化和反序列化,此引擎负责处理这些操作。 - `RPC.Server`:服务器端的实现,负责接收和处理客户端的请求。 - `RPC...

    Hadoop的RPC通信程序

    ### Hadoop的RPC通信程序详解 #### 一、引言 在分布式系统中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许一台计算机上的程序调用另一台计算机上的子程序,而无需程序员了解底层网络...

    Hadoop自己的Rpc框架使用Demo

    在分布式计算领域,Hadoop RPC(Remote Procedure Call)框架是一个至关重要的组件,它允许不同的进程之间进行通信,尤其是在大规模数据处理的场景下。Hadoop RPC是Hadoop生态系统中的基础服务,使得不同模块如HDFS...

    Hadoop Java接口+RPC代码实现

    1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507

    Hadoop_RPCDemo:Hadoop原始解析之RPC协议

    本文将深入探讨Hadoop中的远程过程调用(RPC)协议,这是Hadoop组件间通信的关键技术,也是理解Hadoop生态系统运作的重要一环。 RPC(Remote Procedure Call)允许一个程序在不关心远程服务器细节的情况下,调用...

    Hadoop rpc源码

    Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第11期_HBase简介及安装_V1.0 共21页.pdf

    【大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第11期_HBase简介及安装_V1.0 共21页.pdf】这篇文档主要介绍了HBase这一大数据处理的重要组件,以及其在Hadoop生态系统中的角色。HBase是一个基于列族的...

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第12期副刊_HBase性能优化_V1.0 共26页.pdf

    HBase是一个分布式列式存储系统,常用于大数据处理,它构建于Hadoop之上,并依赖Zookeeper进行分布式协调。 1、从配置角度优化 1.1 修改Linux配置 为了应对大数据处理中可能遇到的高并发情况,需要调整Linux系统...

    细细品味Hadoop_Hadoop集群(第11期)_HBase简介及安装.pdf

    **HBase** 是一个构建在 **Hadoop** 分布式文件系统 (HDFS) 之上的分布式、可扩展的大规模数据存储系统。它是针对大数据量场景设计的,特别适用于需要实时读写访问大量稀疏数据的应用场景。HBase 的设计灵感来源于 ...

    rpc架构与hadoop分享

    ### RPC架构概述 RPC(Remote Procedure Call Protocol,远程过程调用协议)是一种通过网络请求服务的方式,它允许程序调用另一个地址空间中的函数或方法,就像调用本地进程中的函数或方法一样简单。RPC框架主要...

    Hadoop里的RPC机制过程

    在Hadoop中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许分布式系统中的组件之间进行高效且便捷的交互。Hadoop的RPC机制基于Java的客户端-服务器模型,允许客户端调用服务器上的方法,...

Global site tag (gtag.js) - Google Analytics