`
wbj0110
  • 浏览: 1600970 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

RPC Server for Erlang, In Java

阅读更多

We are using Erlang to do some serious things, one of them is indeed part of a banking system. Erlang is a perfect language in concurrent and syntax (yes, I like its syntax), but lacks static typing (I hope new added -spec and -type attributes may be a bit helping), and, is not suitable for processing massive data (performance, memory etc). I tried parsing a 10M size XML file with xmerl, the lib for XML in OTP/Erlang, which causes terrible memory disk-swap and I can never get the parsed tree out.

It's really a need to get some massive data processed in other languages, for example, C, Java etc. That's why I tried to write RPC server for Erlang, in Java.

There is a jinterface lib with OTP/Erlang, which is for communication between Erlang and Java. And there are docs for how to get it to work. But, for a RPC server that is called from Erlang, there are still some tips for real world:

1. When you send back the result to caller, you need set the result as a tuple, with caller's tag Ref as the first element, and the destination should be the caller's Pid. It's something like:

OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[] {call.tag, tResult});
sConnection.send(call.to, msg); 

where, call.tag is a OtpErlangRef, and tResult can be any OtpErlangObject, call.to is a OtpErlangPid.

2. If you need to send back a massive data back to caller, the default buffer size of OtpErlangOutputStream is not good, I set it to 1024 * 1024 * 10

3. Since there may be a lot of concurrent callers call your RPC server, you have to consider the concurrent performance of your server, I choose using thread pool here.

The RPC server in Java has two class, RpcNode.java, and RpcMsg.java:

package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangTuple;

/**
 *
 * @author Caoyuan Deng
 */
public class RpcMsg {

    public OtpErlangAtom call;
    public OtpErlangAtom mod;
    public OtpErlangAtom fun;
    public OtpErlangList args;
    public OtpErlangPid user;
    public OtpErlangPid to;
    public OtpErlangRef tag;

    public RpcMsg(OtpErlangTuple from, OtpErlangTuple request) throws IllegalArgumentException {
        if (request.arity() != 5) {
            throw new IllegalArgumentException("Not a rpc call");
        }

        /* {call, Mod, Fun, Args, userPid} */
        if (request.elementAt(0) instanceof OtpErlangAtom && ((OtpErlangAtom) request.elementAt(0)).atomValue().equals("call") &&
                request.elementAt(1) instanceof OtpErlangAtom &&
                request.elementAt(2) instanceof OtpErlangAtom &&
                request.elementAt(3) instanceof OtpErlangList &&
                request.elementAt(4) instanceof OtpErlangPid &&
                from.elementAt(0) instanceof OtpErlangPid &&
                from.elementAt(1) instanceof OtpErlangRef) {

            call = (OtpErlangAtom) request.elementAt(0);
            mod = (OtpErlangAtom) request.elementAt(1);
            fun = (OtpErlangAtom) request.elementAt(2);
            args = (OtpErlangList) request.elementAt(3);
            user = (OtpErlangPid) request.elementAt(4);
            to = (OtpErlangPid) from.elementAt(0);
            tag = (OtpErlangRef) from.elementAt(1);

        } else {
            throw new IllegalArgumentException("Not a rpc call.");
        }
    }

    /* {'$gen_call', {To, Tag}, {call, Mod, Fun, Args, User}} */
    public static RpcMsg tryToResolveRcpCall(OtpErlangObject msg) {
        if (msg instanceof OtpErlangTuple) {
            OtpErlangTuple tMsg = (OtpErlangTuple) msg;
            if (tMsg.arity() == 3) {
                OtpErlangObject[] o = tMsg.elements();
                if (o[0] instanceof OtpErlangAtom && ((OtpErlangAtom) o[0]).atomValue().equals("$gen_call") &&
                        o[1] instanceof OtpErlangTuple && ((OtpErlangTuple) o[1]).arity() == 2 &&
                        o[2] instanceof OtpErlangTuple && ((OtpErlangTuple) o[2]).arity() == 5) {
                    OtpErlangTuple from = (OtpErlangTuple) o[1];
                    OtpErlangTuple request = (OtpErlangTuple) o[2];

                    try {
                        return new RpcMsg(from, request);
                    } catch (IllegalArgumentException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
        
        return null;
    }
}
package net.lightpole.rpcnode;

import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpSelf;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * Usage:
 *   $ erl -sname clientnode -setcookie mycookie
 *   (clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).
 * 
 * @author Caoyuan Deng
 */
public abstract class RpcNode {

    public static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    public static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    public static final OtpErlangAtom STOPED = new OtpErlangAtom("stoped");
    private static final int THREAD_POOL_SIZE = 100;
    private OtpSelf xSelf;
    private OtpConnection sConnection;
    private ExecutorService execService;

    public RpcNode(String xnodeName, String cookie) {
        this(xnodeName, cookie, THREAD_POOL_SIZE);
    }

    public RpcNode(String xnodeName, String cookie, int threadPoolSize) {
        execService = Executors.newFixedThreadPool(threadPoolSize);

        startServerConnection(xnodeName, cookie);
        loop();
    }

    private void startServerConnection(String xnodeName, String cookie) {
        try {
            xSelf = new OtpSelf(xnodeName, cookie);
            boolean registered = xSelf.publishPort();
            if (registered) {
                System.out.println(xSelf.node() + " is ready.");
                /**
                 * Accept an incoming connection from a remote node. A call to this
                 * method will block until an incoming connection is at least
                 * attempted.
                 */
                sConnection = xSelf.accept();
            } else {
                System.out.println("There should be an epmd running, start an epmd by running 'erl'.");
            }
        } catch (IOException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        } catch (OtpAuthException ex) {
            Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private void loop() {
        while (true) {
            try {
                final int[] flag = {0};

                final OtpErlangTuple msg = (OtpErlangTuple) sConnection.receive();

                Runnable task = new Runnable() {

                    public void run() {
                        RpcMsg call = RpcMsg.tryToResolveRcpCall(msg);

                        if (call != null) {
                            long t0 = System.currentTimeMillis();

                            flag[0] = processRpcCall(call);

                            System.out.println("Rpc time: " + (System.currentTimeMillis() - t0) / 1000.0);
                        } else {
                            try {
                                sConnection.send(sConnection.peer().node(), new OtpErlangString("unknown request"));
                            } catch (IOException ex) {
                                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }
                    }
                };

                execService.execute(task);

                if (flag[0] == -1) {
                    System.out.println("Exited");
                    break;
                }

            } catch (OtpErlangExit ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            } catch (OtpAuthException ex) {
                Logger.getLogger(RpcNode.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    protected void sendRpcResult(RpcMsg call, OtpErlangAtom head, OtpErlangObject result) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, result});

        // Should specify call.tag here
        OtpErlangTuple msg = new OtpErlangTuple(new OtpErlangObject[]{call.tag, tResult});
        // Should specify call.to here
        sConnection.send(call.to, msg, 1024 * 1024 * 10); 
    }

    public abstract int processRpcCall(RpcMsg call);
    

    // ------ helper
    public static String getShortLocalHost() {
        return getLocalHost(false);
    }

    public static String getLongLocalHost() {
        return getLocalHost(true);
    }

    private static String getLocalHost(boolean longName) {
        String localHost;
        try {
            localHost = InetAddress.getLocalHost().getHostName();
            if (!longName) {
                /* Make sure it's a short name, i.e. strip of everything after first '.' */
                int dot = localHost.indexOf(".");
                if (dot != -1) {
                    localHost = localHost.substring(0, dot);
                }
            }
        } catch (UnknownHostException e) {
            localHost = "localhost";
        }

        return localHost;
    }
}

As you can see, the RpcNode is an abstract class, by implement int processRpcCall(RpcMsg call), you can get your what ever wanted features. For example:

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package net.lightpole.xmlnode;

import basexnode.Main;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangString;
import java.io.IOException;
import net.lightpole.rpcnode.RpcMsg;
import net.lightpole.rpcnode.RpcNode;

/**
 *
 * @author dcaoyuan
 */
public class MyNode extends RpcNode {

    public MyNode(String xnodeName, String cookie, int threadPoolSize) {
        super(xnodeName, cookie, threadPoolSize);
    }

    @Override
    public int processRpcCall(RpcMsg call) {
        final String modStr = call.mod.atomValue();
        final String funStr = call.fun.atomValue();
        final OtpErlangList args = call.args;

        try {
            OtpErlangAtom head = ERROR;
            OtpErlangObject result = null;

            if (modStr.equals("xnode") && funStr.equals("stop")) {
                head = OK;
                sendRpcResult(call, head, STOPED);
                return -1;
            }

            if (modStr.equals("System") && funStr.equals("currentTimeMillis")) {
                head = OK;
                long t = System.currentTimeMillis();
                result = new OtpErlangLong(t);
            } else {
                result = new OtpErlangString("{undef,{" + modStr + "," + funStr + "}}");
            }

            if (result == null) {
                result = new OtpErlangAtom("undefined");
            }

            sendRpcResult(call, head, result);
        } catch (IOException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
        }

        return 0;
    }
}

I tested MyNode by:

$ erl -sname clientnode -setcookie mycookie
...
(clientnode@cmac)> rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

And you can try to test its concurrent performance by:

%% $ erl -sname clientnode -setcookie mycookie
%% > xnode_test:test(10000)

-module(xnode_test).

-export([test/1]).

test(ProcN) ->
    Workers = [spawn_worker(self(), fun rpc_parse/1, {})        
     	       || I <- lists:seq(0, ProcN - 1)],
    Results = [wait_result(Worker) || Worker <- Workers].

rpc_parse({}) ->
    rpc:call(xnodename@cmac, 'System', currentTimeMillis, []).

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.

I spawned 10000 calls to it, and it run smoothly.

I'm also considering to write a more general-purpose RPC server in Java, which can dynamically call any existed methods of Java class.

分享到:
评论

相关推荐

    dfactor:Java中的轻量级服务器框架,一个用java编写的多核服务端开发框架

    去中心化自动组网,充分利用内结点互相发现相互通信开发中js脚本开发支持因素dfactor是一个基于actor模型的消息处理框架,充分利用多核处理器,平衡业务负载,可参考erlang dfactor使用java编写,天生多平台支持,...

    RabbitMQ+Zookeeper+Dubbo+Nginx+Mysql+Redis搭建.txt

    **Dubbo** 是一个高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用、智能容错和负载均衡、服务自动注册与发现。 1. **搭建Dubbo环境** - 安装Java环境。 - 下载Dubbo的示例项目...

    thrift 小试牛刀

    同样地,对于Java示例,通过`ant`工具编译生成的Java代码,之后运行`./JavaServer`和`./JavaClient simple`启动服务端和客户端。 **知识点五:Thrift的应用场景与扩展资源** Thrift不仅适用于构建分布式系统和...

    张琨:教育社交平台的web架构分享

    服务拓扑设计上,采用了一系列开源技术和产品,例如nginx作为高性能Web服务器和反向代理服务器,以及Resin作为Java应用服务器。数据库方面,使用了MySQL和MongoDB,它们分别负责关系型数据存储和NoSQL数据存储。在...

    thrift入门教程+代码

    例如,对于Java,Thrift会生成`Iface`、`Processor`、`Client`和`Server`接口及类,开发者只需实现`Iface`接口。 5. 客户端与服务器通信:客户端使用生成的客户端代码来创建服务代理,通过代理调用服务方法。服务器...

    thrift基础文档

    支持的语言包括C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk等。 3. **传输层**:Thrift提供多种传输协议,如TCP、HTTP、HTTP/2等,可以根据实际需求选择合适的通信方式。 4. **...

    RabbitMQ超级详解(附带常见问题及答案、安装流程、基础知识、高级特性、实战案列)-用这一篇就够了

    RabbitMQ是一款广泛使用的开源消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议,适用于多种编程语言,如Java、Python、Ruby、JavaScript等。它提供了一种可靠且可扩展的方式来在分布式系统中传递...

    《RabbitMQ实战指南》知识点总结-RabbitMQ-Demo.zip

    - Erlang是RabbitMQ的依赖,需先安装Erlang环境。 - 使用`rabbitmq-plugins enable rabbitmq_management`命令开启管理插件,便于通过Web界面管理RabbitMQ。 3. **AMQP协议** - AMQP定义了消息的结构、交换机类型...

    springCloud

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展...

Global site tag (gtag.js) - Google Analytics