如下是一个使用Hadoop IPC实现客户端调用服务器端方法的示例,功能是返回服务器端的一个文件信息。
1 文件信息类IPCFileStatus
代码如下所示:
package org.seandeng.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class IPCFileStatus implements Writable {
private String filename;
private long time;
public IPCFileStatus() {
}
public IPCFileStatus(String filename) {
this.filename=filename;
this.time=(new Date()).getTime();
}
public String getFilename() {
return filename;
}
public void setFilename(String filename) {
this.filename = filename;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public String toString() {
return "File: "+filename+" Create at "+(new Date(time));
}
public void readFields(DataInput in) throws IOException {
this.filename = Text.readString(in);
this.time = in.readLong();
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, filename);
out.writeLong(time);
}
}
由于IPCFileStatus类的对象需要从服务器端传到客户端,所以就需要进行序列化,Writable接口就是Hadoop定义的一个序列化接口。
由于客户端要调用服务器的方法,所以客户端需要知道服务器有哪些方法可以调用,在IPC中使用的是定义接口的方法,如定义一个IPC接口,客户端和服务器端都知道这个接口,客户端通过IPC获取到一个服务器端这个实现了接口的引用,待要调用服务器的方法时,直接使用这个引用来调用方法,这样就可以调用服务器的方法了。
2 接口IPCQueryStatus
定义一个服务器端和客户端接口IPCQueryStatus如下所示:
package org.seandeng.hadoop.ipc;
import org.apache.hadoop.ipc.VersionedProtocol;
public interface IPCQueryStatus extends VersionedProtocol {
IPCFileStatus getFileStatus(String filename);
}
在接口IPCQueryStatus中,定义了一个getFileStatus(String filename)方法,根据文件名得到一个IPCFileStatus对象,注意到IPCQueryStatus接口继承自接口 org.apache.hadoop.ipc.VersionedProtocol接口,VersionedProtocol接口是Hadoop IPC接口必须继承的一个接口,它定义了一个方法getProtocolVersion(),用于返回服务器端的接口实现的版本号,有两个参数,分别是协议接口对应的接口名称protocol和客户端期望服务器的版本号clientVersion,主要作用是检查通信双方的接口是否一致,VersionedProtocol的代码如下:
package org.apache.hadoop.ipc;
import java.io.IOException;
/**
* Superclass of all protocols that use Hadoop RPC.
* Subclasses of this interface are also supposed to have
* a static final long versionID field.
*/
public interface VersionedProtocol {
/**
* Return protocol version corresponding to protocol interface.
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @return the version that the server will speak
*/
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
}
3 实现类IPCQueryStatusImpl
定义好了接口,那么在服务器端就需要有一个接口的实现类,用于实现具体的业务逻辑,下面的IPCQueryStatusImpl类实现了IPCQueryStatus接口,仅仅简单实现了IPCQueryStatus规定两个方法。
package org.seandeng.hadoop.ipc;
import java.io.IOException;
public class IPCQueryStatusImpl implements IPCQueryStatus {
public IPCQueryStatusImpl() {}
public IPCFileStatus getFileStatus(String filename) {
IPCFileStatus status=new IPCFileStatus(filename);
System.out.println("Method getFileStatus Called, return: "+status);
return status;
}
/**
* 用于服务器与客户端,进行IPC接口版本检查,再服务器返回给客户端时调用,如果服务器端的IPC版本与客户端不一致
* 那么就会抛出版本不一致的异常
*/
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
System.out.println("protocol: "+protocol);
System.out.println("clientVersion: "+clientVersion);
return IPCQueryServer.IPC_VER;
}
}
getFileStatus()方法根据参数filename创建了一个IPCFileStatus对象,getProtocolVersion()方法返回服务器端使用的接口版本。接口和实现类都完成之后就可以用客户端和服务器进行通信了。
4 IPCQueryServer类
服务器端进行一些成员变量的初始化,然后使用Socket绑定IP,然后在某个端口上监听客户端的请求。IPCQueryServer类相关代码如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
public class IPCQueryServer {
public static final int IPC_PORT = 32121;
public static final long IPC_VER = 5473L;
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
IPCQueryStatusImpl queryService=new IPCQueryStatusImpl();
System.out.println(conf);
Server server = RPC.getServer(queryService, "127.0.0.1", IPC_PORT, 1, false, conf);
server.start();
System.out.println("Server ready, press any key to stop");
System.in.read();
server.stop();
System.out.println("Server stopped");
} catch (Exception e) {
e.printStackTrace();
}
}
}
在服务器端先创建一个IPCQueryStatusImpl的对象,传递到RPC.getServer()方法中。服务器端使用RPC.getServer()方法穿给创建服务器端对象server,代码中RPC.getServer()方法的几个参数说明如下:
· 第一个参数queryService标识该服务器对象对外提供的服务对象实例,即客户端所要调用的具体对象,下面客户端的代码调用的接口如此对应;
· 第二个参数"127.0.0.1"表示监绑定所有的IP地址;
· 第三个参数IPC_PORT表示监听的端口;
· 第四个参数1表示Server端的Handler实例(线程)的个数为1
· 第五个参数false表示不打开调用方法日志;
· 第六个参数是Configuration对象,用于定制Server端的配置。
创建Server对象之后,调用Server.start()方法开始监听客户端的请求,并根据客户端的请求提供服务。
5 请求类IPCQueryClient
客户端需要先获取到一个代理对象,然后才能进行方法调用,在IPC中,使用RPC.getProxy()方法获取代理对象。客户端的代码如下:
package org.seandeng.hadoop.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class IPCQueryClient {
public static void main(String[] args) {
try {
System.out.println("Interface name: "+IPCQueryStatus.class.getName());
System.out.println("Interface name: "+IPCQueryStatus.class.getMethod("getFileStatus", String.class).getName());
InetSocketAddress addr=new InetSocketAddress("localhost", IPCQueryServer.IPC_PORT);
IPCQueryStatus query=(IPCQueryStatus) RPC.getProxy(IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr,new Configuration());
IPCFileStatus status=query.getFileStatus("Z:\\temp\\7c64984cf5c3410fbe28037865d010a3.pdf");
System.out.println(status);
RPC.stopProxy(query);
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端的代码很简单,首先构造一个要请求服务器的网络地址(IP和端口),然后通过RPC.getProxy()方法获取到一个IPCQueryStatus对象,然后进行相应的方法调用。其中客户端代码中RPC.getProxy()方法的参数说明如下:
· 第一个参数是IPC接口对象,可以通过IPC接口的静态成员class直接获得。接口的静态成员class保存了该接口的java.lang.Class实例,它表示正在运行的Java应用程序中的类和接口,提供一系列与Java反射相关的重要功能;
· 第二个参数是接口版本,由于接口会根据需求不断地进行升级,形成多个版本的IPC接口,如果客户端和服务器端使用的IPC接口版本不一致,结果将是灾难性的,所以在建立IPC时,需要对IPC的双方进行版本检查;
· 第三个参数是服务器的Socket地址,用于建立IPC的底层TCP连接;
· 第四个参数是Configuration对象,用于定制IPC客户端参数。
6 执行结果
客户端的代码编写完成之后就可以运行程序了,先启动服务器端,再运行一个客户端,就完成了一次客户端调用服务器的过程,客户端调用了服务器端 IPCQueryStatusImpl对象的getFileStatus()方法,服务器端返回了方法调用结果即IPCFileStatus对象。服务器端和客户端执行日志如下所示:
服务器端:
2014-11-26 13:00:49,147 WARN conf.Configuration (Configuration.java:<clinit>(191)) - DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively Configuration: core-default.xml, core-site.xml 2014-11-26 13:00:50,124 INFO ipc.Server (Server.java:run(328)) - Starting SocketReader 2014-11-26 13:00:50,222 INFO ipc.Server (Server.java:run(598)) - IPC Server Responder: starting 2014-11-26 13:00:50,223 INFO ipc.Server (Server.java:run(434)) - IPC Server listener on 32121: starting Server ready, press any key to stop 2014-11-26 13:00:50,224 INFO ipc.Server (Server.java:run(1358)) - IPC Server handler 0 on 32121: starting protocol: org.seandeng.hadoop.ipc.IPCQueryStatus clientVersion: 5473 Method getFileStatus Called, return: File: Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf Create at Wed Nov 26 13:01:02 CST 2014 |
客户端:
Interface name: org.seandeng.hadoop.ipc.IPCQueryStatus Interface name: getFileStatus 2014-11-26 13:00:59,790 WARN conf.Configuration (Configuration.java:<clinit>(191)) - DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively File: Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf Create at Wed Nov 26 13:01:02 CST 2014 |
相关推荐
本系统利用大数据技术,合理的为用户做出推荐,推荐的结果可靠程度很高,这就是我的优势所在,因为它和一般的推荐系统的推荐算法不太一样,我的推荐算法是利用Hadoop技术写的,我们可以利用Hadoop集群的高吞吐量,一...
该文档简要介绍了大数据与Hadoop的概念及相关大数据框架,对于在大数据的初学者来说有一定的帮助,可以指导如何学习,以及学习各个框架的步骤
03_Hadoop_概论_大数据的特点.mp4 04_Hadoop_概论_大数据的应用场景.mp4 06_Hadoop_概论_未来工作内容.mp4 07_Hadoop_入门_课程介绍.mp4 11_Hadoop_入门_Hadoop优势.mp4 13_Hadoop_入门_HDFS概述.mp4 14_Hadoop_入门...
大数据之Hadoop学习教程+笔记合计_超详细完整.zip
大数据处理框架:Hadoop:大数据与Hadoop简介.docx
1.大数据框架hadoop; 2.根据表名,获取全部数据,支持翻页; 3.获取数据总条数; 4.根据表名、上次查询最后一条记录的rowkey,获取下一页数据; 5.数据支持jsonarray/list等;
在本套内部Hadoop系列培训资料中,我们将深入探讨大数据技术的核心——Hadoop及其生态系统,包括Spark、Hive、Storm、Hbase和Sqoop等关键组件。这些工具和框架共同构建了大数据解决方案的基础。 首先,Hadoop是...
(2)修改 module、software 文件夹的所有者 (1)查询是否安装 java 软件: (2)如果安装的版本低于 1.7,卸载该 jdk: (1)先获
在大数据的世界里,Hadoop是不可或缺的一个核心组件,它为海量数据处理提供了高效、可靠的解决方案。本主题将深入探讨Hadoop在数据分析中的应用及其生态系统的关键技术。 首先,我们需要理解“大数据”的概念。...
技术领域:大数据领域Hadoop技能学习 技术关键词:大数据、Hadoop 内容:大数据小白晋升之路学习必备 用途:学习
大数据整理hadoop/hive
大数据之hadoop,spart全套全技术栈视频课程,包含spark,hadoop,storm,kafka,mllib等组件的安装,编程等,依次从基础,进阶直到实际实践。
本资源包“大数据-hadoop-mapreduce代码”显然包含了与MapReduce编程相关的实例或示例代码,对于理解并应用Hadoop MapReduce具有很高的参考价值。 MapReduce的工作原理可以分为两个主要阶段:Map阶段和Reduce阶段。...
大数据与Hadoop是当前信息技术领域的核心概念,它们共同推动了数据处理能力的...无论是科学研究、商业决策还是公共服务,大数据和Hadoop都提供了强大的工具和框架,帮助人们在信息的海洋中航行,探索未知,创造价值。
大数据专业Hadoop开发技术课程的实践教学探索是一项针对高等教育大数据专业学生培养的重要研究,旨在解决当前大数据专业教学中的困境,特别是在实践教学方面的挑战。本课程的实践教学内容和方法涉及Java、Linux、...
【大数据(hadoop)竞赛...以上是对Hadoop相关知识点的详细解析,涵盖了Hadoop的基本概念、组件功能、文件系统的存储机制以及MapReduce的工作原理等方面。这些知识对于理解Hadoop大数据处理框架及其生态系统至关重要。
这个名为"java+大数据相关框架实战项目(Hadoop, Spark, Storm, Flink).zip"的压缩包文件,包含了四个核心的大数据处理框架——Hadoop、Spark、Storm和Flink的实战项目源码,这些框架都是Java开发的,用于解决大规模...
对于希望在大数据领域取得进展的企业和个人来说,理解Hadoop框架的数据处理流程并掌握其实践案例,是不可或缺的专业技能。通过对Hadoop框架的研究和实践,可以更好地应对信息社会的挑战,利用数据创造价值。