`
halloffame
  • 浏览: 56132 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Apache Thrift 初学小讲(二)【一个简单示例】

阅读更多

一 生成代码

首先写一个接口定义文件ThriftTest.thrift,里面定义了一个User的struct和一个getUser接口,以java为例,cmd执行命令thrift --gen java ThriftTest.thrift,成功后会在ThriftTest.thrift文件所在的目录生成一个gen-java的文件夹,里面包含了生成的代码文件:gen-java\thrift\test\ThriftTest.java和gen-java\thrift\test\User.java。

 

接口定义文件ThriftTest.thrift:

namespace java thrift.test

struct User
{
    1: i32 id,
    2: string name
}

service ThriftTest
{
    User getUser(1: i32 id)
}

 

实现ThriftTest.thrift里的getUser接口(编写TestHandler类):

import org.apache.thrift.TException;

import thrift.test.ThriftTest;
import thrift.test.User;

public class TestHandler implements ThriftTest.Iface {

	@Override
	public User getUser(int id) throws TException {
		if (id == 2 ) {
			User user = new User();
			user.setId(2);
			user.setName("另外一个烟火");
			return user;
		}
		return null;
	}

}

 

二 编写server

 编写thrift的server端需要指定transport_type(通信方式),protocol_type(通信协议),server_type(服务器模式)。本例中实现了根据启动程序的命令行参数值来动态的指定transport_type,protocol_type和server_type。

 

1--transport_type本例中只列举使用了以下三种:

 

(1) buffered:使用经典的缓冲流Socket;

(2) framed:基于帧的方式的Socket,每个帧都是按照4字节的帧长加上帧的内容来组织,帧内容就是我们要收发的数据。读的时候按长度预先将整Frame数据读入Buffer,再从Buffer慢慢读取。写的时候,每次flush将Buffer中的所有数据写成一个Frame。framed这种方式有点类似于http协议的chunked编码

(3) fastframed:和framed相比是内存利用率更高的一个内存读写缓存区,它使用自动增长的byte[](不够长度才new),而不是每次都new一个byte[],提高了内存的使用率。framed的ReadBuffer每次读入Frame时都会创建新的byte[],WriteBuffer每次flush时如果大于初始1K也会重新创建byte[]。

 

2--protocol_type本例中只列举使用了以下三种:

 

(1) binary:二进制编码格式进行数据传输;

(2) json:使用JSON的数据编码协议进行数据传输;

(3) compact:高效率的,密集的二进制编码格式进行数据传输。

 

3--server_type本例中只列举使用了以下四种:

 

(1) simple:单线程阻塞io;

(2) thread-pool:多线程阻塞io;

(3) nonblocking:单条线程非阻塞io;

(4) threaded-selector:非阻塞io,有一条线程专门负责accept,若干条Selector线程处理网络IO,一个Worker线程池处理消息。

 

PS:

其它的transport_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\transport目录下的例子。

其它的protocol_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\protocol目录下的例子。 

其它的server_type请参考下载的thrift-0.9.3\lib\java\test\org\apache\thrift\server目录下的例子。

 

server_type中有个叫THsHaServer,相比threaded-selector的区别是只有一条AcceptSelect线程处理关于网络的一切,一个Worker线程池处理消息。threaded-selector模式是目前Thrift提供的最高级的模式,对于大部分应用场景性能都不会差,因此,如果实在不知道选择哪种工作模式,使用threaded-selector就可以。

 

服务端TestServer.java文件:

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory;

import thrift.test.ThriftTest;

public class TestServer {
	public static void main(String [] args) {
		try {
	        int port = 9090;
	        boolean ssl = false; //传输是否加密
	        String transport_type = "buffered"; //需要和客户端的一致才能正常通信
	        String protocol_type = "binary"; //需要和客户端的一致才能正常通信
	        String server_type = "thread-pool";
	        
	        try {
	        	//从启动程序的命令行参数获取一些值,包括protocol_type,server_type和transport_type等
	        	for (int i = 0; i < args.length; i++) {
	        		if (args[i].startsWith("--port")) {
	        			port = Integer.valueOf(args[i].split("=")[1]);
	        		} else if (args[i].startsWith("--server-type")) {
	        			server_type = args[i].split("=")[1];
	        			server_type.trim();
	        		} else if (args[i].startsWith("--port")) {
	        			port = Integer.parseInt(args[i].split("=")[1]);
	        		} else if (args[i].startsWith("--protocol")) {
	        			protocol_type = args[i].split("=")[1];
	        			protocol_type.trim();
	        		} else if (args[i].startsWith("--transport")) {
	        			transport_type = args[i].split("=")[1];
	        			transport_type.trim();
	        		} else if (args[i].equals("--ssl")) {
	        			ssl = true;
	        		} else if (args[i].equals("--help")) {
			            System.out.println("Allowed options:");
			            System.out.println("  --help\t\t\tProduce help message");
			            System.out.println("  --port=arg (=" + port + ")\tPort number to connect");
			            System.out.println("  --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed");
			            System.out.println("  --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact");
			            System.out.println("  --ssl\t\t\tEncrypted Transport using SSL");
			            System.out.println("  --server-type=arg (=" + server_type +")\n\t\t\t\tType of server: simple, thread-pool, nonblocking, threaded-selector");
			            System.exit(0);
	        		}
	        	}
	        } catch (Exception e) {
	        	System.err.println("Can not parse arguments! See --help");
	        	System.exit(1);
	        }
	
	        try {
	        	//检查传入的变量值是否正确
		        if (server_type.equals("simple")) {
		        } else if (server_type.equals("thread-pool")) {
		        } else if (server_type.equals("nonblocking")) {
		        	if (ssl == true) {
		        		throw new Exception("SSL is not supported over nonblocking servers!");
		        	}
		        } else if (server_type.equals("threaded-selector")) {
		        	if (ssl == true) {
		        		throw new Exception("SSL is not supported over nonblocking servers!");
		        	}
		        } else {
		        	throw new Exception("Unknown server type! " + server_type);
		        }
		        
		        if (protocol_type.equals("binary")) {
		        } else if (protocol_type.equals("json")) {
		        } else if (protocol_type.equals("compact")) {
		        } else {
		        	throw new Exception("Unknown protocol type! " + protocol_type);
		        }
		        if (transport_type.equals("buffered")) {
		        } else if (transport_type.equals("framed")) {
		        } else if (transport_type.equals("fastframed")) {
		        } else {
		        	throw new Exception("Unknown transport type! " + transport_type);
		        }
	        } catch (Exception e) {
	        	System.err.println("Error: " + e.getMessage());
	        	System.exit(1);
	        }
	
	        // Processor
	        TestHandler testHandler = new TestHandler(); //具体的业务逻辑类,实现ThriftTest.thrift里的getUser接口
	        //ThriftTest.Processor是生成的服务端代码
	        ThriftTest.Processor testProcessor = new ThriftTest.Processor(testHandler);
	
	        // Protocol factory
	        TProtocolFactory tProtocolFactory = null; //指定的通信协议
	        if (protocol_type.equals("json")) {
	        	tProtocolFactory = new TJSONProtocol.Factory();
	        } else if (protocol_type.equals("compact")) {
	        	tProtocolFactory = new TCompactProtocol.Factory();
	        } else {
	        	tProtocolFactory = new TBinaryProtocol.Factory();
	        }
	
	        TTransportFactory tTransportFactory = null; //指定的通信方式
	
	        if (transport_type.equals("framed")) {
	        	tTransportFactory = new TFramedTransport.Factory();
	        } else if (transport_type.equals("fastframed")) {
	        	tTransportFactory = new TFastFramedTransport.Factory();
	        } else { // .equals("buffered") => default value
	        	tTransportFactory = new TTransportFactory();
	        }
	
	        TServer serverEngine = null; //指定的服务器模式	
	
	        if (server_type.equals("nonblocking") || server_type.equals("threaded-selector")) {
	        	// Nonblocking servers
	        	TNonblockingServerSocket tNonblockingServerSocket =
	        			new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port));
	
		        if (server_type.equals("nonblocking")) {
		        	// Nonblocking Server
		        	TNonblockingServer.Args tNonblockingServerArgs
		              	= new TNonblockingServer.Args(tNonblockingServerSocket);
		        	tNonblockingServerArgs.processor(testProcessor);
		        	tNonblockingServerArgs.protocolFactory(tProtocolFactory);
		        	tNonblockingServerArgs.transportFactory(tTransportFactory);
		
		        	serverEngine = new TNonblockingServer(tNonblockingServerArgs);
		        } else { // server_type.equals("threaded-selector")
		        	// ThreadedSelector Server
		        	TThreadedSelectorServer.Args tThreadedSelectorServerArgs
		              	= new TThreadedSelectorServer.Args(tNonblockingServerSocket);
		        	tThreadedSelectorServerArgs.processor(testProcessor);
		        	tThreadedSelectorServerArgs.protocolFactory(tProtocolFactory);
		        	tThreadedSelectorServerArgs.transportFactory(tTransportFactory);
		
		        	serverEngine = new TThreadedSelectorServer(tThreadedSelectorServerArgs);
		        }
	        } else {
	        	// Blocking servers	
	        	// SSL socket
	        	TServerSocket tServerSocket = null;
	        	if (ssl) {
	        		tServerSocket = TSSLTransportFactory.getServerSocket(port, 0);
	        	} else {
	        		tServerSocket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(port));
	        	}
	
	        	if (server_type.equals("simple")) {
	        		// Simple Server
	        		TServer.Args tServerArgs = new TServer.Args(tServerSocket);
	        		tServerArgs.processor(testProcessor);
	        		tServerArgs.protocolFactory(tProtocolFactory);
	        		tServerArgs.transportFactory(tTransportFactory);
	
	        		serverEngine = new TSimpleServer(tServerArgs);
	        	} else { // server_type.equals("threadpool")
	        		// ThreadPool Server
	        		TThreadPoolServer.Args tThreadPoolServerArgs
	        			= new TThreadPoolServer.Args(tServerSocket);
	        		tThreadPoolServerArgs.processor(testProcessor);
	        		tThreadPoolServerArgs.protocolFactory(tProtocolFactory);
	        		tThreadPoolServerArgs.transportFactory(tTransportFactory);
	
	        		serverEngine = new TThreadPoolServer(tThreadPoolServerArgs);
	        	}
	        }
	
	        // Run it
	        System.out.println("Starting the server on port " + port + "...");
	        System.out.println("transport_type:" + transport_type);
	        System.out.println("protocol_type:" + protocol_type);
	        System.out.println("server_type:" + server_type);
	        serverEngine.serve();
	
	    } catch (Exception x) {
	    	x.printStackTrace();
	    }
	    System.out.println("done.");
	}
}

 

三 编写client 

编写thrift的client 端需要指定transport_type(通信方式),protocol_type(通信协议),这两个需要跟服务端的一致才能正常通信。

 

客户端TestClient.java文件:

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import thrift.test.ThriftTest;
import thrift.test.User;

public class TestClient {
    public static void main(String [] args) {
	    String host = "localhost";
	    int port = 9090;
	    String protocol_type = "binary"; //需要和服务端的一致才能正常通信
	    String transport_type = "buffered"; //需要和服务端的一致才能正常通信
	    boolean ssl = false; //传输是否加密	
	    int socketTimeout = 1000;
	
	    try {
	    	//从启动程序的命令行参数获取一些值,包括protocol_type和transport_type等
	    	for (int i = 0; i < args.length; ++i) {
		        if (args[i].startsWith("--host")) {
		            host = args[i].split("=")[1];
		            host.trim();
		        } else if (args[i].startsWith("--port")) {
		            port = Integer.valueOf(args[i].split("=")[1]); 
		        } else if (args[i].equals("--timeout")) {
		            socketTimeout = Integer.valueOf(args[i].split("=")[1]);
		        } else if (args[i].startsWith("--protocol")) {
		            protocol_type = args[i].split("=")[1];
		            protocol_type.trim();
		        } else if (args[i].startsWith("--transport")) {
		            transport_type = args[i].split("=")[1];
		            transport_type.trim();
		        } else if (args[i].equals("--ssl")) {
		            ssl = true;
		        } else if (args[i].equals("--help")) {
		            System.out.println("Allowed options:");
		            System.out.println("  --help\t\t\tProduce help message"); 
		            System.out.println("  --host=arg (=" + host + ")\tHost to connect");
		            System.out.println("  --port=arg (=" + port + ")\tPort number to connect");
		            System.out.println("  --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed, http");
		            System.out.println("  --protocol=arg (=" + protocol_type + ")\tProtocol: binary, json, compact");
		            System.out.println("  --ssl\t\t\tEncrypted Transport using SSL");
		            System.exit(0);
		        }
	        }
	    } catch (Exception x) {
	        System.err.println("Can not parse arguments! See --help");
	        System.exit(1);
	    }
	
	    try {
	    	//检查传入的变量值是否正确
	        if (protocol_type.equals("binary")) {
	        } else if (protocol_type.equals("compact")) {
	        } else if (protocol_type.equals("json")) {
	        } else {
	            throw new Exception("Unknown protocol type! " + protocol_type); 
	        }
	        
	        if (transport_type.equals("buffered")) {
	        } else if (transport_type.equals("framed")) {
	        } else if (transport_type.equals("fastframed")) {
	        } else if (transport_type.equals("http")) {
	        } else {
	            throw new Exception("Unknown transport type! " + transport_type);
	        }
	        
	        if (transport_type.equals("http") && ssl == true) { //不支持https
	            throw new Exception("SSL is not supported over http.");
	        }
	    } catch (Exception e) {
	        System.err.println("Error: " + e.getMessage());
	        System.exit(1);
	    }
	
	    TTransport transport = null; //指定的通信方式
	
	    try {
	        if (transport_type.equals("http")) { //http的transport_type(通信方式)在下一节中写个例子
	        	String url = "http://" + host + ":" + port + "/service";
	        	transport = new THttpClient(url);
	        } else {
	        	TSocket socket = null;
		        if (ssl == true) {
		            socket = TSSLTransportFactory.getClientSocket(host, port, 0);
		        } else {
		            socket = new TSocket(host, port);
		        }
		        
		        socket.setTimeout(socketTimeout);
		        
		        transport = socket;
		        if (transport_type.equals("buffered")) {
		        } else if (transport_type.equals("framed")) {
		            transport = new TFramedTransport(transport);
		        } else if (transport_type.equals("fastframed")) {
		            transport = new TFastFramedTransport(transport);
		        }
	        }
	    } catch (Exception x) {
	        x.printStackTrace();
	        System.exit(1);
	    }
	
	    TProtocol tProtocol = null; //指定的通信协议
	    if (protocol_type.equals("json")) {
	        tProtocol = new TJSONProtocol(transport);
	    } else if (protocol_type.equals("compact")) {
	        tProtocol = new TCompactProtocol(transport);
	    } else {
	        tProtocol = new TBinaryProtocol(transport);
	    }
	
	    //ThriftTest.Client是生成的客户端代码
	    ThriftTest.Client testClient = new ThriftTest.Client(tProtocol);
    
	    try {
	        System.out.println("connect " + host + ":" + port);	
	        System.out.println("protocol_type:" + protocol_type);
	        System.out.println("transport_type:" + transport_type);
	        if (transport.isOpen() == false) {
		        try {
		            transport.open();
		        } catch (TTransportException ttx) {
		            ttx.printStackTrace();
		            System.out.println("Connect failed: " + ttx.getMessage());
		            System.exit(1);
		        }
	        }

	        User user = testClient.getUser(2); //getUser就是ThriftTest.thrift所定义的接口
	        System.out.println("名字:"+ user.getName());

	        transport.close();
	    } catch (Exception x) {
	        x.printStackTrace();
	        System.exit(1);
	    }

	    System.exit(0);
    }
}

 

四 运行结果

--服务端--

Starting the server on port 9090...

transport_type:fastframed

protocol_type:binary

server_type:simple

 

--客户端--

connect localhost:9090

protocol_type:binary

transport_type:fastframed

名字:另外一个烟火

 

五 工程文件结构

 


 有不明的地方建议先看Apache Thrift 初学小讲(一),附件src.rar是源代码文件。

  • src.rar (12.5 KB)
  • 下载次数: 21
2
1
分享到:
评论

相关推荐

    Apache Thrift 初学小讲(五)【代理】

    在"Apache Thrift 初学小讲(五)【代理】"这篇博文中,我们将探讨Thrift如何实现代理服务,这在分布式系统中非常关键,因为代理可以提供负载均衡、安全控制、监控等功能。 1. **接口定义语言(IDL)**:Thrift ...

    Apache Thrift 初学小讲(七)【负载均衡】

    在本篇初学小讲中,我们将重点关注Thrift在负载均衡方面的应用,这对于构建大型分布式系统至关重要。Thrift通过提供一种高效的数据序列化机制以及RPC(远程过程调用)接口,使得不同编程语言之间可以轻松地进行通信...

    用C#和C++写的Apache Thrift的小范例

    本例改编自Apache Thrift教程: http://mikecvet.wordpress.com/2010/05/13/apache-thrift-tutorial-the-sequel/ http://chanian.com/2010/05/13/thrift-tutorial-a-php-client/ 原教程使用的是c++ server和...

    thrift-Demo

    这个"thrift-Demo"应该是一个演示如何使用Thrift的实例,包含了一系列的步骤和文件,帮助初学者理解Thrift的工作原理和使用方法。 在Thrift中,我们首先需要创建一个`.thrift`文件,这个文件定义了服务的接口和数据...

    thrift-0.9.1

    Thrift-0.9.1 是一个开源的跨语言服务开发框架,由Facebook于2007年开源,后来成为了Apache软件基金会的顶级项目。这个版本的Thrift源码包包含了一系列的工具和库,使得开发者能够高效地构建分布式系统。Thrift的...

    THRIFT 学习资料

    THRIFT 是一个开源的跨语言服务开发框架,由 Facebook 在 2007 年创建并贡献给了 Apache 基金会。它旨在提供一种高效、可扩展、跨平台的解决方案,用于构建分布式系统中的应用程序接口(API)。通过 THRIFT,开发者...

    thrift1 查询hbase

    在IT领域,尤其是在大数据处理和分布式系统中,HBase是一个重要的NoSQL数据库,它基于Apache Hadoop并提供了高性能、列式存储、可扩展的实时读写能力。而Thrift是一种跨语言的服务框架,由Facebook开发,它允许不同...

    thrift c++ php

    Thrift是一种开源的跨语言服务开发框架,最初由Facebook开发并贡献给Apache基金会。它允许开发者定义服务接口和数据类型,然后...这个例子是一个很好的起点,帮助初学者理解Thrift的工作机制以及如何在实际项目中应用。

    thrift.demo.rar

    thrift.demo.rar中的内容应该是一个简单的Thrift示例,帮助初学者快速上手Thrift的使用。 1. **Thrift IDL**: Thrift IDL是Thrift的核心部分,类似于SOAP的WSDL或CORBA的IDL,它用于定义服务接口、数据结构和常量...

    Thrift服务开发框架 v0.16.0.gz

    对于“毕业设计论文”和“计算机案例”,Thrift是一个很好的研究主题,因为它涉及到分布式系统的设计、跨语言通信的实现以及性能优化等多个方面。通过学习和使用Thrift,学生可以深入理解服务化架构和如何在实际项目...

    Thrift和Avro实例

    Avro的一个显著特点是它支持动态Schema,允许在不兼容的Schema之间进行数据交换。此外,Avro还提供了数据版本控制机制,确保在Schema变化时能正确处理旧数据。 在这个实例中,你可能会发现Thrift和Avro的示例代码,...

    hbase中文文档,适合初学者学习

    Apache HBase 是一款基于 Apache Hadoop 和 Apache ZooKeeper 的分布式、版本化、面向列的数据库。它是NoSQL数据库的一种,特别适用于处理大规模数据,尤其在实时读写场景下表现出色。HBase的设计目标是为海量稀疏...

    Java_20231129.zip

    第二个文件是"dong-rpc_master.zip",这个名字暗示了它可能是一个关于分布式远程过程调用(RPC)框架的项目源代码或者一个叫做"Dong RPC"的开源库。RPC使得一个程序可以在不理解底层网络协议的情况下,调用另一个...

    实用的c++函数库电子书

    3. **Apache Thrift**:一个开源集成软件框架,用于构建跨语言服务。它允许使用C++和其他语言创建高效的RPC(远程过程调用)系统。 4. **OpenCV**:一个开源计算机视觉库,包含了图像处理和计算机视觉的许多算法。 ...

    Spark SQL学习笔记

    - **定义**:Spark SQL 是 Apache Spark 的一个模块,用于处理结构化数据。它提供了 DataFrame 和 Dataset API,以及支持 SQL 查询的能力。这些特性使得 Spark SQL 成为处理大数据集时的一种高效工具。 - **特点**:...

    dubbo-admin-2.6.0.war(dubbo监控中心)

    【标题】"dubbo-admin-2.6.0.war(dubbo监控中心)" 提供的是一个用于管理Dubbo服务的控制台,它允许开发者和运维人员对Dubbo服务进行实时监控和管理。这个war文件是基于Java的Web应用程序,可以部署在支持Servlet ...

    hector client guide

    Hector 是一个 Java 客户端库,用于简化 Apache Cassandra 的使用。它提供了一系列高级功能,如连接池管理、故障检测与恢复、基本负载均衡等,这些功能在原生的 Cassandra Java API 中并未直接提供。对于刚接触 ...

    hadoop学习资料地址

    ### Hadoop 学习资源详解 ...通过以上资源,学习者可以从多个角度深入了解Hadoop的各个方面,无论是理论知识还是实践经验都能得到有效的提升。希望这些资料能够帮助大家更好地掌握Hadoop这一强大的大数据处理平台。

    HBase 官方文档

    - **定义**:HBase 是一个分布式、版本化的、面向列的开源数据库。它建立在 Apache Hadoop 和 ZooKeeper 之上,提供了高可靠性和高性能的随机读写能力。 - **版权与历史**:该文档版权属于 2012 年的 Apache ...

Global site tag (gtag.js) - Google Analytics