一 生成代码
首先写一个接口定义文件ThriftTest.thrift,里面定义了一个User的struct和一个getUser接口,以java为例,cmd执行命令thrift --gen java ThriftTest.thrift,成功后会在ThriftTest.thrift文件所在的目录生成一个gen-java的文件夹,里面包含了生成的代码文件:gen-java\thrift\test\ThriftTest.java和gen-java\thrift\test\User.java。
接口定义文件ThriftTest.thrift:
namespace java thrift.test struct User { 1: i32 id, 2: string name } service ThriftTest { User getUser(1: i32 id) }
实现ThriftTest.thrift里的getUser接口(编写TestHandler类):
import org.apache.thrift.TException; import thrift.test.ThriftTest; import thrift.test.User; public class TestHandler implements ThriftTest.Iface { @Override public User getUser(int id) throws TException { if (id == 2 ) { User user = new User(); user.setId(2); user.setName("另外一个烟火"); return user; } return null; } }
二 编写server
编写thrift的server端需要指定transport_type(通信方式),protocol_type(通信协议),server_type(服务器模式)。本例中实现了根据启动程序的命令行参数值来动态的指定transport_type,protocol_type和server_type。
1--transport_type本例中只列举使用了以下三种:
(1) buffered:使用经典的缓冲流Socket;
(2) framed:基于帧的方式的Socket,每个帧都是按照4字节的帧长加上帧的内容来组织,帧内容就是我们要收发的数据。读的时候按长度预先将整Frame数据读入Buffer,再从Buffer慢慢读取。写的时候,每次flush将Buffer中的所有数据写成一个Frame。framed这种方式有点类似于http协议的chunked编码;
(3) fastframed:和framed相比是内存利用率更高的一个内存读写缓存区,它使用自动增长的byte[](不够长度才new),而不是每次都new一个byte[],提高了内存的使用率。framed的ReadBuffer每次读入Frame时都会创建新的byte[],WriteBuffer每次flush时如果大于初始1K也会重新创建byte[]。
2--protocol_type本例中只列举使用了以下三种:
(1) binary:二进制编码格式进行数据传输;
(2) json:使用JSON的数据编码协议进行数据传输;
(3) compact:高效率的,密集的二进制编码格式进行数据传输。
3--server_type本例中只列举使用了以下四种:
(1) simple:单线程阻塞io;
(2) thread-pool:多线程阻塞io;
(3) nonblocking:单条线程非阻塞io;
(4) threaded-selector:非阻塞io,有一条线程专门负责accept,若干条Selector线程处理网络IO,一个Worker线程池处理消息。
PS:
其它的transport_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\transport目录下的例子。
其它的protocol_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\protocol目录下的例子。
其它的server_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\server目录下的例子。
server_type中有个叫THsHaServer,相比threaded-selector的区别是只有一条AcceptSelect线程处理关于网络的一切,一个Worker线程池处理消息。threaded-selector模式是目前Thrift提供的最高级的模式,对于大部分应用场景性能都不会差,因此,如果实在不知道选择哪种工作模式,使用threaded-selector就可以。
服务端TestServer.java文件:
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TJSONProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TSimpleServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportFactory; import thrift.test.ThriftTest; public class TestServer { public static void main(String [] args) { try { int port = 9090; boolean ssl = false; //传输是否加密 String transport_type = "buffered"; //需要和客户端的一致才能正常通信 String protocol_type = "binary"; //需要和客户端的一致才能正常通信 String server_type = "thread-pool"; try { //从启动程序的命令行参数获取一些值,包括protocol_type,server_type和transport_type等 for (int i = 0; i < args.length; i++) { if (args[i].startsWith("--port")) { port = Integer.valueOf(args[i].split("=")[1]); } else if (args[i].startsWith("--server-type")) { server_type = args[i].split("=")[1]; server_type.trim(); } else if (args[i].startsWith("--port")) { port = Integer.parseInt(args[i].split("=")[1]); } else if (args[i].startsWith("--protocol")) { protocol_type = args[i].split("=")[1]; protocol_type.trim(); } else if (args[i].startsWith("--transport")) { transport_type = args[i].split("=")[1]; transport_type.trim(); } else if (args[i].equals("--ssl")) { ssl = true; } else if (args[i].equals("--help")) { System.out.println("Allowed options:"); System.out.println(" --help\t\t\tProduce help message"); System.out.println(" --port=arg (=" + port + ")\tPort number to connect"); System.out.println(" --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed"); System.out.println(" --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact"); System.out.println(" --ssl\t\t\tEncrypted Transport using SSL"); System.out.println(" --server-type=arg (=" + server_type +")\n\t\t\t\tType of server: simple, thread-pool, nonblocking, threaded-selector"); System.exit(0); } } } catch (Exception e) { System.err.println("Can not parse arguments! See --help"); System.exit(1); } try { //检查传入的变量值是否正确 if (server_type.equals("simple")) { } else if (server_type.equals("thread-pool")) { } else if (server_type.equals("nonblocking")) { if (ssl == true) { throw new Exception("SSL is not supported over nonblocking servers!"); } } else if (server_type.equals("threaded-selector")) { if (ssl == true) { throw new Exception("SSL is not supported over nonblocking servers!"); } } else { throw new Exception("Unknown server type! " + server_type); } if (protocol_type.equals("binary")) { } else if (protocol_type.equals("json")) { } else if (protocol_type.equals("compact")) { } else { throw new Exception("Unknown protocol type! " + protocol_type); } if (transport_type.equals("buffered")) { } else if (transport_type.equals("framed")) { } else if (transport_type.equals("fastframed")) { } else { throw new Exception("Unknown transport type! " + transport_type); } } catch (Exception e) { System.err.println("Error: " + e.getMessage()); System.exit(1); } // Processor TestHandler testHandler = new TestHandler(); //具体的业务逻辑类,实现ThriftTest.thrift里的getUser接口 //ThriftTest.Processor是生成的服务端代码 ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler); // Protocol factory TProtocolFactory tProtocolFactory = null; //指定的通信协议 if (protocol_type.equals("json")) { tProtocolFactory = new TJSONProtocol.Factory(); } else if (protocol_type.equals("compact")) { tProtocolFactory = new TCompactProtocol.Factory(); } else { tProtocolFactory = new TBinaryProtocol.Factory(); } TTransportFactory tTransportFactory = null; //指定的通信方式 if (transport_type.equals("framed")) { tTransportFactory = new TFramedTransport.Factory(); } else if (transport_type.equals("fastframed")) { tTransportFactory = new TFastFramedTransport.Factory(); } else { // .equals("buffered") => default value tTransportFactory = new TTransportFactory(); } TServer serverEngine = null; //指定的服务器模式 if (server_type.equals("nonblocking") || server_type.equals("threaded-selector")) { // Nonblocking servers TNonblockingServerSocket tNonblockingServerSocket = new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port)); if (server_type.equals("nonblocking")) { // Nonblocking Server TNonblockingServer.Args tNonblockingServerArgs = new TNonblockingServer.Args(tNonblockingServerSocket); tNonblockingServerArgs.processor(testProcessor); tNonblockingServerArgs.protocolFactory(tProtocolFactory); tNonblockingServerArgs.transportFactory(tTransportFactory); serverEngine = new TNonblockingServer(tNonblockingServerArgs); } else { // server_type.equals("threaded-selector") // ThreadedSelector Server TThreadedSelectorServer.Args tThreadedSelectorServerArgs = new TThreadedSelectorServer.Args(tNonblockingServerSocket); tThreadedSelectorServerArgs.processor(testProcessor); tThreadedSelectorServerArgs.protocolFactory(tProtocolFactory); tThreadedSelectorServerArgs.transportFactory(tTransportFactory); serverEngine = new TThreadedSelectorServer(tThreadedSelectorServerArgs); } } else { // Blocking servers // SSL socket TServerSocket tServerSocket = null; if (ssl) { tServerSocket = TSSLTransportFactory.getServerSocket(port, 0); } else { tServerSocket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(port)); } if (server_type.equals("simple")) { // Simple Server TServer.Args tServerArgs = new TServer.Args(tServerSocket); tServerArgs.processor(testProcessor); tServerArgs.protocolFactory(tProtocolFactory); tServerArgs.transportFactory(tTransportFactory); serverEngine = new TSimpleServer(tServerArgs); } else { // server_type.equals("threadpool") // ThreadPool Server TThreadPoolServer.Args tThreadPoolServerArgs = new TThreadPoolServer.Args(tServerSocket); tThreadPoolServerArgs.processor(testProcessor); tThreadPoolServerArgs.protocolFactory(tProtocolFactory); tThreadPoolServerArgs.transportFactory(tTransportFactory); serverEngine = new TThreadPoolServer(tThreadPoolServerArgs); } } // Run it System.out.println("Starting the server on port " + port + "..."); System.out.println("transport_type:" + transport_type); System.out.println("protocol_type:" + protocol_type); System.out.println("server_type:" + server_type); serverEngine.serve(); } catch (Exception x) { x.printStackTrace(); } System.out.println("done."); } }
三 编写client
编写thrift的client 端需要指定transport_type(通信方式),protocol_type(通信协议),这两个需要跟服务端的一致才能正常通信。
客户端TestClient.java文件:
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TJSONProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.THttpClient; import org.apache.thrift.transport.TSSLTransportFactory; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import thrift.test.ThriftTest; import thrift.test.User; public class TestClient { public static void main(String [] args) { String host = "localhost"; int port = 9090; String protocol_type = "binary"; //需要和服务端的一致才能正常通信 String transport_type = "buffered"; //需要和服务端的一致才能正常通信 boolean ssl = false; //传输是否加密 int socketTimeout = 1000; try { //从启动程序的命令行参数获取一些值,包括protocol_type和transport_type等 for (int i = 0; i < args.length; ++i) { if (args[i].startsWith("--host")) { host = args[i].split("=")[1]; host.trim(); } else if (args[i].startsWith("--port")) { port = Integer.valueOf(args[i].split("=")[1]); } else if (args[i].equals("--timeout")) { socketTimeout = Integer.valueOf(args[i].split("=")[1]); } else if (args[i].startsWith("--protocol")) { protocol_type = args[i].split("=")[1]; protocol_type.trim(); } else if (args[i].startsWith("--transport")) { transport_type = args[i].split("=")[1]; transport_type.trim(); } else if (args[i].equals("--ssl")) { ssl = true; } else if (args[i].equals("--help")) { System.out.println("Allowed options:"); System.out.println(" --help\t\t\tProduce help message"); System.out.println(" --host=arg (=" + host + ")\tHost to connect"); System.out.println(" --port=arg (=" + port + ")\tPort number to connect"); System.out.println(" --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed, http"); System.out.println(" --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact"); System.out.println(" --ssl\t\t\tEncrypted Transport using SSL"); System.exit(0); } } } catch (Exception x) { System.err.println("Can not parse arguments! See --help"); System.exit(1); } try { //检查传入的变量值是否正确 if (protocol_type.equals("binary")) { } else if (protocol_type.equals("compact")) { } else if (protocol_type.equals("json")) { } else { throw new Exception("Unknown protocol type! " + protocol_type); } if (transport_type.equals("buffered")) { } else if (transport_type.equals("framed")) { } else if (transport_type.equals("fastframed")) { } else if (transport_type.equals("http")) { } else { throw new Exception("Unknown transport type! " + transport_type); } if (transport_type.equals("http") && ssl == true) { //不支持https throw new Exception("SSL is not supported over http."); } } catch (Exception e) { System.err.println("Error: " + e.getMessage()); System.exit(1); } TTransport transport = null; //指定的通信方式 try { if (transport_type.equals("http")) { //http的transport_type(通信方式)在下一节中写个例子 String url = "http://" + host + ":" + port + "/service"; transport = new THttpClient(url); } else { TSocket socket = null; if (ssl == true) { socket = TSSLTransportFactory.getClientSocket(host, port, 0); } else { socket = new TSocket(host, port); } socket.setTimeout(socketTimeout); transport = socket; if (transport_type.equals("buffered")) { } else if (transport_type.equals("framed")) { transport = new TFramedTransport(transport); } else if (transport_type.equals("fastframed")) { transport = new TFastFramedTransport(transport); } } } catch (Exception x) { x.printStackTrace(); System.exit(1); } TProtocol tProtocol = null; //指定的通信协议 if (protocol_type.equals("json")) { tProtocol = new TJSONProtocol(transport); } else if (protocol_type.equals("compact")) { tProtocol = new TCompactProtocol(transport); } else { tProtocol = new TBinaryProtocol(transport); } //ThriftTest.Client是生成的客户端代码 ThriftTest.Client testClient = new ThriftTest.Client(tProtocol); try { System.out.println("connect " + host + ":" + port); System.out.println("protocol_type:" + protocol_type); System.out.println("transport_type:" + transport_type); if (transport.isOpen() == false) { try { transport.open(); } catch (TTransportException ttx) { ttx.printStackTrace(); System.out.println("Connect failed: " + ttx.getMessage()); System.exit(1); } } User user = testClient.getUser(2); //getUser就是ThriftTest.thrift所定义的接口 System.out.println("名字:"+ user.getName()); transport.close(); } catch (Exception x) { x.printStackTrace(); System.exit(1); } System.exit(0); } }
四 运行结果
--服务端--
Starting the server on port 9090...
transport_type:fastframed
protocol_type:binary
server_type:simple
--客户端--
connect localhost:9090
protocol_type:binary
transport_type:fastframed
名字:另外一个烟火
五 工程文件结构
有不明的地方建议先看Apache Thrift 初学小讲(一),附件src.rar是源代码文件。
相关推荐
在"Apache Thrift 初学小讲(五)【代理】"这篇博文中,我们将探讨Thrift如何实现代理服务,这在分布式系统中非常关键,因为代理可以提供负载均衡、安全控制、监控等功能。 1. **接口定义语言(IDL)**:Thrift ...
在本篇初学小讲中,我们将重点关注Thrift在负载均衡方面的应用,这对于构建大型分布式系统至关重要。Thrift通过提供一种高效的数据序列化机制以及RPC(远程过程调用)接口,使得不同编程语言之间可以轻松地进行通信...
本例改编自Apache Thrift教程: http://mikecvet.wordpress.com/2010/05/13/apache-thrift-tutorial-the-sequel/ http://chanian.com/2010/05/13/thrift-tutorial-a-php-client/ 原教程使用的是c++ server和...
这个"thrift-Demo"应该是一个演示如何使用Thrift的实例,包含了一系列的步骤和文件,帮助初学者理解Thrift的工作原理和使用方法。 在Thrift中,我们首先需要创建一个`.thrift`文件,这个文件定义了服务的接口和数据...
Thrift-0.9.1 是一个开源的跨语言服务开发框架,由Facebook于2007年开源,后来成为了Apache软件基金会的顶级项目。这个版本的Thrift源码包包含了一系列的工具和库,使得开发者能够高效地构建分布式系统。Thrift的...
THRIFT 是一个开源的跨语言服务开发框架,由 Facebook 在 2007 年创建并贡献给了 Apache 基金会。它旨在提供一种高效、可扩展、跨平台的解决方案,用于构建分布式系统中的应用程序接口(API)。通过 THRIFT,开发者...
在IT领域,尤其是在大数据处理和分布式系统中,HBase是一个重要的NoSQL数据库,它基于Apache Hadoop并提供了高性能、列式存储、可扩展的实时读写能力。而Thrift是一种跨语言的服务框架,由Facebook开发,它允许不同...
Thrift是一种开源的跨语言服务开发框架,最初由Facebook开发并贡献给Apache基金会。它允许开发者定义服务接口和数据类型,然后...这个例子是一个很好的起点,帮助初学者理解Thrift的工作机制以及如何在实际项目中应用。
thrift.demo.rar中的内容应该是一个简单的Thrift示例,帮助初学者快速上手Thrift的使用。 1. **Thrift IDL**: Thrift IDL是Thrift的核心部分,类似于SOAP的WSDL或CORBA的IDL,它用于定义服务接口、数据结构和常量...
对于“毕业设计论文”和“计算机案例”,Thrift是一个很好的研究主题,因为它涉及到分布式系统的设计、跨语言通信的实现以及性能优化等多个方面。通过学习和使用Thrift,学生可以深入理解服务化架构和如何在实际项目...
Avro的一个显著特点是它支持动态Schema,允许在不兼容的Schema之间进行数据交换。此外,Avro还提供了数据版本控制机制,确保在Schema变化时能正确处理旧数据。 在这个实例中,你可能会发现Thrift和Avro的示例代码,...
Apache HBase 是一款基于 Apache Hadoop 和 Apache ZooKeeper 的分布式、版本化、面向列的数据库。它是NoSQL数据库的一种,特别适用于处理大规模数据,尤其在实时读写场景下表现出色。HBase的设计目标是为海量稀疏...
第二个文件是"dong-rpc_master.zip",这个名字暗示了它可能是一个关于分布式远程过程调用(RPC)框架的项目源代码或者一个叫做"Dong RPC"的开源库。RPC使得一个程序可以在不理解底层网络协议的情况下,调用另一个...
3. **Apache Thrift**:一个开源集成软件框架,用于构建跨语言服务。它允许使用C++和其他语言创建高效的RPC(远程过程调用)系统。 4. **OpenCV**:一个开源计算机视觉库,包含了图像处理和计算机视觉的许多算法。 ...
- **定义**:Spark SQL 是 Apache Spark 的一个模块,用于处理结构化数据。它提供了 DataFrame 和 Dataset API,以及支持 SQL 查询的能力。这些特性使得 Spark SQL 成为处理大数据集时的一种高效工具。 - **特点**:...
【标题】"dubbo-admin-2.6.0.war(dubbo监控中心)" 提供的是一个用于管理Dubbo服务的控制台,它允许开发者和运维人员对Dubbo服务进行实时监控和管理。这个war文件是基于Java的Web应用程序,可以部署在支持Servlet ...
Hector 是一个 Java 客户端库,用于简化 Apache Cassandra 的使用。它提供了一系列高级功能,如连接池管理、故障检测与恢复、基本负载均衡等,这些功能在原生的 Cassandra Java API 中并未直接提供。对于刚接触 ...
### Hadoop 学习资源详解 ...通过以上资源,学习者可以从多个角度深入了解Hadoop的各个方面,无论是理论知识还是实践经验都能得到有效的提升。希望这些资料能够帮助大家更好地掌握Hadoop这一强大的大数据处理平台。
- **定义**:HBase 是一个分布式、版本化的、面向列的开源数据库。它建立在 Apache Hadoop 和 ZooKeeper 之上,提供了高可靠性和高性能的随机读写能力。 - **版权与历史**:该文档版权属于 2012 年的 Apache ...