java ipc实例,仿照hadoop ipc写的实例
1.用接口规定ipc协议的方法
2.client端用动态代理作调用远程ipc接口方法
3.server端用反射,执行ipc接口方法,并返回给client端接口方法返回值
hadoop ipc的另一个特点是server端用三个角色,Listener,Handler,Responser。server聚合这三个角色
Listener:nio socket获取请求CALL对象,放入队列中
Handler:从队列中获取CALL对象,执行ipc接口方法
Responser:被Handler调用,用nio socket返回接口方法返回值
简单实例:(实例来自github)
1.定义协议
public interface Echo { public String who() throws IOException;; public void from(String name) throws IOException;; }
2.定义代理
/*Invocation封装方法名和参数*/ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } //调用Client call方法 Invocation value = client.call(new Invocation(iface, method, args), remoteId);//调用远程的当前方法,阻塞直到server返回值 if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + "() " + callTime); } return value.getResult(); }
public Invocation call(Invocation invoked, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(invoked); //将传入的数据封装成call对象 Serializable接口 //已经向服务器端 RPCHeader ConnectionHeader验证 Connection connection = getConnection(remoteId, call); //获得一个连接 connection.sendParam(call); // 向服务端发送Call对象 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); //等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { //因中断异常而终止,设置标志interrupted为true Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { /* local exception use the connection because it will * reflect an ip change, unlike the remoteId */ throw wrapException(connection.getRemoteAddress(), call.error); } } else { return call.value; //返回结果数据 } } }
3.server端反射ipc接口方法
private class Handler extends Thread { public Handler(int instanceNumber) { this.setDaemon(true); this.setName("IPC Server handler " + instanceNumber + " on " + port); } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); //创建大小为10240个字节的响应缓冲区 ByteArrayOutputStream buf = new ByteArrayOutputStream( INITIAL_RESP_BUF_SIZE); while (running) { try { /** pop the queue; maybe blocked here */ //获取一个远程调用请求 Server.Call final Call call = callQueue.take(); //弹出call,可能会阻塞 if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + call.connection); String errorClass = null; String error = null; Serializable value = null; /** process the current call. */ //处理当前Call CurCall.set(call); try { //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中 value = call(call.connection.protocol, call.param, call.timestamp); //Invocation对象 } catch (Throwable e) { LOG.info(getName() + ", call " + call + ": error: " + e, e); errorClass = e.getClass().getName(); error = e.getMessage(); } CurCall.set(null); synchronized (call.connection.responseQueue) { /** * setupResponse() needs to be sync'ed together with * responder.doResponse() since setupResponse may use * SASL to encrypt response data and SASL enforces its * own message ordering. */ //将返回结果序列化到Call的成员变量response中 setupResponse( buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); /* Discard the large buf and reset it back to smaller size to freeup heap*/ //丢弃大的buf 重设到更小的容量 释放内存 if (buf.size() > maxRespSize) { LOG.warn("Large response size " + buf.size() + " for call " + call.toString()); buf = new ByteArrayOutputStream( INITIAL_RESP_BUF_SIZE); } //给客户端响应请求 responder.doRespond(call);//Responder在Server构造器初始化 } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + e); } } catch (Exception e) { LOG.info(getName() + " caught: " + e); } } LOG.info(getName() + ": exiting"); } }
public Serializable call(Class<?> iface, Serializable param, long receivedTime) throws IOException { try { Invocation call = (Invocation) param; //调用参数 Invocationd对象包含方法名称 形式参数列表和实际参数列表 if (verbose) log("Call: " + call); //从实例缓存中按照接口寻找实例对象 Object instance = INSTANCE_CACHE.get(iface); if (instance == null) throw new IOException("interface `" + iface + "` not inscribe."); //通过Class对象获取Method对象 Method method = iface.getMethod(call.getMethodName(), call.getParameterClasses()); //取消Java语言访问权限检查 method.setAccessible(true); long startTime = System.currentTimeMillis(); //调用Method对象的invoke方法 Object value = method.invoke(instance, call.getParameters()); int processingTime = (int) (System.currentTimeMillis() - startTime); int qTime = (int) (startTime - receivedTime); if (LOG.isDebugEnabled()) { LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime + " procesingTime= " + processingTime); } if (verbose) log("Return: " + value); call.setResult(value); //向Invocation对象设置结果 return call; } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException) target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; } } catch (Throwable e) { if (!(e instanceof IOException)) { LOG.error("Unexpected throwable object ", e); } IOException ioe = new IOException(e.toString()); ioe.setStackTrace(e.getStackTrace()); throw ioe; } } }
相关推荐
Java IPC(Inter-Process Communication,进程间通信)是Java编程中一种重要的技术,它允许不同进程之间共享数据和资源,从而实现多进程间的协作。在Java中,IPC可以通过多种方式实现,包括管道(Pipes)、套接字...
本篇将深入探讨Android IPC通信的实例。 首先,Android提供了多种IPC方式,包括Binder、AIDL(Android Interface Definition Language)、Messenger、Content Provider以及Broadcast Receiver。其中,Binder是...
这个"java网络编程实例.rar"的压缩包显然包含了帮助开发者理解和实践Java网络编程的资源。让我们详细探讨一下Java网络编程的核心概念和相关知识点。 首先,Java网络编程主要基于Java的Socket编程模型,它允许程序...
6. **Pipe**:在本地进程间通信(IPC)中使用,提供单向数据流。 7. **CharSet和CharsetDecoder/Encoder**:用于字符编码和解码,支持多种字符集转换。 在`NIOClient.java`文件中,可能会创建SocketChannel连接...
在这个特定的实例中,我们关注的是通过管道(Pipe)实现的IPC,这是Java提供的一种简单而有效的通信机制。管道主要用于连接两个进程,使得一个进程的输出可以作为另一个进程的输入,从而实现数据的传递。 在Java中...
Java Socket编程是网络编程的基础,它提供了进程间通信(IPC)的能力,特别是在互联网环境中,让两个运行在不同设备上的应用程序可以相互通信。本实例压缩包包含了一系列的Java Socket编程示例,旨在帮助开发者深入...
在这个实例中,我们将探讨如何使用Java和Visual Basic(VB)来实现Socket通信。下面我们将详细讲解相关知识点。 1. **Socket基本概念** - Socket,也称为套接字,是网络通信中的一个抽象概念,它是进程间通信(IPC...
Java Socket多线程文件传输实例项目是一个典型的网络编程应用场景,主要涉及了Socket编程、多线程处理以及文件I/O操作等关键知识点。在这个项目中,开发者利用Java的Socket API实现了一个能够支持多个客户端同时进行...
在Java编程语言中,Socket是网络通信的核心组件,它提供了进程间通信(IPC)和互联网上的应用程序之间的双向通信。这个实例教程将引导你通过创建一个简单的Java程序,使用Socket类来连接到指定的主机。本教程的目标...
Socket编程是Java网络编程的重要组成部分,它提供了进程间通信(IPC)的能力,使得运行在不同设备上的应用程序可以通过网络进行通信。在这个"Socket编程实例(java)"中,我们将深入探讨Java Socket编程的基本概念、...
在操作系统中,进程是程序执行时的一个实例,拥有独立的内存空间。当多个进程运行时,它们各自的工作不互相干扰,但有时需要进行通信以完成共同的任务,这就是IPC的目的。 IPC的主要目标包括数据共享、同步和互斥。...
在Java中,通过创建`DatagramSocket`实例并调用`send()`方法发送`DatagramPacket`,同时在接收端创建`DatagramSocket`监听特定端口,并调用`receive()`方法接收数据包。 接下来是UDP组播,这是一种一对多的通信方式...
AIDL文件定义了方法的输入和输出参数,编译后会自动生成相应的Java代码,帮助开发者实现跨进程通信。 六、Binder实例 在提供的“android通信机制binder实例”中,可能包含以下几个部分: 1. 服务端接口定义(AIDL...
本文将深入探讨Android IPC机制,并通过具体的ContentProvider和Socket通信实例进行详细解析。 首先,理解Android IPC的重要性是至关重要的。由于Android系统采用严格的权限隔离,每个应用运行在自己的进程中,这...
接着,通过ApplicationContext的bindService()方法绑定Service,并在ServiceConnection的onServiceConnected()方法中接收IBinder实例。这个IBinder是Service的BBinder在Binder内核中的引用,确保了服务的唯一性。...
AIDL编译器会根据该文件生成相应的Java代码,使得进程间能够互相调用接口中的方法。 **2. AIDL的作用:** - **简化开发流程:** 通过AIDL,开发者无需手动编写大量的通信代码,减少了出错的可能性。 - **保证类型...
3. **建立连接**:客户端创建Socket实例,通过`connect()`方法连接到服务器的IP地址和端口。服务器端通过`ServerSocket`监听指定端口,当有客户端连接请求时,`accept()`方法会返回一个新的Socket对象。 4. **数据...
"大华视频监控java示例demo"是用于演示如何利用大华提供的SDK进行视频监控功能开发的实例代码。这些示例通常涵盖了设备连接、视频流获取、图像处理、报警事件处理等功能,让开发者快速理解如何与大华的监控设备进行...
在Java中,使用`java.net.Socket`类来创建一个Socket实例,以建立到指定服务器的连接。客户端通常需要执行以下步骤: 1. 创建`Socket`对象,指定服务器的IP地址和端口号,如`new Socket("服务器IP", 端口号)`。 2. ...
Android AIDL(Android Interface Definition Language)是Android系统提供的一种接口定义语言,用于处理进程间通信(IPC, Inter-Process Communication)。在Android应用开发中,当需要在不同进程间共享数据或调用...