`
jimmee
  • 浏览: 538771 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

rpc中怎么处理方法的调用的?

阅读更多

1. rpc请求中怎么分发请求方法

 

方法一: 直接使用反射, 通过方法名, 参数名等反射调用

实际使用中的示例, hadoop的实现, 具体可参见 http://jimmee.iteye.com/blog/1206598 例如:

 

org.apache.hadoop.ipc.RPC

 

 

 public Writable call(Class<?> protocol, Writable param, long receivedTime) 
    throws IOException {
      try {
        Invocation call = (Invocation)param;
        if (verbose) log("Call: " + call);

        Method method =
          protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
        method.setAccessible(true);

        long startTime = System.currentTimeMillis();
        Object value = method.invoke(instance, call.getParameters());

   .......
 }

 

 

 

 

方式二: 使用一个标记值来区分, 例如, 如果readInt()=1, 则表示method1, 若readInt()=2, 则表示method2

 

现实中的示例, 同样是hadoop, 例如:

 

org.apache.hadoop.hdfs.server.datanode.DataXceiver

 

 

 

  * Read/write data from/to the DataXceiveServer.
   */
  public void run() {
    DataInputStream in=null; 
    try {
      in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(s), 
                                  SMALL_BUFFER_SIZE));
      short version = in.readShort();
      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
        throw new IOException( "Version Mismatch" );
      }
      boolean local = s.getInetAddress().equals(s.getLocalAddress());
      byte op = in.readByte();
      // Make sure the xciver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
        throw new IOException("xceiverCount " + curXceiverCount
                              + " exceeds the limit of concurrent xcievers "
                              + dataXceiverServer.maxXceiverCount);
      }
      long startTime = DataNode.now();
      switch ( op ) {
      case DataTransferProtocol.OP_READ_BLOCK:
        readBlock( in );
        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.readsFromLocalClient.inc();
        else
          datanode.myMetrics.readsFromRemoteClient.inc();
        break;
      case DataTransferProtocol.OP_WRITE_BLOCK:
    .....
 }

 

 

 

 

方式三: thrift的实现方式, 就是一个接口方法对应一个类, 接口的所有参数对应一个类.

 

 

 

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor implements org.apache.thrift.TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
    public Processor(I iface) {
      super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      // 一个接口对应一个类
      processMap.put("query", new query());
      .....
    }


    
    // 接口的参数
    public static class query_args implements org.apache.thrift.TBase<query_args, query_args._Fields>, java.io.Serializable, Cloneable   {
    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("query_args");

 

 

 

 

执行过程:

(1) 线程池处理连接(题外话: 这里的实现允许连接进来, 实际上会排队, 线程池处理链接, 如果都是长连接, 则后面的会等很久很久)

 

 

 setServing(true);
    while (!stopped_) {
      int failureCount = 0;
      try {
        TTransport client = serverTransport_.accept();
        WorkerProcess wp = new WorkerProcess(client);
        executorService_.execute(wp);
      } catch (TTransportException ttx) {
        if (!stopped_) {
          ++failureCount;
          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
        }
      }
    }

 

 

(2) 读取消息:

 

 

 public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    // 方法名称
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
// 方法处理
    fn.process(msg.seqid, in, out, iface);
    return true;
  }


 public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    T args = getEmptyArgsInstance();
    try {
	// 读取参数值
      args.read(iprot);
    } catch (TProtocolException e) {
      iprot.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
      oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
      x.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
      return;
    }
    iprot.readMessageEnd();
    // 实际方法执行
    TBase result = getResult(iface, args);
    oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
    result.write(oprot);
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
  }


  protected query_result getResult(I iface, query_args args) throws org.apache.thrift.TException {
        query_result result = new query_result();
        try {
          result.success = iface.query(args.domainName, args.query, args.mode);
        } catch (OperationException excp) {
          result.excp = excp;
        }
        return result;
      }
 
  

 

 

  

 

分享到:
评论

相关推荐

    RPC(远程过程调用)

    RPC(Remote Procedure Call)是一种计算机通信协议,它允许程序在分布式环境中的一个系统上执行另一系统上的函数或方法,就像是本地调用一样。这个过程涉及到了客户端、服务器端和服务调用的封装,使得开发者无需...

    Java RPC调用示例

    5. **调用处理**:当客户端发起请求时,服务器接收到请求后,根据请求中的信息找到对应的服务实现,执行服务方法,并将结果返回给客户端。 在"RpcTest"这个示例中,可能包含以下组件: - `ServiceInterface`:定义...

    nodejs使用原生的dubbo协议打通了dubbo的rpc方法调用.

    标题中的“nodejs使用原生的dubbo协议打通了dubbo的rpc方法调用”意味着在Node.js环境中,开发者成功地实现了对Dubbo服务的RPC(远程过程调用)访问,利用了Dubbo协议的特性。Dubbo是阿里巴巴开源的一个高性能、轻量...

    java-rpc远程过程调用

    Java RPC(Remote Procedure Call)是一种让程序在不同的网络计算机之间调用对方方法的技术。这个例子是用纯Java编写的,不依赖任何JDK之外的第三方库,因此非常适合初学者理解和学习。RPC的核心思想是透明性,即...

    RPC.rar_C++ 远程调用_VC6.0 开发RPC_windows RPC_远程过程调用

    - 客户端开发:说明如何在客户端调用远程服务,以及如何处理调用结果。 - 错误处理和安全性:讨论RPC中的错误处理机制以及如何确保通信安全。 - 实例分析:通过实际的代码示例,演示整个RPC程序的开发过程。 掌握...

    phprpc调用示例

    一般常用的调用方法 htdocs http://127.0.0.1/ 为WEB根目录 PHPRPC演示 php版本调用 http://127.0.0.1/demo-phpclient.php 调用demo-phpserver.php发布的HelloWorld()方法 demo1 ...

    用RPC机制把本地调用转换成远程调用

    通过RPC机制将本地调用转换为远程调用的能力极大地扩展了程序的功能边界,使得应用程序可以在网络中的不同节点上分布运行。本文将详细介绍如何使用RPC机制实现这一目标,并基于给定的示例程序进行逐步讲解。 #### ...

    基于thrift的RPC调用实例

    RPC(远程过程调用)是一种在分布式系统中执行远程计算的方法,使得客户端可以在不理解远程服务器内部实现的情况下,像调用本地函数一样调用远程服务器上的服务。在本实例中,我们将关注一个基于Thrift的RPC调用实现...

    rpc远程调用库C语言实现

    4. **错误处理**:在RPC调用过程中,可能会出现各种错误,如网络连接问题、请求解析错误等。因此,良好的错误处理机制是必不可少的。 5. **并发处理**:为了提高效率,RPC库通常需要支持并发请求。这可以通过多线程...

    rpc远程调用使用说明&源码

    5. **调用处理(Invocation Handler)**:处理实际的RPC调用,包括序列化、网络传输和反序列化。 深入理解RPC源码有助于优化性能、排查问题,甚至自定义实现满足特定需求的RPC框架。在学习过程中,注意关注如何处理...

    java自制简单RPC调用例子

    在这个Java自制简单RPC调用例子中,我们看到项目结构包括三个模块:API、客户端(Client)和服务器端(Server),这些都是实现RPC的核心组成部分。 1. **API模块**: API模块定义了接口,这些接口是客户端和服务端...

    基于IOCP的远程函数调用(RPC)

    远程过程调用(RPC)是一种分布式计算技术,允许程序在不同的地址空间(如不同计算机或网络中的不同进程)间调用功能,就像是调用本地函数一样方便。IOCP(I/O完成端口)是Windows系统中一种高效的I/O模型,它在RPC...

    php rpc远程过程调用

    1. **客户端**:客户端拥有想要调用的远程方法的引用,通过RPC协议将请求发送到服务器。这个请求包含方法名、参数等信息。 2. **序列化**:客户端将请求数据转换为可传输的格式,例如JSON或PHP序列化字符串。 3. *...

    可跨平台RPC远程文件操作调用(附详细说明文档)

    RPC,即Remote Procedure Call,远程过程调用,是一种在分布式计算环境中实现程序间通信的技术。它允许一个程序在不理解底层网络协议的情况下,调用执行在另一台机器上的程序,就像是本地调用一样简单。本资源提供的...

    WebServices服务接口调用---基于rpc方式应用

    RPC是一种使客户端能够执行远程服务器上的方法的技术,它隐藏了底层的网络通信细节,使得调用过程如同本地调用一样简单。在WebServices场景下,SOAP(Simple Object Access Protocol)协议通常与RPC模型结合,提供了...

    基于socket实现的rpc调用demo

    RPC(Remote Procedure Call)是一种进程间通信技术,它允许程序在不同的网络环境下调用另一台计算机上的函数或方法,就像是直接调用本地函数一样简单。本示例是基于Socket实现的RPC调用Demo,让我们深入探讨一下这...

    rpc 远程调用

    RPC使得分布式系统中的组件能够像调用本地方法一样调用远程服务,极大地简化了分布式编程。 在Java中,RPC框架的实现通常涉及到以下关键概念和技术: 1. **接口与代理**:RPC的核心是通过接口来定义服务。客户端...

    rpc调用的一个demo

    在这个“rpc调用的一个demo”中,我们将会探讨RPC的基本原理,以及如何实现一个简单的RPC调用。 首先,RPC的核心概念是透明性:客户端在调用远程服务时,并不感知到服务的远程特性,仿佛它就是一个本地方法。RPC...

    python利用phprpc进行远程调用

    Python是一种广泛使用的编程语言,而Phprpc是一个跨平台的RPC(Remote Procedure Call)框架,它使得Python程序能够与PHP程序之间进行高效、便捷的远程方法调用。在这个主题中,我们将深入探讨如何利用Phprpc在...

Global site tag (gtag.js) - Google Analytics